Commits

Anonymous committed b12aff6 Draft

can now shutdown ChannelServer

Comments (0)

Files changed (6)

Cineraria.Net.Serve/ChannelServer.cs

 
 namespace Cineraria.Net.Serve
 {
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
     using System.Net;
 
     using Cineraria.Concurrency;
         private readonly Channel<ConnectionEvent> _eventChannel;
 
         /// <summary>
+        /// This server's <see cref="InputListener"/> instances.
+        /// </summary>
+        private List<InputListener> _inputListenerInstances;
+
+        /// <summary>
+        /// This server's <see cref="OutputSender"/> instances.
+        /// </summary>
+        private List<OutputSender> _outputSenderInstances;
+
+        /// <summary>
+        /// This server's <see cref="ConnectionListener"/> instance.
+        /// </summary>
+        private ConnectionListener _connectionListenerInstance;
+
+        /// <summary>
+        /// The thread group that contains this server's threads.
+        /// </summary>
+        private ThreadGroup _runningThreadGroup;
+
+        /// <summary>
         /// Initializes a new instance of the <see cref="ChannelServer"/>
         /// class.
         /// </summary>
         /// </param>
         public void Start(int inputListenerThreads, int outputSenderThreads)
         {
-            new ThreadGroup()
+            if (inputListenerThreads <= 0)
+            {
+                throw new ArgumentOutOfRangeException(
+                    "inputListenerThreads",
+                    "inputListenerThreads must be positive");
+            }
+            else if (outputSenderThreads <= 0)
+            {
+                throw new ArgumentOutOfRangeException(
+                    "outputSenderThreads",
+                    "outputSenderThreads must be positive");
+            }
+            else if (this._inputListenerInstances != null)
+            {
+                throw new InvalidOperationException(
+                    "Cannot start a server that has already been started.");
+            }
+
+            this._inputListenerInstances = new List<InputListener>(
+                Enumerable.Range(0, inputListenerThreads)
+                    .Select(_ => new InputListener(
+                        this._eventChannel,
+                        this._inputListenChannel)));
+            this._outputSenderInstances = new List<OutputSender>(
+                    Enumerable.Range(0, outputSenderThreads)
+                    .Select(_ => new OutputSender(this._outputChannel)));
+            this._connectionListenerInstance = new ConnectionListener(
+                    this._endPoint,
+                    this._eventChannel,
+                    this._inputListenChannel,
+                    this._outputChannel);
+
+            this._runningThreadGroup = new ThreadGroup()
                 .AddOne(
-                    () => new ConnectionListener(
-                        this._endPoint,
-                        this._eventChannel,
-                        this._inputListenChannel,
-                        this._outputChannel).Run(),
+                    this._connectionListenerInstance.Run,
                     "Connection Listener")
                 .AddMany(
-                    threadNumber => new InputListener(
-                        this._eventChannel,
-                        this._inputListenChannel).Run(),
+                    n => this._inputListenerInstances[n].Run(),
                     inputListenerThreads,
                     "Input Listener")
                 .AddMany(
-                    threadNumber => new OutputSender(this._outputChannel)
-                        .Run(),
+                    n => this._outputSenderInstances[n].Run(),
                     outputSenderThreads,
-                    "Output Sender")
-                .StartAll();
+                    "Output Sender");
+            this._runningThreadGroup.StartAll();
+        }
+
+        /// <summary>
+        /// Shuts down this instance. Creates Close
+        /// <see cref="ConnectionEvent"/> instances for all of the remote
+        /// clients currently connected.
+        /// </summary>
+        public void Shutdown()
+        {
+            if (this._inputListenerInstances == null)
+            {
+                throw new InvalidOperationException(
+                    "Cannot shutdown a server that isn't running.");
+            }
+
+            this._connectionListenerInstance.Shutdown();
+            foreach (InputListener il in this._inputListenerInstances)
+            {
+                il.Shutdown();
+            }
+
+            foreach (OutputSender os in this._outputSenderInstances)
+            {
+                os.Shutdown();
+            }
+
+            this._runningThreadGroup.WaitForAll();
         }
     }
 }

Cineraria.Net.Serve/ConnectionEvent.cs

     public sealed class ConnectionEvent
     {
         /// <summary>
+        /// The <see cref="ConnectionEvent"/> passed to
+        /// <see cref="OutputSender"/> instances to signal they should
+        /// shutdown.
+        /// </summary>
+        internal static readonly ConnectionEvent ShutdownSignal
+            = new ConnectionEvent();
+
+        /// <summary>
         /// The channel used to send connections to the input listener.
         /// </summary>
         private readonly Channel<RemoteClient> _listenChannel;
         }
 
         /// <summary>
