In my last two posts I’ve been looking at ways to extend the conduit library to create conduits with 2 inputs and conduits with more than 2 inputs. One might wonder, then, about conduits with multiple outputs. I have found that multi-output conduits are much less interesting than multi-input conduits, and I want to explain why.
For multi-input conduits, we have await, lift await, and so on to take input from one input stream and leave the others undisturbed. This lets you say
I need an i1, not an i2, before I go any further.
Closely related to this is a 1-input conduit which takes an Either i1 i2 input. This lets you say
I need either an i1 or an i2, before I go any further.
Now let’s consider these two strategies applied to the output side. We can have a nested, two-output conduit and use yield and lift yield. Then yield says
I have an o1, but no o2. You have to take it before I go any further.
If instead we have a 1-output conduit which gives values of type Either o1 o2, we can use yield Left and yield Right. Then yield Left says the same thing again!
I have an o1, but no o2. You have to take it before I go any further.
What’s going on here? When we have multiple input streams, we are assuming the streams are independent. If the input streams were coupled together in some way, then taking 1000 values from the first input without taking anything from the second input might cause memory leaks or worse. When we have two different output “streams”, they are most certainly not independent. Anything trying to use these values had better take them from the right stream at the right time, or something bad will happen.
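To make the Either-tagged variant concrete, here is a minimal sketch. The names classify and runClassify are made up for illustration, and it assumes a conduit version that still exports the ($$) and (=$) operators used in these posts (newer releases deprecate them in favour of runConduit and (.|)). The downstream consumer has no choice about ordering: it receives each Left or Right exactly when the producer decides to yield it.

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.Either (partitionEithers)

-- Hypothetical example conduit: even numbers go out tagged Left,
-- odd numbers go out tagged Right (rendered as strings).
classify :: Monad m => ConduitM Int (Either Int String) m ()
classify = awaitForever $ \n ->
  if even n
    then yield (Left n)
    else yield (Right (show n))

-- Collect the tagged values in the order the producer yielded them,
-- then split them by tag afterwards.
runClassify :: IO ([Int], [String])
runClassify = do
  tagged <- CL.sourceList [1 .. 5] $$ classify =$ CL.consume
  return (partitionEithers tagged)

main :: IO ()
main = runClassify >>= print
-- prints ([2,4],["1","3","5"])
```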
If 2-output conduits are allowed, one might naturally want to connect the two output conduits of A to the two input conduits of B. But this can lead to nonsensical situations, like A yielding on output 1 while B is awaiting on input 2.
So, multiple inputs can be very helpful, and are genuinely more powerful than zipped or Either-tagged inputs, but multiple outputs are not worth the trouble. If you just want to send output to multiple sinks, you should use zipSinks.
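As a rough illustration of sending one stream to two sinks, the sketch below uses the ZipSink applicative wrapper, which recent conduit versions export from Data.Conduit. That choice is an assumption on my part: where zipSinks itself lives depends on the conduit version. The name runBoth is hypothetical.

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL

-- Feed every input value to both sinks and pair up their results.
-- One sink collects the values, the other sums them.
runBoth :: IO ([Int], Int)
runBoth =
  CL.sourceList [1 .. 4]
    $$ getZipSink ((,) <$> ZipSink CL.consume <*> ZipSink (CL.fold (+) 0))

main :: IO ()
main = runBoth >>= print
-- prints ([1,2,3,4],10)
```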
Last time I described how to create a conduit that can read from either one of two input streams. Today I want to look at conduits with more than two input streams.
The key idea in my last post was to use “stateful monad morphisms”, which can be hoisted into some monad transformers (specifically, the ConduitM i o transformer).
This works great to fuse to the inner input of a 2-input conduit. When I tried a 3-input conduit, however, I realized I got the abstraction a little wrong. I start off with fuseStateful on the inner input, then statefulHoist that to the second layer. But then I’m left with a plain old function, which I can’t statefulHoist again to the outer layer. Using plain-old transPipe/hoist to get back to the outer conduit is no good for the same reason that it doesn’t work for the 2-input conduit: all my careful stateful fusing and hoisting gets reset whenever there is an action in the outer conduit.
Luckily this is easy to fix: I just need fuseStateful to return another StatefulMorph rather than a function:
Now I can fuse to conduits of arbitrary depth:
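To show why returning a StatefulMorph makes the construction repeatable, here is a toy sketch that does not use conduit at all. Everything in it is an assumption made for illustration: the shape of StatefulMorph, the stand-in layer type Prog, and the helpers liftMorph, hoistWith and numbered are not the repository's code. The point is only that lifting a stateful morphism through a layer yields another stateful morphism, so it can be lifted again, and the state survives across actions in the outer layer.

```haskell
{-# LANGUAGE RankNTypes #-}
import Data.Functor.Identity (Identity (..))

-- Assumed shape: each use also returns the morphism to use next time.
newtype StatefulMorph m n = StatefulMorph
  { runMorph :: forall a. m a -> n (a, StatefulMorph m n) }

-- Toy stand-in for one conduit layer: a result, or an effect in the layer below.
data Prog m r = Done r | Eff (m (Prog m r))

instance Functor m => Functor (Prog m) where
  fmap f (Done r) = Done (f r)
  fmap f (Eff mp) = Eff (fmap (fmap f) mp)

instance Functor m => Applicative (Prog m) where
  pure = Done
  pf <*> px = pf >>= \f -> fmap f px

instance Functor m => Monad (Prog m) where
  Done r >>= k = k r
  Eff mp >>= k = Eff (fmap (>>= k) mp)

liftP :: Functor m => m a -> Prog m a
liftP = Eff . fmap Done

runProg :: Monad m => Prog m r -> m r
runProg (Done r) = return r
runProg (Eff mp) = mp >>= runProg

-- Lifting through a layer gives back a StatefulMorph, not a plain function.
liftMorph :: Functor n => StatefulMorph m n -> StatefulMorph (Prog m) (Prog n)
liftMorph sm = StatefulMorph (hoistWith sm)

-- Thread the morphism through every effect and hand back its final state.
hoistWith
  :: Functor n
  => StatefulMorph m n -> Prog m a -> Prog n (a, StatefulMorph (Prog m) (Prog n))
hoistWith sm (Done r) = Done (r, liftMorph sm)
hoistWith (StatefulMorph f) (Eff mp) =
  Eff (fmap (\(p, next) -> hoistWith next p) (f mp))

-- A morphism whose state is a counter, logged on every use.
numbered :: Int -> StatefulMorph Identity ((,) [Int])
numbered k = StatefulMorph (\(Identity a) -> ([k], (a, numbered (k + 1))))

-- Two separate outer-layer actions, each performing one base-level effect.
nested :: Prog (Prog Identity) String
nested = do
  liftP (liftP (Identity ()))
  liftP (liftP (Identity ()))
  return "done"

twoLayers :: ([Int], String)
twoLayers =
  fmap fst (runProg (runProg (runMorph (liftMorph (liftMorph (numbered 0))) nested)))

main :: IO ()
main = print twoLayers
-- prints ([0,1],"done"): the counter carries over between outer actions
```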
I have updated the repository with these changes.
I’ve been working with the Haskell conduit library lately. It is great for building pipelines to process streams of data. The Conduit monad allows you to await an input value, yield an output value, or perform any action in an underlying monad (e.g. IO actions like writing to a file).
I did, however, run into difficulties when I tried to handle multiple inputs.
First, what does a “conduit with multiple inputs” mean? Well, a conduit with one input can await on that input whenever it needs an input value, so a conduit with two inputs should have two await operations, say await1 and await2, that respectively get an input value from the first and second input stream.
The solution, inspired by this section of the pipes manual, is to write a conduit whose inner monad is another conduit, e.g. a conduit with input type i1 whose underlying monad is ConduitM i2 o m.
Then await1 and await2 become await and lift await, respectively.
This can be connected to two sources and a sink like this:
This works because source1 $$ merged returns a value in the underlying monad, which is ConduitM i2 o m, which the outer ($$) and (=$) can then connect to.
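Putting the pieces above together, here is a minimal runnable sketch. The type alias TwoInput, the particular body of merged (it simply alternates between the two inputs) and the name runMerged are my own illustrative choices, and the sketch assumes the conduit and transformers packages with a conduit version that still exports ($$) and (=$). Output is produced with lift yield, so it travels along the inner conduit to the sink.

```haskell
import Control.Monad.Trans.Class (lift)
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.Void (Void)

-- Hypothetical alias: the outer conduit reads i1, and its underlying monad
-- is an inner conduit that reads i2 and writes o.
type TwoInput i1 i2 o m r = ConduitM i1 Void (ConduitM i2 o m) r

await1 :: Monad m => TwoInput i1 i2 o m (Maybe i1)
await1 = await

await2 :: Monad m => TwoInput i1 i2 o m (Maybe i2)
await2 = lift await

-- Alternate between the two inputs, stopping when either runs dry.
merged :: Monad m => TwoInput a a a m ()
merged = do
  mx <- await1
  case mx of
    Nothing -> return ()
    Just x -> do
      lift (yield x)
      my <- await2
      case my of
        Nothing -> return ()
        Just y -> lift (yield y) >> merged

-- source1 feeds the outer input; the result is an inner conduit, which the
-- outer ($$) and (=$) then connect to source2 and the sink.
runMerged :: IO [Int]
runMerged = source2 $$ (source1 $$ merged) =$ CL.consume
  where
    source1 = CL.sourceList [1, 2, 3]
    source2 = CL.sourceList [10, 20, 30]

main :: IO ()
main = runMerged >>= print
-- prints [1,10,2,20,3,30]
```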
This is a good start. But I also wanted to transform the incoming streams by fusing (=$=), and this is where things got complicated.
Fusing the outer stream is easy: plain old (=$=) works. But how do I fuse the inner stream? First let’s figure out the type of the function I want. It should take a conduit transforming values of some type i2′ into i2 (so they can be consumed by the inner input), together with a 2-input conduit as described above. It should produce a 2-input conduit with the same outer input type (i1) but a new inner input type (i2′). So I want a function fuseInner with the following signature:
This signature shows why it’s hard to write this function: the underlying monad of the outer conduit is changing. As far as I could find, there is only one function in the conduit library that changes the underlying monad: transPipe. transPipe takes a monad morphism from the underlying monad of the conduit to some other monad, and a conduit, and produces a new conduit in the new monad. And (left =$=) appears to give the monad morphism that we need:
This compiles just fine, but it doesn’t do what we want. Every time the old conduit performed an action in the underlying monad, transPipe applies our monad morphism (left =$=) to get an action in the new underlying monad. So left is fused separately to each action in the inner conduit. If left is stateful (e.g. isolate), this fails completely, since the state is reset on every lift await. You might hope for some “better” implementation of transPipe that avoids this, but if you think about it, there isn’t really any other option: there are multiple, separate actions in the first monad that you want to turn into multiple, separate actions in the second monad, so each one needs to be transformed separately.
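The failure is easy to reproduce. In the sketch below, naiveFuseInner is my reconstruction of the transPipe-based attempt from the description above, and drainInner and runNaive are hypothetical helpers. It assumes the conduit and transformers packages with a version that still exports ($$) and (=$=). drainInner performs an outer await between inner awaits, so each lift await is transformed on its own and gets a fresh copy of isolate 2.

```haskell
import Control.Monad.Trans.Class (lift)
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.Void (Void)

-- Reconstruction of the naive attempt: fuse `left` onto every inner action.
naiveFuseInner
  :: Monad m
  => ConduitM i2' i2 m ()
  -> ConduitM i1 o (ConduitM i2 o' m) r
  -> ConduitM i1 o (ConduitM i2' o' m) r
naiveFuseInner left = transPipe (left =$=)

-- Hypothetical 2-input conduit: touch the outer input, then read one value
-- from the inner input, until the inner input is exhausted.
drainInner :: Monad m => ConduitM i1 Void (ConduitM i2 Void m) [i2]
drainInner = go []
  where
    go acc = do
      _ <- await            -- action in the outer conduit
      mx <- lift await      -- action in the inner conduit
      case mx of
        Nothing -> return (reverse acc)
        Just x -> go (x : acc)

runNaive :: IO [Int]
runNaive = inner $$ (outer $$ naiveFuseInner (CL.isolate 2) drainInner)
  where
    outer = CL.sourceList "abcde"
    inner = CL.sourceList [1 .. 5]

main :: IO ()
main = runNaive >>= print
-- prints [1,2,3,4,5], not the [1,2] that a single isolate 2 would allow
```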
Morally, we want (left =$=) to be a kind of “stateful transformation”, in which each call fuses to the part of left that was remaining after the last call. This clearly isn’t a (pure) function, but we can represent it with the following data structure
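One shape such a structure could take, offered as an assumption rather than as the repository's actual definition, is a morphism that returns its own successor alongside each result. The sketch below uses no conduit code: numbered, resetEachTime and threaded are hypothetical helpers that contrast reusing the same morphism for every action (which is effectively what transPipe does) with threading the returned morphism into the next call.

```haskell
{-# LANGUAGE RankNTypes #-}
import Data.Functor.Identity (Identity (..))

-- Assumed shape: transform one action and return the morphism to use next.
newtype StatefulMorph m n = StatefulMorph
  { runMorph :: forall a. m a -> n (a, StatefulMorph m n) }

-- Example morphism whose state is a counter, logged on every use.
numbered :: Int -> StatefulMorph Identity ((,) [Int])
numbered k = StatefulMorph (\(Identity a) -> ([k], (a, numbered (k + 1))))

-- Apply the same starting morphism to every action: the state is lost.
resetEachTime :: Monad n => StatefulMorph m n -> [m a] -> n [a]
resetEachTime sm = mapM (fmap fst . runMorph sm)

-- Feed each returned morphism into the next call: the state is kept.
threaded :: Monad n => StatefulMorph m n -> [m a] -> n [a]
threaded _ [] = return []
threaded (StatefulMorph f) (x : xs) = do
  (a, next) <- f x
  rest <- threaded next xs
  return (a : rest)

main :: IO ()
main = do
  let actions = map Identity "abc"
  print (resetEachTime (numbered 0) actions)  -- prints ([0,0,0],"abc")
  print (threaded (numbered 0) actions)       -- prints ([0,1,2],"abc")
```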
To use such a stateful transformation, we replace transPipe (a special case of hoist) with the following:
It is possible to write the instance StatefulHoist (ConduitM i o) by only slightly changing the code for transPipe. A replacement for (=$=) which uses a StatefulMorph can be written by slightly adjusting the code for the internal function pipeL. Its signature is
Putting it all together, we can now write fuseInner
I’ve posted the full implementation here, along with a test to demonstrate it really does what I say it does.
I have searched far and wide for a better way to do this, and would be very interested in any suggestions.
Another aspect of this approach that I haven’t touched on is multiple outputs. When we have nested conduits, we not only have two possible input actions (await and lift await), we also have two possible output actions: yield and lift yield. What possible fun could be had with conduits with multiple inputs and multiple outputs? I will take a look at that in a later post.