- Transaction relay
- The transaction relay process propagates new signed and
validated transactions through the peer-to-peer network of the blockchain
nodes. Every node participates in the transaction relay process by receiving
new signed transactions, validating them against the pending state of the
node, and forwarding validated transactions to the list of known peers of the
node. There are two cases where the transaction relay process takes place:
- Transaction sign, send, validate, and relay
- New transactions are first submitted by the client to the node for signing with the sender account, and then sent by the client to the node for validation by applying the transaction to the pending state of the node. A transaction that fails the transaction application is rejected with an error that is returned directly to the client that sent the transaction. After a successful transaction application, the transaction is considered validated and is relayed to the list of known peers of the node. The transaction sign, send, validate, and relay process ensures that only validated transactions are relayed to the list of known peers. Reception of an already applied transaction results in a transaction application error, and the duplicate transaction is not relayed again. This protects the blockchain network from relaying duplicate transactions that have already been relayed
- Transaction receive, validate, and relay
- Transactions signed and sent to other nodes are relayed to the node through the transaction sign, send, validate, and relay mechanism. The node receives the transaction, applies the transaction to the pending state and, if successful, relays the transaction to the list of known peers. Once again, application of an already applied transaction to the pending state results in an error, and the duplicate transaction is not relayed again. This protects the blockchain network from relaying duplicate transactions that have already been relayed
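The validate-then-relay rule shared by both cases can be sketched as follows. This is a minimal illustration under stated assumptions, not the node's implementation: `Tx`, `PendingState`, `ErrInvalidNonce`, and `ReceiveTx` are hypothetical names, and the per-account nonce is only one possible way for a pending state to reject an already applied transaction.

```go
package main

import (
	"errors"
	"fmt"
)

// Tx is a hypothetical stand-in for a signed transaction.
type Tx struct {
	From  string
	Nonce uint64
}

// ErrInvalidNonce is a hypothetical transaction application error.
var ErrInvalidNonce = errors.New("invalid nonce")

// PendingState is a hypothetical pending state that remembers the last
// applied nonce per account (an assumption made for this sketch).
type PendingState struct {
	nonces map[string]uint64
}

func NewPendingState() *PendingState {
	return &PendingState{nonces: make(map[string]uint64)}
}

// ApplyTx accepts only the next nonce of the sender, so applying the same
// transaction a second time fails.
func (s *PendingState) ApplyTx(tx Tx) error {
	if tx.Nonce != s.nonces[tx.From]+1 {
		return ErrInvalidNonce
	}
	s.nonces[tx.From] = tx.Nonce
	return nil
}

// ReceiveTx applies the transaction and relays it only after a successful
// application; a failed application stops the relay.
func ReceiveTx(state *PendingState, tx Tx, relay func(Tx)) error {
	if err := state.ApplyTx(tx); err != nil {
		return err
	}
	relay(tx)
	return nil
}

func main() {
	state := NewPendingState()
	relayed := 0
	relay := func(Tx) { relayed++ }
	tx := Tx{From: "alice", Nonce: 1}
	fmt.Println(ReceiveTx(state, tx, relay)) // <nil>
	fmt.Println(ReceiveTx(state, tx, relay)) // invalid nonce
	fmt.Println(relayed)                     // 1
}
```

Because the duplicate fails at the application step, it never reaches the relay callback, which is what keeps a transaction from circulating between peers indefinitely.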
- Message relay type
- The MsgRelay type reliably handles propagation of validated transactions and of proposed and validated blocks through the peer-to-peer network of the blockchain. The message relay takes messages from where they are created and validated and forwards them to where they are processed and confirmed. The message relay component is fully integrated into the node graceful shutdown mechanism. The node shared context hierarchy is used to signal the graceful shutdown to all concurrent processes on the node including the message relay. On reception of the shutdown signal the message relay finishes relaying messages in progress and gracefully terminates. The node main goroutine gives all node concurrent processes time to terminate gracefully by waiting on the node shared wait group. The inbound message channel is exposed to the node components that create and forward transactions and blocks, so that new messages are received in a concurrency safe way. The transactions and blocks are relayed through the gRPC client streaming. All the transaction- and block-specific gRPC client streaming logic is encapsulated and parameterized in the generic gRPC relay function. The message relay supports self-relaying messages to the node itself through the self-relay configuration parameter. The message relay periodically monitors the list of known peers in order to start relaying messages to newly discovered peers. The message relay gracefully terminates all goroutines dedicated to relaying messages to each known peer by waiting on the relays wait group after the node shared context hierarchy has been canceled. The peer add and the peer remove channels ensure concurrency safe addition of newly discovered peers and removal of closed peers, while reliably relaying messages to the known, healthy peers
- Generic message relay
- The MsgRelay type is a generic type that handles relay of both transactions and blocks in the peer-to-peer network of the blockchain. The message relay type is parameterized with the message type to relay, either transactions or blocks, and with the generic gRPC relay function that manages the corresponding gRPC client stream of transactions or blocks. This design provides a flexible and resilient message relay that reuses one implementation of the common concurrent message relay logic and avoids the code duplication that would be required if the message relay type were not generic
- Concurrency safe message relay
- The message relay type is concurrency safe. The concurrency safety of the message relay type is achieved through the use of channels. The single inbound message channel is exposed to the transaction sign, send, validate, and relay component, and the transaction receive, validate, and relay component. The inbound message channel receives new or relayed validated transactions and blocks to be further relayed to the list of known peers of the node. The message relay is implemented by multiplexing the inbound message channel over the list of outbound relay channels each with a dedicated goroutine representing every peer from the list of known peers of the node. This design ensures concurrent relay of messages without blocking known peers down the list if there is a problem with a peer at the top of the list. The relay of messages is fully concurrent on the basis of each known peer. Problems with relaying messages to one peer do not impact message relay to other peers
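The multiplexing of one inbound channel over per-peer outbound channels can be sketched in isolation. This is a reduced illustration with hypothetical names (`FanOut`, string messages, in-memory receivers instead of gRPC streams), not the message relay itself.

```go
package main

import (
	"fmt"
	"sync"
)

// FanOut multiplexes every message from the inbound channel over one
// outbound channel per peer. Each peer has a dedicated goroutine that, in
// this sketch, only records what it receives.
func FanOut(in <-chan string, peers []string) map[string][]string {
	var wg sync.WaitGroup
	var mu sync.Mutex
	got := make(map[string][]string)
	outs := make(map[string]chan string)
	for _, peer := range peers {
		out := make(chan string)
		outs[peer] = out
		wg.Add(1)
		go func(peer string, out <-chan string) {
			defer wg.Done()
			for msg := range out {
				mu.Lock()
				got[peer] = append(got[peer], msg)
				mu.Unlock()
			}
		}(peer, out)
	}
	// Forward every inbound message to every outbound channel
	for msg := range in {
		for _, out := range outs {
			out <- msg
		}
	}
	// Closing an outbound channel terminates the goroutine of the peer
	for _, out := range outs {
		close(out)
	}
	wg.Wait()
	return got
}

func main() {
	in := make(chan string, 2)
	in <- "tx1"
	in <- "tx2"
	close(in)
	got := FanOut(in, []string{"peer-a", "peer-b"})
	fmt.Println(got["peer-a"], got["peer-b"]) // [tx1 tx2] [tx1 tx2]
}
```

All shared state is owned by a single goroutine or reached only through channels, except the result map, which the sketch guards with a mutex because several peer goroutines write to it.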
- Concurrent peer monitoring
- The message relay periodically and concurrently checks the list of known peers of the node for newly discovered peers to include in the message relay list of peers. When a new peer is discovered, a dedicated outbound relay channel is created with an associated goroutine that forwards relayed messages from the inbound message channel through the outbound relay channel via the gRPC client streaming to the new peer. After a message relay error, the connection is gracefully closed, the dedicated outbound relay channel is closed, and the associated goroutine is gracefully terminated. This design contributes to the resilience of the message relay, as new peers are progressively added to the message relay list of peers, while problematic peers are gracefully closed and disposed of in a concurrency safe way
- Message relay gRPC client streaming
- This implementation uses the gRPC client streaming to relay messages from the outbound relay channels to every known peer. This design naturally forwards a stream of new transactions or blocks to every peer with minimal network overhead. A separate gRPC client streaming connection is established with every known peer. The gRPC client streaming connection is reused until there is a message relay error, e.g. when the peer node goes offline. When a message relay error occurs, the gRPC client streaming connection is closed, the outbound relay channel is closed, and the associated message relay goroutine is gracefully terminated
The fields of the MsgRelay type
- ctx context.Context: node shared context hierarchy
- wg *sync.WaitGroup: node shared wait group
- chMsg chan Msg: inbound generic message channel
- grpcRelay Relay: gRPC generic client streaming
- selfRelay bool: self-relay configuration parameter
- peerReader PeerReader: peer reader
- wgRelays *sync.WaitGroup: relays wait group
- chPeerAdd, chPeerRem chan string: peer add and peer remove channels

type MsgRelay[Msg any, Relay GRPCMsgRelay[Msg]] struct {
	ctx                  context.Context
	wg                   *sync.WaitGroup
	chMsg                chan Msg
	grpcRelay            Relay
	selfRelay            bool
	peerReader           PeerReader
	wgRelays             *sync.WaitGroup
	chPeerAdd, chPeerRem chan string
}

func NewMsgRelay[Msg any, Relay GRPCMsgRelay[Msg]](
	ctx context.Context, wg *sync.WaitGroup, cap int,
	grpcRelay Relay, selfRelay bool, peerReader PeerReader,
) *MsgRelay[Msg, Relay] {
	return &MsgRelay[Msg, Relay]{
		ctx:        ctx,
		wg:         wg,
		chMsg:      make(chan Msg, cap),
		grpcRelay:  grpcRelay,
		selfRelay:  selfRelay,
		peerReader: peerReader,
		wgRelays:   new(sync.WaitGroup),
		chPeerAdd:  make(chan string),
		chPeerRem:  make(chan string),
	}
}
The message relay exposes the single inbound message channel to relay messages.
The message relay for transactions and blocks happens through the TxRelayer
and the BlockRelayer interfaces respectively. These interfaces are implemented
by the generic MsgRelay type. Places where the relayer interfaces are used
- The TxRelayer interface is used by the TxSend and TxReceive methods of the Tx gRPC service
- The BlockRelayer interface is used by the ProposeBlocks method of the BlockProposer type and by the BlockReceive method of the Block gRPC service
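type TxRelayer interface {
	RelayTx(tx chain.SigTx)
}

type BlockRelayer interface {
	RelayBlock(blk chain.SigBlock)
}

// RelayTx sends the transaction to the inbound message channel
func (r *MsgRelay[Msg, Relay]) RelayTx(tx Msg) {
	r.chMsg <- tx
}

// RelayBlock sends the block to the inbound message channel
func (r *MsgRelay[Msg, Relay]) RelayBlock(blk Msg) {
	r.chMsg <- blk
}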
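How one generic type can satisfy both relayer interfaces may not be obvious, so here is a reduced sketch. `Relay`, `NewRelay`, `SigTx`, and `SigBlock` are hypothetical stand-ins for MsgRelay, NewMsgRelay, chain.SigTx, and chain.SigBlock; only the inbound channel is modeled.

```go
package main

import "fmt"

// SigTx and SigBlock are hypothetical stand-ins for the chain types.
type SigTx struct{ ID string }
type SigBlock struct{ Number uint64 }

type TxRelayer interface {
	RelayTx(tx SigTx)
}

type BlockRelayer interface {
	RelayBlock(blk SigBlock)
}

// Relay is a reduced stand-in for the generic message relay that keeps only
// the inbound message channel.
type Relay[Msg any] struct {
	chMsg chan Msg
}

func NewRelay[Msg any](size int) *Relay[Msg] {
	return &Relay[Msg]{chMsg: make(chan Msg, size)}
}

func (r *Relay[Msg]) RelayTx(tx Msg)     { r.chMsg <- tx }
func (r *Relay[Msg]) RelayBlock(blk Msg) { r.chMsg <- blk }

// Compile-time checks: each instantiation satisfies the interface whose
// method signature matches the message type of the instantiation.
var _ TxRelayer = (*Relay[SigTx])(nil)
var _ BlockRelayer = (*Relay[SigBlock])(nil)

func main() {
	txRelay := NewRelay[SigTx](1)
	var relayer TxRelayer = txRelay
	relayer.RelayTx(SigTx{ID: "tx1"})
	fmt.Println((<-txRelay.chMsg).ID) // tx1
}
```

Once `Msg` is instantiated as `SigTx`, the method `RelayTx(tx Msg)` has the signature `RelayTx(tx SigTx)`, which is what the `TxRelayer` interface requires; the same reasoning applies to blocks.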
- Message relay algorithm
- The message relay algorithm coordinates the concurrent
processes of monitoring newly discovered peers through the peer reader
interface, adding new peers to the message relay, removing offline peers from
the message relay, and multiplexing the inbound message channel over the list
of outbound message relay channels. The message relay algorithm starts a
dedicated goroutine to periodically read all known peers of the node. Each new
peer is handled by the peer add channel. If the peer is not in the list of
message relay peers, a new outbound message relay channel is created with an
associated goroutine to independently and concurrently manage the message
relay to the peer. A failure in communication with a peer causes the failed
peer to be handled by the peer remove channel. The outbound message relay
channel for the peer is closed, and the associated goroutine is gracefully
terminated. Later the same peer, when online again, will be handled by the
peer add channel. Finally, all messages from the inbound message relay channel
are multiplexed to all active outbound message relay channels. The message
relay algorithm is fully integrated into the node graceful shutdown mechanism
by monitoring the cancellation of the node shared context hierarchy, waiting
for all message relay goroutines to gracefully terminate through the relays
wait group, and notifying the node of the graceful shutdown of the message
relay through the node shared wait group. The message relay algorithm:
- Start a goroutine for concurrently monitoring the list of known peers
- Compose the cancellation channel of the node shared context hierarchy, the
  add peer channel for adding new peers, the remove peer channel for removal
  of closed peers, and the inbound message relay channel for multiplexing
  messages to the list of outbound message relay channels
  - When the node shared context hierarchy is canceled, close all active outbound message relay channels, wait for all active message relay goroutines to gracefully terminate, and stop the message relay
  - When a new peer is discovered, create a new outbound message relay channel and start a new goroutine to handle the message relay to the new peer
  - When an active peer connection is closed, close the outbound message relay channel, which causes the associated message relay goroutine to terminate gracefully
  - When a new message is sent to the inbound message relay channel, multiplex the message over the list of active outbound message relay channels. The gRPC client streaming to all known active peers is performed concurrently
func (r *MsgRelay[Msg, Relay]) RelayMsgs(period time.Duration) {
	defer r.wg.Done()
	// Monitor the list of known peers in a dedicated goroutine
	r.wgRelays.Add(1)
	go r.addPeers(period)
	// Outbound message relay channels keyed by the peer address
	chRelays := make(map[string]chan Msg)
	closeRelays := func() {
		for _, chRelay := range chRelays {
			close(chRelay)
		}
	}
	for {
		select {
		case <-r.ctx.Done():
			// Graceful shutdown: stop all peer relays and wait for them
			closeRelays()
			r.wgRelays.Wait()
			return
		case peer := <-r.chPeerAdd:
			_, exist := chRelays[peer]
			if exist {
				continue
			}
			if r.selfRelay {
				fmt.Printf("<=> Blk relay: %v\n", peer)
			} else {
				fmt.Printf("<=> Tx relay: %v\n", peer)
			}
			chRelay := r.peerRelay(peer)
			chRelays[peer] = chRelay
		case peer := <-r.chPeerRem:
			_, exist := chRelays[peer]
			if !exist {
				continue
			}
			chRelay := chRelays[peer]
			close(chRelay)
			delete(chRelays, peer)
		case msg := <-r.chMsg:
			// Multiplex the message over all active outbound channels
			for _, chRelay := range chRelays {
				chRelay <- msg
			}
		}
	}
}
- Monitor peers
- The concurrent peer monitoring process is started by the
message relay in order to keep relaying messages to new peers discovered over
time. The concurrent peer monitoring is performed periodically through
the peer reader interface. The concurrent peer monitoring process is fully
integrated into the node graceful shutdown mechanism. In each peer monitoring
cycle the list of known peers is sent to the peer add channel. Only new peers
result in the creation of a new outbound message relay channel with the
associated message relay goroutine. Based on the value of the self-relay
configuration parameter, either only the known peers or the known peers and the
node’s own address are sent to the peer add channel. The concurrent peer
monitoring process:
- Periodically read the list of known peers with or without the node’s own address
- Send all known peers to the peer add channel
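func (r *MsgRelay[Msg, Relay]) addPeers(period time.Duration) {
	defer r.wgRelays.Done()
	tick := time.NewTicker(period)
	defer tick.Stop()
	for {
		select {
		case <-r.ctx.Done():
			return
		case <-tick.C:
			var peers []string
			if r.selfRelay {
				peers = r.peerReader.SelfPeers()
			} else {
				peers = r.peerReader.Peers()
			}
			for _, peer := range peers {
				// Also watch for the shutdown while sending: once RelayMsgs
				// stops reading the peer add channel, a plain send would
				// block forever and stall the relays wait group
				select {
				case <-r.ctx.Done():
					return
				case r.chPeerAdd <- peer:
				}
			}
		}
	}
}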
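The peer reader used above is not defined in this section. The sketch below assumes an interface with the two methods that addPeers calls and pairs it with a hypothetical in-memory implementation (`MemPeers`, `NewMemPeers`, `AddPeer`) to show the difference between the plain and the self-including peer lists. The addresses are made up.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// PeerReader mirrors the two methods called by addPeers. The exact
// interface of the code base is assumed, not quoted.
type PeerReader interface {
	Peers() []string
	SelfPeers() []string
}

// MemPeers is a hypothetical in-memory registry of peer addresses.
type MemPeers struct {
	mu    sync.RWMutex
	self  string
	peers map[string]struct{}
}

func NewMemPeers(self string) *MemPeers {
	return &MemPeers{self: self, peers: make(map[string]struct{})}
}

// AddPeer registers a discovered peer; the node's own address is ignored.
func (p *MemPeers) AddPeer(addr string) {
	if addr == p.self {
		return
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	p.peers[addr] = struct{}{}
}

// Peers returns the known peers without the node's own address, sorted for
// a stable order.
func (p *MemPeers) Peers() []string {
	p.mu.RLock()
	defer p.mu.RUnlock()
	peers := make([]string, 0, len(p.peers))
	for addr := range p.peers {
		peers = append(peers, addr)
	}
	sort.Strings(peers)
	return peers
}

// SelfPeers returns the known peers followed by the node's own address.
func (p *MemPeers) SelfPeers() []string {
	return append(p.Peers(), p.self)
}

var _ PeerReader = (*MemPeers)(nil)

func main() {
	peers := NewMemPeers("localhost:1122")
	peers.AddPeer("localhost:1124")
	peers.AddPeer("localhost:1123")
	fmt.Println(peers.Peers())     // [localhost:1123 localhost:1124]
	fmt.Println(peers.SelfPeers()) // [localhost:1123 localhost:1124 localhost:1122]
}
```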
- Outbound message relay
- The outbound message relay happens through the
outbound message relay channel dedicated to the peer, in the associated
goroutine. This design increases the throughput of relayed messages
by independently and concurrently handling message relay to each active peer,
and increases the resilience by isolating message relays in dedicated
goroutines on a per-peer basis. The message relay goroutines are integrated
into the message relay graceful shutdown mechanism through the node shared
context hierarchy and the dedicated relays wait group. The outbound message
relay creates the gRPC client connection with the peer and passes the node
shared context hierarchy, the gRPC client connection, and the outbound message
relay channel to the generic gRPC relay function that manages the gRPC client
stream of relayed messages. On any error of establishing the gRPC client
connection or handling the gRPC client stream the peer is sent to the peer
remove channel. This marks the peer as inactive, closes the channel, and
gracefully terminates the associated goroutine. The outbound message relay
process:
- Start a message relay goroutine dedicated to the peer
- The goroutine establishes the gRPC client connection with the peer
- The goroutine passes the node shared context hierarchy, the gRPC client connection, and the outbound message relay channel to the generic gRPC relay function
func (r *MsgRelay[Msg, Relay]) peerRelay(peer string) chan Msg {
	// Outbound message relay channel dedicated to the peer
	chRelay := make(chan Msg)
	r.wgRelays.Add(1)
	go func() {
		defer r.wgRelays.Done()
		conn, err := grpc.NewClient(
			peer, grpc.WithTransportCredentials(insecure.NewCredentials()),
		)
		if err != nil {
			fmt.Println(err)
			r.chPeerRem <- peer
			return
		}
		defer conn.Close()
		// Stream messages from the outbound channel to the peer
		err = r.grpcRelay(r.ctx, conn, chRelay)
		if err != nil {
			fmt.Println(err)
			r.chPeerRem <- peer
			return
		}
	}()
	return chRelay
}
- Transaction relay through gRPC client streaming
- The gRPC client streaming
relays transactions or blocks from the outbound message relay channel to the
gRPC client stream of transactions or blocks. The gRPC client streaming is
message type specific and is parameterized in the message relay type with the
gRPC relay generic function. The gRPC relay generic function accepts the node
shared context hierarchy, the gRPC client connection, and the outbound message
relay channel. The gRPC client streaming creates the message-specific gRPC
client and establishes the gRPC client stream. The gRPC client streaming
combines the node shared context cancellation channel for the graceful
shutdown with the outbound message relay channel for streaming messages to the
peer. When a new message is sent to the outbound message relay channel, the
message is encoded and sent over the gRPC client stream to the peer. The
transaction relay through the gRPC client streaming:
- Create the gRPC transaction client
- Call the gRPC TxReceive method to establish the gRPC client stream with the peer
- Combine the cancellation channel of the node shared context hierarchy with
  the outbound transaction relay channel
  - When the node shared context hierarchy is canceled, close the gRPC client connection and stop the transaction relay to the peer
  - When a new transaction is sent to the outbound transaction relay channel, forward the transaction to the established gRPC client stream
type GRPCMsgRelay[Msg any] func(
	ctx context.Context, conn *grpc.ClientConn, chRelay chan Msg,
) error

var GRPCTxRelay GRPCMsgRelay[chain.SigTx] = func(
	ctx context.Context, conn *grpc.ClientConn, chRelay chan chain.SigTx,
) error {
	cln := rpc.NewTxClient(conn)
	// Establish the gRPC client stream with the peer
	stream, err := cln.TxReceive(context.Background())
	if err != nil {
		return err
	}
	defer stream.CloseAndRecv()
	for {
		select {
		case <-ctx.Done():
			return nil
		case tx, open := <-chRelay:
			// The outbound channel is closed when the peer is removed
			if !open {
				return nil
			}
			jtx, err := json.Marshal(tx)
			if err != nil {
				fmt.Println(err)
				continue
			}
			req := &rpc.TxReceiveReq{Tx: jtx}
			err = stream.Send(req)
			if err != nil {
				fmt.Println(err)
				continue
			}
		}
	}
}
The gRPC Tx service provides the TxReceive method to receive transactions
relayed from other peers on the blockchain. The transaction relay happens from
the gRPC TxSend method and from the gRPC TxReceive method to further relay
validated transactions to other peers. The transaction relay forwards
transactions to other peers through the gRPC client streaming. The interface of
the service:
message TxReceiveReq {
	bytes Tx = 1;
}

message TxReceiveRes { }

service Tx {
	rpc TxReceive(stream TxReceiveReq) returns (TxReceiveRes);
}
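Since the request carries the transaction as opaque bytes, the sender and the receiver have to agree on the encoding; the listings in this document use JSON. A minimal round trip under that assumption is sketched below. `SigTx` and its fields, as well as `EncodeTx` and `DecodeTx`, are hypothetical, and `TxReceiveReq` is a hand-written stand-in for the generated message type.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SigTx is a hypothetical signed transaction with made-up fields.
type SigTx struct {
	From  string `json:"from"`
	To    string `json:"to"`
	Value uint64 `json:"value"`
	Nonce uint64 `json:"nonce"`
}

// TxReceiveReq is a stand-in for the generated request message.
type TxReceiveReq struct {
	Tx []byte
}

// EncodeTx wraps the JSON-encoded transaction into a request.
func EncodeTx(tx SigTx) (*TxReceiveReq, error) {
	jtx, err := json.Marshal(tx)
	if err != nil {
		return nil, err
	}
	return &TxReceiveReq{Tx: jtx}, nil
}

// DecodeTx restores the transaction from the request bytes.
func DecodeTx(req *TxReceiveReq) (SigTx, error) {
	var tx SigTx
	err := json.Unmarshal(req.Tx, &tx)
	return tx, err
}

func main() {
	tx := SigTx{From: "alice", To: "bob", Value: 12, Nonce: 1}
	req, err := EncodeTx(tx)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(string(req.Tx)) // {"from":"alice","to":"bob","value":12,"nonce":1}
	got, err := DecodeTx(req)
	fmt.Println(got == tx, err) // true <nil>
}
```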
The implementation of the TxReceive method:
- For each transaction received from the gRPC client stream
  - Decode the transaction
  - Apply the decoded transaction to the pending state
  - If successful, relay the validated transaction further to the list of known peers
func (s *TxSrv) TxReceive(
	stream grpc.ClientStreamingServer[TxReceiveReq, TxReceiveRes],
) error {
	for {
		req, err := stream.Recv()
		// The client has closed the stream
		if err == io.EOF {
			res := &TxReceiveRes{}
			return stream.SendAndClose(res)
		}
		if err != nil {
			return status.Error(codes.Internal, err.Error())
		}
		var tx chain.SigTx
		err = json.Unmarshal(req.Tx, &tx)
		if err != nil {
			fmt.Println(err)
			continue
		}
		fmt.Printf("<== Tx receive\n%v\n", tx)
		// Validate the transaction against the pending state
		err = s.txApplier.ApplyTx(tx)
		if err != nil {
			fmt.Println(err)
			continue
		}
		// Relay only validated transactions
		if s.txRelayer != nil {
			s.txRelayer.RelayTx(tx)
		}
	}
}
The TestTxReceive testing process:
- Create and persist the genesis
- Create the state from the genesis
- Get the initial owner account and its balance from the genesis
- Re-create the initial owner account from the genesis
- Set up the gRPC server and gRPC client
- Create the gRPC transaction client
- Call the TxReceive method to get the gRPC client stream to relay validated transactions
- Start relaying valid and invalid transactions to the gRPC client stream. For
  each transaction
  - Create and sign a transaction
  - Encode the signed transaction
  - Call the gRPC TxReceive method to relay the encoded transaction
  - Wait for the relayed transaction to be received and processed
- Verify that the balance of the initial owner account on the pending state after receiving relayed transactions is correct
go test -v -cover -coverprofile=coverage.cov ./... -run TxReceive
The TestTxRelay testing process:
- Set up the bootstrap node
  - Create the peer discovery without starting for the bootstrap node
  - Initialize the state on the bootstrap node by creating the genesis
  - Create and start the transaction relay for the bootstrap node
  - Start the gRPC server on the bootstrap node
- Set up the new node
  - Create and start the peer discovery for the new node
  - Wait for the peer discovery to discover peers
  - Synchronize the state on the new node by fetching the genesis and confirmed blocks from the bootstrap node
  - Start the gRPC server on the new node
  - Wait for the gRPC server of the new node to start
- Get the initial owner account and its balance from the genesis
- Re-create the initial owner account from the genesis
- Sign and send several signed transactions to the bootstrap node
- Verify that the balances of the initial owner account on the pending states of the new node and the bootstrap node are equal
go test -v -cover -coverprofile=coverage.cov ./... -run TxRelay