(* code generated with [/Volumes/Encrypted_zzz/dev/biokepi/tools/build-doc.sh ketrew,ppx_deriving.std] *)
module Bedtools 
struct

open Biokepi_run_environment
open Common

module Remove = Workflow_utilities.Remove


module Configuration = struct
  module Intersect = struct
    type t = {
      params: string list;      
              (** Catch-all list of parameters to be concatted together and passed to the command. *)

      with_headers: bool;      
             (** The header of A will be prepended to the output. -header. *)

    }
    let default = { params = []; with_headers = true; }

    let render {params; with_headers} =
      (if with_headers then " -header " else " ")
      ^ (String.concat ~sep:" " params)
  end
end


let bamtofastq
    ~(run_with:Machine.t) ~sample_type ~output_prefix input_bam =
  let open KEDSL in
  let sorted_bam =
    Samtools.sort_bam_if_necessary
      ~run_with ~by:`Read_name input_bam in
  let fastq_output_options, r1, r2opt =
    match sample_type with
    | `Paired_end ->
      let r1 = sprintf "%s_R1.fastq" output_prefix in
      let r2 = sprintf "%s_R2.fastq" output_prefix in
      (["-fq"; r1; "-fq2"; r2], r1, Some r2)
    | `Single_end ->
      let r1 = sprintf "%s.fastq" output_prefix in
      (["-fq"; r1], r1, None)
  in
  let bedtools = Machine.get_tool run_with Machine.Tool.Default.bedtools in
  let src_bam = input_bam#product#path in
  let program =
    Program.(Machine.Tool.(init bedtools)
             && exec ["mkdir""-p"Filename.dirname r1]
             && exec ("bedtools" ::
                      "bamtofastq" ::  "-i" :: src_bam ::
                      fastq_output_options)) in
  let name =
    sprintf "bedtools-bamtofastq-%s"
      Filename.(basename src_bam |> chop_extension) in
  let make = Machine.run_program ~name run_with program in
  let edges = [
    depends_on Machine.Tool.(ensure bedtools);
    depends_on input_bam;
    on_failure_activate (Remove.file ~run_with r1);
    on_success_activate (Remove.file ~run_with sorted_bam#product#path);
  ] |> fun list ->
    begin match r2opt with
    | None -> list
    | Some r2 ->
      on_failure_activate (Remove.file ~run_with r2) :: list
    end
  in
  workflow_node
    (fastq_reads ~host:(Machine.as_host run_with) r1 r2opt)
    ~edges ~name ~make


(** Used to determine if features in multiple sets intersect with one another.

Feature sets include BED, VCF, GFF, and BAM files.

  • primary: The primary set file (workflow_node with #path).
  • intersect_with: List of set file workflow_nodes to intersect with.
  • result: Path to the resulting set.
*)

let intersect
    ~(run_with:Machine.t)
    ?(configuration=Configuration.Intersect.default)
    ~primary ~intersect_with output
  =
  let open KEDSL in
  let bedtools = Machine.get_tool run_with Machine.Tool.Default.bedtools in
  let arguments =
    (sprintf "-a %s" (Filename.quote primary#product#path)) ^
    (List.map ~f:(fun n -> (Filename.quote n#product#path)) intersect_with
     |> String.concat ~sep:","
     |> sprintf " -b %s ")
    ^ (Configuration.Intersect.render configuration)
  in
  let program =
    Program.(Machine.Tool.(init bedtools)
             && sh ("bedtools intersect "
                    ^ arguments ^ " > " ^ output)) in
  let name = sprintf "bedtools-intersect-%s-with-%s"
      (Filename.basename primary#product#path)
      (String.concat ~sep:"__"
         (List.map
            ~f:(fun n -> (Filename.basename n#product#path)) intersect_with)) in
  let make = Machine.run_program run_with ~name program in
  let edges = [
    depends_on primary;
    depends_on Machine.Tool.(ensure bedtools);
    on_failure_activate (Remove.file run_with output)
  ] @ (List.map ~f:depends_on intersect_with) in
  let host = Machine.as_host run_with in
  let out = single_file ~host output in
  workflow_node out ~name ~edges ~make
    ~done_when:(`Is_verified (out#is_bigger_than 1))
end
module Bwa 
struct
open Biokepi_run_environment
open Common

module Remove = Workflow_utilities.Remove

module Configuration = struct

  let default_gap_open_penalty = 11
  let default_gap_extension_penalty = 4

  module Common = struct
    type t = {
      name: string;
      gap_open_penalty: int;
      gap_extension_penalty: int;
    }
    let default = {
      name = "default";
      gap_open_penalty = default_gap_open_penalty;
      gap_extension_penalty = default_gap_extension_penalty;
    }
    let name t = t.name
    let to_json {name; gap_open_penalty; gap_extension_penalty} =
      `Assoc [
        "name"`String name;
        "gap_open_penalty"`Int gap_open_penalty;
        "gap_extension_penalty"`Int gap_extension_penalty;
      ]
  end
  module Aln = Common
  module Mem = Common

end


