对象存储python SDK开发
作者:互联网
在这里插入代码片
#!/usr/bin/python
# -*- coding:utf-8 -*-
# Copyright 2019 Huawei Technologies Co.,Ltd.
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use
# this file except in compliance with the License.  You may obtain a copy of the
# License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations under the License.
import time
import functools
import threading
import os
import re
import traceback
import math
import random
from obs import const, convertor, util, auth, locks, progress
from obs.cache import LocalCache
from obs.ilog import NoneLogClient, INFO, WARNING, ERROR, DEBUG, LogClient
from obs.transfer import _resumer_upload, _resumer_download
from obs.extension import _download_files
from obs.model import Logging
from obs.model import AppendObjectHeader
from obs.model import AppendObjectContent
from obs.model import Notification
from obs.model import ListMultipartUploadsRequest
from obs.model import PutObjectHeader
from obs.model import BaseModel
from obs.model import GetResult
from obs.model import ObjectStream
from obs.model import ResponseWrapper
from obs.model import CreateBucketHeader
from obs.model import ACL
from obs.model import Versions
from obs.model import GetObjectRequest
from obs.model import GetObjectHeader
from obs.model import CopyObjectHeader
from obs.model import SetObjectMetadataHeader
from obs.bucket import BucketClient
from obs import loadtoken
from inspect import isfunction
# Bind `urlparse` and `httplib` to the right standard-library modules for the
# running interpreter (Python 2 module names vs. their Python 3 successors).
if const.IS_PYTHON2:
    from urlparse import urlparse
    import httplib
else:
    import http.client as httplib
    from urllib.parse import urlparse
class _RedirectException(Exception):
    """Raised when a response asks the client to follow a redirect.

    Attributes:
        msg: human-readable description, returned by ``str()``.
        location: value of the redirect target to retry against.
        result: the parsed response that triggered the redirect, if any.
    """

    def __init__(self, msg, location, result=None):
        self.msg = msg
        self.location = location
        self.result = result

    def __str__(self):
        return self.msg
class _InternalException(Exception):
    """Carries a parsed response out of the parsing layer as an exception.

    Attributes:
        result: the response object that should be returned to the caller
            if the request is not retried.
    """

    def __init__(self, result):
        self.result = result
class _SecurityProvider(object):
    """Holds one set of credentials (AK, SK and optional security token)."""

    def __init__(self, access_key_id, secret_access_key, security_token=None):
        """Normalise the credentials to stripped strings and store them.

        ``security_token`` is kept as ``None`` when it is not supplied; the
        other two values are always passed through the string conversion.
        """
        self.access_key_id = util.to_string(util.safe_encode(access_key_id)).strip()
        self.secret_access_key = util.to_string(util.safe_encode(secret_access_key)).strip()
        if security_token is not None:
            security_token = util.to_string(util.safe_encode(security_token)).strip()
        self.security_token = security_token
