# -*- coding: UTF-8 -*-
"""Classes to facilitate connections to Google Cloud Pub/Sub streams.

.. autosummary::

    Consumer
    Response
    Subscription
    Topic

----
"""
import concurrent.futures
import datetime
import importlib.resources
import logging
import queue
import time
from typing import Any, Callable, List, Optional, Union
import attrs
import attrs.validators
import google.api_core.exceptions
import google.cloud.pubsub_v1
from . import exceptions
from .alert import Alert
from .auth import Auth
# Module-level logger, named after this module's import path.
LOGGER = logging.getLogger(__name__)
# Traversable handle on this package's files, resolved from ``__package__``.
PACKAGE_DIR = importlib.resources.files(__package__)
def msg_callback_example(alert: Alert) -> "Response":
    """Example message callback: report the message ID and acknowledge the alert.

    Args:
        alert (Alert):
            The alert unpacked from a single Pub/Sub message.

    Returns:
        Response:
            A response that acknowledges the message and carries the alert's
            dictionary as its result.
    """
    message_id = alert.metadata["message_id"]
    print(f"processing message: {message_id}")
    return Response(result=alert.dict, ack=True)
def batch_callback_example(batch: list) -> None:
    """Example batch callback: print summary counts for a batch.

    Args:
        batch (list):
            Items collected for the batch. Each item must expose a ``dict``
            attribute containing an ``"objectId"`` key.
    """
    unique_object_ids = {item.dict["objectId"] for item in batch}
    num_oids = len(unique_object_ids)
    batch_length = len(batch)
    print(f"num oids: {num_oids}")
    print(f"batch length: {batch_length}")
def pull_batch(
    subscription: Union[str, "Subscription"],
    max_messages: int = 1,
    schema_name: str = str(),
    **subscription_kwargs,
) -> List["Alert"]:
    """Pull one batch of messages from a subscription and acknowledge them.

    Args:
        subscription (str or Subscription):
            The subscription to pull from. A string is treated as the subscription name and is
            used to build a :class:`Subscription`. The subscription is expected to already exist
            in Google Cloud.
        max_messages (int):
            Upper limit on the number of messages to pull.
        schema_name (str):
            Schema name of the alerts in the subscription, forwarded to Alert for unpacking.
            See :meth:`pittgoogle.registry.Schemas.names` for the list of options. If not
            provided, some properties of the Alert may not be available.
        **subscription_kwargs:
            Keyword arguments used to build the :class:`Subscription` when ``subscription``
            is given as a string.

    Returns:
        list[Alert]:
            One Alert per pulled message.

    Raises:
        CloudConnectionError:
            If the pull request reports that the subscription was not found.
    """
    if isinstance(subscription, str):
        subscription = Subscription(subscription, **subscription_kwargs)

    try:
        response = subscription.client.pull(
            {"subscription": subscription.path, "max_messages": max_messages}
        )
    except google.api_core.exceptions.NotFound as excep:
        msg = f"NotFound: {subscription.path}. You may need to create the subscription using `pittgoogle.Subscription.touch`."
        raise exceptions.CloudConnectionError(msg) from excep

    # Unpack every message before acknowledging anything, so that a failure while
    # building an Alert leaves the whole batch unacknowledged.
    alerts, ack_ids = [], []
    for received in response.received_messages:
        alerts.append(Alert.from_msg(received.message, schema_name=schema_name))
        ack_ids.append(received.ack_id)

    if ack_ids:
        subscription.client.acknowledge({"subscription": subscription.path, "ack_ids": ack_ids})

    return alerts
