From 4b83d048147abf26fd89a438d23876bd7ae9e0c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cantoineeripret=E2=80=9D?= Date: Mon, 22 Sep 2025 13:35:54 +0200 Subject: [PATCH] add partitioning and clustering to the to_gbq function --- pandas_gbq/gbq.py | 88 +++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 85 insertions(+), 3 deletions(-) diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py index 8db1d4ea..581846e0 100644 --- a/pandas_gbq/gbq.py +++ b/pandas_gbq/gbq.py @@ -6,8 +6,10 @@ from datetime import datetime import logging import re +import typing import warnings +import pandas from pandas_gbq.contexts import Context # noqa - backward compatible export from pandas_gbq.contexts import context from pandas_gbq.exceptions import ( # noqa - backward compatible export @@ -380,6 +382,14 @@ def to_gbq( progress_bar=True, credentials=None, api_method: str = "default", + clustering_columns: typing.Union[ + pandas.core.indexes.base.Index, typing.Iterable[typing.Hashable] + ] = (), + time_partitioning_column: typing.Optional[str] = None, + time_partitioning_type: typing.Optional[str] = "DAY", + time_partitioning_expiration_ms: typing.Optional[int] = None, + range_partitioning_column: typing.Optional[str] = None, + range_partitioning_range: typing.Optional[dict] = None, verbose=None, private_key=None, auth_redirect_uri=None, @@ -406,6 +416,20 @@ def to_gbq( destination_table : str Name of table to be written, in the form ``dataset.tablename`` or ``project.dataset.tablename``. + clustering_columns : typing.Union[ + pandas.core.indexes.base.Index, typing.Iterable[typing.Hashable] + ] = (), + Specifies the columns for clustering in the BigQuery table. + time_partitioning_column : str, optional + Specifies the column for time-based partitioning in the BigQuery table. + time_partitioning_type : str, default 'DAY' + Specifies the type of time-based partitioning. + time_partitioning_expiration_ms : int, optional + Specifies the milliseconds for time-based partitioning expiration. + range_partitioning_column : str, optional + Specifies the column for range-based partitioning in the BigQuery table. + range_partitioning_range : dict, optional + Specifies the range for range-based partitioning. project_id : str, optional Google Cloud Platform project ID. Optional when available from the environment. @@ -610,7 +634,16 @@ def to_gbq( location=location, credentials=connector.credentials, ) - table_connector.create(table_id, table_schema) + table_connector.create( + table_id, + table_schema, + clustering_columns=clustering_columns, + time_partitioning_column=time_partitioning_column, + time_partitioning_type=time_partitioning_type, + time_partitioning_expiration_ms=time_partitioning_expiration_ms, + range_partitioning_column=range_partitioning_column, + range_partitioning_range=range_partitioning_range, + ) else: if if_exists == "append": # Convert original schema (the schema that already exists) to pandas-gbq API format @@ -731,7 +764,17 @@ def exists(self, table_id): except self.http_error as ex: self.process_http_error(ex) - def create(self, table_id, schema): + def create( + self, + table_id, + schema, + clustering_columns=None, + time_partitioning_column=None, + time_partitioning_type="DAY", + time_partitioning_expiration_ms=None, + range_partitioning_column=None, + range_partitioning_range=None, + ): """Create a table in Google BigQuery given a table and schema Parameters @@ -741,8 +784,27 @@ def create(self, table_id, schema): schema : str Use the generate_bq_schema to generate your table schema from a dataframe. + clustering_columns : list, optional + List of columns to cluster the table on. + time_partitioning_column : str, optional + Column to partition the table on. + time_partitioning_type : str, default 'DAY' + Type of time partitioning. + time_partitioning_expiration_ms : int, optional + Expiration time for the partitioning. + range_partitioning_column : str, optional + Column to partition the table on. + range_partitioning_range : dict, optional + Range for the partitioning. """ - from google.cloud.bigquery import DatasetReference, Table, TableReference + from google.cloud.bigquery import ( + DatasetReference, + Table, + TableReference, + TimePartitioning, + RangePartitioning, + PartitionRange, + ) if self.exists(table_id): raise TableCreationError("Table {0} already exists".format(table_id)) @@ -762,6 +824,26 @@ def create(self, table_id, schema): table = Table(table_ref) table.schema = pandas_gbq.schema.to_google_cloud_bigquery(schema) + if clustering_columns: + table.clustering_fields = list(clustering_columns) + + if time_partitioning_column: + table.time_partitioning = TimePartitioning( + type_=time_partitioning_type, + field=time_partitioning_column, + expiration_ms=time_partitioning_expiration_ms, + ) + + if range_partitioning_column and range_partitioning_range: + table.range_partitioning = RangePartitioning( + field=range_partitioning_column, + range_=PartitionRange( + start=range_partitioning_range["start"], + end=range_partitioning_range["end"], + interval=range_partitioning_range["interval"], + ), + ) + try: self.client.create_table(table) except self.http_error as ex: