Clojure transducers from the ground up: the essence.

This is the first part of an article dedicated to Clojure transducers. This part concentrates on the functional abstraction they implement; the second part shows practical examples, tools and techniques.

Transducers were introduced in Clojure 1.7 (at the end of 2014) and never got the attention they deserved. They are still relegated to niche or advanced uses, but there are compelling reasons to reach for them as the default, for example replacing the thread-first and thread-last macros (-> and ->> respectively) when applicable.
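
To make this concrete, here's a small example (standard Clojure, nothing specific to this article): a thread-last pipeline next to its transducer-based equivalent, where the transformation is composed once with comp and could be reused with other inputs and outputs:

(->> (range 10) (map inc) (filter odd?) (into []))  ;; => [1 3 5 7 9]

(into [] (comp (map inc) (filter odd?)) (range 10)) ;; => [1 3 5 7 9]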

Maybe because their name is similar to "reducers" (a related Clojure feature focused on parallelism), or because of some conceptual learning curve, many programmers I know are still reluctant to use transducers in everyday code.

In this article I'm going to show you that transducers are a functional abstraction (comparable to a combination of object-oriented design patterns). They can be derived with a few refactoring moves on top of existing collection-processing functions. The fact that Clojure offers them out of the box removes any excuse not to start using them now!

Same function, different implementations

Map and filter are very common operations in stream-oriented programming (there are many more in the Clojure standard library, and the following examples apply to most of them as well). Here's a simplified version of how map and filter are implemented in Clojure:

(defn map [f coll]
  (when (not= '() coll)
    (conj
      (map f (rest coll))
      (f (first coll)))))

(defn filter [pred coll]
  (when (not= '() coll)
    (let [f (first coll) r (rest coll)]
      (if (pred f)
        (conj (filter pred r) f)
        (filter pred r)))))
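
These simplified versions only handle sequential collections, but they behave as expected at the REPL:

(map inc (range 5))     ;; => (1 2 3 4 5)
(filter odd? (range 5)) ;; => (1 3)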

Map and filter clearly share some common traits:

  • The access mechanism to the input collection (first, rest and the empty list '() are all specific to the Clojure sequential interface).
  • Building the output (conj is used to put elements in the final list, but something else could be used).
  • The recursion mechanism used to consume the input (note that this is a stack-consuming, non-tail-recursive loop).
  • The "essence" of the operation itself, which is the way "mapping" or "filtering" works (filter requires a conditional, for example).

There are similar operations for data pipelines in other Clojure libraries. For example, core.async is a library implementing the CSP model of concurrency (Communicating Sequential Processes), in which channels exchange information between processes. A common case for the sender is to apply transformations to the outgoing messages, including map, filter and many others.

Let's have a look at how they could be implemented (these are simplified versions of the now-deprecated ones in core.async):

;; assuming: (require '[clojure.core.async :refer [chan go-loop close! <! >!]])

(defn map [f in out]
  (go-loop []
    (let [val (<! in)]
      (if (nil? val)
        (close! out)
        ;; >! returns false once out is closed, ending the loop
        (when (>! out (f val))
          (recur))))))

(defn filter [pred ch]
  (let [out (chan)]
    (go-loop []
      (let [val (<! ch)]
        (if (nil? val)
          (close! out)
          (do (when (pred val)
                (>! out val))
              (recur)))))
    out))
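
Here's a quick REPL sketch of how these channel versions could be exercised (a minimal example, assuming the require above; to-chan feeds a collection into a channel and closes it, while clojure.core.async/into drains a channel into a collection):

(require '[clojure.core.async :as async])

(def out (async/chan))
(map inc (async/to-chan (range 5)) out)
(async/<!! (async/into [] out)) ;; => [1 2 3 4 5]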

Again there are similarities and common traits:

  • The access mechanism uses core.async channel primitives (<!). The channel semantics also include checking whether the message is nil before proceeding (a nil signals the channel is closed).
  • Building the output consists of pushing (>!) items down an output channel.
  • The recursion mechanism is implemented by the go-loop macro and related recur instruction.
  • The "essence" of the operation itself is the same as before: map consists of applying "f" to each value and filter uses a predicate on each value in a when condition.

We are going to see one last example inspired by another library: the Reactive Extensions for Clojure. RxClojure is a library implementing Clojure bindings for RxJava.

Reactive programming is a push-based model built around streams of events (called "observables"), which are collected and routed to components "reacting" to them to compose behavior. How could map or filter be implemented in this case?

The following are not in RxClojure, as they just call into the corresponding Java versions. If we had to implement them in Clojure, though, they would probably look something like this:

(defn map [f xs]
  (let [op (operator*
             (fn [o]
               (subscriber o
                 (fn [o v]
                   (catch-error-value o v
                     (on-next o (f v)))))))]
    (lift op xs)))

(defn filter [pred xs]
  (let [op (operator*
             (fn [o]
               (subscriber o
                 (fn [o v]
                   (catch-error-value o v
                     (when (pred v)
                       (on-next o v)))))))]
    (lift op xs)))

We start to see a pattern emerging. Once again we can distinguish between:

  • The access mechanism uses lift to iterate through the incoming xs in conjunction with on-next inside the operator implementation.
  • Building the output is not as explicit as before. Events are consumed downstream without accumulating.
  • The recursion mechanism is implicit. Somewhere else in the code a loop is happening, but it's not exposed as part of the main API.
  • The "essence" of the operation is the same as before: map consists of (f v) for each value and filter uses a when condition.

Combinatorial explosion

By looking at three popular implementations of map and filter, we learned that the essence of the operation and some form of iteration are common traits, while accessing the input and building the output depend on the specific transport. The same happens if we look at the implementations of other collection-processing functions (in Clojure alone), such as:

cat, mapcat, remove, take, take-while  
take-nth, drop, drop-while, replace  
partition-by, partition-all, keep, keep-indexed  
map-indexed, distinct, interpose, dedupe  
random-sample ...  

To the list above you should also add any custom functions that you might need beyond what's offered by the Clojure standard library.

The dilemma is: how can we deal with the ensuing combinatorial explosion? Are we doomed to implement the same functions over and over for each new type of transport/sequence/collection? Could we write map once (and only once) and use it everywhere? Transducers are the solution to this problem (and much more).

An exercise in refactoring

Our goal now is to isolate the "essence" of map or filter (or any other collection-processing function) and provide a way to run them in a transport-independent fashion. If we succeed, we'll have a recipe to build processing pipelines that can be reused in different contexts. It turns out that reduce, a well-known operation in functional programming, is the key to achieving this goal.

Reduce is an abstraction that encapsulates the prototypical tail-recursive loop. The following "sum of all numbers in the list" is a typical example:

(defn reduce [f result coll]
  (if (not= '() coll)
    (reduce f (f result (first coll)) (rest coll))
    result))

(reduce + 0 (range 10)) ;; => 45

Reduce accumulates the result explicitly as one of its parameters. This form of recursion is also called "iterative" and, once transformed into a loop/recur, does not consume the stack. The other interesting fact about reduce is that it decouples the iteration from the transformation semantics, which is exactly what we are trying to achieve.
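
As a side note, here's what the same reduce looks like after the transformation into loop/recur (a minimal sketch, not the actual clojure.core implementation):

(defn reduce [f result coll]
  (loop [acc result xs coll]
    (if (not= '() xs)
      (recur (f acc (first xs)) (rest xs))
      acc)))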

Map and filter (as well as many other recursive algorithms) can be rewritten "reduce style". The fact that a stack-consuming algorithm can be rewritten as an iterative one is a well-known property in the theory of computation. By rewriting map and filter (and possibly other sequential functions) as iterative, we are offered the possibility to extract the "essence" of the operation:

;; refactoring step 1: iterative recursion style.

(defn map [f result coll]
  (if (not= '() coll)
    (map f (f result (first coll)) (rest coll))
    result))

(map (fn [result el] (conj result (inc el))) [] (range 10)) ;; => [1 2 3 4 5 6 7 8 9 10]

(defn filter [f result coll]
  (if (not= '() coll)
    (filter f (f result (first coll)) (rest coll))
    result))

(filter (fn [result el] (if (odd? el) (conj result el) result)) [] (range 10)) ;; => [1 3 5 7 9]

"f" is now passed as part of the parameters in our new map and filter. If you look carefully, the two functions map and filter are now identical (except for the name). Invoking them requires a more sophisticated "f" function than before, one taking two arguments: the result so far (also called accumulator) and the next element to process.

One big plus after this change is that the essence of filtering (or mapping) is now isolated from the recursion and input-iteration mechanics. It is not yet isolated from the way the output is built (conj in both cases) or from the actual transformation (inc and odd? respectively). But let's take baby steps: do some renaming (map and filter can both be renamed reduce) and extract functions ("f" can be called mapping for map and filtering for filter):

;; refactoring step 2: rename and reuse.

(defn reduce [f result coll]
  (if (not= '() coll)
    (reduce f (f result (first coll)) (rest coll))
    result))

(defn mapping [result el]
  (conj result (inc el)))

(reduce mapping [] (range 10))

(defn filtering [result el]
  (if (odd? el)
    (conj result el)
    result))

(reduce filtering [] (range 10))

Reduce encapsulates the iteration and the sequential access mechanism. But there is still a problem with mapping and filtering: if we want to be able to use them on a core.async channel, for instance, we need to abstract conj away. We can't change the interface of mapping or filtering, because the two-argument shape is part of the reduce contract. But we can take a parameter "rf" (for Reducing Function) in a wrapping lambda and return, once again, a function of two parameters:

;; refactoring step 3: extract output construction parameter.

(defn reduce [f result coll]
  (if (not= '() coll)
    (reduce f (f result (first coll)) (rest coll))
    result))

(defn mapping [rf]
  (fn [result el]
    (rf result (inc el))))

(reduce (mapping conj) [] (range 10))

(defn filtering [rf]
  (fn [result el]
    (if (odd? el)
      (rf result el)
      result)))

(reduce (filtering conj) [] (range 10))
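
The benefit of extracting "rf" is immediately visible: the same mapping logic can now feed a completely different reducing function. For example, summing instead of conj-ing into a vector:

(reduce (mapping +) 0 (range 10))
;; => 55, the sum of each element incremented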

We also need to extract inc and odd?, which are just example functions and should be passed in from the outside. Again, we don't want to alter the two-argument interface required by reduce, so we wrap once more and introduce the new parameter "f" (or "pred"):

;; refactoring step 4: extract transforming and predicate functions.

(defn mapping [f]
  (fn [rf]
    (fn [result el]
      (rf result (f el)))))

(reduce ((mapping inc) conj) [] (range 10))

(defn filtering [pred]
  (fn [rf]
    (fn [result el]
      (if (pred el)
        (rf result el)
        result))))

(reduce ((filtering odd?) conj) [] (range 10))
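
Note that mapping and filtering now share the same shape: a function taking "rf" and returning a reducing function. This means they compose with plain old comp (and, perhaps counterintuitively, the data flows left to right: elements are incremented first, then tested):

(reduce ((comp (mapping inc) (filtering odd?)) conj) [] (range 10))
;; => [1 3 5 7 9]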

Finally, let's rename the relevant functions back to map and filter (because this is what they are after all):

;; refactoring step 5: final clean-up.

(defn map [f]
  (fn [rf]
    (fn [result el]
      (rf result (f el)))))

(defn filter [pred]
  (fn [rf]
    (fn [result el]
      (if (pred el)
        (rf result el)
        result))))

This is exactly how clojure.core/map and clojure.core/filter appear in the Clojure standard library. Clojure core contains additional "arities" (not implemented here for clarity) that enable the use of map and filter without necessarily going through reduce. transduce, for example, is a thin layer on top of reduce which also improves readability:

(transduce (map inc) conj (range 10))
;; same as: (reduce ((map inc) conj) [] (range 10))

The standard library also provides "transducer awareness" in many other places. The following, for instance, are also possible and remove the need for an explicit conj:

(sequence (map inc) (range 10))
(into [] (map inc) (range 10))

Conj is not explicit because the reducing function can be inferred from the specific call: sequence implies we want to build a sequence, while into [] implies we want to build a vector. Now that we have the basic recipe, it's time to put the new construct into practice and see how it can be used in our daily code.

Conclusions

This article shows that transducers are built on top of a simple functional abstraction and that there is nothing magic happening under the hood. Beyond the refactoring exercise itself, transducers have deeper consequences in terms of reusability, composability and performance, which we are going to explore in the second part of this article.