Commits

Anonymous committed 2b61414

Checkpoint. Scanning and emitting of MIME entities and messages with Iom
reactors now supported. See ISSUES for remaining work to do.

  • Participants
  • Parent commits 0a2b036

Comments (0)

Files changed (9)

 mime_lex.cmi: mime_atom.cmi 
 mime_entity.cmi: mime_atom.cmi 
+mime_stream.cmi: mime_entity.cmi 
 mime_atom.cmo: mime_atom.cmi 
 mime_atom.cmx: mime_atom.cmi 
 mime_base64.cmo: mime_base64.cmi 
 mime_lex.cmx: mime_atom.cmx mime_lex.cmi 
 mime_entity.cmo: mime_lex.cmi mime_atom.cmi mime_entity.cmi 
 mime_entity.cmx: mime_lex.cmx mime_atom.cmx mime_entity.cmi 
-mime_stream.cmo: mime_entity.cmi mime_stream.cmi 
-mime_stream.cmx: mime_entity.cmx mime_stream.cmi 
-t/t_mime.cmo: mime_lex.cmi mime_entity.cmi mime_base64.cmi mime_atom.cmi 
-t/t_mime.cmx: mime_lex.cmx mime_entity.cmx mime_base64.cmx mime_atom.cmx 
+mime_stream.cmo: mime_lex.cmi mime_entity.cmi mime_stream.cmi 
+mime_stream.cmx: mime_lex.cmx mime_entity.cmx mime_stream.cmi 
+t/t_mime.cmo: mime_stream.cmi mime_lex.cmi mime_entity.cmi mime_base64.cmi \
+    mime_atom.cmi 
+t/t_mime.cmx: mime_stream.cmx mime_lex.cmx mime_entity.cmx mime_base64.cmx \
+    mime_atom.cmx 
 
 + (unimplemented): Many optional features.
 
-+ (Mime_stream): Inherit from Iom_octet_stream.parse and emit and provide
-    convenience factories for isimplex, osimplex and duplex reactors that have
++ (Mime_stream):
+    Provide convenience factories for isimplex, osimplex and duplex reactors
+    that have octet stream plugs on one end and mime stream jacks on the other.
+
++ (Mime_multipart): Need code for parsing and emitting multipart boundaries.
+
++ (Mime_multipart_stream):
+    A derivation of mime streams that carries events where the body is either a
+    single part entity like with [Mime_stream] or a multipart consisting of a
+    stream of mime entities.  
+    
+    Inherit from Iom_octet_stream.scanner and emitter and provide convenience
+    factories for isimplex, osimplex and duplex reactors that have
     octet stream plugs on one end and mime stream jacks on the other.
     
-    A mime stream carries exactly one event, delivered when the top-level
-    headers are presented.  The event includes a subclass of the mime entity
-    headers object and either an octet stream for the body or a stream of mime
-    entity streams for multipart messages.
-
 # End of open issues
 name="mime"
 version="0.0"
 description="OCaml NAE MIME Processing"
-requires="unix cf"
+requires="cf nx iom"
 archive(byte)="mime.cma"
 archive(native)="mime.cmxa"
 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
 # OF THE POSSIBILITY OF SUCH DAMAGE. 
 
-REQUIRE = cf iom
+REQUIRE = cf nx iom
 PREDICATES =
 
 ###############################################################################
 opt:: $(TEST_OPT_PROGRAMS)
 
 t.% : t/t_%.ml $(TEST_LIBS:%=%.cma)
-	$(OCAMLC) -o $@ $(CMO_OPT) $(DEBUG_OPT) $(TEST_LINKOPT) \
-        $(TEST_LIBS:%=%.cma) $<
+	$(OCAMLC) -o $@ $(CMO_OPT) $(TEST_LINKOPT) $(TEST_LIBS:%=%.cma) $<
 
 t-opt.% : t/t_%.ml $(TEST_LIBS:%=%.cmxa)
 	$(OCAMLOPT) -o $@ $(CMX_OPT) $(TEST_LINKOPT) $(TEST_LIBS:%=%.cmxa) $<

mime/mime_entity.ml

     method unstructured: (string * string) list
 end
 
+class type mime_version = object
+    method mime_version: unit
+end
+
+class type force = object
+    method force: unit
+end
+
 module S = struct
+    let mime_version = "MIME-Version"
     let content_type = "Content-Type"
     let content_disposition = "Content-Disposition"
     let content_transfer_encoding = "Content-Transfer-Encoding"
     let content_id = "Content-Id"
 end
 
-class headers_parser =
-    let p_end_of_field =
-        L.parse_optcfws >>= fun () ->
-        L.parse_crlf >>= fun () ->
-        P.fin
-    in
+let p_end_of_field =
+    L.parse_optcfws >>= fun () ->
+    L.parse_crlf >>= fun () ->
+    P.fin
+
+class basic_scan_adapter =
     let p_content_type =
         L.parse_token >>= fun ct_type ->
         match ct_type_of_string ct_type with
                 end fields
             end
         in
