Skip to content

[Storage] Optimize memory cache key creation and key format for some stores #7391

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@ type LightTransactionResults struct {
// before any future reads. However, we're keeping it temporarily during active development
// for safety and debugging purposes. It will be removed once the implementation is finalized.
lock sync.RWMutex
store map[string]*flow.LightTransactionResult
indexStore map[string]*flow.LightTransactionResult
blockStore map[string][]flow.LightTransactionResult
store map[store.TwoIdentifier]*flow.LightTransactionResult // Key: blockID + txID
indexStore map[store.IdentifierAndUint32]*flow.LightTransactionResult // Key: blockID + txIndex
blockStore map[flow.Identifier][]flow.LightTransactionResult // key: blockID
}

var _ storage.LightTransactionResults = (*LightTransactionResults)(nil)

func NewLightTransactionResults() *LightTransactionResults {
return &LightTransactionResults{
store: make(map[string]*flow.LightTransactionResult),
indexStore: make(map[string]*flow.LightTransactionResult),
blockStore: make(map[string][]flow.LightTransactionResult),
store: make(map[store.TwoIdentifier]*flow.LightTransactionResult),
indexStore: make(map[store.IdentifierAndUint32]*flow.LightTransactionResult),
blockStore: make(map[flow.Identifier][]flow.LightTransactionResult),
}
}

