Java 类it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap 实例源码

项目:angel    文件:MatrixOpLogCache.java   
/**
 * Creates an op log cache in its initial state: all bookkeeping collections are empty, the
 * sequence id generator starts at 0 and the stopped flag is {@code false}.
 */
public MatrixOpLogCache() {
  // Lifecycle flag and sequence id source
  stopped = new AtomicBoolean(false);
  seqIdGenerator = new AtomicInteger(0);

  // Incoming messages, ordered by PriorityComparator; 100 is only the initial capacity
  messageQueue = new PriorityBlockingQueue<OpLogMessage>(100, new PriorityComparator());

  // Concurrent maps
  opLogs = new ConcurrentHashMap<>();
  messageToFutureMap = new ConcurrentHashMap<>();

  // Int-keyed bookkeeping maps
  seqIdToMessageMaps = new Int2ObjectOpenHashMap<>();
  waitedMessageQueues = new Int2ObjectOpenHashMap<>();
  flushListeners = new Int2ObjectOpenHashMap<>();
  mergingCounters = new Int2IntOpenHashMap();
}
项目:angel    文件:MatrixOpLogCache.java   
/**
 * Dispatcher loop: takes op log messages from {@code messageQueue} one at a time and handles
 * them according to their type, until {@code stopped} is set or the running thread is
 * interrupted.
 *
 * <ul>
 *   <li>{@code MERGE}: registers the message under its matrix id and sequence id, then either
 *       parks it in the wait queue (when {@code needWait} says so) or increments the matrix's
 *       merging counter and starts the merge.</li>
 *   <li>{@code MERGE_SUCCESS}: unregisters the message, decrements the matrix's merging counter
 *       and checks whether blocked listeners can be woken up.</li>
 *   <li>{@code CLOCK} / {@code FLUSH}: adds the message to the listener list and checks whether
 *       blocked listeners can be woken up.</li>
 * </ul>
 *
 * <p>If the thread is interrupted while waiting for a message, the method logs a warning,
 * restores the interrupt status and returns.
 */
@Override
public void run() {
  while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
    OpLogMessage message;
    try {
      message = messageQueue.take();
    } catch (InterruptedException e) {
      LOG.warn("oplog-merge-dispatcher interrupted");
      // take() clears the interrupt status when it throws; restore it so the code that runs
      // this dispatcher can still observe the interruption.
      Thread.currentThread().interrupt();
      return;
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("receive oplog request " + message);
    }

    int matrixId = message.getMatrixId();
    switch (message.getType()) {
      case MERGE: {
        int seqId = message.getSeqId();

        // If the matrix op log cache does not exist for the matrix, create a new one and
        // start its merging counter at 0
        if (!opLogs.containsKey(matrixId)) {
          mergingCounters.put(matrixId, 0);
          addMatrixOpLog(matrixId);
        }

        // Look up (or lazily create) the seqId -> message tree map of this matrix once
        Int2ObjectAVLTreeMap<OpLogMessage> seqIdToMessages = seqIdToMessageMaps.get(matrixId);
        if (seqIdToMessages == null) {
          seqIdToMessages = new Int2ObjectAVLTreeMap<OpLogMessage>();
          seqIdToMessageMaps.put(matrixId, seqIdToMessages);
        }

        // Add the message to the tree map; an already registered seqId is left untouched
        if (!seqIdToMessages.containsKey(seqId)) {
          seqIdToMessages.put(seqId, message);
        }

        if (needWait(matrixId, seqId)) {
          // If there are flush / clock requests blocked, we need to put this merge request into
          // the waiting queue
          if (LOG.isDebugEnabled()) {
            LOG.debug("add message=" + message + " to wait queue");
          }
          addWaitQueue(message);
        } else {
          // Launch a merge worker to merge the update to matrix op log cache
          mergingCounters.addTo(matrixId, 1);
          merge((OpLogMergeMessage) message);
        }
        break;
      }

      case MERGE_SUCCESS: {
        if (LOG.isDebugEnabled()) {
          LOG.debug(printMergingCounters());
        }

        // Remove the message from the tree map. The null check keeps an unexpected
        // MERGE_SUCCESS for a matrix without a tree map from killing the dispatcher thread.
        Int2ObjectAVLTreeMap<OpLogMessage> seqIdToMessages = seqIdToMessageMaps.get(matrixId);
        if (seqIdToMessages != null) {
          seqIdToMessages.remove(message.getSeqId());
        }
        mergingCounters.addTo(matrixId, -1);

        // Wake up blocked flush/clock request
        checkAndWakeUpListeners(matrixId);
        break;
      }

      case CLOCK:
      case FLUSH: {
        // Add flush/clock request to the listener list so it waits until all the existing
        // updates are merged
        addToListenerList(message);
        // Wake up blocked flush/clock request
        checkAndWakeUpListeners(matrixId);
        break;
      }

      default:
        // Any other message type is ignored by this loop
        break;
    }
  }
}