Pvem_lwt_unix: Literate Implementation

Contents

Menu

(**************************************************************************)
(*  Copyright (c) 2012, 2013,                                             *)
(*                           Sebastien Mondet <seb@mondet.org>,           *)
(*                           Ashish Agarwal <agarwal1975@gmail.com>.      *)
(*                                                                        *)
(*  Permission to use, copy, modify, and/or distribute this software for  *)
(*  any purpose with or without fee is hereby granted, provided that the  *)
(*  above  copyright notice  and this  permission notice  appear  in all  *)
(*  copies.                                                               *)
(*                                                                        *)
(*  THE  SOFTWARE IS  PROVIDED  "AS  IS" AND  THE  AUTHOR DISCLAIMS  ALL  *)
(*  WARRANTIES  WITH  REGARD  TO  THIS SOFTWARE  INCLUDING  ALL  IMPLIED  *)
(*  WARRANTIES  OF MERCHANTABILITY AND  FITNESS. IN  NO EVENT  SHALL THE  *)
(*  AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL  *)
(*  DAMAGES OR ANY  DAMAGES WHATSOEVER RESULTING FROM LOSS  OF USE, DATA  *)
(*  OR PROFITS,  WHETHER IN AN  ACTION OF CONTRACT, NEGLIGENCE  OR OTHER  *)
(*  TORTIOUS ACTION,  ARISING OUT  OF OR IN  CONNECTION WITH THE  USE OR  *)
(*  PERFORMANCE OF THIS SOFTWARE.                                         *)
(**************************************************************************)


module Deferred_result = Pvem.With_deferred(Lwt)

open Deferred_result



