Python constants 模块(constants.py)实例源码

我们从Python开源项目中,提取了以下7个代码示例,用于说明如何使用 constants.py 模块。

项目:gae-sports-data    作者:jnguyen-ca    | 项目源码 | 文件源码
def __init__(self, league_id, start_date, end_date):
        """
        Request the data for one league over a date range and parse it.

        The arguments are stored on the instance, the request is logged,
        ``self.url`` is fetched with ``urlfetch``, the response body is
        decoded as JSON into ``self.response_dict`` and the value returned by
        ``self.parse_game_list()`` is stored in ``self.game_list``.

        NOTE(review): ``self.url`` is not assigned in this method; presumably
        it is provided elsewhere on the class - confirm.

        Args:
            league_id (string): a valid league_id from constants.py
            start_date (string): date string in iso 8601 format
            end_date (string): date string in iso 8601 format
        """
        self.league, self.start_date, self.end_date = league_id, start_date, end_date

        request_summary = "Requesting %s for %s until %s" % (
            self.league,
            self.start_date,
            self.end_date,
        )
        logging.info(request_summary)

        # Fetch first, then decode the raw body as JSON
        raw_body = urlfetch.fetch(self.url).content
        self.response_dict = json.loads(raw_body)

        # Parsing runs after response_dict has been populated
        self.game_list = self.parse_game_list()
项目:mgr.p2p.proxy    作者:tomusdrw    | 项目源码 | 文件源码
def addContact(self, contact):
        """ Add the given contact to the correct k-bucket; if it already
        exists, its status will be updated

        If the target k-bucket is full and its range includes this node's own
        id, the bucket is split and the insertion is retried. Otherwise the
        contact is placed at the tail of the replacement cache kept for that
        bucket (moving it there if it is already cached). Each per-bucket
        replacement cache holds at most C{constants.k} contacts; when it is
        full, the contact at the head of the list is dropped to make room.

        @param contact: The contact to add to this node's k-buckets
        @type contact: kademlia.contact.Contact
        """
        if contact.id == self._parentNodeID:
            # The host node is never stored in its own routing table
            return

        # Initialize/reset the "successively failed RPC" counter
        contact.failedRPCs = 0

        bucketIndex = self._kbucketIndex(contact.id)
        try:
            self._buckets[bucketIndex].addContact(contact)
        except kbucket.BucketFull:
            # The bucket is full; see if it can be split (by checking if its range includes the host node's id)
            if self._buckets[bucketIndex].keyInRange(self._parentNodeID):
                self._splitBucket(bucketIndex)
                # Retry the insertion attempt
                self.addContact(contact)
            else:
                # We can't split the k-bucket
                # NOTE: This implementation follows section 4.1 of the 13 page version
                # of the Kademlia paper (optimized contact accounting without PINGs
                #- results in much less network traffic, at the expense of some memory)

                # Put the new contact in our replacement cache for the corresponding k-bucket (or update its position if it exists already)
                # setdefault() replaces the Python-2-only dict.has_key() check
                replacements = self._replacementCache.setdefault(bucketIndex, [])
                if contact in replacements:
                    replacements.remove(contact)
                #TODO: Using k to limit the size of the contact replacement cache - maybe define a separate value for this in constants.py?
                # The limit applies to this bucket's contact list (not to the
                # number of buckets in the cache dict), and the eviction must
                # remove the first list entry, not dict key 0.
                elif len(replacements) >= constants.k:
                    replacements.pop(0)
                replacements.append(contact)
