I’m currently working on a piece of code and I could use some help.
Essentially, what I have is a main thread that accepts connections and delegates work to a handler:
data Conn

acceptConn :: IO Conn
handle :: Conn -> IO ()

main :: IO ()
main = forever $ do
  conn <- acceptConn
  handle conn
What I would like to have is some infrastructure that:
- Makes handle work asynchronously, that is, handle runs in a separate thread.
- Ensures that any exception not caught by handle gets propagated to main.
- Ensures that if main gets interrupted (e.g. by CTRL+C), all threads that are still running also get interrupted.
Notice that operators like mapConcurrently and the like cannot be used here: we don’t have a fixed number of connections so we cannot construct a [Conn] (well, maybe with lazy IO).
Using the async package I came up with the following solution:
main :: IO ()
main = do
  fix $ \loop -> do
    conn <- acceptConn
    withAsync (handle conn) (\a -> loop `finally` wait a)
This seems to work and checks all the boxes: new connections are accepted immediately, exceptions get propagated and interruptions properly kill any running tasks.
Unfortunately, I think there is an issue with \a -> loop `finally` wait a: each iteration adds another layer to a chain of finallys (wait aN `finally` wait a(N-1) `finally` ... wait a0), and that chain is never released while the loop runs, so we have a clear memory leak. We would need to wait only for those asyncs that are still running, but I’m not sure how.
Either way, I think I’m approaching the problem from the wrong direction, so I was wondering if anyone knows how to tackle this kind of task using the async package (preferably without additional packages).
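One way to wait only for the asyncs that are still running is to keep them in a registry that each handler removes itself from when it finishes. The following is a minimal sketch of that idea, not a vetted implementation. The names serveWith and demo are made up for illustration, the registry is a Map from containers held in an MVar, and it assumes a recent async (2.2 or later, as far as I know) where link ignores AsyncCancelled and cancel waits for the thread to exit.

```haskell
import Control.Concurrent (myThreadId, threadDelay)
import Control.Concurrent.Async
  (async, asyncThreadId, asyncWithUnmask, cancel, link, uninterruptibleCancel)
import Control.Concurrent.MVar (modifyMVar_, newMVar, readMVar)
import Control.Exception (bracket_, finally)
import Control.Monad (forever)
import Data.IORef (atomicModifyIORef', newIORef, readIORef)
import qualified Data.Map.Strict as Map

-- Hypothetical helper, not part of async: accept connections forever and
-- run each handler in its own Async, tracked in a registry.
serveWith :: IO conn -> (conn -> IO ()) -> IO a
serveWith accept handler = do
  running <- newMVar Map.empty
  let -- Runs in the handler thread: drop this thread from the registry.
      unregister = do
        me <- myThreadId
        modifyMVar_ running (pure . Map.delete me)
      -- The registry MVar is held while forking, so a handler can only
      -- unregister itself after it has been inserted.
      spawn conn = modifyMVar_ running $ \m -> do
        a <- asyncWithUnmask $ \unmask ->
          unmask (handler conn) `finally` unregister
        link a -- rethrow the handler's exceptions in the accepting thread
        pure (Map.insert (asyncThreadId a) a m)
      -- Cancel whatever is still registered and wait for it to exit.
      cancelAll = readMVar running >>= mapM_ uninterruptibleCancel
  forever (accept >>= spawn) `finally` cancelAll

-- Usage sketch: returns (handlers started, handlers finalized).
demo :: IO (Int, Int)
demo = do
  started <- newIORef 0
  finalized <- newIORef 0
  let bump ref = atomicModifyIORef' ref (\n -> (n + 1, ()))
      handler () = bracket_ (bump started) (bump finalized) (threadDelay 10000000)
  -- A fake "accept" that yields a connection every 50 ms.
  server <- async (serveWith (threadDelay 50000) handler)
  threadDelay 300000
  cancel server -- returns once the server thread and its cleanup are done
  (,) <$> readIORef started <*> readIORef finalized
```

Because finished handlers remove themselves, the registry only holds live asyncs, and cancelling the thread that runs serveWith (not just main) takes the handlers down with it. The trade-off is a shared MVar touched on every connection, which seems acceptable for a sketch but is an assumption on my part.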
Also, does your implementation take into account that if I kill the thread running loop then all children that were spawned with forkIOWithUnmask and are still running should also be terminated? It seems like it only propagates from children to parent.
I assumed you’d run this on the main thread, where it does behave that way. But you’re right: it won’t work out of the box in the general case. You can still get the desired behavior by collecting the ThreadIds into an MVar/TMVar and killing them when the loop exits.
Neither of your solutions propagates the cancellation of the thread running the forever loop. It works because you’re using main, but it does not work in the general case (e.g. try running multiple threads in main, with one of them being the one doing the forever $ do ... loop).
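For reference, collecting ThreadIds in an MVar could look roughly like the sketch below, using only base. withChildren and demo are hypothetical names, and the sketch assumes spawn is only called from the thread that runs the body. Treat it as an illustration of the idea rather than a hardened implementation.

```haskell
import Control.Concurrent
import Control.Exception
import Control.Monad (forever)
import Data.List (delete)
import System.Timeout (timeout)

-- Hypothetical helper: the body gets a `spawn` function. Children that fail
-- rethrow in the parent; when the body exits for any reason (including
-- killThread on the parent), all children still running are killed.
withChildren :: ((IO () -> IO ThreadId) -> IO a) -> IO a
withChildren body = do
  parent <- myThreadId
  children <- newMVar []
  let spawn action =
        -- The MVar is held while forking, so the child cannot remove
        -- itself from the list before it has been added.
        modifyMVar children $ \tids -> do
          tid <- forkIOWithUnmask $ \unmask -> do
            result <- try (unmask action)
            me <- myThreadId
            modifyMVar_ children (pure . delete me)
            case result of
              Left e
                -- Being killed by the parent is not a failure to report.
                | Just ThreadKilled <- fromException e -> pure ()
                | otherwise -> throwTo parent e
              Right () -> pure ()
          pure (tid : tids, tid)
      -- Uninterruptible so a failing child cannot cut the cleanup short.
      killAll = uninterruptibleMask_ (readMVar children >>= mapM_ killThread)
  body spawn `finally` killAll

-- Usage sketch: kill the "server" thread and check that its child's
-- finalizer ran.
demo :: IO Bool
demo = do
  finalized <- newEmptyMVar
  server <- forkIO $ withChildren $ \spawn -> do
    _ <- spawn (threadDelay 10000000 `finally` putMVar finalized ())
    forever (threadDelay 100000)
  threadDelay 200000
  killThread server
  r <- timeout 1000000 (takeMVar finalized)
  pure (r == Just ())
```

Note that killThread only waits until the exception has been raised in the child, not until the child's finalizers have completed, which is why demo waits on the MVar with a timeout.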
I’m having a hard time coming up with a smaller example to show why the proposed approach does not work, but compare the following implementations, one using async as you propose, and the other using ki. Notice how the async implementation has handle threads running even after the server thread has been canceled while ki properly stops them all:
{-# LANGUAGE NumericUnderscores #-}
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async
import Control.Exception
import Control.Monad
import Data.IORef
import Ki
import Prelude hiding (id)

data Conn = Conn

awaitConn :: IO Conn
awaitConn = do
  threadDelay 100_000
  return Conn

handleConn :: Int -> IO ()
handleConn id = do
  threadDelay 2_000_000
  putStrLn $ "> " ++ show id
  return ()

finalizer :: Int -> IO ()
finalizer id = do
  putStrLn $ "< " ++ show id
  return ()

serverAsync :: IO a
serverAsync = do
  ref <- newIORef 0
  forever $ do
    id <- readIORef ref
    _ <- awaitConn
    modifyIORef' ref (+ 1)
    a <- async $ do
      handleConn id `finally` finalizer id
    link a

serverKi :: IO a
serverKi = do
  ref <- newIORef 0
  scoped $ \scope -> forever $ do
    id <- readIORef ref
    _ <- awaitConn
    modifyIORef' ref (+ 1)
    fork scope $ do
      handleConn id `finally` finalizer id

mainAsync :: IO ()
mainAsync = do
  putStrLn "Server started"
  server' <- async (serverAsync `finally` putStrLn "Server stopped")
  link server'
  threadDelay 1_000_000
  cancel server'
  putStrLn "Server cancelled"
  threadDelay 1_000_000
  putStrLn "Main thread finished"
  threadDelay 5_000_000

mainKi :: IO ()
mainKi = do
  putStrLn "Server started"
  server' <- async (serverKi `finally` putStrLn "Server stopped")
  link server'
  threadDelay 1_000_000
  cancel server'
  putStrLn "Server cancelled"
  threadDelay 1_000_000
  putStrLn "Main thread finished"
  threadDelay 5_000_000

main :: IO ()
main = mainAsync