Python httpretty 模块,PUT 实例源码

我们从Python开源项目中,提取了以下21个代码示例,用于说明如何使用httpretty.PUT

项目:openag_brain    作者:OpenAgInitiative    | 项目源码 | 文件源码
def test_get_or_create_db():
    """Check that ``get_or_create`` makes "test" show up on the server.

    The mocked database URL first answers HEAD with 404. The first PUT is
    handled by a callback that resets httpretty, re-registers HEAD as 200
    and PUT as 500, and then replies with 201.
    """
    db_url = "http://test.test:5984/test"
    server = BootstrapServer("http://test.test:5984")

    def on_first_put(request, uri, headers):
        # Swap the mock state: from now on HEAD answers 200 and any
        # further PUT to the same URL answers 500.
        httpretty.reset()
        httpretty.register_uri(httpretty.HEAD, db_url, status=200)
        httpretty.register_uri(httpretty.PUT, db_url, status=500)
        return 201, headers, ""

    httpretty.register_uri(httpretty.HEAD, db_url, status=404, body="")
    httpretty.register_uri(httpretty.PUT, db_url, body=on_first_put)

    assert "test" not in server
    server.get_or_create("test")
    assert "test" in server
项目:openag_python    作者:OpenAgInitiative    | 项目源码 | 文件源码
def test_get_or_create_db():
    """Verify that "test" is absent before and present after ``get_or_create``.

    HEAD on the database URL is mocked as 404 up front. The PUT mock is a
    callback which, when invoked, resets httpretty, registers HEAD as 200
    and PUT as 500, and returns a 201 response with an empty body.
    """
    test_db_url = "http://test.test:5984/test"
    server = Server("http://test.test:5984")

    def handle_create(request, uri, headers):
        # After this call the mocks describe an existing database:
        # HEAD -> 200, and a repeated PUT -> 500.
        httpretty.reset()
        httpretty.register_uri(httpretty.HEAD, test_db_url, status=200)
        httpretty.register_uri(httpretty.PUT, test_db_url, status=500)
        return 201, headers, ""

    httpretty.register_uri(
        httpretty.HEAD, test_db_url, status=404, body=""
    )
    httpretty.register_uri(httpretty.PUT, test_db_url, body=handle_create)

    assert "test" not in server
    server.get_or_create("test")
    assert "test" in server
项目:cloudstrype    作者:btimby    | 项目源码 | 文件源码
def test_upload(self):
    """Mock a PUT on the formatted URL and call ``self.client.upload``.

    The URL comes from ``OnedriveAPIClient.DOWNLOAD_URL[1]`` formatted with
    ``self._get_path()``. The mocked response has an empty body.
    """
    url_template = OnedriveAPIClient.DOWNLOAD_URL[1]
    target_url = url_template.format(path=self._get_path())
    httpretty.register_uri(httpretty.PUT, target_url, body='')

    self.client.upload(self.chunk, TEST_CHUNK_BODY)
项目:cloudstrype    作者:btimby    | 项目源码 | 文件源码
def test_upload(self):
    """Register an empty-body PUT mock, then upload ``TEST_CHUNK_BODY``.

    The mocked URL is ``OnedriveAPIClient.DOWNLOAD_URL[1]`` with its
    ``path`` placeholder filled from ``self._get_path()``.
    """
    chunk_path = self._get_path()
    mocked_url = OnedriveAPIClient.DOWNLOAD_URL[1].format(path=chunk_path)
    httpretty.register_uri(httpretty.PUT, mocked_url, body='')

    self.client.upload(self.chunk, TEST_CHUNK_BODY)
项目:cloudstrype    作者:btimby    | 项目源码 | 文件源码
def test_upload(self):
    """Call ``self.client.upload`` against a mocked PUT endpoint.

    The endpoint is built by formatting ``OnedriveAPIClient.DOWNLOAD_URL[1]``
    with the path returned by ``self._get_path()``; it replies with an
    empty body.
    """
    put_mock = {
        'uri': OnedriveAPIClient.DOWNLOAD_URL[1].format(
            path=self._get_path()),
        'body': '',
    }
    httpretty.register_uri(httpretty.PUT, put_mock['uri'],
                           body=put_mock['body'])

    self.client.upload(self.chunk, TEST_CHUNK_BODY)
