
I do not understand the partition error: ValueError: Could not find in old schema: 2: {field}: identity(2) #1100

cfrancois7 opened this issue Aug 26, 2024 · 10 comments

cfrancois7 commented Aug 26, 2024

Question

While trying to partition my table, I ran into this error:
ValueError: Could not find in old schema: 2: {field}: identity(2)
I've dug through the documentation, Stack Overflow, and Medium looking for an answer.
I even tried ChatGPT, but without success :D

I'm using a local SQLite catalog and a MinIO server to build a proof of concept.
Here is the code to reproduce the issue:

from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform, IdentityTransform
import pyarrow as pa

warehouse_path = "local_s3"
catalog = SqlCatalog(
    "default",
    **{
        "uri": f"sqlite:///{warehouse_path}/catalog.db",
        "warehouse": "http://localhost:9001",
        "s3.endpoint": "http://localhost:9001",
        "s3.access-key-id": "minio_user",
        "s3.secret-access-key": "minio1234",
    },
)
catalog.create_namespace_if_not_exists('my_namespace')



ts_schema = pa.schema([
    pa.field('timestamp', pa.timestamp('s'), nullable=False),  # Assuming timestamp with seconds precision
    pa.field('campaign_id', pa.uint8(), nullable=False),
    pa.field('temperature', pa.float32()),
    pa.field('pressure', pa.float32()),
    pa.field('humidity', pa.int32()),
    pa.field('led_0', pa.bool_())
])

# Define the partition spec for campaign_id
ts_partition_spec = PartitionSpec(
    PartitionField(
        field_id=2,
        source_id=2,
        transform=IdentityTransform(), 
        name="campaign_id"
    )
)
time_series_table = catalog.create_table(
    'my_namespace.time_series',
    schema=ts_schema,
    partition_spec=ts_partition_spec  #  <= raises error !!
)

My goal is to partition the table by campaign_id.
Is that possible? If so, how?
And how should I interpret the API documentation?

I also tried the timestamp field with DayTransform:

ts_partition_spec = PartitionSpec(
    PartitionField(
        source_id=1, field_id=100, transform=DayTransform(), name="timestamp_day"
    )
)

It raised the same kind of error:
ValueError: Could not find in old schema: 100: timestamp_day: Day(1)

ndrluis (Collaborator) commented Aug 26, 2024

Hello @cfrancois7, thank you for your report.

I can't test it right now, but I believe the issue might be because you're using the Arrow schema to create the table, and we don't have the field_ids. Have you tried using the PyIceberg Schema/NestedField definition instead?
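
A minimal sketch of that suggestion (field names taken from your snippet; only the first two fields shown): with PyIceberg's own Schema, every field carries an explicit field_id that the PartitionSpec's source_id can point at.

from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType, NestedField, TimestampType

ts_schema = Schema(
    NestedField(field_id=1, name="timestamp", field_type=TimestampType(), required=True),
    NestedField(field_id=2, name="campaign_id", field_type=IntegerType(), required=True),
    # temperature, pressure, humidity, led_0 omitted for brevity
)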

cfrancois7 (Author) commented Aug 27, 2024

@ndrluis You are right! I tested with the PyIceberg schema and it works.

I remembered why I used the Arrow schema: it keeps the types and nullability aligned between the data I want to append and the expected schema.
By using the PyIceberg schema, I also need to declare the PyArrow schema separately.
I didn't find a parser that can do the job in one line, so I need to build one or write the schema twice (PyIceberg + PyArrow), which is harder to maintain.

For instance, the following code raises an error:

from datetime import datetime

import pyarrow as pa
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.schema import Schema
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import (
    BooleanType,
    FloatType,
    IntegerType,
    NestedField,
    TimestampType,
)

ts_schema = Schema(
    NestedField(field_id=1, name="timestamp", field_type=TimestampType(), required=True),
    NestedField(field_id=2, name="campaign_id", field_type=IntegerType(), required=True),
    NestedField(field_id=3, name="temperature", field_type=FloatType(), required=False),
    NestedField(field_id=4, name="pressure", field_type=FloatType(), required=False),
    NestedField(field_id=5, name="humidity", field_type=IntegerType(), required=False),
    NestedField(field_id=6, name="led_0", field_type=BooleanType(), required=False)
)

# Define the partition spec for campaign_id
ts_partition_spec = PartitionSpec(
    PartitionField(
        field_id=2,
        source_id=2,
        transform=IdentityTransform(), 
        name="campaign_id"
    )
)

ts_table = catalog.create_table_if_not_exists(
    'default.time_series',
    schema=ts_schema,
    partition_spec=ts_partition_spec,
    location = "local_s3"
)

ts_dict = {
    'timestamp': [
        datetime(2023, 1, 1, 12, 0),
        datetime(2023, 1, 1, 13, 0),
        datetime(2023, 1, 1, 15, 0),
        datetime(2023, 1, 1, 16, 0),
        datetime(2023, 1, 1, 12, 0),
        datetime(2023, 1, 1, 12, 0)
    ],
    'campaign_id': [1, 1, 1, 1, 2, 2],
    'temperature': [21.0, 21.5, 21.8, 21.0, 22.0, 24.5],
    'pressure': [1012.0, 1015.0, 1030.0, 1016.0, 1508.0, 1498.0],
    'humidity': [2, 5, 5, 5, 5, 5],
    'led_0': [0, 0, 0, 1, 0, 1]
}


ts_df = pa.Table.from_pydict(ts_dict)
ts_table.append(ts_df)  # <= raises the error below; I need to pass the PyArrow schema to `from_pydict` to make it work
┏━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃    ┃ Table field                      ┃ Dataframe field                  ┃
┡━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ ❌ │ 1: timestamp: required timestamp │ 1: timestamp: optional timestamp │
│ ❌ │ 2: campaign_id: required int     │ 2: campaign_id: optional long    │
│ ❌ │ 3: temperature: optional float   │ 3: temperature: optional double  │
│ ❌ │ 4: pressure: optional float      │ 4: pressure: optional double     │
│ ❌ │ 5: humidity: optional int        │ 5: humidity: optional long       │
│ ❌ │ 6: led_0: optional boolean       │ 6: led_0: optional long          │
└────┴──────────────────────────────────┴──────────────────────────────────┘

ndrluis (Collaborator) commented Aug 27, 2024

@cfrancois7 You can take the PyIceberg schema and call as_arrow():

ts_df = pa.Table.from_pydict(ts_dict, schema=ts_schema.as_arrow())

ndrluis (Collaborator) commented Aug 27, 2024

Anyway, I think we need to do something to avoid this problem in the future. Since we require the field ID, we should only accept the PyIceberg Schema. What do you think? @Fokko @sungwy @kevinjqliu @HonahX (Maybe it would be nice to create a committers team to tag all of you 🤔).

sungwy (Collaborator) commented Aug 27, 2024

Hi @cfrancois7 - thank you very much for raising this issue! And thank you @ndrluis for jumping on to dig into the root cause as well.

We've made some enhancements to PyIceberg to support defining a PartitionSpec on table creation (this wasn't even possible before), but there are still two problems here that you helped outline:

  1. The supported input arguments in the create_table API still give the impression that it supports what you were trying to do
  2. Our documentation isn't up to date with the best practices

The root cause of the problem is that the IDs of the Iceberg Table schema are reassigned when a table is created. So the constraint the API has on trying to match the PartitionSpec by ID doesn't really work on table creation.

Instead, the newly introduced practice is to do the following:

with catalog.create_table_transaction(
    identifier='my_namespace.time_series',
    schema=ts_schema,
) as txn:
    with txn.update_spec() as update_spec:
        update_spec.add_identity("campaign_id")

table = catalog.load_table('my_namespace.time_series')

This approach relies on just matching the partition field by its field name, similar to how Spark and Flink APIs handle partition updates.

Please let me know if this works for you! I think it'll also be worthwhile for us to leave this issue open until we can clarify our API and our documentation to prevent other users from running into the same issues.

Fokko (Contributor) commented Aug 27, 2024

Agree with @sungwy that this is mostly a documentation issue, so let's extend the docs so ChatGPT can give better answers.

Another solution would be:

ts_table = catalog.create_table_if_not_exists(
    'default.time_series',
    schema=ts_schema,
    location = "local_s3"
)
with ts_table.update_spec() as update_spec:
    update_spec.add_identity("campaign_id")

This will first create the table, and then set the spec, but that's probably alright.

Since we require the field ID, we should only accept the PyIceberg Schema.

I don't think this is the most user-friendly option. In the end, we don't want to put the burden of field-IDs on the users. Keep in mind that they also get re-assigned:

ts_schema = Schema(
    NestedField(field_id=1925, name="timestamp", field_type=TimestampType(), required=True),
)

ts_table = catalog.create_table('default.time_series', schema=ts_schema)

assert ts_table.schema().fields[0].field_id == 1  # Field IDs now start from 1, as they are re-assigned.

Another thing I noticed:

┏━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃    ┃ Table field                      ┃ Dataframe field                  ┃
┡━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ ❌ │ 1: timestamp: required timestamp │ 1: timestamp: optional timestamp │
│ ❌ │ 2: campaign_id: required int     │ 2: campaign_id: optional long    │
│ ❌ │ 3: temperature: optional float   │ 3: temperature: optional double  │
│ ❌ │ 4: pressure: optional float      │ 4: pressure: optional double     │
│ ❌ │ 5: humidity: optional int        │ 5: humidity: optional long       │
│ ❌ │ 6: led_0: optional boolean       │ 6: led_0: optional long          │
└────┴──────────────────────────────────┴──────────────────────────────────┘

Arrow sets everything to nullable by default, even when there are no nulls in the data. We could check whether nullable is set correctly by scanning for null records, but that could get expensive on a big table, so we probably only want to do it when we actually try to write an optional field into a required field of the table.
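
For illustration, a minimal sketch of the writer-side workaround (only the first three fields, reusing names from the snippets above): declare the Arrow schema explicitly, with nullable=False and the narrower types, so the dataframe lines up with the table's required fields.

from datetime import datetime

import pyarrow as pa

# Explicit Arrow schema: required fields are marked nullable=False and the
# widths (int32/float32) match the Iceberg types, instead of Arrow's
# defaults (nullable, int64/float64).
arrow_schema = pa.schema([
    pa.field("timestamp", pa.timestamp("us"), nullable=False),
    pa.field("campaign_id", pa.int32(), nullable=False),
    pa.field("temperature", pa.float32()),
])

ts_df = pa.Table.from_pydict(
    {
        "timestamp": [datetime(2023, 1, 1, 12, 0)],
        "campaign_id": [1],
        "temperature": [21.0],
    },
    schema=arrow_schema,
)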

mike-luabase commented Oct 4, 2024

In case it's not clear from @Fokko's example, here's how you add a (non-identity) partition. This is the only way I've found to define a table with an Arrow schema and include a partition:

from pyiceberg.transforms import DayTransform

date_column = "some_date_col"

iceberg_table = catalog.create_table(
    identifier="default.table_name",
    schema=schema, # arrow schema
)
with iceberg_table.update_spec() as update_spec:
    update_spec.add_field(
        source_column_name=date_column,
        transform=DayTransform(),
        partition_field_name=f"{date_column}_day"
    )
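
As a quick sanity check (a sketch, assuming the iceberg_table object from above), you can inspect the resulting spec:

print(iceberg_table.spec())  # the spec should now contain the day-transform partition field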

christophediprima commented

Hi, guys! This thread is very interesting! Has the documentation been updated yet?

kevinjqliu (Contributor) commented

christophediprima commented

This is not ideal, as it always adds a partition evolution to the table... I would like to use StarRocks materialized views, which do not support those.