@attrs.define
class Topic:
    """Class to manage a Google Cloud Pub/Sub topic.

    Args:
        name (str):
            Name of the Pub/Sub topic.
        projectid (str, optional):
            The topic owner's Google Cloud project ID. Either this or ``auth`` is required. Use this
            if you are connecting to a subscription owned by a different project than this topic.
            :class:`pittgoogle.registry.ProjectIds` is a registry containing Pitt-Google's project IDs.
        auth (Auth, optional):
            Credentials for the Google Cloud project that owns this topic. If not provided,
            it will be created from environment variables when needed.
        client (google.cloud.pubsub_v1.PublisherClient, optional):
            Pub/Sub client that will be used to access the topic. If not provided,
            a new client will be created the first time it is requested.

    Example:

        .. code-block:: python

            # Create a new topic in your project
            my_topic = pittgoogle.Topic(name="my-new-topic")
            my_topic.touch()

            # Create a dummy message to publish
            my_alert = pittgoogle.Alert(
                dict={"message": "Hello, World!"},  # the message payload
                attributes={"custom_key": "custom_value"}  # custom attributes for the message
            )

            # Publish the message to the topic
            my_topic.publish(my_alert)  # returns the ID of the published message

        To pull the message back from the topic, use a :class:`Subscription`.

    ----
    """

    name: str = attrs.field()
    # The private fields below are exposed by attrs as init arguments without the
    # leading underscore (``projectid``, ``auth``, ``client``).
    _projectid: Optional[str] = attrs.field(default=None)
    _auth: Optional[Auth] = attrs.field(
        default=None, validator=attrs.validators.optional(attrs.validators.instance_of(Auth))
    )
    _client: Optional[google.cloud.pubsub_v1.PublisherClient] = attrs.field(
        default=None,
        validator=attrs.validators.optional(
            attrs.validators.instance_of(google.cloud.pubsub_v1.PublisherClient)
        ),
    )

    @classmethod
    def from_cloud(
        cls,
        name: str,
        *,
        projectid: str,
        survey: Optional[str] = None,
        testid: Optional[str] = None,
    ) -> "Topic":
        """Creates a :class:`Topic` with a :attr:`Topic.client` that uses implicit credentials.

        Args:
            name (str):
                Name of the topic. If ``survey`` and/or ``testid`` are provided, they will be added to this
                name following the Pitt-Google naming syntax.
            projectid (str):
                Project ID of the Google Cloud project that owns this resource. Project IDs used by
                Pitt-Google are listed in the registry for convenience (:class:`pittgoogle.registry.ProjectIds`).
                Required because it cannot be retrieved from the `client` and there is no explicit `auth`.
            survey (str, optional):
                Name of the survey. If provided, it will be prepended to `name` following the
                Pitt-Google naming syntax.
            testid (str, optional):
                Pipeline identifier. If this is not None, False, or "False", it will be appended to
                the ``name`` following the Pitt-Google naming syntax. This is used to allow pipeline modules
                to find the correct resources without interfering with other pipelines that may have
                deployed resources with the same base names (e.g., for development and testing purposes).

        Returns:
            Topic:
                A new topic whose client was created without explicit credentials.
        """
        # if survey and/or testid passed in, use them to construct full name using the pitt-google naming syntax
        if survey is not None:
            name = f"{survey}-{name}"
        # must accommodate False and "False" for consistency with the broker pipeline
        if testid and testid != "False":
            name = f"{name}-{testid}"
        return cls(name, projectid=projectid, client=google.cloud.pubsub_v1.PublisherClient())

    @classmethod
    def from_path(cls, path: str) -> "Topic":
        """Parse the ``path`` and return a new :class:`Topic`.

        Args:
            path (str):
                Topic path with four "/"-separated segments. The second segment is used as the
                project ID and the fourth as the topic name, matching the format produced by
                :attr:`Topic.path` (``projects/<projectid>/topics/<name>``).

        Raises:
            ValueError:
                If ``path`` does not contain exactly four "/"-separated segments.
        """
        try:
            _, projectid, _, name = path.split("/")
        except ValueError as excep:
            # The bare unpacking error does not say what was expected, so re-raise with context.
            msg = f"Expected a path like 'projects/<projectid>/topics/<name>' but received: {path}"
            raise ValueError(msg) from excep
        return cls(name, projectid=projectid)

    @property
    def auth(self) -> Auth:
        """Credentials for the Google Cloud project that owns this topic.

        This will be created from environment variables if needed.
        """
        if self._auth is None:
            self._auth = Auth()
        return self._auth

    @property
    def path(self) -> str:
        """Fully qualified path to the topic."""
        return f"projects/{self.projectid}/topics/{self.name}"

    @property
    def projectid(self) -> str:
        """The topic owner's Google Cloud project ID.

        Falls back to the project ID of :attr:`Topic.auth` if none was provided.
        """
        if self._projectid is None:
            self._projectid = self.auth.GOOGLE_CLOUD_PROJECT
        return self._projectid

    @property
    def client(self) -> google.cloud.pubsub_v1.PublisherClient:
        """Pub/Sub client for topic access.

        Will be created using :attr:`Topic.auth.credentials` if necessary.
        """
        if self._client is None:
            self._client = google.cloud.pubsub_v1.PublisherClient(
                credentials=self.auth.credentials
            )
        return self._client

    def touch(self) -> None:
        """Test the connection to the topic, creating it if necessary.

        .. tip::

            This is only necessary if you need to interact with the topic directly to do things like
            *publish* messages. In particular, this is *not* necessary if you are trying to *pull* messages.
            All users can create a subscription to a Pitt-Google topic and pull messages from it, even
            if they can't actually touch the topic.

        Raises:
            CloudConnectionError:
                'PermissionDenied' if :attr:`Topic.auth` does not have permission to get or create the topic.
        """
        try:
            # Check if topic exists and we can connect.
            self.client.get_topic(topic=self.path)
            LOGGER.info(f"topic exists: {self.path}")

        except google.api_core.exceptions.NotFound:
            try:
                # Try to create a new topic.
                self.client.create_topic(name=self.path)
                LOGGER.info(f"topic created: {self.path}")

            except google.api_core.exceptions.PermissionDenied as excep:
                # User has access to this topic's project but insufficient permissions to create a new topic.
                # Assume this is a simple IAM problem rather than the user being confused about when
                # to call this method (as can happen below).
                msg = (
                    "PermissionDenied: You seem to have appropriate IAM permissions to get topics "
                    "in this project but not to create them."
                )
                raise exceptions.CloudConnectionError(msg) from excep

        except google.api_core.exceptions.PermissionDenied as excep:
            # User does not have permission to get this topic.
            # This is not a problem if they only want to subscribe, but can be confusing.
            # [TODO] Maybe users should just be allowed to get the topic?
            msg = (
                f"PermissionDenied: The provided `pittgoogle.Auth` cannot get topic {self.path}. "
                "Either the provided Auth has a different project ID, or your credentials just don't "
                "have appropriate IAM permissions. \nNote that if you are a user trying to connect to "
                "a Pitt-Google topic, your Auth is _expected_ to have a different project ID and you "
                "can safely ignore this error (and avoid running `Topic.touch` in the future). "
                "It does not impact your ability to attach a subscription and pull messages."
            )
            raise exceptions.CloudConnectionError(msg) from excep

    def delete(self) -> None:
        """Delete the topic. A topic that is not found is logged and otherwise ignored."""
        try:
            self.client.delete_topic(topic=self.path)
        except google.api_core.exceptions.NotFound:
            LOGGER.info(f"nothing to delete. topic not found: {self.path}")
        else:
            LOGGER.info(f"deleted topic: {self.path}")

    def publish(self, alert: "Alert") -> str:
        """Publish a message with :attr:`pittgoogle.Alert.dict` as the payload and
        :attr:`pittgoogle.Alert.attributes` as the attributes.

        Args:
            alert (Alert):
                The alert to publish. Its schema is used to serialize the payload.

        Returns:
            str:
                The ID of the published message. This call blocks until the publish completes.
        """
        # Pub/Sub requires attribute keys and values to be strings. Sort the keys while we're at it.
        attributes = {str(key): str(alert.attributes[key]) for key in sorted(alert.attributes)}
        message = alert.schema.serialize(alert.dict)
        future = self.client.publish(self.path, data=message, **attributes)
        # Wait for the publish to finish; the future resolves to the message ID.
        return future.result()
