Communicating Sequential Processes with core.async
note from Timothy Baldridge's video series
Chapter 2: Channels
create channel
(let [c (chan)]
(future (>!! c 42))
(future (>!! c 41))
(future (println (<!! c)))
(future (println (<!! c))))
async operations
provide callbacks to put!
(optional) or take!
(required)
(let [c (chan)]
(put! c 42 (fn [v] (println "Sent: " v)))
(take! c (fn [v] (println "Got: " v))))
buffers
(let [c (chan (sliding-buffer 2))]
@(future
(dotimes [x 3]
(>!! c x)
(println "Sent: " x))
(println "done"))
(future
(dotimes [x 3]
(println "Got: " (<!! c)))
(println "done getting")))
@
here deref the future
to make sure it's finished before executing next future
s-exp
close channel
(let [c (chan 2)]
@(future
(dotimes [x 2]
(>!! c x))
(close! c)
(println "Closed."))
(future
(loop []
(when-some [v (<!! c)]
(println "Got: " v)
(recur)))
(println "Exiting")))
Chapter 3: Threads
thread
(<!! (thread
(let [t1 (thread "Thread 1")
t2 (thread "Thread 2")]
[(<!! t1)
(<!! t2)])))
(let [c (chan)]
(thread
(dotimes [x 3]
(>!! c x)
(println "Put: " x)))
(thread
(dotimes [x 3]
(println "Take: " (<!! c)))))
when dealing with core.async
, it's prefer to use thread
instead of future
since thread
returns channel, while future
returns a promise
lightweight threads
(let [c (chan)]
(go (doseq [x (range 3)]
(>! c x)))
(go (dotimes [x 3]
(println "Take: " (<! c)))))
(clojure.pprint/pprint (macroexpand '(go (>! (chan) 42))))
inside go
block, we use put parking (>!
) and take parking (<!
)
consider put and take parking marker for go
macro to rewrite its content to a callback and pass to put!
and take!
and go
macro stop translate when it sees fn
, so anonymous function inside a go
block with put/take parking won't work.
Chapter 4: Practical Application Of core.async Basics
Interfacing With A HTTP Client
(defn http-get [url]
(let [c (chan)]
(println url)
(http/get url (partial put! c))
c))
(defn request-and-process [nm]
(go
(-> (str "http://imdbapi.poromenos.org/js/?name=%25" nm "%25")
http-get
<!
:body
(cheshire/parse-string true))))
(<!! (request-and-process "Matrix"))
Interfacing With A DB Client
(defn new-connection [ip keyspace]
(let [cluster (Cluster/builder)]
(.addContactPoint cluster ip)
(.connect (.build cluster) keyspace)))
(defn execute-async [session stmt ch]
(let [rsf (.executeAsync session stmt)]
(.addListener
rsf
(fn []
(let [resultset (.getUninterruptibly rsf)]
(doseq [row (iterator-seq (.iterator resultset))]
(let [converted (reduce
(fn [acc idx]
(conj acc (.getString ^Row row (int idx))))
[]
(range (.size (.getColumnDefinitions row))))]
(put! ch converted)))
(close! ch)))
clojure.core.async.impl.exec.threadpool/the-executor)
ch))
(let [session (new-connection "localhost" "testdb")]
(<!! (execute-async session "CREATE TABLE user_test
(first varchar, last varchar,
PRIMARY KEY (first, last));"
(chan))))
(let [session (new-connection "localhost" "testdb")]
(<!! (execute-async session (.. (QueryBuilder/insertInto "user_test")
(value "first" "Jane")
(value "last" "Smith"))
(chan))))
(let [session (new-connection "localhost" "testdb")
results (execute-async session (.. (QueryBuilder/select)
(from "user_test"))
(chan))]
[(<!! results)
(<!! results)
(<!! results)
(<!! results)])
Interfacing With Blocking I/O
(def logging-chan (chan 24))
(future
(loop []
(when-some [v (<!! logging-chan)]
(println v)
(recur))))
(defn log [& args]
(>!! logging-chan (apply str args)))
(do (future
(dotimes [x 100]
(log "(..." x "...)")))
(future
(dotimes [x 100]
(log "(..." x "...)"))))
Chapter 5: Backpressure
Introduction to backpressure
(defn new-connection [ip keyspace]
(let [cluster (Cluster/builder)]
(.addContactPoint cluster ip)
(.connect (.build cluster) keyspace)))
(defn execute-async [session stmt ch]
(let [rsf (.executeAsync session stmt)]
(.addListener
rsf
(fn []
(let [resultset (.getUninterruptibly rsf)]
(doseq [row (iterator-seq (.iterator resultset))]
(let [converted (reduce
(fn [acc idx]
(conj acc (.getString ^Row row (int idx))))
[]
(range (.size (.getColumnDefinitions row))))]
(put! ch converted)))
(close! ch)))
clojure.core.async.impl.exec.threadpool/the-executor)
ch))
Tuning Backpressure
(defn map-pipe
([in out f]
(map-pipe 0 in out f))
([p in out f]
(dotimes [_ p]
(go (loop []
(when-some [v (<! in)]
(>! out (f v))
(recur)))
(close! out)))))
(let [in (chan 1024)
a (chan 1024)
b (chan 1024)
c (chan 1024)
out (chan 1024)]
(map-pipe in a step-a)
(map-pipe a b step-b)
(map-pipe 2 b c step-c)
(map-pipe c out step-d))
Chapter 6: Choosing Channels With alts!
Introduction to alts! and alt!
(let [c1 (chan 1)
c2 (chan 1)]
(>!! c1 44)
(thread
(println (alt!! [c1] ([v] [:got v])
[[c2 42]] ([v] [:sent v])))
(comment (let [[v c] (alts!! [c1 [c2 42]])]
(println "Value: " v)
(println "Chan 1?" (= c1 c))
(println "Chan 2?" (= c2 c))))))
here, alts!!
can take or put value
if it's a channel (c1
), it takes value from channel c1
if it's a vector ([c2 42]
), it puts value to channel c2
and alt!!
is the simple form to perform the pattern in comment
form.
alt
and alts
always perform one and only one operations (applies to both take and put)
alt! Defaults
(let [c1 (chan 1)
c2 (chan 1)]
(thread
(println "Got: " (alt!! [c1] :c1
[c2] :c2
:default :the-default))
(comment (println "Got: " (alts!! [c1 c2]
:default :the-default)))))
add the :default
when there's no values from channels and you don't want to block the thread
Channel Priority With alts
(let [c1 (chan 1)
c2 (chan 1)]
(>!! c2 43)
(thread
(let [[v c] (alts!! [c1 c2]
:priority true)]
(println "Value: " v)
(println "Chan 1?" (= c1 c))
(println "Chan 2?" (= c2 c)))))
:priority true
removes the random behaviour from the clause
you always get the value from c1
first, when c1
is available
:priority true
tries the options in order, but not waiting them to complete
and it's also apply to alt!!
Chapter 7: Combining and Splitting Channel Streams
Overview Of The Merge Function
(let [c1 (chan)
c2 (chan)
cm (merge [c1 c2] 10)]
(>!! c1 1)
(>!! c1 2)
(>!! c2 3)
(>!! c2 4)
(close! c1)
(close! c2)
(loop []
(when-some [v (<!! cm)]
(println v)
(recur)))
(println "Done"))
Overview Of mult And tap
(let [c (chan 10)
m (mult c)
t1 (chan 10)
t2 (chan 10)]
(tap m t1)
(tap m t2)
(>!! c 42)
(>!! c 43)
(thread
(println "Got from T1: " (<!! t1))
(println "Got from T1: " (<!! t1)))
(thread
(println "Got from T2: " (<!! t2))
(println "Got from T2: " (<!! t2))))
fan out from one channel (c
) to multiple channels (t1
, t2
)
Overview Of pub/sub
similar to mult
and tap
, but with filter behaviour
can be used to categorize the data in a channel
(let [c (chan)
p (pub c pos?)
s1 (chan 10)
s2 (chan 10)]
(sub p true s1)
(sub p false s2)
(>!! c 42)
(>!! c -42)
(>!! c -2)
(>!! c 2)
(close! c)
(thread
(loop []
(when-some [v (<!! s1)]
(println "S1: " v)
(recur)))
(println "S1: done"))
(thread
(loop []
(when-some [v (<!! s2)]
(println "S2: " v)
(recur)))
(println "S2: done")))
Draining Channels With reduce And into
(let [c (chan)]
(async/onto-chan c (range 10))
(<!! (async/reduce conj [] c)))
(let [c (chan)]
(async/onto-chan c (range 10))
(<!! (async/into #{} c)))
Chapter 8: A Short Introdction To Transducers
The Problem With Reducer Functions
first, try implement map
and filter
functions:
(defn -map [f col]
(reduce
(fn [acc v]
(conj acc (f v)))
[]
col))
(defn -filter [f col]
(reduce
(fn [acc v]
(if (f v)
(conj acc v)
acc))
[]
col))
here, the reduce
function has nothing to do with the logic of map or filter
move reduce
out:
(defn -mapping [f]
(fn [acc v]
(conj acc (f v))))
(reduce (-mapping inc) [] [1 2 3 4])
now the other problem is conj
, which is not flexible
next make conj
part as parameter
(defn -map [f]
(fn [rf]
(fn [acc v]
(rf acc (f v)))))
(defn -filter [f]
(fn [rf]
(fn [acc v]
(if (f v)
(rf acc v)
acc))))
(def inc-xf (comp (-map inc)
(-filter even?)))
(reduce (inc-xf conj) [] [1 2 3 4])
(-filter even? [1 2 3 4])
The Three Parts Of Transducer Functions
(defn -map [f]
(fn [rf]
(fn
([] (rf))
([acc] (rf acc))
([acc v]
(rf acc (f v))))))
(defn -filter [f]
(fn [rf]
(fn
([] (rf))
([acc] (rf acc))
([acc v]
(if (f v)
(rf acc v)
acc)))))
(def inc-xf (comp (-map inc)
(-filter even?)))
(defn -conj!
([] (transient []))
([acc] (persistent! acc))
([acc v] (conj! acc v)))
(let [rf (inc-xf -conj!)]
(rf (reduce rf (rf) [1 2 3 4])))
Adding Transducer Logic To Channels
(let [c (chan 3 (comp (map inc)
(filter even?)))]
(async/onto-chan c (range 10))
(<!! (async/into [] c)))
(let [c (chan 3 (mapcat identity))]
(async/onto-chan c [[1 2 3]
[4 5 6]
[7 8 9]])
(<!! (async/into [] c)))
chan
accepts second argument, which is the transducer
it applies the function to every element in the channel