struct
  type 'a pipeline = 'a t
  type workflow_option_failure_mode = [ `Silent | `Fail_if_not_happening ]
  type workflow_option = [
    | `Multi_sample_indel_realignment of workflow_option_failure_mode
    | `Parallel_alignment_over_fastq_fragments of
        [ `Bwa_mem | `Bwa | `Mosaik | `Star | `Hisat ] list
        * workflow_option_failure_mode
    | `Map_reduce of [ `Gatk_indel_realigner ]
  ]
  type t = {
    reference_build: Reference_genome.name;
    work_dir: string;
    machine : Machine.t;
    options: workflow_option list;
    wrap_bam_node:
      bam pipeline ->
      KEDSL.bam_file KEDSL.workflow_node ->
      KEDSL.bam_file KEDSL.workflow_node;
    wrap_vcf_node:
      vcf pipeline ->
      KEDSL.single_file KEDSL.workflow_node ->
      KEDSL.single_file KEDSL.workflow_node;
    wrap_gtf_node:
      gtf pipeline ->
      KEDSL.single_file KEDSL.workflow_node ->
      KEDSL.single_file KEDSL.workflow_node;
  }
  let create
      ?(wrap_bam_node = fun _ x -> x)
      ?(wrap_vcf_node = fun _ x -> x)
      ?(wrap_gtf_node = fun _ x -> x)
      ?(options=[])
      ~reference_build ~work_dir ~machine () =
    {reference_build; work_dir; machine; options;
     wrap_bam_node; wrap_vcf_node; wrap_gtf_node}

  let has_option {options; _} f =
    List.exists options ~f

  let apply_with_metadata ~metadata_spec wf =
    begin match metadata_spec with
    | `Add_tags tgs -> KEDSL.add_tags ~recursive:false wf tgs
    | `Add_tags_rec tgs -> KEDSL.add_tags ~recursive:true wf tgs
    end;
    wf

  (* This compiler variable name is confusing, perhaps 'state' is better? *)
  let rec fastq_step ~read ~compiler (pipeline: fastq pipeline) =
    let {work_dir; machine ; _ } = compiler in
    match pipeline with
    | Fastq f -> f
    | Concat_text (l: fastq pipeline list) ->
      failwith "Compilation of Biokepi.Pipeline.Concat_text: not implemented"
    | Gunzip_concat (l: fastq_gz pipeline list) ->
      let fastqs =
        let rec f = 
          function
          | Fastq_gz t -> t
          | With_metadata (metadata_spec, p) -> 
            apply_with_metadata ~metadata_spec (f p)
        in
        List.map l ~f in
      let result_path = work_dir // to_file_prefix ~read pipeline ^ ".fastq" in
      dbg "Result_Path: %S" result_path;
      Workflow_utilities.Gunzip.concat ~run_with:machine fastqs ~result_path
    | With_metadata (metadata_spec, pipeline) ->
      fastq_step ~read ~compiler pipeline |> apply_with_metadata ~metadata_spec

  let rec fastq_sample_step ~compiler (fs : fastq_sample pipeline) =
    let {work_dir; machine ; _ } = compiler in

      match fs with
      | Bam_to_fastq (info_opt, how, what) ->
        let bam = compile_aligner_step ~compiler what in
        let sample_type =
          match how with `Single -> `Single_end | `Paired -> `Paired_end in
        let fastq_pair =
          let output_prefix = work_dir // to_file_prefix ?read:None what in
          let sample_name = Option.map info_opt (fun x -> x.sample_name) in
          Picard.bam_to_fastq ~run_with:machine ~sample_type
            ?sample_name
            ~output_prefix bam
        in
        fastq_pair
      | Paired_end_sample (info, l1, l2) ->
        let r1 = fastq_step ~compiler ~read:(`R1 info.fragment_id) l1 in
        let r2 = fastq_step ~compiler ~read:(`R2 info.fragment_id) l2 in
        let open KEDSL in
        workflow_node (fastq_reads ~host:Machine.(as_host machine)
                         ~name:info.sample_name
                         r1#product#path (Some r2#product#path))
          ~name:(sprintf "Pairing sample %s (%s and %s)"
                   info.sample_name
                   (Filename.basename r1#product#path)
                   (Filename.basename r2#product#path))
          ~edges:[ depends_on r1; depends_on r2 ]
      | Single_end_sample (info, single) ->
        let r1 = fastq_step ~compiler ~read:(`R1 info.fragment_id) single in
        let open KEDSL in
        workflow_node (fastq_reads ~host:Machine.(as_host machine)
                         ~name:info.sample_name
                         r1#product#path None)
          ~equivalence:`None
          ~edges:[ depends_on r1 ]
          ~name:(sprintf "single-end %s (%s)"
                   info.sample_name
                   (Filename.basename r1#product#path))
      | With_metadata (metadata_spec, p) ->
        fastq_sample_step ~compiler p |> apply_with_metadata ~metadata_spec

  and compile_aligner_step ~compiler (pipeline : bam pipeline) =
    let {reference_build; work_dir; machine; _} = compiler in
    let result_prefix = work_dir // to_file_prefix pipeline in
    dbg "Result_Prefix: %S" result_prefix;
    let rec parallelize_alignment ~make_aligner sample =
      match sample with
      | Paired_end_sample (info,
                           Gunzip_concat r1_list,
                           Gunzip_concat r2_list) ->
        let exploded =
          let count = ref 0 in
          List.map2 r1_list r2_list
            ~f:(fun r1 r2 ->
                match r1, r2 with
                | (Fastq_gz wf1, Fastq_gz wf2) ->
                  let new_info =
                    incr count; 
                    {info with
                     fragment_id =
                       (* fragmenting = creating fragments of previous fragment *)
                       sprintf "%s-fragment-%d" info.fragment_id !count} in
                  make_aligner (
                    Paired_end_sample (new_info,
                                       Gunzip_concat [r1],
                                       Gunzip_concat [r2]))
                | other -> failwith "compile_aligner_step: not implemented"
              ) in
        let bams = List.map exploded ~f:(compile_aligner_step ~compiler) in
        Samtools.merge_bams ~run_with:machine bams
          (result_prefix ^ "-merged.bam")
      | With_metadata (metadata_spec, p) ->
        parallelize_alignment ~make_aligner p
        |> apply_with_metadata ~metadata_spec
      | other ->
        failwith "parallelize_alignment: Not fully implemented"
    in
    let catch_parallelize_aligner aligner sample =
      let option =
        List.find_map compiler.options
          (function
          | `Parallel_alignment_over_fastq_fragments (al, todo)
            when List.mem ~set:al aligner -> Some todo
          | _ -> None)
      in
      match sample with
      | Paired_end_sample (name,
                           Gunzip_concat r1_list,
                           Gunzip_concat r2_list)
        when List.length r1_list > 1 ->
        begin match option with
        | Some _ -> `Caught
        | None -> `Not_caught
        end
      | Paired_end_sample (name,
                           Gunzip_concat r1_list,
                           Gunzip_concat r2_list)
        when List.length r1_list = 1 -> `Not_caught
      | other ->
        begin match option with
        | Some `Silent
        | None -> `Not_caught
        | Some `Fail_if_not_happening ->
          failwithf "Option Parallel_alignment_over_fastq_fragments is set to Fail_if_not_happening and it didn't happen:\n%s"
            (to_json other |> Yojson.Basic.pretty_to_string)
        end
    in
    let perform_aligner_parallelization aligner_tag ~make_aligner
        ~make_workflow sample =
      begin match catch_parallelize_aligner aligner_tag sample  with
      | `Caught -> parallelize_alignment ~make_aligner sample
      | `Not_caught ->
        let fastq = fastq_sample_step ~compiler sample in
        make_workflow fastq
      end
    in
    let bam_node =
      match pipeline with
      | Bam_sample (name, bam_target) -> bam_target
      | Gatk_indel_realigner (configuration, bam)
        when has_option compiler ((=) (`Map_reduce `Gatk_indel_realigner)) ->
        let input_bam = compile_aligner_step ~compiler bam in
        Gatk.indel_realigner_map_reduce ~run_with:machine ~compress:false
          ~configuration (KEDSL.Single_bam input_bam)
      | Gatk_indel_realigner (configuration, bam) ->
        let input_bam = compile_aligner_step ~compiler bam in
        Gatk.indel_realigner ~run_with:machine ~compress:false
          ~configuration (KEDSL.Single_bam input_bam)
      | Gatk_bqsr (configuration, bam) ->
        let input_bam = compile_aligner_step ~compiler bam in
        let output_bam = result_prefix ^ ".bam" in
        Gatk.base_quality_score_recalibrator
          ~configuration ~run_with:machine ~input_bam ~output_bam
      | Picard_mark_duplicates (settings, bam) ->
        let input_bam = compile_aligner_step ~compiler bam in
        let output_bam = result_prefix ^ ".bam" in
        Picard.mark_duplicates ~settings
          ~run_with:machine ~input_bam output_bam
      | Bwa_mem (bwa_mem_config, fq_sample) ->
        perform_aligner_parallelization
          `Bwa_mem ~make_aligner:(fun pe -> Bwa_mem (bwa_mem_config, pe))
          fq_sample
          ~make_workflow:(fun fastq ->
              Bwa.mem_align_to_sam ~reference_build
                ~configuration:bwa_mem_config
                ~fastq ~result_prefix ~run_with:machine ()
              |> Samtools.sam_to_bam ~reference_build ~run_with:machine)
      | Bwa (bwa_config, what) ->
        perform_aligner_parallelization
          `Bwa ~make_aligner:(fun pe -> Bwa (bwa_config, pe)) what
          ~make_workflow:(fun fastq ->
              Bwa.align_to_sam ~reference_build
                ~configuration:bwa_config
                ~fastq ~result_prefix ~run_with:machine ()
              |> Samtools.sam_to_bam ~reference_build ~run_with:machine)
      | Mosaik (what) ->
        perform_aligner_parallelization
          `Mosaik ~make_aligner:(fun pe -> Mosaik (pe)) what
          ~make_workflow:(fun fastq -> Mosaik.align ~reference_build
                ~fastq ~result_prefix ~run_with:machine ())
      | Star (configuration, what) ->
        perform_aligner_parallelization
          `Star ~make_aligner:(fun pe -> Star (configuration, pe)) what
          ~make_workflow:(fun fastq ->
              Star.align ~configuration ~reference_build
                ~fastq ~result_prefix ~run_with:machine ())
      | Hisat (configuration, what) ->
        perform_aligner_parallelization
          `Hisat ~make_aligner:(fun pe -> Hisat (configuration, pe)) what
          ~make_workflow:(fun fastq ->
              Hisat.align ~configuration ~reference_build
                ~fastq ~result_prefix ~run_with:machine ()
              |> Samtools.sam_to_bam ~reference_build ~run_with:machine
            )
      | With_metadata (metadata_spec, p)  ->
        compile_aligner_step ~compiler p
        |> apply_with_metadata ~metadata_spec
    in
    compiler.wrap_bam_node pipeline bam_node

  let rec compile_bam_pair ~compiler (pipeline : bam_pair pipeline) :
      [ `Normal of KEDSL.bam_file KEDSL.workflow_node ] *
      [ `Tumor of KEDSL.bam_file KEDSL.workflow_node ] *
      [ `Pipeline of bam_pair pipeline ]
    =
    let {reference_build; work_dir; machine; _} = compiler in
    begin match pipeline with
    | Bam_pair (
        (Gatk_bqsr (n_bqsr_config, Gatk_indel_realigner (n_gir_conf, n_bam)))
        ,
        (Gatk_bqsr (t_bqsr_config, Gatk_indel_realigner (t_gir_conf, t_bam)))
      )
      when
        has_option compiler
          (function `Multi_sample_indel_realignment _ -> true | _ -> false)
        && n_gir_conf = t_gir_conf ->
      let normal = compile_aligner_step ~compiler n_bam in
      let tumor = compile_aligner_step ~compiler t_bam in
      let bam_list_node =
        if has_option compiler ((=) (`Map_reduce `Gatk_indel_realigner))
        then (
          Gatk.indel_realigner_map_reduce ~run_with:machine ~compress:false
            ~configuration:n_gir_conf (KEDSL.Bam_workflow_list [normal; tumor])
        ) else
          Gatk.indel_realigner ~run_with:machine ~compress:false
            ~configuration:n_gir_conf (KEDSL.Bam_workflow_list [normal; tumor])
      in
      begin match KEDSL.explode_bam_list_node bam_list_node with
      | [realigned_normal; realigned_tumor] ->
        let new_pipeline =
          Bam_pair (
            Gatk_bqsr (n_bqsr_config,
                       Bam_sample (Filename.chop_extension realigned_normal#product#path,
                                   realigned_normal)),
            Gatk_bqsr (t_bqsr_config,
                       Bam_sample (Filename.chop_extension realigned_tumor#product#path,
                                   realigned_tumor)))
        in
        compile_bam_pair ~compiler new_pipeline
      | other ->
        failwithf "Gatk.indel_realigner did not return the correct list of length 2 (tumor, normal): it gave %d bams"
          (List.length other)
      end
    | Bam_pair ( Gatk_bqsr (_, Gatk_indel_realigner (_, _)),
                 Gatk_bqsr (_, Gatk_indel_realigner (_, _))) as bam_pair
      when
        has_option compiler
          ((=) (`Multi_sample_indel_realignment `Fail_if_not_happening)) ->
      failwithf "Option (`Multi_sample_indel_realignment `Fail_if_not_happening) is set and this pipeline does not qualify:\n%s"
        (to_json bam_pair |> Yojson.Basic.pretty_to_string)
    | Bam_pair (normal_t, tumor_t) as final_pipeline ->
      let normal = compile_aligner_step ~compiler normal_t in
      let tumor = compile_aligner_step ~compiler tumor_t in
      (`Normal normal, `Tumor tumor, `Pipeline final_pipeline)
    | With_metadata (metadata_spec, p) ->
      let `Normal normal, `Tumor tumor, `Pipeline p =
        compile_bam_pair ~compiler p in

      (`Normal (apply_with_metadata ~metadata_spec normal),
       `Tumor (apply_with_metadata ~metadata_spec tumor),
       `Pipeline p)
    end

  let rec compile_variant_caller_step ~compiler (pipeline: vcf pipeline) =
    let {reference_build; work_dir; machine; _} = compiler in
    (* result prefix ignore optimizations *)
    let vcf_node =
      match pipeline with
      | Somatic_variant_caller (som_vc, bam_pair) ->
        let (`Normal normal, `Tumor tumor, `Pipeline new_bam_pair) =
          compile_bam_pair ~compiler bam_pair in
        let result_prefix =
          work_dir
          // to_file_prefix (Somatic_variant_caller (som_vc, new_bam_pair)) in
        dbg "Result_Prefix: %S" result_prefix;
        som_vc.Variant_caller.make_target
          ~run_with:machine ~input:(Variant_caller.Somatic {normal; tumor})
          ~result_prefix ()
      | Germline_variant_caller (gvc, bam) ->
        let result_prefix = work_dir // to_file_prefix pipeline in
        dbg "Result_Prefix: %S" result_prefix;
        let input_bam = compile_aligner_step ~compiler bam in
        gvc.Variant_caller.make_target
          ~run_with:machine ~input:(Variant_caller.Germline input_bam)
          ~result_prefix ()
      | With_metadata (metadata_spec, p) ->
        compile_variant_caller_step ~compiler p
        |> apply_with_metadata ~metadata_spec
    in
    compiler.wrap_vcf_node pipeline vcf_node

  let rec compile_gtf_step ~compiler (pipeline: gtf pipeline) =
    let {reference_build; work_dir; machine; _} = compiler in
    let result_prefix = work_dir // to_file_prefix pipeline in
    dbg "Result_Prefix: %S" result_prefix;
    let gtf_node =
      match pipeline with
      | Stringtie (configuration, bam) ->
        let bam = compile_aligner_step ~compiler bam in
        Stringtie.run ~configuration
          ~bam ~result_prefix ~run_with:machine ()
      | With_metadata (metadata_spec, p) ->
        compile_gtf_step ~compiler p
        |> apply_with_metadata ~metadata_spec
    in
    compiler.wrap_gtf_node pipeline gtf_node

  let rec seq2hla_hla_types_step ~compiler (pipeline : seq2hla_hla_types pipeline) =
    let { machine ; work_dir; _ } = compiler in
    match pipeline with
    | Seq2HLA sample ->
      let work_dir = work_dir // (to_file_prefix pipeline) ^ "_work_dir" in
      (* TODO: Seq2HLA can actually take the gzipped version too, so we'd
           need a unique type for that. *)

      let fastq_pair = fastq_sample_step ~compiler sample in
      let sample_name = fastq_pair#product#sample_name in
      let r1 = fastq_pair#product#r1 in
      let r2 = match fastq_pair#product#r2 with
        | Some r2 -> r2
        | _ -> failwithf "Seq2HLA doesn't support Single_end_sample(s)."
      in
      Seq2HLA.hla_type
        ~work_dir ~run_with:machine ~run_name:sample_name ~r1 ~r2
    | With_metadata (metadata_spec, p) ->
      seq2hla_hla_types_step ~compiler p
      |> apply_with_metadata ~metadata_spec

  let rec optitype_hla_types_step ~compiler (pipeline : optitype_hla_types pipeline) =
    let { machine ; work_dir; _ } = compiler in
    match pipeline with
    | Optitype (kind, sample) ->
      let work_dir = work_dir // (to_file_prefix pipeline) ^ "_work_dir" in
      let fastq = fastq_sample_step ~compiler sample in
      let sample_name = fastq#product#sample_name in
      Optitype.hla_type
        ~work_dir ~run_with:machine ~run_name:sample_name ~fastq kind
    | With_metadata (metadata_spec, p) ->
      optitype_hla_types_step ~compiler p
      |> apply_with_metadata ~metadata_spec

end