I am trying to use Streamly, but struggling to wrap my head around the library functions.
Here is a simple task I want to do.
I have a stream. I want to take the first N elements, package them in a sequence, and do something else with the rest of the stream.
import qualified Streamly.Internal.Data.Fold as Fold
data TakeState a = TakeState
{ current :: Seq a
, count :: Int
}
takeS :: Monad m => Int -> Fold.Fold m a (Seq a)
takeS n = Fold.Fold step initial extr extr
where
done tk
| tk.count == n = Fold.Done tk.current
| otherwise = Fold.Partial tk
initial = pure . done $ TakeState Seq.empty 0
step tk a = pure . done $ TakeState
{ current = tk.current |> a
, count = tk.count + 1
}
extr tk = pure tk.current
If I use \n -> Stream.foldBreak (takeS n), that does what I want.
scan :: Monad m => Fold m a b -> Stream m a -> Stream m b
parseMany :: Monad m => Parser a m b -> Stream m a -> Stream m (Either ParseError b)
scan only works with a fold
parseMany works with a scan, but it forgets the state of the parser for every new parse
I need to use a Parser because I want to use takeWhile. takeWhile is a Parser rather than a Fold because it relies on peeking – it has to know that the next element does not satisfy the given predicate
Here is another (simple?) task I am struggling to solve with Streamly.
I want to write a function like this:
slidingWindowFuture :: Float -> Stream m (Float, a) -> Stream m (Float, Seq a)
slidingWindowFuture wlen input = undefined
Let’s say that the input stream is (t[0], v[0]), (t[1], v[1]), ....
Then, the output stream should be (t[0], s[0]), (t[1], s[1]), ... where s[i] = Seq.fromList [v[i], v[i + 1], ... v[n] such that n has the following property:
t[n+1] should be the smallest t[j] such that t[j] - t[i] > wlen
If such an n is not defined, then neither is the pair (t[i], s[i])
For example, say
wlen is 2
The input stream is Stream.fromList [(0, 'a'), (1, 'b'), (2, 'c'), (2.5, 'd'), (3, 'e'), (3.1, 'f')]
Then, the output stream is [(0, Seq.fromList ['a', 'b', 'c']), (1, Seq.fromList ['b', 'c', 'd', 'e'])]
Note that the output tuple with (2, ...) is not yet defined because the stream transformer is still waiting for an element with time more than 4.