View Javadoc
1   package ejava.examples.asyncmarket.ejb;
2   
3   import java.text.DateFormat;
4   import java.text.SimpleDateFormat;
5   import java.util.ArrayList;
6   
7   import java.util.Collection;
8   import java.util.Date;
9   import java.util.List;
10  import java.util.concurrent.ExecutionException;
11  import java.util.concurrent.Future;
12  
13  import javax.annotation.PostConstruct;
14  import javax.annotation.Resource;
15  import javax.ejb.AsyncResult;
16  import javax.ejb.EJB;
17  import javax.ejb.EJBException;
18  import javax.ejb.Schedule;
19  import javax.ejb.ScheduleExpression;
20  import javax.ejb.Stateless;
21  import javax.ejb.Timeout;
22  import javax.ejb.Timer;
23  import javax.ejb.TimerService;
24  import javax.ejb.TransactionAttribute;
25  import javax.ejb.TransactionAttributeType;
26  import javax.jms.Connection;
27  import javax.jms.ConnectionFactory;
28  import javax.jms.Destination;
29  import javax.jms.JMSException;
30  import javax.jms.MapMessage;
31  import javax.jms.MessageProducer;
32  import javax.jms.Session;
33  import javax.jms.Topic;
34  import javax.persistence.EntityManager;
35  import javax.persistence.PersistenceContext;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  
40  import ejava.examples.asyncmarket.MarketException;
41  import ejava.examples.asyncmarket.bo.AuctionItem;
42  import ejava.examples.asyncmarket.bo.Bid;
43  import ejava.examples.asyncmarket.bo.Person;
44  import ejava.examples.asyncmarket.dao.AuctionItemDAO;
45  import ejava.examples.asyncmarket.dao.PersonDAO;
46  import ejava.examples.asyncmarket.jpa.JPAAuctionItemDAO;
47  import ejava.examples.asyncmarket.jpa.JPAPersonDAO;
48  
49  @Stateless
50  @TransactionAttribute(TransactionAttributeType.REQUIRED)
51  public class AuctionMgmtEJB implements AuctionMgmtRemote, AuctionMgmtLocal {
52      private static final Log log = LogFactory.getLog(AuctionMgmtEJB.class);
53      
54      @PersistenceContext(unitName="asyncMarket")
55      private EntityManager em;
56      
57      private @EJB AuctionMgmtActionEJB actions;
58      
59      private AuctionItemDAO auctionItemDAO;
60      private PersonDAO userDAO;
61      
62      @Resource
63      private TimerService timerService;
64      //injected
65      long checkItemInterval;
66      
67      @Resource(lookup="java:/JmsXA")
68      //@Resource(name="jms/ConnectionFactory")
69      private ConnectionFactory connFactory;
70      @Resource(lookup="java:/topic/ejava/examples/asyncMarket/topic1", type=Topic.class)
71      private Destination sellTopic;
72      
73      @PostConstruct
74      public void init() {
75          log.info("**** AuctionMgmtEJB init() ***");
76          log.debug("timerService=" + timerService);
77          log.debug("checkAuctionInterval=" + checkItemInterval);
78          log.debug("connFactory=" + connFactory);
79          log.debug("sellTopic=" + sellTopic);
80          
81          auctionItemDAO = new JPAAuctionItemDAO();
82          ((JPAAuctionItemDAO)auctionItemDAO).setEntityManager(em);
83          
84          userDAO = new JPAPersonDAO();
85          ((JPAPersonDAO)userDAO).setEntityManager(em);
86      }
87      
88      public void cancelTimers() {
89          log.debug("canceling timers");
90          for (Timer timer : (Collection<Timer>)timerService.getTimers()) {
91              timer.cancel();
92          }
93      }
94      public void initTimers(long delay) {
95          cancelTimers();
96          log.debug("initializing timers, checkItemInterval="+delay);
97          timerService.createTimer(0,delay, "checkAuctionTimer");
98      }
99      public void initTimers(ScheduleExpression schedule) {
100     	cancelTimers();
101         log.debug("initializing timers, schedule="+schedule);
102     	timerService.createCalendarTimer(schedule);
103     }
104     
105     public void closeBidding(long itemId) throws MarketException {
106         AuctionItem item = auctionItemDAO.getItem(itemId);
107         if (item == null) {
108             throw new MarketException("itemId not found:" + itemId);
109         }
110 
111         try {
112             item.closeBids();
113             log.debug("closed bidding for item:" + item);
114         }
115         catch (Exception ex) {
116             log.error("error closing bid", ex);
117             throw new MarketException("error closing bid:" + ex);
118         }
119     }
120 
121     public Bid getWinningBid(long itemId) throws MarketException {
122         AuctionItem item=null;
123         try {
124             item = auctionItemDAO.getItem(itemId);
125             if (item != null) {
126                return makeDTO(item.getWinningBid());
127             }
128         }
129         catch (Exception ex) {
130             log.error("error closing bid", ex);
131             throw new MarketException("error closing bid:" + ex);
132         }
133         throw new MarketException("itemId not found:" + itemId);
134     }
135 
136     private Bid makeDTO(Bid bid) {
137         Bid dto = new Bid(bid.getId());
138         dto.setAmount(bid.getAmount());
139         dto.setBidder(makeDTO(bid.getBidder(), dto));
140         dto.setItem(makeDTO(bid.getItem(), dto));
141         return dto;
142     }
143     
144     private Person makeDTO(Person person, Bid bid) {
145         Person dto = new Person(person.getId());
146         dto.setUserId(person.getUserId());
147         dto.getBids().add(bid);
148         bid.setBidder(dto);        
149         dto.setVersion(person.getVersion());
150         return dto;
151     }
152     
153     private AuctionItem makeDTO(AuctionItem item, Bid bid) {
154         AuctionItem dto = new AuctionItem(item.getId());
155         dto.setName(item.getName());
156         dto.setVersion(item.getVersion());
157         dto.setStartDate(item.getStartDate());
158         dto.setEndDate(item.getEndDate());
159         dto.setMinBid(item.getMinBid());
160         dto.setWinningBid(bid);
161         dto.setClosed(item.isClosed());
162         return dto;
163     }
164 
165     
166     @Timeout
167     @Schedule(second="*/10", minute ="*", hour="*", dayOfMonth="*", month="*", year="*", persistent=false)
168     public void execute(Timer timer) {
169         log.info("timer fired:" + timer);
170         try {
171             checkAuction();
172         }
173         catch (Exception ex) {
174             log.error("error checking auction", ex);
175         }
176     }
177     
178     public int checkAuction() throws MarketException {
179         log.info("checking auctions");
180         Connection connection = null;
181         Session session = null;
182         int index = 0;            
183         try {
184             connection = connFactory.createConnection();
185             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
186             List<AuctionItem> items = null;
187             do { 
188                 items = auctionItemDAO.getAvailableItems(index, 10);
189                 for(AuctionItem item : items) {
190                     publishAvailableItem(session, item);
191                 }
192                 index += items.size();
193             } while (items.size() > 0);
194             log.debug("processed " + index + " active items");
195             return index;
196         }
197         catch (JMSException ex) {
198             log.error("error publishing auction item updates", ex);
199             return index;
200         }
201         finally {
202             try {
203                 if (session != null) { session.close(); }
204                 if (connection != null) { connection.close(); }
205             } catch (JMSException ignored) {}
206         }
207     }
208 
209     protected void publishAvailableItem(Session session, AuctionItem item)
210         throws JMSException {
211         MessageProducer producer = null;
212         try {
213             producer = session.createProducer(sellTopic);
214             MapMessage message = session.createMapMessage();
215             message.setJMSType("saleUpdate");
216             message.setLong("id", item.getId());
217             message.setString("name", item.getName());
218             message.setString("seller", item.getOwner().getUserId());
219             message.setLong("startDate", item.getStartDate().getTime());
220             message.setLong("endDate", item.getEndDate().getTime());
221             message.setDouble("minBid", item.getMinBid());
222             message.setDouble("bids", item.getBids().size());
223             message.setDouble("highestBid", 
224                     (item.getHighestBid() == null ? 0.00 :
225                         item.getHighestBid().getAmount()));            
226             producer.send(message);
227             log.debug("sent=" + message);
228         }
229         finally {
230             if (producer != null)   { producer.close(); }
231         }
232     }
233 
234     
235     public void removeBid(long bidId) throws MarketException {
236         try {
237             Bid bid = auctionItemDAO.getBid(bidId);
238             auctionItemDAO.removeBid(bid);        }
239         catch (Exception ex) {
240             log.error("error removing bid", ex);
241             throw new MarketException("error removing bid:" + ex);
242         }
243     }
244     
245     public List<AuctionItem> getItems(int index, int count) 
246         throws MarketException {
247         try {
248             return makeDTO(auctionItemDAO.getItems(index, count));
249         }
250         catch (Exception ex) {
251             log.error("error getting auction items", ex);
252             throw new MarketException("error getting auction items" + ex);
253         }
254     }
255 
256     public void removeItem(long id) throws MarketException {
257         try {
258             auctionItemDAO.removeItem(id);
259         }
260         catch (Exception ex) {
261             log.error("error removing auction items", ex);
262             throw new MarketException("error removing auction items" + ex);
263         }
264     }
265     
266     private List<AuctionItem> makeDTO(List<AuctionItem> items) {
267         List<AuctionItem> dto = new ArrayList<AuctionItem>();
268         for (AuctionItem item : items) {
269             dto.add(makeDTO(item));
270         }
271         return dto;
272     }
273 
274     private AuctionItem makeDTO(AuctionItem item) {
275         AuctionItem dto = new AuctionItem(item.getId());
276         dto.setVersion(item.getVersion());
277         dto.setName(item.getName());
278         dto.setStartDate(item.getStartDate());
279         dto.setEndDate(item.getEndDate());
280         dto.setMinBid(item.getMinBid());
281         dto.setBids(makeDTO(item.getBids(), dto));
282         dto.setWinningBid(null);
283         dto.setClosed(item.isClosed());
284         return dto;
285     }
286 
287     private List<Bid> makeDTO(List<Bid> bids, AuctionItem item) {
288         List<Bid> dtos = new ArrayList<Bid>();
289         for (Bid bid : bids) {
290             Bid dto = new Bid(bid.getId());
291             dto.setAmount(bid.getAmount());
292             dto.setItem(item);
293             item.getBids().add(dto);
294             dto.setBidder(makeDTO(bid.getBidder(),dto));
295             dtos.add(dto);
296         }
297         return dtos;
298     }
299 
300     /**
301      * Perform action synchronously while caller waits.
302      */
303 	@Override
304 	public void workSync(int count, long delay) {
305         DateFormat df = new SimpleDateFormat("HH:mm:ss.SSS");
306         
307         long startTime = System.currentTimeMillis();
308         for (int i=0; i<count; i++) {
309         	log.info(String.format("%s issuing sync request, delay=%d", df.format(new Date()), delay));
310         	@SuppressWarnings("unused")
311 			Date date= actions.doWorkSync(delay);
312         	log.info(String.format("sync waitTime=%d msecs", System.currentTimeMillis()-startTime));
313         }
314     	long syncTime = System.currentTimeMillis() - startTime;
315     	log.info(String.format("workSync time=%d msecs", syncTime));
316 	}    
317 
318 	/**
319 	 * Perform action async from caller.
320 	 */
321 	@Override
322 	public void workAsync(int count, long delay) {
323         DateFormat df = new SimpleDateFormat("HH:mm:ss.SSS");
324         
325         long startTime = System.currentTimeMillis();
326         List<Future<Date>> results = new ArrayList<Future<Date>>();
327         for (int i=0; i<count; i++) {
328         	log.info(String.format("%s issuing async request, delay=%d", df.format(new Date()), delay));
329         	Future<Date> date = actions.doWorkAsync(delay);
330         	results.add(date);
331         	log.info(String.format("async waitTime=%d msecs", System.currentTimeMillis()-startTime));
332         }
333         for (Future<Date> f: results) {
334         	log.info(String.format("%s getting async response", df.format(new Date())));
335         	try {
336 				@SuppressWarnings("unused")
337 				Date date = f.get();
338 			} catch (Exception ex) {
339 				log.error("unexpected error on future.get()", ex);
340 				throw new EJBException("unexpected error during future.get():"+ex);
341 			}
342         	log.info(String.format("%s got async response", df.format(new Date())));
343         }
344     	long asyncTime = System.currentTimeMillis() - startTime;
345     	log.info(String.format("workAsync time=%d msecs", asyncTime));
346 	}    
347 }