Communicating Sequential Processes with core.async

note from Timothy Baldridge's video series

Chapter 2: Channels

create channel

(let [c (chan)]
  (future (>!! c 42))
  (future (>!! c 41))

  (future (println (<!! c)))
  (future (println (<!! c))))

async operations

provide callbacks to put! (optional) or take! (required)

(let [c (chan)]
  (put! c 42 (fn [v] (println "Sent: " v)))
  (take! c (fn [v] (println "Got: " v))))

buffers

(let [c (chan (sliding-buffer 2))]
  @(future
    (dotimes [x 3]
      (>!! c x)
      (println "Sent: " x))
    (println "done"))
  (future
    (dotimes [x 3]
      (println "Got: " (<!! c)))
    (println "done getting")))

@ here deref the future to make sure it's finished before executing next future s-exp

close channel

(let [c (chan 2)]
  @(future
    (dotimes [x 2]
      (>!! c x))
    (close! c)
    (println "Closed."))
  (future
    (loop []
      (when-some [v (<!! c)]
        (println "Got: " v)
        (recur)))
    (println "Exiting")))

Chapter 3: Threads

thread

(<!! (thread
       (let [t1 (thread "Thread 1")
             t2 (thread "Thread 2")]
         [(<!! t1)
          (<!! t2)])))


(let [c (chan)]
  (thread
    (dotimes [x 3]
      (>!! c x)
      (println "Put: " x)))
  (thread
    (dotimes [x 3]
      (println "Take: " (<!! c)))))

when dealing with core.async, it's prefer to use thread instead of future since thread returns channel, while future returns a promise

lightweight threads

(let [c (chan)]
  (go (doseq [x (range 3)]
        (>! c x)))
  (go (dotimes [x 3]
        (println "Take: " (<! c)))))
        
