Python aiohttp 模块,request() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用aiohttp.request()

项目:Pancake-Cogs    作者:UltimatePancake    | 项目源码 | 文件源码
async def moneyconvert(self, amount: float, base: str, to: str):
    """Currency converter
    Set the amount, currency FROM (base) and currency TO 
    Available currencies for conversion:
    AUD BGN BRL CAD CHF CNY CZK DKK EUR GBP HKD HRK HUF IDR ILS INR JPY KRW MXN MYR NOK NZD PHP PLN RON RUB SEK SGD THB TRY USD ZAR
    ***WARNING***
    Conversion may not be exact"""
    # NOTE(review): the docstring is kept verbatim; it is presumably shown to
    # users as the command help text.
    # Declared ``async def`` because the body awaits and uses ``async with``,
    # which is a SyntaxError inside a plain ``def``.
    source = base.upper()
    target = to.upper()
    if source not in self.currencies or target not in self.currencies:
        await self.bot.say('One or both of the currencies selected are invalid')
        return

    api = 'http://api.fixer.io/latest?base={}'.format(source)
    async with aiohttp.request("GET", api) as r:
        result = await r.json()

    # The reply is sent after the HTTP response has been released.
    rate = result['rates'][target]
    converted_amount = amount * rate
    pre_conv = '{0:.2f}'.format(amount)
    post_conv = '{0:.2f}'.format(converted_amount)
    await self.bot.say('`{} {} = {} {}`'.format(source, pre_conv, target, post_conv))
项目:chinabond    作者:muxuezi    | 项目源码 | 文件源码
async def get_async(ym, x):
    '''
    Download one xls file with an asynchronous GET request.

    parameters
    ==========
    ym : str
        value sent as ``sBbly`` and appended to the output file name
        (presumably a year-month string -- TODO confirm)
    x : dict
        must provide the keys ``name`` and ``sid``
    '''
    # ``async with sem`` replaces ``with (await sem)``, which newer Python
    # versions no longer accept.
    async with sem:
        name, sid = x['name'], x['sid']
        url = 'http://www.chinabond.com.cn/DownLoadxlsx?sId={}&sBbly={}&sMimeType=4'.format(
            sid, ym)
        outfilename = 'data/{}{}{}.xls'.format(sid, name, ym)
        logging.info('downloading %s', outfilename)
        # The context manager releases the response even if a read fails.
        async with aiohttp.request('GET', url, headers=headers) as response:
            with open(outfilename, 'wb') as file:
                while True:  # stream the body in 1 KiB chunks
                    chunk = await response.content.read(1024)
                    if not chunk:
                        break
                    file.write(chunk)
        logging.info('done %s', outfilename)
项目:trellio    作者:artificilabs    | 项目源码 | 文件源码
def send_http_request(self, app: str, service: str, version: str, method: str, entity: str, params: dict):
    """
    A convenience method that allows you to send a well formatted http request to another service

    Generator-based coroutine (drive it with ``yield from``).  The target
    host and port come from the registry client; ``params['path']`` and
    ``params['params']`` are popped from the caller's dict, and ``app``
    (when not None), ``version`` and ``service`` are added to the query
    parameters.  Returns the object produced by ``aiohttp.request``.
    """
    host, port, node_id, service_type = self._registry_client.resolve(service, version, entity, HTTP)

    path = params.pop('path')
    url = 'http://{}:{}{}'.format(host, port, path)

    # Only these keys are forwarded to aiohttp as keyword arguments.
    kwargs = {}
    for key in ('data', 'headers', 'cookies', 'auth', 'allow_redirects', 'compress', 'chunked'):
        if key in params:
            kwargs[key] = params[key]

    query_params = params.pop('params', {})
    if app is not None:
        query_params['app'] = app
    query_params['version'] = version
    query_params['service'] = service

    return (yield from aiohttp.request(method, url, params=query_params, **kwargs))
项目:trellio    作者:artificilabs    | 项目源码 | 文件源码
def _request_sender(self, packet: dict):
        """
        Sends a request to a server from a ServiceClient
        auto dispatch method called from self.send()
        """
        node_id = self._get_node_id_for_packet(packet)
        client_protocol = self._client_protocols.get(node_id)
        if node_id and client_protocol:
            if client_protocol.is_connected():
                packet['to'] = node_id
                client_protocol.send(packet)
                return True
            else:
                self._logger.error('Client protocol is not connected for packet %s', packet)
                raise ClientDisconnected()
        else:
            # No node found to send request
            self._logger.error('Out of %s, Client Not found for packet %s, restarting server...',
                               self._client_protocols.keys(), packet)
            raise ClientNotFoundError()
项目:notebooks    作者:fluentpython    | 项目源码 | 文件源码
def get_flag(base_url, cc):
    """Fetch ``<base_url>/<cc>/<cc>.gif`` (``cc`` lower-cased) and return its body.

    Generator-based coroutine.  Raises ``web.HTTPNotFound`` on a 404 and
    ``aiohttp.HttpProcessingError`` on any other non-200 status.  The
    response is always closed.
    """
    code = cc.lower()
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=code)
    resp = yield from aiohttp.request('GET', url)
    with contextlib.closing(resp):
        if resp.status == 404:
            raise web.HTTPNotFound()
        if resp.status != 200:
            raise aiohttp.HttpProcessingError(
                code=resp.status, message=resp.reason,
                headers=resp.headers)
        return (yield from resp.read())