module type IO = sig
  (** Useful I/O functions. *)

  type path = string
  (** The type of file paths. *)

  (** {3 Whole Files} *)

  (** Write the string [content] to a file. Any exception is reported as
      [`IO (`Write_file_exn (path, exn))]. *)
  val write_file: path -> content:string ->
    (unit, [> `IO of [> `Write_file_exn of path * exn ] ]) t

  (** Read the whole content of a file as a string. Any exception is
      reported as [`IO (`Read_file_exn (path, exn))]. *)
  val read_file: path ->
    (string, [> `IO of [> `Read_file_exn of path * exn ] ]) t

  (** {3 Access To Channels } *)

  (** Run a function [f] on an output channel. If the channel was opened
      from a path ([`Append_to_file _], [`Create_file _], or
      [`Overwrite_file _]), it will be closed before returning (in case
      of success, or error, but not for exceptions).

      A [Unix.ENOENT] failure is reported as [`IO (`Wrong_path _)] and a
      [Unix.EEXIST] failure as [`IO (`File_exists _)]. *)
  val with_out_channel:
    ?buffer_size:int ->
    f:(Lwt_io.output_channel ->
       ('a,
        [> `IO of
             [> `Exn of exn | `File_exists of string | `Wrong_path of string ] ]
        as 'err) t) ->
    [ `Append_to_file of string
    | `Create_file of string
    | `Overwrite_file of string
    | `Channel of Lwt_io.output_channel | `Stderr | `Stdout] ->
    ('a, 'err) t

  (** Safely call [Lwt_io.fprint]: exceptions are reported as
      [`IO (`Exn _)]. *)
  val write: Lwt_io.output_channel -> string -> (unit, [> `IO of [> `Exn of exn ] ]) t

  (** Flush an output channel; exceptions are reported as [`IO (`Exn _)]. *)
  val flush: Lwt_io.output_channel ->  (unit, [> `IO of [> `Exn of exn ] ]) t

  (** Run a function [f] on an input channel. If the channel was opened
      from a [`File f], it will be closed before returning (in case of
      success, or error, but not for exceptions). *)
  val with_in_channel:
    [ `Channel of Lwt_io.input_channel | `File of string | `Stdin ] ->
    ?buffer_size:int ->
    f:(Lwt_io.input_channel -> ('a, [> `IO of [> `Exn of exn ] ] as 'err) t) ->
    ('a, 'err) t

  (** Read [count] bytes from an input-channel (default: “as much as possible”,
      c.f. {{:http://ocsigen.org/lwt/api/Lwt_io#VALread}Lwt_io.read}). *)
  val read: ?count:int -> Lwt_io.input_channel ->
    (string, [> `IO of [> `Exn of exn ] ]) t

  val error_to_string:
    [< `IO of [< `Exn of exn
              | `File_exists of string
              | `Read_file_exn of string * exn
              | `Write_file_exn of string * exn
              | `Wrong_path of string ] ] -> string
  (** Get a human-readable string out of an error produced by this module. *)

end

module type SYSTEM = sig
  (** Basic system access functions. *)

  (** Sleep for a given number of seconds ([Lwt_unix.sleep]). *)
  val sleep: float -> (unit, [> `System of [> `Sleep of float]  * [> `Exn of exn ]]) t

  (** Manipulate [/bin/sh] commands (flavors of [Unix.system]).  *)
  module Shell : sig

    (** Make [/bin/sh] execute a command, fail if it does not return 0. *)
    val do_or_fail: string ->
      (unit,
       [> `Shell of
            string * [> `Exited of int | `Exn of exn | `Signaled of int | `Stopped of int ]
       ]) t

    (** Execute a shell command and return its standard output, its
        standard error, and its termination status, as a triple
        [(stdout, stderr, status)]. *)
    val execute:
      string ->
      (string * string * [ `Exited of int | `Signaled of int | `Stopped of int ],
       [> `Shell of string * [> `Exn of exn ]]) t

    val status_to_string:
      [< `Exited of int | `Exn of exn | `Signaled of int | `Stopped of int ] ->
      string
    (** Convert a status (or an error) to a human-readable string. *)

  end

  (** Execute a function [f] with a timeout (in seconds). If [f] throws
      an exception it will be passed as [`System (_, e)]; if the function
      times out the error will be [`Timeout time]. *)
  val with_timeout : float ->
    f:(unit -> ('a, [> `Timeout of float
                    | `System of [> `With_timeout of float ] * [> `Exn of exn ]
                    ] as 'error) t) ->
    ('a, 'error) t

  (** Create a new empty directory or fail if it already exists
      (i.e. like [mkdir]). The default permissions are [0o700]. *)
  val make_new_directory: ?perm:int -> string ->
    (unit,
     [> `System of
          [> `Make_directory of string ] *
            [> `Already_exists | `Exn of exn
            | `Wrong_access_rights of int ] ]) t

  (** Create as many directories as needed (can be 0) to ensure that the
      directory path exists (like [mkdir -p]). The default permissions
      are [0o700].  *)
  val ensure_directory_path: ?perm:int -> string ->
    (unit,
     [> `System of
          [> `Make_directory of string ] *
            [> `Exn of exn | `Wrong_access_rights of int ] ]) t

  (** Quick information on files: [`Regular_file size] carries the size
      in bytes and [`Symlink target] the destination of the link. *)
  type file_info =
    [ `Absent
    | `Regular_file of int
    | `Symlink of string
    | `Block_device
    | `Character_device
    | `Directory
    | `Fifo
    | `Socket]

  val file_info_to_string: file_info -> string
  (** Convert file information to a human-readable string. *)

  (** Get information about a path (whether it exists, its size, or
      sym-link destination). If [follow_symlink] is [false]
      (default) use [lstat] (so the result can be [`Symlink _]), if [true]
      call [stat] (information about the target).  *)
  val file_info :
    ?follow_symlink:bool -> string ->
    (file_info,
     [> `System of [> `File_info of string ] * [> `Exn of exn ] ]) t

  (** Get all the children of a directory, through a [next] stream-like
      function: each call returns [Some name], or [None] once the
      listing is exhausted. *)
  val list_directory: string ->
    [ `Stream of
        (unit ->
         (string option,
          [> `System of [> `List_directory of string ] * [> `Exn of exn ] ]) t) ]

  (** Remove a file or a directory recursively. [remove] does not fail
      if the file does not exist.  *)
  val remove: string ->
    (unit,
     [> `System of [> `File_info of string
                   | `Remove of string
                   | `List_directory of string ] *
            [>  `Exn of exn ] ]) t

  (** Make a symbolic link [link_path] pointing at
      [target]. [make_symlink] fails if the file [link_path] already
      exists. *)
  val make_symlink: target:string -> link_path:string ->
    (unit,
     [> `System of [> `Make_symlink of string * string]
                   * [> `File_exists of string
                     | `Exn of exn ] ]) t

  (** Specification of a “destination” for [copy] and [move]. *)
  type file_destination = [
    | `Into of string (** [`Into path] means copy 'file' into the {b directory} path. *)
    | `Onto of string (** [`Onto path] means copy 'file' {b as} [path]. *)
  ]

  (** Copy files or directories (recursively).

      If [ignore_strange] is true [copy] won't fail on block/character
      devices, fifos or sockets (defaults to [false]).

      The [buffer_size] (default [64_000]) is used both for reading and
      writing files.

      One can [`Fail] on symbolic links (the default), [`Follow] them,
      or [`Redo] a new symlink with the same target.

      [if_exists] (default [`Fail]) tells what to do when the
      destination already exists.

  *)
  val copy:
    ?ignore_strange:bool ->
    ?symlinks:[ `Fail | `Follow | `Redo ] ->
    ?buffer_size:int ->
    ?if_exists:[ `Fail | `Overwrite | `Update ] ->
    src:string -> file_destination ->
    (unit,
     [> `System of
          [> `Copy of string
          | `File_info of string
          | `List_directory of string
          | `Make_symlink of string * string
          | `Remove of string
          | `Make_directory of string ] *
            [> `Already_exists
            | `IO of [> `File_exists of string | `Wrong_path of string ]
            | `Exn of exn
            | `File_exists of string
            | `Wrong_path of string
            | `File_not_found of string
            | `Wrong_access_rights of int
            | `Not_a_directory of string
            | `Wrong_file_kind of
                string *
                  [> `Block_device | `Character_device
                  | `Fifo | `Socket | `Symlink of string ] ] ]) t

  (** Try to move [src] to [dest] using [Lwt_unix.rename]; if it works,
      return [`Moved]; if it does not work but [copy] could work
      (i.e. both paths are not in the same {i device}) return
      [`Must_copy]. *)
  val move_in_same_device:
    ?if_exists:[ `Fail | `Overwrite | `Update ] ->
    src:string -> file_destination ->
    ([ `Moved | `Must_copy ],
     [> `System of [> `Move of string | `File_info of string ]
                   * [> `Exn of exn
                     | `File_exists of string] ]) t

  (** Heavy-weight function trying to mimic the behavior of the UNIX
      command “mv”
      (c.f. {{:http://www.openbsd.org/cgi-bin/cvsweb/src/bin/mv/mv.c?rev=1.35;content-type=text%2Fplain;only_with_tag=HEAD}mv.c}):
      it tries [move_in_same_device] and if it returns [`Must_copy] it
      calls [copy] and [remove] (if [copy] fails, the [remove] won't
      happen but there will be no clean-up of the files already
      copied).
  *)
  val move:
    ?ignore_strange:bool ->
    ?symlinks:[ `Fail | `Follow | `Redo ] ->
    ?buffer_size:int ->
    ?if_exists:[ `Fail | `Overwrite | `Update ] ->
    src:string -> file_destination ->
    (unit,
     [> `System of
          [> `Copy of string
          | `Move of string
          | `Remove of string
          | `File_info of string
          | `List_directory of string
          | `Make_symlink of string * string
          | `Make_directory of string ] *
            [> `Already_exists
            | `IO of [> `File_exists of string | `Wrong_path of string ]
            | `Exn of exn
            | `File_exists of string
            | `Wrong_path of string
            | `File_not_found of string
            | `Wrong_access_rights of int
            | `Not_a_directory of string
            | `Wrong_file_kind of
                string *
                  [> `Block_device | `Character_device
                  | `Fifo | `Socket | `Symlink of string ] ] ]) t

  (** Representation of a hierarchy of files ([`Leaf]) and directories ([`Node]). *)
  type file_tree = [
    | `Node of string * file_tree list
    | `Leaf of string * file_info
  ]

  (** Obtain the [file_tree] starting at a given path. If
      [follow_symlinks] is [false] (default) symbolic links are reported
      as [`Leaf (_, `Symlink _)]. *)
  val file_tree :
    ?follow_symlinks:bool ->
    string ->
    (file_tree, [> `System of
                     [> `File_info of string
                     | `File_tree of string
                     | `List_directory of string ] *
                       [> `Exn of exn
                       | `File_not_found of string] ]) t

  val error_to_string:
    [< `Shell of string *
           [< `Exited of int | `Exn of exn | `Signaled of int | `Stopped of int ]
    | `System of
         [< `Copy of string
         | `File_info of string
         | `File_tree of string
         | `List_directory of string
         | `Make_directory of string
         | `Make_symlink of string * string
         | `Move of string
         | `Remove of string ]
         * [< `Already_exists
           | `Exn of exn
           | `File_exists of string
           | `File_not_found of string
           | `IO of [< `Exn of exn
                    | `File_exists of string
                    | `Read_file_exn of string * exn
                    | `Write_file_exn of string * exn
                    | `Wrong_path of string ]
           | `Not_a_directory of string
           | `Wrong_access_rights of int
           | `Wrong_file_kind of string * file_info
           | `Wrong_path of string ]
    ] -> string
  (** Make a human-readable string for any error in this module. *)

end


module type DEFERRED_LIST = sig
(** Monadic “operations” on lists. *)

  (** Sequentially launch [f] on each element of the list and
      get out of the loop at the first error.
      The function returns the list of results if all succeed, or the
      first error.
  *)
  val while_sequential: 'a list -> f:('a -> ('c, 'b) t) -> ('c list, 'b) t

  (** Sequentially launch [f] on each element of the list and process
      the whole list even if there are errors.
      The function returns the list of successes and the list of errors.  *)
  val for_sequential: 'a list -> f:('a -> ('c, 'b) t) -> ('c list * 'b list, 'd) t

  (** Like [for_sequential] but all the threads are launched concurrently.
      Results are accumulated as each thread terminates, so their order
      may differ from the order of the input list. *)
  val for_concurrent: 'a list -> f:('a -> ('c, 'b) t) -> ('c list * 'b list, 'd) t

  (** Like [for_concurrent] but with the index in the list passed to the
      function. *)
  val for_concurrent_with_index:
    'a list -> f:(int -> 'a -> ('c, 'b) t) -> ('c list * 'b list, 'd) t

  (** [pick_and_cancel] is a wrapper for [Lwt.pick]. *)
  val pick_and_cancel: ('a, 'error) t list -> ('a, 'error) t

end

module type LIGHT = sig
  (** Basic traffic lights. *)

  type t
  (** The traffic signal handle (uses {!Lwt.task}). *)

  val create: unit -> t
  (** Create a “red” traffic light. *)

  val try_to_pass: t -> (unit, 'a) Deferred_result.t
  (** [try_to_pass t] will block until [t] is “green” or will return
      immediately if [t] is already green. *)

  val green: t -> unit
  (** [green t] sets the light to “green”; this will wake up all the
      threads waiting on [try_to_pass t]. *)

end

module Internal_pervasives = struct
  (** Small prelude shared by the implementations below: labelled
      standard modules, a few [Filename] helpers, and an exception
      printer. *)

  (** Reverse application: [x |> f] is [f x]. *)
  let (|>) x f = f x

  module String = StringLabels
  module List = ListLabels

  module Filename = struct
    include Filename

    (** Scan [s] backwards starting at index [from - 1]; return
        [Some (i + 1)] for the first index [i] whose character
        satisfies [f], or [None] if there is none. *)
    let string_rexists s ~f ~from:n =
      let rec scan i =
        match i with
        | 0 -> None
        | _ when f s.[i - 1] -> Some i
        | _ -> scan (i - 1)
      in
      scan n

    (** Ignore the slashes that end the prefix of [s] of length [from]:
        [`Ends_at n] gives the length of the prefix without them,
        [`All_slashes] means that the prefix contains only slashes. *)
    let skip_end_slashes s ~from =
      let not_a_slash c = c <> '/' in
      match string_rexists s ~from ~f:not_a_slash with
      | None -> `All_slashes
      | Some last -> `Ends_at last

    (** Split a path into [(dirname, basename)], ignoring trailing and
        repeated slashes. The empty path gives [(".", ".")] and a path
        made only of slashes gives [("/", "/")]. *)
    let split path =
      if path = "" then (".", ".")
      else
        match skip_end_slashes path ~from:(String.length path) with
        | `All_slashes -> ("/", "/")
        | `Ends_at basename_end ->
          let is_slash c = (c = '/') in
          begin match string_rexists path ~f:is_slash ~from:basename_end with
          | None -> (".", String.sub path ~pos:0 ~len:basename_end)
          | Some basename_start ->
            let dirname =
              match skip_end_slashes path ~from:basename_start with
              | `All_slashes -> "/"
              | `Ends_at dirname_end ->
                String.sub path ~pos:0 ~len:dirname_end
            in
            let basename =
              String.sub path ~pos:basename_start
                ~len:(basename_end - basename_start)
            in
            (dirname, basename)
          end

    (** Decompose a path into its components; the first element is
        always the root, either ["."] or ["/"]. *)
    let parts filename =
      let rec walk collected remaining =
        match split remaining with
        | (".", ".") -> "." :: collected
        | ("/", "/") -> "/" :: collected
        | (parent, component) -> walk (component :: collected) parent
      in
      walk [] filename
  end

  (** Render an exception as a string. *)
  let exn e = Printexc.to_string e
end
open Internal_pervasives
open Printf

module IO : IO = struct

  type path = string

  (** Fail with [e] tagged as an [`IO] error. *)
  let fail_io e = fail (`IO e)

  (** Run the thread [f ()], reporting any exception as [`IO (`Exn _)]. *)
  let wrap_deferred_io f =
    let on_exn e = `IO (`Exn e) in
    wrap_deferred ~on_exn (fun () -> f ())

  (** Human-readable description of an [`IO _] error. *)
  let error_to_string (`IO err) =
    match err with
    | `Read_file_exn (p, e) ->
      sprintf "Exception while reading %S: %s" p (exn e)
    | `Write_file_exn (p, e) ->
      sprintf "Exception while writing %S: %s" p (exn e)
    | `File_exists e -> sprintf "File already exists: %S" e
    | `Wrong_path e -> sprintf "Wrong-path: %S" e
    | `Exn e  -> sprintf "IO-Exception %s" (exn e)

  (******************************************************************************)
  (* Channels *)

  (* Lwt 2.5.0 replaced, in a backwards incompatible way, the
     `?buffer_size` option of `Lwt_io` with a `?buffer` option; this turns
     the legacy optional size into the optional buffer `Lwt_io` expects. *)
  let buffer_option = function
    | None -> None
    | Some size -> Some (Lwt_bytes.create size)

  (** [with_out_channel ?buffer_size ~f out] runs [f] on the output
      channel designated by [out].

      Channels opened here from a path ([`Append_to_file _],
      [`Overwrite_file _], [`Create_file _]) are closed once [f] has
      returned, on success and on error alike. [`Stdout], [`Stderr] and
      caller-provided [`Channel _] are not owned by this function and
      are left open. Nothing is closed if [f] raises an exception.

      A [Unix.ENOENT] failure becomes [`IO (`Wrong_path _)] and a
      [Unix.EEXIST] one becomes [`IO (`File_exists _)]. *)
  let with_out_channel ?buffer_size ~f out =
    let buffer = buffer_option buffer_size in
    let open_file ~flags file =
      wrap_deferred_io (fun () ->
          Lwt_io.open_file ~mode:Lwt_io.output ?buffer ~flags file)
    in
    (* Only the channels opened by this function are closed: closing
       [Lwt_io.stdout] or a channel owned by the caller would break any
       later use of it. *)
    let close_if_owned outchan =
      match out with
      | `Append_to_file _
      | `Overwrite_file _
      | `Create_file _ -> wrap_deferred_io (fun () -> Lwt_io.close outchan)
      | `Stdout | `Stderr | `Channel _ -> return ()
    in
    begin
      begin match out with
      | `Stdout -> return Lwt_io.stdout
      | `Stderr -> return Lwt_io.stderr
      | `Channel c -> return c
      | `Append_to_file file ->
        open_file ~flags:Unix.([ O_CREAT; O_WRONLY; O_APPEND ]) file
      | `Overwrite_file file ->
        open_file ~flags:Unix.([ O_CREAT; O_WRONLY; O_TRUNC ]) file
      | `Create_file file ->
        open_file ~flags:Unix.([ O_CREAT; O_WRONLY; O_EXCL ]) file
      end
      >>= fun outchan ->
      begin
        f outchan
        >>< begin function
        | `Ok o ->
          close_if_owned outchan
          >>= fun () ->
          return o
        | `Error e ->
          (* As before, a failure to close takes precedence over [e]. *)
          close_if_owned outchan
          >>= fun () ->
          fail e
        end
      end
    end
    >>< begin function
    | `Ok o -> return o
    | `Error (`IO (`Exn (Unix.Unix_error (Unix.ENOENT, _, path)))) ->
      fail_io (`Wrong_path path)
    | `Error (`IO (`Exn (Unix.Unix_error (Unix.EEXIST, _, path)))) ->
      fail_io (`File_exists path)
    | `Error  e -> fail e
    end

  (** Print the string [s] on [out] with [Lwt_io.fprint]. *)
  let write out s =
    let print () = Lwt_io.fprint out s in
    wrap_deferred_io print

  (** Flush [out] with [Lwt_io.flush]. *)
  let flush out =
    let do_flush () = Lwt_io.flush out in
    wrap_deferred_io do_flush


  (** [with_in_channel inspec ?buffer_size ~f] runs [f] on the input
      channel designated by [inspec].

      A channel opened here from a [`File _] is closed once [f] has
      returned, on success and on error alike. [`Stdin] and
      caller-provided [`Channel _] are not owned by this function and
      are left open. Nothing is closed if [f] raises an exception. *)
  let with_in_channel inspec ?buffer_size ~f =
    (* Only the channel opened by this function is closed: closing
       [Lwt_io.stdin] or a channel owned by the caller would break any
       later use of it. *)
    let close_if_owned inchan =
      match inspec with
      | `File _ -> wrap_deferred_io (fun () -> Lwt_io.close inchan)
      | `Stdin | `Channel _ -> return ()
    in
    begin match inspec with
    | `Stdin -> return Lwt_io.stdin
    | `Channel c -> return c
    | `File file ->
      let buffer = buffer_option buffer_size in
      wrap_deferred_io (fun () ->
          Lwt_io.open_file ~mode:Lwt_io.input ?buffer file)
    end
    >>= fun inchan ->
    begin
      f inchan
      >>< begin function
      | `Ok o ->
        close_if_owned inchan
        >>= fun () ->
        return o
      | `Error e ->
        (* As before, a failure to close takes precedence over [e]. *)
        close_if_owned inchan
        >>= fun () ->
        fail e
      end
    end

  (** Read from [inchan] with [Lwt_io.read], forwarding the optional
      [count]. *)
  let read ?count inchan =
    let do_read () = Lwt_io.read ?count inchan in
    wrap_deferred_io do_read

  (******************************************************************************)
  (* Whole Files *)

  (** Write [content] to [file] through [Lwt_io.with_file]; any exception
      is reported as [`IO (`Write_file_exn (file, _))]. *)
  let write_file file ~content =
    let write_all () =
      Lwt_io.with_file ~mode:Lwt_io.output file
        (fun chan -> Lwt_io.write chan content)
    in
    catch_deferred write_all
    >>< function
    | `Ok () -> return ()
    | `Error e -> fail_io (`Write_file_exn (file, e))

  (** Read the whole content of [file] through [Lwt_io.with_file]; any
      exception is reported as [`IO (`Read_file_exn (file, _))]. *)
  let read_file file =
    let read_all () =
      Lwt_io.with_file ~mode:Lwt_io.input file
        (fun chan -> Lwt_io.read chan)
    in
    catch_deferred read_all
    >>< function
    | `Ok content -> return content
    | `Error e -> fail_io (`Read_file_exn (file, e))

end

module System : SYSTEM = struct

  (** Run the thread [f ()], reporting any exception as
      [`System (cmd, `Exn _)]. *)
  let wrap_deferred_system cmd f =
    let on_exn e = `System (cmd, `Exn e) in
    wrap_deferred ~on_exn f

  (** Fail with [r] tagged as a [`System] error. *)
  let fail_sys r = fail (`System r)

  module Shell = struct

    (** Succeed if the command [s] exited with code 0, otherwise fail
        with [`Shell (s, status)]. *)
    let discriminate_process_status s ret =
      let failed status = fail (`Shell (s, status)) in
      match ret with
      | Lwt_unix.WEXITED 0 -> return ()
      | Lwt_unix.WEXITED n -> failed (`Exited n)
      | Lwt_unix.WSIGNALED n -> failed (`Signaled n)
      | Lwt_unix.WSTOPPED n -> failed (`Stopped n)

    (** Human-readable description of a process status or shell error. *)
    let status_to_string status =
      match status with
      | `Exn e -> sprintf "Exception %s" (exn e)
      | `Exited code -> sprintf "Exited with %d" code
      | `Signaled signal -> sprintf "Signaled (%d)" signal
      | `Stopped signal -> sprintf "Stopped (%d)" signal

    (** Run [s] with [Lwt_unix.system] and fail unless it exits with
        code 0; exceptions are reported as [`Shell (s, `Exn _)]. *)
    let do_or_fail s =
      let on_exn e = `Shell (s, `Exn e) in
      wrap_deferred ~on_exn (fun () -> Lwt_unix.system s)
      >>= fun status ->
      discriminate_process_status s status


    (** Run [s] through the shell and return
        [(stdout, stderr, status)]; exceptions are reported as
        [`Shell (s, `Exn _)].

        The process is run with [Lwt_process.with_process_full] so that
        its channels are closed when done; with a bare
        [open_process_full] the pipes to the child were never closed
        and each call leaked file descriptors. *)
    let execute s =
      wrap_deferred ~on_exn:(fun e -> `Shell (s, `Exn e))
        (fun () ->
           Lwt_process.with_process_full (Lwt_process.shell s) (fun proc ->
               (* Both outputs are read concurrently so that the child
                  cannot block on a full pipe. *)
               Lwt.bind
                 (Lwt_list.map_p Lwt_io.read [proc#stdout; proc#stderr])
                 (fun output ->
                    Lwt.bind proc#status (fun status ->
                        Lwt.return (status, output)))))
      >>= fun (ret, output) ->
      let code =
        match ret with
        | Lwt_unix.WEXITED n ->   (`Exited n)
        | Lwt_unix.WSIGNALED n -> (`Signaled n)
        | Lwt_unix.WSTOPPED n ->  (`Stopped n)
      in
      begin match output with
      | [out; err] -> return (out, err, code)
      | _ ->
        (* [map_p] over a two-element list yields two elements. *)
        assert false
      end

  end


  (** Sleep for [f] seconds with [Lwt_unix.sleep]; exceptions are
      reported as [`System (`Sleep f, `Exn _)]. *)
  let sleep f =
    let nap () = Lwt_unix.sleep f in
    wrap_deferred_system (`Sleep f) nap

  (** Run [f] under [Lwt_unix.with_timeout time]: a timeout is reported
      as [`Timeout time] and any other exception as
      [`System (`With_timeout time, `Exn _)]. *)
  let with_timeout time ~f =
    let run () = Lwt_unix.with_timeout time f in
    let on_failure = function
      | Lwt_unix.Timeout -> fail (`Timeout time)
      | e -> fail_sys (`With_timeout time, `Exn e)
    in
    Lwt.catch run on_failure


  (** Create the directory [dirname] (default permissions [0o700]) and
      fail with [`Already_exists] if the path is already there. *)
  let mkdir_or_fail ?(perm=0o700) dirname =
    let fail_here e = fail_sys (`Make_directory dirname, e) in
    let attempt () =
      Lwt.bind (Lwt_unix.mkdir dirname perm) (fun () -> Lwt.return (`Ok ()))
    in
    let recover = function
      | Unix.Unix_error (Unix.EACCES, _, _) ->
        fail_here (`Wrong_access_rights perm)
      | Unix.Unix_error ((Unix.EEXIST | Unix.EISDIR), _, _) ->
        (* [EISDIR] is treated like [EEXIST] to bypass the MacOSX bug
           https://github.com/janestreet/core/issues/7 *)
        fail_here `Already_exists
      | e -> fail_here (`Exn e)
    in
    Lwt.catch attempt recover

  (** Create the directory [dirname] (default permissions [0o700]) and
      succeed as well when [mkdir] reports that the path already exists.
      NOTE(review): [EEXIST] is not inspected any further, so an existing
      path that is not a directory presumably also counts as a success. *)
  let mkdir_even_if_exists ?(perm=0o700) dirname =
    let fail_here e = fail_sys (`Make_directory dirname, e) in
    let attempt () =
      Lwt.bind (Lwt_unix.mkdir dirname perm) (fun () -> Lwt.return (`Ok ()))
    in
    let recover = function
      | Unix.Unix_error (Unix.EACCES, _, _) ->
        fail_here (`Wrong_access_rights perm)
      | Unix.Unix_error ((Unix.EISDIR | Unix.EEXIST), _, _) ->
        (* [EISDIR] is accepted too to bypass the MacOSX bug
           https://github.com/janestreet/core/issues/7 *)
        return ()
      | e -> fail_here (`Exn e)
    in
    Lwt.catch attempt recover

  (** Create a new directory, failing if it already exists. *)
  let make_new_directory ?perm dirname =
    mkdir_or_fail ?perm dirname

  (** Create every missing directory along [dirname], starting from the
      root given by [Filename.parts] (code inspired by Core.Std.Unix). *)
  let ensure_directory_path ?perm dirname =
    match Filename.parts dirname with
    | [] ->
      ksprintf failwith "Sys.mkdir_p: BUG! Filename.parts %s -> []" dirname
    | root :: components ->
      (* Create [current/part] for each remaining component, in order. *)
      let rec descend current = function
        | [] -> return ()
        | part :: rest ->
          let dir = Filename.concat current part in
          mkdir_even_if_exists ?perm dir
          >>= fun () ->
          descend dir rest
      in
      mkdir_even_if_exists ?perm root
      >>= fun () ->
      descend root components

  (* Kind of a file-system entry: [`Regular_file] carries the size taken
     from [st_size] and [`Symlink] the destination returned by
     [readlink]. *)
  type file_info =
    [ `Absent
    | `Regular_file of int
    | `Symlink of string
    | `Block_device
    | `Character_device
    | `Directory
    | `Fifo
    | `Socket]

(*
  WARNING: this is a work-around for issue [329] with Lwt_unix.readlink:
  the blocking [Unix.readlink] is run through [Lwt_preemptive.detach]
  instead. When the issue is fixed, we should go back to Lwt_unix.

  [329]: http://ocsigen.org/trac/ticket/329
*)
  let lwt_unix_readlink l =
    Lwt_preemptive.detach (fun link -> Unix.readlink link) l

  (** Get the {!file_info} of [path], using [lstat] by default and [stat]
      when [follow_symlink] is [true]. A missing path ([ENOENT]) is
      reported as [`Absent]; any other exception becomes
      [`System (`File_info path, `Exn _)]. *)
  let file_info ?(follow_symlink=false) path =
    let stat_fun =
      match follow_symlink with
      | true -> Lwt_unix.stat
      | false -> Lwt_unix.lstat
    in
    let get_stats () =
      Lwt.bind (stat_fun path) (fun s -> Lwt.return (`Ok (`Unix_stats s)))
    in
    let on_stat_failure = function
      | Unix.Unix_error (Unix.ENOENT, _, _) -> return `Absent
      | e -> fail_sys (`File_info path, `Exn e)
    in
    Lwt.catch get_stats on_stat_failure
    >>= begin function
    | `Absent -> return `Absent
    | `Unix_stats stats ->
      begin match stats.Lwt_unix.st_kind with
      | Lwt_unix.S_REG -> return (`Regular_file stats.Lwt_unix.st_size)
      | Lwt_unix.S_DIR -> return `Directory
      | Lwt_unix.S_LNK ->
        (* The destination of the link is part of the result. *)
        catch_deferred (fun () -> lwt_unix_readlink path)
        >>< begin function
        | `Ok destination -> return (`Symlink destination)
        | `Error e -> fail_sys (`File_info path, `Exn e)
        end
      | Lwt_unix.S_CHR -> return `Character_device
      | Lwt_unix.S_BLK -> return `Block_device
      | Lwt_unix.S_FIFO -> return `Fifo
      | Lwt_unix.S_SOCK -> return `Socket
      end
    end

  (** List the entries of the directory [path] as a [`Stream next]:
      each call to [next ()] returns [Some name], or [None] once the
      underlying stream is empty. Exceptions are reported as
      [`System (`List_directory path, `Exn _)]. *)
  let list_directory path =
    let entries = Lwt_unix.files_of_directory path in
    let on_exn e = `System (`List_directory path, `Exn e) in
    let next () =
      wrap_deferred ~on_exn (fun () ->
          Lwt.catch
            (fun () ->
               Lwt.bind (Lwt_stream.next entries)
                 (fun n -> Lwt.return (Some n)))
            (function
              | Lwt_stream.Empty -> Lwt.return None
              | e -> Lwt.fail e))
    in
    `Stream next

  (** Remove [path]: anything that is not a directory is unlinked (a
      symbolic link is removed, not followed), a directory is emptied
      recursively and then removed. A missing path is not an error. *)
  let remove path =
    let rec remove_aux path =
      file_info path
      >>= begin function
      | `Absent -> return ()
      | `Directory ->
        let `Stream next_dir = list_directory path in
        let rec empty_directory () =
          next_dir ()
          >>= begin function
          | None -> return ()
          | Some ("." | "..") -> empty_directory ()
          | Some name ->
            remove_aux (Filename.concat path name)
            >>= fun () ->
            empty_directory ()
          end
        in
        empty_directory ()
        >>= fun () ->
        wrap_deferred_system (`Remove path) (fun () -> Lwt_unix.rmdir path)
      | `Regular_file _
      | `Symlink _
      | `Block_device
      | `Character_device
      | `Fifo
      | `Socket ->
        wrap_deferred_system (`Remove path) (fun () -> Lwt_unix.unlink path)
      end
    in
    remove_aux path
    >>< begin function
    | `Ok () -> return ()
    (* NOTE(review): nothing in [remove_aux] seems to produce
       [`System_exn _]; this case looks like a left-over. *)
    | `Error (`System_exn e) -> fail (`System (`Remove path, `Exn e))
    | `Error (`System e) -> fail (`System e)
    end

  (** Create the symbolic link [link_path] pointing at [target].
      [EEXIST] is reported as [`File_exists link_path], any other
      exception as [`Exn _]. *)
  let make_symlink ~target ~link_path =
    let on_exn = function
      | Unix.Unix_error (Unix.EEXIST, _, _) ->
        `System (`Make_symlink (target, link_path), `File_exists link_path)
      | e ->
        `System (`Make_symlink (target, link_path), `Exn e)
    in
    wrap_deferred ~on_exn (fun () -> Lwt_unix.symlink target link_path)

  type file_destination = [
    | `Into of string
    | `Onto of string
  ]

  (** Actual path designated by [dst] for the source [src]: [`Onto p] is
      [p] itself, [`Into dir] is the base name of [src] inside [dir]. *)
  let path_of_destination ~src ~dst =
    match dst with
    | `Onto p -> p
    | `Into dir -> Filename.concat dir (Filename.basename src)

  (** Copy [src] to the destination [dst], recursively for directories.

      - [ignore_strange] (default [false]): silently skip devices, fifos
        and sockets instead of failing with [`Wrong_file_kind _].
      - [symlinks] (default [`Fail]): fail on symbolic links, [`Follow]
        them, or [`Redo] a link with the same target.
      - [buffer_size] (default [64_000]): size used for reading and
        writing regular files.
      - [if_exists] (default [`Fail]): what to do when the destination
        already exists.

      Errors that are not already [`System _] ones are reported as
      [`System (`Copy src, _)]. *)
  let copy
      ?(ignore_strange=false) ?(symlinks=`Fail) ?(buffer_size=64_000)
      ?(if_exists=`Fail)
      ~src dst =
    let rec copy_aux ~src ~dst =
      file_info src
      >>= begin function
      | `Absent -> fail (`File_not_found src)
      | `Block_device
      | `Character_device
      | `Fifo
      | `Socket as k ->
        if ignore_strange then return () else fail (`Wrong_file_kind (src, k))
      | `Symlink content ->
        begin match symlinks with
        | `Fail -> fail (`Wrong_file_kind (src, `Symlink content))
        | `Follow ->
          (* The target of a symbolic link is relative to the directory
             that contains the link, not to the current directory. *)
          let target =
            if Filename.is_relative content
            then Filename.concat (Filename.dirname src) content
            else content in
          (* NOTE(review): with [`Into _] the copy is named after the
             target, not after the link; confirm this is intended. *)
          copy_aux ~src:target ~dst
        | `Redo ->
          let link_path = path_of_destination ~src ~dst in
          begin match if_exists with
          | `Fail -> (* make_symlink already fails on existing files *)
            return ()
          | `Overwrite
          | `Update -> remove link_path (* remove does not fail on missing files *)
          end
          >>= fun () ->
          make_symlink ~target:content ~link_path
        end
      | `Regular_file _->
        let output_path = path_of_destination ~src ~dst in
        let open_spec =
          match if_exists with
          | `Fail -> `Create_file output_path
          | `Overwrite | `Update -> `Overwrite_file output_path
        in
        IO.with_out_channel ~buffer_size open_spec ~f:(fun outchan ->
            IO.with_in_channel ~buffer_size (`File src) ~f:(fun inchan ->
                (* Pump chunks until [IO.read] returns the empty string. *)
                let rec loop () =
                  IO.read ~count:buffer_size inchan
                  >>= begin function
                  | "" -> return ()
                  | buf ->
                    IO.write outchan buf >>= fun () ->
                    loop ()
                  end
                in
                loop ()))
      | `Directory ->
        let new_dir = path_of_destination ~src ~dst in
        file_info new_dir
        >>= begin function
        | `Absent ->
          make_new_directory new_dir
        | smth_else ->
          begin match if_exists with
          | `Fail -> fail (`File_exists new_dir)
          | `Overwrite ->
            remove new_dir
            >>= fun () ->
            make_new_directory new_dir
          | `Update ->
            if smth_else = `Directory
            then return ()
            else fail (`Not_a_directory new_dir)
          end
        end
        >>= fun () ->
        let `Stream next_dir = list_directory src in
        let rec loop () =
          next_dir ()
          >>= begin function
          | Some ".."
          | Some "." -> loop ()
          | Some name ->
            copy_aux
              ~src:(Filename.concat src name)
              ~dst:(`Into new_dir)
            >>= fun () ->
            loop ()
          | None -> return ()
          end
        in
        loop ()
      end
    in
    (copy_aux ~src ~dst
     >>< begin function
     | `Ok () -> return ()
     | `Error err ->
       begin match err with
       | `IO (`Exn e) -> fail (`System (`Copy src, `Exn e))
       | `IO (`File_exists _)
       | `IO (`Wrong_path _)
       | `File_exists _
       | `File_not_found _
       | `Not_a_directory _
       | `Wrong_file_kind _ as e -> fail (`System (`Copy src, e))
       | `System e -> fail (`System e)
       end
     end)

  (** Try to move [src] with [Lwt_unix.rename]: return [`Moved] on
      success and [`Must_copy] when the rename fails with [EXDEV] or
      [ENOTEMPTY]. With [if_exists] set to [`Fail] (default) an existing
      destination is reported as [`File_exists _]. *)
  let move_in_same_device ?(if_exists=`Fail) ~src dst =
    let real_dest = path_of_destination ~src ~dst in
    let check_destination () =
      match if_exists with
      | `Fail ->
        file_info real_dest
        >>= begin function
        | `Absent -> return ()
        | _ -> fail (`System (`Move src, `File_exists real_dest))
        end
      | _ -> (* Unix.rename does overwriting *) return ()
    in
    let rename () =
      Lwt.bind (Lwt_unix.rename src real_dest)
        (fun () -> Lwt.return (`Ok `Moved))
    in
    let on_rename_failure = function
      | Unix.Unix_error ((Unix.EXDEV | Unix.ENOTEMPTY), _, _) ->
        return `Must_copy
      | e -> fail (`System (`Move src, `Exn e))
    in
    check_destination ()
    >>= fun () ->
    Lwt.catch rename on_rename_failure

  (** Move [src] to [dst]: first try [move_in_same_device] and, when it
      answers [`Must_copy], [copy] the source and then [remove] it. *)
  let move ?ignore_strange ?symlinks ?buffer_size ?if_exists ~src dst =
    move_in_same_device ?if_exists ~src dst
    >>= begin function
    | `Must_copy ->
      copy ?ignore_strange ?symlinks ?buffer_size ?if_exists ~src dst
      >>= fun () ->
      remove src
    | `Moved -> return ()
    end


  (* Hierarchy of files: [`Node (name, children)] is a directory and
     [`Leaf (name, info)] any other kind of entry. *)
  type file_tree = [
    | `Node of string * file_tree list
    | `Leaf of string * file_info
  ]

  (** Build the [file_tree] rooted at [path]. Directory entries are
      visited in [String.compare] order. With [follow_symlinks] (default
      [false]) a symbolic link is replaced by the tree of its target,
      reported under the name of the link.

      A missing path fails with
      [`System (`File_tree path, `File_not_found _)]. *)
  let file_tree ?(follow_symlinks=false) path =
    let directory p l = return (`Node (p, l)) in
    let file p l = return (`Leaf (p, l)) in
    let rec find_aux ?name_to_report path =
      let name =
        match name_to_report with
        | Some s -> s
        | None -> Filename.basename path in
      file_info path
      >>= begin function
      | `Absent -> fail (`File_not_found path)
      | `Block_device
      | `Character_device
      | `Fifo
      | `Regular_file _
      | `Socket as k -> file name k
      | `Symlink content as k ->
        begin match follow_symlinks with
        | true ->
          (* The target of a symbolic link is relative to the directory
             that contains the link, not to the link itself. *)
          let continue =
            if Filename.is_relative content
            then (Filename.concat (Filename.dirname path) content)
            else content in
          (* Keep the name already chosen, so that a chain of links is
             still reported under the name of its first link. *)
          find_aux ~name_to_report:name continue
        | false ->
          file name k
        end
      | `Directory ->
        let `Stream next_dir = list_directory path in
        let rec loop acc =
          next_dir ()
          >>= begin function
          | Some ".."
          | Some "." -> loop acc
          | Some name -> loop (name :: acc)
          | None -> return acc
          end
        in
        loop []
        >>= fun sub_list ->
        List.fold_left
          ~init:(return []) (List.sort ~cmp:String.compare sub_list)
          ~f:(fun prev dir ->
              prev >>= fun l ->
              find_aux (Filename.concat path dir)
              >>= fun newone ->
              return (newone :: l))
        >>| List.rev
        >>= fun sub_tree ->
        directory name sub_tree
      end
    in
    (find_aux path
     >>< function
     | `Ok o -> return o
     | `Error e ->
       begin match e with
       | `Io_exn e -> fail (`System (`File_tree path, `Exn e))
       | `File_not_found _ as e -> fail (`System (`File_tree path, e))
       | `System e -> fail (`System e)
       end)

  (** Human-readable description of a {!file_info}. *)
  let file_info_to_string info =
    match info with
    | `Regular_file size -> sprintf "Regular file (size %d B)" size
    | `Symlink target -> sprintf "Sym-link to %S" target
    | `Directory -> "Directory"
    | `Absent -> "Absent"
    | `Block_device -> "Block device"
    | `Character_device -> "Character device"
    | `Fifo -> "FIFO"
    | `Socket -> "Socket"


  (** Human-readable description of a [`System _] or [`Shell _] error. *)
  let error_to_string error =
    (* What the module was doing when the error happened. *)
    let operation = function
      | `Copy s -> sprintf "copying %S" s
      | `Move s -> sprintf "moving %S" s
      | `Remove s -> sprintf "removing %S" s
      | `File_info s -> sprintf "getting info on %S" s
      | `File_tree s -> sprintf "getting file tree from %S" s
      | `List_directory s -> sprintf "listing directory %S" s
      | `Make_directory s -> sprintf "making directory %S" s
      | `Make_symlink (s1, s2) ->
        sprintf "Making symlink from target %S to %s" s1 s2
    in
    (* What went wrong. *)
    let cause = function
      | `Exn e -> sprintf "Exception %s" (exn e)
      | `IO _ as e -> sprintf "I/O Error %S" (IO.error_to_string e)
      | `Already_exists -> "Already exists"
      | `File_exists s -> sprintf "File exists (%S)" s
      | `File_not_found s -> sprintf "File not found (%S)" s
      | `Not_a_directory s -> sprintf "Not a directory (%S)" s
      | `Wrong_path s -> sprintf "Wrong path (%S)" s
      | `Wrong_access_rights o -> sprintf "wrong access rights: 0o%o" o
      | `Wrong_file_kind (s, k) ->
        sprintf "Wrong kind of file (%S: %s)" s (file_info_to_string k)
    in
    match error with
    | `Shell (cmd, err) ->
      sprintf "Shell command %S failed: %s" cmd
        (Shell.status_to_string err)
    | `System (where, what) ->
      sprintf "System error while %s: %s" (operation where) (cause what)

end

module Deferred_list = struct

  (** [while_sequential l ~f] applies [f] to the elements of [l] one
      after the other, in list order, and stops at the first error.

      Returns the list of results (in input order) if all calls
      succeed, or the first error; the elements following a failing one
      are never passed to [f]. Exceptions raised by [f] are not
      intercepted here. *)
  let while_sequential:
    'a list -> f:('a -> ('c, 'b) t) -> ('c list, 'b) t
    = fun l ~f ->
      (* [>>=] only calls its continuation on [`Ok], so the loop is left
         as soon as one call to [f] yields an [`Error]. *)
      let rec loop acc = function
      | [] -> return (List.rev acc)
      | elt :: rest ->
        f elt >>= fun o ->
        loop (o :: acc) rest
      in
      loop [] l

  (** [for_sequential l ~f] applies [f] to every element of [l], one
      after the other, and keeps going when a call fails.

      Returns the list of successes and the list of errors, each in
      input order. The result itself is always [`Ok]. *)
  let for_sequential:
    'a list -> f:('a -> ('c, 'b) t) -> ('c list * 'b list, 'd) t
    = fun l ~f ->
      (* Both accumulators are built in reverse and flipped at the end. *)
      let rec loop oks errors = function
      | [] -> return (List.rev oks, List.rev errors)
      | elt :: rest ->
        f elt >>< function
        | `Ok o -> loop (o :: oks) errors rest
        | `Error e -> loop oks (e :: errors) rest
      in
      loop [] [] l

  (** [for_concurrent l ~f] is like {!for_sequential} but the calls to
      [f] are launched through [Lwt_list.map_p] instead of being chained
      one after the other.

      Returns the list of successes and the list of errors. Both lists
      follow the order of [l], whatever the order in which the threads
      finish. The result itself is always [`Ok]. *)
  let for_concurrent:
    'a list -> f:('a -> ('c, 'b) t) -> ('c list * 'b list, 'd) t
    = fun l ~f ->
      (* [Lwt_list.map_p] hands the outcomes back in input order; they
         are split afterwards rather than collected from inside the
         concurrently running callbacks. *)
      Lwt.bind (Lwt_list.map_p f l) (fun results ->
          let oks, errors =
            List.fold_left results ~init:([], [])
              ~f:(fun (oks, errors) result ->
                  match result with
                  | `Ok o -> (o :: oks, errors)
                  | `Error e -> (oks, e :: errors))
          in
          return (List.rev oks, List.rev errors))


  (** Like {!for_concurrent} but [f] also receives the position of the
      element in [l], as numbered by [List.mapi]. *)
  let for_concurrent_with_index l ~f =
    let with_indexes = List.mapi l ~f:(fun i a -> (i, a)) in
    for_concurrent with_indexes ~f:(fun (i, a) -> f i a)

  (** [pick_and_cancel l] is [Lwt.pick l]: the result of the first
      thread of [l] to terminate; see the documentation of [Lwt.pick]
      for what happens to the other threads and for the case of an
      empty list. *)
  let pick_and_cancel: ('a, 'error) t list -> ('a, 'error) t = fun l ->
    Lwt.pick l

end

module Light = struct

  (** A two-colour light that Lwt threads can wait on.

      [lwt_t] is the task on which threads sleep while the light is
      [`Red], [lwt_u] is the resolver of that task, and [color] is the
      current state of the light. *)
  type t = {
    mutable lwt_t: unit Lwt.t;
    mutable lwt_u: unit Lwt.u;
    mutable color: [`Red | `Green];
  }

  (** Create a light; it starts [`Red], with a fresh task. *)
  let create () =
    let sleeper, waker = Lwt.task () in
    {lwt_t = sleeper; lwt_u = waker; color = `Red}

  (** [try_to_pass light] returns immediately when the light is
      [`Green]. When it is [`Red], it waits on the light's task and
      returns once that task terminates, either normally or with
      [Lwt.Canceled] (which is how {!green} wakes the sleepers up).

      @raise Failure if the task fails with any other exception. *)
  let try_to_pass light =
    (* Install a new task when the current one is not sleeping any more,
       so that there is something to wait on. *)
    let renew_task_if_needed () =
      match Lwt.state light.lwt_t with
      | Lwt.Sleep -> ()
      | Lwt.Return () | Lwt.Fail _ ->
        let sleeper, waker = Lwt.task () in
        light.lwt_t <- sleeper;
        light.lwt_u <- waker
    in
    match light.color with
    | `Green -> return ()
    | `Red ->
      renew_task_if_needed ();
      wrap_deferred ~on_exn:(fun e -> e) (fun () -> light.lwt_t)
      >>< begin function
      | `Ok () | `Error Lwt.Canceled -> return ()
      | `Error _unexpected -> failwith "BUG: THIS SHOULD NOT HAPPEN"
      end

  (** [green light] switches the light to [`Green] and wakes up the
      threads sleeping on it, by failing the task with [Lwt.Canceled]. *)
  let green light =
    light.color <- `Green;
    Lwt.wakeup_exn light.lwt_u Lwt.Canceled
  (* We use Lwt.Canceled so that we can re-wake-up sleepers at will;
     see https://github.com/ocsigen/lwt/blob/master/src/core/lwt.ml#L312
     where Lwt.Canceled is ignored. *)


end