Python heapq 模块,heappushpop() 实例源码


项目:SN-influence-maximization    作者:doinab    | 项目源码 | 文件源码
def high_degree_nodes(k, G):
    """Return the k nodes of G with the highest degree, as a min-heap.

    Out-degree is used for directed graphs.

    Args:
        k: number of nodes to keep.
        G: a networkx graph.

    Returns:
        A list of ``(degree, node)`` tuples arranged as a ``heapq`` min-heap
        (``H[0]`` is the lowest-degree entry kept); empty when ``k <= 0`` or
        the graph has no nodes.
    """
    if k <= 0:
        return []

    if nx.is_directed(G):
        my_degree_function = G.out_degree
    else:
        # NOTE(review): the right-hand side of this assignment was lost when
        # the snippet was extracted; ``G.degree`` is assumed -- confirm.
        my_degree_function = G.degree

    # Materialize the nodes once so slicing works whether G.nodes() returns a
    # list or a non-sliceable view.
    nodes = list(G.nodes())

    # the top k nodes to be returned; initialization with first k elements
    H = [(my_degree_function(i), i) for i in nodes[:k]]
    if not H:
        return H
    # The list must satisfy the heap invariant before H[0] can be trusted to
    # be the smallest entry.
    hq.heapify(H)

    # iterate through the remaining nodes; add to heap if larger than heap root
    for i in nodes[k:]:
        degree = my_degree_function(i)
        if degree > H[0][0]:
            hq.heappushpop(H, (degree, i))

    return H

# Generator variant of high_degree_nodes(k, G)
# More time-efficient for range calls if k log k > log V
# Generates _all_ sets of i nodes of highest degree, with 1 <= i <= k
# -> Time complexity: O(V log (V))
# -> Memory complexity: Theta(V)
项目:DrQA    作者:facebookresearch    | 项目源码 | 文件源码
def search_docs(inputs, max_ex=5, opts=None):
    """Given a set of document ids (returned by ranking for a question), search
    for top N best matching (by heuristic) paragraphs that contain the answer.

    Args:
        inputs: a ``(doc_ids, q_tokens, answer)`` triple.
        max_ex: maximum number of paragraphs to keep.
        opts: options passed through to ``find_answer``; must be non-empty.

    Returns:
        Up to ``max_ex`` values of ``found[1]`` for the best-scoring
        paragraphs, in heap order (not sorted by score).

    Raises:
        RuntimeError: if ``opts`` is missing or empty.
    """
    if not opts:
        raise RuntimeError('Options dict must be supplied.')

    doc_ids, q_tokens, answer = inputs
    examples = []
    for i, doc_id in enumerate(doc_ids):
        for j, paragraph in enumerate(re.split(r'\n+', fetch_text(doc_id))):
            found = find_answer(paragraph, q_tokens, answer, opts)
            if found:
                # Reverse ranking, giving priority to early docs + paragraphs
                # (the random last field breaks remaining ties).
                score = (found[0], -i, -j, random.random())
                if len(examples) < max_ex:
                    heapq.heappush(examples, (score, found[1]))
                else:
                    # Heap is full: add the candidate and evict the lowest score.
                    heapq.heappushpop(examples, (score, found[1]))
    return [e[1] for e in examples]
项目:DrQA_cn    作者:AmoseKang    | 项目源码 | 文件源码
def search_docs(inputs, max_ex=5, opts=None):
    """Given a set of document ids (returned by ranking for a question), search
    for top N best matching (by heuristic) paragraphs that contain the answer.

    Args:
        inputs: a ``(doc_ids, q_tokens, answer)`` triple.
        max_ex: maximum number of paragraphs to keep.
        opts: options passed through to ``find_answer``; must be non-empty.

    Returns:
        Up to ``max_ex`` values of ``found[1]`` for the best-scoring
        paragraphs, in heap order (not sorted by score).

    Raises:
        RuntimeError: if ``opts`` is missing or empty.
    """
    if not opts:
        raise RuntimeError('Options dict must be supplied.')

    doc_ids, q_tokens, answer = inputs
    examples = []
    for i, doc_id in enumerate(doc_ids):
        for j, paragraph in enumerate(re.split(r'\n+', fetch_text(doc_id))):
            found = find_answer(paragraph, q_tokens, answer, opts)
            if found:
                # Reverse ranking, giving priority to early docs + paragraphs
                # (the random last field breaks remaining ties).
                score = (found[0], -i, -j, random.random())
                if len(examples) < max_ex:
                    heapq.heappush(examples, (score, found[1]))
                else:
                    # Heap is full: add the candidate and evict the lowest score.
                    heapq.heappushpop(examples, (score, found[1]))
    return [e[1] for e in examples]
