1 package ejava.examples.asyncmarket.ejb;
2
3 import java.text.DateFormat;
4 import java.text.SimpleDateFormat;
5 import java.util.ArrayList;
6
7 import java.util.Collection;
8 import java.util.Date;
9 import java.util.List;
10 import java.util.concurrent.ExecutionException;
11 import java.util.concurrent.Future;
12
13 import javax.annotation.PostConstruct;
14 import javax.annotation.Resource;
15 import javax.ejb.AsyncResult;
16 import javax.ejb.EJB;
17 import javax.ejb.EJBException;
18 import javax.ejb.Schedule;
19 import javax.ejb.ScheduleExpression;
20 import javax.ejb.Stateless;
21 import javax.ejb.Timeout;
22 import javax.ejb.Timer;
23 import javax.ejb.TimerService;
24 import javax.ejb.TransactionAttribute;
25 import javax.ejb.TransactionAttributeType;
26 import javax.jms.Connection;
27 import javax.jms.ConnectionFactory;
28 import javax.jms.Destination;
29 import javax.jms.JMSException;
30 import javax.jms.MapMessage;
31 import javax.jms.MessageProducer;
32 import javax.jms.Session;
33 import javax.jms.Topic;
34 import javax.persistence.EntityManager;
35 import javax.persistence.PersistenceContext;
36
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39
40 import ejava.examples.asyncmarket.MarketException;
41 import ejava.examples.asyncmarket.bo.AuctionItem;
42 import ejava.examples.asyncmarket.bo.Bid;
43 import ejava.examples.asyncmarket.bo.Person;
44 import ejava.examples.asyncmarket.dao.AuctionItemDAO;
45 import ejava.examples.asyncmarket.dao.PersonDAO;
46 import ejava.examples.asyncmarket.jpa.JPAAuctionItemDAO;
47 import ejava.examples.asyncmarket.jpa.JPAPersonDAO;
48
49 @Stateless
50 @TransactionAttribute(TransactionAttributeType.REQUIRED)
51 public class AuctionMgmtEJB implements AuctionMgmtRemote, AuctionMgmtLocal {
52 private static final Log log = LogFactory.getLog(AuctionMgmtEJB.class);
53
54 @PersistenceContext(unitName="asyncMarket")
55 private EntityManager em;
56
57 private @EJB AuctionMgmtActionEJB actions;
58
59 private AuctionItemDAO auctionItemDAO;
60 private PersonDAO userDAO;
61
62 @Resource
63 private TimerService timerService;
64
65 long checkItemInterval;
66
67 @Resource(lookup="java:/JmsXA")
68
69 private ConnectionFactory connFactory;
70 @Resource(lookup="java:/topic/ejava/examples/asyncMarket/topic1", type=Topic.class)
71 private Destination sellTopic;
72
73 @PostConstruct
74 public void init() {
75 log.info("**** AuctionMgmtEJB init() ***");
76 log.debug("timerService=" + timerService);
77 log.debug("checkAuctionInterval=" + checkItemInterval);
78 log.debug("connFactory=" + connFactory);
79 log.debug("sellTopic=" + sellTopic);
80
81 auctionItemDAO = new JPAAuctionItemDAO();
82 ((JPAAuctionItemDAO)auctionItemDAO).setEntityManager(em);
83
84 userDAO = new JPAPersonDAO();
85 ((JPAPersonDAO)userDAO).setEntityManager(em);
86 }
87
88 public void cancelTimers() {
89 log.debug("canceling timers");
90 for (Timer timer : (Collection<Timer>)timerService.getTimers()) {
91 timer.cancel();
92 }
93 }
94 public void initTimers(long delay) {
95 cancelTimers();
96 log.debug("initializing timers, checkItemInterval="+delay);
97 timerService.createTimer(0,delay, "checkAuctionTimer");
98 }
99 public void initTimers(ScheduleExpression schedule) {
100 cancelTimers();
101 log.debug("initializing timers, schedule="+schedule);
102 timerService.createCalendarTimer(schedule);
103 }
104
105 public void closeBidding(long itemId) throws MarketException {
106 AuctionItem item = auctionItemDAO.getItem(itemId);
107 if (item == null) {
108 throw new MarketException("itemId not found:" + itemId);
109 }
110
111 try {
112 item.closeBids();
113 log.debug("closed bidding for item:" + item);
114 }
115 catch (Exception ex) {
116 log.error("error closing bid", ex);
117 throw new MarketException("error closing bid:" + ex);
118 }
119 }
120
121 public Bid getWinningBid(long itemId) throws MarketException {
122 AuctionItem item=null;
123 try {
124 item = auctionItemDAO.getItem(itemId);
125 if (item != null) {
126 return makeDTO(item.getWinningBid());
127 }
128 }
129 catch (Exception ex) {
130 log.error("error closing bid", ex);
131 throw new MarketException("error closing bid:" + ex);
132 }
133 throw new MarketException("itemId not found:" + itemId);
134 }
135
136 private Bid makeDTO(Bid bid) {
137 Bid dto = new Bid(bid.getId());
138 dto.setAmount(bid.getAmount());
139 dto.setBidder(makeDTO(bid.getBidder(), dto));
140 dto.setItem(makeDTO(bid.getItem(), dto));
141 return dto;
142 }
143
144 private Person makeDTO(Person person, Bid bid) {
145 Person dto = new Person(person.getId());
146 dto.setUserId(person.getUserId());
147 dto.getBids().add(bid);
148 bid.setBidder(dto);
149 dto.setVersion(person.getVersion());
150 return dto;
151 }
152
153 private AuctionItem makeDTO(AuctionItem item, Bid bid) {
154 AuctionItem dto = new AuctionItem(item.getId());
155 dto.setName(item.getName());
156 dto.setVersion(item.getVersion());
157 dto.setStartDate(item.getStartDate());
158 dto.setEndDate(item.getEndDate());
159 dto.setMinBid(item.getMinBid());
160 dto.setWinningBid(bid);
161 dto.setClosed(item.isClosed());
162 return dto;
163 }
164
165
166 @Timeout
167 @Schedule(second="*/10", minute ="*", hour="*", dayOfMonth="*", month="*", year="*", persistent=false)
168 public void execute(Timer timer) {
169 log.info("timer fired:" + timer);
170 try {
171 checkAuction();
172 }
173 catch (Exception ex) {
174 log.error("error checking auction", ex);
175 }
176 }
177
178 public int checkAuction() throws MarketException {
179 log.info("checking auctions");
180 Connection connection = null;
181 Session session = null;
182 int index = 0;
183 try {
184 connection = connFactory.createConnection();
185 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
186 List<AuctionItem> items = null;
187 do {
188 items = auctionItemDAO.getAvailableItems(index, 10);
189 for(AuctionItem item : items) {
190 publishAvailableItem(session, item);
191 }
192 index += items.size();
193 } while (items.size() > 0);
194 log.debug("processed " + index + " active items");
195 return index;
196 }
197 catch (JMSException ex) {
198 log.error("error publishing auction item updates", ex);
199 return index;
200 }
201 finally {
202 try {
203 if (session != null) { session.close(); }
204 if (connection != null) { connection.close(); }
205 } catch (JMSException ignored) {}
206 }
207 }
208
209 protected void publishAvailableItem(Session session, AuctionItem item)
210 throws JMSException {
211 MessageProducer producer = null;
212 try {
213 producer = session.createProducer(sellTopic);
214 MapMessage message = session.createMapMessage();
215 message.setJMSType("saleUpdate");
216 message.setLong("id", item.getId());
217 message.setString("name", item.getName());
218 message.setString("seller", item.getOwner().getUserId());
219 message.setLong("startDate", item.getStartDate().getTime());
220 message.setLong("endDate", item.getEndDate().getTime());
221 message.setDouble("minBid", item.getMinBid());
222 message.setDouble("bids", item.getBids().size());
223 message.setDouble("highestBid",
224 (item.getHighestBid() == null ? 0.00 :
225 item.getHighestBid().getAmount()));
226 producer.send(message);
227 log.debug("sent=" + message);
228 }
229 finally {
230 if (producer != null) { producer.close(); }
231 }
232 }
233
234
235 public void removeBid(long bidId) throws MarketException {
236 try {
237 Bid bid = auctionItemDAO.getBid(bidId);
238 auctionItemDAO.removeBid(bid); }
239 catch (Exception ex) {
240 log.error("error removing bid", ex);
241 throw new MarketException("error removing bid:" + ex);
242 }
243 }
244
245 public List<AuctionItem> getItems(int index, int count)
246 throws MarketException {
247 try {
248 return makeDTO(auctionItemDAO.getItems(index, count));
249 }
250 catch (Exception ex) {
251 log.error("error getting auction items", ex);
252 throw new MarketException("error getting auction items" + ex);
253 }
254 }
255
256 public void removeItem(long id) throws MarketException {
257 try {
258 auctionItemDAO.removeItem(id);
259 }
260 catch (Exception ex) {
261 log.error("error removing auction items", ex);
262 throw new MarketException("error removing auction items" + ex);
263 }
264 }
265
266 private List<AuctionItem> makeDTO(List<AuctionItem> items) {
267 List<AuctionItem> dto = new ArrayList<AuctionItem>();
268 for (AuctionItem item : items) {
269 dto.add(makeDTO(item));
270 }
271 return dto;
272 }
273
274 private AuctionItem makeDTO(AuctionItem item) {
275 AuctionItem dto = new AuctionItem(item.getId());
276 dto.setVersion(item.getVersion());
277 dto.setName(item.getName());
278 dto.setStartDate(item.getStartDate());
279 dto.setEndDate(item.getEndDate());
280 dto.setMinBid(item.getMinBid());
281 dto.setBids(makeDTO(item.getBids(), dto));
282 dto.setWinningBid(null);
283 dto.setClosed(item.isClosed());
284 return dto;
285 }
286
287 private List<Bid> makeDTO(List<Bid> bids, AuctionItem item) {
288 List<Bid> dtos = new ArrayList<Bid>();
289 for (Bid bid : bids) {
290 Bid dto = new Bid(bid.getId());
291 dto.setAmount(bid.getAmount());
292 dto.setItem(item);
293 item.getBids().add(dto);
294 dto.setBidder(makeDTO(bid.getBidder(),dto));
295 dtos.add(dto);
296 }
297 return dtos;
298 }
299
300
301
302
303 @Override
304 public void workSync(int count, long delay) {
305 DateFormat df = new SimpleDateFormat("HH:mm:ss.SSS");
306
307 long startTime = System.currentTimeMillis();
308 for (int i=0; i<count; i++) {
309 log.info(String.format("%s issuing sync request, delay=%d", df.format(new Date()), delay));
310 @SuppressWarnings("unused")
311 Date date= actions.doWorkSync(delay);
312 log.info(String.format("sync waitTime=%d msecs", System.currentTimeMillis()-startTime));
313 }
314 long syncTime = System.currentTimeMillis() - startTime;
315 log.info(String.format("workSync time=%d msecs", syncTime));
316 }
317
318
319
320
321 @Override
322 public void workAsync(int count, long delay) {
323 DateFormat df = new SimpleDateFormat("HH:mm:ss.SSS");
324
325 long startTime = System.currentTimeMillis();
326 List<Future<Date>> results = new ArrayList<Future<Date>>();
327 for (int i=0; i<count; i++) {
328 log.info(String.format("%s issuing async request, delay=%d", df.format(new Date()), delay));
329 Future<Date> date = actions.doWorkAsync(delay);
330 results.add(date);
331 log.info(String.format("async waitTime=%d msecs", System.currentTimeMillis()-startTime));
332 }
333 for (Future<Date> f: results) {
334 log.info(String.format("%s getting async response", df.format(new Date())));
335 try {
336 @SuppressWarnings("unused")
337 Date date = f.get();
338 } catch (Exception ex) {
339 log.error("unexpected error on future.get()", ex);
340 throw new EJBException("unexpected error during future.get():"+ex);
341 }
342 log.info(String.format("%s got async response", df.format(new Date())));
343 }
344 long asyncTime = System.currentTimeMillis() - startTime;
345 log.info(String.format("workAsync time=%d msecs", asyncTime));
346 }
347 }