链接地址:
上一篇中,我们简单的介绍了一下Guava Concurrency Monitor监控类,并对Monitor的源码结构进行了分析,对Monitor中的一些重要的方法做了简单的整理,那么在实际的工作中我们又该怎样去应用Monitor呢?接下来,我们首先通过分析GitHub上面的开源代码,来继续进行Guava Concurrency Monitor的学习。
下面的开源代码:包含Monitor的实例代码MonitorExample和对应的测试类MonitorExampleTest,由于针对的是Guava较早的版本,所以其中的一些API已经舍弃,不过仍然具有很高的学习价值,代码如下:
package guava; import com.google.common.util.concurrent.Monitor; import java.util.concurrent.atomic.AtomicInteger; /** * 原文地址:https://gist.github.com/bbejeck/1369371 * User: bbejeck */public class MonitorExample { private final Monitor monitor = new Monitor(); private volatile boolean condition = true; private int taskDoneCounter; //AtomicInteger:线程安全的加减操作 private AtomicInteger taskSkippedCounter = new AtomicInteger(0); private int stopTaskCount; private Monitor.Guard conditionGuard = new Monitor.Guard(monitor) { @Override public boolean isSatisfied() { return condition; } }; public void demoTryEnterIf() throws InterruptedException { if (monitor.tryEnterIf(conditionGuard)) { try { simulatedWork(); taskDoneCounter++; } finally { monitor.leave(); } } else { //自增加1 taskSkippedCounter.incrementAndGet(); } } public void demoEnterIf() throws InterruptedException { if (monitor.enterIf(conditionGuard)) { try { taskDoneCounter++; if (taskDoneCounter == stopTaskCount) { condition = false; } } finally { monitor.leave(); } } else { taskSkippedCounter.incrementAndGet(); } } public void demoEnterWhen() throws InterruptedException { monitor.enterWhen(conditionGuard); try { taskDoneCounter++; if (taskDoneCounter == stopTaskCount) { condition = false; } } finally { monitor.leave(); } } private void simulatedWork() throws InterruptedException { Thread.sleep(250); } // public void reEvaluateGuardCondition() {// monitor.reevaluateGuards();// } public int getStopTaskCount() { return stopTaskCount; } public void setStopTaskCount(int stopTaskCount) { this.stopTaskCount = stopTaskCount; } public void setCondition(boolean condition) { this.condition = condition; } public int getTaskSkippedCounter() { return taskSkippedCounter.get(); } public int getTaskDoneCounter() { return taskDoneCounter; }}
package guava; import org.junit.After;import org.junit.Before;import org.junit.Test; import java.lang.reflect.Method;import java.util.concurrent.*; import static org.hamcrest.CoreMatchers.is;import static org.junit.Assert.assertThat; /** * 原文地址:https://gist.github.com/bbejeck/1369371 * User: bbejeck */public class MonitorExampleTest { private MonitorExample monitorExample; private ExecutorService executorService; private int numberThreads = 10; // CountDownLatch:同步辅助类,允许一个或多个线程等待其他线程所执行的一组操作完成 private CountDownLatch startSignal; private CountDownLatch doneSignal; @Before public void setUp() throws Exception { monitorExample = new MonitorExample(); executorService = Executors.newFixedThreadPool(numberThreads); startSignal = new CountDownLatch(1); doneSignal = new CountDownLatch(numberThreads); } @After public void tearDown() { executorService.shutdownNow(); } /** * 第一个线程会进入Monitor调用simulatedWork()后线程等待 * 其余9个线程则会进入else,对taskSkippedCounter自增 * * @throws Exception */ @Test public void testDemoTryEnterIf() throws Exception { setUpThreadsForTestingMethod("demoTryEnterIf"); startAllThreadsForTest(); waitForTestThreadsToFinish(); int expectedTaskCount = 1; int expectedSkippedTasks = 9; assertThat(monitorExample.getTaskDoneCounter(), is(expectedTaskCount)); assertThat(monitorExample.getTaskSkippedCounter(), is(expectedSkippedTasks)); } /** * 前5个线程会等待Monitor,因为Guard的isSatisfied()为true * 但是一旦isSatisfied()变为false,剩余的线程会进入else, * 对taskSkippedCounter自增 * * @throws Exception */ @Test public void testDemoEnterIfOnlyFiveTasksComplete() throws Exception { monitorExample.setStopTaskCount(5); setUpThreadsForTestingMethod("demoEnterIf"); startAllThreadsForTest(); waitForTestThreadsToFinish(); int expectedTaskCount = 5; int expectedSkippedTasks = 5; assertThat(monitorExample.getTaskDoneCounter(), is(expectedTaskCount)); assertThat(monitorExample.getTaskSkippedCounter(), is(expectedSkippedTasks)); } /** * 所有10个线程都会进入Monitor,因为在整个时间内Guard的isSatisfied()为true * * @throws Exception */ @Test public void testDemoEnterIfAllTasksComplete() throws Exception { monitorExample.setStopTaskCount(Integer.MAX_VALUE); setUpThreadsForTestingMethod("demoEnterIf"); startAllThreadsForTest(); waitForTestThreadsToFinish(); int expectedTaskCount = 10; int expectedSkippedTasks = 0; assertThat(monitorExample.getTaskDoneCounter(), is(expectedTaskCount)); assertThat(monitorExample.getTaskSkippedCounter(), is(expectedSkippedTasks)); } /** * Guard的isSatisfied()初始化为true,但是所有10个线程会进入Monitor * * @throws Exception */ @Test public void testDemoEnterWhen() throws Exception { monitorExample.setStopTaskCount(Integer.MAX_VALUE); monitorExample.setCondition(false); setUpThreadsForTestingMethod("demoEnterWhen"); startAllThreadsForTest(); int expectedCompletedCount = 0; int completedCount = monitorExample.getTaskDoneCounter(); assertThat(completedCount, is(expectedCompletedCount)); monitorExample.setCondition(true); waitForTestThreadsToFinish(); expectedCompletedCount = 10; completedCount = monitorExample.getTaskDoneCounter(); assertThat(completedCount, is(expectedCompletedCount)); } /** * 在3个线程完成工作后,人为的设置Guard的isSatisfied()为false * 以证明剩余的7个线程将等待,直到isSatisfied()变为true * 然后才会进入Monitor. * * @throws Exception */ @Test public void testDemoEnterWhenAllTasksCompleteEvenWhenConditionChanges() throws Exception { monitorExample.setCondition(true); monitorExample.setStopTaskCount(3); setUpThreadsForTestingMethod("demoEnterWhen"); startAllThreadsForTest(); //验证最初只有3个线程工作, 重新设定Guard的isSatisfied()为true FutureTaskcheckInitialTasksCompleted = new FutureTask ( new Callable () { public Integer call() { int initialCompletedTasks = monitorExample.getTaskDoneCounter(); monitorExample.setCondition(true);// monitorExample.reEvaluateGuardCondition(); return initialCompletedTasks; } }); new Thread(checkInitialTasksCompleted).start(); int expectedCompletedCount = 3; int completedCount = checkInitialTasksCompleted.get(); assertThat(completedCount, is(expectedCompletedCount)); waitForTestThreadsToFinish(); assertThat(completedCount, is(expectedCompletedCount)); expectedCompletedCount = 10; completedCount = monitorExample.getTaskDoneCounter(); assertThat(completedCount, is(expectedCompletedCount)); } private void waitForTestThreadsToFinish() throws InterruptedException { doneSignal.await(1000l, TimeUnit.MILLISECONDS); } private void startAllThreadsForTest() { startSignal.countDown(); } private Method getMethodUnderTest(String methodName) throws Exception { return monitorExample.getClass().getDeclaredMethod(methodName); } private void setUpThreadsForTestingMethod(String methodName) throws Exception { final Method testMethod = getMethodUnderTest(methodName); for (int i = 0; i < numberThreads; i++) { executorService.execute(new Runnable() { @Override public void run() { try { startSignal.await(); testMethod.invoke(monitorExample); } catch (Exception e) { //异常无须处理 } finally { doneSignal.countDown(); } } }); } } }