Commits

Arul Dhesiaseelan  committed fa24a18 Draft

JMS Trigger plugin code cleanup. Added standalone JUnit test that uses JmsAction to publish and JmsTrigger to consume messages.

  • Participants
  • Parent commits f4e8029

Comments (0)

Files changed (9)

File jms-trigger/src/main/java/flux/JmsTriggerFactory.java

 /**
  * Factory for creating JMS Trigger.
  *
- * @author arul@flux.ly.com
+ * @author arul@flux.ly
  */
 public class JmsTriggerFactory implements AdapterFactory {
 

File jms-trigger/src/main/java/fluximpl/JmsTriggerImpl.java

 
 /**
  * JmsTrigger which wait's for and consumes messages from a queue.
+ *
+ * @author Copyright 2012 Flux Corporation. All rights reserved.
  */
 public class JmsTriggerImpl extends TriggerImpl implements JmsTrigger {
 
   }
 
   public String getPassword() {
-    return getVariable().password;
+    Password password = getVariable().password;
+    if (password != null) {
+      return password.getEncryptedPassword();
+    }
+    return null;
   }
 
   public void setPassword(String password) {
     JmsTriggerVariable var = getVariable();
-    var.password = password;
+    if (password != null) {
+      var.password = Password.makePassword(password);
+    }
     putVariable(var);
   }
 

File jms-trigger/src/main/java/fluximpl/JmsTriggerImplBeanInfo.java

 import java.util.Vector;
 
 /**
- * BeanInfo for the web designer
+ * BeanInfo for the web and desktop designer.
+ *
+ * @author Copyright 2012 Flux Corporation. All rights reserved.
  */
 public class JmsTriggerImplBeanInfo extends ActionImplBeanInfo {
 
   protected BeanDescriptor bd = new BeanDescriptor(JmsTriggerImpl.class);
 
-
   public JmsTriggerImplBeanInfo(final BeanDescriptor bd) {
     super();
     this.bd = bd;

File jms-trigger/src/main/java/fluximpl/JmsTriggerVariable.java

   public Properties extraInitialContextProperties = new Properties();
   public String providerUrl;
   public String username;
-  public String password;
+  public Password password;
   public String pollingDelay = "+5s";
   public String messageSelector;
 }

File jms-trigger/src/main/java/fluximpl/TextMessageImpl.java

 /**
  * Minimalistic POJO that contains body of the JMS text message, correlation id, message id.
  * Just add what you need for persistence. No bloating flow context.
+ *
+ * @author Copyright 2012 Flux Corporation. All rights reserved.
  */
 public class TextMessageImpl implements TextMessage, Serializable, Cloneable {
 

File jms-trigger/src/main/resources/JmsTriggerImpl_big.png

Added
New image

File jms-trigger/src/main/resources/JmsTriggerImpl_small.png

Added
New image

File jms-trigger/src/test/java/test/ActiveMQBroker.java

 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 
-import javax.jms.*;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
+import javax.jms.Connection;
+import javax.jms.Session;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.Properties;
 
 /**
  * @author Copyright 2012 Flux Corporation. All rights reserved.
  */
-public final class ActiveMQBroker {
+public abstract class ActiveMQBroker {
 
-  public static BrokerService startBroker() throws Exception {
-    final BrokerService broker = new BrokerService();
+  private final static String vmProviderUrl = "vm://localhost";
+  private final static String tcpProviderUrl = "tcp://localhost:61616?wireFormat.maxInactivityDurationInitalDelay=30000";
+  private BrokerService broker;
 
+  protected BrokerService startBroker() throws Exception {
+    broker = new BrokerService();
     broker.setUseJmx(false);
     try {
-//      broker.addConnector("tcp://localhost:61616?wireFormat.maxInactivityDurationInitalDelay=30000");
-            broker.addConnector("vm://localhost");
+      broker.addConnector(vmProviderUrl);
     } catch (Exception e) {
       e.printStackTrace();
     }
 
     URI uri = null;
     try {
-//      uri = new URI("tcp://localhost:61616");
-      uri = new URI("vm://localhost");
+      uri = new URI(vmProviderUrl);
     } catch (URISyntaxException e) {
       e.printStackTrace();
     }
     connection.start();
     System.out.println("Broker started!");
     return broker;
-
   }
 
-  public static void publishTestMessages(int numMsgs, String destinationName, String correlationId) {
-    Context jndiContext = null;
-    ConnectionFactory connectionFactory = null;
-    Connection connection = null;
-    Session session = null;
-    Destination destination = null;
-    MessageProducer producer = null;
-    /*
-    * Create a JNDI API InitialContext object
-    */
-    Properties props = new Properties();
-    props.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
-//    props.setProperty(Context.PROVIDER_URL, "tcp://localhost:61616");
-    props.setProperty(Context.PROVIDER_URL, "vm://localhost");
-
-    try {
-      jndiContext = new InitialContext(props);
-    } catch (NamingException e) {
-      System.out.println("Could not create JNDI API context: " + e.toString());
-    }
-
-    /*
-    * Look up connection factory and destination.
-    */
-    try {
-      connectionFactory = (ConnectionFactory) jndiContext.lookup("queueConnectionFactory");
-      destination = (Destination) jndiContext.lookup(destinationName);
-    } catch (NamingException e) {
-      System.out.println("JNDI API lookup failed: " + e);
-    }
-
-    /*
-    * Create connection. Create session from connection; false means
-    * session is not transacted. Create sender and text message. Send
-    * messages, varying text slightly. Send end-of-messages message.
-    * Finally, close connection.
-    */
-    try {
-      connection = connectionFactory.createConnection();
-      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      producer = session.createProducer(destination);
-      TextMessage message = session.createTextMessage();
-      for (int i = 0; i < numMsgs; i++) {
-        message.setText("This is message " + (i + 1));
-        if (correlationId != null)
-          message.setJMSCorrelationID(correlationId);
-        message.setStringProperty("key1", "val1");
-        message.setStringProperty("key2", "val2");
-        message.setStringProperty("key3", "val2");
-        System.out.println("Sending message: " + message.getText());
-        producer.send(message);
-      }
-
-    } catch (JMSException e) {
-      System.out.println("Exception occurred: " + e);
-    } finally {
-      if (connection != null) {
-        try {
-          connection.close();
-        } catch (JMSException e) {
-        }
-      }
-    }
-
+  protected void stopBroker() throws Exception {
+    broker.stop();
   }
 }

File jms-trigger/src/test/java/test/JmsTriggerTest.java

 package test;
 
 import flux.*;
+import flux.j2ee.JmsAction;
+import flux.j2ee.JmsMessageType;
 import fluximpl.TextMessageImpl;
-import org.apache.activemq.broker.BrokerService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static junit.framework.Assert.assertEquals;
 
 /**
- * @author Copyright 2000-2012 Flux Corporation. All rights reserved.
+ * @author Copyright 2012 Flux Corporation. All rights reserved.
  */
-public class JmsTriggerTest {
+public class JmsTriggerTest extends ActiveMQBroker {
 
   private static final String queue = "example.MyQueue";
   private static final String jndiName = "MyQueue";
+  private static final String activeMQCF = "org.apache.activemq.jndi.ActiveMQInitialContextFactory";
+  private static final String connectionFactory = "queueConnectionFactory";
+  private static final String vmProviderUrl = "vm://localhost";
+  private static final String tcpProviderUrl = "tcp://localhost:61616";
 
+  private Engine engine;
+  Factory factory = Factory.makeInstance();
 
-  public static void main(String[] args) throws Exception {
-    BrokerService broker = ActiveMQBroker.startBroker();
-    ActiveMQBroker.publishTestMessages(1, jndiName, "1234");
+  static Map<String, String> variablesMap = new HashMap<String, String>();
 
-    Factory factory = Factory.makeInstance();
+  @Before
+  public void setup() throws Exception {
+    startBroker();
     Configuration config = factory.makeConfiguration();
     config.setDatabaseType(DatabaseType.H2);
+    engine = factory.makeEngine(config);
+    engine.start();
+    variablesMap.put("key1", "value1");
+    variablesMap.put("key2", "value2");
+    variablesMap.put("key3", "value3");
+  }
 
-    Engine engine = factory.makeEngine(config);
-    engine.start();
+  @Test
+  public void produceConsumerTest() throws Exception {
 
     EngineHelper helper = factory.makeEngineHelper();
     FlowChart workflow = helper.makeFlowChart("jms trigger");
 
+    JmsAction publisher = workflow.makeJ2eeFactory().makeJmsAction("publisher");
+    publisher.setConnectionFactory(connectionFactory);
+    publisher.setMessageType(JmsMessageType.TEXT);
+    publisher.setBody("This goes in body");
+    publisher.setDestinationName(jndiName);
+    publisher.setCorrelationId("1234");
+    publisher.setProviderUrl(vmProviderUrl);
+    publisher.setPostscript("System.out.println(\"Message published!\");");
+    publisher.setProperties(variablesMap);
+
     JmsTriggerFactory customActionFactory = (JmsTriggerFactory) workflow.makeFactory("JmsTriggerFactory");
+    JmsTrigger consumer = customActionFactory.makeJmsTrigger("consumer");
+    consumer.setListeningQueueName(queue);
+    consumer.setInitialContextFactory(activeMQCF);
+    consumer.setProviderUrl(vmProviderUrl);
+    consumer.setFetchAllMessages(true);
+    consumer.setConnectionFactory(connectionFactory);
+    consumer.setPollingDelay("+5s");
 
-    JmsTrigger trigger = customActionFactory.makeJmsTrigger("trigger");
-    trigger.setListeningQueueName(queue);
-    trigger.setInitialContextFactory("org.apache.activemq.jndi.ActiveMQInitialContextFactory");
-    trigger.setProviderUrl("vm://localhost");
-//    trigger.setProviderUrl("tcp://localhost:61616");
-    trigger.setFetchAllMessages(true);
-    trigger.setConnectionFactory("queueConnectionFactory");
-    trigger.setPollingDelay("+5s");
+    publisher.addFlow(consumer);
 
     JavaAction fetcher = workflow.makeJavaAction("message fetcher");
     fetcher.setListener(MessageFetcher.class);
     fetcher.setTransactionBreak(true);
 
-    trigger.addFlow(fetcher);
+    consumer.addFlow(fetcher);
 
     JavaAction processor = workflow.makeJavaAction("message processor");
     processor.setListener(MessageProcessor.class);
     processor.addFlow(console);
 
     String name = engine.put(workflow);
-    engine.join(name, "+45s", "+5s");
+    engine.join(name, "+60s", "+5s");
+  }
 
+  @After
+  public void teardown() throws Exception {
     engine.dispose();
-    broker.stop();
+    stopBroker();
   }
 
   public static class MessageFetcher implements ActionListener {
     public Object actionFired(KeyFlowContext flowContext) throws Exception {
       JmsTrigger.JmsTriggerResult result = (JmsTrigger.JmsTriggerResult) flowContext.get("jms");
       if (result != null) {
-        System.out.println("Processing messages # " + result.getMessages().size());
+        System.out.println("Processing messages at " + result.getMessages().size());
         TextMessageImpl message = (TextMessageImpl) result.getMessages().get(0);
-        System.out.println("Received message: " + message.getText());
+        assertEquals("This goes in body", message.getText());
+        Iterable<String> itr = Enumerator.make(message.getPropertyNames());
+        for (String key : itr) {
+          System.out.println("K = " + key + "; V = " + message.getStringProperty(key));
+          assertEquals(variablesMap.get(key), message.getStringProperty(key));
+        }
       }
       return null;
     }
   }
 
+  public static class Enumerator<T> implements Iterable<T> {
+    private final Enumeration<T> en;
+
+    public Enumerator(Enumeration<T> en) {
+      this.en = en;
+    }
+
+    public Iterator<T> iterator() {
+      return new Iterator<T>() {
+        public boolean hasNext() {
+          return en.hasMoreElements();
+        }
+
+        public T next() {
+          return en.nextElement();
+        }
+
+        public void remove() {
+          throw new UnsupportedOperationException();
+        }
+      };
+    }
+
+    public static <T> Iterable<T> make(Enumeration<T> en) {
+      return new Enumerator<T>(en);
+    }
+  }
+
 }