File: /volume1/@appstore/HyperBackup/addon/azure_blob/python/azure_agent.py
#!/usr/bin/env python
#-*-coding: utf-8 -*-
import os, sys, json, ssl
from struct import pack, unpack
from time import strptime, mktime
from syslog import syslog, LOG_ERR
# Set to True to echo agent diagnostics to stderr.  stdout is reserved for
# the length-prefixed request/response protocol written by SimpleIO.
debug = False


def log_debug(*args):
    '''
    Write the space-joined *args* as one line to stderr when `debug` is on.

    The arguments are combined with str.join, so each one must already be a
    string.
    '''
    if not debug:
        return
    sys.stderr.write(' '.join(args) + '\n')
class AzureErrorBody():
    '''
    Windows Azure Error Body class from xml.
    https://msdn.microsoft.com/en-us/library/azure/dd179382.aspx

    Instances are filled from a storage error response shaped like:

        <?xml version="1.0" encoding="utf-8"?>
        <Error>
          <Code>...</Code>
          <Message>...</Message>
          <AuthenticationErrorDetail>...</AuthenticationErrorDetail>
          <QueryParameterName>...</QueryParameterName>
          <QueryParameterValue>...</QueryParameterValue>
          <Reason>...</Reason>
        </Error>

    NOTE(review): only code, message and authenticationerrordetail are
    declared; presumably the XML filler populates just the attributes that
    exist on the instance -- confirm against azure._ETreeXmlToObject.
    '''

    def __init__(self):
        # All fields default to an empty unicode string.
        self.code = self.message = self.authenticationerrordetail = u''
