Python urllib.request 模块,selector() 实例源码

我们从Python开源项目中,提取了以下28个代码示例,用于说明如何使用urllib.request.selector()

项目:hakkuframework    作者:4shadoww    | 项目源码 | 文件源码
def _parse(self):
        self.type, rest = splittype(self.full_url)
        if self.type is None:
            raise ValueError("unknown url type: %r" % self.full_url)
        self.host, self.selector = splithost(rest)
        if self.host:
            self.host = unquote(self.host)
项目:hakkuframework    作者:4shadoww    | 项目源码 | 文件源码
def set_proxy(self, host, type):
        if self.type == 'https' and not self._tunnel_host:
            self._tunnel_host = self.host
        else:
            self.type= type
            self.selector = self.full_url
        self.host = host
项目:hakkuframework    作者:4shadoww    | 项目源码 | 文件源码
def has_proxy(self):
        return self.selector == self.full_url
项目:hakkuframework    作者:4shadoww    | 项目源码 | 文件源码
def file_open(self, req):
        url = req.selector
        if url[:2] == '//' and url[2:3] != '/' and (req.host and
                req.host != 'localhost'):
            if not req.host is self.get_names():
                raise URLError("file:// scheme is supported only on localhost")
        else:
            return self.open_local_file(req)

    # names for the localhost
项目:hakkuframework    作者:4shadoww    | 项目源码 | 文件源码
def open_local_file(self, req):
        import future.backports.email.utils as email_utils
        import mimetypes
        host = req.host
        filename = req.selector
        localfile = url2pathname(filename)
        try:
            stats = os.stat(localfile)
            size = stats.st_size
            modified = email_utils.formatdate(stats.st_mtime, usegmt=True)
            mtype = mimetypes.guess_type(filename)[0]
            headers = email.message_from_string(
                'Content-type: %s\nContent-length: %d\nLast-modified: %s\n' %
                (mtype or 'text/plain', size, modified))
            if host:
                host, port = splitport(host)
            if not host or \
                (not port and _safe_gethostbyname(host) in self.get_names()):
                if host:
                    origurl = 'file://' + host + filename
                else:
                    origurl = 'file://' + filename
                return addinfourl(open(localfile, 'rb'), headers, origurl)
        except OSError as exp:
            # users shouldn't expect OSErrors coming from urlopen()
            raise URLError(exp)
        raise URLError('file not on local host')
