Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions cmake/FindLDMS.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,38 @@
# LDMS_LIBRARIES The LDMS library
# LDMS_INCLUDE_DIRS The location of LDMS headers

# Locate the two LDMS libraries (libldms, libldmsd_stream) and the directory
# that contains the "ldms/" header folder.
#
# Fallback locations that are probed when nothing better is found.
set(LDMS_SEARCH_PATHS /usr/lib64 /usr/lib /opt/ovis/lib)
set(LDMS_INCLUDE_SEARCH_PATHS /usr/include /usr/local/include /opt/ovis/include)

# A user-supplied LDMS_PREFIX is handed to the find commands as HINTS rather
# than PATHS: CMake consults HINTS before the system default locations and
# PATHS only after them, so an explicitly requested installation is not
# shadowed by a system-wide copy.
set(LDMS_LIBRARY_HINTS "")
set(LDMS_INCLUDE_HINTS "")
if(LDMS_PREFIX)
  list(APPEND LDMS_LIBRARY_HINTS "${LDMS_PREFIX}/lib64" "${LDMS_PREFIX}/lib")
  list(APPEND LDMS_INCLUDE_HINTS "${LDMS_PREFIX}/include")
endif()

find_library(LDMS_LIBLDMS
  NAMES ldms
  HINTS ${LDMS_LIBRARY_HINTS}
  PATHS ${LDMS_SEARCH_PATHS}
)

find_library(LDMS_LIBSTREAM
  NAMES ldmsd_stream
  HINTS ${LDMS_LIBRARY_HINTS}
  PATHS ${LDMS_SEARCH_PATHS}
)

# Headers are looked up with their "ldms/" prefix, so LDMS_INCLUDE_DIRS is the
# directory that contains the ldms/ folder (not the folder itself).
find_path(LDMS_INCLUDE_DIRS
  NAMES "ldms/ldms.h" "ldms/ldmsd_stream.h"
  HINTS ${LDMS_INCLUDE_HINTS}
  PATHS ${LDMS_INCLUDE_SEARCH_PATHS}
)

message(STATUS "libldms.so => ${LDMS_LIBLDMS}")
message(STATUS "libldmsd_stream.so => ${LDMS_LIBSTREAM}")
message(STATUS "LDMS include dir => ${LDMS_INCLUDE_DIRS}")

# Set LDMS_LIBRARIES to include both libraries; it stays unset unless both
# were found.
if(LDMS_LIBLDMS AND LDMS_LIBSTREAM)
  set(LDMS_LIBRARIES ${LDMS_LIBLDMS} ${LDMS_LIBSTREAM})
endif()

