Python utils 模块,get_time() 实例源码

我们从Python开源项目中,提取了以下11个代码示例,用于说明如何使用utils.get_time()

项目:BaiduCloudHelper    作者:yp05327    | 项目源码 | 文件源码
def get_token(self):
        '''
        ??getapi??token

        :returns: False?????
        '''

        passport_getapi_url = passport_url + 'getapi&tpl=netdisk&apiver=v3&tt=%s' % utils.get_time()
        passport_getapi_url += '&class=login&logintype=basicLogin&callback=bd__cbs__' + utils.get_callback_function()
        passport_getapi_response = self.get_response(passport_getapi_url)

        json = utils.get_json_from_response(passport_getapi_response)

        try:
            json = eval(json[0])
            self.token = json['data']['token']
        except Exception:
            utils.show_msg(traceback.print_exc())
            utils.show_msg('???Can\'t get passport getapi\'s response json.')
            return False

        return True
项目:RFR-solution    作者:baoblackcoal    | 项目源码 | 文件源码
def play(self, n_step=10000, n_episode=1000, test_ep=0.05, render=False):
    if test_ep == None:
      test_ep = self.ep_end

    test_history = History(self.config, self.ob_shape_list)

    if not self.display:
      gym_dir = '/tmp/%s-%s' % (self.env_name, get_time())
      self.env.env.monitor.start(gym_dir)

    best_reward, best_idx = 0, 0
    for idx in xrange(n_episode):
      screen, reward, action, terminal = self.env.new_random_game()
      current_reward = 0

      for _ in range(self.history_length):
        test_history.add(screen)

      for t in tqdm(range(n_step), ncols=70):
        # 1. predict
        action = self.predict(test_history.get(), test_ep)
        # 2. act
        screen, reward, terminal = self.env.act(action, is_training=False)
        # 3. observe
        test_history.add(screen)

        current_reward += reward
        if terminal:
          break

      if current_reward > best_reward:
        best_reward = current_reward
        best_idx = idx

      print "=" * 30
      print " [%d] Best reward : %d" % (best_idx, best_reward)
      print "=" * 30

    if not self.display:
      self.env.env.monitor.close()
      # gym.upload(gym_dir, writeup='https://github.com/devsisters/DQN-tensorflow', api_key='')
项目:BaiduCloudHelper    作者:yp05327    | 项目源码 | 文件源码
def do_pan_api(self, api, args):
        '''
        ??????api

        :param api: ?????api
        :param args: string????
        :returns: ??True or False
        '''
        api_url = pan_api_url + api + '?'
        api_url += 'channel=chunlei&clienttype=0&web=1&t=%s' % utils.get_time()
        api_url += '&bdstoken=' + self.token

        for arg in args:
            api_url += '&%s=%s' % (arg, args[arg])

        pan_api_response = self.get_response(api_url)

        json = pan_api_response

        try:
            json = eval(json)
            errno = str(json['errno'])

            if errno == '0':
                return json['list']
        except Exception:
            utils.show_msg(traceback.print_exc())
            utils.show_msg("??:Can't get pan api:" + api + " response json.")
            return False

        # ????
        utils.show_msg('??:?????api?' + api + '?????????' + errno + '??????' + errmsg.get_errmsg_by_errno(errno))
        return False
项目:BaiduCloudHelper    作者:yp05327    | 项目源码 | 文件源码
def get_list(self, dir, page=None, page_size=None, order='name', desc='1'):
        '''
        ?????????????

        ????????string??

        :param dir?????
        :param page????
        :param page_size??????????20
        :param order: ????
                      ???time  ????
                           name  ???
                           size  ??????????
        :param desc?1????0?????????
        :returns: dict???????server_filename?path?unicode???????False
        '''

        args = {
            "_": utils.get_time(),
            "dir": urllib.quote(dir),
            "order": order,
            "desc" : desc,
        }

        if page is not None:
            args['page'] = page
        if page_size is not None:
            args['num'] = page_size

        result = self.do_pan_api('list', args)

        if result != False:
            for file in result:
                file['server_filename'] = eval('u\'' + file['server_filename'] + '\'')
                file['path'] = eval('u\'' + file['path'] + '\'.replace(\'\\\\\',\'\')')

            self.file_list[dir] = result

        return result
