Skip to content

Commit 027351d

Browse files
committed
Properly manage JNI resources in JVMTI wallclock sampler
1 parent fe6aba3 commit 027351d

File tree

2 files changed

+95
-57
lines changed

2 files changed

+95
-57
lines changed

ddprof-lib/src/main/cpp/wallClock.cpp

Lines changed: 58 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -161,49 +161,83 @@ void WallClockJVMTI::timerLoop() {
161161
// Attach to JVM as the first step
162162
VM::attachThread("Datadog Profiler Wallclock Sampler");
163163
auto collectThreads = [&](std::vector<ThreadEntry>& threads) {
164-
jvmtiEnv* jvmti = VM::jvmti();
165-
if (jvmti == nullptr) {
166-
return;
167-
}
168-
JNIEnv* jni = VM::jni();
169-
170-
jint threads_count = 0;
171-
jthread* threads_ptr = nullptr;
172-
jvmti->GetAllThreads(&threads_count, &threads_ptr);
164+
jvmtiEnv* jvmti = VM::jvmti();
165+
if (jvmti == nullptr) {
166+
return;
167+
}
168+
JNIEnv* jni = VM::jni();
173169

174-
bool do_filter = Profiler::instance()->threadFilter()->enabled();
175-
int self = OS::threadId();
170+
jint threads_count = 0;
171+
jthread* threads_ptr = nullptr;
172+
jvmti->GetAllThreads(&threads_count, &threads_ptr);
176173

177-
for (int i = 0; i < threads_count; i++) {
178-
jthread thread = threads_ptr[i];
179-
if (thread != nullptr) {
180-
VMThread* nThread = VMThread::fromJavaThread(jni, thread);
181-
if (nThread == nullptr) {
182-
continue;
183-
}
184-
int tid = nThread->osThreadId();
185-
if (tid != self && (!do_filter || Profiler::instance()->threadFilter()->accept(tid))) {
186-
threads.push_back({nThread, thread});
187-
}
174+
bool do_filter = Profiler::instance()->threadFilter()->enabled();
175+
int self = OS::threadId();
176+
for (int i = 0; i < threads_count; i++) {
177+
jthread thread = threads_ptr[i];
178+
if (thread != nullptr) {
179+
VMThread* nThread = VMThread::fromJavaThread(jni, thread);
180+
if (nThread == nullptr) {
181+
jni->DeleteLocalRef(thread);
182+
continue;
188183
}
184+
jint thread_state;
185+
if (jvmti->GetThreadState(thread, &thread_state) == JVMTI_ERROR_NONE &&
186+
(thread_state & JVMTI_THREAD_STATE_TERMINATED) == 0) {
187+
int tid = VMThread::nativeThreadId(jni, thread);
188+
if (tid != -1 && tid != self && (!do_filter || Profiler::instance()->threadFilter()->accept(tid))) {
189+
threads.push_back({nThread, jni->NewWeakGlobalRef(thread)});
190+
}
191+
}
192+
jni->DeleteLocalRef(thread);
189193
}
190-
jvmti->Deallocate((unsigned char*)threads_ptr);
194+
}
195+
jvmti->Deallocate((unsigned char*)threads_ptr);
191196
};
192197

193198
auto sampleThreads = [&](ThreadEntry& thread_entry, int& num_failures, int& threads_already_exited, int& permission_denied) {
194199
static jint max_stack_depth = (jint)Profiler::instance()->max_stack_depth();
195200
static jvmtiFrameInfo* frame_buffer = new jvmtiFrameInfo[max_stack_depth];
196201
static jvmtiEnv* jvmti = VM::jvmti();
202+
static JNIEnv* jni = VM::jni();
197203

198204
int num_frames = 0;
199-
jvmtiError err = jvmti->GetStackTrace(thread_entry.java, 0, max_stack_depth, frame_buffer, &num_frames);
205+
if (thread_entry.java_ref == nullptr) {
206+
num_failures++;
207+
return false;
208+
}
209+
210+
jobject thread = jni->NewLocalRef(thread_entry.java_ref);
211+
if (thread == nullptr) {
212+
num_failures++;
213+
jni->DeleteWeakGlobalRef(thread_entry.java_ref);
214+
return false;
215+
}
216+
217+
jint thread_state;
218+
jvmtiError state_err = jvmti->GetThreadState(thread, &thread_state);
219+
if (state_err != JVMTI_ERROR_NONE || (thread_state & JVMTI_THREAD_STATE_TERMINATED) != 0) {
220+
num_failures++;
221+
jni->DeleteWeakGlobalRef(thread_entry.java_ref);
222+
jni->DeleteLocalRef(thread);
223+
return false;
224+
}
225+
jvmtiError err = jvmti->GetStackTrace(thread, 0, max_stack_depth, frame_buffer, &num_frames);
226+
// cleanup the reference(s) to the java thread
227+
jni->DeleteWeakGlobalRef(thread_entry.java_ref);
228+
jni->DeleteLocalRef(thread);
229+
200230
if (err != JVMTI_ERROR_NONE) {
201231
num_failures++;
202232
if (err == JVMTI_ERROR_THREAD_NOT_ALIVE) {
203233
threads_already_exited++;
204234
}
205235
return false;
206236
}
237+
if (num_frames == 0) {
238+
// some JVMTI attached threads are Java-like but have no stack; we can just ignore them
239+
return true;
240+
}
207241
ExecutionEvent event;
208242
VMThread* vm_thread = thread_entry.native;
209243
int raw_thread_state = vm_thread->state();
@@ -267,6 +301,5 @@ void WallClockASGCT::timerLoop() {
267301
}
268302
return true;
269303
};
270-
271304
timerLoopCommon<int>(collectThreads, sampleThreads, _reservoir_size, _interval);
272305
}

ddprof-lib/src/main/cpp/wallClock.h

Lines changed: 37 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,9 @@ class BaseWallClock : public Engine {
3939
// Profiler::recordSample().
4040
int _reservoir_size;
4141

42-
pthread_t _thread;
43-
virtual void timerLoop() = 0;
44-
virtual void initialize(Arguments& args) {};
42+
pthread_t _thread;
43+
virtual void timerLoop() = 0;
44+
virtual void initialize(Arguments& args) {};
4545

4646
static void *threadEntry(void *wall_clock) {
4747
((BaseWallClock *)wall_clock)->timerLoop();
@@ -76,38 +76,43 @@ class BaseWallClock : public Engine {
7676

7777
while (_running.load(std::memory_order_relaxed)) {
7878
collectThreads(threads);
79+
int size = threads.size();
80+
if (threads.size() > 0) {
81+
int num_failures = 0;
82+
int threads_already_exited = 0;
83+
int permission_denied = 0;
84+
std::vector<ThreadType> sample = reservoir.sample(threads);
85+
for (ThreadType thread : sample) {
86+
if (!sampleThreads(thread, num_failures, threads_already_exited, permission_denied)) {
87+
continue;
88+
}
89+
}
7990

80-
int num_failures = 0;
81-
int threads_already_exited = 0;
82-
int permission_denied = 0;
83-
std::vector<ThreadType> sample = reservoir.sample(threads);
84-
for (ThreadType thread : sample) {
85-
if (!sampleThreads(thread, num_failures, threads_already_exited, permission_denied)) {
86-
continue;
91+
epoch.updateNumSamplableThreads(threads.size());
92+
epoch.updateNumFailedSamples(num_failures);
93+
epoch.updateNumSuccessfulSamples(sample.size() - num_failures);
94+
epoch.updateNumExitedThreads(threads_already_exited);
95+
epoch.updateNumPermissionDenied(permission_denied);
96+
u64 endTime = TSC::ticks();
97+
u64 duration = TSC::ticks_to_millis(endTime - startTime);
98+
if (epoch.hasChanged() || duration >= 1000) {
99+
epoch.endEpoch(duration);
100+
Profiler::instance()->recordWallClockEpoch(self, &epoch);
101+
epoch.newEpoch(endTime);
102+
startTime = endTime;
103+
} else {
104+
epoch.clean();
87105
}
88-
}
89106

90-
epoch.updateNumSamplableThreads(threads.size());
91-
epoch.updateNumFailedSamples(num_failures);
92-
epoch.updateNumSuccessfulSamples(sample.size() - num_failures);
93-
epoch.updateNumExitedThreads(threads_already_exited);
94-
epoch.updateNumPermissionDenied(permission_denied);
95-
u64 endTime = TSC::ticks();
96-
u64 duration = TSC::ticks_to_millis(endTime - startTime);
97-
if (epoch.hasChanged() || duration >= 1000) {
98-
epoch.endEpoch(duration);
99-
Profiler::instance()->recordWallClockEpoch(self, &epoch);
100-
epoch.newEpoch(endTime);
101-
startTime = endTime;
102-
} else {
103-
epoch.clean();
107+
threads.clear();
104108
}
105-
106-
threads.clear();
107109
// Get a random sleep duration
108-
// clamp the random interval to <1,2N-1>
109-
// the probability of clamping is extremely small, close to zero
110-
OS::sleep(std::min(std::max((long int)1, static_cast<long int>(distribution(generator))), ((_interval * 2) - 1)));
110+
// restrict the random interval to <N/2,2N-1>
111+
long int delay = _interval;
112+
do {
113+
delay = static_cast<long int>(distribution(generator));
114+
} while (delay < interval / 2 || delay > 2 * interval);
115+
OS::sleep(delay);
111116
}
112117
}
113118

@@ -159,8 +164,8 @@ class WallClockJVMTI : public BaseWallClock {
159164
void timerLoop() override;
160165
public:
161166
struct ThreadEntry {
162-
VMThread* native;
163-
jthread java;
167+
VMThread* native;
168+
jobject java_ref;
164169
};
165170
WallClockJVMTI() : BaseWallClock() {}
166171
const char* name() override {

0 commit comments

Comments
 (0)