Multi-Input Conduits

I’ve been working with the Haskell conduit library lately. It is great for building pipelines to process streams of data. The Conduit monad allows you to await for an input value, yield an output value, or perform any action in an underlying monad (e.g. IO actions like writing to a file).

I did, however, run into difficulties when I tried to handle multiple inputs.

First, what does a “conduit with multiple inputs” mean? Well, a conduit with one input can await on that input whenever it needs an input value, so a conduit with two inputs should have two await operations, so await1 and await2 that respectively get an input value from the first and second input stream.

The solution, inspired by this section of the pipes manual, is to write a conduit whose inner monad is another conduit, e.g.

Then await1 and await2 become

This can be connected to two sources and a sink like this:

This works because source1 $$ merged returns a value in the underlying monad, which is ConduitM i2 o m, which the outer ($$) and (=$) can then connect to.

This is a good start. But I also wanted to transform the incoming streams by fusing (=$=), and this is where things got complicated.

Fusing the outer stream is easy: plain old (=$=) works. But how do I fuse the inner stream? First lets figure out the type of the function I want. It should take a conduit transforming values of some type i2′ into i2 (so they can be consumed by the inner input), together with a 2-input conduit as described above. It should produce a 2-input conduit with the same outer input type (i1) but a new inner input type (i2′). So I want a function fuseInner with the following signature:

This signature shows why its hard to write this function: the underlying monad of the outer conduit is changing. As far as I could find, there is only one function in the conduit library that changes the underlying monad: transPipe. transPipe takes a monad morphism from the underlying monad of the conduit to some other monad, and a conduit, and produces a new conduit in the new monad. And (left =$=) appears to give the, monad morphism that we need:

This compiles just fine, but it doesn’t do what we want. Every time the old conduit performed an action in the underlying monad, transPipe applies our monad morphism (left =$=) to get an action in the new underlying monad. So left is fused separately to each action in the inner conduit. If left is stateful (e.g. isolate), this fails completely, since the state is reset on every lift await. You might hope for some “better” implementation of transPipe that avoids this, but if you think about it, there isn’t really any other option: there are multiple, separate actions in the first monad that you want to turn into multiple, separate actions in the second monad, so each one needs to be transformed separately.

Morally, we want (left =$=) to be a kind of “stateful transformation”, in which each call fuses to the part of left that was remaining after the last call. This clearly isn’t a (pure) function, but we can represent it with the following data structure

To use such a stateful transformation, we replace transPipe (a special case of hoist) with the following:

It is possible to write the instance StatefulHoist (ConduitM i o) by only slightly changing the code for transPipe. A replacement for (=$=) which uses a StatefulMorph can be written by slightly adjusting the code for the internal function pipeL. Its signature is

Putting it all together, we can now write fuseInner

I’ve posted the full implementation here, along with a test to demonstrate it really does what I say it does.

I have searched far and wide for a better way to do this, and would be very interested in any suggestions.

Another aspect of this approach that I haven’t touched on is multiple outputs. When we have nested conduits, we not only have two possible input actions (await and lift await) we also have two possible output actions: yield and lift yield. What possible fun could be had with conduits with multiple inputs and multiple outputs? I will take a look at that in a later post.