Skip to content

Commit

Permalink
Writer tests pass
Browse files Browse the repository at this point in the history
  • Loading branch information
pierrec committed Apr 22, 2020
1 parent 3b22354 commit 23f1199
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 126 deletions.
18 changes: 10 additions & 8 deletions frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ func (f *Frame) closeW(w *Writer) error {
return err
}
buf := w.buf[:0]
// End mark (data block size of uint32(0)).
buf = append(buf, 0, 0, 0, 0)
if f.Descriptor.Flags.ContentChecksum() {
buf = f.checksum.Sum(buf)
}
// End mark (data block size of uint32(0)).
buf = append(buf, 0, 0, 0, 0)
_, err := w.src.Write(buf)
return err
}
Expand All @@ -62,7 +62,7 @@ newFrame:
}
goto newFrame
default:
return ErrInvalid
return ErrInvalidFrame
}
if err := f.Descriptor.initR(r); err != nil {
return err
Expand Down Expand Up @@ -103,14 +103,16 @@ func (fd *FrameDescriptor) write(w *Writer) error {
return nil
}

buf := w.buf[:2]
binary.LittleEndian.PutUint16(buf, uint16(fd.Flags))
buf := w.buf[:4+2]
// Write the magic number here even though it belongs to the Frame.
binary.LittleEndian.PutUint32(buf, w.frame.Magic)
binary.LittleEndian.PutUint16(buf[4:], uint16(fd.Flags))

if fd.Flags.Size() {
buf = buf[:10]
binary.LittleEndian.PutUint64(buf[2:], fd.ContentSize)
buf = buf[:4+2+8]
binary.LittleEndian.PutUint64(buf[4+2:], fd.ContentSize)
}
fd.Checksum = descriptorChecksum(buf)
fd.Checksum = descriptorChecksum(buf[4:])
buf = append(buf, fd.Checksum)

_, err := w.src.Write(buf)
Expand Down
28 changes: 12 additions & 16 deletions lz4.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,24 @@ const (
// ErrInvalidSourceShortBuffer is returned by UncompressBlock or CompressBLock when a compressed
// block is corrupted or the destination buffer is not large enough for the uncompressed data.
ErrInvalidSourceShortBuffer _error = "lz4: invalid source or destination buffer too short"
// ErrClosed is returned when calling Write/Read or Close on an already closed Writer/Reader.
ErrClosed _error = "lz4: closed Writer"
// ErrInvalid is returned when reading an invalid LZ4 archive.
ErrInvalid _error = "lz4: bad magic number"
// ErrBlockDependency is returned when attempting to decompress an archive created with block dependency.
ErrBlockDependency _error = "lz4: block dependency not supported"
// ErrInvalidFrame is returned when reading an invalid LZ4 archive.
ErrInvalidFrame _error = "lz4: bad magic number"
// ErrUnsupportedSeek is returned when attempting to Seek any way but forward from the current position.
ErrUnsupportedSeek _error = "lz4: can only seek forward from io.SeekCurrent"
// ErrInternalUnhandledState is an internal error.
ErrInternalUnhandledState _error = "lz4: unhandled state"
// ErrInvalidHeaderChecksum
// ErrInvalidHeaderChecksum is returned when reading a frame.
ErrInvalidHeaderChecksum _error = "lz4: invalid header checksum"
// ErrInvalidBlockChecksum
// ErrInvalidBlockChecksum is returned when reading a frame.
ErrInvalidBlockChecksum _error = "lz4: invalid block checksum"
// ErrInvalidFrameChecksum
// ErrInvalidFrameChecksum is returned when reading a frame.
ErrInvalidFrameChecksum _error = "lz4: invalid frame checksum"
// ErrInvalidCompressionLevel
ErrInvalidCompressionLevel _error = "lz4: invalid compression level"
// ErrCannotApplyOptions
ErrCannotApplyOptions _error = "lz4: cannot apply options"
// ErrInvalidBlockSize
ErrInvalidBlockSize _error = "lz4: invalid block size"
// ErrOptionNotApplicable
// ErrOptionInvalidCompressionLevel is returned when the supplied compression level is invalid.
ErrOptionInvalidCompressionLevel _error = "lz4: invalid compression level"
// ErrOptionClosedOrError is returned when an option is applied to a closed or in error object.
ErrOptionClosedOrError _error = "lz4: cannot apply options on closed or in error object"
// ErrOptionInvalidBlockSize is returned when
ErrOptionInvalidBlockSize _error = "lz4: invalid block size"
// ErrOptionNotApplicable is returned when trying to apply an option to an object not supporting it.
ErrOptionNotApplicable _error = "lz4: option not applicable"
)
10 changes: 8 additions & 2 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package lz4

import (
"fmt"
"reflect"
"runtime"
"sync"
)
Expand All @@ -17,6 +18,11 @@ type (
Option func(Applier) error
)

func (o Option) String() string {
//TODO proper naming of options
return reflect.TypeOf(o).String()
}

// Default options.
var (
defaultBlockSizeOption = BlockSizeOption(Block4Mb)
Expand Down Expand Up @@ -98,7 +104,7 @@ func BlockSizeOption(size BlockSize) Option {
return ErrOptionNotApplicable
}
if !size.isValid() {
return fmt.Errorf("%w: %d", ErrInvalidBlockSize, size)
return fmt.Errorf("%w: %d", ErrOptionInvalidBlockSize, size)
}
w.frame.Descriptor.Flags.BlockSizeIndexSet(size.index())
return nil
Expand Down Expand Up @@ -188,7 +194,7 @@ func CompressionLevelOption(level CompressionLevel) Option {
switch level {
case Fast, Level1, Level2, Level3, Level4, Level5, Level6, Level7, Level8, Level9:
default:
return fmt.Errorf("%w: %d", ErrInvalidCompressionLevel, level)
return fmt.Errorf("%w: %d", ErrOptionInvalidCompressionLevel, level)
}
w.level = level
return nil
Expand Down
6 changes: 2 additions & 4 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ import (
var readerStates = []aState{
noState: newState,
errorState: newState,
newState: headerState,
headerState: readState,
newState: readState,
readState: closedState,
closedState: newState,
}
Expand Down Expand Up @@ -40,7 +39,7 @@ func (r *Reader) Apply(options ...Option) (err error) {
case errorState:
return r.state.err
default:
return ErrCannotApplyOptions
return ErrOptionClosedOrError
}
for _, o := range options {
if err = o(r); err != nil {
Expand Down Expand Up @@ -69,7 +68,6 @@ func (r *Reader) Read(buf []byte) (n int, err error) {
return 0, r.state.err
case newState:
// First initialization.
r.state.next(nil)
if err = r.frame.initR(r); r.state.next(err) {
return
}
Expand Down
9 changes: 5 additions & 4 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ func TestReader(t *testing.T) {
t.Fatal(err)
}

var out bytes.Buffer
out := new(bytes.Buffer)
zr := lz4.NewReader(f)
n, err := io.Copy(&out, zr)
n, err := io.Copy(out, zr)
if err != nil {
t.Fatal(err)
}
Expand All @@ -69,7 +69,7 @@ func TestReader(t *testing.T) {

out.Reset()
zr = lz4.NewReader(f2)
_, err = io.CopyN(&out, zr, 10)
_, err = io.CopyN(out, zr, 10)
if err != nil {
t.Fatal(err)
}
Expand All @@ -78,6 +78,7 @@ func TestReader(t *testing.T) {
}
return

//TODO add Reader.Seek
pos, err := zr.Seek(-1, io.SeekCurrent)
if err == nil {
t.Fatal("expected error from invalid seek")
Expand Down Expand Up @@ -109,7 +110,7 @@ func TestReader(t *testing.T) {
}

out.Reset()
_, err = io.CopyN(&out, zr, 10)
_, err = io.CopyN(out, zr, 10)
if err != nil {
t.Fatal(err)
}
Expand Down
1 change: 0 additions & 1 deletion state.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ const (
noState aState = iota // uninitialized reader
errorState // unrecoverable error encountered
newState // instantiated object
headerState // processing header
readState // reading data
writeState // writing data
closedState // all done
Expand Down
11 changes: 5 additions & 6 deletions state_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 16 additions & 21 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ import "io"

var writerStates = []aState{
noState: newState,
newState: headerState,
headerState: writeState,
newState: writeState,
writeState: closedState,
closedState: newState,
errorState: newState,
Expand All @@ -21,7 +20,7 @@ func NewWriter(w io.Writer) *Writer {

type Writer struct {
state _State
buf [11]byte // frame descriptor needs at most 4+8+1=11 bytes
buf [15]byte // frame descriptor needs at most 4(magic)+4+8+1=11 bytes
src io.Writer // destination writer
level CompressionLevel // how hard to try
num int // concurrency level
Expand All @@ -41,7 +40,7 @@ func (w *Writer) Apply(options ...Option) (err error) {
case errorState:
return w.state.err
default:
return ErrCannotApplyOptions
return ErrOptionClosedOrError
}
for _, o := range options {
if err = o(w); err != nil {
Expand All @@ -62,7 +61,6 @@ func (w *Writer) Write(buf []byte) (n int, err error) {
case closedState, errorState:
return 0, w.state.err
case newState:
w.state.next(nil)
if err = w.frame.Descriptor.write(w); w.state.next(err) {
return
}
Expand All @@ -74,7 +72,7 @@ func (w *Writer) Write(buf []byte) (n int, err error) {
for len(buf) > 0 {
if w.idx == 0 && len(buf) >= zn {
// Avoid a copy as there is enough data for a block.
if err = w.write(); err != nil {
if err = w.write(buf[:zn], false); err != nil {
return
}
n += zn
Expand All @@ -93,18 +91,19 @@ func (w *Writer) Write(buf []byte) (n int, err error) {
}

// Buffer full.
if err = w.write(); err != nil {
if err = w.write(w.data, true); err != nil {
return
}
w.idx = 0
}
return
}

func (w *Writer) write() error {
func (w *Writer) write(data []byte, direct bool) error {
if w.isNotConcurrent() {
defer w.handler(len(w.data))
return w.frame.Blocks.Block.compress(w, w.data, w.ht).write(w)
defer w.handler(len(data))
block := w.frame.Blocks.Block
return block.compress(w, data, w.ht).write(w)
}
size := w.frame.Descriptor.Flags.BlockSizeIndex()
c := make(chan *FrameDataBlock)
Expand All @@ -122,42 +121,38 @@ func (w *Writer) write() error {
size.put(data)
<-c
size.put(zdata)
}(c, w.data, size)
}(c, data, size)

if w.idx > 0 {
// Not closed.
if direct {
w.data = size.get()
}
w.idx = 0

return nil
}

// Close closes the Writer, flushing any unwritten data to the underlying io.Writer,
// but does not close the underlying io.Writer.
func (w *Writer) Close() error {
func (w *Writer) Close() (err error) {
switch w.state.state {
case writeState:
case errorState:
return w.state.err
default:
return nil
}
var err error
defer func() { w.state.next(err) }()
if idx := w.idx; idx > 0 {
if w.idx > 0 {
// Flush pending data.
w.data = w.data[:idx]
w.idx = 0
if err = w.write(); err != nil {
if err = w.write(w.data[:w.idx], false); err != nil {
return err
}
w.data = nil
w.idx = 0
}
if w.isNotConcurrent() {
htPool.Put(w.ht)
size := w.frame.Descriptor.Flags.BlockSizeIndex()
size.put(w.data)
w.data = nil
}
return w.frame.closeW(w)
}
Expand Down
Loading

0 comments on commit 23f1199

Please sign in to comment.