Java 类com.lmax.disruptor.Sequence 实例源码

项目:disruptor-code-analysis    文件:MultiBufferBatchEventProcessor.java   
/**
 * Creates a processor over several data providers and their barriers.
 *
 * @param providers data providers stored for later use by this processor
 * @param barriers  barriers; must have exactly as many entries as {@code providers}
 *                  (presumably matched index-for-index - confirm against the run loop)
 * @param handler   event handler stored for later use by this processor
 * @throws IllegalArgumentException if {@code providers} and {@code barriers} differ in length
 */
public MultiBufferBatchEventProcessor(
    DataProvider<T>[] providers,
    SequenceBarrier[] barriers,
    EventHandler<T> handler)
{
    if (providers.length != barriers.length)
    {
        // Report both lengths so a mismatch can be diagnosed from the stack trace alone.
        throw new IllegalArgumentException(
            "providers.length (" + providers.length + ") != barriers.length (" + barriers.length + ")");
    }

    this.providers = providers;
    this.barriers = barriers;
    this.handler = handler;

    // One sequence per provider, each initialised to -1.
    this.sequences = new Sequence[providers.length];
    for (int i = 0; i < sequences.length; i++)
    {
        sequences[i] = new Sequence(-1);
    }
}
项目:disruptor-code-analysis    文件:DisruptorTest.java   
/**
 * A custom processor registered first in the chain must be handed an empty
 * array of barrier sequences, and both published events must reach its handler.
 */
@Test
public void shouldMakeEntriesAvailableToFirstCustomProcessorsImmediately() throws Exception
{
    final CountDownLatch countDownLatch = new CountDownLatch(2);
    final EventHandler<TestEvent> eventHandler = new EventHandlerStub<TestEvent>(countDownLatch);

    final EventProcessorFactory<TestEvent> firstInChainFactory = new EventProcessorFactory<TestEvent>()
    {
        @Override
        public EventProcessor createEventProcessor(
            final RingBuffer<TestEvent> ringBuffer, final Sequence[] barrierSequences)
        {
            assertEquals("Should not have had any barrier sequences", 0, barrierSequences.length);
            return new BatchEventProcessor<TestEvent>(
                disruptor.getRingBuffer(), ringBuffer.newBarrier(barrierSequences), eventHandler);
        }
    };
    disruptor.handleEventsWith(firstInChainFactory);

    ensureTwoEventsProcessedAccordingToDependencies(countDownLatch);
}
项目:disruptor-code-analysis    文件:DisruptorTest.java   
/**
 * A custom processor chained after another handler must be handed exactly one
 * barrier sequence, and events must be processed according to that dependency.
 */
@Test
public void shouldHonourDependenciesForCustomProcessors() throws Exception
{
    final CountDownLatch countDownLatch = new CountDownLatch(2);
    final EventHandler<TestEvent> eventHandler = new EventHandlerStub<TestEvent>(countDownLatch);
    final DelayedEventHandler delayedEventHandler = createDelayedEventHandler();

    disruptor.handleEventsWith(delayedEventHandler).then(
        new EventProcessorFactory<TestEvent>()
        {
            @Override
            public EventProcessor createEventProcessor(
                final RingBuffer<TestEvent> ringBuffer, final Sequence[] barrierSequences)
            {
                // Compare by value: assertSame would compare two autoboxed Integers by
                // reference and only pass by virtue of the small-value Integer cache.
                assertEquals("Should have had a barrier sequence", 1, barrierSequences.length);
                return new BatchEventProcessor<TestEvent>(
                    disruptor.getRingBuffer(), ringBuffer.newBarrier(
                    barrierSequences), eventHandler);
            }
        });

    ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, delayedEventHandler);
}
项目:camunda-bpm-reactor    文件:RingBufferProcessor.java   
/**
 * Creates the processor and its backing ring buffer.
 *
 * @param name         passed to the superclass constructor
 * @param executor     passed to the superclass constructor
 * @param bufferSize   size handed to {@code RingBuffer.create}
 * @param waitStrategy wait strategy handed to {@code RingBuffer.create}
 * @param shared       {@code true} selects {@code ProducerType.MULTI}, otherwise {@code ProducerType.SINGLE}
 * @param autoCancel   passed to the superclass constructor
 */
private RingBufferProcessor(String name,
                            ExecutorService executor,
                            int bufferSize,
                            WaitStrategy waitStrategy,
                            boolean shared,
                            boolean autoCancel) {
  super(name, executor, autoCancel);

  final ProducerType producerType = shared ? ProducerType.MULTI : ProducerType.SINGLE;

  // Every ring buffer slot is pre-filled with an empty, reusable signal.
  final EventFactory<MutableSignal<E>> signalFactory = new EventFactory<MutableSignal<E>>() {
    @Override
    public MutableSignal<E> newInstance() {
      return new MutableSignal<E>();
    }
  };

  this.ringBuffer = RingBuffer.create(producerType, signalFactory, bufferSize, waitStrategy);

  this.recentSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
  this.barrier = ringBuffer.newBarrier();
  // recentSequence is deliberately not registered as a gating sequence here.
}
项目:camunda-bpm-reactor    文件:RingBufferWorkProcessor.java   
/**
 * Drains {@code processor.cancelledSequences}, re-routing to this subscriber the event
 * that follows each polled sequence.
 *
 * @param unbounded forwarded unchanged to {@code readNextEvent}
 * @return {@code true} if a replay was interrupted by one of the caught exceptions,
 *         {@code false} once the queue of cancelled sequences has been emptied
 */
