Python http.client 模块,HTTPException() 实例源码

我们从Python开源项目中,提取了以下21个代码示例,用于说明如何使用http.client.HTTPException()

项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_named_sequences_full(self):
        # Check all the named sequences
        url = ("http://www.unicode.org/Public/%s/ucd/NamedSequences.txt" %
               unicodedata.unidata_version)
        try:
            testdata = support.open_urlresource(url, encoding="utf-8",
                                                check=check_version)
        except (IOError, HTTPException):
            self.skipTest("Could not retrieve " + url)
        self.addCleanup(testdata.close)
        for line in testdata:
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            seqname, codepoints = line.split(';')
            codepoints = ''.join(chr(int(cp, 16)) for cp in codepoints.split())
            self.assertEqual(unicodedata.lookup(seqname), codepoints)
            with self.assertRaises(SyntaxError):
                self.checkletter(seqname, None)
            with self.assertRaises(KeyError):
                unicodedata.ucd_3_2_0.lookup(seqname)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_named_sequences_full(self):
        # Check all the named sequences
        url = ("http://www.pythontest.net/unicode/%s/NamedSequences.txt" %
               unicodedata.unidata_version)
        try:
            testdata = support.open_urlresource(url, encoding="utf-8",
                                                check=check_version)
        except (OSError, HTTPException):
            self.skipTest("Could not retrieve " + url)
        self.addCleanup(testdata.close)
        for line in testdata:
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            seqname, codepoints = line.split(';')
            codepoints = ''.join(chr(int(cp, 16)) for cp in codepoints.split())
            self.assertEqual(unicodedata.lookup(seqname), codepoints)
            with self.assertRaises(SyntaxError):
                self.checkletter(seqname, None)
            with self.assertRaises(KeyError):
                unicodedata.ucd_3_2_0.lookup(seqname)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_named_sequences_full(self):
        # Check all the named sequences
        url = ("http://www.unicode.org/Public/%s/ucd/NamedSequences.txt" %
               unicodedata.unidata_version)
        try:
            testdata = support.open_urlresource(url, encoding="utf-8",
                                                check=check_version)
        except (OSError, HTTPException):
            self.skipTest("Could not retrieve " + url)
        self.addCleanup(testdata.close)
        for line in testdata:
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            seqname, codepoints = line.split(';')
            codepoints = ''.join(chr(int(cp, 16)) for cp in codepoints.split())
            self.assertEqual(unicodedata.lookup(seqname), codepoints)
            with self.assertRaises(SyntaxError):
                self.checkletter(seqname, None)
            with self.assertRaises(KeyError):
                unicodedata.ucd_3_2_0.lookup(seqname)
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def test_download_failed_HTTPException(self, mock_urlopen):
        mock_urlopen.side_effect = httplib.HTTPException()
        fake_request = urllib2.Request('http://fakeurl.com')

        self.assertRaises(
            self.glance.RetryableError,
            self.glance._download_tarball_and_verify,
            fake_request, 'fake_staging_path')
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def __init__(self, *args, **kw):
        unittest.TestCase.__init__(self, *args, **kw)
        try:
            self.open_mapping_file().close() # test it to report the error early
        except (IOError, HTTPException):
            self.skipTest("Could not retrieve "+self.mapfileurl)
项目:the-knights-who-say-ni    作者:python    | 项目源码 | 文件源码
def test_failure(self):
        host = bpo.Host(util.FakeServerHost())
        failed_response = util.FakeResponse(status=404)
        fake_session = util.FakeSession(response=failed_response)
        with self.assertRaises(client.HTTPException):
            self.run_awaitable(host.check(fake_session, ['brettcannon']))
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_too_many_headers(self):
        headers = '\r\n'.join('Header%d: foo' % i
                              for i in range(client._MAXHEADERS + 1)) + '\r\n'
        text = ('HTTP/1.1 200 OK\r\n' + headers)
        s = FakeSocket(text)
        r = client.HTTPResponse(s)
        self.assertRaisesRegex(client.HTTPException,
                               r"got more than \d+ headers", r.begin)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def __init__(self, *args, **kw):
        unittest.TestCase.__init__(self, *args, **kw)
        try:
            self.open_mapping_file().close() # test it to report the error early
        except (IOError, HTTPException):
            self.skipTest("Could not retrieve "+self.mapfileurl)
项目:python-phoenixdb    作者:lalinsky    | 项目源码 | 文件源码
def connect(self):
        """Opens a HTTP connection to the RPC server."""
        logger.debug("Opening connection to %s:%s", self.url.hostname, self.url.port)
        try:
            self.connection = httplib.HTTPConnection(self.url.hostname, self.url.port)
            self.connection.connect()
        except (httplib.HTTPException, socket.error) as e:
            raise errors.InterfaceError('Unable to connect to the specified service', e)
