1 package ejava.examples.asyncmarket.ejb;
2
3 import java.text.DateFormat;
4 import java.text.SimpleDateFormat;
5 import java.util.ArrayList;
6 import java.util.Collection;
7 import java.util.Date;
8 import java.util.List;
9 import java.util.concurrent.ExecutionException;
10 import java.util.concurrent.Future;
11
12 import javax.annotation.PostConstruct;
13 import javax.annotation.Resource;
14 import javax.ejb.EJB;
15 import javax.ejb.EJBException;
16 import javax.ejb.Schedule;
17 import javax.ejb.ScheduleExpression;
18 import javax.ejb.Stateless;
19 import javax.ejb.Timeout;
20 import javax.ejb.Timer;
21 import javax.ejb.TimerConfig;
22 import javax.ejb.TimerService;
23 import javax.ejb.TransactionAttribute;
24 import javax.ejb.TransactionAttributeType;
25 import javax.inject.Inject;
26 import javax.jms.Destination;
27 import javax.jms.JMSConnectionFactory;
28 import javax.jms.JMSContext;
29 import javax.jms.JMSException;
30 import javax.jms.JMSProducer;
31 import javax.jms.MapMessage;
32 import javax.jms.Topic;
33
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import ejava.examples.asyncmarket.bo.AuctionItem;
38 import ejava.examples.asyncmarket.bo.Bid;
39 import ejava.examples.asyncmarket.dao.AuctionItemDAO;
40
41 @Stateless
42 @TransactionAttribute(TransactionAttributeType.REQUIRED)
43 public class AuctionMgmtEJB implements AuctionMgmtRemote, AuctionMgmtLocal {
44 private static final Logger logger = LoggerFactory.getLogger(AuctionMgmtEJB.class);
45
46 @EJB
47 private AuctionMgmtActionEJB actions;
48 @Inject
49 private AuctionItemDAO auctionItemDAO;
50 @Inject
51 private DtoMapper dtoMapper;
52
53 @Resource
54 private TimerService timerService;
55
56 long checkItemInterval;
57
58 @Inject @JMSConnectionFactory("java:/JmsXA")
59 private JMSContext jmsContext;
60 @Resource(lookup="java:/jms/topic/ejava/examples/asyncMarket/topic1", type=Topic.class)
61 private Destination sellTopic;
62
63 @PostConstruct
64 public void init() {
65 logger.info("**** AuctionMgmtEJB init() ***");
66 logger.debug("timerService={}", timerService);
67 logger.debug("checkAuctionInterval={}", checkItemInterval);
68 logger.debug("jmsContext={}", jmsContext);
69 logger.debug("sellTopic={}", sellTopic);
70 }
71
72 public void cancelTimers() {
73 logger.debug("canceling timers");
74 for (Timer timer : (Collection<Timer>)timerService.getTimers()) {
75 timer.cancel();
76 }
77 }
78 public void initTimers(long delay) {
79 cancelTimers();
80 logger.debug("initializing timers, checkItemInterval={}", delay);
81 timerService.createIntervalTimer(0L, delay, new TimerConfig("checkAuctionTimer", false));
82 }
83 public void initTimers(ScheduleExpression schedule) {
84 cancelTimers();
85 logger.debug("initializing timers, schedule={}", schedule);
86 timerService.createCalendarTimer(schedule, new TimerConfig("checkAuctionTimer", false));
87 }
88
89 public void closeBidding(long itemId) throws ResourceNotFoundException {
90 AuctionItem item = auctionItemDAO.getItem(itemId);
91 if (item == null) {
92 throw new ResourceNotFoundException("itemId[%s] not found", itemId);
93 }
94
95 try {
96 item.closeBids();
97 logger.debug("closed bidding for item: {}", item);
98 }
99 catch (Exception ex) {
100 logger.error("error closing bid", ex);
101 throw new InternalErrorException("error closing bid:" + ex);
102 }
103 }
104
105 @TransactionAttribute(TransactionAttributeType.REQUIRED)
106 public Bid getWinningBid(long itemId) throws ResourceNotFoundException {
107 AuctionItem item=auctionItemDAO.getItem(itemId);
108 if (item==null) {
109 throw new ResourceNotFoundException("bid[%s] not found", itemId);
110 }
111
112 try {
113 return dtoMapper.toDTO(item.getWinningBid());
114 } catch (Exception ex) {
115 throw new InternalErrorException("error returning winning bid:%s", ex.toString());
116 }
117 }
118
119
120 @Timeout
121 @Schedule(second="*/10", minute ="*", hour="*", dayOfMonth="*", month="*", year="*", persistent=false)
122 @TransactionAttribute(TransactionAttributeType.REQUIRED)
123 public void execute(Timer timer) {
124 logger.info("timer fired: {}", timer);
125 try {
126 checkAuction();
127 }
128 catch (Exception ex) {
129 logger.error("error checking auction", ex);
130 }
131 }
132
133 @TransactionAttribute(TransactionAttributeType.REQUIRED)
134 public int checkAuction() {
135 logger.info("checking auctions");
136 int index = 0;
137 try {
138 for (List<AuctionItem> items = auctionItemDAO.getAvailableItems(index, 10); items.size()>0; ) {
139 for(AuctionItem item : items) {
140 publishAvailableItem(item);
141 }
142 index += items.size();
143 items = auctionItemDAO.getAvailableItems(index, 10);
144 }
145 logger.debug("processed {} active items", index);
146 return index;
147 }
148 catch (JMSException ex) {
149 logger.error("error publishing auction item updates", ex);
150 return index;
151 }
152 }
153
154 protected void publishAvailableItem(AuctionItem item) throws JMSException {
155 JMSProducer producer = jmsContext.createProducer();
156 MapMessage message = jmsContext.createMapMessage();
157 message.setJMSType("saleUpdate");
158 message.setLong("id", item.getId());
159 message.setString("name", item.getName());
160 message.setString("seller", item.getOwner().getUserId());
161 message.setLong("startDate", item.getStartDate().getTime());
162 message.setLong("endDate", item.getEndDate().getTime());
163 message.setDouble("minBid", item.getMinBid());
164 message.setDouble("bids", item.getBids().size());
165 message.setDouble("highestBid",
166 (item.getHighestBid() == null ? 0.00 :
167 item.getHighestBid().getAmount()));
168 producer.send(sellTopic, message);
169 logger.debug("sent={}", message);
170 }
171
172
173 public void removeBid(long bidId) throws ResourceNotFoundException {
174 Bid bid = auctionItemDAO.getBid(bidId);
175 if (bid==null) {
176 throw new ResourceNotFoundException("bidId[%s] not found", bidId);
177 }
178
179 try {
180 auctionItemDAO.removeBid(bid); }
181 catch (Exception ex) {
182 logger.error("error removing bid", ex);
183 throw new InternalErrorException("error removing bid[%d]:%s", bidId, ex.toString());
184 }
185 }
186
187 @TransactionAttribute(TransactionAttributeType.REQUIRED)
188 public List<AuctionItem> getItems(int index, int count) {
189 try {
190 return dtoMapper.toDTO(auctionItemDAO.getItems(index, count));
191 }
192 catch (Exception ex) {
193 logger.error("error getting auction items", ex);
194 throw new InternalErrorException("error getting auction items: %s", ex.toString());
195 }
196 }
197
198 public void removeItem(long itemId) {
199 try {
200 auctionItemDAO.removeItem(itemId);
201 }
202 catch (Exception ex) {
203 logger.error("error removing auction items", ex);
204 throw new InternalErrorException("error removing auction item[%d]: %s", itemId, ex.toString());
205 }
206 }
207
208
209
210
211 @Override
212 @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
213 public void workSync(int count, long delay) {
214 DateFormat df = new SimpleDateFormat("HH:mm:ss.SSS");
215
216 long startTime = System.currentTimeMillis();
217 for (int i=0; i<count; i++) {
218 logger.info("{} issuing sync request, delay={}", df.format(new Date()), delay);
219 @SuppressWarnings("unused")
220 Date date = actions.doWorkSync(delay);
221 logger.info("sync waitTime={} msecs", System.currentTimeMillis()-startTime);
222 }
223 long syncTime = System.currentTimeMillis() - startTime;
224 logger.info("workSync time={} msecs", syncTime);
225 }
226
227
228
229
230 @Override
231 @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
232 public void workAsync(int count, long delay) {
233 DateFormat df = new SimpleDateFormat("HH:mm:ss.SSS");
234
235 List<Future<Date>> results = new ArrayList<Future<Date>>();
236
237 long startTime = System.currentTimeMillis();
238 for (int i=0; i<count; i++) {
239 logger.info("{} issuing async request, delay={}", df.format(new Date()), delay);
240 Future<Date> date = actions.doWorkAsync(delay);
241 results.add(date);
242 logger.info("async waitTime={} msecs", System.currentTimeMillis()-startTime);
243 }
244
245
246 for (Future<Date> f: results) {
247 logger.info("{} getting async response", df.format(new Date()));
248 try {
249 @SuppressWarnings("unused")
250 Date date = f.get();
251 } catch (ExecutionException | InterruptedException ex) {
252 logger.error("unexpected error on future.get()", ex);
253 throw new EJBException("unexpected error during future.get():"+ex);
254 }
255 logger.info("{} got async response", df.format(new Date()));
256 }
257 long asyncTime = System.currentTimeMillis() - startTime;
258 logger.info("workAsync time={} msecs", asyncTime);
259 }
260 }