def funcCache(func):
    """Decorate a client API method with logging and signature negotiation.

    When the first positional argument is an ``ObsClient`` the wrapper:

    * logs entry, any exception traceback, and the elapsed time;
    * validates the bucket name argument (``destBucketName`` for
      ``copyObject``, ``bucketName`` otherwise, nothing for ``listBuckets``)
      unless the client is in CNAME mode;
    * when signature negotiation is enabled, resolves the signature type via
      ``_getApiVersion`` (cached per bucket) and publishes it on
      ``thread_local.signature`` for the duration of the call.

    For any other call shape the wrapped function is invoked unchanged.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        ret = None
        # Guard on `args` so keyword-only invocations do not raise IndexError.
        obsClient = args[0] if args and isinstance(args[0], ObsClient) else None
        try:
            if obsClient:
                obsClient.log_client.log(INFO, 'enter %s ...' % func.__name__)
                key = ''
                if func.__name__ == 'copyObject':
                    if 'destBucketName' in kwargs:
                        key = kwargs['destBucketName']
                    elif len(args) >= 4:
                        key = args[3]

                    if not obsClient.is_cname:
                        obsClient._assert_not_null(key, 'destBucketName is empty')
                elif func.__name__ != 'listBuckets':
                    if len(args) > 1:
                        key = args[1]
                    elif 'bucketName' in kwargs:
                        key = kwargs['bucketName']

                    if not obsClient.is_cname:
                        obsClient._assert_not_null(key, 'bucketName is empty')

                if obsClient.is_signature_negotiation:
                    caches = obsClient.cache
                    if func.__name__ == 'listBuckets':
                        authType, resp = obsClient._getApiVersion()
                        if not authType:
                            return resp
                        obsClient.thread_local.signature = authType
                    else:
                        result_dic = caches.get(key)
                        if not result_dic:
                            # Re-check under the per-key lock so only one
                            # thread performs the negotiation request.
                            with locks.get_lock(hash(key) % locks.LOCK_COUNT):
                                result_dic = caches.get(key)
                                if not result_dic:
                                    if func.__name__ == 'createBucket':
                                        authType, resp = obsClient._getApiVersion()
                                    else:
                                        authType, resp = obsClient._getApiVersion(key)
                                    if not authType:
                                        return resp
                                    result_dic = {'signature': authType,
                                                  'expire': random.randint(900, 1200) + caches.nowTime()}
                                    # The bucket does not exist yet for createBucket,
                                    # so its negotiated result is not cached.
                                    if func.__name__ != 'createBucket':
                                        caches.set(key, result_dic)
                        obsClient.thread_local.signature = result_dic['signature']
            ret = func(*args, **kwargs)
        except Exception:
            if obsClient and obsClient.log_client:
                obsClient.log_client.log(INFO, traceback.format_exc())
            # Bare raise keeps the original traceback on both Python 2 and 3.
            raise
        finally:
            if obsClient:
                obsClient.log_client.log(INFO, '%s cost %s ms' % (func.__name__, int((time.time() - start) * 1000)))
                if obsClient.is_signature_negotiation and hasattr(obsClient.thread_local, 'signature'):
                    del obsClient.thread_local.signature
        return ret
    return wrapper
class HaWrapper(object):
    """Proxy that forwards attribute access to the adapter for the active signature.

    The active signature is ``thread_local.signature`` when that attribute is
    set, otherwise the default ``signature`` given at construction.
    """

    _OWN_ATTRS = ('obsHA', 'v2HA', 'v4HA', 'thread_local', 'signature')

    def __init__(self, thread_local, signature):
        self.obsHA = convertor.Adapter('obs')
        self.v2HA = convertor.Adapter('v2')
        self.v4HA = convertor.Adapter('v4')
        self.thread_local = thread_local
        self.signature = signature

    def __getattr__(self, item):
        # __getattr__ only runs when normal lookup failed.  If one of our own
        # fields is missing (instance created without __init__, e.g. by copy
        # or pickle) fail fast instead of recursing through self.<field>.
        if item in HaWrapper._OWN_ATTRS:
            raise AttributeError(item)
        signature = self.thread_local.signature if hasattr(self.thread_local, 'signature') else self.signature
        if signature == 'obs':
            ha = self.obsHA
        elif signature == 'v4':
            ha = self.v4HA
        else:
            ha = self.v2HA
        return getattr(ha, item)
class ConvertWrapper(object):
    """Proxy that forwards attribute access to the convertor for the active signature.

    The active signature is ``thread_local.signature`` when that attribute is
    set, otherwise the default ``signature`` given at construction.
    """

    _OWN_ATTRS = ('obsCover', 'v2Cover', 'v4Cover', 'thread_local', 'signature')

    def __init__(self, thread_local, signature):
        self.obsCover = convertor.Convertor('obs', convertor.Adapter('obs'))
        self.v2Cover = convertor.Convertor('v2', convertor.Adapter('v2'))
        self.v4Cover = convertor.Convertor('v4', convertor.Adapter('v4'))
        self.thread_local = thread_local
        self.signature = signature

    def __getattr__(self, item):
        # __getattr__ only runs when normal lookup failed.  If one of our own
        # fields is missing (instance created without __init__, e.g. by copy
        # or pickle) fail fast instead of recursing through self.<field>.
        if item in ConvertWrapper._OWN_ATTRS:
            raise AttributeError(item)
        signature = self.thread_local.signature if hasattr(self.thread_local, 'signature') else self.signature
        if signature == 'obs':
            convert = self.obsCover
        elif signature == 'v4':
            convert = self.v4Cover
        else:
            convert = self.v2Cover
        return getattr(convert, item)
class _BasicClient(object):
    """Transport layer shared by the OBS client: configuration, connection
    handling, request signing, retry/redirect logic and response parsing."""

    def __init__(self, access_key_id='', secret_access_key='', is_secure=True, server=None,
                 signature='obs', region='region', path_style=False, ssl_verify=False,
                 port=None, max_retry_count=3, timeout=60, chunk_size=65536,
                 long_conn_mode=False, proxy_host=None, proxy_port=None,
                 proxy_username=None, proxy_password=None, security_token=None,
                 custom_ciphers=None, use_http2=False, is_signature_negotiation=True, is_cname=False,
                 max_redirect_count=10, security_providers=None, security_provider_policy=None):
        """Parse the endpoint and store every connection/signing option.

        ``server`` may be a bare host, ``host:port`` or a full URL; an
        ``http``/``https`` scheme in it overrides ``is_secure`` and a port in
        it overrides ``port``.

        Raises:
            Exception: if no host name can be extracted from ``server``.
        """
        self.securityProvider = _SecurityProvider(access_key_id, secret_access_key, security_token)
        server = server if server is not None else ''
        server = util.to_string(util.safe_encode(server))

        _server = urlparse(server)

        # Without a scheme urlparse puts the host into `path`, not `netloc`.
        hostname = _server.netloc if util.is_valid(_server.netloc) else _server.path

        if not util.is_valid(hostname):
            raise Exception('server is not set correctly')

        if util.is_valid(_server.scheme):
            if _server.scheme == 'https':
                is_secure = True
            elif _server.scheme == 'http':
                is_secure = False

        host_port = hostname.split(':')
        if len(host_port) == 2:
            port = util.to_int(host_port[1])

        self.security_provider_policy = security_provider_policy
        if security_providers is None:
            self.security_providers = [loadtoken.ENV, loadtoken.ECS]
        else:
            self.security_providers = security_providers
        try:
            if security_providers == []:
                raise ValueError('no available security_providers')
            for method in self.security_providers:
                getattr(method, '__name__')
                if not isfunction(method.search):
                    # Format instead of concatenating: `method` is not a string.
                    raise AttributeError('%s has no function called search' % (method,))
        except Exception:
            # Invalid providers disable the provider policy instead of failing construction.
            self.security_provider_policy = None
            print(traceback.format_exc())

        self.is_secure = is_secure
        self.server = host_port[0]

        # Virtual-hosted addressing is not possible against a raw IP address.
        path_style = True if util.is_ipaddress(self.server) else path_style

        self.signature = util.to_string(util.safe_encode(signature))
        self.region = region
        self.path_style = path_style
        self.ssl_verify = ssl_verify
        self.calling_format = util.RequestFormat.get_pathformat() if self.path_style else util.RequestFormat.get_subdomainformat()
        self.port = port if port is not None else const.DEFAULT_SECURE_PORT if is_secure else const.DEFAULT_INSECURE_PORT
        self.max_retry_count = max_retry_count
        self.timeout = timeout
        self.chunk_size = chunk_size
        self.log_client = NoneLogClient()
        self.use_http2 = use_http2
        self.is_signature_negotiation = is_signature_negotiation
        self.is_cname = is_cname
        self.max_redirect_count = max_redirect_count

        if self.path_style or self.is_cname:
            # Negotiation is switched off in these modes and the 'obs'
            # signature falls back to 'v2'.
            self.is_signature_negotiation = False
            if self.signature == 'obs':
                self.signature = 'v2'

        self.context = None
        if self.is_secure:
            if self.use_http2:
                from obs import http2
                self.context = http2._get_ssl_context(self.ssl_verify)
            else:
                self._init_ssl_context(custom_ciphers)

        self.long_conn_mode = long_conn_mode

        self.connHolder = None
        if self.long_conn_mode:
            self._init_connHolder()

        self.proxy_host = proxy_host
        self.proxy_port = proxy_port
        self.proxy_username = proxy_username
        self.proxy_password = proxy_password
        # Used to strip the default XML namespace before parsing responses.
        self.pattern = re.compile('xmlns="http.*?"')
        self.thread_local = threading.local()
        self.thread_local.signature = self.signature
        if self.is_signature_negotiation:
            self.cache = LocalCache(maxlen=100)
            self.ha = HaWrapper(self.thread_local, self.signature)
            self.convertor = ConvertWrapper(self.thread_local, self.signature)
        else:
            self.ha = convertor.Adapter(self.signature)
            self.convertor = convertor.Convertor(self.signature, self.ha)

    def _get_token(self):
        """Return the credentials to sign with.

        Explicit AK/SK always win.  Otherwise, when a provider policy is set,
        credentials are looked up through ``get_token``; any failure there is
        logged and the explicit (possibly empty) credentials are returned.
        """
        from obs.searchmethod import get_token
        try:
            if self.security_provider_policy is not None:
                if self.securityProvider.access_key_id != '' and self.securityProvider.secret_access_key != '':
                    return self.securityProvider
                value_dict = get_token(self.security_providers, name=self.security_provider_policy)
                securityProvider = _SecurityProvider(value_dict.get('accessKey'), value_dict.get('secretKey'), value_dict.get('securityToken'))
                return securityProvider
        except Exception:
            self.log_client.log(WARNING, traceback.format_exc())
        return self.securityProvider

    def _init_connHolder(self):
        """Create the queue and lock used to keep connections for reuse."""
        if const.IS_PYTHON2:
            from Queue import Queue
        else:
            from queue import Queue
        self.connHolder = {'connSet': Queue(), 'lock': threading.Lock()}

    def _init_ssl_context(self, custom_ciphers):
        """Build ``self.context`` for HTTPS connections (best effort).

        SSLv2/SSLv3 are disabled.  A truthy ``ssl_verify`` enables certificate
        verification and is also treated as a CA file path when it names an
        existing file.  Any failure is printed and leaves ``self.context``
        unchanged.
        """
        try:
            import ssl
            if hasattr(ssl, 'SSLContext'):
                context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
                context.options |= ssl.OP_NO_SSLv2
                context.options |= ssl.OP_NO_SSLv3
                if custom_ciphers is not None:
                    custom_ciphers = util.to_string(custom_ciphers).strip()
                    if custom_ciphers != '' and hasattr(context, 'set_ciphers') and callable(context.set_ciphers):
                        context.set_ciphers(custom_ciphers)
                if self.ssl_verify:
                    import _ssl
                    cafile = util.to_string(self.ssl_verify)
                    context.options |= getattr(_ssl, "OP_NO_COMPRESSION", 0)
                    context.verify_mode = ssl.CERT_REQUIRED
                    if os.path.isfile(cafile):
                        context.load_verify_locations(cafile)
                else:
                    # NOTE(review): certificate verification is off unless
                    # ssl_verify is supplied; hostname checking is always off.
                    context.verify_mode = ssl.CERT_NONE
                if hasattr(context, 'check_hostname'):
                    context.check_hostname = False
                self.context = context
        except Exception:
            print(traceback.format_exc())

    def close(self):
        """Close every pooled connection and then the log client."""
        if self.connHolder is not None:
            with self.connHolder['lock']:
                while not self.connHolder['connSet'].empty():
                    conn = self.connHolder['connSet'].get()
                    if conn and hasattr(conn, 'close'):
                        try:
                            conn.close()
                        except Exception as ex:
                            self.log_client.log(WARNING, ex)
            self.connHolder = None
        if self.log_client:
            self.log_client.close()

    def refresh(self, access_key_id, secret_access_key, security_token=None):
        """Replace the explicit credentials used for signing."""
        self.securityProvider = _SecurityProvider(access_key_id, secret_access_key, security_token)

    def initLog(self, log_config=None, log_name='OBS_LOGGER'):
        """Install a real log client when ``log_config`` is given.

        On Windows the logger name is forced to ``'OBS_LOGGER'``.  The SDK
        version, endpoint and access mode are written once after setup.
        """
        if log_config:
            self.log_client = LogClient(log_config, 'OBS_LOGGER' if const.IS_WINDOWS else log_name, log_name)
            msg = ['[OBS SDK Version=' + const.OBS_SDK_VERSION]
            msg.append('Endpoint=' + ('%s://%s:%d' % ('https' if self.is_secure else 'http', self.server, self.port)))
            msg.append('Access Mode=' + ('Path' if self.path_style else 'Virtual Hosting') + ']')
            self.log_client.log(WARNING, '];['.join(msg))

    def _assert_not_null(self, param, msg):
        """Raise ``Exception(msg)`` when ``param`` is None or blank."""
        param = util.safe_encode(param)
        if param is None or util.to_string(param).strip() == '':
            raise Exception(msg)

    def _generate_object_url(self, ret, bucketName, objectKey):
        """Attach ``objectUrl`` to a successful result that has a body."""
        if ret and ret.status < 300 and ret.body:
            ret.body.objectUrl = self.calling_format.get_full_url(self.is_secure, self.server, self.port, bucketName, objectKey, {})

    def _make_options_request(self, bucketName, objectKey=None, pathArgs=None, headers=None, methodName=None):
        """Send an OPTIONS request through the retry layer."""
        return self._make_request_with_retry(const.HTTP_METHOD_OPTIONS, bucketName, objectKey, pathArgs, headers, methodName=methodName)

    def _make_head_request(self, bucketName, objectKey=None, pathArgs=None, headers=None, methodName=None, skipAuthentication=False):
        """Send a HEAD request through the retry layer."""
        return self._make_request_with_retry(const.HTTP_METHOD_HEAD, bucketName, objectKey, pathArgs, headers, methodName=methodName, skipAuthentication=skipAuthentication)

    def _make_get_request(self, bucketName='', objectKey=None, pathArgs=None, headers=None, methodName=None, parseMethod=None, readable=False):
        """Send a GET request through the retry layer."""
        return self._make_request_with_retry(const.HTTP_METHOD_GET, bucketName, objectKey, pathArgs, headers, methodName=methodName, parseMethod=parseMethod, readable=readable)

    def _make_delete_request(self, bucketName, objectKey=None, pathArgs=None, headers=None, entity=None, methodName=None):
        """Send a DELETE request through the retry layer."""
        return self._make_request_with_retry(const.HTTP_METHOD_DELETE, bucketName, objectKey, pathArgs, headers, entity, methodName=methodName)

    def _make_post_request(self, bucketName, objectKey=None, pathArgs=None, headers=None, entity=None, chunkedMode=False, methodName=None, readable=False):
        """Send a POST request through the retry layer."""
        return self._make_request_with_retry(const.HTTP_METHOD_POST, bucketName, objectKey, pathArgs, headers, entity, chunkedMode, methodName=methodName, readable=readable)

    def _make_put_request(self, bucketName, objectKey=None, pathArgs=None, headers=None, entity=None, chunkedMode=False, methodName=None, readable=False):
        """Send a PUT request through the retry layer."""
        return self._make_request_with_retry(const.HTTP_METHOD_PUT, bucketName, objectKey, pathArgs, headers, entity, chunkedMode, methodName=methodName, readable=readable)

    def _make_error_result(self, e, ret):
        """Log a failed request; return ``ret`` if available, else re-raise ``e``."""
        self.log_client.log(ERROR, 'request error, %s' % e)
        self.log_client.log(ERROR, traceback.format_exc())
        if ret is not None:
            return ret
        raise e

    def _make_request_with_retry(self, methodType, bucketName, objectKey=None, pathArgs=None, headers=None,
                                 entity=None, chunkedMode=False, methodName=None, readable=False, parseMethod=None, redirectLocation=None, skipAuthentication=False):
        """Send a request, retrying on failure and following redirects.

        Retries use exponential back-off (``2**attempt * 0.05`` seconds) and
        stop after ``max_retry_count`` attempts or immediately when
        ``readable`` is true.  Redirects do not consume a retry but are capped
        by ``max_redirect_count``.  When giving up, the last parsed response
        is returned if there is one; otherwise the exception is raised.
        """
        flag = 0
        redirect_count = 0
        conn = None
        _redirectLocation = redirectLocation
        redirectFlag = False
        while True:
            try:
                conn = self._make_request_internal(methodType, bucketName, objectKey, pathArgs, headers, entity, chunkedMode, _redirectLocation, skipAuthentication=skipAuthentication, redirectFlag=redirectFlag)
                return self._parse_xml(conn, methodName, readable) if not parseMethod else parseMethod(conn)
            except Exception as e:
                ret = None
                if isinstance(e, _InternalException):
                    ret = e.result
                else:
                    util.close_conn(conn, self.log_client)
                    if isinstance(e, _RedirectException):
                        redirect_count += 1
                        _redirectLocation = e.location
                        # Cancel out the increment below: redirects are not retries.
                        flag -= 1
                        ret = e.result
                        redirectFlag = methodType == const.HTTP_METHOD_GET and e.result.status == 302
                    if redirect_count >= self.max_redirect_count:
                        self.log_client.log(ERROR, 'request redirect count [%d] greater than max redirect count [%d]' % (
                            redirect_count, self.max_redirect_count))
                        return self._make_error_result(e, ret)
                if flag >= self.max_retry_count or readable:
                    return self._make_error_result(e, ret)
                flag += 1
                time.sleep(math.pow(2, flag) * 0.05)
                self.log_client.log(WARNING, 'request again, time:%d' % int(flag))

    def _make_request_internal(self, method, bucketName='', objectKey=None, pathArgs=None, headers=None, entity=None,
                               chunkedMode=False, redirectLocation=None, skipAuthentication=False, redirectFlag=False):
        """Resolve target host/path, build and sign headers, and send one request.

        Returns the connection on which the request was sent; the response has
        not been read yet.
        """
        objectKey = util.safe_encode(objectKey)
        if objectKey is None:
            objectKey = ''
        port = None
        scheme = None
        path = None
        if redirectLocation:
            redirectLocation = urlparse(redirectLocation)
            connect_server = redirectLocation.hostname
            scheme = redirectLocation.scheme
            port = redirectLocation.port if redirectLocation.port is not None else const.DEFAULT_INSECURE_PORT if scheme.lower() == 'http' else const.DEFAULT_SECURE_PORT
            redirect = True
            _path = redirectLocation.path
            query = redirectLocation.query
            path = _path + '?' + query if query else _path
            # A redirect that carries its own path is sent unsigned; one with
            # no path is re-signed unless it came from a GET 302.
            skipAuthentication = True
            if not redirectFlag and not path:
                skipAuthentication = False
        else:
            connect_server = self.server if self.is_cname else self.calling_format.get_server(self.server, bucketName)
            redirect = False
            port = self.port

        if self.is_cname:
            bucketName = ''

        if not path:
            path = self.calling_format.get_url(bucketName, objectKey, pathArgs)

        headers = self._rename_request_headers(headers, method)

        if entity is not None and not callable(entity):
            entity = util.safe_encode(entity)
            if not isinstance(entity, str) and not isinstance(entity, bytes):
                entity = util.to_string(entity)
            if not const.IS_PYTHON2:
                entity = entity.encode('UTF-8') if not isinstance(entity, bytes) else entity
            headers[const.CONTENT_LENGTH_HEADER] = util.to_string(len(entity))

        headers[const.HOST_HEADER] = '%s:%s' % (connect_server, port) if port != 443 and port != 80 else connect_server
        header_config = self._add_auth_headers(headers, method, bucketName, objectKey, pathArgs, skipAuthentication)

        # Log a copy with the host and authorization values masked.
        header_log = header_config.copy()
        header_log[const.HOST_HEADER] = '******'
        header_log[const.AUTHORIZATION_HEADER] = '******'
        self.log_client.log(DEBUG, 'method:%s, path:%s, header:%s', method, path, header_log)
        conn = self._send_request(connect_server, method, path, header_config, entity, port, scheme, redirect, chunkedMode)
        return conn

    def _add_auth_headers(self, headers, method, bucketName, objectKey, pathArgs, skipAuthentication=False):
        """Add the date header and, unless skipped, the authorization header.

        No authorization header is added when AK or SK is not valid
        (anonymous access).  Mutates and returns ``headers``.
        """
        from datetime import datetime

        now_date = None
        if self.ha.date_header() not in headers:
            now_date = datetime.utcnow()
            headers[const.DATE_HEADER] = now_date.strftime(const.GMT_DATE_FORMAT)

        if skipAuthentication:
            return headers

        securityProvider = self._get_token()
        ak = securityProvider.access_key_id
        sk = securityProvider.secret_access_key

        if util.is_valid(ak) and util.is_valid(sk):
            if securityProvider.security_token is not None:
                headers[self.ha.security_token_header()] = securityProvider.security_token

            cur_signature = self.thread_local.signature.lower() if self.is_signature_negotiation else self.signature.lower()
            if cur_signature == 'v4':
                if now_date is None:
                    # Caller supplied the date header; reuse it for the v4 scope.
                    now_date = datetime.strptime(headers[self.ha.date_header()], const.LONG_DATE_FORMAT)
                shortDate = now_date.strftime(const.SHORT_DATE_FORMAT)
                longDate = now_date.strftime(const.LONG_DATE_FORMAT)
                v4Auth = auth.V4Authentication(ak, sk, str(self.region) if self.region is not None else '', shortDate, longDate, self.path_style, self.ha)
                ret = v4Auth.doAuth(method, bucketName, objectKey, pathArgs, headers)
                self.log_client.log(DEBUG, '%s: %s' % (const.CANONICAL_REQUEST, ret[const.CANONICAL_REQUEST]))
            else:
                obsAuth = auth.Authentication(ak, sk, self.path_style, self.ha, self.server, self.is_cname)
                ret = obsAuth.doAuth(method, bucketName, objectKey, pathArgs, headers)
                self.log_client.log(DEBUG, '%s: %s' % (const.CANONICAL_STRING, ret[const.CANONICAL_STRING]))
            headers[const.AUTHORIZATION_HEADER] = ret[const.AUTHORIZATION_HEADER]
        return headers

    def _rename_request_headers(self, headers, method):
        """Return a new dict of request headers ready to be sent.

        Entries with a None key or value are dropped.  Headers that are
        neither in the allowed list nor already service-prefixed are treated
        as user metadata: kept (with the metadata prefix added) only for
        PUT/POST and dropped otherwise.  Keys and values are URL-encoded, with
        the copy-source header encoded so that a trailing ``?versionId=``
        query is preserved.
        """
        new_headers = {}
        if isinstance(headers, dict):
            for k, v in headers.items():
                if k is None or v is None:
                    continue
                k = util.to_string(k).strip()
                lower_k = k.lower()
                if lower_k not in const.ALLOWED_REQUEST_HTTP_HEADER_METADATA_NAMES and not lower_k.startswith(const.V2_HEADER_PREFIX) and not lower_k.startswith(const.OBS_HEADER_PREFIX):
                    if method not in (const.HTTP_METHOD_PUT, const.HTTP_METHOD_POST):
                        continue
                    k = self.ha._get_meta_header_prefix() + k

                if k.lower().startswith(self.ha._get_meta_header_prefix()):
                    k = util.encode_item(k, ' ;/?:@&=+$,')

                if k.lower() == self.ha._get_header_prefix() + 'copy-source':
                    index = v.rfind('?versionId=')
                    if index > 0:
                        new_headers[k] = util.encode_item(v[0:index], '/') + v[index:]
                    else:
                        new_headers[k] = util.encode_item(v, '/')
                else:
                    new_headers[k] = v if (isinstance(v, list)) else util.encode_item(v, ' ;/?:@&=+$,\'*')
        return new_headers

    def _get_server_connection(self, server, port=None, scheme=None, redirect=False, proxy_host=None, proxy_port=None):
        """Return a connection to ``server``, reusing a pooled one when possible.

        Pooled connections are never used for redirects, and connections
        created for a redirect are flagged with ``_clear = True``.
        """
        is_secure = self.is_secure if scheme is None else True if scheme == 'https' else False

        if self.connHolder is not None and not self.connHolder['connSet'].empty() and not redirect:
            try:
                return self.connHolder['connSet'].get_nowait()
            except Exception:
                # Another thread may have drained the queue since empty() was checked.
                self.log_client.log(DEBUG, 'can not get conn, will create a new one')

        if self.use_http2:
            from obs import http2
            conn = http2._get_server_connection(server, port, self.context, is_secure, proxy_host, proxy_port)
        else:
            if proxy_host is not None and proxy_port is not None:
                server = proxy_host
                port = proxy_port

            if is_secure:
                if const.IS_PYTHON2:
                    try:
                        conn = httplib.HTTPSConnection(server, port=port, timeout=self.timeout, context=self.context)
                    except Exception:
                        # Older Python 2 releases do not accept `context`.
                        conn = httplib.HTTPSConnection(server, port=port, timeout=self.timeout)
                else:
                    try:
                        conn = httplib.HTTPSConnection(server, port=port, timeout=self.timeout, context=self.context, check_hostname=False)
                    except TypeError:
                        # Python 3.12 removed the `check_hostname` keyword;
                        # hostname checking is then governed by the context.
                        conn = httplib.HTTPSConnection(server, port=port, timeout=self.timeout, context=self.context)
            else:
                conn = httplib.HTTPConnection(server, port=port, timeout=self.timeout)

        if redirect:
            conn._clear = True
        return conn

    def _send_request(self, server, method, path, header, entity=None, port=None, scheme=None, redirect=False, chunkedMode=False):
        """Open (or reuse) a connection and write the request to it.

        ``entity`` may be raw data or a callable that writes the body itself;
        the callable receives ``util.conn_delegate(conn)``.  Returns the
        connection without reading the response.
        """
        conn = None
        header = header or {}
        connection_key = const.CONNECTION_HEADER
        if self.proxy_host is not None and self.proxy_port is not None:
            conn = self._get_server_connection(server, port, scheme, redirect, util.to_string(self.proxy_host), util.to_int(self.proxy_port))
            _header = {}
            if self.proxy_username is not None and self.proxy_password is not None:
                _header[const.PROXY_AUTHORIZATION_HEADER] = 'Basic %s' % (util.base64_encode(util.to_string(self.proxy_username) + ':' + util.to_string(self.proxy_password)))
            if not self.use_http2:
                conn.set_tunnel(server, port, _header)
            else:
                # Only forward proxy credentials when some were configured;
                # an anonymous proxy must not raise KeyError here.
                if const.PROXY_AUTHORIZATION_HEADER in _header:
                    header[const.PROXY_AUTHORIZATION_HEADER] = _header[const.PROXY_AUTHORIZATION_HEADER]
                connection_key = const.PROXY_CONNECTION_HEADER
        else:
            conn = self._get_server_connection(server, port, scheme, redirect)

        if self.long_conn_mode:
            header[connection_key] = const.CONNECTION_KEEP_ALIVE_VALUE
        else:
            header[const.CONNECTION_HEADER] = const.CONNECTION_CLOSE_VALUE

        header[const.USER_AGENT_HEADER] = 'obs-sdk-python/' + const.OBS_SDK_VERSION

        if method == const.HTTP_METHOD_OPTIONS and not self.use_http2:
            # Headers are written one by one so list values become repeated headers.
            conn.putrequest(method, path, skip_host=1)
            for k, v in header.items():
                if isinstance(v, list):
                    for item in v:
                        conn.putheader(k, item)
                else:
                    conn.putheader(k, v)
            conn.endheaders()
        else:
            if chunkedMode:
                header[const.TRANSFER_ENCODING_HEADER] = const.TRANSFER_ENCODING_VALUE

            if self.use_http2:
                conn.request(method, path, body=entity, headers=header)
            else:
                if chunkedMode:
                    conn.putrequest(method, path, skip_host=1)
                    for k, v in header.items():
                        conn.putheader(k, v)
                    conn.endheaders()
                else:
                    conn.request(method, path, headers=header)

                if entity is not None:
                    if callable(entity):
                        entity(util.conn_delegate(conn))
                    else:
                        conn.send(entity)
                        self.log_client.log(DEBUG, 'request content:%s', util.to_string(entity))
        return conn

    def _getNoneResult(self, message='None Result'):
        """Always raise ``Exception(message)``."""
        raise Exception(message)

    def _parse_xml(self, conn, methodName=None, readable=False):
        """Read the response from ``conn`` and parse it into a result object.

        The response/connection are always handed to ``util.do_close``.  On an
        unexpected error the connection is flagged with ``_clear = True``
        before the error propagates.
        """
        if not conn:
            return self._getNoneResult('connection is none')
        result = None
        try:
            result = conn.getresponse(True) if const.IS_PYTHON2 else conn.getresponse()
            if not result:
                return self._getNoneResult('response is none')
            return self._parse_xml_internal(result, methodName, readable=readable)
        except (_RedirectException, _InternalException):
            # Control-flow exceptions for the retry layer: pass through untouched.
            raise
        except Exception:
            conn._clear = True
            self.log_client.log(ERROR, traceback.format_exc())
            raise
        finally:
            util.do_close(result, conn, self.connHolder, self.log_client)

    def _parse_content_with_notifier(self, conn, objectKey, chuckSize=65536, downloadPath=None, notifier=None):
        """Parse a download response using a caller-supplied progress notifier.

        With no ``downloadPath`` the body is returned as a live stream and the
        connection is left open for the caller; otherwise the body is written
        to ``downloadPath``.  Non-2xx responses are parsed as errors.
        """
        if not conn:
            return self._getNoneResult('connection is none')
        result = None
        close_conn_flag = True
        try:
            result = conn.getresponse(True) if const.IS_PYTHON2 else conn.getresponse()
            if not result:
                return self._getNoneResult('response is none')
            if not util.to_int(result.status) < 300:
                return self._parse_xml_internal(result)

            headers = {}
            for k, v in result.getheaders():
                headers[k.lower()] = v

            content_length = headers.get('content-length')
            content_length = util.to_long(content_length) if content_length is not None else None
            resultWrapper = ResponseWrapper(conn, result, self.connHolder, content_length, notifier)
            if downloadPath is None:
                self.log_client.log(DEBUG, 'DownloadPath is none, return conn directly')
                close_conn_flag = False
                body = ObjectStream(response=resultWrapper)
            else:
                objectKey = util.safe_encode(objectKey)
                downloadPath = util.safe_encode(downloadPath)
                file_path, _ = self._get_data(resultWrapper, downloadPath, chuckSize)
                body = ObjectStream(url=util.to_string(file_path))
                self.log_client.log(DEBUG, 'DownloadPath is ' + util.to_string(file_path))

            status = util.to_int(result.status)
            reason = result.reason
            self.convertor.parseGetObject(headers, body)
            header = self._rename_response_headers(headers)
            requestId = dict(header).get('request-id')
            return GetResult(status=status, reason=reason, header=header, body=body, requestId=requestId)
        except _RedirectException:
            raise
        except Exception:
            # The stream never reaches the caller on failure, so release it here.
            close_conn_flag = True
            self.log_client.log(ERROR, traceback.format_exc())
            raise
        finally:
            if close_conn_flag:
                util.do_close(result, conn, self.connHolder, self.log_client)

    def _parse_content(self, conn, objectKey, downloadPath=None, chuckSize=65536, loadStreamInMemory=False, progressCallback=None):
        """Parse a download response into memory, a file, or a live stream.

        ``loadStreamInMemory`` takes precedence and buffers the whole body.
        Otherwise, with no ``downloadPath`` the body is returned as a live
        stream and the connection is left open for the caller; with a
        ``downloadPath`` the body is written to that file.  Non-2xx responses
        are parsed as errors.
        """
        if not conn:
            return self._getNoneResult('connection is none')
        close_conn_flag = True
        result = None
        resultWrapper = None
        try:
            result = conn.getresponse(True) if const.IS_PYTHON2 else conn.getresponse()
            if not result:
                return self._getNoneResult('response is none')

            if not util.to_int(result.status) < 300:
                return self._parse_xml_internal(result)

            headers = {}
            for k, v in result.getheaders():
                headers[k.lower()] = v

            content_length = headers.get('content-length')
            content_length = util.to_long(content_length) if content_length is not None else None
            # Progress is only reported when the total size is known and positive.
            notifier = progress.ProgressNotifier(progressCallback, content_length) if content_length is not None and content_length > 0 and progressCallback is not None else progress.NONE_NOTIFIER
            notifier.start()
            resultWrapper = ResponseWrapper(conn, result, self.connHolder, content_length, notifier)
            if loadStreamInMemory:
                self.log_client.log(DEBUG, 'loadStreamInMemory is True, read stream into memory')
                buf = None
                while True:
                    chunk = resultWrapper.read(chuckSize)
                    if not chunk:
                        break
                    if buf is None:
                        buf = chunk
                    else:
                        buf += chunk
                body = ObjectStream(buffer=buf, size=util.to_long(len(buf)) if buf is not None else 0)
            elif downloadPath is None:
                self.log_client.log(DEBUG, 'DownloadPath is none, return conn directly')
                close_conn_flag = False
                body = ObjectStream(response=resultWrapper)
            else:
                objectKey = util.safe_encode(objectKey)
                downloadPath = util.safe_encode(downloadPath)
                file_path, _ = self._get_data(resultWrapper, downloadPath, chuckSize)
                body = ObjectStream(url=util.to_string(file_path))
                self.log_client.log(DEBUG, 'DownloadPath is ' + util.to_string(file_path))

            status = util.to_int(result.status)
            reason = result.reason
            self.convertor.parseGetObject(headers, body)
            header = self._rename_response_headers(headers)
            requestId = dict(header).get('request-id')
            return GetResult(status=status, reason=reason, header=header, body=body, requestId=requestId)
        except _RedirectException:
            raise
        except Exception:
            # The stream never reaches the caller on failure, so release it here.
            close_conn_flag = True
            self.log_client.log(ERROR, traceback.format_exc())
            raise
        finally:
            if close_conn_flag:
                if resultWrapper is not None:
                    resultWrapper.close()
                else:
                    util.do_close(result, conn, self.connHolder, self.log_client)

    def _get_data(self, resultWrapper, downloadPath, chuckSize):
        """Stream the response body into ``downloadPath``.

        Missing parent directories are created.  Returns the path as it was
        passed in and the number of bytes written.
        """
        origin_file_path = downloadPath
        readed_count = 0
        if const.IS_WINDOWS:
            downloadPath = util.safe_trans_to_gb2312(downloadPath)
        pathDir = os.path.dirname(downloadPath)
        # A bare file name has an empty dirname; makedirs('') would raise.
        if pathDir and not os.path.exists(pathDir):
            os.makedirs(pathDir, 0o755)
        with open(downloadPath, 'wb') as f:
            while True:
                chunk = resultWrapper.read(chuckSize)
                if not chunk:
                    break
                f.write(chunk)
                readed_count += len(chunk)
        return origin_file_path, readed_count

    def _rename_key(self, k, v):
        """Strip a service header prefix from ``k`` and decode the value.

        Returns ``(flag, key, value)`` where ``flag`` is 1 if a known prefix
        matched and 0 otherwise.  For metadata prefixes the key is decoded as
        well as the value.
        """
        # Metadata prefixes are tested before the shorter header prefixes.
        prefixes = (
            (const.V2_META_HEADER_PREFIX, True),
            (const.V2_HEADER_PREFIX, False),
            (const.OBS_META_HEADER_PREFIX, True),
            (const.OBS_HEADER_PREFIX, False),
        )
        for prefix, decode_key in prefixes:
            if k.startswith(prefix):
                k = k[len(prefix):]
                if decode_key:
                    k = util.decode_item(k)
                v = util.decode_item(v)
                return 1, k, v
        return 0, k, v

    def _rename_response_headers(self, headers):
        """Return ``[(name, value), ...]`` for the response headers to expose.

        Allowed standard headers are kept as-is; service-prefixed headers are
        kept with the prefix removed; everything else is dropped.
        """
        header = []
        for k, v in headers.items():
            if k in const.ALLOWED_RESPONSE_HTTP_HEADER_METADATA_NAMES:
                flag = 1
            else:
                flag, k, v = self._rename_key(k, v)
            if flag:
                header.append((k, v))
        return header

    def _parse_xml_internal(self, result, methodName=None, chuckSize=65536, readable=False):
        """Read the whole response body and build a ``GetResult``.

        For 2xx responses the body is parsed by the convertor method
        ``parse<MethodName>`` when ``methodName`` is given; parse failures are
        logged and leave ``body`` as None.  For other responses with a body
        the error document is parsed.

        Raises (only when ``readable`` is false):
            _RedirectException: 3xx other than 304 with a location header.
            _InternalException: status >= 500.
        """
        status = util.to_int(result.status)
        reason = result.reason
        code = None
        message = None
        body = None
        requestId = None
        hostId = None
        resource = None
        headers = {}
        for k, v in result.getheaders():
            headers[k.lower()] = v

        xml = None
        while True:
            chunk = result.read(chuckSize)
            if not chunk:
                break
            xml = chunk if xml is None else xml + chunk

        header = self._rename_response_headers(headers)
        indicator = headers.get(self.ha.indicator_header())

        if status < 300:
            if methodName is not None:
                parseMethod = getattr(self.convertor, 'parse' + methodName[:1].upper() + methodName[1:])
                if parseMethod is not None:
                    try:
                        if xml:
                            xml = xml if const.IS_PYTHON2 else xml.decode('UTF-8')
                            self.log_client.log(DEBUG, 'recv Msg:%s', xml)
                            # Drop the default namespace so element lookups stay unqualified.
                            search = self.pattern.search(xml)
                            xml = xml if search is None else xml.replace(search.group(), '')
                            body = parseMethod(xml, headers)
                        else:
                            body = parseMethod(headers)
                    except Exception as e:
                        self.log_client.log(ERROR, util.to_string(e))
                        self.log_client.log(ERROR, traceback.format_exc())
        elif xml:
            xml = xml if const.IS_PYTHON2 else xml.decode('UTF-8')
            try:
                search = self.pattern.search(xml)
                xml = xml if search is None else xml.replace(search.group(), '')
                code, message, requestId, hostId, resource = self.convertor.parseErrorResult(xml)
            except Exception as ee:
                self.log_client.log(ERROR, util.to_string(ee))
                self.log_client.log(ERROR, traceback.format_exc())

        # Fall back to the request-id headers when the body did not supply one.
        if requestId is None:
            requestId = headers.get('x-obs-request-id')
        if requestId is None:
            requestId = headers.get('x-amz-request-id')

        self.log_client.log(DEBUG, 'http response result:status:%d,reason:%s,code:%s,message:%s,headers:%s',
                            status, reason, code, message, header)

        if status >= 300:
            self.log_client.log(ERROR, 'exceptional obs response:status:%d,reason:%s,code:%s,message:%s,requestId:%s',
                                status, reason, code, message, requestId)

        ret = GetResult(code=code, message=message, status=status, reason=reason, body=body,
                        requestId=requestId, hostId=hostId, resource=resource, header=header, indicator=indicator)

        if not readable:
            if status >= 300 and status < 400 and status != 304 and const.LOCATION_HEADER.lower() in headers:
                location = headers.get(const.LOCATION_HEADER.lower())
                self.log_client.log(WARNING, 'http code is %d, need to redirect to %s', status, location)
                raise _RedirectException('http code is {0}, need to redirect to {1}'.format(status, location), location, ret)

            if status >= 500:
                raise _InternalException(ret)
        return ret
class _CreateSignedUrlResponse(BaseModel):
    """Result model for signed-URL creation: the URL plus the headers that
    must accompany a request made with it."""

    allowedAttr = {'signedUrl': const.BASESTRING, 'actualSignedRequestHeaders': dict}
class _CreatePostSignatureResponse(BaseModel):
    """Result model for POST-signature creation; every field is a string."""

    allowedAttr = {
        'originPolicy': const.BASESTRING,
        'policy': const.BASESTRING,
        'credential': const.BASESTRING,
        'date': const.BASESTRING,
        'signature': const.BASESTRING,
        'accessKeyId': const.BASESTRING,
    }
class ObsClient(_BasicClient):
def __init__(self, *args, **kwargs):
    """Create the client; all arguments are forwarded to ``_BasicClient``."""
    super(ObsClient, self).__init__(*args, **kwargs)
def _prepareParameterForSignedUrl(self, specialParam, expires, headers, queryParams):
    """Normalize the inputs shared by the V2 and V4 signed-URL builders.

    Returns a 4-tuple ``(headers, queryParams, expires, calling_format)`` where
    both mappings are fresh dicts without ``None``/empty keys, ``specialParam``
    (if given) has been added to the query with a ``None`` value, and
    ``expires`` defaults to 300 when it is ``None``.
    """
    def _drop_blank_keys(mapping):
        # Anything that is not a dict (including None) counts as "no entries".
        if not isinstance(mapping, dict):
            return {}
        return dict((name, val) for name, val in mapping.items() if name is not None and name != '')

    cleaned_headers = _drop_blank_keys(headers)
    cleaned_query = _drop_blank_keys(queryParams)

    if specialParam is not None:
        # The storage-class sub-resource is spelled differently for the 'obs' signature
        # than for the others; translate to the spelling matching the active signature.
        if self.signature.lower() == 'obs' and specialParam == 'storagePolicy':
            specialParam = 'storageClass'
        elif self.signature.lower() != 'obs' and specialParam == 'storageClass':
            specialParam = 'storagePolicy'
        cleaned_query[specialParam] = None

    if expires is None:
        lifetime = 300
    else:
        lifetime = util.to_int(expires)
    return cleaned_headers, cleaned_query, lifetime, self.calling_format
def createSignedUrl(self, method, bucketName=None, objectKey=None, specialParam=None, expires=300, headers=None, queryParams=None):
    """Build a signed URL with the V4 builder when the client signature is 'v4' (case-insensitive), else with the V2 builder."""
    if self.signature.lower() == 'v4':
        return self._createV4SignedUrl(method, bucketName, objectKey, specialParam, expires, headers, queryParams)
    return self._createV2SignedUrl(method, bucketName, objectKey, specialParam, expires, headers, queryParams)
def createV2SignedUrl(self, method, bucketName=None, objectKey=None, specialParam=None, expires=300, headers=None, queryParams=None):
    """Public entry point that always uses the V2 signed-URL builder."""
    forwarded = (method, bucketName, objectKey, specialParam, expires, headers, queryParams)
    return self._createV2SignedUrl(*forwarded)
def createV4SignedUrl(self, method, bucketName=None, objectKey=None, specialParam=None, expires=300, headers=None, queryParams=None):
    """Public entry point that always uses the V4 signed-URL builder."""
    forwarded = (method, bucketName, objectKey, specialParam, expires, headers, queryParams)
    return self._createV4SignedUrl(*forwarded)
def _createV2SignedUrl(self, method, bucketName=None, objectKey=None, specialParam=None, expires=300, headers=None, queryParams=None):
    """Build a query-string signed URL (``Expires`` / access key id / ``Signature``).

    ``expires`` is a lifetime in seconds; the URL carries the absolute epoch
    second at which it stops being valid.

    Returns:
        _CreateSignedUrlResponse with ``signedUrl`` and
        ``actualSignedRequestHeaders`` (the headers used for signing, including
        the computed Host header).
    """
    headers, queryParams, expires, calling_format = self._prepareParameterForSignedUrl(specialParam, expires, headers, queryParams)

    connect_server = self.server if self.is_cname else calling_format.get_server(self.server, bucketName)
    # The port is appended to the Host header only when it is neither 443 nor 80.
    if self.port != 443 and self.port != 80:
        headers[const.HOST_HEADER] = '%s:%s' % (connect_server, self.port)
    else:
        headers[const.HOST_HEADER] = connect_server

    expires += util.to_int(time.time())

    securityProvider = self._get_token()
    if securityProvider.security_token is not None and self.ha.security_token_header() not in queryParams:
        queryParams[self.ha.security_token_header()] = securityProvider.security_token

    v2Auth = auth.Authentication(securityProvider.access_key_id, securityProvider.secret_access_key, self.path_style, self.ha, self.server, self.is_cname)
    signature = v2Auth.getSignature(method, bucketName, objectKey, queryParams, headers, util.to_string(expires))['Signature']

    queryParams['Expires'] = expires
    # Compare case-insensitively, like every other signature check in this class;
    # a client configured with 'OBS' must not fall back to the AWS parameter name.
    access_key_param = 'AccessKeyId' if self.signature.lower() == 'obs' else 'AWSAccessKeyId'
    queryParams[access_key_param] = securityProvider.access_key_id
    queryParams['Signature'] = signature

    # In custom-domain mode the bucket is left out when the final URL is built.
    if self.is_cname:
        bucketName = None

    result = {
        'signedUrl': calling_format.get_full_url(self.is_secure, self.server, self.port, bucketName, objectKey, queryParams),
        'actualSignedRequestHeaders': headers
    }
    return _CreateSignedUrlResponse(**result)
def _createV4SignedUrl(self, method, bucketName=None, objectKey=None, specialParam=None, expires=300, headers=None, queryParams=None):
    """Build a signed URL carrying the ``X-Amz-*`` query parameters.

    The signing date is taken from a Date header supplied by the caller (parsed
    with ``const.GMT_DATE_FORMAT``) or, failing that, from the current UTC time.

    Returns:
        _CreateSignedUrlResponse with ``signedUrl`` and
        ``actualSignedRequestHeaders``.
    """
    from datetime import datetime

    headers, queryParams, expires, calling_format = self._prepareParameterForSignedUrl(specialParam, expires, headers, queryParams)

    if self.is_cname:
        # Custom-domain mode: sign and address the request without a bucket name.
        connect_server = self.server
        bucketName = None
    else:
        connect_server = calling_format.get_server(self.server, bucketName)

    # The port is appended to the Host header only when it is neither 443 nor 80.
    if self.port != 443 and self.port != 80:
        headers[const.HOST_HEADER] = '%s:%s' % (connect_server, self.port)
    else:
        headers[const.HOST_HEADER] = connect_server

    if const.DATE_HEADER in headers:
        date_text = headers[const.DATE_HEADER]
    else:
        date_text = headers.get(const.DATE_HEADER.lower())
    if date_text:
        signing_time = datetime.strptime(date_text, const.GMT_DATE_FORMAT)
    else:
        signing_time = datetime.utcnow()
    shortDate = signing_time.strftime(const.SHORT_DATE_FORMAT)
    longDate = signing_time.strftime(const.LONG_DATE_FORMAT)

    securityProvider = self._get_token()
    if securityProvider.security_token is not None and self.ha.security_token_header() not in queryParams:
        queryParams[self.ha.security_token_header()] = securityProvider.security_token

    v4Auth = auth.V4Authentication(securityProvider.access_key_id, securityProvider.secret_access_key, self.region, shortDate, longDate, self.path_style, self.ha)

    queryParams['X-Amz-Algorithm'] = 'AWS4-HMAC-SHA256'
    queryParams['X-Amz-Credential'] = v4Auth.getCredenttial()
    queryParams['X-Amz-Date'] = longDate
    queryParams['X-Amz-Expires'] = expires

    lowered_headers = v4Auth.setMapKeyLower(headers)
    signedHeaders = v4Auth.getSignedHeaders(lowered_headers)
    queryParams['X-Amz-SignedHeaders'] = signedHeaders

    signed = v4Auth.getSignature(method, bucketName, objectKey, queryParams, lowered_headers, signedHeaders, 'UNSIGNED-PAYLOAD')
    queryParams['X-Amz-Signature'] = signed['Signature']

    signed_url = calling_format.get_full_url(self.is_secure, self.server, self.port, bucketName, objectKey, queryParams)
    return _CreateSignedUrlResponse(signedUrl=signed_url, actualSignedRequestHeaders=headers)
def createV4PostSignature(self, bucketName=None, objectKey=None, expires=300, formParams=None):
    """Create a POST-upload signature, always using the V4 algorithm."""
    force_v4 = True
    return self._createPostSignature(bucketName, objectKey, expires, formParams, force_v4)
def createPostSignature(self, bucketName=None, objectKey=None, expires=300, formParams=None):
    """Create a POST-upload signature; V4 is used only when the client signature is 'v4' (case-insensitive)."""
    use_v4 = self.signature.lower() == 'v4'
    return self._createPostSignature(bucketName, objectKey, expires, formParams, use_v4)
def _createPostSignature(self, bucketName=None, objectKey=None, expires=300, formParams=None, is_v4=False):
    """Assemble a POST-upload policy document and sign its base64 form.

    ``expires`` is a lifetime in seconds (300 when ``None``). ``formParams`` is
    copied, never modified; non-dict values are treated as empty.

    Returns:
        _CreatePostSignatureResponse built from the original policy text, its
        base64 encoding, the signature, and either the V4 credential/date
        fields or the access key id.
    """
    from datetime import datetime, timedelta

    now = datetime.utcnow()
    shortDate = now.strftime(const.SHORT_DATE_FORMAT)
    longDate = now.strftime(const.LONG_DATE_FORMAT)
    securityProvider = self._get_token()

    lifetime = 300 if expires is None else util.to_int(expires)
    expiration = (now + timedelta(seconds=lifetime)).strftime(const.EXPIRATION_DATE_FORMAT)

    if formParams is None or not isinstance(formParams, dict):
        formParams = {}
    else:
        formParams = formParams.copy()

    if securityProvider.security_token is not None and self.ha.security_token_header() not in formParams:
        formParams[self.ha.security_token_header()] = securityProvider.security_token

    if is_v4:
        formParams['X-Amz-Algorithm'] = 'AWS4-HMAC-SHA256'
        formParams['X-Amz-Date'] = longDate
        formParams['X-Amz-Credential'] = '%s/%s/%s/s3/aws4_request' % (securityProvider.access_key_id, shortDate, self.region)

    if bucketName:
        formParams['bucket'] = bucketName
    if objectKey:
        formParams['key'] = objectKey

    pieces = ['{"expiration":"', expiration, '", "conditions":[']

    bucket_unconstrained = True
    key_unconstrained = True
    conditionAllowKeys = ['acl', 'bucket', 'key', 'success_action_redirect', 'redirect', 'success_action_status']

    for raw_name, value in formParams.items():
        if not raw_name:
            continue
        name = util.to_string(raw_name).lower()

        if name == 'bucket':
            bucket_unconstrained = False
        elif name == 'key':
            key_unconstrained = False

        # Only recognised headers, prefixed metadata and the explicitly allowed
        # form fields become exact-match conditions.
        accepted = (name in const.ALLOWED_REQUEST_HTTP_HEADER_METADATA_NAMES
                    or name.startswith(self.ha._get_header_prefix())
                    or name.startswith(const.OBS_HEADER_PREFIX)
                    or name in conditionAllowKeys)
        if not accepted:
            continue

        pieces.extend(('{"', name, '":"', util.to_string(value) if value is not None else '', '"},'))

    # A field without an explicit value is allowed to take any value.
    if bucket_unconstrained:
        pieces.append('["starts-with", "$bucket", ""],')
    if key_unconstrained:
        pieces.append('["starts-with", "$key", ""],')

    # NOTE(review): the conditions list ends with a trailing comma and values are
    # not JSON-escaped; kept as-is because the signed text must stay unchanged.
    pieces.append(']}')

    originPolicy = ''.join(pieces)
    policy = util.base64_encode(originPolicy)

    if is_v4:
        v4Auth = auth.V4Authentication(securityProvider.access_key_id, securityProvider.secret_access_key, self.region, shortDate, longDate,
                                       self.path_style, self.ha)
        if const.IS_PYTHON2:
            signingKey = v4Auth.getSigningKey_python2()
            signature = v4Auth.hmacSha256(signingKey, policy)
        else:
            signingKey = v4Auth.getSigningKey_python3()
            signature = v4Auth.hmacSha256(signingKey, policy.encode('UTF-8'))
        result = {'originPolicy': originPolicy, 'policy': policy, 'algorithm': formParams['X-Amz-Algorithm'], 'credential': formParams['X-Amz-Credential'], 'date': formParams['X-Amz-Date'], 'signature': signature}
    else:
        v2Auth = auth.Authentication(securityProvider.access_key_id, securityProvider.secret_access_key, self.path_style, self.ha, self.server, self.is_cname)
        signature = v2Auth.hmacSha128(policy)
        result = {'originPolicy': originPolicy, 'policy': policy, 'signature': signature, 'accessKeyId': securityProvider.access_key_id}
    return _CreatePostSignatureResponse(**result)
def bucketClient(self, bucketName):
    """Return a BucketClient constructed from this client and ``bucketName``."""
    bound_client = BucketClient(self, bucketName)
    return bound_client
def _getApiVersion(self, bucketName=''):
    """Probe the server with an unauthenticated HEAD ``?apiversion`` request.

    Returns:
        A ``(signature, response)`` tuple. ``signature`` is ``''`` for a 5xx or
        404 answer, ``const.OBS_SIGNATURE`` when the ``api`` or ``x-obs-api``
        response header reports major version 3 or higher, and
        ``const.V2_SIGNATURE`` otherwise.
    """
    def _is_v3_or_newer(raw):
        # Compare the major version numerically: a plain string comparison
        # ranks '10.0' below '3.0'.
        try:
            return int(str(raw).strip().split('.')[0]) >= 3
        except ValueError:
            # Not a dotted number - keep the historical string comparison.
            return raw >= '3.0'

    res = self._make_head_request(bucketName, pathArgs={'apiversion': None}, skipAuthentication=True)
    if res.status >= 500 or res.status == 404:
        return '', res
    if not hasattr(res, 'header'):
        return const.V2_SIGNATURE, res
    header = dict(res.header)
    if _is_v3_or_newer(header.get('api', '0.0')) or _is_v3_or_newer(header.get('x-obs-api', '0.0')):
        return const.OBS_SIGNATURE, res
    return const.V2_SIGNATURE, res
@funcCache
def listBuckets(self, isQueryLocation=True):
    """Issue the GET request for the bucket list, with arguments from ``convertor.trans_list_buckets``.

    Raises:
        Exception: when the client is in custom-domain (``is_cname``) mode.
    """
    if self.is_cname:
        raise Exception('listBuckets is not allowed in customdomain mode')
    request_kwargs = self.convertor.trans_list_buckets(isQueryLocation=isQueryLocation)
    return self._make_get_request(methodName='listBuckets', **request_kwargs)
@funcCache
def createBucket(self, bucketName, header=CreateBucketHeader(), location=None):
    """Create a bucket with a PUT request, retrying once with the V2 signature when negotiation requires it.

    The retry happens only when signature negotiation is enabled, the first
    answer is a 400 'Unsupported Authorization Type', and the thread-local
    signature is currently ``const.OBS_SIGNATURE``.

    Raises:
        Exception: when the client is in custom-domain (``is_cname``) mode.
    """
    # NOTE(review): the default header instance is created once and shared by all
    # calls; left unchanged so the public signature stays the same.
    if self.is_cname:
        raise Exception('createBucket is not allowed in customdomain mode')
    res = self._make_put_request(bucketName, **self.convertor.trans_create_bucket(header=header, location=location))
    try:
        if self.is_signature_negotiation and res.status == 400 and res.errorMessage == 'Unsupported Authorization Type' and self.thread_local.signature == const.OBS_SIGNATURE:
            self.thread_local.signature = const.V2_SIGNATURE
            res = self._make_put_request(bucketName, **self.convertor.trans_create_bucket(header=header, location=location))
    except Exception:
        # The fallback stays best-effort (the last response is still returned), but
        # the failure is now logged and KeyboardInterrupt/SystemExit are no longer
        # swallowed as they were by the former ``return`` inside ``finally``.
        self.log_client.log(WARNING, traceback.format_exc())
    return res
@funcCache
def listObjects(self, bucketName, prefix=None, marker=None, max_keys=None, delimiter=None):
    """Issue the GET request that lists a bucket's objects; the filters are translated by ``convertor.trans_list_objects``."""
    request_kwargs = self.convertor.trans_list_objects(prefix=prefix, marker=marker, max_keys=max_keys, delimiter=delimiter)
    return self._make_get_request(bucketName, methodName='listObjects', **request_kwargs)
