
Basic Usage of Boto3 for Accessing S3

Author: Internet

I. Boto3 overview

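  1. Boto3 provides two kinds of API: a low-level one (clients) and a high-level one (resources).
  2. Boto3 is the SDK for all of AWS, not just S3; it can also be used to access SQS, EC2, and other services.
  3. An example using boto3.resource("s3"):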
import boto3

s3 = boto3.resource("s3")

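# Create a bucket (outside us-east-1, also pass CreateBucketConfiguration
# with a LocationConstraint)
bucket = s3.create_bucket(Bucket="my-bucket")

# List all buckets; boto3 handles API pagination automatically.
for bucket in s3.buckets.all():
    print(bucket.name)

# Filter buckets; this also returns an iterable collection of buckets
s3.buckets.filter()

# Build a Bucket resource object
bucket = s3.Bucket("my-bucket")
bucket.name  # the bucket's name
bucket.delete()  # delete the bucket (it must be empty)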

# Delete several objects in one request
response = bucket.delete_objects(
    Delete={
        'Objects': [
            {
                'Key': 'string',
                'VersionId': 'string'  # optional; only for versioned objects
            },
        ],
        'Quiet': False  # True: the response lists only the failures
    },
)
# Shape of the response:
# {
#     'Deleted': [
#         {
#             'Key': 'string',
#             'VersionId': 'string',
#             'DeleteMarker': True|False,
#             'DeleteMarkerVersionId': 'string'
#         },
#     ],
#     'RequestCharged': 'requester',
#     'Errors': [
#         {
#             'Key': 'string',
#             'VersionId': 'string',
#             'Code': 'string',
#             'Message': 'string'
#         },
#     ]
# }

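# Download a file; full signature:
# download_file(Key, Filename, ExtraArgs=None, Callback=None, Config=None)
bucket.download_file('mykey', 'filename')

# Download into a file object; may use multiple threads automatically
with open('filename', 'wb') as data:
    bucket.download_fileobj('mykey', data)

# Upload data; Body may be bytes or a file-like object, ContentMD5 is optional
obj = bucket.put_object(Body=b"data", Key="xxx")

# This method may upload in multiple threads automatically
with open('filename', 'rb') as f:
    bucket.upload_fileobj(f, 'mykey')

# List all objects
bucket.objects.all()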

# Filter and return objects (every parameter is optional)
objects = bucket.objects.filter(
    Delimiter='string',
    EncodingType='url',
    Marker='string',
    MaxKeys=123,
    Prefix='string',
    RequestPayer='requester',
    ExpectedBucketOwner='string'
)

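# Build an Object resource (a local reference; no request is sent yet)
obj = bucket.Object("xxx")
# or
obj = s3.Object("my-bucket", "key")

obj.bucket_name
obj.key

# Delete the object
obj.delete()
# Download the object to a local path
obj.download_file(path)
# Download into a file object, possibly in multiple threads
with open('filename', 'wb') as data:
    obj.download_fileobj(data)
# Read the object's content
rsp = obj.get()
body = rsp["Body"].read()  # the content as bytes
# Body may be bytes or a file-like object; ContentMD5 is optional
obj.put(Body=b"xxx")

# Upload a file
obj.upload_file(filename)
# Upload from a file object, possibly in multiple threads
obj.upload_fileobj(fileobj)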
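
S3 accepts at most 1,000 keys per `delete_objects` request, so a longer key list has to be split into batches. The following is a minimal sketch of that batching. `delete_keys` and `batch_size` are hypothetical names that are not part of boto3, and `bucket` is assumed to behave like `s3.Bucket("my-bucket")`.

```python
def delete_keys(bucket, keys, batch_size=1000):
    """Delete keys in batches; return (deleted_keys, errors).

    `bucket` is assumed to behave like a boto3 S3 Bucket resource.
    S3 accepts at most 1000 keys per DeleteObjects request.
    """
    keys = list(keys)
    deleted, errors = [], []
    for start in range(0, len(keys), batch_size):
        batch = keys[start:start + batch_size]
        response = bucket.delete_objects(
            Delete={"Objects": [{"Key": k} for k in batch], "Quiet": False}
        )
        # Either list may be absent from the response, so default to empty.
        deleted.extend(d["Key"] for d in response.get("Deleted", []))
        errors.extend(response.get("Errors", []))
    return deleted, errors
```

A call might look like `deleted, errors = delete_keys(s3.Bucket("my-bucket"), ["a.txt", "b.txt"])`; checking `errors` afterwards matters because a request can succeed overall while individual keys fail.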

II. Low-level clients

  1. Creating clients
import boto3

# Create a low-level client with the service name
sqs = boto3.client('sqs')
# Create the resource
sqs_resource = boto3.resource('sqs')

# Get the client from the resource
sqs = sqs_resource.meta.client
  1. Service operations

Each service operation maps to a client method of the same name, and the operation's parameters are passed as keyword arguments:

# Make a call using the low-level client
response = sqs.send_message(QueueUrl='...', MessageBody='...')
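  1. Handling responses

Responses are returned as Python dictionaries. You can iterate over the response or otherwise process it to extract the data you need. A response may not always contain every key you expect, so read keys defensively: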

# List all your queues
response = sqs.list_queues()
for url in response.get('QueueUrls', []):
    print(url)
{
    "QueueUrls": [
        "http://url1",
        "http://url2",
        "http://url3"
    ]
}
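
Because `QueueUrls` can be missing (for example when there are no queues) and a long listing may span several pages, it can help to wrap the call in a generator. This is a minimal sketch, assuming `client` behaves like `boto3.client('sqs')`; `iter_queue_urls` and `page_size` are hypothetical names. SQS returns `NextToken` only when `MaxResults` is set in the request.

```python
def iter_queue_urls(client, page_size=100):
    """Yield every queue URL, following NextToken across pages."""
    kwargs = {"MaxResults": page_size}
    while True:
        response = client.list_queues(**kwargs)
        # 'QueueUrls' is omitted when a page contains no queues.
        yield from response.get("QueueUrls", [])
        token = response.get("NextToken")
        if not token:
            break
        kwargs["NextToken"] = token
```

For operations that support it, `client.get_paginator(...)` offers the same behaviour without a hand-written loop.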
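  1. Waiters

Waiters use a client's service operations to poll the status of an AWS resource and suspend execution until the resource reaches the state the waiter is polling for, or until a failure occurs while polling. The client's `waiter_names` attribute lists the name of every waiter that client has access to: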

import boto3

s3 = boto3.client('s3')
sqs = boto3.client('sqs')

# List all of the possible waiters for both clients
print("s3 waiters:")
print(s3.waiter_names)

print("sqs waiters:")
print(sqs.waiter_names)
s3 waiters:
['bucket_exists', 'bucket_not_exists', 'object_exists', 'object_not_exists']
sqs waiters:
[]
# Retrieve waiter instance that will wait till a specified
# S3 bucket exists
s3_bucket_exists_waiter = s3.get_waiter('bucket_exists')
# Begin waiting for the S3 bucket, mybucket, to exist
s3_bucket_exists_waiter.wait(Bucket='mybucket')
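
Conceptually, a waiter is a polling loop with a delay between attempts and a cap on the number of attempts. The sketch below illustrates that idea in plain Python. It is not botocore's implementation, and `wait_until`, `check`, `delay`, and `max_attempts` are hypothetical names chosen for illustration.

```python
import time


def wait_until(check, delay=5.0, max_attempts=20):
    """Call check() until it returns True; return the number of attempts used.

    Raises TimeoutError if check() never succeeds within max_attempts.
    """
    for attempt in range(1, max_attempts + 1):
        if check():
            return attempt
        if attempt < max_attempts:
            time.sleep(delay)  # pause before polling again
    raise TimeoutError(f"condition not met after {max_attempts} attempts")
```

The built-in waiters expose comparable settings through `WaiterConfig`, for example `s3_bucket_exists_waiter.wait(Bucket='mybucket', WaiterConfig={'Delay': 2, 'MaxAttempts': 10})`.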
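  1. Multithreading or multiprocessing with clients

Multiprocessing: although clients are thread-safe, they cannot be shared across processes because of their networking implementation. Sharing one may cause responses to arrive in the wrong order when calling services. Sharing a single client across threads, as in the example below, is fine: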

import boto3.session
from concurrent.futures import ThreadPoolExecutor

def do_s3_task(client, task_definition):
    # Put your thread-safe code here
    pass

def my_workflow():
    # Create a session and use it to make our client
    session = boto3.session.Session()
    s3_client = session.client('s3')

    # Define some work to be done, this can be anything
    my_tasks = [ ... ]

    # Dispatch work tasks with our s3_client
    with ThreadPoolExecutor(max_workers=8) as executor:
        futures = [executor.submit(do_s3_task, s3_client, task) for task in my_tasks]
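
Since a client should not be shared across processes, one possible pattern is to let each process lazily create its own. This is a minimal sketch, assuming a hypothetical helper `client_for_this_process` and a caller-supplied `factory` such as `lambda: boto3.client('s3')`; neither name comes from boto3.

```python
import os

_clients = {}  # process id -> client created inside that process


def client_for_this_process(factory):
    """Return a client owned by the current process, creating it on first use.

    After a fork the child has a new pid, so it builds its own client
    instead of reusing the one inherited from the parent. This sketch does
    not guard against two threads racing on the very first call.
    """
    pid = os.getpid()
    if pid not in _clients:
        _clients[pid] = factory()
    return _clients[pid]
```

Worker functions can then call `client_for_this_process(...)` at the start of each task; threads inside one process still share the same client, which matches the thread-safety guarantee described above.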

III. Resources

  1. Overview
# Get resources from the default session
sqs = boto3.resource('sqs')
s3 = boto3.resource('s3')
  1. Identifiers and attributes
# SQS Queue (url is an identifier)
queue = sqs.Queue(url='http://...')
print(queue.url)

# S3 Object (bucket_name and key are identifiers)
obj = s3.Object(bucket_name='boto3', key='test.py')
print(obj.bucket_name)
print(obj.key)

# Raises exception, missing identifier: key!
obj = s3.Object(bucket_name='boto3')

# Identifiers may also be passed as positional arguments.
# SQS Queue
queue = sqs.Queue('http://...')

# S3 Object
obj = s3.Object('boto3', 'test.py')

# Raises exception, missing key!
obj = s3.Object('boto3')

# Two resource instances are equal when their identifiers are equal.
>>> bucket1 = s3.Bucket('boto3')
>>> bucket2 = s3.Bucket('boto3')
>>> bucket3 = s3.Bucket('some-other-bucket')

>>> bucket1 == bucket2
True
>>> bucket1 == bucket3
False

# Attributes are lazy-loaded properties on the instance.
# SQS Message
message.body

# S3 Object
obj.last_modified
obj.e_tag
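  1. Actions

An action is a method that makes a call to the service. An action may return a low-level response, a new resource instance, or a list of new resource instances. Actions automatically pass the resource's identifiers as parameters, and you can supply additional parameters as keyword arguments.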

# SQS Queue
messages = queue.receive_messages()

# SQS Message
for message in messages:
    message.delete()

# S3 Object
obj = s3.Object(bucket_name='boto3', key='test.py')
response = obj.get()
data = response['Body'].read()
# SQS Service
queue = sqs.get_queue_by_name(QueueName='test')

# SQS Queue
queue.send_message(MessageBody='hello')
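  1. Sub-resources

A sub-resource is similar to a reference, but it is a related class rather than an instance. When instantiated, a sub-resource shares identifiers with its parent resource. This is a strict parent-child relationship; in relational terms, it can be thought of as one-to-many.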

# SQS
queue = sqs.Queue(url='...')
message = queue.Message(receipt_handle='...')
print(queue.url == message.queue_url)
print(message.receipt_handle)

# S3
obj = bucket.Object(key='new_file.txt')
print(obj.bucket_name)
print(obj.key)
  1. Waiters
# S3: Wait for a bucket to exist.
bucket.wait_until_exists()

# EC2: Wait for an instance to reach the running state.
instance.wait_until_running()
  1. Multithreading or multiprocessing with resources
import boto3
import boto3.session
import threading

class MyTask(threading.Thread):
    def run(self):
        # Here we create a new session per thread
        session = boto3.session.Session()

        # Next, we create a resource client using our thread's session object
        s3 = session.resource('s3')

        # Put your thread-safe code here

IV. Sessions

  1. Default session
import boto3

# Using the default session
sqs = boto3.client('sqs')
s3 = boto3.resource('s3')
  1. Custom session
import boto3
import boto3.session

# Create your own session
my_session = boto3.session.Session()

# Now we can create low-level clients or resource clients from our custom session
sqs = my_session.client('sqs')
s3 = my_session.resource('s3')
  1. Session configurations

Each session can be configured with its own credentials, AWS Region, or profile.
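
As a sketch of what such a configuration can look like, the helper below collects optional settings into keyword arguments for `boto3.session.Session`. The keys `profile_name`, `region_name`, `aws_access_key_id`, `aws_secret_access_key`, and `aws_session_token` are parameters that `Session` accepts; the helper `session_kwargs` and the sample values `"dev"` and `"us-west-2"` are hypothetical.

```python
def session_kwargs(profile=None, region=None, access_key=None,
                   secret_key=None, token=None):
    """Build keyword arguments for boto3.session.Session, skipping unset values."""
    candidates = {
        "profile_name": profile,
        "region_name": region,
        "aws_access_key_id": access_key,
        "aws_secret_access_key": secret_key,
        "aws_session_token": token,
    }
    return {name: value for name, value in candidates.items() if value is not None}


kwargs = session_kwargs(profile="dev", region="us-west-2")
print(kwargs)  # {'profile_name': 'dev', 'region_name': 'us-west-2'}
```

The result can then be passed on as `boto3.session.Session(**kwargs)`. A named profile has to exist in your AWS configuration files, otherwise creating the session fails.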

  1. Multithreading or multiprocessing with sessions
import boto3
import boto3.session
import threading

class MyTask(threading.Thread):
    def run(self):
        # Here we create a new session per thread
        session = boto3.session.Session()

        # Next, we create a resource client using our thread's session object
        s3 = session.resource('s3')

        # Put your thread-safe code here

Source: https://blog.csdn.net/weixin_44536215/article/details/121283038