Skip to content

Commit

Permalink
feat(substrait) Update to substrait v0.64.0
Browse files Browse the repository at this point in the history
* Update proto definition to v0.64.0
* Fixed FetchRel to use OffsetMode and CountMode
  • Loading branch information
srikrishnak committed Jan 15, 2025
1 parent be0b09b commit 913ff70
Show file tree
Hide file tree
Showing 15 changed files with 5,230 additions and 4,123 deletions.
4 changes: 2 additions & 2 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// Package substraitgo contains the experimental go bindings for substrait
// (https://substrait.io).
//
// Current generated proto substrait version: v0.55.0
// Current generated proto substrait version: v0.64.0
package substraitgo

//go:generate buf generate https://github.com/substrait-io/substrait.git#tag=v0.59.0
//go:generate buf generate https://github.com/substrait-io/substrait.git#tag=v0.64.0
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require (
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/substrait-io/substrait-go v1.2.0 // indirect
golang.org/x/crypto v0.30.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/substrait-io/substrait v0.63.1 h1:XNPvrEYNPjDqenK4TxqBDDUNzglafdjzjejzQqEwk5Y=
github.com/substrait-io/substrait v0.63.1/go.mod h1:MPFNw6sToJgpD5Z2rj0rQrdP/Oq8HG7Z2t3CAEHtkHw=
github.com/substrait-io/substrait-go v1.2.0 h1:3ZNRkc8FYD7ifCagKEOZQtUcgMceMQfwo2N1NGaK4Q4=
github.com/substrait-io/substrait-go v1.2.0/go.mod h1:IPsy24rdjp/buXR+T8ENl6QCnSCS6h+uM8P+GaZez7c=
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.30.0 h1:RwoQn3GkWiMkzlX562cLB7OxWvjH1L8xutO2WoJcRoY=
golang.org/x/crypto v0.30.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
Expand Down
14 changes: 13 additions & 1 deletion plan/builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package plan

import (
"fmt"

substraitgo "github.com/substrait-io/substrait-go/v3"
"github.com/substrait-io/substrait-go/v3/expr"
"github.com/substrait-io/substrait-go/v3/extensions"
Expand Down Expand Up @@ -124,6 +123,7 @@ type Builder interface {
// Deprecated: Use VirtualTableFromExpr(...).Remap() instead.
VirtualTableFromExprRemap(fieldNames []string, remap []int32, values ...expr.VirtualTableExpressionValue) (*VirtualTableReadRel, error)
VirtualTableFromExpr(fieldNames []string, values ...expr.VirtualTableExpressionValue) (*VirtualTableReadRel, error)
IcebergTableFromMetadataFile(metadataURI string, schema types.NamedStruct) (*IcebergTableReadRel, error)
// Deprecated: Use Sort(...).Remap() instead.
SortRemap(input Rel, remap []int32, sorts ...expr.SortField) (*SortRel, error)
Sort(input Rel, sorts ...expr.SortField) (*SortRel, error)
Expand Down Expand Up @@ -587,6 +587,18 @@ func (b *builder) VirtualTable(fields []string, values ...expr.StructLiteralValu
return b.VirtualTableRemap(fields, nil, values...)
}

func (b *builder) IcebergTableFromMetadataFile(metadataURI string, schema types.NamedStruct) (*IcebergTableReadRel, error) {
tableType := &Direct{}
tableType.MetadataUri = metadataURI

return &IcebergTableReadRel{
baseReadRel: baseReadRel{
baseSchema: schema,
},
tableType: tableType,
}, nil

Check warning on line 599 in plan/builders.go

View check run for this annotation

Codecov / codecov/patch

plan/builders.go#L590-L599

Added lines #L590 - L599 were not covered by tests
}

func (b *builder) SortRemap(input Rel, remap []int32, sorts ...expr.SortField) (*SortRel, error) {
if input == nil {
return nil, errNilInputRel
Expand Down
18 changes: 16 additions & 2 deletions plan/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,10 +411,24 @@ func RelFromProto(rel *proto.Rel, reg expr.ExtensionRegistry) (Rel, error) {
return nil, fmt.Errorf("error getting input to FetchRel: %w", err)
}

var offset int64
if off, ok := rel.Fetch.OffsetMode.(*proto.FetchRel_Offset); ok {
offset = off.Offset
} else {
return nil, fmt.Errorf("%w: missing required Offset field for Fetch Relation", substraitgo.ErrInvalidRel)
}

Check warning on line 419 in plan/plan.go

View check run for this annotation

Codecov / codecov/patch

plan/plan.go#L418-L419

Added lines #L418 - L419 were not covered by tests

var count int64
if cnt, ok := rel.Fetch.CountMode.(*proto.FetchRel_Count); ok {
count = cnt.Count
} else {
return nil, fmt.Errorf("%w: missing required Count field for Fetch Relation", substraitgo.ErrInvalidRel)
}

Check warning on line 426 in plan/plan.go

View check run for this annotation

Codecov / codecov/patch

plan/plan.go#L425-L426

Added lines #L425 - L426 were not covered by tests

out := &FetchRel{
input: input,
offset: rel.Fetch.Offset,
count: rel.Fetch.Count,
offset: offset,
count: count,
advExtension: rel.Fetch.AdvancedExtension,
}
if rel.Fetch.Common != nil {
Expand Down
113 changes: 109 additions & 4 deletions plan/relations.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,107 @@ func (e *ExtensionTableReadRel) Remap(mapping ...int32) (Rel, error) {
return RemapHelper(e, mapping)
}

type (
SnapshotId proto.ReadRel_IcebergTable_MetadataFileRead_SnapshotId
SnapshotTimestamp proto.ReadRel_IcebergTable_MetadataFileRead_SnapshotTimestamp

IcebergSnapshot interface {
isSnapshot()
}
)

func (*SnapshotId) isSnapshot() {}
func (*SnapshotTimestamp) isSnapshot() {}

Check warning on line 384 in plan/relations.go

View check run for this annotation

Codecov / codecov/patch

plan/relations.go#L383-L384

Added lines #L383 - L384 were not covered by tests

type (
Direct proto.ReadRel_IcebergTable_MetadataFileRead

IcebergTableType interface {
isTableType()
}
)

func (*Direct) isTableType() {}

Check warning on line 394 in plan/relations.go

View check run for this annotation

Codecov / codecov/patch

plan/relations.go#L394

Added line #L394 was not covered by tests

// IcebergTableReadRel is a scan on iceberg table.
type IcebergTableReadRel struct {
baseReadRel

tableType IcebergTableType
advExtension *extensions.AdvancedExtension
}

func (n *IcebergTableReadRel) NamedTableAdvancedExtension() *extensions.AdvancedExtension {
return n.advExtension

Check warning on line 405 in plan/relations.go

View check run for this annotation

Codecov / codecov/patch

plan/relations.go#L404-L405

Added lines #L404 - L405 were not covered by tests
}

func (n *IcebergTableReadRel) RecordType() types.RecordType {
return n.remap(n.directOutputSchema())

Check warning on line 409 in plan/relations.go

View check run for this annotation

Codecov / codecov/patch

plan/relations.go#L408-L409

Added lines #L408 - L409 were not covered by tests
}

func (n *IcebergTableReadRel) ToProtoPlanRel() *proto.PlanRel {
return &proto.PlanRel{
RelType: &proto.PlanRel_Rel{
Rel: n.ToProto(),
},
}

Check warning on line 417 in plan/relations.go

View check run for this annotation

Codecov / codecov/patch

plan/relations.go#L412-L417

Added lines #L412 - L417 were not covered by tests
}

func (n *IcebergTableReadRel) ToProto() *proto.Rel {
readRel := n.toReadRelProto()

if n.tableType != nil {
if directTableType, ok := n.tableType.(*Direct); ok {
readRel.ReadType = &proto.ReadRel_IcebergTable_{
IcebergTable: &proto.ReadRel_IcebergTable{
TableType: &proto.ReadRel_IcebergTable_Direct{
Direct: (*proto.ReadRel_IcebergTable_MetadataFileRead)(directTableType),
},
},
}
}

Check warning on line 432 in plan/relations.go

View check run for this annotation

Codecov / codecov/patch

plan/relations.go#L420-L432

Added lines #L420 - L432 were not covered by tests
}

readRel.ReadType = &proto.ReadRel_IcebergTable_{
IcebergTable: &proto.ReadRel_IcebergTable{
TableType: &proto.ReadRel_IcebergTable_Direct{
Direct: &proto.ReadRel_IcebergTable_MetadataFileRead{
MetadataUri: "metadata_uri",
Snapshot: &proto.ReadRel_IcebergTable_MetadataFileRead_SnapshotId{
SnapshotId: "snapshot_id",
},
},
},
},
}
return &proto.Rel{
RelType: &proto.Rel_Read{
Read: readRel,
},
}

Check warning on line 451 in plan/relations.go

View check run for this annotation

Codecov / codecov/patch

plan/relations.go#L435-L451

Added lines #L435 - L451 were not covered by tests
}

func (n *IcebergTableReadRel) Copy(_ ...Rel) (Rel, error) {
return n, nil

Check warning on line 455 in plan/relations.go

View check run for this annotation

Codecov / codecov/patch

plan/relations.go#L454-L455

Added lines #L454 - L455 were not covered by tests
}

func (n *IcebergTableReadRel) CopyWithExpressionRewrite(rewriteFunc RewriteFunc, _ ...Rel) (Rel, error) {
newExprs, err := n.copyExpressions(rewriteFunc)
if err != nil {
return nil, err
}
if slices.Equal(newExprs, n.getExpressions()) {
return n, nil
}
nt := *n
nt.updateFilters(newExprs)
return &nt, nil

Check warning on line 468 in plan/relations.go

View check run for this annotation

Codecov / codecov/patch

plan/relations.go#L458-L468

Added lines #L458 - L468 were not covered by tests
}

func (n *IcebergTableReadRel) Remap(mapping ...int32) (Rel, error) {
return RemapHelper(n, mapping)

Check warning on line 472 in plan/relations.go

View check run for this annotation

Codecov / codecov/patch

plan/relations.go#L471-L472

Added lines #L471 - L472 were not covered by tests
}

// PathType is the type of a LocalFileReadRel's uris.
type PathType int8

Expand Down Expand Up @@ -897,10 +998,14 @@ func (f *FetchRel) ToProto() *proto.Rel {
return &proto.Rel{
RelType: &proto.Rel_Fetch{
Fetch: &proto.FetchRel{
Common: f.toProto(),
Input: f.input.ToProto(),
Offset: f.offset,
Count: f.count,
Common: f.toProto(),
Input: f.input.ToProto(),
OffsetMode: &proto.FetchRel_Offset{
Offset: f.offset,
},
CountMode: &proto.FetchRel_Count{
Count: f.count,
},
AdvancedExtension: f.advExtension,
},
},
Expand Down
Loading

0 comments on commit 913ff70

Please sign in to comment.