bufferevent_flush

Issue #36 on hold
Junt Van created an issue

http://www.wangafu.net/~nickm/libevent-book/Ref6_bufferevent.html says that flush method is available

int bufferevent_flush(struct bufferevent *bufev,
    short iotype, enum bufferevent_flush_mode state);

but I cannot find a corresponding method in the PHP extension...

My problem is: I close the socket on a timeout and write data to it just before closing, but the output buffer is not flushed before the socket is closed:

object(EventBuffer)#30 (2) {
["length"]=>  int(18)
["contiguous_space"]=> int(18)                                                                                                              
}      

so the client receives nothing...

I've tried different combinations of the available methods before closing the socket, but nothing helps. Can you recommend the correct way to flush the output buffer before closing?

        if($bev instanceof \EventBufferEvent){
            $bev->write($response); 
             var_dump($bev->output);
            $bev->close();
        }

Comments (12)

  1. Junt Van reporter
    • edited description

    I found an example that does exactly what I need, but for another protocol, at pecl-event / examples / sslfilter.php

    case strncmp('QUIT', $line, 4):
                    $this->ev_write($id, "250 OK quit\r\n");
                    $this->ev_close($id);
                    break;
    
  2. Ruslan Osmanov repo owner

    As far as I can see, bufferevent_flush() currently does nothing for socket-based bufferevents:

    Currently (as of Libevent 2.0.5-beta), bufferevent_flush() is only implemented for some bufferevent types. In particular, socket-based bufferevents don’t have it.

    Try to call fflush(), if your bufferevent is based on PHP's resource.

  3. Junt Van reporter

    I'm creating connection using

      if (!empty($context) and $context instanceof \EventSslContext) {
                $bev = \EventBufferEvent::sslSocket($this->event_base, $fd, $context, \EventBufferEvent::SSL_ACCEPTING, \EventBufferEvent::OPT_CLOSE_ON_FREE);
            } else {
                $bev = new \EventBufferEvent($this->event_base, $fd, \EventBufferEvent::OPT_CLOSE_ON_FREE | \EventBufferEvent::OPT_DEFER_CALLBACKS);
            }
    

    I tried to reach the "raw" socket, but did not find a way to get at it; $fd in every object is just an int. Any other ideas on how to do this using libevent?

  4. Junt Van reporter

    OPT_DEFER_CALLBACKS and watermarks did not help... I will try other approaches and report back if one succeeds.

  5. Ruslan Osmanov repo owner

    You can flush bufferevent by freeing it. The \EventBufferEvent::OPT_CLOSE_ON_FREE option may not help due to PHP's reference counting, if you do not destroy the EventBufferEvent explicitly. So if you don't need the bufferevent and want to flush it, you can explicitly destroy and free the object by assigning null:

    <?php
    $bev->free();
    // free() may be not enough, if you have references to this object.
    $bev = null;
    
  6. Fred Brown

    Junt Van, you mention a timeout. I could be misunderstanding, but if a connection has timed out then you should consider it broken. You cannot flush a broken connection. If EventBufferEvent could send the data, it would do so without any intervention.

  7. Jonny Big

    Same problem

    <?php
    
    /**
     * Reproduction case for the "Empty reply from server" problem.
     *
     * For every accepted connection the listener writes an HTTP response to a
     * bufferevent and closes it right away, inside the accept callback. The
     * curl transcript that accompanies this example shows the client receiving
     * no data at all.
     */
    class MyListener {
    
        // $bev holds the per-connection EventBufferEvent objects, keyed by fd.
        public $base,$listener,$socket,$addr,$bev;
    
        /**
         * Creates the event base, the listening socket and the listener, then
         * starts dispatching events.
         *
         * NOTE(review): dispatch() is called from inside the constructor, so
         * `new MyListener(...)` presumably does not return while the loop runs.
         *
         * @param string $addr Address handed to stream_socket_server(),
         *                     e.g. 'tcp://127.0.0.1:10080'.
         */
        public function __construct($addr) {
            $this->addr = $addr;
            $this->base = new EventBase();
            // The result of stream_socket_server() is not checked; $errno and
            // $errstr are filled in by the call but never read.
            $this->socket = stream_socket_server($this->addr, $errno, $errstr, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN, stream_context_create());
            $this->listener = new EventListener($this->base, array($this, 'accept'), $this->base, EventListener::OPT_CLOSE_ON_FREE | EventListener::OPT_REUSEABLE, -1, $this->socket);
            $this->listener->setErrorCallback(array($this, 'accept_error'));
            $this->base->dispatch();
        }
    
        /**
         * Listener callback: answers the new connection and closes it at once.
         *
         * @param mixed $listener Listener that accepted the connection (unused).
         * @param mixed $fd       Descriptor of the accepted connection; used as
         *                        the key into $this->bev.
         * @param mixed $address  Peer address (unused).
         * @param mixed $ctx      Value registered with the listener (unused).
         */
        public function accept($listener, $fd, $address, $ctx) {
            $this->bev[$fd] = new EventBufferEvent($this->base, $fd, EventBufferEvent::OPT_CLOSE_ON_FREE);
            // No write callback is registered (NULL); $fd is passed along as
            // the callback argument.
            $this->bev[$fd]->setCallbacks(array($this, 'onRead'), NULL, array($this, 'onError'), $fd);
            $this->bev[$fd]->setWatermark(Event::READ | Event::WRITE, 0, 0xffffff);
            $this->bev[$fd]->enable(Event::READ | Event::WRITE);
    
            $resp = "HTTP/1.1 200 OK\r\n"
            . "Content-Type: text/html; charset=utf-8\r\n"
            //. "Content-Length: 0\r\n"
            //. "Connection: close\r\n"
            . "\r\n";
    
            // NOTE(review): write() presumably only queues the data in the
            // output buffer; the event loop has had no chance to send it yet.
            $this->bev[$fd]->write($resp);
    
            // The connection is torn down in the same callback that queued the
            // response. This is the step the report is about: the client ends
            // up with an empty reply.
            $this->bev[$fd]->disable(Event::READ | Event::WRITE);
            $this->bev[$fd]->close();
            unset($this->bev[$fd]);
        }
    
        /**
         * Listener error callback: prints the last socket error to STDERR.
         *
         * The message says "Shutting down", but nothing in this method stops
         * the event loop.
         *
         * @param mixed $listener Listener that reported the error (unused).
         * @param mixed $ctx      Value registered with the listener (unused).
         */
        public function accept_error($listener, $ctx) {
            fprintf(STDERR, "Got an error %d (%s) on the listener. "
                ."Shutting down.\n",
                EventUtil::getLastSocketErrno(),
                EventUtil::getLastSocketError());
        }
    
        /**
         * Read callback: echoes whatever arrived, in chunks of up to 255 bytes.
         *
         * @param mixed $bev Bufferevent with data in its input buffer.
         * @param mixed $fd  Callback argument registered in accept() (unused).
         */
        public function onRead($bev, $fd) {
            $input  = $bev->getInput();
            // The loop ends as soon as read() yields a falsy value, which in
            // PHP also covers a chunk that is exactly the string "0".
            while($str = $input->read(255)) { echo $str; }
        }
    
        /**
         * Event callback: reports bufferevent errors.
         *
         * The free() call for EOF/ERROR is commented out, so apart from the
         * message this callback does nothing.
         *
         * @param mixed $bev    Bufferevent the event was raised on.
         * @param int   $events Bit mask of EventBufferEvent::* flags.
         * @param mixed $ctx    Callback argument registered in accept() (unused).
         */
        public function onError($bev, $events, $ctx) {
            if ($events & EventBufferEvent::ERROR) {
                echo "Error from bufferevent\n";
            }
            if ($events & (EventBufferEvent::EOF | EventBufferEvent::ERROR)) {
                //$bev->free();
            }
        }
    }
    
    return new MyListener('tcp://127.0.0.1:10080');
    

    curl -vv http://127.0.0.1:10080/

    * About to connect() to 127.0.0.1 port 10080 (#0)
    *   Trying 127.0.0.1... connected
    * Connected to 127.0.0.1 (127.0.0.1) port 10080 (#0)
    > GET / HTTP/1.1
    > User-Agent: curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.27.1 zlib/1.2.3 libidn/1.18 libssh2/1.4.2
    > Host: 127.0.0.1:10080
    > Accept: */*
    > 
    * Empty reply from server
    * Connection #0 to host 127.0.0.1 left intact
    curl: (52) Empty reply from server
    * Closing connection #0
    

    Empty reply from server

    Expect

    * About to connect() to 127.0.0.1 port 10080 (#0)
    *   Trying 127.0.0.1... connected
    * Connected to 127.0.0.1 (127.0.0.1) port 10080 (#0)
    > GET / HTTP/1.1
    > User-Agent: curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.27.1 zlib/1.2.3 libidn/1.18 libssh2/1.4.2
    > Host: 127.0.0.1:10080
    > Accept: */*
    > 
    < HTTP/1.1 200 OK
    < Content-Type: text/html; charset=utf-8
    * no chunk, no close, no size. Assume close to signal end
    < 
    * Closing connection #0
    
  8. Jonny Big

    Found right way to do it.

    Must set onWrite callback and detect when buffers are empty.

    And then close.

    Working Example

    <?php
    
    /**
     * Accepts TCP connections, answers each one with a fixed HTTP response and
     * closes the connection only after the response has left the output buffer.
     *
     * The close is driven by the bufferevent write callback instead of being
     * issued right after the data is queued, so the queued bytes are sent first.
     */
    class MyListener {

        /** @var EventBase Event loop shared by the listener and all connections. */
        public $base;

        /** @var EventListener Listener bound to $socket. */
        public $listener;

        /** @var resource Listening stream created by stream_socket_server(). */
        public $socket;

        /** @var string Address the server listens on. */
        public $addr;

        /**
         * @var array Per-connection state keyed by fd. Each entry is a stdClass
         *            with ->bev (the EventBufferEvent) and ->sent (1 once the
         *            response has been queued).
         */
        public $bev = [];

        /**
         * Creates the listening socket and listener, then runs the event loop.
         *
         * The loop is dispatched from inside the constructor, mirroring the
         * original example.
         *
         * @param string $addr Address for stream_socket_server(),
         *                     e.g. 'tcp://127.0.0.1:10080'.
         *
         * @throws RuntimeException When the listening socket cannot be created.
         */
        public function __construct($addr) {
            $this->addr = $addr;
            $this->base = new EventBase();

            $this->socket = stream_socket_server(
                $this->addr,
                $errno,
                $errstr,
                STREAM_SERVER_BIND | STREAM_SERVER_LISTEN,
                stream_context_create()
            );
            if ($this->socket === false) {
                // Fail loudly instead of handing `false` to EventListener.
                throw new RuntimeException(
                    sprintf('Unable to listen on %s: %s (%d)', $this->addr, $errstr, $errno)
                );
            }

            $this->listener = new EventListener(
                $this->base,
                [$this, 'accept'],
                $this->base,
                EventListener::OPT_CLOSE_ON_FREE | EventListener::OPT_REUSEABLE,
                -1,
                $this->socket
            );
            $this->listener->setErrorCallback([$this, 'accept_error']);

            $this->base->dispatch();
        }

        /**
         * Kept for interface compatibility; the loop is started by the constructor.
         */
        public function dispatch() {
        }

        /**
         * Listener callback: sets up a bufferevent for the new connection and
         * queues the response. The connection is closed later, from onWrite().
         *
         * @param mixed $listener Listener that accepted the connection (unused).
         * @param mixed $fd       Descriptor of the accepted connection; also the
         *                        key into $this->bev and the callback argument.
         * @param mixed $address  Peer address (unused).
         * @param mixed $ctx      Value registered with the listener (unused).
         */
        public function accept($listener, $fd, $address, $ctx) {
            $state = new stdClass();
            $state->sent = 0;
            $state->bev = new EventBufferEvent(
                $this->base,
                $fd,
                EventBufferEvent::OPT_CLOSE_ON_FREE | EventBufferEvent::OPT_DEFER_CALLBACKS
            );
            $this->bev[$fd] = $state;

            $state->bev->setCallbacks(
                [$this, 'onRead'],
                [$this, 'onWrite'],
                [$this, 'onError'],
                $fd
            );
            $state->bev->enable(Event::READ | Event::WRITE);

            // No Content-Length or Connection header is sent: the client takes
            // the close of the connection as the end of the response.
            $str = "HTTP/1.1 200 OK\r\n"
                . "Content-Type: text/html; charset=utf-8\r\n"
                . "\r\n";

            // Mark the response as queued before adding it, so that onWrite()
            // is allowed to close the connection once the buffer has drained.
            $state->sent = 1;
            $state->bev->getOutput()->add($str);
        }

        /**
         * Listener error callback: reports the socket error and stops the loop.
         *
         * @param mixed $listener Listener that reported the error (unused).
         * @param mixed $ctx      Value registered with the listener (unused).
         */
        public function accept_error($listener, $ctx) {
            $errno = EventUtil::getLastSocketErrno();
            if (!$errno) {
                return;
            }

            fprintf(STDERR, "Got an error %d (%s) on the listener. "
                ."Shutting down.\n",
                $errno,
                EventUtil::getLastSocketError());

            $this->base->exit(null);
        }

        /**
         * Read callback: echoes the request data in chunks of up to 32 bytes.
         *
         * @param mixed $bev Bufferevent with data in its input buffer.
         * @param mixed $fd  Callback argument registered in accept() (unused).
         */
        public function onRead($bev, $fd) {
            $input = $bev->getInput();
            while ($input->length > 0) {
                $buffer = $input->read(32);
                // Stop on an empty or failed read, so that a read error cannot
                // keep the loop spinning while length stays above zero.
                if (!is_string($buffer) || $buffer === '') {
                    break;
                }
                echo $buffer;
            }
        }

        /**
         * Write callback: closes the connection once the queued response has
         * been sent and no unread input remains.
         *
         * @param mixed $bev Bufferevent whose output buffer has drained.
         * @param mixed $fd  Key of the connection in $this->bev.
         */
        public function onWrite($bev, $fd) {
            // The entry may already be gone (closed here earlier or dropped by
            // onError()); a late callback must then be a no-op.
            if (!isset($this->bev[$fd]) || !$this->bev[$fd]->sent) {
                return;
            }
            if ($bev->output->length || $bev->input->length) {
                return;
            }

            $bev->disable(Event::READ | Event::WRITE);
            $bev->close();
            unset($this->bev[$fd]);
        }

        /**
         * Event callback: frees the bufferevent on EOF or error and drops the
         * per-connection state, so $this->bev does not grow with dead entries.
         *
         * @param mixed $bev    Bufferevent the event was raised on.
         * @param int   $events Bit mask of EventBufferEvent::* flags.
         * @param mixed $ctx    Callback argument registered in accept(), i.e.
         *                      the connection's key in $this->bev.
         */
        public function onError($bev, $events, $ctx) {
            if ($events & EventBufferEvent::ERROR) {
                echo "Error from bufferevent\n";
            }
            if ($events & (EventBufferEvent::EOF | EventBufferEvent::ERROR)) {
                $bev->free();
                unset($this->bev[$ctx]);
            }
        }
    }
    
    return new MyListener('tcp://127.0.0.1:10080');
    
  9. Log in to comment