Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(proto): implement AsOfJoinRel #331

Closed
wants to merge 4 commits into from
Closed

Conversation

rtpsw
Copy link

@rtpsw rtpsw commented Sep 13, 2022

See #330

Copy link
Contributor

@jvanstraten jvanstraten left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something like this really requires updates to the website (built from the site directory) in order to describe what the behavior of multiple rhs relations in a normal join does in all cases, and what an as-of join is. I personally don't know what they are, but I also shouldn't need to know a priori because the documentation should tell me what they are.

The protos are also broken to the point of not compiling, but let's settle on the intended semantics by way of writing documentation first.

@rtpsw
Copy link
Author

rtpsw commented Sep 14, 2022

Got it. Please bear with me as I'm learning this repo and may take a couple of iterations.

Copy link
Member

@westonpace westonpace left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some thoughts.

Comment on lines 233 to 263
### Join Properties

| Property | Description | Required |
| --------------- | ------------------------------------------------------------ | ---------------------------------- |
| Left Input | A relational input. | Required |
| Right Inputs | Each a relational input. | Required, at least one |
| Join Expression | A boolean condition that describes whether each record from the left set "match" the record from the right set. Field references correspond to the direct output order of the data. | Required. Can be the literal True. |
| Join Type | Same as the [Join](logical_relations.md#join-operator) operator. | Required |
| Tolerance | The maximum on-field value difference in an inexact match. | Required |
| On | The on-field. | Required |

### Join Types

Same as in the join operation.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think these are all properties of the AsOf join (and an AsOf join, if I am reading the protobuf correctly, doesn't have a join type). So I think all you need is something like...

| Property | ...
| Join | ...
| Tolerance | ...
| On | ...

Copy link
Author

@rtpsw rtpsw Sep 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I define here a generalized as-of-join operation, which is a generalization of both the join operation and the current AsofJoinNode in Arrow (more details below). My understanding here is that it is OK (and desirable, I think) to define a more general logical operation, even if it is currently only partially implemented in physical operations, provided the general operation is well-designed.

At a high level, the table result of the generalized as-of-join operation, with given join-expression and join-type, is a (not necessarily strict) sub-table of the join-operation with the same join-expression and join-type. The site-doc for as-of-join says that a row in the join-result will also appear in the as-of-join-result if the inexact matching of the on-key evaluates to true for this row (false leads to null values in the result). This is why any join-type also works in the generalized as-of-join operation. Note that the AsOfJoinNode in Arrow is equivalent to the generalized as-of-join operation with a left-join as the join-type.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's fine for it to be more general in principle, and for implementations to support the various join types with as of join, but let's keep the implementation decoupled from JoinRel.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean just duplicate the JoinRel fields into AsOfJoinRel?

@@ -178,6 +178,18 @@ message JoinRel {
substrait.extensions.AdvancedExtension advanced_extension = 10;
}

// A time-series variant of the multi JOIN relational operator left-join-right(s), which joins on an ordered key using inexact matching
message AsOfJoinRel {
JoinRel join = 1;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does an AsOfJoinRel use all the properties of join? In particular, does it use the expression or type? If not, maybe composition is not the correct approach here and it would be better to just copy the relevant fields.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with Weston here, let's repeat the fields instead of sticking a join inside of another join.

As of joins are significantly different from a generic join that they should not be subject to any changes that happen in JoinRel.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes and yes. See my earlier response.

@@ -219,6 +219,40 @@ The join operation will combine two separate inputs into a single output, based
```


## AsOfJoin Operation

The as-of-join operation is a time-series operation that will combine a left input and multiple right inputs into a single output, based on a join expression, an on-field and a tolerance value. All inputs must have the on-field in ascending-order. The operation is similar to a join-operation where the join expression is used for exact matching whereas the on-field is used for inexact matching up to the tolerance value.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if the statement "All inputs must have the on-field in ascending-order" is too prescriptive. In theory, an as-of join could be created even if the on-field wasn't in ascending order by doing something similar to a hash-join and building up a table in memory. In this case, if the ordering is not present (or not compatible with the on-key) then a consumer that only supports the ordered version can simply reject the plan.

That being said...if no real consumer supports an unordered variant...then maybe this would just be pedantic.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The more I think of this the more I think maybe a non-ordered as-of join is just too weird. For example, distribution would not necessarily be maintained in this case.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, you're not asking here for a change, but let me know if I misunderstood.

Comment on lines 185 to 209
oneof on_type {
Expression.FieldReference on = 3;
// Reserve next tags for future on_type alternatives
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would be an example of a different on_type?

Copy link
Author

@rtpsw rtpsw Sep 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A natural example is an algebraic expression involving one or more fields and returning a value in an ordered domain (with a distance measure). Another, perhaps contrived, example is multiple on-fields interpreted as a radix.

@@ -363,6 +375,7 @@ message Rel {
ExtensionMultiRel extension_multi = 10;
ExtensionLeafRel extension_leaf = 11;
CrossRel cross = 12;
AsOfJoinRel asofjoin = 13;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:

Suggested change
AsOfJoinRel asofjoin = 13;
AsOfJoinRel asof_join = 13;

// A time-series variant of the multi JOIN relational operator left-join-right(s), which joins on an ordered key using inexact matching
message AsOfJoinRel {
JoinRel join = 1;
int64 tolerance = 2;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate on why you chose int64 here as opposed to an expression or even a literal?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree this should be more general. The exact definition needed here is a value from an ordered domain with a distance measure. How would that be captured in the proto?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the recent commit I used Literal and deferred the domain checking to the consumer.

@jvanstraten
Copy link
Contributor

Before I dive into the proposed Substrait representation, let me just write out what I now think a join relation behaviorally does or should do. It took about two hours of discussion with @cpcloud to get here, so it feels wrong not to share, and I'll be referring back to it as well.

  • Compute the Cartesian product of all inputs.
  • Filter out records based on the join expression.
  • For as-of joins only:
    • Filter out records based on the as-of inequality expression.
    • For each left-hand side record:
      • (Partially) order the right-hand side matches by the value that the inequality expression operated on, in order to find the smallest or largest value. Discard all the other records.
      • If a tolerance is specified, compute the difference between the LHS and RHS of the inequality expression for the remaining record. If this value is greater than or equal to the specified tolerance, discard this record as well.
  • Based on the join type:
    • inner join: return what we computed as-is.
    • left join: if a tuple of left input records no longer appears after filtering, generate a record for it, with all fields on the right-hand side set to null.
    • right join: as above but mirrored.
    • outer join: as left and right but generate records with nulls for both sides.
    • semi join: yield only the fields from the left input(s), and yield them only if at least one of the corresponding records from the cross product remains after all the filters.
    • anti join: yield only the fields from the left input(s), and yield them only if none of the corresponding records from the cross product remain after all the filters.
    • single join: like a left join, but return at most one join partner for each record from the left input(s); pick one at random or throw an error if this is ambiguous.
  • Filter out records based on the post-join filter.

Please bear in mind that this is just a behavioral description. You would certainly not be materializing the Cartesian product of all inputs as a first step of any sane join implementation.

With that said;

Multi-table join

Fundamentally, multi-table join is already covered by Substrait. Something like

SELECT employee.first_name, employee.last_name, call.start_time, call.end_time, call_outcome.outcome_text
FROM employee
INNER JOIN call ON call.employee_id = employee.id
INNER JOIN call_outcome ON call.call_outcome_id = call_outcome.id
ORDER BY call.start_time ASC;

would look like two nested inner join operations; something like sort(join(join(employee, call, call.employee_id == employee.id), call_outcome, call.call_outcome_id == call_outcome.id), call.start_time).

I don't necessarily see this as a reason not to define a multi-table join, but it does make me question what it's useful for. If the only reason is along the lines of "this maps more nicely to Acero," that's not a good reason to make consuming JoinRels more complicated for everyone else.

If we do decide to embrace this, then I have some concerns and notes:

  • If we look at the SQL query, we see that different right-hand-side tables can be joined in different ways, while we only have one join type option per JoinRel. That is, the proposed multi-table join syntax is not generic enough to replace a join tree in all cases when coming from SQL. Therefore, I'm not sure how helpful this change would be for any user except Acero (which this seems to be based on).
  • The more inputs there are, the harder it's going to be to verify that all tables actually have columns related to each other via the join expression. Join expressions are already a bit difficult to consume, so I'm not sure we should make it worse.
  • If we're going to generalize the right-hand side of the join, why not also the left-hand side? The logic I wrote above technically generalizes to both sides. This is probably a weird thing to do, but if this does not make any sense, why does it make sense to allow multiple tables on the right?

For Acero and friends, I don't necessarily see why a tree is that much harder to deal with than a flattened multi-rhs join. For a producer the conversion is trivial. For a consumer, you'd look at the left input and see if it's a compatible join; if so, merge it, otherwise consume it like a normal relation. This will also help proper consumption from other producers later, that might not know to flatten a join tree, even if Substrait were to support it (so you should kind of implement this anyway).

If you're really dead-set on wanting to mirror Acero's expressiveness for relations exactly, the correct solution is to use a relation extension. Again, this is something you should IMO do eventually anyway, because it allows you to decouple and make optional this tree-flattening logic in/from the consumer itself. It's just a matter of which you find higher-priority.

As-of join

I see a bunch of problems and implementation mistakes for this one, and have some random thoughts as well.

  • FieldRefs are not what you think they are. Fields don't have names in Substrait; they always positionally refer to exactly one field. This contrasts with the Arrow FieldRef in the thing this change seems based on, which seems to refer to two fields by means of a name, one in the left input and one in one of the right inputs. Otherwise, none of this makes sense. So, you would need at least two Substrait FieldRefs here.
  • Rather than FieldRefs though, the more natural thing to put here is an Expression; an inequality expression like lte or gte to be specific. The arguments of that inequality would be FieldRefs for the relation to map onto Acero exactly, but in Substrait they can be arbitrarily nested expressions. This is different from how Acero does it, but it's by design nonetheless.
  • At this point it's not really a "key" anymore. A name like as_of_inequality or whatever would make more sense. This is what I used in the behavior description above.
  • Having this expression explicit is a good thing, because now as-of only works if you provide an inequality expression for it. That same inequality operator can also be used for the (partial) ordering step.
  • Tolerance as a literal is problematic, because it implicitly requires application of a subtract function to compute the distance. This only works for number types, and even then it's ugly, because "operator functions" are no more special than any other function, so how is the join relation supposed to know what function you want to use for this? It might seem obvious, but only for number types defined by Substrait. What about a number type extension provided by someone else, that Substrait obviously doesn't provide a subtract function for? How is the relation supposed to know what function to use?
  • I also see no fundamental reason to restrict this to a literal. In the logic above, the tolerance could be any expression that can make use of any combination of fields from the inputs. I could see this be useful when your dataset includes non-constant error bars, for example.
  • Based on the above two points, I suggest we replace the tolerance with another filter expression, that, for the chosen as-of join pair, determines whether the pair is good enough. This solves the subtract problem as well, because the distance computation is now explicit.
  • I don't see a reason why we shouldn't just extend JoinRel to include as-of join functionality, as opposed to making a new relation for it. That would also avoid all the duplication that's going on right now. This would simply involve adding the as_of_inequality and as_of_filter expressions to JoinRel, and specifying that, if the former is specified, the JoinRel effectively becomes an as-of join by including the corresponding processing steps. as_of_filter could be optional, or we could just require it be set to true. It might be nicer to pack these two things into a sub-message since they're related to one another, in which case the existence of this sub-message field would enable/disable the as-of join functionality.
  • Just a note, but I don't think as-of joins make all that much sense in conjunction with all the existing join types. I don't think the logic breaks necessarily, it just becomes something weird. So I think we could also make as-of joins a new join type. In that case, the most natural structure would be to deprecate the enum and replace it with a oneof, where all types except the as-of variant are google.protobuf.Empty messages and the as-of option has the inequality and tolerance filter expression.

Some misc. notes:

  • The protos are currently broken because enums are defined in the global namespace (like in C), so you can't duplicate JoinType like that.
  • "reserving" tags/field numbers for a oneof isn't really a thing. You can't make use of the information in any way, and (if taken to the letter) it would also prevent adding fields outside the oneof later.
  • I feel like this should be split into two proposals, i.e. one for multi-table joins and one for as-of joins.
  • Please use conventional commit syntax for your commits, otherwise CI will complain and we can't merge.

@rtpsw
Copy link
Author

rtpsw commented Sep 16, 2022

Thanks for this very detailed feedback, @jvanstraten. It adds a lot of clarity to the discussion. I generally agree with a lot of the points you made. I'll address some points below.

Regarding the behavioral description. This is a good approach and, IIUC, it can be seen as a more detailed description of the generalization I described. However, I'm not sure I follow the as-of-join part. In particular, I would have expected to see the tolerance embedded within the as-of-inequality (or more generally, a range expression) rather than be separate from it and/or an optional item. For example, I would think an as-of-join with a tolerance of 1 hour would be represented with an as-of-inequality expression with a meaning like left.time - right.time <= 1 hour, which embeds the tolerance. Taking a further step, we might want to embed the sorting into the expression too, e.g., something with a meaning like max(right.time) where left.time - right.time <= 1 hour, though I don't know enough to say whether that can be supported in Substrait in a reasonable way.

Regarding multi-table joins. I agree there is symmetry between a left-join of one-left-with-multiple-right-tables and a right-join of one-right-with-multiple-left-tables, but then supporting one of them is enough and the choice is arbitrary. What I don't see the meaning of is a left-join or right-join of multiple-left-with-multiple-right-tables (no way to arbitrate between the multiple non-equivalent join-trees that would fit this), and If I'm reading the behavioral description correctly, it does not deal with this case anyway.

If the only reason is along the lines of "this maps more nicely to Acero,"

While Acero is a starting point for the design, it is not how I would justify defining a multi-table join. One justification is that it allows the Substrait consumer to represent the multi-join succinctly, so that any Substrait producer could optimize from this representation. Without a multi-join representation, the Substrait consumer would have to generate a tree-join and each Substrait producer would have to reimplement some kind of pattern-matcher to recover the multi-join from this tree-join before it could optimize.

Regarding as-of-join. Several points you raise are related to FieldRef and the tolerance, and I think embedding the tolerance in an as-of-inequality, as described earlier, should resolve them. As for the structuring of JoinRel and AsOfJoinRel, I don't have a strong opinion.

@rtpsw
Copy link
Author

rtpsw commented Sep 16, 2022

  • The protos are currently broken because enums are defined in the global namespace (like in C), so you can't duplicate JoinType like that.

My local protoc (libprotoc 3.6.1) compiled this fine, but I guess some CI job runs a different/older version. How should I correctly validate before committing?

  • "reserving" tags/field numbers for a oneof isn't really a thing. You can't make use of the information in any way, and (if taken to the letter) it would also prevent adding fields outside the oneof later.

What would be a proper way to allow for future types to be added to the oneof? Perhaps it is fine for the tags to be non-consecutive.

  • I feel like this should be split into two proposals, i.e. one for multi-table joins and one for as-of joins.

I think this is OK. If we do this, I'll give the as-of-join as a higher priority.

  • Please use conventional commit syntax for your commits, otherwise CI will complain and we can't merge.

I believe the first commit's message is conventional. Does that mean I should use the same conventional message for all commits in this PR? In other repos there was no issue for the reviewer to pick the message of the first commit.

@jvanstraten
Copy link
Contributor

jvanstraten commented Sep 19, 2022

For example, I would think an as-of-join with a tolerance of 1 hour would be represented with an as-of-inequality expression with a meaning like left.time - right.time <= 1 hour, which embeds the tolerance. Taking a further step, we might want to embed the sorting into the expression too, e.g., something with a meaning like max(right.time) where left.time - right.time <= 1 hour, though I don't know enough to say whether that can be supported in Substrait in a reasonable way.

I did it this way in part because I didn't fully consider your approach and in part because enforcing the inequality the be at the root of the expression and mandating that the LHS only uses fields from the left and the right only uses fields from the right makes it directly applicable for sorting as well. Conversely, something like left.time - right.time <= 1 hour cannot be used for sorting unless you first materialize the whole Cartesian product.

That being said, I like your idea of just making the sort explicit more. Sorts are normally represented using repeated SortField sorts, so we can just re-use that. Then you could just say "pick the first RHS record after sorting for each LHS record". The requirement would be that the sort only uses fields from the right-hand-side table(s) for the operation to be sensible. We could also define the sort keys to act on the RHS schema only rather than the joined schema; it'd be way easier to validate, but is probably more confusing to humans than it's worth.

I guess the operation could be implemented in two ways; either by sorting the entire RHS first, or doing a partial sort (i.e. min/max) operation after filtering all the candidates for a particular LHS record. To update the behavior description above, using the latter interpretation as the baseline:

  • For as-of joins only:
    • Filter out records based on the as-of inequality expression.
    • For each left-hand side record:
      • (Partially) order the right-hand side matches using the specified sort operation.
      • Discard all but the first record.

I think the sort could also be left out entirely, and implicitly be whatever minimizes the difference between the LHS and RHS of the comparison operator at the root of the expression. But that's not as powerful, because the root of the expression doesn't actually need to be a comparison operator that Substrait knows about if you do include the sort, and without the sort it's much harder to derive how to sort the RHS input if you want to do it before the rest of the join operation.

Actually... thinking about it like this, I think Substrait can already almost represent as-of: just insert a SortRel in the RHS with the appropriate sort, use a "single" join, and update the definition of that one that it needs to return the first value if ambiguous, rather than being allowed to error out or return any one of them regardless of sort. But I'm still fine with adding explicit support for as-of anyway.

I agree there is symmetry between a left-join of one-left-with-multiple-right-tables and a right-join of one-right-with-multiple-left-tables, but then supporting one of them is enough and the choice is arbitrary.

Semi, anti, and single joins are not symmetrical so it falls apart there, but fair enough otherwise, although by the same logic one of left or right join need not exist either. It was mostly intended as a thought experiment.

What I don't see the meaning of is a left-join or right-join of multiple-left-with-multiple-right-tables (no way to arbitrate between the multiple non-equivalent join-trees that would fit this), and If I'm reading the behavioral description correctly, it does not deal with this case anyway.

Doesn't it? I'm kind of too lazy to reread my description in detail to see if I made any mistakes, but the generalization I intended is for any multi-table join of left_1..left_n and right_1..right_m to behave just like JoinRel(CrossRel(left_1..left_n), CrossRel(right_1..right_m)), where I suppose the CrossRels need to be represented as a tree because they're fixed to two-input for whatever reason. You could also use inner joins instead, since they do the exact same thing AFAICT. I think this matches the intended behavior for multiple RHS tables, hence I generalized to multiple LHS tables this way, at least in my head.

One justification is that it allows the Substrait consumer to represent the multi-join succinctly, so that any Substrait producer could optimize from this representation. Without a multi-join representation, the Substrait consumer would have to generate a tree-join and each Substrait producer would have to reimplement some kind of pattern-matcher to recover the multi-join from this tree-join before it could optimize.

I think you swapped producer and consumer in your last sentence, but I get what you're saying. My counterargument would (still) be that a consumer cannot generally rely on the producer doing this optimization for them unless it already knows what the producer is. So there are two cases here:

  • The producer is arbitrary, probably something plugged into the consumer by an end user or some orchestration system, and no assumptions can be made about the producer except that it uses only core extensions. The user will probably expect the consumer to implement the query to the best of its abilities in terms of performance, rather than blindly following whatever the producer does. Therefore, you would have to account for the join tree representation even if Substrait supports multi-join (unless you just don't care enough about performance in this case to implement the optimization).
  • The producer and consumer do know about each; specifically, the consumer knows that the producer optimizes for it. In this case, just cut out the Substrait middle man entirely where things don't map one-to-one and use relation extensions.

It is true that, in the former case, having multi-table join in Substrait allows them to not care about dealing with optimizing join trees. However, any consumer that does not support multi-table join internally would have to outright reject a plan that uses it unless they write new code to reduce it. So, IMO, the tradeoff here tips against introducing first-class support for multi-table join.

Regarding as-of-join. Several points you raise are related to FieldRef and the tolerance, and I think embedding the tolerance in an as-of-inequality, as described earlier, should resolve them.

Correct.

My local protoc (libprotoc 3.6.1) compiled this fine, but I guess some CI job runs a different/older version. How should I correctly validate before committing?

That's weird IMO, but I've learned not to be surprised by inconsistencies in protobuf... I think CI uses buf lint to check, but I'm not 100% sure.

What would be a proper way to allow for future types to be added to the oneof? Perhaps it is fine for the tags to be non-consecutive.

It is, protobuf doesn't care what the tags are. The only somewhat relevant thing to know about them is that tags > 31 cost one or more additional bytes in the serialization.

The important thing for forward- and backward-compatibility is that all oneofs are semantically mandatory fields. This is because protobuf inherently doesn't know for unknown fields during deserialization which oneof they belong to (if any). It just treats them like any other unknown field; i.e. it stores them in a special place to retain them when reserializing and then silently moves on. So, a oneof that has a field populated that the deserializer doesn't know about looks like an unspecified oneof, and therefore needs to be rejected, or the plan would be executed incorrectly using whatever default behavior would be associated with not specifying the oneof.

I believe the first commit's message is conventional. Does that mean I should use the same conventional message for all commits in this PR? In other repos there was no issue for the reviewer to pick the message of the first commit.

IMO it shouldn't work this way, but CI will simply reject your PR if the commits don't satisfy conventional commit style, and I don't have the authority to overrule that. It's silly, because it's the final squash-merge commit that actually matters, which is ultimately typed and checked by the reviewer, and is not checked by CI (because it can't be until it's already in main). I've brought this issue up before, but feedback was mixed so I just dropped it.

Rel left = 2;
repeated Rel right = 3;
Expression expression = 4;
Expression post_join_filter = 5;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't generally see a post join filter in asof join APIs. Do we need this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, I think we should remove this. This seems like a very logical node. If an engine could benefit from a post-join filter then that should be left for the physical definition (one could argue that our join rel shouldn't have a post-join filter as well)

RelCommon common = 1;
Rel left = 2;
repeated Rel right = 3;
Expression expression = 4;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this expression mean?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be the actual join expression. The FieldRef that's still in the proposal right now does not work the way you think it works in Substrait and is just not applicable for a join. See my first few notes on AsOf join in #331 (comment). I think @rtpsw and me had already aligned on a number of changes to this proposal, but they haven't updated it yet.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is roughly equivalent to Acero's by_key I believe.

@icexelloss
Copy link
Contributor

icexelloss commented Sep 22, 2022

@jvanstraten Thank you for your detailed feedback, I think many of the point that you are making makes sense.

For context, I work with @rtpsw on supporting asof join via Python (ibis) frontend -> substrait -> Acero backend.

I think there are two separate goal here:
(1) Come up with a proper design/standard for "Asof Join" in substrait for different producer/consumer
(2) Come up with an quick/experimental design for Asof Join in ibis -> substrait -> Acero so we have something that works.

Ideally we can achieve both goal at the same time but in reality, because of time/resource constraint, achieving (1) takes much more time and have more design concerns and need more careful thinking. I wonder if there is a way to achieve (2) without:
(a) Having to spend the time/effort to achieve (1)
(b) Doesn't not compromise goal (1)

One idea that I have it to allow "experimental" Relation to be added to the substrait and allow Acero to start using it / iterate on it, after all the Asof Join API in Acero is experimental and subject to change. In my mind, "experimental" stuff is good way for users of substrait to achieve (2) and iterate faster while making it clear that this is not something officially supported and will change/break.

@jvanstraten
Copy link
Contributor

(2) Come up with an quick/experimental design for Asof Join in ibis -> substrait -> Acero so we have something that works.

This is exactly what Substrait supports extensions for. Well, and also for things that Substrait does support, but doesn't support in exactly the way a particular producer/consumer pair would like, and portability is not (yet) a concern. To be more clear, this means using

ExtensionSingleRel extension_single = 9;

message ExtensionSingleRel {
RelCommon common = 1;
Rel input = 2;
google.protobuf.Any detail = 3;
}

and populating the detail field with a message type defined outside of the substrait namespace. I understand that setting up the logistics for this is annoying, but (speaking with my Voltron Data hat on) I'm sure that we're going to have to at some point, and once we do we can iterate as fast as we want and experiment as much as we want without relying on feature stabilization from Substrait, experimental or otherwise.

I don't think that the "marked as experimental" thing that Arrow does would really work for Substrait, because we use an automated breaking-change detection and versioning system that would not be able to distinguish between breaking changes to experimental vs fully supported features.

@rtpsw
Copy link
Author

rtpsw commented Sep 26, 2022

Sorry for my response taking some time. I'll need a couple more days before I can give this issue my full attention again.

@icexelloss
Copy link
Contributor

icexelloss commented Sep 27, 2022

This is exactly what Substrait supports extensions for. Well, and also for things that Substrait does support, but doesn't support in exactly the way a particular producer/consumer pair would like, and portability is not (yet) a concern. To be more clear, this means using

Thanks @jvanstraten. Per discussion with @westonpace and Voltron Data folks, this sounds like the best way forward.

@CLAassistant
Copy link

CLAassistant commented Oct 6, 2022

CLA assistant check
All committers have signed the CLA.

@CLAassistant
Copy link

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

@rtpsw
Copy link
Author

rtpsw commented Nov 7, 2022

Sorry it took a while to get back to this. Are we ready to move forward? If so, use merge or rebase to update the branch?

@rok
Copy link
Contributor

rok commented Nov 14, 2022

@rtpsw are you able to sign the CLA?

@rtpsw
Copy link
Author

rtpsw commented Dec 2, 2022

Signed.

@rtpsw
Copy link
Author

rtpsw commented Dec 4, 2022

The branch requires updating.; in this repo, do you prefer merge or rebase?

@westonpace
Copy link
Member

One of the PR checks requires that every commit comment follows semantic commit conventions. So merge commits will not work and rebase is probably necessary.

@rtpsw
Copy link
Author

rtpsw commented Dec 5, 2022

@jvanstraten, it looks like you need to reapprove and then rebase can take place automatically.

@rtpsw
Copy link
Author

rtpsw commented Dec 15, 2022

What is expected for this repo? a merge commit or a rebase?

@rtpsw
Copy link
Author

rtpsw commented Dec 23, 2022

What is expected for this repo? a merge commit or a rebase?

@cpcloud ?

@cpcloud
Copy link
Contributor

cpcloud commented Jan 11, 2023

@rtpsw Rebase

message JoinRel {
RelCommon common = 1;
Rel left = 2;
Rel right = 3;
repeated Rel right = 3;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove the changes to JoinRel.

message AsOfJoinRel {
RelCommon common = 1;
Rel left = 2;
repeated Rel right = 3;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should follow JoinRel and be a single Rel.

@snadukudy
Copy link

Hi @westonpace could you please review this? I see this earlier comment: #331 (comment)

thank you, Sri

@cpcloud
Copy link
Contributor

cpcloud commented Jan 18, 2023

I'm going to push some commits to this PR that should address many of the review comments.

@github-actions
Copy link

ACTION NEEDED

Substrait follows the Conventional Commits specification for release automation.

The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification.

@cpcloud cpcloud changed the title feat(proto): Multi-table and as-of joining feat(proto): implement AsOfJoinRel Jan 18, 2023
@cpcloud
Copy link
Contributor

cpcloud commented Jan 18, 2023

I have addressed the feedback, and have added an example implementation of an asof join algorithm using @jvanstraten's suggested generalizations. Hopefully this clarifies how it is to be used!

| Inputs | 1 |
| Outputs | 1 |
| Property Maintenance | Distribution is maintained. Orderedness is by the on-field only post operation. Physical relations may provide better property maintenance. |
| Direct Output Order | The emit order of the left input followed by the emit order of the right input. |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we allow de duplication of the on field (and by field)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's something to be done by ibis or something producing ibis expressions, so I don't think that'll be anywhere in the Substrait spec.

@rtpsw
Copy link
Author

rtpsw commented Jan 20, 2023

I'm confused about what the next steps here are and who would be taking them. @cpcloud ?

Copy link
Member

@westonpace westonpace left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is confusing to me. It looks like two separate approaches mashed together. On the one hand, the proto feels like a very academic definition of an asof join that tries to make an asof join and a regular join categorically equivalent. The closest example I can find is https://clickhouse.com/docs/en/sql-reference/statements/select/join/ and they don't support tolerance so it's a bit easier.

  • If we are going to be academic about this then I think the terminology of on and expression is confusing. In SQL syntax a join is JOIN ... ON join_expression. The on and expression are the same thing. I prefer clickhouse's terminology which uses "equality condition" (which is scalar/stateless) and "closest match condition" (which is not, strictly speaking, stateless) and the "join expression" is equality condition AND closest match condition. They also limit closest_match_condition to <, <=, >, >= which makes a lot of sense (frankly, I think you could limit it to just <, <= and require the producer to switch the sides if they want > or >=.
  • In this very academic definition tolerance doesn't make sense. Isn't it just a specialization of the join expression? In other words, if your join expression is l_key == r_key and your tolerance is r_on - l_on < constant then you could just have a join expression (l_key == r_key) && (r_on - l_on < constant).

On the other hand, the markdown is describing a typical time-series join in the way that anyone familiar with asof join would expect to see.

IMO, the most important next step is to pick which of the two approaches we prefer. Frankly, I would prefer the more physical description described in the markdown over what we have described in the proto. We can call it a physical relation if we want.

// possible_match_count = 0
//
// # typically on is `left_key <= right_key`
// # typically tolerance is `right_key - left_key < CONSTANT`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is left_key - right_key < CONSTANT, at least that matches pandas.merge_asof's definition (and I'm pretty sure it matches what we have in Acero).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, thanks.

// # track possible matches so we know what to emit
// possible_match_count = 0
//
// # typically on is `left_key <= right_key`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is reversed. In the normal case (non-negative tolerance) your right keys will always be <= the left keys.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, thank you.

Comment on lines +211 to +212
// if possible_match_count and expression(left_row, right_right) and tolerance(left_row.key, right_row.key):
// yield left_row + right fields where match
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't actually work. First, match is undefined. Second, you can't apply expression this late in the process. Consider:

l_by l_on r_by r_on
9 1
10 2
10 1

Given a tolerance of 5 you should output:

l_by l_on r_by r_on
10 1 9 1

However, in this algorithm, right_row (which I assume is what is meant by right_right) will be [10, 1] and will not satisfy expression.

Since this is psedocode and we don't care about performance just apply expression inside the while loop.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First, match is undefined.

It's not meant to be a variable in this case. It's mean to indicate that fields from the right row should be "merged" (or appended or choose your verb) into the left row. where match is probably redundant.

Thanks for pointing this out. I'll correct it, and just evaluate all of the predicates as part of the while loop condition.

//
// # check that the join key (expression) matches and the tolerance is satisfied
// if possible_match_count and expression(left_row, right_right) and tolerance(left_row.key, right_row.key):
// yield left_row + right fields where match
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For clarity we should just say right row and not right fields. The default emit is the entire row, not just the payload.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, thanks.

RelCommon common = 1;
Rel left = 2;
repeated Rel right = 3;
Expression expression = 4;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is roughly equivalent to Acero's by_key I believe.

@@ -235,6 +235,40 @@ The join operation will combine two separate inputs into a single output, based
```


## AsOfJoin Operation

The as-of join operation is a time series operation that will combine a left input and a right input into a single output, based on a join expression, an `on` field and a tolerance value. All inputs must have the on-field in ascending-order. The operation is similar to a join operation where the join expression is used for exact matching whereas the `on` field is used for inexact matching up to the tolerance value.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both on and tolerance are expressions above and referenced here as field and value.

@@ -235,6 +235,40 @@ The join operation will combine two separate inputs into a single output, based
```


## AsOfJoin Operation

The as-of join operation is a time series operation that will combine a left input and a right input into a single output, based on a join expression, an `on` field and a tolerance value. All inputs must have the on-field in ascending-order. The operation is similar to a join operation where the join expression is used for exact matching whereas the `on` field is used for inexact matching up to the tolerance value.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"All inputs must have the on-field in ascending-order" is an odd thing to say given that on is an expression.

| Join Expression | A boolean condition that describes whether each record from the left set matches the record from the right set. Field references correspond to the direct output order of the data. | Required. Can be the literal True. |
| Post Join Filter | A boolean condition that describes which join rows appear in the output. | Optional, defaulting to True. |
| Join Type | Same as the [Join](logical_relations.md#join-operator) operator. | Required |
| Tolerance | The maximum on-field value difference in an inexact match. | Optional |
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is incorrect if tolerance is an expression.

| Post Join Filter | A boolean condition that describes which join rows appear in the output. | Optional, defaulting to True. |
| Join Type | Same as the [Join](logical_relations.md#join-operator) operator. | Required |
| Tolerance | The maximum on-field value difference in an inexact match. | Optional |
| On | The on-field. | Required |
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is incorrect if on is an expression.

RelCommon common = 1;
Rel left = 2;
Rel right = 3;
Expression expression = 4;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This field needs a comment:

a boolean expression that determines which rows are considered matches

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do, thanks.

@cpcloud
Copy link
Contributor

cpcloud commented Jan 20, 2023

This is confusing to me. It looks like two separate approaches mashed together.

That's probably because I haven't yet had a chance to update much on the markdown side.

On the one hand, the proto feels like a very academic definition of an asof join that tries to make an asof join and a regular join categorically equivalent. The closest example I can find is clickhouse.com/docs/en/sql-reference/statements/select/join and they don't support tolerance so it's a bit easier.

  • If we are going to be academic about this then I think the terminology of on and expression is confusing. In SQL syntax a join is JOIN ... ON join_expression. The on and expression are the same thing. I prefer clickhouse's terminology which uses "equality condition" (which is scalar/stateless) and "closest match condition" (which is not, strictly speaking, stateless) and the "join expression" is equality condition AND closest match condition. They also limit closest_match_condition to <, <=, >, >= which makes a lot of sense (frankly, I think you could limit it to just <, <= and require the producer to switch the sides if they want > or >=.

I don't follow the stateless/stateful distinction, in particular where any state comes in.

In any case, we can change the terminology.

I like the idea of calling the inequality closest_match_condition.

  • In this very academic definition tolerance doesn't make sense. Isn't it just a specialization of the join expression? In other words, if your join expression is l_key == r_key and your tolerance is r_on - l_on < constant then you could just have a join expression (l_key == r_key) && (r_on - l_on < constant).

Yep, that makes sense to me. I think we can remove tolerance.

On the other hand, the markdown is describing a typical time-series join in the way that anyone familiar with asof join would expect to see.

Yep, as I said elsewhere I haven't had a chance to rewrite it.

IMO, the most important next step is to pick which of the two approaches we prefer. Frankly, I would prefer the more physical description described in the markdown over what we have described in the proto. We can call it a physical relation if we want.

In my mind, the academic approach is more flexible. At the same time it's probably more annoying to work with. The main thing that I dislike about what you're calling the physical version is that it's implementation is motivated by existing code without considering the problem outside of that implementation. Calling it physical doesn't change that, I think just pushes the problem down an abstraction layer.

There's a tension here to be sure, people need to actually get stuff done. I'm aware of that. On the other hand, a more general approach is likely to work for systems that don't have the same exact physical API and representation as Acero.

I think I could maybe be convinced to move back towards the physical representation, but tolerance is too much of a wart IMO to stick around, Literals with all sorts of assumptions baked in are kind of a no-go for substrait IMO. How would you want to handle tolerance?

@westonpace
Copy link
Member

I don't follow the stateless/stateful distinction, in particular where any state comes in.

Understandable, I didn't explain much. That comment was motivated by the exercise "why couldn't this be expressed as a regular inequality join?" For example, what makes ASOF JOIN ... ON l.time >= r.time AND l.key == r.key different from JOIN ... ON l.time >= r.time AND l.key == r.key? The answer is (I think) that the closest_match_condition is a sort of window operator r.time == (MAX(r.time) WHERE r.time <= l.time && r.time >= l.time - tolerance) and so in my mind it is a "stateful" because the closest_match_condition has to consider the entire window from l.time - tolerance to l.time.

In fact, this question I believe (I don't really know Spark) describes how to implement an asof join using a union and a window function. I think this statefulness is the fundamental difference between join and asof join.

In my mind, the academic approach is more flexible. At the same time it's probably more annoying to work with. The main thing that I dislike about what you're calling the physical version is that it's implementation is motivated by existing code without considering the problem outside of that implementation.

I disagree the physical version is motivated purely by code. It is usually motivated by the specific problem "I want to know what the most recent ticker values were when trades happened". That leads to a very natural understanding of what I called the physical implementation. I think the fact that the top 5 google results12345 for asof join sound more like the physical version than the logical version are evidence of this. This is including the kdb version which at least tries to be domain independent.

At the same time it's probably more annoying to work with.

I don't know if annoying is the right word. It's going to lead to lots of consumers that only partially implement the spec. I think this is a more general problem with logical relations though.

On the other hand, a more general approach is likely to work for systems that don't have the same exact physical API and representation as Acero.

That's why I called it physical :) That being said, I know of 4 implementations (pandas, kdb, clickhouse, acero) that would be able to satisfy that physical implementation.

To be clear, I am perfectly fine with eventually supporting both a logical and a physical relation. So this isn't "which one do we want" as much as "which one is this PR trying to provide?"

@cpcloud
Copy link
Contributor

cpcloud commented Jan 20, 2023

Statefulness aside :) How should we proceed? I still don't know how tolerance fits in. A literal seems like it bakes in too many assumptions, and in the more general form it's not necessary.

@westonpace
Copy link
Member

I've done some more reading and I believe what we are stumbling towards is a window join. Asof join is a specialization of window join. Tolerance is how asof join defines the windows.

@rtpsw and myself have an engine that is capable of performing asof join but not window join. There are other engines (e.g. pandas) that are in this same situation. AFAIK, no engine other than kdb actually has an implementation of the more generic window join.

Statefulness aside :) How should we proceed?

If Acero only has asof join and Ibis only wants window join then I see no path forward. I don't think it makes sense to have a specification without two implementations. Either we wait until Acero adds window join (not likely in the near future), Ibis supports asof join (don't know what this entails) or someone writes an optimizer/translator that supports both.

@julianhyde
Copy link

I’m an expert in streaming/temporal query and I don’t share your faith that window join will solve all problems. There are a lot of gnarly query patterns for streaming/temporal join. If you try to put them all under the banner of “window join” you’ll end up with a very complex definition of “window”.

My advice is to deal with the common cases separately and don’t try to force them to converge.

@EpsilonPrime
Copy link
Member

We now have ConsistentPartitionWindowRel defined and merged. Does that meet the requirements of AsOfJoinRel?

@westonpace
Copy link
Member

We now have ConsistentPartitionWindowRel defined and merged. Does that meet the requirements of AsOfJoinRel?

Nope. A window join might do the trick but ConsistentPartitionWindowRel is for "projection-like" window operations (e.g. calculating new columns) and not for joining multiple inputs together.

Even if a window join were defined we would probably still want an asof-join physical relation for the reasons that @julianhyde alluded to.

For evidence I will point to:

  • Acero - supports asof join but not a generalized window join
  • DuckDb - supports asof join but not a generalized window join
  • Polars - supports asof join but not a generalized window join

@westonpace
Copy link
Member

@EpsilonPrime if we are going to revive this then I would propose a good starting point is to come up with python pseudo-code that we can all agree is correct. The current attempt would yield the incorrect answer if the on expression (which I think we agreed to rename) is of the form left <= right or left < right.

The current pseudocode also does not correctly handle expression. I had suggested moving expression inside the while loop but that does not actually work either.

The second thing we need to do is decide how to handle tolerance. Neither duckdb or clickhouse support tolerance. However, both pandas and acero do. I believe one way to handle it would be to make tolerance part of the expression. In other words, support something like:

expression = "left.ticker == right.ticker && left.time - right.time < tolerance"

I think I am agreeing more with @cpcloud these days. Let's just make this a logical asof join and get rid of tolerance entirely. The producer, if they want it, can encode it in the expression. The producer, if they don't support it, can reject the expression (both duckdb and clickhouse mandate the expression consist only of equality conditions).

@jacques-n
Copy link
Contributor

I'm closing this as abandoned. Can reopen if someone is actually working on it.

@jacques-n jacques-n closed this Aug 1, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.