I’m very surprised by a behavior that involves race and readChan. Consider this small example:
```haskell
{-# LANGUAGE NumericUnderscores #-}
{- Cabal file:
executable example
  main-is:          Example.hs
  default-language: Haskell2010
  ghc-options:      -Wall -threaded -rtsopts -with-rtsopts=-N
  hs-source-dirs:   .
  build-depends:    base >= 4.8 && <5
                  , async
-}
import Control.Concurrent (forkIO, threadDelay)
import qualified Control.Concurrent.Async as Async
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Monad (forM_, forever)

main :: IO ()
main = do
  chan <- newChan
  -- Writer thread writes a number every 10ms to a channel
  _ <-
    forkIO
      ( forM_
          [(1 :: Int) ..]
          (\i -> threadDelay 10_000 *> writeChan chan i)
      )
  -- Reader thread tries to read from the channel. If nothing
  -- is available within 5ms, do nothing (pure ()), but if something
  -- is available, print it
  forever
    ( Async.race
        (threadDelay 5_000)
        (readChan chan)
        >>= either
          pure
          print
    )
```
I would expect to see numbers 1, 2, 3, 4, 5, … printed in order. However, when I run this, some numbers are skipped:
```
1
2
4
5
7
9
10
12
...
```
I would have expected that if readChan doesn’t return (because the channel is empty), cancelling it from within race would not drop values from the channel.
The implementation of readChan uses mask, so it will always finish the whole operation once it has started. I guess that if it does get interrupted, that will only take effect after the mask, so right before it would return, which drops the value. Never mind, this is completely false in multiple ways.
Do you have any suggestions as to what data structure would be more suitable for this, if I needed to use ‘race’ to return data at least every N seconds?
Indeed, but it also doesn’t use mask and instead relies on the STM rollback mechanism, so you could probably even use it as a drop-in replacement in your current code.
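Assuming the STM-based structure meant here is TChan from the stm package (the name does not survive in this thread), the following minimal sketch leans on STM rollback even more directly: instead of racing a separate reader thread, it waits for either a value or a timeout inside a single transaction, using registerDelay and orElse. Because the read and the timeout check commit together, a value is either returned or left in the channel. readTChanTimeout is a hypothetical helper name, and registerDelay only works with the threaded RTS, which the cabal file in the question already enables via -threaded.

```haskell
{-# LANGUAGE NumericUnderscores #-}
-- Assumption: the stm package is added to build-depends.
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM
  ( TChan, atomically, check, newTChanIO, orElse
  , readTChan, readTVar, registerDelay, writeTChan )
import Control.Monad (forM_, replicateM_)

-- | Hypothetical helper: wait at most @micros@ microseconds for the next value.
-- The read and the timeout check happen in one transaction, so a value is
-- either returned or stays in the channel; it is never dequeued and dropped.
readTChanTimeout :: Int -> TChan a -> IO (Maybe a)
readTChanTimeout micros chan = do
  timedOut <- registerDelay micros   -- TVar that becomes True after micros
  atomically $
    (Just <$> readTChan chan)        -- retries while the channel is empty
      `orElse` (readTVar timedOut >>= check >> pure Nothing)

main :: IO ()
main = do
  chan <- newTChanIO
  -- Writer from the question, ported to TChan: one number every 10ms
  _ <- forkIO $ forM_ [(1 :: Int) ..] $ \i ->
    threadDelay 10_000 *> atomically (writeTChan chan i)
  -- 40 polls instead of forever, so the sketch terminates
  replicateM_ 40 $ readTChanTimeout 5_000 chan >>= maybe (pure ()) print
```

Unlike race, no thread is ever cancelled after it has dequeued a value, so harmonics between the reader and writer periods should not matter here.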
I don’t think this is 100% the case, because most blocking MVar operations are interruptible, even inside a mask.
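A small sketch of that point, using only base: a thread forked under mask_ inherits the masked state, yet a takeMVar that actually blocks is an interruptible operation, so killThread still reaches it. maskedTakeIsInterruptible is a hypothetical name used only for illustration.

```haskell
{-# LANGUAGE NumericUnderscores #-}
import Control.Concurrent
  (MVar, forkIO, killThread, newEmptyMVar, putMVar, takeMVar, threadDelay)
import Control.Exception
  (AsyncException (ThreadKilled), SomeException, fromException, mask_, try)

-- | Hypothetical helper: forks a thread that is masked (forkIO inherits the
-- masking state set by mask_) and blocks in takeMVar on an MVar that is never
-- filled, then kills it. Returns True if ThreadKilled interrupted the blocked
-- takeMVar despite the mask.
maskedTakeIsInterruptible :: IO Bool
maskedTakeIsInterruptible = do
  never <- newEmptyMVar :: IO (MVar ())
  outcome <- newEmptyMVar
  tid <- mask_ $ forkIO $ do
    r <- try (takeMVar never) :: IO (Either SomeException ())
    -- Record whether the blocked takeMVar ended because of killThread
    putMVar outcome (either (\e -> fromException e == Just ThreadKilled) (const False) r)
  threadDelay 10_000
  killThread tid
  takeMVar outcome

main :: IO ()
main = maskedTakeIsInterruptible >>= print
```

It should print True. Interruptible operations are only interruptible while they actually block, though: a putMVar into an empty MVar, for example, completes without giving an exception a chance to arrive.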
Could it be that, because the delay 10_000 is a multiple of 5_000, sometimes the reader thread is killed just after returning from readChan with a value?
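That window can be made visible by widening it artificially. In this sketch (assuming the async package as in the question; lostValueDemo is a hypothetical name), the reading branch of race dequeues 1, signals the other branch, and then sleeps before returning, standing in for the short gap between readChan returning and race recording the result. race picks the other branch and cancels the reader, and the next read from the channel yields 2, so the 1 is gone.

```haskell
{-# LANGUAGE NumericUnderscores #-}
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (race)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)

-- | Hypothetical demo: returns race's result and the next value read from the
-- channel afterwards. The 1s sleep is an exaggerated stand-in for the tiny
-- gap after readChan has returned but before race has seen the result.
lostValueDemo :: IO (Either () Int, Int)
lostValueDemo = do
  chan <- newChan
  readDone <- newEmptyMVar
  writeChan chan 1
  r <- race
    (takeMVar readDone)            -- the "timeout" fires right after the read
    (do x <- readChan chan         -- 1 is now removed from the channel
        putMVar readDone ()
        threadDelay 1_000_000      -- cancelled here, so x is never returned
        pure x)
  writeChan chan 2
  next <- readChan chan
  pure (r, next)

main :: IO ()
main = lostValueDemo >>= print
```

Running it should print (Left (),2).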
It depends on your application logic, but essentially you want to avoid having your readChan thread non-atomically transfer its result to the TVar stored in the Async in race.
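One way to follow that advice, sketched under the assumption that the channel is switched to stm's TChan: let a helper thread move the value into a private TMVar in the same transaction that reads it, then kill the helper and check the TMVar. A kill either rolls the whole transaction back (the value stays in the channel) or arrives after it committed (the value is in the TMVar). pollWithHandover is a hypothetical name.

```haskell
{-# LANGUAGE NumericUnderscores #-}
-- Assumption: the stm package is added to build-depends.
import Control.Concurrent (forkIO, killThread, threadDelay)
import Control.Concurrent.STM
  ( TChan, atomically, newEmptyTMVarIO, newTChanIO, putTMVar
  , readTChan, tryReadTChan, tryTakeTMVar, writeTChan )

-- | Hypothetical helper: give a helper thread @micros@ microseconds to hand
-- one value over, then kill it and collect whatever it handed over.
pollWithHandover :: Int -> TChan a -> IO (Maybe a)
pollWithHandover micros chan = do
  slot <- newEmptyTMVarIO
  -- Dequeue and hand over in one transaction: both happen or neither does
  tid <- forkIO (atomically (readTChan chan >>= putTMVar slot))
  threadDelay micros
  -- killThread returns once the exception is raised in the helper or the
  -- helper has already finished, so the TMVar is settled afterwards
  killThread tid
  atomically (tryTakeTMVar slot)

main :: IO ()
main = do
  chan <- newTChanIO
  atomically (writeTChan chan 'a')
  first <- pollWithHandover 5_000 chan
  second <- pollWithHandover 5_000 chan
  print (first, second)
```

Unlike race, this always waits the full interval even when a value arrives early, and it forks one thread per poll.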
I think here you could just wait 5 seconds and then do a non-blocking read.
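Base’s Chan has no non-blocking read, so this sketch of the wait-then-poll idea assumes a switch to stm’s TChan, whose tryReadTChan returns Nothing instead of blocking. pollAfterDelay is a hypothetical name, and the interval is 5ms to match the example rather than 5 seconds.

```haskell
{-# LANGUAGE NumericUnderscores #-}
-- Assumption: the stm package is added to build-depends.
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM
  (TChan, atomically, newTChanIO, tryReadTChan, writeTChan)
import Control.Monad (forM_, replicateM_)

-- | Hypothetical helper: sleep for the whole interval, then take whatever is
-- available without blocking. No thread is cancelled mid-read, so nothing
-- can be dequeued and then dropped.
pollAfterDelay :: Int -> TChan a -> IO (Maybe a)
pollAfterDelay micros chan = do
  threadDelay micros
  atomically (tryReadTChan chan)

main :: IO ()
main = do
  chan <- newTChanIO
  -- Writer from the question, ported to TChan: one number every 10ms
  _ <- forkIO $ forM_ [(1 :: Int) ..] $ \i ->
    threadDelay 10_000 *> atomically (writeTChan chan i)
  -- 40 polls instead of forever, so the sketch terminates
  replicateM_ 40 $ pollAfterDelay 5_000 chan >>= maybe (pure ()) print
```

Since it takes at most one value per interval, a writer that is faster than the reader will build up a backlog.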
You could also mask the entire thread that reads from the chan. It will still be interruptible while blocked on the MVar (as already mentioned), but it can’t be cancelled after it has completed its read.
Idk if race is enough then, though (putting to the TVar would still have to be masked); I haven’t looked at its definition.
I think it’s very sensible code and an actual footgun: you really need to understand how race is implemented, and other languages do prevent things like this (cf. Rust’s tokio::select!).
Yeah, true, it’s something that only seems normal once you know it. I agree: Chan and MVar are full of pitfalls despite seeming pretty reasonable on the surface. I think most of Control.Concurrent is like that. (With exceptions: STM is wonderful!)
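```haskell
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NumericUnderscores #-}
import Control.Concurrent
  (forkIO, killThread, newEmptyMVar, putMVar, threadDelay, tryReadMVar)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Exception
  (AsyncException (ThreadKilled), catch, mask_, throwIO, uninterruptibleMask_)
import Control.Monad (forM_, forever)

main :: IO ()
main = do
  chan <- newChan
  -- Writer thread writes a number every 10ms to a channel
  _ <-
    forkIO
      ( forM_
          [(1 :: Int) ..]
          (\i -> threadDelay 10_000 *> writeChan chan i)
      )
  forever do
    v <- newEmptyMVar
    -- This mask makes the forked reader start masked, while the blocking
    -- behaviour of readChan remains interruptible, so that when the main
    -- thread kills it while waiting, we can continue
    rdId <- mask_ $ forkIO do
      res <-
        readChan chan `catch` \e -> do
          case e of
            ThreadKilled -> putStrLn "thread was killed"
            _ -> pure ()
          throwIO e
      -- This mask is uninterruptible since we know that putting the MVar
      -- will always succeed immediately, and we don't want the thread to be
      -- interruptible, because then we could lose the result while putting
      -- the MVar
      uninterruptibleMask_ do
        putMVar v res
    threadDelay 5_000
    killThread rdId
    tryReadMVar v >>= \case
      Nothing -> pure ()
      Just r -> print r
```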
What I want is for a stream to produce values at most every N seconds (in the case of the example, at most every 5 milliseconds). Whatever solution I use should also work if there are harmonics between the reader and writer frequencies, which there happen to be in the example.