Skip to content

Conversation

@kosiew
Copy link
Contributor

@kosiew kosiew commented Nov 1, 2025

Which issue does this PR close?

Rationale for this change

The existing reactive pipeline implementation in samber/ro introduced non-trivial runtime overhead due to always-on panic recovery and lock-based synchronization across observers. This limited throughput for high-volume data streams like those tested in the 1M and 1B row benchmarks.

This PR introduces:

  • A new ConcurrencyModeSingleProducer mode for lock-free single-writer scenarios.
  • A per-subscription opt-out of panic capture, reducing defer/recover overhead.
  • Comprehensive benchmarking examples and harnesses to measure real-world throughput.

These changes aim to demonstrate measurable runtime gains while retaining default safety for general-purpose use.

What changes are included in this PR?

  • Core enhancements:

    • Added ConcurrencyModeSingleProducer to observable.go and subscriber.go, allowing hot-path operation without locks.
    • Introduced WithObserverPanicCaptureDisabled(ctx) and supporting helpers for per-context panic-capture opt-out.
    • Updated SubscribeWithContext to skip panic wrappers when context disables capture.
  • New constructors:

    • NewSingleProducerObservable[WithContext] and NewSingleProducerSubscriber for optimized single-writer pipelines.
    • Unsafe observer constructors: NewObserverUnsafe, NewObserverWithContextUnsafe.
  • Documentation updates:

    • Added new sections under core/observer.md and troubleshooting/performance.md explaining concurrency modes and panic-capture opt-out.
  • Benchmarks and examples:

    • Added examples/billion-rows-benchmark/ with fixture expansion scripts and Go benchmark harness.
    • Introduced bench-1m target in Makefile for generating a 1M-row dataset and running the benchmark.
    • Added testing/benchmark_million_rows_test.go with detailed throughput comparisons.
    • Added subscriber_bench_test.go to microbenchmark subscriber hot-paths.
  • Testing:

    • Extended observer and subscriber tests to validate unsafe paths, context-based opt-out, and single-producer correctness.
    • Added race flag stubs (raceflag_race.go, raceflag_norace.go) for reproducible race detection tests.

Are these changes tested?

✅ Yes.
Comprehensive unit and benchmark tests have been added:

  • Observer and subscriber panic behavior tests.
  • Lockless single-producer correctness and dropped notification validation.
  • Concurrency stress tests for both safe and single-producer subscribers.
  • New end-to-end benchmarks covering multiple concurrency modes.

Are there any user-facing changes?

Yes — new APIs and optional flags:

  • ro.WithObserverPanicCaptureDisabled(ctx) allows disabling panic capture per subscription.
  • ro.NewSingleProducerObservable / ro.NewSingleProducerSubscriber offer optimized variants.
  • Default safety behavior remains unchanged; new features are strictly opt-in.

Documentation

New documentation pages and examples explain how and when to use these optimizations safely.

kosiew added 19 commits November 1, 2025 22:21
Introduce SetCaptureObserverPanics for flagging panic
recovery behavior at the observer level. Unsafe observables
now bypass the subscription-level panic catcher when
disabled, allowing errors to propagate directly. Added
regression tests for disabling panic capture and a million-row
benchmark to assess throughput. Document API changes and
safety trade-offs for advanced users.
Implement a shared panicCaptureGuard to serialize observer
panic-handling tests and ensure the panic-capture toggle checks
are consistent. This prevents mid-change observation of the
global flag.

Wrap observable panic-recovery scenarios with the same guard
to maintain panic capture during execution.
…ith ro.Pipe2 so the snippet aligns with the API and the surrounding explanation remains consistent.
Introduce guardObserverPanicCapture to toggle
CaptureObserverPanics while holding the guard.
Restore the previous state before unlocking to
ensure thread safety.

