Clojure transducers from the ground up: the essence.
This is the first part of a two-part article dedicated to Clojure transducers. This first part concentrates on the functional abstraction they implement; the second shows practical examples, tools and techniques.
Transducers were introduced in Clojure 1.7 (at the end of 2014) and never got the attention they deserved. They are still relegated to niche or advanced uses, but there are compelling reasons to reach for them as the default, for example replacing thread-first and thread-last (-> and ->> respectively) when applicable.
Maybe because their name is similar to "reducers" (a related Clojure feature that focuses on parallelism), or because of some conceptual learning curve, many programmers I know are still reluctant to use transducers in everyday code.
In this article I'm going to show you that transducers are a functional abstraction (comparable to a combination of object-oriented patterns). They can be derived with a few refactoring moves on top of existing collection-processing functions. The fact that Clojure offers them out of the box removes any excuse not to start using them now!
Same function, different implementations
map and filter are very common operations in stream-oriented programming (there are many more in the Clojure standard library, and the following examples apply to most of them as well). Here's a simplified version of how map and filter are implemented in Clojure:
(defn map [f coll]
  (when (not= '() coll)
    (conj
      (map f (rest coll))
      (f (first coll)))))
(defn filter [pred coll]
  (when (not= '() coll)
    (let [f (first coll) r (rest coll)]
      (if (pred f)
        (conj (filter pred r) f)
        (filter pred r)))))
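A quick sanity check shows they behave like their clojure.core namesakes (conj on a list prepends, which keeps the results in order):

(map inc '(1 2 3))
;; => (2 3 4)

(filter odd? '(1 2 3 4 5))
;; => (1 3 5)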
map and filter clearly share some common traits:
- The access mechanism to the input collection (first, rest and the empty list '() are all specific to the Clojure sequential interface).
- Building the output (conj is used to put elements in the final list, but something else could be used).
- The recursion mechanism used to consume the input (note that this is a stack-consuming, non-tail-recursive loop).
- The "essence" of the operation itself, which is the way "mapping" or "filtering" works (filter requires a conditional, for example).
There are similar operations for data pipelines in other Clojure libraries. For example, core.async is a library implementing the CSP model for concurrency (Communicating Sequential Processes), in which channels exchange messages between processes. A common case for the sender is to apply transformations to the outgoing messages, including map, filter and many others.
Let's have a look at how they could be implemented (this is a simplified version of the now-deprecated ones in core.async):
(defn map [f in out]
  (go-loop []
    (let [val (<! in)]
      (if (nil? val)
        (close! out)
        (do (doseq [v (f val)]
              (>! out v))
            ;; impl aliases clojure.core.async.impl.protocols in the core.async source
            (when-not (impl/closed? out)
              (recur)))))))
(defn filter [pred ch]
  (let [out (chan)]
    (go-loop []
      (let [val (<! ch)]
        (if (nil? val)
          (close! out)
          (do (when (pred val)
                (>! out val))
              (recur)))))
    out))
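Here's a minimal usage sketch of this filter, assuming the usual core.async requires (<!! blocks until a value is ready):

;; assuming:
;; (require '[clojure.core.async :refer [chan go go-loop <! >! <!! close!]])

(def in (chan))
(def odds (filter odd? in))

(go (doseq [n (range 5)] (>! in n))
    (close! in))

(<!! odds)
;; => 1 (0 was dropped; the remaining odd values stay on the channel)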
Again, there are similarities and common traits:

- The access mechanism uses core.async channel primitives (<!). The channel semantics also include checking whether a message is nil before proceeding.
- Building the output consists of pushing (>!) items down an output channel.
- The recursion mechanism is implemented by the go-loop macro and the related recur instruction.
- The "essence" of the operation itself is the same as before: map consists of applying "f" to each value, while filter uses a predicate on each value in a when condition.
We are going to see one last example inspired by another library: the Clojure Reactive Extensions. RxClojure is a library implementing the Clojure bindings for RxJava. Reactive programming is a push-based model built around streams: events (collected in so-called "observables") are routed to components "reacting" to them to compose behavior. How could map or filter be implemented in this case?
The following are not in RxClojure, as the real ones just call into the corresponding Java versions. If we had to implement them in Clojure, though, they would probably look something like this:
(defn map [f xs]
  (let [op (operator*
             (fn [o]
               (subscriber o
                 (fn [o v]
                   (catch-error-value o v
                     (on-next o (f v)))))))]
    (lift op xs)))
(defn filter [pred xs]
  (let [op (operator*
             (fn [o]
               (subscriber o
                 (fn [o v]
                   (catch-error-value o v
                     (when (pred v)
                       (on-next o v)))))))]
    (lift op xs)))
We start to see a pattern emerging. Once again we can distinguish:

- The access mechanism uses lift to iterate through the incoming xs, in conjunction with on-next inside the operator implementation.
- Building the output is not as explicit as before: events are consumed downstream without accumulating.
- The recursion mechanism is implicit. Somewhere else in the code a loop is happening, but it's not exposed as part of the main API.
- The "essence" of the operation is the same as before: map consists of applying (f v) to each value and filter uses a when condition.
Combinatorial explosion
By looking at three popular implementations of map and filter, we learned that the essence of the implemented operation and some form of iteration are common traits, while access to the input and building of the output depend on the specific transport. The same happens if we look at the implementations of other collection-processing functions (in Clojure alone), such as:

cat, mapcat, remove, take, take-while
take-nth, drop, drop-while, replace
partition-by, partition-all, keep, keep-indexed
map-indexed, distinct, interpose, dedupe
random-sample ...
To the list above you should also include any custom functions that you might need beyond what's offered by the Clojure standard library.
The dilemma is: how can we deal with the ensuing combinatorial explosion? Are we doomed to re-implement the same functions over and over for each new type of transport, sequence or collection? Could we write map once (and only once) and use it everywhere? Transducers are the solution to this problem (and much more).
An exercise in refactoring
Our goal now is to isolate the "essence" of map, filter or any other collection-processing function, and provide a way to run it in a transport-independent fashion. If we succeed, we'll have a recipe to build processing pipelines that can be reused in different contexts. It turns out that reduce, a well-known operation in functional programming, is the key to achieving this goal.
reduce is an abstraction that encapsulates the prototypical tail-recursive loop. The following "sum of all numbers in the list" is a typical example:
(defn reduce [f result coll]
  (if (not= '() coll)
    (reduce f (f result (first coll)) (rest coll))
    result))

(reduce + 0 (range 10))
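To see why this recursion is "iterative", here is roughly how a smaller call unfolds; the accumulator carries the partial result at every step:

;; (reduce + 0 '(1 2 3))
;; (reduce + (+ 0 1) '(2 3))
;; (reduce + (+ 1 2) '(3))
;; (reduce + (+ 3 3) '())
;; => 6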
reduce accumulates the result explicitly as one of its parameters. This form of recursion is also called "iterative" and, once transformed into a loop-recur, does not consume the stack. The other interesting fact about reduce is that it decouples the iteration from the transformation semantics, which is exactly what we are trying to achieve.
map and filter (as well as many other recursive algorithms) can be rewritten "reduce style". The fact that a stack-consuming algorithm can be rewritten as an iterative one is a well-known property in the theory of computation. By rewriting map and filter (and possibly other sequential functions) as iterative, we get the chance to extract the "essence" of each operation:
;; refactoring step 1: iterative recursion style.

(defn map [f result coll]
  (if (not= '() coll)
    (map f (f result (first coll)) (rest coll))
    result))

(map (fn [result el] (conj result (inc el))) [] (range 10))
;; => [1 2 3 4 5 6 7 8 9 10]

(defn filter [f result coll]
  (if (not= '() coll)
    (filter f (f result (first coll)) (rest coll))
    result))

(filter (fn [result el] (if (odd? el) (conj result el) result)) [] (range 10))
;; => [1 3 5 7 9]
"f" is now passed as part of the parameters in our new map
and filter
. If you look carefully, the two functions map
and filter
are now identical (except for the name). Invoking them requires a more sophisticated "f" function than before, one taking two arguments: the result so far (also called accumulator) and the next element to process.
One big plus of this change is that the essence of filtering (or mapping) is now isolated from the recursion and input iteration mechanics. It is not yet isolated from the way the output is built (conj in both cases) or from the actual transformation functions (inc and odd? respectively). But let's take baby steps and do some renaming (map and filter can both be renamed reduce) and extract functions ("f" can be called mapping for map and filtering for filter):
;; refactoring step 2: rename and reuse.

(defn reduce [f result coll]
  (if (not= '() coll)
    (reduce f (f result (first coll)) (rest coll))
    result))

(defn mapping [result el]
  (conj result (inc el)))

(reduce mapping [] (range 10))
;; => [1 2 3 4 5 6 7 8 9 10]

(defn filtering [result el]
  (if (odd? el)
    (conj result el)
    result))

(reduce filtering [] (range 10))
;; => [1 3 5 7 9]
reduce encapsulates the iteration and the sequential access mechanism. But there is still a problem with mapping and filtering: if we want to be able to use them on a core.async channel, for instance, we need to abstract conj away. We can't modify the mapping or filtering interface, because it is part of the reduce contract. But we can add a parameter "rf" (for "reducing function") in a wrapping lambda and return, once again, a function of two parameters:
;; refactoring step 3: extract output construction parameter.

(defn reduce [f result coll]
  (if (not= '() coll)
    (reduce f (f result (first coll)) (rest coll))
    result))

(defn mapping [rf]
  (fn [result el]
    (rf result (inc el))))

(reduce (mapping conj) [] (range 10))
;; => [1 2 3 4 5 6 7 8 9 10]

(defn filtering [rf]
  (fn [result el]
    (if (odd? el)
      (rf result el)
      result)))

(reduce (filtering conj) [] (range 10))
;; => [1 3 5 7 9]
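With conj abstracted away, nothing forces us to build a collection any more. The same mapping can feed any reducing function, for example + to sum the incremented values:

(reduce (mapping +) 0 (range 10))
;; => 55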
We also need to extract inc and odd?, which are just example functions and should be passed in from the outside. Again, we don't want to alter the two-argument interface required by reduce, so we wrap once more and introduce the new parameter "f" (or "pred"):
;; refactoring step 4: extract transforming and predicate functions.

(defn mapping [f]
  (fn [rf]
    (fn [result el]
      (rf result (f el)))))

(reduce ((mapping inc) conj) [] (range 10))
;; => [1 2 3 4 5 6 7 8 9 10]

(defn filtering [pred]
  (fn [rf]
    (fn [result el]
      (if (pred el)
        (rf result el)
        result))))

(reduce ((filtering odd?) conj) [] (range 10))
;; => [1 3 5 7 9]
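Both dimensions are now pluggable at the call site: the transformation (or predicate) and the output construction. A couple of quick checks:

(reduce ((mapping #(* % %)) conj) [] (range 5))
;; => [0 1 4 9 16]

(reduce ((filtering even?) +) 0 (range 10))
;; => 20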
Finally, let's rename the relevant functions back to map and filter (because this is what they are, after all):
;; refactoring step 5: final clean-up.

(defn map [f]
  (fn [rf]
    (fn [result el]
      (rf result (f el)))))

(defn filter [pred]
  (fn [rf]
    (fn [result el]
      (if (pred el)
        (rf result el)
        result))))
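The functions returned by (map f) and (filter pred) are what Clojure calls transducers. Since each one takes a reducing function and returns another, they even chain with plain comp (a small preview of the composability explored in the second part):

(reduce ((map inc) conj) [] (range 10))
;; => [1 2 3 4 5 6 7 8 9 10]

(reduce ((comp (map inc) (filter odd?)) conj) [] (range 10))
;; => [1 3 5 7 9]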
This is essentially how clojure.core/map and clojure.core/filter appear in the Clojure standard library. Clojure core contains additional arities (not implemented here for clarity) that enable the use of map and filter without necessarily going through reduce. transduce, for example, is a thin layer on top of reduce that also improves readability:

(transduce (map inc) conj (range 10))
;; same as: (reduce ((map inc) conj) [] (range 10))
The standard library also provides "transducer awareness" in many other places. The following, for instance, are also possible and remove the need for an explicit conj:
(sequence (map inc) (range 10))
(into [] (map inc) (range 10))
conj is not explicit because the reducing function can be inferred from the specific call: sequence means we want to build a sequence, into [] means we want to build a vector. Now that we have the basic recipe, it's time to put the new construct into practice and see how it can be used in our daily code.
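As a first taste of that reuse, the very same transducer also works on a core.async channel. Here's a minimal sketch (using clojure.core/map, whose transducer arity also implements the completing arities our simplified version omits):

;; assuming:
;; (require '[clojure.core.async :refer [chan go >! <!! close!]])

(def ch (chan 1 (clojure.core/map inc)))

(go (>! ch 1)
    (close! ch))

(<!! ch)
;; => 2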
Conclusions
This article showed that transducers are built on top of a simple functional abstraction and that there is nothing magic happening under the hood. Beyond the interesting refactoring exercise, transducers have deeper consequences in terms of reusability, composability and performance, which we are going to explore in the second part of this article.