1 package ejava.examples.jmsnotifier;
2
3 import javax.jms.Connection;
4 import javax.jms.ConnectionFactory;
5 import javax.jms.Destination;
6 import javax.jms.JMSException;
7 import javax.jms.Message;
8 import javax.jms.MessageConsumer;
9 import javax.jms.Session;
10 import javax.jms.TextMessage;
11 import javax.jms.Topic;
12 import javax.naming.Context;
13 import javax.naming.InitialContext;
14
15 import org.apache.commons.logging.Log;
16 import org.apache.commons.logging.LogFactory;
17
18
19
20
21
22
23
24 public class Subscriber implements Runnable {
25 private static final Log log = LogFactory.getLog(Subscriber.class);
26 protected ConnectionFactory connFactory;
27 protected Destination destination;
28 protected boolean stop = false;
29 protected boolean stopped = false;
30 protected boolean started = false;
31 protected String name;
32 protected int count=0;
33 protected long sleepTime=0;
34 protected int maxCount=0;
35 protected boolean durable=false;
36 protected String selector=null;
37 protected String username;
38 protected String password;
39
40 public Subscriber(String name) {
41 this.name = name;
42 }
43 public void setConnFactory(ConnectionFactory connFactory) {
44 this.connFactory = connFactory;
45 }
46 public void setDestination(Destination destination) {
47 this.destination = destination;
48 }
49 public int getCount() {
50 return count;
51 }
52 public void setSleepTime(long sleepTime) {
53 this.sleepTime = sleepTime;
54 }
55 public void setMaxCount(int maxCount) {
56 this.maxCount = maxCount;
57 }
58 public void setDurable(boolean durable) {
59 this.durable = durable;
60 }
61 public void setSelector(String selector) {
62 this.selector = selector;
63 }
64 public void clearMessages() {
65 count = 0;
66 }
67 public void stop() {
68 this.stop = true;
69 }
70 public boolean isStopped() {
71 return stopped;
72 }
73 public boolean isStarted() {
74 return started;
75 }
76 public void setUsername(String username) {
77 this.username = username;
78 }
79 public void setPassword(String password) {
80 this.password = password;
81 }
82
83 public void execute() throws Exception {
84 Connection connection = null;
85 Session session = null;
86 MessageConsumer consumer = null;
87 try {
88 connection = username==null ?
89 connFactory.createConnection() :
90 connFactory.createConnection(username, password);
91 connection.setClientID(name);
92 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
93 if (durable == false) {
94 try { session.unsubscribe(name); }
95 catch (JMSException ignored) {}
96 consumer = session.createConsumer(destination, selector);
97 }
98 else {
99 consumer = session.createDurableSubscriber((Topic)destination,
100 name, selector, false);
101 }
102 connection.start();
103
104 stopped = stop = false;
105 log.info("subscriber " + name + " starting:" +
106 "durable=" + durable +
107 ", selector=" + selector);
108 started = true;
109 while (!stop && (maxCount==0 || count < maxCount)) {
110 Message message = consumer.receive(3000);
111 if (message != null) {
112 count += 1;
113 StringBuilder text = new StringBuilder();
114 text.append(name + " received message #" + count +
115 ", msgId=" + message.getJMSMessageID());
116 if (message instanceof TextMessage) {
117 text.append(", body="
118 +((TextMessage)message).getText());
119 }
120 log.debug(text.toString());
121 Thread.yield();
122 }
123 if (sleepTime > 0) {
124 log.debug("processing message for " + sleepTime + "msecs");
125 Thread.sleep(sleepTime);
126 }
127 }
128 log.info("subscriber " + name + " stopping");
129 connection.stop();
130 }
131 finally {
132 stopped = true;
133 started = false;
134 if (consumer != null) { consumer.close(); }
135 if (session!=null){ session.close();}
136 if (connection != null) { connection.close(); }
137 }
138 }
139
140 public void run() {
141 try {
142 execute();
143 }
144 catch (Exception ex) {
145 log.fatal("error running " + name, ex);
146 }
147 }
148
149 public static void main(String args[]) {
150 boolean noExit=false;
151 try {
152 String connFactoryJNDI=null;
153 String destinationJNDI=null;
154 String name="";
155 Long sleepTime=null;
156 Integer maxCount=null;
157 Boolean durable=null;
158 String selector=null;
159 String username=null;
160 String password=null;
161 for (int i=0; i<args.length; i++) {
162 if ("-jndi.name.connFactory".equals(args[i])) {
163 connFactoryJNDI = args[++i];
164 }
165 else if ("-jndi.name.destination".equals(args[i])) {
166 destinationJNDI=args[++i];
167 }
168 else if ("-name".equals(args[i])) {
169 name=args[++i];
170 }
171 else if ("-name".equals(args[i])) {
172 name=args[++i];
173 }
174 else if ("-sleep".equals(args[i])) {
175 sleepTime=new Long(args[++i]);
176 }
177 else if ("-max".equals(args[i])) {
178 maxCount=new Integer(args[++i]);
179 }
180 else if ("-durable".equals(args[i])) {
181 durable=new Boolean(args[++i]);
182 }
183 else if ("-selector".equals(args[i])) {
184 selector=args[++i];
185 }
186 else if ("-username".equals(args[i])) {
187 username=args[++i];
188 }
189 else if ("-password".equals(args[i])) {
190 password=args[++i];
191 }
192 else if ("-noExit".equals(args[i])) {
193 noExit=true;
194 }
195 }
196 if (connFactoryJNDI==null) {
197 throw new Exception("jndi.name.connFactory not supplied");
198 }
199 else if (destinationJNDI==null) {
200 throw new Exception("jndi.name.destination not supplied");
201 }
202 Subscriber subscriber = new Subscriber(name);
203 Context jndi = new InitialContext();
204 subscriber.setConnFactory(
205 (ConnectionFactory)jndi.lookup(connFactoryJNDI));
206 subscriber.setDestination((Destination)jndi.lookup(destinationJNDI));
207 if (maxCount!=null) {
208 subscriber.setMaxCount(maxCount);
209 }
210 if (sleepTime!=null) {
211 subscriber.setSleepTime(sleepTime);
212 }
213 if (durable!=null) {
214 subscriber.setDurable(durable);
215 }
216 if (selector!=null) {
217 subscriber.setSelector(selector);
218 }
219 subscriber.setUsername(username);
220 subscriber.setPassword(password);
221 subscriber.execute();
222 }
223 catch (Exception ex) {
224 log.fatal("",ex);
225 System.exit(-1);
226 if (noExit) {
227 throw new RuntimeException("error in subscriber", ex);
228 }
229 System.exit(-1);
230 }
231 }
232 }