def _is_scoket_exception(e):
import socket
if type(e) == socket.gaierror or type(e) == socket.error or type(e) == socket.herror or type(e) == socket.timeout:
return True
return False
def _convert_socket_error_code(message, auth_done):
error_code = -1
if -1 != message.find("[Errno -2] Name or service not known"):
if auth_done:
error_code = -4
else:
error_code = -2
elif -1 != message.find("[Errno -3] Temporary failure in name resolution"):
error_code = -4
elif -1 != message.find("[Errno -5] No address associated with hostname"):
error_code = -4
elif -1 != message.find("[Errno 110] Connection timed out"):
error_code = 408
elif -1 != message.find("[Errno 101] Network is unreachable"):
error_code = -4
elif -1 != message.find("[Errno 111] Connection refused"):
error_code = -4
elif -1 != message.find("[Errno 113] No route to host"):
error_code = -4
elif -1 != message.find("[Errno 104] Connection reset by peer"):
error_code = -4
elif -1 != message.find("[Errno 32] Broken pipe"):
error_code = 408
return error_code
def _is_ssl_timeout(message):
#ssl.SSLErr_timeouteption log
if -1 != message.find("timed out"):
# The read operation timed out
# The write operation timed out
# The handshake operation timed out
return True
return False
def _convert_socket_exception(e, auth_done):
import socket
error_code = -1
error_msg = 'Unknown'
error_cls = 'Unknown'
try:
error_cls = type(e).__name__
if hasattr(e, '__module__') and e.__module__:
error_cls = e.__module__ + '.' + error_cls
if type(e) == socket.gaierror:
if hasattr(e, 'strerror') and e.strerror:
error_msg = e.strerror
if e.errno == -2:
# socket.gaierror: [Errno -2] Name or service not known
# 1. dns not set
# 2. set to a wrong dns
if auth_done:
error_code = -4
else:
error_code = -2
elif e.errno == -3:
# socket.gaierror: [Errno -3] Temporary failure in name resolution
# man gai_strerror, and find EAI_AGAIN:
# The name server returned a temporary failure indication. Try again later.
error_code = -4
elif e.errno == -5:
# socket.gaierror: [Errno -5] No address associated with hostname
# man gai_strerror, and find EAI_NODATA:
# The specified network host exists, but does not have any network addresses defined.
error_code = -4
elif e.errno < 100 and e.errno > 0:
error_code = e.errno
elif type(e) == socket.error or type(e) == socket.herror:
if hasattr(e, 'strerror') and e.strerror:
error_msg = e.strerror
elif hasattr(e, 'args'):
error_msg = str(e.args)
if -1 != error_msg.find('Tunnel connection failed'):
error_code = -4
if e.errno == socket.errno.ETIMEDOUT:
# socket.error: [Errno 110] Connection timed out
error_code = 408
elif e.errno == socket.errno.ENETUNREACH:
# socket.error: [Errno 101] Network is unreachable
error_code = -4
elif e.errno == socket.errno.ECONNREFUSED:
# socket.error: [Errno 111] Connection refused
error_code = -4
elif e.errno == socket.errno.EHOSTUNREACH:
# socket.error: [Errno 113] No route to host
error_code = -4
elif e.errno == socket.errno.ECONNRESET:
# socket.error: [Errno 104] Connection reset by peer
error_code = -4
elif e.errno == socket.errno.EPIPE:
# socket.error: [Errno 32] Broken pipe
error_code = 408
elif e.errno < 100 and e.errno > 0:
error_code = e.errno
#syslog(LOG_ERR, "get network error %d:%s" % (e.errno, error_msg))
elif type(e) == socket.timeout:
error_msg = 'timed out'
error_code = 408
else:
syslog(LOG_ERR, "BUG: exception [%s], type [%s]" % (str(e), type(e)))
#syslog(LOG_ERR, "error %s:%d:%s" % (error_cls, error_code, str(error_msg)))
if -1 == error_code:
if hasattr(e, "args"):
syslog(LOG_ERR, "error args: [%s]" % (str(e.args)))
if hasattr(e, "errno"):
syslog(LOG_ERR, "error errno: [%s]" % (str(e.errno)))
if hasattr(e, "strerror"):
syslog(LOG_ERR, "error strerror: [%s]" % (str(e.strerror)))
except Exception as ee:
syslog(LOG_ERR, "parse socket exception failed: [%s]" % (str(ee)))
pass
return {
'success': False,
'error_class': error_cls,
'error_message': error_msg,
'error_code': error_code
}
def _convert_exception(e, auth_done):
    '''
    Convert an exception raised while serving a request into the agent's
    JSON error reply: {'success': False, 'error_class', 'error_message',
    'error_code'}.

    error_code is one of:
      - an HTTP status (409, 404, or the 100..699 status carried on a
        WindowsAzureError);
      - 408 for timeouts;
      - one of the agent's negative codes (-2, -4, -5);
      - -1 when the exception was not recognised.

    Types are compared exactly (type(e) == ...), so subclasses fall through
    to the generic branch.  Socket exceptions are delegated to
    _convert_socket_exception.

    :param e: the exception instance.
    :param auth_done: True once a request has succeeded in this session;
        forwarded to the socket helpers.

    Exceptions raised while inspecting *e* (inside the outer try) are
    logged to syslog, and the fields determined so far are returned.
    '''
    import azure
    import httplib
    #import pdb; pdb.set_trace()
    error_code = -1
    error_msg = 'Unknown'
    error_cls = 'Unknown'
    try:
        # Qualified class name, e.g. "<module>.<ClassName>".
        error_cls = type(e).__name__
        if hasattr(e, '__module__') and e.__module__:
            error_cls = e.__module__ + '.' + error_cls
        try:
            # Keep only the first line of the message.  unicode messages go
            # through repr() (escaping non-ASCII and keeping the u'...'
            # quoting), so their newlines appear as the two characters "\n";
            # hence the different separator in the two branches.
            if type(e.message) is unicode:
                error_msg = repr(e.message).split('\\n')[0]
            else:
                error_msg = str(e.message).split('\n')[0]
            log_debug("\033[31mexception: \033[0m", error_msg)
        except:
            # e.g. the exception has no .message; error_msg stays 'Unknown'.
            log_debug("\033[31mexception: \033[0m", "could not parse error msg")
        if type(e) == azure.WindowsAzureConflictError:
            error_code = 409
        elif type(e) == azure.WindowsAzureMissingResourceError:
            error_code = 404
        elif type(e) == azure.WindowsAzureError:
            # _http_code / _resp_body are only used when present on the
            # exception. NOTE(review): they look like additions to the
            # bundled SDK -- confirm.
            if hasattr(e, "_http_code"):
                if e._http_code >= 100 and e._http_code < 700:
                    error_code = e._http_code
            if hasattr(e, "_resp_body"):
                # Parse the storage <Error> XML body into an AzureErrorBody.
                # If parsing raises, the outer except logs it and the HTTP
                # code set above is kept.
                node = azure.ETree.fromstring(e._resp_body)
                error_body = azure._ETreeXmlToObject.fill_instance_element(
                    node, AzureErrorBody)
                if error_body.code == 'AuthenticationFailed':
                    error_code = -2
                    if error_body.authenticationerrordetail:
                        error_msg = error_body.authenticationerrordetail
            # Presumably raised when the account name/key was not supplied
            # (e.g. by the BlobService constructor) -- confirm in the SDK.
            if error_msg == azure._ERROR_STORAGE_MISSING_INFO:
                error_code = -2
        elif type(e) == TypeError:
            # this exception will raise when base64 decode for auth failed
            if error_msg == "Incorrect padding":
                error_code = -2
        elif type(e) == UnicodeDecodeError:
            error_code = -5
        elif type(e) == httplib.ResponseNotReady:
            error_code = -4
        elif type(e) == httplib.BadStatusLine:
            error_code = -4
        elif type(e) == httplib.IncompleteRead:
            error_code = -4
        elif type(e) == ssl.SSLError:
            error_code = -4
            if hasattr(e, "strerror") and e.strerror:
                # SSL: DECRYPTION_FAILED_OR_BAD_RECORD_MAC
                # SSL: SSLV3_ALERT_BAD_RECORD_MAC
                # SSL: CERTIFICATE_VERIFY_FAILED
                error_msg = e.strerror
            # Checked on error_msg, which falls back to the first line of
            # e.message when the SSLError carries no strerror.
            if _is_ssl_timeout(str(error_msg)):
                error_code = 408
        elif type(e) == ssl.SSLEOFError:
            # ssl.SSLEOFError: [Errno 8] EOF occurred in violation of protocol
            if hasattr(e, 'strerror') and e.strerror:
                error_msg = e.strerror
            error_code = -4
        elif type(e) == IOError:
            if hasattr(e, 'strerror') and e.strerror:
                error_msg = e.strerror
            # Plain IOError: classify by the "[Errno N] ..." text.
            error_code = _convert_socket_error_code(str(e), auth_done)
        elif _is_scoket_exception(e):
            return _convert_socket_exception(e, auth_done)
        else:
            syslog(LOG_ERR, "exception [%s]" % str(e))
            syslog(LOG_ERR, "type [%s]" % type(e))
        #syslog(LOG_ERR, "error %s:%d:%s" % (error_cls, error_code, str(error_msg)))
        if -1 == error_code:
            # Unrecognised error: dump whatever the exception carries.
            if hasattr(e, "args"):
                syslog(LOG_ERR, "error args: [%s]" % (str(e.args)))
            if hasattr(e, "errno"):
                syslog(LOG_ERR, "error errno: [%s]" % (str(e.errno)))
            if hasattr(e, "strerror"):
                syslog(LOG_ERR, "error strerror: [%s]" % (str(e.strerror)))
    except Exception as ee:
        syslog(LOG_ERR, "parse exception failed. %s" % str(ee))
        pass
    return {
        'success': False,
        'error_class': error_cls,
        'error_message': error_msg,
        'error_code': error_code
    }