Update panic-capture tests to utilize the new
helper, ensuring the global flag resets while
the mutex is still locked.
Temporarily disable observer panic capture during the
BenchmarkMillionRowChallenge setup. Added cleanup to
restore the previous setting, ensuring the benchmark
exercises the fast path effectively.
Convert panic capture to use sync.RWMutex and add
withObserverPanicCaptureEnabled to allow shared
locks during panic recovery verification. Update
observer and observable tests to utilize the shared
helper, ensuring execution only when panic capture
is active. Wrap operator suites to assert on
recovered panics, preventing execution when the
global panic-capture flag is off.
Introduce ConcurrencyModeSingleProducer and new observable/subscriber
constructors to create lockless pipelines with panic handling. Switch
Range and RangeWithStep to single-producer mode, enhancing subscriber
tests for the lockless path. Expand the million-row benchmark matrix
to compare subscriber modes and document performance trade-offs
for users.
Extract RangeWithMode in operator_creation.go to eliminate
duplicate range logic and provide a helper for tests.
Replace newRangeForBenchmark with RangeWithMode in
benchmark_million_rows_test.go. Add unit tests to verify
behavior of panic capturing and dropped notifications.
TestObserverPanicCaptureSnapshotting (already added earlier)
TestLocklessDroppedNotification (already added earlier)
TestSingleProducerStress (sequential stress for single-producer path)
TestSafeSubscriberConcurrentStress (concurrent stress for Safe subscriber)
Extracted RangeWithMode and replaced the duplicate helper in the benchmark.
Added EventuallySafe case to the million-row benchmark.
Add detailed commentary above panicCaptureGuard to explain
the usage of RWMutex for allowing multiple readers while
restricting to exclusive writers. Clarify the interaction
between withObserverPanicCaptureEnabled and
guardObserverPanicCapture. Update inline comments in
withObserverPanicCaptureEnabled and guardObserverPanicCapture
to make locking intent clear, and ensure read lock is
released before calling t.Fatal to prevent potential
deadlocks.
Clarify the rationale for unexported-global variable
observerPanicCaptureEnabled. Mention related components like
panicCaptureGuard and the exported helpers
SetCaptureObserverPanics / CaptureObserverPanics for
better understanding.
Clarify that SetCaptureObserverPanics() modifies global mutable
state. Note that the flag is sampled at Observer construction,
and existing observers maintain their captured value. Recommend
setting it at startup for benchmarks. Direct tests to
panicCaptureGuard for safe toggling. Ran gofmt and all tests
passed successfully.
Introduce ExampleSetCaptureObserverPanics in observer_test.go.
This example saves the previous value, disables capture, and
defers restoration. It also demonstrates the observer's
captured value, showing consistent output of "false false"
to illustrate stability after modifying the default setting.
Expand the comment block for ConcurrencyModeSingleProducer
to provide clear guidance on its optimization for a
single goroutine emitter. Mention reliance on atomics,
avoidance of mutexes, and the unsafety with
multi-producer operators.
@kosiew kosiew changed the title Add global panic-capture toggle to optimize high-throughput pipelines and benchmark million-row performance Add single-producer concurrency mode and global panic-capture toggle for high-throughput observables Nov 3, 2025
@kosiew kosiew marked this pull request as ready for review November 3, 2025 13:06
return newSubscriberImpl(mode, xsync.NewMutexWithLock(), BackpressureBlock, destination, false)
case ConcurrencyModeUnsafe:
return newSubscriberImpl(mode, xsync.NewMutexWithoutLock(), BackpressureBlock, destination)
return newSubscriberImpl(mode, xsync.NewMutexWithoutLock(), BackpressureBlock, destination, false)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NewMutexWithoutLock is basically a mock for a no-op mutex.

According to your code, the ConcurrencyModeSingleProducer case is creating the same subscriber than ConcurrencyModeUnsafe

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch and the motivation behind the concern is correct.

I will change the ConcurrencyModeSingleProducer behavior so it's not the same as ConcurrencyModeUnsafe.

Specifically:

ConcurrencyModeUnsafe still uses a no-op mutex via xsync.NewMutexWithoutLock() and lockless=false. This preserves the same API shape (a mutex object) but avoids actual locking.
ConcurrencyModeSingleProducer now uses the lockless fast-path: mu == nil and lockless == true. That lets the subscriberImpl skip Lock/Unlock calls entirely and use atomic status checks directly.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I'm not sure to understand the difference between ConcurrencyModeUnsafe and ConcurrencyModeSingleProducer. Except it uses an if instead of no-op mutex lock

