Skip to content

Commit

Permalink
use libcurl to instead of httplib
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangyi committed Feb 7, 2025
1 parent 2f22ac3 commit 44184cf
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 10,297 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ jobs:
with:
distribution: 'corretto'
java-version: '11'
- run: sudo apt update && sudo apt -y install zlib1g-dev libcurl4 libcurl4-openssl-dev
- run: sudo sysctl kernel.perf_event_paranoid=1
- run: make -j`nproc`
- run: make test
- uses: actions/upload-artifact@v3
- uses: actions/upload-artifact@v4
with:
path: |
build/bin/
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ API_JAR=lib/async-profiler.jar
CONVERTER_JAR=lib/converter.jar

CFLAGS=-O3 -fno-exceptions
CXXFLAGS=-O3 -fno-exceptions -fno-omit-frame-pointer -fvisibility=hidden
CXXFLAGS=-std=c++11 -O3 -fno-exceptions -fno-omit-frame-pointer -fvisibility=hidden
INCLUDES=-I$(JAVA_HOME)/include -Isrc/helper
LIBS=-ldl -lpthread
LIBS=-ldl -lpthread -lcurl -lz
MERGE=true

JAVAC=$(JAVA_HOME)/bin/javac
Expand Down
25 changes: 15 additions & 10 deletions src/arguments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -472,16 +472,19 @@ const char* Arguments::file() {
return _file;
}

