Skip to content

Commit 70a3bd2

Browse files
committed
godev: cleanup storage bucket api
Factored out the boilerplate to interact with the storage buckets. Refactored the interface to more closely match the Cloud Storage API. Change-Id: I8c9bb9ba271cae9aa0fd15e24bd127d7523e3766 Reviewed-on: https://go-review.googlesource.com/c/telemetry/+/524377 TryBot-Result: Gopher Robot <[email protected]> Reviewed-by: Hyang-Ah Hana Kim <[email protected]> Run-TryBot: Jamal Carvalho <[email protected]>
1 parent bcc0643 commit 70a3bd2

File tree

5 files changed

+179
-225
lines changed

5 files changed

+179
-225
lines changed

godev/cmd/telemetrygodev/main.go

+9-61
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func main() {
3838
flag.Parse()
3939
ctx := context.Background()
4040
cfg := config.NewConfig()
41-
buckets, err := buckets(ctx, cfg)
41+
buckets, err := storage.NewAPI(ctx, cfg)
4242
if err != nil {
4343
log.Fatal(err)
4444
}
@@ -77,7 +77,7 @@ type indexPage struct {
7777
Reports []*link
7878
}
7979

80-
func handleRoot(fsys fs.FS, ucfg *tconfig.Config, buckets *stores) content.HandlerFunc {
80+
func handleRoot(fsys fs.FS, ucfg *tconfig.Config, buckets *storage.API) content.HandlerFunc {
8181
cserv := content.Server(fsys)
8282
return func(w http.ResponseWriter, r *http.Request) error {
8383
if r.URL.Path != "/" {
@@ -87,10 +87,7 @@ func handleRoot(fsys fs.FS, ucfg *tconfig.Config, buckets *stores) content.Handl
8787
page := indexPage{}
8888

8989
ctx := r.Context()
90-
it, err := buckets.chart.List(ctx, "")
91-
if err != nil {
92-
return err
93-
}
90+
it := buckets.Chart.Objects(ctx, "")
9491
for {
9592
obj, err := it.Next()
9693
if errors.Is(err, storage.ErrObjectIteratorDone) {
@@ -99,18 +96,15 @@ func handleRoot(fsys fs.FS, ucfg *tconfig.Config, buckets *stores) content.Handl
9996
date := strings.TrimSuffix(obj, ".json")
10097
page.Charts = append(page.Charts, &link{Text: date, URL: "/charts/" + date})
10198
}
102-
it, err = buckets.merge.List(ctx, "")
103-
if err != nil {
104-
return err
105-
}
99+
it = buckets.Merge.Objects(ctx, "")
106100
for {
107101
obj, err := it.Next()
108102
if errors.Is(err, storage.ErrObjectIteratorDone) {
109103
break
110104
}
111105
page.Reports = append(page.Reports, &link{
112106
Text: strings.TrimSuffix(obj, ".json"),
113-
URL: buckets.merge.Location() + "/" + obj,
107+
URL: buckets.Merge.URI() + "/" + obj,
114108
})
115109
}
116110
return content.Template(w, fsys, "index.html", page, http.StatusOK)
@@ -121,11 +115,11 @@ type chartPage struct {
121115
Charts map[string]any
122116
}
123117

124-
func handleChart(fsys fs.FS, ucfg *tconfig.Config, buckets *stores) content.HandlerFunc {
118+
func handleChart(fsys fs.FS, ucfg *tconfig.Config, buckets *storage.API) content.HandlerFunc {
125119
return func(w http.ResponseWriter, r *http.Request) error {
126120
ctx := r.Context()
127121
p := strings.TrimPrefix(path.Clean(r.URL.Path), "/charts/")
128-
reader, err := buckets.chart.Reader(ctx, p+".json")
122+
reader, err := buckets.Chart.Object(p + ".json").NewReader(ctx)
129123
if errors.Is(err, storage.ErrObjectNotExist) {
130124
return content.Status(w, http.StatusNotFound)
131125
} else if err != nil {
@@ -147,7 +141,7 @@ func handleChart(fsys fs.FS, ucfg *tconfig.Config, buckets *stores) content.Hand
147141
}
148142
}
149143

150-
func handleUpload(ucfg *tconfig.Config, buckets *stores) content.HandlerFunc {
144+
func handleUpload(ucfg *tconfig.Config, buckets *storage.API) content.HandlerFunc {
151145
return func(w http.ResponseWriter, r *http.Request) error {
152146
if r.Method == "POST" {
153147
var report telemetry.Report
@@ -160,7 +154,7 @@ func handleUpload(ucfg *tconfig.Config, buckets *stores) content.HandlerFunc {
160154
// TODO: capture metrics for collisions.
161155
ctx := r.Context()
162156
name := fmt.Sprintf("%s/%g.json", report.Week, report.X)
163-
f, err := buckets.upload.Writer(ctx, name)
157+
f, err := buckets.Upload.Object(name).NewWriter(ctx)
164158
if err != nil {
165159
return err
166160
}
@@ -224,49 +218,3 @@ func fsys(fromOS bool) fs.FS {
224218
}
225219
return f
226220
}
227-
228-
type stores struct {
229-
upload storage.Store
230-
merge storage.Store
231-
chart storage.Store
232-
}
233-
234-
func buckets(ctx context.Context, cfg *config.Config) (*stores, error) {
235-
if cfg.UseGCS && !cfg.OnCloudRun() {
236-
if err := os.Setenv("STORAGE_EMULATOR_HOST", cfg.StorageEmulatorHost); err != nil {
237-
return nil, err
238-
}
239-
}
240-
var upload storage.Store
241-
var merge storage.Store
242-
var chart storage.Store
243-
var err error
244-
if cfg.UseGCS {
245-
upload, err = storage.NewGCStore(ctx, cfg.ProjectID, cfg.UploadBucket)
246-
if err != nil {
247-
return nil, err
248-
}
249-
merge, err = storage.NewGCStore(ctx, cfg.ProjectID, cfg.MergedBucket)
250-
if err != nil {
251-
return nil, err
252-
}
253-
chart, err = storage.NewGCStore(ctx, cfg.ProjectID, cfg.ChartDataBucket)
254-
if err != nil {
255-
return nil, err
256-
}
257-
} else {
258-
upload, err = storage.NewFSStore(ctx, cfg.LocalStorage, cfg.UploadBucket)
259-
if err != nil {
260-
return nil, err
261-
}
262-
merge, err = storage.NewFSStore(ctx, cfg.LocalStorage, cfg.MergedBucket)
263-
if err != nil {
264-
return nil, err
265-
}
266-
chart, err = storage.NewFSStore(ctx, cfg.LocalStorage, cfg.ChartDataBucket)
267-
if err != nil {
268-
return nil, err
269-
}
270-
}
271-
return &stores{upload, merge, chart}, nil
272-
}

godev/cmd/worker/main.go

+10-59
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func main() {
3535
flag.Parse()
3636
ctx := context.Background()
3737
cfg := config.NewConfig()
38-
buckets, err := buckets(ctx, cfg)
38+
buckets, err := storage.NewAPI(ctx, cfg)
3939
if err != nil {
4040
log.Fatal(err)
4141
}
@@ -67,18 +67,15 @@ func main() {
6767
}
6868

6969
// TODO: monitor duration and processed data volume.
70-
func handleMerge(cfg *tconfig.Config, s *stores) content.HandlerFunc {
70+
func handleMerge(cfg *tconfig.Config, s *storage.API) content.HandlerFunc {
7171
return func(w http.ResponseWriter, r *http.Request) error {
7272
ctx := r.Context()
7373
date := r.URL.Query().Get("date")
7474
if _, err := time.Parse("2006-01-02", date); err != nil {
7575
return content.Error(err, http.StatusBadRequest)
7676
}
77-
it, err := s.upload.List(ctx, date)
78-
if err != nil {
79-
return err
80-
}
81-
mergeWriter, err := s.merge.Writer(ctx, date+".json")
77+
it := s.Upload.Objects(ctx, date)
78+
mergeWriter, err := s.Merge.Object(date + ".json").NewWriter(ctx)
8279
if err != nil {
8380
return err
8481
}
@@ -94,7 +91,7 @@ func handleMerge(cfg *tconfig.Config, s *stores) content.HandlerFunc {
9491
return err
9592
}
9693
count++
97-
reader, err := s.upload.Reader(ctx, obj)
94+
reader, err := s.Upload.Object(obj).NewReader(ctx)
9895
if err != nil {
9996
return err
10097
}
@@ -113,20 +110,20 @@ func handleMerge(cfg *tconfig.Config, s *stores) content.HandlerFunc {
113110
if err := mergeWriter.Close(); err != nil {
114111
return err
115112
}
116-
msg := fmt.Sprintf("merged %d reports into %s/%s", count, s.merge.Location(), date)
113+
msg := fmt.Sprintf("merged %d reports into %s/%s", count, s.Merge.URI(), date)
117114
return content.Text(w, msg, http.StatusOK)
118115
}
119116
}
120117

121-
func handleChart(cfg *tconfig.Config, s *stores) content.HandlerFunc {
118+
func handleChart(cfg *tconfig.Config, s *storage.API) content.HandlerFunc {
122119
return func(w http.ResponseWriter, r *http.Request) error {
123120
ctx := r.Context()
124121
// TODO: use start date and end date to create a timeseries of data.
125122
date := r.URL.Query().Get("date")
126123
if _, err := time.Parse("2006-01-02", date); err != nil {
127124
return content.Error(err, http.StatusBadRequest)
128125
}
129-
in, err := s.merge.Reader(ctx, date+".json")
126+
in, err := s.Merge.Object(date + ".json").NewReader(ctx)
130127
if errors.Is(err, storage.ErrObjectNotExist) {
131128
return content.Error(err, http.StatusNotFound)
132129
}
@@ -153,7 +150,7 @@ func handleChart(cfg *tconfig.Config, s *stores) content.HandlerFunc {
153150
data := nest(reports)
154151
charts := charts(cfg, date, data, xs)
155152
obj := fmt.Sprintf("%s.json", date)
156-
out, err := s.chart.Writer(ctx, obj)
153+
out, err := s.Chart.Object(obj).NewWriter(ctx)
157154
if err != nil {
158155
return err
159156
}
@@ -166,7 +163,7 @@ func handleChart(cfg *tconfig.Config, s *stores) content.HandlerFunc {
166163
return err
167164
}
168165

169-
msg := fmt.Sprintf("processed %d reports into %s", len(reports), s.chart.Location()+"/"+obj)
166+
msg := fmt.Sprintf("processed %d reports into %s", len(reports), s.Chart.URI()+"/"+obj)
170167
return content.Text(w, msg, http.StatusOK)
171168
}
172169
}
@@ -459,49 +456,3 @@ func fsys(fromOS bool) fs.FS {
459456
}
460457
return f
461458
}
462-
463-
type stores struct {
464-
upload storage.Store
465-
merge storage.Store
466-
chart storage.Store
467-
}
468-
469-
func buckets(ctx context.Context, cfg *config.Config) (*stores, error) {
470-
if cfg.UseGCS && !cfg.OnCloudRun() {
471-
if err := os.Setenv("STORAGE_EMULATOR_HOST", cfg.StorageEmulatorHost); err != nil {
472-
return nil, err
473-
}
474-
}
475-
var upload storage.Store
476-
var merge storage.Store
477-
var chart storage.Store
478-
var err error
479-
if cfg.UseGCS {
480-
upload, err = storage.NewGCStore(ctx, cfg.ProjectID, cfg.UploadBucket)
481-
if err != nil {
482-
return nil, err
483-
}
484-
merge, err = storage.NewGCStore(ctx, cfg.ProjectID, cfg.MergedBucket)
485-
if err != nil {
486-
return nil, err
487-
}
488-
chart, err = storage.NewGCStore(ctx, cfg.ProjectID, cfg.ChartDataBucket)
489-
if err != nil {
490-
return nil, err
491-
}
492-
} else {
493-
upload, err = storage.NewFSStore(ctx, cfg.LocalStorage, cfg.UploadBucket)
494-
if err != nil {
495-
return nil, err
496-
}
497-
merge, err = storage.NewFSStore(ctx, cfg.LocalStorage, cfg.MergedBucket)
498-
if err != nil {
499-
return nil, err
500-
}
501-
chart, err = storage.NewFSStore(ctx, cfg.LocalStorage, cfg.ChartDataBucket)
502-
if err != nil {
503-
return nil, err
504-
}
505-
}
506-
return &stores{upload, merge, chart}, nil
507-
}

godev/internal/storage/api.go

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package storage
2+
3+
import (
4+
"context"
5+
"os"
6+
7+
"golang.org/x/telemetry/godev/internal/config"
8+
)
9+
10+
type API struct {
11+
Upload BucketHandle
12+
Merge BucketHandle
13+
Chart BucketHandle
14+
}
15+
16+
func NewAPI(ctx context.Context, cfg *config.Config) (*API, error) {
17+
if cfg.UseGCS && !cfg.OnCloudRun() {
18+
if err := os.Setenv("STORAGE_EMULATOR_HOST", cfg.StorageEmulatorHost); err != nil {
19+
return nil, err
20+
}
21+
}
22+
upload, err := newBucket(ctx, cfg, cfg.UploadBucket)
23+
if err != nil {
24+
return nil, err
25+
}
26+
merge, err := newBucket(ctx, cfg, cfg.MergedBucket)
27+
if err != nil {
28+
return nil, err
29+
}
30+
chart, err := newBucket(ctx, cfg, cfg.ChartDataBucket)
31+
if err != nil {
32+
return nil, err
33+
}
34+
return &API{upload, merge, chart}, nil
35+
}
36+
37+
func newBucket(ctx context.Context, cfg *config.Config, name string) (BucketHandle, error) {
38+
if cfg.UseGCS {
39+
return NewGCSBucket(ctx, cfg.ProjectID, name)
40+
}
41+
return NewFSBucket(ctx, cfg.LocalStorage, name)
42+
}

0 commit comments

Comments
 (0)