Skip to content

Commit bb6b87b

Browse files
committed
WIP. Peer loader implemented and used to restore blockchain history.
Get rid of internal package to simplify testing with mockery.
1 parent 4a52493 commit bb6b87b

38 files changed

+1602
-364
lines changed

.mockery.yaml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
quiet: False
2+
disable-version-string: True
3+
with-expecter: True
4+
mockname: "Mock{{.InterfaceName}}"
5+
filename: "{{.InterfaceName | snakecase}}.go"
6+
7+
inpackage: True
8+
9+
packages:
10+
github.com/alexeykiselev/waves-fork-detector/peers:
11+
interfaces:
12+
HistoryRequester:
13+
github.com/alexeykiselev/waves-fork-detector/chains:
14+
interfaces:
15+
HistoryProvider:
16+
github.com/alexeykiselev/waves-fork-detector/loading:
17+
interfaces:
18+
Reporter:

Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,6 @@ dist: clean release
7070
@cd ./build/bin/darwin-amd64/; tar pzcvf ../../dist/forkdetector_$(VERSION)_macOS-amd64.tar.gz ./forkdetector*
7171
@cd ./build/bin/darwin-arm64/; tar pzcvf ../../dist/forkdetector_$(VERSION)_macOS-arm64.tar.gz ./forkdetector*
7272

73+
74+
mock:
75+
@mockery

internal/api.go renamed to api.go

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
package internal
1+
package main
22

