diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala index b16ee9ad1929a..6a51799e1132d 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkDateTimeUtils.scala @@ -134,10 +134,16 @@ trait SparkDateTimeUtils { } /** - * Gets the number of microseconds since midnight using the session time zone. + * Gets the number of microseconds since midnight using the given time zone. */ def instantToMicrosOfDay(instant: Instant, timezone: String): Long = { - val zoneId = getZoneId(timezone) + instantToMicrosOfDay(instant, getZoneId(timezone)) + } + + /** + * Gets the number of microseconds since midnight using the given time zone. + */ + def instantToMicrosOfDay(instant: Instant, zoneId: ZoneId): Long = { val localDateTime = LocalDateTime.ofInstant(instant, zoneId) localDateTime.toLocalTime.getLong(MICRO_OF_DAY) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index 7b349c6311eef..cfdad6e9a51fe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -403,12 +403,16 @@ object ExpressionPatternBitMask { } } + /** - * An expression that cannot be evaluated but is guaranteed to be replaced with a foldable value - * by query optimizer (e.g. CurrentDate). + * An expression that cannot be evaluated. These expressions don't live past analysis or + * optimization time (e.g. Star) and should not be evaluated during query planning and + * execution. 
*/ -trait FoldableUnevaluable extends Expression { - override def foldable: Boolean = true +trait Unevaluable extends Expression { + + /** Unevaluable is not foldable because we don't have an eval for it. */ + final override def foldable: Boolean = false override def eval(input: InternalRow = null): Any = throw QueryExecutionErrors.cannotEvaluateExpressionError(this) @@ -417,19 +421,6 @@ trait FoldableUnevaluable extends Expression { throw QueryExecutionErrors.cannotGenerateCodeForExpressionError(this) } -/** - * An expression that cannot be evaluated. These expressions don't live past analysis or - * optimization time (e.g. Star) and should not be evaluated during query planning and - * execution. - */ -trait Unevaluable extends Expression with FoldableUnevaluable { - - /** Unevaluable is not foldable by default because we don't have an eval for it. - * Exception are expressions that will be replaced by a literal by Optimizer (e.g. CurrentDate). - * Hence we allow overriding overriding of this field in special cases. 
- */ - final override def foldable: Boolean = false -} /** * An expression that gets replaced at runtime (currently by the optimizer) into a different diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index b4b81dade83fc..4ae8883dae043 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -145,8 +145,9 @@ case class CurrentTimeZone() since = "1.5.0") // scalastyle:on line.size.limit case class CurrentDate(timeZoneId: Option[String] = None) - extends LeafExpression with TimeZoneAwareExpression with FoldableUnevaluable { + extends LeafExpression with TimeZoneAwareExpression with CodegenFallback { def this() = this(None) + override def foldable: Boolean = true override def nullable: Boolean = false override def dataType: DataType = DateType final override def nodePatternsInternal(): Seq[TreePattern] = Seq(CURRENT_LIKE) @@ -154,6 +155,8 @@ case class CurrentDate(timeZoneId: Option[String] = None) copy(timeZoneId = Option(timeZoneId)) override def prettyName: String = "current_date" + + override def eval(input: InternalRow): Any = currentDate(zoneId) } // scalastyle:off line.size.limit @@ -180,9 +183,11 @@ object CurDateExpressionBuilder extends ExpressionBuilder { } } -abstract class CurrentTimestampLike() extends LeafExpression with FoldableUnevaluable { +abstract class CurrentTimestampLike() extends LeafExpression with CodegenFallback { + override def foldable: Boolean = true override def nullable: Boolean = false override def dataType: DataType = TimestampType + override def eval(input: InternalRow): Any = instantToMicros(java.time.Instant.now()) final override val nodePatterns: Seq[TreePattern] = Seq(CURRENT_LIKE) } @@ -246,13 +251,15 @@ case class Now() 
extends CurrentTimestampLike { group = "datetime_funcs", since = "3.4.0") case class LocalTimestamp(timeZoneId: Option[String] = None) extends LeafExpression - with TimeZoneAwareExpression with FoldableUnevaluable { + with TimeZoneAwareExpression with CodegenFallback { def this() = this(None) + override def foldable: Boolean = true override def nullable: Boolean = false override def dataType: DataType = TimestampNTZType final override def nodePatternsInternal(): Seq[TreePattern] = Seq(CURRENT_LIKE) override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = copy(timeZoneId = Option(timeZoneId)) + override def eval(input: InternalRow): Any = localDateTimeToMicros(LocalDateTime.now(zoneId)) override def prettyName: String = "localtimestamp" } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/timeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/timeExpressions.scala index 0f9405f43634c..47f2d5d73e212 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/timeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/timeExpressions.scala @@ -20,14 +20,16 @@ package org.apache.spark.sql.catalyst.expressions import java.time.DateTimeException import java.util.Locale +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{ExpressionBuilder, TypeCheckResult} import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess} import org.apache.spark.sql.catalyst.expressions.Cast.{toSQLExpr, toSQLId, toSQLType, toSQLValue} +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke} import org.apache.spark.sql.catalyst.trees.TreePattern.{CURRENT_LIKE, TreePattern} import org.apache.spark.sql.catalyst.util.DateTimeUtils import 
org.apache.spark.sql.catalyst.util.TimeFormatter -import org.apache.spark.sql.catalyst.util.TypeUtils.{ordinalNumber} +import org.apache.spark.sql.catalyst.util.TypeUtils.ordinalNumber import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.internal.types.StringTypeWithCollation import org.apache.spark.sql.types.{AbstractDataType, DataType, DecimalType, IntegerType, ObjectType, TimeType, TypeCollection} @@ -429,17 +431,25 @@ object SecondExpressionBuilder extends ExpressionBuilder { group = "datetime_funcs", since = "4.1.0" ) -case class CurrentTime(child: Expression = Literal(TimeType.MICROS_PRECISION)) - extends UnaryExpression with FoldableUnevaluable with ImplicitCastInputTypes { +case class CurrentTime( + child: Expression = Literal(TimeType.MICROS_PRECISION), + timeZoneId: Option[String] = None) extends UnaryExpression + with TimeZoneAwareExpression with ImplicitCastInputTypes with CodegenFallback { def this() = { - this(Literal(TimeType.MICROS_PRECISION)) + this(Literal(TimeType.MICROS_PRECISION), None) } - final override val nodePatterns: Seq[TreePattern] = Seq(CURRENT_LIKE) + def this(child: Expression) = { + this(child, None) + } + + final override def nodePatternsInternal(): Seq[TreePattern] = Seq(CURRENT_LIKE) override def nullable: Boolean = false + override def foldable: Boolean = true + override def checkInputDataTypes(): TypeCheckResult = { // Check foldability if (!child.foldable) { @@ -496,11 +506,19 @@ case class CurrentTime(child: Expression = Literal(TimeType.MICROS_PRECISION)) override def prettyName: String = "current_time" + override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = + copy(timeZoneId = Option(timeZoneId)) + override protected def withNewChildInternal(newChild: Expression): Expression = { copy(child = newChild) } override def inputTypes: Seq[AbstractDataType] = Seq(IntegerType) + + override def eval(input: InternalRow): Any = { + val currentTimeOfDayMicros = 
DateTimeUtils.instantToMicrosOfDay(java.time.Instant.now(), zoneId) + DateTimeUtils.truncateTimeMicrosToPrecision(currentTimeOfDayMicros, precision) + } } // scalastyle:off line.size.limit diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala index 662c740e2f96c..af7c6eb18ba83 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala @@ -121,7 +121,7 @@ class ResolveInlineTablesSuite extends AnalysisTest with BeforeAndAfter { Seq(CurrentTime()) ) ) - val resolved = ResolveInlineTables(table) + val resolved = ResolveInlineTables(ResolveTimeZone(table)) assert(resolved.isInstanceOf[ResolvedInlineTable], "Expected an inline table to be resolved into a ResolvedInlineTable") diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala index 2ddddad7a2942..f166dfb2d7e7c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala @@ -28,7 +28,7 @@ import scala.language.postfixOps import scala.reflect.ClassTag import scala.util.Random -import org.apache.spark.{SparkArithmeticException, SparkDateTimeException, SparkException, SparkFunSuite, SparkIllegalArgumentException, SparkUpgradeException} +import org.apache.spark.{SparkArithmeticException, SparkDateTimeException, SparkFunSuite, SparkIllegalArgumentException, SparkUpgradeException} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils, TimestampFormatter} @@ -2140,14 +2140,4 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { } } } - - test("datetime function CurrentDate and localtimestamp are Unevaluable") { - checkError(exception = intercept[SparkException] { CurrentDate(UTC_OPT).eval(EmptyRow) }, - condition = "INTERNAL_ERROR", - parameters = Map("message" -> "Cannot evaluate expression: current_date(Some(UTC))")) - - checkError(exception = intercept[SparkException] { LocalTimestamp(UTC_OPT).eval(EmptyRow) }, - condition = "INTERNAL_ERROR", - parameters = Map("message" -> "Cannot evaluate expression: localtimestamp(Some(UTC))")) - } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index c0169bcbb8791..087f68ce3c790 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -919,17 +919,26 @@ private[hive] trait HiveInspectors { // We will enumerate all of the possible constant expressions, throw exception if we missed case Literal(_, dt) => throw SparkException.internalError(s"Hive doesn't support the constant type [$dt].") - // FoldableUnevaluable will be replaced with a foldable value in FinishAnalysis rule, - // skip eval() for them. - case _ if expr.collectFirst { case e: FoldableUnevaluable => e }.isDefined => - toInspector(expr.dataType) // ideally, we don't test the foldable here(but in optimizer), however, some of the // Hive UDF / UDAF requires its argument to be constant objectinspector, we do it eagerly. 
- case _ if expr.foldable => toInspector(Literal.create(expr.eval(), expr.dataType)) + case _ if expr.foldable && canEarlyEval(expr) => + toInspector(Literal.create(expr.eval(), expr.dataType)) // For those non constant expression, map to object inspector according to its data type case _ => toInspector(expr.dataType) } + // TODO: hard-coding a list here is not very robust. A better idea is to have some kind of query + // context that pre-evaluates these current datetime values, so that evaluating these + // expressions just reads the pre-evaluated values from the query context and we don't need + // to wait for the rule `FinishAnalysis` to compute the values. + private def canEarlyEval(e: Expression): Boolean = e match { + case _: CurrentDate => false + case _: CurrentTime => false + case _: CurrentTimestampLike => false + case _: LocalTimestamp => false + case _ => e.children.forall(canEarlyEval) + } + def inspectorToDataType(inspector: ObjectInspector): DataType = inspector match { case s: StructObjectInspector => StructType(s.getAllStructFieldRefs.asScala.map(f =>