Multitasking: a new concurrency library

Hello everyone! It is generally assumed that Haskell is well-suited for concurrency. However, when I actually had to write concurrent code, I found that it is not so easy in practice.

In order to improve the situation, I am working on a new library, multitasking, which provides ready-made functions for common concurrency patterns. Essentially, I want to build a concurrency toolbox. Currently included:

  • Starting and waiting for tasks
  • Racing
  • Timeouts
  • Worker threads
  • Rate Limits
  • Specialized concurrency variables

It implements structured concurrency based on the library ki. Here is a racing implementation as a motivating example for structured concurrency:

Naive implementation

race = do
  mvar <- newEmptyMVar
  -- concurrently run two actions
  thread1 <- forkIO $ action1 >>= putMVar mvar
  thread2 <- forkIO $ action2 >>= putMVar mvar
  -- wait for winner
  result <- takeMVar mvar
  -- kill the remaining one
  killThread thread1
  killThread thread2
  useResult result

The above implementation should definitely not be used: it only works as long as no exceptions are thrown. For example, both action1 and action2 might crash, leaving the parent thread waiting on the MVar forever.
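
To make the failure mode concrete, here is a minimal, self-contained sketch with hypothetical crashing actions standing in for action1 and action2:

import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (ErrorCall (..), throwIO)

-- Both stand-in actions crash, so nothing is ever written to the MVar and
-- the parent blocks on takeMVar forever. (GHC's runtime may detect this and
-- throw BlockedIndefinitelyOnMVar, but in a larger program with other
-- references to the MVar it can simply hang.)
main :: IO ()
main = do
  mvar <- newEmptyMVar
  _ <- forkIO $ throwIO (ErrorCall "action1 crashed") >>= putMVar mvar
  _ <- forkIO $ throwIO (ErrorCall "action2 crashed") >>= putMVar mvar
  result <- takeMVar mvar
  print (result :: Int)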

Structured concurrency implementation with multitasking

race = do
  mvar <- newEmptyMVar
  result <- multitask $ \coordinator -> do
    -- scope starts
    task1 <- start coordinator $ action1 >>= putMVar mvar
    task2 <- start coordinator $ action2 >>= putMVar mvar
    takeMVar mvar
    -- scope ends
  useResult result

The above implementation is safe while staying simple to understand. The function multitask opens up a new scope for creating tasks, giving you a coordinator. Tasks created with start coordinator belong to that concurrency scope.

After the scope ends, all tasks belonging to that scope are killed. In other words, the lifetime of the created tasks is bounded by the scope. Thus, the slower task of the race is canceled automatically when the scope is exited.
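
For the curious, roughly the same shape can be written directly against ki, which multitasking builds on; multitask and start play the roles of Ki.scoped and Ki.fork in this sketch:

import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import qualified Ki

-- A sketch of the race written directly against ki: the forked threads are
-- bound to the scope opened by Ki.scoped, just like tasks are bound to the
-- coordinator's scope above.
raceWithKi :: IO r -> IO r -> IO r
raceWithKi act1 act2 = do
  mvar <- newEmptyMVar
  Ki.scoped $ \scope -> do
    _ <- Ki.fork scope (act1 >>= putMVar mvar)
    _ <- Ki.fork scope (act2 >>= putMVar mvar)
    takeMVar mvar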

Additionally, tasks propagate their exceptions to the parent thread. Therefore, an exception from action1 or action2 will interrupt takeMVar mvar and stop the infinite wait. Of course, you still need to deal with the exception.
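
As a sketch of that last point: assuming the structured race above has type IO (), the propagated exception can be handled at the call site with plain Control.Exception machinery:

import Control.Exception (SomeException, try)

-- Handle whatever a task propagated out of the multitask scope; `race` is
-- the structured version above, assumed to have type IO ().
runRace :: IO ()
runRace = do
  outcome <- try race
  case outcome of
    Left err -> putStrLn ("a task failed: " ++ show (err :: SomeException))
    Right () -> putStrLn "race finished"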


Feel free to try out the library. If you have ideas for new features, please share them here. I am still adding functionality, so multitasking is not quite ready for a full package release yet.

12 Likes

Cool new library! Great to see Ki refined; I enjoyed reading through Ki’s source code, probably because of the hilarious comments.
I’ve had a hard time understanding how the RateLimit works and had to dig through the source to get an idea of how to use it. Examples in the documentation would surely help. Are you interested in pull requests, or is this a project you want to hack on by yourself?

Feature Idea:

Improving on the situation of escaping handles

For example: if the scope from Ki.scoped escapes the callback, bad things may happen.
This could be prevented by using a type-indexed handle, just like runST does.

Provide a resource-safe function like this:

multitask :: MonadIO m => (forall s. Coordinator s -> IO a) -> m a 

Alternatively, incorporate the Coordinator into the monad so it can’t be obtained as a value. Starting a task would then look like this:

start :: MonadCoordinator m => IO a -> m (Task a)
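
A rough sketch of how that monadic variant could be wired up; every name below is hypothetical (nothing like it exists in multitasking today), and it assumes start :: Coordinator -> IO a -> IO (Task a):

{-# LANGUAGE FlexibleInstances #-}

import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Reader (ReaderT, ask)

-- Coordinator, Task and start are assumed to come from multitasking.
class Monad m => MonadCoordinator m where
  startTask :: IO a -> m (Task a)

-- One obvious carrier: a ReaderT holding the Coordinator, so the handle
-- itself is never exposed to user code and cannot escape the scope.
instance MonadCoordinator (ReaderT Coordinator IO) where
  startTask action = do
    coordinator <- ask
    lift (start coordinator action)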

I’ll be that person. How does it compare to async ?

7 Likes

It is true that multitask/Ki.scoped does not prevent Coordinator/Scope from escaping. In a previous version of multitasking, you could choose between using a monad or the ST trick to manage scopes.

However, I figured that the increased complexity and loss of flexibility might not be worth the additional safety. I believe you cannot start any tasks with an escaped scope (it throws an exception instead), so leaking a scope seems OK to me, though I am not 100% sure about this yet.

Pull Requests are always appreciated. I’ll try to be responsive if you contribute.

I agree, the documentation still needs quite a lot of work.

1 Like

A good question! async is the standard library for concurrency in Haskell, so it’s only natural to compare multitasking with it.

The most important difference is that multitasking implements structured concurrency while async does not.

There are two main consequences I want to point out.

Less flexibility for starting tasks

The recommended way to spawn threads with async is:

withAsync :: IO a -> (Async a -> IO b) -> IO b

withAsync offers guarantees similar to (though weaker than) those of multitask + start, but it’s merged into one function, which makes it less flexible. Take a look at the following examples:

multitask $ \coordinator ->
  traverse (\i -> start coordinator $ print i) [0..10]

You can easily combine start with traverse and friends. You cannot achieve the same with withAsync quite so easily.
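
To make the contrast concrete, here is roughly the helper you would have to write yourself to push a whole list of actions through withAsync’s continuation shape (a sketch, not something async ships):

import Control.Concurrent.Async (Async, withAsync)

-- Spawn every action in the list, nesting one withAsync per element, and
-- hand all the handles to the continuation at once.
withAsyncs :: [IO a] -> ([Async a] -> IO b) -> IO b
withAsyncs []           k = k []
withAsyncs (act : acts) k =
  withAsync act $ \handle ->
    withAsyncs acts $ \handles -> k (handle : handles)

There is a second consequence of separating the scope from task creation: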

myConcurrentFunction :: Coordinator -> IO ()
myConcurrentFunction coordinator = do
  _ <- start coordinator $ threadDelay 100000 >> putStrLn "I outlive myConcurrentFunction"
  pure ()

To elaborate: you can pass a Coordinator to other functions, which lets them create tasks that live beyond the function call itself (but still within the coordinator's scope). With the async library, you would need to use the low-level async function to achieve similar behavior.
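
For comparison, the async-based version of such a function would reach for the low-level async; a sketch (the caller becomes responsible for the returned handle):

import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Async, async)

-- No scope bounds this thread: it keeps running after the function returns,
-- and nothing propagates its exceptions unless the caller waits or links.
myConcurrentFunction' :: IO (Async ())
myConcurrentFunction' =
  async $ threadDelay 100000 >> putStrLn "I outlive myConcurrentFunction'"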

Guaranteed propagation of errors

multitasking always propagates errors, while async does so only when directly waiting for an async. The following example is not safe in async, while the equivalent multitasking code is just fine.

race = do
  mvar <- newEmptyMVar  
  result <- withAsync (action1 >>= putMVar mvar) $ \async1 -> 
    withAsync (action2 >>= putMVar mvar) $ \async2 -> 
      takeMVar mvar
  useResult result 

Exceptions from action1 and action2 are not propagated, since no one is waiting on async1 or async2. This seems like a pretty big oversight to me, and it makes it harder to use MVars or TMVars safely.

async provides waitEither to wait on both async1 and async2, which works around this issue. However, I believe the sheer number of different waiting functions in async makes it overly complex and shows that this is not a great API.
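
For reference, a sketch of that workaround with waitEither; the MVar is no longer needed, and an exception from whichever action finishes first is re-thrown in the parent:

import Control.Concurrent.Async (waitEither, withAsync)

-- waitEither re-throws the exception of whichever Async finishes first,
-- so crashes in action1 or action2 no longer go unnoticed.
raceFixed :: IO r -> IO r -> IO r
raceFixed action1 action2 =
  withAsync action1 $ \async1 ->
    withAsync action2 $ \async2 ->
      either id id <$> waitEither async1 async2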

How is it any more low-level than start?

Is your

multitask $ \coordinator ->
  traverse (\i -> start coordinator $ print i) [0..10]

example just mapConcurrently print [0..10]?

That’s just one way of spawning threads in async. You can recover the behavior that you want with a simple definition (the Void part is probably unnecessary, but apparently I’ve never needed it not to be Void):

withAsyncLink :: IO Void -> (Async Void -> IO a) -> IO a
withAsyncLink a k = withAsync a $ \side -> link side *> k side

Here’s a little cheat sheet:

                  | a died | b died | a returned | b returned
--------------------------------------------------------------
async      a *> b | b goes | a goes | b goes     | a goes
withAsync     a b | b goes | a dies | b goes     | a dies
withAsyncLink a b | b dies | a dies | error      | a dies
race          a b | b dies | a dies | b dies     | a dies
concurrently  a b | b dies | a dies | b goes     | a goes

3 Likes

Note: Provide module Ki.Typed to prevent escape by LSLeary · Pull Request #21 · awkward-squad/ki · GitHub

In particular, the contents of my closing comment beyond the horizontal bar.

1 Like

The async function is described as low-level in the async documentation, probably because it does not make the same cleanup guarantees that withAsync does. multitasking has no low-level/high-level split; there is only start, which gives similar guarantees to withAsync.

traverse start and mapConcurrently are similar in the sense that both spawn a thread for each element of [0..10]. traverse start gives you back Tasks which you can wait for; mapConcurrently already waits.
While async introduces several extra concurrency helpers for mapping, start is flexible enough to work with traverse and does not need any helpers. All of this results in a smaller and therefore simpler API, though this is a personal opinion.

That’s true; I overlooked those functions when writing the example above. However, I believe that workaround is cumbersome and not what most users will do. Most will simply use withAsync, since that is the default recommended option. multitasking makes more opinionated choices in exception handling, while async gives you more flexibility. Both approaches have their pros and cons.

4 Likes

Bluefin is essentially the general solution to well-scoping problems in Haskell. This is a discussion about ki-style concurrency in Bluefin that you might find interesting.

Thanks, it was interesting to read about your ideas for integrating structured concurrency into bluefin, and it gave me some things to think about for multitasking. Dealing with resources in a concurrent context is really quite tricky.

1 Like

traverse async will give you back Asyncs which you can wait for.

You can use async directly if you want start-like behavior.

Anyway, from what I can tell the only notable differences between multitasking and async are that the former has all the Coordinator-related business and a less rich API. The former looks helpful; I’d be really curious to hear from someone who has used both styles of concurrency which one is more ergonomic and less error-prone.

As for exception-handling, I do believe that both “fail everything if one fails” and “keep going if one fails” are useful, so it’s fine to expect the user to make a conscious choice.

I think “keep going if one fails” might be an antipattern and should only be done in a very explicit manner by using try/catch manually. Threads can theoretically exit almost anywhere, so action1 >> action2 does not guarantee that action2 will run after action1. If the exception gets propagated, at least you know about it and can shut down anything still waiting for action2.

This is also why there is no cancelTask and no startTry (a combination of start and try): I do not want users to spawn threads which are not expected to run to completion. Of course, even with multitasking, action2 is not guaranteed to run after action1, but I like to think that it is harder to make mistakes.
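
For what it’s worth, the manual spelling alluded to above is just start composed with try; a sketch (startCaught is hypothetical and assumes start :: Coordinator -> IO a -> IO (Task a)):

import Control.Exception (SomeException, try)

-- Explicit keep-going: the try is written out by hand, and the caller is
-- forced to handle the Either before continuing.
startCaught :: Coordinator -> IO a -> IO (Task (Either SomeException a))
startCaught coordinator action = start coordinator (try action)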

This is all a rather opinionated choice, so I second that an experience report on a real-world project would be helpful.