# BEGIN FLAGS2_ASYNCIO_EXECUTOR
项目:notebooks    作者:fluentpython    | 项目源码 | 文件源码
def http_get(url):
    """GET ``url`` and return the decoded JSON or the raw body.

    Generator-based coroutine.  On a 200 the body is parsed as JSON when the
    Content-type header mentions ``json`` or the URL ends with ``json``;
    otherwise the raw body is returned.  Raises ``web.HTTPNotFound`` on a
    404 and ``aiohttp.errors.HttpProcessingError`` on any other status.
    """
    res = yield from aiohttp.request('GET', url)
    # Close the response on every path, including the error branches;
    # previously it was never closed.
    try:
        if res.status == 200:
            ctype = res.headers.get('Content-type', '').lower()
            if 'json' in ctype or url.endswith('json'):
                data = yield from res.json()  # <1>
            else:
                data = yield from res.read()  # <2>
            return data

        elif res.status == 404:
            raise web.HTTPNotFound()
        else:
            raise aiohttp.errors.HttpProcessingError(
                code=res.status, message=res.reason,
                headers=res.headers)
    finally:
        res.close()
项目:utils    作者:Ctrlsman    | 项目源码 | 文件源码
def do_request():
    """Issue a GET to http://google.com through a fixed HTTP proxy.

    Generator-based coroutine; returns the object produced by
    ``aiohttp.request``.  The response is not closed here.
    """
    options = {'proxy': 'http://198.23.237.213:1080'}  # your proxy address
    return (yield from aiohttp.request('GET', 'http://google.com', **options))
项目:hangoutsbot    作者:das7pad    | 项目源码 | 文件源码
def _send_to_external_chat(self, bot, event, config):
        if event.from_bot:
            # don't send my own messages
            return

        conversation_id = event.conv_id
        conversation_text = event.text

        user_id = event.user_id

        url = config["HUBOT_URL"] + conversation_id
        payload = {"from" : str(user_id.chat_id), "message" : conversation_text}
        headers = {'content-type': 'application/json'}

        connector = aiohttp.TCPConnector(verify_ssl=False)
        asyncio.ensure_future(
            aiohttp.request('post', url, data=json.dumps(payload),
                            headers=headers, connector=connector)
        )
项目:ax-cogs    作者:Aioxas    | 项目源码 | 文件源码
def _cookie(self):
        """Retrieves a random fortune cookie fortune."""
        regex = ["class=\"cookie-link\">([^`]*?)<\/a>", "<p>([^`]*?)<\/p>",
                 "(?:\\\\['])", "<strong>([^`]*?)<\/strong>",
                 "<\/strong><\/a>([^`]*?)<br>",
                 "3\)<\/strong><\/a>([^`]*?)<\/div>"]
        url = "http://www.fortunecookiemessage.com"
        await self.file_check()
        async with aiohttp.request("GET", url, headers={"encoding": "utf-8"}) as resp:
            test = str(await resp.text())
            fortune = re.findall(regex[0], test)
            fortest = re.match("<p>", fortune[0])
            if fortest is not None:
                fortune = re.findall(regex[1], fortune[0])
            title = re.findall(regex[3], test)
            info = re.findall(regex[4], test)
            info[0] = html.unescape(info[0])
            dailynum = re.findall(regex[5], test)
            self.fortune_process(fortune[0])
            await self.bot.say("Your fortune is:")
            await self.bot.upload("data/horoscope/cookie-edit.png")
            await self.bot.say("\n" + title[1] +
                               info[1] + "\n" + title[2] + dailynum[0])
            os.remove("data/horoscope/cookie-edit.png")
项目:ax-cogs    作者:Aioxas    | 项目源码 | 文件源码
def _font(self, url: str=None):
        """Allows you to set the font that the fortune cookies are shown in.
           Only accepts ttf."""
        option = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:40.0)'
                  ' Gecko/20100101 Firefox/40.1'}

        if url is None :
            url = "https://cdn.discordapp.com/attachments/218222973557932032/240223136447070208/FortuneCookieNF.ttf"
            if os.is_file("data/horoscope/FortuneCookieNF.ttf"):
                return
            else:
                async with aiohttp.request("GET", url, headers=option) as resp:
                    test = await resp.read()
                    with open("data/horoscope/FortuneCookieNF.ttf", "wb") as f:
                        f.write(test)
        elif not url.endswith("ttf"):
            await self.bot.say("This is not a .ttf font, please use a .ttf font. Thanks")
            return
        await self.bot.say("Font has been saved")
项目:ax-cogs    作者:Aioxas    | 项目源码 | 文件源码
def _strawpoll(self, ctx, *, question, options=None):
        """Makes a poll based on questions and choices or options. must be divided by "; "
            Examples:
            [p]strawpoll What is this person?; Who is this person?; Where is this person?; When is this person coming?
            [p]strawpoll What; Who?; Where?; When?; Why?"""
        options_list = question.split('; ')
        title = options_list[0]
        options_list.remove(title)
        if len(options_list) < 2:
            await self.bot.say("You need to specify 2 or more options")
        else:
            normal = {"title": title, "options": options_list}
            request = dict(normal, **self.settings)
            async with aiohttp.request('POST', 'http://strawpoll.me/api/v2/polls',
                                       headers={'content-type': 'application/json'},
                                       data=json.dumps(request)) as resp:
                test = await resp.content.read()
                test = json.loads(test.decode())
                sid = test["id"]
                await self.bot.say("Here's your strawpoll link: http://strawpoll.me/{}".format(sid))
