File: /volume1/@appstore/HyperBackup/addon/synocloud_swift/python/swift_agent.py
#!/usr/bin/env python
#-*-coding: utf-8 -*-
import os, sys, json
from struct import pack, unpack
from time import strptime, mktime, time
import ssl, httplib, socket, urllib
import threading
# Debug switch read by log_debug(): when true, log_debug() prints its
# arguments to stderr; when false it does nothing.
debug = False
def log_debug(*args):
    """Print the space-joined arguments to stderr when ``debug`` is enabled.

    :param args: string fragments; they are joined with single spaces.
    """
    if not debug:
        return
    print >> sys.stderr, ' '.join(args)
def log_error(*args):
    """Record an error message built from the space-joined arguments.

    When ``sys.platform`` is ``"win32"`` the message is appended to
    ``HyperBackupExplorer.log`` (relative path), prefixed with the process
    id; otherwise it is sent to syslog with priority ``LOG_ERR``.

    :param args: string fragments; they are joined with single spaces.
    """
    if sys.platform != "win32":
        # syslog is only imported on the non-Windows path.
        from syslog import syslog, LOG_ERR
        syslog(LOG_ERR, ' '.join(args))
        return
    with open('HyperBackupExplorer.log', 'a+') as log_file:
        log_file.write('python[%d]: ' % os.getpid())
        log_file.write(' '.join(args))
        log_file.write('\n')
