Python time 模块,sleep() 实例源码

我们从Python开源项目中,提取了以下22个代码示例,用于说明如何使用time.sleep()。

项目:zsky    作者:wenguonideshou    | 项目源码 | 文件源码
def recvall(the_socket, timeout=5):
    """Collect everything a non-blocking socket delivers until it goes quiet.

    The socket is switched to non-blocking mode and polled every 50 ms.
    Once at least one chunk has arrived, the function returns after
    ``timeout`` seconds without new data; if nothing has arrived at all it
    gives up after ``2 * timeout`` seconds.

    :param the_socket: object providing ``setblocking`` and ``recv``
    :param timeout: idle time in seconds before giving up
    :return: all received chunks joined with ``""``
    """
    the_socket.setblocking(0)
    chunks = []
    last_activity = time()

    while True:
        sleep(0.05)
        idle = time() - last_activity
        # Be twice as patient while nothing has been received yet.
        allowed_idle = timeout if chunks else timeout * 2
        if idle > allowed_idle:
            break
        try:
            piece = the_socket.recv(1024)
            if piece:
                chunks.append(piece)
                last_activity = time()
        except Exception:
            # Any failure while receiving is ignored; the next poll retries.
            pass
    return "".join(chunks)
项目:chxanalys    作者:yugangzhang    | 项目源码 | 文件源码
def displayResult(self):
        """Plot magnitude and phase of the reconstruction and the aperture.

        Four panels are drawn in a 2x2 grid on figure 101, each with
        ``origin='lower'``, and the figure canvas is redrawn at the end.
        """
        fig = plt.figure(101)
        # (subplot position, transform, attribute name, title, draw before title?)
        panels = (
            (221, np.abs, 'reconstruction', 'Reconstruction Magnitude', True),
            (222, np.angle, 'reconstruction', 'Reconstruction Phase', True),
            (223, np.abs, 'aperture', "Aperture Magnitude", False),
            (224, np.angle, 'aperture', "Aperture Phase", False),
        )
        for position, transform, attr_name, caption, draw_first in panels:
            plt.subplot(position)
            plt.imshow(transform(getattr(self, attr_name)), origin='lower')
            if draw_first:
                plt.draw()
                plt.title(caption)
            else:
                plt.title(caption)
                plt.draw()
        fig.canvas.draw()


        #fig.tight_layout()
        # display.display(fig)
        # display.clear_output(wait=True)
        # time.sleep(.00001)
项目:assetsweeper    作者:guardian    | 项目源码 | 文件源码
def do_real_import(self, vsfile, filepath,mdXML,import_tags):
        """
        Make the import call to vidispine, and wait for self._importer_timeout seconds for the job to complete.

        While the job is unfinished its status is logged and refreshed every 5 seconds.
        Once the job has run longer than self._close_file_timeout the file is marked
        "CLOSED" exactly once.

        Raises ImportStalled if the timeout occurs (after sending an abort to the job).
        A job failure is expected to surface as an exception from
        import_job.update(noraise=False) -- presumably a VSException; confirm against the client library.

        :param vsfile: VSFile object to import
        :param filepath: filepath of the VSFile
        :param mdXML: compiled metadata XML to import alongside the media
        :param import_tags: shape tags describing required transcodes
        :return: None
        """
        import_job = vsfile.importToItem(mdXML, tags=import_tags, priority="LOW", jobMetadata={"gnm_app": "vsingester"})

        job_start_time = time.time()
        close_sent = False
        while import_job.finished() is False:
            self.logger.info("\tJob status is %s" % import_job.status())
            if time.time() - job_start_time > self._importer_timeout:
                self.logger.error("\tJob has taken more than {0} seconds to complete, concluding that it must be stalled.".format(self._importer_timeout))
                import_job.abort()
                self.logger.error("\tSent abort signal to job")
                raise ImportStalled(filepath)
            if time.time() - job_start_time > self._close_file_timeout and not close_sent:
                vsfile.setState("CLOSED")
                # Remember that the close was sent so it is not repeated on every poll.
                close_sent = True
            sleep(5)
            import_job.update(noraise=False)
