Async job queues? [solved: use pooledMapConcurrently]

Hi!

I’m looking for a package or approach to solve this scenario.

We have N jobs :: IO (), and M < N capabilities (hardware threads). As it happens, the runtime of the jobs is wildly variable, ranging from hours to weeks, due to a complex set of constraints.

Each job is independent, we don’t care about the order of the results, and the running times of the jobs are unknown up front.

After some tinkering with async and timeout threads I came up with something a bit too complex for my taste.


What about the pooled.*Concurrently set of functions? It sounds like you don’t need to reschedule anything and just want to wait until all the jobs have finished. If so, I think this is enough.
https://hackage.haskell.org/package/unliftio-0.2.25.0/docs/UnliftIO-Async.html#g:9


I think in your case you would just want pooledMapConcurrently, which uses your M capabilities as the number of worker threads and runs them over your list of N jobs.
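A minimal sketch of what that could look like, assuming the jobs return () and you want to pass the thread count yourself. pooledMapConcurrentlyN_ is the variant from the same unliftio family that takes the limit explicitly and discards results; runJobs and the counter in main are made-up names for illustration only.

```haskell
module Main where

import UnliftIO (atomicModifyIORef', newIORef, pooledMapConcurrentlyN_, readIORef)

-- Hypothetical helper: run every job on at most `limit` worker threads
-- and throw the results away.
runJobs :: Int -> [IO ()] -> IO ()
runJobs limit = pooledMapConcurrentlyN_ limit id

main :: IO ()
main = do
  finished <- newIORef (0 :: Int)
  -- Stand-in job: bump a shared counter instead of doing real work.
  let job = atomicModifyIORef' finished (\k -> (k + 1, ()))
  runJobs 2 (replicate 5 job)
  print =<< readIORef finished -- prints 5
```

pooledMapConcurrently itself takes the limit from getNumCapabilities, so the explicit N variant is only needed when you want a different number.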

Thanks! But to me this looks like it will wait for the whole batch to end before scheduling the next one, which in my case will gradually leave threads idle.

ah! I understand the problem. I guess the ordering of outputs is also not important?

no, each job writes to a different file, independently.

{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE NumericUnderscores #-}

module Main where

import UnliftIO
import UnliftIO.Concurrent

work :: [IO a] -> IO [a]
work jobs = do
  sem <- newQSem =<< getNumCapabilities
  go sem [] jobs
 where
  -- acc holds the Asyncs newest-first, so results come back in reverse job order
  go _ acc [] = traverse wait acc
  go sem acc (job : jobs) = do
    as <- do
      waitQSem sem
      async do
        job `finally` signalQSem sem

    go sem (as : acc) jobs

main :: IO ()
main = do
  setNumCapabilities 2
  print =<< getNumCapabilities
  print =<< work (threads [n ^ 2 | n <- [10, 5, 1]])
 where
  threads = map \n -> do
    tid <- myThreadId
    let m = n * 100_000
    putStrLn (show tid <> ": sleeping " <> show m)
    threadDelay m
    putStrLn (show tid <> ": waking up")
    pure n

seems to work:

2
ThreadId 21: sleeping 10000000
ThreadId 22: sleeping 2500000
ThreadId 22: waking up
ThreadId 23: sleeping 100000
ThreadId 23: waking up
ThreadId 21: waking up
[1,25,100]

work:

  • provide a list of jobs
  • make a new semaphore which represents the available threads
  • start with zero queued threads

go:

  • if no further jobs available, wait for all the async jobs remaining
  • if further jobs available:
    • wait for a capability to become available (waitQSem)
    • queue the job
    • free the capability if the job fails or completes
  • recursively call with one less job

As you can see in my test, the longest-running thread (21) starts alongside the second-longest one (22). Thread 22 finishes first, which frees a slot, so thread 23 starts, sleeps and wakes up, and finally thread 21 wakes up.
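The steps above can also be sketched in a variant that reports each job's outcome instead of rethrowing the first failure. This is an assumption-laden sketch, not the code from the post: workCatch and its explicit limit parameter are hypothetical, it uses waitCatch so a failing job shows up as a Left, and it returns results in submission order.

```haskell
{-# LANGUAGE BlockArguments #-}

module Main where

import Data.Traversable (for)
import UnliftIO
import UnliftIO.QSem (newQSem, signalQSem, waitQSem)

-- Hypothetical variant of `work`: same semaphore scheme, but every job's
-- outcome is returned as Left (exception) or Right (result).
workCatch :: Int -> [IO a] -> IO [Either SomeException a]
workCatch limit jobs = do
  sem <- newQSem limit
  handles <- for jobs \job -> do
    waitQSem sem -- block until a slot is free
    async (job `finally` signalQSem sem) -- release the slot on success or failure
  traverse waitCatch handles -- collect outcomes in submission order

main :: IO ()
main = do
  results <- workCatch 2 [pure 1, throwString "boom", pure (3 :: Int)]
  print [either (const Nothing) Just r | r <- results]
  -- prints [Just 1,Nothing,Just 3]
```

The failing job still frees its slot because of finally, so the remaining jobs are not starved.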


It so happens that I wrote this recently: Transparently hand off jobs to a pool of workers. · GitHub

Since you don’t need the results you can simplify it a fair bit.


ah, waitQSem was what I was missing. Thanks a lot!


I wonder what happens if you just traverse with asyncBound and wait for all Asyncs? (doesn’t seem to work?)

this too looks good, thanks! Are you planning to publish it as a package as well?

Hadn’t planned on it—I figured something pretty close would already be available somewhere. But if not (or it’s too hard to find) then perhaps.

cleaned it up a bit and added some utility functions:
https://bin.mangoiv.com/note?id=8fa0d4af-8db5-4af6-b334-6a39b1d0a77d

IIUC, I have a function that does something like that here. Also the input jobs can be produced incrementally.

As for a more mature library, perhaps the concurrency features of streamly could do the trick? In particular, I’m looking at the docs for (|$), but it’s not clear to me if the results are returned in the original order or not. In your case you would like your results to appear as the jobs are completed.


Does it? I have always thought the use case for the pooled*Concurrently family of functions was this exact scenario. In fact, I have modified the code @MangoIV provided to include a pooledMapConcurrently variant, and the two seem to behave identically with respect to scheduling. The output order differs, but you mentioned that it is not important here:

{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE NumericUnderscores #-}

module Main where

import UnliftIO
import UnliftIO.Concurrent
import System.Environment (getArgs)
import Data.Functor (($>))

work :: [IO a] -> IO [a]
work jobs = do
  sem <- newQSem =<< getNumCapabilities 
  go sem [] jobs
 where
  go _ acc [] = traverse wait acc
  go sem acc (job : jobs) = do
    as <- do
      waitQSem sem
      async do
        job `finally` signalQSem sem

    go sem (as : acc) jobs

work2 :: [IO a] -> IO [a]
work2 = pooledMapConcurrently id

main :: IO ()
main = do
  [variant] <- getArgs
  setNumCapabilities 2
  print =<< getNumCapabilities 
  worker <-
    if variant == "1"
       then putStrLn "handrolled variant" $> work
       else putStrLn "pooledMapConcurrently variant" $> work2
  print =<< worker (threads [n ^ 2 | n <- [10, 5, 1]])
 where
  threads = map \n -> do
    tid <- myThreadId
    let m = n * 100_000
    putStrLn (show tid <> ": sleeping " <> show m)
    threadDelay m
    putStrLn (show tid <> ": waking up")
    pure n

and the outputs:

$ cabal exec tmp-map-concurrently 1
2
handrolled variant
ThreadId 8: sleeping 2500000
ThreadId 7: sleeping 10000000
ThreadId 8: waking up
ThreadId 9: sleeping 100000
ThreadId 9: waking up
ThreadId 7: waking up
[1,25,100]

$ cabal exec tmp-map-concurrently 2
2
pooledMapConcurrently variant
ThreadId 8: sleeping 2500000
ThreadId 7: sleeping 10000000
ThreadId 8: waking up
ThreadId 8: sleeping 100000
ThreadId 8: waking up
ThreadId 7: waking up
[100,25,1]

Interesting, thanks for investigating! This property was not obvious from the documentation.

Edit: indeed, the behaviour is documented only on a function that is not exported: https://hackage.haskell.org/package/unliftio-0.2.25.0/docs/src/UnliftIO.Internals.Async.html#pooledConcurrently
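For anyone wondering why no thread sits idle: the idea is that a fixed number of workers each pull the next job from a shared queue as soon as they finish the previous one. Below is a rough sketch of that idea, not a copy of the library's code; pullPool_ is a made-up name, and the sketch ignores results entirely.

```haskell
module Main where

import UnliftIO

-- Hypothetical helper: `n` workers share one queue of jobs. Each worker
-- atomically pops the next job, runs it, and repeats until the queue is empty.
pullPool_ :: Int -> [IO ()] -> IO ()
pullPool_ n jobs = do
  queue <- newIORef jobs
  let worker = do
        next <- atomicModifyIORef' queue $ \js -> case js of
          [] -> ([], Nothing)
          (j : rest) -> (rest, Just j)
        case next of
          Nothing -> pure () -- queue drained, this worker stops
          Just job -> job >> worker -- run the job, then fetch another
  replicateConcurrently_ n worker

main :: IO ()
main = do
  done <- newIORef (0 :: Int)
  pullPool_ 2 [atomicModifyIORef' done (\k -> (k + 1, ())) | _ <- [1 .. 10 :: Int]]
  print =<< readIORef done -- prints 10
```

Because a worker only stops when the queue is empty, a long job occupies one worker while the others keep draining the rest, which matches the outputs shown earlier in the thread.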