def _is_scoket_exception(e):
if type(e) == socket.gaierror or type(e) == socket.error or type(e) == socket.herror or type(e) == socket.timeout:
return True
return False
def _convert_socket_error_code(message):
error_code = -1
if -1 != message.find("[Errno -2] Name or service not known"):
error_code = -2
elif -1 != message.find("[Errno -3] Temporary failure in name resolution"):
error_code = -2
elif -1 != message.find("[Errno -5] No address associated with hostname"):
error_code = -2
elif -1 != message.find("[Errno 110] Connection timed out"):
error_code = 408
elif -1 != message.find("[Errno 101] Network is unreachable"):
error_code = -4
elif -1 != message.find("[Errno 111] Connection refused"):
error_code = -4
elif -1 != message.find("[Errno 113] No route to host"):
error_code = -4
elif -1 != message.find("[Errno 104] Connection reset by peer"):
error_code = -4
elif -1 != message.find("[Errno 32] Broken pipe"):
error_code = 408
return error_code
def _is_ssl_timeout(message):
#ssl.SSLErr_timeouteption log
if -1 != message.find("timed out"):
# The read operation timed out
# The write operation timed out
# The handshake operation timed out
return True
return False
def _convert_socket_exception(e):
error_code = -1
error_msg = 'Unknown'
error_cls = 'Unknown'
try:
error_cls = type(e).__name__
if hasattr(e, '__module__') and e.__module__:
error_cls = e.__module__ + '.' + error_cls
if type(e) == socket.gaierror:
if hasattr(e, 'strerror') and e.strerror:
error_msg = e.strerror
if e.errno == -2:
# socket.gaierror: [Errno -2] Name or service not known
# 1. dns not set
# 2. set to a wrong dns
error_code = -2
elif e.errno == -3:
# socket.gaierror: [Errno -3] Temporary failure in name resolution
# man gai_strerror, and find EAI_AGAIN:
# The name server returned a temporary failure indication. Try again later.
error_code = -2
elif e.errno == -5:
# socket.gaierror: [Errno -5] No address associated with hostname
# man gai_strerror, and find EAI_NODATA:
# The specified network host exists, but does not have any network addresses defined.
error_code = -2
elif e.errno < 100 and e.errno > 0:
error_code = e.errno
elif type(e) == socket.error or type(e) == socket.herror:
if hasattr(e, 'strerror') and e.strerror:
error_msg = e.strerror
elif hasattr(e, 'args'):
error_msg = str(e.args)
if -1 != error_msg.find('Tunnel connection failed'):
error_code = -4
if e.errno == socket.errno.ETIMEDOUT:
# socket.error: [Errno 110] Connection timed out
error_code = 408
elif e.errno == socket.errno.ENETUNREACH:
# socket.error: [Errno 101] Network is unreachable
error_code = -4
elif e.errno == socket.errno.ECONNREFUSED:
# socket.error: [Errno 111] Connection refused
error_code = -4
elif e.errno == socket.errno.EHOSTUNREACH:
# socket.error: [Errno 113] No route to host
error_code = -4
elif e.errno == socket.errno.ECONNRESET:
# socket.error: [Errno 104] Connection reset by peer
error_code = -4
elif e.errno == socket.errno.EPIPE:
# socket.error: [Errno 32] Broken pipe
error_code = 408
elif e.errno < 100 and e.errno > 0:
error_code = e.errno
#log_error("get network error %d:%s" % (e.errno, error_msg))
elif type(e) == socket.timeout:
error_msg = 'timed out'
error_code = 408
else:
log_error("BUG: exception [%s], type [%s]" % (str(e), type(e)))
#log_error("error %s:%d:%s" % (error_cls, error_code, str(error_msg)))
if -1 == error_code:
if hasattr(e, "args"):
log_error("error args: [%s]" % (str(e.args)))
if hasattr(e, "errno"):
log_error("error errno: [%s]" % (str(e.errno)))
if hasattr(e, "strerror"):
log_error("error strerror: [%s]" % (str(e.strerror)))
except Exception as ee:
log_error("parse socket exception failed: [%s]" % (str(ee)))
pass
return {
'success': False,
'error_class': error_cls,
'error_message': error_msg,
'error_code': error_code
}
def _convert_exception(e):
    """Translate an exception into the error dictionary sent back to the caller.

    The exception is matched by exact type against swiftclient, requests,
    urllib3, httplib, ssl and socket exception classes and mapped onto an
    ``error_code``.  Socket exceptions (direct, or wrapped inside a
    ``requests`` ``ConnectionError``) are delegated to
    ``_convert_socket_exception`` and that function's result is returned
    as-is.  Any failure while inspecting *e* is logged and the values
    gathered so far are returned.

    :param e: the exception to translate.
    :returns: dict with ``success`` (always ``False``), ``error_class``,
        ``error_message``, ``error_response``, ``error_http_path``,
        ``error_trans_id`` and ``error_code`` (``-1`` when nothing matched).
    """
    import swiftclient
    import requests
    #log_error("exception: %s" % (str(e)))
    error_code = -1
    error_msg = 'Unknown'
    error_resp = 'Unknown'
    error_http_path = ''
    error_trans_id = ''
    error_cls = 'Unknown'
    try:
        # Fully qualified class name, e.g. "module.ClassName".
        error_cls = type(e).__name__
        if hasattr(e, '__module__') and e.__module__:
            error_cls = e.__module__ + '.' + error_cls
        # Keep only the first line of the exception message.
        try:
            if type(e.message) is unicode:
                error_msg = repr(e.message).split('\\n')[0]
            else:
                error_msg = str(e.message).split('\n')[0]
            log_debug("\033[31mexception: \033[0m", error_msg)
        except:
            log_debug("\033[31mexception: \033[0m", "could not parse error msg")
        if type(e) == swiftclient.ClientException:
            # Copy the HTTP details carried by the swiftclient exception.
            if hasattr(e, "http_response_header") and e.http_response_header \
                    and 'x-trans-id' in e.http_response_header:
                error_trans_id = e.http_response_header.get('x-trans-id')
            if hasattr(e, "http_response_content"):
                error_resp = e.http_response_content
            if hasattr(e, "http_path"):
                error_http_path = e.http_path
            if hasattr(e, "http_status"):
                # A 400 with this exact message is reported as 401.
                if e.http_status == 400 and e.message == "Refresh access token failed":
                    error_code = 401
                elif e.http_status >= 100 and e.http_status < 700:
                    error_code = e.http_status
        elif type(e) == TypeError:
            # this exception will raise when base64 decode for auth failed
            if error_msg == "Incorrect padding":
                error_code = 401
        elif type(e) == UnicodeDecodeError:
            error_code = -5
        elif type(e) == requests.exceptions.ConnectionError:
            # Default for connection errors; refined below from the wrapped
            # exception found in e.args[0].
            error_code = -4
            if type(e.args[0]) == requests.packages.urllib3.exceptions.ProtocolError:
                if "BadStatusLine" in str(e.args[0]):
                    error_code = -4
                elif _is_scoket_exception(e.args[0].args[1]):
                    return _convert_socket_exception(e.args[0].args[1])
                else:
                    log_error("ProtocolError: exception [%s], type [%s]" % (str(e.args[0].args[1]), type(e.args[0].args[1])))
            elif type(e.args[0]) == requests.packages.urllib3.exceptions.MaxRetryError:
                if hasattr(e.args[0], 'reason') and e.args[0].reason:
                    if type(e.args[0].reason) == requests.packages.urllib3.exceptions.NewConnectionError:
                        error_code = _convert_socket_error_code(str(e.args[0].reason))
                    else:
                        log_error("MaxRetryError: exception [%s], type [%s]" % (str(e.args[0].reason), type(e.args[0].reason)))
            elif _is_scoket_exception(e.args[0]):
                return _convert_socket_exception(e.args[0])
            elif type(e.args[0]) == ssl.SSLError or type(e.args[0]) == requests.packages.urllib3.exceptions.SSLError:
                if hasattr(e.args[0], 'message') and e.args[0].message:
                    error_msg = e.args[0].message
                if _is_ssl_timeout(str(error_msg)):
                    error_code = 408
                else:
                    log_error("SSLError: exception [%s], type [%s]" % (str(error_msg), type(error_msg)))
        elif type(e) == requests.exceptions.SSLError:
            error_code = -4
            # EOF occurred in violation of protocol
            # SSL: DECRYPTION_FAILED_OR_BAD_RECORD_MAC
            # SSL: SSLV3_ALERT_BAD_RECORD_MAC
            # SSL: SSLV3_ALERT_ILLEGAL_PARAMETER
            # SSL: TLSV1_ALERT_DECRYPT_ERROR
            # SSL: TLSV1_ALERT_DECODE_ERROR
            # SSL: CERTIFICATE_VERIFY_FAILED
            if _is_ssl_timeout(str(e)):
                error_code = 408
        elif type(e) == requests.packages.urllib3.exceptions.NewConnectionError:
            error_code = _convert_socket_error_code(str(e))
        elif type(e) == requests.packages.urllib3.exceptions.ConnectTimeoutError:
            error_code = 408
        elif type(e) == requests.packages.urllib3.exceptions.ReadTimeoutError:
            error_code = 408
        elif type(e) == requests.exceptions.ConnectTimeout:
            error_code = 408
        elif type(e) == requests.exceptions.ReadTimeout:
            error_code = 408
        elif type(e) == httplib.ResponseNotReady:
            error_code = -4
        elif type(e) == httplib.BadStatusLine:
            error_code = -4
        elif type(e) == httplib.IncompleteRead:
            error_code = -4
        elif type(e) == ssl.SSLError:
            error_code = -4
            if hasattr(e, "strerror") and e.strerror:
                # SSL: DECRYPTION_FAILED_OR_BAD_RECORD_MAC
                # SSL: SSLV3_ALERT_BAD_RECORD_MAC
                # SSL: CERTIFICATE_VERIFY_FAILED
                error_msg = e.strerror
            if _is_ssl_timeout(str(error_msg)):
                error_code = 408
        elif type(e) == ssl.SSLEOFError:
            # ssl.SSLEOFError: [Errno 8] EOF occurred in violation of protocol
            if hasattr(e, 'strerror') and e.strerror:
                error_msg = e.strerror
            error_code = -4
        elif type(e) == IOError:
            if hasattr(e, 'strerror') and e.strerror:
                error_msg = e.strerror
            error_code = _convert_socket_error_code(str(e))
        elif _is_scoket_exception(e):
            return _convert_socket_exception(e)
        else:
            # Unrecognised exception type: log it for later diagnosis.
            log_error("exception [%s]" % str(e))
            log_error("type [%s]" % type(e))
        #log_error("error %s:%d:%s:%s:%s:%s" % (error_cls, error_code, str(error_msg), str(error_resp), str(error_http_path), str(error_trans_id)))
        if -1 == error_code:
            # Nothing matched: dump what the exception carries for diagnosis.
            if hasattr(e, "args"):
                log_error("error args: [%s]" % (str(e.args)))
            if hasattr(e, "errno"):
                log_error("error errno: [%s]" % (str(e.errno)))
            if hasattr(e, "strerror"):
                log_error("error strerror: [%s]" % (str(e.strerror)))
    except Exception as ee:
        # Inspecting the exception itself failed; report what we have.
        log_error("parse exception failed: [%s]" % (str(ee)))
        pass
    return {
        'success': False,
        'error_class': error_cls,
        'error_message': error_msg,
        'error_response': error_resp,
        'error_http_path': error_http_path,
        'error_trans_id': error_trans_id,
        'error_code': error_code
    }