private boolean replay(final boolean unbounded) {
  Sequence replayedSequence;
  MutableSignal<T> signal;
  while ((replayedSequence = processor.cancelledSequences.poll()) != null) {
    // The event to replay is the one right after the polled sequence's current value.
    signal = processor.ringBuffer.get(replayedSequence.get() + 1L);
    try {
      // An empty slot value triggers a wait on the barrier for that sequence.
      if (signal.value == null) {
        barrier.waitFor(replayedSequence.get() + 1L);
      }
      readNextEvent(signal, unbounded);
      RingBufferSubscriberUtils.routeOnce(signal, subscriber);
      // Replay succeeded: the polled sequence no longer gates the ring buffer.
      processor.ringBuffer.removeGatingSequence(replayedSequence);
    } catch (TimeoutException | InterruptedException | AlertException | CancelException ce) {
      // Replay failed: this worker's own `sequence` is removed from the gating set and the
      // polled sequence is queued again (presumably for another worker - confirm with callers).
      processor.ringBuffer.removeGatingSequence(sequence);
      processor.cancelledSequences.add(replayedSequence);
      return true;
    }
  }
  return false;
}
项目:jstorm-0.9.6.3-    文件:DisruptorQueueImpl.java   
/**
 * Builds a queue on a newly created ring buffer and registers a single
 * consumer sequence as that buffer's gating sequence.
 *
 * @param queueName    appended to {@code PREFIX} to form the queue name
 * @param producerType producer mode handed to {@code RingBuffer.create}
 * @param bufferSize   ring buffer size; must be at least 2 unless the producer type is SINGLE
 * @param wait         wait strategy handed to {@code RingBuffer.create}
 */
public DisruptorQueueImpl(String queueName, ProducerType producerType,
        int bufferSize, WaitStrategy wait) {
    this._queueName = PREFIX + queueName;
    _buffer = RingBuffer.create(producerType, new ObjectEventFactory(),
            bufferSize, wait);
    _consumer = new Sequence();
    _barrier = _buffer.newBarrier();
    _buffer.addGatingSequences(_consumer);

    // Single-producer queues are flagged as started right away; nothing else to do.
    if (producerType == ProducerType.SINGLE) {
        consumerStartedFlag = true;
        return;
    }

    // make sure we flush the pending messages in cache first
    if (bufferSize < 2) {
        throw new RuntimeException("QueueSize must >= 2");
    }
    try {
        publishDirect(FLUSH_CACHE, true);
    } catch (InsufficientCapacityException e) {
        throw new RuntimeException("This code should be unreachable!", e);
    }
}
项目:jstorm-0.9.6.3-    文件:MultiProducerSequencer.java   
/**
 * Reports whether {@code requiredCapacity} further slots can be claimed after
 * {@code cursorValue} without wrapping past the minimum gating sequence.
 * The cached minimum is tried first and recomputed only when it cannot prove capacity.
 *
 * @param gatingSequences  sequences whose minimum bounds how far the producer may advance
 * @param requiredCapacity number of slots wanted
 * @param cursorValue      current cursor position
 * @return {@code true} if the capacity is available
 */
private boolean hasAvailableCapacity(Sequence[] gatingSequences, final int requiredCapacity, long cursorValue)
{
    final long wrapPoint = (cursorValue + requiredCapacity) - bufferSize;
    final long cachedMinimum = gatingSequenceCache.get();

    // Fast path: the cached minimum is at or beyond the wrap point and not ahead of the cursor.
    final boolean cacheProvesCapacity = wrapPoint <= cachedMinimum && cachedMinimum <= cursorValue;
    if (cacheProvesCapacity)
    {
        return true;
    }

    final long freshMinimum = Util.getMinimumSequence(gatingSequences, cursorValue);
    gatingSequenceCache.set(freshMinimum);
    return wrapPoint <= freshMinimum;
}
项目:learn_jstorm    文件:DisruptorQueueImpl.java   
public DisruptorQueueImpl(String queueName, ProducerType producerType,
        int bufferSize, WaitStrategy wait) {
    this._queueName = PREFIX + queueName;
    _buffer = RingBuffer.create(producerType, new ObjectEventFactory(),
            bufferSize, wait);
    _consumer = new Sequence();
    _barrier = _buffer.newBarrier();
    _buffer.addGatingSequences(_consumer);
    if (producerType == ProducerType.SINGLE) {
        consumerStartedFlag = true;
    } else {
        // make sure we flush the pending messages in cache first
        if (bufferSize < 2) {
            throw new RuntimeException("QueueSize must >= 2");
        }
        try {
            publishDirect(FLUSH_CACHE, true);
        } catch (InsufficientCapacityException e) {
            throw new RuntimeException("This code should be unreachable!",
                    e);
        }
    }
}
项目:learn_jstorm    文件:MultiProducerSequencer.java   
/**
 * Checks whether {@code requiredCapacity} more slots fit after {@code cursorValue}
 * without overtaking the minimum gating sequence. The cached minimum is refreshed
 * only when it is insufficient to answer the question.
 *
 * @param gatingSequences  sequences whose minimum limits the producer
 * @param requiredCapacity number of slots wanted
 * @param cursorValue      current cursor position
 * @return {@code true} if the slots are available
 */
