1 package ejava.examples.jmsscheduler;
2
3 import java.util.HashMap;
4 import java.util.Map;
5
6 import javax.jms.ConnectionFactory;
7 import javax.jms.Destination;
8 import javax.jms.JMSConsumer;
9 import javax.jms.JMSContext;
10 import javax.jms.JMSProducer;
11 import javax.jms.MapMessage;
12 import javax.jms.Message;
13 import javax.jms.MessageListener;
14 import javax.jms.Session;
15 import javax.naming.Context;
16 import javax.naming.InitialContext;
17
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20
21
22
23
24
25 public class Requestor implements Runnable, MessageListener {
26 private static final Logger logger = LoggerFactory.getLogger(Requestor.class);
27 protected ConnectionFactory connFactory;
28 protected Destination requestQueue;
29 protected boolean stop = false;
30 protected boolean stopped = false;
31 protected boolean started = false;
32 protected int count=0;
33 protected String name;
34 protected long sleepTime=10000;
35 protected int maxCount=10;
36 protected Map<String, Message> requests = new HashMap<String,Message>();
37 protected int responseCount=0;
38 protected long startTime=0;
39 protected String username;
40 protected String password;
41
42 public Requestor(String name) {
43 this.name = name;
44 }
45 public void setConnFactory(ConnectionFactory connFactory) {
46 this.connFactory = connFactory;
47 }
48 public void setRequestQueue(Destination requestQueue) {
49 this.requestQueue = requestQueue;
50 }
51 public int getCount() {
52 return count;
53 }
54 public void setSleepTime(long sleepTime) {
55 this.sleepTime = sleepTime;
56 }
57 public void setMaxCount(int maxCount) {
58 this.maxCount = maxCount;
59 }
60 public void clearMessages() {
61 count = 0;
62 }
63 public void stop() {
64 this.stop = true;
65 }
66 public boolean isStopped() {
67 return stopped;
68 }
69 public boolean isStarted() {
70 return started;
71 }
72 protected JMSContext createContext(Integer sessionMode) throws Exception {
73 if (sessionMode==null) {
74 return username==null ?
75 connFactory.createContext() :
76 connFactory.createContext(username, password);
77 } else {
78 return username==null ?
79 connFactory.createContext(sessionMode) :
80 connFactory.createContext(username, password, sessionMode);
81 }
82 }
83 public void setUsername(String username) {
84 this.username = username;
85 }
86 public void setPassword(String password) {
87 this.password = password;
88 }
89 public void execute() throws Exception {
90 try (JMSContext context=createContext(Session.AUTO_ACKNOWLEDGE)) {
91 JMSProducer producer = context.createProducer();
92 Destination replyTo = context.createTemporaryQueue();
93
94 try (JMSConsumer consumer = context.createConsumer(replyTo)) {
95 consumer.setMessageListener(this);
96 context.start();
97 stopped = stop = false;
98
99 logger.info("requester {} starting: maxCount={}, sleepTime {}", name, maxCount, sleepTime);
100 started = true;
101 startTime=System.currentTimeMillis();
102
103 while (!stop && (maxCount==0 || count < maxCount)) {
104 MapMessage message = context.createMapMessage();
105 message.setIntProperty("count", ++count);
106 message.setInt("difficulty", count % 10);
107 message.setJMSReplyTo(replyTo);
108 synchronized (requests) {
109 producer.send(requestQueue, message);
110 requests.put(message.getJMSMessageID(), message);
111 }
112 if (sleepTime>=1000 || (count % 100==0)) {
113 logger.debug("published message(" + count + "):" +
114 message.getJMSMessageID());
115 logger.debug("outstanding requests=" + requests.size());
116 }
117 Thread.sleep(sleepTime);
118 }
119
120 logger.info("requester {} stopping, count={}", name, count);
121 while (requests.size() > 0) {
122 logger.debug("waiting for {} outstanding responses", requests.size());
123 logger.trace("requests={}", requests);
124 Thread.sleep(3000);
125 }
126 context.stop();
127 }
128
129 }
130 finally {
131 stopped = true;
132 started = false;
133 }
134 }
135
136 public void run() {
137 try {
138 execute();
139 }
140 catch (Exception ex) {
141 logger.error("error running {}", name, ex);
142 }
143 }
144
145
146
147
148
149 public void onMessage(Message message) {
150 try {
151 String correlationID = message.getJMSCorrelationID();
152 Message request=null;
153 synchronized (requests) {
154 request = requests.remove(correlationID);
155 }
156
157 if (request != null) {
158 responseCount += 1;
159 String worker = message.getStringProperty("worker");
160
161 if (sleepTime>=1000 || (responseCount % 100==0)) {
162 logger.debug("recieved response for:{}, from {}, outstanding={}",
163 request.getIntProperty("count"), worker, requests.size());
164 }
165 }
166 else {
167 logger.warn("received unexpected response:{}" + correlationID);
168 }
169 } catch (Exception ex) {
170 logger.info("error processing message", ex);
171 }
172 }
173
174 public static void main(String args[]) {
175 boolean noExit=false;
176 try {
177 System.out.print("Requestor args:");
178 for (String s: args) {
179 System.out.print(s + " ");
180 }
181 System.out.println();
182 String connFactoryJNDI=null;
183 String requestQueueJNDI=null;
184 String name="";
185 Long sleepTime=null;
186 Integer maxCount=null;
187 String username=null;
188 String password=null;
189 for (int i=0; i<args.length; i++) {
190 if ("-jndi.name.connFactory".equals(args[i])) {
191 connFactoryJNDI = args[++i];
192 }
193 else if ("-jndi.name.requestQueue".equals(args[i])) {
194 requestQueueJNDI=args[++i];
195 }
196 else if ("-name".equals(args[i])) {
197 name=args[++i];
198 }
199 else if ("-sleep".equals(args[i])) {
200 sleepTime=new Long(args[++i]);
201 }
202 else if ("-max".equals(args[i])) {
203 maxCount=new Integer(args[++i]);
204 }
205 else if ("-username".equals(args[i])) {
206 username=args[++i];
207 }
208 else if ("-password".equals(args[i])) {
209 password=args[++i];
210 }
211 else if ("-noExit".equals(args[i])) {
212 noExit=true;
213 }
214 }
215 if (connFactoryJNDI==null) {
216 throw new Exception("jndi.name.connFactory not supplied");
217 }
218 else if (requestQueueJNDI==null) {
219 throw new Exception("jndi.name.requestQueue not supplied");
220 }
221 Requestor requestor = new Requestor(name);
222 Context jndi = new InitialContext();
223 requestor.setConnFactory(
224 (ConnectionFactory)jndi.lookup(connFactoryJNDI));
225 requestor.setRequestQueue((Destination)jndi.lookup(requestQueueJNDI));
226 if (maxCount!=null) {
227 requestor.setMaxCount(maxCount);
228 }
229 if (sleepTime!=null) {
230 requestor.setSleepTime(sleepTime);
231 }
232 requestor.setUsername(username);
233 requestor.setPassword(password);
234 requestor.execute();
235 }
236 catch (Exception ex) {
237 logger.error("",ex);
238 if (noExit) {
239 throw new RuntimeException("requestor error", ex);
240 }
241 System.exit(-1);
242 }
243 }
244 }