Disruptor高性能之道-等待策略

共 1718字,需浏览 4分钟

 ·

2022-03-06 05:00

我们接着介绍Disruptor高性能实现之道--等待策略。

等待策略waitStrategy是一种决定一个消费者如何等待生产者将event对象放入Disruptor的方式/策略。

等待策略waitStrategy是一个接口,它的所有实现都是针对消费者生效的。

Disruptor中主要的等待策略有哪些?

Disruptor中,等待策略waitStrategy有四个实现,分别是:

  • BlockingWaitStrategy:使用锁和条件变量实现的阻塞策略。如果不是将吞吐量和低延迟放在首位,则可以使用该策略。一般来说,这个策略的表现是中规中矩比较稳定的,它不会使CPU的负载飙高。

虽然客观上说, BlockingWaitStrategy是最低效的策略,但其也是CPU使用率最低和最稳定的策略。

在BlockingWaitStrategy内部维护了一个重入锁ReentrantLock和Condition;

  • SleepingWaitStrategy:性能表现和com.lmax.disruptor.BlockingWaitStrategy差不多,对CPU的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景;

SleepingWaitStrategy是一种无锁的方式,它的CPU使用率也比较低。具体的实现原理为:循环等待并且在循环中间调用LockSupport.parkNanos(1)来睡眠,(在Linux系统上面睡眠时间60µs).

SleepingWaitStrategy优点在于生产线程只需要计数,而不执行任何指令。并且没有条件变量的消耗。但是,事件对象从生产者到消费者传递的延迟变大了。SleepingWaitStrategy最好用在不需要低延迟,而且事件发布对于生产者的影响比较小的情况下。比如异步日志功能。

  • YieldingWaitStrategy:性能最好,适合用于低延迟的系统,在要求极高性能且事件处理线程数小于CPU逻辑核心数的场景中推荐使用此策略,例如CPU开启超线程的特性;

虽然YieldingWaitStrategy性能最好,但是它的实现机制是让出cpu使用权,保证cpu不会空闲,从而使得cpu始终处于工作态,因此该策略会使用100%的CPU,因此建议慎用。

  • BusySpinWaitStrategy:该策略原则上来说应当是性能最高的,它将线程绑定在特定的CPU内核,但是同时该策略也是部署过程中最为苛刻的策略。

BusySpinWaitStrategy发挥高性能的前提是事件处理线程比物理内核数目还要小的场景。例如:在禁用超线程技术的时候。

BlockingWaitStrategy

BlockingWaitStrategy是Disruptor中唯一使用到锁的地方。

public final class BlockingWaitStrategy implements WaitStrategy
{

    // 可重入锁
    private final Lock lock = new ReentrantLock();

    // 条件变量
    private final Condition processorNotifyCondition = lock.newCondition();

