I’m relatively new to Haskell and don’t yet have a deep understanding of many of its concepts, so I’m unable to piece everything together from the Hackage docs alone. Has anyone been able to connect to Google Cloud Pub/Sub from Haskell and pull messages?
Can you share any code examples that can guide me?
gogol first requires you to be set up with an IAM key file or the proper environment variables in scope. It all starts by loading a Credentials object, e.g. from your key file using fromFilePath (in Network.Google.Auth).
The IAM role you’ll use for requests must have the right scopes, in this case the general GCP ones and the ones for using PubSub. In gogol these are represented with type-level strings (see Network.Google.PubSub).
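To make "type-level strings" concrete, here is a minimal sketch that uses only base. The name pubSubScopeUrl is hypothetical. Judging by the compiler error quoted later in this thread, gogol's own scope values carry a type-level list of such strings, so treat this as a simplified stand-in rather than the library's definition.

```haskell
{-# LANGUAGE DataKinds #-}

import Data.Proxy (Proxy (..))
import GHC.TypeLits (symbolVal)

-- Hypothetical stand-in: the scope URL lives in the type, not in a value.
pubSubScopeUrl :: Proxy "https://www.googleapis.com/auth/pubsub"
pubSubScopeUrl = Proxy

-- symbolVal recovers the type-level string as an ordinary String.
scopeString :: String
scopeString = symbolVal pubSubScopeUrl

main :: IO ()
main = putStrLn scopeString
```

Because the scope is part of the type, a request that needs a scope the environment does not carry can be rejected by the type checker instead of failing at runtime.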
With the Credentials and an HTTPS connection manager we can create a gogol Env (environment), which we’ll use for all connections to the Google Cloud API.
Alternatively, if your credential file is in one of the system-standard locations (see the manual), you can create the Env and everything in one shot with newEnv :
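A minimal sketch of that one-shot setup, assuming the gogol and gogol-storage packages and credentials that newEnv can discover on its own. It follows the same pattern as the question's own code further down the thread, only with the Storage read/write scope.

```haskell
import Control.Lens ((.~), (<&>))
import Network.Google (LogLevel (Debug), envLogger, envScopes, newEnv, newLogger)
import Network.Google.Storage (storageReadWriteScope)
import System.IO (stdout)

main :: IO ()
main = do
  -- Log every API call at Debug verbosity to stdout.
  lgr <- newLogger Debug stdout
  -- newEnv finds the credentials and creates the HTTPS manager;
  -- the two lens setters then attach our logger and pick the scope.
  env <- newEnv <&> (envLogger .~ lgr) . (envScopes .~ storageReadWriteScope)
  -- env would now be handed to runGoogle; nothing is sent in this sketch.
  env `seq` pure ()
```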
Above is taken from the Storage example, which you can see from the matching auth scope.
gogol relies heavily on lens, which is a library of combinators for getting/setting fields of data structures and composing them in a concise way. What you need here is just (.~), which sets the field targeted by a lens (its first argument) to a new value (its second argument). Whenever you encounter a funny combinator or type, Hoogle is your friend: search for (.~) there.
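As a small illustration of (.~) outside gogol, here is a sketch with a made-up Config record (Config, host and port are hypothetical names, not part of gogol). The lenses are written by hand with the lens function, so no Template Haskell is needed.

```haskell
import Control.Lens (Lens', lens, (&), (.~), (^.))

-- Hypothetical record used only for illustration.
data Config = Config {_host :: String, _port :: Int}
  deriving (Show, Eq)

-- Hand-written lenses: a getter plus a setter for each field.
host :: Lens' Config String
host = lens _host (\c h -> c {_host = h})

port :: Lens' Config Int
port = lens _port (\c p -> c {_port = p})

defaultConfig :: Config
defaultConfig = Config "localhost" 80

-- (&) feeds the value on its left into the setter on its right,
-- so several updates chain left to right.
updated :: Config
updated = defaultConfig & host .~ "example.org" & port .~ 8080

main :: IO ()
main = do
  print updated
  print (updated ^. port) -- (^.) reads a field through a lens
```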
API requests must be wrapped in a “runner” block, which takes care of all configuration and resource management (e.g. the IO associated with the HTTPS connections):
Inside the block, upload/download are for dealing with large request bodies in a streaming fashion, such as you might have in your PubSub messages.
Before we can make an actual API request, we must configure its required metadata fields (which correspond to the fields of the on-wire JSON representation). Request metadata, too, is usually built up with lens combinators because the request objects are large and deeply nested:
For example (still taken from gogol-storage), if we want to upload a file to a given GCS bucket :
rq = objectsInsert bucket object' & oiName ?~ key
^ this starts with a blank “insert” request object that only sets the “bucket” field with the appropriate bucket name, and sets the object name field via the oiName lens. Here we use the ?~ setter, which sets the given field to Just key (assuming a variable named key is in scope at the right type).
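To see that ?~ is just shorthand for setting a field to Just something, here is a sketch with a hypothetical InsertRequest record, loosely modelled on the idea of an optional "name" field. None of these names come from gogol.

```haskell
import Control.Lens (Lens', lens, (&), (.~), (?~))

-- Hypothetical request type with one required and one optional field.
data InsertRequest = InsertRequest {_bucket :: String, _name :: Maybe String}
  deriving (Show, Eq)

name :: Lens' InsertRequest (Maybe String)
name = lens _name (\r n -> r {_name = n})

-- "Blank" request: only the required field is filled in.
insertRequest :: String -> InsertRequest
insertRequest b = InsertRequest b Nothing

viaQuestion :: InsertRequest
viaQuestion = insertRequest "my-bucket" & name ?~ "report.csv"

viaJust :: InsertRequest
viaJust = insertRequest "my-bucket" & name .~ Just "report.csv"

main :: IO ()
main = print (viaQuestion == viaJust) -- both spellings build the same value
```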
With this request object, we can configure the action that will eventually perform the actual upload :
upload rq body
Whew ! After all this setup, now we can actually run our request (or as many as we like, inside a single do block) :
runResourceT . runGoogle env $ do ...
The runResourceT . runGoogle env is a composite incantation: runGoogle env configures the gogol actions, and runResourceT manages IO resources while guaranteeing cleanup even if an exception strikes.
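The cleanup guarantee comes from the resourcet package rather than from gogol itself. A minimal sketch, with putStrLn calls standing in for a real connection, shows the release action running even though the block throws:

```haskell
import Control.Exception (ErrorCall (..), throwIO, try)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Resource (allocate, runResourceT)

main :: IO ()
main = do
  result <- try . runResourceT $ do
    -- Register an acquire action together with its release action.
    _ <- allocate (putStrLn "acquire connection") (\_ -> putStrLn "release connection")
    -- Simulate a failing request.
    liftIO (throwIO (ErrorCall "request failed"))
  case result of
    Left (ErrorCall msg) -> putStrLn ("caught: " ++ msg)
    Right () -> putStrLn "finished"
```

The release message is printed before the exception reaches the handler, which is the behaviour the runner block relies on.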
Hope this helps!
If you have more questions feel free to reply in the thread
Now that you understand the basics, here’s some example code for interacting with GCS :
Thanks again @ocramz for the response earlier. It definitely guided me in the right direction. But do you have any pubsub specific code snippet you could share? I’m unable to piece together everything I need from just the gogol documentation, and I’m having a not so easy time figuring out the errors I’m getting.
I’m trying to pull messages from a pubsub topic. But it’s not clear how I’m supposed to construct and make the request.
This is the code I’m stuck at.
startApp :: IO ()
startApp = do
  lgr <- Google.newLogger Google.Debug stdout
  env <-
    Google.newEnv
      <&> (Google.envLogger .~ lgr)
        . (Google.envScopes .~ PubSub.pubSubScope)
  -- putStrLn $ show env
  runResourceT . Google.runGoogle env $ do
    let subscription = PubSub.subscription & PubSub.subName .~ Just "projects/<project-id>/subscriptions/<subscription-id>"
    trace "Subscription: " <> show subscription
    pullReq <- PubSub.pullRequest & PubSub.prMaxMessages .~ (Just 1)
    _ <- Google.send PubSub.projectsSubscriptionsPull subscription pullReq
    pure ()
I get errors including:
/Users/tonyalaribe/Projects/apitoolkit-server/src/Lib.hs:98:16: error:
• Couldn't match expected type ‘Google.Google
'["https://www.googleapis.com/auth/pubsub"] t0’
with actual type ‘PubSub.PullRequest’
• In a stmt of a 'do' block:
pullReq <- PubSub.pullRequest & PubSub.prMaxMessages .~ (Just 1)
In the second argument of ‘($)’, namely
‘do let subscription
= PubSub.subscription
& PubSub.subName
.~ Just "projects/past-3/subscriptions/apitoolkit-go-client-sub"
trace "Subscription: " <> show subscription
pullReq <- PubSub.pullRequest & PubSub.prMaxMessages .~ (Just 1)
_ <- Google.send
PubSubPull.projectsSubscriptionsPull subscription pullReq
....’
In a stmt of a 'do' block:
runResourceT . Google.runGoogle env
$ do let subscription
= PubSub.subscription
& PubSub.subName
.~ Just "projects/past-3/subscriptions/apitoolkit-go-client-sub"
trace "Subscription: " <> show subscription
pullReq <- PubSub.pullRequest & PubSub.prMaxMessages .~ (Just 1)
_ <- Google.send
PubSubPull.projectsSubscriptionsPull subscription pullReq
....
|
98 | pullReq <- PubSub.pullRequest & PubSub.prMaxMessages .~ (Just 1)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Also, how do you log arbitrary values to the terminal? My intention was to log at each step of the process to make sure I’m doing everything correctly, starting with the env (Google.newEnv), but it seems I need to derive Show for env, which I don’t have access to (it is implemented in gogol). I have the same issue with the subscription, which I tried to log via Debug.Trace.trace.
(I’ve imported the library functions un-qualified so it’s easier to see)
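A hedged sketch of a single pull with unqualified imports, assuming the gogol and gogol-pubsub packages and the same argument order as the working code posted at the end of this thread. The key point for the error above is that pullRequest & ... is a pure value, so it is bound with let; the <- arrow is only for running an action such as send. The project and subscription ids are placeholders.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Control.Lens ((&), (.~), (<&>), (?~), (^.))
import Control.Monad.Trans.Resource (runResourceT)
import Network.Google
import Network.Google.PubSub
import System.IO (stdout)

pullOnce :: IO ()
pullOnce = do
  lgr <- newLogger Debug stdout
  env <- newEnv <&> (envLogger .~ lgr) . (envScopes .~ pubSubScope)
  -- Pure values: built with let, not with <-.
  let pullReq = pullRequest & prMaxMessages ?~ 1
      sub = "projects/<project-id>/subscriptions/<subscription-id>"
  -- send takes one fully built request, hence the parentheses.
  resp <- runResourceT . runGoogle env $ send (projectsSubscriptionsPull pullReq sub)
  print (length (resp ^. prReceivedMessages))

main :: IO ()
main = pullOnce
```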
Re. arbitrary logging: in Haskell this is complicated by the fact that values can contain function-like things which cannot be represented as a string in a straightforward way. For instance, in a gogol Env we store a Logger, which is just a continuation: type Logger = LogLevel -> Builder -> IO ()
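To illustrate why such a value has no Show instance but can still be used, here is a self-contained sketch. LogLevel, Logger, render and stdoutLogger below are hypothetical stand-ins defined locally (with String in place of Builder), not gogol's definitions.

```haskell
-- Hypothetical stand-ins for illustration only.
data LogLevel = Info | Debug
  deriving (Show, Eq, Ord)

type Logger = LogLevel -> String -> IO ()

-- Pure formatting step, kept separate so it is easy to check.
render :: LogLevel -> String -> String
render lvl msg = "[" ++ show lvl ++ "] " ++ msg

-- A logger is a function that closes over a verbosity threshold.
stdoutLogger :: LogLevel -> Logger
stdoutLogger threshold lvl msg
  | lvl <= threshold = putStrLn (render lvl msg)
  | otherwise = pure ()

main :: IO ()
main = do
  let logger = stdoutLogger Info
  -- print logger would not compile: functions have no Show instance.
  logger Info "pulling messages" -- printed
  logger Debug "raw response" -- suppressed, since Debug is above the Info threshold
```

A record that stores a function like this cannot derive Show, so the practical route is to print the individual fields you care about rather than the whole environment.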
By default gogol will log each API call at the level of verbosity you specified when creating the Logger. Unfortunately the raw logging functions are not exposed (because the authors want us to use a more full-featured logging library with rotation, batching etc. in production). Nonetheless you can copy some code from https://hackage.haskell.org/package/gogol-0.5.0/src/src/Network/Google/Internal/Logger.hs and use e.g. logInfo when inside a MonadIO block of statements. HTH!
Thanks @ocramz. This helped a lot. I was able to pull messages from Google Cloud Pub/Sub, and also acknowledge the messages.
Here’s the complete code in case it helps someone else:
{-# LANGUAGE OverloadedStrings #-}

import Control.Lens ((&), (.~), (<&>), (?~), (^.))
import Control.Monad.Trans.Resource (runResourceT)
import Data.Maybe (catMaybes)
import Data.Text (Text)
import Debug.Trace (traceM)
import qualified Network.Google as Google
import qualified Network.Google.PubSub as PubSub
import System.IO (stdout)

-- pubsubService connects to the Pub/Sub service and pulls messages,
-- then calls processMessage on each one and acknowledges the whole
-- list of messages in a single request.
pubsubService :: IO ()
pubsubService = do
  lgr <- Google.newLogger Google.Trace stdout
  env <-
    Google.newEnv
      <&> (Google.envLogger .~ lgr)
        . (Google.envScopes .~ PubSub.pubSubScope)
  let pullReq = PubSub.pullRequest & PubSub.prMaxMessages ?~ 1
  -- Placeholder path: fill in your own project and subscription ids.
  let subscription = "projects/<project-id>/subscriptions/<subscription-id>"
  runResourceT . Google.runGoogle env $ do
    pullResp <- Google.send $ PubSub.projectsSubscriptionsPull pullReq subscription
    let messages = pullResp ^. PubSub.prReceivedMessages
    traceM $ "Received messages " ++ show messages
    msgIds <- mapM processMessage messages
    -- Acknowledge the list of messages pulled from Pub/Sub, so they can be taken out of the queue.
    _ <- Google.send $ PubSub.projectsSubscriptionsAcknowledge (PubSub.acknowledgeRequest & PubSub.arAckIds .~ catMaybes msgIds) subscription
    pure ()

-- processMessage logs a received message and returns its ack id (if any).
processMessage :: Monad m => PubSub.ReceivedMessage -> m (Maybe Text)
processMessage msg = do
  let rmMsg = msg ^. PubSub.rmMessage
  let rmAckId = msg ^. PubSub.rmAckId
  traceM $ "Message " ++ show rmMsg ++ "; ID: " ++ show rmAckId
  pure rmAckId