项目:CoPilot-InfotainmentSystem    作者:Joelzeller    | 项目源码 | 文件源码
def read_temp():
    """Read the sensor and store the Fahrenheit value in the global ``temp_f``.

    ``read_temp_raw()`` is called repeatedly (without pausing) until the
    first line ends in ``YES``.  If the second line contains ``t=``, the
    number after it is divided by 1000, converted from Celsius to
    Fahrenheit and stored in ``temp_f`` as a string with no decimals.
    Otherwise ``temp_f`` is left untouched.
    """
    global temp_f
    raw = read_temp_raw()
    while True:
        if raw[0].strip()[-3:] == 'YES':
            break
        raw = read_temp_raw()
    marker = raw[1].find('t=')
    if marker == -1:
        return
    reading = raw[1][marker+2:]
    celsius = float(reading) / 1000.0
    fahrenheit = celsius * 9.0 / 5.0 + 32.0
    temp_f = "{0:.0f}".format(fahrenheit)  # whole numbers only


#_________________________________________________________________
        #VARIABLES
项目:marconibot    作者:s4w3d0ff    | 项目源码 | 文件源码
def wait(i=10):
    """Sleep for ``i`` seconds after logging the delay at debug level.

    The log line reports the delay in seconds and in minutes.
    """
    minutes = i / 60.0
    logger.debug('Waiting %d sec... (%.2fmin)', i, minutes)
    sleep(i)
项目:zsky    作者:wenguonideshou    | 项目源码 | 文件源码
def auto_send_find_node(self):
        """Endlessly pop queued nodes and send each a find_node request.

        Every iteration takes one node from the left of ``self.nodes`` (an
        empty queue is skipped silently) and then sleeps for
        ``1.0 / self.max_node_qsize`` seconds.  A KeyboardInterrupt raised
        during the sleep terminates the process via ``os._exit(0)``.
        """
        pause = 1.0 / self.max_node_qsize
        while True:
            try:
                target = self.nodes.popleft()
                address = (target.ip, target.port)
                self.send_find_node(address, target.nid)
            except IndexError:
                # Nothing queued right now (or the send raised IndexError).
                pass
            try:
                sleep(pause)
            except KeyboardInterrupt:
                os._exit(0)
项目:zsky    作者:wenguonideshou    | 项目源码 | 文件源码
def fetch_torrent(session, ih, timeout):
        """Fetch the metadata of a torrent identified by its info-hash.

        A magnet URI is built from ``ih`` and added to ``session``; the handle
        is then polled once per second, for at most ``timeout`` polls, until
        metadata is available.  Anything found at the download path derived
        from the torrent name is deleted and the torrent is removed from the
        session before returning.

        :param session: libtorrent session the magnet link is added to
        :param ih: info-hash string (upper-cased before use)
        :param timeout: maximum number of one-second polls
        :return: the value of ``info.metadata()``, or None if adding the
                 magnet failed or no metadata arrived in time
        """
        import shutil

        name = ih.upper()
        url = 'magnet:?xt=urn:btih:%s' % (name,)
        params = {
            'save_path': '/tmp/downloads/',
            'storage_mode': lt.storage_mode_t(2),
            'paused': False,
            'auto_managed': False,
            'duplicate_is_error': True}
        try:
            handle = lt.add_magnet_uri(session, url, params)
        except Exception:
            # Adding the magnet failed (e.g. duplicate torrent); nothing to fetch.
            return None
        handle.set_sequential_download(1)
        meta = None
        down_path = None
        for _ in range(timeout):
            if handle.has_metadata():
                info = handle.get_torrent_info()
                down_path = '/tmp/downloads/%s' % info.name()
                meta = info.metadata()
                break
            time.sleep(1)
        if down_path and os.path.exists(down_path):
            # The torrent name is untrusted input, so it must never reach a
            # shell; remove the path through the filesystem API instead.
            if os.path.isdir(down_path) and not os.path.islink(down_path):
                shutil.rmtree(down_path, ignore_errors=True)
            else:
                try:
                    os.remove(down_path)
                except OSError:
                    # Best-effort cleanup, matching the old `rm -rf` behaviour.
                    pass
        session.remove_torrent(handle)
        return meta