项目:cockatiel    作者:raphaelm    | 项目源码 | 文件源码
def handle_request(self, message, payload):
    """Forward an incoming request upstream and relay the response.

    Generator-based coroutine.  The body is read only for POST/PUT/PATCH.
    ``intercept_request`` may rewrite the message and data; a falsy message
    from it aborts the request.  The upstream response and its body go
    through ``intercept_response`` before being written back.
    """
    url = message.path

    logger.info('{0} {1}'.format(message.method, url))

    data = None
    if message.method in ('POST', 'PUT', 'PATCH'):
        data = yield from payload.read()

    message, data = self.intercept_request(message, data)
    if not message:
        return

    # NOTE: the URL captured before interception is the one requested.
    upstream = yield from aiohttp.request(message.method, url, headers=message.headers,
                                          data=data)
    upstream_body = yield from upstream.content.read()

    upstream, upstream_body = self.intercept_response(upstream, upstream_body)
    yield from self.response_to_proxy_response(upstream, upstream_body)
项目:Books_SourceCode    作者:activeion    | 项目源码 | 文件源码
def get_flag(base_url, cc):
    """Fetch ``<base_url>/<cc>/<cc>.gif`` (``cc`` lower-cased) and return its body.

    Generator-based coroutine.  Raises ``web.HTTPNotFound`` on a 404 and
    ``aiohttp.HttpProcessingError`` on any other non-200 status.  The
    response is always closed.
    """
    code = cc.lower()
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=code)
    resp = yield from aiohttp.request('GET', url)
    with contextlib.closing(resp):
        if resp.status == 404:
            raise web.HTTPNotFound()
        if resp.status != 200:
            raise aiohttp.HttpProcessingError(
                code=resp.status, message=resp.reason,
                headers=resp.headers)
        return (yield from resp.read())


# BEGIN FLAGS2_ASYNCIO_EXECUTOR
项目:Books_SourceCode    作者:activeion    | 项目源码 | 文件源码
def get_flag(base_url, cc):
    """Fetch ``<base_url>/<cc>/<cc>.gif`` (``cc`` lower-cased) and return its body.

    Generator-based coroutine.  Raises ``web.HTTPNotFound`` on a 404 and
    ``aiohttp.HttpProcessingError`` on any other non-200 status.  The
    response is always closed.
    """
    code = cc.lower()
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=code)
    resp = yield from aiohttp.request('GET', url)
    with contextlib.closing(resp):
        if resp.status == 404:
            raise web.HTTPNotFound()
        if resp.status != 200:
            raise aiohttp.HttpProcessingError(
                code=resp.status, message=resp.reason,
                headers=resp.headers)
        return (yield from resp.read())


# BEGIN FLAGS2_ASYNCIO_EXECUTOR
项目:repo-checker    作者:1dot75cm    | 项目源码 | 文件源码
def get_page(self, _url):
    ''' Download ``_url`` and return its content.

    Generator-based coroutine.  A random User-Agent from ``self.ualist`` is
    used unless ``opts['user_agent']`` is set.  The result depends on
    ``self.url_type``: "2" returns the literal "None Content"; "4" returns
    the gunzipped body re-encoded from gb2312 to utf-8; anything else
    returns the gunzipped body.  If decompression raises OSError the raw
    body is returned unchanged. '''

    picked = random.randint(0, len(self.ualist)-1)
    header = {'Accept-Encoding': 'gzip', 'User-Agent': self.ualist[picked]}
    override = opts['user_agent']
    if override:
        header['User-Agent'] = override

    # The semaphore is held for the request and the body read.
    with (yield from semaphore):
        response = yield from aiohttp.request('GET', _url, headers = header)
        page = yield from response.read()

    if self.url_type == "2":
        return "None Content"
    try:
        unpacked = gzip.decompress(page)
        if self.url_type == "4":
            return unpacked.decode('gb2312').encode('utf-8')
        return unpacked
    except OSError:
        return page
项目:PairApi    作者:Louis-me    | 项目源码 | 文件源码
def get(self, url, param):
    """GET ``url`` on the host described by ``self.req`` and return a dict.

    Generator-based coroutine.  On a 200 the body is parsed as JSON;
    otherwise the failure is printed.  In both cases ``status_code`` is
    added to the result.  A timeout or a UTF-8 decoding error is printed
    and the dict collected so far is returned.
    """
    data = {}
    req = self.req
    _url = req["protocol"] + req["host"] + ":" + str(req["port"]) + url
    print(_url +" get?????:"+str(param))
    try:
        response = yield from aiohttp.request("GET", _url, headers=self.req["header"], params=param)
        raw = yield from response.read()
        string = raw.decode('utf-8')
        if response.status != 200:
            print("data fetch failed for")
            print(response.content, response.status)
        else:
            data = json.loads(string)
        data["status_code"] = response.status
        print(data)
    except asyncio.TimeoutError:
        print("????")
    except UnicodeDecodeError:
        print("?????")
    return data
项目:PairApi    作者:Louis-me    | 项目源码 | 文件源码
def post(self,url, param):
    """POST ``param`` as JSON to ``url`` on the host described by ``self.req``.

    Generator-based coroutine.  Returns the parsed JSON body on a 200
    (an empty dict otherwise) with ``status_code`` added.
    """
    data = {}
    _url = self.req["protocol"] + self.req["host"] + ':' + str(self.req["port"]) + url
    print(_url + " post?????:" + str(param))
    # The earlier blocking ``requests.post`` call was removed: it sent the
    # same POST a second time, discarded the result, and blocked the event
    # loop while doing so.
    response = yield from aiohttp.request('POST', _url, data=json.dumps(param), headers=self.req["header"])
    string = (yield from response.read()).decode('utf-8')
    if response.status == 200:
        data = json.loads(string)
    else:
        print("data fetch failed for")
        print(response.content, response.status)
    data["status_code"] = response.status
    print(data)

    return data
