Skip to content

Context

A high-level context manager for BigQuery operations.

This is the primary entry point for most users. It simplifies all interactions with BigQuery by providing an easy-to-use context manager (with statement) that handles the connection lifecycle automatically. Internally, it orchestrates the Connector, Fetcher, and Pusher classes.

Attributes:

Name Type Description
connector BQConnector

The underlying connector instance.

fetcher Optional[FetchWorker]

The fetcher instance, available after the context is entered.

pusher Optional[PushWorker]

The pusher instance, available after the context is entered.

Example
import pandas as pd

from easy_bigquery import BQManager

# Entering the `with` block connects to BigQuery and leaving it closes
# the connection, so no manual connect/close calls are needed.
with BQManager() as bq:
    # 1. Fetch the first 15 rows of the table configured on the connector.
    sql = f'SELECT * FROM {bq.connector.project_id}.{bq.connector.dataset}.{bq.connector.table} LIMIT 15'
    df_fetched = bq.fetch(sql)
    print(df_fetched.head())

    # 2. Append a one-row DataFrame to `test_table` in the connector's
    #    project and dataset.
    data = {'user_id': [100], 'status': ['active']}
    df_to_push = pd.DataFrame(data)
    table_name = 'test_table'
    bq.push(
        df=df_to_push,
        project_id=bq.connector.project_id,
        dataset=bq.connector.dataset,
        table=table_name,
        write_disposition='WRITE_APPEND',
    )
Source code in easy_bigquery/context/manager.py
class BQManager:
    """
    A high-level context manager for BigQuery operations.

    This is the primary entry point for most users. It simplifies all
    interactions with BigQuery by providing an easy-to-use context
    manager (`with` statement) that handles the connection lifecycle
    automatically. Internally, it orchestrates the Connector, Fetcher,
    and Pusher classes.

    Attributes:
        connector (BQConnector): The underlying connector instance.
        fetcher (Optional[FetchWorker]): The fetcher instance,
            available only while the context is active; `None` before
            entering and again after exiting.
        pusher (Optional[PushWorker]): The pusher instance,
            available only while the context is active; `None` before
            entering and again after exiting.

    Example:
        ```python
        import pandas as pd

        from easy_bigquery import BQManager

        # Using the Manager as a context manager handles all
        # connection and disconnection logic automatically.
        with BQManager() as bq:
            # 1. Fetch rows from the table configured on the connector.
            sql = f'SELECT * FROM {bq.connector.project_id}.{bq.connector.dataset}.{bq.connector.table} LIMIT 15'
            df_fetched = bq.fetch(sql)
            print(df_fetched.head())

            # 2. Push a new DataFrame to your dataset.
            data = {'user_id': [100], 'status': ['active']}
            df_to_push = pd.DataFrame(data)
            table_name = 'test_table'
            bq.push(
                df=df_to_push,
                project_id=bq.connector.project_id,
                dataset=bq.connector.dataset,
                table=table_name,
                write_disposition='WRITE_APPEND',
            )
        ```
    """

    def __init__(self, **kwargs: Any):
        """
        Initializes the manager by creating a connector.

        No connection is opened here; that happens in `__enter__`.

        Args:
            **kwargs: Keyword arguments to be passed down to the
                `BQConnector` constructor (e.g., `project_id`).
        """
        self.connector = BQConnector(**kwargs)
        self.fetcher: Optional[FetchWorker] = None
        self.pusher: Optional[PushWorker] = None

    def __enter__(self) -> 'BQManager':
        """
        Establishes connection and initializes service classes.

        Returns:
            This manager, with `fetcher` and `pusher` ready to use.

        Raises:
            Exception: Whatever `connect()` or the worker constructors
                raise. If a worker cannot be built, the connection that
                was just opened is closed before the error propagates.
        """
        self.connector.connect()
        try:
            self.fetcher = FetchWorker(self.connector)
            self.pusher = PushWorker(self.connector)
        except BaseException:
            # `with` never calls __exit__ when __enter__ raises, so the
            # connection opened above must be released here.
            self.fetcher = None
            self.pusher = None
            self.connector.close()
            raise
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """
        Closes the connection and deactivates the workers.

        Returns `None`, so an exception raised inside the `with` block
        is never suppressed.
        """
        try:
            self.connector.close()
        finally:
            # Drop the workers even if close() fails, so later fetch()
            # and push() calls report an inactive context instead of
            # using a closed connector.
            self.fetcher = None
            self.pusher = None

    def fetch(self, query: str, **kwargs: Any) -> pd.DataFrame:
        """
        High-level method to fetch data. Delegates to FetchWorker.

        Args:
            query: The SQL query to execute.
            **kwargs: Additional arguments for the fetcher.

        Returns:
            A pandas DataFrame with the query results.

        Raises:
            ConnectionError: If called outside an active `with` block.
        """
        if not self.fetcher:
            raise ConnectionError('Manager context is not active.')
        return self.fetcher.fetch(query, **kwargs)

    def push(
        self,
        df: pd.DataFrame,
        project_id: Optional[str] = None,
        dataset: Optional[str] = None,
        table: Optional[str] = None,
        schema: Optional[List[bq.SchemaField]] = None,
        write_disposition: Literal[
            'WRITE_TRUNCATE',
            'WRITE_APPEND',
            'WRITE_EMPTY',
            'WRITE_DISPOSITION_UNSPECIFIED',
            'WRITE_TRUNCATE_DATA',
        ] = 'WRITE_APPEND',
    ) -> None:
        """
        High-level method to push data. Delegates to PushWorker.

        Args:
            df: The pandas DataFrame to upload.
            project_id: Optional GCP project ID. Default value comes from
                environment variables.
            dataset: Optional GCP dataset name. Default value comes from
                environment variables.
            table: Optional GCP table name. Default value comes from
                environment variables.
            schema: Optional list of `SchemaField` objects.
            write_disposition: Write mode. One of 'WRITE_TRUNCATE',
                'WRITE_APPEND', 'WRITE_EMPTY',
                'WRITE_DISPOSITION_UNSPECIFIED' or
                'WRITE_TRUNCATE_DATA'. Defaults to 'WRITE_APPEND'.

        Raises:
            ConnectionError: If called outside an active `with` block.
        """
        if not self.pusher:
            raise ConnectionError('Manager context is not active.')
        # Arguments are forwarded positionally, in PushWorker.push order.
        self.pusher.push(
            df,
            project_id,
            dataset,
            table,
            schema,
            write_disposition,
        )

