/* * Copyright 2001, 2002,2004 The Apache Software Foundation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.axis.transport.jms; import org.apache.axis.components.jms.JMSVendorAdapter; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TemporaryTopic; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import java.util.HashMap; /** * TopicConnector is a concrete JMSConnector subclass that specifically handles * connections to topics (pub-sub domain). * * @author Jaime Meritt (jmeritt@sonicsoftware.com) * @author Richard Chung (rchung@sonicsoftware.com) * @author Dave Chappell (chappell@sonicsoftware.com) */ public class TopicConnector extends JMSConnector { public TopicConnector(TopicConnectionFactory factory, int numRetries, int numSessions, long connectRetryInterval, long interactRetryInterval, long timeoutTime, boolean allowReceive, String clientID, String username, String password, JMSVendorAdapter adapter, JMSURLHelper jmsurl) throws JMSException { super(factory, numRetries, numSessions, connectRetryInterval, interactRetryInterval, timeoutTime, allowReceive, clientID, username, password, adapter, jmsurl); } protected Connection internalConnect(ConnectionFactory connectionFactory, String username, String password) throws JMSException { TopicConnectionFactory tcf = (TopicConnectionFactory)connectionFactory; if(username == null) return tcf.createTopicConnection(); return tcf.createTopicConnection(username, password); } protected SyncConnection createSyncConnection(ConnectionFactory factory, Connection connection, int numSessions, String threadName, String clientID, String username, String password) throws JMSException { return new TopicSyncConnection((TopicConnectionFactory)factory, (TopicConnection)connection, numSessions, threadName, clientID, username, password); } protected AsyncConnection createAsyncConnection(ConnectionFactory factory, Connection connection, String threadName, String clientID, String username, String password) throws JMSException { return new TopicAsyncConnection((TopicConnectionFactory)factory, (TopicConnection)connection, threadName, clientID, username, password); } public JMSEndpoint createEndpoint(String destination) { return new TopicEndpoint(destination); } /** * Create an endpoint for a queue destination. * * @param destination * @return * @throws JMSException */ public JMSEndpoint createEndpoint(Destination destination) throws JMSException { if(!(destination instanceof Topic)) throw new IllegalArgumentException("The input be a topic for this connector"); return new TopicDestinationEndpoint((Topic)destination); } private TopicSession createTopicSession(TopicConnection connection, int ackMode) throws JMSException { return connection.createTopicSession(false, ackMode); } private Topic createTopic(TopicSession session, String subject) throws Exception { return m_adapter.getTopic(session, subject); } private TopicSubscriber createSubscriber(TopicSession session, TopicSubscription subscription) throws Exception { if(subscription.isDurable()) return createDurableSubscriber(session, (Topic)subscription.m_endpoint.getDestination(session), subscription.m_subscriptionName, subscription.m_messageSelector, subscription.m_noLocal); else return createSubscriber(session, (Topic)subscription.m_endpoint.getDestination(session), subscription.m_messageSelector, subscription.m_noLocal); } private TopicSubscriber createDurableSubscriber(TopicSession session, Topic topic, String subscriptionName, String messageSelector, boolean noLocal) throws JMSException { return session.createDurableSubscriber(topic, subscriptionName, messageSelector, noLocal); } private TopicSubscriber createSubscriber(TopicSession session, Topic topic, String messageSelector, boolean noLocal) throws JMSException { return session.createSubscriber(topic, messageSelector, noLocal); } private final class TopicAsyncConnection extends AsyncConnection { TopicAsyncConnection(TopicConnectionFactory connectionFactory, TopicConnection connection, String threadName, String clientID, String username, String password) throws JMSException { super(connectionFactory, connection, threadName, clientID, username, password); } protected ListenerSession createListenerSession(javax.jms.Connection connection, Subscription subscription) throws Exception { TopicSession session = createTopicSession((TopicConnection)connection, subscription.m_ackMode); TopicSubscriber subscriber = createSubscriber(session, (TopicSubscription)subscription); return new TopicListenerSession(session, subscriber, (TopicSubscription)subscription); } private final class TopicListenerSession extends ListenerSession { TopicListenerSession(TopicSession session, TopicSubscriber subscriber, TopicSubscription subscription) throws Exception { super(session, subscriber, subscription); } void cleanup() { try{m_consumer.close();}catch(Exception ignore){} try { TopicSubscription sub = (TopicSubscription)m_subscription; if(sub.isDurable() && sub.m_unsubscribe) { ((TopicSession)m_session).unsubscribe(sub.m_subscriptionName); } } catch(Exception ignore){} try{m_session.close();}catch(Exception ignore){} } } } private final class TopicSyncConnection extends SyncConnection { TopicSyncConnection(TopicConnectionFactory connectionFactory, TopicConnection connection, int numSessions, String threadName, String clientID, String username, String password) throws JMSException { super(connectionFactory, connection, numSessions, threadName, clientID, username, password); } protected SendSession createSendSession(javax.jms.Connection connection) throws JMSException { TopicSession session = createTopicSession((TopicConnection)connection, JMSConstants.DEFAULT_ACKNOWLEDGE_MODE); TopicPublisher publisher = session.createPublisher(null); return new TopicSendSession(session, publisher); } private final class TopicSendSession extends SendSession { TopicSendSession(TopicSession session, TopicPublisher publisher) throws JMSException { super(session, publisher); } protected MessageConsumer createConsumer(Destination destination) throws JMSException { return createSubscriber((TopicSession)m_session, (Topic)destination, null, JMSConstants.DEFAULT_NO_LOCAL); } protected void deleteTemporaryDestination(Destination destination) throws JMSException { ((TemporaryTopic)destination).delete(); } protected Destination createTemporaryDestination() throws JMSException { return ((TopicSession)m_session).createTemporaryTopic(); } protected void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { ((TopicPublisher)m_producer).publish((Topic)destination, message, deliveryMode, priority, timeToLive); } } } private class TopicEndpoint extends JMSEndpoint { String m_topicName; TopicEndpoint(String topicName) { super(TopicConnector.this); m_topicName = topicName; } Destination getDestination(Session session) throws Exception { return createTopic((TopicSession)session, m_topicName); } protected Subscription createSubscription(MessageListener listener, HashMap properties) { return new TopicSubscription(listener, this, properties); } public String toString() { StringBuffer buffer = new StringBuffer("TopicEndpoint:"); buffer.append(m_topicName); return buffer.toString(); } public boolean equals(Object object) { if(!super.equals(object)) return false; if(!(object instanceof TopicEndpoint)) return false; return m_topicName.equals(((TopicEndpoint)object).m_topicName); } } private final class TopicSubscription extends Subscription { String m_subscriptionName; boolean m_unsubscribe; boolean m_noLocal; TopicSubscription(MessageListener listener, JMSEndpoint endpoint, HashMap properties) { super(listener, endpoint, properties); m_subscriptionName = MapUtils.removeStringProperty(properties, JMSConstants.SUBSCRIPTION_NAME, null); m_unsubscribe = MapUtils.removeBooleanProperty(properties, JMSConstants.UNSUBSCRIBE, JMSConstants.DEFAULT_UNSUBSCRIBE); m_noLocal = MapUtils.removeBooleanProperty(properties, JMSConstants.NO_LOCAL, JMSConstants.DEFAULT_NO_LOCAL); } boolean isDurable() { return m_subscriptionName != null; } public boolean equals(Object obj) { if(!super.equals(obj)) return false; if(!(obj instanceof TopicSubscription)) return false; TopicSubscription other = (TopicSubscription)obj; if(other.m_unsubscribe != m_unsubscribe || other.m_noLocal != m_noLocal) return false; if(isDurable()) { return other.isDurable() && other.m_subscriptionName.equals(m_subscriptionName); } else if(other.isDurable()) return false; else return true; } public String toString() { StringBuffer buffer = new StringBuffer(super.toString()); buffer.append(":").append(m_noLocal).append(":").append(m_unsubscribe); if(isDurable()) { buffer.append(":"); buffer.append(m_subscriptionName); } return buffer.toString(); } } private final class TopicDestinationEndpoint extends TopicEndpoint { Topic m_topic; TopicDestinationEndpoint(Topic topic) throws JMSException { super(topic.getTopicName()); m_topic = topic; } Destination getDestination(Session session) { return m_topic; } } }