项目:trellio    作者:artificilabs    | 项目源码 | 文件源码
def ping_coroutine(self, payload=None):
    """GET ``self._url`` and report a pong when the reply status is 200.

    Generator-based coroutine.  Any exception is logged and swallowed.
    """
    try:
        res = yield from request('get', self._url)
        # Close the response for every status; previously a non-200 reply
        # was left open.
        try:
            if res.status == 200:
                self.pong_received(payload=payload)
        finally:
            res.close()
    except Exception:
        self.logger.exception('Error while ping')
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_aiohttp(self):
        """Check that an aiohttp client/server and zmq sockets share one asyncio loop.

        Starts an aiohttp server on 127.0.0.1:8080, fetches ``/`` with
        ``aiohttp.request``, pushes the response text through a bound zmq
        PUSH/PULL pair and asserts that it arrives unchanged.  Skipped when
        aiohttp cannot be imported.
        """
        try:
            import aiohttp
        except ImportError:
            raise SkipTest("Requires aiohttp")
        from aiohttp import web

        zmq.asyncio.install()

        @asyncio.coroutine
        def echo(request):
            # Handler for GET /: replies with str(request) encoded as UTF-8.
            print(request.path)
            return web.Response(body=str(request).encode('utf8'))

        @asyncio.coroutine
        def server(loop):
            # Build the application and start listening; returns the server
            # object from loop.create_server.
            app = web.Application(loop=loop)
            app.router.add_route('GET', '/', echo)

            srv = yield from loop.create_server(app.make_handler(),
                                                '127.0.0.1', 8080)
            print("Server started at http://127.0.0.1:8080")
            return srv

        @asyncio.coroutine
        def client():
            # Fetch the page, send its text over PUSH, read it back from
            # PULL and compare.
            push, pull = self.create_bound_pair(zmq.PUSH, zmq.PULL)

            res = yield from aiohttp.request('GET', 'http://127.0.0.1:8080/')
            text = yield from res.text()
            yield from push.send(text.encode('utf8'))
            rcvd = yield from pull.recv()
            self.assertEqual(rcvd.decode('utf8'), text)

        loop = asyncio.get_event_loop()
        # The server must be listening before the client coroutine runs.
        loop.run_until_complete(server(loop))
        print("servered")
        loop.run_until_complete(client())
项目:Software-Architecture-with-Python    作者:PacktPublishing    | 项目源码 | 文件源码
def callback(self, channel, data):
    """ Callback method

    Treats ``data`` as a URL, starts a GET for it without waiting, records
    the resulting object in ``self.futures`` and returns it.  ``channel``
    is not used.
    """
    # The data is a URL
    print('Fetching URL',data,'...')
    # We return the response immediately
    pending = aiohttp.request('GET', data)
    self.futures.append(pending)
    return pending
项目:aquests    作者:hansroh    | 项目源码 | 文件源码
def fetch (url):
    """GET ``url``, print "<status> <reason> " and return the body.

    Generator-based coroutine.
    """
    response = yield from aiohttp.request ('GET', url)
    status_line = '%s %s' % (response.status, response.reason)
    print (status_line, end = " ")
    body = yield from response.read ()
    return body
项目:aquests    作者:hansroh    | 项目源码 | 文件源码
def fetch (url):
    """GET ``url``, print "<status> <reason> " and return the body.

    Generator-based coroutine.
    """
    response = yield from aiohttp.request ('GET', url)
    status_line = '%s %s' % (response.status, response.reason)
    print (status_line, end = " ")
    body = yield from response.read ()
    return body
项目:notebooks    作者:fluentpython    | 项目源码 | 文件源码
def get_flag(base_url, cc): # <2>
    """Fetch ``<base_url>/<cc>/<cc>.gif`` (``cc`` lower-cased) and return its body.

    Generator-based coroutine.  Raises ``web.HTTPNotFound`` on a 404 and
    ``aiohttp.HttpProcessingError`` on any other non-200 status.  The
    response is always closed.
    """
    code = cc.lower()
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=code)
    resp = yield from aiohttp.request('GET', url)
    with contextlib.closing(resp):
        if resp.status == 404:
            raise web.HTTPNotFound()
        if resp.status != 200:
            raise aiohttp.HttpProcessingError(
                code=resp.status, message=resp.reason,
                headers=resp.headers)
        return (yield from resp.read())
项目:notebooks    作者:fluentpython    | 项目源码 | 文件源码
async def get_flag(base_url, cc): # <2>
    """Fetch ``<base_url>/<cc>/<cc>.gif`` (``cc`` lower-cased) and return its body.

    Raises ``web.HTTPNotFound`` on a 404 and ``aiohttp.HttpProcessingError``
    on any other non-200 status.
    """
    # Declared ``async def`` because the body uses ``await``.  The request
    # is entered with ``async with`` so the response is released on every
    # path.
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    async with aiohttp.request('GET', url) as resp:
        if resp.status == 200:
            image = await resp.read()
            return image
        elif resp.status == 404:
            raise web.HTTPNotFound()
        else:
            raise aiohttp.HttpProcessingError(
                code=resp.status, message=resp.reason,
                headers=resp.headers)
项目:notebooks    作者:fluentpython    | 项目源码 | 文件源码
def get_flag(cc):
    """Fetch ``<BASE_URL>/<cc>/<cc>.gif`` (``cc`` lower-cased) and return its body.

    Generator-based coroutine; the response status is not checked.
    """
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    response = yield from aiohttp.request('GET', url)
    return (yield from response.read())
