我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 swiftclient.Connection()。
def GetObjectStorage(container, filename):
    """Return the stored object ``filename`` from ``container`` as a response.

    The response mimetype is picked from the filename suffix (.txt, .csv,
    .json); any other name is served as 'application/binary'.

    Responds with a 500 JSON message when 'VCAP_SERVICES' is not in the
    environment, and with a 404 JSON message when the Swift client raises
    ClientException.
    """
    if 'VCAP_SERVICES' not in os.environ:
        return MakeJSONMsgResponse({"message":"cannot authenticate"}, 500)

    # Checked in order; the first matching suffix decides the mimetype.
    suffix_mimetypes = (
        ('.txt', 'text/plain'),
        ('.csv', 'text/csv'),
        ('.json', 'application/json'),
    )

    try:
        swift = swiftclient.Connection(
            key=password,
            authurl=authurl,
            auth_version='3',
            os_options={"project_id": projectId,"user_id": userId,"region_name": region})
        fetched = swift.get_object(container, filename)

        chosen_mimetype = 'application/binary'
        for suffix, candidate in suffix_mimetypes:
            if filename.endswith(suffix):
                chosen_mimetype = candidate
                break

        # get_object returns a pair; index 1 is the payload sent to the client.
        return Response(fetched[1], mimetype=chosen_mimetype, status=200)
    except ClientException as ce:
        return MakeJSONMsgResponse({"message":ce.msg,"containername":container,"filename":filename}, 404)
def _get_object_storage_client(self, username, password, tenant_name):
    """Return a swiftclient Connection for the given credentials.

    Before connecting, the admin identity client is used to grant the
    configured swift operator role to this object's identity-client user,
    so that the test user is allowed to operate swift.
    """
    auth_url = CONF.identity.uri

    # Admin credentials are needed to look up and assign roles.
    admin_client = self._get_identity_client(
        CONF.identity.admin_username,
        CONF.identity.admin_password,
        CONF.identity.admin_tenant_name)

    # Pick the first role whose name equals the configured operator role.
    all_roles = admin_client.roles.list()
    wanted_name = CONF.object_storage.operator_role
    matching = [candidate for candidate in all_roles
                if candidate.name == wanted_name]
    operator = matching[0]

    # A Conflict here is ignored; the original note attributes it to
    # neutron tests not having tenant isolation.
    try:
        admin_client.roles.add_user_role(self.identity_client.user_id,
                                         operator.id,
                                         self.identity_client.tenant_id)
    except keystoneclient.apiclient.exceptions.Conflict:
        pass

    return swiftclient.Connection(auth_url, username, password,
                                  tenant_name=tenant_name,
                                  auth_version='2')
def authenticate_swift_user(self, keystone, user, password, tenant):
    """Build a swift connection for a regular user.

    The auth URL is the 'publicURL' identity endpoint looked up in the
    keystone service catalog; the connection uses auth version 2.0.
    """
    debug_message = 'Authenticating swift user ({})...'.format(user)
    self.log.debug(debug_message)

    identity_endpoint = keystone.service_catalog.url_for(
        service_type='identity',
        endpoint_type='publicURL')

    connection_kwargs = {
        'authurl': identity_endpoint,
        'user': user,
        'key': password,
        'tenant_name': tenant,
        'auth_version': '2.0',
    }
    return swiftclient.Connection(**connection_kwargs)
def authenticate_swift_user(self, keystone, user, password, tenant):
    """Build a swift connection for a regular user.

    When the keystone client carries a session, the connection is created
    from that session. Otherwise it falls back to auth version 2.0 against
    the 'publicURL' identity endpoint from the service catalog.
    """
    debug_message = 'Authenticating swift user ({})...'.format(user)
    self.log.debug(debug_message)

    # Looked up unconditionally, even if the session path ends up being used.
    identity_endpoint = keystone.service_catalog.url_for(
        service_type='identity',
        interface='publicURL')

    if not keystone.session:
        return swiftclient.Connection(authurl=identity_endpoint,
                                      user=user,
                                      key=password,
                                      tenant_name=tenant,
                                      auth_version='2.0')

    return swiftclient.Connection(session=keystone.session)
