Python urllib.request 模块,quote() 实例源码

我们从Python开源项目中,提取了以下28个代码示例,用于说明如何使用urllib.request.quote()

项目:veneer-py    作者:flowmatters    | 项目源码 | 文件源码
def retrieve_json(self,url):
    '''
    Retrieve data from the Veneer service at the given url path and parse it as JSON.

    url: Path to required resource, relative to the root of the Veneer service.

    When self.protocol is 'file' the document is read from the local path
    self.prefix + url + self.data_ext; otherwise it is fetched with an HTTP GET
    against self.host / self.port using the percent-encoded path.

    Returns the decoded JSON document.
    '''
    if PRINT_URLS:
        print("*** %s ***" % (url))

    if self.protocol=='file':
        # Context manager so the file handle is released even if read() fails.
        with open(self.prefix+url+self.data_ext) as source:
            text = source.read()
    else:
        conn = hc.HTTPConnection(self.host,port=self.port)
        try:
            conn.request('GET',quote(url+self.data_ext))
            resp = conn.getresponse()
            text = resp.read().decode('utf-8')
        finally:
            # Always release the socket, including on request/decoding errors.
            conn.close()

    text = self._replace_inf(text)
    # Parse once and reuse the result for both the debug print and the return.
    result = json.loads(text)
    if PRINT_ALL:
        print(result)
        print("")
    return result
项目:veneer-py    作者:flowmatters    | 项目源码 | 文件源码
def retrieve_csv(self,url):
    '''
    Retrieve data from the Veneer service, at the given url path, in CSV format.

    url: Path to required resource, relative to the root of the Veneer service.

    The request is sent with an "Accept: text/csv" header and the response body
    is parsed with utils.read_veneer_csv; the parsed result is returned.

    NOTE: CSV responses are currently only available for time series results
    '''
    if PRINT_URLS:
        print("*** %s ***" % (url))

    req = Request(self.base_url + quote(url+self.data_ext),headers={"Accept":"text/csv"})
    response = urlopen(req)
    try:
        text = response.read().decode('utf-8')
    finally:
        # Close the HTTP response explicitly instead of relying on garbage collection.
        response.close()

    result = utils.read_veneer_csv(text)
    if PRINT_ALL:
        print(result)
        print("")
    return result
项目:veneer-py    作者:flowmatters    | 项目源码 | 文件源码
def retrieve_json(self,url,**kwargs):
    """
    Fetch a JSON resource, store the raw text via self.save_data, and return the parsed document.

    url: Path to the resource, appended (percent-encoded) to self.base_url.
    kwargs: Accepted for signature compatibility; not used here.

    Returns the decoded JSON document, or None when the resource could not be
    retrieved (the failure is reported through self.log).
    """
    if self.print_urls:
        print("*** %s ***" % (url))

    try:
        response = urlopen(self.base_url + quote(url))
        try:
            text = response.read().decode('utf-8')
        finally:
            # Release the connection even when reading/decoding fails.
            response.close()
    except Exception:
        # Best-effort retrieval: report and carry on, but do not swallow
        # KeyboardInterrupt / SystemExit the way a bare ``except:`` would.
        self.log("Couldn't retrieve %s"%url)
        return None

    # url[1:] drops the first character of the path to form the storage name.
    self.save_data(url[1:],bytes(text,'utf-8'),"json")

    result = json.loads(text)
    if self.print_all:
        print(result)
        print("")
    return result
项目:pyconjpbot    作者:pyconjp    | 项目源码 | 文件源码
def google_image(message, keywords):
    """
    Run a Google image search for the keywords and post one randomly chosen hit.

    If the result page contains no image URLs, a "not found" style message
    containing the keywords is posted instead.

    Based on https://github.com/llimllib/limbo/blob/master/limbo/plugins/image.py
    """
    # this is an old iphone user agent. Seems to make google return good results.
    useragent = "Mozilla/5.0 (iPhone; U; CPU iPhone OS 4_0 like Mac OS X; en-us) AppleWebKit/532.9 (KHTML, like Gecko) Versio  n/4.0.5 Mobile/8A293 Safari/6531.22.7"
    searchurl = "https://www.google.com/search?tbm=isch&q={0}".format(quote(keywords))

    page = requests.get(searchurl, headers={"User-agent": useragent}).text
    # Image URLs are embedded in the page as ``var u='...'`` assignments.
    images = [unescape(found) for found in re.findall(r"var u='(.*?)'", page)]

    if not images:
        botsend(message, "`{}` ???????????????".format(keywords))
        return
    botsend(message, choice(images))