项目:nova-lxd    作者:openstack    | 项目源码 | 文件源码
def _addResult(self, test, *args):
            name =
        except AttributeError:
            name = 'Unknown.unknown'
        test_class, test_name = name.rsplit('.', 1)

        elapsed = (self._now() - self.start_time).total_seconds()
        item = (elapsed, test_class, test_name)
        if len(self.slow_tests) >= self.num_slow_tests:
            heapq.heappushpop(self.slow_tests, item)
            heapq.heappush(self.slow_tests, item)

        self.results.setdefault(test_class, [])
        self.results[test_class].append((test_name, elapsed) + args)
        self.last_time[test_class] = self._now()
项目:ranger-agent    作者:openstack    | 项目源码 | 文件源码
def _addResult(self, test, *args):
            name =
        except AttributeError:
            name = 'Unknown.unknown'
        test_class, test_name = name.rsplit('.', 1)

        elapsed = (self._now() - self.start_time).total_seconds()
        item = (elapsed, test_class, test_name)
        if len(self.slow_tests) >= self.num_slow_tests:
            heapq.heappushpop(self.slow_tests, item)
            heapq.heappush(self.slow_tests, item)

        self.results.setdefault(test_class, [])
        self.results[test_class].append((test_name, elapsed) + args)
        self.last_time[test_class] = self._now()
项目:LIS-Tempest    作者:LIS    | 项目源码 | 文件源码
def _addResult(self, test, *args):
            name =
        except AttributeError:
            name = 'Unknown.unknown'
        test_class, test_name = name.rsplit('.', 1)

        elapsed = (self._now() - self.start_time).total_seconds()
        item = (elapsed, test_class, test_name)
        if len(self.slow_tests) >= self.num_slow_tests:
            heapq.heappushpop(self.slow_tests, item)
            heapq.heappush(self.slow_tests, item)

        self.results.setdefault(test_class, [])
        self.results[test_class].append((test_name, elapsed) + args)
        self.last_time[test_class] = self._now()
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def _addResult(self, test, *args):
            name =
        except AttributeError:
            name = 'Unknown.unknown'
        test_class, test_name = name.rsplit('.', 1)

        elapsed = (self._now() - self.start_time).total_seconds()
        item = (elapsed, test_class, test_name)
        if len(self.slow_tests) >= self.num_slow_tests:
            heapq.heappushpop(self.slow_tests, item)
            heapq.heappush(self.slow_tests, item)

        self.results.setdefault(test_class, [])
        self.results[test_class].append((test_name, elapsed) + args)
        self.last_time[test_class] = self._now()
项目:pytorch-dist    作者:apaszke    | 项目源码 | 文件源码
def call_plugins(self, queue_name, time, *args):
        """Run every plugin in queue ``queue_name`` that is due at ``time``.

        ``self.plugin_queues[queue_name]`` is a heap of 3-tuples: ``[0]`` is
        the due time, ``[1]`` is carried over unchanged when the entry is
        rescheduled, and ``[2]`` is the plugin. A due plugin has its method
        named ``queue_name`` called with ``(time, *args)`` and is rescheduled
        at ``time + interval``, where ``interval`` is taken from the plugin's
        ``trigger_interval`` entry whose second field equals ``queue_name``.
        """
        args = (time,) + args
        queue = self.plugin_queues[queue_name]
        if not queue:
            # Nothing is registered for this queue; without this early exit
            # ``queue[0]`` below would raise IndexError.
            return
        while queue[0][0] <= time:
            plugin = queue[0][2]
            getattr(plugin, queue_name)(*args)
            for trigger in plugin.trigger_interval:
                if trigger[1] == queue_name:
                    interval = trigger[0]
            # Push the rescheduled entry and pop the smallest one, which is the
            # entry just processed as long as the interval is positive.
            new_item = (time + interval, queue[0][1], plugin)
            heapq.heappushpop(queue, new_item)
项目:keras-image-captioning    作者:danieljl    | 项目源码 | 文件源码
def add(self, item):
        """Offer ``item`` to the bounded heap, keeping the ``self._n`` largest.

        ``None`` is ignored. While the heap holds fewer than ``self._n``
        items the new item is pushed; once full, the item is pushed and the
        smallest element is evicted in one step.
        """
        if item is None:
            # NOTE(review): the body of this branch was lost when the snippet
            # was extracted; an early return is assumed -- confirm.
            return
        if len(self._heap) < self._n:
            heapq.heappush(self._heap, item)
        else:
            heapq.heappushpop(self._heap, item)