// from the provided file path. It emits each parsed value and completes.
// This is intentionally simple: the observable reads the file synchronously
// on subscribe and emits values to the destination observer.
func csvSource(path string) ro.Observable[int64] {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For max speed, the syscall.Mmap syscall is recommended 😅

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call.

direct syscall.Mmap requires OS-specific code and careful unmapping.
Shall I go with x/exp/mmap which provides a simpler ReaderAt wrapper and is cross-platform?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, definitely!

I did not know there was an experimental implementation!

// wrapper to avoid extra allocations on the hot path. Callers should use
// `WithObserverPanicCaptureDisabled(ctx)` when subscribing in
// performance-sensitive code; there is no longer a global toggle.
if isObserverPanicCaptureDisabled(ctx) && (s.mode == ConcurrencyModeUnsafe || s.mode == ConcurrencyModeSingleProducer) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain the choice to force-enable the panic capture for safe observables ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the subscription path, we only skip the TryCatchWithErrorValue wrapper when panic capture is turned off and the observable runs in an explicitly unsafe or single-producer mode.

Safe and “eventually safe” modes are meant for cases where multiple goroutines might emit at once. Those always keep the panic guard on — even if capture is disabled — because a panic during a shared lock could crash the whole fan-in. The guard catches the panic, reports it, releases the lock, and shuts things down cleanly.

Only single-threaded modes can safely drop the guard for maximum speed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you agree with this, I will add the above to the code comment too.

observer.go Outdated
// panic-recovery. Use this only in performance-sensitive paths where callers
// guarantee no panics or want panics to propagate to the caller. This mirrors
// the repository's "unsafe" naming for performance-optimized constructors.
func NewObserverUnsafe[T any](onNext func(value T), onError func(err error), onComplete func()) Observer[T] {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"NewObserverUnsafe" -> "NewUnsafeObserver" to match the "NewUnsafeObservable" naming.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will change it.

}

func (o *observerImpl[T]) tryNext(ctx context.Context, value T) {
if !o.capturePanics || isObserverPanicCaptureDisabled(ctx) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A context lookup for each value might be expensive. Can we perform this lookup at subscription time ?

I don't expect such a behavior to change in stream lifetime. (i might be wrong)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will implement your suggestion.

return newSubscriberImpl(mode, xsync.NewMutexWithLock(), BackpressureBlock, destination, false)
case ConcurrencyModeUnsafe:
return newSubscriberImpl(mode, xsync.NewMutexWithoutLock(), BackpressureBlock, destination)
return newSubscriberImpl(mode, xsync.NewMutexWithoutLock(), BackpressureBlock, destination, false)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I'm not sure to understand the difference between ConcurrencyModeUnsafe and ConcurrencyModeSingleProducer. Except it uses an if instead of no-op mutex lock

@kosiew kosiew marked this pull request as draft November 6, 2025 06:56
@codecov-commenter
Copy link

codecov-commenter commented Nov 6, 2025

Codecov Report

❌ Patch coverage is 85.04274% with 35 lines in your changes missing coverage. Please review.
✅ Project coverage is 68.83%. Comparing base (7ab54e9) to head (2aeed5f).
⚠️ Report is 5 commits behind head on main.

Files with missing lines Patch % Lines
observer.go 81.92% 11 Missing and 4 partials ⚠️
subscriber.go 85.71% 7 Missing and 5 partials ⚠️
ro.go 76.00% 4 Missing and 2 partials ⚠️
observable.go 92.85% 1 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #182      +/-   ##
==========================================
+ Coverage   67.83%   68.83%   +1.00%     
==========================================
  Files          76       77       +1     
  Lines        8361     8774     +413     
==========================================
+ Hits         5672     6040     +368     
- Misses       2604     2625      +21     
- Partials       85      109      +24     
Flag Coverage Δ
unittests 68.83% <85.04%> (+1.00%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@kosiew
Copy link
Contributor Author

kosiew commented Nov 7, 2025

difference between ConcurrencyModeUnsafe and ConcurrencyModeSingleProducer.

NewSubscriberWithConcurrencyMode picks a sync strategy based on the chosen mode.

Unsafe mode uses a dummy mutex (xsync.NewMutexWithoutLock()), so Lock and Unlock are no-ops. We still follow the same call pattern as the safe version, which keeps code expectations consistent but skips real locking overhead.

Single-producer mode drops the mutex entirely and enables a lockless fast path. It relies only on atomic checks, making it the fastest when a single goroutine emits.

Because these modes behave differently — one fakes a lock, the other truly skips it — both are kept so callers can choose the right balance between safety and speed.

Replace buffered file reading in benchmark CSV source with a
memory-mapped implementation. This approach iterates over
newline-delimited integers from the mapped slice, trims
trailing carriage returns, and ensures immediate completion
when the file is empty.
@kosiew kosiew force-pushed the performance-benchmark-8 branch from ee7ab47 to 3f6db02 Compare November 7, 2025 10:25
@kosiew kosiew force-pushed the performance-benchmark-8 branch from 8020f64 to 2aeed5f Compare November 8, 2025 08:22
@kosiew kosiew marked this pull request as ready for review November 8, 2025 08:28
@kosiew kosiew requested a review from samber November 11, 2025 03:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Benchmark: create a "billion row challenge" benchmark

3 participants