@funcCache
def headBucket(self, bucketName):
    """Send a plain HEAD request for the bucket and return its result."""
    response = self._make_head_request(bucketName)
    return response
@funcCache
def getBucketMetadata(self, bucketName, origin=None, requestHeaders=None):
    """Send a HEAD request for the bucket; ``origin``/``requestHeaders`` are translated by ``convertor.trans_get_bucket_metadata``."""
    request_kwargs = self.convertor.trans_get_bucket_metadata(origin=origin, requestHeaders=requestHeaders)
    return self._make_head_request(bucketName, methodName='getBucketMetadata', **request_kwargs)
@funcCache
def getBucketLocation(self, bucketName):
    """GET the bucket's ``location`` sub-resource."""
    sub_resource = {'location': None}
    return self._make_get_request(bucketName, pathArgs=sub_resource, methodName='getBucketLocation')
@funcCache
def deleteBucket(self, bucketName):
    """Send a DELETE request for the bucket and return its result."""
    response = self._make_delete_request(bucketName)
    return response
@funcCache
def setBucketQuota(self, bucketName, quota):
    """PUT the bucket's ``quota`` sub-resource.

    ``quota`` is checked with ``_assert_not_null`` (message 'quota is empty')
    and serialized by ``convertor.trans_quota``.
    """
    self._assert_not_null(quota, 'quota is empty')
    body = self.convertor.trans_quota(quota)
    return self._make_put_request(bucketName, pathArgs={'quota': None}, entity=body)