项目:python-phoenixdb    作者:lalinsky    | 项目源码 | 文件源码
def close(self):
        """Closes the HTTP connection to the RPC server."""
        if self.connection is not None:
            logger.debug("Closing connection to %s:%s", self.url.hostname, self.url.port)
            try:
                self.connection.close()
            except httplib.HTTPException:
                logger.warning("Error while closing connection", exc_info=True)
            self.connection = None
项目:python-phoenixdb    作者:lalinsky    | 项目源码 | 文件源码
def _post_request(self, body, headers):
        retry_count = self.max_retries
        while True:
            logger.debug("POST %s %r %r", self.url.path, body, headers)
            try:
                self.connection.request('POST', self.url.path, body=body, headers=headers)
                response = self.connection.getresponse()
            except httplib.HTTPException as e:
                if retry_count > 0:
                    delay = math.exp(-retry_count)
                    logger.debug("HTTP protocol error, will retry in %s seconds...", delay, exc_info=True)
                    self.close()
                    self.connect()
                    time.sleep(delay)
                    retry_count -= 1
                    continue
                raise errors.InterfaceError('RPC request failed', cause=e)
            else:
                if response.status == httplib.SERVICE_UNAVAILABLE:
                    if retry_count > 0:
                        delay = math.exp(-retry_count)
                        logger.debug("Service unavailable, will retry in %s seconds...", delay, exc_info=True)
                        time.sleep(delay)
                        retry_count -= 1
                        continue
                return response
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_too_many_headers(self):
        headers = '\r\n'.join('Header%d: foo' % i
                              for i in range(client._MAXHEADERS + 1)) + '\r\n'
        text = ('HTTP/1.1 200 OK\r\n' + headers)
        s = FakeSocket(text)
        r = client.HTTPResponse(s)
        self.assertRaisesRegex(client.HTTPException,
                               r"got more than \d+ headers", r.begin)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def setUp(self):
        try:
            self.open_mapping_file().close() # test it to report the error early
        except (OSError, HTTPException):
            self.skipTest("Could not retrieve "+self.mapfileurl)
项目:instagram-livestream-downloader    作者:taengstagram    | 项目源码 | 文件源码
def get_live(self, first_comment_created_at=0):
        comments_collected = self.comments
        commenter_ids = self.user_config.commenters or []

        before_count = len(comments_collected)
        try:
            comments_res = self.api.broadcast_comments(
                self.broadcast['id'], last_comment_ts=first_comment_created_at)
            comments = comments_res.get('comments', [])
            first_comment_created_at = (
                comments[0]['created_at_utc'] if comments else int(time.time() - 5))
            # save comment if it's in list of commenter IDs or if user is verified
            comments_collected.extend(
                list(filter(
                    lambda x: (str(x['user_id']) in commenter_ids or
                               x['user']['username'] in commenter_ids or
                               x['user']['is_verified']),
                    comments)))
            after_count = len(comments_collected)
            if after_count > before_count:
                # save intermediately to avoid losing comments due to unexpected errors
                broadcast = self.broadcast.copy()
                broadcast.pop('segments', None)     # save space
                broadcast['comments'] = comments_collected
                with open(self.destination_file, 'w') as outfile:
                    json.dump(broadcast, outfile, indent=2)
            self.comments = comments_collected

        except (SSLError, timeout, URLError, HTTPException, SocketError) as e:
            # Probably transient network error, ignore and continue
            self.logger.warning('Comment collection error: %s' % e)
        except ClientError as e:
            if e.code == 500:
                self.logger.warning('Comment collection ClientError: %d %s' % (e.code, e.error_response))
            elif e.code == 400 and not e.msg:   # 400 error fail but no error message
                self.logger.warning('Comment collection ClientError: %d %s' % (e.code, e.error_response))
            else:
                raise e
        finally:
            time.sleep(4)
        return first_comment_created_at
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_too_many_headers(self):
        headers = '\r\n'.join('Header%d: foo' % i
                              for i in range(client._MAXHEADERS + 1)) + '\r\n'
        text = ('HTTP/1.1 200 OK\r\n' + headers)
        s = FakeSocket(text)
        r = client.HTTPResponse(s)
        self.assertRaisesRegex(client.HTTPException,
                               r"got more than \d+ headers", r.begin)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def setUp(self):
        try:
            self.open_mapping_file().close() # test it to report the error early
        except (OSError, HTTPException):
            self.skipTest("Could not retrieve "+self.mapfileurl)