项目:pyconjpbot    作者:pyconjp    | 项目源码 | 文件源码
def google_map(message, keywords):
    """
    Post a Google static-map image URL for the keywords, followed by an
    attachment linking to the same query on Google Maps.

    Based on https://github.com/llimllib/limbo/blob/master/limbo/plugins/map.py
    """
    encoded = quote(keywords)

    # Slack seems to ignore the size param.
    #
    # The query is passed as a marker rather than a "center" parameter so that
    # Google picks a reasonable zoom level on its own.
    template = "https://maps.googleapis.com/maps/api/staticmap?size=800x400&markers={0}&maptype={1}"
    botsend(message, template.format(encoded, 'roadmap'))

    pretext = '<http://maps.google.com/maps?q={}|????????>'.format(encoded)
    botwebapi(message, [{
        'pretext': pretext,
        'mrkdwn_in': ["pretext"],
    }])
项目:spotify-downloader    作者:ritiek    | 项目源码 | 文件源码
def generate_search_url(song, viewsort=False):
    """ Build the YouTube results-page URL for ``song``.

    The song text is percent-encoded before being placed in the ``q``
    parameter. With ``viewsort`` false (the default) the URL additionally
    carries the ``sp=EgIQAQ%253D%253D`` parameter; with it true, only ``q``.
    """
    if viewsort:
        template = u"https://www.youtube.com/results?q={0}"
    else:
        template = u"https://www.youtube.com/results?sp=EgIQAQ%253D%253D&q={0}"

    # quote() escapes spaces and other special characters for use in the URL.
    return template.format(quote(song))
项目:veneer-py    作者:flowmatters    | 项目源码 | 文件源码
def retrieve_resource(self,url,ext):
    """
    Download a resource as raw bytes and hand it to self.save_data.

    url: Path to the resource, appended (percent-encoded) to self.base_url.
    ext: Extension passed through to self.save_data.

    Returns None; the downloaded bytes are stored in binary mode (mode="b").
    """
    if self.print_urls:
        print("*** %s ***" % (url))

    response = urlopen(self.base_url+quote(url))
    try:
        content = response.read()
    finally:
        # Release the connection even if the read fails part-way through.
        response.close()

    # url[1:] drops the first character of the path to form the storage name.
    self.save_data(url[1:],content,ext,mode="b")

    # Process Run list and results
项目:decode    作者:deshima-dev    | 项目源码 | 文件源码
def youtube(keyword=None):
    """Open a YouTube page via ``web.open``.

    Args:
        keyword (optional): Search word. When given, the search-results URL
            for it is percent-encoded (keeping the characters in ``RESERVED``)
            and opened; when omitted, a fixed video URL is opened instead.
    """
    if keyword is not None:
        search_url = 'https://www.youtube.com/results?search_query={}'.format(keyword)
        web.open(quote(search_url, RESERVED))
        return

    web.open('https://www.youtube.com/watch?v=L_mBVT2jBFw')
项目:weibo    作者:windskyer    | 项目源码 | 文件源码
def get_user(self, username):
    """
    Return ``username`` percent-encoded and then Base64-encoded, as text.

    username: The raw user name.

    Returns the Base64 representation of the URL-quoted name as a single
    line with no trailing newline.
    """
    quoted = urllib2.quote(username)
    # base64.encodestring() was removed in Python 3.9 and also inserted a
    # newline after every 76 output characters; b64encode() produces the same
    # value as the old ``encodestring(...)[:-1]`` for short names and never
    # embeds line breaks in long ones. quote() output is pure ASCII.
    encoded = base64.b64encode(quoted.encode('ascii'))
    if not isinstance(encoded, str):
        # Python 3: b64encode returns bytes; callers expect text.
        encoded = encoded.decode('utf-8')
    return encoded
