diff --git a/cmake/FindLDMS.cmake b/cmake/FindLDMS.cmake index 01b1e02e..37e559ad 100644 --- a/cmake/FindLDMS.cmake +++ b/cmake/FindLDMS.cmake @@ -17,23 +17,38 @@ # LDMS_LIBRARIES The LDMS library # LDMS_INCLUDE_DIRS The location of LDMS headers +# Set up search paths based on LDMS_PREFIX +set(LDMS_SEARCH_PATHS /usr/lib64 /usr/lib /opt/ovis/lib) +set(LDMS_INCLUDE_SEARCH_PATHS /usr/include /usr/local/include /opt/ovis/include) + +if(LDMS_PREFIX) + list(INSERT LDMS_SEARCH_PATHS 0 "${LDMS_PREFIX}/lib") + list(INSERT LDMS_INCLUDE_SEARCH_PATHS 0 "${LDMS_PREFIX}/include") +endif() + find_library(LDMS_LIBLDMS NAMES ldms - PATHS /usr/lib64 /usr/lib /opt/ovis/lib + PATHS ${LDMS_SEARCH_PATHS} ) find_library(LDMS_LIBSTREAM NAMES ldmsd_stream - PATHS /usr/lib64 /usr/lib /opt/ovis/lib + PATHS ${LDMS_SEARCH_PATHS} ) find_path(LDMS_INCLUDE_DIRS - NAMES "ldms.h" "ldmsd_stream.h" - PATH_SUFFIXES "ldms" + NAMES "ldms/ldms.h" "ldms/ldmsd_stream.h" + PATHS ${LDMS_INCLUDE_SEARCH_PATHS} ) -message(STATUS "libldms.so => ${LDMS_LIBRARIES}") -message(STATUS "ldmsd_stream.h => ${LDMS_INCLUDE_DIRS}") -message(STATUS "ldms.h => ${LDMS_INCLUDE_DIRS}") + +message(STATUS "libldms.so => ${LDMS_LIBLDMS}") +message(STATUS "libldmsd_stream.so => ${LDMS_LIBSTREAM}") +message(STATUS "LDMS include dir => ${LDMS_INCLUDE_DIRS}") + +# Set LDMS_LIBRARIES to include both libraries +if(LDMS_LIBLDMS AND LDMS_LIBSTREAM) + set(LDMS_LIBRARIES ${LDMS_LIBLDMS} ${LDMS_LIBSTREAM}) +endif() include(FindPackageHandleStandardArgs) find_package_handle_standard_args(LDMS DEFAULT_MSG diff --git a/src/services/ldms/CMakeLists.txt b/src/services/ldms/CMakeLists.txt index 1de769ef..ac153ab6 100644 --- a/src/services/ldms/CMakeLists.txt +++ b/src/services/ldms/CMakeLists.txt @@ -1,11 +1,11 @@ -include_directories(${LDMS_INCLUDE_DIRS}) - set(CALIPER_LDMS_SOURCES LdmsForwarder.cpp) add_library(caliper-ldms OBJECT ${CALIPER_LDMS_SOURCES}) +target_include_directories(caliper-ldms PRIVATE ${LDMS_INCLUDE_DIRS}) + +# Add compiler flags to handle LDMS header compatibility issues +target_compile_options(caliper-ldms PRIVATE -fpermissive) add_service_objlib("caliper-ldms") add_caliper_service("ldms CALIPER_HAVE_LDMS") - - diff --git a/src/services/ldms/LdmsForwarder.cpp b/src/services/ldms/LdmsForwarder.cpp index 7a2353fd..3d2cbded 100644 --- a/src/services/ldms/LdmsForwarder.cpp +++ b/src/services/ldms/LdmsForwarder.cpp @@ -47,11 +47,11 @@ static void event_cb(ldms_t x, ldms_xprt_event_t e, void* cb_arg) conn_status = 0; break; case LDMS_XPRT_EVENT_REJECTED: - ldms_xprt_put(x); + ldms_xprt_put(x, "tst"); conn_status = ECONNREFUSED; break; case LDMS_XPRT_EVENT_DISCONNECTED: - ldms_xprt_put(x); + ldms_xprt_put(x, "tst"); conn_status = ENOTCONN; break; case LDMS_XPRT_EVENT_ERROR: @@ -89,7 +89,7 @@ ldms_t setup_connection(const char* xprt, const char* host, const char* port, co ts.tv_nsec = 0; } - ldms_g = ldms_xprt_new_with_auth(xprt, NULL, auth, NULL); + ldms_g = ldms_xprt_new_with_auth(xprt, auth, NULL); if (!ldms_g) { printf("Error %d creating the '%s' transport\n", errno, xprt); return NULL; @@ -118,20 +118,25 @@ void caliper_ldms_connector_initialize() const char* env_ldms_auth = getenv("CALIPER_LDMS_AUTH"); /* Check/set LDMS transport type */ - if (!env_ldms_xprt || !env_ldms_host || !env_ldms_port || !env_ldms_auth) { - Log(1).stream() << "Either the transport, host, port or authentication is not given. Setting to default.\n"; + if (env_ldms_xprt == NULL) + env_ldms_xprt = "sock"; - if (env_ldms_xprt == NULL) - env_ldms_xprt = "sock"; + if (env_ldms_host == NULL) + env_ldms_host = "localhost"; - if (env_ldms_host == NULL) - env_ldms_host = "localhost"; + if (env_ldms_port == NULL) + env_ldms_port = "412"; - if (env_ldms_port == NULL) - env_ldms_port = "412"; + if (env_ldms_auth == NULL) + env_ldms_auth = "munge"; - if (env_ldms_auth == NULL) - env_ldms_auth = "munge"; + // Print configuration for debugging + if (getenv("CALIPER_LDMS_VERBOSE")) { + Log(1).stream() << "LDMS Configuration:" << std::endl; + Log(1).stream() << " Transport: " << env_ldms_xprt << std::endl; + Log(1).stream() << " Host: " << env_ldms_host << std::endl; + Log(1).stream() << " Port: " << env_ldms_port << std::endl; + Log(1).stream() << " Auth: " << env_ldms_auth << std::endl; } pthread_mutex_lock(&ln_lock); @@ -149,10 +154,9 @@ void caliper_ldms_connector_initialize() return; } -void write_ldms_record(int mpi_rank, RegionProfile& profile) +void write_ldms_record(int mpi_rank, RegionProfile& profile, ldms_t ldms_conn) { - caliper_ldms_connector_initialize(); - + // Connection is now passed as parameter, initialized once in post_init() std::map region_times; double total_time = 0; @@ -187,7 +191,7 @@ void write_ldms_record(int mpi_rank, RegionProfile& profile) buffer, buffer_size, "{ \"timestamp\": %f, \"jobid\" : %d, \"rank\" : %d, \"procid\" : %s, \"nodelist\" : %s, " - "\"caliper-perf-data\", \"duration\": %f, \"path\": \"%s\"} \n", + "\"stream\": \"caliper-perf-data\", \"duration\": %f, \"path\": \"%s\"} \n", unix_ts, env_ldms_jobid, mpi_rank, @@ -201,7 +205,7 @@ void write_ldms_record(int mpi_rank, RegionProfile& profile) buffer, buffer_size, "{ \"timestamp\": %f, \"jobid\" : %d, \"rank\": %d, \"procid\" : %s, \"nodelist\" : %s, " - "\"caliper-perf-data\", \"duration\": %f, \"path\": \"%s\"} \n", + "\"stream\": \"caliper-perf-data\", \"duration\": %f, \"path\": \"%s\"} \n", unix_ts, env_ldms_jobid, 0, @@ -215,7 +219,7 @@ void write_ldms_record(int mpi_rank, RegionProfile& profile) if (env_ldms_caliper_verbose > 0) puts(buffer); - int rc = ldmsd_stream_publish(ldms_cali, "caliper-perf-data", LDMSD_STREAM_JSON, buffer, strlen(buffer) + 1); + int rc = ldmsd_stream_publish(ldms_conn, "caliper-perf-data", LDMSD_STREAM_JSON, buffer, strlen(buffer) + 1); if (rc) Log(0).stream() << "Error " << rc << " publishing data.\n"; @@ -227,20 +231,47 @@ void write_ldms_record(int mpi_rank, RegionProfile& profile) class LdmsForwarder { RegionProfile profile; + ldms_t ldms_connection; // Store LDMS connection handle void snapshot(Caliper* c) { Entry e = c->get(c->get_attribute("mpi.rank")); int rank = e.empty() ? -1 : e.value().to_int(); - write_ldms_record(rank, profile); + // Only publish if connection is available + if (ldms_connection) { + write_ldms_record(rank, profile, ldms_connection); + } else { + Log(0).stream() << "LDMS connection not available, skipping publish\n"; + } profile.clear(); // reset profile - skip to create a cumulative profile } - void post_init(Caliper* c, Channel* channel) { profile.start(); } + void post_init(Caliper* c, Channel* channel) { + profile.start(); + + // Initialize LDMS connection once per instance + caliper_ldms_connector_initialize(); + ldms_connection = ldms_cali; // Store global connection in instance + + if (conn_status != 0) { + Log(0).stream() << channel->name() << ": Failed to initialize LDMS connection: " << conn_status << "\n"; + ldms_connection = nullptr; + } else { + Log(1).stream() << channel->name() << ": LDMS connection established\n"; + } + } + + void cleanup() { + if (ldms_connection) { + ldms_xprt_close(ldms_connection); + ldms_connection = nullptr; + Log(1).stream() << "LDMS connection closed\n"; + } + } - LdmsForwarder() {} + LdmsForwarder() : ldms_connection(nullptr) {} public: @@ -258,7 +289,10 @@ class LdmsForwarder channel->events().snapshot.connect([instance](Caliper* c, SnapshotView, SnapshotBuilder&) { instance->snapshot(c); }); - channel->events().finish_evt.connect([instance](Caliper* c, Channel* chn) { delete instance; }); + channel->events().finish_evt.connect([instance](Caliper* c, Channel* chn) { + instance->cleanup(); + delete instance; + }); Log(1).stream() << channel->name() << "Initialized LDMS forwarder\n"; }