Question about mkWeakPtr and deRefWeak

Hi.

I have a question about mkWeakPtr and deRefWeak.

Below is my example test code.
In this code, I am trying simple broadcasting using forkIO and Chan.

The problem is that deRefWeak fails (it returns Nothing) and I don’t know why.
As a workaround, if I use seq before mkWeakPtr, it works, but it looks like a leak happens.

I want to see deRefWeak’s successful result and I don’t want any leak.
Can someone help me understand this problem?

Sorry for bad English writing.
Thanks.

{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE StrictData #-}
{-# LANGUAGE TemplateHaskell #-}

-- ghc 9.6.6
-- stack lts-22.43
-- dependencies:
-- - base >= 4.7 && < 5
-- - mtl
-- - monad-logger
-- - containers
-- - text
-- - random

module Lib (someFunc) where

import Control.Concurrent
import Control.Exception
import Control.Monad
import Control.Monad.Identity
import Control.Monad.Logger
import Control.Monad.Reader
import Control.Monad.State.Strict
import Data.Foldable
import Data.Map.Strict as DMS
import qualified Data.Text as DT
import System.Mem.Weak
import System.Random

-- | Shared environment handed to every broadcaster thread.
data Data1 = Data1
  { -- | Input channel of each broadcaster, keyed by broadcaster id. Entries
    -- are inserted by 'someFunc2' and deleted by a broadcaster when it
    -- handles 'Input1Quit'.
    -- NOTE(review): the 'Maybe' wrapper is always 'Just' in the visible code.
    data1MapForkBroadcaster_ :: MVar (Maybe (Map Int (Chan Input1))),
    -- | Subscriber channels keyed by 'Int'. 'someFunc2' writes 'Input2Quit'
    -- to every entry; nothing in the visible code inserts into this map.
    data1MapForkSubscriber_ :: MVar (Maybe (Map Int (Chan Input2))),
    -- | Signalled by a broadcaster when its channel read ends with an
    -- exception; 'someFunc2' waits on it once per broadcaster at shutdown.
    data1QSem_ :: QSem,
    -- | Shared generator from which 'someFunc2' draws a seed per broadcaster.
    data1StdGen_ :: MVar StdGen
  }

-- | Reader environment of a single broadcaster thread.
data Data2 = Data2
  { -- | The environment shared by all broadcasters.
    data2Data1_ :: Data1,
    -- | This broadcaster's own input channel.
    data2Chan_ :: Chan Input1,
    -- | Identifier of this broadcaster (used in log messages and as its key
    -- in the shared broadcaster map).
    data2Id_ :: Int,
    -- | The @restore@ function obtained from 'mask' when the thread was
    -- forked; it is wrapped around the blocking 'readChan'.
    data2Restore_ :: IO Input1 -> IO Input1
  }

-- | Mutable per-broadcaster state threaded through 'StateT'.
data State1 = State1
  { -- | Weak pointers to subscriber channels, keyed by subscriber id.
    state1MapSubscriber_ :: Map Int (Weak (Chan Input2)),
    -- | Generator used to pick the next random timeout.
    state1StdGen_ :: StdGen,
    -- | Set to 'True' once 'Input1Quit' has been handled; timeouts are then
    -- no longer re-armed.
    state1QuitThread_ :: Bool,
    -- | NOTE(review): initialised to 'False' and never read in the visible
    -- code — confirm whether it is still needed.
    state1QuitChan_ :: Bool
  }

-- | A transformer @t@ applied on top of a @ReaderT r@ over @LoggingT m@ stack,
-- producing an @a@.
type ReaderLoggingT r m a t = t (ReaderT r (LoggingT m)) a

-- | Like 'ReaderLoggingT' but with a @StateT s@ layer between the reader and
-- the logger.
type ReaderStateLoggingT r s m a t = t (ReaderT r (StateT s (LoggingT m))) a

-- | Run an 'IdentityT'-wrapped reader/logging action: strip the 'IdentityT'
-- layer, feed @env@ to the 'ReaderT' layer and send every log message to the
-- supplied logger callback.
runReaderLogging ::
  (MonadIO m) =>
  ReaderLoggingT r m a IdentityT ->
  r ->
  (Loc -> LogSource -> LogLevel -> LogStr -> IO ()) ->
  m a
runReaderLogging action env =
  runLoggingT (runReaderT (runIdentityT action) env)

-- | Run an 'IdentityT'-wrapped reader/state/logging action with environment
-- @env@, initial state @st@ and the given logger callback, returning the
-- result paired with the final state.
runReaderStateLogging ::
  (MonadIO m) =>
  ReaderStateLoggingT r s m a IdentityT ->
  r ->
  s ->
  (Loc -> LogSource -> LogLevel -> LogStr -> IO ()) ->
  m (a, s)
runReaderStateLogging action env st logger = runLoggingT withState logger
  where
    -- Peel the layers from the outside in: IdentityT, ReaderT, then StateT.
    withEnv = runReaderT (runIdentityT action) env
    withState = runStateT withEnv st

-- | Reader/logging stack over the shared 'Data1' environment.
type ReaderLogging1 m a = ReaderLoggingT Data1 m a IdentityT

-- | Reader/state/logging stack of a broadcaster thread: environment 'Data2',
-- state 'State1'.
type ReaderStateLogging21 m a = ReaderStateLoggingT Data2 State1 m a IdentityT

-- | Messages a broadcaster thread receives on its input channel.
data Input1
  = -- | The timer armed by 'setTimeout' has fired.
    Input1Timeout
  | -- | Register a subscriber: its id and a weak pointer to its channel.
    Input1Subscribe (Int, Weak (Chan Input2))
  | -- | Ask the broadcaster to stop re-arming its timer and unregister itself.
    Input1Quit

-- | Messages delivered to subscriber channels.
data Input2
  = -- | Text sent by a broadcaster to each live subscriber on a timeout.
    Input2Broadcast DT.Text
  | -- | Written by 'someFunc2' to every registered subscriber channel at
    -- shutdown.
    Input2Quit

-- | Program entry point: allocate the shared maps, the completion semaphore
-- and the seed generator, then run 'someFunc2' with a stdout logger whose
-- filter rejects every message.
someFunc :: IO ()
someFunc = do
  broadcasters <- newMVar (Just empty)
  subscribers <- newMVar (Just empty)
  finished <- newQSem 0
  seedGen <- newMVar (mkStdGen 0)
  let env =
        Data1
          { data1MapForkBroadcaster_ = broadcasters,
            data1MapForkSubscriber_ = subscribers,
            data1QSem_ = finished,
            data1StdGen_ = seedGen
          }
      -- Logging is effectively disabled: no source/level passes the filter.
      discardAll _ _ = False
  runStdoutLoggingT . filterLogger discardAll $
    runReaderT (runIdentityT someFunc2) env

-- | 'modifyMVar_' lifted into any 'MonadIO'.
modifyMVarM_ :: (MonadIO m) => MVar a -> (a -> IO a) -> m ()
modifyMVarM_ var = liftIO . modifyMVar_ var

-- | 'withMVar' lifted into any 'MonadIO'.
withMVarM :: (MonadIO m) => MVar a -> (a -> IO b) -> m b
withMVarM var = liftIO . withMVar var

-- | 'writeChan' lifted into any 'MonadIO'.
writeChanM :: (MonadIO m) => Chan a -> a -> m ()
writeChanM chan = liftIO . writeChan chan

-- | Schedule value @a@ to be written to channel @c@ after a delay of @i@
-- (passed straight to 'threadDelay'), while holding the channel only through
-- a weak pointer.
--
-- A helper thread is forked. After the delay it dereferences the weak pointer
-- and writes @a@ only if 'deRefWeak' still yields the channel; otherwise it
-- does nothing.
--
-- NOTE(review): the weak pointer is created with 'mkWeakPtr' on the 'Chan'
-- value itself. The "System.Mem.Weak" documentation warns that weak pointers
-- to ordinary (non-primitive) values are fragile; that is presumably why the
-- variant without 'seq' sees 'deRefWeak' return 'Nothing' — confirm.
setTimeout' :: (MonadIO m) => Int -> Chan a -> a -> m ()
setTimeout' i c a = do
  -- Without 'seq', the author observed the 'deRefWeak' below failing:
  -- c' <- liftIO ({-# SCC mkWeakPtrCNothing #-} (mkWeakPtr c Nothing))
  -- With 'seq', 'deRefWeak' succeeded, but the author suspects a leak
  -- (cause not established).
  c' <- seq c (liftIO ({-# SCC mkWeakPtrCNothing #-} (mkWeakPtr c Nothing)))
  liftIO
    ( void
        ( forkIO
            ( do
                threadDelay i
                -- Only write if the channel is still reachable through the
                -- weak pointer.
                deRefWeak c'
                >>= ( \case
                        Just c'' -> writeChan c'' a
                        Nothing -> return ()
                    )
            )
        )
    )

-- | Strong-reference variant of the timer: fork a thread that waits @i@
-- ('threadDelay' units) and then writes @a@ to @c@ unconditionally.
--
-- The author reports that profiling shows no leak with this variant.
setTimeout'' :: (MonadIO m) => Int -> Chan a -> a -> m ()
setTimeout'' i c a = liftIO (void (forkIO delayedWrite))
  where
    delayedWrite = threadDelay i >> writeChan c a

-- | Arm this broadcaster's timer: draw a random value in @[500, 1500]@ from
-- the state's generator, schedule 'Input1Timeout' on the broadcaster's own
-- channel after that many thousand 'threadDelay' units, and store the
-- advanced generator back into the state.
setTimeout :: (MonadIO m) => ReaderStateLogging21 m ()
setTimeout = do
  env <- ask
  st <- get
  let (millis, nextGen) = randomR (500 :: Int, 1500) (state1StdGen_ st)
  -- setTimeout' or setTimeout''
  setTimeout' (millis * 1000) (data2Chan_ env) Input1Timeout
  put st {state1StdGen_ = nextGen}

-- | Handle one outcome of waiting on the broadcaster's input channel and,
-- except after an exception, go back to waiting via 'forkBroadcasterInner'.
--
-- * @Left e@: the read ended with an exception. Log it, signal the shared
--   'QSem' and return, which ends the loop.
-- * @Right Input1Quit@: mark the thread as quitting, delete this broadcaster
--   from the shared broadcaster map, and keep reading.
-- * @Right (Input1Subscribe (i, c))@: remember the weak subscriber channel
--   @c@ under key @i@.
-- * @Right Input1Timeout@: unless quitting, re-arm the timer and broadcast to
--   every subscriber whose weak pointer still dereferences, dropping the
--   others from the subscriber map.
forkBroadcasterInner' ::
  (MonadIO m) =>
  Either SomeException Input1 ->
  ReaderStateLogging21 m ()
forkBroadcasterInner' (Left e) = do
  r <- ask
  $logInfo (DT.pack ("forkBroadcasterInner " ++ show (data2Id_ r) ++ " enter. " ++ show e))
  liftIO (signalQSem (data1QSem_ (data2Data1_ r)))
forkBroadcasterInner' (Right Input1Quit) = do
  r <- ask
  $logInfo (DT.pack ("forkBroadcasterInner " ++ show (data2Id_ r) ++ " enter. Input1Quit"))
  modify (\s -> s {state1QuitThread_ = True})
  modifyMVarM_
    (data1MapForkBroadcaster_ (data2Data1_ r))
    (mapM (return . delete (data2Id_ r)))
  forkBroadcasterInner
forkBroadcasterInner' (Right (Input1Subscribe (i, c))) = do
  r <- ask
  $logInfo (DT.pack ("forkBroadcasterInner " ++ show (data2Id_ r) ++ " enter. Input1Subscribe " ++ show i))
  modify (\s -> s {state1MapSubscriber_ = insert i c (state1MapSubscriber_ s)})
  forkBroadcasterInner
forkBroadcasterInner' (Right Input1Timeout) = do
  r <- ask
  $logInfo (DT.pack ("forkBroadcasterInner " ++ show (data2Id_ r) ++ " enter. Input1Timeout"))
  quitting <- gets state1QuitThread_
  unless quitting $ do
    setTimeout
    -- Read the state only AFTER 'setTimeout' and update it with 'modify'.
    -- Writing back a snapshot taken before 'setTimeout' would discard the
    -- generator it just advanced, so every later delay would be identical.
    subscribers <- gets state1MapSubscriber_
    let deliver remaining (i, c) =
          liftIO (deRefWeak c) >>= \case
            Just c' -> do
              $logInfo "TODO log"
              writeChanM c' (Input2Broadcast "broadcast")
              return remaining
            Nothing -> do
              -- The subscriber channel is gone: forget it.
              $logInfo "TODO log"
              return (delete i remaining)
    stillAlive <- foldlM deliver subscribers (DMS.toList subscribers)
    modify (\s -> s {state1MapSubscriber_ = stillAlive})
  forkBroadcasterInner

-- | One iteration of the broadcaster loop: log entry, block on the input
-- channel (inside the stored @restore@ wrapper, catching any exception with
-- 'try') and pass the outcome to 'forkBroadcasterInner''.
forkBroadcasterInner :: (MonadIO m) => ReaderStateLogging21 m ()
forkBroadcasterInner = do
  env <- ask
  $logInfo (DT.pack ("forkBroadcasterInner " ++ show (data2Id_ env) ++ " enter."))
  outcome <- liftIO (try (data2Restore_ env (readChan (data2Chan_ env))))
  forkBroadcasterInner' outcome

-- | Body of a broadcaster thread: arm the first timer, run the receive loop
-- until it ends, and log on entry and exit.
forkBroadcaster :: (MonadIO m) => ReaderStateLogging21 m ()
forkBroadcaster = do
  ident <- asks data2Id_
  let tagged suffix = DT.pack ("forkBroadcaster " ++ show ident ++ suffix)
  $logInfo (tagged " enter.")
  setTimeout
  forkBroadcasterInner
  $logInfo (tagged " leave.")

-- | Start @cnt@ broadcaster threads, wait for a line on stdin, then ask every
-- broadcaster (and every registered subscriber) to quit and wait on the
-- shared 'QSem' once per broadcaster.
someFunc2 :: (MonadIO m) => ReaderLogging1 m ()
someFunc2 = do
  -- Number of broadcaster threads to start.
  let cnt = 500
  r <- ask
  -- Capture the logger so forked threads can log through it.
  l <- askLoggerIO
  $logInfo "someFunc2 enter."
  mapM_
    ( \i -> do
        -- Draw a per-broadcaster seed from the shared generator.
        i' <-
          liftIO
            ( modifyMVar
                (data1StdGen_ r)
                ( \a ->
                    let (i', a') = randomR (0 :: Int, 1000) a
                     in return (a', i')
                )
            )
        -- While holding the broadcaster map: create the channel, fork the
        -- broadcaster and register the channel under id @i@.
        modifyMVarM_
          (data1MapForkBroadcaster_ r)
          ( mapM
              ( \m -> liftIO $ do
                  c <- newChan
                  let data2 restore =
                        Data2
                          { data2Data1_ = r,
                            data2Chan_ = c,
                            data2Id_ = i,
                            data2Restore_ = restore
                          }
                      state1 =
                        State1
                          { state1MapSubscriber_ = empty,
                            state1StdGen_ = mkStdGen i',
                            state1QuitChan_ = False,
                            state1QuitThread_ = False
                          }
                   in -- Fork under 'mask'; the broadcaster receives @restore@
                      -- and applies it around its blocking 'readChan'.
                      mask
                        ( \restore ->
                            void
                              ( forkIO
                                  ( void
                                      ( runReaderStateLogging
                                          forkBroadcaster
                                          (data2 restore)
                                          state1
                                          l
                                      )
                                  )
                              )
                        )
                  return (insert i c m)
              )
          )
    )
    [1 :: Int .. cnt]

  -- Keep running until the user presses Enter.
  void (liftIO getLine)

  -- Tell every registered broadcaster to quit.
  withMVarM
    (data1MapForkBroadcaster_ r)
    (mapM_ (mapM_ (`writeChan` Input1Quit)))
  -- Tell every registered subscriber to quit.
  -- NOTE(review): nothing in the visible code inserts into this map.
  withMVarM
    (data1MapForkSubscriber_ r)
    (mapM_ (mapM_ (`writeChan` Input2Quit)))
  -- Wait on the semaphore once per broadcaster. A wait that ends with any
  -- exception is simply retried.
  let f i = do
        runLoggingT ($logInfo (DT.pack (show i))) l
        let f' =
              (try (waitQSem (data1QSem_ r)) :: IO (Either SomeException ()))
                >>= f''
            f'' (Right _) = return ()
            f'' (Left _) = f'
         in f'
   in liftIO (mapM_ f [1 :: Int .. cnt])
  $logInfo "someFunc2 leave."

Not my area of expertise, sorry, but you might have more luck getting answers if you trim down your code to a minimal reproducible example. Also:

What kind of failure, is there a run-time error?

Hi.

Thanks for reply.

As you suggested, I prepared a smaller test program.

Sorry for bad English writing.

{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TemplateHaskell #-}

-- to show deRefWeak's Nothing result.

module Lib2 (someFunc) where

import Control.Concurrent
import Control.Monad
import Control.Monad.Identity
import Control.Monad.Logger
import Control.Monad.Reader
import qualified Data.Text as DT
import System.Mem.Weak

-- | Reader environment of the reduced example: the single channel under test.
data Data1 = Data1 {data1Chan_ :: Chan String}

-- | A transformer @t@ applied on top of a @ReaderT r@ over @LoggingT m@ stack,
-- producing an @a@.
type ReaderLoggingT r m a t = t (ReaderT r (LoggingT m)) a

-- | The stack used throughout this example: environment 'Data1'.
type ReaderLogging1 m a = ReaderLoggingT Data1 m a IdentityT

-- | Run an 'IdentityT'-wrapped reader/logging action with environment @env@,
-- sending log messages to the supplied logger callback.
runReaderLoggingT ::
  (MonadIO m) =>
  ReaderLoggingT r m a IdentityT ->
  r ->
  (Loc -> LogSource -> LogLevel -> LogStr -> IO ()) ->
  m a
runReaderLoggingT action env logger = runLoggingT inReader logger
  where
    inReader = runReaderT (runIdentityT action) env

-- | Entry point of the reduced example: create the channel, wrap it in the
-- reader environment and run 'someFunc2' with a stdout logger.
someFunc :: IO ()
someFunc = do
  c <- liftIO newChan
  runStdoutLoggingT (runReaderT (runIdentityT someFunc2) (data1 c))
  where
    -- Build the environment around the freshly created channel.
    data1 c = Data1 {data1Chan_ = c}

-- | Writer side of the experiment: dereference the weak channel pointer and
-- write a message, wait one second, then do the same again. Each attempt logs
-- whether 'deRefWeak' succeeded.
forkDeRefWeakWriteChan :: (MonadIO m) => Weak (Chan String) -> ReaderLogging1 m ()
forkDeRefWeakWriteChan c = do
  $logInfo "before threadDelay"
  -- First attempt, immediately after the thread starts.
  liftIO (deRefWeak c)
    >>= ( \case
            Just c' -> do
              -- The author observes that this first dereference succeeds.
              $logInfo "deRefWeak 1 success."
              liftIO (writeChan c' "it's ok.")
            Nothing -> $logInfo "deRefWeak 1 fail."
        )
  liftIO (threadDelay (1000 * 1000))
  $logInfo "after threadDelay"
  -- Second attempt, after the one-second delay.
  liftIO (deRefWeak c)
    >>= ( \case
            Just c' -> do
              -- The author expects this branch to run as well.
              $logInfo "deRefWeak 2 success."
              liftIO (writeChan c' "it's ok too.")
            Nothing ->
              -- But this is the branch the author saw under "stack run".
              $logInfo "deRefWeak 2 fail."
        )

-- | Reader side of the experiment: create a weak pointer to the environment's
-- channel with 'mkWeakPtr', fork the writer with that weak pointer, then read
-- two messages from the channel, logging around each read.
forkReadChan :: (MonadIO m) => ReaderLogging1 m ()
forkReadChan = do
  r <- ask
  l <- askLoggerIO
  -- The weak pointer's key is the 'Chan' value itself, with no finalizer.
  c' <- liftIO (mkWeakPtr (data1Chan_ r) Nothing)
  $logInfo "before forkIO"
  void (liftIO (forkIO (runReaderLoggingT (forkDeRefWeakWriteChan c') r l)))
  $logInfo "before readChan 1"
  ret <- liftIO (readChan (data1Chan_ r))
  $logInfo (DT.pack ("after readChan 1. " ++ ret))
  $logInfo "before readChan 2"
  -- This thread still uses the channel here, after the writer's delay.
  ret' <- liftIO (readChan (data1Chan_ r))
  $logInfo (DT.pack ("after readChan 2. " ++ ret'))

-- | Fork 'forkReadChan' with the current environment and logger, then block
-- on stdin so the process stays alive while the experiment runs.
someFunc2 :: (MonadIO m) => ReaderLogging1 m ()
someFunc2 = do
  r <- ask
  l <- askLoggerIO
  liftIO $ do
    -- Author's reasoning: 'readChan' does not throw
    -- BlockedIndefinitelyOnMVar in this test, so the channel is assumed to be
    -- referenced from more than one place.
    -- The question: if the channel is still referenced, why does 'deRefWeak'
    -- return Nothing instead of Just?
    -- The author asks to be told if this assumption or the code is wrong.
    void (forkIO (void (runReaderLoggingT forkReadChan r l)))
    void getLine

Is this truly minimal? Do you get the same results if you strip out the monad transformers and do everything directly in IO?

Note this warning about weak pointers in the docs:

WARNING: weak pointers to ordinary non-primitive Haskell types are particularly fragile, because the compiler is free to optimise away or duplicate the underlying data structure. Therefore attempting to place a finalizer on an ordinary Haskell type may well result in the finalizer running earlier than you expected. This is not a problem for caches and memo tables where early finalization is benign.

Finalizers can be used reliably for types that are created explicitly and have identity, such as IORef, MVar, and TVar. However, to place a finalizer on one of these types, you should use the specific operation provided for that type, e.g. mkWeakIORef, mkWeakMVar and mkWeakTVar respectively. These operations attach the finalizer to the primitive object inside the box (e.g. MutVar# in the case of IORef), because attaching the finalizer to the box itself fails when the outer box is optimised away by the compiler.

For some reason they only mention that finalizers may run early, but I also think this means the actual weak reference will return Nothing like you are experiencing. Sadly, there is no mkWeakChan. I guess you could reuse mkWeakMVar on one of the MVars of the Chan.

Thanks for reply.

As you suggested, I prepared an even smaller test program.

Although I hoped that the result would be different, it was the same.

Sorry for bad English writing.

{-# LANGUAGE LambdaCase #-}

module Lib4 (someFunc) where

import Control.Concurrent
import Control.Monad
import Debug.Trace
import System.Mem.Weak

-- | Writer side of the plain-IO experiment: dereference the weak channel
-- pointer and write a message, wait one second, then try again. Each attempt
-- traces whether 'deRefWeak' succeeded.
forkDeRefWeakWriteChan :: Weak (Chan String) -> IO ()
forkDeRefWeakWriteChan c' = do
  -- First attempt, before any delay.
  deRefWeak c'
    >>= ( \case
            Just c -> do
              traceM "deRefWeak 1 success."
              writeChan c "it's ok."
            Nothing ->
              traceM "deRefWeak 1 fail."
        )
  threadDelay (1000 * 1000)
  -- Second attempt, after the one-second delay.
  deRefWeak c'
    >>= ( \case
            Just c -> do
              traceM "deRefWeak 2 success."
              writeChan c "it's ok too."
            Nothing ->
              traceM "deRefWeak 2 fail."
        )

-- | Reader side of the plain-IO experiment: read two messages from the
-- channel (held by an ordinary reference), tracing before and after each read.
forkReadChan :: Chan String -> IO ()
forkReadChan c = do
  traceM "before readChan 1"
  ret <- readChan c
  traceM ("after readChan 1 " ++ ret)
  traceM "before readChan 2"
  ret' <- readChan c
  traceM ("after readChan 2 " ++ ret')

-- | Plain-IO reproduction: create a channel and a weak pointer to it, fork
-- the weak-pointer writer and the ordinary reader, then block on stdin.
someFunc :: IO ()
someFunc = do
  c <- newChan
  -- Weak pointer keyed on the 'Chan' value itself, with no finalizer.
  c' <- mkWeakPtr c Nothing
  void (forkIO (forkDeRefWeakWriteChan c'))
  void (forkIO (forkReadChan c))
  void getLine

-- traceM result are below
--
-- before readChan 1
-- deRefWeak 1 success.
-- after readChan 1 it's ok.
-- before readChan 2
-- deRefWeak 2 fail.

Thanks for reply.

As you suggested, I had tried MVar (Maybe (Chan ...)) before writing the modified post, and I could see the result I wanted.

But I was curious whether I was doing something wrong, and I wanted to know if there is a better way, because I thought that withMVar + writeChan costs twice as many atomic operations.

Sorry for bad English writing.

Sadly, the internals of Chan are not exposed anywhere, but otherwise you could do something like this:

{-# LANGUAGE MagicHash, UnboxedTuples #-}

import GHC.Weak
import GHC.IO
import GHC.MVar
import GHC.Exts

-- | A locally declared channel type with two strict, unpacked 'MVar' fields,
-- each holding a 'Stream'.
-- NOTE(review): presumably mirrors the unexposed internals of
-- "Control.Concurrent.Chan" — confirm against the base version in use.
data Chan a
 = Chan {-# UNPACK #-} !(MVar (Stream a))
        {-# UNPACK #-} !(MVar (Stream a))

-- | A stream is an 'MVar' holding the next item.
type Stream a = MVar (ChItem a)

-- | One item: a value plus the (strict, unpacked) rest of the stream.
data ChItem a = ChItem a {-# UNPACK #-} !(Stream a)

-- | Create a weak pointer to a 'Chan' whose key is the primitive @MVar#@
-- inside the channel's first 'MVar' field, not the 'Chan' box itself. The
-- weak pointer's value is the whole channel @c@, and the given 'IO' action is
-- passed to @mkWeak#@ as the finalizer.
mkWeakChan :: Chan a -> IO () -> IO (Weak (Chan a))
mkWeakChan c@(Chan (MVar m#) _) (IO f) = IO $ \s ->
    -- Thread the state token through the primop and box the resulting Weak#.
    case mkWeak# m# c f s of (# s1, w #) -> (# s1, Weak w #)

Perhaps that could be added to base.

Thanks for reply!

By the way, those things with # look quite difficult.

Can you recommend some learning resources for these things, or at least a search engine keyword?

There’s some documentation here:

https://downloads.haskell.org/~ghc/9.12.1/docs/users_guide/exts/primitives.html

Thanks! :grinning: :grinning: :grinning:
Because the system complains that the text is too short, I have to write more words than “Thanks!”.

I got a simpler example:

import Control.Concurrent
import System.Mem.Weak

-- | Dereference the weak pointer, print "deref", then either write the
-- message to the channel and echo it, or print "nothing" if the pointer is
-- dead.
writeWeak :: Weak (Chan String) -> String -> IO ()
writeWeak weakChan msg = do
  target <- deRefWeak weakChan
  putStrLn "deref"
  maybe (putStrLn "nothing") deliver target
  where
    deliver chan = writeChan chan msg >> putStrLn msg

-- Experiment 1: the channel is both the key and the value of the weak
-- pointer, and the finalizer prints "deleting ptr". The channel is written
-- through the weak pointer twice, one second apart.
main = do
  c <- newChan
  -- mkWeak key value finalizer: here key and value are both the channel.
  w <- mkWeak c c (Just (putStrLn "deleting ptr"))
  writeWeak w "1"
  threadDelay (1000 * 1000)
  -- After this point 'c' is no longer referenced directly by main.
  writeWeak w "2"
  threadDelay (1000 * 1000)
  putStrLn "done"

I removed threadDelay and it worked as intended.
With this I conclude that threadDelay is running the finalizer.
Maybe it is because of the MVars held by Chan?

I replaced the key in mkWeak:

import Control.Concurrent
import System.Mem.Weak

-- | Dereference the weak pointer, print "deref", then either write the
-- message to the channel and echo it, or print "nothing" if the pointer is
-- dead.
writeWeak :: Weak (Chan String) -> String -> IO ()
writeWeak weakChan msg = do
  target <- deRefWeak weakChan
  putStrLn "deref"
  maybe (putStrLn "nothing") deliver target
  where
    deliver chan = writeChan chan msg >> putStrLn msg

-- Experiment 2: same as before, but the weak pointer's key is the string
-- literal "c1" instead of the channel; the channel is only the value.
main = do
  c <- newChan
  -- mkWeak key value finalizer: the key is a literal, not the channel.
  w <- mkWeak "c1" c (Just (putStrLn "deleting ptr"))
  writeWeak w "1"
  threadDelay (1000 * 1000)
  writeWeak w "2"
  threadDelay (1000 * 1000)
  putStrLn "done"

And now it works.

I think the threadDelays give the GC enough time to actually free the channel.

Also, I believe string literals are never freed, so using them as key in a weak reference means the reference will always be considered live.

You’re right. It’s not the MVars.

import Control.Concurrent
import System.Mem.Weak
import System.Environment

-- | Dereference the weak pointer, print "deref", then print the label (via
-- its 'Show' instance) if the pointer is alive, or "nothing" if it is dead.
-- The dereferenced value itself is not used.
writeWeak :: (Show a) => Weak v -> a -> IO ()
writeWeak weakRef label = do
  target <- deRefWeak weakRef
  putStrLn "deref"
  case target of
    Just _ -> print label
    Nothing -> putStrLn "nothing"

-- Experiment 3: no channel at all. The weak pointer's key is the run-time
-- string returned by 'getProgName' and its value is unit.
main = do
  x <- getProgName
  -- 'x' is not referenced again after this line.
  w <- mkWeak x () (Just (putStrLn "deleting ptr"))
  writeWeak w "1"
  threadDelay (1000 * 1000)
  writeWeak w "2"
  threadDelay (1000 * 1000)
  putStrLn "done"

This replicates the issue.
In here, x lives until it’s used, then it’s gone because it’s used no longer.

Yes, many things were turned into constants in memory: I tried string literals, numbers, and pure (Just 0), and they all got optimized that way.

After playing with it I either got things that died way too early or things that lived forever, so at this point, if you’re going to share MVars or primitives to “create lifetimes” for the channel, why not share the channel?

Thanks for reply.

I was trying simple server/client broadcasting.

In this exercise, I made an external event callback (for example, with gi-gtk4) in which writeChan is used.

And, I wanted to make sure that there is no input anymore when the server should be shut down.

If there were something like closeChan, it would be more convenient. But because there is no such thing, I tried to find another way, which was the BlockedIndefinitelyOnMVar exception, and to make it work properly, I thought that Weak was required.

Of course, there must be a better design; anyway, that was the motivation for my question.

Sorry for bad English writing.

Thanks for reading.

Something like this maybe:

import Control.Concurrent
import Control.Exception
import System.Timeout

-- | Producer: write the two messages to the channel, pausing one second
-- after each write.
forkWriteChan :: Chan String -> IO ()
forkWriteChan c = mapM_ sendThenPause ["it's ok.", "it's ok too."]
  where
    oneSecond = 1000 * 1000
    sendThenPause msg = writeChan c msg >> threadDelay oneSecond

-- | Consumer: read exactly two messages from the channel, printing a line
-- before and after each read.
forkReadChan :: Chan String -> IO ()
forkReadChan c = mapM_ readOnce ["1", "2"]
  where
    readOnce n = do
      putStrLn ("before readChan " ++ n)
      msg <- readChan c
      putStrLn ("after readChan " ++ n ++ " " ++ msg)

-- | Keep reading (and discarding) messages from the channel until a read
-- does not complete within the timeout.
runUntilWeAreDone :: Chan String -> IO ()
runUntilWeAreDone c =
  -- if nothing is getting written, we are done
  timeout (5000 * 1000) (readChan c)
    >>= maybe (pure ()) (const (runUntilWeAreDone c))

-- | Report on whatever was left: print the remaining messages if there are
-- any, or a note that nothing was obtained.
processRemainder :: Maybe [String] -> IO ()
processRemainder Nothing = putStrLn "nothing there, but we did not block"
processRemainder (Just xs) = do
  print xs
  putStrLn "we finished with the rest"

-- | Handler for 'BlockedIndefinitelyOnMVar': ignore the exception value and
-- print a message that there is nothing left to process.
blockedChanHandler :: BlockedIndefinitelyOnMVar -> IO ()
blockedChanHandler =
  const (putStrLn "empty and blocked, nothing to process and nothing is coming, leaving")

-- | Demo driver: run a producer and a consumer on one channel, watch a
-- duplicate of it until it goes quiet, then try to collect what is left.
main :: IO ()
main = do
  c <- newChan
  keepAlive <- dupChan c  -- things to c get broadcasted to keepAlive
  -- Producer and consumer both work on the original channel.
  forkIO (forkWriteChan c)
  forkIO (forkReadChan c)
  -- A BlockedIndefinitelyOnMVar raised inside this block is routed to
  -- 'blockedChanHandler'.
  flip catch blockedChanHandler $ do
    -- Drain the duplicate until no message arrives within the timeout.
    runUntilWeAreDone keepAlive
    -- NOTE(review): 'getChanContents' is documented as returning a lazy
    -- list, so this timeout presumably bounds only obtaining the list, not
    -- consuming it in 'processRemainder' — confirm.
    rest <- timeout (5000 * 1000) (getChanContents c)
    processRemainder rest

Thanks for reply and suggestion.

To make sure that there is no remaining input,
what I wanted was for writeChan to fail after the server has shut down.

Based on the suggested code,
if a writeChan happens not during the timeout period but after that period has expired,

I think that this input cannot be processed by the server.

And because I didn’t want to add extra state for testing whether the Chan is still enabled, I tried Weak.

I think that an example of such extra state could be MVar (Maybe (Chan ...)), if the out-of-the-box Weak doesn’t work as I want.
In this approach, the client/event callback should use this MVar (Maybe (Chan ...)) rather than the bare Chan, and only the server should modifyMVar the MVar (Maybe (Chan ...)) to Nothing when it shuts down.

If I understood anything incorrectly, please let me know.

Again, thanks for your suggestion.