Would a conduit expert be able to write me a conduit that takes in file names and yields the first n lines from each file without leaking space or file handles? I have the following, but it leaks file handles and crashes on my machine if there are more than about 1000 files.
```text
cabal-script-test27.hs: /tmp/manyfiles/1981: openBinaryFile: resource exhausted (Too many open files)
```

```haskell
import Conduit
import qualified Data.Conduit.Combinators as CC
import System.Directory (doesFileExist)
import Data.Text (Text)

main :: IO ()
main = runConduitRes $
  CC.sourceDirectoryDeep False "/tmp/manyfiles"
    .| firstNLines 1
    .| CC.mapM_ (liftIO . print)

firstNLines
  :: Int
  -> ConduitT FilePath Text (ResourceT IO) ()
firstNLines n = awaitForever $ \fp ->
  CC.sourceFile fp
    .| CC.decodeUtf8Lenient
    .| CC.linesUnbounded
    .| CC.take n
```
Right, when take takes less than everything (i.e. when it’s actually doing its job) it seems that finalizers aren’t run promptly. It’s hard to imagine that conduit users have been suffering this problem for years though, so I wonder what their alternative is.
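To make the point about take concrete, here is a small self-contained sketch; the helper name demo and the log strings are illustrative, not from the thread. Each "file" is simulated by a bracketP-guarded inner source, and only its first element is taken with CC.take. Assuming conduit 1.3 semantics, where bracketP relies solely on ResourceT for cleanup, all three "value" entries should be logged before any "close" entry, because the release actions only run when runConduitRes exits.

```haskell
{- cabal:
build-depends: base, conduit
-}
import Conduit
import qualified Data.Conduit.Combinators as CC
import Data.IORef (modifyIORef, newIORef, readIORef)

-- Simulate three "files", each guarded by bracketP, and take only the first
-- element of each with CC.take. Every acquire, value and release is logged,
-- and the log is returned in the order the events happened.
demo :: IO [String]
demo = do
  ref <- newIORef []
  let logger s = modifyIORef ref (++ [s])
  runConduitRes $
    CC.enumFromTo (1 :: Int) 3
      .| awaitForever (\x ->
           bracketP
             (logger ("open " ++ show x))
             (\_ -> logger ("close " ++ show x))
             (\_ -> CC.enumFromTo (1 :: Int) 10)
             .| CC.take 1) -- stops before the inner source is exhausted
      .| CC.mapM_ (\v -> liftIO (logger ("value " ++ show v)))
  readIORef ref

main :: IO ()
main = demo >>= mapM_ putStrLn
```

With thousands of real files instead of three simulated ones, this is exactly how the handles pile up until the process hits its open-file limit.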
I’ve looked into this for longer than I should have, but I think the problem is that you need to make take aware of the ResourceT for this to work. Here’s a hack I came up with:
```haskell
takeRes :: MonadResource m => Int -> ConduitT a a m ()
takeRes 0 = liftResourceT $ do
  is <- getInternalState
  closeInternalState is
  liftIO $ writeIORef is $ ReleaseMap maxBound (minBound + 1) mempty
takeRes n = await >>= \case
  Nothing -> mempty
  Just x -> yield x *> takeRes (n - 1)
```
Full code
```haskell
{- cabal:
build-depends: base, text, directory, conduit, resourcet
default-language: GHC2021
-}
{-# LANGUAGE LambdaCase #-}
import Conduit
import qualified Data.Conduit.Combinators as CC
import Control.Monad.Trans.Resource
import Control.Monad.Trans.Resource.Internal
import Data.IORef

-- Hack: once n elements have been passed on, run every finalizer registered
-- in the enclosing ResourceT (not only this stream's), then reinstall an
-- empty ReleaseMap so the outer runResourceT can still finish normally.
takeRes :: MonadResource m => Int -> ConduitT a a m ()
takeRes 0 = liftResourceT $ do
  is <- getInternalState
  closeInternalState is
  liftIO $ writeIORef is $ ReleaseMap maxBound (minBound + 1) mempty
takeRes n = await >>= \case
  Nothing -> mempty
  Just x -> yield x *> takeRes (n - 1)

main :: IO ()
main =
  runConduitRes $
    CC.enumFromTo (1 :: Int) 4
      .| awaitForever (\x ->
           bracketP (pure ()) (\_ -> putStrLn "close")
             (\_ -> CC.enumFromTo (1 :: Int) x)
             .| takeRes 3)
      .| CC.mapM_ (liftIO . print)
      .| sinkNull
```
This is a place where continuation-based streaming libraries (like streamly or my own toy library jet-stream) are a bit more ergonomic than pull-based libraries like conduit or streaming. Because they control the continuation, it’s easier for them to manage allocation/deallocation of resources, even with functions like take.
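To illustrate why the callback style helps, here is a minimal push-based stream built only on base. The Stream type and the fromList, bracketS, takeS and concatMapS helpers are hypothetical; they are not the API of streamly or jet-stream. Because the producer calls the consumer from inside its own bracket_, a takeS that asks it to stop simply returns through that bracket, so each release runs before the next "file" is opened, even though takeS knows nothing about resources.

```haskell
import Control.Exception (bracket_)
import Data.IORef (modifyIORef, modifyIORef', newIORef, readIORef, writeIORef)

-- The producer drives the loop and calls the consumer for each element.
-- The consumer returns False to ask the producer to stop early.
-- The result is False if the consumer asked to stop, True otherwise.
newtype Stream a = Stream {runStream :: (a -> IO Bool) -> IO Bool}

fromList :: [a] -> Stream a
fromList xs = Stream (\k -> go k xs)
  where
    go _ [] = pure True
    go k (y : ys) = do
      continue <- k y
      if continue then go k ys else pure False

-- The producer owns the bracket: whether it finishes or is told to stop,
-- control returns through bracket_ and the release action runs right away.
bracketS :: IO () -> IO () -> Stream a -> Stream a
bracketS acquire release s = Stream $ \k -> bracket_ acquire release (runStream s k)

-- Stops its producer after n elements; it never sees any resource.
takeS :: Int -> Stream a -> Stream a
takeS n s = Stream $ \k ->
  if n <= 0
    then pure True
    else do
      remaining <- newIORef n
      wanted <- newIORef True
      _ <- runStream s $ \x -> do
        continue <- k x
        writeIORef wanted continue
        modifyIORef' remaining (subtract 1)
        left <- readIORef remaining
        pure (continue && left > 0)
      -- Reaching n is not a reason for the outer stream to stop.
      readIORef wanted

concatMapS :: (a -> Stream b) -> Stream a -> Stream b
concatMapS f s = Stream $ \k -> runStream s (\x -> runStream (f x) k)

-- Three simulated files, first element of each; returns the event log.
demo :: IO [String]
demo = do
  ref <- newIORef []
  let logger s = modifyIORef ref (++ [s])
      perFile x =
        takeS 1 $
          bracketS (logger ("open " ++ show x)) (logger ("close " ++ show x)) $
            fromList [1 .. 10 :: Int]
  _ <- runStream (concatMapS perFile (fromList [1 .. 3 :: Int])) $ \v -> do
    logger ("value " ++ show v)
    pure True
  readIORef ref

main :: IO ()
main = demo >>= mapM_ putStrLn
```

The log alternates open, value and close for each file in turn, so at most one simulated resource is held at a time.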
I wrote a helper wrapper over streaming called streaming-bracketed to help with this same issue. From the package description:
> Given a list [(Filepath,Int,Int)] of files and line ranges, create a stream of lines belonging to the concatenated ranges. Prompt release of file handles is required.
Thanks for looking into it! Unfortunately, I don’t think the problem can be solved by making take ResourceT-aware, as takeRes does. After all, there may be no resource at all, and even if there is, takeRes doesn’t know which one to free. For example, some other resource can be released too soon:
```haskell
{- cabal:
build-depends: base, text, directory, conduit, resourcet
-}
{-# LANGUAGE GHC2021 #-}
{-# LANGUAGE LambdaCase #-}

import Conduit
import Control.Monad.Trans.Resource
import Control.Monad.Trans.Resource.Internal
import Data.Conduit.Combinators qualified as CC
import Data.IORef

main :: IO ()
main =
  runConduitRes $
    bracketP
      (pure ())
      (\() -> putStrLn "should close at end")
      ( \() ->
          CC.enumFromTo (1 :: Int) 4
            .| awaitForever
              ( \x ->
                  bracketP
                    (pure ())
                    (\_ -> putStrLn "close")
                    (\_ -> CC.enumFromTo (1 :: Int) x)
                    .| takeRes 3
              )
            .| CC.mapM_ (liftIO . print)
            .| sinkNull
      )

takeRes :: (MonadResource m) => Int -> ConduitT a a m ()
takeRes 0 = liftResourceT $ do
  is <- getInternalState
  closeInternalState is
  liftIO $ writeIORef is $ ReleaseMap maxBound (minBound + 1) mempty
takeRes n =
  await >>= \case
    Nothing -> mempty
    Just x -> yield x *> takeRes (n - 1)
```
```text
ghci> main
1
close
1
2
close
1
2
3
close
should close at end
1
2
3
close
```
jet-stream wouldn’t work well with thousands of files because, IIRC, the way I implemented it, concatenating a large number of streams causes a quadratic slowdown. Judging from the note here, streamly’s concatMap doesn’t seem to have this problem.
You can see an implementation in Bluefin below, analogous to the conduit implementation. There is no established way of doing this in Bluefin, so I just cooked up a reasonable API. It differs from conduit in ways that are solvable and not substantive to this question (it uses hGetLine rather than a pipeline component for splitting into lines).
The important part is that the resource is bracketed in sourceFileLines (using withFile) and takeConsume is completely unaware that there is any resource in scope.
Nonetheless, the resources are managed properly and there is only one handle open at any point during this program.
Bluefin streaming without leaking handles
```haskell
{- cabal:
build-depends: base, directory, bluefin
-}
{-# LANGUAGE GHC2021 #-}
{-# LANGUAGE ExplicitNamespaces #-}
{-# OPTIONS_GHC -Wall #-}

-- Named Main so that it can be run as a cabal script.
module Main where

import Bluefin.Compound (mapHandle)
import Bluefin.Consume (Consume, await)
import Bluefin.Eff (Eff, runEff_, (:&), type (<:))
import Bluefin.IO (IOE, effIO)
import Bluefin.Jump (jumpTo, withJump)
import Bluefin.Stream
  ( Stream,
    enumerateFrom,
    inFoldable,
    streamConsume,
    takeConsume,
    yield,
  )
import Bluefin.System.IO (hIsEOF, hGetLine, withFile)
import Control.Monad (forever)
import Data.Foldable (for_)
import System.Directory (listDirectory)
import System.IO qualified

infixr 9 `connect`

main :: IO ()
main = runEff_ $ \io ->
  run $
    sourceDirectory "/tmp/manyfiles" io
      `connect` firstNLines 1 io
      `connect` printC io

firstNLines ::
  (e1 <: es, e2 <: es, e3 <: es) =>
  Int ->
  IOE e1 ->
  Consume FilePath e2 ->
  Stream String e3 ->
  Eff es ()
firstNLines n io = awaitForever $ \filepath ->
  sourceFileLines filepath io
    `connect` takeConsume n

sourceDirectory ::
  (e1 <: es, e2 <: es) =>
  FilePath ->
  IOE e1 ->
  consume ->
  Stream String e2 ->
  Eff es ()
sourceDirectory filepath io _ y = do
  dirs <- effIO io (listDirectory filepath)
  for_ dirs $ \dir -> yield y (filepath <> "/" <> dir)

-- The file handle is scoped by withFile, so it is closed as soon as this
-- producer finishes or is abandoned by its consumer.
sourceFileLines ::
  (e1 <: es, e2 <: es) =>
  FilePath ->
  IOE e1 ->
  consume ->
  Stream String e2 ->
  Eff es ()
sourceFileLines filename io _ y = do
  withFile io filename System.IO.ReadMode $ \h -> do
    withJump $ \done -> forever $ do
      eof <- hIsEOF h
      if eof
        then jumpTo done
        else do
          line <- hGetLine h
          yield y line

awaitForever ::
  (e1 <: es) =>
  (a -> () -> stream -> Eff es b) ->
  Consume a e1 ->
  stream ->
  Eff es r
awaitForever f a y = forever $ do
  x <- await a
  f x () y

connect ::
  ( forall e.
    consume ->
    Stream b e ->
    Eff (e :& es) r
  ) ->
  ( forall e.
    Consume b e ->
    stream ->
    Eff (e :& es) r
  ) ->
  consume ->
  stream ->
  Eff es r
connect f1 f2 c s = streamConsume (\s' -> f1 c s') (\c' -> f2 c' s)

run :: (() -> () -> Eff es r) -> Eff es r
run k = k () ()

printC ::
  (e1 <: es, e2 <: es) =>
  (Show a) =>
  IOE e1 ->
  Consume a e2 ->
  stream ->
  Eff es r
printC io c _ = forever ((effIO io . print) =<< await c)

-- Other interesting bits

asConsume ::
  (e1 <: es) =>
  ((forall e. Stream a e -> Eff (e :& es) ()) -> stream -> Eff es ()) ->
  Consume a e1 ->
  stream ->
  Eff es ()
asConsume k c s = k (\y -> forever (await c >>= yield y)) s

foo :: IOE e -> consume -> stream -> Eff e ()
foo io =
  (\_ -> inFoldable ['A' .. 'Z'])
    `connect` asConsume (enumerateFrom 5)
    `connect` takeConsume 5
    `connect` takeConsume 4
    `connect` takeConsume 3
    `connect` printC io

-- > example
-- (5,'A')
-- (6,'B')
-- (7,'C')
example :: IO ()
example = runEff_ $ \io -> run (\c s -> foo (mapHandle io) c s)
```
I don’t think it’s a bug in awaitForever, bracketP, or take. If anything, you can criticize the batteries-included API for trading promptness of file closing for quietness of code.
The quick solution here is to sequester the file reading in its own ResourceT. See here.
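The linked solution isn't reproduced here, so the following is only one possible reading of "sequester the file reading in its own ResourceT": each file's pipeline runs in a nested runConduitRes, and the collected lines are yielded afterwards. firstNLinesOf is a hypothetical helper name. Since the nested ResourceT is closed before the next file is opened, at most one handle should be open at a time. The trade-off is that each file's n lines are buffered in a list, and downstream stages never run while the file is still open.

```haskell
{- cabal:
build-depends: base, text, conduit
-}
import Conduit
import qualified Data.Conduit.Combinators as CC
import Data.Text (Text)
import qualified Data.Text as T

-- Read the first n lines of one file inside its own ResourceT. sourceFile
-- registers the handle with this nested ResourceT, so the handle is closed
-- when runConduitRes returns, even though CC.take stops early.
firstNLinesOf :: Int -> FilePath -> IO [Text]
firstNLinesOf n fp =
  runConduitRes $
    CC.sourceFile fp
      .| CC.decodeUtf8Lenient
      .| CC.linesUnbounded
      .| CC.take n
      .| CC.sinkList

-- Same shape as the original firstNLines, but each handle is released
-- before the next file is opened. The n lines are buffered, then yielded.
firstNLines :: MonadIO m => Int -> ConduitT FilePath Text m ()
firstNLines n = awaitForever $ \fp -> do
  ls <- liftIO (firstNLinesOf n fp)
  CC.yieldMany ls

main :: IO ()
main = runConduitRes $
  CC.sourceDirectoryDeep False "/tmp/manyfiles"
    .| firstNLines 1
    .| CC.mapM_ (liftIO . putStrLn . T.unpack)
```

Because the yielded Text values are plain data, closing the file early cannot invalidate anything downstream; that would not hold if the inner stream yielded something that still depended on the open handle.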
Agreed. It should say the finalization will be run as soon as the conduit is exited, either normally or via exception.
I’d make a PR but I’m still waiting for this one to receive any attention…
Well it can run the finalizer early if you reach the end of the (nested) stream. It only falls back on the end of the whole stream because take prevents the end of the nested streams from being reached.
I hope your contributions will get attention soon, but it seems like maintenance has been stagnant for more than five years now.
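The observation that the finalizer can run early once the nested stream reaches its end can be checked with a sketch that drains the rest of each file after taking n lines. firstNLinesDrained and firstLinesOfAll are hypothetical names. CC.take n *> CC.sinkNull keeps pulling until sourceFile hits end of file, at which point its bracketP body finishes and the handle is released immediately rather than at the end of runConduitRes. This trades prompt closing for reading every file to the end, so it is more a demonstration than a fix.

```haskell
{- cabal:
build-depends: base, text, conduit
-}
import Conduit
import qualified Data.Conduit.Combinators as CC
import Data.Text (Text)
import qualified Data.Text as T

-- Take n lines, then discard the rest of the file. The inner pipeline now
-- runs to completion, so sourceFile reaches end of file and its bracketP
-- releases the handle right away. The cost: every file is read in full.
firstNLinesDrained :: Int -> ConduitT FilePath Text (ResourceT IO) ()
firstNLinesDrained n = awaitForever $ \fp ->
  CC.sourceFile fp
    .| CC.decodeUtf8Lenient
    .| CC.linesUnbounded
    .| (CC.take n *> CC.sinkNull)

-- Hypothetical helper: the first n lines of each file, collected in a list.
firstLinesOfAll :: Int -> [FilePath] -> IO [Text]
firstLinesOfAll n fps =
  runConduitRes $
    CC.yieldMany fps
      .| firstNLinesDrained n
      .| CC.sinkList

main :: IO ()
main = runConduitRes $
  CC.sourceDirectoryDeep False "/tmp/manyfiles"
    .| firstNLinesDrained 1
    .| CC.mapM_ (liftIO . putStrLn . T.unpack)
```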
Thanks for taking a look! That has the opposite problem for me: too early finalization. I have made a reply with an example.
Unfortunately, I think not. In fact, I don’t see any way to use ResourceT with ConduitT to achieve prompt finalization in the presence of exceptions. That’s what I wrote about in Bluefin streams finalize promptly.
It seems like there ought to be a way of having a runResourceC given the other transformations in Data.Conduit.Lift. Maybe there’s no way to dehydrate the ResourceT state without running finalizers though.
EDIT: Probably not, because of exceptions. I read your post about Bluefin prompt finalization. It seems you’re using threads in more of an io-streams style of programming. Good read!