@@ -27,9 +27,13 @@ type FileWriter struct {
2727 name string
2828 replication int
2929 blockSize int64
30+ offset int64
3031
3132 blockWriter * transfer.BlockWriter
3233 deadline time.Time
34+
35+ // Key and IV for transparent encryption support.
36+ enc * TransparentEncryptionInfo
3337}
3438
3539// Create opens a new file in HDFS with the default replication, block size,
@@ -62,13 +66,14 @@ func (c *Client) Create(name string) (*FileWriter, error) {
6266// very important that Close is called after all data has been written.
6367func (c * Client ) CreateFile (name string , replication int , blockSize int64 , perm os.FileMode ) (* FileWriter , error ) {
6468 createReq := & hdfs.CreateRequestProto {
65- Src : proto .String (name ),
66- Masked : & hdfs.FsPermissionProto {Perm : proto .Uint32 (uint32 (perm ))},
67- ClientName : proto .String (c .namenode .ClientName ),
68- CreateFlag : proto .Uint32 (1 ),
69- CreateParent : proto .Bool (false ),
70- Replication : proto .Uint32 (uint32 (replication )),
71- BlockSize : proto .Uint64 (uint64 (blockSize )),
69+ Src : proto .String (name ),
70+ Masked : & hdfs.FsPermissionProto {Perm : proto .Uint32 (uint32 (perm ))},
71+ ClientName : proto .String (c .namenode .ClientName ),
72+ CreateFlag : proto .Uint32 (1 ),
73+ CreateParent : proto .Bool (false ),
74+ Replication : proto .Uint32 (uint32 (replication )),
75+ BlockSize : proto .Uint64 (uint64 (blockSize )),
76+ CryptoProtocolVersion : []hdfs.CryptoProtocolVersionProto {hdfs .CryptoProtocolVersionProto_ENCRYPTION_ZONES },
7277 }
7378 createResp := & hdfs.CreateResponseProto {}
7479
@@ -77,11 +82,21 @@ func (c *Client) CreateFile(name string, replication int, blockSize int64, perm
7782 return nil , & os.PathError {"create" , name , interpretCreateException (err )}
7883 }
7984
85+ var enc * TransparentEncryptionInfo
86+ if createResp .GetFs ().GetFileEncryptionInfo () != nil {
87+ enc , err = c .kmsGetKey (createResp .GetFs ().GetFileEncryptionInfo ())
88+ if err != nil {
89+ c .Remove (name )
90+ return nil , & os.PathError {"create" , name , err }
91+ }
92+ }
93+
8094 return & FileWriter {
8195 client : c ,
8296 name : name ,
8397 replication : replication ,
8498 blockSize : blockSize ,
99+ enc : enc ,
85100 }, nil
86101}
87102
@@ -106,11 +121,21 @@ func (c *Client) Append(name string) (*FileWriter, error) {
106121 return nil , & os.PathError {"append" , name , interpretException (err )}
107122 }
108123
124+ var enc * TransparentEncryptionInfo
125+ if appendResp .GetStat ().GetFileEncryptionInfo () != nil {
126+ enc , err = c .kmsGetKey (appendResp .GetStat ().GetFileEncryptionInfo ())
127+ if err != nil {
128+ return nil , & os.PathError {"append" , name , err }
129+ }
130+ }
131+
109132 f := & FileWriter {
110133 client : c ,
111134 name : name ,
112135 replication : int (appendResp .Stat .GetBlockReplication ()),
113136 blockSize : int64 (appendResp .Stat .GetBlocksize ()),
137+ offset : int64 (* appendResp .Stat .Length ),
138+ enc : enc ,
114139 }
115140
116141 // This returns nil if there are no blocks (it's an empty file) or if the
@@ -176,6 +201,28 @@ func (f *FileWriter) SetDeadline(t time.Time) error {
176201// of this, it is important that Close is called after all data has been
177202// written.
178203func (f * FileWriter ) Write (b []byte ) (int , error ) {
204+ // Encrypt data chunk if file in HDFS encrypted zone.
205+ if f .enc != nil && len (b ) > 0 {
206+ var offset int
207+ for offset < len (b ) {
208+ size := min (len (b )- offset , aesChunkSize )
209+ ciphertext , err := aesCtrStep (f .offset , f .enc , b [offset :offset + size ])
210+ if err != nil {
211+ return offset , err
212+ }
213+ writtenSize , err := f .writeImpl (ciphertext )
214+ offset += writtenSize
215+ if err != nil {
216+ return offset , err
217+ }
218+ }
219+ return offset , nil
220+ } else {
221+ return f .writeImpl (b )
222+ }
223+ }
224+
225+ func (f * FileWriter ) writeImpl (b []byte ) (int , error ) {
179226 if f .blockWriter == nil {
180227 err := f .startNewBlock ()
181228 if err != nil {
@@ -187,6 +234,7 @@ func (f *FileWriter) Write(b []byte) (int, error) {
187234 for off < len (b ) {
188235 n , err := f .blockWriter .Write (b [off :])
189236 off += n
237+ f .offset += int64 (n )
190238 if err == transfer .ErrEndOfBlock {
191239 err = f .startNewBlock ()
192240 }
@@ -316,3 +364,10 @@ func (f *FileWriter) finalizeBlock() error {
316364 f .blockWriter = nil
317365 return nil
318366}
367+
368+ func min (a , b int ) int {
369+ if a < b {
370+ return a
371+ }
372+ return b
373+ }
0 commit comments