1 package org.jmage.pool;
2
3 import org.apache.log4j.Logger;
4
5 import java.util.Date;
6 import java.util.HashSet;
7 import java.util.Iterator;
8 import java.util.Set;
9
10 /***
11 * WorkerPoolImpl
12 */
13 public class WorkerPoolImpl implements WorkerPool {
14 protected int timeoutSeconds = 0;
15 protected static final int DEFAULT_TIMEOUT_SECONDS = 10;
16
17 public static final int MAX_JOBS = 1;
18
19 protected Set workerPool;
20 protected static Logger log = Logger.getLogger(WorkerPoolImpl.class.getName());
21 private static final String WORKER_HIRED = " hired next available worker: ";
22 private static final String WORKER_FREED = " freed worker: ";
23 private static final String WORKER_ADDED = " added worker to pool: ";
24 private static final String WORKER_REMOVED = " removed worker from pool: ";
25 private static final String FREE_ERROR = "unable to free worker, not part of this pool: ";
26 private static final String ADD_ERROR = "unable to add worker to pool, worker already a member: ";
27 private static final String REMOVE_ERROR = "unable to remove worker from pool, worker not a pool member: ";
28 private static final String TIMEOUT_ERROR = "unable to set timeout period less than one second";
29 private static final String TIMEOUT_SUCCESS = " settimeout seconds for WorkerPool to: ";
30 private static final String RESET_WORKER = " reset dozing worker for new assignment: ";
31
32 public WorkerPoolImpl() {
33 workerPool = new HashSet();
34 timeoutSeconds = DEFAULT_TIMEOUT_SECONDS;
35 }
36
37 public int getTimeoutSeconds() throws WorkerException {
38 return timeoutSeconds;
39 }
40
41 public void setTimeoutSeconds(int timeoutSeconds) throws WorkerException {
42 if (timeoutSeconds < 1) {
43 throw new WorkerException(TIMEOUT_ERROR);
44 }
45 this.timeoutSeconds = timeoutSeconds;
46 if (log.isDebugEnabled()) log.debug(TIMEOUT_SUCCESS + this.timeoutSeconds);
47 }
48
49 public void addWorker(Worker worker) throws WorkerException {
50 if (workerPool.contains(worker)) {
51 throw new WorkerException(ADD_ERROR + worker);
52 }
53 worker.setLastReport(new Date(System.currentTimeMillis()));
54 workerPool.add(worker);
55 if (log.isDebugEnabled()) log.debug(WORKER_ADDED + worker);
56 }
57
58 public void removeWorker(Worker worker) throws WorkerException {
59 if (!workerPool.contains(worker)) {
60 throw new WorkerException(REMOVE_ERROR + worker);
61 }
62 workerPool.remove(worker);
63 if (log.isDebugEnabled()) log.debug(WORKER_REMOVED + worker);
64 }
65
66 public Worker hireWorker() throws WorkerException {
67 while (true) {
68 Object[] workerPoolArray = workerPool.toArray();
69 for (int i = 0; i < workerPoolArray.length; i++) {
70 Worker worker = (Worker) workerPoolArray[i];
71
72 if (worker.getJobCount() < MAX_JOBS) {
73 worker.incJobCount();
74 worker.setLastReport(new Date(System.currentTimeMillis()));
75 if (log.isDebugEnabled()) log.debug(WORKER_HIRED + worker);
76 return worker;
77 } else {
78 if (this.hasDozedOff(worker)) {
79 this.resetWorker(worker);
80 }
81 }
82 }
83 }
84 }
85
86 public Set getAllWorkers() throws WorkerException {
87 return workerPool;
88 }
89
90 public void freeWorker(Worker worker) throws WorkerException {
91 if (workerPool.contains(worker)) {
92 worker.decJobCount();
93 worker.setLastReport(new Date(System.currentTimeMillis()));
94 if (log.isDebugEnabled()) log.debug(WORKER_FREED + worker);
95 } else {
96 throw new WorkerException(FREE_ERROR + worker);
97 }
98 }
99
100 public void freeWorkerFor(Object object) throws WorkerException {
101 Iterator it = workerPool.iterator();
102 while (it.hasNext()) {
103 Worker worker = (Worker) it.next();
104
105 if (object.equals(worker.getObject()) ||
106 object.hashCode() == worker.getObject().hashCode()) {
107 this.freeWorker(worker);
108 }
109 }
110 }
111
112 protected boolean hasDozedOff(Worker worker) {
113 return ((System.currentTimeMillis() - worker.getLastReport().getTime()) / 1000l) >= timeoutSeconds &&
114 worker.getJobCount() >= MAX_JOBS;
115 }
116
117 protected void resetWorker(Worker worker) {
118 worker.setJobCount(0);
119 worker.setLastReport(new Date(System.currentTimeMillis()));
120 if (log.isDebugEnabled()) log.debug(RESET_WORKER + worker);
121 }
122 }