diff --git a/conf/functional_test.config b/conf/functional_test.config index b1fa1547f4449fbb6b35617bb1f5b6e376ac2c89..62682378f03969a77a45cb3bd8630ea2092b320b 100644 --- a/conf/functional_test.config +++ b/conf/functional_test.config @@ -5,7 +5,7 @@ singularity.enabled = true singularity.autoMounts = true process { - container = "/work/project/plateforme/metaG/functional_test/singularity_img/metagwgs.sif" + container = "/work/project/plateforme/metaG/functional_test/singularity_img/dev_v2.4/metagwgs.sif" withLabel: BINNING { container = "/work/project/plateforme/metaG/functional_test/singularity_img/binning.sif" } diff --git a/docs/usage.md b/docs/usage.md index ed48bf64eb10adb77e299d7e9ea4015e4924406e..a0907e2a70137cdfddd6f4d41fc85052d6a51f23 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -118,7 +118,7 @@ For HiFi the fastq2 column is not needed > name_sample2,number_flowcell,path_sample2_R1.fastq.gz,path_sample2_R2.fastq.gz > ``` - * **If you want to perfom cross alignment for binning on groups** : + * **If you want to perfom coassembly or cross alignment for binning on groups** : > ``` > sample,group,fastq_1,fastq_2 > name_sample1,group_name,path_sample1_flowcell1_R1.fastq.gz,path_sample1_flowcell1_R2.fastq.gz @@ -276,6 +276,7 @@ No parameter available for this substep. * `--assembly` allows to indicate the assembly tool. For short reads: `["metaspades" or "megahit"]`: Default: `metaspades`. For HiFi reads: `["hifiasm-meta", "metaflye"]`. Default: `hifiasm-meta`. +* `--coassembly` allows to assemble together the samples labeled with the same group in the samplesheet. It will generate one assembly for each group. To co-assemble all of your samples together, you must indicate a unique group for each sample in the samplesheet. **WARNING** With the coassembly, you can't use `--binning_cross_alignment 'group'` because one binning will be generate for each group co-assembled and automatically mapping with every sample of his group but `--binning_cross_alignment 'all'` can be use to cross align every sample with every group. **WARNING 4:** For short reads, the user can choose between `metaspades` or `megahit` for `--assembly` parameter. The choice can be based on CPUs and memory availability: `metaspades` needs more CPUs and memory than `megahit` but our tests showed that assembly metrics are better for `metaspades` than `megahit`.For PacBio HiFi reads, the user can choose between `hifiasm-meta` or `metaflye`. @@ -352,6 +353,8 @@ For example check the version of software in the yaml file and the singularity r * `--gtdbtk_bank`: indicates path to the GTDBTK database. +* `--metabat2_seed`: Set the seed for metabat2, for exact reproducibility of metabat2 (default: 0 (random seed)) + * `--binning_cross_alignment ["all","group","individual"]`: defines mapping strategy to compute co-abundances for binning. `all` means that each samples will be mapped against every assembly, `group` means that all sample from a group will be mapped against every assembly of the group, `individual` means that each sample will only be mapped against his assembly. Default `individual` diff --git a/main.nf b/main.nf index 848dfb038a962141bf33c2cd7c7a07476fbf5b7e..ac146146aa8da400a4fb9076a4abd8e876adb6cb 100644 --- a/main.nf +++ b/main.nf @@ -23,7 +23,7 @@ include { STEP_08_BINNING as S08_BINNING } from './subworkflows/08_binning' include { GET_SOFTWARE_VERSIONS } from './modules/get_software_versions' include { MULTIQC } from './modules/multiqc' -include { MERGE_FASTQ } from './modules/merge_fastq.nf' + /* @@ -65,6 +65,7 @@ include { MERGE_FASTQ } from './modules/merge_fastq.nf' --stop_at_assembly Stop the pipeline at this step --assembly Indicate the assembly tool for short reads ["metaspades" or "megahit" ]. Default: "metaspades". or for HiFi reads ["hifiasm-meta", "metaflye"]. Default: "hifiasm-meta". + --coassembly Assemble together samples labeled with the same group in the samplesheet S03_FILTERING options: --stop_at_filtering Stop the pipeline at this step @@ -213,6 +214,10 @@ workflow { exit 1, "You must specify --skip_binning or specify a GTDB-TK bank with --gtdbtk_bank" } + if ( params.coassembly && params.binning_cross_alignment == 'group'){ + exit 1, "--binning_cross_alignment group must not be use use --coassembly." + } + //////////// @@ -228,8 +233,8 @@ workflow { if (hasExtension(row.fastq_1, "fastq") || hasExtension(row.fastq_1, "fq") || hasExtension(row.fastq_2, "fastq") || hasExtension(row.fastq_2, "fq")) { exit 1, "We do recommend to use gziped fastq file to help you reduce your data footprint." } - if (params.binning_cross_alignment == 'group' && row.group == null){ - exit 1, "You must specify groups in the samplesheet if you want to use --binning_cross_alignment 'group'" + if ((params.binning_cross_alignment == 'group' || params.coassembly) && row.group == null){ + exit 1, "You must specify groups in the samplesheet if you want to use --binning_cross_alignment 'group' or --coassembly" } ["sample":row.sample, "flowcell":row.flowcell, @@ -244,7 +249,6 @@ workflow { def meta = [:] meta.id = item.sample if (item.flowcell!=null) { meta.id = meta.id+"_"+item.flowcell} - if (item.group !=null) {meta.id = meta.id+"_"+item.group} meta.sample = item.sample meta.flowcell = item.flowcell meta.group = item.group @@ -265,7 +269,6 @@ workflow { def meta = [:] meta.id = item.sample if (item.flowcell!=null) { meta.id = meta.id+"_"+item.flowcell} - if (item.group !=null) {meta.id = meta.id+"_"+item.group} meta.sample = item.sample meta.flowcell = item.flowcell meta.group = item.group @@ -346,7 +349,7 @@ workflow { ch_host_index, ch_kaiju_db ) - ch_preprocessed_reads = S01_CLEAN_QC.out.preprocessed_reads + ch_reads = S01_CLEAN_QC.out.preprocessed_reads ch_cutadapt_report = S01_CLEAN_QC.out.cutadapt_report ch_sickle_report = S01_CLEAN_QC.out.sickle_report @@ -357,54 +360,11 @@ workflow { ch_kaiju_report = S01_CLEAN_QC.out.kaiju_report ch_software_versions_S01 = S01_CLEAN_QC.out.software_versions - } else { - ch_preprocessed_reads = ch_reads } if ( !params.stop_at_clean ) { - if (!has_assembly & has_flowcell ){ - ////////////////// - // Manage Flowcell - ////////////////// - ch_reads_tmp = ch_preprocessed_reads - .map { - meta, fastq -> - [ meta.sample, meta, fastq ] - } - .groupTuple(by: [0]) - .branch { - id, meta, fastq -> - single : fastq.size() == 1 - return [[id:meta.sample.unique().join(), - sample:meta.sample.unique().join(), - flowcell:meta.flowcell.join("_"), - group:meta.group.unique().join(), - assembly:meta.assembly.unique().join(), - type:meta.type.unique().join()], fastq.flatten() ] - multiple: fastq.size() > 1 - return [[id:meta.sample.unique().join(), - sample:meta.sample.unique().join(), - flowcell:meta.flowcell.join("_"), - group:meta.group.unique().join(), - assembly:meta.assembly.unique().join(), - type:meta.type.unique().join()], fastq.flatten() ] - } - - - MERGE_FASTQ ( - ch_reads_tmp.multiple - ) - .reads - .mix(ch_reads_tmp.single) - .set{ch_preprocessed_reads} - } - - ///////////////////// - //End manage Flowcell - ///////////////////// - - S02_ASSEMBLY ( ch_preprocessed_reads, ch_assembly, has_assembly, assembly_tool ) + S02_ASSEMBLY ( ch_reads, ch_assembly, has_assembly, assembly_tool, has_flowcell ) ch_assembly = S02_ASSEMBLY.out.assembly ch_reads = S02_ASSEMBLY.out.reads ch_bam = S02_ASSEMBLY.out.bam @@ -426,7 +386,7 @@ workflow { S03_FILTERING ( ch_assembly, - ch_preprocessed_reads, + ch_reads, ch_idxstats, ch_bam, ch_min_contigs_cpm, diff --git a/modules/assembly.nf b/modules/assembly.nf index 98cf3c6c5ac4f2c2c849b99d34e17d0ec552f7bd..5c24b432946f1c743772b3fb0a50f8441406aacb 100644 --- a/modules/assembly.nf +++ b/modules/assembly.nf @@ -4,7 +4,7 @@ process METASPADES { label 'ASSEMBLY_SR' input: - tuple val(meta), path(reads) + tuple val(meta), path(read1), path(read2) output: tuple val(meta), path("metaspades_${meta.id}/${meta.id}.contigs.fa"), emit: assembly @@ -15,7 +15,21 @@ process METASPADES { (_,mem,unit) = (task.memory =~ /(\d+) ([A-Z]B)/)[0] if ( unit =~ /GB/ ) { """ - metaspades.py -t ${task.cpus} -m $mem -1 ${reads[0]} -2 ${reads[1]} -o metaspades_${meta.id} + echo " +[ + { + orientation: \\"fr\\", + type: \\"paired-end\\", + right reads: [ + \\"${read1.join('\\",\n \\"')}\\" + ], + left reads: [ + \\"${read2.join('\\",\n \\"')}\\" + ] + } +]" > input.yaml + + metaspades.py -t ${task.cpus} -m $mem --dataset input.yaml -o metaspades_${meta.id} mv metaspades_${meta.id}/scaffolds.fasta metaspades_${meta.id}/${meta.id}.contigs.fa mv metaspades_${meta.id}/spades.log metaspades_${meta.id}/${meta.id}.log mv metaspades_${meta.id}/params.txt metaspades_${meta.id}/${meta.id}.params.txt @@ -34,7 +48,7 @@ process MEGAHIT { label 'ASSEMBLY_SR' input: - tuple val(meta), path(reads) + tuple val(meta), path(read1), path(read2) output: tuple val(meta), path("megahit_${meta.id}/${meta.id}.contigs.fa"), emit: assembly @@ -43,7 +57,7 @@ process MEGAHIT { script: """ - megahit -t ${task.cpus} -1 ${reads[0]} -2 ${reads[1]} -o megahit_${meta.id} --out-prefix "${meta.id}" + megahit -t ${task.cpus} -1 ${read1.join(',')} -2 ${read2.join(',')} -o megahit_${meta.id} --out-prefix "${meta.id}" mv megahit_${meta.id}/options.json megahit_${meta.id}/${meta.id}.params.txt rm -r megahit_${meta.id}/intermediate_contigs @@ -69,7 +83,7 @@ process HIFIASM_META { """ mkdir hifiasm_meta_${meta.id} - hifiasm_meta -t ${task.cpus} -o ${meta.id} $reads 2> hifiasm_meta_${meta.id}/${meta.id}.log + hifiasm_meta -t ${task.cpus} -o ${meta.id} ${reads.join(' ')} 2> hifiasm_meta_${meta.id}/${meta.id}.log # gfa to fasta format awk '/^S/{print ">"\$2"\\n"\$3}' ${meta.id}.p_ctg.gfa | fold > hifiasm_meta_${meta.id}/${meta.id}.contigs.fa @@ -98,7 +112,7 @@ process METAFLYE { """ mkdir metaflye_${meta.id} - flye --pacbio-hifi $reads -o 'metaflye_${meta.id}' --meta -t ${task.cpus} 2> metaflye_${meta.id}/${meta.id}.log + flye --pacbio-hifi ${reads.join(' ')} -o 'metaflye_${meta.id}' --meta -t ${task.cpus} 2> metaflye_${meta.id}/${meta.id}.log mv metaflye_${meta.id}/assembly.fasta metaflye_${meta.id}/${meta.id}.contigs.fa mv metaflye_${meta.id}/params.json metaflye_${meta.id}/${meta.id}.params.json diff --git a/modules/binning.nf b/modules/binning.nf index c90306a79d44979f11785a20dfc1128558818c07..431fb01ccabb8a318b778753c67852325cf16734 100644 --- a/modules/binning.nf +++ b/modules/binning.nf @@ -30,7 +30,7 @@ process METABAT2 { script: """ mkdir -p metabat2/bins/ - metabat2 --inFile $fna --abdFile $depth --outFile metabat2/bins/${meta.id}_metabat2 --numThreads ${task.cpus} + metabat2 --inFile $fna --abdFile $depth --outFile metabat2/bins/${meta.id}_metabat2 --numThreads ${task.cpus} --seed ${params.metabat2_seed} echo \$(metabat2 -h 2>&1) > v_metabat2.txt """ } @@ -130,7 +130,7 @@ process METAWRAP_REFINMENT { path "v_metawrap.txt", emit: v_metawrap script: - bins_flag = (bins3 != null) ? "-A $bins1 -B $bins2 -C $bins3" : "-A $bins2 -B $bins2 " + bins_flag = (bins3 != null) ? "-A $bins1 -B $bins2 -C $bins3" : "-A $bins1 -B $bins2 " """ echo "metawrap 1.3_modified" > v_metawrap.txt diff --git a/modules/cd_hit.nf b/modules/cd_hit.nf index 3896ec1cad19614edb0255afc1a39d4ec0e31b85..b341142d5b122cddc788d4cb2b9ba0008d37ed2d 100644 --- a/modules/cd_hit.nf +++ b/modules/cd_hit.nf @@ -15,7 +15,7 @@ process INDIVIDUAL_CD_HIT { script: """ cd-hit-est -c ${pct_id} -i ${ffn} -o ${meta.id}.cd-hit-est.${pct_id}.fasta -T ${task.cpus} -M ${task.mem} -d 150 - cat ${meta.id}.cd-hit-est.${pct_id}.fasta.clstr | cd_hit_produce_table_clstr.py > ${meta.id}.cd-hit-est.${pct_id}.table_cluster_contigs.txt + cd_hit_produce_table_clstr.py -i ${meta.id}.cd-hit-est.${pct_id}.fasta.clstr -o ${meta.id}.cd-hit-est.${pct_id}.table_cluster_contigs.txt echo \$(cd-hit -h 2>&1) > v_cdhit.txt """ } @@ -54,7 +54,6 @@ main: INDIVIDUAL_CD_HIT( ch_assembly, ch_percentage_identity ) ch_individual_clusters = INDIVIDUAL_CD_HIT.out.clstr_fasta.collect() GLOBAL_CD_HIT(ch_individual_clusters , ch_percentage_identity ) - ch_ffn = ch_assembly.flatMap{it -> it[1]}.collect() emit: individual_clstr_table = INDIVIDUAL_CD_HIT.out.clstr_table diff --git a/modules/feature_counts.nf b/modules/feature_counts.nf index 73ec38c6e7ac033b902c4716954e11051e5fe94f..63eb79ac2877839bdff59b4265dcd470b4017598 100644 --- a/modules/feature_counts.nf +++ b/modules/feature_counts.nf @@ -44,14 +44,12 @@ process QUANTIFICATION_TABLE { workflow QUANTIFICATION { take: - ch_gff // channel: [ val(meta), path(gff) ] - ch_bam // channel: [ val(meta), path(bam), path(bam_index) ] + ch_gff_and_bam // channel: [ val(meta), path(gff), path(bam), path(bam_index) ] ch_individual_clstr_table ch_global_clstr_table main: - ch_gff_and_bam = ch_gff.join(ch_bam, remainder: false) - + FEATURE_COUNTS(ch_gff_and_bam) ch_count_table = FEATURE_COUNTS.out.count_table.collect() ch_quant_report = FEATURE_COUNTS.out.summary diff --git a/modules/read_alignment.nf b/modules/read_alignment.nf index 881f6c87062fbe1515669c0ec9309a7b7136f70c..5183c8b8754dc78e7e5d124ead32d1251ae233a5 100644 --- a/modules/read_alignment.nf +++ b/modules/read_alignment.nf @@ -1,6 +1,6 @@ process BWA_MEM { tag "${meta.id}" - publishDir "${params.outdir}/$publishDir_path/${meta.id}", mode: 'copy', pattern:"*sort*" + publishDir "${params.outdir}/$publishDir_path/${meta.id}", mode: 'copy', pattern:"*bam*" input: tuple val(meta), path(fna), path(reads) @@ -8,16 +8,16 @@ process BWA_MEM { output: - tuple val(meta), path("${meta.id}.sort.bam"), path("${meta.id}.sort.bam.bai"), emit: bam + tuple val(meta), path("${meta.id}.bam"), path("${meta.id}.bam.bai"), emit: bam path "v_bwa.txt", emit: v_bwa_mem2 path "v_samtools.txt", emit: v_samtools script: """ bwa-mem2 index ${fna} -p ${fna} - bwa-mem2 mem -t ${task.cpus} ${fna} ${reads[0]} ${reads[1]} | samtools view -@ ${task.cpus} -bS - | samtools sort -@ ${task.cpus} - -o ${meta.id}.sort.bam + bwa-mem2 mem -t ${task.cpus} ${fna} ${reads[0]} ${reads[1]} | samtools view -@ ${task.cpus} -bS - | samtools sort -@ ${task.cpus} - -o ${meta.id}.bam - samtools index -@ ${task.cpus} ${meta.id}.sort.bam + samtools index -@ ${task.cpus} ${meta.id}.bam bwa-mem2 version > v_bwa.txt samtools --version &> v_samtools.txt @@ -27,23 +27,23 @@ process BWA_MEM { process MINIMAP2 { tag "${meta.id}" label 'MINIMAP2' - publishDir "${params.outdir}/$publishDir_path/${meta.id}", mode: 'copy', pattern:"*sort*" + publishDir "${params.outdir}/$publishDir_path/${meta.id}", mode: 'copy', pattern:"*bam*" input: tuple val(meta), path(fna), path(reads) val(publishDir_path) output: - tuple val(meta), path("${meta.id}.sort.bam"), path("${meta.id}.sort.bam.bai"), emit: bam + tuple val(meta), path("${meta.id}.bam"), path("${meta.id}.bam.bai"), emit: bam path "v_minimap2.txt", emit: v_minimap2 path "v_samtools.txt", emit: v_samtools script: """ # align reads to contigs, keep only primary aln and sort resulting bam - minimap2 -t ${task.cpus} -ax map-hifi $fna $reads | samtools view -@ ${task.cpus} -b -F 2304 | samtools sort -@ ${task.cpus} -o ${meta.id}.sort.bam + minimap2 -t ${task.cpus} -ax map-hifi $fna $reads | samtools view -@ ${task.cpus} -b -F 2304 | samtools sort -@ ${task.cpus} -o ${meta.id}.bam - samtools index ${meta.id}.sort.bam -@ ${task.cpus} + samtools index ${meta.id}.bam -@ ${task.cpus} samtools --version &> v_samtools.txt minimap2 --version &> v_minimap2.txt diff --git a/nextflow.config b/nextflow.config index 1516f4c193d32ed642ca1ccdb0f4ebfab828c758..b0f4b71604e27d890a5927e13beae3203e337099 100644 --- a/nextflow.config +++ b/nextflow.config @@ -21,6 +21,7 @@ params { gtdbtk_bank = "" percentage_identity = 0.95 type = "" + coassembly = false // Stop after step or skip optional step/sub-step. @@ -58,6 +59,7 @@ params { drep_threshold = 0.95 min_completeness = 50 max_contamination = 10 + metabat2_seed = 0 // Ressources. kaiju_db_dir = false diff --git a/subworkflows/02_assembly.nf b/subworkflows/02_assembly.nf index d300ba9ba701ee0aadf1748e6b8a6184adea0728..46c4da520ebdf66e304ce8443d440b1854c71cbb 100644 --- a/subworkflows/02_assembly.nf +++ b/subworkflows/02_assembly.nf @@ -3,13 +3,15 @@ include { QUAST as ASSEMBLY_QUAST} from '../modules/metaquast' include { READS_DEDUPLICATION } from '../modules/reads_deduplication' include { MINIMAP2 } from '../modules/read_alignment' include { GET_ALIGNMENT_METRICS } from '../modules/read_alignment_manipulation' +include { MERGE_FASTQ } from '../modules/merge_fastq.nf' workflow STEP_02_ASSEMBLY { take: - reads - assembly - has_assembly - assembly_tool + reads + assembly + has_assembly + assembly_tool + has_flowcell main: ch_assembler_v = Channel.empty() @@ -17,36 +19,107 @@ workflow STEP_02_ASSEMBLY { ch_bwa_mem_v = Channel.empty() ch_minimap2_v = Channel.empty() ch_samtools_v = Channel.empty() - - if (has_assembly){ - ch_assembly = assembly - } - else { - if(assembly_tool == 'metaspades') { - METASPADES(reads) - ch_assembly = METASPADES.out.assembly - ch_assembler_v = METASPADES.out.v_metaspades - } - else if(assembly_tool == 'megahit') { - MEGAHIT(reads) - ch_assembly = MEGAHIT.out.assembly - ch_assembler_v = MEGAHIT.out.v_megahit - } - else if(assembly_tool == 'hifiasm-meta') { - HIFIASM_META(reads) - ch_assembly = HIFIASM_META.out.assembly - ch_assembler_v = HIFIASM_META.out.v_hifiasm_meta - } - else if(assembly_tool == 'metaflye') { - METAFLYE(reads) + + + ch_assembly = assembly + ch_reads = reads + + if (!has_assembly & has_flowcell ){ + ////////////////// + // Manage Flowcell + ////////////////// + ch_reads_flowcell = reads + .map { meta, fastq -> + [ meta.sample, meta, fastq ] } + .groupTuple(by: [0]) + .branch { id, meta, fastq -> + single : fastq.size() == 1 + return [[id:meta.sample.unique().join(), + sample:meta.sample.unique().join(), + flowcell:meta.flowcell.join("_"), + group:meta.group.unique().join(), + assembly:meta.assembly.unique().join(), + type:meta.type.unique().join()], fastq.flatten() ] + multiple: fastq.size() > 1 + return [[id:meta.sample.unique().join(), + sample:meta.sample.unique().join(), + flowcell:meta.flowcell.join("_"), + group:meta.group.unique().join(), + assembly:meta.assembly.unique().join(), + type:meta.type.unique().join()], fastq.flatten() ] + } + + MERGE_FASTQ (ch_reads_flowcell.multiple) + .reads + .mix(ch_reads_flowcell.single) + .set{ch_reads} + } + + if (params.coassembly){ + ch_reads.map { meta, fastq -> + [ meta.group, meta, fastq] } + .groupTuple(by: [0]) + .map { group, metas, fastq -> + def meta = [:] + meta.id = metas.group.unique().join() + meta.sample = metas.sample.join("_") + meta.flowcell = metas.flowcell.unique().join() + meta.group = metas.group.unique().join() + meta.assembly = metas.assembly.unique().join() + meta.type = metas.type.unique().join() + if (params.type.toUpperCase() == "SR") { + return [meta, fastq.collect { it[0] }, fastq.collect { it[1] }] + } else { + return [meta, fastq ] + }} + .set { ch_reads_assembly } + + if (has_assembly){ + ch_assembly = assembly.map { meta, assembly -> + [ meta.group, meta, assembly] } + .groupTuple(by: [0]) + .map{ group, metas, assembly -> + def meta = [:] + meta.id = metas.group.unique().join() + meta.sample = metas.sample.join("_") + meta.flowcell = metas.flowcell.unique().join() + meta.group = metas.group.unique().join() + meta.assembly = metas.assembly.unique().join() + meta.type = metas.type.unique().join() + return [meta, assembly[0]] } + } + + } else if (params.type.toUpperCase() == "SR") { + ch_reads_assembly = ch_reads + .map { meta, fastq -> + return [meta, fastq[0], fastq[1]] } + + } else { + ch_reads_assembly = ch_reads + } + + if (!has_assembly){ + if (assembly_tool == 'metaspades') { + METASPADES(ch_reads_assembly) + ch_assembly = METASPADES.out.assembly + ch_assembler_v = METASPADES.out.v_metaspades + } else if (assembly_tool == 'megahit') { + MEGAHIT(ch_reads_assembly) + ch_assembly = MEGAHIT.out.assembly + ch_assembler_v = MEGAHIT.out.v_megahit + } else if (assembly_tool == 'hifiasm-meta') { + HIFIASM_META(ch_reads_assembly) + ch_assembly = HIFIASM_META.out.assembly + ch_assembler_v = HIFIASM_META.out.v_hifiasm_meta + } else if (assembly_tool == 'metaflye') { + METAFLYE(ch_reads_assembly) ch_assembly = METAFLYE.out.assembly ch_assembler_v = METAFLYE.out.v_metaflye - } - else { + } else { exit 1, "Invalid assembly parameter: ${assembly_tool}" - } + } } - + ch_assembly_renamed = RENAME_CONTIGS(ch_assembly) ch_assembly_quast = ch_assembly_renamed .map { meta, file -> file } @@ -58,9 +131,22 @@ workflow STEP_02_ASSEMBLY { ch_idxstats = Channel.empty() ch_flagstat = Channel.empty() - ch_reads = reads + + if (params.coassembly){ + ch_reads.map { meta, fastq -> + [ meta.group, meta, fastq ]} + .set { ch_reads_tmp } + + ch_assembly_renamed.map { meta, assembly -> + [ meta.group, assembly ]} + .combine(ch_reads_tmp, by: 0) + .map { group, assembly, meta, fastq -> + [ meta, assembly,fastq ]} + .set { ch_contigs_and_reads } + } else { + ch_contigs_and_reads = ch_assembly_renamed.join(ch_reads, remainder: true) + } - ch_contigs_and_reads = ch_assembly_renamed.join(ch_reads, remainder: true) if ( params.type.toUpperCase() == "SR" ) { READS_DEDUPLICATION ( ch_contigs_and_reads ) @@ -71,10 +157,10 @@ workflow STEP_02_ASSEMBLY { ch_bwa_mem_v = READS_DEDUPLICATION.out.v_bwa_mem2 } else { - MINIMAP2(ch_contigs_and_reads,"02_assembly/02_3_reads_vs_primary_assembly") - ch_bam = MINIMAP2.out.bam + MINIMAP2(ch_contigs_and_reads,"02_assembly/02_3_reads_vs_primary_assembly") + ch_bam = MINIMAP2.out.bam - ch_minimap2_v = MINIMAP2.out.v_minimap2 + ch_minimap2_v = MINIMAP2.out.v_minimap2 } diff --git a/subworkflows/03_filtering.nf b/subworkflows/03_filtering.nf index 6ae3938054a9832056abbb4d2e8455b1581722a4..bf92a0b2c1546baf924df278b6b8a95d33452a0a 100644 --- a/subworkflows/03_filtering.nf +++ b/subworkflows/03_filtering.nf @@ -23,10 +23,33 @@ workflow STEP_03_ASSEMBLY_FILTER { ch_chunk_assembly_for_filter = assemblies .splitFasta(by: 100000, file: true) - - ch_assembly_and_idxstats = ch_chunk_assembly_for_filter + if (params.coassembly){ + idxstats.map { meta, idxstats -> + [ meta.group, meta, idxstats] } + .groupTuple(by: [0]) + .map { group, metas, idxstats -> + def meta = [:] + meta.id = metas.group.unique().join() + meta.sample = metas.sample.join("_") + meta.flowcell = metas.flowcell.unique().join() + meta.group = metas.group.unique().join() + meta.assembly = metas.assembly.unique().join() + meta.type = metas.type.unique().join() + [ group, meta, idxstats] } + .set { ch_idxstats_tmp } + ch_chunk_assembly_for_filter.map { meta, assembly -> + [ meta.group, assembly ]} + .combine(ch_idxstats_tmp, by: 0) + .map { group, assembly, meta, idxstats -> + [ meta, assembly, idxstats ]} + .set { ch_assembly_and_idxstats } + } else { + ch_assembly_and_idxstats = ch_chunk_assembly_for_filter .combine(idxstats, by:0) + } + + CHUNK_ASSEMBLY_FILTER ( ch_assembly_and_idxstats, @@ -52,69 +75,85 @@ workflow STEP_03_ASSEMBLY_FILTER { ch_merged_selected = MERGE_ASSEMBLY_FILTER.out.merged_selected discarded_contigs = MERGE_ASSEMBLY_FILTER.out.merged_discarded + ch_merged_selected_all = ch_merged_selected + .map { meta, file -> file } + .collect() + QUAST( ch_merged_selected_all, "${filtering_outdir}/03_1_filtered_assembly/" ) + ch_quast_report = QUAST.out.report - // Differentiate sample with and without discarded_contigs - // samples with no discarded_contigs are not going to be processed to process - discarded_contigs_category = discarded_contigs.branch{ - without: it[1].isEmpty() - with: !it[1].isEmpty() - } - samples_without_discarded_contigs = discarded_contigs_category.without.map{ it -> it[0]} - samples_with_discarded_contigs = discarded_contigs_category.with.map{ it -> it[0]} + // Differentiate sample with and without discarded_contigs + // samples with no discarded_contigs are not going to be processed to process + discarded_contigs_category = discarded_contigs.branch{ + without: it[1].isEmpty() + with: !it[1].isEmpty() + } - - // make a symblink with the bam and bai from step 02 for samples that have not been affected by the filtering (no contig discarded) - result_path_dir = file("${params.outdir}/${filtered_assembly_bam_outdir}/") - result_path_dir.mkdirs() - samples_without_discarded_contigs.map { - sample -> { file("${result_path_dir}/${sample.id}/").mkdir() - file("${params.outdir}/${unfiltered_assembly_bam_outdir}/${sample.id}/${sample.id}.sort.bam") - .mklink("${result_path_dir}/${sample.id}/${sample.id}.sort.bam", overwrite:true) - file("${params.outdir}/${unfiltered_assembly_bam_outdir}/${sample.id}/${sample.id}.sort.bam.bai") - .mklink("${result_path_dir}/${sample.id}/${sample.id}.sort.bam.bai", overwrite:true) - } - } - - ch_bam_unchanged_by_filtering = samples_without_discarded_contigs.join(bam) - - ch_selected_contigs_and_reads = samples_with_discarded_contigs.join(ch_merged_selected).join(reads) - - + if (params.coassembly){ + discarded_contigs_category.without.map { meta, discarded_empty -> [ meta.group ]} + .combine( bam.map { meta, bam, bai -> + [ meta.group, meta, bam, bai ]}, by: 0) + .map{ group, meta, bam, bai -> + [ meta, bam, bai ]} + .set{ ch_bam_unchanged_by_filtering } + + ch_selected_contigs_and_reads= discarded_contigs_category.with.map {meta, discarded_contigs -> meta.group} + .join( ch_merged_selected.map { meta, contigs -> + [meta.group, meta, contigs]}) + .combine( reads.map{ meta, reads -> + [ meta.group, meta, reads ]}, by: 0) + .map{ group, meta_contigs, contigs, meta_reads, reads -> + [ meta_reads, contigs, reads ]} + + } else { + ch_bam_unchanged_by_filtering = discarded_contigs_category.without.map{ it -> it[0]} + .join(bam) + + ch_selected_contigs_and_reads = discarded_contigs_category.with.map{ it -> it[0]} + .join(ch_merged_selected).join(reads) + } + + // make a symblink with the bam and bai from step 02 for samples that have not been affected by the filtering (no contig discarded) + result_path_dir = file("${params.outdir}/${filtered_assembly_bam_outdir}/") + result_path_dir.mkdirs() + + ch_bam_unchanged_by_filtering.map { meta, bam, bai -> + { file("${result_path_dir}/${meta.id}/").mkdir() + file("${params.outdir}/${unfiltered_assembly_bam_outdir}/${meta.id}/${meta.id}.bam") + .mklink("${result_path_dir}/${meta.id}/${meta.id}.bam", overwrite:true) + file("${params.outdir}/${unfiltered_assembly_bam_outdir}/${meta.id}/${meta.id}.bam.bai") + .mklink("${result_path_dir}/${meta.id}/${meta.id}.bam.bai", overwrite:true) + } + } - if ( params.type.toUpperCase() == "SR" ) { - BWA_MEM(ch_selected_contigs_and_reads, filtered_assembly_bam_outdir) - ch_bam_post_filtering = BWA_MEM.out.bam - } - else { - MINIMAP2(ch_selected_contigs_and_reads, filtered_assembly_bam_outdir) - ch_bam_post_filtering = MINIMAP2.out.bam - } + if ( params.type.toUpperCase() == "SR" ) { + BWA_MEM(ch_selected_contigs_and_reads, filtered_assembly_bam_outdir) + ch_bam_post_filtering = BWA_MEM.out.bam + } + else { + MINIMAP2(ch_selected_contigs_and_reads, filtered_assembly_bam_outdir) + ch_bam_post_filtering = MINIMAP2.out.bam + } - ch_all_bam = samples_without_discarded_contigs.join(bam).mix(ch_bam_post_filtering) + ch_all_bam = ch_bam_unchanged_by_filtering.mix(ch_bam_post_filtering) - GET_ALIGNMENT_METRICS(ch_all_bam, filtered_assembly_bam_outdir) + GET_ALIGNMENT_METRICS(ch_all_bam, filtered_assembly_bam_outdir) - ch_flagstat = GET_ALIGNMENT_METRICS.out.sam_flagstat - ch_coverage = GET_ALIGNMENT_METRICS.out.sam_coverage + ch_flagstat = GET_ALIGNMENT_METRICS.out.sam_flagstat + ch_coverage = GET_ALIGNMENT_METRICS.out.sam_coverage - ch_merged_quast = ch_merged_selected - .map { meta, file -> file } - .collect() - QUAST( ch_merged_quast, "${filtering_outdir}/03_1_filtered_assembly/" ) - ch_quast_report = QUAST.out.report + - ch_final_bam = ch_bam_post_filtering.mix(ch_bam_unchanged_by_filtering) - emit: - selected_contigs = ch_merged_selected - quast_report = ch_quast_report - bam = ch_final_bam - sam_coverage = ch_coverage - sam_flagstat = ch_flagstat + emit: + selected_contigs = ch_merged_selected + quast_report = ch_quast_report + bam = ch_all_bam + sam_coverage = ch_coverage + sam_flagstat = ch_flagstat } diff --git a/subworkflows/04_structural_annot.nf b/subworkflows/04_structural_annot.nf index 49841efd8fefc08910d4463108fea3820a57293b..505e633e2d4551eba7d70235a4e5dc94616c274c 100644 --- a/subworkflows/04_structural_annot.nf +++ b/subworkflows/04_structural_annot.nf @@ -17,8 +17,6 @@ workflow STEP_04_STRUCTURAL_ANNOT { MERGE_ANNOTATIONS(annotations_ch) - ch_software_versions = PRODIGAL.out.v_prodigal.first() - ch_software_versions = PRODIGAL.out.v_prodigal.first().mix( BARRNAP.out.v_barrnap.first(), TRNASCAN_SE.out.v_tRNAscan.first()) diff --git a/subworkflows/05_protein_alignment.nf b/subworkflows/05_protein_alignment.nf index 56258c467d3f05fb596c58ac877384a906aad3d7..b178755fc127e885b34957e46d9b2d030c5b7a5a 100644 --- a/subworkflows/05_protein_alignment.nf +++ b/subworkflows/05_protein_alignment.nf @@ -13,7 +13,7 @@ workflow STEP_05_PROTEIN_ALIGNMENT { DIAMOND ( prokka_faa, - diamond, + diamond ) ch_diamond_result = DIAMOND.out.m8 diff --git a/subworkflows/06_functionnal_annot.nf b/subworkflows/06_functionnal_annot.nf index 5b860554d4bf89254818dd7ba51387d6230dc25c..ad4812966c54553690f215f0ee8021dd4875ad10 100644 --- a/subworkflows/06_functionnal_annot.nf +++ b/subworkflows/06_functionnal_annot.nf @@ -28,7 +28,22 @@ workflow STEP_06_FUNC_ANNOT { ch_global_clstr_table = CD_HIT.out.global_clstr_table ch_cdhit_v = CD_HIT.out.v_cdhit - QUANTIFICATION ( gff, bam, ch_individual_clstr_table, ch_global_clstr_table) + if (params.coassembly){ + bam.map { meta, bam, bai -> + [ meta.group, meta, bam, bai ]} + .set { ch_bam_tmp } + + gff.map { meta, gff -> + [ meta.group, gff]} + .combine( ch_bam_tmp, by: 0) + .map { group, gff, meta, bam, bai -> + [ meta, gff, bam, bai ]} + .set { ch_gff_and_bam } + } else { + ch_gff_and_bam = gff.join(bam, remainder: false) + } + + QUANTIFICATION ( ch_gff_and_bam, ch_individual_clstr_table, ch_global_clstr_table) ch_quant_table = QUANTIFICATION.out.quantification_table ch_quant_report = QUANTIFICATION.out.quant_report ch_featurecounts_v = QUANTIFICATION.out.v_featurecounts diff --git a/subworkflows/07_taxonomic_affi.nf b/subworkflows/07_taxonomic_affi.nf index 99ead6cf80315b3a862db8cb92a50db591d1986d..4f1eb9b4b62bc2e619a41b4f46f6e6e82d0aff96 100644 --- a/subworkflows/07_taxonomic_affi.nf +++ b/subworkflows/07_taxonomic_affi.nf @@ -8,7 +8,20 @@ workflow STEP_07_TAXO_AFFI { diamond_result // channel: [ val(meta), path(diamond_file) ] sam_coverage // channel: [ val(meta), path(samtools coverage) ] main: - ch_assign_taxo_input = diamond_result.join(sam_coverage, remainder: true) + if (params.coassembly){ + sam_coverage.map { meta, cov -> + [ meta.group, meta, cov ]} + .set { ch_sam_cov_tmp } + + diamond_result.map { meta, m8 -> + [ meta.group, m8]} + .combine( ch_sam_cov_tmp, by: 0) + .map { group, m8, meta, cov -> + [ meta, m8, cov ]} + .set { ch_assign_taxo_input } + } else { + ch_assign_taxo_input = diamond_result.join(sam_coverage, remainder: true) + } ASSIGN_TAXONOMY ( taxonomy, ch_assign_taxo_input ) diff --git a/subworkflows/08_binning.nf b/subworkflows/08_binning.nf index 59b9c7b7b7dbf16051dfb3d0c38c4d4370f24e92..32b9e329eb2aad2c07776c2f47b5977fdb8410e8 100644 --- a/subworkflows/08_binning.nf +++ b/subworkflows/08_binning.nf @@ -52,21 +52,21 @@ workflow STEP_08_BINNING { if (params.binning_cross_alignment == 'all') { // combine assemblies with reads of all samples ch_reads_assembly = reads.combine(assembly) - .map{ meta_reads, reads, meta_assembly, assembly -> - if (meta_reads != meta_assembly){ - [[id:meta_reads.id+"_"+meta_assembly.id, sample:meta_assembly.sample, flowcell:meta_assembly.flowcell, group:meta_assembly.group, assembly:meta_assembly.assembly, type:meta_assembly.type], assembly, reads] - } - } + .map{ meta_reads, reads, meta_assembly, assembly -> + if (((meta_reads.id != meta_assembly.id) && !(params.coassembly)) || (params.coassembly && (meta_reads.group != meta_assembly.group))){ + [[id:meta_reads.id+"_"+meta_assembly.id, sample:meta_assembly.sample, flowcell:meta_assembly.flowcell, group:meta_assembly.group, assembly:meta_assembly.assembly, type:meta_assembly.type], assembly, reads] + } + } } else if (params.binning_cross_alignment == 'group'){ // combine assemblies with reads of samples from same group ch_assembly_group = assembly.map{ meta, assembly -> [ meta.group, meta, assembly ] } ch_reads_assembly = reads.map{ meta, reads -> [ meta.group, meta, reads ] } - .combine(ch_assembly_group, by: 0) - .map{ group, meta_reads, reads, meta_assembly, assembly -> - if (meta_reads != meta_assembly){ - [[id:meta_reads.id+"_"+meta_assembly.id, sample:meta_assembly.sample, flowcell:meta_assembly.flowcell, group:meta_assembly.group, assembly:meta_assembly.assembly, type:meta_assembly.type], assembly, reads] - } - } + .combine(ch_assembly_group, by: 0) + .map{ group, meta_reads, reads, meta_assembly, assembly -> + if (meta_reads != meta_assembly){ + [[id:meta_reads.id+"_"+meta_assembly.id, sample:meta_assembly.sample, flowcell:meta_assembly.flowcell, group:meta_assembly.group, assembly:meta_assembly.assembly, type:meta_assembly.type], assembly, reads] + } + } } // cross alignment if (params.type == 'SR') { @@ -78,21 +78,51 @@ workflow STEP_08_BINNING { } // formatting channel ch_bam = ch_bam_cross_alignment.mix(bam) - .map { meta, bam, bai -> [ meta.sample, meta, bam, bai ] } - .groupTuple(by: [0]) - .map { sample,metas, bam, bai -> - [ metas.min { it.id.size() }, bam, bai ] - } + .map { meta, bam, bai -> + if (params.coassembly){[ meta.group, meta, bam, bai ]} + else {[ meta.sample, meta, bam, bai ]}} + .groupTuple(by: [0]) + .map { sample,metas, bam, bai -> + [ metas.min { it.id.size() }, bam, bai ] + } } else { ch_bam = bam } + if (params.coassembly){ + if (params.binning_cross_alignment == 'all') { + ch_bam.map { meta, bam, bai -> [ meta.group, bam, bai ] } + .set { ch_bam_tmp } + } else { + ch_bam.map { meta, bam, bai -> [ meta.group, meta, bam, bai ] } + .groupTuple(by: [0]) + .map { group, metas, bam, bai -> + [ group, bam, bai ]} + .set { ch_bam_tmp } + } + assembly.map { meta, assembly -> + [ meta.group, meta, assembly ]} + .combine( ch_bam_tmp, by: 0) + .map { group, meta, assembly, bam, bai -> + [ meta, assembly, bam, bai]} + .tap { ch_assembly_bam } + .map { meta, assembly, bam, bai -> + [ meta, assembly ]} + .tap { ch_assembly } + + ch_assembly_bam.map{ meta, assembly, bam, bai -> + [ meta, bam, bai ]} + .set{ ch_bam} + } else { + ch_assembly_bam = assembly.join(ch_bam) + ch_assembly = assembly + } /////////// /// BINNING /////////// ch_depth = GENERATE_DEPTH_FILES(ch_bam) - ch_assembly_depth = assembly.join(ch_depth) + ch_assembly_depth = ch_assembly.join(ch_depth) METABAT2(ch_assembly_depth) ch_metabat_bins = METABAT2.out.bins.filter{ t -> t[1].list().size()} @@ -102,8 +132,6 @@ workflow STEP_08_BINNING { ch_maxbin_bins = MAXBIN2.out.bins.filter{ t -> t[1].list().size()} ch_maxbin_v = MAXBIN2.out.v_maxbin - ch_assembly_bam = assembly.join(ch_bam) - CONCOCT(ch_assembly_bam) ch_concoct_bins = CONCOCT.out.bins.filter{ t -> t[1].list().size()} ch_concoct_v = CONCOCT.out.v_concoct @@ -120,7 +148,8 @@ workflow STEP_08_BINNING { (it[2] != null && it[3] != null) single: true } - + + //bins set with bins form at least 2 binners ch_bins_multiple = ch_bins_set.multiple.map{ meta, bins1, bins2, bins3 -> if ( bins1 == null ) { return [meta, bins2, bins3, bins1] @@ -131,6 +160,7 @@ workflow STEP_08_BINNING { } } + //bins set with bins form only 1 binner ch_bins_single = ch_bins_set.single.map{ meta, bins1, bins2, bins3 -> if ( bins1 != null ) { return [meta, bins1] @@ -160,7 +190,7 @@ workflow STEP_08_BINNING { .map { meta, file -> file} .collect() - ch_bins_assembly = METAWRAP_REFINMENT.out.bins.join(assembly) + ch_bins_assembly = METAWRAP_REFINMENT.out.bins.join(ch_assembly) UNBINNED_CONTIGS(ch_bins_assembly)