Java 类com.lmax.disruptor.util.Util 实例源码

项目:disruptor-code-analysis    文件:WorkerPool.java   
/**
 * Wait for the {@link RingBuffer} to drain of published events then halt the workers.
 */
public void drainAndHalt()
{
    Sequence[] workerSequences = getWorkerSequences();
    while (ringBuffer.getCursor() > Util.getMinimumSequence(workerSequences))
    {
        Thread.yield();
    }

    for (WorkProcessor<?> processor : workProcessors)
    {
        processor.halt();
    }

    started.set(false);
}
项目:chaperone    文件:AuditReporter.java   
AuditReporter(int queueSize, long timeBucketIntervalInSec, int reportFreqMsgCount, int reportFreqIntervalSec,
    boolean combineMetricsAmongHosts) {
  reportExecutor =
      Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat(getType() + "-audit-reporter-%d")
          .build());;

  queueSize = Util.ceilingNextPowerOfTwo(queueSize);
  disruptor = new Disruptor<AuditMsgReportTask>(new AuditMsgReportTaskFactory(), queueSize, reportExecutor);
  disruptor.handleEventsWith(new AuditMsgReportTaskHandler(this));
  ringBuffer = disruptor.getRingBuffer();

  aggregator =
      new AuditAggregator(timeBucketIntervalInSec, reportFreqMsgCount, reportFreqIntervalSec,
          combineMetricsAmongHosts);

  SUBMITTED_COUNTER = Metrics.getRegistry().meter(getType() + ".auditReporter.submittedNumber");
  FAILED_TO_SUBMIT_COUNTER = Metrics.getRegistry().meter(getType() + ".auditReporter.failedToSubmitNumber");
  REPORTED_COUNTER = Metrics.getRegistry().meter(getType() + ".auditReporter.reportedNumber");
  FAILED_TO_REPORT_COUNTER = Metrics.getRegistry().meter(getType() + ".auditReporter.failedToReportNumber");
  Metrics.getRegistry().register(getType() + ".auditReporter.queueSize", new Gauge<Integer>() {
    @Override
    public Integer getValue() {
      return (int) disruptor.getRingBuffer().remainingCapacity();
    }
  });
}
项目:jstorm-0.9.6.3-    文件:SingleProducerSequencer.java   
/**
 * @see Sequencer#hasAvailableCapacity(int)
 */
@Override
public boolean hasAvailableCapacity(final int requiredCapacity)
{
    long nextValue = pad.nextValue;

    long wrapPoint = (nextValue + requiredCapacity) - bufferSize;
    long cachedGatingSequence = pad.cachedValue;

    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
    {
        long minSequence = Util.getMinimumSequence(gatingSequences, nextValue);
        pad.cachedValue = minSequence;

        if (wrapPoint > minSequence)
        {
            return false;
        }
    }

    return true;
}
项目:jstorm-0.9.6.3-    文件:MultiProducerSequencer.java   
private boolean hasAvailableCapacity(Sequence[] gatingSequences, final int requiredCapacity, long cursorValue)
{
    long wrapPoint = (cursorValue + requiredCapacity) - bufferSize;
    long cachedGatingSequence = gatingSequenceCache.get();

    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > cursorValue)
    {
        long minSequence = Util.getMinimumSequence(gatingSequences, cursorValue);
        gatingSequenceCache.set(minSequence);

        if (wrapPoint > minSequence)
        {
            return false;
        }
    }

    return true;
}
项目:learn_jstorm    文件:SingleProducerSequencer.java   
/**
 * @see Sequencer#hasAvailableCapacity(int)
 */
@Override
public boolean hasAvailableCapacity(final int requiredCapacity)
{
    long nextValue = pad.nextValue;

    long wrapPoint = (nextValue + requiredCapacity) - bufferSize;
    long cachedGatingSequence = pad.cachedValue;

    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
    {
        long minSequence = Util.getMinimumSequence(gatingSequences, nextValue);
        pad.cachedValue = minSequence;

        if (wrapPoint > minSequence)
        {
            return false;
        }
    }

    return true;
}
项目:learn_jstorm    文件:MultiProducerSequencer.java   
private boolean hasAvailableCapacity(Sequence[] gatingSequences, final int requiredCapacity, long cursorValue)
{
    long wrapPoint = (cursorValue + requiredCapacity) - bufferSize;
    long cachedGatingSequence = gatingSequenceCache.get();

    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > cursorValue)
    {
        long minSequence = Util.getMinimumSequence(gatingSequences, cursorValue);
        gatingSequenceCache.set(minSequence);

        if (wrapPoint > minSequence)
        {
            return false;
        }
    }

    return true;
}
项目:jstrom    文件:SingleProducerSequencer.java   
/**
 * @see Sequencer#hasAvailableCapacity(int)
 */