项目:hakkuframework    作者:4shadoww    | 项目源码 | 文件源码
def open(self, fullurl, data=None):
        """Use URLopener().open(file) instead of open(file, 'r')."""
        fullurl = unwrap(to_bytes(fullurl))
        fullurl = quote(fullurl, safe="%/:=&?~#+!$,;'@()*[]|")
        if self.tempcache and fullurl in self.tempcache:
            filename, headers = self.tempcache[fullurl]
            fp = open(filename, 'rb')
            return addinfourl(fp, headers, fullurl)
        urltype, url = splittype(fullurl)
        if not urltype:
            urltype = 'file'
        if urltype in self.proxies:
            proxy = self.proxies[urltype]
            urltype, proxyhost = splittype(proxy)
            host, selector = splithost(proxyhost)
            url = (host, fullurl) # Signal special case to open_*()
        else:
            proxy = None
        name = 'open_' + urltype
        self.type = urltype
        name = name.replace('-', '_')
        if not hasattr(self, name):
            if proxy:
                return self.open_unknown_proxy(proxy, fullurl, data)
            else:
                return self.open_unknown(fullurl, data)
        try:
            if data is None:
                return getattr(self, name)(url)
            else:
                return getattr(self, name)(url, data)
        except HTTPError:
            raise
        except socket.error as msg:
            raise_with_traceback(IOError('socket error', msg))
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def _parse(self):
        self.type, rest = splittype(self.full_url)
        if self.type is None:
            raise ValueError("unknown url type: %s" % self.full_url)
        self.host, self.selector = splithost(rest)
        if self.host:
            self.host = unquote(self.host)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def set_proxy(self, host, type):
        if self.type == 'https' and not self._tunnel_host:
            self._tunnel_host = self.host
        else:
            self.type= type
            self.selector = self.full_url
        self.host = host
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def has_proxy(self):
        return self.selector == self.full_url
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def do_request_(self, request):
        host = request.host
        if not host:
            raise URLError('no host given')

        if request.data is not None:  # POST
            data = request.data
            if isinstance(data, str):
                raise TypeError("POST data should be bytes"
                        " or an iterable of bytes. It cannot be str.")
            if not request.has_header('Content-type'):
                request.add_unredirected_header(
                    'Content-type',
                    'application/x-www-form-urlencoded')
            if not request.has_header('Content-length'):
                try:
                    mv = memoryview(data)
                except TypeError:
                    if isinstance(data, collections.Iterable):
                        raise ValueError("Content-Length should be specified "
                                "for iterable data of type %r %r" % (type(data),
                                data))
                else:
                    request.add_unredirected_header(
                            'Content-length', '%d' % (len(mv) * mv.itemsize))

        sel_host = host
        if request.has_proxy():
            scheme, sel = splittype(request.selector)
            sel_host, sel_path = splithost(sel)
        if not request.has_header('Host'):
            request.add_unredirected_header('Host', sel_host)
        for name, value in self.parent.addheaders:
            name = name.capitalize()
            if not request.has_header(name):
                request.add_unredirected_header(name, value)

        return request
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def file_open(self, req):
        url = req.selector
        if url[:2] == '//' and url[2:3] != '/' and (req.host and
                req.host != 'localhost'):
            if not req.host is self.get_names():
                raise URLError("file:// scheme is supported only on localhost")
        else:
            return self.open_local_file(req)

    # names for the localhost
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def open_local_file(self, req):
        import email.utils
        import mimetypes
        host = req.host
        filename = req.selector
        localfile = url2pathname(filename)
        try:
            stats = os.stat(localfile)
            size = stats.st_size
            modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
            mtype = mimetypes.guess_type(filename)[0]
            headers = email.message_from_string(
                'Content-type: %s\nContent-length: %d\nLast-modified: %s\n' %
                (mtype or 'text/plain', size, modified))
            if host:
                host, port = splitport(host)
            if not host or \
                (not port and _safe_gethostbyname(host) in self.get_names()):
                if host:
                    origurl = 'file://' + host + filename
                else:
                    origurl = 'file://' + filename
                return addinfourl(open(localfile, 'rb'), headers, origurl)
        except OSError as msg:
            # users shouldn't expect OSErrors coming from urlopen()
            raise URLError(msg)
        raise URLError('file not on local host')
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def open(self, fullurl, data=None):
        """Use URLopener().open(file) instead of open(file, 'r')."""
        fullurl = unwrap(to_bytes(fullurl))
        fullurl = quote(fullurl, safe="%/:=&?~#+!$,;'@()*[]|")
        if self.tempcache and fullurl in self.tempcache:
            filename, headers = self.tempcache[fullurl]
            fp = open(filename, 'rb')
            return addinfourl(fp, headers, fullurl)
        urltype, url = splittype(fullurl)
        if not urltype:
            urltype = 'file'
        if urltype in self.proxies:
            proxy = self.proxies[urltype]
            urltype, proxyhost = splittype(proxy)
            host, selector = splithost(proxyhost)
            url = (host, fullurl) # Signal special case to open_*()
        else:
            proxy = None
        name = 'open_' + urltype
        self.type = urltype
        name = name.replace('-', '_')
        if not hasattr(self, name):
            if proxy:
                return self.open_unknown_proxy(proxy, fullurl, data)
            else:
                return self.open_unknown(fullurl, data)
        try:
            if data is None:
                return getattr(self, name)(url)
            else:
                return getattr(self, name)(url, data)
        except socket.error as msg:
            raise IOError('socket error', msg).with_traceback(sys.exc_info()[2])
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def _parse(self):
        self.type, rest = splittype(self.full_url)
        if self.type is None:
            raise ValueError("unknown url type: %r" % self.full_url)
        self.host, self.selector = splithost(rest)
        if self.host:
            self.host = unquote(self.host)
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def set_proxy(self, host, type):
        if self.type == 'https' and not self._tunnel_host:
            self._tunnel_host = self.host
        else:
            self.type= type
            self.selector = self.full_url
        self.host = host
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def has_proxy(self):
        return self.selector == self.full_url
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def file_open(self, req):
        url = req.selector
        if url[:2] == '//' and url[2:3] != '/' and (req.host and
                req.host != 'localhost'):
            if not req.host is self.get_names():
                raise URLError("file:// scheme is supported only on localhost")
        else:
            return self.open_local_file(req)

    # names for the localhost
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def open_local_file(self, req):
        import future.backports.email.utils as email_utils
        import mimetypes
        host = req.host
        filename = req.selector
        localfile = url2pathname(filename)
        try:
            stats = os.stat(localfile)
            size = stats.st_size
            modified = email_utils.formatdate(stats.st_mtime, usegmt=True)
            mtype = mimetypes.guess_type(filename)[0]
            headers = email.message_from_string(
                'Content-type: %s\nContent-length: %d\nLast-modified: %s\n' %
                (mtype or 'text/plain', size, modified))
            if host:
                host, port = splitport(host)
            if not host or \
                (not port and _safe_gethostbyname(host) in self.get_names()):
                if host:
                    origurl = 'file://' + host + filename
                else:
                    origurl = 'file://' + filename
                return addinfourl(open(localfile, 'rb'), headers, origurl)
        except OSError as exp:
            # users shouldn't expect OSErrors coming from urlopen()
            raise URLError(exp)
        raise URLError('file not on local host')
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def open(self, fullurl, data=None):
        """Use URLopener().open(file) instead of open(file, 'r')."""
        fullurl = unwrap(to_bytes(fullurl))
        fullurl = quote(fullurl, safe="%/:=&?~#+!$,;'@()*[]|")
        if self.tempcache and fullurl in self.tempcache:
            filename, headers = self.tempcache[fullurl]
            fp = open(filename, 'rb')
            return addinfourl(fp, headers, fullurl)
        urltype, url = splittype(fullurl)
        if not urltype:
            urltype = 'file'
        if urltype in self.proxies:
            proxy = self.proxies[urltype]
            urltype, proxyhost = splittype(proxy)
            host, selector = splithost(proxyhost)
            url = (host, fullurl) # Signal special case to open_*()
        else:
            proxy = None
        name = 'open_' + urltype
        self.type = urltype
        name = name.replace('-', '_')
        if not hasattr(self, name):
            if proxy:
                return self.open_unknown_proxy(proxy, fullurl, data)
            else:
                return self.open_unknown(fullurl, data)
        try:
            if data is None:
                return getattr(self, name)(url)
            else:
                return getattr(self, name)(url, data)
        except HTTPError:
            raise
        except socket.error as msg:
            raise_with_traceback(IOError('socket error', msg))
