I’m looking for a package or approach to solve this scenario.
We have N jobs :: IO (), and M < N capabilities (hardware threads). As it happens, the runtime of the jobs is wildly variable, ranging from hours to weeks, due to a complex set of constraints.
Each job is independent, we don’t care about the order of the results, and the job running times are unknown up front.
After some tinkering with async and timeout threads I came up with something a bit too complex for my taste.
Thanks! But to me this looks like it will wait for the whole batch to end before scheduling the next one, which in my case will gradually leave threads idle.
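For concreteness, the pattern I’m worried about is something like this (my own reconstruction, assuming a chunked mapConcurrently was meant; chunksOf is from the split package, and this is not the actual suggested code):

import Control.Concurrent.Async (mapConcurrently)
import Data.List.Split (chunksOf)

-- Hypothetical batching approach: every chunk has to finish completely
-- before the next chunk starts, so one slow job in a chunk leaves the
-- other capabilities idle until it is done.
workBatched :: Int -> [IO a] -> IO [a]
workBatched m jobs = concat <$> traverse (mapConcurrently id) (chunksOf m jobs)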
{-# LANGUAGE BlockArguments #-}

module Main where

import UnliftIO
import UnliftIO.Concurrent

work :: [IO a] -> IO [a]
work jobs = do
  sem <- newQSem =<< getNumCapabilities
  go sem [] jobs
  where
    go _ acc [] = traverse wait acc
    go sem acc (job : jobs) = do
      as <- do
        waitQSem sem
        async do
          job `finally` signalQSem sem
      go sem (as : acc) jobs

main :: IO ()
main = do
  setNumCapabilities 2
  print =<< getNumCapabilities
  print =<< work (threads [n ^ 2 | n <- [10, 5, 1]])
  where
    threads = map \n -> do
      tid <- myThreadId
      let m = n * 100_000
      putStrLn (show tid <> ": sleeping " <> show m)
      threadDelay m
      putStrLn (show tid <> ": waking up")
      pure n
seems to work:
2
ThreadId 21: sleeping 10000000
ThreadId 22: sleeping 2500000
ThreadId 22: waking up
ThreadId 23: sleeping 100000
ThreadId 23: waking up
ThreadId 21: waking up
[1,25,100]
make a new semaphore which represents the available capabilities
start with an empty list of queued asyncs
go:
  if no further jobs are available, wait for all the remaining asyncs
  if further jobs are available:
    wait for a capability to become available (waitQSem)
    queue the job as an async
    free the capability when the job completes or fails (finally ... signalQSem)
    recurse with one job fewer
As you can see in my test, the longest-running job (thread 21) starts processing together with the second-longest (thread 22); 22 finishes earlier, so 23 can start and wake up, and finally thread 21 wakes up.
IIUC, I have a function that does something like that here. The input jobs can also be produced incrementally.
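The linked function itself isn’t reproduced here, but the general shape is a fixed pool of workers pulling from a shared queue, which is what makes incremental production possible. A minimal sketch of that idea (names and details are mine, not the linked code):

import Control.Concurrent.Async (replicateConcurrently_)
import Control.Concurrent.STM (TBQueue, atomically, readTBQueue)

-- Minimal sketch, not the linked code: 'm' workers each repeatedly pull
-- the next job from a shared bounded queue and run it. A producer can
-- keep writing 'Just job' while the pool is already running, and finally
-- writes 'm' copies of 'Nothing' to shut the workers down.
workerPool :: Int -> TBQueue (Maybe (IO ())) -> IO ()
workerPool m queue = replicateConcurrently_ m worker
  where
    worker = do
      next <- atomically (readTBQueue queue)
      case next of
        Nothing  -> pure ()          -- poison pill: this worker stops
        Just job -> job >> worker    -- run the job, then ask for the next one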
As for a more mature library, perhaps the concurrency features of streamly could do the trick? In particular, I’m looking at the docs for (|$), but it’s not clear to me whether the results are returned in the original order or not. In your case you would want the results to appear as the jobs complete.
Does it? I have always thought the use case for the pooled*Concurrently family of functions was this exact scenario. In fact, I have modified the code @MangoIV provided to include a pooledMapConcurrently variant, and they seem to behave identically with respect to scheduling. The output order differs, but you mentioned that it is not important here:
{-# LANGUAGE BlockArguments #-}

module Main where

import UnliftIO
import UnliftIO.Concurrent
import System.Environment (getArgs)
import Data.Functor (($>))

work :: [IO a] -> IO [a]
work jobs = do
  sem <- newQSem =<< getNumCapabilities
  go sem [] jobs
  where
    go _ acc [] = traverse wait acc
    go sem acc (job : jobs) = do
      as <- do
        waitQSem sem
        async do
          job `finally` signalQSem sem
      go sem (as : acc) jobs

work2 :: [IO a] -> IO [a]
work2 = pooledMapConcurrently id

main :: IO ()
main = do
  [variant] <- getArgs
  setNumCapabilities 2
  print =<< getNumCapabilities
  worker <-
    if variant == "1"
      then putStrLn "handrolled variant" $> work
      else putStrLn "pooledMapConcurrently variant" $> work2
  print =<< worker (threads [n ^ 2 | n <- [10, 5, 1]])
  where
    threads = map \n -> do
      tid <- myThreadId
      let m = n * 100_000
      putStrLn (show tid <> ": sleeping " <> show m)
      threadDelay m
      putStrLn (show tid <> ": waking up")
      pure n
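To compare the two, run the program with 1 as its first argument for the handrolled variant and anything else for the pooledMapConcurrently one. If the pool size should be decoupled from the RTS capability count, there is also pooledMapConcurrentlyN from UnliftIO.Async, which takes the number of workers explicitly; a small variant (work3 is just an illustrative name):

-- Like work2, but with an explicit pool size instead of relying on
-- getNumCapabilities.
work3 :: Int -> [IO a] -> IO [a]
work3 n = pooledMapConcurrentlyN n id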