本文整理了下Java中常用的多线程相关的类型和使用方式。

线程类

Thread类型

public class Thread implements Runnable {
    public static native void sleep(long millis) throws InterruptedException;

    public Thread(Runnable target);
    public Thread(Runnable target, String name);

    public final void join() throws InterruptedException;
}

例:继承Thread,覆盖Thread的run方法

class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("hello1");
    }
}

MyThread mythd = new MyThread();
mythd.start();

Runnable接口

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

例:实现Runnable接口,传递给Thread

Thread thd = new Thread(new Runnable() {
    @Override
    public void run() {
        System.out.println("hello");
    }
});

thd.start();

线程同步控制

多线程同时访问共享的对象,就需要做好同步保护。

锁 Lock

最常用的锁是可重入锁:ReentrantLock

用锁保护一段代码(共享对象)的惯用法:

class X {
    private final ReentrantLock lock = new ReentrantLock();

    public void m() {
        lock.lock();
        try {
            // ... 
        } finally {
            lock.unlock();
        }
    }
}

条件对象 Condition

有时候只有锁,还无法满足一些带条件的需求,即获取了锁之后,还要满足一定的条件,才能继续执行相应的功能。

Java为锁对象,提供了条件对象,可以通过 newCondition() 方法获得一个条件对象。

银行账户转账的例子:

class Bank {

    public Bank(int n, double initial) {
        accounts = new double[n];
        for (int i=0; i<accounts.length; ++i) {
            accounts[i] = initial;
        }

        bankLock = new ReentrantLock();
        sufficientFunds = bankLock.newCondition();
    }

    public void transfer(int from, int to, double amount) throws InterruptedException {
        bankLock.lock();

        try {
            while (accounts[from] < amount) {
                sufficientFunds.await();
            }

            accounts[from] -= amount;
            accounts[to] += amount;

            sufficientFunds.notifyAll();

        } finally {
            bankLock.unlock();
        }
    }

    private final double[] accounts;
    private Lock bankLock;
    private Condition sufficientFunds;
}

说明:

  1. 当条件对象调用await后,当前线程被阻塞,进入该条件的等待集,并且放弃了锁,这样别的线程有机会继续获取锁。
  2. 当其他线程调用条件对象的notifyAll方法后,会激活所有该条件对象上的等待集中的线程
  3. 一旦锁可用时,这些激活的线程中的某一个线程将从await中返回,并且从阻塞的地方继续执行。

对象锁机制 synchronized

Java的每个对象内部都有一个对象锁,如果用synchronized关键字声明对象的方法,对象的锁将保护整个方法。

public synchronized void method() {
    
}

内部对象锁只有一个条件对象,可以直接调用await来添加线程到等待集,notifyAll / notify来解除等待线程的阻塞状态。

读写锁 ReentrantReadWriteLock

读写锁适合去保护读多写少的共享数据。

ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
Lock rl = rwLock.readLock();
Lock wl = rwLock.writeLock();

原子操作

JDK1.5开始提供了java.util.concurrent.atomic包,方便在多线程环境下,无锁的进行原子操作。

常用 AtomicBoolean , AtomicInteger, AtomicLong, AtomicReference 这四种基本类型来处理布尔,整数,长整数,对象四种数据。

方法 说明
int get() 取得当前值
void set(int newValue) 设置当前值
int getAndSet(int newValue) 设置新值,并返回旧值
boolean compareAndSet(int expect, int u) 如果当前值为expect,则设置为u,否则则不进行修改
int getAndIncrement() 当前值加1,返回旧值
int getAndDecrement() 当前值减1,返回旧值
int getAndAdd(int delta) 当前值增加delta,返回旧值
int incrementAndGet() 当前值加1,返回新值
int decrementAndGet() 当前值减1,返回新值
int addAndGet(int delta) 当前值增加delta,返回新值

阻塞队列

阻塞队列抽象了解决生产者写和消费者读的常见场景。当向已满队列添加元素,或从为空队列读取元素时,线程发生阻塞等待。

阻塞队列方法:

方法 动作及特殊说明
put 添加一个元素;如果队列满,则阻塞
take 移出并返回头元素;如果队列空则阻塞
add 添加一个元素;如果队列满,抛IllegalStateException
element 返回队列头元素;如果队列空,抛NoSuchElementException
remove 移出并返回头元素;如果队列空则抛NoSuchElementException
offer 添加一个元素并返回true;如果队列满返回false
peek 返回队列的头元素;如果队列空返回null
poll 移出并返回队列的头元素;如果队列空返回null

线程安全的集合

ConcurrentHashMap

线程安全的散列映射表,常用方法:

ConcurrentSkipListMap

线程安全的有序映射表,要求Key类型实现Comparable接口

ConcurrentLinkedQueue

线程安全的无边界非阻塞的队列

ConcurrentSkipListSet

线程安全的有序集,要求元素类型实现Comparable接口

Callable与Future

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

Callable是一个参数化的函数接口,类型参数就是返回值的类型

public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();

    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Future用来保存异步计算的结果:

FutureTask

FutureTask包装器可以把Callable转换成Future或Runnable

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

public class FutureTask<V> implements RunnableFuture<V> {
    //...
}

例子:

Callable<Integer> callback = () -> {return 5;};
FutureTask<Integer> task = new FutureTask<Integer>(callback);

Thread t = new Thread(task);
t.start();

Integer result = task.get(); // 5

执行器与线程池

创建线程是有一定代价的,如果程序需要很多生命期很短的线程,考虑使用线程池

执行器类(Executors)提供了很多静态工厂方法来创建不同类型的线程池

方法 说明
newCachedThreadPool 必要时会创建新线程,空闲线程会保留60s
newFixedThreadPool 包含固定数量的线程,空闲线程会一直保留
newSingleThreadExecutor 只有一个线程的“池”
newScheduledThreadPool 用于执行定时任务的线程池
newSingleThreadScheduledExecutor 用于执行定时任务的单线程池
package net.suninf.utils;

import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * 线程工具类
 * @author suninf
 */
@Component
public class ThreadUtil {

    private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
    private ExecutorService threadPoolService;

    @PostConstruct
    public void init() {
        scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(5,
                new BasicThreadFactory.Builder().namingPattern("thread-util-pool-%d").daemon(true).build());

        threadPoolService = new ThreadPoolExecutor(5, 64, 10L, TimeUnit.SECONDS,
            new SynchronousQueue<>(),new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * 并发执行future任务
     * @param futuresList
     * @param <T>
     * @return
     */
    public static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futuresList) {

        CompletableFuture<Void> allFuturesResult =
                CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[] {}));

        return allFuturesResult.thenApply(v ->
                futuresList.stream().
                        map(future -> future.join()).
                        collect(Collectors.<T>toList())
        );
    }

    /**
     * 异步执行定时任务
     * @param task
     * @param startTime
     */
    public void postDelayTask(Runnable task, Date startTime) {
        ConcurrentTaskScheduler scheduler = new ConcurrentTaskScheduler(scheduledThreadPoolExecutor);
        scheduler.schedule(task, startTime);
    }

    /**
     * 异步执行任务
     * @param task
     */
    public void postTask(Runnable task) {
        threadPoolService.execute(task);
    }

}

参考