/* * JBossMQ, the OpenSource JMS implementation * * Distributable under LGPL license. * See terms of license at gnu.org. */ package org.jboss.mq.server; import java.util.Enumeration; import javax.jms.Destination; import javax.jms.JMSException; import org.jboss.ha.framework.interfaces.HAPartition; import org.jboss.logging.Logger; import org.jboss.mq.SpyDestination; import org.jboss.mq.SpyMessage; import org.jboss.mq.pm.Tx; /** * This class is a message queue which is stored (hashed by Destination) on the * JMS provider * * @author Marcus Redeker (marcus.redeker@gedoplan.de) * @author Ingo Bruell (ingo.bruell@gedoplan.de) * @version $Revision$ */ public class JMSBridgedTopic extends JMSTopic { private static Logger log = Logger.getLogger(JMSBridgedTopic.class); protected HAPartition partition; protected String serviceName; protected boolean deliverLocal; // Constructor --------------------------------------------------- public JMSBridgedTopic(SpyDestination dest, ClientConsumer temporary, JMSDestinationManager server, BasicQueueParameters parameters, HAPartition partition, String name, boolean deliverLocal) throws JMSException { super(dest, temporary, server, parameters); this.partition = partition; this.serviceName = name; this.deliverLocal = deliverLocal; } public synchronized void addMessage(SpyMessage mes, Tx txId) throws JMSException { // check if message was delivered from another node if (mes instanceof BridgedMessage) { log.trace("deliver message: " + mes.getJMSMessageID()); super.addMessage((SpyMessage) ((BridgedMessage) mes) .getBridgedMessage(), txId); } else { log.trace("bridge message: " + mes.getJMSMessageID()); try { partition.callAsynchMethodOnCluster(serviceName, "giveMessageToLocalTopic", new Object[]{mes}, new Class[]{SpyMessage.class}, true); if (deliverLocal) { super.addMessage(mes, txId); } } catch (Exception e) { JMSException jmse = new JMSException(e.getMessage()); jmse.setLinkedException(e); throw jmse; } } } /** * Message Object to transfer Message between nodes in a cluster, to avoid * message bouncing. * * @author ingo.bruell.ext */ public static class BridgedMessage extends SpyMessage { protected SpyMessage msg; public BridgedMessage(SpyMessage msg) { this.msg = msg; } public SpyMessage getBridgedMessage() { return msg; } // Delegate calls to encapsulated message object public void acknowledge() throws JMSException { msg.acknowledge(); } public boolean getBooleanProperty(String name) throws JMSException { return msg.getBooleanProperty(name); } public byte getByteProperty(String name) throws JMSException { return msg.getByteProperty(name); } public double getDoubleProperty(String name) throws JMSException { return msg.getDoubleProperty(name); } public float getFloatProperty(String name) throws JMSException { return msg.getFloatProperty(name); } public int getIntProperty(String name) throws JMSException { return msg.getIntProperty(name); } public Destination getJMSDestination() { return msg.getJMSDestination(); } public long getJMSExpiration() { return msg.getJMSExpiration(); } public String getJMSMessageID() { return msg.getJMSMessageID(); } public int getJMSPriority() { return msg.getJMSPriority(); } public boolean getJMSRedelivered() { return msg.getJMSRedelivered(); } public Destination getJMSReplyTo() { return msg.getJMSReplyTo(); } public long getJMSTimestamp() { return msg.getJMSTimestamp(); } public String getJMSType() { return msg.getJMSType(); } public long getLongProperty(String name) throws JMSException { return msg.getLongProperty(name); } public Object getObjectProperty(String name) throws JMSException { return msg.getObjectProperty(name); } public Enumeration getPropertyNames() throws JMSException { return msg.getPropertyNames(); } public short getShortProperty(String name) throws JMSException { return msg.getShortProperty(name); } public String getStringProperty(String name) throws JMSException { return msg.getStringProperty(name); } public boolean propertyExists(String name) throws JMSException { return msg.propertyExists(name); } public void setJMSCorrelationID(String correlationID) throws JMSException { msg.setJMSCorrelationID(correlationID); } public void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException { msg.setJMSCorrelationIDAsBytes(correlationID); } public void setJMSDeliveryMode(int deliveryMode) throws JMSException { msg.setJMSDeliveryMode(deliveryMode); } public void setJMSDestination(Destination dest) throws JMSException { msg.setJMSDestination(dest); } public void setJMSExpiration(long expiration) throws JMSException { msg.setJMSExpiration(expiration); } public void setJMSMessageID(String messageID) throws JMSException { msg.setJMSMessageID(messageID); } public void setJMSPriority(int priority) throws JMSException { msg.setJMSPriority(priority); } public void setJMSRedelivered(boolean redelivered) throws JMSException { msg.setJMSRedelivered(redelivered); } public void setJMSTimestamp(long timestamp) throws JMSException { msg.setJMSTimestamp(timestamp); } public void setJMSType(String type) throws JMSException { msg.setJMSType(type); } } }