-
Notifications
You must be signed in to change notification settings - Fork 6
feat(aws): Implemented S3-backed blob storage with key-value interface #1014
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
c6ae7e0
2680d89
46177b2
dd3a019
0abec74
8103433
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| // Copyright 2025 Blink Labs Software | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| package aws | ||
|
|
||
| import ( | ||
| "context" | ||
| "math/big" | ||
| ) | ||
|
|
||
// commitTimestampBlobKey is the object key under which the metadata
// commit timestamp is persisted (as big-endian big.Int bytes).
const commitTimestampBlobKey = "metadata_commit_timestamp"
|
|
||
| func (b *BlobStoreS3) GetCommitTimestamp(ctx context.Context) (int64, error) { | ||
| data, err := b.Get(ctx, commitTimestampBlobKey) | ||
| if err != nil { | ||
| return 0, err | ||
| } | ||
| return new(big.Int).SetBytes(data).Int64(), nil | ||
| } | ||
|
|
||
| func (b *BlobStoreS3) SetCommitTimestamp(ctx context.Context, ts int64) error { | ||
| raw := new(big.Int).SetInt64(ts).Bytes() | ||
| return b.Put(ctx, commitTimestampBlobKey, raw) | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,184 @@ | ||
| // Copyright 2025 Blink Labs Software | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| package aws | ||
|
|
||
| import ( | ||
| "bytes" | ||
| "context" | ||
| "errors" | ||
| "fmt" | ||
| "io" | ||
| "log/slog" | ||
| "strings" | ||
| "time" | ||
|
|
||
| "github.com/aws/aws-sdk-go-v2/config" | ||
| "github.com/aws/aws-sdk-go-v2/service/s3" | ||
| "github.com/blinklabs-io/dingo/database/plugin" | ||
| "github.com/prometheus/client_golang/prometheus" | ||
| ) | ||
|
|
||
| // Register plugin | ||
// Register this store with the database plugin system under the blob
// plugin type, name "s3".
//
// NOTE(review): only Type and Name are populated here. The review
// discussion on this change indicates PluginEntry also carries a
// NewFromOptionsFunc factory field and that looking the plugin up via
// GetPlugin(plugin.PluginTypeBlob, "s3") invokes a nil function
// pointer (panic) without it — confirm against the plugin package and
// wire a factory before relying on plugin lookup.
func init() {
	plugin.Register(
		plugin.PluginEntry{
			Type: plugin.PluginTypeBlob,
			Name: "s3",
		},
	)
}
|
|
||
| // BlobStoreS3 stores data in an AWS S3 bucket | ||
// BlobStoreS3 stores data in an AWS S3 bucket, addressing each value
// by bucket + optional key prefix + key.
type BlobStoreS3 struct {
	promRegistry  prometheus.Registerer // metrics registry; nil disables metric registration
	startupCtx    context.Context       // bounded context used only while constructing the store
	logger        *S3Logger             // wrapped structured logger; never nil after New
	client        *s3.Client            // S3 client built from the default AWS config chain
	bucket        string                // target bucket name
	prefix        string                // object key prefix: either "" or "<prefix>/"
	startupCancel context.CancelFunc    // releases startupCtx; cleared by init()
}
|
|
||
| // New creates a new S3-backed blob store and dataDir must be "s3://bucket" or "s3://bucket/prefix" | ||
| func New( | ||
| dataDir string, | ||
| logger *slog.Logger, | ||
| promRegistry prometheus.Registerer, | ||
| ) (*BlobStoreS3, error) { | ||
| ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) | ||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| if logger == nil { | ||
| logger = slog.New(slog.NewJSONHandler(io.Discard, nil)) | ||
| } | ||
|
|
||
| const prefix = "s3://" | ||
| if !strings.HasPrefix(strings.ToLower(dataDir), prefix) { | ||
| cancel() | ||
| return nil, errors.New("s3 blob: expected dataDir='s3://<bucket>[/prefix]'") | ||
| } | ||
|
|
||
| path := strings.TrimPrefix(strings.ToLower(dataDir), prefix) | ||
| if path == "" { | ||
| cancel() | ||
| return nil, errors.New("s3 blob: bucket not set") | ||
| } | ||
|
|
||
| parts := strings.SplitN(path, "/", 2) | ||
| if len(parts) == 0 || parts[0] == "" { | ||
| cancel() | ||
| return nil, errors.New("s3 blob: invalid S3 path (missing bucket)") | ||
| } | ||
|
|
||
| bucket := parts[0] | ||
| keyPrefix := "" | ||
| if len(parts) > 1 && parts[1] != "" { | ||
| keyPrefix = strings.TrimSuffix(parts[1], "/") | ||
| if keyPrefix != "" { | ||
| keyPrefix += "/" | ||
| } | ||
| } | ||
|
|
||
| // Loads all the aws credentials. | ||
| awsCfg, err := config.LoadDefaultConfig(ctx) | ||
| if err != nil { | ||
| cancel() | ||
| return nil, fmt.Errorf("s3 blob: load default AWS config: %w", err) | ||
| } | ||
| client := s3.NewFromConfig(awsCfg) | ||
|
|
||
| db := &BlobStoreS3{ | ||
| logger: NewS3Logger(logger), | ||
| promRegistry: promRegistry, | ||
| client: client, | ||
| bucket: bucket, | ||
| prefix: keyPrefix, | ||
| startupCtx: ctx, | ||
| startupCancel: cancel, | ||
| } | ||
| if err := db.init(); err != nil { | ||
| cancel() | ||
| return nil, err | ||
| } | ||
| return db, nil | ||
| } | ||
|
|
||
| func (d *BlobStoreS3) init() error { | ||
| // Configure metrics | ||
| if d.promRegistry != nil { | ||
| d.registerBlobMetrics() | ||
| } | ||
|
|
||
| // Close the startup context so that initialization will succeed. | ||
| if d.startupCancel != nil { | ||
| d.startupCancel() | ||
| d.startupCancel = nil | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // Returns the S3 client. | ||
| func (d *BlobStoreS3) Client() *s3.Client { | ||
| return d.client | ||
| } | ||
|
|
||
| // Returns the bucket handle. | ||
| func (d *BlobStoreS3) Bucket() string { | ||
| return d.bucket | ||
| } | ||
|
|
||
| // Returns the S3 key with an optional prefix. | ||
| func (d *BlobStoreS3) fullKey(key string) string { | ||
| return d.prefix + key | ||
| } | ||
|
|
||
// awsString returns a pointer to a copy of s, as required by the AWS
// SDK's pointer-typed input fields.
func awsString(s string) *string {
	out := s
	return &out
}
|
|
||
| // Get reads the value at key. | ||
| func (d *BlobStoreS3) Get(ctx context.Context, key string) ([]byte, error) { | ||
| out, err := d.client.GetObject(ctx, &s3.GetObjectInput{ | ||
| Bucket: &d.bucket, | ||
| Key: awsString(d.fullKey(key)), | ||
| }) | ||
| if err != nil { | ||
| d.logger.Errorf("s3 get %q failed: %v", key, err) | ||
| return nil, err | ||
| } | ||
| defer out.Body.Close() | ||
|
|
||
| data, err := io.ReadAll(out.Body) | ||
| if err != nil { | ||
| d.logger.Errorf("s3 read %q failed: %v", key, err) | ||
| return nil, err | ||
| } | ||
| d.logger.Infof("s3 get %q ok (%d bytes)", key, len(data)) | ||
| return data, nil | ||
| } | ||
|
Comment on lines
+150
to
+169
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing metrics recording in Get operation. The Ensure that metrics are recorded for successful and failed Get operations, including bytes read. For example: // After successful read (before line 168):
if d.promRegistry != nil {
// Increment ops_total counter with labels
// Increment bytes_total counter with len(data)
}Note: The exact implementation depends on how metrics are defined in 🤖 Prompt for AI Agents |
||
|
|
||
| // Put writes a value to key. | ||
| func (d *BlobStoreS3) Put(ctx context.Context, key string, value []byte) error { | ||
| _, err := d.client.PutObject(ctx, &s3.PutObjectInput{ | ||
| Bucket: &d.bucket, | ||
| Key: awsString(d.fullKey(key)), | ||
| Body: bytes.NewReader(value), | ||
| }) | ||
| if err != nil { | ||
| d.logger.Errorf("s3 put %q failed: %v", key, err) | ||
| return err | ||
| } | ||
| d.logger.Infof("s3 put %q ok (%d bytes)", key, len(value)) | ||
| return nil | ||
| } | ||
|
Comment on lines
+171
to
+184
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing metrics recording in Put operation. Similar to the Ensure that metrics are recorded for successful and failed Put operations, including bytes written. For example: // After successful put (before line 183):
if d.promRegistry != nil {
// Increment ops_total counter with labels
// Increment bytes_total counter with len(value)
}Note: The exact implementation depends on how metrics are defined in 🤖 Prompt for AI Agents |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| // Copyright 2025 Blink Labs Software | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| package aws | ||
|
|
||
| import ( | ||
| "fmt" | ||
| "io" | ||
| "log/slog" | ||
| ) | ||
|
|
||
// S3Logger is a thin wrapper giving our logger a consistent
// printf-style interface (Infof/Warningf/Debugf/Errorf).
type S3Logger struct {
	logger *slog.Logger // underlying structured logger; never nil after NewS3Logger
}
|
|
||
| func NewS3Logger(logger *slog.Logger) *S3Logger { | ||
| if logger == nil { | ||
| logger = slog.New(slog.NewJSONHandler(io.Discard, nil)) | ||
| } | ||
| return &S3Logger{logger: logger} | ||
| } | ||
|
|
||
| func (g *S3Logger) Infof(msg string, args ...any) { | ||
| g.logger.Info( | ||
| fmt.Sprintf(msg, args...), | ||
| "component", "database", | ||
| ) | ||
| } | ||
|
|
||
| func (g *S3Logger) Warningf(msg string, args ...any) { | ||
| g.logger.Warn( | ||
| fmt.Sprintf(msg, args...), | ||
| "component", "database", | ||
| ) | ||
| } | ||
|
|
||
| func (g *S3Logger) Debugf(msg string, args ...any) { | ||
| g.logger.Debug( | ||
| fmt.Sprintf(msg, args...), | ||
| "component", "database", | ||
| ) | ||
| } | ||
|
|
||
| func (g *S3Logger) Errorf(msg string, args ...any) { | ||
| g.logger.Error( | ||
| fmt.Sprintf(msg, args...), | ||
| "component", "database", | ||
| ) | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,36 @@ | ||
| // Copyright 2025 Blink Labs Software | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| package aws | ||
|
|
||
| import "github.com/prometheus/client_golang/prometheus" | ||
|
|
||
// s3MetricNamePrefix is prepended to every metric name this store
// registers.
const s3MetricNamePrefix = "database_blob_"
|
|
||
// registerBlobMetrics creates and registers the Prometheus counters
// for blob operations. Only called when promRegistry is non-nil.
//
// NOTE(review): the counters are registered and then the local
// variables are discarded — nothing in Get/Put ever increments them,
// so both metrics will report zero forever. They need to be stored as
// fields on BlobStoreS3 and incremented in Get/Put (one op per call,
// plus len(data)/len(value) bytes) to be meaningful. That fix spans
// the struct definition and the Get/Put methods and is deliberately
// not half-applied here.
func (d *BlobStoreS3) registerBlobMetrics() {
	opsTotal := prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: s3MetricNamePrefix + "ops_total",
			Help: "Total number of S3 blob operations",
		},
	)
	bytesTotal := prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: s3MetricNamePrefix + "bytes_total",
			Help: "Total bytes read/written for S3 blob operations",
		},
	)

	d.promRegistry.MustRegister(opsTotal, bytesTotal)
}
|
Comment on lines
+21
to
+36
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Critical: Metrics are created but never used or incremented. The counters Store the counters as fields and increment them in the relevant operations: +type BlobStoreS3 struct {
+ promRegistry prometheus.Registerer
+ startupCtx context.Context
+ logger *S3Logger
+ client *s3.Client
+ bucket string
+ prefix string
+ startupCancel context.CancelFunc
+ opsTotal prometheus.Counter
+ bytesTotal prometheus.Counter
+}
func (d *BlobStoreS3) registerBlobMetrics() {
- opsTotal := prometheus.NewCounter(
+ d.opsTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Name: s3MetricNamePrefix + "ops_total",
Help: "Total number of S3 blob operations",
},
)
- bytesTotal := prometheus.NewCounter(
+ d.bytesTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Name: s3MetricNamePrefix + "bytes_total",
Help: "Total bytes read/written for S3 blob operations",
},
)
- d.promRegistry.MustRegister(opsTotal, bytesTotal)
+ d.promRegistry.MustRegister(d.opsTotal, d.bytesTotal)
}Then increment in // In Get method after successful read:
if d.opsTotal != nil {
d.opsTotal.Inc()
d.bytesTotal.Add(float64(len(data)))
}
// In Put method after successful write:
if d.opsTotal != nil {
d.opsTotal.Inc()
d.bytesTotal.Add(float64(len(value)))
}🤖 Prompt for AI Agents |
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
Verify that the plugin registration is complete.
The plugin registration only sets
TypeandName, butPluginEntryincludes additional fields (NewFromOptionsFunc,Description,Options) that are not populated. Confirm whetherNewFromOptionsFuncis required for the plugin system to instantiate the S3 blob store, or if the current registration is sufficient.Run the following script to check how other blob plugins register themselves:
🏁 Script executed:
Length of output: 1592
Length of output: 1443
Length of output: 3763
Length of output: 1676
Length of output: 275
Plugin registration is incomplete and will cause runtime failures.
The
PluginEntryrequires aNewFromOptionsFuncfield that creates and returns aPlugininstance. Without it, callingGetPlugin(plugin.PluginTypeBlob, "s3")will invoke a nil function pointer and panic. All three blob plugin implementations (S3, Badger, GCS) have this same issue. Each blob plugin should define a factory function and register it.Example fix needed:
Note: The
pluginNameparameter indatabase/plugin/blob/store.go'sNew()function is also unused—the implementation hardcodesbadgerPlugin.New()regardless of the plugin name passed in.🤖 Prompt for AI Agents