# -*- coding: UTF-8 -*-
"""Classes for working with astronomical alerts.
.. autosummary::
Alert
----
"""
import base64
import datetime
import io
import logging
import random
from pathlib import Path
from typing import TYPE_CHECKING, Any, Mapping, Union
import attrs
import google.cloud.pubsub_v1
from . import exceptions, registry, types_
# so 'schema' module doesn't clobber 'Alert.schema' attribute
from .schema import Schema
if TYPE_CHECKING:
import astropy.table
import google.cloud.functions_v1
import pandas as pd # always lazy-load pandas. it hogs memory on cloud functions and run
LOGGER = logging.getLogger(__name__)
@attrs.define(kw_only=True)
class Alert:
    """Container for an astronomical alert.

    To create an `Alert`, use one of the `from_*` methods like :meth:`pittgoogle.Alert.from_dict`.
    Instances of this class are also returned by other calls like :meth:`pittgoogle.pubsub.Subscription.pull_batch`.

    Args:
        dict (dict, optional):
            The alert data as a dictionary. If not provided, it will be deserialized from
            ``msg.data`` on first access of :attr:`Alert.dict`.
        attributes (dict, optional):
            Attributes or custom metadata for the alert.
        schema_name (str):
            Name of the schema for the alert. This is used to deserialize the alert bytes.
            See :meth:`pittgoogle.registry.Schemas.names` for a list of options.
            If not provided, some properties of the `Alert` may not be available.
        msg (PubsubMessageLike or google.cloud.pubsub_v1.types.PubsubMessage, optional):
            The incoming Pub/Sub message object. This class is documented at
            `<https://cloud.google.com/python/docs/reference/pubsub/latest/google.cloud.pubsub_v1.types.PubsubMessage>`__.
        path (pathlib.Path, optional):
            Path to a file containing the alert data.
    ----
    """

    # Raw alert payload; lazily populated from msg.data by the `dict` property.
    _dict: Mapping | None = attrs.field(default=None)
    # Custom metadata; lazily populated from msg.attributes by the `attributes` property.
    _attributes: Mapping[str, str] | None = attrs.field(default=None)
    schema_name: str | None = attrs.field(default=None)
    msg: google.cloud.pubsub_v1.types.PubsubMessage | types_.PubsubMessageLike | None = (
        attrs.field(default=None)
    )
    path: Path | None = attrs.field(default=None)
    # Use "Union" because " | " is throwing an error when combined with forward references.
    context: Union[
        "google.cloud.functions_v1.context.Context", types_._FunctionsContextLike, None
    ] = attrs.field(default=None)
    _dataframe: Union["pd.DataFrame", None] = attrs.field(default=None)
    # NOTE: the astropy class is spelled "QTable" (was "Qtable", which does not exist).
    _skymap: Union["astropy.table.QTable", None] = attrs.field(default=None)
    _schema: Schema | None = attrs.field(default=None, init=False)
    # Cached HEALPix pixel indexes (nested scheme) at orders 9, 19, and 29.
    _healpix9: int | None = attrs.field(default=None, init=False)
    _healpix19: int | None = attrs.field(default=None, init=False)
    _healpix29: int | None = attrs.field(default=None, init=False)
