TransWikia.com

Using checkpoints with snakemake gives each instance of a rule all input files

Stack Overflow Asked by Josh Loecker on November 10, 2021

I’ve recently come across checkpoints in snakemake and realized they will work perfectly with what I am trying to do. I’ve been able to implement the workflow listed here. I also found this stackoverflow question, but can’t quite make sense of it or how I might make it work for what I am doing

The rules I am working with are as follows:

def ReturnBarcodeFolderNames():
    path = config['results_folder'] + "Barcode/"
    return_direc = []
    for root, directory, files in os.walk(path):
        for direc in directory:
            return_direc.append(direc)
    return return_direc


rule all:
    input:
        expand(config['results_folder'] + "Barcode/{folder}.merged.fastq", folder=ReturnBarcodeFolderNames())


checkpoint barcode:
    input:
        expand(config['results_folder'] + "Basecall/{fast5_files}", fast5_files=FAST5_FILES)
    output:
        temp(directory(config['results_folder'] + "Barcode/.tempOutput/"))
    shell:
        "guppy_barcoder "
        "--input_path {input} "
        "--save_path {output} "
        "--barcode_kits EXP-PBC096 "
        "--recursive"

def aggregate_barcode_folders(wildcards):
    checkpoint_output = checkpoints.barcode.get(**wildcards).output[0]
    folder_names = []
    for root, directories, files in os.walk(checkpoint_output):
        for direc in directories:
            folder_names.append(direc)

    return expand(config['results_folder'] + "Barcode/.tempOutput/{folder}", folder=folder_names)

rule merge:
    input:
        aggregate_barcode_folders
    output:
        config['results_folder'] + "Barcode/{folder}.merged.fastq"
    shell:
         "echo {input}"

The rule barcode and def aggregate_barcode_folders work as expected, but when rule merge is reached, every input folder is being passed to each instance of the rule. This results in something like the following:

rule merge:
    input: /Results/Barcode/.tempOutput/barcode81, 
/Results/Barcode/.tempOutput/barcode28, 
/Results/Barcode/.tempOutput/barcode17, 
/Results/Barcode/.tempOutput/barcode10, 
/Results/Barcode/.tempOutput/barcode26, 
/Results/Barcode/.tempOutput/barcode21, 
/Results/Barcode/.tempOutput/barcode42, 
/Results/Barcode/.tempOutput/barcode89, 
/Results/Barcode/.tempOutput/barcode45, 
/Results/Barcode/.tempOutput/barcode20, 
/Results/Barcode/.tempOutput/barcode18, 
/Results/Barcode/.tempOutput/barcode27, 
/Results/Barcode/.tempOutput/barcode11, 
.
.
.
.
.
    output: /Results/Barcode/barcode75.merged.fastq
    jobid: 82
    wildcards: folder=barcode75

The same exact input is needed for each job of rule merge, which amounts to about 80 instances. But, the wildcards portion in each job is different for each folder. How can I use this as input for each instance of my rule merge, instead of passing the entire list received from def aggregate_barcode_folders?

I feel there may be something amiss with the input from rule all, but I’m not 100% sure what the problem may be.

As a note, I know snakemake will throw an error stating that it is waiting for output files from rule merge, as I am not doing anything with the output other than printing it to the screen.

EDIT

I’ve decided to go against checkpoints for now, and instead opt for the following. To make things more clear, the goal for this pipeline is as follows: I am attempting to merge fastq files from an output folder into one file, with the input files having a variable number of files (1 to about 3 per folder, but I won’t know how many). The structure of the input is as follows

INPUT

|-- Results
    |-- FolderA
        |-- barcode01
            |-- file1.fastq
        |-- barcode02
            |-- file1.fastq
            |-- file2.fastq
        |-- barcode03
            |-- file1.fastq
    |-- FolderB
        |-- barcode01
            |-- file1.fastq
        |-- barcode02
            |-- file1.fastq
            |-- file2.fastq
        |-- barcode03
            |-- file1.fastq
    |-- FolderC
        |-- barcode01
            |-- file1.fastq
            |-- file2.fastq
        |-- barcode02
            |-- file1.fastq
        |-- barcode03
            |-- file1.fastq
            |-- file2.fastq

OUTPUT
I would like to turn that output resembling something such as:

|-- Results
    |-- barcode01.merged.fastq
    |-- barcode02.merged.fastq
    |-- barcode03.merged.fastq

The output files would contain data from all file#.fastq from its respective barcode folder, from folder A, B, and C.

I’ve been able to get (I think) further than I was before, but snakemake is throwing an error that says Missing input files for rule basecall: /Users/joshl/PycharmProjects/ARS/Results/DataFiles/fast5/FAL03879_67a0761e_1055/ barcode72.fast5. My code relevant code is here:

CODE


configfile: "config.yaml"
FAST5_FILES = glob_wildcards(config['results_folder'] + "DataFiles/fast5/{fast5_files}.fast5").fast5_files

def return_fast5_folder_names():
    path = config['results_folder'] + "Basecall/"
    fast5_folder_names = []
    for item in os.scandir(path):
        if Path(item).is_dir():
            fast5_folder_names.append(item.name)

    return fast5_folder_names

