Skip to content

Commit

Permalink
refactor: use latest cdc client (#177)
Browse files Browse the repository at this point in the history
  • Loading branch information
Emrehzl94 authored Aug 30, 2024
1 parent 08680b2 commit fe279cc
Show file tree
Hide file tree
Showing 13 changed files with 155 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ object StreamsTransactionEventExtensions {
this.meta.username,
this.meta.username,
"unknown",
"unknown",
CaptureMode.OFF,
"unknown",
"unknown",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -961,6 +961,7 @@ class ChangeEventExtensionsTest {
"service",
"neo4j",
"server-1",
"neo4j",
CaptureMode.DIFF,
"bolt",
"127.0.0.1:32000",
Expand Down Expand Up @@ -1291,6 +1292,7 @@ class ChangeEventExtensionsTest {
"service",
"neo4j",
"server-1",
"neo4j",
CaptureMode.DIFF,
"bolt",
"127.0.0.1:32000",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ class StreamsTransactionEventExtensionsTest {
"user",
"user",
"unknown",
"unknown",
CaptureMode.OFF,
"unknown",
"unknown",
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
<awaitility.version>4.2.1</awaitility.version>
<build-resources.version>2024-07.1</build-resources.version>
<byte-buddy.version>1.14.17</byte-buddy.version>
<cdc.version>1.0.6</cdc.version>
<cdc.version>1.0.7</cdc.version>
<commons-collections4.version>4.4</commons-collections4.version>
<commons-lang3.version>3.14.0</commons-lang3.version>
<hamcrest.version>2.2</hamcrest.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,7 @@ abstract class Neo4jCdcSchemaIT {
"neo4j",
"neo4j",
"server-id",
"neo4j",
CaptureMode.DIFF,
"bolt",
"localhost:32000",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ abstract class Neo4jCdcSourceIdIT {
"neo4j",
"neo4j",
"server-id",
"neo4j",
CaptureMode.DIFF,
"bolt",
"localhost:32000",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,7 @@ abstract class Neo4jSinkErrorIT {
"neo4j",
"neo4j",
"server-id",
"neo4j",
CaptureMode.DIFF,
"bolt",
"localhost:32000",
Expand Down
2 changes: 1 addition & 1 deletion sink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<name>sink</name>
<description>Neo4j Connector for Kafka - Sink</description>
<properties>
<antlr4.version>4.13.1</antlr4.version>
<antlr4.version>4.13.2</antlr4.version>
<json-schema-validator.version>1.5.0</json-schema-validator.version>
</properties>
<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ object TestUtils {
"service",
"neo4j",
"server-1",
"neo4j",
CaptureMode.DIFF,
"bolt",
"127.0.0.1:32000",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import org.neo4j.connectors.kafka.configuration.helpers.VersionUtil
import org.neo4j.connectors.kafka.data.ChangeEventConverter
import org.neo4j.connectors.kafka.data.Headers
import org.neo4j.driver.SessionConfig
import org.neo4j.driver.TransactionConfig
import org.slf4j.Logger
import org.slf4j.LoggerFactory

Expand All @@ -46,6 +47,7 @@ class Neo4jCdcTask : SourceTask() {
private lateinit var settings: Map<String, String>
private lateinit var config: SourceConfiguration
private lateinit var sessionConfig: SessionConfig
private lateinit var transactionConfig: TransactionConfig
private lateinit var cdc: CDCService
private lateinit var offset: AtomicReference<String>
private lateinit var changeEventConverter: ChangeEventConverter
Expand All @@ -62,11 +64,13 @@ class Neo4jCdcTask : SourceTask() {
configBuilder.withDatabase(config.database)
}
sessionConfig = configBuilder.build()
transactionConfig = config.txConfig()

cdc =
CDCClient(
config.driver,
{ sessionConfig },
{ transactionConfig },
config.cdcPollingInterval.toJavaDuration(),
*config.cdcSelectors.toTypedArray())
log.debug("constructed cdc client")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class SourceConfiguration(originals: Map<*, *>) :
SourceType.CDC -> {
val configMap = mutableMapOf<String, MutableList<Pattern>>()
val nonPositionalConfigMode = mutableMapOf<String, Boolean>()
val patternTxMetadataMap = mutableMapOf<Pattern, MutableMap<String, Any>>()

originals()
.entries
Expand Down Expand Up @@ -151,7 +152,7 @@ class SourceConfiguration(originals: Map<*, *>) :
.entries
.filter { CDC_PATTERN_ARRAY_METADATA_REGEX.matches(it.key) }
.map { CdcPatternConfigItem(it, CDC_PATTERN_ARRAY_METADATA_REGEX) }
.forEach { mapMetadata(it, nonPositionalConfigMode, configMap) }
.forEach { mapMetadata(it, nonPositionalConfigMode, configMap, patternTxMetadataMap) }

pivotMapCdcSelectorMap(configMap)
}
Expand Down Expand Up @@ -246,35 +247,38 @@ class SourceConfiguration(originals: Map<*, *>) :
val (index, patterns) = retrieveIndexAndPattern(configEntry, nonPositionalConfigMode, configMap)
val value = configEntry.value as String
val changesTo = value.splitToSequence(",").map { term -> term.trim() }.toSet()
val pattern = patterns.get(index)
val pattern = patterns[index]
pattern.withChangesTo(changesTo)
}

private fun mapMetadata(
configEntry: CdcPatternConfigItem,
nonPositionalConfigMode: MutableMap<String, Boolean>,
configMap: MutableMap<String, MutableList<Pattern>>
configMap: MutableMap<String, MutableList<Pattern>>,
patternTxMetadataMap: MutableMap<Pattern, MutableMap<String, Any>>
) {
val (index, patterns) = retrieveIndexAndPattern(configEntry, nonPositionalConfigMode, configMap)
val keyValue = configEntry.metadata!!
var value = configEntry.value
val metadataKey = configEntry.metadata!!
val value = configEntry.value
val pattern = patterns[index]
if (keyValue.startsWith(EntitySelector.METADATA_KEY_TX_METADATA + '.')) {
value =
mapOf(
keyValue.removePrefix(EntitySelector.METADATA_KEY_TX_METADATA + '.') to value,
)
val metadata =
mapOf(
EntitySelector.METADATA_KEY_TX_METADATA to value,
)
pattern.withMetadata(metadata)
if (metadataKey.startsWith("$METADATA_KEY_TX_METADATA.")) {
val txMetadataKey = metadataKey.removePrefix("$METADATA_KEY_TX_METADATA.")

val txMetadata =
patternTxMetadataMap.getOrPut(pattern) { mutableMapOf(txMetadataKey to value) }
txMetadata[txMetadataKey] = value

pattern.withTxMetadata(txMetadata)
patternTxMetadataMap[pattern] = txMetadata
} else if (metadataKey == METADATA_KEY_EXECUTING_USER) {
pattern.withExecutingUser(value as String)
} else if (metadataKey == METADATA_KEY_AUTHENTICATED_USER) {
pattern.withAuthenticatedUser(value as String)
} else {
val metadata =
mapOf(
keyValue to value,
)
pattern.withMetadata(metadata)
throw ConfigException(
"Unexpected metadata key: '$metadataKey' found in configuration property '${configEntry.key}'. " +
"Valid keys are '$METADATA_KEY_AUTHENTICATED_USER', '$METADATA_KEY_EXECUTING_USER', " +
"or keys starting with '$METADATA_KEY_TX_METADATA.*'.")
}
}

Expand Down Expand Up @@ -345,11 +349,36 @@ class SourceConfiguration(originals: Map<*, *>) :
cdcSelectorsToTopics.keys
.map {
when (it) {
is NodeSelector -> NodeSelector(it.change, it.changesTo, it.labels, it.key, it.metadata)
is NodeSelector ->
NodeSelector.builder()
.withOperation(it.operation)
.withChangesTo(it.changesTo)
.withLabels(it.labels)
.withKey(it.key)
.withTxMetadata(it.txMetadata)
.withExecutingUser(it.executingUser)
.withAuthenticatedUser(it.authenticatedUser)
.build()
is RelationshipSelector ->
RelationshipSelector(
it.change, it.changesTo, it.type, it.start, it.end, it.key, it.metadata)
is EntitySelector -> EntitySelector(it.change, it.changesTo, it.metadata)
RelationshipSelector.builder()
.withOperation(it.operation)
.withChangesTo(it.changesTo)
.withType(it.type)
.withStart(it.start)
.withEnd(it.end)
.withKey(it.key)
.withTxMetadata(it.txMetadata)
.withExecutingUser(it.executingUser)
.withAuthenticatedUser(it.authenticatedUser)
.build()
is EntitySelector ->
EntitySelector.builder()
.withOperation(it.operation)
.withChangesTo(it.changesTo)
.withTxMetadata(it.txMetadata)
.withExecutingUser(it.executingUser)
.withAuthenticatedUser(it.authenticatedUser)
.build()
else -> throw IllegalStateException("unexpected pattern type ${it.javaClass.name}")
}
}
Expand Down Expand Up @@ -459,6 +488,10 @@ class SourceConfiguration(originals: Map<*, *>) :
private val DEFAULT_CDC_POLL_DURATION = 5.seconds
private const val DEFAULT_STREAMING_PROPERTY = "timestamp"

private const val METADATA_KEY_AUTHENTICATED_USER = "authenticatedUser"
private const val METADATA_KEY_EXECUTING_USER = "executingUser"
private const val METADATA_KEY_TX_METADATA = "txMetadata"

fun validate(config: Config, originals: Map<String, String>) {
validate(config)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ object TestData {
"authenticated-user",
"executing-user",
"server-id",
"neo4j",
CaptureMode.FULL,
"connection-type",
"connection-client",
Expand Down
Loading

0 comments on commit fe279cc

Please sign in to comment.