In the documentation for mapConcurrently it says:
"Take into account that async will try to immediately spawn a thread for each element of the Traversable, so running this on large inputs without care may lead to resource exhaustion (of memory, file descriptors, or other limited resources)."
Am I right that the danger of using it comes from what those asyncs actually do (like each opening a file or a network connection) rather than from the nature of async itself? As far as I understand Haskell concurrency, threads are rather lightweight and will still be mapped onto the available capabilities. What are the dangers of spawning, let’s say, 1000 such asyncs that all do rather lightweight, mostly pure computations (each has an allocation limit set, so it will be killed if it exceeds that limit)? An alternative would be e.g. to create 8 worker threads that pull work from a queue until everything has been calculated, but I’d like to have a definite reason to complicate the code in such a way over just using mapConcurrently.
"What are the dangers of spawning, let’s say, 1000 such asyncs that all do rather lightweight, mostly pure computations (each has an allocation limit set, so it will be killed if it exceeds that limit)?"
That use case should be fine: GHC’s threads are indeed sufficiently lightweight for that, as you say, so the warning does not apply.
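As a rough illustration of that use case, here is a minimal sketch that runs one async per input over cheap pure work. The job (collatzSteps) and the wrapper name (collatzAll) are made up for the example; only mapConcurrently (from the async package) and evaluate (from base) are real library functions.

```haskell
import Control.Concurrent.Async (mapConcurrently)
import Control.Exception (evaluate)

-- Hypothetical stand-in for a "lightweight, mostly pure" job:
-- the number of Collatz steps needed to reach 1.
collatzSteps :: Int -> Int
collatzSteps = go 0
  where
    go acc 1 = acc
    go acc n
      | even n    = go (acc + 1) (n `div` 2)
      | otherwise = go (acc + 1) (3 * n + 1)

-- One async (one green thread) per input element.
-- 'evaluate' forces each result inside its own thread, so the work is
-- not handed back to the caller as an unevaluated thunk.
collatzAll :: [Int] -> IO [Int]
collatzAll = mapConcurrently (evaluate . collatzSteps)
```

Calling collatzAll [1 .. 1000] spawns 1000 threads, which GHC handles comfortably. To actually spread them over several cores, the program has to be built with -threaded and run with +RTS -N.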
"An alternative would be e.g. to create 8 worker threads that pull work from a queue until everything has been calculated, but I’d like to have a definite reason to complicate the code in such a way over just using mapConcurrently."
Note that you can use e.g. pooledMapConcurrentlyN for that, which is just as convenient as mapConcurrently. (For more complicated cases, you can use e.g. conduit-concurrent-map.)
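For comparison, a minimal sketch of the pooled variant, assuming the unliftio package is available. The job (sumOfSquares), the wrapper name (sumsPooled) and the pool size of 8 are illustrative choices, not anything prescribed by the library.

```haskell
import Control.Exception (evaluate)
import UnliftIO.Async (pooledMapConcurrentlyN)

-- Hypothetical pure job used only for illustration.
sumOfSquares :: Int -> Int
sumOfSquares n = sum [k * k | k <- [1 .. n]]

-- At most 8 jobs run at the same time, however long the input is.
-- The result keeps the shape and order of the input container.
sumsPooled :: [Int] -> IO [Int]
sumsPooled = pooledMapConcurrentlyN 8 (evaluate . sumOfSquares)
```

The call site looks the same as with mapConcurrently apart from the extra Int argument, so switching between the two is a one-line change.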
Sorry, this doesn’t really answer your question, but you don’t have to do this work yourself: UnliftIO.Internals.Async has already done the hard work for you! In general, unbounded concurrency is rarely something you actually want, so I’d probably suggest going straight to pooled concurrency.
Edit: ha, hi @amesgen!
Interesting, I might try doing this (with some form of wrapper, as I use a MonadBaseControl IO stack; is the IO version exported?). Thanks!
"is the IO version exported?"
Not sure what you mean: pooledMapConcurrentlyN works for all m with a MonadUnliftIO instance, and IO has a MonadUnliftIO instance.
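To make the "works for all m with a MonadUnliftIO instance" point concrete, here is a small sketch that runs it directly in a ReaderT stack, with no wrapper needed. The Env type and the function names are hypothetical; the sketch assumes the transformers and unliftio packages.

```haskell
import Control.Monad.Trans.Reader (ReaderT, asks)
import UnliftIO.Async (pooledMapConcurrentlyN)

-- Hypothetical read-only environment shared by all workers.
newtype Env = Env { envOffset :: Int }

-- A job that runs in the ReaderT stack rather than in plain IO.
addOffset :: Int -> ReaderT Env IO Int
addOffset x = do
  off <- asks envOffset
  pure (x + off)

-- ReaderT Env IO has a MonadUnliftIO instance, so the pooled map
-- can be used in that monad as-is (here with at most 4 workers).
addOffsets :: [Int] -> ReaderT Env IO [Int]
addOffsets = pooledMapConcurrentlyN 4 addOffset
```

Running it is then just runReaderT (addOffsets xs) (Env 10) from IO, with xs being whatever input list you have.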
But maybe you want an analogue that requires MonadBaseControl IO instead. This is always possible, as MonadUnliftIO is strictly less general, namely exactly as expressive as (MonadBaseControl IO m, Forall (Pure m)):
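-- BlockArguments is needed for the `liftBaseWith \runInBase -> ...` syntax.
{-# LANGUAGE BlockArguments #-}

import Control.Concurrent.Async.Lifted.Safe (Forall, Pure)
import Control.Monad.Trans.Control
import UnliftIO.Async

-- Like pooledMapConcurrentlyN, but for stateless MonadBaseControl IO stacks.
pooledMapConcurrentlyN' ::
  (MonadBaseControl IO m, Forall (Pure m), Traversable t) =>
  Int -> (a -> m b) -> t a -> m (t b)
pooledMapConcurrentlyN' n f as = liftBaseWith \runInBase ->
  pooledMapConcurrentlyN n (runInBase . f) as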
Just as in Control.Concurrent.Async.Lifted.Safe, this intentionally restricts m to monads that “have no state”, i.e. StateT does not work, as the semantics are otherwise too tricky IMO (e.g. what should the final state be when it is modified to different values in different threads?).