View Javadoc
1   package ejava.examples.asyncmarket.ejb;
2   
3   import java.text.DateFormat;
4   import java.text.SimpleDateFormat;
5   import java.util.ArrayList;
6   import java.util.Collection;
7   import java.util.Date;
8   import java.util.List;
9   import java.util.concurrent.ExecutionException;
10  import java.util.concurrent.Future;
11  
12  import javax.annotation.PostConstruct;
13  import javax.annotation.Resource;
14  import javax.ejb.EJB;
15  import javax.ejb.EJBException;
16  import javax.ejb.Schedule;
17  import javax.ejb.ScheduleExpression;
18  import javax.ejb.Stateless;
19  import javax.ejb.Timeout;
20  import javax.ejb.Timer;
21  import javax.ejb.TimerConfig;
22  import javax.ejb.TimerService;
23  import javax.ejb.TransactionAttribute;
24  import javax.ejb.TransactionAttributeType;
25  import javax.inject.Inject;
26  import javax.jms.Destination;
27  import javax.jms.JMSConnectionFactory;
28  import javax.jms.JMSContext;
29  import javax.jms.JMSException;
30  import javax.jms.JMSProducer;
31  import javax.jms.MapMessage;
32  import javax.jms.Topic;
33  
34  import org.slf4j.Logger;
35  import org.slf4j.LoggerFactory;
36  
37  import ejava.examples.asyncmarket.bo.AuctionItem;
38  import ejava.examples.asyncmarket.bo.Bid;
39  import ejava.examples.asyncmarket.dao.AuctionItemDAO;
40  
41  @Stateless
42  @TransactionAttribute(TransactionAttributeType.REQUIRED)
43  public class AuctionMgmtEJB implements AuctionMgmtRemote, AuctionMgmtLocal {
44      private static final Logger logger = LoggerFactory.getLogger(AuctionMgmtEJB.class);
45      
46      @EJB
47      private AuctionMgmtActionEJB actions;
48      @Inject
49      private AuctionItemDAO auctionItemDAO;
50      @Inject
51      private DtoMapper dtoMapper;
52      
53      @Resource
54      private TimerService timerService;
55      //injected
56      long checkItemInterval;
57      
58      @Inject @JMSConnectionFactory("java:/JmsXA")
59      private JMSContext jmsContext;
60      @Resource(lookup="java:/jms/topic/ejava/examples/asyncMarket/topic1", type=Topic.class)
61      private Destination sellTopic;
62      
63      @PostConstruct
64      public void init() {
65          logger.info("**** AuctionMgmtEJB init() ***");
66          logger.debug("timerService={}", timerService);
67          logger.debug("checkAuctionInterval={}", checkItemInterval);
68          logger.debug("jmsContext={}", jmsContext);
69          logger.debug("sellTopic={}", sellTopic);
70      }
71      
72      public void cancelTimers() {
73          logger.debug("canceling timers");
74          for (Timer timer : (Collection<Timer>)timerService.getTimers()) {
75              timer.cancel();
76          }
77      }
78      public void initTimers(long delay) {
79          cancelTimers();
80          logger.debug("initializing timers, checkItemInterval={}", delay);
81          timerService.createIntervalTimer(0L, delay, new TimerConfig("checkAuctionTimer", false));
82      }
83      public void initTimers(ScheduleExpression schedule) {
84      	    cancelTimers();
85          logger.debug("initializing timers, schedule={}", schedule);
86      	    timerService.createCalendarTimer(schedule, new TimerConfig("checkAuctionTimer", false));
87      }
88      
89      public void closeBidding(long itemId) throws ResourceNotFoundException {
90          AuctionItem item = auctionItemDAO.getItem(itemId);
91          if (item == null) {
92              throw new ResourceNotFoundException("itemId[%s] not found", itemId);
93          }
94  
95          try {
96              item.closeBids();
97              logger.debug("closed bidding for item: {}", item);
98          }
99          catch (Exception ex) {
100             logger.error("error closing bid", ex);
101             throw new InternalErrorException("error closing bid:" + ex);
102         }
103     }
104 
105     @TransactionAttribute(TransactionAttributeType.REQUIRED)
106     public Bid getWinningBid(long itemId) throws ResourceNotFoundException {
107         AuctionItem item=auctionItemDAO.getItem(itemId);
108         if (item==null) {
109             throw new ResourceNotFoundException("bid[%s] not found", itemId);
110         }
111         
112         try {
113             return dtoMapper.toDTO(item.getWinningBid());
114         } catch (Exception ex) {
115             throw new InternalErrorException("error returning winning bid:%s", ex.toString());
116         }
117     }
118 
119     
120     @Timeout
121     @Schedule(second="*/10", minute ="*", hour="*", dayOfMonth="*", month="*", year="*", persistent=false)
122     @TransactionAttribute(TransactionAttributeType.REQUIRED)
123     public void execute(Timer timer) {
124         logger.info("timer fired: {}", timer);
125         try {
126             checkAuction();
127         }
128         catch (Exception ex) {
129             logger.error("error checking auction", ex);
130         }
131     }
132     
133     @TransactionAttribute(TransactionAttributeType.REQUIRED)
134     public int checkAuction() {
135         logger.info("checking auctions");
136         int index = 0;            
137         try {
138             for (List<AuctionItem> items = auctionItemDAO.getAvailableItems(index, 10); items.size()>0; ) {
139                 for(AuctionItem item : items) {
140                     publishAvailableItem(item);
141                 }
142                 index += items.size();
143                 items = auctionItemDAO.getAvailableItems(index, 10);
144             }
145             logger.debug("processed {} active items", index);
146             return index;
147         }
148         catch (JMSException ex) {
149             logger.error("error publishing auction item updates", ex);
150             return index;
151         }
152     }
153 
154     protected void publishAvailableItem(AuctionItem item) throws JMSException {
155         JMSProducer producer = jmsContext.createProducer();
156         MapMessage message = jmsContext.createMapMessage();
157         message.setJMSType("saleUpdate");
158         message.setLong("id", item.getId());
159         message.setString("name", item.getName());
160         message.setString("seller", item.getOwner().getUserId());
161         message.setLong("startDate", item.getStartDate().getTime());
162         message.setLong("endDate", item.getEndDate().getTime());
163         message.setDouble("minBid", item.getMinBid());
164         message.setDouble("bids", item.getBids().size());
165         message.setDouble("highestBid", 
166                 (item.getHighestBid() == null ? 0.00 :
167                     item.getHighestBid().getAmount()));            
168         producer.send(sellTopic, message);
169         logger.debug("sent={}", message);
170     }
171 
172     
173     public void removeBid(long bidId) throws ResourceNotFoundException {
174         Bid bid = auctionItemDAO.getBid(bidId);
175         if (bid==null) {
176             throw new ResourceNotFoundException("bidId[%s] not found", bidId);
177         }
178         
179         try {
180             auctionItemDAO.removeBid(bid);        }
181         catch (Exception ex) {
182             logger.error("error removing bid", ex);
183             throw new InternalErrorException("error removing bid[%d]:%s", bidId, ex.toString());
184         }
185     }
186     
187     @TransactionAttribute(TransactionAttributeType.REQUIRED)
188     public List<AuctionItem> getItems(int index, int count) {
189         try {
190             return dtoMapper.toDTO(auctionItemDAO.getItems(index, count));
191         }
192         catch (Exception ex) {
193             logger.error("error getting auction items", ex);
194             throw new InternalErrorException("error getting auction items: %s", ex.toString());
195         }
196     }
197 
198     public void removeItem(long itemId) {
199         try {
200             auctionItemDAO.removeItem(itemId);
201         }
202         catch (Exception ex) {
203             logger.error("error removing auction items", ex);
204             throw new InternalErrorException("error removing auction item[%d]: %s", itemId, ex.toString());
205         }
206     }
207 
208     /**
209      * Perform action synchronously while this caller waits
210      */
211 	@Override
212     @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
213 	public void workSync(int count, long delay) {
214         DateFormat df = new SimpleDateFormat("HH:mm:ss.SSS");
215         
216         long startTime = System.currentTimeMillis();
217         for (int i=0; i<count; i++) {
218             logger.info("{} issuing sync request, delay={}", df.format(new Date()), delay);
219             	@SuppressWarnings("unused")
220             	Date date = actions.doWorkSync(delay);
221             	logger.info("sync waitTime={} msecs", System.currentTimeMillis()-startTime);
222         }
223         	long syncTime = System.currentTimeMillis() - startTime;
224         	logger.info("workSync time={} msecs", syncTime);
225 	}    
226 
227 	/**
228 	 * Perform action async from this caller
229 	 */
230 	@Override
231     @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
232 	public void workAsync(int count, long delay) {
233         DateFormat df = new SimpleDateFormat("HH:mm:ss.SSS");
234         
235         List<Future<Date>> results = new ArrayList<Future<Date>>();
236             //issue requests
237         long startTime = System.currentTimeMillis();
238         for (int i=0; i<count; i++) {
239             	logger.info("{} issuing async request, delay={}", df.format(new Date()), delay);
240             	Future<Date> date = actions.doWorkAsync(delay);
241             	results.add(date);
242             	logger.info("async waitTime={} msecs", System.currentTimeMillis()-startTime);
243         }
244         
245             //process results
246         for (Future<Date> f: results) {
247             	logger.info("{} getting async response", df.format(new Date()));
248             	try {
249     				@SuppressWarnings("unused")
250     				Date date = f.get();
251     			} catch (ExecutionException | InterruptedException ex) {
252     				logger.error("unexpected error on future.get()", ex);
253     				throw new EJBException("unexpected error during future.get():"+ex);
254     			}
255             	logger.info("{} got async response", df.format(new Date()));
256         }
257         	long asyncTime = System.currentTimeMillis() - startTime;
258         	logger.info("workAsync time={} msecs", asyncTime);
259 	}    
260 }