View Javadoc
1   package ejava.examples.jms10.jmsmechanics;
2   
3   import java.util.ArrayList;
4   import java.util.List;
5   
6   import javax.jms.Connection;
7   import javax.jms.ConnectionFactory;
8   import javax.jms.Destination;
9   import javax.jms.JMSException;
10  import javax.jms.Message;
11  import javax.jms.MessageConsumer;
12  import javax.jms.Session;
13  import javax.naming.Context;
14  import javax.naming.InitialContext;
15  
16  import org.slf4j.Logger;
17  import org.slf4j.LoggerFactory;
18  
19  /**
20   * This is a support class uses to receive messages by test cases that
21   * are sending messages either to a queue or a topic.
22   */
23  public class MessageCatcher implements Runnable {
24      private static final Logger logger = LoggerFactory.getLogger(MessageCatcher.class);
25      protected ConnectionFactory connFactory;
26      protected String user;
27      protected String password;
28      protected Session sharedSession;
29      protected Destination destination;
30      protected int ackMode = Session.AUTO_ACKNOWLEDGE;
31      protected boolean stop = false;
32      protected boolean stopped = false;
33      protected boolean started = false;
34      protected List<Message> messages = new ArrayList<Message>();
35      protected String name;
36          
37      public MessageCatcher(String name) {
38          this.name = name;
39      }
40      public String getName() { return name; }
41      public void setConnFactory(ConnectionFactory connFactory) {
42          this.connFactory = connFactory;
43      }
44  	public void setUser(String user) {
45  		this.user = user;
46  	}
47  	public void setPassword(String password) {
48  		this.password = password;
49  	}
50      public MessageCatcher setSession(Session session) {
51          this.sharedSession = session;
52          return this;
53      }
54      public void setDestination(Destination destination) {
55          this.destination = destination;
56      }    
57      public MessageCatcher setAckMode(int ackMode) {
58          this.ackMode = ackMode;
59          return this;
60      }
61      public int getCount() {
62          return messages.size();
63      }
64      public void clearMessages() {
65          messages.clear();
66      }
67      public List<Message> getMessages() {
68          return messages;
69      }
70      public void stop() {
71          this.stop = true;
72      }
73      public boolean isStopped() {
74          return stopped;
75      }
76      public boolean isStarted() {
77          return started;
78      }
79      
80      protected Connection getConnection() throws JMSException {
81      	return user==null ? 
82  			connFactory.createConnection() : 
83  			connFactory.createConnection(user, password);
84      }
85      
86      public void execute() throws JMSException {
87          Connection connection = null;
88          Session session = this.sharedSession;
89          MessageConsumer consumer = null;
90          try {
91              if (session == null) {
92                  connection = getConnection();
93                  session = connection.createSession(false, ackMode);
94              }
95              consumer = session.createConsumer(destination);
96              if (this.sharedSession == null) {
97                  connection.start();
98              }
99              stopped = stop = false;
100             logger.info("catcher {} starting (ackMode={})", name, ackMode);
101             started = true;
102             for (int i=0;!stop; i++) {
103                 if (i%30==0) { logger.debug("catcher looking for message"); }
104                 Message message = consumer.receive(100);
105                 if (message != null && !message.getJMSRedelivered()) {
106                     messages.add(message);
107                     logger.debug("{} received message #{}, msgId={}", name, messages.size(), message.getJMSMessageID());
108                     Thread.yield();
109                 }      
110             }
111             logger.info("catcher {} stopping (ackMode={})", name, ackMode);
112             if (ackMode == Session.CLIENT_ACKNOWLEDGE && messages.size() > 0) {
113                 logger.debug("catcher {} acknowledging messages", name);
114                 messages.get(messages.size()-1).acknowledge();
115             }
116             if (this.sharedSession == null) {
117                 connection.stop();
118             }
119         }
120         finally {
121             stopped = true;
122             //started = false;
123             if (consumer != null)   { consumer.close(); }
124             if (this.sharedSession == null && session!=null){ session.close();}
125             if (connection != null) { connection.close(); }
126         }
127     }
128     
129     public void run() {
130         try {
131             execute();
132         }
133         catch (Exception ex) {
134             logger.error("error running " + name, ex);
135         }
136     }    
137 
138     public static void main(String args[]) {
139         try {
140             String connFactoryJNDI=null;
141             String destinationJNDI=null;
142             String name="";
143             for (int i=0; i<args.length; i++) {
144                 if ("-jndi.name.connFactory".equals(args[i])) {
145                     connFactoryJNDI = args[++i];
146                 }
147                 else if ("-jndi.name.destination".equals(args[i])) {
148                     destinationJNDI=args[++i];
149                 }
150                 else if ("-name".equals(args[i])) {
151                     name=args[++i];
152                 }
153             }
154             if (connFactoryJNDI==null) { 
155                 throw new Exception("jndi.name.connFactory not supplied");
156             }
157             else if (destinationJNDI==null) {
158                 throw new Exception("jndi.name.destination not supplied");
159             }            
160             MessageCatcher catcher = new MessageCatcher(name);
161             Context jndi = new InitialContext();
162             catcher.setConnFactory(
163                     (ConnectionFactory)jndi.lookup(connFactoryJNDI));
164             catcher.setDestination((Destination)jndi.lookup(destinationJNDI));
165             catcher.execute();
166         }
167         catch (Exception ex) {
168             logger.error("",ex);
169             System.exit(-1);            
170         }
171     }
172 }