Source code for pittgoogle.schema

# -*- coding: UTF-8 -*-
"""Classes to manage alert schemas.

.. autosummary::

    Schema
    SchemaHelpers

----
"""
import importlib.resources
import io
import json
import logging
import re
from pathlib import Path

import attrs
import fastavro
import yaml

from . import exceptions, utils

# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
# Directory of the installed pittgoogle package. Schema files (``schema.path``) and
# survey field maps (``schemas/maps/<survey>.yml``) are resolved relative to it.
PACKAGE_DIR = importlib.resources.files(__package__)


@attrs.define(kw_only=True)
class SchemaHelpers:
    """Class to organize helper functions.

    This class is not intended to be used directly, except by developers adding support for a new schema.

    For Developers:
        When a user requests a schema from the registry, the class method :meth:`Schema._from_yaml` is
        called. The method will pass ``schema_name``'s dict entry from the registry's `schemas.yml` file
        to one of these helper methods, which will then construct the :class:`Schema` object.

        If you are adding support for a new schema, you will need to point to the appropriate helper
        method for your schema using the ``helper`` field in the registry's `schemas.yml` file.
        If an appropriate method does not exist in this class, you will need to add one.

    ----
    """

    @staticmethod
    def default_schema_helper(schema_dict: dict) -> "Schema":
        """Resolve `schema.path`. If it points to a valid ".avsc" file, load it into `schema.definition`.

        Args:
            schema_dict (dict):
                Keyword arguments for :class:`_DefaultSchema`. If ``path`` is given, it is expected to be
                relative to the pittgoogle package directory.

        Returns:
            Schema:
                A :class:`_DefaultSchema`. Its ``definition`` is the parsed Avro schema when ``path``
                resolves to an existing ".avsc" file, otherwise None.
        """
        schema = _DefaultSchema(**schema_dict)

        # Resolve the path. If it is not None, this helper expects it to be the path to
        # a ".avsc" file relative to the pittgoogle package directory.
        schema.path = PACKAGE_DIR / schema.path if schema.path is not None else None

        # Load the avro schema, if the file exists. Fallback to None.
        invalid_path = (
            (schema.path is None) or (schema.path.suffix != ".avsc") or (not schema.path.is_file())
        )
        if invalid_path:
            schema.definition = None
        else:
            schema.definition = fastavro.schema.load_schema(schema.path)

        return schema

    @staticmethod
    def elasticc_schema_helper(schema_dict: dict) -> "Schema":
        """Construct a schemaless-Avro schema (ELAsTiCC helper) and load its definition.

        Args:
            schema_dict (dict):
                Keyword arguments for :class:`_SchemalessAvroSchema`. ``path`` must be set and point to
                an Avro schema file relative to the pittgoogle package directory.

        Returns:
            Schema: A :class:`_SchemalessAvroSchema` with ``path`` resolved and ``definition`` loaded.
        """
        schema = _SchemalessAvroSchema(**schema_dict)

        # Resolve the path and load the schema. The definition is mandatory here because
        # schemaless Avro bytes cannot be decoded without it.
        schema.path = PACKAGE_DIR / schema.path
        schema.definition = fastavro.schema.load_schema(schema.path)

        return schema

    @staticmethod
    def lsst_schema_helper(schema_dict: dict) -> "Schema":
        """Load the Avro schema definition for lsst.v7_1.alert.

        Args:
            schema_dict (dict):
                Keyword arguments for :class:`_ConfluentWireAvroSchema`. ``name`` must be
                ``"lsst.v7_1.alert"`` and ``path`` must point to its ".avsc" file relative to the
                pittgoogle package directory.

        Returns:
            Schema: A :class:`_ConfluentWireAvroSchema` with ``path`` resolved and ``definition`` loaded.

        Raises:
            NotImplementedError: If ``schema_dict["name"]`` is any other schema name.
        """
        # [FIXME] This is hack to get the latest schema version into pittgoogle-client
        # until we can get :meth:`SchemaHelpers.lsst_auto_schema_helper` working.
        if schema_dict["name"] != "lsst.v7_1.alert":
            raise NotImplementedError("Only 'lsst.v7_1.alert' is supported for LSST.")

        schema = _ConfluentWireAvroSchema(**schema_dict)

        # Resolve the path and load the schema
        schema.path = PACKAGE_DIR / schema.path
        schema.definition = fastavro.schema.load_schema(schema.path)

        return schema

    @staticmethod
    def lsst_auto_schema_helper(schema_dict: dict) -> "Schema":
        """Load the Avro schema definition using the ``lsst.alert.packet`` package.

        Args:
            schema_dict (dict):
                Keyword arguments for :class:`_ConfluentWireAvroSchema`. ``name`` must follow the
                syntax ``"lsst.v<MAJOR>_<MINOR>.alert"``.

        Returns:
            Schema: A :class:`_ConfluentWireAvroSchema` with ``path`` and ``definition`` set.

        Raises:
            SchemaError:
                If an LSST schema called ``schema.name`` cannot be loaded. An error is raised because
                the LSST alert bytes are schemaless, so ``schema.definition`` will be required in order
                to deserialize the alert.
        """
        import lsst.alert.packet.schema

        schema = _ConfluentWireAvroSchema(**schema_dict)
        version_msg = f"For valid versions, see {schema.origin}."

        # Parse major and minor versions out of schema.name. Expecting syntax "lsst.v<MAJOR>_<MINOR>.alert".
        # Match the whole name so that names with stray or misplaced digits are rejected rather than
        # being misparsed into an unrelated version (the file name below is built from schema.name too).
        version_match = re.fullmatch(r"lsst\.v(\d+)_(\d+)\.alert", schema.name)
        if version_match is None:
            msg = (
                "Unable to identify major and minor version. Please use the syntax "
                "'lsst.v<MAJOR>_<MINOR>.alert', replacing '<MAJOR>' and '<MINOR>' with integers. "
                f"{version_msg}"
            )
            raise exceptions.SchemaError(msg)
        major, minor = map(int, version_match.groups())

        schema_dir = Path(lsst.alert.packet.schema.get_schema_path(major, minor))
        schema.path = schema_dir / f"{schema.name}.avsc"

        try:
            schema.definition = lsst.alert.packet.schema.Schema.from_file(schema.path).definition
        except fastavro.repository.SchemaRepositoryError as excep:
            msg = f"Unable to load the schema. {version_msg}"
            raise exceptions.SchemaError(msg) from excep

        return schema
