import static java.nio.charset.StandardCharsets.UTF_8;

import java.io.IOException;
+ import java.util.Map;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;

import com.fasterxml.jackson.databind.JsonNode;
import com.kesque.pulsar.sink.s3.AWSS3Config;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.hadoop.conf.Configuration;
+ import org.apache.logging.log4j.util.Strings;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
@@ -36,12 +41,22 @@ public class ParquetRecordWriter implements RecordWriter {
    private AWSS3Config config;
    private S3Storage s3Storage;
    private Configuration parquetWriterConfig;
-     private String currentFile = "";
    private Schema avroSchema;
-     private Record<byte[]> record; // kept for batch ack
+     private volatile String currentFile = "";
+     private volatile Record<byte[]> lastRecord; // kept for batch ack

-     S3ParquetOutputFile s3ParquetOutputFile = null;
-     private ParquetWriter<GenericData.Record> writer = null;
+     // maximum number of parallel writers
+     int WRITER_LIMIT = 4;
+
+     // key is the file name in S3
+     private ConcurrentHashMap<String, ParquetWriter<GenericData.Record>> writerMap = new ConcurrentHashMap<String, ParquetWriter<GenericData.Record>>(WRITER_LIMIT);
+     private ConcurrentHashMap<String, S3ParquetOutputFile> s3ParquetOutputFileMap = new ConcurrentHashMap<String, S3ParquetOutputFile>(WRITER_LIMIT);
+
+     // a fixed pool of WRITER_LIMIT threads for the final commit and upload to S3
+     ExecutorService uploaderExecutor = Executors.newFixedThreadPool(WRITER_LIMIT);
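+     // Rollover flow (descriptive note): when write() sees a new file name, the previous
+     // file's writer is closed, its S3 output is committed via setCommit(), and the last
+     // Pulsar record is acked on this executor, relying on cumulative acknowledgment to
+     // cover every record written to that file.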
+
+     // S3ParquetOutputFile s3ParquetOutputFile = null;
+     // private ParquetWriter<GenericData.Record> writer = null;

    public ParquetRecordWriter(AWSS3Config confg, S3Storage storage) {
        this.config = confg;
@@ -50,57 +65,90 @@ public ParquetRecordWriter(AWSS3Config confg, S3Storage storage) {
        parquetWriterConfig = new Configuration();
        parquetWriterConfig.set("fs.s3.awsAccessKeyId", config.getAccessKeyId());
        parquetWriterConfig.set("fs.s3.awsSecretAccessKey", config.getSecretAccessKey());
-
    }

    @Override
    public void write(Record<byte[]> record, String file) {
        byte[] data = record.getValue();
        String convJson = new String(data); // StandardCharsets.UTF_8);
-         log.info("data payload length is {} string-value {}", data.length);
        JsonNode datum = JsonUtil.parse(convJson);
        this.avroSchema = JsonUtil.inferSchema(JsonUtil.parse(convJson), "schemafromjson");
        log.info(avroSchema.toString());

        GenericData.Record convertedRecord = (org.apache.avro.generic.GenericData.Record) JsonUtil.convertToAvro(GenericData.get(), datum, avroSchema);
+         writeParquet(convertedRecord, file);
+         this.lastRecord = record;
+     }

-         try {
-             if (file.equals(currentFile)) {
-                 log.info("write to existing parquet writer");
-                 writer.write(convertedRecord);
-
-             } else {
-                 this.currentFile = file;
-                 if (this.writer != null) {
-                     writer.write(convertedRecord);
-                     log.info("cumulative ack all pulsar messages and write to existing parquet writer");
-                     record.ack(); // depends on cumulative ack
-                     s3ParquetOutputFile.s3out.setCommit();
-                     this.writer.close();
-                     this.writer = null;
-                 } else {
-                     s3ParquetOutputFile = new S3ParquetOutputFile(this.s3Storage, file);
-
-                     log.info("write to a new parquet writer");
-
-                     this.writer = AvroParquetWriter.<GenericData.Record>builder(s3ParquetOutputFile).withSchema(avroSchema)
-                             .withCompressionCodec(CompressionCodecName.SNAPPY) // GZIP
-                             .withWriteMode(ParquetFileWriter.Mode.OVERWRITE).withConf(parquetWriterConfig)
-                             .withPageSize(4 * 1024 * 1024) // For compression
-                             .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
-                             .build();
-
-                     writer.write(convertedRecord);
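+     // Descriptive note: writes one converted Avro record through the Parquet writer that
+     // belongs to the given S3 object name, rolling the previous file over to the uploader
+     // pool whenever the file name changes.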
+     private synchronized void writeParquet(GenericData.Record record, String file) {
+         log.info("currentFile is {} file name is {}", this.currentFile, file);
+         String lastFile = this.currentFile; // save a copy because currentFile can be replaced in the main thread
+         if (Strings.isNotBlank(lastFile) && !file.equals(lastFile)) {
+             uploaderExecutor.execute(() -> {
+                 ParquetWriter<GenericData.Record> writer = writerMap.get(lastFile);
+                 if (writer == null) {
+                     log.error("fatal error - failed to find parquet writer to match file {}", lastFile);
+                     return;
+                 }
+                 S3ParquetOutputFile s3ParquetOutputFile = s3ParquetOutputFileMap.get(lastFile);
+                 if (s3ParquetOutputFile == null) {
+                     log.error("fatal error - failed to find s3ParquetOutputFile to match file {}", lastFile);
+                     return;
+                 }
+
+                 // commit and close the writer for the previous file
+                 s3ParquetOutputFile.s3out.setCommit();
+                 try {
+                     writer.close();
+                 } catch (IOException e) {
+                     log.error("close parquet writer exception {}", e.getMessage());
+                     e.printStackTrace();
                }
+                 writerMap.remove(lastFile);
+                 s3ParquetOutputFileMap.remove(lastFile);
+                 log.info("cumulative ack all pulsar messages up to the closed file, writer map size {}", writerMap.size());
+                 lastRecord.ack(); // depends on cumulative ack
+             });
+         }
+         this.currentFile = file; // for the next write
+
+         ParquetWriter<GenericData.Record> writer = this.writerMap.get(file);
+         if (writer == null) {
+             log.info("write to a new parquet writer with file {} currentFile {}", file, this.currentFile);
+             S3ParquetOutputFile s3ParquetOutputFile = new S3ParquetOutputFile(this.s3Storage, file);
+
+             try {
+                 writer = AvroParquetWriter.<GenericData.Record>builder(s3ParquetOutputFile).withSchema(avroSchema)
+                         .withCompressionCodec(CompressionCodecName.SNAPPY) // GZIP
+                         .withWriteMode(ParquetFileWriter.Mode.OVERWRITE).withConf(parquetWriterConfig)
+                         .withPageSize(4 * 1024 * 1024) // For compression
+                         .withRowGroupSize(16 * 1024 * 1024) // For write buffering (Page size)
+                         .build();
+             } catch (IOException e) {
+                 log.error("create parquet s3 writer exception {}", e.getMessage());
+                 e.printStackTrace();
            }
+
+             s3ParquetOutputFileMap.put(file, s3ParquetOutputFile);
+             writerMap.put(file, writer);
+             log.info("put writer and parquet output file to {}", file);
+         }
+
+         try {
+             writer.write(record);
        } catch (IOException e) {
-             e.printStackTrace();
            log.error("write to parquet s3 exception {}", e.getMessage());
+             e.printStackTrace();
        }
    }

    @Override
    public void close() {
+         if (!uploaderExecutor.isShutdown()) {
+             uploaderExecutor.shutdown();
+         }
        log.info("ParquetRecordWriter close()");
    }
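Below is a minimal, self-contained sketch of the rollover pattern this diff introduces: one writer per output file kept in a concurrent map, plus a small fixed thread pool that finalizes the previous file when the incoming file name changes. It is illustrative only; the class name RolloverDemo and the plain StringBuilder standing in for a ParquetWriter are hypothetical and not part of the sink.

// Hypothetical sketch (not part of the PR): models the per-file writer map and
// asynchronous rollover used by writeParquet() above.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class RolloverDemo {
    private final Map<String, StringBuilder> writers = new ConcurrentHashMap<>();
    private final ExecutorService uploader = Executors.newFixedThreadPool(4);
    private volatile String currentFile = "";

    // Appends a value to the writer for `file`; when `file` differs from the previous
    // file name, the previous writer is finalized asynchronously, mirroring writeParquet().
    public synchronized void write(String value, String file) {
        String lastFile = currentFile;
        if (!lastFile.isEmpty() && !file.equals(lastFile)) {
            uploader.execute(() -> {
                StringBuilder finished = writers.remove(lastFile);
                if (finished != null) {
                    // in the real sink this is where setCommit()/close() finalize the S3 object
                    System.out.println("committed " + lastFile + " (" + finished.length() + " bytes)");
                }
            });
        }
        currentFile = file;
        writers.computeIfAbsent(file, f -> new StringBuilder()).append(value).append('\n');
    }

    public void close() throws InterruptedException {
        uploader.shutdown();
        uploader.awaitTermination(10, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        RolloverDemo demo = new RolloverDemo();
        demo.write("a", "part-0000.parquet");
        demo.write("b", "part-0000.parquet");
        demo.write("c", "part-0001.parquet"); // triggers async commit of part-0000.parquet
        demo.close();
    }
}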