def _convert_GMT_time_str(val):
origin_tz = None
if os.environ.get('TZ'):
origin_tz = os.environ.get('TZ')
os.environ['TZ'] = 'GMT'
timestamp = int(mktime(strptime(val, '%a, %d %b %Y %H:%M:%S %Z')))
if origin_tz:
os.environ['TZ'] = origin_tz
return timestamp
def _convert_UTC_time_str(val):
origin_tz = None
if os.environ.get('TZ'):
origin_tz = os.environ.get('TZ')
os.environ['TZ'] = 'UTC'
time_val, sec_xtime = str(val).split('.')
timestamp = int(mktime(strptime(time_val, '%Y-%m-%dT%H:%M:%S')))
sec_xtime = sec_xtime.rstrip('Z')
if int(sec_xtime) > 0:
timestamp += 1
if origin_tz:
os.environ['TZ'] = origin_tz
return timestamp
def _convert_properties(properties):
    """Build the agent's property dict from one container-listing entry.

    :param properties: mapping with ``last_modified``, ``hash``,
        ``content_type`` and ``bytes`` keys (missing keys yield ``None``
        for everything except ``last_modified``, which must be parseable).
    :returns: dict with ``LastModified``, ``ETag``, ``ContentType`` and
        ``ContentLength``.
    """
    # e.g. 2015-10-01T02:52:24.169600 / 1443667944.169600 -> 1443667945
    modified_at = _convert_UTC_time_str(properties.get('last_modified'))
    return {
        'LastModified': modified_at,
        'ETag': properties.get('hash'),
        'ContentType': properties.get('content_type'),
        'ContentLength': properties.get('bytes')
    }
def _convert_properties_by_dict(properties):
    """Build the agent's property dict from a mapping of response headers.

    :param properties: mapping with ``last-modified``, ``etag``,
        ``content-type`` and ``content-length`` keys.
    :returns: dict with ``LastModified``, ``ETag``, ``ContentType`` and
        ``ContentLength``.
    """
    modified_at = _convert_GMT_time_str(properties.get('last-modified'))
    return {
        'LastModified': modified_at,
        'ETag': properties.get('etag'),
        'ContentType': properties.get('content-type'),
        'ContentLength': properties.get('content-length')
    }
def _convert_swift_url(catalog):
# [{u'endpoints': [{u'url': u'https://s2swift.eu.c2.synology.com:443/v1/AUTH_xxx'}],
# u'name': u'swift'}]
swift_url = ''
for item in catalog:
if 'swift' != item.get('name'):
continue
endpoints = item.get('endpoints')
for endpoint in endpoints:
swift_url = endpoint.get('url')
break
if '' != swift_url:
break
return swift_url
class SimpleIO(object):
    """Length-prefixed message framing over ``sys.stdin`` / ``sys.stdout``.

    Each string is sent as a native-order 4-byte integer (``struct`` format
    ``'i'``) holding the payload length, followed by the payload itself.
    JSON values are sent as strings holding their ``json.dumps`` form.
    """

    def _read_exact(self, size):
        """Read exactly *size* bytes from stdin.

        :raises StopIteration: if stdin reaches end-of-file first.  The
            same signal is used for EOF in ``read_int``; without it a
            truncated payload would make the read loop spin forever.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = sys.stdin.read(remaining)
            if 0 == len(chunk):
                raise StopIteration
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def read_int(self):
        """Read one 4-byte integer.

        :raises StopIteration: on end-of-file.
        """
        return unpack('i', self._read_exact(4))[0]

    def read_string(self):
        """Read one length-prefixed string.

        :returns: the payload, or ``''`` for a zero-length message.
        :raises StopIteration: on end-of-file, including EOF in the middle
            of the payload.
        :raises SystemError: if the length prefix is negative.
        """
        n = self.read_int()
        if 0 == n:
            return ''
        data = self._read_exact(n)
        if data:
            return data
        # Only reachable with a negative length prefix.
        raise SystemError

    def read_json(self):
        """Read one length-prefixed JSON document.

        :returns: the decoded value, or ``None`` for an empty message.
        """
        json_str = self.read_string()
        if json_str:
            return json.loads(json_str)
        return None

    def write_int(self, val):
        """Write *val* as a 4-byte integer and flush stdout."""
        sys.stdout.write(pack('i', val))
        sys.stdout.flush()

    def write_string(self, val):
        """Write *val* prefixed with its length and flush stdout."""
        self.write_int(len(val))
        sys.stdout.write(val)
        sys.stdout.flush()

    def write_json(self, val):
        """Serialise *val* with ``json.dumps`` and write it as a string."""
        self.write_string(json.dumps(val))

    def write_exception(self, e):
        """Write the error dictionary produced by ``_convert_exception(e)``."""
        self.write_json(_convert_exception(e))
class FileObjectWithProgress(object):
    """
    File object wrapper that reports cumulative progress on every read/write.

    After each ``read`` or ``write`` a progress dictionary is sent through
    ``io.write_json`` (or printed when no ``io`` was given).
    """

    def __init__(self, io):
        """
        Create a wrapper with no file attached yet.

        :param io: object with a ``write_json`` method that receives the
            progress dictionaries, or a false value to print them instead
        """
        self._io = io
        self._file_obj = None
        self._prev_process_size = 0

    def set_file_obj(self, file_obj):
        """
        Attach the file object to wrap and reset the byte counter.

        :param file_obj: the file object to wrap
        """
        self._file_obj = file_obj
        self._prev_process_size = 0

    def _report(self, counter_name):
        """Emit one progress dictionary with the running total under *counter_name*."""
        res = {
            'success': True,
            'complete': False,
            counter_name: self._prev_process_size
        }
        if self._io:
            self._io.write_json(res)
        else:
            print(res)

    def read(self, length=None):
        """
        Read from the wrapped file and report the total as ``uploaded``.

        :param length: maximum number of bytes to read; ``None`` reads to
            the end of the file
        :returns: the data read
        """
        if length is None:
            # Call read() without an argument: not every file type accepts
            # an explicit None as the size.
            read_data = self._file_obj.read()
        else:
            read_data = self._file_obj.read(length)
        self._prev_process_size += len(read_data)
        self._report('uploaded')
        return read_data

    def write(self, write_data):
        """
        Write to the wrapped file and report the total as ``downloaded``.

        :param write_data: the data to write
        :returns: ``None``
        """
        self._file_obj.write(write_data)
        self._prev_process_size += len(write_data)
        self._report('downloaded')
        return None

    def tell(self):
        """Return the wrapped file's current position."""
        return self._file_obj.tell()

    def seek(self, offset, whence=os.SEEK_SET):
        """Move the wrapped file's position; the byte counter is not changed."""
        self._file_obj.seek(offset, whence)
