Source code for pittgoogle.alert

# -*- coding: UTF-8 -*-
"""Classes for working with astronomical alerts.

.. autosummary::

    Alert

----
"""
import base64
import datetime
import importlib.resources
import logging
from pathlib import Path
from typing import TYPE_CHECKING, Any, Mapping, Union

import attrs
import google.cloud.pubsub_v1

from . import registry, types_, exceptions
from .schema import Schema  # so 'schema' module doesn't clobber 'Alert.schema' attribute

if TYPE_CHECKING:
    import pandas as pd  # always lazy-load pandas. it hogs memory on cloud functions and run

LOGGER = logging.getLogger(__name__)
PACKAGE_DIR = importlib.resources.files(__package__)


[docs] @attrs.define(kw_only=True) class Alert: """Container for an astronomical alert. To create an `Alert`, use one of the `from_*` methods like :meth:`pittgoogle.Alert.from_dict`. Instances of this class are also returned by other calls like :meth:`pittgoogle.pubsub.Subscription.pull_batch`. Args: dict (dict, optional): The alert data as a dictionary. If not provided, it will be loaded from the attributes (dict, optional): Attributes or custom metadata for the alert. schema_name (str): Name of the schema for the alert. This is use to deserialize the alert bytes. See :meth:`pittgoogle.registry.Schemas.names` for a list of options. If not provided, some properties of the `Alert` may not be available. msg (PubsubMessageLike or google.cloud.pubsub_v1.types.PubsubMessage, optional): The incoming Pub/Sub message object. This class is documented at `<https://cloud.google.com/python/docs/reference/pubsub/latest/google.cloud.pubsub_v1.types.PubsubMessage>`__. path (pathlib.Path, optional): Path to a file containing the alert data. ---- """ _dict: Mapping | None = attrs.field(default=None) _attributes: Mapping[str, str] | None = attrs.field(default=None) schema_name: str | None = attrs.field(default=None) msg: google.cloud.pubsub_v1.types.PubsubMessage | types_.PubsubMessageLike | None = ( attrs.field(default=None) ) path: Path | None = attrs.field(default=None) # Use "Union" because " | " is throwing an error when combined with forward references. _dataframe: Union["pd.DataFrame", None] = attrs.field(default=None) _schema: Schema | None = attrs.field(default=None, init=False) # ---- class methods ---- #
[docs] @classmethod def from_cloud_run(cls, envelope: Mapping, schema_name: str | None = None) -> "Alert": """Create an `Alert` from an HTTP request envelope containing a Pub/Sub message, as received by a Cloud Run module. Args: envelope (dict): The HTTP request envelope containing the Pub/Sub message. schema_name (str, optional): The name of the schema to use. Defaults to None. Returns: Alert: An instance of the `Alert` class. Raises: BadRequest: If the Pub/Sub message is invalid or missing. Example: Code for a Cloud Run module that uses this method to open a ZTF alert: .. code-block:: python import pittgoogle # flask is used to work with HTTP requests, which trigger Cloud Run modules # the request contains the Pub/Sub message, which contains the alert packet import flask app = flask.Flask(__name__) # function that receives the request @app.route("/", methods=["POST"]) def index(): try: # unpack the alert # if the request does not contain a valid message, this raises a `BadRequest` alert = pittgoogle.Alert.from_cloud_run(envelope=flask.request.get_json(), schema_name="ztf") except pittgoogle.exceptions.BadRequest as exc: # return the error text and an HTTP 400 Bad Request code return str(exc), 400 # continue processing the alert # when finished, return an empty string and an HTTP success code return "", 204 """ # check whether received message is valid, as suggested by Cloud Run docs if not envelope: raise exceptions.BadRequest("Bad Request: no Pub/Sub message received") if not isinstance(envelope, dict) or "message" not in envelope: raise exceptions.BadRequest("Bad Request: invalid Pub/Sub message format") # convert the message publish_time string -> datetime # occasionally the string doesn't include microseconds so we need a try/except publish_time = envelope["message"]["publish_time"].replace("Z", "+00:00") try: publish_time = datetime.datetime.strptime(publish_time, "%Y-%m-%dT%H:%M:%S.%f%z") except ValueError: publish_time = datetime.datetime.strptime(publish_time, "%Y-%m-%dT%H:%M:%S%z") return cls( msg=types_.PubsubMessageLike( # data is required. the rest should be present in the message, but use get to be lenient data=base64.b64decode(envelope["message"]["data"].encode("utf-8")), attributes=envelope["message"].get("attributes"), message_id=envelope["message"].get("message_id"), publish_time=publish_time, ordering_key=envelope["message"].get("ordering_key"), ), schema_name=schema_name, )
[docs] @classmethod def from_dict( cls, payload: Mapping, attributes: Mapping[str, str] | None = None, schema_name: str | None = None, ) -> "Alert": """Create an `Alert` object from the given `payload` dictionary. Args: payload (dict): The dictionary containing the data for the `Alert` object. attributes (Mapping[str, str], None): Additional attributes for the `Alert` object. Defaults to None. schema_name (str, None): The name of the schema. Defaults to None. Returns: Alert: An instance of the `Alert` class. """ return cls(dict=payload, attributes=attributes, schema_name=schema_name)
[docs] @classmethod def from_msg( cls, msg: "google.cloud.pubsub_v1.types.PubsubMessage", schema_name: str | None = None ) -> "Alert": """Create an `Alert` object from a `google.cloud.pubsub_v1.types.PubsubMessage`. Args: msg (google.cloud.pubsub_v1.types.PubsubMessage): The PubsubMessage object to create the Alert from. schema_name (str, optional): The name of the schema to use for the Alert. Defaults to None. Returns: Alert: The created `Alert` object. """ return cls(msg=msg, schema_name=schema_name)
[docs] @classmethod def from_path(cls, path: str | Path, schema_name: str | None = None) -> "Alert": """Creates an `Alert` object from the file at the specified `path`. Args: path (str or Path): The path to the file containing the alert data. schema_name (str, optional): The name of the schema to use for the alert. Defaults to None. Returns: Alert: An instance of the `Alert` class. Raises: FileNotFoundError: If the file at the specified `path` does not exist. IOError: If there is an error reading the file. """ with open(path, "rb") as f: bytes_ = f.read() return cls( msg=types_.PubsubMessageLike(data=bytes_), schema_name=schema_name, path=Path(path) )
# ---- properties ---- # @property def attributes(self) -> Mapping: """Return the alert's custom metadata. If this was not provided (typical case), this attribute will contain a copy of the incoming :attr:`Alert.msg.attributes`. You may update this dictionary as desired. If you publish this alert using :attr:`pittgoogle.Topic.publish`, this dictionary will be sent as the outgoing message's Pub/Sub attributes. """ if self._attributes is None: self._attributes = dict(self.msg.attributes) return self._attributes @property def dict(self) -> Mapping: """Alert data as a dictionary. If this was not provided (typical case), this attribute will contain the deserialized alert bytes from :attr:`Alert.msg.data`. You may update this dictionary as desired. If you publish this alert using :attr:`pittgoogle.Topic.publish`, this dictionary will be sent as the outgoing Pub/Sub message's data payload. Returns: dict: The alert data as a dictionary. Raises: SchemaError: If unable to deserialize the alert bytes. """ if self._dict is None: self._dict = self.schema.deserialize(self.msg.data) return self._dict @property def dataframe(self) -> "pd.DataFrame": """Return a pandas DataFrame containing the source detections.""" if self._dataframe is not None: return self._dataframe import pandas as pd # always lazy-load pandas. it hogs memory on cloud functions and run # sources and previous sources are expected to have the same fields sources_df = pd.DataFrame([self.get("source")] + self.get("prv_sources")) # sources and forced sources may have different fields forced_df = pd.DataFrame(self.get("prv_forced_sources")) # use nullable integer data type to avoid converting ints to floats # for columns in one dataframe but not the other sources_ints = [c for c, v in sources_df.dtypes.items() if v == int] sources_df = sources_df.astype( {c: "Int64" for c in set(sources_ints) - set(forced_df.columns)} ) forced_ints = [c for c, v in forced_df.dtypes.items() if v == int] forced_df = forced_df.astype( {c: "Int64" for c in set(forced_ints) - set(sources_df.columns)} ) self._dataframe = pd.concat([sources_df, forced_df], ignore_index=True) return self._dataframe @property def alertid(self) -> str | int: """Return the alert ID. Convenience wrapper around :attr:`Alert.get`. If the survey does not define an alert ID, this returns the `sourceid`. """ return self.get("alertid", self.sourceid) @property def objectid(self) -> str | int: """Return the object ID. Convenience wrapper around :attr:`Alert.get`. The "object" represents a collection of sources, as determined by the survey. """ return self.get("objectid") @property def sourceid(self) -> str | int: """Return the source ID. Convenience wrapper around :attr:`Alert.get`. The "source" is the detection that triggered the alert. """ return self.get("sourceid") @property def schema(self) -> Schema: """Return the schema from the :class:`pittgoogle.registry.Schemas` registry. Raises: SchemaError: If the `schema_name` is not supplied or a schema with this name is not found. """ if self._schema is None: self._schema = registry.Schemas.get(self.schema_name) return self._schema # ---- methods ---- # def _add_id_attributes(self) -> None: """Add the IDs ("alertid", "objectid", "sourceid") to :attr:`Alert.attributes`.""" ids = ["alertid", "objectid", "sourceid"] values = [self.get(id) for id in ids] # get the survey-specific field names survey_names = [self.get_key(id) for id in ids] # if the field is nested, the key will be a list # but pubsub message attributes must be strings. join to avoid a future error on publish names = [".".join(id) if isinstance(id, list) else id for id in survey_names] # only add to attributes if the survey has defined this field for idname, idvalue in zip(names, values): if idname is not None: self.attributes[idname] = idvalue
[docs] def get(self, field: str, default: Any = None) -> Any: """Return the value of a field from the alert data. Parameters: field (str): Name of a field. This must be one of the generic field names used by Pitt-Google (keys in :attr:`Alert.schema.map`). To use a survey-specific field name instead, use :attr:`Alert.dict.get`. default (str, optional): The default value to be returned if the field is not found. Returns: any: The value in the :attr:`Alert.dict` corresponding to the field. """ survey_field = self.schema.map.get(field) # str, list[str], or None if survey_field is None: return default if isinstance(survey_field, str): return self.dict.get(survey_field, default) # if survey_field is not one of the expected types, the schema map is malformed # maybe this was intentional, but we don't know how to handle it here if not isinstance(survey_field, list): raise TypeError( f"field lookup not implemented for a schema-map value of type {type(survey_field)}" ) # the list must have more than 1 item, else it would be a single str if len(survey_field) == 2: try: return self.dict[survey_field[0]][survey_field[1]] except KeyError: return default if len(survey_field) == 3: try: return self.dict[survey_field[0]][survey_field[1]][survey_field[2]] except KeyError: return default raise NotImplementedError( f"field lookup not implemented for depth {len(survey_field)} (key = {survey_field})" )
[docs] def get_key( self, field: str, name_only: bool = False, default: str | None = None ) -> str | list[str] | None: """Return the survey-specific field name. Args: field (str): Generic field name whose survey-specific name is to be returned. This must be one of the keys in the dict `self.schema.map`. name_only (bool): In case the survey-specific field name is nested below the top level, whether to return just the single final name as a str (True) or the full path as a list[str] (False). default (str or None): Default value to be returned if the field is not found. Returns: str or list[str]): Survey-specific name for the `field`, or `default` if the field is not found. list[str] if this is a nested field and `name_only` is False, else str with the final field name only. """ survey_field = self.schema.map.get(field) # str, list[str], or None if survey_field is None: return default if name_only and isinstance(survey_field, list): return survey_field[-1] return survey_field