项目:weibo    作者:windskyer    | 项目源码 | 文件源码
def get_user(self, username):
    """
    Return ``username`` percent-encoded and then Base64-encoded, as text.

    username: The raw user name.

    Returns the Base64 representation of the URL-quoted name as a single
    line with no trailing newline.
    """
    # The leftover ``pdb.set_trace()`` breakpoint was removed: it halted every
    # call waiting for interactive debugger input.
    quoted = urllib2.quote(username)
    # base64.encodestring() was removed in Python 3.9 and also inserted a
    # newline after every 76 output characters; b64encode() produces the same
    # value as the old ``encodestring(...)[:-1]`` for short names and never
    # embeds line breaks in long ones. quote() output is pure ASCII.
    encoded = base64.b64encode(quoted.encode('ascii'))
    if not isinstance(encoded, str):
        # Python 3: b64encode returns bytes; callers expect text.
        encoded = encoded.decode('utf-8')
    return encoded
项目:163mooc_dumper    作者:wsdzl    | 项目源码 | 文件源码
def parse_videos(url):
    """
    Submit ``url`` to the yipeiwu.com video-resolver page and scrape the reply.

    The percent-encoded url is posted as the form field ``url``. The returned
    HTML is searched with the module-level patterns ``_re_name`` and ``_re``.

    Returns ``(name, result)`` where ``name`` is the first ``_re_name`` match
    and ``result`` is the list of all ``_re`` matches, or ``False`` when no
    name could be found in the page.
    """
    form_body = b'url=' + urllib.quote(url).encode('ascii')
    response = urllib.urlopen('http://yipeiwu.com/getvideo.html', data=form_body)
    html = response.read().decode('utf-8')

    names = _re_name.findall(html)
    if not names:
        return False
    return names[0], _re.findall(html)
项目:squeeze-alexa    作者:declension    | 项目源码 | 文件源码
def play_genres(self, genre_list, player_id=None):
    """Clear the playlist, queue albums of the given genres, and start playing.

    Falsy entries in ``genre_list`` are skipped (and a falsy ``genre_list`` is
    treated as empty). Every command is prefixed with ``player_id``, falling
    back to ``self.cur_player_id``, and the batch is sent via ``self._request``.
    """
    commands = ["playlist clear", "playlist shuffle 1"]
    for genre in genre_list or []:
        if genre:
            commands.append("playlist addalbum %s * *" % urllib.quote(genre))
    commands.append("play 2")

    pid = player_id or self.cur_player_id
    prefixed = ["%s %s" % (pid, com) for com in commands]
    return self._request(prefixed)
项目:squeeze-alexa    作者:declension    | 项目源码 | 文件源码
def playlist_play(self, path, player_id=None):
    """Send a ``playlist play`` command for the percent-encoded ``path``.

    The command is dispatched through ``self.player_request`` for the given
    ``player_id``.
    """
    command = "playlist play %s" % (urllib.quote(path))
    self.player_request(command, player_id=player_id)
项目:squeeze-alexa    作者:declension    | 项目源码 | 文件源码
def playlist_resume(self, name, resume=True, wipe=False, player_id=None):
    """Send a ``playlist resume`` command for the percent-encoded ``name``.

    ``noplay`` is set to 1 when ``resume`` is false and ``wipePlaylist`` to 1
    when ``wipe`` is true. The command is sent through ``self.player_request``
    with ``wait=False``.
    """
    noplay_flag = int(not resume)
    wipe_flag = int(wipe)
    cmd = ("playlist resume %s noplay:%d wipePlaylist:%d"
           % (urllib.quote(name), noplay_flag, wipe_flag))
    self.player_request(cmd, wait=False, player_id=player_id)
