Python urllib.request module, Request() example source code

The following code examples, extracted from open-source Python projects, illustrate how to use urllib.request.Request().

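Before the project samples, here is a minimal self-contained sketch of the two Request() patterns that recur throughout the examples below: a GET with custom headers and a POST with URL-encoded form data. The httpbin.org endpoints are placeholders used purely for illustration.

from urllib import error, parse, request

# GET with a custom User-Agent header
req = request.Request('https://httpbin.org/get',
                      headers={'User-Agent': 'Mozilla/5.0'})
try:
    with request.urlopen(req, timeout=10) as resp:
        body = resp.read().decode('utf-8')  # responses are bytes; decode them
except error.HTTPError as e:  # HTTPError subclasses URLError, so catch it first
    print('HTTP error:', e.code)
except error.URLError as e:
    print('failed to reach the server:', e.reason)

# POST: passing a bytes data payload switches the request method to POST
params = parse.urlencode({'key': 'value'}).encode('utf-8')
post_req = request.Request('https://httpbin.org/post', data=params)
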
Project: SmallReptileTraining    Author: yanbober
def download(self, url, retry_count=3, headers=None, proxy=None, data=None):
        if url is None:
            return None
        try:
            req = request.Request(url, headers=headers, data=data)
            cookie = cookiejar.CookieJar()
            cookie_process = request.HTTPCookieProcessor(cookie)
            opener = request.build_opener(cookie_process)
            if proxy:
                proxies = {urlparse(url).scheme: proxy}
                opener.add_handler(request.ProxyHandler(proxies))
            content = opener.open(req).read()
        except error.URLError as e:
            print('HtmlDownLoader download error:', e.reason)
            content = None
            if retry_count > 0:
                if hasattr(e, 'code') and 500 <= e.code < 600:
                    # an HTTPError with a 5xx status code means a server-side failure, so retry the download
                    return self.download(url, retry_count-1, headers, proxy, data)
        return content
Project: TuShare    Author: andyzsf
def get_cpi():
    """
        Get Consumer Price Index (CPI) data.
    Return
    --------
    DataFrame
        month : statistics month
        cpi : price index
    """
    rdint = vs.random()
    request = Request(vs.MACRO_URL%(vs.P_TYPE['http'], vs.DOMAINS['sina'],
                                    rdint, vs.MACRO_TYPE[1], 0, 600,
                                    rdint))
    text = urlopen(request,timeout=10).read()
    text = text.decode('gbk') if ct.PY3 else text
    regSym = re.compile(r'\,count:(.*?)\}')
    datastr = regSym.findall(text)
    datastr = datastr[0]
    datastr = datastr.split('data:')[1]
    js = json.loads(datastr)
    df = pd.DataFrame(js, columns=vs.CPI_COLS)
    df['cpi'] = df['cpi'].astype(float)
    return df
Project: python-    Author: secondtonone1
def workthread(item, user_agent,path):
    strurl = 'http://yxpjw.club'+item[0]
    picname = item[1]
    print('Downloading %s...........................\n' % (picname))
    req = request.Request(strurl)
    req.add_header('User-Agent',user_agent)
    response = request.urlopen(req)
    content = response.read().decode('gbk')
    strurl2 = re.search(r'^(.*)/',strurl).group(0)
    print('https headers...............%s'%(strurl2))
    #destname = os.path.join(path,picname+'.txt')
    #with open(destname, 'w',encoding='gbk') as file:
        #file.write(content)
    destdir = os.path.join(path,picname)
    os.makedirs(destdir)
    page = 1
    while(1):
        content = getpagedata(content,destdir,page,strurl2)
        if not content:
            break
        page = page + 1
    print('%s download finished\n' % (picname))
Project: TuShare    Author: andyzsf
def get_loan_rate():
    """
        Get loan interest rate data.
    Return
    --------
    DataFrame
        date : effective date
        loan_type : loan type
        rate : interest rate (%)
    """
    rdint = vs.random()
    request = Request(vs.MACRO_URL%(vs.P_TYPE['http'], vs.DOMAINS['sina'],
                                    rdint, vs.MACRO_TYPE[2], 3, 800,
                                    rdint))
    text = urlopen(request, timeout=10).read()
    text = text.decode('gbk')
    regSym = re.compile(r'\,count:(.*?)\}')
    datastr = regSym.findall(text)
    datastr = datastr[0]
    datastr = datastr.split('data:')[1]
    js = json.loads(datastr)
    df = pd.DataFrame(js, columns=vs.LOAN_COLS)
    for i in df.columns:
        df[i] = df[i].apply(lambda x:np.where(x is None, '--', x))
    return df
Project: python-    Author: secondtonone1
def requestData(url, user_agent):
    content = None
    try:
        req = request.Request(url)
        req.add_header('User-Agent', user_agent)
        response = request.urlopen(req, timeout=8)
        # decode the bytes response into a string
        content = response.read().decode('gbk')
    except error.HTTPError as e:
        # HTTPError is a subclass of URLError, so it must be caught first
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
        print('HTTPError!!!')
    except error.URLError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)

    return content
Project: python-    Author: secondtonone1
def requestData(self,url, user_agent):
        try:
            req = request.Request(url)
            req.add_header('User-Agent', user_agent)
            response = request.urlopen(req, timeout=8)
            # decode the bytes response into a string
            content = response.read().decode('utf-8')
            return content
        except error.HTTPError as e:
            # HTTPError is a subclass of URLError, so it must be caught first
            if hasattr(e, 'code'):
                print(e.code)
            if hasattr(e, 'reason'):
                print(e.reason)
            print('HTTPError!!!')
        except error.URLError as e:
            if hasattr(e, 'code'):
                print(e.code)
            if hasattr(e, 'reason'):
                print(e.reason)
Project: python-    Author: secondtonone1
def requestData(self,url, user_agent):
        try:
            req = request.Request(url)
            req.add_header('User-Agent', user_agent)
            response = request.urlopen(req, timeout=3)
            # decode the bytes response into a string
            content = response.read().decode('gbk')
            return content
        except error.HTTPError as e:
            # HTTPError is a subclass of URLError, so it must be caught first
            if hasattr(e, 'code'):
                print(e.code)
            if hasattr(e, 'reason'):
                print(e.reason)
            print('HTTPError!!!')
        except error.URLError as e:
            if hasattr(e, 'code'):
                print(e.code)
            if hasattr(e, 'reason'):
                print(e.reason)
Project: python-    Author: secondtonone1
def getAbstractInfo(self):

        try:
            req = request.Request(self.url)
            req.add_header('User-Agent', self.user_agent)
            response = request.urlopen(req)
            # decode the bytes response into a string
            content = response.read().decode('gbk')
            self.getDetailList(content)

        except error.HTTPError as e:
            # HTTPError is a subclass of URLError, so it must be caught first
            print('HTTPError!!!')
        except error.URLError as e:
            if hasattr(e, 'code'):
                print(e.code)
            if hasattr(e, 'reason'):
                print(e.reason)
Project: os-xenapi    Author: openstack
def test_download_and_verify_ok(self, mock_urlopen):
        mock_extract_tarball = self.mock_patch_object(
            self.glance.utils, 'extract_tarball')
        mock_md5 = mock.Mock()
        mock_md5.hexdigest.return_value = 'expect_cksum'
        mock_md5_new = self.mock_patch_object(
            self.glance.md5, 'new', mock_md5)
        mock_info = mock.Mock()
        mock_info.getheader.return_value = 'expect_cksum'
        mock_urlopen.return_value.info.return_value = mock_info
        fake_request = urllib2.Request('http://fakeurl.com')

        self.glance._download_tarball_and_verify(
            fake_request, 'fake_staging_path')
        mock_urlopen.assert_called_with(fake_request)
        mock_extract_tarball.assert_called_once()
        mock_md5_new.assert_called_once()
        mock_info.getheader.assert_called_once()
        mock_md5_new.return_value.hexdigest.assert_called_once()
Project: os-xenapi    Author: openstack
def test_download_ok_verify_failed(self, mock_urlopen):
        mock_extract_tarball = self.mock_patch_object(
            self.glance.utils, 'extract_tarball')
        mock_md5 = mock.Mock()
        mock_md5.hexdigest.return_value = 'unexpect_cksum'
        mock_md5_new = self.mock_patch_object(
            self.glance.md5, 'new', mock_md5)
        mock_info = mock.Mock()
        mock_info.getheader.return_value = 'expect_cksum'
        mock_urlopen.return_value.info.return_value = mock_info
        fake_request = urllib2.Request('http://fakeurl.com')

        self.assertRaises(self.glance.RetryableError,
                          self.glance._download_tarball_and_verify,
                          fake_request, 'fake_staging_path'
                          )
        mock_urlopen.assert_called_with(fake_request)
        mock_extract_tarball.assert_called_once()
        mock_md5_new.assert_called_once()
        mock_md5_new.return_value.hexdigest.assert_called_once()
Project: SoCFoundationFlow    Author: mattaw
def execute(self):
        if hasattr(Context.g_module, 'publish'):
            Context.Context.execute(self)
        mod = Context.g_module

        rfile = getattr(self, 'rfile', send_package_name())
        if not os.path.isfile(rfile):
            self.fatal('Create the release file with "waf release" first! %r' % rfile)

        fdata = Utils.readf(rfile, m='rb')
        data = safe_urlencode([('pkgdata', fdata), ('pkgname', mod.APPNAME), ('pkgver', mod.VERSION)])

        req = Request(get_upload_url(), data)
        response = urlopen(req, timeout=TIMEOUT)
        data = response.read().strip()

        if sys.hexversion>0x300000f:
            data = data.decode('utf-8')

        if data != 'ok':
            self.fatal('Could not publish the package %r' % data)
Project: SoCFoundationFlow    Author: mattaw
def compute_dependencies(self, filename=REQUIRES):
        text = Utils.readf(filename)
        data = safe_urlencode([('text', text)])

        if '--offline' in sys.argv:
            self.constraints = self.local_resolve(text)
        else:
            req = Request(get_resolve_url(), data)
            try:
                response = urlopen(req, timeout=TIMEOUT)
            except URLError as e:
                Logs.warn('The package server is down! %r' % e)
                self.constraints = self.local_resolve(text)
            else:
                ret = response.read()
                try:
                    ret = ret.decode('utf-8')
                except Exception:
                    pass
                self.trace(ret)
                self.constraints = parse_constraints(ret)
        self.check_errors()
Project: waybackscraper    Author: abrenaut
def list_archive_timestamps(url, min_date, max_date, user_agent):
    """
    List the available archive between min_date and max_date for the given URL
    """
    logger.info('Listing the archives for the url {url}'.format(url=url))

    # Construct the URL used to download the memento list
    parameters = {'url': url,
                  'output': 'json',
                  'from': min_date.strftime(WEB_ARCHIVE_TIMESTAMP_FORMAT),
                  'to': max_date.strftime(WEB_ARCHIVE_TIMESTAMP_FORMAT)}
    cdx_url = WEB_ARCHIVE_CDX_TEMPLATE.format(params=urlencode(parameters))

    req = Request(cdx_url, None, {'User-Agent': user_agent})
    with urlopen(req) as cdx:
        memento_json = cdx.read().decode("utf-8")

        timestamps = []
        # Ignore the first line which contains column names
        for url_key, timestamp, original, mime_type, status_code, digest, length in json.loads(memento_json)[1:]:
            # Ignore archives with a status code != OK
            if status_code == '200':
                timestamps.append(datetime.strptime(timestamp, WEB_ARCHIVE_TIMESTAMP_FORMAT))

    return timestamps
Project: CMSpider    Author: chengyu2333
def fetch_file(self, url, filename):
        # if not os.path.exists(filename):
        #     os.makedirs(filename)
        try:
            req = request.Request(url, headers=self.__headers)
            data = request.urlopen(req).read()
            with open(filename, 'wb') as f:
                f.write(data)
                f.flush()
                f.close()
                self.__url_manager.set_url_status(url, 2)
        except Exception as e:
            self.__url_manager.set_url_status(url, -1)
            raise e
        finally:
            time.sleep(config['basic']['sleep'])
Project: veneer-py    Author: flowmatters
def retrieve_csv(self,url):
        '''
        Retrieve data from the Veneer service, at the given url path, in CSV format.

        url: Path to required resource, relative to the root of the Veneer service.

        NOTE: CSV responses are currently only available for time series results
        '''
        if PRINT_URLS:
            print("*** %s ***" % (url))

        req = Request(self.base_url + quote(url+self.data_ext),headers={"Accept":"text/csv"})
        text = urlopen(req).read().decode('utf-8')

        result = utils.read_veneer_csv(text)
        if PRINT_ALL:
            print(result)
            print("")
        return result
Project: securedrop-reachability-monitor    Author: freedomofpress
def read_directory(self, directory_url):
        """Parses the SecureDrop directory into a dictionary of instance
        details."""
        # CloudFlare will block us if we don't set user-agent
        dir_req = Request(directory_url)
        dir_req.add_header("User-Agent", 
                           "Mozilla/5.0 (Windows NT 6.1; rv:45.0) "
                           "Gecko/20100101 Firefox/45.0")
        directory = urlopen(dir_req).read().decode()

        instances = []
        for line in directory.splitlines()[1:-1]:
            fields = line.split("\t")
            instances.append(dict(organization=fields[0],
                                  landing_page=fields[1],
                                  ths_address=fields[2]))

        return instances
Project: instagram_private_api    Author: ping
def respond_to_checkpoint(self, response_code):
        headers = {
            'User-Agent': self.USER_AGENT,
            'Origin': 'https://i.instagram.com',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US',
            'Accept-Encoding': 'gzip',
            'Referer': self.endpoint,
            'Cookie': self.cookie,
        }

        req = Request(self.endpoint, headers=headers)
        data = {'csrfmiddlewaretoken': self.csrftoken, 'response_code': response_code}
        res = urlopen(req, data=urlencode(data).encode('ascii'), timeout=self.timeout)

        if res.info().get('Content-Encoding') == 'gzip':
            buf = BytesIO(res.read())
            content = gzip.GzipFile(fileobj=buf).read().decode('utf-8')
        else:
            content = res.read().decode('utf-8')

        return res.code, content
Project: tvlinker    Author: ozmartian
def add_uri(self) -> None:
        user, passwd = '', ''
        if len(self.rpc_username) > 0 and len(self.rpc_password) > 0:
            user = self.rpc_username
            passwd = self.rpc_password
        elif len(self.rpc_secret) > 0:
            user = 'token'
            passwd = self.rpc_secret
        aria2_endpoint = '%s:%s/jsonrpc' % (self.rpc_host, self.rpc_port)
        headers = {'Content-Type': 'application/json'}
        payload = json.dumps({'jsonrpc': '2.0', 'id': 1, 'method': 'aria2.addUri',
                              'params': ['%s:%s' % (user, passwd), [self.link_url]]},
                             sort_keys=False).encode('utf-8')
        try:
            req = Request(aria2_endpoint, headers=headers, data=payload)
            res = urlopen(req).read().decode('utf-8')
            jsonres = json.loads(res)
            # res = requests.post(aria2_endpoint, headers=headers, data=payload)
            # jsonres = res.json()
            self.aria2Confirmation.emit('result' in jsonres.keys())
        except HTTPError:
            print(sys.exc_info())
            QMessageBox.critical(self, 'ERROR NOTIFICATION', str(sys.exc_info()), QMessageBox.Ok)
            self.aria2Confirmation.emit(False)
            # self.exit()
Project: ask_data_science    Author: AngelaVC
def __init__(self, url=None):
        self.url = url
        self.html = None
        self.links = []
        self.soup = None
        self.text = None
        self.title = None

        req = Request(self.url, headers={'User-Agent': "Magic Browser"})
        try:
            self.html = urlopen(req)
        except URLError as e:
            if hasattr(e, 'reason'):
                print('We failed to reach a server.')
                print('Reason: ', e.reason)

            elif hasattr(e, 'code'):
                print('The server couldn\'t fulfill the request.')
                print('Error code: ', e.code)
Project: TuShare    Author: andyzsf
def _day_cinema(date=None, pNo=1, retry_count=3, pause=0.001):
    ct._write_console()
    for _ in range(retry_count):
        time.sleep(pause)
        try:
            request = Request(ct.BOXOFFICE_CBD%(ct.P_TYPE['http'], ct.DOMAINS['mbox'],
                              ct.BOX, pNo, date))
            lines = urlopen(request, timeout = 10).read()
            if len(lines) < 15: #no data
                return None
        except Exception as e:
            print(e)
        else:
            js = json.loads(lines.decode('utf-8') if ct.PY3 else lines)
            df = pd.DataFrame(js['data1'])
            df = df.drop(['CinemaID'], axis=1)
            return df
Project: TuShare    Author: andyzsf
def _get_detail(tag, retry_count=3, pause=0.001):
    for _ in range(retry_count):
        time.sleep(pause)
        try:
            ct._write_console()
            request = Request(ct.SINA_DATA_DETAIL_URL%(ct.P_TYPE['http'],
                                                               ct.DOMAINS['vsf'], ct.PAGES['jv'],
                                                               tag))
            text = urlopen(request, timeout=10).read()
            text = text.decode('gbk')
        except _network_error_classes:
            pass
        else:
            reg = re.compile(r'\,(.*?)\:') 
            text = reg.sub(r',"\1":', text) 
            text = text.replace('"{symbol', '{"symbol')
            text = text.replace('{symbol', '{"symbol"')
            jstr = json.dumps(text)
            js = json.loads(jstr)
            df = pd.DataFrame(pd.read_json(js, dtype={'code':object}), columns=ct.THE_FIELDS)
            df = df[ct.FOR_CLASSIFY_B_COLS]
            return df
        raise IOError(ct.NETWORK_URL_ERROR_MSG)
Project: TuShare    Author: andyzsf
def _sz_hz(date='', retry_count=3, pause=0.001):
    for _ in range(retry_count):
        time.sleep(pause)
        ct._write_console()
        try:
            request = Request(rv.MAR_SZ_HZ_URL%(ct.P_TYPE['http'], ct.DOMAINS['szse'],
                                    ct.PAGES['szsefc'], date))
            lines = urlopen(request, timeout = 10).read()
            if len(lines) <= 200:
                return pd.DataFrame()
            df = pd.read_html(lines, skiprows=[0])[0]
            df.columns = rv.MAR_SZ_HZ_COLS
            df['opDate'] = date
        except Exception as e:
            print(e)
        else:
            return df
    raise IOError(ct.NETWORK_URL_ERROR_MSG)
Project: TuShare    Author: andyzsf
def _parase_fq_factor(code, start, end):
    symbol = _code_to_symbol(code)
    request = Request(ct.HIST_FQ_FACTOR_URL%(ct.P_TYPE['http'],
                                             ct.DOMAINS['vsf'], symbol))
    text = urlopen(request, timeout=10).read()
    text = text[1:len(text)-1]
    text = text.decode('utf-8') if ct.PY3 else text
    text = text.replace('{_', '{"')
    text = text.replace('total', '"total"')
    text = text.replace('data', '"data"')
    text = text.replace(':"', '":"')
    text = text.replace('",_', '","')
    text = text.replace('_', '-')
    text = json.loads(text)
    df = pd.DataFrame({'date':list(text['data'].keys()), 'factor':list(text['data'].values())})
    df['date'] = df['date'].map(_fun_except) # for null case
    if df['date'].dtypes == np.object:
        df['date'] = df['date'].astype(np.datetime64)
    df = df.drop_duplicates('date')
    df['factor'] = df['factor'].astype(float)
    return df
Project: Spider    Author: vincenth520
def weibo(content):
    st = get_st()
    add_weibo_url = 'https://m.weibo.cn/api/statuses/update'

    # build the request parameters
    params = {
        'content':content,
        'st':st
    }

    params = parse.urlencode(params).encode('utf-8')
    req = request.Request(add_weibo_url, params, method="POST")
    res = request.urlopen(req)
    html = res.read().decode('utf-8')
    print(html)

# log in to V2EX
Project: Spider    Author: vincenth520
def login():
    login_data = configparser.ConfigParser()
    login_data.read("user.ini")  # read the login credentials from the user.ini config file

    username = login_data.get("LoginInfo", "email")
    password = login_data.get("LoginInfo", "password")
    url = 'https://www.v2ex.com/signin'
    req = request.Request(url)
    res = request.urlopen(req)
    html = res.read().decode('gbk', 'ignore')  # ignore characters that fail to decode
    soup = BeautifulSoup(html, 'html.parser', from_encoding='utf-8')
    inputs = soup.find_all('input')

    # build the login form parameters
    params = {
        inputs[1]["name"]: username,
        inputs[2]["name"]: password,
        inputs[3]["name"]: inputs[3]["value"],
        inputs[5]["name"]: inputs[5]["value"]
    }
    params = parse.urlencode(params).encode('utf-8')

    req = request.Request(url, params, method="POST")  # submit the login form via POST
    res = request.urlopen(req)
Project: Spider    Author: vincenth520
def daily():
    url = 'https://www.v2ex.com/mission/daily'  # daily check-in page
    req = request.Request(url)
    res = request.urlopen(req)
    html = res.read().decode("utf-8")
    soup = BeautifulSoup(html, 'html.parser', from_encoding='utf-8')  # parse the returned page for its input elements
    inputs = soup.find_all('input')

    try:
        daily_link = 'https://www.v2ex.com' + re.search("location.href = '(.*)';", inputs[1]['onclick']).group(1)  # extract the check-in link
        if daily_link == 'https://www.v2ex.com/balance':
            print('Already checked in today')
            return False

        req = request.Request(daily_link)
        res = request.urlopen(req)

        html = res.read().decode("utf-8")

        soup = BeautifulSoup(html, 'html.parser', from_encoding='utf-8')

        print(soup.find('div',class_="message").text)
    except Exception as e:
        print('Check-in failed')
Project: Spider    Author: vincenth520
def getQrcode(uuid):
    print('Please scan the QR code.')

    url = 'https://login.weixin.qq.com/qrcode/' + uuid

    req = request.Request(url)
    res = request.urlopen(req)

    f = open(QRImagePath, 'wb')
    f.write(res.read())
    f.close()
    time.sleep(1)

    if sys.platform.find('darwin') >= 0:
        subprocess.call(['open', QRImagePath])
    elif sys.platform.find('linux') >= 0:
        subprocess.call(['xdg-open', QRImagePath])
    else:
        os.startfile(QRImagePath)
Project: Spider    Author: vincenth520
def is_login():
    uuid = get_uuid()
    getQrcode(uuid)
    while True:
        url = 'https://login.wx.qq.com/cgi-bin/mmwebwx-bin/login?loginicon=true&uuid='+ uuid +'&tip=0&r=' + str(time.time()) + '&_=' + str(time.time())
        req = request.Request(url)
        res = request.urlopen(req)
        html = res.read().decode('utf-8')
        status = re.search("window.code=(.*);",html).group(1)
        if str(status) == '200':
            print('login success.')
            index_url = re.search("window.redirect_uri=\"(.*)\"",html).group(1)

            # closeQRImage
            if sys.platform.find('darwin') >= 0:  # for OSX with Preview
                os.system("osascript -e 'quit app \"Preview\"'")

            return index_url
        elif str(status) == '201':
            print('confirm the login on the phone.')

# get the session parameters after login
Project: Spider    Author: vincenth520
def getSKey(url):
    global globalData
    req = request.Request(url+'&fun=new&version=v2&lang=zh_CN')
    res = request.urlopen(req)
    html = res.read().decode('utf-8')
    globalData = {}

    globalData['wxuin'] = re.search("<wxuin>(.*)</wxuin>",html).group(1)
    globalData['skey'] = re.search("<skey>(.*)</skey>",html).group(1)
    globalData['wxsid'] = re.search("<wxsid>(.*)</wxsid>",html).group(1)
    globalData['pass_ticket'] = re.search("<pass_ticket>(.*)</pass_ticket>",html).group(1)

    webwxinit()  # initialize the web WeChat session
    return globalData


# notify the server of the client status
Project: Spider    Author: vincenth520
def webwxstatusnotify():
    global globalData
    url = 'https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxstatusnotify?lang=zh_CN&pass_ticket=%s' %(globalData['pass_ticket'])
    params = {
        'BaseRequest':globalData['BaseRequest'],
        "Code":3,
        "FromUserName":globalData['UserName'],
        "ToUserName":globalData['UserName'],
        "ClientMsgId":math.floor(time.time()*1000)
    }
    params = json.dumps(params)
    params = bytes(params,'utf8')
    headers = {'Content-Type': 'application/json;charset=UTF-8'}
    req = request.Request(url,params,method="POST",headers=headers)
    res = request.urlopen(req)
    result = json.loads(res.read().decode('utf-8'))
    if result['BaseResponse']['Ret'] == 0:
        print('MsgID:%s' %(result['MsgID']))


# batch fetch contact information
Project: Spider    Author: vincenth520
def webwxbatchgetcontact():
    global globalData
    url = 'https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxbatchgetcontact?type=ex&r=%s&lang=zh_CN&pass_ticket=%s' %(int(time.time()),globalData['pass_ticket'])
    params = {
        'BaseRequest': globalData['BaseRequest'],
        "Count": 1,
        "List": [
            {
                "UserName": "@@a900dd2fe01da33c7d6b5db0b18eaeea678fa542570cd7dc2a9f7182605561de",
                "EncryChatRoomId": ""
            }
        ]
    }
    params = json.dumps(params)
    params = bytes(params,'utf8')
    headers = {'Content-Type': 'application/json;charset=UTF-8'}
    req = request.Request(url,params,method="POST",headers=headers)
    res = request.urlopen(req)
    result = json.loads(res.read().decode('utf-8'))
    print(result)

# send a message to the file helper
Project: Spider    Author: vincenth520
def sendMsg(content):
    global globalData
    url = 'https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxsendmsg?lang=zh_CN&pass_ticket=%s' %(globalData['pass_ticket'])
    ClientMsgId = math.floor(time.time()*1000)
    params = {
        'BaseRequest':globalData['BaseRequest'],
        "Msg":{
            'ClientMsgId':ClientMsgId,
            'Content':content,
            'FromUserName':globalData['UserName'],
            'LocalID':ClientMsgId,
            'ToUserName':"filehelper",
            'Type':1
        },
        'Scene':0
    }
    params = json.dumps(params)
    params = bytes(params,'utf8')
    headers = {'Content-Type': 'application/json;charset=UTF-8'}
    req = request.Request(url,params,method="POST",headers=headers)
    res = request.urlopen(req)
    result = json.loads(res.read().decode('utf-8'))
    print(result)

Project: WORLD4py    Author: yamachu
def run(self):
        install.run(self)

        platform = self._get_platform()
        library_full_path = self._get_install_full_path(
            self._get_base_install_path(),
            self._LIBRARY_NAME[platform][1])

        get_latest_request = Request('https://github.com/yamachu/World/releases/latest',
                                     headers={'Accept': 'application/json'})
        get_latest_response = urlopen(get_latest_request)
        response_str = get_latest_response.read().decode('utf-8')
        response_json = json.loads(response_str)
        latest_version = response_json['tag_name']

        urlretrieve("{}/{}/{}".format(
            self._DOWNLOAD_BASE_URL,
            latest_version,
            self._LIBRARY_NAME[platform][0]), library_full_path)
Project: gprime    Author: GenealogyCollective
def upload(url, filename=None):
    from urllib.request import Request, urlopen
    from urllib.parse import urlsplit
    import shutil
    def getFilename(url,openUrl):
        if 'Content-Disposition' in openUrl.info():
            # If the response has Content-Disposition, try to get filename from it
            cd = dict([x.strip().split('=') if '=' in x else (x.strip(), '')
                       for x in openUrl.info()['Content-Disposition'].split(';')])
            if 'filename' in cd:
                fname = cd['filename'].strip("\"'")
                if fname: return fname
        # if no filename was found above, parse it out of the final URL.
        return os.path.basename(urlsplit(openUrl.url)[2])
    r = urlopen(Request(url))
    success = None
    try:
        filename = filename or "/tmp/%s" % getFilename(url,r)
        with open(filename, 'wb') as f:
            shutil.copyfileobj(r,f)
        success = filename
    finally:
        r.close()
    return success
Project: DSMChatbot    Author: devArtoria
def present():
    url = 'http://api.openweathermap.org/data/2.5/weather?q=daejeon,kr&units=metric'
    service_key = '709f54e9062fdbadbe73863ff0ac30b5'

    queryParams = '&' + urlencode({quote_plus('APPID'): service_key})
    request = Request(url + queryParams)
    request.get_method = lambda: 'GET'
    response_body = (urlopen(request).read()).decode("utf-8")
    WeatherData = json.loads(response_body)

    # pull the needed fields out of the JSON weather data
    weather = WeatherData['weather'][0]
    weather = weather['description']
    temp_min = WeatherData['main']['temp_min']
    temp_max = WeatherData['main']['temp_max']

    humidity = WeatherData['main']['humidity']
    temp = WeatherData['main']['temp']
    present_weather = [weather, temp, temp_max, temp_min, humidity]

    return present_weather
Project: DSMChatbot    Author: devArtoria
def week():
    url = "http://api.openweathermap.org/data/2.5/forecast?q=daejeon,kr&units=metric"
    service_key = '709f54e9062fdbadbe73863ff0ac30b5'

    queryParams = '&' + urlencode({quote_plus('APPID'): service_key})
    request = Request(url + queryParams)
    request.get_method = lambda: 'GET'
    response_body = (urlopen(request).read()).decode("utf-8")
    WeatherData = json.loads(response_body)
    day1 = WeatherData["list"][5]
    day2 = WeatherData["list"][12]
    day3 = WeatherData["list"][19]
    day4 = WeatherData["list"][26]
    day5 = WeatherData['list'][34]

    day1 = [day1['main']['temp'], day1['weather'][0]["description"]]
    day2 = [day2['main']['temp'], day2['weather'][0]["description"]]
    day3 = [day3['main']['temp'], day3['weather'][0]["description"]]
    day4 = [day4['main']['temp'], day4['weather'][0]["description"]]
    day5 = [day5['main']['temp'], day5['weather'][0]["description"]]

    days = [day1, day2, day3, day4, day5]

    return days
Project: freesound    Author: iwkse
def request(cls, uri, params={}, client=None, wrapper=FreesoundObject, method='GET',data=False):
        p = params if params else {}
        url = '%s?%s' % (uri, urlencode(p)) if params else uri
        d = urllib.urlencode(data) if data else None
        headers = {'Authorization':client.header}
        req = Request(url,d,headers)
        try:
            f = urlopen(req)
        except HTTPError as e:
            resp = e.read()
            if e.code >= 200 and e.code < 300:
                return resp
            else:
                return FreesoundException(e.code, json.loads(resp.decode("utf-8")))
        resp = f.read()
        f.close()
        result = None
        try:
            result = json.loads(resp.decode("utf-8"))
        except:
            raise FreesoundException(0,"Couldn't parse response")
        if wrapper:
            return wrapper(result,client)
        return result
Project: imcsdk    Author: CiscoUcs
def __create_request(self, uri, data=None):
        """
        Internal method to create http/https web request

        Args:
            uri (str): protocol://web_address:port
            data (str): data to send over web request

        Returns:
            web request object
        """

        request_ = urllib2.Request(url=uri, data=data)
        headers = self.__headers
        for header in headers:
            request_.add_header(header, headers[header])
        return request_
Project: Optimus    Author: ironmussa
def download(self, data_loader):
        display_name = self.data_def["displayName"]
        bytes_downloaded = 0
        if "path" in self.data_def:
            path = self.data_def["path"]
        else:
            url = self.data_def["url"]
            req = Request(url, None, self.headers)
            print("Downloading '{0}' from {1}".format(display_name, url))
            with tempfile.NamedTemporaryFile(delete=False) as f:
                bytes_downloaded = self.write(urlopen(req), f)
                path = f.name
                self.data_def["path"] = path = f.name
        if path:
            try:
                if bytes_downloaded > 0:
                    print("Downloaded {} bytes".format(bytes_downloaded))
                print("Creating {1} DataFrame for '{0}'. Please wait...".format(display_name, 'pySpark'))
                return data_loader(path)
            finally:
                print("Successfully created {1} DataFrame for '{0}'".format(display_name, 'pySpark'))
Project: BitcoinExchangeFH    Author: Aurora-Team
def request(cls, url, verify_cert=True):
        """
        Web request
        :param: url: The url link
        :return JSON object
        """
        req = urlrequest.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
        # res = urlrequest.urlopen(url)
        if verify_cert:
            res = urlrequest.urlopen(
                req,
                timeout=RESTfulApiSocket.DEFAULT_URLOPEN_TIMEOUT)
        else:
            res = urlrequest.urlopen(
                req,
                context=ssl._create_unverified_context(),
                timeout=RESTfulApiSocket.DEFAULT_URLOPEN_TIMEOUT)
        try:
            res = json.loads(res.read().decode('utf8'))
            return res
        except:
            return {}
Project: PyCarGr    Author: Florents-Tselai
def __init__(self, search_page_url):
        self.search_page_url = search_page_url
        req = Request(
            search_page_url,
            data=None,
            headers={
                'User-Agent': UserAgent().chrome
            }
        )
        self.html = urlopen(req).read().decode('utf-8')
        self.soup = BeautifulSoup(self.html, 'html.parser')
        self.num_results = None
        for f in self.soup.find_all('strong'):
            if '????????' in f.text:
                if f.text.split()[0].isdigit():
                    self.num_results = int(f.text.split()[0])
Project: twittershade    Author: nicolavic98
def follow_redirects(link, sites=None):
    """Follow redirects for the link as long as the redirects are on the given
    sites and return the resolved link."""
    def follow(url):
        return sites is None or urlparse.urlparse(url).hostname in sites

    class RedirectHandler(urllib2.HTTPRedirectHandler):
        def __init__(self):
            self.last_url = None
        def redirect_request(self, req, fp, code, msg, hdrs, newurl):
            self.last_url = newurl
            if not follow(newurl):
                return None
            r = urllib2.HTTPRedirectHandler.redirect_request(
                self, req, fp, code, msg, hdrs, newurl)
            r.get_method = lambda : 'HEAD'
            return r

    if not follow(link):
        return link
    redirect_handler = RedirectHandler()
    opener = urllib2.build_opener(redirect_handler)
    req = urllib2.Request(link)
    req.get_method = lambda : 'HEAD'
    try:
        with contextlib.closing(opener.open(req,timeout=1)) as site:
            return site.url
    except:
        return redirect_handler.last_url if redirect_handler.last_url else link
Project: jubi-monitor    Author: fishmanwang
def __do_collect(coin, pk):
    """
    Collect order depth data for the given coin.
    :param coin: coin symbol
    :return:
    """
    url = depth_url.format(coin, random.random())
    req = request.Request(url=url, headers=headers)

    key = order_key_prefix + str(pk) + "_" + coin
    try:
        if RedisPool.conn.exists(key) == 1:
            print("exists : " + key)
            return

        time.sleep(0.2)
        with request.urlopen(req, timeout=3, context=context) as resp:
            d = resp.read().decode()
            if RedisPool.conn.set(key, d, ex=3600, nx=True) == 1:
                RedisPool.conn.rpush(order_coll_queue, key)
    except:
        exstr = traceback.format_exc()
        logger.warn(exstr)
        logger.warn("collect {} error".format(key))
        time.sleep(2)  # back off for 2 seconds after an error