def return_barcode_folder_names():
    path = config['results_folder'] + ".barcodeTempOutput"
    fast5_folder_names = []
    collated_barcode_folder_names = []

    for item in os.scandir(path):
        if Path(item).is_dir():
            full_item_path = os.path.join(path, item.name)
            fast5_folder_names.append(full_item_path)

    index = 0
    for item in fast5_folder_names:
        collated_barcode_folder_names.append([])
        for folder in os.scandir(item):
            if Path(folder).is_dir():
                collated_barcode_folder_names[index].append(folder.name)
        index += 1

    return collated_barcode_folder_names


rule all:
    input:
        # basecall
        expand(config['results_folder'] + "Basecall/{fast5_file}", fast5_file=FAST5_FILES),

         # barcode
        expand(config['results_folder'] + ".barcodeTempOutput/{fast5_folders}", fast5_folders=return_fast5_folder_names()),

        # merge files
        expand(config['results_folder'] + "Barcode/{barcode_numbers}.merged.fastq", barcode_numbers=return_barcode_folder_names())

rule basecall:
    input:
         config['results_folder'] + "DataFiles/fast5/{fast5_file}.fast5"
    output:
        directory(config['results_folder'] + "Basecall/{fast5_file}")
    shell:
         r"""
         guppy_basecaller 
         --input_path {input} 
         --save_path {output} 
         --quiet 
         --config dna_r9.4.1_450bps_fast.cfg 
         --num_callers 2 
         --cpu_threads_per_caller 6
         """

rule barcode:
    input:
        config['results_folder'] + "Basecall/{fast5_folders}"
    output:
        directory(config['results_folder'] + ".barcodeTempOutput/{fast5_folders}")
    threads: 12
    shell:
         r"""
         for item in {input}; do
                guppy_barcoder 
                --input_path $item 
                --save_path {output} 
                --barcode_kits EXP-PBC096 
                --recursive
         done         
         """

rule merge_files:
    input:
        expand(config['results_folder'] + ".barcodeTempOutput/" + "{fast5_folder}/{barcode_numbers}",
               fast5_folder=glob_wildcards(config['results_folder'] + ".barcodeTempOutput/{fast5_folders}/{barcode_numbers}/{fastq_files}.fastq").fast5_folders,
               barcode_numbers=glob_wildcards(config['results_folder'] +".barcodeTempOutput/{fast5_folders}/{barcode_numbers}/{fastq_files}.fastq").barcode_numbers)
    output:
        config['results_folder'] + "Barcode/{barcode_numbers}.merged.fastq"
    shell:
        r"""
        echo "Hello world"
        echo {input}
        """

Under rule all, if I comment out the line that corresponds to merge files, there is no error

One Answer

I am not fully understanding what you mean, but I think the problem lies indeed in the input for rule all. I currently also do not have access to a computer (I'm on my phone right now), so I can not make a real example.. Probably what you want to do is change ReturnBarcodeFolderNames to use a checkpoint. I guess only after rule barcode you actually know what you want as final output.

def ReturnBarcodeFolderNames(wildcards):
    # the wildcard here makes sure that barcode is executed first
    checkpoint_output = checkpoints.barcode.get().output[0]
    
    folder_names = []
    for root, directories, files in os.walk(checkpoint_output):
        for direc in directories:
            folder_names.append(direc)

    return expand(config['results_folder'] + "Barcode/{folder}.merged.fastq", folder=folder_names)


rule all:
    input:
        ReturnBarcodeFolderNames


rule merge:
    input:
        config['results_folder'] + "Barcode/.tempOutput/{folder}"
    output:
        config['results_folder'] + "Barcode/{folder}.merged.fastq"
    shell:
         "echo {input}"

Obviously ReturnBarcodeFolderNames does not work in its current form. However, the idea is that you check what you want as final output in rule all after rule barcode has been executed. Rule merge then does not have to use the checkpoint, as its input and output can be clearly defined.

I hope this helps :), but maybe I have been addressing something else than your problem. It wasn't completely clear to me from the question unfortunately.


edit

Here is a stripped down version of the code, but it should be easy to implement the last parts now. It works for the folder structure you gave in the example:

import os
import glob


def get_merged_barcodes(wildcards):
    tmpdir = checkpoints.barcode.get(**wildcards).output[0]  # this forces the checkpoint to be executed before we continue
    barcodes = set()  # a set is like a list, but only stores unique values
    for folder in os.listdir(tmpdir):
        for barcode in os.listdir(tmpdir + "/" + folder):
            barcodes.add(barcode)

    mergedfiles = ["results/" + barcode + ".merged.fastq" for barcode in barcodes]
    return mergedfiles
    

rule all:
    input:
        get_merged_barcodes


checkpoint barcode:
    input:
        rules.basecall.output
    output:
        directory("results")
    shell:
        """
        stuff
        """


def get_merged_input(wildcards):
    return glob.glob(f"results/**/{wildcards.barcode}/*.fastq")



rule merge_files:
    input:
        get_merged_input
    output:
        "results/{barcode}.merged.fastq"
    shell:
        """
        echo {input}
        """

Basically what you did in the original question was almost working!

Answered by Maarten-vd-Sande on November 10, 2021

Add your own answers!

Ask a Question

Get help from others!

© 2024 TransWikia.com. All rights reserved. Sites we Love: PCI Database, UKBizDB, Menu Kuliner, Sharing RPP