项目:openag_brain    作者:OpenAgInitiative    | 项目源码 | 文件源码
def test_create_user():
    """Call ``create_user("test", "test")`` against mocked endpoints.

    A HEAD mock is registered for ``/_users`` and a PUT mock returning 201
    for the ``org.couchdb.user%3Atest`` document.
    """
    users_url = "http://test.test:5984/_users"
    user_doc_url = "http://test.test:5984/_users/org.couchdb.user%3Atest"

    server = BootstrapServer("http://test.test:5984")
    httpretty.register_uri(httpretty.HEAD, users_url)
    httpretty.register_uri(httpretty.PUT, user_doc_url, status=201)

    server.create_user("test", "test")
项目:paystack-python    作者:andela-sjames    | 项目源码 | 文件源码
def test_update(self):
    """Plan.update should return the mocked payload with a true status.

    PUT ``https://api.paystack.co/plan/78`` is mocked to answer 201 with a
    JSON body whose ``status`` field is true.
    """
    mocked_body = '{"status": true, "contributors": true}'
    httpretty.register_uri(
        httpretty.PUT,
        "https://api.paystack.co/plan/78",
        body=mocked_body,
        status=201,
        content_type='text/json',
    )

    result = Plan.update(plan_id=78, interval="monthly", amount=70000)
    self.assertEqual(result['status'], True)
项目:paystack-python    作者:andela-sjames    | 项目源码 | 文件源码
def test_update(self):
    """Customer.update should return the mocked payload with a true status.

    PUT ``https://api.paystack.co/customer/4013`` is mocked to answer 201
    with a JSON body whose ``status`` field is true.
    """
    mocked_body = '{"status": true, "contributors": true}'
    httpretty.register_uri(
        httpretty.PUT,
        "https://api.paystack.co/customer/4013",
        body=mocked_body,
        status=201,
        content_type='text/json',
    )

    result = Customer.update(customer_id=4013, first_name='andela')
    self.assertEqual(result['status'], True)
项目:paystack-python    作者:andela-sjames    | 项目源码 | 文件源码
def test_put_method_called(self):
    """``PayStackRequests.put`` should return a payload with a truthy status.

    PUT ``https://api.paystack.co/customer/4013`` is mocked with HTTP
    status 302 and a JSON body whose ``status`` field is true.
    """
    mocked_body = '{"status": true, "contributors": null}'
    httpretty.register_uri(
        httpretty.PUT,
        "https://api.paystack.co/customer/4013",
        body=mocked_body,
        status=302,
        content_type='text/json',
    )

    client = PayStackRequests()
    payload = {'test': 'requests'}
    result = client.put('customer/4013', data=payload)
    self.assertTrue(result['status'])
项目:openag_python    作者:OpenAgInitiative    | 项目源码 | 文件源码
def test_init_without_cloud_server(
    config, push_design_documents, get_or_create, generate_config
):
    """Walk through ``show`` / ``init`` CLI calls with mocked config endpoints.

    GET and PUT on ``http://localhost:5984/_config/test/test`` are mocked,
    and ``generate_config`` is stubbed to return a single test entry. The
    sequence checked is: show fails, init succeeds, show succeeds, and a
    second init with another ``--db_url`` fails.
    """
    config_url = "http://localhost:5984/_config/test/test"
    generate_config.return_value = {"test": {"test": "test"}}
    httpretty.register_uri(httpretty.GET, config_url, body='"test_val"')
    httpretty.register_uri(httpretty.PUT, config_url)

    cli = CliRunner()

    # show before init: expected to fail, no local server is selected yet
    first_show = cli.invoke(show)
    assert first_show.exit_code, first_show.output

    # init: expected to succeed and push the design documents, without
    # replicating anything
    first_init = cli.invoke(init)
    assert first_init.exit_code == 0, (
        first_init.exception or first_init.output
    )
    assert get_or_create.call_count == len(all_dbs)
    assert push_design_documents.call_count == 1
    push_design_documents.reset_mock()

    # show after init: expected to succeed
    second_show = cli.invoke(show)
    assert second_show.exit_code == 0, (
        second_show.exception or second_show.output
    )

    # init with another db_url: expected to fail, a different database is
    # already selected
    second_init = cli.invoke(init, ["--db_url", "http://test.test:5984"])
    assert second_init.exit_code, second_init.output