+        /// Constructor for ShutdownSignal static field.
+        /// </summary>
+        private ConnectionEvent()
+        {
+        }
+
+        /// <summary>
         /// Gets the remote client that fired this event.
         /// </summary>
         public RemoteClient RemoteClient { get; private set; }

Cineraria.Net.Serve/ConnectionListener.cs

         private readonly Channel<ConnectionEvent> _outputChannel;
 
         /// <summary>
+        /// The <see cref="TcpListener"/> used to listen for new connections.
+        /// </summary>
+        private readonly TcpListener _tcpListener;
+
+        /// <summary>
+        /// A value indicating whether this instance should shutdown.
+        /// </summary>
+        private bool _shutdown = false;
+
+        /// <summary>
         /// Initializes a new instance of the <see cref="ConnectionListener"/>
         /// class.
         /// </summary>
             this._inputParseChannel = inputParseChannel;
             this._inputListenChannel = inputListenChannel;
             this._outputChannel = outputChannel;
+            this._tcpListener = new TcpListener(this._endPoint);
         }
 
         /// <summary>
         /// </summary>
         public void Run()
         {
-            TcpListener server = null;
             try
             {
-                server = new TcpListener(this._endPoint);
-                server.Start();
+                this._tcpListener.Start();
 
                 while (true)
                 {
-                    // AcceptTcpClient blocks for connection
-                    TcpTextConnection connection = new TcpTextConnection(
-                        server.AcceptTcpClient());
+                    TcpTextConnection connection;
+                    try
+                    {
+                        // AcceptTcpClient blocks for connection
+                        connection = new TcpTextConnection(
+                            this._tcpListener.AcceptTcpClient());
+                    }
+                    catch (SocketException)
+                    {
+                        if (this._shutdown)
+                        {
+                            break;
+                        }
+                        else
+                        {
+                            throw;
+                        }
+                    }
 
                     this._inputParseChannel.Put(new ConnectionEvent(
                         new RemoteClient(connection, this._outputChannel),
             }
             finally
             {
-                if (server != null)
-                {
-                    server.Stop();
-                }
+                this._tcpListener.Stop();
             }
         }
+
+        /// <summary>
+        /// Shuts down this instance.
+        /// </summary>
+        public void Shutdown()
+        {
+            this._shutdown = true;
+            this._tcpListener.Stop();
+        }
     }
 }

Cineraria.Net.Serve/InputListener.cs

 {
     using System;
     using System.Collections.Generic;
+    using System.Linq;
     using System.Net.Sockets;
 
     using Cineraria.Concurrency;
         private readonly Dictionary<Socket, RemoteClient> _connections;
 
         /// <summary>
+        /// A value indicating whether this instance should shutdown.
+        /// </summary>
+        private bool _shutdown = false;
+
+        /// <summary>
         /// Initializes a new instance of the <see cref="InputListener"/> class.
         /// </summary>
         /// <param name="inputParseChannel">
         /// </summary>
         public void Run()
         {
-            while (true)
+            while (!this._shutdown)
             {
                 List<Socket> sockets = new List<Socket>(this._connections.Keys);
                 Socket.Select(sockets, null, null, -1);
                     }
                     else
                     {
-                        RemoteClient conn = this._connections[socket];
-                        string message = conn.TcpConnection.Reader.ReadLine();
-                        this._connections.Remove(socket);
-                        if (message == null)
-                        {
-                            conn.TcpConnection.Close();
-
-                            // Send a close notification to the input parser
-                            this._inputParseChannel.Put(new ConnectionEvent(
-                                conn,
-                                null,
-                                null,
-                                EventType.Close));
-                        }
-                        else
-                        {
-                            // Send an open notification to the input parser
-                            this._inputParseChannel.Put(new ConnectionEvent(
-                                conn,
-                                this._inputListenChannel,
-                                message,
-                                EventType.Message));
-                        }
+                        this.ReactToSocket(socket);
                     }
                 }
             }
+
+            this.CloseAllConnections();
+        }
+
+        /// <summary>
+        /// Shuts down a <see cref="InputListener"/> instance listening on the
+        /// same new connection channel as this instance. That instance will
+        /// create Close <see cref="ConnectionEvent"/> instances for all of the
+        /// remote clients it is currently responsible for.
+        /// </summary>
+        public void Shutdown()
+        {
+            this._shutdown = true;
+            this._inputListenChannel.Put(RemoteClient.ShutdownSignal);
         }
 
         /// <summary>
             RemoteClient conn;
             if (this._inputListenChannel.Poll(out conn, 1))
             {
-                this._connections.Add(conn.TcpConnection.Socket, conn);
+                if (conn != RemoteClient.ShutdownSignal)
+                {
+                    this._connections.Add(conn.TcpConnection.Socket, conn);
+                }
             }
         }
