struct
open Biokepi_run_environment
open Common
module Remove = Workflow_utilities.Remove
(** [create_dict ~run_with fasta] builds a workflow node that runs Picard
    [CreateSequenceDictionary] on the file produced by [fasta].

    The dictionary is written next to the input, with the [.fasta] suffix
    replaced by [.dict]; the path of [fasta] is therefore expected to end
    in [.fasta] (this is what [Filename.chop_suffix] is given).

    The node depends on [fasta] and on the Picard tool being installed, and
    the destination file is removed if the node fails. *)
let create_dict ~(run_with:Machine.t) fasta =
  let open KEDSL in
  let picard = Machine.get_tool run_with Machine.Tool.Default.picard in
  let fasta_path = fasta#product#path in
  let dict_path =
    sprintf "%s.%s" (Filename.chop_suffix fasta_path ".fasta") "dict" in
  let name = sprintf "picard-create-dict-%s" (Filename.basename fasta_path) in
  let make =
    let program =
      let open Program in
      Machine.Tool.init picard
      && shf "java -jar $PICARD_JAR CreateSequenceDictionary R= %s O= %s"
        (Filename.quote fasta_path) (Filename.quote dict_path)
    in
    Machine.run_stream_processor run_with program ~name
      ~self_ids:["picard"; "create-dict"]
  in
  let edges = [
    depends_on fasta;
    depends_on (Machine.Tool.ensure picard);
    on_failure_activate (Remove.file ~run_with dict_path);
  ] in
  let product = single_file dict_path ~host:(Machine.as_host run_with) in
  workflow_node product ~name ~make ~edges
(** [sort_vcf ~run_with ?sequence_dict input_vcf] builds a workflow node that
    runs Picard [SortVcf] on the file produced by [input_vcf].

    - When [sequence_dict] is given, its path is passed to Picard as
      [SEQUENCE_DICTIONARY], the node depends on it, and its base name is
      used to label the output; otherwise the label is [default] and no
      dictionary option is passed.
    - The output path is the input path with its [.vcf] suffix replaced by
      [.sorted-by-<label>.vcf]; the input path is expected to end in [.vcf].

    The output file is removed if the node fails. *)
let sort_vcf ~(run_with:Machine.t) ?(sequence_dict) input_vcf =
  let open KEDSL in
  let picard = Machine.get_tool run_with Machine.Tool.Default.picard in
  (* Everything that varies with the optional dictionary, computed at once:
     the output label, the command-line fragment and the extra edge. *)
  let sequence_name, sequence_dict_opt, sequence_dict_edge =
    match sequence_dict with
    | None -> ("default", "", [])
    | Some d ->
      let dict_path = d#product#path in
      (Filename.basename dict_path,
       sprintf "SEQUENCE_DICTIONARY= %s" (Filename.quote dict_path),
       [depends_on d])
  in
  let vcf_path = input_vcf#product#path in
  let sorted_path =
    sprintf "%s.sorted-by-%s.vcf"
      (Filename.chop_suffix vcf_path ".vcf") sequence_name in
  let name =
    sprintf "picard-sort-vcf-%s-by-%s"
      (Filename.basename vcf_path) sequence_name in
  let make =
    let program =
      let open Program in
      Machine.Tool.init picard
      && shf "java -jar $PICARD_JAR SortVcf %s I= %s O= %s"
        sequence_dict_opt
        (Filename.quote vcf_path) (Filename.quote sorted_path)
    in
    Machine.run_stream_processor
      run_with program ~name ~self_ids:["picard"; "sort-vcf"]
  in
  let edges =
    [
      depends_on input_vcf;
      depends_on (Machine.Tool.ensure picard);
      on_failure_activate (Remove.file ~run_with sorted_path);
    ] @ sequence_dict_edge
  in
  let product = single_file sorted_path ~host:(Machine.as_host run_with) in
  workflow_node product ~name ~make ~edges
