Feature request - custom caching strategy #5308

Open
robsyme opened this issue Sep 17, 2024 · 12 comments
@robsyme
Collaborator

robsyme commented Sep 17, 2024

New feature

One of Nextflow's most useful features is the ability to send metadata through the process DAG in parallel with the file data. Not all of the metadata will be necessary for every process, but any change to the metadata will result in a new task hash, and potentially unnecessary recomputation of tasks.

I propose that users would appreciate the ability to signal to Nextflow which parts of the metadata are relevant to the process.

Usage scenario

Let's say we have a samplesheet samples.csv

name,university
Paolo,Sapienza
Ben,Clemson

We ingest it in a simple workflow:

workflow {
    Channel.fromPath("samples.csv")
    | splitCsv(header:true)
    | GetEducation
}

process GetEducation {
    input: val(meta)
    output: tuple val(meta), path("education.txt")
    script: "echo '${meta.name} going to ${meta.university}' > education.txt"
}

If we expand the samplesheet to include employer metadata:

name,university,employer
Paolo,Sapienza,Seqera
Ben,Clemson,Seqera

Any resumed runs will be unable to use the previously computed tasks, as the GetEducation meta input has changed, even though we know that the new field makes no difference to the task's result.

Suggested implementation

I propose a new option to the cache directive, whereby the user could supply a closure that returns some new value to be used as input into the task hash:

process GetEducation {
    cache { meta -> meta.subMap('name', 'university') }

    input: val(meta)
    output: tuple val(meta), path("education.txt")
    script: "echo '${meta.name} going to ${meta.university}' > education.txt"
}

This would expand the opportunity to rely on the Nextflow task cache while a workflow is being interactively developed and the extent of the required metadata is not yet known.

The syntax might also include tuples and files (or bags of files):

process ApplyLearning {
    cache { meta, edu -> [meta.employer, edu] }

    input: tuple val(meta), path("education.txt")
    script: "cat education.txt > bio.txt && echo 'Now working at $meta.employer' >> bio.txt"
}
@robsyme
Collaborator Author

robsyme commented Sep 18, 2024

Here is a self-contained example:

workflow {
    input = [
        [  id: 'test1', patient: 'patient_1' ],
        [  id: 'test2', patient: 'patient_3' ]
    ]

    Channel.fromList(input)
    | map { patient -> [patient.id, patient] }
    | set { patients }

    patients
    | map { id, patient -> id }
    | createFile
    | join(patients)
    | map { id, initialFile, patient -> [patient, initialFile] }
    | addPatientInfo
    | view
}

process createFile {
    input:
    val id

    output:
    tuple val(id), path("${id}.txt")

    script:
    """
    touch ${id}.txt
    echo "ID: ${id}" >> ${id}.txt
    """
}

process addPatientInfo {
    publishDir 'publish', mode: 'copy'

    input:
    tuple val(meta), path(initial_file)

    output:
    path "${meta.id}_with_patient.txt"

    script:
    """
    cp ${initial_file} ${meta.id}_with_patient.txt
    echo "Patient: ${meta.patient}" >> ${meta.id}_with_patient.txt
    """
}

If we were able to specify in createFile which pieces of metadata are actually relevant, this could be considerably shorter, without sacrificing cache reuse when the patient value changes for one or more samples:

workflow {
    input = [
        [  id: 'test1', patient: 'patient_1' ],
        [  id: 'test2', patient: 'patient_3' ]
    ]

    Channel.fromList(input)
    | createFile
    | addPatientInfo
    | view
}

process createFile {
    cache { meta.id }
    input:
    val(meta)

    output:
    tuple val(meta), path("${meta.id}.txt")

    script:
    """
    touch ${meta.id}.txt
    echo "ID: ${meta.id}" >> ${meta.id}.txt
    """
}

process addPatientInfo {
    publishDir 'publish', mode: 'copy'

    input:
    tuple val(meta), path(initial_file)

    output:
    path "${meta.id}_with_patient.txt"

    script:
    """
    cp ${initial_file} ${meta.id}_with_patient.txt
    echo "Patient: ${meta.patient}" >> ${meta.id}_with_patient.txt
    """
}

@bentsherman
Member

The cache closure wouldn't need any parameters; it could just use the task context like any other dynamic directive. We'd also want to log a warning to make it clear that you're basically doing a "development" run, as this option shouldn't be used in production.
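
For example, the first snippet in the issue might become something like this (proposed syntax only, nothing Nextflow supports today; meta would be resolved from the task context rather than passed as a closure parameter):

process GetEducation {
    // Proposed syntax sketch: the closure reads `meta` from the task
    // context, the same way a dynamic directive like `tag "${meta.name}"`
    // would resolve it.
    cache { meta.subMap('name', 'university') }

    input: val(meta)
    output: tuple val(meta), path("education.txt")
    script: "echo '${meta.name} going to ${meta.university}' > education.txt"
}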

Here's a quandary -- I think the user would need to specify this custom strategy from the beginning. If I do a run, then add some metadata columns, but I forgot to specify this closure, then I'm out of luck.

I guess you could make it work if you specify all inputs:

process ApplyLearning {
    cache { [meta.employer, edu] }

    input:
    tuple val(meta), path(edu, stageAs: "education.txt")

    // ...
}

I think you could tack that on to the second run and be able to resume.

@bentsherman
Member

Thinking about this for 30 more seconds, I would argue that if the intent is to declare that a process depends only on meta.employer and not the entire meta map, then you should just provide meta.employer as an input:

process ApplyLearning {
    input:
    val(employer)
    path("education.txt")

    // ...
}

I think the deeper issue is that processes are invoked with channels instead of individual values and that makes it difficult to control how inputs are passed into the process. It's much easier to just provide the channel you already have, which might have more inputs than you actually need, rather than adding more channel boilerplate.

But I think that solving that problem is probably the best way to address this one too.

workflow {
    input = [
        [  id: 'test1', patient: 'patient_1' ],
        [  id: 'test2', patient: 'patient_3' ]
    ]

    Channel.fromList(input)
    | map { meta ->
      def initial_file = createFile(meta.id)
      return [ meta, addPatientInfo(meta, initial_file) ]
    }
    | view
}

process createFile {
    input:
    val(id)

    output:
    path("${id}.txt")

    script:
    """
    touch ${id}.txt
    echo "ID: ${id}" >> ${id}.txt
    """
}

process addPatientInfo {
    publishDir 'publish', mode: 'copy'

    input:
    val(meta)
    path(initial_file)

    output:
    path "${meta.id}_with_patient.txt"

    script:
    """
    cp ${initial_file} ${meta.id}_with_patient.txt
    echo "Patient: ${meta.patient}" >> ${meta.id}_with_patient.txt
    """
}

@pditommaso
Member

What am I missing?

» nextflow run t.nf  -resume -ansi-log false
N E X T F L O W  ~  version 24.08.0-edge
Launching `t.nf` [jolly_majorana] DSL2 - revision: 6f25f6915a
[fa/d8a5fc] Cached process > GetEducation (1)
[33/9737d0] Cached process > GetEducation (2)

@robsyme
Collaborator Author

robsyme commented Sep 19, 2024

Paolo - maybe a clearer example would be:

params.input = "samples.basic.csv"

workflow {
    Channel.fromPath(params.input)
    | splitCsv(header:true)
    | GetEducation
    | ApplyLearning
}

process GetEducation {
    input: val(meta)
    output: tuple val(meta), path("education.txt")
    script: "echo '$meta.name going to $meta.university' > education.txt"
}

process ApplyLearning {
    input: tuple val(meta), path("education.txt")
    script: "cat education.txt > bio.txt && echo 'Now working at $meta.employer' >> bio.txt"
}

And we have two samplesheets, a samples.basic.csv:

name,university
Paolo,Sapienza
Ben,Clemson

... and a samples.extended.csv

name,university,employer
Paolo,Sapienza,Seqera
Ben,Clemson,Seqera

If I run using the basic samplesheet first, we run all four tasks:

$ nextflow run . --input samples.basic.csv
 N E X T F L O W   ~  version 24.04.4
[15/6978b9] process > GetEducation (1)  [100%] 2 of 2 ✔
[fa/5be9e8] process > ApplyLearning (2) [100%] 2 of 2 ✔

If I launch using the extended samplesheet, I have to recalculate the GetEducation tasks, even though they are exactly the same tasks with exactly the same result:

$ nextflow run main.nf --input samples.extended.csv -resume 
 N E X T F L O W   ~  version 24.04.4
[11/64ea70] process > GetEducation (1)  [100%] 2 of 2 ✔
[19/6f8b45] process > ApplyLearning (2) [100%] 2 of 2 ✔

In the second (samples.extended.csv) run, the meta input variable is indeed different, but I have no way of telling Nextflow which parts of it actually matter for task caching. It would be very useful if I could tell Nextflow that only meta.name and meta.university are used by the GetEducation task, and that other changes to this variable (such as the additional field in the Map in this case) are inconsequential for caching purposes.
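
(As an aside, the hash component that changed can be confirmed by re-running with -dump-hashes; in this example the meta input map should be the only entry that differs between the two runs.)

$ nextflow run main.nf --input samples.extended.csv -resume -dump-hashes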

@wangyang1749

wangyang1749 commented Sep 24, 2024

I think it's important to be able to control the cache, in particular to be able to manually cache time-consuming tasks.

Suppose my script is as follows:

process saySecond {
  scratch true
  stageInMode "copy"
  container "master:5000/stress:latest"

  cache 'lenient'
  input: 
    path db
  output:
    path("db2.json")
  script:
    """
    cat $db > db2.json
    """
}
workflow {
  ch_input = Channel.fromPath(["/data/workspace/1/nf-hello/db/a.txt","/data/workspace/1/nf-hello/db/b.txt"])
  saySecond(ch_input)
}

The hash of the task can be viewed with the -dump-hashes json option:

[saySecond (1)] cache hash: a621c03e986060730fa7c05f43a0d754; mode: LENIENT; entries: [
    {
        "hash": "52fc10538560a06bf4eefb5029a3a408",
        "type": "java.util.UUID",
        "value": "68e313b7-a96b-482f-aa5a-5d3c2971779c"
    },
    {
        "hash": "4f60e6294a34dfbe2dd400257e58d05e",
        "type": "java.lang.String",
        "value": "saySecond"
    },
    {
        "hash": "115eed1375a4e25775dfa635bdaf0eee",
        "type": "java.lang.String",
        "value": "    \"\"\"\n    cat $db > db2.json\n    \"\"\"\n"
    },
    {
        "hash": "112f589ee6fa1b07d3f510e3885ea446",
        "type": "java.lang.String",
        "value": "master:5000/stress:latest"
    },
    {
        "hash": "0d39a5ff3a5c828a386e57fe6d0f07cd",
        "type": "java.lang.String",
        "value": "db"
    },
    {
        "hash": "9598b73b3492f1a8034f97fd39eff09f",
        "type": "nextflow.util.ArrayBag",
        "value": "[FileHolder(sourceObj:/data/workspace/1/nf-hello/db/a.txt, storePath:/data/workspace/1/nf-hello/db/a.txt, stageName:a.txt)]"
    },
    {
        "hash": "4f9d4b0d22865056c37fb6d9c2a04a67",
        "type": "java.lang.String",
        "value": "$"
    },
    {
        "hash": "16fe7483905cce7a85670e43e4678877",
        "type": "java.lang.Boolean",
        "value": "true"
    }
]

If nothing else changes, the cache hash will be the same the next time you run it. But if an insignificant change is accidentally made to the script, the hash changes.
For example, adding some spaces:

cat $db            > db2.json
[saySecond (1)] cache hash: bec5c53e5888f9afc06d945fe5fb7d56; mode: LENIENT; entries: [
    {
        "hash": "52fc10538560a06bf4eefb5029a3a408",
        "type": "java.util.UUID",
        "value": "68e313b7-a96b-482f-aa5a-5d3c2971779c"
    },
    {
        "hash": "4f60e6294a34dfbe2dd400257e58d05e",
        "type": "java.lang.String",
        "value": "saySecond"
    },
    {
        "hash": "029c7f28b3785d1c47bdc397d45af9c8",
        "type": "java.lang.String",
        "value": "    \"\"\"\n    cat $db       > db2.json\n    \"\"\"\n"
    },
    {
        "hash": "112f589ee6fa1b07d3f510e3885ea446",
        "type": "java.lang.String",
        "value": "master:5000/stress:latest"
    },
    {
        "hash": "0d39a5ff3a5c828a386e57fe6d0f07cd",
        "type": "java.lang.String",
        "value": "db"
    },
    {
        "hash": "9598b73b3492f1a8034f97fd39eff09f",
        "type": "nextflow.util.ArrayBag",
        "value": "[FileHolder(sourceObj:/data/workspace/1/nf-hello/db/a.txt, storePath:/data/workspace/1/nf-hello/db/a.txt, stageName:a.txt)]"
    },
    {
        "hash": "4f9d4b0d22865056c37fb6d9c2a04a67",
        "type": "java.lang.String",
        "value": "$"
    },
    {
        "hash": "16fe7483905cce7a85670e43e4678877",
        "type": "java.lang.Boolean",
        "value": "true"
    }
]

Is it possible to have a command that manually computes the hash of the current inputs and script, then replaces the hash stored in the cache database with the newly computed one, so that a time-consuming task can still be treated as cached?

Alternatively, a field could be added to the cache database that forces a task to be considered cached once it is determined that its result has already been generated.

@robsyme
Collaborator Author

robsyme commented Sep 28, 2024

@SPPearce
Contributor

This would be extremely useful for the in-house pipeline that I'm currently building; I have ended up using a bunch of subMaps to only select the parts of the meta map that are required for each process, to ensure that changing the initial metadata doesn't invalidate the entire cache.

@bentsherman
Member

Isn't it the same amount of work to provide the subMap as an input vs through a custom cache directive?

@pditommaso
Member

+1 for subMap. We could think about a custom cache once we have custom types, in which we might include some declaration about caching.

@robsyme
Collaborator Author

robsyme commented Oct 22, 2024

Ben - the issue will be re-joining the full metadata after the process. Contrast:

workflow {
    people = Channel.of(
        [name:"Rob", employer:"Seqera", loves:"Nextflow, and making Ben's life harder"],
        [name:"Ben", employer:"Seqera", loves:"Nextflow"],
        [name:"Paolo", employer:"Seqera", loves:"Nextflow"],
        [name:"Simon", employer:"NeoGenomics", loves:"Nextflow"]
    )
    
    people
    | map { person -> person.subMap('name', 'employer') }
    | DoSomething
    | join(people.map { person -> [person.subMap('name', 'employer'), person] })
    | map { _submap, txtFile, person -> [person, txtFile] }
    | view
}

process DoSomething {
    input: val(person)
    output: tuple val(person), path("${person.name}.txt")
    script:
    """
    echo "Hello ${person.name} from ${person.employer}" > ${person.name}.txt
    """
}

with

workflow {
    people = Channel.of(
        [name:"Rob", employer:"Seqera", loves:"Nextflow, and making Ben's life harder"],
        [name:"Ben", employer:"Seqera", loves:"Nextflow"],
        [name:"Paolo", employer:"Seqera", loves:"Nextflow"],
        [name:"Simon", employer:"NeoGenomics", loves:"Nextflow"]
    )
    | DoSomething
    | view
}

process DoSomething {
    cache { person.subMap('name', 'employer') }
    input: val(person)
    output: tuple val(person), path("${person.name}.txt")
    script:
    """
    echo "Hello ${person.name} from ${person.employer}" > ${person.name}.txt
    """
}

That said, the longer I think about it, the more I prefer the idea that a task's cache inputs should describe everything you need to reconstruct the outputs.

@SPPearce
Contributor

SPPearce commented Oct 23, 2024

Exactly, Rob. And it is easy to screw up the joins, because you need to ensure that you have exactly the right fields to match on (which is another bugbear of mine; it'd be really nice to have a join that would match [name:"Ben", employer:"Seqera", loves:"Nextflow"] with [name:"Ben", employer:"Seqera"]).
Even better would be if the cache automatically detected only the fields of person that it actually used, but I realise that is a harder task.
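
For illustration, one way to make that kind of join work today (assuming the shared key fields are known up front) is to key both channels on the same subMap and then drop the key:

workflow {
    full = Channel.of(
        [name:"Ben", employer:"Seqera", loves:"Nextflow"],
        [name:"Rob", employer:"Seqera", loves:"Nextflow"]
    )
    trimmed = Channel.of(
        [name:"Ben", employer:"Seqera"],
        [name:"Rob", employer:"Seqera"]
    )

    trimmed
    | map { m -> [ m.subMap('name', 'employer'), m ] }
    | join( full.map { m -> [ m.subMap('name', 'employer'), m ] } )
    | map { _key, _small, person -> person }   // recover the full metadata map
    | view
}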