项目:pyconjpbot    作者:pyconjp    | 项目源码 | 文件源码
def google(message, keywords):
    """
    Search Google for the keywords and post the URL of the first result.

    Does nothing when the keywords are exactly 'help'. When the result page
    has no result headings, a "not found" style message is posted. When the
    first heading carries no usable link, its text is returned instead of
    being posted.

    Based on https://github.com/llimllib/limbo/blob/master/limbo/plugins/google.py
    """

    if keywords == 'help':
        return

    query = quote(keywords)
    url = "https://encrypted.google.com/search?q={0}".format(query)
    soup = BeautifulSoup(requests.get(url).text, "html.parser")

    answer = soup.findAll("h3", attrs={"class": "r"})
    if not answer:
        botsend(message, "`{}` ???????????????".format(keywords))
        # Stop here: falling through would index the empty result list and
        # raise IndexError again from inside the exception handler below.
        return

    try:
        # The href looks like "...=<target>&..."; keep only the target part.
        _, target = answer[0].a['href'].split('=', 1)
        target, _ = target.split('&', 1)
    except (IndexError, KeyError, TypeError, ValueError):
        # in this case there is a first answer without a link, which is a
        # google response! Let's grab it and display it to the user.
        # (A heading with no <a> child makes ``.a`` None -> TypeError; a
        # missing/oddly shaped href gives KeyError/ValueError.)
        return ' '.join(answer[0].stripped_strings)

    botsend(message, unquote(target))
项目:qal    作者:OptimalBPM    | 项目源码 | 文件源码
def xml_set_cdata(_node, _value, _lowercase=False):
    """Helper to set character data in an XML tree.

    Appends a new text node holding ``_value`` to ``_node``. Nothing is
    appended when ``_value`` is None or the empty string. String values are
    percent-encoded with ``quote`` before being stored.

    :param _node: The node that receives the new text child.
    :param _value: The value to store.
    :param _lowercase: If true, the stored data is lower-cased.
    """
    if _value is not None and _value != "":
        sec = Text()
        # isinstance(), not ``_value is str``: the identity test compares the
        # value with the type object itself and is never true for a string,
        # so the quoting step could never run.
        if isinstance(_value, str):
            _value = quote(_value)
        if _lowercase:  # Force lowercase.
            sec.data = _value.lower()
        else:
            sec.data = _value

        _node.appendChild(sec)
项目:etlpy    作者:ferventdesert    | 项目源码 | 文件源码
def iriToUri(iri):
    """Return ``iri`` with its query component percent-encoded.

    The IRI is split with ``urlparse``; the component at index 4 (the query)
    is passed through ``quote`` and every other component is left untouched
    before the pieces are reassembled with ``urlunparse``.
    """
    query_position = 4
    components = [
        quote(component) if position == query_position else component
        for position, component in enumerate(urlparse(iri))
    ]
    return urlunparse(components)
项目:WordQuery    作者:finalion    | 项目源码 | 文件源码
def _get_from_api(self, lang="en"):
    """Look up ``self.word`` in the Oxford Dictionaries v1 API.

    The word is lower-cased, spaces are replaced by underscores, and the
    result is percent-encoded to form the entry id.

    :param lang: Language code placed in the request path.
    :returns: The ``"results"`` member of the decoded JSON response.
    """
    # NOTE(review): the API credentials are hard-coded in the source.
    headers = {"app_id": "45aecf84", "app_key": "bb36fd6a1259e5baf8df6110a2f7fc8f"}

    word_id = urllib2.quote(self.word.lower().replace(" ", "_"))
    endpoint = "https://od-api.oxforddictionaries.com/api/v1" + "/entries/" + lang + "/" + word_id

    api_request = urllib2.Request(endpoint, headers=headers)
    raw_body = urllib2.urlopen(api_request).read()
    return json.loads(raw_body)["results"]
项目:weiboapi    作者:lawRossi    | 项目源码 | 文件源码
def quote_base64_encode(text):
    """
    Percent-encode ``text``, Base64-encode that, then percent-encode again.

    The final quoting step escapes Base64 characters such as ``=`` and ``+``
    so the value can be placed in a URL or form body.
    """
    percent_encoded = quote(text)
    b64_bytes = base64.b64encode(percent_encoded.encode('utf-8'))
    return quote(b64_bytes)
项目:photoSync    作者:soCzech    | 项目源码 | 文件源码
def escape(s):
    """Percent-encode ``s``, treating only ``~`` as an extra safe character.

    Unlike the default ``quote`` behaviour, ``/`` is escaped as well.
    """
    extra_safe = "~"
    return quote(s, safe=extra_safe)