(** Settings used to build the [java] command prefix for Picard
    MarkDuplicates (see {!to_java_shell_call}). *)
module Mark_duplicates_settings = struct

  type t = {
    name: string;
    (** Label for this set of settings; not used by
        {!to_java_shell_call}. *)
    tmpdir: string;
    (** Emitted both as [TMPDIR=] and as [-Djava.io.tmpdir=]. *)
    max_sequences_for_disk_read_ends_map: int;
    (** Emitted as [MAX_SEQUENCES_FOR_DISK_READ_ENDS_MAP=]. *)
    max_file_handles_for_read_ends_map: int;
    (** Emitted as [MAX_FILE_HANDLES_FOR_READ_ENDS_MAP=]. *)
    sorting_collection_size_ratio: float;
    (** Emitted as [SORTING_COLLECTION_SIZE_RATIO=], formatted with [%f]. *)
    mem_param: string option;
    (** When [Some s], the flag [-Xmx<s>] is passed to [java]. *)
  }

  (** Baseline values. *)
  let factory = {
    name = "factory";
    mem_param = None;
    tmpdir = "/tmp";
    sorting_collection_size_ratio = 0.25;
    max_sequences_for_disk_read_ends_map = 50000;
    max_file_handles_for_read_ends_map = 8000;
  }

  (** Same as {!factory} except for a larger file-handle limit. *)
  let default = {
    factory with
    max_file_handles_for_read_ends_map = 20_000;
    name = "default";
  }

  (** [to_java_shell_call t] is the shell fragment that starts the JVM:
      variable assignments, then [java] with the optional [-Xmx] flag and the
      temporary-directory property. The result ends with a space so that the
      remaining arguments can be appended directly. [t.tmpdir] is
      interpolated verbatim (it is not shell-quoted). *)
  let to_java_shell_call
      { tmpdir; mem_param;
        max_sequences_for_disk_read_ends_map = max_sequences;
        max_file_handles_for_read_ends_map = max_file_handles;
        sorting_collection_size_ratio = size_ratio;
        name = _ } =
    let heap_flag =
      match mem_param with
      | Some size -> sprintf "-Xmx%s" size
      | None -> ""
    in
    sprintf "TMPDIR=%s MAX_SEQUENCES_FOR_DISK_READ_ENDS_MAP=%d MAX_FILE_HANDLES_FOR_READ_ENDS_MAP=%d SORTING_COLLECTION_SIZE_RATIO=%f java %s -Djava.io.tmpdir=%s "
      tmpdir max_sequences max_file_handles size_ratio heap_flag tmpdir

