1 package ejava.examples.jmsmechanics;
2
3 import java.util.ArrayList;
4 import java.util.List;
5
6 import javax.jms.Connection;
7 import javax.jms.ConnectionFactory;
8 import javax.jms.Destination;
9 import javax.jms.JMSException;
10 import javax.jms.Message;
11 import javax.jms.MessageConsumer;
12 import javax.jms.Session;
13 import javax.naming.Context;
14 import javax.naming.InitialContext;
15
16 import org.apache.commons.logging.Log;
17 import org.apache.commons.logging.LogFactory;
18
19
20
21
22
23
24
25 public class MessageCatcher implements Runnable {
26 private static final Log log = LogFactory.getLog(MessageCatcher.class);
27 protected ConnectionFactory connFactory;
28 protected String user;
29 protected String password;
30 protected Session sharedSession;
31 protected Destination destination;
32 protected int ackMode = Session.AUTO_ACKNOWLEDGE;
33 protected boolean stop = false;
34 protected boolean stopped = false;
35 protected boolean started = false;
36 protected List<Message> messages = new ArrayList<Message>();
37 protected String name;
38
39 public MessageCatcher(String name) {
40 this.name = name;
41 }
42 public String getName() { return name; }
43 public void setConnFactory(ConnectionFactory connFactory) {
44 this.connFactory = connFactory;
45 }
46 public void setUser(String user) {
47 this.user = user;
48 }
49 public void setPassword(String password) {
50 this.password = password;
51 }
52 public MessageCatcher setSession(Session session) {
53 this.sharedSession = session;
54 return this;
55 }
56 public void setDestination(Destination destination) {
57 this.destination = destination;
58 }
59 public MessageCatcher setAckMode(int ackMode) {
60 this.ackMode = ackMode;
61 return this;
62 }
63 public int getCount() {
64 return messages.size();
65 }
66 public void clearMessages() {
67 messages.clear();
68 }
69 public List<Message> getMessages() {
70 return messages;
71 }
72 public void stop() {
73 this.stop = true;
74 }
75 public boolean isStopped() {
76 return stopped;
77 }
78 public boolean isStarted() {
79 return started;
80 }
81
82 protected Connection getConnection() throws JMSException {
83 return user==null ?
84 connFactory.createConnection() :
85 connFactory.createConnection(user, password);
86 }
87
88 public void execute() throws JMSException {
89 Connection connection = null;
90 Session session = this.sharedSession;
91 MessageConsumer consumer = null;
92 try {
93 if (session == null) {
94 connection = getConnection();
95 session = connection.createSession(false, ackMode);
96 }
97 consumer = session.createConsumer(destination);
98 if (this.sharedSession == null) {
99 connection.start();
100 }
101 stopped = stop = false;
102 log.info("catcher " + name + " starting (ackMode=" + ackMode + ")");
103 started = true;
104 while (!stop) {
105 log.debug("catcher looking for message");
106 Message message = consumer.receive(3000);
107 if (message != null) {
108 messages.add(message);
109 log.debug(name + " received message #" + messages.size() +
110 ", msgId=" + message.getJMSMessageID());
111 Thread.yield();
112 }
113 }
114 log.info("catcher " + name + " stopping (ackMode=" + ackMode + ")");
115 if (ackMode == Session.CLIENT_ACKNOWLEDGE && messages.size() > 0) {
116 log.debug("catcher " + name + " acknowledging messages");
117 messages.get(messages.size()-1).acknowledge();
118 }
119 if (this.sharedSession == null) {
120 connection.stop();
121 }
122 }
123 finally {
124 stopped = true;
125
126 if (consumer != null) { consumer.close(); }
127 if (this.sharedSession == null && session!=null){ session.close();}
128 if (connection != null) { connection.close(); }
129 }
130 }
131
132 public void run() {
133 try {
134 execute();
135 }
136 catch (Exception ex) {
137 log.fatal("error running " + name, ex);
138 }
139 }
140
141 public static void main(String args[]) {
142 try {
143 String connFactoryJNDI=null;
144 String destinationJNDI=null;
145 String name="";
146 for (int i=0; i<args.length; i++) {
147 if ("-jndi.name.connFactory".equals(args[i])) {
148 connFactoryJNDI = args[++i];
149 }
150 else if ("-jndi.name.destination".equals(args[i])) {
151 destinationJNDI=args[++i];
152 }
153 else if ("-name".equals(args[i])) {
154 name=args[++i];
155 }
156 }
157 if (connFactoryJNDI==null) {
158 throw new Exception("jndi.name.connFactory not supplied");
159 }
160 else if (destinationJNDI==null) {
161 throw new Exception("jndi.name.destination not supplied");
162 }
163 MessageCatcher catcher = new MessageCatcher(name);
164 Context jndi = new InitialContext();
165 catcher.setConnFactory(
166 (ConnectionFactory)jndi.lookup(connFactoryJNDI));
167 catcher.setDestination((Destination)jndi.lookup(destinationJNDI));
168 catcher.execute();
169 }
170 catch (Exception ex) {
171 log.fatal(ex);
172 System.exit(-1);
173 }
174 }
175 }