1 package ejava.examples.jms20.jmsmechanics;
2
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import javax.jms.Destination;
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
15
16
17
18
19
20 public class MessageCatcher implements Runnable {
21 private static final Logger logger = LoggerFactory.getLogger(MessageCatcher.class);
22 private String name;
23 private JMSContext parentContext;
24 private Destination destination;
25 private int ackMode = Session.AUTO_ACKNOWLEDGE;
26 private boolean stop;
27 private boolean stopped;
28 private boolean started;
29 private List<Message> messages = new ArrayList<Message>();
30
31 public MessageCatcher(String name) {
32 this.name = name;
33 }
34 public String getName() { return name; }
35 public MessageCatcher setContext(JMSContext context) {
36 this.parentContext = context;
37 return this;
38 }
39 public void setDestination(Destination destination) {
40 this.destination = destination;
41 }
42 public MessageCatcher setAckMode(int ackMode) {
43 this.ackMode = ackMode;
44 return this;
45 }
46 public int getCount() {
47 return messages.size();
48 }
49 public void clearMessages() {
50 messages.clear();
51 }
52 public List<Message> getMessages() {
53 return messages;
54 }
55 public void stop() {
56 this.stop = true;
57 }
58 public boolean isStopped() {
59 return stopped;
60 }
61 public boolean isStarted() {
62 return started;
63 }
64
65 public void execute() throws JMSException {
66 try (JMSContext context = parentContext.createContext(ackMode)) {
67 try (JMSConsumer consumer = context.createConsumer(destination)) {
68 context.start();
69 stopped = stop = false;
70 logger.info("catcher {} starting (ackMode={})", name, ackMode);
71 started = true;
72 for (int i=0;!stop; i++) {
73 if (i%30==0) { logger.debug("catcher {} looking for message", name); }
74 Message message = consumer.receive(100);
75 if (message != null) {
76 messages.add(message);
77 logger.debug("{} received message #{}, msgId={}", name, messages.size(), message.getJMSMessageID());
78 if (!stop) { Thread.yield(); }
79 }
80 }
81 }
82
83 logger.info("catcher {} stopping (ackMode={})", name, ackMode);
84 if (ackMode == JMSContext.CLIENT_ACKNOWLEDGE
85 && messages.size() > 0) {
86 logger.debug("catcher {} acknowledging messages", name);
87 messages.get(messages.size()-1).acknowledge();
88 }
89 context.stop();
90 }
91 finally {
92 stopped = true;
93 }
94 }
95
96 public void run() {
97 try {
98 execute();
99 }
100 catch (Exception ex) {
101 logger.error("error running " + name, ex);
102 throw new RuntimeException("error running " + name, ex);
103 }
104 }
105 }