View Javadoc
1   package ejava.examples.jmsscheduler;
2   
3   import javax.jms.Connection;
4   import javax.jms.ConnectionFactory;
5   import javax.jms.Destination;
6   import javax.jms.JMSException;
7   import javax.jms.MapMessage;
8   import javax.jms.Message;
9   import javax.jms.MessageConsumer;
10  import javax.jms.MessageProducer;
11  import javax.jms.Session;
12  import javax.naming.Context;
13  import javax.naming.InitialContext;
14  
15  import org.apache.commons.logging.Log;
16  import org.apache.commons.logging.LogFactory;
17  
18  /**
19   * This class simulates the job of a worker. It will attempt to take a message
20   * off the queue, work on it, and issue a reply. The length of time taken on 
21   * each message will vary per message based on a difficulty index. The worker
22   * will quite when it hits its max value; always failing to repond to the last
23   * request processed (on purpose).
24   *
25   * @author jcstaff
26   */
27  public class Worker implements Runnable {
28      private static final Log log = LogFactory.getLog(Worker.class);
29      protected ConnectionFactory connFactory;
30      protected Destination requestQueue;
31      protected Destination dlq;
32      protected boolean stop = false;
33      protected boolean stopped = false;
34      protected boolean started = false;
35      protected boolean noFail = false;
36      protected String name;
37      protected int count=0;
38      protected int maxCount=0;
39      protected long delay[] = {0, 0, 0, 0, 10, 10, 10, 10, 100, 100}; 
40      protected String username;
41      protected String password;
42  
43      public Worker(String name) {
44          this.name = name;
45      }
46      public void setConnFactory(ConnectionFactory connFactory) {
47          this.connFactory = connFactory;
48      }
49      public void setRequestQueue(Destination requestQueue) {
50          this.requestQueue = requestQueue;
51      }    
52      public void setDLQ(Destination dlq) {
53          this.dlq = dlq;
54      }    
55      public int getCount() {
56          return count;
57      }
58      public void setMaxCount(int maxCount) {
59          this.maxCount = maxCount;
60      }
61      public void clearMessages() {
62          count = 0;
63      }
64      public void stop() {
65          this.stop = true;
66      }
67      public boolean isStopped() {
68          return stopped;
69      }
70      public boolean isStarted() {
71          return started;
72      }
73      public void setNoFail(boolean noFail) {
74      	this.noFail = noFail;
75      }
76      protected Connection createConnection(ConnectionFactory connFactory) 
77          throws Exception {
78          return username==null ? 
79          		connFactory.createConnection() :
80          		connFactory.createConnection(username, password);
81      }
82      public void setUsername(String username) {
83  		this.username = username;
84  	}
85      public void setPassword(String password) {
86  		this.password = password;
87  	}
88      public void execute() throws Exception {
89          Connection connection = null;
90          Session session = null;
91          MessageConsumer consumer = null;
92          MessageProducer producer = null;
93          MessageProducer dlqProducer = null;
94          try {
95              connection = createConnection(connFactory);
96              //use a transacted session to join request/response in single Tx
97              session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
98              consumer = session.createConsumer(requestQueue);
99              producer = session.createProducer(null);
100             dlqProducer = session.createProducer(dlq);
101             connection.start();
102 
103             stopped = stop = false;
104             log.info("worker " + name + " starting");
105             started = true;
106             while (!stop && (maxCount==0 || count < maxCount)) {
107                 Message message = consumer.receive(3000);
108                 if (message != null) {
109                     count += 1;                     
110                     try {
111                         MapMessage request = (MapMessage)message;
112                         int difficulty = request.getInt("difficulty");
113                         long sleepTime = delay[difficulty];
114                         int requestCounter = request.getIntProperty("count");
115                         StringBuilder text = new StringBuilder();
116                         Destination replyTo = request.getJMSReplyTo();
117                         text.append(name + " received message #" + count +
118                             ", req=" + requestCounter +
119                             ", replyTo=" + replyTo +
120                             ", delay=" + sleepTime);
121                         log.debug(text.toString());
122                         Thread.sleep(sleepTime);
123                         if (count < maxCount || maxCount==0 || noFail){//fail on last one
124                             Message response = session.createMessage();
125                             response.setJMSCorrelationID(
126                                     request.getJMSMessageID());
127                             response.setStringProperty("worker", name);
128                             try {
129                                 producer.send(replyTo, response);
130                             }
131                             catch (JMSException ex) {
132                                 log.error("error sending reply:" + ex);                                
133                                 dlqProducer.send(request);
134                             }
135                             finally {
136                                 log.debug("committing session for: " + request.getJMSMessageID());
137                                 session.commit();
138                             }
139                         }
140                     }
141                     catch (Exception ex) {
142                         log.error("error processing request:" + ex);
143                         dlqProducer.send(message);
144                         log.debug("committing session");
145                         session.commit();
146                     }
147                     Thread.yield();
148                 }      
149             }
150             log.info("worker " + name + " stopping");
151             connection.stop();
152         }
153         finally {
154             stopped = true;
155             started = false;
156             if (consumer != null)   { consumer.close(); }
157             if (session!=null){ session.close();}
158             if (connection != null) { connection.close(); }
159         }
160     }
161     
162     public void run() {
163         try {
164             execute();
165         }
166         catch (Exception ex) {
167             log.fatal("error running " + name, ex);
168         }
169     }    
170 
171     public static void main(String args[]) {
172     	boolean noExit=false;
173         try {
174             System.out.print("Worker args:");
175             for (String s: args) {
176                 System.out.print(s + " ");
177             }
178 
179             String connFactoryJNDI=null;
180             String requestQueueJNDI=null;
181             String dlqJNDI=null;
182             String name="";
183             Integer maxCount=null;
184             boolean noFail=false;
185             String username=null;
186             String password=null;
187             for (int i=0; i<args.length; i++) {
188                 if ("-jndi.name.connFactory".equals(args[i])) {
189                     connFactoryJNDI = args[++i];
190                 }
191                 else if ("-jndi.name.requestQueue".equals(args[i])) {
192                     requestQueueJNDI=args[++i];
193                 }
194                 else if ("-jndi.name.DLQ".equals(args[i])) {
195                     dlqJNDI=args[++i];
196                 }
197                 else if ("-name".equals(args[i])) {
198                     name=args[++i];
199                 }
200                 else if ("-max".equals(args[i])) {
201                     maxCount=new Integer(args[++i]);
202                 }
203                 else if ("-noFail".equals(args[i])) {
204                 	noFail=Boolean.parseBoolean(args[++i]);
205                 }
206                 else if ("-username".equals(args[i])) {
207                     username=args[++i];
208                 }
209                 else if ("-password".equals(args[i])) {
210                     password=args[++i];
211                 }
212                 else if ("-noExit".equals(args[i])) {
213                     noExit=true;
214                 }
215             }
216             if (connFactoryJNDI==null) { 
217                 throw new Exception("jndi.name.connFactory not supplied");
218             }
219             else if (requestQueueJNDI==null) {
220                 throw new Exception("jndi.name.requestQueue not supplied");
221             }            
222             else if (dlqJNDI==null) {
223                 throw new Exception("jndi.name.DLQ not supplied");
224             }            
225             Worker worker = new Worker(name);
226             Context jndi = new InitialContext();
227             worker.setConnFactory(
228                     (ConnectionFactory)jndi.lookup(connFactoryJNDI));
229             worker.setRequestQueue((Destination)jndi.lookup(requestQueueJNDI));
230             worker.setDLQ((Destination)jndi.lookup(dlqJNDI));
231             worker.setNoFail(noFail);
232             if (maxCount!=null) {
233                 worker.setMaxCount(maxCount);
234             }
235             worker.setUsername(username);
236             worker.setPassword(password);
237             worker.execute();
238         }
239         catch (Exception ex) {
240             log.fatal("",ex);
241             if (noExit) {
242             	throw new RuntimeException("worker error", ex);
243             }
244             System.exit(-1);            
245         }
246     }
247 
248 
249 }