feat(aws): Implemented S3-backed blob storage with key-value interface #1014
base: main
Conversation
Signed-off-by: Akhil Repala <[email protected]>
📝 Walkthrough

Adds an AWS S3-backed blob store under database/plugin/blob/aws with four new files: database.go (BlobStoreS3 type, New constructor, S3 client initialization, Get/Put, key-prefix handling, metrics wiring, plugin registration), logger.go (S3Logger wrapper with Infof/Warningf/Debugf/Errorf), metrics.go (Prometheus counter registration for ops and bytes), and commit_timestamp.go (GetCommitTimestamp and SetCommitTimestamp using big.Int byte encoding). All changes are new files; no existing files were modified.

Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20–25 minutes

Areas requiring extra attention:
Pre-merge checks and finishing touches

✅ Passed checks (4 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📒 Files selected for processing (1)
🧰 Additional context used

🧬 Code graph analysis (1)

database/plugin/blob/aws/database.go (2)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
🔇 Additional comments (2)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Signed-off-by: Akhil Repala <[email protected]>
Signed-off-by: Akhil Repala <[email protected]>
Signed-off-by: Akhil Repala <[email protected]>
Actionable comments posted: 3
🧹 Nitpick comments (5)
database/plugin/blob/aws/logger.go (1)
28-61: Consider using structured logging and `With()` for efficiency.

The current implementation formats messages with `fmt.Sprintf` and repeats the `"component"="database"` attribute on every log call. This approach:

- Loses the benefits of structured logging when `args` could be key-value pairs
- Is less efficient than setting the component field once

Consider refactoring the constructor to use `With()`:

```diff
 func NewS3Logger(logger *slog.Logger) *S3Logger {
 	if logger == nil {
 		logger = slog.New(slog.NewJSONHandler(io.Discard, nil))
 	}
+	logger = logger.With("component", "database")
 	return &S3Logger{logger: logger}
 }

 func (g *S3Logger) Infof(msg string, args ...any) {
-	g.logger.Info(
-		fmt.Sprintf(msg, args...),
-		"component", "database",
-	)
+	g.logger.Info(fmt.Sprintf(msg, args...))
 }

 func (g *S3Logger) Warningf(msg string, args ...any) {
-	g.logger.Warn(
-		fmt.Sprintf(msg, args...),
-		"component", "database",
-	)
+	g.logger.Warn(fmt.Sprintf(msg, args...))
 }

 func (g *S3Logger) Debugf(msg string, args ...any) {
-	g.logger.Debug(
-		fmt.Sprintf(msg, args...),
-		"component", "database",
-	)
+	g.logger.Debug(fmt.Sprintf(msg, args...))
 }

 func (g *S3Logger) Errorf(msg string, args ...any) {
-	g.logger.Error(
-		fmt.Sprintf(msg, args...),
-		"component", "database",
-	)
+	g.logger.Error(fmt.Sprintf(msg, args...))
 }
```

Note: The Logger interface in `database/plugin/log.go` uses method names without the "f" suffix (e.g., `Info` instead of `Infof`), but if printf-style formatting is intentional for this S3-specific logger, the current naming is acceptable.

database/plugin/blob/aws/database.go (4)
78-82: Simplify redundant length check.

`strings.SplitN(path, "/", 2)` always returns a slice with at least one element, so `len(parts) == 0` on line 79 is impossible.

```diff
 parts := strings.SplitN(path, "/", 2)
-if len(parts) == 0 || parts[0] == "" {
+if parts[0] == "" {
 	cancel()
 	return nil, errors.New("s3 blob: invalid S3 path (missing bucket)")
 }
```
84-91: Simplify key prefix normalization logic.

The key prefix handling can be made more concise.

```diff
 bucket := parts[0]
-keyPrefix := ""
-if len(parts) > 1 && parts[1] != "" {
-	keyPrefix = strings.TrimSuffix(parts[1], "/")
-	if keyPrefix != "" {
-		keyPrefix += "/"
-	}
+var keyPrefix string
+if len(parts) > 1 {
+	keyPrefix = strings.Trim(parts[1], "/")
+	if keyPrefix != "" {
+		keyPrefix += "/"
+	}
 }
```
146-148: Consider inlining the trivial awsString helper.

The `awsString` helper is a one-line function used only twice. Consider inlining it for simplicity, or retain it if you anticipate more usage in future methods.
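For reference, the AWS SDK for Go v2 already ships an equivalent pointer helper, `aws.String`, in the `github.com/aws/aws-sdk-go-v2/aws` package. A minimal sketch, assuming the local `awsString` is the usual one-line pointer wrapper (the PR's actual definition is not shown here):

```go
package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go-v2/aws"
)

// Assumed shape of the local helper: a one-line pointer wrapper.
func awsString(s string) *string { return &s }

func main() {
	// The SDK's aws.String does the same job, so the local helper
	// could be dropped in favor of it rather than inlined.
	a := awsString("some/key")
	b := aws.String("some/key")
	fmt.Println(*a == *b) // true
}
```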
167-167: Consider using Debug level for per-operation success logs.

Success logs on lines 167 and 182 are emitted at `Info` level for every `Get` and `Put` operation. In high-throughput scenarios, this can generate excessive log volume. Consider using `Debug` level for routine success messages and reserving `Info` for significant events.

```diff
-	d.logger.Infof("s3 get %q ok (%d bytes)", key, len(data))
+	d.logger.Debugf("s3 get %q ok (%d bytes)", key, len(data))
```

```diff
-	d.logger.Infof("s3 put %q ok (%d bytes)", key, len(value))
+	d.logger.Debugf("s3 put %q ok (%d bytes)", key, len(value))
```

Also applies to: 182-182
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
database/plugin/blob/aws/commit_timestamp.go (1 hunks)
database/plugin/blob/aws/database.go (1 hunks)
database/plugin/blob/aws/logger.go (1 hunks)
database/plugin/blob/aws/metrics.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
database/plugin/blob/aws/metrics.go (1)
  database/plugin/blob/aws/database.go (1)
    BlobStoreS3 (44-52)
database/plugin/blob/aws/database.go (3)
  database/plugin/register.go (3)
    Register (50-52)
    PluginEntry (40-46)
    PluginTypeBlob (26-26)
  database/plugin/blob/aws/logger.go (2)
    S3Logger (24-26)
    NewS3Logger (28-33)
  database/plugin/log.go (1)
    Logger (18-27)
database/plugin/blob/aws/logger.go (2)
  database/plugin/log.go (1)
    Logger (18-27)
  database/plugin/blob/aws/database.go (1)
    New (55-114)
database/plugin/blob/aws/commit_timestamp.go (1)
  database/plugin/blob/aws/database.go (1)
    BlobStoreS3 (44-52)
🔇 Additional comments (3)
database/plugin/blob/aws/database.go (2)
151-169: No issue found — GetObject Body is safely closed.

The code already follows the AWS SDK recommended pattern: the error is checked before the `defer` statement is reached. If `GetObject` returns an error on line 156, the function returns immediately without setting up the defer. The `defer out.Body.Close()` on line 160 only executes when `err == nil`, guaranteeing that `Body` is non-nil and safe to close. No changes needed.

Likely an incorrect or invalid review comment.
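In isolation, the pattern being described looks like the following sketch, written with generic names rather than the PR's exact code:

```go
package blobsketch

import (
	"context"
	"io"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

// getObject sketches the safe pattern described above: the error is
// checked before the defer is registered, so out.Body is guaranteed
// non-nil by the time Close runs.
func getObject(ctx context.Context, client *s3.Client, bucket, key string) ([]byte, error) {
	out, err := client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return nil, err // the defer below is never registered on this path
	}
	defer out.Body.Close() // reached only when err == nil
	return io.ReadAll(out.Body)
}
```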
34-41: Disregard the original review comment — the code is correct.

The plugin system does not require `NewFromOptionsFunc` to be populated. Plugins self-register their metadata (Type, Name) via `init()` functions, and actual instantiation occurs through direct calls to each plugin's `New()` function with required parameters (dataDir, logger, promRegistry). The `NewFromOptionsFunc` field in the `PluginEntry` struct is unused dead code; `GetPlugin()` is never called in the codebase. All existing plugins (S3, Badger, GCS, SQLite) follow the same pattern and omit this field.

Likely an incorrect or invalid review comment.
database/plugin/blob/aws/commit_timestamp.go (1)
24-30: The negative timestamp concern in this review comment is based on an invalid assumption.This codebase only stores timestamps from
time.Now().UnixMilli(), which always produces positive int64 values. Thebig.Int.SetInt64().Bytes()→SetBytes().Int64()round-trip works correctly for positive values; the concern about negative timestamps breaking the encoding is moot since they never occur in practice.The code correctly serializes and deserializes positive int64 timestamps. The review's technical premise—that negative values won't round-trip correctly—is true in isolation but irrelevant to this codebase's actual data domain.
Regarding missing keys: there is an inconsistency worth noting (AWS propagates the error while SQLite's implementation comments that missing records are not an error), but whether this is a bug depends on the intended behavior—this warrants a separate clarification question if needed rather than a categorical issue.
Likely an incorrect or invalid review comment.
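To make the round-trip behavior concrete, here is a small self-contained demonstration of the encoding described above (standard library only; it mirrors the review's description, not the PR's exact code):

```go
package main

import (
	"fmt"
	"math/big"
	"time"
)

func main() {
	// Positive millisecond timestamps survive the round-trip:
	// Bytes() emits the big-endian magnitude, SetBytes reads it back.
	ts := time.Now().UnixMilli()
	encoded := new(big.Int).SetInt64(ts).Bytes()
	decoded := new(big.Int).SetBytes(encoded).Int64()
	fmt.Println(ts == decoded) // true

	// A negative value would not round-trip: Bytes() drops the sign,
	// so the decoded value comes back positive.
	neg := new(big.Int).SetInt64(-42).Bytes()
	fmt.Println(new(big.Int).SetBytes(neg).Int64()) // 42, not -42
}
```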
```go
func (d *BlobStoreS3) registerBlobMetrics() {
	opsTotal := prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: s3MetricNamePrefix + "ops_total",
			Help: "Total number of S3 blob operations",
		},
	)
	bytesTotal := prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: s3MetricNamePrefix + "bytes_total",
			Help: "Total bytes read/written for S3 blob operations",
		},
	)

	d.promRegistry.MustRegister(opsTotal, bytesTotal)
}
```
Critical: Metrics are created but never used or incremented.
The counters opsTotal and bytesTotal are created as local variables but are never stored as fields on BlobStoreS3, making it impossible for the Get and Put methods to increment them. These metrics will always report zero.
Store the counters as fields and increment them in the relevant operations:
```diff
+type BlobStoreS3 struct {
+	promRegistry  prometheus.Registerer
+	startupCtx    context.Context
+	logger        *S3Logger
+	client        *s3.Client
+	bucket        string
+	prefix        string
+	startupCancel context.CancelFunc
+	opsTotal      prometheus.Counter
+	bytesTotal    prometheus.Counter
+}

 func (d *BlobStoreS3) registerBlobMetrics() {
-	opsTotal := prometheus.NewCounter(
+	d.opsTotal = prometheus.NewCounter(
 		prometheus.CounterOpts{
 			Name: s3MetricNamePrefix + "ops_total",
 			Help: "Total number of S3 blob operations",
 		},
 	)
-	bytesTotal := prometheus.NewCounter(
+	d.bytesTotal = prometheus.NewCounter(
 		prometheus.CounterOpts{
 			Name: s3MetricNamePrefix + "bytes_total",
 			Help: "Total bytes read/written for S3 blob operations",
 		},
 	)
-	d.promRegistry.MustRegister(opsTotal, bytesTotal)
+	d.promRegistry.MustRegister(d.opsTotal, d.bytesTotal)
 }
```

Then increment in Get and Put operations (in database.go):
```go
// In Get method after successful read:
if d.opsTotal != nil {
	d.opsTotal.Inc()
	d.bytesTotal.Add(float64(len(data)))
}

// In Put method after successful write:
if d.opsTotal != nil {
	d.opsTotal.Inc()
	d.bytesTotal.Add(float64(len(value)))
}
```

🤖 Prompt for AI Agents
In database/plugin/blob/aws/metrics.go around lines 21 to 36, the prometheus
counters opsTotal and bytesTotal are created as local variables and never stored
on the BlobStoreS3 struct, so they cannot be incremented by Get/Put and will
always report zero; modify the BlobStoreS3 type to add fields for opsTotal and
bytesTotal, assign the created counters to those struct fields inside
registerBlobMetrics (instead of local variables), register them with
promRegistry, and then update the Get and Put implementations (in database.go)
to increment d.opsTotal.Inc() and d.bytesTotal.Add(float64(len(data))) after
successful reads and d.opsTotal.Inc() and d.bytesTotal.Add(float64(len(value)))
after successful writes (guarding with nil checks if necessary).
Signed-off-by: Akhil Repala <[email protected]>
Actionable comments posted: 4
♻️ Duplicate comments (2)
database/plugin/blob/aws/database.go (2)
43-52: Acknowledge previous review: `startupCtx` field remains unused.

As noted in the previous review, the `startupCtx` field is only written to and never read after initialization. Consider removing it from the struct if it serves no purpose, or document its intended use.
110-113: Acknowledge previous review: Context leak when `init()` fails.

As noted in the previous review, when `db.init()` returns an error on line 110, the context created on line 60 is not canceled before returning, leading to a resource leak. The previous review suggested adding `cancel()` before the error return or using `defer cancel()` after line 60.
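The shape of the suggested fix, sketched with stand-in types (field names are taken from the review comments, not from the PR's actual code):

```go
package blobsketch

import "context"

// Minimal stand-in for the PR's struct; field names are assumed
// from the review comments.
type BlobStoreS3 struct {
	startupCtx    context.Context
	startupCancel context.CancelFunc
}

func (d *BlobStoreS3) init() error { return nil } // placeholder

// New cancels the startup context on the failure path, releasing its
// resources instead of leaking them when init() returns an error.
func New(ctx context.Context) (*BlobStoreS3, error) {
	startupCtx, cancel := context.WithCancel(ctx)
	db := &BlobStoreS3{startupCtx: startupCtx, startupCancel: cancel}
	if err := db.init(); err != nil {
		cancel() // this call was missing in the reviewed code
		return nil, err
	}
	return db, nil
}
```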
🧹 Nitpick comments (2)
database/plugin/blob/aws/database.go (2)
84-91: Consider simplifying the key prefix normalization.

The current logic removes the trailing slash and then conditionally adds it back. This can be simplified for better readability.

Apply this diff to simplify:

```diff
 bucket := parts[0]
 keyPrefix := ""
 if len(parts) > 1 && parts[1] != "" {
-	keyPrefix = strings.TrimSuffix(parts[1], "/")
-	if keyPrefix != "" {
-		keyPrefix += "/"
-	}
+	keyPrefix = strings.TrimSuffix(parts[1], "/") + "/"
 }
```
151-159: Consider distinguishing NotFound errors from other failures.

The current implementation treats all S3 errors the same way. Blob storage callers often need to distinguish between a missing key (NotFound) and other errors (access denied, network issues). Consider checking for specific AWS error types to provide better error semantics.

Example pattern using AWS SDK v2 error handling:

```go
out, err := d.client.GetObject(ctx, &s3.GetObjectInput{
	Bucket: &d.bucket,
	Key:    awsString(d.fullKey(key)),
})
if err != nil {
	// Check for NoSuchKey error
	var nsk *types.NoSuchKey
	if errors.As(err, &nsk) {
		d.logger.Debugf("s3 get %q not found", key)
		return nil, fmt.Errorf("key not found: %w", err)
	}
	d.logger.Errorf("s3 get %q failed: %v", key, err)
	return nil, err
}
```

You'll need to import `"github.com/aws/aws-sdk-go-v2/service/s3/types"` for the error types.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
database/plugin/blob/aws/database.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
database/plugin/blob/aws/database.go (3)
database/plugin/register.go (3)
  Register (50-52)
  PluginEntry (40-46)
  PluginTypeBlob (26-26)
database/plugin/blob/aws/logger.go (2)
  S3Logger (24-26)
  NewS3Logger (28-33)
database/plugin/log.go (1)
  Logger (18-27)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: lint
- GitHub Check: go-test (1.24.x, ubuntu-latest)
- GitHub Check: nilaway
- GitHub Check: Analyze (go)
🔇 Additional comments (1)
database/plugin/blob/aws/database.go (1)
131-148: LGTM: Accessor and helper methods are well-designed.

The accessor methods (`Client`, `Bucket`) and helpers (`fullKey`, `awsString`) are straightforward and correctly implement their intended functionality.
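For orientation, these helpers presumably look something like the following sketch, inferred from the review comments rather than taken from the PR:

```go
package blobsketch

import "github.com/aws/aws-sdk-go-v2/service/s3"

// Stand-in struct; fields inferred from the review, not the PR's code.
type BlobStoreS3 struct {
	client *s3.Client
	bucket string
	prefix string // normalized to "" or to end in "/"
}

// fullKey prepends the configured key prefix to the caller's key.
func (d *BlobStoreS3) fullKey(key string) string { return d.prefix + key }

// Trivial accessors referenced in the comment above.
func (d *BlobStoreS3) Client() *s3.Client { return d.client }
func (d *BlobStoreS3) Bucket() string     { return d.bucket }
```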
```go
// Register plugin
func init() {
	plugin.Register(
		plugin.PluginEntry{
			Type: plugin.PluginTypeBlob,
			Name: "s3",
		},
	)
}
```
🧩 Analysis chain
Verify that the plugin registration is complete.
The plugin registration only sets Type and Name, but PluginEntry includes additional fields (NewFromOptionsFunc, Description, Options) that are not populated. Confirm whether NewFromOptionsFunc is required for the plugin system to instantiate the S3 blob store, or if the current registration is sufficient.
Run the following script to check how other blob plugins register themselves:
🏁 Script executed:
```shell
#!/bin/bash
# Description: Check plugin registration patterns in other blob implementations
# Search for plugin.Register calls in other blob plugins
rg -nP --type=go 'plugin\.Register\(' -A 8 -g 'database/plugin/blob/**'
```

Length of output: 1592
---
🏁 Script executed:
```shell
#!/bin/bash
# Description: Check PluginEntry type definition and plugin instantiation pattern
# Find PluginEntry type definition
rg -nP --type=go 'type\s+PluginEntry\s+struct' -A 15
# Also search for how plugins are actually instantiated/used
rg -nP --type=go 'NewFromOptionsFunc' -B 2 -A 2
```

Length of output: 1443
---
🏁 Script executed:
```shell
#!/bin/bash
# Description: Check if blob plugins are instantiated through the plugin system
# Search for GetPlugin calls related to blob
rg -nP --type=go 'GetPlugin|NewFromOptionsFunc' -B 3 -A 3 -g '!**/test/**'
# Search for BlobStore instantiation patterns
rg -nP --type=go 'BlobStore|blob.*New' -B 2 -A 2 | head -50
# Check if there are any New() functions in blob implementations
fd --type f -name 'database.go' -path '*/blob/*' -exec grep -l 'func New' {} \;
```

Length of output: 3763
---
🏁 Script executed:
```shell
#!/bin/bash
# Description: Find blob.New() implementation and understand instantiation flow
# Search for blob.New() function
rg -nP --type=go 'func New\(' -A 20 'database/plugin/blob/store.go'
# Check for all files in blob plugin directory
fd --type f --path 'database/plugin/blob/*' -name '*.go'
# Search for plugin system usage with blob
rg -nP --type=go 'GetPlugin.*PluginTypeBlob|plugin\.PluginTypeBlob' -B 3 -A 3
```

Length of output: 1676
---
🏁 Script executed:
```shell
#!/bin/bash
# Description: Verify if pluginName is truly unused in blob.New()
# Search complete blob.New() and surrounding context
rg -nP --type=go 'func New\(' -A 30 'database/plugin/blob/store.go'
# Search for any conditional logic or factory pattern
rg -nP --type=go 'pluginName|GetPlugin' 'database/plugin/blob/store.go'
```

Length of output: 275
Plugin registration is incomplete and will cause runtime failures.
The PluginEntry requires a NewFromOptionsFunc field that creates and returns a Plugin instance. Without it, calling GetPlugin(plugin.PluginTypeBlob, "s3") will invoke a nil function pointer and panic. All three blob plugin implementations (S3, Badger, GCS) have this same issue. Each blob plugin should define a factory function and register it.
Example fix needed:
```go
func init() {
	plugin.Register(
		plugin.PluginEntry{
			Type: plugin.PluginTypeBlob,
			Name: "s3",
			NewFromOptionsFunc: func() plugin.Plugin {
				// Return properly initialized plugin instance
			},
		},
	)
}
```

Note: The pluginName parameter in database/plugin/blob/store.go's New() function is also unused; the implementation hardcodes badgerPlugin.New() regardless of the plugin name passed in.
🤖 Prompt for AI Agents
In database/plugin/blob/aws/database.go around lines 33-41, the plugin
registration is missing the required NewFromOptionsFunc which will cause a nil
function panic at runtime; add a NewFromOptionsFunc that returns a properly
initialized S3 blob plugin instance (accepting options as needed), and apply the
same pattern to the Badger and GCS plugin registration files so each PluginEntry
provides a factory. Also update database/plugin/blob/store.go to stop ignoring
the pluginName parameter and instead choose/instantiate the correct plugin
implementation based on pluginName (call the registered factory/new-from-options
for the requested plugin) so New() returns the requested plugin rather than
hardcoding the Badger implementation.
```go
// Get reads the value at key.
func (d *BlobStoreS3) Get(ctx context.Context, key string) ([]byte, error) {
	out, err := d.client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: &d.bucket,
		Key:    awsString(d.fullKey(key)),
	})
	if err != nil {
		d.logger.Errorf("s3 get %q failed: %v", key, err)
		return nil, err
	}
	defer out.Body.Close()

	data, err := io.ReadAll(out.Body)
	if err != nil {
		d.logger.Errorf("s3 read %q failed: %v", key, err)
		return nil, err
	}
	d.logger.Infof("s3 get %q ok (%d bytes)", key, len(data))
	return data, nil
}
```
Missing metrics recording in Get operation.
The BlobStoreS3 struct includes a Prometheus registry and the PR summary mentions "Added metrics and logging for all operations," but the Get method does not record any metrics. According to the AI summary, metrics.go defines counters for blob operations (ops_total, bytes_total), but they are not incremented here.
Ensure that metrics are recorded for successful and failed Get operations, including bytes read. For example:
```go
// After successful read (before line 168):
if d.promRegistry != nil {
	// Increment ops_total counter with labels
	// Increment bytes_total counter with len(data)
}
```

Note: The exact implementation depends on how metrics are defined in metrics.go.
🤖 Prompt for AI Agents
In database/plugin/blob/aws/database.go around lines 150-169, the Get method
logs but does not update Prometheus metrics; update it to increment the blob
operation counters on both success and error (use the same label set you use
elsewhere: operation="get", bucket/name labels as appropriate) and on success
also increment the bytes_total counter by len(data); guard updates with a nil
check on d.promRegistry (or whatever metric handles are stored) so tests/older
builds without metrics won't panic, and place the success increments after the
ReadAll succeeds but before returning, and the error increment(s) in each error
branch before returning.
```go
// Put writes a value to key.
func (d *BlobStoreS3) Put(ctx context.Context, key string, value []byte) error {
	_, err := d.client.PutObject(ctx, &s3.PutObjectInput{
		Bucket: &d.bucket,
		Key:    awsString(d.fullKey(key)),
		Body:   bytes.NewReader(value),
	})
	if err != nil {
		d.logger.Errorf("s3 put %q failed: %v", key, err)
		return err
	}
	d.logger.Infof("s3 put %q ok (%d bytes)", key, len(value))
	return nil
}
```
Missing metrics recording in Put operation.
Similar to the Get method, the Put method does not record any metrics despite the metrics infrastructure being set up. The PR summary explicitly mentions "Added metrics and logging for all operations," but metrics are not incremented here.
Ensure that metrics are recorded for successful and failed Put operations, including bytes written. For example:
```go
// After successful put (before line 183):
if d.promRegistry != nil {
	// Increment ops_total counter with labels
	// Increment bytes_total counter with len(value)
}
```

Note: The exact implementation depends on how metrics are defined in metrics.go.
🤖 Prompt for AI Agents
In database/plugin/blob/aws/database.go around lines 171 to 184, the Put method
currently logs success/failure but does not update Prometheus metrics; add the
same metrics updates used in Get: check d.promRegistry (or equivalent) is
non-nil, on error increment the ops_total counter with labels (operation="put",
result="error" or similar) and on success increment ops_total with result="ok"
and increment the bytes_total (or equivalent) counter by len(value); match the
label names and metric variables defined in metrics.go so the Put operation
records both operation count and bytes written.
Signed-off-by: Akhil Repala <[email protected]>
Closes #333
Summary by CodeRabbit