@Override
public boolean hasAvailableCapacity(final int requiredCapacity) {
    long nextValue = pad.nextValue;

    long wrapPoint = (nextValue + requiredCapacity) - bufferSize;
    long cachedGatingSequence = pad.cachedValue;

    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
        long minSequence = Util.getMinimumSequence(gatingSequences, nextValue);
        pad.cachedValue = minSequence;

        if (wrapPoint > minSequence) {
            return false;
        }
    }

    return true;
}
项目:Tstream    文件:SingleProducerSequencer.java   
/**
 * @see Sequencer#hasAvailableCapacity(int)
 */
@Override
public boolean hasAvailableCapacity(final int requiredCapacity)
{
    long nextValue = pad.nextValue;

    long wrapPoint = (nextValue + requiredCapacity) - bufferSize;
    long cachedGatingSequence = pad.cachedValue;

    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
    {
        long minSequence = Util.getMinimumSequence(gatingSequences, nextValue);
        pad.cachedValue = minSequence;

        if (wrapPoint > minSequence)
        {
            return false;
        }
    }

    return true;
}
项目:Tstream    文件:MultiProducerSequencer.java   
private boolean hasAvailableCapacity(Sequence[] gatingSequences, final int requiredCapacity, long cursorValue)
{
    long wrapPoint = (cursorValue + requiredCapacity) - bufferSize;
    long cachedGatingSequence = gatingSequenceCache.get();

    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > cursorValue)
    {
        long minSequence = Util.getMinimumSequence(gatingSequences, cursorValue);
        gatingSequenceCache.set(minSequence);

        if (wrapPoint > minSequence)
        {
            return false;
        }
    }

    return true;
}
项目:annotated-src    文件:SingleProducerSequencer.java   
/**
 * @see Sequencer#hasAvailableCapacity(int)
 */
@Override
public boolean hasAvailableCapacity(final int requiredCapacity)
{
    long nextValue = pad.nextValue;

    long wrapPoint = (nextValue + requiredCapacity) - bufferSize;
    long cachedGatingSequence = pad.cachedValue;

    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
    {
        long minSequence = Util.getMinimumSequence(gatingSequences, nextValue);
        pad.cachedValue = minSequence;

        if (wrapPoint > minSequence)
        {
            return false;
        }
    }

    return true;
}
项目:annotated-src    文件:MultiProducerSequencer.java   
private boolean hasAvailableCapacity(Sequence[] gatingSequences, final int requiredCapacity, long cursorValue)
{
    long wrapPoint = (cursorValue + requiredCapacity) - bufferSize;
    long cachedGatingSequence = gatingSequenceCache.get();

    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > cursorValue)
    {
        long minSequence = Util.getMinimumSequence(gatingSequences, cursorValue);
        gatingSequenceCache.set(minSequence);

        if (wrapPoint > minSequence)
        {
            return false;
        }
    }

    return true;
}
项目:annotated-src    文件:WorkerPool.java   
/**
 * Wait for the {@link RingBuffer} to drain of published events then halt
 * the workers.
 */
