Skip to content

[sql-46] trigger kvdb to sql migration using sqldb/v2 #1114

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 25 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
f88ba56
firewalldb: ensure that test SQL store is closed
ViktorTigerstrom Jun 9, 2025
ddd984c
firewalldb: export FirewallDBs interface
ViktorTigerstrom May 27, 2025
8a152a4
firewalldb: update NewTestDB funcs to return FirewallDBs
ViktorTigerstrom May 19, 2025
ebf1e45
db: add List All Kv Records query
ViktorTigerstrom May 20, 2025
325f5a6
multi: rename sql kvstores session_id to group_id
ViktorTigerstrom Jun 30, 2025
0c53b7b
firewalldb: clarify bbolt kvstores illustration
ViktorTigerstrom May 6, 2025
c3d8ecf
firewalldb: add kvstores kvdb to SQL migration
ViktorTigerstrom May 6, 2025
3d3960d
mod: go get sqldb/v2
ViktorTigerstrom Jun 4, 2025
2820392
multi: use sqldb v2 in litd
ViktorTigerstrom Jun 10, 2025
2bb06d5
firewalldb: add `ListAllKVStoresRecords` to queries
ViktorTigerstrom Jul 10, 2025
b8c772b
firewalldb: rename sqlStore to store in mig test
ViktorTigerstrom Jul 11, 2025
efe82a6
firewalldb: use sqldb/v2 in the firewalldb mig test
ViktorTigerstrom Jul 11, 2025
f153c65
session: use sqldb/v2 in the session mig test
ViktorTigerstrom Jul 11, 2025
3031c83
sqlcmig6: add `sqlcmig6` package
ViktorTigerstrom Jul 10, 2025
8d558ce
accounts: add SQLMig6Queries to `accounts`
ViktorTigerstrom Jul 10, 2025
7c1a697
accounts: use `sqlcmig6` for kvdb to sql migration
ViktorTigerstrom Jul 10, 2025
721af2b
session: add SQLMig6Queries to `session`
ViktorTigerstrom Jul 10, 2025
ed9d317
session: use `sqlcmig6` for kvdb to sql migration
ViktorTigerstrom Jul 10, 2025
ba82ac0
firewalldb: add SQLMig6Queries to `firewalldb`
ViktorTigerstrom Jul 11, 2025
cd19ab9
firewalldb: use queries to assert migration results
ViktorTigerstrom Jul 11, 2025
b70d7bb
firewalldb: use `sqlcmig6` for kvdb to sql migration
ViktorTigerstrom Jul 11, 2025
71ddae3
migrationstreams: introduce `migrationstreams` pkg
ViktorTigerstrom Jul 11, 2025
1d9100e
multi: introduce dev migrations
ViktorTigerstrom Jul 11, 2025
e231ccf
accounts: export kvdb DB
ViktorTigerstrom Jun 9, 2025
78d4b40
multi: add kvdb to sql dev migration
ViktorTigerstrom Jul 11, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions accounts/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func (s *InterceptorService) CreditAccount(ctx context.Context,
return nil, ErrAccountServiceDisabled
}

