Skip to content

Commit

Permalink
Merge pull request #362 from halfcrazy/translog
Browse files Browse the repository at this point in the history
implement transaction logger by inject from ctx
  • Loading branch information
dave-tucker committed Sep 12, 2023
2 parents 09a6f46 + 7e7e9af commit 239822f
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 2 deletions.
14 changes: 12 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,14 +773,23 @@ func (o *ovsdbClient) listDbs(ctx context.Context) ([]string, error) {
return dbs, err
}

// logFromContext returns a Logger from ctx or return the default logger
func (o *ovsdbClient) logFromContext(ctx context.Context) *logr.Logger {
if logger, err := logr.FromContext(ctx); err == nil {
return &logger
}
return o.logger
}

// Transact performs the provided Operations on the database
// RFC 7047 : transact
func (o *ovsdbClient) Transact(ctx context.Context, operation ...ovsdb.Operation) ([]ovsdb.OperationResult, error) {
logger := o.logFromContext(ctx)
o.rpcMutex.RLock()
if o.rpcClient == nil || !o.connected {
o.rpcMutex.RUnlock()
if o.options.reconnect {
o.logger.V(5).Info("blocking transaction until reconnected", "operations",
logger.V(5).Info("blocking transaction until reconnected", "operations",
fmt.Sprintf("%+v", operation))
ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()
Expand All @@ -806,6 +815,7 @@ func (o *ovsdbClient) Transact(ctx context.Context, operation ...ovsdb.Operation
}

func (o *ovsdbClient) transact(ctx context.Context, dbName string, skipChWrite bool, operation ...ovsdb.Operation) ([]ovsdb.OperationResult, error) {
logger := o.logFromContext(ctx)
var reply []ovsdb.OperationResult
db := o.databases[dbName]
db.modelMutex.RLock()
Expand All @@ -822,7 +832,7 @@ func (o *ovsdbClient) transact(ctx context.Context, dbName string, skipChWrite b
if o.rpcClient == nil {
return nil, ErrNotConnected
}
dbgLogger := o.logger.WithValues("database", dbName).V(4)
dbgLogger := logger.WithValues("database", dbName).V(4)
if dbgLogger.Enabled() {
dbgLogger.Info("transacting operations", "operations", fmt.Sprintf("%+v", operation))
}
Expand Down
84 changes: 84 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package client

import (
"bytes"
"context"
"encoding/json"
"fmt"
"log"
"math/rand"
"os"
"reflect"
Expand All @@ -14,6 +16,8 @@ import (

"github.com/cenkalti/backoff/v4"
"github.com/cenkalti/rpc2"
"github.com/go-logr/logr"
"github.com/go-logr/stdr"
"github.com/google/uuid"
"github.com/ovn-org/libovsdb/cache"
db "github.com/ovn-org/libovsdb/database"
Expand All @@ -22,6 +26,7 @@ import (
"github.com/ovn-org/libovsdb/ovsdb"
"github.com/ovn-org/libovsdb/ovsdb/serverdb"
"github.com/ovn-org/libovsdb/server"
"github.com/ovn-org/libovsdb/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -800,6 +805,85 @@ func TestOperationWhenNeverConnected(t *testing.T) {
}
}

func TestTransactionLogger(t *testing.T) {
stdr.SetVerbosity(5)

var defSchema ovsdb.DatabaseSchema
err := json.Unmarshal([]byte(schema), &defSchema)
require.NoError(t, err)
_, sock := newOVSDBServer(t, defDB, defSchema)
// Create client for this server's Server database
endpoint := fmt.Sprintf("unix:%s", sock)

var defaultBuf bytes.Buffer
defaultL := stdr.New(log.New(&defaultBuf, "", log.LstdFlags)).WithName("default")

// Create client to test transaction logger
ovs, err := newOVSDBClient(defDB,
WithEndpoint(endpoint),
WithLogger(&defaultL))
require.NoError(t, err)

err = ovs.Connect(context.Background())
require.NoError(t, err)
t.Cleanup(ovs.Close)

var s ovsdb.DatabaseSchema
err = json.Unmarshal([]byte(schema), &s)
require.NoError(t, err)

dbModel, err := test.GetModel()
require.NoError(t, err)
m := mapper.NewMapper(dbModel.Schema)

bridge1 := test.BridgeType{
Name: "foo",
ExternalIds: map[string]string{
"foo": "bar",
"baz": "quux",
"waldo": "fred",
},
}
bridgeInfo1, err := dbModel.NewModelInfo(&bridge1)
require.NoError(t, err)
bridgeRow1, err := m.NewRow(bridgeInfo1)
require.Nil(t, err)
bridgeUUID1 := uuid.NewString()
operation1 := ovsdb.Operation{
Op: ovsdb.OperationInsert,
Table: "Bridge",
UUID: bridgeUUID1,
Row: bridgeRow1,
}
_, _ = ovs.Transact(context.TODO(), operation1)
assert.Contains(t, defaultBuf.String(), "default")

bridge2 := test.BridgeType{
Name: "bar",
ExternalIds: map[string]string{
"foo": "bar",
"baz": "quux",
"waldo": "fred",
},
}
bridgeInfo2, err := dbModel.NewModelInfo(&bridge2)
require.NoError(t, err)
bridgeRow2, err := m.NewRow(bridgeInfo2)
require.Nil(t, err)
bridgeUUID2 := uuid.NewString()
operation2 := ovsdb.Operation{
Op: ovsdb.OperationInsert,
Table: "Bridge",
UUID: bridgeUUID2,
Row: bridgeRow2,
}
var customBuf bytes.Buffer
customL := stdr.New(log.New(&customBuf, "", log.LstdFlags)).WithName("custom")
ctx := logr.NewContext(context.TODO(), customL)
_, _ = ovs.Transact(ctx, operation2)
assert.Contains(t, customBuf.String(), "custom")
}

func TestOperationWhenNotConnected(t *testing.T) {
ovs, err := newOVSDBClient(defDB)
require.NoError(t, err)
Expand Down

0 comments on commit 239822f

Please sign in to comment.