Skip to content

Commit b834e24

Browse files
committed
fix: Check resources growth
1 parent 80d964a commit b834e24

1 file changed

Lines changed: 198 additions & 0 deletions

File tree

tests/test_resource_growth.py

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
# Copyright 2025 Adobe. All rights reserved.
2+
# This file is licensed to you under the Apache License,
3+
# Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0)
4+
# or the MIT license (http://opensource.org/licenses/MIT),
5+
# at your option.
6+
7+
# Unless required by applicable law or agreed to in writing,
8+
# this software is distributed on an "AS IS" BASIS, WITHOUT
9+
# WARRANTIES OR REPRESENTATIONS OF ANY KIND, either express or
10+
# implied. See the LICENSE-MIT and LICENSE-APACHE files for the
11+
# specific language governing permissions and limitations under
12+
# each license.
13+
14+
"""
15+
Regression tests for resource growth under concurrent Reader/Builder load.
16+
17+
These tests reproduce the production symptom observed in gen-3p-proxy-pipeline:
18+
- Memory growing from ~11 GiB to ~19 GiB per pod under signing load
19+
- CPU spiking to 599% during concurrent operations
20+
21+
Root causes fixed in c2pa-rs:
22+
- A new tokio multi-thread Runtime (= new OS thread pool) was created per
23+
Reader FFI call. Under load this produced hundreds of leaked threads.
24+
- A new reqwest::Client (= new TCP connection pool) was created per
25+
Reader/Builder instance and not pooled.
26+
27+
These tests measure OS-level thread count and RSS before/after a burst of
28+
concurrent Reader operations and assert both stay within reasonable bounds.
29+
They are expected to FAIL against the un-fixed c2pa-rs library.
30+
"""
31+
32+
import gc
33+
import io
34+
import os
35+
import threading
36+
import time
37+
import unittest
38+
import concurrent.futures
39+
40+
from c2pa import Reader
41+
42+
FIXTURES_FOLDER = os.path.join(os.path.dirname(__file__), "fixtures")
43+
DEFAULT_TEST_FILE = os.path.join(FIXTURES_FOLDER, "C.jpg")
44+
45+
# Burst parameters that reproduce production-scale pressure without being
46+
# excessively slow in CI.
47+
BURST_ITERATIONS = 60
48+
BURST_WORKERS = 8
49+
50+
# Thresholds — intentionally generous to avoid flakiness, but tight enough
51+
# to catch the original bugs (which produced 400+ threads and hundreds of MB).
52+
MAX_THREAD_GROWTH = BURST_WORKERS + 4 # executor threads + small buffer
53+
MAX_MEMORY_GROWTH_MB = 80 # generous; old code would add 300+ MB
54+
55+
56+
def _proc_status() -> dict[str, str]:
57+
"""Read /proc/self/status on Linux; return empty dict on other platforms."""
58+
try:
59+
with open("/proc/self/status") as f:
60+
return dict(
61+
line.split(":", 1) for line in f if ":" in line
62+
)
63+
except OSError:
64+
return {}
65+
66+
67+
def _thread_count() -> int:
68+
"""OS-level thread count (includes native tokio threads, not just Python)."""
69+
status = _proc_status()
70+
if "Threads" in status:
71+
return int(status["Threads"].strip())
72+
# macOS / Windows fallback: Python threads only (still catches Python leaks)
73+
return threading.active_count()
74+
75+
76+
def _rss_mb() -> float | None:
77+
"""Resident set size in MB, or None if not measurable."""
78+
status = _proc_status()
79+
if "VmRSS" in status:
80+
kb = int(status["VmRSS"].strip().split()[0])
81+
return kb / 1024.0
82+
return None
83+
84+
85+
class TestConcurrentReaderResourceGrowth(unittest.TestCase):
86+
"""Verify that concurrent Reader operations do not leak threads or memory.
87+
88+
The burst deliberately mirrors the production pattern: many goroutines /
89+
asyncio tasks simultaneously calling the C2PA signing stack, each going
90+
through Reader.json() which triggers post-validation via the Rust FFI.
91+
"""
92+
93+
@classmethod
94+
def setUpClass(cls):
95+
with open(DEFAULT_TEST_FILE, "rb") as f:
96+
cls.test_data = f.read()
97+
98+
def _read_once(self) -> None:
99+
buf = io.BytesIO(self.test_data)
100+
reader = Reader("image/jpeg", buf)
101+
_ = reader.json()
102+
reader.close()
103+
104+
def _burst(self, iterations: int = BURST_ITERATIONS, workers: int = BURST_WORKERS) -> None:
105+
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as pool:
106+
futures = [pool.submit(self._read_once) for _ in range(iterations)]
107+
for f in concurrent.futures.as_completed(futures):
108+
f.result() # re-raise any exception from the worker
109+
110+
def test_thread_count_stable_under_load(self):
111+
"""OS thread count must not grow unboundedly during a Reader burst.
112+
113+
Before fix: each Reader FFI call created a new tokio multi-thread
114+
Runtime (Builder::new_multi_thread()) = ~8 new OS threads each call.
115+
60 calls × 8 threads = 480+ leaked threads on top of baseline.
116+
117+
After fix: one shared static Runtime; thread count stays near baseline.
118+
"""
119+
# Warm up: let the static runtime and connection pool initialise once.
120+
self._burst(iterations=10, workers=4)
121+
gc.collect()
122+
time.sleep(0.2)
123+
124+
threads_before = _thread_count()
125+
126+
self._burst()
127+
gc.collect()
128+
time.sleep(0.5) # allow OS to reap any transient threads
129+
130+
threads_after = _thread_count()
131+
growth = threads_after - threads_before
132+
133+
self.assertLessEqual(
134+
growth,
135+
MAX_THREAD_GROWTH,
136+
f"Thread count grew by {growth} (before={threads_before}, after={threads_after}). "
137+
f"Indicates a tokio Runtime is being created per FFI call instead of shared. "
138+
f"Expected growth ≤ {MAX_THREAD_GROWTH}.",
139+
)
140+
141+
def test_memory_stable_under_load(self):
142+
"""RSS must not grow unboundedly during a Reader burst.
143+
144+
Before fix: each Reader held a private reqwest::Client (TCP connection
145+
pool ~100-500 KB each). 60 concurrent Readers = 30-300 MB of pools
146+
accumulating before GC can collect them.
147+
148+
After fix: one shared static reqwest::Client; RSS stays near baseline.
149+
"""
150+
rss_before = _rss_mb()
151+
if rss_before is None:
152+
self.skipTest("RSS measurement not available on this platform")
153+
154+
# Warm up
155+
self._burst(iterations=10, workers=4)
156+
gc.collect()
157+
time.sleep(0.2)
158+
159+
rss_before = _rss_mb()
160+
161+
self._burst()
162+
gc.collect()
163+
time.sleep(0.5)
164+
165+
rss_after = _rss_mb()
166+
growth_mb = rss_after - rss_before
167+
168+
self.assertLess(
169+
growth_mb,
170+
MAX_MEMORY_GROWTH_MB,
171+
f"RSS grew by {growth_mb:.1f} MB (before={rss_before:.1f} MB, after={rss_after:.1f} MB). "
172+
f"Indicates connection pools or Rust objects are not being shared/freed. "
173+
f"Expected growth < {MAX_MEMORY_GROWTH_MB} MB.",
174+
)
175+
176+
def test_stream_callbacks_released_on_close(self):
177+
"""Stream callbacks must be None after close(), not held until GC.
178+
179+
Callbacks hold references to the backing BytesIO which delays
180+
memory reclamation under concurrent load.
181+
"""
182+
buf = io.BytesIO(self.test_data)
183+
# Access internal Stream via Reader internals is not possible directly;
184+
# test via the public Stream class used by Builder.
185+
from c2pa.c2pa import Stream
186+
187+
inner_buf = io.BytesIO(self.test_data)
188+
stream = Stream(inner_buf)
189+
self.assertTrue(stream._read_cb is not None, "callback should exist before close")
190+
stream.close()
191+
self.assertIsNone(stream._read_cb, "_read_cb must be None after close()")
192+
self.assertIsNone(stream._seek_cb, "_seek_cb must be None after close()")
193+
self.assertIsNone(stream._write_cb, "_write_cb must be None after close()")
194+
self.assertIsNone(stream._flush_cb, "_flush_cb must be None after close()")
195+
196+
197+
if __name__ == "__main__":
198+
unittest.main()

0 commit comments

Comments
 (0)