主题
05 - 实战:高并发订单处理系统
项目架构
┌────────────────────────────────────────────────────────────────┐
│ 高并发订单处理系统架构 │
│ │
│ 模拟请求 限流 异步处理 │
│ ┌───────┐ ┌──────────┐ ┌───────────────────┐ │
│ │客户端 │────▶│ Semaphore│────▶│ BlockingQueue │ │
│ │并发提交│ │ 限流器 │ │ 订单队列 │ │
│ │订单 │ └──────────┘ └────────┬──────────┘ │
│ └───────┘ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ ThreadPool │ │
│ │ ┌─────┐┌─────┐ │ │
│ │ │ W0 ││ W1 │ │ │
│ │ ├─────┤├─────┤ │ │
│ │ │ W2 ││ W3 │ │ │
│ │ └─────┘└─────┘ │ │
│ └────────┬────────┘ │
│ │ │
│ ┌────────────┼────────────┐ │
│ ▼ ▼ ▼ │
│ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │库存扣减│ │支付处理│ │结果通知│ │
│ │(CAS) │ │(异步) │ │(回调) │ │
│ └────────┘ └────────┘ └────────┘ │
│ │
│ 监控统计: AtomicLong + LongAdder │
└────────────────────────────────────────────────────────────────┘完整代码
java
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
// ===== 订单模型 =====
class Order {
private final String orderId;
private final String product;
private final int quantity;
private final long createTime;
private volatile String status; // volatile 保证可见性
public Order(String orderId, String product, int quantity) {
this.orderId = orderId;
this.product = product;
this.quantity = quantity;
this.createTime = System.currentTimeMillis();
this.status = "CREATED";
}
// getters, setters, toString 省略
public String getOrderId() { return orderId; }
public String getProduct() { return product; }
public int getQuantity() { return quantity; }
public String getStatus() { return status; }
public void setStatus(String status) { this.status = status; }
@Override
public String toString() {
return String.format("Order{id=%s, product=%s, qty=%d, status=%s}",
orderId, product, quantity, status);
}
}
// ===== 库存管理(CAS 无锁操作)=====
class InventoryManager {
private final ConcurrentHashMap<String, AtomicInteger> stock = new ConcurrentHashMap<>();
public InventoryManager() {
// 初始库存
stock.put("iPhone", new AtomicInteger(100));
stock.put("MacBook", new AtomicInteger(50));
stock.put("AirPods", new AtomicInteger(200));
}
// 扣减库存(CAS 原子操作,线程安全且无锁)
public boolean deductStock(String product, int quantity) {
AtomicInteger current = stock.get(product);
if (current == null) return false;
while (true) {
int existing = current.get();
if (existing < quantity) {
return false; // 库存不足
}
// CAS: 如果当前值还是 existing,就减去 quantity
if (current.compareAndSet(existing, existing - quantity)) {
return true; // 扣减成功
}
// CAS 失败说明被其他线程改了,自动重试
}
}
public int getStock(String product) {
AtomicInteger current = stock.get(product);
return current != null ? current.get() : 0;
}
}
// ===== 统计监控 =====
class OrderMetrics {
private final LongAdder totalOrders = new LongAdder();
private final LongAdder successOrders = new LongAdder();
private final LongAdder failedOrders = new LongAdder();
private final LongAdder totalProcessTimeMs = new LongAdder();
private final AtomicLong maxProcessTimeMs = new AtomicLong(0);
public void recordSuccess(long processTimeMs) {
totalOrders.increment();
successOrders.increment();
totalProcessTimeMs.add(processTimeMs);
maxProcessTimeMs.updateAndGet(old -> Math.max(old, processTimeMs));
}
public void recordFailure() {
totalOrders.increment();
failedOrders.increment();
}
public void printReport() {
long total = totalOrders.sum();
long success = successOrders.sum();
long failed = failedOrders.sum();
long avgMs = total > 0 ? totalProcessTimeMs.sum() / total : 0;
System.out.println("\n╔══════════════════════════════════════╗");
System.out.println("║ 订单处理统计报告 ║");
System.out.println("╠══════════════════════════════════════╣");
System.out.printf("║ 总订单数: %6d ║%n", total);
System.out.printf("║ 成功: %6d ║%n", success);
System.out.printf("║ 失败: %6d ║%n", failed);
System.out.printf("║ 成功率: %5.1f%% ║%n",
total > 0 ? (double) success / total * 100 : 0);
System.out.printf("║ 平均耗时: %4dms ║%n", avgMs);
System.out.printf("║ 最大耗时: %4dms ║%n", maxProcessTimeMs.get());
System.out.println("╚══════════════════════════════════════╝");
}
}
// ===== 订单处理器(核心)=====
class OrderProcessor {
private final ThreadPoolExecutor workerPool;
private final BlockingQueue<Order> orderQueue;
private final Semaphore rateLimiter;
private final InventoryManager inventory;
private final OrderMetrics metrics;
private volatile boolean running = true;
public OrderProcessor(int poolSize, int queueCapacity, int maxConcurrent) {
this.orderQueue = new LinkedBlockingQueue<>(queueCapacity);
this.rateLimiter = new Semaphore(maxConcurrent);
this.inventory = new InventoryManager();
this.metrics = new OrderMetrics();
// 手动创建线程池(不用 Executors!)
this.workerPool = new ThreadPoolExecutor(
poolSize,
poolSize * 2,
60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(queueCapacity),
r -> {
Thread t = new Thread(r);
t.setName("order-worker-" + t.getId());
t.setDaemon(true);
return t;
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
// 提交订单(带限流)
public CompletableFuture<String> submitOrder(Order order) {
return CompletableFuture.supplyAsync(() -> {
try {
// 限流:最多 maxConcurrent 个并发
if (!rateLimiter.tryAcquire(3, TimeUnit.SECONDS)) {
metrics.recordFailure();
order.setStatus("REJECTED");
return "REJECTED: 系统繁忙";
}
try {
return processOrder(order);
} finally {
rateLimiter.release();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "ERROR: 被中断";
}
}, workerPool);
}
// 处理单个订单
private String processOrder(Order order) {
long start = System.currentTimeMillis();
try {
// 步骤1:扣减库存(CAS 无锁)
order.setStatus("PROCESSING");
if (!inventory.deductStock(order.getProduct(), order.getQuantity())) {
order.setStatus("FAILED");
metrics.recordFailure();
return "FAILED: 库存不足";
}
// 步骤2:模拟支付处理
Thread.sleep(ThreadLocalRandom.current().nextInt(10, 50));
// 步骤3:完成
order.setStatus("COMPLETED");
long elapsed = System.currentTimeMillis() - start;
metrics.recordSuccess(elapsed);
return "SUCCESS: " + order.getOrderId();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
order.setStatus("ERROR");
metrics.recordFailure();
return "ERROR: " + e.getMessage();
}
}
public void shutdown() {
running = false;
workerPool.shutdown();
try {
workerPool.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
workerPool.shutdownNow();
}
}
public InventoryManager getInventory() { return inventory; }
public OrderMetrics getMetrics() { return metrics; }
}
// ===== 主程序 =====
public class OrderSystem {
public static void main(String[] args) throws Exception {
// 创建订单处理器:4 核心线程,队列 1000,最多 20 并发
OrderProcessor processor = new OrderProcessor(4, 1000, 20);
String[] products = {"iPhone", "MacBook", "AirPods"};
int totalOrders = 500;
System.out.println("初始库存:");
for (String p : products) {
System.out.printf(" %s: %d%n", p, processor.getInventory().getStock(p));
}
System.out.printf("%n开始模拟 %d 个并发订单...%n%n", totalOrders);
long start = System.currentTimeMillis();
// 模拟大量并发订单
List<CompletableFuture<String>> futures = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(totalOrders);
for (int i = 0; i < totalOrders; i++) {
String product = products[ThreadLocalRandom.current().nextInt(products.length)];
int qty = ThreadLocalRandom.current().nextInt(1, 4);
Order order = new Order("ORD-" + String.format("%05d", i), product, qty);
CompletableFuture<String> future = processor.submitOrder(order)
.whenComplete((result, ex) -> latch.countDown());
futures.add(future);
}
// 等待所有订单处理完成
latch.await(30, TimeUnit.SECONDS);
long elapsed = System.currentTimeMillis() - start;
System.out.printf("%n总耗时: %dms (QPS: %.0f)%n", elapsed,
(double) totalOrders / elapsed * 1000);
// 打印统计报告
processor.getMetrics().printReport();
System.out.println("\n剩余库存:");
for (String p : products) {
System.out.printf(" %s: %d%n", p, processor.getInventory().getStock(p));
}
processor.shutdown();
}
}知识点映射
┌──────────────────────────────────────────────────────────┐
│ 本项目使用的 JUC 工具和并发知识: │
├──────────────────────────────────────────────────────────┤
│ │
│ JUC 工具 │ 用途 │
│ ─────────────────────┼────────────────────────── │
│ ThreadPoolExecutor │ 核心线程池,处理订单 │
│ CompletableFuture │ 异步提交和链式处理 │
│ Semaphore │ 限流(最大并发控制) │
│ CountDownLatch │ 等待所有订单处理完成 │
│ ConcurrentHashMap │ 线程安全的库存存储 │
│ AtomicInteger │ CAS 无锁库存扣减 │
│ LongAdder │ 高性能统计计数 │
│ AtomicLong │ 记录最大处理时间 │
│ BlockingQueue │ 线程池的任务队列 │
│ volatile │ 订单状态可见性、停止标志 │
│ ThreadLocalRandom │ 线程安全的随机数 │
│ │
│ 设计模式: │
│ ├── 生产者-消费者 (客户端 → 队列 → Worker) │
│ ├── Worker Pool (固定线程处理任务流) │
│ └── 限流 (Semaphore 控制并发上限) │
│ │
└──────────────────────────────────────────────────────────┘模块二完成!
下一个模块: 模块三:高并发架构设计