若您需要使用 Python 访问新睿云的对象存储服务,可以使用 boto3 库。boto3 是符合 AWS S3 API 标准的 Python SDK,因此可以直接用于兼容 S3 接口的对象存储服务。关于 boto3 的使用方法,请参考:https://boto3.readthedocs.io/en/latest/
pip的安装:https://pip.pypa.io/en/stable/installing/
使用 AWS SDK for Python(boto3)之前,您需要先安装 boto3 包,命令如下(请确保已经安装 pip):
pip install boto3
下面给出一个示例:创建 client,并封装常用的 bucket 与对象操作。更多信息请参考:https://boto3.readthedocs.io/en/latest/reference/services/s3.html
import boto3
import botocore
import sys
# Your access key / secret key pair (replace these values with your own).
ACCESS_KEY = "xxxxxxxxxxxxxxxxxxxxx"
SECRET_KEY = "xXxxxxxxxxxxxxxxxxxxxx"

# Endpoint URL passed to boto3 as the S3 endpoint.
S3_ENDPOINT_URL = "http://s3.cn-north-1.xrcloudcs.com"
class AwsClient:
    """Thin convenience wrapper around a boto3 S3 client.

    The wrapped low-level client is stored in ``self.provider``; every method
    below forwards to it.
    """

    def __init__(self, endpoint_url=None, access_key=None, secret_key=None):
        """Create the underlying S3 client.

        Args:
            endpoint_url: URL of the S3 endpoint to talk to.
            access_key: Access key id used to sign requests.
            secret_key: Secret access key used to sign requests.
        """
        s3 = boto3.resource('s3',
                            endpoint_url=endpoint_url,
                            aws_access_key_id=access_key,
                            aws_secret_access_key=secret_key)
        # Keep the low-level client that backs the resource object.
        self.provider = s3.meta.client

    def create_bucket(self, bucket_name):
        """Create a bucket named ``bucket_name``."""
        self.provider.create_bucket(Bucket=bucket_name)

    def create_bucket_if_not_existed(self, bucket_name):
        """Create ``bucket_name`` only when ``has_bucket`` reports it missing."""
        if not self.has_bucket(bucket_name):
            self.create_bucket(bucket_name)

    def delete_bucket(self, bucket_name):
        """Delete the bucket named ``bucket_name``."""
        self.provider.delete_bucket(Bucket=bucket_name)

    def delete_bucket_objects(self, bucket_name):
        """Delete every object whose key is returned by ``get_bucket``."""
        for key in self.get_bucket(bucket_name):
            self.delete_object(bucket_name, key)

    def list_buckets(self):
        """Return the names of all buckets reported by the service."""
        response = self.provider.list_buckets()
        return [bucket['Name'] for bucket in response['Buckets']]

    def has_bucket(self, bucket_name):
        """Report whether ``bucket_name`` exists.

        Returns:
            ``False`` when the HEAD request fails with error code "404",
            ``True`` otherwise. Any other ``ClientError`` is printed and the
            bucket is still treated as existing.
        """
        try:
            self.provider.head_bucket(Bucket=bucket_name)
        except botocore.exceptions.ClientError as e:
            # print() with a single argument works on Python 2 and Python 3.
            print(e)
            if e.response['Error']['Code'] == "404":
                return False
        return True

    def get_bucket(self, bucket_name):
        """Return the keys of all objects in ``bucket_name`` as a list.

        The listing is paginated so that buckets holding more objects than a
        single ``list_objects`` response carries are returned in full.
        """
        paginator = self.provider.get_paginator('list_objects')
        keys = []
        for page in paginator.paginate(Bucket=bucket_name):
            # "Contents" is absent from a page that carries no objects.
            keys.extend(obj["Key"] for obj in page.get("Contents", []))
        return keys

    def put_object(self, bucket_name, key, file_name):
        """Upload the local file ``file_name`` to ``bucket_name`` under ``key``.

        The file is opened in binary mode so its bytes are sent unmodified,
        and it is closed once the upload call returns or raises.
        """
        with open(file_name, "rb") as f:
            self.provider.put_object(Body=f,
                                     Bucket=bucket_name,
                                     Key=key)

    def delete_object(self, bucket_name, key):
        """Delete object ``key`` from ``bucket_name`` and return the response."""
        return self.provider.delete_object(Bucket=bucket_name, Key=key)

    def head_object(self, bucket_name, key):
        """Return the HEAD response for object ``key`` in ``bucket_name``."""
        return self.provider.head_object(Bucket=bucket_name, Key=key)

    def download_file(self, bucket_name, key, file_name):
        """Download object ``key`` from ``bucket_name`` into ``file_name``.

        Raises:
            botocore.exceptions.ClientError: propagated unchanged from the
                client, so callers can inspect
                ``e.response['Error']['Code']`` (for example "404").
        """
        # No local error translation: raising a plain string is itself a
        # TypeError, and it would hide the real error code from the caller.
        self.provider.download_file(bucket_name, key, file_name)

    def put_bucket_acl(self, bucket_name, acl):
        """Set the ACL ``acl`` on ``bucket_name``."""
        self.provider.put_bucket_acl(ACL=acl, Bucket=bucket_name)

    def delete_bucket_policy(self, bucket_name):
        """Delete the policy attached to ``bucket_name``."""
        self.provider.delete_bucket_policy(Bucket=bucket_name)

    def put_bucket_policy(self, bucket_name, bucket_policy):
        """Set ``bucket_policy`` as the policy of ``bucket_name``.

        Any ``ClientError`` raised by the client propagates to the caller.
        """
        self.provider.put_bucket_policy(Bucket=bucket_name, Policy=bucket_policy)

    def get_location(self, bucket_name):
        """Return the client's ``get_bucket_location`` response."""
        return self.provider.get_bucket_location(Bucket=bucket_name)

    def copy_object(self, source_bucket_name, source_key, dest_bucket_name, dest_key):
        """Copy ``source_key`` in ``source_bucket_name`` to ``dest_key`` in ``dest_bucket_name``."""
        self.provider.copy_object(Bucket=dest_bucket_name,
                                  CopySource={'Bucket': source_bucket_name, 'Key': source_key},
                                  Key=dest_key)
# Example usage: build a client from the settings above, create a bucket,
# upload a local file into it, then download the object back to a local path.
client = AwsClient(
    endpoint_url=S3_ENDPOINT_URL,
    access_key=ACCESS_KEY,
    secret_key=SECRET_KEY,
)
client.create_bucket(bucket_name="mybucket")
client.put_object(bucket_name="mybucket", key="cat.jpg", file_name="/tmp/cat.jpg")
client.download_file(bucket_name="mybucket", key="cat.jpg", file_name="/tmp/cat.jpg")