Source

oni / iom / iom_octet_stream.ml

(*---------------------------------------------------------------------------*
  $Change$
  Copyright (c) 2006-2010, James H. Woodyatt
  All rights reserved.
  
  Redistribution and use in source and binary forms, with or without
  modification, are permitted provided that the following conditions
  are met:
  
    Redistributions of source code must retain the above copyright
    notice, this list of conditions and the following disclaimer.
    
    Redistributions in binary form must reproduce the above copyright
    notice, this list of conditions and the following disclaimer in
    the documentation and/or other materials provided with the
    distribution
  
  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
  COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
  INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
  STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
  OF THE POSSIBILITY OF SUCH DAMAGE. 
 *---------------------------------------------------------------------------*)

(*
let jout = Cf_journal.stdout
*)

exception Buffering

class fragment more data = object
    method more: Iom_stream.more = more
    method data: Cf_message.t = data
end

type limits = { low: int; high: int }

let normalize_limits limits =
    let low = if limits.low < 1 then 1 else limits.low in
    let high = if limits.high < low then low else limits.high in
    if low <> limits.low || high <> limits.high then
        { low = low; high = high }
    else
        limits

open Cf_cmonad.Op

class virtual ['x, 'y] transform ~limits flowTx xRx yTx =
    let flowTx = (flowTx :> 'flow Iom_gadget.tx) in
    object
        inherit ['x, 'y] Iom_layer.transform xRx yTx
        val buffer_: Cf_message.substring Cf_deque.t = Cf_deque.nil
        val mark_ = 0
        val more_ = Iom_stream.More
        val limits_ = limits
        
        method private push_octets more data =
            let length = Cf_message.length data in
            let mark = mark_ + length in
            let buffer = Cf_message.push data buffer_ in
            let consume =
                match more with Iom_stream.Last -> false | _ -> consume_
            in
            if consume && not (mark < limits_.high) then begin
                flowTx#put `Wait >>= fun () ->
                Cf_cmonad.return (false, buffer, mark)
            end
            else
                Cf_cmonad.return (consume_, buffer, mark)
        
        method private shift_octets len =
            let mark = if mark_ - len > 0 then mark_ - len else 0 in
            let buffer = Cf_message.shiftq ~len buffer_ in
            if more_ = Iom_stream.More && not consume_ && mark < limits_.low
            then begin
                flowTx#put `Ready >>= fun () ->
                Cf_cmonad.return (true, buffer, mark)
            end
            else
                Cf_cmonad.return (consume_, buffer, mark)
    end

let no_limits_emitter_ = { low = 1; high = max_int }

class ['x, 'y] emitter ?limits flowTx xRx yTx =
    let limits =
        match limits with
        | None -> no_limits_emitter_
        | Some limits -> normalize_limits limits
    in
    object(self)
        inherit ['x, 'y] transform ~limits flowTx xRx yTx
        constraint 'y = #fragment
        
        method private fragment: 'x -> Iom_stream.more * Cf_message.t =
            fun _ -> Iom_stream.Last, []
        
        method private emit: 'y list * int = [], 0
        
        method push x =
            let more, data = self#fragment x in
            self#push_octets more data >>= fun (consume, buffer, mark) ->
            Cf_cmonad.return {<
                consume_ = consume; buffer_ = buffer; mark_ = mark;
                more_ = more;
            >}
        
        method shift =
            match
                try Cf_exnopt.U self#emit with x -> Cf_exnopt.X x
            with
            | Cf_exnopt.X _ as x ->
                Cf_cmonad.return x
            | Cf_exnopt.U (s, len) ->
                self#shift_octets len >>= fun (consume, buffer, mark) ->
                let obj =
                    match more_ with
                    | Iom_stream.More ->
                        Some {<
                            consume_ = consume; buffer_ = buffer; mark_ = mark
                        >}
                    | Iom_stream.Last ->
                        None
                in
                Cf_cmonad.return (Cf_exnopt.U (s, obj))
    end

let x_negative_adjust_ = "Iom_octet_stream.scanner: negative adjustment!"

let no_limits_scanner_ = { low = max_int; high = max_int }

class ['x, 'y] scanner ?limits flowTx xRx yTx =
    let limits =
        match limits with
        | None -> no_limits_scanner_
        | Some limits -> normalize_limits limits
    in
    object(self)
        inherit ['x, 'y] transform ~limits flowTx xRx yTx
        constraint 'x = #fragment
                
        method private tokens: Cf_message.t -> (char, 'y * int) Cf_llscan.t =
            fun _ -> Cf_llscan.nil
        
        method private scan =
            let rec loop s n m z =
                match
                    try Cf_exnopt.U (self#tokens m z) with x -> Cf_exnopt.X x
                with
                | Cf_exnopt.U (Some ((v, d), z)) ->
                    if d < 0 then
                        invalid_arg x_negative_adjust_
                    else
                        let s = v :: s in
                        if Lazy.force z = Cf_seq.Z then
                            Some (List.rev s, n)
                        else
                            loop s (n + d) (Cf_message.shift ~pos:d m) z
                | Cf_exnopt.U None
                | Cf_exnopt.X Buffering ->
                    if n > 0 then Some (List.rev s, n) else None
                | Cf_exnopt.X x ->
                    raise x
            in
            let x =
                match more_ with Iom_stream.More -> Some Buffering | _ -> None
            in
            let m = Cf_message.drain buffer_ in
            loop [] 0 m (Cf_message.to_seq ?x m)
        
        method push x =
            let more = x#more in
            self#push_octets more x#data >>= fun (consume, buffer, mark) ->
            Cf_cmonad.return {<
                consume_ = consume; buffer_ = buffer; mark_ = mark;
                more_ = more;
            >}
        
        method shift =
            match
                try Cf_exnopt.U self#scan with x -> Cf_exnopt.X x
            with
            | Cf_exnopt.X _ as v ->
                Cf_cmonad.return v
            | Cf_exnopt.U None ->
                Cf_cmonad.return (Cf_exnopt.U ([], Some self))
            | Cf_exnopt.U (Some (s, len)) ->
                self#shift_octets len >>= fun (consume, buffer, mark) ->
                let obj =
                    match more_ with
                    | Iom_stream.Last when mark_ > 0 ->
                        None
                    | _ ->
                        Some {<
                            consume_ = consume; buffer_ = buffer; mark_ = mark
                        >}
                in
                Cf_cmonad.return (Cf_exnopt.U (s, obj))
    end

class virtual ['x] buffer ~limits flowTx xRx yTx =
    object(self)
        constraint 'x = #fragment
        inherit ['x, 'x] transform ~limits flowTx xRx yTx
        
        method virtual private frame: Iom_stream.more -> Cf_message.t -> 'x
        
        method push x =
            let more = x#more in
            self#push_octets more x#data >>= fun (consume, buffer, mark) ->
            Cf_cmonad.return {<
                consume_ = consume; buffer_ = buffer; mark_ = mark;
                more_ = more;
            >}
        
        method shift =
            let rec loop acc buffer mark =
                if mark < limits.high then begin
                    match more_ with
                    | Iom_stream.Last ->
                        let data = (Cf_message.drain buffer) in
                        let frag = self#frame more_ data in
                        Cf_cmonad.return (frag :: acc, None)
                    | Iom_stream.More when not consume_ && mark < limits.low ->
                        flowTx#put `Ready >>= fun () ->
                        Cf_cmonad.return (acc, Some (buffer, mark, true))
                    | Iom_stream.More ->
                        Cf_cmonad.return (acc, Some (buffer, mark, consume_))
                end
                else begin
                    let m, buffer = Cf_message.pop ~len:limits.high buffer in
                    let frag = self#frame Iom_stream.More m in
                    loop (frag :: acc) buffer (mark - limits.high)
                end
            in
            loop [] buffer_ mark_ >>= fun (s, objOpt) ->
            match objOpt with
            | Some (b, m, c) ->
                let obj = {< buffer_ = b; mark_ = m; consume_ = c; >} in
                Cf_cmonad.return (Cf_exnopt.U (List.rev s, Some obj))
            | None ->
                Cf_cmonad.return (Cf_exnopt.U (List.rev s, None))
    end

let buffer_aux ~limits flowTx dataRx dataTx = object
    inherit [fragment] buffer ~limits flowTx dataRx dataTx
    method private frame = new fragment
end

let input_buffer ?(limits = no_limits_scanner_) ijack =
    Iom_layer.ingest ijack (buffer_aux ~limits)

let output_buffer ?(limits = no_limits_emitter_) ojack =
    Iom_layer.render ojack (buffer_aux ~limits)

let duplex_buffers
    ?(ilim = no_limits_scanner_) ?(olim = no_limits_emitter_) iojack =
    Iom_layer.duplex iojack (buffer_aux ~limits:ilim) (buffer_aux ~limits:olim)

(*--- $File$ ---*)