public void drainAndHalt()
{
    Sequence[] workerSequences = getWorkerSequences();
    while (ringBuffer.getCursor() > Util.getMinimumSequence(workerSequences))
    {
        Thread.yield();
    }

    for (WorkProcessor<?> processor : workProcessors)
    {
        processor.halt();
    }

    started.set(false);
}
项目:log4j2    文件:AsyncLogger.java   
private static int calculateRingBufferSize() {
    int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
    final String userPreferredRBSize = System.getProperty(
            "AsyncLogger.RingBufferSize", String.valueOf(ringBufferSize));
    try {
        int size = Integer.parseInt(userPreferredRBSize);
        if (size < RINGBUFFER_MIN_SIZE) {
            size = RINGBUFFER_MIN_SIZE;
            LOGGER.warn(
                    "Invalid RingBufferSize {}, using minimum size {}.",
                    userPreferredRBSize, RINGBUFFER_MIN_SIZE);
        }
        ringBufferSize = size;
    } catch (final Exception ex) {
        LOGGER.warn("Invalid RingBufferSize {}, using default size {}.",
                userPreferredRBSize, ringBufferSize);
    }
    return Util.ceilingNextPowerOfTwo(ringBufferSize);
}
项目:log4j2    文件:AsyncLoggerConfigHelper.java   
private static int calculateRingBufferSize() {
    int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
    final String userPreferredRBSize = System.getProperty(
            "AsyncLoggerConfig.RingBufferSize",
            String.valueOf(ringBufferSize));
    try {
        int size = Integer.parseInt(userPreferredRBSize);
        if (size < RINGBUFFER_MIN_SIZE) {
            size = RINGBUFFER_MIN_SIZE;
            LOGGER.warn(
                    "Invalid RingBufferSize {}, using minimum size {}.",
                    userPreferredRBSize, RINGBUFFER_MIN_SIZE);
        }
        ringBufferSize = size;
    } catch (final Exception ex) {
        LOGGER.warn("Invalid RingBufferSize {}, using default size {}.",
                userPreferredRBSize, ringBufferSize);
    }
    return Util.ceilingNextPowerOfTwo(ringBufferSize);
}
项目:jstorm    文件:SingleProducerSequencer.java   
/**
 * @see Sequencer#hasAvailableCapacity(int)
 */
@Override
public boolean hasAvailableCapacity(final int requiredCapacity) {
    long nextValue = pad.nextValue;

    long wrapPoint = (nextValue + requiredCapacity) - bufferSize;
    long cachedGatingSequence = pad.cachedValue;

    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
        long minSequence = Util.getMinimumSequence(gatingSequences, nextValue);
        pad.cachedValue = minSequence;

        if (wrapPoint > minSequence) {
            return false;
        }
    }

    return true;
}
项目:disruptor-code-analysis    文件:SingleProducerSequencer.java   
/**
 * 判断RingBuffer是否还有可用的空间能够容纳requiredCapacity个Event.
 */
@Override
public boolean hasAvailableCapacity(final int requiredCapacity) {
    long nextValue = this.nextValue;  // 生产者下一个可使用的位置序号

    // 从nextValue位置开始,如果再申请requiredCapacity个位置,将要达到的位置,因为是环形数组,所以减去bufferSize
    // 下面会用该值和消费者的位置序号比较.
    long wrapPoint = (nextValue + requiredCapacity) - bufferSize;

    // 消费者上一次消费的位置, 消费者每次消费之后会更新该值.
    long cachedGatingSequence = this.cachedValue;

    // 先看看这个条件的对立条件: wrapPoint <= cachedGatingSequence && cachedGatingSequence <= nextValue
    // 表示当前生产者走在消费者的前面, 并且就算再申请requiredCapacity个位置达到的位置也不会覆盖消费者上一次消费的位置(就更不用关心
    // 当前消费者消费的位置了,因为消费者消费的位置是一直增大的),这种情况一定能够分配requiredCapacity个空间.

    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
        // gatingSequences保存的是消费者的当前消费位置, 因为可能有多个消费者, 所以此处获取序号最小的位置.
        long minSequence = Util.getMinimumSequence(gatingSequences, nextValue);
        // 顺便更新消费者上一次消费的位置...
        this.cachedValue = minSequence;
        // 如果申请之后的位置会覆盖消费者的位置,则不能分配空间,返回false
        if (wrapPoint > minSequence) {
            return false;
        }
        // 否则返回true.
    }

    return true;
}
项目:disruptor-code-analysis    文件:SingleProducerSequencer.java   
/**
 * 申请n个可用空间, 返回该位置的序号, 如果当前没有可用空间, 则一直阻塞直到有可用空间位置.
 */
@Override
public long next(int n) {
    if (n < 1) {
        throw new IllegalArgumentException("n must be > 0");
    }

    long nextValue = this.nextValue;

    long nextSequence = nextValue + n;
    long wrapPoint = nextSequence - bufferSize;
    long cachedGatingSequence = this.cachedValue;

    // 这里的判断逻辑和上面的hasAvailableCapacity函数一致, 不多说了.
    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
        long minSequence;

        // 如果一直没有可用空间, 当前线程挂起, 不断循环检测,直到有可用空间.
        while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {
            LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
        }
        // 顺便更新一下消费者消费的位置序号.
        this.cachedValue = minSequence;
    }

    this.nextValue = nextSequence;
    // 返回最后一个可用位置的序号.
    return nextSequence;
}
项目:disruptor-code-analysis    文件:SingleProducerSequencer.java   
/**
 * 返回当前RingBuffer的可用位置数目.
 */
