I've been working with the Haskell conduit library lately. It is great for building pipelines to process streams of data. The *Conduit* monad allows you to *await* for an input value, *yield* an output value, or perform any action in an underlying monad (e.g. IO actions like writing to a file).

I did, however, run into difficulties when I tried to handle multiple inputs.

First, what does a "conduit with multiple inputs" mean? Well, a conduit with one input can *await* on that input whenever it needs an input value, so a conduit with two inputs should have two await operations, so *await1* and *await2* that respectively get an input value from the first and second input stream.

The solution, inspired by this section of the pipes manual, is to write a conduit whose inner monad is another conduit, e.g.

Then *await1* and *await2* become

This can be connected to two sources and a sink like this:

This works because *source1 $$ merged* returns a value in the underlying monad, which is *ConduitM i2 o m*, which the outer *($$)* and *(=$)* can then connect to.

This is a good start. But I also wanted to transform the incoming streams by fusing (=$=), and this is where things got complicated.

Fusing the outer stream is easy: plain old (=$=) works. But how do I fuse the inner stream? First lets figure out the type of the function I want. It should take a conduit transforming values of some type *i2'* into *i2* (so they can be consumed by the inner input), together with a 2-input conduit as described above. It should produce a 2-input conduit with the same outer input type (*i1*) but a new inner input type (*i2'*). So I want a function *fuseInner* with the following signature:

This signature shows why its hard to write this function: the underlying monad of the outer conduit is changing. As far as I could find, there is only one function in the conduit library that changes the underlying monad: *transPipe*. *transPipe* takes a monad morphism from the underlying monad of the conduit to some other monad, and a conduit, and produces a new conduit in the new monad. And (left =$=) appears to give the, monad morphism that we need:

This compiles just fine, but it doesn't do what we want. Every time the old conduit performed an action in the underlying monad, *transPipe* applies our monad morphism *(left =$=)* to get an action in the new underlying monad. So *left* is fused separately to each action in the inner conduit. If *left* is stateful (e.g. *isolate*), this fails completely, since the state is reset on every *lift await*. You might hope for some "better" implementation of *transPipe* that avoids this, but if you think about it, there isn't really any other option: there are multiple, separate actions in the first monad that you want to turn into multiple, separate actions in the second monad, so each one needs to be transformed separately.

Morally, we want (left =$=) to be a kind of "stateful transformation", in which each call fuses to the part of left that was remaining after the last call. This clearly isn't a (pure) function, but we can represent it with the following data structure

To use such a stateful transformation, we replace *transPipe* (a special case of *hoist*) with the following:

It is possible to write the instance *StatefulHoist (ConduitM i o)* by only slightly changing the code for *transPipe*. A replacement for *(=$=)* which uses a *StatefulMorph* can be written by slightly adjusting the code for the internal function *pipeL*. Its signature is

Putting it all together, we can now write *fuseInner*

I've posted the full implementation here, along with a test to demonstrate it really does what I say it does.

I have searched far and wide for a better way to do this, and would be very interested in any suggestions.

Another aspect of this approach that I haven't touched on is multiple outputs. When we have nested conduits, we not only have two possible input actions (*await* and *lift await*) we also have two possible output actions: *yield* and *lift yield*. What possible fun could be had with conduits with multiple inputs and multiple outputs? I will take a look at that in a later post.