33
import (
44
"compress/flate"
55
"context"
66
"encoding/json"
77
"errors"
88
"fmt"
9+
"math/big"
910
"net/http"
1011
"net/netip"
1112
"runtime"
@@ -16,8 +17,8 @@ import (
1617
"go.uber.org/zap"
1718
"golang.org/x/sync/errgroup"
1819

19-
"github.com/alexeykiselev/waves-fork-detector/internal/chains"
20-
"github.com/alexeykiselev/waves-fork-detector/internal/peers"
20+
"github.com/alexeykiselev/waves-fork-detector/chains"
21+
"github.com/alexeykiselev/waves-fork-detector/peers"
2122
)
2223

2324
const defaultTimeout = 30 * time.Second
@@ -58,6 +59,14 @@ type status struct {
5859
GoroutinesCount int `json:"goroutines_count"`
5960
}
6061

62+
type HeadInfo struct {
63+
Number uint64 `json:"number"`
64+
ID string `json:"id"`
65+
Height uint32 `json:"height"`
66+
Score *big.Int `json:"score"`
67+
Timestamp time.Time `json:"timestamp"`
68+
}
69+
6170
type API struct {
6271
ctx context.Context
6372
wait func() error
@@ -120,13 +129,15 @@ func (a *API) runServer() error {
120129

121130
func (a *API) routes() chi.Router {
122131
r := chi.NewRouter()
123-
r.Get("/status", a.status) // Status information
124132
r.Get("/peers/all", a.peers) // Returns the list of all known peers
125133
r.Get("/peers/friendly", a.friendly) // Returns the list of peers that have been successfully connected at least once
126134
r.Get("/connections", a.connections) // Returns the list of active connections
127-
r.Get("/forks", a.forks) // Returns the combined info about forks for all connected peers
128-
r.Get("/all-forks", a.allForks) // Returns the combined info about all registered forks
129-
r.Get("/fork/{address}", a.fork) // Returns the info about fork of the given peer
135+
r.Get("/heads", a.heads) // Returns the combined info about heads for all connected peers
136+
137+
r.Get("/status", a.status) // Status information
138+
r.Get("/forks", a.forks) // Returns the combined info about forks for all connected peers
139+
r.Get("/all-forks", a.allForks) // Returns the combined info about all registered forks
140+
r.Get("/fork/{address}", a.fork) // Returns the info about fork of the given peer
130141
return r
131142
}
132143

@@ -203,6 +214,34 @@ func (a *API) connections(w http.ResponseWriter, _ *http.Request) {
203214
}
204215
}
205216

217+
func (a *API) heads(w http.ResponseWriter, _ *http.Request) {
218+
heads, err := a.linkage.Heads()
219+
if err != nil {
220+
http.Error(w, fmt.Sprintf("Failed to complete request: %v", err), http.StatusInternalServerError)
221+
return
222+
}
223+
infos := make([]HeadInfo, len(heads))
224+
for i, h := range heads {
225+
b, blErr := a.linkage.Block(h.BlockID)
226+
if blErr != nil {
227+
http.Error(w, fmt.Sprintf("Failed to complete request: %v", blErr), http.StatusInternalServerError)
228+
return
229+
}
230+
infos[i] = HeadInfo{
231+
Number: h.ID,
232+
ID: h.BlockID.String(),
233+
Height: b.Height,
234+
Score: b.Score,
235+
Timestamp: time.UnixMilli(int64(b.Timestamp)),
236+
}
237+
}
238+
err = json.NewEncoder(w).Encode(infos)
239+
if err != nil {
240+
http.Error(w, fmt.Sprintf("Failed to marshal heads to JSON: %v", err), http.StatusInternalServerError)
241+
return
242+
}
243+
}
244+
206245
func (a *API) forks(w http.ResponseWriter, _ *http.Request) {
207246
nodes, err := a.registry.Connections()
208247
if err != nil {
File renamed without changes.

internal/chains/linkage.go renamed to chains/linkage.go

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,22 @@ import (
1111
)
1212

1313
var (
14-
ErrBlockNotFound = fmt.Errorf("block not found")
15-
ErrUnleashedPeer = fmt.Errorf("peer not leashed")
1614
ErrParentNotFound = fmt.Errorf("parent not found")
15+
ErrRefNotFound = fmt.Errorf("reference block not found")
1716
)
1817

18+
type Leasher interface {
19+
Leash(addr netip.Addr) (proto.BlockID, error)
20+
MoveLeash(id proto.BlockID, addr netip.Addr) error
21+
}
22+
23+
type HistoryProvider interface {
24+
Leasher
25+
HasBlock(id proto.BlockID) (bool, error)
26+
LastIDs(id proto.BlockID, count int) ([]proto.BlockID, error)
27+
PutBlock(block *proto.Block, addr netip.Addr) error
28+
}
29+
1930
type Linkage struct {
2031
scheme proto.Scheme
2132
genesis proto.BlockID
@@ -72,11 +83,39 @@ func (l *Linkage) PutBlock(block *proto.Block, addr netip.Addr) error {
7283
return nil
7384
}
7485

86+
func (l *Linkage) PutMicroBlock(inv *proto.MicroBlockInv, addr netip.Addr) error {
87+
ok, err := l.st.hasBlock(inv.TotalBlockID)
88+
if err != nil {
89+
return err
90+
}
91+
if ok {
92+
if ulErr := l.st.updateLeash(addr, inv.TotalBlockID); ulErr != nil {
93+
return ulErr
94+
}
95+
return nil
96+
}
97+
hasRef, err := l.st.hasBlock(inv.Reference)
98+
if err != nil {
99+
return err
100+
}
101+
if !hasRef {
102+
return ErrRefNotFound
103+
}
104+
if insErr := l.st.putMicroBlock(inv); insErr != nil {
105+
return insErr
106+
}
107+
if ulErr := l.st.updateLeash(addr, inv.TotalBlockID); ulErr != nil {
108+
return ulErr
109+
}
110+
return nil
111+
}
112+
113+
// Leash return the block ID the peer is leashed to. For an unleashed peer the genesis block ID is returned.
75114
func (l *Linkage) Leash(addr netip.Addr) (proto.BlockID, error) {
76115
lsh, err := l.st.leash(addr)
77116
if err != nil {
78117
if errors.Is(err, leveldb.ErrNotFound) {
79-
return proto.BlockID{}, ErrUnleashedPeer
118+
return l.genesis, nil // Return genesis block ID in case of no leash for the peer.
80119
}
81120
return proto.BlockID{}, err
82121
}

internal/chains/linkage_internal_test.go renamed to chains/linkage_internal_test.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,16 +33,17 @@ func TestLeashUpdate(t *testing.T) {
3333
bl3 := createNthBlock(t, bl2.BlockID(), 3)
3434

3535
a := netip.MustParseAddr("8.8.8.8")
36-
_, err := lk.Leash(a)
37-
assert.ErrorIs(t, err, ErrUnleashedPeer)
36+
id, err := lk.Leash(a)
37+
assert.NoError(t, err)
38+
assert.Equal(t, id1, id)
3839

3940
err = lk.PutBlock(bl2, a)
4041
require.NoError(t, err)
4142

4243
ok, err := lk.hasBlock(bl2.BlockID())
4344
require.NoError(t, err)
4445
assert.True(t, ok)
45-
id, err := lk.Leash(a)
46+
id, err = lk.Leash(a)
4647
require.NoError(t, err)
4748
assert.Equal(t, bl2.BlockID(), id)
4849

@@ -66,15 +67,17 @@ func TestMultipleLeashes(t *testing.T) {
6667
a := netip.MustParseAddr("8.8.8.8")
6768
b := netip.MustParseAddr("9.9.9.9")
6869

69-
_, err := lk.Leash(a)
70-
assert.ErrorIs(t, err, ErrUnleashedPeer)
71-
_, err = lk.Leash(b)
72-
assert.ErrorIs(t, err, ErrUnleashedPeer)
70+
id, err := lk.Leash(a)
71+
assert.NoError(t, err)
72+
assert.Equal(t, id1, id)
73+
id, err = lk.Leash(b)
74+
assert.NoError(t, err)
75+
assert.Equal(t, id1, id)
7376

7477
err = lk.PutBlock(bl2, a)
7578
require.NoError(t, err)
7679

77-
id, err := lk.Leash(a)
80+
id, err = lk.Leash(a)
7881
require.NoError(t, err)
7982
assert.Equal(t, bl2.BlockID(), id)
8083

internal/chains/storage.go renamed to chains/storage.go

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,63 @@ func (s *storage) putProtoBlock(block *proto.Block) error {
176176
return s.db.Write(batch, nil)
177177
}
178178

179+
func (s *storage) putMicroBlock(inv *proto.MicroBlockInv) error {
180+
sn, err := s.db.GetSnapshot()
181+
if err != nil {
182+
return fmt.Errorf("failed to put micro-block: %w", err)
183+
}
184+
defer sn.Release()
185+
n, err := s.getCounter(sn, blocksCounter)
186+
if err != nil {
187+
return fmt.Errorf("failed to put micro-block: %w", err)
188+
}
189+
// Get reference block and its number.
190+
refNum, err := s.getNumOfBlockID(sn, inv.Reference)
191+
if err != nil {
192+
return fmt.Errorf("failed to put micro-block: %w", err)
193+
}
194+
195+
ref, err := s.getBlock(sn, refNum)
196+
if err != nil {
197+
return fmt.Errorf("failed to put micro-block: %w", err)
198+
}
199+
200+
parentID, err := proto.NewBlockIDFromBytes(ref.Parent)
201+
if err != nil {
202+
return fmt.Errorf("failed to put micro-block: %w", err)
203+
}
204+
205+
batch := new(leveldb.Batch)
206+
207+
// Put next block number.
208+
n++
209+
s.putCounter(batch, blocksCounter, n)
210+
211+
b := block{
212+
Height: ref.Height,
213+
Timestamp: ref.Timestamp,
214+
Score: ref.Score,
215+
ID: inv.TotalBlockID.Bytes(),
216+
Parent: ref.Parent,
217+
Generator: ref.Generator,
218+
}
219+
220+
s.putID(batch, blockPrefix, inv.TotalBlockID, n) // Save reference ID -> num.
221+
// Put information about block by its number.
222+
if putErr := s.putBlock(batch, n, b); putErr != nil {
223+
return fmt.Errorf("failed to put micro-block: %w", putErr)
224+
}
225+
226+
// Put block into the chain.
227+
if lnkErr := s.putNewLinkForBlock(sn, batch, n, inv.TotalBlockID, b.Height, parentID); lnkErr != nil {
228+
return fmt.Errorf("failed to put micro-block: %w", lnkErr)
229+
}
230+
if phErr := s.putHead(sn, batch, n, refNum); phErr != nil {
231+
return fmt.Errorf("failed to put micro-block: %w", phErr)
232+
}
233+
return s.db.Write(batch, nil)
234+
}
235+
179236
func (s *storage) getAncestors(id proto.BlockID, count int) ([]proto.BlockID, error) {
180237
sn, err := s.db.GetSnapshot()
181238
if err != nil {
@@ -397,7 +454,7 @@ func (s *storage) putNewLinkForBlock(
397454
parents = append(parents, pn)
398455
pl, glErr := s.getLink(sn, pn)
399456
if glErr != nil {
400-
return err
457+
return glErr
401458
}
402459
if len(pl.Parents) > l {
403460
pn = pl.Parents[l]
File renamed without changes.
File renamed without changes.

internal/connection_manager.go renamed to connection_manager.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package internal
1+
package main
22

33
import (
44
"context"
@@ -13,7 +13,7 @@ import (
1313
"github.com/wavesplatform/gowaves/pkg/p2p/peer"
1414
"github.com/wavesplatform/gowaves/pkg/proto"
1515

16-
"github.com/alexeykiselev/waves-fork-detector/internal/peers"
16+
"github.com/alexeykiselev/waves-fork-detector/peers"
1717
)
1818

1919
const (

0 commit comments

Comments
 (0)