项目:gwrappy    作者:danielpoonwj    | 项目源码 | 文件源码
def _parse_object_name(object_name):
        if isinstance(object_name, list):
            object_name = quote(('/'.join(object_name)))
        return object_name
项目:codewars_python    作者:staticor    | 项目源码 | 文件源码
def generate_link(user):
    """Return the codewars.com URL for ``user``, with the name percent-encoded."""
    encoded_user = request.quote(user)
    return "http://www.codewars.com/{user}".format(user=encoded_user)
项目:codewars_python    作者:staticor    | 项目源码 | 文件源码
def generate_link(user):
    """Return the codewars.com profile URL for ``user``, with the name percent-encoded."""
    encoded_user = request.quote(user)
    return "http://www.codewars.com/users/{user}".format(user=encoded_user)

# print(generate_link("matt c"))
项目:realtime-stock    作者:condereis    | 项目源码 | 文件源码
def __yahoo_request(query):
    """Request Yahoo Finance information.

    Sends the percent-encoded YQL ``query`` to the public YQL endpoint and
    returns the ``['query']['results']`` member of the decoded JSON reply.
    `Check <http://goo.gl/8AROUD>`_ for more information on YQL.
    """
    query = quote(query)
    url = 'https://query.yahooapis.com/v1/public/yql?q=' + query + \
        '&format=json&env=store://datatables.org/alltableswithkeys'

    response = urlopen(url)
    try:
        body = response.read()
    finally:
        # Close the HTTP response explicitly instead of leaking the socket.
        response.close()

    return json.loads(body.decode('utf-8'))['query']['results']
项目:realtime-stock    作者:condereis    | 项目源码 | 文件源码
def request_quotes(tickers_list, selected_columns=None):
    """Request Yahoo Finance recent quotes.

    Returns quotes information from YQL. The columns to be requested are
    listed at selected_columns. Check `here <http://goo.gl/8AROUD>`_ for more
    information on YQL.

    Example::

        request_quotes(['AAPL'], ['Name', 'PreviousClose'])
        # -> [{'PreviousClose': '95.60', 'Name': 'Apple Inc.'}]

    :param tickers_list: List of tickers that will be returned.
    :type tickers_list: list of strings
    :param selected_columns: List of columns to be returned, defaults to ['*']
    :type selected_columns: list of strings, optional
    :returns: Requested quotes, always as a list (a single quote is wrapped).
    :rtype: list
    :raises RequestError: if the service returned no results.
    """
    # Build the default per call rather than sharing one mutable list object
    # across all invocations.
    if selected_columns is None:
        selected_columns = ['*']

    __validate_list(tickers_list)
    __validate_list(selected_columns)
    query = 'select {cols} from yahoo.finance.quotes where symbol in ({vals})'
    query = query.format(
        cols=', '.join(selected_columns),
        vals=', '.join('"{0}"'.format(s) for s in tickers_list)
    )

    response = __yahoo_request(query)

    if not response:
        raise RequestError('Unable to process the request. Check if the ' +
                           'columns selected are valid.')

    quotes = response['quote']
    # A single requested ticker comes back as a bare object; normalise it so
    # callers always receive a list.
    if not isinstance(quotes, list):
        return [quotes]
    return quotes
项目:sbbot    作者:lamourj    | 项目源码 | 文件源码
def getStationsFromName(self, userInput):
    """
    Ask the request handler for station suggestions matching the user input.

    The input is percent-encoded and sent as the ``query`` parameter of the
    ``/stations`` endpoint; the handler's response is returned unchanged.
    """
    endpoint = "/stations?query=" + urlqt(userInput)
    return self.reqhandler.sendrequest(endpoint)
