Class ThreadBarrier
- java.lang.Object
  - java.util.concurrent.CyclicBarrier
    - org.elasticsearch.common.util.concurrent.ThreadBarrier
-
public class ThreadBarrier extends java.util.concurrent.CyclicBarrier
A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. Barriers are useful in programs involving a fixed-size party of threads that must occasionally wait for each other. ThreadBarrier adds a cause to the BrokenBarrierException thrown by a CyclicBarrier.reset() operation defined by CyclicBarrier.
Sample usage:
- Barrier as a synchronization and Exception handling aid
- Barrier as a trigger for elapsed notification events
class MyTestClass implements RemoteEventListener {
    ThreadBarrier barrier; // assigned per test, so it cannot be final

    class Worker implements Runnable {
        public void run() {
            try {
                barrier.await(); // wait for all threads to reach run
                prepare();
                barrier.await(); // wait for all threads to prepare
                process();
                barrier.await(); // wait for all threads to process
            } catch (Exception e) {
                log("Worker thread caught exception", e);
                barrier.reset(e);
            }
        }
    }

    public void testThreads() throws InterruptedException {
        barrier = new ThreadBarrier(N_THREADS + 1);
        for (int i = 0; i < N_THREADS; ++i)
            new Thread(new Worker()).start();
        try {
            barrier.await(); // wait for all threads to reach run
            barrier.await(); // wait for all threads to prepare
            barrier.await(); // wait for all threads to process
        } catch (BrokenBarrierException bbe) {
            Assert.fail(bbe.toString());
        }
    }

    int actualNotificationCount = 0;

    public synchronized void notify(RemoteEvent event) {
        try {
            actualNotificationCount++;
            if (actualNotificationCount == EXPECTED_COUNT)
                barrier.await(); // signal when all notifications arrive
            // too many notifications?
            Assert.assertFalse("Exceeded notification count",
                actualNotificationCount > EXPECTED_COUNT);
        } catch (Exception e) {
            log("Worker thread caught exception", e);
            barrier.reset(e);
        }
    }

    public void testNotify() throws Exception {
        barrier = new ThreadBarrier(N_LISTENERS + 1);
        registerNotification();
        triggerNotifications();
        // wait until either all notifications arrive, or
        // until MAX_TIMEOUT is reached (the time unit here is an assumption)
        barrier.await(MAX_TIMEOUT, TimeUnit.MILLISECONDS);
        // check if all notifications were accounted for or timed out
        Assert.assertEquals("Notification count",
            EXPECTED_COUNT, actualNotificationCount);
        // inspect that the barrier isn't broken
        barrier.inspect(); // throws BrokenBarrierException if broken
    }
}
-
Nested Class Summary
Nested Classes
Modifier and Type    Class    Description
static class    ThreadBarrier.BarrierTimer    A Barrier action to be used in conjunction with ThreadBarrier to measure performance between barrier awaits.
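The idea of a barrier action that measures the time between awaits can be sketched with a plain java.util.concurrent.CyclicBarrier. The TripTimer class and its methods below are hypothetical names chosen for illustration; they are not the API of ThreadBarrier.BarrierTimer, which this page does not describe.

```java
import java.util.concurrent.CyclicBarrier;

public class BarrierTimingSketch {

    /** Hypothetical barrier action: records the time between two successive barrier trips. */
    public static final class TripTimer implements Runnable {
        private boolean started = false;
        private long lastTripNanos;
        private long elapsedNanos = -1;
        private int trips = 0;

        @Override
        public synchronized void run() {
            // Runs once per trip, in the last thread to arrive at the barrier.
            long now = System.nanoTime();
            if (started) {
                elapsedNanos = now - lastTripNanos;
            }
            started = true;
            lastTripNanos = now;
            trips++;
        }

        public synchronized long elapsedNanos() { return elapsedNanos; }

        public synchronized int trips() { return trips; }
    }

    /** Runs the workers through a start barrier and a finish barrier, timing the gap. */
    public static TripTimer runTwoPhases(int workers) throws Exception {
        TripTimer timer = new TripTimer();
        CyclicBarrier barrier = new CyclicBarrier(workers + 1, timer);
        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                try {
                    barrier.await();   // start line
                    Thread.sleep(20);  // simulated work
                    barrier.await();   // finish line
                } catch (Exception e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
        barrier.await(); // the coordinating thread is the extra party
        barrier.await();
        return timer;
    }

    public static void main(String[] args) throws Exception {
        TripTimer timer = runTwoPhases(3);
        System.out.println("trips=" + timer.trips() + ", elapsed ns=" + timer.elapsedNanos());
    }
}
```

Because the action runs in the last arriving thread before the others are released, the measured interval covers the slowest worker in the phase.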
-
Constructor Summary
Constructors
Constructor    Description
ThreadBarrier(int parties)
ThreadBarrier(int parties, java.lang.Runnable barrierAction)
-
Method Summary
Modifier and Type    Method    Description
int    await()
int    await(long timeout, java.util.concurrent.TimeUnit unit)
void    inspect()    Inspects if the barrier is broken.
boolean    isBroken()    Queries if this barrier is in a broken state.
void    reset(java.lang.Exception cause)    Resets the barrier to its initial state.
-
Method Detail
-
await
public int await() throws java.lang.InterruptedException, java.util.concurrent.BrokenBarrierException
- Overrides:
await in class java.util.concurrent.CyclicBarrier
- Throws:
java.lang.InterruptedException
java.util.concurrent.BrokenBarrierException
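This page does not document the return value of await(). The sketch below shows the contract of the overridden CyclicBarrier.await(), which returns the arrival index of the calling thread (parties - 1 for the first to arrive, 0 for the last); it assumes, without confirmation from this page, that ThreadBarrier keeps that contract. The class and method names are illustrative only.

```java
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CyclicBarrier;

public class AwaitIndexSketch {

    /** Starts one thread per party and collects the index each await() returns. */
    public static Set<Integer> collectArrivalIndices(int parties) throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(parties);
        Set<Integer> indices = new ConcurrentSkipListSet<>();
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    indices.add(barrier.await()); // blocks until all parties arrive
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return indices;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(collectArrivalIndices(4)); // prints [0, 1, 2, 3]
    }
}
```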
-
await
public int await(long timeout, java.util.concurrent.TimeUnit unit) throws java.lang.InterruptedException, java.util.concurrent.BrokenBarrierException, java.util.concurrent.TimeoutException
- Overrides:
await in class java.util.concurrent.CyclicBarrier
- Throws:
java.lang.InterruptedException
java.util.concurrent.BrokenBarrierException
java.util.concurrent.TimeoutException
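A minimal sketch of the timed variant, using a plain CyclicBarrier as a stand-in: when the timeout elapses before all parties arrive, the waiting thread receives a TimeoutException and the barrier is left broken. The class and method names are illustrative, and the sketch assumes ThreadBarrier inherits this behavior from the method it overrides.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedAwaitSketch {

    /** Waits alone on a two-party barrier, so the timeout always fires. */
    public static String awaitAlone(long timeoutMillis) throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(2); // the second party never arrives
        try {
            barrier.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return "tripped";
        } catch (TimeoutException e) {
            // A timeout breaks the barrier for every party.
            return "timed out, broken=" + barrier.isBroken();
        } catch (BrokenBarrierException e) {
            return "broken";
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(awaitAlone(50)); // prints "timed out, broken=true"
    }
}
```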
-
reset
public void reset(java.lang.Exception cause)
Resets the barrier to its initial state. If any parties are currently waiting at the barrier, they will return with a BrokenBarrierException. Note that resets after a breakage has occurred for other reasons can be complicated to carry out; threads need to re-synchronize in some other way, and choose one to perform the reset. It may be preferable to instead create a new barrier for subsequent use.
- Parameters:
cause - The cause of the BrokenBarrierException
-
isBroken
public boolean isBroken()
Queries if this barrier is in a broken state. Note that if reset(Exception) is invoked the barrier will remain broken, while CyclicBarrier.reset() will reset the barrier to its initial state and isBroken() will return false.
- Overrides:
isBroken in class java.util.concurrent.CyclicBarrier
- Returns:
true if one or more parties broke out of this barrier due to interruption or timeout since construction or the last reset, or a barrier action failed due to an exception; false otherwise.
- See Also:
inspect()
-
inspect
public void inspect() throws java.util.concurrent.BrokenBarrierException
Inspects if the barrier is broken. If the barrier was broken for any reason, a BrokenBarrierException is thrown. Otherwise, the method returns normally.
- Throws:
java.util.concurrent.BrokenBarrierException - With a nested broken cause.
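The interplay of reset(Exception), isBroken() and inspect() described above can be sketched with a small CyclicBarrier subclass. CauseBarrier is a hypothetical stand-in written for illustration, not the Elasticsearch implementation; in particular, clearing the cause in reset() is an assumption made so that the sketch matches the behavior stated under isBroken.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CauseBarrierSketch {

    /** Hypothetical stand-in for a barrier that remembers why it was reset. */
    public static class CauseBarrier extends CyclicBarrier {
        private Exception cause;

        public CauseBarrier(int parties) {
            super(parties);
        }

        /** Releases any waiting parties and records the first cause. */
        public synchronized void reset(Exception cause) {
            super.reset();
            if (this.cause == null) {
                this.cause = cause;
            }
        }

        /** Plain reset: back to the initial state, with no cause (an assumption). */
        @Override
        public synchronized void reset() {
            super.reset();
            cause = null;
        }

        /** Stays broken for as long as a cause is recorded. */
        @Override
        public synchronized boolean isBroken() {
            return cause != null || super.isBroken();
        }

        /** Throws if broken, nesting the recorded cause when there is one. */
        public synchronized void inspect() throws BrokenBarrierException {
            if (isBroken()) {
                BrokenBarrierException bbe = new BrokenBarrierException("barrier is broken");
                if (cause != null) {
                    bbe.initCause(cause);
                }
                throw bbe;
            }
        }
    }

    public static String demo() {
        CauseBarrier barrier = new CauseBarrier(2);
        barrier.reset(new IllegalStateException("worker failed"));
        boolean brokenAfterCause = barrier.isBroken();
        String nested;
        try {
            barrier.inspect();
            nested = "none";
        } catch (BrokenBarrierException e) {
            nested = e.getCause().getMessage();
        }
        barrier.reset(); // the inherited reset clears the broken state
        return brokenAfterCause + "/" + nested + "/" + barrier.isBroken();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "true/worker failed/false"
    }
}
```

A coordinating thread can call inspect() after its last await to surface a failure that a worker reported through reset(Exception), rather than polling isBroken() and losing the reason.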
-