项目:mgr.p2p.proxy    作者:tomusdrw    | 项目源码 | 文件源码
def addContact(self, contact):
        """ Add the given contact to the correct k-bucket; if it already
        exists, its status will be updated

        If the target k-bucket is full and its range includes this node's own
        id, the bucket is split and the insertion is retried. Otherwise the
        contact is placed at the tail of the replacement cache kept for that
        bucket (moving it there if it is already cached). Each per-bucket
        replacement cache holds at most C{constants.k} contacts; when it is
        full, the contact at the head of the list is dropped to make room.

        @param contact: The contact to add to this node's k-buckets
        @type contact: kademlia.contact.Contact
        """
        if contact.id == self._parentNodeID:
            # The host node is never stored in its own routing table
            return

        # Initialize/reset the "successively failed RPC" counter
        contact.failedRPCs = 0

        bucketIndex = self._kbucketIndex(contact.id)
        try:
            self._buckets[bucketIndex].addContact(contact)
        except kbucket.BucketFull:
            # The bucket is full; see if it can be split (by checking if its range includes the host node's id)
            if self._buckets[bucketIndex].keyInRange(self._parentNodeID):
                self._splitBucket(bucketIndex)
                # Retry the insertion attempt
                self.addContact(contact)
            else:
                # We can't split the k-bucket
                # NOTE: This implementation follows section 4.1 of the 13 page version
                # of the Kademlia paper (optimized contact accounting without PINGs
                #- results in much less network traffic, at the expense of some memory)

                # Put the new contact in our replacement cache for the corresponding k-bucket (or update its position if it exists already)
                # setdefault() replaces the Python-2-only dict.has_key() check
                replacements = self._replacementCache.setdefault(bucketIndex, [])
                if contact in replacements:
                    replacements.remove(contact)
                #TODO: Using k to limit the size of the contact replacement cache - maybe define a separate value for this in constants.py?
                # The limit applies to this bucket's contact list (not to the
                # number of buckets in the cache dict), and the eviction must
                # remove the first list entry, not dict key 0.
                elif len(replacements) >= constants.k:
                    replacements.pop(0)
                replacements.append(contact)
项目:mgr.p2p.proxy    作者:tomusdrw    | 项目源码 | 文件源码
def addContact(self, contact):
        """ Add the given contact to the correct k-bucket; if it already
        exists, its status will be updated

        If the target k-bucket is full and its range includes this node's own
        id, the bucket is split and the insertion is retried. Otherwise the
        contact is placed at the tail of the replacement cache kept for that
        bucket (moving it there if it is already cached). Each per-bucket
        replacement cache holds at most C{constants.k} contacts; when it is
        full, the contact at the head of the list is dropped to make room.

        @param contact: The contact to add to this node's k-buckets
        @type contact: kademlia.contact.Contact
        """
        if contact.id == self._parentNodeID:
            # The host node is never stored in its own routing table
            return

        # Initialize/reset the "successively failed RPC" counter
        contact.failedRPCs = 0

        bucketIndex = self._kbucketIndex(contact.id)
        try:
            self._buckets[bucketIndex].addContact(contact)
        except kbucket.BucketFull:
            # The bucket is full; see if it can be split (by checking if its range includes the host node's id)
            if self._buckets[bucketIndex].keyInRange(self._parentNodeID):
                self._splitBucket(bucketIndex)
                # Retry the insertion attempt
                self.addContact(contact)
            else:
                # We can't split the k-bucket
                # NOTE: This implementation follows section 4.1 of the 13 page version
                # of the Kademlia paper (optimized contact accounting without PINGs
                #- results in much less network traffic, at the expense of some memory)

                # Put the new contact in our replacement cache for the corresponding k-bucket (or update its position if it exists already)
                # setdefault() replaces the Python-2-only dict.has_key() check
                replacements = self._replacementCache.setdefault(bucketIndex, [])
                if contact in replacements:
                    replacements.remove(contact)
                #TODO: Using k to limit the size of the contact replacement cache - maybe define a separate value for this in constants.py?
                # The limit applies to this bucket's contact list (not to the
                # number of buckets in the cache dict), and the eviction must
                # remove the first list entry, not dict key 0.
                elif len(replacements) >= constants.k:
                    replacements.pop(0)
                replacements.append(contact)
