Skip to content

Commit

Permalink
Adding config option `spark.cosmos.read.responseContinuationTokenLimitInKb` to reduce query continuation token size (Azure#44480)
Browse files Browse the repository at this point in the history

* Adding config option to reduce query continuation token size

* Changelogs
  • Loading branch information
FabianMeiswinkel authored Mar 4, 2025
1 parent 36b8395 commit 82e0cfe
Show file tree
Hide file tree
Showing 12 changed files with 39 additions and 10 deletions.
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 4.38.0-beta.1 (Unreleased)

#### Features Added
* Added config option `spark.cosmos.read.responseContinuationTokenLimitInKb` to reduce query continuation token size. - See [PR 44480](https://github.com/Azure/azure-sdk-for-java/pull/44480)

#### Breaking Changes

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 4.38.0-beta.1 (Unreleased)

#### Features Added
* Added config option `spark.cosmos.read.responseContinuationTokenLimitInKb` to reduce query continuation token size. - See [PR 44480](https://github.com/Azure/azure-sdk-for-java/pull/44480)

#### Breaking Changes

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 4.38.0-beta.1 (Unreleased)

#### Features Added
* Added config option `spark.cosmos.read.responseContinuationTokenLimitInKb` to reduce query continuation token size. - See [PR 44480](https://github.com/Azure/azure-sdk-for-java/pull/44480)

#### Breaking Changes

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 4.38.0-beta.1 (Unreleased)

#### Features Added
* Added config option `spark.cosmos.read.responseContinuationTokenLimitInKb` to reduce query continuation token size. - See [PR 44480](https://github.com/Azure/azure-sdk-for-java/pull/44480)

#### Breaking Changes

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 4.38.0-beta.1 (Unreleased)

#### Features Added
* Added config option `spark.cosmos.read.responseContinuationTokenLimitInKb` to reduce query continuation token size. - See [PR 44480](https://github.com/Azure/azure-sdk-for-java/pull/44480)

#### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package com.azure.cosmos
import com.azure.cosmos.implementation.{DocumentCollection, ImplementationBridgeHelpers, PartitionKeyRange, SparkBridgeImplementationInternal}
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl
import com.azure.cosmos.implementation.routing.Range
import com.azure.cosmos.models.{CosmosContainerProperties, CosmosQueryRequestOptions, FeedRange, ModelBridgeInternal}
import com.azure.cosmos.models.{CosmosContainerProperties, CosmosQueryRequestOptions}
import com.azure.cosmos.spark.NormalizedRange

import java.time.Duration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ private[spark] object CosmosConfigNames {
val AllowInvalidJsonWithDuplicateJsonProperties = "spark.cosmos.read.allowInvalidJsonWithDuplicateJsonProperties"
val ReadCustomQuery = "spark.cosmos.read.customQuery"
val ReadMaxItemCount = "spark.cosmos.read.maxItemCount"
val ReadResponseContinuationTokenLimitInKb = "spark.cosmos.read.responseContinuationTokenLimitInKb"
val ReadPrefetchBufferSize = "spark.cosmos.read.prefetchBufferSize"
val ReadForceEventualConsistency = "spark.cosmos.read.forceEventualConsistency"
val ReadSchemaConversionMode = "spark.cosmos.read.schemaConversionMode"
Expand Down Expand Up @@ -174,6 +175,7 @@ private[spark] object CosmosConfigNames {
ReadForceEventualConsistency,
ReadSchemaConversionMode,
ReadMaxItemCount,
ReadResponseContinuationTokenLimitInKb,
ReadPrefetchBufferSize,
ReadInferSchemaSamplingSize,
ReadInferSchemaEnabled,
Expand Down Expand Up @@ -915,7 +917,8 @@ private case class CosmosReadConfig(forceEventualConsistency: Boolean,
customQuery: Option[CosmosParameterizedQuery],
throughputControlConfig: Option[CosmosThroughputControlConfig] = None,
runtimeFilteringEnabled: Boolean,
readManyFilteringConfig: CosmosReadManyFilteringConfig)
readManyFilteringConfig: CosmosReadManyFilteringConfig,
responseContinuationTokenLimitInKb: Option[Int] = None)

private object SchemaConversionModes extends Enumeration {
type SchemaConversionMode = Value
Expand Down Expand Up @@ -962,6 +965,13 @@ private object CosmosReadConfig {
parseFromStringFunction = queryText => queryText.toInt,
helpMessage = "The maximum number of documents returned in a single request. The default is 1000.")

// Config entry for `spark.cosmos.read.responseContinuationTokenLimitInKb`.
// Optional (mandatory = false, defaultValue = None): when absent, the service default applies.
// Parsed values below 1 are silently clamped to 1 via Math.max — the service requires at
// least 1 KB for the continuation token limit. Note: `queryText.toInt` will throw
// NumberFormatException for non-numeric input; presumably surfaced by CosmosConfigEntry's
// parse machinery — TODO confirm against CosmosConfigEntry.parse (not visible here).
private val ResponseContinuationTokenLimitInKb = CosmosConfigEntry[Int](
    key = CosmosConfigNames.ReadResponseContinuationTokenLimitInKb,
    mandatory = false,
    defaultValue = None,
    parseFromStringFunction = queryText => Math.max(1, queryText.toInt),
    helpMessage = "The maximum continuation token size allowed in kilo-bytes. It has to be at least 1 KB.")

private val PrefetchBufferSize = CosmosConfigEntry[Int](
key = CosmosConfigNames.ReadPrefetchBufferSize,
mandatory = false,
Expand Down Expand Up @@ -1000,6 +1010,7 @@ private object CosmosReadConfig {
val customQuery = CosmosConfigEntry.parse(cfg, CustomQuery)
val maxItemCount = CosmosConfigEntry.parse(cfg, MaxItemCount)
val prefetchBufferSize = CosmosConfigEntry.parse(cfg, PrefetchBufferSize)
val responseContinuationTokenLimitInKb = CosmosConfigEntry.parse(cfg, ResponseContinuationTokenLimitInKb)
val maxIntegratedCacheStalenessInMilliseconds = CosmosConfigEntry.parse(cfg, MaxIntegratedCacheStalenessInMilliseconds)
val dedicatedGatewayRequestOptions = {
val result = new DedicatedGatewayRequestOptions
Expand Down Expand Up @@ -1034,7 +1045,8 @@ private object CosmosReadConfig {
customQuery,
throughputControlConfigOpt,
runtimeFilteringEnabled.get,
readManyFilteringConfig)
readManyFilteringConfig,
responseContinuationTokenLimitInKb)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ private object CosmosTableSchemaInferrer
val queryOptions = new CosmosQueryRequestOptions()
queryOptions.setMaxBufferedItemCount(cosmosInferenceConfig.inferSchemaSamplingSize)
queryOptions.setDedicatedGatewayRequestOptions(cosmosReadConfig.dedicatedGatewayRequestOptions)
if (cosmosReadConfig.responseContinuationTokenLimitInKb.isDefined) {
queryOptions.setResponseContinuationTokenLimitInKb(
cosmosReadConfig.responseContinuationTokenLimitInKb.get)
}
ThroughputControlHelper.populateThroughputControlGroupName(
ImplementationBridgeHelpers
.CosmosQueryRequestOptionsHelper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,11 @@ private case class ItemsPartitionReader
queryOptions.setDedicatedGatewayRequestOptions(readConfig.dedicatedGatewayRequestOptions)
queryOptions.setCosmosEndToEndOperationLatencyPolicyConfig(endToEndTimeoutPolicy)

if (readConfig.responseContinuationTokenLimitInKb.isDefined) {
queryOptions.setResponseContinuationTokenLimitInKb(
readConfig.responseContinuationTokenLimitInKb.get)
}

ImplementationBridgeHelpers
.CosmosQueryRequestOptionsHelper
.getCosmosQueryRequestOptionsAccessor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,9 @@

package com.azure.cosmos.spark

import com.azure.cosmos.CosmosAsyncContainer
import com.azure.cosmos.models.{CosmosQueryRequestOptions, SqlParameter, SqlQuerySpec}
import com.azure.cosmos.spark.diagnostics.ILogger
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode

import java.util.UUID
import java.util.concurrent.ThreadFactory
import java.util.concurrent.atomic.AtomicInteger

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ class CosmosConfigSpec extends UnitSpec with BasicLoggingTrait {
var config = CosmosReadConfig.parseCosmosReadConfig(userConfig)

config.forceEventualConsistency shouldBe false
config.responseContinuationTokenLimitInKb shouldBe None
config.schemaConversionMode shouldBe SchemaConversionModes.Strict
config.customQuery shouldBe empty
config.maxItemCount shouldBe 1000
Expand All @@ -407,7 +408,8 @@ class CosmosConfigSpec extends UnitSpec with BasicLoggingTrait {
"spark.cosmos.read.maxItemCount" -> "1000",
"spark.cosmos.read.maxIntegratedCacheStalenessInMS" -> "1000",
"spark.cosmos.read.runtimeFiltering.enabled" -> "false",
"spark.cosmos.read.readManyFiltering.enabled" -> "true"
"spark.cosmos.read.readManyFiltering.enabled" -> "true",
"spark.cosmos.read.responseContinuationTokenLimitInKb" -> "8"
)

config = CosmosReadConfig.parseCosmosReadConfig(userConfig)
Expand All @@ -417,6 +419,7 @@ class CosmosConfigSpec extends UnitSpec with BasicLoggingTrait {
config.customQuery shouldBe empty
config.maxItemCount shouldBe 1000
config.prefetchBufferSize shouldBe 8
config.responseContinuationTokenLimitInKb shouldBe Some(8)
config.dedicatedGatewayRequestOptions.getMaxIntegratedCacheStaleness shouldBe Duration.ofMillis(1000)
config.runtimeFilteringEnabled shouldBe false
config.readManyFilteringConfig.readManyFilteringEnabled shouldBe true
Expand Down Expand Up @@ -470,7 +473,8 @@ class CosmosConfigSpec extends UnitSpec with BasicLoggingTrait {
userConfig = Map(
"spark.cosmos.read.forceEventualConsistency" -> "false",
"spark.cosmos.read.schemaConversionMode" -> "Strict",
"spark.cosmos.read.maxItemCount" -> "1001"
"spark.cosmos.read.maxItemCount" -> "1001",
"spark.cosmos.read.responseContinuationTokenLimitInKb" -> "0"
)

config = CosmosReadConfig.parseCosmosReadConfig(userConfig)
Expand All @@ -480,6 +484,8 @@ class CosmosConfigSpec extends UnitSpec with BasicLoggingTrait {
config.customQuery shouldBe empty
config.maxItemCount shouldBe 1001
config.prefetchBufferSize shouldBe 1
// forcing min value to be 1
config.responseContinuationTokenLimitInKb shouldBe Some(1)
}

it should "parse custom query option of read configuration" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ abstract class SparkE2EQueryITestBase
"spark.cosmos.database" -> cosmosDatabase,
"spark.cosmos.container" -> cosmosContainer,
"spark.cosmos.read.maxItemCount" -> "1",
"spark.cosmos.read.partitioning.strategy" -> "Restrictive"
"spark.cosmos.read.partitioning.strategy" -> "Restrictive",
"spark.cosmos.read.responseContinuationTokenLimitInKb" -> "1"
)

val df = spark.read.format("cosmos.oltp").options(cfg).load()
Expand Down

0 comments on commit 82e0cfe

Please sign in to comment.