11package com .kesque .pulsar .sink .cassandra .astra ;
22
33import java .io .IOException ;
4- import java .nio .charset .StandardCharsets ;
5- import java .nio .file .Paths ;
6-
7- import static java .nio .charset .StandardCharsets .UTF_8 ;
8-
9- import java .util .HashMap ;
104import java .util .List ;
115import java .util .Map ;
12- import java .util .Optional ;
136import java .util .UUID ;
147import java .util .concurrent .Executors ;
158import java .util .concurrent .ScheduledExecutorService ;
2013import org .apache .avro .SchemaParseException ;
2114import org .apache .avro .reflect .AvroSchema ;
2215import org .apache .commons .collections4 .CollectionUtils ;
16+ import org .apache .commons .lang3 .StringUtils ;
2317import org .apache .logging .log4j .util .Strings ;
2418import org .apache .pulsar .client .api .Message ;
2519import org .apache .pulsar .client .api .Schema ;
3933
4034import kong .unirest .HttpResponse ;
4135import kong .unirest .Unirest ;
42- import kong .unirest .HttpResponse ;
4336import kong .unirest .JsonNode ;
44- import kong .unirest .Unirest ;
4537import kong .unirest .json .JSONObject ;
4638
4739/**
@@ -60,7 +52,6 @@ public class AstraSink implements Sink<byte[]> {
6052 private static final Logger log = LoggerFactory .getLogger (AstraSink .class );
6153
6254 private AstraConfig astraConfig ;
63- private String filePrefix = "" ;
6455
6556 private List <Record <byte []>> incomingList ;
6657 private ScheduledExecutorService flushExecutor ;
@@ -81,15 +72,8 @@ public class AstraSink implements Sink<byte[]> {
8172 */
8273 @ Override
8374 public void write (Record <byte []> record ) throws Exception {
84- synchronized (this ) {
85- int len = record .getValue ().length ;
86-
87- Long ledgerId = getLedgerId (record .getRecordSequence ().get ());
88- log .info ("ledgerID {} and value's length {}" , ledgerId , len );
89- // Optional<Message<byte[]>> msgOption = record.getMessage(); //.get();
90- // log.error("message option isPresent {}", msgOption.isPresent());
91-
92- }
75+ //TODO: toAstra can be its own thread
76+ toAstra (record );
9377 }
9478
9579 @ Override
@@ -126,7 +110,7 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
126110
127111 String err = "clusterId, region, keyspace, table name, and user credentials must be specified." ;
128112 log .error (err );
129- throw new Exception (err );
113+ throw new IllegalArgumentException (err );
130114 }
131115 this .baseURL = "https://" + astraConfig .getClusterId () + "-" + astraConfig .getClusterRegion ()
132116 + ".apps.astra.datastax.com/api/rest/v1/" ;
@@ -191,13 +175,13 @@ private boolean createTable(String schema) {
191175 }
192176
193177 private void addRow (String rows , String table ) {
194- String columns = "{ \" columns \" :[" + rows + "]}" ;
178+ log . info ( "addRow {}" , rows ) ;
195179 String url = this .baseURL +"keyspaces/" + astraConfig .getKeySpace () + "/tables/" + table + "/rows" ;
196180 HttpResponse <String > response = Unirest .post (url )
197181 .header ("x-cassandra-request-id" , genUUID ())
198182 .header ("x-cassandra-token" , this .token )
199183 .header ("Content-Type" , "application/json" )
200- .body (columns )
184+ .body (rows )
201185 .asString ();
202186
203187 if (response .getStatus () != 201 ) {
@@ -206,6 +190,27 @@ private void addRow(String rows, String table) {
206190 }
207191 }
208192
193+ private String buildColumnData (Record <byte []> record ) {
194+ JSONObject obj = new JSONObject (new String (record .getValue ()));
195+ String objStr = "{\" columns\" :[" ;
196+ for (String key : obj .keySet ()) {
197+ objStr = objStr + "{\" name\" :\" " + key + "\" ,\" value\" :\" " + obj .getString (key ) + "\" }," ;
198+ }
199+
200+ // remove the last comma
201+ return StringUtils .substring (objStr , 0 , objStr .length () - 1 ) + "]}" ;
202+ }
203+
204+ private void toAstra (Record <byte []> record ) {
205+ try {
206+ addRow (buildColumnData (record ), this .table );
207+ } catch (Exception e ) {
208+ log .error ("failed to send to astra " , e );
209+ }
210+
211+ record .ack ();
212+ }
213+
209214 private String genUUID () {
210215 return UUID .randomUUID ().toString ();
211216 }
0 commit comments