项目:pytorch-visdom    作者:alexsax    | 项目源码 | 文件源码
def call_plugins(self, queue_name, time, *args):
        """Run every plugin in queue ``queue_name`` that is due at ``time``.

        ``self.plugin_queues[queue_name]`` is a heap of 3-tuples: ``[0]`` is
        the due time, ``[1]`` is carried over unchanged when the entry is
        rescheduled, and ``[2]`` is the plugin. A due plugin has its method
        named ``queue_name`` called with ``(time, *args)`` and is rescheduled
        at ``time + interval``, where ``interval`` is taken from the plugin's
        ``trigger_interval`` entry whose second field equals ``queue_name``.
        """
        args = (time,) + args
        queue = self.plugin_queues[queue_name]
        if not queue:
            # Nothing is registered for this queue; without this early exit
            # ``queue[0]`` below would raise IndexError.
            return
        while queue[0][0] <= time:
            plugin = queue[0][2]
            getattr(plugin, queue_name)(*args)
            for trigger in plugin.trigger_interval:
                if trigger[1] == queue_name:
                    interval = trigger[0]
            # Push the rescheduled entry and pop the smallest one, which is the
            # entry just processed as long as the interval is positive.
            new_item = (time + interval, queue[0][1], plugin)
            heapq.heappushpop(queue, new_item)
项目:Skeleton-key    作者:feiyu1990    | 项目源码 | 文件源码
def push(self, x):
        """Offer ``x`` to the bounded heap, keeping the ``self._n`` largest.

        While fewer than ``self._n`` elements are stored, ``x`` is pushed.
        Once the heap is full, ``x`` is pushed and the smallest element is
        popped in one step, so the size never exceeds ``self._n``.
        """
        assert self._data is not None
        if len(self._data) < self._n:
            heapq.heappush(self._data, x)
        else:
            # Full: add x and evict the current minimum in a single operation.
            heapq.heappushpop(self._data, x)
项目:JetsonTX1_im2txt    作者:Netzeband    | 项目源码 | 文件源码
def push(self, x):
    """Offer ``x`` to the bounded heap, keeping the ``self._n`` largest.

    While fewer than ``self._n`` elements are stored, ``x`` is pushed.
    Once the heap is full, ``x`` is pushed and the smallest element is
    popped in one step, so the size never exceeds ``self._n``.
    """
    assert self._data is not None
    if len(self._data) < self._n:
      heapq.heappush(self._data, x)
    else:
      # Full: add x and evict the current minimum in a single operation.
      heapq.heappushpop(self._data, x)
项目:samplernn-pytorch    作者:deepsound-project    | 项目源码 | 文件源码
def call_plugins(self, queue_name, time, *args):
        """Run every plugin in queue ``queue_name`` that is due at ``time``.

        ``self.plugin_queues[queue_name]`` is a heap of 3-tuples: ``[0]`` is
        the due time, ``[1]`` is carried over unchanged when the entry is
        rescheduled, and ``[2]`` is the plugin. A due plugin has its method
        named ``queue_name`` called with ``(time, *args)`` and is rescheduled
        at ``time + interval``, where ``interval`` is taken from the plugin's
        ``trigger_interval`` entry whose second field equals ``queue_name``.
        """
        args = (time,) + args
        queue = self.plugin_queues[queue_name]
        if not queue:
            # Nothing is registered for this queue; without this early exit
            # ``queue[0]`` below would raise IndexError.
            return
        while queue[0][0] <= time:
            plugin = queue[0][2]
            getattr(plugin, queue_name)(*args)
            for trigger in plugin.trigger_interval:
                if trigger[1] == queue_name:
                    interval = trigger[0]
            # Push the rescheduled entry and pop the smallest one, which is the
            # entry just processed as long as the interval is positive.
            new_item = (time + interval, queue[0][1], plugin)
            heapq.heappushpop(queue, new_item)
项目:pytorch    作者:tylergenter    | 项目源码 | 文件源码
def call_plugins(self, queue_name, time, *args):
        """Run every plugin in queue ``queue_name`` that is due at ``time``.

        ``self.plugin_queues[queue_name]`` is a heap of 3-tuples: ``[0]`` is
        the due time, ``[1]`` is carried over unchanged when the entry is
        rescheduled, and ``[2]`` is the plugin. A due plugin has its method
        named ``queue_name`` called with ``(time, *args)`` and is rescheduled
        at ``time + interval``, where ``interval`` is taken from the plugin's
        ``trigger_interval`` entry whose second field equals ``queue_name``.
        """
        args = (time,) + args
        queue = self.plugin_queues[queue_name]
        if not queue:
            # Nothing is registered for this queue; without this early exit
            # ``queue[0]`` below would raise IndexError.
            return
        while queue[0][0] <= time:
            plugin = queue[0][2]
            getattr(plugin, queue_name)(*args)
            for trigger in plugin.trigger_interval:
                if trigger[1] == queue_name:
                    interval = trigger[0]
            # Push the rescheduled entry and pop the smallest one, which is the
            # entry just processed as long as the interval is positive.
            new_item = (time + interval, queue[0][1], plugin)
            heapq.heappushpop(queue, new_item)