项目:notebooks    作者:fluentpython    | 项目源码 | 文件源码
def download(url):
    """GET ``url``, print the response's items and the received byte count.

    Generator-based coroutine; returns nothing.
    """
    response = yield from aiohttp.request('GET', url)
    # NOTE(review): this relies on the response object offering ``items()``
    # -- confirm against the aiohttp version in use.
    for name, value in response.items():
        # Values are truncated to 80 characters for display.
        print('{}: {}'.format(name, value[:80]))

    body = yield from response.read()
    print('\nReceived {} bytes.\n'.format(len(body)))
项目:pushka    作者:rudyryk    | 项目源码 | 文件源码
def _request(self, url_, method_, params=None, data=None, **kwargs):
        """Perform request via `aiohttp.request`_.

        .. _aiohttp.request: http://aiohttp.readthedocs.org/en/v0.9.2/client.html#make-a-request
        """
        response = yield from aiohttp.request(method_.lower(), url_,
                                              params=params, data=data, loop=self._loop,
                                              **utils.norm_aiohttp_kwargs(**kwargs))
        return {
            'code': response.status,
            'body': (yield from response.text()),
            'error': None, # TODO: how errors statues are described in aiohttp?
        }
项目:hangoutsbot    作者:das7pad    | 项目源码 | 文件源码
def _handle_forwarding(bot, event, command):
    """Handle message forwarding

    Generator-based coroutine.  When ``forwarding_enabled`` is set for the
    event's conversation, the message is re-sent to every conversation in
    ``forward_to``, prefixed with a bold link to the sender's profile.
    Each attachment is downloaded and uploaded as an image; if the upload
    raises AttributeError the attachment link is sent as text instead.
    ``command`` is not used.
    """
    # Test if message forwarding is enabled
    if not bot.get_config_suboption(event.conv_id, 'forwarding_enabled'):
        return

    forward_to_list = bot.get_config_suboption(event.conv_id, 'forward_to')
    if forward_to_list:
        logger.debug("{}".format(forward_to_list))

        for _conv_id in forward_to_list:
            html_identity = "<b><a href='https://plus.google.com/u/0/{}/about'>{}</a></b><b>:</b> ".format(event.user_id.chat_id, event.user.full_name)

            html_message = event.text

            # A message without attachments is forwarded as plain text.
            if not event.conv_event.attachments:
                yield from bot.coro_send_message( _conv_id,
                                                  html_identity + html_message )

            for link in event.conv_event.attachments:

                filename = os.path.basename(link)
                r = yield from aiohttp.request('get', link)
                raw = yield from r.read()
                image_data = io.BytesIO(raw)
                image_id = None

                try:
                    image_id = yield from bot._client.upload_image(image_data, filename=filename)
                    # An empty message gets a placeholder; the replacement is
                    # kept for the remaining attachments of this conversation.
                    if not html_message:
                        html_message = "(sent an image)"
                    yield from bot.coro_send_message( _conv_id,
                                                      html_identity + html_message,
                                                      image_id=image_id )

                except AttributeError:
                    # Fall back to sending the attachment link as text.
                    yield from bot.coro_send_message( _conv_id,
                                                      html_identity + html_message + " " + link )
项目:hangoutsbot    作者:das7pad    | 项目源码 | 文件源码
def image_upload_single(image_uri, bot):
    """Download ``image_uri`` and upload it through ``bot._client``.

    Generator-based coroutine.  The upload is named after the basename of
    the URI; returns the value produced by ``upload_image``.
    """
    logger.info("getting {}".format(image_uri))
    filename = os.path.basename(image_uri)
    response = yield from aiohttp.request('get', image_uri)
    payload = io.BytesIO((yield from response.read()))
    return (yield from bot._client.upload_image(payload, filename=filename))
项目:web_develop    作者:dongweiming    | 项目源码 | 文件源码
async def handle(request):
    """Probe every URL in ``REQEUST_URLS`` concurrently and report which answered 200.

    Returns a JSON ``web.Response`` mapping each URL to a boolean: True for
    a 200 reply, False for any other status or when the request raised.
    ``request`` is not used.
    """
    # Declared ``async def`` because the body uses ``await``.
    coroutines = [aiohttp.request('get', url) for url in REQEUST_URLS]

    # return_exceptions=True keeps one failing URL from aborting the batch.
    results = await asyncio.gather(*coroutines, return_exceptions=True)

    response_data = {}
    for url, result in zip(REQEUST_URLS, results):
        if isinstance(result, Exception):
            response_data[url] = False
            continue
        response_data[url] = result.status == 200
        # Only the status is needed, so close the response straight away
        # instead of leaving it open.
        result.close()

    body = json.dumps(response_data).encode('utf-8')
    return web.Response(body=body, content_type="application/json")
项目:Weeds    作者:seamile    | 项目源码 | 文件源码
def get(url, *args, **kwargs):
    """GET ``url`` (fragment removed) and return the body decoded as UTF-8.

    Generator-based coroutine.  The module-level semaphore ``sem`` is held
    only while the request is issued, not while the body is read.  Extra
    positional and keyword arguments are passed to ``aiohttp.request``.
    """
    url, _, _ = url.partition('#')
    with (yield from sem):
        response = yield from aiohttp.request('GET', url, *args, **kwargs)
    raw = yield from response.read_and_close()
    text = raw.decode('utf-8')
    print('GET', url)
    return text