def _convert_time_str(val):
origin_tz = None
if os.environ.get('TZ'):
origin_tz = os.environ.get('TZ')
os.environ['TZ'] = 'GMT'
timestamp = int(mktime(strptime(val, '%a, %d %b %Y %H:%M:%S %Z')))
if origin_tz:
os.environ['TZ'] = origin_tz
return timestamp
def _convert_properties(properties):
    '''
    Flatten a blob-properties object (as found on list_blobs entries) into
    the agent's JSON properties dict.

    last_modified (e.g. "Fri, 25 Jul 2014 09:36:56 GMT") becomes a Unix
    timestamp; the other attributes are copied unchanged.
    '''
    result = {'LastModified': _convert_time_str(properties.last_modified)}
    for json_key, attr in (('MD5', 'content_md5'),
                           ('ETag', 'etag'),
                           ('ContentType', 'content_type'),
                           ('ContentLength', 'content_length'),
                           ('BlobType', 'blob_type')):
        result[json_key] = getattr(properties, attr)
    return result
def _convert_properties_by_dict(properties):
    '''
    Build the agent's JSON properties dict from the header dict returned by
    get_blob_properties (keys as used below, in lower case).

    - 'LastModified' is a Unix timestamp.
    - 'MD5' defaults to ''.
    - 'ETag' has its surrounding quotes removed.
    - 'ContentLength' is an int.
    - 'ContentType' and 'hdi_isfolder' are only set when the matching
      header is present.  'hdi_isfolder' is presumably the HDInsight
      folder-marker metadata.
    '''
    result = {}
    result['LastModified'] = _convert_time_str(properties['last-modified'])
    result['MD5'] = properties.get('content-md5', '')
    # The ETag header value is quoted; drop the surrounding double quotes.
    result['ETag'] = properties['etag'][1:-1]
    result['ContentLength'] = int(properties['content-length'])
    result['BlobType'] = properties['x-ms-blob-type']
    if 'content-type' in properties:
        result['ContentType'] = properties['content-type']
    if 'x-ms-meta-hdi_isfolder' in properties:
        result['hdi_isfolder'] = properties['x-ms-meta-hdi_isfolder'] == 'true'
    return result