项目:BaiduCloudHelper    作者:yp05327    | 项目源码 | 文件源码
def get_baiducloudclient_url(self, dir):
        headers = {
            'User-Agent': 'netdisk;2.1.0;pc;pc-mac;10.12.5;macbaiduyunguanjia'
        }

        # ??????????????????
        url = 'https://pan.baidu.com/rest/2.0/membership/speeds/freqctrl'
        postdata = {
            'method': 'consume'
        }
        '''
        get????????????freq_cnt=1?????
        consume?????
        '''

        try:
            responese = self.get_response(url, post_data=postdata, headers=headers)

        except Exception:
            utils.show_msg(traceback.print_exc())
            utils.show_msg('???Get file size failed.url %s.' % url)
            return False

        # ????
        url = 'https://d.pcs.baidu.com/rest/2.0/pcs/file?time=' + utils.get_time() + '&clienttype=21&version=2.1.0&vip=0&method=locatedownload&app_id=250528&esl=1&ver=4.0&dtype=1&ehps=1&check_blue=1&path=' + dir + '&err_ver=1.0'

        try:
            response = self.get_response(url, headers=headers)

        except Exception:
            utils.show_msg(traceback.print_exc())
            utils.show_msg('???Get file size failed.url %s.' % url)
            return False

        # ?????url
        url_info = json.loads(response)

        return url_info['urls'][0]['url']
项目:automatic-repo    作者:WZQ1397    | 项目源码 | 文件源码
def rpush(self, msg, timeout=0):
        self._conn.rpush(self.queue, msg)
        if timeout:
            msg = "%s%s"%(msg,get_uuid())
            self.zadd(get_time()+timeout ,msg)
            return msg
        return True
项目:automatic-repo    作者:WZQ1397    | 项目源码 | 文件源码
def zrangebyscore(self):
        res = self._conn.zrangebyscore(self.ack_queue , 0, get_time())
        if not isinstance(res,list):
            res = [res]
        return res

#    def sadd(self, queue, value):
#        return self._conn.sadd(queue, value)
#
#    def spop(self, queue):
#        msg = self._conn.spop(queue)
#        return msg if msg else msg
#
#    def srem(self, queue, value):
#        return self._conn.srem(queue, value)
项目:async-rl-tensorflow    作者:datavizweb    | 项目源码 | 文件源码
def play(self, sv, is_chief, n_step=10000, n_episode=100, test_ep=None, render=False):
    if test_ep == None:
      test_ep = self.ep_end

    test_history = History(self.config)

    if not self.display:
      gym_dir = '/tmp/%s-%s' % (self.env_name, get_time())
      self.env.env.monitor.start(gym_dir)

    best_reward, best_idx = 0, 0
    for idx in xrange(n_episode):
      screen, reward, action, terminal = self.env.new_random_game()
      current_reward = 0

      for _ in xrange(self.history_length):
        test_history.add(screen)

      for t in tqdm(xrange(n_step), ncols=70):
        # 1. predict
        action = self.predict(test_history.get(), test_ep)
        # 2. act
        screen, reward, terminal = self.env.act(action, is_training=False)
        # 3. observe
        test_history.add(screen)

        current_reward += reward
        if terminal:
          break

      if current_reward > best_reward:
        best_reward = current_reward
        best_idx = idx

      print "="*30
      print " [%d] Best reward : %d" % (best_idx, best_reward)
      print "="*30

    if not self.display:
      self.env.env.monitor.close()
      #gym.upload(gym_dir, writeup='https://github.com/devsisters/DQN-tensorflow', api_key='')
