Source

oni / iom / t / t_relay.ml

Full commit
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
(*---------------------------------------------------------------------------*
  $Change$
  Copyright (c) 2006-2010, James H. Woodyatt
  All rights reserved.
  
  Redistribution and use in source and binary forms, with or without
  modification, are permitted provided that the following conditions
  are met:
  
    Redistributions of source code must retain the above copyright
    notice, this list of conditions and the following disclaimer.
    
    Redistributions in binary form must reproduce the above copyright
    notice, this list of conditions and the following disclaimer in
    the documentation and/or other materials provided with the
    distribution
  
  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
  COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
  INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
  STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
  OF THE POSSIBILITY OF SUCH DAMAGE. 
 *---------------------------------------------------------------------------*)

let jout = Cf_journal.stdout
let _ = jout#setlimit `None
let _ = Random.self_init ()
let _ = Sys.set_signal Sys.sigpipe Sys.Signal_ignore

module Opt: sig
    type mode = Client | Server
    
    val mode: mode
    val addr: Nx_tcp6_socket.address
    val maxEndpoints: int
    val tcpBacklog: int
    val numSessions: int
    val frameSize: int
    val readLimits: Iom_octet_stream.limits
    val maxPipeline: int
    val framesTx: int
end = struct
    open Arg
    
    type mode = Client | Server
    
    let mode_ = ref Client
    let hostName_ = ref (Nx_ip6_addr.ntop Nx_ip6_addr.unspecified)
    let portName_ = ref None
    let numSessions_ = ref 10
    let maxEndpoints_ = ref 5
    let tcpBacklog_ = ref 10
    let frameSize_ = ref 100
    let readBufHigh_ = ref 1024
    let maxPipeline_ = ref 10
    let framesTx_ = ref 100
    
    let specs_ = Arg.align [
        "-b", Set_int tcpBacklog_, " Server TCP connection backlog.";
        "-h", String (fun s -> hostName_ := s), " Host name (address).";
        "-m", Set_int maxEndpoints_, " Maximum simultaneous endpoints.";
        "-n", Set_int numSessions_, " Number of client sessions.";
        "-p", String (fun s -> portName_ := Some s), " Port name (number).";
        "-s", Unit (fun () -> mode_ := Server), " Server mode.";
        "-F", Set_int frameSize_, " Frame size.";
        "-r", Set_int readBufHigh_, " Read buffer limit.";
        "-Q", Set_int maxPipeline_, " Maximum number of frames in pipeline.";
        "-X", Set_int framesTx_, " Total number of frames to transmit.";
    ]
    
    let anon_ p =
        jout#error "unexpected anonymous argument '%s'" p
    
    let usage_ =
        Printf.sprintf
            "Usage: %s [-s] [-h host] [-p port] [-m maxEndpoints] \
            [-n numSessions] [-b tcpBacklog]"
            Sys.executable_name
    
    let () =
        parse specs_ anon_ usage_
    
    let mode = !mode_
    
    module NI = Nx_nameinfo
    module TCP4 = Iom_tcp4_socket
    module TCP6 = Iom_tcp6_socket
    module IP4 = Nx_ip4_addr
    module IP6 = Nx_ip6_addr
    
    let addr =
        let portName =
            match mode, !portName_ with
            | Client, None ->
                jout#error "remote service is required";
                usage specs_ usage_;
                exit (-1)
            | Server, None ->
                "0"
            | _, Some p ->
                p
        in
        let hostName =
            match mode with
            | Server ->
                !hostName_
            | Client ->
                if !hostName_ = "::" then "::1" else !hostName_
        in
        let rec loop = function
            | [] ->
                assert (not true);
                jout#fail "resolver error"
            | ai :: tl ->
                try
                    TCP6.P.AF.of_sockaddr
                        (NI.specialize_sockaddr ai.NI.ai_addr TCP6.P.AF.domain)
                with
                | Not_found ->
                    try
                        let addr =
                            NI.specialize_sockaddr ai.NI.ai_addr
                                TCP4.P.AF.domain
                        in
                        let host, port = TCP4.P.AF.of_sockaddr addr in
                        let host = IP4.is_unicast host in
                        let host = IP6.to_v4mapped host in
                        (host, port, 0l :> TCP6.address)
                    with
                    | Not_found
                    | Failure _ ->
                        loop tl
        in
        let niFlags =
            match mode with
            | Server -> {
                NI.to_address_default_flags with NI.ai_passive = true
              }
            | Client ->
                NI.to_address_default_flags
        in
        let niHint =
            NI.addrinfo_hint ~flags:niFlags NI.P.AF.domain
                Nx_socket.SOCK_STREAM.socktype NI.P.protocol
        in
        let niArg =
            if mode = Server && hostName = "::" then
                NI.A_servicename portName
            else
                NI.A_bothnames (hostName, portName)
        in
        loop begin
            try NI.to_addresses niHint niArg with
            | NI.Unresolved _ as x ->
                jout#warn "unresolved remote address";
                raise x
        end
    
    let numSessions =
        if !numSessions_ < 1 then begin
            jout#error "number of sessions must be positive integer";
            usage specs_ usage_;
            exit (-1)
        end;
        !numSessions_
    
    let maxEndpoints =
        if !maxEndpoints_ < 1 then begin
            jout#error "max simultaneous endpoints must be positive integer";
            usage specs_ usage_;
            exit (-1)
        end;
        if !maxEndpoints_ > !numSessions_ then maxEndpoints_ := !numSessions_;
        !maxEndpoints_
    
    let tcpBacklog =
        if !tcpBacklog_ < 1 then begin
            jout#error "server TCP backlog must be positive integer";
            usage specs_ usage_;
            exit (-1)
        end;
        !tcpBacklog_
    
    let frameSize =
        if !frameSize_ < 0 then begin
            jout#error "frame size must be non-negative integer";
            usage specs_ usage_;
            exit (-1)
        end;
        !frameSize_
    
    let readLimits =
        let high = frameSize + 100 in
        if !readBufHigh_ < high then readBufHigh_ := high;
        { Iom_octet_stream.low = 1; high = !readBufHigh_ }
    
    let maxPipeline =
        if !maxPipeline_ < 1 then begin
            jout#error "max pipeline must be non-negative integer";
            usage specs_ usage_;
            exit (-1)
        end;
        !maxPipeline_
    
    let framesTx =
        if !framesTx_ < 1 then begin
            jout#error "total frames to transmit must be non-negative integer";
            usage specs_ usage_;
            exit (-1)
        end;
        !framesTx_