@Override
public long remainingCapacity() {
    long nextValue = this.nextValue;

    // (多个)消费者消费的最小位置
    long consumed = Util.getMinimumSequence(gatingSequences, nextValue);
    // 生产者的位置
    long produced = nextValue;
    // 空余的可用的位置数目.
    return getBufferSize() - (produced - consumed);
}
项目:disruptor-code-analysis    文件:MultiProducerSequencer.java   
/**
 * Construct a Sequencer with the selected wait strategy and buffer size.
 *
 * @param bufferSize   the size of the buffer that this will sequence over.
 * @param waitStrategy for those waiting on sequences.
 */
public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy) {
    super(bufferSize, waitStrategy);
    availableBuffer = new int[bufferSize];
    indexMask = bufferSize - 1;
    indexShift = Util.log2(bufferSize);
    initialiseAvailableBuffer();
}
项目:disruptor-code-analysis    文件:MultiProducerSequencer.java   
private boolean hasAvailableCapacity(Sequence[] gatingSequences, final int requiredCapacity, long cursorValue) {
    long wrapPoint = (cursorValue + requiredCapacity) - bufferSize;
    long cachedGatingSequence = gatingSequenceCache.get();

    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > cursorValue) {
        long minSequence = Util.getMinimumSequence(gatingSequences, cursorValue);
        gatingSequenceCache.set(minSequence);

        if (wrapPoint > minSequence) {
            return false;
        }
    }

    return true;
}
项目:disruptor-code-analysis    文件:MultiProducerSequencer.java   
/**
 * @see Sequencer#remainingCapacity()
 */
@Override
public long remainingCapacity() {
    long consumed = Util.getMinimumSequence(gatingSequences, cursor.get());
    long produced = cursor.get();
    return getBufferSize() - (produced - consumed);
}
项目:disruptor-code-analysis    文件:SequenceBarrierTest.java   
@Test
public void shouldWaitForWorkCompleteWhereAllWorkersAreBlockedOnRingBuffer() throws Exception
{
    long expectedNumberMessages = 10;
    fillRingBuffer(expectedNumberMessages);

    final StubEventProcessor[] workers = new StubEventProcessor[3];
    for (int i = 0, size = workers.length; i < size; i++)
    {
        workers[i] = new StubEventProcessor();
        workers[i].setSequence(expectedNumberMessages - 1);
    }

    final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(Util.getSequencesFor(workers));

    Runnable runnable = new Runnable()
    {
        public void run()
        {
            long sequence = ringBuffer.next();
            StubEvent event = ringBuffer.get(sequence);
            event.setValue((int) sequence);
            ringBuffer.publish(sequence);

            for (StubEventProcessor stubWorker : workers)
            {
                stubWorker.setSequence(sequence);
            }
        }
    };

    new Thread(runnable).start();

    long expectedWorkSequence = expectedNumberMessages;
    long completedWorkSequence = sequenceBarrier.waitFor(expectedNumberMessages);
    assertTrue(completedWorkSequence >= expectedWorkSequence);
}
项目:disruptor-code-analysis    文件:SequenceBarrierTest.java   
@Test
public void shouldWaitForWorkCompleteWhereCompleteWorkThresholdIsBehind() throws Exception
{
    long expectedNumberMessages = 10;
    fillRingBuffer(expectedNumberMessages);

    final StubEventProcessor[] eventProcessors = new StubEventProcessor[3];
    for (int i = 0, size = eventProcessors.length; i < size; i++)
    {
        eventProcessors[i] = new StubEventProcessor();
        eventProcessors[i].setSequence(expectedNumberMessages - 2);
    }

    final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(Util.getSequencesFor(eventProcessors));

    Runnable runnable = new Runnable()
    {
        public void run()
        {
            for (StubEventProcessor stubWorker : eventProcessors)
            {
                stubWorker.setSequence(stubWorker.getSequence().get() + 1L);
            }
        }
    };

    Thread thread = new Thread(runnable);
    thread.start();
    thread.join();

    long expectedWorkSequence = expectedNumberMessages - 1;
    long completedWorkSequence = sequenceBarrier.waitFor(expectedWorkSequence);
    assertTrue(completedWorkSequence >= expectedWorkSequence);
}
项目:jstorm-0.9.6.3-    文件:SingleProducerSequencer.java   
/**
 * @see Sequencer#next(int)
 */