@funcCache
def getBucketQuota(self, bucketName):
    """GET the bucket's ``quota`` sub-resource."""
    sub_resource = {'quota': None}
    return self._make_get_request(bucketName, pathArgs=sub_resource, methodName='getBucketQuota')
@funcCache
def getBucketStorageInfo(self, bucketName):
    """GET the bucket's ``storageinfo`` sub-resource."""
    sub_resource = {'storageinfo': None}
    return self._make_get_request(bucketName, pathArgs=sub_resource, methodName='getBucketStorageInfo')
@funcCache
def setBucketAcl(self, bucketName, acl=ACL(), aclControl=None):
    """Apply an ACL to the bucket from exactly one of ``acl`` or ``aclControl``.

    Raises:
        Exception: when both a non-empty ``acl`` and an ``aclControl`` are
            given, or when neither is given.
    """
    acl_given = acl is not None and len(acl) > 0
    if acl_given and aclControl is not None:
        raise Exception('Both acl and aclControl are set')
    if not (acl or aclControl):
        raise Exception('Both acl and aclControl are not set')
    request_kwargs = self.convertor.trans_set_bucket_acl(acl=acl, aclControl=aclControl)
    return self._make_put_request(bucketName, **request_kwargs)
@funcCache
def getBucketAcl(self, bucketName):
    """GET the bucket's ``acl`` sub-resource."""
    sub_resource = {'acl': None}
    return self._make_get_request(bucketName, pathArgs=sub_resource, methodName='getBucketAcl')
