Skip to content

Commit 0ea8ad1

Browse files
committed
split ChangeStreamCursor from OplogCursor
1 parent 250e019 commit 0ea8ad1

File tree

8 files changed

+198
-115
lines changed

8 files changed

+198
-115
lines changed

core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoDatabase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -644,7 +644,7 @@ private Document commandChangeStreamPipeline(Document query, Oplog oplog, String
644644
int batchSize = (int) cursorDocument.getOrDefault("batchSize", 0);
645645

646646
String namespace = getFullCollectionNamespace(collectionName);
647-
Cursor cursor = oplog.createCursor(changeStreamDocument, namespace, aggregation);
647+
Cursor cursor = oplog.createChangeStreamCursor(changeStreamDocument, namespace, aggregation);
648648
return Utils.firstBatchCursorResponse(namespace, cursor.takeDocuments(batchSize), cursor);
649649
}
650650

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package de.bwaldvogel.mongo.oplog;
2+
3+
import java.util.List;
4+
import java.util.stream.Collectors;
5+
6+
import de.bwaldvogel.mongo.MongoBackend;
7+
import de.bwaldvogel.mongo.backend.TailableCursor;
8+
import de.bwaldvogel.mongo.backend.aggregation.Aggregation;
9+
import de.bwaldvogel.mongo.bson.BsonTimestamp;
10+
import de.bwaldvogel.mongo.bson.Document;
11+
import de.bwaldvogel.mongo.exception.MongoServerException;
12+
13+
public class ChangeStreamCursor implements TailableCursor {
14+
15+
private static final String FULL_DOCUMENT = "fullDocument";
16+
private static final String OPERATION_TYPE = "operationType";
17+
private static final String CLUSTER_TIME = "clusterTime";
18+
private static final String DOCUMENT_KEY = "documentKey";
19+
20+
private final MongoBackend mongoBackend;
21+
private final Document changeStreamDocument;
22+
private final Aggregation aggregation;
23+
private final OplogCursor oplogCursor;
24+
25+
ChangeStreamCursor(
26+
MongoBackend mongoBackend,
27+
Document changeStreamDocument,
28+
Aggregation aggregation,
29+
OplogCursor oplogCursor
30+
) {
31+
this.mongoBackend = mongoBackend;
32+
this.changeStreamDocument = changeStreamDocument;
33+
this.aggregation = aggregation;
34+
this.oplogCursor = oplogCursor;
35+
}
36+
37+
@Override
38+
public long getId() {
39+
return oplogCursor.getId();
40+
}
41+
42+
@Override
43+
public boolean isEmpty() {
44+
return oplogCursor.isEmpty();
45+
}
46+
47+
@Override
48+
public List<Document> takeDocuments(int numberToReturn) {
49+
return aggregation.runStagesAsStream(
50+
oplogCursor.takeDocuments(numberToReturn).stream()
51+
.map(this::toChangeStreamResponseDocument)
52+
).collect(Collectors.toList());
53+
}
54+
55+
@Override
56+
public OplogPosition getPosition() {
57+
return oplogCursor.getPosition();
58+
}
59+
60+
private Document toChangeStreamResponseDocument(Document oplogDocument) {
61+
OperationType operationType = OperationType.fromCode(oplogDocument.get("op").toString());
62+
Document documentKey = new Document();
63+
Document document = getUpdateDocument(oplogDocument);
64+
BsonTimestamp timestamp = OplogUtils.getOplogTimestamp(oplogDocument);
65+
OplogPosition oplogPosition = new OplogPosition(timestamp);
66+
switch (operationType) {
67+
case UPDATE:
68+
case DELETE:
69+
documentKey = document;
70+
break;
71+
case INSERT:
72+
documentKey.append(OplogDocumentFields.ID, document.get(OplogDocumentFields.ID));
73+
break;
74+
case COMMAND:
75+
return toChangeStreamCommandResponseDocument(oplogDocument, oplogPosition, timestamp);
76+
default:
77+
throw new IllegalArgumentException("Unexpected operation type: " + operationType);
78+
}
79+
80+
return new Document()
81+
.append(OplogDocumentFields.ID, new Document(OplogDocumentFields.ID_DATA_KEY, oplogPosition.toHexString()))
82+
.append(OPERATION_TYPE, operationType.getDescription())
83+
.append(FULL_DOCUMENT, getFullDocument(changeStreamDocument, oplogDocument, operationType))
84+
.append(DOCUMENT_KEY, documentKey)
85+
.append(CLUSTER_TIME, timestamp);
86+
}
87+
88+
private Document toChangeStreamCommandResponseDocument(Document oplogDocument, OplogPosition oplogPosition, BsonTimestamp timestamp) {
89+
Document document = getUpdateDocument(oplogDocument);
90+
String operationType = document.keySet().stream().findFirst().orElseThrow(
91+
() -> new MongoServerException("Unspecified command operation type")
92+
);
93+
94+
return new Document()
95+
.append(OplogDocumentFields.ID, new Document(OplogDocumentFields.ID_DATA_KEY, oplogPosition.toHexString()))
96+
.append(OPERATION_TYPE, operationType)
97+
.append(CLUSTER_TIME, timestamp);
98+
}
99+
100+
private Document getFullDocument(Document changeStreamDocument, Document document, OperationType operationType) {
101+
switch (operationType) {
102+
case INSERT:
103+
return getUpdateDocument(document);
104+
case DELETE:
105+
return null;
106+
case UPDATE:
107+
return lookUpUpdateDocument(changeStreamDocument, document);
108+
}
109+
throw new IllegalArgumentException("Invalid operation type");
110+
}
111+
112+
private Document getUpdateDocument(Document document) {
113+
return (Document) document.get(OplogDocumentFields.O);
114+
}
115+
116+
private Document lookUpUpdateDocument(Document changeStreamDocument, Document document) {
117+
Document deltaUpdate = getDeltaUpdate(getUpdateDocument(document));
118+
if (changeStreamDocument.containsKey(FULL_DOCUMENT) && changeStreamDocument.get(FULL_DOCUMENT).equals("updateLookup")) {
119+
String namespace = (String) document.get(OplogDocumentFields.NAMESPACE);
120+
String databaseName = namespace.split("\\.")[0];
121+
String collectionName = namespace.split("\\.")[1];
122+
return mongoBackend.resolveDatabase(databaseName)
123+
.resolveCollection(collectionName, true)
124+
.queryAllAsStream()
125+
.filter(d -> d.get(OplogDocumentFields.ID).equals(((Document) document.get(OplogDocumentFields.O2)).get(OplogDocumentFields.ID)))
126+
.findFirst()
127+
.orElse(deltaUpdate);
128+
}
129+
return deltaUpdate;
130+
}
131+
132+
private Document getDeltaUpdate(Document updateDocument) {
133+
Document delta = new Document();
134+
if (updateDocument.containsKey("$set")) {
135+
delta.appendAll((Document) updateDocument.get("$set"));
136+
}
137+
if (updateDocument.containsKey("$unset")) {
138+
delta.appendAll((Document) updateDocument.get("$unset"));
139+
}
140+
return delta;
141+
}
142+
143+
}
Lines changed: 23 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,27 @@
11
package de.bwaldvogel.mongo.oplog;
22

