Python cgi 模块,parse_qs() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用cgi.parse_qs()

项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def parse_qs(qs, keep_blank_values=0, strict_parsing=0, unquote=unquote):
    """like cgi.parse_qs, only with custom unquote function"""
    d = {}
    items = [s2 for s1 in qs.split("&") for s2 in s1.split(";")]
    for item in items:
        try:
            k, v = item.split("=", 1)
        except ValueError:
            if strict_parsing:
                raise
            continue
        if v or keep_blank_values:
            k = unquote(k.replace("+", " "))
            v = unquote(v.replace("+", " "))
            if k in d:
                d[k].append(v)
            else:
                d[k] = [v]
    return d
项目:realtimedisplay    作者:SuperDARNCanada    | 项目源码 | 文件源码
def get_target(self, target_plugin, path):
        """
        Parses the path, extracts a token, and looks up a target
        for that token using the token plugin. Sets
        target_host and target_port if successful
        """
        # The files in targets contain the lines
        # in the form of token: host:port

        # Extract the token parameter from url
        args = parse_qs(urlparse(path)[4]) # 4 is the query from url

        if not 'token' in args or not len(args['token']):
            raise self.server.EClose("Token not present")

        token = args['token'][0].rstrip('\n')

        result_pair = target_plugin.lookup(token)

        if result_pair is not None:
            return result_pair
        else:
            raise self.server.EClose("Token '%s' not found" % token)
项目:barbieri-playground    作者:barbieri    | 项目源码 | 文件源码
def do_dispatch(self, body):
        self.url = self.path

        pieces = urlparse.urlparse(self.path)
        self.path = pieces[2]
        self.query = cgi.parse_qs(pieces[4])

        if self.path == "/":
            self.serve_main(body)
        elif self.path == "/shutdown.do":
            self.serve_shutdown(body)
        elif self.path == "/stop-transcoder.do":
            self.serve_stop_transcoder(body)
        elif self.path == "/status.do":
            self.serve_status(body)
        elif self.path == "/play.do":
            self.serve_play(body)
        elif self.path == "/stream.do":
            self.serve_stream(body)
        else:
            self.send_error(404, "File not found")
    # do_dispatch()
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def parse_qs(qs, keep_blank_values=0, strict_parsing=0, unquote=unquote):
    """like cgi.parse_qs, only with custom unquote function"""
    d = {}
    items = [s2 for s1 in qs.split("&") for s2 in s1.split(";")]
    for item in items:
        try:
            k, v = item.split("=", 1)
        except ValueError:
            if strict_parsing:
                raise
            continue
        if v or keep_blank_values:
            k = unquote(k.replace("+", " "))
            v = unquote(v.replace("+", " "))
            if k in d:
                d[k].append(v)
            else:
                d[k] = [v]
    return d
项目:stepic_web_project    作者:izverg23    | 项目源码 | 文件源码
def application(environ, start_response):

    queryString = parse_qs(environ['QUERY_STRING'], keep_blank_values=True)
    body = ''
    print(queryString)
    for key, value in queryString.items() :
        for element in value :
            body += key + '='                                       
            body += element + '\r\n'

    print(body)                                                             

    status = '200 OK'

    response_headers = [
        ('Content-Type', 'text/plain'),
    ]
    start_response(status, response_headers)                                    

    return [body]
项目:django-simple-forum    作者:MicroPyramid    | 项目源码 | 文件源码
def extend_access_token(self, app_id, app_secret):
        """
        Extends the expiration time of a valid OAuth access token. See
        <https://developers.facebook.com/roadmap/offline-access-removal/
        #extend_token>

        """
        args = {
            "client_id": app_id,
            "client_secret": app_secret,
            "grant_type": "fb_exchange_token",
            "fb_exchange_token": self.access_token,
        }
        response = urllib2.urlopen("https://graph.facebook.com/oauth/"
                                   "access_token?" +
                                   urllib.parse.urlencode(args)).read().decode('utf-8')
        query_str = parse_qs(response)
        if "access_token" in query_str:
            result = {"accesstoken": query_str["access_token"][0]}
            if "expires" in query_str:
                result["expire"] = query_str["expires"][0]
            return result
        else:
            response = json.loads(response)
            raise GraphAPIError(response)