项目:CellsCycle    作者:AQuadroTeam    | 项目源码 | 文件源码
def _setToSlaveThread(logger,settings,  cache, master,url, queue, hostState):
    if(not master):
        return

    socket =  zmq.Context.instance().socket(zmq.PUSH)

    import time
    while hostState["current"] is None:
        logger.debug("cannot send to slave, net info: "+ str(hostState["current"]))
        time.sleep(1)

    slaveAddress = "tcp://"+hostState["current"].slave.ip + ":"+ str(settings.getSlaveSetPort())
    socket.connect(slaveAddress)
    oldAddress = "tcp://"+hostState["current"].slave.ip + ":"+ str(settings.getSlaveSetPort())

    logger.debug("Finally I'm configured")
    while True:

        objToSend = queue.get()

        if(slaveAddress != None):
            sended = False
            while( not sended):
                try:
                    slaveAddress = "tcp://"+hostState["current"].slave.ip + ":"+ str(settings.getSlaveSetPort())
                    if(slaveAddress != oldAddress):
                        oldAddress = slaveAddress
                        socket =  zmq.Context.instance().socket(zmq.PUSH)
                        socket.connect(slaveAddress)
                        logger.debug("Change of slave:" + slaveAddress)

                    socket.send(dumps(Command(SETCOMMAND, objToSend.key, objToSend.value)))

                    if(settings.isVerbose()):
                        logger.debug("sended current key to slave: "+str(objToSend.key) +" to " + str(slaveAddress))
                    sended = True
                except Exception as e:
                    logger.warning("error in slave: " + str(e))
项目:nc-backup-py    作者:ChinaNetCloud    | 项目源码 | 文件源码
def send_post_report(self):
        """Send the post report to the configured URL, retrying on failure.

        The payload from ``create_post_report()`` is sent through
        ``Communications.send_message``.  A 200 response ends the loop.  Any
        other status makes the method wait before the next attempt (60 s,
        then 120 s, then 360 s); when the attempt counter reaches 5 the
        failure is logged as critical and the process exits with status 1.
        """
        data_post = self.create_post_report()
        count = 1
        time_retry = 60
        message_config_command = self.__json_dict['GENERAL']['MESSAGE_CONFIG_COMMAND']
        message_config_method = self.__json_dict['GENERAL']['MESSAGE_CONFIG_METHOD']
        while count <= 5:
            request_to_brt = Communications.send_message(
                Communications(),
                data_post,
                message_config_command,
                message_config_method)
            self.__logger.info(
                'Report sent status: ' +
                str(request_to_brt.status_code) +
                ' <===> ' + request_to_brt.reason)
            print('Response from server:')
            attempt_notification = 'Attempt: ' + str(count)
            print(attempt_notification)
            self.__logger.info(attempt_notification)
            print (request_to_brt.status_code, request_to_brt.reason)
            self.__logger.info('Server response: ' + str(request_to_brt.status_code) + ' ' + str(request_to_brt.reason))
            # Delay schedule: 60s (1min), 120s (2min), 360s (6min), 1440s (24min), 7200s (2h).
            # NOTE(review): the last two delays are never reached because the
            # method exits as soon as count hits 5 -- confirm this is intended.
            time_retry = time_retry * count
            count = count + 1
            if request_to_brt.status_code == 200:
                self.__logger.info('Sent')
                break
            elif count != 5:
                # Value comparison, not identity: `is not 5` only worked by
                # accident of CPython's small-integer caching.
                attempt_failed_notification = 'The attempt to send report failed. Attempt number ' + \
                                              str(count) + ' will be in: ' + str(time_retry/60) + ' minutes.'
                print(attempt_failed_notification)
                self.__logger.warning(attempt_failed_notification)
                time.sleep(time_retry)
            else:
                attempt_failed_notification = 'Last attempt to send report FAILED, please check connectivity to BRT'
                print(attempt_failed_notification)
                self.__logger.critical(attempt_failed_notification)
                exit(1)
