49 commits
554a30a
Add observer panic handling enhancements
kosiew Nov 1, 2025
8222cc6
Add shared panicCaptureGuard for observer tests
kosiew Nov 1, 2025
964fc95
Updated the panic-capture opt-out example to construct the pipeline w…
kosiew Nov 1, 2025
8873a69
Add guardObserverPanicCapture to manage observer state
kosiew Nov 1, 2025
1c49abe
Disable observer panic capture for benchmark setup
kosiew Nov 1, 2025
058ddb2
Refactor panic capture with RWMutex for testing
kosiew Nov 1, 2025
6abd2b2
Add single-producer mode and optimize benchmarks
kosiew Nov 3, 2025
72fd487
Document global panic capture toggle and its impact on performance modes
kosiew Nov 3, 2025
9202bd4
Refactor range logic and add new unit tests
kosiew Nov 3, 2025
ce46665
Added three tests:
kosiew Nov 3, 2025
8acff62
Improve comments in observer_test.go for clarity
kosiew Nov 3, 2025
ea8774c
Add comment to explain observerPanicCaptureEnabled
kosiew Nov 3, 2025
cc14a3a
Add godoc warning to SetCaptureObserverPanics()
kosiew Nov 3, 2025
5280835
Add example for SetCaptureObserverPanics
kosiew Nov 3, 2025
3f51535
Enhance ConcurrencyMode comment clarity
kosiew Nov 3, 2025
5f6a45f
Add test for single-producer with multi-producer operator to demonstr…
kosiew Nov 3, 2025
c650a1a
Add test for single-producer context cancellation to verify observer …
kosiew Nov 3, 2025
ccb08cc
Add race detection support for operator creation tests
kosiew Nov 3, 2025
02d3592
Refactor observer panic capture to use atomic operations for compatib…
kosiew Nov 3, 2025
59bbc50
Enhance comments for concurrency modes in subscriber implementation t…
kosiew Nov 5, 2025
45ec665
Implement benchmarks for subscriber concurrency modes and panic captu…
kosiew Nov 5, 2025
7780620
Refactor error handling in observer methods to use lo.TryCatchWithErr…
kosiew Nov 5, 2025
8c2d374
Remove BenchmarkSubscriberPanicCapture and unused import for cleaner …
kosiew Nov 5, 2025
a707227
Add context-based opt-out for observer panic capture to improve perfo…
kosiew Nov 5, 2025
ace0826
Add unsafe observer constructors to improve performance in high-throu…
kosiew Nov 5, 2025
c0640ca
Add billion-rows benchmark example with CSV source and fixture expans…
kosiew Nov 5, 2025
42f0c59
perf: per-subscription panic-capture opt-out; add NewObserverUnsafe; …
kosiew Nov 5, 2025
8ce76bd
refactor: remove global panic capture toggle; update tests to use per…
kosiew Nov 5, 2025
3124b04
refactor: remove global panic capture toggle; update documentation to…
kosiew Nov 5, 2025
4c7fae5
docs: update panic capture documentation to emphasize per-subscriptio…
kosiew Nov 5, 2025
72515d3
docs: clarify per-subscription opt-out for panic capture in Observer …
kosiew Nov 5, 2025
1532650
docs: remove reference to global toggle in SubscribeWithContext comments
kosiew Nov 6, 2025
d17e7a4
refactor: rename NewObserverUnsafe to NewUnsafeObserver for consistency
kosiew Nov 6, 2025
9af8b11
refactor: optimize panic capture handling and direct call paths in su…
kosiew Nov 6, 2025
3f6db02
Optimize benchmark CSV reading with memory mapping
kosiew Nov 7, 2025
b431ef6
lint fix
kosiew Nov 7, 2025
1cabfba
test: add comprehensive tests for observer and subscriber implementat…
kosiew Nov 7, 2025
4211efb
feat: enhance context handling in connectable observable implementation
kosiew Nov 7, 2025
f3c1731
refactor: replace lo.TryCatchWithErrorValue with defer-recover patter…
kosiew Nov 7, 2025
23ced5a
refactor: add WithDroppedNotification helper to manage global hook sa…
kosiew Nov 8, 2025
09abdab
test: add note to avoid parallel execution in TestLocklessDroppedNoti…
kosiew Nov 8, 2025
055ae57
lint fix
kosiew Nov 8, 2025
d6405ab
refactor: revert panic handling with lo.TryCatchWithErrorValue for im…
kosiew Nov 8, 2025
9a7bbc8
Implement per-subscription wrapper change
kosiew Nov 8, 2025
e06ffa7
refactor: optimize panic handling by removing inline defer/recover wr…
kosiew Nov 8, 2025
6992320
lint fix
kosiew Nov 8, 2025
12e3c79
test: add test for subscriber panic propagation with capture=false
kosiew Nov 8, 2025
b1cc37a
refactor: replace global error handlers with atomic storage for concu…
kosiew Nov 8, 2025
2aeed5f
refactor: update WithDroppedNotification to use getter/setter for OnD…
kosiew Nov 8, 2025
7 changes: 7 additions & 0 deletions Makefile
@@ -54,3 +54,10 @@ weight:

doc:
cd docs && npm install && npm start

bench-1m:
@echo "Generating 1M fixture for examples/billion-rows-benchmark..."
@mkdir -p examples/billion-rows-benchmark/fixtures
@./examples/billion-rows-benchmark/scripts/expand_fixture.sh examples/billion-rows-benchmark/fixtures/sample.csv examples/billion-rows-benchmark/fixtures/1m.csv 1000000
@echo "Running 1M benchmark (may take a while)..."
FIXTURE_PATH=$(CURDIR)/examples/billion-rows-benchmark/fixtures/1m.csv go test -run=^$$ -bench BenchmarkMillionRowChallenge -benchmem ./examples/billion-rows-benchmark
94 changes: 94 additions & 0 deletions connectable_constructors_test.go
@@ -0,0 +1,94 @@
// Copyright 2025 samber.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://github.com/samber/ro/blob/main/licenses/LICENSE.apache.md
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ro

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
)

func TestNewConnectableObservableWithContext(t *testing.T) {
t.Parallel()
is := assert.New(t)

var ctxReceived context.Context
connectable := NewConnectableObservableWithContext(func(ctx context.Context, destination Observer[int]) Teardown {
ctxReceived = ctx
destination.NextWithContext(ctx, 1)
destination.NextWithContext(ctx, 2)
destination.NextWithContext(ctx, 3)
destination.CompleteWithContext(ctx)
return nil
})

var values []int
ctx := context.WithValue(context.Background(), testCtxKey, "value")
sub := connectable.SubscribeWithContext(ctx, NewObserver(
func(value int) { values = append(values, value) },
func(err error) { t.Fatalf("unexpected error: %v", err) },
func() {},
))

// Connect the connectable observable
connectSub := connectable.Connect()
connectSub.Wait()
sub.Wait()

is.Equal([]int{1, 2, 3}, values)
is.NotNil(ctxReceived)
is.Equal("value", ctxReceived.Value(testCtxKey))
}

func TestNewConnectableObservableWithConfigAndContext(t *testing.T) {
t.Parallel()
is := assert.New(t)

var ctxReceived context.Context
config := ConnectableConfig[int]{
Connector: defaultConnector[int],
ResetOnDisconnect: false,
}

connectable := NewConnectableObservableWithConfigAndContext(
func(ctx context.Context, destination Observer[int]) Teardown {
ctxReceived = ctx
destination.NextWithContext(ctx, 1)
destination.NextWithContext(ctx, 2)
destination.NextWithContext(ctx, 3)
destination.CompleteWithContext(ctx)
return nil
},
config,
)

var values []int
ctx := context.WithValue(context.Background(), testCtxKey, "value")
sub := connectable.SubscribeWithContext(ctx, NewObserver(
func(value int) { values = append(values, value) },
func(err error) { t.Fatalf("unexpected error: %v", err) },
func() {},
))

// Connect the connectable observable
connectSub := connectable.Connect()
connectSub.Wait()
sub.Wait()

is.Equal([]int{1, 2, 3}, values)
is.NotNil(ctxReceived)
is.Equal("value", ctxReceived.Value(testCtxKey))
}
44 changes: 44 additions & 0 deletions docs/docs/core/observer.md
@@ -200,6 +200,50 @@ ro.Just(1, 2, 3, 4).Subscribe(observer)
// Recovered error: something went wrong!
```

### Opting Out for Maximum Throughput

:::warning Performance vs. Safety

Disabling panic capture removes the recovery overhead but will let panics crash your stream. Only opt out in trusted, performance-critical pipelines.

:::

If you are building high-throughput pipelines, you can opt out per subscription. The library provides a small helper that disables panic capture on the subscription context. Use it when you want to measure pure hot-path throughput or when you intentionally want panics to propagate.

```go
// Create a context that disables observer panic capture for the subscription.
ctx := ro.WithObserverPanicCaptureDisabled(context.Background())

sum := int64(0)
pipeline := ro.Pipe2(
ro.Range(0, 1_000_000),
ro.Map(func(v int64) int64 { return v + 1 }),
ro.Filter(func(v int64) bool { return v%2 == 0 }),
)

// SubscribeWithContext will pass the context to the subscription and
// downstream notifications — the opt-out avoids per-callback recover wrappers.
pipeline.SubscribeWithContext(ctx, ro.NewObserver(
func(v int64) { sum += v },
func(err error) { panic(err) },
func() {},
))
```

### Panic Capture

Observers capture panics by default. If you need panics to propagate (for benchmarking or performance-sensitive workloads), either construct an unsafe observer with `NewUnsafeObserver` (or its context-aware variant), or disable capture for a specific subscription by passing a context derived with `WithObserverPanicCaptureDisabled(ctx)` to `SubscribeWithContext`:

```go
// Disable capture only for this subscription
ctx := ro.WithObserverPanicCaptureDisabled(context.Background())
pipeline.SubscribeWithContext(ctx, observer)
```
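
A minimal sketch of the unsafe-constructor route, assuming `NewUnsafeObserver` keeps the same `(onNext, onError, onComplete)` callback shape as `NewObserver` (check the package godoc for the final name and signature):

```go
// Sketch only: the constructor is assumed to accept the same three callbacks
// as NewObserver, but without wrapping them in defer/recover, so a panic in
// any callback propagates to the goroutine that emitted the notification.
observer := ro.NewUnsafeObserver(
	func(v int) { fmt.Println("next:", v) },
	func(err error) { fmt.Println("error:", err) },
	func() { fmt.Println("complete") },
)

ro.Just(1, 2, 3).Subscribe(observer)
```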

### State After Error

Once an Observer receives an error, it rejects further notifications:
53 changes: 52 additions & 1 deletion docs/docs/troubleshooting/performance.md
@@ -291,7 +291,7 @@ func BenchmarkMapOperator(b *testing.B) {
}

func BenchmarkConcurrentProcessing(b *testing.B) {
source := ro.Just(make([]int, 1000)...)
source := ro.Just(make([]int, 1000)...)

b.Run("Serial", func(b *testing.B) {
operator := serialProcessing()
@@ -311,6 +311,57 @@ func BenchmarkConcurrentProcessing(b *testing.B) {
}
```

### Subscriber Concurrency Modes

High-throughput sources can avoid unnecessary synchronization by selecting the right subscriber implementation. The core library now exposes `NewSingleProducerSubscriber`/`NewSingleProducerObservableWithContext`, and operators such as `Range` automatically opt into the `ConcurrencyModeSingleProducer` fast-path when there is exactly one upstream writer. This mode bypasses the `Lock`/`Unlock` calls entirely while retaining panic safety and teardown behavior. Use the following guidance when choosing a mode:

| Concurrency mode | Locking strategy | Drop policy | Recommended usage |
| --- | --- | --- | --- |
| `ConcurrencyModeSafe` | `sync.Mutex` | Blocks producers | Multiple writers or callbacks that may concurrently re-enter observers |
| `ConcurrencyModeEventuallySafe` | `sync.Mutex` | Drops when contended | Fan-in scenarios where losing values is acceptable |
| `ConcurrencyModeUnsafe` | No-op lock wrapper | Blocks producers | Single writer, but still routes through the locking API surface |
| `ConcurrencyModeSingleProducer` | No locking | Blocks producers | Single writer that needs the lowest possible overhead |
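
For a custom source with exactly one producing goroutine, the single-producer constructor can be used directly. The sketch below assumes `NewSingleProducerObservableWithContext` shares the `(ctx, destination) Teardown` subscribe-function shape of `NewObservableWithContext`:

```go
// Sketch: a single goroutine emits every value, so the subscriber can skip
// the per-notification Lock/Unlock that the safe modes pay for.
source := ro.NewSingleProducerObservableWithContext(func(ctx context.Context, dest ro.Observer[int64]) ro.Teardown {
	for i := int64(0); i < 1_000_000; i++ {
		dest.NextWithContext(ctx, i) // only this goroutine ever calls Next
	}
	dest.CompleteWithContext(ctx)
	return nil
})

sub := source.SubscribeWithContext(context.Background(), ro.NewObserver(
	func(v int64) { /* consume */ },
	func(err error) { /* handle */ },
	func() {},
))
sub.Wait()
```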

Note on panic-capture interaction: disabling capture lets some fast paths (for example the single-producer and unsafe modes) avoid wrapping observer callbacks in the usual defer/recover machinery, which reduces per-notification overhead. Use `ro.WithObserverPanicCaptureDisabled(ctx)` when subscribing in benchmarks to avoid mutating global state and to keep tests parallel-friendly.

Run the million-row benchmark to compare the trade-offs:

```bash
go test -run=^$ -bench BenchmarkMillionRowChallenge -benchmem ./testing
```

Running the benchmark (tips):

- The benchmark harness in `testing/benchmark_million_rows_test.go` disables panic capture for the duration of the bench via a per-subscription context opt-out, so the harness does not mutate global state. To reproduce realistic production numbers, run the benchmark both with capture enabled and with it disabled.
- Increase bench time to reduce noise:

```bash
go test -run=^$ -bench BenchmarkMillionRowChallenge -benchmem ./testing -benchtime=10s
```

- To check for races, run:

```bash
go test -race ./...
```

- To profile CPU or mutex contention, run the benchmark with `go test -cpuprofile` / `-mutexprofile` (or a traced run) and inspect the lock profile with `go tool pprof` to see how much time is spent acquiring `sync.Mutex` versus doing useful work.


Sample results on a 1M element pipeline:

- `single-producer`: 60.3 ms/op, 1.5 KiB/op, 39 allocs/op
- `unsafe-mutex`: 63.2 ms/op, 1.5 KiB/op, 39 allocs/op
- `safe-mutex`: 67.1 ms/op, 1.6 KiB/op, 40 allocs/op

The single-producer path trims roughly 4–6% off the runtime compared to the previous `unsafe` mode while preserving allocation characteristics. Stick with the safe variants whenever multiple goroutines might call `Next` concurrently.
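
For contrast, here is a sketch of a fan-in source with two producing goroutines. It assumes the default subscriber created by `SubscribeWithContext` uses the safe (mutex-guarded) mode, which is what serializes the concurrent `Next` calls; the single-producer and unsafe modes must not be used here:

```go
// Two goroutines call NextWithContext on the same destination; the safe
// subscriber's mutex serializes these notifications. With the single-producer
// mode this would be a data race.
source := ro.NewObservableWithContext(func(ctx context.Context, dest ro.Observer[int]) ro.Teardown {
	var wg sync.WaitGroup
	for p := 0; p < 2; p++ {
		wg.Add(1)
		go func(offset int) {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				dest.NextWithContext(ctx, offset*1000+i)
			}
		}(p)
	}
	go func() {
		wg.Wait()
		dest.CompleteWithContext(ctx)
	}()
	return nil
})
```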

## 6. Performance Optimization Checklist

### Memory Optimization
41 changes: 41 additions & 0 deletions examples/billion-rows-benchmark/README.md
@@ -0,0 +1,41 @@
# Billion-rows benchmark (example)

This example contains a benchmark harness that runs a pipeline against a static
CSV file (one integer per line). It's intended as a reproducible example for
large-file benchmarks such as the "billion rows" challenge.

Files
- `benchmark_test.go`: the benchmark. It expects a static fixture file with
one integer per line and emits those values through a simple CSV source.
- `fixtures/sample.csv`: a tiny sample fixture included for CI and quick runs.
- `scripts/expand_fixture.sh`: simple shell script to expand the small sample
into a larger fixture by repeating lines.

How to run
1. Use the small sample (fast / CI):

```bash
# from repo root
go test -run=^$ -bench BenchmarkMillionRowChallenge ./examples/billion-rows-benchmark -benchmem
```

2. Use a larger static fixture (recommended for real measurements):

- Obtain or generate a static CSV where each line is an integer (the 1B
challenge provides generators). Place it at `examples/billion-rows-benchmark/fixtures/1brc.csv` or set `FIXTURE_PATH`.

Example to expand the included sample to 1_000_000 lines (quick, not realistic):

```bash
cd examples/billion-rows-benchmark
mkdir -p fixtures
./scripts/expand_fixture.sh fixtures/sample.csv fixtures/1m.csv 1000000
export FIXTURE_PATH=$(pwd)/fixtures/1m.csv
# run the bench (this will still run the benchmark harness, which runs the pipeline once per iteration)
go test -run=^$ -bench BenchmarkMillionRowChallenge -benchmem
```

Notes
- The benchmark accepts a `FIXTURE_PATH` environment variable pointing to the CSV fixture. If it is not set, the benchmark falls back to the `fixtures/sample.csv` included in the example.
- For the official 1B challenge, follow the instructions in the challenge repository to generate the required static file and set `FIXTURE_PATH` to that file.
- The benchmark uses the per-subscription helper `ro.WithObserverPanicCaptureDisabled(ctx)` to avoid mutating global state when measuring hot-path performance.
121 changes: 121 additions & 0 deletions examples/billion-rows-benchmark/benchmark_test.go
@@ -0,0 +1,121 @@
package benchmark

import (
"bytes"
"context"
"os"
"path/filepath"
"strconv"
"testing"

"github.com/samber/ro"
"golang.org/x/exp/mmap"
)

// csvSource creates an Observable that reads int64 values (one per line)
// from the provided file path. It emits each parsed value and completes.
// This is intentionally simple: the observable reads the file synchronously
// on subscribe and emits values to the destination observer.
func csvSource(path string) ro.Observable[int64] {
Review thread on csvSource:

Owner: For max speed, the syscall.Mmap syscall is recommended 😅

Contributor (author): Good call. Direct syscall.Mmap requires OS-specific code and careful unmapping. Shall I go with x/exp/mmap, which provides a simpler ReaderAt wrapper and is cross-platform?

Owner: Wow, definitely! I did not know there was an experimental implementation!
return ro.NewObservableWithContext(func(ctx context.Context, dest ro.Observer[int64]) ro.Teardown {
reader, err := mmap.Open(path)
if err != nil {
dest.Error(err)
return nil
}
defer func() { _ = reader.Close() }()

size := reader.Len()
if size == 0 {
dest.CompleteWithContext(ctx)
return nil
}

data := make([]byte, size)
if _, err := reader.ReadAt(data, 0); err != nil {
dest.Error(err)
return nil
}

offset := 0
for offset < len(data) {
next := bytes.IndexByte(data[offset:], '\n')
var line []byte
if next == -1 {
line = data[offset:]
offset = len(data)
} else {
line = data[offset : offset+next]
offset += next + 1
}

if len(line) > 0 && line[len(line)-1] == '\r' {
line = line[:len(line)-1]
}

v, err := strconv.ParseInt(string(line), 10, 64)
if err != nil {
dest.Error(err)
return nil
}

// propagate context-aware notifications
dest.NextWithContext(ctx, v)
}

dest.CompleteWithContext(ctx)
return nil
})
}

// Benchmark that runs the "million row" pipeline using a static CSV fixture.
// The benchmark expects a file with one integer per line. By default it will
// use the small sample in the fixtures directory. To benchmark a large static
// dataset, set the FIXTURE_PATH environment variable or place the file at
// `examples/billion-rows-benchmark/fixtures/1brc.csv`.
func BenchmarkMillionRowChallenge(b *testing.B) {
b.ReportAllocs()

fixture := os.Getenv("FIXTURE_PATH")
if fixture == "" {
fixture = filepath.Join("fixtures", "sample.csv")
}

// Use per-subscription opt-out of panic capture so the benchmark measures
// hot-path throughput without mutating global state.
ctx := ro.WithObserverPanicCaptureDisabled(context.Background())

benchmarkCases := []struct {
name string
src ro.Observable[int64]
}{
{name: "file-source", src: csvSource(fixture)},
}

for _, tc := range benchmarkCases {
b.Run(tc.name, func(b *testing.B) {
pipeline := ro.Pipe3(
tc.src,
ro.Map(func(value int64) int64 { return value + 1 }),
ro.Filter(func(value int64) bool { return value%2 == 0 }),
ro.Map(func(value int64) int64 { return value * 3 }),
)

b.ResetTimer()
for i := 0; i < b.N; i++ {
var sum int64

subscription := pipeline.SubscribeWithContext(ctx, ro.NewObserver(
func(value int64) { sum += value },
func(err error) { b.Fatalf("unexpected error: %v", err) },
func() {},
))

subscription.Wait()

// keep the correctness guard
_ = sum
}
})
}
}
10 changes: 10 additions & 0 deletions examples/billion-rows-benchmark/fixtures/sample.csv
@@ -0,0 +1,10 @@
1
2
3
4
5
6
7
8
9
10