(* Source: james woodyatt / oni -- oni/mime/mime_stream.ml *)

(*---------------------------------------------------------------------------*
  $Change$
  Copyright (c) 2005-2010, James H. Woodyatt
  All rights reserved.
  
  Redistribution and use in source and binary forms, with or without
  modification, are permitted provided that the following conditions
  are met:
  
    Redistributions of source code must retain the above copyright
    notice, this list of conditions and the following disclaimer.
    
    Redistributions in binary form must reproduce the above copyright
    notice, this list of conditions and the following disclaimer in
    the documentation and/or other materials provided with the
    distribution
  
  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
  COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
  INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
  STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
  OF THE POSSIBILITY OF SUCH DAMAGE. 
 *---------------------------------------------------------------------------*)

(* Journal handle bound to [Cf_journal.stdout].  Nothing else in this file
   refers to it; presumably it is kept for ad-hoc debug tracing. *)
(**)
let jout = Cf_journal.stdout
(**)

open Cf_cmonad.Op

(*---------------------------------------------------------------------------*
  EVENT
 *---------------------------------------------------------------------------*)
(** A MIME event: a headers object paired with a receiver of octet-stream
    fragments.  In this file the receiver is the channel on which the
    fragments of the entity body are delivered.  The ['headers] parameter is
    constrained to object types matching [#Mime_entity.headers]. *)
type 'headers event =
    'headers * Iom_octet_stream.fragment Iom_gadget.rx
    constraint 'headers = #Mime_entity.headers

(** Events specialised to the basic scan adapter and the basic emit adapter
    of [Mime_entity], respectively. *)
type entity_scan_event = Mime_entity.basic_scan_adapter event
type entity_emit_event = Mime_entity.basic_emit_adapter event

(*---------------------------------------------------------------------------*
  SCANNING
 *---------------------------------------------------------------------------*)
(** Reasons for which scanning a header block can fail. *)
type scan_error =
    | X_header_too_long
        (** The header block was not complete within the allowed size. *)
    | X_header_incomplete
        (** The input ended before the header block was complete. *)

(** [scan_error_to_string error] is the fixed English description of
    [error]. *)
let scan_error_to_string error =
    match error with
    | X_header_incomplete -> "header incomplete"
    | X_header_too_long -> "header too long"

(** Exception carrying a [scan_error]. *)
exception Scan_error of scan_error

(** Virtual scanner that turns an octet stream into one MIME event followed
    by the octets of its body.

    The header block is split off with [Mime_lex.split_into_fields] and
    converted to an adapter object by the virtual [headers] method.  The
    event [(adapter, rx)] is put on [mimeTx], where [rx] is the receiving end
    of a channel created for the body; every later buffer is forwarded on the
    sending end of that channel as an [Iom_octet_stream.fragment].

    [?limits], [flowTx], [bodyRx] and [mimeTx] are passed on to the inherited
    [Iom_octet_stream.scanner].  The instance variables [limits_], [more_],
    [consume_], [produce_], [buffer_] and [mark_] and the methods [scan] and
    [shift_octets] are not defined in this class; presumably they come from
    that ancestor -- confirm against [Iom_octet_stream.scanner]. *)
class virtual ['event] scanner ?limits flowTx bodyRx mimeTx =
    let bodyRx = (bodyRx :> 'fragment Iom_gadget.rx) in
    object(self:'self)
        constraint 'adapter = #Mime_entity.basic_scan_adapter
        constraint 'event = 'adapter event
        inherit [Iom_octet_stream.fragment, 'event] Iom_octet_stream.scanner
            ?limits flowTx bodyRx mimeTx as super

        (* Not read anywhere in this class. *)
        val pass_ = false

        (* [None] until the header has been delivered; afterwards the sending
           end of the body channel, so that [produce] forwards body octets
           instead of scanning for a header again. *)
        val bodyTx_: 'fragment Iom_gadget.tx option = None

        (** Builds the header adapter from the list of fields returned by
            [Mime_lex.split_into_fields]. *)
        method virtual private headers:
            (string * char Cf_seq.t) list -> 'adapter

        (** [tokens msg _] tries to split [msg] into header fields and body.

            Returns [Some ((event, n), Cf_seq.nil)] on success, where [n] is
            the length of [msg] minus the length of the body, i.e. the number
            of octets taken by the header block.  The second argument is
            ignored.

            When [End_of_file] is raised while splitting:
            - raises [Scan_error X_header_too_long] if [msg] is longer than
              [limits_.Iom_octet_stream.high];
            - otherwise raises [Scan_error X_header_incomplete] if [more_] is
              [Iom_stream.Last];
            - otherwise returns [None]. *)
        method private tokens msg _ =
            let tlength = Cf_message.length msg in
            try
                let headers, body = Mime_lex.split_into_fields msg in
                let blength = Cf_message.length body in
                let slength = tlength - blength in
                let event = self#headers headers, bodyRx in
                Some ((event, slength), Cf_seq.nil)
            with
            | End_of_file when tlength > limits_.Iom_octet_stream.high ->
                raise (Scan_error X_header_too_long)
            | End_of_file when more_ = Iom_stream.Last ->
                raise (Scan_error X_header_incomplete)
            | End_of_file ->
                None

        (** [shift_body bodyTx consume buffer] drains [buffer] into a message,
            wraps it in a fragment tagged with [more_] and puts it on
            [bodyTx].

            Yields [Cf_exnopt.U None] when [more_] is [Iom_stream.Last].
            Otherwise yields [Cf_exnopt.U (Some obj)], where [obj] is a copy
            of this object with [consume_] set, an empty buffer, a zero mark
            and [bodyTx_] holding [bodyTx]; when [consume] is false,
            [`Ready] is put on [flowTx] first. *)
        method private shift_body bodyTx consume buffer =
            let m = Cf_message.drain buffer in
            let fragment = new Iom_octet_stream.fragment more_ m in
            bodyTx#put fragment >>= fun () ->
            if more_ = Iom_stream.Last then
                Cf_cmonad.return (Cf_exnopt.U None)
            else begin
                let obj =
                    match bodyTx_ with
                    | None ->
                        {< consume_ = true; buffer_ = Cf_deque.nil; mark_ = 0;
                           bodyTx_ = Some bodyTx >}
                    | Some bodyTx' ->
                        (* The body channel never changes once chosen. *)
                        assert (bodyTx' == bodyTx);
                        {< consume_ = true; buffer_ = Cf_deque.nil;
                           mark_ = 0; >}
                in
                let return = Cf_cmonad.return (Cf_exnopt.U (Some obj)) in
                if consume then
                    return
                else begin
                    flowTx#put `Ready >>= fun () ->
                    return
                end
            end

        (** Runs [self#scan] and acts on its outcome:
            - an exception is passed on as [Cf_exnopt.X];
            - [None] keeps this object unchanged;
            - exactly one event is put on [mimeTx], paired with the receiving
              end of a new body channel, and the remaining buffer is
              forwarded with [shift_body];
            - any other number of events is treated as impossible. *)
        method private shift_header =
            match try Cf_exnopt.U self#scan with x -> Cf_exnopt.X x with
            | Cf_exnopt.X x ->
                (* Rebuild the [X] value rather than aliasing the matched
                   one: an alias would carry the type of the scan result,
                   not the result type of this method. *)
                Cf_cmonad.return (Cf_exnopt.X x)
            | Cf_exnopt.U None ->
                Cf_cmonad.return (Cf_exnopt.U (Some self))
            | Cf_exnopt.U (Some ([], _))
            | Cf_exnopt.U (Some (_ :: _ :: _, _)) ->
                (* An ordinary assertion, unlike [assert false]: when
                   assertions are compiled out the scan simply ends. *)
                assert (not true);
                Cf_cmonad.return (Cf_exnopt.U None)
            | Cf_exnopt.U (Some ((h, _) :: [], len)) ->
                self#shift_octets len >>= fun (consume, buffer, _) ->
                Iom_gadget.simplex >>= fun (bodyRx, bodyTx) ->
                mimeTx#put (h, bodyRx) >>= fun () ->
                self#shift_body bodyTx consume buffer

        (** Does nothing (yields this object) unless [produce_] is set.
            Otherwise forwards the buffer as body octets when a body channel
            is already recorded in [bodyTx_], and scans for the header
            block when it is not. *)
        method produce =
            match produce_ with
            | false ->
                Cf_cmonad.return (Cf_exnopt.U (Some self))
            | true ->
                match bodyTx_ with
                | Some bodyTx ->
                    self#shift_body bodyTx consume_ buffer_
                | None ->
                    self#shift_header
    end

(** Concrete scanner whose events carry [Mime_entity.basic_scan_adapter]
    header objects. *)
class entity_scanner ?limits flowTx bodyRx mimeTx =
    object
        inherit [Mime_entity.basic_scan_adapter event] scanner
            ?limits flowTx bodyRx mimeTx

        (* The adapter is constructed directly from the scanned fields. *)
        method private headers fields =
            new Mime_entity.basic_scan_adapter fields
    end

(** Variant of [scanner] whose events are restricted to header adapters
    matching [#Mime_entity.message_scan_adapter].  It adds no methods of its
    own, so the [headers] method is still virtual here. *)
class virtual ['event] message_scanner ?limits flowTx bodyRx mimeTx = object
    constraint 'event = #Mime_entity.message_scan_adapter event
    inherit ['event] scanner ?limits flowTx bodyRx mimeTx
end

(*---------------------------------------------------------------------------*
  EMITTING
 *---------------------------------------------------------------------------*)
(** Input an emitter reads from next: either the channel of events
    ([E_src_entity]) or the channel of body fragments taken from an event
    ([E_src_body]). *)
type 'event emitter_source =
    | E_src_entity of 'event Iom_gadget.rx
    | E_src_body of Iom_octet_stream.fragment Iom_gadget.rx
    constraint 'event = #Mime_entity.basic_emit_adapter event

(** Emitter that writes a MIME event to an octet stream: first the message
    obtained from the header object of the event, then every fragment
    received on the body channel of the event.

    All constructor arguments are passed on to the inherited
    [Iom_octet_stream.emitter].  The instance variables [buffer_], [more_],
    [consume_] and [mark_] and the method [push_octets] are not defined in
    this class; presumably they come from that ancestor -- confirm. *)
class ['event] emitter ?limits flowTx mimeRx bodyTx =
    object(self:'self)
        constraint 'event = #Mime_entity.basic_emit_adapter event
        inherit ['event, Iom_octet_stream.fragment] Iom_octet_stream.emitter
            ?limits flowTx mimeRx bodyTx

        (* Starts on the event channel; [push_event] switches it to the body
           channel of the event, and nothing in this class switches it
           back. *)
        val source_ = E_src_entity (mimeRx :> 'event Iom_gadget.rx)

        (** Packs the whole of [buffer_] into one fragment tagged with
            [more_]; the result is that single fragment together with the
            length of the packed data. *)
        method private emit =
            let octets = Cf_deque.A.to_list buffer_ in
            let fragment = new Iom_octet_stream.fragment more_ octets in
            let size = Cf_message.length octets in
            [ fragment ], size

        (** Pushes the message form of the headers with [Iom_stream.More] and
            yields a copy of this object that reads from [bodyRx] next. *)
        method push_event (headers, bodyRx) =
            let octets = headers#to_message in
            self#push_octets Iom_stream.More octets
            >>= fun (consume, buffer, mark) ->
            let next =
                {<
                    consume_ = consume;
                    buffer_ = buffer;
                    mark_ = mark;
                    source_ = E_src_body bodyRx
                >}
            in
            Cf_cmonad.return next

        (** Pushes the data of a body fragment and yields a copy of this
            object whose [more_] is taken from the fragment. *)
        method push_fragment fragment =
            let more = fragment#more in
            self#push_octets more fragment#data
            >>= fun (consume, buffer, mark) ->
            let next =
                {<
                    consume_ = consume;
                    buffer_ = buffer;
                    mark_ = mark;
                    more_ = more
                >}
            in
            Cf_cmonad.return next

        (** When [consume_] is unset this is [Cf_cmonad.nil].  Otherwise one
            item is taken from the current source and pushed; [f] receives
            the updated object as [Cf_exnopt.U], or as [Cf_exnopt.X] any
            exception raised while the push step was being set up. *)
        method consume f =
            let attempt push item =
                try
                    push item >>= (fun obj -> f (Cf_exnopt.U obj))
                with
                | x -> f (Cf_exnopt.X x)
            in
            if not consume_ then
                Cf_cmonad.nil
            else
                match source_ with
                | E_src_body bodyRx ->
                    bodyRx#get (attempt self#push_fragment)
                | E_src_entity mimeRx ->
                    mimeRx#get (attempt self#push_event)
    end

(** [emitter] specialised to events carrying
    [Mime_entity.basic_emit_adapter] header objects. *)
class entity_emitter = [Mime_entity.basic_emit_adapter event] emitter

(** Variant of [emitter] whose events are restricted to header adapters
    matching [#Mime_entity.message_emit_adapter]; it adds no methods of its
    own. *)
class ['event] message_emitter ?limits flowTx mimeRx bodyTx = object
    constraint 'event = #Mime_entity.message_emit_adapter event
    inherit ['event] emitter ?limits flowTx mimeRx bodyTx
end

(** [emit_wrapper fragmentTx] creates an event channel, starts a guarded
    gadget that waits for an event on it, and yields the sending end of that
    channel.  For the event received, the message form of its headers is put
    on [fragmentTx] as a fragment tagged [Iom_stream.More]; the body channel
    of the event is then connected to [fragmentTx] with [Iom_gadget.wrap]
    and [Cf_flow.nop]. *)
let emit_wrapper fragmentTx =
    let sink = (fragmentTx :> Iom_octet_stream.fragment Iom_gadget.tx) in
    Iom_gadget.simplex >>= fun (eventRx, eventTx) ->
    let eventTx = (eventTx :> entity_emit_event Iom_gadget.tx) in
    (* Handler for an event: header fragment first, then the body. *)
    let relay (headers, fragmentRx) =
        let octets = headers#to_message in
        let head = new Iom_octet_stream.fragment Iom_stream.More octets in
        sink#put head >>= fun () ->
        Iom_gadget.wrap fragmentRx sink Cf_flow.nop
    in
    Iom_gadget.start (Iom_gadget.guard (eventRx#get relay)) >>= fun () ->
    Cf_cmonad.return eventTx

(*---------------------------------------------------------------------------*
  PLUGS and JACKS
 *---------------------------------------------------------------------------*)
(** Input pad and fix types of [Iom_stream] whose items are events over a
    header adapter matching [#Mime_entity.basic_scan_adapter]. *)
type ('i, 'c, 'n) ipad = ('i event, 'c, 'n) Iom_stream.ipad
    constraint 'i = #Mime_entity.basic_scan_adapter

type ('i, 'c, 'n) ifix = ('i event, 'c, 'n) Iom_stream.ifix
    constraint 'i = #Mime_entity.basic_scan_adapter

(** Output pad and fix types of [Iom_stream] whose items are events over a
    header adapter matching [#Mime_entity.basic_emit_adapter]. *)
type ('o, 'c, 'n) opad = ('o event, 'c, 'n) Iom_stream.opad
    constraint 'o = #Mime_entity.basic_emit_adapter

type ('o, 'c, 'n) ofix = ('o event, 'c, 'n) Iom_stream.ofix
    constraint 'o = #Mime_entity.basic_emit_adapter

(** Duplex pad and fix types of [Iom_stream] with events in both directions.
    NOTE(review): the constraint below names ['adapter], which is not one of
    the parameters of these types, so it does not restrict ['i] or ['o];
    both are only subject to the constraint that [event] itself imposes.
    Confirm whether constraints on ['i] and ['o], matching the one-way types
    above, were intended. *)
type ('i, 'o, 'c, 'n) iopad = ('i event, 'o event, 'c, 'n) Iom_stream.iopad
    constraint 'adapter = #Mime_entity.basic_emit_adapter

type ('i, 'o, 'c, 'n) iofix = ('i event, 'o event, 'c, 'n) Iom_stream.iofix
    constraint 'adapter = #Mime_entity.basic_emit_adapter

(** Input pad and fix for events carrying the basic scan adapter. *)
type ('c, 'n) entity_scan_pad =
    (Mime_entity.basic_scan_adapter, 'c, 'n) ipad

type ('c, 'n) entity_scan_fix =
    (Mime_entity.basic_scan_adapter, 'c, 'n) ifix

(** Output pad and fix for events carrying the basic emit adapter. *)
type ('c, 'n) entity_emit_pad =
    (Mime_entity.basic_emit_adapter, 'c, 'n) opad

type ('c, 'n) entity_emit_fix =
    (Mime_entity.basic_emit_adapter, 'c, 'n) ofix

(** Duplex pad and fix: basic scan adapter events on the input side, basic
    emit adapter events on the output side. *)
type ('c, 'n) entity_duplex_pad =
    (Mime_entity.basic_scan_adapter, Mime_entity.basic_emit_adapter, 'c, 'n)
    iopad

type ('c, 'n) entity_duplex_fix =
    (Mime_entity.basic_scan_adapter, Mime_entity.basic_emit_adapter, 'c, 'n)
    iofix

(** [scan_entity ?limits ifix] passes [ifix] to [Iom_layer.ingest] together
    with the [entity_scanner] constructor, to which the optional [limits]
    has already been applied. *)
let scan_entity ?limits ifix =
    let scanner = new entity_scanner ?limits in
    Iom_layer.ingest ifix scanner

(** [emit_entity ?limits ofix] passes [ofix] to [Iom_layer.render] together
    with the [entity_emitter] constructor, to which the optional [limits]
    has already been applied. *)
let emit_entity ?limits ofix =
    let emitter = new entity_emitter ?limits in
    Iom_layer.render ofix emitter

(** [duplex_entities ?slim ?elim iofix] passes [iofix] to [Iom_layer.duplex]
    together with an [entity_scanner] constructor and an [entity_emitter]
    constructor; [slim] is given to the scanner as its [limits] and [elim]
    to the emitter. *)
let duplex_entities ?slim ?elim iofix =
    let emitter = new entity_emitter ?limits:elim in
    let scanner = new entity_scanner ?limits:slim in
    Iom_layer.duplex iofix scanner emitter

(*--- $File$ ---*)