View Javadoc
1   package ejava.examples.jms20.jmsmechanics;
2   
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import javax.jms.Destination;
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
15  
16  /**
17   * This is a support class uses to receive messages by test cases that
18   * are sending messages either to a queue or a topic.
19   */
20  public class MessageCatcher implements Runnable {
21      private static final Logger logger = LoggerFactory.getLogger(MessageCatcher.class);
22      private String name;
23      private JMSContext parentContext;
24      private Destination destination;
25      private int ackMode = Session.AUTO_ACKNOWLEDGE;
26      private boolean stop;
27      private boolean stopped;
28      private boolean started;
29      private List<Message> messages = new ArrayList<Message>();
30          
31      public MessageCatcher(String name) {
32          this.name = name;
33      }
34      public String getName() { return name; }
35      public MessageCatcher setContext(JMSContext context) {
36          this.parentContext = context;
37          return this;
38      }    
39      public void setDestination(Destination destination) {
40          this.destination = destination;
41      }    
42      public MessageCatcher setAckMode(int ackMode) {
43          this.ackMode = ackMode;
44          return this;
45      }
46      public int getCount() {
47          return messages.size();
48      }
49      public void clearMessages() {
50          messages.clear();
51      }
52      public List<Message> getMessages() {
53          return messages;
54      }
55      public void stop() {
56          this.stop = true;
57      }
58      public boolean isStopped() {
59          return stopped;
60      }
61      public boolean isStarted() {
62          return started;
63      }
64      
65      public void execute() throws JMSException {
66          try (JMSContext context = parentContext.createContext(ackMode)) {
67              try (JMSConsumer consumer = context.createConsumer(destination)) {
68                  context.start();
69                  stopped = stop = false;
70                  logger.info("catcher {} starting (ackMode={})", name, ackMode);
71                  started = true;
72                  for (int i=0;!stop; i++) {
73                      if (i%30==0) { logger.debug("catcher {} looking for message", name); }
74                      Message message = consumer.receive(100);
75                      if (message != null) {
76                          messages.add(message);
77                          logger.debug("{} received message #{}, msgId={}", name, messages.size(), message.getJMSMessageID());
78                          if (!stop) { Thread.yield(); }
79                      }      
80                  }
81              }
82              
83              logger.info("catcher {} stopping (ackMode={})", name, ackMode);
84              if (ackMode == JMSContext.CLIENT_ACKNOWLEDGE
85   && messages.size() > 0) {
86                  logger.debug("catcher {} acknowledging messages", name);
87                  messages.get(messages.size()-1).acknowledge();
88              }
89              context.stop();
90          }
91          finally {
92              stopped = true;
93          }
94      }
95      
96      public void run() {
97          try {
98              execute();
99          }
100         catch (Exception ex) {
101             logger.error("error running " + name, ex);
102             throw new RuntimeException("error running " + name, ex);
103         }
104     }
105 }