module Common
= struct
include Nonstd
(** String operations used throughout the library: re-exports
    [Sosa.Native_string] under the name [String], shadowing whichever
    [String] module was in scope after [include Nonstd]. *)
module String = struct
include Sosa.Native_string
end
(** [dir // file] joins two path components with [Filename.concat]. *)
let ( // ) dir file = Filename.concat dir file
(** Global debug switch. It is initialised to [true] only when the
    [BIOKEPI_DEBUG] environment variable is exactly ["true"]; being a [ref],
    it can also be toggled at runtime. *)
let debug_mode =
  (* [Sys.getenv] reports an unset variable with [Not_found]; catching only
     that exception avoids silently swallowing unrelated failures. *)
  ref (try Sys.getenv "BIOKEPI_DEBUG" = "true" with Not_found -> false)
(** [dbg fmt ...] formats a message printf-style and, when [!debug_mode] is
    set, writes it to stderr prefixed with [biokepi-debug: ] and flushes.
    When debugging is off the message is still formatted but nothing is
    printed. *)
let dbg fmt =
  let emit message =
    match !debug_mode with
    | true -> eprintf "biokepi-debug: %s\n%!" message
    | false -> ()
  in
  ksprintf emit fmt
(** [failwithf fmt ...] builds a message printf-style and passes it to
    [failwith]. *)
let failwithf fmt = ksprintf (fun message -> failwith message) fmt
(** Re-export of [Ketrew_pure.Internal_pervasives.Unique_id] under a shorter
    name. *)
module Unique_id = struct
include Ketrew_pure.Internal_pervasives.Unique_id
end
module KEDSL = struct
include Ketrew.EDSL
module Command = Ketrew_pure.Target.Command
(** Product type of workflow nodes that carry no data of their own: an object
    whose only method is [is_done]. *)
type nothing = < is_done : Condition.t option >

(** The [nothing] product value; its [is_done] method returns [None]. *)
let nothing = object method is_done = None end

(* [target] and [file_target] ignore their argument and return a
   self-explanatory tag. NOTE(review): presumably they shadow same-named
   functions coming from [Ketrew.EDSL] so that code still using them fails to
   type-check and is pointed at the workflow API -- confirm. *)
let target _ = `Please_KEDSL_workflow
let file_target _ = `Please_KEDSL_workflow

(** Workflow node whose product is a [single_file]. *)
type file_workflow = single_file workflow_node

(** Workflow node whose product is [nothing]. *)
type phony_workflow = nothing workflow_node
(** Product describing one fragment of FASTQ reads, single- or paired-end.

    - [paths]: the R1 path and the optional R2 path, as given to the
      constructor;
    - [r1] / [r2]: workflow nodes wrapping the corresponding files;
    - [sample_name] / [escaped_sample_name]: the sample name and a sanitized
      variant of it;
    - [fragment_id] / [fragment_id_forced]: the optional fragment identifier
      and a variant that always yields a string. *)
type fastq_reads = <
  is_done: Ketrew_pure.Target.Condition.t option;
  paths : string * (string option);
  r1 : single_file workflow_node;
  r2 : single_file workflow_node option;
  sample_name: string;
  escaped_sample_name: string;
  fragment_id: string option;
  fragment_id_forced: string;
>

(** [fastq_reads ?host ?name ?fragment_id r1 r2_opt] builds the product for
    the reads stored at path [r1] and, for paired-end data, at [r2_opt].

    - [sample_name] is [name], or the basename of [r1] when [name] is absent.
    - [fragment_id_forced] is [fragment_id], or the basename of [r1].
    - [is_done] requires every file of the fragment to exist.
    - Each call to the [r1] / [r2] methods builds a fresh workflow node. *)
let fastq_reads ?host ?name ?fragment_id r1 r2_opt : fastq_reads =
  (* Wrap a file product in a node named after the basename of its path. *)
  let node_of_file file =
    workflow_node file
      ~name:(sprintf "File: %s" (Filename.basename file#path)) in
  (* ASCII letters, digits, dash and underscore are kept; any other
     character becomes an underscore. *)
  let sanitize = function
    | ('0' .. '9' | 'a' .. 'z' | 'A' .. 'Z' | '-' | '_') as keep -> keep
    | _ -> '_' in
  object (self)
    val first_mate = single_file ?host r1
    val second_mate = Option.map r2_opt ~f:(single_file ?host)
    method paths = (r1, r2_opt)
    method r1 = node_of_file first_mate
    method r2 = Option.map second_mate ~f:node_of_file
    method is_done =
      let conditions =
        match second_mate with
        (* The single-end case lists the R1 condition twice, so the
           conjunction always has two members. *)
        | None -> [first_mate#exists; first_mate#exists]
        | Some mate -> [first_mate#exists; mate#exists] in
      Some (`And conditions)
    method sample_name =
      Option.value name ~default:(Filename.basename r1)
    method escaped_sample_name =
      String.map self#sample_name ~f:sanitize
    method fragment_id = fragment_id
    method fragment_id_forced =
      Option.value fragment_id ~default:(Filename.basename r1)
  end
(** [fastq_node_of_single_file_nodes ~host ~name ?fragment_id fastq_r1
    fastq_r2] assembles existing single-file nodes into one [fastq_reads]
    node.

    The resulting node depends on [fastq_r1] and, when present, on the node
    inside [fastq_r2]. It is created with [~equivalence:`None] and named
    after [name] and the fragment id (or the basename of the R1 path when no
    fragment id is given). *)
let fastq_node_of_single_file_nodes
    ~host ~name ?fragment_id fastq_r1 fastq_r2 =
  let r1_path = fastq_r1#product#path in
  let r2_path = Option.map fastq_r2 ~f:(fun node -> node#product#path) in
  let product = fastq_reads ~host ~name ?fragment_id r1_path r2_path in
  let r2_edges =
    match fastq_r2 with
    | None -> []
    | Some r2 -> [depends_on r2] in
  let label =
    Option.value fragment_id ~default:(Filename.basename r1_path) in
  workflow_node product
    ~equivalence:`None
    ~name:(sprintf "Assembled-fastq: %s (%s)" name label)
    ~edges:(depends_on fastq_r1 :: r2_edges)
(** Product describing a BAM file: where it lives ([host], [path]), how it is
    sorted when known ([sorting]), and the name of the reference build it
    refers to ([reference_build]). *)
type bam_file = <
  is_done: Ketrew_pure.Target.Condition.t option;
  host: Host.t;
  path : string;
  sorting: [ `Coordinate | `Read_name ] option;
  reference_build: string;
>

(** [bam_file ~host ?sorting ~reference_build path] builds the product for
    the BAM at [path] on [host]. [path] and [is_done] are delegated to the
    underlying [single_file] product. *)
let bam_file ~host ?sorting ~reference_build path : bam_file =
  let file = single_file ~host path in
  object
    method host = host
    method path = file#path
    method is_done = file#is_done
    method reference_build = reference_build
    method sorting = sorting
  end
(** [transform_bam ?change_sorting bam ~path] describes a BAM derived from
    [bam] and stored at [path]: same host and reference build, and the same
    sorting unless [change_sorting] is provided. *)
let transform_bam ?change_sorting (bam : bam_file) ~path : bam_file =
  let sorting =
    match change_sorting with
    | None -> bam#sorting
    | Some _ as requested -> requested in
  bam_file
    ~host:bam#host
    ?sorting
    ~reference_build:bam#reference_build
    path
(** Product grouping several BAM files. *)
type bam_list = <
  is_done: Ketrew_pure.Target.Condition.t option;
  bams: bam_file list;
>

(** [bam_list bams] groups [bams] into one product whose [is_done] is the
    conjunction of the conditions of all its members.

    Calling [is_done] fails (through [Option.value_exn]) if any member has no
    condition. *)
let bam_list (bams : bam_file list) : bam_list =
  let condition_of (bam : bam_file) =
    bam#is_done
    |> Option.value_exn ~msg:"Bams should have a Condition.t" in
  object
    method bams = bams
    method is_done = Some (`And (List.map bams ~f:condition_of))
  end
(** [explode_bam_list_node bln] returns one workflow node per BAM contained
    in the product of [bln]. Each node is named after the basename of the
    BAM path, depends on [bln], and is created with [~equivalence:`None]. *)
let explode_bam_list_node (bln : bam_list workflow_node) =
  let node_of_bam bam =
    workflow_node bam
      ~name:(Filename.basename bam#path)
      ~tags:["expolode_bam_list_node"]
      ~edges:[depends_on bln]
      ~equivalence:`None in
  List.map bln#product#bams ~f:node_of_bam
(** Either a single BAM node or a list of BAM nodes. The type parameter
    records which node type corresponds to the chosen case:
    [bam_file workflow_node] for [Single_bam] and [bam_list workflow_node]
    for [Bam_workflow_list]. *)
type _ bam_or_bams =
| Single_bam: bam_file workflow_node -> bam_file workflow_node bam_or_bams
| Bam_workflow_list: bam_file workflow_node list -> bam_list workflow_node bam_or_bams
(** [submit workflow] hands [workflow] to [Ketrew.Client.submit_workflow]. *)
let submit workflow = Ketrew.Client.submit_workflow workflow
end
(* Empty module that shadows the [Ketrew] library module from this point on,
   including for code that opens [Common]. NOTE(review): presumably meant to
   force workflow code to go through [KEDSL] -- confirm. *)
module Ketrew = struct end
(** String constants meant to be used as workflow-node tags; within this file
    only [clean_up] is used (by the [Remove] helpers). *)
module Target_tags = struct
let aligner = "aligner"
let variant_caller = "variant-caller"
let clean_up = "clean-up"
end
end
module Machine
= struct
open Common
open KEDSL
module Tool = struct
(** Identification of a tool by name and optional version. *)
module Definition = struct
  type t = {name: string; version: string option}

  (** [create ?version name] builds a tool definition. *)
  let create ?version name = {name; version}

  (** [to_opam_name d] renders [d] as [name.version], using the literal
      [NOVERSION] when the definition has no version. *)
  let to_opam_name definition =
    let version =
      match definition.version with
      | Some v -> v
      | None -> "NOVERSION" in
    definition.name ^ "." ^ version

  (** Same rendering as [to_opam_name]. *)
  let to_string = to_opam_name

  (** Same rendering as [to_opam_name]. *)
  let to_directory_name = to_opam_name
end
(** Default tool definitions (name and pinned version) known to the library.
    [mutect] and [gatk] are declared without a version. *)
module Default = struct
open Definition
let bwa = create "bwa" ~version:"0.7.10"
let samtools = create "samtools" ~version:"1.3"
let vcftools = create "vcftools" ~version:"0.1.12b"
let bedtools = create "bedtools" ~version:"2.23.0"
let somaticsniper = create "somaticsniper" ~version:"1.0.3"
let varscan = create "varscan" ~version:"2.3.5"
let picard = create "picard" ~version:"1.127"
let mutect = create "mutect"
let gatk = create "gatk"
let strelka = create "strelka" ~version:"1.0.14"
let virmid = create "virmid" ~version:"1.1.1"
let muse = create "muse" ~version:"1.0b"
let star = create "star" ~version:"2.4.1d"
let stringtie = create "stringtie" ~version:"1.2.2"
let cufflinks = create "cufflinks" ~version:"2.2.1"
let hisat = create "hisat" ~version:"0.1.6-beta"
(* NOTE(review): [hisat2] reuses the tool name "hisat" and differs from
   [hisat] only by its version -- confirm this is intended. *)
let hisat2 = create "hisat" ~version:"2.0.2-beta"
let mosaik = create "mosaik" ~version:"2.2.3"
let kallisto = create "kallisto" ~version:"0.42.3"
let bowtie = create "bowtie" ~version:"1.1.2"
let optitype = create "optitype" ~version:"1.0.0"
let seq2hla = create "seq2hla" ~version:"2.2"
end
(** An installed tool: its [definition], the program that initialises it
    ([init]) and the workflow node that makes sure it is available
    ([ensure]). *)
type t = {
  definition: Definition.t;
  init: Program.t;
  ensure: phony_workflow;
}

(** [create ?init ?ensure definition] builds a tool.

    - Without [init], the initialisation program only echoes a message
      naming the tool.
    - Without [ensure], a node with a [nothing] product, named after the
      tool with an [-ensured] suffix, is used; a provided node goes through
      [KEDSL.forget_product]. *)
let create ?init ?ensure definition =
  let label = Definition.to_string definition in
  (* Both defaults are built unconditionally, whether or not they end up
     being used. *)
  let default_init = Program.shf "echo 'Tool %s: default init'" label in
  let default_ensure =
    workflow_node nothing ~name:(sprintf "%s-ensured" label) in
  let init =
    match init with
    | Some program -> program
    | None -> default_init in
  let ensure =
    match ensure with
    | Some node -> KEDSL.forget_product node
    | None -> default_ensure in
  {definition; init; ensure}

(** Initialisation program of a tool. *)
let init {init; _} = init

(** Node ensuring that a tool is available. *)
let ensure {ensure; _} = ensure
(** A toolkit: a lookup function from tool definitions to installed tools. *)
module Kit = struct
  type tool = t
  type t = Definition.t -> tool option

  (** [concat kits] queries [kits] in order and returns the first tool
      found. *)
  let concat (kits : t list) : t =
    fun wanted -> List.find_map kits ~f:(fun kit -> kit wanted)

  (** [of_list tools] is the kit providing exactly [tools]; definitions are
      compared with structural equality. *)
  let of_list tools : t =
    fun wanted ->
      List.find tools ~f:(fun tool -> tool.definition = wanted)

  (** [get_exn kit definition] is the tool for [definition]; fails through
      [failwithf] when [kit] cannot provide it. *)
  let get_exn kit definition =
    match kit definition with
    | None ->
      failwithf "Toolkit cannot provide the tool %s"
        (Definition.to_string definition)
    | Some tool -> tool
end
end
module Make_fun = struct
(** Tags that can be attached to a program run to describe what it needs.
    Yojson converters and a pretty-printer are generated with
    [@@deriving yojson, show].

    NOTE(review): the exact meaning of each tag is decided by the
    [Make_fun.t] implementation that receives it, which is not visible in
    this file. *)
module Requirement = struct
type t = [
| `Processors of int
| `Internet_access
| `Memory of [
| `GB of float
| `Small
| `Big
]
| `Quick_run
| `Spark of string list
| `Custom of string
| `Self_identification of string list
] [@@deriving yojson, show]
end
(** Type of the functions that turn a [Program.t] into a
    [KEDSL.Build_process.t], given an optional name and an optional list of
    requirements. *)
type t =
?name: string ->
?requirements: Requirement.t list ->
Program.t ->
KEDSL.Build_process.t
(** [stream_processor requirements] prepends [`Processors 1] and
    [`Memory `Small] to [requirements]. *)
let stream_processor requirements =
  [`Processors 1; `Memory `Small] @ requirements

(** [quick requirements] prepends [`Quick_run] to [requirements]. *)
let quick requirements = [`Quick_run] @ requirements

(** [downloading requirements] prepends [`Internet_access] followed by the
    [stream_processor] tags to [requirements]. *)
let downloading requirements =
  `Internet_access :: `Processors 1 :: `Memory `Small :: requirements

(** [with_self_ids ?self_ids l] prepends [`Self_identification tags] to [l]
    when [self_ids] is [Some tags], and returns [l] unchanged otherwise. *)
let with_self_ids ?self_ids l =
  match self_ids with
  | None -> l
  | Some tags -> `Self_identification tags :: l
(** [with_requirements run base] is [run] with [base] prepended to whatever
    requirements the caller provides (none by default). *)
let with_requirements : t -> Requirement.t list -> t =
  fun run base ->
    fun ?name ?(requirements = []) program ->
      run ?name ~requirements:(base @ requirements) program
end
(** Description of a computing environment: a named [host] with a working
    directory, a toolkit, a way of running programs, a lookup function for
    reference genomes, and a processor count limit. *)
type t = {
name: string;
host: Host.t;
get_reference_genome: string -> Reference_genome.t;
toolkit: Tool.Kit.t;
run_program: Make_fun.t;
work_dir: string;
max_processors: int;
}
(** [create ~host ~get_reference_genome ~toolkit ~run_program ~work_dir
    ~max_processors name] packs its arguments into a machine description. *)
let create
    ~host ~get_reference_genome ~toolkit
    ~run_program ~work_dir ~max_processors name =
  {
    name;
    host;
    work_dir;
    max_processors;
    toolkit;
    run_program;
    get_reference_genome;
  }
(** Name of the machine. *)
let name {name; _} = name

(** Host of the machine. *)
let as_host {host; _} = host

(** Reference-genome lookup function of the machine. *)
let get_reference_genome {get_reference_genome; _} = get_reference_genome
(** [get_tool machine definition] looks [definition] up in the toolkit of
    the machine; fails through [failwithf], naming the machine and the tool,
    when the toolkit cannot provide it. *)
let get_tool machine definition =
  match machine.toolkit definition with
  | None ->
    failwithf "Machine %S cannot provide the tool %s"
      machine.name (Tool.Definition.to_string definition)
  | Some tool -> tool
(** Program runner of the machine, without extra requirements. *)
let run_program {run_program; _} = run_program

(** Processor count limit of the machine. *)
let max_processors {max_processors; _} = max_processors
(** Program runner of the machine with the [Make_fun.quick] requirements
    prepended. *)
let quick_run_program t : Make_fun.t =
  let requirements = Make_fun.quick [] in
  Make_fun.with_requirements t.run_program requirements

(** Program runner of the machine with the [Make_fun.stream_processor]
    requirements prepended, preceded by a [`Self_identification] tag when
    [self_ids] is given. *)
let run_stream_processor ?self_ids t : Make_fun.t =
  let requirements =
    Make_fun.with_self_ids ?self_ids (Make_fun.stream_processor []) in
  Make_fun.with_requirements t.run_program requirements

(** Program runner of the machine with the [Make_fun.downloading]
    requirements prepended. *)
let run_download_program t : Make_fun.t =
  let requirements = Make_fun.downloading [] in
  Make_fun.with_requirements t.run_program requirements
(** [run_big_program t ?processors ?self_ids] is the program runner of the
    machine with [`Memory `Big] and [`Processors processors] prepended
    ([processors] defaults to 1), preceded by a [`Self_identification] tag
    when [self_ids] is given. *)
let run_big_program t :
  ?processors: int -> ?self_ids : string list -> Make_fun.t =
  fun ?(processors = 1) ?self_ids ->
    let requirements =
      Make_fun.with_self_ids ?self_ids
        [`Memory `Big; `Processors processors] in
    Make_fun.with_requirements t.run_program requirements
(** Working directory of the machine. *)
let work_dir {work_dir; _} = work_dir
end
(** Build metadata of the library. *)
module Metadata
= struct
  (** Version string, wrapped in a lazy value. *)
  let version : string Lazy.t = lazy "0.0.0+master"

  (** Git commit the sources come from, when known. *)
  let git_commit : string option =
    Some "99877df6dfa2fe64cd2071aa691bda7bbf4f0de7"
end
module Reference_genome
: sig
(* Interface of the reference-genome module: declarative specifications
   (where the data can be fetched from) and concrete genomes (workflow nodes
   providing the files). *)
open Common

(** Name of a reference genome build. *)
type name = string

module Specification : sig
module Location : sig
(** Recipe describing how to obtain a file: a URL, or a transformation
    (concatenation, VCF concatenation of named parts, gunzip, untar) of
    other recipes. *)
type t = [
| `Url of string
| `Vcf_concat of (string * t) list
| `Concat of t list
| `Gunzip of t
| `Untar of t
]

(* One constructor function per case of [t]; each one only wraps its
   argument in the corresponding tag. *)
val url : 'a -> [> `Url of 'a ]
val vcf_concat : 'a -> [> `Vcf_concat of 'a ]
val concat : 'a -> [> `Concat of 'a ]
val gunzip : 'a -> [> `Gunzip of 'a ]
val untar : 'a -> [> `Untar of 'a ]
end

(** Specification of a reference genome: the location of the FASTA file and
    of the optional companion files, plus the optional list of major contig
    names. Values can only be built with [create]. *)
type t = private {
name : name;
metadata : string option;
fasta : Location.t;
dbsnp : Location.t option;
cosmic : Location.t option;
exome_gtf : Location.t option;
cdna : Location.t option;
major_contigs : string list option;
}

(** Build a specification; the positional argument is the genome name. *)
val create :
?metadata:string ->
fasta:Location.t ->
?dbsnp:Location.t ->
?cosmic:Location.t ->
?exome_gtf:Location.t ->
?cdna:Location.t ->
?major_contigs:string list ->
string ->
t

(** Specifications shipped with the library. *)
module Default :
sig
(** Names of the default genomes. *)
module Name : sig
val b37 : name
val b37decoy : name
val b38 : name
val hg18 : name
val hg19 : name
val mm10 : name
end
val b37 : t
val b37decoy : t
val b38 : t
val hg18 : t
val hg19 : t
val mm10 : t
end
end

(** A reference genome: its specification together with the workflow nodes
    providing the FASTA file ([location]) and the optional companion files.
    Values can only be built with [create]. *)
type t = private {
specification: Specification.t;
location : KEDSL.file_workflow;
cosmic : KEDSL.file_workflow option;
dbsnp : KEDSL.file_workflow option;
gtf : KEDSL.file_workflow option;
cdna : KEDSL.file_workflow option;
}

(** Build a reference genome from its specification and the node providing
    the FASTA file. *)
val create :
?cosmic:KEDSL.file_workflow ->
?dbsnp:KEDSL.file_workflow ->
?gtf:KEDSL.file_workflow ->
?cdna:KEDSL.file_workflow ->
Specification.t -> KEDSL.file_workflow -> t

(** Name found in the specification. *)
val name : t -> name

(** Path of the FASTA file. *)
val path : t -> string

(* The [_path_exn] functions return the path of the corresponding optional
   file and fail (through [Option.value_exn]) when it is absent. *)
val cosmic_path_exn : t -> string
val dbsnp_path_exn : t -> string
val gtf_path_exn : t -> string
val cdna_path_exn : t -> string

(** Major contigs of the specification as [`Chromosome] regions; fails
    (through [failwithf]) when the specification does not define them. *)
val major_contigs : t -> Region.t list

(** Node providing the FASTA file. *)
val fasta: t -> KEDSL.file_workflow

(* The [_exn] functions return the node of the corresponding optional file
   and fail (through [Option.value_exn]) when it is absent; [gtf] is the
   non-failing variant for the GTF file. *)
val cosmic_exn: t -> KEDSL.file_workflow
val dbsnp_exn: t -> KEDSL.file_workflow
val gtf_exn: t -> KEDSL.file_workflow
val gtf: t -> KEDSL.file_workflow option
val cdna_exn: t -> KEDSL.file_workflow
end
= struct
open Common
(** Name of a reference genome build; a plain [string]. *)
type name = string
module Specification = struct
(** Recipes describing how to obtain a file. *)
module Location = struct
  (** A URL, or a transformation of other recipes. *)
  type t = [
    | `Url of string
    | `Vcf_concat of (string * t) list
    | `Concat of t list
    | `Gunzip of t
    | `Untar of t
  ]

  (** Wrap an address in [`Url]. *)
  let url address = `Url address

  (** Wrap a list of named recipes in [`Vcf_concat]. *)
  let vcf_concat named_parts = `Vcf_concat named_parts

  (** Wrap a list of recipes in [`Concat]. *)
  let concat parts = `Concat parts

  (** Wrap a recipe in [`Gunzip]. *)
  let gunzip compressed = `Gunzip compressed

  (** Wrap a recipe in [`Untar]. *)
  let untar archive = `Untar archive
end
(** Specification of a reference genome: the location of the FASTA file and
    of the optional companion files, plus the optional list of major contig
    names. *)
type t = {
  name: string;
  metadata: string option;
  fasta: Location.t;
  dbsnp: Location.t option;
  cosmic: Location.t option;
  exome_gtf: Location.t option;
  cdna: Location.t option;
  major_contigs: string list option;
}

(** [create ?metadata ~fasta ?dbsnp ?cosmic ?exome_gtf ?cdna ?major_contigs
    name] packs its arguments into a specification; every omitted optional
    argument is stored as [None]. *)
let create
    ?metadata ~fasta ?dbsnp ?cosmic ?exome_gtf ?cdna ?major_contigs
    name : t =
  {
    name; metadata; major_contigs;
    fasta; cdna;
    dbsnp; cosmic; exome_gtf;
  }
module Default = struct
(** Contig names 1 to 22, then X, Y and MT. *)
let major_contigs_b37 =
  let autosomes =
    List.init 22 (fun index -> string_of_int (index + 1)) in
  autosomes @ ["X"; "Y"; "MT"]

(** Contig names chr1 to chr22, then chrX, chrY and chrM. *)
let major_contigs_hg_family =
  let autosomes =
    List.init 22 (fun index -> "chr" ^ string_of_int (index + 1)) in
  autosomes @ ["chrX"; "chrY"; "chrM"]

(** Contig names 1 to 19, then X and Y. *)
let major_contigs_mm10 =
  let autosomes =
    List.init 19 (fun index -> string_of_int (index + 1)) in
  autosomes @ ["X"; "Y"]
(** Names of the reference genomes defined below. *)
module Name = struct
let b37 = "b37"
let b37decoy = "b37decoy"
let b38 = "b38"
let hg18 = "hg18"
let hg19 = "hg19"
let mm10 = "mm10"
end
(* URLs of the companion files shared by [b37] and [b37decoy]. *)
let b37_dbsnp_url =
"ftp://gsapubftp-anonymous@ftp.broadinstitute.org/bundle/2.8/b37/dbsnp_138.b37.vcf.gz"
let b37_cosmic_url =
"http://www.broadinstitute.org/cancer/cga/sites/default/files/data/tools/mutect/b37_cosmic_v54_120711.vcf"
let b37_exome_gtf_url =
"http://ftp.ensembl.org/pub/release-75/gtf/homo_sapiens/Homo_sapiens.GRCh37.75.gtf.gz"
let b37_cdna_url =
"http://ftp.ensembl.org/pub/release-75/fasta/homo_sapiens/cdna/Homo_sapiens.GRCh37.75.cdna.all.fa.gz"

(** Specification of b37. Every file is a gunzipped download except COSMIC,
    which is used as downloaded. *)
let b37 =
create Name.b37
~metadata:"Provided by the Biokepi library"
~major_contigs:major_contigs_b37
~fasta:Location.(
url "ftp://gsapubftp-anonymous@ftp.broadinstitute.org/bundle/2.8/b37/human_g1k_v37.fasta.gz"
|> gunzip)
~dbsnp:Location.(url b37_dbsnp_url |> gunzip)
~cosmic:Location.(url b37_cosmic_url)
~exome_gtf:Location.(url b37_exome_gtf_url |> gunzip)
~cdna:Location.(url b37_cdna_url |> gunzip)

(** Specification of b37decoy: same companion files and contig list as
    [b37], with a different FASTA source. *)
let b37decoy =
create Name.b37decoy
~metadata:"Provided by the Biokepi library"
~major_contigs:major_contigs_b37
~fasta:Location.(
url
"ftp://ftp.1000genomes.ebi.ac.uk/vol1/ftp/technical/reference/phase2_reference_assembly_sequence/hs37d5.fa.gz"
|> gunzip)
~dbsnp:Location.(url b37_dbsnp_url |> gunzip)
~exome_gtf:Location.(url b37_exome_gtf_url |> gunzip)
~cosmic:Location.(url b37_cosmic_url)
~cdna:Location.(url b37_cdna_url |> gunzip)
(** Specification of b38: FASTA, dbSNP, GTF and cDNA are gunzipped
    downloads; no COSMIC file is declared. It reuses the b37 contig list. *)
let b38 =
let b38_url =
"ftp://ftp.ensembl.org/pub/release-79/fasta/homo_sapiens/dna/Homo_sapiens.GRCh38.dna.primary_assembly.fa.gz" in
let dbsnp_b38 =
"http://ftp.ncbi.nlm.nih.gov/snp/organisms/human_9606_b142_GRCh38/VCF/00-All.vcf.gz" in
let gtf_b38_url =
"http://ftp.ensembl.org/pub/release-79/gtf/homo_sapiens/Homo_sapiens.GRCh38.79.gtf.gz" in
let cdna_b38_url =
"http://ftp.ensembl.org/pub/release-79/fasta/homo_sapiens/cdna/Homo_sapiens.GRCh38.cdna.all.fa.gz" in
create Name.b38
~metadata:"Provided by the Biokepi library"
~major_contigs:major_contigs_b37
~fasta:Location.(url b38_url|> gunzip)
~dbsnp:Location.(url dbsnp_b38 |> gunzip)
~exome_gtf:Location.(url gtf_b38_url |> gunzip)
~cdna:Location.(url cdna_b38_url |> gunzip)
(** Specification of hg18: gunzipped FASTA and dbSNP downloads, with the
    chr-prefixed contig list; no COSMIC, GTF or cDNA file is declared. *)
let hg18 =
let hg18_url =
"ftp://gsapubftp-anonymous@ftp.broadinstitute.org/bundle/2.8/hg18/Homo_sapiens_assembly18.fasta.gz" in
let dbsnp_hg18_url =
"ftp://gsapubftp-anonymous@ftp.broadinstitute.org/bundle/2.8/hg18/dbsnp_138.hg18.vcf.gz" in
create Name.hg18
~metadata:"Provided by the Biokepi library"
~major_contigs:major_contigs_hg_family
~fasta:Location.(url hg18_url|> gunzip)
~dbsnp:Location.(url dbsnp_hg18_url |> gunzip)
(** Specification of hg19: gunzipped FASTA and dbSNP downloads, with the
    chr-prefixed contig list; no COSMIC, GTF or cDNA file is declared. *)
let hg19 =
let hg19_url =
"ftp://gsapubftp-anonymous@ftp.broadinstitute.org/bundle/2.8/hg19/ucsc.hg19.fasta.gz" in
let dbsnp_hg19_url =
"ftp://gsapubftp-anonymous@ftp.broadinstitute.org/bundle/2.8/hg19/dbsnp_138.hg19.vcf.gz" in
create Name.hg19
~metadata:"Provided by the Biokepi library"
~major_contigs:major_contigs_hg_family
~fasta:Location.(url hg19_url|> gunzip)
~dbsnp:Location.(url dbsnp_hg19_url |> gunzip)
(** Specification of mm10. The dbSNP entry is a VCF concatenation of two
    gunzipped downloads (SNPs and indels); no COSMIC or cDNA file is
    declared. *)
let mm10 =
let mm10_url =
"ftp://ftp.ensembl.org/pub/release-84/fasta/mus_musculus/dna/Mus_musculus.GRCm38.dna_sm.primary_assembly.fa.gz" in
let dbsnp_mm10_snps_url =
"ftp://ftp-mouse.sanger.ac.uk/REL-1303-SNPs_Indels-GRCm38/mgp.v3.snps.rsIDdbSNPv137.vcf.gz" in
let dbsnp_mm10_indels_url =
"ftp://ftp-mouse.sanger.ac.uk/REL-1303-SNPs_Indels-GRCm38/mgp.v3.indels.rsIDdbSNPv137.vcf.gz" in
let gene_annotations_gtf =
"ftp://ftp.ensembl.org/pub/release-79/gtf/mus_musculus/Mus_musculus.GRCm38.79.gtf.gz" in
create Name.mm10
~metadata:"Provided by the Biokepi Library"
~major_contigs:major_contigs_mm10
~fasta:Location.(url mm10_url |> gunzip)
~dbsnp:Location.(
vcf_concat ["db_snps.vcf", url dbsnp_mm10_snps_url |> gunzip;
"db_indels.vcf", url dbsnp_mm10_indels_url |> gunzip]
)
~exome_gtf:Location.(url gene_annotations_gtf |> gunzip)
end
end
(** A reference genome: its specification together with the workflow nodes
    providing the FASTA file ([location]) and the optional companion
    files. *)
type t = {
  specification: Specification.t;
  location: KEDSL.file_workflow;
  cosmic: KEDSL.file_workflow option;
  dbsnp: KEDSL.file_workflow option;
  gtf: KEDSL.file_workflow option;
  cdna: KEDSL.file_workflow option;
}

(** [create ?cosmic ?dbsnp ?gtf ?cdna specification location] packs its
    arguments into a reference genome; omitted companion files are stored as
    [None]. *)
let create ?cosmic ?dbsnp ?gtf ?cdna specification location : t =
  {
    specification;
    location;
    gtf; cdna;
    dbsnp; cosmic;
  }
(** Name recorded in the specification of the genome. *)
let name {specification; _} = specification.Specification.name

(** Path of the product of the FASTA node. *)
let path {location; _} = location#product#path
(* Each [_path_exn] function returns the path of the product of the
   corresponding optional node, and fails through [Option.value_exn], with a
   message naming the function and the genome, when the node is absent. *)

(** Path of the COSMIC file. *)
let cosmic_path_exn t =
  let msg = sprintf "cosmic_path_exn of %s" (name t) in
  (Option.value_exn ~msg t.cosmic)#product#path

(** Path of the dbSNP file. *)
let dbsnp_path_exn t =
  let msg = sprintf "dbsnp_path_exn of %s" (name t) in
  (Option.value_exn ~msg t.dbsnp)#product#path

(** Path of the GTF file. *)
let gtf_path_exn t =
  let msg = sprintf "gtf_path_exn of %s" (name t) in
  (Option.value_exn ~msg t.gtf)#product#path

(** Path of the cDNA file. *)
let cdna_path_exn t =
  let msg = sprintf "cdna_path_exn of %s" (name t) in
  (Option.value_exn ~msg t.cdna)#product#path
(** Node providing the FASTA file. *)
let fasta : t -> KEDSL.file_workflow = fun {location; _} -> location

(* Each [_exn] function returns the corresponding optional node and fails
   through [Option.value_exn], with a message naming the genome, when the
   node is absent. *)

(** Node providing the COSMIC file. *)
let cosmic_exn t =
  t.cosmic
  |> Option.value_exn ~msg:(sprintf "%s: no COSMIC" (name t))

(** Node providing the dbSNP file. *)
let dbsnp_exn t =
  t.dbsnp
  |> Option.value_exn ~msg:(sprintf "%s: no DBSNP" (name t))

(** Node providing the GTF file. *)
let gtf_exn t =
  t.gtf
  |> Option.value_exn ~msg:(sprintf "%s: no GTF" (name t))

(** Node providing the GTF file, if any. *)
let gtf {gtf; _} = gtf

(** Node providing the cDNA FASTA file. *)
let cdna_exn t =
  t.cdna
  |> Option.value_exn ~msg:(sprintf "%s: no cDNA fasta file" (name t))
(** [major_contigs t] is the list of major contigs of the specification of
    [t], each wrapped as a [`Chromosome] region.

    Fails through [failwithf] when the specification does not define major
    contigs. *)
let major_contigs t : Region.t list =
  match t.specification.Specification.major_contigs with
  | None ->
    (* The message previously read "does have", the opposite of the
       condition being reported. *)
    failwithf
      "Reference %S does not have major-contigs/chromosomes defined" (name t)
  | Some contigs -> List.map contigs ~f:(fun contig -> `Chromosome contig)
end
module Region
= struct
open Common
(** A genomic region: a whole chromosome (by name), an interval of a
    chromosome (name and two integer bounds), or everything ([`Full]). *)
type t = [
| `Chromosome of string
| `Chromosome_interval of string * int * int
| `Full
]
(** Rendering of a region meant for use inside file names: [Full], the
    chromosome name, or [name_begin-end] for an interval. *)
let to_filename = function
  | `Full -> "Full"
  | `Chromosome name -> name
  | `Chromosome_interval (name, first, last) ->
    sprintf "%s_%d-%d" name first last

(** Samtools-style region string: [None] for [`Full], the chromosome name,
    or [name:begin-end] for an interval. *)
let to_samtools_specification region =
  match region with
  | `Chromosome name -> Some name
  | `Chromosome_interval (name, first, last) ->
    Some (sprintf "%s:%d-%d" name first last)
  | `Full -> None

(** [-r] followed by the samtools region string, or the empty string for
    [`Full]. *)
let to_samtools_option region =
  match to_samtools_specification region with
  | None -> ""
  | Some spec -> "-r " ^ spec

(** [--intervals] followed by the samtools region string, or the empty
    string for [`Full]. *)
let to_gatk_option region =
  match to_samtools_specification region with
  | None -> ""
  | Some spec -> "--intervals " ^ spec
(** [parse_samtools s] parses a samtools-style region: a string without a
    colon is a whole chromosome, and [name:begin-end] is an interval.

    Fails through [failwithf] when there is more than one colon, when the
    part after the colon does not split into exactly two pieces around a
    dash, or when either piece is not an integer. *)
let parse_samtools s =
  let interval chromosome loci =
    match String.split ~on:(`Character '-') loci with
    | [left; right] ->
      begin match Int.of_string left, Int.of_string right with
      | Some b, Some e -> `Chromosome_interval (chromosome, b, e)
      | _ -> failwithf "Cannot parse %S into 2 loci" loci
      end
    | _ -> failwithf "Not one '-' in %S" loci
  in
  match String.split ~on:(`Character ':') s with
  | [chromosome] -> `Chromosome chromosome
  | [chromosome; loci] -> interval chromosome loci
  | [] -> assert false
  | _ -> failwithf "Not one or zero ':' in %S" s
(** Cmdliner term for an optional [-R] / [--region] argument: absent means
    [`Full], otherwise the value goes through [parse_samtools]. *)
let cmdliner_term () =
  let open Cmdliner in
  let region_of_argument = function
    | Some s -> parse_samtools s
    | None -> `Full in
  let argument =
    Arg.(
      value & opt (some string) None
      & info ["R"; "region"] ~docv:"REGION"
        ~doc:"Specify a region; using samtools' format"
    ) in
  Term.(pure region_of_argument $ argument)
end
(** Named sets of command-line parameters for a tool. *)
module Tool_parameters
= struct
  open Common

  (** A named list of (option, value) pairs. *)
  type t = {
    name: string;
    parameters: (string * string) list;
  }

  (** JSON object with a [name] string field and a [parameters] object
      mapping each option to its value as a string. *)
  let to_json {name; parameters} : Yojson.Basic.json =
    let json_parameters =
      List.map parameters ~f:(fun (key, value) -> key, `String value) in
    `Assoc [
      "name", `String name;
      "parameters", `Assoc json_parameters;
    ]

  (** Flat list of the parameters: each option followed by its value, in
      order. *)
  let render t =
    List.concat_map t.parameters ~f:(fun (flag, value) -> [flag; value])
end
module Workflow_utilities
= struct
open Common
module Remove = struct
(** [file ~run_with path] is a clean-up node that removes [path] with
    [rm -f] through the quick-run runner of [run_with].

    The node is considered done when [ls] on [path], run on the host of
    [run_with], returns 2. NOTE(review): 2 is the status GNU [ls] uses for a
    missing operand; other [ls] implementations may differ -- confirm for
    the hosts in use. *)
let file ~run_with path =
  let open KEDSL in
  workflow_node nothing
    ~name:(sprintf "rm-%s" (Filename.basename path))
    ~done_when:(`Is_verified (`Command_returns (
        Command.shell ~host:Machine.(as_host run_with)
          (* The path is quoted because this command goes through a shell,
             whereas [rm] below receives it as a literal argument. *)
          (sprintf "ls %s" (Filename.quote path)),
        2)))
    ~make:(Machine.quick_run_program
             run_with Program.(exec ["rm"; "-f"; path]))
    ~tags:[Target_tags.clean_up]
(** [directory ~run_with path] is a clean-up node that removes [path]
    recursively with [rm -rf] through the quick-run runner of [run_with].

    The node is considered done when [ls] on [path], run on the host of
    [run_with], returns 2. NOTE(review): 2 is the status GNU [ls] uses for a
    missing operand; other [ls] implementations may differ -- confirm for
    the hosts in use. *)
let directory ~run_with path =
  let open KEDSL in
  workflow_node nothing
    ~name:(sprintf "rmdir-%s" (Filename.basename path))
    ~done_when:(`Is_verified (`Command_returns (
        Command.shell ~host:Machine.(as_host run_with)
          (* The path is quoted because this command goes through a shell,
             whereas [rm] below receives it as a literal argument. *)
          (sprintf "ls %s" (Filename.quote path)),
        2
      )))
    ~make:(Machine.quick_run_program
             run_with Program.(exec ["rm"; "-rf"; path]))
    ~tags:[Target_tags.clean_up]
(** [path_on_host ~host path] is a node that removes [path] recursively with
    [rm -rf], daemonized on [host] with the [`Python_daemon] method. Unlike
    [file] and [directory] it takes a host instead of a machine and declares
    no done-condition or tags. *)
let path_on_host ~host path =
  let open KEDSL in
  let name = sprintf "rm-%s" (Filename.basename path) in
  let removal = Program.(exec ["rm"; "-rf"; path]) in
  workflow_node nothing
    ~name
    ~make:(daemonize ~using:`Python_daemon ~host removal)
end
(** Decompression helpers. *)
module Gunzip = struct
  (** [concat ~run_with bunch_of_dot_gzs ~result_path] is a node producing
      [result_path] on the host of [run_with] by running [gunzip -c] over
      the products of all the given nodes, in order, and redirecting the
      output to [result_path]. The parent directory of [result_path] is
      created first.

      The node depends on every input node; on failure, a node removing
      [result_path] is activated. *)
  let concat ~(run_with : Machine.t) bunch_of_dot_gzs ~result_path =
    let open KEDSL in
    let program =
      Program.(
        exec ["mkdir"; "-p"; Filename.dirname result_path]
        (* The output path is quoted like the input paths, so that a path
           containing spaces or shell metacharacters is redirected to
           correctly. *)
        && shf "gunzip -c %s > %s"
          (List.map bunch_of_dot_gzs
             ~f:(fun o -> Filename.quote o#product#path)
           |> String.concat ~sep:" ")
          (Filename.quote result_path)
      ) in
    let name =
      sprintf "gunzipcat-%s" (Filename.basename result_path) in
    workflow_node
      (single_file result_path ~host:Machine.(as_host run_with))
      ~name
      ~make:(Machine.run_stream_processor ~name run_with program)
      ~edges:(
        on_failure_activate Remove.(file ~run_with result_path)
        :: List.map ~f:depends_on bunch_of_dot_gzs)
end
module Cat = struct
(** [concat ~run_with bunch_of_files ~result_path] is a node producing
    [result_path] on the host of [run_with] by running [cat] over the
    products of all the given nodes, in order, and redirecting the output
    to [result_path]. The parent directory of [result_path] is created
    first.

    The node depends on every input node; on failure, a node removing
    [result_path] is activated. *)
let concat ~(run_with : Machine.t) bunch_of_files ~result_path =
  let open KEDSL in
  let program =
    Program.(
      exec ["mkdir"; "-p"; Filename.dirname result_path]
      (* The output path is quoted like the input paths, so that a path
         containing spaces or shell metacharacters is redirected to
         correctly. *)
      && shf "cat %s > %s"
        (List.map bunch_of_files
           ~f:(fun o -> Filename.quote o#product#path)
         |> String.concat ~sep:" ")
        (Filename.quote result_path)
    ) in
  let name =
    sprintf "concat-all-%s" (Filename.basename result_path) in
  workflow_node
    (single_file result_path ~host:Machine.(as_host run_with))
    ~name
    ~edges:(
      on_failure_activate Remove.(file ~run_with result_path)
      :: List.map ~f:depends_on bunch_of_files)
    ~make:(Machine.run_stream_processor run_with ~name program)
(** [cat_folder ~host ~run_program ?depends_on ~files_gzipped ~folder
    ~destination] is a node producing [destination] on [host] by
    concatenating every entry matched by a shell glob inside [folder]: with
    [gunzip -c] when [files_gzipped] is true, with [cat] otherwise.

    The node depends on the nodes listed in [depends_on] (none by default);
    on failure, a node removing [destination] is activated. *)
let cat_folder ~host
    ~(run_program : Machine.Make_fun.t)
    ?(depends_on=[]) ~files_gzipped ~folder ~destination =
  (* Keep the optional argument under another name: opening [KEDSL] below
     brings its own [depends_on] into scope. *)
  let deps = depends_on in
  let open KEDSL in
  let name = "cat-folder-" ^ Filename.quote folder in
  let command = if files_gzipped then "gunzip -c" else "cat" in
  let edges =
    on_failure_activate (Remove.path_on_host ~host destination)
    :: List.map ~f:depends_on deps in
  workflow_node (single_file destination ~host)
    ~edges ~name
    ~make:(
      run_program ~name
        Program.(
          shf "%s %s/* > %s" command
            (Filename.quote folder) (Filename.quote destination)))
end
module Download = struct
(** [wget_program ?output_filename url] is the program running [wget -O]
    on [url]; the output file is [output_filename], or the basename of
    [url] when it is not given. *)
let wget_program ?output_filename url =
  let target =
    match output_filename with
    | Some filename -> filename
    | None -> Filename.basename url in
  KEDSL.Program.exec ["wget"; "-O"; target; url]
(** [wget_to_folder ~host ~run_program ~test_file ~destination url] is a
    node that creates the [destination] directory and downloads [url] into
    it with [wget -P]. The product of the node is the file [test_file]
    inside [destination].

    The program runs with the [Machine.Make_fun.downloading] requirements;
    on failure, a node removing [destination] is activated. *)
let wget_to_folder
    ~host ~(run_program : Machine.Make_fun.t)
    ~test_file ~destination url =
  let open KEDSL in
  let name = "wget-" ^ Filename.basename destination in
  let product = single_file (destination // test_file) ~host in
  let download =
    Program.(
      exec ["mkdir"; "-p"; destination]
      && shf "wget %s -P %s"
        (Filename.quote url)
        (Filename.quote destination)) in
  let make =
    run_program ~name
      ~requirements:(Machine.Make_fun.downloading [])
      download in
  let cleanup = Remove.path_on_host ~host destination in
  workflow_node product ~name ~make
    ~edges:[on_failure_activate cleanup]
(** [wget ~host ~run_program url destination] is a node producing the file
    [destination] on [host] by creating its parent directory and running
    [wget -O] on [url].

    The program runs with the [Machine.Make_fun.downloading] requirements;
    on failure, a node removing [destination] is activated. *)
let wget
    ~host ~(run_program : Machine.Make_fun.t)
    url destination =
  let open KEDSL in
  let name = "wget-" ^ Filename.basename destination in
  let download =
    Program.(
      exec ["mkdir"; "-p"; Filename.dirname destination]
      && shf "wget %s -O %s"
        (Filename.quote url) (Filename.quote destination)) in
  let make =
    run_program ~name
      ~requirements:(Machine.Make_fun.downloading [])
      download in
  let cleanup = Remove.path_on_host ~host destination in
  workflow_node (single_file destination ~host) ~name ~make
    ~edges:[on_failure_activate cleanup]
(** [wget_gunzip ~host ~run_program ~destination url] is a node producing
    [destination] on [host].

    - When [url] ends with [.gz], the archive is downloaded next to
      [destination] (same path with a [.gz] suffix) and then decompressed
      with [gunzip -c], using the [Machine.Make_fun.stream_processor]
      requirements; on failure a node removing [destination] is activated.
    - Otherwise this is the same as [wget]. *)
let wget_gunzip
    ~host ~(run_program : Machine.Make_fun.t)
    ~destination url =
  let open KEDSL in
  match Filename.check_suffix url ".gz" with
  | false -> wget ~host ~run_program url destination
  | true ->
    let compressed = destination ^ ".gz" in
    let name = "gunzip-" ^ Filename.basename compressed in
    let wgot = wget ~host ~run_program url compressed in
    let decompress =
      Program.(shf "gunzip -c %s > %s"
                 (Filename.quote wgot#product#path)
                 (Filename.quote destination)) in
    workflow_node
      (single_file destination ~host)
      ~edges:[
        depends_on wgot;
        on_failure_activate (Remove.path_on_host ~host destination);
      ]
      ~name
      ~make:(
        run_program ~name
          ~requirements:(Machine.Make_fun.stream_processor [])
          decompress)
(** [wget_untar ~host ~run_program ~destination_folder ~tar_contains url]
    downloads [url] to [destination_folder] with a [.tar] suffix appended,
    then extracts it into [destination_folder] with [tar -x]. The product
    of the node is the file [tar_contains] inside [destination_folder].

    The decompression flag is chosen from the suffix of [url]: [z] for
    [.gz], [j] for [.bz2], none otherwise. Extraction runs with the
    [Machine.Make_fun.stream_processor] requirements; on failure, a node
    removing [destination_folder] is activated. *)
let wget_untar
    ~host ~(run_program : Machine.Make_fun.t)
    ~destination_folder ~tar_contains url =
  let open KEDSL in
  let zip_flags =
    if Filename.check_suffix url ".gz" then "z"
    else if Filename.check_suffix url ".bz2" then "j"
    else "" in
  let tar_filename = destination_folder ^ ".tar" in
  let name = "untar-" ^ tar_filename in
  let wgot = wget ~host ~run_program url tar_filename in
  let product = single_file (destination_folder // tar_contains) ~host in
  let extract =
    Program.(
      exec ["mkdir"; "-p"; destination_folder]
      && shf "tar -x%s -f %s -C %s"
        zip_flags
        (Filename.quote wgot#product#path)
        (Filename.quote destination_folder)) in
  workflow_node product
    ~edges:[
      depends_on wgot;
      on_failure_activate (Remove.path_on_host ~host destination_folder);
    ]
    ~name
    ~make:(
      run_program ~name
        ~requirements:(Machine.Make_fun.stream_processor [])
        extract)
end
module Vcftools = struct
(** [vcf_process_n_to_1_no_machine ~host ~vcftools ~run_program ?more_edges
    ~vcfs ~final_vcf command_prefix] is a node producing [final_vcf] on
    [host] by running [command_prefix] over the products of all the [vcfs]
    nodes, in order, and redirecting the output to [final_vcf]. The
    [vcftools] initialisation program runs first.

    The node depends on the [ensure] node of [vcftools], on every node of
    [vcfs] and on [more_edges] (none by default); on failure, a node
    removing [final_vcf] is activated. *)
let vcf_process_n_to_1_no_machine
    ~host
    ~vcftools
    ~(run_program : Machine.Make_fun.t)
    ?(more_edges = [])
    ~vcfs
    ~final_vcf
    command_prefix
  =
  let open KEDSL in
  let name = sprintf "%s-%s" command_prefix (Filename.basename final_vcf) in
  let make =
    run_program ~name
      Program.(
        Machine.Tool.(init vcftools)
        (* [command_prefix] is deliberately left unquoted: it may carry
           options. The output path is quoted like the input paths. *)
        && shf "%s %s > %s"
          command_prefix
          (String.concat ~sep:" "
             (List.map vcfs ~f:(fun t -> Filename.quote t#product#path)))
          (Filename.quote final_vcf)
      ) in
  workflow_node ~name
    (single_file final_vcf ~host)
    ~make
    ~edges:(
      on_failure_activate
        (Remove.path_on_host ~host final_vcf)
      :: depends_on Machine.Tool.(ensure vcftools)
      :: List.map ~f:depends_on vcfs
      @ more_edges)
(** [vcf_concat_no_machine ~host ~vcftools ~run_program ?more_edges vcfs
    ~final_vcf] concatenates [vcfs] into [final_vcf] by calling
    [vcf_process_n_to_1_no_machine] with the [vcf-concat] command. *)
let vcf_concat_no_machine
    ~host
    ~vcftools
    ~(run_program : Machine.Make_fun.t)
    ?more_edges
    vcfs
    ~final_vcf =
  let command_prefix = "vcf-concat" in
  vcf_process_n_to_1_no_machine
    ~host ~vcftools ~run_program ?more_edges ~vcfs ~final_vcf
    command_prefix
(** [vcf_sort_no_machine ~host ~vcftools ~run_program ?more_edges ~src ~dest
    ()] sorts the VCF produced by [src] into [dest] by calling
    [vcf_process_n_to_1_no_machine] with the [vcf-sort -c] command. The
    [`Memory `Big] requirement is prepended to those of [run_program]. *)
let vcf_sort_no_machine
    ~host
    ~vcftools
    ~(run_program : Machine.Make_fun.t)
    ?more_edges
    ~src ~dest () =
  let big_memory_run =
    Machine.Make_fun.with_requirements run_program [`Memory `Big] in
  vcf_process_n_to_1_no_machine
    ~host ~vcftools ~run_program:big_memory_run ?more_edges
    ~vcfs:[src] ~final_vcf:dest
    "vcf-sort -c"
end
end