Commits

Anonymous committed 028ea43

Iteratees_http: + it_multipart, and a lot of things to implement it

  • Participants
  • Parent commits de70c60

Comments (0)

Files changed (4)

 ;
 
 
-value dbgstream s =
+value dbgstream ?addon s =
   match s with
   [ EOF eopt ->
       Printf.sprintf "s:EOF{e=%s}" &
       [ None -> "None"
       | Some exn -> Printf.sprintf "Some{%s}" & Printexc.to_string exn
       ]
-  | Chunk b -> Printf.sprintf "s:Chunk{arr[%i],ofs=%i,len=%i}"
+  | Chunk b -> Printf.sprintf "s:Chunk{arr[%i],ofs=%i,len=%i%s}"
       (Array.length b.S.arr) b.S.ofs b.S.len
+      (match addon with
+       [ None -> ""
+       | Some a -> "," ^ a
+       ]
+      )
+  ]
+;
+
+
+value dbgstream_char ?body s =
+  match body with
+  [ None -> dbgstream ?addon:None s
+  | Some i ->
+      match s with
+      [ EOF _ -> ""
+      | Chunk arr ->
+          let (a1, _a2) = S.split_at i arr in
+          Printf.sprintf "body=%S.." (S.to_string a1)
+      ]
   ]
 ;
 
     value cons : stream 'el -> sl 'el -> sl 'el;
     value get_one_opt : sl 'el -> option (stream 'el);
     value one : stream 'el -> sl 'el;
+
+    value dbgsl : sl 'el -> string;
   end
  =
   struct
       | _ -> assert False
       ];
     value one s = cons s [];
+
+    value dbgsl sl =
+      Printf.sprintf "sl:[%s]"
+        (String.concat " ; " (List.map dbgstream sl))
+    ;
   end
 ;
 
 ;
 
 
+value rec ie_errorMsl e sl =
+  IO.return (IE_cont (Some e) (fun s -> ie_errorMsl e (Sl.one s)), sl)
+;
+
+
 (* +
    Almost unusable in OCaml, since value monomorphism/restriction(?)
    for function applications (for [ie_cont some_cont]) bound to
 value (stream2list : iteratee 'el (list 'el)) =
   IE_cont None (fun s -> step [] s
     where rec step rev_acc s =
-let () = dbg "s2l: step: acc=%i\n" & List.length rev_acc in
+      let () = dbg "s2l: step: acc=%i\n" & List.length rev_acc in
       match s with
       [ Chunk c ->
           if S.is_empty c
 ;
 
 
+value get_stream_eof : iteratee 'el (option (option err_msg)) =
+  IE_cont None (fun s ->
+    ie_doneM
+      (match s with
+       [ EOF opt_err -> Some opt_err
+       | Chunk _ -> None
+       ]
+      )
+      s
+  )
+;
+
 (* Check if the stream is finished or harbors an error *)
 
 value (is_stream_finished : iteratee 'el (option err_msg)) =
 ;
 
 
+
 (* Primitive iteratees: parser combinators *)
 
 (* The analogue of hs' List.break
 
 value (break_chars : (char -> bool) -> iteratee char string) cpred =
   mapI (fun b ->
-     let r = Buffer.contents b in (dbg "break_chars: b=%i, cont=%S\n"
-       (Obj.magic b) r; r)
+     let r = Buffer.contents b in
+     (* let () = dbg "break_chars: b=%i, cont=%S\n" (Obj.magic b) r in *)
+     r
   ) &
   prepend
     (fun () -> Buffer.create break_chars_buf_init_size)
   k str >>% (IO.return % fst)
 ;
 
+(* +
+   Feed iteratee if it wants to eat, otherwise leave it as is
+   (ignoring the stream).  The rest of the stream is ignored anyway.
+   Useful when iteratee is fed only when it is hungry.
+*)
+value feed_it :
+  iteratee 'el 'a ->
+  stream 'el ->
+  iteratee 'el 'a
+ =
+  fun it s ->
+    match it with
+    [ IE_done _ | IE_cont (Some _) _ -> it
+    | IE_cont None k ->
+        liftI (k s >>% fun (it, _) -> IO.return it)
+    ]
+;
+
 
 exception Itlist_empty;
 
 
 
 module Deque_stream
+ :
+  sig
+    type t 'el;
+    value empty : t 'el;
+    value cons : int -> stream 'el -> t 'el -> t 'el;
+    value cons_sl : sl 'el -> t 'el -> t 'el;
+    value snoc : t 'el -> int -> stream 'el -> t 'el;
+    value concat : t 'el -> sl 'el;
+    value destr_head : t 'el -> option ((int * stream 'el) * t 'el);
+    value is_empty : t 'el -> bool;
+  end
  =
   struct
 
-
-
-на первом шаге it_subseq_step должен выдавать None для "не нашли"
-и Some (то, что дают итераты, только в option) для "есть значение!"
+    (* очередь = deque of (ofs * chunk), где последний чанк не скопирован
+       (т.е. добавление чанка в конец вызывает сначала копирование
+       предыдущего чанка) -- можем рассчитывать только на нескопированность
+       последнего чанка.
+       копирование будет однократным, так как из хвоста чанки не берутся,
+       а только добавляются туда.
+    *)
+    type t 'el = list (int * stream 'el);
+
+    value empty = [];
+
+    (*  добавить чанк в конец с копированием предыдущего чанка
+        (копировать только кусок от ofs до len) *)
+    value rec snoc init ofs stream =
+      match init with
+      [ [] -> [(ofs, stream)]
+      | [(h_ofs, h_stream) :: []] ->
+          match h_stream with
+          [ Chunk c ->
+              let h_stream = Chunk (S.copy
+                (S.drop h_ofs c))
+              in
+                [(0, h_stream) :: [(ofs, stream)]]
+          | EOF _ -> assert False
+          ]
+      | [h :: ([_ :: _] as t)] -> [h :: snoc t ofs stream]
+      ]
+    ;
+
+    (* склеить очередь (копирование).  обязано скопировать последний чанк,
+       не являющийся eof, так как его могут поменять. *)
+    value rec concat q : sl _ =
+      let to_add ~allow_eof ~copy ofs h_stream =
+        match h_stream with
+        [ EOF _ -> if allow_eof then h_stream else assert False
+        | Chunk c ->
+            if ofs = 0 && not copy
+            then h_stream
+            else
+              let c = S.drop ofs c in
+              let c = if copy then S.copy c else c in
+              Chunk c
+        ]
+      in
+      match q with
+      [ [] -> Sl.empty
+      | [(h_ofs, h_stream) :: []] ->
+          Sl.one (to_add ~allow_eof:True ~copy:True h_ofs h_stream)
+      | [(h_ofs, h_stream) :: ([_ :: _] as tl)] ->
+          Sl.cons
+            (to_add ~allow_eof:False ~copy:False h_ofs h_stream)
+            (concat tl)
+      ]
+    ;
+
+    value destr_head q =
+      match q with
+      [ [] -> None
+      | [h :: t] -> Some (h, t)
+      ]
+    ;
+
+    value cons ofs stream tail =
+      match (stream, tail) with
+      [ (EOF _, [_ :: _]) -> assert False
+      | ((EOF _ | Chunk _), _) -> [(ofs, stream) :: tail]
+      ]
+    ;
+
+    value rec cons_sl sl tail =
+      match Sl.destr_head sl with
+      [ None -> tail
+      | Some (h_stream, t_sl) ->
+          cons 0 h_stream (cons_sl t_sl tail)
+      ]
+    ;
+
+    value is_empty t = (t = [])
+    ;
+
+  end
+;
+
+
+value fdbg fmt = Printf.ksprintf (Printf.eprintf "forms: %s\n%!") fmt
+;
+
+(*
+на первом шаге it_subseq_step должен выдавать None для ". не нашли"
+и Some (то, что дают итераты, только в option) для "! есть значение"
 или "? хочу ещё данных"
-!
-
+*)
 
 value break_subsequence
- : (S.t 'el -> int -> option (iteratee 'el 'a * stream 'el)) ->
+ : (S.t 'el -> int -> option (IO.m (iteratee 'el 'a * sl 'el))) ->
    iteratee 'el 'b ->
    iteratee 'el (option 'a * iteratee 'el 'b)
