View Javadoc
1   package ejava.examples.jmsscheduler;
2   
3   import java.util.HashMap;
4   import java.util.Map;
5   
6   import javax.jms.Connection;
7   import javax.jms.ConnectionFactory;
8   import javax.jms.Destination;
9   import javax.jms.MapMessage;
10  import javax.jms.Message;
11  import javax.jms.MessageConsumer;
12  import javax.jms.MessageListener;
13  import javax.jms.MessageProducer;
14  import javax.jms.Session;
15  import javax.naming.Context;
16  import javax.naming.InitialContext;
17  
18  import org.apache.commons.logging.Log;
19  import org.apache.commons.logging.LogFactory;
20  
21  /**
22   * This is used to simulate work being tasked to a scheduling queue. Each 
23   * request will be tracked for a result.
24   *
25   * @author jcstaff
26   */
27  public class Requestor implements Runnable, MessageListener {
28      private static final Log log = LogFactory.getLog(Requestor.class);
29      protected ConnectionFactory connFactory;
30      protected Destination requestQueue;
31      protected boolean stop = false;
32      protected boolean stopped = false;
33      protected boolean started = false;
34      protected int count=0;
35      protected String name;
36      protected long sleepTime=10000;
37      protected int maxCount=10;
38      protected Map<String, Message> requests = new HashMap<String,Message>();
39      protected int responseCount=0;
40      protected long startTime=0;
41      protected String username;
42      protected String password;
43          
44      public Requestor(String name) {
45          this.name = name;
46      }
47      public void setConnFactory(ConnectionFactory connFactory) {
48          this.connFactory = connFactory;
49      }
50      public void setRequestQueue(Destination requestQueue) {
51          this.requestQueue = requestQueue;
52      }    
53      public int getCount() {
54          return count;
55      }
56      public void setSleepTime(long sleepTime) {
57          this.sleepTime = sleepTime;
58      }
59      public void setMaxCount(int maxCount) {
60          this.maxCount = maxCount;
61      }
62      public void clearMessages() {
63          count = 0;
64      }
65      public void stop() {
66          this.stop = true;
67      }
68      public boolean isStopped() {
69          return stopped;
70      }
71      public boolean isStarted() {
72          return started;
73      }
74      protected Connection createConnection(ConnectionFactory connFactory) 
75          throws Exception {
76          return username==null ? 
77          		connFactory.createConnection() :
78          		connFactory.createConnection(username, password);
79      }
80      protected Destination getReplyTo(Session session) throws Exception {
81          return session.createTemporaryQueue();
82      }
83      public void setUsername(String username) {
84  		this.username = username;
85  	}
86      public void setPassword(String password) {
87  		this.password = password;
88  	}
89      public void execute() throws Exception {
90          Connection connection = null;
91          Session session = null;
92          MessageProducer producer = null;
93          try {
94              connection = createConnection(connFactory);
95              session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
96              producer = session.createProducer(requestQueue);
97              Destination replyTo = getReplyTo(session);
98              MessageConsumer consumer = session.createConsumer(replyTo);
99              consumer.setMessageListener(this);
100             connection.start();
101             stopped = stop = false;
102 
103             log.info("requester " + name + " starting: " +
104                     "maxCount=" + maxCount +
105                     ", sleepTime" + sleepTime);
106             started = true;
107             startTime=System.currentTimeMillis();
108             while (!stop && (maxCount==0 || count < maxCount)) {
109                 MapMessage message = session.createMapMessage();
110                 message.setIntProperty("count", ++count);
111                 message.setInt("difficulty", count % 10);
112                 message.setJMSReplyTo(replyTo);
113                 synchronized (requests) {
114                 	producer.send(message);
115                     requests.put(message.getJMSMessageID(), message);
116                 }
117                 if (sleepTime>=1000 || (count % 100==0)) {
118                     log.debug("published message(" + count + "):" + 
119                             message.getJMSMessageID());
120                     log.debug("outstanding requests=" + requests.size());
121                 }
122                 Thread.sleep(sleepTime);
123             }
124             log.info("requester " + name + " stopping, count=" + count);
125             while (requests.size() > 0) {
126                 log.debug("waiting for " + requests.size() +  
127                           " outstanding responses");
128                 log.trace("requests=" + requests);
129                 Thread.sleep(3000);
130             }
131             connection.stop();
132         }
133         finally {
134             stopped = true;
135             started = false;
136             if (producer != null)   { producer.close(); }
137             if (session!=null){ session.close();}
138             if (connection != null) { connection.close(); }
139         }
140     }
141     
142     public void run() {
143         try {
144             execute();
145         }
146         catch (Exception ex) {
147             log.fatal("error running " + name, ex);
148         }
149     }    
150 
151     /**
152      * This method is used to asynchronously receive the responses to 
153      * requests sent by the main loop.
154      */
155     public void onMessage(Message message) {
156         try {
157             String correlationID = message.getJMSCorrelationID();
158             Message request=null;
159             synchronized (requests) {
160                 request = requests.remove(correlationID);    
161             }        
162 
163             if (request != null) {
164                 responseCount += 1;
165                 String worker = message.getStringProperty("worker");
166 
167                 if (sleepTime>=1000 || (responseCount % 100==0)) {
168                     log.debug("recieved response for:" + 
169                             request.getIntProperty("count") +
170                             ", from " + worker + 
171                             ", outstanding=" + requests.size());
172                 }
173             }
174             else {
175                 log.warn("received unexpected response:" + correlationID);
176             }
177         } catch (Exception ex) {
178             log.info("error processing message", ex);
179         }
180     }
181 
182     public static void main(String args[]) {
183         boolean noExit=false;
184         try {
185             System.out.print("Requestor args:");
186             for (String s: args) {
187                 System.out.print(s + " ");
188             }
189             System.out.println();
190             String connFactoryJNDI=null;
191             String requestQueueJNDI=null;
192             String name="";
193             Long sleepTime=null;
194             Integer maxCount=null;
195             String username=null;
196             String password=null;
197              for (int i=0; i<args.length; i++) {
198                 if ("-jndi.name.connFactory".equals(args[i])) {
199                     connFactoryJNDI = args[++i];
200                 }
201                 else if ("-jndi.name.requestQueue".equals(args[i])) {
202                     requestQueueJNDI=args[++i];
203                 }
204                 else if ("-name".equals(args[i])) {
205                     name=args[++i];
206                 }
207                 else if ("-sleep".equals(args[i])) {
208                     sleepTime=new Long(args[++i]);
209                 }
210                 else if ("-max".equals(args[i])) {
211                     maxCount=new Integer(args[++i]);
212                 }
213                 else if ("-username".equals(args[i])) {
214                     username=args[++i];
215                 }
216                 else if ("-password".equals(args[i])) {
217                     password=args[++i];
218                 }
219                 else if ("-noExit".equals(args[i])) {
220                     noExit=true;
221                 }
222             }
223             if (connFactoryJNDI==null) { 
224                 throw new Exception("jndi.name.connFactory not supplied");
225             }
226             else if (requestQueueJNDI==null) {
227                 throw new Exception("jndi.name.requestQueue not supplied");
228             }            
229             Requestor requestor = new Requestor(name);
230             Context jndi = new InitialContext();
231             requestor.setConnFactory(
232                     (ConnectionFactory)jndi.lookup(connFactoryJNDI));
233             requestor.setRequestQueue((Destination)jndi.lookup(requestQueueJNDI));
234             if (maxCount!=null) {
235                 requestor.setMaxCount(maxCount);
236             }
237             if (sleepTime!=null) {
238                 requestor.setSleepTime(sleepTime);
239             }
240             requestor.setUsername(username);
241             requestor.setPassword(password);
242             requestor.execute();
243         }
244         catch (Exception ex) {
245             log.fatal("",ex);
246             if (noExit) {
247             	throw new RuntimeException("requestor error", ex);
248             }
249             System.exit(-1);            
250         }
251     }
252 }