项目:django-simple-forum    作者:MicroPyramid    | 项目源码 | 文件源码
def get_access_token_from_code(code, redirect_uri, app_id, app_secret):

    args = {
        "code": code,
        "redirect_uri": redirect_uri,
        "client_id": app_id,
        "client_secret": app_secret,
    }
    # We would use GraphAPI.request() here, except for that the fact
    # that the response is a key-value pair, and not JSON.
    response = urllib2.urlopen("https://graph.facebook.com/oauth/access_token" +
                              "?" + urllib.parse.urlencode(args)).read().decode('utf-8')
    query_str = parse_qs(response)
    if "access_token" in query_str:
        result = {"access_token": query_str["access_token"][0]}
        if "expires" in query_str:
            result["expires"] = query_str["expires"][0]
        return result
    else:
        jsonResponse = json.loads(str(response))
        # response = json.loads(response)
        encoding = response.info().get_content_charset('utf8')
        data = json.loads(response.read().decode(encoding))
        return data
项目:time2go    作者:twitchyliquid64    | 项目源码 | 文件源码
def parse_qs_bytes(qs, keep_blank_values=False, strict_parsing=False):
        """Parses a query string like urlparse.parse_qs, but returns the
        values as byte strings.

        Keys still become type str (interpreted as latin1 in python3!)
        because it's too painful to keep them as byte strings in
        python3 and in practice they're nearly always ascii anyway.
        """
        # This is gross, but python3 doesn't give us another way.
        # Latin1 is the universal donor of character encodings.
        result = parse_qs(qs, keep_blank_values, strict_parsing,
                          encoding='latin1', errors='strict')
        encoded = {}
        for k,v in result.iteritems():
            encoded[k] = [i.encode('latin1') for i in v]
        return encoded
项目:annotated-py-tornado    作者:hhstore    | 项目源码 | 文件源码
def _on_request_body(self, data):
        self._request.body = data
        content_type = self._request.headers.get("Content-Type", "")

        if self._request.method == "POST":
            if content_type.startswith("application/x-www-form-urlencoded"):
                arguments = cgi.parse_qs(self._request.body)
                for name, values in arguments.iteritems():
                    values = [v for v in values if v]
                    if values:
                        self._request.arguments.setdefault(name, []).extend(
                            values)
            elif content_type.startswith("multipart/form-data"):
                if 'boundary=' in content_type:
                    boundary = content_type.split('boundary=',1)[1]
                    if boundary: self._parse_mime_body(boundary, data)
                else:
                    logging.warning("Invalid multipart/form-data")

        self.request_callback(self._request)    # ????
项目:vagus    作者:privacore    | 项目源码 | 文件源码
def do_GET(self):   
        parsed_url = urlparse.urlparse(self.path)
        parameters = cgi.parse_qs(parsed_url.query)
        #print "parameters=",parameters
        if parsed_url.path.find("..")!=-1:
            self.send_response(404)
            self.end_headers()
            return
        if parsed_url.path=="/":
            return self.serve_root()
        # /cluster/$cluster_id$
        s = parsed_url.path.split("/")
        if len(s)==3 and s[1]=="cluster":
            return self.serve_cluster(s[2])
        if len(s)==3 and s[1]=="cluster_details":
            return self.serve_cluster_details(s[2])
        self.send_response(404)
        self.end_headers()
项目:deprecated_thedap    作者:unitedvote    | 项目源码 | 文件源码
def parse_qs_bytes(qs, keep_blank_values=False, strict_parsing=False):
        """Parses a query string like urlparse.parse_qs, but returns the
        values as byte strings.

        Keys still become type str (interpreted as latin1 in python3!)
        because it's too painful to keep them as byte strings in
        python3 and in practice they're nearly always ascii anyway.
        """
        # This is gross, but python3 doesn't give us another way.
        # Latin1 is the universal donor of character encodings.
        result = parse_qs(qs, keep_blank_values, strict_parsing,
                          encoding='latin1', errors='strict')
        encoded = {}
        for k,v in result.iteritems():
            encoded[k] = [i.encode('latin1') for i in v]
        return encoded
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def _decode_payload(cls, body):
    compressed_payload_str = None
    if body.startswith(cls.PAYLOAD_KEY_PARAM):
      payload_key = body[len(cls.PAYLOAD_KEY_PARAM):]
      payload_entity = _HugeTaskPayload.get(payload_key)
      compressed_payload_str = payload_entity.payload
    elif body.startswith(cls.PAYLOAD_PARAM):
      compressed_payload_str = body[len(cls.PAYLOAD_PARAM):]

    if compressed_payload_str:
      payload_str = zlib.decompress(compressed_payload_str)
    else:
      payload_str = body

    result = {}
    for (name, value) in cgi.parse_qs(payload_str).items():
      if len(value) == 1:
        result[name] = value[0]
      else:
        result[name] = value
    return result