项目:tefla    作者:openAGI    | 项目源码 | 文件源码
def push(self, x):
        """Offer ``x`` to the bounded heap, keeping the ``self._n`` largest.

        While fewer than ``self._n`` elements are stored, ``x`` is pushed.
        Once the heap is full, ``x`` is pushed and the smallest element is
        popped in one step, so the size never exceeds ``self._n``.
        """
        assert self._data is not None
        if len(self._data) < self._n:
            heapq.heappush(self._data, x)
        else:
            # Full: add x and evict the current minimum in a single operation.
            heapq.heappushpop(self._data, x)
项目:tefla    作者:openAGI    | 项目源码 | 文件源码
def push(self, x):
        """Offer ``x`` to the bounded heap, keeping the ``self._n`` largest.

        While fewer than ``self._n`` elements are stored, ``x`` is pushed.
        Once the heap is full, ``x`` is pushed and the smallest element is
        popped in one step, so the size never exceeds ``self._n``.
        """
        assert self._data is not None
        if len(self._data) < self._n:
            heapq.heappush(self._data, x)
        else:
            # Full: add x and evict the current minimum in a single operation.
            heapq.heappushpop(self._data, x)
项目:pytorch-coriander    作者:hughperkins    | 项目源码 | 文件源码
def call_plugins(self, queue_name, time, *args):
        """Run every plugin in queue ``queue_name`` that is due at ``time``.

        ``self.plugin_queues[queue_name]`` is a heap of 3-tuples: ``[0]`` is
        the due time, ``[1]`` is carried over unchanged when the entry is
        rescheduled, and ``[2]`` is the plugin. A due plugin has its method
        named ``queue_name`` called with ``(time, *args)`` and is rescheduled
        at ``time + interval``, where ``interval`` is taken from the plugin's
        ``trigger_interval`` entry whose second field equals ``queue_name``.
        """
        args = (time,) + args
        queue = self.plugin_queues[queue_name]
        if not queue:
            # Nothing is registered for this queue; without this early exit
            # ``queue[0]`` below would raise IndexError.
            return
        while queue[0][0] <= time:
            plugin = queue[0][2]
            getattr(plugin, queue_name)(*args)
            for trigger in plugin.trigger_interval:
                if trigger[1] == queue_name:
                    interval = trigger[0]
            # Push the rescheduled entry and pop the smallest one, which is the
            # entry just processed as long as the interval is positive.
            new_item = (time + interval, queue[0][1], plugin)
            heapq.heappushpop(queue, new_item)
项目:interviews    作者:alexksikes    | 项目源码 | 文件源码
def nsmallest(n, a):
    """Return the n-th smallest value of the sequence ``a``.

    Only the ``n`` smallest values seen so far are retained. ``heapq``
    provides a min-heap, so values are stored negated: the root is then the
    largest retained value, which is the n-th smallest overall once every
    element has been offered.
    """
    negated = [-value for value in a[:n]]
    heapq.heapify(negated)
    for value in a[n:]:
        # Offer the next value; the largest retained value is discarded.
        heapq.heappushpop(negated, -value)
    return -heapq.heappop(negated)
项目:seq2seq.pytorch    作者:eladhoffer    | 项目源码 | 文件源码
def push(self, x):
        """Offer ``x`` to the bounded heap, keeping the ``self._n`` largest.

        While fewer than ``self._n`` elements are stored, ``x`` is pushed.
        Once the heap is full, ``x`` is pushed and the smallest element is
        popped in one step, so the size never exceeds ``self._n``.
        """
        assert self._data is not None
        if len(self._data) < self._n:
            heapq.heappush(self._data, x)
        else:
            # Full: add x and evict the current minimum in a single operation.
            heapq.heappushpop(self._data, x)
