-
Notifications
You must be signed in to change notification settings - Fork 28.7k
[SPARK-52544][SQL] Allow configuring Json datasource string length limit through SQLConf #51235
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5016,6 +5016,14 @@ object SQLConf { | |
.booleanConf | ||
.createWithDefault(false) | ||
|
||
val JSON_MAX_STRING_LENGTH = | ||
buildConf("spark.sql.json.defaultMaxStringLength") | ||
.doc("Global default maximum string length limit when reading JSON data. It will be " + | ||
"overridden if a JSONOption maxStringLen is provided.") | ||
.version("3.5.0") | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 4.1.0 There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This needs to be backported to 3.5 to fix a regression, how about |
||
.intConf | ||
.createWithDefault(Int.MaxValue) | ||
|
||
val VARIANT_ALLOW_DUPLICATE_KEYS = | ||
buildConf("spark.sql.variant.allowDuplicateKeys") | ||
.internal() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,7 +25,7 @@ import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period, ZoneId} | |
import java.util.Locale | ||
import java.util.concurrent.atomic.AtomicLong | ||
|
||
import com.fasterxml.jackson.core.JsonFactory | ||
import com.fasterxml.jackson.core.{JsonFactory, StreamReadConstraints} | ||
import org.apache.commons.lang3.exception.ExceptionUtils | ||
import org.apache.hadoop.fs.{Path, PathFilter} | ||
import org.apache.hadoop.io.SequenceFile.CompressionType | ||
|
@@ -4096,6 +4096,49 @@ abstract class JsonSuite | |
Row("{null, bad json}"), Row("{[1,2,3], null}")) | ||
) | ||
} | ||
|
||
test("Test JSON data source maxStringLen option") { | ||
// Create a JSON string that is way longer than DEFAULT_MAX_STRING_LEN. | ||
val longStringSize = StreamReadConstraints.DEFAULT_MAX_STRING_LEN * 10 | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. does |
||
val longString: String = "a" * longStringSize | ||
val longStringJson = s"""{ "longString": "$longString" }""" | ||
|
||
withTempDir { tmpDir => | ||
// Create a JSON file with a string that is longer than DEFAULT_MAX_STRING_LEN. | ||
val inputPath = new File(tmpDir, "input.json").toPath | ||
Files.write(inputPath, longStringJson.getBytes) | ||
|
||
// With JSON_MAX_STRING_LENGTH set to Int.max, should be able to read and write | ||
// the long string. | ||
withSQLConf(SQLConf.JSON_MAX_STRING_LENGTH.key -> s"${Int.MaxValue}") { | ||
val df = spark.read.schema("longString string") | ||
.json(inputPath.toString) | ||
assert(df.collect() === Row(longString) :: Nil) | ||
|
||
val e = intercept[SparkException] { | ||
spark.read.schema("longString string") | ||
.option("maxStringLen", 10) | ||
.option("mode", "FAILFAST") | ||
.json(inputPath.toString) | ||
.collect() | ||
} | ||
assert(e.getCause.getMessage.contains( | ||
"Malformed records are detected in record parsing")) | ||
} | ||
|
||
withSQLConf(SQLConf.JSON_MAX_STRING_LENGTH.key -> s"${longStringSize - 1}") { | ||
val e = intercept[SparkException] { | ||
spark.read.schema("longString string") | ||
.option("mode", "FAILFAST") | ||
.json(inputPath.toString) | ||
.collect() | ||
} | ||
assert(e.getCondition.startsWith("FAILED_READ_FILE")) | ||
assert(e.getCause.getMessage.contains( | ||
"Malformed records are detected in record parsing")) | ||
} | ||
} | ||
} | ||
} | ||
|
||
class JsonV1Suite extends JsonSuite { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we follow `ParquetOptions` and pass a `SQLConf` instance to construct `JSONOptions`?