Expand Down Expand Up @@ -68,11 +68,10 @@ func (l *LightTransactionResults) ByBlockIDTransactionIndex(blockID flow.Identif
// Expected errors during normal operation:
// - `storage.ErrNotFound` if light transaction results at given blockID weren't found.
func (l *LightTransactionResults) ByBlockID(id flow.Identifier) ([]flow.LightTransactionResult, error) {
key := store.KeyFromBlockID(id)
l.lock.RLock()
defer l.lock.RUnlock()

val, ok := l.blockStore[key]
val, ok := l.blockStore[id]
if !ok {
return nil, storage.ErrNotFound
}
Expand All @@ -83,11 +82,10 @@ func (l *LightTransactionResults) ByBlockID(id flow.Identifier) ([]flow.LightTra
// Store inserts a transaction results into a storage
// No errors are expected during normal operation.
func (l *LightTransactionResults) Store(blockID flow.Identifier, transactionResults []flow.LightTransactionResult) error {
key := store.KeyFromBlockID(blockID)
l.lock.Lock()
defer l.lock.Unlock()

l.blockStore[key] = transactionResults
l.blockStore[blockID] = transactionResults
for i, txResult := range transactionResults {
txIDKey := store.KeyFromBlockIDTransactionID(blockID, txResult.TransactionID)
txIndexKey := store.KeyFromBlockIDIndex(blockID, uint32(i))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@ type TransactionResultErrorMessages struct {
// before any future reads. However, we're keeping it temporarily during active development
// for safety and debugging purposes. It will be removed once the implementation is finalized.
lock sync.RWMutex
store map[string]*flow.TransactionResultErrorMessage
indexStore map[string]*flow.TransactionResultErrorMessage
blockStore map[string][]flow.TransactionResultErrorMessage
store map[store.TwoIdentifier]*flow.TransactionResultErrorMessage // Key: blockID + txID
indexStore map[store.IdentifierAndUint32]*flow.TransactionResultErrorMessage // Key: blockID + txIndex
blockStore map[flow.Identifier][]flow.TransactionResultErrorMessage // Key: blockID
}

var _ storage.TransactionResultErrorMessages = (*TransactionResultErrorMessages)(nil)

func NewTransactionResultErrorMessages() *TransactionResultErrorMessages {
return &TransactionResultErrorMessages{
store: make(map[string]*flow.TransactionResultErrorMessage),
indexStore: make(map[string]*flow.TransactionResultErrorMessage),
blockStore: make(map[string][]flow.TransactionResultErrorMessage),
store: make(map[store.TwoIdentifier]*flow.TransactionResultErrorMessage),
indexStore: make(map[store.IdentifierAndUint32]*flow.TransactionResultErrorMessage),
blockStore: make(map[flow.Identifier][]flow.TransactionResultErrorMessage),
}
}

Expand Down Expand Up @@ -91,11 +91,10 @@ func (t *TransactionResultErrorMessages) ByBlockIDTransactionIndex(
// Expected errors during normal operation:
// - `storage.ErrNotFound` if no block was found.
func (t *TransactionResultErrorMessages) ByBlockID(id flow.Identifier) ([]flow.TransactionResultErrorMessage, error) {
key := store.KeyFromBlockID(id)
t.lock.RLock()
defer t.lock.RUnlock()

val, ok := t.blockStore[key]
val, ok := t.blockStore[id]
if !ok {
return nil, storage.ErrNotFound
}
Expand All @@ -110,11 +109,10 @@ func (t *TransactionResultErrorMessages) Store(
blockID flow.Identifier,
transactionResultErrorMessages []flow.TransactionResultErrorMessage,
) error {
key := store.KeyFromBlockID(blockID)
t.lock.Lock()
defer t.lock.Unlock()

t.blockStore[key] = transactionResultErrorMessages
t.blockStore[blockID] = transactionResultErrorMessages
for i, txResult := range transactionResultErrorMessages {
txIDKey := store.KeyFromBlockIDTransactionID(blockID, txResult.TransactionID)
txIndexKey := store.KeyFromBlockIDIndex(blockID, uint32(i))
Expand Down
59 changes: 24 additions & 35 deletions storage/store/light_transaction_results.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,67 +14,58 @@ var _ storage.LightTransactionResults = (*LightTransactionResults)(nil)

type LightTransactionResults struct {
db storage.DB
cache *Cache[string, flow.LightTransactionResult]
indexCache *Cache[string, flow.LightTransactionResult]
blockCache *Cache[string, []flow.LightTransactionResult]
cache *Cache[TwoIdentifier, flow.LightTransactionResult] // Key: blockID + txID
indexCache *Cache[IdentifierAndUint32, flow.LightTransactionResult] // Key: blockID + txIndex
blockCache *Cache[flow.Identifier, []flow.LightTransactionResult] // Key: blockID
}

func NewLightTransactionResults(collector module.CacheMetrics, db storage.DB, transactionResultsCacheSize uint) *LightTransactionResults {
retrieve := func(r storage.Reader, key string) (flow.LightTransactionResult, error) {
var txResult flow.LightTransactionResult
blockID, txID, err := KeyToBlockIDTransactionID(key)
if err != nil {
return flow.LightTransactionResult{}, fmt.Errorf("could not convert key: %w", err)
}
retrieve := func(r storage.Reader, key TwoIdentifier) (flow.LightTransactionResult, error) {
blockID, txID := KeyToBlockIDTransactionID(key)

err = operation.RetrieveLightTransactionResult(r, blockID, txID, &txResult)
var txResult flow.LightTransactionResult
err := operation.RetrieveLightTransactionResult(r, blockID, txID, &txResult)
if err != nil {
return flow.LightTransactionResult{}, err
}
return txResult, nil
}
retrieveIndex := func(r storage.Reader, key string) (flow.LightTransactionResult, error) {
var txResult flow.LightTransactionResult
blockID, txIndex, err := KeyToBlockIDIndex(key)
if err != nil {
return flow.LightTransactionResult{}, fmt.Errorf("could not convert index key: %w", err)
}

err = operation.RetrieveLightTransactionResultByIndex(r, blockID, txIndex, &txResult)
retrieveIndex := func(r storage.Reader, key IdentifierAndUint32) (flow.LightTransactionResult, error) {
blockID, txIndex := KeyToBlockIDIndex(key)

var txResult flow.LightTransactionResult
err := operation.RetrieveLightTransactionResultByIndex(r, blockID, txIndex, &txResult)
if err != nil {
return flow.LightTransactionResult{}, err
}
return txResult, nil
}
retrieveForBlock := func(r storage.Reader, key string) ([]flow.LightTransactionResult, error) {
var txResults []flow.LightTransactionResult

blockID, err := KeyToBlockID(key)
if err != nil {
return nil, fmt.Errorf("could not convert index key: %w", err)
}

err = operation.LookupLightTransactionResultsByBlockIDUsingIndex(r, blockID, &txResults)
retrieveForBlock := func(r storage.Reader, blockID flow.Identifier) ([]flow.LightTransactionResult, error) {
var txResults []flow.LightTransactionResult
err := operation.LookupLightTransactionResultsByBlockIDUsingIndex(r, blockID, &txResults)
if err != nil {
return nil, err
}
return txResults, nil
}

return &LightTransactionResults{
db: db,
cache: newCache(collector, metrics.ResourceTransactionResults,
withLimit[string, flow.LightTransactionResult](transactionResultsCacheSize),
withStore(noopStore[string, flow.LightTransactionResult]),
withLimit[TwoIdentifier, flow.LightTransactionResult](transactionResultsCacheSize),
withStore(noopStore[TwoIdentifier, flow.LightTransactionResult]),
withRetrieve(retrieve),
),
indexCache: newCache(collector, metrics.ResourceTransactionResultIndices,
withLimit[string, flow.LightTransactionResult](transactionResultsCacheSize),
withStore(noopStore[string, flow.LightTransactionResult]),
withLimit[IdentifierAndUint32, flow.LightTransactionResult](transactionResultsCacheSize),
withStore(noopStore[IdentifierAndUint32, flow.LightTransactionResult]),
withRetrieve(retrieveIndex),
),
blockCache: newCache(collector, metrics.ResourceTransactionResultIndices,
withLimit[string, []flow.LightTransactionResult](transactionResultsCacheSize),
withStore(noopStore[string, []flow.LightTransactionResult]),
withLimit[flow.Identifier, []flow.LightTransactionResult](transactionResultsCacheSize),
withStore(noopStore[flow.Identifier, []flow.LightTransactionResult]),
withRetrieve(retrieveForBlock),
),
}
Expand Down Expand Up @@ -107,8 +98,7 @@ func (tr *LightTransactionResults) BatchStore(blockID flow.Identifier, transacti
tr.indexCache.Insert(keyIndex, result)
}

key := KeyFromBlockID(blockID)
tr.blockCache.Insert(key, transactionResults)
tr.blockCache.Insert(blockID, transactionResults)
})
return nil
}
Expand Down Expand Up @@ -139,8 +129,7 @@ func (tr *LightTransactionResults) ByBlockIDTransactionIndex(blockID flow.Identi

// ByBlockID gets all transaction results for a block, ordered by transaction index
func (tr *LightTransactionResults) ByBlockID(blockID flow.Identifier) ([]flow.LightTransactionResult, error) {
key := KeyFromBlockID(blockID)
transactionResults, err := tr.blockCache.Get(tr.db.Reader(), key)
transactionResults, err := tr.blockCache.Get(tr.db.Reader(), blockID)
if err != nil {
return nil, err
}
Expand Down
66 changes: 27 additions & 39 deletions storage/store/transaction_result_error_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,46 +14,37 @@ var _ storage.TransactionResultErrorMessages = (*TransactionResultErrorMessages)

type TransactionResultErrorMessages struct {
db storage.DB
cache *Cache[string, flow.TransactionResultErrorMessage]
indexCache *Cache[string, flow.TransactionResultErrorMessage]
blockCache *Cache[string, []flow.TransactionResultErrorMessage]
cache *Cache[TwoIdentifier, flow.TransactionResultErrorMessage] // Key: blockID + txID
indexCache *Cache[IdentifierAndUint32, flow.TransactionResultErrorMessage] // Key: blockID + txIndex
blockCache *Cache[flow.Identifier, []flow.TransactionResultErrorMessage] // Key: blockID
}

func NewTransactionResultErrorMessages(collector module.CacheMetrics, db storage.DB, transactionResultsCacheSize uint) *TransactionResultErrorMessages {
retrieve := func(r storage.Reader, key string) (flow.TransactionResultErrorMessage, error) {
var txResultErrMsg flow.TransactionResultErrorMessage
blockID, txID, err := KeyToBlockIDTransactionID(key)
if err != nil {
return flow.TransactionResultErrorMessage{}, fmt.Errorf("could not convert key: %w", err)
}
retrieve := func(r storage.Reader, key TwoIdentifier) (flow.TransactionResultErrorMessage, error) {
blockID, txID := KeyToBlockIDTransactionID(key)

err = operation.RetrieveTransactionResultErrorMessage(r, blockID, txID, &txResultErrMsg)
var txResultErrMsg flow.TransactionResultErrorMessage
err := operation.RetrieveTransactionResultErrorMessage(r, blockID, txID, &txResultErrMsg)
if err != nil {
return flow.TransactionResultErrorMessage{}, err
}
return txResultErrMsg, nil
}
retrieveIndex := func(r storage.Reader, key string) (flow.TransactionResultErrorMessage, error) {
var txResultErrMsg flow.TransactionResultErrorMessage
blockID, txIndex, err := KeyToBlockIDIndex(key)
if err != nil {
return flow.TransactionResultErrorMessage{}, fmt.Errorf("could not convert index key: %w", err)
}

err = operation.RetrieveTransactionResultErrorMessageByIndex(r, blockID, txIndex, &txResultErrMsg)
retrieveIndex := func(r storage.Reader, key IdentifierAndUint32) (flow.TransactionResultErrorMessage, error) {
blockID, txIndex := KeyToBlockIDIndex(key)

var txResultErrMsg flow.TransactionResultErrorMessage
err := operation.RetrieveTransactionResultErrorMessageByIndex(r, blockID, txIndex, &txResultErrMsg)
if err != nil {
return flow.TransactionResultErrorMessage{}, err
}
return txResultErrMsg, nil
}
retrieveForBlock := func(r storage.Reader, key string) ([]flow.TransactionResultErrorMessage, error) {
var txResultErrMsg []flow.TransactionResultErrorMessage
blockID, err := KeyToBlockID(key)
if err != nil {
return nil, fmt.Errorf("could not convert index key: %w", err)
}

err = operation.LookupTransactionResultErrorMessagesByBlockIDUsingIndex(r, blockID, &txResultErrMsg)
retrieveForBlock := func(r storage.Reader, blockID flow.Identifier) ([]flow.TransactionResultErrorMessage, error) {
var txResultErrMsg []flow.TransactionResultErrorMessage
err := operation.LookupTransactionResultErrorMessagesByBlockIDUsingIndex(r, blockID, &txResultErrMsg)
if err != nil {
return nil, err
}
Expand All @@ -62,19 +53,19 @@ func NewTransactionResultErrorMessages(collector module.CacheMetrics, db storage

return &TransactionResultErrorMessages{
db: db,
cache: newCache[string, flow.TransactionResultErrorMessage](collector, metrics.ResourceTransactionResultErrorMessages,
withLimit[string, flow.TransactionResultErrorMessage](transactionResultsCacheSize),
withStore(noopStore[string, flow.TransactionResultErrorMessage]),
cache: newCache(collector, metrics.ResourceTransactionResultErrorMessages,
withLimit[TwoIdentifier, flow.TransactionResultErrorMessage](transactionResultsCacheSize),
withStore(noopStore[TwoIdentifier, flow.TransactionResultErrorMessage]),
withRetrieve(retrieve),
),
indexCache: newCache[string, flow.TransactionResultErrorMessage](collector, metrics.ResourceTransactionResultErrorMessagesIndices,
withLimit[string, flow.TransactionResultErrorMessage](transactionResultsCacheSize),
withStore(noopStore[string, flow.TransactionResultErrorMessage]),
indexCache: newCache(collector, metrics.ResourceTransactionResultErrorMessagesIndices,
withLimit[IdentifierAndUint32, flow.TransactionResultErrorMessage](transactionResultsCacheSize),
withStore(noopStore[IdentifierAndUint32, flow.TransactionResultErrorMessage]),
withRetrieve(retrieveIndex),
),
blockCache: newCache[string, []flow.TransactionResultErrorMessage](collector, metrics.ResourceTransactionResultErrorMessagesIndices,
withLimit[string, []flow.TransactionResultErrorMessage](transactionResultsCacheSize),
withStore(noopStore[string, []flow.TransactionResultErrorMessage]),
blockCache: newCache(collector, metrics.ResourceTransactionResultErrorMessagesIndices,
withLimit[flow.Identifier, []flow.TransactionResultErrorMessage](transactionResultsCacheSize),
withStore(noopStore[flow.Identifier, []flow.TransactionResultErrorMessage]),
withRetrieve(retrieveForBlock),
),
}
Expand All @@ -94,8 +85,7 @@ func (t *TransactionResultErrorMessages) Store(blockID flow.Identifier, transact
// No errors are expected during normal operation.
func (t *TransactionResultErrorMessages) Exists(blockID flow.Identifier) (bool, error) {
// if the block is in the cache, return true
key := KeyFromBlockID(blockID)
if ok := t.blockCache.IsCached(key); ok {
if ok := t.blockCache.IsCached(blockID); ok {
return ok, nil
}

Expand Down Expand Up @@ -139,8 +129,7 @@ func (t *TransactionResultErrorMessages) batchStore(
t.indexCache.Insert(keyIndex, result)
}

key := KeyFromBlockID(blockID)
t.blockCache.Insert(key, transactionResultErrorMessages)
t.blockCache.Insert(blockID, transactionResultErrorMessages)
})
return nil
}
Expand Down Expand Up @@ -176,8 +165,7 @@ func (t *TransactionResultErrorMessages) ByBlockIDTransactionIndex(blockID flow.
//
// No errors are expected during normal operation.
func (t *TransactionResultErrorMessages) ByBlockID(blockID flow.Identifier) ([]flow.TransactionResultErrorMessage, error) {
key := KeyFromBlockID(blockID)
transactionResultErrorMessages, err := t.blockCache.Get(t.db.Reader(), key)
transactionResultErrorMessages, err := t.blockCache.Get(t.db.Reader(), blockID)
if err != nil {
return nil, err
}
Expand Down
Loading
Loading