end
(** [mark_duplicates ?settings ~run_with ~input_bam output_bam_path] builds a
    workflow node that runs Picard [MarkDuplicates] and produces
    [output_bam_path].

    - [settings] (default {!Mark_duplicates_settings.default}) controls the
      [java] command prefix.
    - [input_bam] first goes through [Samtools.sort_bam_if_necessary] with
      [~by:`Coordinate]; Picard reads the resulting BAM.
    - [output_bam_path] is expected to end in [.bam]; the metrics file is
      written next to it.

    The input, output and metrics paths are all shell-quoted in the
    generated command. Both the output BAM and the metrics file are removed
    if the node fails. *)
let mark_duplicates
    ?(settings = Mark_duplicates_settings.default)
    ~(run_with: Machine.t) ~input_bam output_bam_path =
  let open KEDSL in
  let picard_jar = Machine.get_tool run_with Machine.Tool.Default.picard in
  (* NOTE(review): the separator in the format plus the leading dot of the
     extension yield a double dot ([foo.bam] gives [foo..metrics]). The name
     is kept as-is so that the location of the metrics file does not move. *)
  let metrics_path =
    sprintf "%s.%s" (Filename.chop_suffix output_bam_path ".bam") ".metrics" in
  let sorted_bam =
    Samtools.sort_bam_if_necessary ~run_with input_bam ~by:`Coordinate in
  let program =
    let java_call = Mark_duplicates_settings.to_java_shell_call settings in
    Program.(
      Machine.Tool.(init picard_jar)
      && shf "%s -jar $PICARD_JAR MarkDuplicates VALIDATION_STRINGENCY=LENIENT INPUT=%s OUTPUT=%s METRICS_FILE=%s"
        java_call
        (Filename.quote sorted_bam#product#path)
        (Filename.quote output_bam_path)
        (* Derived from [output_bam_path], so it needs the same quoting. *)
        (Filename.quote metrics_path)
    ) in
  let name =
    sprintf "picard-markdups-%s" Filename.(basename input_bam#product#path) in
  let make =
    Machine.run_big_program ~name run_with program
      ~self_ids:["picard"; "mark-duplicates"] in
  let product = transform_bam input_bam#product output_bam_path in
  workflow_node product
    ~name ~make
    ~edges:[
      depends_on sorted_bam;
      depends_on Machine.Tool.(ensure picard_jar);
      on_failure_activate (Remove.file ~run_with output_bam_path);
      on_failure_activate (Remove.file ~run_with metrics_path);
    ]
(** [bam_to_fastq ?sample_name ~run_with ~sample_type ~output_prefix input_bam]
    builds a workflow node that runs Picard [SamToFastq] on [input_bam].

    - [input_bam] first goes through [Samtools.sort_bam_if_necessary] with
      [~by:`Read_name]; Picard reads the resulting BAM.
    - With [`Paired_end], the outputs are [<output_prefix>_R1.fastq] and
      [<output_prefix>_R2.fastq]; with [`Single_end], the single output is
      [<output_prefix>.fastq].
    - [sample_name] is forwarded as the name of the resulting
      [fastq_reads] product.

    Every path placed on the command line is shell-quoted, for both sample
    types. The FASTQ output files are removed if the node fails. *)
let bam_to_fastq
    ?sample_name ~run_with ~sample_type ~output_prefix input_bam =
  let open KEDSL in
  let sorted_bam =
    Samtools.sort_bam_if_necessary
      ~run_with ~by:`Read_name input_bam in
  let fastq_output_options, r1, r2opt =
    match sample_type with
    | `Paired_end ->
      let r1 = sprintf "%s_R1.fastq" output_prefix in
      let r2 = sprintf "%s_R2.fastq" output_prefix in
      ([sprintf "FASTQ=%s" (Filename.quote r1);
        sprintf "SECOND_END_FASTQ=%s" (Filename.quote r2)],
       r1, Some r2)
    | `Single_end ->
      let r1 = sprintf "%s.fastq" output_prefix in
      (* Quoted like the paired-end outputs so that prefixes containing
         spaces or shell metacharacters do not break the command. *)
      ([sprintf "FASTQ=%s" (Filename.quote r1)], r1, None)
  in
  let picard_jar = Machine.get_tool run_with Machine.Tool.Default.picard in
  let program =
    Program.(
      Machine.Tool.(init picard_jar)
      && shf "java -jar $PICARD_JAR SamToFastq INPUT=%s %s"
        (Filename.quote sorted_bam#product#path)
        (String.concat ~sep:" " fastq_output_options)
    ) in
  let name =
    sprintf "picard-bam2fastq-%s" Filename.(basename input_bam#product#path) in
  let make =
    Machine.run_big_program ~name run_with program
      ~self_ids:["picard"; "bam-to-fastq"] in
  let edges =
    let common_edges = [
      depends_on sorted_bam;
      depends_on Machine.Tool.(ensure picard_jar);
      on_failure_activate (Remove.file ~run_with r1);
    ] in
    (* The second-end file only exists for paired-end samples. *)
    match r2opt with
    | Some r2 -> on_failure_activate (Remove.file ~run_with r2) :: common_edges
    | None -> common_edges
  in
  workflow_node
    (fastq_reads ?name:sample_name ~host:(Machine.as_host run_with) r1 r2opt)
    ~name ~make ~edges
end