Source code for pittgoogle.schema

# -*- coding: UTF-8 -*-
"""Classes to manage alert schemas.

.. autosummary::

    Serializers
    Schema
    DefaultSchema
    ElasticcSchema
    LsstSchema
    LvkSchema
    ZtfSchema
"""

import abc
import base64
import io
import datetime
import json
import logging
import struct
import types
import math
from pathlib import Path
from typing import TYPE_CHECKING, Literal

import attrs
import fastavro
import numpy as np
import yaml

from . import __package_path__, exceptions

if TYPE_CHECKING:
    import google.cloud.pubsub_v1

    from . import Alert, types_

LOGGER = logging.getLogger(__name__)


# --------- Serializers --------- #
@attrs.define
class Serializers:
    @staticmethod
    def serialize_json(alert_dict: dict) -> bytes:
        """Serialize `alert_dict` using the JSON format.

        Args:
            alert_dict (dict):
                The dictionary to be serialized.

        Returns:
            bytes:
                The serialized data in bytes.
        """
        return json.dumps(Serializers._clean_for_json(alert_dict)).encode("utf-8")

    @staticmethod
    def deserialize_json(alert_bytes: bytes) -> dict:
        """Deserialize `alert_bytes` using the JSON format.

        Args:
            alert_bytes (bytes):
                The bytes to be deserialized. This is expected to be serialized as JSON.

        Returns:
            dict:
                The deserialized data in a dictionary.
        """
        return json.loads(alert_bytes)

    @staticmethod
    def serialize_avro(alert_dict: dict, *, schema_definition: dict) -> bytes:
        """Serialize `alert_dict` using the Avro format.

        Args:
            alert_dict (dict):
                The dictionary to be serialized.
        schema_definition (dict):
                The Avro schema definition to use for serialization.

        Returns:
            bytes:
                The serialized data in bytes.
        """
        bytes_io = io.BytesIO()
        fastavro.writer(bytes_io, schema_definition, [alert_dict])
        return bytes_io.getvalue()

    @staticmethod
    def deserialize_avro(alert_bytes: bytes) -> dict:
        """Deserialize `alert_bytes` using the Avro format.

        Args:
            alert_bytes (bytes):
                The bytes to be deserialized. This is expected to be serialized as Avro with the
                schema attached in the header.

        Returns:
            dict:
                The deserialized data in a dictionary.
        """
        with io.BytesIO(alert_bytes) as fin:
            alert_dicts = list(fastavro.reader(fin))  # list with single dict
        if len(alert_dicts) != 1:
            LOGGER.warning(f"Expected 1 Avro record. Found {len(alert_dicts)}.")
        return alert_dicts[0]

    @staticmethod
    def serialize_schemaless_avro(alert_dict: dict, *, schema_definition: dict) -> bytes:
        """Serialize `alert_dict` using the schemaless Avro format.

        Args:
            alert_dict (dict):
                The dictionary to be serialized. The schema is expected to match `schema_definition`.
            schema_definition (dict):
                The Avro schema definition to use for serialization.

        Returns:
            bytes:
                The serialized data in bytes.
        """
        fout = io.BytesIO()
        fastavro.schemaless_writer(fout, schema_definition, alert_dict)
        return fout.getvalue()

    @staticmethod
    def deserialize_schemaless_avro(alert_bytes: bytes, *, schema_definition: dict) -> dict:
        """Deserialize `alert_bytes` using the schemaless Avro format.

        Args:
            alert_bytes (bytes):
                The bytes to be deserialized. This is expected to be serialized as Avro without the
                schema attached in the header. The schema is expected to match `schema_definition`.
            schema_definition (dict):
                The Avro schema definition to use for deserialization.

        Returns:
            dict:
                The deserialized data in a dictionary.
        """
        bytes_io = io.BytesIO(alert_bytes)
        return fastavro.schemaless_reader(bytes_io, schema_definition)

    @staticmethod
    def serialize_confluent_wire_avro(
        alert_dict: dict, *, schema_definition: dict, schema_id: int
    ) -> bytes:
        """Serialize `alert_dict` using the Avro Confluent Wire Format.

        https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format

        Args:
            alert_dict (dict):
                The dictionary to be serialized. The schema is expected to match `schema_definition`.
            schema_definition (dict):
                The Avro schema definition to use for serialization.
            version_id (int):
                The version ID of the schema. This is a 4-byte integer in big-endian format.
                It will be attached to the header of the serialized data.

        Returns:
            bytes:
                The serialized data in bytes.
        """
        # https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
        fout = io.BytesIO()
        fout.write(b"\x00")
        fout.write(struct.pack(">i", schema_id))
        fastavro.schemaless_writer(fout, schema_definition, alert_dict)
        return fout.getvalue()

    @staticmethod
    def deserialize_confluent_wire_avro(alert_bytes: bytes, *, schema_definition: dict) -> dict:
        """Deserialize `alert_bytes` using the Avro Confluent Wire Format.

        https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format

        Args:
            alert_bytes (bytes):
                The bytes to be deserialized. This is expected to be serialized in Avro Confluent
                Wire Format. The schema is expected to match `schema_definition`.
            schema_definition (dict):
                The Avro schema definition to use for deserialization.

        Returns:
            dict:
                The deserialized data in a dictionary.
        """
        # https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
        bytes_io = io.BytesIO(alert_bytes[5:])
        return fastavro.schemaless_reader(bytes_io, schema_definition)

    @staticmethod
    def _clean_for_json(
        value: str | int | float | bytes | list | dict | None,
    ) -> str | int | float | list | dict | None:
        """Coerce `value` to a format suitable for json serialization.

        Args:
            value (str, int, float, bytes, list, dict, or None):
                The value to be cleaned. str, int, float (except NaN), and None types are
                returned unchanged. `None` is returned if `value` is consistent with NaN.
                bytes types are returned as base64-encoded strings. If list or dict, this
                method will be called recursively on each element/item.

        Returns:
            str, int, float, list, dict, or None:
                `value` with NaN replaced by None. Replacement is recursive if `value` is a list or dict.

        Raises:
            TypeError:
                If `value` is not a str, int, float, bytes, list, or dict.
        """
        # Return value suitable for json serialization.
        if isinstance(value, (str, int, types.NoneType)):
            return value
        if isinstance(value, float):
            if math.isnan(value) or math.isinf(value):
                return None
            return value
        if isinstance(value, bytes):
            return base64.b64encode(value).decode("utf-8")
        if isinstance(value, datetime.datetime):
            return value.timestamp()

        # Recurse.
        if isinstance(value, list):
            return [Serializers._clean_for_json(v) for v in value]
        if isinstance(value, dict):
            return {k: Serializers._clean_for_json(v) for k, v in value.items()}

        # That's all we know how to deal with right now.
        raise TypeError(f"Unrecognized type '{type(value)}' ({value})")