private boolean hasAvailableCapacity(Sequence[] gatingSequences, final int requiredCapacity, long cursorValue)
{
    final long wrapPoint = (cursorValue + requiredCapacity) - bufferSize;
    final long cached = gatingSequenceCache.get();

    if (wrapPoint <= cached && cached <= cursorValue)
    {
        // Cached value already proves there is room.
        return true;
    }

    final long minimum = Util.getMinimumSequence(gatingSequences, cursorValue);
    gatingSequenceCache.set(minimum);
    return !(wrapPoint > minimum);
}
项目:jstrom    文件:DisruptorQueueImpl.java   
/**
 * Creates the ring buffer backing this queue and gates it on one consumer sequence.
 *
 * @param queueName    appended to {@code PREFIX} to form the queue name
 * @param producerType producer mode handed to {@code RingBuffer.create}
 * @param bufferSize   ring buffer size; must be at least 2 unless the producer type is SINGLE
 * @param wait         wait strategy handed to {@code RingBuffer.create}
 */
public DisruptorQueueImpl(String queueName, ProducerType producerType, int bufferSize, WaitStrategy wait) {
    this._queueName = PREFIX + queueName;
    _buffer = RingBuffer.create(producerType, new ObjectEventFactory(), bufferSize, wait);
    _consumer = new Sequence();
    _barrier = _buffer.newBarrier();
    _buffer.addGatingSequences(_consumer);

    if (producerType == ProducerType.SINGLE) {
        consumerStartedFlag = true;
        return;
    }

    // make sure we flush the pending messages in cache first
    if (bufferSize < 2) {
        throw new RuntimeException("QueueSize must >= 2");
    }
    try {
        publishDirect(FLUSH_CACHE, true);
    } catch (InsufficientCapacityException unreachable) {
        throw new RuntimeException("This code should be unreachable!", unreachable);
    }
}
项目:Tstream    文件:DisruptorQueueImpl.java   
/**
 * Sets up the ring buffer, its barrier and the single gating consumer sequence.
 *
 * @param queueName    appended to {@code PREFIX} to form the queue name
 * @param producerType producer mode handed to {@code RingBuffer.create}
 * @param bufferSize   ring buffer size; must be at least 2 unless the producer type is SINGLE
 * @param wait         wait strategy handed to {@code RingBuffer.create}
 */
public DisruptorQueueImpl(String queueName, ProducerType producerType,
        int bufferSize, WaitStrategy wait) {
    this._queueName = PREFIX + queueName;
    _buffer = RingBuffer.create(producerType, new ObjectEventFactory(),
            bufferSize, wait);
    _consumer = new Sequence();
    _barrier = _buffer.newBarrier();
    _buffer.addGatingSequences(_consumer);

    if (producerType != ProducerType.SINGLE) {
        // make sure we flush the pending messages in cache first
        if (bufferSize < 2) {
            throw new RuntimeException("QueueSize must >= 2");
        }
        try {
            publishDirect(FLUSH_CACHE, true);
        } catch (InsufficientCapacityException e) {
            throw new RuntimeException("This code should be unreachable!", e);
        }
    } else {
        consumerStartedFlag = true;
    }
}
项目:Tstream    文件:MultiProducerSequencer.java   
/**
 * Determines whether {@code requiredCapacity} slots can be claimed beyond
 * {@code cursorValue} without wrapping past the minimum gating sequence.
 *
 * @param gatingSequences  sequences whose minimum limits the producer
 * @param requiredCapacity number of slots wanted
 * @param cursorValue      current cursor position
 * @return {@code true} if the slots are available
 */
private boolean hasAvailableCapacity(Sequence[] gatingSequences, final int requiredCapacity, long cursorValue)
{
    final long wrapPoint = (cursorValue + requiredCapacity) - bufferSize;
    final long cachedGate = gatingSequenceCache.get();

    // The cache can only be trusted while it sits between the wrap point and the cursor.
    final boolean mustRefresh = wrapPoint > cachedGate || cachedGate > cursorValue;
    if (!mustRefresh)
    {
        return true;
    }

    final long slowestGate = Util.getMinimumSequence(gatingSequences, cursorValue);
    gatingSequenceCache.set(slowestGate);
    return wrapPoint <= slowestGate;
}
项目:ddth-queue    文件:DisruptorQueue.java   
/**
 * Initializes the ring buffer, the ephemeral storage (unless disabled) and the
 * consumer-side sequence bookkeeping.
 *
 * @return this queue instance
 */