@attrs.define
class Subscription:
    """Class to manage a Google Cloud Pub/Sub subscription.

    Args:
        name (str):
            Name of the Pub/Sub subscription.
        auth (Auth, optional):
            Credentials for the Google Cloud project that will be used to connect to the subscription.
            If not provided, it will be created from environment variables.
        topic (Topic, optional):
            Topic this subscription should be attached to. Required only when the subscription needs to be created.
        client (google.cloud.pubsub_v1.SubscriberClient, optional):
            Pub/Sub client that will be used to access the subscription.
            If not provided, a new client will be created the first time it is needed.
        schema_name (str):
            Schema name of the alerts in the subscription. Passed to :class:`pittgoogle.alert.Alert` for unpacking.
            If not provided, some properties of the Alert may not be available. For a list of schema names, see
            :meth:`pittgoogle.registry.Schemas.names`.

    Example:

        Create a subscription to Pitt-Google's 'ztf-loop' topic and pull messages:

        .. code-block:: python

            # Topic that the subscription should be connected to
            topic = pittgoogle.Topic(name="ztf-loop", projectid=pittgoogle.ProjectIds().pittgoogle)

            # Create the subscription
            subscription = pittgoogle.Subscription(
                name="my-ztf-loop-subscription", topic=topic, schema_name="ztf"
            )
            subscription.touch()

            # Pull a small batch of alerts
            alerts = subscription.pull_batch(max_messages=4)

    ----
    """

    name: str = attrs.field()
    auth: Auth = attrs.field(factory=Auth, validator=attrs.validators.instance_of(Auth))
    topic: Optional[Topic] = attrs.field(
        default=None, validator=attrs.validators.optional(attrs.validators.instance_of(Topic))
    )
    _client: Optional[google.cloud.pubsub_v1.SubscriberClient] = attrs.field(
        default=None,
        validator=attrs.validators.optional(
            attrs.validators.instance_of(google.cloud.pubsub_v1.SubscriberClient)
        ),
    )
    # ``Optional[str]`` rather than ``str | None``: the annotation is evaluated when the class is
    # created, and the ``|`` form raises TypeError on Python versions older than 3.10.
    schema_name: Optional[str] = attrs.field(default=None)

    @property
    def projectid(self) -> str:
        """Subscription owner's Google Cloud project ID."""
        return self.auth.GOOGLE_CLOUD_PROJECT

    @property
    def path(self) -> str:
        """Fully qualified path to the subscription."""
        return f"projects/{self.projectid}/subscriptions/{self.name}"

    @property
    def client(self) -> google.cloud.pubsub_v1.SubscriberClient:
        """Pub/Sub client that will be used to access the subscription.

        If not provided, a new client will be created using :attr:`Subscription.auth`.
        """
        if self._client is None:
            self._client = google.cloud.pubsub_v1.SubscriberClient(
                credentials=self.auth.credentials
            )
        return self._client

    def touch(self) -> None:
        """Test the connection to the subscription, creating it if necessary.

        Note that messages published to the topic before the subscription was created are
        not available to the subscription.

        Raises:
            TypeError:
                if the subscription needs to be created but no topic was provided.
            CloudConnectionError:
                - 'NotFound' if the subscription needs to be created but the topic does not exist in Google Cloud.
                - 'InvalidTopic' if the subscription exists but the user explicitly provided a topic that
                  this subscription is not actually attached to.
        """
        try:
            subscrip = self.client.get_subscription(subscription=self.path)
            LOGGER.info(f"subscription exists: {self.path}")

        except google.api_core.exceptions.NotFound:
            subscrip = self._create()  # may raise TypeError or CloudConnectionError
            LOGGER.info(f"subscription created: {self.path}")

        self._set_topic(subscrip.topic)  # may raise CloudConnectionError

    def _create(self) -> google.cloud.pubsub_v1.types.Subscription:
        """Create the subscription in Google Cloud, attached to :attr:`Subscription.topic`.

        Raises:
            TypeError:
                if no topic was provided.
            CloudConnectionError:
                'NotFound' if the create request reports that the topic does not exist.
        """
        if self.topic is None:
            raise TypeError("The subscription needs to be created but no topic was provided.")

        try:
            return self.client.create_subscription(name=self.path, topic=self.topic.path)

        # this error message is not very clear. let's help.
        except google.api_core.exceptions.NotFound as excep:
            msg = f"NotFound: The subscription cannot be created because the topic does not exist: {self.topic.path}"
            raise exceptions.CloudConnectionError(msg) from excep

    def _set_topic(self, connected_topic_path) -> None:
        """Validate :attr:`Subscription.topic` against the connected topic, setting it if unset.

        Args:
            connected_topic_path:
                Path of the topic that the subscription is attached to in Google Cloud.

        Raises:
            CloudConnectionError:
                'InvalidTopic' if a topic was provided and its path differs from ``connected_topic_path``.
        """
        # if the topic is invalid, raise an error
        if (self.topic is not None) and (connected_topic_path != self.topic.path):
            msg = (
                "InvalidTopic: The subscription exists but is attached to a different topic.\n"
                f"\tFound topic: {connected_topic_path}\n"
                f"\tExpected topic: {self.topic.path}\n"
                "Either use the found topic or delete the existing subscription and try again."
            )
            raise exceptions.CloudConnectionError(msg)

        # if the topic isn't already set, do it now
        if self.topic is None:
            self.topic = Topic.from_path(connected_topic_path)
        LOGGER.debug("topic validated")

    def delete(self) -> None:
        """Delete the subscription. A subscription that is not found is logged and otherwise ignored."""
        try:
            self.client.delete_subscription(subscription=self.path)
        except google.api_core.exceptions.NotFound:
            LOGGER.info(f"nothing to delete. subscription not found: {self.path}")
        else:
            LOGGER.info(f"deleted subscription: {self.path}")

    def pull_batch(self, max_messages: int = 1) -> List["Alert"]:
        """Pull a single batch of messages.

        This method is recommended for use cases that need a small number of alerts on-demand,
        often for testing and development.

        This method is *not* recommended for long-running listeners as it is likely to be unstable.
        Use :meth:`Consumer.stream` instead. This is Google's recommendation about how to use the
        Google API that underpins these pittgoogle methods.

        Args:
            max_messages (int):
                Maximum number of messages to be pulled.

        Returns:
            list[Alert]:
                A list of Alert objects representing the pulled messages.
        """
        # Wrapping the module-level function
        return pull_batch(self, max_messages=max_messages, schema_name=self.schema_name)

    def purge(self) -> None:
        """Purge all messages from the subscription.

        Prompts for confirmation on standard input first. Nothing is done unless the answer is
        ``y`` (case-insensitive). On confirmation, the subscription is sent a seek request with
        the current time.
        """
        msg = (
            "WARNING: This is permanent.\n"
            f"Are you sure you want to purge all messages from the subscription\n{self.path}?\n"
            "(y/[n]): "
        )
        proceed = input(msg)
        if proceed.lower() == "y":
            LOGGER.info(f"Purging all messages from subscription {self.path}")
            # Use a timezone-aware timestamp so the seek target is the same instant no matter
            # how the datetime is converted or what the local timezone is.
            now = datetime.datetime.now(tz=datetime.timezone.utc)
            _ = self.client.seek(request=dict(subscription=self.path, time=now))