@Override
public long next(int n)
{
    if (n < 1)
    {
        throw new IllegalArgumentException("n must be > 0");
    }

    long nextValue = pad.nextValue;

    long nextSequence = nextValue + n;
    long wrapPoint = nextSequence - bufferSize;
    long cachedGatingSequence = pad.cachedValue;

    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
    {
        long minSequence;
        while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
        {
            if (AbstractSequencerExt.isWaitSleep()) {
                try {
        Thread.sleep(1);
    } catch (InterruptedException e) {
    }
            }else {
                LockSupport.parkNanos(1); 
            }
        }

        pad.cachedValue = minSequence;
    }

    pad.nextValue = nextSequence;

    return nextSequence;
}
项目:jstorm-0.9.6.3-    文件:SingleProducerSequencer.java   
/**
 * @see Sequencer#remainingCapacity()
 */
@Override
public long remainingCapacity()
{
    long nextValue = pad.nextValue;

    long consumed = Util.getMinimumSequence(gatingSequences, nextValue);
    long produced = nextValue;
    return getBufferSize() - (produced - consumed);
}
项目:jstorm-0.9.6.3-    文件:MultiProducerSequencer.java   
/**
 * Construct a Sequencer with the selected wait strategy and buffer size.
 *
 * @param bufferSize the size of the buffer that this will sequence over.
 * @param waitStrategy for those waiting on sequences.
 */
public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy)
{
    super(bufferSize, waitStrategy);
    availableBuffer = new int[bufferSize];
    indexMask = bufferSize - 1;
    indexShift = Util.log2(bufferSize);
    initialiseAvailableBuffer();
}
项目:jstorm-0.9.6.3-    文件:MultiProducerSequencer.java   
/**
 * @see Sequencer#remainingCapacity()
 */
@Override
public long remainingCapacity()
{
    long consumed = Util.getMinimumSequence(gatingSequences, cursor.get());
    long produced = cursor.get();
    return getBufferSize() - (produced - consumed);
}
项目:learn_jstorm    文件:SingleProducerSequencer.java   
/**
 * @see Sequencer#next(int)
 */
@Override
public long next(int n)
{
    if (n < 1)
    {
        throw new IllegalArgumentException("n must be > 0");
    }

    long nextValue = pad.nextValue;

    long nextSequence = nextValue + n;
    long wrapPoint = nextSequence - bufferSize;
    long cachedGatingSequence = pad.cachedValue;

    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
    {
        long minSequence;
        while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
        {
            if (AbstractSequencerExt.isWaitSleep()) {
                try {
        Thread.sleep(1);
    } catch (InterruptedException e) {
    }
            }else {
                LockSupport.parkNanos(1); 
            }
        }

        pad.cachedValue = minSequence;
    }

    pad.nextValue = nextSequence;

    return nextSequence;
}
项目:learn_jstorm    文件:SingleProducerSequencer.java   
/**
 * @see Sequencer#remainingCapacity()
 */
@Override
public long remainingCapacity()
{
    long nextValue = pad.nextValue;

    long consumed = Util.getMinimumSequence(gatingSequences, nextValue);
    long produced = nextValue;
    return getBufferSize() - (produced - consumed);
}
项目:learn_jstorm    文件:MultiProducerSequencer.java   
/**
 * Construct a Sequencer with the selected wait strategy and buffer size.
 *
 * @param bufferSize the size of the buffer that this will sequence over.
 * @param waitStrategy for those waiting on sequences.
 */
public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy)
{
    super(bufferSize, waitStrategy);
    availableBuffer = new int[bufferSize];
    indexMask = bufferSize - 1;
    indexShift = Util.log2(bufferSize);
    initialiseAvailableBuffer();
}
项目:learn_jstorm    文件:MultiProducerSequencer.java   
/**
 * @see Sequencer#remainingCapacity()
 */
@Override
public long remainingCapacity()
{
    long consumed = Util.getMinimumSequence(gatingSequences, cursor.get());
    long produced = cursor.get();
    return getBufferSize() - (produced - consumed);
}
项目:f1x    文件:MessageProcessorPool.java   
/**
 * Wait for the {@link RingBuffer} to drain of published events then halt the workers.
 */