项目:brainforks    作者:minervax    | 项目源码 | 文件源码
def do_POST(s):
        method_word = s.path[1:].split("/")
        length = int(s.headers.getheader('content-length'))
        postvars = cgi.parse_qs(s.rfile.read(length), keep_blank_values=1)
        print method_word
        ret = ""

        if method_word[0] == "getfreq":
            ret = []
            print postvars["list"][0]
            wlist = ast.literal_eval(urllib.unquote(postvars["list"][0]))
            wlist = noDup2(wlist)
            print wlist
            for words in wlist:
                ret.append(get_frequency(words))

        s.send_response(200)
        s.send_header("content-type","application/json")
        s.send_header("Access-Control-Allow-Origin","*")
        s.end_headers()
        s.wfile.write(json.dumps(ret))
        s.wfile.close();
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def parse_qs(qs, keep_blank_values=0, strict_parsing=0):
    """
    Like C{cgi.parse_qs}, but with support for parsing byte strings on Python 3.

    @type qs: C{bytes}
    """
    d = {}
    items = [s2 for s1 in qs.split(b"&") for s2 in s1.split(b";")]
    for item in items:
        try:
            k, v = item.split(b"=", 1)
        except ValueError:
            if strict_parsing:
                raise
            continue
        if v or keep_blank_values:
            k = unquote(k.replace(b"+", b" "))
            v = unquote(v.replace(b"+", b" "))
            if k in d:
                d[k].append(v)
            else:
                d[k] = [v]
    return d
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def test_007_get_arg(self):
        # define a new handler that does a get_arg as well as a read_body
        def new_app(env, start_response):
            body = bytes_to_str(env['wsgi.input'].read())
            a = cgi.parse_qs(body).get('a', [1])[0]
            start_response('200 OK', [('Content-type', 'text/plain')])
            return [six.b('a is %s, body is %s' % (a, body))]

        self.site.application = new_app
        sock = eventlet.connect(self.server_addr)
        request = b'\r\n'.join((
            b'POST / HTTP/1.0',
            b'Host: localhost',
            b'Content-Length: 3',
            b'',
            b'a=a'))
        sock.sendall(request)

        # send some junk after the actual request
        sock.sendall(b'01234567890123456789')
        result = read_http(sock)
        self.assertEqual(result.body, b'a is a, body is a=a')
项目:sdn    作者:ray6    | 项目源码 | 文件源码
def put_vtable(self, req, **kwargs):
        shortest_switch = self.shortest_switch_spp

        #Check request isn't blank.
        try:
            request_body_size = int(req.environ.get('CONTENT_LENGTH', 0))

        except(ValueError):
            request_body_size = 0

        d = parse_qs(req.body)

        #Interpret vlan input
        for key, value in d.items():
            host = ('00:00:00:00:00:0'+key[4])
            vlan = value[0][4]
            shortest_switch.set_vtable(host, vlan)

        body = self.html()

        return Response(content_type='text/html', body=body)
项目:sdn    作者:ray6    | 项目源码 | 文件源码
def put_vtable(self, req, **kwargs):
        simple_switch = self.simple_switch_spp

        #Check request is not Blank.
        try:
            request_body_size = int(req.environ.get('CONTENT_LENGTH',0))
        except(ValueError):
            request_body_size = 0

        d = parse_qs(req.body)

        #Interpret the vlan list
        for key, value in d.items():
            host = ('00:00:00:00:00:0'+key[4])
            vlan = value[0][4]
            simple_switch.set_vtable(host, vlan)

        body = self.html()

        return Response(content_type='text/html', body=body)