项目:Music-Scraper    作者:srivatsan-ramesh    | 项目源码 | 文件源码
def main():
    """
    Entry point of the app, run when music-scraper is typed in a terminal.

    Checks the terminal size, collects the search text through the curses
    GUI, starts the crawl of the matching Google search page alongside the
    GUI thread, and finally writes a status line once crawling has finished
    (unless the GUI was already stopped).
    """
    curses.initscr()
    terminal_too_small = curses.COLS < 80 or curses.LINES < 5
    if terminal_too_small:
        curses.endwin()
        print("Terminal's dimensions are too small")
        return

    process = CrawlerProcess({'LOG_ENABLED': False})

    def collect_input(screen):
        # Prepare the shared GUI state, then block until the user has typed
        # the search text.
        GUI.screen = screen
        curses.start_color()
        GUI.screen.keypad(1)
        curses.init_pair(1, curses.COLOR_BLACK, curses.COLOR_CYAN)
        GUI.high_light_text = curses.color_pair(1)
        GUI.normal_text = curses.A_NORMAL
        GUI.box = curses.newwin(curses.LINES, curses.COLS, 0, 0)
        GUI.message = GUI.get_input()

    curses.wrapper(collect_input)
    search_terms = request.quote(GUI.message)

    MusicSpider.start_urls = ["http://www.google.com/search?q=" + search_terms]
    process.crawl(MusicSpider)
    gui_thread = GUIThread(process, start_gui)
    gui_thread.start()
    process.start()

    if GUI.gui_stopped:
        return

    if len(GUI.strings) == 0:
        # Nothing was scraped: wipe the window before showing the notice.
        GUI.box.erase()
        row, col, status = 1, 1, "No Results Found... Try with Some other keywords."
    else:
        row, col, status = curses.LINES - 2, 1, "Completed Scraping !!"

    GUI.box.addstr(row, col, status, GUI.high_light_text)
    GUI.add_bottom_menus()
    GUI.screen.refresh()
    GUI.box.refresh()
项目:aws-lambda-es-cleanup    作者:cloudreach    | 项目源码 | 文件源码
def send_to_es(self, path, method="GET", payload={}):
        """Send a SigV4-signed HTTP request to Amazon Elasticsearch Service.

        The request targets ``https://<es_endpoint><quoted path>?pretty&format=json``.
        Responses with a 5xx status are retried, sleeping ``(2**retries) * 0.1``
        seconds before each retry, for as long as the retry counter is below
        ``self.cfg["es_max_retry"]``.

        Args:
            path (str): path to send to ES; a leading "/" is added if missing
            method (str, optional): HTTP method default:GET
            payload (dict, optional): additional payload used during POST or PUT

        Returns:
            dict: JSON body of a 2xx response, decoded with ``json.loads``

        Raises:
            ES_Exception: when the response status is outside 2xx and is not
                a 5xx status (5xx responses are retried instead)
        """
        # NOTE(review): ``payload={}`` is a mutable default; it is only passed
        # through (not mutated) in the code visible here.
        if not path.startswith("/"):
            path = "/" + path

        # The region is taken to be the second dot-separated label of the
        # endpoint host name.
        es_region = self.cfg["es_endpoint"].split(".")[1]

        # send to ES with exponential backoff
        retries = 0
        while retries < int(self.cfg["es_max_retry"]):
            if retries > 0:
                # Back off before every attempt except the first.
                seconds = (2**retries) * .1
                # print('Waiting for %.1f seconds', seconds)
                time.sleep(seconds)

            # The request is rebuilt and re-signed on every attempt.
            req = AWSRequest(
                method=method,
                url="https://%s%s?pretty&format=json" % (self.cfg["es_endpoint"], quote(path)),
                data=payload,
                headers={'Host': self.cfg["es_endpoint"]})
            credential_resolver = create_credential_resolver(get_session())
            credentials = credential_resolver.load_credentials()
            SigV4Auth(credentials, 'es', es_region).add_auth(req)

            try:
                preq = req.prepare()
                session = Session()
                res = session.send(preq)
                if res.status_code >= 200 and res.status_code <= 299:
                    # print("%s %s" % (res.status_code, res.content))
                    return json.loads(res.content)
                else:
                    # Non-2xx: raise so the handler below decides between
                    # retrying and propagating.
                    raise ES_Exception(res.status_code, res._content)

            except ES_Exception as e:
                if (e.status_code >= 500) and (e.status_code <= 599):
                    retries += 1  # Candidate for retry
                else:
                    raise  # Stop retrying, re-raise exception
            # NOTE(review): if the function ends here, exhausting all retries
            # falls out of the loop and returns None implicitly — confirm.