Skip to content

Commit

Permalink
Merge pull request #1919 from dedis/work-be1-stuart-better-logging
Browse files Browse the repository at this point in the history
[BE1] better logging
  • Loading branch information
sgueissa committed Jun 12, 2024
2 parents 57c2ca0 + c197318 commit e47fa9a
Show file tree
Hide file tree
Showing 56 changed files with 410 additions and 259 deletions.
90 changes: 40 additions & 50 deletions be1-go/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/fsnotify/fsnotify"
"github.com/gorilla/websocket"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/urfave/cli/v2"
"go.dedis.ch/kyber/v3"
"golang.org/x/exp/slices"
Expand Down Expand Up @@ -51,35 +50,10 @@ type ServerConfig struct {
DatabasePath string `json:"database-path"`
}

func (s *ServerConfig) newHub(l *zerolog.Logger) (oldHub.Hub, error) {
// compute the client server address if it wasn't provided
if s.ClientAddress == "" {
s.ClientAddress = fmt.Sprintf("ws://%s:%d/client", s.PublicAddress, s.ClientPort)
}
// compute the server server address if it wasn't provided
if s.ServerAddress == "" {
s.ServerAddress = fmt.Sprintf("ws://%s:%d/server", s.PublicAddress, s.ServerPort)
}

var ownerPubKey kyber.Point = nil
err := ownerKey(s.PublicKey, &ownerPubKey)
if err != nil {
return nil, err
}

hub, err := hub.New(s.DatabasePath, ownerPubKey, s.ClientAddress, s.ServerAddress)

if err != nil {
return nil, err
}

return hub, nil
}

// Serve parses the CLI arguments and spawns a hub and a websocket server for
// the server
func Serve(cliCtx *cli.Context) error {
poplog := logger.Logger
log := logger.Logger

configFilePath := cliCtx.String("config-file")
var serverConfig ServerConfig
Expand All @@ -102,7 +76,7 @@ func Serve(cliCtx *cli.Context) error {
}
}

h, err := serverConfig.newHub(&poplog)
h, err := serverConfig.newHub(log)
if err != nil {
return err
}
Expand All @@ -111,13 +85,11 @@ func Serve(cliCtx *cli.Context) error {
h.Start()

// Start websocket server for clients
clientSrv := network.NewServer(h, serverConfig.PrivateAddress, serverConfig.ClientPort, socket.ClientSocketType,
poplog.With().Str("role", "client websocket").Logger())
clientSrv := network.NewServer(h, serverConfig.PrivateAddress, serverConfig.ClientPort, socket.ClientSocketType, log)
clientSrv.Start()

// Start a websocket server for remote servers
serverSrv := network.NewServer(h, serverConfig.PrivateAddress, serverConfig.ServerPort, socket.ServerSocketType,
poplog.With().Str("role", "server websocket").Logger())
serverSrv := network.NewServer(h, serverConfig.PrivateAddress, serverConfig.ServerPort, socket.ServerSocketType, log)
serverSrv.Start()

// create wait group which waits for goroutines to finish
Expand All @@ -141,7 +113,7 @@ func Serve(cliCtx *cli.Context) error {
}

// start watching goroutine
go watchConfigFile(watcher, configFilePath, &serverConfig.OtherServers, updatedServersChan)
go watchConfigFile(watcher, configFilePath, &serverConfig.OtherServers, updatedServersChan, log)
}

// map to keep track of the connection status of the servers
Expand All @@ -152,7 +124,7 @@ func Serve(cliCtx *cli.Context) error {
}

wg.Add(1)
go serverConnectionLoop(h, wg, done, serverConfig.OtherServers, updatedServersChan, &connectedServers)
go serverConnectionLoop(h, wg, done, serverConfig.OtherServers, updatedServersChan, &connectedServers, log)

// Wait for a Ctrl-C
err = network.WaitAndShutdownServers(cliCtx.Context, nil, clientSrv, serverSrv)
Expand All @@ -177,18 +149,38 @@ func Serve(cliCtx *cli.Context) error {
select {
case <-channsClosed:
case <-time.After(time.Second * 10):
poplog.Error().Msg("channs didn't close after timeout, exiting")
log.Error().Msg("channs didn't close after timeout, exiting")
}

return nil
}

