Source

dumbstreaming / dumbstreaming_it.ml

(* Formats a message printf-style, writes it to stderr exactly as
   formatted (no prefix, no newline added) and flushes stderr. *)
value dbgn fmt =
  let emit text = Printf.eprintf "%s%!" text in
  Printf.ksprintf emit fmt
;
(* Formats a message printf-style and writes it to stderr as one debug
   line: prefixed with DBG: and followed by a newline, then flushes. *)
value dbg fmt =
  let emit_line text = Printf.eprintf "DBG: %s\n%!" text in
  Printf.ksprintf emit_line fmt
;
(*
value dbg fmt = Printf.ifprintf stdout fmt
;
*)

(* Application operator: [f & x] is [f x].  Used in this file to pass
   a compound expression as the argument without wrapping it in
   parentheses. *)
value ( & ) = fun func arg -> func arg
;

open Ds_types
;


module Make (I : IT_TYPE)
 :
  sig
    (* [read] yields None on EOF (or when the next char is a line feed, 0x0A) *)

    value read :
      ~get_piece_it :
       ( ~totalsize:int64 ->
         ~totalcount:int ->
         ~piecesize:int ->
         ~piecenumber:int ->
         I.iteratee char 'i
       )
      -> ~combine_it : (I.iteratee 'i 'a)
      -> I.iteratee char (option 'a)
    ;

    value read_lengths : int -> I.iteratee char (list int)
    ;

  end
 =
  struct

    (* Shorthand: wraps continuation [cont] into an [I.IE_cont] whose
       first field is [None]. *)
    value ie_cont cont = I.IE_cont None cont
    ;

    open Printf
    ;

    module IO = I.It_IO;
    open I.Ops;

    (* Two-iteratee convenience wrapper: passes both iteratees, in order,
       as a list to [I.itlist_anyresult_lasterror]. *)
    value itpair_anyresult_lasterror first second =
      let both = [first; second] in
      I.itlist_anyresult_lasterror both
    ;

    (* Error reported by the parsers of this module (via ds_error) when
       the input does not match the expected format; the payload is the
       formatted description of the problem. *)
    exception Dumbstreaming of string
    ;

    (* Builds a failing iteratee from a printf-style format: the
       formatted text is wrapped in the Dumbstreaming exception and
       handed to [I.throw_err]. *)
    value ds_error fmt =
      let fail msg = I.throw_err (Dumbstreaming msg) in
      Printf.ksprintf fail fmt
    ;

    (* Expects the next character of the stream to be [c].
       Peeks at the stream: when the next character equals [c], one
       element is dropped via [I.drop 1]; when it is a different
       character, or the stream is at EOF, the result is a Dumbstreaming
       error naming the expected and the found item.
       Characters are compared with structural equality [=] rather than
       physical equality [==]; for chars both agree, but [=] states the
       intent. *)
    value read_the_char c =
      let err txt = ds_error "read_char: %C expected, %s found" c txt in
      I.peek >>= fun
      [ Some c' ->
          if c = c'
          then I.drop 1
          else err & sprintf "%C" c'
      | None ->
          err "eof"
      ]
    ;

    (* read_space: expects a single space character (0x20).
       read_eol: expects an end of line, i.e. an optional carriage
       return followed by a mandatory line feed.  The first step fails
       on EOF, junks a carriage return if one is next and otherwise
       leaves the stream alone; the second step fails on EOF or on any
       character other than a line feed, which it junks. *)
    value read_space = read_the_char '\x20'
      and read_eol =
        let skip_optional_cr =
          I.peek >>= fun
          [ None -> ds_error "expected EOL (\\r?\\n), found EOF"
          | Some '\r' -> I.junk
          | _ -> I.return ()
          ]
        and require_lf () =
          I.peek >>= fun
          [ None -> ds_error "expected EOL (\\n), found EOF"
          | Some '\n' -> I.junk
          | Some c -> ds_error "expected EOL (\\n), found %C" c
          ]
        in
        skip_optional_cr >>= require_lf
    ;


    (* Returns, inside the IO monad, the pair made of a finished
       iteratee ([I.IE_done]) holding [result] and the stream [rest]
       passed in unchanged. *)
    value ie_doneM
     (result : 'a)
     (rest : I.stream 'el)
     : IO.m (I.iteratee 'el 'a  *  I.stream 'el)
     =
      let finished = I.IE_done result in
      IO.return (finished, rest)
    ;


    (* Reads the data part of a packet: one piece per element of [lens],
       each piece followed by an EOL, and one more EOL after the last
       piece.
       While [combine_it] is an [I.IE_cont None k], every piece is read
       by the iteratee obtained from [get_piece_it] (limited to the
       piece size with [I.take_or_fail]) and the piece result is fed to
       [k] as a one-element chunk, giving the next [combine_it].
       Once [combine_it] is [I.IE_done _] or [I.IE_cont (Some _) _]
       (presumably: finished or failed -- confirm against the iteratee
       library), the remaining pieces are skipped with [I.drop].
       Pieces are numbered from 0 in [piecenumber].
       The final result is taken from [combine_it] through
       [I.joinI (I.return combine_it)]. *)
    value (read_data_lens :
      ~get_piece_it :
       ( ~totalsize:int64 -> ~totalcount:int ->
         ~piecesize:int -> ~piecenumber:int ->
         I.iteratee char 'i
       )
      -> ~combine_it : (I.iteratee 'i 'a)
      -> ~totalsize : int64
      -> ~lens : list int
      -> ~totalcount : int
      -> I.iteratee char 'a
    )
      ~get_piece_it
      ~combine_it
      ~totalsize
      ~lens
      ~totalcount
     =
      let rec loop ~combine_it ~lens ~piecenumber
       : I.iteratee char 'a
       =
        match lens with
        [ [] ->
            (* no pieces left: closing EOL, then extract the result *)
            read_eol >>= fun () ->
            I.joinI (I.return combine_it)
        | [len :: rest] ->
            let continue_with next_it =
              loop ~combine_it:next_it ~lens:rest
                ~piecenumber:(piecenumber + 1)
            in
            match combine_it with
            [ I.IE_done _ | I.IE_cont (Some _) _ ->
                (* combiner takes no more input: skip this piece *)
                I.drop len >>= fun () ->
                read_eol >>= fun () ->
                continue_with combine_it
            | I.IE_cont None k ->
                let piece_it : I.iteratee char 'i = get_piece_it
                  ~totalsize ~piecenumber ~piecesize:len ~totalcount in
                I.joinI (I.take_or_fail len piece_it) >>= fun piece_res ->
                read_eol >>= fun () ->
                I.lift (I.feedI k (I.chunk_of piece_res)) >>= fun next_it ->
                continue_with next_it
            ]
        ]
      in
        loop ~combine_it ~lens ~piecenumber:0
    ;


    (* Validates the piece lengths against the declared total and then
       reads the pieces with read_data_lens.
       Fails with a Dumbstreaming error when [totalsize] differs from
       the sum of [lens] (summed as int64).  Otherwise the number of
       pieces is passed on as [totalcount]. *)
    value read_data ~get_piece_it ~combine_it ~totalsize ~lens =
      let add_len acc len = Int64.add acc (Int64.of_int len) in
      let sum_lens = List.fold_left add_len 0L lens in
      if totalsize = sum_lens
      then
        read_data_lens
          ~get_piece_it ~combine_it ~totalsize
          ~totalcount:(List.length lens) ~lens
      else
        ds_error
          "total length (%Li) is not equal to sum of pieces' lengths (%Li)"
          totalsize sum_lens
    ;


    (* Reads piece lengths, each written as a space followed by an
       unsigned integer, and returns them as a list.
       The single-length reader is turned into a stream of ints with
       [I.sequence_stream]; that stream is consumed by
       [I.take count I.stream2list], so at most [count] lengths are
       collected (presumably fewer when the input runs out -- the
       caller checks the resulting list length). *)
    value read_lengths count : I.iteratee char (list int) =
      let single_len : I.iteratee char int =
        read_space >>= fun () -> I.read_uint
      in
      let over_lens = I.sequence_stream single_len in
      let first_count : I.iteratee int (list int) =
        I.joinI (I.take count I.stream2list)
      in
      I.joinI (over_lens first_count)
    ;


    (* Converts an int64 to a native int inside the iteratee monad.
       [msg] identifies the caller and is included in the error text.
       Fails with a Dumbstreaming error when [x] lies outside the range
       [min_int .. max_int] of the native int type; otherwise returns
       [Int64.to_int x].
       The bounds are computed once, when the value is defined.
       [max_int] and [min_int] are referenced unqualified: the
       [Pervasives] module is deprecated and absent from OCaml 5, while
       the unqualified names resolve on every compiler version. *)
    value int_of_int64 =
      let upper = Int64.of_int max_int
      and lower = Int64.of_int min_int in
      fun msg x ->
        if Int64.compare x lower < 0 || Int64.compare x upper > 0
        then ds_error "int_of_int64: %s" msg
        else I.return (Int64.to_int x)
    ;

    (* Reads one packet, starting at its length field.
       The packet begins with an unsigned 64-bit length, and the
       character after it selects the layout:
       - line feed (0x0A): single piece; the packet length, converted
         to a native int, is the only piece length;
       - space (0x20): a piece count, then that many lengths (see
         read_lengths), then an EOL; fewer lengths than announced is a
         Dumbstreaming error.
       Any other character, or EOF, is a Dumbstreaming error.
       In both layouts the data itself is read by read_data with the
       packet length as [totalsize]. *)
    value read_some ~get_piece_it ~combine_it : I.iteratee _ _ =
      I.read_uint64 >>= fun packet_length ->
      let read_pieces lens =
        read_data ~get_piece_it ~combine_it ~totalsize:packet_length ~lens
      in
      let single_piece () =
        int_of_int64 "read_ver1" packet_length >>= fun piece_len ->
        read_pieces [piece_len]
      in
      let multi_piece () =
        I.read_uint >>= fun count ->
        let () = assert (count >= 0) in
        read_lengths count >>= fun lengths ->
        let got = List.length lengths in
        if got >= count
        then
          ( let () = assert (count = got) in
            read_eol >>= fun () ->
            read_pieces lengths
          )
        else
          ds_error "too few pieces' lengths: expected %i, got %i"
                   count got
      in
      I.peek >>= fun
      [ Some '\x20' -> I.junk >>= multi_piece
      | Some '\x0A' -> I.junk >>= single_piece
      | Some _ | None -> ds_error
          "space or \\x0A expected after packet length"
      ]
    ;


    (* Entry point: reads at most one packet.
       Peeks at the stream first.  On EOF, or when the next character
       is a line feed (0x0A), nothing is read and the result is [None].
       Otherwise one packet is read with read_some and its combined
       result is returned wrapped in [Some]. *)
    value (read :
      ~get_piece_it :
       ( ~totalsize:int64 ->
         ~totalcount:int ->
         ~piecesize:int ->
         ~piecenumber:int ->
         I.iteratee char 'i
       )
      -> ~combine_it : (I.iteratee 'i 'a)
      -> I.iteratee char (option 'a)
    )
     ~get_piece_it ~combine_it
     =
      let read_packet () =
        read_some ~get_piece_it ~combine_it >>= fun payload ->
        I.return (Some payload)
      in
      I.peek >>= fun
      [ Some '\x0A' | None -> I.return None
      | Some _ -> read_packet ()
      ]
    ;


  end
;