package org.jboss.qa.hornetq; import java.util.HashMap; import java.util.Map; import java.util.Properties; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.Session; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; import org.apache.log4j.Logger; public class TestConsumer { Logger log = Logger.getLogger(TestProducer.class.getName()); public static void main(String[] args) throws Exception { TestConsumer c = new TestConsumer(); c.startClient(args); } public void startClient(String[] args) throws NamingException, JMSException { if (args.length != 2) { System.out.println("Set parameters \"hostname\" and \"jndi_name_of_the_queue\""); System.exit(1); } System.out.println("Start receiver to hostname: " + args[0] + " and queue: " + args[1]); Map idChecked = new HashMap(); Context context = null; Connection con = null; Session session = null; try { // Properties properties = new Properties(); // properties.setProperty("java.naming.factory.initial", "org.jboss.naming.HttpNamingContextFactory"); // properties.setProperty("java.naming.provider.url", "http://localhost:8080/invoker/JNDIFactory"); // properties.setProperty("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces"); Properties properties = new Properties(); properties.setProperty("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory"); properties.setProperty("java.naming.provider.url", "jnp://" + args[0] + ":1099"); // properties.setProperty("java.naming.provider.url", "jnp://messaging-20:1099"); properties.setProperty("java.naming.factory.url.pkgs", "org.jnp.interfaces.NamingContextFactory"); context = new InitialContext(properties); ConnectionFactory cf = (ConnectionFactory) (ConnectionFactory) context.lookup("/ConnectionFactory"); con = (Connection) cf.createConnection(); con.start(); session = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); Queue queue = (Queue) context.lookup(args[1]); // Queue queue = (Queue) context.lookup("/queue/queueOut"); MessageConsumer consumer = session.createConsumer(queue); long time = System.currentTimeMillis(); int counter = 0; Message message = null; Message lastMessage = null; while ((message = consumer.receive(10000)) != null) { try { message.acknowledge(); counter++; System.out.println("Receiver for hostname: " + args[0] + ", number or received messages:" + counter + ", message id = " + message.getJMSMessageID() + " message count property: " + message.getStringProperty("count") + ", responce for original message: " + message.getStringProperty("inMessageId")); log.info("Received message from node: " + args[0] + ", with messageId: " + message.getJMSMessageID() + " as response for intial message with id: " + message.getStringProperty("inMessageId")); } catch (JMSException ex) { System.out.println("Exception thrown during ack for message: " + message); } lastMessage = message; } if (lastMessage != null) { lastMessage.acknowledge(); } System.out.println("End receiver to hostname: " + args[0] + " and queue: " + args[1] + ", received messages: " + counter); } finally { if (session != null) { session.close(); } if (con != null) { try { con.close(); } catch (JMSException e) { e.printStackTrace(); } } if (context != null) { try { context.close(); } catch (NamingException e) { e.printStackTrace(); } } } } }