Commits

jhwoodyatt  committed 023a71c

Yet another reengineering of the [Mime_stream] module.

  • Participants
  • Parent commits e48d0e8

Comments (0)

Files changed (4)

 
 + ALL FILES NEED OCAMLDOC!
 
-+ (unimplemented): Many optional features.
-
 + (Mime_multipart): Need code for parsing and emitting multipart boundaries.
 
 + (Mime_multipart_stream):
     Inherit from Iom_octet_stream.scanner and emitter and provide convenience
     factories for isimplex, osimplex and duplex reactors that have
     octet stream plugs on one end and mime stream jacks on the other.
+
++ (unimplemented): Many optional features.
     
 # End of open issues

File mime/mime_stream.ml

 (*---------------------------------------------------------------------------*
   EVENT
  *---------------------------------------------------------------------------*)
-type 'adapter event = {
-    headers: 'adapter;
-    flowTx: Iom_stream.readywait Iom_gadget.tx;
-    bodyRx: Iom_octet_stream.fragment Iom_gadget.rx;
-} constraint 'adapter = #Mime_entity.headers
+type 'headers event =
+    'headers * Iom_octet_stream.fragment Iom_gadget.rx
+    constraint 'headers = #Mime_entity.headers
+
+type entity_scan_event = Mime_entity.basic_scan_adapter event
+type entity_emit_event = Mime_entity.basic_emit_adapter event
 
 (*---------------------------------------------------------------------------*
   SCANNING
  *---------------------------------------------------------------------------*)
-exception Header_too_long
+type scan_error =
+    | X_header_too_long
+    | X_header_incomplete
 
