# -*- coding: UTF-8 -*-
"""Classes to manage alert schemas.
.. autosummary::
Serializers
Schema
DefaultSchema
ElasticcSchema
LsstSchema
LvkSchema
ZtfSchema
"""
import abc
import base64
import io
import datetime
import json
import logging
import struct
import types
import math
from pathlib import Path
from typing import TYPE_CHECKING, Literal
import attrs
import fastavro
import numpy as np
import yaml
from . import __package_path__, exceptions
if TYPE_CHECKING:
import google.cloud.pubsub_v1
from . import Alert, types_
LOGGER = logging.getLogger(__name__)
# --------- Serializers --------- #
@attrs.define
class Serializers:
@staticmethod
def serialize_json(alert_dict: dict) -> bytes:
"""Serialize `alert_dict` using the JSON format.
Args:
alert_dict (dict):
The dictionary to be serialized.
Returns:
bytes:
The serialized data in bytes.
"""
return json.dumps(Serializers._clean_for_json(alert_dict)).encode("utf-8")
@staticmethod
def deserialize_json(alert_bytes: bytes) -> dict:
"""Deserialize `alert_bytes` using the JSON format.
Args:
alert_bytes (bytes):
The bytes to be deserialized. This is expected to be serialized as JSON.
Returns:
dict:
The deserialized data in a dictionary.
"""
return json.loads(alert_bytes)
@staticmethod
def serialize_avro(alert_dict: dict, *, schema_definition: dict) -> bytes:
"""Serialize `alert_dict` using the Avro format.
Args:
alert_dict (dict):
The dictionary to be serialized.
schema_definition (dict):
The Avro schema definition to use for serialization.
Returns:
bytes:
The serialized data in bytes.
"""
bytes_io = io.BytesIO()
fastavro.writer(bytes_io, schema_definition, [alert_dict])
return bytes_io.getvalue()
@staticmethod
def deserialize_avro(alert_bytes: bytes) -> dict:
"""Deserialize `alert_bytes` using the Avro format.
Args:
alert_bytes (bytes):
The bytes to be deserialized. This is expected to be serialized as Avro with the
schema attached in the header.
Returns:
dict:
The deserialized data in a dictionary.
"""
with io.BytesIO(alert_bytes) as fin:
alert_dicts = list(fastavro.reader(fin)) # list with single dict
if len(alert_dicts) != 1:
LOGGER.warning(f"Expected 1 Avro record. Found {len(alert_dicts)}.")
return alert_dicts[0]
@staticmethod
def serialize_schemaless_avro(alert_dict: dict, *, schema_definition: dict) -> bytes:
"""Serialize `alert_dict` using the schemaless Avro format.
Args:
alert_dict (dict):
The dictionary to be serialized. The schema is expected to match `schema_definition`.
schema_definition (dict):
The Avro schema definition to use for serialization.
Returns:
bytes:
The serialized data in bytes.
"""
fout = io.BytesIO()
fastavro.schemaless_writer(fout, schema_definition, alert_dict)
return fout.getvalue()
@staticmethod
def deserialize_schemaless_avro(alert_bytes: bytes, *, schema_definition: dict) -> dict:
"""Deserialize `alert_bytes` using the schemaless Avro format.
Args:
alert_bytes (bytes):
The bytes to be deserialized. This is expected to be serialized as Avro without the
schema attached in the header. The schema is expected to match `schema_definition`.
schema_definition (dict):
The Avro schema definition to use for deserialization.
Returns:
dict:
The deserialized data in a dictionary.
"""
bytes_io = io.BytesIO(alert_bytes)
return fastavro.schemaless_reader(bytes_io, schema_definition)
@staticmethod
def serialize_confluent_wire_avro(
alert_dict: dict, *, schema_definition: dict, schema_id: int
) -> bytes:
"""Serialize `alert_dict` using the Avro Confluent Wire Format.
https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
Args:
alert_dict (dict):
The dictionary to be serialized. The schema is expected to match `schema_definition`.
schema_definition (dict):
The Avro schema definition to use for serialization.
version_id (int):
The version ID of the schema. This is a 4-byte integer in big-endian format.
It will be attached to the header of the serialized data.
Returns:
bytes:
The serialized data in bytes.
"""
# https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
fout = io.BytesIO()
fout.write(b"\x00")
fout.write(struct.pack(">i", schema_id))
fastavro.schemaless_writer(fout, schema_definition, alert_dict)
return fout.getvalue()
@staticmethod
def deserialize_confluent_wire_avro(alert_bytes: bytes, *, schema_definition: dict) -> dict:
"""Deserialize `alert_bytes` using the Avro Confluent Wire Format.
https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
Args:
alert_bytes (bytes):
The bytes to be deserialized. This is expected to be serialized in Avro Confluent
Wire Format. The schema is expected to match `schema_definition`.
schema_definition (dict):
The Avro schema definition to use for deserialization.
Returns:
dict:
The deserialized data in a dictionary.
"""
# https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
bytes_io = io.BytesIO(alert_bytes[5:])
return fastavro.schemaless_reader(bytes_io, schema_definition)
@staticmethod
def _clean_for_json(
value: str | int | float | bytes | list | dict | None,
) -> str | int | float | list | dict | None:
"""Coerce `value` to a format suitable for json serialization.
Args:
value (str, int, float, bytes, list, dict, or None):
The value to be cleaned. str, int, float (except NaN), and None types are
returned unchanged. `None` is returned if `value` is consistent with NaN.
bytes types are returned as base64-encoded strings. If list or dict, this
method will be called recursively on each element/item.
Returns:
str, int, float, list, dict, or None:
`value` with NaN replaced by None. Replacement is recursive if `value` is a list or dict.
Raises:
TypeError:
If `value` is not a str, int, float, bytes, list, or dict.
"""
# Return value suitable for json serialization.
if isinstance(value, (str, int, types.NoneType)):
return value
if isinstance(value, float):
if math.isnan(value) or math.isinf(value):
return None
return value
if isinstance(value, bytes):
return base64.b64encode(value).decode("utf-8")
if isinstance(value, datetime.datetime):
return value.timestamp()
# Recurse.
if isinstance(value, list):
return [Serializers._clean_for_json(v) for v in value]
if isinstance(value, dict):
return {k: Serializers._clean_for_json(v) for k, v in value.items()}
# That's all we know how to deal with right now.
raise TypeError(f"Unrecognized type '{type(value)}' ({value})")
# --------- Default Schema Definitions --------- #
[docs]
@attrs.define(kw_only=True)
class Schema(abc.ABC):
"""Class for an individual schema.
Do not call this class's constructor directly. Instead, load a schema using the registry
:class:`pittgoogle.registry.Schemas`.
"""
# String _under_ field definition will cause field to appear as a property in rendered docs.
name: str = attrs.field()
"""Name of the schema. This is typically the name of the survey as well."""
description: str = attrs.field()
"""A description of the schema."""
origin: str = attrs.field()
"""Pointer to the schema's origin. Typically this is a URL to a repo maintained by the survey."""
serializer: Literal["json", "avro"] = attrs.field()
"""Whether to serialize the dict to JSON or Avro when, e.g., publishing a Pub/Sub message."""
deserializer: Literal["json", "avro"] = attrs.field()
"""Whether to use a JSON or Avro to deserialize when decoding alert_bytes -> alert_dict."""
version: str | None = attrs.field(default=None)
"""Version of the schema, or None."""
version_id: str | None = attrs.field(default=None)
"""Version ID of the schema, or None. Currently only used for class:`_ConfluentWireAvroSchema`."""
definition: dict | None = attrs.field(default=None)
"""The schema definition used to serialize and deserialize the alert bytes, if one is required."""
path: Path | None = attrs.field(default=None)
"""Path to a file containing the schema definition."""
filter_map: dict = attrs.field(factory=dict)
"""Mapping of the filter name as stored in the alert (often an int) to the common name (often a string)."""
# The rest don't need string descriptions because we will define them as explicit properties.
_map: dict | None = attrs.field(default=None, init=False)
@classmethod
@abc.abstractmethod
def _from_yaml(cls, yaml_dict: dict, *, alert_bytes: bytes | None = None):
"""Create a :class:`Schema` object. This class method must be implemented by subclasses.
Args:
yaml_dict (dict):
A dictionary containing the schema information. This should be one entry from the
registry's 'schemas.yml' file.
alert_bytes (bytes or None, optional):
Message data, if available. Some schemas will use this to infer the schema version.
Returns:
Schema
"""
[docs]
@abc.abstractmethod
def serialize(
self, alert_dict: dict, *, serializer: Literal["json", "avro", None] = None
) -> bytes:
"""Serialize `alert_dict`. This method must be implemented by subclasses.
Args:
alert_dict (dict):
The dictionary to be serialized.
serializer (str or None, optional):
Whether to serialize the dict using Avro or JSON. If not None, this will override
the `serializer` property and is subject to the same conditions.
Returns:
bytes:
The serialized data in bytes.
"""
[docs]
@abc.abstractmethod
def deserialize(self, alert_bytes: bytes) -> dict:
"""Deserialize `alert_bytes`. This method must be implemented by subclasses.
Args:
alert_bytes (bytes):
The bytes to be deserialized.
Returns:
dict:
A dictionary representing the deserialized `alert_bytes`.
"""
@abc.abstractmethod
def _name_in_bucket(_alert: "Alert") -> str:
"""Construct the name of the Google Cloud Storage object."""
@property
def survey(self) -> str:
return self.name
@property
def map(self) -> dict:
"""Mapping of Pitt-Google's generic field names to survey-specific field names."""
if self._map is None:
yml = __package_path__ / "schemas" / "maps" / f"{self.survey}.yml"
try:
self._map = yaml.safe_load(yml.read_text())
except FileNotFoundError:
raise ValueError(f"no schema map found for schema name '{self.name}'")
return self._map
[docs]
@attrs.define(kw_only=True)
class DefaultSchema(Schema):
"""Default schema to serialize and deserialize alert bytes."""
serializer: Literal["json", "avro"] = attrs.field(default="json")
"""Whether to serialize the alert_dict to JSON (default) or Avro when, e.g., publishing a Pub/Sub message.
If "avro", the user must supply the schema definition by setting :meth:`Schema.definition`."""
deserializer: Literal["json", "avro"] = attrs.field(default="json")
"""Whether to use a JSON (default) or Avro to deserialize when decoding `alert_bytes` -> `alert_dict`.
If "avro", this `pittgoogle.Schema` will expect the Avro schema to be attached to `alert_bytes` in the header."""
@classmethod
def _from_yaml(cls, yaml_dict: dict, *, alert_bytes: bytes | None = None):
"""Create a schema object from `yaml_dict`.
Args:
yaml_dict (dict):
A dictionary containing the schema information. This should be one entry from the
registry's 'schemas.yml' file.
alert_bytes (bytes or None, optional):
Message data, if available. This is unused and not necessary for this schema.
Returns:
Schema
"""
schema = cls(**yaml_dict)
# Resolve the path. If it is not None, it is expected to be the path to
# a ".avsc" file relative to the pittgoogle package directory.
schema.path = __package_path__ / schema.path if schema.path is not None else None
# Load the avro schema definition, if the file exists. Fallback to None.
invalid_path = (
(schema.path is None) or (schema.path.suffix != ".avsc") or (not schema.path.is_file())
)
schema.definition = None if invalid_path else fastavro.schema.load_schema(schema.path)
return schema
[docs]
def serialize(
self, alert_dict: dict, *, serializer: Literal["json", "avro", None] = None
) -> bytes:
"""Serialize the `alert_dict`.
Args:
alert_dict (dict):
The dictionary to be serialized.
serializer (str or None, optional):
Whether to serialize the dict using Avro or JSON. If not None, this will override
the `serializer` property and is subject to the same conditions.
Returns:
bytes:
The serialized data in bytes.
"""
_serializer = serializer or self.serializer
if _serializer == "json":
return Serializers.serialize_json(alert_dict)
return Serializers.serialize_avro(alert_dict, schema_definition=self.definition)
[docs]
def deserialize(self, alert_bytes: bytes) -> dict:
"""Deserialize `alert_bytes` using JSON or Avro format as defined by the `deserializer` property.
Args:
alert_bytes (bytes):
The bytes to be deserialized.
Returns:
A dictionary representing the deserialized `alert_bytes`.
"""
if self.deserializer == "json":
return Serializers.deserialize_json(alert_bytes)
return Serializers.deserialize_avro(alert_bytes)
def _name_in_bucket(_alert: "Alert") -> None:
"""Construct the name of the Google Cloud Storage object."""
raise NotImplementedError("Name syntax is unknown.")
# --------- Survey Schema Definitions --------- #
[docs]
@attrs.define(kw_only=True)
class ElasticcSchema(Schema):
"""Schema for ELAsTiCC alerts."""
serializer: Literal["json", "avro"] = attrs.field(default="avro")
"""Whether to serialize the dict to Avro (default) or JSON when, e.g., publishing a Pub/Sub message.
If "avro", this schema will use the schemaless Avro format."""
deserializer: Literal["json", "avro"] = attrs.field(default="avro")
"""Whether to use a Avro (default) or JSON to deserialize when decoding alert_bytes -> alert_dict.
If "avro", this schema will use the schemaless Avro format."""
@classmethod
def _from_yaml(cls, yaml_dict: dict, *, alert_bytes: bytes | None = None):
"""Create a schema object from `yaml_dict`.
Args:
yaml_dict (dict):
A dictionary containing the schema information, loaded from the registry's 'schemas.yml' file.
alert_bytes (bytes or None, optional):
Message data, if available. This is unused and not necessary for this schema.
Returns:
Schema
"""
schema = cls(**yaml_dict)
schema.path = __package_path__ / schema.path
schema.definition = fastavro.schema.load_schema(schema.path)
return schema
[docs]
def serialize(
self, alert_dict: dict, *, serializer: Literal["json", "avro", None] = None
) -> bytes:
"""Serialize the `alert_dict`.
Args:
alert_dict (dict):
The dictionary to be serialized.
serializer (str or None, optional):
Whether to serialize the dict using Avro or JSON. If not None, this will override
:meth:`ElasticcSchema.serializer` and is subject to the same conditions.
Returns:
bytes:
The serialized data in bytes.
"""
_serializer = serializer or self.serializer
if _serializer == "json":
return Serializers.serialize_json(alert_dict)
return Serializers.serialize_schemaless_avro(alert_dict, schema_definition=self.definition)
[docs]
def deserialize(self, alert_bytes: bytes) -> dict:
"""Deserialize `alert_bytes` using JSON or Avro format as defined by :meth:`ElasticcSchema.deserializer`.
Args:
alert_bytes (bytes):
The bytes to be deserialized.
Returns:
A dictionary representing the deserialized `alert_bytes`.
"""
if self.deserializer == "json":
return Serializers.deserialize_json(alert_bytes)
return Serializers.deserialize_schemaless_avro(
alert_bytes, schema_definition=self.definition
)
def _name_in_bucket(_alert: "Alert") -> None:
"""Construct the name of the Google Cloud Storage object."""
raise NotImplementedError("Name syntax is unknown.")
[docs]
@attrs.define(kw_only=True)
class LsstSchema(Schema):
"""Schema for LSST alerts."""
serializer: Literal["json", "avro"] = attrs.field(default="avro")
"""Whether to serialize the dict to Avro (default) or JSON when, e.g., publishing a Pub/Sub message.
If "avro", this schema will use the Avro Confluent Wire Format
(https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format)."""
deserializer: Literal["json", "avro"] = attrs.field(default="avro")
"""Whether to use Avro (default) or JSON to deserialize when decoding alert_bytes -> alert_dict.
If "avro", this schema will use the Avro Confluent Wire Format
(https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format)."""
@classmethod
def _from_yaml(cls, yaml_dict: dict, *, alert_bytes: bytes | None = None):
"""Create a schema object from `yaml_dict`.
Args:
yaml_dict (dict):
A dictionary containing the schema information, loaded from the registry's 'schemas.yml' file.
alert_bytes (bytes or None, optional):
Message data. This is needed in order to infer the schema version. If not provided,
methods such as :meth:`LsstSchema.serialize` (if avro), :meth:`LsstSchema.deserialize` (if avro),
and :meth:`LsstSchema._name_in_bucket` will raise a :class:`pittgoogle.exceptions.SchemaError`.
Returns:
Schema
"""
schema = cls(**yaml_dict)
if alert_bytes is None:
LOGGER.warning(
"No alert_bytes provided. Cannot infer schema version. "
"Methods that rely on it will be unavailable."
)
return schema
# Get the schema ID out of the avro header and use it to construct the schema version.
# LSST's syntax is: schema-id = 703 (int) --> schema-version = 'v7_3'
_, version_id = struct.Struct(">bi").unpack(alert_bytes[:5])
schema.version_id = version_id
# Convert, eg, 1103 -> 'v11_3'
major = str(version_id // 100)
minor = str(version_id % 100)
schema.version = f"v{major}_{minor}"
if schema.version not in [
"v7_0",
"v7_1",
"v7_2",
"v7_3",
"v7_4",
"v8_0",
"v9_0",
"v10_0",
"v11_0",
"v11_1",
]:
raise exceptions.SchemaError(f"Schema definition not found for {schema.version}.")
# Resolve the path and load the schema definition.
schema_path = schema.path.replace("MAJOR", major).replace("MINOR", minor)
schema.path = __package_path__ / schema_path
schema.definition = fastavro.schema.load_schema(schema.path)
return schema
[docs]
def serialize(
self, alert_dict: dict, *, serializer: Literal["json", "avro", None] = None
) -> bytes:
"""Serialize the `alert_dict`.
Args:
alert_dict (dict):
The dictionary to be serialized.
serializer (str or None, optional):
Whether to serialize the dict using Avro or JSON. If not None, this will override
:meth:`LsstSchema.serializer` and is subject to the same conditions.
Returns:
bytes:
The serialized data in bytes.
Raises:
exceptions.SchemaError:
If the schema version or definition are unavailable and Avro serialization is requested.
"""
_serializer = serializer or self.serializer
if _serializer == "json":
return Serializers.serialize_json(alert_dict)
if self.version is None or self.definition is None:
raise exceptions.SchemaError(
"No Avro schema information is available. Cannot serialize to Avro."
)
# Reconstruct LSST's schema ID from the version string. Convert, eg, 'v7_3' -> 703.
schema_id = int("0".join(self.version.strip("v").split("_")))
return Serializers.serialize_confluent_wire_avro(
alert_dict, schema_definition=self.definition, schema_id=schema_id
)
[docs]
def deserialize(self, alert_bytes: bytes) -> dict:
"""Deserialize `alert_bytes` using JSON or Avro format as defined by :meth:`LsstSchema.deserializer`.
Args:
alert_bytes (bytes):
The bytes to be deserialized.
Returns:
A dictionary representing the deserialized `alert_bytes`.
"""
if self.deserializer == "json":
return Serializers.deserialize_json(alert_bytes)
if self.definition is None:
raise exceptions.SchemaError(
"No schema definition available. Cannot deserialize Avro."
)
return Serializers.deserialize_confluent_wire_avro(
alert_bytes, schema_definition=self.definition
)
@staticmethod
def _name_in_bucket(alert: "Alert") -> str:
"""Construct the name of the Google Cloud Storage object."""
if alert.schema.version is None:
raise exceptions.SchemaError(
"No version information available. Cannot construct object name."
)
_date = datetime.date.fromtimestamp(
float(alert.attributes["kafka.timestamp"]) / 1000
).strftime("%Y-%m-%d")
objectid_key = alert.get_key("objectid", name_only=True)
sourceid_key = alert.get_key("sourceid", name_only=True)
return f"{alert.schema.version}/kafkaPublishTimestamp={_date}/{objectid_key}={alert.objectid}/{sourceid_key}={alert.sourceid}.avro"
[docs]
@attrs.define(kw_only=True)
class LvkSchema(DefaultSchema):
"""Schema for LVK alerts."""
@classmethod
def _from_yaml(cls, yaml_dict: dict, *, alert_bytes: bytes | None = None) -> "Schema":
"""Create a schema object from `yaml_dict`.
Args:
yaml_dict (dict):
A dictionary containing the schema information, loaded from the registry's 'schemas.yml' file.
alert_bytes (bytes or None, optional):
Message data. This is needed in order to get the schema version. If not provided, methods
such as :meth:`LvkSchema._name_in_bucket` will raise a :class:`pittgoogle.exceptions.SchemaError`.
Returns:
Schema
"""
schema = super()._from_yaml(yaml_dict)
alert_dict = Serializers.deserialize_json(alert_bytes or b"{}")
schema.version = alert_dict.get("schema_version")
return schema
@staticmethod
def _name_in_bucket(alert: "Alert") -> "str":
"""Construct the name of the Google Cloud Storage object."""
if alert.schema.version is None:
raise exceptions.SchemaError("Schema version not found. Cannot construct object name.")
filename = f"{alert.dict['alert_type']}-{alert.dict['time_created'][0:10]}.json"
return f"{alert.schema.version}/{alert.get_key('objectid')}={alert.objectid}/{filename}"
[docs]
@attrs.define(kw_only=True)
class ZtfSchema(DefaultSchema):
"""Schema for ZTF alerts."""
deserializer: Literal["json", "avro"] = attrs.field(default="avro")
"""Whether to use a Avro (default) or JSON to deserialize when decoding `alert_bytes` -> `alert_dict`.
If "avro", this `pittgoogle.Schema` will expect the Avro schema to be attached to `alert_bytes` in the header."""
[docs]
@attrs.define(kw_only=True)
class RapidSchema(Schema):
"""Schema for RAPID alerts."""
serializer: Literal["json", "avro"] = attrs.field(default="avro")
"""Whether to serialize the dict to Avro (default) or JSON when, e.g., publishing a Pub/Sub message.
If "avro", this schema will use the Avro Confluent Wire Format
(https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format)."""
deserializer: Literal["json", "avro"] = attrs.field(default="avro")
"""Whether to use Avro (default) or JSON to deserialize when decoding alert_bytes -> alert_dict.
If "avro", this schema will use the Avro Confluent Wire Format
(https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format)."""
@classmethod
def _from_yaml(cls, yaml_dict: dict, *, alert_bytes: bytes | None = None):
"""Create a schema object from `yaml_dict`.
Args:
yaml_dict (dict):
A dictionary containing the schema information, loaded from the registry's 'schemas.yml' file.
alert_bytes (bytes or None, optional):
Message data. This is needed in order to infer the schema version. If not provided,
methods such as :meth:`RapidSchema.serialize` (if avro), :meth:`RapidSchema.deserialize` (if avro),
and :meth:`RapidSchema._name_in_bucket` will raise a :class:`pittgoogle.exceptions.SchemaError`.
Returns:
Schema
"""
schema = cls(**yaml_dict)
if alert_bytes is None:
LOGGER.warning(
"No alert_bytes provided. Cannot infer schema version. "
"Methods that rely on it will be unavailable."
)
return schema
# Get the schema ID out of the avro header and use it to construct the schema version.
# RAPID's syntax is: schema-id = 101 (int) --> schema-version = 'v1_0'
_, version_id = struct.Struct(">bi").unpack(alert_bytes[:5])
schema.version_id = version_id
# Convert, eg, 100 -> 'v01_00'
major = f"{version_id // 100:02d}"
minor = f"{version_id % 100:02d}"
schema.version = f"v{major}_{minor}"
if schema.version not in [
"v01_00",
]:
raise exceptions.SchemaError(f"Schema definition not found for {schema.version}.")
# Resolve the path and load the schema definition.
schema_path = schema.path.replace("MAJOR", major).replace("MINOR", minor)
schema.path = __package_path__ / schema_path
schema.definition = fastavro.schema.load_schema(schema.path)
return schema
[docs]
def serialize(
self, alert_dict: dict, *, serializer: Literal["json", "avro", None] = None
) -> bytes:
"""Serialize the `alert_dict`.
Args:
alert_dict (dict):
The dictionary to be serialized.
serializer (str or None, optional):
Whether to serialize the dict using Avro or JSON. If not None, this will override
:meth:`LsstSchema.serializer` and is subject to the same conditions.
Returns:
bytes:
The serialized data in bytes.
Raises:
exceptions.SchemaError:
If the schema version or definition are unavailable and Avro serialization is requested.
"""
_serializer = serializer or self.serializer
if _serializer == "json":
return Serializers.serialize_json(alert_dict)
if self.version is None or self.definition is None:
raise exceptions.SchemaError(
"No Avro schema information is available. Cannot serialize to Avro."
)
# Reconstruct RAPID's schema ID from the version string. Convert, eg, 'v1_0' -> 100.
schema_id = self.version_id
return Serializers.serialize_confluent_wire_avro(
alert_dict, schema_definition=self.definition, schema_id=schema_id
)
[docs]
def deserialize(self, alert_bytes: bytes) -> dict:
"""Deserialize `alert_bytes` using JSON or Avro format as defined by :meth:`RapidSchema.deserializer`.
Args:
alert_bytes (bytes):
The bytes to be deserialized.
Returns:
A dictionary representing the deserialized `alert_bytes`.
"""
if self.deserializer == "json":
return Serializers.deserialize_json(alert_bytes)
if self.definition is None:
raise exceptions.SchemaError(
"No schema definition available. Cannot deserialize Avro."
)
return Serializers.deserialize_confluent_wire_avro(
alert_bytes, schema_definition=self.definition
)
@staticmethod
def _name_in_bucket(alert: "Alert") -> str:
"""Construct the name of the Google Cloud Storage object."""
if alert.schema.version is None:
raise exceptions.SchemaError(
"No version information available. Cannot construct object name."
)
_date = datetime.date.fromtimestamp(
float(alert.attributes["kafka.timestamp"]) / 1000
).strftime("%Y-%m-%d")
objectid_key = alert.get_key("objectid", name_only=True)
sourceid_key = alert.get_key("sourceid", name_only=True)
return f"{alert.schema.version}/kafkaPublishTimestamp={_date}/{objectid_key}={alert.objectid}/{sourceid_key}={alert.sourceid}.avro"