Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
1c2a01f
Increase number of rows per file and add shuf
marinak-ebi Jun 13, 2025
c5f5b01
Create folder for input_parquet_files
marinak-ebi Jun 13, 2025
a4b3c5f
Flatten job folder structure
marinak-ebi Jun 13, 2025
869801d
Rename AnnotationExtractorAndHadoopLoader to annotation_extractor
marinak-ebi Jun 13, 2025
5af5109
Flatten annotation_extractor
marinak-ebi Jun 13, 2025
b292c7c
Rename tmp to annotation_pipeline_output
marinak-ebi Jun 13, 2025
4988caa
Create folder input_parquet_files
marinak-ebi Jun 13, 2025
808b0bf
Fix paths
marinak-ebi Jun 18, 2025
24bb576
Temporally turn off ipdateImpress function
marinak-ebi Jun 18, 2025
a9ca8a6
Fix path for function_windowed
marinak-ebi Jun 18, 2025
1613680
Add additional logs
marinak-ebi Jun 18, 2025
29207ff
Fix path to function_windowed.R
marinak-ebi Jun 18, 2025
3e1acfd
Fix mp_chooser path
marinak-ebi Jun 18, 2025
59e4b3b
Change path to annotation_extractor
marinak-ebi Jun 18, 2025
6774f5d
Fix path for indeces
marinak-ebi Jun 25, 2025
36759c4
Fix path to mp_chooser
marinak-ebi Jun 25, 2025
5ced419
Change path of step 2 logs
marinak-ebi Jun 25, 2025
30188dd
Fix path
marinak-ebi Jun 26, 2025
fed0a03
Compress files in step 2
marinak-ebi Jun 26, 2025
5277621
Change path of step 4 logs
marinak-ebi Jun 26, 2025
8c7d2b8
Add mindepth to fix minor error
marinak-ebi Jun 26, 2025
24dd1bc
Fix typo
marinak-ebi Jun 26, 2025
1471b79
Change path of phase II logs
marinak-ebi Jun 26, 2025
69f2230
Rename slurm jobs
marinak-ebi Jun 26, 2025
233bc71
Change compression of phase 3 logs
marinak-ebi Jun 26, 2025
c6769ed
Remove intermediate logs
marinak-ebi Jun 26, 2025
9db9921
Edit path of annotation pipeline logs
marinak-ebi Jun 26, 2025
d125559
Create slurm job for annotation pipeline logs compression
marinak-ebi Jun 26, 2025
ab336e7
Change path to annotation_pipeline_output
marinak-ebi Jun 26, 2025
e8e17bd
Fix Typo
marinak-ebi Jun 26, 2025
b558dd7
Fix Typo
marinak-ebi Jun 26, 2025
7d15079
Fix Typo
marinak-ebi Jun 26, 2025
d76e668
Fix path to annotation_logs
marinak-ebi Jun 26, 2025
28a3785
Rename AllResultsIndeces.txt to global_results_index.txt
marinak-ebi Jun 26, 2025
0524606
Remove unneeded underscore
marinak-ebi Jun 26, 2025
255327f
Rename allsingleindeces.zip
marinak-ebi Jun 26, 2025
2fa86a1
Rename AllJobs.bch
marinak-ebi Jun 26, 2025
09071f0
Rewrite log messages
marinak-ebi Jun 26, 2025
40d23c0
Uncomment updateImpress
marinak-ebi Jun 27, 2025
6787277
Rename upper level folders
marinak-ebi Jun 27, 2025
5b58c83
Make job name version specific
marinak-ebi Jun 27, 2025
ba4cf84
Rename job for phase III
marinak-ebi Jun 27, 2025
54b588a
Fix concatenation bug
marinak-ebi Jun 27, 2025
9005b61
Temporarily disable updateImpress
marinak-ebi Jun 27, 2025
ccba356
Fix bug for mainAgeing
marinak-ebi Jun 27, 2025
bebc4e7
Remove extra space
marinak-ebi Jun 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ mainAgeing = function(file = NULL ,
cpu = cpu ,
memory = memory ,
time = time ,
jobname = paste0("impc_job_", DRversion),
extraBatchParameters = extraBatchParameters
)
write(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,7 @@ BatchGenerator = function(file ,
cpu = 1 ,
memory = "8G" ,
time = "10:00:00" ,
jobname = NULL ,
extraBatchParameters = NULL) {
dirOut = file.path(dir, 'ClusterOut')
dirErr = file.path(dir, 'ClusterErr')
Expand All @@ -701,7 +702,7 @@ BatchGenerator = function(file ,
ro = paste(' -o ', paste0('"', oname, '.ClusterOut', '"'), sep = '')
re = paste(' -e ', paste0('"', ename, '.ClusterErr', '"'), sep = '')
rf = paste(
"sbatch --job-name=impc_stats_pipeline_job --mem=", memory,
"sbatch --job-name=", jobname, " --mem=", memory,
" --time=", time,
extraBatchParameters ,
' --cpus-per-task=' ,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ generate_data <- function(args, thresh = 4) {
controlSize = 1500,
extraBatchParameters = NULL,
combineEAandLA = FALSE,
solrBaseURL = NULL
solrBaseURL = NULL,
DRversion = args[3]
)
trash <- NULL
gc()
Expand Down
6 changes: 3 additions & 3 deletions annotation_pipeline/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,12 @@ def main():
file_list = [line.strip() for line in f]
total_files = len(file_list)

# Store StatPackets temporary.
output_dir = Path("tmp")
# Store StatPackets.
output_dir = Path("../annotation_pipeline_output")
if not output_dir.exists():
output_dir.mkdir()

output_file = output_dir / (file_list_path.name + "_.statpackets")
output_file = output_dir / (file_list_path.name + ".statpackets")

for i, file in enumerate(file_list):
file_path = Path(file)
Expand Down
131 changes: 65 additions & 66 deletions orchestration/orchestration.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ WINDOWING_PIPELINE=${7:-"true"}

# Redirect all output and errors to the log file.
LOGFILE=${KOMP_PATH}/impc_statistical_pipeline/IMPC_DRs/stats_pipeline_logs/orchestration_${VERSION}.log
JOBNAME="impc_job_${VERSION}"
exec > >(tee -a "$LOGFILE") 2>&1

echo "Starting pipeline run. Data release $VERSION. Fetching packages from $REMOTE / $BRANCH."
Expand All @@ -33,7 +34,7 @@ function message0() {
# Function to wait until all jobs on SLURM complete.
function waitTillCommandFinish() {
while true; do
if ! (squeue --format="%A %.30j" | grep -q "impc_stats_pipeline_job"); then
if ! (squeue --format="%A %.30j" | grep -q "$JOBNAME"); then
message0 "Done waiting for SLURM jobs to complete."
break
fi
Expand Down Expand Up @@ -76,7 +77,8 @@ function submit_limit_jobs() {
# Preparation.
mkdir --mode=775 ${KOMP_PATH}/impc_statistical_pipeline/IMPC_DRs/stats_pipeline_input_dr${VERSION}
cd ${KOMP_PATH}/impc_statistical_pipeline/IMPC_DRs/stats_pipeline_input_dr${VERSION}
cp ${PARQUET_FOLDER}/*.parquet ./
mkdir input_parquet_files
cp ${PARQUET_FOLDER}/*.parquet ./input_parquet_files
cp ${MP_CHOOSER_FOLDER}/part*.txt ./mp_chooser.json

message0 "Update packages to the latest version"
Expand All @@ -87,95 +89,93 @@ message0 "Update completed"

# Statistical pipeline.
message0 "Starting the IMPC statistical pipeline..."
mkdir SP compressed_logs
mkdir 01_batching compressed_logs
export input_path=$(realpath .)
export sp_results=$(realpath SP)
export sp_results=$(realpath 01_batching)
message0 "Parquet files path: ${input_path}"
message0 "Output path: ${sp_results}"
cd SP
cd 01_batching

message0 "Phase I. Convert parquet files into Rdata..."

message0 "Step 1. Create jobs"
step1_files=$(find .. -type f -name '*.parquet' -exec realpath {} \;)
step1_files=$(find ../input_parquet_files -type f -name '*.parquet' -exec realpath {} \;)
for file in $step1_files; do
echo "sbatch --job-name=impc_stats_pipeline_job --mem=10G --time=00:10:00 -e ${file}.err -o ${file}.log --wrap='Rscript Step2Parquet2Rdata.R $file'" >> jobs_step2_Parquet2Rdata.bch
file_name=$(basename "${file}" .parquet)
echo "sbatch --job-name=${JOBNAME} --mem=10G --time=00:10:00 -e ../compressed_logs/step2_logs/${file_name}.err -o ../compressed_logs/step2_logs/${file_name}.log --wrap='Rscript Step2Parquet2Rdata.R $file'" >> jobs_step2_Parquet2Rdata.bch
done

message0 "Step 2. Read parquet files and create pseudo Rdata"
fetch_script 0-ETL/Step2Parquet2Rdata.R
sbatch --job-name=impc_stats_pipeline_job --time=01:00:00 --mem=1G -o ../compressed_logs/step2_job_id.txt --wrap="bash jobs_step2_Parquet2Rdata.bch"
sbatch --job-name=${JOBNAME} --time=01:00:00 --mem=1G -o ../compressed_logs/step2_job_id.txt --wrap="bash jobs_step2_Parquet2Rdata.bch"
waitTillCommandFinish
rm Step2Parquet2Rdata.R
find ../ -type f -name '*.log' -exec zip -q -m ../compressed_logs/step2_logs.zip {} +
find ../ -type f -name '*.err' -exec zip -q -m ../compressed_logs/step2_logs.zip {} +
sbatch --job-name=compress_logs --time=15:00:00 --mem=1G -o ../compressed_logs/zip_step2.txt --wrap="zip -r -m -q ../compressed_logs/step2_logs.zip ../compressed_logs/step2_logs/"

message0 "Step 3. Merging pseudo Rdata files into single file for each procedure - jobs creator"
dirs=$(find "${sp_results}/ProcedureScatterRdata" -maxdepth 1 -type d)
dirs=$(find "${sp_results}/ProcedureScatterRdata" -maxdepth 1 -mindepth 1 -type d)
for dir in $dirs; do
echo "sbatch --job-name=impc_stats_pipeline_job --mem=50G --time=01:30:00 -e ${dir}/step4_merge_rdatas.err -o ${dir}/step4_merge_rdatas.log --wrap='Rscript Step4MergingRdataFiles.R ${dir}'" >> jobs_step4_MergeRdatas.bch
file_name=$(basename "${dir}")
echo "sbatch --job-name=${JOBNAME} --mem=50G --time=01:30:00 -e ../compressed_logs/step4_logs/${file_name}_step4.err -o ../compressed_logs/step4_logs/${file_name}_step4.log --wrap='Rscript Step4MergingRdataFiles.R ${dir}'" >> jobs_step4_MergeRdatas.bch
done

message0 "Step 4. Merging pseudo Rdata files into single files per procedure"
fetch_script 0-ETL/Step4MergingRdataFiles.R
sbatch --job-name=impc_stats_pipeline_job --time=01:00:00 --mem=1G -o ../compressed_logs/step4_job_id.txt --wrap="bash jobs_step4_MergeRdatas.bch"
sbatch --job-name=${JOBNAME} --time=01:00:00 --mem=1G -o ../compressed_logs/step4_job_id.txt --wrap="bash jobs_step4_MergeRdatas.bch"
waitTillCommandFinish
rm Step4MergingRdataFiles.R
find . -type f -name '*.log' -exec zip -q -m ../compressed_logs/step4_logs.zip {} +
find . -type f -name '*.err' -exec zip -q -m ../compressed_logs/step4_logs.zip {} +
sbatch --job-name=compress_logs --time=15:00:00 --mem=1G -o ../compressed_logs/zip_step4.txt --wrap="zip -r -m -q ../compressed_logs/step4_logs.zip ../compressed_logs/step4_logs/"

message0 "Phase I. Compressing the log files and house cleaning..."
zip -q -rm ../compressed_logs/phase1_jobs.zip *.bch
rm -rf ProcedureScatterRdata

message0 "Starting Phase II, packaging the big data into small packages ..."
mkdir DataGeneratingLog
for file in $(find Rdata -type f -exec realpath {} \;); do
file_basename=$(basename $file .Rdata)
echo "sbatch --job-name=impc_stats_pipeline_job --mem=45G --time=6-00 -e DataGeneratingLog/${file_basename}_errorlog.log -o DataGeneratingLog/${file_basename}_outputlog.log --wrap='Rscript InputDataGenerator.R ${file} ${file_basename}'" >> DataGenerationJobList.bch
echo "sbatch --job-name=${JOBNAME} --mem=45G --time=6-00 -e ../compressed_logs/phase2_logs/${file_basename}.err -o ../compressed_logs/phase2_logs/${file_basename}.log --wrap='Rscript InputDataGenerator.R ${file} ${file_basename} ${VERSION}'" >> DataGenerationJobList.bch
done
fetch_script jobs/InputDataGenerator.R
sbatch --job-name=impc_stats_pipeline_job --time=01:00:00 --mem=1G -o ../compressed_logs/phase2_job_id.txt --wrap="bash DataGenerationJobList.bch"
sbatch --job-name=${JOBNAME} --time=01:00:00 --mem=1G -o ../compressed_logs/phase2_job_id.txt --wrap="bash DataGenerationJobList.bch"
waitTillCommandFinish
rm InputDataGenerator.R

message0 "End of packaging data."
message0 "Phase II. Compressing the log files and house cleaning..."
mv *.bch DataGeneratingLog/
zip -q -rm phase2_logs.zip DataGeneratingLog/
mv phase2_logs.zip ../compressed_logs/
mv *.bch ../compressed_logs/phase2_logs/
sbatch --job-name=compress_logs --time=15:00:00 --mem=1G -o ../compressed_logs/zip_phase2.txt --wrap="zip -r -m -q ../compressed_logs/phase2_logs.zip ../compressed_logs/phase2_logs/"

message0 "Appending all procedure based jobs into one single file..."
mkdir jobs
find ./*/*_RawData/*.bch -type f | xargs cat >> jobs/AllJobs.bch
mkdir ../02_sp_output
find ./*/*_RawData/*.bch -type f | xargs cat >> ../02_sp_output/phase3_jobs.bch

message0 "Phase III. Initialising the statistical analysis..."
cd jobs
cd ../02_sp_output
message0 "Updating the dynamic contents from the IMPReSS..."
R --quiet -e \
"DRrequiredAgeing:::updateImpress( \
updateImpressFileInThePackage = TRUE, \
updateOptionalParametersList = TRUE, \
updateTheSkipList = TRUE, \
saveRdata = FALSE \
)"
# R --quiet -e \
# "DRrequiredAgeing:::updateImpress( \
# updateImpressFileInThePackage = TRUE, \
# updateOptionalParametersList = TRUE, \
# updateTheSkipList = TRUE, \
# saveRdata = FALSE \
# )"

message0 "Running the IMPC statistical pipeline by submitting jobs..."
if [ "${WINDOWING_PIPELINE}" = true ]; then
fetch_script jobs/function_windowed.R
mv function_windowed.R function.R
else
fetch_script jobs/function.R
fetch_script function.R
fi

R --quiet -e \
"DRrequiredAgeing:::ReplaceWordInFile( \
'$(realpath function.R)', \
'DRversionNotSpecified', \
${VERSION} \
)"
chmod 775 AllJobs.bch
submit_limit_jobs AllJobs.bch ../../compressed_logs/phase3_job_id.txt

chmod 775 phase3_jobs.bch
submit_limit_jobs phase3_jobs.bch ../compressed_logs/phase3_job_id.txt
waitTillCommandFinish

message0 "Postprocessing the IMPC statistical analysis results..."
Expand All @@ -194,56 +194,55 @@ R --quiet -e \
storepath='$(realpath RPackage_backup)' \
)"

message0 "Compress phase III log files"
find . -type f -name '*.ClusterOut' -exec zip -q -m ../compressed_logs/phase3_logs.zip {} +
message0 "Compress phase III error files"
find . -type f -name '*.ClusterErr' -exec zip -q -m ../compressed_logs/phase3_errs.zip {} +
message0 "Submit phase III log and err files compression"
sbatch --job-name=compress_logs --time=1-00:00:00 --mem=1G -o ../compressed_logs/zip_phase3_logs.txt --wrap="find . -type d -name 'ClusterOut' -exec zip -q -r -m ../compressed_logs/phase3_logs.zip {} \;"
sbatch --job-name=compress_logs --time=1-00:00:00 --mem=1G -o ../compressed_logs/zip_phase3_errs.txt --wrap="find . -type d -name 'ClusterErr' -exec zip -q -r -m ../compressed_logs/phase3_errs.zip {} \;"

message0 "This is the last step. If you see no file in the list below, the SP is successfully completed."

# Annotation pipeline.
message0 "Starting the IMPC annotation pipeline..."
cd jobs/Results_IMPC_SP_Windowed/
cd ../02_sp_output/Results_IMPC_SP_Windowed/
message0 "Step 1: Clean ups and creating the global index for the results."
message0 "Indexing the results..."
for dir in $(find . -mindepth 2 -maxdepth 2 -type d); do
base_dir=$(basename "$dir")
output_file="FileIndex_${base_dir}_$(printf "%.6f" $(echo $RANDOM/32767 | bc -l)).Ind"
echo "sbatch --job-name=impc_stats_pipeline_job --mem=1G --time=2-00 \
-e ${base_dir}_error.err -o ${base_dir}_output.log --wrap=\"find $dir -type f -name '*.tsv' -exec realpath {} \; > $output_file\"" >> minijobs.bch
echo "sbatch --job-name=${JOBNAME} --mem=1G --time=2-00 \
-e ../../compressed_logs/minijobs_logs/${base_dir}.err -o ../../compressed_logs/minijobs_logs/${base_dir}.log \
--wrap=\"find $dir -type f -name '*.tsv' -exec realpath {} \; > $output_file\"" >> minijobs.bch
done
chmod 775 minijobs.bch
submit_limit_jobs minijobs.bch ../../../compressed_logs/minijobs_job_id.txt
submit_limit_jobs minijobs.bch ../../compressed_logs/minijobs_job_id.txt
waitTillCommandFinish
mv minijobs.bch ../../../compressed_logs
mv minijobs.bch ../../compressed_logs
sbatch --job-name=compress_logs --time=15:00:00 --mem=1G -o ../../compressed_logs/zip_minijobs.txt --wrap="zip -r -m -q ../../compressed_logs/minijobs_logs.zip ../../compressed_logs/minijobs_logs/"

find . -type f -name '*_output.log' -exec zip -q -m ../../../compressed_logs/minijobs_logs.zip {} +
find . -type f -name '*_error.err' -exec zip -q -m ../../../compressed_logs/minijobs_logs.zip {} +
message0 "Moving single indeces into a separate directory called AnnotationExtractorAndHadoopLoader..."
mkdir AnnotationExtractorAndHadoopLoader
chmod 775 AnnotationExtractorAndHadoopLoader
mv *.Ind AnnotationExtractorAndHadoopLoader
cd AnnotationExtractorAndHadoopLoader
message0 "Moving single indices into a separate directory called 03_indices_and_splits..."
mkdir ../../03_indices_and_splits
chmod 775 ../../03_indices_and_splits
cd ../../03_indices_and_splits
mv ../02_sp_output/Results_IMPC_SP_Windowed/*.Ind .

message0 "Concatenating single index files to create a global index for the results..."
cat *.Ind >> AllResultsIndeces.txt
message0 "Zipping the single indeces..."
zip -q -rm allsingleindeces.zip *.Ind
split -50 AllResultsIndeces.txt split_index_
cat *.Ind | shuf --random-source=<(yes "42") >> global_results_index.txt
message0 "Zipping the single indices..."
sbatch --job-name=compress_logs --time=15:00:00 --mem=1G -o ../compressed_logs/zip_indices.txt --wrap="zip -r -m -q individual_indices.zip *.Ind"
split -1000 global_results_index.txt split_index_

message0 "Convert the mp_chooser JSON file to Rdata..."
R --quiet -e "a = jsonlite::fromJSON('../../../../mp_chooser.json');save(a,file='../../../../mp_chooser.json.Rdata')"
export MP_CHOOSER_FILE=$(realpath ../../../../mp_chooser.json.Rdata | tr -d '\n')
R --quiet -e "a = jsonlite::fromJSON('../mp_chooser.json');save(a,file='../mp_chooser.json.Rdata')"
export MP_CHOOSER_FILE=$(realpath ../mp_chooser.json.Rdata | tr -d '\n')

if [[ -z "${MP_CHOOSER_FILE}" || ! -f "${MP_CHOOSER_FILE}" ]]; then
echo -e "ERROR: mp_chooser not found at location\n\t${MP_CHOOSER_FILE}"
exit 1
fi

mkdir err log out
message0 "Generate annotation jobs..."
for file in $(find . -maxdepth 1 -type f -name "split_index*"); do
echo "sbatch --job-name=impc_stats_pipeline_job --mem=5G --time=2-00 \
-e err/$(basename "$file").err -o out/$(basename "$file").out --wrap='python3 loader.py $(basename "$file") ${MP_CHOOSER_FILE}'" >> annotation_jobs.bch
echo "sbatch --job-name=${JOBNAME} --mem=5G --time=2-00 \
-e ../compressed_logs/annotation_logs/$(basename "$file").err -o ../compressed_logs/annotation_logs/$(basename "$file").out --wrap='python3 loader.py $(basename "$file") ${MP_CHOOSER_FILE}'" >> annotation_jobs.bch
done
chmod 775 annotation_jobs.bch

Expand All @@ -253,13 +252,13 @@ python3.10 -m pip install rpy2
python3.10 -m pip install numpy
python3.10 -m pip install pandas

message0 "Downloading the action script..."
message0 "Downloading the action script loader.py..."
fetch_script loader.py annotation_pipeline
submit_limit_jobs annotation_jobs.bch ../../../../compressed_logs/annotation_job_id.txt
submit_limit_jobs annotation_jobs.bch ../compressed_logs/annotation_job_id.txt
waitTillCommandFinish

message0 "Zipping logs..."
mv annotation_jobs.bch ../../../../compressed_logs
zip -q -rm ../../../../compressed_logs/annotation_logs.zip log/* err/* out/*
zip -q -rm splits.zip split_index_*
message0 "Running Slurm jobs to compress logs..."
mv annotation_jobs.bch ../compressed_logs
sbatch --job-name=compress_logs --time=15:00:00 --mem=1G -o ../compressed_logs/zip_annotations.txt --wrap="zip -r -m -q ../compressed_logs/annotation_logs.zip ../compressed_logs/annotation_logs/"
sbatch --job-name=compress_logs --time=15:00:00 --mem=1G -o ../compressed_logs/zip_splits.txt --wrap="find . -type f -name 'split_index_*' -exec zip -q -m splits.zip {} +"
message0 "Job done."