(clojure.pprint/pprint (macroexpand '(go (>! (chan) 42))))

inside go block, we use put parking (>!) and take parking (<!)

consider put and take parking marker for go macro to rewrite its content to a callback and pass to put! and take!

and go macro stop translate when it sees fn, so anonymous function inside a go block with put/take parking won't work.

Chapter 4: Practical Application Of core.async Basics

Interfacing With A HTTP Client

(defn http-get [url]
  (let [c (chan)]
    (println url)
    (http/get url (partial put! c))
    c))

(defn request-and-process [nm]
  (go
    (-> (str "http://imdbapi.poromenos.org/js/?name=%25" nm "%25")
        http-get
        <!
        :body
        (cheshire/parse-string true))))

(<!! (request-and-process "Matrix"))

Interfacing With A DB Client

(defn new-connection [ip keyspace]
  (let [cluster (Cluster/builder)]
    (.addContactPoint cluster ip)
    (.connect (.build cluster) keyspace)))

(defn execute-async [session stmt ch]
  (let [rsf (.executeAsync session stmt)]
    (.addListener
      rsf
      (fn []
        (let [resultset (.getUninterruptibly rsf)]
          (doseq [row (iterator-seq (.iterator resultset))]
            (let [converted (reduce
                              (fn [acc idx]
                                (conj acc (.getString ^Row row (int idx))))
                              []
                              (range (.size (.getColumnDefinitions row))))]
              (put! ch converted)))
          (close! ch)))
      clojure.core.async.impl.exec.threadpool/the-executor)
    ch))


(let [session (new-connection "localhost" "testdb")]
  (<!! (execute-async session "CREATE TABLE user_test
                               (first varchar, last varchar,
                               PRIMARY KEY (first, last));"
                      (chan))))

(let [session (new-connection "localhost" "testdb")]
  (<!! (execute-async session (.. (QueryBuilder/insertInto "user_test")
                                  (value "first" "Jane")
                                  (value "last" "Smith"))
                      (chan))))

(let [session (new-connection "localhost" "testdb")
      results (execute-async session (.. (QueryBuilder/select)
                                         (from "user_test"))
                             (chan))]
  [(<!! results)
   (<!! results)
   (<!! results)
   (<!! results)])

Interfacing With Blocking I/O

(def logging-chan (chan 24))

(future
  (loop []
    (when-some [v (<!! logging-chan)]
      (println v)
      (recur))))

(defn log [& args]
  (>!! logging-chan (apply str args)))

(do (future
      (dotimes [x 100]
        (log "(..." x "...)")))
    (future
      (dotimes [x 100]
        (log "(..." x "...)"))))

Chapter 5: Backpressure

Introduction to backpressure

(defn new-connection [ip keyspace]
  (let [cluster (Cluster/builder)]
    (.addContactPoint cluster ip)
    (.connect (.build cluster) keyspace)))

(defn execute-async [session stmt ch]
  (let [rsf (.executeAsync session stmt)]
    (.addListener
      rsf
      (fn []
        (let [resultset (.getUninterruptibly rsf)]

          (doseq [row (iterator-seq (.iterator resultset))]
            (let [converted (reduce
                              (fn [acc idx]
                                (conj acc (.getString ^Row row (int idx))))
                              []
                              (range (.size (.getColumnDefinitions row))))]
              (put! ch converted)))
          (close! ch)))
      clojure.core.async.impl.exec.threadpool/the-executor)
    ch))

Tuning Backpressure

(defn map-pipe
  ([in out f]
    (map-pipe 0 in out f))
  ([p in out f]
   (dotimes [_ p]
     (go (loop []
           (when-some [v (<! in)]
             (>! out (f v))
             (recur)))
         (close! out)))))

(let [in (chan 1024)
      a (chan 1024)
      b (chan 1024)
      c (chan 1024)
      out (chan 1024)]
  (map-pipe in a step-a)
  (map-pipe a b step-b)
  (map-pipe 2 b c step-c)
  (map-pipe c out step-d))

Chapter 6: Choosing Channels With alts!

Introduction to alts! and alt!

(let [c1 (chan 1)
      c2 (chan 1)]
  (>!! c1 44)
  (thread
    (println (alt!! [c1] ([v] [:got v])
                    [[c2 42]] ([v] [:sent v])))


    (comment (let [[v c] (alts!! [c1 [c2 42]])]
               (println "Value: " v)
               (println "Chan 1?" (= c1 c))
               (println "Chan 2?" (= c2 c))))))

here, alts!! can take or put value

if it's a channel (c1), it takes value from channel c1 if it's a vector ([c2 42]), it puts value to channel c2

and alt!! is the simple form to perform the pattern in comment form.

alt and alts always perform one and only one operations (applies to both take and put)

alt! Defaults

(let [c1 (chan 1)
      c2 (chan 1)]
  (thread
    (println "Got: " (alt!! [c1] :c1
                            [c2] :c2
                            :default :the-default))

    (comment (println "Got: " (alts!! [c1 c2]
                                      :default :the-default)))))

add the :default when there's no values from channels and you don't want to block the thread

Channel Priority With alts

(let [c1 (chan 1)
      c2 (chan 1)]
  (>!! c2 43)
  (thread
    (let [[v c] (alts!! [c1 c2]
                        :priority true)]
      (println "Value: " v)
      (println "Chan 1?" (= c1 c))
      (println "Chan 2?" (= c2 c)))))

:priority true removes the random behaviour from the clause

you always get the value from c1 first, when c1 is available

:priority true tries the options in order, but not waiting them to complete

and it's also apply to alt!!

Chapter 7: Combining and Splitting Channel Streams

Overview Of The Merge Function

(let [c1 (chan)
      c2 (chan)
      cm (merge [c1 c2] 10)]
  (>!! c1 1)
  (>!! c1 2)
  (>!! c2 3)
  (>!! c2 4)
  (close! c1)
  (close! c2)

  (loop []
    (when-some [v (<!! cm)]
      (println v)
      (recur)))

(println "Done"))

Overview Of mult And tap

(let [c (chan 10)
      m (mult c)
      t1 (chan 10)
      t2 (chan 10)]
  (tap m t1)
  (tap m t2)

  (>!! c 42)
  (>!! c 43)

  (thread
    (println "Got from T1: " (<!! t1))
    (println "Got from T1: " (<!! t1)))

  (thread
    (println "Got from T2: " (<!! t2))
    (println "Got from T2: " (<!! t2))))

fan out from one channel (c) to multiple channels (t1, t2)

Overview Of pub/sub

similar to mult and tap, but with filter behaviour

can be used to categorize the data in a channel

(let [c (chan)
      p (pub c pos?)
      s1 (chan 10)
      s2 (chan 10)]
  (sub p true s1)
  (sub p false s2)

  (>!! c 42)
  (>!! c -42)
  (>!! c -2)
  (>!! c 2)
  (close! c)

  (thread
    (loop []
      (when-some [v (<!! s1)]
        (println "S1: " v)
        (recur)))
    (println "S1: done"))

  (thread
    (loop []
      (when-some [v (<!! s2)]
        (println "S2: " v)
        (recur)))
    (println "S2: done")))

Draining Channels With reduce And into

(let [c (chan)]
  (async/onto-chan c (range 10))
  (<!! (async/reduce conj [] c)))
  
(let [c (chan)]
  (async/onto-chan c (range 10))
  (<!! (async/into #{} c)))

Chapter 8: A Short Introdction To Transducers

The Problem With Reducer Functions

first, try implement map and filter functions:


(defn -map [f col]
  (reduce
    (fn [acc v]
      (conj acc (f v)))
    []
    col))

(defn -filter [f col]
  (reduce
    (fn [acc v]
      (if (f v)
        (conj acc v)
        acc))
    []
    col))

here, the reduce function has nothing to do with the logic of map or filter

move reduce out:

(defn -mapping [f]
  (fn [acc v]
    (conj acc (f v))))

(reduce (-mapping inc) [] [1 2 3 4])

now the other problem is conj, which is not flexible

next make conj part as parameter

(defn -map [f]
  (fn [rf]
    (fn [acc v]
      (rf acc (f v)))))

(defn -filter [f]
  (fn [rf]
    (fn [acc v]
      (if (f v)
        (rf acc v)
        acc))))

(def inc-xf (comp (-map inc)
                  (-filter even?)))

(reduce (inc-xf conj) [] [1 2 3 4])


(-filter even? [1 2 3 4])

The Three Parts Of Transducer Functions

(defn -map [f]
  (fn [rf]
    (fn
      ([] (rf))
      ([acc] (rf acc))
      ([acc v]
       (rf acc (f v))))))

(defn -filter [f]
  (fn [rf]
    (fn
      ([] (rf))
      ([acc] (rf acc))
      ([acc v]
       (if (f v)
         (rf acc v)
         acc)))))

(def inc-xf (comp (-map inc)
                  (-filter even?)))

(defn -conj!
  ([] (transient []))
  ([acc] (persistent! acc))
  ([acc v] (conj! acc v)))

(let [rf (inc-xf -conj!)]
  (rf (reduce rf (rf) [1 2 3 4])))

Adding Transducer Logic To Channels

(let [c (chan 3 (comp (map inc)
                      (filter even?)))]
  (async/onto-chan c (range 10))
  (<!! (async/into [] c)))

(let [c (chan 3 (mapcat identity))]
  (async/onto-chan c [[1 2 3]
                      [4 5 6]
                      [7 8 9]])
  (<!! (async/into [] c)))

chan accepts second argument, which is the transducer

it applies the function to every element in the channel

Search Blog: