/* * JBoss, the OpenSource J2EE webOS * * Distributable under LGPL license. * See terms of license at gnu.org. */ package org.jboss.mq.server.jmx; import java.util.Set; import javax.jms.Destination; import javax.jms.IllegalStateException; import javax.jms.JMSException; import javax.management.ObjectInstance; import javax.management.Query; import javax.management.QueryExp; import org.jboss.ha.framework.interfaces.HAPartition; import org.jboss.ha.framework.server.ClusterPartition; import org.jboss.ha.framework.server.ClusterPartitionMBean; import org.jboss.mq.MessageStatistics; import org.jboss.mq.SpyMessage; import org.jboss.mq.SpyTopic; import org.jboss.mq.server.JMSBridgedTopic; import org.jboss.mq.server.JMSDestinationManager; import org.jboss.mq.server.MessageCounter; import org.jboss.mq.server.JMSBridgedTopic.BridgedMessage; import org.jboss.mx.util.MBeanProxy; /** * Bridge messages to all nodes in a cluster in the same topic * * @author Marcus Redeker (marcus.redeker@gedoplan.de) * @author Ingo Bruell (ingo.bruell@gedoplan.de) * @version $Revision$ */ public class BridgedTopic extends DestinationMBeanSupport implements BridgedTopicMBean { protected JMSBridgedTopic destination; protected String partitionName; protected boolean deliverLocal = true; protected HAPartition partition; // ====== Attributes ====================================0 /** * Name of the cluster partition */ public String getPartitionName() { return partitionName; } public void setPartitionName(final String partitionName) { this.partitionName = partitionName; } /** * defines that the message should be locally delivered, also */ public boolean getDeliverLocal() { return deliverLocal; } public void setDeliverLocal(boolean deliverLocal) { this.deliverLocal = deliverLocal; } // ====== Management ====================================0 public void createService() throws Exception { boolean debug = log.isDebugEnabled(); if (debug) log.debug("Initializing BridgedTopic on partition: " + partitionName); partition = findHAPartitionWithName(partitionName); } public void startService(HAPartition haPartition) throws Exception { this.partition = haPartition; this.startService(); } public void startService() throws Exception { if (destinationName == null || destinationName.length() == 0) { throw new IllegalStateException("TopicName was not set"); } JMSDestinationManager jmsServer = (JMSDestinationManager) server .getAttribute(jbossMQService, "Interceptor"); // create the topic spyDest = new SpyTopic(destinationName); // create the distributed destination destination = new JMSBridgedTopic((SpyTopic) spyDest, null, jmsServer, parameters, partition, "BridgedTopic", deliverLocal); jmsServer.addDestination(destination); // set JNDI name for this MBean if (jndiName == null) { setJNDIName("topic/" + destinationName); } else { // in config phase, we only stored the name, and didn't actually bind it setJNDIName(jndiName); } partition.registerRPCHandler("BridgedTopic", this); super.startService(); } // ====== Services ====================================0 /** * forward message to local topic * @param mes * @throws JMSException */ public synchronized void giveMessageToLocalTopic(SpyMessage mes) throws JMSException { log.trace("transfered message: " + mes.getJMSMessageID()); try { Destination oLocalDestination = new SpyTopic(destinationName); mes.setJMSDestination(oLocalDestination); JMSDestinationManager jmsServer = (JMSDestinationManager) server .getAttribute(jbossMQService, "Interceptor"); SpyMessage oMsg = new BridgedMessage(mes); jmsServer.addMessage(null, oMsg); } catch (Exception e) { JMSException jmse = new JMSException(e.getMessage()); jmse.setLinkedException(e); throw jmse; } } /** * detect given partition * * @param name * @return * @throws Exception */ protected HAPartition findHAPartitionWithName(String name) throws Exception { HAPartition result = null; QueryExp exp = Query.and(Query.eq(Query.classattr(), Query .value(ClusterPartition.class.getName())), Query.match(Query .attr("PartitionName"), Query.value(name))); Set mbeans = this.getServer().queryMBeans(null, exp); if (mbeans != null && mbeans.size() > 0) { ObjectInstance inst = (ObjectInstance) (mbeans.iterator().next()); ClusterPartitionMBean cp = (ClusterPartitionMBean) MBeanProxy.get( ClusterPartitionMBean.class, inst.getObjectName(), this .getServer()); result = cp.getHAPartition(); } return result; } /** * removes all messages from destination */ public void removeAllMessages() throws Exception { destination.removeAllMessages(); } /** * get message count */ public MessageCounter[] getMessageCounter() { return destination.getMessageCounter(); } /** * get statistics */ public MessageStatistics[] getMessageStatistics() throws Exception { // TODO Auto-generated method stub return null; } }