After looking around a little, it seems that
Streaming.Prelude.splitAt is the key function.
splitAt :: (Monad m, Functor f) => Int -> Stream f m r -> Stream f m (Stream f m r)
This function “peels off”
n layers of the functor
f that parameterizes the
Stream. And it works with any functor, not only with
In our case,
f will be a
Q.ByteStream, and we’ll still need to “go through” each one, because it’s itself a streaming abstraction, not just some data in memory like with
First, two auxiliary functions that we’ll need:
dumpSections :: Stream (Q.ByteStream IO) IO x -> IO x
dumpSections = Q.effects . Q.concat
takeSections :: Stream (Q.ByteStream IO) IO x -> IO (Of B.ByteString x)
takeSections = Q.toStrict . Q.concat
dumpSections goes through a delimited sequence of
ByteStreams, discarding everything except the final result (that’s important, because in streaming the final result often contains some “rest of a stream”).
takeSections concats and compacts a delimited sequence of
Now for the main dish:
nth :: Int -> Stream (Q.ByteStream IO) IO x -> IO (Of B.ByteString x)
nth n stream = do
atWord <- dumpSections $ splitsAt n stream
compacted :> rest <- takeSections $ splitAt 1 atWord
x <- dumpSections rest
pure $ compacted :> x
This function takes a delimited stream of (themselves streamed) words. First, it takes the first
n words and tears through them (likely advancing some file handle in the process) returning another stream which begins just at the word we want to extract.
Then we peel a single layer and compact the layer with
takeSections (although the layer will only contain a single
ByteStream word, because of the
Then we tear through the rest of the words and return the compacted bytes along with the final result of the stream (always important in streaming).
This might be the missing piece of the puzzle to put in
ghci> :t S.mapped (_ . Q.words) . Q.lines . Q.fromHandle
But I haven’t run it, only compiled it
Incidentally, because you mentioned that you’re a novice: this counts as somewhat advanced Haskell because it deals with monads (
IO), monads transformers (
ByteStream) and nontrivial streaming scenarios in which we want to keep as little stuff in memory as possible.
Edit: here’s an alternate way of defining
nth' :: Int -> Stream (Q.ByteStream IO) IO x -> IO (Of B.ByteString x)
nth' n stream = do
-- partial pattern match!
Right (word, rest) <- S.next $ S.drop n $ S.mapped Q.toStrict stream
x <- S.effects rest
pure $ word :> x
This one “streams” a bit less because it compacts all the words it encounters, not only the nth word. So it could potentially load words completely into memory only to discard them immediately.