Atomicity of `readChan`?

I’m very surprised by a behavior that involves race and readChan. Consider this small example:

{-# LANGUAGE NumericUnderscores #-}

{- Cabal file:

executable example
  main-is:             Example.hs
  default-language:    Haskell2010
  ghc-options:         -Wall -threaded -rtsopts -with-rtsopts=-N

  hs-source-dirs: .
  build-depends:       base >= 4.8 && <5
                     , async
-}


import Control.Concurrent (forkIO, threadDelay)
import qualified Control.Concurrent.Async as Async
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Monad (forM_, forever)

main :: IO ()
main = do
  chan <- newChan
  -- Writer thread writes a number every 10ms to a channel
  _ <-
    forkIO
      ( forM_
          [(1 :: Int) ..]
          (\i -> threadDelay 10_000 *> writeChan chan i)
      )
  -- Reader thread tries to read from the channel. If nothing
  -- is available within 5ms, do nothing (pure ()), but if something
  -- is available, print it
  forever
    ( Async.race
        (threadDelay 5_000)
        (readChan chan)
        >>= either
          pure
          print
    )

I would expect to see the numbers 1, 2, 3, 4, 5, … printed in order. However, when I run this, some numbers are skipped:

1
2
4
5
7
9
10
12
...

I would have expected that if readChan doesn’t return (because the channel is empty), then cancelling readChan from within race would not drop values in the channel.

How can I think about this?

The implementation of readChan uses mask, so it will always finish the whole operation once it has started. I guess that if it does get interrupted, that will only take effect after the mask, so right before it would return, which drops the value. Never mind, this is completely false in multiple ways.


Uh, thanks! What a head scratcher.

Do you have any suggestions as to what data structure would be more suitable for this, if I needed to use ‘race’ to return data at least every N seconds?

I guess TChan has a non-blocking read, so that’s a good start

Indeed, but it also doesn’t use mask and instead relies on the STM rollback mechanism, so you could probably even just use it as a drop-in replacement in your current code.
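
Concretely, something like this sketch (assuming the stm package gets added to build-depends; the structure of your program stays the same):

{-# LANGUAGE NumericUnderscores #-}

import Control.Concurrent (forkIO, threadDelay)
import qualified Control.Concurrent.Async as Async
import Control.Concurrent.STM (atomically, newTChanIO, readTChan, writeTChan)
import Control.Monad (forM_, forever)

main :: IO ()
main = do
  chan <- newTChanIO
  _ <-
    forkIO
      ( forM_
          [(1 :: Int) ..]
          (\i -> threadDelay 10_000 *> atomically (writeTChan chan i))
      )
  forever
    ( Async.race
        (threadDelay 5_000)
        -- a transaction that is interrupted while blocked rolls back
        -- instead of half-completing
        (atomically (readTChan chan))
        >>= either pure print
    )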


Thanks, I’ll try it out later today

You could also turn it inside out by racing loops instead of looping races.

Can you expand on what you mean? Like racing one loop that continuously does non-blocking reads vs. another loop incrementing a counter?
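
I read it as something like the sketch below: one long-lived reader loop raced against one long-lived periodic loop, so readChan is never cancelled between elements. (consumeLoop and onTimeout are hypothetical names, not from the original program.)

{-# LANGUAGE NumericUnderscores #-}

import Control.Concurrent (threadDelay)
import qualified Control.Concurrent.Async as Async
import Control.Concurrent.Chan (Chan, readChan)
import Control.Monad (forever)

-- Race two loops for overall termination instead of racing each read
-- against a timer.  The reader is only ever killed when the whole
-- computation shuts down, not in between elements.
consumeLoop :: Chan Int -> IO ()
consumeLoop chan =
  Async.race_
    (forever (readChan chan >>= print))
    (forever (threadDelay 5_000 *> onTimeout))
  where
    -- placeholder for whatever should happen when no value arrives in time
    onTimeout :: IO ()
    onTimeout = pure ()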

I don’t think this is 100% the case, because most blocking MVar operations are interruptible, even inside a mask.

Could it be that, because the delay 10_000 is a multiple of 5_000, sometimes the reader thread is killed just after returning from readChan with a value?
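
Schematically, race does something like the sketch below (this is not async’s actual implementation, which also deals with exceptions and masking; it’s just the shape that matters here):

import Control.Concurrent (forkIO, killThread)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)

-- Simplified schematic of a race combinator.
raceSketch :: IO a -> IO b -> IO (Either a b)
raceSketch left right = do
  done <- newEmptyMVar
  lt <- forkIO (left >>= putMVar done . Left)
  rt <- forkIO (right >>= putMVar done . Right)
  result <- takeMVar done   -- whoever finishes first wins
  -- The loser is killed here.  If the reader has already returned from
  -- readChan but has not yet reached its putMVar, the value it read is
  -- silently dropped.
  killThread lt
  killThread rt
  pure result

With the 10 ms / 5 ms harmonics, the kill regularly lands exactly in that window.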


I was wrong: readChan doesn’t use mask, and even if it did, that would not stop async exceptions from interrupting blocking MVar operations.

Your guess also seems right. If I change the writer thread delay to 7_500 then nothing gets dropped.

Rust has the "select" macro for such cases, btw 🙂

It’s a rather common footgun

It depends on your application logic but essentially you want to avoid your readChan thread non-atomically transferring its result to the TVar stored in the Async in race.

I think here you could just wait out the 5 ms and then do a non-blocking read.
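
For example, a sketch of that idea with stm’s tryReadTChan (pollLoop is a hypothetical name; nothing is ever cancelled here, so nothing can be lost in a hand-off):

{-# LANGUAGE NumericUnderscores #-}

import Control.Concurrent (threadDelay)
import Control.Concurrent.STM (TChan, atomically, tryReadTChan)
import Control.Monad (forever)

-- Wait out the interval, then poll without blocking.  If the writer is
-- faster than the poller, values simply queue up in the channel.
pollLoop :: TChan Int -> IO ()
pollLoop chan = forever $ do
  threadDelay 5_000
  m <- atomically (tryReadTChan chan)
  maybe (pure ()) print m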

You could also mask the entire thread that reads from the chan: it will still be interruptible while blocking on the MVar (as already mentioned), but then it can’t get cancelled after it has completed its read.

I don’t know if race is enough then, though (putting the result into the TVar would still have to be masked); I haven’t looked at its definition.

This is a simple race condition involving, unsurprisingly, a literal function call to race.

The race is on what the race function chooses; the scheduler doesn’t guarantee exact wake up times or anything like that.

And readChan doesn’t automagically put the value back like STM would, which is the source of the bug.

If I were just reading the code, I’d consider it a concurrency code smell. Don’t rely on timing in concurrent code, basically ever, and you’ll be fine.

The scary part is that sometimes you write code like this and it works… until it’s deployed a week later and the timings don’t line up as nicely.


I think it’s very sensible code and an actual footgun: you need to really understand how race is implemented, and other languages do prevent things like this (cf. Rust’s tokio select!).

Yeah, true, it’s something that only becomes normal once you know it. I’d agree: Chan and MVar are full of pitfalls despite seeming pretty reasonable on the surface. I think most of Control.Concurrent is like that. (There are exceptions, though: STM is wonderful!)

{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NumericUnderscores #-}

import Control.Concurrent (forkIO, killThread, threadDelay)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, tryReadMVar)
import Control.Exception
  (AsyncException (ThreadKilled), catch, mask_, throwIO, uninterruptibleMask_)
import Control.Monad (forM_, forever)

main :: IO ()
main = do
  chan <- newChan
  -- Writer thread writes a number every 10ms to a channel
  _ <-
    forkIO
      ( forM_
          [(1 :: Int) ..]
          (\i -> threadDelay 10_000 *> writeChan chan i)
      )
  forever do
    v <- newEmptyMVar
    -- The forked reader inherits the masked state.  The blocking part of
    -- readChan is still interruptible, so the main thread can kill it
    -- while it is waiting; but once readChan has returned, the thread can
    -- no longer be cancelled before it has stored the result.
    rdId <- mask_ $ forkIO do
      res <-
        readChan chan
          `catch` \ThreadKilled ->
            putStrLn "thread was killed" *> throwIO ThreadKilled
      -- This mask is uninterruptible: putting into the empty MVar always
      -- succeeds immediately, and we must not be interruptible here,
      -- because then we could lose the result while handing it over.
      uninterruptibleMask_ do
        putMVar v res

    threadDelay 5_000
    killThread rdId

    tryReadMVar v >>= \case
      Nothing -> pure ()
      Just r -> print r

unagi-chan mentions this issue in its readChan documentation and gives you a way to recover the element on async exceptions.
You’ll probably want to use it over Chan for performance reasons anyway.


Interestingly, naively replacing Chan with TChan didn’t help at all!

There’s no intention to rely on exact timing.

What I want is for a stream to produce values at most every N seconds (in the case of the example, at most every 5 milliseconds). Whatever solution I use should also work if there are harmonics between the reader and writer frequencies, as there happen to be in the example.
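
For what it’s worth, one way to get that behaviour without racing threads at all is to move the timeout into the STM transaction itself, using registerDelay and orElse. A sketch (registerDelay needs the threaded RTS, which the cabal file above already enables, and stm would need to be added to build-depends):

{-# LANGUAGE NumericUnderscores #-}

import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM
  (atomically, check, newTChanIO, orElse, readTChan, readTVar, registerDelay, writeTChan)
import Control.Monad (forM_, forever)

main :: IO ()
main = do
  chan <- newTChanIO
  -- Writer thread writes a number every 10ms to the channel
  _ <-
    forkIO
      ( forM_
          [(1 :: Int) ..]
          (\i -> threadDelay 10_000 *> atomically (writeTChan chan i))
      )
  forever $ do
    -- The TVar flips to True roughly 5ms from now.
    timedOut <- registerDelay 5_000
    result <-
      atomically $
        (Just <$> readTChan chan)
          `orElse` (readTVar timedOut >>= check >> pure Nothing)
    -- Taking the value and deciding the timeout happen in one transaction,
    -- and no thread is ever cancelled, so no value can be removed from the
    -- channel and then dropped.
    maybe (pure ()) print result

Because nothing is ever cancelled, the harmonics between the writer and reader frequencies stop mattering.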