项目:dcept    作者:secureworks    | 项目源码 | 文件源码
def do_POST(s):
        length = int(s.headers['content-length'])
        postvars = cgi.parse_qs(s.rfile.read(length), keep_blank_values=1)

        logging.debug(postvars)

        try:                    
            username     = postvars['u'][0]
            domain       = postvars['d'][0]
            encTimestamp = postvars['t'][0]
        except:     
            s.send_response(500)
            s.end_headers() 
            return      

        cracker.enqueueJob(username, domain, encTimestamp, dcept.passwordHit)       

        s.send_response(200)
        s.end_headers()
项目:badoo_scrapy_splash_redis    作者:Supe2015    | 项目源码 | 文件源码
def test_form_request_from_response():
    # Copied from scrapy tests (test_from_response_submit_not_first_clickable)
    def _buildresponse(body, **kwargs):
        kwargs.setdefault('body', body)
        kwargs.setdefault('url', 'http://example.com')
        kwargs.setdefault('encoding', 'utf-8')
        return HtmlResponse(**kwargs)
    response = _buildresponse(
        """<form action="get.php" method="GET">
        <input type="submit" name="clickable1" value="clicked1">
        <input type="hidden" name="one" value="1">
        <input type="hidden" name="two" value="3">
        <input type="submit" name="clickable2" value="clicked2">
        </form>""")
    req = SplashFormRequest.from_response(
        response, formdata={'two': '2'}, clickdata={'name': 'clickable2'})
    assert req.method == 'GET'
    assert req.meta['splash']['args']['url'] == req.url
    fs = cgi.parse_qs(req.url.partition('?')[2], True)
    assert fs['clickable2'] == ['clicked2']
    assert 'clickable1' not in fs
    assert fs['one'] == ['1']
    assert fs['two'] == ['2']
项目:badoo_scrapy_splash_redis    作者:Supe2015    | 项目源码 | 文件源码
def test_form_request_from_response():
    # Copied from scrapy tests (test_from_response_submit_not_first_clickable)
    def _buildresponse(body, **kwargs):
        kwargs.setdefault('body', body)
        kwargs.setdefault('url', 'http://example.com')
        kwargs.setdefault('encoding', 'utf-8')
        return HtmlResponse(**kwargs)
    response = _buildresponse(
        """<form action="get.php" method="GET">
        <input type="submit" name="clickable1" value="clicked1">
        <input type="hidden" name="one" value="1">
        <input type="hidden" name="two" value="3">
        <input type="submit" name="clickable2" value="clicked2">
        </form>""")
    req = SplashFormRequest.from_response(
        response, formdata={'two': '2'}, clickdata={'name': 'clickable2'})
    assert req.method == 'GET'
    assert req.meta['splash']['args']['url'] == req.url
    fs = cgi.parse_qs(req.url.partition('?')[2], True)
    assert fs['clickable2'] == ['clicked2']
    assert 'clickable1' not in fs
    assert fs['one'] == ['1']
    assert fs['two'] == ['2']
项目:badoo_scrapy_splash_redis    作者:Supe2015    | 项目源码 | 文件源码
def test_form_request_from_response():
    # Copied from scrapy tests (test_from_response_submit_not_first_clickable)
    def _buildresponse(body, **kwargs):
        kwargs.setdefault('body', body)
        kwargs.setdefault('url', 'http://example.com')
        kwargs.setdefault('encoding', 'utf-8')
        return HtmlResponse(**kwargs)
    response = _buildresponse(
        """<form action="get.php" method="GET">
        <input type="submit" name="clickable1" value="clicked1">
        <input type="hidden" name="one" value="1">
        <input type="hidden" name="two" value="3">
        <input type="submit" name="clickable2" value="clicked2">
        </form>""")
    req = SplashFormRequest.from_response(
        response, formdata={'two': '2'}, clickdata={'name': 'clickable2'})
    assert req.method == 'GET'
    assert req.meta['splash']['args']['url'] == req.url
    fs = cgi.parse_qs(req.url.partition('?')[2], True)
    assert fs['clickable2'] == ['clicked2']
    assert 'clickable1' not in fs
    assert fs['one'] == ['1']
    assert fs['two'] == ['2']