项目:openag_python    作者:OpenAgInitiative    | 项目源码 | 文件源码
def test_user_with_cloud_server(config):
    """Run register, login, a second login and logout through the CLI.

    The ``/_users`` HEAD, the user-document PUT (201) and the ``/_session``
    GET are mocked on ``http://test.test:5984``. Logging in as a second
    user while already logged in is expected to exit with ``SystemExit``.
    """
    users_url = "http://test.test:5984/_users"
    user_doc_url = "http://test.test:5984/_users/org.couchdb.user%3Atest"
    session_url = "http://test.test:5984/_session"
    cli = CliRunner()

    # register: expected to succeed
    httpretty.register_uri(httpretty.HEAD, users_url)
    httpretty.register_uri(httpretty.PUT, user_doc_url, status=201)
    registered = cli.invoke(register, input="test\ntest\ntest\n")
    assert registered.exit_code == 0, (
        registered.exception or registered.output
    )

    # login: expected to succeed
    httpretty.register_uri(httpretty.GET, session_url)
    logged_in = cli.invoke(login, input="test\ntest\n")
    assert logged_in.exit_code == 0, (
        logged_in.exception or logged_in.output
    )

    # login as someone else: expected to fail because a different user is
    # already logged in
    relogin = cli.invoke(login, input="test2\ntest2\n")
    assert relogin.exit_code, relogin.output
    assert isinstance(relogin.exception, SystemExit)

    # logout: expected to succeed
    logged_out = cli.invoke(logout)
    assert logged_out.exit_code == 0, (
        logged_out.exception or logged_out.output
    )
项目:openag_python    作者:OpenAgInitiative    | 项目源码 | 文件源码
def test_create_user():
    """Invoke ``Server.create_user`` with the user endpoints mocked.

    HEAD on ``/_users`` uses httpretty's default response; PUT on the
    ``org.couchdb.user%3Atest`` document answers 201.
    """
    mocks = (
        (httpretty.HEAD, "http://test.test:5984/_users", {}),
        (
            httpretty.PUT,
            "http://test.test:5984/_users/org.couchdb.user%3Atest",
            {"status": 201},
        ),
    )

    server = Server("http://test.test:5984")
    for method, url, options in mocks:
        httpretty.register_uri(method, url, **options)

    server.create_user("test", "test")
项目:ouroboros    作者:madedotcom    | 项目源码 | 文件源码
def given_an_existing_user(self):
    """Set up the mocked HTTP state for the user at ``/users/foo``.

    Calls ``start_mocking_http``, then ``fake_response`` with the file
    ``user-foo.json``, then ``expect_call`` with ``httpretty.PUT``.
    """
    user_path = '/users/foo'
    self.start_mocking_http()
    self.fake_response(user_path, file='user-foo.json')
    self.expect_call(user_path, httpretty.PUT)
项目:ouroboros    作者:madedotcom    | 项目源码 | 文件源码
def given_an_existing_user(self):
    """Set up the mocked HTTP state for the user at ``/users/admin``.

    Calls ``start_mocking_http``, then ``fake_response`` with the file
    ``user-admin.json``, then ``expect_call`` with ``httpretty.PUT``.
    """
    user_path = '/users/admin'
    self.start_mocking_http()
    self.fake_response(user_path, file='user-admin.json')
    self.expect_call(user_path, httpretty.PUT)
项目:ouroboros    作者:madedotcom    | 项目源码 | 文件源码
def given_an_existing_user(self):
    """Start HTTP mocking and describe the ``/users/admin`` endpoint.

    ``fake_response`` is given the fixture file ``user-admin.json`` and
    ``expect_call`` is given the PUT method for the same path.
    """
    admin_path = '/users/admin'
    fixture_name = 'user-admin.json'
    self.start_mocking_http()
    self.fake_response(admin_path, file=fixture_name)
    self.expect_call(admin_path, httpretty.PUT)
项目:ouroboros    作者:madedotcom    | 项目源码 | 文件源码
def given_an_existing_user(self):
    """Prepare mocks for ``/users/admin``: a fake response and a PUT call.

    The fake response is registered with ``file='user-admin.json'`` after
    HTTP mocking has been started.
    """
    path, fixture = '/users/admin', 'user-admin.json'
    self.start_mocking_http()
    self.fake_response(path, file=fixture)
    self.expect_call(path, httpretty.PUT)