项目:RealtimePythonChat    作者:quangtqag    | 项目源码 | 文件源码
def patch_time():
    """Replace :func:`time.sleep` with :func:`gevent.sleep`."""
    from gevent.hub import sleep as gevent_sleep
    import time as time_module
    # Swap the blocking sleep on the time module for gevent's cooperative one.
    patch_item(time_module, 'sleep', gevent_sleep)
项目:isard    作者:isard-vdi    | 项目源码 | 文件源码
def polling_status(self):
        """Refresh hypervisor status repeatedly until ``self.stop`` is True.

        An ``UpdateStatus`` object is created once and stored on
        ``self.status_obj``.  After each refresh the method waits in 0.1 s
        slices until ``self.polling_interval`` has been accumulated, checking
        ``self.stop`` after every slice so a stop request is noticed quickly.
        """
        self.status_obj = UpdateStatus(self.hyp_id, self.hostname, port=self.port, user=self.user)

        while self.stop is not True:
            self.status_obj.update_status_hyps_rethink()
            waited = 0.0
            while waited < self.polling_interval:
                sleep(0.1)
                waited += 0.1
                if self.stop is True:
                    # Leave the wait early; the outer loop then exits too.
                    break
项目:isard    作者:isard-vdi    | 项目源码 | 文件源码
def polling_status(self):
        """Poll and publish hypervisor status until ``self.stop`` becomes True.

        Builds the ``UpdateStatus`` helper once (kept on ``self.status_obj``),
        then alternates between one status refresh and a wait of roughly
        ``self.polling_interval`` seconds.  The wait is split into 0.1 s
        sleeps so that ``self.stop`` is re-checked after each of them.
        """
        self.status_obj = UpdateStatus(self.hyp_id, self.hostname, port=self.port, user=self.user)

        while not (self.stop is True):
            self.status_obj.update_status_hyps_rethink()
            slept = 0.0
            while slept < self.polling_interval:
                sleep(0.1)
                slept = slept + 0.1
                if self.stop is True:
                    # Stop requested mid-wait: abandon the rest of the interval.
                    break
项目:grical    作者:wikical    | 项目源码 | 文件源码
def test_clone_of_event( self ):
        """Placeholder for checking that every class referencing Event
        implements a ``clone`` method taking only ``self`` and ``event``.

        Not implemented yet: the test currently does nothing.
        """
        # FIXME: implement the check described above
        # TODO check also a clone of another clone
        return None

    # TODO {{{2
    # This test can't work with SQLite because SQLite does not support multiple concurrent users.
    # It is recommended to use this test in the future.
    # def test_visibility_in_thread(self):
    #     "testing visibility public and private events in thread"
    #     class TestThread(threading.Thread):
    #         "thread with random delay"
    #         def __init__(self, test, user_nr):
    #             self.user_nr = user_nr
    #             self.test = test
    #             threading.Thread.__init__(self)
    #         def run(self):
    #             time.sleep(random.randint(0, 100)/100.0)
    #             self.test.user_test_visibility(self.user_nr)
    #     for user_nr in range(USERS_COUNT):
    #         thread = TestThread(self, user_nr)
    #         thread.start()
    #     for second in range(20, 0, -1):
    #         print "wait %d seconds     \r" % second,
    #         time.sleep(1)

    # TODO: test that a notification email is sent to all members of a group
    # when a new event is added to the group. See class Membership in
    # events/models.py
