Dynamic number of asyncs

I’m currently working on a piece of code and I could use some help.

Essentially, what I have is a main thread that accepts connections and delegates work to a handler:


data Conn
acceptConn :: IO Conn
handle :: Conn -> IO ()

main :: IO ()
main = forever $ do
   conn <- acceptConn
   handle conn

What I would like to have is some infrastructure that:

  • Makes handle run asynchronously, that is, in a separate thread.
  • Ensures that any exception not caught by handle is propagated to main.
  • Ensures that if main is interrupted (e.g. by Ctrl+C), all threads that are still running are interrupted as well.

Notice that functions like mapConcurrently cannot be used here: the number of connections is not fixed, so we cannot construct a [Conn] up front (well, maybe with lazy IO).

Using the async package I came up with the following solution:

main :: IO ()
main = do
  fix $ \loop -> do
    conn <- acceptConn
    withAsync (handle conn) (\a -> loop `finally` wait a)

This seems to work and checks all the boxes: new connections are accepted immediately, exceptions get propagated and interruptions properly kill any running tasks.

Unfortunately, I think there is an issue with \a -> loop `finally` wait a: each iteration nests one more finally, building an ever-growing chain wait aN `finally` wait a(N-1) `finally` ... wait a0 that is never released while the loop runs, so memory use grows with every connection. We would need to wait only for those asyncs that are still running, but I’m not sure how.

Either way, I suspect I’m approaching the problem from the wrong direction, so I was wondering if anyone knows how to tackle this kind of task using the async package (preferably without additional packages).


Would this work?

main = do
   conn <- acceptConn
   concurrently_ (handle conn) main 

Edit: No, that also seems to leak.

Untested, but what about link?

main =
  fix $ \loop -> do
    conn <- awaitConn
    withAsync (handle conn) (\a -> link a >> loop)

Hmm, maybe not. ‘Nesting many withAsync calls requires linear memory’, so maybe no fiddling inside the withAsync handler will do the trick.


Wouldn’t the most boring way around be enough? I feel like I’m missing something.

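import Control.Concurrent (forkIOWithUnmask, myThreadId, throwTo)
import Control.Exception (SomeException, catch, mask_)

loop :: IO ()
loop = do
  conn <- acceptConn

  this <- myThreadId
  -- Fork with asynchronous exceptions masked so the catch handler is installed
  -- before any of them can reach the child; only handle itself runs unmasked.
  _ <- mask_ $
    forkIOWithUnmask $ \unmask ->
      unmask (handle conn)
        `catch` \ex -> throwTo this (ex :: SomeException)

  loop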

If you take a look at https://hackage.haskell.org/package/streaming-commons-0.2.3.0/docs/src/Data.Streaming.Network.html#runTCPServerWithHandle, you can see that it uses a mix of forkIO and mask, but I wanted a succinct solution built on the async package that avoids such low-level operations.

Also, does your implementation take into account that if I kill the thread running loop then all children that were spawned with forkIOWithUnmask and are still running should also be terminated? It seems like it only propagates from children to parent.


That still leaks according to my tests. It crashes after a few seconds when running a dummy implementation with +RTS -M1g. This is the dummy:

data Conn = Conn
{-# NOINLINE awaitConn #-}
awaitConn :: IO Conn
awaitConn = pure Conn

{-# NOINLINE handle #-}
handle :: Conn -> IO ()
handle Conn = pure ()

Edit: Ah, just saw your edit.


I assumed you’d run this on the main thread, where it does behave that way, but you’re right: it won’t work out of the box in the general case. You can still get the desired behavior by collecting the ThreadIds into an MVar/TMVar.
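To make the ThreadId-collecting idea concrete, here is a minimal sketch that uses only base. The name serve is a hypothetical helper, not something from async or any other library. As simplifying assumptions, it keeps the ThreadIds in a plain list (a Set would scale better), it does not release a connection that was accepted but never handed to a handler, and it makes no attempt to cover every race during shutdown.

```haskell
import Control.Concurrent
import Control.Exception
import Control.Monad (forever)
import Data.List (delete)

-- | Hypothetical helper: accept connections forever, one handler thread each.
-- Handler exceptions are rethrown in the calling thread; if the calling
-- thread is interrupted, the handlers that are still running are killed.
serve :: IO conn -> (conn -> IO ()) -> IO a
serve accept handler = do
  parent   <- myThreadId
  children <- newMVar []  -- ThreadIds of the handlers that are still running
  let spawn conn =
        -- Fork while holding the MVar, with async exceptions masked, so a
        -- child is always registered before it can deregister itself.
        mask_ $ modifyMVar_ children $ \tids -> do
          tid <- forkIOWithUnmask $ \unmask -> do
            r  <- try (unmask (handler conn)) :: IO (Either SomeException ())
            me <- myThreadId
            modifyMVar_ children (pure . delete me)
            case r of
              -- Being killed by the parent is not an error worth reporting.
              Left e | fromException e /= Just ThreadKilled -> throwTo parent e
              _ -> pure ()
          pure (tid : tids)
      killAll = readMVar children >>= mapM_ killThread
  forever (accept >>= spawn) `finally` killAll
```

With the definitions from the first post, serve acceptConn handle could stand in for the accept loop. Killing the thread that runs serve then also kills the handlers that are still running, regardless of whether that thread is the main thread.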

I think we should keep the use of link but dispense with the nested withAsyncs.

Instead of that, we should keep an explicit set of Async values in a mutable reference. The main thread can cancel the pending asyncs as needed.

Something like this (I haven’t actually tested it):

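{-# LANGUAGE TypeApplications #-}

import Control.Concurrent.Async (Async, async, link, uninterruptibleCancel)
import Control.Exception (bracketOnError, finally, onException)
import Data.Foldable (for_)
import Data.IORef (atomicModifyIORef', newIORef, readIORef)
import qualified Data.Set
import System.IO (fixIO)

-- Assumes awaitConn, handle and a releaseConn :: Conn -> IO () are in scope.
main :: IO ()
main = do
    ref <- newIORef $ Data.Set.empty @(Async ())
    -- Handler threads update the set concurrently, so updates must be atomic.
    let update f = atomicModifyIORef' ref $ \s -> (f s, ())
        register theAsync = update $ Data.Set.insert theAsync
        finallyDeregister theAsync = flip finally $
            update $ Data.Set.delete theAsync
        launch conn = fixIO $ \theAsync -> async $ finallyDeregister theAsync $ handle conn
        loop = do
            bracketOnError awaitConn releaseConn $ \conn -> do
                bracketOnError (launch conn) uninterruptibleCancel $ \theAsync -> do
                    link theAsync
                    register theAsync
            loop
        cleanup = do
            asyncs <- readIORef ref
            for_ asyncs uninterruptibleCancel
    loop `onException` cleanup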

Consider ditching async for ki. Untested, but this should just work:

{-# LANGUAGE BlockArguments #-}
import Control.Monad (forever)
import Ki (fork, scoped)

main :: IO ()
main = scoped \scope -> forever do
  conn <- acceptConn
  fork scope (handle conn)

I was about to suggest the same thing! Here’s a link to the Ki library:

https://hackage.haskell.org/package/ki-1.0.1.2/docs/Ki.html


Maybe like this:

import Control.Concurrent (forkIO, threadDelay, myThreadId, throwTo)
import Control.Exception (catch, SomeException)
import Control.Monad (forever)

data Conn = Conn

acceptConn :: IO Conn
acceptConn = do
  threadDelay 5000000
  pure Conn

handle :: Conn -> IO ()
handle conn = do
  print 0
  threadDelay 10000000
  print (div 1 0)

main :: IO ()
main = do
  parent <- myThreadId
  forever $ do
    conn <- acceptConn
    forkIO $ catch (handle conn) (throwTo parent :: SomeException -> IO ())

Or like this:


import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (link, async)
import Control.Monad (forever)

data Conn = Conn

acceptConn :: IO Conn
acceptConn = do
  threadDelay 5000000
  pure Conn

handle :: Conn -> IO ()
handle conn = do
  print 0
  threadDelay 10000000
  print (div 1 0)

main :: IO ()
main = forever $ do
  conn <- acceptConn
  task <- async (handle conn)
  link task

I think this is the right approach. Unfortunately, it seems the async library is not powerful enough to express this behavior.

Neither of your solutions propagates the cancellation of the thread running the forever loop. They work because the loop runs in main, but not in the general case (e.g. try running multiple threads in main, with one of them running the forever $ do ... loop).

Like this?

import Control.Concurrent (threadDelay, forkIO)
import Control.Concurrent.Async (link, async)
import Control.Monad (forever)

data Conn = Conn

acceptConn :: IO Conn
acceptConn = do
  threadDelay 5000000
  pure Conn

handle :: Conn -> IO ()
handle conn = do
  print 0
  threadDelay 10000000
  print (div 1 0)

test = forever $ do
  conn <- acceptConn
  task <- async (handle conn)
  link task

main :: IO ()
main = do
  task <- async test
  link task
  putStrLn "hello"
  threadDelay 5000000
  putStrLn "world"
  main

I’m having a hard time coming up with a smaller example to show why the proposed approach does not work, but compare the following implementations, one using async as you propose and the other using ki. Notice how the async implementation still has handleConn threads running after the server thread has been canceled, while ki properly stops them all:

{-# LANGUAGE NumericUnderscores #-}

import Control.Concurrent (threadDelay)
import Control.Concurrent.Async
import Control.Exception
import Control.Monad
import Data.IORef
import Ki
import Prelude hiding (id)

data Conn = Conn

awaitConn :: IO Conn
awaitConn = do
  threadDelay 100_000
  return Conn

handleConn :: Int -> IO ()
handleConn id = do
  threadDelay 2_000_000
  putStrLn $ "> " ++ show id
  return ()

finalizer :: Int -> IO ()
finalizer id = do
  putStrLn $ "< " ++ show id
  return ()

serverAsync :: IO a
serverAsync = do
  ref <- newIORef 0
  forever $ do
    id <- readIORef ref
    _ <- awaitConn
    modifyIORef' ref (+ 1)
    a <- async $ do
      handleConn id `finally` finalizer id
    link a

serverKi :: IO a
serverKi = do
  ref <- newIORef 0
  scoped $ \scope -> forever $ do
    id <- readIORef ref
    _ <- awaitConn
    modifyIORef' ref (+ 1)
    fork scope $ do
      handleConn id `finally` finalizer id

mainAsync :: IO ()
mainAsync = do
  putStrLn "Server started"
  server' <- async (serverAsync `finally` putStrLn "Server stopped")
  link server'

  threadDelay 1_000_000

  cancel server'
  putStrLn "Server cancelled"

  threadDelay 1_000_000
  putStrLn "Main thread finished"

  threadDelay 5_000_000

mainKi :: IO ()
mainKi = do
  putStrLn "Server started"
  server' <- async (serverKi `finally` putStrLn "Server stopped")
  link server'

  threadDelay 1_000_000

  cancel server'
  putStrLn "Server cancelled"

  threadDelay 1_000_000
  putStrLn "Main thread finished"

  threadDelay 5_000_000

main :: IO ()
main = mainAsync