View Javadoc

1   package org.jmage.pool;
2   
3   import org.apache.log4j.Logger;
4   
5   import java.util.Date;
6   import java.util.HashSet;
7   import java.util.Iterator;
8   import java.util.Set;
9   
10  /***
11   * WorkerPoolImpl
12   */
13  public class WorkerPoolImpl implements WorkerPool {
14      protected int timeoutSeconds = 0;
15      protected static final int DEFAULT_TIMEOUT_SECONDS = 10;
16  
17      public static final int MAX_JOBS = 1;
18  
19      protected Set workerPool;
20      protected static Logger log = Logger.getLogger(WorkerPoolImpl.class.getName());
21      private static final String WORKER_HIRED = " hired next available worker: ";
22      private static final String WORKER_FREED = " freed worker: ";
23      private static final String WORKER_ADDED = " added worker to pool: ";
24      private static final String WORKER_REMOVED = " removed worker from pool: ";
25      private static final String FREE_ERROR = "unable to free worker, not part of this pool: ";
26      private static final String ADD_ERROR = "unable to add worker to pool, worker already a member: ";
27      private static final String REMOVE_ERROR = "unable to remove worker from pool, worker not a pool member: ";
28      private static final String TIMEOUT_ERROR = "unable to set timeout period less than one second";
29      private static final String TIMEOUT_SUCCESS = " settimeout seconds for WorkerPool to: ";
30      private static final String RESET_WORKER = " reset dozing worker for new assignment: ";
31  
32      public WorkerPoolImpl() {
33          workerPool = new HashSet();
34          timeoutSeconds = DEFAULT_TIMEOUT_SECONDS;
35      }
36  
37      public int getTimeoutSeconds() throws WorkerException {
38          return timeoutSeconds;
39      }
40  
41      public void setTimeoutSeconds(int timeoutSeconds) throws WorkerException {
42          if (timeoutSeconds < 1) {
43              throw new WorkerException(TIMEOUT_ERROR);
44          }
45          this.timeoutSeconds = timeoutSeconds;
46          if (log.isDebugEnabled()) log.debug(TIMEOUT_SUCCESS + this.timeoutSeconds);
47      }
48  
49      public void addWorker(Worker worker) throws WorkerException {
50          if (workerPool.contains(worker)) {
51              throw new WorkerException(ADD_ERROR + worker);
52          }
53          worker.setLastReport(new Date(System.currentTimeMillis()));
54          workerPool.add(worker);
55          if (log.isDebugEnabled()) log.debug(WORKER_ADDED + worker);
56      }
57  
58      public void removeWorker(Worker worker) throws WorkerException {
59          if (!workerPool.contains(worker)) {
60              throw new WorkerException(REMOVE_ERROR + worker);
61          }
62          workerPool.remove(worker);
63          if (log.isDebugEnabled()) log.debug(WORKER_REMOVED + worker);
64      }
65  
66      public Worker hireWorker() throws WorkerException {
67          while (true) {
68              Object[] workerPoolArray = workerPool.toArray();
69              for (int i = 0; i < workerPoolArray.length; i++) {
70                  Worker worker = (Worker) workerPoolArray[i];
71                  //TODO: this should go into it's own thread?
72                  if (worker.getJobCount() < MAX_JOBS) {
73                      worker.incJobCount();
74                      worker.setLastReport(new Date(System.currentTimeMillis()));
75                      if (log.isDebugEnabled()) log.debug(WORKER_HIRED + worker);
76                      return worker;
77                  } else {
78                      if (this.hasDozedOff(worker)) {
79                          this.resetWorker(worker);
80                      }
81                  }
82              }
83          }
84      }
85  
86      public Set getAllWorkers() throws WorkerException {
87          return workerPool;
88      }
89  
90      public void freeWorker(Worker worker) throws WorkerException {
91          if (workerPool.contains(worker)) {
92              worker.decJobCount();
93              worker.setLastReport(new Date(System.currentTimeMillis()));
94              if (log.isDebugEnabled()) log.debug(WORKER_FREED + worker);
95          } else {
96              throw new WorkerException(FREE_ERROR + worker);
97          }
98      }
99  
100     public void freeWorkerFor(Object object) throws WorkerException {
101         Iterator it = workerPool.iterator();
102         while (it.hasNext()) {
103             Worker worker = (Worker) it.next();
104             //TODO: check why second half of if statement fixes dynamic proxy weirdness??
105             if (object.equals(worker.getObject()) ||
106                     object.hashCode() == worker.getObject().hashCode()) {
107                 this.freeWorker(worker);
108             }
109         }
110     }
111 
112     protected boolean hasDozedOff(Worker worker) {
113         return ((System.currentTimeMillis() - worker.getLastReport().getTime()) / 1000l) >= timeoutSeconds &&
114                 worker.getJobCount() >= MAX_JOBS;
115     }
116 
117     protected void resetWorker(Worker worker) {
118         worker.setJobCount(0);
119         worker.setLastReport(new Date(System.currentTimeMillis()));
120         if (log.isDebugEnabled()) log.debug(RESET_WORKER + worker);
121     }
122 }