public DisruptorQueue<ID, DATA> init() {
    /* single producer "seems" to offer better performance (a multi-producer buffer is the alternative) */
    ringBuffer = RingBuffer.createSingleProducer(EVENT_FACTORY, ringSize);

    if (!isEphemeralDisabled()) {
        // Initial capacity: the configured maximum capped at the ring size, or the ring size
        // itself when no positive maximum is configured.
        final int configuredMax = Math.max(0, getEphemeralMaxSize());
        final int initialCapacity = configuredMax > 0 ? Math.min(configuredMax, ringSize) : ringSize;
        ephemeralStorage = new ConcurrentHashMap<>(initialCapacity);
    }

    consumedSeq = new Sequence();
    ringBuffer.addGatingSequences(consumedSeq);

    // Start the consumer position and the known-published marker at the current cursor.
    final long startCursor = ringBuffer.getCursor();
    consumedSeq.set(startCursor);
    knownPublishedSeq = startCursor;

    return this;
}
项目:jetstream    文件:SingleConsumerDisruptorQueue.java   
/**
 * Creates a disruptor-backed queue with one consumer sequence gating the ring buffer.
 *
 * @param bufferSize
 *            requested ring buffer size; passed through {@code normalizeBufferSize}
 * @param singleProducer
 *            {@code true} to create a single-producer ring buffer, {@code false}
 *            for a multi-producer one
 */
public SingleConsumerDisruptorQueue(int bufferSize, boolean singleProducer) {
    ringBuffer = singleProducer
            ? RingBuffer.createSingleProducer(new Factory<T>(), normalizeBufferSize(bufferSize))
            : RingBuffer.createMultiProducer(new Factory<T>(), normalizeBufferSize(bufferSize));

    consumedSeq = new Sequence();
    ringBuffer.addGatingSequences(consumedSeq);
    barrier = ringBuffer.newBarrier();

    // Consumer position and known-published marker both start at the current cursor.
    final long startCursor = ringBuffer.getCursor();
    consumedSeq.set(startCursor);
    knownPublishedSeq = startCursor;
}
项目:andes    文件:SleepingBlockingWaitStrategy.java   
/**
 * Waits until {@code dependentSequence} has reached {@code sequence}.
 * Blocks on a condition while the cursor is behind, then polls the dependent
 * sequence with one-nanosecond parks.
 *
 * @param sequence          the sequence to wait for
 * @param cursorSequence    cursor checked in the blocking phase
 * @param dependentSequence sequence polled in the second phase; its value is returned
 * @param barrier           checked for alerts on every loop iteration
 * @return the value of {@code dependentSequence} once it is at least {@code sequence}
 * @throws AlertException       declared by this method; presumably raised by {@code barrier.checkAlert()}
 * @throws InterruptedException declared by this method; presumably raised by the condition wait
 */
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier) throws AlertException, InterruptedException {
    // Unlocked pre-check avoids taking the lock when the cursor is already far enough.
    if(cursorSequence.get() < sequence) {
        this.lock.lock();

        try {
            // Re-check under the lock, then block on the condition until the cursor catches up.
            while(cursorSequence.get() < sequence) {
                barrier.checkAlert();
                this.processorNotifyCondition.await();
            }
        } finally {
            this.lock.unlock();
        }
    }

    long availableSequence;
    // Second phase: poll the dependent sequence, parking 1 ns between checks.
    while((availableSequence = dependentSequence.get()) < sequence) {
        barrier.checkAlert();
        LockSupport.parkNanos(1L);
    }

    return availableSequence;
}
项目:disruptorqueue    文件:SingleConsumerDisruptorQueue.java   
/**
 * Builds a disruptor-backed queue whose ring buffer is gated by a single consumer sequence.
 *
 * @param bufferSize
 *            requested ring buffer size; passed through {@code normalizeBufferSize}
 * @param singleProducer
 *            whether to create a single-producer ({@code true}) or
 *            multi-producer ({@code false}) ring buffer
 */
