diff --git a/VERSION.in b/VERSION.in index d40acaaea..e71519696 100644 --- a/VERSION.in +++ b/VERSION.in @@ -1 +1 @@ -1.15 +1.16 diff --git a/cuebot/src/main/java/com/imageworks/spcue/FrameDetail.java b/cuebot/src/main/java/com/imageworks/spcue/FrameDetail.java index 7fc4bc48a..eebd9d1b5 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/FrameDetail.java +++ b/cuebot/src/main/java/com/imageworks/spcue/FrameDetail.java @@ -27,6 +27,7 @@ public class FrameDetail extends FrameEntity implements FrameInterface { public int retryCount; public int exitStatus; public long maxRss; + public long maxPss; public int dispatchOrder; public String lastResource; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/FrameDao.java b/cuebot/src/main/java/com/imageworks/spcue/dao/FrameDao.java index bad12e35a..f907536b2 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/FrameDao.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/FrameDao.java @@ -326,10 +326,13 @@ public interface FrameDao { * @param f * @param maxRss * @param rss + * @param maxPss + * @param pss * @param lluTime * @throws FrameReservationException if the frame is locked by another thread. */ - void updateFrameMemoryUsageAndLluTime(FrameInterface f, long maxRss, long rss, long lluTime); + void updateFrameMemoryUsageAndLluTime(FrameInterface f, long maxRss, long rss, long maxPss, + long pss, long lluTime); /** * Attempt to put a exclusive row lock on the given frame. The frame must be in the specified diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/JobDao.java b/cuebot/src/main/java/com/imageworks/spcue/dao/JobDao.java index e99ee92fa..37736cbb4 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/JobDao.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/JobDao.java @@ -365,6 +365,15 @@ public interface JobDao { */ void updateMaxRSS(JobInterface job, long maxRss); + /** + * Update jobs max PSS. 
Only updates if the passed in value is greater than the current value of + * int_max_pss + * + * @param job + * @param maxPss + */ + void updateMaxPSS(JobInterface job, long maxPss); + /** * Inserts a key/value pair into the jobs env table * diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/LayerDao.java b/cuebot/src/main/java/com/imageworks/spcue/dao/LayerDao.java index 5d5433ada..847ae231e 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/LayerDao.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/LayerDao.java @@ -195,6 +195,16 @@ public interface LayerDao { */ void updateLayerMaxRSS(LayerInterface layer, long val, boolean force); + /** + * Updates the layer's maximum PSS value. If force is true, the max PSS is updated no matter + * what the current value is. If force is false, the value is only updated if val is greater + * than the existing value. + * + * @param layer + * @param val + */ + void updateLayerMaxPSS(LayerInterface layer, long val, boolean force); + /** * Increases the value of the minimum memory when the supplied value is larger than the current * value diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/ProcDao.java b/cuebot/src/main/java/com/imageworks/spcue/dao/ProcDao.java index be1fa3bb6..bf4ad01b4 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/ProcDao.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/ProcDao.java @@ -133,8 +133,9 @@ public interface ProcDao { * @param usedKb * @param maxKb */ - void updateProcMemoryUsage(FrameInterface f, long rss, long maxRss, long vsize, long maxVsize, - long usedGpuMemory, long maxUsedGpuMemory, long usedSwapMemory, byte[] children); + void updateProcMemoryUsage(FrameInterface f, long rss, long maxRss, long pss, long maxPss, + long vsize, long maxVsize, long usedGpuMemory, long maxUsedGpuMemory, + long usedSwapMemory, byte[] children); /** * get aq virual proc from its unique id diff --git 
a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java index 2ccaef16c..62e14746f 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java @@ -293,6 +293,7 @@ public FrameDetail mapRow(ResultSet rs, int rowNum) throws SQLException { frame.layerId = rs.getString("pk_layer"); frame.showId = rs.getString("pk_show"); frame.maxRss = rs.getLong("int_mem_max_used"); + frame.maxPss = rs.getLong("int_pss_max_used"); frame.name = rs.getString("str_name"); frame.number = rs.getInt("int_number"); frame.dispatchOrder = rs.getInt("int_dispatch_order"); @@ -709,14 +710,15 @@ public ResourceUsage getResourceUsage(FrameInterface f) { RESOURCE_USAGE_MAPPER, f.getFrameId()); } - private static final String UPDATE_FRAME_MEMORY_USAGE_AND_LLU_TIME = "UPDATE " + "frame " - + "SET " + "ts_updated = current_timestamp," + "int_mem_max_used = ?," - + "int_mem_used = ?," + "ts_llu = ? " + "WHERE " + "pk_frame = ? "; + private static final String UPDATE_FRAME_MEMORY_USAGE_AND_LLU_TIME = + "UPDATE " + "frame " + "SET " + "ts_updated = current_timestamp," + + "int_mem_max_used = ?," + "int_mem_used = ?," + "int_pss_max_used = ?," + + "int_pss_used = ?," + "ts_llu = ? " + "WHERE " + "pk_frame = ? 
"; @Override public void updateFrameMemoryUsageAndLluTime(FrameInterface f, long maxRss, long rss, - long lluTime) { - getJdbcTemplate().update(UPDATE_FRAME_MEMORY_USAGE_AND_LLU_TIME, maxRss, rss, + long maxPss, long pss, long lluTime) { + getJdbcTemplate().update(UPDATE_FRAME_MEMORY_USAGE_AND_LLU_TIME, maxRss, rss, maxPss, pss, new Timestamp(lluTime * 1000l), f.getFrameId()); } diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/JobDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/JobDaoJdbc.java index 34962ca99..4c8520c7a 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/JobDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/JobDaoJdbc.java @@ -361,6 +361,12 @@ public void updateMaxRSS(JobInterface job, long value) { job.getJobId(), value); } + public void updateMaxPSS(JobInterface job, long value) { + getJdbcTemplate().update( + "UPDATE job_mem SET int_max_pss=? WHERE pk_job=? AND int_max_pss < ?", value, + job.getJobId(), value); + } + private static final String UPDATE_JOB_FINISHED = "UPDATE " + "job " + "SET " + "str_state = ?, " + "str_visible_name = NULL, " + "ts_stopped = current_timestamp " + "WHERE " + "str_state = 'PENDING' " + "AND " + "pk_job = ?"; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/LayerDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/LayerDaoJdbc.java index d9ef93e2b..910999a48 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/LayerDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/LayerDaoJdbc.java @@ -357,6 +357,22 @@ public void updateLayerMaxRSS(LayerInterface layer, long val, boolean force) { getJdbcTemplate().update(sb.toString(), options); } + private static final String UPDATE_LAYER_MAX_PSS = + "UPDATE " + "layer_mem " + "SET " + "int_max_pss = ? 
" + "WHERE " + "pk_layer = ?"; + + @Override + public void updateLayerMaxPSS(LayerInterface layer, long val, boolean force) { + StringBuilder sb = new StringBuilder(UPDATE_LAYER_MAX_PSS); + Object[] options; + if (!force) { + options = new Object[] {val, layer.getLayerId(), val}; + sb.append(" AND int_max_pss < ?"); + } else { + options = new Object[] {val, layer.getLayerId()}; + } + getJdbcTemplate().update(sb.toString(), options); + } + @Override public void updateLayerTags(LayerInterface layer, Set tags) { if (tags.size() == 0) { diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/NestedWhiteboardDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/NestedWhiteboardDaoJdbc.java index 45767b4e2..52f7afe19 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/NestedWhiteboardDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/NestedWhiteboardDaoJdbc.java @@ -86,8 +86,8 @@ public CachedJobWhiteboardMapper(NestedJobWhiteboardMapper result) { + "(job_resource.int_gpus + job_resource.int_local_gpus) AS int_gpus, " + "job_resource.int_min_cores, " + "job_resource.int_min_gpus, " + "job_resource.int_max_cores, " + "job_resource.int_max_gpus, " - + "job_mem.int_max_rss " + "FROM " + "show, " + "dept, " + "folder_level, " - + "folder_resource, " + "folder " + "LEFT JOIN " + "job " + "ON " + + "job_mem.int_max_rss, " + "job_mem.int_max_pss " + "FROM " + "show, " + "dept, " + + "folder_level, " + "folder_resource, " + "folder " + "LEFT JOIN " + "job " + "ON " + " (folder.pk_folder = job.pk_folder AND job.str_state='PENDING') " + "LEFT JOIN " + "facility " + "ON " + "(job.pk_facility = facility.pk_facility) " + "LEFT JOIN " + "job_stat " + "ON " + "(job.pk_job = job_stat.pk_job) " + "LEFT JOIN " @@ -290,6 +290,7 @@ private static final NestedJob mapResultSetToJob(ResultSet rs) throws SQLExcepti + "host_stat.int_load, " + "proc.pk_proc, " + "proc.int_cores_reserved AS proc_cores, " + "proc.int_gpus_reserved 
AS proc_gpus, " + "proc.int_mem_reserved AS proc_memory, " + "proc.int_mem_used AS used_memory, " + "proc.int_mem_max_used AS max_memory, " + + "proc.int_pss_used AS used_pss, " + "proc.int_pss_max_used AS max_pss, " + "proc.int_gpu_mem_reserved AS proc_gpu_memory, " + "proc.ts_ping, " + "proc.ts_booked, " + "proc.ts_dispatched, " + "proc.b_unbooked, " + "redirect.str_name AS str_redirect, " + "job.str_name AS job_name, " @@ -390,6 +391,8 @@ public NestedHost mapRow(ResultSet rs, int row) throws SQLException { .setReservedMemory(rs.getLong("proc_memory")) .setReservedGpuMemory(rs.getLong("proc_gpu_memory")) .setUsedMemory(rs.getLong("used_memory")) + .setUsedPss(rs.getLong("used_pss")) + .setMaxPss(rs.getLong("max_pss")) .setFrameName(rs.getString("frame_name")) .setJobName(rs.getString("job_name")) .setShowName(rs.getString("show_name")) diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/ProcDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/ProcDaoJdbc.java index a35f75c7b..a1874a01c 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/ProcDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/ProcDaoJdbc.java @@ -171,16 +171,16 @@ public boolean clearVirtualProcAssignment(FrameInterface frame) { frame.getFrameId()) == 1; } - private static final String UPDATE_PROC_MEMORY_USAGE = - "UPDATE " + "proc " + "SET " + "int_mem_used = ?, " + "int_mem_max_used = ?," - + "int_virt_used = ?, " + "int_virt_max_used = ?, " + "int_gpu_mem_used = ?, " - + "int_gpu_mem_max_used = ?, " + "int_swap_used = ?, " + "bytea_children = ?, " - + "ts_ping = current_timestamp " + "WHERE " + "pk_frame = ?"; + private static final String UPDATE_PROC_MEMORY_USAGE = "UPDATE " + "proc " + "SET " + + "int_mem_used = ?, " + "int_mem_max_used = ?," + "int_pss_used = ?, " + + "int_pss_max_used = ?, " + "int_virt_used = ?, " + "int_virt_max_used = ?, " + + "int_gpu_mem_used = ?, " + "int_gpu_mem_max_used = ?, " + "int_swap_used = 
?, " + + "bytea_children = ?, " + "ts_ping = current_timestamp " + "WHERE " + "pk_frame = ?"; @Override - public void updateProcMemoryUsage(FrameInterface f, long rss, long maxRss, long vss, - long maxVss, long usedGpuMemory, long maxUsedGpuMemory, long usedSwapMemory, - byte[] children) { + public void updateProcMemoryUsage(FrameInterface f, long rss, long maxRss, long pss, + long maxPss, long vss, long maxVss, long usedGpuMemory, long maxUsedGpuMemory, + long usedSwapMemory, byte[] children) { /* * This method is going to repeat for a proc every 1 minute, so if the proc is being touched * by another thread, then return quietly without updating memory usage. @@ -202,13 +202,15 @@ public PreparedStatement createPreparedStatement(Connection conn) conn.prepareStatement(UPDATE_PROC_MEMORY_USAGE); updateProc.setLong(1, rss); updateProc.setLong(2, maxRss); - updateProc.setLong(3, vss); - updateProc.setLong(4, maxVss); - updateProc.setLong(5, usedGpuMemory); - updateProc.setLong(6, maxUsedGpuMemory); - updateProc.setLong(7, usedSwapMemory); - updateProc.setBytes(8, children); - updateProc.setString(9, f.getFrameId()); + updateProc.setLong(3, pss); + updateProc.setLong(4, maxPss); + updateProc.setLong(5, vss); + updateProc.setLong(6, maxVss); + updateProc.setLong(7, usedGpuMemory); + updateProc.setLong(8, maxUsedGpuMemory); + updateProc.setLong(9, usedSwapMemory); + updateProc.setBytes(10, children); + updateProc.setString(11, f.getFrameId()); return updateProc; } }); diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/WhiteboardDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/WhiteboardDaoJdbc.java index 9e7b8ee95..d26f44360 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/WhiteboardDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/WhiteboardDaoJdbc.java @@ -1124,7 +1124,7 @@ public static JobStats mapJobStats(ResultSet rs) throws SQLException { JobStats.Builder statsBuilder = 
JobStats.newBuilder() .setReservedCores(Convert.coreUnitsToCores(rs.getInt("int_cores"))) .setReservedGpus(rs.getInt("int_gpus")).setMaxRss(rs.getLong("int_max_rss")) - .setTotalFrames(rs.getInt("int_frame_count")) + .setMaxPss(rs.getLong("int_max_pss")).setTotalFrames(rs.getInt("int_frame_count")) .setTotalLayers(rs.getInt("int_layer_count")) .setWaitingFrames(rs.getInt("int_waiting_count")) .setRunningFrames(rs.getInt("int_running_count")) @@ -1188,6 +1188,7 @@ public Layer mapRow(ResultSet rs, int rowNum) throws SQLException { LayerStats.Builder statsBuilder = LayerStats.newBuilder() .setReservedCores(Convert.coreUnitsToCores(rs.getInt("int_cores"))) .setReservedGpus(rs.getInt("int_gpus")).setMaxRss(rs.getLong("int_max_rss")) + .setMaxPss(rs.getLong("int_max_pss")) .setTotalFrames(rs.getInt("int_total_count")) .setWaitingFrames(rs.getInt("int_waiting_count")) .setRunningFrames(rs.getInt("int_running_count")) @@ -1253,6 +1254,7 @@ public UpdatedFrame mapRow(ResultSet rs, int rowNum) throws SQLException { UpdatedFrame.newBuilder().setId(SqlUtil.getString(rs, "pk_frame")) .setExitStatus(rs.getInt("int_exit_status")) .setMaxRss(rs.getInt("int_mem_max_used")) + .setMaxPss(rs.getLong("int_pss_max_used")) .setRetryCount(rs.getInt("int_retries")) .setState( FrameState.valueOf(SqlUtil.getString(rs, "str_state"))) @@ -1309,8 +1311,10 @@ public Frame mapRow(ResultSet rs, int rowNum) throws SQLException { .setState(FrameState.valueOf(SqlUtil.getString(rs, "str_state"))) .setLayerName(SqlUtil.getString(rs, "layer_name")) .setUsedMemory(rs.getLong("int_mem_used")) + .setUsedPss(rs.getLong("int_pss_used")) .setReservedMemory(rs.getLong("int_mem_reserved")) .setReservedGpuMemory(rs.getLong("int_gpu_mem_reserved")) + .setMaxPss(rs.getLong("int_pss_max_used")) .setCheckpointState( CheckpointState.valueOf(SqlUtil.getString(rs, "str_checkpoint_state"))) .setCheckpointCount(rs.getInt("int_checkpoint_count")); @@ -1454,7 +1458,8 @@ public Show mapRow(ResultSet rs, int rowNum) throws 
SQLException { + "frame.int_dispatch_order," + "frame.ts_started," + "frame.ts_stopped," + "frame.ts_llu," + "frame.int_retries," + "frame.str_state," + "frame.str_host," + "frame.int_cores," + "frame.int_gpus," + "frame.int_mem_max_used," - + "frame.int_mem_used, " + "frame.int_mem_reserved, " + "frame.int_gpu_mem_reserved, " + + "frame.int_mem_used, " + "frame.int_pss_max_used," + "frame.int_pss_used, " + + "frame.int_mem_reserved, " + "frame.int_gpu_mem_reserved, " + "frame.str_checkpoint_state," + "frame.int_checkpoint_count," + "frame.int_total_past_core_time," + "frame.int_total_past_gpu_time," + "layer.str_name AS layer_name," + "job.str_name AS job_name," @@ -1496,6 +1501,7 @@ public Show mapRow(ResultSet rs, int rowNum) throws SQLException { + "frame.str_host," + "frame.int_cores," + "frame.int_gpus," + "frame.ts_llu," + "COALESCE(proc.int_mem_max_used, frame.int_mem_max_used) AS int_mem_max_used," + "COALESCE(proc.int_mem_used, frame.int_mem_used) AS int_mem_used," + + "COALESCE(proc.int_pss_max_used, frame.int_pss_max_used) AS int_pss_max_used," + "frame_state_display_overrides.* " + "FROM " + "job, " + "layer," + "frame " + "LEFT JOIN proc ON (proc.pk_frame = frame.pk_frame) " + "LEFT JOIN frame_state_display_overrides ON " @@ -1608,7 +1614,7 @@ public Show mapRow(ResultSet rs, int rowNum) throws SQLException { + "job_usage.int_core_time_fail, " + "job_usage.int_gpu_time_success, " + "job_usage.int_gpu_time_fail, " + "job_usage.int_frame_success_count, " + "job_usage.int_frame_fail_count, " + "job_usage.int_clock_time_high," - + "job_usage.int_clock_time_success," + "job_mem.int_max_rss," + + "job_usage.int_clock_time_success," + "job_mem.int_max_rss," + "job_mem.int_max_pss," + "(job_resource.int_cores + job_resource.int_local_cores) AS int_cores," + "(job_resource.int_gpus + job_resource.int_local_gpus) AS int_gpus, " + "job.str_loki_url " + "FROM " + "job," + "folder," + "show," + "facility," @@ -1627,11 +1633,11 @@ public Show mapRow(ResultSet rs, 
int rowNum) throws SQLException { + "layer_usage.int_frame_success_count, " + "layer_usage.int_frame_fail_count, " + "layer_usage.int_clock_time_low, " + "layer_usage.int_clock_time_high," + "layer_usage.int_clock_time_success," + "layer_usage.int_clock_time_fail," - + "layer_mem.int_max_rss," + "layer_resource.int_cores," + "layer_resource.int_gpus " - + "FROM " + "layer, " + "job," + "layer_stat, " + "layer_resource, " + "layer_usage, " - + "layer_mem " + "WHERE " + "layer.pk_job = job.pk_job " + "AND " - + "layer.pk_layer = layer_stat.pk_layer " + "AND " - + "layer.pk_layer = layer_resource.pk_layer " + "AND " + + "layer_mem.int_max_rss," + "layer_mem.int_max_pss," + "layer_resource.int_cores," + + "layer_resource.int_gpus " + "FROM " + "layer, " + "job," + "layer_stat, " + + "layer_resource, " + "layer_usage, " + "layer_mem " + "WHERE " + + "layer.pk_job = job.pk_job " + "AND " + "layer.pk_layer = layer_stat.pk_layer " + + "AND " + "layer.pk_layer = layer_resource.pk_layer " + "AND " + "layer.pk_layer = layer_usage.pk_layer " + "AND " + "layer.pk_layer = layer_mem.pk_layer"; @@ -1645,7 +1651,7 @@ public Show mapRow(ResultSet rs, int rowNum) throws SQLException { + "layer_usage.int_frame_fail_count, " + "layer_usage.int_clock_time_low, " + "layer_usage.int_clock_time_high, " + "layer_usage.int_clock_time_success, " + "layer_usage.int_clock_time_fail, " + "layer_mem.int_max_rss, " - + "layer_resource.int_cores, " + "layer_resource.int_gpus, " + + "layer_mem.int_max_pss, " + "layer_resource.int_cores, " + "layer_resource.int_gpus, " + "limit_names.str_limit_names " + "FROM " + "layer " + "JOIN " + "job ON layer.pk_job = job.pk_job " + "JOIN " + "layer_stat ON layer.pk_layer = layer_stat.pk_layer " + "JOIN " @@ -1746,8 +1752,8 @@ public Show mapRow(ResultSet rs, int rowNum) throws SQLException { + "frame.int_number," + "frame.int_dispatch_order," + "frame.ts_started," + "frame.ts_stopped," + "frame.ts_llu," + "frame.int_retries," + "frame.str_state," + 
"frame.str_host," + "frame.int_cores," - + "frame.int_mem_max_used," + "frame.int_mem_used, " - + "frame.int_mem_reserved, " + "frame.int_gpus," + + "frame.int_mem_max_used," + "frame.int_mem_used, " + "frame.int_pss_max_used," + + "frame.int_pss_used, " + "frame.int_mem_reserved, " + "frame.int_gpus," + "frame.int_gpu_mem_max_used, " + "frame.int_gpu_mem_used, " + "frame.int_gpu_mem_reserved, " + "frame.str_checkpoint_state," + "frame.int_checkpoint_count," + "frame.int_total_past_core_time," diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupport.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupport.java index 663698c1d..d25b71e3b 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupport.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupport.java @@ -389,10 +389,12 @@ public interface DispatchSupport { * @param frame * @param rss * @param maxRss + * @param pss + * @param maxPss * @param lluTime */ - void updateFrameMemoryUsageAndLluTime(FrameInterface frame, long rss, long maxRss, - long lluTime); + void updateFrameMemoryUsageAndLluTime(FrameInterface frame, long rss, long maxRss, long pss, + long maxPss, long lluTime); /** * Update memory usage data for a given frame's proc record. 
The frame is used to update the @@ -401,14 +403,16 @@ void updateFrameMemoryUsageAndLluTime(FrameInterface frame, long rss, long maxRs * @param frame * @param rss * @param maxRss + * @param pss + * @param maxPss * @param vsize * @param maxVsize * @param usedGpuMemory * @param maxUsedGpuMemory */ - void updateProcMemoryUsage(FrameInterface frame, long rss, long maxRss, long vsize, - long maxVsize, long usedGpuMemory, long maxUsedGpuMemory, long usedSwapMemory, - byte[] children); + void updateProcMemoryUsage(FrameInterface frame, long rss, long maxRss, long pss, long maxPss, + long vsize, long maxVsize, long usedGpuMemory, long maxUsedGpuMemory, + long usedSwapMemory, byte[] children); /** * Return true if adding the given core units would put the show over its burst value. diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java index 3eb1ad3f2..3eb327e0f 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java @@ -554,20 +554,20 @@ public void lostProc(VirtualProc proc, String reason, int exitStatus) { @Override @Transactional(propagation = Propagation.REQUIRED) - public void updateProcMemoryUsage(FrameInterface frame, long rss, long maxRss, long vsize, - long maxVsize, long usedGpuMemory, long maxUsedGpuMemory, long usedSwapMemory, - byte[] children) { - procDao.updateProcMemoryUsage(frame, rss, maxRss, vsize, maxVsize, usedGpuMemory, - maxUsedGpuMemory, usedSwapMemory, children); + public void updateProcMemoryUsage(FrameInterface frame, long rss, long maxRss, long pss, + long maxPss, long vsize, long maxVsize, long usedGpuMemory, long maxUsedGpuMemory, + long usedSwapMemory, byte[] children) { + procDao.updateProcMemoryUsage(frame, rss, maxRss, pss, maxPss, vsize, maxVsize, + usedGpuMemory, maxUsedGpuMemory, 
usedSwapMemory, children); } @Override @Transactional(propagation = Propagation.REQUIRED) public void updateFrameMemoryUsageAndLluTime(FrameInterface frame, long rss, long maxRss, - long lluTime) { + long pss, long maxPss, long lluTime) { try { - frameDao.updateFrameMemoryUsageAndLluTime(frame, maxRss, rss, lluTime); + frameDao.updateFrameMemoryUsageAndLluTime(frame, maxRss, rss, maxPss, pss, lluTime); } catch (FrameReservationException ex) { // Eat this, the frame was not in the correct state or // was locked by another thread. The only reason it would diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java index 7bd73ef3d..424ed87e2 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java @@ -862,11 +862,12 @@ private void updateMemoryUsageAndLluTime(List rFrames) { FrameInterface frame = jobManager.getFrame(rf.getFrameId()); dispatchSupport.updateFrameMemoryUsageAndLluTime(frame, rf.getRss(), rf.getMaxRss(), - rf.getLluTime()); + rf.getPss(), rf.getMaxPss(), rf.getLluTime()); - dispatchSupport.updateProcMemoryUsage(frame, rf.getRss(), rf.getMaxRss(), rf.getVsize(), - rf.getMaxVsize(), rf.getUsedGpuMemory(), rf.getMaxUsedGpuMemory(), - rf.getUsedSwapMemory(), rf.getChildren().toByteArray()); + dispatchSupport.updateProcMemoryUsage(frame, rf.getRss(), rf.getMaxRss(), rf.getPss(), + rf.getMaxPss(), rf.getVsize(), rf.getMaxVsize(), rf.getUsedGpuMemory(), + rf.getMaxUsedGpuMemory(), rf.getUsedSwapMemory(), + rf.getChildren().toByteArray()); } updateJobMemoryUsage(rFrames); @@ -880,6 +881,7 @@ private void updateMemoryUsageAndLluTime(List rFrames) { */ private void updateJobMemoryUsage(List frames) { final Map jobs = new HashMap(frames.size()); + final Map jobsPss = new HashMap(frames.size()); for (RunningFrameInfo frame : frames) { JobEntity job 
= new JobEntity(frame.getJobId()); @@ -890,11 +892,23 @@ private void updateJobMemoryUsage(List frames) { } else { jobs.put(job, frame.getMaxRss()); } + + if (jobsPss.containsKey(job)) { + if (jobsPss.get(job) < frame.getMaxPss()) { + jobsPss.put(job, frame.getMaxPss()); + } + } else { + jobsPss.put(job, frame.getMaxPss()); + } } for (Map.Entry set : jobs.entrySet()) { jobDao.updateMaxRSS(set.getKey(), set.getValue()); } + + for (Map.Entry set : jobsPss.entrySet()) { + jobDao.updateMaxPSS(set.getKey(), set.getValue()); + } } /** @@ -904,6 +918,7 @@ private void updateJobMemoryUsage(List frames) { */ private void updateLayerMemoryUsage(List frames) { final Map layers = new HashMap(frames.size()); + final Map layersPss = new HashMap(frames.size()); for (RunningFrameInfo frame : frames) { LayerEntity layer = new LayerEntity(frame.getLayerId()); @@ -914,6 +929,14 @@ private void updateLayerMemoryUsage(List frames) { } else { layers.put(layer, frame.getMaxRss()); } + + if (layersPss.containsKey(layer)) { + if (layersPss.get(layer) < frame.getMaxPss()) { + layersPss.put(layer, frame.getMaxPss()); + } + } else { + layersPss.put(layer, frame.getMaxPss()); + } } /* Attempt to update the max RSS value for the job **/ @@ -921,6 +944,11 @@ private void updateLayerMemoryUsage(List frames) { layerDao.increaseLayerMinMemory(set.getKey(), set.getValue()); layerDao.updateLayerMaxRSS(set.getKey(), set.getValue(), false); } + + /* Attempt to update the max PSS value for the layer **/ + for (Map.Entry set : layersPss.entrySet()) { + layerDao.updateLayerMaxPSS(set.getKey(), set.getValue(), false); + } } /** diff --git a/cuebot/src/main/java/com/imageworks/spcue/monitoring/MonitoringEventBuilder.java b/cuebot/src/main/java/com/imageworks/spcue/monitoring/MonitoringEventBuilder.java index b039beab7..f39ef4547 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/monitoring/MonitoringEventBuilder.java +++ b/cuebot/src/main/java/com/imageworks/spcue/monitoring/MonitoringEventBuilder.java 
@@ -155,6 +155,7 @@ public FrameEvent buildFrameCompleteEvent(FrameCompleteReport report, FrameState .setStartTime((int) report.getFrame().getStartTime()) .setStopTime((int) (System.currentTimeMillis() / 1000)) .setMaxRss(report.getFrame().getMaxRss()).setUsedMemory(report.getFrame().getRss()) + .setMaxPss(report.getFrame().getMaxPss()).setUsedPss(report.getFrame().getPss()) .setReservedMemory(proc.memoryReserved).setReservedGpuMemory(proc.gpuMemoryReserved) .setLluTime((int) report.getFrame().getLluTime()) .setMaxGpuMemory(report.getFrame().getMaxUsedGpuMemory()) @@ -232,6 +233,9 @@ public FrameEvent buildFrameEvent(EventType eventType, FrameDetail frame, String if (frame.maxRss > 0) { frameBuilder.setMaxRss(frame.maxRss); } + if (frame.maxPss > 0) { + frameBuilder.setMaxPss(frame.maxPss); + } if (frame.lastResource != null) { frameBuilder.setLastResource(frame.lastResource); } diff --git a/cuebot/src/main/resources/conf/ddl/postgres/migrations/V37__Add_pss_memory_fields.sql b/cuebot/src/main/resources/conf/ddl/postgres/migrations/V37__Add_pss_memory_fields.sql new file mode 100644 index 000000000..340a6bfef --- /dev/null +++ b/cuebot/src/main/resources/conf/ddl/postgres/migrations/V37__Add_pss_memory_fields.sql @@ -0,0 +1,33 @@ +-- Add PSS (Proportional Set Size) memory tracking fields +-- PSS provides a more accurate view of memory usage by proportionally +-- accounting for shared memory pages + +-- Add PSS fields to layer_mem table +ALTER TABLE layer_mem + ADD int_max_pss BIGINT DEFAULT 0; + +CREATE INDEX i_max_pss_layer ON layer_mem (int_max_pss); + +-- Add PSS fields to job_mem table +ALTER TABLE job_mem + ADD int_max_pss BIGINT DEFAULT 0; + +CREATE INDEX i_max_pss_job ON job_mem (int_max_pss); + +-- Add PSS fields to job_resource table +ALTER TABLE job_resource + ADD int_max_pss BIGINT DEFAULT 0; + +-- Add PSS fields to layer_resource table +ALTER TABLE layer_resource + ADD int_max_pss BIGINT DEFAULT 0; + +-- Add PSS fields to proc table +ALTER TABLE proc + 
ADD int_pss_max_used BIGINT DEFAULT 0, + ADD int_pss_used BIGINT DEFAULT 0; + +-- Add PSS fields to frame table +ALTER TABLE frame + ADD int_pss_max_used BIGINT DEFAULT 0, + ADD int_pss_used BIGINT DEFAULT 0; diff --git a/cuebot/src/test/java/com/imageworks/spcue/test/dao/criteria/FrameSearchTests.java b/cuebot/src/test/java/com/imageworks/spcue/test/dao/criteria/FrameSearchTests.java index 879b52195..171ff8013 100644 --- a/cuebot/src/test/java/com/imageworks/spcue/test/dao/criteria/FrameSearchTests.java +++ b/cuebot/src/test/java/com/imageworks/spcue/test/dao/criteria/FrameSearchTests.java @@ -226,7 +226,8 @@ public void filterByMemoryRange() { IntStream.range(1, 11).forEach(i -> { FrameInterface frame = frameDao.findFrame(layer, i); frameDao.updateFrameState(frame, FrameState.RUNNING); - frameDao.updateFrameMemoryUsageAndLluTime(frame, CueUtil.GB * 5, CueUtil.GB, 0); + frameDao.updateFrameMemoryUsageAndLluTime(frame, CueUtil.GB * 5, CueUtil.GB, + CueUtil.GB * 5, CueUtil.GB, 0); }); FrameSearchInterface frameSearch = frameSearchFactory.create(); diff --git a/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/ProcDaoTests.java b/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/ProcDaoTests.java index b3e930d5a..a916391d3 100644 --- a/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/ProcDaoTests.java +++ b/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/ProcDaoTests.java @@ -319,7 +319,7 @@ public void testUpdateProcMemoryUsage() { procDao.verifyRunningProc(proc.getId(), frame.getId()); byte[] children = new byte[100]; - procDao.updateProcMemoryUsage(frame, 100, 100, 1000, 1000, 0, 0, 0, children); + procDao.updateProcMemoryUsage(frame, 100, 100, 100, 100, 1000, 1000, 0, 0, 0, children); } @@ -608,7 +608,8 @@ public void testBalanceUnderUtilizedProcs() { procDao.insertVirtualProc(proc1); byte[] children = new byte[100]; - procDao.updateProcMemoryUsage(frame1, 250000, 250000, 250000, 250000, 0, 0, 0, children); + 
procDao.updateProcMemoryUsage(frame1, 250000, 250000, 250000, 250000, 250000, 250000, 0, 0, + 0, children); layerDao.updateLayerMaxRSS(frame1, 250000, true); FrameDetail frameDetail2 = frameDao.findFrameDetail(job, "0002-pass_1"); @@ -618,7 +619,8 @@ public void testBalanceUnderUtilizedProcs() { proc2.frameId = frame2.id; procDao.insertVirtualProc(proc2); - procDao.updateProcMemoryUsage(frame2, 255000, 255000, 255000, 255000, 0, 0, 0, children); + procDao.updateProcMemoryUsage(frame2, 255000, 255000, 250000, 250000, 255000, 255000, 0, 0, + 0, children); layerDao.updateLayerMaxRSS(frame2, 255000, true); FrameDetail frameDetail3 = frameDao.findFrameDetail(job, "0003-pass_1"); @@ -628,8 +630,8 @@ public void testBalanceUnderUtilizedProcs() { proc3.frameId = frame3.id; procDao.insertVirtualProc(proc3); - procDao.updateProcMemoryUsage(frame3, 3145728, 3145728, 3145728, 3145728, 0, 0, 0, - children); + procDao.updateProcMemoryUsage(frame3, 3145728, 3145728, 3145728, 3145728, 3145728, 3145728, + 0, 0, 0, children); layerDao.updateLayerMaxRSS(frame3, 300000, true); procDao.balanceUnderUtilizedProcs(proc3, 100000); diff --git a/cuegui/cuegui/FrameMonitorTree.py b/cuegui/cuegui/FrameMonitorTree.py index 11e1d1849..35026d2ac 100644 --- a/cuegui/cuegui/FrameMonitorTree.py +++ b/cuegui/cuegui/FrameMonitorTree.py @@ -161,7 +161,7 @@ def getFrameStateOverride(frame): "time without an update is an indication of a stuck\n" "frame for most types of jobs") - self.addColumn("Memory", 60, id=13, + self.addColumn("Memory (RSS)", 60, id=13, data=lambda job, frame: ( frame.data.state == opencue.api.job_pb2.RUNNING and cuegui.Utils.memoryToString(frame.data.used_memory) or @@ -169,11 +169,23 @@ def getFrameStateOverride(frame): sort=lambda job, frame: (frame.data.state == opencue.api.job_pb2.RUNNING and frame.data.used_memory or frame.data.max_rss), tip="If a frame is running:\n" - "\t The amount of memory currently used by the frame.\n" + "\t The amount of RSS memory currently used by the 
frame.\n" "If a frame is not running:\n" - "\t The most memory this frame has used at one time.") + "\t The most RSS memory this frame has used at one time.") - self.addColumn("GPU Memory", 60, id=14, + self.addColumn("Memory (PSS)", 60, id=14, + data=lambda job, frame: ( + frame.data.state == opencue.api.job_pb2.RUNNING and + cuegui.Utils.memoryToString(frame.data.used_pss) or + cuegui.Utils.memoryToString(frame.data.max_pss)), + sort=lambda job, frame: (frame.data.state == opencue.api.job_pb2.RUNNING and + frame.data.used_pss or frame.data.max_pss), + tip="If a frame is running:\n" + "\t The amount of PSS memory currently used by the frame.\n" + "If a frame is not running:\n" + "\t The most PSS memory this frame has used at one time.") + + self.addColumn("GPU Memory", 60, id=15, data=lambda job, frame: ( frame.data.state == opencue.api.job_pb2.RUNNING and cuegui.Utils.memoryToString(frame.data.used_gpu_memory) or @@ -186,7 +198,7 @@ def getFrameStateOverride(frame): "If a frame is not running:\n" "\t The most GPU memory this frame has used at one time.") - self.addColumn("Remain", 70, id=15, + self.addColumn("Remain", 70, id=16, data=lambda job, frame: (frame.data.state == opencue.api.job_pb2.RUNNING and self.frameEtaDataBuffer.getEtaFormatted(job, frame) or ""), @@ -194,16 +206,17 @@ def getFrameStateOverride(frame): self.frameEtaDataBuffer.getEta(job, frame) or -1), tip="Hours:Minutes:Seconds remaining.") - self.addColumn("Start Time", 100, id=16, + self.addColumn("Start Time", 100, id=17, data=lambda job, frame: (self.getTimeString(frame.data.start_time) or ""), sort=lambda job, frame: (self.getTimeString(frame.data.start_time) or ""), tip="The time the frame was started or retried.") - self.addColumn("Stop Time", 100, id=17, + + self.addColumn("Stop Time", 100, id=18, data=lambda job, frame: (self.getTimeString(frame.data.stop_time) or ""), sort=lambda job, frame: (self.getTimeString(frame.data.stop_time) or ""), tip="The time that the frame finished or 
died.") - self.addColumn("Last Line", 0, id=18, + self.addColumn("Last Line", 0, id=19, data=lambda job, frame: (frame.data.state == opencue.api.job_pb2.RUNNING and self.frameLogDataBuffer.getLastLineData( job, frame)[FrameLogDataBuffer.LASTLINE] or ""), diff --git a/cuegui/cuegui/JobMonitorTree.py b/cuegui/cuegui/JobMonitorTree.py index d72dbc457..ed08c4cc9 100644 --- a/cuegui/cuegui/JobMonitorTree.py +++ b/cuegui/cuegui/JobMonitorTree.py @@ -150,23 +150,28 @@ def __init__(self, parent): data=lambda job: cuegui.Utils.memoryToString(job.data.job_stats.max_rss), sort=lambda job: sortableKey(job.data.job_stats.max_rss, job.data.start_time), - tip="The maximum memory used any single frame in each job.") - self.addColumn("Age", 50, id=11, + tip="The maximum RSS memory used any single frame in each job.") + self.addColumn("MaxPss", 55, id=11, + data=lambda job: cuegui.Utils.memoryToString(job.data.job_stats.max_pss), + sort=lambda job: sortableKey(job.data.job_stats.max_pss, + job.data.start_time), + tip="The maximum PSS memory used any single frame in each job.") + self.addColumn("Age", 50, id=12, data=lambda job: (cuegui.Utils.secondsToHHHMM((job.data.stop_time or time.time()) - job.data.start_time)), sort=lambda job: ((job.data.stop_time or time.time()) - job.data.start_time), tip="The HOURS:MINUTES that the job has spent in the queue.") - self.addColumn("Launched", 100, id=12, + self.addColumn("Launched", 100, id=13, data=lambda job: cuegui.Utils.dateToMMDDHHMM(job.data.start_time), sort=lambda job: job.data.start_time, tip="The time when the job was launched.") - self.addColumn("Finished", 100, id=13, + self.addColumn("Finished", 100, id=14, data=lambda job: (job.data.stop_time > 0 and cuegui.Utils.dateToMMDDHHMM(job.data.stop_time) or ""), sort=lambda job: job.data.stop_time, tip="The time when the job ended.") - self.addColumn("Progress", 0, id=14, + self.addColumn("Progress", 0, id=15, delegate=cuegui.ItemDelegate.JobProgressBarDelegate, tip="A visual overview 
of the progress of each job.\n" "Green \t is succeeded\n" diff --git a/cuegui/cuegui/LayerMonitorTree.py b/cuegui/cuegui/LayerMonitorTree.py index 1d2ec0250..70b0c4fbf 100644 --- a/cuegui/cuegui/LayerMonitorTree.py +++ b/cuegui/cuegui/LayerMonitorTree.py @@ -98,55 +98,66 @@ def __init__(self, parent): "MaxRss", 60, id=10, data=lambda layer: cuegui.Utils.memoryToString(layer.data.layer_stats.max_rss), sort=lambda layer: layer.data.layer_stats.max_rss, - tip="Maximum amount of memory used by any frame in\n" + tip="Maximum amount of RSS memory used by any frame in\n" "this layer at any time since the job was launched.") - self.addColumn("Total", 40, id=11, + self.addColumn( + "MaxPss", + 60, + id=11, + data=lambda layer: cuegui.Utils.memoryToString( + layer.data.layer_stats.max_pss + ), + sort=lambda layer: layer.data.layer_stats.max_pss, + tip="Maximum amount of PSS memory used by any frame in\n" + "this layer at any time since the job was launched.", + ) + self.addColumn("Total", 40, id=12, data=lambda layer: layer.data.layer_stats.total_frames, sort=lambda layer: layer.data.layer_stats.total_frames, tip="Total number of frames in this layer.") - self.addColumn("Done", 40, id=12, + self.addColumn("Done", 40, id=13, data=lambda layer: layer.data.layer_stats.succeeded_frames, sort=lambda layer: layer.data.layer_stats.succeeded_frames, tip="Total number of done frames in this layer.") - self.addColumn("Run", 40, id=13, + self.addColumn("Run", 40, id=14, data=lambda layer: layer.data.layer_stats.running_frames, sort=lambda layer: layer.data.layer_stats.running_frames, tip="Total number or running frames in this layer.") - self.addColumn("Depend", 53, id=14, + self.addColumn("Depend", 53, id=15, data=lambda layer: layer.data.layer_stats.depend_frames, sort=lambda layer: layer.data.layer_stats.depend_frames, tip="Total number of dependent frames in this layer.") - self.addColumn("Wait", 40, id=15, + self.addColumn("Wait", 40, id=16, data=lambda layer: 
layer.data.layer_stats.waiting_frames, sort=lambda layer: layer.data.layer_stats.waiting_frames, tip="Total number of waiting frames in this layer.") - self.addColumn("Eaten", 40, id=16, + self.addColumn("Eaten", 40, id=17, data=lambda layer: layer.data.layer_stats.eaten_frames, sort=lambda layer: layer.data.layer_stats.eaten_frames, tip="Total number of eaten frames in this layer.") - self.addColumn("Dead", 40, id=17, + self.addColumn("Dead", 40, id=18, data=lambda layer: layer.data.layer_stats.dead_frames, sort=lambda layer: layer.data.layer_stats.dead_frames, tip="Total number of dead frames in this layer.") self.addColumn( - "Avg", 65, id=18, + "Avg", 65, id=19, data=lambda layer: cuegui.Utils.secondsToHHMMSS(layer.data.layer_stats.avg_frame_sec), sort=lambda layer: layer.data.layer_stats.avg_frame_sec, tip="Average number of HOURS:MINUTES:SECONDS per frame\nin this layer.") - self.addColumn("Tags", 100, id=19, + self.addColumn("Tags", 100, id=20, data=lambda layer: " | ".join(layer.data.tags), tip="The tags define what resources may be booked on\n" "frames in this layer.") - self.addColumn("Progress", 100, id=20, + self.addColumn("Progress", 100, id=21, delegate=cuegui.ItemDelegate.ProgressDelegate, data=lambda layer: layer.percentCompleted(), sort=lambda layer: layer.percentCompleted(), tip="Progress for the Layer") - self.addColumn("Timeout", 45, id=21, + self.addColumn("Timeout", 45, id=22, data=lambda layer: cuegui.Utils.secondsToHHHMM(layer.data.timeout*60), sort=lambda layer: layer.data.timeout, tip="Timeout for the frames, Hours:Minutes") - self.addColumn("Timeout LLU", 45, id=22, + self.addColumn("Timeout LLU", 45, id=23, data=lambda layer: cuegui.Utils.secondsToHHHMM(layer.data.timeout_llu*60), sort=lambda layer: layer.data.timeout_llu, tip="Timeout for a frames\' LLU, Hours:Minutes") diff --git a/cuegui/cuegui/ProcChildren.py b/cuegui/cuegui/ProcChildren.py index 65ea9de9b..87f35ef8a 100644 --- a/cuegui/cuegui/ProcChildren.py +++ 
b/cuegui/cuegui/ProcChildren.py @@ -38,7 +38,7 @@ class ProcChildren(QtWidgets.QWidget): """Widget for displaying Host statistics for a Proc's child processes.""" - HEADERS = ["PID", "Name", "Start Time", "Rss (KB)", "VSize (KB)", + HEADERS = ["PID", "Name", "Start Time", "Rss (KB)", "Pss (KB)", "VSize (KB)", "Statm Rss (KB)", "Statm Size (KB)", "Cmd line"] def __init__(self, job, layer, hosts, parent=None): @@ -107,6 +107,7 @@ def _addProc(self, entry): QtGui.QStandardItem(proc.stat.name), QtGui.QStandardItem(proc.start_time), QtGui.QStandardItem(str(proc.stat.rss)), + QtGui.QStandardItem(str(proc.stat.pss)), QtGui.QStandardItem(str(proc.stat.vsize)), QtGui.QStandardItem(str(proc.statm.rss)), QtGui.QStandardItem(str(proc.statm.size)), diff --git a/proto/src/host.proto b/proto/src/host.proto index b321fdec1..416e47320 100644 --- a/proto/src/host.proto +++ b/proto/src/host.proto @@ -348,6 +348,8 @@ message NestedProc { NestedHost parent = 18; int64 used_gpu_memory = 19; float reserved_gpus = 20; + int64 max_pss = 21; + int64 used_pss = 22; } message NestedProcSeq { diff --git a/proto/src/job.proto b/proto/src/job.proto index 4c76308fa..13efc0a3c 100644 --- a/proto/src/job.proto +++ b/proto/src/job.proto @@ -520,6 +520,8 @@ message Frame { int64 max_gpu_memory = 21; int64 used_gpu_memory = 22; FrameStateDisplayOverride frame_state_display_override = 23; + int64 max_pss = 24; + int64 used_pss = 25; } // Object for frame searching @@ -566,6 +568,7 @@ message UpdatedFrame { int64 max_gpu_memory = 11; int64 used_gpu_memory = 12; FrameStateDisplayOverride frame_state_display_override = 13; + int64 max_pss = 14; } message UpdatedFrameSeq { @@ -686,6 +689,7 @@ message JobStats { int64 failed_gpu_sec = 23; float reserved_gpus = 24; int64 max_gpu_memory = 25; + int64 max_pss = 26; } // LAYER ---- @@ -746,6 +750,7 @@ message LayerStats { int64 failed_gpu_sec = 23; float reserved_gpus = 24; int64 max_gpu_memory = 25; + int64 max_pss = 26; } // NestedGroup --- diff --git 
a/proto/src/report.proto b/proto/src/report.proto index 6ace5708e..88d8d17d6 100644 --- a/proto/src/report.proto +++ b/proto/src/report.proto @@ -107,6 +107,8 @@ message RunningFrameInfo { int64 used_gpu_memory = 17; // kB ChildrenProcStats children = 18; //additional data about the running frame's child processes int64 used_swap_memory = 19; // kB + int64 max_pss = 20; // kB + int64 pss = 21; // kB }; message ChildrenProcStats { @@ -122,6 +124,7 @@ message ProcStats { message Stat { int64 rss = 1; + int64 pss = 6; int64 vsize = 2; string state = 3; string name = 4; diff --git a/pycue/opencue/wrappers/job.py b/pycue/opencue/wrappers/job.py index 9b52c1437..f0d460db7 100644 --- a/pycue/opencue/wrappers/job.py +++ b/pycue/opencue/wrappers/job.py @@ -809,6 +809,15 @@ def maxRss(self): :return: most memory used by any frame in kB""" return self.data.job_stats.max_rss + def maxPss(self): + """Returns the highest amount of PSS that any frame in this job used. + + Value is within 5% of the actual highest frame. + + :rtype: long + :return: most PSS used by any frame in kB""" + return self.data.job_stats.max_pss + def shutdownIfCompleted(self): """Shutdown the job if it is completed.""" self.stub.ShutdownIfCompleted(job_pb2.JobShutdownIfCompletedRequest(job=self.data), diff --git a/pycue/opencue/wrappers/layer.py b/pycue/opencue/wrappers/layer.py index 8b3dfdf1b..3b5e9bcc2 100644 --- a/pycue/opencue/wrappers/layer.py +++ b/pycue/opencue/wrappers/layer.py @@ -474,6 +474,15 @@ def maxRss(self): :return: most memory used by any frame in this layer in kB""" return self.data.layer_stats.max_rss + def maxPss(self): + """Returns the highest amount of PSS that any frame in this layer used. + + Value is within 5% of the actual highest frame. + + :rtype: long + :return: most PSS used by any frame in this layer in kB""" + return self.data.layer_stats.max_pss + def type(self): """Returns the type of layer. 
diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 1f56a7021..fc9141359 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -11,7 +11,7 @@ resolver = "2" [workspace.package] authors = ["Diego Tavares "] edition = "2021" -version = "0.1.6" +version = "0.1.7" [workspace.dependencies] async-trait = "0.1" diff --git a/rust/config/rqd.yaml b/rust/config/rqd.yaml index 2107ea53e..f89e68506 100644 --- a/rust/config/rqd.yaml +++ b/rust/config/rqd.yaml @@ -140,24 +140,6 @@ machine: # # Override the operating system reported to Cuebot # os: "Rocky Linux 9" - # Memory metric to use for process memory measurement - # Options: "rss" (default) or "pss" - # - # RSS (Resident Set Size): Total physical memory used by a process, including - # shared libraries counted fully. This can overreport memory usage when - # multiple processes share the same libraries. - # - # PSS (Proportional Set Size): Divides shared memory proportionally among - # processes using it, providing more accurate memory accounting. For example, - # if three processes share a 30MB library, each process reports 10MB instead - # of 30MB. - # - # Note: PSS requires Linux kernel 4.14+ and reads from /proc/[pid]/smaps_rollup. - # If PSS is unavailable, RQD will automatically fall back to RSS. - # - # Default: rss - # memory_metric: "rss" - # ============================================================================= # RUNNER CONFIGURATION # ============================================================================= diff --git a/rust/crates/rqd/src/config/mod.rs b/rust/crates/rqd/src/config/mod.rs index 2210bc88d..6e0db744c 100644 --- a/rust/crates/rqd/src/config/mod.rs +++ b/rust/crates/rqd/src/config/mod.rs @@ -26,14 +26,6 @@ lazy_static! 
{ } //===Config Types=== -#[derive(Default, Debug, Serialize, Deserialize, Clone, Copy, PartialEq)] -#[serde(rename_all = "lowercase")] -pub enum MemoryMetric { - #[default] - Rss, - Pss, -} - #[derive(Debug, Deserialize, Clone)] #[serde(default)] pub struct LoggingConfig { @@ -110,7 +102,6 @@ pub struct MachineConfig { pub nimby_start_retry_interval: Duration, pub nimby_display_xauthority_path: String, pub memory_oom_margin_percentage: u32, - pub memory_metric: MemoryMetric, } impl Default for MachineConfig { @@ -134,7 +125,6 @@ impl Default for MachineConfig { nimby_start_retry_interval: Duration::from_secs(60 * 5), // 5 min nimby_display_xauthority_path: "/home/{username}/Xauthority".to_string(), memory_oom_margin_percentage: 96, - memory_metric: MemoryMetric::Rss, } } } diff --git a/rust/crates/rqd/src/frame/running_frame.rs b/rust/crates/rqd/src/frame/running_frame.rs index 396c589fd..584281c5a 100644 --- a/rust/crates/rqd/src/frame/running_frame.rs +++ b/rust/crates/rqd/src/frame/running_frame.rs @@ -1256,6 +1256,8 @@ Render Frame Completed start_time: start_time as i64, max_rss: (stats.max_rss / KIB) as i64, rss: (stats.rss / KIB) as i64, + max_pss: (stats.max_pss / KIB) as i64, + pss: (stats.pss / KIB) as i64, max_vsize: (stats.max_vsize / KIB) as i64, vsize: (stats.vsize / KIB) as i64, attributes: self.request.attributes.clone(), @@ -1498,8 +1500,7 @@ mod tests { .await; let elapsed = start.elapsed(); - assert!(status.is_ok()); - assert_eq!((0, None), status.unwrap()); + assert_eq!((0, None), status.expect("status should be OK")); assert!( elapsed >= Duration::from_millis(500), "Command didn't run for expected duration" diff --git a/rust/crates/rqd/src/system/linux.rs b/rust/crates/rqd/src/system/linux.rs index 2098e2a94..3cd428531 100644 --- a/rust/crates/rqd/src/system/linux.rs +++ b/rust/crates/rqd/src/system/linux.rs @@ -33,13 +33,10 @@ use opencue_proto::{ report::{ChildrenProcStats, ProcStats, Stat}, }; use sysinfo::{DiskRefreshKind, Disks, 
MemoryRefreshKind, RefreshKind}; -use tracing::{debug, info}; +use tracing::debug; use uuid::Uuid; -use crate::{ - config::{MachineConfig, MemoryMetric}, - system::reservation::ProcessorStructure, -}; +use crate::{config::MachineConfig, system::reservation::ProcessorStructure}; use super::manager::{MachineGpuStats, MachineStat, ProcessStats, SystemManager}; @@ -58,7 +55,8 @@ pub struct LinuxSystem { #[derive(Debug)] struct ProcessData { - memory: u64, + rss: u64, + pss: u64, virtual_memory: u64, cmd: String, state: String, @@ -114,8 +112,10 @@ pub struct MachineDynamicInfo { /// Aggregated Data refering to a process session struct SessionData { - /// Amount of memory used by all processes in this session - memory: u64, + /// Amount of memory used by all processes in this session calculated by rss + rss: u64, + /// Amount of memory used by all processes in this session calculated by pss + pss: u64, /// Amount of virtual memory used by all processes in this session virtual_memory: u64, /// Amount of gpu used by all processes in this session @@ -159,8 +159,6 @@ impl LinuxSystem { .into_diagnostic() .wrap_err("SC_CLK_TCK not available")?; - info!("Memory metric configured: {:?}", config.memory_metric); - Ok(Self { config: config.clone(), static_info: MachineStaticInfo { @@ -657,20 +655,8 @@ impl LinuxSystem { }; let virtual_memory = vsize.saturating_mul(self.static_info.page_size); - // Read memory based on configured metric - let memory = match self.config.memory_metric { - MemoryMetric::Pss => { - // Try PSS, fallback to RSS if unavailable - match self.read_pss(pid) { - Ok(pss) => pss, - Err(_) => rss, - } - } - MemoryMetric::Rss => { - // Original RSS logic - rss - } - }; + // Try PSS, fallback to RSS if unavailable + let pss = self.read_pss(pid).unwrap_or(rss); let (start_time, run_time) = self.calculate_process_time(start_time); @@ -684,7 +670,8 @@ impl LinuxSystem { let cmd = cmdline.replace('\0', " "); Ok(ProcessData { - memory, + rss, + pss, virtual_memory, 
cmd, state, @@ -737,61 +724,70 @@ impl LinuxSystem { })?; // If session owner is still alive, iterate over the session and calculate memory - let (memory, virtual_memory, gpu_memory, start_time, run_time) = match self - .session_processes - .get(session_id) - { - Some(ref lineage) => { - // Process session data - lineage - .iter() - .filter_map(|pid| { - match self.cached_processes.get(pid) { - Some(proc) if !proc.is_dead() => { - // Confirm this is a proc and not a thread - let start_time_str = DateTime::::from( - UNIX_EPOCH + Duration::from_secs(proc.start_time), - ) - .format("%Y-%m-%d %H:%M:%S") - .to_string(); - let proc_memory = proc.memory; - let proc_vmemory = proc.virtual_memory; - let cmdline = proc.cmd.clone(); - - // Check for potential duplicates - children.push(ProcStats { - stat: Some(Stat { - rss: proc_memory as i64, - vsize: proc_vmemory as i64, - state: proc.state.clone(), - name: proc.name.clone(), - pid: pid.to_string(), - }), - statm: None, - status: None, - cmdline, - start_time: start_time_str, - }); - Some((proc_memory, proc_vmemory, 0, proc.start_time, proc.run_time)) + let (rss, pss, virtual_memory, gpu_memory, start_time, run_time) = + match self.session_processes.get(session_id) { + Some(ref lineage) => { + // Process session data + lineage + .iter() + .filter_map(|pid| { + match self.cached_processes.get(pid) { + Some(proc) if !proc.is_dead() => { + // Confirm this is a proc and not a thread + let start_time_str = DateTime::::from( + UNIX_EPOCH + Duration::from_secs(proc.start_time), + ) + .format("%Y-%m-%d %H:%M:%S") + .to_string(); + let proc_rss = proc.rss; + let proc_pss = proc.pss; + let proc_vmemory = proc.virtual_memory; + let cmdline = proc.cmd.clone(); + + // Check for potential duplicates + children.push(ProcStats { + stat: Some(Stat { + rss: proc_rss as i64, + pss: proc_pss as i64, + vsize: proc_vmemory as i64, + state: proc.state.clone(), + name: proc.name.clone(), + pid: pid.to_string(), + }), + statm: None, + status: None, 
+ cmdline, + start_time: start_time_str, + }); + Some(( + proc_rss, + proc_pss, + proc_vmemory, + 0, + proc.start_time, + proc.run_time, + )) + } + _ => None, } - _ => None, - } - }) - .reduce(|a, b| { - ( - a.0 + b.0, - a.1 + b.1, - a.2 + b.2, - std::cmp::min(a.3, b.3), - std::cmp::max(a.4, b.4), - ) - }) - .unwrap_or((0, 0, 0, u64::MAX, 0)) - } - None => (0, 0, 0, u64::MAX, 0), - }; + }) + .reduce(|a, b| { + ( + a.0 + b.0, + a.1 + b.1, + a.2 + b.2, + a.3 + b.3, + std::cmp::min(a.3, b.3), + std::cmp::max(a.4, b.4), + ) + }) + .unwrap_or((0, 0, 0, 0, u64::MAX, 0)) + } + None => (0, 0, 0, 0, u64::MAX, 0), + }; Some(SessionData { - memory, + rss, + pss, virtual_memory, gpu_memory, start_time, @@ -890,12 +886,14 @@ impl SystemManager for LinuxSystem { Ok(self.calculate_proc_session_data(&pid).map(|session_data| { debug!( "Collect frame stats fo {}. rss: {}kb virtual: {}kb gpu: {}kb", - pid, session_data.memory, session_data.virtual_memory, session_data.gpu_memory + pid, session_data.rss, session_data.virtual_memory, session_data.gpu_memory ); ProcessStats { // Caller is responsible for maintaining the Max value between calls - max_rss: session_data.memory, - rss: session_data.memory, + max_rss: session_data.rss, + rss: session_data.rss, + max_pss: session_data.pss, + pss: session_data.pss, max_vsize: session_data.virtual_memory, vsize: session_data.virtual_memory, llu_time: log_mtime, @@ -966,7 +964,7 @@ impl SystemManager for LinuxSystem { #[cfg(test)] mod tests { - use crate::config::{MachineConfig, MemoryMetric}; + use crate::config::MachineConfig; use std::fs; use std::{collections::HashMap, sync::Mutex}; diff --git a/rust/crates/rqd/src/system/macos.rs b/rust/crates/rqd/src/system/macos.rs index 3b2b8426e..5a3fc97a4 100644 --- a/rust/crates/rqd/src/system/macos.rs +++ b/rust/crates/rqd/src/system/macos.rs @@ -85,8 +85,10 @@ pub struct MachineDynamicInfo { /// Aggregated Data refering to a process session struct SessionData { - /// Amount of memory used by all 
processes in this session - memory: u64, + /// Amount of memory used by all processes in this session calculated by rss + rss: u64, + /// Amount of memory used by all processes in this session calculated by pss + pss: u64, /// Amount of virtual memory used by all processes in this session virtual_memory: u64, /// Amount of gpu used by all processes in this session @@ -598,6 +600,8 @@ impl MacOsSystem { children.push(ProcStats { stat: Some(Stat { rss: proc_memory as i64, + // Fallback to RSS as PSS is not available on sysinfo + pss: proc_memory as i64, vsize: proc_vmemory as i64, state: proc.status().to_string(), name: proc.name().to_string_lossy().to_string(), @@ -634,7 +638,9 @@ impl MacOsSystem { None => (0, 0, 0, u64::MAX, 0), }; Some(SessionData { - memory, + rss: memory, + // Not tracking PSS for macos + pss: memory, virtual_memory, gpu_memory, start_time, @@ -734,12 +740,14 @@ impl SystemManager for MacOsSystem { Ok(self.calculate_proc_session_data(&pid).map(|session_data| { debug!( "Collect frame stats fo {}. rss: {}kb virtual: {}kb gpu: {}kb", - pid, session_data.memory, session_data.virtual_memory, session_data.gpu_memory + pid, session_data.rss, session_data.virtual_memory, session_data.gpu_memory ); ProcessStats { // Caller is responsible for maintaining the Max value between calls - max_rss: session_data.memory, - rss: session_data.memory, + max_rss: session_data.rss, + rss: session_data.rss, + max_pss: session_data.pss, + pss: session_data.pss, max_vsize: session_data.virtual_memory, vsize: session_data.virtual_memory, llu_time: log_mtime, diff --git a/rust/crates/rqd/src/system/manager.rs b/rust/crates/rqd/src/system/manager.rs index 4dda6f06f..5ba276c19 100644 --- a/rust/crates/rqd/src/system/manager.rs +++ b/rust/crates/rqd/src/system/manager.rs @@ -120,6 +120,10 @@ pub struct ProcessStats { pub max_rss: u64, /// Current resident set size (KB) - amount of physical memory currently in use. 
pub rss: u64, + /// Maximum proportional set size (KB) - maximum amount of physical memory used. + pub max_pss: u64, + /// Current proportional set size (KB) - amount of physical memory currently in use. + pub pss: u64, /// Maximum virtual memory size (KB) - maximum amount of virtual memory used. pub max_vsize: u64, /// Current virtual memory size (KB) - amount of virtual memory currently in use. @@ -143,6 +147,8 @@ impl Default for ProcessStats { ProcessStats { max_rss: 0, rss: 0, + max_pss: 0, + pss: 0, max_vsize: 0, vsize: 0, llu_time: 0, @@ -162,10 +168,12 @@ impl ProcessStats { pub fn update(&mut self, new: Self) { *self = ProcessStats { max_rss: std::cmp::max(new.max_rss, self.max_rss), + max_pss: std::cmp::max(new.max_pss, self.max_pss), max_vsize: std::cmp::max(new.max_vsize, self.max_vsize), max_used_gpu_memory: std::cmp::max(new.max_used_gpu_memory, self.max_used_gpu_memory), run_time: std::cmp::max(new.run_time, self.run_time), rss: new.rss, + pss: new.pss, vsize: new.vsize, llu_time: new.llu_time, used_gpu_memory: new.used_gpu_memory,