pittgoogle.pubsub

Classes to facilitate connections to Google Cloud Pub/Sub streams.

Consumer(subscription, msg_callback[, ...])

Consumer class to pull a Pub/Sub subscription and process messages.

Response(*[, ack, result])

Container for a response, to be returned by a Consumer.msg_callback().

Subscription(name[, auth, topic, client, ...])

Class to manage a Google Cloud Pub/Sub subscription.

Topic(name[, projectid, auth, client])

Class to manage a Google Cloud Pub/Sub topic.


class pittgoogle.pubsub.Consumer(subscription: str | Subscription, msg_callback: Callable[[Alert], Response], batch_callback: Callable[[list], None] | None = None, batch_maxn=100, batch_max_wait_between_messages=30, max_backlog: int = 1000, max_workers: int | None = None, executor: ThreadPoolExecutor = None)[source]

Consumer class to pull a Pub/Sub subscription and process messages.

Parameters:
  • subscription (str or Subscription) – Pub/Sub subscription to be pulled (it must already exist in Google Cloud).

  • msg_callback (callable) – Function that will process a single message. It should accept a Alert and return a Response.

  • batch_callback (callable, optional) – Function that will process a batch of results. It should accept a list of the results returned by the msg_callback.

  • batch_maxn (int, optional) – Maximum number of messages in a batch. This has no effect if batch_callback is None.

  • batch_max_wait_between_messages (int, optional) – Max number of seconds to wait between messages before processing a batch. This has no effect if batch_callback is None.

  • max_backlog (int, optional) – Maximum number of pulled but unprocessed messages before pausing the pull.

  • max_workers (int, optional) – Maximum number of workers for the executor. This has no effect if an executor is provided.

  • executor (concurrent.futures.ThreadPoolExecutor, optional) – Executor to be used by the Google API to pull and process messages in the background.

Example

Open a streaming pull. Recommended for long-running listeners. This will pull and process messages in the background, indefinitely. User must supply a callback that processes a single message. It should accept a pittgoogle.pubsub.Alert and return a pittgoogle.pubsub.Response. Optionally, can provide a callback that processes a batch of messages. Note that messages are acknowledged (and thus permanently deleted) _before_ the batch callback runs, so it is recommended to do as much processing as possible in the message callback and use a batch callback only when necessary.

def my_msg_callback(alert):
    # process the message here. we'll just print the ID.
    print(f"processing message: {alert.metadata['message_id']}")

    # return a Response. include a result if using a batch callback.
    return pittgoogle.pubsub.Response(ack=True, result=alert.dict)