项目:sharead    作者:strin    | 项目源码 | 文件源码
def _parse_token(cls, s):
        if not s:
            raise ValueError("Invalid parameter string.")

        params = parse_qs(s, keep_blank_values=False)
        if not params:
            raise ValueError("Invalid parameter string: %r" % s)

        try:
            key = params['oauth_token'][0]
        except Exception:
            raise ValueError("'oauth_token' not found in OAuth request.")

        try:
            secret = params['oauth_token_secret'][0]
        except Exception:
            raise ValueError("'oauth_token_secret' not found in "
                             "OAuth request.")

        return OAuthToken(key, secret)

# Don't use this class directly.
项目:sharead    作者:strin    | 项目源码 | 文件源码
def _parse_token(cls, s):
        if not s:
            raise ValueError("Invalid parameter string.")

        params = parse_qs(s, keep_blank_values=False)
        if not params:
            raise ValueError("Invalid parameter string: %r" % s)

        try:
            key = params['oauth_token'][0]
        except Exception:
            raise ValueError("'oauth_token' not found in OAuth request.")

        try:
            secret = params['oauth_token_secret'][0]
        except Exception:
            raise ValueError("'oauth_token_secret' not found in "
                             "OAuth request.")

        return OAuthToken(key, secret)

# Don't use this class directly.
项目:uicourses_v2    作者:sumerinlan    | 项目源码 | 文件源码
def do_POST(self):
        # http://stackoverflow.com/questions/4233218/python-basehttprequesthandler-post-variables
        ctype, pdict = cgi.parse_header(self.headers['content-type'])
        if ctype == 'multipart/form-data':
            postvars = cgi.parse_multipart(self.rfile, pdict)
        elif ctype == 'application/x-www-form-urlencoded':
            length = int(self.headers['content-length'])
            postvars = cgi.parse_qs(self.rfile.read(length), keep_blank_values=1)
        else:
            postvars = {}
        # print(postvars)
        if 'Username' not in list(postvars.keys()) \
                or 'Password' not in list(postvars.keys()):
            log('E', 'vali.', 'No credentials.')
            self.exit_on_error('No credentials.')
            return
        if not validate_id(postvars['Username'][0], postvars['Password'][0]):
            log('E', 'vali.', 'Wrong credentials.')
            self.exit_on_error('Wrong credentials.')
            return
        # print(postvars)
        try:
            dispatch(postvars)
            self.write_response({'Status': 'OK'})
        except:
            log('E', 'hand.', 'Handler throws an exception.')
            self.exit_on_error('Handler throws and exception.')
项目:zappa-bittorrent-tracker    作者:Miserlou    | 项目源码 | 文件源码
def get_info_hash(request, multiple=False):
    """
    Get infohashes from a QS.
    """
    if not multiple:
        return b2a_hex(cgi.parse_qs(request.query_string)['info_hash'][0])
    else:
        hashes = set()
        for hash in cgi.parse_qs(request.query_string)['info_hash']:
            hashes.add(b2a_hex(hash))
    return hashes
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def render_GET(self, request):
        def finish(data):
            # Write the data back to our client
            request.clientproto = clientproto
            transport.write(data)
            transport.loseConnection()


        # Get the referer and parse it
        referer = request.getHeader('referer')
        scheme, netloc, path, parameters, query, fragment = urlparse(referer)
        args = parse_qs(query)

        transport = request.transport
        clientproto = request.clientproto

        # Modify the original request
        request.uri = referer
        request.path = path
        request.args = args
        request.clientproto = 'HTTP/1.0'  # we don't want chunk encoding
        request.transport = FakeTransport()

        # Reload the modified request on the server, without using HTTP
        deferred = request.notifyFinish()
        request.process()

        # XXX TODO handle errors
        deferred.addCallback(lambda _: request.transport.getData()
                             ).addCallback(self.validate, request
                                           ).addCallback(finish)

        return server.NOT_DONE_YET