项目:p2p-network    作者:PeARSearch    | 项目源码 | 文件源码
def addContact(self, contact):
        """ Add the given contact to the correct k-bucket; if it already
        exists, its status will be updated

        If the target k-bucket is full and its range includes this node's own
        id, the bucket is split and the insertion is retried. Otherwise the
        contact is placed at the tail of the replacement cache kept for that
        bucket (moving it there if it is already cached). Each per-bucket
        replacement cache holds at most C{constants.k} contacts; when it is
        full, the contact at the head of the list is dropped to make room.

        @param contact: The contact to add to this node's k-buckets
        @type contact: kademlia.contact.Contact
        """
        if contact.id == self._parentNodeID:
            # The host node is never stored in its own routing table
            return

        # Initialize/reset the "successively failed RPC" counter
        contact.failedRPCs = 0

        bucketIndex = self._kbucketIndex(contact.id)
        try:
            self._buckets[bucketIndex].addContact(contact)
        except kbucket.BucketFull:
            # The bucket is full; see if it can be split (by checking if its range includes the host node's id)
            if self._buckets[bucketIndex].keyInRange(self._parentNodeID):
                self._splitBucket(bucketIndex)
                # Retry the insertion attempt
                self.addContact(contact)
            else:
                # We can't split the k-bucket
                # NOTE: This implementation follows section 4.1 of the 13 page version
                # of the Kademlia paper (optimized contact accounting without PINGs
                #- results in much less network traffic, at the expense of some memory)

                # Put the new contact in our replacement cache for the corresponding k-bucket (or update its position if it exists already)
                # setdefault() replaces the Python-2-only dict.has_key() check
                replacements = self._replacementCache.setdefault(bucketIndex, [])
                if contact in replacements:
                    replacements.remove(contact)
                #TODO: Using k to limit the size of the contact replacement cache - maybe define a separate value for this in constants.py?
                # The limit applies to this bucket's contact list (not to the
                # number of buckets in the cache dict), and the eviction must
                # remove the first list entry, not dict key 0.
                elif len(replacements) >= constants.k:
                    replacements.pop(0)
                replacements.append(contact)
项目:regulately    作者:regulately    | 项目源码 | 文件源码
def get_category_documents(category, document_type, results_per_page, comments_open):
    """
    Get a list of documents in a particular category

    Explanation of additional parameters used in API query:
        - cp: comment period. Set to O (Open) or C (Closed)
        - cs: comment period closing soon. Set to 90 (days until closing); only sent when comments are open
        - rpp: results per page. Rounded up to the nearest value the API accepts

    :param str category: Category of docket. One of the keys to REGULATION_CATEGORIES stored in constants.py
    :param str document_type: One of N: Notice, PR: Proposed Rule, FR: Rule, O: Other, SR: Supporting & Related Material
        PS: Public Submission
    :param int results_per_page: Number of records to return per query (since we're not doing a paged query).
        Rounded up to the smallest allowed page size that is at least this large; requests above the largest
        allowed size are capped at that size.
    :param bool comments_open: Return results that have comments open or not
    :return: The decoded JSON body of the API response, returned unmodified.
    """
    # The results per page search parameter can only take the following values.
    # Use >= so that asking for exactly 10/25/100/500/1000 is honoured as-is, and
    # fall back to the largest size instead of failing when the request exceeds it.
    allowed_page_sizes = (10, 25, 100, 500, 1000)
    rpp = next((size for size in allowed_page_sizes if size >= results_per_page),
               allowed_page_sizes[-1])

    search_parameters = {'api_key': REG_API_KEY,
                         'cat': category,
                         'dct': document_type,
                         'rpp': rpp}
    if comments_open:
        search_parameters['cp'] = 'O'
        search_parameters['cs'] = 90
    else:
        search_parameters['cp'] = 'C'

    fetched_dockets = requests.get('https://api.data.gov/regulations/v3/documents', params=search_parameters)
    dockets_obj = fetched_dockets.json()

    return dockets_obj
项目:regulately    作者:regulately    | 项目源码 | 文件源码
def get_docket(document, category,):
    """
    Fetch the docket that a document record belongs to

    :param document: A document record returned by the regulations.gov API. It is read like a mapping
        ('docketId', 'openForComment' and 'commentDueDate' are looked up on it).
    :param str category: Category of docket. One of the keys to REGULATION_CATEGORIES stored in constants.py
    :return: The decoded JSON docket with 'categoryId', 'category' and 'openForComment' added, after being passed
        through add_sort_date. None if the document has no 'docketId' or the API response status is not 200.
    """
    # Without a docket id there is nothing to look up
    if 'docketId' not in document:
        return None

    response = requests.get(
        'https://api.data.gov/regulations/v3/docket',
        params={'api_key': REG_API_KEY, 'docketId': document['docketId']},
    )
    # Any non-200 answer is treated as "no docket"
    if response.status_code != 200:
        return None

    # Add category field and whether open for comment
    docket = response.json()
    docket['categoryId'] = category
    docket['category'] = constants.REGULATION_CATEGORIES[category]
    docket['openForComment'] = document.get('openForComment')

    # Define date information for sorting
    return add_sort_date(docket, document.get('commentDueDate'))