项目:PyTrader    作者:didw    | 项目源码 | 文件源码
def save_table(self, code, date):
        """Fetch two data tables for ``code`` and merge them into its HDF file.

        Each wrapper request is preceded by a 4 second pause.  The selected
        columns of the second table are joined to the first, rows at or
        before ``self.kiwoom.start_date`` are dropped, and the result is
        spliced onto the data already stored under key ``'day'`` (the last
        stored row is replaced by the fresh data).  A missing file or an
        empty stored table is printed and otherwise ignored.  Whatever
        ``data`` holds at the end is written back if it is non-empty.

        :param code: identifier used in the HDF file name
        :param date: passed through to both wrapper requests
        """
        request_gap = 4  # seconds to pause before each request
        time.sleep(request_gap)
        daily = self.wrapper.get_data_opt10081(code, date)
        time.sleep(request_gap)
        extra = self.wrapper.get_data_opt10086(code, date)
        wanted_columns = ['???', '???', '??(??)', '???', '??', '??', '????', '???', '????',
                          '???', '????', '????', '????', '?????', '?????', '?????', '?????']
        data = pd.concat([daily, extra.loc[:, wanted_columns]], axis=1)
        try:
            start_key = int(self.kiwoom.start_date.strftime("%Y%m%d"))
            data = data.loc[data.index > start_key]
            stored = pd.read_hdf("../data/hdf/%s.hdf" % code, 'day').sort_index()
            last_stored = stored.index[-1]
            # Keep everything before the last stored row; fresh data covers the rest.
            stored = stored.loc[stored.index < last_stored]
            data = data.loc[data.index >= last_stored]
            data = pd.concat([stored, data], axis=0)
        except (FileNotFoundError, IndexError) as e:
            print(e)
        finally:
            # Runs on every path, so the table is saved even without prior data.
            data.index.name = '??'
            if len(data):
                data.to_hdf('../data/hdf/%s.hdf'%code, 'day', mode='w')
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def patch_time():
    """Replace :func:`time.sleep` with :func:`gevent.sleep`."""
    from gevent.hub import sleep as cooperative_sleep
    import time as stdlib_time
    # After this call, time.sleep refers to gevent's sleep.
    patch_item(stdlib_time, 'sleep', cooperative_sleep)
项目:AutoTrade    作者:changye    | 项目源码 | 文件源码
def balance(self):
        """Return the balance after trying to refresh it.

        ``_get_balance()`` is called until it reports success or the retry
        limit is reached, sleeping for the retry interval after each failure.
        The stored balance is returned either way.
        """
        attempts = 0
        while attempts < self.__retry_time:
            if self._get_balance():
                break
            sleep(self.__retry_interval)
            attempts += 1
        return self.__balance

    # @property
项目:AutoTrade    作者:changye    | 项目源码 | 文件源码
def stock_position(self):
        """Return the stock position after trying to refresh it.

        ``_get_position()`` is called until it reports success or the retry
        limit is reached, sleeping for the retry interval after each failure.
        The stored position is returned either way.
        """
        attempts = 0
        while attempts < self.__retry_time:
            if self._get_position():
                break
            sleep(self.__retry_interval)
            attempts += 1
        return self.__stock_position

    # @property
项目:AutoTrade    作者:changye    | 项目源码 | 文件源码
def cancel_list(self):
        """Return the cancel list after trying to refresh it.

        ``_get_cancel_list()`` is called until it reports success or the retry
        limit is reached, sleeping for the retry interval after each failure.
        The stored list is returned either way.
        """
        attempts = 0
        while attempts < self.__retry_time:
            if self._get_cancel_list():
                break
            sleep(self.__retry_interval)
            attempts += 1
        return self.__cancel_list

    # @property
项目:AutoTrade    作者:changye    | 项目源码 | 文件源码
def entrust_list(self):
        """Return the entrust list after trying to refresh it.

        ``_get_today_entrust()`` is called until it reports success or the
        retry limit is reached, sleeping for the retry interval after each
        failure.  The stored list is returned either way.
        """
        attempts = 0
        while attempts < self.__retry_time:
            if self._get_today_entrust():
                break
            sleep(self.__retry_interval)
            attempts += 1
        return self.__entrust_list

    # @property
项目:AutoTrade    作者:changye    | 项目源码 | 文件源码
def trade_list(self):
        """Return the trade list after trying to refresh it.

        ``_get_today_trade()`` is called until it reports success or the
        retry limit is reached, sleeping for the retry interval after each
        failure.  The stored list is returned either way.
        """
        attempts = 0
        while attempts < self.__retry_time:
            if self._get_today_trade():
                break
            sleep(self.__retry_interval)
            attempts += 1
        return self.__trade_list

    # ?????