def _convert_properties_by_put(properties):
    '''
    Build the agent's JSON properties dict from the header dict returned by
    a block-blob upload (AzureBlobStoreageApi.createBlockBlob).

    Content-MD5 is read with a '' default, as _convert_properties_by_dict
    already does.  This runs after the upload has succeeded, so a response
    without that header must not turn the successful upload into an error
    reply.
    '''
    return {
        'LastModified': _convert_time_str(properties['last-modified']),
        'MD5': properties.get('content-md5', ''),
        # The ETag header value is quoted; drop the surrounding double quotes.
        'ETag': properties['etag'][1:-1],
    }
def _convert_properties_by_commit(properties):
    '''
    Build the agent's JSON properties dict from the header dict returned by
    put_block_list (AzureBlobStoreageApi.commitBlobBlocks).

    No MD5 is taken from this response; 'MD5' is always ''.
    '''
    last_modified = _convert_time_str(properties['last-modified'])
    # The ETag header value is quoted; drop the surrounding double quotes.
    etag = properties['etag'][1:-1]
    return {'LastModified': last_modified, 'MD5': '', 'ETag': etag}
def _convert_block_id(id_prefix, counter):
return id_prefix + '_{0:08d}'.format(counter)
class SimpleIO(object):
    '''
    Length-prefixed message framing over this process's stdin/stdout.

    Each message is a native C int length (struct format 'i') followed by
    that many bytes.  JSON messages are carried as such strings.  Reads
    raise StopIteration when stdin reaches EOF before a message is complete.
    '''

    def _read_exact(self, count):
        '''
        Read exactly *count* bytes from stdin; returns '' when count <= 0.

        :raises StopIteration: if stdin reaches EOF first.
        '''
        chunks = []
        remaining = count
        while remaining > 0:
            chunk = sys.stdin.read(remaining)
            if not chunk:
                raise StopIteration
            chunks.append(chunk)
            remaining -= len(chunk)
        return ''.join(chunks)

    def read_int(self):
        '''Read one native int (struct format 'i') from stdin.'''
        return unpack('i', self._read_exact(4))[0]

    def read_string(self):
        '''
        Read one length-prefixed string; a zero length yields ''.

        :raises SystemError: if the length prefix is negative.
        '''
        length = self.read_int()
        if length == 0:
            return ''
        payload = self._read_exact(length)
        if not payload:
            raise SystemError
        return payload

    def read_json(self):
        '''Read one string message and decode it as JSON ('' -> None).'''
        text = self.read_string()
        return json.loads(text) if text else None

    def write_int(self, val):
        '''Write *val* as a native int (struct format 'i') and flush.'''
        sys.stdout.write(pack('i', val))
        sys.stdout.flush()

    def write_string(self, val):
        '''Write *val* preceded by its length, flushing stdout.'''
        self.write_int(len(val))
        out = sys.stdout
        out.write(val)
        out.flush()

    def write_json(self, val):
        '''Serialise *val* with json.dumps and write it as a string message.'''
        self.write_string(json.dumps(val))

    def write_exception(self, e, auth_done=False):
        '''Write the JSON error reply built by _convert_exception for *e*.'''
        self.write_json(_convert_exception(e, auth_done))