end

open Cf_cmonad.Op

module G = Iom_gadget
module R = Iom_reactor
module TCP = Iom_tcp6_socket
module IP6 = Nx_ip6_addr
module IP4 = Nx_ip4_addr

module N_set = Cf_rbtree.Set(Cf_ordered.Int_order)
module N_map = Cf_rbtree.Map(Cf_ordered.Int_order)

module Ordered_by_TCP_address = struct
    type t = TCP.address

    let compare (addr1, port1, scope1 : t) (addr2, port2, scope2 : t) =
        let n = port2 - port1 in if n <> 0 then n
        else let n = Int32.compare scope1 scope2 in if n <> 0 then n
        else Nx_ip6_addr.compare addr1 addr2
end

module A_set = Cf_rbtree.Set(Ordered_by_TCP_address)
module A_map = Cf_rbtree.Map(Ordered_by_TCP_address)

let sessionIdSeq =
    let count = ref 0 in
    Cf_seq.of_function begin fun () ->
        incr count;
        let n = !count in
        if n > Opt.numSessions then raise Not_found;
        n
    end

let socket_ntop_ addr =
    let host, port, scope = addr in
    let hostname = IP6.ntop host in
    let ifname =
        try "%" ^ (Nx_netif.indextoname (Int32.to_int scope))
        with Not_found -> ""
    in
    Printf.sprintf "%s%s/%u" hostname ifname port

