View Javadoc
1   package ejava.examples.jmsnotifier;
2   
3   import javax.jms.ConnectionFactory;
4   import javax.jms.Destination;
5   import javax.jms.InvalidDestinationRuntimeException;
6   import javax.jms.JMSConsumer;
7   import javax.jms.JMSContext;
8   import javax.jms.Message;
9   import javax.jms.Session;
10  import javax.jms.TextMessage;
11  import javax.jms.Topic;
12  import javax.naming.Context;
13  import javax.naming.InitialContext;
14  
15  import org.slf4j.Logger;
16  import org.slf4j.LoggerFactory;
17  
18  /**
19   * This is used to listen to messages on a destination. You can control the
20   * durability (topics only) and selector used using the properties.
21   */
22  public class Subscriber implements Runnable {
23      private static final Logger logger = LoggerFactory.getLogger(Subscriber.class);
24      protected ConnectionFactory connFactory;
25      protected Destination destination;
26      protected boolean stop = false;
27      protected boolean stopped = false;
28      protected boolean started = false;
29      protected String name;
30      protected int limitCount=0;
31      protected long sleepTime=0;
32      protected int maxCount=0;
33      protected boolean durable=false;
34      protected String selector=null;
35      protected String username;
36      protected String password;
37          
38      public Subscriber(String name) {
39          this.name = name;
40      }
41      public void setConnFactory(ConnectionFactory connFactory) {
42          this.connFactory = connFactory;
43      }
44      public void setDestination(Destination destination) {
45          this.destination = destination;
46      }    
47      public int getCount() {
48          return limitCount;
49      }
50      public void setSleepTime(long sleepTime) {
51          this.sleepTime = sleepTime;
52      }
53      public void setMaxCount(int maxCount) {
54          this.maxCount = maxCount;
55      }
56      public void setDurable(boolean durable) {
57          this.durable = durable;
58      }
59      public void setSelector(String selector) {
60          this.selector = selector;
61      }
62      public void clearMessages() {
63          limitCount = 0;
64      }
65      public void stop() {
66          this.stop = true;
67      }
68      public boolean isStopped() {
69          return stopped;
70      }
71      public boolean isStarted() {
72          return started;
73      }
74      public void setUsername(String username) {
75  		this.username = username;
76  	}
77      public void setPassword(String password) {
78  		this.password = password;
79  	}
80  
81      private JMSContext createContext(Integer sessionMode) {
82          if (sessionMode!=null) {
83              return username==null ?
84                      connFactory.createContext(sessionMode) :
85                      connFactory.createContext(username, password, sessionMode);            
86          } else {
87              return username==null ?
88                      connFactory.createContext() :
89                      connFactory.createContext(username, password);
90          }
91      }
92      
93      private JMSConsumer createConsumer(JMSContext context) {
94          if (durable == false) {                
95              try { context.unsubscribe(name); }
96              catch (InvalidDestinationRuntimeException ex) {}
97              return context.createConsumer(destination, selector);                
98          }
99          else {
100             return context.createDurableConsumer((Topic)destination, 
101                                                      name, selector, false);
102         }                    
103     }
104 
105     public void execute() throws Exception {
106         try (JMSContext context=createContext(Session.AUTO_ACKNOWLEDGE)) {
107             context.setClientID(name);
108             
109             try (JMSConsumer consumer=createConsumer(context)) {
110                 context.start();
111                 stopped = stop = false;
112                 logger.info("subscriber {} starting: durable={}, selector={}", 
113                         name, durable, selector);
114                 started = true;
115                 
116                 while (!stop && (maxCount==0 || limitCount < maxCount)) {
117                     Message message = consumer.receive(3000);
118                     if (message != null) {
119                         limitCount += 1;
120                         Object countProp = message.getObjectProperty("count");
121                         StringBuilder text = new StringBuilder();
122                         text.append(name + " received message #" + limitCount +
123                                 ", msgId=" + message.getJMSMessageID() +
124                                 ", count property=" + countProp);
125                         if (message instanceof TextMessage) {
126                             text.append(", body=" 
127                                     +((TextMessage)message).getText());
128                         }
129                         logger.debug(text.toString());
130                         Thread.yield();
131                     }      
132                     if (sleepTime > 0) {
133                         logger.debug("processing message for {}msecs", sleepTime);
134                         Thread.sleep(sleepTime);
135                     }
136                 }
137             }
138 
139             logger.info("subscriber " + name + " stopping");
140             context.stop();
141         }
142         finally {
143             stopped = true;
144             started = false;
145         }
146     }
147     
148     public void run() {
149         try {
150             execute();
151         }
152         catch (Exception ex) {
153             logger.error("error running " + name, ex);
154         }
155     }    
156 
157     public static void main(String args[]) {
158     	boolean noExit=false;
159         try {
160             String connFactoryJNDI=null;
161             String destinationJNDI=null;
162             String name="";
163             Long sleepTime=null;
164             Integer maxCount=null;
165             Boolean durable=null;
166             String selector=null;
167             String username=null;
168             String password=null;
169             for (int i=0; i<args.length; i++) {
170                 if ("-jndi.name.connFactory".equals(args[i])) {
171                     connFactoryJNDI = args[++i];
172                 }
173                 else if ("-jndi.name.destination".equals(args[i])) {
174                     destinationJNDI=args[++i];
175                 }
176                 else if ("-name".equals(args[i])) {
177                     name=args[++i];
178                 }
179                 else if ("-name".equals(args[i])) {
180                     name=args[++i];
181                 }
182                 else if ("-sleep".equals(args[i])) {
183                     sleepTime=new Long(args[++i]);
184                 }
185                 else if ("-max".equals(args[i])) {
186                     maxCount=new Integer(args[++i]);
187                 }
188                 else if ("-durable".equals(args[i])) {
189                     durable=new Boolean(args[++i]);
190                 }
191                 else if ("-selector".equals(args[i])) {
192                     selector=args[++i];
193                 }
194                 else if ("-username".equals(args[i])) {
195                 	username=args[++i];
196                 }
197                 else if ("-password".equals(args[i])) {
198                 	password=args[++i];
199                 }
200                 else if ("-noExit".equals(args[i])) {
201                 	noExit=true;
202                 }
203             }
204             if (connFactoryJNDI==null) { 
205                 throw new Exception("jndi.name.connFactory not supplied");
206             }
207             else if (destinationJNDI==null) {
208                 throw new Exception("jndi.name.destination not supplied");
209             }            
210             Subscriber subscriber = new Subscriber(name);
211             Context jndi = new InitialContext();
212             subscriber.setConnFactory(
213                     (ConnectionFactory)jndi.lookup(connFactoryJNDI));
214             subscriber.setDestination((Destination)jndi.lookup(destinationJNDI));
215             if (maxCount!=null) {
216                 subscriber.setMaxCount(maxCount);
217             }
218             if (sleepTime!=null) {
219                 subscriber.setSleepTime(sleepTime);
220             }
221             if (durable!=null) {
222                 subscriber.setDurable(durable);
223             }
224             if (selector!=null) {
225                 subscriber.setSelector(selector);
226             }
227             subscriber.setUsername(username);
228             subscriber.setPassword(password);
229             subscriber.execute();
230         }
231         catch (Exception ex) {
232             logger.error("",ex);
233             System.exit(-1);            
234             if (noExit) {
235             	throw new RuntimeException("error in subscriber", ex);
236             }
237             System.exit(-1);
238         }
239     }
240 }