class AzureBlobStoreageApi(object):
    '''
    Request handlers for the agent.

    Each public method takes the decoded request dict (*in_json*) and
    returns a JSON-serialisable reply containing a 'success' key.  Errors
    propagate as exceptions to the caller (start_server), which converts
    them.  The misspelled class name is kept for compatibility.
    '''

    def __init__(self):
        '''
        Create the BlobService client from environment variables:

          AZURE_ACCESS_KEY  -> account_name
          AZURE_SECRET_KEY  -> account_key
          AZURE_SCHEME      -> protocol (default 'https')
          AZURE_HOST_BASE   -> 'china' selects '.blob.core.chinacloudapi.cn';
                               any other non-empty value is used as host_base

        The two credential variables are removed from os.environ afterwards,
        also when the client cannot be created.  pop() is used so that a
        variable that was never set does not raise KeyError.
        '''
        try:
            from azure.storage import BlobService
            blob_service_opt = {
                "account_name": os.environ.get('AZURE_ACCESS_KEY'),
                "account_key": os.environ.get('AZURE_SECRET_KEY'),
                "protocol": os.environ.get('AZURE_SCHEME', 'https')
            }
            host_base = os.environ.get('AZURE_HOST_BASE')
            if host_base == 'china':
                blob_service_opt["host_base"] = '.blob.core.chinacloudapi.cn'
            elif host_base:
                blob_service_opt["host_base"] = host_base
            self._azure = BlobService(**blob_service_opt)
        finally:
            os.environ.pop('AZURE_ACCESS_KEY', None)
            os.environ.pop('AZURE_SECRET_KEY', None)
        log_debug('agent created')

    def createContainer(self, in_json):
        '''
        Create container in_json['container'] with fail_on_exist=True.

        The optional in_json['PublicAccess'] is passed through as
        x_ms_blob_public_access.  'success' is the SDK's return value.
        '''
        kwargs = {'fail_on_exist': True}
        if in_json.get('PublicAccess'):
            kwargs['x_ms_blob_public_access'] = in_json['PublicAccess']
        res = self._azure.create_container(in_json['container'], **kwargs)
        return {'success': res}

    def getContainerProperties(self, in_json):
        '''Return LastModified (Unix timestamp) and the raw ETag of a container.'''
        res = self._azure.get_container_properties(in_json['container'])
        return {
            'success': True,
            'LastModified': _convert_time_str(res['last-modified']),
            'ETag': res['etag']
        }

    def listContainers(self, in_json):
        '''
        List container names, with optional 'Prefix' / 'Marker' filters.

        'NextMarker' is included when the service reports more results, as
        listBlobs does.
        '''
        kwargs = {}
        # FIXME add prefix and marker to ta
        for key in ('Prefix', 'Marker'):
            if in_json.get(key):
                kwargs[key.lower()] = in_json[key]
        res = self._azure.list_containers(**kwargs)
        out_json = {
            'success': True,
            'container': [{'Name': rec.name} for rec in res]
        }
        next_marker = getattr(res, 'next_marker', None)
        if next_marker:
            out_json['NextMarker'] = next_marker
        return out_json

    def deleteContainer(self, in_json):
        '''Delete container in_json['container'] with fail_not_exist=True.'''
        res = self._azure.delete_container(in_json['container'],
                                           fail_not_exist=True)
        return {'success': res}

    def createBlockBlob(self, in_json):
        '''Upload local file in_json['fileInput'] as block blob in_json['blob'].'''
        res = self._azure.put_block_blob_from_path(
            in_json['container'],
            in_json['blob'],
            in_json['fileInput'])
        return {'success': True,
                'Properties': _convert_properties_by_put(res)
                }

    def listBlobs(self, in_json):
        '''
        List blobs in in_json['container'].

        Optional 'Delimiter' / 'Prefix' / 'Marker' are passed through.
        'TargetType' ('folder', 'file' or the default 'all') selects which of
        the 'folder' (prefixes) and 'file' (blobs) lists are filled.  'count'
        is the total number of entries returned.
        '''
        kwargs = {}
        for key in ('Delimiter', 'Prefix', 'Marker'):
            if in_json.get(key):
                kwargs[key.lower()] = in_json[key]
        res = self._azure.list_blobs(in_json['container'], **kwargs)
        target_type = in_json.get('TargetType', 'all')
        out_json = {
            'success': True,
            'folder': [],
            'file': []
        }
        if target_type in ('folder', 'all'):
            out_json['folder'] = [{'Name': rec.name} for rec in res.prefixes]
        if target_type in ('file', 'all'):
            out_json['file'] = [
                {'Name': rec.name,
                 'Properties': _convert_properties(rec.properties)}
                for rec in res.blobs
            ]
        if res.next_marker:
            out_json['NextMarker'] = res.next_marker
        out_json['count'] = len(out_json['folder']) + len(out_json['file'])
        return out_json

    def getBlobProperties(self, in_json):
        '''Return the converted properties of blob in_json['blob'].'''
        res = self._azure.get_blob_properties(
            in_json['container'],
            in_json['blob'])
        return {
            'success': True,
            'Properties': _convert_properties_by_dict(res),
        }

    def deleteBlob(self, in_json):
        '''Delete blob in_json['blob'] with x_ms_delete_snapshots='include'.'''
        self._azure.delete_blob(
            in_json['container'],
            in_json['blob'],
            x_ms_delete_snapshots='include')
        return {'success': True}

    def getBlob(self, in_json):
        '''
        Download blob in_json['blob'] into local file in_json['fileOutput'].

        When both 'RangeStart' and 'RangeEnd' are given, only that range is
        requested (x_ms_range 'bytes=<start>-<end>').  String and integer
        values are both accepted, and a start of 0 counts as given.

        The blob is fetched before the output file is opened, so a failed
        download does not truncate or create the output file.
        '''
        def _given(value):
            # Only a missing or empty value means "not specified";
            # an integer 0 is a valid offset.
            return value is not None and value != ''

        kwargs = {}
        range_start = in_json.get('RangeStart')
        range_end = in_json.get('RangeEnd')
        if _given(range_start) and _given(range_end):
            kwargs['x_ms_range'] = 'bytes={0}-{1}'.format(range_start, range_end)
        data = self._azure.get_blob(
            in_json['container'],
            in_json['blob'],
            **kwargs)
        with open(in_json['fileOutput'], "wb") as stream:
            stream.write(data)
        return {'success': True}

    def createBlobBlock(self, in_json):
        '''
        Upload the whole of local file in_json['fileInput'] as one
        uncommitted block of blob in_json['blob'].  The block id is built
        from 'idPrefix' and 'counter'.
        '''
        # FIXME do seek here, do not chunk in c++
        # Block payloads are raw bytes: read in binary mode.
        with open(in_json['fileInput'], "rb") as stream:
            data = stream.read()
        self._azure.put_block(
            in_json['container'],
            in_json['blob'],
            data,
            _convert_block_id(in_json['idPrefix'], in_json['counter']))
        return {'success': True}

    def listBlobBlocks(self, in_json):
        '''Return {block id: size} maps of committed and uncommitted blocks.'''
        res = self._azure.get_block_list(
            in_json['container'],
            in_json['blob'],
            blocklisttype='all')
        return {
            'success': True,
            'Uncommitted': dict((rec.id, rec.size) for rec in res.uncommitted_blocks),
            'Committed': dict((rec.id, rec.size) for rec in res.committed_blocks)
        }

    def commitBlobBlocks(self, in_json):
        '''
        Commit blocks 0 .. counter-1 (ids built from 'idPrefix') as the
        content of blob in_json['blob'].
        '''
        prefix = in_json['idPrefix']
        block_ids = [_convert_block_id(prefix, i)
                     for i in range(in_json['counter'])]
        res = self._azure.put_block_list(
            in_json['container'],
            in_json['blob'],
            block_ids)
        return {'success': True,
                'Properties': _convert_properties_by_commit(res)
                }