项目:im2txt_api    作者:mainyaa    | 项目源码 | 文件源码
def push(self, x):
    """Offer ``x`` to the bounded heap, keeping the ``self._n`` largest.

    While fewer than ``self._n`` elements are stored, ``x`` is pushed.
    Once the heap is full, ``x`` is pushed and the smallest element is
    popped in one step, so the size never exceeds ``self._n``.
    """
    assert self._data is not None
    if len(self._data) < self._n:
      heapq.heappush(self._data, x)
    else:
      # Full: add x and evict the current minimum in a single operation.
      heapq.heappushpop(self._data, x)
项目:Kaggler    作者:qqgeogor    | 项目源码 | 文件源码
def shuf_file(f, shuf_win):
    """Yield the lines of ``f`` in a locally shuffled order.

    Lines are buffered in a heap keyed by ``hash(line)``. Once ``shuf_win``
    lines are buffered, each new line is pushed and the entry with the
    smallest key is yielded. After the input is exhausted the remaining
    buffered lines are yielded in key order, so every input line is yielded
    exactly once.

    Args:
        f: an iterable of lines.
        shuf_win: number of lines held in the buffer.
    """
    heap = []
    for line in f:
        entry = (hash(line), line)
        if len(heap) < shuf_win:
            # Still filling the window: buffer the line, emit nothing yet.
            heapq.heappush(heap, entry)
        else:
            _, out = heapq.heappushpop(heap, entry)
            yield out

    # Drain whatever is left in the window.
    while heap:
        _, out = heapq.heappop(heap)
        yield out
项目:im2latex    作者:dhoman01    | 项目源码 | 文件源码
def push(self, x):
    """Offer ``x`` to the bounded heap, keeping the ``self._n`` largest.

    While fewer than ``self._n`` elements are stored, ``x`` is pushed.
    Once the heap is full, ``x`` is pushed and the smallest element is
    popped in one step, so the size never exceeds ``self._n``.
    """
    assert self._data is not None
    if len(self._data) < self._n:
      heapq.heappush(self._data, x)
    else:
      # Full: add x and evict the current minimum in a single operation.
      heapq.heappushpop(self._data, x)
项目:semsearch    作者:sanjana-Bijoe    | 项目源码 | 文件源码
def push(self, x):
    """Offer ``x`` to the bounded heap, keeping the ``self._n`` largest.

    While fewer than ``self._n`` elements are stored, ``x`` is pushed.
    Once the heap is full, ``x`` is pushed and the smallest element is
    popped in one step, so the size never exceeds ``self._n``.
    """
    assert self._data is not None
    if len(self._data) < self._n:
      heapq.heappush(self._data, x)
    else:
      # Full: add x and evict the current minimum in a single operation.
      heapq.heappushpop(self._data, x)
项目:py-leetcode    作者:coolmich    | 项目源码 | 文件源码
def findKthLargest(self, nums, k):
        """Return the k-th largest element of ``nums`` using quickselect.

        ``nums`` is partially reordered in place.

        :type nums: List[int]
        :type k: int
        :rtype: int
        :raises ValueError: if ``k`` is not between 1 and ``len(nums)``
        """
        if not 1 <= k <= len(nums):
            raise ValueError('k must be between 1 and len(nums)')

        def partition(lo, hi):
            # Move everything smaller than the pivot (nums[hi]) to the left,
            # then drop the pivot into its final sorted position.
            pivot = nums[hi]
            store = lo
            for idx in range(lo, hi):
                if nums[idx] < pivot:
                    nums[store], nums[idx] = nums[idx], nums[store]
                    store += 1
            nums[store], nums[hi] = nums[hi], nums[store]
            return store

        # The k-th largest value sits at this index in ascending order.
        target = len(nums) - k
        lo, hi = 0, len(nums) - 1
        while True:
            pos = partition(lo, hi)
            if pos == target:
                return nums[pos]
            if pos < target:
                lo = pos + 1
            else:
                hi = pos - 1
项目:pytorch    作者:ezyang    | 项目源码 | 文件源码
def call_plugins(self, queue_name, time, *args):
        """Run every plugin in queue ``queue_name`` that is due at ``time``.

        ``self.plugin_queues[queue_name]`` is a heap of 3-tuples: ``[0]`` is
        the due time, ``[1]`` is carried over unchanged when the entry is
        rescheduled, and ``[2]`` is the plugin. A due plugin has its method
        named ``queue_name`` called with ``(time, *args)`` and is rescheduled
        at ``time + interval``, where ``interval`` is taken from the plugin's
        ``trigger_interval`` entry whose second field equals ``queue_name``.
        """
        args = (time,) + args
        queue = self.plugin_queues[queue_name]
        if not queue:
            # Nothing is registered for this queue; without this early exit
            # ``queue[0]`` below would raise IndexError.
            return
        while queue[0][0] <= time:
            plugin = queue[0][2]
            getattr(plugin, queue_name)(*args)
            for trigger in plugin.trigger_interval:
                if trigger[1] == queue_name:
                    interval = trigger[0]
            # Push the rescheduled entry and pop the smallest one, which is the
            # entry just processed as long as the interval is positive.
            new_item = (time + interval, queue[0][1], plugin)
            heapq.heappushpop(queue, new_item)