def my_batch_callback(results):
    # process the batch of results (list of results returned by my_msg_callback)
    # we'll just print the number of results in the batch
    print(f"batch processing {len(results)} results)

consumer = pittgoogle.pubsub.Consumer(
    subscription=subscription, msg_callback=my_msg_callback, batch_callback=my_batch_callback
)

# open the stream in the background and process messages through the callbacks
# this blocks indefinitely. use `Ctrl-C` to close the stream and unblock
consumer.stream()

property executor: ThreadPoolExecutor

Executor to be used by the Google API for a streaming pull.

pull_batch(max_messages: int = 1) List[Alert][source]

Pull a single batch of messages.

Recommended for testing. Not recommended for long-running listeners (use the stream() method instead).

Parameters:

max_messages (int) – Maximum number of messages to be pulled.

Returns:

A list of Alert objects representing the pulled messages.

Return type:

list[Alert]

stop() None[source]

Attempt to shutdown the streaming pull and exit the background threads gracefully.

stream(block: bool = True) None[source]

Open the stream in a background thread and process messages through the callbacks.

Recommended for long-running listeners.

Parameters:

block (bool) – Whether to block the main thread while the stream is open. If True, block indefinitely (use Ctrl-C to close the stream and unblock). If False, open the stream and then return (use stop() to close the stream). This must be True in order to use a batch_callback.

property subscription: Subscription

Subscription to be consumed.

class pittgoogle.pubsub.Response(*, ack=True, result: Any = None)[source]

Container for a response, to be returned by a Consumer.msg_callback().

Parameters:
  • ack (bool) – Whether to acknowledge the message. Use True if the message was processed successfully, False if an error was encountered and you would like Pub/Sub to redeliver the message at a later time. Note that once a message is acknowledged to Pub/Sub it is permanently deleted (unless the subscription has been explicitly configured to retain acknowledged messages).

  • result (Any) – Anything the user wishes to return. If not None, the Consumer will collect the results in a list and pass the list to the user’s batch callback for further processing. If there is no batch callback the results will be lost.


class pittgoogle.pubsub.Subscription(name: str, auth: Auth = NOTHING, topic: Topic | None = None, client: SubscriberClient | None = None, schema_name: str | None = None)[source]

Class to manage a Google Cloud Pub/Sub subscription.

Parameters:
  • name (str) – Name of the Pub/Sub subscription.

  • auth (Auth, optional) – Credentials for the Google Cloud project that will be used to connect to the subscription. If not provided, it will be created from environment variables.

  • topic (Topic, optional) – Topic this subscription should be attached to. Required only when the subscription needs to be created.

  • client (google.cloud.pubsub_v1.SubscriberClient, optional) – Pub/Sub client that will be used to access the subscription. If not provided, a new client will be created the first time it is needed.

  • schema_name (str) – Schema name of the alerts in the subscription. Passed to pittgoogle.alert.Alert for unpacking. If not provided, some properties of the Alert may not be available. For a list of schema names, see pittgoogle.registry.Schemas.names().

Example

Create a subscription to Pitt-Google’s ‘ztf-loop’ topic and pull messages:

# Topic that the subscription should be connected to
topic = pittgoogle.Topic(name="ztf-loop", projectid=pittgoogle.ProjectIds().pittgoogle)

# Create the subscription
subscription = pittgoogle.Subscription(
    name="my-ztf-loop-subscription", topic=topic, schema_name="ztf"
)
subscription.touch()

# Pull a small batch of alerts
alerts = subscription.pull_batch(max_messages=4)

property client: SubscriberClient

Pub/Sub client that will be used to access the subscription.

If not provided, a new client will be created using Subscription.auth.

delete() None[source]

Delete the subscription.

property path: str

Fully qualified path to the subscription.

property projectid: str

Subscription owner’s Google Cloud project ID.

pull_batch(max_messages: int = 1) List[Alert][source]

Pull a single batch of messages.

This method is recommended for use cases that need a small number of alerts on-demand, often for testing and development.

This method is not recommended for long-running listeners as it is likely to be unstable. Use Consumer.stream() instead. This is Google’s recommendation about how to use the Google API that underpins these pittgoogle methods.

Parameters:

max_messages (int) – Maximum number of messages to be pulled.

Returns:

A list of Alert objects representing the pulled messages.

Return type:

list[Alert]

purge()[source]

Purge all messages from the subscription.

touch() None[source]

Test the connection to the subscription, creating it if necessary.

Note that messages published to the topic before the subscription was created are not available to the subscription.

Raises:
  • TypeError – if the subscription needs to be created but no topic was provided.

  • CloudConnectionError

    • ‘NotFound` if the subscription needs to be created but the topic does not exist in Google Cloud. - ‘InvalidTopic’ if the subscription exists but the user explicitly provided a topic that this subscription is not actually attached to.

class pittgoogle.pubsub.Topic(name: str, projectid: str = None, auth: Auth = None, client: PublisherClient | None = None)[source]

Class to manage a Google Cloud Pub/Sub topic.

Parameters:
  • name (str) – Name of the Pub/Sub topic.

  • projectid (str, optional) – The topic owner’s Google Cloud project ID. Either this or auth is required. Use this if you are connecting to a subscription owned by a different project than this topic. pittgoogle.registry.ProjectIds is a registry containing Pitt-Google’s project IDs.

  • auth (Auth, optional) – Credentials for the Google Cloud project that owns this topic. If not provided, it will be created from environment variables when needed.

  • client (google.cloud.pubsub_v1.PublisherClient, optional) – Pub/Sub client that will be used to access the topic. If not provided, a new client will be created the first time it is requested.

Example

# Create a new topic in your project
my_topic = pittgoogle.Topic(name="my-new-topic")
my_topic.touch()

# Create a dummy message to publish
my_alert = pittgoogle.Alert(
    dict={"message": "Hello, World!"},  # the message payload
    attributes={"custom_key": "custom_value"}  # custom attributes for the message
)

# Publish the message to the topic
my_topic.publish(my_alert)  # returns the ID of the published message

To pull the message back from the topic, use a Subscription.


property auth: Auth

Credentials for the Google Cloud project that owns this topic.

This will be created from environment variables if needed.

property client: PublisherClient

Pub/Sub client for topic access.

Will be created using Topic.auth.credentials if necessary.

delete() None[source]

Delete the topic.

classmethod from_cloud(name: str, *, projectid: str, survey: str | None = None, testid: str | None = None)[source]

Creates a Topic with a Topic.client that uses implicit credentials.

Parameters:
  • name (str) – Name of the topic. If survey and/or testid are provided, they will be added to this name following the Pitt-Google naming syntax.

  • projectid (str) – Project ID of the Google Cloud project that owns this resource. Project IDs used by Pitt-Google are listed in the registry for convenience (pittgoogle.registry.ProjectIds). Required because it cannot be retrieved from the client and there is no explicit auth.

  • survey (str, optional) – Name of the survey. If provided, it will be prepended to name following the Pitt-Google naming syntax.

  • testid (str, optional) – Pipeline identifier. If this is not None, False, or “False”, it will be appended to the name following the Pitt-Google naming syntax. This is used to allow pipeline modules to find the correct resources without interfering with other pipelines that may have deployed resources with the same base names (e.g., for development and testing purposes).

classmethod from_path(path) Topic[source]

Parse the path and return a new Topic.

property path: str

Fully qualified path to the topic.

property projectid: str

The topic owner’s Google Cloud project ID.

publish(alert: Alert) int[source]

Publish a message with pittgoogle.Alert.dict as the payload and pittgoogle.Alert.attributes as the attributes.

touch() None[source]

Test the connection to the topic, creating it if necessary.

Raises:

CloudConnectionError – ‘PermissionDenied’ if Topic.auth does not have permission to get or create the table.

pittgoogle.pubsub.pull_batch(subscription: str | Subscription, max_messages: int = 1, schema_name: str = '', **subscription_kwargs) List[Alert][source]

Pulls a single batch of messages from the specified subscription.

Parameters:
  • subscription (str or Subscription) – The subscription to be pulled. If str, the name of the subscription. The subscription is expected to exist in Google Cloud.

  • max_messages (int) – The maximum number of messages to be pulled.

  • schema_name (str) – The schema name of the alerts in the subscription. See pittgoogle.registry.Schemas.names() for the list of options. Passed to Alert for unpacking. If not provided, some properties of the Alert may not be available.

  • **subscription_kwargs – Keyword arguments used to create the Subscription object, if needed.

Returns:

A list of Alert objects representing the pulled messages.

Return type:

list[Alert]