Commits

Anonymous committed d2b6e1a

.

  • Participants
  • Parent commits 7bee57e

Comments (0)

Files changed (3)

 
 Protocol description:
 
+Int = '0' | '1'..'9' ['0'..'9']*
+
+  decimal representation of integers without leading zeroes.
+
+Ver1:
+
 Stream = Message* End-of-stream
 
-Message = DIGIT+ \x0A User-message \x0A \x0A
+Message = (Int as len) \x0A
+          User-message \x0A
+          \x0A
 
-  where DIGIT+ is decimal representation of User-message length
-  in bytes, no more than 10 digits, must not begin from '0'.
+  where len is User-message length in bytes.  One should not
+  generate messages with length more than 10 digits.
 
 End-of-stream = \x0A
+
+Ver2:
+
+Stream = Message* End-of-stream
+
+Piece_len = \x20 (Int as piece_len)
+
+Piece_data = User-message \x0A
+
+Message = (Int as total_len) \x20 (Int as pieces_count) Piece_len* \x0A
+          Piece_data*
+          \x0A
+
+  total_len is the length of all piece_len's (i.e. length of all
+  Piece_data elements excluding their \x0A).
+  One should not generate messages with total_len more than 10 digits.
+
+  pieces_count is the count of Piece_len and Piece_data elements, it must
+  be non-negative.
+
+  Every piece_len must be the length of corresponding User-message
+  in bytes.

File dumbstreaming_it.ml

 value dbg fmt = Printf.ifprintf stdout fmt
 ;
 
+value ( & ) f x = f x
+;
+
 module type IT_TYPE
  =
   sig
     ;
 
     value joinI :
-      iteratee 'el (iteratee 'el 'a) -> iteratee 'el 'a
+      iteratee 'el (iteratee 'el' 'a) -> iteratee 'el 'a
     ;
 
     value drop : int -> iteratee 'el unit
     value limit : int -> enumeratee 'el 'el 'a
     ;
 
+    value sequence_stream : iteratee 'elo 'eli
+      -> enumeratee 'elo 'eli 'a
+    ;
+
+    value catch :
+      (unit -> iteratee 'el 'a) ->
+      ( err_msg ->
+         iteratee 'el 'a
+      ) ->
+      iteratee 'el 'a
+    ;
+
+    value take : int -> enumeratee 'el 'el 'a
+    ;
+
+    value stream2list : iteratee 'el (list 'el)
+    ;
+
   end
 ;
 
     (* for tests: *)
     value read_uint_nz : I.iteratee char int;
 
+    value read_lengths : int -> I.iteratee char (list int)
+    ;
+
   end
  =
   struct
         value read_uint_nz : I.iteratee char int
         ;
 
+        (* leading zeroes accepted *)
+        value read_uint64 : I.iteratee char int64
+        ;
+
+        (* leading zeroes = error *)
+        value read_uint64_nz : I.iteratee char int64
+        ;
+
       end
      =
       struct
           end
         ;
 
+        module SInt64_T : SIGNED_INT with type t = int64
+         =
+          struct
+            type t = int64;
+            value max_int = Int64.max_int;
+            value of_int = Int64.of_int;
+            value to_int = Int64.to_int;
+            value ( + ) = Int64.add;
+            value ( - ) = Int64.sub;
+            value ( * ) = Int64.mul;
+            value ( / ) = Int64.div;
+            value ( <? ) a b = (Int64.compare a b) < 0;
+            value ( =? ) a b = (Int64.compare a b) = 0;
+          end
+        ;
+
         exception SInt_overflow;
         exception SInt_not_a_number of string;
 
         module SInt = SInt_F(SInt_T)
         ;
 
+        module SInt64 = SInt_F(SInt64_T)
+        ;
+
         (* leading zeroes = error *)
         value read_uint_nz
          : I.iteratee char int
           SInt.read_unsigned_gen ~allow0:True
         ;
 
+        (* leading zeroes = error *)
+        value read_uint64_nz
+         : I.iteratee char int64
+         =
+          SInt64.read_unsigned_gen ~allow0:False
+        ;
+
+        value read_uint64 =
+          SInt64.read_unsigned_gen ~allow0:True
+        ;
+
       end
     ;
 
 
     value read_uint = read_uint;
     value read_uint_nz = read_uint_nz;
+    value read_uint64 = read_uint64;
+    value read_uint64_nz = read_uint64_nz;
 
 
     value itpair_anyresult_lasterror it1 it2 =
       itlist_anyresult_lasterror [it1; it2]
     ;
 
-    value read_packet _get_piece_it _combine_it _firstchar =
-      failwith "kva"
+
+    exception Dumbstreaming of string
     ;
 