项目:BaiduCloudHelper    作者:yp05327    | 项目源码 | 文件源码
def check_login(self, username):
        '''
        ?????????token?codestring

        :param username: ???
        :returns: ??????string????codestring
                  0????None?????
        '''

        response =  self.get_response(home_url)

        if response == '':
            return False
        else:
            # ??dv
            try:
                tmp = re.findall('id=\"dv_Input\" type=\"hidden\" value=\"(.*?)\"', response)

            except Exception:
                utils.show_msg(traceback.print_exc())
                utils.show_msg('???Can\'t get dv_Input.')
                return False

        codestring = None

        if not self.get_token():
            return False

        # logincheck
        passport_logincheck_url = passport_url + 'logincheck&&token=%s' % self.token
        passport_logincheck_url += '&tpl=netdisk&apiver=v3&tt=%s' % utils.get_time()
        passport_logincheck_url += '&username=%s' % urllib.quote(username)
        passport_logincheck_url += '&isphone=false&callback=bd__cbs__' + utils.get_callback_function()
        passport_logincheck_response = self.get_response(passport_logincheck_url)

        json = utils.get_json_from_response(passport_logincheck_response)

        try:
            json = eval(json[0])
            codeString = json['data']['codeString']

        except Exception:
            utils.show_msg(traceback.print_exc())
            utils.show_msg('??:Can\'t get passport logincheck\'s response json.')
            return False

        return codeString
项目:ERL    作者:NoListen    | 项目源码 | 文件源码
def play(self, n_step=10000, n_episode=100, test_ep=True, render=False):
    if test_ep == None:
      test_ep = self.ep_end

    test_history = History(self.config)

    if not self.display:
      gym_dir = './tmp/%s-%s' % (self.env_name, get_time())
      # self.env.env.monitor.start(gym_dir)
      monitor = gym.wrappers.Monitor(self.env.env, gym_dir)

    best_reward, best_idx = 0, 0
    ep_rewards = []
    for idx in tqdm(xrange(n_episode),ncols=70):
      screen = monitor.reset()
      screen = imresize(rgb2gray(screen), (110, 84))
      screen = screen[18:102, :]
      current_reward = 0

      # if not os.path.exists("fuck/epoch%i" % idx):
        # os.mkdir("fuck/epoch%i" % idx)
      for _ in range(self.history_length):
        test_history.add(screen)

      for t in range(n_step):
        # 1. predict
        action = self.predict(test_history.get(), test_ep)
        # 2. act
        screen, reward, terminal, _ = monitor.step(action)
        screen = imresize(rgb2gray(screen), (110, 84))
        screen = screen[18:102, :]
        # 3. observe
        test_history.add(screen)

        current_reward += reward
        if terminal:

          break

      print "GET REWARD", current_reward
      ep_rewards.append(current_reward)
      if current_reward > best_reward:
        best_reward = current_reward
        best_idx = idx


    print "="*30
    print " [%d] Best reward : %d" % (best_idx, best_reward),
    print "Average reward: %f" %  np.mean(ep_rewards)
    print "="*30

    if not self.display:
      monitor.close()
      #gym.upload(gym_dir, writeup='https://github.com/devsisters/DQN-tensorflow', api_key='')
项目:ERL    作者:NoListen    | 项目源码 | 文件源码
def play(self, n_step=10000, n_episode=100, test_ep=None, render=False):
    if test_ep == None:
      test_ep = self.ep_end

    test_history = History(self.config)

    if not self.display:
      gym_dir = './tmp/%s-%s' % (self.env_name, get_time())
      # self.env.env.monitor.start(gym_dir)
      monitor = gym.wrappers.Monitor(self.env.env, gym_dir)

    best_reward, best_idx = 0, 0
    for idx in tqdm(xrange(n_episode),ncols=70):
      screen, reward, action, terminal = self.env.new_random_game()
      current_reward = 0

      for _ in range(self.history_length):
        test_history.add(screen)

      for t in range(n_step):
        # 1. predict
        action = self.predict(test_history.get(), test_ep)
        # 2. act
        screen, reward, terminal = self.env.act(action, is_training=False)
        # 3. observe
        test_history.add(screen)

        current_reward += reward
        if terminal:
          break

      if current_reward > best_reward:
        best_reward = current_reward
        best_idx = idx

    print "="*30
    print " [%d] Best reward : %d" % (best_idx, best_reward)
    print "="*30

    if not self.display:
      monitor.close()
      #gym.upload(gym_dir, writeup='https://github.com/devsisters/DQN-tensorflow', api_key='')