View Javadoc
1   package ejava.examples.jmsmechanics;
2   
3   import java.util.ArrayList;
4   import java.util.List;
5   
6   import javax.jms.Connection;
7   import javax.jms.ConnectionFactory;
8   import javax.jms.Destination;
9   import javax.jms.JMSException;
10  import javax.jms.Message;
11  import javax.jms.MessageConsumer;
12  import javax.jms.Session;
13  import javax.naming.Context;
14  import javax.naming.InitialContext;
15  
16  import org.apache.commons.logging.Log;
17  import org.apache.commons.logging.LogFactory;
18  
19  /**
20   * This is a support class uses to receive messages by test cases that
21   * are sending messages either to a queue or a topic.
22   *
23   * @author jcstaff
24   */
25  public class MessageCatcher implements Runnable {
26      private static final Log log = LogFactory.getLog(MessageCatcher.class);
27      protected ConnectionFactory connFactory;
28      protected String user;
29      protected String password;
30      protected Session sharedSession;
31      protected Destination destination;
32      protected int ackMode = Session.AUTO_ACKNOWLEDGE;
33      protected boolean stop = false;
34      protected boolean stopped = false;
35      protected boolean started = false;
36      protected List<Message> messages = new ArrayList<Message>();
37      protected String name;
38          
39      public MessageCatcher(String name) {
40          this.name = name;
41      }
42      public String getName() { return name; }
43      public void setConnFactory(ConnectionFactory connFactory) {
44          this.connFactory = connFactory;
45      }
46  	public void setUser(String user) {
47  		this.user = user;
48  	}
49  	public void setPassword(String password) {
50  		this.password = password;
51  	}
52      public MessageCatcher setSession(Session session) {
53          this.sharedSession = session;
54          return this;
55      }
56      public void setDestination(Destination destination) {
57          this.destination = destination;
58      }    
59      public MessageCatcher setAckMode(int ackMode) {
60          this.ackMode = ackMode;
61          return this;
62      }
63      public int getCount() {
64          return messages.size();
65      }
66      public void clearMessages() {
67          messages.clear();
68      }
69      public List<Message> getMessages() {
70          return messages;
71      }
72      public void stop() {
73          this.stop = true;
74      }
75      public boolean isStopped() {
76          return stopped;
77      }
78      public boolean isStarted() {
79          return started;
80      }
81      
82      protected Connection getConnection() throws JMSException {
83      	return user==null ? 
84  			connFactory.createConnection() : 
85  			connFactory.createConnection(user, password);
86      }
87      
88      public void execute() throws JMSException {
89          Connection connection = null;
90          Session session = this.sharedSession;
91          MessageConsumer consumer = null;
92          try {
93              if (session == null) {
94                  connection = getConnection();
95                  session = connection.createSession(false, ackMode);
96              }
97              consumer = session.createConsumer(destination);
98              if (this.sharedSession == null) {
99                  connection.start();
100             }
101             stopped = stop = false;
102             log.info("catcher " + name + " starting (ackMode=" + ackMode + ")");
103             started = true;
104             while (!stop) {
105                 log.debug("catcher looking for message");
106                 Message message = consumer.receive(3000);
107                 if (message != null) {
108                     messages.add(message);
109                     log.debug(name + " received message #" + messages.size() +
110                             ", msgId=" + message.getJMSMessageID());
111                     Thread.yield();
112                 }      
113             }
114             log.info("catcher " + name + " stopping (ackMode=" + ackMode + ")");
115             if (ackMode == Session.CLIENT_ACKNOWLEDGE && messages.size() > 0) {
116                 log.debug("catcher " + name + " acknowledging messages");
117                 messages.get(messages.size()-1).acknowledge();
118             }
119             if (this.sharedSession == null) {
120                 connection.stop();
121             }
122         }
123         finally {
124             stopped = true;
125             //started = false;
126             if (consumer != null)   { consumer.close(); }
127             if (this.sharedSession == null && session!=null){ session.close();}
128             if (connection != null) { connection.close(); }
129         }
130     }
131     
132     public void run() {
133         try {
134             execute();
135         }
136         catch (Exception ex) {
137             log.fatal("error running " + name, ex);
138         }
139     }    
140 
141     public static void main(String args[]) {
142         try {
143             String connFactoryJNDI=null;
144             String destinationJNDI=null;
145             String name="";
146             for (int i=0; i<args.length; i++) {
147                 if ("-jndi.name.connFactory".equals(args[i])) {
148                     connFactoryJNDI = args[++i];
149                 }
150                 else if ("-jndi.name.destination".equals(args[i])) {
151                     destinationJNDI=args[++i];
152                 }
153                 else if ("-name".equals(args[i])) {
154                     name=args[++i];
155                 }
156             }
157             if (connFactoryJNDI==null) { 
158                 throw new Exception("jndi.name.connFactory not supplied");
159             }
160             else if (destinationJNDI==null) {
161                 throw new Exception("jndi.name.destination not supplied");
162             }            
163             MessageCatcher catcher = new MessageCatcher(name);
164             Context jndi = new InitialContext();
165             catcher.setConnFactory(
166                     (ConnectionFactory)jndi.lookup(connFactoryJNDI));
167             catcher.setDestination((Destination)jndi.lookup(destinationJNDI));
168             catcher.execute();
169         }
170         catch (Exception ex) {
171             log.fatal(ex);
172             System.exit(-1);            
173         }
174     }
175 }