Solving a ResourceT-related space leak in production


Thanks for sharing! This is interesting. I’m trying to work out what the “ultimate” problem is here. There are a number of interrelated issues:

  • If the ResourceT blocks were tightly scoped the problem would be much reduced.
  • In some cases it is still important to be able to release a resource before the end of the block which acquired the resource.
  • It seems strange to me that Response knows how to release itself. If we had a general purpose “Release” value that we carried around with the Response, and was responsible for knowing how to release it, then there could be a general solution to this problem: adding a Release to a ResourceT block could always return a new Release that frees the resources in the ResourceT block that know about that Release!
  • If conduit streams finalized promptly then we wouldn’t even need ResourceT in the first place, but they don’t (Bluefin streams do).
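The first half of the Release idea in the third bullet can be approximated with resourcet as it stands. allocate hands back a ReleaseKey alongside the value, and release frees that one resource immediately rather than at the end of the ResourceT block. This is a minimal sketch assuming the resourcet package. Owned, acquireOwned, releaseOwned and demo are hypothetical names, not part of resourcet or amazonka:

```haskell
-- Sketch assuming the resourcet package. Owned, acquireOwned and
-- releaseOwned are hypothetical helpers, not part of any library.
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Resource
  (ReleaseKey, ResourceT, allocate, release, runResourceT)
import Data.IORef (modifyIORef, newIORef, readIORef)

-- A value carried together with the key that knows how to free it.
data Owned a = Owned
  { ownedKey :: ReleaseKey
  , ownedValue :: a
  }

-- Acquire a value and register its cleanup with the enclosing ResourceT.
acquireOwned :: IO a -> (a -> IO ()) -> ResourceT IO (Owned a)
acquireOwned acquire free = do
  (key, a) <- allocate acquire free
  pure (Owned key a)

-- Free the value now rather than at the end of the ResourceT block.
-- ResourceT will not run the same cleanup a second time.
releaseOwned :: Owned a -> ResourceT IO ()
releaseOwned = release . ownedKey

-- Records the order of events so the early release is visible.
demo :: IO [String]
demo = do
  logRef <- newIORef []
  let say msg = modifyIORef logRef (++ [msg])
  runResourceT $ do
    response <- acquireOwned (say "acquire" >> pure "body") (\_ -> say "free")
    liftIO (say ("use " ++ ownedValue response))
    releaseOwned response
    liftIO (say "rest of the ResourceT block")
  say "ResourceT block exited"
  readIORef logRef
```

In demo, the free should be logged before the rest of the ResourceT block runs. The combining part of the idea is not attempted here: a Release that also frees everything else the block acquired on its behalf.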

I’d also say I think it’s confusing to describe this as a “space leak”. For precision of language I’d prefer to reserve “space leak” for things that only occur in Haskell, due to laziness[1]. I would have described this one as a “resource leak”.

[1]: Well, and other lazy languages too, but there aren’t really any.


As I understand it, ResourceT exists to work around streams which are pull-based, so upstream doesn’t know when you are done.

How do other languages solve this problem? The only one I really remember is Python, which just doesn’t solve it: if you do for line in open('x'): break, the file will remain open. GC will get it when it goes out of scope, though.

So maybe the problem with ResourceT is that it actually defeats GC by keeping a reference? It’s nice to have non-memory resources cleaned up at a known time, but not so nice if that makes even memory resources get delayed until a known but too-late time. It looks like it doesn’t use weak references; I wonder why not?


I think in Python you’re supposed to do

with open('x') as f:
    for line in f:
        raise Whatever()  # for example

Then, if the exception breaks you out of the with block, the file is closed.

As I understand it, ResourceT exists to work around streams which are pull-based, so upstream doesn’t know when you are done.

I think ultimately ResourceT works around not being able to bracket computations that don’t run in an IO-based monad. So conduit/pipes/streaming need them because their monads are transformers, they don’t run directly in IO. Bluefin doesn’t need ResourceT because you can just bracket the streams directly (bluefin-streams-finalize-promptly – I actually wrote this partly in response to some of your questions @elaforge, so thanks for prompting that!).

So even if you have a pull stream, with an IO-based monad you wouldn’t need ResourceT per se. You could just use normal bracket and the stream’s resources would be closed as soon as you leave the body of the bracket, regardless of how many elements you had “pulled” from the stream.
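Here is a base-only sketch of the bracket point. It pulls lines from a Handle one at a time and stops after n. The handle is closed as soon as the bracket body returns, however many lines were pulled. pullLines, firstLines, demo and the file name are hypothetical:

```haskell
import Control.Exception (bracket)
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)
import System.IO

-- Pull up to n lines from a handle, stopping early at end of file.
pullLines :: Int -> Handle -> IO [String]
pullLines n h
  | n <= 0 = pure []
  | otherwise = do
      eof <- hIsEOF h
      if eof
        then pure []
        else (:) <$> hGetLine h <*> pullLines (n - 1) h

-- The file is closed as soon as the bracket body returns, regardless of
-- how much of it was read. The log makes the closing visible.
firstLines :: IORef [String] -> Int -> FilePath -> IO [String]
firstLines logRef n path =
  bracket
    (openFile path ReadMode)
    (\h -> hClose h >> modifyIORef logRef (++ ["closed " ++ path]))
    (pullLines n)

-- Reads two of five lines and reports what was read and what was closed.
demo :: IO ([String], [String])
demo = do
  writeFile "bracket-demo.txt" (unlines ["x0", "x1", "x2", "x3", "x4"])
  logRef <- newIORef []
  ls <- firstLines logRef 2 "bracket-demo.txt"
  closed <- readIORef logRef
  pure (ls, closed)
```

No ResourceT is involved. The consumer decides when to stop pulling, and leaving the bracket is what releases the file.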


That’s a bracket, and it works (or doesn’t) in the same places as in Haskell. What I was trying to say is that it’s a general problem with iterators, so it would be interesting to see how other languages solve it. In the case of Python, so far as I know the answer is that it just “leaks” freely, but it usually works out for files because Python uses reference counting and attaches close to the finalizer… which ResourceT would prevent.

But this would mean ResourceT is not only unnecessary but counterproductive, which can’t be true, because it was created for a reason. I imagine the reason is that it’s for non-memory resources, and we are rightly leery of attaching those to GC finalizers. So: files, sockets, shared memory, locks. ResourceT ensures they are released at a predictable point, but it might be a late point!

So I imagine if you tried to do this same thing in Python, you wouldn’t leak sockets because GC would save you, and you wouldn’t leak memory either because there’s no ResourceT holding on to stuff. But you would have been saved for the “wrong” reasons: it would be hard to see in the source where the socket was closed, it would be easy to accidentally keep a reference, and if you somehow created a reference cycle, the regime would change from reference counting to the cycle-detecting garbage collector… just the same as for memory.

If you try to do it in another language with iterators that doesn’t put file close in the finalizer, or doesn’t use reference counting, you will probably leak file descriptors. You won’t leak memory, though, because again there’s no ResourceT. But surely those other languages care about not leaking sockets! What do they do instead? I don’t know, but maybe they don’t try to build their HTTP libraries around streams.

In the specific case, it seems like amazonka just had a bug where it didn’t free something. It was harder to notice because ResourceT acted like a backstop when you can wrap the whole transaction in one, in the same way you don’t notice leaks when your process exits soon. So it’s useful in that it allows you to have something like a process exit backstop without exiting the process, but might conceal leaks.

But I still wonder why no weak refs in ResourceT. Maybe that would just conceal leaks more!

I don’t think this is it: if you’re doing IO to open the input and read elements, then you have IO to close the input. If you’re not doing IO, there’s no reason to use a stream or conduit; a lazy list is better. The example is clearly using IO.

You need ResourceT when the scope of the resource is not dynamically under a single expression, the same way that you can’t use the stack or alloca for everything and eventually need explicit free, or GC, or ownership passing.
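This sketch, assuming the resourcet package, illustrates lifetimes that don’t nest. a is freed while b is still live, b is freed while c is still live, and c is left for runResourceT to free on exit. A single nested bracket expression cannot release in that order. overlapping and its log messages are hypothetical:

```haskell
import Control.Monad.Trans.Resource (allocate, release, runResourceT)
import Data.IORef (modifyIORef, newIORef, readIORef)

-- Acquire a, b and c, but free them in the order a, b, c: the lifetimes
-- overlap without nesting. ReleaseKeys allow early release in any order;
-- whatever is still registered is freed when runResourceT exits.
overlapping :: IO [String]
overlapping = do
  logRef <- newIORef []
  let say msg = modifyIORef logRef (++ [msg])
      acquire name =
        allocate (say ("open " ++ name)) (\() -> say ("close " ++ name))
  runResourceT $ do
    (ka, ()) <- acquire "a"
    (kb, ()) <- acquire "b"
    release ka -- a is freed while b is still open
    (_kc, ()) <- acquire "c"
    release kb -- b is freed while c is still open
    -- c is not released explicitly; runResourceT frees it on exit.
  readIORef logRef
```

This is the "explicit free" end of the spectrum. The keys can be passed around like ownership tokens, and runResourceT acts as the backstop for anything that is forgotten.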

This was a great case study. Thank you Bellroy for digging in, finding a fix, blogging about it and sharing it upstream.


Unfortunately the solution provided in jackdk’s article (releasing resources explicitly) isn’t viable in the long term, just as it won’t be viable for programmers to “micro-manage” the minutiae of parallelism:


So will there be an analogous critique about those now working on Haskell compiler/s in 2028?

In simple situations, I tend to always tightly scope the resource handling around streams: run the ResourceT as soon as possible, or forgo ResourceT and run the stream into IO inside a regular bracket, exiting the bracket the moment we have consumed the part we needed.
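The first option (running the ResourceT as soon as possible) could look like this in conduit. This is a sketch assuming conduit 1.3’s Conduit module. The ResourceT introduced by runConduitRes spans only this pipeline, so the handle opened by sourceFile is released when runConduitRes returns, right after the first n lines are collected. firstLinesC is a hypothetical name:

```haskell
import Conduit
import Data.Text (unpack)

-- The ResourceT introduced by runConduitRes covers only this pipeline,
-- so the handle opened by sourceFile is released when it returns.
firstLinesC :: Int -> FilePath -> IO [String]
firstLinesC n path =
  fmap (map unpack) $
    runConduitRes $
      sourceFile path
        .| decodeUtf8C
        .| linesUnboundedC
        .| takeC n
        .| sinkList
```

The scope here is tight because the result is a plain list. The composite-stream case below is harder, because the pipeline can’t simply be run to completion per file.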

There are other situations—when some form of “composite” stream is convenient to have—where things become difficult. For example:

Given a folder containing (possibly thousands of) text files, produce a stream composed of the first 10 lines of each file, all concatenated. File handle release should be prompt: no more than one file handle should be open at any given time. And it should really stream: holding all of the first 10 lines of a file in memory at once should not be required.

  • This is tricky for the ResourceT approach because we might be tempted to use the stream library’s take function to get the first 10 lines of each ResourceT-bracketed file stream and then concatenate the streams, causing a file handle leak: each file is only released when the whole ResourceT block ends.
  • It’s also tricky for the “let’s use IO's vanilla bracket and run the stream inside it” approach because it’s difficult to “compose” the brackets to get the unified result stream. The result stream involves several different brackets! In the past I wrote the library streaming-bracketed to help with this, but I haven’t used it much.
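The pitfall in the first bullet can be reproduced in a few lines. This assumes conduit 1.3, where, as I understand it, finalizers for abandoned upstreams were removed and cleanup is left entirely to ResourceT. Each source below logs when it is “opened” and “closed” via bracketP. takeC stops pulling before a source finishes, so bracketP never reaches its release step. If that reasoning is right, both closes are deferred until runResourceT exits, and two resources are live at once. The sources yield numbers instead of reading files. loggedSource, firstThreeOfEach and demo are hypothetical names:

```haskell
import Conduit
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Resource (ResourceT, runResourceT)
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- Stand-in for a file source: bracketP registers the "close" with the
-- enclosing ResourceT and only releases it early if the inner conduit
-- runs to completion.
loggedSource :: IORef [String] -> String -> ConduitT i Int (ResourceT IO) ()
loggedSource logRef name =
  bracketP
    (modifyIORef logRef (++ ["open " ++ name]))
    (\() -> modifyIORef logRef (++ ["close " ++ name]))
    (\() -> yieldMany [1 .. 100])

-- Take three elements from each source and concatenate the results.
firstThreeOfEach :: IORef [String] -> [String] -> ConduitT i Int (ResourceT IO) ()
firstThreeOfEach logRef = mapM_ (\name -> loggedSource logRef name .| takeC 3)

-- Returns the elements, the log just before runResourceT exits,
-- and the log just after.
demo :: IO ([Int], [String], [String])
demo = do
  logRef <- newIORef []
  (xs, before) <- runResourceT $ do
    xs <- runConduit (firstThreeOfEach logRef ["a", "b"] .| sinkList)
    before <- liftIO (readIORef logRef)
    pure (xs, before)
  after <- readIORef logRef
  pure (xs, before, after)
```

With thousands of files, the "before" log would list thousands of opens and no closes.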

It’s not difficult for Bluefin. You just write the code you want to write, in a direct style, without thinking about ResourceT. You don’t even need to think about bracket because withFile does the bracket for you. (In the code below there is a bracket for observability reasons only.)

main :: IO ()
main = runEff $ \io -> do
  let dir = "/tmp/test-dir"
  forEach (firstThreeLinesOfEach dir io) $ \line ->
    effIO io (putStrLn line)

firstThreeLinesOfEach ::
  (e1 :> es, e2 :> es) =>
  FilePath ->
  IOE e1 ->
  Stream String e2 ->
  Eff es ()
firstThreeLinesOfEach dir io y = do
  filenames <- effIO io (listDirectory dir)
  let sortedFilenames = sort filenames

  for_ sortedFilenames $ \filename -> do
    let filepath = dir <> "/" <> filename
    let firstThree = take 3 (linesOfFile filepath io)

    forEach firstThree (yield y)

-- General purpose Bluefin function for streaming the
-- lines of a file. Perhaps it should be part of the
-- Bluefin standard library
linesOfFile ::
  (e1 :> es, e2 :> es) =>
  String ->
  IOE e1 ->
  Stream String e2 ->
  Eff es ()
linesOfFile filename io y = do
  withJump $ \onEOF -> do
    withFile io filename ReadMode $ \h -> do
      -- This bracket is only so we can observe the
      -- prompt closing of the file.
      bracket
        (effIO io (putStrLn "File opened"))
        (\() -> effIO io (putStrLn "File closed"))
        ( \() -> do
            forever $ do
              isEOF <- hIsEOF h
              when isEOF $
                jumpTo onEOF
              yield y =<< hGetLine h
        )
ghci> main
File opened
a1
a2
a3
File closed
File opened
b1
b2
b3
File closed
...
Full code
{-# LANGUAGE GHC2021 #-}
{-# LANGUAGE NoMonomorphismRestriction #-}

import Bluefin.Compound (useImplUnder)
import Bluefin.Eff (Eff, bracket, (:&), (:>))
import Bluefin.IO (IOE, effIO, runEff)
import Bluefin.Jump (jumpTo, withJump)
import Bluefin.State (evalState, get, modify)
import Bluefin.Stream (Stream, forEach, yield)
import Bluefin.System.IO (Handle, unsafeWithHandle, withFile)
import Control.Monad (forever, when)
import Data.Foldable (for_)
import Data.List (sort)
import System.Directory
import System.IO (IOMode (ReadMode))
import System.IO qualified
import Prelude hiding (take)

main :: IO ()
main = runEff $ \io -> do
  let dir = "/tmp/test-dir"
  forEach (firstThreeLinesOfEach dir io) $ \line ->
    effIO io (putStrLn line)

firstThreeLinesOfEach ::
  (e1 :> es, e2 :> es) =>
  FilePath ->
  IOE e1 ->
  Stream String e2 ->
  Eff es ()
firstThreeLinesOfEach dir io y = do
  filenames <- effIO io (listDirectory dir)
  let sortedFilenames = sort filenames

  for_ sortedFilenames $ \filename -> do
    let filepath = dir <> "/" <> filename
    let firstThree = take 3 (linesOfFile filepath io)

    forEach firstThree (yield y)

-- General purpose Bluefin function for streaming the
-- lines of a file
linesOfFile ::
  (e1 :> es, e2 :> es) =>
  String ->
  IOE e1 ->
  Stream String e2 ->
  Eff es ()
linesOfFile filename io y = do
  withJump $ \onEOF -> do
    withFile io filename ReadMode $ \h -> do
      -- This bracket is only so we can observe the
      -- prompt closing of the file.
      bracket
        (effIO io (putStrLn "File opened"))
        (\() -> effIO io (putStrLn "File closed"))
        ( \() -> do
            forever $ do
              isEOF <- hIsEOF h
              when isEOF $
                jumpTo onEOF
              yield y =<< hGetLine h
        )

-- This should be part of the Bluefin standard library
take ::
  (e1 :> es) =>
  Integer ->
  (forall e. Stream a e -> Eff (e :& es) ()) ->
  Stream a e1 ->
  Eff es ()
take n k y =
  withJump $ \done -> do
    evalState n $ \s -> do
      forEach (useImplUnder . k) $ \a -> do
        s' <- get s
        when (s' <= 0) $
          jumpTo done

        modify s (subtract 1)
        yield y a

-- This should be part of the Bluefin standard library
hGetLine :: (e1 :> es) => Handle e1 -> Eff es String
hGetLine h = unsafeWithHandle h System.IO.hGetLine

-- This should be part of the Bluefin standard library
hIsEOF :: (e1 :> es) => Handle e1 -> Eff es Bool
hIsEOF h = unsafeWithHandle h System.IO.hIsEOF

I don’t think there’s a general problem with iterators. At least I haven’t found a problem that Bluefin suffers from. Perhaps you can give an example where pull-based iterators release a resource too late and I’ll try to convert it to Bluefin to show that Bluefin’s pull-based iterators don’t have the same problem. In particular, my comment above is an example of Bluefin not having the same problem.

Oh, sorry, by “IO-based” I didn’t mean “has IO at the base of the transformer stack”; I meant (roughly) “supports a MonadUnliftIO instance”. Perhaps that was a bad choice of terminology.

Oops, I wrote this before I saw the example above:

Here’s an example in Python (the same as the one before, but concrete this time). The same situation exists with streams, and I assume with conduit and the rest. The scope of lines is dynamic, so it isn’t amenable to bracket. But it also illustrates how it doesn’t turn out to be a problem in Python, due to GC. The File class is just there to make closing noisy:

#!/usr/bin/env python3

def main():
    open('a', 'w').write(''.join(f"a{i}\n" for i in range(6)))
    open('b', 'w').write(''.join(f"b{i}\n" for i in range(6)))
    timings = [
        (0, 'a'),
        (2, 'b'),
        (4, 'a'),
    ]
    # with resourcet:
    for out in mix(timings):
        print(out)

def mix(timings):
    iters = [pad(start, fname) for (start, fname) in timings]
    while iters:
        outs = [next(iter, None) for iter in iters]
        yield [o for o in outs]
        iters = [iter for (iter, o) in zip(iters, outs) if o is not None]

def pad(start, fname):
    for i in range(start):
        yield ''
    for _, line in zip(range(3), lines(fname)):
        yield line

def lines(fname):
    return iter(File(fname))

class File:
    def __init__(self, fname):
        self.file = open(fname)
        # self.key = resourcet.acquire(self.file)
    def __iter__(self):
        return self
    def __next__(self):
        line = self.file.readline()
        if not line:
            self.close()
            # resourcet.release(self.key)
            raise StopIteration
        else:
            return line
    def close(self):
        print('** close', self.file)
        self.file.close()
    def __del__(self):
        print('** del close', self.file)
        self.file.close()

if __name__ == '__main__':
    main()

When I run it, the files are closed promptly, after the take 3, but they are closed by __del__ rather than close. In the real situation, it isn’t take 3, but another stream that goes to zero, but I think it comes out the same.
If Python had a registry analogous to ResourceT, its calls would go where the comments are. If that registry didn’t use weak pointers, it would also prevent the __del__ close from happening until the backstop at the end of the with block.

I’m sure there’s a more minimal example, but this is how I came across it!

Maybe Bluefin can do it! It looks pretty similar in theory. The jumpTo stuff in take doesn’t exist in traditional streams. I don’t yet understand how the jumpTo done in take then triggers the corresponding jumpTo onEOF in linesOfFile (if it does), and who is calling whom here.


Got it, ok that makes sense then.


On the contrary, it is amenable to bracket (assuming you’re working in an IO-based monad, in the sense that it supports MonadUnliftIO).

Bluefin can do it! Here are the outputs side by side:

% pr --width=84 -Tm /tmp/haskell-out /tmp/python-out
[Just "a0",Just "",Just ""]               ['a0\n', '', '']
[Just "a1",Just "",Just ""]               ['a1\n', '', '']
[Just "a2",Just "b0",Just ""]             ['a2\n', 'b0\n', '']
File closed: a                            ** del close <_io.TextIOWrapper name='a'
[Nothing,Just "b1",Just ""]               [None, 'b1\n', '']
[Just "b2",Just "a0"]                     ['b2\n', 'a0\n']
File closed: b                            ** del close <_io.TextIOWrapper name='b'
[Nothing,Just "a1"]                       [None, 'a1\n']
[Just "a2"]                               ['a2\n']
File closed: a                            ** del close <_io.TextIOWrapper name='a'
[Nothing]                                 [None]

jumpTo is just the Bluefin equivalent of hoistMaybe Nothing from MaybeT, so traditional streams do have it.

jumpTo done doesn’t trigger jumpTo onEOF, rather it causes an exception to be thrown in yield y (the one that’s being fed by =<< hGetLine h) (because take is being applied to linesOfFile). That causes linesOfFile to exit abnormally (i.e. via exception rather than via normal return), and in doing so the cleanup action of linesOfFile's bracket will trigger, closing the file.

Does that make it clearer?
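The mechanism described above can be modeled without Bluefin, using the de-newtyped view in which a Stream is just a callback a -> IO (). This is a base-only sketch. takeS, linesOfFile, Done and demo are hypothetical names, not Bluefin’s API. takeS throws a private exception from inside the consumer callback. The exception unwinds through the producer, whose bracket closes the file, and takeS catches it again:

```haskell
import Control.Exception (Exception, bracket, catch, throwIO)
import Control.Monad (when)
import Data.IORef (IORef, modifyIORef, modifyIORef', newIORef, readIORef)
import Data.Unique (Unique, newUnique)
import System.IO

-- De-newtyped model: a stream consumer is just a callback.
type Stream a = a -> IO ()

-- Exception used by takeS to stop a producer early. The Unique tag plays
-- the role of the Jump handle, so nested takeS calls only catch their own.
newtype Done = Done Unique

instance Show Done where
  show _ = "Done"

instance Exception Done

-- Yield every line of a file; the bracket closes the file however the
-- loop exits, including via an exception thrown from the consumer.
linesOfFile :: IORef [String] -> FilePath -> Stream String -> IO ()
linesOfFile logRef path y =
  bracket
    (openFile path ReadMode)
    (\h -> hClose h >> modifyIORef logRef (++ ["closed " ++ path]))
    loop
  where
    loop h = do
      eof <- hIsEOF h
      if eof then pure () else hGetLine h >>= y >> loop h

-- Forward the first n elements of producer k to y, then abort k by
-- throwing from inside the callback that k is calling.
takeS :: Int -> (Stream a -> IO ()) -> Stream a -> IO ()
takeS n k y
  | n <= 0 = pure ()
  | otherwise = do
      tag <- newUnique
      remaining <- newIORef n
      let body a = do
            y a
            modifyIORef' remaining (subtract 1)
            r <- readIORef remaining
            when (r <= 0) (throwIO (Done tag))
      k body `catch` \e@(Done t) -> if t == tag then pure () else throwIO e

-- The "first lines of each file" stream: at most one file open at a time.
demo :: IO [String]
demo = do
  let files = [("take-demo-a.txt", 'a'), ("take-demo-b.txt", 'b')]
  mapM_ (\(f, c) -> writeFile f (unlines [c : show i | i <- [0 .. 4 :: Int]])) files
  logRef <- newIORef []
  let consumer l = modifyIORef logRef (++ ["got " ++ l])
  mapM_ (\(f, _) -> takeS 2 (linesOfFile logRef f) consumer) files
  readIORef logRef
```

One design difference from the take in the thread: this version stops right after the nth element rather than when the (n+1)th arrives, so it never reads a line it will discard.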

EDIT: I added some explanatory comments in the “Full code” section (initially collapsed).


Here is the code corresponding to your Python version:

main :: IO ()
main = runEff $ \io -> do
  effIO io $ do
    writeFile
      "a"
      (unlines (map (\i -> "a" <> show i) [0 :: Int .. 5]))
    writeFile
      "b"
      (unlines (map (\i -> "b" <> show i) [0 :: Int .. 5]))

  let timings =
        [ (0, "a"),
          (2, "b"),
          (4, "a")
        ]

  forEach (mix timings io) $ \out ->
    effIO io (print out)

mix ::
  forall e1 e2 es.
  (e1 :> es, e2 :> es) =>
  [(Int, FilePath)] ->
  IOE e2 ->
  Stream [Maybe String] e1 ->
  Eff es ()
mix timings io y = do
  let itersStreams ::
        [ forall e.
          Stream (Maybe String) e ->
          Eff (e :& es) ()
        ]
      itersStreams = map (\x -> nothingOnEnd (pad io x)) timings

  connectMany itersStreams $ \itersStart -> do
    flip fix itersStart $ \again iters -> do
      when (not (null iters)) $ do
        outs <- traverse await iters
        yield y outs
        let iters' =
              [ iter
                | (iter, o) <- zip iters outs,
                  not (isNothing o)
              ]
        again iters'

nothingOnEnd ::
  (e :> es) =>
  (forall e. Stream a e -> Eff (e :& es) r) ->
  Stream (Maybe a) e ->
  Eff es r
nothingOnEnd s y = do
  forEach s $ \a -> yield y (Just a)
  forever (yield y Nothing)

pad ::
  (e1 :> es, e2 :> es) =>
  IOE e1 ->
  (Int, FilePath) ->
  Stream String e2 ->
  Eff es ()
pad io (start, fname) y = do
  replicateM_ start (yield y "")
  take 3 (linesOfFile fname io) y
Full code

To help understand this code, it can be useful to keep in mind:

  • Eff es a is just a newtype-wrapped IO a
  • Stream a e is just a newtype-wrapped a -> IO ()
  • Consume b e is just a newtype-wrapped IO b
  • yield y a is just a newtype-wrapped y a
  • await c is just a newtype-wrapped c
  • forEach k body is just a newtype-wrapped k body
{-# LANGUAGE GHC2021 #-}
{-# LANGUAGE ImpredicativeTypes #-}
{-# LANGUAGE NoMonomorphismRestriction #-}

import Bluefin.Compound hiding (Handle)
import Bluefin.Consume (Consume, await, consumeStream)
import Bluefin.Eff (Eff, bracket, (:&), (:>))
import Bluefin.IO (IOE, effIO, runEff)
import Bluefin.Jump (jumpTo, withJump)
import Bluefin.State (evalState, get, modify)
import Bluefin.Stream (Stream, forEach, yield)
import Bluefin.System.IO (Handle, hGetLine, hIsEOF, unsafeWithHandle, withFile)
import Control.Monad (forever, replicateM_, when)
import Data.Foldable (for_)
import Data.Function (fix)
import Data.List (sort)
import Data.Maybe (isNothing)
import System.IO (IOMode (ReadMode))
import Prelude hiding (take)

main :: IO ()
main = runEff $ \io -> do
  effIO io $ do
    writeFile
      "a"
      (unlines (map (\i -> "a" <> show i) [0 :: Int .. 5]))
    writeFile
      "b"
      (unlines (map (\i -> "b" <> show i) [0 :: Int .. 5]))

  let timings =
        [ (0, "a"),
          (2, "b"),
          (4, "a")
        ]

  -- With newtype wrapping removed, `Stream a e` is just `a -> IO ()`
  -- and `forEach` is just function application. `effIO io` is a no-op, so
  -- this is:
  --
  --     mix timings io print
  forEach (mix timings io) $ \out ->
    effIO io (print out)

mix ::
  forall e1 e2 es.
  (e1 :> es, e2 :> es) =>
  [(Int, FilePath)] ->
  IOE e2 ->
  Stream [Maybe String] e1 ->
  Eff es ()
mix timings io y = do
  -- There are `ImpredicativeTypes` here, but that's not essential.
  -- We could have just used a rank-1 newtype wrapper instead.
  -- `ImpredicativeTypes` are just slightly more convenient.
  let itersStreams ::
        [ forall e.
          Stream (Maybe String) e ->
          Eff (e :& es) ()
        ]
      itersStreams = map (\x -> nothingOnEnd (pad io x)) timings

  connectMany itersStreams $ \itersStart -> do
    flip fix itersStart $ \again iters -> do
      when (not (null iters)) $ do
        outs <- traverse await iters
        yield y outs
        let iters' =
              [ iter
                | (iter, o) <- zip iters outs,
                  not (isNothing o)
              ]
        again iters'

-- | When the stream is finished, yield Nothing for evermore.
nothingOnEnd ::
  (e :> es) =>
  (forall e. Stream a e -> Eff (e :& es) r) ->
  Stream (Maybe a) e ->
  Eff es r
nothingOnEnd s y = do
  -- Without newtype wrapping, this is
  -- do
  --   s (\a -> y (Just a))
  --   forever (y Nothing)
  forEach s $ \a -> yield y (Just a)
  forever (yield y Nothing)

pad ::
  (e1 :> es, e2 :> es) =>
  IOE e1 ->
  (Int, FilePath) ->
  Stream String e2 ->
  Eff es ()
pad io (start, fname) y = do
  replicateM_ start (yield y "")
  take 3 (linesOfFile fname io) y

-- `connectMany` is the part of this program that behaves in the
-- least familiar manner. This is what it does:
--
-- When given a list of n effectful operations which yield `a`s it
-- forks n threads for them to run in. There is also a thread forked
-- for the function that accepts a list of `Consume`s. Each `Consume`
-- receives the `a`s yielded by one of the n threads.
--
-- The threads are synchronised by `MVar`s, so there is only one
-- possible interleaving of the concurrent code. When `await` is
-- called on one of the n `Consume`s the `Stream` thread corresponding
-- to that `Consume` is asked "please do the work required to get
-- me your next `a`". That unblocks it, it does its work, gives its `a`,
-- blocks, and the `Consume` thread continues.
--
-- (Actually, there are n threads forked for the `Consume` action,
-- but it only runs in the last one. This is not an essential feature,
-- it's just how the current implementation happens to work.)
--
-- The function also uses `ImpredicativeTypes` to put a `forall` inside the
-- list type constructor, but that's just a convenience, not something
-- essential to the operation of the program.
connectMany ::
  -- | n effectful operations that yield `a`s
  [forall e. Stream a e -> Eff (e :& es) r] ->
  -- | Will be called with a list of n Consumes,
  -- to which the streams above will yield their
  -- `a`s.
  (forall e. [Consume a e] -> Eff (e :& es) r) ->
  Eff es r
connectMany ss k =
  makeOp (connectMany' ss k [])

connectMany' ::
  [forall e. Stream a e -> Eff (e :& es) r] ->
  (forall e. [Consume a e] -> Eff (e :& es) r) ->
  (forall e. [Consume a e] -> Eff (e :& es) r)
connectMany' [] k = k
connectMany' (s : ss) k =
  connectMany'
    ss
    ( \cs ->
        consumeStream
          (\c -> useImplIn k (mapHandle c : map mapHandle cs))
          (useImplWithin s)
    )

-- General purpose Bluefin function for streaming the
-- lines of a file
linesOfFile ::
  (e1 :> es, e2 :> es) =>
  String ->
  IOE e1 ->
  Stream String e2 ->
  Eff es ()
linesOfFile filename io y = do
  withJump $ \onEOF -> do
    withFile io filename ReadMode $ \h -> do
      -- This bracket is only so we can observe the
      -- prompt closing of the file.
      bracket
        (pure ())
        (\() -> effIO io (putStrLn ("File closed: " <> filename)))
        ( \() -> do
            forever $ do
              isEOF <- hIsEOF h
              when isEOF $
                jumpTo onEOF
              -- A `Stream a e` is just a newtype wrapped `a -> IO ()`, and
              -- without the wrapping `yield y a` is just `y a`. So, without the
              -- wrapping this is `y =<< hGetLine h`.
              yield y =<< hGetLine h
        )

-- This should be part of the Bluefin standard library
--
-- `take n s y` takes the first n elements of `s` (yielding them to `y`)
-- and then stops. It does so by jumping out of an infinite loop
-- when a countdown (which starts at n) hits 0.
take ::
  (e1 :> es) =>
  Integer ->
  (forall e. Stream a e -> Eff (e :& es) ()) ->
  Stream a e1 ->
  Eff es ()
take n k y =
  withJump $ \done -> do
    evalState n $ \s -> do
      -- `Stream a e` is a newtype wrapped `a -> IO ()`, and
      -- with all the wrapping removed, `forEach k body` is
      -- just `k body`.
      --
      -- `useImplUnder` is a no-op when the newtype wrapping
      -- is removed. It just massages effect type tags.
      forEach (useImplUnder . k) $ \a -> do
        s' <- get s
        when (s' <= 0) $
          jumpTo done

        modify s (subtract 1)
        -- With newtype wrapping removed this is just `y a`.
        yield y a