+
+        /// <summary>
+        /// Reacts to the specified socket by creating an appropriate
+        /// <see cref="ConnectionEvent"/>.
+        /// </summary>
+        /// <param name="socket">The socket.</param>
+        private void ReactToSocket(Socket socket)
+        {
+            this._connections.Remove(socket);
+            RemoteClient conn = this._connections[socket];
+            string message;
+            try
+            {
+                message = conn.TcpConnection.Reader.ReadLine();
+            }
+            catch (System.IO.IOException e)
+            {
+                if (e.InnerException != null)
+                {
+                    SocketException e1 = e.InnerException as SocketException;
+                    if (e1 != null)
+                    {
+                        switch (e1.SocketErrorCode)
+                        {
+                            case SocketError.ConnectionAborted:
+                            case SocketError.ConnectionReset:
+                                // Remote host shut down connection ungracefully.
+                                conn.TcpConnection.Close();
+                                this.SendCloseEvent(conn);
+                                return;
+                        }
+                    }
+                }
+
+                throw;
+            }
+
+            if (message == null)
+            {
+                conn.TcpConnection.Close();
+
+                // Send a close notification to the input parser
+                this.SendCloseEvent(conn);
+            }
+            else
+            {
+                // Send an open notification to the input parser
+                this._inputParseChannel.Put(new ConnectionEvent(
+                    conn,
+                    this._inputListenChannel,
+                    message,
+                    EventType.Message));
+            }
+        }
+
+        /// <summary>
+        /// Closes all connections this instance is responsible for.
+        /// </summary>
+        private void CloseAllConnections()
+        {
+            // Don't close or send NotifyingSocket
+            foreach (RemoteClient client in this._connections.Values
+                .Where(v => v != null))
+            {
+                client.TcpConnection.Close();
+                this.SendCloseEvent(client);
+            }
+        }
+
+        /// <summary>
+        /// Sends a close event to the parser(s) in regards to the specified
+        /// client.
+        /// </summary>
+        /// <param name="client">
+        /// The client whose connection has closed.
+        /// </param>
+        private void SendCloseEvent(RemoteClient client)
+        {
+            this._inputParseChannel.Put(new ConnectionEvent(
+                    client,
+                    null,
+                    null,
+                    EventType.Close));
+        }
     }
 }

Cineraria.Net.Serve/OutputSender.cs

 
     using Cineraria.Concurrency;
 
+    using ThreadInterruptedException =
+        System.Threading.ThreadInterruptedException;
+
     /// <summary>
     /// A thread that takes <see cref="ConnectionEvent"/> instances from a
     /// channel and sends the attached message over the event's connection.
             while (true)
             {
                 ConnectionEvent msg = this._outputChannel.Take();
-                msg.RemoteClient.TcpConnection.Writer.Write(msg.Message);
+                if (msg == ConnectionEvent.ShutdownSignal)
+                {
+                    break;
+                }
+                else
+                {
+                    msg.RemoteClient.TcpConnection.Writer.Write(msg.Message);
+                }
             }
         }
+
+        /// <summary>
+        /// Shuts down an <see cref="OutputSender"/> instance listening on the
+        /// same output event channel as this instance.
+        /// </summary>
+        public void Shutdown()
+        {
+            this._outputChannel.Put(ConnectionEvent.ShutdownSignal);
+        }
     }
 }

Cineraria.Net.Serve/RemoteClient.cs

         public static readonly string EndLine = "\r\n";
 
         /// <summary>
+        /// The <see cref="RemoteClient"/> passed to <see cref="InputListener"/>
+        /// instances to signal they should shutdown.
+        /// </summary>
+        internal static readonly RemoteClient ShutdownSignal
+            = new RemoteClient();
+
+        /// <summary>
         /// The channel that output events are put on for execution by the
         /// output sender thread(s).
         /// </summary>
         }
 
         /// <summary>
+        /// Constructor for ShutdownSignal static property.
+        /// </summary>
+        private RemoteClient()
+        {
+        }
+
+        /// <summary>
         /// Gets or sets the local data. This object is maintained between
         /// different <see cref="ConnectionEvent"/> instances originating from
         /// the same remote client.