项目:Weeds    作者:seamile    | 项目源码 | 文件源码
async def fetch(self, url, retry=3):
    '''Fetch ``url`` and return its HTML body.

    Timeouts and ``aiohttp.ClientResponseError`` are retried up to
    ``retry`` times with a one second pause; when the retries run out, or
    on any other exception, the error is logged and '' is returned.  The
    body is decoded as utf-8, then gbk; if both fail the raw bytes are
    returned.  A non-2xx status is logged but the body is still returned.
    '''
    # Declared ``async def`` because the body uses ``await``.
    start_time = self.loop.time()
    try:
        response = await aiohttp.request('GET', url)
        time_used = self.loop.time() - start_time
    except (TimeoutError, asyncio.TimeoutError, aiohttp.ClientResponseError) as e:
        # asyncio.TimeoutError is listed explicitly: before Python 3.11 it is
        # not the builtin TimeoutError, so asyncio timeouts skipped the retry.
        if retry > 0:
            await asyncio.sleep(1)
            return await self.fetch(url, retry - 1)
        time_used = self.loop.time() - start_time
        logger.error('USE %6.3f s STAT: 500 URL: %s  (%s)'
                     % (time_used,  url, e))
        return ''
    except Exception as e:
        time_used = self.loop.time() - start_time
        logger.error('USE %6.3f s STAT: 500 URL: %s  (%s)'
                     % (time_used, url, e))
        return ''

    # Close the response once the body has been read (or the read failed).
    try:
        if not (200 <= response.status < 300):
            logger.error('USE %6.3f s STAT: %s URL: %s'
                         % (time_used, response.status, url))
        body = await response.read()
    finally:
        response.close()

    # Try the two expected encodings in turn.
    for encoding in ('utf-8', 'gbk'):
        try:
            return body.decode(encoding)
        except UnicodeDecodeError:
            continue
    return body
项目:ax-cogs    作者:Aioxas    | 项目源码 | 文件源码
async def file_check(self):
    """Make sure the cookie image and the font file exist under data/horoscope.

    The cookie image is always downloaded and written when the local file
    is missing or has a different size.  The font is downloaded only when
    the local file is missing.
    """
    # Declared ``async def`` because the body awaits and uses ``async with``.
    urls = ["https://images-2.discordapp.net/.eJwNwcENwyAMAMBdGABDCWCyDSKIoCYxwuZVdff27qPWvNSuTpHBO8DRudA8NAvN3Kp"
            "uRO2qeXTWhW7IIrmcd32EwQbjMCRMaJNxPmwILxcRg_9Da_yWYoQ3dV5z6fE09f0BC6EjAw.B0sII_QLbL9kJo6Zbb4GuO4MQNw",
            "https://cdn.discordapp.com/attachments/218222973557932032/240223136447070208/FortuneCookieNF.ttf"]
    option = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:40.0)'
              ' Gecko/20100101 Firefox/40.1'}
    cookie_path = "data/horoscope/cookie.png"
    font_path = "data/horoscope/FortuneCookieNF.ttf"

    # Both original branches downloaded the image; only the write decision
    # differed, so the download is done once.
    async with aiohttp.request("GET", urls[0], headers=option) as resp:
        test = await resp.read()
    if not os.path.exists(cookie_path) or os.path.getsize(cookie_path) != len(test):
        with open(cookie_path, "wb") as f:
            f.write(test)

    if not os.path.exists(font_path):
        async with aiohttp.request("GET", urls[1], headers=option) as resp:
            test = await resp.read()
        with open(font_path, "wb") as f:
            f.write(test)
项目:ax-cogs    作者:Aioxas    | 项目源码 | 文件源码
async def add(self, ctx, name, url):
    """Allows you to add emotes to the emote list
    [p]emotes add pan http://i.imgur.com/FFRjKBW.gifv"""
    # Declared ``async def`` because the body awaits and uses ``async with``.
    server = ctx.message.server
    name = name.lower()
    option = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 '
                            '(KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36'}

    # Ensure both the server entry and its "emotes" table exist.  Previously
    # the table was only created for brand-new servers, so an existing entry
    # without it raised KeyError below.
    created = False
    if server.id not in self.servers:
        # default off
        self.servers[server.id] = {"status": False}
        created = True
    if "emotes" not in self.servers[server.id]:
        self.servers[server.id]["emotes"] = {}
        created = True
    if created:
        dataIO.save_json(self.data_path, self.servers)

    if not url.endswith((".gif", ".gifv", ".png")):
        await self.bot.say("Links ending in .gif, .png, and .gifv are the only ones accepted."
                           "Please try again with a valid emote link, thanks.")
        return
    emotes = self.servers[server.id]["emotes"]
    if name in emotes:
        await self.bot.say("This keyword already exists, please use another keyword.")
        return
    if url.endswith(".gifv"):
        url = url.replace(".gifv", ".gif")
    # The stored file keeps the three-letter extension of the URL.
    filename = "{}.{}".format(name, url[-3:])
    try:
        await self.bot.say("Downloading {}.".format(name))
        async with aiohttp.request("GET", url, headers=option) as r:
            emote = await r.read()
        with open(self.emote + filename, 'wb') as f:
            f.write(emote)
        await self.bot.say("Adding {} to the list.".format(name))
        emotes[name] = filename
        dataIO.save_json(self.data_path, self.servers)
        await self.bot.say("{} has been added to the list".format(name))
    except Exception as e:
        # Best effort: any failure is reported to the user as a bad URL.
        print(e)
        await self.bot.say("It seems your url is not valid,"
                           " please make sure you are not typing names with spaces as they are and then the url."
                           " If so, do [p]emotes add name_with_spaces url")