项目:v2ex-tornado-2    作者:coderyy    | 项目源码 | 文件源码
def from_string(s):
        """ Returns a token from something like:
        oauth_token_secret=xxx&oauth_token=xxx
        """
        params = cgi.parse_qs(s, keep_blank_values=False)
        key = params['oauth_token'][0]
        secret = params['oauth_token_secret'][0]
        token = OAuthToken(key, secret)
        try:
            token.callback_confirmed = params['oauth_callback_confirmed'][0]
        except KeyError:
            pass # 1.0, no callback confirmed.
        return token
项目:v2ex-tornado-2    作者:coderyy    | 项目源码 | 文件源码
def _split_url_string(param_str):
        """Turn URL string into parameters."""
        parameters = cgi.parse_qs(param_str, keep_blank_values=False)
        for k, v in parameters.iteritems():
            parameters[k] = urllib.unquote(v[0])
        return parameters
项目:TACTIC-Handler    作者:listyque    | 项目源码 | 文件源码
def callback(ticket, callback_class, options):
    # NOTE: cgi.parse_qs creates arrays for *all* values
    options = cgi.parse_qs(options)
    print "options: ", options

    # get the callback, if there is one
    callback = eval("%s()" % callback_class)
    callback.set_ticket(ticket)
    callback.set_options(options)
    callback._execute()
项目:synergy-service    作者:openstack    | 项目源码 | 文件源码
def parseParameters(f):
        def wrapper(self, *args, **kw):
            if not args:
                return f(self, *args, **kw)

            context = args[0]

            query = context.get("QUERY_STRING", None)

            if query:
                parameters = parse_qs(query)

                for key in parameters:
                    value = escape(parameters[key][0])
                    value = value.replace("'", "\"")

                    try:
                        value = json.loads(value)
                    except ValueError:
                        pass

                    context[key] = value

            return f(self, *args, **kw)

        return wrapper
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def testParseqs(self):
        self.failUnlessEqual(cgi.parse_qs("a=b&d=c;+=f"),
            http.parse_qs("a=b&d=c;+=f"))
        self.failUnlessRaises(ValueError, http.parse_qs, "blah",
            strict_parsing = 1)
        self.failUnlessEqual(cgi.parse_qs("a=&b=c", keep_blank_values = 1),
            http.parse_qs("a=&b=c", keep_blank_values = 1))
        self.failUnlessEqual(cgi.parse_qs("a=&b=c"),
            http.parse_qs("a=&b=c"))
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def set_token_string(self, token_string):
    """Sets the token key and secret from the token string.

    Args:
      token_string: str Token string of form
          oauth_token=[0]&oauth_token_secret=[1]. If oauth_token is not present,
          self.key will be None. If oauth_token_secret is not present,
          self.secret will be None.
    """
    token_params = cgi.parse_qs(token_string, keep_blank_values=False)
    if 'oauth_token' in token_params:
      self.key = token_params['oauth_token'][0]
    if 'oauth_token_secret' in token_params:
      self.secret = token_params['oauth_token_secret'][0]
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def from_string(s):
        params = cgi.parse_qs(s, keep_blank_values=False)
        key = params['oauth_token'][0]
        secret = params['oauth_token_secret'][0]
        return OAuthToken(key, secret)
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def _split_url_string(param_str):
        parameters = cgi.parse_qs(param_str, keep_blank_values=False)
        for k, v in parameters.iteritems():
            parameters[k] = urllib.unquote(v[0])
        return parameters
项目:yahoo-fantasy-football-metrics    作者:uberfastman    | 项目源码 | 文件源码
def from_string(data_string):
        """Deserializes a token from a string like one returned by

        `to_string()`."""

        if not len(data_string):
            raise ValueError("Invalid parameter string.")

        params = parse_qs(data_string, keep_blank_values=False)
        if not len(params):
            raise ValueError("Invalid parameter string.")

        try:
            key = params['oauth_token'][0]
        except Exception:
            raise ValueError("'oauth_token' not found in OAuth request.")

        try:
            secret = params['oauth_token_secret'][0]
        except Exception:
            raise ValueError("'oauth_token_secret' not found in "
                "OAuth request.")

        token = YahooToken(key, secret)

        session_handle = params.get('oauth_session_handle')
        if session_handle:
            setattr(token, 'session_handle', session_handle[0])

        timestamp = params.get('token_creation_timestamp')
        if timestamp:
            setattr(token, 'timestamp', timestamp[0])

        try:
            token.callback_confirmed = params['oauth_callback_confirmed'][0]
        except KeyError:
            pass  # 1.0, no callback confirmed.

        return token