@funcCache
def setBucketPolicy(self, bucketName, policyJSON):
    """PUT ``policyJSON`` unchanged as the body of the bucket's ``policy`` sub-resource.

    ``policyJSON`` is checked with ``_assert_not_null`` (message 'policyJSON is empty').
    """
    self._assert_not_null(policyJSON, 'policyJSON is empty')
    sub_resource = {'policy': None}
    return self._make_put_request(bucketName, pathArgs=sub_resource, entity=policyJSON)
@funcCache
def getBucketPolicy(self, bucketName):
    """GET the bucket's ``policy`` sub-resource."""
    sub_resource = {'policy': None}
    return self._make_get_request(bucketName, pathArgs=sub_resource, methodName='getBucketPolicy')
@funcCache
def deleteBucketPolicy(self, bucketName):
    """DELETE the bucket's ``policy`` sub-resource."""
    sub_resource = {'policy': None}
    return self._make_delete_request(bucketName, pathArgs=sub_resource)
@funcCache
def setBucketVersioning(self, bucketName, status):
    """PUT the bucket's ``versioning`` sub-resource.

    ``status`` is checked with ``_assert_not_null`` (message 'status is empty')
    and serialized by ``convertor.trans_version_status``.
    """
    self._assert_not_null(status, 'status is empty')
    body = self.convertor.trans_version_status(status)
    return self._make_put_request(bucketName, pathArgs={'versioning': None}, entity=body)
@funcCache
def getBucketVersioning(self, bucketName):
    """GET the bucket's ``versioning`` sub-resource."""
    sub_resource = {'versioning': None}
    return self._make_get_request(bucketName, pathArgs=sub_resource, methodName='getBucketVersioning')
@funcCache
def listVersions(self, bucketName, version=Versions()):
    """Issue the GET request that lists object versions; ``version`` is translated by ``convertor.trans_list_versions``."""
    request_kwargs = self.convertor.trans_list_versions(version=version)
    return self._make_get_request(bucketName, methodName='listVersions', **request_kwargs)
@funcCache
def listMultipartUploads(self, bucketName, multipart=ListMultipartUploadsRequest()):
    """Issue the GET request that lists multipart uploads; ``multipart`` is translated by ``convertor.trans_list_multipart_uploads``."""
    request_kwargs = self.convertor.trans_list_multipart_uploads(multipart=multipart)
    return self._make_get_request(bucketName, methodName='listMultipartUploads', **request_kwargs)
@funcCache
def deleteBucketLifecycle(self, bucketName):
    """DELETE the bucket's ``lifecycle`` sub-resource."""
    sub_resource = {'lifecycle': None}
    return self._make_delete_request(bucketName, pathArgs=sub_resource)
@funcCache
def setBucketLifecycle(self, bucketName, lifecycle):
    """PUT a lifecycle configuration; the request arguments come from ``convertor.trans_set_bucket_lifecycle``.

    ``lifecycle`` is checked with ``_assert_not_null`` (message 'lifecycle is empty').
    """
    self._assert_not_null(lifecycle, 'lifecycle is empty')
    request_kwargs = self.convertor.trans_set_bucket_lifecycle(lifecycle=lifecycle)
    return self._make_put_request(bucketName, **request_kwargs)
@funcCache
def getBucketLifecycle(self, bucketName):
    """GET the bucket's ``lifecycle`` sub-resource."""
    sub_resource = {'lifecycle': None}
    return self._make_get_request(bucketName, pathArgs=sub_resource, methodName='getBucketLifecycle')
@funcCache
def deleteBucketWebsite(self, bucketName):
    """DELETE the bucket's ``website`` sub-resource."""
    sub_resource = {'website': None}
    return self._make_delete_request(bucketName, pathArgs=sub_resource)
@funcCache
def setBucketWebsite(self, bucketName, website):
    """PUT the bucket's ``website`` sub-resource.

    ``website`` is checked with ``_assert_not_null`` (message 'website is empty')
    and serialized by ``convertor.trans_website``.
    """
    self._assert_not_null(website, 'website is empty')
    body = self.convertor.trans_website(website)
    return self._make_put_request(bucketName, pathArgs={'website': None}, entity=body)
@funcCache
def getBucketWebsite(self, bucketName):
    """GET the bucket's ``website`` sub-resource."""
    sub_resource = {'website': None}
    return self._make_get_request(bucketName, pathArgs=sub_resource, methodName='getBucketWebsite')
@funcCache
def setBucketLogging(self, bucketName, logstatus=Logging()):
    """PUT the bucket's ``logging`` sub-resource; a ``None`` ``logstatus`` is replaced by a fresh ``Logging()``."""
    effective = Logging() if logstatus is None else logstatus
    body = self.convertor.trans_logging(effective)
    return self._make_put_request(bucketName, pathArgs={'logging': None}, entity=body)
@funcCache
def getBucketLogging(self, bucketName):
    """GET the bucket's ``logging`` sub-resource."""
    sub_resource = {'logging': None}
    return self._make_get_request(bucketName, pathArgs=sub_resource, methodName='getBucketLogging')
@funcCache
def getBucketTagging(self, bucketName):
    """GET the bucket's ``tagging`` sub-resource."""
    sub_resource = {'tagging': None}
    return self._make_get_request(bucketName, pathArgs=sub_resource, methodName='getBucketTagging')
@funcCache
def setBucketTagging(self, bucketName, tagInfo):
    """PUT bucket tags; the request arguments come from ``convertor.trans_set_bucket_tagging``.

    ``tagInfo`` is checked with ``_assert_not_null`` (message 'tagInfo is empty').
    """
    self._assert_not_null(tagInfo, 'tagInfo is empty')
    request_kwargs = self.convertor.trans_set_bucket_tagging(tagInfo=tagInfo)
    return self._make_put_request(bucketName, **request_kwargs)
@funcCache
def deleteBucketTagging(self, bucketName):
    """DELETE the bucket's ``tagging`` sub-resource."""
    sub_resource = {'tagging': None}
    return self._make_delete_request(bucketName, pathArgs=sub_resource)
@funcCache
def setBucketCors(self, bucketName, corsRuleList):
    """PUT CORS rules; the request arguments come from ``convertor.trans_set_bucket_cors``.

    ``corsRuleList`` is checked with ``_assert_not_null`` (message 'corsRuleList is empty').
    """
    self._assert_not_null(corsRuleList, 'corsRuleList is empty')
    request_kwargs = self.convertor.trans_set_bucket_cors(corsRuleList=corsRuleList)
    return self._make_put_request(bucketName, **request_kwargs)
@funcCache
def deleteBucketCors(self, bucketName):
    """DELETE the bucket's ``cors`` sub-resource."""
    sub_resource = {'cors': None}
    return self._make_delete_request(bucketName, pathArgs=sub_resource)
@funcCache
def getBucketCors(self, bucketName):
    """GET the bucket's ``cors`` sub-resource."""
    sub_resource = {'cors': None}
    return self._make_get_request(bucketName, pathArgs=sub_resource, methodName='getBucketCors')
@funcCache
def optionsBucket(self, bucketName, option):
    """Run an OPTIONS request against the bucket by delegating to ``optionsObject`` with no object key."""
    no_object_key = None
    return self.optionsObject(bucketName, no_object_key, option=option)
@funcCache
def setBucketNotification(self, bucketName, notification=Notification()):
    """PUT the bucket's ``notification`` sub-resource; a ``None`` ``notification`` is replaced by a fresh ``Notification()``."""
    effective = Notification() if notification is None else notification
    body = self.convertor.trans_notification(effective)
    return self._make_put_request(bucketName, pathArgs={'notification': None}, entity=body)