__enter__()

Establishes connection and initializes service classes.

Source code in easy_bigquery/context/manager.py
def __enter__(self) -> 'BQManager':
    """
    Establishes connection and initializes service classes.

    Returns:
        This manager, with `fetcher` and `pusher` ready to use.

    Raises:
        Exception: Whatever `connect()` or the worker constructors
            raise. If a worker cannot be built, the connection that
            was just opened is closed before the error propagates.
    """
    self.connector.connect()
    try:
        self.fetcher = FetchWorker(self.connector)
        self.pusher = PushWorker(self.connector)
    except BaseException:
        # `with` never calls __exit__ when __enter__ raises, so the
        # connection opened above must be released here.
        self.fetcher = None
        self.pusher = None
        self.connector.close()
        raise
    return self

__exit__(exc_type, exc_val, exc_tb)

Closes the connection.

Source code in easy_bigquery/context/manager.py
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
    """
    Closes the connection and deactivates the workers.

    Returns `None`, so an exception raised inside the `with` block is
    never suppressed.
    """
    try:
        self.connector.close()
    finally:
        # Drop the workers even if close() fails, so later fetch() and
        # push() calls report an inactive context instead of using a
        # closed connector.
        self.fetcher = None
        self.pusher = None

__init__(**kwargs)

Initializes the manager by creating a connector.

Parameters:

Name Type Description Default
**kwargs Any

Keyword arguments to be passed down to the BQConnector constructor (e.g., project_id).