# ---- class methods ---- #
@classmethod
def from_cloud_functions(
cls,
event: Mapping,
context: "google.cloud.functions_v1.context.Context",
schema_name: str | None = None,
):
"""Create an `Alert` from an 'event' and 'context', as received by a Cloud Functions module.
Argument definitions copied from https://cloud.google.com/functions/1stgendocs/tutorials/pubsub-1st-gen
Args:
event (dict):
The dictionary with data specific to this type of event. The `@type` field maps to
`type.googleapis.com/google.pubsub.v1.PubsubMessage`. The `data` field maps to the
PubsubMessage data in a base64-encoded string. The `attributes` field maps to the
PubsubMessage attributes if any is present.
context (google.cloud.functions.Context):
Metadata of triggering event including `event_id` which maps to the PubsubMessage
messageId, `timestamp` which maps to the PubsubMessage publishTime, `event_type` which
maps to `google.pubsub.topic.publish`, and `resource` which is a dictionary that
describes the service API endpoint pubsub.googleapis.com, the triggering topic's name,
and the triggering event type `type.googleapis.com/google.pubsub.v1.PubsubMessage`.
"""
alert = cls(
msg=types_.PubsubMessageLike(
# data is required. the rest should be present in the message, but use get to be lenient
data=base64.b64decode(event["data"]),
attributes=event.get("attributes", {}),
message_id=context.event_id,
publish_time=cls._str_to_datetime(context.timestamp),
),
context=context,
schema_name=schema_name,
)
return alert
@classmethod
def from_cloud_run(cls, envelope: Mapping, schema_name: str | None = None) -> "Alert":
"""Create an `Alert` from an HTTP request envelope containing a Pub/Sub message, as received by a Cloud Run module.
Args:
envelope (dict):
The HTTP request envelope containing the Pub/Sub message.
schema_name (str, optional):
The name of the schema to use. Defaults to None.
Returns:
Alert:
An instance of the `Alert` class.
Raises:
BadRequest:
If the Pub/Sub message is invalid or missing.
Example:
Code for a Cloud Run module that uses this method to open a ZTF alert:
.. code-block:: python
import pittgoogle
# flask is used to work with HTTP requests, which trigger Cloud Run modules
# the request contains the Pub/Sub message, which contains the alert packet
import flask
app = flask.Flask(__name__)
# function that receives the request
@app.route("/", methods=["POST"])
def index():
try:
# unpack the alert
# if the request does not contain a valid message, this raises a `BadRequest`
alert = pittgoogle.Alert.from_cloud_run(envelope=flask.request.get_json(), schema_name="ztf")
except pittgoogle.exceptions.BadRequest as exc:
# return the error text and an HTTP 400 Bad Request code
return str(exc), 400
# continue processing the alert
# when finished, return an empty string and an HTTP success code
return "", 204
"""
# check whether received message is valid, as suggested by Cloud Run docs
if not envelope:
raise exceptions.BadRequest("Bad Request: no Pub/Sub message received")
if not isinstance(envelope, dict) or "message" not in envelope:
raise exceptions.BadRequest("Bad Request: invalid Pub/Sub message format")
alert = cls(
msg=types_.PubsubMessageLike(
# data is required. the rest should be present in the message, but use get to be lenient
data=base64.b64decode(envelope["message"]["data"].encode("utf-8")),
attributes=envelope["message"].get("attributes", {}),
message_id=envelope["message"].get("message_id"),
publish_time=cls._str_to_datetime(envelope["message"]["publish_time"]),
ordering_key=envelope["message"].get("ordering_key"),
),
schema_name=schema_name,
)
return alert
@classmethod
def from_dict(
cls,
payload: Mapping,
attributes: Mapping[str, str] | None = None,
schema_name: str | None = None,
) -> "Alert":
"""Create an `Alert` object from the given `payload` dictionary.
Args:
payload (dict):
The dictionary containing the data for the `Alert` object.
attributes (Mapping[str, str], None):
Additional attributes for the `Alert` object. Defaults to None.
schema_name (str, None):
The name of the schema. Defaults to None.
Returns:
Alert:
An instance of the `Alert` class.
"""
return cls(dict=payload, attributes=attributes, schema_name=schema_name)
@classmethod
def from_msg(
cls, msg: "google.cloud.pubsub_v1.types.PubsubMessage", schema_name: str | None = None
) -> "Alert":
"""Create an `Alert` object from a `google.cloud.pubsub_v1.types.PubsubMessage`.
Args:
msg (google.cloud.pubsub_v1.types.PubsubMessage):
The PubsubMessage object to create the Alert from.
schema_name (str, optional):
The name of the schema to use for the Alert. Defaults to None.
Returns:
Alert:
The created `Alert` object.
"""
alert = cls(msg=msg, schema_name=schema_name)
return alert
@classmethod
def from_path(cls, path: str | Path, schema_name: str | None = None) -> "Alert":
"""Create an `Alert` object from the file at the specified `path`.
Args:
path (str or Path):
The path to the file containing the alert data.
schema_name (str, optional):
The name of the schema to use for the alert. Defaults to None.
Returns:
Alert:
An instance of the `Alert` class.
Raises:
FileNotFoundError:
If the file at the specified `path` does not exist.
IOError:
If there is an error reading the file.
"""
bytes_ = Path(path).read_bytes()
alert = cls(
msg=types_.PubsubMessageLike(data=bytes_), schema_name=schema_name, path=Path(path)
)
return alert
    def to_mock_input(self, cloud_functions: bool = False):
        """Package this alert as mock input for testing a pipeline module.

        Args:
            cloud_functions (bool):
                Whether to package the alert as input for a Cloud Functions module.
                This is currently the only supported target, but it must be requested
                explicitly.

        Returns:
            The (event, context) tuple produced by :meth:`MockInput.to_cloud_functions`.

        Raises:
            NotImplementedError:
                If `cloud_functions` is not True.
        """
        if not cloud_functions:
            raise NotImplementedError("Only cloud functions has been implemented.")
        return MockInput(alert=self).to_cloud_functions()