- = fun it_subseq_step it_proc_until ->
-
-
-очередь = deque of (ofs * chunk), где последний чанк не скопирован
-          (т.е. добавление чанка в конец вызывает сначала копирование
-          предыдущего чанка) -- можем рассчитывать только на нескопированность
-          последнего чанка.
-          копирование будет однократным, так как из хвоста чанки не берутся,
-          а только добавляются туда.
-должна уметь:
-snoc : stream -> q
-  добавить чанк в конец с копированием предыдущего чанка
-    (копировать только кусок от ofs до len)
-concat : q -> chunk
-  склеить очередь (копирование)
-split_at_head : q -> option (chunk * q)  (* None | Some (hd, tl) *)
-
-
-let rec break_subsequence q it_proc_until =
-  match it_proc_until with
-  [ IE_done _ | IE_cont (Some _) _ ->
-      (* если it_proc_until кончил -- возвращаем его и склеенную очередь *)
-      ie_doneM (None, it_proc_until) (Deque_stream.concat q)
-
-
-> [Monday, 30 July 2012 14:13:34 Дмитрий Гребенюк] Представим расклад:
-тестирующий итерат запросил чанки [1;2] ; [3;4], пока не нашёл нужную
-последовательность, запрашивает ещё, получает EOF, выдаёт ошибку
-(так как не нашёл нужного).  Что делаем: идём по смещениям дальше:
-обрабатываем [2]; [3;4]; EOF тестирующим итератом, тоже ошибка.
-Дальше надо продолжить обрабатывать [3;4]; EOF, но надо скормить чанк [1;2]
-обрабатывающему итерату.  И вот, он кончил, ему больше не надо данных.
-Чо делать?  Возвращать [3;4]; EOF как остаток.  И вот тут -- снова те же яйца.
-Возвратить можно либо [3;4], либо EOF, в качестве остатка необработанных
-данных.  Возвратим [3;4] -- нарушим закон "итерат, получающий EOF, должен
-возвратить значение либо ошибку", так как за этим итератом может быть
-"прибинжен" итерат, который возьмёт [3;4], ему будет мало, и он захочет ещё
-данных, тогда как вся комбинация обязана возвратить значение либо ошибку,
-но не "хочу ещё данных".  Если же вернём EOF, то проебём [3;4], и прибинженный
-итерат их не получит.
-
-
-
-
+ = fun it_subseq_step it_proc ->
+
+let rec break_subsequence q it_proc =
+  (* иначе -- loop_q с очередью *)
+  loop_q q it_proc
+
+and break_subsequence_ret q ~sub_res_opt it_proc =
+  ie_doneMsl (sub_res_opt, it_proc) (Deque_stream.concat q)
+
+and feed_proc it_proc s =
+  match it_proc with
+  [ IE_done _ | IE_cont (Some _) _ -> IO.return it_proc
   | IE_cont None k_proc ->
-      (* иначе -- loop_q с очередью *)
-      loop_q q k_proc
+      let () = fdbg "bs: feed_proc: %s" (dbgstream s) in
+      k_proc s >>% fun (it_proc, _sl) ->
+      IO.return it_proc
   ]
 
-and loop_q q k_proc =
-  match Deque_stream.split_at_head q with
-  [ None -> step0 k_proc
-  | Some (first_ofs, first_chunk) -> вызываем loop_index
+and loop_q q it_proc =
+  match Deque_stream.destr_head q with
+  [ None ->
+      let () = fdbg "bs: loop_q: empty q" in
+      ie_contM (step0 it_proc)
+  | Some ((first_ofs, first_stream), qtail) ->
+      let () = fdbg "bs: loop_q: first_stream = %s, first_ofs = %i"
+        (dbgstream first_stream) first_ofs in
+      match first_stream with
+      [ EOF _ ->
+          feed_proc it_proc first_stream >>% fun it_proc ->
+          ie_doneMsl (None, it_proc) (Sl.one first_stream)
+      | Chunk first_chunk ->
+          loop_index first_chunk first_ofs it_proc ~qtail
+      ]
   ]
 
-and step0 k_proc =
+and step0 it_proc =
   (* запрашиваем чанк, loop_index с первым чанком,
      смещением=0 и пустым хвостом.  вызывается из loop_q. *)
-  ie_cont & fun
-  [ Chunk c -> loop_index c 0 k_proc ~qtail:Deque_stream.empty
-  | (EOF _ ) as s -> ie_doneM (None, ie_cont k_proc) s
-  ]
-
-and loop_index first_chunk first_ofs k_proc ~qtail =
+  fun s ->
+    let () = fdbg "bs: step0: stream = %s" (dbgstream s) in
+    match s with
+    [ Chunk c -> loop_index c 0 it_proc ~qtail:Deque_stream.empty
+    | (EOF _ ) as s -> ie_doneM (None, it_proc) s
+    ]
+
+and loop_index (first_chunk : S.t _) first_ofs it_proc ~qtail =
 
   (* Кормить = *)
-  let feed_first_chunk ~cur_ofs:ofs k_proc =
-    (* кормим it_proc_until куском текущего чанка от начального до текущего
+  let feed_first_chunk ~cur_ofs it_proc =
+    (* кормим it_proc куском текущего чанка от начального до текущего
        смещения, имеющихся в значениях loop_index *)
-    .
+    let len = cur_ofs - first_ofs in
+    let () = assert (len >= 0) in
+    let () = fdbg "bs: feed_first_chunk: %i .. %i (len = %i)"
+      first_ofs cur_ofs len in
+    if len = 0
+    then IO.return it_proc
+    else
+      let stream = Chunk (S.sub first_chunk ~ofs:first_ofs ~len) in
+      feed_proc it_proc stream
+  in
 
   (* пытаемся последовательно по всем смещениям применить it_subseq_step: *)
   let first_len = S.length first_chunk in
   let rec loop_ofs ofs =
     let () = assert (ofs <= first_len) in
+    (* let () = fdbg "bs: loop_ofs: ofs = %i, first_ofs = %i" ofs first_ofs
+    in *)
     (* проверка на конец должна быть в начале, так как step1 может
        дать ofs=len *)
     if ofs = first_len
     then
       (* - когда дошли до конца чанка, *)
       (*   - Кормить *)
-      feed_first_chunk ~cur_ofs:ofs k_proc >>% fun (it_proc, _s_proc) ->
+      feed_first_chunk ~cur_ofs:ofs it_proc >>% fun it_proc ->
       (*   - идём дальше: break_subsequence по хвосту (там же оно выйдет,
              если итерат кончил), и остаток потока тут не важен *)
       break_subsequence qtail it_proc
     else
-      match it_subseq_step first_chunk first_ofs with
+      match it_subseq_step first_chunk ofs with
       [ None ->
           (* - пока ошибка, идём по чанку к следующему смещению. *)
-          loop_index first_chunk (first_ofs + 1) k_proc ~qtail
-      | Some (sq_it, sq_left) ->
+          (* let () = fdbg "bs: loop_ofs: None from subseq" in *)
+          loop_ofs (ofs + 1)
+      | Some io_sq_it_left ->
+          let () = fdbg "bs: loop_ofs: Some _ from subseq" in
+          io_sq_it_left >>% fun (sq_it, sq_left) ->
+          let () = fdbg "bs: loop_ofs: sq_left = %s"
+            (Sl.dbgsl sq_left)
+          in
           match sq_it with
-          [ IE_done _ ->
+          [ IE_done sub_res ->
              (* - когда результат от it_subseq_step -- *)
              (*   - Кормить *)
-             feed_first_chunk ~cur_ofs:ofs k_proc >>% fun (it_proc, _s_proc) ->
-             (*   - возвращаем его и результат от it_subseq_step (если
-                     кормёжка оставила кусок чанка -- игнорируем)
-             *)
-             .
-          | IE_cont (Some _) _ -> assert False
-              (* так как в первом шаге итерат должен вернуть None
-                 на случай типа-ошибки *)
-          | IE_cont None k_sub -> ...
+             feed_first_chunk ~cur_ofs:ofs it_proc >>% fun it_proc ->
+             (*   - возвращаем его и результат от it_subseq_step *)
+             let q = Deque_stream.cons_sl sq_left qtail in
+             break_subsequence_ret ~sub_res_opt:(Some sub_res) q it_proc
+          | IE_cont (Some _) _ -> ie_errorMsl (Invalid_argument
+              "break_sequence: it_subseq_step should return None on error")
+              Sl.empty
+          | IE_cont None k_sub ->
               (* - когда не ошибка, а "хочу ещё" от it_subseq_step -- *)
               (*   - Кормить *)
-              feed_first_chunk ~cur_ofs:ofs k_proc >>% fun (it_proc, s_proc) ->
-              match it_proc with
-              [ IE_done _ | IE_cont (Some _) _ ->
-                  (* - если после кормления есть результат от it_proc_until --
-                       возвращаем его, склеиваем остаток после кормления и
-                       хвост очереди, возвращаем всё *)
-                  .
-              | IE_cont None k_proc ->
-                  (* - если нет результата -- *)
-                  (*   - очередь формируем доклеиванием спереди текущего
-                         смещения и текущего чанка к хвосту очереди *)
-                  .
-                  (*   - очередь и продолжение от it_subseq_step передаём в
-                         step1 *)
-                  .
-              ]  (* match it_proc *)
+              feed_first_chunk ~cur_ofs:ofs it_proc >>% fun it_proc ->
+              step1 k_sub it_proc qtail
           ]  (* match sq_it *)
       ]  (* match it_subseq_step option *)
   in
-    loop first_ofs
+    loop_ofs first_ofs
 
 (* запросить ещё чанки для тестирования их продолжением k_sub *)
-and step1 k_sub k_proc q =
-  (* тут: очередь непуста *)
-  let () = assert (not (Deque_stream.is_empty q)) in
+and step1 k_sub it_proc q =
   (* запрашиваем чанк *)
+  ie_contM & fun s ->
+  let q = Deque_stream.snoc q 0 s in
+  (* дать s в k_sub, посмотреть на результат *)
+  k_sub s >>% fun (it_sub, sl_sub) ->
+  match it_sub with
+  [ IE_done r ->
+      break_subsequence_ret
+        (Deque_stream.cons_sl sl_sub Deque_stream.empty)
+        ~sub_res_opt:(Some r)
+        it_proc
+
+  | IE_cont (Some _) _ ->
+       (* - если ошибка -- loop_index со следующего смещения в первом чанке
+            очереди (может дать ofs=len) *)
+       match Deque_stream.destr_head q with
+       [ None -> assert False
+       | Some ((first_ofs, first_stream), qtail) ->
+           match first_stream with
+           [ EOF _ ->
+               let () = assert (Deque_stream.is_empty qtail) in
+               feed_proc it_proc first_stream >>% fun it_proc ->
+               break_subsequence_ret
+                 q
+                 ~sub_res_opt:None
+                 it_proc
+
+           | Chunk first_chunk ->
+               loop_index first_chunk (first_ofs + 1) it_proc ~qtail
+           ]
+       ]
+
+  | IE_cont None k_sub -> step1 k_sub it_proc q
+  ]
+in
   ie_cont & fun s ->
-  match s with
-  [ EOF _ ->
-      доклеить s в зад очереди
-
-      дать s в k_sub, посмотреть на результат.
-
-       - если ошибка -- loop_index со следующего смещения в первом чанке
-         очереди (может дать ofs=len)
-       - если результат -- возвращаем Some res, ie_cont k_proc, EOF _
-
-  | Chunk c ->
-      (* 1. доклеиваем в зад очереди (должно скопировать предыдущий чанк) *)
-      let q = Deque_stream.snoc_chunk c 0 q in
-      (* 2. пытаемся его применить к продолжению it_subseq_step *)
-      k_sub s 
-       - если ошибка -- loop_index со следующего смещения в первом чанке
-         очереди (может дать ofs=len)
-       - если результат -- возвращаем остаток потока, нетронутый it_subseq_step,
-         результат it_subseq_step и текущее состояние it_proc_until
-       - если "хочу ещё" -- step1
-
-
-
-
-       let rec break_subsequence ~q ~it_proc_until =
-         match it_proc_until with
-         [ IE_done _ | IE_cont (Some _) _ ->
-             return (None, it_proc_until)
-         | IE_cont None k ->
-             ie_cont & step ~q ~it_proc_until ~k
-         ]
-
-       and step ~q ~it_proc_until ~k s =
-         match s with
-         [ EOF _ -> ie_doneM (None, it_proc_until) s
-         | Chunk arr ->
-
-             let rec loop_q q =
-               if Deque.is_empty q
-               then
-                 ie_contM & break_subsequence ~q ~it_proc_until
-               else
-                 let (arr, ofs) = Deque.head q
-                 and qtail = Deque.tail q in
-                 match loop_index arr ofs ~qtail with
-                 [ None -> ie_cont & step ~q ~k ~it_proc_until
-                 | Some ofs ->
-                     let (will_feed, will_leave) = S.split_by ofs arr in
-                     feedI k will_feed >>% fun it_proc_until ->
-                     let q =
-                       if S.is_empty will_leave
-                       then qtail
-                       else Deque.cons will_leave qtail
-                     in
-                     loop_q q
+    let () = fdbg "bs: init: stream = %s" (dbgstream s) in
+    break_subsequence Deque_stream.(cons 0 s empty) it_proc
+;
+
+
+value rec it_ignore : iteratee 'el unit =
+  IE_cont None it_ignore_step
+and it_ignore_step = fun
+  [ (EOF _) as s -> ie_doneM () s
+  | Chunk _ -> ie_contM it_ignore_step
+  ]
+;
+
+
+value map_ready
+ : ! 'el1 'el2 'a .
+   iteratee 'el1 'a ->
+   iteratee 'el2 'a
+ = fun it ->
+     joinI & return it
+;
+
+
+value eof_to_res
+ : ! 'el1 'el2 'a .
+   iteratee 'el1 'a ->
+   option err_msg ->
+   iteratee 'el2 [= `Ok of 'a | `Error of err_msg ]
+ = fun it opt_err ->
+     match it with
+     [ IE_done a -> IE_done (`Ok a)
+     | IE_cont (Some e) _k -> IE_done (`Error e)
+     | IE_cont None k ->
+         lift
+           (IO.catch
+              (fun () ->
+                 k (EOF opt_err) >>% fun (it, _) ->
+                 match it with
+                 [ IE_done r -> IO.return (`Ok r)
+                 | IE_cont (Some e) _ -> IO.return (`Error e)
+                 | IE_cont None _ -> IO.error (Divergent_iteratee "eof_to_res")
                  ]
-
-             and loop_index arr ofs ~qtail =
-               if ofs = S.length arr
-               then Some ofs
-               else
-                 match try_index arr ofs ~qtail with
-                 а вот тут может прийти привет -- итерат
-                 скушал всё и надо вернуть в том числе
-                 остаток, склеенный из rest + qtail
-                 .
-
-             and try_index_step arr ofs ~qtail ~it_subseq_step =
-               (it_subseq_step arr ofs) >>% fun (it_sub, s) ->
-               match it_sub with
-               [ IE_done a ->
-                   ie_doneM (Some a, it_proc_until) (s +++ qtail)
-               | IE_cont (Some _) _ ->
-                   try_next arr ~new_ofs:(ofs + 1) ~qtail
-               .
-
-             and try_index ~qtail ~it_sub = .
-
-
-value iter_stream_list : list (stream 'el) ->
-  IO.m (iteratee 'el 'a * list (stream 'el)
-.
-
-value concat_stream_list : list (stream 'el) -> stream 'el
-.
-
-
-из loop_index возвращать смещение, до которого можно кормить данными
-вложенный итерат.  Там будет либо это смещение по итогу, либо запрос
-"хочу ещё данных", без вариантов.
-
-
-
-             and try_next ~new_ofs arr ~qtail =
-               if new_ofs = S.length arr
-               then Deque.tail qtail ...
-               else try_index ~ofs:new_ofs ~qtail arr
-
-             in
-               let q = Deque.snoc (arr, 0) q in
-               loop_q q
-         ]
-
-     (ignore (it_subseq_step (raise Exit) (raise Exit)); ignore it_proc_until; raise Exit)
-
+              )
+              (fun e -> IO.return (`Error e))
+           )
+     ]
 ;
 
 
-value rec ie_errorM e s = IO.return (IE_cont (Some e) (ie_errorM_cont e), s)
-  and ie_errorM_cont e = fun s -> ie_errorM e s
-;
-
-
-value proc_inside_boundary boundary it_proc : iteratee char unit =
-  (break_subsequence
-     (fun arr ofs ->
-       let rec cmp arr ~ofs ~i =
-         if i = String.length boundary
-         then `Yes ofs
+(* for break_subsequence *)
+
+value probe_string str
+ : S.t 'el -> int -> option (IO.m (iteratee char unit * sl char))
+ = fun arr ofs ->
+     let rec cmp arr ~ofs ~i =
+       if i = String.length str
+       then `Yes ofs
+       else
+         let len = S.length arr in
+         if ofs = len
+         then `Maybe i
          else
-           let len = S.length arr in
-           if i + ofs = len
-           then `Maybe i
-           else
-             if S.get arr ofs = boundary.[i]
-             then cmp arr ~ofs:(ofs + 1) ~i:(i + 1)
-             else `No
-       in
-       let rec it_loop arr ~ofs ~i : IO.m (iteratee char unit * stream char) =
-         match cmp arr ~ofs ~i with
-         [ `Yes ofs -> ie_doneM () (Chunk (snd (S.split_at ofs arr)))
-         | `No -> ie_errorM Not_found empty_stream
-         | `Maybe i ->
-             ie_contM & fun s ->
-             match s with
-             [ EOF eopt ->
-                 ie_errorM
-                   (match eopt with [ None -> End_of_file | Some e -> e ])
-                   s
-             | Chunk arr -> it_loop arr ~ofs:0 ~i
-             ]
-         ]
-       in
-         it_loop arr ~ofs ~i:0
-     )
-     it_proc
-  ) >>= fun ((), it_proc) ->
-  (lift & run it_proc) >>= return
-;
-
-
-
-
-(**********
-
-exception Form of string;
-
-value form_error fmt = Printf.ksprintf (fun s -> throw_err (Form s)) fmt
-;
-
-value it_proc_form
- : string ->
-   iteratee char 'a ->
-   iteratee 'a 'r ->
-   iteratee char 'r
- = fun boundary it_part it_fold ->
-
-     let read_line =
-       break_chars ( (=) '\r' ) >>= fun line ->
-       let ret () = return line in
-       peek >>= fun exp_cr_opt ->
-       match exp_cr_opt with
-       [ None -> ret ()
-       | Some exp_cr ->
-           let () = assert (exp_cr = '\r') in
-           peek >>= fun exp_lf_opt ->
-           match exp_lf_opt with
-           [ None -> ret ()
-           | Some exp_lf ->
-               junk >>= fun () ->
-               match exp_lf with
-               [ '\n' -> junk >>= ret
-               | c -> form_error "bad line ending: expected %c, found %c"
-                   '\n' c
-               ]
+           if S.get arr ofs = str.[i]
+           then cmp arr ~ofs:(ofs + 1) ~i:(i + 1)
+           else `No
+     in
+     let ret ofs_after =
+       let () = fdbg "probe_string: len=%i, dropping %i"
+         (S.length arr) ofs_after in
+       ie_doneM () (Chunk (S.drop ofs_after arr))
+     in
+     let rec it_first_step arr ~ofs ~i
+      : option (IO.m (iteratee char unit * sl char)) =
+       match cmp arr ~ofs ~i with
+       [ `Yes ofs_after -> Some (ret ofs_after)
+       | `No -> None
+       | `Maybe i ->
+           let () = fdbg "probe: cont from first step" in
+           Some (ie_contM & it_step ~i)
+       ]
+     and it_step ~i s =
+       match s with
+       [ EOF eopt ->
+           ie_errorMsl
+             (match eopt with [ None -> End_of_file | Some e -> e ])
+             (Sl.one s)
+       | Chunk arr ->
+           match cmp arr ~ofs:0 ~i with
+           [ `Yes ofs_after -> ret ofs_after
+           | `Maybe i -> ie_contM (it_step ~i)
+           | `No -> ie_errorMsl Not_found (Sl.one s)
            ]
        ]
      in
-
-     let read_part_headers =
-       let inner acc =
-         read_line >>= fun line ->
-         if line = ""
-         then return & List.rev acc
-         else inner [line :: acc]
-       in
-         inner []
+       it_first_step arr ~ofs ~i:0
+;
+
+
+(* returns last n items from stream *)
+value it_last
+ : int -> iteratee 'el (list 'el)
+ = fun n ->
+     let rec step q s =
+       match s with
+       [ EOF _ ->
+           let r =
+             (List.rev
+               (Queue.fold
+                  (fun acc el -> [el :: acc])
+                  []
+                  q
+               )
+             )
+           in
+           let () = assert (List.length r <= n) in
+           ie_doneM r s
+       | Chunk arr ->
+           let len = S.length arr in
+           let () =
+             if len >= n
+             then
+               ( Queue.clear q
+               ; let arr_last = S.drop (len - n) arr in
+                 let () = assert (S.length arr_last = n) in
+                 S.iter (fun el -> Queue.add el q) arr_last
+               )
+             else
+               ( S.iter
+                   (fun el ->
+                      ( Queue.push el q
+                      ; if Queue.length q > n
+                        then ignore (Queue.take q)
+                        else ()
+                      )
+                   )
+                   arr
+               )
+           in
+             ie_contM (step q)
+       ]
      in
-
-     (?alt?
-        match_boundary
-        fail if no boundary
-     ) >>= fun () ->
-     after_boundary >>= fun ab ->
-     match ab with
-     [ `Finished -> finish it_fold
-     | `Next ->
-         read_part_headers >>= fun part_headers ->
-
-
-
+       ie_cont (step (Queue.create ()))
 ;
-***************)
-
 
 
 end

iteratees_http.ml

         match read_hex str with
         [ None -> frame_err (exc ("Bad chunk size: " ^ str)) iter
         | Some size ->
-            let () = dbg "enum_chunk_decoded: frame %i (%x) bytes\n" size size in
+            let () = dbg
+              "enum_chunk_decoded: frame %i (%x) bytes\n" size size in
             getCRLF iter (
             take size iter >>= fun r ->
             getCRLF r (
     enum_chunk_decoded iter
 ;
 
+
+
+exception Multipart_error of string;
+
+value multipart_error fmt =
+  Printf.ksprintf (fun s -> throw_err (Multipart_error s)) fmt
+;
+
+value it_multipart
+ : string ->
+   (list string -> iteratee char 'a) ->
+   iteratee 'a 'r ->
+   iteratee char 'r
+ = fun (type a) (type r) boundary it_part (it_fold : iteratee a r) ->
+
+     let read_line =
+       break_chars ( (=) '\r' ) >>= fun line ->
+       let ret () = return line in
+       peek >>= fun exp_cr_opt ->
+       match exp_cr_opt with
+       [ None -> ret ()
+       | Some exp_cr ->
+           let () = assert (exp_cr = '\r') in
+           junk >>= fun () ->
+           peek >>= fun exp_lf_opt ->
+           match exp_lf_opt with
+           [ None -> ret ()
+           | Some exp_lf ->
+               junk >>= fun () ->
+               match exp_lf with
+               [ '\n' -> ret ()
+               | c -> multipart_error "bad line ending: expected %C, found %C"
+                   '\n' c
+               ]
+           ]
+       ]
+     in
+
+     let read_part_headers =
+       let rec inner acc =
+         read_line >>= fun line ->
+         let () = fdbg "read_part_headers: line = %S" line in
+         if line = ""
+         then return & List.rev acc
+         else inner [line :: acc]
+       in
+         inner []
+     in
+
+     let after_boundary =
+       head >>= fun c1 ->
+       head >>= fun c2 ->
+       match (c1, c2) with
+       [ ('-', '-') ->
+           head >>= fun c3 ->
+           head >>= fun c4 ->
+           match (c3, c4) with
+           [ ('\r', '\n') -> return `Finished
+           | _ -> multipart_error "after closing boundary: expected CR LF, \
+                              found %C %C"
+                             c3 c4
+           ]
+       | ('\r', '\n') -> return `Next
+       | _ -> multipart_error
+                "after boundary: expected either CR LF or \"--\", \
+                 found %C %C"
+                c1 c2
+       ]
+     in
+
+     let crlf_boundary = "\r\n" ^ boundary in
+
+     let proc_of_it_fold it_fold =
+       match it_fold with
+       [ IE_done _ | IE_cont (Some _) _ -> `Skip
+       | IE_cont None _ -> `Proc
+       ]
+     in
+
+     let fdbg_stream_char title =
+       ie_cont
+         (fun s ->
+            let () =
+            fdbg "stream: %s: %s" title (dbgstream_char ~body:10 s)
+            in
+            ie_doneM () s
+         )
+     in
+
+     let rec loop_boundaries (it_fold : iteratee a r) =
+       let () = fdbg "loop" in
+       fdbg_stream_char "before loop" >>= fun () ->
+       after_boundary >>= fun ab ->
+       match ab with
+       [ `Finished ->
+           let () = fdbg "  `Finished" in
+           fdbg_stream_char "after finished" >>= fun () ->
+           it_ignore (* must ignore, RFC 2046 *) >>= fun () ->
+           map_ready & feed_it it_fold (EOF None)
+       | `Next ->
+           let () = fdbg "  `Next" in
+           read_part_headers >>= fun part_headers ->
+           let proc = proc_of_it_fold it_fold in
+           break_subsequence
+             (probe_string crlf_boundary)
+             (match proc with
+              [ `Proc ->
+                  let () = fdbg "will read this part with it_part" in
+                  (it_part part_headers) >>= fun p ->
+                  let () = fdbg "it_part ok" in
+                  return & Some p
+              | `Skip ->
+                  let () = fdbg "will skip this part" in
+                  it_ignore >>= fun () ->
+                  let () = fdbg "part was skipped" in
+                  return None
+              ]
+             )
+           >>= fun (opt_boundary, opt_it_part) ->
+           fdbg_stream_char "after it_part" >>= fun () ->
+           match opt_boundary with
+           [ None ->
+               (* если границу не нашли *)
+               (* если opt_it_part = None -- не собирали результаты,
+                  значит даём it_fold'у multipart_error, вылетаем.
+                  если Some -- итерату it_part уже дали EOF в break_sequence,
+                  деинит сделан, значит даём .. см выше ..
+                *)
+               let err_msg = Multipart_error "expected boundary, found EOF" in
+               map_ready & feed_it it_fold (EOF (Some err_msg))
+           | Some () ->
+               (* если границу нашли (и прочитали) *)
+
+               eof_to_res opt_it_part None >>= fun res_part ->
+
+               match res_part with
+               [ `Ok None ->
+                   loop_boundaries it_fold
+               | `Ok (Some p) ->
+                   let it_fold = feed_it it_fold (chunk_of p) in
+                   loop_boundaries it_fold
+               | `Error e ->
+                   (* часть собрать не смогли по вине it_proc --
+                      передаём ошибку в it_fold и допринять части *)
+                   loop_boundaries & feed_it it_fold (EOF (Some e))
+               ]
+           ]
+       ]
+     in
+
+     let rec search_for_beginning () =
+       break_subsequence
+         (probe_string boundary)
+         (it_last 2)
+       >>= fun (opt_boundary, it_last2) ->
+       match opt_boundary with
+       [ None ->
+           let () = fdbg "sfb: None" in
+           map_ready & feed_it it_fold (EOF None)
+       | Some () ->
+           let () = fdbg "sfb: Some" in
+           eof_to_res it_last2 None >>= fun res_last2 ->
+           match res_last2 with
+           [ `Ok ([] | ['\r'; '\n']) ->
+               let () = fdbg "sfb: found good prefix" in
+               loop_boundaries it_fold
+           | `Ok _ ->
+               let () = fdbg "sfb: found bad prefix" in
+               fdbg_stream_char "sfb/bad" >>= fun () ->
+               search_for_beginning ()
+           | `Error _ ->
+               throw_err (Failure "unexpected error from it_last 2")
+           ]
+       ]
+     in
+       search_for_beginning ()
+;
+
+
 end;
           ~cur:(func cur s.arr.(i))
 ;
 
+value iter
+ : ('a -> unit) -> t 'a -> unit
+ = fun f s ->
+     fold L (fun () x -> let () = f x in ()) () s
+;
+
+
 value to_list s =
   fold R
     (fun acc elem -> [elem :: acc])
 ;
 
 
+(***********)
+
+exception Left of (exn * string)
+;
+
+
+value dump_after title =
+   get_stream_eof >>= fun opt_opt_err ->
+   let () = fdbg "after %s: %s"
+     title
+     (match opt_opt_err with
+      [ None -> "some data"
+      | Some None -> "usual EOF"
+      | Some (Some e) ->
+          Printf.sprintf "error: %s"
+            (Printexc.to_string e)
+      ]
+     )
+   in
+     return ()
+;
+
+
+value test_forms () =
+  let test1 =
+( "qwe\r\n-----------------------------7045176531256545735900303621\
+\r\nContent-Disposition: form-data; name=\"MAX_FILE_SIZE\"\r\n\r\n100000\r\n\
+-----------------------------7045176531256545735900303621\r\nContent-Disposit\
+ion: form-data; name=\"uploadedfile\"; filename=\"allchars\"\r\nContent-Type: \
+application/octet-stream\r\n\r\n\
+\x00\x01\x02\x03\x04\x05\x06\x07\b\t\n\x0B\x0C\r\x0E\x0F\x10\x11\x12\x13\x14\
+\x15\x16\x17\x18\x19\x1A\x1B\x1C\x1D\x1E\x1F !\"#$%&'()*+,-./0123456789:;<=>?\
+@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~\x7F\x80\x81\
+\x82\x83\x84\x85\x86\x87\x88\x89\x8A\x8B\x8C\x8D\x8E\x8F\x90\x91\x92\x93\x94\
+\x95\x96\x97\x98\x99\x9A\x9B\x9C\x9D\x9E\x9F\xA0\xA1\xA2\xA3\xA4\xA5\xA6\xA7\
+\xA8\xA9\xAA\xAB\xAC\xAD\xAE\xAF\xB0\xB1\xB2\xB3\xB4\xB5\xB6\xB7\xB8\xB9\xBA\
+\xBB\xBC\xBD\xBE\xBF\xC0\xC1\xC2\xC3\xC4\xC5\xC6\xC7\xC8\xC9\xCA\xCB\xCC\xCD\
+\xCE\xCF\xD0\xD1\xD2\xD3\xD4\xD5\xD6\xD7\xD8\xD9\xDA\xDB\xDC\xDD\xDE\xDF\xE0\
+\xE1\xE2\xE3\xE4\xE5\xE6\xE7\xE8\xE9\xEA\xEB\xEC\xED\xEE\xEF\xF0\xF1\xF2\xF3\
+\xF4\xF5\xF6\xF7\xF8\xF9\xFA\xFB\xFC\xFD\xFE\xFF\
+\r\n-----------------------------7045176531256545735900303621--\r\n\
+qwe"
+, "-----------------------------7045176531256545735900303621"
+, [ ( [ "Content-Disposition: form-data; name=\"MAX_FILE_SIZE\""
+      ]
+    , "100000"
+    )
+  ; ( [ "Content-Disposition: form-data; name=\"uploadedfile\"; file\
+         name=\"allchars\""
+      ; "Content-Type: application/octet-stream"
+      ]
+    , "\
+\x00\x01\x02\x03\x04\x05\x06\x07\b\t\n\x0B\x0C\r\x0E\x0F\x10\x11\x12\x13\x14\
+\x15\x16\x17\x18\x19\x1A\x1B\x1C\x1D\x1E\x1F !\"#$%&'()*+,-./0123456789:;<=>?\
+@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~\x7F\x80\x81\
+\x82\x83\x84\x85\x86\x87\x88\x89\x8A\x8B\x8C\x8D\x8E\x8F\x90\x91\x92\x93\x94\
+\x95\x96\x97\x98\x99\x9A\x9B\x9C\x9D\x9E\x9F\xA0\xA1\xA2\xA3\xA4\xA5\xA6\xA7\
+\xA8\xA9\xAA\xAB\xAC\xAD\xAE\xAF\xB0\xB1\xB2\xB3\xB4\xB5\xB6\xB7\xB8\xB9\xBA\
+\xBB\xBC\xBD\xBE\xBF\xC0\xC1\xC2\xC3\xC4\xC5\xC6\xC7\xC8\xC9\xCA\xCB\xCC\xCD\
+\xCE\xCF\xD0\xD1\xD2\xD3\xD4\xD5\xD6\xD7\xD8\xD9\xDA\xDB\xDC\xDD\xDE\xDF\xE0\
+\xE1\xE2\xE3\xE4\xE5\xE6\xE7\xE8\xE9\xEA\xEB\xEC\xED\xEE\xEF\xF0\xF1\xF2\xF3\
+\xF4\xF5\xF6\xF7\xF8\xF9\xFA\xFB\xFC\xFD\xFE\xFF\
+"
+    )
+  ]
+)
+  in
+    List.iter
+      (fun (body, boundary, expected) ->
+         match IO.runIO
+         ((enum_string
+             body
+             (     (H.it_multipart boundary
+                      (fun headers ->
+                         let () = fdbg "getting part" in
+                         gather_to_string >>= fun part ->
+                         (* let () = fdbg "got part: %S" part in *)
+                         dump_after "part" >>= fun () ->
+                         return (headers, part)
+                      )
+                      (stream2list >>= fun lst ->
+                       dump_after "all parts" >>= fun () ->
+                       return lst
+                      )
+                   )
+             )
+          ) >>% run
+         )
+         with
+         [ `Ok got ->
+             if expected <> got
+             then failwith "test failed"
+             else Printf.printf "test passed\n%!"
+         | `Error e ->
+             Printf.eprintf "forms: test failed with exception: %s" (msg e)
+             where rec msg e =
+               match e with
+               [ Left (e, s) ->
+                   sprintf "%s (stream left: %S)"
+                     (msg e) s
+               | H.Multipart_error s -> s
+               | Iteratees_err_msg e -> msg e
+               | e -> Printexc.to_string e
+               ]
+         ]
+      )
+      [test1]
+;
+
 
 value () =
   ( P.printf "TESTS BEGIN.\n"
 
   ; test_base64decode ()
 
+  ; test_forms ()
+
   ; P.printf "TESTS END.\n"
   );