From 3eedc41f4b2c439e44b41a3e7678de6cb5861f77 Mon Sep 17 00:00:00 2001 From: Huayu Ouyang Date: Thu, 16 Apr 2026 19:50:46 +0000 Subject: [PATCH 1/3] SERVER-119445 TSBS changes for failover dsi task --- cmd/tsbs_load_mongo/document_per_loader.go | 37 +++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/cmd/tsbs_load_mongo/document_per_loader.go b/cmd/tsbs_load_mongo/document_per_loader.go index ab6fc99b2..0a6654014 100644 --- a/cmd/tsbs_load_mongo/document_per_loader.go +++ b/cmd/tsbs_load_mongo/document_per_loader.go @@ -51,6 +51,35 @@ func (p *naiveProcessor) Init(_ int, doLoad, _ bool) { p.pvs = []interface{}{} } +func withRetry(ctx context.Context, maxAttempts int, op func(context.Context) error) error { + backoff := 100 * time.Millisecond + + for attempt := 1; ; attempt++ { + err := op(ctx) + if err == nil { + return nil + } + + // Stop on non‑retryable errors (optional but recommended) + //var le mongo.LabeledError + //if !errors.As(err, &le) || !le.HasErrorLabel("RetryableError") && !le.HasErrorLabel("RetryableWriteError") { + // return err + //} + + // Give up if context is done or attempts exhausted + if ctx.Err() != nil || (maxAttempts > 0 && attempt >= maxAttempts) { + if ctx.Err() != nil { + return ctx.Err() + } + return err + } + + // Simple exponential backoff + time.Sleep(backoff) + backoff *= 2 + } +} + // ProcessBatch creates a new document for each incoming event for a simpler // approach to storing the data. 
This is _NOT_ the default since the aggregation method // is recommended by Mongo and other blogs @@ -105,7 +134,13 @@ func (p *naiveProcessor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uin if doLoad { opts := options.InsertMany().SetOrdered(orderedInserts) - _, err := p.collection.InsertMany(context.Background(), p.pvs, opts) + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + //_, err := p.collection.InsertMany(ctx, p.pvs, opts) + err := withRetry(ctx, 0, func(c context.Context) error { + _, err := p.collection.InsertMany(ctx, p.pvs, opts) + return err + }) if err != nil { log.Fatalf("Bulk insert docs err: %s\n", err.Error()) } From 4faa469a2549c18f852e4524ff307835edf86d04 Mon Sep 17 00:00:00 2001 From: Huayu Ouyang Date: Thu, 30 Apr 2026 15:49:15 +0000 Subject: [PATCH 2/3] SERVER-119445 WIP to add dsi failover test --- cmd/tsbs_load_mongo/creator.go | 10 +- cmd/tsbs_load_mongo/document_per_loader.go | 433 +++++++++++++++++++-- cmd/tsbs_load_mongo/load_error.go | 38 ++ cmd/tsbs_load_mongo/main.go | 21 + pkg/targets/mongo/implemented_target.go | 3 + 5 files changed, 474 insertions(+), 31 deletions(-) create mode 100644 cmd/tsbs_load_mongo/load_error.go diff --git a/cmd/tsbs_load_mongo/creator.go b/cmd/tsbs_load_mongo/creator.go index 7b559f47b..dbde6364b 100644 --- a/cmd/tsbs_load_mongo/creator.go +++ b/cmd/tsbs_load_mongo/creator.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "strings" + "time" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" @@ -17,7 +18,14 @@ type dbCreator struct { func (d *dbCreator) Init() { var err error - opts := options.Client().ApplyURI(daemonURL).SetSocketTimeout(writeTimeout).SetRetryWrites(retryableWrites) + serverSelectionTimeout := 30 * time.Second + opts := options.Client().ApplyURI(daemonURL). + SetSocketTimeout(socketTimeout). + SetServerSelectionTimeout(serverSelectionTimeout). 
+ SetRetryWrites(retryableWrites) + + log.Printf("mongo client: retryWrites=%t socketTimeout=%s serverSelectionTimeout=%s writeTimeout=%s maxPerWriteRetryTime=%s deterministicIDs=%t", + retryableWrites, socketTimeout, serverSelectionTimeout, writeTimeout, maxPerWriteRetryTime, deterministicIDs) d.client, err = mongo.Connect(context.Background(), opts) if err != nil { log.Fatal(err) diff --git a/cmd/tsbs_load_mongo/document_per_loader.go b/cmd/tsbs_load_mongo/document_per_loader.go index 0a6654014..92fdd945b 100644 --- a/cmd/tsbs_load_mongo/document_per_loader.go +++ b/cmd/tsbs_load_mongo/document_per_loader.go @@ -2,11 +2,17 @@ package main import ( "context" + "crypto/sha256" + "encoding/binary" + "errors" + "fmt" "log" + "strings" "sync" "time" "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" @@ -37,6 +43,45 @@ type singlePoint map[string]interface{} var spPool = &sync.Pool{New: func() interface{} { return &singlePoint{} }} +// makeDeterministicID builds a 12-byte _id from the identity fields of a +// generated event: (host, measurement, timestamp_ns). The payload has the same +// length as a default Mongo ObjectID (12 bytes), but it is stored as BSON +// binary (subtype 0x00), so the _id type is not ObjectID. Inserts become +// idempotent: a retry after a partial success surfaces each already-persisted +// document as an E11000 dup-key, which remainingDocsAfterPartialFailure then +// silently filters out. +// +// All TSBS generators (devops, iot) produce events where (host, measurement, +// timestamp_ns) is unique by construction, so the collision risk of SHA-256 +// truncated to 96 bits is negligible for workloads of this size.
+func makeDeterministicID(host, measurement string, timestampNS int64) primitive.Binary { + var tsBuf [8]byte + binary.BigEndian.PutUint64(tsBuf[:], uint64(timestampNS)) + h := sha256.New() + h.Write([]byte(host)) + h.Write([]byte{0}) + h.Write([]byte(measurement)) + h.Write([]byte{0}) + h.Write(tsBuf[:]) + sum := h.Sum(nil) + return primitive.Binary{Subtype: 0x00, Data: sum[:12]} +} + +// extractIdentityTag returns the value of the identity tag for the event +// (hostname for devops, name for iot). Empty string if neither is present, +// which is survivable because the _id still encodes measurement+timestamp. +func extractIdentityTag(event *tsbsMongo.MongoPoint) string { + t := &tsbsMongo.MongoTag{} + for j := 0; j < event.TagsLength(); j++ { + event.Tags(t, j) + key := string(t.Key()) + if key == "hostname" || key == "name" { + return string(t.Value()) + } + } + return "" +} + type naiveProcessor struct { dbc *dbCreator collection *mongo.Collection @@ -51,39 +96,316 @@ func (p *naiveProcessor) Init(_ int, doLoad, _ bool) { p.pvs = []interface{}{} } -func withRetry(ctx context.Context, maxAttempts int, op func(context.Context) error) error { - backoff := 100 * time.Millisecond - for attempt := 1; ; attempt++ { - err := op(ctx) - if err == nil { - return nil - } +// retryableStepdownCodes lists MongoDB server error codes that signal a +// transient replica-set state change (primary failover, stepdown, shutdown, +// etc.) or other retryable server condition. These are the classic codes that +// appear when a primary steps down and a secondary is promoted mid-write. 
+// See: https://www.mongodb.com/docs/manual/core/retryable-writes/ +var retryableStepdownCodes = map[int]struct{}{ + 6: {}, // HostUnreachable + 7: {}, // HostNotFound + 89: {}, // NetworkTimeout + 91: {}, // ShutdownInProgress + 189: {}, // PrimarySteppedDown + 262: {}, // ExceededTimeLimit + 9001: {}, // SocketException + 10107: {}, // NotWritablePrimary (formerly NotMaster) + 11600: {}, // InterruptedAtShutdown + 11602: {}, // InterruptedDueToReplStateChange + 13435: {}, // NotPrimaryNoSecondaryOk + 13436: {}, // NotPrimaryOrSecondary + 63: {}, // StaleShardVersion + 150: {}, // StaleEpoch + 13388: {}, // StaleConfig + 234: {}, // RetryChangeStream + 133: {}, // FailedToSatisfyReadPreference +} + +// isRetryable reports whether err is a transient error that can plausibly +// succeed on a later attempt, e.g. a network blip or a replica-set failover. +// Non-retryable errors (validation, auth, duplicate key, etc.) return false +// so the caller fails fast rather than burning the entire retry budget. +func isRetryable(err error) bool { + if err == nil { + return false + } + // Respect caller cancellation / deadline explicitly. + if errors.Is(err, context.Canceled) { + return false + } + // A recovered driver panic is only retryable when (a) we've opted into + // deterministic _ids (so retries are idempotent) AND (b) the panic matches + // the specific known result[:idIndex] bug signature. + if pe, ok := err.(*driverPanicError); ok { + if deterministicIDs && isKnownInsertManyResultSlicePanic(pe.value) { + return true + } + return false + } + // context.DeadlineExceeded on the *per-attempt* context is retryable; + // on the *parent* context the caller will bail out via ctx.Err() anyway. + if errors.Is(err, context.DeadlineExceeded) { + return true + } + // Driver-level helpers: network errors and driver timeouts are retryable. 
+ if mongo.IsNetworkError(err) || mongo.IsTimeout(err) { + return true + } + // Server-selection errors surface during a failover window when the driver + // couldn't find a primary within serverSelectionTimeout. The replica set + // will have a primary again shortly; the outer maxPerWriteRetryTime budget + // bounds how long we keep retrying. + if strings.Contains(err.Error(), "server selection error") { + return true + } + // Duplicate key is never retryable on its own (and on a retry after a + // partially-succeeded batch we handle this explicitly via + // BulkWriteException inspection below). + if mongo.IsDuplicateKeyError(err) { + return false + } + + // The driver attaches a "RetryableWriteError" label to any server error + // it considers retryable. + type labeled interface { + HasErrorLabel(string) bool + } + if le, ok := err.(labeled); ok && le.HasErrorLabel("RetryableWriteError") { + return true + } + + switch e := err.(type) { + case mongo.CommandError: + if _, ok := retryableStepdownCodes[int(e.Code)]; ok { + return true + } + case mongo.WriteException: + if e.WriteConcernError != nil { + if _, ok := retryableStepdownCodes[e.WriteConcernError.Code]; ok { + return true + } + } + for _, we := range e.WriteErrors { + if _, ok := retryableStepdownCodes[we.Code]; ok { + return true + } + } + case mongo.BulkWriteException: + if e.WriteConcernError != nil { + if _, ok := retryableStepdownCodes[e.WriteConcernError.Code]; ok { + return true + } + } + for _, we := range e.WriteErrors { + if _, ok := retryableStepdownCodes[we.WriteError.Code]; ok { + return true + } + } + } + + return false +} + +// remainingDocsAfterPartialFailure implements the partial-success recovery +// logic for InsertMany. The MongoDB wire protocol never tells us explicitly +// which documents from the input slice were persisted, so we rely on the +// documented server behaviour: +// +// - With ordered=true, the server stops at the first WriteError. 
All docs +// at indices < firstErrIdx were persisted; the doc at firstErrIdx failed; +// docs at indices > firstErrIdx were never attempted. +// - With ordered=false, the server attempts every doc. Every index listed +// in WriteErrors failed; every other index was persisted. +// +// In both cases we return only the documents that still need to be (re)tried, +// and we drop docs whose per-write error is non-retryable (e.g. duplicate key +// from a prior partially-successful attempt). +func remainingDocsAfterPartialFailure(docs []interface{}, bwe mongo.BulkWriteException, ordered bool) []interface{} { + if len(bwe.WriteErrors) == 0 { + return docs + } + failedIdx := make(map[int]mongo.WriteError, len(bwe.WriteErrors)) + minIdx := len(docs) + for _, we := range bwe.WriteErrors { + failedIdx[we.Index] = we.WriteError + if we.Index < minIdx { + minIdx = we.Index + } + } + + isNonRetryableWriteErr := func(we mongo.WriteError) bool { + // Duplicate keys almost always mean the doc was already inserted on a + // prior attempt - don't keep retrying it. + if we.Code == 11000 || we.Code == 11001 || we.Code == 12582 { + return true + } + // If the per-write code isn't in our retryable set, treat as terminal. + _, ok := retryableStepdownCodes[we.Code] + return !ok + } + + var remaining []interface{} + if ordered { + // Everything before the first failure is persisted; skip it. + // The doc at minIdx failed - include it only if it's retryable. + if we, ok := failedIdx[minIdx]; ok && !isNonRetryableWriteErr(we) { + remaining = append(remaining, docs[minIdx]) + } + // Everything after the first failure was never attempted; retry all. + if minIdx+1 < len(docs) { + remaining = append(remaining, docs[minIdx+1:]...) + } + } else { + for i, d := range docs { + we, failed := failedIdx[i] + if !failed { + // Persisted on this attempt; do not retry. 
+ continue + } + if isNonRetryableWriteErr(we) { + continue + } + remaining = append(remaining, d) + } + } + return remaining +} + +// driverPanicError wraps a recovered panic from inside the mongo driver so +// callers (and isRetryable) can reason about it as a typed error. +type driverPanicError struct { + value interface{} +} + +func (e *driverPanicError) Error() string { + return fmt.Sprintf("panic in mongo InsertMany: %v", e.value) +} + +// isKnownInsertManyResultSlicePanic reports whether the recovered panic +// matches the signature of the mongo-go-driver issue in +// (*Collection).insert's WriteCommandError cleanup path, where +// `result = result[:idIndex]` (or the sibling `append(result[:idIndex], ...)`) +// can go out of range when server-reported WriteErrors indices don't line up +// with the post-retry result slice. This panic is still present in every +// released v1.x and v2.x driver version; recovering and retrying it is only +// safe when writes are idempotent (deterministic _ids) - otherwise retrying +// risks silent duplicates because some docs in the batch may already have +// been persisted server-side. +// +// We match on the runtime panic message specifically so that unrelated +// panics (nil-pointer in user code, OOM, etc.) are not treated as retryable. +func isKnownInsertManyResultSlicePanic(v interface{}) bool { + msg := fmt.Sprintf("%v", v) + if !strings.Contains(msg, "slice bounds out of range") { + return false + } + // Only the result[:idIndex] / append(result[:idIndex], ...) form uses the + // `[:N]` shape; other out-of-range panics in the same family (e.g. + // `[N:M]`) are not this bug and shouldn't be swept in. + return strings.Contains(msg, "[:-") || strings.Contains(msg, "[:") +} + +// safeInsertMany wraps (*mongo.Collection).InsertMany so that any panic +// raised inside the driver is converted into a regular error. 
The returned +// error is a *driverPanicError when a panic was recovered, so callers can +// distinguish driver panics from ordinary server-returned errors. +func safeInsertMany(ctx context.Context, coll *mongo.Collection, docs []interface{}, opts *options.InsertManyOptions) (err error) { + defer func() { + if r := recover(); r != nil { + err = &driverPanicError{value: r} + } + }() + _, err = coll.InsertMany(ctx, docs, opts) + return err +} + +// insertManyWithRetry runs InsertMany with retry semantics tailored for a +// replica-set failover. It: +// - bounds each attempt by attemptTimeout so a hung call cannot eat the +// total budget, +// - uses exponential backoff (starting at 100ms, capped at 5s) between +// attempts, +// - gives up once the parent context is done, +// - only retries errors classified as transient by isRetryable, and +// - on BulkWriteException inspects the per-document WriteErrors and retries +// only the documents that still need to be inserted, respecting the +// ordered/unordered semantics. +func insertManyWithRetry( + ctx context.Context, + coll *mongo.Collection, + docs []interface{}, + opts *options.InsertManyOptions, + attemptTimeout time.Duration, +) error { + ordered := true + if opts != nil && opts.Ordered != nil { + ordered = *opts.Ordered + } + + backoff := 100 * time.Millisecond + const maxBackoff = 5 * time.Second + + pending := docs + for attempt := 1; ; attempt++ { + if len(pending) == 0 { + return nil + } + attemptCtx, cancel := context.WithTimeout(ctx, attemptTimeout) + err := safeInsertMany(attemptCtx, coll, pending, opts) + cancel() + if err == nil { + return nil + } + + // If the error is a partial-failure BulkWriteException, try to narrow + // `pending` down to the docs that still need to be inserted before + // deciding whether to retry. This way a single dup-key among 1000 + // otherwise-successful docs on a post-failover retry doesn't torpedo + // the whole batch. 
+ if bwe, ok := err.(mongo.BulkWriteException); ok { + pending = remainingDocsAfterPartialFailure(pending, bwe, ordered) + if len(pending) == 0 { + // Every remaining doc either succeeded or failed with a + // non-retryable error that we're explicitly choosing to drop. + return nil + } + } - // Stop on non‑retryable errors (optional but recommended) - //var le mongo.LabeledError - //if !errors.As(err, &le) || !le.HasErrorLabel("RetryableError") && !le.HasErrorLabel("RetryableWriteError") { - // return err - //} + if !isRetryable(err) { + return err + } + if ctx.Err() != nil { + return ctx.Err() + } - // Give up if context is done or attempts exhausted - if ctx.Err() != nil || (maxAttempts > 0 && attempt >= maxAttempts) { - if ctx.Err() != nil { - return ctx.Err() - } - return err - } + log.Printf("Transient mongo write error (attempt %d, %d docs pending): %s", attempt, len(pending), err.Error()) - // Simple exponential backoff - time.Sleep(backoff) - backoff *= 2 - } + select { + case <-time.After(backoff): + case <-ctx.Done(): + return ctx.Err() + } + if backoff < maxBackoff { + backoff *= 2 + if backoff > maxBackoff { + backoff = maxBackoff + } + } + } } // ProcessBatch creates a new document for each incoming event for a simpler // approach to storing the data. This is _NOT_ the default since the aggregation method // is recommended by Mongo and other blogs func (p *naiveProcessor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uint64) { + // If a prior batch had a terminal write error, short-circuit so workers drain fast + // and RunBenchmark can reach postRun/summary. We still consume the batch so + // the scanner's flow-control doesn't deadlock. 
+ // if loadFailed() { + // return 0, 0 + // } + batch := b.(*batch).arr if cap(p.pvs) < len(batch) { p.pvs = make([]interface{}, len(batch)) @@ -107,12 +429,26 @@ func (p *naiveProcessor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uin event.Tags(t, j) (*x)["tags"].(map[string]string)[string(t.Key())] = string(t.Value()) } + if deterministicIDs { + (*x)["_id"] = makeDeterministicID( + extractIdentityTag(event), + string(event.MeasurementName()), + event.Timestamp(), + ) + } p.pvs[i] = x metricCnt += uint64(event.FieldsLength()) } } else { for i, event := range batch { x := bson.D{} + if deterministicIDs { + x = append(x, bson.E{"_id", makeDeterministicID( + extractIdentityTag(event), + string(event.MeasurementName()), + event.Timestamp(), + )}) + } x = append(x, bson.E{"measurement", string(event.MeasurementName())}) x = append(x, bson.E{timestampField, time.Unix(0, event.Timestamp())}) f := &tsbsMongo.MongoReading{} @@ -134,15 +470,52 @@ func (p *naiveProcessor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uin if doLoad { opts := options.InsertMany().SetOrdered(orderedInserts) - ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel() - //_, err := p.collection.InsertMany(ctx, p.pvs, opts) - err := withRetry(ctx, 0, func(c context.Context) error { - _, err := p.collection.InsertMany(ctx, p.pvs, opts) - return err - }) + // Each individual attempt is bounded by the configurable TSBS parameter writeTimeout (30s fallback if it is unset or non-positive). + attemptTimeout := writeTimeout + if attemptTimeout <= 0 { + attemptTimeout = 30 * time.Second + } + + // If maxPerWriteRetryTime is set, it bounds the total time spent retrying. Otherwise a single attempt is made, bounded by attemptTimeout.
+ var ( + ctx = context.Background() + cancel context.CancelFunc + ) + if maxPerWriteRetryTime > 0 { + ctx, cancel = context.WithTimeout(ctx, maxPerWriteRetryTime) + } else { + ctx, cancel = context.WithTimeout(ctx, attemptTimeout) + } + var err error + if maxPerWriteRetryTime > 0 { + err = insertManyWithRetry(ctx, p.collection, p.pvs, opts, attemptTimeout) + } else { + err = safeInsertMany(ctx, p.collection, p.pvs, opts) + } + cancel() if err != nil { + log.Fatalf("Bulk insert docs err: %s\n", err.Error()) + + // Disabled alternative: record the error instead of exiting the process mid-workload. + // if maxPerWriteRetryTime > 0 { + // // Record the failure and stop counting metrics for this batch, + // // but do NOT kill the process: RunBenchmark must still reach + // // postRun so the "N workers (mean rate X metrics/sec)" summary + // // line is printed. main() will exit non-zero after that. + // log.Printf("Bulk insert docs err: %s\n", err.Error()) + // setLoadFailed(err) + // for _, pv := range p.pvs { + // spPool.Put(pv) + // } + // return 0, 0 + + // } else { + // log.Fatalf("Bulk insert docs err: %s\n", err.Error()) + // } + + + } } for _, p := range p.pvs { diff --git a/cmd/tsbs_load_mongo/load_error.go b/cmd/tsbs_load_mongo/load_error.go new file mode 100644 index 000000000..103de5447 --- /dev/null +++ b/cmd/tsbs_load_mongo/load_error.go @@ -0,0 +1,38 @@ +package main + +import ( + "sync" + "sync/atomic" +) + +// loadErrState tracks whether any worker has hit a terminal write error. +// We deliberately avoid log.Fatalf from inside worker goroutines: that calls +// os.Exit immediately and prevents CommonBenchmarkRunner.postRun from printing +// the summary line ("N workers (mean rate X metrics/sec)") that downstream +// result parsers (e.g. DSI's TsbsLoadResultParser) rely on. Instead, workers +// record the first error here, return fast so RunBenchmark can shut down +// cleanly, and main() exits non-zero once the summary has been written.
+var loadErrState struct { + flag uint32 + mu sync.Mutex + err error +} + +func setLoadFailed(err error) { + loadErrState.mu.Lock() + if loadErrState.err == nil { + loadErrState.err = err + } + loadErrState.mu.Unlock() + atomic.StoreUint32(&loadErrState.flag, 1) +} + +func loadFailed() bool { + return atomic.LoadUint32(&loadErrState.flag) == 1 +} + +func loadError() error { + loadErrState.mu.Lock() + defer loadErrState.mu.Unlock() + return loadErrState.err +} diff --git a/cmd/tsbs_load_mongo/main.go b/cmd/tsbs_load_mongo/main.go index 6d184ace7..9f048c26f 100644 --- a/cmd/tsbs_load_mongo/main.go +++ b/cmd/tsbs_load_mongo/main.go @@ -6,6 +6,7 @@ package main import ( "fmt" "log" + "os" "time" "github.com/blagojts/viper" @@ -31,8 +32,11 @@ var ( daemonURL string documentPer bool writeTimeout time.Duration + socketTimeout time.Duration + maxPerWriteRetryTime time.Duration timeseriesCollection bool retryableWrites bool + deterministicIDs bool orderedInserts bool randomFieldOrder bool batchMetaFields bool @@ -72,9 +76,12 @@ func init() { daemonURL = viper.GetString("url") writeTimeout = viper.GetDuration("write-timeout") + socketTimeout = viper.GetDuration("socket-timeout") + maxPerWriteRetryTime = viper.GetDuration("max-per-write-retry-time") documentPer = viper.GetBool("document-per-event") timeseriesCollection = viper.GetBool("timeseries-collection") retryableWrites = viper.GetBool("retryable-writes") + deterministicIDs = viper.GetBool("deterministic-ids") orderedInserts = viper.GetBool("ordered-inserts") randomFieldOrder = viper.GetBool("random-field-order") batchMetaFields = viper.GetBool("batch-meta-fields") @@ -101,6 +108,10 @@ func init() { log.Fatal("Must set document-per-event=true in order to use timeseries-collection=true") } + if !documentPer && deterministicIDs { + log.Fatal("deterministic-ids is only supported with document-per-event=true") + } + if !timeseriesCollection && batchMetaFields { log.Fatal("Must set document-per-event=true and 
timeseries-collection=true in order to use batch-meta-fields=true") } @@ -125,4 +136,14 @@ func main() { } loader.RunBenchmark(benchmark) + + // Exit-code handling for the non-fatal error-reporting workaround. + + // If any worker recorded a terminal write error, exit non-zero now that the summary line has + // already been printed. This keeps downstream result parsers happy while still + // surfacing the failure to callers that inspect the exit code. + if err := loadError(); err != nil { + log.Printf("tsbs_load_mongo completed with load errors: %s", err.Error()) + os.Exit(3) + } } diff --git a/pkg/targets/mongo/implemented_target.go b/pkg/targets/mongo/implemented_target.go index 44f62913a..7c997550c 100644 --- a/pkg/targets/mongo/implemented_target.go +++ b/pkg/targets/mongo/implemented_target.go @@ -21,9 +21,12 @@ type mongoTarget struct { func (t *mongoTarget) TargetSpecificFlags(flagPrefix string, flagSet *pflag.FlagSet) { flagSet.String(flagPrefix+"url", "mongodb://localhost:27017/", "Mongo URL.") flagSet.Duration(flagPrefix+"write-timeout", 10*time.Second, "Write timeout.") + flagSet.Duration(flagPrefix+"socket-timeout", 10*time.Second, "Mongo client SocketTimeout. Bounds how long the driver waits on a single socket read/write before giving up.") flagSet.Bool(flagPrefix+"document-per-event", false, "Whether to use one document per event or aggregate by hour") flagSet.Bool(flagPrefix+"timeseries-collection", false, "Whether to use a time-series collection") flagSet.Bool(flagPrefix+"retryable-writes", true, "Whether to use retryable writes") + flagSet.Duration(flagPrefix+"max-per-write-retry-time", 0, "Total wall-clock time to keep retrying a failed write (e.g. across a replica-set failover) before giving up, as a Go duration (e.g. 30s, 5m). Set to 0 to disable retries.") + flagSet.Bool(flagPrefix+"deterministic-ids", false, "Assign a deterministic _id (SHA256-prefix of host+measurement+timestamp_ns) to each document so writes are idempotent across retries.
Enables recovery from driver panics and partial-batch failures during replica-set failovers, at the cost of added client-side CPU and changed primary-key index shape vs. the default server-generated ObjectID. Only applies with document-per-event=true.") flagSet.Bool(flagPrefix+"ordered-inserts", true, "Whether to use ordered inserts") flagSet.Bool(flagPrefix+"random-field-order", true, "Whether to use random field order") flagSet.Bool(flagPrefix+"batch-meta-fields", true, "Whether to use ensure batches of data have the same meta field") From a899ca897b0f784db10d40992916918dbae29e46 Mon Sep 17 00:00:00 2001 From: Huayu Ouyang Date: Thu, 14 May 2026 17:39:54 +0000 Subject: [PATCH 3/3] make serverSelectionTimeout configurable --- cmd/tsbs_load_mongo/creator.go | 2 -- cmd/tsbs_load_mongo/main.go | 12 +++++++----- pkg/targets/mongo/implemented_target.go | 1 + 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/cmd/tsbs_load_mongo/creator.go b/cmd/tsbs_load_mongo/creator.go index dbde6364b..032528b75 100644 --- a/cmd/tsbs_load_mongo/creator.go +++ b/cmd/tsbs_load_mongo/creator.go @@ -5,7 +5,6 @@ import ( "fmt" "log" "strings" - "time" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" @@ -18,7 +17,6 @@ type dbCreator struct { func (d *dbCreator) Init() { var err error - serverSelectionTimeout := 30 * time.Second opts := options.Client().ApplyURI(daemonURL). SetSocketTimeout(socketTimeout). SetServerSelectionTimeout(serverSelectionTimeout). 
diff --git a/cmd/tsbs_load_mongo/main.go b/cmd/tsbs_load_mongo/main.go index 9f048c26f..f0a6f6701 100644 --- a/cmd/tsbs_load_mongo/main.go +++ b/cmd/tsbs_load_mongo/main.go @@ -29,11 +29,12 @@ const ( // Program option vars: var ( - daemonURL string - documentPer bool - writeTimeout time.Duration - socketTimeout time.Duration - maxPerWriteRetryTime time.Duration + daemonURL string + documentPer bool + writeTimeout time.Duration + socketTimeout time.Duration + serverSelectionTimeout time.Duration + maxPerWriteRetryTime time.Duration timeseriesCollection bool retryableWrites bool deterministicIDs bool @@ -77,6 +78,7 @@ func init() { daemonURL = viper.GetString("url") writeTimeout = viper.GetDuration("write-timeout") socketTimeout = viper.GetDuration("socket-timeout") + serverSelectionTimeout = viper.GetDuration("server-selection-timeout") maxPerWriteRetryTime = viper.GetDuration("max-per-write-retry-time") documentPer = viper.GetBool("document-per-event") timeseriesCollection = viper.GetBool("timeseries-collection") diff --git a/pkg/targets/mongo/implemented_target.go b/pkg/targets/mongo/implemented_target.go index 7c997550c..573130333 100644 --- a/pkg/targets/mongo/implemented_target.go +++ b/pkg/targets/mongo/implemented_target.go @@ -22,6 +22,7 @@ func (t *mongoTarget) TargetSpecificFlags(flagPrefix string, flagSet *pflag.Flag flagSet.String(flagPrefix+"url", "mongodb://localhost:27017/", "Mongo URL.") flagSet.Duration(flagPrefix+"write-timeout", 10*time.Second, "Write timeout.") flagSet.Duration(flagPrefix+"socket-timeout", 10*time.Second, "Mongo client SocketTimeout. Bounds how long the driver waits on a single socket read/write before giving up.") + flagSet.Duration(flagPrefix+"server-selection-timeout", 90*time.Second, "Mongo client ServerSelectionTimeout. How long the driver waits to find a suitable server (e.g. a primary) before returning an error. 
Must be >= expected replica-set election time.") flagSet.Bool(flagPrefix+"document-per-event", false, "Whether to use one document per event or aggregate by hour") flagSet.Bool(flagPrefix+"timeseries-collection", false, "Whether to use a time-series collection") flagSet.Bool(flagPrefix+"retryable-writes", true, "Whether to use retryable writes")