前言:本篇文章系作者本人学习CyclicBarrier处理工作问题之心得记录,仅供参考和学习。有误敬请指正。
一、认识CyclicBarrier
1、介绍:
CyclicBarrier是java.util.concurrent包下的一个线程同步工具,简单来说就是让一组子线程互相执行后并等待其他子线程,直到所有子线程都执行完毕后,再执行指定的子线程任务(也就是barrier屏障点),屏障点的子线程执行完成后,继续重复前面流程,让各子线程执行任务,然后执行屏障点,直到所有子线程满足中断要求后,在同一流程中一同中断,最后执行屏障点,屏障点执行完毕,执行主线程。
2、举例:
双十一你在淘宝上买了十几件商品,然后你发现这十几个商品,今天会到几个,明天会到几个,你又住在没有电梯的8楼,不想一天中一次又一次的下楼取包裹,只想等当天能到的商品都到达菜鸟驿站后,再一次取出全部包裹。每天一趟拿完当天到的商品,直到所有商品拿完。
每天等快递,再拿快递就是一次CyclicBarrier,每天不同的商品会在不同的时间到达菜鸟驿站(就是不同的子线程),直到当天能到的所有的商品都到达菜鸟驿站(就是所有子线程执行完毕),你再下楼去取(这就是屏障点子线程任务),直到没有商品(子线程全部中断),你就不用下楼(不会执行屏障点)。整个CyclicBarrier就结束了。
二、简单使用CyclicBarrier(简单实战)
1、需求说明:
有1000W条订单,需要将该1000W条订单总金额算出来?
2、思路过程:
最简单直观的方式就是用for循环累加,这样写肯定不行,因此考虑分批多线程处理,每20W条订单为一个批次,按照索引均分给四个线程各自累加,分别将结果放在ConcurrentHashMap(线程安全)里面,每批四个线程执行完成后,执行屏障点子线程任务,将四个线程的结果累加,然后存起来,下一批200000条,重复上面操作,由屏障点子线程任务累加到每批的结果中。
3、实战(源码):
public class CountWithCyclicBarrier { // 模拟订单数据 static class Order { long orderId; double amount; Order(long orderId, double amount) { this.orderId = orderId; this.amount = amount; } public long getOrderId() { return orderId; } public void setOrderId(long orderId) { this.orderId = orderId; } public double getAmount() { return amount; } public void setAmount(double amount) { this.amount = amount; } } // 全局订单列表(模拟大数据集) private static final List<Order> orders = new ArrayList<>(1_000_0000); private static final int TOTAL_ORDERS = 1_000_0000; // 订单总数 private static final int BATCH_SIZE = 200_000; // 每批处理的订单总数 private static final int THREAD_COUNT = 4; // 工作线程数 // 全局总金额(最终结果)AtomicLong (原子操作,线程安全) private static final AtomicLong totalAmount = new AtomicLong(0L); // 当前批次的起始索引(原子操作,保证线程安全) private static final AtomicInteger currentBatchStart = new AtomicInteger(0); // 每个线程在本批次的局部累加结果(key=每批下每个线程结束订单的索引,value=本批部分和) // 使用 ConcurrentHashMap 防止并发写入冲突 private static final ConcurrentHashMap<Integer, Double> partialSums = new ConcurrentHashMap<>(); // 创建屏障点子线程 private static final Runnable barrierAction = ()-> { // 将每个批次累加 double countAmountByBatch = partialSums.values().stream().mapToDouble(Double::doubleValue).sum(); totalAmount.addAndGet((long) countAmountByBatch); partialSums.clear(); // 修改下一批次执行的开始索引 int nextStart = currentBatchStart.addAndGet(BATCH_SIZE); }; // 创建cyclicBarrier private static final CyclicBarrier cb = new CyclicBarrier(THREAD_COUNT,barrierAction); public static void main(String[] args) throws Exception { // 1. 准备模拟数据 initOrders(); System.out.printf("共 %d 条订单,每批 %d 条,%d 个线程并行处理\n\n", TOTAL_ORDERS, BATCH_SIZE, THREAD_COUNT); // 创建线程 Thread[] workers = new Thread[THREAD_COUNT]; for (int i = 0; i < THREAD_COUNT; i++) { // 设置每一个线程,方便后面计算,每个线程分别从那个位置开始处理数据 final int threadIndex = i; // 创建索引线程 workers[i]=new Thread(()->{ try{ // 循环这四个线程 while (true){ // 获取每次线程起始处理订单索引位置 int batchStart = currentBatchStart.get(); // 当索引位置大于总数,说明已执行完整,终止线程 if (batchStart >= TOTAL_ORDERS) { break; } // 均分每批订单总数,并计算出索引 int subSize = BATCH_SIZE / THREAD_COUNT; int start = batchStart + threadIndex * subSize; int end = start + subSize; // 处理剩下的,当起始位置大于总数,也终止线程 if (start >= TOTAL_ORDERS) { break; } // 如果是结束索引大于总数,说明剩下的end位置大于最后索引位置,直接将总数赋值给end if (end > TOTAL_ORDERS) { end = TOTAL_ORDERS; } // 根据索引获取集合处理片段,然后处理数据 double treadCount = orders.subList(start, end).stream().mapToDouble(Order::getAmount).sum(); // 将每个线程处理结果存入map,等待其他线程执行完成 partialSums.put(threadIndex, treadCount); // 等待其他线程 cb.await(); } }catch (Exception e) { e.printStackTrace(); } }); // 启动线程 workers[i].start(); } // 阻塞所有子线程,等待所有子线程终止 for (Thread t : workers) { t.join(); } System.out.println("总数"+totalAmount); } // 初始化模拟订单:简单点,订单ID从1~1000W,订单金额从1~1000W。 private static void initOrders() { for (long i = 1; i <= TOTAL_ORDERS; i++) { orders.add(new Order(i, i)); } System.out.println("测试数据初始化完成\n"); } }三、总结
以前遇到大批量数据处理,就是for处理,但在实际业务中,在缓存中处理大批量数据,不仅执行缓慢,还非常容易内存溢出,导致系统崩溃。以上案例为本人实际业务简化版,实际业务中针对大批量数据,可以分批分页从数据库中获取,处理完一批再从数据库获取下一批次,这样不仅能降低数据库压力,也不会出现内存溢出问题,更能高效利用cpu核心,实现接口的快速响应。