This is the first part of an article dedicated to Clojure transducers. The first part concentrates more on the functional abstraction they implement. The second part shows practical examples, tools and techniques.
Transducers were introduced in Clojure 1.7 (at the end of 2014) and never got the attention they deserved. They are still relegated to niche or advanced uses, but there are compelling reasons to reach for them as the default, for example replacing the thread-first/thread-last macros (-> and ->> respectively) when applicable.
Maybe because their name is similar to "reducers" (a similar Clojure feature which focuses on parallelism), or because of some conceptual learning curve, many programmers I know are still reluctant to use transducers in everyday code.
In this article I’m going to show you that transducers are a functional abstraction (comparable to a design pattern in object oriented programming). They can be derived with a few refactoring moves on top of existing collection processing functions. The fact that Clojure offers them out of the box removes any excuse not to start using them now!
Same function, different implementations
map and filter are very common operations for stream oriented programming (there are many more in the Clojure standard library and the following examples apply to most of them as well). Here's a simplified version of how map and filter are implemented in Clojure:
(defn map [f coll]
  (when (not= '() coll)
    (conj (map f (rest coll))
          (f (first coll)))))

(defn filter [pred coll]
  (when (not= '() coll)
    (let [f (first coll)
          r (rest coll)]
      (if (pred f)
        (conj (filter pred r) f)
        (filter pred r)))))
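To see the simplified versions at work, here is the same code under hypothetical names (my-map and my-filter, renamed here only to avoid shadowing clojure.core) exercised on a small list:

```clojure
;; Same simplified definitions as above, renamed to avoid shadowing
;; clojure.core/map and clojure.core/filter.
(defn my-map [f coll]
  (when (not= '() coll)
    (conj (my-map f (rest coll))
          (f (first coll)))))

(defn my-filter [pred coll]
  (when (not= '() coll)
    (let [f (first coll)
          r (rest coll)]
      (if (pred f)
        (conj (my-filter pred r) f)
        (my-filter pred r)))))

(my-map inc '(1 2 3))      ;; => (2 3 4)
(my-filter odd? '(1 2 3))  ;; => (1 3)
```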
map and filter clearly share some common traits:
- The access mechanism to the input collection (first, rest and the empty list '() are all specific to the Clojure sequential interface).
- Building the output (conj is used to put elements in the final list, but something else could be used).
- The recursion mechanism used to consume the input (note that this is a stack-consuming recursion, not a tail-recursive loop).
- The "essence" of the operation itself, which is the way "mapping" or "filtering" works (
filterrequires a conditional for example).
There are similar operations for data pipelines in other Clojure libraries. For example core.async is a library implementing the CSP model for concurrency (Communicating Sequential Processes) in which channels exchange information between processes. A common case for the sender is to apply transformations to the outgoing messages, including map, filter and many others.
Let's have a look at how they could be implemented (this is a simplified version of the now deprecated ones in core.async):
(defn map [f in out]
  (go-loop []
    (let [val (<! in)]
      (if (nil? val)
        (close! out)
        (do (doseq [v (f val)]
              (>! out v))
            (when-not (impl/closed? out)
              (recur)))))))

(defn filter [pred ch]
  (let [out (chan)]
    (go-loop []
      (let [val (<! ch)]
        (if (nil? val)
          (close! out)
          (do (when (pred val)
                (>! out val))
              (recur)))))
    out))
Again there are similarities and common traits:
- The access mechanism uses core.async channel primitives (<! to take from a channel). The channel semantic also includes checking if the message is nil, which signals a closed channel.
- Building the output consists of pushing (>!) items down an output channel.
- The recursion mechanism is implemented by the go-loop macro and the related recur instruction.
- The "essence" of the operation itself is the same as before:
mapconsists of applying "f" to each value and
filteruses a predicate on each value in a
We are going to see one last example inspired by another library: the Clojure Reactive extensions. RxClojure is a library implementing the Clojure bindings for RxJava.
Reactive programming is a push-based model based on streams: events (called "observables") are collected and routed to components "reacting" to compose behavior. How could map and filter be implemented in this case?
The following functions are not in RxClojure, which just calls into the corresponding Java versions. If we had to implement them in Clojure though, they would probably look something like:
(defn map [f xs]
  (let [op (operator*
             (fn [o]
               (subscriber o
                 (fn [o v]
                   (catch-error-value o v
                     (on-next o (f v)))))))]
    (lift op xs)))

(defn filter [pred xs]
  (let [op (operator*
             (fn [o]
               (subscriber o
                 (fn [o v]
                   (catch-error-value o v
                     (when (pred v)
                       (on-next o v)))))))]
    (lift op xs)))
We start to see a pattern emerging, once again we can distinguish between:
- The access mechanism uses lift to iterate through the incoming observable xs, in conjunction with on-next inside the operator implementation.
- Building the output is not explicit as before. Events are consumed downstream without accumulating.
- The recursion mechanism is implicit. Somewhere else in the code a loop is happening, but it's not exposed as part of the main API.
- The "essence" of the operation is the same as before:
(f v)for each value and
By looking at three popular implementations of map and filter, we learned that the essence of the implemented operation and some form of iteration are common traits, while access to the input and construction of the output depend on the specific transport. The same happens if we look at the implementation of other collection processing functions (in Clojure alone), such as:
cat, mapcat, remove, take, take-while, take-nth, drop, drop-while, replace, partition-by, partition-all, keep, keep-indexed, map-indexed, distinct, interpose, dedupe, random-sample ...
To the list above you should also include any custom functions that you might need beyond what's offered by the Clojure standard library.
The dilemma is: how can we deal with the ensuing combinatorial explosion? Are we doomed to implement the same function all over again for each new type of transport/sequence/collection? Could we just write map or filter once (and possibly only once) and use it everywhere? Transducers are the solution to this problem (and much more).
An exercise in refactoring
Our goal now is to isolate the "essence" of map and filter (or any other collection processing function) and provide a way to run them in a transport-independent fashion. If we succeed, we'll have a recipe to build processing pipelines that can be reused in different contexts. It turns out that reduce, a well known operation in functional programming, is the key to achieving this goal.
Reduce is an abstraction that encapsulates the prototypical tail-recursive loop. The following "sum of all numbers in the list" is a typical example:
(defn reduce [f result coll]
  (if (not= '() coll)
    (reduce f (f result (first coll)) (rest coll))
    result))

(reduce + 0 (range 10))
Reduce accumulates the result explicitly as one of the parameters. This form of recursion is also called "iterative" and once transformed into a
loop-recur does not consume the stack. The other interesting fact about
reduce is that it decouples the iteration from the transformation semantic, which is what we are trying to achieve.
map and filter (as well as many other recursive algorithms) can be rewritten "reduce style". The fact that a stack-consuming algorithm can be rewritten as an iterative one is a well known property in the theory of computation. By rewriting map and filter (and possibly other sequential functions) as iterative, we are offered the possibility to extract the "essence" of the operation:
;; refactoring step 1: iterative recursion style.
(defn map [f result coll]
  (if (not= '() coll)
    (map f (f result (first coll)) (rest coll))
    result))

(map (fn [result el] (conj result (inc el))) [] (range 10))

(defn filter [f result coll]
  (if (not= '() coll)
    (filter f (f result (first coll)) (rest coll))
    result))

(filter (fn [result el] (if (odd? el) (conj result el) result)) [] (range 10))
"f" is now passed as part of the parameters in our new
filter. If you look carefully, the two functions
filter are now identical (except for the name). Invoking them requires a more sophisticated "f" function than before, one taking two arguments: the result so far (also called accumulator) and the next element to process.
One big plus of this change is that the essence of filtering (or mapping) is now isolated from the recursion and input iteration mechanics. It is not yet isolated from the way the output is built (conj in both cases) or from the actual transformation (inc and odd? respectively). But let's take baby steps and do some renaming first (both map and filter can be renamed reduce) and extract functions ("f" can be called mapping and filtering respectively):
;; refactoring step 2: rename and reuse.
(defn reduce [f result coll]
  (if (not= '() coll)
    (reduce f (f result (first coll)) (rest coll))
    result))

(defn mapping [result el]
  (conj result (inc el)))

(reduce mapping [] (range 10))

(defn filtering [result el]
  (if (odd? el)
    (conj result el)
    result))

(reduce filtering [] (range 10))
Reduce encapsulates the iteration and the sequential access mechanism. But there is still a problem with mapping and filtering: if we want to be able to use them on a core.async channel for instance, we need to abstract conj away. We can't modify the two-argument interface of mapping and filtering, because it is part of the reduce contract. But we can add a parameter "rf" (for Reducing Function) in a wrapping lambda and return again a function of two parameters:
;; refactoring step 3: extract output construction parameter.
(defn reduce [f result coll]
  (if (not= '() coll)
    (reduce f (f result (first coll)) (rest coll))
    result))

(defn mapping [rf]
  (fn [result el]
    (rf result (inc el))))

(reduce (mapping conj) [] (range 10))

(defn filtering [rf]
  (fn [result el]
    (if (odd? el)
      (rf result el)
      result)))

(reduce (filtering conj) [] (range 10))
We also need to extract inc and odd?, which are just example functions and should be passed in from the outside. Again, we don't want to alter the two-argument interface required by reduce, so we wrap once more and introduce the new parameter "f" (or "pred"):
;; refactoring step 4: extract transforming and predicate functions.
(defn mapping [f]
  (fn [rf]
    (fn [result el]
      (rf result (f el)))))

(reduce ((mapping inc) conj) [] (range 10))

(defn filtering [pred]
  (fn [rf]
    (fn [result el]
      (if (pred el)
        (rf result el)
        result))))

(reduce ((filtering odd?) conj) [] (range 10))
Finally, let's rename the relevant functions back to map and filter (because this is what they are after all):
;; refactoring step 5: final clean-up.
(defn map [f]
  (fn [rf]
    (fn [result el]
      (rf result (f el)))))

(defn filter [pred]
  (fn [rf]
    (fn [result el]
      (if (pred el)
        (rf result el)
        result))))
This is essentially how clojure.core/map and clojure.core/filter appear in the Clojure standard library. Clojure core contains additional "arities" (not implemented here for clarity) that enable the use of map and filter without necessarily going through reduce.
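As a rough sketch of the idea (this is not the actual clojure.core implementation, which is lazier and supports more arities, and my-map is a hypothetical name), the extra arity dispatches on the number of arguments: one argument returns the transducer derived above, two arguments process a collection directly:

```clojure
;; Hypothetical multi-arity sketch, NOT clojure.core's real implementation
;; (the real one is lazy and also has a completing arity on the rf).
(defn my-map
  ([f]
   ;; arity 1: return the transducer, exactly as derived in the article
   (fn [rf]
     (fn [result el]
       (rf result (f el)))))
  ([f coll]
   ;; arity 2: classic eager collection version, reusing the transducer
   (reduce ((my-map f) conj) [] coll)))

(my-map inc (range 5))
;; => [1 2 3 4 5]
```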
transduce, for example, is a thin layer on top of reduce that at the same time improves readability:
(transduce (map inc) conj (range 10))
;; same as:
(reduce ((map inc) conj) [] (range 10))
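transduce also accepts an explicit initial value, which makes it easy to use reducing functions other than conj (a small variation on the example above, not from the original text):

```clojure
;; Summing instead of collecting: + is the reducing function, 0 the init.
(transduce (map inc) + 0 (range 10))
;; => 55
```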
The standard library also provides "transducer awareness" in many other places. The following, for instance, are also possible and remove the need for an explicit conj:
(sequence (map inc) (range 10))
(into [] (map inc) (range 10))
conj is not explicit because the reducing function can be inferred from the specific call: sequence (we want to build a sequence) or into [] (we want to build a vector). Now that we have the basic recipe, it's time to put the new construct into practice and see how it can be used in our daily code.
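As a first taste of that reuse, transducers also compose with plain function composition (comp), and the same composed transformation can then be handed to different consumers unchanged (a small preview of part two, using the real clojure.core functions):

```clojure
;; One transformation, defined once with comp
;; (for transducers, comp applies left to right: filter first, then map).
(def xform
  (comp (filter odd?)
        (map inc)))

;; ...reused unchanged across different contexts:
(transduce xform + 0 (range 10))  ;; => 30
(into [] xform (range 10))        ;; => [2 4 6 8 10]
(sequence xform (range 10))       ;; => (2 4 6 8 10)
```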
This article shows that transducers are built on top of a simple functional abstraction and that there is nothing magic happening under the hood. Apart from the interesting refactoring exercise, transducers have deeper consequences in terms of reusability, composability and performance that we are going to explore in the second part of this article.