2022-09-25 Concurrent Programming Series 00

CountDownLatch

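Introduction

Basic Concepts

  • CountDownLatch lets one or more threads wait until operations running in other threads have completed.

  • The constructor takes an int count: the number of countDown() calls that must happen before waiting threads are released, typically one per worker thread.

  • The count does not have to map to threads; even within a single thread it can represent a number of steps to complete.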

Common Methods


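CountDownLatch is used to coordinate threads: a waiting thread blocks until the other threads have counted the latch down to zero.

The constructor argument sets the initial count, await() blocks until the count reaches zero, and countDown() decrements the count by one.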
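
To make these operations concrete, here is a minimal single-threaded sketch. The class and method names (LatchMethodsSketch, runSteps) are made up for illustration and are not part of the original project. It also uses await(long, TimeUnit), the timed variant of await() from the standard API, so that an incomplete countdown returns false instead of blocking forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the original project.
class LatchMethodsSketch {

    // Creates a latch for `steps` steps, counts down `performed` times,
    // then reports whether the count reached zero within a short timeout.
    static boolean runSteps(int steps, int performed) {
        CountDownLatch latch = new CountDownLatch(steps);
        for (int i = 0; i < performed; i++) {
            latch.countDown(); // has no effect once the count is already zero
        }
        try {
            // Timed variant of await(): returns false if the count is still above zero
            return latch.await(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(runSteps(3, 3)); // prints true: all three steps were counted down
        System.out.println(runSteps(3, 2)); // prints false: one step is still outstanding
    }
}
```

Because everything runs on one thread here, the sketch also illustrates the point that the count can stand for steps rather than threads.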

Basic Usage

Basic Example

package cn.knightzz.countdown_latch.test;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static java.lang.Thread.sleep;

/**
 * @author 王天赐
 * @title: CountdownLatchTest
 * @projectName hm-juc-codes
 * @description:
 * @website <a href="http://knightzz.cn/">http://knightzz.cn/</a>
 * @github <a href="https://github.com/knightzz1998">https://github.com/knightzz1998</a>
 * @create: 2022-09-24 10:27
 */
@SuppressWarnings("all")
@Slf4j(topic = "c.CountdownLatchTest")
public class CountdownLatchTest {

    public static void main(String[] args) throws InterruptedException {

        CountDownLatch latch = new CountDownLatch(3);

        Thread t1 = new Thread(() -> {
            log.debug("start ... ");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            latch.countDown();
            log.debug("end ... ");
        }, "t1");

        Thread t2 = new Thread(() -> {
            log.debug("start ... ");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            latch.countDown();
            log.debug("end ... ");
        }, "t2");

        Thread t3 = new Thread(() -> {
            log.debug("start ... ");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            latch.countDown();
            log.debug("end ... ");
        }, "t3");

        t1.start();
        t2.start();
        t3.start();

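        // Block the main thread until the count reaches zero
        log.debug("wait start ... ");
        // The short sleep is optional; it is await() that waits for the countDown() calls
        sleep(100);
        latch.await();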
        log.debug("wait end ... ");
    }
}

The output is as follows:

11:34:56.760 [main] DEBUG c.CountdownLatchTest - wait start ... 
11:34:56.760 [t2] DEBUG c.CountdownLatchTest - start ... 
11:34:56.760 [t3] DEBUG c.CountdownLatchTest - start ... 
11:34:56.760 [t1] DEBUG c.CountdownLatchTest - start ... 
11:34:57.769 [t1] DEBUG c.CountdownLatchTest - end ... 
11:34:58.775 [t3] DEBUG c.CountdownLatchTest - end ... 
11:34:59.773 [t2] DEBUG c.CountdownLatchTest - end ... 
11:34:59.773 [main] DEBUG c.CountdownLatchTest - wait end ... 

Process finished with exit code 0

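As the output shows:

  • latch.await() returns only after countDown() has been called enough times to bring the count to zero (three times here); until then the caller stays blocked.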
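
Since await() blocks until the count reaches zero, a worker that fails before calling countDown() would leave the waiting thread stuck. A common precaution is to call countDown() in a finally block and to use the timed await(long, TimeUnit). The sketch below is one way to do that; the class name, method name, and the simulated failure are assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the original project.
class FinallyCountDownSketch {

    // Starts two workers; the second one optionally fails.
    // Returns true if both counted down before the timeout expired.
    static boolean awaitWorkers(boolean failSecond) {
        CountDownLatch latch = new CountDownLatch(2);

        Thread w1 = new Thread(() -> {
            try {
                // normal work would go here
            } finally {
                latch.countDown();
            }
        }, "w1");

        Thread w2 = new Thread(() -> {
            try {
                if (failSecond) {
                    throw new IllegalStateException("simulated failure");
                }
            } finally {
                latch.countDown(); // runs even though the task threw
            }
        }, "w2");
        // Swallow the simulated failure so it does not clutter stderr
        w2.setUncaughtExceptionHandler((t, e) -> { });

        w1.start();
        w2.start();
        try {
            return latch.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitWorkers(true)); // prints true: the failed worker still counted down
    }
}
```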

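Combining with a Thread Pool

A simple example:

  • Create a thread pool of size 4.
  • The first three tasks (t1 to t3) do the computation; the fourth (t4) does the summary.
  • t4 starts summarising only after the first three computations have finished.
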
package cn.knightzz.countdown_latch.test;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static java.lang.Thread.sleep;

/**
 * @author 王天赐
 * @title: CountdownLatchPoolTest
 * @projectName hm-juc-codes
 * @description:
 * @website <a href="http://knightzz.cn/">http://knightzz.cn/</a>
 * @github <a href="https://github.com/knightzz1998">https://github.com/knightzz1998</a>
 * @create: 2022-09-24 10:27
 */
@SuppressWarnings("all")
@Slf4j(topic = "c.CountdownLatchPoolTest")
public class CountdownLatchPoolTest {

    public static void main(String[] args) throws InterruptedException {

        CountDownLatch latch = new CountDownLatch(3);

        ExecutorService service = Executors.newFixedThreadPool(4);

        service.submit(() -> {
            log.debug("start ... ");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            latch.countDown();
            log.debug("end ... ");
        }, "t1"); // note: "t1" is the value the returned Future yields, not a thread name

        service.submit(() -> {
            log.debug("start ... ");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            latch.countDown();
            log.debug("end ... ");
        }, "t2");

        service.submit(() -> {
            log.debug("start ... ");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            latch.countDown();
            log.debug("end ... ");
        }, "t3");

        service.submit(() -> {
            log.debug("sum start ... ");
            try {
                latch.await();
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
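            log.debug("sum end ... ");
        }, "t4");

        // Stop accepting new tasks; the four submitted tasks still run to completion,
        // after which the pool threads exit and the JVM can terminate.
        service.shutdown();
    }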
}
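
In the listing above the summarising task only sleeps. As a rough sketch of what an actual aggregation might look like, the version below has each compute task write into its own array slot and lets the last task add the slots up after await() returns. The class name, method name, and the squaring workload are assumptions for illustration. It relies on the memory-consistency guarantee documented for CountDownLatch: actions before countDown() happen-before actions after a successful return from await().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; not part of the original project.
class PoolSumSketch {

    // Squares each input on its own pool task, then sums the results in a final task.
    static int sumOfSquares(int[] inputs) {
        int n = inputs.length;
        CountDownLatch latch = new CountDownLatch(n);
        int[] partial = new int[n];
        // n compute tasks plus one summarising task
        ExecutorService pool = Executors.newFixedThreadPool(n + 1);
        try {
            for (int i = 0; i < n; i++) {
                int k = i; // lambdas may only capture effectively final locals
                pool.submit(() -> {
                    try {
                        partial[k] = inputs[k] * inputs[k];
                    } finally {
                        latch.countDown();
                    }
                });
            }
            Future<Integer> total = pool.submit(() -> {
                latch.await(); // wait for every compute task
                int sum = 0;
                for (int p : partial) {
                    sum += p;
                }
                return sum;
            });
            return total.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(new int[] {1, 2, 3})); // prints 14
    }
}
```

The summarising task is submitted last on purpose: if it were queued ahead of the compute tasks in a pool with too few threads, it could occupy a thread while waiting for tasks that never get to run.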

Basic Examples

Waiting for Multiple Threads to Finish

package cn.knightzz.countdown_latch.apply;

import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author 王天赐
 * @title: WaitThreadsCompete
 * @projectName hm-juc-codes
 * @description: Wait for multiple threads to finish
 * @website <a href="http://knightzz.cn/">http://knightzz.cn/</a>
 * @github <a href="https://github.com/knightzz1998">https://github.com/knightzz1998</a>
 * @create: 2022-09-24 14:27
 */
@SuppressWarnings("all")
@Slf4j(topic = "c.WaitThreadsCompete")
public class WaitThreadsCompete {

    public static void main(String[] args) throws InterruptedException {

        ExecutorService service = Executors.newFixedThreadPool(11);
        CountDownLatch latch = new CountDownLatch(10);
        String[] steps = new String[10];
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            // A lambda can only capture local variables that are effectively final, so copy i into k
            int k = i;
            service.submit(() -> {
                for (int j = 1; j <= 100; j++) {
                    try {
                        Thread.sleep(random.nextInt(100));
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    steps[k] = j + "%";
                    System.out.print("\r" + Arrays.toString(steps));
                }
                // Decrement the count by one
                latch.countDown();
            });
        }


        latch.await();
        System.out.println("\n加载完毕, 游戏开始"); // "Loading complete, game starts"
        service.shutdown();

    }
}

The output is as follows:

[100%, 100%, 100%, 100%, 100%, 100%, 100%, 100%, 100%, 100%]
加载完毕, 游戏开始

Process finished with exit code 0

Waiting for Multiple Remote Calls

Scenario

  • Several threads each call a remote Controller; only after every call has returned does the program continue with the follow-up work.

Dependencies

            <!-- REST dependency -->
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-webmvc</artifactId>
                <version>5.2.6.RELEASE</version>
            </dependency>
            <!-- Required by RestTemplate; without it the response cannot be converted to a Map -->
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.12.6</version>
            </dependency>

  • jackson-databind is required; without it RestTemplate cannot convert the JSON response into a Map.

Service Provider

package cn.knightzz.remoteweb.controller;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;


@RestController
public class CountDownlatchController {

    @GetMapping("/order/{id}")
    public Map<String, Object> order(@PathVariable int id) {
        HashMap<String, Object> map = new HashMap<>();
        map.put("id", id);
        map.put("total", "2300.00");
        sleep(2000);
        return map;
    }

    @GetMapping("/product/{id}")
    public Map<String, Object> product(@PathVariable int id) {
        HashMap<String, Object> map = new HashMap<>();
        if (id == 1) {
            map.put("name", "小爱音箱");
            map.put("price", 300);
        } else if (id == 2) {
            map.put("name", "小米手机");
            map.put("price", 2000);
        }
        map.put("id", id);
        sleep(1000);
        return map;
    }

    @GetMapping("/logistics/{id}")
    public Map<String, Object> logistics(@PathVariable int id) {
        HashMap<String, Object> map = new HashMap<>();
        map.put("id", id);
        map.put("name", "中通快递");
        sleep(2500);
        return map;
    }

    private void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Remote Calls

private static void test3() throws InterruptedException, ExecutionException {
        RestTemplate restTemplate = new RestTemplate();
        log.debug("begin");
        ExecutorService service = Executors.newCachedThreadPool();
        // Note: this latch is never used below; the four Future.get() calls do the waiting
        CountDownLatch latch = new CountDownLatch(4);
        Future<Map<String,Object>> f1 = service.submit(() -> {
            Map<String, Object> response = restTemplate.getForObject("http://localhost:8080/order/{1}", Map.class, 1);
            return response;
        });
        Future<Map<String, Object>> f2 = service.submit(() -> {
            Map<String, Object> response1 = restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 1);
            return response1;
        });
        Future<Map<String, Object>> f3 = service.submit(() -> {
            Map<String, Object> response1 = restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 2);
            return response1;
        });
        Future<Map<String, Object>> f4 = service.submit(() -> {
            Map<String, Object> response3 = restTemplate.getForObject("http://localhost:8080/logistics/{1}", Map.class, 1);
            return response3;
        });

        System.out.println(f1.get());
        System.out.println(f2.get());
        System.out.println(f3.get());
        System.out.println(f4.get());
        log.debug("执行完毕"); // "Finished"
        service.shutdown();
    }

The output is as follows:

16:16:25.116 [main] DEBUG c.RemoteCountdownLatch - begin
16:16:25.154 [pool-1-thread-4] DEBUG org.springframework.web.client.RestTemplate - HTTP GET http://localhost:8080/logistics/1
16:16:25.154 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate - HTTP GET http://localhost:8080/product/1
16:16:25.154 [pool-1-thread-1] DEBUG org.springframework.web.client.RestTemplate - HTTP GET http://localhost:8080/order/1
16:16:25.154 [pool-1-thread-3] DEBUG org.springframework.web.client.RestTemplate - HTTP GET http://localhost:8080/product/2
16:16:25.174 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate - Accept=[application/json, application/*+json]
16:16:25.174 [pool-1-thread-4] DEBUG org.springframework.web.client.RestTemplate - Accept=[application/json, application/*+json]
16:16:25.174 [pool-1-thread-3] DEBUG org.springframework.web.client.RestTemplate - Accept=[application/json, application/*+json]
16:16:25.174 [pool-1-thread-1] DEBUG org.springframework.web.client.RestTemplate - Accept=[application/json, application/*+json]
16:16:26.198 [pool-1-thread-3] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
16:16:26.198 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
16:16:26.203 [pool-1-thread-2] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.util.Map<?, ?>]
16:16:26.203 [pool-1-thread-3] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.util.Map<?, ?>]
16:16:26.224 [pool-1-thread-2] DEBUG c.RemoteCountdownLatch - result {price=300, name=小爱音箱, id=1}
16:16:27.199 [pool-1-thread-1] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
16:16:27.199 [pool-1-thread-1] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.util.Map<?, ?>]
{total=2300.00, id=1}
{price=300, name=小爱音箱, id=1}
{price=2000, name=小米手机, id=2}
16:16:27.688 [pool-1-thread-4] DEBUG org.springframework.web.client.RestTemplate - Response 200 OK
16:16:27.689 [pool-1-thread-4] DEBUG org.springframework.web.client.RestTemplate - Reading to [java.util.Map<?, ?>]
{name=中通快递, id=1}
16:16:27.689 [main] DEBUG c.RemoteCountdownLatch - 执行完毕
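
The client listing above waits through Future.get() rather than through a latch. For comparison, here is a minimal sketch of the same wait done with CountDownLatch. To keep it runnable without the server, the HTTP call is replaced by a hypothetical fakeFetch method standing in for restTemplate.getForObject; that method, the class name, and the 50 ms delay are assumptions, not part of the original code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; not part of the original project.
class LatchRemoteSketch {

    // Hypothetical stand-in for restTemplate.getForObject(url, Map.class, id)
    static Map<String, Object> fakeFetch(String path) {
        try {
            Thread.sleep(50); // simulated network latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Map<String, Object> body = new HashMap<>();
        body.put("path", path);
        return body;
    }

    // Issues one call per path in parallel and returns once all of them have finished.
    static Map<String, Map<String, Object>> fetchAll(List<String> paths) {
        ExecutorService pool = Executors.newCachedThreadPool();
        CountDownLatch latch = new CountDownLatch(paths.size());
        // Shared, thread-safe container: a latch carries no return values itself
        Map<String, Map<String, Object>> results = new ConcurrentHashMap<>();
        for (String path : paths) {
            pool.submit(() -> {
                try {
                    results.put(path, fakeFetch(path));
                } finally {
                    latch.countDown(); // count down even if the call fails
                }
            });
        }
        try {
            latch.await(); // block until every call has counted down
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> paths = Arrays.asList("/order/1", "/product/1", "/product/2", "/logistics/1");
        System.out.println(fetchAll(paths).size()); // prints 4
    }
}
```

The trade-off is visible in the sketch: with a latch the results have to travel through a shared container, whereas Future hands each result back directly, which is why Future tends to be the simpler choice when the caller needs the return values.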

Author: 王天赐

Copyright notice: Unless otherwise stated, all articles on this blog are licensed under BY-NC-SA. Please credit the source when reposting!