@attrs.define
class Consumer:
    """Consumer class to pull a Pub/Sub subscription and process messages.

    Args:
        subscription (str or Subscription):
            Pub/Sub subscription to be pulled (it must already exist in Google Cloud).
        msg_callback (callable):
            Function that will process a single message. It should accept an Alert and return a Response.
        batch_callback (callable, optional):
            Function that will process a batch of results. It should accept a list of the results
            returned by the msg_callback.
        batch_maxn (int, optional):
            Maximum number of messages in a batch. This has no effect if batch_callback is None.
        batch_max_wait_between_messages (int, optional):
            Max number of seconds to wait between messages before processing a batch. This has
            no effect if batch_callback is None.
        max_backlog (int, optional):
            Maximum number of pulled but unprocessed messages before pausing the pull.
        max_workers (int, optional):
            Maximum number of workers for the executor. This has no effect if an executor is provided.
        executor (concurrent.futures.ThreadPoolExecutor, optional):
            Executor to be used by the Google API to pull and process messages in the background.

    Example:

        Open a streaming pull. Recommended for long-running listeners. This will pull and process
        messages in the background, indefinitely. User must supply a callback that processes a single message.
        It should accept a :class:`pittgoogle.pubsub.Alert` and return a :class:`pittgoogle.pubsub.Response`.
        Optionally, can provide a callback that processes a batch of messages. Note that messages are
        acknowledged (and thus permanently deleted) _before_ the batch callback runs, so it is recommended
        to do as much processing as possible in the message callback and use a batch callback only when
        necessary.

        .. code-block:: python

            def my_msg_callback(alert):
                # process the message here. we'll just print the ID.
                print(f"processing message: {alert.metadata['message_id']}")

                # return a Response. include a result if using a batch callback.
                return pittgoogle.pubsub.Response(ack=True, result=alert.dict)

            def my_batch_callback(results):
                # process the batch of results (list of results returned by my_msg_callback)
                # we'll just print the number of results in the batch
                print(f"batch processing {len(results)} results")

            consumer = pittgoogle.pubsub.Consumer(
                subscription=subscription, msg_callback=my_msg_callback, batch_callback=my_batch_callback
            )

            # open the stream in the background and process messages through the callbacks
            # this blocks indefinitely. use `Ctrl-C` to close the stream and unblock
            consumer.stream()

    ----
    """

    _subscription: Union[str, Subscription] = attrs.field(
        validator=attrs.validators.instance_of((str, Subscription))
    )
    msg_callback: Callable[["Alert"], "Response"] = attrs.field(
        validator=attrs.validators.is_callable()
    )
    batch_callback: Optional[Callable[[list], None]] = attrs.field(
        default=None, validator=attrs.validators.optional(attrs.validators.is_callable())
    )
    batch_maxn: int = attrs.field(default=100, converter=int)
    batch_max_wait_between_messages: int = attrs.field(default=30, converter=int)
    max_backlog: int = attrs.field(default=1000, validator=attrs.validators.gt(0))
    max_workers: Optional[int] = attrs.field(
        default=None, validator=attrs.validators.optional(attrs.validators.instance_of(int))
    )
    _executor: Optional[concurrent.futures.ThreadPoolExecutor] = attrs.field(
        default=None,
        validator=attrs.validators.optional(
            attrs.validators.instance_of(concurrent.futures.ThreadPoolExecutor)
        ),
    )
    # Results returned by msg_callback wait here until they are handed to batch_callback.
    _queue: queue.Queue = attrs.field(factory=queue.Queue, init=False)
    # Handle on the open streaming pull; None until a stream has been opened.
    streaming_pull_future: Optional[
        google.cloud.pubsub_v1.subscriber.futures.StreamingPullFuture
    ] = attrs.field(default=None, init=False)

    @property
    def subscription(self) -> Subscription:
        """Subscription to be consumed.

        If a subscription name was provided, the :class:`Subscription` is created and touched
        the first time this is accessed.
        """
        if isinstance(self._subscription, str):
            self._subscription = Subscription(self._subscription)
            self._subscription.touch()
        return self._subscription

    @property
    def executor(self) -> concurrent.futures.ThreadPoolExecutor:
        """Executor to be used by the Google API for a streaming pull.

        If not provided, one is created with :attr:`Consumer.max_workers` workers.
        """
        if self._executor is None:
            self._executor = concurrent.futures.ThreadPoolExecutor(self.max_workers)
        return self._executor

    def stream(self, block: bool = True) -> None:
        """Open the stream in a background thread and process messages through the callbacks.

        Recommended for long-running listeners.

        Args:
            block (bool):
                Whether to block the main thread while the stream is open. If `True`, block
                indefinitely (use `Ctrl-C` to close the stream and unblock). If `False`, open the
                stream and then return (use :meth:`~Consumer.stop()` to close the stream).
                This must be `True` in order to use a `batch_callback`.
        """
        # open a streaming-pull and process messages through the callback, in the background
        self._open_stream()

        if not block:
            if self.batch_callback is not None:
                # Batches are only processed while blocking. Without this warning the results
                # would silently pile up in the internal queue.
                LOGGER.warning(
                    "batch_callback will not run because block=False. "
                    "Results returned by msg_callback will accumulate unprocessed."
                )
            msg = "The stream is open in the background. Use consumer.stop() to close it."
            print(msg)
            LOGGER.info(msg)
            return

        try:
            self._process_batches()

        # catch all exceptions and attempt to close the stream before raising
        except (KeyboardInterrupt, Exception):
            self.stop()
            raise

    def _open_stream(self) -> None:
        """Open a streaming pull and process messages in the background."""
        LOGGER.info(f"opening a streaming pull on subscription: {self.subscription.path}")
        self.streaming_pull_future = self.subscription.client.subscribe(
            self.subscription.path,
            self._callback,
            # cap the number of outstanding messages at max_backlog
            flow_control=google.cloud.pubsub_v1.types.FlowControl(max_messages=self.max_backlog),
            scheduler=google.cloud.pubsub_v1.subscriber.scheduler.ThreadScheduler(
                executor=self.executor
            ),
            await_callbacks_on_shutdown=True,
        )

    def _callback(self, message: "google.cloud.pubsub_v1.subscriber.message.Message") -> None:
        """Unpack the message, run the :attr:`~Consumer.msg_callback` and handle the response.

        A non-None result is queued for the batch callback. The message is then acked or nacked
        according to the response.
        """
        # NOTE(review): unlike pull_batch, the subscription's schema_name is not forwarded to the
        # Alert here. Confirm against the Alert API whether it should be.
        response = self.msg_callback(Alert(msg=message))  # Response

        if response.result is not None:
            self._queue.put(response.result)

        if response.ack:
            message.ack()
        else:
            message.nack()

    def _process_batches(self) -> None:
        """Run the batch callback if provided, otherwise just sleep.

        Results are collected from the internal queue. A batch is handed to
        :attr:`~Consumer.batch_callback` when it reaches :attr:`~Consumer.batch_maxn` results or
        when no new result arrives within :attr:`~Consumer.batch_max_wait_between_messages`
        seconds. Empty batches are never passed to the callback.

        This never returns -- it runs until it encounters an error.
        """
        # if there's no batch_callback there's nothing to do except wait until the process is killed
        if self.batch_callback is None:
            while True:
                time.sleep(60)

        batch, count = [], 0
        while True:
            try:
                batch.append(
                    self._queue.get(block=True, timeout=self.batch_max_wait_between_messages)
                )

            except queue.Empty:
                # hit the max wait. process the batch, unless nothing was collected
                if batch:
                    self.batch_callback(batch)
                batch, count = [], 0

            # catch anything else and try to process the batch before raising
            except (KeyboardInterrupt, Exception):
                if batch:
                    self.batch_callback(batch)
                raise

            else:
                self._queue.task_done()
                count += 1

            if count == self.batch_maxn:
                # hit the max number of results. process the batch
                self.batch_callback(batch)
                batch, count = [], 0

    def stop(self) -> None:
        """Attempt to shutdown the streaming pull and exit the background threads gracefully.

        Does nothing (beyond logging) if no stream has been opened.
        """
        if self.streaming_pull_future is None:
            LOGGER.info("no open stream to close")
            return

        LOGGER.info("closing the stream")
        self.streaming_pull_future.cancel()  # trigger the shutdown
        self.streaming_pull_future.result()  # block until the shutdown is complete

    def pull_batch(self, max_messages: int = 1) -> List["Alert"]:
        """Pull a single batch of messages.

        Recommended for testing. Not recommended for long-running listeners (use the
        :meth:`~Consumer.stream` method instead).

        Args:
            max_messages (int):
                Maximum number of messages to be pulled.

        Returns:
            list[Alert]:
                A list of Alert objects representing the pulled messages.
        """
        return self.subscription.pull_batch(max_messages=max_messages)
@attrs.define(frozen=True, kw_only=True)
class Response:
    """Immutable, keyword-only container returned by a :meth:`Consumer.msg_callback`.

    Args:
        ack (bool):
            Whether the message should be acknowledged. Pass `True` when the message was processed
            successfully. Pass `False` when an error was encountered and Pub/Sub should redeliver
            the message later. An acknowledged message is permanently deleted by Pub/Sub (unless
            the subscription has been explicitly configured to retain acknowledged messages).
            The value is converted with ``bool``.
        result (Any):
            Anything the user wishes to return. Results that are not `None` are collected by the
            Consumer into a list, which is passed to the user's batch callback for further
            processing. Without a batch callback the results are lost.

    ----
    """

    ack: bool = attrs.field(converter=bool, default=True)
    result: Any = attrs.field(default=None)