@funcCache
def getBucketNotification(self, bucketName):
    """GET the bucket's ``notification`` sub-resource."""
    sub_resource = {'notification': None}
    return self._make_get_request(bucketName, pathArgs=sub_resource, methodName='getBucketNotification')
@funcCache
def optionsObject(self, bucketName, objectKey, option):
    """Send an OPTIONS request built from the CORS-related entries of ``option``.

    ``option`` may be ``None``; otherwise its 'origin',
    'accessControlRequestMethods' and 'accessControlRequestHeaders' entries are
    copied into the matching request headers when they are not ``None``.
    """
    headers = {}
    if option is not None:
        origin = option.get('origin')
        if origin is not None:
            headers[const.ORIGIN_HEADER] = util.to_string(origin)
        # These two values are forwarded as given, without string conversion.
        passthrough = (
            ('accessControlRequestMethods', const.ACCESS_CONTROL_REQUEST_METHOD_HEADER),
            ('accessControlRequestHeaders', const.ACCESS_CONTROL_REQUEST_HEADERS_HEADER),
        )
        for option_key, header_name in passthrough:
            requested = option.get(option_key)
            if requested is not None:
                headers[header_name] = requested
    # The method name 'optionsBucket' is shared with the bucket-level call.
    return self._make_options_request(bucketName, objectKey, headers=headers, methodName='optionsBucket')
@funcCache
def getObjectMetadata(self, bucketName, objectKey, versionId=None, sseHeader=None, origin=None, requestHeaders=None):
    """Send a HEAD request for an object, optionally for one version and with CORS / SSE headers.

    A single-element ``requestHeaders`` list is unwrapped to its only element
    before being converted to a string.
    """
    pathArgs = {}
    if versionId:
        pathArgs[const.VERSION_ID_PARAM] = util.to_string(versionId)

    headers = {}
    if origin:
        headers[const.ORIGIN_HEADER] = util.to_string(origin)

    if isinstance(requestHeaders, list) and len(requestHeaders) == 1:
        wanted_headers = requestHeaders[0]
    else:
        wanted_headers = requestHeaders
    if wanted_headers:
        headers[const.ACCESS_CONTROL_REQUEST_HEADERS_HEADER] = util.to_string(wanted_headers)

    final_headers = self.convertor._set_sse_header(sseHeader, headers=headers, onlySseCHeader=True)
    return self._make_head_request(bucketName, objectKey, pathArgs=pathArgs,
                                   headers=final_headers, methodName='getObjectMetadata')
@funcCache
def setObjectMetadata(self, bucketName, objectKey, metadata=None, headers=None, versionId=None):
    """PUT new metadata for an object; a missing ``headers`` is replaced by a fresh ``SetObjectMetadataHeader()``."""
    effective_headers = SetObjectMetadataHeader() if headers is None else headers
    request_kwargs = self.convertor.trans_set_object_metadata(metadata=metadata, headers=effective_headers, versionId=versionId)
    return self._make_put_request(bucketName, objectKey, methodName='setObjectMetadata', **request_kwargs)
@funcCache
def getObject(self, bucketName, objectKey, downloadPath=None, getObjectRequest=GetObjectRequest(),
              headers=GetObjectHeader(), loadStreamInMemory=False, progressCallback=None):
    """Send a GET request for an object and hand the connection to ``_parse_content``.

    The request is flagged ``readable`` exactly when a ``progressCallback`` is
    supplied.
    """
    # Bind the parser and chunk size now so the callback does not look them up on self later.
    content_parser = self._parse_content
    chunk_size = self.chunk_size
    readable = progressCallback is not None

    def parseMethod(conn):
        return content_parser(conn, objectKey, downloadPath, chunk_size, loadStreamInMemory, progressCallback)

    request_kwargs = self.convertor.trans_get_object(getObjectRequest=getObjectRequest, headers=headers)
    return self._make_get_request(bucketName, objectKey, parseMethod=parseMethod, readable=readable, **request_kwargs)
@funcCache
def _getObjectWithNotifier(self, bucketName, objectKey, getObjectRequest=GetObjectRequest(),
                           headers=GetObjectHeader(), downloadPath=None, notifier=None):
    """Send a GET request for an object and hand the connection to ``_parse_content_with_notifier``.

    The request is flagged ``readable`` exactly when a ``notifier`` is supplied.
    """
    # Bind the parser and chunk size now so the callback does not look them up on self later.
    content_parser = self._parse_content_with_notifier
    chunk_size = self.chunk_size
    readable = notifier is not None

    def parseMethod(conn):
        return content_parser(conn, objectKey, chunk_size, downloadPath, notifier)

    request_kwargs = self.convertor.trans_get_object(getObjectRequest=getObjectRequest, headers=headers)
    return self._make_get_request(bucketName, objectKey, parseMethod=parseMethod, readable=readable, **request_kwargs)
@funcCache
def appendObject(self, bucketName, objectKey, content=None, metadata=None, headers=None, progressCallback=None, autoClose=True):
    """Append data to an object with a POST ``?append&position=...`` request.

    ``content`` is read through ``get``: 'isFile' selects between a file path
    and an in-memory/stream payload held in 'content'; 'offset' and 'position'
    are optional. A missing content type is looked up in ``const.MIME_TYPES``
    from the object key, then from the file path.

    Raises:
        Exception: when 'isFile' is set and the file does not exist.
    """
    objectKey = util.safe_encode(objectKey)
    if objectKey is None:
        objectKey = ''
    if headers is None:
        headers = AppendObjectHeader()
    if content is None:
        content = AppendObjectContent()
    if headers.get('contentType') is None:
        headers['contentType'] = const.MIME_TYPES.get(objectKey[objectKey.rfind('.') + 1:].lower())

    chunkedMode = False
    readable = False
    notifier = None

    if content.get('isFile'):
        file_path = util.safe_encode(content.get('content'))
        if not os.path.exists(file_path):
            # Second attempt with the GB2312 form of the path before giving up.
            file_path = util.safe_trans_to_gb2312(file_path)
            if not os.path.exists(file_path):
                raise Exception('file [%s] does not exist' % file_path)

        if headers.get('contentType') is None:
            headers['contentType'] = const.MIME_TYPES.get(file_path[file_path.rfind('.') + 1:].lower())

        file_size = util.to_long(os.path.getsize(file_path))
        # The length to send can never exceed the file size.
        length = util.to_long(headers.get('contentLength'))
        if length is None or not length <= file_size:
            length = file_size
        headers['contentLength'] = length

        offset = util.to_long(content.get('offset'))
        from_offset = offset is not None and 0 < offset < file_size
        if from_offset:
            # With an offset, the length is further limited to what remains after it.
            remaining = file_size - offset
            if not 0 < headers['contentLength'] <= remaining:
                headers['contentLength'] = remaining

        totalCount = headers['contentLength']
        if totalCount > 0 and progressCallback is not None:
            readable = True
            notifier = progress.ProgressNotifier(progressCallback, totalCount)
        else:
            notifier = progress.NONE_NOTIFIER

        if from_offset:
            entity = util.get_file_entity_by_offset_partsize(file_path, offset, totalCount, self.chunk_size, notifier)
        else:
            entity = util.get_file_entity_by_totalcount(file_path, totalCount, self.chunk_size, notifier)

        headers = self.convertor.trans_put_object(metadata=metadata, headers=headers)
        self.log_client.log(DEBUG, 'send Path:%s' % file_path)
    else:
        entity = content.get('content')
        if entity is None:
            entity = ''
        elif hasattr(entity, 'read') and callable(entity.read):
            readable = True
            if headers.get('contentLength') is None:
                # Unknown length: switch the request to chunked mode.
                chunkedMode = True
                if progressCallback is not None:
                    notifier = progress.ProgressNotifier(progressCallback, -1)
                else:
                    notifier = progress.NONE_NOTIFIER
                entity = util.get_readable_entity(entity, self.chunk_size, notifier, autoClose)
            else:
                totalCount = util.to_long(headers.get('contentLength'))
                if totalCount > 0 and progressCallback is not None:
                    notifier = progress.ProgressNotifier(progressCallback, totalCount)
                else:
                    notifier = progress.NONE_NOTIFIER
                entity = util.get_readable_entity_by_totalcount(entity, totalCount, self.chunk_size, notifier, autoClose)

        headers = self.convertor.trans_put_object(metadata=metadata, headers=headers)

    if content.get('position') is not None:
        position = util.to_string(content['position'])
    else:
        position = 0

    try:
        if notifier is not None:
            notifier.start()
        ret = self._make_post_request(bucketName, objectKey, pathArgs={'append': None, 'position': position},
                                      headers=headers, entity=entity, chunkedMode=chunkedMode, methodName='appendObject', readable=readable)
    finally:
        if notifier is not None:
            notifier.end()
    self._generate_object_url(ret, bucketName, objectKey)
    return ret
@funcCache
def putContent(self, bucketName, objectKey, content=None, metadata=None, headers=None, progressCallback=None, autoClose=True):
    """Upload in-memory content or a readable stream with a PUT request.

    ``None`` content is sent as an empty string. An object exposing a callable
    ``read`` is streamed: in chunked mode when no 'contentLength' header is
    given, otherwise limited to that length. A missing content type is looked
    up in ``const.MIME_TYPES`` from the object key.
    """
    objectKey = util.safe_encode(objectKey)
    if objectKey is None:
        objectKey = ''
    if headers is None:
        headers = PutObjectHeader()
    if headers.get('contentType') is None:
        headers['contentType'] = const.MIME_TYPES.get(objectKey[objectKey.rfind('.') + 1:].lower())
    request_headers = self.convertor.trans_put_object(metadata=metadata, headers=headers)

    readable = False
    chunkedMode = False
    notifier = None
    entity = content
    try:
        if entity is None:
            entity = ''
        elif hasattr(entity, 'read') and callable(entity.read):
            readable = True
            declared_length = headers.get('contentLength')
            if declared_length is None:
                # Unknown length: switch the request to chunked mode.
                chunkedMode = True
                if progressCallback is not None:
                    notifier = progress.ProgressNotifier(progressCallback, -1)
                else:
                    notifier = progress.NONE_NOTIFIER
                entity = util.get_readable_entity(entity, self.chunk_size, notifier, autoClose)
            else:
                totalCount = util.to_long(headers.get('contentLength'))
                if totalCount > 0 and progressCallback is not None:
                    notifier = progress.ProgressNotifier(progressCallback, totalCount)
                else:
                    notifier = progress.NONE_NOTIFIER
                entity = util.get_readable_entity_by_totalcount(entity, totalCount, self.chunk_size, notifier, autoClose)
            # Only stream uploads have a notifier to start.
            notifier.start()
        ret = self._make_put_request(bucketName, objectKey, headers=request_headers, entity=entity, chunkedMode=chunkedMode, methodName='putContent', readable=readable)
    finally:
        if notifier is not None:
            notifier.end()
    self._generate_object_url(ret, bucketName, objectKey)
    return ret
def putObject(self, bucketName, objectKey, content, metadata=None, headers=None, progressCallback=None, autoClose=True):
    """Alias of ``putContent``: every argument is forwarded positionally."""
    forwarded = (bucketName, objectKey, content, metadata, headers, progressCallback, autoClose)
    return self.putContent(*forwarded)
@funcCache
def putFile(self, bucketName, objectKey, file_path, metadata=None, headers=None, progressCallback=None):
    """Upload a file, or every entry of a directory, with PUT requests.

    For a directory each entry is uploaded through a recursive ``putFile`` call
    under ``<objectKey or directory name>/<entry name>`` and a list of
    ``(key, result)`` tuples is returned; ``progressCallback`` is not forwarded
    to those calls. For a single file the request result is returned. The
    supplied ``headers`` object is modified in place.

    Raises:
        Exception: when ``file_path`` does not exist.
    """
    file_path = util.safe_encode(file_path)
    if not os.path.exists(file_path):
        # Second attempt with the GB2312 form of the path before giving up.
        file_path = util.safe_trans_to_gb2312(file_path)
        if not os.path.exists(file_path):
            raise Exception('file [{0}] doesnot exist'.format(file_path))

    _flag = os.path.isdir(file_path)

    if headers is None:
        headers = PutObjectHeader()

    if _flag:
        def _reset_per_file_headers():
            # These values describe one file only and must not leak between entries.
            headers['contentLength'] = None
            headers['md5'] = None
            headers['contentType'] = None

        _reset_per_file_headers()
        results = []
        for f in os.listdir(file_path):
            # The same headers object is handed to every child upload, and the
            # single-file path below stores the derived content type in it.
            # Without a reset, the first entry's content type would be reused
            # for all following siblings.
            _reset_per_file_headers()
            f = util.safe_encode(f)
            child_path = os.path.join(file_path, f)
            if not objectKey:
                key = util.safe_trans_to_gb2312('{0}/'.format(os.path.split(file_path)[1]) + f)
            else:
                key = '{0}/'.format(objectKey) + util.safe_trans_to_gb2312(f)
            result = self.putFile(bucketName, key, child_path, metadata, headers)
            results.append((key, result))
        return results

    if not objectKey:
        objectKey = os.path.split(file_path)[1]

    size = util.to_long(os.path.getsize(file_path))
    headers['contentLength'] = util.to_long(headers.get('contentLength'))
    if headers.get('contentLength') is not None:
        # Never announce more bytes than the file holds.
        headers['contentLength'] = size if headers['contentLength'] > size else headers['contentLength']

    # Content type: object-key extension first, file extension as a fallback.
    if headers.get('contentType') is None:
        headers['contentType'] = const.MIME_TYPES.get(objectKey[objectKey.rfind('.') + 1:].lower())
    if headers.get('contentType') is None:
        headers['contentType'] = const.MIME_TYPES.get(file_path[file_path.rfind('.') + 1:].lower())

    _headers = self.convertor.trans_put_object(metadata=metadata, headers=headers)
    if const.CONTENT_LENGTH_HEADER not in _headers:
        _headers[const.CONTENT_LENGTH_HEADER] = util.to_string(size)
    self.log_client.log(DEBUG, 'send Path:%s' % file_path)

    totalCount = util.to_long(headers['contentLength']) if headers.get('contentLength') is not None else os.path.getsize(file_path)
    if totalCount > 0 and progressCallback is not None:
        notifier = progress.ProgressNotifier(progressCallback, totalCount)
        readable = True
    else:
        notifier = progress.NONE_NOTIFIER
        readable = False

    entity = util.get_file_entity_by_totalcount(file_path, totalCount, self.chunk_size, notifier)
    try:
        notifier.start()
        ret = self._make_put_request(bucketName, objectKey, headers=_headers, entity=entity, methodName='putContent', readable=readable)
    finally:
        notifier.end()
    self._generate_object_url(ret, bucketName, objectKey)
    return ret