项目:Python-GoogleDrive-VideoStream    作者:ddurdle    | 项目源码 | 文件源码
def parse_query(query):
    queries = {}
    try:
        queries = cgi.parse_qs(query)
    except:
        return
    q = {}
    for key, value in queries.items():
        q[key] = value[0]
    q['mode'] = q.get('mode', 'main')
    return q
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def from_string(s):
        """Deserializes a token from a string like one returned by
        `to_string()`."""

        if not len(s):
            raise ValueError("Invalid parameter string.")

        params = parse_qs(s, keep_blank_values=False)
        if not len(params):
            raise ValueError("Invalid parameter string.")

        try:
            key = params['oauth_token'][0]
        except Exception:
            raise ValueError("'oauth_token' not found in OAuth request.")

        try:
            secret = params['oauth_token_secret'][0]
        except Exception:
            raise ValueError("'oauth_token_secret' not found in " 
                "OAuth request.")

        token = Token(key, secret)
        try:
            token.callback_confirmed = params['oauth_callback_confirmed'][0]
        except KeyError:
            pass  # 1.0, no callback confirmed.
        return token
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def to_url(self):
        """Serialize as a URL for a GET request."""
        base_url = urlparse.urlparse(self.url)
        try:
            query = base_url.query
        except AttributeError:
            # must be python <2.5
            query = base_url[4]
        query = parse_qs(query)
        for k, v in self.items():
            query.setdefault(k, []).append(v)

        try:
            scheme = base_url.scheme
            netloc = base_url.netloc
            path = base_url.path
            params = base_url.params
            fragment = base_url.fragment
        except AttributeError:
            # must be python <2.5
            scheme = base_url[0]
            netloc = base_url[1]
            path = base_url[2]
            params = base_url[3]
            fragment = base_url[5]

        url = (scheme, netloc, path, params,
               urllib.urlencode(query, True), fragment)
        return urlparse.urlunparse(url)
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def _split_url_string(param_str):
        """Turn URL string into parameters."""
        parameters = parse_qs(param_str.encode('utf-8'), keep_blank_values=True)
        for k, v in parameters.iteritems():
            parameters[k] = urllib.unquote(v[0])
        return parameters
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def from_string(data_string):
        """Deserializes a token from a string like one returned by

        `to_string()`."""

        if not len(data_string):
            raise ValueError("Invalid parameter string.")

        params = parse_qs(data_string, keep_blank_values=False)
        if not len(params):
            raise ValueError("Invalid parameter string.")

        try:
            key = params['oauth_token'][0]
        except Exception:
            raise ValueError("'oauth_token' not found in OAuth request.")

        try:
            secret = params['oauth_token_secret'][0]
        except Exception:
            raise ValueError("'oauth_token_secret' not found in "
                "OAuth request.")

        token = YahooToken(key, secret)

        session_handle = params.get('oauth_session_handle')
        if session_handle:
            setattr(token, 'session_handle', session_handle[0])

        timestamp = params.get('token_creation_timestamp')
        if timestamp:
            setattr(token, 'timestamp', timestamp[0])

        try:
            token.callback_confirmed = params['oauth_callback_confirmed'][0]
        except KeyError:
            pass # 1.0, no callback confirmed.

        return token
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_deprecated_parse_qs(self):
        # this func is moved to urllib.parse, this is just a sanity check
        with check_warnings(('cgi.parse_qs is deprecated, use urllib.parse.'
                             'parse_qs instead', DeprecationWarning)):
            self.assertEqual({'a': ['A1'], 'B': ['B3'], 'b': ['B2']},
                             cgi.parse_qs('a=A1&b=B2&B=B3'))
