by Bren Brightwell


tl;dr node's streams are horrible and I hate them.

EDIT: save yourself the trouble of using them, I've written Pirandello

I've been trying to shoehorn some algebra and category theory into node.js's streams. I figured I'd see how much of a fantasy land I could make them think they were in.

Readable streams are pretty obviously a Semigroup and Monoid. Semigroup because you can concatenate streams just by switching over to the second stream when the first is done. Monoid because a stream that just ends can be stuck anywhere and not change anything. So:

# semigroup
Readable::concat = (b)->
  a = this

  new class extends Readable
      super ...
      @read-from = a

    _read: (size)->
      if ( size)?
        @push that
      else if @read-from is a
        @read-from = b
        @push ""
        @push null

# monoid
Readable.empty = -> Readable.of ""

(more on Readable.of later)

Well, it's not pretty. What does it do? It tries to read from the first stream. If it can, it pushes to the output stream. Once it can't, it switches to the second stream and says "hey, try that again maybe?". When it can't from the second stream, it ends. Sounds an awful lot like it's concatenating the contents of the streams, which is neat.

What's left? Functor. Applicative. Chain. Monad.

It's definitely a Functor. We can map over the data chunks.

Oh, but hold on. We can derive Functor from Monad. A Monad is an Applicative and a Chain. An Applicative needs of and ap. But ap can derive from Monad, too.

Then all we need is Readable.of and Readable::chain and we've got Functor, Monad, and Applicative for free. Not bad.

So, Readable.of just creates a Readable stream from a bare value. Readables work over Strings and Buffers, which can both be .sliced.

Readable.of = (body)->
  new class extends Readable
    offset: 0
    _read: (size)->
      @push body.slice @offset,@offset+size-1
      @offset += size-1
      if @offset > body.length then return @push null # end the Readable

Now we see how Readable.empty works: it pushes the empty string, then because we've tried to read more than zero bytes, pushes null and ends the stream.

How about Chain? Well, the type signature of chain for Readable would be Readable a → (a → Readable b) → Readable b. Since the function passed returns a stream and we can't just subsume the current stream with a concatenation, we have to unwrap and then push. Sigh.

Readable::chain = (f)->
  orig = this

  new class extends Readable
    _read: (size)->
      if ( size)?
        if ((f that)read size)?
          @push that
        else @push ""
        @push null

But now we get implementations of ap and map for free:

# applicative derived from monad
Readable::ap = (m)->
  @chain (f)-> f

# functor derived from monad
Readable::map = (f)->
  @chain (a)~>
    Readable.of f a

Nice. Wait. What does Applicative even mean for a stream? ap :: Readable (a → b) → Readable a → Readable b. Streams of functions? We don't got those. They sound awesome though. If only Node's streams had some kind of object mode where you could just stream any old thing. That'd be great. (I'll think about it.)

As you can probably tell, I am fed up to the back teeth of _read: (size)-> and @push null. Node streams are a stupid API over a ridiculous abstraction. I've tried to make them nicer so you don't have to.

npm install fantasy-streams