-        object(_:'self)
-            constraint 'self = #headers
-    		            
+        object(_:'self)    		            
             method content_type = Lazy.force ctz
             method content_disposition = Lazy.force cdiz
             method content_transfer_encoding = Lazy.force ctez            
                 ()
         end
 
-let parse_headers msg =
+let scan_basic_headers msg =
     let headers, body = Mime_lex.split_into_fields msg in
-    let hp = new headers_parser headers in
+    let hp = new basic_scan_adapter headers in
     hp#force;
     hp, body
 
-class headers_emitter =
+class message_scan_adapter =
+    let p_mime_version =
+        Cf_parser.lit "1.0" () >>= fun () ->
+        p_end_of_field
+    in
+    fun fields ->
+        let _, fields = L.select_field S.mime_version p_mime_version fields in
+        object
+            inherit basic_scan_adapter fields
+            method mime_version = ()
+        end
+
+let scan_message_headers f msg =
+    let headers, body = Mime_lex.split_into_fields msg in
+    let parse = f headers in
+    parse#force;
+    parse, body
+
+class type emit = object
+    method private emit_structured: Format.formatter -> unit
+    method private emit_unstructured: Format.formatter -> unit
+    method private emit_headers: Format.formatter -> unit
+    
+    method emit: Cf_message.t
+end
+
+class basic_emit_adapter =
     let e_content_type pp ct =
         Format.fprintf pp "%s/%s" (ct_type_to_string ct.ct_type) ct.ct_subtype;
         L.emit_parameters pp ct.ct_parameters
             [ str, 0, len ]
     end
 
-let emit_headers ?ct ?cdi ?cte ?cde ?cl ?cid u =
+let e_mime_version pp () = Format.pp_print_string pp "1.0"
+
+class message_emit_adapter ?ct ?cdi ?cte ?cde ?cl ?cid u =
     object
-    	inherit headers_emitter ?ct ?cdi ?cte ?cde ?cl ?cid u as super
+        inherit basic_emit_adapter ?ct ?cdi ?cte ?cde ?cl ?cid u as super
+        
+        method private emit_headers pp =
+            L.emit_field e_mime_version pp S.mime_version ();
+            super#emit_headers pp
+        
+        method mime_version = ()
+    end
+
+let emit_basic_headers ?ct ?cdi ?cte ?cde ?cl ?cid u =
+    object
+    	inherit basic_emit_adapter ?ct ?cdi ?cte ?cde ?cl ?cid u as super
     	val mutable text_ = lazy (raise Lazy.Undefined)
     	method emit = Lazy.force text_
     	initializer text_ <- Lazy.lazy_from_val super#emit

mime/mime_entity.mli

     method unstructured: (string * string) list
 end
 
-class headers_parser: (string * char Cf_seq.t) list -> object
-    inherit headers
+class type mime_version = object
+    method mime_version: unit
+end
+
+class type force = object
     method force: unit
 end
 
-val parse_headers: Cf_message.t -> headers_parser * Cf_message.t
+class basic_scan_adapter: (string * char Cf_seq.t) list -> object
+    inherit headers
+    inherit force
+end
 
-class headers_emitter:
+val scan_basic_headers: Cf_message.t -> basic_scan_adapter * Cf_message.t
+
+class message_scan_adapter:
+    (string * char Cf_seq.t) list ->
+    object
+        inherit basic_scan_adapter
+        inherit mime_version
+    end
+
+val scan_message_headers:
+    ((string * char Cf_seq.t) list -> (#message_scan_adapter as 'adapter)) ->
+    Cf_message.t -> 'adapter * Cf_message.t
+
+class type emit = object
+    method private emit_structured: Format.formatter -> unit
+    method private emit_unstructured: Format.formatter -> unit
+    method private emit_headers: Format.formatter -> unit
+    
+    method emit: Cf_message.t
+end
+
+class basic_emit_adapter:
     ?ct:ct_t -> ?cdi:cd_t -> ?cte:cte_t -> ?cde:string -> ?cl:int64 ->
     ?cid:(string * string) -> (string * string) list ->
     object
         inherit headers
-        
-        method private emit_structured: Format.formatter -> unit
-        method private emit_unstructured: Format.formatter -> unit
-        method private emit_headers: Format.formatter -> unit
-        
-        method emit: Cf_message.t
+        inherit emit
     end
 
-val emit_headers:
+class message_emit_adapter:
     ?ct:ct_t -> ?cdi:cd_t -> ?cte:cte_t -> ?cde:string -> ?cl:int64 ->
-    ?cid:(string * string) -> (string * string) list -> headers_emitter
+    ?cid:(string * string) -> (string * string) list ->
+    object
+        inherit basic_emit_adapter
+        inherit mime_version
+    end
+
+val emit_basic_headers:
+    ?ct:ct_t -> ?cdi:cd_t -> ?cte:cte_t -> ?cde:string -> ?cl:int64 ->
+    ?cid:(string * string) -> (string * string) list -> basic_emit_adapter
 
 (*--- End of File [ mime_entity.mli ] ---*)

mime/mime_stream.ml

 
 open Cf_cmonad.Op
 
+(*---------------------------------------------------------------------------*
+  EVENT
+ *---------------------------------------------------------------------------*)
+type 'adapter event = {
+    headers: 'adapter;
+    flowTx: Iom_stream.readywait Iom_gadget.tx;
+    bodyRx: Iom_octet_stream.fragment Iom_gadget.rx;
+} constraint 'adapter = #Mime_entity.headers
+
+(*---------------------------------------------------------------------------*
+  SCANNING
+ *---------------------------------------------------------------------------*)
 exception Header_too_long
 
-type 'headers r_event =
-    | R_event of 'headers * r_body
-    constraint 'headers = #Mime_entity.headers
-and r_body =
-    | R_body of Iom_octet_stream.fragment Iom_gadget.rx
-    | R_part of (Iom_stream.more * Mime_entity.headers r_event) Iom_gadget.rx
-    
-class ['headers] parse flowTx readRx mimeTx = object
-    inherit [Iom_octet_stream.fragment, 'headers r_event]
-        Iom_octet_stream.parse flowTx readRx mimeTx as super
+class virtual ['event] scanner ~lim flowTx bodyRx mimeTx =
+    object(self:'self)
+        constraint 'adapter = #Mime_entity.basic_scan_adapter
+        constraint 'event = 'adapter event
+        inherit [Iom_octet_stream.fragment, 'event] Iom_octet_stream.scanner
+            flowTx bodyRx mimeTx as super
         
-    method lex (_ : Cf_message.t) (_ : char Cf_seq.t) =
-        (None : (('headers r_event * int) * char Cf_seq.t) option)
-    
-    (* what about flowRx? probably no need (lower levels do that) *)
+        method virtual private headers:
+            (string * char Cf_seq.t) list -> 'adapter
+                
+        method private lex msg _ =
+            let tlength = Cf_message.length msg in
+            try
+                let headers, body = Mime_lex.split_into_fields msg in
+                let blength = Cf_message.length body in
+                let slength = tlength - blength in
+                let event = {
+                    headers = self#headers headers;
+                    bodyRx =
+                        (bodyRx :> Iom_octet_stream.fragment Iom_gadget.rx);
+                    flowTx = (flowTx :> Iom_stream.readywait Iom_gadget.tx);
+                } in
+                Some ((event, slength), Cf_seq.nil)
+            with
+            | End_of_file when tlength > lim ->
+                raise Header_too_long
+            | End_of_file ->
+                None
+
+        method shift =
+            super#shift >>= fun v ->
+            Cf_cmonad.return begin
+                match v with
+                | Cf_exnopt.U (_ :: _ as v, _) ->
+                    Cf_exnopt.U (v, None)
+                | r ->
+                    r
+            end
+    end
+
+class entity_scanner ~lim flowTx bodyRx mimeTx = object
+    inherit [Mime_entity.basic_scan_adapter event] scanner
+        ~lim flowTx bodyRx mimeTx
+    method private headers = new Mime_entity.basic_scan_adapter
 end
 
-type 'headers w_event =
-    | W_event of 'headers * w_body
-    constraint 'headers = #Mime_entity.headers
-and w_body =
-    | W_body of Iom_octet_stream.fragment Iom_gadget.tx
-    | W_part of (Iom_stream.more * Mime_entity.headers w_event) Iom_gadget.tx
+class virtual ['event] message_scanner ~lim flowTx bodyRx mimeTx = object
+    constraint 'event = #Mime_entity.message_scan_adapter event
+    inherit ['event] scanner ~lim flowTx bodyRx mimeTx
+end
 
 (*---------------------------------------------------------------------------*
-let rec mqPush n q = function
-    | (_, _, len as hd) :: tl -> mqPush (n + len) (Cf_deque.A.push hd q) tl
-    | [] -> n, q
+  EMITTING
+ *---------------------------------------------------------------------------*)
+type 'event emitter_source =
+    | E_src_entity of 'event Iom_gadget.rx
+    | E_src_body of Iom_octet_stream.fragment Iom_gadget.rx
+    constraint 'event = #Mime_entity.basic_emit_adapter event
 
-let rec mqPop m x n q =
-    if x > 0 then
-        match Cf_deque.B.pop q with
-        | Some ((buf, pos, len), tl) when len > x ->
-            let hd1 = buf, pos + x, len - x and hd0 = buf, pos, x in
-            mqPop (hd0 :: m) 0 (n - x) (Cf_deque.B.push hd1 tl)
-        | Some ((_, _, len as hd), tl) ->
-            mqPop (hd :: m) (x - len) (n - len) tl
-        | None ->
-    		List.rev m, n, q
-    else
-        List.rev m, n, q
+class ['event] emitter ?limits flowTx mimeRx bodyTx =
+    object(self:'self)
+        constraint 'event = #Mime_entity.basic_emit_adapter event
+        inherit ['event, Iom_octet_stream.fragment] Iom_octet_stream.emitter
+             ?limits flowTx mimeRx bodyTx
 
-let mqDrain = Cf_deque.A.fold (fun m s -> s :: m) []
+        val source_ = E_src_entity (mimeRx :> 'event Iom_gadget.rx)
+        val flowTx_ = (flowTx :> Iom_stream.readywait Iom_gadget.tx)
 
-(*
-type Iom_file.rd_binding = {
-    rd_ctrlTx: reader_control_t Iom_gadget.tx;
-    rd_sigRx: signal_t Iom_gadget.rx;
-    rd_dataRx: octets Iom_gadget.rx;
-}
-*)
+        method private push_octets data =
+            let length = Cf_message.length data in
+            let mark = mark_ + length in
+            let buffer = Cf_message.push data buffer_ in
+            let high = limits_.Iom_octet_stream.high in
+            if consume_ && not (mark < high) then begin
+                flowTx_#put `Wait >>= fun () ->
+                Cf_cmonad.return (false, buffer, mark)
+            end
+            else
+                Cf_cmonad.return (consume_, buffer, mark)
+        
+        method private shift_octets len =
+            let mark = if mark_ - len > 0 then mark_ - len else 0 in
+            let buffer = Cf_message.shiftq ~len buffer_ in
+            let low = limits_.Iom_octet_stream.low in
+            if not consume_ && mark < low then begin
+                flowTx_#put `Ready >>= fun () ->
+                Cf_cmonad.return (true, buffer, mark)
+            end
+            else
+                Cf_cmonad.return (consume_, buffer, mark)
 
-type 'headers rd_signal =
-    [ Iom_reactor.signal | `Headers of 'headers ]
-    constraint 'headers = #Mime_entity.headers_parser
+        method private emit =
+            let data = Cf_deque.A.to_list buffer_ in
+            let m = new Iom_octet_stream.fragment more_ data in
+            [ m ], Cf_message.length data
+        
+        method push_headers e =
+            self#push_octets e.headers#emit >>= fun (consume, buffer, mark) ->
+            Cf_cmonad.return {<
+                consume_ = consume; buffer_ = buffer; mark_ = mark;
+                source_ = E_src_body e.bodyRx; flowTx_ = e.flowTx
+            >}
+        
+        method push_body m =
+            self#push_octets m#data >>= fun (consume, buffer, mark) ->
+            Cf_cmonad.return {<
+                consume_ = consume; buffer_ = buffer; mark_ = mark;
+                more_ = m#more;
+            >}
 
-class ['headers] rd ctrlRx sigTx dataTx fileBind = object(self)
-    inherit Iom_machine.initial
-    
-    val ctrlRx_ = (ctrlRx :> Iom_file.reader_control_t Iom_gadget.rx)
-    val sigTx_ = (sigTx :> 'headers rd_signal Iom_gadget.tx)
-    val dataTx_ = (dataTx :> Iom_file.octets Iom_gadget.tx)
-
-    val ctrlTx_ = fileBind.Iom_file.rd_ctrlTx;
-    val sigRx_ = fileBind.Iom_file.rd_sigRx;
-    val dataRx_ = fileBind.Iom_file.rd_sigRx;
-    
-    method private control = function
-    	| `Ready -> self#ready
-    	| `Wait -> self#wait
-    	| `Close -> self#close
-    	| `Unlink -> self#unlink
-    
-    method private error x = sigTx#put (`Error x)
-    
-    method private signal (`Error x) = self#error x
-    
-    method private guard =
-    	sigRx_#get self#signal >>= fun () ->
-    	ctrlRx_#get self#control >>= fun () ->
-    	dataRx_#get self#receive
-end
- *---------------------------------------------------------------------------*)
-
-(*---------------------------------------------------------------------------*
-class ['headers] fragment ?h:headers more octets =
-    object
-        inherit Iom_file.octets more octets
-        constraint 'headers = #Mime_entity.headers
-        method headers: 'headers option = headers
+        method consume f =
+            match consume_ with
+            | false ->
+                Cf_cmonad.nil
+            | true ->
+                match source_ with
+                | E_src_entity mimeRx ->
+                    mimeRx#get begin fun event ->
+                        try
+                            self#push_headers event >>= fun obj ->
+                            f (Cf_exnopt.U obj)
+                        with
+                        | x ->
+                            f (Cf_exnopt.X x)
+                    end
+                | E_src_body bodyRx ->
+                    bodyRx#get begin fun fragment ->
+                        try
+                            self#push_body fragment >>= fun obj ->
+                            f (Cf_exnopt.U obj)
+                        with
+                        | x ->
+                            f (Cf_exnopt.X x)
+                    end
     end
 
-(*---
-  val pushQ_:
-    int -> (string * int * int) Cf_deque.t -> Cf_message.t ->
-    int * (string * int * int) Cf_deque.t
-    
-    Given a queue Q, with N bytes total in it, push the contents of the message
-    onto the B end the queue, and increment N by the size of the message.
-  ---*)
-let rec pushQ_ n q = function
-    | (_, _, len as hd) :: tl ->
-    	pushQ_ (n + len) (Cf_deque.A.push hd q) tl
-    | [] ->
-    	n, q
+class entity_emitter = [Mime_entity.basic_emit_adapter event] emitter
 
-(*---
-  val popQLoop_:
-    Cf_message.t -> int -> int -> (string * int * int) Cf_deque.t ->
-    Cf_message.t * int * (string * int * int) Cf_deque.t
-
-    Given an accumulator M (assumed empty), a requested length X, a length N
-    and a queue Q, pop up to X bytes off the queue into the (reversing)
-    accumulator.
-  ---*)
-
-let rec popQLoop_ m x n q =
-    if x > 0 then
-        match Cf_deque.B.pop q with
-        | Some ((buf, pos, len), tl) when len > x ->
-            let hd1 = buf, pos + x, len - x and hd0 = buf, pos, x in
-            popQLoop_ (hd0 :: m) 0 (n - x) (Cf_deque.B.push hd1 tl)
-        | Some ((_, _, len as hd), tl) ->
-            popQLoop_ (hd :: m) (x - len) (n - len) tl
-        | None ->
-    		popQLoop_ m 0 n q
-    else
-        List.rev m, n, q
-
-let popQ_ ~e n q = popQLoop_ [] e n q
-
-let drainQ_ q = Cf_deque.fold (fun m s -> s :: m) [] q
-
-class virtual ['w, 'd, 'h] buffer ~e ?(d = 1) readyRx readyTx =
-    let readyRx = (readyRx :> 'r Iom_gadget.rx) in
-    let readyTx = (readyTx :> 'r Iom_gadget.tx) in
-    object(self:'self)
-        inherit Iom_machine.initial as super
-
-        constraint 'w = [> Iom_file.ready_t ]
-        constraint 'd = #Iom_file.octets
-        constraint 'h = #Mime_entity.headers
-        
-        val queue_ = Cf_deque.nil
-        val ready_ = false
-        val more_ = Iom_file.More
-        val length_ = 0
-        val headers_ = None
-
-        method private virtual receive: 'd -> ('self, unit) Iom_gadget.t
-
-        method private virtual send:
-            ?h:'h -> Iom_file.more_t -> Cf_message.t ->
-    		('self, unit) Iom_gadget.t
-    	
-    	(*---
-    	  We drain the buffer [e] bytes at a time until there are no more bytes
-    	  in the stream, or a transmission exception is raised, or there's less
-    	  than [d] bytes left.  If we the stream is still in progress, then
-    	  construct a new object and invoke #next on it.
-    	  ---*)
-        method private drain r more n q h =
-    		let m, n, q = popQ_ ~e n q in
-    		match h, m with
-    		| None, _	(* no headers yet [ingesting] *)
-    		| _, [] ->	(* queue is empty *)
-    			assert (jout#debug
-    				"Mime_stream.buffer#drain: done! h=%s n=%d m='%s'"
-    				(match h with None -> "None" | _ -> "Some") n
-    				(Cf_message.contents m));
-    			let obj = {<
-    				queue_ = q; ready_ = r; more_ = more; length_ = n;
-    				headers_ = h;
-    			>} in
-    			obj#next
-    		| Some _, _ ->
-    			let txmore = if n > 0 then Iom_file.More else more in
-    			assert (jout#debug
-    				"Mime_stream.buffer#drain: sending... n=%d d=%d %s'%s'"
-    				n d (if txmore = Iom_file.More then "+" else ".")
-    				(Cf_message.contents m));
-    			self#send ?h txmore m >>= fun () ->
-    			match txmore with
-    			| Iom_file.More ->
-    				if n < d && more = Iom_file.More then
-    					let obj = {<
-    						queue_ = q; ready_ = r; more_ = more;
-    						length_ = n; headers_ = h;
-    					>} in
-    					obj#next
-    				else
-    					self#drain r more n q None
-    			| _ ->
-    				self#drain r more n q h
-        
-        method private wait = {< ready_ = false >}#next
-        
-        method private ready =
-            begin
-                if length_ < e then readyTx#put `Ready else Cf_cmonad.nil 
-            end >>= fun () ->
-            self#drain true more_ length_ queue_ headers_
-        
-        method private flow = function
-            | `Wait when ready_ -> self#wait
-            | `Ready when not ready_ -> self#ready
-            | _ -> self#next
-        
-        method private guard = readyRx#get self#flow
-
-        initializer
-            if d < 1 then invalid_arg "Mime_stream.buffer: d < 1";
-            if e < d then invalid_arg "Mime_stream.buffer: e < d"
-    end
-
-class ['h] ingestor ~e ?(d = e) ~p ~c:ctrlIO ~s:sigIO dataRx dataTx =
-    let ctrlRx, ctrlTx = ctrlIO in
-    let ctrlRx = (ctrlRx :> 'c Iom_gadget.rx) in
-    let ctrlTx = (ctrlTx :> 'c Iom_gadget.tx) in
-    let sigRx, sigTx = sigIO in
-    let sigRx = (sigRx :> 's Iom_gadget.rx) in
-    let sigTx = (sigTx :> 's Iom_gadget.tx) in
-    let dataRx = (dataRx :> 'd Iom_gadget.rx) in
-    let dataTx = (dataTx :> 'h fragment Iom_gadget.tx) in
-    object(self)
-        inherit ['c, 'd, 'h] buffer ~e ~d ctrlRx ctrlTx as super
-
-        constraint 'c = Iom_file.reader_control_t
-        constraint 's = Iom_file.signal_t
-        constraint 'd = Iom_file.octets
-        constraint 'h = #Mime_entity.headers_parser
-    	
-    	method private error x =
-    		sigTx#put (`Error x) >>= fun () ->
-    		ctrlTx#put `Close
-    	
-    	method private receive data =
-    		let more = data#more and m = data#octets in
-    		let n = length_ + Cf_message.length m in
-    		let m = Cf_deque.fold (fun m s -> s :: m) data#octets queue_ in
-    		if n < e && more = Iom_file.More then
-    			let n, q = pushQ_ 0 Cf_deque.nil m in
-    			{< length_ = n; queue_ = q >}#next
-    		else
-    			match headers_ with
-    			| Some _ ->
-    				let n, q = pushQ_ 0 Cf_deque.nil m in
-    				self#drain ready_ more n q headers_
-    			| None ->
-    				match
-    					try
-    						Cf_exnopt.U (Some (p m))
-    					with
-    					| End_of_file ->
-    						Cf_exnopt.U None
-    					| x ->
-    						Cf_exnopt.X x
-    				with
-    				| Cf_exnopt.X x ->
-    					self#error x
-    				| Cf_exnopt.U (Some (h, m)) ->
-    					let n, q = pushQ_ 0 Cf_deque.nil m in
-    					self#drain ready_ more n q (Some h)
-    				| Cf_exnopt.U None when n < e ->
-    					let n, q = pushQ_ n Cf_deque.nil m in
-    					{< length_ = n; queue_ = q >}#next
-    				| Cf_exnopt.U None ->
-    					self#error Header_too_long
-    				
-        method private send ?h more m = dataTx#put (new fragment ?h more m)
-    	
-        method private signal x = sigTx#put x
-        
-        method private flow = function
-            | #Iom_file.control_t as c -> ctrlTx#put c
-            | #Iom_file.ready_t as w -> super#flow w
-        
-        method private guard =
-            sigRx#get self#signal >>= fun () ->
-            super#guard >>= fun () ->
-            match more_ with
-            | Iom_file.More -> dataRx#get self#receive
-            | _ -> Cf_cmonad.nil 
-
-    end
-
-class ['h] renderer ~e ?(d = e) ~c:ctrlIO ~s:sigIO dataRx dataTx =
-    let ctrlRx, ctrlTx = ctrlIO in
-    let ctrlRx = (ctrlRx :> 'c Iom_gadget.rx) in
-    let ctrlTx = (ctrlTx :> 'c Iom_gadget.tx) in
-    let sigRx, sigTx = sigIO in
-    let sigRx = (sigRx :> 's Iom_gadget.rx) in
-    let sigTx = (sigTx :> 's Iom_gadget.tx) in
-    let dataRx = (dataRx :> 'd Iom_gadget.rx) in
-    let dataTx = (dataTx :> Iom_file.octets Iom_gadget.tx) in
-    object(self)
-        inherit ['s, 'd, 'h] buffer ~e ~d sigRx sigTx as super
-
-        constraint 'c = Iom_file.control_t
-        constraint 's = Iom_file.writer_signal_t
-        constraint 'h = #Mime_entity.headers_emitter
-        constraint 'd = 'h fragment
-        
-        method private receive (data : 'd) =
-            let more = data#more in
-            let h, m =
-                match headers_ with
-                | Some _ ->
-                    data#headers, data#octets
-                | None ->
-                    let h = data#headers in
-                    let m = data#octets in
-                    h, m
-            in
-    		assert (jout#debug "Mime_stream.renderer#receive: %s'%s'"
-    			(if more = Iom_file.More then "+" else ".")
-    			(Cf_message.contents m));
-            let n, q = pushQ_ length_ queue_ m in
-            match more with
-            | Iom_file.More when n < e -> 
-                {< length_ = n; queue_ = q; headers_ = h >}#next
-            | _ ->
-                self#drain ready_ more n q h
-
-        method private send ?h more m =
-    		let m =
-    			match h with
-    			| None -> m
-    			| Some h -> h#emit @ m
-    		in
-    		dataTx#put (new Iom_file.octets more m) >>= fun () ->
-    		if more = Iom_file.Last then
-    			sigTx#put `Final
-    		else
-    			Cf_cmonad.nil 
-    		
-        method private control x = ctrlTx#put x
-        
-        method private flow = function
-            | #Iom_file.ready_t as w -> super#flow w
-            | s -> sigTx#put s
-
-        method private guard =
-            ctrlRx#get self#control >>= fun () ->
-            super#guard >>= fun () ->
-            match more_ with
-            | Iom_file.More -> dataRx#get self#receive
-            | _ -> Cf_cmonad.nil 
-    end
-
-type 'p rd_t = {
-    rd_ctrlTx: Iom_file.reader_control_t Iom_gadget.tx;
-    rd_sigRx: Iom_file.signal_t Iom_gadget.rx;
-    rd_dataRx: 'p fragment Iom_gadget.rx;
-} constraint 'p = #Mime_entity.headers
-
-let ingest ~e ?d ~p rd =
-    Iom_gadget.simplex >>= fun (ctrlRx, ctrlTx) ->
-    Iom_gadget.simplex >>= fun (sigRx, sigTx) ->
-    Iom_gadget.simplex >>= fun (dataRx, dataTx) ->
-    let ctrlIO = ctrlRx, rd.Iom_file.rd_ctrlTx in
-    let sigIO = rd.Iom_file.rd_sigRx, sigTx in
-    let i =
-    	new ingestor ~e ?d ~p ~c:ctrlIO ~s:sigIO rd.Iom_file.rd_dataRx dataTx
-    in
-    i#start >>= fun () ->
-    Cf_cmonad.return {
-    	rd_ctrlTx = ctrlTx;
-    	rd_sigRx = sigRx;
-    	rd_dataRx = dataRx;
-    }
-
-type 'e wr_t = {
-    wr_ctrlTx: Iom_file.control_t Iom_gadget.tx;
-    wr_sigRx: Iom_file.writer_signal_t Iom_gadget.rx;
-    wr_dataTx: 'e fragment Iom_gadget.tx;
-} constraint 'e = #Mime_entity.headers
-
-let render ~e ?d wr =
-    Iom_gadget.simplex >>= fun (ctrlRx, ctrlTx) ->
-    Iom_gadget.simplex >>= fun (sigRx, sigTx) ->
-    Iom_gadget.simplex >>= fun (dataRx, dataTx) ->
-    let ctrlIO = ctrlRx, wr.Iom_file.wr_ctrlTx in
-    let sigIO = wr.Iom_file.wr_sigRx, sigTx in
-    let r =
-    	new renderer ~e ?d ~c:ctrlIO ~s:sigIO dataRx wr.Iom_file.wr_dataTx
-    in
-    r#start >>= fun () ->
-    Cf_cmonad.return {
-    	wr_ctrlTx = ctrlTx;
-    	wr_sigRx = sigRx;
-    	wr_dataTx = dataTx;
-    }
- *---------------------------------------------------------------------------*)
+class ['event] message_emitter ?limits flowTx mimeRx bodyTx = object
+    constraint 'event = #Mime_entity.message_emit_adapter event
+    inherit ['event] emitter ?limits flowTx mimeRx bodyTx
+end
 
 (*--- End of File [ mime_stream.ml ] ---*)

mime/mime_stream.mli

   OF THE POSSIBILITY OF SUCH DAMAGE. 
  *---------------------------------------------------------------------------*)
 
+(*---------------------------------------------------------------------------*
+  EVENT
+ *---------------------------------------------------------------------------*)
+type 'adapter event = {
+    headers: 'adapter;
+    flowTx: Iom_stream.readywait Iom_gadget.tx;
+    bodyRx: Iom_octet_stream.fragment Iom_gadget.rx;
+} constraint 'adapter = #Mime_entity.headers
+
+(*---------------------------------------------------------------------------*
+  SCANNING
+ *---------------------------------------------------------------------------*)
 exception Header_too_long
 
-(*---------------------------------------------------------------------------*
-class ['headers] fragment:
-    ?h:'headers -> Iom_file.more_t -> Cf_message.t ->
-    object
-        inherit Iom_file.octets
-        constraint 'headers = #Mime_entity.headers
-        method headers: 'headers option
+class type virtual ['event] scanner = object('self)
+    constraint 'adapter = #Mime_entity.basic_scan_adapter
+    constraint 'event = 'adapter event
+    inherit [Iom_octet_stream.fragment, 'event] Iom_octet_stream.scanner
+
+    method private virtual headers: (string * char Cf_seq.t) list -> 'adapter
+end
+
+class entity_scanner:
+    lim:int -> [> Iom_stream.readywait ] #Iom_gadget.tx ->
+    Iom_octet_stream.fragment #Iom_gadget.rx ->
+    Mime_entity.basic_scan_adapter event #Iom_gadget.tx ->
+    object('self)
+        inherit [Mime_entity.basic_scan_adapter event] scanner
+        method private headers:
+            (string * char Cf_seq.t) list -> Mime_entity.basic_scan_adapter
     end
 
-class virtual ['w, 'd, 'h] buffer:
-    e:int -> ?d:int -> 'w #Iom_gadget.rx -> 'w #Iom_gadget.tx ->
+class virtual ['event] message_scanner:
+    lim:int -> [> Iom_stream.readywait ] #Iom_gadget.tx ->
+    Iom_octet_stream.fragment #Iom_gadget.rx -> 'event #Iom_gadget.tx ->
     object('self)
-        inherit Iom_machine.initial
-        
-        constraint 'w = [> Iom_file.ready_t ]
-        constraint 'd = #Iom_file.octets
-        constraint 'h = #Mime_entity.headers
-
-        val queue_: (string * int * int) Cf_deque.t
-        val ready_: bool
-        val more_: Iom_file.more_t
-        val length_: int
-        val headers_: 'h option
-
-        method private drain:
-            bool -> Iom_file.more_t -> int ->
-            (string * int * int) Cf_deque.t -> 'h option ->
-            ('self, unit) Iom_gadget.t
-
-        method private virtual receive: 'd -> ('self, unit) Iom_gadget.t
-
-        method private virtual send:
-            ?h:'h -> Iom_file.more_t -> Cf_message.t ->
-    		('self, unit) Iom_gadget.t
-        
-        method private ready: ('self, unit) Iom_gadget.t
-        method private wait : ('self, unit) Iom_gadget.t
-
-        method private flow: 'w -> ('self, unit) Iom_gadget.t
+        constraint 'event = #Mime_entity.message_scan_adapter event
+        inherit ['event] scanner
     end
 
-class ['h] ingestor:
-    e:int -> ?d:int -> p:(Cf_message.t -> 'h * Cf_message.t) ->
-    c:('c #Iom_gadget.rx * 'c #Iom_gadget.tx) ->
-    s:('s #Iom_gadget.rx * 's #Iom_gadget.tx) ->
-    'd #Iom_gadget.rx -> 'h fragment #Iom_gadget.tx ->
+(*---------------------------------------------------------------------------*
+  EMITTING
+ *---------------------------------------------------------------------------*)
+type 'event emitter_source =
+    | E_src_entity of 'event Iom_gadget.rx
+    | E_src_body of Iom_octet_stream.fragment Iom_gadget.rx
+    constraint 'event = #Mime_entity.basic_emit_adapter event
+
+class type ['event] emitter = object('self)
+    constraint 'event = #Mime_entity.basic_emit_adapter event
+    inherit ['event, Iom_octet_stream.fragment] Iom_octet_stream.emitter
+    
+    val source_: 'event emitter_source
+    val flowTx_: Iom_stream.readywait Iom_gadget.tx
+
+    method push_headers: 'event -> 'self Iom_gadget.t
+    method push_body: Iom_octet_stream.fragment -> 'self Iom_gadget.t
+end
+
+class entity_emitter:
+    ?limits:Iom_octet_stream.limits ->
+    [> Iom_stream.readywait ] #Iom_gadget.tx ->
+    Mime_entity.basic_emit_adapter event #Iom_gadget.rx ->
+    Iom_octet_stream.fragment #Iom_gadget.tx ->
+    [Mime_entity.basic_emit_adapter event] emitter
+
+class ['event] message_emitter:
+    ?limits:Iom_octet_stream.limits ->
+    [> Iom_stream.readywait ] #Iom_gadget.tx -> 'event #Iom_gadget.rx ->
+    Iom_octet_stream.fragment #Iom_gadget.tx ->
     object('self)
-        inherit ['c, 'd, 'h] buffer
-
-        constraint 'c = Iom_file.reader_control_t
-        constraint 's = Iom_file.signal_t
-        constraint 'd = Iom_file.octets
-        constraint 'h = #Mime_entity.headers_parser
-
-        method private receive: 'd -> ('self, unit) Iom_gadget.t
-
-        method private send:
-    		 ?h:'h -> Iom_file.more_t -> Cf_message.t ->
-    		 ('self, unit) Iom_gadget.t
+        constraint 'event = #Mime_entity.message_emit_adapter event
+        inherit ['event] emitter
     end
 
-class ['h] renderer:
-    e:int -> ?d:int -> c:('c #Iom_gadget.rx * 'c #Iom_gadget.tx) ->
-    s:('s #Iom_gadget.rx * 's #Iom_gadget.tx) -> 'h fragment #Iom_gadget.rx ->
-    Iom_file.octets #Iom_gadget.tx ->
-    object('self)
-        inherit ['s, 'd, 'h] buffer
-
-        constraint 'c = Iom_file.control_t
-        constraint 's = Iom_file.writer_signal_t
-        constraint 'h = #Mime_entity.headers_emitter
-        constraint 'd = 'h fragment
-
-        method private receive: 'd -> ('self, unit) Iom_gadget.t
-
-        method private send:
-            ?h:'h -> Iom_file.more_t -> Cf_message.t ->
-    		('self, unit) Iom_gadget.t
-    end
-
-type 'h rd_t = {
-    rd_ctrlTx: Iom_file.reader_control_t Iom_gadget.tx;
-    rd_sigRx: Iom_file.signal_t Iom_gadget.rx;
-    rd_dataRx: 'h fragment Iom_gadget.rx;
-} constraint 'h = #Mime_entity.headers
-
-val ingest:
-    e:int -> ?d:int ->
-    p:(Cf_message.t -> (#Mime_entity.headers_parser as 'h) * Cf_message.t) ->
-    Iom_file.rd_t -> ('s, 'h rd_t) Iom_gadget.t
-
-type 'h wr_t = {
-    wr_ctrlTx: Iom_file.control_t Iom_gadget.tx;
-    wr_sigRx: Iom_file.writer_signal_t Iom_gadget.rx;
-    wr_dataTx: 'h fragment Iom_gadget.tx;
-} constraint 'h = #Mime_entity.headers
-
-val render:
-    e:int -> ?d:int -> Iom_file.wr_t ->
-    ('s, #Mime_entity.headers_emitter wr_t) Iom_gadget.t
- *---------------------------------------------------------------------------*)
-
 (*--- End of File [ mime_stream.mli ] ---*)
     let cl = Int64.of_int (String.length body)
     
     let h =
+        "MIME-Version: 1.0\r\n" ^
         "Content-Type: text/plain; charset=utf8\r\n" ^
         "Content-Transfer-Encoding: binary\r\n" ^
         "Content-Id: <\"this is a test\"@conjury.org>\r\n" ^
         body
     
     let test () =
-        let e = emit_headers ~cte ~ct ~cid ~cl [] in
+        let e = new message_emit_adapter ~cte ~ct ~cid ~cl [] in
         let m = e#emit @ (Cf_message.create body) in
         let h' = Cf_message.contents m in
         if h' <> h then jout#fail "emit error [%s][%s]" h' h;
-        let p, body' = parse_headers m in
+        let p, body' = scan_message_headers (new message_scan_adapter) m in
         let g s f =
             try
                 match f () with
             jout#fail "Body match error"
 end
 
-(*
 module T4 = struct
+    open Cf_cmonad.Op
     module G = Iom_gadget
-    open Cf_cmonad.Op
+    module E = Mime_entity
+    module S = Mime_stream
 
-    let cte = Mime_entity.CTE_binary
+    let cte = E.CTE_binary
 
     let ct = {
-        Mime_entity.ct_type = Mime_entity.CT_text;
+        E.ct_type = E.CT_text;
         ct_subtype = "xml";
         ct_parameters = Mime_atom.Map.of_list [ "charset", "utf8" ];
     }
         "Content-Transfer-Encoding: binary\r\n" ^
         "\r\n" ^
         "<greeting/>"
-
-    let okay = ref false
-
-    let reader_ fd k =
-        Iom_file.read ~e:8 ~fd k >>= fun rd ->
-        let p = Mime_entity.parse_headers in
-        Mime_stream.ingest ~e:1500 ~p rd >>= fun rd ->
-        let rec loop () =
-            G.guard begin
-                rd.Mime_stream.rd_sigRx#get do_sigRx >>= fun () ->
-                rd.Mime_stream.rd_dataRx#get do_dataRx
-            end
-        and do_sigRx (`Error x) =
-            raise x
-        and do_dataRx m =
-            G.load >>= fun (h, s) ->
-            let h = match m#headers with Some _ as h -> h | _ -> h in
-            begin
-                match h with
-                | None -> jout#info "no header..."
-                | Some _ -> jout#info "header!"
-            end;
-            let s = s @ m#octets in
-            let str = Cf_message.contents s in
-            match m#more with
-            | Iom_file.More ->
-                jout#info "more...";
-                G.store (h, s) >>= fun () ->
-                loop ()
-            | Iom_file.Last ->
-                jout#info "last...";
-                okay := begin
-                    match h with
-                    | Some h ->
-                        jout#info "body... '%s' = '%s'"
-                            (Cf_message.contents body) str;
-                        h#content_transfer_encoding = Some cte &&
-                        h#content_type = Some ct &&
-                        (Cf_message.contents body) = str
-                    | None ->
-                        jout#fail "No header!"
-                end;
-                Cf_cmonad.nil 
-        in
-        G.start (loop ()) (None, []) >>= fun () ->
-        rd.Mime_stream.rd_ctrlTx#put `Ready
-
-    let writer_ fd k =
-        Iom_file.write ~fd k >>= fun wr ->
-        Mime_stream.render ~e:5 wr >>= fun wr ->
-        let rec loop () =
-            G.guard begin
-                wr.Mime_stream.wr_sigRx#get begin function
-                    | `Error x -> raise x
-                    | `Final -> Cf_cmonad.nil 
-                    | _ -> loop ()
-                end
-            end
-        in
-        G.start (loop ()) () >>= fun () ->
-        let h = Mime_entity.emit_headers ~ct ~cte [] in
-        let frag = new Mime_stream.fragment ~h Iom_file.Last body in
-        wr.Mime_stream.wr_dataTx#put frag
-    
-    let reactor_ k = 
-        let rd, wr = Unix.pipe () in
-        Unix.set_nonblock rd;
-        Unix.set_nonblock wr;
-        reader_ rd k >>= fun () ->
-        writer_ wr k >>= fun () ->
-        Cf_cmonad.nil 
-
-    let test () =
-        try
-            G.run reactor_ ();
-            if not !okay then failwith "reactor: processing completed early."
-        with
-        | Unix.Unix_error (error, fname, arg) ->
-            let msg =
-                let error = Unix.error_message error in
-                Printf.sprintf "Unix error \"%s\" in %s(%s).\n" error fname arg
-            in
-            failwith msg
-        | x ->
-            raise x
-end
-*)
-
-module T4 = struct
-    open Cf_cmonad.Op
     
     let finished = ref false
+        
+    class top ijack ojack =
+        let scanRx, (_, ctrlTx) = ijack in
+        let emitTx, _ = ojack in
+        object
+            inherit Iom_gadget.start as super
+            inherit Iom_gadget.next
+            
+            method private guard =
+                scanRx#get begin fun e ->
+                    let h = e.Mime_stream.headers in
+                    h#force;
+                    if h#content_transfer_encoding <> Some cte then
+                        jout#fail "T4: content transfer encoding";
+                    let ct' = h#content_type in
+                    let ct' =
+                        match ct' with
+                        | None -> jout#fail "T4: content type"
+                        | Some ct' -> ct'
+                    in
+                    if ct'.Mime_entity.ct_type <> ct.E.ct_type then
+                        jout#fail "T4: content type [type]";
+                    if ct'.Mime_entity.ct_subtype <> ct.E.ct_subtype then
+                        jout#fail "T4: content type [subtype]";
+                    let ctp' =
+                        Mime_atom.Map.search "charset"
+                        ct'.Mime_entity.ct_parameters
+                    in
+                    let ctp =
+                        Mime_atom.Map.search "charset"
+                        ct.Mime_entity.ct_parameters
+                    in
+                    if ctp' <> ctp then
+                        jout#fail "T4: content type [parameter]";
+                    finished := true;
+                    ctrlTx#put `Stop >>= fun () ->
+                    Iom_gadget.abort
+                end
+            
+            method start =
+                let data = Cf_message.create plain in
+                let m = new Iom_octet_stream.fragment Iom_stream.Last data in
+                emitTx#put m >>= fun () ->
+                ctrlTx#put `Ready >>= fun () ->
+                super#start
+        end
+            
+    let reactor k =
+        let limits = { Iom_octet_stream.low = 256; high = 256 } in
+        Iom_pipe.create ~limits k >>= fun (ijack, ojack) ->
+        Iom_layer.ingest ijack (new S.entity_scanner ~lim:256) >>= fun ijack ->
+        let top = new top ijack ojack in
+        top#start
     
-    let reactor _ =
-        finished := true;
-        Cf_cmonad.nil 
+    let test () =
+        Iom_gadget.run reactor;
+        if not !finished then jout#fail "T4: reaction incomplete!"
+end
+
+module T5 = struct
+    open Cf_cmonad.Op
+    module G = Iom_gadget
+    module E = Mime_entity
+    module S = Mime_stream
+
+    let cte = E.CTE_binary
+
+    let ct = {
+        E.ct_type = E.CT_text;
+        ct_subtype = "xml";
+        ct_parameters = Mime_atom.Map.of_list [ "charset", "utf8" ];
+    }
+        
+    let body = Cf_message.create "<greeting/>\r\n"
+
+    let expect =
+        "Content-Type: text/xml; charset=utf8\r\n" ^
+        "Content-Transfer-Encoding: binary\r\n" ^
+        "\r\n" ^
+        "<greeting/>\r\n"
+    
+    let finished = ref false
+        
+    class top ijack ojack =
+        let readRx, (_, ctrlTx) = ijack in
+        let emitTx, _ = ojack in
+        object
+            inherit Iom_gadget.start as super
+            inherit Iom_gadget.next
+            
+            method private guard =
+                readRx#get begin fun u ->
+                    if u#more <> Iom_stream.Last then
+                        jout#fail "T5: message incomplete!";
+                    let m = Cf_message.contents u#data in
+                    if expect <> m then
+                        jout#fail "T5: format error!\n%s" m;
+                    finished := true;
+                    ctrlTx#put `Stop >>= fun () ->
+                    Iom_gadget.abort
+                end
+            
+            method start =
+                Iom_gadget.simplex >>= fun (bodyRx, bodyTx) ->
+                let m = new Iom_octet_stream.fragment Iom_stream.Last body in
+                bodyTx#put m >>= fun () ->
+                let h = new Mime_entity.basic_emit_adapter ~ct ~cte [] in
+                let e = {
+                    Mime_stream.headers = h;
+                    flowTx = new Iom_gadget.tx Iom_gadget.null;
+                    bodyRx = bodyRx;
+                } in
+                emitTx#put e >>= fun () ->
+                ctrlTx#put `Ready >>= fun () ->
+                super#start
+        end
+            
+    let reactor k =
+        let limits = { Iom_octet_stream.low = 256; high = 256 } in
+        Iom_pipe.create ~limits k >>= fun (ijack, ojack) ->
+        Iom_layer.render ojack (new S.entity_emitter ~limits) >>= fun ojack ->
+        let top = new top ijack ojack in
+        top#start
     
     let test () =
         Iom_gadget.run reactor;
 
 let main () =
     let tests = [
-        T1.test; T2.test; T3.test; T4.test
+        T1.test; T2.test; T3.test; T4.test; T5.test
     ] in
     Printf.printf "1..%d\n" (List.length tests);
     flush stdout;