func (s *ServerConfig) newHub(log zerolog.Logger) (oldHub.Hub, error) {
// compute the client server address if it wasn't provided
if s.ClientAddress == "" {
s.ClientAddress = fmt.Sprintf("ws://%s:%d/client", s.PublicAddress, s.ClientPort)
}
// compute the server server address if it wasn't provided
if s.ServerAddress == "" {
s.ServerAddress = fmt.Sprintf("ws://%s:%d/server", s.PublicAddress, s.ServerPort)
}

var ownerPubKey kyber.Point = nil
err := ownerKey(s.PublicKey, &ownerPubKey, log)
if err != nil {
return nil, err
}

return hub.New(s.DatabasePath, ownerPubKey, s.ClientAddress, s.ServerAddress, log)
}

// serverConnectionLoop tries to connect to the remote servers following an exponential backoff strategy
// it also listens for updates in the other-servers field and tries to connect to the new servers
func serverConnectionLoop(h oldHub.Hub, wg *sync.WaitGroup, done chan struct{}, otherServers []string, updatedServersChan chan []string, connectedServers *map[string]bool) {
func serverConnectionLoop(h oldHub.Hub, wg *sync.WaitGroup, done chan struct{}, otherServers []string,
updatedServersChan chan []string, connectedServers *map[string]bool, log zerolog.Logger) {
// first connection to the servers
serversToConnect := otherServers
_ = connectToServers(h, wg, done, serversToConnect, connectedServers)
_ = connectToServers(h, wg, done, serversToConnect, connectedServers, log)

// define the delay between connection retries
delay := connectionRetryInitialDelay
Expand All @@ -201,7 +193,7 @@ func serverConnectionLoop(h oldHub.Hub, wg *sync.WaitGroup, done chan struct{},
case <-ticker.C:
// try to connect to servers
log.Info().Msgf("Trying to connect to servers: %v", serversToConnect)
err := connectToServers(h, wg, done, serversToConnect, connectedServers)
err := connectToServers(h, wg, done, serversToConnect, connectedServers, log)
if err != nil {
increaseDelay(&delay)
ticker.Reset(delay)
Expand All @@ -212,7 +204,7 @@ func serverConnectionLoop(h oldHub.Hub, wg *sync.WaitGroup, done chan struct{},
delay = connectionRetryInitialDelay
ticker.Reset(delay)
serversToConnect = newServersList
_ = connectToServers(h, wg, done, serversToConnect, connectedServers)
_ = connectToServers(h, wg, done, serversToConnect, connectedServers, log)
case <-done:
log.Info().Msg("Stopping the server connection loop")
wg.Done()
Expand All @@ -223,12 +215,13 @@ func serverConnectionLoop(h oldHub.Hub, wg *sync.WaitGroup, done chan struct{},

// connectToServers updates the connection status of the servers and tries to connect to the ones that are not connected
// it returns an error if at least one connection fails
func connectToServers(h oldHub.Hub, wg *sync.WaitGroup, done chan struct{}, servers []string, connectedServers *map[string]bool) error {
func connectToServers(h oldHub.Hub, wg *sync.WaitGroup, done chan struct{}, servers []string,
connectedServers *map[string]bool, log zerolog.Logger) error {
updateServersState(servers, connectedServers)
var returnErr error
for serverAddress, connected := range *connectedServers {
if !connected {
err := connectToSocket(serverAddress, h, wg, done)
err := connectToSocket(serverAddress, h, wg, done, log)
if err == nil {
(*connectedServers)[serverAddress] = true
} else {
Expand All @@ -242,11 +235,7 @@ func connectToServers(h oldHub.Hub, wg *sync.WaitGroup, done chan struct{}, serv

// connectToSocket establishes a connection to another server's server
// endpoint.
func connectToSocket(address string, h oldHub.Hub,
wg *sync.WaitGroup, done chan struct{}) error {

poplog := logger.Logger

func connectToSocket(address string, h oldHub.Hub, wg *sync.WaitGroup, done chan struct{}, log zerolog.Logger) error {
urlString := fmt.Sprintf("ws://%s/server", address)
u, err := url.Parse(urlString)
if err != nil {
Expand All @@ -258,10 +247,10 @@ func connectToSocket(address string, h oldHub.Hub,
return xerrors.Errorf("failed to dial to %s: %v", u.String(), err)
}

poplog.Info().Msgf("connected to server at %s", urlString)
log.Info().Msgf("connected to server at %s", urlString)

remoteSocket := socket.NewServerSocket(h.Receiver(),
h.OnSocketClose(), ws, wg, done, poplog)
h.OnSocketClose(), ws, wg, done, log)
wg.Add(2)

go remoteSocket.WritePump()
Expand All @@ -277,7 +266,7 @@ func connectToSocket(address string, h oldHub.Hub,
return nil
}

func ownerKey(pk string, point *kyber.Point) error {
func ownerKey(pk string, point *kyber.Point, log zerolog.Logger) error {
if pk != "" {
*point = crypto.Suite.Point()
// decode public key and unmarshal public key
Expand Down Expand Up @@ -357,7 +346,8 @@ func startWithFlags(cliCtx *cli.Context) (ServerConfig, error) {
// watchConfigFile watches the config file for changes, updates the other servers list in the config if necessary
// and sends the updated other servers list to the updatedServersChan so that the connection to servers loop can
// connect to them and update their connection status
func watchConfigFile(watcher *fsnotify.Watcher, configFilePath string, otherServersField *[]string, updatedServersChan chan []string) {
func watchConfigFile(watcher *fsnotify.Watcher, configFilePath string, otherServersField *[]string,
updatedServersChan chan []string, log zerolog.Logger) {
for event := range watcher.Events {
if event.Op&fsnotify.Write == fsnotify.Write {
updatedConfig, err := loadConfig(configFilePath)
Expand Down
11 changes: 6 additions & 5 deletions be1-go/cli/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package main
import (
"encoding/json"
"github.com/fsnotify/fsnotify"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"
"io"
"os"
Expand All @@ -14,9 +16,6 @@ import (
"sync"
"testing"
"time"

"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
)

const (
Expand Down Expand Up @@ -52,7 +51,7 @@ func TestConnectToSocket(t *testing.T) {
wh.Start()

wg := &sync.WaitGroup{}
err = connectToSocket("localhost:9001", wh, wg, wDone)
err = connectToSocket("localhost:9001", wh, wg, wDone, log)
require.NoError(t, err)

err = remoteSrv.Shutdown()
Expand Down Expand Up @@ -100,6 +99,8 @@ func TestLoadConfigFileWithInvalidAuthPort(t *testing.T) {

// TestWatchConfigFile tests that a config file is watched correctly and the updated servers are received
func TestWatchConfigFile(t *testing.T) {
log := zerolog.New(io.Discard)

// Load the config from the file
serverConfig, err := loadConfig(validConfigWatcherPath)
require.NoError(t, err)
Expand All @@ -119,7 +120,7 @@ func TestWatchConfigFile(t *testing.T) {
updatedServersChan := make(chan []string)

// Start watching the config file
go watchConfigFile(watcher, validConfigWatcherPath, &serverConfig.OtherServers, updatedServersChan)
go watchConfigFile(watcher, validConfigWatcherPath, &serverConfig.OtherServers, updatedServersChan, log)
defer func(watcher *fsnotify.Watcher) {
require.NoError(t, watcher.Close())
}(watcher)
Expand Down
4 changes: 2 additions & 2 deletions be1-go/internal/database/sqlite/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (s *SQLite) CheckRumor(senderID string, rumorID int) (bool, error) {
if rumorID == 0 {
err := s.database.QueryRow(selectAnyRumor, senderID).Scan(&id)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return false, err
return false, poperrors.NewDatabaseSelectErrorMsg(err.Error())
} else if errors.Is(err, sql.ErrNoRows) {
return true, nil
}
Expand All @@ -158,7 +158,7 @@ func (s *SQLite) CheckRumor(senderID string, rumorID int) (bool, error) {

err := s.database.QueryRow(selectLastRumor, senderID).Scan(&id)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return false, err
return false, poperrors.NewDatabaseSelectErrorMsg(err.Error())
} else if errors.Is(err, sql.ErrNoRows) {
return false, nil
}
Expand Down
9 changes: 7 additions & 2 deletions be1-go/internal/database/sqlite/sqlite_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sqlite
import (
"database/sql"
"encoding/base64"
"github.com/rs/zerolog"
"go.dedis.ch/kyber/v3"
poperrors "popstellar/internal/errors"
"popstellar/internal/handler/channel"
Expand All @@ -14,14 +15,15 @@ var dbLock sync.RWMutex
// SQLite is a wrapper around the SQLite database.
type SQLite struct {
database *sql.DB
log zerolog.Logger
}

//======================================================================================================================
// Database initialization
//======================================================================================================================

// NewSQLite returns a new SQLite instance.
func NewSQLite(path string, foreignKeyOn bool) (SQLite, error) {
func NewSQLite(path string, foreignKeyOn bool, log zerolog.Logger) (SQLite, error) {
dbLock.Lock()
defer dbLock.Unlock()

Expand Down Expand Up @@ -99,7 +101,10 @@ func NewSQLite(path string, foreignKeyOn bool) (SQLite, error) {
return SQLite{}, poperrors.NewDatabaseTransactionCommitErrorMsg("%v", err)
}

return SQLite{database: db}, nil
return SQLite{
database: db,
log: log.With().Str("module", "sqlite").Logger(),
}, nil
}

func initRumorTables(tx *sql.Tx) error {
Expand Down
4 changes: 3 additions & 1 deletion be1-go/internal/database/sqlite/sqlite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
"io"
"os"
"path/filepath"
"popstellar/internal/crypto"
Expand Down Expand Up @@ -82,7 +84,7 @@ func newFakeSQLite(t *testing.T) (SQLite, string, error) {
require.NoError(t, err)

fn := filepath.Join(dir, "test.DB")
lite, err := NewSQLite(fn, false)
lite, err := NewSQLite(fn, false, zerolog.New(io.Discard))
require.NoError(t, err)

return lite, dir, nil
Expand Down
32 changes: 30 additions & 2 deletions be1-go/internal/errors/errors.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package errors

import (
"encoding/json"
"fmt"
"path/filepath"
"runtime"
)

Expand Down Expand Up @@ -55,7 +57,7 @@ func (p *PopError) Error() string {
func NewPopError(code int, format string, a ...interface{}) *PopError {

var pcs [depthStack]uintptr
n := runtime.Callers(2, pcs[:])
n := runtime.Callers(3, pcs[:])
stack := pcs[0:n]

return &PopError{
Expand All @@ -75,14 +77,40 @@ func (p *PopError) StackTraceString() string {

for {
frame, ok := frames.Next()
stackTrace += fmt.Sprintf("%s\n\t%s:%d\n", frame.Function, frame.File, frame.Line)
if !ok {
break
}
stackTrace += fmt.Sprintf("%s\n\t%s:%d\n", filepath.Base(frame.Function), filepath.Base(frame.File), frame.Line)
}

return stackTrace
}

type Frame struct {
Function string `json:"function"`
File string `json:"file"`
Line int `json:"line"`
}

func (p *PopError) GetStackTraceJSON() ([]byte, error) {
frames := runtime.CallersFrames(p.stackTrace)
var stackTraceArray []Frame

for {
frame, ok := frames.Next()
if !ok {
break
}
stackTraceArray = append(stackTraceArray, Frame{
Function: filepath.Base(frame.Function),
File: filepath.Base(frame.File),
Line: frame.Line,
})
}

return json.Marshal(stackTraceArray)
}

// NewInvalidActionError returns an error with the code -1 for an invalid action
func NewInvalidActionError(format string, a ...interface{}) error {
return NewPopError(InvalidActionErrorCode, InvalidActionErrorMsg+format, a...)
Expand Down
Loading

0 comments on commit e47fa9a

Please sign in to comment.