1
2
3
4
5
6
7
8
9
10
11
12
13 package org.gscg.common;
14
15 /***
16 * Also called counting semaphores, Dijkkstra semaphores are used to control
17 * access to a set of resources. A Dijkstra semaphore has a count associated
18 * with it and each acquire() call reduces the count. A thread that tries to
19 * acquire() a Dijkstra semaphore with a zero count blocks until someone else
20 * calls release() thus increasing the count.
21 *
22 * @author Karthik Rangaraju
23 * @version $Revision: 1.1 $ $Date: 2005/06/16 12:33:54 $
24 * @since 1.1.3
25 */
26 public class DijkstraSemaphore
27 {
28 /*** currentCount */
29 private int currentCount;
30
31 /*** max count */
32 private int maximumCount;
33
34 /*** starvation lock */
35 private Object starvationLock = new Object();
36
37 /***
38 * Creates a Dijkstra semaphore with the specified max count and initial
39 * count set to the max count (all resources released)
40 *
41 * @param maxCount is the max semaphores that can be acquired
42 */
43 public DijkstraSemaphore(final int maxCount)
44 {
45 this(maxCount, maxCount);
46 }
47
48 /***
49 * Creates a Dijkstra semaphore with the specified max count and an initial
50 * count of acquire() operations that are assumed to have already been
51 * performed.
52 *
53 * @param maxCount is the max semaphores that can be acquired
54 * @param initialCount is the current count (setting it to zero means all
55 * semaphores have already been acquired). 0 <= pInitialCount <=
56 * pMaxCount
57 */
58 public DijkstraSemaphore(final int maxCount, final int initialCount)
59 {
60 this.maximumCount = maxCount;
61 this.currentCount = initialCount;
62 }
63
64 /***
65 * If the count is non-zero, acquires a semaphore and decrements the count
66 * by 1, otherwise blocks until a release() is executed by some other
67 * thread.
68 *
69 * @throws InterruptedException is the thread is interrupted when blocked
70 * @see #tryAcquire()
71 * @see #acquireAll()
72 */
73 public void acquire() throws InterruptedException
74 {
75 synchronized (this)
76 {
77
78
79
80 while (this.currentCount == 0)
81 {
82 wait();
83 }
84 this.currentCount--;
85 synchronized (this.starvationLock)
86 {
87 if (this.currentCount == 0)
88 {
89 this.starvationLock.notify();
90 }
91 }
92 }
93 }
94
95 /***
96 * Non-blocking version of acquire().
97 *
98 * @return true if semaphore was acquired (count is decremented by 1), false
99 * otherwise
100 */
101 public boolean tryAcquire()
102 {
103 synchronized (this)
104 {
105 if (this.currentCount != 0)
106 {
107 this.currentCount--;
108 synchronized (this.starvationLock)
109 {
110 if (this.currentCount == 0)
111 {
112 this.starvationLock.notify();
113 }
114 }
115 return true;
116 }
117 return false;
118 }
119 }
120
121 /***
122 * Releases a previously acquires semaphore and increments the count by one.
123 * Does not check if the thread releasing the semaphore was a thread that
124 * acquired the semaphore previously. If more releases are performed than
125 * acquires, the count is not increased beyond the max count specified
126 * during construction.
127 *
128 * @see #release( int pCount )
129 * @see #releaseAll()
130 */
131 public void release()
132 {
133 synchronized (this)
134 {
135 this.currentCount++;
136 if (this.currentCount > this.maximumCount)
137 {
138 this.currentCount = this.maximumCount;
139 }
140 notify();
141 }
142 }
143
144 /***
145 * Same as release() except that the count is increased by pCount instead of
146 * 1. The resulting count is capped at max count specified in the
147 * constructor
148 *
149 * @param count is the amount by which the counter should be incremented
150 * @see #release()
151 */
152 public void release(final int count)
153 {
154 synchronized (this)
155 {
156 if (this.currentCount + count > this.maximumCount)
157 {
158 this.currentCount = this.maximumCount;
159 } else
160 {
161 this.currentCount += count;
162 }
163 notifyAll();
164 }
165 }
166
167 /***
168 * Tries to acquire all the semaphores thus bringing the count to zero.
169 *
170 * @throws InterruptedException if the thread is interrupted when blocked on
171 * this call
172 * @see #acquire()
173 * @see #releaseAll()
174 */
175 public void acquireAll() throws InterruptedException
176 {
177 synchronized (this)
178 {
179 for (int index = 0; index < this.maximumCount; index++)
180 {
181 acquire();
182 }
183 }
184 }
185
186 /***
187 * Releases all semaphores setting the count to max count. Warning: If this
188 * method is called by a thread that did not make a corresponding
189 * acquireAll() call, then you better know what you are doing!
190 *
191 * @see #acquireAll()
192 */
193 public void releaseAll()
194 {
195 synchronized (this)
196 {
197 release(this.maximumCount);
198 notifyAll();
199 }
200 }
201
202 /***
203 * sets the maximum count
204 *
205 * @param maximumCount the maximum count
206 */
207 public void setMaximumCount(final int maximumCount)
208 {
209 synchronized (this)
210 {
211 this.maximumCount = maximumCount;
212 this.currentCount = this.maximumCount;
213 }
214 }
215
216 /***
217 * @return returns the maximum count for the dijkstra semaphore
218 */
219 public int getMaximumCount()
220 {
221 return this.maximumCount;
222 }
223
224 /***
225 * This method blocks the calling thread until the count drops to zero. The
226 * method is not stateful and hence a drop to zero will not be recognized if
227 * a release happens before this call. You can use this method to implement
228 * threads that dynamically increase the resource pool or that log
229 * occurences of resource starvation. Also called a reverse-sensing
230 * semaphore
231 *
232 * @throws InterruptedException if the thread is interrupted while waiting
233 */
234 public void starvationCheck() throws InterruptedException
235 {
236 synchronized (this.starvationLock)
237 {
238 if (this.currentCount != 0)
239 {
240 this.starvationLock.wait();
241 }
242 }
243 }
244 }