Source code for pittgoogle.bigquery

# -*- coding: UTF-8 -*-
"""Classes to facilitate connections to BigQuery datasets and tables.

.. autosummary::

    Client
    Table

----
"""
import logging
from typing import TYPE_CHECKING, Optional

import attrs
import google.cloud.bigquery

from .alert import Alert
from .auth import Auth

if TYPE_CHECKING:
    import pandas as pd  # always lazy-load pandas; it hogs memory on Cloud Functions and Cloud Run

LOGGER = logging.getLogger(__name__)


@attrs.define
class Client:
    """A client for interacting with Google BigQuery.

    ----
    """

    _auth: Auth = attrs.field(
        default=None, validator=attrs.validators.optional(attrs.validators.instance_of(Auth))
    )
    _client: google.cloud.bigquery.Client | None = attrs.field(default=None)

    def __getattr__(self, attr):
        """If ``attr`` doesn't exist in this class, try getting it from the underlying
        ``google.cloud.bigquery.Client``.

        Raises:
            AttributeError:
                If ``attr`` doesn't exist in either the pittgoogle or google.cloud API.
        """
        try:
            return getattr(self.client, attr)
        except AttributeError as excep:
            msg = f"Neither 'pittgoogle.bigquery.Client' nor 'google.cloud.bigquery.Client' has attribute '{attr}'"
            raise AttributeError(msg) from excep

    @property
    def auth(self) -> Auth:
        """Credentials for the Google Cloud project that this client will be connected to.

        This will be created using environment variables if necessary.
        """
        if self._auth is None:
            self._auth = Auth()
        return self._auth

    @property
    def client(self) -> google.cloud.bigquery.Client:
        """Google Cloud BigQuery client.

        If the client has not been initialized yet, it will be created using :attr:`Client.auth`.

        Returns:
            google.cloud.bigquery.Client:
                An instance of the Google Cloud BigQuery client.
        """
        if self._client is None:
            self._client = google.cloud.bigquery.Client(credentials=self.auth.credentials)
        return self._client

    def list_table_names(self, dataset: str, projectid: str | None = None) -> list[str]:
        """Get the names of the tables in the dataset.

        Args:
            dataset (str):
                The name of the dataset.
            projectid (str, optional):
                The dataset owner's Google Cloud project ID. If None, :attr:`Client.client.project`
                will be used.

        Returns:
            list[str]:
                A list of table names in the dataset.

        Example:

            .. code-block:: python

                bqclient = pittgoogle.bigquery.Client()
                bqclient.list_table_names(dataset="ztf", projectid=pittgoogle.ProjectIds().pittgoogle)
        """
        project = projectid or self.client.project
        return sorted([tbl.table_id for tbl in self.client.list_tables(f"{project}.{dataset}")])

    def query(
        self,
        query: str,
        to_dataframe: bool = True,
        to_dataframe_kwargs: dict | None = None,
        **job_config_kwargs,
    ):
        """Submit a BigQuery query job.

        Args:
            query (str):
                The SQL query to execute.
            to_dataframe (bool, optional):
                Whether to fetch the results and return them as a pandas DataFrame (True, default)
                or just return the query job (False).
            to_dataframe_kwargs (dict, optional):
                Keyword arguments to be passed to ``google.cloud.bigquery.QueryJob.to_dataframe``.
                Notable options: ``dtypes`` (dict), ``max_results`` (int),
                ``create_bqstorage_client`` (bool). This is ignored unless ``to_dataframe`` is True.

                ``create_bqstorage_client`` controls whether to use ``google.cloud.bigquery_storage``
                (True) or ``google.cloud.bigquery`` (False). ``bigquery_storage`` can be faster but
                is not necessary. If you do not specify this parameter, pittgoogle will set it to
                True if the ``bigquery_storage`` library is installed, else False.
            **job_config_kwargs:
                Keyword arguments to pass to the ``google.cloud.bigquery.QueryJobConfig`` constructor.
                Notable option: ``dry_run`` (bool).

        Returns:
            pandas.DataFrame if ``to_dataframe`` is True, else google.cloud.bigquery.QueryJob

        Example:

            Query two tables (ztf.alerts_v4_02 and ztf.alerts_v3_3) for data on one object
            (ZTF19acfixfe).

            .. code-block:: python

                bqclient = pittgoogle.bigquery.Client()
                pittgoogle_project = pittgoogle.ProjectIds().pittgoogle

                sql = f\"\"\"
                    SELECT objectId, candid, candidate.jd, candidate.fid, candidate.magpsf
                    FROM `{pittgoogle_project}.ztf.alerts_v3_3`
                    WHERE objectId = 'ZTF19acfixfe'
                    UNION ALL
                    SELECT objectId, candid, candidate.jd, candidate.fid, candidate.magpsf
                    FROM `{pittgoogle_project}.ztf.alerts_v4_02`
                    WHERE objectId = 'ZTF19acfixfe'
                \"\"\"

                diaobject_df = bqclient.query(query=sql)
        """
        # Submit
        job_config = google.cloud.bigquery.QueryJobConfig(**job_config_kwargs)
        query_job = self.client.query(query, job_config=job_config)

        # Return
        if job_config.dry_run:
            print(f"This query will process {query_job.total_bytes_processed:,} bytes")
            return query_job

        if to_dataframe:
            kwargs = to_dataframe_kwargs.copy() if to_dataframe_kwargs else {}
            # Google sets 'create_bqstorage_client' to True by default and then raises a warning if
            # the 'bigquery_storage' library is not installed. Most pittgoogle users are not likely
            # to have this installed or even know what it is. Let's avoid the warning and just
            # quietly check for it.
            create_bqstorage_client = self._check_bqstorage_client(kwargs.pop("create_bqstorage_client", None))
            return query_job.to_dataframe(create_bqstorage_client=create_bqstorage_client, **kwargs)

        return query_job

    @staticmethod
    def _check_bqstorage_client(user_value: bool | None) -> bool:
        """If ``user_value`` is None, check whether ``google.cloud.bigquery_storage`` is installed
        by trying to import it.

        Returns:
            bool:
                ``user_value`` if it is not None. Else, True (False) if the import is (is not)
                successful.
        """
        if user_value is not None:
            return user_value

        try:
            import google.cloud.bigquery_storage  # noqa: W0611
        except ModuleNotFoundError:
            return False
        return True
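
# Illustrative usage sketch for ``Client`` (kept as comments so importing this module has no side
# effects). It assumes Google Cloud credentials are available in your environment (see
# ``pittgoogle.auth.Auth``); the variable names below are hypothetical.
#
#     import pittgoogle
#
#     bqclient = pittgoogle.bigquery.Client()  # credentials are resolved lazily via Client.auth
#
#     # Attributes not defined on this class fall through to google.cloud.bigquery.Client
#     # via __getattr__, e.g. 'project' and 'list_datasets':
#     print(bqclient.project)
#     datasets = list(bqclient.list_datasets())
#
#     # A dry run prints the estimated bytes processed without executing the query.
#     bqclient.query(query="SELECT 1", to_dataframe=False, dry_run=True)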


@attrs.define
class Table:
    """Methods and properties for interacting with a Google BigQuery table.

    Args:
        name (str):
            Name of the BigQuery table.
        dataset (str):
            Name of the BigQuery dataset this table belongs to.
        projectid (str, optional):
            The table owner's Google Cloud project ID. If not provided, the client's project ID
            will be used.
        client (pittgoogle.bigquery.Client, optional):
            BigQuery client that will be used to access the table. If not provided, a new client
            will be created the first time it is requested.

    ----
    """

    # Strings _below_ the field will make these also show up as individual properties in rendered docs.
    name: str = attrs.field()
    """Name of the BigQuery table."""
    dataset: str = attrs.field()
    """Name of the BigQuery dataset this table belongs to."""
    client: Client | None = attrs.field(factory=Client)
    """BigQuery client used to access the table."""
    # The rest don't need string descriptions because they are explicitly defined as properties below.
    _projectid: str = attrs.field(default=None)
    _table: google.cloud.bigquery.Table | None = attrs.field(default=None, init=False)
    _schema: Optional["pd.DataFrame"] = attrs.field(default=None, init=False)

    @classmethod
    def from_cloud(
        cls,
        name: str,
        *,
        dataset: str | None = None,
        survey: str | None = None,
        testid: str | None = None,
    ):
        """Create a :class:`Table` object using a BigQuery client with implicit credentials.

        Use this method when creating a :class:`Table` object in code running in Google Cloud
        (e.g., in a Cloud Run module). The underlying Google APIs will automatically find your
        credentials.

        The table resource in Google BigQuery is expected to already exist.

        Args:
            name (str):
                Name of the table.
            dataset (str, optional):
                Name of the dataset containing the table. Either this or a ``survey`` is required.
                If a ``testid`` is provided, it will be appended to this name following the
                Pitt-Google naming syntax.
            survey (str, optional):
                Name of the survey. This will be used as the name of the dataset if the ``dataset``
                kwarg is not provided. This kwarg is provided for convenience in cases where the
                Pitt-Google naming syntax is used to name resources.
            testid (str, optional):
                Pipeline identifier. If this is not ``None``, ``False``, or ``"False"``, it will be
                appended to the dataset name. This is used in cases where the Pitt-Google naming
                syntax is used to name resources. This allows pipeline modules to find the correct
                resources without interfering with other pipelines that may have deployed resources
                with the same base names (e.g., for development and testing purposes).

        Returns:
            Table:
                The `Table` object.
        """
        if dataset is None:
            dataset = survey
        # if testid is not False, "False", or None, append it to the dataset
        if testid and testid != "False":
            dataset = f"{dataset}_{testid}"
        # create a client with implicit credentials
        client = Client(client=google.cloud.bigquery.Client())
        table = cls(name=name, dataset=dataset, projectid=client.project, client=client)
        # make the get request now to fail early if there's a problem
        _ = table.table
        return table
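
    # Illustrative sketch of ``from_cloud`` (kept as comments; the names are hypothetical). A
    # pipeline module deployed to Google Cloud with testid="mytest" might connect to its own copy
    # of a survey's alerts table like this:
    #
    #     table = Table.from_cloud(name="alerts", survey="ztf", testid="mytest")
    #     table.id  # -> "<implicit-project>.ztf_mytest.alerts"
    #
    # With testid=None, False, or "False", the dataset name is just the survey name ("ztf").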

    def __getattr__(self, attr):
        """If ``attr`` doesn't exist in this class, try getting it from the underlying
        ``google.cloud.bigquery.Table``.

        Raises:
            AttributeError:
                If ``attr`` doesn't exist in either the pittgoogle or google.cloud API.
        """
        try:
            return getattr(self.table, attr)
        except AttributeError as excep:
            msg = f"Neither 'pittgoogle.bigquery.Table' nor 'google.cloud.bigquery.Table' has attribute '{attr}'"
            raise AttributeError(msg) from excep

    @property
    def id(self) -> str:
        """Fully qualified table ID with syntax 'projectid.dataset.name'."""
        return f"{self.projectid}.{self.dataset}.{self.name}"

    @property
    def projectid(self) -> str:
        """The table owner's Google Cloud project ID.

        Defaults to :attr:`Table.client.client.project`.
        """
        if self._projectid is None:
            self._projectid = self.client.client.project
        return self._projectid

    @property
    def table(self) -> google.cloud.bigquery.Table:
        """Google Cloud BigQuery Table object.

        Makes a ``get_table`` request if necessary.

        Returns:
            google.cloud.bigquery.Table:
                The BigQuery Table object, connected to the Cloud resource.
        """
        if self._table is None:
            self._table = self.client.get_table(self.id)
        return self._table

    @property
    def schema(self) -> "pd.DataFrame":
        """Schema of the BigQuery table."""
        if self._schema is None:
            # [TODO] Wondering, should we avoid pandas here? Maybe make this a dict instead?
            import pandas as pd  # always lazy-load pandas; it hogs memory on Cloud Functions and Cloud Run

            fields = []
            for field in self.table.schema:
                fld = field.to_api_repr()  # dict

                child_fields = fld.pop("fields", [])
                # Prepend the parent field name so that the child field name has the syntax
                # 'parent_name.child_name'. This is the syntax that should be used in SQL queries
                # and is also the one shown on the BigQuery Console page. The dicts update in place.
                _ = [cfld.update(name=f"{fld['name']}.{cfld['name']}") for cfld in child_fields]

                fields.extend([fld] + child_fields)
            self._schema = pd.DataFrame(fields)
        return self._schema
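
    # Illustrative sketch of the ``schema`` property (kept as comments; the output shown is
    # schematic, not captured from a real run). For a table with a nested 'candidate' RECORD
    # column, child fields appear with dotted names, which is the form to use in SQL queries:
    #
    #     tbl = Table(name="alerts_v4_02", dataset="ztf", projectid=pittgoogle.ProjectIds().pittgoogle)
    #     tbl.schema[["name", "type"]].head()
    #     #            name    type
    #     # 0      objectId  STRING
    #     # 1     candidate  RECORD
    #     # 2  candidate.jd   FLOAT
    #     # ...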

    def insert_rows(self, rows: list[dict | Alert]) -> list[dict]:
        """Insert rows into the BigQuery table.

        Args:
            rows (list[dict or Alert]):
                The rows to be inserted. Can be a list of dictionaries or a list of Alert objects.

        Returns:
            list[dict]:
                A list of errors encountered.
        """
        # if elements of rows are Alerts, need to extract the dicts
        myrows = [row.dict if isinstance(row, Alert) else row for row in rows]
        errors = self.client.insert_rows(self.table, myrows)
        if len(errors) > 0:
            LOGGER.warning(f"BigQuery insert error: {errors}")
        return errors
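
    # Illustrative sketch of ``insert_rows`` (kept as comments; the table and row contents are
    # hypothetical). Rows may be plain dicts or ``pittgoogle.Alert`` objects; Alerts are converted
    # via ``Alert.dict`` before insertion:
    #
    #     tbl = Table.from_cloud(name="my_table", dataset="my_dataset")
    #     errors = tbl.insert_rows([{"objectId": "ZTF19acfixfe", "candid": 1234567890123456789}])
    #     if errors:
    #         ...  # each element describes a row that failed to insert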

    def query(
        self,
        *,
        columns: list[str] | None = None,
        where: str | None = None,
        limit: int | str | None = None,
        to_dataframe: bool = True,
        dry_run: bool = False,
        return_sql: bool = False,
    ):
        """Submit a BigQuery query job against this table.

        This method supports basic queries against this table. For more complex queries or queries
        against multiple tables, use :attr:`Client.query`.

        Args:
            columns (list[str], optional):
                List of columns to select. If None, all columns are selected.
            where (str, optional):
                SQL WHERE clause.
            limit (int or str, optional):
                Maximum number of rows to return.
            to_dataframe (bool, optional):
                Whether to fetch the results and return them as a pandas DataFrame (True, default)
                or just return the query job (False).
            dry_run (bool, optional):
                Whether to do a dry-run only to check whether the query is valid and estimate costs.
            return_sql (bool, optional):
                If True, the SQL query string will be returned. The query job will not be submitted.

        Returns:
            pandas.DataFrame, google.cloud.bigquery.QueryJob, or str:
                The SQL query string if ``return_sql`` is True. Otherwise, the results in a
                DataFrame if ``to_dataframe`` is True, else the query job.

        Example:

            .. code-block:: python

                alerts_tbl = pittgoogle.Table(
                    name="alerts_v4_02", dataset="ztf", projectid=pittgoogle.ProjectIds().pittgoogle
                )
                columns = ["objectId", "candid", "candidate.jd", "candidate.fid", "candidate.magpsf"]
                where = "objectId IN ('ZTF18aarunfu', 'ZTF24aavyicb', 'ZTF24aavzkuf')"

                diaobjects_df = alerts_tbl.query(columns=columns, where=where)
        """
        # We could use parameterized queries, but accounting for all input possibilities would take
        # a good amount of work which should not be necessary. This query will be executed with the
        # user's credentials/permissions. No special access is added here. The user can already
        # submit arbitrary SQL queries using 'Table.client.query', so there's no point in trying to
        # protect against SQL injection here.

        # Construct the SQL statement
        sql = f"SELECT {', '.join(columns) if columns else '*'}"
        sql += f" FROM `{self.table.full_table_id.replace(':', '.')}`"
        if where is not None:
            sql += f" WHERE {where}"
        if limit is not None:
            sql += f" LIMIT {limit}"
        if return_sql:
            return sql

        # Do the query
        return self.client.query(query=sql, dry_run=dry_run, to_dataframe=to_dataframe)
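

# ---------------------------------------------------------------------------------------------
# Minimal end-to-end sketch (not part of the pittgoogle API; a hedged example only). It assumes
# the package is installed as ``pittgoogle``, that your environment is authenticated to Google
# Cloud, and that the Pitt-Google 'ztf.alerts_v4_02' table is readable; adjust names as needed.
# The __main__ guard keeps this from running when the module is imported.
if __name__ == "__main__":
    import pittgoogle

    alerts_tbl = pittgoogle.Table(
        name="alerts_v4_02", dataset="ztf", projectid=pittgoogle.ProjectIds().pittgoogle
    )

    # Inspect the schema, then pull a few rows for one object into a DataFrame.
    print(alerts_tbl.schema.head())
    diaobject_df = alerts_tbl.query(
        columns=["objectId", "candid", "candidate.jd", "candidate.magpsf"],
        where="objectId = 'ZTF19acfixfe'",
        limit=10,
    )
    print(diaobject_df)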