def start_server():
    '''
    Run the agent's request loop over stdin/stdout.

    1. Create the Azure client.  On failure, write the converted exception
       and return False.
    2. Otherwise write 'start', then for each request dict:
       - remove its 'fn' entry;
       - call the public AzureBlobStoreageApi method of that name with the
         remaining keys;
       - write the reply, or the converted exception if the call failed.

    auth_done becomes True after the first reply whose 'success' is truthy
    and is passed to the error converter.

    :return: False if the client could not be created, True when the
        request stream ends (EOF on stdin).
    '''
    io = SimpleIO()
    try:
        api = AzureBlobStoreageApi()
    except Exception as e:
        io.write_exception(e)
        return False
    io.write_string('start')
    auth_done = False
    while True:
        try:
            in_json = io.read_json()
            fn_name = in_json.pop('fn')
            # Only public handler methods may be invoked by name.  getattr()
            # needs a default here, otherwise it raises AttributeError and
            # the "no such fn" error below can never be reported.
            fn = None
            if not fn_name.startswith('_'):
                fn = getattr(api, fn_name, None)
            if fn is None or not callable(fn):
                raise SystemError('no such fn: ' + fn_name)
            res = fn(in_json)
            if not auth_done and res['success']:
                auth_done = True
            io.write_json(res)
        except StopIteration:
            # EOF on stdin: the parent closed the request stream.
            break
        except Exception as e:
            io.write_exception(e, auth_done)
    # Normal shutdown; the caller maps this to exit status 0.
    return True
if __name__ == '__main__':
    # Put the script's own directory, then the bundled azure-sdk-for-python
    # checkout, at the front of the module search path.
    _script_dir = os.path.dirname(os.path.realpath(sys.argv[0]))
    sys.path[0:0] = [_script_dir, _script_dir + '/azure-sdk-for-python']
    sys.exit(0 if start_server() else 1)