package org.jboss.qa.hornetq; import java.util.Properties; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; import org.apache.log4j.Logger; public class TestProducer { Logger log = Logger.getLogger(TestProducer.class.getName()); private static Message createMessage(Session session, int counter) throws Exception { TextMessage message = session.createTextMessage("This is text message " + counter); message.setIntProperty("count", counter); message.setText(String.valueOf("mnovak: " + counter)); // BytesMessage message = session.createBytesMessage(); // message.writeBytes(new byte[100]); return message; } private int numberOfRetries = 0; private static int maxRetries = 10; public static void main(String[] args) { TestProducer testProducer = new TestProducer(); testProducer.startClient(args); } public void startClient(String[] args) { for (int i = 0; i < args.length; i++) { System.out.println("print "+ i + "th argument: " + args[i]); } if (args.length != 2) { System.out.println("Set parameters \"hostname\" and \"jndi_name_of_the_queue\""); System.exit(1); } final int MESSAGES_COUNT = 10000; Context context = null; Connection con = null; Session session = null; try { Properties properties = new Properties(); properties.setProperty("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory"); properties.setProperty("java.naming.provider.url", "jnp://" + args[0] + ":1099"); properties.setProperty("java.naming.factory.url.pkgs", "org.jnp.interfaces.NamingContextFactory"); context = new InitialContext(properties); ConnectionFactory cf = (ConnectionFactory) context.lookup("/ConnectionFactory"); con = cf.createConnection(); session = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); // Queue queue = (Queue) context.lookup("/queue/InQueue"); Queue queue = (Queue) context.lookup(args[1]); MessageProducer producer = session.createProducer(queue); long time = System.currentTimeMillis(); int i = 1; Message msg = null; while (i <= MESSAGES_COUNT) { msg = createMessage(session, i); try { producer.send(msg); System.out.println("Producer for node: " + args[0] + ". Sent message with property count: " + i + ", messageId:" + msg.getJMSMessageID()); log.info("Sent: " + msg.getJMSMessageID()); } catch (Exception ex) { ex.printStackTrace(); resendMessage(producer, msg, i); } i++; } producer.close(); System.out.println("It took " + (System.currentTimeMillis() - time) + " ms"); } catch (Exception e) { e.printStackTrace(); } finally { if (session != null) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if (con != null) { try { con.close(); } catch (JMSException e) { e.printStackTrace(); } } if (context != null) { try { context.close(); } catch (NamingException e) { e.printStackTrace(); } } } } private void resendMessage(MessageProducer producer, Message msg, int i) { try { if (numberOfRetries > maxRetries) { try { throw new Exception("Number of retries (" + numberOfRetries + ") is greater than limit (" + maxRetries + ")."); } catch (Exception ex) { ex.printStackTrace(); } } Thread.sleep(3000); numberOfRetries++; producer.send(msg); System.out.println("Sender RETRY (" + numberOfRetries + ") Sent message: " + i); } catch (JMSException ex) { ex.printStackTrace(); } catch (InterruptedException ex) { ex.printStackTrace(); } } }