# ---- properties ---- #
@property
def attributes(self) -> Mapping:
"""Return the alert's custom metadata.
If this was not provided (typical case), this attribute will contain a copy of
the incoming :attr:`Alert.msg.attributes`.
Alert IDs and schema version will be added if not already present.
You may update this dictionary as desired. If you publish this alert using
:attr:`pittgoogle.Topic.publish`, this dictionary will be sent as the outgoing
message's Pub/Sub attributes.
"""
if self._attributes is None:
if self.msg is not None:
self._attributes = dict(self.msg.attributes)
else:
self._attributes = {}
self._add_attributes()
return self._attributes
@property
def dict(self) -> Mapping:
"""Alert data as a dictionary.
If this was not provided (typical case), this attribute will contain the deserialized
alert bytes from :attr:`Alert.msg.data`.
You may update this dictionary as desired. If you publish this alert using
:attr:`pittgoogle.Topic.publish`, this dictionary will be sent as the outgoing
Pub/Sub message's data payload.
Returns:
dict:
The alert data as a dictionary.
Raises:
SchemaError:
If unable to deserialize the alert bytes.
"""
if self._dict is None:
self._dict = self.schema.deserialize(self.msg.data)
return self._dict
    @property
    def dataframe(self) -> "pd.DataFrame":
        """Return a pandas DataFrame containing the source detections.

        Concatenates the triggering source, any previous sources, and any previous
        forced sources, in that order. Integer columns that appear in only one of the
        two frames are cast to pandas' nullable "Int64" so the concat does not coerce
        them to float. The result is cached on first access.
        """
        if self._dataframe is not None:
            return self._dataframe
        import pandas as pd  # always lazy-load pandas. it hogs memory on cloud functions and run

        # sources and previous sources are expected to have the same fields
        sources_df = pd.DataFrame([self.get("source")] + (self.get("prv_sources") or []))
        # sources and forced sources may have different fields
        forced_df = pd.DataFrame(self.get("prv_forced_sources") or [])
        # use nullable integer data type to avoid converting ints to floats
        # for columns in one dataframe but not the other
        sources_ints = [c for c, v in sources_df.dtypes.items() if v == int]
        sources_df = sources_df.astype(
            {c: "Int64" for c in set(sources_ints) - set(forced_df.columns)}
        )
        forced_ints = [c for c, v in forced_df.dtypes.items() if v == int]
        forced_df = forced_df.astype(
            {c: "Int64" for c in set(forced_ints) - set(sources_df.columns)}
        )
        self._dataframe = pd.concat([sources_df, forced_df], ignore_index=True)
        return self._dataframe
