How to use word streams in the streaming package?

Novice Haskell user here!

I am trying to use the streaming and streaming-bytestring packages to count occurrences of a text field. For each input line, we find the nth field and increment its count in the map.

Here is what I have so far:

import qualified Data.HashMap.Strict as H
import qualified Streaming.Prelude as S
import qualified Streaming.ByteString.Char8 as Q
import qualified Data.ByteString.Char8 as B
import System.IO (withFile, IOMode(ReadMode))
import Safe (atMay)

makeMap :: FilePath -> Int -> IO (H.HashMap B.ByteString Int)
makeMap filename n = withFile filename ReadMode $ \h -> do
    S.fold_ (increment n) H.empty id
    $ S.mapped Q.toStrict
    $ Q.lines
    $ Q.fromHandle h

increment :: Int -> H.HashMap B.ByteString Int -> B.ByteString -> H.HashMap B.ByteString Int
increment n m l = case atMay (B.words l) n of
    Nothing  -> m
    (Just k) -> H.insertWith (+) k 1 m

It works, which is already somewhat surprising.

I would next like to eliminate the toStrict call on each line, and instead use Streaming.ByteString.Char8.words to split each line into a stream of words, and then somehow select the nth word in each line. I am stuck here, and can’t work out how this might be done.

Any help would be appreciated, thanks.

The nice thing about using Streaming.ByteString.Char8.words instead of B.words would be that, because we don’t materialize entire lines in memory at any point, a line could be gigabytes in length without exhausting memory.

We process each line with the function we pass to S.mapped, so one thing we could try is to put Q.words there. Because we will need to do some further processing, let's put a typed hole to get a hint of what we need:

ghci> :t S.mapped (_ . Q.words) . Q.lines . Q.fromHandle

<interactive>:1:11: error:
    * Found hole: _ :: Stream (Q.ByteStream m) m x -> m (g x)

In our case m is IO, so the hole really has type

Stream (Q.ByteStream IO) IO x -> IO (g x)

What should g be? Presumably the Of functor carrying a strict ByteString: the nth word in the line. So:

Stream (Q.ByteStream IO) IO x -> IO (Of B.ByteString x)

So we need a function that takes a stream of words (each of them itself streamed, as a Q.ByteStream IO), selects the nth one, “compacts” that word into a strict ByteString, and finally returns it (after going through the rest of the streamed words).

Right now I’m not sure if there are combinators in streaming that let you skip over whole “streaming groups”, instead of single chunks. Perhaps we’ll need to write that function ourselves.

After looking around a little, it seems that Streaming.Prelude.splitAt is the key function.

splitAt :: (Monad m, Functor f) => Int -> Stream f m r -> Stream f m (Stream f m r)

This function “peels off” n layers of the functor f that parameterizes the Stream. And it works with any functor, not only with Of!
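
To get a feel for how the return value carries the “rest” of the stream, here’s a tiny sketch (mine, not part of the question) using a plain Of stream of Ints:

import Streaming (Of(..))
import qualified Streaming.Prelude as S

splitDemo :: IO ()
splitDemo = do
    -- the outer stream yields the first 3 elements; its return value
    -- is the remainder of the original stream
    firstThree :> rest <- S.toList (S.splitAt 3 (S.each [1 .. 10 :: Int]))
    print firstThree          -- [1,2,3]
    remaining <- S.toList_ rest
    print remaining           -- [4,5,6,7,8,9,10]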

In our case, f will be a Q.ByteStream, and we’ll still need to “go through” each one, because it’s itself a streaming abstraction, not just some data in memory like with Of.

First, two auxiliary functions that we’ll need:

-- these signatures use the Stream and Of types from the plain Streaming module
dumpSections :: Stream (Q.ByteStream IO) IO x -> IO x
dumpSections = Q.effects . Q.concat

takeSections :: Stream (Q.ByteStream IO) IO x -> IO (Of B.ByteString x)
takeSections = Q.toStrict . Q.concat

dumpSections goes through a delimited sequence of ByteStreams, discarding everything except the final result (that’s important, because in streaming the final result often contains some “rest of a stream”).

takeSections concats and compacts a delimited sequence of ByteStreams.
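
As a quick sanity check (a sketch of mine, assuming the imports from the original program plus Of(..) from the Streaming module), feeding the words of a short string to takeSections glues them back together, because Q.concat simply concatenates the sections without separators:

wordsDemo :: IO ()
wordsDemo = do
    glued :> () <- takeSections (Q.words (Q.fromStrict (B.pack "foo bar baz")))
    print glued   -- "foobarbaz"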

Now for the main dish:

nth :: Int -> Stream (Q.ByteStream IO) IO x -> IO (Of B.ByteString x)
nth n stream = do
    -- drop the first n words, leaving a stream that starts at the word we want
    atWord <- dumpSections $ S.splitAt n stream
    -- peel off exactly one word and compact it into a strict ByteString
    compacted :> rest <- takeSections $ S.splitAt 1 atWord
    -- run through the remaining words, keeping only the final result
    x <- dumpSections rest
    pure $ compacted :> x

This function takes a delimited stream of (themselves streamed) words. First, it tears through the first n words (likely advancing some file handle in the process), returning another stream that begins just at the word we want to extract.

Then we peel off a single layer and compact it with takeSections (the layer will contain only a single ByteStream word, because of the S.splitAt 1).

Then we tear through the rest of the words and return the compacted bytes along with the final result of the stream (always important in streaming).

This might be the missing piece of the puzzle to put in

ghci> :t S.mapped (_ . Q.words) . Q.lines . Q.fromHandle

But I haven’t run it, only compiled it. 🙂
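
Spelled out, the whole pipeline might look something like this (also untested; it reuses the imports from your original program and the nth above). One caveat: unlike the atMay version, a line with fewer than n+1 words produces an empty ByteString here, so we filter those out before counting:

makeMap' :: FilePath -> Int -> IO (H.HashMap B.ByteString Int)
makeMap' filename n = withFile filename ReadMode $ \h ->
    S.fold_ (\m k -> H.insertWith (+) k 1 m) H.empty id
    $ S.filter (not . B.null)      -- drop lines that had no nth word
    $ S.mapped (nth n . Q.words)   -- nth word of each line, compacted
    $ Q.lines
    $ Q.fromHandle h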

Incidentally, because you mentioned that you’re a novice: this counts as somewhat advanced Haskell, because it deals with monads (IO), monad transformers (Stream and ByteStream) and nontrivial streaming scenarios in which we want to keep as little stuff in memory as possible.

Edit: here’s an alternate way of defining nth.

nth' :: Int -> Stream (Q.ByteStream IO) IO x -> IO (Of B.ByteString x)
nth' n stream = do
    -- partial pattern match! this fails if the line has fewer than n+1 words
    Right (word, rest) <- S.next $ S.drop n $ S.mapped Q.toStrict stream
    x <- S.effects rest
    pure $ word :> x

This one “streams” a bit less because it compacts all the words it encounters, not only the nth word. So it could potentially load words completely into memory only to discard them immediately.

Thanks, this is helpful indeed.

I’m probably in over my head here. I managed to fit things together based on examples in the package documentation, but I’m still a bit vague about how things work “under the hood”.

I had not previously seen typed holes, which look like a very useful tool. I spent a lot of time trying to fit types together and this will clearly make that easier.

I dropped your code into my program, and it gives the same result as my previous code. It may take a few days for me to understand how it all works; I’ll probably be back with more questions.

The Stream datatype has three type parameters:

data Stream f m r
  • m is the base monad over which the stream works. For most applications, it will be IO so let’s assume that here.

  • r is the return value of the stream, which it gives up once the stream is exhausted. Note that it’s not the type of the values emitted by the Stream while it’s running, but something else that it returns at the end.

    Streams are themselves Functors on r. That is, fmap for a Stream changes its final result value, not the yielded elements.

    Sometimes r will be a boring, uninformative (). But it plays an important role in splitting operations, as we shall see.

    Also, if we have a function that transforms streams and is polymorphic in r, like this

    forall r . Stream (Of Int) IO r -> IO (Of Int r)
    

    that gives us a strong hint that the function actually exhausts the stream, instead of simply taking the first few elements or something like that. Why? The function must work for all possible rs, and the only way it can get hold of such an r is by reaching the end of the Stream!

  • f is some Functor. A Stream parameterized by f is like a succession of f layers that we go through, one by one. Each time we advance through a layer, effects from the base monad (assume IO) might be performed. And when there are no more layers, a value of type r is returned, as mentioned earlier.

    The most common Functor used as f is Of. That’s what most functions from Streaming.Prelude expect. Of is basically a fancy tuple, where the first component carries the elements yielded by the stream.

So, putting it all together: a value of type Stream (Of Int) IO Bool is a succession of Int values; each time we advance to the next Int, some IO effect (say, reading from a file handle) might take place, and when there are no more Ints to yield, a final Bool is returned.
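
Here is a tiny sketch of exactly that shape (the names are mine, just an illustration):

import Streaming (Stream, Of(..))
import qualified Streaming.Prelude as S
import Control.Monad.IO.Class (liftIO)

-- a Stream (Of Int) IO Bool: Ints are yielded, effects in the base monad
-- may happen along the way, and a final Bool is returned at the end
countdown :: Stream (Of Int) IO Bool
countdown = do
    liftIO (putStrLn "starting")   -- an IO effect in the base monad
    S.yield 3
    S.yield 2
    S.yield 1
    pure True                      -- the final result r

streamDemo :: IO ()
streamDemo = do
    ints :> finalResult <- S.toList countdown
    print ints          -- [3,2,1]
    print finalResult   -- True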

Now, the guiding principle of streaming is to provide functions on Streams that resemble the ones we would use with normal pure lists, while avoiding as much as possible the need to “materialize” intermediate results. That sounds good, but how do we do that for splitting and grouping operations?

streaming uses two “tricks” to preserve streaming in those cases:

  • When splitting a Stream, the second part of the split is moved to the return type r. That way you can consume the first part and know where it ends, and then move to the second part.

  • When grouping a Stream, the result is a new Stream where the functor parameter becomes itself a Stream. The intuition is that each functor “layer” we must go through represents a single group. And to reach the next group, we must first exhaust all the elements of the previous one.

    Functions like mapped and maps are useful to manipulate and transform groups.
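
To make the grouping trick concrete, here’s another toy sketch of mine: chunksOf from the Streaming module groups a plain stream of Ints, and mapped together with S.toList collapses each group into an ordinary list.

import Streaming (chunksOf)
import qualified Streaming.Prelude as S

groupDemo :: IO ()
groupDemo = do
    -- chunksOf 3 makes the functor parameter itself a Stream: one inner
    -- Stream per group of 3; S.mapped S.toList turns each group into a list
    grouped <- S.toList_ (S.mapped S.toList (chunksOf 3 (S.each [1 .. 10 :: Int])))
    print grouped   -- [[1,2,3],[4,5,6],[7,8,9],[10]]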

As for ByteStream, it’s like a Stream in which the functor f is fixed to contain bytes (that’s why it falls back to regular Streams for grouping operations, which require a change in the functor parameter). Also, its operations treat the sequence of bytes uniformly, instead of forcing us to be aware of the underlying sequence of chunks. For example, length counts bytes, not chunks. By contrast, length for a Stream (Of ByteString) IO () would count chunks, not bytes.
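
For instance (a sketch of mine, self-contained except for the package imports):

import Streaming (Of(..))
import qualified Streaming.Prelude as S
import qualified Streaming.ByteString.Char8 as Q
import qualified Data.ByteString.Char8 as B

lengthDemo :: IO ()
lengthDemo = do
    -- Q.length counts the bytes flowing through the ByteStream
    nBytes :> () <- Q.length (Q.fromStrict (B.pack "hello world"))
    print nBytes    -- 11
    -- S.length_ on a Stream (Of B.ByteString) counts chunks, not bytes
    nChunks <- S.length_ (S.each (map B.pack ["hello", " ", "world"]))
    print nChunks   -- 3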
