1 package ejava.examples.jmsnotifier;
2
3 import javax.jms.ConnectionFactory;
4 import javax.jms.Destination;
5 import javax.jms.InvalidDestinationRuntimeException;
6 import javax.jms.JMSConsumer;
7 import javax.jms.JMSContext;
8 import javax.jms.Message;
9 import javax.jms.Session;
10 import javax.jms.TextMessage;
11 import javax.jms.Topic;
12 import javax.naming.Context;
13 import javax.naming.InitialContext;
14
15 import org.slf4j.Logger;
16 import org.slf4j.LoggerFactory;
17
18
19
20
21
22 public class Subscriber implements Runnable {
23 private static final Logger logger = LoggerFactory.getLogger(Subscriber.class);
24 protected ConnectionFactory connFactory;
25 protected Destination destination;
26 protected boolean stop = false;
27 protected boolean stopped = false;
28 protected boolean started = false;
29 protected String name;
30 protected int limitCount=0;
31 protected long sleepTime=0;
32 protected int maxCount=0;
33 protected boolean durable=false;
34 protected String selector=null;
35 protected String username;
36 protected String password;
37
38 public Subscriber(String name) {
39 this.name = name;
40 }
41 public void setConnFactory(ConnectionFactory connFactory) {
42 this.connFactory = connFactory;
43 }
44 public void setDestination(Destination destination) {
45 this.destination = destination;
46 }
47 public int getCount() {
48 return limitCount;
49 }
50 public void setSleepTime(long sleepTime) {
51 this.sleepTime = sleepTime;
52 }
53 public void setMaxCount(int maxCount) {
54 this.maxCount = maxCount;
55 }
56 public void setDurable(boolean durable) {
57 this.durable = durable;
58 }
59 public void setSelector(String selector) {
60 this.selector = selector;
61 }
62 public void clearMessages() {
63 limitCount = 0;
64 }
65 public void stop() {
66 this.stop = true;
67 }
68 public boolean isStopped() {
69 return stopped;
70 }
71 public boolean isStarted() {
72 return started;
73 }
74 public void setUsername(String username) {
75 this.username = username;
76 }
77 public void setPassword(String password) {
78 this.password = password;
79 }
80
81 private JMSContext createContext(Integer sessionMode) {
82 if (sessionMode!=null) {
83 return username==null ?
84 connFactory.createContext(sessionMode) :
85 connFactory.createContext(username, password, sessionMode);
86 } else {
87 return username==null ?
88 connFactory.createContext() :
89 connFactory.createContext(username, password);
90 }
91 }
92
93 private JMSConsumer createConsumer(JMSContext context) {
94 if (durable == false) {
95 try { context.unsubscribe(name); }
96 catch (InvalidDestinationRuntimeException ex) {}
97 return context.createConsumer(destination, selector);
98 }
99 else {
100 return context.createDurableConsumer((Topic)destination,
101 name, selector, false);
102 }
103 }
104
105 public void execute() throws Exception {
106 try (JMSContext context=createContext(Session.AUTO_ACKNOWLEDGE)) {
107 context.setClientID(name);
108
109 try (JMSConsumer consumer=createConsumer(context)) {
110 context.start();
111 stopped = stop = false;
112 logger.info("subscriber {} starting: durable={}, selector={}",
113 name, durable, selector);
114 started = true;
115
116 while (!stop && (maxCount==0 || limitCount < maxCount)) {
117 Message message = consumer.receive(3000);
118 if (message != null) {
119 limitCount += 1;
120 Object countProp = message.getObjectProperty("count");
121 StringBuilder text = new StringBuilder();
122 text.append(name + " received message #" + limitCount +
123 ", msgId=" + message.getJMSMessageID() +
124 ", count property=" + countProp);
125 if (message instanceof TextMessage) {
126 text.append(", body="
127 +((TextMessage)message).getText());
128 }
129 logger.debug(text.toString());
130 Thread.yield();
131 }
132 if (sleepTime > 0) {
133 logger.debug("processing message for {}msecs", sleepTime);
134 Thread.sleep(sleepTime);
135 }
136 }
137 }
138
139 logger.info("subscriber " + name + " stopping");
140 context.stop();
141 }
142 finally {
143 stopped = true;
144 started = false;
145 }
146 }
147
148 public void run() {
149 try {
150 execute();
151 }
152 catch (Exception ex) {
153 logger.error("error running " + name, ex);
154 }
155 }
156
157 public static void main(String args[]) {
158 boolean noExit=false;
159 try {
160 String connFactoryJNDI=null;
161 String destinationJNDI=null;
162 String name="";
163 Long sleepTime=null;
164 Integer maxCount=null;
165 Boolean durable=null;
166 String selector=null;
167 String username=null;
168 String password=null;
169 for (int i=0; i<args.length; i++) {
170 if ("-jndi.name.connFactory".equals(args[i])) {
171 connFactoryJNDI = args[++i];
172 }
173 else if ("-jndi.name.destination".equals(args[i])) {
174 destinationJNDI=args[++i];
175 }
176 else if ("-name".equals(args[i])) {
177 name=args[++i];
178 }
179 else if ("-name".equals(args[i])) {
180 name=args[++i];
181 }
182 else if ("-sleep".equals(args[i])) {
183 sleepTime=new Long(args[++i]);
184 }
185 else if ("-max".equals(args[i])) {
186 maxCount=new Integer(args[++i]);
187 }
188 else if ("-durable".equals(args[i])) {
189 durable=new Boolean(args[++i]);
190 }
191 else if ("-selector".equals(args[i])) {
192 selector=args[++i];
193 }
194 else if ("-username".equals(args[i])) {
195 username=args[++i];
196 }
197 else if ("-password".equals(args[i])) {
198 password=args[++i];
199 }
200 else if ("-noExit".equals(args[i])) {
201 noExit=true;
202 }
203 }
204 if (connFactoryJNDI==null) {
205 throw new Exception("jndi.name.connFactory not supplied");
206 }
207 else if (destinationJNDI==null) {
208 throw new Exception("jndi.name.destination not supplied");
209 }
210 Subscriber subscriber = new Subscriber(name);
211 Context jndi = new InitialContext();
212 subscriber.setConnFactory(
213 (ConnectionFactory)jndi.lookup(connFactoryJNDI));
214 subscriber.setDestination((Destination)jndi.lookup(destinationJNDI));
215 if (maxCount!=null) {
216 subscriber.setMaxCount(maxCount);
217 }
218 if (sleepTime!=null) {
219 subscriber.setSleepTime(sleepTime);
220 }
221 if (durable!=null) {
222 subscriber.setDurable(durable);
223 }
224 if (selector!=null) {
225 subscriber.setSelector(selector);
226 }
227 subscriber.setUsername(username);
228 subscriber.setPassword(password);
229 subscriber.execute();
230 }
231 catch (Exception ex) {
232 logger.error("",ex);
233 System.exit(-1);
234 if (noExit) {
235 throw new RuntimeException("error in subscriber", ex);
236 }
237 System.exit(-1);
238 }
239 }
240 }