(* Source *)

(* dumbstreaming / dumbstreaming_it.ml *)

(* Signature of the iteratee implementation that the [Make] functor is
   parameterised over.  It declares an IO-like effect type, an abstract
   stream type, the iteratee type itself, and a bind on iteratees. *)
module type IT_TYPE
 =
  sig
    (* Effect type in which iteratee continuations run. *)
    module It_IO
     :
      sig
        (* Abstract effect type, covariant in its result. *)
        type m +'a;
        (* Wraps a plain value into [m]. *)
        value return : 'a -> m 'a;
        (* Bind with the effectful value first and the continuation
           second. *)
        value bind_rev : m 'a -> ('a -> m 'b) -> m 'b;
        (* Builds an [m] value from an exception.
           NOTE(review): presumably a failed computation - confirm against
           the implementation. *)
        value error : exn -> m 'a;
      end
    ;

    (* Abstract stream of elements of type ['el]. *)
    type stream 'el;
    (* A stream value available for every element type.
       NOTE(review): by its name, a stream with no elements - confirm. *)
    value empty_stream : stream 'el;

    (* Errors are represented as ordinary exceptions. *)
    type err_msg = exn;

    (* An iteratee over elements ['el] with result ['a]:
       - [IE_done] carries a result;
       - [IE_cont] carries an optional error and a continuation that takes
         a stream and returns, inside [It_IO.m], an iteratee together
         with a stream.
       NOTE(review): the returned stream is presumably the unconsumed
       remainder - confirm. *)
    type iteratee 'el 'a =
      [ IE_done of 'a
      | IE_cont of option err_msg and
          ( stream 'el
            -> It_IO.m (iteratee 'el 'a * stream 'el)
          )
      ]
    ;

    (* Bind on iteratees: takes an iteratee and a function from its result
       to another iteratee over the same element type. *)
    value bindI :
      iteratee 'el 'a -> ('a -> iteratee 'el 'b) -> iteratee 'el 'b
    ;

  end
;


(* Integer aliases that name the three arguments of the piece-reading
   callback taken by [read] in the [Make] functor.
   NOTE(review): the meanings are not shown in this file; judging by the
   names only, they look like a total count, a part count or index, and a
   byte count - confirm against the callers. *)
type ntotal = int
 and npart = int
 and nbytes = int
;


module Make (I : IT_TYPE)
 :
  sig
    (* [read] yields None on EOF *)

    value read : (ntotal -> npart -> nbytes -> I.iteratee char 'i)
              -> I.iteratee 'i 'a
              -> I.iteratee char (option 'a)
    ;
  end
 =
  struct

    module It_add
     :
      sig
        value itlist_anyresult_lasterror
         : list (I.iteratee 'el 'a) -> I.iteratee 'el 'a
        ;
      end
     =
      struct

        (* Reported as the error when the list-stepping function below is
           given an empty list of iteratees. *)
        exception Itlist_empty;

        (* Short alias for the effect module of the functor argument. *)
        module IO = I.It_IO;

        (* Plain function application as an infix operator; used to avoid
           parentheses around the argument. *)
        value ( & ) f x = f x;

        (* Infix form of [IO.bind_rev]: effectful value on the left,
           continuation on the right. *)
        value ( >>% ) = IO.bind_rev;

        (* Brings the iteratee and stream types, their constructors and
           [bindI] into scope unqualified. *)
        open I;
        (* Infix form of [bindI].  It is not used by the definitions of this
           module visible here, and the module signature does not export
           it. *)
        value ( >>= ) = bindI;
 
        (* Performs one step over a list of iteratees, offering the same
           chunk [s] to each running one in list order.

           The outcome, inside [IO.m], is one of:
           - `First_result (it, rest): the first finished iteratee met.  If
             it was already [IE_done] in the input list, [rest] is [s]
             itself; if it finished after being fed, [rest] is the stream
             its continuation returned.  Iteratees after it are not fed.
           - `Cont its: nobody finished; [its] are the iteratees still
             running without error, in their original order.  The streams
             returned by their continuations are discarded.
           - `Last_error e: no iteratee is left running; [e] is the error
             of the last failed iteratee seen, or [Itlist_empty] when the
             input list was empty.

           Iteratees in the error state (before or after being fed) are
           dropped from the list. *)
        value itlist_step_firstresult_lasterror
          (lst : list (iteratee 'el 'a))
          (s : stream 'el)
         :
          IO.m [= `First_result of (iteratee 'el 'a * stream 'el)
               |  `Last_error of err_msg
               |  `Cont of list (iteratee 'el 'a)
               ]
         =
          (* [last_err]: most recent error seen so far;
             [pending]: still-running iteratees, accumulated in reverse;
             [rest]: iteratees not examined yet. *)
          let rec walk last_err pending rest =
            match rest with
            [ [it :: rest'] ->
                match it with
                [ IE_done _ ->
                    IO.return (`First_result (it, s))
                | IE_cont (Some _ as err) _ ->
                    walk err pending rest'
                | IE_cont None k ->
                    k s >>% fun
                    [ (IE_done _, _) as finished ->
                        IO.return (`First_result finished)
                    | (IE_cont (Some _ as err) _, _) ->
                        walk err pending rest'
                    | ((IE_cont None _ as it'), _) ->
                        walk last_err [it' :: pending] rest'
                    ]
                ]
            | [] ->
                match (pending, last_err) with
                [ ([_ :: _], _) ->
                    IO.return (`Cont (List.rev pending))
                | ([], Some err) ->
                    IO.return (`Last_error err)
                | ([], None) ->
                    (* Unreachable: [walk] only starts on a non-empty list,
                       and every element either lands in [pending] or sets
                       [last_err]. *)
                    assert False
                ]
            ]
          in
            match lst with
            [ [] -> IO.return (`Last_error Itlist_empty)
            | [_ :: _] -> walk None [] lst
            ]
        ;

        (* Returns [Some it] for the first iteratee of [lst] that is already
           [IE_done], or [None] when the list holds no finished iteratee
           (including when it is empty).

           Fix: the [IE_cont] case used to recurse on the whole list instead
           of its tail, so the search never terminated as soon as the first
           element was an [IE_cont].  It now moves on to the tail. *)
        value rec get_any_done lst =
          match lst with
          [ [] -> None
          | [ ((IE_done _) as x) :: _ ] -> Some x
          | [ (IE_cont _ _) :: tl ] -> get_any_done tl
          ]
        ;

        (* Wraps the continuation [step] into an iteratee that is waiting
           for input and carries no error. *)
        value ie_cont step =
          let no_error = None in
          IE_cont no_error step
        ;

        (* Effectful variant of the above: returns, inside [IO.m], the
           error-free waiting iteratee built from [step], paired with
           [empty_stream]. *)
        value ie_contM step =
          let suspended = IE_cont None step in
          IO.return (suspended, empty_stream)
        ;


        (* Combines a list of iteratees into a single one.

           If some iteratee of [lst] is already finished, that iteratee is
           returned as is.  Otherwise the result is a waiting iteratee that,
           on each chunk, runs [itlist_step_firstresult_lasterror] over the
           iteratees still running:
           - on `First_result the finished iteratee and its stream are
             returned;
           - on `Last_error the error is raised through [IO.error];
           - on `Cont it keeps waiting with the remaining iteratees,
             returning [empty_stream] as the leftover. *)
        value itlist_anyresult_lasterror
          (lst : list (iteratee 'el 'a))
         :
          iteratee 'el 'a
         =
          let rec advance pending chunk =
            let stepped = itlist_step_firstresult_lasterror pending chunk in
            stepped >>% fun
            [ `Cont remaining -> ie_contM (advance remaining)
            | `First_result finished -> IO.return finished
            | `Last_error err -> IO.error err
            ]
          in
          match get_any_done lst with
          [ None -> ie_cont & advance lst
          | Some finished -> finished
          ]
        ;

      end
    ;

    open It_add
    ;

    (* Two-iteratee convenience wrapper: combines [it1] and [it2], in that
       order, with [itlist_anyresult_lasterror]. *)
    value itpair_anyresult_lasterror it1 it2 =
      let both = [it1; it2] in
      itlist_anyresult_lasterror both
    ;

    (* Placeholder: the body below is a lone dot, which is not a valid
       expression, so the implementation is missing from this file.
       NOTE(review): presumably meant to read one packet using
       [get_piece_it] and [combine_it] - confirm and supply the body. *)
    value read_packet get_piece_it combine_it =
      .
    ;



    (* Pseudo-code sketch, not compilable as written:
       - the slash-star markers are not OCaml comments; they appear to name
         the combinator meant to be applied between the operands on either
         side;
       - [some], [mapI] and [read_eos] are not defined in this file;
       - the infix bind used on the last line is defined inside [It_add]
         but is not exported by its signature, so it is unbound here.
       NOTE(review): the apparent intent is to wrap the result of
       [read_packet] in Some, and to combine it through
       [itpair_anyresult_lasterror] with an iteratee that yields None once
       the end of the stream is read - confirm before implementing. *)
    value read get_piece_it combine_it =
      (some /* mapI */ read_packet get_piece_it combine_it)
      /* itpair_anyresult_lasterror */
      (read_eos >>= fun () -> None)
    ;

  end
;