public void drainAndHalt() {
    Sequence[] workerSequences = getWorkerSequences();

    while (ringBuffer.getCursor() > Util.getMinimumSequence(workerSequences)) {
        Thread.yield();
    }

    for (MessageProcessor processor : messageProcessors) {
        processor.halt();
    }

    started.set(false);
}
项目:jstrom    文件:SingleProducerSequencer.java   
/**
 * @see Sequencer#next(int)
 */
@Override
public long next(int n) {
    if (n < 1) {
        throw new IllegalArgumentException("n must be > 0");
    }

    long nextValue = pad.nextValue;

    long nextSequence = nextValue + n;
    long wrapPoint = nextSequence - bufferSize;
    long cachedGatingSequence = pad.cachedValue;

    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
        long minSequence;
        while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {
            if (AbstractSequencerExt.isWaitSleep()) {
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                }
            } else {
                LockSupport.parkNanos(1);
            }
        }

        pad.cachedValue = minSequence;
    }

    pad.nextValue = nextSequence;

    return nextSequence;
}
项目:jstrom    文件:SingleProducerSequencer.java   
/**
 * @see Sequencer#remainingCapacity()
 */
@Override
public long remainingCapacity() {
    long nextValue = pad.nextValue;

    long consumed = Util.getMinimumSequence(gatingSequences, nextValue);
    long produced = nextValue;
    return getBufferSize() - (produced - consumed);
}
项目:jstrom    文件:MultiProducerSequencer.java   
/**
 * Construct a Sequencer with the selected wait strategy and buffer size.
 * 
 * @param bufferSize the size of the buffer that this will sequence over.
 * @param waitStrategy for those waiting on sequences.
 */
public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy) {
    super(bufferSize, waitStrategy);
    availableBuffer = new int[bufferSize];
    indexMask = bufferSize - 1;
    indexShift = Util.log2(bufferSize);
    initialiseAvailableBuffer();
}
项目:jstrom    文件:MultiProducerSequencer.java   
private boolean hasAvailableCapacity(Sequence[] gatingSequences, final int requiredCapacity, long cursorValue) {
    long wrapPoint = (cursorValue + requiredCapacity) - bufferSize;
    long cachedGatingSequence = gatingSequenceCache.get();

    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > cursorValue) {
        long minSequence = Util.getMinimumSequence(gatingSequences, cursorValue);
        gatingSequenceCache.set(minSequence);

        if (wrapPoint > minSequence) {
            return false;
        }
    }

    return true;
}
项目:jstrom    文件:MultiProducerSequencer.java   
/**
 * @see Sequencer#remainingCapacity()
 */
@Override
public long remainingCapacity() {
    long consumed = Util.getMinimumSequence(gatingSequences, cursor.get());
    long produced = cursor.get();
    return getBufferSize() - (produced - consumed);
}
项目:Tstream    文件:SingleProducerSequencer.java   
/**
 * @see Sequencer#next(int)
 */
@Override
public long next(int n)
{
    if (n < 1)
    {
        throw new IllegalArgumentException("n must be > 0");
    }

    long nextValue = pad.nextValue;

    long nextSequence = nextValue + n;
    long wrapPoint = nextSequence - bufferSize;
    long cachedGatingSequence = pad.cachedValue;

    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
    {
        long minSequence;
        while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
        {
            if (AbstractSequencerExt.isWaitSleep()) {
                try {
        Thread.sleep(1);
    } catch (InterruptedException e) {
    }
            }else {
                LockSupport.parkNanos(1); 
            }
        }

        pad.cachedValue = minSequence;
    }

    pad.nextValue = nextSequence;

    return nextSequence;
}
项目:Tstream    文件:SingleProducerSequencer.java   
/**
 * @see Sequencer#remainingCapacity()
 */
@Override
public long remainingCapacity()
{
    long nextValue = pad.nextValue;

    long consumed = Util.getMinimumSequence(gatingSequences, nextValue);
    long produced = nextValue;
    return getBufferSize() - (produced - consumed);
}
项目:Tstream    文件:MultiProducerSequencer.java   
/**
 * Construct a Sequencer with the selected wait strategy and buffer size.
 *
 * @param bufferSize the size of the buffer that this will sequence over.
 * @param waitStrategy for those waiting on sequences.
 */
public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy)
{
    super(bufferSize, waitStrategy);
    availableBuffer = new int[bufferSize];
    indexMask = bufferSize - 1;
    indexShift = Util.log2(bufferSize);
    initialiseAvailableBuffer();
}