Simple Tasks in Streamly

I am trying to use Streamly, but struggling to wrap my head around the library functions.

Here is a simple task I want to do.

I have a stream. I want to take the first N elements, package them in a sequence, and do something else with the rest of the stream.

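{-# LANGUAGE OverloadedRecordDot #-}

import Data.Sequence (Seq, (|>))
import qualified Data.Sequence as Seq
import qualified Streamly.Internal.Data.Fold as Fold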

data TakeState a = TakeState
  { current :: Seq a
  , count :: Int
  }
  
takeS :: Monad m => Int -> Fold.Fold m a (Seq a)
takeS n = Fold.Fold step initial extr extr
  where
    done tk
      | tk.count == n = Fold.Done tk.current
      | otherwise = Fold.Partial tk
    initial = pure . done $ TakeState Seq.empty 0
    step tk a = pure . done $ TakeState
      { current = tk.current |> a
      , count = tk.count + 1
      }
    extr tk = pure tk.current

If I use \n -> Stream.foldBreak (takeS n), that does what I want.

ghci> (sq, rest) <- Stream.foldBreak (takeS 5) $ Stream.fromList [1..]

What would be the more idiomatic way to do this?

You can write it as

import qualified Streamly.Data.Fold as Fold
import qualified Data.Sequence as Seq

toSeq = Fold.foldl' (Seq.|>) Seq.empty
takeS n = Fold.take n toSeq
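
To check that this behaves like the hand-written fold, here is a minimal sketch of a full module. It assumes a recent Streamly release in which Streamly.Data.Stream exports foldBreak; the name demo is made up for illustration. The explicit type signatures keep both folds polymorphic when compiled as a module rather than typed into ghci.

```haskell
import Data.Sequence (Seq)
import qualified Data.Sequence as Seq
import Streamly.Data.Fold (Fold)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream

-- Collect every input element into a Seq, appending on the right.
toSeq :: Monad m => Fold m a (Seq a)
toSeq = Fold.foldl' (Seq.|>) Seq.empty

-- Stop after n elements; the rest of the input is left unconsumed.
takeS :: Monad m => Int -> Fold m a (Seq a)
takeS n = Fold.take n toSeq

-- Hypothetical demo: split off the first five elements of an infinite
-- stream, then read three more elements from the remainder.
demo :: IO (Seq Int, [Int])
demo = do
  (sq, rest) <- Stream.foldBreak (takeS 5) (Stream.fromList [1 ..])
  next3 <- Stream.fold Fold.toList (Stream.take 3 rest)
  pure (sq, next3)
```

Because Fold.take terminates the fold after n inputs, foldBreak can hand back the untouched remainder of the stream, so the infinite source is not a problem here.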

I see!

It was difficult to wrap my head around the Fold m a b -> Fold m a b pattern of Fold.take at first.

Is there a way to do a scan by using a Parser?

Some functions I see in Streamly.Data.Stream are:

scan :: Monad m => Fold m a b -> Stream m a -> Stream m b
parseMany :: Monad m => Parser a m b -> Stream m a -> Stream m (Either ParseError b)
  • scan only works with a fold
  • parseMany works with a Parser, but it forgets the state of the parser for every new parse

I need to use a Parser because I want to use takeWhile. takeWhile is a Parser rather than a Fold because it relies on peeking: it has to know that the next element does not satisfy the given predicate.

Here is another (simple?) task I am struggling to solve with Streamly.

I want to write a function like this:

slidingWindowFuture :: Float -> Stream m (Float, a) -> Stream m (Float, Seq a)
slidingWindowFuture wlen input = undefined

Let’s say that the input stream is (t[0], v[0]), (t[1], v[1]), ....

Then, the output stream should be (t[0], s[0]), (t[1], s[1]), ... where s[i] = Seq.fromList [v[i], v[i + 1], ..., v[n]] such that n has the following property:

  • t[n+1] should be the smallest t[j] such that t[j] - t[i] > wlen

If such an n is not defined, then neither is the pair (t[i], s[i])

For example, say

  • wlen is 2
  • The input stream is Stream.fromList [(0, 'a'), (1, 'b'), (2, 'c'), (2.5, 'd'), (3, 'e'), (3.1, 'f')]
  • Then, the output stream is [(0, Seq.fromList ['a', 'b', 'c']), (1, Seq.fromList ['b', 'c', 'd', 'e'])]
  • Note that the output tuple with (2, ...) is not yet defined because the stream transformer is still waiting for an element with time more than 4.
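
To pin down the specification, here is a reference model on plain lists. It is only a sketch of the intended semantics, not a Streamly solution and not incremental. It assumes the timestamps are non-decreasing, and the name slidingWindowFutureList is made up for illustration.

```haskell
import Data.List (tails)
import Data.Sequence (Seq)
import qualified Data.Sequence as Seq

-- List-based reference model of the specification (assumption: the
-- timestamps in the input are non-decreasing).
-- For each element (t, v), collect the values of all elements up to, but
-- excluding, the first later element whose time exceeds t + wlen. If no such
-- element exists, the window is still open and nothing is emitted for it.
slidingWindowFutureList :: Float -> [(Float, a)] -> [(Float, Seq a)]
slidingWindowFutureList wlen input =
  [ (t, Seq.fromList (map snd inside))
  | suffix@((t, _) : _) <- tails input
  , let (inside, after) = span (\(t', _) -> t' - t <= wlen) suffix
  , not (null after)  -- only emit once the closing element has been seen
  ]
```

On the example above with wlen = 2, this yields the pairs for times 0 and 1 and nothing for time 2 onwards, because no element later than time 4 has arrived. A streaming version would need to buffer the open windows instead of rescanning each suffix.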