33
import java.util.List;
4+
import java.util.Objects;
45
import java.util.UUID;
56
import java.util.function.Function;
7+
import java.util.stream.Collectors;
68
import java.util.stream.Stream;
79

810
import de.bwaldvogel.mongo.MongoBackend;
911
import de.bwaldvogel.mongo.MongoCollection;
10-
import de.bwaldvogel.mongo.backend.Cursor;
1112
import de.bwaldvogel.mongo.backend.CursorRegistry;
13+
import de.bwaldvogel.mongo.backend.TailableCursor;
1214
import de.bwaldvogel.mongo.backend.Utils;
1315
import de.bwaldvogel.mongo.backend.aggregation.Aggregation;
1416
import de.bwaldvogel.mongo.bson.BsonTimestamp;
1517
import de.bwaldvogel.mongo.bson.Document;
16-
import de.bwaldvogel.mongo.exception.MongoServerException;
1718

1819
public class CollectionBackedOplog implements Oplog {
1920

2021
private static final long ELECTION_TERM = 1L;
2122
private static final String START_AT_OPERATION_TIME = "startAtOperationTime";
22-
private static final String FULL_DOCUMENT = "fullDocument";
2323
private static final String START_AFTER = "startAfter";
2424
private static final String RESUME_AFTER = "resumeAfter";
25-
private static final String OPERATION_TYPE = "operationType";
26-
private static final String CLUSTER_TIME = "clusterTime";
27-
private static final String DOCUMENT_KEY = "documentKey";
2825

2926
private final OplogClock oplogClock;
3027
private final MongoCollection<Document> collection;
@@ -83,21 +80,19 @@ public void handleDropCollection(String namespace) {
8380
collection.addDocument(toOplogDropCollection(databaseName, collectionName));
8481
}
8582

86-
private Stream<Document> streamOplog(Document changeStreamDocument, OplogPosition position, Aggregation aggregation,
87-
String namespace) {
88-
return aggregation.runStagesAsStream(collection.queryAllAsStream()
83+
private Stream<Document> streamOplog(OplogPosition position, String namespace) {
84+
return collection.queryAllAsStream()
8985
.filter(document -> filterNamespace(document, namespace))
9086
.filter(document -> {
91-
BsonTimestamp timestamp = getOplogTimestamp(document);
87+
BsonTimestamp timestamp = OplogUtils.getOplogTimestamp(document);
9288
OplogPosition documentOplogPosition = new OplogPosition(timestamp);
9389
return documentOplogPosition.isAfter(position);
9490
})
9591
.sorted((o1, o2) -> {
96-
BsonTimestamp timestamp1 = getOplogTimestamp(o1);
97-
BsonTimestamp timestamp2 = getOplogTimestamp(o2);
92+
BsonTimestamp timestamp1 = OplogUtils.getOplogTimestamp(o1);
93+
BsonTimestamp timestamp2 = OplogUtils.getOplogTimestamp(o2);
9894
return timestamp1.compareTo(timestamp2);
99-
})
100-
.map(document -> toChangeStreamResponseDocument(document, changeStreamDocument)));
95+
});
10196
}
10297

