Skip to content

Commit

Permalink
Merge pull request #1 from famosab/subworkflow
Browse files Browse the repository at this point in the history
wip
  • Loading branch information
famosab authored Oct 28, 2024
2 parents ddc3685 + 4d2b0e2 commit 5cbc66a
Show file tree
Hide file tree
Showing 7 changed files with 284 additions and 21 deletions.
10 changes: 10 additions & 0 deletions modules.json
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,16 @@
"modules"
]
},
"parabricks/applybqsr": {
"branch": "master",
"git_sha": "666652151335353eef2fcd58880bcef5bc2928e1",
"installed_by": ["modules"]
},
"parabricks/fq2bam": {
"branch": "master",
"git_sha": "666652151335353eef2fcd58880bcef5bc2928e1",
"installed_by": ["modules"]
},
"samblaster": {
"branch": "master",
"git_sha": "666652151335353eef2fcd58880bcef5bc2928e1",
Expand Down
3 changes: 3 additions & 0 deletions modules/nf-core/parabricks/applybqsr/environment.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 63 additions & 0 deletions modules/nf-core/parabricks/applybqsr/main.nf

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

75 changes: 75 additions & 0 deletions modules/nf-core/parabricks/applybqsr/meta.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

52 changes: 32 additions & 20 deletions subworkflows/local/fastq_align_parabricks/main.nf
Original file line number Diff line number Diff line change
@@ -1,35 +1,47 @@
// TODO nf-core: If in doubt look at other nf-core/subworkflows to see how we are doing things! :)
// https://github.com/nf-core/modules/tree/master/subworkflows
// You can also ask for help via your pull request or on the #subworkflows channel on the nf-core Slack workspace:
// https://nf-co.re/join
// TODO nf-core: A subworkflow SHOULD import at least two modules
//
// Alignment and BQSR with Nvidia CLARA Parabricks
//

include { PARABRICKS_FQ2BAM    } from '../../../modules/nf-core/parabricks/fq2bam/main'
include { PARABRICKS_APPLYBQSR } from '../../../modules/nf-core/parabricks/applybqsr/main'

workflow FASTQ_ALIGN_PARABRICKS {

    take:
    ch_reads         // channel: [mandatory] meta, reads
    ch_interval_file // channel: [optional for parabricks] intervals_bed_combined
    // val_sort      // boolean: [mandatory] true -> sort, false -> don't sort
    ch_fasta
    ch_fasta_fai
    ch_known_sites   // channel [optional for parabricks] known_sites_indels

    main:

    ch_reports           = Channel.empty()
    ch_versions          = Channel.empty()
    ch_bam               = Channel.empty()
    ch_bai               = Channel.empty()
    ch_bqsr_table        = Channel.empty()
    ch_qc_metrics        = Channel.empty()
    ch_duplicate_metrics = Channel.empty()

    // Align reads with FQ2BAM.
    // The closure must forward its own `reads` parameter; putting the `ch_reads`
    // channel object into the tuple would hand the process a channel instead of files.
    // NOTE(review): `ch_interval_file` is captured inside the closure, which only works
    // when it is a plain value (e.g. `[]`); if callers pass a queue channel,
    // switch to `ch_reads.combine(ch_interval_file)` — confirm against callers.
    PARABRICKS_FQ2BAM(
        ch_reads.map { meta, reads -> [ meta, reads, ch_interval_file ] },
        ch_fasta,
        ch_fasta_fai,
        ch_known_sites
    )

    // Collecting FQ2BAM outputs
    ch_bam               = ch_bam.mix(PARABRICKS_FQ2BAM.out.bam)
    ch_bai               = ch_bai.mix(PARABRICKS_FQ2BAM.out.bai)
    ch_bqsr_table        = ch_bqsr_table.mix(PARABRICKS_FQ2BAM.out.bqsr_table)
    ch_qc_metrics        = ch_qc_metrics.mix(PARABRICKS_FQ2BAM.out.qc_metrics)
    ch_duplicate_metrics = ch_duplicate_metrics.mix(PARABRICKS_FQ2BAM.out.duplicate_metrics)

    // Apply BQSR
    // Channels cannot be nested inside a tuple, so the per-sample inputs are joined
    // on their first element before being passed to the process.
    // NOTE(review): assumes bam/bai/bqsr_table are emitted as [ meta, file ] tuples and
    // that PARABRICKS_APPLYBQSR takes a single [ meta, bam, bai, table, intervals ]
    // tuple — confirm against the module's main.nf.
    ch_bqsr_input = ch_bam
        .join(ch_bai)
        .join(ch_bqsr_table)
        .map { meta, bam, bai, table -> [ meta, bam, bai, table, ch_interval_file ] }

    PARABRICKS_APPLYBQSR(ch_bqsr_input)

    // Gather the version files of both tools
    ch_versions = ch_versions.mix(PARABRICKS_FQ2BAM.out.versions)
    ch_versions = ch_versions.mix(PARABRICKS_APPLYBQSR.out.versions)

    emit:
    bam      = PARABRICKS_APPLYBQSR.out.bam // channel: [ [meta], bam ]
    bai      = PARABRICKS_APPLYBQSR.out.bai // channel: [ [meta], bai ]
    versions = ch_versions                  // channel: [ versions.yml ]

}
69 changes: 69 additions & 0 deletions subworkflows/local/fastq_align_parabricks/tests/main.nf.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// nf-test suite for the FASTQ_ALIGN_PARABRICKS subworkflow defined in ../main.nf.
nextflow_workflow {

    name "Test Subworkflow FASTQ_ALIGN_PARABRICKS"
    script "../main.nf"
    config "./nextflow.config"
    workflow "FASTQ_ALIGN_PARABRICKS"

    tag "subworkflows"
    tag "subworkflows_nfcore"
    tag "subworkflows/fastq_align_parabricks"
    tag "parabricks"
    tag "parabricks/fq2bam"
    tag "parabricks/applybqsr"

    // Single-end run: one FASTQ file, no interval file, sarscov2 reference and known sites.
    test("fastq_align_parabricks_single_end") {

        when {
            workflow {
                """
                input[0] = Channel.of([
                    [ id:'test', single_end:true ],
                    [ file(params.modules_testdata_base_path + 'genomics/sarscov2/illumina/fastq/test_1.fastq.gz', checkIfExists: true) ]])
                input[1] = [] // interval file
                input[2] = Channel.value([
                    [id: 'reference'],
                    file(params.modules_testdata_base_path + 'genomics/sarscov2/genome/genome.fasta', checkIfExists: true)
                ])
                input[3] = Channel.value([
                    [id: 'reference_index'],
                    file(params.modules_testdata_base_path + 'genomics/sarscov2/genome/genome.fasta.fai', checkIfExists: true)
                ])
                input[4] = Channel.value([
                    [id: 'known_sites'],
                    file(params.modules_testdata_base_path + 'genomics/sarscov2/illumina/vcf/test.vcf.gz', checkIfExists: true)
                ])
                """
            }
        }

        then {
            assertAll(
                { assert workflow.success},
                { assert snapshot(workflow.out).match()}
            )
        }
    }

    // Disabled paired-end test.
    // NOTE(review): it references BWA_INDEX, which nothing in this file sets up, and
    // supplies four inputs where the single-end test above supplies five — rework
    // the inputs before re-enabling.
    // test("fastq_align_parabricks_paired_end") {

    //     when {
    //         workflow {
    //             """
    //             input[0] = Channel.of([[ id:'test', single_end:false ], [file(params.modules_testdata_base_path + 'genomics/sarscov2/illumina/fastq/test_1.fastq.gz', checkIfExists: true), file(params.modules_testdata_base_path + 'genomics/sarscov2/illumina/fastq/test_2.fastq.gz', checkIfExists: true)]
    //             ] )
    //             input[1] = BWA_INDEX.out.index
    //             input[2] = false
    //             input[3] = Channel.value([[id: 'genome'], file(params.modules_testdata_base_path + 'genomics/sarscov2/genome/genome.fasta', checkIfExists: true)])
    //             """
    //         }
    //     }

    //     then {
    //         assertAll(
    //             { assert workflow.success},
    //             { assert snapshot(workflow.out).match()}
    //         )
    //     }
    // }
}
33 changes: 32 additions & 1 deletion workflows/sarek/main.nf
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,12 @@ workflow SAREK {
reports = reports.mix(FASTP.out.json.collect{ meta, json -> json })
reports = reports.mix(FASTP.out.html.collect{ meta, html -> html })

// Turn FASTQ splitting off when the parabricks aligner is selected.
// `==` compares; the previous single `=` assigned the string to params.aligner,
// which made the condition truthy for every aligner.
if (params.aligner == 'parabricks') {

    params.split_fastq = 0

}

if (params.split_fastq) {
reads_for_alignment = FASTP.out.reads.map{ meta, reads ->
read_files = reads.sort(false) { a,b -> a.getName().tokenize('.')[0] <=> b.getName().tokenize('.')[0] }.collate(2)
Expand All @@ -274,6 +280,30 @@ workflow SAREK {
// First, we must calculate number of lanes for each sample (meta.n_fastq)
// This is needed to group reads from the same sample together using groupKey to avoid stalling the workflow
// when reads from different samples are mixed together

// For the parabricks aligner, regroup the FASTQ files per sample before alignment.
// `==` compares; a single `=` would assign and always evaluate as true.
// NOTE(review): `reads_grouping_key` is not assigned anywhere in this block — verify
// it is defined before this point in the workflow.
if (params.aligner == 'parabricks') {

    fastq_mapped = reads_for_alignment
        .combine(reads_grouping_key) // Creates a tuple of [ meta, reads, reads_grouping_key ]
        .filter { meta1, files, meta2 -> meta1.sample == meta2.sample }
        // Add n_fastq and other variables to meta
        .map { meta1, files, meta2 ->
            [ meta1 + meta2, files ]
        }
        // Manipulate meta map to remove old fields and add new ones
        .map { meta, files ->
            [ meta - meta.subMap('id', 'read_group', 'data_type', 'num_lanes', 'size') + [ data_type: 'fastq_gz', id: meta.sample ], files ]
        }
        // Create groupKey from meta map
        .map { meta, files ->
            [ groupKey( meta, meta.n_fastq), files ]
        }
        // Group
        .groupTuple()

}

reads_for_alignment.map { meta, reads ->
[ meta.subMap('patient', 'sample', 'sex', 'status'), reads ]
}
Expand All @@ -292,7 +322,8 @@ workflow SAREK {
// reads will be sorted
sort_bam = true
FASTQ_ALIGN_BWAMEM_MEM2_DRAGMAP_SENTIEON(reads_for_alignment, index_alignment, sort_bam, fasta, fasta_fai)

// NOTE(review): placeholder call — the trailing `...` stands in for arguments that
// have not been written yet, so this line does not parse as a valid invocation.
// NOTE(review): as written it also sits next to the call above with no aligner check
// in between — presumably it should only run when params.aligner is 'parabricks'; confirm.
FASTQ_ALIGN_PARABRICKS(reads_for_alignment...)

// Grouping the bams from the same samples not to stall the workflow
// Use groupKey to make sure that the correct group can advance as soon as it is complete
// and not stall the workflow until all reads from all channels are mapped
Expand Down

0 comments on commit 5cbc66a

Please sign in to comment.