1 package ejava.examples.jmsscheduler;
2
3 import javax.jms.Connection;
4 import javax.jms.ConnectionFactory;
5 import javax.jms.Destination;
6 import javax.jms.JMSException;
7 import javax.jms.MapMessage;
8 import javax.jms.Message;
9 import javax.jms.MessageConsumer;
10 import javax.jms.MessageProducer;
11 import javax.jms.Session;
12 import javax.naming.Context;
13 import javax.naming.InitialContext;
14
15 import org.apache.commons.logging.Log;
16 import org.apache.commons.logging.LogFactory;
17
18
19
20
21
22
23
24
25
26
27 public class Worker implements Runnable {
28 private static final Log log = LogFactory.getLog(Worker.class);
29 protected ConnectionFactory connFactory;
30 protected Destination requestQueue;
31 protected Destination dlq;
32 protected boolean stop = false;
33 protected boolean stopped = false;
34 protected boolean started = false;
35 protected boolean noFail = false;
36 protected String name;
37 protected int count=0;
38 protected int maxCount=0;
39 protected long delay[] = {0, 0, 0, 0, 10, 10, 10, 10, 100, 100};
40 protected String username;
41 protected String password;
42
43 public Worker(String name) {
44 this.name = name;
45 }
46 public void setConnFactory(ConnectionFactory connFactory) {
47 this.connFactory = connFactory;
48 }
49 public void setRequestQueue(Destination requestQueue) {
50 this.requestQueue = requestQueue;
51 }
52 public void setDLQ(Destination dlq) {
53 this.dlq = dlq;
54 }
55 public int getCount() {
56 return count;
57 }
58 public void setMaxCount(int maxCount) {
59 this.maxCount = maxCount;
60 }
61 public void clearMessages() {
62 count = 0;
63 }
64 public void stop() {
65 this.stop = true;
66 }
67 public boolean isStopped() {
68 return stopped;
69 }
70 public boolean isStarted() {
71 return started;
72 }
73 public void setNoFail(boolean noFail) {
74 this.noFail = noFail;
75 }
76 protected Connection createConnection(ConnectionFactory connFactory)
77 throws Exception {
78 return username==null ?
79 connFactory.createConnection() :
80 connFactory.createConnection(username, password);
81 }
82 public void setUsername(String username) {
83 this.username = username;
84 }
85 public void setPassword(String password) {
86 this.password = password;
87 }
88 public void execute() throws Exception {
89 Connection connection = null;
90 Session session = null;
91 MessageConsumer consumer = null;
92 MessageProducer producer = null;
93 MessageProducer dlqProducer = null;
94 try {
95 connection = createConnection(connFactory);
96
97 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
98 consumer = session.createConsumer(requestQueue);
99 producer = session.createProducer(null);
100 dlqProducer = session.createProducer(dlq);
101 connection.start();
102
103 stopped = stop = false;
104 log.info("worker " + name + " starting");
105 started = true;
106 while (!stop && (maxCount==0 || count < maxCount)) {
107 Message message = consumer.receive(3000);
108 if (message != null) {
109 count += 1;
110 try {
111 MapMessage request = (MapMessage)message;
112 int difficulty = request.getInt("difficulty");
113 long sleepTime = delay[difficulty];
114 int requestCounter = request.getIntProperty("count");
115 StringBuilder text = new StringBuilder();
116 Destination replyTo = request.getJMSReplyTo();
117 text.append(name + " received message #" + count +
118 ", req=" + requestCounter +
119 ", replyTo=" + replyTo +
120 ", delay=" + sleepTime);
121 log.debug(text.toString());
122 Thread.sleep(sleepTime);
123 if (count < maxCount || maxCount==0 || noFail){
124 Message response = session.createMessage();
125 response.setJMSCorrelationID(
126 request.getJMSMessageID());
127 response.setStringProperty("worker", name);
128 try {
129 producer.send(replyTo, response);
130 }
131 catch (JMSException ex) {
132 log.error("error sending reply:" + ex);
133 dlqProducer.send(request);
134 }
135 finally {
136 log.debug("committing session for: " + request.getJMSMessageID());
137 session.commit();
138 }
139 }
140 }
141 catch (Exception ex) {
142 log.error("error processing request:" + ex);
143 dlqProducer.send(message);
144 log.debug("committing session");
145 session.commit();
146 }
147 Thread.yield();
148 }
149 }
150 log.info("worker " + name + " stopping");
151 connection.stop();
152 }
153 finally {
154 stopped = true;
155 started = false;
156 if (consumer != null) { consumer.close(); }
157 if (session!=null){ session.close();}
158 if (connection != null) { connection.close(); }
159 }
160 }
161
162 public void run() {
163 try {
164 execute();
165 }
166 catch (Exception ex) {
167 log.fatal("error running " + name, ex);
168 }
169 }
170
171 public static void main(String args[]) {
172 boolean noExit=false;
173 try {
174 System.out.print("Worker args:");
175 for (String s: args) {
176 System.out.print(s + " ");
177 }
178
179 String connFactoryJNDI=null;
180 String requestQueueJNDI=null;
181 String dlqJNDI=null;
182 String name="";
183 Integer maxCount=null;
184 boolean noFail=false;
185 String username=null;
186 String password=null;
187 for (int i=0; i<args.length; i++) {
188 if ("-jndi.name.connFactory".equals(args[i])) {
189 connFactoryJNDI = args[++i];
190 }
191 else if ("-jndi.name.requestQueue".equals(args[i])) {
192 requestQueueJNDI=args[++i];
193 }
194 else if ("-jndi.name.DLQ".equals(args[i])) {
195 dlqJNDI=args[++i];
196 }
197 else if ("-name".equals(args[i])) {
198 name=args[++i];
199 }
200 else if ("-max".equals(args[i])) {
201 maxCount=new Integer(args[++i]);
202 }
203 else if ("-noFail".equals(args[i])) {
204 noFail=Boolean.parseBoolean(args[++i]);
205 }
206 else if ("-username".equals(args[i])) {
207 username=args[++i];
208 }
209 else if ("-password".equals(args[i])) {
210 password=args[++i];
211 }
212 else if ("-noExit".equals(args[i])) {
213 noExit=true;
214 }
215 }
216 if (connFactoryJNDI==null) {
217 throw new Exception("jndi.name.connFactory not supplied");
218 }
219 else if (requestQueueJNDI==null) {
220 throw new Exception("jndi.name.requestQueue not supplied");
221 }
222 else if (dlqJNDI==null) {
223 throw new Exception("jndi.name.DLQ not supplied");
224 }
225 Worker worker = new Worker(name);
226 Context jndi = new InitialContext();
227 worker.setConnFactory(
228 (ConnectionFactory)jndi.lookup(connFactoryJNDI));
229 worker.setRequestQueue((Destination)jndi.lookup(requestQueueJNDI));
230 worker.setDLQ((Destination)jndi.lookup(dlqJNDI));
231 worker.setNoFail(noFail);
232 if (maxCount!=null) {
233 worker.setMaxCount(maxCount);
234 }
235 worker.setUsername(username);
236 worker.setPassword(password);
237 worker.execute();
238 }
239 catch (Exception ex) {
240 log.fatal("",ex);
241 if (noExit) {
242 throw new RuntimeException("worker error", ex);
243 }
244 System.exit(-1);
245 }
246 }
247
248
249 }