Clojure transducers from the ground up: the practice.

This is the second part of an article dedicated to Clojure transducers. In the first part we discussed the fundamentals of transducers and the functional abstraction they represent. In this article, we are going to explore how they are used in practice, including composability, reuse across transports, custom transducers, laziness, and parallelism.

Composability

Transducers isolate transforming reducing functions (like map or filter in their transducer version) from sequential iteration. One interesting consequence of this design is that transducers can now compose like any other function. Have a look at the following example:

(def inc-and-filter (comp (map inc) (filter odd?)))
(def special+ (inc-and-filter +))
(special+ 1 1)
;; 1
(special+ 1 2)
;; 4

(map inc) and (filter odd?) both generate a function with the same interface: it takes a reducing function and returns a new reducing function. Since the interfaces match, we can compose them with comp to form a new function inc-and-filter which is the composition of the two.
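
To see that interface concretely, we can apply (map inc) to a familiar reducing function like conj and invoke the result directly (a quick REPL check; the name inc-rf is just for illustration):

(def inc-rf ((map inc) conj))  ; a transducer applied to a reducing function...
(inc-rf [] 1)                  ; ...yields another reducing function
;; [2]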

Finally, we can pass + as an argument to inc-and-filter, which again returns a new function. We call the new function special+ because it enhances the normal + with the two transformations. We can see the effect of incorporating inc and odd? into special+ by calling it with two arguments: the second argument is incremented and then added to the first only if the incremented value is odd. We can use special+ with reduce as usual and compare it with the version using plain +:

(reduce special+ 0 (range 10))
;; 25

(reduce + 0 (filter odd? (map inc (range 10))))
;; 25

The two reduce calls only look the same: the mechanics are completely different. In the first case, our special+ applies the transformations as it iterates through the input sequence. The second case produces an intermediate lazy sequence for each transformation before the final reduction. More importantly, we can now isolate the transformations from the reduction, and transduce makes that separation explicit in its arguments:

(transduce (comp (map inc) (filter odd?)) + (range 10))
;; 25

How many transducers (our transforming reducing functions) can be composed this way? The answer is … as many as you like:

(def x-form
  (comp
    (map inc)
    (filter even?)
    (dedupe)
    (mapcat range)
    (partition-all 3)
    (partition-by #(< (apply + %) 7))
    (mapcat flatten)
    (random-sample 1.0)
    (take-nth 1)
    (keep #(when (odd? %) (* % %)))
    (keep-indexed #(when (even? %1) (* %1 %2)))
    (replace {2 "two" 6 "six" 18 "eighteen"})
    (take 11)
    (take-while #(not= 300 %))
    (drop 1)
    (drop-while string?)
    (remove string?)))

(transduce x-form + (vec (interleave (range 18) (range 20))))
;; 246

The example above is a famous "gist" created by Rich Hickey (the inventor of Clojure) to show off the available transducers in the standard library.

But there’s more: composition can be applied on top of more composition, allowing programmers to isolate and name concepts consistently. We could group the previous, long chain of transducers into more manageable chunks:

(def x-clean
  (comp
    (map inc)
    (filter even?)
    (dedupe)
    (mapcat range)))

(def x-filter
  (comp
    (partition-all 3)
    (partition-by #(< (apply + %) 7))
    (mapcat flatten)
    (random-sample 1.0)))

(def x-additional-info
  (comp
    (take-nth 1)
    (keep #(when (odd? %) (* % %)))
    (keep-indexed #(when (even? %1) (* %1 %2)))
    (replace {2 "two" 6 "six" 18 "eighteen"})))

(def x-calculate
  (comp
    (take 11)
    (take-while #(not= 300 %))
    (drop 1)
    (drop-while string?)
    (remove string?)))

(def x-prepare
  (comp
    x-clean
    x-filter))

(def x-process
  (comp
    x-additional-info
    x-calculate))

(def x-form
  (comp
    x-prepare
    x-process))

(def data (vec (interleave (range 18) (range 20))))

(transduce x-form + data)
;; 246

As you can see, comp can be used several times to compose over already composed transducers, with the end result that logically consistent pieces of computation can be grouped together and named for better reuse and readability.

Note that, although there are structural similarities with the thread-last macro ->>, threading operates on collections: to group and name steps you would have to wrap them in functions that take and return a collection. With transducers and comp, grouping never mentions a collection at all and nests to any number of levels.
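
To make the contrast concrete, here is how the first group would have to look as a named step in a thread-last pipeline (a sketch for comparison, not code from the original article):

(defn clean [coll]  ; the sequence version must mention the collection
  (->> coll
    (map inc)
    (filter even?)
    (dedupe)
    (mapcat range)))

Compare this with x-clean above, which composes the same logic without ever referring to its input.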

Reuse across transports

There is another aspect that contributes to reuse, apart from composition. Now that transducers are isolated from the collections they are applied to, we can reuse them with other transports. A transport is essentially the way a collection of items is iterated. The most common transport in the standard library is sequential iteration (that is, using sequence functions like map, filter, and so on). But there are other examples of transports. The core.async library, for instance, implements iteration through an abstraction called a ‘channel’, which behaves similarly to a blocking queue.

The following example shows a transducer chain similar to the one we have seen before, used to process incoming elements from a core.async channel:

(require '[clojure.core.async :refer [>! <! <!!] :as a])

(def xform (comp (filter odd?) (map inc)))

(defn process [items]
  (let [out (a/chan 1 xform)
        in (a/to-chan items)]
    (a/go-loop []
      (if-some [item (<! in)]
        (do
          (>! out item)
          (recur))
        (a/close! out)))
    (<!! (a/reduce conj [] out))))

(process (range 10))
;; [2 4 6 8 10]

The transducer xform is now attached to the ‘out’ channel, which is fed inside a go-loop. Every item pushed down the channel passes through the transducer chain. The input here is a range of 10 numbers turned into a channel, but items could be streamed asynchronously (and often are; for example, as server-side events on a web page).

The go-loop works sequentially in this case, but core.async also contains facilities to apply the same transducers in parallel. A pipeline is an abstraction that sits between an input and an output channel and supports multi-threaded processing.

And it can be used with transducers:

(defn process [items]
  (let [out (a/chan (a/buffer 100))]
    (a/pipeline 4 out xform (a/to-chan items))
    (<!! (a/reduce conj [] out))))

(process (range 10))
;; [2 4 6 8 10]

A pipeline accepts a maximum number of parallel threads (usually the same as the number of physical cores available on the system). Each item coming from the input channel is processed through the transducer chain in parallel, up to that maximum number of threads.

Custom transducers

The standard library provides a core set of transducer-enabled functions which cover most situations. When those are not enough, you can create your own.

However, a custom transducer needs to obey a few rules to play well with other transducers:

  • It should support at least zero-, one-, and two-argument calls.
  • The zero-argument call defines the initial value for the reduction. Despite not being used at the moment, it is recommended to invoke the reducing function (usually abbreviated "rf") with no arguments and return the result (a conservative approach that plays well with the current behaviour of transduce in this case).
  • The single-argument call defines the tear-down behaviour. This is especially useful if the transducer contains any state that needs deallocation. It will be called once with the final result of the reduction. After providing any custom logic (optional), the custom transducer is expected to call "rf" with the result, so other transducers in the chain also have their chance for cleanup.
  • The two-argument call represents a reduction step, which contains the business logic of the custom transducer. This is the typical reduce operation: the first argument is the result so far, the second the next item from the transport. The reducing function "rf" is expected to be invoked after applying any transformation, propagating the call to other transducers that might be in the chain.
  • A transducer can decide to terminate the reduction at any time by calling reduced on the result. Other transducers should pay attention to reduced? elements and prevent unnecessary computation. This is, for example, what happens with transducers like take-while: when the predicate becomes false, no other items should be reduced and computation should stop there. If there are other computationally intensive transducers after take-while, they should also stop processing and just let the results flow to the end.

These rules mainly ensure that each transducer in the chain gives the others a fair chance to perform (or stop) their transformations, initialisation, or cleanup logic.
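
As a reference, here is a minimal do-nothing skeleton that obeys all of the rules above (the name identity-xform is purely illustrative):

(defn identity-xform []
  (fn [rf]
    (fn
      ([] (rf))               ; init: delegate to the wrapped reducing function
      ([result] (rf result))  ; completion: flush any state here, then delegate
      ([result input]         ; step: transform the input, then delegate
       (rf result input)))))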

Other than that, a custom transducer can implement any sort of complicated behaviour, including maintaining internal state if necessary. Not surprisingly, transducers maintaining state are called "stateful" to distinguish them from "stateless" transducers. Maintaining state in a transducer is quite common: roughly half of the transducers in the standard library do.

It is so common that a new concurrency primitive, the volatile (created with volatile!), was introduced along with transducers. Compared to the historical group of vars, atoms, refs, and agents, a volatile is the primitive that simply guarantees that an update is promptly visible to all threads, without otherwise "protecting" the state from concurrent access. If you are wondering why that matters, the answer is that in a context like core.async pipelines, where the same transducer can be accessed concurrently, a volatile guarantees the state is seen by all threads (although, as we'll see later in this article, that is still not enough for stateful transducers used with fold).
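
For reference, the volatile API is tiny: volatile! creates an instance, vswap! and vreset! update it, and deref (@) reads it:

(def counter (volatile! 0))
(vswap! counter inc)
;; 1
@counter
;; 1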

A logging (stateless) transducer

So, let's get practical and create our first stateless transducer. Assume you have just created a complicated, but nicely composed, transducer chain for data processing (we simulate that here with a much shorter and simpler one).

How would you go about debugging it? You might need to understand at which step in the chain things are not working as expected. Here's an idea for a logging transducer with a printing side effect. Other than printing on screen, the transducer does not alter the reduction:

(defn log [& [idx]]
  (fn [rf]
    (fn
      ([] (rf))
      ([result] (rf result))
      ([result el]
        (let [n-step (if idx (str "Step: " idx ". ") "")]
          (println (format "%sResult: %s, Item: %s" n-step result el)))
        (rf result el)))))

(sequence (log) [:a :b :c])
;; Result: null, Item: :a
;; Result: null, Item: :b
;; Result: null, Item: :c
;; (:a :b :c)

In this example, log is a function taking an optional argument "idx" and returning a transducer. When "idx" is present, log additionally prints it, assuming it is the position of the transducer in a composed chain. (We'll see in a second how that information can be used.) Before being composed via comp, transducers are just a list of functions. The idea is to interleave that list with logging transducers ahead of comp, and use a dynamic variable to control when to print to the console:

(def ^:dynamic *dbg?* false)

(defn comp* [& xforms]
  (apply comp
    (if *dbg?*
      (->>
        (range)
        (map log)
        (interleave xforms))
      xforms)))

(transduce
  (comp*
    (filter odd?)
    (map inc))
  +
  (range 5))
;; 6

(binding [*dbg?* true]
  (transduce
    (comp*
      (filter odd?)
      (map inc))
    +
    (range 5)))
;; Step: 0. Result: 0, Item: 1
;; Step: 1. Result: 0, Item: 2
;; Step: 0. Result: 2, Item: 3
;; Step: 1. Result: 2, Item: 4
;; 6

Here, comp* is a thin wrapper around the normal comp function in the standard library. Its responsibility is to check the *dbg?* dynamic variable: when *dbg?* is true, it interleaves our logging transducer into the existing chain of transducers passed as input; otherwise it leaves the chain untouched.

The first invocation of transduce shows that comp* behaves like normal comp. When we bind *dbg?* to true, though, the logging transducers start printing. Several logging transducer instances are created, as many as necessary to interleave the input chain. Each logging transducer carries the "idx" of its position in the chain, so it can print it. Since each log is interleaved right after the transducer it monitors, ‘Step: 1’ shows the output of the transducer at index 1 in the (0-indexed) list passed to comp*, which is (map inc). If we see an unexpected ‘Item’ or ‘Result’ value, we know which step is producing it and we can take action.
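
Nothing forces us to go through comp*, either: since log returns an ordinary transducer, we can place one by hand at the exact point we want to inspect, passing any label as the optional argument (a quick sketch reusing the same log function):

(transduce
  (comp
    (filter odd?)
    (log :after-filter)
    (map inc))
  +
  (range 5))
;; Step: :after-filter. Result: 0, Item: 1
;; Step: :after-filter. Result: 2, Item: 3
;; 6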

An interleave (stateful) transducer

Let's now see an example of a stateful custom transducer.

The following reduce-based implementation of the Egyptian multiplication algorithm illustrates the point. Ancient Egyptians didn't use times tables to multiply numbers: they worked out the operation by decomposing one operand into powers of two, so only doubling, halving, and addition were required. For example, 9 × 7 decomposes as (8 + 1) × 7 = 56 + 7 = 63:

(defn egypt-mult [x y]
    (->> (interleave
           (iterate #(quot % 2) x)
           (iterate #(* % 2) y))
      (partition-all 2)
      (take-while #(pos? (first %)))
      (filter #(odd? (first %)))
      (map second)
      (reduce +)))

(egypt-mult 640 10)
;; 6400

We would like to express this algorithm with transducers, but there is no interleave transducer in the standard library, just the normal sequence function interleave. All the other operations inside the thread-last ->> form are available as transducers.

So, how should we design the interleave transducer to work with transduce? First of all, transduce does not support multiple collections as input. So the idea is to use one sequence as the main input for transduce and the other as the sequence of interleaving elements. That second sequence lives inside the state of the interleave transducer, which takes one item from it at each reducing step. The remaining elements have to be stored as state inside the transducer while waiting for the next call. Without further ado, here's the interleave transducer:

(defn interleave-xform                    ; <1>
  [coll]
  (fn [rf]
    (let [fillers (volatile! (seq coll))]   ; <2>
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result input]
         (if-let [[filler] @fillers]             ; <3>
           (let [step (rf result input)]
             (if (reduced? step)            ; <4>
               step
               (do
                 (vswap! fillers next)      ; <5>
                 (rf step filler))))
           (reduced result)))))))          ; <6>

There is quite a lot to cover; please refer to the numbers in the listing above:

  1. interleave-xform is modelled on the semantics of the interleave function in the standard library: it interleaves elements up to the end of the shortest sequence. interleave-xform contains all the required arities: zero, one, and two arguments.
  2. interleave-xform assumes the interleaving elements come from a collection passed when creating the transducer. We need to keep track of the remaining items in the sequence as we consume them, so the rest are stored in a volatile! instance.
  3. During the reducing step, we verify that there is at least one more element to interleave before allowing the reduction. Note the use of if-let and destructuring to pick the first element out of the content of the volatile instance.
  4. Like any good transducer citizen, we need to check whether another transducer along the chain has requested the end of the reduction. In that case, we obey without propagating any further reducing step.
  5. If, instead, we are not at the end of the reduction and we have more elements to interleave, we can proceed to update our volatile state and call the next transducer with the "filler" element coming from the internal state. Note that at this point we have invoked ‘rf’ twice: the first time for the normal reducing step, the second for the additional interleaving step.
  6. In case we don't have any more items to interleave, we end the reduction using reduced. This prevents nil elements from appearing in the final output, exactly like normal interleave (see the quick check after this list).
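
As a quick sanity check (not part of the original walkthrough), we can verify that interleave-xform behaves like normal interleave, including stopping at the shortest sequence:

(sequence (interleave-xform [:a :b]) [1 2 3])
;; (1 :a 2 :b)

(interleave [1 2 3] [:a :b])
;; (1 :a 2 :b)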

With interleave-xform in place, we can express the Egyptian multiplication method as follows:

(defn egypt-mult [x y]
  (transduce
    (comp
      (interleave-xform (iterate #(* % 2) y))
      (partition-all 2)
      (take-while #(pos? (first %)))
      (filter #(odd? (first %)))
      (map second))
    +
    (iterate #(quot % 2) x)))

(egypt-mult 4 5)
;; 20

The iterate generating progressively doubled numbers is now the interleaving sequence that we pass when creating the transducer, while the iterate generating progressively halved numbers is the normal input for transduce. The two sequences are interleaved together and partitioned into pairs as part of the transducing step. Apart from the interleaving, the rest of the processing is a mechanical refactoring from sequence operations into the related transducer versions.

Laziness

There are four main ways to apply transducers in the standard library: transduce, sequence, eduction and into. Up until now we've seen one of the most popular, transduce, which is designed to completely consume the input collection. Even when transduce is used to output a new sequence, it doesn't work lazily, as we can quickly verify by using a counter on the number of transformations happening on each item:

(def cnt (atom 0))
(take 10 (transduce (map #(do (swap! cnt inc) %)) conj () (range 1000)))
;; (999 998 997 996 995 994 993 992 991 990)
@cnt
;; 1000

In the example above, you can see that transduce completely consumes a lazy sequence, despite the fact that we want just the first 10 elements (note that conj on a list prepends each element at the head, which is why the highest numbers appear first). The counter shows that the transducer has been called on all of the items, fully realising the range. into uses transduce underneath and has the same behaviour. If you are interested in applying a transducer chain lazily, gradually consuming the input, sequence and eduction are designed for that:

(def cnt1 (atom 0))
(let [res (eduction (map #(do (swap! cnt1 inc) %)) (range 1000))]
  (doall (take 10 res))
  @cnt1)
;; 33

(def cnt2 (atom 0))
(let [res (sequence (map #(do (swap! cnt2 inc) %)) (range 1000))]
  (doall (take 10 res))
  @cnt2)
;; 33

The type of laziness in eduction and sequence is called ‘chunked’. They are lazy, but not extremely lazy: some of the input sequence is consumed ahead of the actual position in the iteration. New items are processed in chunks of 32 elements: once the first 32 are exhausted, the next 32 up to the 64th element are processed, and so on. In our example, we consume 10 elements but process essentially the whole first chunk (the counters show 33 invocations).

So, what’s the difference between eduction and sequence? eduction accepts a variable number of transducers as arguments, without comp. Besides that, eduction starts a brand-new iteration for every sequence operation, running all the transducers again if necessary. Here's a way to demonstrate this behaviour:

(def cnt1 (atom 0))
(let [res (eduction (map #(do (swap! cnt1 inc) %)) (range 10))]
  (conj (rest res) (first res))
  @cnt1)
;; 20

(def cnt2 (atom 0))
(let [res (sequence (map #(do (swap! cnt2 inc) %)) (range 10))]
  (conj (rest res) (first res))
  @cnt2)
;; 10

In these new examples, we use both first and rest on the results of eduction and sequence, respectively. You can see that first and rest each force eduction to scan the input sequence, as demonstrated by the counter reading of ‘20’, which is twice the number of items in the input sequence. sequence caches the results internally, so it doesn't execute the transducers again: it just returns the result from the cache. The eduction approach saves on memory allocation at the price of potentially multiple evaluations. A rule of thumb to pick the right tool would be:

  • Use sequence when you plan to use the produced output multiple times, for example by assigning it to a local binding and then proceeding with further processing. The internal caching of sequence results in better performance overall. At the same time, it can consume more memory, as the entire sequence will be held in memory once the last element is requested.
  • Use eduction when there is no plan to perform multiple scans of the output, saving on unnecessary caching. This is the case, for example, with transducer chains that depend on user-generated search parameters: the application needs to produce a one-off view of the system that is discarded as soon as the response is returned. The sketch after this list contrasts the two usages.
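
Here is a minimal sketch of the two usage patterns side by side (the xf chain is just an example):

(def xf (comp (filter odd?) (map inc)))

;; sequence: the result is cached and safe to traverse more than once
(let [xs (sequence xf (range 100))]
  [(first xs) (reduce + xs)])
;; [2 2550]

;; eduction: a one-off, reducible view; each reduce runs the transducers again
(reduce + 0 (eduction xf (range 100)))
;; 2550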

Parallelism

We have seen an elegant example of parallelism with transducers using core.async pipelines. There is also another option to parallelise transducible operations: fold, a function from the clojure.core.reducers namespace in the standard library. fold offers parallelism based on the "divide and conquer" model: chunks of work are created and computed in parallel while, at the same time, finished tasks are combined back into the final result.

The following example shows ptransduce, a function that works like a parallel transduce. Since reducers are based on the same functional abstraction, we can leverage fold without changing the transducer chain:

(require '[clojure.core.reducers :refer [fold]])

(def xform (comp (map inc) (filter odd?)))

(defn ptransduce [xform rf combinef coll]
  (fold
    combinef
    (xform rf)
    (into [] coll)))

(ptransduce xform + + (range 1000000))
;; 250000000000

Note that the input collection needs to be a vector (or a map) for reducers to work in parallel. It also needs to be bigger than a certain size (512 items by default) for parallelism to kick in. Apart from this, the xform transducer chain is the same as before, but we need to call it as (xform rf): fold expects a plain reducing function, and that is exactly what xform returns (with the added transformations) when invoked with a basic reducing function like + in our case.

Both pipeline and fold parallelism work, unless stateful transducers are involved. Observe:

(require '[clojure.core.async :refer [<!!] :as a])
(require '[clojure.core.reducers :refer [fold]])
(def xform (comp (drop 100) (map inc) (filter odd?)))

(transduce xform + (range 10000))
;; 24997500

(let [out (a/chan (a/buffer 100))]
  (a/pipeline 4 out xform (a/to-chan (range 10000)))
  (<!! (a/reduce + 0 out)))
;; 0

(distinct
  (for [_ (range 100)]
    (fold
      +
      (xform +)
      (into [] (range 10000)))))
;; (24997500 24877997 24781126 24912310 ....

(drop 100) is now part of the transducer chain. We see the expected result of ‘24997500’ with a normal transduce, but pipeline and fold both produce unexpected results. The reason is that multiple instances of the same stateful transducer exist. In the case of pipeline, one transducer chain is instantiated for each item (so drop is always dropping the single element available).

In the case of fold, there is one transducer chain per chunk, and the situation is further complicated by work stealing, a feature by which threads that are currently not doing any work can steal items from other threads. When that happens, the stateful transducer is not initialised again, with the result that the same state is suddenly shared across threads. That's why you need to run fold multiple times to see the inconsistency.

The problem of parallelism with stateful transducers is both technical and semantic. The drop operation, for instance, is well defined sequentially: the requested number of elements is skipped at the beginning of the collection. But the definition needs to be reformulated when chunks are involved: should drop remove the same number of elements from each chunk? In that case, sequential drop would diverge from parallel drop. Since a unique and consistent solution does not exist in the standard library at the moment, the problem of parallel stateful transducers remains unsolved, and they should be avoided with fold or pipeline. The sketch below makes the ambiguity concrete.
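
Compare sequential drop with a hypothetical per-chunk reformulation (a sketch of the semantic question, not code from the original):

;; sequential semantics: skip the first 2 elements of the whole collection
(into [] (drop 2) [1 2 3 4 5 6])
;; [3 4 5 6]

;; per-chunk semantics: skip the first 2 elements of each chunk
(into [] (mapcat #(drop 2 %)) [[1 2 3] [4 5 6]])
;; [3 6]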

Conclusions

As we’ve seen in the previous part of this two-article series, transducers are, at their core, a simple but powerful functional abstraction. "Reduce" encapsulates a pattern for recursion that can be adapted to many situations and, at the same time, promotes designing code in terms of a standard reducing interface. Since transducers’ building blocks conform to the same contract, they are easy to compose and reuse. Transducers were introduced relatively late in Clojure's life, which perhaps explains why some programmers struggle to adopt them at first.

There are still some rough edges and room for improvement in transducers as they are today. A few libraries have started to emerge to provide transducer versions of other functions (most notably, Christophe Grand’s xforms), hinting at the fact that more could be added to the standard library itself. Transducers are also amenable to parallel computation, but there is no solid semantics for parallel stateful transducers, so they can’t be used with fold. This somewhat discourages their parallel use as a whole.

On the positive side, transducers already cover a fair number of common use cases, and you should consider reaching for them as a default for everyday programming.
