Java 并发编程:线程池与线程安全
在 Java 开发中,多线程是提升性能的利器,但直接使用 new Thread() 往往会导致资源耗尽或数据错乱。通过线程池管理线程生命周期,并利用同步机制保证线程安全,是编写高并发程序的必经之路。
一、 创建与管理线程池
频繁创建和销毁线程会消耗大量系统资源。线程池通过复用线程,显著降低开销。Java 提供了 Executor 框架来简化这一过程。
1. 使用标准线程池
避免手动创建 ThreadPoolExecutor,除非你有特殊的调优需求。对于常规业务,直接使用 Executors 工厂类提供的静态方法。
运行以下代码创建一个固定大小的线程池:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolDemo {
public static void main(String[] args) {
// 创建一个包含 4 个工作线程的线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
// 提交 10 个任务
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("任务 ID: " + taskId + " 正由 " + Thread.currentThread().getName() + " 执行");
});
}
// 关闭线程池(不再接受新任务,等待已提交任务完成)
executor.shutdown();
}
}
注意:必须调用 shutdown() 或 shutdownNow(),否则程序可能无法退出。
2. 理解核心参数
在生产环境中,为了防止资源溢出(OOM),推荐显式创建 ThreadPoolExecutor 以掌控所有参数。
配置以下核心参数来定义线程池行为:
| 参数名 | 类型 | 作用 |
|---|---|---|
corePoolSize |
int | 核心线程数,即使空闲也会保留在线程池中 |
maximumPoolSize |
int | 最大线程数,线程池允许创建的最大线程数量 |
keepAliveTime |
long | 非核心线程空闲时的存活时间 |
workQueue |
BlockingQueue | 存放待执行任务的阻塞队列 |
handler |
RejectedExecutionHandler | 当队列和线程池都满时的拒绝策略 |
编写自定义线程池代码:
import java.util.concurrent.*;
public class CustomPool {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // corePoolSize
4, // maximumPoolSize
60, // keepAliveTime
TimeUnit.SECONDS, // 时间单位
new ArrayBlockingQueue<>(10), // 任务队列
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略:由调用者线程执行
);
}
}
二、 理解线程安全与数据竞争
当多个线程同时读写同一个共享变量时,可能会发生“线程安全”问题。最典型的后果是“脏读”或数据丢失。
1. 识别竞态条件
观察下面这段不安全的代码:
public class UnsafeCounter {
private int count = 0;
public void increment() {
count++; // 这一步并非原子操作
}
public int getCount() {
return count;
}
}
count++ 操作在字节码层面分为三步:取值、加一、写回。如果两个线程同时取到相同的值,分别加一并写回,最终结果只加了 1 而不是 2。
2. 使用 synchronized 关键字
synchronized 是 Java 提供的内置锁机制,可以确保同一时刻只有一个线程执行被修饰的代码块。
修改上面的计数器代码,添加关键字:
public class SafeCounter {
private int count = 0;
// 同步实例方法,锁住的是 this 对象
public synchronized void increment() {
count++;
}
public synchronized int getCount() {
return count;
}
}
或者,只同步必要的代码块以减少锁的粒度:
public void increment() {
synchronized(this) {
count++;
}
}
3. 使用原子类
对于简单的计数器、累加器等场景,使用 java.util.concurrent.atomic 包下的原子类比锁效率更高,它们利用了 CPU 的 CAS(比较并交换)指令。
替换 int 为 AtomicInteger:
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicCounter {
private AtomicInteger count = new AtomicInteger(0);
public void increment() {
// 底层是原子操作,无需加锁
count.incrementAndGet();
}
public int getCount() {
return count.get();
}
}
三、 线程池工作流程与拒绝策略
理解线程池如何处理任务提交,有助于配置合理的参数,防止任务被丢弃或系统崩溃。
1. 任务处理逻辑
当调用 executor.execute() 提交任务时,线程池内部会按照以下逻辑进行判断和处理:
已满?} B -- 否 --> C[创建核心线程
执行任务] B -- 是 --> D{工作队列
已满?} D -- 否 --> E[任务入队列
等待空闲线程] D -- 是 --> F{最大线程数
已满?} F -- 否 --> G[创建非核心线程
执行任务] F -- 是 --> H[执行拒绝策略
Handler]
2. 选择拒绝策略
当队列满了且线程数达到最大值时,线程池会触发拒绝策略。JDK 提供了四种标准策略:
| 策略名称 | 行为描述 |
|---|---|
AbortPolicy (默认) |
抛出 RejectedExecutionException 异常,阻止系统正常运行 |
CallerRunsPolicy |
由提交任务的线程(调用者)自己去执行该任务,以此降低提交速度 |
DiscardPolicy |
直接丢弃任务,不做任何处理,可能导致数据丢失 |
DiscardOldestPolicy |
丢弃队列里最老的任务,然后尝试再次提交当前任务 |
配置策略示例:
// 当队列和线程都满时,回退到主线程执行,起到“背压”作用
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
四、 常见并发容器
在多线程环境下,常用的集合类如 ArrayList、HashMap 并不是线程安全的。应当使用并发集合来替代。
1. 替换 HashMap
使用 ConcurrentHashMap 代替 HashMap。
在 Java 8 及以上版本中,ConcurrentHashMap 使用了 CAS + synchronized 锁节点的方式,只锁住链表或红黑树的头节点,并发度非常高。
Map<String, String> map = new ConcurrentHashMap<>();
map.put("key", "value");
2. 替换 ArrayList
使用 CopyOnWriteArrayList 代替 ArrayList。
适用于“读多写少”的场景。写操作时会复制一份新数组,因此写操作开销较大,但读操作完全无锁。
List<String> list = new CopyOnWriteArrayList<>();
list.add("item");
五、 处理线程中的异常
多线程代码中,子线程抛出的异常不会直接打印到主控制台,这会导致错误被“吞掉”。
1. 捕获任务内异常
在 run() 方法内部使用 try-catch 块。
executor.submit(() -> {
try {
// 业务逻辑
int result = 10 / 0;
} catch (Exception e) {
e.printStackTrace(); // 或者记录到日志系统
}
});
2. 设置全局异常处理器
实现 Thread.UncaughtExceptionHandler 接口。
ThreadFactory factory = r -> {
Thread t = new Thread(r);
t.setUncaughtExceptionHandler((thread, throwable) -> {
System.err.println("线程 " + thread.getName() + " 抛出异常: " + throwable.getMessage());
});
return t;
};
ExecutorService executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), factory);
暂无评论,快来抢沙发吧!