File tree Expand file tree Collapse file tree 2 files changed +5
-4
lines changed
s3/src/main/java/com/kesque/pulsar/sink/s3 Expand file tree Collapse file tree 2 files changed +5
-4
lines changed Original file line number Diff line number Diff line change @@ -12,7 +12,7 @@ GET `admin/v2/functions/connectors` displays the nar is loaded successfully as
12
12
```
13
13
14
14
```
15
- $ bin/pulsar-admin sinks create --archive ./connectors/pulsar-io-s3-1.0.nar --inputs aws-s3-input-topic --name aws-s3-test --sink-config-file ./connectors/pulsar-postgres-jdbc-sink .yaml
15
+ $ bin/pulsar-admin sinks create --archive ./connectors/pulsar-io-s3-1.0.nar --inputs aws-s3-input-topic --name aws-s3-test --sink-config-file ./connectors/pulsar-s3-io .yaml --processing-guarantees EFFECTIVELY_ONCE --subs-position Earliest
16
16
"Created successfully"
17
17
18
18
$ bin/pulsar-admin sinks list
Original file line number Diff line number Diff line change @@ -84,9 +84,10 @@ public class AWSS3Sink implements Sink<byte[]> {
84
84
@ Override
85
85
public void write (Record <byte []> record ) throws Exception {
86
86
synchronized (this ) {
87
- //int len = record.getValue().length;
88
-
89
- this .lastRecordEpoch = record .getEventTime ().get ();
87
+ Optional <Long > eventTimeOptional = record .getEventTime ();
88
+ if (eventTimeOptional .isPresent ()) {
89
+ this .lastRecordEpoch = eventTimeOptional .get ();
90
+ }
90
91
Long ledgerId = getLedgerId (record .getRecordSequence ().get ());
91
92
LOG .info ("ledgerID {} event time {}" , ledgerId , this .lastRecordEpoch );
92
93
// Optional<Message<byte[]>> msgOption = record.getMessage(); //.get();
You can’t perform that action at this time.
0 commit comments