Skip to content

[SPARK-52218][SQL] Make current datetime functions evaluable again #50936

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,16 @@ trait SparkDateTimeUtils {
}

/**
* Gets the number of microseconds since midnight using the session time zone.
* Gets the number of microseconds since midnight using the given time zone.
*/
def instantToMicrosOfDay(instant: Instant, timezone: String): Long = {
val zoneId = getZoneId(timezone)
instantToMicrosOfDay(instant, getZoneId(timezone))
}

/**
* Gets the number of microseconds since midnight using the given time zone.
*/
def instantToMicrosOfDay(instant: Instant, zoneId: ZoneId): Long = {
val localDateTime = LocalDateTime.ofInstant(instant, zoneId)
localDateTime.toLocalTime.getLong(MICRO_OF_DAY)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,12 +403,16 @@ object ExpressionPatternBitMask {
}
}


/**
* An expression that cannot be evaluated but is guaranteed to be replaced with a foldable value
* by query optimizer (e.g. CurrentDate).
* An expression that cannot be evaluated. These expressions don't live past analysis or
* optimization time (e.g. Star) and should not be evaluated during query planning and
* execution.
*/
trait FoldableUnevaluable extends Expression {
override def foldable: Boolean = true
trait Unevaluable extends Expression {

/** Unevaluable is not foldable because we don't have an eval for it. */
final override def foldable: Boolean = false

override def eval(input: InternalRow = null): Any =
throw QueryExecutionErrors.cannotEvaluateExpressionError(this)
Expand All @@ -417,19 +421,6 @@ trait FoldableUnevaluable extends Expression {
throw QueryExecutionErrors.cannotGenerateCodeForExpressionError(this)
}

/**
* An expression that cannot be evaluated. These expressions don't live past analysis or
* optimization time (e.g. Star) and should not be evaluated during query planning and
* execution.
*/
trait Unevaluable extends Expression with FoldableUnevaluable {

/** Unevaluable is not foldable by default because we don't have an eval for it.
* Exception are expressions that will be replaced by a literal by Optimizer (e.g. CurrentDate).
* Hence we allow overriding of this field in special cases.
*/
final override def foldable: Boolean = false
}

/**
* An expression that gets replaced at runtime (currently by the optimizer) into a different
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,18 @@ case class CurrentTimeZone()
since = "1.5.0")
// scalastyle:on line.size.limit
case class CurrentDate(timeZoneId: Option[String] = None)
extends LeafExpression with TimeZoneAwareExpression with FoldableUnevaluable {
extends LeafExpression with TimeZoneAwareExpression with CodegenFallback {
def this() = this(None)
override def foldable: Boolean = true
override def nullable: Boolean = false
override def dataType: DataType = DateType
final override def nodePatternsInternal(): Seq[TreePattern] = Seq(CURRENT_LIKE)
override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
copy(timeZoneId = Option(timeZoneId))

override def prettyName: String = "current_date"

override def eval(input: InternalRow): Any = currentDate(zoneId)
}

// scalastyle:off line.size.limit
Expand All @@ -180,9 +183,11 @@ object CurDateExpressionBuilder extends ExpressionBuilder {
}
}

abstract class CurrentTimestampLike() extends LeafExpression with FoldableUnevaluable {
abstract class CurrentTimestampLike() extends LeafExpression with CodegenFallback {
override def foldable: Boolean = true
override def nullable: Boolean = false
override def dataType: DataType = TimestampType
override def eval(input: InternalRow): Any = instantToMicros(java.time.Instant.now())
final override val nodePatterns: Seq[TreePattern] = Seq(CURRENT_LIKE)
}

Expand Down Expand Up @@ -246,13 +251,15 @@ case class Now() extends CurrentTimestampLike {
group = "datetime_funcs",
since = "3.4.0")
case class LocalTimestamp(timeZoneId: Option[String] = None) extends LeafExpression
with TimeZoneAwareExpression with FoldableUnevaluable {
with TimeZoneAwareExpression with CodegenFallback {
def this() = this(None)
override def foldable: Boolean = true
override def nullable: Boolean = false
override def dataType: DataType = TimestampNTZType
final override def nodePatternsInternal(): Seq[TreePattern] = Seq(CURRENT_LIKE)
override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
copy(timeZoneId = Option(timeZoneId))
override def eval(input: InternalRow): Any = localDateTimeToMicros(LocalDateTime.now(zoneId))
override def prettyName: String = "localtimestamp"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@ package org.apache.spark.sql.catalyst.expressions
import java.time.DateTimeException
import java.util.Locale

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{ExpressionBuilder, TypeCheckResult}
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess}
import org.apache.spark.sql.catalyst.expressions.Cast.{toSQLExpr, toSQLId, toSQLType, toSQLValue}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
import org.apache.spark.sql.catalyst.trees.TreePattern.{CURRENT_LIKE, TreePattern}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.TimeFormatter
import org.apache.spark.sql.catalyst.util.TypeUtils.{ordinalNumber}
import org.apache.spark.sql.catalyst.util.TypeUtils.ordinalNumber
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.internal.types.StringTypeWithCollation
import org.apache.spark.sql.types.{AbstractDataType, DataType, DecimalType, IntegerType, ObjectType, TimeType, TypeCollection}
Expand Down Expand Up @@ -429,17 +431,25 @@ object SecondExpressionBuilder extends ExpressionBuilder {
group = "datetime_funcs",
since = "4.1.0"
)
case class CurrentTime(child: Expression = Literal(TimeType.MICROS_PRECISION))
extends UnaryExpression with FoldableUnevaluable with ImplicitCastInputTypes {
case class CurrentTime(
child: Expression = Literal(TimeType.MICROS_PRECISION),
timeZoneId: Option[String] = None) extends UnaryExpression
with TimeZoneAwareExpression with ImplicitCastInputTypes with CodegenFallback {

def this() = {
this(Literal(TimeType.MICROS_PRECISION))
this(Literal(TimeType.MICROS_PRECISION), None)
}

final override val nodePatterns: Seq[TreePattern] = Seq(CURRENT_LIKE)
def this(child: Expression) = {
this(child, None)
}

final override def nodePatternsInternal(): Seq[TreePattern] = Seq(CURRENT_LIKE)

override def nullable: Boolean = false

override def foldable: Boolean = true

override def checkInputDataTypes(): TypeCheckResult = {
// Check foldability
if (!child.foldable) {
Expand Down Expand Up @@ -496,11 +506,19 @@ case class CurrentTime(child: Expression = Literal(TimeType.MICROS_PRECISION))

override def prettyName: String = "current_time"

override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
copy(timeZoneId = Option(timeZoneId))

override protected def withNewChildInternal(newChild: Expression): Expression = {
copy(child = newChild)
}

override def inputTypes: Seq[AbstractDataType] = Seq(IntegerType)

override def eval(input: InternalRow): Any = {
val currentTimeOfDayMicros = DateTimeUtils.instantToMicrosOfDay(java.time.Instant.now(), zoneId)
DateTimeUtils.truncateTimeMicrosToPrecision(currentTimeOfDayMicros, precision)
}
}

// scalastyle:off line.size.limit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class ResolveInlineTablesSuite extends AnalysisTest with BeforeAndAfter {
Seq(CurrentTime())
)
)
val resolved = ResolveInlineTables(table)
val resolved = ResolveInlineTables(ResolveTimeZone(table))
assert(resolved.isInstanceOf[ResolvedInlineTable],
"Expected an inline table to be resolved into a ResolvedInlineTable")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import scala.language.postfixOps
import scala.reflect.ClassTag
import scala.util.Random

import org.apache.spark.{SparkArithmeticException, SparkDateTimeException, SparkException, SparkFunSuite, SparkIllegalArgumentException, SparkUpgradeException}
import org.apache.spark.{SparkArithmeticException, SparkDateTimeException, SparkFunSuite, SparkIllegalArgumentException, SparkUpgradeException}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils, TimestampFormatter}
Expand Down Expand Up @@ -2140,14 +2140,4 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
}
}
}

test("datetime function CurrentDate and localtimestamp are Unevaluable") {
checkError(exception = intercept[SparkException] { CurrentDate(UTC_OPT).eval(EmptyRow) },
condition = "INTERNAL_ERROR",
parameters = Map("message" -> "Cannot evaluate expression: current_date(Some(UTC))"))

checkError(exception = intercept[SparkException] { LocalTimestamp(UTC_OPT).eval(EmptyRow) },
condition = "INTERNAL_ERROR",
parameters = Map("message" -> "Cannot evaluate expression: localtimestamp(Some(UTC))"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -919,17 +919,26 @@ private[hive] trait HiveInspectors {
// We will enumerate all of the possible constant expressions, and throw an exception if we missed one
case Literal(_, dt) =>
throw SparkException.internalError(s"Hive doesn't support the constant type [$dt].")
// FoldableUnevaluable will be replaced with a foldable value in FinishAnalysis rule,
// skip eval() for them.
case _ if expr.collectFirst { case e: FoldableUnevaluable => e }.isDefined =>
toInspector(expr.dataType)
// ideally, we don't test the foldability here (but in the optimizer); however, some of the
// Hive UDFs / UDAFs require their arguments to be constant ObjectInspectors, so we do it eagerly.
case _ if expr.foldable => toInspector(Literal.create(expr.eval(), expr.dataType))
case _ if expr.foldable && canEarlyEval(expr) =>
toInspector(Literal.create(expr.eval(), expr.dataType))
// For those non constant expression, map to object inspector according to its data type
case _ => toInspector(expr.dataType)
}

// TODO: hard-coding a list here is not very robust. A better idea is to have some kind of query
// context to pre-evaluate these current datetime values, and evaluating these expressions
// just get the pre-evaluated values from the query context, so that we don't need to wait
// for the rule `FinishAnalysis` to compute the values.
private def canEarlyEval(e: Expression): Boolean = e match {
case _: CurrentDate => false
case _: CurrentTime => false
case _: CurrentTimestampLike => false
case _: LocalTimestamp => false
case _ => e.children.forall(canEarlyEval)
}

def inspectorToDataType(inspector: ObjectInspector): DataType = inspector match {
case s: StructObjectInspector =>
StructType(s.getAllStructFieldRefs.asScala.map(f =>
Expand Down