CompleableFuture原理和源码分析

CompleableFuture 使用场景 CompletableFuture的定义如下: public class CompletableFuture<T> implements Future<T>, CompletionStage<T> 我们看到CompletableFuture是实现了Future的接口的,在没有CompletableFuture之前,我们可以用FutureTask来实现一个Future的功能。那么有了FutureTask那么为什么还要有CompletableFuture呢? 我任务主要是CompletableFuture有两个优点 CompletableFuture可以实现完全的异步,而FutureTask必须通过get阻塞的方式获取结果 CompletableFuture .supplyAsync(()-> 1+2) .thenAccept((v)-> System.out.println(v*v)); 如上面的代码所示,我们完整的任务有两个阶段,一阶段是计算1+2,二阶段是计算一阶段返回结果的平方,在整个过程中,主线程完全不需要管这个任务的执行情况,也不会阻塞主线程。但是如果用FutureTask实现如上功能如下: FutureTask<Integer> futureTask1 = new FutureTask<Integer>(() -> { return 1 + 2; }); new Thread(futureTask1).start(); Integer periodOneResult = futureTask1.get(); FutureTask<Integer> futureTask2 = new FutureTask<Integer>(() -> { return periodOneResult * periodOneResult; }); new Thread(futureTask2).start(); Integer secondOneResult = futureTask2.get(); System.out.println(secondOneResult); 代码冗长不说,还需要get方法阻塞主线程去获取结果。以上代码只是说明CompletableFuture的异步优点,实际工作中你可以把两个任务看出两个api CompletableFuture可以实现复杂的任务编排,请思考下面代码的执行顺序是什么? CompletableFuture<String> base = new CompletableFuture<>(); CompletableFuture<String> completion0 = base.thenApply(s -> { System.out.println("completion 0"); return s + " 0"; }); CompletableFuture<String> completion1 = base....

4 分钟 · pan

java.util.concurrent.locks包下锁的实现原理之ReentrantLock

一、java concurrent包下lock类图概览 红色连线的表示内部类 ![image.png](/java java.util.concurrent.locks包下锁的实现原理之ReentrantLock/8596800-037daeafe21e322b.png) 1、java并发包下面的锁主要就两个,ReentrantLock(实现Lock接口) 和ReentrantReadWriteLock(实现ReadWriteLock接口)。 2、ReentrantLock类构造函数如下, sync是Sync的实例,NonfairSync(非公平锁)和FairSync(公平锁)是Sync的子类。 public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } 3、ReentrantReadWriteLock类构造函数如下,共有三个属性,sync、readerLock、writerLock public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } 我们看到ReentrantLock和ReentrantReadWriteLock都的实现都依赖于sync这个对象。sync是AbstractQueuedSynchronizer的实例。AbstractQueuedSynchronizer就是java并发包下面实现锁和线程同步的基础,AbstractQueuedSynchronizer就是大名鼎鼎的AQS队列,下文我们都用AQS来表示AbstractQueuedSynchronizer。 ##二、ReentrantLock实现原理 1、如何加锁 ReentrantLock使用方式如下 class X { private final ReentrantLock lock = new ReentrantLock(); // ... public void m() { lock....

5 分钟 · pan

java.util.concurrent.locks包下锁的实现原理之读写锁

java并发包已经存在Reentrant锁和条件锁,已经可以满足很多并发场景下线程安全的需求。但是在大量读少量写的场景下,并不是最优的选择。与传统锁不同的是读写锁的规则是可以共享读,但只能一个写。很多博客中总结写到读读不互斥,读写互斥,写写互斥。就读写这个场景下来说,如果一个线程获取了写锁,然后再获取读锁(同一个线程)也是可以的。锁降级就是这种情况。但是如果也是同一个线程,先获取读锁,再获取写锁是获取不到的(发生死锁)。所以严谨一点情况如下: 项目 非同一个线程 同一个线程 读读 不互斥 不互斥 读写 互斥 锁升级(不支持),发生死锁 写读 互斥 锁降级(支持),不互斥 写写 互斥 不互斥 读写锁的主要特性: 公平性:支持公平性和非公平性。 重入性:支持重入。读写锁最多支持 65535 个递归写入锁和 65535 个递归读取锁。 锁降级:遵循获取写锁,再获取读锁,最后释放写锁的次序,如此写锁能够降级成为读锁。 ReentrantReadWriteLock java.util.concurrent.locks.ReentrantReadWriteLock ,实现 ReadWriteLock 接口,可重入的读写锁实现类。在它内部,维护了一对相关的锁,一个用于只读操作(共享锁),另一个用于写入操作(排它锁)。 ReentrantReadWriteLock 类的大体结构如下: /** 内部类 读锁 */ private final ReentrantReadWriteLock.ReadLock readerLock; /** 内部类 写锁 */ private final ReentrantReadWriteLock.WriteLock writerLock; final Sync sync; /** 使用默认(非公平)的排序属性创建一个新的 ReentrantReadWriteLock */ public ReentrantReadWriteLock() { this(false); } /** 使用给定的公平策略创建一个新的 ReentrantReadWriteLock */ public ReentrantReadWriteLock(boolean fair) { sync = fair ?...

4 分钟 · pan

java.util.concurrent.locks包下锁的实现原理之条件锁

上一篇 文章中我们分析了ReentrantLock的实现原理,今天在分析一下条件锁。条件锁的具体实现在AbstractQueuedSynchronizer的内部类ConditionObject类中,总之,java中的锁,离不开AQS的实现。条件锁一般如下使用。 class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock....

2 分钟 · pan

JAVA并发编程 线程池Executors(JDK1.7)

Java中对线程池提供了很好的支持,有了线程池,我们就不需要自已再去创建线程。如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。JAVA的线程池中的线程可以在执行完任务后,不销毁,继续执行其他的任务。所以了解Java的线程池对我们掌握并发编程是很有帮助的。 先看一下线程池框架Executors涉及到的核心类。 Executor:父类,官方表述为用来解耦任务的提交,可以自已实现,比如调用线程执行该任务,或者起一个新的线程执行该任务 ExecutorService:比父类Executor定义了更多的接口用来提交、管理、终止任务 AbstractExecutorService:提供了ExecutorService默认实现 下面我就从Executors这个多线程框架开始讲起,首先看一下Executors中主要的方法 ThreadPoolExecutor:比AbstractExecutorService提供更多的功能,特别是大量的线程创建,销毁。在性能上更优越。 Executors:工厂类,提供了几个核心创建线程池的方法。 Executors核心方法 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit....

6 分钟 · pan