How to tell an upstream conduit to push up its leftovers?

Let’s say I’ve got a good old ByteString conduit source:

source :: Conduit () ByteString m a

And I’ve got a conduit that transforms this into tokens

data Token = AnInt Int | OtherStuff String | EndOfLine | ...

myTransform1 :: Conduit ByteString Token m ()

And let’s say I’ve also got:

myTransform2 :: Settings -> Conduit Token Token m ()
myTransform3 :: Conduit Token ByteString m ()

sink :: Conduit ByteString () m ()

Ordinarily, I would want to process one line, which gives me the Settings on how to process the rest.

But, if the Settings are invalid, I don’t want to continue tokenising my input. I just want to pass the whole conduit through unchanged.

i.e. I just want to send the remainder of source straight to sink, not through myTransform(1,2,3)

What I think needs to happen is that whatever reads from myTransform1 tells it, once EndOfLine has been pulled from it, to push its “leftovers” back to source.

But I don’t think I can do this. After all, myTransform1 is basically just awaiting on bytestrings and yielding tokens. Internally there’s an index of where it’s up to in the currently processing bytestring, but that’s not exposed to conduit.

I guess I could change myTransform1 to:

data AbortAtEOL = YesAbortAtEOL | NoAbortAtEOL

myTransform1 :: AbortAtEOL -> Conduit ByteString Token m ()

And have the conduit, if it receives YesAbortAtEOL as an option, then just push any leftovers back once it receives an EndOfLine token.
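A minimal sketch of that idea, assuming a toy tokenizer that emits one token per byte (the real Token type and tokenizer are not shown in this thread, so `Chr`, `tokenize` and `demo` are hypothetical names). One detail seems to matter: the remainder is handed back with `leftover` before `EndOfLine` is yielded, because a downstream consumer that stops right at `EndOfLine` would never give the tokenizer a chance to run anything after the `yield`.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import           Control.Monad         (unless)
import           Data.ByteString       (ByteString)
import qualified Data.ByteString.Char8 as BC
import           Data.Conduit
import qualified Data.Conduit.List     as CL
import           Data.Functor.Identity (runIdentity)

data AbortAtEOL = YesAbortAtEOL | NoAbortAtEOL deriving (Eq, Show)

-- Toy token type (an assumption): one token per byte.
data Token = Chr Char | EndOfLine deriving (Eq, Show)

-- Tokenizer that optionally stops at the first EndOfLine and pushes the
-- unread part of the current chunk back upstream.
tokenize :: Monad m => AbortAtEOL -> ConduitT ByteString Token m ()
tokenize mode = loop
  where
    loop = await >>= maybe (pure ()) chunk
    chunk bs = case BC.uncons bs of
      Nothing -> loop
      Just ('\n', rest)
        | mode == YesAbortAtEOL -> do
            -- Hand the remainder back first, then emit the token and stop.
            unless (BC.null rest) (leftover rest)
            yield EndOfLine
        | otherwise -> yield EndOfLine >> chunk rest
      Just (c, rest) -> yield (Chr c) >> chunk rest

-- Tokenize the first line, then read whatever is left of the source raw.
demo :: [ByteString] -> ([Token], ByteString)
demo chunks = runIdentity $ do
  (sealed, toks) <- CL.sourceList chunks $$+ tokenize YesAbortAtEOL .| CL.consume
  rest <- sealed $$+- CL.consume
  pure (toks, BC.concat rest)

main :: IO ()
main = print (demo ["ab\ncd", "ef"])
```

Here the leftover comes from the upstream-most component of the fused sink, which is the only position whose leftovers travel back to the source; leftovers issued by anything to the right of `.|` are dropped.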

But I note that this solution is quite specific. What about if in general I have a downstream provider, and it may want to perform some computation before deciding how to continue, but it doesn’t want to lose any data in doing so?

Or am I just going about solving this in a completely wrong way?

In short, I want to process a little bit of a Conduit ByteString source, but if something goes wrong, I just want to pass it through to a sink without losing information.

Use a conduit of Either instead, and assemble the results at the end?

I think you want conduit's “connect and resume” operators. The way I think it works is:

  1. Write a settingsC :: Conduit ByteString Void m (Either SettingsParseError Settings).
  2. Use ($$+) to parse the settings and give you a “sealed conduit” for the rest of the stream. source $$+ settingsC should give you m (SealedConduitT () a m (), Either SettingsParseError Settings).
  3. Connect to the sealed conduit (and don’t return an updated one) using ($$+-): sealedSource $$+- myTransform1 .| myTransform2 settings .| myTransform3 .| sink
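The steps above could be sketched roughly as follows. This is only an illustration under assumptions: `Settings` is a stand-in holding a single Int, `firstLine`, `parseSettings` and `process` are hypothetical helpers, and one `transform` argument stands in for myTransform1 .| myTransform2 settings .| myTransform3. The settings parser also returns the raw bytes of the line it consumed, so the failure branch can put them back in front of the rest of the stream.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import           Control.Monad         (unless)
import           Data.ByteString       (ByteString)
import qualified Data.ByteString.Char8 as BC
import           Data.Char             (toUpper)
import           Data.Conduit
import qualified Data.Conduit.List     as CL
import           Data.Functor.Identity (runIdentity)
import           Data.Void             (Void)

-- Hypothetical stand-in for the real Settings type.
newtype Settings = Settings Int deriving (Eq, Show)

-- Read raw bytes up to and including the first newline, across chunk
-- boundaries. Anything after the newline is pushed back with 'leftover'.
firstLine :: Monad m => ConduitT ByteString o m ByteString
firstLine = go []
  where
    go acc = await >>= \mchunk -> case mchunk of
      Nothing    -> pure (BC.concat (reverse acc))
      Just chunk ->
        let (pre, post) = BC.break (== '\n') chunk
        in if BC.null post
             then go (chunk : acc)
             else do
               let (nl, rest) = BC.splitAt 1 post
               unless (BC.null rest) (leftover rest)
               pure (BC.concat (reverse (nl : pre : acc)))

-- Toy parser (an assumption): the settings line is a single integer.
parseSettings :: ByteString -> Either String Settings
parseSettings raw = case BC.readInt raw of
  Just (n, rest) | BC.all (== '\n') rest -> Right (Settings n)
  _ -> Left ("invalid settings line: " ++ show raw)

settingsC :: Monad m => ConduitT ByteString Void m (ByteString, Either String Settings)
settingsC = do
  raw <- firstLine
  pure (raw, parseSettings raw)

process
  :: Monad m
  => ConduitT () ByteString m ()
  -> (Settings -> ConduitT ByteString ByteString m ())
  -> ConduitT ByteString Void m r
  -> m r
process source transform sink = do
  (rest, (raw, parsed)) <- source $$+ settingsC
  case parsed of
    -- Valid settings: run the rest of the stream through the transforms.
    Right s -> rest $$+- transform s .| sink
    -- Invalid settings: replay the consumed line, then pass the rest through.
    Left _  -> runConduit ((yield raw >> unsealConduitT rest) .| sink)

run :: [ByteString] -> ByteString
run chunks = BC.concat . runIdentity $
  process (CL.sourceList chunks) (\_ -> CL.map (BC.map toUpper)) CL.consume

main :: IO ()
main = do
  print (run ["4", "2\nab", "c\n"])
  print (run ["oops\nab", "c\n"])
```

In this sketch the settings line is dropped on success and replayed on failure; whether the real pipeline should also forward it on success depends on what the transforms expect.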

Honestly, trying to figure out how to do nontrivial things in conduit is what made me switch to the streaming ecosystem for anything more complex than source-to-sink connection. There, you’d write a function like:

-- | Attempt to parse the header records from the start of a byte
-- stream. On success, return the remainder of the stream.
decodeHeader ::
  Monad m =>
  ByteStream m r ->
  m (Either String (Header, ByteStream m r))

I’m going to answer my own question here. Writing it out helped me think it through, and I’m going to leave it here in case it’s helpful to anyone else. Thanks for the answers in the meantime.

I think I do need to modify myTransform1, but with one simple modification. Change:

myTransform1 :: Conduit ByteString Token m ()

to:

myTransform1 :: Conduit ByteString (ByteString, Token) m ()

Where the returned ByteString is the remainder of the current chunk.

Then any downstream consumer can push that back onto the remaining conduit from $$+.
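A rough sketch of how that could look, again with a toy one-token-per-byte tokenizer (`Chr`, `tokenize`, `headerTokens` and `splitHeader` are hypothetical names, not part of the original code). Since a consumer sitting to the right of `.|` cannot use `leftover` to reach the source, the sketch returns the remainder as a result and prepends it to the unsealed source instead.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import           Control.Monad         (unless)
import           Data.ByteString       (ByteString)
import qualified Data.ByteString.Char8 as BC
import           Data.Conduit
import qualified Data.Conduit.List     as CL
import           Data.Functor.Identity (runIdentity)

-- Toy token type (an assumption): one token per byte.
data Token = Chr Char | EndOfLine deriving (Eq, Show)

-- Each token is paired with the not-yet-tokenized rest of the current chunk.
tokenize :: Monad m => ConduitT ByteString (ByteString, Token) m ()
tokenize = awaitForever go
  where
    go chunk = case BC.uncons chunk of
      Nothing        -> pure ()
      Just (c, rest) -> do
        yield (rest, if c == '\n' then EndOfLine else Chr c)
        go rest

-- Collect the first line's tokens plus the remainder that came with EndOfLine.
headerTokens :: Monad m => ConduitT (ByteString, Token) o m ([Token], ByteString)
headerTokens = go []
  where
    go acc = await >>= \next -> case next of
      Nothing                -> pure (reverse acc, BC.empty)
      Just (rest, EndOfLine) -> pure (reverse acc, rest)
      Just (_, tok)          -> go (tok : acc)

-- Tokenize only the first line; return its tokens and a source for the
-- untouched bytes that follow it.
splitHeader
  :: Monad m
  => ConduitT () ByteString m ()
  -> m ([Token], ConduitT () ByteString m ())
splitHeader source = do
  (sealed, (toks, rest)) <- source $$+ tokenize .| headerTokens
  pure (toks, unless (BC.null rest) (yield rest) >> unsealConduitT sealed)

demo :: [ByteString] -> ([Token], ByteString)
demo chunks = runIdentity $ do
  (toks, remaining) <- splitHeader (CL.sourceList chunks)
  body <- runConduit (remaining .| CL.consume)
  pure (toks, BC.concat body)

main :: IO ()
main = print (demo ["a", "b\ncd", "ef"])
```

Note that this recovers only the bytes after the first line. Passing the whole input through unchanged would also need the raw bytes of the header line, which the (ByteString, Token) pairs alone do not keep.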