/* * Copyright 2001, 2002,2004 The Apache Software Foundation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.axis.transport.jms; import org.apache.axis.components.jms.JMSVendorAdapter; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueReceiver; import javax.jms.QueueSender; import javax.jms.QueueSession; import javax.jms.Session; import javax.jms.TemporaryQueue; /** * QueueConnector is a concrete JMSConnector subclass that specifically handles * connections to queues (ptp domain). * * @author Jaime Meritt (jmeritt@sonicsoftware.com) * @author Richard Chung (rchung@sonicsoftware.com) * @author Dave Chappell (chappell@sonicsoftware.com) */ public class QueueConnector extends JMSConnector { public QueueConnector(ConnectionFactory factory, int numRetries, int numSessions, long connectRetryInterval, long interactRetryInterval, long timeoutTime, boolean allowReceive, String clientID, String username, String password, JMSVendorAdapter adapter, JMSURLHelper jmsurl) throws JMSException { super(factory, numRetries, numSessions, connectRetryInterval, interactRetryInterval, timeoutTime, allowReceive, clientID, username, password, adapter, jmsurl); } public JMSEndpoint createEndpoint(String destination) { return new QueueEndpoint(destination); } /** * Create an endpoint for a queue destination. * * @param destination * @return * @throws JMSException */ public JMSEndpoint createEndpoint(Destination destination) throws JMSException { if(!(destination instanceof Queue)) throw new IllegalArgumentException("The input must be a queue for this connector"); return new QueueDestinationEndpoint((Queue)destination); } protected Connection internalConnect(ConnectionFactory connectionFactory, String username, String password) throws JMSException { QueueConnectionFactory qcf = (QueueConnectionFactory)connectionFactory; if(username == null) return qcf.createQueueConnection(); return qcf.createQueueConnection(username, password); } protected SyncConnection createSyncConnection(ConnectionFactory factory, Connection connection, int numSessions, String threadName, String clientID, String username, String password) throws JMSException { return new QueueSyncConnection((QueueConnectionFactory)factory, (QueueConnection)connection, numSessions, threadName, clientID, username, password); } private QueueSession createQueueSession(QueueConnection connection, int ackMode) throws JMSException { return connection.createQueueSession(false, ackMode); } private Queue createQueue(QueueSession session, String subject) throws Exception { return m_adapter.getQueue(session, subject); } private QueueReceiver createReceiver(QueueSession session, Queue queue, String messageSelector) throws JMSException { return session.createReceiver(queue, messageSelector); } private final class QueueSyncConnection extends SyncConnection { QueueSyncConnection(QueueConnectionFactory connectionFactory, QueueConnection connection, int numSessions, String threadName, String clientID, String username, String password) throws JMSException { super(connectionFactory, connection, numSessions, threadName, clientID, username, password); } protected SendSession createSendSession(javax.jms.Connection connection) throws JMSException { QueueSession session = createQueueSession((QueueConnection)connection, JMSConstants.DEFAULT_ACKNOWLEDGE_MODE); QueueSender sender = session.createSender(null); return new QueueSendSession(session, sender); } private final class QueueSendSession extends SendSession { QueueSendSession(QueueSession session, QueueSender sender) throws JMSException { super(session, sender); } protected MessageConsumer createConsumer(Destination destination) throws JMSException { return createReceiver((QueueSession)m_session, (Queue)destination, null); } protected Destination createTemporaryDestination() throws JMSException { return ((QueueSession)m_session).createTemporaryQueue(); } protected void deleteTemporaryDestination(Destination destination) throws JMSException { ((TemporaryQueue)destination).delete(); } protected void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { ((QueueSender)m_producer).send((Queue)destination, message, deliveryMode, priority, timeToLive); } } } private class QueueEndpoint extends JMSEndpoint { String m_queueName; QueueEndpoint(String queueName) { super(QueueConnector.this); m_queueName = queueName; } Destination getDestination(Session session) throws Exception { return createQueue((QueueSession)session, m_queueName); } public String toString() { StringBuffer buffer = new StringBuffer("QueueEndpoint:"); buffer.append(m_queueName); return buffer.toString(); } public boolean equals(Object object) { if(!super.equals(object)) return false; if(!(object instanceof QueueEndpoint)) return false; return m_queueName.equals(((QueueEndpoint)object).m_queueName); } } private final class QueueDestinationEndpoint extends QueueEndpoint { Queue m_queue; QueueDestinationEndpoint(Queue queue) throws JMSException { super(queue.getQueueName()); m_queue = queue; } Destination getDestination(Session session) { return m_queue; } } protected AsyncConnection createAsyncConnection(ConnectionFactory factory, Connection connection, String threadName, String clientID, String username, String password) throws JMSException { return new QueueAsyncConnection((QueueConnectionFactory)factory, (QueueConnection)connection, threadName, clientID, username, password); } private final class QueueAsyncConnection extends AsyncConnection { QueueAsyncConnection(QueueConnectionFactory connectionFactory, QueueConnection connection, String threadName, String clientID, String username, String password) throws JMSException { super(connectionFactory, connection, threadName, clientID, username, password); } protected ListenerSession createListenerSession(javax.jms.Connection connection, Subscription subscription) throws Exception { QueueSession session = createQueueSession((QueueConnection)connection, subscription.m_ackMode); QueueReceiver receiver = createReceiver(session, (Queue)subscription.m_endpoint.getDestination(session), subscription.m_messageSelector); return new ListenerSession(session, receiver, subscription); } } }