import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.exception.ExceptionUtils;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Consumer;
/**
 * Utility for running list-processing work concurrently on a fixed-size thread pool.
 *
 * <p>Obtain a {@link Builder} through {@link #newBuilder(int)} or
 * {@link #newBuilder(int, String)}, call one of its {@code execute} methods (each one blocks
 * until every task it submitted has finished), and call {@link Builder#shutdown()} once the
 * pool is no longer needed.
 */
@Log4j2
public class ThreadPoolUtil {

    /**
     * Creates a builder backed by a pool of {@code threadSize} threads using the default
     * thread-name prefix.
     *
     * @param threadSize number of threads in the pool
     * @return a new builder owning its own thread pool
     */
    public static Builder newBuilder(int threadSize) {
        return new Builder(threadSize, null);
    }

    /**
     * Creates a builder backed by a pool of {@code threadSize} threads.
     *
     * @param threadSize number of threads in the pool
     * @param threadName prefix for worker thread names; {@code null} or empty selects the default
     * @return a new builder owning its own thread pool
     */
    public static Builder newBuilder(int threadSize, String threadName) {
        return new Builder(threadSize, threadName);
    }

    /**
     * Owns a fixed-size {@link ThreadPoolExecutor} with an unbounded work queue and exposes
     * blocking, fluent {@code execute} methods on top of it.
     */
    public static class Builder {

        private final ThreadPoolExecutor threadPool;

        /**
         * Builds the underlying pool. Worker threads are named
         * {@code <threadName>-thread-<n>}; when {@code threadName} is {@code null} or empty the
         * prefix {@code thread-pool-} is used.
         *
         * @param threadSize used as both the core and the maximum pool size
         * @param threadName prefix for worker thread names, may be {@code null} or empty
         */
        public Builder(int threadSize, String threadName) {
            String prefix = (null == threadName || threadName.isEmpty()) ? "thread-pool-" : threadName;
            // Avoid a doubled dash when the caller's prefix already ends with one.
            String nameFormat = prefix.endsWith("-") ? prefix + "thread-%d" : prefix + "-thread-%d";
            ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(nameFormat).build();
            this.threadPool = new ThreadPoolExecutor(threadSize, threadSize, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(), threadFactory);
        }

        /**
         * Splits {@code list} into consecutive sub-lists of at most {@code batchSize} elements,
         * runs {@code action} on each sub-list in the pool, and blocks until all batches finish.
         *
         * <p>The sub-lists are {@link List#subList(int, int)} views of {@code list}, not copies.
         *
         * @param list      elements to process; {@code null} or empty is a no-op
         * @param batchSize maximum number of elements per batch, must be at least 1
         * @param action    callback invoked once per batch on a pool thread
         * @param <T>       element type
         * @return this builder, for chaining
         * @throws IllegalArgumentException if {@code list} is non-empty and {@code batchSize < 1}
         */
        public <T> Builder execute(List<T> list, int batchSize, Consumer<List<T>> action) {
            if (null == list || list.isEmpty()) {
                return this;
            }
            if (batchSize < 1) {
                throw new IllegalArgumentException("the parameter batchSize cannot be less than 1");
            }
            int listSize = list.size();
            // Integer ceiling division; avoids the double round-trip and cannot overflow.
            int batchCount = listSize / batchSize + (listSize % batchSize == 0 ? 0 : 1);
            CountDownLatch downLatch = new CountDownLatch(batchCount);
            int fromIndex = 0;
            while (fromIndex < listSize) {
                // Clamp the step instead of the end index so a huge batchSize cannot overflow.
                int toIndex = fromIndex + Math.min(batchSize, listSize - fromIndex);
                List<T> subList = list.subList(fromIndex, toIndex);
                submit(() -> action.accept(subList), downLatch);
                fromIndex = toIndex;
            }
            awaitCompletion(downLatch);
            return this;
        }

        /**
         * Runs {@code action} once per element of {@code list} in the pool and blocks until all
         * elements have been processed.
         *
         * @param list   elements to process; {@code null} or empty is a no-op
         * @param action callback invoked once per element on a pool thread
         * @param <T>    element type
         * @return this builder, for chaining
         */
        public <T> Builder execute(List<T> list, Consumer<T> action) {
            if (null == list || list.isEmpty()) {
                return this;
            }
            CountDownLatch downLatch = new CountDownLatch(list.size());
            list.forEach(item -> submit(() -> action.accept(item), downLatch));
            awaitCompletion(downLatch);
            return this;
        }

        /**
         * Calls {@code firstAction} synchronously on the calling thread with the first element,
         * then runs {@code action} once per element (the first element included) in the pool and
         * blocks until all elements have been processed.
         *
         * @param list        elements to process; {@code null} or empty is a no-op
         * @param firstAction callback invoked on the calling thread with {@code list.get(0)}
         *                    before any task is submitted
         * @param action      callback invoked once per element on a pool thread
         * @param <T>         element type
         * @return this builder, for chaining
         */
        public <T> Builder execute(List<T> list, Consumer<T> firstAction, Consumer<T> action) {
            if (null == list || list.isEmpty()) {
                return this;
            }
            firstAction.accept(list.get(0));
            CountDownLatch downLatch = new CountDownLatch(list.size());
            list.forEach(item -> submit(() -> action.accept(item), downLatch));
            awaitCompletion(downLatch);
            return this;
        }

        /**
         * Initiates an orderly shutdown of the underlying pool if it has not been shut down yet.
         * Already-submitted tasks still run; new submissions are rejected afterwards.
         */
        public void shutdown() {
            if (!threadPool.isShutdown()) {
                threadPool.shutdown();
            }
        }

        /** Submits {@code task} to the pool, counting {@code downLatch} down even if the task throws. */
        private void submit(Runnable task, CountDownLatch downLatch) {
            threadPool.execute(() -> {
                try {
                    task.run();
                } finally {
                    downLatch.countDown();
                }
            });
        }

        /** Blocks until {@code downLatch} reaches zero; on interruption logs and restores the interrupt flag. */
        private void awaitCompletion(CountDownLatch downLatch) {
            try {
                downLatch.await();
            } catch (InterruptedException e) {
                log.error("ThreadPoolUtil.execute exception:{}", ExceptionUtils.getStackTrace(e));
                // await() cleared the flag; restore it so callers can still observe the interruption.
                Thread.currentThread().interrupt();
            }
        }
    }
}
使用
// Create the thread pool
ThreadPoolUtil.Builder threadPool = ThreadPoolUtil.newBuilder(10, "syncAcctTask");
try {
    // Process newRes in batches; execute() blocks until every batch has finished
    threadPool.execute(newRes, threadNum - 1, list -> {
        try {
            // Save the batch
            boolean success = recordService.batchInsert(list);
            if (!success) {
                log.error("【数据同步】批量保存XXX 失败,{}", JSON.toJSONString(list));
            }
        } catch (Exception e) {
            log.error("【数据同步】批量保存XXX 异常,list:{},exception:{}", JSON.toJSONString(list), ExceptionUtils.getStackTrace(e));
        }
    });
} finally {
    // Shut the pool down even if execute() throws; this step must not be skipped
    threadPool.shutdown();
}
特殊说明:上述文章均为作者实际操作后的产出。烦请各位勿直接盗用!转载请标注原文链接:www.zanglikun.com
第三方平台不会及时更新本文最新内容。如果发现本文资料不全,可访问本人的Java博客,按标题关键字搜索,以获取全部资料 ❤