Skip to content

Commit

Permalink
fix: networking: avoid dialing when trying to handshake peers
Browse files Browse the repository at this point in the history
  • Loading branch information
simlecode committed Jan 29, 2024
1 parent dfe8ff5 commit 59970e9
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 106 deletions.
6 changes: 4 additions & 2 deletions app/submodule/syncer/syncer_submodule.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,15 @@ func NewSyncerSubmodule(ctx context.Context,
}
}

network.HelloHandler.Register(func(ci *types.ChainInfo) {
if err := network.HelloHandler.Register(ctx, func(ci *types.ChainInfo) {
err := chainSyncManager.BlockProposer().SendHello(ci)
if err != nil {
log.Errorf("error receiving chain info from hello %s: %s", ci, err)
return
}
})
}); err != nil {
return nil, err
}

return &SyncerSubmodule{
Stmgr: stmgr,
Expand Down
200 changes: 102 additions & 98 deletions pkg/net/helloprotocol/hello_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,18 @@ import (
"github.com/filecoin-project/venus/pkg/chain"
"github.com/filecoin-project/venus/pkg/net/exchange"
"github.com/filecoin-project/venus/pkg/net/peermgr"
"github.com/filecoin-project/venus/venus-shared/libp2p/hello"
"github.com/filecoin-project/venus/venus-shared/types"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"

"github.com/filecoin-project/go-state-types/abi"
fbig "github.com/filecoin-project/go-state-types/big"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/host"
net "github.com/libp2p/go-libp2p/core/network"
ma "github.com/multiformats/go-multiaddr"

"github.com/ipfs-force-community/metrics"
)
Expand All @@ -28,6 +30,8 @@ var log = logging.Logger("/fil/hello")
// helloProtocolID is the libp2p protocol identifier for the hello protocol.
const helloProtocolID = "/fil/hello/1.0.0"

const helloTimeout = time.Second * 10

var (
genesisErrCt = metrics.NewCounter("hello_genesis_error", "Number of errors encountered in hello protocol due to incorrect genesis block")
helloMsgErrCt = metrics.NewCounter("hello_message_error", "Number of errors encountered in hello protocol due to malformed message")
Expand Down Expand Up @@ -97,15 +101,41 @@ func NewHelloProtocolHandler(h host.Host,
}

// Register registers the handler with the network.
func (h *HelloProtocolHandler) Register(peerDiscoveredCallback PeerDiscoveredCallback) {
func (h *HelloProtocolHandler) Register(ctx context.Context, peerDiscoveredCallback PeerDiscoveredCallback) error {
// register callbacks
h.peerDiscovered = peerDiscoveredCallback

// register a handle for when a new connection against someone is created
h.host.SetStreamHandler(helloProtocolID, h.handleNewStream)

// register for connection notifications
h.host.Network().Notify((*helloProtocolNotifiee)(h))
sub, err := h.host.EventBus().Subscribe(new(event.EvtPeerIdentificationCompleted), eventbus.BufSize(1024))
if err != nil {
return fmt.Errorf("failed to subscribe to event bus: %w", err)
}
go func() {
// We want to get information on connected peers, we don't want to trigger new connections.
ctx := net.WithNoDial(ctx, "filecoin hello")
for evt := range sub.Out() {
pic := evt.(event.EvtPeerIdentificationCompleted)
// We just finished identifying the peer, that means we should know what
// protocols it speaks. Check if it speeks the Filecoin hello protocol
// before continuing.
if p, _ := h.host.Peerstore().FirstSupportedProtocol(pic.Peer, hello.ProtocolID); p != hello.ProtocolID {
continue
}

go func() {
err := h.sayHello(ctx, pic.Peer)
if err != nil {
protos, _ := h.host.Peerstore().GetProtocols(pic.Peer)
agent, _ := h.host.Peerstore().Get(pic.Peer, "AgentVersion")
log.Warnw("failed to say hello", "error", err, "peer", pic.Peer, "supported", protos, "agent", agent)
}
}()
}
}()

return nil
}

func (h *HelloProtocolHandler) handleNewStream(s net.Stream) {
Expand Down Expand Up @@ -200,19 +230,6 @@ func (h *HelloProtocolHandler) loadLocalFullTipset(ctx context.Context, tsk type
// ErrBadGenesis is the error returned when a mismatch in genesis blocks happens.
var ErrBadGenesis = fmt.Errorf("bad genesis block")

func (h *HelloProtocolHandler) getOurHelloMessage() (*HelloMessage, error) {
heaviest := h.chainStore.GetHead()
height := heaviest.Height()
weight := heaviest.ParentWeight()

return &HelloMessage{
GenesisHash: h.genesis,
HeaviestTipSetCids: heaviest.Cids(),
HeaviestTipSetHeight: height,
HeaviestTipSetWeight: weight,
}, nil
}

func (h *HelloProtocolHandler) receiveHello(ctx context.Context, s net.Stream) (*HelloMessage, error) {
var hello HelloMessage
err := hello.UnmarshalCBOR(s)
Expand All @@ -228,112 +245,99 @@ func (h *HelloProtocolHandler) receiveLatency(ctx context.Context, s net.Stream)
return &latency, nil
}

// sendHello send a hello message on stream `s`.
func (h *HelloProtocolHandler) sendHello(s net.Stream) error {
msg, err := h.getOurHelloMessage()
if err != nil {
return err
}
// responding to latency
func sendLatency(msg *LatencyMessage, s net.Stream) error {
buf := new(bytes.Buffer)
if err := msg.MarshalCBOR(buf); err != nil {
return err
}

n, err := s.Write(buf.Bytes())
if err != nil {
return err
}
if n != buf.Len() {
return fmt.Errorf("could not write all hello message bytes")
return fmt.Errorf("could not write all latency message bytes")
}
return nil
}

// responding to latency
func sendLatency(msg *LatencyMessage, s net.Stream) error {
func (h *HelloProtocolHandler) sayHello(ctx context.Context, peerID peer.ID) error {
// add timeout
ctx, cancel := context.WithTimeout(ctx, helloTimeout)
defer cancel()

s, err := h.host.NewStream(ctx, peerID, helloProtocolID)
if err != nil {
// If peer does not do hello keep connection open
return err
}
defer func() { _ = s.Close() }()

t0 := time.Now()
// send out the hello message
err = h.sendHello(s)
if err != nil {
// Don't close connection for failed hello protocol impl
return fmt.Errorf("failed to send hello handshake: %s", err)
}
// now receive latency message
lmsg, err := h.receiveLatency(ctx, s)
if err != nil {
return fmt.Errorf("failed to receive hello latency msg: %s", err)
}

t3 := time.Now()
lat := t3.Sub(t0)
// add to peer tracker
h.peerMgr.SetPeerLatency(s.Conn().RemotePeer(), lat)

if err == nil {
if lmsg.TArrival != 0 && lmsg.TSent != 0 {
t1 := time.Unix(0, lmsg.TArrival)
t2 := time.Unix(0, lmsg.TSent)
offset := t0.Sub(t1) + t3.Sub(t2)
offset /= 2
if offset > 5*time.Second || offset < -5*time.Second {
log.Infow("time offset", "offset", offset.Seconds(), "peerid", peerID)
}
}
}

return err
}

// sendHello send a hello message on stream `s`.
func (h *HelloProtocolHandler) sendHello(s net.Stream) error {
msg, err := h.getOurHelloMessage()
if err != nil {
return err
}
log.Debug("Sending hello message: ", msg.HeaviestTipSetCids, msg.HeaviestTipSetHeight, msg.GenesisHash)

buf := new(bytes.Buffer)
if err := msg.MarshalCBOR(buf); err != nil {
return err
}

n, err := s.Write(buf.Bytes())
if err != nil {
return err
}
if n != buf.Len() {
return fmt.Errorf("could not write all latency message bytes")
return fmt.Errorf("could not write all hello message bytes")
}
return nil
}

// Note: hide `net.Notifyee` impl using a new-type
type helloProtocolNotifiee HelloProtocolHandler

const helloTimeout = time.Second * 10

func (hn *helloProtocolNotifiee) asHandler() *HelloProtocolHandler {
return (*HelloProtocolHandler)(hn)
}

//
// `net.Notifyee` impl for `helloNotify`
//

func (hn *helloProtocolNotifiee) Connected(n net.Network, c net.Conn) {
// Connected is invoked when a connection is made to a libp2p node.
//
// - open stream on connection
// - send HelloMessage` on stream
// - read LatencyMessage response on stream
//
// Terminate the connection if it has a different genesis block
go func() {
// add timeout
ctx, cancel := context.WithTimeout(context.Background(), helloTimeout)
defer cancel()
s, err := hn.asHandler().host.NewStream(ctx, c.RemotePeer(), helloProtocolID)
if err != nil {
// If peer does not do hello keep connection open
return
}
defer func() { _ = s.Close() }()

t0 := time.Now()
// send out the hello message
err = hn.asHandler().sendHello(s)
if err != nil {
log.Debugf("failed to send hello handshake to peer %s: %s", c.RemotePeer(), err)
// Don't close connection for failed hello protocol impl
return
}

// now receive latency message
lmsg, err := hn.asHandler().receiveLatency(ctx, s)
if err != nil {
log.Debugf("failed to receive hello latency msg from peer %s: %s", c.RemotePeer(), err)
return
}
func (h *HelloProtocolHandler) getOurHelloMessage() (*HelloMessage, error) {
heaviest := h.chainStore.GetHead()
height := heaviest.Height()
weight := heaviest.ParentWeight()

t3 := time.Now()
lat := t3.Sub(t0)
// add to peer tracker
hn.peerMgr.SetPeerLatency(s.Conn().RemotePeer(), lat)

if err == nil {
if lmsg.TArrival != 0 && lmsg.TSent != 0 {
t1 := time.Unix(0, lmsg.TArrival)
t2 := time.Unix(0, lmsg.TSent)
offset := t0.Sub(t1) + t3.Sub(t2)
offset /= 2
if offset > 5*time.Second || offset < -5*time.Second {
log.Infow("time offset", "offset", offset.Seconds(), "peerid", c.RemotePeer().String())
}
}
}
}()
return &HelloMessage{
GenesisHash: h.genesis,
HeaviestTipSetCids: heaviest.Cids(),
HeaviestTipSetHeight: height,
HeaviestTipSetWeight: weight,
}, nil
}

func (hn *helloProtocolNotifiee) Listen(n net.Network, a ma.Multiaddr) { /* empty */ }
func (hn *helloProtocolNotifiee) ListenClose(n net.Network, a ma.Multiaddr) { /* empty */ }
func (hn *helloProtocolNotifiee) Disconnected(n net.Network, c net.Conn) { /* empty */ }
func (hn *helloProtocolNotifiee) OpenedStream(n net.Network, s net.Stream) { /* empty */ }
func (hn *helloProtocolNotifiee) ClosedStream(n net.Network, s net.Stream) { /* empty */ }
18 changes: 12 additions & 6 deletions pkg/net/helloprotocol/hello_protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ func TestHelloHandshake(t *testing.T) {
require.NoError(t, err)

// stm: @DISCOVERY_HELLO_REGISTER_001
helloprotocol.NewHelloProtocolHandler(a, aPeerMgr, nil, oldStore, mstore, genesisA.Blocks()[0].Cid(), time.Second*30).Register(msc1.HelloCallback)
helloprotocol.NewHelloProtocolHandler(b, aPeerMgr, nil, store, mstore, genesisA.Blocks()[0].Cid(), time.Second*30).Register(msc2.HelloCallback)
err = helloprotocol.NewHelloProtocolHandler(a, aPeerMgr, nil, oldStore, mstore, genesisA.Blocks()[0].Cid(), time.Second*30).Register(ctx, msc1.HelloCallback)
require.NoError(t, err)
err = helloprotocol.NewHelloProtocolHandler(b, aPeerMgr, nil, store, mstore, genesisA.Blocks()[0].Cid(), time.Second*30).Register(ctx, msc2.HelloCallback)
require.NoError(t, err)

msc1.On("HelloCallback", b.ID(), heavy2.Key()).Return()
msc2.On("HelloCallback", a.ID(), heavy1.Key()).Return()
Expand Down Expand Up @@ -132,8 +134,10 @@ func TestHelloBadGenesis(t *testing.T) {
peerMgr, err := mockPeerMgr(ctx, t, a)
require.NoError(t, err)

helloprotocol.NewHelloProtocolHandler(a, peerMgr, nil, store, mstore, genesisA.Blocks()[0].Cid(), time.Second*30).Register(msc1.HelloCallback)
helloprotocol.NewHelloProtocolHandler(b, peerMgr, nil, builder2.Store(), builder2.Mstore(), genesisB.Blocks()[0].Cid(), time.Second*30).Register(msc2.HelloCallback)
err = helloprotocol.NewHelloProtocolHandler(a, peerMgr, nil, store, mstore, genesisA.Blocks()[0].Cid(), time.Second*30).Register(ctx, msc1.HelloCallback)
require.NoError(t, err)
err = helloprotocol.NewHelloProtocolHandler(b, peerMgr, nil, builder2.Store(), builder2.Mstore(), genesisB.Blocks()[0].Cid(), time.Second*30).Register(ctx, msc2.HelloCallback)
require.NoError(t, err)

msc1.On("HelloCallback", mock.Anything, mock.Anything, mock.Anything).Return()
msc2.On("HelloCallback", mock.Anything, mock.Anything, mock.Anything).Return()
Expand Down Expand Up @@ -178,8 +182,10 @@ func TestHelloMultiBlock(t *testing.T) {
peerMgr, err := mockPeerMgr(ctx, t, a)
require.NoError(t, err)

helloprotocol.NewHelloProtocolHandler(a, peerMgr, nil, oldStore, mstore, genesisTipset.At(0).Cid(), time.Second*30).Register(msc1.HelloCallback)
helloprotocol.NewHelloProtocolHandler(b, peerMgr, nil, store, mstore, genesisTipset.At(0).Cid(), time.Second*30).Register(msc2.HelloCallback)
err = helloprotocol.NewHelloProtocolHandler(a, peerMgr, nil, oldStore, mstore, genesisTipset.At(0).Cid(), time.Second*30).Register(ctx, msc1.HelloCallback)
require.NoError(t, err)
err = helloprotocol.NewHelloProtocolHandler(b, peerMgr, nil, store, mstore, genesisTipset.At(0).Cid(), time.Second*30).Register(ctx, msc2.HelloCallback)
require.NoError(t, err)

msc1.On("HelloCallback", b.ID(), heavy2.Key()).Return()
msc2.On("HelloCallback", a.ID(), heavy1.Key()).Return()
Expand Down

0 comments on commit 59970e9

Please sign in to comment.