Commits

Dmitry Grebeniuk  committed 86434ca

parsing the stream with iteratees

  • Participants
  • Parent commits 1949161

Comments (0)

Files changed (2)

File dumbstreaming_it.ml

-(*
 value dbg fmt = Printf.eprintf fmt
 ;
-*)
+(*
 value dbg fmt = Printf.ifprintf stdout fmt
 ;
+*)
 
 value ( & ) f x = f x
 ;
       iteratee 'el (iteratee 'el' 'a) -> iteratee 'el 'a
     ;
 
+    value lift : It_IO.m 'a -> iteratee 'el 'a
+    ;
+
     value liftI : It_IO.m (iteratee 'el 'a) -> iteratee 'el 'a
     ;
 
 ;
 
 
-type ntotal = int64
- and npart = int
- and nbytes = int
-;
-
-
 module Make (I : IT_TYPE)
  :
   sig
 
     value read :
       ~get_piece_it :
-       ( ~ntotal:ntotal -> ~npart:npart -> ~nbytes:nbytes
-         -> I.iteratee char 'i
+       ( ~totalsize:int64 ->
+         ~totalcount:int ->
+         ~piecesize:int ->
+         ~piecenumber:int ->
+         I.iteratee char 'i
        )
       -> ~combine_it : (I.iteratee 'i 'a)
       -> I.iteratee char (option 'a)
 
     value (read_data_lens :
       ~get_piece_it :
-       ( ~ntotal:ntotal -> ~npart:npart -> ~nbytes:nbytes
-         -> I.iteratee char 'i
+       ( ~totalsize:int64 -> ~totalcount:int ->
+         ~piecesize:int -> ~piecenumber:int ->
+         I.iteratee char 'i
        )
       -> ~combine_it : (I.iteratee 'i 'a)
-      -> ~ntotal : ntotal
-      -> ~lens : list nbytes
+      -> ~totalsize : int64
+      -> ~lens : list int
+      -> ~totalcount : int
       -> I.iteratee char 'a
     )
       ~get_piece_it
       ~combine_it
-      ~ntotal
+      ~totalsize
       ~lens
+      ~totalcount
      =
-      let rec read_data_lens ~combine_it ~lens ~npart : I.iteratee char 'a =
-        match (lens, (combine_it : I.iteratee _ _)) with
-        [
-(*
- ([], _) ->
+      let rec read_data_lens ~combine_it ~lens ~piecenumber
+       : I.iteratee char 'a
+       =
+        match (lens, combine_it) with
+        [ ([], _) ->
            read_x0A >>= fun () ->
            I.joinI (I.return combine_it)
         | ([len :: lens], (I.IE_done _ | I.IE_cont (Some _) _)) ->
             I.drop len >>= fun () ->
             read_x0A >>= fun () ->
-            read_data_lens ~combine_it ~lens ~npart:(npart + 1)
-        |
-*)
-
- ([len :: _lens], I.IE_cont None k) ->
-            let this_piece_it : I.iteratee char _ = get_piece_it
-              ~ntotal ~npart ~nbytes:len in
-            (I.joinI & I.take len this_piece_it) >>= fun _this_it_framed ->
-            let () = ignore (_this_it_framed = ()) in
-
-(*
-            this_it_framed >>= fun (this_piece_res : _) ->
+            read_data_lens ~combine_it ~lens ~piecenumber:(piecenumber + 1)
+        | ([len :: lens], I.IE_cont None k) ->
+            let this_piece_it : I.iteratee char 'i = get_piece_it
+              ~totalsize ~piecenumber ~piecesize:len ~totalcount in
+            (I.joinI & I.take len this_piece_it) >>= fun this_piece_res ->
             read_x0A >>= fun () ->
 
-            I.liftI (I.feedI k (I.chunk_of this_piece_res))
-              >>= fun _combine_it ->
-*)
+            I.lift (I.feedI k (I.chunk_of this_piece_res))
+              >>= fun combine_it ->
 
- failwith "qq"
-(*
-            read_data_lens ~combine_it ~lens ~npart:(npart + 1)
-*)
-(*
-*)
+            read_data_lens ~combine_it ~lens ~piecenumber:(piecenumber + 1)
         ]
       in
-        read_data_lens ~combine_it ~npart:0 ~lens
+        read_data_lens ~combine_it ~piecenumber:0 ~lens
     ;
 
-(**
 
-**)
-
-
-(*
-    value read_data ~get_piece_it ~combine_it packet_length lens =
+    value read_data ~get_piece_it ~combine_it ~totalsize ~lens =
       let sum_lens = List.fold_left Int64.add 0L
         (List.map Int64.of_int lens) in
-      if packet_length <> sum_lens
+      if totalsize <> sum_lens
       then ds_error
         "total length (%Li) is not equal to sum of pieces' lengths (%Li)"
-          packet_length sum_lens
+          totalsize sum_lens
       else
-        read_data_lens ~get_piece_it ~combine_it ~ntotal:packet_length ~lens
+        let totalcount = List.length lens in
+        read_data_lens
+          ~get_piece_it ~combine_it ~totalsize
+          ~totalcount ~lens
     ;
-*)
 
 
     value read_lengths count : I.iteratee char (list int) =
         else I.return (Int64.to_int x)
     ;
 
-(*
     value read_some ~get_piece_it ~combine_it : I.iteratee _ _ =
       read_uint64 >>= fun packet_length ->
       let read_data_gen = read_data ~get_piece_it ~combine_it
       in
       let read_ver1 () =
         int_of_int64 "read_ver1" packet_length >>= fun pl ->
-        read_data_gen packet_length [pl]
+        read_data_gen ~totalsize:packet_length ~lens:[pl]
 
       and read_ver2 () =
         read_uint >>= fun count ->
         else
         let () = assert (count = List.length lengths) in
         read_x0A >>= fun () ->
-        read_data_gen packet_length lengths
+        read_data_gen ~totalsize:packet_length ~lens:lengths
 
       in
       I.peek >>= fun
           "space or \\x0A expected after packet length"
       ]
     ;
-*)
+
 
     value (read :
       ~get_piece_it :
-       ( ~ntotal:ntotal -> ~npart:npart -> ~nbytes:nbytes
-         -> I.iteratee char 'i
+       ( ~totalsize:int64 ->
+         ~totalcount:int ->
+         ~piecesize:int ->
+         ~piecenumber:int ->
+         I.iteratee char 'i
        )
       -> ~combine_it : (I.iteratee 'i 'a)
       -> I.iteratee char (option 'a)
     )
      ~get_piece_it ~combine_it
      =
-      let () = ignore combine_it in
-      let () = ignore get_piece_it in
       I.peek >>= fun
       [ None | Some '\x0A' -> I.return None
       | Some _ ->
           I.catch
             (fun () ->
-(*
                read_some ~get_piece_it ~combine_it >>= fun (r : _) ->
                I.return (Some r)
-*)
-I.return None
             )
             (fun e -> ds_error "%s" & Printexc.to_string e
             )
       ]
     ;
 
+
   end
 ;
 
 
 
+value gather_to_string = I.break_chars (fun _ -> False)
+;
+
+
 value test_read_lengths count string =
   let stream = I.Chunk (I.Subarray.of_string string) in
   let () = Printf.printf "%S -> %!" string in
   match IO.runIO
     ( (enum1 stream
         (reader >>= fun r ->
-         I.break_chars (fun _ -> False) >>= fun rest ->
+         gather_to_string >>= fun rest ->
          I.return (r, rest)
         )
       )
 ; test_read_lengths 3 " 12 34 56 78 90"
 )
 ;
+
+
+value test_read_string = "\
+12 3 2 4 6\n\
+ab\n\
+cdef\n\
+ghijkl\n\
+\n\
+the-rest"
+;
+
+
+value test_read () =
+  let stream = I.Chunk (I.Subarray.of_string test_read_string) in
+  let ( >>= ) = I.( >>= ) in
+  let it_res : I.iteratee char (option (list string)) =
+    D.read
+      ~get_piece_it :
+        (fun ~totalsize ~totalcount ~piecesize ~piecenumber ->
+           gather_to_string >>= fun str ->
+           let msg = Printf.sprintf "[%i*%Li,#%i*%i]<%s>"
+             totalcount totalsize piecenumber piecesize str in
+           (* let () = Printf.eprintf "get_piece_it: %s\n%!" msg in *)
+           I.return msg
+        )
+      ~combine_it : I.stream2list
+  in
+  match IO.runIO
+    ( (enum1 stream
+        (it_res >>= fun r ->
+         gather_to_string >>= fun rest ->
+         I.return (r, rest)
+        )
+      )
+      >>% I.run
+    )
+  with
+  [ `Ok (r, rest) -> Printf.printf "ok [%s], rest = %S\n%!"
+      (match r with
+       [ None -> "None"
+       | Some r ->
+           (String.concat " + " r)
+       ])
+       rest
+  | `Error e -> Printf.printf "error \"%s\"\n%!"
+      (printexc e)
+  ]
+;
+
+
+value () = test_read ()
+;