产品文档
对象存储OSS

SDK-Python

概述

若您需要使用Python来访问新睿云的对象存储服务,我们可以使用第三方的boto3接口。它可以看作是符合AWS S3 API标准的Python SDK。关于如何使用boto3,请参考https://boto3.readthedocs.io/en/latest/

安装pip和boto3

pip的安装:https://pip.pypa.io/en/stable/installing/

使用aws sdk-python前,您需要先安装boto3包,命令如下(请确定pip已经安装):

pip install boto3

示例

下面我们给出了一个创建client的例子,更多信息请参考:http://boto3.readthedocs.io/en/latest/reference/services/s3.html

import boto3
import botocore
import sys
 
# 您的ak/sk
ACCESS_KEY = "xxxxxxxxxxxxxxxxxxxxx"
SECRET_KEY= "xXxxxxxxxxxxxxxxxxxxxx"
 
# 访问点
S3_ENDPOINT_URL="http://s3.cn-north-1.xrcloudcs.com"  # s3 endpoint
 
class AwsClient:
 
    # 返回创建的访问云存储的资源provider
    def __init__(self, endpoint_url=None, access_key=None, secret_key=None):
        s3 = boto3.resource('s3',
            endpoint_url=endpoint_url,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key)
        self.provider = s3.meta.client
 
    # 创建一个bucket
    def create_bucket(self, bucket_name):
        self.provider.create_bucket(Bucket=bucket_name)
 
    # 如果bucket不存在,则创建它
    def create_bucket_if_not_existed(self, bucket_name):
        if not self.has_bucket(bucket_name):
            self.create_bucket(bucket_name)
    
    # 删除一个bucket  
    def delete_bucket(self, bucket_name):
        self.provider.delete_bucket(Bucket=bucket_name)
 
    # 删除bucket中的对象
    def delete_bucket_objects(self, bucket_name):
        res = self.get_bucket(bucket_name)
        for aBucket in res:
            self.delete_object(bucket_name, aBucket)
 
    # 返回所有的bucket
    def list_buckets(self):
        response = self.provider.list_buckets()
        # Get a list of all bucket names from the response
        buckets = [bucket['Name'] for bucket in response['Buckets']]
        return buckets
 
    # 判断用户是否已经创建了bucket
    def has_bucket(self, bucket_name):
        try:
            res = self.provider.head_bucket(Bucket=bucket_name)
        except botocore.exceptions.ClientError as e:
            print e
            if e.response['Error']['Code'] == "404":
                return False
        return True
 
    # 返回bucket中的所有对象
    def get_bucket(self, bucket_name):
        res = self.provider.list_objects(Bucket=bucket_name)
        rl = list()
        if "Contents" in res:
           for obj in res["Contents"]:
               rl.append(obj["Key"])
        return rl
            
    # 将本地文件上传到bucket
    def put_object(self, bucket_name, key, file_name):
        f = open(file_name, "r")
        self.provider.put_object(Body=f,
            Bucket=bucket_name,
            Key=key)
 
    # 删除一个对象
    def delete_object(self, bucket_name, key):
        return self.provider.delete_object(Bucket=bucket_name, Key=key)
 
    # Head一个对象
    def head_object(self, bucket_name, key):
        res = self.provider.head_object(Bucket=bucket_name,
                                       Key=key)
        return res
 
    # Get一个对象到本地文件
    def download_file(self, bucket_name, key, file_name):
        try:
            self.provider.download_file(bucket_name, key, file_name)
        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] == "404":
                raise "AccessDenied"
            else:
                raise e
 
    # 设置acl
    def put_bucket_acl(self, bucket_name, acl):
        res = self.provider.put_bucket_acl(ACL=acl, Bucket=bucket_name)
 
    def delete_bucket_policy(self, bucket_name):
        res = self.provider.delete_bucket_policy(Bucket=bucket_name)
 
    def put_bucket_policy(self, bucket_name, bucket_policy):
        # Set the new policy on the given bucket
        try:
            self.provider.put_bucket_policy(Bucket=bucket_name, Policy=bucket_policy)
        except botocore.exceptions.ClientError as e:
            raise e
 
    def get_location(self, bucket_name):
        return self.provider.get_bucket_location(Bucket=bucket_name)
 
    def copy_object(self, source_bucket_name, source_key, dest_bucket_name, dest_key):
        self.provider.copy_object(Bucket=dest_bucket_name,
              CopySource={'Bucket':source_bucket_name, 'Key': source_key},
              Key=dest_key)
 
client = AwsClient(S3_ENDPOINT_URL, ACCESS_KEY, SECRET_KEY)
client.create_bucket("mybucket")
client.put_object("mybucket", "cat.jpg", "/tmp/cat.jpg")
client.download_file("mybucket","cat.jpg","/tmp/cat.jpg")


如果您还有其他疑问,您还可以联系客服>
在线咨询
咨询热线 400-1515-720
投诉与建议
{{item.description}}

—您的烦恼我们已经收到—

我们会将处理结果发送至您的手机

请耐心等待