项目:ouroboros    作者:madedotcom    | 项目源码 | 文件源码
def given_an_existing_user(self):
    """Set up the mocked HTTP state for the user at ``/users/giddy``.

    Calls ``start_mocking_http``, then ``fake_response`` with the file
    ``user-giddy.json``, then ``expect_call`` with ``httpretty.PUT``.
    """
    user_path = '/users/giddy'
    self.start_mocking_http()
    self.fake_response(user_path, file='user-giddy.json')
    self.expect_call(user_path, httpretty.PUT)
项目:swagger-stub    作者:Trax-air    | 项目源码 | 文件源码
def swagger_stub(swagger_files_url):
    """Fixture to stub a microservice from swagger files.

    To use this fixture you need to define a swagger fixture named
    swagger_files_url with the path to your swagger files, and the url to stub.
    Then just add this fixture to your tests and your request pointing to the
    urls in swagger_files_url will be managed by the stub.

    Example:
        @pytest.fixture
        def swagger_files_url():
            return [('tests/swagger.yaml', 'http://localhost:8000')]

    Args:
        swagger_files_url: iterable of entries whose item 0 is the swagger
            file path and item 1 is the base url to stub.

    Yields:
        The ``StubMemory`` stored in ``memory[base_url]``, once per entry.
        NOTE(review): a pytest yield-fixture is expected to yield exactly
        once, so passing more than one entry probably fails at teardown --
        confirm before relying on multiple entries.
    """
    httpretty.enable()
    try:
        for entry in swagger_files_url:  # Get all given swagger files and url
            base_url = entry[1]
            parser = SwaggerParser(entry[0])
            swagger_url[base_url] = parser

            # Register all urls: every path below base_url is answered by
            # get_data_from_request, for each supported HTTP method.
            # NOTE(review): base_url is inserted into the pattern unescaped,
            # so regex metacharacters in it (e.g. ".") match loosely.
            url_pattern = re.compile(base_url + r'/.*')
            for method in (
                httpretty.GET,
                httpretty.POST,
                httpretty.PUT,
                httpretty.PATCH,
                httpretty.DELETE,
            ):
                httpretty.register_uri(
                    method, url_pattern, body=get_data_from_request)

            memory[base_url] = StubMemory(parser)
            yield memory[base_url]
    finally:
        # Close httpretty. Done in ``finally`` so the patching applied by
        # httpretty.enable() is undone even when setup raises (for example
        # SwaggerParser failing) or the generator is closed early.
        httpretty.disable()
        httpretty.reset()
项目:openag_brain    作者:OpenAgInitiative    | 项目源码 | 文件源码
def test_replicate():
    """Start a replication, repeat the call, then cancel it.

    All traffic to ``http://test.test:5984`` is mocked. The PUT and DELETE
    mocks are callbacks that reset httpretty and re-register the HEAD
    responses, so the later ``in`` checks see the new state.
    """
    replicator_url = "http://test.test:5984/_replicator"
    doc_url = "http://test.test:5984/_replicator/test"
    server = BootstrapServer("http://test.test:5984")

    def mock_heads(**doc_response):
        # HEAD on the _replicator database uses the default response; the
        # document's HEAD response is configured by the caller.
        httpretty.register_uri(httpretty.HEAD, replicator_url)
        httpretty.register_uri(httpretty.HEAD, doc_url, **doc_response)

    def on_put(request, uri, headers):
        # Once the document has been PUT, HEAD reports it with etag "a".
        httpretty.reset()
        mock_heads(status=200, etag="a")
        reply = {"id": "test_src", "rev": "a", "ok": True}
        return 201, headers, json.dumps(reply)

    def on_delete(request, uri, headers):
        # Once the document has been deleted, HEAD reports 404 again.
        httpretty.reset()
        mock_heads(status=404)
        return 200, headers, ""

    # Start a replication
    mock_heads(status=404)
    httpretty.register_uri(
        httpretty.PUT, doc_url,
        status=201, content_type="application/json", body=on_put
    )
    server.replicate("test", "test_src", "test_dest", continuous=True)
    assert "test" in server["_replicator"]

    # Make sure replicate is idempotent
    # NOTE(review): this 500 guard is registered on ".../_replicator/test_src"
    # while the replication document above lives at ".../_replicator/test" --
    # confirm that URL is intended.
    httpretty.register_uri(
        httpretty.PUT, "http://test.test:5984/_replicator/test_src",
        status=500
    )
    server.replicate("test", "test_src", "test_dest", continuous=True)
    assert "test" in server["_replicator"]

    # Cancel the replication
    httpretty.register_uri(
        httpretty.DELETE, doc_url, status=200, body=on_delete
    )
    server.cancel_replication("test")
    assert "test" not in server["_replicator"]
