From e800f06a5841d52fe2d7243c3988250758549d96 Mon Sep 17 00:00:00 2001 From: Jeremy Stribling Date: Tue, 29 Aug 2017 17:10:22 -0700 Subject: [PATCH] RFC: provide insight into the status of gogit A project I'm working on uses gogit to power a git remote helper (i.e., it gets invoked by git when you do `git clone ://...`, etc from the command line). On big repos, gogit and our backing store are slow enough that having printed progress indicators makes a huge difference to usability, and better allows us to debug where slownesses might be. This is a quick-and-dirty PR that lets gogit communicate its status back to some listener. I haven't fixed up the tests yet, or written any new tests, because I wanted to get feedback first on if this is even a good approach or not. Please take a look and let me know! (Note that this is different from the sideband progress stuff, which only lets you know the progress of the remote server. This PR gives the status of the local client.) --- options.go | 6 ++- plumbing/format/idxfile/encoder.go | 43 +++++++++++---- plumbing/format/packfile/common.go | 17 ++++-- plumbing/format/packfile/decoder.go | 50 +++++++++++++---- plumbing/format/packfile/delta_selector.go | 50 +++++++++++++++-- plumbing/format/packfile/encoder.go | 32 +++++++---- plumbing/revlist/revlist.go | 21 ++++++-- plumbing/status.go | 53 +++++++++++++++++++ plumbing/storer/object.go | 2 +- plumbing/transport/server/server.go | 8 +-- remote.go | 25 +++++++-- storage/filesystem/internal/dotgit/dotgit.go | 4 +- storage/filesystem/internal/dotgit/writers.go | 30 ++++++----- storage/filesystem/object.go | 4 +- 14 files changed, 274 insertions(+), 71 deletions(-) create mode 100644 plumbing/status.go diff --git a/options.go b/options.go index 0ec18d426..19bcf0b3e 100644 --- a/options.go +++ b/options.go @@ -133,7 +133,8 @@ type FetchOptions struct { Progress sideband.Progress // Tags describe how the tags will be fetched from the remote repository, // by default is TagFollowing. - Tags TagFetchMode + Tags TagFetchMode + StatusChan plumbing.StatusChan } // Validate validates the fields and sets the default values. @@ -159,7 +160,8 @@ type PushOptions struct { // object. A refspec with empty src can be used to delete a reference. RefSpecs []config.RefSpec // Auth credentials, if required, to use with the remote repository. - Auth transport.AuthMethod + Auth transport.AuthMethod + StatusChan plumbing.StatusChan } // Validate validates the fields and sets the default values. diff --git a/plumbing/format/idxfile/encoder.go b/plumbing/format/idxfile/encoder.go index 40abfb830..da5587b00 100644 --- a/plumbing/format/idxfile/encoder.go +++ b/plumbing/format/idxfile/encoder.go @@ -6,6 +6,7 @@ import ( "io" "sort" + "gopkg.in/src-d/go-git.v4/plumbing" "gopkg.in/src-d/go-git.v4/utils/binary" ) @@ -23,10 +24,10 @@ func NewEncoder(w io.Writer) *Encoder { } // Encode encodes an Idxfile to the encoder writer. -func (e *Encoder) Encode(idx *Idxfile) (int, error) { +func (e *Encoder) Encode(idx *Idxfile, statusChan plumbing.StatusChan) (int, error) { idx.Entries.Sort() - flow := []func(*Idxfile) (int, error){ + flow := []func(*Idxfile, plumbing.StatusChan) (int, error){ e.encodeHeader, e.encodeFanout, e.encodeHashes, @@ -37,7 +38,7 @@ func (e *Encoder) Encode(idx *Idxfile) (int, error) { sz := 0 for _, f := range flow { - i, err := f(idx) + i, err := f(idx, statusChan) sz += i if err != nil { @@ -48,7 +49,7 @@ func (e *Encoder) Encode(idx *Idxfile) (int, error) { return sz, nil } -func (e *Encoder) encodeHeader(idx *Idxfile) (int, error) { +func (e *Encoder) encodeHeader(idx *Idxfile, _ plumbing.StatusChan) (int, error) { c, err := e.Write(idxHeader) if err != nil { return c, err @@ -57,7 +58,7 @@ func (e *Encoder) encodeHeader(idx *Idxfile) (int, error) { return c + 4, binary.WriteUint32(e, idx.Version) } -func (e *Encoder) encodeFanout(idx *Idxfile) (int, error) { +func (e *Encoder) encodeFanout(idx *Idxfile, _ plumbing.StatusChan) (int, error) { fanout := idx.calculateFanout() for _, c := range fanout { if err := binary.WriteUint32(e, c); err != nil { @@ -68,7 +69,13 @@ func (e *Encoder) encodeFanout(idx *Idxfile) (int, error) { return 1024, nil } -func (e *Encoder) encodeHashes(idx *Idxfile) (int, error) { +func (e *Encoder) encodeHashes(idx *Idxfile, statusChan plumbing.StatusChan) (int, error) { + update := plumbing.StatusUpdate{ + Stage: plumbing.StatusIndexHash, + ObjectsTotal: len(idx.Entries), + } + statusChan.SendUpdate(update) + sz := 0 for _, ent := range idx.Entries { i, err := e.Write(ent.Hash[:]) @@ -77,12 +84,20 @@ func (e *Encoder) encodeHashes(idx *Idxfile) (int, error) { if err != nil { return sz, err } + update.ObjectsDone++ + statusChan.SendUpdateIfPossible(update) } return sz, nil } -func (e *Encoder) encodeCRC32(idx *Idxfile) (int, error) { +func (e *Encoder) encodeCRC32(idx *Idxfile, statusChan plumbing.StatusChan) (int, error) { + update := plumbing.StatusUpdate{ + Stage: plumbing.StatusIndexCRC, + ObjectsTotal: len(idx.Entries), + } + statusChan.SendUpdate(update) + sz := 0 for _, ent := range idx.Entries { err := binary.Write(e, ent.CRC32) @@ -91,12 +106,20 @@ func (e *Encoder) encodeCRC32(idx *Idxfile) (int, error) { if err != nil { return sz, err } + update.ObjectsDone++ + statusChan.SendUpdateIfPossible(update) } return sz, nil } -func (e *Encoder) encodeOffsets(idx *Idxfile) (int, error) { +func (e *Encoder) encodeOffsets(idx *Idxfile, statusChan plumbing.StatusChan) (int, error) { + update := plumbing.StatusUpdate{ + Stage: plumbing.StatusIndexOffset, + ObjectsTotal: len(idx.Entries), + } + statusChan.SendUpdate(update) + sz := 0 var o64bits []uint64 @@ -112,6 +135,8 @@ func (e *Encoder) encodeOffsets(idx *Idxfile) (int, error) { } sz += 4 + update.ObjectsDone++ + statusChan.SendUpdateIfPossible(update) } for _, o := range o64bits { @@ -125,7 +150,7 @@ func (e *Encoder) encodeOffsets(idx *Idxfile) (int, error) { return sz, nil } -func (e *Encoder) encodeChecksums(idx *Idxfile) (int, error) { +func (e *Encoder) encodeChecksums(idx *Idxfile, _ plumbing.StatusChan) (int, error) { if _, err := e.Write(idx.PackfileChecksum[:]); err != nil { return 0, err } diff --git a/plumbing/format/packfile/common.go b/plumbing/format/packfile/common.go index 728cb16a4..424a89926 100644 --- a/plumbing/format/packfile/common.go +++ b/plumbing/format/packfile/common.go @@ -3,6 +3,7 @@ package packfile import ( "io" + "gopkg.in/src-d/go-git.v4/plumbing" "gopkg.in/src-d/go-git.v4/plumbing/storer" "gopkg.in/src-d/go-git.v4/utils/ioutil" ) @@ -23,9 +24,12 @@ const ( // UpdateObjectStorage updates the given storer.EncodedObjectStorer with the contents of the // packfile. -func UpdateObjectStorage(s storer.EncodedObjectStorer, packfile io.Reader) error { +func UpdateObjectStorage( + s storer.EncodedObjectStorer, + packfile io.Reader, + statusChan plumbing.StatusChan) error { if sw, ok := s.(storer.PackfileWriter); ok { - return writePackfileToObjectStorage(sw, packfile) + return writePackfileToObjectStorage(sw, packfile, statusChan) } stream := NewScanner(packfile) @@ -34,13 +38,16 @@ func UpdateObjectStorage(s storer.EncodedObjectStorer, packfile io.Reader) error return err } - _, err = d.Decode() + _, err = d.Decode(statusChan) return err } -func writePackfileToObjectStorage(sw storer.PackfileWriter, packfile io.Reader) error { +func writePackfileToObjectStorage( + sw storer.PackfileWriter, + packfile io.Reader, + statusChan plumbing.StatusChan) error { var err error - w, err := sw.PackfileWriter() + w, err := sw.PackfileWriter(statusChan) if err != nil { return err } diff --git a/plumbing/format/packfile/decoder.go b/plumbing/format/packfile/decoder.go index e49de5168..cfc5abca1 100644 --- a/plumbing/format/packfile/decoder.go +++ b/plumbing/format/packfile/decoder.go @@ -116,26 +116,39 @@ func canResolveDeltas(s *Scanner, o storer.EncodedObjectStorer) bool { // Decode reads a packfile and stores it in the value pointed to by s. The // offsets and the CRCs are calculated by this method -func (d *Decoder) Decode() (checksum plumbing.Hash, err error) { +func (d *Decoder) Decode(statusChan plumbing.StatusChan) (checksum plumbing.Hash, err error) { defer func() { d.isDecoded = true }() if d.isDecoded { return plumbing.ZeroHash, ErrAlreadyDecoded } - if err := d.doDecode(); err != nil { + if err := d.doDecode(statusChan); err != nil { return plumbing.ZeroHash, err } return d.s.Checksum() } -func (d *Decoder) doDecode() error { +func (d *Decoder) doDecode(statusChan plumbing.StatusChan) error { + statusChan.SendUpdate(plumbing.StatusUpdate{ + Stage: plumbing.StatusCount, + }) + _, count, err := d.s.Header() if err != nil { return err } + statusChan.SendUpdate(plumbing.StatusUpdate{ + Stage: plumbing.StatusCount, + ObjectsTotal: int(count), + }) + statusChan.SendUpdate(plumbing.StatusUpdate{ + Stage: plumbing.StatusFetch, + ObjectsTotal: int(count), + }) + if !d.hasBuiltIndex { d.idx = NewIndex(int(count)) } @@ -144,25 +157,35 @@ func (d *Decoder) doDecode() error { _, isTxStorer := d.o.(storer.Transactioner) switch { case d.o == nil: - return d.decodeObjects(int(count)) + return d.decodeObjects(int(count), statusChan) case isTxStorer: - return d.decodeObjectsWithObjectStorerTx(int(count)) + return d.decodeObjectsWithObjectStorerTx(int(count), statusChan) default: - return d.decodeObjectsWithObjectStorer(int(count)) + return d.decodeObjectsWithObjectStorer(int(count), statusChan) } } -func (d *Decoder) decodeObjects(count int) error { +func (d *Decoder) decodeObjects(count int, statusChan plumbing.StatusChan) error { + update := plumbing.StatusUpdate{ + Stage: plumbing.StatusFetch, + ObjectsTotal: count, + } for i := 0; i < count; i++ { if _, err := d.DecodeObject(); err != nil { return err } + update.ObjectsDone++ + statusChan.SendUpdateIfPossible(update) } return nil } -func (d *Decoder) decodeObjectsWithObjectStorer(count int) error { +func (d *Decoder) decodeObjectsWithObjectStorer(count int, statusChan plumbing.StatusChan) error { + update := plumbing.StatusUpdate{ + Stage: plumbing.StatusFetch, + ObjectsTotal: count, + } for i := 0; i < count; i++ { obj, err := d.DecodeObject() if err != nil { @@ -172,12 +195,19 @@ func (d *Decoder) decodeObjectsWithObjectStorer(count int) error { if _, err := d.o.SetEncodedObject(obj); err != nil { return err } + update.ObjectsDone++ + statusChan.SendUpdateIfPossible(update) } return nil } -func (d *Decoder) decodeObjectsWithObjectStorerTx(count int) error { +func (d *Decoder) decodeObjectsWithObjectStorerTx(count int, statusChan plumbing.StatusChan) error { + update := plumbing.StatusUpdate{ + Stage: plumbing.StatusFetch, + ObjectsTotal: count, + } + d.tx = d.o.(storer.Transactioner).Begin() for i := 0; i < count; i++ { @@ -196,6 +226,8 @@ func (d *Decoder) decodeObjectsWithObjectStorerTx(count int) error { return err } + update.ObjectsDone++ + statusChan.SendUpdateIfPossible(update) } return d.tx.Commit() diff --git a/plumbing/format/packfile/delta_selector.go b/plumbing/format/packfile/delta_selector.go index cc0ae0fd5..9803929a0 100644 --- a/plumbing/format/packfile/delta_selector.go +++ b/plumbing/format/packfile/delta_selector.go @@ -32,22 +32,47 @@ func newDeltaSelector(s storer.EncodedObjectStorer) *deltaSelector { // ObjectsToPack creates a list of ObjectToPack from the hashes provided, // creating deltas if it's suitable, using an specific internal logic -func (dw *deltaSelector) ObjectsToPack(hashes []plumbing.Hash) ([]*ObjectToPack, error) { - otp, err := dw.objectsToPack(hashes) +func (dw *deltaSelector) ObjectsToPack( + hashes []plumbing.Hash, + statusChan plumbing.StatusChan, +) ([]*ObjectToPack, error) { + update := plumbing.StatusUpdate{ + Stage: plumbing.StatusRead, + ObjectsTotal: len(hashes), + } + statusChan.SendUpdate(update) + + otp, err := dw.objectsToPack(hashes, statusChan, update) if err != nil { return nil, err } + update = plumbing.StatusUpdate{ + Stage: plumbing.StatusSort, + ObjectsTotal: update.ObjectsTotal, + } + statusChan.SendUpdate(update) + dw.sort(otp) - if err := dw.walk(otp); err != nil { + update = plumbing.StatusUpdate{ + Stage: plumbing.StatusDelta, + ObjectsTotal: update.ObjectsTotal, + } + statusChan.SendUpdate(update) + + if err := dw.walk(otp, statusChan, update); err != nil { return nil, err } return otp, nil } -func (dw *deltaSelector) objectsToPack(hashes []plumbing.Hash) ([]*ObjectToPack, error) { +func (dw *deltaSelector) objectsToPack( + hashes []plumbing.Hash, + statusChan plumbing.StatusChan, + update plumbing.StatusUpdate, +) ([]*ObjectToPack, error) { var objectsToPack []*ObjectToPack for _, h := range hashes { o, err := dw.encodedDeltaObject(h) @@ -61,6 +86,9 @@ func (dw *deltaSelector) objectsToPack(hashes []plumbing.Hash) ([]*ObjectToPack, } objectsToPack = append(objectsToPack, otp) + + update.ObjectsDone++ + statusChan.SendUpdateIfPossible(update) } if err := dw.fixAndBreakChains(objectsToPack); err != nil { @@ -171,7 +199,16 @@ func (dw *deltaSelector) sort(objectsToPack []*ObjectToPack) { sort.Sort(byTypeAndSize(objectsToPack)) } -func (dw *deltaSelector) walk(objectsToPack []*ObjectToPack) error { +func (dw *deltaSelector) walk( + objectsToPack []*ObjectToPack, + statusChan plumbing.StatusChan, + update plumbing.StatusUpdate, +) error { + sendUpdate := func() { + update.ObjectsDone++ + statusChan.SendUpdateIfPossible(update) + } + for i := 0; i < len(objectsToPack); i++ { target := objectsToPack[i] @@ -179,11 +216,13 @@ func (dw *deltaSelector) walk(objectsToPack []*ObjectToPack) error { // object. This happens when a delta is set to be reused from an existing // packfile. if target.IsDelta() { + sendUpdate() continue } // We only want to create deltas from specific types. if !applyDelta[target.Type()] { + sendUpdate() continue } @@ -200,6 +239,7 @@ func (dw *deltaSelector) walk(objectsToPack []*ObjectToPack) error { return err } } + sendUpdate() } return nil diff --git a/plumbing/format/packfile/encoder.go b/plumbing/format/packfile/encoder.go index 142655904..3d183b1f5 100644 --- a/plumbing/format/packfile/encoder.go +++ b/plumbing/format/packfile/encoder.go @@ -14,10 +14,10 @@ import ( // Encoder gets the data from the storage and write it into the writer in PACK // format type Encoder struct { - selector *deltaSelector - w *offsetWriter - zw *zlib.Writer - hasher plumbing.Hasher + selector *deltaSelector + w *offsetWriter + zw *zlib.Writer + hasher plumbing.Hasher // offsets is a map of object hashes to corresponding offsets in the packfile. // It is used to determine offset of the base of a delta when a OFS_DELTA is // used. @@ -47,16 +47,28 @@ func NewEncoder(w io.Writer, s storer.EncodedObjectStorer, useRefDeltas bool) *E // Encode creates a packfile containing all the objects referenced in hashes // and writes it to the writer in the Encoder. -func (e *Encoder) Encode(hashes []plumbing.Hash) (plumbing.Hash, error) { - objects, err := e.selector.ObjectsToPack(hashes) +func (e *Encoder) Encode( + hashes []plumbing.Hash, + statusChan plumbing.StatusChan, +) (plumbing.Hash, error) { + objects, err := e.selector.ObjectsToPack(hashes, statusChan) if err != nil { return plumbing.ZeroHash, err } - return e.encode(objects) + return e.encode(objects, statusChan) } -func (e *Encoder) encode(objects []*ObjectToPack) (plumbing.Hash, error) { +func (e *Encoder) encode( + objects []*ObjectToPack, + statusChan plumbing.StatusChan, +) (plumbing.Hash, error) { + update := plumbing.StatusUpdate{ + Stage: plumbing.StatusSend, + ObjectsTotal: len(objects), + } + statusChan.SendUpdate(update) + if err := e.head(len(objects)); err != nil { return plumbing.ZeroHash, err } @@ -65,6 +77,8 @@ func (e *Encoder) encode(objects []*ObjectToPack) (plumbing.Hash, error) { if err := e.entry(o); err != nil { return plumbing.ZeroHash, err } + update.ObjectsDone++ + statusChan.SendUpdateIfPossible(update) } return e.footer() @@ -137,7 +151,7 @@ func (e *Encoder) writeOfsDeltaHeader(deltaOffset int64, base plumbing.Hash) err // for OFS_DELTA, offset of the base is interpreted as negative offset // relative to the type-byte of the header of the ofs-delta entry. - relativeOffset := deltaOffset-baseOffset + relativeOffset := deltaOffset - baseOffset if relativeOffset <= 0 { return fmt.Errorf("bad offset for OFS_DELTA entry: %d", relativeOffset) } diff --git a/plumbing/revlist/revlist.go b/plumbing/revlist/revlist.go index 5b2ff994e..7720bc31a 100644 --- a/plumbing/revlist/revlist.go +++ b/plumbing/revlist/revlist.go @@ -20,34 +20,41 @@ func Objects( s storer.EncodedObjectStorer, objs, ignore []plumbing.Hash, + statusChan plumbing.StatusChan, ) ([]plumbing.Hash, error) { - ignore, err := objects(s, ignore, nil, true) + ignore, err := objects(s, ignore, nil, nil, true) if err != nil { return nil, err } - return objects(s, objs, ignore, false) + return objects(s, objs, ignore, statusChan, false) } func objects( s storer.EncodedObjectStorer, objects, ignore []plumbing.Hash, + statusChan plumbing.StatusChan, allowMissingObjects bool, ) ([]plumbing.Hash, error) { seen := hashListToSet(ignore) result := make(map[plumbing.Hash]bool) + update := plumbing.StatusUpdate{Stage: plumbing.StatusCount} + statusChan.SendUpdate(update) + walkerFunc := func(h plumbing.Hash) { if !seen[h] { result[h] = true seen[h] = true + update.ObjectsTotal++ + statusChan.SendUpdateIfPossible(update) } } for _, h := range objects { - if err := processObject(s, h, seen, ignore, walkerFunc); err != nil { + if err := processObject(s, h, seen, ignore, walkerFunc, statusChan); err != nil { if allowMissingObjects && err == plumbing.ErrObjectNotFound { continue } @@ -56,7 +63,10 @@ func objects( } } - return hashSetToList(result), nil + hashes := hashSetToList(result) + update.ObjectsTotal = len(hashes) + statusChan.SendUpdate(update) + return hashes, nil } // processObject obtains the object using the hash an process it depending of its type @@ -66,6 +76,7 @@ func processObject( seen map[plumbing.Hash]bool, ignore []plumbing.Hash, walkerFunc func(h plumbing.Hash), + statusChan plumbing.StatusChan, ) error { if seen[h] { return nil @@ -88,7 +99,7 @@ func processObject( return iterateCommitTrees(seen, do, walkerFunc) case *object.Tag: walkerFunc(do.Hash) - return processObject(s, do.Target, seen, ignore, walkerFunc) + return processObject(s, do.Target, seen, ignore, walkerFunc, statusChan) case *object.Blob: walkerFunc(do.Hash) default: diff --git a/plumbing/status.go b/plumbing/status.go new file mode 100644 index 000000000..bed12ad99 --- /dev/null +++ b/plumbing/status.go @@ -0,0 +1,53 @@ +package plumbing + +type StatusStage int + +const ( + StatusCount StatusStage = iota + StatusRead + StatusSort + StatusDelta + StatusSend + StatusFetch + StatusIndexHash + StatusIndexCRC + StatusIndexOffset + StatusDone + + StatusUnknown StatusStage = -1 +) + +type StatusUpdate struct { + Stage StatusStage + + ObjectsTotal int + ObjectsDone int + + // TODO: BytesTotal and BytesDone? +} + +type StatusChan chan<- StatusUpdate + +func (sc StatusChan) SendUpdate(update StatusUpdate) { + if sc == nil { + return + } + sc <- update +} + +func (sc StatusChan) SendUpdateIfPossible(update StatusUpdate) { + if sc == nil { + return + } + if update.ObjectsDone == update.ObjectsTotal { + // We should always send the final status update, before the + // next stage change. + sc <- update + return + } + + select { + case sc <- update: + default: + } +} diff --git a/plumbing/storer/object.go b/plumbing/storer/object.go index 3f41468a7..d5aee0283 100644 --- a/plumbing/storer/object.go +++ b/plumbing/storer/object.go @@ -60,7 +60,7 @@ type PackfileWriter interface { // // If the Storer not implements PackfileWriter the objects should be written // using the Set method. - PackfileWriter() (io.WriteCloser, error) + PackfileWriter(plumbing.StatusChan) (io.WriteCloser, error) } // EncodedObjectIter is a generic closable interface for iterating over objects. diff --git a/plumbing/transport/server/server.go b/plumbing/transport/server/server.go index be36de5cf..552ab11d6 100644 --- a/plumbing/transport/server/server.go +++ b/plumbing/transport/server/server.go @@ -165,7 +165,7 @@ func (s *upSession) UploadPack(ctx context.Context, req *packp.UploadPackRequest pr, pw := io.Pipe() e := packfile.NewEncoder(pw, s.storer, false) go func() { - _, err := e.Encode(objs) + _, err := e.Encode(objs, nil) pw.CloseWithError(err) }() @@ -175,12 +175,12 @@ func (s *upSession) UploadPack(ctx context.Context, req *packp.UploadPackRequest } func (s *upSession) objectsToUpload(req *packp.UploadPackRequest) ([]plumbing.Hash, error) { - haves, err := revlist.Objects(s.storer, req.Haves, nil) + haves, err := revlist.Objects(s.storer, req.Haves, nil, nil) if err != nil { return nil, err } - return revlist.Objects(s.storer, req.Wants, haves) + return revlist.Objects(s.storer, req.Wants, haves, nil) } func (*upSession) setSupportedCapabilities(c *capability.List) error { @@ -313,7 +313,7 @@ func (s *rpSession) writePackfile(r io.ReadCloser) error { return nil } - if err := packfile.UpdateObjectStorage(s.storer, r); err != nil { + if err := packfile.UpdateObjectStorage(s.storer, r, nil); err != nil { _ = r.Close() return err } diff --git a/remote.go b/remote.go index c07c5af03..f42fdadca 100644 --- a/remote.go +++ b/remote.go @@ -140,13 +140,13 @@ func (r *Remote) PushContext(ctx context.Context, o *PushOptions) error { var hashesToPush []plumbing.Hash // Avoid the expensive revlist operation if we're only doing deletes. if !allDelete { - hashesToPush, err = revlist.Objects(r.s, objects, haves) + hashesToPush, err = revlist.Objects(r.s, objects, haves, o.StatusChan) if err != nil { return err } } - rs, err := pushHashes(ctx, s, r.s, req, hashesToPush) + rs, err := pushHashes(ctx, s, r.s, req, hashesToPush, o.StatusChan) if err != nil { return err } @@ -155,6 +155,13 @@ func (r *Remote) PushContext(ctx context.Context, o *PushOptions) error { return err } + defer func() { + o.StatusChan.SendUpdate(plumbing.StatusUpdate{ + Stage: plumbing.StatusDone, + ObjectsTotal: len(hashesToPush), + }) + }() + return r.updateRemoteReferenceStorage(req, rs) } @@ -267,6 +274,12 @@ func (r *Remote) fetch(ctx context.Context, o *FetchOptions) (storer.ReferenceSt return nil, err } + defer func() { + o.StatusChan.SendUpdate(plumbing.StatusUpdate{ + Stage: plumbing.StatusDone, + }) + }() + if !updated { return remoteRefs, NoErrAlreadyUpToDate } @@ -322,6 +335,7 @@ func (r *Remote) fetchPack(ctx context.Context, o *FetchOptions, s transport.Upl if err = packfile.UpdateObjectStorage(r.s, buildSidebandIfSupported(req.Capabilities, reader, o.Progress), + o.StatusChan, ); err != nil { return err } @@ -498,7 +512,9 @@ func calculateRefs(spec []config.RefSpec, }) } -func getWants(localStorer storage.Storer, refs memory.ReferenceStorage) ([]plumbing.Hash, error) { +func getWants( + localStorer storage.Storer, + refs memory.ReferenceStorage) ([]plumbing.Hash, error) { wants := map[plumbing.Hash]bool{} for _, ref := range refs { hash := ref.Hash() @@ -742,6 +758,7 @@ func pushHashes( sto storer.EncodedObjectStorer, req *packp.ReferenceUpdateRequest, hs []plumbing.Hash, + statusChan plumbing.StatusChan, ) (*packp.ReportStatus, error) { rd, wr := io.Pipe() @@ -749,7 +766,7 @@ func pushHashes( done := make(chan error) go func() { e := packfile.NewEncoder(wr, sto, false) - if _, err := e.Encode(hs); err != nil { + if _, err := e.Encode(hs, statusChan); err != nil { done <- wr.CloseWithError(err) return } diff --git a/storage/filesystem/internal/dotgit/dotgit.go b/storage/filesystem/internal/dotgit/dotgit.go index 2840bc74d..56c392879 100644 --- a/storage/filesystem/internal/dotgit/dotgit.go +++ b/storage/filesystem/internal/dotgit/dotgit.go @@ -137,8 +137,8 @@ func (d *DotGit) Shallow() (billy.File, error) { // NewObjectPack return a writer for a new packfile, it saves the packfile to // disk and also generates and save the index for the given packfile. -func (d *DotGit) NewObjectPack() (*PackWriter, error) { - return newPackWrite(d.fs) +func (d *DotGit) NewObjectPack(statusChan plumbing.StatusChan) (*PackWriter, error) { + return newPackWrite(d.fs, statusChan) } // ObjectPacks returns the list of availables packfiles diff --git a/storage/filesystem/internal/dotgit/writers.go b/storage/filesystem/internal/dotgit/writers.go index 46d361982..2349be042 100644 --- a/storage/filesystem/internal/dotgit/writers.go +++ b/storage/filesystem/internal/dotgit/writers.go @@ -22,15 +22,16 @@ import ( type PackWriter struct { Notify func(plumbing.Hash, *packfile.Index) - fs billy.Filesystem - fr, fw billy.File - synced *syncedReader - checksum plumbing.Hash - index *packfile.Index - result chan error + fs billy.Filesystem + fr, fw billy.File + synced *syncedReader + checksum plumbing.Hash + index *packfile.Index + result chan error + statusChan plumbing.StatusChan } -func newPackWrite(fs billy.Filesystem) (*PackWriter, error) { +func newPackWrite(fs billy.Filesystem, statusChan plumbing.StatusChan) (*PackWriter, error) { fw, err := fs.TempFile(fs.Join(objectsPath, packPath), "tmp_pack_") if err != nil { return nil, err @@ -42,11 +43,12 @@ func newPackWrite(fs billy.Filesystem) (*PackWriter, error) { } writer := &PackWriter{ - fs: fs, - fw: fw, - fr: fr, - synced: newSyncedReader(fw, fr), - result: make(chan error), + fs: fs, + fw: fw, + fr: fr, + synced: newSyncedReader(fw, fr), + result: make(chan error), + statusChan: statusChan, } go writer.buildIndex() @@ -61,7 +63,7 @@ func (w *PackWriter) buildIndex() { return } - checksum, err := d.Decode() + checksum, err := d.Decode(w.statusChan) if err != nil { w.result <- err return @@ -149,7 +151,7 @@ func (w *PackWriter) encodeIdx(writer io.Writer) error { idx.PackfileChecksum = w.checksum idx.Version = idxfile.VersionSupported e := idxfile.NewEncoder(writer) - _, err := e.Encode(idx) + _, err := e.Encode(idx, w.statusChan) return err } diff --git a/storage/filesystem/object.go b/storage/filesystem/object.go index 5073a3827..c6a0da989 100644 --- a/storage/filesystem/object.go +++ b/storage/filesystem/object.go @@ -77,12 +77,12 @@ func (s *ObjectStorage) NewEncodedObject() plumbing.EncodedObject { return &plumbing.MemoryObject{} } -func (s *ObjectStorage) PackfileWriter() (io.WriteCloser, error) { +func (s *ObjectStorage) PackfileWriter(statusChan plumbing.StatusChan) (io.WriteCloser, error) { if err := s.requireIndex(); err != nil { return nil, err } - w, err := s.dir.NewObjectPack() + w, err := s.dir.NewObjectPack(statusChan) if err != nil { return nil, err }