httplib::Client* Arguments::httpClient() {
if (_httpcli == NULL) {
_httpcli = new httplib::Client(_dd_agent_host, _dd_trace_agent_port);
_httpcli->set_keep_alive(true);
_httpcli->set_follow_location(true);
_httpcli->set_connection_timeout(3, 0);
_httpcli->set_read_timeout(5, 0);
_httpcli->set_write_timeout(8, 0);
CURL* Arguments::httpClient() {
if (_curl == NULL) {
_curl = curl_easy_init();
if (_curl) {
char url[2048] = {0};
snprintf(url, sizeof(url), "http://%s:%d/profiling/v1/input", _dd_agent_host, _dd_trace_agent_port);
curl_easy_setopt(_curl, CURLOPT_URL, url);
curl_easy_setopt(_curl, CURLOPT_CONNECTTIMEOUT, 3L);
curl_easy_setopt(_curl, CURLOPT_TIMEOUT, 5L);
curl_easy_setopt(_curl, CURLOPT_FOLLOWLOCATION, 1L);
}
}
return _httpcli;
return _curl;
}

// Returns true if the log file is a temporary file of asprof launcher
Expand Down Expand Up @@ -616,7 +619,9 @@ int Arguments::parseTimeout(const char* str) {
Arguments::~Arguments() {
if (!_shared) free(_buf);
if (_temp_filename != NULL) free(_temp_filename);
delete _httpcli;
if (_curl) {
curl_easy_cleanup(_curl);
}
}

void Arguments::save() {
Expand Down
8 changes: 4 additions & 4 deletions src/arguments.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@


#include <stddef.h>
#include <curl/curl.h>
#define CPPHTTPLIB_NO_EXCEPTIONS
#include "httplib.h"


const long DEFAULT_INTERVAL = 10000000; // 10 ms
Expand Down Expand Up @@ -146,7 +146,7 @@ class Arguments {
private:
char* _buf;
bool _shared;
httplib::Client* _httpcli;
CURL* _curl;

void appendToEmbeddedList(int& list, char* value);
const char* expandFilePattern(const char* pattern);
Expand Down Expand Up @@ -267,7 +267,7 @@ class Arguments {
_dd_env(""),
_dd_version(""),
_dd_tags(""),
_httpcli(NULL),
_curl(NULL),
_http_out(false) {
}

Expand All @@ -281,7 +281,7 @@ class Arguments {

const char* file();

httplib::Client* httpClient();
CURL* httpClient();

bool hasTemporaryLog() const;

Expand Down
145 changes: 123 additions & 22 deletions src/flightRecorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
#include <sys/types.h>
#include <sys/utsname.h>
#include <unistd.h>
#include <vector>
#include <zlib.h>
#include <curl/curl.h>
#include "demangle.h"
#include "flightRecorder.h"
#include "incbin.h"
Expand All @@ -28,7 +31,6 @@
#include "tsc.h"
#include "vmStructs.h"
#include "nlohmann/json.hpp"
#include "httplib.h"


INCBIN(JFR_SYNC_CLASS, "src/helper/one/profiler/JfrSync.class")
Expand All @@ -37,6 +39,48 @@ static void JNICALL JfrSync_stopProfiler(JNIEnv* env, jclass cls) {
Profiler::instance()->stop();
}

static bool gzipCompress(const std::vector<unsigned char>& in, std::vector<unsigned char>& out) {
// 计算压缩后所需的最大缓冲区大小
const uLongf compressedSize = compressBound(in.size());
out.resize(compressedSize);

// 初始化 zlib 压缩流
z_stream zs;
memset(&zs, 0, sizeof(zs));
// 使用 gzip 格式进行压缩
if (deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK) {
std::cerr << "Failed to initialize zlib deflate stream." << std::endl;
return false;
}

// 设置输入和输出缓冲区
zs.next_in = const_cast<Bytef*>(in.data());
zs.avail_in = in.size();
zs.next_out = out.data();
zs.avail_out = compressedSize;

// 执行压缩操作
const int ret = deflate(&zs, Z_FINISH);
if (ret != Z_STREAM_END) {
std::cerr << "Compression failed with return code: " << ret << std::endl;
deflateEnd(&zs);
return false;
}

// 调整输出缓冲区的大小为实际压缩后的大小
out.resize(zs.total_out);

// 结束压缩流并释放资源
deflateEnd(&zs);
return true;
}

static size_t curlWriteCB(void *contents, size_t size, size_t nmem, std::string *s) {
const size_t len = size * nmem;
s->append(static_cast<char *>(contents), len);
return len;
}


const int SMALL_BUFFER_SIZE = 1024;
const int SMALL_BUFFER_LIMIT = SMALL_BUFFER_SIZE - 128;
Expand Down Expand Up @@ -417,6 +461,31 @@ class RecordingBuffer : public Buffer {
}
};

static int hex_to_dec(const char hex) {
if (hex >= '0' && hex <= '9')
{
return hex - '0';
}
if (hex >= 'a' && hex <= 'f')
{
return hex - 'a' + 10;
}
if (hex >= 'A' && hex <= 'F')
{
return hex - 'A' + 10;
}
return 0;
}

static void hex_to_string(const char *hex, char *str) {
int i;
for (i = 0; hex[i] != '\0'; i += 2)
{
str[i / 2] = (char)(hex_to_dec(hex[i]) << 4 | hex_to_dec(hex[i + 1]));
}
str[i / 2] = '\0';
}


class Recording {
private:
Expand Down Expand Up @@ -508,16 +577,16 @@ class Recording {

if (_global_args._http_out) {
off_t size = lseek(_fd, 0, SEEK_SET);
std::string jfr;
std::vector<unsigned char> jfr;
jfr.reserve(size);
char buf[8192];
unsigned char buf[8192];
ssize_t rlen;
while(true) {
rlen = read(_fd, buf, sizeof(buf));
if (rlen <= 0) {
break;
}
jfr.append(buf, rlen);
jfr.insert(jfr.end(), buf, buf+rlen);
}

nlohmann::json j;
Expand All @@ -527,39 +596,71 @@ class Recording {
j["language"] = "jvm";
std::stringstream ss;
// "process_id:31145,service:zy-profiling-test,profiler_version:0.102.0~b67f6e3380,host:zydeMacBook-Air.local,runtime-id:06dddda1-957b-4619-97cb-1a78fc7e3f07,language:jvm,env:test,version:v1.2"
char hostname[512] = {};
char hostname[512] = {0};
gethostname(hostname, sizeof(hostname));
ss << "process_id:" << getpid() << ",host:" << hostname << ",profiler_version:" << PROFILER_VERSION << ",service:" << _global_args._dd_service;
ss << ",env:" << _global_args._dd_env << ",version:" << _global_args._dd_version << ",language:jvm";
if (_global_args._dd_tags != NULL && _global_args._dd_tags[0] != 0) {
ss << "," << _global_args._dd_tags;
char *original_tags = (char *)malloc(strlen(_global_args._dd_tags) / 2 + 1);
hex_to_string(_global_args._dd_tags, original_tags);
ss << "," << original_tags;
free(original_tags);
}
j["tags_profiler"] = ss.str();
time_t start_time = Profiler::instance()->start_time();
time_t stop_time = Profiler::instance()->stop_time();
char buffer[40] = {};
char buffer[40] = {0};
strftime(buffer, sizeof(buffer), "%Y-%m-%dT%H:%M:%SZ", gmtime(&start_time));
j["start"] = std::string(buffer);

strftime(buffer, sizeof(buffer), "%Y-%m-%dT%H:%M:%SZ", gmtime(&stop_time));
j["end"] = std::string(buffer);
std::string json = nlohmann::to_string(j);

httplib::MultipartFormDataItems form = {
{"event", j.dump(), "event.json", "application/octet-stream"},
{"main", jfr, "main.jfr", "application/octet-stream"},
};
httplib::Client* cli = _global_args.httpClient();
httplib::Result result;
for (int i = 0; i < 3; i++) {
result = cli->Post("/profiling/v1/input", form);
if (result == NULL) {
Log::warn("unable to do http request: %s", httplib::to_string(result.error()).c_str());
continue;
}
if (result->status / 100 == 2) {
break;
CURL* curl = _global_args.httpClient();
if (curl != NULL) {
std::vector<unsigned char> gzipOut;
if (!gzipCompress(jfr, gzipOut)) {
Log::error("failed to gzip compress jfr file");
} else {
Log::warn("unable to send profile file by http, status: %d, response: %s", result->status, result->body.c_str());
for (int i = 0; i < 3; i++) {
//curl_easy_reset(curl);
curl_mime *multipart = curl_mime_init(curl);
curl_mimepart *part = curl_mime_addpart(multipart);
curl_mime_name(part, "event");
curl_mime_filename(part, "event.json"); // set the form filename
curl_mime_type(part, "application/json");
curl_mime_data(part, json.data(), json.size());

part = curl_mime_addpart(multipart);
curl_mime_name(part, "main");
curl_mime_filename(part, "main.jfr");
curl_mime_type(part, "application/octet-stream");
curl_mime_data(part, reinterpret_cast<const char *>(gzipOut.data()), gzipOut.size());

curl_easy_setopt(curl, CURLOPT_MIMEPOST, multipart);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curlWriteCB);
std::string resp;
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &resp);

const CURLcode res = curl_easy_perform(curl);
// 清理 multipart 表单数据
curl_mime_free(multipart);

// 检查请求是否成功
if(res != CURLE_OK) {
Log::warn("unable to do http request, errcode: %d, errmsg: %s", res, curl_easy_strerror(res));
continue;
} else {
long httpCode;
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode);
if (httpCode / 100 == 2) {
break;
} else {
Log::warn("unable to send profile file by http, status code: %ld, response: %s", httpCode, resp.c_str());
}
}
}
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/hooks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <signal.h>
#include <stdlib.h>
#include <string.h>
#include <curl/curl.h>
#include "hooks.h"
#include "asprof.h"
#include "cpuEngine.h"
Expand Down Expand Up @@ -148,6 +149,7 @@ bool Hooks::init(bool attach) {
return false;
}

curl_global_init(CURL_GLOBAL_ALL);
Profiler::instance()->updateSymbols(false);
Profiler::setupSignalHandlers();

Expand All @@ -165,6 +167,7 @@ bool Hooks::init(bool attach) {

void Hooks::shutdown() {
Profiler::instance()->shutdown(_global_args);
curl_global_cleanup();
}

void Hooks::patchLibraries() {
Expand Down
Loading

0 comments on commit 44184cf

Please sign in to comment.