-class virtual ['event] scanner ~lim flowTx bodyRx mimeTx =
+let scan_error_to_string = function
+    | X_header_too_long -> "header too long"
+    | X_header_incomplete -> "header incomplete"
+
+exception Scan_error of scan_error
+
+class virtual ['event] scanner ?limits flowTx bodyRx mimeTx =
+    let bodyRx = (bodyRx :> 'fragment Iom_gadget.rx) in
     object(self:'self)
         constraint 'adapter = #Mime_entity.basic_scan_adapter
         constraint 'event = 'adapter event
         inherit [Iom_octet_stream.fragment, 'event] Iom_octet_stream.scanner
-            flowTx bodyRx mimeTx as super
+            ?limits flowTx bodyRx mimeTx as super
+        
+        val pass_ = false
+        val bodyTx_: 'fragment Iom_gadget.tx option = None
         
         method virtual private headers:
             (string * char Cf_seq.t) list -> 'adapter
                 let headers, body = Mime_lex.split_into_fields msg in
                 let blength = Cf_message.length body in
                 let slength = tlength - blength in
-                let event = {
-                    headers = self#headers headers;
-                    bodyRx =
-                        (bodyRx :> Iom_octet_stream.fragment Iom_gadget.rx);
-                    flowTx = (flowTx :> Iom_stream.readywait Iom_gadget.tx);
-                } in
+                let event = self#headers headers, bodyRx in
                 Some ((event, slength), Cf_seq.nil)
             with
-            | End_of_file when tlength > lim ->
-                raise Header_too_long
+            | End_of_file when tlength > limits_.Iom_octet_stream.high ->
+                raise (Scan_error X_header_too_long)
+            | End_of_file when more_ = Iom_stream.Last ->
+                raise (Scan_error X_header_incomplete)
             | End_of_file ->
                 None
-
-        method shift =
-            super#shift >>= fun v ->
-            Cf_cmonad.return begin
-                match v with
-                | Cf_exnopt.U (_ :: _ as v, _) ->
-                    Cf_exnopt.U (v, None)
-                | r ->
-                    r
+        
+        method private shift_body bodyTx consume buffer =
+            let m = Cf_message.drain buffer in
+            let fragment = new Iom_octet_stream.fragment more_ m in
+            bodyTx#put fragment >>= fun () ->
+            if more_ = Iom_stream.Last then
+                Cf_cmonad.return (Cf_exnopt.U None)
+            else begin
+                let obj =
+                    match bodyTx_ with
+                    | None ->
+                        {< consume_ = true; buffer_ = Cf_deque.nil; mark_ = 0;
+                           bodyTx_ = Some bodyTx >}
+                    | Some bodyTx' ->
+                        assert (bodyTx' == bodyTx);
+                        {< consume_ = true; buffer_ = Cf_deque.nil;
+                           mark_ = 0; >}
+                in
+                let return = Cf_cmonad.return (Cf_exnopt.U (Some obj)) in
+                if consume then
+                    return
+                else begin
+                    flowTx#put `Ready >>= fun () ->
+                    return
+                end
             end
+        
+        method private shift_header =
+            match try Cf_exnopt.U self#parse with x -> Cf_exnopt.X x with
+            | Cf_exnopt.X _ as v ->
+                Cf_cmonad.return v
+            | Cf_exnopt.U None ->
+                Cf_cmonad.return (Cf_exnopt.U (Some self))
+            | Cf_exnopt.U (Some ([], _))
+            | Cf_exnopt.U (Some (_ :: _ :: _, _)) ->
+                assert (not true);
+                Cf_cmonad.return (Cf_exnopt.U None)
+            | Cf_exnopt.U (Some ((h, _) :: [], len)) ->
+                self#shift_octets len >>= fun (consume, buffer, _) ->
+                Iom_gadget.simplex >>= fun (bodyRx, bodyTx) ->
+                mimeTx#put (h, bodyRx) >>= fun () ->
+                self#shift_body bodyTx consume buffer
+        
+        method produce =
+            match produce_ with
+            | false ->
+                Cf_cmonad.return (Cf_exnopt.U (Some self))
+            | true ->
+                match bodyTx_ with
+                | Some bodyTx ->
+                    self#shift_body bodyTx consume_ buffer_
+                | None ->
+                    self#shift_header
     end
 
-class entity_scanner ~lim flowTx bodyRx mimeTx = object
+class entity_scanner ?limits flowTx bodyRx mimeTx = object
     inherit [Mime_entity.basic_scan_adapter event] scanner
-        ~lim flowTx bodyRx mimeTx
+        ?limits flowTx bodyRx mimeTx
     method private headers = new Mime_entity.basic_scan_adapter
 end
 
-class virtual ['event] message_scanner ~lim flowTx bodyRx mimeTx = object
+class virtual ['event] message_scanner ?limits flowTx bodyRx mimeTx = object
     constraint 'event = #Mime_entity.message_scan_adapter event
-    inherit ['event] scanner ~lim flowTx bodyRx mimeTx
+    inherit ['event] scanner ?limits flowTx bodyRx mimeTx
 end
 
 (*---------------------------------------------------------------------------*
              ?limits flowTx mimeRx bodyTx
 
         val source_ = E_src_entity (mimeRx :> 'event Iom_gadget.rx)
-        val flowTx_ = (flowTx :> Iom_stream.readywait Iom_gadget.tx)
-
-        method private push_octets data =
-            let length = Cf_message.length data in
-            let mark = mark_ + length in
-            let buffer = Cf_message.push data buffer_ in
-            let high = limits_.Iom_octet_stream.high in
-            if consume_ && not (mark < high) then begin
-                flowTx_#put `Wait >>= fun () ->
-                Cf_cmonad.return (false, buffer, mark)
-            end
-            else
-                Cf_cmonad.return (consume_, buffer, mark)
-        
-        method private shift_octets len =
-            let mark = if mark_ - len > 0 then mark_ - len else 0 in
-            let buffer = Cf_message.shiftq ~len buffer_ in
-            let low = limits_.Iom_octet_stream.low in
-            if not consume_ && mark < low then begin
-                flowTx_#put `Ready >>= fun () ->
-                Cf_cmonad.return (true, buffer, mark)
-            end
-            else
-                Cf_cmonad.return (consume_, buffer, mark)
 
         method private emit =
             let data = Cf_deque.A.to_list buffer_ in
             let m = new Iom_octet_stream.fragment more_ data in
             [ m ], Cf_message.length data
         
-        method push_headers e =
-            self#push_octets e.headers#emit >>= fun (consume, buffer, mark) ->
+        method push_event (headers, bodyRx) =
+            self#push_octets Iom_stream.More headers#emit >>=
+                fun (consume, buffer, mark) ->
             Cf_cmonad.return {<
                 consume_ = consume; buffer_ = buffer; mark_ = mark;
-                source_ = E_src_body e.bodyRx; flowTx_ = e.flowTx
+                source_ = E_src_body bodyRx
             >}
         
-        method push_body m =
-            self#push_octets m#data >>= fun (consume, buffer, mark) ->
+        method push_fragment m =
+            let more = m#more in
+            self#push_octets more m#data >>= fun (consume, buffer, mark) ->
             Cf_cmonad.return {<
                 consume_ = consume; buffer_ = buffer; mark_ = mark;
-                more_ = m#more;
+                more_ = more;
             >}
 
         method consume f =
                 | E_src_entity mimeRx ->
                     mimeRx#get begin fun event ->
                         try
-                            self#push_headers event >>= fun obj ->
+                            self#push_event event >>= fun obj ->
                             f (Cf_exnopt.U obj)
                         with
                         | x ->
                 | E_src_body bodyRx ->
                     bodyRx#get begin fun fragment ->
                         try
-                            self#push_body fragment >>= fun obj ->
+                            self#push_fragment fragment >>= fun obj ->
                             f (Cf_exnopt.U obj)
                         with
                         | x ->
     inherit ['event] emitter ?limits flowTx mimeRx bodyTx
 end
 
+let emit_wrapper fragmentTx =
+    let fragmentTx = (fragmentTx :> Iom_octet_stream.fragment Iom_gadget.tx) in
+    Iom_gadget.simplex >>= fun (eventRx, eventTx) ->
+    let eventTx = (eventTx :> entity_emit_event Iom_gadget.tx) in
+    Iom_gadget.start begin
+        Iom_gadget.guard begin
+            eventRx#get begin fun (headers, fragmentRx) ->
+                let m = headers#emit in
+                let fragment =
+                    new Iom_octet_stream.fragment Iom_stream.More m
+                in
+                fragmentTx#put fragment >>= fun () ->
+                Iom_gadget.wrap fragmentRx fragmentTx Cf_flow.nop
+            end
+        end
+    end >>= fun () ->
+    Cf_cmonad.return eventTx
+
+(*---------------------------------------------------------------------------*
+  PLUGS and JACKS
+ *---------------------------------------------------------------------------*)
+type ('i, 'c, 'n) iplug = ('i event, 'c, 'n) Iom_stream.iplug
+    constraint 'i = #Mime_entity.basic_scan_adapter
+
 type ('i, 'c, 'n) ijack = ('i event, 'c, 'n) Iom_stream.ijack
     constraint 'i = #Mime_entity.basic_scan_adapter
 
+type ('o, 'c, 'n) oplug = ('o event, 'c, 'n) Iom_stream.oplug
+    constraint 'o = #Mime_entity.basic_emit_adapter
+
 type ('o, 'c, 'n) ojack = ('o event, 'c, 'n) Iom_stream.ojack
     constraint 'o = #Mime_entity.basic_emit_adapter
 
+type ('i, 'o, 'c, 'n) ioplug = ('i event, 'o event, 'c, 'n) Iom_stream.ioplug
+    constraint 'adapter = #Mime_entity.basic_emit_adapter
+
 type ('i, 'o, 'c, 'n) iojack = ('i event, 'o event, 'c, 'n) Iom_stream.iojack
     constraint 'adapter = #Mime_entity.basic_emit_adapter
 
-type ('c, 'n) entity_scanner_jack =
+type ('c, 'n) entity_scan_plug =
+    (Mime_entity.basic_scan_adapter, 'c, 'n) iplug
+
+type ('c, 'n) entity_scan_jack =
     (Mime_entity.basic_scan_adapter, 'c, 'n) ijack
 
-type ('c, 'n) entity_emitter_jack =
+type ('c, 'n) entity_emit_plug =
+    (Mime_entity.basic_emit_adapter, 'c, 'n) oplug
+
+type ('c, 'n) entity_emit_jack =
     (Mime_entity.basic_emit_adapter, 'c, 'n) ojack
 
-type ('c, 'n) entity_io_jack =
+type ('c, 'n) entity_duplex_plug =
+    (Mime_entity.basic_scan_adapter, Mime_entity.basic_emit_adapter, 'c, 'n)
+    ioplug
+
+type ('c, 'n) entity_duplex_jack =
     (Mime_entity.basic_scan_adapter, Mime_entity.basic_emit_adapter, 'c, 'n)
     iojack
 
-let scan_entity ~lim ijack =
-    Iom_layer.ingest ijack (new entity_scanner ~lim)
+let scan_entity ?limits ijack =
+    Iom_layer.ingest ijack (new entity_scanner ?limits)
 
 let emit_entity ?limits ojack =
     Iom_layer.render ojack (new entity_emitter ?limits)
 
-let duplex_entities ~slim ?elim iojack =
+let duplex_entities ?slim ?elim iojack =
     Iom_layer.duplex iojack
-        (new entity_scanner ~lim:slim) (new entity_emitter ?limits:elim)
+        (new entity_scanner ?limits:slim) (new entity_emitter ?limits:elim)
 
 (*--- End of File [ mime_stream.ml ] ---*)

File mime/mime_stream.mli

 (*---------------------------------------------------------------------------*
   EVENT
  *---------------------------------------------------------------------------*)
-type 'adapter event = {
-    headers: 'adapter;
-    flowTx: Iom_stream.readywait Iom_gadget.tx;
-    bodyRx: Iom_octet_stream.fragment Iom_gadget.rx;
-} constraint 'adapter = #Mime_entity.headers
+type 'headers event =
+    'headers * Iom_octet_stream.fragment Iom_gadget.rx
+    constraint 'headers = #Mime_entity.headers
+
+type entity_scan_event = Mime_entity.basic_scan_adapter event
+type entity_emit_event = Mime_entity.basic_emit_adapter event
 
 (*---------------------------------------------------------------------------*
   SCANNING
  *---------------------------------------------------------------------------*)
-exception Header_too_long
+type scan_error =
+    | X_header_too_long
+    | X_header_incomplete
+
+val scan_error_to_string: scan_error -> string
+
+exception Scan_error of scan_error
 
 class type virtual ['event] scanner = object('self)
     constraint 'adapter = #Mime_entity.basic_scan_adapter
     constraint 'event = 'adapter event
     inherit [Iom_octet_stream.fragment, 'event] Iom_octet_stream.scanner
 
+    val bodyTx_: Iom_octet_stream.fragment Iom_gadget.tx option
+
     method private virtual headers: (string * char Cf_seq.t) list -> 'adapter
 end
 
 class entity_scanner:
-    lim:int -> [> Iom_stream.readywait ] #Iom_gadget.tx ->
+    ?limits:Iom_octet_stream.limits ->
+    [> Iom_stream.readywait ] #Iom_gadget.tx ->
     Iom_octet_stream.fragment #Iom_gadget.rx ->
     Mime_entity.basic_scan_adapter event #Iom_gadget.tx ->
     object('self)
     end
 
 class virtual ['event] message_scanner:
-    lim:int -> [> Iom_stream.readywait ] #Iom_gadget.tx ->
+    ?limits:Iom_octet_stream.limits ->
+    [> Iom_stream.readywait ] #Iom_gadget.tx ->
     Iom_octet_stream.fragment #Iom_gadget.rx -> 'event #Iom_gadget.tx ->
     object('self)
         constraint 'event = #Mime_entity.message_scan_adapter event
     inherit ['event, Iom_octet_stream.fragment] Iom_octet_stream.emitter
     
     val source_: 'event emitter_source
-    val flowTx_: Iom_stream.readywait Iom_gadget.tx
 
-    method push_headers: 'event -> 'self Iom_gadget.t
-    method push_body: Iom_octet_stream.fragment -> 'self Iom_gadget.t
+    method push_event: 'event -> 'self Iom_gadget.t
+    method push_fragment: Iom_octet_stream.fragment -> 'self Iom_gadget.t
 end
 
 class entity_emitter:
         inherit ['event] emitter
     end
 
+val emit_wrapper:
+    Iom_octet_stream.fragment Iom_gadget.tx ->
+    entity_emit_event Iom_gadget.tx Iom_gadget.t
+
 (*---------------------------------------------------------------------------*
   PLUGS and JACKS
  *---------------------------------------------------------------------------*)
+type ('i, 'c, 'n) iplug = ('i event, 'c, 'n) Iom_stream.iplug
+    constraint 'i = #Mime_entity.basic_scan_adapter
+
 type ('i, 'c, 'n) ijack = ('i event, 'c, 'n) Iom_stream.ijack
     constraint 'i = #Mime_entity.basic_scan_adapter
 
+type ('o, 'c, 'n) oplug = ('o event, 'c, 'n) Iom_stream.oplug
+    constraint 'o = #Mime_entity.basic_emit_adapter
+
 type ('o, 'c, 'n) ojack = ('o event, 'c, 'n) Iom_stream.ojack
     constraint 'o = #Mime_entity.basic_emit_adapter
 
+type ('i, 'o, 'c, 'n) ioplug = ('i event, 'o event, 'c, 'n) Iom_stream.ioplug
+    constraint 'adapter = #Mime_entity.basic_emit_adapter
+
 type ('i, 'o, 'c, 'n) iojack = ('i event, 'o event, 'c, 'n) Iom_stream.iojack
     constraint 'adapter = #Mime_entity.basic_emit_adapter
 
-type ('c, 'n) entity_scanner_jack =
+type ('c, 'n) entity_scan_plug =
+    (Mime_entity.basic_scan_adapter, 'c, 'n) iplug
+
+type ('c, 'n) entity_scan_jack =
     (Mime_entity.basic_scan_adapter, 'c, 'n) ijack
 
-type ('c, 'n) entity_emitter_jack =
+type ('c, 'n) entity_emit_plug =
+    (Mime_entity.basic_emit_adapter, 'c, 'n) oplug
+
+type ('c, 'n) entity_emit_jack =
     (Mime_entity.basic_emit_adapter, 'c, 'n) ojack
 
-type ('c, 'n) entity_io_jack =
+type ('c, 'n) entity_duplex_plug =
+    (Mime_entity.basic_scan_adapter, Mime_entity.basic_emit_adapter, 'c, 'n)
+    ioplug
+
+type ('c, 'n) entity_duplex_jack =
     (Mime_entity.basic_scan_adapter, Mime_entity.basic_emit_adapter, 'c, 'n)
     iojack
 
 val scan_entity:
-    lim:int -> (Iom_octet_stream.fragment, 'c, 'n) Iom_stream.ijack ->
-    ('c, 'n) entity_scanner_jack Iom_gadget.t
+    ?limits:Iom_octet_stream.limits ->
+    (Iom_octet_stream.fragment, 'c, 'n) Iom_stream.ijack ->
+    ('c, 'n) entity_scan_jack Iom_gadget.t
 
-val emit_entity :
+val emit_entity:
     ?limits:Iom_octet_stream.limits ->
     (Iom_octet_stream.fragment, 'c, 'n) Iom_stream.ojack ->
-    ('c, 'n) entity_emitter_jack Iom_gadget.t
+    ('c, 'n) entity_emit_jack Iom_gadget.t
 
-val duplex_entities :
-    slim:int -> ?elim:Iom_octet_stream.limits ->
+val duplex_entities:
+    ?slim:Iom_octet_stream.limits -> ?elim:Iom_octet_stream.limits ->
     (Iom_octet_stream.fragment, Iom_octet_stream.fragment, 'c, 'n)
         Iom_stream.iojack ->
-    ('c, 'n) entity_io_jack Iom_gadget.t
+    ('c, 'n) entity_duplex_jack Iom_gadget.t
 
 (*--- End of File [ mime_stream.mli ] ---*)

File mime/t/t_mime.ml

         with
         | End_of_file ->
             jout#fail "T2: End_of_file"
-        | Bad_field_name (line, m) ->
+        | Error (X_field_bad_syntax (line, m)) ->
             let str = Cf_message.contents m in
-            jout#fail "T2: Bad_field_name (%u, \"%s\")" line str
+            jout#fail "T2: bad field syntax (%u, \"%s\")" line str
 end
 
 module T3 = struct
             inherit Iom_gadget.next
             
             method private guard =
-                scanRx#get begin fun e ->
-                    let h = e.Mime_stream.headers in
+                scanRx#get begin fun (h, _) ->
                     h#force;
                     if h#content_transfer_encoding <> Some cte then
                         jout#fail "T4: content transfer encoding";
     let reactor k =
         let limits = { Iom_octet_stream.low = 256; high = 256 } in
         Iom_pipe.create ~limits k >>= fun (ijack, ojack) ->
-        Mime_stream.scan_entity ~lim:256 ijack >>= fun ijack ->
+        Mime_stream.scan_entity ~limits ijack >>= fun ijack ->
         let top = new top ijack ojack in
         top#start
     
                 let m = new Iom_octet_stream.fragment Iom_stream.Last body in
                 bodyTx#put m >>= fun () ->
                 let h = new Mime_entity.basic_emit_adapter ~ct ~cte [] in
-                let e = {
-                    Mime_stream.headers = h;
-                    flowTx = new Iom_gadget.tx Iom_gadget.null;
-                    bodyRx = bodyRx;
-                } in
-                emitTx#put e >>= fun () ->
+                emitTx#put (h, bodyRx) >>= fun () ->
                 ctrlTx#put `Ready >>= fun () ->
                 super#start
         end