Skip to content

04 - 线程池与异步编程

为什么需要线程池?

没有线程池:每个任务创建一个线程
┌──────────────────────────────────────────────┐
│  请求1 → new Thread() → 执行 → 销毁          │
│  请求2 → new Thread() → 执行 → 销毁          │
│  请求3 → new Thread() → 执行 → 销毁          │
│  ...                                         │
│  请求10000 → OOM! (内存不够创建线程了)         │
│                                              │
│  问题: 线程创建/销毁开销大,数量不可控          │
└──────────────────────────────────────────────┘

有线程池:固定数量的线程复用
┌──────────────────────────────────────────────┐
│  请求1 ─┐                                    │
│  请求2 ─┤    ┌────────────────┐              │
│  请求3 ─┼──▶ │ 任务队列        │ ──▶ Worker 0 │
│  请求4 ─┤    │ [req3,req4...] │ ──▶ Worker 1 │
│  ...   ─┘    └────────────────┘ ──▶ Worker 2 │
│                                              │
│  优势: 线程复用,数量可控,有队列缓冲          │
└──────────────────────────────────────────────┘

1. ThreadPoolExecutor — 七大核心参数

java
import java.util.concurrent.*;

public class ThreadPoolDemo {
    public static void main(String[] args) {
        // 手动创建线程池(推荐!不要用 Executors 工厂方法)
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            4,                    // corePoolSize:    核心线程数
            8,                    // maximumPoolSize: 最大线程数
            60, TimeUnit.SECONDS, // keepAliveTime:   非核心线程空闲超时
            new ArrayBlockingQueue<>(100),  // workQueue: 任务队列
            new ThreadPoolExecutor.CallerRunsPolicy()  // 拒绝策略
        );

        // 提交任务
        for (int i = 0; i < 20; i++) {
            int taskId = i;
            executor.execute(() -> {
                System.out.printf("[%s] 执行任务 %d%n",
                    Thread.currentThread().getName(), taskId);
                try { Thread.sleep(1000); } catch (InterruptedException e) {}
            });
        }

        // 优雅关闭
        executor.shutdown();  // 不接受新任务,等已有任务完成
        try {
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                executor.shutdownNow();  // 强制关闭
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
        }
    }
}
ThreadPoolExecutor 七大参数详解:

  new ThreadPoolExecutor(
      corePoolSize,        ①
      maximumPoolSize,     ②
      keepAliveTime, unit, ③
      workQueue,           ④
      threadFactory,       ⑤
      handler              ⑥
  );

  ┌─────────────────────────────────────────────────────────────┐
  │                                                             │
  │  ① corePoolSize = 4 (核心线程,一直存活)                      │
  │  ② maximumPoolSize = 8 (最大线程数)                          │
  │  ③ keepAliveTime = 60s (非核心线程空闲超过60s就销毁)           │
  │                                                             │
  │  ┌───────────────────────────────────────────────────┐      │
  │  │                线程池                              │      │
  │  │                                                   │      │
  │  │  核心线程 (常驻)        非核心线程 (按需创建)        │      │
  │  │  ┌────┐┌────┐┌────┐┌────┐  ┌────┐┌────┐┌────┐┌────┐│    │
  │  │  │ W0 ││ W1 ││ W2 ││ W3 │  │ W4 ││ W5 ││ W6 ││ W7 ││    │
  │  │  └────┘└────┘└────┘└────┘  └────┘└────┘└────┘└────┘│    │
  │  │  ◀── corePoolSize ──▶     ◀── 额外线程 ──▶         │    │
  │  │  ◀──────────── maximumPoolSize ────────────▶       │    │
  │  └───────────────────────────────────────────────────┘     │
  │                          ▲                                  │
  │  ④ workQueue ────────────┘                                  │
  │  ┌──────────────────────────┐                               │
  │  │  任务队列 [T5, T6, T7...]│  ← 来不及处理的任务在这排队      │
  │  └──────────────────────────┘                               │
  │                                                             │
  │  ⑤ threadFactory: 自定义线程名(方便排查问题)                 │
  │  ⑥ handler: 队列也满了怎么办?拒绝策略                        │
  │                                                             │
  └─────────────────────────────────────────────────────────────┘

