# Multi-Output Conduits?

**Posted:**March 30, 2013

**Filed under:**Haskell, Programming Leave a comment

In my last two posts I’ve been looking at ways to extend the conduit library to create conduits with 2 inputs and conduits with more than 2 inputs. One might wonder, then, about conduits with multiple outputs. I have found that multi-outuput conduits are much less interesting than multi-input conduits, and I want to explain why.

For multi-input conduits, we have *await*, *lift await*, and so on to take input from one input stream and leave the others undisturbed. This lets you say

I need an

i1, not ani2, before I go any further.

Closely related to this is a 1-input conduit which takes an *Either i1 i2* input. This lets you say

I need either an

i1or ani2, before I go any further.

Now lets consider these two strategies applied to the output side. We can have a nested, two-output conduit and use *yield* and *lift yield*. Then *yield* says

I have an

o1, but noo2. You have to take it before I go any further.

If instead we have a 1-output conduit which gives values of type *Either o1 o2*, we can use *yield Left* and *yield Right*. Then *yield Left* says the same thing again!

I have an

o1, but noo2. You have to take it before I go any further.

Whats going on here? When we have multiple input streams, we are assuming the streams are independent. If the input streams were coupled together in some way, then taking 1000 values from the first input without taking anything from the second input might cause memory leaks or worse. When we have two different output “streams”, they are most certainly not independent. Anything trying to use these values better take them from the right stream at the right time or something bad will happen.

If 2-output conduits are allowed, one might naturally want to connect the two output conduits of A to the two input conduits of B. But this can lead to nonsensical situations, like *A* yielding on output 1 while *B* is awaiting on input 2:

So, multiple inputs can be very helpful, and are genuinely more powerful than zipped or Either tagged inputs, but multiple outputs are not worth the trouble. If you just want to send output to multiple sinks, you should use zipSinks.

# (>2)-Input Conduits

**Posted:**March 27, 2013

**Filed under:**Haskell, Programming Leave a comment

Last time I described how to create a conduit that can read from either one of two input streams. Today I want to look at conduits with more than two input streams.

The key idea in my last post was to use “stateful monad morphisms”, which can be hoisted into some monad transformers (specifically, the *ConduitM i o* transformer).

This works great to fuse to the inner input of a 2-input conduit. When I tried a 3-input conduit, however, I realized I got the abstraction a little wrong. I start off with *fuseStateful* on the inner input, then *statefulHoist* that to the second layer. But then I’m left with a plain old function, which I can’t *statefulHoist* again to the outer layer. Using plain-old *transPipe*/*hoist* to get back to the outer conduit is no good for the same reason that it doesn’t work for the 2-input conduit: all my careful stateful fusing and hoistng gets reset whenever there is an action in the outer conduit.

Luckily this is easy to fix: I just need *fuseStateful* to return another *StatefulMorph* rather than a function:

Now I can fuse to conduits of arbitrary depth:

I have updated the repository with these changes.

# Multi-Input Conduits

**Posted:**March 24, 2013

**Filed under:**Haskell, Programming, Uncategorized 1 Comment

I’ve been working with the Haskell conduit library lately. It is great for building pipelines to process streams of data. The *Conduit* monad allows you to *await* for an input value, *yield* an output value, or perform any action in an underlying monad (e.g. IO actions like writing to a file).

I did, however, run into difficulties when I tried to handle multiple inputs.

First, what does a “conduit with multiple inputs” mean? Well, a conduit with one input can *await* on that input whenever it needs an input value, so a conduit with two inputs should have two await operations, so *await1* and *await2* that respectively get an input value from the first and second input stream.

The solution, inspired by this section of the pipes manual, is to write a conduit whose inner monad is another conduit, e.g.

Then *await1* and *await2* become

This can be connected to two sources and a sink like this:

This works because *source1 $$ merged* returns a value in the underlying monad, which is *ConduitM i2 o m*, which the outer *($$)* and *(=$)* can then connect to.

This is a good start. But I also wanted to transform the incoming streams by fusing (=$=), and this is where things got complicated.

Fusing the outer stream is easy: plain old (=$=) works. But how do I fuse the inner stream? First lets figure out the type of the function I want. It should take a conduit transforming values of some type *i2′* into *i2* (so they can be consumed by the inner input), together with a 2-input conduit as described above. It should produce a 2-input conduit with the same outer input type (*i1*) but a new inner input type (*i2′*). So I want a function *fuseInner* with the following signature:

This signature shows why its hard to write this function: the underlying monad of the outer conduit is changing. As far as I could find, there is only one function in the conduit library that changes the underlying monad: *transPipe*. *transPipe* takes a monad morphism from the underlying monad of the conduit to some other monad, and a conduit, and produces a new conduit in the new monad. And (left =$=) appears to give the, monad morphism that we need:

This compiles just fine, but it doesn’t do what we want. Every time the old conduit performed an action in the underlying monad, *transPipe* applies our monad morphism *(left =$=)* to get an action in the new underlying monad. So *left* is fused separately to each action in the inner conduit. If *left* is stateful (e.g. *isolate*), this fails completely, since the state is reset on every *lift await*. You might hope for some “better” implementation of *transPipe* that avoids this, but if you think about it, there isn’t really any other option: there are multiple, separate actions in the first monad that you want to turn into multiple, separate actions in the second monad, so each one needs to be transformed separately.

Morally, we want (left =$=) to be a kind of “stateful transformation”, in which each call fuses to the part of left that was remaining after the last call. This clearly isn’t a (pure) function, but we can represent it with the following data structure

To use such a stateful transformation, we replace *transPipe* (a special case of *hoist*) with the following:

It is possible to write the instance *StatefulHoist (ConduitM i o)* by only slightly changing the code for *transPipe*. A replacement for *(=$=)* which uses a *StatefulMorph* can be written by slightly adjusting the code for the internal function *pipeL*. Its signature is

Putting it all together, we can now write *fuseInner*

I’ve posted the full implementation here, along with a test to demonstrate it really does what I say it does.

I have searched far and wide for a better way to do this, and would be very interested in any suggestions.

Another aspect of this approach that I haven’t touched on is multiple outputs. When we have nested conduits, we not only have two possible input actions (*await* and *lift await*) we also have two possible output actions: *yield* and *lift yield*. What possible fun could be had with conduits with multiple inputs and multiple outputs? I will take a look at that in a later post.

## Recent Comments