+    value ds_error fmt = Printf.ksprintf
+      (fun s -> I.throw_err (Dumbstreaming s))
+      fmt
+    ;
 
+    value ( >>= ) m f = I.bindI f m
+    ;
 
-    value read _get_piece_it _combine_it =
-      
-(*
-      (some /* mapI */ read_packet get_piece_it combine_it)
-      /* itpair_anyresult_lasterror */
-      (read_eos >>= fun () -> None)
+    value read_the_char c =
+      I.peek >>= fun
+      [ Some '\x20' -> I.drop 1
+      | Some c' -> ds_error "read_char: %C expected, %C found" c c'
+      | None -> ds_error "read_char: %C expected, eof found" c
+      ]
+    ;
+
+    value read_space = read_the_char '\x20'
+      and read_x0A = read_the_char '\x0A'
+    ;
+
+    value read_data ~get_piece_it ~combine_it packet_length lengths =
+      .
+
+
+      (ignore get_piece_it ; ignore combine_it
+       ; ignore lengths ; ignore packet_length ; failwith "kvava")
+    ;
+
+
+    value read_lengths count : I.iteratee char (list int) =
+
+      let read_len
+       : I.iteratee char int
+       = read_space >>= fun () -> read_uint
+      in
+
+      let enumerate_over_lens
+       : I.iteratee int (list int)
+         -> I.iteratee char (I.iteratee int (list int))
+       = I.sequence_stream read_len
+      in
+
+      let list_of_count
+       : I.iteratee int (list int)
+       = I.joinI (I.take count I.stream2list)
+      in
+
+      I.joinI (enumerate_over_lens list_of_count)
+
+(* ==
+      I.joinI
+        ( (I.sequence_stream (read_space >>= fun () -> read_uint))
+          (I.joinI (I.take count I.stream2list))
+        )
 *)
-      failwith "kva"
+    ;
 
+
+    value int_of_int64 =
+      let max = Int64.of_int Pervasives.max_int
+      and min = Int64.of_int Pervasives.min_int in
+      fun msg x ->
+        if x < min || x > max
+        then ds_error "int_of_int64: %s" msg
+        else I.return (Int64.to_int x)
+    ;
+
+    value read_some ~get_piece_it ~combine_it =
+      read_uint64 >>= fun packet_length ->
+      let read_data_gen = read_data ~get_piece_it ~combine_it
+      in
+      let read_ver1 () =
+        int_of_int64 "read_ver1" packet_length >>= fun pl ->
+        read_data_gen packet_length [pl]
+
+      and read_ver2 () =
+        read_uint >>= fun count ->
+        let () = assert (count >= 0) in
+        read_lengths count >>= fun lengths ->
+        let lengths_count = List.length lengths in
+        if lengths_count < count
+        then ds_error "too few pieces' lengths: expected %i, got %i"
+                      count lengths_count
+        else
+        let () = assert (count = List.length lengths) in
+        read_x0A >>= fun () ->
+        read_data_gen packet_length lengths
+
+      in
+      I.peek >>= fun
+      [ Some '\x20' -> junk >>= read_ver2
+      | Some '\x0A' -> junk >>= read_ver1
+      | Some _ | None -> ds_error
+          "space or \\x0A expected after packet length"
+      ]
+    ;
+
+    value read ~get_piece_it ~combine_it =
+      I.peek >>= fun
+      [ None | Some '\x0A' -> I.return None
+      | Some _ ->
+          I.catch
+            (fun () ->
+               read_some ~get_piece_it ~combine_it >>= fun r ->
+               I.return (Some r)
+            )
+            (fun e -> ds_error "%s" & Printexc.to_string e
+            )
+      ]
     ;
 
   end
     ; print_newline ()
     )
 ;
+
+
+
+
+value test_read_lengths count string =
+  let stream = I.Chunk (I.Subarray.of_string string) in
+  let () = Printf.printf "%S -> %!" string in
+  let reader = D.read_lengths count in
+  let ( >>= ) = I.( >>= ) in
+  match IO.runIO
+    ( (enum1 stream
+        (reader >>= fun r ->
+         I.break_chars (fun _ -> False) >>= fun rest ->
+         I.return (r, rest)
+        )
+      )
+      >>% I.run
+    )
+  with
+  [ `Ok (r, rest) -> Printf.printf "ok [%s], rest = %S\n%!"
+       (String.concat "; " (List.map string_of_int r))
+       rest
+  | `Error e -> Printf.printf "error \"%s\"\n%!"
+      (printexc e)
+  ]
+;
+
+value () =
+( print_newline ()
+; test_read_lengths 3 " 12 34 56 78 90"
+)
+;