博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Guava库学习:学习Concurrency(二)Monitor_2
阅读量:6464 次
发布时间:2019-06-23

本文共 8706 字,大约阅读时间需要 29 分钟。

hot3.png

    链接地址:

    上一篇中,我们简单的介绍了一下Guava Concurrency Monitor监控类,并对Monitor的源码结构进行了分析,对Monitor中的一些重要的方法做了简单的整理,那么在实际的工作中我们又该怎样去应用Monitor呢?接下来,我们首先通过分析GitHub上面的开源代码,来继续进行Guava Concurrency Monitor的学习。

 

    下面的开源代码:包含Monitor的实例代码MonitorExample和对应的测试类MonitorExampleTest,由于针对的是Guava较早的版本,所以其中的一些API已经舍弃,不过仍然具有很高的学习价值,代码如下:

package guava; import com.google.common.util.concurrent.Monitor; import java.util.concurrent.atomic.AtomicInteger; /** * 原文地址:https://gist.github.com/bbejeck/1369371 * User: bbejeck */public class MonitorExample {     private final Monitor monitor = new Monitor();    private volatile boolean condition = true;    private int taskDoneCounter;    //AtomicInteger:线程安全的加减操作    private AtomicInteger taskSkippedCounter = new AtomicInteger(0);    private int stopTaskCount;     private Monitor.Guard conditionGuard = new Monitor.Guard(monitor) {        @Override        public boolean isSatisfied() {            return condition;        }    };     public void demoTryEnterIf() throws InterruptedException {        if (monitor.tryEnterIf(conditionGuard)) {            try {                simulatedWork();                taskDoneCounter++;            } finally {                monitor.leave();            }        } else {            //自增加1            taskSkippedCounter.incrementAndGet();        }    }     public void demoEnterIf() throws InterruptedException {        if (monitor.enterIf(conditionGuard)) {            try {                taskDoneCounter++;                if (taskDoneCounter == stopTaskCount) {                    condition = false;                }            } finally {                monitor.leave();            }        } else {            taskSkippedCounter.incrementAndGet();        }     }     public void demoEnterWhen() throws InterruptedException {        monitor.enterWhen(conditionGuard);        try {            taskDoneCounter++;            if (taskDoneCounter == stopTaskCount) {                condition = false;            }        } finally {            monitor.leave();        }    }     private void simulatedWork() throws InterruptedException {        Thread.sleep(250);    } //    public void reEvaluateGuardCondition() {//        monitor.reevaluateGuards();//    }     public int getStopTaskCount() {        return stopTaskCount;    }     public void setStopTaskCount(int stopTaskCount) {        this.stopTaskCount = stopTaskCount;    }     public void setCondition(boolean condition) {        this.condition = condition;    }     public int getTaskSkippedCounter() {        return taskSkippedCounter.get();    }     public int getTaskDoneCounter() {        return taskDoneCounter;    }}
package guava; import org.junit.After;import org.junit.Before;import org.junit.Test; import java.lang.reflect.Method;import java.util.concurrent.*; import static org.hamcrest.CoreMatchers.is;import static org.junit.Assert.assertThat; /** * 原文地址:https://gist.github.com/bbejeck/1369371 * User: bbejeck */public class MonitorExampleTest {     private MonitorExample monitorExample;    private ExecutorService executorService;    private int numberThreads = 10;    // CountDownLatch:同步辅助类,允许一个或多个线程等待其他线程所执行的一组操作完成    private CountDownLatch startSignal;    private CountDownLatch doneSignal;     @Before    public void setUp() throws Exception {        monitorExample = new MonitorExample();        executorService = Executors.newFixedThreadPool(numberThreads);        startSignal = new CountDownLatch(1);        doneSignal = new CountDownLatch(numberThreads);    }     @After    public void tearDown() {        executorService.shutdownNow();    }     /**     * 第一个线程会进入Monitor调用simulatedWork()后线程等待     * 其余9个线程则会进入else,对taskSkippedCounter自增     *     * @throws Exception     */    @Test    public void testDemoTryEnterIf() throws Exception {        setUpThreadsForTestingMethod("demoTryEnterIf");        startAllThreadsForTest();        waitForTestThreadsToFinish();        int expectedTaskCount = 1;        int expectedSkippedTasks = 9;        assertThat(monitorExample.getTaskDoneCounter(), is(expectedTaskCount));        assertThat(monitorExample.getTaskSkippedCounter(), is(expectedSkippedTasks));    }     /**     * 前5个线程会等待Monitor,因为Guard的isSatisfied()为true     * 但是一旦isSatisfied()变为false,剩余的线程会进入else,     * 对taskSkippedCounter自增     *     * @throws Exception     */    @Test    public void testDemoEnterIfOnlyFiveTasksComplete() throws Exception {        monitorExample.setStopTaskCount(5);        setUpThreadsForTestingMethod("demoEnterIf");         startAllThreadsForTest();        waitForTestThreadsToFinish();        int expectedTaskCount = 5;        int expectedSkippedTasks = 5;         assertThat(monitorExample.getTaskDoneCounter(), is(expectedTaskCount));        assertThat(monitorExample.getTaskSkippedCounter(), is(expectedSkippedTasks));     }     /**     * 所有10个线程都会进入Monitor,因为在整个时间内Guard的isSatisfied()为true     *     * @throws Exception     */    @Test    public void testDemoEnterIfAllTasksComplete() throws Exception {        monitorExample.setStopTaskCount(Integer.MAX_VALUE);        setUpThreadsForTestingMethod("demoEnterIf");         startAllThreadsForTest();        waitForTestThreadsToFinish();        int expectedTaskCount = 10;        int expectedSkippedTasks = 0;         assertThat(monitorExample.getTaskDoneCounter(), is(expectedTaskCount));        assertThat(monitorExample.getTaskSkippedCounter(), is(expectedSkippedTasks));     }     /**     * Guard的isSatisfied()初始化为true,但是所有10个线程会进入Monitor     *     * @throws Exception     */    @Test    public void testDemoEnterWhen() throws Exception {        monitorExample.setStopTaskCount(Integer.MAX_VALUE);        monitorExample.setCondition(false);        setUpThreadsForTestingMethod("demoEnterWhen");        startAllThreadsForTest();        int expectedCompletedCount = 0;        int completedCount = monitorExample.getTaskDoneCounter();        assertThat(completedCount, is(expectedCompletedCount));         monitorExample.setCondition(true);         waitForTestThreadsToFinish();        expectedCompletedCount = 10;        completedCount = monitorExample.getTaskDoneCounter();        assertThat(completedCount, is(expectedCompletedCount));    }     /**     * 在3个线程完成工作后,人为的设置Guard的isSatisfied()为false     * 以证明剩余的7个线程将等待,直到isSatisfied()变为true     * 然后才会进入Monitor.     *     * @throws Exception     */    @Test    public void testDemoEnterWhenAllTasksCompleteEvenWhenConditionChanges() throws Exception {        monitorExample.setCondition(true);        monitorExample.setStopTaskCount(3);        setUpThreadsForTestingMethod("demoEnterWhen");        startAllThreadsForTest();         //验证最初只有3个线程工作, 重新设定Guard的isSatisfied()为true        FutureTask
 checkInitialTasksCompleted = new FutureTask
(                new Callable
() {                    public Integer call() {                        int initialCompletedTasks = monitorExample.getTaskDoneCounter();                        monitorExample.setCondition(true);//                        monitorExample.reEvaluateGuardCondition();                        return initialCompletedTasks;                     }                });         new Thread(checkInitialTasksCompleted).start();         int expectedCompletedCount = 3;        int completedCount = checkInitialTasksCompleted.get();        assertThat(completedCount, is(expectedCompletedCount));         waitForTestThreadsToFinish();        assertThat(completedCount, is(expectedCompletedCount));        expectedCompletedCount = 10;        completedCount = monitorExample.getTaskDoneCounter();        assertThat(completedCount, is(expectedCompletedCount));    }     private void waitForTestThreadsToFinish() throws InterruptedException {        doneSignal.await(1000l, TimeUnit.MILLISECONDS);    }     private void startAllThreadsForTest() {        startSignal.countDown();    }     private Method getMethodUnderTest(String methodName) throws Exception {        return monitorExample.getClass().getDeclaredMethod(methodName);    }      private void setUpThreadsForTestingMethod(String methodName) throws Exception {        final Method testMethod = getMethodUnderTest(methodName);        for (int i = 0; i < numberThreads; i++) {            executorService.execute(new Runnable() {                @Override                public void run() {                    try {                        startSignal.await();                        testMethod.invoke(monitorExample);                    } catch (Exception e) {                        //异常无须处理                    } finally {                        doneSignal.countDown();                    }                }            });        }    } }

转载于:https://my.oschina.net/realfighter/blog/349926

你可能感兴趣的文章
DynamoDB Local for Desktop Development
查看>>
用javascript验证哥德巴赫猜想
查看>>
Shell编程-环境变量配置文件
查看>>
[Unity3d]DrawCall优化手记
查看>>
Struts2和Spring MVC的区别
查看>>
理解Javascript参数中的arguments对象
查看>>
p2:千行代码入门python
查看>>
bzoj1106[POI2007]立方体大作战tet*
查看>>
spring boot configuration annotation processor not found in classpath问题解决
查看>>
Mysql中文字符串提取datetime
查看>>
由中序遍历和后序遍历求前序遍历
查看>>
我学习参考的网址
查看>>
easyui的combotree以及tree,c#后台异步加载的详细介绍
查看>>
[Processing]点到线段的最小距离
查看>>
考研随笔2
查看>>
乱码的情况
查看>>
虚拟机centos 同一个tomcat、不同端口访问不同的项目
查看>>
在不花一分钱的情况下,如何验证你的创业想法是否可行?《转》
查看>>
Linux/Android 性能优化工具 perf
查看>>
GitHub使用教程、注册与安装
查看>>