public SingleConsumerDisruptorQueue(int bufferSize, boolean singleProducer) {
    if (!singleProducer) {
        ringBuffer = RingBuffer.createMultiProducer(new Factory<T>(), normalizeBufferSize(bufferSize));
    } else {
        ringBuffer = RingBuffer.createSingleProducer(new Factory<T>(), normalizeBufferSize(bufferSize));
    }

    consumedSeq = new Sequence();
    ringBuffer.addGatingSequences(consumedSeq);
    barrier = ringBuffer.newBarrier();

    // Align the consumer position and the known-published marker with the current cursor.
    final long initialCursor = ringBuffer.getCursor();
    consumedSeq.set(initialCursor);
    knownPublishedSeq = initialCursor;
}
项目:dsys-snio    文件:RingBufferConsumer.java   
RingBufferConsumer(@Nonnull final RingBuffer<T> buffer, @Nonnull final Object[] attachments) {
    if (buffer == null) {
        throw new NullPointerException("buffer == null");
    }
    if (attachments == null) {
        throw new NullPointerException("attachments == null");
    }
    if (buffer.getBufferSize() != attachments.length) {
        throw new IllegalArgumentException("buffer.getBufferSize() != attachments.length");
    }
    this.buffer = buffer;
    this.attachments = attachments;
    this.barrier = buffer.newBarrier();
    this.sequence = new Sequence();
    buffer.addGatingSequences(sequence);
    this.cursor = sequence.get();
    this.available = sequence.get();
}
项目:jstorm    文件:DisruptorQueueImpl.java   
public DisruptorQueueImpl(String queueName, ProducerType producerType, int bufferSize, WaitStrategy wait, boolean isBatch, int batchSize, long flushMs) {
    _queueName = PREFIX + queueName;
    _buffer = RingBuffer.create(producerType, new ObjectEventFactory(), bufferSize, wait);
    _consumer = new Sequence();
    _barrier = _buffer.newBarrier();
    _buffer.addGatingSequences(_consumer);
    _isBatch = isBatch;
    _cache = new ArrayList<>();
    _inputBatchSize = batchSize;
    if (_isBatch) {
        _batcher = new ThreadLocalBatch();
        _flusher = new DisruptorFlusher(Math.max(flushMs, 1));
        _flusher.start();
    } else {
        _batcher = null;
    }
}
项目:disruptor-code-analysis    文件:Disruptor.java   
/**
 * <p>Starts the event processors and returns the fully configured ring buffer.</p>
 *
 * <p>The ring buffer is set up to prevent overwriting any entry that is yet to
 * be processed by the slowest event processor.</p>
 *
 * <p>This method must only be called once after all event processors have been added.</p>
 *
 * @return the configured ring buffer.
 */
public RingBuffer<T> start() {
    // Run the started-once check before touching the ring buffer, so that a
    // repeated call cannot register the gating sequences a second time before
    // the check gets a chance to reject it.
    checkOnlyStartedOnce();

    // Gate the ring buffer on the sequences at the end of each consumer chain.
    final Sequence[] gatingSequences = consumerRepository.getLastSequenceInChain(true);
    ringBuffer.addGatingSequences(gatingSequences);

    for (final ConsumerInfo consumerInfo : consumerRepository) {
        consumerInfo.start(executor);
    }

    return ringBuffer;
}
项目:disruptor-code-analysis    文件:Disruptor.java   
/**
 * Reports whether any end-of-chain consumer sequence is still behind the ring buffer cursor.
 *
 * @return {@code true} if at least one such sequence is lower than the cursor,
 *         {@code false} if all of them have caught up
 */
private boolean hasBacklog() {
    final long publishedUpTo = ringBuffer.getCursor();
    final Sequence[] chainTails = consumerRepository.getLastSequenceInChain(false);
    for (int i = 0; i < chainTails.length; i++) {
        if (chainTails[i].get() < publishedUpTo) {
            return true;
        }
    }
    return false;
}
项目:disruptor-code-analysis    文件:Disruptor.java   
/**
 * Wraps each handler in a {@code BatchEventProcessor} that shares one barrier
 * built from {@code barrierSequences}, and registers it with the consumer repository.
 *
 * @param barrierSequences sequences the new processors wait on
 * @param eventHandlers    handlers to wrap
 * @return a group holding the sequences of the newly created processors
 */
EventHandlerGroup<T> createEventProcessors(
        final Sequence[] barrierSequences,
        final EventHandler<? super T>[] eventHandlers) {
    checkNotStarted();

    final int handlerCount = eventHandlers.length;
    final Sequence[] processorSequences = new Sequence[handlerCount];
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

    int slot = 0;
    for (final EventHandler<? super T> eventHandler : eventHandlers) {
        final BatchEventProcessor<T> processor =
                new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);

        if (exceptionHandler != null) {
            processor.setExceptionHandler(exceptionHandler);
        }

        consumerRepository.add(processor, eventHandler, barrier);
        processorSequences[slot++] = processor.getSequence();
    }

    // Only when processors were actually added do the barrier sequences stop being end-of-chain.
    if (handlerCount > 0) {
        consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
    }

    return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
}
项目:disruptor-code-analysis    文件:Disruptor.java   
/**
 * Asks each factory for an event processor bound to this ring buffer and the
 * given barrier sequences, then registers the results via {@code handleEventsWith}.
 *
 * @param barrierSequences   sequences handed to every factory
 * @param processorFactories factories that build the processors
 * @return the group returned by {@code handleEventsWith}
 */