10398
private static boolean filterNamespace(Document document, String namespace) {
@@ -110,7 +105,16 @@ private static boolean filterNamespace(Document document, String namespace) {
110105
}
111106

112107
@Override
113-
public Cursor createCursor(Document changeStreamDocument, String namespace, Aggregation aggregation) {
108+
public OplogCursor createCursor(String namespace, OplogPosition initialOplogPosition) {
109+
return new OplogCursor(
110+
cursorRegistry.generateCursorId(),
111+
position -> streamOplog(position, namespace),
112+
initialOplogPosition
113+
);
114+
}
115+
116+
@Override
117+
public TailableCursor createChangeStreamCursor(Document changeStreamDocument, String namespace, Aggregation aggregation) {
114118
Document startAfter = (Document) changeStreamDocument.get(START_AFTER);
115119
Document resumeAfter = (Document) changeStreamDocument.get(RESUME_AFTER);
116120
BsonTimestamp startAtOperationTime = (BsonTimestamp) changeStreamDocument.get(START_AT_OPERATION_TIME);
@@ -123,7 +127,7 @@ public Cursor createCursor(Document changeStreamDocument, String namespace, Aggr
123127
String collectionName = Utils.getCollectionNameFromFullName(namespace);
124128
boolean resumeAfterTerminalEvent = collection.queryAllAsStream()
125129
.filter(document -> {
126-
BsonTimestamp timestamp = getOplogTimestamp(document);
130+
BsonTimestamp timestamp = OplogUtils.getOplogTimestamp(document);
127131
OplogPosition documentOplogPosition = new OplogPosition(timestamp);
128132
return initialOplogPosition.isAfter(documentOplogPosition.inclusive());
129133
})
@@ -141,9 +145,9 @@ public Cursor createCursor(Document changeStreamDocument, String namespace, Aggr
141145
initialOplogPosition = new OplogPosition(oplogClock.now());
142146
}
143147

144-
Function<OplogPosition, Stream<Document>> streamSupplier =
145-
position -> streamOplog(changeStreamDocument, position, aggregation, namespace);
146-
OplogCursor cursor = new OplogCursor(cursorRegistry.generateCursorId(), streamSupplier, initialOplogPosition);
148+
OplogCursor oplogCursor = createCursor(namespace, initialOplogPosition);
149+
ChangeStreamCursor cursor
150+
= new ChangeStreamCursor(backend, changeStreamDocument, aggregation, oplogCursor);
147151
cursorRegistry.add(cursor);
148152
return cursor;
149153
}
@@ -185,91 +189,4 @@ private boolean isOplogCollection(String namespace) {
185189
return collection.getFullName().equals(namespace);
186190
}
187191

188-
private Document getFullDocument(Document changeStreamDocument, Document document, OperationType operationType) {
189-
switch (operationType) {
190-
case INSERT:
191-
return getUpdateDocument(document);
192-
case DELETE:
193-
return null;
194-
case UPDATE:
195-
return lookUpUpdateDocument(changeStreamDocument, document);
196-
}
197-
throw new IllegalArgumentException("Invalid operation type");
198-
}
199-
200-
private Document lookUpUpdateDocument(Document changeStreamDocument, Document document) {
201-
Document deltaUpdate = getDeltaUpdate(getUpdateDocument(document));
202-
if (changeStreamDocument.containsKey(FULL_DOCUMENT) && changeStreamDocument.get(FULL_DOCUMENT).equals("updateLookup")) {
203-
String namespace = (String) document.get(OplogDocumentFields.NAMESPACE);
204-
String databaseName = namespace.split("\\.")[0];
205-
String collectionName = namespace.split("\\.")[1];
206-
return backend.resolveDatabase(databaseName)
207-
.resolveCollection(collectionName, true)
208-
.queryAllAsStream()
209-
.filter(d -> d.get(OplogDocumentFields.ID).equals(((Document) document.get(OplogDocumentFields.O2)).get(OplogDocumentFields.ID)))
210-
.findFirst()
211-
.orElse(deltaUpdate);
212-
}
213-
return deltaUpdate;
214-
}
215-
216-
private Document getDeltaUpdate(Document updateDocument) {
217-
Document delta = new Document();
218-
if (updateDocument.containsKey("$set")) {
219-
delta.appendAll((Document) updateDocument.get("$set"));
220-
}
221-
if (updateDocument.containsKey("$unset")) {
222-
delta.appendAll((Document) updateDocument.get("$unset"));
223-
}
224-
return delta;
225-
}
226-
227-
private Document toChangeStreamResponseDocument(Document oplogDocument, Document changeStreamDocument) {
228-
OperationType operationType = OperationType.fromCode(oplogDocument.get(OplogDocumentFields.OPERATION_TYPE).toString());
229-
Document documentKey = new Document();
230-
Document document = getUpdateDocument(oplogDocument);
231-
BsonTimestamp timestamp = getOplogTimestamp(oplogDocument);
232-
OplogPosition oplogPosition = new OplogPosition(timestamp);
233-
switch (operationType) {
234-
case UPDATE:
235-
case DELETE:
236-
documentKey = document;
237-
break;
238-
case INSERT:
239-
documentKey.append(OplogDocumentFields.ID, document.get(OplogDocumentFields.ID));
240-
break;
241-
case COMMAND:
242-
return toChangeStreamCommandResponseDocument(oplogDocument, oplogPosition, timestamp);
243-
default:
244-
throw new IllegalArgumentException("Unexpected operation type: " + operationType);
245-
}
246-
247-
return new Document()
248-
.append(OplogDocumentFields.ID, new Document(OplogDocumentFields.ID_DATA_KEY, oplogPosition.toHexString()))
249-
.append(OPERATION_TYPE, operationType.getDescription())
250-
.append(FULL_DOCUMENT, getFullDocument(changeStreamDocument, oplogDocument, operationType))
251-
.append(DOCUMENT_KEY, documentKey)
252-
.append(CLUSTER_TIME, timestamp);
253-
}
254-
255-
private Document toChangeStreamCommandResponseDocument(Document oplogDocument, OplogPosition oplogPosition, BsonTimestamp timestamp) {
256-
Document document = getUpdateDocument(oplogDocument);
257-
String operationType = document.keySet().stream().findFirst().orElseThrow(
258-
() -> new MongoServerException("Unspecified command operation type")
259-
);
260-
261-
return new Document()
262-
.append(OplogDocumentFields.ID, new Document(OplogDocumentFields.ID_DATA_KEY, oplogPosition.toHexString()))
263-
.append(OPERATION_TYPE, operationType)
264-
.append(CLUSTER_TIME, timestamp);
265-
}
266-
267-
private static BsonTimestamp getOplogTimestamp(Document document) {
268-
return (BsonTimestamp) document.get(OplogDocumentFields.TIMESTAMP);
269-
}
270-
271-
private static Document getUpdateDocument(Document document) {
272-
return (Document) document.get(OplogDocumentFields.O);
273-
}
274-
275192
}

core/src/main/java/de/bwaldvogel/mongo/oplog/NoopOplog.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
import java.util.List;
44

5-
import de.bwaldvogel.mongo.backend.Cursor;
65
import de.bwaldvogel.mongo.backend.EmptyCursor;
6+
import de.bwaldvogel.mongo.backend.TailableCursor;
77
import de.bwaldvogel.mongo.backend.aggregation.Aggregation;
88
import de.bwaldvogel.mongo.bson.Document;
99

@@ -35,7 +35,12 @@ public void handleDropCollection(String namespace) {
3535
}
3636

3737
@Override
38-
public Cursor createCursor(Document changeStreamDocument, String namespace, Aggregation aggregation) {
38+
public TailableCursor createCursor(String namespace, OplogPosition initialOplogPosition) {
39+
return EmptyCursor.get();
40+
}
41+
42+
@Override
43+
public TailableCursor createChangeStreamCursor(Document changeStreamDocument, String namespace, Aggregation aggregation) {
3944
return EmptyCursor.get();
4045
}
4146
}

core/src/main/java/de/bwaldvogel/mongo/oplog/Oplog.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import java.util.List;
44

5-
import de.bwaldvogel.mongo.backend.Cursor;
5+
import de.bwaldvogel.mongo.backend.TailableCursor;
66
import de.bwaldvogel.mongo.backend.aggregation.Aggregation;
77
import de.bwaldvogel.mongo.bson.Document;
88

@@ -16,5 +16,7 @@ public interface Oplog {
1616

1717
void handleDropCollection(String namespace);
1818

19-
Cursor createCursor(Document changeStreamDocument, String namespace, Aggregation aggregation);
19+
TailableCursor createCursor(String namespace, OplogPosition initialOplogPosition);
20+
21+
TailableCursor createChangeStreamCursor(Document changeStreamDocument, String namespace, Aggregation aggregation);
2022
}

0 commit comments

Comments
 (0)