Bounded number of asynchronous jobs

I have a list of jobs :: [IO ()] and I would like to run them asynchronously, with the caveat that I can run at most 5 of them at the same time.

Is there an available function in the ecosystem which would allow me to do something like

runBoundedAsync :: Int -> [IO ()] -> IO ()

where the actions are run asynchronously while ensuring that no more than 5 of them run at the same time?


pooledMapConcurrentlyN from the unliftio package perhaps?

I’ve never personally used this function but I’ve seen it get recommended before and there are benchmarks like this one that show that pooledMapConcurrently is the fastest.
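For what it's worth, the signature asked for in the question is close to a one-liner with the unit-returning variant pooledMapConcurrentlyN_ from UnliftIO.Async. A minimal sketch, assuming unliftio is a dependency (countRuns is just a hypothetical usage helper, not part of any library):

```haskell
import Data.IORef (atomicModifyIORef', newIORef, readIORef)
import UnliftIO.Async (pooledMapConcurrentlyN_)

-- | Run all the actions, never more than @n@ at once, and return once every
-- one of them has finished.
runBoundedAsync :: Int -> [IO ()] -> IO ()
runBoundedAsync n = pooledMapConcurrentlyN_ n id

-- | Hypothetical usage: run @k@ jobs that each bump a shared counter,
-- five at a time, and return the final count.
countRuns :: Int -> IO Int
countRuns k = do
  counter <- newIORef (0 :: Int)
  runBoundedAsync 5 (replicate k (atomicModifyIORef' counter (\c -> (c + 1, ()))))
  readIORef counter
```

As far as I know it behaves like the async combinators it is built on: it waits for every job, and if one throws, the others are cancelled and the exception is re-thrown. Check the unliftio docs before relying on that.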


I’ve previously used async-pool with success:

I think it addresses that exactly, without needing the whole unliftio business.


Namely, @marcosh, you’d use something like mapTasks_, which has almost exactly the signature you’re looking for (you just need to create a task group first):

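import Control.Concurrent.Async.Pool (mapTasks_, withTaskGroup)
main :: IO ()
main = withTaskGroup 5 $ \tg ->
  mapTasks_ tg [io1, io2, io3, io4]  -- io1 .. io4 stand for your jobs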

If the tasks don’t return anything and they are bare IO, you could build something simple around QSem from Control.Concurrent.QSem.

EDIT:

So this appears to work:

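{-# LANGUAGE ImportQualifiedPost #-}
import Control.Monad (forM_)
import Control.Concurrent (forkIO)
import Control.Concurrent.QSem qualified as QSem

-- Fork one thread per job; each thread blocks until a semaphore slot is free.
-- Note: runJobs returns as soon as the threads are forked, without waiting for the jobs.
runJobs :: Int -> [IO ()] -> IO ()
runJobs maxJobs jobs = do
  sem <- QSem.newQSem maxJobs
  forM_ jobs $ \job -> forkIO $ do
    QSem.waitQSem sem
    job
    QSem.signalQSem sem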

Or he could just use pooledMapConcurrentlyN (or other existing options) instead of trying to reinvent the wheel.

Shouldn’t the job be inside a bracket to ensure the semaphore is signaled even if the action throws an exception?
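A sketch of the QSem version with the slot handed back via bracket_, so a throwing job cannot leak it. It also uses forkFinally so that runJobs waits for every job and then re-throws the first failure (in list order), instead of returning while jobs are still running. Only base is assumed; the "wait for all, then re-throw" policy is my own choice, not something from the thread.

```haskell
import Control.Concurrent
import Control.Exception (bracket_, throwIO)
import Control.Monad (forM, forM_)

-- | Run every job with at most @maxJobs@ running at once, wait for all of
-- them, then re-throw the first failure (in list order), if any.
runJobs :: Int -> [IO ()] -> IO ()
runJobs maxJobs jobs = do
  sem <- newQSem maxJobs
  results <- forM jobs $ \job -> do
    result <- newEmptyMVar
    -- bracket_ hands the slot back even if the job throws;
    -- forkFinally records the job's outcome either way
    _ <- forkFinally (bracket_ (waitQSem sem) (signalQSem sem) job) (putMVar result)
    pure result
  outcomes <- mapM takeMVar results
  forM_ outcomes (either throwIO pure)
```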


pooledMapConcurrentlyN only works in simple situations, so QSem is definitely something that should be in a Haskeller’s engineering toolkit.

For instance, I wanted my tool to respect a -j param when generating an import graph pruned to a specific module. The algorithm is:

  1. Parse imports from the file we are pruning to.
  2. Recursively parse all its imports.
  3. Repeat until we hit the leaves.

I want a stable -j across that concurrency. Using pooledMapConcurrentlyN naively to do step 2 would violate -j, since each recursive call would start its own pool of up to N workers, so the total would multiply with the depth of the graph. So a QSem is a better fit. (Code here)
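The linked code isn't reproduced here, but the key idea can be sketched: one QSem shared by the whole traversal, held only while a module is actually being parsed and released before waiting on its imports. Holding it across that wait would deadlock with -j1, since a parent would sit on the only slot its children need. crawl and its parse callback are hypothetical names; only base and containers are assumed.

```haskell
import Control.Concurrent
import Control.Exception (bracket_, throwIO)
import Control.Monad (forM, forM_)
import Data.Set (Set)
import qualified Data.Set as Set

-- | Visit every module reachable from @root@, calling @parse@ exactly once per
-- module, with at most @jobs@ calls to @parse@ in flight at any moment.
crawl :: Int -> (String -> IO [String]) -> String -> IO (Set String)
crawl jobs parse root = do
  sem <- newQSem jobs
  seen <- newMVar (Set.singleton root)
  let visit m = do
        -- hold a slot only while parsing, never while waiting on children
        imports <- bracket_ (waitQSem sem) (signalQSem sem) (parse m)
        -- atomically claim the imports no other thread has claimed yet
        fresh <- modifyMVar seen $ \s -> do
          let new = Set.fromList imports `Set.difference` s
          pure (s `Set.union` new, Set.toList new)
        -- visit the fresh imports concurrently, then wait for each of them;
        -- the first failure reached (in list order) is re-thrown
        dones <- forM fresh $ \i -> do
          done <- newEmptyMVar
          _ <- forkFinally (visit i) (putMVar done)
          pure done
        forM_ dones $ \d -> takeMVar d >>= either throwIO pure
  visit root
  readMVar seen
```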

And I wouldn’t really consider it reinventing the wheel. Programming with semaphores is a pretty fundamental engineering skill that every programmer who does concurrency should be able to do 🙂


Yup. For any production/work use, you’d probably look into what you can find in the libraries you already use and depend on. E.g., use async instead of forkIO, maybe even a resource manager that goes beyond what bracket does, and so on.
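As a sketch of the async suggestion, assuming the async package: mapConcurrently_ already waits for all the jobs and, if one throws, cancels the others and re-throws, so only the bound needs adding, with a QSem and bracket_. countRuns is a hypothetical usage helper.

```haskell
import Control.Concurrent.Async (mapConcurrently_)
import Control.Concurrent.QSem (newQSem, signalQSem, waitQSem)
import Control.Exception (bracket_)
import Data.IORef (atomicModifyIORef', newIORef, readIORef)

-- | One thread per job via async; a QSem keeps at most @maxJobs@ of them
-- running. Returns once all jobs are done; if one throws, mapConcurrently_
-- cancels the rest and re-throws.
runJobs :: Int -> [IO ()] -> IO ()
runJobs maxJobs jobs = do
  sem <- newQSem maxJobs
  mapConcurrently_ (bracket_ (waitQSem sem) (signalQSem sem)) jobs

-- | Hypothetical usage: run @k@ jobs that each bump a shared counter.
countRuns :: Int -> IO Int
countRuns k = do
  counter <- newIORef (0 :: Int)
  runJobs 5 (replicate k (atomicModifyIORef' counter (\c -> (c + 1, ()))))
  readIORef counter
```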

Though, if one is interested in reinventing the wheel, QSem is itself implemented in terms of MVars and exception masking, so one might look into using MVars and exceptions directly.
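For the curious, a minimal counting semaphore built on MVars and masking might look like the following. It is a simplified sketch, not base's actual QSem (which, among other things, avoids the O(n) queue append used here); Sem, newSem, waitSem, signalSem, withSem and peakConcurrency are all hypothetical names. The subtle part is a waiter that gets killed while blocked: it must either leave the queue or, if it was already woken, pass its slot on to someone else.

```haskell
import Control.Concurrent
import Control.Exception (bracket_, mask_, onException, uninterruptibleMask_)
import Control.Monad (forM)

-- | A counting semaphore: free slots plus a FIFO queue of blocked waiters.
newtype Sem = Sem (MVar (Int, [MVar ()]))

newSem :: Int -> IO Sem
newSem n = Sem <$> newMVar (n, [])

-- | Hand one slot back: wake the oldest waiter if any, else bump the count.
release :: (Int, [MVar ()]) -> IO (Int, [MVar ()])
release (n, [])     = pure (n + 1, [])
release (n, w : ws) = putMVar w () >> pure (n, ws)  -- w is empty, so this never blocks

waitSem :: Sem -> IO ()
waitSem (Sem m) = mask_ $ do
  mw <- modifyMVar m $ \(n, ws) ->
    if n > 0
      then pure ((n - 1, ws), Nothing)
      else do
        w <- newEmptyMVar
        pure ((n, ws ++ [w]), Just w)
  case mw of
    Nothing -> pure ()
    -- takeMVar is interruptible even under mask_, so clean up if killed while blocked
    Just w -> takeMVar w `onException` uninterruptibleMask_ (cancel w)
  where
    cancel w = modifyMVar_ m $ \(n, ws) ->
      if w `elem` ws
        then pure (n, filter (/= w) ws)  -- still queued: just leave the queue
        else release (n, ws)             -- already woken: pass the slot on

signalSem :: Sem -> IO ()
signalSem (Sem m) = modifyMVar_ m release

withSem :: Sem -> IO a -> IO a
withSem s = bracket_ (waitSem s) (signalSem s)

-- | Run @k@ short jobs through a semaphore of size @n@ and report the peak
-- number that were inside it at once.
peakConcurrency :: Int -> Int -> IO Int
peakConcurrency n k = do
  sem <- newSem n
  active <- newMVar (0, 0)  -- (currently inside, peak)
  dones <- forM [1 .. k] $ \_ -> do
    done <- newEmptyMVar
    _ <- forkFinally (withSem sem (job active)) (\_ -> putMVar done ())
    pure done
  mapM_ takeMVar dones
  snd <$> readMVar active
  where
    job active = do
      modifyMVar_ active (\(c, p) -> pure (c + 1, max p (c + 1)))
      threadDelay 1000
      modifyMVar_ active (\(c, p) -> pure (c - 1, p))
```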