{}
Source code in easy_bigquery/context/manager.py
def __init__(self, **kwargs: Any):
    """
    Initializes the manager by creating a connector.

    Only the connector is built here. `fetcher` and `pusher` start as
    `None` and are populated when the context is entered.

    Args:
        **kwargs: Keyword arguments to be passed down to the
            `BQConnector` constructor (e.g., `project_id`).
    """
    self.connector = BQConnector(**kwargs)
    # Placeholders until __enter__ creates the workers.
    self.fetcher: Optional[FetchWorker] = None
    self.pusher: Optional[PushWorker] = None

fetch(query, **kwargs)

High-level method to fetch data. Delegates to FetchWorker.

Parameters:

Name Type Description Default
query str

The SQL query to execute.

required
**kwargs Any

Additional arguments for the fetcher.

{}

Returns:

Type Description
DataFrame

A pandas DataFrame with the query results.

Source code in easy_bigquery/context/manager.py
def fetch(self, query: str, **kwargs: Any) -> pd.DataFrame:
    """
    Run a SQL query through the active fetcher and return its result.

    Args:
        query: The SQL query to execute.
        **kwargs: Extra keyword arguments forwarded unchanged to the
            fetcher's `fetch` method.

    Returns:
        A pandas DataFrame with the query results.

    Raises:
        ConnectionError: If no fetcher is set, i.e. the manager
            context is not active.
    """
    worker = self.fetcher
    if worker:
        return worker.fetch(query, **kwargs)
    raise ConnectionError('Manager context is not active.')

push(df, project_id=None, dataset=None, table=None, schema=None, write_disposition='WRITE_APPEND')

High-level method to push data. Delegates to PushWorker.

Parameters:

Name Type Description Default
df DataFrame

The pandas DataFrame to upload.

required
project_id Optional[str]

Optional GCP project ID. Default value comes from environment variables.

None
dataset Optional[str]

Optional GCP dataset name. Default value comes from environment variables.

None
table Optional[str]

Optional GCP table name. Default value comes from environment variables.

None
schema Optional[List[SchemaField]]

Optional list of SchemaField objects.

None
write_disposition Literal['WRITE_TRUNCATE', 'WRITE_APPEND', 'WRITE_EMPTY', 'WRITE_DISPOSITION_UNSPECIFIED', 'WRITE_TRUNCATE_DATA']

Write mode. One of 'WRITE_TRUNCATE', 'WRITE_APPEND', 'WRITE_EMPTY', 'WRITE_DISPOSITION_UNSPECIFIED' or 'WRITE_TRUNCATE_DATA'. Defaults to 'WRITE_APPEND'.

'WRITE_APPEND'
Source code in easy_bigquery/context/manager.py
def push(
    self,
    df: pd.DataFrame,
    project_id: Optional[str] = None,
    dataset: Optional[str] = None,
    table: Optional[str] = None,
    schema: Optional[List[bq.SchemaField]] = None,
    write_disposition: Literal[
        'WRITE_TRUNCATE',
        'WRITE_APPEND',
        'WRITE_EMPTY',
        'WRITE_DISPOSITION_UNSPECIFIED',
        'WRITE_TRUNCATE_DATA',
    ] = 'WRITE_APPEND',
) -> None:
    """
    Upload a DataFrame through the active pusher.

    Args:
        df: The pandas DataFrame to upload.
        project_id: Optional GCP project ID. Default value comes from
            environment variables.
        dataset: Optional GCP dataset name. Default value comes from
            environment variables.
        table: Optional GCP table name. Default value comes from
            environment variables.
        schema: Optional list of `SchemaField` objects.
        write_disposition: Write mode. One of 'WRITE_TRUNCATE',
            'WRITE_APPEND', 'WRITE_EMPTY',
            'WRITE_DISPOSITION_UNSPECIFIED' or 'WRITE_TRUNCATE_DATA'.
            Defaults to 'WRITE_APPEND'.

    Raises:
        ConnectionError: If no pusher is set, i.e. the manager
            context is not active.
    """
    worker = self.pusher
    if not worker:
        raise ConnectionError('Manager context is not active.')
    # Forwarded positionally, in the same order as this signature.
    forwarded = (df, project_id, dataset, table, schema, write_disposition)
    worker.push(*forwarded)