Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/eth/eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (eth *EthService) startEthDataManager(ctx context.Context) error {
defer proposalOutcomeSub.Unsubscribe()

// interval to clean refresh all indexed data
ticker := time.NewTicker(1 * time.Hour)
ticker := time.NewTicker(6 * time.Hour)

for {
select {
Expand Down
2 changes: 1 addition & 1 deletion pkg/integration_tests/00_blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestBlockCreation(t *testing.T) {
_, err := sdk.Core.Ping(ctx, connect.NewRequest(&corev1.PingRequest{}))
assert.NoError(t, err)

err = utils.WaitForDevnetHealthy(30 * time.Second)
err = utils.WaitForDevnetHealthy()
assert.NoError(t, err)

var blockOne *corev1.Block
Expand Down
2 changes: 1 addition & 1 deletion pkg/integration_tests/00_manage_entity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestEntityManager(t *testing.T) {
ctx := context.Background()
sdk := utils.DiscoveryOne

err := utils.WaitForDevnetHealthy(60 * time.Second)
err := utils.WaitForDevnetHealthy()
assert.NoError(t, err)

// Generate a test private key
Expand Down
2 changes: 1 addition & 1 deletion pkg/integration_tests/00_slash_proposal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestCanRetrieveSlashProposal(t *testing.T) {
wsRpcUrl = "ws" + strings.TrimPrefix(wsRpcUrl, "http")
}

err := utils.WaitForDevnetHealthy(30 * time.Second)
err := utils.WaitForDevnetHealthy()
require.NoError(t, err)

ethrpc, err := ethclient.Dial(wsRpcUrl)
Expand Down
2 changes: 1 addition & 1 deletion pkg/integration_tests/10_upload_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
func TestUploadStream(t *testing.T) {
ctx := context.Background()

require.NoError(t, utils.WaitForDevnetHealthy(30*time.Second))
require.NoError(t, utils.WaitForDevnetHealthy())

serverAddr := "node3.oap.devnet"
privKeyPath := "./assets/demo_key.txt"
Expand Down
3 changes: 1 addition & 2 deletions pkg/integration_tests/12_rewards_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package integration_tests
import (
"context"
"testing"
"time"

v1 "github.com/OpenAudio/go-openaudio/pkg/api/core/v1"
"github.com/OpenAudio/go-openaudio/pkg/common"
Expand All @@ -17,7 +16,7 @@ func TestRewardsLifecycle(t *testing.T) {
nodeUrl := utils.DiscoveryOneRPC

// Wait for devnet to be ready
if err := utils.WaitForDevnetHealthy(30 * time.Second); err != nil {
if err := utils.WaitForDevnetHealthy(); err != nil {
t.Fatalf("Devnet not ready: %v", err)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/integration_tests/90_deregistration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestDeregisterNode(t *testing.T) {
wsRpcUrl = "ws" + strings.TrimPrefix(wsRpcUrl, "http")
}

err := utils.WaitForDevnetHealthy(30 * time.Second)
err := utils.WaitForDevnetHealthy()
require.NoError(t, err)

ethrpc, err := ethclient.Dial(wsRpcUrl)
Expand Down
61 changes: 58 additions & 3 deletions pkg/integration_tests/utils/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package utils
import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"net/http"
"os"
Expand Down Expand Up @@ -69,8 +70,12 @@ func EnsureProtocol(endpoint string) string {
return endpoint
}

func WaitForDevnetHealthy(timeout time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
func WaitForDevnetHealthy(timeout ...time.Duration) error {
timeoutDuration := 60 * time.Second
if len(timeout) > 0 {
timeoutDuration = timeout[0]
}
ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration)
defer cancel()

nodes := []*sdk.OpenAudioSDK{
Expand All @@ -80,14 +85,23 @@ func WaitForDevnetHealthy(timeout time.Duration) error {
ContentThree,
}

nodeAddresses := []string{
ContentOneRPC,
ContentTwoRPC,
ContentThreeRPC,
}

ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

client := NewTestHTTPClient()

for {
select {
case <-ctx.Done():
return errors.New("timed out waiting for devnet to be ready")
case <-ticker.C:
// Check core services are ready
allReady := true
for _, n := range nodes {
status, err := n.Core.GetStatus(context.Background(), connect.NewRequest(&corev1.GetStatusRequest{}))
Expand All @@ -99,7 +113,48 @@ func WaitForDevnetHealthy(timeout time.Duration) error {
break
}
}
if allReady {
if !allReady {
continue
}

// Check mediorum services have wallets registered
allMediorumReady := true
var healthResponse struct {
Storage struct {
WalletIsRegistered bool `json:"wallet_is_registered"`
} `json:"storage"`
}

for _, addr := range nodeAddresses {
// Ensure https:// protocol
baseURL := addr
if !strings.HasPrefix(baseURL, "https://") && !strings.HasPrefix(baseURL, "http://") {
baseURL = "https://" + baseURL
} else if strings.HasPrefix(baseURL, "http://") {
baseURL = strings.Replace(baseURL, "http://", "https://", 1)
}

req, err := http.NewRequestWithContext(ctx, "GET", baseURL+"/health-check", nil)
if err != nil {
allMediorumReady = false
break
}

resp, err := client.Do(req)
if err != nil {
allMediorumReady = false
break
}

if resp.StatusCode != 200 || json.NewDecoder(resp.Body).Decode(&healthResponse) != nil || !healthResponse.Storage.WalletIsRegistered {
resp.Body.Close()
allMediorumReady = false
break
}
resp.Body.Close()
}

if allReady && allMediorumReady {
return nil
}
}
Expand Down
26 changes: 24 additions & 2 deletions pkg/mediorum/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ func (ss *MediorumServer) startPprofServer(ctx context.Context) error {
}

func (ss *MediorumServer) refreshPeersAndSigners(ctx context.Context) error {
interval := 30 * time.Minute
interval := 10 * time.Minute
if os.Getenv("OPENAUDIO_ENV") == "dev" {
interval = 10 * time.Second
}
Expand Down Expand Up @@ -715,7 +715,29 @@ func (ss *MediorumServer) refreshPeersAndSigners(ctx context.Context) error {
ss.Config.Peers = peers
ss.Config.Signers = signers

ss.logger.Info("updated peers and signers dynamically", zap.Int("peers", len(peers)), zap.Int("signers", len(signers)))
// Update WalletIsRegistered based on updated peers list
ss.Config.WalletIsRegistered = false
for _, peer := range peers {
if strings.EqualFold(ss.Config.Self.Wallet, peer.Wallet) && strings.EqualFold(ss.Config.Self.Host, peer.Host) {
ss.Config.WalletIsRegistered = true
break
}
}

// Log detailed info if not registered to help diagnose issues
if !ss.Config.WalletIsRegistered {
peerHosts := make([]string, len(peers))
for i, p := range peers {
peerHosts[i] = p.Host
}
ss.logger.Warn("node not found in registered peers list",
zap.String("self_host", ss.Config.Self.Host),
zap.String("self_wallet", ss.Config.Self.Wallet),
zap.Int("total_peers", len(peers)),
zap.Strings("peer_hosts", peerHosts))
}

ss.logger.Info("updated peers and signers dynamically", zap.Int("peers", len(peers)), zap.Int("signers", len(signers)), zap.Bool("wallet_is_registered", ss.Config.WalletIsRegistered))
case <-ctx.Done():
return ctx.Err()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/uptime/uptime.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func toPeers(endpoints []*ethv1.ServiceEndpoint) []registrar.Peer {
}

func (u *Uptime) startPeerRefresher() {
interval := 30 * time.Minute
interval := 10 * time.Minute
if os.Getenv("OPENAUDIO_ENV") == "dev" {
interval = 10 * time.Second
}
Expand Down
Loading