@property
def alertid(self) -> str | int:
"""Return the alert ID. Convenience wrapper around :attr:`Alert.get`.
If the survey does not define an alert ID, this returns the `sourceid`.
"""
return self.get("alertid", self.sourceid)
@property
def objectid(self) -> str | int:
"""Return the object ID. Convenience wrapper around :attr:`Alert.get`.
The "object" represents a collection of sources, as determined by the survey.
"""
return self.get("objectid")
@property
def sourceid(self) -> str | int:
"""Return the source ID. Convenience wrapper around :attr:`Alert.get`.
The "source" is the detection that triggered the alert.
"""
return self.get("sourceid")
@property
def ra(self) -> float:
"""Return the source's right ascension. Convenience wrapper around :attr:`Alert.get`.
The "source" is the detection that triggered the alert.
"""
return self.get("ra")
@property
def dec(self) -> float:
"""Return the source's declination. Convenience wrapper around :attr:`Alert.get`.
The "source" is the detection that triggered the alert.
"""
return self.get("dec")
    @property
    def healpix29(self) -> int:
        """Return the HEALPix order 29 pixel index at the source's right ascension (RA) and declination.

        Uses the nested numbering scheme for pixel indexes. Assumes RA and dec are in degrees.

        This can be useful for spatial searches and cross matches because it collapses two floats
        (RA and dec) into one integer (pixel index), which can be much easier to work with. There
        is some loss of precision but it will be insignificant for most use cases --
        the pixel resolution (square root of area) at order 29 is ~4e-4 arcseconds.
        This resolution may even be higher than preferred for many use cases because it can result
        in a very large set of pixels that are needed to cover the area of interest.
        In that case, try :meth:`healpix19` or :meth:`healpix9`.

        Example:
            Check whether this alert is within 5 arcsec of the eclipsing cataclysmic variable EX Draconis.
            We recommend `hpgeom <https://hpgeom.readthedocs.io/>`__ for working with HEALPix.

            .. code-block:: python

                import hpgeom

                ex_dra_coords = (271.05995, 67.90355)  # deg
                radius = 5 / 3600  # deg
                nside29 = hpgeom.order_to_nside(29)
                # Find the set of HEALPix order 29 pixels that cover a 5" cone centered on the target.
                # The length of this list is 508,185,237.
                ex_dra_cone = hpgeom.query_circle(nside29, *ex_dra_coords, radius, inclusive=True, fact=1)
                # Check whether this alert is within 5" of the target.
                alert.healpix29 in ex_dra_cone
        """
        if self._healpix29 is None:
            # Lazy import: hpgeom is only needed for the HEALPix properties.
            import hpgeom

            self._healpix29 = hpgeom.angle_to_pixel(
                a=self.ra,
                b=self.dec,
                nside=hpgeom.order_to_nside(29),
                nest=True,
                lonlat=True,
                degrees=True,
            )
        return self._healpix29
    @property
    def healpix19(self) -> int:
        """Return the HEALPix order 19 pixel index at the source's right ascension (RA) and declination.

        See :meth:`healpix29` for a more detailed explanation and an example of how HEALPix indexes
        can be used. The difference here is that order 19 means the pixels are larger, with
        a resolution (square root of area) of ~0.4 arcseconds.
        If this resolution is still too fine for your use case, try :meth:`healpix9`.
        If it is too coarse, try :meth:`healpix29`.

        The following list of pixels covers at least the same area of sky as the one in the healpix29
        example (and likely more), but the total number of pixels is reduced by a factor of ~10^6
        down to 549.

        .. code-block:: python

            # See the healpix29 docstring for a complete example. The radius is 5" and
            # nside19 is analogous to nside29. The length of this list is 549.
            ex_dra_cone = hpgeom.query_circle(nside19, *ex_dra_coords, radius, inclusive=True)
        """
        if self._healpix19 is None:
            # Lazy import: hpgeom is only needed for the HEALPix properties.
            import hpgeom

            self._healpix19 = hpgeom.angle_to_pixel(
                a=self.ra,
                b=self.dec,
                nside=hpgeom.order_to_nside(19),
                nest=True,
                lonlat=True,
                degrees=True,
            )
        return self._healpix19
    @property
    def healpix9(self) -> int:
        """Return the HEALPix order 9 pixel index at the source's right ascension (RA) and declination.

        See :meth:`healpix29` for a more detailed explanation and an example of how HEALPix indexes
        can be used. The difference here is that order 9 means the pixels are much larger, with
        a resolution (square root of area) of ~400 arcseconds or ~0.1 degrees.

        The following list of pixels covers the same area of sky (and more) as the one in the
        healpix29 example, but the total number of pixels is reduced by a factor of ~10^9
        down to a single pixel.

        .. code-block:: python

            # See the healpix29 docstring for a complete example. The radius is 5" and
            # nside9 is analogous to nside29. The length of this list is 1.
            ex_dra_cone = hpgeom.query_circle(nside9, *ex_dra_coords, radius, inclusive=True)

        If this resolution is too coarse for your use case, try :meth:`healpix19` or :meth:`healpix29`.
        """
        if self._healpix9 is None:
            # Lazy import: hpgeom is only needed for the HEALPix properties.
            import hpgeom

            self._healpix9 = hpgeom.angle_to_pixel(
                a=self.ra,
                b=self.dec,
                nside=hpgeom.order_to_nside(9),
                nest=True,
                lonlat=True,
                degrees=True,
            )
        return self._healpix9
