View Javadoc
1   package ejava.examples.jmsscheduler;
2   
3   import javax.jms.ConnectionFactory;
4   import javax.jms.Destination;
5   import javax.jms.JMSConsumer;
6   import javax.jms.JMSContext;
7   import javax.jms.JMSException;
8   import javax.jms.JMSProducer;
9   import javax.jms.JMSRuntimeException;
10  import javax.jms.MapMessage;
11  import javax.jms.Message;
12  import javax.jms.Session;
13  import javax.naming.Context;
14  import javax.naming.InitialContext;
15  
16  import org.slf4j.Logger;
17  import org.slf4j.LoggerFactory;
18  
19  /**
20   * This class simulates the job of a worker. It will attempt to take a message
21   * off the queue, work on it, and issue a reply. The length of time taken on 
22   * each message will vary per message based on a difficulty index. The worker
23   * will quite when it hits its max value; always failing to repond to the last
24   * request processed (on purpose).
25   */
26  public class Worker implements Runnable {
27      private static final Logger logger = LoggerFactory.getLogger(Worker.class);
28      protected ConnectionFactory connFactory;
29      protected Destination requestQueue;
30      protected Destination dlq;
31      protected boolean stop = false;
32      protected boolean stopped = false;
33      protected boolean started = false;
34      protected boolean noFail = false;
35      protected String name;
36      protected int count=0;
37      protected int maxCount=0;
38      protected long delay[] = {0, 0, 0, 0, 10, 10, 10, 10, 100, 100}; 
39      protected String username;
40      protected String password;
41  
42      public Worker(String name) {
43          this.name = name;
44      }
45      public void setConnFactory(ConnectionFactory connFactory) {
46          this.connFactory = connFactory;
47      }
48      public void setRequestQueue(Destination requestQueue) {
49          this.requestQueue = requestQueue;
50      }    
51      public void setDLQ(Destination dlq) {
52          this.dlq = dlq;
53      }    
54      public int getCount() {
55          return count;
56      }
57      public void setMaxCount(int maxCount) {
58          this.maxCount = maxCount;
59      }
60      public void clearMessages() {
61          count = 0;
62      }
63      public void stop() {
64          this.stop = true;
65      }
66      public boolean isStopped() {
67          return stopped;
68      }
69      public boolean isStarted() {
70          return started;
71      }
72      public void setNoFail(boolean noFail) {
73      	this.noFail = noFail;
74      }
75      protected JMSContext createContext(Integer sessionMode) throws Exception {
76          if (sessionMode==null) {
77              return username==null ? 
78                      connFactory.createContext() :
79                      connFactory.createContext(username, password);            
80          } else {
81              return username==null ? 
82                      connFactory.createContext(sessionMode) :
83                      connFactory.createContext(username, password, sessionMode);                        
84          }
85      }
86      public void setUsername(String username) {
87  		this.username = username;
88  	}
89      public void setPassword(String password) {
90  		this.password = password;
91  	}
92      public void execute() throws Exception {
93          try (JMSContext context=createContext(Session.SESSION_TRANSACTED)) {
94              //use a transacted session to join request/response in single Tx
95              try (JMSConsumer consumer = context.createConsumer(requestQueue)) {
96                  context.start();
97                  
98                  stopped = stop = false;
99                  logger.info("worker " + name + " starting");
100                 started = true;
101                 
102                 JMSProducer producer = context.createProducer();
103                 while (!stop && (maxCount==0 || count < maxCount)) {
104                     Message message = consumer.receive(3000);
105                     if (message != null) {
106                         count += 1;                     
107                         try {
108                             MapMessage request = (MapMessage)message;
109                             int difficulty = request.getInt("difficulty");
110                             long sleepTime = delay[difficulty];
111                             int requestCounter = request.getIntProperty("count");
112                             Destination replyTo = request.getJMSReplyTo();
113                             logger.debug(name + " received message #{}, req={}, replyTo={}, delay={}", 
114                                     count, requestCounter, replyTo, sleepTime);
115                             Thread.sleep(sleepTime);
116                             if (count < maxCount || maxCount==0 || noFail){//fail on last one
117                                 Message response = context.createMessage();
118                                 response.setJMSCorrelationID(request.getJMSMessageID());
119                                 response.setStringProperty("worker", name);
120                                 try {
121                                     producer.send(replyTo, response);
122                                 } catch (JMSRuntimeException ex) {
123                                     logger.error("error sending reply:" + ex);                                
124                                     producer.send(dlq, request);
125                                 } finally {
126                                     logger.debug("committing session for: {}", request.getJMSMessageID());
127                                     context.commit();
128                                 }
129                             }
130                         }
131                         catch (Exception ex) {
132                             logger.error("error processing request:", ex);
133                             producer.send(dlq, message);
134                             logger.debug("committing session");
135                             context.commit();
136                         }
137                         Thread.yield();
138                     }      
139                 }
140             }
141             logger.info("worker {} stopping", name);
142             context.stop();
143         }
144         finally {
145             stopped = true;
146             started = false;
147         }
148     }
149     
150     public void run() {
151         try {
152             execute();
153         }
154         catch (Exception ex) {
155             logger.error("error running " + name, ex);
156         }
157     }    
158 
159     public static void main(String args[]) {
160     	boolean noExit=false;
161         try {
162             System.out.print("Worker args:");
163             for (String s: args) {
164                 System.out.print(s + " ");
165             }
166 
167             String connFactoryJNDI=null;
168             String requestQueueJNDI=null;
169             String dlqJNDI=null;
170             String name="";
171             Integer maxCount=null;
172             boolean noFail=false;
173             String username=null;
174             String password=null;
175             for (int i=0; i<args.length; i++) {
176                 if ("-jndi.name.connFactory".equals(args[i])) {
177                     connFactoryJNDI = args[++i];
178                 }
179                 else if ("-jndi.name.requestQueue".equals(args[i])) {
180                     requestQueueJNDI=args[++i];
181                 }
182                 else if ("-jndi.name.DLQ".equals(args[i])) {
183                     dlqJNDI=args[++i];
184                 }
185                 else if ("-name".equals(args[i])) {
186                     name=args[++i];
187                 }
188                 else if ("-max".equals(args[i])) {
189                     maxCount=new Integer(args[++i]);
190                 }
191                 else if ("-noFail".equals(args[i])) {
192                 	noFail=Boolean.parseBoolean(args[++i]);
193                 }
194                 else if ("-username".equals(args[i])) {
195                     username=args[++i];
196                 }
197                 else if ("-password".equals(args[i])) {
198                     password=args[++i];
199                 }
200                 else if ("-noExit".equals(args[i])) {
201                     noExit=true;
202                 }
203             }
204             if (connFactoryJNDI==null) { 
205                 throw new Exception("jndi.name.connFactory not supplied");
206             }
207             else if (requestQueueJNDI==null) {
208                 throw new Exception("jndi.name.requestQueue not supplied");
209             }            
210             else if (dlqJNDI==null) {
211                 throw new Exception("jndi.name.DLQ not supplied");
212             }            
213             Worker worker = new Worker(name);
214             Context jndi = new InitialContext();
215             worker.setConnFactory(
216                     (ConnectionFactory)jndi.lookup(connFactoryJNDI));
217             worker.setRequestQueue((Destination)jndi.lookup(requestQueueJNDI));
218             worker.setDLQ((Destination)jndi.lookup(dlqJNDI));
219             worker.setNoFail(noFail);
220             if (maxCount!=null) {
221                 worker.setMaxCount(maxCount);
222             }
223             worker.setUsername(username);
224             worker.setPassword(password);
225             worker.execute();
226         }
227         catch (Exception ex) {
228             logger.error("",ex);
229             if (noExit) {
230             	throw new RuntimeException("worker error", ex);
231             }
232             System.exit(-1);            
233         }
234     }
235 
236 
237 }