EventHandlerGroup<T> createEventProcessors(
        final Sequence[] barrierSequences, final EventProcessorFactory<T>[] processorFactories) {
    final EventProcessor[] created = new EventProcessor[processorFactories.length];
    int slot = 0;
    for (final EventProcessorFactory<T> factory : processorFactories) {
        created[slot++] = factory.createEventProcessor(ringBuffer, barrierSequences);
    }
    return handleEventsWith(created);
}
项目:disruptor-code-analysis    文件:Disruptor.java   
/**
 * Creates a worker pool over the given handlers, gated by a barrier built from
 * {@code barrierSequences}, and registers it with the consumer repository.
 *
 * @param barrierSequences sequences the pool waits on
 * @param workHandlers     handlers passed to the pool
 * @return a group holding the pool's worker sequences
 */
EventHandlerGroup<T> createWorkerPool(
        final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers) {
    final SequenceBarrier poolBarrier = ringBuffer.newBarrier(barrierSequences);
    final WorkerPool<T> pool =
            new WorkerPool<T>(ringBuffer, poolBarrier, exceptionHandler, workHandlers);

    consumerRepository.add(pool, poolBarrier);

    final Sequence[] workerSequences = pool.getWorkerSequences();
    return new EventHandlerGroup<T>(this, consumerRepository, workerSequences);
}
项目:disruptor-code-analysis    文件:EventHandlerGroup.java   
/**
 * Creates a group over the given sequences.
 *
 * @param disruptor          owning disruptor
 * @param consumerRepository repository shared with the disruptor
 * @param sequences          sequences of the consumers in this group; copied defensively
 */
EventHandlerGroup(
    final Disruptor<T> disruptor,
    final ConsumerRepository<T> consumerRepository,
    final Sequence[] sequences)
{
    this.disruptor = disruptor;
    this.consumerRepository = consumerRepository;
    // Shallow copy so later changes to the caller's array do not affect this group.
    this.sequences = sequences.clone();
}
项目:disruptor-code-analysis    文件:EventHandlerGroup.java   
/**
 * Builds a new group whose sequences are this group's followed by those of <tt>otherHandlerGroup</tt>.
 *
 * @param otherHandlerGroup the event handler group to combine with this one.
 * @return a new EventHandlerGroup covering the consumers of both groups.
 */
public EventHandlerGroup<T> and(final EventHandlerGroup<T> otherHandlerGroup)
{
    final Sequence[] mine = this.sequences;
    final Sequence[] theirs = otherHandlerGroup.sequences;

    final Sequence[] combinedSequences = new Sequence[mine.length + theirs.length];
    System.arraycopy(mine, 0, combinedSequences, 0, mine.length);
    System.arraycopy(theirs, 0, combinedSequences, mine.length, theirs.length);

    return new EventHandlerGroup<T>(disruptor, consumerRepository, combinedSequences);
}
项目:disruptor-code-analysis    文件:EventHandlerGroup.java   
/**
 * Builds a new group containing the sequences of <tt>processors</tt> followed by
 * this group's sequences. Each processor is also added to the consumer repository.
 *
 * @param processors the processors to combine with this group.
 * @return a new EventHandlerGroup covering the existing and the new processors.
 */
public EventHandlerGroup<T> and(final EventProcessor... processors)
{
    final int added = processors.length;
    final Sequence[] combinedSequences = new Sequence[sequences.length + added];

    int slot = 0;
    for (final EventProcessor processor : processors)
    {
        consumerRepository.add(processor);
        combinedSequences[slot++] = processor.getSequence();
    }
    // The existing sequences go after the newly added ones.
    System.arraycopy(sequences, 0, combinedSequences, added, sequences.length);

    return new EventHandlerGroup<T>(disruptor, consumerRepository, combinedSequences);
}
项目:disruptor-code-analysis    文件:Util.java   
/**
 * Collects the {@link Sequence} of every passed {@link EventProcessor}, in argument order.
 *
 * @param processors processors to read the sequences from
 * @return a new array holding one {@link Sequence} per processor
 */
public static Sequence[] getSequencesFor(final EventProcessor... processors) {
    final Sequence[] result = new Sequence[processors.length];
    int slot = 0;
    for (final EventProcessor processor : processors) {
        result[slot++] = processor.getSequence();
    }
    return result;
}
项目:disruptor-code-analysis    文件:ConsumerRepositoryTest.java   
/**
 * Creates a fresh repository plus two mocked processors (each reporting its own
 * sequence and a running state), two handlers and two mocked barriers.
 */
@Before
public void setUp() throws Exception
{
    consumerRepository = new ConsumerRepository<TestEvent>();

    eventProcessor1 = mockery.mock(EventProcessor.class, "eventProcessor1");
    eventProcessor2 = mockery.mock(EventProcessor.class, "eventProcessor2");
    barrier1 = mockery.mock(SequenceBarrier.class, "barrier1");
    barrier2 = mockery.mock(SequenceBarrier.class, "barrier2");

    handler1 = new SleepingEventHandler();
    handler2 = new SleepingEventHandler();

    final Sequence firstSequence = new Sequence();
    final Sequence secondSequence = new Sequence();
    mockery.checking(
        new Expectations()
        {
            {
                // Processor 1: always running, always the same sequence.
                allowing(eventProcessor1).getSequence();
                will(returnValue(firstSequence));
                allowing(eventProcessor1).isRunning();
                will(returnValue(true));

                // Processor 2: likewise, with its own sequence.
                allowing(eventProcessor2).getSequence();
                will(returnValue(secondSequence));
                allowing(eventProcessor2).isRunning();
                will(returnValue(true));
            }
        });
}
项目:disruptor-code-analysis    文件:ConsumerRepositoryTest.java   
/**
 * After processor 2 is unmarked as end-of-chain, only processor 1's sequence
 * must be reported as last in chain.
 */
