forkIO causes program output to cease


#1

I’m a bit puzzled about why running a function with forkIO causes program output to cease. I have one function, consumer, which I have run both in the main thread and in a forked thread. In the main thread, data is read and displayed, and the program works as expected. In the forked thread, nothing happens. Intuitively, it seems like it shouldn’t matter which thread the function executes in.

It seems that because I’m evaluating the consumer function in another thread, it never gets evaluated at all. I’d be curious to see the Haskell idioms for approaching this use case.

```haskell
module Main where

import Control.Concurrent.Chan.Unagi.NoBlocking (InChan, OutChan, newChan, readChan, writeChan)
import Control.Concurrent (forkIO, threadDelay)
import Control.Monad (forever)

import Lib
import Source.Local.FXCM.Producer
import Types

fxcmBaseDir = "/home/doug/dev/historical-data/fxcm-bid-ask/test"

getSecondChan :: IO (InChan Price, OutChan Price)
getSecondChan = newChan

getMinuteChan :: IO (InChan Price, OutChan Price)
getMinuteChan = newChan

getHourChan :: IO (InChan Price, OutChan Price)
getHourChan = newChan

-- main :: IO ()
main = do
    print "Program Started"
    secondChan <- getSecondChan
    let secondIn = fst secondChan
    let secondOut = snd secondChan
    minuteChan <- getMinuteChan
    let minuteIn = fst minuteChan
    let minuteOut = snd minuteChan
    forkIO $ beginFeed secondIn fxcmBaseDir
    forkIO $ consumer [] secondOut 1000 minuteIn -- this doesn't work
    -- consumer [] secondOut 1000 minuteIn -- this works
    forever $ do
        return ()

consumer :: [Price] -> OutChan Price -> Int -> InChan Price -> IO ()
consumer prices source period sink = forever $ do
    print "consumer here"
    price <- readChan (threadDelay 1000) source
    print $ "c: " ++ (show price)
    consumer prices source period sink
```

#2

This is problematic:

    forever $ do
        return ()

That is an infinite loop that GHC’s runtime will probably never pre-empt, so the forked threads never get a chance to run. Read more about this here:

More specifically, a thread may be pre-empted whenever it allocates some memory, which unfortunately means that tight loops which do no allocation tend to lock out other threads (this only seems to happen with pathological benchmark-style code, however).

You could fix it by using yield:

    forever yield
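
A minimal, self-contained sketch of why yield helps, using a hypothetical worker in place of the real consumer: the main thread polls an MVar and calls yield on every miss, so even on a single capability the scheduler switches to the forked thread. The names waitYielding and demo are made up for illustration.

```haskell
import Control.Concurrent (forkIO, yield)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, tryTakeMVar)

-- Polls an MVar, calling 'yield' on every miss so the scheduler can run
-- other Haskell threads, even when there is only one capability.
waitYielding :: MVar a -> IO a
waitYielding var = do
  r <- tryTakeMVar var
  case r of
    Just x  -> pure x
    Nothing -> yield >> waitYielding var

-- Hypothetical demo: a forked worker computes a value while the main
-- thread waits with waitYielding rather than an empty 'forever' loop.
demo :: IO Int
demo = do
  done <- newEmptyMVar
  _ <- forkIO $ putMVar done $! sum [1 .. 100]
  waitYielding done

main :: IO ()
main = demo >>= print   -- prints 5050
```

This is still polling, so it keeps the CPU busy whenever nothing else is runnable.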

But it is probably nicer to just run one of the forks in the main thread.
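
As a sketch of that layout, here is a self-contained version that swaps unagi-chan for base’s Control.Concurrent.Chan and uses hypothetical producer and consumerN stand-ins (consumerN reads a fixed number of items so the example terminates). Only the producer is forked; the consumer runs in the main thread and blocks on readChan instead of spinning.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Monad (replicateM)

-- Hypothetical stand-in for beginFeed: pushes a fixed list of prices.
producer :: Chan Double -> [Double] -> IO ()
producer chan = mapM_ (writeChan chan)

-- Hypothetical stand-in for consumer: reads n prices, printing each one.
consumerN :: Chan Double -> Int -> IO [Double]
consumerN chan n = replicateM n $ do
  price <- readChan chan        -- blocks (no busy loop) until a price arrives
  putStrLn ("c: " ++ show price)
  pure price

runPipeline :: IO [Double]
runPipeline = do
  chan <- newChan
  _ <- forkIO (producer chan [1.0, 1.5, 2.0])   -- only the producer is forked
  consumerN chan 3                             -- the consumer runs in the main thread

main :: IO ()
main = runPipeline >>= print
```

In the real program consumer loops forever, so main simply never returns, which is fine for a long-running feed.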

Another way to get the right behaviour is to give the program more capabilities. By default, a GHC program gets only one capability, which means it will use only one CPU core. If you compile with the -threaded GHC flag (plus -rtsopts, so the runtime accepts the option) and run with the +RTS -N -RTS runtime flag, then the program will probably also run correctly.

But one thread will still be using 100% of a CPU core to run your empty loop.
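
Since it is easy for these flags not to take effect, a small check like the following (a sketch; rtsReport is a hypothetical name) prints whether the binary uses the threaded runtime and how many capabilities it started with. Built with something like `ghc -threaded -rtsopts Main.hs` and run as `./Main +RTS -N -RTS`, it should report True and one capability per core; built without -threaded, it reports False and 1.

```haskell
import Control.Concurrent (getNumCapabilities, rtsSupportsBoundThreads)

-- Reports whether the program was linked against the threaded runtime
-- (-threaded) and how many capabilities it is running with (+RTS -N).
rtsReport :: IO (Bool, Int)
rtsReport = do
  caps <- getNumCapabilities
  pure (rtsSupportsBoundThreads, caps)

main :: IO ()
main = do
  (threaded, caps) <- rtsReport
  putStrLn ("threaded runtime: " ++ show threaded)
  putStrLn ("capabilities:     " ++ show caps)
```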


#3

Thanks for the info. I ran the program with -N and the issue persisted, but calling yield in the main loop worked as expected.

"But it is probably nicer to just run one of the forks in the main thread."

In the end, that’s the plan. Right now I’m just trying to prove out the architecture; eventually the main thread will contain the logic for analyzing signals produced by these consumer threads.


#4

An even nicer solution might be to wait on one of the two threads:

    wait <- newEmptyMVar
    tid <- forkIO $ do
      consumer [] secondOut 1000 minuteIn
      putMVar wait ()
    takeMVar wait
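
Filled out into a self-contained program, the MVar pattern might look like this sketch. It assumes base’s Control.Concurrent.Chan in place of unagi-chan and uses a hypothetical pipeline function whose stand-in consumer sums a fixed number of values so the program can end. The main thread blocks in takeMVar, which costs no CPU, until the consumer thread signals that it is done.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM_, replicateM)

-- Hypothetical pipeline: a stand-in producer and consumer connected by a
-- channel, with the main thread waiting on an MVar instead of spinning.
pipeline :: IO Int
pipeline = do
  chan <- newChan :: IO (Chan Int)
  _ <- forkIO $ forM_ [1 .. 5] (writeChan chan)      -- stand-in for beginFeed
  wait <- newEmptyMVar
  _ <- forkIO $ do
    total <- sum <$> replicateM 5 (readChan chan)    -- stand-in for consumer
    putMVar wait total                               -- signal "done" with a result
  takeMVar wait                                      -- blocks without burning CPU

main :: IO ()
main = pipeline >>= print   -- prints 15
```

One caveat: if the consumer thread throws before putMVar runs, the MVar is never filled.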

Alternatively, you could wait for a long time inside the infinite loop:

    forever $ threadDelay maxBound

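If you just use yield, other threads do get to run, but the main loop still wakes up over and over and keeps a core busy whenever nothing else is runnable; threadDelay maxBound instead leaves the main thread asleep.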


#5

This is a fair point. The second solution is nice since it’s a one-line change while I get the plumbing set up.


#6

A similar option to the MVar is to use the primitives from the async library:

```haskell
...
import Control.Concurrent.Async

main = do
  ...
  concurrently_
    (beginFeed secondIn fxcmBaseDir)
    (consumer [] secondOut 1000 minuteIn)
```

This waits for both threads to finish and also handles exceptions sensibly: if either action throws, the other one is cancelled and the exception is re-thrown in the calling thread.

I’ve found it very easy to end up in surprising, hard-to-debug program states when exceptions and forkIO are combined. For example, with forkIO, if one of your threads throws an exception, the rest of the program won’t notice.
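
To make that concrete without pulling in async, here is a rough base-only sketch of what async does for you in this respect: forkFinally captures the forked thread’s result or exception in an MVar, and the returned wait action re-throws it in the waiting thread. forkRethrowing and failingWorker are hypothetical names; async itself also handles cancellation and sibling threads, which this sketch does not.

```haskell
import Control.Concurrent (forkFinally)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (IOException, throwIO, try)

-- Forks 'action' and returns an action that waits for it to finish.
-- Any exception from the forked thread is re-thrown in the waiter,
-- instead of being silently lost as with a bare forkIO.
forkRethrowing :: IO a -> IO (IO a)
forkRethrowing action = do
  result <- newEmptyMVar
  _ <- forkFinally action (putMVar result)   -- stores Left exc or Right value
  pure (takeMVar result >>= either throwIO pure)

-- Hypothetical worker that fails, e.g. because a data file is missing.
failingWorker :: IO Int
failingWorker = ioError (userError "feed file missing")

main :: IO ()
main = do
  waitOk <- forkRethrowing (pure (42 :: Int))
  waitOk >>= print                              -- the value comes back normally
  waitBad <- forkRethrowing failingWorker
  outcome <- try waitBad :: IO (Either IOException Int)
  putStrLn (either (\e -> "worker failed: " ++ show e) show outcome)
```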


#7

Yeah, I also heavily recommend using the async library – it’s a fairly light wrapper around forkIO & friends but it makes it so much easier to get things “right”, especially when exceptions are involved.