任务提交流程

新任务到来时的处理流程:

                    提交新任务


              ┌────────────────┐
              │ 核心线程满了吗?  │
              └───────┬────────┘

                ┌─────┴─────┐
               No          Yes
                │            │
                ▼            ▼
          创建核心线程   ┌──────────────┐
          执行任务       │ 队列满了吗?  │
                        └──────┬───────┘

                         ┌─────┴─────┐
                        No          Yes
                         │            │
                         ▼            ▼
                    放入任务队列  ┌──────────────────┐
                                │ 达到最大线程数了吗?│
                                └───────┬──────────┘

                                  ┌─────┴─────┐
                                 No          Yes
                                  │            │
                                  ▼            ▼
                            创建非核心线程  执行拒绝策略!
                            执行任务

  注意顺序: 核心线程 → 队列 → 非核心线程 → 拒绝
  不是: 核心线程 → 非核心线程 → 队列!(这是常见误区)

四种拒绝策略

┌──────────────────────┬────────────────────────────────────┐
│  AbortPolicy (默认)   │  直接抛 RejectedExecutionException │
│                      │  → 适合: 关键任务,不允许丢失       │
├──────────────────────┼────────────────────────────────────┤
│  CallerRunsPolicy    │  由调用者线程自己执行任务             │
│                      │  → 适合: 需要所有任务都执行的场景    │
│                      │    (自然起到降速作用)              │
├──────────────────────┼────────────────────────────────────┤
│  DiscardPolicy       │  默默丢弃,不抛异常                 │
│                      │  → 适合: 允许丢失的非关键任务       │
├──────────────────────┼────────────────────────────────────┤
│  DiscardOldestPolicy │  丢弃队列最前面的任务,重新提交      │
│                      │  → 适合: 新任务比旧任务重要的场景    │
└──────────────────────┴────────────────────────────────────┘

线程池大小经验公式

┌──────────────────────────────────────────────────────────┐
│  CPU 密集型(计算多、IO 少):                              │
│  线程数 = CPU 核心数 + 1                                  │
│  例: 8 核 CPU → 9 个线程                                  │
│                                                          │
│  IO 密集型(网络请求、数据库、文件读写):                   │
│  线程数 = CPU 核心数 × 2 ~ CPU 核心数 ÷ (1 - IO占比)     │
│  例: 8 核 CPU,80% 时间在等 IO → 8 ÷ 0.2 = 40 个线程     │
│                                                          │
│  实际建议:                                                │
│  ┌──────────────────────────────────────────────┐        │
│  │  不要纸上谈兵,用压测来确定最佳线程数!          │        │
│  │  监控: 队列大小、活跃线程数、任务完成时间         │        │
│  └──────────────────────────────────────────────┘        │
└──────────────────────────────────────────────────────────┘

2. CompletableFuture — 异步编程利器

java
import java.util.concurrent.CompletableFuture;

public class CompletableFutureDemo {

    // 模拟异步获取用户信息
    static CompletableFuture<String> fetchUser(int userId) {
        return CompletableFuture.supplyAsync(() -> {
            sleep(500);
            return "User-" + userId;
        });
    }

    // 模拟异步获取订单
    static CompletableFuture<String> fetchOrder(String userId) {
        return CompletableFuture.supplyAsync(() -> {
            sleep(300);
            return "Order-of-" + userId;
        });
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();

        // 链式调用:获取用户 → 获取订单 → 处理结果
        CompletableFuture<String> result = fetchUser(1)
            .thenCompose(user -> fetchOrder(user))       // 串行依赖
            .thenApply(order -> "处理: " + order)        // 转换结果
            .exceptionally(ex -> "出错: " + ex.getMessage()); // 错误处理

        // 并行执行多个异步任务
        CompletableFuture<String> user1 = fetchUser(1);
        CompletableFuture<String> user2 = fetchUser(2);
        CompletableFuture<String> user3 = fetchUser(3);

        // 等待所有完成
        CompletableFuture.allOf(user1, user2, user3).join();
        System.out.printf("三个并行请求完成,耗时: %dms%n",
            System.currentTimeMillis() - start);  // ~500ms 而非 1500ms

        // 任意一个完成
        CompletableFuture<Object> fastest = CompletableFuture.anyOf(user1, user2);
        System.out.println("最快的: " + fastest.join());

        System.out.println(result.join());
    }