项目:ax-cogs    作者:Aioxas    | 项目源码 | 文件源码
def _quotes(self, ctx, *, author: str):
        """Retrieves a specified number of quotes from a specified author. Max number of quotes at a time is 5.
        Examples:
        [p]quotes Morgan Freeman; 5
        [p]quotes Margaret Thatcher; 2"""
        regex = "title=\"view quote\">([^`]*?)<\/a>"
        url = 'http://www.brainyquote.com/quotes/authors/'
        try:
            author = author.split('; ')
            title = author[0]
            number = int(author[1])
            if number > 5:
                number = 5
                await self.bot.reply("Heck naw brah. 5 is max. Any more and you get killed.")
            url = url + title.lower()[0] + "/" + title.lower().replace(" ", "_") + ".html"
            async with aiohttp.request("GET", url) as resp:
                test = str(await resp.text())
                quote_find = list(set(re.findall(regex, test)))
            for i in range(number):
                random_quote = choice(quote_find)
                quote_find.remove(random_quote)
                while random_quote == title:
                    random_quote = choice(quote_find)
                random_quote = random_quote.replace("&#39;", "'") if "&#39;" in random_quote else random_quote
                await self.bot.say(box(random_quote))
                await asyncio.sleep(1)

        except IndexError:
            await self.bot.say("Your search is not valid, please follow the examples."
                               "Make sure the names are correctly written\n"
                               "[p]quotes Margaret Thatcher; 5\n[p]quotes Morgan Freeman; 5")
项目:Pancake-Cogs    作者:UltimatePancake    | 项目源码 | 文件源码
async def dadjoke(self):
    """Gets a random dad joke."""
    # Declared ``async def`` because the body awaits and uses ``async with``.
    api = 'https://icanhazdadjoke.com/'
    # The Accept header asks the service for a plain-text reply.
    async with aiohttp.request('GET', api, headers={'Accept': 'text/plain'}) as r:
        result = await r.text()
    await self.bot.say('`' + result + '`')
项目:Pancake-Cogs    作者:UltimatePancake    | 项目源码 | 文件源码
async def typeracer(self, user: str):
    """Get user stats from typeracer API"""
    # Declared ``async def`` because the body awaits and uses ``async with``.
    api = 'http://data.typeracer.com/users?id=tr:{}'.format(user)
    async with aiohttp.request("GET", api) as r:
        if r.status != 200:
            await self.bot.say('`Unable to retieve stats for user {}`'.format(user))
            return
        result = await r.json()

        stats = result['tstats']
        # Same value as the former hex-string round trip: a random 24-bit
        # colour.
        random_colour = random.randint(0, 0xFFFFFF)

        last_scores = '\n'.join(str(int(x)) for x in stats['recentScores'])

        embed = discord.Embed(colour=random_colour)
        embed.set_author(name=result['name'])
        embed.add_field(name='Country', value=':flag_{}:'.format(result['country']))
        embed.add_field(name='Level', value=stats['level'])
        embed.add_field(name='Wins', value=stats['gamesWon'])
        embed.add_field(name='Recent WPM', value=int(stats['recentAvgWpm']))
        embed.add_field(name='Average WPM', value=int(stats['wpm']))
        embed.add_field(name='Best WPM', value=int(stats['bestGameWpm']))
        embed.add_field(name='Recent scores', value=last_scores)
        embed.set_footer(text='typeracer.com')
        embed.url = 'http://play.typeracer.com/'

        await self.bot.say(embed=embed)
项目:jamdb    作者:CenterForOpenScience    | 项目源码 | 文件源码
def _authenticate(self, data):
        async with aiohttp.request('GET', '{}/oauth2/profile'.format(settings.OSF['ACCOUNTS_URL']), headers={
            'Authorization': 'Bearer {}'.format(data.get('access_token'))
        }) as resp:
            if resp.status != 200:
                raise exceptions.Unauthorized()
            return 'user', 'osf', (await resp.json())['id']


# TODO Fuzz test me
项目:jamdb    作者:CenterForOpenScience    | 项目源码 | 文件源码
def send_remote(self, url, data, headers=None, callback=None):
    """Schedule a POST of ``data`` to ``url`` without waiting for it.

    When ``self.state.should_try()`` is false, the message built from
    ``data`` is written to ``self.error_logger`` and nothing is sent.
    ``callback`` is not used.  Always returns None.
    """
    if not headers:
        headers = {}

    if self.state.should_try():
        async def _future():
            # Send the request and release the response immediately.
            resp = await aiohttp.request('POST', url, data=data, headers=headers)
            await resp.release()

        asyncio.ensure_future(_future())
        return

    self.error_logger.error(self._get_log_message(data))
项目:jsonapi-client    作者:qvantel    | 项目源码 | 文件源码
def http_request(self, http_method: str, url: str, send_json: dict,
                 expected_statuses: List[str]=None) -> Tuple[int, dict, str]:
    """
    Internal use.

    Method to make PATCH/POST requests to server using requests library.

    Returns ``(status_code, parsed JSON body or {}, Location header)``.
    Raises DocumentError when the status is not in ``expected_statuses``
    (``HttpStatus.ALL_OK`` when that argument is falsy).
    """
    self.assert_sync()
    import requests
    logger.debug('%s request: %s', http_method.upper(), send_json)
    if not expected_statuses:
        expected_statuses = HttpStatus.ALL_OK

    response = requests.request(http_method, url, json=send_json,
                                headers={'Content-Type': 'application/vnd.api+json'},
                                **self._request_kwargs)

    if response.status_code not in expected_statuses:
        raise DocumentError(f'Could not {http_method.upper()} '
                            f'({response.status_code}): '
                            f'{error_from_response(response)}',
                            errors={'status_code': response.status_code},
                            response=response,
                            json_data=send_json)

    status = response.status_code
    # An empty body is reported as an empty dict instead of being parsed.
    document = response.json() if response.content else {}
    return status, document, response.headers.get('Location')