@attrs.define(kw_only=True)
class Schema:
    """Class for an individual schema.

    Do not call this class's constructor directly. Instead, load a schema using the registry
    :class:`pittgoogle.registry.Schemas`.

    ----
    """

    # String _under_ field definition will cause field to appear as a property in rendered docs.
    name: str = attrs.field()
    """Name of the schema."""
    description: str = attrs.field()
    """A description of the schema."""
    origin: str = attrs.field()
    """Pointer to the schema's origin. Typically this is a URL to a repo maintained by the survey."""
    definition: dict | None = attrs.field(default=None)
    """The schema definition used to serialize and deserialize the alert bytes, if one is required."""
    _helper: str = attrs.field(default="default_schema_helper")
    """Name of the method in :class:`SchemaHelpers` used to load this schema."""
    path: Path | None = attrs.field(default=None)
    """Path where the helper can find the schema, if needed."""
    filter_map: dict = attrs.field(factory=dict)
    """Mapping of the filter name as stored in the alert (often an int) to the common name (often a string)."""

    # The rest don't need string descriptions because we will define them as explicit properties.
    _survey: str | None = attrs.field(default=None)
    # _map is important, but don't accept it as an init arg. We'll load it from a yaml file later.
    _map: dict | None = attrs.field(default=None, init=False)

    @classmethod
    def _from_yaml(cls, schema_dict: dict, **schema_dict_replacements) -> "Schema":
        """Create a :class:`Schema` object from an entry in the registry's `schemas.yml` file.

        This method calls a helper method in :class:`SchemaHelpers` to finish the initialization.
        The helper is named by the entry's ``helper`` key; if that key is absent, the same default
        as the ``_helper`` field (``"default_schema_helper"``) is used.

        Args:
            schema_dict (dict):
                A dictionary containing the schema information. It is not modified.
            **schema_dict_replacements:
                Additional keyword arguments that will override entries in ``schema_dict``.

        Returns:
            Schema: The created `Schema` object.
        """
        # Combine the args and kwargs (kwargs win) without mutating the caller's dict.
        my_schema_dict = {**schema_dict, **schema_dict_replacements}
        helper_name = my_schema_dict.get("helper", "default_schema_helper")
        helper = getattr(SchemaHelpers, helper_name)
        # Let the helper finish up the initialization.
        return helper(my_schema_dict)

    @property
    def survey(self) -> str:
        """Name of the survey. This is usually the first part of the schema's name."""
        # Cached on first access unless it was supplied at init.
        if self._survey is None:
            self._survey = self.name.split(".")[0]
        return self._survey

    @property
    def map(self) -> dict:
        """Mapping of Pitt-Google's generic field names to survey-specific field names.

        Loaded on first access from ``schemas/maps/<survey>.yml`` in the package directory, then cached.

        Raises:
            ValueError: If no map file exists for this schema's survey.
        """
        if self._map is None:
            yml = PACKAGE_DIR / f"schemas/maps/{self.survey}.yml"
            try:
                self._map = yaml.safe_load(yml.read_text())
            except FileNotFoundError as excep:
                raise ValueError(f"no schema map found for schema name '{self.name}'") from excep
        return self._map