    static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) {}
    }
}
CompletableFuture 常用 API 速查:

  创建:
  ┌─────────────────────────────────────────────────┐
  │  supplyAsync(() -> result)    有返回值的异步任务  │
  │  runAsync(() -> { })          无返回值的异步任务  │
  └─────────────────────────────────────────────────┘

  转换 / 链式:
  ┌─────────────────────────────────────────────────┐
  │  thenApply(r -> newR)     转换结果(同步)       │
  │  thenCompose(r -> CF)     扁平化(异步串行)      │
  │  thenAccept(r -> {})      消费结果(无返回)      │
  │  thenRun(() -> {})        执行下一步(无输入)     │
  └─────────────────────────────────────────────────┘

  组合:
  ┌─────────────────────────────────────────────────┐
  │  thenCombine(cf, (r1,r2)->r)  合并两个结果      │
  │  allOf(cf1, cf2, cf3)         等所有完成         │
  │  anyOf(cf1, cf2, cf3)         等任一完成         │
  └─────────────────────────────────────────────────┘

  错误处理:
  ┌─────────────────────────────────────────────────┐
  │  exceptionally(ex -> fallback)    异常降级       │
  │  handle((r, ex) -> newR)          统一处理       │
  │  whenComplete((r, ex) -> {})      完成回调       │
  └─────────────────────────────────────────────────┘

  执行时间线示意(串行 vs 并行):

  串行: fetchUser → fetchOrder → process
  ────[500ms]───────[300ms]─────[10ms]───▶ 总计 810ms

  并行: fetchUser(1)
        fetchUser(2)    同时执行
        fetchUser(3)
  ────[500ms]───▶ 总计 500ms (取最慢的)

  thenCompose vs thenApply:
  ┌─────────────────────────────────────────────────────┐
  │  thenApply:    CF<A> → (A → B)      → CF<B>        │
  │  thenCompose:  CF<A> → (A → CF<B>)  → CF<B>        │
  │                                                     │
  │  thenApply   = map    (同步转换)                     │
  │  thenCompose = flatMap (异步串行,避免 CF<CF<B>>)     │
  └─────────────────────────────────────────────────────┘

3. 小结

┌──────────────────────────────────────────────────────────────┐
│              线程池 & 异步编程 速查                             │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│  线程池黄金法则:                                              │
│  ┌────────────────────────────────────────────────────────┐  │
│  │  1. 不要用 Executors 工厂(队列可能无界导致 OOM)       │  │
│  │  2. 手动用 ThreadPoolExecutor 设置 7 大参数             │  │
│  │  3. 给线程池命名(排查问题时能分清是哪个池)            │  │
│  │  4. 核心参数用压测确定,不要拍脑袋                      │  │
│  │  5. 一定要 shutdown + awaitTermination                 │  │
│  └────────────────────────────────────────────────────────┘  │
│                                                              │
│  CompletableFuture 选择指南:                                  │
│  ┌─────────────────────────────────────────────────────┐    │
│  │  多个独立任务并行 → allOf + join                     │    │
│  │  任务有依赖关系   → thenCompose 串行                 │    │
│  │  合并两个结果     → thenCombine                      │    │
│  │  错误降级         → exceptionally                    │    │
│  │  超时控制         → orTimeout (JDK 9+)               │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                              │
└──────────────────────────────────────────────────────────────┘

下一节: 05 - 实战项目 — 高并发订单处理系统