1 package ejava.examples.jms10.jmsmechanics;
2
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
18
19
20
21
22
23 public class MessageCatcher implements Runnable {
24 private static final Logger logger = LoggerFactory.getLogger(MessageCatcher.class);
25 protected ConnectionFactory connFactory;
26 protected String user;
27 protected String password;
28 protected Session sharedSession;
29 protected Destination destination;
30 protected int ackMode = Session.AUTO_ACKNOWLEDGE;
31 protected boolean stop = false;
32 protected boolean stopped = false;
33 protected boolean started = false;
34 protected List<Message> messages = new ArrayList<Message>();
35 protected String name;
36
37 public MessageCatcher(String name) {
38 this.name = name;
39 }
40 public String getName() { return name; }
41 public void setConnFactory(ConnectionFactory connFactory) {
42 this.connFactory = connFactory;
43 }
44 public void setUser(String user) {
45 this.user = user;
46 }
47 public void setPassword(String password) {
48 this.password = password;
49 }
50 public MessageCatcher setSession(Session session) {
51 this.sharedSession = session;
52 return this;
53 }
54 public void setDestination(Destination destination) {
55 this.destination = destination;
56 }
57 public MessageCatcher setAckMode(int ackMode) {
58 this.ackMode = ackMode;
59 return this;
60 }
61 public int getCount() {
62 return messages.size();
63 }
64 public void clearMessages() {
65 messages.clear();
66 }
67 public List<Message> getMessages() {
68 return messages;
69 }
70 public void stop() {
71 this.stop = true;
72 }
73 public boolean isStopped() {
74 return stopped;
75 }
76 public boolean isStarted() {
77 return started;
78 }
79
80 protected Connection getConnection() throws JMSException {
81 return user==null ?
82 connFactory.createConnection() :
83 connFactory.createConnection(user, password);
84 }
85
86 public void execute() throws JMSException {
87 Connection connection = null;
88 Session session = this.sharedSession;
89 MessageConsumer consumer = null;
90 try {
91 if (session == null) {
92 connection = getConnection();
93 session = connection.createSession(false, ackMode);
94 }
95 consumer = session.createConsumer(destination);
96 if (this.sharedSession == null) {
97 connection.start();
98 }
99 stopped = stop = false;
100 logger.info("catcher {} starting (ackMode={})", name, ackMode);
101 started = true;
102 for (int i=0;!stop; i++) {
103 if (i%30==0) { logger.debug("catcher looking for message"); }
104 Message message = consumer.receive(100);
105 if (message != null && !message.getJMSRedelivered()) {
106 messages.add(message);
107 logger.debug("{} received message #{}, msgId={}", name, messages.size(), message.getJMSMessageID());
108 Thread.yield();
109 }
110 }
111 logger.info("catcher {} stopping (ackMode={})", name, ackMode);
112 if (ackMode == Session.CLIENT_ACKNOWLEDGE && messages.size() > 0) {
113 logger.debug("catcher {} acknowledging messages", name);
114 messages.get(messages.size()-1).acknowledge();
115 }
116 if (this.sharedSession == null) {
117 connection.stop();
118 }
119 }
120 finally {
121 stopped = true;
122
123 if (consumer != null) { consumer.close(); }
124 if (this.sharedSession == null && session!=null){ session.close();}
125 if (connection != null) { connection.close(); }
126 }
127 }
128
129 public void run() {
130 try {
131 execute();
132 }
133 catch (Exception ex) {
134 logger.error("error running " + name, ex);
135 }
136 }
137
138 public static void main(String args[]) {
139 try {
140 String connFactoryJNDI=null;
141 String destinationJNDI=null;
142 String name="";
143 for (int i=0; i<args.length; i++) {
144 if ("-jndi.name.connFactory".equals(args[i])) {
145 connFactoryJNDI = args[++i];
146 }
147 else if ("-jndi.name.destination".equals(args[i])) {
148 destinationJNDI=args[++i];
149 }
150 else if ("-name".equals(args[i])) {
151 name=args[++i];
152 }
153 }
154 if (connFactoryJNDI==null) {
155 throw new Exception("jndi.name.connFactory not supplied");
156 }
157 else if (destinationJNDI==null) {
158 throw new Exception("jndi.name.destination not supplied");
159 }
160 MessageCatcher catcher = new MessageCatcher(name);
161 Context jndi = new InitialContext();
162 catcher.setConnFactory(
163 (ConnectionFactory)jndi.lookup(connFactoryJNDI));
164 catcher.setDestination((Destination)jndi.lookup(destinationJNDI));
165 catcher.execute();
166 }
167 catch (Exception ex) {
168 logger.error("",ex);
169 System.exit(-1);
170 }
171 }
172 }