项目:openag_brain    作者:OpenAgInitiative    | 项目源码 | 文件源码
def test_push_design_documents():
    """Push design documents from a temporary directory to a mocked server.

    The directory holds ``test/views/test/map.js`` plus a hidden file
    ``.test.js`` next to it and a hidden top-level directory ``.test``.
    The PUT mock answers 200 only when the request body equals the
    expected design document, which has a single "test" view, and 500
    otherwise.
    """
    server = BootstrapServer("http://test.test:5984")
    db_url = "http://test.test:5984/test"
    design_doc_url = "http://test.test:5984/test/_design/openag"
    expected_doc = {
        "_id": "_design/openag",
        "views": {"test": {"map": "test"}},
    }

    def on_design_doc_put(request, uri, headers):
        # Reject any body that is not exactly the expected design document.
        if json.loads(request.body) != expected_doc:
            return 500, headers, ""
        reply = {"id": "_design/openag", "rev": "a"}
        return 200, headers, json.dumps(reply)

    root = tempfile.mkdtemp()
    try:
        # <root>/test/views/test is created in one call; <root>/.test is
        # the hidden sibling directory.
        view_dir = os.path.join(root, "test", "views", "test")
        os.makedirs(view_dir)
        os.mkdir(os.path.join(root, ".test"))
        for file_name in ("map.js", ".test.js"):
            with open(os.path.join(view_dir, file_name), "w+") as handle:
                handle.write("test")

        httpretty.register_uri(httpretty.HEAD, db_url)
        httpretty.register_uri(httpretty.HEAD, design_doc_url, status=404)
        httpretty.register_uri(
            httpretty.PUT, design_doc_url,
            body=on_design_doc_put, content_type="application/json"
        )
        server.push_design_documents(root)
    finally:
        shutil.rmtree(root)
项目:openag_python    作者:OpenAgInitiative    | 项目源码 | 文件源码
def test_replicate():
    """Exercise replicate (twice) and cancel_replication on a mocked server.

    HEAD, PUT and DELETE requests to ``http://test.test:5984`` are mocked.
    The PUT and DELETE handlers reset httpretty and install fresh HEAD
    mocks before answering, which is what changes the membership checks.
    """
    db_head_url = "http://test.test:5984/_replicator"
    doc_url = "http://test.test:5984/_replicator/test"
    server = Server("http://test.test:5984")

    def install_head_mocks(**doc_kwargs):
        # The database HEAD uses httpretty's default response; only the
        # document HEAD response varies between phases.
        httpretty.register_uri(httpretty.HEAD, db_head_url)
        httpretty.register_uri(httpretty.HEAD, doc_url, **doc_kwargs)

    def handle_put(request, uri, headers):
        # After the PUT, the document HEAD answers 200 with etag "a".
        httpretty.reset()
        install_head_mocks(status=200, etag="a")
        body = json.dumps({"id": "test_src", "rev": "a", "ok": True})
        return 201, headers, body

    def handle_delete(request, uri, headers):
        # After the DELETE, the document HEAD answers 404 again.
        httpretty.reset()
        install_head_mocks(status=404)
        return 200, headers, ""

    # Start a replication
    install_head_mocks(status=404)
    httpretty.register_uri(
        httpretty.PUT, doc_url,
        status=201, content_type="application/json", body=handle_put
    )
    server.replicate("test", "test_src", "test_dest", continuous=True)
    assert "test" in server["_replicator"]

    # Make sure replicate is idempotent
    # NOTE(review): the 500 guard targets ".../_replicator/test_src", not
    # the ".../_replicator/test" document used above -- confirm the URL.
    httpretty.register_uri(
        httpretty.PUT, "http://test.test:5984/_replicator/test_src",
        status=500
    )
    server.replicate("test", "test_src", "test_dest", continuous=True)
    assert "test" in server["_replicator"]

    # Cancel the replication
    httpretty.register_uri(
        httpretty.DELETE, doc_url, status=200, body=handle_delete
    )
    server.cancel_replication("test")
    assert "test" not in server["_replicator"]