项目:jacksearch    作者:srome    | 项目源码 | 文件源码
def push(self, x):
    """Offer ``x`` to the bounded heap, keeping the ``self._n`` largest.

    While fewer than ``self._n`` elements are stored, ``x`` is pushed.
    Once the heap is full, ``x`` is pushed and the smallest element is
    popped in one step, so the size never exceeds ``self._n``.
    """
    assert self._data is not None
    if len(self._data) < self._n:
      heapq.heappush(self._data, x)
    else:
      # Full: add x and evict the current minimum in a single operation.
      heapq.heappushpop(self._data, x)
项目:im2txt    作者:huichen    | 项目源码 | 文件源码
def push(self, x):
    """Offer ``x`` to the bounded heap, keeping the ``self._n`` largest.

    While fewer than ``self._n`` elements are stored, ``x`` is pushed.
    Once the heap is full, ``x`` is pushed and the smallest element is
    popped in one step, so the size never exceeds ``self._n``.
    """
    assert self._data is not None
    if len(self._data) < self._n:
      heapq.heappush(self._data, x)
    else:
      # Full: add x and evict the current minimum in a single operation.
      heapq.heappushpop(self._data, x)
项目:pytorch    作者:pytorch    | 项目源码 | 文件源码
def call_plugins(self, queue_name, time, *args):
        """Run every plugin in queue ``queue_name`` that is due at ``time``.

        ``self.plugin_queues[queue_name]`` is a heap of 3-tuples: ``[0]`` is
        the due time, ``[1]`` is carried over unchanged when the entry is
        rescheduled, and ``[2]`` is the plugin. A due plugin has its method
        named ``queue_name`` called with ``(time, *args)`` and is rescheduled
        at ``time + interval``, where ``interval`` is taken from the plugin's
        ``trigger_interval`` entry whose second field equals ``queue_name``.
        """
        args = (time,) + args
        queue = self.plugin_queues[queue_name]
        if not queue:
            # Nothing is registered for this queue; without this early exit
            # ``queue[0]`` below would raise IndexError.
            return
        while queue[0][0] <= time:
            plugin = queue[0][2]
            getattr(plugin, queue_name)(*args)
            for trigger in plugin.trigger_interval:
                if trigger[1] == queue_name:
                    interval = trigger[0]
            # Push the rescheduled entry and pop the smallest one, which is the
            # entry just processed as long as the interval is positive.
            new_item = (time + interval, queue[0][1], plugin)
            heapq.heappushpop(queue, new_item)
项目:Video-Captioning    作者:hehefan    | 项目源码 | 文件源码
def push(self, x):
    """Offer ``x`` to the bounded heap, keeping the ``self._n`` largest.

    While fewer than ``self._n`` elements are stored, ``x`` is pushed.
    Once the heap is full, ``x`` is pushed and the smallest element is
    popped in one step, so the size never exceeds ``self._n``.
    """
    assert self._data is not None
    if len(self._data) < self._n:
      heapq.heappush(self._data, x)
    else:
      # Full: add x and evict the current minimum in a single operation.
      heapq.heappushpop(self._data, x)
项目:Video-Captioning    作者:hehefan    | 项目源码 | 文件源码
def push(self, x):
    """Offer ``x`` to the bounded heap, keeping the ``self._n`` largest.

    While fewer than ``self._n`` elements are stored, ``x`` is pushed.
    Once the heap is full, ``x`` is pushed and the smallest element is
    popped in one step, so the size never exceeds ``self._n``.
    """
    assert self._data is not None
    if len(self._data) < self._n:
      heapq.heappush(self._data, x)
    else:
      # Full: add x and evict the current minimum in a single operation.
      heapq.heappushpop(self._data, x)
项目:Video-Captioning    作者:hehefan    | 项目源码 | 文件源码
def push(self, x):
    """Offer ``x`` to the bounded heap, keeping the ``self._n`` largest.

    While fewer than ``self._n`` elements are stored, ``x`` is pushed.
    Once the heap is full, ``x`` is pushed and the smallest element is
    popped in one step, so the size never exceeds ``self._n``.
    """
    assert self._data is not None
    if len(self._data) < self._n:
      heapq.heappush(self._data, x)
    else:
      # Full: add x and evict the current minimum in a single operation.
      heapq.heappushpop(self._data, x)
