
feat(proto): add protobuf support for user-defined-function extension #234

Closed
wants to merge 1 commit

Conversation

@rtpsw rtpsw commented Jun 27, 2022

For #233


rtpsw commented Jun 27, 2022

Rationale for the proposed changes:

  • Why UDF as an extension function? A UDF is a function definition that is basically the same as a predefined (extension) function except that its implementation is embedded within the Substrait plan. It is not the same as an EmbeddedFunction, which describes a function invocation along with its implementation. In Arrow Substrait (the Substrait-consumer component of Arrow), a UDF must be registered the same way as a predefined function in order for it to be accessible within expressions, like the ones originating from the Substrait plan.
  • Why the specific fields in message UserDefinedFunction? The input_types and output_type fields are necessary for schema checking. The summary and description fields are used at least by Arrow Substrait's function registration; in the case of a Python-implemented UDF, they have natural defaults that make specifying them in the Substrait plan optional. The code is base64-encoded to fully define the byte-to-string conversion, and is produced with cloudpickle to support Python-implemented UDFs (see the sketch below). In the future, other formats could be supported by adding a format field.
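
A minimal sketch of this encoding, assuming cloudpickle and pyarrow are installed; the function shown is illustrative and not part of this PR:

import base64

import cloudpickle  # third-party pickler that can serialize arbitrary functions

def add_one(v):
    import pyarrow.compute as pc
    return pc.add(v, 1)

# Producer side: fill the proposed string `code` field.
code = base64.b64encode(cloudpickle.dumps(add_one)).decode("ascii")

# Consumer side: recover the callable from the plan.
fn = cloudpickle.loads(base64.b64decode(code))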

Note that a complete UDF workflow using the proposed changes has been locally implemented and tested to work using Ibis + Ibis-Substrait + (Py)Arrow.

One alternative discussed with @cpcloud that he asked be described here involves reusing as many of the existing protobuf definitions for EmbeddedFunction as possible but still separate function-definition from function-invocation due to the rationale discussed above. This would require some refactoring because currently these protobuf definitions are in algebra.proto and this alternative would need to use them from extensions.proto.

message UserDefinedFunction {
// A serialization of the user-defined function's code. The default serialization
// format is using base64 of cloudpickle.dumps().
string code = 1;
Contributor:

Why use a UTF-8 string and then use base64 when you can just use a protobuf binary (bottom of the table)? Generally, I feel like this should be a protobuf Any, which pretty much just is a standardized format for a binary and a string that acts as a format specification.

Author:

The reason is no longer relevant, so I'll fix this and then base64 would not be needed. I think the type can be bytes. The format of the bytes would be cloudpickle by default.

Author:

Note that if we go with the above-described alternative then cloudpickle format would be default only in the PythonPickleFunction case.

@jacques-n

Hello. There is already a sketch of how to do this in the spec under embedded functions. It would be great to start from the sketch as well as update it as you see fit. We don't call this a UDF as we consider a UDF to be a function that is registered once and then referred to by some sort of identifier. Embedded functions are situations where the plan contains the actual code of the function.


rtpsw commented Jun 28, 2022

Hello. There is already a sketch of how to do this in the spec under embedded functions. It would be great to start from the sketch as well as update it as you see fit. We don't call this a UDF as we consider a UDF to be a function that is registered once and then referred to by some sort of identifier. Embedded functions are situations where the plan contains the actual code of the function.

Yes, this is basically the alternative, and the distinction between an embedded function and a UDF, noted above. Is it already in use by some Substrait producer or consumer? Indeed, the proposed UserDefinedFunction is intended to be registered once and potentially referenced multiple times. In this alternative, the existing definitions under EmbeddedFunction would need to be refactored to a common place and reused by both EmbeddedFunction and UserDefinedFunction.

I'd like to first reach agreement on which alternative to go with.


icexelloss commented Jun 29, 2022

@jacques-n Thanks for the feedback. The difference between an Embedded Function and a UDF (or whether that distinction is needed) is not obvious to me, so I would like to understand it further.

Here is why I don't see a clear difference between the two:
In Spark, for example, you can define and use a User Defined Function by

@pandas_udf(...)
def my_udf(...):
    return ...

df = df.withColumn('a', my_udf(df['b']))

I am guessing this is considered an Embedded Function?

But you can also register this function in order to use it in a SQL string

spark.udf.register('my_udf', my_udf)
spark.sql("SELECT my_udf(b) FROM table")

To me, registration seems to be a minor distinction between the two use cases (they differ only in whether the UDF can be used in a SQL string), and the rest of the implementation is the same in both cases.

Therefore, the rationale behind the distinction does not seem very clear to me - perhaps you can give some examples justifying the separate concepts of "UDF" vs "Embedded Function", and say which one is better suited for the Spark/Ibis Python UDFs?


rtpsw commented Jun 29, 2022

@icexelloss, I'll let @jacques-n answer and just note my understanding that EmbeddedFunction, like an inline invocation, has its definition expanded into the Substrait plan per invocation whereas UserDefinedFunction has its definition appearing just once in the Substrait plan.

@jacques-n

@rtpsw is right on the money. Substrait distinguishes between embedded/inline functions versus function references (aka user defined functions). User here is a user of Substrait and shouldn't be confused with the user of Spark. Layers above Substrait are free to call both of these things UDFs.


icexelloss commented Jun 29, 2022

Gotcha. Sounds like EmbeddedFunction is what should be used for both Spark and Ibis UDFs then. A follow-up question: if I understand correctly, the implementation (i.e., the cloudpickled bytes) lives inside the EmbeddedFunction message itself and can be repeated in the Substrait protobuf. This could cause issues when the function is called on many different input expressions, and I wonder if it makes sense to lift the implementation out of the main protobuf and use a reference to the implementation inside the EmbeddedFunction message? (I am not sure if Substrait supports this pattern already.)

@jacques-n

Gotcha. Sounds like EmbeddedFunction is what should be used for both Spark and Ibis UDFs then. A follow-up question: if I understand correctly, the implementation (i.e., the cloudpickled bytes) lives inside the EmbeddedFunction message itself and can be repeated in the Substrait protobuf. This could cause issues when the function is called on many different input expressions, and I wonder if it makes sense to lift the implementation out of the main protobuf and use a reference to the implementation inside the EmbeddedFunction message? (I am not sure if Substrait supports this pattern already.)

Yeah, that's how it is right now. I'm open to a pattern of references similar to other function references where the embedded bodies are stored at the root of the plan.


icexelloss commented Jun 29, 2022

@rtpsw Sounds to me like EmbeddedFunction is the way to go. IMO, we probably want to make some changes to the current EmbeddedFunction message, i.e., separate the definition from the invocation and lift the definition out, e.g., something like

message EmbeddedFunctionDefinition {
  // Stored at the root of the plan
  string id = 1;
  oneof kind {
    PythonPickleFunction python_pickle_function = 4;
    WebAssemblyFunction web_assembly_function = 5;
  }

  message PythonPickleFunction {
    bytes function = 1;
    repeated string prerequisite = 2;
  }
}

message EmbeddedFunction {
  repeated Expression arguments = 1;
  Type output_type = 2;
  string function_id = 3; // Reference the embedded function by id
}

What do you think?

@jacques-n

@rtpsw Sounds to me EmbeddedFunction is the way to go... What do you think?

The concepts are reasonable but you should use the same anchor/reference system we use elsewhere in the plan (for normal functions, user defined types, etc).

For the Python Pickle function, you also need to write up some documentation saying what the signature of a Python function must be to be allowed to use arguments of a certain type. For example, if the arguments are VARCHAR<24>, DECIMAL, what should the arguments be within Python? It is important that a second system could produce and/or consume the same Python pickle packaging, so we need to be clear about not only dependencies but all of the type mappings we support to start. I'd also suspect that you should describe the version of Python that the pickle function was created with. (I'm thinking that prerequisites should maybe be tuples of requirement name and version number/range/etc.?) Would love Python people to fill this out to make sure. As I said, the Python embedded functions should be defined well enough to be used in multiple systems. (If that requires them using a Substrait Python SDK, I think that would be fine.)


rtpsw commented Jun 30, 2022

@icexelloss, I'm fine with trying this general approach. @jacques-n, I understand what you're trying to push for; however, I think this would make the current focused issue (whose resolution would unblock a path on my UDF project) into a much larger one that involves setting standards and reaching agreement about them. My view is that since the existing EmbeddedFunction is currently acceptable without such standards, a simple refactoring of it (in which definition and usage are separated) should also be acceptable, and the standards-setting should be deferred to a separate issue. Thoughts?

Having said that, here are some thoughts about your points:

  • I expect the mapping of argument types to Python would be consistent with the PyArrow type system.
  • Regarding versions, my understanding is that it depends on (the compatibility guarantees of) the pickler. cloudpickle must be installed for unpickling, at least in order to reduce pickling size, but this does not seem to be mandated by all picklers (see here). Also note that some Substrait plans are intended for immediate delivery and consumption rather than for long-term storage, and the versioning question is simpler in their case. The choice of pickler can reflect this.


cpcloud commented Jun 30, 2022

Agree on the need to write up the type mapping for the UDFs. One question is: what data are coming into the UDF? Does/should Substrait have an opinion here? Without something like "Arrow data are coming in", there's no way that two independent systems could ever hope to execute the same plan if it contained a UDF, dependencies aside.

I'd also suspect that you should describe the version of python that the pickle function was within

Maybe, but the pickle protocol number should be sufficient.

we need to be clear about not only dependencies

This seems well outside the scope of substrait and deep into provisioning and dependency management. Requiring that two arbitrary consumers that have been deployed independently be able to execute the same plan that includes a Python UDF doesn't seem feasible to solve anywhere, let alone in Substrait.

A list of requirements is barely enough to get a Python environment to work.

What if one consumer is running on x86-64 and the other on arm64 and there are native dependencies like numpy in the mix?

What if both are on x86-64 but with C++ dependencies compiled with different, incompatible C++ standard libraries?

IMO it's up to the systems that want to share plans with Python UDFs to make sure their dependencies are compatible by using modern deployment and provisioning tools.


rtpsw commented Jun 30, 2022

Agree on the need to write up the type mapping for the UDFs. One question is: what data are coming into the UDF? Does/should Substrait have an opinion here? Without something like "Arrow data are coming in", there's no way that two independent systems could ever hope to execute the same plan if it contained a UDF, dependencies aside.

This definitely has to be Arrow data. The type system is already in place; it is used for inputs and outputs of extension functions, and it should be reused for the UDFs we are discussing here.

IMO it's up to the systems that want to share plans with Python UDFs to make sure their dependencies are compatible by using modern deployment and provisioning tools.

Agreed. A common use case is delivering a Substrait plan for execution at a server/cloud host. In this case, it's normal for the caller to be responsible for a consistent distributed setup. Since there's no dependency management problem for Substrait here, we can focus on UDF definitions in this issue.

Note that the more difficult dependency management problems arise when Substrait plans are intended for long-term storage, but this is a separate topic.

@jacques-n

This seems well outside the scope of substrait

I'm not convinced of this. Definitely not without substantially more discussion around the design and vision of embedded operations. Without it, it's much harder to argue that a Substrait plan with an embedded function is well-intentioned and compatible across multiple systems. Let's make sure to keep "hard to do" distinct from "not preferred". We can decide to not do something because it is hard. But is this actually hard in common cases? (I have no doubt it is hard in some cases, but let's keep those "bucket of bytes" cases as rare as possible.)

Python Pickle version

I don't know pickle. Experts must say what the right version information is. Just seems like we definitely need concepts around version.

Arrow...

I think that is one model and it makes sense to support it. However, from a Substrait pov, is it the first model we should introduce? Why not introduce something that works which defines a default python primitive to substrait type mapping to begin with to expose simple examples?

current focused issue...

I don't think the issue here is narrow. Formalizing embedded functions into the Substrait spec should be seen as a meaningful undertaking. The content in the current spec around this topic has been marked as a "sketch" and hasn't been meaningfully touched since it was originally sketched 10 months ago. It lists a bunch of open questions that we should resolve in order to formalize it.

I don't want to create a giant barrier to getting this done. At the same time, it seems worthwhile to put together a design and then figure out the bite-sized steps to get us to a good outcome. It's important to make sure that the proposal works for many different Substrait users.


cpcloud commented Jun 30, 2022

This seems well outside the scope of substrait

I'm not convinced of this. Definitely not without substantially more discussion around the design and vision of embedded operations. Without it, it's much harder to argue that a Substrait plan with an embedded function is well-intentioned and compatible across multiple systems. Let's make sure to keep "hard to do" distinct from "not preferred". We can decide to not do something because it is hard. But is this actually hard in common cases? (I have no doubt it is hard in some cases, but let's keep those "bucket of bytes" cases as rare as possible.)

I don't think it's a bucket of bytes situation, I think it's a "this is a really difficult problem without substrait involved and adding another technology's opinions into the mix is a bad idea"-kind of problem.

If I'm understanding you correctly, it sounds like you're saying that a plan must work across arbitrary consumer environments to be a valid plan. If so, I think this is an astronomically high barrier to using UDFs, and it will prevent people from using UDFs at all.

Can you describe what a substrait consumer and producer would do with a list of dependencies and their version constraints?

It might also help to clarify what "the common case" is here. In my experience it's not easy to enumerate what "the common case" means for dependency management.

Python Pickle version

I don't know pickle. Experts must say what the right version information is. Just seems like we definitely need concepts around version.

I've used pickle quite a bit. The documentation is good and provides detailed information about versioning.

I don't understand what it means to say an expert must say what the right version information is. What is an expert here? Pickle already contains versioning information and compatibility guarantees around Python versions. I'm not saying we shouldn't include Python version, only that we should look at what already exists and use it as is if we can.

Arrow...

I think that is one model and it makes sense to support it. However, from a Substrait pov, is it the first model we should introduce? Why not introduce something that works which defines a default python primitive to substrait type mapping to begin with to expose simple examples?

I was using Arrow as an example of one choice, not as the thing we should necessarily start with. We can start with Python objects, that seems fine if horribly inefficient.


rtpsw commented Jul 1, 2022

Here's a detailed (though not complete) technical proposal to start a concrete discussion. If it seems to be in the right direction, I could work on a PR for it. I designed it with the following in mind:

  • Separate function (global) definition from (local) invocation. Avoid specifying a function's body at multiple call-sites.
  • Ensure a function has a type. I am not sure whether we want the input and/or output types to be statically defined or dynamically determined at the call-site; below I opted for the former.
  • Rely on the existing type system in type.proto. Use Type to capture both input and output types of the functions.

In proto/substrait/type.proto:

message FunctionType {
  repeated Type input_type = 1;
  Type output_type = 2;
}

In proto/substrait/function.proto (new file):

import "proto/substrait/type.proto";

message FunctionBody {
  oneof kind {
    PythonPickleFunction python_pickle_function = 1;
    WebAssemblyFunction web_assembly_function = 2;
  }

  message PythonPickleFunction {
    bytes function = 1;
    repeated string prerequisite = 2;
  }

  message WebAssemblyFunction {
    bytes script = 1;
    repeated string prerequisite = 2;
  }
}

message FunctionDefinition {
  FunctionType type = 1;
  FunctionBody body = 2;
}

message FunctionDoc {
  string summary = 1;
  string description = 2;
}

In proto/substrait/extensions/extensions.proto:

message SimpleExtensionDeclaration {
  ...
  message ExtensionFunction {
    ...
    UserDefinedFunction udf = 4;

    message UserDefinedFunction {
      FunctionDefinition def = 1;
      FunctionDoc doc = 2;
    }
  }
}

In proto/substrait/algebra.proto, remove EmbeddedFunction because ScalarFunction can already be used to reference an ExtensionFunction having a UserDefinedFunction.
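
A hypothetical usage sketch of the proposed messages, assuming Python bindings generated by protoc from the function.proto sketched above; none of these bindings exist yet, and all names come from this proposal:

import cloudpickle

def twice(v):
    import pyarrow.compute as pc
    return pc.multiply(v, 2)

# Build the proposed FunctionBody with a pickled Python implementation.
body = FunctionBody()  # hypothetical generated class
body.python_pickle_function.function = cloudpickle.dumps(twice)
body.python_pickle_function.prerequisite.append("pyarrow")

# Attach it to a definition; the FunctionType fields are elided here.
definition = FunctionDefinition(body=body)  # hypothetical generated class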

@jacques-n

I don't understand what it means to say an expert must say what the right version information is. What is an expert here?

I'm saying someone who knows Python serialization like yourself and several others on the list (and not me). If a pickle version alone is sufficient on the interpreter side, awesome. I just want to make sure that we have a clear and reliable versioning specification/approach.

@icexelloss

IMO there are reasonable solutions to the issues raised above.

Agree on the need to write up the type mapping for the UDFs. One question is: what data are coming into the UDF? Does/should Substrait have an opinion here? Without something like "Arrow data are coming in", there's no way that two independent systems could ever hope to execute the same plan if it contained a UDF, dependencies aside.

I think this is something that the producer can specify inside the EmbeddedFunction definition. The types of UDFs I have in mind are "vectorized UDFs", i.e., the UDF takes a vector of values and returns a vector of values (map) or a scalar value (aggregate). For this type of UDF, I think using Arrow data is reasonable to start, with a field in the EmbeddedFunction to specify the "data type". For UDFs that take a single value and return a single value, a Python object is probably the only choice, but I don't think there is much use for that type of UDF now because of performance.

I'm not convinced of this. Definitely not without substantially more discussion around the design and vision of embedded operations. Without it, it's much harder to argue that a Substrait plan with an embedded function is well-intentioned and compatible across multiple systems. Let's make sure to keep "hard to do" distinct from "not preferred". We can decide to not do something because it is hard. But is this actually hard in common cases? (I have no doubt it is hard in some cases, but let's keep those "bucket of bytes" cases as rare as possible.)

I think perhaps we can support two ways here.

  • We can include a message that is effectively a requirements.txt at the root of the Substrait plan, allowing consumers to recreate the Python environment; the EmbeddedFunction then doesn't need to specify any further dependency / Python environment information.
  • We can include a message that is a path to a Python executable. This is useful when the producer / consumer has access to a shared Python executable (we have an internal system setup to deploy this, for example).

I think these two options don't solve all the cases but are a reasonable start that covers the common ones. @jacques-n @cpcloud @rtpsw WDYT?

(Just as a side note: for the use case of this in Arrow, the "environment" here doesn't really matter because the producer/consumer share the same Python environment.)


rtpsw commented Jul 5, 2022

For UDFs that take a single value and return a single value, a Python object is probably the only choice

I think even in this case, the single value should have one of the types defined in the Substrait type system.

but I don't think there is much use for that type of UDF now because of performance.

This part I agree with.

  • We can include a message that is effectively a requirements.txt
  • We can include a message that is a path to a Python executable

These are reasonable options assuming the versioning problem is about dependencies; however, @cpcloud says the dependency problem is not in scope for Substrait. From the discussion leading to this post by @jacques-n, my understanding is that the versioning problem is about serialization compatibility, i.e., ensuring that the pickled UDF can be correctly unpickled by various Substrait consumers. In principle, picklers should take care of this compatibility, yet Substrait should have appropriate definitions for it.
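
As a stdlib-only illustration of what such compatibility metadata already looks like: pickle payloads carry their protocol number up front, so a consumer can inspect it before unpickling (a sketch, not a proposed API):

import pickle
import pickletools

payload = pickle.dumps({"x": 1}, protocol=5)

# With protocol >= 2, the payload starts with a PROTO opcode followed by
# the protocol number, so compatibility can be checked before loading.
assert payload[0:1] == pickle.PROTO and payload[1] == 5
pickletools.dis(payload)  # the disassembly begins with "PROTO 5"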


icexelloss commented Jul 5, 2022

Some other thoughts regarding dependency

I have the same feeling that no matter what we include in the Substrait plan in terms of Python dependencies, it's still hard to make these plans work across multiple systems. As a thought experiment: if I have something that produces such a plan and passes it to Spark, even if I fully specify the versions of the packages I need, Spark would still need to install these packages on its workers, which isn't something that Spark can do right now. Therefore, even if we come up with a spec here, we won't truly know whether it works until we try to pass plans across systems.

Another reason I think we shouldn't handle Python libraries and dependencies in Substrait is that systems like Spark/Dask don't themselves handle these as part of the "query plan": if a user of Spark has a Spark script that constructs a query with UDFs, the library requirements to run this query (Python or Scala) are not part of the Spark script or the query itself, and are controlled by the parameters used to start up the Spark cluster. I think most systems handle dependencies this way - i.e., not as part of the query plan but rather as part of "how to set up the system". Since Substrait is a representation of the query plan, IMO it's not a concern of Substrait either.

It's true that this makes the consumer of the Substrait plan responsible for making sure it has the dependencies needed to execute it, but I think this is reasonable because, similarly, whoever takes a Spark script and runs it needs to make sure they have the proper dependencies.

This makes me lean towards punting on this problem for now; the UDF spec is already useful without the dependency part IMO.

Edit: Add additional reasons for not handling dependencies in substrait.


rtpsw commented Jul 6, 2022

@cpcloud, @jacques-n: back to you. How should we move forward? If my proposal here is a reasonable starting point, I could get it ready and upload it here.


icexelloss commented Jul 13, 2022

@jacques-n @cpcloud I haven't heard back from you for a while. I'll try to summarize what I think is a way forward and wonder what you think.

Re @cpcloud's comments on "object type that is passed in and out of the UDF": I think this is reasonable to add to the UDF definition protobuf (details pending: do we need just one type for the input/output, or one type for each input argument?)

Re @jacques-n's comments on "dependencies and versions for the UDF": I think it is best not to solve this problem here. (Elaboration of the reasoning: #234 (comment))

Does this sound like a reasonable way forward?

@jacques-n

@icexelloss we had some decent conversations about this at the last sync. In general, I think we have to work further on making things "self contained". This means having some sort of dependency definition (amongst other things). I think @cpcloud had some ideas of how to do this.

@icexelloss

I see - thanks for the update here. What's the best way for us to track this work? (Since this is the blocker for us now, we'd like to get involved and help out if we can.)

@jacques-n

I'd love for @cpcloud to chime in as I'm not an expert here. I think his suggestion was that we support dependencies for python defined as a pip lock file or a conda lock file to start(?).

I definitely think the other items - how we express the function signature (argument types, etc.) for the Python code, and how we declare the entry point into the Python code - also need to be defined.

It would be great for you to put together a proposal for each of these items as markdown as part of this patch.


cpcloud commented Jul 15, 2022

@icexelloss @rtpsw I think your proposal is a good starting point.

I think the minimum requirements are:

A way to specify dependencies:

The main point of including support for this is for the plan to be self contained. It's most definitely an optional field. I'm not 100% sold on it, but I'm willing to accept it to move this forward.

  • The python executable path seems like a good idea. It's a lightweight way to encode the case where you control the environment and probably will be the most common scenario.
  • Some kind of oneof message (plus a field for the actual lockfile content) for different lockfile types: conda, requirements, poetry would cover a large range of the ecosystem

A way to specify the expected input data kind

  • This would probably be a oneof message (or perhaps an enum) indicating Arrow or Python
  • This should be independent of the function signature.

A way to specify the entrypoint

  • Somehow the consumer needs to know what thing to import and then call, so we need to add some sort of entry_point field to indicate this.


rtpsw commented Jul 18, 2022

Addressing the items by @cpcloud:

  • Dependencies: I'll try to work out some proposal here. The main change this seems to lead to is the need to launch a Python interpreter, which is currently not done by the PyArrow consumer, in order to pick up the Python executable or the locked dependencies. Basically, one needs to load and interpret the Substrait plan in order to figure out how to set up the interpreter, which doesn't seem ideal. Also, we would need to decide what a missing option means - should a default Python interpreter be launched, or should the current interpreter be used when one already exists (during loading of the plan)?
  • Input data kind: I'm confused about what you have in mind, because Substrait already defines a type system that is used by (non-UDF) function extensions, and I think it is also applicable to UDF extensions. If by a Python kind you mean that the Python UDF could return a general Python value, I'm missing how that works for an extension function that, AFAIU, is supposed to have a return type from the Substrait type system.
  • Entry point: I have working local experimental Ibis/Substrait/Arrow code that supports UDFs used within an expression (as in this issue) and within a read-relation (for user-defined data sources, an upcoming enhancement). In this code, the entry point is defined by convention: a UDF used within an expression is a Python function that gets typed inputs and returns a typed output, while a UDF used within a read-relation is a Python function that returns an Arrow stream. If additional kinds of entry points are needed, then a field can be added with values that cover the entry-point kinds I just described. Also, the function's code can include import statements to allow it to work standalone, after pickling and unpickling, such that the Substrait plan remains self-contained; it's not clear to me that putting these imports within a new Substrait plan field instead is a better way of handling entry points.


cpcloud commented Jul 18, 2022

Addressing the items by @cpcloud:

  • Dependencies: I'll try to work out some proposal here. The main change this seems to lead to is the need to launch a Python interpreter, which is currently not done by the PyArrow consumer, in order to pick up the Python executable or the locked dependencies. Basically, one needs to load and interpret the Substrait plan in order to figure out how to set up the interpreter, which doesn't seem ideal. Also, we would need to decide what a missing option means - should a default Python interpreter be launched, or should the current interpreter be used when one already exists (during loading of the plan)?

Perhaps a SAME_PROCESS enumeration variant would work here.

  • Input data kind: I'm confused about what you have in mind, because Substrait already defines a type system that is used by (non-UDF) function extensions, and I think it is also applicable to UDF extensions. If by a Python kind you mean that the Python UDF could return a general Python value, I'm missing how that works for an extension function that, AFAIU, is supposed to have a return type from the Substrait type system.

There's a difference between the type system in Substrait, and the kinds of objects a UDF operates on. For example, a consumer could call a scalar UDF on every element of input(s), or it could send vectors of the input(s). The producer should specify which kind of input the UDF expects so that the consumer can do the right thing. Perhaps "calling convention" is a better name for this, rather than "kind"?

  • Entry point: I have working local experimental Ibis/Substrait/Arrow code that supports UDFs used within an expression (as in this issue) and within a read-relation (for user-defined data sources, an upcoming enhancement). In this code, the entry point is defined by convention: a UDF used within an expression is a Python function that gets typed inputs and returns a typed output, while a UDF used within a read-relation is a Python function that returns an Arrow stream. If additional kinds of entry points are needed, then a field can be added with values that cover the entry-point kinds I just described. Also, the function's code can include import statements to allow it to work standalone, after pickling and unpickling, such that the Substrait plan remains self-contained; it's not clear to me that putting these imports within a new Substrait plan field instead is a better way of handling entry points.

An example of what you already have would really help me understand what you're describing. It's difficult to understand the description without something to reference.


rtpsw commented Jul 18, 2022

Perhaps a SAME_PROCESS enumeration variant would work here.

OK, I'll try this.

There's a difference between the type system in Substrait, and the kinds of objects a UDF operates on. For example, a consumer could call a scalar UDF on every element of input(s), or it could send vectors of the input(s). The producer should specify which kind of input the UDF expects so that the consumer can do the right thing. Perhaps "calling convention" is a better name for this, rather than "kind"?

Would "input-shape" be a good term for what you're describing? We currently have analytic, elementwise, and reduction UDFs that are all scalar functions. In general, invocations of scalar functions differ in both input-shape and output-shape. For example, one invocation of an elementwise function could map a scalar shape to a scalar shape whereas another invocation of the same elementwise function could map a vector shape to a vector shape. Note that I'm working on adding tabular UDFs, where the output-shape could be 2-dimensional.

An example of what you already have would really help me understand what you're describing. It's difficult to understand the description without something to reference.

I've answered this elsewhere to point you to my work in progress.


cpcloud commented Jul 18, 2022

Perhaps a SAME_PROCESS enumeration variant would work here.

OK, I'll try this.

There's a difference between the type system in Substrait, and the kinds of objects a UDF operates on. For example, a consumer could call a scalar UDF on every element of input(s), or it could send vectors of the input(s). The producer should specify which kind of input the UDF expects so that the consumer can do the right thing. Perhaps "calling convention" is a better name for this, rather than "kind"?

Would "input-shape" be a good term for what you're describing? We currently have analytic, elementwise, and reduction UDFs that are all scalar functions. In general, invocations of scalar functions differ in both input-shape and output-shape. For example, one invocation of an elementwise function could map a scalar shape to a scalar shape whereas another invocation of the same elementwise function could map a vector shape to a vector shape. Note that I'm working on adding tabular UDFs, where the output-shape could be 2-dimensional.

Input shape isn't what I'm describing.

In the simplest SQL engine (ignoring window functions and table functions) there are two kinds of functions

  • scalar functions whose input shape is S and whose output shape is also S.
  • aggregate functions whose input shape is S and whose output shape is 1.

Semantically, scalar functions take a single value in and produce a single value.

The physical representation of those column elements has no effect on the semantic "shape" of the function's output. I am trying to capture this fact here. The UDF must communicate whether it expects to be called with a single scalar value per argument or with a vector per argument.

Imagine a UDF that adds one to every value in a column, using the per-row calling convention:

def add_one(x):
    return x + 1

With a calling convention of SCALAR (not sold on the name at all) the function would be invoked with a single value.

Now imagine the same UDF using VECTOR (again not sold on the name):

import pyarrow.compute

def add_one(x):
    return pyarrow.compute.add(x, 1)

This would have no effect on the execution results, but the execution itself will differ quite a bit.

We need to capture this variation in the information produced by the producer so the consumer knows how to call the function and also how to assemble the result of calling it.
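
A minimal consumer-side sketch of the distinction described above, assuming pyarrow; call_udf and the SCALAR/VECTOR names are hypothetical, taken only from this discussion:

import pyarrow as pa

def call_udf(udf, column, convention):
    if convention == "SCALAR":
        # One Python-level call per element; results reassembled into an array.
        return pa.array([udf(v.as_py()) for v in column])
    if convention == "VECTOR":
        # A single call on the whole vector; the UDF returns an Arrow array.
        return udf(column)
    raise ValueError(f"unknown calling convention: {convention}")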

An example of what you already have would really help me understand what you're describing. It's difficult to understand the description without something to reference.

I've answered this elsewhere to point you to my work in progress.

Hm, I was looking for an example of the Python code. Can you put the example (or a link to it) here? Apologies if you have to repeat yourself, but it would be good to have the relevant examples with the discussion in one place.

@jvanstraten

Input data kind: I'm confused about what you have in mind, because Substrait already defines a type system that is used by (non-UDF) function extensions, and I think it is also applicable to UDF extensions.

Please bear in mind that Substrait's type system is entirely abstract: Substrait does not define any representation (in memory or otherwise) of those types, except the protobuf format it uses to represent literals. Formally, a consumer is free to use whatever internal data representation they want, as long as:

  • said representation can represent at least the set of values that Substrait defines that must be representable;
  • any operations on such values within that set should yield values that are still in that set (unless, I guess, the result is explicitly unspecified, but I don't think there are any instances of that today).

For example, it's perfectly fine for all integer types to be represented with some bigint, or for floats and doubles to be represented using quads or whatever, as long as 127i8 + 127i8 "overflows correctly" rather than yielding 254i8 (since that doesn't exist as far as Substrait is concerned). An i8 is obviously fairly likely to be represented as a two's complement byte, but for timestamps and intervals, for example, the format is far less obvious.
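
For instance, with Arrow as one possible physical representation, checked int8 arithmetic refuses to produce 254i8 (a sketch assuming pyarrow):

import pyarrow as pa
import pyarrow.compute as pc

a = pa.array([127], type=pa.int8())
try:
    pc.add_checked(a, a)  # 254 does not fit in int8, so this raises
except pa.ArrowInvalid as e:
    print(e)  # reports an overflow instead of yielding 254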

Therefore, just saying that UDFs should use Substrait's type system is not sufficient. Some physical* representation is needed in addition. IMO the most obvious candidate for this would be Arrow, since many systems have been moving toward that over the past few years already... but Substrait's and Arrow's type systems are not 100% compatible. Just using Python objects also isn't sufficient unless you specify which ones (for integers for example, int? some ctype? something from numpy? or maybe a list of ints for slightly more efficient vectorization?). I think the most obvious format to start with would be to use the protobuf literals that are already defined because it's comparably easy, but obviously this would be horribly inefficient in practice, so more efficient formats would need to be added over time.

I think Jacques was suggesting requiring the use of some Substrait-controlled API to access values. I guess that would also work... ish. IMO it mostly just shifts the problem to the signatures of the functions defined by that API, and would try to solve exactly the problem that Arrow was created to solve. But I might be misunderstanding or misremembering what was said.

To me it would make the most sense if:

  • the calling convention would be specified with the UDF by some enum or oneof;
  • for it to be the responsibility of the consumer to convert between the UDF representation and its internal representation if needed; and
  • for it to be the responsibility of the producer or user to ensure that this conversion is not necessary if they prefer performance over consumer compatibility, by using the native format of the consumer they're targeting if possible.

* please excuse the overloaded term. I don't know what else to call it.


rtpsw commented Jul 18, 2022

It sounds like what both of you have in mind is an ABI (though in a data-scientific sense that is narrower than usual), since you're describing the concrete input and output formats of a function. Is this a correct understanding? In my local work, the Substrait consumer didn't have to deal directly with an ABI because it (as well as the shapes I mentioned) was implied by the kind of function used, like elementwise or analytical. I guess you're asking to make this ABI explicit in the Substrait plan. This is a bit unexpected to me because the Substrait plan tends to keep things abstract; however, I'm not all that experienced with Substrait, so this could be the right approach, and I wouldn't oppose it.

@jvanstraten

It sounds like what both of you have in mind is an ABI (though in a data-scientific sense that is narrower than usual), since you're describing the concrete input and output formats of a function. Is this a correct understanding?

I think so.

[...] This is a bit unexpected to me because the Substrait plan tends to keep things abstract [...]

The problem is that the implementation of the UDF itself isn't abstract at all; it's bytecode for some specific VM or architecture (Python isn't really a VM, but in this context you can think of the Python interpreter as acting as a VM for the pickled function). So there's a disconnect there. If UDFs were instead implemented in terms of some set of abstract, primitive functions akin to something like LLVM-IR but using Substrait's type system, you wouldn't need this... but then you also wouldn't need UDFs, because you could just describe the functionality using existing expression trees. Not to mention that every consumer would have to implement that complete "Substrait VM".

It could also be that my understanding of UDFs is lacking. I've been monitoring this thread but mostly abstaining from commenting in it because there's a significant gap in my background there.


rtpsw commented Jul 18, 2022

Hm, I was looking for an example of the Python code. Can you put the example (or a link to it) here? Apologies if you have to repeat yourself, but it would be good to have the relevant examples with the discussion in one place.

I'm not sure I understand; are you looking for an example UDF? Here's one that works in my local code and fits your VECTOR classification:

def twice(v):
    """Compute twice the value of the input"""
    import pyarrow.compute as pc
    return pc.multiply(v, 2)

The way this UDF is handled in my local code is the following:

  • The Substrait producer pickles the function into a bytes field.
  • The Substrait consumer unpickles the function from this field and invokes it on Arrow arrays.

There is no support in my local code for a function like x * 2 that fits your SCALAR classification. To me it doesn't seem natural to support such slow functions, given that Arrow goes to great lengths to provide high-performance kernel implementations for functions.
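
A self-contained sketch of that round trip, assuming cloudpickle and pyarrow are installed on both sides (the bytes field layout is illustrative):

import cloudpickle
import pyarrow as pa

def twice(v):
    """Compute twice the value of the input"""
    import pyarrow.compute as pc
    return pc.multiply(v, 2)

payload = cloudpickle.dumps(twice)  # producer: fill the bytes field
fn = cloudpickle.loads(payload)     # consumer: recover the callable
assert fn(pa.array([1, 2, 3])).to_pylist() == [2, 4, 6]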


rtpsw commented Jul 18, 2022

The problem is that the implementation of the UDF itself isn't abstract at all; it's bytecode for some specific VM or architecture (Python isn't really a VM, but in this context you can think of the Python interpreter as acting as a VM for the pickled function). So there's a disconnect there.

I agree this disconnect exists. The question I'm struggling with is whether resolving it should be the responsibility of the Substrait plan or of the Substrait producer/consumer. The former option would force the Substrait plan to get much more concrete than usual but promises to keep it self-contained (if this is at all possible with UDFs), whereas the latter option would limit the Substrait producers/consumers with which a particular UDF can be used (since its ABI would need to be supported, and perhaps represented within the pickled function object) but promises to keep the Substrait plan abstract.


icexelloss commented Jul 19, 2022

@cpcloud Thanks for putting the requirement together. I agree with most of what you said.

One thing I wasn't sure about is "A way to specify the entrypoint". If you are talking about "import pandas as pd" before executing the UDF, AFAIK this is handled by cloudpickle already (https://github.com/cloudpipe/cloudpickle/blob/master/cloudpickle/cloudpickle.py#L345) and I don't think we need to do extra work here. Perhaps this doesn't need to be a requirement for the cloudpickled UDF?
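
A quick illustration of that behavior (assuming cloudpickle and pandas are installed): cloudpickle records the modules referenced by a function's globals and re-imports them on load, so no separate entry point is needed:

import cloudpickle
import pandas as pd

def head2(data):
    return pd.DataFrame(data).head(2)  # refers to the `pd` global

fn = cloudpickle.loads(cloudpickle.dumps(head2))
print(fn({"a": [1, 2, 3]}))  # works without re-specifying any entry point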

@icexelloss

@cpcloud Any thoughts about #234 (comment) on "entry point"?

@jvanstraten jvanstraten changed the title feat(proto): Add protobuf support for user-defined-function extension feat(proto): add protobuf support for user-defined-function extension Jul 26, 2022

cpcloud commented Aug 3, 2022

@icexelloss I think what you're saying makes sense. Since unpickling will result in an instance of the UDF object, there's no real need in this case to re-specify the name of the function anywhere.

@icexelloss

Thanks. I think what @cpcloud specified above are reasonable requirements. Due to prioritization we will keep this in the backlog for a bit and hopefully find time to put up an updated proposal.


@westonpace

This effort appears to have stalled. Can we close this PR and re-open at a later date if you choose to revisit?


rtpsw commented Mar 19, 2023

@icexelloss, is this still needed?

@icexelloss

Yes, I think we can close this, as we have a solution that doesn't change the Substrait spec.

@westonpace westonpace closed this May 18, 2023