Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
3ee4863
get job stats with prodsourcelable for worker adjuster/maker
mightqxc Feb 18, 2026
d92df01
fix with jobstats API change
mightqxc Feb 24, 2026
5472c9b
Merge remote-tracking branch 'origin/master' into flin
mightqxc Feb 26, 2026
9fa6c30
Merge remote-tracking branch 'origin/master' into flin
mightqxc Mar 2, 2026
800dca7
Merge remote-tracking branch 'origin/master' into flin
mightqxc Mar 10, 2026
5ff8b8e
Merge remote-tracking branch 'origin/master' into flin
mightqxc Mar 19, 2026
f844692
Merge remote-tracking branch 'origin/master' into flin
mightqxc Mar 19, 2026
ae999a7
iamtokencred: default port of ARC CE
mightqxc Mar 25, 2026
d5b0b79
pretty
mightqxc Mar 25, 2026
34f74c6
submitter: reduce redundant calls per resource_type
mightqxc Mar 25, 2026
1992770
worker_adjuster: add prod_source_label
mightqxc Mar 25, 2026
33607ae
prod_source_label default
mightqxc Mar 25, 2026
3c8d009
swap resource_type and prod_source_label in nested dict
mightqxc Mar 25, 2026
242a6ca
Add map of pilotType and prodSourceLabel
mightqxc Mar 25, 2026
86cc3ac
revert worker_adjuster
mightqxc Mar 25, 2026
c326f72
worker_adjuster and submitter: add pilot_type and prod_source_label
mightqxc Mar 25, 2026
be4fe53
worker_adjuster: add logic for prod_source_label
mightqxc Mar 26, 2026
5486bfe
work_adjuster: more logic, dependency
mightqxc Mar 26, 2026
8d9a707
fixes
mightqxc Mar 26, 2026
a89323f
fix
mightqxc Mar 26, 2026
97671c0
fix
mightqxc Mar 26, 2026
4f4868f
worker_adjuster: fix behavior
mightqxc Mar 27, 2026
4a642ac
fix on get_queues_to_submit stats
mightqxc Mar 27, 2026
406467c
revert apfmon; pretty
mightqxc Mar 27, 2026
f3ea7d3
update apfmon accordingly
mightqxc Mar 27, 2026
7de5894
submitter: fixing nNewworkers from commands
mightqxc Mar 30, 2026
f329b44
log pretty
mightqxc Mar 30, 2026
a14c238
db_proxy get_queues_to_submit: fix about ANY pilotType
mightqxc Mar 30, 2026
851992f
fix
mightqxc Mar 30, 2026
a6f9e16
worker_adjuster: fix stats table
mightqxc Mar 31, 2026
767d3a2
worker_adjuster: configurable prioritizedProdSourceLabels
mightqxc Mar 31, 2026
3e069d0
fix
mightqxc Mar 31, 2026
c37466b
fix
mightqxc Mar 31, 2026
78d41ce
worker_adjuster: sort considering pilot_type
mightqxc Mar 31, 2026
8e60ca4
worker_adjuster: fix sorted
mightqxc Apr 1, 2026
4cb5083
version up for dependency
mightqxc Apr 1, 2026
4e7c8ab
pretty
mightqxc Apr 2, 2026
3556863
worker_adjuster: log result in table
mightqxc Apr 2, 2026
6800690
log pretty
mightqxc Apr 2, 2026
4bd4dce
fix
mightqxc Apr 2, 2026
4283cd9
fixes suggested by github copilot
mightqxc Apr 7, 2026
e3b015e
fix
mightqxc Apr 7, 2026
04aa072
fix
mightqxc Apr 7, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pandaharvester/commit_timestamp.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
timestamp = "13-02-2026 07:45:33 on flin (by mightqxc)"
timestamp = "07-04-2026 12:44:11 on flin (by mightqxc)"
4 changes: 2 additions & 2 deletions pandaharvester/harvesterbody/job_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ def run(self):
if n_jobs == 0:
tmpLog.debug("no job to fetch; skip")
continue
# prodsourcelabel
# prod_source_label
try:
is_grandly_unified_queue = pandaQueueDict.is_grandly_unified_queue(siteName)
except Exception:
is_grandly_unified_queue = False
default_prodSourceLabel = queueConfig.get_source_label(is_gu=is_grandly_unified_queue)
# randomize prodsourcelabel if configured
# randomize prod_source_label if configured
pdpm = getattr(queueConfig, "prodSourceLabelRandomWeightsPermille", {})
choice_list = core_utils.make_choice_list(pdpm=pdpm, default=default_prodSourceLabel)
prodSourceLabel = random.choice(choice_list)
Expand Down
606 changes: 310 additions & 296 deletions pandaharvester/harvesterbody/submitter.py

Large diffs are not rendered by default.

740 changes: 577 additions & 163 deletions pandaharvester/harvesterbody/worker_adjuster.py

Large diffs are not rendered by default.

18 changes: 10 additions & 8 deletions pandaharvester/harvesterbody/worker_maker.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ def get_plugin(self, queue_config):
return self.pluginFactory.get_plugin(queue_config.workerMaker)