项目:im2txt_demo    作者:siavash9000    | 项目源码 | 文件源码
def push(self, x):
    """Offer ``x`` to the bounded heap, keeping the ``self._n`` largest.

    While fewer than ``self._n`` elements are stored, ``x`` is pushed.
    Once the heap is full, ``x`` is pushed and the smallest element is
    popped in one step, so the size never exceeds ``self._n``.
    """
    assert self._data is not None
    if len(self._data) < self._n:
      heapq.heappush(self._data, x)
    else:
      # Full: add x and evict the current minimum in a single operation.
      heapq.heappushpop(self._data, x)
项目:bnpy    作者:bnpy    | 项目源码 | 文件源码
# Adds promising documents from ``Data`` to the bounded heap ``RankedDataHeap``,
# using ``weights`` to decide which documents qualify.
# NOTE(review): this snippet was damaged during extraction. The end of the
# signature (presumably including the ``keep`` parameter read below), the
# docstring, the ``else:`` branches, the ``try:`` line and the bodies of the
# ``if len(docIDs) < 1:`` and ``except`` clauses are missing, so the code is
# not valid Python as shown. Restore them from the upstream source.
def add_to_ranked_target_data(RankedDataHeap, maxSize, Data, weights,
    # Start from every document index in Data.
    docIDs = np.arange(Data.nDoc)

    # First, decide which docs are promising,
    #  since we don't want to blow-up memory costs by using *all* docs
    if len(RankedDataHeap) > 0:
        # The heap root's first field is used as the admission threshold.
        cutoffThr = RankedDataHeap[0][0]
        if keep == 'largest':
            docIDs = np.argsort(-1 * weights)[:maxSize]
            docIDs = docIDs[weights[docIDs] > cutoffThr]
            # NOTE(review): the next two lines presumably belong to a lost
            # ``else:`` branch (keep the smallest weights) -- confirm.
            docIDs = np.argsort(weights)[:maxSize]
            docIDs = docIDs[weights[docIDs] < cutoffThr]

    # NOTE(review): the body of this check is missing (presumably an early
    # exit when no document qualifies) -- confirm.
    if len(docIDs) < 1:

    # For promising docs, convert to list-of-tuples format,
    #   and add to the heap
    if keep == 'largest':
        tList = Data.to_list_of_tuples(docIDs, w=weights)
        # NOTE(review): presumably the lost ``else:`` branch; weights are
        # negated here -- confirm.
        tList = Data.to_list_of_tuples(docIDs, w=-1 * weights)
    for docID, unitTuple in enumerate(tList):
            # NOTE(review): a ``try:`` line and an ``else:`` between the two
            # heap calls appear to be missing -- confirm.
            if len(RankedDataHeap) >= maxSize:
                heapq.heappushpop(RankedDataHeap, unitTuple)
                heapq.heappush(RankedDataHeap, unitTuple)
        except ValueError as error:
            # skip stupid errors related to duplicate weights

# sample_target_data
项目:tf-tutorial    作者:zchen0211    | 项目源码 | 文件源码
def push(self, x):
    """Offer ``x`` to the bounded heap, keeping the ``self._n`` largest.

    While fewer than ``self._n`` elements are stored, ``x`` is pushed.
    Once the heap is full, ``x`` is pushed and the smallest element is
    popped in one step, so the size never exceeds ``self._n``.
    """
    assert self._data is not None
    if len(self._data) < self._n:
      heapq.heappush(self._data, x)
    else:
      # Full: add x and evict the current minimum in a single operation.
      heapq.heappushpop(self._data, x)
项目:implicit    作者:benfred    | 项目源码 | 文件源码
def explain(self, userid, user_items, itemid, user_weights=None, N=10):
        """ Provides explanations for why the item is liked by the user.

        Parameters
        ----------
        userid : int
            The userid to explain recommendations for
        user_items : csr_matrix
            Sparse matrix containing the liked items for the user
        itemid : int
            The itemid to explain recommendations for
        user_weights : ndarray, optional
            Precomputed Cholesky decomposition of the weighted user liked items.
            Useful for speeding up repeated calls to this function, this value
            is returned
        N : int, optional
            The number of liked items to show the contribution for

        Returns
        -------
        total_score : float
            The total predicted score for this user/item pair
        top_contributions : list
            A list of the top N (itemid, score) contributions for this user/item pair
        user_weights : ndarray
            A factorized representation of the user. Passing this in to
            future 'explain' calls will lead to noticeable speedups
        """
        # user_weights = Cholesky decomposition of Wu^-1
        # from section 5 of the paper CF for Implicit Feedback Datasets
        user_items = user_items.tocsr()
        if user_weights is None:
            A, _ = user_linear_equation(self.item_factors, self.YtY,
                                        user_items, userid,
                                        self.regularization, self.factors)
            user_weights = scipy.linalg.cho_factor(A)
        seed_item = self.item_factors[itemid]

        # weighted_item = y_i^t W_u
        weighted_item = scipy.linalg.cho_solve(user_weights, seed_item)

        total_score = 0.0
        h = []
        # A separate loop name keeps the ``itemid`` argument from being shadowed.
        for i, (liked_id, confidence) in enumerate(nonzeros(user_items, userid)):
            factor = self.item_factors[liked_id]
            # s_u^ij = (y_i^t W^u) y_j
            # NOTE(review): the dot product was lost when this snippet was
            # extracted; it is restored from the formula above -- confirm.
            score = weighted_item.dot(factor) * confidence
            total_score += score
            contribution = (score, liked_id)
            if i < N:
                heapq.heappush(h, contribution)
            else:
                # Heap already holds N entries: keep only the N largest.
                heapq.heappushpop(h, contribution)

        # Largest contribution first.
        top_contributions = [(item, s) for s, item in sorted(h, reverse=True)]
        return total_score, top_contributions, user_weights
项目:cc-crawl-statistics    作者:commoncrawl    | 项目源码 | 文件源码
# Reducer step: folds ``values`` for one ``key`` into output records, counters
# and per-key bounded heaps in ``self.mostfrequent``.
# NOTE(review): this snippet lost many tokens during extraction (empty tuple
# slots such as ``(, CST(item).name, crawl)``, unbalanced parentheses, missing
# ``else:`` lines and statement bodies), so it is not valid Python as shown.
# Recover the missing pieces from the upstream source; do not guess them.
def stats_reducer(self, key, values):
        # key[0] selects the output type, key[1] is the item, and key[2] is
        # converted to a crawl name.
        outputType = CST(key[0])
        item = key[1]
        crawl = MonthlyCrawl.to_name(key[2])
        if outputType in (CST.size, CST.new_items,
                          CST.size_estimate, CST.size_robotstxt):
            verbose_key = (, CST(item).name, crawl)
            if outputType in (CST.size, CST.size_robotstxt):
                val = sum(values)
            elif outputType == CST.new_items:
                val = MultiCount.sum_values(values)
            elif outputType == CST.size_estimate:
                # already "reduced" in count job
                for val in values:
            yield verbose_key, val
        elif outputType == CST.histogram:
            yield((, CST(item).name, crawl,
                   CST(key[3]).name, key[4]), sum(values))
        elif outputType in (CST.mimetype, CST.mimetype_detected, CST.scheme,
                            CST.surt_domain, CST.tld, CST.domain,,
                            CST.http_status, CST.robotstxt_status):
            item = key[1]
            for counts in values:
                # Positions 0 and 1 of each counts value hold the page and
                # URL counts; position 2 is read as a host count for
                # domain-like types.
                page_count = MultiCount.get_count(0, counts)
                url_count = MultiCount.get_count(1, counts)
                if outputType in (CST.domain, CST.surt_domain, CST.tld):
                    host_count = MultiCount.get_count(2, counts)
                if (self.options.min_domain_frequency <= 1 or
                    outputType not in (, CST.domain,
                    self.counters[(,, crawl)] += 1
                                   crawl,, page_count)] += 1
                                   crawl,, url_count)] += 1
                    if outputType in (CST.domain, CST.surt_domain, CST.tld):
                                       crawl,, host_count)] += 1
                if outputType == CST.tld:
                    domain_count = MultiCount.get_count(3, counts)
                                   crawl,, domain_count)] += 1
                if outputType in (CST.domain,, CST.surt_domain):
                    outKey = (, crawl)
                    outVal = (page_count, url_count, item)
                    if outputType in (CST.domain, CST.surt_domain):
                        outVal = (page_count, url_count, host_count, item)
                    # take most common
                    # NOTE(review): presumably an ``else:`` separated the two
                    # heap calls below (heap capped at max_hosts) -- confirm.
                    if len(self.mostfrequent[outKey]) < self.options.max_hosts:
                        heapq.heappush(self.mostfrequent[outKey], outVal)
                        heapq.heappushpop(self.mostfrequent[outKey], outVal)
                    yield((, item, crawl), counts)
            # NOTE(review): presumably the final ``else:`` of the type
            # dispatch -- confirm.
            logging.error('Unhandled type {}\n'.format(outputType))