@@ -27,9 +27,13 @@ type FileWriter struct {
2727 name string
2828 replication int
2929 blockSize int64
30+ offset int64
3031
3132 blockWriter * transfer.BlockWriter
3233 deadline time.Time
34+
35+ // Key and IV for transparent encryption support.
36+ enc * TransparentEncryptionInfo
3337}
3438
3539// Create opens a new file in HDFS with the default replication, block size,
@@ -62,13 +66,14 @@ func (c *Client) Create(name string) (*FileWriter, error) {
6266// very important that Close is called after all data has been written.
6367func (c * Client ) CreateFile (name string , replication int , blockSize int64 , perm os.FileMode ) (* FileWriter , error ) {
6468 createReq := & hdfs.CreateRequestProto {
65- Src : proto .String (name ),
66- Masked : & hdfs.FsPermissionProto {Perm : proto .Uint32 (uint32 (perm ))},
67- ClientName : proto .String (c .namenode .ClientName ),
68- CreateFlag : proto .Uint32 (1 ),
69- CreateParent : proto .Bool (false ),
70- Replication : proto .Uint32 (uint32 (replication )),
71- BlockSize : proto .Uint64 (uint64 (blockSize )),
69+ Src : proto .String (name ),
70+ Masked : & hdfs.FsPermissionProto {Perm : proto .Uint32 (uint32 (perm ))},
71+ ClientName : proto .String (c .namenode .ClientName ),
72+ CreateFlag : proto .Uint32 (1 ),
73+ CreateParent : proto .Bool (false ),
74+ Replication : proto .Uint32 (uint32 (replication )),
75+ BlockSize : proto .Uint64 (uint64 (blockSize )),
76+ CryptoProtocolVersion : []hdfs.CryptoProtocolVersionProto {hdfs .CryptoProtocolVersionProto_ENCRYPTION_ZONES },
7277 }
7378 createResp := & hdfs.CreateResponseProto {}
7479
@@ -77,11 +82,20 @@ func (c *Client) CreateFile(name string, replication int, blockSize int64, perm
7782 return nil , & os.PathError {"create" , name , interpretCreateException (err )}
7883 }
7984
85+ var enc * TransparentEncryptionInfo
86+ if createResp .GetFs ().GetFileEncryptionInfo () != nil {
87+ enc , err = c .kmsGetKey (createResp .GetFs ().GetFileEncryptionInfo ())
88+ if err != nil {
89+ return nil , & os.PathError {"create" , name , err }
90+ }
91+ }
92+
8093 return & FileWriter {
8194 client : c ,
8295 name : name ,
8396 replication : replication ,
8497 blockSize : blockSize ,
98+ enc : enc ,
8599 }, nil
86100}
87101
@@ -106,11 +120,21 @@ func (c *Client) Append(name string) (*FileWriter, error) {
106120 return nil , & os.PathError {"append" , name , interpretException (err )}
107121 }
108122
123+ var enc * TransparentEncryptionInfo
124+ if appendResp .GetStat ().GetFileEncryptionInfo () != nil {
125+ enc , err = c .kmsGetKey (appendResp .GetStat ().GetFileEncryptionInfo ())
126+ if err != nil {
127+ return nil , & os.PathError {"append" , name , err }
128+ }
129+ }
130+
109131 f := & FileWriter {
110132 client : c ,
111133 name : name ,
112134 replication : int (appendResp .Stat .GetBlockReplication ()),
113135 blockSize : int64 (appendResp .Stat .GetBlocksize ()),
136+ offset : int64 (* appendResp .Stat .Length ),
137+ enc : enc ,
114138 }
115139
116140 // This returns nil if there are no blocks (it's an empty file) or if the
@@ -176,6 +200,28 @@ func (f *FileWriter) SetDeadline(t time.Time) error {
176200// of this, it is important that Close is called after all data has been
177201// written.
178202func (f * FileWriter ) Write (b []byte ) (int , error ) {
203+ // Encrypt data chunk if file in HDFS encrypted zone.
204+ if f .enc != nil && len (b ) > 0 {
205+ var offset int
206+ for offset < len (b ) {
207+ size := min (len (b )- offset , aesChunkSize )
208+ ciphertext , err := aesCtrStep (f .offset , f .enc , b [offset :offset + size ])
209+ if err != nil {
210+ return offset , err
211+ }
212+ writtenSize , err := f .writeImpl (ciphertext )
213+ offset += writtenSize
214+ if err != nil {
215+ return offset , err
216+ }
217+ }
218+ return offset , nil
219+ } else {
220+ return f .writeImpl (b )
221+ }
222+ }
223+
224+ func (f * FileWriter ) writeImpl (b []byte ) (int , error ) {
179225 if f .blockWriter == nil {
180226 err := f .startNewBlock ()
181227 if err != nil {
@@ -187,6 +233,7 @@ func (f *FileWriter) Write(b []byte) (int, error) {
187233 for off < len (b ) {
188234 n , err := f .blockWriter .Write (b [off :])
189235 off += n
236+ f .offset += int64 (n )
190237 if err == transfer .ErrEndOfBlock {
191238 err = f .startNewBlock ()
192239 }
@@ -316,3 +363,10 @@ func (f *FileWriter) finalizeBlock() error {
316363 f .blockWriter = nil
317364 return nil
318365}
366+
367+ func min (a , b int ) int {
368+ if a < b {
369+ return a
370+ }
371+ return b
372+ }
0 commit comments