let index
    ~reference_build
    ~(run_with : Machine.t) =
  let open KEDSL in
  let reference_fasta =
    Machine.get_reference_genome run_with reference_build
    |> Reference_genome.fasta in
  (* `bwa index` creates a bunch of files, c.f.
     [this question](https://www.biostars.org/p/73585/) we detect the
     `.bwt` one. *)

  let bwa_tool = Machine.get_tool run_with Machine.Tool.Default.bwa in
  let name =
    sprintf "bwa-index-%s" (Filename.basename reference_fasta#product#path) in
  let result = sprintf "%s.bwt" reference_fasta#product#path in
  workflow_node ~name
    (single_file ~host:(Machine.(as_host run_with)) result)
    ~edges:[
      on_failure_activate (Remove.file ~run_with result);
      depends_on reference_fasta;
      depends_on Machine.Tool.(ensure bwa_tool);
    ]
    ~tags:[Target_tags.aligner]
    ~make:(Machine.run_big_program run_with ~name
             ~self_ids:["bwa""index"]
             Program.(
               Machine.Tool.(init bwa_tool)
               && shf "bwa index %s"
                 (Filename.quote reference_fasta#product#path)))

let read_group_header_option algorithm ~sample_name ~read_group_id =
  (* this option should magically make the sam file compatible
           mutect and other GATK-like pieces of software
           http://seqanswers.com/forums/showthread.php?t=17233

           The `LB` one seems “necessary” for somatic sniper:
           `[bam_header_parse] missing LB tag in @RG lines.`
  *)

  match algorithm with
  |`Mem ->
    sprintf "-R '@RG\\tID:%s\\tSM:%s\\tLB:ga\\tPL:Illumina'" read_group_id sample_name
  |`Aln ->
    sprintf "-r '@RG\\tID:%s\\tSM:%s\\tLB:ga\\tPL:Illumina'" read_group_id sample_name

let mem_align_to_sam
    ~reference_build
    ?(configuration = Configuration.Mem.default)
    ~fastq
    (* ~(r1: KEDSL.single_file KEDSL.workflow_node) *)
    (* ?(r2: KEDSL.single_file KEDSL.workflow_node option) *)
    ~(result_prefix:string)
    ~(run_with : Machine.t)
    () =
  let open KEDSL in
  let reference_fasta =
    Machine.get_reference_genome run_with reference_build
    |> Reference_genome.fasta in
  let in_work_dir =
    Program.shf "cd %s" Filename.(quote (dirname result_prefix)) in
  (* `bwa index` creates a bunch of files, c.f.
     [this question](https://www.biostars.org/p/73585/) we detect the
     `.bwt` one. *)

  let bwa_tool = Machine.get_tool run_with Machine.Tool.Default.bwa in
  let bwa_index = index ~reference_build ~run_with in
  let result = sprintf "%s.sam" result_prefix in
  let r1_path, r2_path_opt = fastq#product#paths in
  let name = sprintf "bwa-mem-%s" (Filename.basename r1_path) in
  let processors = Machine.max_processors run_with in
  let bwa_base_command =
    String.concat ~sep:" " [
      "bwa mem";
      (read_group_header_option `Mem 
         ~sample_name:fastq#product#escaped_sample_name
         ~read_group_id:(Filename.basename r1_path));
      "-t"Int.to_string processors;
      "-O"Int.to_string configuration.Configuration.Mem.gap_open_penalty;
      "-E"Int.to_string configuration.Configuration.Mem.gap_extension_penalty;
      (Filename.quote reference_fasta#product#path);
      (Filename.quote r1_path);
    ] in
  let bwa_base_target ~bwa_command  = 
    workflow_node
      (single_file result ~host:Machine.(as_host run_with))
      ~name
      ~edges:(
        depends_on Machine.Tool.(ensure bwa_tool)
        :: depends_on bwa_index
        :: depends_on fastq
        :: on_failure_activate (Remove.file ~run_with result)
        :: [])
      ~tags:[Target_tags.aligner]
      ~make:(Machine.run_big_program run_with ~processors ~name
               ~self_ids:["bwa""mem"]
               Program.(
                 Machine.Tool.(init bwa_tool)
                 && in_work_dir
                 && sh bwa_command))
  in
  match r2_path_opt with
  | Some read2 -> 
    let bwa_command =
      String.concat ~sep:" " [
        bwa_base_command;
        (Filename.quote read2);
        ">"; (Filename.quote result);
      ] in
    bwa_base_target ~bwa_command
  | None -> 
    let bwa_command =
      String.concat ~sep:" " [
        bwa_base_command;
        ">"; (Filename.quote result);
      ] in
    bwa_base_target ~bwa_command



let align_to_sam
    ~reference_build
    ?(configuration = Configuration.Aln.default)
    ~fastq
    ~(result_prefix:string)
    ~(run_with : Machine.t)
    () =
  let open KEDSL in
  let reference_fasta =
    Machine.get_reference_genome run_with reference_build
    |> Reference_genome.fasta in
  let in_work_dir =
    Program.shf "cd %s" Filename.(quote (dirname result_prefix)) in
  (* `bwa index` creates a bunch of files, c.f.
     [this question](https://www.biostars.org/p/73585/) we detect the
     `.bwt` one. *)

  let bwa_tool = Machine.get_tool run_with Machine.Tool.Default.bwa in
  let bwa_index = index ~reference_build ~run_with in
  let processors = Machine.max_processors run_with in
  let bwa_aln read_number read =
    let name = sprintf "bwa-aln-%s" (Filename.basename read) in
    let result = sprintf "%s-R%d.sai" result_prefix read_number in
    let bwa_command =
      String.concat ~sep:" " [
        "bwa aln";
        "-t"Int.to_string processors;
        "-O"Int.to_string configuration.Configuration.Aln.gap_open_penalty;
        "-E"Int.to_string configuration.Configuration.Aln.gap_extension_penalty;
        (Filename.quote reference_fasta#product#path);
        (Filename.quote read);
        ">"; (Filename.quote result);
      ] in
    workflow_node
      (single_file result ~host:Machine.(as_host run_with))
      ~name
      ~edges:[
        depends_on fastq;
        depends_on bwa_index;
        depends_on Machine.Tool.(ensure bwa_tool);
        on_failure_activate (Remove.file ~run_with result);
      ]
      ~tags:[Target_tags.aligner]
      ~make:(Machine.run_big_program run_with ~processors ~name
               ~self_ids:["bwa""aln"]
               Program.(
                 Machine.Tool.(init bwa_tool)
                 && in_work_dir
                 && sh bwa_command
               ))
  in
  let r1_path, r2_path_opt = fastq#product#paths in
  let r1_sai = bwa_aln 1 r1_path in
  let r2_sai_opt = Option.map r2_path_opt ~f:(fun r -> (bwa_aln 2 r, r)) in
  let sam =
    let name = sprintf "bwa-sam-%s" (Filename.basename result_prefix) in
    let result = sprintf "%s.sam" result_prefix in
    let program, edges =
      let common_edges = [
        depends_on r1_sai;
        depends_on reference_fasta;
        depends_on bwa_index;
        depends_on Machine.Tool.(ensure bwa_tool);
        on_failure_activate (Remove.file ~run_with result);
      ] in
      match r2_sai_opt with
      | Some (r2_sai, r2) ->
        Program.(
          Machine.Tool.(init bwa_tool)
          && in_work_dir
          && shf "bwa sampe %s %s %s %s %s %s > %s"
            (read_group_header_option `Aln 
               ~sample_name:fastq#product#escaped_sample_name
               ~read_group_id:(Filename.basename r1_path))
            (Filename.quote reference_fasta#product#path)
            (Filename.quote r1_sai#product#path)
            (Filename.quote r2_sai#product#path)
            (Filename.quote r1_path)
            (Filename.quote r2)
            (Filename.quote result)),
        (depends_on r2_sai :: common_edges)
      | None ->
        Program.(
          Machine.Tool.(init bwa_tool)
          && in_work_dir
          && shf "bwa samse %s %s %s > %s"
            (read_group_header_option `Aln 
               ~sample_name:fastq#product#escaped_sample_name
               ~read_group_id:(Filename.basename r1_path))
            (Filename.quote reference_fasta#product#path)
            (Filename.quote r1_sai#product#path)
            (Filename.quote result)),
        common_edges
    in
    workflow_node
      (single_file result ~host:Machine.(as_host run_with))
      ~name ~edges
      ~tags:[Target_tags.aligner]
      ~make:(Machine.run_big_program run_with ~processors:1 ~name  program
               ~self_ids:["bwa""sampe"])
  in
  sam
end
module Cufflinks 
struct
open Biokepi_run_environment
open Common


let run ~reference_build
    ~(run_with:Machine.t)
    ~processors 
    ~bam 
    ~result_prefix =
  let open KEDSL in
  let name = sprintf "cufflinks-%s" (Filename.basename result_prefix) in
  let result_file suffix = result_prefix ^ suffix in
  let output_dir = result_file "-cufflinks_output" in
  let genes_gtf_output = output_dir // "genes.fpkm_tracking" in
  let reference_fasta =
    Machine.get_reference_genome run_with reference_build
    |> Reference_genome.fasta in
  let reference_annotations =
    Machine.get_reference_genome run_with reference_build
    |> Reference_genome.gtf_exn in
  let cufflinks_tool =
    Machine.get_tool run_with Machine.Tool.Default.cufflinks in
  let sorted_bam =
    Samtools.sort_bam_if_necessary ~run_with ~by:`Coordinate bam in
  let processors = Machine.max_processors run_with in
  let make =
    Machine.run_big_program run_with ~name ~processors
      ~self_ids:["cufflinks"]
      Program.(
        Machine.Tool.init cufflinks_tool
        && shf "mkdir -p %s" output_dir
        && shf "cufflinks -p %d -G %s -o %s %s
                "

          processors
          reference_annotations#product#path
          output_dir
          sorted_bam#product#path
      )
  in
  workflow_node ~name ~make
    (single_file genes_gtf_output ~host:(Machine.as_host run_with))
    ~edges:[
      depends_on bam;
      depends_on reference_fasta;
      depends_on reference_annotations;
      depends_on (Machine.Tool.ensure cufflinks_tool);
      depends_on (Samtools.index_to_bai ~run_with sorted_bam);
    ]
end
module Cycledash 
struct
open Biokepi_run_environment
open Common



let post_to_cycledash_script =
  (* we use rawgit.com and not cdn.rawgit.com because the CDN caches
     old version for ever *)

  "https://gist.githubusercontent.com/smondet/4beec3cbd7c6a3a922bc/raw"

let post_vcf
    ~run_with
    ~vcf
    ~variant_caller_name
    ~dataset_name
    ?truth_vcf
    ?normal_bam
    ?tumor_bam
    ?params
    ?witness_output
    url =
  let open KEDSL in
  let unik_script = sprintf "/tmp/upload_to_cycledash_%s" (Unique_id.create ()) in
  let script_options =
    let with_path opt s = opt, s#product#path in
    List.filter_opt [
      Some ("-V", vcf#product#path);
      Some ("-v", variant_caller_name);
      Some ("-P", dataset_name);
      Option.map truth_vcf ~f:(with_path "-T"); 
      Some ("-U", url);
      Option.map tumor_bam ~f:(with_path "-t");
      Option.map normal_bam ~f:(with_path "-n");
      Option.map params ~f:(fun p -> "-N", p);
      Some ("-w"Option.value witness_output ~default:"/tmp/www")
    ]
    |> List.concat_map ~f:(fun (x, y) -> [x; y]) in
  let name =
    sprintf "upload+cycledash: %s" (Filename.basename vcf#product#path) in
  let make =
    Machine.run_download_program run_with Program.(
        shf "curl -f %s > %s"
          (Filename.quote post_to_cycledash_script)
          (Filename.quote unik_script)
        && 
        exec ("sh" :: unik_script :: script_options)
      )
  in
  let edges =
    let dep = Option.map ~f:depends_on in
    [ dep (Some vcf);
      dep truth_vcf;
      dep normal_bam;
      dep tumor_bam; ] |> List.filter_opt in
  match witness_output with
  | None ->
    workflow_node nothing ~name ~make ~edges
  | Some path ->
    workflow_node ~name ~make ~edges
      (single_file path ~host:Machine.(as_host run_with))
    |> forget_product

end
module Gatk 
struct
open Biokepi_run_environment
open Common

module Remove = Workflow_utilities.Remove


module Configuration = struct

  module Gatk_config () = struct
    type t = {
      
      (** The name of the configuration, specific to Biokepi. *)

      name: string;

      
      (** MalformedReadFilter options.

This filter is applied automatically by all GATK tools in order to protect them from crashing on reads that are grossly malformed. There are a few issues (such as the absence of sequence bases) that will cause the run to fail with an error, but these cases can be preempted by setting flags that cause the problem reads to also be filtered. *)


      
      (** Ignore reads with CIGAR containing the N operator, instead of failing with an error *)

      filter_reads_with_n_cigar: bool;
      
      (** Ignore reads with mismatching numbers of bases and base qualities, instead of failing with an error.*)

      filter_mismatching_base_and_quals: bool;
      
      (** Ignore reads with no stored bases (i.e. '*' where the sequence should be), instead of failing with an error *)

      filter_bases_not_stored: bool;

      
      (** Other parameters: *)

      parameters: (string * string) list;
    }

    let name t = t.name

    let to_json t: Yojson.Basic.json =
      let {name;
           filter_reads_with_n_cigar;
           filter_mismatching_base_and_quals;
           filter_bases_not_stored;
           parameters} = t in
      `Assoc [
        "name"`String name;
        "filter_reads_with_N_cigar"`Bool filter_reads_with_n_cigar;
        "filter_mismatching_base_and_quals"`Bool filter_mismatching_base_and_quals;
        "filter_bases_not_stored"`Bool filter_bases_not_stored;
        "parameters",
        `Assoc (List.map parameters ~f:(fun (a, b) -> a, `String b));
      ]

    let render {name;
                filter_reads_with_n_cigar;
                filter_mismatching_base_and_quals;
                filter_bases_not_stored;
                parameters} =
      (if filter_reads_with_n_cigar
       then "--filter_reads_with_N_cigar" else "") ::
      (if filter_mismatching_base_and_quals
       then "--filter_mismatching_base_and_quals" else "") ::
      (if filter_bases_not_stored
       then "--filter_bases_not_stored" else "") ::
      List.concat_map parameters ~f:(fun (a, b) -> [a; b])
      |> List.filter ~f:(fun s -> not (String.is_empty s))

    let default =
      {name = "default";
       filter_reads_with_n_cigar = false;
       filter_mismatching_base_and_quals = false;
       filter_bases_not_stored = false;
       parameters = []}
  end

  module Indel_realigner = struct
    include Gatk_config ()
  end

  module Realigner_target_creator = struct
    include Gatk_config ()
  end

  module Bqsr = struct
    include Gatk_config ()
  end

  module Print_reads = struct
    include Gatk_config ()
  end

  type indel_realigner = (Indel_realigner.t * Realigner_target_creator.t)
  type bqsr = (Bqsr.t * Print_reads.t)

  let default_indel_realigner = (Indel_realigner.default, Realigner_target_creator.default)
  let default_bqsr = (Bqsr.default, Print_reads.default)


  module Mutect2 = struct
    type t = {
      name: string;
      use_dbsnp: bool;
      use_cosmic: bool;
      additional_arguments: string list;
    }
    let create
        ?(use_dbsnp = true) ?(use_cosmic = true) name additional_arguments =
      {name; use_dbsnp; use_cosmic; additional_arguments}

    let to_json {name; use_dbsnp; use_cosmic; additional_arguments}
      : Yojson.Basic.json =
    `Assoc [
      "name"`String name;
      "use-cosmic"`Bool use_cosmic;
      "use-dbsnp"`Bool use_dbsnp;
      "additional-arguments",
      `List (List.map additional_arguments ~f:(fun s -> `String s));
    ]

    let default = create "default" []

    let compile ~reference {name; use_dbsnp; use_cosmic; additional_arguments} =
      let with_db use opt_name get_exn =
        if not use then None
        else
          let node = get_exn reference in
          Some ( [opt_name; node#product#path], [KEDSL.depends_on node])
      in
      let args, edges =
        List.filter_opt [
          with_db use_dbsnp  "--dbsnp" Reference_genome.dbsnp_exn;
          with_db use_cosmic  "--cosmic" Reference_genome.cosmic_exn;
        ]
        |> List.split
      in
      (`Arguments (List.concat args @ additional_arguments),
       `Edges (List.concat edges))

    let name t = t.name
  end


end

  (*
     For now we have the two steps in the same target but this could
     be split in two.
     c.f. https://www.broadinstitute.org/gatk/guide/tooldocs/org_broadinstitute_gatk_tools_walkers_indels_IndelRealigner.php

     We want to be able to run the indel-realigner on mutliple bams, so we
     cannot use the usual `~result_prefix` argument:
     See the documentation for the `--nWayOut` option:
     https://www.broadinstitute.org/gatk/guide/tooldocs/org_broadinstitute_gatk_tools_walkers_indels_IndelRealigner.php#--nWayOut
     See also
     http://gatkforums.broadinstitute.org/gatk/discussion/5588/best-practice-for-multi-sample-non-human-indel-realignment

     Also, the documentation is incomplete (or buggy), the option `--nWayOut`
     will output the Bam files in the current directory (i.e. the one GATK is
     running in).
     So, unless the user uses the `?run_directory` option, we extract that
     directory from the input-bams; if they do not coincide we consider this an
     error.


     On top of that we use the GADT `_ KEDSL.bam_or_bams` to return have 2
     possible return types:
     bam_file workflow_node or bam_list workflow_node
  *)

open Configuration

let indel_realigner_output_filename_tag
    ~configuration:(ir_config, target_config)
    ?region input_bams =
  let digest_of_input () =
    List.map input_bams ~f:(fun o -> o#product#path)
    |> String.concat ~sep:""
    (* we make this file “unique” with an MD5 sum of the input paths *)
    |> Digest.string |> Digest.to_hex in
  let bam_number = List.length input_bams in
  String.concat ~sep:"" [
    "_indelreal-";
    ir_config.Configuration.Indel_realigner.name;
    "-";
    target_config.Configuration.Realigner_target_creator.name;
    "-";
    sprintf "-%dx" bam_number;
    (if bam_number = 1 then "" else "-" ^ digest_of_input ());
    Option.value_map ~f:(fun r -> "-" ^ Region.to_filename r) region ~default:"";
  ]

let indel_realigner :
  type a.
  ?compress:bool ->
  ?on_region: Region.t ->
  configuration:(Indel_realigner.t * Realigner_target_creator.t) ->
  run_with:Machine.t ->
  ?run_directory: string ->
  a KEDSL.bam_or_bams ->
  a =
  fun ?(compress=false)
    ?(on_region = `Full)
    ~configuration ~run_with ?run_directory
    input_bam_or_bams ->
    let open KEDSL in
    let input_bam_1, more_input_bams = (* this an at-least-length-1 list :)  *)
      match input_bam_or_bams with
      | Single_bam bam -> bam, []
      | Bam_workflow_list [] ->
        failwithf "Empty bam-list in Gatk.indel_realigner`"
      | Bam_workflow_list (one :: more) -> (one, more)
    in
    let run_directory =
      match run_directory with
      | None ->
        let dir = Filename.dirname input_bam_1#product#path in
        List.iter more_input_bams ~f:(fun bam ->
            if Filename.dirname bam#product#path <> dir then
              failwithf "These two BAMS are not in the same directory:\n\    %s\n\    %s\nGATK.indel_realigner when running on multiple bams requires a proper run-directory, clean-up your bams or provide the option ~run_directory
                      "
 input_bam_1#product#path bam#product#path
          );
        dir
      | Some rundir -> rundir
    in
    let indel_config, target_config = configuration in
    let input_sorted_bam_1 =
      Samtools.sort_bam_if_necessary
        ~run_with ~by:`Coordinate input_bam_1 in
    let more_input_sorted_bams =
      List.map more_input_bams
        ~f:(Samtools.sort_bam_if_necessary
              ~run_with ~by:`Coordinatein
    let more_input_bams = `Use_the_sorted_ones_please in
    let input_bam_1 = `Use_the_sorted_ones_please in
    ignore (more_input_bams, input_bam_1);
    let name =
      sprintf "gatk-indelrealign-%s-%dx-%s"
        (Region.to_filename on_region)
        (List.length more_input_sorted_bams + 1)
        (Filename.basename input_sorted_bam_1#product#path)
    in
    let gatk = Machine.get_tool run_with Machine.Tool.Default.gatk in
    let reference_genome =
      let reference_build = input_sorted_bam_1#product#reference_build in
      Machine.get_reference_genome run_with reference_build in
    let fasta = Reference_genome.fasta reference_genome in
    (*
    let digest_of_input =
      List.map (input_sorted_bam_1 :: more_input_sorted_bams)
        ~f:(fun o -> o#product#path)
      |> String.concat ~sep:""
      (* we make this file “unique” with an MD5 sum of the input paths *)
      |> Digest.string |> Digest.to_hex in
       *)

    let output_suffix =
      indel_realigner_output_filename_tag
        ~configuration ~region:on_region
        (input_sorted_bam_1 :: more_input_sorted_bams)
        (* ~bam_number:(List.length more_input_sorted_bams + 1) *)
        (* ~digest_of_input *)
        (*
      sprintf "_indelreal-%s-%s-%dx%s"
        target_config.Configuration.Indel_realigner.name
        (Region.to_filename on_region)
        (List.length more_input_sorted_bams + 1)
        (if more_input_sorted_bams = [] then "" else "-" ^ digest_of_input)
           *)

    in
    let intervals_file =
      Filename.chop_suffix input_sorted_bam_1#product#path ".bam"
      ^ output_suffix ^ ".intervals" in
    let output_bam_path input =
      run_directory // (
        Filename.chop_extension input#product#path ^ output_suffix ^ ".bam"
        |> Filename.basename)
    in
    let processors = Machine.max_processors run_with in
    let make =
      let target_creation_args =
        [
          "-R"Filename.quote fasta#product#path;
          "-I"Filename.quote input_sorted_bam_1#product#path;
          "-o"Filename.quote intervals_file;
          "-nt"Int.to_string processors;
        ]
        @ Realigner_target_creator.render target_config
        @ List.concat_map more_input_sorted_bams
          ~f:(fun bam -> ["-I"Filename.quote bam#product#path])
      in
      let indel_real_args =
        [ "-R"; fasta#product#path;
          "-I"; input_sorted_bam_1#product#path;
          "-targetIntervals"; intervals_file;
        ] @ Indel_realigner.render indel_config @
        begin match more_input_sorted_bams with
        | [] ->
          ["-o"; output_bam_path input_sorted_bam_1]
        | more ->
          List.concat_map more
            ~f:(fun b -> ["-I"Filename.quote b#product#path])
          @ ["--nWayOut"; output_suffix ^ ".bam"]
        end
      in
      let intervals_option = Region.to_gatk_option on_region in
      Machine.run_big_program run_with ~name ~processors
        ~self_ids:["gatk""indel-realigner"]
        Program.(
          Machine.Tool.(init gatk)
          && shf "cd %s" (Filename.quote run_directory)
          && shf "java -jar $GATK_JAR -T RealignerTargetCreator %s %s"
            intervals_option
            (String.concat ~sep:" " target_creation_args)
          && sh ("java -jar $GATK_JAR -T IndelRealigner "
                 ^ intervals_option
                 ^ (if compress then " " else " -compress 0 ")
                 ^ (String.concat ~sep:" " indel_real_args)))
    in
    let edges =
      let sequence_dict = (* implicit dependency *)
        Picard.create_dict ~run_with fasta in
      [
        depends_on Machine.Tool.(ensure gatk);
        depends_on fasta;
        (* RealignerTargetCreator wants the `.fai`: *)
        depends_on (Samtools.faidx ~run_with fasta);
        depends_on sequence_dict;
        on_failure_activate (Remove.file ~run_with intervals_file);
      ]
      @ List.concat_map (input_sorted_bam_1 :: more_input_sorted_bams) ~f:(fun b -> [
            depends_on b;
            depends_on (Samtools.index_to_bai ~run_with b);
            on_failure_activate (Remove.file ~run_with (output_bam_path b));
          ])
    in
    let node : type a. a bam_or_bams -> a =
      (* we need a function to force `type a.` *)
      function
      | Single_bam _ ->
        (* This is what we give to the `-o` option: *)
        workflow_node  ~name ~make ~edges
          (transform_bam input_sorted_bam_1#product
             (output_bam_path input_sorted_bam_1))
      | Bam_workflow_list _ ->
        workflow_node  ~name ~make ~edges
          (bam_list
             (List.map (input_sorted_bam_1 :: more_input_sorted_bams)
                ~f:(fun b ->
                    (* This is what the documentation says it will to
                       with the `--nWayOut` option *)

                    transform_bam b#product (output_bam_path b))))
    in
    node input_bam_or_bams

let indel_realigner_map_reduce :
  type a.
  ?compress:bool ->
  configuration:(Indel_realigner.t * Realigner_target_creator.t) ->
  run_with:Machine.t ->
  ?run_directory: string ->
  a KEDSL.bam_or_bams ->
  a =
  fun ?compress
    ~configuration ~run_with
    ?run_directory
    input_bam_or_bams ->
    let open KEDSL in
    begin match input_bam_or_bams with
    | Single_bam bam_node ->
      let all_nodes =
        let f on_region =
          indel_realigner ?compress
            ~on_region
            ~configuration ~run_with ?run_directory
            input_bam_or_bams
        in
        let reference =
          Machine.get_reference_genome run_with
            bam_node#product#reference_build in
        List.map ~f (Reference_genome.major_contigs reference)
      in
      let result_path =
        Filename.chop_extension bam_node#product#path
        ^ indel_realigner_output_filename_tag
          ~configuration [bam_node]
        ^ "-merged.bam"
      in
      Samtools.merge_bams ~run_with all_nodes result_path
    | Bam_workflow_list bams ->
      let all_nodes =
        (* A list of lists that looks like:
           [
             [bam1_reg1; bam2_reg1; bam3_reg1];
             [bam1_reg2; bam2_reg2; bam3_reg2];
             [bam1_reg3; bam2_reg3; bam3_reg3];
             [bam1_reg4; bam2_reg4; bam3_reg4];
           ]
        *)

        let f on_region =
          let bam_list_node =
            indel_realigner ?compress
              ~on_region
              ~configuration ~run_with ?run_directory
              input_bam_or_bams
          in
          let exploded = KEDSL.explode_bam_list_node bam_list_node in
          exploded
        in
        let reference =
          Machine.get_reference_genome run_with
            (List.hd_exn bams)#product#reference_build in
        List.map ~f (Reference_genome.major_contigs reference)
      in
      let merged_bams =
        List.mapi bams ~f:(fun index bam ->
            let all_regions_for_this_bam =
              List.map all_nodes ~f:(fun region_n ->
                  List.nth region_n index |>
                  Option.value_exn ~msg:"bug in Gatk.indel_realigner_map_reduce")
            in
            let result_path =
              Filename.chop_extension bam#product#path
              ^ sprintf "-%d-" index (* the index is there as debug/witness *)
              ^ indel_realigner_output_filename_tag
                ~configuration bams
              ^ "-merged.bam"
            in
            Samtools.merge_bams ~run_with all_regions_for_this_bam result_path
          )
      in
      workflow_node ~name:"Indel-realigner-map-reduce"
        ~edges:(List.map merged_bams ~f:depends_on)
        (bam_list (List.map merged_bams ~f:(fun n -> n#product)))
    end



(* Again doing two steps in one target for now:
   http://gatkforums.broadinstitute.org/discussion/44/base-quality-score-recalibrator
   https://www.broadinstitute.org/gatk/guide/tooldocs/org_broadinstitute_gatk_tools_walkers_bqsr_BaseRecalibrator.php
*)

let call_gatk ~analysis ?(region=`Full) args =
  let open KEDSL.Program in
  let escaped_args = List.map ~f:Filename.quote args in
  let intervals_option = Region.to_gatk_option region in
  sh (String.concat ~sep:" "
        ("java -jar $GATK_JAR -T " :: analysis :: intervals_option :: escaped_args))

let base_quality_score_recalibrator
    ~configuration:(bqsr_configuration, print_reads_configuration)
    ~run_with
    ~input_bam ~output_bam =
  let open KEDSL in
  let name = sprintf "gatk-%s" (Filename.basename output_bam) in
  let gatk = Machine.get_tool run_with Machine.Tool.Default.gatk in
  let reference_genome =
    Machine.get_reference_genome run_with input_bam#product#reference_build in
  let fasta = Reference_genome.fasta reference_genome in
  let db_snp = Reference_genome.dbsnp_exn reference_genome in
  let sorted_bam =
    Samtools.sort_bam_if_necessary
      ~run_with ~by:`Coordinate input_bam in
  let input_bam = `Please_use_the_sorted_one in ignore input_bam;
  let recal_data_table =
    Filename.chop_suffix sorted_bam#product#path ".bam" ^ "-recal_data.table"
  in
  let processors = Machine.max_processors run_with in
  let make =
    Machine.run_big_program run_with ~name ~processors
      ~self_ids:["gatk""bqsr"]
      Program.(
        Machine.Tool.(init gatk)
        && call_gatk ~analysis:"BaseRecalibrator" ([
          "-nct"Int.to_string processors;
          "-I"; sorted_bam#product#path;
          "-R"; fasta#product#path;
          "-knownSites"; db_snp#product#path;
          "-o"; recal_data_table;
        ] @ Configuration.Bqsr.render bqsr_configuration)
        && call_gatk ~analysis:"PrintReads" ([
          "-nct"Int.to_string processors;
          "-R"; fasta#product#path;
          "-I"; sorted_bam#product#path;
          "-BQSR"; recal_data_table;
          "-o"; output_bam;
        ] @ Configuration.Print_reads.render print_reads_configuration)
      ) in
  workflow_node ~name (transform_bam sorted_bam#product ~path:output_bam)
    ~make
    ~edges:[
      depends_on Machine.Tool.(ensure gatk);
      depends_on fasta; depends_on db_snp;
      depends_on sorted_bam;
      depends_on (Samtools.index_to_bai ~run_with sorted_bam);
      on_failure_activate (Remove.file ~run_with output_bam);
      on_failure_activate (Remove.file ~run_with recal_data_table);
    ]


let haplotype_caller
    ?(more_edges = [])
    ~run_with ~input_bam ~result_prefix how =
  let open KEDSL in
  let reference =
    Machine.get_reference_genome run_with input_bam#product#reference_build in
  let run_on_region ~add_edges region =
    let result_file suffix =
      let region_name = Region.to_filename region in
      sprintf "%s-%s%s" result_prefix region_name suffix in
    let output_vcf = result_file "-germline.vcf" in
    let gatk = Machine.get_tool run_with Machine.Tool.Default.gatk in
    let run_path = Filename.dirname output_vcf in
    let reference_fasta = Reference_genome.fasta reference in
    let reference_dot_fai = Samtools.faidx ~run_with reference_fasta in
    let sequence_dict = Picard.create_dict ~run_with reference_fasta in
    let dbsnp = Reference_genome.dbsnp_exn reference in
    let sorted_bam =
      Samtools.sort_bam_if_necessary ~run_with ~by:`Coordinate input_bam in
    let run_gatk_haplotype_caller =
      let name = sprintf "%s" (Filename.basename output_vcf) in
      let make =
        Machine.run_big_program run_with ~name
          ~self_ids:["gatk""haplotype-caller"]
          Program.(
            Machine.Tool.(init gatk)
            && shf "mkdir -p %s" run_path
            && shf "cd %s" run_path
            && call_gatk ~region ~analysis:"HaplotypeCaller" [
              "-I"; sorted_bam#product#path;
              "-R"; reference_fasta#product#path;
              "--dbsnp"; dbsnp#product#path;
              "-o"; output_vcf;
              "--filter_reads_with_N_cigar";
            ]
          )
      in
      workflow_node ~name ~make
        (single_file output_vcf ~host:Machine.(as_host run_with))
        ~tags:[Target_tags.variant_caller]
        ~edges:(add_edges @ [
            depends_on Machine.Tool.(ensure gatk);
            depends_on sorted_bam;
            depends_on reference_fasta;
            depends_on dbsnp;
            depends_on reference_dot_fai;
            depends_on sequence_dict;
            depends_on (Samtools.index_to_bai ~run_with sorted_bam);
            on_failure_activate (Remove.file ~run_with output_vcf);
          ])
    in
    run_gatk_haplotype_caller
  in
  match how with
  | `Region region -> run_on_region ~add_edges:more_edges region
  | `Map_reduce ->
    let targets =
      List.map (Reference_genome.major_contigs reference)
        ~f:(run_on_region ~add_edges:[]) (* we add edges only to the last step *)
    in
    let final_vcf = result_prefix ^ "-merged.vcf" in
    Vcftools.vcf_concat ~run_with targets ~final_vcf ~more_edges

(** Call somatic variants with Mutect2.

Mutect2 comes within the GATK (as opposed to Mutect).

Cf. also https://www.broadinstitute.org/gatk/guide/tooldocs/org_broadinstitute_gatk_tools_walkers_cancer_m2_MuTect2.php *)


let mutect2
    ?(more_edges = [])
    ~configuration
    ~run_with
    ~input_normal_bam ~input_tumor_bam (* The doc says only one of each *)
    ~result_prefix how =
  let open KEDSL in
  let reference =
    Machine.get_reference_genome run_with
      input_normal_bam#product#reference_build in
  let run_on_region ~add_edges region =
    let result_file suffix =
      let region_name = Region.to_filename region in
      sprintf "%s-%s%s" result_prefix region_name suffix in
    let output_vcf = result_file "-mutect2.vcf" in
    let gatk = Machine.get_tool run_with Machine.Tool.Default.gatk in
    let run_path = Filename.dirname output_vcf in
    let reference_fasta = Reference_genome.fasta reference in
    let reference_dot_fai = Samtools.faidx ~run_with reference_fasta in
    let sequence_dict = Picard.create_dict ~run_with reference_fasta in
    let sorted_normal_bam =
      Samtools.sort_bam_if_necessary ~run_with ~by:`Coordinate input_normal_bam
    in
    let sorted_tumor_bam =
      Samtools.sort_bam_if_necessary ~run_with ~by:`Coordinate input_tumor_bam
    in
    let `Arguments config_arguments, `Edges confg_edges =
      Configuration.Mutect2.compile ~reference configuration in
    let run_caller =
      let name = sprintf "%s" (Filename.basename output_vcf) in
      let make =
        Machine.run_big_program run_with ~name
          ~self_ids:["gatk""mutect2"]
          Program.(
            Machine.Tool.(init gatk)
            && shf "mkdir -p %s" run_path
            && shf "cd %s" run_path
            && call_gatk ~region ~analysis:"MuTect2"
              ([ "-I:normal"; sorted_normal_bam#product#path;
                 "-I:tumor"; sorted_tumor_bam#product#path;
                 "-R"; reference_fasta#product#path;
                 "-o"; output_vcf; ]
               @ config_arguments)
          )
      in
      workflow_node ~name ~make
        (single_file output_vcf ~host:Machine.(as_host run_with))
        ~tags:[Target_tags.variant_caller]
        ~edges:(add_edges @ confg_edges @ [
            depends_on Machine.Tool.(ensure gatk);
            depends_on sorted_normal_bam;
            depends_on sorted_tumor_bam;
            depends_on reference_fasta;
            depends_on reference_dot_fai;
            depends_on sequence_dict;
            depends_on (Samtools.index_to_bai ~run_with sorted_normal_bam);
            depends_on (Samtools.index_to_bai ~run_with sorted_tumor_bam);
            on_failure_activate (Remove.file ~run_with output_vcf);
          ])
    in
    run_caller
  in
  match how with
  | `Region region -> run_on_region ~add_edges:more_edges region
  | `Map_reduce ->
    let targets =
      List.map (Reference_genome.major_contigs reference)
        ~f:(run_on_region ~add_edges:[]) (* we add edges only to the last step *)
    in
    let final_vcf = result_prefix ^ "-merged.vcf" in
    Vcftools.vcf_concat ~run_with targets ~final_vcf ~more_edges
end
module Hisat 
struct
open Biokepi_run_environment
open Common

module Remove = Workflow_utilities.Remove

module Configuration = struct
  type t = {
    name : string;
    version : [`V_0_1_6_beta | `V_2_0_2_beta];
  }
  let to_json {name; version}: Yojson.Basic.json =
    `Assoc [
      "name"`String name;
      "version",
      (match version with
      |`V_0_1_6_beta -> `String "V_0_1_6_beta"
      |`V_2_0_2_beta -> `String "V_2_0_2_beta");
    ]
  let default_v1 = {name = "default_v1"; version = `V_0_1_6_beta}
  let default_v2 = {name = "default_v2"; version = `V_2_0_2_beta}
  let get_tool t =
    let open Machine.Tool.Default in
    match t.version with
    |`V_0_1_6_beta -> hisat
    |`V_2_0_2_beta -> hisat2
  let name t = t.name
end

let index
    ~reference_build
    ~index_prefix
    ~configuration
    ~(run_with : Machine.t) =
  let open KEDSL in
  let reference_fasta =
    Machine.get_reference_genome run_with reference_build
    |> Reference_genome.fasta in
  let result_dir = Filename.dirname index_prefix in
  let version = configuration.Configuration.version in
  let hisat_tool =
    Machine.get_tool run_with (Configuration.get_tool configuration) in
  let build_binary =
    match version with
    | `V_0_1_6_beta -> "hisat-build"
    | `V_2_0_2_beta -> "hisat2-build"
  in
  let name =
    sprintf "%s-%s" build_binary (Filename.basename reference_fasta#product#path) in
  let first_index_file =
    match version with
    | `V_0_1_6_beta -> sprintf "%s.1.bt2" index_prefix
    | `V_2_0_2_beta -> sprintf "%s.1.ht2" index_prefix
  in
  workflow_node ~name
    (single_file ~host:(Machine.(as_host run_with)) first_index_file)
    ~edges:[
      on_failure_activate (Remove.directory ~run_with result_dir);
      depends_on reference_fasta;
      depends_on Machine.Tool.(ensure hisat_tool);
    ]
    ~tags:[Target_tags.aligner]
    ~make:(Machine.run_big_program run_with ~name
             ~self_ids:["hisat""index"]
             Program.(
               Machine.Tool.(init hisat_tool)
               && shf "mkdir %s" result_dir
               && shf "%s %s %s"
                 build_binary
                 reference_fasta#product#path
                 index_prefix
             ))

let align
    ~reference_build
    ~configuration
    ~fastq
    ~(result_prefix:string)
    ~(run_with : Machine.t)
    () =
  let open KEDSL in
  let reference_fasta =
    Machine.get_reference_genome run_with reference_build
    |> Reference_genome.fasta in
  let reference_dir = (Filename.dirname reference_fasta#product#path) in
  let version = configuration.Configuration.version in
  let hisat_binary =
    match version with
      | `V_0_1_6_beta -> "hisat"
      | `V_2_0_2_beta -> "hisat2"
  in
  let index_dir = sprintf "%s/%s-index/" reference_dir hisat_binary in
  let index_prefix = index_dir // (sprintf  "%s-index" hisat_binary) in
  let in_work_dir =
    Program.shf "cd %s" Filename.(quote (dirname result_prefix)) in
  let hisat_tool =
    Machine.get_tool run_with (Configuration.get_tool configuration) in
  let hisat_index = index ~index_prefix ~reference_build ~run_with ~configuration in
  let result = sprintf "%s.sam" result_prefix in
  let r1_path, r2_path_opt = fastq#product#paths in
  let name = sprintf "%s-rna-align-%s" hisat_binary (Filename.basename r1_path) in
  let processors = Machine.max_processors run_with in
  let hisat_base_command = sprintf
      "%s -p %d -x %s -S %s"
      hisat_binary
      processors
      (Filename.quote index_prefix)
      (Filename.quote result)
  in
  let base_hisat_target ~hisat_command =
    workflow_node ~name
      (single_file
         ~host:(Machine.(as_host run_with))
         result)
      ~edges:[
        on_failure_activate (Remove.file ~run_with result);
        depends_on reference_fasta;
        depends_on hisat_index;
        depends_on fastq;
        depends_on Machine.Tool.(ensure hisat_tool);
      ]
      ~tags:[Target_tags.aligner]
      ~make:(Machine.run_big_program run_with ~processors ~name
               ~self_ids:["hisat""align"]
               Program.(
                 Machine.Tool.(init hisat_tool)
                 && in_work_dir
                 && sh hisat_command
               ))
  in
  match r2_path_opt with
  | Some read2 ->
    let hisat_command =
      String.concat ~sep:" " [
        hisat_base_command;
        "-1"; (Filename.quote r1_path);
        "-2"; (Filename.quote read2);
      ] in
    base_hisat_target ~hisat_command
  | None ->
    let hisat_command = String.concat ~sep:" " [
        hisat_base_command;
        "-U"; (Filename.quote r1_path);
      ] in
    base_hisat_target ~hisat_command
end
module Kallisto 
struct
(** Workflow-nodes to run kallisto. *)


open Biokepi_run_environment
open Common

(** Create a kallisto specific index of the transcriptome (cDNA) *)

let index
    ~reference_build
    ~(run_with : Machine.t) =
  let open KEDSL in
  let reference_transcriptome =
    Machine.get_reference_genome run_with reference_build
    |> Reference_genome.cdna_exn in
  let kallisto_tool = Machine.get_tool run_with Machine.Tool.Default.kallisto in
  let name =
    sprintf "kallisto-index-%s" (Filename.basename reference_transcriptome#product#path) in
  let reference_dir = (Filename.dirname reference_transcriptome#product#path) in
  let result = sprintf "%s.kallisto.idx" reference_dir in
  workflow_node ~name
    (single_file ~host:(Machine.(as_host run_with)) result)
    ~edges:[
      on_failure_activate (Workflow_utilities.Remove.file ~run_with result);
      depends_on reference_transcriptome;
      depends_on Machine.Tool.(ensure kallisto_tool);
    ]
    ~make:(Machine.run_big_program run_with ~name
             ~self_ids:["kallisto""index"]
             Program.(
               Machine.Tool.(init kallisto_tool)
               && shf "kallisto index -i %s %s"
                 result
                 reference_transcriptome#product#path
             ))

(** Quantify transcript abundance from RNA fastqs, results in abundance.tsv file *)

let run
    ~reference_build
    ?(bootstrap_samples=100)
    ~(run_with:Machine.t)
    ~processors 
    ~fastq 
    ~result_prefix 
    =
  let open KEDSL in
  let name = sprintf "kallisto-%s-bootstrap_%d" (Filename.basename result_prefix) bootstrap_samples in
  let result_file suffix = result_prefix ^ suffix in
  let output_dir = result_file "-kallisto" in
  let abundance_file = output_dir // "abundance.tsv" in
  let kallisto_index = index ~reference_build ~run_with in
  let kallisto_tool = Machine.get_tool run_with Machine.Tool.Default.kallisto in
  let r1_path, r2_path_opt = fastq#product#paths in
  let kallisto_quant_base_cmd = 
      sprintf 
        "kallisto quant -i %s -o %s -b %d -t %d %s"
          kallisto_index#product#path
          output_dir
          bootstrap_samples
          processors
          r1_path
  in
  let kallisto_quant = 
    match r2_path_opt with
    | Some r2_path -> sprintf "%s %s" kallisto_quant_base_cmd r2_path
    | None -> kallisto_quant_base_cmd
  in
  let make =
    Machine.run_big_program run_with ~name ~processors
      ~self_ids:["kallisto""quant"]
      Program.(
        Machine.Tool.init kallisto_tool
        && sh kallisto_quant
      )
  in
  workflow_node ~name ~make
    (single_file abundance_file ~host:(Machine.as_host run_with))
    ~edges:[
      on_failure_activate
        (Workflow_utilities.Remove.directory ~run_with output_dir);
      depends_on kallisto_index;
      depends_on (Machine.Tool.ensure kallisto_tool);
    ]
end
module Mosaik 
struct
open Biokepi_run_environment
open Common

module Remove = Workflow_utilities.Remove

let index
    ~reference_build
    ~(run_with : Machine.t) =
  let open KEDSL in
  let reference_fasta =
    Machine.get_reference_genome run_with reference_build
    |> Reference_genome.fasta in
  let mosaik_tool = Machine.get_tool run_with Machine.Tool.Default.mosaik in
  let name =
    sprintf "mosaik-build-%s" (Filename.basename reference_fasta#product#path) in
  let index_result = sprintf "%s.mosaik.dat" reference_fasta#product#path in
  let jump_file_result = sprintf "%s.mosaik-index" reference_fasta#product#path in
  let mosaik_tmp_dir = (Filename.dirname reference_fasta#product#path) // "mosaik-tmp" in
  workflow_node ~name
    (single_file ~host:(Machine.(as_host run_with)) index_result)
    ~edges:[
      on_failure_activate (Remove.file ~run_with jump_file_result);
      on_failure_activate (Remove.file ~run_with index_result);
      depends_on reference_fasta;
      depends_on Machine.Tool.(ensure mosaik_tool);
    ]
    ~tags:[Target_tags.aligner]
    ~make:(Machine.run_big_program run_with ~name
             ~self_ids:["mosaik""index"]
             Program.(
               Machine.Tool.(init mosaik_tool)
               && shf "mkdir -p %s" mosaik_tmp_dir
               && shf "export MOSAIK_TMP=%s" mosaik_tmp_dir 
               (* Command to build basic MOSAIK reference file *)
               && shf "MosaikBuild -fr %s -oa %s"
                 reference_fasta#product#path
                 index_result
               (* Command to build MOSAIK index file *)
               && shf "MosaikJump -ia %s -hs 15 -out %s"
                 index_result
                 jump_file_result
             ))
  



let align 
    ~reference_build
    ~fastq
    ?(sequencer="illumina")
    ~(result_prefix:string)
    ~(run_with : Machine.t)
    () =
  let open KEDSL in
  let reference_fasta =
    Machine.get_reference_genome run_with reference_build
    |> Reference_genome.fasta in
  let jump_file_result = sprintf "%s.mosaik-index" reference_fasta#product#path in
  let index_result = sprintf "%s.mosaik.dat" reference_fasta#product#path in
  let in_work_dir =
    Program.shf "cd %s" Filename.(quote (dirname result_prefix)) in
  let mosaik_tool = Machine.get_tool run_with Machine.Tool.Default.mosaik in
  let mosaik_index = index ~reference_build ~run_with in
  let r1_path, r2_path_opt = fastq#product#paths in
  let name = sprintf "mosaik-align-%s" (Filename.basename r1_path) in
  let mka_out = sprintf "%s.mka" result_prefix in
  let mkb_out = sprintf "%s.mkb" result_prefix in
  let result = sprintf "%s.bam" mka_out in
  let mosaik_align_command = 
    sprintf "MosaikAligner -in %s -out %s -ia %s -j %s -annpe $MOSAIK_PE_ANN -annse $MOSAIK_SE_ANN"
      mkb_out
      mka_out
      index_result
      jump_file_result
  in
  let mosaik_build_command =
    (* Build MOSAIK specific input file *)
    let mosaik_build_base_command = 
        sprintf "MosaikBuild -q %s -st %s -out %s"
            (Filename.quote r1_path)
            sequencer
            mkb_out
    in
    match r2_path_opt with
    | Some read2 -> 
      String.concat ~sep:" " [
        mosaik_build_base_command;
        "-q2"; (Filename.quote read2);
      ]
    | None -> mosaik_build_base_command  
  in   
  workflow_node ~name
      (bam_file ~reference_build ~host:(Machine.(as_host run_with)) result)
      ~edges:[
        on_failure_activate (Remove.file ~run_with result);
        depends_on reference_fasta;
        depends_on mosaik_index;
        depends_on fastq;
        depends_on Machine.Tool.(ensure mosaik_tool);
      ]
      ~tags:[Target_tags.aligner]
      ~make:(Machine.run_big_program run_with ~name
               ~self_ids:["mosaik""align"]
               Program.(
                 Machine.Tool.(init mosaik_tool)
                 && in_work_dir
                 && sh mosaik_build_command 
                 && sh mosaik_align_command
               ))
  
end
module Muse 
struct
(** Workflow-nodes to run MuSE. *)


open Biokepi_run_environment
open Common

module Remove = Workflow_utilities.Remove

module Configuration = struct

  type t = {
    name: string;
    input_type: [ `WGS | `WES ];
  }

  let name t = t.name

  let input_type_to_string input_type =
    match input_type with
    | `WGS -> "WGS"
    | `WES -> "WES"

  let to_json {name; input_type} =
    `Assoc [
      "name"`String name;
      "input-type"`String (input_type_to_string input_type);
    ]

  let default input_type =
    let is = input_type_to_string input_type in
    { name = "default-" ^ is; input_type}
    
  let wgs = default `WGS
  let wes = default `WES

end

let run
    ~configuration
    ~(run_with:Machine.t) ~normal ~tumor ~result_prefix ?(more_edges = []) how =
  let open KEDSL in
  let reference =
    Machine.get_reference_genome run_with normal#product#reference_build in
  let muse_tool = Machine.get_tool run_with Machine.Tool.Default.muse in
  let muse_call_on_region region =
    let result_file suffix =
      let region_name = Region.to_filename region in
      sprintf "%s-%s%s" result_prefix region_name suffix in
    let intervals_option = Region.to_samtools_specification region in
    let output_file_prefix = result_file "-out" in (* MuSE adds `.MuSE.txt` *)
    let output_file = output_file_prefix ^ ".MuSE.txt" in
    let run_path = Filename.dirname output_file in
    let fasta = Reference_genome.fasta reference in
    let fasta_dot_fai = Samtools.faidx ~run_with fasta in
    (* let sequence_dict = Picard.create_dict ~run_with fasta in *)
    let sorted_normal =
      Samtools.sort_bam_if_necessary ~run_with ~by:`Coordinate normal in
    let sorted_tumor =
      Samtools.sort_bam_if_necessary ~run_with ~by:`Coordinate tumor in
    let run_muse_call =
      let name = sprintf "muse-call-%s" (Filename.basename output_file) in
      let make =
        Machine.run_program run_with ~name Program.(
            Machine.Tool.(init muse_tool)
            && shf "mkdir -p %s" run_path
            && shf "$muse_bin call -f %s %s -O %s %s %s"
              fasta#product#path
              (match intervals_option with
              | Some o -> sprintf "-r %s" o
              | None -> "")
              output_file_prefix
              sorted_tumor#product#path
              sorted_normal#product#path
              (* yes, the help message says tumor then normal *)
          )
      in
      workflow_node ~name ~make
        (single_file output_file ~host:Machine.(as_host run_with))
        ~tags:[Target_tags.variant_caller; "muse"]
        ~edges:[
          depends_on Machine.Tool.(ensure muse_tool);
          depends_on sorted_normal;
          depends_on sorted_tumor;
          depends_on fasta;
          depends_on fasta_dot_fai;
          depends_on (Samtools.index_to_bai ~run_with sorted_normal);
          depends_on (Samtools.index_to_bai ~run_with sorted_tumor);
          on_failure_activate (Remove.file ~run_with output_file);
        ]
    in
    run_muse_call
  in
  let muse_sump call_file =
    let vcf_output = result_prefix ^ ".vcf" in
    let dbsnp = Reference_genome.dbsnp_exn reference in
    let bgzipped_dbsnp =
      Samtools.bgzip ~run_with dbsnp (dbsnp#product#path ^ ".bgz"in
    let name = sprintf "muse-sump-%s" (Filename.basename vcf_output) in
    let make =
      Machine.run_program run_with ~name Program.(
          Machine.Tool.(init muse_tool)
          && shf "$muse_bin sump -I %s %s -O %s -D %s"
            call_file#product#path
            Configuration.(match configuration.input_type with
              | `WGS -> "-G" | `WES -> "-E")
            vcf_output
            bgzipped_dbsnp#product#path
        )
    in
    workflow_node ~name ~make
      (single_file vcf_output ~host:Machine.(as_host run_with))
      ~tags:[Target_tags.variant_caller; "muse"]
      ~edges:(more_edges @ [ (* muse_sump is the last step in every case *)
        depends_on Machine.Tool.(ensure muse_tool);
        depends_on call_file;
        depends_on bgzipped_dbsnp;
        depends_on
          (Samtools.tabix ~run_with ~tabular_format:`Vcf bgzipped_dbsnp);
        on_failure_activate (Remove.file ~run_with vcf_output);
      ])
  in
  begin match how with
  | `Region region ->
    muse_call_on_region region |> muse_sump
  | `Map_reduce ->
    let call_files =
      List.map (Reference_genome.major_contigs reference) ~f:muse_call_on_region
    in
    let concatenated =
      Workflow_utilities.Cat.concat ~run_with call_files
        ~result_path:(result_prefix ^ "-concat.txt")
    in
    muse_sump concatenated
  end
end
module Mutect 
struct
open Biokepi_run_environment
open Common

module Remove = Workflow_utilities.Remove

module Configuration = struct

  type t = {
    name: string;
    with_cosmic: bool;
    with_dbsnp: bool;
    parameters: (string * string) list;
  }

  let to_json {name; with_cosmic; with_dbsnp; parameters}: Yojson.Basic.json =
    `Assoc [
      "name"`String name;
      "with-cosmic"`Bool with_cosmic;
      "with-dbsnp"`Bool with_dbsnp;
      "parameters",
      `Assoc (List.map parameters ~f:(fun (a, b) -> a, `String b));
    ]
  let render {parameters; _} =
    List.concat_map parameters ~f:(fun (a,b) -> [a; b])

  let default =
    {name = "default";
     with_cosmic = true; with_dbsnp = true;
     parameters = []}
  let default_without_cosmic =
    {name = "default_without_cosmic";
     with_cosmic = false; with_dbsnp = true;
     parameters = []}

  let name t = t.name

end

let run
    ~(run_with:Machine.t) ~normal ~tumor ~result_prefix
    ?(more_edges = []) ~configuration how =
  let open KEDSL in
  let reference =
    Machine.get_reference_genome run_with normal#product#reference_build in
  let run_on_region ~add_edges region =
    let result_file suffix =
      let region_name = Region.to_filename region in
      sprintf "%s-%s%s" result_prefix region_name suffix in
    let intervals_option = Region.to_gatk_option region in
    let output_file = result_file "-somatic.vcf" in
    let dot_out_file = result_file "-output.out"in
    let coverage_file = result_file "coverage.wig" in
    let mutect = Machine.get_tool run_with Machine.Tool.Default.mutect in
    let run_path = Filename.dirname output_file in
    let fasta = Reference_genome.fasta reference in
    let cosmic =
      match configuration.Configuration.with_cosmic with
      | true -> Some (Reference_genome.cosmic_exn reference)
      | false -> None in
    let dbsnp =
      match configuration.Configuration.with_dbsnp with
      | true -> Some (Reference_genome.dbsnp_exn reference)
      | false -> None in
    let fasta_dot_fai = Samtools.faidx ~run_with fasta in
    let sequence_dict = Picard.create_dict ~run_with fasta in
    let sorted_normal =
      Samtools.sort_bam_if_necessary ~run_with ~by:`Coordinate normal in
    let sorted_tumor =
      Samtools.sort_bam_if_necessary ~run_with ~by:`Coordinate tumor in
    let run_mutect =
      let name = sprintf "%s" (Filename.basename output_file) in
      let cosmic_option =
        Option.value_map ~default:"" cosmic ~f:(fun node ->
            sprintf "--cosmic %s" (Filename.quote node#product#path)) in
      let dbsnp_option =
        Option.value_map ~default:"" dbsnp ~f:(fun node ->
            sprintf "--dbsnp %s" (Filename.quote node#product#path)) in
      let make =
        Machine.run_big_program run_with ~name
          ~self_ids:["mutect"]
          Program.(
            Machine.Tool.(init mutect)
            && shf "mkdir -p %s" run_path
            && shf "cd %s" run_path
            && sh ("java -Xmx2g -jar $mutect_HOME/muTect-*.jar --analysis_type MuTect " ^
                   (String.concat ~sep:" " ([
                        "--reference_sequence"; fasta#product#path;
                        cosmic_option; dbsnp_option; intervals_option;
                        "--input_file:normal"; sorted_normal#product#path;
                        "--input_file:tumor"; sorted_tumor#product#path;
                        "--out"; dot_out_file;
                        "--vcf"; output_file;
                        "--coverage_file"; coverage_file;
                      ] @ Configuration.render configuration))))
      in
      let edges =
        Option.value_map cosmic ~default:[] ~f:(fun n -> [depends_on n])
        @ Option.value_map dbsnp ~default:[] ~f:(fun n -> [depends_on n])
        @ add_edges
        @ [
          depends_on Machine.Tool.(ensure mutect);
          depends_on sorted_normal;
          depends_on sorted_tumor;
          depends_on fasta;
          depends_on fasta_dot_fai;
          depends_on sequence_dict;
          depends_on (Samtools.index_to_bai ~run_with sorted_normal);
          depends_on (Samtools.index_to_bai ~run_with sorted_tumor);
          on_failure_activate (Remove.file ~run_with output_file);
        ] in
      workflow_node ~name ~make
        (single_file output_file ~host:Machine.(as_host run_with))
        ~tags:[Target_tags.variant_caller] ~edges
    in
    run_mutect
  in
  match how with
  | `Region region -> run_on_region ~add_edges:more_edges region
  | `Map_reduce ->
    let targets =
      List.map (Reference_genome.major_contigs reference)
        (* We pass the more_edges only to the last step *)
        ~f:(run_on_region ~add_edges:[]) in
    let final_vcf = result_prefix ^ "-merged.vcf" in
    Vcftools.vcf_concat ~run_with targets ~final_vcf ~more_edges
end
module Optitype 
struct

open Biokepi_run_environment
open Common

(** Run OptiType in `RNA or `DNA mode.

Please provide a fresh work_dir directory, it will be deleted in case of failure. *)


let hla_type ~work_dir ~run_with ~fastq ~run_name nt =
  let tool = Machine.get_tool run_with Machine.Tool.Default.optitype in
  let r1_path, r2_path_opt = fastq#product#paths in
  let name = sprintf "optitype-%s" run_name in
  let make =
    Machine.run_big_program run_with ~name
      ~self_ids:["optitype"]
      KEDSL.Program.(
        Machine.Tool.init tool
        && exec ["mkdir""-p"; work_dir]
        && exec ["cd"; work_dir]
        && sh "cp -r ${OPTITYPE_DATA}/data ." (* HLA reference data *)
        && (* config example *)
        sh "cp -r ${OPTITYPE_DATA}/config.ini.example config.ini"
        && (* adjust config razers3 path *)
        sh "sed -i.bak \"s|\\/path\\/to\\/razers3|$(which razers3)|g\" config.ini"
        &&
        shf "OptiTypePipeline --verbose --input %s %s %s -o %s "
          (Filename.quote r1_path)
          (Option.value_map ~default:"" r2_path_opt ~f:Filename.quote)
          (match nt with | `DNA -> "--dna" | `RNA -> "--rna")
          run_name)
  in
  let product =
    let host = Machine.as_host run_with in
    let vol =
      let open Ketrew_pure.Target.Volume in
      create (dir run_name []) ~host
        ~root:(Ketrew_pure.Path.absolute_directory_exn work_dir)
    in
    object
      method is_done = Some (`Volume_exists vol)
      method path = work_dir
    end
  in
  KEDSL.workflow_node product ~name ~make
    ~edges:(
      [
        KEDSL.depends_on (Machine.Tool.ensure tool);
        KEDSL.depends_on fastq;
        KEDSL.on_failure_activate
          (Workflow_utilities.Remove.directory ~run_with work_dir);
      ]
    )
end
module Picard 
struct
open Biokepi_run_environment
open Common

module Remove = Workflow_utilities.Remove



let create_dict ~(run_with:Machine.t) fasta =
  let open KEDSL in
  let picard_create_dict =
    Machine.get_tool run_with Machine.Tool.Default.picard in
  let src = fasta#product#path in
  let dest = sprintf "%s.%s" (Filename.chop_suffix src ".fasta""dict" in
  let program =
    Program.(Machine.Tool.(init picard_create_dict) &&
             shf "java -jar $PICARD_JAR CreateSequenceDictionary R= %s O= %s"
               (Filename.quote src) (Filename.quote dest)) in
  let name = sprintf "picard-create-dict-%s" Filename.(basename src) in
  let make =
    Machine.run_stream_processor run_with program ~name
      ~self_ids:["picard""create-dict"in
  let host = Machine.(as_host run_with) in
  workflow_node (single_file dest ~host) ~name ~make
    ~edges:[
      depends_on fasta; depends_on Machine.Tool.(ensure picard_create_dict);
      on_failure_activate (Remove.file ~run_with dest);
    ]

(* Sort the given VCF.

- [?sequence_dict] A sequence dictionary by which the VCF will be sorted.
*)

let sort_vcf ~(run_with:Machine.t) ?(sequence_dict) input_vcf =
  let open KEDSL in
  let picard = Machine.get_tool run_with Machine.Tool.Default.picard in
  let sequence_name =
    match sequence_dict with
    | None -> "default"
    | Some d -> Filename.basename d#product#path
  in
  let src = input_vcf#product#path in
  let input_vcf_base = Filename.basename src in
  let dest =
    sprintf "%s.sorted-by-%s.vcf"
      (Filename.chop_suffix src ".vcf") sequence_name in
  let name = sprintf "picard-sort-vcf-%s-by-%s" input_vcf_base sequence_name in
  let sequence_dict_opt =
    match sequence_dict with
    | None -> ""
    | Some d -> sprintf "SEQUENCE_DICTIONARY= %s" (Filename.quote d#product#path)
  in
  let sequence_dict_edge =
    match sequence_dict with
    | None -> []
    | Some d -> [depends_on d]
  in
  let program =
    Program.(Machine.Tool.(init picard) &&
             (shf "java -jar $PICARD_JAR SortVcf %s I= %s O= %s"
                sequence_dict_opt
                (Filename.quote src) (Filename.quote dest)))
  in
  let host = Machine.(as_host run_with) in
  let make =
    Machine.run_stream_processor
      run_with program ~name ~self_ids:["picard""sort-vcf"in
  workflow_node (single_file dest ~host) ~name ~make
    ~edges:([
      depends_on input_vcf; depends_on Machine.Tool.(ensure picard);
      on_failure_activate (Remove.file ~run_with dest)
    ] @ sequence_dict_edge)

module Mark_duplicates_settings = struct
  type t = {
    name: string;
    tmpdir: string;
    max_sequences_for_disk_read_ends_map: int;
    max_file_handles_for_read_ends_map: int;
    sorting_collection_size_ratio: float;
    mem_param: string option;
  }
  let factory = {
    name = "factory";
    tmpdir = "/tmp";
    max_sequences_for_disk_read_ends_map = 50000;
    max_file_handles_for_read_ends_map = 8000;
    sorting_collection_size_ratio = 0.25;
    mem_param = None;
  }

  let default = {
    factory with
    name = "default";
    max_file_handles_for_read_ends_map = 20_000;
  }

  let to_java_shell_call t =
    sprintf "TMPDIR=%s MAX_SEQUENCES_FOR_DISK_READ_ENDS_MAP=%d MAX_FILE_HANDLES_FOR_READ_ENDS_MAP=%d SORTING_COLLECTION_SIZE_RATIO=%f java %s -Djava.io.tmpdir=%s "
      t.tmpdir
      t.max_sequences_for_disk_read_ends_map
      t.max_file_handles_for_read_ends_map
      t.sorting_collection_size_ratio
      (match t.mem_param with
      | None  -> ""
      | Some some -> sprintf "-Xmx%s" some)
      t.tmpdir

end

let mark_duplicates
    ?(settings = Mark_duplicates_settings.default)
    ~(run_with: Machine.t) ~input_bam output_bam_path =
  let open KEDSL in
  let picard_jar = Machine.get_tool run_with Machine.Tool.Default.picard in
  let metrics_path =
    sprintf "%s.%s" (Filename.chop_suffix output_bam_path ".bam"".metrics" in
  let sorted_bam =
    Samtools.sort_bam_if_necessary ~run_with input_bam ~by:`Coordinate in
  let program =
    let java_call = Mark_duplicates_settings.to_java_shell_call settings in
    Program.(Machine.Tool.(init picard_jar) &&
             shf "%s -jar $PICARD_JAR MarkDuplicates VALIDATION_STRINGENCY=LENIENT INPUT=%s OUTPUT=%s METRICS_FILE=%s"
               java_call
               (Filename.quote sorted_bam#product#path)
               (Filename.quote output_bam_path)
               metrics_path) in
  let name =
    sprintf "picard-markdups-%s" Filename.(basename input_bam#product#path) in
  let make =
    Machine.run_big_program ~name run_with program
      ~self_ids:["picard""mark-duplicates"in
  let product = transform_bam  input_bam#product output_bam_path in
  workflow_node product
    ~name ~make
    ~edges:[
      depends_on sorted_bam;
      depends_on Machine.Tool.(ensure picard_jar);
      on_failure_activate (Remove.file ~run_with output_bam_path);
      on_failure_activate (Remove.file ~run_with metrics_path);
    ]

let bam_to_fastq
    ?sample_name ~run_with ~sample_type ~output_prefix input_bam =
  let open KEDSL in
  let sorted_bam =
    Samtools.sort_bam_if_necessary
      ~run_with ~by:`Read_name input_bam in
  let fastq_output_options, r1, r2opt =
    match sample_type with
    | `Paired_end ->
      let r1 = sprintf "%s_R1.fastq" output_prefix in
      let r2 = sprintf "%s_R2.fastq" output_prefix in
      ([sprintf "FASTQ=%s" Filename.(quote r1);
        sprintf "SECOND_END_FASTQ=%s" Filename.(quote r2)],
       r1, Some r2)
    | `Single_end ->
      let r1 = sprintf "%s.fastq" output_prefix in
      ([sprintf "FASTQ=%s" r1], r1, None)
  in
  let picard_jar = Machine.get_tool run_with Machine.Tool.Default.picard in
  let program =
    Program.(
      Machine.Tool.(init picard_jar) &&
      shf "java -jar $PICARD_JAR SamToFastq INPUT=%s %s"
        (Filename.quote sorted_bam#product#path)
        (String.concat ~sep:" " fastq_output_options)
    ) in
  let name =
    sprintf "picard-bam2fastq-%s" Filename.(basename input_bam#product#path) in
  let make =
    Machine.run_big_program ~name run_with program
      ~self_ids:["picard""bam-to-fastq"in
  let edges =
    (fun list ->
       match r2opt with
       | Some r2 -> on_failure_activate (Remove.file ~run_with r2) :: list
       | None -> list)
      [
        depends_on sorted_bam;
        depends_on Machine.Tool.(ensure picard_jar);
        on_failure_activate (Remove.file ~run_with r1);
      ]
  in
  workflow_node
    (fastq_reads ?name:sample_name ~host:(Machine.as_host run_with) r1 r2opt)
    ~name ~make ~edges
end
module Samtools 
struct
open Biokepi_run_environment
open Common

module Remove = Workflow_utilities.Remove

let sam_to_bam ~(run_with : Machine.t) ~reference_build file_t =
  let open KEDSL in
  let samtools = Machine.get_tool run_with Machine.Tool.Default.samtools in
  let src = file_t#product#path in
  let dest = sprintf "%s.%s" (Filename.chop_suffix src ".sam""bam" in
  let program =
    Program.(Machine.Tool.(init samtools)
             && exec ["samtools""view""-b""-o"; dest; src])
  in
  let name = sprintf "sam-to-bam-%s" (Filename.chop_suffix src ".sam"in
  let make = Machine.run_program ~name run_with program in
  let host = Machine.(as_host run_with) in
  workflow_node ~name
    (bam_file ~reference_build dest ~host)
    ~make
    ~edges:[
      depends_on file_t;
      depends_on Machine.Tool.(ensure samtools);
      on_failure_activate (Remove.file ~run_with dest);
      on_success_activate (Remove.file ~run_with src);
    ]

let faidx ~(run_with:Machine.t) fasta =
  let open KEDSL in
  let samtools = Machine.get_tool run_with Machine.Tool.Default.samtools in
  let src = fasta#product#path in
  let dest = sprintf "%s.%s" src "fai" in
  let program =
    Program.(Machine.Tool.(init samtools) && exec ["samtools""faidx"; src]) in
  let name = sprintf "samtools-faidx-%s" Filename.(basename src) in
  let make = Machine.run_program ~name run_with program in
  let host = Machine.(as_host run_with) in
  workflow_node
    (single_file dest ~host) ~name ~make
    ~edges:[
      depends_on fasta;
      depends_on Machine.Tool.(ensure samtools);
      on_failure_activate (Remove.file ~run_with dest);
    ]

let flagstat ~(run_with:Machine.t) bam =
  let open KEDSL in
  let samtools = Machine.get_tool run_with Machine.Tool.Default.samtools in
  let src = bam#product#path in
  let dest = sprintf "%s.%s" src "flagstat" in
  let program =
    Program.(
      Machine.Tool.(init samtools) &&
      shf "samtools flagstat %s > %s" (Filename.quote src) (Filename.quote dest)
    ) in
  let name = sprintf "samtools-flagstat-%s" Filename.(basename src) in
  let make = Machine.run_program ~name run_with program in
  let host = Machine.(as_host run_with) in
  workflow_node
    (single_file dest ~host) ~name ~make
    ~edges:[
      depends_on bam;
      depends_on Machine.Tool.(ensure samtools);
      on_failure_activate (Remove.file ~run_with dest);
    ]

let bgzip ~run_with input_file output_path =
  let open KEDSL in
  let samtools = Machine.get_tool run_with Machine.Tool.Default.samtools in
  let program =
    Program.(Machine.Tool.(init samtools)
             && shf "bgzip %s -c > %s" input_file#product#path output_path) in
  let name =
    sprintf "samtools-bgzip-%s" Filename.(basename input_file#product#path) in
  let make = Machine.run_program ~name run_with program in
  let host = Machine.(as_host run_with) in
  workflow_node
    (single_file output_path ~host) ~name ~make
    ~edges:[
      depends_on input_file;
      depends_on Machine.Tool.(ensure samtools);
      on_failure_activate (Remove.file ~run_with output_path);
    ]

let tabix ~run_with ~tabular_format input_file =
  let open KEDSL in
  let samtools = Machine.get_tool run_with Machine.Tool.Default.samtools in
  let output_path = input_file#product#path ^ ".tbi" in
  let minus_p_argument =
    match tabular_format with
    | `Gff -> "gff"
    | `Bed -> "bed"
    | `Sam -> "sam"
    | `Vcf -> "vcf"
    | `Psltab -> "psltab" in
  let program =
    Program.(
      Machine.Tool.(init samtools)
      && shf "tabix -p %s %s"
        minus_p_argument
        input_file#product#path
    ) in
  let name =
    sprintf "samtools-tabix-%s" Filename.(basename input_file#product#path) in
  let make = Machine.run_program ~name run_with program in
  let host = Machine.(as_host run_with) in
  workflow_node
    (single_file output_path ~host) ~name ~make
    ~edges:[
      depends_on input_file;
      depends_on Machine.Tool.(ensure samtools);
      on_failure_activate (Remove.file ~run_with output_path);
    ]

let do_on_bam
    ~(run_with:Machine.t)
    ?(more_depends_on=[]) ~name
    ?(more_requirements: Machine.Make_fun.Requirement.t list = [])
    input_bam ~product ~make_command =
  let open KEDSL in
  let samtools = Machine.get_tool run_with Machine.Tool.Default.samtools in
  let src = input_bam#product#path in
  let sub_command = make_command src product#path in
  let program =
    Program.(Machine.Tool.(init samtools) && exec ("samtools" :: sub_command))
  in
  let make =
    Machine.run_program ~requirements:more_requirements ~name run_with program
  in
  workflow_node product ~name ~make
    ~edges:(
      depends_on Machine.Tool.(ensure samtools)
      :: depends_on input_bam
      :: on_failure_activate (Remove.file ~run_with product#path)
      :: more_depends_on)

let sort_bam_no_check ~(run_with:Machine.t) ~by input_bam =
  let source = input_bam#product#path in
  let dest_suffix =
    match by with
    | `Coordinate -> "sorted"
    | `Read_name -> "read-name-sorted"
  in
  let dest_prefix =
    sprintf "%s-%s" (Filename.chop_suffix source ".bam") dest_suffix in
  let dest = sprintf "%s.%s" dest_prefix "bam" in
  let product =
    KEDSL.transform_bam ~change_sorting:by input_bam#product ~path:dest in
  let processors = Machine.max_processors run_with in
  let make_command src des =
    let command = ["-@"Int.to_string processors; src;
                   "-T"; dest_prefix; "-o"; dest] in
    match by with
    | `Coordinate -> "sort" :: command
    | `Read_name -> "sort" :: "-n" :: command
  in
  do_on_bam ~run_with input_bam ~product ~make_command
    ~more_requirements:[`Memory `Big`Processors processors]
    ~name:(sprintf "Samtools-sort %s"
             Filename.(basename input_bam#product#path))
(** Uses "samtools sort" if the input_bam is not tagged as “sorted as the ~by argument.” If it is indeed already sorted the function returns the input_bam node as is. *)

let sort_bam_if_necessary ~(run_with:Machine.t) ~by
    input_bam : KEDSL.bam_file KEDSL.workflow_node =
  match input_bam#product#sorting with
  | Some some when some = by -> (* Already sorted “the same way” *)
    input_bam
  | other ->
    sort_bam_no_check ~run_with input_bam ~by

(* Produce an index for the given BAM file.

- [?check_sorted] First check that the BAM is sorted, and fail if not.
  (default: true)
*)

let index_to_bai ~(run_with:Machine.t) ?(check_sorted=true) input_bam =
  begin match input_bam#product#sorting with
  | (Some `Read_name | Nonewhen check_sorted ->
    failwithf "In function Samtools.index_to_bai the input bam %s is not declared as sorted-by-coordinate (samtools-index requires that)"
      input_bam#product#path
  | _ -> ()
  end;
  let product =
    KEDSL.single_file  ~host:(Machine.as_host run_with)
      (sprintf "%s.%s" input_bam#product#path "bai"in
  let make_command src des = ["index""-b"; src] in
  do_on_bam ~run_with input_bam ~product ~make_command
    ~name:(sprintf "Samtools-index %s"
             Filename.(basename input_bam#product#path))

let mpileup ~run_with ?adjust_mapq ~region input_bam =
  let open KEDSL in
  let samtools = Machine.get_tool run_with Machine.Tool.Default.samtools in
  let src = input_bam#product#path in
  let adjust_mapq_option =
    match adjust_mapq with | None -> "" | Some n -> sprintf "-C%d" n in
  let samtools_region_option = Region.to_samtools_option region in
  let reference_genome =
    Machine.get_reference_genome run_with input_bam#product#reference_build in
  let fasta = Reference_genome.fasta reference_genome in
  let pileup =
    Filename.chop_suffix src ".bam" ^
    sprintf "-%s%s.mpileup" (Region.to_filename region) adjust_mapq_option
  in
  let sorted_bam =
    sort_bam_if_necessary ~run_with input_bam ~by:`Coordinate in
  let program =
    Program.(
      Machine.Tool.(init samtools)
      && shf
        "samtools mpileup %s %s -Bf %s %s > %s"
        adjust_mapq_option samtools_region_option 
        fasta#product#path
        sorted_bam#product#path
        pileup
    ) in
  let name =
    sprintf "samtools-mpileup-%s" Filename.(basename pileup |> chop_extension)
  in
  let make = Machine.run_program ~name run_with program in
  let host = Machine.(as_host run_with) in
  let edges = [
    depends_on Machine.Tool.(ensure samtools);
    depends_on sorted_bam;
    depends_on fasta;
    index_to_bai ~run_with sorted_bam |> depends_on;
    on_failure_activate (Remove.file ~run_with pileup);
  ] in
  workflow_node ~name (single_file pileup ~host) ~make ~edges

(** Merge a list of BAM files.

The BAMs will be sorted, by coordinate, if they are not already.

"samtools merge" fails if the list of bams has only one (or zero) bams, so this functions raises an exception if input_bam_list has lenth < 2.

By default, duplicate @PG and @RG IDs will be automatically uniquified with a random suffix.

  • ?delete_input_on_success: Delete the list of input BAMs when this node succeeds. (default: true)
  • ?attach_rg_tag: Attach an RG tag to each alignment. The tag value is inferred from file names.(default: false)
  • ?uncompressed_bam_output: Uncompressed BAM output. (default: false)
  • ?compress_level_one: Use zlib compression level 1 to compress the output. (default: false)
  • ?combine_rg_headers: When several input files contain @RG headers with the same ID, emit only one the first of them. (default: false)
  • ?combine_pg_headers: When several input files contain @PG headers with the same ID, emit only one the first of them. (default: false)
*)

let merge_bams
    ~(run_with : Machine.t)
    ?(delete_input_on_success = true)
    ?(attach_rg_tag = false)
    ?(uncompressed_bam_output = false)
    ?(compress_level_one = false)
    ?(combine_rg_headers = false)
    ?(combine_pg_headers = false)
    (input_bam_list : KEDSL.bam_file KEDSL.workflow_node list)
    (output_bam_path : string) =
  let open KEDSL in
  let samtools = Machine.get_tool run_with Machine.Tool.Default.samtools in
  let () =
    let lgth = List.length input_bam_list in
    if lgth < 2 then
      failwithf "Samtools.merge_bams: %d input-bam%s provided and `samtools merge` will fail with less than 2 Bam files"
        lgth
        (match lgth with 1 -> " was" | _ -> "s were")
  in
  let sorted_bams =
    List.map input_bam_list ~f:(sort_bam_if_necessary ~run_with ~by:`Coordinatein
  let options =
    (if attach_rg_tag then ["-r"else [])
    @ (if uncompressed_bam_output then ["-u"else [])
    @ (if compress_level_one then ["-1"else [])
    @ (if combine_rg_headers then ["-c"else [])
    @ (if combine_pg_headers then ["-p"else [])
  in
  let program =
    let open  Program in
    Machine.Tool.(init samtools)
    && exec (
      ["samtools""merge"] @ options @ [output_bam_path]
      @ List.map sorted_bams ~f:(fun bam -> bam#product#path)
    )
  in
  let name =
    sprintf "merge-%d-bams-into-%s" (List.length input_bam_list)
      (Filename.basename output_bam_path) in
  let make = Machine.run_program ~name run_with program in
  let remove_input_bams =
    List.map input_bam_list ~f:(fun src ->
        on_success_activate (Remove.file ~run_with src#product#path))
  in
  let edges =
    depends_on Machine.Tool.(ensure samtools)
    :: on_failure_activate (Remove.file ~run_with output_bam_path)
    :: List.map sorted_bams ~f:depends_on
    @ if delete_input_on_success then remove_input_bams else []
  in
  let product =
    (* samtools merge creates sorted bams: *)
    let one =
      List.hd input_bam_list
      |> Option.value_exn
        ~msg:(sprintf "Samtools.merge_bams: input is empty list")
    in
    (transform_bam one#product ~path:output_bam_path) in
  workflow_node ~name product ~make ~edges

end
module Seq2HLA 
struct

open Biokepi_run_environment
open Common

let hla_type ~work_dir ~run_with ~r1 ~r2 ~run_name =
  let tool = Machine.get_tool run_with Machine.Tool.Default.seq2hla in
  (* Why quote this here? Seems like it easy to create a bug,
     why not enforce this at node construction ?*)

  let r1pt = Filename.quote r1#product#path in
  let r2pt = Filename.quote r2#product#path in
  let name = sprintf "seq2HLA-%s" run_name in
  let host = Machine.as_host run_with in
  let processors = Machine.max_processors run_with in
  let make =
    Machine.run_big_program run_with ~name ~processors
      KEDSL.Program.(Machine.Tool.init tool
                     && exec ["mkdir""-p"; work_dir]
                     && exec ["cd"; work_dir]
                     && shf "seq2HLA -1 %s -2 %s -r %s -p %d"
                       r1pt r2pt run_name processors)
  in
  let class1 = work_dir // (sprintf "%s-ClassI.HLAgenotype4digits" run_name) in
  let class2 = work_dir // (sprintf "%s-ClassII.HLAgenotype4digits" run_name) in
  let cond = KEDSL.list_of_files ~host [class1; class2] in
  KEDSL.workflow_node ~name cond ~make
    ~edges:[
      KEDSL.depends_on (Machine.Tool.ensure tool);
      KEDSL.depends_on r1;
      KEDSL.depends_on r2;
    ]

end
module Somaticsniper 
struct
open Biokepi_run_environment
open Common

module Remove = Workflow_utilities.Remove


let default_prior_probability = 0.01
let default_theta = 0.85

module Configuration = struct

  type t = {
    name: string;
    prior_probability: float;
    theta: float
  }

  let to_json {name; prior_probability; theta}: Yojson.Basic.json =
    `Assoc [
      "name"`String name;
      "prior_probability"`Float prior_probability;
      "theta"`Float theta;
    ]
  let name {name; _} = name

  let render {name; prior_probability; theta}  =
    ["-s"Float.to_string prior_probability;
     "-T"Float.to_string theta]

  let default = {
    name = "default";
    prior_probability = default_prior_probability;
    theta = default_theta;
  }

end

let run
    ~run_with
     ?(configuration = Configuration.default) ~normal ~tumor ~result_prefix () =
  let open KEDSL in
  let result_file suffix = sprintf "%s-%s" result_prefix suffix in
  let name = sprintf "somaticsniper: %s" (result_file ""in
  let sniper = Machine.get_tool run_with Machine.Tool.Default.somaticsniper in
  let reference_fasta =
    Machine.get_reference_genome run_with normal#product#reference_build
    |> Reference_genome.fasta in
  let output_file = result_file "-snvs.vcf" in
  let run_path = Filename.dirname output_file in
  let sorted_normal =
    Samtools.sort_bam_if_necessary ~run_with ~by:`Coordinate normal in
  let sorted_tumor =
    Samtools.sort_bam_if_necessary ~run_with ~by:`Coordinate tumor in
  let make =
    Machine.run_big_program run_with
      ~self_ids:["somaticsniper"]
      ~name ~processors:1 Program.(
          Machine.Tool.init sniper
          && shf "mkdir -p %s" run_path
          && shf "cd %s" run_path
          && exec (
            ["somaticsniper""-F""vcf"]
            @ (Configuration.render configuration)
            @ ["-f";  reference_fasta#product#path;
               sorted_normal#product#path;
               sorted_tumor#product#path;
               output_file]
          ))
  in
  workflow_node ~name ~make
    (single_file output_file ~host:Machine.(as_host run_with))
    ~metadata:(`String name)
    ~tags:[Target_tags.variant_caller; "somaticsniper"]
    ~edges:[
      depends_on (Machine.Tool.ensure sniper);
      depends_on sorted_normal;
      depends_on sorted_tumor;
      depends_on (Samtools.index_to_bai ~run_with sorted_normal);
      depends_on (Samtools.index_to_bai ~run_with sorted_tumor);
      depends_on reference_fasta;
      on_failure_activate ( Remove.file ~run_with output_file );
    ]
end
module Star 
struct
open Biokepi_run_environment
open Common

module Remove = Workflow_utilities.Remove


module Configuration = struct

  module Align = struct
    type t = {
      name: string;

      
      (** The mapping quality MAPQ (column 5) is 255 for uniquely mapping reads, and int(-10*log10(1- 1/Nmap)) for multi-mapping reads. This scheme is same as the one used by TopHat and is compatible with Cufflinks. The default MAPQ=255 for the unique mappers maybe changed with to an integer between 0 and 255 to ensure compatibility with downstream tools such as GATK. *)

      sam_mapq_unique: int option;

      
      (** Specifies the length of the genomic sequence around the annotated junction to be used in constructing the splice junctions database. Ideally, this length should be equal to the ReadLength-1, where ReadLength is the length of the reads. *)

      overhang_length: int option;
      parameters: (string * string) list;
    }
    let name t = t.name

    let default = {
      name = "default";
      sam_mapq_unique = None;
      overhang_length = None;
      parameters = [];
    }

    let to_json t: Yojson.Basic.json =
      let {name;
           sam_mapq_unique;
           overhang_length;
           parameters} = t in
      `Assoc [
        "name"`String name;
        "sam_mapq_unique",
        (match sam_mapq_unique with
        | None -> `Null
        | Some x -> `Int x);
        "overhang_length",
        (match overhang_length with
        | None -> `Null
        | Some x -> `Int x);
        "parameters",
        `Assoc (List.map parameters ~f:(fun (a, b) -> a, `String b));
      ]

    let render {name;
                sam_mapq_unique;
                overhang_length;
                parameters} =
      (match overhang_length with
      | None -> ""
      | Some x ->
        sprintf "--sjdbOverhang %d" x
      ) ::
      (match sam_mapq_unique with
      | None -> ""
      | Some x ->
        if 0 > x || x > 255
        then failwith "STAR Align sam_mapq_unique must be between 0 and 255"
        else ();
        sprintf "--outSAMmapqUnique %d" x) ::
      List.concat_map parameters ~f:(fun (a, b) -> [a; b])
  end
end

let index
    ~reference_build
    ~(run_with : Machine.t) =
  let open KEDSL in
  let reference_fasta =
    Machine.get_reference_genome run_with reference_build
    |> Reference_genome.fasta in
  let star_tool = Machine.get_tool run_with Machine.Tool.Default.star in
  let name =
    sprintf "star-index-%s" (Filename.basename reference_fasta#product#path) in
  let reference_annotations =
    Machine.get_reference_genome run_with reference_build |> Reference_genome.gtf_exn in
  let reference_dir = (Filename.dirname reference_fasta#product#path) in
  let result_dir = sprintf "%s/star-index/" reference_dir in
  let suffix_array_result = result_dir // "SA" in
  let processors = Machine.max_processors run_with in
  workflow_node ~name
    (single_file ~host:(Machine.(as_host run_with)) suffix_array_result)
    ~edges:[
      on_failure_activate (Remove.directory ~run_with result_dir);
      depends_on reference_fasta;
      depends_on Machine.Tool.(ensure star_tool);
    ]
    ~tags:[Target_tags.aligner]
    ~make:(Machine.run_big_program run_with ~processors ~name
             ~self_ids:["star""index"]
             Program.(
               Machine.Tool.(init star_tool)
               && shf "mkdir %s" result_dir
               && shf "STAR --runMode genomeGenerate --genomeDir %s --genomeFastaFiles %s --sjdbGTFfile %s --runThreadN %d"
                 result_dir
                 (Filename.quote reference_fasta#product#path)
                 (Filename.quote reference_annotations#product#path)
                 processors
             ))

let align
    ~reference_build
    ~fastq
    ~(result_prefix:string)
    ~(run_with : Machine.t)
    ?(configuration=Configuration.Align.default)
    () =
  let open KEDSL in
  let reference_fasta =
    Machine.get_reference_genome run_with reference_build
    |> Reference_genome.fasta in
  let in_work_dir =
    Program.shf "cd %s" Filename.(quote (dirname result_prefix)) in
  let star_tool = Machine.get_tool run_with Machine.Tool.Default.star in
  let star_index = index ~reference_build ~run_with in
  let reference_dir = (Filename.dirname reference_fasta#product#path) in
  let star_index_dir = sprintf "%s/star-index/" reference_dir in
  (* STAR appends Aligned.sortedByCoord.out.bam to the filename *)
  let result = sprintf "%sAligned.sortedByCoord.out.bam" result_prefix in
  let r1_path, r2_path_opt = fastq#product#paths in
  let name = sprintf "star-rna-align-%s" (Filename.basename r1_path) in
  let processors = Machine.max_processors run_with in
  let star_base_command = sprintf
      "STAR --outSAMtype BAM SortedByCoordinate --outSAMstrandField intronMotif --outSAMattributes NH HI NM MD --outFilterIntronMotifs RemoveNoncanonical --genomeDir %s --runThreadN %d --outFileNamePrefix %s --outSAMattrRGline %s %s --readFilesIn %s"
      (Filename.quote star_index_dir)
      processors
      result_prefix
      (sprintf "ID:%s SM:\"%s\""
         (Filename.basename r1_path)
         fastq#product#sample_name)
      (Configuration.Align.render configuration
       |> String.concat ~sep:" ")
      (Filename.quote r1_path)
  in
  let base_star_target ~star_command =
    workflow_node ~name
      (bam_file
         ~sorting:`Coordinate
         ~host:(Machine.(as_host run_with))
         ~reference_build
         result)
      ~edges:[
        on_failure_activate (Remove.file ~run_with result);
        depends_on reference_fasta;
        depends_on star_index;
        depends_on fastq;
        depends_on Machine.Tool.(ensure star_tool);
      ]
      ~tags:[Target_tags.aligner]
      ~make:(Machine.run_big_program run_with ~processors ~name
               ~self_ids:["star""align"]
               Program.(
                 Machine.Tool.(init star_tool)
                 && in_work_dir
                 && sh star_command
               ))
  in
  match r2_path_opt with
  | Some read2 ->
    let star_command =
      String.concat ~sep:" " [
        star_base_command;
        (Filename.quote read2);
      ] in
    base_star_target ~star_command
  | None ->
    let star_command = star_base_command in
    base_star_target ~star_command
end
module Strelka 
struct
open Biokepi_run_environment
open Common


  (*
They happen to have a
[website](https://sites.google.com/site/strelkasomaticvariantcaller/).

The usage is:

- create a config file,
- generate a `Makefile` with `configureStrelkaWorkflow.pl`,
- run `make -j<n>`.
 *)



module Configuration = struct

  type t = {
    name: string;
    parameters: (string * string) list
  }
  let name t = t.name
  let to_json {name; parameters}: Yojson.Basic.json =
    `Assoc [
      "name"`String name;
      "parameters",
      `Assoc (List.map parameters ~f:(fun (a, b) -> a, `String b));
    ]

  let generate_config_file ~path config : KEDSL.Program.t =
    let open KEDSL in
    Program.(
      shf "echo '[user]' > %s" path
      && chain
        (List.map config.parameters (fun (k, v) ->
             shf "echo '%s = %s' >> %s" k v path))
    )

  let default =
    { name = "default";
      parameters = [
        "isSkipDepthFilters""0";
        "maxInputDepth""10000";
        "depthFilterMultiple""3.0";
        "snvMaxFilteredBasecallFrac""0.4";
        "snvMaxSpanningDeletionFrac""0.75";
        "indelMaxRefRepeat""8";
        "indelMaxWindowFilteredBasecallFrac""0.3";
        "indelMaxIntHpolLength""14";
        "ssnvPrior""0.000001";
        "sindelPrior""0.000001";
        "ssnvNoise""0.0000005";
        "sindelNoise""0.0000001";
        "ssnvNoiseStrandBiasFrac""0.5";
        "minTier1Mapq""40";
        "minTier2Mapq""5";
        "ssnvQuality_LowerBound""15";
        "sindelQuality_LowerBound""30";
        "isWriteRealignedBam""0";
        "binSize""25000000";
        "extraStrelkaArguments""--eland-compatibility";
      ]}
  let test1 =
    { name = "test1";
      parameters = [
        "isSkipDepthFilters""0";
        "maxInputDepth""10000";
        "depthFilterMultiple""3.0";
        "snvMaxFilteredBasecallFrac""0.4";
        "snvMaxSpanningDeletionFrac""0.75";
        "indelMaxRefRepeat""8";
        "indelMaxWindowFilteredBasecallFrac""0.3";
        "indelMaxIntHpolLength""14";
        "ssnvPrior""0.000001";
        "sindelPrior""0.000001";
        "ssnvNoise""0.0000005";
        "sindelNoise""0.0000001";
        "ssnvNoiseStrandBiasFrac""0.5";
        "minTier1Mapq""40";
        "minTier2Mapq""5";
        "ssnvQuality_LowerBound""15";
        "sindelQuality_LowerBound""30";
        "isWriteRealignedBam""0";
        "binSize""25000000";
        "extraStrelkaArguments""--eland-compatibility";
        "priorSomaticSnvRate""1e-06";
        "germlineSnvTheta""0.001";
      ]}
  let empty_exome =
    { name = "empty-exome";
      parameters = [
        "isSkipDepthFilters""1";
      ]}
  let exome_default =
    { name = "exome-default";
      parameters = [
        "isSkipDepthFilters""1";
        "maxInputDepth""10000";
        "depthFilterMultiple""3.0";
        "snvMaxFilteredBasecallFrac""0.4";
        "snvMaxSpanningDeletionFrac""0.75";
        "indelMaxRefRepeat""8";
        "indelMaxWindowFilteredBasecallFrac""0.3";
        "indelMaxIntHpolLength""14";
        "ssnvPrior""0.000001";
        "sindelPrior""0.000001";
        "ssnvNoise""0.0000005";
        "sindelNoise""0.0000001";
        "ssnvNoiseStrandBiasFrac""0.5";
        "minTier1Mapq""40";
        "minTier2Mapq""5";
        "ssnvQuality_LowerBound""15";
        "sindelQuality_LowerBound""30";
        "isWriteRealignedBam""0";
        "binSize""25000000";
        "extraStrelkaArguments""--eland-compatibility";
      ]}

end


let run
    ~run_with ~normal ~tumor ~result_prefix ?(more_edges = [])
    ?(configuration = Configuration.default) () =
  let open KEDSL in
  let open Configuration in
  let name = Filename.basename result_prefix in
  let result_file suffix = result_prefix ^ suffix in
  let output_dir = result_file "strelka_output" in
  let config_file_path = result_file  "configuration" in
  let output_file_path = output_dir // "results/passed_somatic_combined.vcf" in
  let reference_fasta =
    Machine.get_reference_genome run_with normal#product#reference_build
    |> Reference_genome.fasta in
  let strelka_tool = Machine.get_tool run_with Machine.Tool.Default.strelka in
  let gatk_tool = Machine.get_tool run_with Machine.Tool.Default.gatk in
  let sorted_normal =
    Samtools.sort_bam_if_necessary
      ~run_with ~by:`Coordinate normal in
  let sorted_tumor =
    Samtools.sort_bam_if_necessary
      ~run_with ~by:`Coordinate tumor in
  let working_dir = Filename.(dirname result_prefix) in
  let processors = Machine.max_processors run_with in
  let make =
    Machine.run_big_program run_with ~name ~processors
      ~self_ids:["strelka"]
      Program.(
        Machine.Tool.init strelka_tool && Machine.Tool.init gatk_tool
        && shf "mkdir -p %s"  working_dir
        && shf "cd %s" working_dir
        && generate_config_file ~path:config_file_path configuration
        && shf "rm -fr %s" output_dir (* strelka won't start if this
                                         directory exists *)

        && shf "$STRELKA_BIN/configureStrelkaWorkflow.pl            --normal=%s                 --tumor=%s                  --ref=%s                   --config=%s                 --output-dir=%s "
          sorted_normal#product#path
          sorted_tumor#product#path
          reference_fasta#product#path
          config_file_path
          output_dir
        && shf "cd %s" output_dir
        && shf "make -j%d" processors
        && Gatk.call_gatk ~analysis:"CombineVariants" [
          "--variant:snvs""results/passed.somatic.snvs.vcf";
          "--variant:indels""results/passed.somatic.indels.vcf";
          "-R"; reference_fasta#product#path;
          "-genotypeMergeOptions""PRIORITIZE";
          "-o"; output_file_path; "-priority""snvs,indels"
        ]
      )
  in
  workflow_node ~name ~make
    (single_file output_file_path ~host:(Machine.as_host run_with))
    ~edges:(more_edges @ [
      depends_on normal;
      depends_on tumor;
      depends_on reference_fasta;
      depends_on (Machine.Tool.ensure strelka_tool);
      depends_on (Machine.Tool.ensure gatk_tool);
      depends_on sorted_normal;
      depends_on sorted_tumor;
      depends_on (Picard.create_dict ~run_with reference_fasta);
      depends_on (Samtools.faidx ~run_with reference_fasta);
      depends_on (Samtools.index_to_bai ~run_with sorted_normal);
      depends_on (Samtools.index_to_bai ~run_with sorted_tumor);
    ])
end
module Stringtie 
struct

open Biokepi_run_environment
open Common

module Remove = Workflow_utilities.Remove

module Configuration = struct
  type t = {
    name : string;
    use_reference_gtf : bool;
  }
  let to_json {name; use_reference_gtf}: Yojson.Basic.json =
    `Assoc [
      "name"`String name;
      "use-reference-gtf"`Bool use_reference_gtf;
    ]
  let default = {name = "default"; use_reference_gtf = true}
end

let run
    ~configuration
    ~(run_with:Machine.t)
    ~bam
    ~result_prefix () =
  let open KEDSL in
  let name = sprintf "stringtie-%s" (Filename.basename result_prefix) in
  let result_file suffix = result_prefix ^ suffix in
  let output_dir = result_file "-stringtie_output" in
  let output_file_path = output_dir // "output.gtf" in
  let reference_genome =
    Machine.get_reference_genome run_with bam#product#reference_build in
  let reference_fasta = Reference_genome.fasta reference_genome in
  let sorted_bam =
    Samtools.sort_bam_if_necessary ~run_with ~by:`Coordinate bam in
  let reference_annotations =
    let gtf = Reference_genome.gtf reference_genome in
    let use_the_gtf = configuration.Configuration.use_reference_gtf in
    match use_the_gtf, gtf with
    | trueSome some -> Some some
    | false, _ -> None
    | trueNone ->
      failwithf "Stringtie: use_reference_gtf is `true` but the genome %s does not provide one (use another genome or allow this to run without GTF by giving another Configuration.t)"
        (Reference_genome.name reference_genome)
  in
  let stringtie_tool =
    Machine.get_tool run_with Machine.Tool.Default.stringtie in
  let processors = Machine.max_processors run_with in
  let make =
    let reference_annotations_option =
      Option.value_map ~default:"" reference_annotations
        ~f:(fun o -> sprintf "-G %s" Filename.(quote o#product#path)) in
    Machine.run_big_program run_with ~name ~processors
      ~self_ids:["stringtie"]
      Program.(
        Machine.Tool.init stringtie_tool
        && shf "mkdir -p %s" output_dir
        && shf "stringtie %s -p %d %s -o %s "
          sorted_bam#product#path
          processors
          reference_annotations_option
          output_file_path
      )
  in
  let edges =
    Option.value_map ~default:[] reference_annotations
      ~f:(fun o -> [depends_on o])
    @ [
      depends_on sorted_bam;
      depends_on reference_fasta;
      depends_on (Machine.Tool.ensure stringtie_tool);
      depends_on (Samtools.index_to_bai ~run_with sorted_bam);
    ]
  in
  workflow_node ~name ~make ~edges
    (single_file output_file_path ~host:(Machine.as_host run_with))

end
module Varscan 
struct

open Biokepi_run_environment
open Common

module Remove = Workflow_utilities.Remove

(**

Expects the tool "varscan" to provide the environment variable "$VARSCAN_JAR". *)



let empty_vcf = "##fileformat=VCFv4.1
##source=VarScan2
##INFO=<ID=DP,Number=1,Type=Integer,Description=\"Total depth of quality bases\">
##INFO=<ID=SOMATIC,Number=0,Type=Flag,Description=\"Indicates if record is a somatic mutation\">
##INFO=<ID=SS,Number=1,Type=String,Description=\"Somatic status of variant (0=Reference,1=Germline,2=Somatic,3=LOH, or 5=Unknown)\">
##INFO=<ID=SSC,Number=1,Type=String,Description=\"Somatic score in Phred scale (0-255) derived from somatic p-value\">
##INFO=<ID=GPV,Number=1,Type=Float,Description=\"Fisher's Exact Test P-value of tumor+normal versus no variant for Germline calls\">
##INFO=<ID=SPV,Number=1,Type=Float,Description=\"Fisher's Exact Test P-value of tumor versus normal for Somatic/LOH calls\">
##FILTER=<ID=str10,Description=\"Less than 10% or more than 90% of variant supporting reads on one strand\">
##FILTER=<ID=indelError,Description=\"Likely artifact due to indel reads at this position\">
##FORMAT=<ID=GT,Number=1,Type=String,Description=\"Genotype\">
##FORMAT=<ID=GQ,Number=1,Type=Integer,Description=\"Genotype Quality\">
##FORMAT=<ID=DP,Number=1,Type=Integer,Description=\"Read Depth\">
##FORMAT=<ID=RD,Number=1,Type=Integer,Description=\"Depth of reference-supporting bases (reads1)\">
##FORMAT=<ID=AD,Number=1,Type=Integer,Description=\"Depth of variant-supporting bases (reads2)\">
##FORMAT=<ID=FREQ,Number=1,Type=String,Description=\"Variant allele frequency\">
##FORMAT=<ID=DP4,Number=1,Type=String,Description=\"Strand read counts: ref/fwd, ref/rev, var/fwd, var/rev\">
#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\tNORMAL\tTUMOR
"


let somatic_on_region
    ~run_with ?adjust_mapq ~normal ~tumor ~result_prefix region =
  let open KEDSL in
  let name = Filename.basename result_prefix in
  let result_file suffix = result_prefix ^ suffix in
  let varscan_tool = Machine.get_tool run_with Machine.Tool.Default.varscan in
  let snp_output = result_file "-snp.vcf" in
  let indel_output = result_file "-indel.vcf" in
  let normal_pileup = Samtools.mpileup ~run_with ~region ?adjust_mapq normal in
  let tumor_pileup = Samtools.mpileup ~run_with ~region ?adjust_mapq tumor in
  let host = Machine.as_host run_with in
  let tags = [Target_tags.variant_caller; "varscan"in
  let varscan_somatic =
    let name = "somatic-" ^ name in
    let make =
      let big_one_liner =
        "if [ -s " ^ normal_pileup#product#path
        ^ " ] && [ -s " ^ tumor_pileup#product#path ^ " ] ; then "
        ^ sprintf "java -jar $VARSCAN_JAR somatic %s %s --output-snp %s --output-indel %s --output-vcf 1 ; "
          normal_pileup#product#path
          tumor_pileup#product#path
          snp_output
          indel_output
        ^ " else  "
        ^ "echo '" ^ empty_vcf ^ "' > " ^ snp_output ^ " ; "
        ^ "echo '" ^ empty_vcf ^ "' > " ^ indel_output ^ " ; "
        ^ " fi "
      in
      Program.(Machine.Tool.init varscan_tool && sh big_one_liner)
      |> Machine.run_big_program run_with ~name ~processors:1
        ~self_ids:["varscan""somatic"]
    in
    workflow_node ~name ~make
      (single_file snp_output ~host)
      ~tags
      ~edges:[
        depends_on (Machine.Tool.ensure varscan_tool);
        depends_on normal_pileup;
        depends_on tumor_pileup;
        on_failure_activate (Remove.file ~run_with snp_output);
        on_failure_activate (Remove.file ~run_with indel_output);
      ]
  in
  let snp_filtered = result_file "-snpfiltered.vcf" in
  let indel_filtered = result_file "-indelfiltered.vcf" in
  let varscan_filter =
    let name = "filter-" ^ name in
    let make =
      Program.(
        Machine.Tool.init varscan_tool
        && shf "java -jar $VARSCAN_JAR somaticFilter %s --indel-file %s --output-file %s"
          snp_output
          indel_output
          snp_filtered
        && shf "java -jar $VARSCAN_JAR processSomatic %s" snp_filtered
        && shf "java -jar $VARSCAN_JAR processSomatic %s" indel_output
      )
      |> Machine.run_big_program run_with ~name ~processors:1
        ~self_ids:["varscan""somaticfilter"]
    in
    workflow_node ~name
      (single_file snp_filtered ~host) ~make ~tags
      ~edges:[
        depends_on varscan_somatic;
        on_failure_activate (Remove.file ~run_with snp_filtered);
        on_failure_activate (Remove.file ~run_with indel_filtered);
      ]
  in
  varscan_filter

let somatic_map_reduce
    ?(more_edges = []) ~run_with ?adjust_mapq
    ~normal ~tumor ~result_prefix () =
  let run_on_region region =
    let result_prefix = result_prefix ^ "-" ^ Region.to_filename region in
    somatic_on_region ~run_with
      ?adjust_mapq ~normal ~tumor ~result_prefix region in
  let reference =
    Machine.get_reference_genome run_with normal#product#reference_build in
  let targets =
    List.map (Reference_genome.major_contigs reference)
      ~f:run_on_region in
  let final_vcf = result_prefix ^ "-merged.vcf" in
  Vcftools.vcf_concat ~run_with targets ~final_vcf ~more_edges
end
module Vcftools 
struct
open Biokepi_run_environment
open Common


(* 

   Most of the “implementation” part of this module is in the
   {!Workflow_utilities} module (because functions are used to prepare VCFs
   associated with reference genomres).

*)


(** Concatenate VCFs using "vcftools concat". *)

let vcf_concat ~(run_with:Machine.t) ?more_edges vcfs ~final_vcf =
  let vcftools = Machine.get_tool run_with Machine.Tool.Default.vcftools in
  let host = Machine.(as_host run_with) in
  let run_program = Machine.run_program run_with in
  Workflow_utilities.Vcftools.vcf_concat_no_machine
    ~host ~vcftools ~run_program ?more_edges vcfs ~final_vcf
end
module Virmid 
struct

open Biokepi_run_environment
open Common



(* See http://sourceforge.net/p/virmid/wiki/Home/ *)
module Configuration = struct

  type t = {
    name: string;
    parameters: (string * string) list
  }
  let to_json {name; parameters}: Yojson.Basic.json =
    `Assoc [
      "name"`String name;
      "parameters",
      `Assoc (List.map parameters ~f:(fun (a, b) -> a, `String b));
    ]
  let render {parameters; _} =
    List.concat_map parameters ~f:(fun (a,b) -> [a; b])

  let default =
    {name = "default"; parameters = []}

  let example_1 =
    (* The first one: http://sourceforge.net/p/virmid/wiki/Home/#examples *)
    {name= "examplel_1";
     parameters = [
       "-c1""20";
       "-C1""100";
       "-c2""20";
       "-C2""100";
     ]}

  let name t = t.name
end

let run
    ~run_with ~normal ~tumor ~result_prefix
    ?(more_edges = []) ~configuration () =
  let open KEDSL in
  let open Configuration in 
  let name = Filename.basename result_prefix in
  let result_file suffix = result_prefix ^ suffix in
  let output_file = result_file "-somatic.vcf" in
  let output_prefix = "virmid-output" in
  let work_dir = result_file "-workdir" in
  let reference_fasta =
    Machine.get_reference_genome run_with normal#product#reference_build
    |> Reference_genome.fasta in
  let virmid_tool = Machine.get_tool run_with Machine.Tool.Default.virmid in
  let virmid_somatic_broken_vcf =
    (* maybe it's actually not broken, but later tools can be
       annoyed by the a space in the header. *)

    work_dir // Filename.basename tumor#product#path ^ ".virmid.som.passed.vcf" in
  let processors = Machine.max_processors run_with in
  let make =
    Machine.run_big_program run_with ~name ~processors
      ~self_ids:["virmid"]
      Program.(
        Machine.Tool.init virmid_tool
        && shf "mkdir -p %s" work_dir
        && sh (String.concat ~sep:" " ([
            "java -jar $VIRMID_JAR -f";
            "-w"; work_dir;
            "-R"; reference_fasta#product#path;
            "-D"; tumor#product#path;
            "-N"; normal#product#path;
            "-t"Int.to_string processors;
            "-o"; output_prefix;
          ] @ Configuration.render configuration))
        && shf "sed 's/\\(##INFO.*Number=A,Type=String,\\) /\\1/' %s > %s"
          virmid_somatic_broken_vcf output_file
          (* We construct the `output_file` out of the “broken” one with `sed`. *)
      )
  in
  workflow_node ~name ~make
    (single_file output_file ~host:(Machine.as_host run_with))
    ~edges:(more_edges @ [
        depends_on normal;
        depends_on tumor;
        depends_on reference_fasta;
        depends_on (Machine.Tool.ensure virmid_tool);
      ])
end