class common signalJack =
    let signalRx, signalTx = signalJack in
    object(self)
        inherit Iom_gadget.start
        inherit Iom_gadget.next
        
        method private interrupt (`Signal _) =
            jout#notice "Interrupted!";
            signalTx#put `Stop
        
        method private guard =
            signalRx#get self#interrupt
    end

class frame more data id = object
    inherit Iom_octet_stream.fragment more data
    method id: string = id
end

module type Frame = sig    
    type t = Data of string * Cf_message.t | Close
    val parse: Cf_message.t -> (t * int) Cf_clex.t
    val emit: string -> Cf_message.t -> Cf_message.t
end

module Frame: Frame = struct
    open Cf_llscan.Op
    
    type t = Data of string * Cf_message.t | Close
    
    let header_ =
        let scan id n c = id, n, c in
        Cf_fmt_llscan.scanf "FRAME %s %u\r\n%n" scan
    
    let trailer_ = Cf_llscan.lit "END\r\n" ()
    
    let body_ pos m z =
        let z = Cf_seq.shift pos z in
        let _ = Lazy.force z in
        Some (Cf_message.truncate ~pos m, z)
    
    let frame_ m =
        header_ >>= fun (id, n, c) ->
        let m = Cf_message.shift ~pos:c m in
        body_ n m >>= fun body ->
        trailer_ >>= fun () ->
        Cf_llscan.ret (Data (id, body), n + c + 5)
    
    let final_ =
        Cf_llscan.fin >>= fun () ->
        Cf_llscan.ret (Close, 0)
    
    let parse m = Cf_llscan.alt [ final_; frame_ m; Cf_llscan.err () ]
    
    let terminal_ = [ "END\r\n", 0, 5 ]
    
    let emit id body =
        let n = Cf_message.length body in
        let header = Printf.sprintf "FRAME %s %u\r\n" id n in
        (header, 0, String.length header) :: (body @ terminal_)
end

class scanner ?limits waitTx xRx yTx = object
    inherit [Iom_octet_stream.fragment, Frame.t]
        Iom_octet_stream.scanner ?limits waitTx xRx yTx
    
    method private lex = Frame.parse
end

class emitter ?limits waitTx xRx yTx = object
    inherit [Frame.t, Iom_octet_stream.fragment]
        Iom_octet_stream.emitter ?limits waitTx xRx yTx
    
    method private fragment = function
        | Frame.Data (id, body) -> Iom_stream.More, Frame.emit id body
        | Frame.Close -> Iom_stream.Last, []
    
    method private emit =
        let m = Cf_message.drain buffer_ in
        [ new Iom_octet_stream.fragment more_ m ], mark_
end

let framer ?s ?addrs k =
    TCP.endpoint ?s ?addrs ~limits:Opt.readLimits k >>= fun fix ->
    let _ = {
        Opt.readLimits with
        Iom_octet_stream.low = Opt.readLimits.Iom_octet_stream.high
    } in
    Iom_layer.duplex fix
        (new scanner ?limits:None) (new emitter ?limits:None)

let tag_chars =
    "ABCDEFGHIJKLMNOPQRSTUVQXYZabcdefghijklmnopqrstuvwxyz0123456789"

let num_tag_chars = String.length tag_chars - 1

let clientFrameData =
    let s = String.make Opt.frameSize 'x' in
    for i = 0 to Opt.frameSize - 1 do
        s.[i] <- tag_chars.[Random.int num_tag_chars]
    done;
    Cf_message.create s

let clientSessionObj framerJack clientPlug =
    let (frameRx, frameTx), (ntfyRx, ctrlTx) =
        (framerJack : (Frame.t, Frame.t, 'fctrl, 'fntfy) Iom_stream.iofix)
    in
    let ctrlRx, ntfyTx = clientPlug in
    let tag_ = String.make 16 'x' in
    for i = 0 to 15 do tag_.[i] <- tag_chars.[Random.int num_tag_chars] done;
    object(self)
        inherit Iom_gadget.start as super
        inherit Iom_gadget.next
        
        val framesSent_ = 0
        val pipelineSet_ = N_set.nil
        
        method private put_frame_ n set =
            let frameId = Printf.sprintf "%s:%u" tag_ n in
            frameTx#put (Frame.Data (frameId, clientFrameData)) >>= fun () ->
            let n = succ n in
            let set = N_set.put n set in
            Cf_cmonad.return (n, set)
        
        method private put_next_frame set =
            self#put_frame_ framesSent_ set >>= fun (n, set) ->
            {< framesSent_ = n; pipelineSet_ = set >}#next
                        
        method private notify = function
            | #Iom_stream.ready ->
                assert (jout#debug "Client %s is READY." tag_);
                self#next
            | #Iom_stream.wait ->
                assert (jout#debug "Client %s is BLOCKING." tag_);
                self#next
            | `Failed x ->
                jout#error "clientSessionObj#notify: `Failed! (%s)"
                    (Printexc.to_string x);
                raise x
        
        method private control `Stop = ctrlTx#put `Stop
        
        method private receive = function
            | Frame.Close ->
                jout#info "Client %s is CLOSED." tag_;
                ntfyTx#put `Done >>= fun () ->
                Iom_gadget.abort
            | Frame.Data (frameId, _) ->
                try
                    let tag, n =
                        Scanf.sscanf frameId "%16s:%u" (fun a b -> a, b)
                    in
                    assert begin
                        Printf.printf "%s\r" frameId;
                        flush stdout;
                        true
                    end;
                    if tag = tag_ && N_set.member n pipelineSet_ then
                        let pipelineSet = N_set.clear n pipelineSet_ in
                        if framesSent_ < Opt.framesTx then
                            self#put_next_frame pipelineSet
                        else
                            frameTx#put Frame.Close >>= fun () ->
                            self#next
                    else
                        self#next
                with
                | x ->
                    jout#error "Client %s receiving frameId=%s (%s)"
                        tag_ frameId (Printexc.to_string x);
                    ctrlTx#put `Stop >>= fun () ->
                    ntfyTx#put (`Failed x)
        
        method private guard =
            ntfyRx#get self#notify >>= fun () ->
            ctrlRx#get self#control >>= fun () ->
            frameRx#get self#receive
        
        method start =
            if framesSent_ > 0 then begin
                ctrlTx#put `Ready >>= fun () ->
                super#start
            end
            else begin
                let rec loop n set =
                    if n < Opt.maxPipeline then begin
                        self#put_frame_ n set >>= fun (n, set) ->
                        loop n set
                    end
                    else
                        {< framesSent_ = n; pipelineSet_ = set >}#start
                in
                loop framesSent_ pipelineSet_
            end
    end

let clientSession k =
    let addrs = TCP.unspecified_address, Opt.addr in
    framer ~addrs k >>= fun framerJack ->
    Iom_gadget.create (clientSessionObj framerJack)

let clientObj intJack k =
    let intNtfyRx, intCtrlTx = intJack in
    object(self)
        inherit Iom_gadget.start as super
        inherit Iom_gadget.next
        
        val sessionSeq_ =
            Cf_seq.unfold begin function
                | x when x < Opt.numSessions -> Some (x, succ x)
                | _ -> None
            end 0
        
        val sessionMap_ = N_map.nil
        
        method private interrupt (`Signal _) =
            jout#notice "Client interrupted.";
            intCtrlTx#put `Stop >>= fun () ->
            let z = N_map.to_seq_incr sessionMap_ in
            let z = Cf_seq.map begin fun (n, (_, ctrlTx)) ->
                jout#info "Stopping client %u." n;
                ctrlTx#put `Stop
            end z in
            Cf_seq.C.sequence z
                
        method private client_failed_ _ x =
            jout#fail "Client error: %s" (Printexc.to_string x)
        
        method private client_done_ n =
            jout#info "Client %u finished." n;
            let sessionMap = N_map.delete n sessionMap_ in
            match Lazy.force sessionSeq_ with
            | Cf_seq.Z ->
                if N_map.empty sessionMap then begin
                    intCtrlTx#put `Stop >>= fun () ->
                    Iom_gadget.abort
                end
                else
                    {< sessionMap_ = sessionMap >}#next
            | Cf_seq.P (n, sessionSeq) ->
                clientSession k >>= fun fix ->
                let sessionMap = N_map.replace (n, fix) sessionMap in
                {< sessionSeq_ = sessionSeq; sessionMap_ = sessionMap >}#next
        
        method private client_notify_ n = function
            | `Failed x -> self#client_failed_ n x
            | `Done -> self#client_done_ n
        
        method private guard_client_notify_ pair =
            let n, (notifyRx, _) = pair in
            notifyRx#get (self#client_notify_ n)
        
        method private guard_clients_ =
            let z = N_map.to_seq_incr sessionMap_ in
            let z = Cf_seq.map self#guard_client_notify_ z in
            Cf_seq.C.sequence z
        
        method private guard =
            intNtfyRx#get self#interrupt >>= fun () ->
            self#guard_clients_
        
        method start =
            if not (N_map.empty sessionMap_) then begin
                intCtrlTx#put `Ready >>= fun () ->
                super#start
            end
            else begin
                let sessionSeq = Cf_seq.shift Opt.maxEndpoints sessionSeq_ in
                let z = Cf_seq.limit Opt.maxEndpoints sessionSeq_ in
                let pairMSeq = Cf_seq.map begin fun n ->
                    clientSession k >>= fun fix ->
                    Cf_cmonad.return (n, fix)
                end z in
                Cf_seq.C.accumulate pairMSeq >>= fun pairList ->
                let sessionMap = N_map.of_list_incr pairList in
                let obj = {<
                    sessionSeq_ = sessionSeq;
                    sessionMap_ = sessionMap;
                >} in
                obj#start
            end
    end

let client k =
    jout#notice "Starting client to %s." (socket_ntop_ Opt.addr);
    R.signaler ~n:Sys.sigint k >>= fun intJack ->
    (clientObj intJack k)#start

let serverObj intJack listenJack k =
    let intNtfyRx, intCtrlTx = intJack in
    let connectionRx, (listenNtfyRx, listenCtrlTx) = listenJack in
    object(self)
        inherit Iom_gadget.start as super
        inherit Iom_gadget.next
        
        val numEndpoints_ = 0
        val numSessions_ = 0
        val clientMap_ = A_map.nil
        val closingMap_ = A_map.nil
        val blockedSet_ = A_set.nil
        val readyQ_ = Cf_deque.nil
        
        method private interrupt (`Signal _) =
            listenCtrlTx#put `Stop >>= fun () ->
            intCtrlTx#put `Stop >>= fun () ->
            let z = Cf_deque.B.to_seq readyQ_ in
            let z = Cf_seq.map (fun (_, (_, (_, x))) -> x#put `Stop) z in
            Cf_seq.C.sequence z >>= fun () ->
            jout#notice "Server interrupted.";
            Iom_gadget.abort
        
        method private listen = function
            | `Listen addr ->
                jout#info "Server listening at %s." (socket_ntop_ addr);
                self#next
            | `Failed x ->
                intCtrlTx#put `Stop >>= fun () ->
                jout#error "Server error: %s" (Printexc.to_string x);
                Iom_gadget.abort
        
        method private connection c =
            let numEndpoints = succ numEndpoints_ in
            let numSessions = succ numSessions_ in
            begin
                if
                    numSessions < Opt.numSessions &&
                    numEndpoints < Opt.maxEndpoints
                then
                    Cf_cmonad.return ()
                else if numSessions < Opt.numSessions then
                    listenCtrlTx#put `Wait
                else
                    listenCtrlTx#put `Stop
            end >>= fun () ->
            framer ~s:c#socket k >>= fun clientJack ->
            let _, (_, clientTx) = clientJack in
            clientTx#put `Ready >>= fun () ->
            let pair = c#remote, clientJack in
            let clientMap = A_map.replace pair clientMap_ in
            let readyQ = Cf_deque.A.push pair readyQ_ in
            jout#info "Service started for client at %s."
                (socket_ntop_ c#remote);
            {<
                numEndpoints_ = numEndpoints;
                numSessions_ = numSessions;
                clientMap_ = clientMap;
                readyQ_ = readyQ;
            >}#next
        
        method private client_ready_ pair =
            let addr, _ = pair in
            let addrStr = socket_ntop_ addr in
            jout#info "Client at %s is READY." addrStr;
            if A_set.member addr blockedSet_ then begin
                let blockedSet = A_set.clear addr blockedSet_ in
                begin
                    if A_set.empty blockedSet then begin
                        let z = Cf_deque.B.to_seq readyQ_ in
                        let z = Cf_seq.map begin fun (_, (_, (_, ctrlTx))) -> 
                            ctrlTx#put `Ready
                        end z in
                        Cf_seq.C.sequence z
                    end
                    else
                        Cf_cmonad.return ()
                end >>= fun () ->
                {< blockedSet_ = blockedSet >}#next
            end
            else
                self#next
        
        method private client_wait_ pair =
            let addr, _ = pair in
            assert (not (A_set.member addr blockedSet_));
            let addrStr = socket_ntop_ addr in
            jout#info "Client at %s is BLOCKING." addrStr;
            begin
                if A_set.empty blockedSet_ then begin
                    let z = Cf_deque.B.to_seq readyQ_ in
                    let z = Cf_seq.map begin fun (_, (_, (_, ctrlTx))) -> 
                        ctrlTx#put `Wait
                    end z in
                    Cf_seq.C.sequence z
                end
                else
                    Cf_cmonad.return ()
            end >>= fun () ->
            {< blockedSet_ = A_set.put addr blockedSet_ >}#next
        
        method private client_remove_ addr =
            jout#info "Removing client at %s n=%u." (socket_ntop_ addr)
                numSessions_;
            begin
                if
                    numEndpoints_ < Opt.maxEndpoints &&
                    numSessions_ < Opt.numSessions
                then
                    Cf_cmonad.return (pred numEndpoints_)
                else
                    let _, (_, listenCtrlTx) = listenJack in
                    listenCtrlTx#put `Ready >>= fun () ->
                    Cf_cmonad.return (pred numEndpoints_)
            end >>= fun numEndpoints ->
            let clientMap = A_map.delete addr clientMap_ in
            if clientMap == clientMap_ then
                jout#error "client map UNCHANGED.";
            let closingMap = A_map.delete addr closingMap_ in
            if closingMap == closingMap_ then
                jout#error "closing map UNCHANGED.";
            if
                not (numSessions_ < Opt.numSessions) &&
                A_map.empty closingMap &&
                A_map.empty clientMap
            then begin
                intCtrlTx#put `Stop >>= fun () ->
                Iom_gadget.abort
            end
            else begin
                let readyList = A_map.to_list_decr clientMap_ in
                let readyQ = Cf_deque.A.of_list readyList in
                jout#info "clients=%u closing=%u"
                    (A_map.size clientMap) (A_map.size closingMap);
                {<
                    numEndpoints_ = numEndpoints;
                    clientMap_ = clientMap;
                    closingMap_ = closingMap;
                    readyQ_ = readyQ;
                >}#next
            end
        
        method private client_complete_ pair =
            let addr, _ = pair in
            let addrStr = socket_ntop_ addr in
            jout#info "Client at %s is CLOSED." addrStr;
            self#client_remove_ addr
        
        method private client_failed_ pair x =
            let addr, _ = pair in
            let addrStr = socket_ntop_ addr in
            let errorStr = Printexc.to_string x in
            jout#error "Client at %s FAILED (%s)" addrStr errorStr;
            self#client_remove_ addr
        
        method private client_notify_ pair = function
            | `Failed x -> self#client_failed_ pair x
            | `Ready -> self#client_ready_ pair
            | `Wait -> self#client_wait_ pair
        
        method private client_receive_ pair frame =
            let addr, ((_, frameTx), _) = pair in
            let addrStr = socket_ntop_ addr in
            match frame with
            | Frame.Data (frameId, _) as frame ->
                assert begin
                    Printf.printf "%s\r" frameId;
                    flush stdout;
                    true
                end;
                let z = Cf_deque.B.to_seq readyQ_ in
                let z = Cf_seq.map begin fun (_, ((_, frameTx), _)) -> 
                    frameTx#put frame
                end z in
                Cf_seq.C.sequence z >>= fun () ->
                let readyQ =
                    let hd = Cf_deque.B.head readyQ_ in
                    let tl = Cf_deque.B.tail readyQ_ in
                    Cf_deque.A.push hd tl
                in
                {< readyQ_ = readyQ >}#next
            | Frame.Close ->
                jout#info "Service closing for client at %s." addrStr;
                frameTx#put Frame.Close >>= fun () ->
                let clientMap = A_map.delete addr clientMap_ in
                let readyList = A_map.to_list_decr clientMap_ in
                let readyQ = Cf_deque.A.of_list readyList in
                assert (not (A_map.member addr closingMap_));
                let closingMap = A_map.replace pair closingMap_ in
                {<
                    clientMap_ = clientMap;
                    closingMap_ = closingMap;
                    readyQ_ = readyQ;
                >}#next
        
        method private guard_listener_ =
            if numEndpoints_ >= Opt.maxEndpoints then
                Cf_cmonad.return ()
            else
                connectionRx#get self#connection
        
        method private guard_client_notify_ pair =
            let _, (_, (notifyRx, _)) = pair in
            notifyRx#get (self#client_notify_ pair)
        
        method private guard_client_receive_ pair =
            let _, ((frameRx, _), _) = pair in
            frameRx#get (self#client_receive_ pair)
        
        method private guard_clients_ =
            let z = A_map.to_seq_incr closingMap_ in
            let z = Cf_seq.map self#guard_client_notify_ z in
            Cf_seq.C.sequence z >>= fun () ->
            if not (A_set.empty blockedSet_) then
                Cf_cmonad.return ()
            else begin
                let z0 = Cf_deque.A.to_seq readyQ_ in
                let z1 = Cf_seq.map self#guard_client_notify_ z0 in
                let z2 = Cf_seq.map self#guard_client_receive_ z0 in
                let z = Cf_seq.concat z1 z2 in
                Cf_seq.C.sequence z
            end
        
        method private guard =
            intNtfyRx#get self#interrupt >>= fun () ->
            listenNtfyRx#get self#listen >>= fun () ->
            self#guard_listener_ >>= fun () ->
            self#guard_clients_
        
        method start =
            intCtrlTx#put `Ready >>= fun () ->
            listenCtrlTx#put `Ready >>= fun () ->
            super#start
    end

let server k =
    jout#notice "Starting server with backlog=%u." Opt.tcpBacklog;
    R.signaler ~n:Sys.sigint k >>= fun intJack ->
    TCP.listener ~src:Opt.addr ~backlog:Opt.tcpBacklog k >>= fun listenJack ->
    let server = serverObj intJack listenJack k in
    server#start

let () =
    let reactor =
        match Opt.mode with
        | Opt.Client -> client
        | Opt.Server -> server
    in
    begin
        try G.run reactor with
        | Unix.Unix_error (error, where, arg) ->
            let error = Unix.error_message error in
            jout#fail "Unix error: '%s' in %s(%s)" error where arg
    end;
    jout#notice "Finished."

(*--- $File$ ---*)