Commits

Anonymous committed bccebbc

add queueLenth limits to the event listner queues

Conflicts:

opennms-services/src/main/java/org/opennms/netmgt/eventd/EventIpcManagerDefaultImpl.java

Comments (0)

Files changed (1)

opennms-services/src/main/java/org/opennms/netmgt/eventd/EventIpcManagerDefaultImpl.java

 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
         /**
          * Constructor
          */
-        EventListenerExecutor(EventListener listener) {
+        EventListenerExecutor(EventListener listener, Integer handlerQueueLength) {
             m_listener = listener;
             // You could also do Executors.newSingleThreadExecutor() here
-            m_delegateThread = Executors.newFixedThreadPool(1);
+            m_delegateThread = new ThreadPoolExecutor(
+                    1,
+                    1,
+                    0L,
+                    TimeUnit.MILLISECONDS,
+                    handlerQueueLength == null ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(handlerQueueLength),
+                    new ThreadFactory() {
+						
+						@Override
+						public Thread newThread(Runnable r) {
+							return new Thread(r, m_listener.getName()+"-Thread");
+						}
+					},
+					new RejectedExecutionHandler() {
+						
+						@Override
+						public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+							log().warn("Listener " + m_listener.getName() + "'s event queue is full discarding event");
+						}
+					}
+                );
         }
 
         public void addEvent(final Event event) {
             return;
         }
         
-        EventListenerExecutor listenerThread = new EventListenerExecutor(listener);
+        EventListenerExecutor listenerThread = new EventListenerExecutor(listener, m_handlerQueueLength);
         m_listenerThreads.put(listener.getName(), listenerThread);
     }
 
         Assert.state(m_eventHandler != null, "eventHandler not set");
         Assert.state(m_handlerPoolSize != null, "handlerPoolSize not set");
 
-        /**
-         * Create a fixed-size thread pool. The number of threads can be configured by using
-         * the "receivers" attribute in the config. The queue length for the pool can be configured
-         * with the "queueLength" attribute in the config.
-         */
-        m_eventHandlerPool = new ThreadPoolExecutor(
-            m_handlerPoolSize,
-            m_handlerPoolSize,
-            0L,
-            TimeUnit.MILLISECONDS,
-            m_handlerQueueLength == null ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(m_handlerQueueLength)
-        );
+        final String prefix = ThreadCategory.getPrefix();
+        try {
+            
+            ThreadCategory.setPrefix(Eventd.LOG4J_CATEGORY);
+
+            /**
+             * Create a fixed-size thread pool. The number of threads can be configured by using
+             * the "receivers" attribute in the config. The queue length for the pool can be configured
+             * with the "queueLength" attribute in the config.
+             */
+            m_eventHandlerPool = new ThreadPoolExecutor(
+                m_handlerPoolSize,
+                m_handlerPoolSize,
+                0L,
+                TimeUnit.MILLISECONDS,
+                m_handlerQueueLength == null ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(m_handlerQueueLength),
+                new ThreadFactory() {
+                	int created = 0;
+					@Override
+					public Thread newThread(Runnable r) {
+						String name = String.format("%sThread-%d-of-%d", EventIpcManagerDefaultImpl.this.getClass().getSimpleName(), ++created, m_handlerPoolSize);
+						return new Thread(r, name);
+					}
+                	
+                }
+            );
+        } finally {
+            ThreadCategory.setPrefix(prefix);
+        }
 
         // If the proxy is set, make this class its delegate.
         if (m_eventIpcManagerProxy != null) {