import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.exception.ExceptionUtils;

import java.util.List;
import java.util.concurrent.*;
import java.util.function.Consumer;

/**
 * 线程池批量并发工具类
 *
 */
@Log4j2
public class ThreadPoolUtil {

    public static Builder newBuilder(int threadSize) {
        return new Builder(threadSize, null);
    }

    public static Builder newBuilder(int threadSize, String threadName) {
        return new Builder(threadSize, threadName);
    }

    public static class Builder {

        private final ThreadPoolExecutor threadPool;

        public Builder(int threadSize, String threadName) {
            if (null == threadName || "".equals(threadName)) {
                threadName = "thread-pool-";
            }
            threadName = threadName.endsWith("-") ? threadName + "thread-%d" : threadName + "-thread-%d";
            ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(threadName).build();
            this.threadPool = new ThreadPoolExecutor(threadSize, threadSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), threadFactory);
        }

        public <T> Builder execute(List<T> list, int batchSize, Consumer<List<T>> action) {
            if (null == list || list.isEmpty()) {
                return this;
            }
            if (batchSize < 1) {
                throw new RuntimeException("the parameter batchSize cannot be less than 1");
            }
            CountDownLatch downLatch = new CountDownLatch((int) Math.ceil((double) list.size() / batchSize));
            int index = 0, toIndex = 0, listSize = list.size();
            do {
                toIndex = Math.min(listSize, toIndex + batchSize);
                List<T> subList = list.subList(index * batchSize, toIndex);
                threadPool.execute(() -> {
                    try {
                        action.accept(subList);
                    } finally {
                        downLatch.countDown();
                    }
                });
                index++;
            } while (toIndex < listSize);
            try {
                downLatch.await();
            } catch (InterruptedException e) {
                log.error("ThreadPoolUtil.execute exception:{}", ExceptionUtils.getStackTrace(e));
            }
            return this;
        }

        public <T> Builder execute(List<T> list, Consumer<T> action) {
            if (null == list || list.isEmpty()) {
                return this;
            }
            CountDownLatch downLatch = new CountDownLatch(list.size());
            list.forEach(item -> threadPool.execute(() -> {
                try {
                    action.accept(item);
                } finally {
                    downLatch.countDown();
                }
            }));
            try {
                downLatch.await();
            } catch (InterruptedException e) {
                log.error("ThreadPoolUtil.execute exception:{}", ExceptionUtils.getStackTrace(e));
            }
            return this;
        }

        public <T> Builder execute(List<T> list, Consumer<T> firstAction, Consumer<T> action) {
            if (null == list || list.isEmpty()) {
                return this;
            }
            firstAction.accept(list.get(0));
            CountDownLatch downLatch = new CountDownLatch(list.size());
            list.forEach(item -> threadPool.execute(() -> {
                try {
                    action.accept(item);
                } finally {
                    downLatch.countDown();
                }
            }));
            try {
                downLatch.await();
            } catch (InterruptedException e) {
                log.error("ThreadPoolUtil.execute exception:{}", ExceptionUtils.getStackTrace(e));
            }
            return this;
        }

        public void shutdown() {
            if (!threadPool.isShutdown()) {
                threadPool.shutdown();
            }
        }

    }

}

使用

// 创建线程池
ThreadPoolUtil.Builder threadPool = ThreadPoolUtil.newBuilder(10, "syncAcctTask");
// 使用
 threadPool.execute(newRes, threadNum - 1, list -> {
                try {
                    // 执行批量保存数据
                    boolean success = recordService.batchInsert(list);
                    if (!success) {
                        log.error("【数据同步】批量保存XXX 失败,{}", JSON.toJSONString(list));
                    }
                } catch (Exception e) {
                    log.error("【数据同步】批量保存XXX 异常,list:{},exception:{}", JSON.toJSONString(list), ExceptionUtils.getStackTrace(e));
                   
                }
            });
// 关闭线程池,不能少!
threadPool.shutdown();
特殊说明:
上述文章均是作者实际操作后产出。烦请各位,请勿直接盗用!转载记得标注原文链接:www.zanglikun.com
第三方平台不会及时更新本文最新内容。如果发现本文资料不全,可访问本人的Java博客搜索:标题关键字。以获取全部资料 ❤