View Javadoc
1   package ejava.examples.jmsscheduler;
2   
3   import java.util.HashMap;
4   import java.util.Map;
5   
6   import javax.jms.ConnectionFactory;
7   import javax.jms.Destination;
8   import javax.jms.JMSConsumer;
9   import javax.jms.JMSContext;
10  import javax.jms.JMSProducer;
11  import javax.jms.MapMessage;
12  import javax.jms.Message;
13  import javax.jms.MessageListener;
14  import javax.jms.Session;
15  import javax.naming.Context;
16  import javax.naming.InitialContext;
17  
18  import org.slf4j.Logger;
19  import org.slf4j.LoggerFactory;
20  
21  /**
22   * This is used to simulate work being tasked to a scheduling queue. Each 
23   * request will be tracked for a result.
24   */
25  public class Requestor implements Runnable, MessageListener {
26      private static final Logger logger = LoggerFactory.getLogger(Requestor.class);
27      protected ConnectionFactory connFactory;
28      protected Destination requestQueue;
29      protected boolean stop = false;
30      protected boolean stopped = false;
31      protected boolean started = false;
32      protected int count=0;
33      protected String name;
34      protected long sleepTime=10000;
35      protected int maxCount=10;
36      protected Map<String, Message> requests = new HashMap<String,Message>();
37      protected int responseCount=0;
38      protected long startTime=0;
39      protected String username;
40      protected String password;
41          
42      public Requestor(String name) {
43          this.name = name;
44      }
45      public void setConnFactory(ConnectionFactory connFactory) {
46          this.connFactory = connFactory;
47      }
48      public void setRequestQueue(Destination requestQueue) {
49          this.requestQueue = requestQueue;
50      }    
51      public int getCount() {
52          return count;
53      }
54      public void setSleepTime(long sleepTime) {
55          this.sleepTime = sleepTime;
56      }
57      public void setMaxCount(int maxCount) {
58          this.maxCount = maxCount;
59      }
60      public void clearMessages() {
61          count = 0;
62      }
63      public void stop() {
64          this.stop = true;
65      }
66      public boolean isStopped() {
67          return stopped;
68      }
69      public boolean isStarted() {
70          return started;
71      }
72      protected JMSContext createContext(Integer sessionMode) throws Exception {
73          if (sessionMode==null) {
74              return username==null ? 
75              		connFactory.createContext() :
76              		connFactory.createContext(username, password);
77          } else {
78              return username==null ? 
79                      connFactory.createContext(sessionMode) :
80                      connFactory.createContext(username, password, sessionMode);            
81          }
82      }
83      public void setUsername(String username) {
84  		this.username = username;
85  	}
86      public void setPassword(String password) {
87  		this.password = password;
88  	}
89      public void execute() throws Exception {
90          try (JMSContext context=createContext(Session.AUTO_ACKNOWLEDGE)) {
91              JMSProducer producer = context.createProducer();
92              Destination replyTo = context.createTemporaryQueue();
93              
94              try (JMSConsumer consumer = context.createConsumer(replyTo)) {
95                  consumer.setMessageListener(this);
96                  context.start();
97                  stopped = stop = false;
98                  
99                  logger.info("requester {} starting: maxCount={}, sleepTime {}", name, maxCount, sleepTime);
100                 started = true;
101                 startTime=System.currentTimeMillis();
102                 
103                 while (!stop && (maxCount==0 || count < maxCount)) {
104                     MapMessage message = context.createMapMessage();
105                     message.setIntProperty("count", ++count);
106                     message.setInt("difficulty", count % 10);
107                     message.setJMSReplyTo(replyTo);
108                     synchronized (requests) {
109                         producer.send(requestQueue, message);
110                         requests.put(message.getJMSMessageID(), message);
111                     }
112                     if (sleepTime>=1000 || (count % 100==0)) {
113                         logger.debug("published message(" + count + "):" + 
114                                 message.getJMSMessageID());
115                         logger.debug("outstanding requests=" + requests.size());
116                     }
117                     Thread.sleep(sleepTime);
118                 }
119                 
120                 logger.info("requester {} stopping, count={}", name, count);
121                 while (requests.size() > 0) {
122                     logger.debug("waiting for {} outstanding responses", requests.size());
123                     logger.trace("requests={}", requests);
124                     Thread.sleep(3000);
125                 }
126                 context.stop();
127             }
128 
129         }
130         finally {
131             stopped = true;
132             started = false;
133         }
134     }
135     
136     public void run() {
137         try {
138             execute();
139         }
140         catch (Exception ex) {
141             logger.error("error running {}", name, ex);
142         }
143     }    
144 
145     /**
146      * This method is used to asynchronously receive the responses to 
147      * requests sent by the main loop.
148      */
149     public void onMessage(Message message) {
150         try {
151             String correlationID = message.getJMSCorrelationID();
152             Message request=null;
153             synchronized (requests) {
154                 request = requests.remove(correlationID);    
155             }        
156 
157             if (request != null) {
158                 responseCount += 1;
159                 String worker = message.getStringProperty("worker");
160 
161                 if (sleepTime>=1000 || (responseCount % 100==0)) {
162                     logger.debug("recieved response for:{}, from {}, outstanding={}", 
163                             request.getIntProperty("count"), worker, requests.size());
164                 }
165             }
166             else {
167                 logger.warn("received unexpected response:{}" + correlationID);
168             }
169         } catch (Exception ex) {
170             logger.info("error processing message", ex);
171         }
172     }
173 
174     public static void main(String args[]) {
175         boolean noExit=false;
176         try {
177             System.out.print("Requestor args:");
178             for (String s: args) {
179                 System.out.print(s + " ");
180             }
181             System.out.println();
182             String connFactoryJNDI=null;
183             String requestQueueJNDI=null;
184             String name="";
185             Long sleepTime=null;
186             Integer maxCount=null;
187             String username=null;
188             String password=null;
189              for (int i=0; i<args.length; i++) {
190                 if ("-jndi.name.connFactory".equals(args[i])) {
191                     connFactoryJNDI = args[++i];
192                 }
193                 else if ("-jndi.name.requestQueue".equals(args[i])) {
194                     requestQueueJNDI=args[++i];
195                 }
196                 else if ("-name".equals(args[i])) {
197                     name=args[++i];
198                 }
199                 else if ("-sleep".equals(args[i])) {
200                     sleepTime=new Long(args[++i]);
201                 }
202                 else if ("-max".equals(args[i])) {
203                     maxCount=new Integer(args[++i]);
204                 }
205                 else if ("-username".equals(args[i])) {
206                     username=args[++i];
207                 }
208                 else if ("-password".equals(args[i])) {
209                     password=args[++i];
210                 }
211                 else if ("-noExit".equals(args[i])) {
212                     noExit=true;
213                 }
214             }
215             if (connFactoryJNDI==null) { 
216                 throw new Exception("jndi.name.connFactory not supplied");
217             }
218             else if (requestQueueJNDI==null) {
219                 throw new Exception("jndi.name.requestQueue not supplied");
220             }            
221             Requestor requestor = new Requestor(name);
222             Context jndi = new InitialContext();
223             requestor.setConnFactory(
224                     (ConnectionFactory)jndi.lookup(connFactoryJNDI));
225             requestor.setRequestQueue((Destination)jndi.lookup(requestQueueJNDI));
226             if (maxCount!=null) {
227                 requestor.setMaxCount(maxCount);
228             }
229             if (sleepTime!=null) {
230                 requestor.setSleepTime(sleepTime);
231             }
232             requestor.setUsername(username);
233             requestor.setPassword(password);
234             requestor.execute();
235         }
236         catch (Exception ex) {
237             logger.error("",ex);
238             if (noExit) {
239                 throw new RuntimeException("requestor error", ex);
240             }
241             System.exit(-1);            
242         }
243     }
244 }