1 package ejava.examples.jmsscheduler;
2
3 import javax.jms.ConnectionFactory;
4 import javax.jms.Destination;
5 import javax.jms.JMSConsumer;
6 import javax.jms.JMSContext;
7 import javax.jms.JMSException;
8 import javax.jms.JMSProducer;
9 import javax.jms.JMSRuntimeException;
10 import javax.jms.MapMessage;
11 import javax.jms.Message;
12 import javax.jms.Session;
13 import javax.naming.Context;
14 import javax.naming.InitialContext;
15
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
18
19
20
21
22
23
24
25
26 public class Worker implements Runnable {
27 private static final Logger logger = LoggerFactory.getLogger(Worker.class);
28 protected ConnectionFactory connFactory;
29 protected Destination requestQueue;
30 protected Destination dlq;
31 protected boolean stop = false;
32 protected boolean stopped = false;
33 protected boolean started = false;
34 protected boolean noFail = false;
35 protected String name;
36 protected int count=0;
37 protected int maxCount=0;
38 protected long delay[] = {0, 0, 0, 0, 10, 10, 10, 10, 100, 100};
39 protected String username;
40 protected String password;
41
42 public Worker(String name) {
43 this.name = name;
44 }
45 public void setConnFactory(ConnectionFactory connFactory) {
46 this.connFactory = connFactory;
47 }
48 public void setRequestQueue(Destination requestQueue) {
49 this.requestQueue = requestQueue;
50 }
51 public void setDLQ(Destination dlq) {
52 this.dlq = dlq;
53 }
54 public int getCount() {
55 return count;
56 }
57 public void setMaxCount(int maxCount) {
58 this.maxCount = maxCount;
59 }
60 public void clearMessages() {
61 count = 0;
62 }
63 public void stop() {
64 this.stop = true;
65 }
66 public boolean isStopped() {
67 return stopped;
68 }
69 public boolean isStarted() {
70 return started;
71 }
72 public void setNoFail(boolean noFail) {
73 this.noFail = noFail;
74 }
75 protected JMSContext createContext(Integer sessionMode) throws Exception {
76 if (sessionMode==null) {
77 return username==null ?
78 connFactory.createContext() :
79 connFactory.createContext(username, password);
80 } else {
81 return username==null ?
82 connFactory.createContext(sessionMode) :
83 connFactory.createContext(username, password, sessionMode);
84 }
85 }
86 public void setUsername(String username) {
87 this.username = username;
88 }
89 public void setPassword(String password) {
90 this.password = password;
91 }
92 public void execute() throws Exception {
93 try (JMSContext context=createContext(Session.SESSION_TRANSACTED)) {
94
95 try (JMSConsumer consumer = context.createConsumer(requestQueue)) {
96 context.start();
97
98 stopped = stop = false;
99 logger.info("worker " + name + " starting");
100 started = true;
101
102 JMSProducer producer = context.createProducer();
103 while (!stop && (maxCount==0 || count < maxCount)) {
104 Message message = consumer.receive(3000);
105 if (message != null) {
106 count += 1;
107 try {
108 MapMessage request = (MapMessage)message;
109 int difficulty = request.getInt("difficulty");
110 long sleepTime = delay[difficulty];
111 int requestCounter = request.getIntProperty("count");
112 Destination replyTo = request.getJMSReplyTo();
113 logger.debug(name + " received message #{}, req={}, replyTo={}, delay={}",
114 count, requestCounter, replyTo, sleepTime);
115 Thread.sleep(sleepTime);
116 if (count < maxCount || maxCount==0 || noFail){
117 Message response = context.createMessage();
118 response.setJMSCorrelationID(request.getJMSMessageID());
119 response.setStringProperty("worker", name);
120 try {
121 producer.send(replyTo, response);
122 } catch (JMSRuntimeException ex) {
123 logger.error("error sending reply:" + ex);
124 producer.send(dlq, request);
125 } finally {
126 logger.debug("committing session for: {}", request.getJMSMessageID());
127 context.commit();
128 }
129 }
130 }
131 catch (Exception ex) {
132 logger.error("error processing request:", ex);
133 producer.send(dlq, message);
134 logger.debug("committing session");
135 context.commit();
136 }
137 Thread.yield();
138 }
139 }
140 }
141 logger.info("worker {} stopping", name);
142 context.stop();
143 }
144 finally {
145 stopped = true;
146 started = false;
147 }
148 }
149
150 public void run() {
151 try {
152 execute();
153 }
154 catch (Exception ex) {
155 logger.error("error running " + name, ex);
156 }
157 }
158
159 public static void main(String args[]) {
160 boolean noExit=false;
161 try {
162 System.out.print("Worker args:");
163 for (String s: args) {
164 System.out.print(s + " ");
165 }
166
167 String connFactoryJNDI=null;
168 String requestQueueJNDI=null;
169 String dlqJNDI=null;
170 String name="";
171 Integer maxCount=null;
172 boolean noFail=false;
173 String username=null;
174 String password=null;
175 for (int i=0; i<args.length; i++) {
176 if ("-jndi.name.connFactory".equals(args[i])) {
177 connFactoryJNDI = args[++i];
178 }
179 else if ("-jndi.name.requestQueue".equals(args[i])) {
180 requestQueueJNDI=args[++i];
181 }
182 else if ("-jndi.name.DLQ".equals(args[i])) {
183 dlqJNDI=args[++i];
184 }
185 else if ("-name".equals(args[i])) {
186 name=args[++i];
187 }
188 else if ("-max".equals(args[i])) {
189 maxCount=new Integer(args[++i]);
190 }
191 else if ("-noFail".equals(args[i])) {
192 noFail=Boolean.parseBoolean(args[++i]);
193 }
194 else if ("-username".equals(args[i])) {
195 username=args[++i];
196 }
197 else if ("-password".equals(args[i])) {
198 password=args[++i];
199 }
200 else if ("-noExit".equals(args[i])) {
201 noExit=true;
202 }
203 }
204 if (connFactoryJNDI==null) {
205 throw new Exception("jndi.name.connFactory not supplied");
206 }
207 else if (requestQueueJNDI==null) {
208 throw new Exception("jndi.name.requestQueue not supplied");
209 }
210 else if (dlqJNDI==null) {
211 throw new Exception("jndi.name.DLQ not supplied");
212 }
213 Worker worker = new Worker(name);
214 Context jndi = new InitialContext();
215 worker.setConnFactory(
216 (ConnectionFactory)jndi.lookup(connFactoryJNDI));
217 worker.setRequestQueue((Destination)jndi.lookup(requestQueueJNDI));
218 worker.setDLQ((Destination)jndi.lookup(dlqJNDI));
219 worker.setNoFail(noFail);
220 if (maxCount!=null) {
221 worker.setMaxCount(maxCount);
222 }
223 worker.setUsername(username);
224 worker.setPassword(password);
225 worker.execute();
226 }
227 catch (Exception ex) {
228 logger.error("",ex);
229 if (noExit) {
230 throw new RuntimeException("worker error", ex);
231 }
232 System.exit(-1);
233 }
234 }
235
236
237 }