项目:pyttanko    作者:Francesco149    | 项目源码 | 文件源码
def osu_get(conn, endpoint, paramsdict=None):
    '''GETs /api/endpoint?paramsdict&k=args.key from conn.
    return json object, exits process on api errors'''
    global osu_treset, osu_ncalls, args

    sys.stderr.write("%s %s\n" % (endpoint, str(paramsdict)))

    paramsdict["k"] = args.key
    path = "/api/%s?%s" % (endpoint, urllib.urlencode(paramsdict))

    while True:
        while True:
            if time.time() >= osu_treset:
                osu_ncalls = 0
                osu_treset = time.time() + 60
                sys.stderr.write("\napi ready\n")

            if osu_ncalls < 60:
                break
            else:
                sys.stderr.write("waiting for api cooldown...\r")
                time.sleep(1)


        try:
            conn.request("GET", path)
            osu_ncalls += 1
            r = conn.getresponse()

            raw = ""

            while True:
                try:
                    raw += r.read()
                    break
                except httplib.IncompleteRead as e:
                    raw += e.partial

            j = json.loads(raw)

            if "error" in j:
                sys.stderr.write("%s\n" % j["error"])
                sys.exit(1)

            return j

        except (httplib.HTTPException, ValueError) as e:
            sys.stderr.write("%s\n" % (traceback.format_exc()))

            try:
                '''
                prevents exceptions on next request if the
                response wasn't previously read due to errors
                '''
                conn.getresponse().read()

            except httplib.HTTPException:
                pass

            time.sleep(5)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_main(self):
        part = None
        part1_data = {}
        # Hit the exception early
        try:
            testdata = open_urlresource(TESTDATAURL, encoding="utf-8",
                                        check=check_version)
        except (IOError, HTTPException):
            self.skipTest("Could not retrieve " + TESTDATAURL)
        self.addCleanup(testdata.close)
        for line in testdata:
            if '#' in line:
                line = line.split('#')[0]
            line = line.strip()
            if not line:
                continue
            if line.startswith("@Part"):
                part = line.split()[0]
                continue
            try:
                c1,c2,c3,c4,c5 = [unistr(x) for x in line.split(';')[:-1]]
            except RangeError:
                # Skip unsupported characters;
                # try atleast adding c1 if we are in part1
                if part == "@Part1":
                    try:
                        c1 = unistr(line.split(';')[0])
                    except RangeError:
                        pass
                    else:
                        part1_data[c1] = 1
                continue

            # Perform tests
            self.assertTrue(c2 ==  NFC(c1) ==  NFC(c2) ==  NFC(c3), line)
            self.assertTrue(c4 ==  NFC(c4) ==  NFC(c5), line)
            self.assertTrue(c3 ==  NFD(c1) ==  NFD(c2) ==  NFD(c3), line)
            self.assertTrue(c5 ==  NFD(c4) ==  NFD(c5), line)
            self.assertTrue(c4 == NFKC(c1) == NFKC(c2) == \
                            NFKC(c3) == NFKC(c4) == NFKC(c5),
                            line)
            self.assertTrue(c5 == NFKD(c1) == NFKD(c2) == \
                            NFKD(c3) == NFKD(c4) == NFKD(c5),
                            line)

            # Record part 1 data
            if part == "@Part1":
                part1_data[c1] = 1

        # Perform tests for all other data
        for c in range(sys.maxunicode+1):
            X = chr(c)
            if X in part1_data:
                continue
            self.assertTrue(X == NFC(X) == NFD(X) == NFKC(X) == NFKD(X), c)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_main(self):
        part = None
        part1_data = {}
        # Hit the exception early
        try:
            testdata = open_urlresource(TESTDATAURL, encoding="utf-8",
                                        check=check_version)
        except (IOError, HTTPException):
            self.skipTest("Could not retrieve " + TESTDATAURL)
        self.addCleanup(testdata.close)
        for line in testdata:
            if '#' in line:
                line = line.split('#')[0]
            line = line.strip()
            if not line:
                continue
            if line.startswith("@Part"):
                part = line.split()[0]
                continue
            try:
                c1,c2,c3,c4,c5 = [unistr(x) for x in line.split(';')[:-1]]
            except RangeError:
                # Skip unsupported characters;
                # try at least adding c1 if we are in part1
                if part == "@Part1":
                    try:
                        c1 = unistr(line.split(';')[0])
                    except RangeError:
                        pass
                    else:
                        part1_data[c1] = 1
                continue

            # Perform tests
            self.assertTrue(c2 ==  NFC(c1) ==  NFC(c2) ==  NFC(c3), line)
            self.assertTrue(c4 ==  NFC(c4) ==  NFC(c5), line)
            self.assertTrue(c3 ==  NFD(c1) ==  NFD(c2) ==  NFD(c3), line)
            self.assertTrue(c5 ==  NFD(c4) ==  NFD(c5), line)
            self.assertTrue(c4 == NFKC(c1) == NFKC(c2) == \
                            NFKC(c3) == NFKC(c4) == NFKC(c5),
                            line)
            self.assertTrue(c5 == NFKD(c1) == NFKD(c2) == \
                            NFKD(c3) == NFKD(c4) == NFKD(c5),
                            line)

            # Record part 1 data
            if part == "@Part1":
                part1_data[c1] = 1

        # Perform tests for all other data
        for c in range(sys.maxunicode+1):
            X = chr(c)
            if X in part1_data:
                continue
            self.assertTrue(X == NFC(X) == NFD(X) == NFKC(X) == NFKD(X), c)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_main(self):
        part = None
        part1_data = {}
        # Hit the exception early
        try:
            testdata = open_urlresource(TESTDATAURL, encoding="utf-8",
                                        check=check_version)
        except (OSError, HTTPException):
            self.skipTest("Could not retrieve " + TESTDATAURL)
        self.addCleanup(testdata.close)
        for line in testdata:
            if '#' in line:
                line = line.split('#')[0]
            line = line.strip()
            if not line:
                continue
            if line.startswith("@Part"):
                part = line.split()[0]
                continue
            try:
                c1,c2,c3,c4,c5 = [unistr(x) for x in line.split(';')[:-1]]
            except RangeError:
                # Skip unsupported characters;
                # try at least adding c1 if we are in part1
                if part == "@Part1":
                    try:
                        c1 = unistr(line.split(';')[0])
                    except RangeError:
                        pass
                    else:
                        part1_data[c1] = 1
                continue

            # Perform tests
            self.assertTrue(c2 ==  NFC(c1) ==  NFC(c2) ==  NFC(c3), line)
            self.assertTrue(c4 ==  NFC(c4) ==  NFC(c5), line)
            self.assertTrue(c3 ==  NFD(c1) ==  NFD(c2) ==  NFD(c3), line)
            self.assertTrue(c5 ==  NFD(c4) ==  NFD(c5), line)
            self.assertTrue(c4 == NFKC(c1) == NFKC(c2) == \
                            NFKC(c3) == NFKC(c4) == NFKC(c5),
                            line)
            self.assertTrue(c5 == NFKD(c1) == NFKD(c2) == \
                            NFKD(c3) == NFKD(c4) == NFKD(c5),
                            line)

            # Record part 1 data
            if part == "@Part1":
                part1_data[c1] = 1

        # Perform tests for all other data
        for c in range(sys.maxunicode+1):
            X = chr(c)
            if X in part1_data:
                continue
            self.assertTrue(X == NFC(X) == NFD(X) == NFKC(X) == NFKD(X), c)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_main(self):
        part = None
        part1_data = {}
        # Hit the exception early
        try:
            testdata = open_urlresource(TESTDATAURL, encoding="utf-8",
                                        check=check_version)
        except (OSError, HTTPException):
            self.skipTest("Could not retrieve " + TESTDATAURL)
        self.addCleanup(testdata.close)
        for line in testdata:
            if '#' in line:
                line = line.split('#')[0]
            line = line.strip()
            if not line:
                continue
            if line.startswith("@Part"):
                part = line.split()[0]
                continue
            try:
                c1,c2,c3,c4,c5 = [unistr(x) for x in line.split(';')[:-1]]
            except RangeError:
                # Skip unsupported characters;
                # try at least adding c1 if we are in part1
                if part == "@Part1":
                    try:
                        c1 = unistr(line.split(';')[0])
                    except RangeError:
                        pass
                    else:
                        part1_data[c1] = 1
                continue

            # Perform tests
            self.assertTrue(c2 ==  NFC(c1) ==  NFC(c2) ==  NFC(c3), line)
            self.assertTrue(c4 ==  NFC(c4) ==  NFC(c5), line)
            self.assertTrue(c3 ==  NFD(c1) ==  NFD(c2) ==  NFD(c3), line)
            self.assertTrue(c5 ==  NFD(c4) ==  NFD(c5), line)
            self.assertTrue(c4 == NFKC(c1) == NFKC(c2) == \
                            NFKC(c3) == NFKC(c4) == NFKC(c5),
                            line)
            self.assertTrue(c5 == NFKD(c1) == NFKD(c2) == \
                            NFKD(c3) == NFKD(c4) == NFKD(c5),
                            line)

            # Record part 1 data
            if part == "@Part1":
                part1_data[c1] = 1

        # Perform tests for all other data
        for c in range(sys.maxunicode+1):
            X = chr(c)
            if X in part1_data:
                continue
            self.assertTrue(X == NFC(X) == NFD(X) == NFKC(X) == NFKD(X), c)