项目:yt    作者:yt-project    | 项目源码 | 文件源码
def GET(self):
        """ The QUERY_STRING parsed into a MultiDict.

            Keys and values are strings. Multiple values per key are possible.
            See MultiDict for details.
        """
        if 'bottle.get' not in self.environ:
            data = parse_qs(self.query_string, keep_blank_values=True)
            get = self.environ['bottle.get'] = MultiDict()
            for key, values in data.iteritems():
                for value in values:
                    get[key] = value
        return self.environ['bottle.get']
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_deprecated_parse_qs(self):
        # this func is moved to urlparse, this is just a sanity check
        with check_warnings(('cgi.parse_qs is deprecated, use urlparse.'
                             'parse_qs instead', PendingDeprecationWarning)):
            self.assertEqual({'a': ['A1'], 'B': ['B3'], 'b': ['B2']},
                             cgi.parse_qs('a=A1&b=B2&B=B3'))
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_deprecated_parse_qs(self):
        # this func is moved to urlparse, this is just a sanity check
        with check_warnings(('cgi.parse_qs is deprecated, use urlparse.'
                             'parse_qs instead', PendingDeprecationWarning)):
            self.assertEqual({'a': ['A1'], 'B': ['B3'], 'b': ['B2']},
                             cgi.parse_qs('a=A1&b=B2&B=B3'))
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def qs(s):
    if isinstance(s, six.binary_type):
        s = s.decode('utf8')
    blob = cgi.parse_qs(s)
    if len(blob['data']) != 1:
        pytest.fail('found multi-item data: %s' % blob['data'])
    json_bytes = base64.b64decode(blob['data'][0])
    blob['data'] = json.loads(json_bytes.decode('utf8'))
    return blob
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def _assertRequested(self, expect_url, expect_data):
        mock_response = Mock()
        mock_response.read.return_value = six.b('{"status":1, "error": null}')
        with patch('six.moves.urllib.request.urlopen', return_value=mock_response) as urlopen:
            yield

            assert urlopen.call_count == 1
            ((request,), _) = urlopen.call_args
            assert request.get_full_url() == expect_url
            data = urllib.parse.parse_qs(request.data.decode('utf8'))
            assert len(data['data']) == 1
            payload_encoded = data['data'][0]
            payload_json = base64.b64decode(payload_encoded).decode('utf8')
            payload = json.loads(payload_json)
            assert payload == expect_data
项目:FireMISP    作者:deralexxx    | 项目源码 | 文件源码
def _parse_POST(self):
        import cgi
        ctype, pdict = cgi.parse_header(self.headers.getheader('content-type'))
        if ctype == 'multipart/form-data':
            postvars = cgi.parse_multipart(self.rfile, pdict)
        elif ctype == 'application/x-www-form-urlencoded':
            length = int(self.headers.getheader('content-length'))
            postvars = cgi.parse_qs(
                self.rfile.read(length), keep_blank_values=1)
        else:
            postvars = {}

        return postvars

    # -------------- POST handler: where the magic happens --------------
项目:Zappa    作者:Miserlou    | 项目源码 | 文件源码
def hello_world(environ, start_response):
    parameters = parse_qs(environ.get('QUERY_STRING', ''))
    if 'subject' in parameters:
        subject = escape(parameters['subject'][0])
    else:
        subject = 'World'
    start_response('200 OK', [('Content-Type', 'text/html')])
    return ['''Hello {subject!s}
    Hello {subject!s}!

'''.format(**{'subject': subject})]
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def query(self):
        ''' The :attr:`query_string` parsed into a :class:`FormsDict`. These
            values are sometimes called "URL arguments" or "GET parameters", but
            not to be confused with "URL wildcards" as they are provided by the
            :class:`Router`. '''
        data = parse_qs(self.query_string, keep_blank_values=True)
        get = self.environ['bottle.get'] = FormsDict()
        for key, values in data.iteritems():
            for value in values:
                get[key] = value
        return get
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def testParseqs(self):
        self.failUnlessEqual(cgi.parse_qs("a=b&d=c;+=f"),
            http.parse_qs("a=b&d=c;+=f"))
        self.failUnlessRaises(ValueError, http.parse_qs, "blah",
            strict_parsing = 1)
        self.failUnlessEqual(cgi.parse_qs("a=&b=c", keep_blank_values = 1),
            http.parse_qs("a=&b=c", keep_blank_values = 1))
        self.failUnlessEqual(cgi.parse_qs("a=&b=c"),
            http.parse_qs("a=&b=c"))