Hi.
I have a question about mkWeakPtr
and deRefWeak
.
Below is my example test code.
In this code, I am trying simple broadcasting using forkIO
and Chan
.
The problem is that deRefWeak
is failed and I don’t know why.
As workaround, If I use seq before mkWeakPtr
, It works but looks like leak is happened.
I want to see deRefWeak
’s successful result and I don’t want any leak.
Can someone help me understand this problem?
Sorry for bad English writing.
Thanks.
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE StrictData #-}
{-# LANGUAGE TemplateHaskell #-}
-- ghc 9.6.6
-- stack lts-22.43
-- dependencies:
-- - base >= 4.7 && < 5
-- - mtl
-- - monad-logger
-- - containers
-- - text
-- - random
module Lib (someFunc) where
import Control.Concurrent
import Control.Exception
import Control.Monad
import Control.Monad.Identity
import Control.Monad.Logger
import Control.Monad.Reader
import Control.Monad.State.Strict
import Data.Foldable
import Data.Map.Strict as DMS
import qualified Data.Text as DT
import System.Mem.Weak
import System.Random
data Data1 = Data1
{ data1MapForkBroadcaster_ :: MVar (Maybe (Map Int (Chan Input1))),
data1MapForkSubscriber_ :: MVar (Maybe (Map Int (Chan Input2))),
data1QSem_ :: QSem,
data1StdGen_ :: MVar StdGen
}
data Data2 = Data2
{ data2Data1_ :: Data1,
data2Chan_ :: Chan Input1,
data2Id_ :: Int,
data2Restore_ :: IO Input1 -> IO Input1
}
data State1 = State1
{ state1MapSubscriber_ :: Map Int (Weak (Chan Input2)),
state1StdGen_ :: StdGen,
state1QuitThread_ :: Bool,
state1QuitChan_ :: Bool
}
type ReaderLoggingT r m a t = t (ReaderT r (LoggingT m)) a
type ReaderStateLoggingT r s m a t = t (ReaderT r (StateT s (LoggingT m))) a
runReaderLogging ::
(MonadIO m) =>
ReaderLoggingT r m a IdentityT ->
r ->
(Loc -> LogSource -> LogLevel -> LogStr -> IO ()) ->
m a
runReaderLogging a r l = runLoggingT (runReaderT (runIdentityT a) r) l
runReaderStateLogging ::
(MonadIO m) =>
ReaderStateLoggingT r s m a IdentityT ->
r ->
s ->
(Loc -> LogSource -> LogLevel -> LogStr -> IO ()) ->
m (a, s)
runReaderStateLogging a r s l = runLoggingT (runStateT (runReaderT (runIdentityT a) r) s) l
type ReaderLogging1 m a = ReaderLoggingT Data1 m a IdentityT
type ReaderStateLogging21 m a = ReaderStateLoggingT Data2 State1 m a IdentityT
data Input1
= Input1Timeout
| Input1Subscribe (Int, Weak (Chan Input2))
| Input1Quit
data Input2
= Input2Broadcast DT.Text
| Input2Quit
someFunc :: IO ()
someFunc = do
m <- newMVar (Just empty)
m' <- newMVar (Just empty)
m'' <- newQSem 0
m''' <- newMVar (mkStdGen 0)
let data1 =
Data1
{ data1MapForkBroadcaster_ = m,
data1MapForkSubscriber_ = m',
data1QSem_ = m'',
data1StdGen_ = m'''
}
in runStdoutLoggingT
( filterLogger
(\_ _ -> False)
(runReaderT (runIdentityT someFunc2) data1)
)
modifyMVarM_ :: (MonadIO m) => MVar a -> (a -> IO a) -> m ()
modifyMVarM_ a m = liftIO (modifyMVar_ a m)
withMVarM :: (MonadIO m) => MVar a -> (a -> IO b) -> m b
withMVarM a f = liftIO (withMVar a f)
writeChanM :: (MonadIO m) => Chan a -> a -> m ()
writeChanM c a = liftIO (writeChan c a)
setTimeout' :: (MonadIO m) => Int -> Chan a -> a -> m ()
setTimeout' i c a = do
-- if 'seq' is not used, below deRefWeak failed.
-- c' <- liftIO ({-# SCC mkWeakPtrCNothing #-} (mkWeakPtr c Nothing))
-- if 'seq' is used, deRefWeak successed, but maybe leak? i don't know why.
c' <- seq c (liftIO ({-# SCC mkWeakPtrCNothing #-} (mkWeakPtr c Nothing)))
liftIO
( void
( forkIO
( do
threadDelay i
deRefWeak c'
>>= ( \case
Just c'' -> writeChan c'' a
Nothing -> return ()
)
)
)
)
setTimeout'' :: (MonadIO m) => Int -> Chan a -> a -> m ()
setTimeout'' i c a = do
-- in this case, it looks like there is no leak in profiling result.
liftIO
( void
( forkIO
( do
threadDelay i
writeChan c a
)
)
)
setTimeout :: (MonadIO m) => ReaderStateLogging21 m ()
setTimeout = do
r <- ask
get
>>= ( \a -> do
let (i, a') = randomR (500 :: Int, 1500) (state1StdGen_ a)
in do
-- setTimeout' or setTimeout''
setTimeout' (i * 1000) (data2Chan_ r) Input1Timeout
put (a {state1StdGen_ = a'})
)
forkBroadcasterInner' ::
(MonadIO m) =>
Either SomeException Input1 ->
ReaderStateLogging21 m ()
forkBroadcasterInner' (Left e) = do
r <- ask
$logInfo (DT.pack ("forkBroadcasterInner " ++ show (data2Id_ r) ++ " enter. " ++ show e))
liftIO (signalQSem (data1QSem_ (data2Data1_ r)))
forkBroadcasterInner' (Right Input1Quit) = do
r <- ask
$logInfo (DT.pack ("forkBroadcasterInner " ++ show (data2Id_ r) ++ " enter. Input1Quit"))
get >>= (\a -> put a {state1QuitThread_ = True})
modifyMVarM_
(data1MapForkBroadcaster_ (data2Data1_ r))
(mapM (return . delete (data2Id_ r)))
forkBroadcasterInner
forkBroadcasterInner' (Right (Input1Subscribe (i, c))) = do
r <- ask
$logInfo (DT.pack ("forkBroadcasterInner " ++ show (data2Id_ r) ++ " enter. Input1Subscribe " ++ show i))
get >>= (\a -> put a {state1MapSubscriber_ = insert i c (state1MapSubscriber_ a)})
forkBroadcasterInner
forkBroadcasterInner' (Right Input1Timeout) = do
r <- ask
$logInfo (DT.pack ("forkBroadcasterInner " ++ show (data2Id_ r) ++ " enter. Input1Timeout"))
get
>>= ( \a -> do
unless (state1QuitThread_ a) $ do
setTimeout
let f ret (i, c) = do
c' <- liftIO (deRefWeak c)
case c' of
Just c'' -> do
$logInfo "TODO log"
writeChanM c'' (Input2Broadcast "broadcast")
return ret
Nothing -> do
$logInfo "TODO log"
return (delete i ret)
f' a' = a {state1MapSubscriber_ = a'}
in foldlM
f
(state1MapSubscriber_ a)
(DMS.toList (state1MapSubscriber_ a))
>>= put . f'
)
forkBroadcasterInner
forkBroadcasterInner :: (MonadIO m) => ReaderStateLogging21 m ()
forkBroadcasterInner = do
r <- ask
$logInfo (DT.pack ("forkBroadcasterInner " ++ show (data2Id_ r) ++ " enter."))
liftIO (try (data2Restore_ r (readChan (data2Chan_ r))))
>>= forkBroadcasterInner'
forkBroadcaster :: (MonadIO m) => ReaderStateLogging21 m ()
forkBroadcaster = do
r <- ask
$logInfo (DT.pack ("forkBroadcaster " ++ show (data2Id_ r) ++ " enter."))
setTimeout
forkBroadcasterInner
$logInfo (DT.pack ("forkBroadcaster " ++ show (data2Id_ r) ++ " leave."))
someFunc2 :: (MonadIO m) => ReaderLogging1 m ()
someFunc2 = do
let cnt = 500
r <- ask
l <- askLoggerIO
$logInfo "someFunc2 enter."
mapM_
( \i -> do
i' <-
liftIO
( modifyMVar
(data1StdGen_ r)
( \a ->
let (i', a') = randomR (0 :: Int, 1000) a
in return (a', i')
)
)
modifyMVarM_
(data1MapForkBroadcaster_ r)
( mapM
( \m -> liftIO $ do
c <- newChan
let data2 restore =
Data2
{ data2Data1_ = r,
data2Chan_ = c,
data2Id_ = i,
data2Restore_ = restore
}
state1 =
State1
{ state1MapSubscriber_ = empty,
state1StdGen_ = mkStdGen i',
state1QuitChan_ = False,
state1QuitThread_ = False
}
in mask
( \restore ->
void
( forkIO
( void
( runReaderStateLogging
forkBroadcaster
(data2 restore)
state1
l
)
)
)
)
return (insert i c m)
)
)
)
[1 :: Int .. cnt]
void (liftIO getLine)
withMVarM
(data1MapForkBroadcaster_ r)
(mapM_ (mapM_ (`writeChan` Input1Quit)))
withMVarM
(data1MapForkSubscriber_ r)
(mapM_ (mapM_ (`writeChan` Input2Quit)))
let f i = do
runLoggingT ($logInfo (DT.pack (show i))) l
let f' =
(try (waitQSem (data1QSem_ r)) :: IO (Either SomeException ()))
>>= f''
f'' (Right _) = return ()
f'' (Left _) = f'
in f'
in liftIO (mapM_ f [1 :: Int .. cnt])
$logInfo "someFunc2 leave."