Parallel stream processing with zero-copy fan-out and sharding

I’m interested in parallel stream processing. Not the map/reduce kind of
parallelism, but rather pipeline parallelism like in manufacturing and CPU
instruction pipelines.

I’ve recently made some progress using an Arrow interface and leveraging
associated types to avoid copying data between queues that connect the different
stages of the pipeline:

Parallel stream processing with zero-copy fan-out and sharding

This is heavily inspired by Martin Thompson et al’s work on the LMAX Disruptor
and Aeron. As I was writing things up I also stumbled upon an interview with the
late Jim Gray where he advocated for pipeline parallelism.

As far as I know, no streaming library in Haskell uses this approach, but I’d be
happy to be proven wrong. I would also be happy to try to answer any questions
or listen to any kind of feedback, cheers!


One thing I often run into with such approaches is that arrow notation desugars into a lot of arr and >>>, but never *** even if it could. For example:

proc (a, b) -> do
  c <- f -< a
  d <- g -< b
  returnA -< (c, d)

Semantically, this is ***, but it will desugar as some mess like first f >>> second g decorated with needless arrs. But there is no way to shard >>>, only *** and &&&, right? Or is your approach able to parallelize >>> well? If not, do you avoid arrow notation then?
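To make the comparison concrete (f and g here are placeholders for any two arrows of matching type, not code from your library):

import Control.Arrow

-- What the proc block above means: run f and g on the two components
-- of the pair "side by side".
pair :: Arrow a => a b c -> a b' c' -> a (b, b') (c, c')
pair f g = f *** g

-- Roughly what the desugarer produces instead: a sequential chain of
-- first/second (plus the needless arrs for tuple shuffling, omitted here).
pairDesugared :: Arrow a => a b c -> a b' c' -> a (b, b') (c, c')
pairDesugared f g = first f >>> second g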

Yeah, you're right that the arrow notation desugaring is bad/wrong; I briefly touch
upon this in footnote 3. An alternative could be to do your own desugaring
à la the arrowp-qq library. Either way, for now one would have to use the
combinators explicitly, unfortunately.

You can think of sharding (what Jim Gray calls partition parallelism) as taking
a copy of the whole pipeline and diverting half of the traffic (even indices) to the
first copy and the other half (odd indices) to the second copy. This works
regardless of which combinators the pipeline is composed of, and we can
deterministically retain the order of the outputs (because we've split by
even/odd).
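A rough sketch of the idea on plain lists (illustration only: shard2 is a made-up name, the real implementation works over Disruptor queues rather than list functions, and this assumes the pipeline produces exactly one output per input):

-- Illustration only: sharding a pure, element-wise list pipeline by
-- even/odd index. Each copy of the pipeline sees half of the inputs,
-- and interleaving restores the original order deterministically.
shard2 :: ([a] -> [b]) -> [a] -> [b]
shard2 pipeline xs = interleave (pipeline evens) (pipeline odds)
  where
    evens = [x | (i, x) <- zip [0 :: Int ..] xs, even i]
    odds  = [x | (i, x) <- zip [0 :: Int ..] xs, odd  i]

    -- Merge the outputs back alternately, starting with index 0.
    interleave (e:es) (o:os) = e : o : interleave es os
    interleave es     []     = es
    interleave []     os     = os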

Pipeline parallelism (which is orthogonal to sharding/partition parallelism) is
introduced by (>>>), whereby its first argument gets deployed on one thread
and its second argument on another thread, and the two stages get connected with
a queue. You can see this in the runQueueSleep example (it uses &&&, but
that's not important, parallelism would happen regardless) and you can also compare
it with the runQueueSleepSharded example, which adds sharding.
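Here is a minimal sketch of what (>>>) amounts to at deployment time, using plain functions and Control.Concurrent.Chan instead of the actual Disruptor queues (illustration only, none of these names are from the library):

import Control.Concurrent (forkIO)
import Control.Concurrent.Chan

-- Illustration only: two stages, each on its own thread, connected by
-- a queue. The real implementation connects stages with Disruptor
-- ring buffers rather than Chans.
pipelined :: (a -> b) -> (b -> c) -> Chan a -> Chan c -> IO ()
pipelined f g input output = do
  middle <- newChan
  -- stage 1: read from the input queue, apply f, write to the middle queue
  _ <- forkIO $ let loop = readChan input  >>= writeChan middle . f >> loop in loop
  -- stage 2: read from the middle queue, apply g, write to the output queue
  _ <- forkIO $ let loop = readChan middle >>= writeChan output . g >> loop in loop
  return ()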

Just an observation: I think what you have defined with P is a free arrow (see below) and deployment is mapping the free arrow to a less free one. I’m not exactly an expert on this, but there exists research on arrows that uses the arrow laws to study program transformation, whose goals may include parallelism.

Recently I attempted to use the arrow idiom to define a complex model that should be checkable against a graphical specification, but gave up eventually because code became incomprehensible. Do-notation for Kleisli arrows with let bindings turned out to be superior for this task. (Plus it turns out that there exists no automatic translation from Haskell arrows back to diagrams, where one would easily see the parallelism.)

{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE RankNTypes #-}

import Prelude hiding (id, (.))
import Control.Arrow
import Control.Category
import qualified Data.Function
import Data.Kind (Type)

-- | Abstract syntax tree over arrow combinators.
-- In @FreeArrow arr@ the type @arr@ contains the special
-- capabilities we want our workflow to have.
data FreeArrow :: (Type -> Type -> Type) -> Type -> Type -> Type where
    Arr          :: (x -> y) -> FreeArrow arr x y
    PureArr      :: arr x y -> FreeArrow arr x y
    ComposeArrow :: FreeArrow arr y z -> FreeArrow arr x y -> FreeArrow arr x z
    First        :: FreeArrow arr x y -> FreeArrow arr (x, z) (y, z)
    Second       :: FreeArrow arr x y -> FreeArrow arr (z, x) (z, y)
    Pair         :: FreeArrow arr x y -> FreeArrow arr a b -> FreeArrow arr (x, a) (y, b)
    FanOut       :: FreeArrow arr x y -> FreeArrow arr x z -> FreeArrow arr x (y, z)

instance Category (FreeArrow arr) where
    id    = Arr Data.Function.id
    f . g = ComposeArrow f g

instance Arrow (FreeArrow arr) where
    arr    = Arr
    first  = First
    second = Second
    (***)  = Pair
    (&&&)  = FanOut

-- | Interpret the abstract syntax in a concrete arrow, given an
-- interpretation of the "pure" effects.
fromFreeWith :: Arrow arr' =>
    (forall x y. arr x y -> arr' x y) ->
    FreeArrow arr a b -> arr' a b
fromFreeWith _ (Arr f)            = arr f
fromFreeWith h (ComposeArrow f g) = fromFreeWith h f <<< fromFreeWith h g
fromFreeWith h (First f)          = first  (fromFreeWith h f)
fromFreeWith h (Second f)         = second (fromFreeWith h f)
fromFreeWith h (Pair f g)         = fromFreeWith h f *** fromFreeWith h g
fromFreeWith h (FanOut f g)       = fromFreeWith h f &&& fromFreeWith h g
fromFreeWith h (PureArr f)        = h f
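For example, a FreeArrow whose effects are plain functions can be interpreted back into (->) and run directly (toy example, continuing the definitions above):

-- Build a small workflow in the free arrow...
example :: FreeArrow (->) (Int, Int) (Int, Int)
example = PureArr (+ 1) `Pair` PureArr (* 2)

-- ...and interpret it into plain functions: run (1, 2) == (2, 4).
run :: (Int, Int) -> (Int, Int)
run = fromFreeWith arr example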

Or in other words: first f >>> second g will be parallel, but not as
parallel as f *** g.

In the first case, we’ll have two threads working on the queue in a pipeline
fashion, i.e. once first f is done with the first item in the queue it will
pass it on to second g while at the same time starting to work on the second
item, thus introducing parallelism (assuming the two threads run on different
CPUs/cores).

In the second case, we first need to note that a Disruptor queue of pairs is
represented as a pair of queues (similar to how a vector/array of structs
vs a struct of arrays works). So here we will again have two threads, but this
time without any dependency between them: the first thread gets the first queue
(containing all the fst components of the pairs, without any copying of items)
while the second thread gets the second queue, and in the end we form a pair of
queues again. Here the fst part of the first item is processed in parallel with
the snd part of the first item, so this should perform better than the first
case. (It's important to note that any consumer of the output of f *** g may
only consume items that have been handled by both f and g; this is done by
keeping track of which index each component is at and taking the minimum.)
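To illustrate that last point (names made up for illustration; the real implementation tracks Disruptor sequence numbers rather than list lengths):

-- Illustration only: a "queue of pairs" represented as a pair of queues,
-- so f and g each get their own queue and no pairs are repacked per item.
data PairQueue a b = PairQueue
  { fstQueue    :: [a]  -- stand-in for a ring buffer of fst components
  , sndQueue    :: [b]  -- stand-in for a ring buffer of snd components
  , fstProduced :: Int  -- highest index the f thread has written
  , sndProduced :: Int  -- highest index the g thread has written
  }

-- A downstream consumer may only read up to the smallest index that
-- both producers have reached.
readableUpTo :: PairQueue a b -> Int
readableUpTo q = min (fstProduced q) (sndProduced q)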

Just an observation: I think what you have defined with P is a free arrow (see
below) and deployment is mapping the free arrow to a less free one.

Thanks! I currently have deployment and visualisation as a combined
transformation; it would be nice to be clearer about P being a free arrow
and to separate the two transformations.

I’m not exactly an expert on this, but there exists research on arrows that
uses the arrow laws to study program transformation, whose goals may include
parallelism.

Do you have any reference?

Recently I attempted to use the arrow idiom to define a complex model that
should be checkable against a graphical specification, but gave up eventually
because code became incomprehensible. Do-notation for Kleisli arrows with let
bindings turned out to be superior for this task. (Plus it turns out that
there exists no automatic translation from Haskell arrows back to diagrams,
where one would easily see the parallelism.)

Interesting, do you have any code to share?

I also tried to visualise my pipelines, see the section on observability, but granted only for small examples.

I’m not sure if it’s any use to you or perhaps you are already aware of it, but
Conal Elliott’s work on compiling to categories also visualises parallelism.

There is for example a paper on how point-free style can help in program transformation, but it does not mention arrows. (But arrows are point-free style.) More relevant may be this preprint. Judging from section 4.1, it seems the authors are concerned with classical parallel evaluation like

[arr x y] -> arr [x] [y]

where a list models a vector of ad-hoc accessible data rather than a stream of values that become available one after the other.

Unfortunately not at the moment, since it is company property. But the FreeArrow definition I showed arose in that attempt. I am concerned with self-documenting computations, and arrows could be a great tool for that. The idea is that one first defines the algorithm in a free arrow, hiding a chosen level of implementation detail inside the PureArr constructor. The FreeArrow AST could then be used to generate graphical representations, apply program transformations such as identifying potential parallelism, and machine-check the algorithm against graphical specifications. For actually running the algorithm, the AST is mapped to a concrete, more efficient Arrow category.

I asked the author of the Haskell arrows page how the diagrams were generated and he said they were done by hand. Disappointing. Do you think Conal’s compiling to categories has the necessary toolkit for that?

The SVG viewer is cool! I also use SVG to visualize data because it provides interactive features when embedded in HTML, which can be shared as a static file. But it never occurred to me that I could overlay completely different pictures on the same canvas. I use the blaze family of packages to generate both the SVG and the surrounding HTML with JavaScript from the same Haskell program. I embed data-whatever attributes in the elements and have a JavaScript routine that collects their values and performs computation. That is a limited way of embedding functions into a picture.
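A minimal sketch of that setup with blaze-html (the page/latency names are made up for illustration; the real code is more involved): an element carries a data-* attribute computed in Haskell, and an inline script collects it on the client:

{-# LANGUAGE OverloadedStrings #-}

import Text.Blaze (dataAttribute, toValue)
import Text.Blaze.Html5 ((!))
import qualified Text.Blaze.Html5 as H
import qualified Text.Blaze.Html5.Attributes as A
import Text.Blaze.Html.Renderer.String (renderHtml)

page :: Double -> H.Html
page latencyMs = H.docTypeHtml $ H.body $ do
  -- an element carrying a data-latency attribute computed in Haskell
  H.div ! A.id "stage-f"
        ! dataAttribute "latency" (toValue latencyMs)
        $ "stage f"
  -- a JavaScript routine that collects the embedded values
  H.script "document.querySelectorAll('[data-latency]').forEach(e => console.log(e.dataset.latency));"

main :: IO ()
main = putStrLn (renderHtml (page 1.5))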

I’m not sure you can get diagrams exactly like those on the arrows wiki; also note that Conal works with cartesian closed categories rather than arrows. But I’d guess it can be adapted for your purpose.