@funcCache
def uploadPart(self, bucketName, objectKey, partNumber, uploadId, object=None, isFile=False, partSize=None,
               offset=0, sseHeader=None, isAttachMd5=False, md5=None, content=None, progressCallback=None, autoClose=True):
    """Upload one part of a multipart upload with a PUT ``?partNumber&uploadId`` request.

    ``content`` takes precedence over ``object``. With ``isFile`` the payload is
    a path: an out-of-range ``offset`` falls back to 0 and an out-of-range
    ``partSize`` falls back to the rest of the file. Otherwise the payload is a
    readable stream (chunked when ``partSize`` is ``None``) or a plain value.

    Raises:
        Exception: when ``isFile`` is set and the file does not exist.
    """
    self._assert_not_null(partNumber, 'partNumber is empty')
    self._assert_not_null(uploadId, 'uploadId is empty')

    if content is None:
        content = object

    chunkedMode = False
    readable = False
    notifier = None

    if isFile:
        file_path = util.safe_encode(content)
        if not os.path.exists(file_path):
            # Second attempt with the GB2312 form of the path before giving up.
            file_path = util.safe_trans_to_gb2312(file_path)
            if not os.path.exists(file_path):
                raise Exception('file [%s] does not exist' % file_path)

        file_size = util.to_long(os.path.getsize(file_path))
        offset = util.to_long(offset)
        if offset is None or not 0 <= offset < file_size:
            offset = 0
        partSize = util.to_long(partSize)
        if partSize is None or not 0 < partSize <= (file_size - offset):
            partSize = file_size - offset

        headers = {const.CONTENT_LENGTH_HEADER: util.to_string(partSize)}
        # An explicit md5 wins over computing one from the file slice.
        if md5:
            headers[const.CONTENT_MD5_HEADER] = md5
        elif isAttachMd5:
            digest = util.md5_file_encode_by_size_offset(file_path, partSize, offset, self.chunk_size)
            headers[const.CONTENT_MD5_HEADER] = util.base64_encode(digest)
        if sseHeader is not None:
            self.convertor._set_sse_header(sseHeader, headers, True)

        if partSize > 0 and progressCallback is not None:
            readable = True
            notifier = progress.ProgressNotifier(progressCallback, partSize)
        else:
            notifier = progress.NONE_NOTIFIER
        entity = util.get_file_entity_by_offset_partsize(file_path, offset, partSize, self.chunk_size, notifier)
    else:
        headers = {}
        is_stream = content is not None and hasattr(content, 'read') and callable(content.read)
        if is_stream:
            readable = True
            if md5:
                headers[const.CONTENT_MD5_HEADER] = md5
            if sseHeader is not None:
                self.convertor._set_sse_header(sseHeader, headers, True)

            if partSize is None:
                self.log_client.log(DEBUG, 'missing partSize when uploading a readable stream')
                chunkedMode = True
                if progressCallback is not None:
                    notifier = progress.ProgressNotifier(progressCallback, -1)
                else:
                    notifier = progress.NONE_NOTIFIER
                entity = util.get_readable_entity(content, self.chunk_size, notifier, autoClose)
            else:
                headers[const.CONTENT_LENGTH_HEADER] = util.to_string(partSize)
                totalCount = util.to_long(partSize)
                if totalCount > 0 and progressCallback is not None:
                    notifier = progress.ProgressNotifier(progressCallback, totalCount)
                else:
                    notifier = progress.NONE_NOTIFIER
                entity = util.get_readable_entity_by_totalcount(content, totalCount, self.chunk_size, notifier, autoClose)
        else:
            entity = '' if content is None else content
            if md5:
                headers[const.CONTENT_MD5_HEADER] = md5
            # Here a falsy sseHeader is skipped, not only None.
            if sseHeader:
                self.convertor._set_sse_header(sseHeader, headers, True)

    try:
        if notifier is not None:
            notifier.start()
        ret = self._make_put_request(bucketName, objectKey, pathArgs={'partNumber': partNumber, 'uploadId': uploadId},
                                     headers=headers, entity=entity, chunkedMode=chunkedMode, methodName='uploadPart', readable=readable)
    finally:
        if notifier is not None:
            notifier.end()
    return ret
@funcCache
def _uploadPartWithNotifier(self, bucketName, objectKey, partNumber, uploadId, content=None, isFile=False, partSize=None,
                            offset=0, sseHeader=None, isAttachMd5=False, md5=None, notifier=None):
    """Upload one part with a caller-supplied progress notifier.

    The notifier is only handed to the entity builders; this method neither
    starts nor ends it. With ``isFile`` an out-of-range ``offset`` falls back
    to 0 and an out-of-range ``partSize`` falls back to the rest of the file.

    Raises:
        Exception: when ``isFile`` is set and the file does not exist.
    """
    self._assert_not_null(partNumber, 'partNumber is empty')
    self._assert_not_null(uploadId, 'uploadId is empty')

    chunkedMode = False
    readable = False

    if isFile:
        file_path = util.safe_encode(content)
        if not os.path.exists(file_path):
            # Second attempt with the GB2312 form of the path before giving up.
            file_path = util.safe_trans_to_gb2312(file_path)
            if not os.path.exists(file_path):
                raise Exception('file [%s] does not exist' % file_path)

        file_size = util.to_long(os.path.getsize(file_path))
        offset = util.to_long(offset)
        if offset is None or not 0 <= offset < file_size:
            offset = 0
        partSize = util.to_long(partSize)
        if partSize is None or not 0 < partSize <= (file_size - offset):
            partSize = file_size - offset

        headers = {const.CONTENT_LENGTH_HEADER: util.to_string(partSize)}
        # An explicit md5 wins over computing one from the file slice.
        if md5:
            headers[const.CONTENT_MD5_HEADER] = md5
        elif isAttachMd5:
            digest = util.md5_file_encode_by_size_offset(file_path, partSize, offset, self.chunk_size)
            headers[const.CONTENT_MD5_HEADER] = util.base64_encode(digest)
        if sseHeader is not None:
            self.convertor._set_sse_header(sseHeader, headers, True)

        # Only a real notifier (not None, not a NoneNotifier) makes the request readable.
        readable = notifier is not None and not isinstance(notifier, progress.NoneNotifier)
        entity = util.get_file_entity_by_offset_partsize(file_path, offset, partSize, self.chunk_size, notifier)
    else:
        headers = {}
        is_stream = content is not None and hasattr(content, 'read') and callable(content.read)
        if is_stream:
            readable = True
            if md5:
                headers[const.CONTENT_MD5_HEADER] = md5
            if sseHeader is not None:
                self.convertor._set_sse_header(sseHeader, headers, True)

            if partSize is None:
                # Unknown length: switch the request to chunked mode.
                chunkedMode = True
                entity = util.get_readable_entity(content, self.chunk_size, notifier)
            else:
                headers[const.CONTENT_LENGTH_HEADER] = util.to_string(partSize)
                entity = util.get_readable_entity_by_totalcount(content, util.to_long(partSize), self.chunk_size, notifier)
        else:
            entity = '' if content is None else content
            if md5:
                headers[const.CONTENT_MD5_HEADER] = md5
            # Here a falsy sseHeader is skipped, not only None.
            if sseHeader:
                self.convertor._set_sse_header(sseHeader, headers, True)

    part_query = {'partNumber': partNumber, 'uploadId': uploadId}
    ret = self._make_put_request(bucketName, objectKey, pathArgs=part_query,
                                 headers=headers, entity=entity, chunkedMode=chunkedMode, methodName='uploadPart', readable=readable)
    return ret
@funcCache
def copyObject(self, sourceBucketName, sourceObjectKey, destBucketName, destObjectKey, metadata=None, headers=None, versionId=None):
    """Send the ``copyObject`` PUT request for the destination bucket and key.

    ``sourceBucketName`` is checked with ``_assert_not_null``.  Both object
    keys are passed through ``util.safe_encode`` and replaced by ``''`` when
    the encoded value is ``None``.  When ``headers`` is ``None`` a new
    ``CopyObjectHeader`` is used.  The remaining request arguments are
    produced by ``convertor.trans_copy_object``.

    Returns the result of ``_make_put_request``.
    """
    self._assert_not_null(sourceBucketName, 'sourceBucketName is empty')

    encoded_source_key = util.safe_encode(sourceObjectKey)
    source_key = '' if encoded_source_key is None else encoded_source_key
    encoded_dest_key = util.safe_encode(destObjectKey)
    dest_key = '' if encoded_dest_key is None else encoded_dest_key

    copy_headers = CopyObjectHeader() if headers is None else headers
    request_kwargs = self.convertor.trans_copy_object(
        metadata=metadata, headers=copy_headers, versionId=versionId,
        sourceBucketName=sourceBucketName, sourceObjectKey=source_key)
    return self._make_put_request(destBucketName, dest_key, methodName='copyObject', **request_kwargs)
@funcCache
def setObjectAcl(self, bucketName, objectKey, acl=None, versionId=None, aclControl=None):
    """Send the PUT request that sets an object's ACL.

    Exactly one of ``acl`` and ``aclControl`` has to be supplied.

    :param bucketName: bucket passed to ``_make_put_request``.
    :param objectKey: object key passed to ``_make_put_request``.
    :param acl: ACL model; ``None`` (the default) is treated as an empty ``ACL``.
    :param versionId: forwarded to ``convertor.trans_set_object_acl``.
    :param aclControl: forwarded to ``convertor.trans_set_object_acl``.
    :raises Exception: when both ``acl`` (non-empty) and ``aclControl`` are
        given, or when neither is given.
    :return: the result of ``_make_put_request``.
    """
    if acl is not None and len(acl) > 0 and aclControl is not None:
        raise Exception('Both acl and aclControl are set')
    if not acl and not aclControl:
        raise Exception('Both acl and aclControl are not set')
    if acl is None:
        # The default used to be a single ACL() created when the method was
        # defined and therefore shared by every call; build one per call so
        # no state can leak between requests.
        acl = ACL()
    request_kwargs = self.convertor.trans_set_object_acl(acl=acl, versionId=versionId, aclControl=aclControl)
    return self._make_put_request(bucketName, objectKey, **request_kwargs)
@funcCache
def getObjectAcl(self, bucketName, objectKey, versionId=None):
    """Send the ``getObjectAcl`` GET request for an object.

    The request always carries the ``acl`` path argument; a truthy
    ``versionId`` is added under ``const.VERSION_ID_PARAM`` after conversion
    with ``util.to_string``.  Returns the result of ``_make_get_request``.
    """
    query = dict(acl=None)
    if versionId:
        query[const.VERSION_ID_PARAM] = util.to_string(versionId)
    return self._make_get_request(bucketName, objectKey, methodName='getObjectAcl', pathArgs=query)
@funcCache
def deleteObject(self, bucketName, objectKey, versionId=None):
    """Send the ``deleteObject`` DELETE request for an object.

    A truthy ``versionId`` is sent under ``const.VERSION_ID_PARAM`` after
    conversion with ``util.to_string``; otherwise no path arguments are sent.
    Returns the result of ``_make_delete_request``.
    """
    if versionId:
        query = {const.VERSION_ID_PARAM: util.to_string(versionId)}
    else:
        query = {}
    return self._make_delete_request(bucketName, objectKey, methodName='deleteObject', pathArgs=query)
@funcCache
def deleteObjects(self, bucketName, deleteObjectsRequest):
    """Send the ``deleteObjects`` POST request for a bucket.

    ``deleteObjectsRequest`` is checked with ``_assert_not_null`` and turned
    into request arguments by ``convertor.trans_delete_objects``.  Returns the
    result of ``_make_post_request``.
    """
    self._assert_not_null(deleteObjectsRequest, 'deleteObjectsRequest is empty')
    request_kwargs = self.convertor.trans_delete_objects(deleteObjectsRequest=deleteObjectsRequest)
    return self._make_post_request(bucketName, methodName='deleteObjects', **request_kwargs)
@funcCache
def restoreObject(self, bucketName, objectKey, days, tier=None, versionId=None):
    """Send the POST request built by ``convertor.trans_restore_object``.

    ``days`` is checked with ``_assert_not_null``; ``days``, ``tier`` and
    ``versionId`` are handed to the convertor, whose result supplies the
    keyword arguments of ``_make_post_request``.  Returns that call's result.
    """
    self._assert_not_null(days, 'days is empty')
    request_kwargs = self.convertor.trans_restore_object(days=days, tier=tier, versionId=versionId)
    return self._make_post_request(bucketName, objectKey, **request_kwargs)
@funcCache
def initiateMultipartUpload(self, bucketName, objectKey, acl=None, storageClass=None,
                            metadata=None, websiteRedirectLocation=None, contentType=None, sseHeader=None, expires=None, extensionGrants=None):
    """Send the ``initiateMultipartUpload`` POST request for an object.

    The key is passed through ``util.safe_encode`` and replaced by ``''`` when
    the encoded value is ``None``.  When ``contentType`` is ``None`` it is
    looked up in ``const.MIME_TYPES`` by the lower-cased text after the last
    ``'.'`` of the key.  All other arguments are forwarded to
    ``convertor.trans_initiate_multipart_upload``.

    Returns the result of ``_make_post_request``.
    """
    encoded_key = util.safe_encode(objectKey)
    key = '' if encoded_key is None else encoded_key

    mime_type = contentType
    if mime_type is None:
        # NOTE: when the key has no '.', rfind() yields -1 and the slice is
        # the whole key, so the full key is used for the lookup.
        suffix = key[key.rfind('.') + 1:]
        mime_type = const.MIME_TYPES.get(suffix.lower())

    request_kwargs = self.convertor.trans_initiate_multipart_upload(
        acl=acl, storageClass=storageClass,
        metadata=metadata, websiteRedirectLocation=websiteRedirectLocation,
        contentType=mime_type, sseHeader=sseHeader, expires=expires, extensionGrants=extensionGrants)
    return self._make_post_request(bucketName, key, methodName='initiateMultipartUpload', **request_kwargs)