项目:jsonapi-client    作者:qvantel    | 项目源码 | 文件源码
async def http_request_async(
                self,
                http_method: str,
                url: str,
                send_json: dict,
                expected_statuses: List[str]=None) \
            -> Tuple[int, dict, str]:
        """
        Internal use. Async version.

        Method to make PATCH/POST requests to server using aiohttp library.

        This must be a coroutine function: the body uses ``async with`` and
        ``await``, which are only valid inside ``async def``.

        Returns a tuple of the response status, the decoded JSON body (an
        empty dict when the decoded body is falsy) and the value of the
        ``Location`` response header (None when absent).

        Raises DocumentError when the response status is not in
        ``expected_statuses`` (``HttpStatus.ALL_OK`` when none are given).
        """

        self.assert_async()
        logger.debug('%s request: %s', http_method.upper(), send_json)
        expected_statuses = expected_statuses or HttpStatus.ALL_OK
        # Content type expected when decoding the reply; empty for DELETE.
        # NOTE(review): presumably an empty value disables aiohttp's
        # content-type check for DELETE replies - confirm.
        content_type = '' if http_method == HttpMethod.DELETE else 'application/vnd.api+json'
        async with self._aiohttp_session.request(
                http_method, url, data=json.dumps(send_json),
                headers={'Content-Type': 'application/vnd.api+json'},
                **self._request_kwargs) as response:

            if response.status not in expected_statuses:
                raise DocumentError(f'Could not {http_method.upper()} '
                                    f'({response.status}): '
                                    f'{error_from_response(response)}',
                                    errors={'status_code': response.status},
                                    response=response,
                                    json_data=send_json)

            response_json = await response.json(content_type=content_type)

            return response.status, response_json or {}, response.headers.get('Location')
项目:little-python    作者:JeffyLu    | 项目源码 | 文件源码
async def get_body(url):
    """Fetch ``url`` with an HTTP GET request and return the response body.

    This is a coroutine and must be awaited.

    :param url: Address to request.
    :return: The payload returned by ``response.read()``.
    """
    # ``async with`` releases the response when the block exits, including
    # when reading the body raises.
    async with aiohttp.request('GET', url) as response:
        return await response.read()
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def request(self, method, url, **kwargs):
        """Start an HTTP request using the given ``method``.

        All keyword arguments are forwarded to ``self._request``; its result
        is returned wrapped in a ``_RequestContextManager``.
        """
        pending = self._request(method, url, **kwargs)
        return _RequestContextManager(pending)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def get(self, url, *, allow_redirects=True, **kwargs):
        """Start an HTTP GET request for ``url``.

        ``allow_redirects`` (default True) and any extra keyword arguments
        are forwarded to ``self._request``; its result is returned wrapped
        in a ``_RequestContextManager``.
        """
        pending = self._request(
            hdrs.METH_GET, url, allow_redirects=allow_redirects, **kwargs)
        return _RequestContextManager(pending)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def options(self, url, *, allow_redirects=True, **kwargs):
        """Start an HTTP OPTIONS request for ``url``.

        ``allow_redirects`` (default True) and any extra keyword arguments
        are forwarded to ``self._request``; its result is returned wrapped
        in a ``_RequestContextManager``.
        """
        pending = self._request(
            hdrs.METH_OPTIONS, url, allow_redirects=allow_redirects, **kwargs)
        return _RequestContextManager(pending)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def head(self, url, *, allow_redirects=False, **kwargs):
        """Start an HTTP HEAD request for ``url``.

        Unlike the other verbs here, ``allow_redirects`` defaults to False.
        It and any extra keyword arguments are forwarded to
        ``self._request``; its result is returned wrapped in a
        ``_RequestContextManager``.
        """
        pending = self._request(
            hdrs.METH_HEAD, url, allow_redirects=allow_redirects, **kwargs)
        return _RequestContextManager(pending)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def post(self, url, *, data=None, **kwargs):
        """Start an HTTP POST request for ``url``.

        ``data`` (default None) and any extra keyword arguments are
        forwarded to ``self._request``; its result is returned wrapped in a
        ``_RequestContextManager``.
        """
        pending = self._request(hdrs.METH_POST, url, data=data, **kwargs)
        return _RequestContextManager(pending)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def patch(self, url, *, data=None, **kwargs):
        """Start an HTTP PATCH request for ``url``.

        ``data`` (default None) and any extra keyword arguments are
        forwarded to ``self._request``; its result is returned wrapped in a
        ``_RequestContextManager``.
        """
        pending = self._request(hdrs.METH_PATCH, url, data=data, **kwargs)
        return _RequestContextManager(pending)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def delete(self, url, **kwargs):
        """Start an HTTP DELETE request for ``url``.

        All keyword arguments are forwarded to ``self._request``; its result
        is returned wrapped in a ``_RequestContextManager``.
        """
        pending = self._request(hdrs.METH_DELETE, url, **kwargs)
        return _RequestContextManager(pending)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def get(url, **kwargs):
    """Deprecated module-level shortcut for an HTTP GET request.

    Emits a ``DeprecationWarning`` and then forwards ``url`` and all keyword
    arguments to ``request`` with the GET method, returning its result.
    """
    # stacklevel=2 attributes the warning to the caller's line rather than
    # to this wrapper, so users can see which of their calls is deprecated.
    warnings.warn("Use ClientSession().get() instead",
                  DeprecationWarning, stacklevel=2)
    return request(hdrs.METH_GET, url, **kwargs)