Can anyone share code examples for connecting to Google Pub/Sub and pulling messages via the gogol library?

I’m relatively new to Haskell and don’t yet have a deep understanding of many Haskell concepts, so I’m unable to piece everything together from the Hackage docs alone. Has anyone been able to connect to Google Cloud Pub/Sub from Haskell and pull messages?

Can you share any code examples that can guide me?

2 Likes

gogol requires you first to be set up with an IAM key file or the proper environment variables in scope. It all starts by loading a Credentials object, e.g. from your key file using fromFilePath (see Network.Google.Auth).

The IAM role you’ll use for requests must have the right scopes, in this case the general GCP ones and the ones for using PubSub. In gogol these are represented with type-level strings (see Network.Google.PubSub).

With the Credentials and an HTTPS connection manager we can create a gogol Env, which we’ll use for all connections to the Google Cloud API.

Alternatively, if your credential file is in one of the system-standard locations (see the manual), you can create the Env and everything in one shot with newEnv :

env  <- newEnv <&>
          (envLogger .~ lgr)
        . (envScopes .~ storageReadWriteScope)

The snippet above is taken from the Storage example, which is why it uses the storage auth scope; for PubSub you would use a PubSub scope such as pubSubScope instead.

gogol relies heavily on lens, a library of combinators for getting and setting fields of data structures and composing such accesses concisely. All you need here is (.~), which takes a lens and a value and returns a function that sets the field the lens points at to that value. Whenever you encounter an unfamiliar combinator or type, Hoogle is your friend: just search for (.~) there.

API requests are issued with one of the functions below, and must run inside a “runner” block, which takes care of all configuration and resource management (e.g. the IO associated with the HTTPS connections):

https://hackage.haskell.org/package/gogol-0.5.0/docs/Network-Google.html#v:send

https://hackage.haskell.org/package/gogol-0.5.0/docs/Network-Google.html#v:upload

https://hackage.haskell.org/package/gogol-0.5.0/docs/Network-Google.html#v:download

Here upload and download are for dealing with large request and response bodies in a streaming fashion (as with GCS objects); ordinary calls, including PubSub pull requests, go through send.

Before we can make an actual API request, we must configure its required metadata fields (which correspond to the fields of the on-wire JSON representation). Request metadata too is usually built up with lens combinators because their objects are large and deeply nested:

For example (still taken from gogol-storage), if we want to upload a file to a given GCS bucket :

rq = objectsInsert bucket object' & oiName ?~ key

^ this starts with a blank “insert” request object that only has the “bucket” field set to the appropriate bucket name, and then sets the object name field via the oiName lens. Here we use the ?~ setter, which sets the given field to Just key (assuming a variable named key is in scope at the right type).
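To make the (.~) and ?~ setters less mysterious, here is a minimal sketch that needs only base. It is not gogol code: InsertRequest, insertRequest and irName are hypothetical stand-ins for a generated request type, and the operators are simplified re-implementations of the ones exported by lens, written out only to show what they do.

```haskell
{-# LANGUAGE RankNTypes #-}

import Data.Function ((&))
import Data.Functor.Const (Const (..))
import Data.Functor.Identity (Identity (..))

-- A van Laarhoven lens, the same representation the lens library uses.
type Lens' s a = forall f. Functor f => (a -> f a) -> s -> f s

-- Hypothetical stand-in for a generated request type.
data InsertRequest = InsertRequest
  { _irBucket :: String
  , _irName   :: Maybe String
  } deriving (Show, Eq)

-- "Smart constructor": only the required field is set, the rest is blank.
insertRequest :: String -> InsertRequest
insertRequest bucket = InsertRequest bucket Nothing

-- Lens focusing on the optional name field.
irName :: Lens' InsertRequest (Maybe String)
irName f r = fmap (\n -> r { _irName = n }) (f (_irName r))

infixr 4 .~, ?~
infixl 8 ^.

-- Set the field the lens points at to the given value.
(.~) :: Lens' s a -> a -> s -> s
l .~ x = runIdentity . l (const (Identity x))

-- Like (.~), but wraps the value in Just (for optional fields).
(?~) :: Lens' s (Maybe a) -> a -> s -> s
l ?~ x = l .~ Just x

-- Read the field the lens points at.
(^.) :: s -> Lens' s a -> a
s ^. l = getConst (l Const s)

-- Start from the blank request, then set the optional name.
example :: InsertRequest
example = insertRequest "my-bucket" & irName ?~ "report.csv"
```

Loading this in GHCi and evaluating example ^. irName should give back the name wrapped in Just. With the real lens library you would import the operators from Control.Lens instead of defining them.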

With this request object, we can configure the action that will eventually perform the actual upload :

upload rq body

Whew ! After all this setup, now we can actually run our request (or as many as we like, inside a single do block) :

runResourceT . runGoogle env $ do ... 

The runResourceT . runGoogle env composition runs the gogol actions against the given Env and manages IO resources, guaranteeing that they are released even if an exception is thrown.
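If the “cleanup even on exceptions” part sounds abstract, the same guarantee can be sketched with bracket from base. This is not how ResourceT is implemented, just a simpler stand-in for the idea; withLoggedResource and cleanupDemo are hypothetical names, and the “resource” merely appends to a log.

```haskell
import Control.Exception (ErrorCall (..), SomeException, bracket, throwIO, try)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- Hypothetical "resource": acquiring and releasing it just appends to a log.
withLoggedResource :: IORef [String] -> IO a -> IO a
withLoggedResource logRef action =
  bracket
    (modifyIORef' logRef (++ ["acquire"]))        -- acquire
    (\_ -> modifyIORef' logRef (++ ["release"]))  -- runs even if the action throws
    (\_ -> action)

-- Runs a failing action inside the bracket and returns the recorded events.
cleanupDemo :: IO [String]
cleanupDemo = do
  logRef <- newIORef []
  _ <- try (withLoggedResource logRef (throwIO (ErrorCall "boom")))
         :: IO (Either SomeException ())
  readIORef logRef
```

Running cleanupDemo in GHCi should show that the release step was recorded even though the action threw.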

Hope this helps!

If you have more questions feel free to reply in the thread


Now that you understand the basics, here’s some example code for interacting with GCS :

4 Likes

Thanks @ocramz this kind of makes sense. I have to try it out to see if it all makes sense, and get back to you.

1 Like

Thanks again @ocramz for the response earlier. It definitely guided me in the right direction. But do you have any pubsub-specific code snippet you could share? I’m unable to piece together everything I need from the gogol documentation alone, and I’m having a hard time figuring out the errors I’m getting.
I’m trying to pull messages from a pubsub topic, but it’s not clear how I’m supposed to construct and make the request.

This is the code I’m stuck at.


startApp :: IO ()
startApp = do
  lgr <- Google.newLogger Google.Debug stdout

  env <-
    Google.newEnv
      <&> (Google.envLogger .~ lgr)
        . (Google.envScopes .~ PubSub.pubSubScope)

  -- putStrLn $ show env

  runResourceT . Google.runGoogle env $ do
    let subscription  = PubSub.subscription & PubSub.subName .~  Just "projects/<project-id>/subscriptions/<subscription-id>"

    trace "Subscription: " <> show subscription 

    pullReq <- PubSub.pullRequest & PubSub.prMaxMessages .~ (Just 1)
    _ <- Google.send PubSub.projectsSubscriptionsPull subscription pullReq

    pure ()


I get errors including:


/Users/tonyalaribe/Projects/apitoolkit-server/src/Lib.hs:98:16: error:
    • Couldn't match expected type ‘Google.Google
                                      '["https://www.googleapis.com/auth/pubsub"] t0’
                  with actual type ‘PubSub.PullRequest’
    • In a stmt of a 'do' block:
        pullReq <- PubSub.pullRequest & PubSub.prMaxMessages .~ (Just 1)
      In the second argument of ‘($)’, namely
        ‘do let subscription
                  = PubSub.subscription
                      & PubSub.subName
                          .~ Just "projects/past-3/subscriptions/apitoolkit-go-client-sub"
            trace "Subscription: " <> show subscription
            pullReq <- PubSub.pullRequest & PubSub.prMaxMessages .~ (Just 1)
            _ <- Google.send
                   PubSubPull.projectsSubscriptionsPull subscription pullReq
            ....’
      In a stmt of a 'do' block:
        runResourceT . Google.runGoogle env
          $ do let subscription
                     = PubSub.subscription
                         & PubSub.subName
                             .~ Just "projects/past-3/subscriptions/apitoolkit-go-client-sub"
               trace "Subscription: " <> show subscription
               pullReq <- PubSub.pullRequest & PubSub.prMaxMessages .~ (Just 1)
               _ <- Google.send
                      PubSubPull.projectsSubscriptionsPull subscription pullReq
               ....
   |
98 |     pullReq <- PubSub.pullRequest & PubSub.prMaxMessages .~ (Just 1)
   |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


Also, how do you log arbitrary values to the terminal? My intention was to log at each step of the process to make sure I’m doing everything correctly, starting with the env (Google.newEnv), but it seems I need to derive Show for env, which I don’t have access to (it is implemented in gogol). I have the same issue with the subscription, which I tried to log via Debug.Trace.trace.

Thanks again

Re. your error: pullRequest here is a pure value, so you don’t “bind” it (with <-); a let clause is all you need:

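let preq = pullRequest & prMaxMessages .~ (Just 1)
-- send takes a single request value: projectsSubscriptionsPull applied to the
-- pull request and the subscription name (a Text, not a Subscription object)
_ <- send (projectsSubscriptionsPull preq subscription)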

(I’ve imported the library functions un-qualified so it’s easier to see)
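The let versus <- distinction is easy to try out in isolation. Below is a small sketch using only base; Request, defaultRequest and sendRequest are hypothetical stand-ins, not gogol functions.

```haskell
-- Hypothetical stand-in for a request value such as a pull request.
newtype Request = Request { maxMessages :: Int } deriving (Show, Eq)

-- A pure value: building it performs no effects.
defaultRequest :: Request
defaultRequest = Request 10

-- An effectful action: running it produces a result inside IO.
sendRequest :: Request -> IO String
sendRequest rq = pure ("sent, maxMessages = " ++ show (maxMessages rq))

demo :: IO String
demo = do
  -- Pure value: name it with let. Using <- here would be a type error,
  -- because the right-hand side is a Request, not an IO action.
  let rq = defaultRequest { maxMessages = 1 }
  -- Monadic action: use <- to run it and name its result.
  reply <- sendRequest rq
  pure reply
```

The rule of thumb: if the right-hand side has a type like IO a (or Google s a), use <-; otherwise use let.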

Re. arbitrary logging: in Haskell this is complicated by the fact that values can contain functions, which have no Show instance and so cannot be represented as a string in a straightforward way. For instance, in a gogol Env we store a Logger, which is just a function: type Logger = LogLevel -> Builder -> IO ()
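Here is a small sketch of what that means in practice, using only base. Config, describeConfig, Message and process are hypothetical names chosen for illustration; Config loosely mirrors an environment that carries a logging callback.

```haskell
import Debug.Trace (traceM)

-- Hypothetical config type with a function-valued field.
data Config = Config
  { cfgName   :: String
  , cfgLogger :: String -> IO ()  -- functions have no Show instance
  }

-- `deriving Show` would be rejected for Config because of cfgLogger,
-- so we render only the printable parts by hand.
describeConfig :: Config -> String
describeConfig c =
  "Config { cfgName = " ++ show (cfgName c) ++ ", cfgLogger = <function> }"

-- A record made only of printable fields can simply derive Show.
data Message = Message { msgId :: Int, msgBody :: String }
  deriving (Show, Eq)

-- traceM works in any monad and writes to stderr; meant for debugging only.
process :: Monad m => Message -> m Int
process msg = do
  traceM ("processing " ++ show msg)
  pure (msgId msg)
```

For example, putStrLn (describeConfig (Config "dev" putStrLn)) prints the printable part of the config, while process can be dropped into any do block to see intermediate values.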

By default gogol will log each API call at the level of verbosity you specified when creating the Logger. Unfortunately the raw logging functions are not exposed (presumably because the authors want us to use a more full-featured logging library with rotation, batching etc. in production). Nonetheless you can copy some code from https://hackage.haskell.org/package/gogol-0.5.0/src/src/Network/Google/Internal/Logger.hs and use e.g. logInfo inside any MonadIO block of statements. HTH!

1 Like

Thanks @ocramz. This helped a lot. I was able to pull messages from Google Cloud Pub/Sub, and also acknowledge the messages.

Here’s the complete code in case it helps someone else:

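{-# LANGUAGE OverloadedStrings #-}

-- Imports this snippet relies on (gogol and gogol-pubsub, plus lens and resourcet).
import Control.Lens ((&), (.~), (<&>), (?~), (^.))
import Control.Monad.Trans.Resource (runResourceT)
import Data.Maybe (catMaybes)
import Debug.Trace (traceM)
import qualified Network.Google as Google
import qualified Network.Google.PubSub as PubSub
import System.IO (stdout)

-- pubsubService connects to the pubsub service and pulls messages,
-- then it calls the processMessage function on each of them, and
-- acknowledges the whole list of messages in one request.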
pubsubService :: IO ()
pubsubService = do
  lgr <- Google.newLogger Google.Trace stdout
  env <-
    Google.newEnv
      <&> (Google.envLogger .~ lgr)
        . (Google.envScopes .~ PubSub.pubSubScope)

  let pullReq = PubSub.pullRequest & PubSub.prMaxMessages ?~ 1
  let subscription = "projects/<project-id>/subscriptions/<subscription-id>"

  runResourceT . Google.runGoogle env $ do
    pullResp <- Google.send $ PubSub.projectsSubscriptionsPull pullReq subscription
    let messages = pullResp ^. PubSub.prReceivedMessages
    traceM $ "Received  messages " ++ show messages

    msgIds <- mapM processMessage messages

    -- acknowledge the list of messages pulled from pubsub, so they can be taken out of the queue
    _ <- Google.send $ PubSub.projectsSubscriptionsAcknowledge (PubSub.acknowledgeRequest & PubSub.arAckIds .~ catMaybes msgIds) subscription
    pure ()

-- processMessage :: Monad m => PubSub.ReceivedMessage -> m (Maybe Text)  -- Text from Data.Text
processMessage msg = do
  let rmMsg = msg ^. PubSub.rmMessage
  let rmAckId = msg ^. PubSub.rmAckId
  traceM $ "Message " ++ show rmMsg ++ "; ID: " ++ show rmAckId

  pure rmAckId


2 Likes

My pleasure, glad it helped !

I think Haskell is actually a superpower for developing cloud services, let us know how it goes!

2 Likes