@funcCache
def copyPart(self, bucketName, objectKey, partNumber, uploadId, copySource, copySourceRange=None, destSseHeader=None, sourceSseHeader=None):
    """Send the ``copyPart`` PUT request for one part of a multipart upload.

    ``partNumber``, ``uploadId`` and ``copySource`` are each checked with
    ``_assert_not_null``; the request arguments are produced by
    ``convertor.trans_copy_part``.  Returns the result of
    ``_make_put_request``.
    """
    required = (
        (partNumber, 'partNumber is empty'),
        (uploadId, 'uploadId is empty'),
        (copySource, 'copySource is empty'),
    )
    for value, message in required:
        self._assert_not_null(value, message)

    request_kwargs = self.convertor.trans_copy_part(
        partNumber=partNumber, uploadId=uploadId, copySource=copySource,
        copySourceRange=copySourceRange, destSseHeader=destSseHeader, sourceSseHeader=sourceSseHeader)
    return self._make_put_request(bucketName, objectKey, methodName='copyPart', **request_kwargs)
@funcCache
def completeMultipartUpload(self, bucketName, objectKey, uploadId, completeMultipartUploadRequest):
    """Send the ``completeMultipartUpload`` POST request.

    ``uploadId`` and ``completeMultipartUploadRequest`` are checked with
    ``_assert_not_null``.  The request entity comes from
    ``convertor.trans_complete_multipart_upload_request``.  After the request,
    ``_generate_object_url`` is called with the result, bucket and key.

    Returns the result of ``_make_post_request``.
    """
    self._assert_not_null(uploadId, 'uploadId is empty')
    self._assert_not_null(completeMultipartUploadRequest, 'completeMultipartUploadRequest is empty')

    body = self.convertor.trans_complete_multipart_upload_request(completeMultipartUploadRequest)
    result = self._make_post_request(
        bucketName, objectKey,
        pathArgs={'uploadId': uploadId},
        entity=body,
        methodName='completeMultipartUpload')
    self._generate_object_url(result, bucketName, objectKey)
    return result
@funcCache
def abortMultipartUpload(self, bucketName, objectKey, uploadId):
    """Send a DELETE request carrying ``uploadId`` as a path argument.

    ``uploadId`` is checked with ``_assert_not_null`` first.  Returns the
    result of ``_make_delete_request``.
    """
    self._assert_not_null(uploadId, 'uploadId is empty')
    query = {'uploadId': uploadId}
    return self._make_delete_request(bucketName, objectKey, pathArgs=query)
@funcCache
def listParts(self, bucketName, objectKey, uploadId, maxParts=None, partNumberMarker=None):
    """Send the ``listParts`` GET request for a multipart upload.

    ``uploadId`` is checked with ``_assert_not_null`` and always sent as a
    path argument.  ``maxParts`` and ``partNumberMarker`` are sent as
    ``max-parts`` and ``part-number-marker`` only when they are not ``None``.
    Returns the result of ``_make_get_request``.
    """
    self._assert_not_null(uploadId, 'uploadId is empty')

    query = {'uploadId': uploadId}
    optional = (('max-parts', maxParts), ('part-number-marker', partNumberMarker))
    for name, value in optional:
        if value is not None:
            query[name] = value

    return self._make_get_request(bucketName, objectKey, methodName='listParts', pathArgs=query)
@funcCache
def getBucketStoragePolicy(self, bucketName):
    """Send the ``getBucketStoragePolicy`` GET request for a bucket.

    The request arguments come from
    ``convertor.trans_get_bucket_storage_policy``.  Returns the result of
    ``_make_get_request``.
    """
    request_kwargs = self.convertor.trans_get_bucket_storage_policy()
    return self._make_get_request(bucketName, methodName='getBucketStoragePolicy', **request_kwargs)
@funcCache
def setBucketStoragePolicy(self, bucketName, storageClass):
    """Send the PUT request built by ``convertor.trans_set_bucket_storage_policy``.

    ``storageClass`` is checked with ``_assert_not_null`` before it is handed
    to the convertor.  Returns the result of ``_make_put_request``.
    """
    self._assert_not_null(storageClass, 'storageClass is empty')
    request_kwargs = self.convertor.trans_set_bucket_storage_policy(storageClass=storageClass)
    return self._make_put_request(bucketName, **request_kwargs)
@funcCache
def setBucketEncryption(self, bucketName, encryption, key=None):
    """Send a PUT request with the ``encryption`` path argument.

    ``encryption`` is checked with ``_assert_not_null``; the request entity is
    produced by ``convertor.trans_encryption`` from ``encryption`` and
    ``key``.  Returns the result of ``_make_put_request``.
    """
    self._assert_not_null(encryption, 'encryption is empty')
    body = self.convertor.trans_encryption(encryption=encryption, key=key)
    return self._make_put_request(bucketName, entity=body, pathArgs={'encryption': None})
@funcCache
def getBucketEncryption(self, bucketName):
    """Send the ``getBucketEncryption`` GET request with the ``encryption`` path argument.

    Returns the result of ``_make_get_request``.
    """
    query = {'encryption': None}
    return self._make_get_request(bucketName, pathArgs=query, methodName='getBucketEncryption')
@funcCache
def deleteBucketEncryption(self, bucketName):
    """Send a DELETE request with the ``encryption`` path argument.

    Returns the result of ``_make_delete_request``.
    """
    query = {'encryption': None}
    return self._make_delete_request(bucketName, pathArgs=query)
@funcCache
def setBucketReplication(self, bucketName, replication):
    """Send the PUT request built by ``convertor.trans_set_bucket_replication``.

    ``replication`` is checked with ``_assert_not_null`` before it is handed
    to the convertor.  Returns the result of ``_make_put_request``.
    """
    self._assert_not_null(replication, 'replication is empty')
    request_kwargs = self.convertor.trans_set_bucket_replication(replication=replication)
    return self._make_put_request(bucketName, **request_kwargs)
@funcCache
def getBucketReplication(self, bucketName):
    """Send the ``getBucketReplication`` GET request with the ``replication`` path argument.

    Returns the result of ``_make_get_request``.
    """
    query = {'replication': None}
    return self._make_get_request(bucketName, methodName='getBucketReplication', pathArgs=query)
@funcCache
def deleteBucketReplication(self, bucketName):
    """Send a DELETE request with the ``replication`` path argument.

    Returns the result of ``_make_delete_request``.
    """
    query = {'replication': None}
    return self._make_delete_request(bucketName, pathArgs=query)
@funcCache
def uploadFile(self, bucketName, objectKey, uploadFile, partSize=9 * 1024 * 1024,
               taskNum=1, enableCheckpoint=False, checkpointFile=None,
               checkSum=False, metadata=None, progressCallback=None, headers=None):
    """Normalise the arguments and delegate to ``_resumer_upload``.

    * ``bucketName``, ``objectKey`` and ``uploadFile`` are checked with
      ``_assert_not_null``.
    * With ``enableCheckpoint`` set and no ``checkpointFile`` given, the
      checkpoint path defaults to ``uploadFile + '.upload_record'``.
    * ``partSize`` below ``const.DEFAULT_MINIMUM_SIZE`` or above
      ``const.DEFAULT_MAXIMUM_SIZE`` is replaced by that bound; otherwise it
      is converted with ``util.to_int``.
    * ``taskNum`` of zero or less becomes ``1``; any other value is rounded
      up to an ``int``.

    Returns the result of ``_resumer_upload``.
    """
    self.log_client.log(INFO, 'enter resume upload file...')
    required = (
        (bucketName, 'bucketName is empty'),
        (objectKey, 'objectKey is empty'),
        (uploadFile, 'uploadFile is empty'),
    )
    for value, message in required:
        self._assert_not_null(value, message)

    record_path = checkpointFile
    if enableCheckpoint and record_path is None:
        record_path = uploadFile + '.upload_record'

    # Clamp the part size into the supported range.
    if partSize < const.DEFAULT_MINIMUM_SIZE:
        part_bytes = const.DEFAULT_MINIMUM_SIZE
    else:
        part_bytes = (const.DEFAULT_MAXIMUM_SIZE
                      if partSize > const.DEFAULT_MAXIMUM_SIZE
                      else util.to_int(partSize))

    workers = 1 if taskNum <= 0 else int(math.ceil(taskNum))

    return _resumer_upload(bucketName, objectKey, uploadFile, part_bytes, workers, enableCheckpoint,
                           record_path, checkSum, metadata, progressCallback, self, headers)
@funcCache
def _downloadFileWithNotifier(self, bucketName, objectKey, downloadFile=None, partSize=5 * 1024 * 1024, taskNum=1, enableCheckpoint=False,
                              checkpointFile=None, header=None, versionId=None, progressCallback=None, imageProcess=None, notifier=progress.NONE_NOTIFIER):
    """Normalise the arguments and delegate to ``_resumer_download``.

    * ``bucketName`` and ``objectKey`` are checked with ``_assert_not_null``.
    * ``header`` defaults to a new ``GetObjectHeader``.
    * ``downloadFile`` defaults to ``objectKey``.
    * With ``enableCheckpoint`` set and no ``checkpointFile`` given, the
      checkpoint path defaults to ``downloadFile + '.download_record'``.
    * ``partSize`` below ``const.DEFAULT_MINIMUM_SIZE`` or above
      ``const.DEFAULT_MAXIMUM_SIZE`` is replaced by that bound; otherwise it
      is converted with ``util.to_int``.
    * ``taskNum`` of zero or less becomes ``1``; any other value is rounded
      up to an ``int``.

    Returns the result of ``_resumer_download``.
    """
    self.log_client.log(INFO, 'enter resume download...')
    for value, message in ((bucketName, 'bucketName is empty'), (objectKey, 'objectKey is empty')):
        self._assert_not_null(value, message)

    get_header = GetObjectHeader() if header is None else header
    target_path = objectKey if downloadFile is None else downloadFile

    record_path = checkpointFile
    if enableCheckpoint and record_path is None:
        record_path = target_path + '.download_record'

    # Clamp the part size into the supported range.
    if partSize < const.DEFAULT_MINIMUM_SIZE:
        part_bytes = const.DEFAULT_MINIMUM_SIZE
    else:
        part_bytes = (const.DEFAULT_MAXIMUM_SIZE
                      if partSize > const.DEFAULT_MAXIMUM_SIZE
                      else util.to_int(partSize))

    workers = 1 if taskNum <= 0 else int(math.ceil(taskNum))

    return _resumer_download(bucketName, objectKey, target_path, part_bytes, workers, enableCheckpoint,
                             record_path, get_header, versionId, progressCallback, self,
                             imageProcess, notifier)
def downloadFile(self, bucketName, objectKey, downloadFile=None, partSize=5 * 1024 * 1024, taskNum=1, enableCheckpoint=False,
                 checkpointFile=None, header=None, versionId=None, progressCallback=None, imageProcess=None):
    """Forward every argument, positionally and unchanged, to ``_downloadFileWithNotifier``.

    No ``notifier`` is passed, so that method's own default applies.
    Returns whatever ``_downloadFileWithNotifier`` returns.
    """
    forwarded = (bucketName, objectKey, downloadFile, partSize, taskNum, enableCheckpoint,
                 checkpointFile, header, versionId, progressCallback, imageProcess)
    return self._downloadFileWithNotifier(*forwarded)
def downloadFiles(self, bucketName, prefix, downloadFolder=None, taskNum=const.DEFAULT_TASK_NUM, taskQueueSize=const.DEFAULT_TASK_QUEUE_SIZE,
                  headers=None, imageProcess=None, interval=const.DEFAULT_BYTE_INTTERVAL, taskCallback=None, progressCallback=None,
                  threshold=const.DEFAULT_MAXIMUM_SIZE, partSize=5*1024*1024, subTaskNum=1, enableCheckpoint=False, checkpointFile=None):
    """Delegate a prefix-based batch download to ``_download_files``.

    All arguments are forwarded positionally and unchanged, except that
    ``headers=None`` (the default) is replaced by a new ``GetObjectHeader``.

    :param headers: request headers model; ``None`` means "use a fresh
        ``GetObjectHeader``".
    :return: the result of ``_download_files``.
    """
    if headers is None:
        # The default used to be one GetObjectHeader() created when the method
        # was defined and shared by every call (and every client); create it
        # per call so that state cannot leak between downloads.
        headers = GetObjectHeader()
    return _download_files(self, bucketName, prefix, downloadFolder, taskNum, taskQueueSize, headers, imageProcess,
                           interval, taskCallback, progressCallback, threshold, partSize, subTaskNum, enableCheckpoint, checkpointFile)
# Expose the bucket versioning/lifecycle/website/logging methods under
# additional ``...Configuration`` names; each alias is the same function object.

# Versioning
ObsClient.setBucketVersioningConfiguration = ObsClient.setBucketVersioning
ObsClient.getBucketVersioningConfiguration = ObsClient.getBucketVersioning

# Lifecycle
ObsClient.setBucketLifecycleConfiguration = ObsClient.setBucketLifecycle
ObsClient.getBucketLifecycleConfiguration = ObsClient.getBucketLifecycle
ObsClient.deleteBucketLifecycleConfiguration = ObsClient.deleteBucketLifecycle

# Website
ObsClient.setBucketWebsiteConfiguration = ObsClient.setBucketWebsite
ObsClient.getBucketWebsiteConfiguration = ObsClient.getBucketWebsite
ObsClient.deleteBucketWebsiteConfiguration = ObsClient.deleteBucketWebsite

# Logging
ObsClient.setBucketLoggingConfiguration = ObsClient.setBucketLogging
ObsClient.getBucketLoggingConfiguration = ObsClient.getBucketLogging
标签:存储,return,python,self,None,headers,bucketName,._,SDK
来源: https://blog.csdn.net/qq_42698918/article/details/119301928