# --------- Default Schema Definitions --------- #
[docs] @attrs.define(kw_only=True) class Schema(abc.ABC): """Class for an individual schema. Do not call this class's constructor directly. Instead, load a schema using the registry :class:`pittgoogle.registry.Schemas`. """ # String _under_ field definition will cause field to appear as a property in rendered docs. name: str = attrs.field() """Name of the schema. This is typically the name of the survey as well.""" description: str = attrs.field() """A description of the schema.""" origin: str = attrs.field() """Pointer to the schema's origin. Typically this is a URL to a repo maintained by the survey.""" serializer: Literal["json", "avro"] = attrs.field() """Whether to serialize the dict to JSON or Avro when, e.g., publishing a Pub/Sub message.""" deserializer: Literal["json", "avro"] = attrs.field() """Whether to use a JSON or Avro to deserialize when decoding alert_bytes -> alert_dict.""" version: str | None = attrs.field(default=None) """Version of the schema, or None.""" version_id: str | None = attrs.field(default=None) """Version ID of the schema, or None. Currently only used for class:`_ConfluentWireAvroSchema`.""" definition: dict | None = attrs.field(default=None) """The schema definition used to serialize and deserialize the alert bytes, if one is required.""" path: Path | None = attrs.field(default=None) """Path to a file containing the schema definition.""" filter_map: dict = attrs.field(factory=dict) """Mapping of the filter name as stored in the alert (often an int) to the common name (often a string).""" # The rest don't need string descriptions because we will define them as explicit properties. _map: dict | None = attrs.field(default=None, init=False) @classmethod @abc.abstractmethod def _from_yaml(cls, yaml_dict: dict, *, alert_bytes: bytes | None = None): """Create a :class:`Schema` object. This class method must be implemented by subclasses. Args: yaml_dict (dict): A dictionary containing the schema information. This should be one entry from the registry's 'schemas.yml' file. alert_bytes (bytes or None, optional): Message data, if available. Some schemas will use this to infer the schema version. Returns: Schema """
[docs] @abc.abstractmethod def serialize( self, alert_dict: dict, *, serializer: Literal["json", "avro", None] = None ) -> bytes: """Serialize `alert_dict`. This method must be implemented by subclasses. Args: alert_dict (dict): The dictionary to be serialized. serializer (str or None, optional): Whether to serialize the dict using Avro or JSON. If not None, this will override the `serializer` property and is subject to the same conditions. Returns: bytes: The serialized data in bytes. """
[docs] @abc.abstractmethod def deserialize(self, alert_bytes: bytes) -> dict: """Deserialize `alert_bytes`. This method must be implemented by subclasses. Args: alert_bytes (bytes): The bytes to be deserialized. Returns: dict: A dictionary representing the deserialized `alert_bytes`. """
@abc.abstractmethod def _name_in_bucket(_alert: "Alert") -> str: """Construct the name of the Google Cloud Storage object.""" @property def survey(self) -> str: return self.name @property def map(self) -> dict: """Mapping of Pitt-Google's generic field names to survey-specific field names.""" if self._map is None: yml = __package_path__ / "schemas" / "maps" / f"{self.survey}.yml" try: self._map = yaml.safe_load(yml.read_text()) except FileNotFoundError: raise ValueError(f"no schema map found for schema name '{self.name}'") return self._map
[docs] @attrs.define(kw_only=True) class DefaultSchema(Schema): """Default schema to serialize and deserialize alert bytes.""" serializer: Literal["json", "avro"] = attrs.field(default="json") """Whether to serialize the alert_dict to JSON (default) or Avro when, e.g., publishing a Pub/Sub message. If "avro", the user must supply the schema definition by setting :meth:`Schema.definition`.""" deserializer: Literal["json", "avro"] = attrs.field(default="json") """Whether to use a JSON (default) or Avro to deserialize when decoding `alert_bytes` -> `alert_dict`. If "avro", this `pittgoogle.Schema` will expect the Avro schema to be attached to `alert_bytes` in the header.""" @classmethod def _from_yaml(cls, yaml_dict: dict, *, alert_bytes: bytes | None = None): """Create a schema object from `yaml_dict`. Args: yaml_dict (dict): A dictionary containing the schema information. This should be one entry from the registry's 'schemas.yml' file. alert_bytes (bytes or None, optional): Message data, if available. This is unused and not necessary for this schema. Returns: Schema """ schema = cls(**yaml_dict) # Resolve the path. If it is not None, it is expected to be the path to # a ".avsc" file relative to the pittgoogle package directory. schema.path = __package_path__ / schema.path if schema.path is not None else None # Load the avro schema definition, if the file exists. Fallback to None. invalid_path = ( (schema.path is None) or (schema.path.suffix != ".avsc") or (not schema.path.is_file()) ) schema.definition = None if invalid_path else fastavro.schema.load_schema(schema.path) return schema
[docs] def serialize( self, alert_dict: dict, *, serializer: Literal["json", "avro", None] = None ) -> bytes: """Serialize the `alert_dict`. Args: alert_dict (dict): The dictionary to be serialized. serializer (str or None, optional): Whether to serialize the dict using Avro or JSON. If not None, this will override the `serializer` property and is subject to the same conditions. Returns: bytes: The serialized data in bytes. """ _serializer = serializer or self.serializer if _serializer == "json": return Serializers.serialize_json(alert_dict) return Serializers.serialize_avro(alert_dict, schema_definition=self.definition)
[docs] def deserialize(self, alert_bytes: bytes) -> dict: """Deserialize `alert_bytes` using JSON or Avro format as defined by the `deserializer` property. Args: alert_bytes (bytes): The bytes to be deserialized. Returns: A dictionary representing the deserialized `alert_bytes`. """ if self.deserializer == "json": return Serializers.deserialize_json(alert_bytes) return Serializers.deserialize_avro(alert_bytes)
def _name_in_bucket(_alert: "Alert") -> None: """Construct the name of the Google Cloud Storage object.""" raise NotImplementedError("Name syntax is unknown.")
# --------- Survey Schema Definitions --------- #
[docs] @attrs.define(kw_only=True) class ElasticcSchema(Schema): """Schema for ELAsTiCC alerts.""" serializer: Literal["json", "avro"] = attrs.field(default="avro") """Whether to serialize the dict to Avro (default) or JSON when, e.g., publishing a Pub/Sub message. If "avro", this schema will use the schemaless Avro format.""" deserializer: Literal["json", "avro"] = attrs.field(default="avro") """Whether to use a Avro (default) or JSON to deserialize when decoding alert_bytes -> alert_dict. If "avro", this schema will use the schemaless Avro format.""" @classmethod def _from_yaml(cls, yaml_dict: dict, *, alert_bytes: bytes | None = None): """Create a schema object from `yaml_dict`. Args: yaml_dict (dict): A dictionary containing the schema information, loaded from the registry's 'schemas.yml' file. alert_bytes (bytes or None, optional): Message data, if available. This is unused and not necessary for this schema. Returns: Schema """ schema = cls(**yaml_dict) schema.path = __package_path__ / schema.path schema.definition = fastavro.schema.load_schema(schema.path) return schema
[docs] def serialize( self, alert_dict: dict, *, serializer: Literal["json", "avro", None] = None ) -> bytes: """Serialize the `alert_dict`. Args: alert_dict (dict): The dictionary to be serialized. serializer (str or None, optional): Whether to serialize the dict using Avro or JSON. If not None, this will override :meth:`ElasticcSchema.serializer` and is subject to the same conditions. Returns: bytes: The serialized data in bytes. """ _serializer = serializer or self.serializer if _serializer == "json": return Serializers.serialize_json(alert_dict) return Serializers.serialize_schemaless_avro(alert_dict, schema_definition=self.definition)
[docs] def deserialize(self, alert_bytes: bytes) -> dict: """Deserialize `alert_bytes` using JSON or Avro format as defined by :meth:`ElasticcSchema.deserializer`. Args: alert_bytes (bytes): The bytes to be deserialized. Returns: A dictionary representing the deserialized `alert_bytes`. """ if self.deserializer == "json": return Serializers.deserialize_json(alert_bytes) return Serializers.deserialize_schemaless_avro( alert_bytes, schema_definition=self.definition )
def _name_in_bucket(_alert: "Alert") -> None: """Construct the name of the Google Cloud Storage object.""" raise NotImplementedError("Name syntax is unknown.")
[docs] @attrs.define(kw_only=True) class LsstSchema(Schema): """Schema for LSST alerts.""" serializer: Literal["json", "avro"] = attrs.field(default="avro") """Whether to serialize the dict to Avro (default) or JSON when, e.g., publishing a Pub/Sub message. If "avro", this schema will use the Avro Confluent Wire Format (https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format).""" deserializer: Literal["json", "avro"] = attrs.field(default="avro") """Whether to use Avro (default) or JSON to deserialize when decoding alert_bytes -> alert_dict. If "avro", this schema will use the Avro Confluent Wire Format (https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format).""" @classmethod def _from_yaml(cls, yaml_dict: dict, *, alert_bytes: bytes | None = None): """Create a schema object from `yaml_dict`. Args: yaml_dict (dict): A dictionary containing the schema information, loaded from the registry's 'schemas.yml' file. alert_bytes (bytes or None, optional): Message data. This is needed in order to infer the schema version. If not provided, methods such as :meth:`LsstSchema.serialize` (if avro), :meth:`LsstSchema.deserialize` (if avro), and :meth:`LsstSchema._name_in_bucket` will raise a :class:`pittgoogle.exceptions.SchemaError`. Returns: Schema """ schema = cls(**yaml_dict) if alert_bytes is None: LOGGER.warning( "No alert_bytes provided. Cannot infer schema version. " "Methods that rely on it will be unavailable." ) return schema # Get the schema ID out of the avro header and use it to construct the schema version. # LSST's syntax is: schema-id = 703 (int) --> schema-version = 'v7_3' _, version_id = struct.Struct(">bi").unpack(alert_bytes[:5]) schema.version_id = version_id # Convert, eg, 1103 -> 'v11_3' major = str(version_id // 100) minor = str(version_id % 100) schema.version = f"v{major}_{minor}" if schema.version not in [ "v7_0", "v7_1", "v7_2", "v7_3", "v7_4", "v8_0", "v9_0", "v10_0", "v11_0", "v11_1", ]: raise exceptions.SchemaError(f"Schema definition not found for {schema.version}.") # Resolve the path and load the schema definition. schema_path = schema.path.replace("MAJOR", major).replace("MINOR", minor) schema.path = __package_path__ / schema_path schema.definition = fastavro.schema.load_schema(schema.path) return schema
[docs] def serialize( self, alert_dict: dict, *, serializer: Literal["json", "avro", None] = None ) -> bytes: """Serialize the `alert_dict`. Args: alert_dict (dict): The dictionary to be serialized. serializer (str or None, optional): Whether to serialize the dict using Avro or JSON. If not None, this will override :meth:`LsstSchema.serializer` and is subject to the same conditions. Returns: bytes: The serialized data in bytes. Raises: exceptions.SchemaError: If the schema version or definition are unavailable and Avro serialization is requested. """ _serializer = serializer or self.serializer if _serializer == "json": return Serializers.serialize_json(alert_dict) if self.version is None or self.definition is None: raise exceptions.SchemaError( "No Avro schema information is available. Cannot serialize to Avro." ) # Reconstruct LSST's schema ID from the version string. Convert, eg, 'v7_3' -> 703. schema_id = int("0".join(self.version.strip("v").split("_"))) return Serializers.serialize_confluent_wire_avro( alert_dict, schema_definition=self.definition, schema_id=schema_id )
[docs] def deserialize(self, alert_bytes: bytes) -> dict: """Deserialize `alert_bytes` using JSON or Avro format as defined by :meth:`LsstSchema.deserializer`. Args: alert_bytes (bytes): The bytes to be deserialized. Returns: A dictionary representing the deserialized `alert_bytes`. """ if self.deserializer == "json": return Serializers.deserialize_json(alert_bytes) if self.definition is None: raise exceptions.SchemaError( "No schema definition available. Cannot deserialize Avro." ) return Serializers.deserialize_confluent_wire_avro( alert_bytes, schema_definition=self.definition )
@staticmethod def _name_in_bucket(alert: "Alert") -> str: """Construct the name of the Google Cloud Storage object.""" if alert.schema.version is None: raise exceptions.SchemaError( "No version information available. Cannot construct object name." ) _date = datetime.date.fromtimestamp( float(alert.attributes["kafka.timestamp"]) / 1000 ).strftime("%Y-%m-%d") objectid_key = alert.get_key("objectid", name_only=True) sourceid_key = alert.get_key("sourceid", name_only=True) return f"{alert.schema.version}/kafkaPublishTimestamp={_date}/{objectid_key}={alert.objectid}/{sourceid_key}={alert.sourceid}.avro"
[docs] @attrs.define(kw_only=True) class LvkSchema(DefaultSchema): """Schema for LVK alerts.""" @classmethod def _from_yaml(cls, yaml_dict: dict, *, alert_bytes: bytes | None = None) -> "Schema": """Create a schema object from `yaml_dict`. Args: yaml_dict (dict): A dictionary containing the schema information, loaded from the registry's 'schemas.yml' file. alert_bytes (bytes or None, optional): Message data. This is needed in order to get the schema version. If not provided, methods such as :meth:`LvkSchema._name_in_bucket` will raise a :class:`pittgoogle.exceptions.SchemaError`. Returns: Schema """ schema = super()._from_yaml(yaml_dict) alert_dict = Serializers.deserialize_json(alert_bytes or b"{}") schema.version = alert_dict.get("schema_version") return schema @staticmethod def _name_in_bucket(alert: "Alert") -> "str": """Construct the name of the Google Cloud Storage object.""" if alert.schema.version is None: raise exceptions.SchemaError("Schema version not found. Cannot construct object name.") filename = f"{alert.dict['alert_type']}-{alert.dict['time_created'][0:10]}.json" return f"{alert.schema.version}/{alert.get_key('objectid')}={alert.objectid}/{filename}"
[docs] @attrs.define(kw_only=True) class ZtfSchema(DefaultSchema): """Schema for ZTF alerts.""" deserializer: Literal["json", "avro"] = attrs.field(default="avro") """Whether to use a Avro (default) or JSON to deserialize when decoding `alert_bytes` -> `alert_dict`. If "avro", this `pittgoogle.Schema` will expect the Avro schema to be attached to `alert_bytes` in the header."""
[docs] @attrs.define(kw_only=True) class RapidSchema(Schema): """Schema for RAPID alerts.""" serializer: Literal["json", "avro"] = attrs.field(default="avro") """Whether to serialize the dict to Avro (default) or JSON when, e.g., publishing a Pub/Sub message. If "avro", this schema will use the Avro Confluent Wire Format (https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format).""" deserializer: Literal["json", "avro"] = attrs.field(default="avro") """Whether to use Avro (default) or JSON to deserialize when decoding alert_bytes -> alert_dict. If "avro", this schema will use the Avro Confluent Wire Format (https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format).""" @classmethod def _from_yaml(cls, yaml_dict: dict, *, alert_bytes: bytes | None = None): """Create a schema object from `yaml_dict`. Args: yaml_dict (dict): A dictionary containing the schema information, loaded from the registry's 'schemas.yml' file. alert_bytes (bytes or None, optional): Message data. This is needed in order to infer the schema version. If not provided, methods such as :meth:`RapidSchema.serialize` (if avro), :meth:`RapidSchema.deserialize` (if avro), and :meth:`RapidSchema._name_in_bucket` will raise a :class:`pittgoogle.exceptions.SchemaError`. Returns: Schema """ schema = cls(**yaml_dict) if alert_bytes is None: LOGGER.warning( "No alert_bytes provided. Cannot infer schema version. " "Methods that rely on it will be unavailable." ) return schema # Get the schema ID out of the avro header and use it to construct the schema version. # RAPID's syntax is: schema-id = 101 (int) --> schema-version = 'v1_0' _, version_id = struct.Struct(">bi").unpack(alert_bytes[:5]) schema.version_id = version_id # Convert, eg, 100 -> 'v01_00' major = f"{version_id // 100:02d}" minor = f"{version_id % 100:02d}" schema.version = f"v{major}_{minor}" if schema.version not in [ "v01_00", ]: raise exceptions.SchemaError(f"Schema definition not found for {schema.version}.") # Resolve the path and load the schema definition. schema_path = schema.path.replace("MAJOR", major).replace("MINOR", minor) schema.path = __package_path__ / schema_path schema.definition = fastavro.schema.load_schema(schema.path) return schema
[docs] def serialize( self, alert_dict: dict, *, serializer: Literal["json", "avro", None] = None ) -> bytes: """Serialize the `alert_dict`. Args: alert_dict (dict): The dictionary to be serialized. serializer (str or None, optional): Whether to serialize the dict using Avro or JSON. If not None, this will override :meth:`LsstSchema.serializer` and is subject to the same conditions. Returns: bytes: The serialized data in bytes. Raises: exceptions.SchemaError: If the schema version or definition are unavailable and Avro serialization is requested. """ _serializer = serializer or self.serializer if _serializer == "json": return Serializers.serialize_json(alert_dict) if self.version is None or self.definition is None: raise exceptions.SchemaError( "No Avro schema information is available. Cannot serialize to Avro." ) # Reconstruct RAPID's schema ID from the version string. Convert, eg, 'v1_0' -> 100. schema_id = self.version_id return Serializers.serialize_confluent_wire_avro( alert_dict, schema_definition=self.definition, schema_id=schema_id )
[docs] def deserialize(self, alert_bytes: bytes) -> dict: """Deserialize `alert_bytes` using JSON or Avro format as defined by :meth:`RapidSchema.deserializer`. Args: alert_bytes (bytes): The bytes to be deserialized. Returns: A dictionary representing the deserialized `alert_bytes`. """ if self.deserializer == "json": return Serializers.deserialize_json(alert_bytes) if self.definition is None: raise exceptions.SchemaError( "No schema definition available. Cannot deserialize Avro." ) return Serializers.deserialize_confluent_wire_avro( alert_bytes, schema_definition=self.definition )
@staticmethod def _name_in_bucket(alert: "Alert") -> str: """Construct the name of the Google Cloud Storage object.""" if alert.schema.version is None: raise exceptions.SchemaError( "No version information available. Cannot construct object name." ) _date = datetime.date.fromtimestamp( float(alert.attributes["kafka.timestamp"]) / 1000 ).strftime("%Y-%m-%d") objectid_key = alert.get_key("objectid", name_only=True) sourceid_key = alert.get_key("sourceid", name_only=True) return f"{alert.schema.version}/kafkaPublishTimestamp={_date}/{objectid_key}={alert.objectid}/{sourceid_key}={alert.sourceid}.avro"