class SynoCloudSwiftApi(object):
    """Request handlers built on a ``swiftclient`` connection and the backup HTTP API.

    Most public methods take the decoded request dictionary (``in_json``)
    and return a JSON-serialisable dictionary carrying a ``success`` flag.
    Configuration is read from environment variables in the constructor.
    """

    def __init__(self, io=None):
        """Create the Swift connection from environment variables.

        Reads ``SYNOCLOUD_AUTH_URL`` (required), ``SYNOCLOUD_ACCESS_KEY``,
        ``SYNOCLOUD_SECRET_KEY``, ``SYNOCLOUD_RETRIES``,
        ``SYNOCLOUD_TENANT_ID``, ``SYNOCLOUD_PREAUTHURL``,
        ``SYNOCLOUD_PREAUTHTOKEN``, ``SYNO_USER_AGENT``,
        ``OPENSTACK_INSECURE`` and ``MAX_SEND_SPEED``, then removes the
        ``SYNOCLOUD_*`` and ``OPENSTACK_INSECURE`` entries from the
        environment.

        :param io: optional object with a ``write_json`` method used for
            progress messages; when omitted they are printed instead.
        :raises KeyError: if ``SYNOCLOUD_AUTH_URL`` is not set.
        """
        import swiftclient
        self._file_obj_progress = FileObjectWithProgress(io)
        self._io = io
        self._syno_socket_timeout = 600
        auth_url = os.environ['SYNOCLOUD_AUTH_URL']
        access_key = os.environ.get('SYNOCLOUD_ACCESS_KEY')
        secret_key = os.environ.get('SYNOCLOUD_SECRET_KEY')
        retries = int(os.environ.get('SYNOCLOUD_RETRIES'))
        tenant_id = os.environ.get('SYNOCLOUD_TENANT_ID')
        swift_url = os.environ.get('SYNOCLOUD_PREAUTHURL')
        scoped_token = os.environ.get('SYNOCLOUD_PREAUTHTOKEN')
        user_agent = os.environ.get('SYNO_USER_AGENT')
        # NOTE(review): any non-empty value (even "0" or "false") enables
        # insecure mode - confirm this is what the launcher expects.
        insecure = bool(os.environ.get('OPENSTACK_INSECURE'))
        max_send_speed = os.environ.get('MAX_SEND_SPEED')  # Bps
        #log_error("auth_url: %s" % str(auth_url))
        self._lock = threading.Lock()
        self._throttler = None
        if max_send_speed:
            self._throttler = swiftclient.Throttler(int(max_send_speed) + 1, self._lock)  # Bps
            self._throttler.activate()
        self._auth_url = auth_url
        self._tenant_id = tenant_id
        self._scoped_token = scoped_token
        self._user_agent = user_agent
        self._insecure = insecure
        # overwrite swiftclient's get_auth
        swiftclient.client.get_auth = self.get_auth
        self._swift = swiftclient.Connection(
            authurl=auth_url,
            user=access_key,
            key=secret_key,
            retries=retries,
            preauthurl=swift_url,
            preauthtoken=scoped_token,
            auth_version='3.0',
            insecure=insecure,
            os_options={
                'tenant_id': tenant_id
            },
            timeout=self._syno_socket_timeout)
        self._service_options = {
            'auth_version': '3',
            'insecure': insecure,
            'retries': retries,
            'os_auth_url': auth_url,
            'os_username': access_key,
            'os_password': secret_key,
            'os_tenant_id': tenant_id,
            'os_storage_url': swift_url,
            'os_auth_token': scoped_token
        }
        self._swift_service = None
        self._is_set_upload_thread_num = False
        self._upload_thread_num = 0
        # could not delete SYNO_USER_AGENT env, it used in other place (swiftclient/client.py)
        # pop() instead of del: most of these were read with .get() and may
        # legitimately be unset, in which case del would raise KeyError.
        for env_name in ('SYNOCLOUD_AUTH_URL',
                         'SYNOCLOUD_ACCESS_KEY',
                         'SYNOCLOUD_SECRET_KEY',
                         'SYNOCLOUD_RETRIES',
                         'SYNOCLOUD_TENANT_ID',
                         'SYNOCLOUD_PREAUTHURL',
                         'SYNOCLOUD_PREAUTHTOKEN'):
            os.environ.pop(env_name, None)
        if os.environ.get('OPENSTACK_INSECURE'):
            del os.environ['OPENSTACK_INSECURE']
        log_debug('agent created')

    def _parse_body(self, raw_body):
        """Return *raw_body* decoded as JSON, or unchanged if it is not JSON."""
        try:
            return json.loads(raw_body)
        except ValueError:
            return raw_body

    def _raise_unless_2xx(self, caller, conn, resp, result):
        """Raise ``swiftclient.ClientException`` unless *resp* has a 2xx status.

        :param caller: name placed at the start of the exception message.
        :param conn: the connection the response came from (its ``host``
            is attached to the exception).
        :param resp: the HTTP response.
        :param result: parsed body (dict) or raw body text.
        """
        if 200 <= resp.status < 300:
            return
        import swiftclient
        if isinstance(result, dict):
            err_msg = str(result.get('title')) + ": " + str(result.get('description'))
        else:
            err_msg = str(result).replace('\n', '')
        raise swiftclient.ClientException(
            '%s failed. [%s]' % (caller, err_msg),
            http_host=conn.host, http_status=resp.status, http_reason=resp.reason)

    # overwrite swiftclient's get_auth
    def get_auth(self, auth_url, user, key, **kwargs):
        """Obtain a storage URL and scoped token from ``/backup/v1/token``.

        Installed in place of ``swiftclient.client.get_auth`` by the
        constructor.

        :param auth_url: base URL of the authentication service.
        :param user: user name sent as ``username``.
        :param key: secret sent as ``password``.
        :param kwargs: must contain ``os_options`` with a ``tenant_id``.
        :returns: tuple ``(swift_url, scoped_token)``.
        :raises swiftclient.ClientException: if the tenant id is missing
            or the service answers with a non-2xx status.
        """
        import swiftclient
        os_options = kwargs.get('os_options', {})
        if not os_options.get('tenant_id'):
            raise swiftclient.ClientException('tenant id is not specified.')
        tenant_id = os_options.get('tenant_id')
        # json.dumps() escapes quotes and backslashes in each value, so
        # credentials containing such characters still yield valid JSON.
        params = '{"source": "robot", "username": %s, "password": %s, "project_id": %s}' % (
            json.dumps(user), json.dumps(key), json.dumps(tenant_id))
        headers = {
            "X-Backup-Date": str(time()),
            "Content-Type": "application/json",
            "User-Agent": str(self._user_agent)
        }
        conn, resp = self._request(auth_url, 'POST', "/backup/v1/token",
                                   params, headers)
        resp_read = resp.read()
        conn.close()
        result = self._parse_body(resp_read)
        # 200 OK
        #{'data': {'scoped_token': 'xxx',
        #          'catalog': [{'endpoints': [{'url': 'https://swift.eu.c2.synology.com:8080/v1/AUTH_xxx'}],
        #                       'name': 'swift'}]}}
        self._raise_unless_2xx('get_auth', conn, resp, result)
        swift_url = _convert_swift_url(result['data']['catalog'])
        scoped_token = str(result['data']['scoped_token'])
        return swift_url, scoped_token

    def getAuth(self, in_json):
        """Authenticate through the Swift connection and remember the token.

        :param in_json: unused.
        :returns: dict with ``success``, ``token`` and ``endpoint``.
        """
        swift_url, scoped_token = self._swift.get_auth()
        self._scoped_token = scoped_token
        return {
            'success': True,
            'token': scoped_token,
            'endpoint': swift_url
        }

    def getAuthInfo(self, in_json):
        """Return the connection's current storage URL and token.

        :param in_json: unused.
        :returns: ``{'success': False}`` when either value is missing,
            otherwise dict with ``success``, ``token`` and ``endpoint``.
        """
        storage_url, token = self._swift.get_auth_info()
        if not storage_url or not token:
            return {
                'success': False
            }
        return {
            'success': True,
            'token': token,
            'endpoint': storage_url
        }

    # for test, no caller
    def getCapabilities(self, in_json):
        """Return the result of the connection's ``get_capabilities()``."""
        res = self._swift.get_capabilities()
        return {
            'success': True,
            'capabilities': res
        }

    # for test, no caller
    def headAccount(self, in_json):
        """Return the headers of the connection's ``head_account()``."""
        headers = self._swift.head_account()
        # 404 not found
        return {
            'success': True,
            'account': headers
        }

    def checkQuota(self, in_json):
        """Check quota by POSTing ``x-check-backup-source-size`` to an object.

        :param in_json: dict with ``sourceSize``, ``container`` and ``name``.
        :returns: ``{'success': True}``; failures surface as exceptions
            raised by ``post_object``.
        """
        # use post_object to check quota
        headers = {'x-check-backup-source-size': in_json['sourceSize']}
        self._swift.post_object(
            container=in_json['container'],
            obj=in_json['name'],
            headers=headers)
        return {
            'success': True
        }

    def _request(self, url, method, api, params, headers):
        """Send one HTTP(S) request and return the open connection and response.

        :param url: base URL; its scheme selects HTTP or HTTPS and its
            network location is split at the last ``:`` into host and port.
        :param method: HTTP method.
        :param api: request path.
        :param params: request body.
        :param headers: request headers.
        :returns: tuple ``(conn, resp)``; the caller closes ``conn``.
        """
        from urlparse import urlparse
        parsed = urlparse(url)
        host_args = parsed.netloc.rsplit(':', 1)
        options = {'timeout': self._syno_socket_timeout}
        if parsed.scheme == 'https':
            if self._insecure:
                options['context'] = ssl._create_unverified_context()
            conn = httplib.HTTPSConnection(*host_args, **options)
        else:
            conn = httplib.HTTPConnection(*host_args, **options)
        try:
            conn.connect()
            conn.request(method, api, params, headers)
            resp = conn.getresponse()
        except Exception:
            # Do not leak the socket when the exchange fails part-way.
            conn.close()
            raise
        return conn, resp

    # for headBackupService(), headBackupAccount(), setMeta(), getMeta(), deleteRobot()
    def callBackupApi(self, caller, method, api, headers):
        """Call the backup HTTP API with a freshly obtained token.

        :param caller: name used in the exception message on failure.
        :param method: HTTP method.
        :param api: request path.
        :param headers: extra request headers; the dict is copied, not
            modified.
        :returns: tuple ``(headers, body)`` where ``headers`` maps
            lower-cased response header names to values and ``body`` is
            the JSON-decoded body (or the raw text if it is not JSON).
        :raises swiftclient.ClientException: on a non-2xx status.
        """
        self.getAuth({})
        headers = dict(headers)
        headers['x-auth-token'] = str(self._scoped_token)
        headers['x-backup-date'] = str(time())
        headers['user-agent'] = str(self._user_agent)
        conn, resp = self._request(self._auth_url, method, api, '', headers)
        resp_read = resp.read()
        resp_headers = resp.getheaders()
        conn.close()
        result = self._parse_body(resp_read)
        self._raise_unless_2xx(caller, conn, resp, result)
        # convert headers to lowercase for case-insensitive
        result_headers = {}
        for header in resp_headers:
            result_headers[header[0].lower()] = header[1]
        return result_headers, result

    def headBackupService(self, in_json):
        """Fetch service settings from ``/backup/v1/service/setting``.

        :param in_json: unused.
        :returns: dict with ``success`` and ``service_meta``, which holds
            ``upload_thread_num``, ``download_thread_num`` and
            ``compact_ratio`` for each matching header that is present.
        """
        api = '/backup/v1/service/setting'
        result = self.callBackupApi('headBackupService', 'HEAD', api, {})[0]
        service_meta = {}
        for header_name, meta_key in (
                ('x-backup-upload-thread', 'upload_thread_num'),
                ('x-backup-download-thread', 'download_thread_num'),
                ('x-backup-compact-ratio', 'compact_ratio')):
            if header_name in result:
                service_meta[meta_key] = result[header_name]
        return {
            'service_meta': service_meta,
            'success': True
        }

    def headBackupAccount(self, in_json):
        """Fetch account settings from ``/backup/v1/resource/<tenant id>``.

        :param in_json: dict with a ``headers`` dict sent with the request.
        :returns: dict with ``success`` and ``account_meta``.  The latter
            is empty when the ``x-backup-plan-group`` header is absent.
        :raises KeyError: if the quota header for the plan group is absent.
        """
        api = '/backup/v1/resource/{account}'.format(account=self._tenant_id)
        result = self.callBackupApi('headBackupAccount', 'HEAD', api, in_json['headers'])[0]
        account_meta = {}
        if 'x-backup-plan-group' not in result:
            return {
                'account_meta': account_meta,
                'success': True
            }
        account_meta['plan_group'] = result['x-backup-plan-group']
        for header_name, meta_key in (
                ('x-backup-synoaccount', 'synoaccount'),
                ('x-backup-plan-period', 'plan_period'),
                ('x-backup-expired-time', 'expire_time'),
                ('x-backup-statistics-display', 'statistics_display')):
            if header_name in result:
                account_meta[meta_key] = result[header_name]
        if 'x-backup-schedule-constraint' in result:
            schedule_rules = {'none': 'unlimited', 'daily': 'once_a_day'}
            account_meta['schedule_rule'] = schedule_rules.get(
                result['x-backup-schedule-constraint'], 'none')
        if 'x-backup-version-constraint' in result:
            constraint = result['x-backup-version-constraint']
            if 'none' == constraint:
                account_meta['version_delete'] = 'enable'
                account_meta['version_lock'] = 'enable'
                account_meta['version_rotation'] = 'custom'
            else:
                account_meta['version_delete'] = 'disable'
                account_meta['version_lock'] = 'disable'
                account_meta['version_rotation'] = 'basic' if 'smart' == constraint else 'none'
        if 'business' == account_meta['plan_group']:
            account_meta['quota'] = result['x-backup-target-quota']
            account_meta['space_usage'] = 'target'
        else:
            account_meta['quota'] = result['x-backup-source-quota']
            account_meta['space_usage'] = 'source'
        return {
            'account_meta': account_meta,
            'success': True
        }

    def _target_api(self, in_json):
        """Return the resource path for the target named by ``in_json['targetId']``."""
        return '/backup/v1/resource/{account}/{target}'.format(
            account=self._tenant_id,
            target=urllib.quote(in_json['targetId'].encode('utf-8')))

    def setMeta(self, in_json):  # set meta (to anchor file)
        """POST backup metadata headers for a target.

        :param in_json: dict with ``targetId`` and an optional ``meta``
            dict (``bkpSourceSize``, ``bkpTargetSize``, ``bkpStartTime``,
            ``bkpEndTime``, ``bkpHostName``, ``bkpRobotId``, ``bkpState``).
        :returns: ``{'success': True}``.
        """
        api = self._target_api(in_json)
        headers = {}
        meta = in_json.get('meta')
        if meta:
            for header_name, meta_key, default in (
                    ('x-backup-source-size', 'bkpSourceSize', 0),
                    ('x-backup-target-size', 'bkpTargetSize', 0),
                    ('x-backup-start-time', 'bkpStartTime', 0),
                    ('x-backup-end-time', 'bkpEndTime', 0),
                    ('x-backup-hostname', 'bkpHostName', ''),
                    ('x-backup-robot-id', 'bkpRobotId', ''),
                    ('x-backup-state', 'bkpState', '')):
                headers[header_name] = str(meta.get(meta_key, default))
        self.callBackupApi('setMeta', 'POST', api, headers)
        return {
            'success': True
        }

    def getMeta(self, in_json):
        """GET the version lists recorded for a target.

        :param in_json: dict with ``targetId``.
        :returns: dict with ``success`` and ``meta`` holding
            ``expectVersionList`` and ``realVersionList``.
        """
        api = self._target_api(in_json)
        result = self.callBackupApi('getMeta', 'GET', api, {})[1]
        data = result['data']
        meta = {
            # [{u'timestamp': 1485151539, u'version_id': u'4'},
            #  {u'timestamp': 1485150314, u'version_id': u'1'}]
            'expectVersionList': data['version_info_list'],
            # [{u'status': u'Complete', u'timestamp': 1485151539, u'version_id': u'4'},
            #  {u'status': u'Complete', u'timestamp': 1485150637, u'version_id': u'3'},
            #  {u'status': u'Complete', u'timestamp': 1485150314, u'version_id': u'1'}]
            'realVersionList': data['real_version_list'],
        }
        return {
            'success': True,
            'meta': meta
        }

    def deleteRobot(self, in_json):
        """DELETE ``/backup/v1/robot/<robotId>``.

        :param in_json: dict with ``robotId``.
        :returns: ``{'success': True}``.
        """
        api = '/backup/v1/robot/{robot_id}'.format(robot_id=in_json['robotId'])
        self.callBackupApi('deleteRobot', 'DELETE', api, {})
        return {
            'success': True
        }

    def headContainer(self, in_json):
        """HEAD a container and report the response's ``date`` header.

        :param in_json: dict with ``container``.
        :returns: dict with ``success`` and ``LastModified`` (timestamp
            parsed from the ``date`` header).
        """
        headers = self._swift.head_container(in_json['container'])
        # 404 not found
        return {
            'success': True,
            'LastModified': _convert_GMT_time_str(headers.get('date'))
        }

    def putObject(self, in_json):
        """Upload one local file as an object, reporting progress.

        :param in_json: dict with ``fileInput`` (local path), ``headers``,
            ``container`` and ``name``.
        :returns: dict with ``success``, ``error_code`` (HTTP status) and
            ``error_message``; on status 201 also ``Properties``.
        """
        res = {}
        with open(in_json['fileInput'], "rb") as stream:
            self._file_obj_progress.set_file_obj(stream)
            self._swift.put_object(
                headers=in_json['headers'],
                container=in_json['container'],
                obj=in_json['name'],
                contents=self._file_obj_progress,
                response_dict=res,
                throttler=self._throttler)
        code = res.get('status')
        # 201 Normal response
        # 408 timeout
        # 400 name len > 1024
        # 411 length required
        # 413 large file
        # 422 unprocessable entity
        if code != 201:
            return {'success': False, 'error_code': code, 'error_message': res.get('reason')}
        return {'success': True, 'error_code': code, 'error_message': res.get('reason'),
                'Properties': _convert_properties_by_dict(res.get('headers'))}

    def putDirectory(self, in_json):
        """Create an empty object with content type ``application/directory``.

        :param in_json: dict with ``container`` and ``name``.
        :returns: dict with ``success`` (true on status 201),
            ``error_code`` and ``error_message``.
        """
        res = {}
        self._swift.put_object(
            container=in_json['container'],
            obj=in_json['name'],
            contents='',
            content_type='application/directory',
            response_dict=res,
            throttler=self._throttler)
        code = res.get('status')
        # 201 Normal response
        # 408 timeout
        # 400 name len > 1024
        # 411 length required
        # 413 large file
        # 422 unprocessable entity
        return {'success': code == 201, 'error_code': code, 'error_message': res.get('reason')}

    def listObjects(self, in_json):
        """List up to 10000 entries of a container.

        :param in_json: dict with ``container`` and optional ``Marker``,
            ``Prefix``, ``Delimiter`` and ``TargetType`` (``'folder'``,
            ``'file'`` or ``'all'``, the default).
        :returns: dict with ``success``, ``folder``, ``file``, ``count``
            and, when a full page was returned, ``NextMarker``.
        """
        limit_count = 10000
        headers, objects = self._swift.get_container(
            container=in_json['container'],
            marker=in_json.get('Marker', None),
            limit=limit_count,
            prefix=in_json.get('Prefix', None),
            delimiter=in_json.get('Delimiter', None))
        target_type = in_json.get('TargetType', 'all')
        want_folders = target_type in ('folder', 'all')
        want_files = target_type in ('file', 'all')
        count = 0
        out_json = {
            'success': True,
            'folder': [],
            'file': []
        }
        for entry in objects:
            if want_folders and entry.get('subdir'):
                out_json['folder'].append({'Name': entry.get('subdir')})
                count += 1
            if want_files and entry.get('name'):
                out_json['file'].append({
                    'Name': entry.get('name'),
                    'Properties': _convert_properties(entry)
                })
                count += 1
        if len(objects) == limit_count:
            # A full page: the last entry is the marker for the next call.
            last_entry = objects[limit_count - 1]
            if last_entry.get('name'):
                out_json['NextMarker'] = last_entry.get('name')
            else:
                out_json['NextMarker'] = last_entry.get('subdir')
        out_json['count'] = count
        return out_json

    def headObject(self, in_json):
        """HEAD an object and return its properties.

        Surrounding double quotes are stripped from the ETag.

        :param in_json: dict with ``container`` and ``name``.
        :returns: dict with ``success`` and ``Properties``.
        """
        headers = self._swift.head_object(
            container=in_json['container'],
            obj=in_json['name'])
        properties = _convert_properties_by_dict(headers)
        etag = properties['ETag']
        # Guard against a missing or empty ETag before indexing into it.
        if etag and "\"" == etag[0] and "\"" == etag[-1]:
            properties['ETag'] = etag[1:-1]
        return {
            'success': True,
            'Properties': properties
        }

    # for test, no caller
    def deleteObject(self, in_json):
        """Delete one object.

        :param in_json: dict with ``container`` and ``name``.
        :returns: dict with ``success`` (true on status 204),
            ``error_code`` and ``error_message``.
        """
        res = {}
        self._swift.delete_object(
            container=in_json['container'],
            obj=in_json['name'],
            response_dict=res)
        code = res.get('status')
        # 204 Normal response
        # 400
        # 404 not found
        # 500
        return {'success': code == 204, 'error_code': code, 'error_message': res.get('reason')}

    def getObject(self, in_json):
        """Download an object (or a byte range of it) to a local file.

        :param in_json: dict with ``container``, ``name``, ``fileOutput``
            (local path) and optional ``RangeStart`` / ``RangeEnd``; the
            range is applied only when both are truthy.
        :returns: ``{'success': True}``.
        """
        headers_obj = None
        read_len = 65536
        range_start = in_json.get('RangeStart')
        range_end = in_json.get('RangeEnd')
        if range_start and range_end:
            headers_obj = {'Range': 'bytes=%s-%s' % (range_start, range_end)}
        hdrs, body = self._swift.get_object(
            container=in_json['container'],
            obj=in_json['name'],
            resp_chunk_size=read_len,
            headers=headers_obj)
        with open(in_json['fileOutput'], "wb") as stream:
            self._file_obj_progress.set_file_obj(stream)
            for chunk in body:
                self._file_obj_progress.write(chunk)
        # 404 not found
        return {'success': True}

    # for putLargeOrObjects()
    def writeJson(self, res):
        """Send *res* through the io object, or print it when there is none."""
        if self._io:
            self._io.write_json(res)
        else:
            print(res)

    # upload one or many small or large objects
    def putLargeOrObjects(self, in_json):
        """Upload a list of files through ``SwiftService``, reporting each result.

        The service is created on first use (or when the upload thread
        count has not been set yet); a later call with a different
        ``uploadThreadNum`` only logs a warning.

        :param in_json: dict with ``uploadThreadNum``, ``fileList`` (list
            of dicts with ``fileInput`` and ``name``), ``container`` and
            ``segmentSize``.
        :returns: ``{'success': True}`` when every result succeeded,
            otherwise the error dictionary for the first failure (a 403
            "Container PUT failed" error is ignored).
        """
        # segmentation object format
        # FILENAME/ mtime           /file size/SEND_SEGMENTATION_SIZE/object seq
        # ex: FILENAME/1452068374.782341/190560256/104857600         /00000000
        # ex: FILENAME/1452068374.782341/190560256/104857600         /00000001
        from swiftclient.service import SwiftService
        from swiftclient.service import SwiftUploadObject
        import swiftclient
        if self._swift_service is None or not self._is_set_upload_thread_num:
            storage_url, token = self._swift.get_auth_info()
            if storage_url and token:
                self._service_options['os_storage_url'] = storage_url
                self._service_options['os_auth_token'] = token
            self._service_options['object_uu_threads'] = in_json['uploadThreadNum']
            self._upload_thread_num = in_json['uploadThreadNum']
            self._is_set_upload_thread_num = True
            self._swift_service = SwiftService(self._service_options)
        elif in_json['uploadThreadNum'] != self._upload_thread_num:
            log_error("Warning: setting uploadThreadNum [{}] is no effect, " \
                      "current _upload_thread_num is [{}]".format( \
                          in_json['uploadThreadNum'], self._upload_thread_num))
        upload_objects = [
            SwiftUploadObject(source=f['fileInput'], object_name=f['name'])
            for f in in_json['fileList']
        ]
        generator = self._swift_service.upload(
            container=in_json['container'],
            objects=upload_objects,
            options={
                'segment_size': in_json['segmentSize'],
                'segment_container': in_json['container']},
            throttler=self._throttler)
        for r in generator:
            if not r['success']:
                e = r['error']
                # ignore 403 Forbidden - Container PUT failed error
                # since SwiftService.upload() try to create the container (in case it doesn't exist)
                if type(e) == swiftclient.ClientException \
                        and hasattr(e, "http_status") and e.http_status == 403 \
                        and hasattr(e, "message") and e.message == "Container PUT failed":
                    continue
                return _convert_exception(e)
            if 'object' in r and 'large_object' in r and 'upload_object' == r['action']:
                # A whole object finished; prefer the manifest response.
                if 'manifest_response_dict' in r:
                    response = r['manifest_response_dict']
                elif 'response_dict' in r:
                    response = r['response_dict']
                else:
                    continue
                self.writeJson({
                    'success': True,
                    'complete': False,
                    'object': r['object'],
                    'large_object': r['large_object'],
                    'error_code': response.get('status'),
                    'error_message': response.get('reason'),
                    'Properties': _convert_properties_by_dict(response.get('headers'))})
            elif 'for_object' in r:
                # One segment of a large object finished.
                self.writeJson({
                    'success': True,
                    'object_complete': False,
                    'complete': False,
                    'segment_location': r['segment_location'],
                    'segment_size': r['segment_size']})
        return {
            'success': True
        }

    def deleteLargeObject(self, in_json):
        """Delete an object through ``SwiftService``.

        :param in_json: dict with ``container`` and ``name``.
        :returns: on the first successful ``delete_object`` result, a dict
            with ``success``, ``error_code`` and ``error_message``; the
            error dictionary for the first failed result; ``None`` if the
            service yields neither.
        """
        from swiftclient.service import SwiftService
        if self._swift_service is None:
            storage_url, token = self._swift.get_auth_info()
            if storage_url and token:
                self._service_options['os_storage_url'] = storage_url
                self._service_options['os_auth_token'] = token
            self._swift_service = SwiftService(self._service_options)
        generator = self._swift_service.delete(
            container=in_json['container'],
            objects=[in_json['name']])
        for r in generator:
            if not r['success']:
                return _convert_exception(r['error'])
            if r['action'] == 'delete_object':
                return {
                    'success': True,
                    'error_code': r['response_dict'].get('status'),
                    'error_message': r['response_dict'].get('reason')}
            #elif r['action'] == 'delete_segment':
            #elif r['action'] == 'delete_container':