include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(LDMS DEFAULT_MSG
Expand Down
8 changes: 4 additions & 4 deletions src/services/ldms/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
set(CALIPER_LDMS_SOURCES
  LdmsForwarder.cpp)

add_library(caliper-ldms OBJECT ${CALIPER_LDMS_SOURCES})

# The LDMS headers are needed only to compile this object library, so they are
# attached to the target instead of to the whole directory.
target_include_directories(caliper-ldms PRIVATE ${LDMS_INCLUDE_DIRS})

# Add compiler flags to handle LDMS header compatibility issues
target_compile_options(caliper-ldms PRIVATE -fpermissive)

add_service_objlib("caliper-ldms")
add_caliper_service("ldms CALIPER_HAVE_LDMS")


80 changes: 57 additions & 23 deletions src/services/ldms/LdmsForwarder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ static void event_cb(ldms_t x, ldms_xprt_event_t e, void* cb_arg)
conn_status = 0;
break;
case LDMS_XPRT_EVENT_REJECTED:
ldms_xprt_put(x);
ldms_xprt_put(x, "tst");
conn_status = ECONNREFUSED;
break;
case LDMS_XPRT_EVENT_DISCONNECTED:
ldms_xprt_put(x);
ldms_xprt_put(x, "tst");
conn_status = ENOTCONN;
break;
case LDMS_XPRT_EVENT_ERROR:
Expand Down Expand Up @@ -89,7 +89,7 @@ ldms_t setup_connection(const char* xprt, const char* host, const char* port, co
ts.tv_nsec = 0;
}

ldms_g = ldms_xprt_new_with_auth(xprt, NULL, auth, NULL);
ldms_g = ldms_xprt_new_with_auth(xprt, auth, NULL);
if (!ldms_g) {
printf("Error %d creating the '%s' transport\n", errno, xprt);
return NULL;
Expand Down Expand Up @@ -118,20 +118,25 @@ void caliper_ldms_connector_initialize()
const char* env_ldms_auth = getenv("CALIPER_LDMS_AUTH");

/* Check/set LDMS transport type */
if (!env_ldms_xprt || !env_ldms_host || !env_ldms_port || !env_ldms_auth) {
Log(1).stream() << "Either the transport, host, port or authentication is not given. Setting to default.\n";
if (env_ldms_xprt == NULL)
env_ldms_xprt = "sock";

if (env_ldms_xprt == NULL)
env_ldms_xprt = "sock";
if (env_ldms_host == NULL)
env_ldms_host = "localhost";

if (env_ldms_host == NULL)
env_ldms_host = "localhost";
if (env_ldms_port == NULL)
env_ldms_port = "412";

if (env_ldms_port == NULL)
env_ldms_port = "412";
if (env_ldms_auth == NULL)
env_ldms_auth = "munge";

if (env_ldms_auth == NULL)
env_ldms_auth = "munge";
// Print configuration for debugging
if (getenv("CALIPER_LDMS_VERBOSE")) {
Log(1).stream() << "LDMS Configuration:" << std::endl;
Log(1).stream() << " Transport: " << env_ldms_xprt << std::endl;
Log(1).stream() << " Host: " << env_ldms_host << std::endl;
Log(1).stream() << " Port: " << env_ldms_port << std::endl;
Log(1).stream() << " Auth: " << env_ldms_auth << std::endl;
}

pthread_mutex_lock(&ln_lock);
Expand All @@ -149,10 +154,9 @@ void caliper_ldms_connector_initialize()
return;
}

void write_ldms_record(int mpi_rank, RegionProfile& profile)
void write_ldms_record(int mpi_rank, RegionProfile& profile, ldms_t ldms_conn)
{
caliper_ldms_connector_initialize();

// Connection is now passed as parameter, initialized once in post_init()
std::map<std::string, double> region_times;
double total_time = 0;

Expand Down Expand Up @@ -187,7 +191,7 @@ void write_ldms_record(int mpi_rank, RegionProfile& profile)
buffer,
buffer_size,
"{ \"timestamp\": %f, \"jobid\" : %d, \"rank\" : %d, \"procid\" : %s, \"nodelist\" : %s, "
"\"caliper-perf-data\", \"duration\": %f, \"path\": \"%s\"} \n",
"\"stream\": \"caliper-perf-data\", \"duration\": %f, \"path\": \"%s\"} \n",
unix_ts,
env_ldms_jobid,
mpi_rank,
Expand All @@ -201,7 +205,7 @@ void write_ldms_record(int mpi_rank, RegionProfile& profile)
buffer,
buffer_size,
"{ \"timestamp\": %f, \"jobid\" : %d, \"rank\": %d, \"procid\" : %s, \"nodelist\" : %s, "
"\"caliper-perf-data\", \"duration\": %f, \"path\": \"%s\"} \n",
"\"stream\": \"caliper-perf-data\", \"duration\": %f, \"path\": \"%s\"} \n",
unix_ts,
env_ldms_jobid,
0,
Expand All @@ -215,7 +219,7 @@ void write_ldms_record(int mpi_rank, RegionProfile& profile)
if (env_ldms_caliper_verbose > 0)
puts(buffer);

int rc = ldmsd_stream_publish(ldms_cali, "caliper-perf-data", LDMSD_STREAM_JSON, buffer, strlen(buffer) + 1);
int rc = ldmsd_stream_publish(ldms_conn, "caliper-perf-data", LDMSD_STREAM_JSON, buffer, strlen(buffer) + 1);

if (rc)
Log(0).stream() << "Error " << rc << " publishing data.\n";
Expand All @@ -227,20 +231,47 @@ void write_ldms_record(int mpi_rank, RegionProfile& profile)
class LdmsForwarder
{
RegionProfile profile;
ldms_t ldms_connection; // Store LDMS connection handle

// Snapshot handler: looks up the "mpi.rank" attribute (falling back to -1
// when it has no value), forwards the current region profile through the
// stored LDMS connection, and then clears the profile.
void snapshot(Caliper* c)
{
    Entry e    = c->get(c->get_attribute("mpi.rank"));
    int   rank = e.empty() ? -1 : e.value().to_int();

    // Only publish if connection is available. The record is written exactly
    // once, and only through the guarded path.
    if (ldms_connection) {
        write_ldms_record(rank, profile, ldms_connection);
    } else {
        Log(0).stream() << "LDMS connection not available, skipping publish\n";
    }

    profile.clear(); // reset profile - skip to create a cumulative profile
}

// Post-initialization handler: starts the region profile and sets up the
// LDMS connection once for this instance. The connection handle is kept only
// when conn_status reports success (0); otherwise it is left null so later
// snapshots skip publishing.
void post_init(Caliper* c, Channel* channel)
{
    profile.start();

    // Initialize LDMS connection once per instance
    caliper_ldms_connector_initialize();

    if (conn_status != 0) {
        Log(0).stream() << channel->name() << ": Failed to initialize LDMS connection: " << conn_status << "\n";
        ldms_connection = nullptr;
    } else {
        ldms_connection = ldms_cali; // Store global connection in instance
        Log(1).stream() << channel->name() << ": LDMS connection established\n";
    }
}

// Closes the stored LDMS connection, if there is one, and forgets the handle
// so that a second call is a no-op.
// NOTE(review): the handle is copied from the global ldms_cali in post_init();
// confirm that nothing else keeps using that global after it is closed here.
void cleanup()
{
    if (!ldms_connection)
        return;

    ldms_xprt_close(ldms_connection);
    ldms_connection = nullptr;
    Log(1).stream() << "LDMS connection closed\n";
}

// Single default constructor: starts without an LDMS connection; the handle
// is assigned later in post_init().
LdmsForwarder() : ldms_connection(nullptr) {}

public:

Expand All @@ -258,7 +289,10 @@ class LdmsForwarder
channel->events().snapshot.connect([instance](Caliper* c, SnapshotView, SnapshotBuilder&) {
instance->snapshot(c);
});
channel->events().finish_evt.connect([instance](Caliper* c, Channel* chn) { delete instance; });
channel->events().finish_evt.connect([instance](Caliper* c, Channel* chn) {
instance->cleanup();
delete instance;
});

Log(1).stream() << channel->name() << "Initialized LDMS forwarder\n";
}
Expand Down