# make workers
def make_workers(self, jobchunk_list, queue_config, n_ready, job_type, resource_type, maker=None):
tmpLog = core_utils.make_logger(_logger, f"queue={queue_config.queueName} jtype={job_type} rtype={resource_type}", method_name="make_workers")
def make_workers(self, jobchunk_list, queue_config, n_ready, job_type, resource_type, prod_source_label="ANY", maker=None):
tmpLog = core_utils.make_logger(
_logger, f"queue={queue_config.queueName} jtype={job_type} rtype={resource_type} pslabel={prod_source_label}", method_name="make_workers"
)
tmpLog.debug("start")
try:
# get plugin
Expand All @@ -37,7 +39,7 @@ def make_workers(self, jobchunk_list, queue_config, n_ready, job_type, resource_
for iChunk, jobChunk in enumerate(jobchunk_list):
# make a worker
if iChunk >= n_ready:
workSpec = maker.make_worker(jobChunk, queue_config, job_type, resource_type)
workSpec = maker.make_worker(jobChunk, queue_config, job_type, resource_type, prod_source_label=prod_source_label)
else:
Comment thread
mightqxc marked this conversation as resolved.
# use ready worker
if iChunk < len(readyWorkers):
Expand All @@ -63,35 +65,35 @@ def make_workers(self, jobchunk_list, queue_config, n_ready, job_type, resource_
return [], jobchunk_list

# get number of jobs per worker
def get_num_jobs_per_worker(self, queue_config, n_workers, job_type, resource_type, maker=None):
def get_num_jobs_per_worker(self, queue_config, n_workers, job_type, resource_type, prod_source_label="ANY", maker=None):
# get plugin
if maker is None:
maker = self.pluginFactory.get_plugin(queue_config.workerMaker)
return maker.get_num_jobs_per_worker(n_workers)

# get number of workers per job
def get_num_workers_per_job(self, queue_config, n_workers, job_type, resource_type, maker=None):
def get_num_workers_per_job(self, queue_config, n_workers, job_type, resource_type, prod_source_label="ANY", maker=None):
# get plugin
if maker is None:
maker = self.pluginFactory.get_plugin(queue_config.workerMaker)
return maker.get_num_workers_per_job(n_workers)

# check number of ready resources
def num_ready_resources(self, queue_config, job_type, resource_type, maker=None):
def num_ready_resources(self, queue_config, job_type, resource_type, prod_source_label="ANY", maker=None):
# get plugin
if maker is None:
maker = self.pluginFactory.get_plugin(queue_config.workerMaker)
return maker.num_ready_resources()

# get upper limit on the cumulative total of workers per job
def get_max_workers_per_job_in_total(self, queue_config, job_type, resource_type, maker=None):
def get_max_workers_per_job_in_total(self, queue_config, job_type, resource_type, prod_source_label="ANY", maker=None):
# get plugin
if maker is None:
maker = self.pluginFactory.get_plugin(queue_config.workerMaker)
return maker.get_max_workers_per_job_in_total()

# get upper limit on the number of new workers per job in a cycle
def get_max_workers_per_job_per_cycle(self, queue_config, job_type, resource_type, maker=None):
def get_max_workers_per_job_per_cycle(self, queue_config, job_type, resource_type, prod_source_label="ANY", maker=None):
# get plugin
if maker is None:
maker = self.pluginFactory.get_plugin(queue_config.workerMaker)
Expand Down
25 changes: 25 additions & 0 deletions pandaharvester/harvestercommunicator/panda_communicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,31 @@ def get_job_stats(self):

return stats, "OK"

# get job statistics: new function with prod_source_label, under testing and may replace the old one
def get_job_stats_new(self):
tmp_log = self.make_logger(method_name="get_job_stats_new")
tmp_log.debug("Start")

tmp_status, tmp_response = self.request_ssl("GET", "statistics/active_job_detailed_stats_by_site", {})
stats = {}
ret_message = "FAILED"

# Communication issue
if tmp_status is False:
core_utils.dump_error_message(tmp_log, tmp_response)
return stats, ret_message

tmp_success = tmp_response.get("success", False)
tmp_message = tmp_response.get("message")
stats = tmp_response.get("data")

if not tmp_success:
ret_message = tmp_message
core_utils.dump_error_message(tmp_log, ret_message)
return stats, ret_message

return stats, "OK"

# update workers
def update_workers(self, workspec_list):
tmp_log = self.make_logger(method_name="update_workers")
Expand Down
36 changes: 36 additions & 0 deletions pandaharvester/harvestercore/core_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -706,3 +706,39 @@ def naive_utcfromtimestamp(timestamp: float) -> datetime:
datetime: current UTC date and time, without tzinfo
"""
return aware_utcfromtimestamp(timestamp).replace(tzinfo=None)


def special_pilot_type_to_prod_source_label(pilot_type: str) -> str:
"""
Convert special pilotType of worker to prodSourceLabel of PanDA job.

Args:
pilot_type (str): pilotType of worker, e.g. "RC", "ALRB", "PT" (except "PR" which is production)

Returns:
str: prodSourceLabel of PanDA job, e.g. "rc_test2", "rc_alrb", "ptest"; "ANY" if no mapping is defined for the given pilot_type
"""
pilot_type_to_prod_source_label_map = {
"RC": "rc_test2",
"ALRB": "rc_alrb",
"PT": "ptest",
}
return pilot_type_to_prod_source_label_map.get(pilot_type, "ANY")


def prod_source_label_to_pilot_type(prod_source_label: str) -> str:
"""
Convert prodSourceLabel of PanDA job to pilotType of worker.

Args:
prod_source_label (str): prodSourceLabel of PanDA job, e.g. "rc_test2", "rc_alrb", "ptest"

Returns:
str: pilotType of worker, e.g. "RC", "ALRB", "PT"; default to "PR" (production) if no mapping is defined for the given prod_source_label
"""
prod_source_label_to_pilot_type_map = {
"rc_test2": "RC",
"rc_alrb": "ALRB",
"ptest": "PT",
}
return prod_source_label_to_pilot_type_map.get(prod_source_label, "PR")
Loading
Loading