项目:CellsCycle    作者:AQuadroTeam    | 项目源码 | 文件源码
def _memoryMetricatorThread(logger, cache, settings, master, timing):
    if master:
        period = settings.getScalePeriod()

        setScaleDownLevel   = settings.getSetScaleDownLevel()   if settings.getSetScaleDownLevel()  >0 else -float("inf")
        setScaleUpLevel     = settings.getSetScaleUpLevel()     if settings.getSetScaleUpLevel()    >0 else  float("inf")
        getScaleDownLevel   = settings.getGetScaleDownLevel()   if settings.getGetScaleDownLevel()  >0 else -float("inf")
        getScaleUpLevel     = settings.getGetScaleUpLevel()     if settings.getGetScaleUpLevel()    >0 else  float("inf")

        logger.debug("Metricator alive, period: "+ str(period) +"s, getThrLevel: [" +str(getScaleDownLevel) +"," + str(getScaleUpLevel)+ "], setThrLevel: [" + str(setScaleDownLevel) + "," + str(setScaleUpLevel) + "]"  )

        # this channel is necessary to send scale up/down requests
        internal_channel = InternalChannel(addr='127.0.0.1', port=settings.getIntPort(), logger=logger)
        internal_channel.generate_internal_channel_client_side()
        from random import gauss
        sleep(60)
        while True:

            sleep(abs(gauss(period, period/10)))
            locked = timing["setters"][0].isTransferring()
            setMean = 1.0 - timing["setters"][0].calcMean()
            getMean = 0.0
            for metr in timing["getters"]:
                getMean += 1.0 - metr.calcMean()
            getMean = getMean / settings.getGetterThreadNumber()

            logger.debug("Working time for setters: " + str(setMean) + ", getters (mean): " + str(getMean) )

            # scale up needed
            if getMean >= getScaleUpLevel or setMean >= setScaleUpLevel and not locked:

                logger.debug("Requests for scale Up!")
                # call scale up service
                ListThread.notify_scale_up(internal_channel)
                # self.list_communication_thread.notify_scale_up()

            # scale down needed
            elif getMean <= getScaleDownLevel and setMean <= setScaleDownLevel and not locked:

                logger.debug("Requests for scale Down!")
                # call scale down service
                ListThread.notify_scale_down(internal_channel)
                # self.list_communication_thread.notify_scale_down()
项目:subDomainsBrute    作者:0xa-saline    | 项目源码 | 文件源码
def getsearchresult_url(ip, url):
    """Parse one search-result page and collect the site roots it links to.

    The page at ``url`` is fetched with ``gethtml`` and every result under the
    element with id ``b_results`` that has both a link and a title is read
    (links whose host is ``ip.chinaz.com`` are skipped).  Each link is reduced
    to ``scheme://netloc/``; roots not yet present in the module-level
    ``check`` list are appended to ``check`` and, with their title, to the
    module-level ``end`` list.

    :param ip: unused here
    :param url: address of the result page to fetch
    :return: ``(0, next_url, end)`` when a next-page link exists,
             ``(1, None, end)`` when it does not, or the string ``'error'``
             when the page is empty, unparsable or any exception occurs
    """
    try:
        html = gethtml(url)
        if not html:
            print('??????url:%s' % url)
            return 'error'
        if html == 'error':  # the fetch itself reported a failure
            return 'error'
        root = etree.HTML(html)
        lines = root.xpath('//*[@id="b_results"]')  # container of the result entries
        if not lines:
            return 'error'
        lines = lines[0]

        urlandtitle = []
        for l in lines:
            hrefs = l.xpath('.//h2/a/@href')
            titles = l.xpath('.//h2/a/text()')
            if not (hrefs and titles):
                continue
            href = hrefs[0]
            if urlparse(href).netloc == 'ip.chinaz.com':
                continue
            urlandtitle.append({'url': href, 'title': titles[0]})

        for u in urlandtitle:
            title = u['title']
            parser = urlparse(u['url'])
            url = parser.scheme + '://' + parser.netloc + '/'
            if url not in check:
                end.append({'url': url, 'title': title})
                check.append(url)
        next_page = lines.xpath('.//*[@class="sb_pagN"]/@href')

        if len(next_page) > 0:
            url = 'https://www.bing.com'+next_page[0]
            return 0,url,end
        else:
            return 1,None,end
    except Exception as e:
        # Best-effort scraper: report the problem and signal failure to the caller.
        print(e)
        return 'error'