    @Override
    public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException
    {
        long availableSequence;
        if (cursorSequence.get() < sequence)
        {
            lock.lock();
            try
            {
                while (cursorSequence.get() < sequence)
                {
                    barrier.checkAlert();
                    processorNotifyCondition.await();
                }
            }
            finally
            {
                lock.unlock();
            }
        }

        // 如果生产者新发布了事件,但是依赖的其他消费者还没处理完,则等待所依赖的消费者先处理
        while ((availableSequence = dependentSequence.get()) < sequence)
        {
            barrier.checkAlert();
            ThreadHints.onSpinWait();
        }

        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking()
    {
        lock.lock();
        try
        {
            processorNotifyCondition.signalAll();
        }
        finally
        {
            lock.unlock();
        }
    }

BlockingWaitStrategy的类长度不到100行,使用了Lock+Condition 实现了线程等待和唤醒操作。从而实现了生产者与消费者之间的同步。

消费者通过waitFor等待RingBuffer指定位置是否有可用数据,当存在可用数据,则消费者被唤醒。

    /**
     * @see Sequencer#publish(long)
     */
    @Override
    public void publish(long sequence)
    {
        cursor.set(sequence);
        waitStrategy.signalAllWhenBlocking();
    }
  • 如果生产者新发布了事件,但是依赖的其他消费者还没处理完,则等待所依赖的消费者先处理
  • 生产者新发布时间,会唤醒等待中的消费者。

SleepingWaitStrategy

SleepingWaitStrategy没有用到锁,这表明它无需调用signalAllWhenBlocking方法做唤醒处理。

SleepingWaitStrategy核心是通过「Thread.yield」 + 「LockSupport.parkNanos」,实现生产者和消费者之间的同步。

也就是说省去了生产线程的通知操作,官方源码注释如下:

 * This strategy is a good compromise between performance and CPU resource.
 * Latency spikes can occur after quiet periods.  It will also reduce the impact
 * on the producing thread as it will not need signal any conditional variables
 * to wake up the event handling thread.

大意是说,SleepingWaitStrategy策略在性能和CPU资源消耗之间取得了平衡,接下来去看看关键代码。


    private static final int DEFAULT_RETRIES = 200;
    private static final long DEFAULT_SLEEP = 100;

    private final int retries;
    private final long sleepTimeNs;

    @Override
    public long waitFor(
        final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
        throws AlertException
    {
        long availableSequence;
        int counter = retries;    // 默认值为DEFAULT_RETRIES = 200;

        while ((availableSequence = dependentSequence.get()) < sequence)
        {
            counter = applyWaitMethod(barrier, counter);
        }

        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking()
    {
    }

waitFor 方法核心是while循环,我们可以看到,while循环没有任何的break操作,他就是个死循环。

counter默认值为200,自旋重试一定次数,如果在重试过程中,出现了可用sequence,也就是生产者往RingBuffer中生产了数据,则直接返回可用的序列号。

只要消费者没有等到可用的数据,就会一直循环,执行applyWaitMethod。

    private int applyWaitMethod(final SequenceBarrier barrier, int counter)
        throws AlertException
    {
        barrier.checkAlert();

        if (counter > 100)
        {
            --counter;
        }
        else if (counter > 0)
        {
            --counter;
            Thread.yield();
        }
        else
        {
            LockSupport.parkNanos(sleepTimeNs);
        }

        return counter;
    }

这里的核心就是counter计数器,完全是无锁的。

当计数器高于100时就执行减一的操作(最快响应),当计数器在100到0之间时每次都交出CPU执行时间(最省资源),其他时候就睡眠固定时间:

如果重试指定次数以后,还是没有可用序列号,则继续自旋重试:

  • 0-100:每重试一次,便调用Thread.yield方法,让渡CPU的使用权,让其它线程可以使用CPU。当该线程再次获取CPU使用权时,继续重试,如果还没有可用的序列号,则继续放弃CPU使用权等待。此循环最多100次。
  • 假如在等待过程中还是没有可用的序列号,则调用LockSupport.parkNanos方法阻塞消费线程,阻塞时长通过SleepingWaitStrategy构造方法设置,一直阻塞到出现了可用的sequence(一直阻塞到生产者生产了数据)。
  • 当LockSupport.parkNanos方法由于超时返回后,还没有可用的sequence序列号,则该线程获取CPU使用权以后,可能继续调用LockSupport.parkNanos方法阻塞线程。

跟其它几种等待策略相比,它既没有直接使用锁,也没有直接自旋。属于一种在性能和CPU资源之间折中的方案。

BusySpinWaitStrategy

BusySpinWaitStrategy的实现代码行数只有几十行,从它的注释可以看出: 该策略将线程绑定到了特定的CPU内核。

/**
 * Busy Spin strategy that uses a busy spin loop for {@link EventProcessor}s waiting on a barrier.
 * 


 * This strategy will use CPU resource to avoid syscalls which can introduce latency jitter.  It is best
 * used when threads can be bound to specific CPU cores.
 */
public final class BusySpinWaitStrategy implements WaitStrategy
{
    @Override
    public long waitFor(
        final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
        throws AlertException, InterruptedException
    {
        long availableSequence;

        while ((availableSequence = dependentSequence.get()) < sequence)
        {
            barrier.checkAlert();
            ThreadHints.onSpinWait();
        }

        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking()
    {
    }
}

当没有可用sequence时,消费者会一直执行while循环,具体的逻辑为 「ThreadHints.onSpinWait();」


    private static final MethodHandle ON_SPIN_WAIT_METHOD_HANDLE;

    public static void onSpinWait()
    {
        // Call java.lang.Thread.onSpinWait() on Java SE versions that support it. Do nothing otherwise.
        // This should optimize away to either nothing or to an inlining of java.lang.Thread.onSpinWait()
        if (null != ON_SPIN_WAIT_METHOD_HANDLE)
        {
            try
            {
                ON_SPIN_WAIT_METHOD_HANDLE.invokeExact();
            }
            catch (final Throwable ignore)
            {
            }
        }
    }

当ON_SPIN_WAIT_METHOD_HANDLE 不为空,则执行 ON_SPIN_WAIT_METHOD_HANDLE.invokeExact(); 底层是一个native方法。

那么我们可以猜想,如果ON_SPIN_WAIT_METHOD_HANDLE为空,那么这个外层的while循环就是一个纯粹的自旋操作,也就是说这个操作非常消耗CPU。

ON_SPIN_WAIT_METHOD_HANDLE为空是一个比较严重的场景,它的初始化逻辑为:


    # com.lmax.disruptor.util.ThreadHints
    static
    {
        final MethodHandles.Lookup lookup = MethodHandles.lookup();

        MethodHandle methodHandle = null;
        try
        {
            methodHandle = lookup.findStatic(Thread.class, "onSpinWait", methodType(void.class));
        }
        catch (final Exception ignore)
        {
        }

        ON_SPIN_WAIT_METHOD_HANDLE = methodHandle;
    }

可以看到,这里的methodHandle其实就是Thread类中的onSpinWait方法,

如果Thread类没有onSpinWait方法那么使用BusySpinWaitStrategy作为等待策略就在RingBuffer中没有数据时,消费线程就会执行自旋空转,这个操作很耗费CPU。

那么问题就变成了,Thread类中是否存在「onSpinWait」 方法的问题了。

有趣的是,onSpinWait方法在JDK1.9之后才添加到了Thread类中,也就是说,对于JDK1.8(包括1.8)之前的用户而言,使用BusySpinWaitStrategy就意味着,找不到Thread类的onSpinWait方法,而最终导致消费者阻塞在waitFor方法上,执行无意义的自旋操作,把CPU负载打满(就是一个while(true)死循环)。

在jdk1.9及以上版本中,Thread.onSpinWait是有意义的。它会通知CPU当前线程处于循环查询的状态,CPU得知该状态后就会调度更多CPU资源给其他线程,从而缓解死循环对当前cpu核的压力。

回过头来,BusySpinWaitStrategy的注释告诉我们:如果使用该策略,尽量绑定线程到固定的CPU核心。但是同样的,该策略与YieldingWaitStrategy策略相比,会出现当没有可用序列号时长期占用CPU而让出CPU使用权(死循环),导致其它线程无法获取CPU使用权。

如何实现利用线程亲和性绑定线程到具体的CPU?

那么这个操作又该如何实现呢?

通过使用net.openhft.affinity包,就可以实现线程亲和性,它会强制你的应用线程运行在特定的一个或多个cpu上。

maven依赖为:net.openhftaffinity3.0.6

在初始化Disruptor实例时,ThreadFactory参数传入affinity线程亲和工厂。

以Spring项目中实例化Disruptor为例:

disruptor-init.png

openhft-affinity.png

YieldingWaitStrategy

YieldingWaitStrategy相比于SleepingWaitStrategy,实现机制就很激进,它完全基于Thread.yield出让cpu使用权,让CPU利用率保持在100%。

public final class YieldingWaitStrategy implements WaitStrategy
{
    private static final int SPIN_TRIES = 100;

    @Override
    public long waitFor(
        final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
        throws AlertException, InterruptedException
    {
        long availableSequence;
        int counter = SPIN_TRIES;

        while ((availableSequence = dependentSequence.get()) < sequence)
        {
            counter = applyWaitMethod(barrier, counter);
        }

        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking()
    {
    }

当消费者没有获取到可用的sequence,则循环执行applyWaitMethod。直到存在可用的sequence,就返回该sequence。

返回sequence之后就可以根据该sequence从RingBuffer中get出这个sequence对应的event,执行业务操作。

    private int applyWaitMethod(final SequenceBarrier barrier, int counter)
        throws AlertException
    {
        barrier.checkAlert();

        // counter默认为100,在减小到0之前不会进入if分支
        if (0 == counter)
        {
            Thread.yield();
        }
        else
        {
            --counter;
        }

        return counter;
    }
}
  • 首先,counter默认为100,在减小到0之前不会进入if分支,直接进入else,执行减1操作。

也就是说,首先会自旋重试100次(此值可设置,默认100次),如果在重试过程中,存在可用的序列号,则直接返回可用的序列号。

  • 如果自旋了100次,counter减到0了,还是没有得到可用的sequence序列号,那么就会调用Thread.yield方法,让渡CPU的使用权,让其它线程可以争抢到CPU使用权。当该线程再次获取CPU使用权时,继续该过程:如果没有可用的序列号,则继续放弃CPU使用权等待。

从分析我们可以看出,YieldingWaitStrategy基本上是在等待sequence期间,不断的通过Thread.yield出让CPU的使用权,因此这个策略会让CPU使用率保持在100%的满负荷,生产中强烈推荐 「不要使用」

盘点等待策略

  • BlockingWaitStrategy:基于ReentrantLock的等待&&唤醒机制实现等待逻辑,该策略是Disruptor的默认策略,比较节省CPU,生产环境推荐使用;
  • BusySpinWaitStrategy:持续自旋,不推荐使用,会造成CPU负载100%;
  • DummyWaitStrategy:返回的Sequence值为0,正常情况下不使用
  • LiteBlockingWaitStrategy:基于BlockingWaitStrategy的轻量级等待策略,在没有锁竞争的时候会省去唤醒操作,但是作者说测试不充分,因此不建议使用
  • TimeoutBlockingWaitStrategy:带超时的等待策略,超时后会执行业务指定的处理逻辑
  • LiteTimeoutBlockingWaitStrategy:基于TimeoutBlockingWaitStrategy的策略,当没有锁竞争的时候会省去唤醒操作
  • SleepingWaitStrategy:三段式策略,第一阶段自旋,第二阶段执行Thread.yield让出CPU,第三阶段睡眠执行时间,反复的睡眠
  • YieldingWaitStrategy:二段式策略,第一阶段自旋,第二阶段执行Thread.yield交出CPU
  • PhasedBackoffWaitStrategy:四段式策略,第一阶段自旋指定次数,第二阶段自旋指定时间,第三阶段执行Thread.yield交出CPU,第四阶段调用成员变量的waitFor方法,该成员变量可以被设置为BlockingWaitStrategy、LiteBlockingWaitStrategy、SleepingWaitStrategy三个中的一个

扩展:单一写原则

在并发系统中提高性能最好的方式之一就是单一写原则,Disruptor中的生产者就体现了这一原则。

如果在你的代码中仅仅有一个事件生产者,那么可以设置为单一生产者模式来提高系统的性能。

单一写的好处在于:完全不需要考虑同步多个写线程,写入操作没有上下文切换,并且是线程安全的(写入串行化)。

关于单一写原则,可以阅读:https://mechanical-sympathy.blogspot.com/2011/09/single-writer-principle.html


浏览 569
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报