View Javadoc
1   package ejava.examples.jmsnotifier;
2   
3   import javax.jms.Connection;
4   import javax.jms.ConnectionFactory;
5   import javax.jms.Destination;
6   import javax.jms.JMSException;
7   import javax.jms.Message;
8   import javax.jms.MessageConsumer;
9   import javax.jms.Session;
10  import javax.jms.TextMessage;
11  import javax.jms.Topic;
12  import javax.naming.Context;
13  import javax.naming.InitialContext;
14  
15  import org.apache.commons.logging.Log;
16  import org.apache.commons.logging.LogFactory;
17  
18  /**
19   * This is used to listen to messages on a destination. You can control the
20   * durability (topics only) and selector used using the properties.
21   *
22   * @author jcstaff
23   */
24  public class Subscriber implements Runnable {
25      private static final Log log = LogFactory.getLog(Subscriber.class);
26      protected ConnectionFactory connFactory;
27      protected Destination destination;
28      protected boolean stop = false;
29      protected boolean stopped = false;
30      protected boolean started = false;
31      protected String name;
32      protected int count=0;
33      protected long sleepTime=0;
34      protected int maxCount=0;
35      protected boolean durable=false;
36      protected String selector=null;
37      protected String username;
38      protected String password;
39          
40      public Subscriber(String name) {
41          this.name = name;
42      }
43      public void setConnFactory(ConnectionFactory connFactory) {
44          this.connFactory = connFactory;
45      }
46      public void setDestination(Destination destination) {
47          this.destination = destination;
48      }    
49      public int getCount() {
50          return count;
51      }
52      public void setSleepTime(long sleepTime) {
53          this.sleepTime = sleepTime;
54      }
55      public void setMaxCount(int maxCount) {
56          this.maxCount = maxCount;
57      }
58      public void setDurable(boolean durable) {
59          this.durable = durable;
60      }
61      public void setSelector(String selector) {
62          this.selector = selector;
63      }
64      public void clearMessages() {
65          count = 0;
66      }
67      public void stop() {
68          this.stop = true;
69      }
70      public boolean isStopped() {
71          return stopped;
72      }
73      public boolean isStarted() {
74          return started;
75      }
76      public void setUsername(String username) {
77  		this.username = username;
78  	}
79      public void setPassword(String password) {
80  		this.password = password;
81  	}
82  
83      public void execute() throws Exception {
84          Connection connection = null;
85          Session session = null;
86          MessageConsumer consumer = null;
87          try {
88              connection = username==null ?
89              		connFactory.createConnection() :
90              		connFactory.createConnection(username, password);
91              connection.setClientID(name);
92              session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
93              if (durable == false) {                
94                  try { session.unsubscribe(name); } 
95                  catch (JMSException ignored) {}
96                  consumer = session.createConsumer(destination, selector);                
97              }
98              else {
99                  consumer = session.createDurableSubscriber((Topic)destination, 
100                                                          name, selector, false);
101             }            
102             connection.start();
103 
104             stopped = stop = false;
105             log.info("subscriber " + name + " starting:" +
106                     "durable=" + durable +
107                     ", selector=" + selector);
108             started = true;
109             while (!stop && (maxCount==0 || count < maxCount)) {
110                 Message message = consumer.receive(3000);
111                 if (message != null) {
112                     count += 1;
113                     StringBuilder text = new StringBuilder();
114                     text.append(name + " received message #" + count +
115                             ", msgId=" + message.getJMSMessageID());
116                     if (message instanceof TextMessage) {
117                         text.append(", body=" 
118                                 +((TextMessage)message).getText());
119                     }
120                     log.debug(text.toString());
121                     Thread.yield();
122                 }      
123                 if (sleepTime > 0) {
124                     log.debug("processing message for " + sleepTime + "msecs");
125                     Thread.sleep(sleepTime);
126                 }
127             }
128             log.info("subscriber " + name + " stopping");
129             connection.stop();
130         }
131         finally {
132             stopped = true;
133             started = false;
134             if (consumer != null)   { consumer.close(); }
135             if (session!=null){ session.close();}
136             if (connection != null) { connection.close(); }
137         }
138     }
139     
140     public void run() {
141         try {
142             execute();
143         }
144         catch (Exception ex) {
145             log.fatal("error running " + name, ex);
146         }
147     }    
148 
149     public static void main(String args[]) {
150     	boolean noExit=false;
151         try {
152             String connFactoryJNDI=null;
153             String destinationJNDI=null;
154             String name="";
155             Long sleepTime=null;
156             Integer maxCount=null;
157             Boolean durable=null;
158             String selector=null;
159             String username=null;
160             String password=null;
161             for (int i=0; i<args.length; i++) {
162                 if ("-jndi.name.connFactory".equals(args[i])) {
163                     connFactoryJNDI = args[++i];
164                 }
165                 else if ("-jndi.name.destination".equals(args[i])) {
166                     destinationJNDI=args[++i];
167                 }
168                 else if ("-name".equals(args[i])) {
169                     name=args[++i];
170                 }
171                 else if ("-name".equals(args[i])) {
172                     name=args[++i];
173                 }
174                 else if ("-sleep".equals(args[i])) {
175                     sleepTime=new Long(args[++i]);
176                 }
177                 else if ("-max".equals(args[i])) {
178                     maxCount=new Integer(args[++i]);
179                 }
180                 else if ("-durable".equals(args[i])) {
181                     durable=new Boolean(args[++i]);
182                 }
183                 else if ("-selector".equals(args[i])) {
184                     selector=args[++i];
185                 }
186                 else if ("-username".equals(args[i])) {
187                 	username=args[++i];
188                 }
189                 else if ("-password".equals(args[i])) {
190                 	password=args[++i];
191                 }
192                 else if ("-noExit".equals(args[i])) {
193                 	noExit=true;
194                 }
195             }
196             if (connFactoryJNDI==null) { 
197                 throw new Exception("jndi.name.connFactory not supplied");
198             }
199             else if (destinationJNDI==null) {
200                 throw new Exception("jndi.name.destination not supplied");
201             }            
202             Subscriber subscriber = new Subscriber(name);
203             Context jndi = new InitialContext();
204             subscriber.setConnFactory(
205                     (ConnectionFactory)jndi.lookup(connFactoryJNDI));
206             subscriber.setDestination((Destination)jndi.lookup(destinationJNDI));
207             if (maxCount!=null) {
208                 subscriber.setMaxCount(maxCount);
209             }
210             if (sleepTime!=null) {
211                 subscriber.setSleepTime(sleepTime);
212             }
213             if (durable!=null) {
214                 subscriber.setDurable(durable);
215             }
216             if (selector!=null) {
217                 subscriber.setSelector(selector);
218             }
219             subscriber.setUsername(username);
220             subscriber.setPassword(password);
221             subscriber.execute();
222         }
223         catch (Exception ex) {
224             log.fatal("",ex);
225             System.exit(-1);            
226             if (noExit) {
227             	throw new RuntimeException("error in subscriber", ex);
228             }
229             System.exit(-1);
230         }
231     }
232 }