// Credit the account in the db.
// Credit the account in the DB.
err := s.store.CreditAccount(ctx, accountID, amount)
if err != nil {
return nil, fmt.Errorf("unable to credit account: %w", err)
Expand All @@ -386,7 +386,7 @@ func (s *InterceptorService) DebitAccount(ctx context.Context,
return nil, ErrAccountServiceDisabled
}

// Debit the account in the db.
// Debit the account in the DB.
err := s.store.DebitAccount(ctx, accountID, amount)
if err != nil {
return nil, fmt.Errorf("unable to debit account: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion accounts/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func TestAccountService(t *testing.T) {

// Ensure that the service was started successfully and
// still running though, despite the closing of the
// db store.
// DB store.
require.True(t, s.IsRunning())

// Now let's send the invoice update, which should fail.
Expand Down
87 changes: 76 additions & 11 deletions accounts/sql_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ import (
"time"

"github.com/davecgh/go-spew/spew"
"github.com/lightninglabs/lightning-terminal/db/sqlc"
"github.com/lightninglabs/lightning-terminal/db/sqlcmig6"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/pmezard/go-difflib/difflib"
)

Expand All @@ -27,7 +30,7 @@ var (
// the KV database to the SQL database. The migration is done in a single
// transaction to ensure that all accounts are migrated or none at all.
func MigrateAccountStoreToSQL(ctx context.Context, kvStore kvdb.Backend,
tx SQLQueries) error {
tx SQLMig6Queries) error {

log.Infof("Starting migration of the KV accounts store to SQL")

Expand All @@ -50,7 +53,7 @@ func MigrateAccountStoreToSQL(ctx context.Context, kvStore kvdb.Backend,
// to the SQL database. The migration is done in a single transaction to ensure
// that all accounts are migrated or none at all.
func migrateAccountsToSQL(ctx context.Context, kvStore kvdb.Backend,
tx SQLQueries) error {
tx SQLMig6Queries) error {

log.Infof("Starting migration of accounts from KV to SQL")

Expand All @@ -68,7 +71,7 @@ func migrateAccountsToSQL(ctx context.Context, kvStore kvdb.Backend,
kvAccount.ID, err)
}

migratedAccount, err := getAndMarshalAccount(
migratedAccount, err := getAndMarshalMig6Account(
ctx, tx, migratedAccountID,
)
if err != nil {
Expand Down Expand Up @@ -151,17 +154,79 @@ func getBBoltAccounts(db kvdb.Backend) ([]*OffChainBalanceAccount, error) {
return accounts, nil
}

// getAndMarshalAccount retrieves the account with the given ID. If the account
// cannot be found, then ErrAccNotFound is returned.
func getAndMarshalMig6Account(ctx context.Context, db SQLMig6Queries,
id int64) (*OffChainBalanceAccount, error) {

dbAcct, err := db.GetAccount(ctx, id)
if errors.Is(err, sql.ErrNoRows) {
return nil, ErrAccNotFound
} else if err != nil {
return nil, err
}

return marshalDBMig6Account(ctx, db, dbAcct)
}

func marshalDBMig6Account(ctx context.Context, db SQLMig6Queries,
dbAcct sqlcmig6.Account) (*OffChainBalanceAccount, error) {

alias, err := AccountIDFromInt64(dbAcct.Alias)
if err != nil {
return nil, err
}

account := &OffChainBalanceAccount{
ID: alias,
Type: AccountType(dbAcct.Type),
InitialBalance: lnwire.MilliSatoshi(dbAcct.InitialBalanceMsat),
CurrentBalance: dbAcct.CurrentBalanceMsat,
LastUpdate: dbAcct.LastUpdated.UTC(),
ExpirationDate: dbAcct.Expiration.UTC(),
Invoices: make(AccountInvoices),
Payments: make(AccountPayments),
Label: dbAcct.Label.String,
}

invoices, err := db.ListAccountInvoices(ctx, dbAcct.ID)
if err != nil {
return nil, err
}
for _, invoice := range invoices {
var hash lntypes.Hash
copy(hash[:], invoice.Hash)
account.Invoices[hash] = struct{}{}
}

payments, err := db.ListAccountPayments(ctx, dbAcct.ID)
if err != nil {
return nil, err
}

for _, payment := range payments {
var hash lntypes.Hash
copy(hash[:], payment.Hash)
account.Payments[hash] = &PaymentEntry{
Status: lnrpc.Payment_PaymentStatus(payment.Status),
FullAmount: lnwire.MilliSatoshi(payment.FullAmountMsat),
}
}

return account, nil
}

// migrateSingleAccountToSQL runs the migration for a single account from the
// KV database to the SQL database.
func migrateSingleAccountToSQL(ctx context.Context,
tx SQLQueries, account *OffChainBalanceAccount) (int64, error) {
tx SQLMig6Queries, account *OffChainBalanceAccount) (int64, error) {

accountAlias, err := account.ID.ToInt64()
if err != nil {
return 0, err
}

insertAccountParams := sqlc.InsertAccountParams{
insertAccountParams := sqlcmig6.InsertAccountParams{
Type: int16(account.Type),
InitialBalanceMsat: int64(account.InitialBalance),
CurrentBalanceMsat: account.CurrentBalance,
Expand All @@ -180,7 +245,7 @@ func migrateSingleAccountToSQL(ctx context.Context,
}

for hash := range account.Invoices {
addInvoiceParams := sqlc.AddAccountInvoiceParams{
addInvoiceParams := sqlcmig6.AddAccountInvoiceParams{
AccountID: sqlId,
Hash: hash[:],
}
Expand All @@ -192,7 +257,7 @@ func migrateSingleAccountToSQL(ctx context.Context,
}

for hash, paymentEntry := range account.Payments {
upsertPaymentParams := sqlc.UpsertAccountPaymentParams{
upsertPaymentParams := sqlcmig6.UpsertAccountPaymentParams{
AccountID: sqlId,
Hash: hash[:],
Status: int16(paymentEntry.Status),
Expand All @@ -211,7 +276,7 @@ func migrateSingleAccountToSQL(ctx context.Context,
// migrateAccountsIndicesToSQL runs the migration for the account indices from
// the KV database to the SQL database.
func migrateAccountsIndicesToSQL(ctx context.Context, kvStore kvdb.Backend,
tx SQLQueries) error {
tx SQLMig6Queries) error {

log.Infof("Starting migration of accounts indices from KV to SQL")

Expand All @@ -233,7 +298,7 @@ func migrateAccountsIndicesToSQL(ctx context.Context, kvStore kvdb.Backend,
settleIndexName, settleIndex)
}

setAddIndexParams := sqlc.SetAccountIndexParams{
setAddIndexParams := sqlcmig6.SetAccountIndexParams{
Name: addIndexName,
Value: int64(addIndex),
}
Expand All @@ -243,7 +308,7 @@ func migrateAccountsIndicesToSQL(ctx context.Context, kvStore kvdb.Backend,
return err
}

setSettleIndexParams := sqlc.SetAccountIndexParams{
setSettleIndexParams := sqlcmig6.SetAccountIndexParams{
Name: settleIndexName,
Value: int64(settleIndex),
}
Expand Down
25 changes: 10 additions & 15 deletions accounts/sql_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,17 @@ package accounts

import (
"context"
"database/sql"
"fmt"
"testing"
"time"

"github.com/lightninglabs/lightning-terminal/db"
"github.com/lightninglabs/lightning-terminal/db/sqlcmig6"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/sqldb"
"github.com/lightningnetwork/lnd/sqldb/v2"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"
"pgregory.net/rapid"
Expand All @@ -36,7 +35,7 @@ func TestAccountStoreMigration(t *testing.T) {
}

makeSQLDB := func(t *testing.T) (*SQLStore,
*db.TransactionExecutor[SQLQueries]) {
*SQLMig6QueriesExecutor[SQLMig6Queries]) {

testDBStore := NewTestDB(t, clock)

Expand All @@ -45,13 +44,9 @@ func TestAccountStoreMigration(t *testing.T) {

baseDB := store.BaseDB

genericExecutor := db.NewTransactionExecutor(
baseDB, func(tx *sql.Tx) SQLQueries {
return baseDB.WithTx(tx)
},
)
queries := sqlcmig6.NewForType(baseDB, baseDB.BackendType)

return store, genericExecutor
return store, NewSQLMig6QueriesExecutor(baseDB, queries)
}

assertMigrationResults := func(t *testing.T, sqlStore *SQLStore,
Expand Down Expand Up @@ -306,7 +301,7 @@ func TestAccountStoreMigration(t *testing.T) {
)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, kvStore.db.Close())
require.NoError(t, kvStore.DB.Close())
})

// Populate the kv store.
Expand Down Expand Up @@ -339,11 +334,11 @@ func TestAccountStoreMigration(t *testing.T) {
// Perform the migration.
var opts sqldb.MigrationTxOptions
err = txEx.ExecTx(ctx, &opts,
func(tx SQLQueries) error {
func(tx SQLMig6Queries) error {
return MigrateAccountStoreToSQL(
ctx, kvStore.db, tx,
ctx, kvStore.DB, tx,
)
},
}, sqldb.NoOpReset,
)
require.NoError(t, err)

Expand Down Expand Up @@ -445,7 +440,7 @@ func rapidRandomizeAccounts(t *testing.T, kvStore *BoltStore) {
acct := makeAccountGen().Draw(t, "account")

// Then proceed to insert the account with its invoices and
// payments into the db
// payments into the DB
newAcct, err := kvStore.NewAccount(
ctx, acct.balance, acct.expiry, acct.label,
)
Expand Down
22 changes: 11 additions & 11 deletions accounts/store_kvdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
const (
// DBFilename is the filename within the data directory which contains
// the macaroon stores.
DBFilename = "accounts.db"
DBFilename = "accounts.DB"

// dbPathPermission is the default permission the account database
// directory is created with (if it does not exist).
Expand Down Expand Up @@ -60,7 +60,7 @@ var (

// BoltStore wraps the bolt DB that stores all accounts and their balances.
type BoltStore struct {
db kvdb.Backend
DB kvdb.Backend
clock clock.Clock
}

Expand Down Expand Up @@ -101,7 +101,7 @@ func NewBoltStore(dir, fileName string, clock clock.Clock) (*BoltStore, error) {

// Return the DB wrapped in a BoltStore object.
return &BoltStore{
db: db,
DB: db,
clock: clock,
}, nil
}
Expand All @@ -110,7 +110,7 @@ func NewBoltStore(dir, fileName string, clock clock.Clock) (*BoltStore, error) {
//
// NOTE: This is part of the Store interface.
func (s *BoltStore) Close() error {
return s.db.Close()
return s.DB.Close()
}

// NewAccount creates a new OffChainBalanceAccount with the given balance and a
Expand Down Expand Up @@ -162,7 +162,7 @@ func (s *BoltStore) NewAccount(ctx context.Context, balance lnwire.MilliSatoshi,

// Try storing the account in the account database, so we can keep track
// of its balance.
err := s.db.Update(func(tx walletdb.ReadWriteTx) error {
err := s.DB.Update(func(tx walletdb.ReadWriteTx) error {
bucket := tx.ReadWriteBucket(accountBucketName)
if bucket == nil {
return ErrAccountBucketNotFound
Expand Down Expand Up @@ -364,7 +364,7 @@ func (s *BoltStore) DeleteAccountPayment(_ context.Context, id AccountID,
func (s *BoltStore) updateAccount(id AccountID,
updateFn func(*OffChainBalanceAccount) error) error {

return s.db.Update(func(tx kvdb.RwTx) error {
return s.DB.Update(func(tx kvdb.RwTx) error {
bucket := tx.ReadWriteBucket(accountBucketName)
if bucket == nil {
return ErrAccountBucketNotFound
Expand Down Expand Up @@ -451,7 +451,7 @@ func (s *BoltStore) Account(_ context.Context, id AccountID) (
// Try looking up and reading the account by its ID from the local
// bolt DB.
var accountBinary []byte
err := s.db.View(func(tx kvdb.RTx) error {
err := s.DB.View(func(tx kvdb.RTx) error {
bucket := tx.ReadBucket(accountBucketName)
if bucket == nil {
return ErrAccountBucketNotFound
Expand Down Expand Up @@ -487,7 +487,7 @@ func (s *BoltStore) Accounts(_ context.Context) ([]*OffChainBalanceAccount,
error) {

var accounts []*OffChainBalanceAccount
err := s.db.View(func(tx kvdb.RTx) error {
err := s.DB.View(func(tx kvdb.RTx) error {
// This function will be called in the ForEach and receive
// the key and value of each account in the DB. The key, which
// is also the ID is not used because it is also marshaled into
Expand Down Expand Up @@ -531,7 +531,7 @@ func (s *BoltStore) Accounts(_ context.Context) ([]*OffChainBalanceAccount,
//
// NOTE: This is part of the Store interface.
func (s *BoltStore) RemoveAccount(_ context.Context, id AccountID) error {
return s.db.Update(func(tx kvdb.RwTx) error {
return s.DB.Update(func(tx kvdb.RwTx) error {
bucket := tx.ReadWriteBucket(accountBucketName)
if bucket == nil {
return ErrAccountBucketNotFound
Expand All @@ -554,7 +554,7 @@ func (s *BoltStore) LastIndexes(_ context.Context) (uint64, uint64, error) {
var (
addValue, settleValue []byte
)
err := s.db.View(func(tx kvdb.RTx) error {
err := s.DB.View(func(tx kvdb.RTx) error {
bucket := tx.ReadBucket(accountBucketName)
if bucket == nil {
return ErrAccountBucketNotFound
Expand Down Expand Up @@ -592,7 +592,7 @@ func (s *BoltStore) StoreLastIndexes(_ context.Context, addIndex,
byteOrder.PutUint64(addValue, addIndex)
byteOrder.PutUint64(settleValue, settleIndex)

return s.db.Update(func(tx kvdb.RwTx) error {
return s.DB.Update(func(tx kvdb.RwTx) error {
bucket := tx.ReadWriteBucket(accountBucketName)
if bucket == nil {
return ErrAccountBucketNotFound
Expand Down
Loading