Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.cache.CacheFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataTableNames;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.CleanUpReadChangeStreamDoFn;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.DetectNewPartitionsDoFn;
Expand Down Expand Up @@ -1746,6 +1747,8 @@ public abstract static class ReadChangeStream

abstract String getChangeStreamName();

abstract @Nullable List<String> getTvfNameList();

abstract @Nullable String getMetadataInstance();

abstract @Nullable String getMetadataDatabase();
Expand Down Expand Up @@ -1783,6 +1786,8 @@ abstract static class Builder {

abstract Builder setChangeStreamName(String changeStreamName);

abstract Builder setTvfNameList(List<String> tvfNameList);

abstract Builder setMetadataInstance(String metadataInstance);

abstract Builder setMetadataDatabase(String metadataDatabase);
Expand Down Expand Up @@ -1861,6 +1866,11 @@ public ReadChangeStream withChangeStreamName(String changeStreamName) {
return toBuilder().setChangeStreamName(changeStreamName).build();
}

/** Specifies the list of TVF names to query and union. */
public ReadChangeStream withTvfNameList(List<String> tvfNameList) {
return toBuilder().setTvfNameList(tvfNameList).build();
}

/** Specifies the metadata database. */
public ReadChangeStream withMetadataInstance(String metadataInstance) {
return toBuilder().setMetadataInstance(metadataInstance).build();
Expand Down Expand Up @@ -2042,6 +2052,7 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
getInclusiveEndAt().compareTo(MAX_INCLUSIVE_END_AT) > 0
? MAX_INCLUSIVE_END_AT
: getInclusiveEndAt();
final List<String> tvfNameList = getTvfNameList();
final MapperFactory mapperFactory = new MapperFactory(changeStreamDatabaseDialect);
final ChangeStreamMetrics metrics = new ChangeStreamMetrics();
final RpcPriority rpcPriority = MoreObjects.firstNonNull(getRpcPriority(), RpcPriority.HIGH);
Expand All @@ -2051,10 +2062,19 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
isMutableChangeStream(
spannerAccessor.getDatabaseClient(), changeStreamDatabaseDialect, changeStreamName);
LOG.info("The change stream {} is mutable: {}", changeStreamName, isMutableChangeStream);
if (tvfNameList != null && !tvfNameList.isEmpty()) {
if (!isMutableChangeStream) {
throw new IllegalArgumentException(
"tvfNameList is only supported for change streams with MUTABLE_KEY_RANGE mode");
}
// TODO: throw an exception if the per_placement_tvf option is not set to true.
checkTvfExistence(spannerAccessor.getDatabaseClient(), tvfNameList);
}
final DaoFactory daoFactory =
new DaoFactory(
changeStreamSpannerConfig,
changeStreamName,
tvfNameList,
partitionMetadataSpannerConfig,
partitionMetadataTableNames,
rpcPriority,
Expand Down Expand Up @@ -2754,6 +2774,50 @@ static String resolveSpannerProjectId(SpannerConfig config) {
: config.getProjectId().get();
}

@VisibleForTesting
static void checkTvfExistence(DatabaseClient databaseClient, List<String> tvfNameList) {
  // Validates that every requested TVF name exists as a routine in the database's
  // information schema. Throws IllegalArgumentException naming the first missing TVF.
  // A null or empty list is a no-op (nothing to validate).
  if (tvfNameList == null || tvfNameList.isEmpty()) {
    return;
  }
  Dialect dialect = databaseClient.getDialect();
  try (ReadOnlyTransaction tx = databaseClient.readOnlyTransaction()) {
    // Build an IN clause with one placeholder per name:
    // PostgreSQL uses positional parameters ($1, $2, ...); GoogleSQL uses named
    // parameters (@p0, @p1, ...).
    StringBuilder sql =
        new StringBuilder(
            "SELECT routine_name FROM information_schema.routines WHERE routine_type LIKE '%FUNCTION' AND routine_name IN (");
    for (int i = 0; i < tvfNameList.size(); i++) {
      if (dialect == Dialect.POSTGRESQL) {
        sql.append("$").append(i + 1);
      } else {
        sql.append("@p").append(i);
      }
      if (i < tvfNameList.size() - 1) {
        sql.append(", ");
      }
    }
    sql.append(")");
    Statement.Builder builder = Statement.newBuilder(sql.toString());
    for (int i = 0; i < tvfNameList.size(); i++) {
      // PostgreSQL positional parameters $1..$n bind as "p1".."pn"; GoogleSQL named
      // parameters @p0..@p(n-1) bind as "p0".."p(n-1)". Names are escaped the same
      // way the DAO escapes them so the comparison below matches stored routine names.
      final String paramName = dialect == Dialect.POSTGRESQL ? "p" + (i + 1) : "p" + i;
      builder.bind(paramName).to(PartitionMetadataDao.escapeTvfName(tvfNameList.get(i)));
    }
    Statement statement = builder.build();
    java.util.Set<String> foundNames = new java.util.HashSet<>();
    // Close the ResultSet explicitly: Spanner result sets hold streaming resources
    // and are AutoCloseable; the original code leaked it.
    try (ResultSet resultSet = tx.executeQuery(statement)) {
      while (resultSet.next()) {
        foundNames.add(resultSet.getString(0));
      }
    }
    for (String tvfName : tvfNameList) {
      if (!foundNames.contains(PartitionMetadataDao.escapeTvfName(tvfName))) {
        throw new IllegalArgumentException(
            "TVF specified: " + tvfName + " is not found in the existing TVF's: " + foundNames);
      }
    }
  }
}

@VisibleForTesting
static boolean isMutableChangeStream(
DatabaseClient databaseClient, Dialect dialect, String changeStreamName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.RpcPriority;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.DetectNewPartitionsDoFn;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State;
Expand Down Expand Up @@ -64,6 +65,18 @@ public class ChangeStreamsConstants {
/** The sliding window size in seconds for throughput reporting. */
public static final int THROUGHPUT_WINDOW_SECONDS = 10;

/**
* The delimiter used to separate the partition token and the tvf name. Note this string does not
* exist in the partition token itself.
*/
public static final String PARTITION_TOKEN_TVF_NAME_DELIMITER = "#";

/** The default tvf name for a change stream query is the empty {@link String}. */
public static final String DEFAULT_TVF_NAME = "";

/** The default TVF name list to query and union is the empty list ({@link Collections#emptyList()}). */
public static final List<String> DEFAULT_TVF_NAME_LIST = Collections.emptyList();

/**
* We use the following partition token to provide an estimate size of a partition token. A usual
* partition token has around 140 characters.
Expand All @@ -85,6 +98,7 @@ public class ChangeStreamsConstants {
.setState(State.CREATED)
.setWatermark(Timestamp.now())
.setCreatedAt(Timestamp.now())
.setTvfName(DEFAULT_TVF_NAME)
.build();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,17 @@ private void processChildPartition(
record.getStartTimestamp(),
partition.getEndTimestamp(),
partition.getHeartbeatMillis(),
partition.getTvfName(),
childPartition);
LOG.debug("[{}] Inserting child partition token {}", partitionToken, childPartitionToken);
final Boolean insertedRow =
partitionMetadataDao
.runInTransaction(
transaction -> {
if (transaction.getPartition(childPartitionToken) == null) {
if (transaction.getPartition(
PartitionMetadataDao.composePartitionTokenWithTvfName(
childPartitionToken, partition.getTvfName()))
== null) {
transaction.insert(row);
return true;
} else {
Expand All @@ -188,13 +192,15 @@ private PartitionMetadata toPartitionMetadata(
Timestamp startTimestamp,
Timestamp endTimestamp,
long heartbeatMillis,
String tvfName,
ChildPartition childPartition) {
return PartitionMetadata.newBuilder()
.setPartitionToken(childPartition.getToken())
.setParentTokens(childPartition.getParentTokens())
.setStartTimestamp(startTimestamp)
.setEndTimestamp(endTimestamp)
.setHeartbeatMillis(heartbeatMillis)
.setTvfName(tvfName)
.setState(CREATED)
.setWatermark(startTimestamp)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,14 @@ private ProcessContinuation schedulePartitions(
}

private Timestamp updateBatchToScheduled(List<PartitionMetadata> batchPartitions) {
final List<String> batchPartitionTokens =
final List<String> batchComposedPartitionTokens =
batchPartitions.stream()
.map(PartitionMetadata::getPartitionToken)
.map(
partition ->
PartitionMetadataDao.composePartitionTokenWithTvfName(
partition.getPartitionToken(), partition.getTvfName()))
.collect(Collectors.toList());
return dao.updateToScheduled(batchPartitionTokens);
return dao.updateToScheduled(batchComposedPartitionTokens);
}

private void outputBatch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ private void processStartPartition(
.setStartTimestamp(record.getStartTimestamp())
.setEndTimestamp(partition.getEndTimestamp())
.setHeartbeatMillis(partition.getHeartbeatMillis())
.setTvfName(partition.getTvfName())
.setState(CREATED)
.setWatermark(record.getStartTimestamp())
.build();
Expand All @@ -146,7 +147,10 @@ private void processStartPartition(
partitionMetadataDao
.runInTransaction(
transaction -> {
if (transaction.getPartition(startPartitionToken) == null) {
if (transaction.getPartition(
PartitionMetadataDao.composePartitionTokenWithTvfName(
startPartitionToken, partition.getTvfName()))
== null) {
transaction.insert(row);
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,14 @@ public ProcessContinuation run(
ManualWatermarkEstimator<Instant> watermarkEstimator,
BundleFinalizer bundleFinalizer) {
final String token = partition.getPartitionToken();
final String tvfName = partition.getTvfName();

// TODO: Potentially we can avoid this fetch, by enriching the runningAt timestamp when the
// ReadChangeStreamPartitionDoFn#processElement is called
final PartitionMetadata updatedPartition =
Optional.ofNullable(partitionMetadataDao.getPartition(token))
Optional.ofNullable(
partitionMetadataDao.getPartition(
PartitionMetadataDao.composePartitionTokenWithTvfName(token, tvfName)))
.map(partitionMetadataMapper::from)
.orElseThrow(
() ->
Expand Down Expand Up @@ -223,7 +226,11 @@ public ProcessContinuation run(

try (ChangeStreamResultSet resultSet =
changeStreamDao.changeStreamQuery(
token, startTimestamp, changeStreamQueryEndTimestamp, partition.getHeartbeatMillis())) {
token,
tvfName,
startTimestamp,
changeStreamQueryEndTimestamp,
partition.getHeartbeatMillis())) {

metrics.incQueryCounter();
while (resultSet.next()) {
Expand Down Expand Up @@ -298,7 +305,9 @@ public ProcessContinuation run(
LOG.debug("[{}] Continuation present, returning {}", token, maybeContinuation);
bundleFinalizer.afterBundleCommit(
Instant.now().plus(BUNDLE_FINALIZER_TIMEOUT),
updateWatermarkCallback(token, watermarkEstimator));
updateWatermarkCallback(
PartitionMetadataDao.composePartitionTokenWithTvfName(token, tvfName),
watermarkEstimator));
return maybeContinuation.get();
}
}
Expand Down Expand Up @@ -361,25 +370,27 @@ public ProcessContinuation run(
LOG.debug("[{}] Finishing partition", token);
// TODO: This should be performed after the commit succeeds. Since bundle finalizers are not
// guaranteed to be called, this needs to be performed in a subsequent fused stage.
partitionMetadataDao.updateToFinished(token);
partitionMetadataDao.updateToFinished(
PartitionMetadataDao.composePartitionTokenWithTvfName(token, tvfName));
metrics.decActivePartitionReadCounter();
LOG.info("[{}] After attempting to finish the partition", token);
return ProcessContinuation.stop();
}

private BundleFinalizer.Callback updateWatermarkCallback(
String token, WatermarkEstimator<Instant> watermarkEstimator) {
String composedToken, WatermarkEstimator<Instant> watermarkEstimator) {
return () -> {
final Instant watermark = watermarkEstimator.currentWatermark();
LOG.debug("[{}] Updating current watermark to {}", token, watermark);
LOG.debug("[{}] Updating current watermark to {}", composedToken, watermark);
try {
partitionMetadataDao.updateWatermark(
token, Timestamp.ofTimeMicroseconds(watermark.getMillis() * 1_000L));
composedToken, Timestamp.ofTimeMicroseconds(watermark.getMillis() * 1_000L));
} catch (SpannerException e) {
if (e.getErrorCode() == ErrorCode.NOT_FOUND) {
LOG.debug("[{}] Unable to update the current watermark, partition NOT FOUND", token);
LOG.debug(
"[{}] Unable to update the current watermark, partition NOT FOUND", composedToken);
} else {
LOG.error("[{}] Error updating the current watermark", token, e);
LOG.error("[{}] Error updating the current watermark", composedToken, e);
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao;

import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_TVF_NAME;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.Dialect;
Expand Down Expand Up @@ -83,6 +85,7 @@ public class ChangeStreamDao {
*/
public ChangeStreamResultSet changeStreamQuery(
String partitionToken,
String tvfName,
Timestamp startTimestamp,
Timestamp endTimestamp,
long heartbeatMillis) {
Expand All @@ -95,10 +98,14 @@ public ChangeStreamResultSet changeStreamQuery(
if (this.isPostgres()) {
// Ensure we have determined whether change stream uses mutable key range
if (this.isMutableChangeStream) {
query =
"SELECT * FROM \"spanner\".\"read_proto_bytes_"
+ changeStreamName
+ "\"($1, $2, $3, $4, null)";
if (tvfName == null || tvfName.equals(DEFAULT_TVF_NAME)) {
query =
"SELECT * FROM \"spanner\".\"read_proto_bytes_"
+ changeStreamName
+ "\"($1, $2, $3, $4, null)";
} else {
query = "SELECT * FROM \"spanner\".\"" + tvfName + "\"($1, $2, $3, $4, null)";
}
} else {
query =
"SELECT * FROM \"spanner\".\"read_json_"
Expand All @@ -117,16 +124,29 @@ public ChangeStreamResultSet changeStreamQuery(
.to(heartbeatMillis)
.build();
} else {
query =
"SELECT * FROM READ_"
+ changeStreamName
+ "("
+ " start_timestamp => @startTimestamp,"
+ " end_timestamp => @endTimestamp,"
+ " partition_token => @partitionToken,"
+ " read_options => null,"
+ " heartbeat_milliseconds => @heartbeatMillis"
+ ")";
if (this.isMutableChangeStream && tvfName != null && !tvfName.equals(DEFAULT_TVF_NAME)) {
query =
"SELECT * FROM "
+ tvfName
+ "("
+ " start_timestamp => @startTimestamp,"
+ " end_timestamp => @endTimestamp,"
+ " partition_token => @partitionToken,"
+ " read_options => null,"
+ " heartbeat_milliseconds => @heartbeatMillis"
+ ")";
} else {
query =
"SELECT * FROM READ_"
+ changeStreamName
+ "("
+ " start_timestamp => @startTimestamp,"
+ " end_timestamp => @endTimestamp,"
+ " partition_token => @partitionToken,"
+ " read_options => null,"
+ " heartbeat_milliseconds => @heartbeatMillis"
+ ")";
}
statement =
Statement.newBuilder(query)
.bind("startTimestamp")
Expand Down
Loading
Loading