def start_server():
    """Run the request loop on stdin/stdout until end-of-input.

    After the API object is created the string ``'start'`` is written.
    Each request is a JSON object whose ``fn`` key names the API method to
    call; the remaining keys are passed to it and its result is written
    back as JSON.  Exceptions raised while handling a request are written
    back as error dictionaries and the loop continues.

    :returns: ``False`` if the API object could not be created, ``True``
        once the input is exhausted (previously the function fell off the
        end and returned ``None``, which made the process exit with
        status 1 even after a clean shutdown).
    """
    io = SimpleIO()
    try:
        api = SynoCloudSwiftApi(io)
    except Exception as e:
        io.write_exception(e)
        log_error("launch swift api failed: exception [%s], type [%s]" % (str(e), type(e)))
        return False
    io.write_string('start')
    while True:
        try:
            in_json = io.read_json()
            fn_name = in_json['fn']
            # Default of None so an unknown name reaches the check below
            # instead of raising AttributeError here.
            fn = getattr(api, fn_name, None)
            del in_json['fn']
            if fn is None:
                raise SystemError('no such fn: ' + fn_name)
            #log_debug('\033[34mexecute: ' + fn_name + ' ' + json.dumps(in_json) + '\033[0m');
            res = fn(in_json)
            io.write_json(res)
        except StopIteration:
            # End of input: leave the loop and report a clean shutdown.
            break
        except Exception as e:
            io.write_exception(e)
            continue
    return True
if __name__ == '__main__':
    # On Windows, files have separate text and binary modes; switch stdin
    # and stdout to binary before any binary data is exchanged, to prevent
    # unexpected behavior.
    if sys.platform == "win32":
        import os, msvcrt
        for std_stream in (sys.stdin, sys.stdout):
            msvcrt.setmode(std_stream.fileno(), os.O_BINARY)
    # add include path
    script_dir = os.path.dirname(os.path.realpath(sys.argv[0]))
    sys.path.insert(0, script_dir + '/swiftclient')
    sys.path.insert(1, script_dir + '/module')
    res = start_server()
    if res:
        sys.exit(0)
    sys.exit(1)