Help: Haskell async behavior I don't understand

Hi all!

Haskell is said to be great at concurrency, even though I haven’t really explored that avenue much myself. Recently I heard a podcast where someone described a little challenge/scenario they built to compare structured concurrency across several languages. That person was coming from a Scala background, but there are solutions in many languages.

Haskell was missing, so I thought: that should be easy! It did not turn out to be so easy.

The main Scala implementations seem to use functions for racing several concurrent tasks that only consider a “successfully” finished task to have won the race. The Haskell async package doesn’t seem to have such a function, but no problem, I’ll write my own:

-- Race a list of actions and return the first one to
-- _successfully_ finish. Fails if all actions fail.
raceAnySuccess :: [IO a] -> IO a
raceAnySuccess tasks =
  bracket
  (trace "acquire" $ mapM async tasks)
  (trace "release" cancelMany)
  (trace "waitForSuccess" waitForSuccess)
  where
    waitForSuccess :: [Async a] -> IO a
    waitForSuccess [] = error "All tasks failed!"
    waitForSuccess asyncs = do
      (completed, result) <- waitAnyCatch asyncs
      case result of
        Right val -> trace "success" $ return val
        Left _ ->
          let remaining = trace "filtering" $ filter (/= completed) asyncs
           in trace "recursive" $ waitForSuccess remaining

Great. Now, one scenario requires you to start 10,000 tasks, take the one that finishes first, and cancel the others.

I wrote a little reproducer to highlight my issue: Haskell async - race successful tasks · GitHub

In that example, I start one task that just waits (threadDelay) for 1 second and then finishes. The others run forever, calling threadDelay again and again.
So my expectation: run for 1 second plus whatever overhead is needed to start and cancel n tasks.
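Condensed, the reproducer looks roughly like this (a sketch, not the exact gist; the helper names are made up):

import Control.Concurrent (threadDelay)
import Control.Monad (forever)

main :: IO ()
main = do
  let n = 1000
      finishes    = threadDelay 1000000 >> pure "done"   -- completes after 1 second
      runsForever = forever (threadDelay 1000000)        -- never completes
  result <- raceAnySuccess (finishes : replicate (n - 1) runsForever)
  putStrLn result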
Works fine for 1,000 tasks:

real 0m1,123s
user 0m0,987s
sys 0m0,339s

How long does it take for 5,000 tasks? One second plus five times the overhead, i.e. a linear increase? Nope:

real 2m52,497s
user 2m52,823s
sys 0m0,907s

I’m using GHC 9.6.7 (the ghcup-recommended version) and Cabal 3.12.1.0, on Ubuntu. I ran the tests with:
cabal build && time cabal run all -- +RTS -N6 -RTS
(on an 8-core machine).

It prints the waitForSuccess message and then stalls, with a single core at 100% utilization.
I used the -B RTS option (yes, really) to see whether it was the GC, and I did hear a bing.
Looking at the RTS options docs, I tried -xn (the non-moving GC), but that’s somehow worse:

real 3m38,358s
user 3m39,272s
sys 0m0,654s

According to the docs, the parallel GC options (-qg, -qb, -qn) should all be enabled by default, yet I only see one core engaged.

Is there anything obviously wrong with my code? If not, can anyone suggest some tuning options, or ways to debug this?

Would appreciate any input. Thank you.

3 Likes

The first thing I thought of was to use:

raceAnySuccess = foldr1 (\x y -> either id id <$> race x y)

You can also write it more nicely like this:

raceAnySuccess = runConcurrently . asum . map Concurrently

And that is a lot faster. There is a comment in the source code of race:

-- More ugly than the Async versions, but quite a bit faster.

Maybe that has to do with it. I don’t know why the Async version is so slow.


Edit: this does ignore catching exceptions in the child threads. See my next comment for how to do that: Help: Haskell async behavior I don't understand - #4 by jaror

That does look way nicer and is faster. However, it does not do the same thing. I mentioned the requirement in my text, but failed to include it in the example.

The actual code will perform HTTP requests in the async tasks. Some of them will fail with an exception (due to a connection error), others will fail with a non-200 status code (which I also map to an exception). race will take the first task to finish, successfully or with an error.

I need to discard the errors and wait for the first successful task.
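For context, each task has roughly this shape (a minimal sketch assuming http-client; fetchOk and the error message are placeholders, not my real code):

import Control.Exception (throwIO)
import Control.Monad (unless)
import qualified Data.ByteString.Lazy as LBS
import Network.HTTP.Client (Manager, httpLbs, parseRequest, responseBody, responseStatus)
import Network.HTTP.Types.Status (statusCode)

fetchOk :: Manager -> String -> IO LBS.ByteString
fetchOk mgr url = do
  req  <- parseRequest url
  resp <- httpLbs req mgr       -- connection failures already throw HttpException
  let code = statusCode (responseStatus resp)
  unless (code == 200) $
    throwIO (userError ("unexpected status " ++ show code ++ " from " ++ url))
  pure (responseBody resp)

-- and then: raceAnySuccess (map (fetchOk manager) urls)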

1 Like

You can do that like this:

raceAnySuccess xs = do
  nref <- newIORef (length xs)
  let
    h :: SomeException -> IO a
    h e = do
      n <- atomicModifyIORef' nref (\n -> (n - 1, n - 1))
      if n == 0 then error "All threads failed" else forever
  runConcurrently $ asum $
    map
      (\x -> Concurrently $
        x `catches` [ Handler (\(e :: AsyncCancelled) -> throwIO e)
                    , Handler h ])
      xs
  where
    -- 'forever' just parks a failed slot until the race is decided elsewhere
    forever :: IO a
    forever = threadDelay 1000000 >> forever

Instead of catching and rethrowing AsyncCancelled you can also just catch only the particular errors you’re interested in, if you know your domain.
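For example, if the tasks are HTTP requests, the handler list can mention only the exceptions you actually expect (a sketch assuming http-client’s HttpException plus plain IOException; adjust it to whatever your tasks really throw), and then AsyncCancelled needs no special-casing:

    map
      (\x -> Concurrently $
        x `catches` [ Handler (\(e :: HttpException) -> h (toException e))
                    , Handler (\(e :: IOException)   -> h (toException e)) ])
      xs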

By the way, you might see even better results if you use Conc from unliftio. Edit: I tried this and it also seems to be very slow.

First off, thank you for your solution.

It does work nicely, with one caveat: forever keeps each failed task around (repeatedly calling threadDelay) until the race is over. It would be nicer to just discard it and continue with the rest, but I guess that’s more involved…

It still bugs me that I don’t understand what is happening in my “naive” version. I was trying to write it using mostly high-level constructs and keep it somewhat understandable.
It’s even more frustrating that I don’t even know how to inspect that black box, i.e. a program that isn’t making any progress.

1 Like

I’ve played around with this some more and came up with this direct solution:

{- cabal:
build-depends: base
default-language: GHC2021
ghc-options: -threaded
-}

import Control.Concurrent (killThread, threadDelay, forkFinally)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Data.IORef (newIORef, atomicModifyIORef')
import Control.Monad (when)
import Data.Foldable (for_)
import Data.Traversable (for)

sleep :: Int -> IO ()
sleep n = threadDelay (n * 1000000)

main :: IO ()
main = do 
  let n = 100000
  let xs = sleep 1 : replicate (n - 1) (sleep 100)
  liveCount <- newIORef n
  result <- newEmptyMVar 
  threads <- for xs $ \x -> do
    forkFinally x $ \e -> do
      case e of
        Left e -> do
          n' <- atomicModifyIORef' liveCount (\n -> let !n' = n - 1 in (n', n'))
          when (n' == 0) (putMVar result Nothing)
        Right x' -> putMVar result (Just x')
  x <- takeMVar result
  case x of
    Nothing -> error "All threads failed"
    Just x' -> do
      for_ threads killThread
      putStrLn ("Success: " ++ show x')

On my machine it takes just 1.170 seconds for 100k threads (so 170 ms overhead).

Edit: Note that running with -N makes it take longer (I think because of GC).

The behaviour you want is easier to get with standard base concurrency than async (or ki).

raceAll :: [IO a] -> IO a
raceAll jobs = bracket
  do result  <- newEmptyMVar
     threads <- for jobs \job -> forkIO (job >>= putMVar result)
     pure (result, threads)
  (forkIO . traverse_ killThread . snd)
  (         takeMVar             . fst)

Handles 100,000 threads easily. (Throws BlockedIndefinitelyOnMVar if all jobs fail.)

Ah, cool idea to rely on BlockedIndefinitelyOnMVar. However, you may want to silence the errors to prevent them from being printed to stderr:

raceAll :: [IO a] -> IO a
raceAll jobs = bracket
  (do result  <- newEmptyMVar
      threads <- for jobs \job -> forkIO $
        (job >>= putMVar result) `catch` \(_ :: SomeException) -> pure ()
      pure (result, threads))
  (forkIO . traverse_ killThread . snd)
  (         takeMVar             . fst)

If it’s due to GC (not unlikely), you can use something like -N16 -qn3, IIRC, which should use only three threads for GC.
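For example, adapting the invocation from earlier in the thread (the exact values are just something to try):

cabal build && time cabal run all -- +RTS -N6 -qn3 -s -RTS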

That does not seem to help. I only have a 6-core machine, though.

To be precise, this is without -N:

$ cabal -v1 run Main.hs -- +RTS -s
Success
     875,575,368 bytes allocated in the heap
     640,641,160 bytes copied during GC
     743,446,976 bytes maximum residency (9 sample(s))
       8,842,376 bytes maximum slop
             853 MiB total memory in use (0 MiB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0       155 colls,     0 par    0.127s   0.127s     0.0008s    0.0184s
  Gen  1         9 colls,     0 par    0.291s   0.293s     0.0325s    0.0933s

  TASKS: 4 (1 bound, 3 peak workers (3 total), using -N1)

  SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.000s  (  0.000s elapsed)
  MUT     time    0.142s  (  0.711s elapsed)
  GC      time    0.418s  (  0.420s elapsed)
  EXIT    time    0.029s  (  0.028s elapsed)
  Total   time    0.589s  (  1.160s elapsed)

  Alloc rate    6,164,581,987 bytes per MUT second

  Productivity  24.1% of total user, 61.3% of total elapsed

And this is with -N:

$ cabal -v1 run Main.hs -- +RTS -s -N
Success
   1,338,393,984 bytes allocated in the heap
     729,304,192 bytes copied during GC
     651,358,200 bytes maximum residency (7 sample(s))
      11,256,136 bytes maximum slop
            1187 MiB total memory in use (4 MiB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0       239 colls,   239 par    0.286s   0.081s     0.0003s    0.0130s
  Gen  1         7 colls,     6 par    1.054s   0.250s     0.0357s    0.0713s

  Parallel GC work balance: 73.62% (serial 0%, perfect 100%)

  TASKS: 37 (1 bound, 36 peak workers (36 total), using -N12)

  SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.003s  (  0.001s elapsed)
  MUT     time    2.490s  (  1.798s elapsed)
  GC      time    1.341s  (  0.331s elapsed)
  EXIT    time    0.042s  (  0.010s elapsed)
  Total   time    3.876s  (  2.140s elapsed)

  Alloc rate    537,415,118 bytes per MUT second

  Productivity  64.3% of total user, 84.0% of total elapsed

If you can modify your tasks so that errors are signalled using an Either instead of exceptions (this may require catching their exceptions and putting them in a branch of the Either), then perhaps we could use the ConcurrentlyE datatype, like this:

raceAnySuccess :: [IO (Either a ())] -> IO (Either a ())
raceAnySuccess = runConcurrentlyE . foldMap ConcurrentlyE

ConcurrentlyE is a variant/generalization of Concurrently. If the concurrent actions all return with a Right, we wait for all and return a Right. But the moment a concurrent action returns a Left, all the other actions are cancelled and we return a Left. (ConcurrentlyE Void a, where no actions might produce a Left value, behaves similarly to Concurrently.)

Here we would use a Left to indicate task success. Also, in the example, I put a () in the Right branch for simplicity, but we could also put some error accumulator like [IOException], thanks to the following instances:

(Semigroup a, Monoid a) => Monoid (ConcurrentlyE e a)

Semigroup a => Semigroup (ConcurrentlyE e a)

If you wanted to use regular Concurrently, I would define a special purpose exception to signal the successful completion of a task. It’s kind of odd to use exceptions for the “happy path” but here it would be warranted I think.
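To make the ConcurrentlyE route concrete, here is a minimal sketch of the adapter (assuming the tasks are plain IO a actions that signal failure by throwing; toEither is just an illustrative name, and the AsyncCancelled handler mirrors the trick used earlier in the thread):

raceAnySuccess' :: [IO a] -> IO (Either a [SomeException])
raceAnySuccess' = runConcurrentlyE . foldMap (ConcurrentlyE . toEither)
  where
    -- Left = success: the first Left cancels the remaining actions.
    -- Right = failure: failures are accumulated via the [SomeException] Monoid.
    toEither task =
      (Left <$> task)
        `catches` [ Handler (\(e :: AsyncCancelled) -> throwIO e)
                  , Handler (\(e :: SomeException)  -> pure (Right [e])) ]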

Something weird is going on.

I simplified your example:

import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (async, waitAny)
import Control.Exception (Exception, throwIO)

data TimeoutException = TimeoutException deriving Show
instance Exception TimeoutException

forever :: IO String
forever = delay 10 >> forever

terminates :: IO String
terminates = delay 4 >> pure "terminates"

fails :: IO String
fails = delay 2 >> throwIO TimeoutException

delay :: Int -> IO ()
delay s = threadDelay (1000000 * s)

main :: IO ()
main = do
  x <- mapM async $ terminates : fails : replicate 2000 forever
  (c, r) <- waitAny x
  print r

Compiled with GHC 9.12.1 on Windows with -O1 -threaded and ran with +RTS -N -s (I have 4 cores, so -N4).

Results (number of forever tasks, wall-clock time, outcome):

1 forever, 2s, TimeoutException
100 forever, 2s, TimeoutException
1000 forever, 2s, TimeoutException
2000 forever, 10s, “terminates” (I can actually consistently reproduce this one)
3000 forever, 40s, TimeoutException
4000 forever, 90s, TimeoutException
5000 forever, 200s, TimeoutException

5002 TMVars, huh.

Also, all the time is spent on MUT:

2000 forever
"terminates"
       5,682,568 bytes allocated in the heap
       4,652,128 bytes copied during GC
       2,633,832 bytes maximum residency (2 sample(s))
         290,712 bytes maximum slop
              23 MiB total memory in use (0 MiB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0         0 colls,     0 par    0.000s   0.000s     0.0000s    0.0000s
  Gen  1         2 colls,     1 par    0.000s   0.004s     0.0021s    0.0030s

  Parallel GC work balance: 97.65% (serial 0%, perfect 100%)

  TASKS: 14 (1 bound, 5 peak workers (13 total), using -N4)

  SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.000s  (  0.002s elapsed)
  MUT     time   10.844s  ( 10.836s elapsed)
  GC      time    0.000s  (  0.004s elapsed)
  EXIT    time    0.000s  (  0.001s elapsed)
  Total   time   10.844s  ( 10.843s elapsed)

  Alloc rate    524,040 bytes per MUT second

  Productivity 100.0% of total user, 99.9% of total elapsed
5000 forever
program: Uncaught exception test-0.1.0-inplace-program:Main.TimeoutException:

TimeoutException
      21,487,136 bytes allocated in the heap
      26,980,016 bytes copied during GC
       8,218,536 bytes maximum residency (3 sample(s))
       1,050,712 bytes maximum slop
              33 MiB total memory in use (0 MiB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0         2 colls,     2 par    0.000s   0.007s     0.0035s    0.0038s
  Gen  1         3 colls,     2 par    0.000s   0.008s     0.0027s    0.0044s

  Parallel GC work balance: 67.99% (serial 0%, perfect 100%)

  TASKS: 16 (1 bound, 5 peak workers (15 total), using -N4)

  SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.000s  (  0.002s elapsed)
  MUT     time  194.312s  (199.678s elapsed)
  GC      time    0.000s  (  0.015s elapsed)
  EXIT    time    0.000s  (  0.002s elapsed)
  Total   time  194.312s  (199.697s elapsed)

  Alloc rate    110,580 bytes per MUT second

  Productivity 100.0% of total user, 100.0% of total elapsed
2 Likes

If someone would be willing to make my life easier by putting together a reproducer that only depends on GHC (copying the relevant async code into the reproducer is fine) and opening a GHC ticket, I can take a closer look at this.

4 Likes

Done: #26028: Very slow STM (observed in async package) · Issues · Glasgow Haskell Compiler / GHC · GitLab

7 Likes

Wow, thanks everyone for pitching in. Didn’t expect this to go so deep…

I do like u/leary’s version with u/jaror’s addition for suppressing the error messages.

1 Like

Slightly tangential, but if you’re interested in structured concurrency specifically, we do have ki: A lightweight structured concurrency library
I prefer using it to async, although I haven’t used either in anger.

2 Likes

Thanks. I promise I will get to it!

1 Like

Even though this is more of a nit-pick, it’s probably worth mentioning that using trace to log when something happens is not a sound approach.
Use print, putStr(Ln), etc., so that the printing is part of the IO pipeline and is guaranteed to happen before/between/after the surrounding actions.
trace’s printing is driven by evaluation, which means it might happen never, once, or even more than once, and potentially at a completely different time than you expect.
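A tiny illustration (a made-up example, not taken from the code above):

import Debug.Trace (trace)

main :: IO ()
main = do
  let xs = trace "filtering" (filter even [1 .. 10 :: Int])
  putStrLn "before"   -- "filtering" has not been printed yet
  print (length xs)   -- it appears only here, when xs is finally forced
  putStrLn "after"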

1 Like

You can also just use traceIO if you’re in IO, and in general traceM is often good enough.

1 Like