本文整理了下Java中常用的多线程相关的类型和使用方式。
线程类
Thread类型
public class Thread implements Runnable {
public static native void sleep(long millis) throws InterruptedException;
public Thread(Runnable target);
public Thread(Runnable target, String name);
public final void join() throws InterruptedException;
}
例:继承Thread,覆盖Thread的run方法
class MyThread extends Thread {
@Override
public void run() {
System.out.println("hello1");
}
}
MyThread mythd = new MyThread();
mythd.start();
Runnable接口
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
例:实现Runnable接口,传递给Thread
Thread thd = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("hello");
}
});
thd.start();
线程同步控制
多线程同时访问共享的对象,就需要做好同步保护。
锁 Lock
最常用的锁是可重入锁:ReentrantLock
用锁保护一段代码(共享对象)的惯用法:
class X {
private final ReentrantLock lock = new ReentrantLock();
public void m() {
lock.lock();
try {
// ...
} finally {
lock.unlock();
}
}
}
条件对象 Condition
有时候只有锁,还无法满足一些带条件的需求,即获取了锁之后,还要满足一定的条件,才能继续执行相应的功能。
Java为锁对象,提供了条件对象,可以通过 newCondition()
方法获得一个条件对象。
银行账户转账的例子:
class Bank {
public Bank(int n, double initial) {
accounts = new double[n];
for (int i=0; i<accounts.length; ++i) {
accounts[i] = initial;
}
bankLock = new ReentrantLock();
sufficientFunds = bankLock.newCondition();
}
public void transfer(int from, int to, double amount) throws InterruptedException {
bankLock.lock();
try {
while (accounts[from] < amount) {
sufficientFunds.await();
}
accounts[from] -= amount;
accounts[to] += amount;
sufficientFunds.notifyAll();
} finally {
bankLock.unlock();
}
}
private final double[] accounts;
private Lock bankLock;
private Condition sufficientFunds;
}
说明:
- 当条件对象调用await后,当前线程被阻塞,进入该条件的等待集,并且放弃了锁,这样别的线程有机会继续获取锁。
- 当其他线程调用条件对象的notifyAll方法后,会激活所有该条件对象上的等待集中的线程
- 一旦锁可用时,这些激活的线程中的某一个线程将从await中返回,并且从阻塞的地方继续执行。
对象锁机制 synchronized
Java的每个对象内部都有一个对象锁,如果用synchronized关键字声明对象的方法,对象的锁将保护整个方法。
public synchronized void method() {
}
内部对象锁只有一个条件对象,可以直接调用await来添加线程到等待集,notifyAll / notify来解除等待线程的阻塞状态。
读写锁 ReentrantReadWriteLock
读写锁适合去保护读多写少的共享数据。
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
Lock rl = rwLock.readLock();
Lock wl = rwLock.writeLock();
原子操作
JDK1.5开始提供了java.util.concurrent.atomic包,方便在多线程环境下,无锁的进行原子操作。
常用 AtomicBoolean , AtomicInteger, AtomicLong, AtomicReference 这四种基本类型来处理布尔,整数,长整数,对象四种数据。
方法 | 说明 |
---|---|
int get() | 取得当前值 |
void set(int newValue) | 设置当前值 |
int getAndSet(int newValue) | 设置新值,并返回旧值 |
boolean compareAndSet(int expect, int u) | 如果当前值为expect,则设置为u,否则则不进行修改 |
int getAndIncrement() | 当前值加1,返回旧值 |
int getAndDecrement() | 当前值减1,返回旧值 |
int getAndAdd(int delta) | 当前值增加delta,返回旧值 |
int incrementAndGet() | 当前值加1,返回新值 |
int decrementAndGet() | 当前值减1,返回新值 |
int addAndGet(int delta) | 当前值增加delta,返回新值 |
阻塞队列
阻塞队列抽象了解决生产者写和消费者读的常见场景。当向已满队列添加元素,或从为空队列读取元素时,线程发生阻塞等待。
- LinkedBlockingQueue:默认无边界的队列,也可以设置容量
- ArrayBlockingQueue:需要指定容量
- PriorityBlockingQueue:基于优先队列
阻塞队列方法:
方法 | 动作及特殊说明 |
---|---|
put | 添加一个元素;如果队列满,则阻塞 |
take | 移出并返回头元素;如果队列空则阻塞 |
add | 添加一个元素;如果队列满,抛IllegalStateException |
element | 返回队列头元素;如果队列空,抛NoSuchElementException |
remove | 移出并返回头元素;如果队列空则抛NoSuchElementException |
offer | 添加一个元素并返回true;如果队列满返回false |
peek | 返回队列的头元素;如果队列空返回null |
poll | 移出并返回队列的头元素;如果队列空返回null |
线程安全的集合
ConcurrentHashMap
线程安全的散列映射表,常用方法:
- putIfAbsent(key, value)
- remove(key, value)
- replace(key, oldValue, newValue)
ConcurrentSkipListMap
线程安全的有序映射表,要求Key类型实现Comparable接口
ConcurrentLinkedQueue
线程安全的无边界非阻塞的队列
ConcurrentSkipListSet
线程安全的有序集,要求元素类型实现Comparable接口
Callable与Future
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
Callable是一个参数化的函数接口,类型参数就是返回值的类型
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Future用来保存异步计算的结果:
- get 方法的调用会阻塞,直到计算完成
- cancel 方法可以取消该计算
FutureTask
FutureTask包装器可以把Callable转换成Future或Runnable
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
public class FutureTask<V> implements RunnableFuture<V> {
//...
}
例子:
Callable<Integer> callback = () -> {return 5;};
FutureTask<Integer> task = new FutureTask<Integer>(callback);
Thread t = new Thread(task);
t.start();
Integer result = task.get(); // 5
执行器与线程池
创建线程是有一定代价的,如果程序需要很多生命期很短的线程,考虑使用线程池
执行器类(Executors)提供了很多静态工厂方法来创建不同类型的线程池
方法 | 说明 |
---|---|
newCachedThreadPool | 必要时会创建新线程,空闲线程会保留60s |
newFixedThreadPool | 包含固定数量的线程,空闲线程会一直保留 |
newSingleThreadExecutor | 只有一个线程的“池” |
newScheduledThreadPool | 用于执行定时任务的线程池 |
newSingleThreadScheduledExecutor | 用于执行定时任务的单线程池 |
- 前3个方法返回实现ExecutorService接口的类对象,可以使用submit提交任务,同时返回Future对象,可以用来取结果或取消任务。
- 定时任务执行器,可以使用schedule或scheduleAtFixedRate来提交任务
package net.suninf.utils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* 线程工具类
* @author suninf
*/
@Component
public class ThreadUtil {
private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
private ExecutorService threadPoolService;
@PostConstruct
public void init() {
scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(5,
new BasicThreadFactory.Builder().namingPattern("thread-util-pool-%d").daemon(true).build());
threadPoolService = new ThreadPoolExecutor(5, 64, 10L, TimeUnit.SECONDS,
new SynchronousQueue<>(),new ThreadPoolExecutor.CallerRunsPolicy());
}
/**
* 并发执行future任务
* @param futuresList
* @param <T>
* @return
*/
public static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futuresList) {
CompletableFuture<Void> allFuturesResult =
CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[] {}));
return allFuturesResult.thenApply(v ->
futuresList.stream().
map(future -> future.join()).
collect(Collectors.<T>toList())
);
}
/**
* 异步执行定时任务
* @param task
* @param startTime
*/
public void postDelayTask(Runnable task, Date startTime) {
ConcurrentTaskScheduler scheduler = new ConcurrentTaskScheduler(scheduledThreadPoolExecutor);
scheduler.schedule(task, startTime);
}
/**
* 异步执行任务
* @param task
*/
public void postTask(Runnable task) {
threadPoolService.execute(task);
}
}
参考
- http://blog.csdn.net/erlian1992/article/details/51702843
- http://blog.csdn.net/evankaka/article/details/44153709
- http://wanglizhi.github.io/2016/08/05/Java-Concurrency/
- http://www.jianshu.com/p/40d4c7aebd66
- http://blog.csdn.net/lzm1340458776/article/details/27964243
- https://netboyc.gitbooks.io/java-high/content/ru-he-jian-she-gao-ke-yong-xi-tong.html