1 package ejava.examples.jmsscheduler;
2
3 import java.util.HashMap;
4 import java.util.Map;
5
6 import javax.jms.Connection;
7 import javax.jms.ConnectionFactory;
8 import javax.jms.Destination;
9 import javax.jms.MapMessage;
10 import javax.jms.Message;
11 import javax.jms.MessageConsumer;
12 import javax.jms.MessageListener;
13 import javax.jms.MessageProducer;
14 import javax.jms.Session;
15 import javax.naming.Context;
16 import javax.naming.InitialContext;
17
18 import org.apache.commons.logging.Log;
19 import org.apache.commons.logging.LogFactory;
20
21
22
23
24
25
26
27 public class Requestor implements Runnable, MessageListener {
28 private static final Log log = LogFactory.getLog(Requestor.class);
29 protected ConnectionFactory connFactory;
30 protected Destination requestQueue;
31 protected boolean stop = false;
32 protected boolean stopped = false;
33 protected boolean started = false;
34 protected int count=0;
35 protected String name;
36 protected long sleepTime=10000;
37 protected int maxCount=10;
38 protected Map<String, Message> requests = new HashMap<String,Message>();
39 protected int responseCount=0;
40 protected long startTime=0;
41 protected String username;
42 protected String password;
43
44 public Requestor(String name) {
45 this.name = name;
46 }
47 public void setConnFactory(ConnectionFactory connFactory) {
48 this.connFactory = connFactory;
49 }
50 public void setRequestQueue(Destination requestQueue) {
51 this.requestQueue = requestQueue;
52 }
53 public int getCount() {
54 return count;
55 }
56 public void setSleepTime(long sleepTime) {
57 this.sleepTime = sleepTime;
58 }
59 public void setMaxCount(int maxCount) {
60 this.maxCount = maxCount;
61 }
62 public void clearMessages() {
63 count = 0;
64 }
65 public void stop() {
66 this.stop = true;
67 }
68 public boolean isStopped() {
69 return stopped;
70 }
71 public boolean isStarted() {
72 return started;
73 }
74 protected Connection createConnection(ConnectionFactory connFactory)
75 throws Exception {
76 return username==null ?
77 connFactory.createConnection() :
78 connFactory.createConnection(username, password);
79 }
80 protected Destination getReplyTo(Session session) throws Exception {
81 return session.createTemporaryQueue();
82 }
83 public void setUsername(String username) {
84 this.username = username;
85 }
86 public void setPassword(String password) {
87 this.password = password;
88 }
89 public void execute() throws Exception {
90 Connection connection = null;
91 Session session = null;
92 MessageProducer producer = null;
93 try {
94 connection = createConnection(connFactory);
95 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
96 producer = session.createProducer(requestQueue);
97 Destination replyTo = getReplyTo(session);
98 MessageConsumer consumer = session.createConsumer(replyTo);
99 consumer.setMessageListener(this);
100 connection.start();
101 stopped = stop = false;
102
103 log.info("requester " + name + " starting: " +
104 "maxCount=" + maxCount +
105 ", sleepTime" + sleepTime);
106 started = true;
107 startTime=System.currentTimeMillis();
108 while (!stop && (maxCount==0 || count < maxCount)) {
109 MapMessage message = session.createMapMessage();
110 message.setIntProperty("count", ++count);
111 message.setInt("difficulty", count % 10);
112 message.setJMSReplyTo(replyTo);
113 synchronized (requests) {
114 producer.send(message);
115 requests.put(message.getJMSMessageID(), message);
116 }
117 if (sleepTime>=1000 || (count % 100==0)) {
118 log.debug("published message(" + count + "):" +
119 message.getJMSMessageID());
120 log.debug("outstanding requests=" + requests.size());
121 }
122 Thread.sleep(sleepTime);
123 }
124 log.info("requester " + name + " stopping, count=" + count);
125 while (requests.size() > 0) {
126 log.debug("waiting for " + requests.size() +
127 " outstanding responses");
128 log.trace("requests=" + requests);
129 Thread.sleep(3000);
130 }
131 connection.stop();
132 }
133 finally {
134 stopped = true;
135 started = false;
136 if (producer != null) { producer.close(); }
137 if (session!=null){ session.close();}
138 if (connection != null) { connection.close(); }
139 }
140 }
141
142 public void run() {
143 try {
144 execute();
145 }
146 catch (Exception ex) {
147 log.fatal("error running " + name, ex);
148 }
149 }
150
151
152
153
154
155 public void onMessage(Message message) {
156 try {
157 String correlationID = message.getJMSCorrelationID();
158 Message request=null;
159 synchronized (requests) {
160 request = requests.remove(correlationID);
161 }
162
163 if (request != null) {
164 responseCount += 1;
165 String worker = message.getStringProperty("worker");
166
167 if (sleepTime>=1000 || (responseCount % 100==0)) {
168 log.debug("recieved response for:" +
169 request.getIntProperty("count") +
170 ", from " + worker +
171 ", outstanding=" + requests.size());
172 }
173 }
174 else {
175 log.warn("received unexpected response:" + correlationID);
176 }
177 } catch (Exception ex) {
178 log.info("error processing message", ex);
179 }
180 }
181
182 public static void main(String args[]) {
183 boolean noExit=false;
184 try {
185 System.out.print("Requestor args:");
186 for (String s: args) {
187 System.out.print(s + " ");
188 }
189 System.out.println();
190 String connFactoryJNDI=null;
191 String requestQueueJNDI=null;
192 String name="";
193 Long sleepTime=null;
194 Integer maxCount=null;
195 String username=null;
196 String password=null;
197 for (int i=0; i<args.length; i++) {
198 if ("-jndi.name.connFactory".equals(args[i])) {
199 connFactoryJNDI = args[++i];
200 }
201 else if ("-jndi.name.requestQueue".equals(args[i])) {
202 requestQueueJNDI=args[++i];
203 }
204 else if ("-name".equals(args[i])) {
205 name=args[++i];
206 }
207 else if ("-sleep".equals(args[i])) {
208 sleepTime=new Long(args[++i]);
209 }
210 else if ("-max".equals(args[i])) {
211 maxCount=new Integer(args[++i]);
212 }
213 else if ("-username".equals(args[i])) {
214 username=args[++i];
215 }
216 else if ("-password".equals(args[i])) {
217 password=args[++i];
218 }
219 else if ("-noExit".equals(args[i])) {
220 noExit=true;
221 }
222 }
223 if (connFactoryJNDI==null) {
224 throw new Exception("jndi.name.connFactory not supplied");
225 }
226 else if (requestQueueJNDI==null) {
227 throw new Exception("jndi.name.requestQueue not supplied");
228 }
229 Requestor requestor = new Requestor(name);
230 Context jndi = new InitialContext();
231 requestor.setConnFactory(
232 (ConnectionFactory)jndi.lookup(connFactoryJNDI));
233 requestor.setRequestQueue((Destination)jndi.lookup(requestQueueJNDI));
234 if (maxCount!=null) {
235 requestor.setMaxCount(maxCount);
236 }
237 if (sleepTime!=null) {
238 requestor.setSleepTime(sleepTime);
239 }
240 requestor.setUsername(username);
241 requestor.setPassword(password);
242 requestor.execute();
243 }
244 catch (Exception ex) {
245 log.fatal("",ex);
246 if (noExit) {
247 throw new RuntimeException("requestor error", ex);
248 }
249 System.exit(-1);
250 }
251 }
252 }