@Test
public void shouldGetLastEventProcessorsInChain() throws Exception
{
    consumerRepository.add(eventProcessor1, handler1, barrier1);
    consumerRepository.add(eventProcessor2, handler2, barrier2);

    final Sequence noLongerLast = eventProcessor2.getSequence();
    consumerRepository.unMarkEventProcessorsAsEndOfChain(noLongerLast);

    final Sequence[] endOfChain = consumerRepository.getLastSequenceInChain(true);
    assertThat(endOfChain.length, equalTo(1));
    assertThat(endOfChain[0], sameInstance(eventProcessor1.getSequence()));
}
项目:disruptor-code-analysis    文件:UtilTest.java   
/**
 * With mocked sequences reporting 7, 3 and 12, the minimum must be 3.
 */
@Test
public void shouldReturnMinimumSequence()
{
    // Sequence is a class, so the class imposteriser must be installed before mocking it.
    context.setImposteriser(ClassImposteriser.INSTANCE);

    final Sequence[] sequences = {
        context.mock(Sequence.class, "s0"),
        context.mock(Sequence.class, "s1"),
        context.mock(Sequence.class, "s2"),
    };

    context.checking(
        new Expectations()
        {
            {
                oneOf(sequences[0]).get();
                will(returnValue(7L));

                oneOf(sequences[1]).get();
                will(returnValue(3L));

                oneOf(sequences[2]).get();
                will(returnValue(12L));
            }
        });

    final long minimum = Util.getMinimumSequence(sequences);
    Assert.assertEquals(3L, minimum);
}
项目:disruptor-code-analysis    文件:UtilTest.java   
/**
 * With no sequences at all, the minimum must be {@code Long.MAX_VALUE}.
 */
@Test
public void shouldReturnLongMaxWhenNoEventProcessors()
{
    final Sequence[] noSequences = {};
    final long minimum = Util.getMinimumSequence(noSequences);

    Assert.assertEquals(Long.MAX_VALUE, minimum);
}
项目:gflogger    文件:LoggerServiceImpl.java   
/**
 * Waits until {@code dependentSequence} has reached {@code sequence}.
 * While the cursor is behind, the caller flushes and then blocks on {@code lock};
 * afterwards the dependent sequence is polled in a busy loop.
 *
 * @param sequence          the sequence to wait for
 * @param cursor            cursor checked in the blocking phase
 * @param dependentSequence sequence polled in the second phase; its value is returned
 * @param barrier           checked for alerts on every loop iteration
 * @return the value of {@code dependentSequence} once it is at least {@code sequence}
 * @throws AlertException       if the service state is {@code STOPPED} while waiting
 *                              (the disruptor is halted first); presumably also raised by
 *                              {@code barrier.checkAlert()}
 * @throws InterruptedException if the thread is interrupted while waiting on {@code lock}
 * @throws TimeoutException     declared by the signature; not thrown directly by this body
 */
public long waitFor(
    long sequence,
    Sequence cursor,
    Sequence dependentSequence,
    SequenceBarrier barrier
) throws AlertException, InterruptedException, TimeoutException {
    if (cursor.get() < sequence) {
        flush();
        synchronized (lock) {
            ++numWaiters;
            try {
                while (cursor.get() < sequence) {
                    if (state == State.STOPPED) {
                        disruptor.halt();
                        throw AlertException.INSTANCE;
                    }
                    barrier.checkAlert();
                    lock.wait();
                }
            } finally {
                // Always undo the increment, including when the loop exits via an
                // alert or an interrupt; otherwise the waiter count never returns to zero.
                --numWaiters;
            }
        }
    }

    long availableSequence;
    // Second phase: spin until the dependent sequence catches up.
    while ((availableSequence = dependentSequence.get()) < sequence) {
        barrier.checkAlert();
    }

    return availableSequence;
}
项目:camunda-bpm-reactor    文件:RingBufferProcessor.java   
/**
 * Creates a subscription that holds the given collaborators.
 *
 * @param pendingRequest sequence stored as this subscription's pending-request counter
 * @param subscriber     subscriber this subscription belongs to
 * @param eventProcessor processor associated with the subscriber
 */