项目:islam-buddy    作者:hamir    | 项目源码 | 文件源码
def _parse(self):
        self.type, rest = splittype(self.full_url)
        if self.type is None:
            raise ValueError("unknown url type: %r" % self.full_url)
        self.host, self.selector = splithost(rest)
        if self.host:
            self.host = unquote(self.host)
项目:islam-buddy    作者:hamir    | 项目源码 | 文件源码
def set_proxy(self, host, type):
        if self.type == 'https' and not self._tunnel_host:
            self._tunnel_host = self.host
        else:
            self.type= type
            self.selector = self.full_url
        self.host = host
项目:islam-buddy    作者:hamir    | 项目源码 | 文件源码
def has_proxy(self):
        return self.selector == self.full_url
项目:islam-buddy    作者:hamir    | 项目源码 | 文件源码
def file_open(self, req):
        url = req.selector
        if url[:2] == '//' and url[2:3] != '/' and (req.host and
                req.host != 'localhost'):
            if not req.host is self.get_names():
                raise URLError("file:// scheme is supported only on localhost")
        else:
            return self.open_local_file(req)

    # names for the localhost
项目:islam-buddy    作者:hamir    | 项目源码 | 文件源码
def open_local_file(self, req):
        import future.backports.email.utils as email_utils
        import mimetypes
        host = req.host
        filename = req.selector
        localfile = url2pathname(filename)
        try:
            stats = os.stat(localfile)
            size = stats.st_size
            modified = email_utils.formatdate(stats.st_mtime, usegmt=True)
            mtype = mimetypes.guess_type(filename)[0]
            headers = email.message_from_string(
                'Content-type: %s\nContent-length: %d\nLast-modified: %s\n' %
                (mtype or 'text/plain', size, modified))
            if host:
                host, port = splitport(host)
            if not host or \
                (not port and _safe_gethostbyname(host) in self.get_names()):
                if host:
                    origurl = 'file://' + host + filename
                else:
                    origurl = 'file://' + filename
                return addinfourl(open(localfile, 'rb'), headers, origurl)
        except OSError as exp:
            # users shouldn't expect OSErrors coming from urlopen()
            raise URLError(exp)
        raise URLError('file not on local host')
项目:islam-buddy    作者:hamir    | 项目源码 | 文件源码
def open(self, fullurl, data=None):
        """Use URLopener().open(file) instead of open(file, 'r')."""
        fullurl = unwrap(to_bytes(fullurl))
        fullurl = quote(fullurl, safe="%/:=&?~#+!$,;'@()*[]|")
        if self.tempcache and fullurl in self.tempcache:
            filename, headers = self.tempcache[fullurl]
            fp = open(filename, 'rb')
            return addinfourl(fp, headers, fullurl)
        urltype, url = splittype(fullurl)
        if not urltype:
            urltype = 'file'
        if urltype in self.proxies:
            proxy = self.proxies[urltype]
            urltype, proxyhost = splittype(proxy)
            host, selector = splithost(proxyhost)
            url = (host, fullurl) # Signal special case to open_*()
        else:
            proxy = None
        name = 'open_' + urltype
        self.type = urltype
        name = name.replace('-', '_')
        if not hasattr(self, name):
            if proxy:
                return self.open_unknown_proxy(proxy, fullurl, data)
            else:
                return self.open_unknown(fullurl, data)
        try:
            if data is None:
                return getattr(self, name)(url)
            else:
                return getattr(self, name)(url, data)
        except HTTPError:
            raise
        except socket.error as msg:
            raise_with_traceback(IOError('socket error', msg))
项目:FightstickDisplay    作者:calexil    | 项目源码 | 文件源码
def _parse(self):
        self.type, rest = splittype(self.full_url)
        if self.type is None:
            raise ValueError("unknown url type: %r" % self.full_url)
        self.host, self.selector = splithost(rest)
        if self.host:
            self.host = unquote(self.host)
项目:FightstickDisplay    作者:calexil    | 项目源码 | 文件源码
def set_proxy(self, host, type):
        if self.type == 'https' and not self._tunnel_host:
            self._tunnel_host = self.host
        else:
            self.type= type
            self.selector = self.full_url
        self.host = host
项目:FightstickDisplay    作者:calexil    | 项目源码 | 文件源码
def has_proxy(self):
        return self.selector == self.full_url