@property
def schema(self) -> Schema:
"""Return the schema from the :class:`pittgoogle.registry.Schemas` registry.
Raises:
SchemaError:
If the `schema_name` is not supplied or a schema with this name is not found.
"""
if self._schema is None:
alert_bytes = self.msg.data if self.msg else None
self._schema = registry.Schemas.get(self.schema_name, alert_bytes=alert_bytes)
return self._schema
    @property
    def skymap(self) -> Union["astropy.table.QTable", None]:
        """Alert skymap as an astropy Table. Currently implemented for LVK schemas only.

        This skymap is loaded from the alert to an astropy table and extra columns are added, following
        https://emfollow.docs.ligo.org/userguide/tutorial/multiorder_skymaps.html.
        The table is sorted by PROBDENSITY and then UNIQ, in descending order, so that the most likely
        location is first. Columns:

        - UNIQ: HEALPix pixel index in the NUNIQ indexing scheme.
        - PROBDENSITY: Probability density in the pixel (per steradian).
        - nside: HEALPix nside parameter defining the pixel resolution.
        - ipix: HEALPix pixel index at resolution nside.
        - ra: Right ascension of the pixel center (radians).
        - dec: Declination of the pixel center (radians).
        - pixel_area: Area of the pixel (steradians).
        - prob: Probability density in the pixel.
        - cumprob: Cumulative probability density up to the pixel.

        Examples:

        .. code-block:: python

            # most likely location
            alert.skymap[0]

            # 90% credible region
            alert.skymap[:alert.skymap['cumprob'].searchsorted(0.9)]
        """
        # NOTE(review): assumes schema_name is set; this raises AttributeError when
        # schema_name is None -- confirm callers always supply a schema name.
        if self._skymap is None and self.schema_name.startswith("lvk"):
            # Lazy imports: these are only needed when a skymap is actually requested.
            import astropy.table
            import astropy.units
            import hpgeom
            import numpy as np

            # The skymap field may be absent from the alert; return None in that case.
            if self.get("skymap") is None:
                return
            skymap = astropy.table.QTable.read(io.BytesIO(base64.b64decode(self.get("skymap"))))
            skymap.sort(["PROBDENSITY", "UNIQ"], reverse=True)
            # NUNIQ scheme: UNIQ = 4 * nside**2 + ipix. Recover nside first, then ipix.
            skymap["nside"] = (2 ** (np.log2(skymap["UNIQ"] // 4) // 2)).astype(int)
            skymap["ipix"] = skymap["UNIQ"] - 4 * skymap["nside"] ** 2
            skymap["ra"], skymap["dec"] = hpgeom.pixel_to_angle(
                skymap["nside"], skymap["ipix"], degrees=False
            )
            skymap["ra"].unit = astropy.units.rad
            skymap["dec"].unit = astropy.units.rad
            skymap["pixel_area"] = hpgeom.nside_to_pixel_area(skymap["nside"], degrees=False)
            skymap["pixel_area"].unit = astropy.units.sr
            # prob = density * area; cumprob supports credible-region slicing (see docstring).
            skymap["prob"] = skymap["pixel_area"] * skymap["PROBDENSITY"]
            skymap["cumprob"] = skymap["prob"].cumsum()
            self._skymap = skymap
        return self._skymap
    @property
    def name_in_bucket(self) -> str:
        """Name of the alert object (file) in Google Cloud Storage.

        Delegates to the schema's naming scheme, which constructs the object name
        from this alert's data.
        """
        # _name_in_bucket is private to the package; intentional cross-module call.
        return self.schema._name_in_bucket(alert=self)
# ---- methods ---- #
    def _add_attributes(self) -> None:
        """Add IDs, indexes, and other properties to :attr:`Alert.attributes`.

        The added keys include:

        - alertid (if defined by the survey)
        - objectid (if defined by the survey)
        - sourceid (if defined by the survey)
        - ssobjectid (if defined by the survey)
        - healpix9
        - healpix19
        - healpix29
        - schema.version
        - n_previous_detections
        """
        # Get the data IDs and corresponding survey-specific field names. If the field is nested, the
        # key will be a list. Join list -> string since these are likely to become Pub/Sub message attributes.
        ids = ["alertid", "objectid", "sourceid", "ssobjectid"]
        _names = [self.get_key(id) for id in ids]
        names = ["_".join(id) if isinstance(id, list) else id for id in _names]
        values = [self.get(id) for id in ids]
        # IDs the survey does not define produce a None name; if several are undefined
        # they collapse into a single None key here (the last value wins) and are
        # filtered out by the None check in the loop below.
        attributes = dict(zip(names, values))
        # Add derived properties.
        attributes["healpix9"] = self.healpix9
        attributes["healpix19"] = self.healpix19
        attributes["healpix29"] = self.healpix29
        # Add metadata.
        attributes["schema_version"] = self.schema.version
        attributes["n_previous_detections"] = len(self.get("prv_sources") or [])
        # Add the collected attributes to self, but only if not None and don't clobber existing.
        # NOTE(review): only the *name* is checked for None; a defined field whose value is
        # None is still added -- confirm that is intended.
        for name, value in attributes.items():
            if name is not None and name not in self._attributes:
                self._attributes[name] = value
def get(self, field: str, default: Any = None) -> Any:
"""Return the value of a field from the alert data.
Parameters:
field (str):
Name of a field. This must be one of the generic field names used by Pitt-Google
(keys in :attr:`Alert.schema.map`). To use a survey-specific field name instead, use
:attr:`Alert.dict.get`.
default (str, optional):
The default value to be returned if the field is not found.
Returns:
any:
The value in the :attr:`Alert.dict` corresponding to the field.
"""
survey_field = self.schema.map.get(field) # str, list[str], or None
if survey_field is None:
return default
if isinstance(survey_field, str):
return self.dict.get(survey_field, default)
# if survey_field is not one of the expected types, the schema map is malformed
# maybe this was intentional, but we don't know how to handle it here
if not isinstance(survey_field, list):
raise TypeError(
f"field lookup not implemented for a schema-map value of type {type(survey_field)}"
)
# the list must have more than 1 item, else it would be a single str
if len(survey_field) == 2:
try:
return self.dict[survey_field[0]][survey_field[1]]
except (KeyError, TypeError):
return default
if len(survey_field) == 3:
try:
return self.dict[survey_field[0]][survey_field[1]][survey_field[2]]
except (KeyError, TypeError):
return default
raise NotImplementedError(
f"field lookup not implemented for depth {len(survey_field)} (key = {survey_field})"
)
def get_key(
self, field: str, name_only: bool = False, default: str | None = None
) -> str | list[str] | None:
"""Return the survey-specific field name.
Args:
field (str):
Generic field name whose survey-specific name is to be returned. This must be one of the
keys in the dict `self.schema.map`.
name_only (bool):
In case the survey-specific field name is nested below the top level, whether to return
just the single final name as a str (True) or the full path as a list[str] (False).
default (str or None):
Default value to be returned if the field is not found.
Returns:
str or list[str]):
Survey-specific name for the `field`, or `default` if the field is not found.
list[str] if this is a nested field and `name_only` is False, else str with the
final field name only.
"""
survey_field = self.schema.map.get(field) # str, list[str], or None
if survey_field is None:
return default
if name_only and isinstance(survey_field, list):
return survey_field[-1]
return survey_field
def drop_cutouts(self) -> dict:
"""Drop the cutouts from the alert dictionary.
Returns:
dict:
The `dict` with the cutouts (postage stamps) removed.
"""
cutouts = [
self.get_key(key) for key in ["cutout_difference", "cutout_science", "cutout_template"]
]
alert_stripped = {k: v for k, v in self.dict.items() if k not in cutouts}
return alert_stripped
@staticmethod
def _str_to_datetime(str_time: str) -> datetime.datetime:
# occasionally the string doesn't include microseconds so we need a try/except
try:
return datetime.datetime.strptime(str_time, "%Y-%m-%dT%H:%M:%S.%f%z")
except ValueError:
return datetime.datetime.strptime(str_time, "%Y-%m-%dT%H:%M:%S%z")
@attrs.define
class MockInput:
    """Package an :class:`Alert` as mock input for testing pipeline modules."""

    # The alert to be packaged.
    alert: Alert = attrs.field()

    def to_cloud_functions(self) -> tuple[dict, types_._FunctionsContextLike]:
        """Package the alert as the (event, context) pair a Cloud Functions module receives.

        Parameter definitions copied from https://cloud.google.com/functions/1stgendocs/tutorials/pubsub-1st-gen

        Returns:
            event (dict):
                The dictionary with data specific to this type of event. The `@type` field maps to
                `type.googleapis.com/google.pubsub.v1.PubsubMessage`. The `data` field maps to the
                PubsubMessage data in a base64-encoded string. The `attributes` field maps to the
                PubsubMessage attributes if any is present.
            context (google.cloud.functions.Context):
                Metadata of triggering event including `event_id` which maps to the PubsubMessage
                messageId, `timestamp` which maps to the PubsubMessage publishTime, `event_type` which
                maps to `google.pubsub.topic.publish`, and `resource` which is a dictionary that
                describes the service API endpoint pubsub.googleapis.com, the triggering topic's name,
                and the triggering event type `type.googleapis.com/google.pubsub.v1.PubsubMessage`.
        """
        payload = self.alert.schema.serialize(self.alert.dict)
        # Pub/Sub requires attribute keys and values to be strings. Sort by key while we're at it.
        msg_attributes = {
            str(key): str(self.alert.attributes[key]) for key in sorted(self.alert.attributes)
        }
        msg_type = "type.googleapis.com/google.pubsub.v1.PubsubMessage"
        # Timestamp formatted like Pub/Sub's publishTime: UTC, milliseconds, 'Z' suffix.
        timestamp = (
            datetime.datetime.now(datetime.timezone.utc)
            .isoformat(timespec="milliseconds")
            .replace("+00:00", "Z")
        )
        event = {"@type": msg_type, "data": base64.b64encode(payload), "attributes": msg_attributes}
        context = types_._FunctionsContextLike(
            # Random 12-digit ID standing in for the PubsubMessage messageId.
            event_id=str(int(1e12 * random.random())),
            timestamp=timestamp,
            event_type="google.pubsub.topic.publish",
            resource={
                "name": "projects/NONE/topics/NONE",
                "service": "pubsub.googleapis.com",
                "type": msg_type,
            },
        )
        return event, context