def GetObjStoreInfo():
    """List the account's containers as a JSON HTTP response.

    Every entry of the Swift account listing gets an 'accessURL'
    (``thehost`` + "/" + the container name) and has its 'count' field
    renamed to 'objects'. The entries are returned as
    ``{"containers":[...]}`` with status 200.

    Returns a 404 JSON message response when the Swift client raises
    ClientException.
    """
    try:
        swift = swiftclient.Connection(
            key=password,
            authurl=authurl,
            auth_version='3',
            os_options={"project_id": projectId,"user_id": userId,"region_name": region})

        containers = []
        # get_account returns a pair; index 1 is the container listing.
        for entry in swift.get_account()[1]:
            entry['accessURL'] = thehost + "/" + entry['name']
            entry['objects'] = entry.pop('count')
            containers.append(entry)

        body = '{"containers":'+json.dumps(containers)+'}'
        return Response(body, mimetype='application/json', status=200)
    except ClientException as ce:
        # Report only the client message. This function has no container or
        # filename argument, and referencing those names here used to raise
        # UnboundLocalError/NameError instead of returning the 404.
        return MakeJSONMsgResponse({"message":ce.msg}, 404)
def GetObjStoContainerInfo(container):
    """List the objects of ``container`` as a JSON HTTP response.

    Each listing entry loses its 'hash' field and gains a 'downloadURL'
    built from ``thehost``, the container name and the object name. The
    entries are returned as ``{"objects":[...]}`` with status 200.

    Returns a 404 JSON message response when the Swift client raises
    ClientException.
    """
    try:
        swift = swiftclient.Connection(
            key=password,
            authurl=authurl,
            auth_version='3',
            os_options={"project_id": projectId,"user_id": userId,"region_name": region})

        # get_container returns a pair; index 1 is the object listing.
        listing = swift.get_container(container)[1]

        entries = []
        for entry in listing:
            entry.pop('hash')
            entry['downloadURL'] = "/".join((thehost, container, entry['name']))
            entries.append(entry)

        body = '{"objects":'+json.dumps(entries)+'}'
        return Response(body, mimetype='application/json', status=200)
    except ClientException as ce:
        return MakeJSONMsgResponse({"message":ce.msg+". Bad container name?","containername":container}, 404)
def get_swiftclient():
    """Return a swiftclient Connection built on the user's keystone session."""
    return swiftclient.Connection(session=_get_user_keystone_session())
def main(context, container, object):
    """Create a resized copy of a stored image and upload it.

    Downloads ``object`` from ``container`` to a local file, runs
    ``resize_image`` on it, and uploads the result under the same object
    name to the container named ``'%s_thumb' % container``. The local
    files are removed afterwards, including when a step fails.

    Args:
        context: mapping that provides the session under 'os_session'.
        container: name of the container holding the original object.
        object: name of the object to download and resize.
    """
    conn = swiftclient.Connection(
        session=context['os_session'],
        os_options={'region_name': 'RegionOne'},
    )

    new_container = '%s_thumb' % container
    image_path = '/%s' % object
    thumb_path = '/thumb_%s' % object

    try:
        # Download original photo. Binary mode: the object body is raw
        # bytes, which a text-mode file rejects on Python 3.
        _, obj_contents = conn.get_object(container, object)
        with open(image_path, 'wb') as local:
            local.write(obj_contents)
        print('Downloaded object %s from container %s' % (object, container))

        resize_image(image_path, thumb_path)
        print('Resized.')

        # Upload thumb photo, again in binary mode so the bytes are sent
        # unchanged.
        # NOTE(review): content_type is kept as in the original; 'text/plain'
        # looks wrong for an image - confirm the intended value.
        with open(thumb_path, 'rb') as new_local:
            conn.put_object(
                new_container,
                object,
                contents=new_local,
                content_type='text/plain'
            )
    finally:
        # Do not leave local files behind, whichever step failed.
        for leftover in (image_path, thumb_path):
            if os.path.exists(leftover):
                os.remove(leftover)

    print('Uploaded object %s to container %s' % (object, new_container))
def stat_object(context, container, object):
    """Return the result of a HEAD request for ``object`` in ``container``.

    The connection is built from the session stored under
    ``context['os_session']``, with region 'RegionOne'.
    """
    connection_options = {'region_name': 'RegionOne'}
    swift = swiftclient.Connection(session=context['os_session'],
                                   os_options=connection_options)
    return swift.head_object(container, object)