public RingBufferSubscription(Sequence pendingRequest,
                              Subscriber<? super E> subscriber,
                              BatchSignalProcessor<E> eventProcessor) {
  this.pendingRequest = pendingRequest;
  this.subscriber = subscriber;
  this.eventProcessor = eventProcessor;
}
项目:camunda-bpm-reactor    文件:RingBufferProcessor.java   
/**
 * Constructs a {@link com.lmax.disruptor.EventProcessor} that will automatically
 * track its progress by updating its sequence.
 *
 * @param processor      owning ring buffer processor
 * @param pendingRequest sequence stored as the pending-request counter
 * @param subscriber     subscriber that signals are delivered to
 */
public BatchSignalProcessor(RingBufferProcessor<T> processor,
                            Sequence pendingRequest,
                            Subscriber<? super T> subscriber) {
  this.subscriber = subscriber;
  this.pendingRequest = pendingRequest;
  this.processor = processor;
}
项目:camunda-bpm-reactor    文件:ParkWaitStrategy.java   
/**
 * Polls {@code dependentSequence} until it reaches {@code sequence}, checking the
 * barrier for alerts and parking for {@code parkFor} nanoseconds between polls.
 *
 * @return the value of {@code dependentSequence} once it is at least {@code sequence}
 */
@Override
public long waitFor(long sequence,
                    Sequence cursor,
                    Sequence dependentSequence,
                    SequenceBarrier barrier) throws AlertException,
  InterruptedException,
  TimeoutException {
  for (;;) {
    final long reached = dependentSequence.get();
    if (reached >= sequence) {
      return reached;
    }
    barrier.checkAlert();
    LockSupport.parkNanos(parkFor);
  }
}
项目:perf-workshop    文件:SpinLoopHintBusySpinWaitStrategy.java   
/**
 * Busy-spins until {@code dependentSequence} reaches {@code sequence}, issuing a
 * spin-loop hint and checking the barrier for alerts on every iteration.
 *
 * @return the value of {@code dependentSequence} once it is at least {@code sequence}
 */
@Override
public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
        throws AlertException, InterruptedException
{
    while (true)
    {
        final long reached = dependentSequence.get();
        if (reached >= sequence)
        {
            return reached;
        }
        SpinHint.spinLoopHint();
        barrier.checkAlert();
    }
}
项目:jstrom    文件:MultiProducerSequencer.java   
/**
 * Reports whether {@code requiredCapacity} more slots can be claimed after
 * {@code cursorValue} without wrapping past the minimum gating sequence.
 * The cached minimum is refreshed only when it cannot prove capacity.
 *
 * @param gatingSequences  sequences whose minimum limits the producer
 * @param requiredCapacity number of slots wanted
 * @param cursorValue      current cursor position
 * @return {@code true} if the slots are available
 */
private boolean hasAvailableCapacity(Sequence[] gatingSequences, final int requiredCapacity, long cursorValue) {
    final long wrapPoint = (cursorValue + requiredCapacity) - bufferSize;
    final long cachedMinimum = gatingSequenceCache.get();

    if (wrapPoint <= cachedMinimum && cachedMinimum <= cursorValue) {
        return true;
    }

    final long refreshedMinimum = Util.getMinimumSequence(gatingSequences, cursorValue);
    gatingSequenceCache.set(refreshedMinimum);
    return wrapPoint <= refreshedMinimum;
}
项目:ddth-queue    文件:QndDisruptor.java   
/**
 * Quick demo: creates a 4-slot single-producer ring buffer, then alternates
 * {@code put} and {@code take} 100 times, printing the outcome of each call.
 */
public static void main(String[] args) throws Exception {
    ringBuffer = RingBuffer.createSingleProducer(LongEvent.FACTORY, 4);
    consumedSeq = new Sequence();
    ringBuffer.addGatingSequences(consumedSeq);

    // Consumer position and known-published marker both start at the current cursor.
    final long startCursor = ringBuffer.getCursor();
    consumedSeq.set(startCursor);
    knownPublishedSeq = startCursor;

    int i = 0;
    while (i < 100) {
        final boolean status = put(ringBuffer, i);
        System.out.println(i + ": " + status);

        final Long value = take();
        System.out.println("Took: " + value);

        i++;
    }
}
项目:incubator-storm    文件:DisruptorQueue.java   
public DisruptorQueue(ClaimStrategy claim, WaitStrategy wait) {
    _buffer = new RingBuffer<MutableObject>(new ObjectEventFactory(), claim, wait);
    _consumer = new Sequence();
    _barrier = _buffer.newBarrier();
    _buffer.setGatingSequences(_consumer);
    if(claim instanceof SingleThreadedClaimStrategy) {
        consumerStartedFlag = true;
    }
}
项目:storm-resa    文件:DisruptorQueue.java   
public DisruptorQueue(ClaimStrategy claim, WaitStrategy wait) {
    _buffer = new RingBuffer<MutableObject>(new ObjectEventFactory(), claim, wait);
    _consumer = new Sequence();
    _barrier = _buffer.newBarrier();
    _buffer.setGatingSequences(_consumer);
    if(claim instanceof SingleThreadedClaimStrategy) {
        consumerStartedFlag = true;
    }
}