class _DefaultSchema(Schema):
    """Default schema to serialize and deserialize alert bytes."""

    def serialize(self, alert_dict: dict) -> bytes:
        """Serialize `alert_dict` using the JSON format.

        Args:
            alert_dict (dict): The dictionary to be serialized.

        Returns:
            bytes: The serialized data in bytes (UTF-8 encoded JSON).
        """
        return json.dumps(alert_dict).encode("utf-8")

    def deserialize(self, alert_bytes: bytes) -> dict:
        """Deserialize `alert_bytes`.

        Args:
            alert_bytes (bytes):
                The bytes to be deserialized. This is expected to be serialized as either Avro with the
                schema attached in the header or JSON.

        Returns:
            A dictionary representing the deserialized ``alert_bytes``.

        Raises:
            SchemaError: If the deserialization fails after trying both JSON and Avro.
        """
        # [FIXME] This should be redesigned.
        # For now, just try avro then json, catching basically all errors in the process.
        try:
            return utils.Cast.avro_to_dict(alert_bytes)
        except Exception:
            try:
                return utils.Cast.json_to_dict(alert_bytes)
            except Exception as excep:
                raise exceptions.SchemaError("Failed to deserialize the alert bytes") from excep


class _SchemalessAvroSchema(Schema):
    """Schema to serialize and deserialize alert bytes in the schemaless Avro format."""

    def serialize(self, alert_dict: dict) -> bytes:
        """Serialize `alert_dict` using the schemaless Avro format.

        Args:
            alert_dict (dict): The record to write, using ``self.definition`` as the writer schema.

        Returns:
            bytes: The Avro-encoded record, with no header or embedded schema.
        """
        # getvalue() returns the whole buffer regardless of position, so no seek is needed.
        with io.BytesIO() as fout:
            fastavro.schemaless_writer(fout, self.definition, alert_dict)
            return fout.getvalue()

    def deserialize(self, alert_bytes: bytes) -> dict:
        """Deserialize schemaless Avro `alert_bytes` using ``self.definition`` as the writer schema.

        Args:
            alert_bytes (bytes): The Avro-encoded record, with no header or embedded schema.

        Returns:
            dict: The decoded record.
        """
        bytes_io = io.BytesIO(alert_bytes)
        return fastavro.schemaless_reader(bytes_io, self.definition)


# [FIXME]
class _ConfluentWireAvroSchema(Schema):
    """Schema to serialize and deserialize alert bytes in the Avro Confluent Wire Format.

    https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
    """

    # Wire-format header: one magic byte (always 0) followed by a 4-byte schema ID.
    _WIRE_HEADER_LENGTH = 5
    _WIRE_MAGIC_BYTE = 0

    def serialize(self, alert_dict: dict) -> bytes:
        """Not implemented.

        Raises:
            NotImplementedError: Always.
        """
        # [TODO]
        raise NotImplementedError("Confluent Wire Format not yet supported.")

    def deserialize(self, alert_bytes: bytes) -> dict:
        """Deserialize `alert_bytes` from the Confluent Wire Format.

        The 5-byte header is validated and skipped. The remaining payload is decoded as schemaless Avro
        using ``self.definition``; the schema ID in the header is not consulted.

        Args:
            alert_bytes (bytes): The wire-format message.

        Returns:
            dict: The decoded record.

        Raises:
            SchemaError:
                If ``alert_bytes`` is too short to hold the header or does not start with magic byte 0.
        """
        # Without this check, non-wire-format bytes would have 5 arbitrary bytes dropped and the
        # remainder misdecoded, producing garbage or an opaque fastavro error.
        if len(alert_bytes) < self._WIRE_HEADER_LENGTH or alert_bytes[0] != self._WIRE_MAGIC_BYTE:
            raise exceptions.SchemaError(
                "Alert bytes are not in the Confluent Wire Format "
                "(expected a 5-byte header starting with magic byte 0)."
            )
        bytes_io = io.BytesIO(alert_bytes[self._WIRE_HEADER_LENGTH :])
        return fastavro.schemaless_reader(bytes_io, self.definition)