HEX
Server: Apache/2.2.34 (Unix) mod_fastcgi/mod_fastcgi-SNAP-0910052141
System: Linux Kou-Etsu-Dou 4.4.59+ #25556 SMP PREEMPT Thu Mar 4 18:03:46 CST 2021 x86_64
User: hosam (1026)
PHP: 7.2.29
Disabled: NONE
Upload Files
File: /volume1/@appstore/HyperBackup/addon/azure_blob/python/azure_agent.py
#!/usr/bin/env python
#-*-coding: utf-8 -*-

import os, sys, json, ssl
from struct import pack, unpack
from time import strptime, mktime
from syslog import syslog, LOG_ERR

debug = False
def log_debug(*args):
	if debug:
		print >> sys.stderr, ' '.join(args)

class AzureErrorBody():
	'''
	Windows Azure Error Body class from xml.
	https://msdn.microsoft.com/en-us/library/azure/dd179382.aspx
	'''
	# parsing something like:
	#<?xml version="1.0" encoding="utf-8"?>
	#<Error>
	#	<Code>...</Code>
	#	<Message>...</Message>
	#	<AuthenticationErrorDetail>...</AuthenticationErrorDetail>
	#	<QueryParameterName>...</QueryParameterName>
	#	<QueryParameterValue>...</QueryParameterValue>
	#	<Reason>...</Reason>
	#</Error>

	def __init__(self):
		self.code = u''
		self.message = u''
		self.authenticationerrordetail = u''

def _is_scoket_exception(e):
	import socket
	if type(e) == socket.gaierror or type(e) == socket.error or type(e) == socket.herror or type(e) == socket.timeout:
		return True
	return False

def _convert_socket_error_code(message, auth_done):
	error_code = -1
	if -1 != message.find("[Errno -2] Name or service not known"):
		if auth_done:
			error_code = -4
		else:
			error_code = -2
	elif -1 != message.find("[Errno -3] Temporary failure in name resolution"):
		error_code = -4
	elif -1 != message.find("[Errno -5] No address associated with hostname"):
		error_code = -4
	elif -1 != message.find("[Errno 110] Connection timed out"):
		error_code = 408
	elif -1 != message.find("[Errno 101] Network is unreachable"):
		error_code = -4
	elif -1 != message.find("[Errno 111] Connection refused"):
		error_code = -4
	elif -1 != message.find("[Errno 113] No route to host"):
		error_code = -4
	elif -1 != message.find("[Errno 104] Connection reset by peer"):
		error_code = -4
	elif -1 != message.find("[Errno 32] Broken pipe"):
		error_code = 408
	return error_code

def _is_ssl_timeout(message):
	#ssl.SSLErr_timeouteption log
	if -1 != message.find("timed out"):
		# The read operation timed out
		# The write operation timed out
		# The handshake operation timed out
		return True
	return False

def _convert_socket_exception(e, auth_done):
	import socket

	error_code = -1
	error_msg = 'Unknown'
	error_cls = 'Unknown'

	try:
		error_cls = type(e).__name__
		if hasattr(e, '__module__') and e.__module__:
			error_cls = e.__module__ + '.' + error_cls

		if type(e) == socket.gaierror:
			if hasattr(e, 'strerror') and e.strerror:
				error_msg = e.strerror
			if e.errno == -2:
				# socket.gaierror: [Errno -2] Name or service not known
				# 1. dns not set
				# 2. set to a wrong dns
				if auth_done:
					error_code = -4
				else:
					error_code = -2
			elif e.errno == -3:
				# socket.gaierror: [Errno -3] Temporary failure in name resolution
				# man gai_strerror, and find EAI_AGAIN:
				# The name server returned a temporary failure indication.  Try again later.
				error_code = -4
			elif e.errno == -5:
				# socket.gaierror: [Errno -5] No address associated with hostname
				# man gai_strerror, and find EAI_NODATA:
				# The specified network host exists, but does not have any network addresses defined.
				error_code = -4
			elif e.errno < 100 and e.errno > 0:
				error_code = e.errno
		elif type(e) == socket.error or type(e) == socket.herror:
			if hasattr(e, 'strerror') and e.strerror:
				error_msg = e.strerror
			elif hasattr(e, 'args'):
				error_msg = str(e.args)
				if -1 != error_msg.find('Tunnel connection failed'):
					error_code = -4
			if e.errno == socket.errno.ETIMEDOUT:
				# socket.error: [Errno 110] Connection timed out
				error_code = 408
			elif e.errno == socket.errno.ENETUNREACH:
				# socket.error: [Errno 101] Network is unreachable
				error_code = -4
			elif e.errno == socket.errno.ECONNREFUSED:
				# socket.error: [Errno 111] Connection refused
				error_code = -4
			elif e.errno == socket.errno.EHOSTUNREACH:
				# socket.error: [Errno 113] No route to host
				error_code = -4
			elif e.errno == socket.errno.ECONNRESET:
				# socket.error: [Errno 104] Connection reset by peer
				error_code = -4
			elif e.errno == socket.errno.EPIPE:
				# socket.error: [Errno 32] Broken pipe
				error_code = 408
			elif e.errno < 100 and e.errno > 0:
				error_code = e.errno
			#syslog(LOG_ERR, "get network error %d:%s" % (e.errno, error_msg))
		elif type(e) == socket.timeout:
			error_msg = 'timed out'
			error_code = 408
		else:
			syslog(LOG_ERR, "BUG: exception [%s], type [%s]" % (str(e), type(e)))
		#syslog(LOG_ERR, "error %s:%d:%s" % (error_cls, error_code, str(error_msg)))

		if -1 == error_code:
			if hasattr(e, "args"):
				syslog(LOG_ERR, "error args: [%s]" % (str(e.args)))
			if hasattr(e, "errno"):
				syslog(LOG_ERR, "error errno: [%s]" % (str(e.errno)))
			if hasattr(e, "strerror"):
				syslog(LOG_ERR, "error strerror: [%s]" % (str(e.strerror)))
	except Exception as ee:
		syslog(LOG_ERR, "parse socket exception failed: [%s]" % (str(ee)))
		pass

	return {
		'success': False,
		'error_class': error_cls,
		'error_message': error_msg,
		'error_code': error_code
	}

def _convert_exception(e, auth_done):
	import azure
	import httplib
	#import pdb; pdb.set_trace()

	error_code = -1
	error_msg = 'Unknown'
	error_cls = 'Unknown'

	try:
		error_cls = type(e).__name__
		if hasattr(e, '__module__') and e.__module__:
			error_cls = e.__module__ + '.' + error_cls

		try:
			if type(e.message) is unicode:
				error_msg = repr(e.message).split('\\n')[0]
			else:
				error_msg = str(e.message).split('\n')[0]
			log_debug("\033[31mexception: \033[0m", error_msg)
		except:
			log_debug("\033[31mexception: \033[0m", "could not parse error msg")

		if type(e) == azure.WindowsAzureConflictError:
			error_code = 409
		elif type(e) == azure.WindowsAzureMissingResourceError:
			error_code = 404
		elif type(e) == azure.WindowsAzureError:
			if hasattr(e, "_http_code"):
				if e._http_code >= 100 and e._http_code < 700:
					error_code = e._http_code
			if hasattr(e, "_resp_body"):
				node = azure.ETree.fromstring(e._resp_body)
				error_body = azure._ETreeXmlToObject.fill_instance_element(
						node, AzureErrorBody)
				if error_body.code == 'AuthenticationFailed':
					error_code = -2
					if error_body.authenticationerrordetail:
						error_msg = error_body.authenticationerrordetail

			if error_msg == azure._ERROR_STORAGE_MISSING_INFO:
				error_code = -2
		elif type(e) == TypeError:
			# this exception will raise when base64 decode for auth failed
			if error_msg == "Incorrect padding":
				error_code = -2
		elif type(e) == UnicodeDecodeError:
			error_code = -5
		elif type(e) == httplib.ResponseNotReady:
			error_code = -4
		elif type(e) == httplib.BadStatusLine:
			error_code = -4
		elif type(e) == httplib.IncompleteRead:
			error_code = -4
		elif type(e) == ssl.SSLError:
			error_code = -4
			if hasattr(e, "strerror") and e.strerror:
				# SSL: DECRYPTION_FAILED_OR_BAD_RECORD_MAC
				# SSL: SSLV3_ALERT_BAD_RECORD_MAC
				# SSL: CERTIFICATE_VERIFY_FAILED
				error_msg = e.strerror
			if _is_ssl_timeout(str(error_msg)):
				error_code = 408
		elif type(e) == ssl.SSLEOFError:
			# ssl.SSLEOFError: [Errno 8] EOF occurred in violation of protocol
			if hasattr(e, 'strerror') and e.strerror:
				error_msg = e.strerror
			error_code = -4
		elif type(e) == IOError:
			if hasattr(e, 'strerror') and e.strerror:
				error_msg = e.strerror
			error_code = _convert_socket_error_code(str(e), auth_done)
		elif _is_scoket_exception(e):
			return _convert_socket_exception(e, auth_done)
		else:
			syslog(LOG_ERR, "exception [%s]" % str(e))
			syslog(LOG_ERR, "type [%s]" % type(e))
		#syslog(LOG_ERR, "error %s:%d:%s" % (error_cls, error_code, str(error_msg)))

		if -1 == error_code:
			if hasattr(e, "args"):
				syslog(LOG_ERR, "error args: [%s]" % (str(e.args)))
			if hasattr(e, "errno"):
				syslog(LOG_ERR, "error errno: [%s]" % (str(e.errno)))
			if hasattr(e, "strerror"):
				syslog(LOG_ERR, "error strerror: [%s]" % (str(e.strerror)))
	except Exception as ee:
		syslog(LOG_ERR, "parse exception failed. %s" % str(ee))
		pass

	return {
		'success': False,
		'error_class': error_cls,
		'error_message': error_msg,
		'error_code': error_code
	}
def _convert_time_str(val):
	origin_tz = None
	if os.environ.get('TZ'):
		origin_tz = os.environ.get('TZ')
	os.environ['TZ'] = 'GMT'

	timestamp = int(mktime(strptime(val, '%a, %d %b %Y %H:%M:%S %Z')))

	if origin_tz:
		os.environ['TZ'] = origin_tz
	return timestamp
def _convert_properties(properties):
	out_json = {
		# Fri, 25 Jul 2014 09:36:56 GMT
		'LastModified': _convert_time_str(properties.last_modified),
		'MD5': properties.content_md5,
		'ETag': properties.etag,
		'ContentType': properties.content_type,
		'ContentLength': properties.content_length,
		'BlobType': properties.blob_type
	}
	return out_json
def _convert_properties_by_dict(properties):
	out_json = {
		'LastModified': _convert_time_str(properties['last-modified']),
		'MD5': properties.get('content-md5', ''),
		# remove etag dobule quote
		'ETag': properties['etag'][1:-1],
		'ContentLength': int(properties['content-length']),
		'BlobType': properties['x-ms-blob-type']
	}
	if 'content-type' in properties:
		out_json['ContentType'] = properties['content-type'];
	if 'x-ms-meta-hdi_isfolder' in properties:
		out_json['hdi_isfolder'] = 'true' == properties['x-ms-meta-hdi_isfolder'];
	return out_json
def _convert_properties_by_put(properties):
	out_json = {
		'LastModified': _convert_time_str(properties['last-modified']),
		'MD5': properties['content-md5'],
		# remove etag dobule quote
		'ETag': properties['etag'][1:-1],
	}
	return out_json
def _convert_properties_by_commit(properties):
	out_json = {
		'LastModified': _convert_time_str(properties['last-modified']),
		'MD5': '',
		# remove etag dobule quote
		'ETag': properties['etag'][1:-1],
	}
	return out_json
def _convert_block_id(id_prefix, counter):
	return id_prefix + '_{0:08d}'.format(counter)

class SimpleIO(object):
	def read_int(self):
		data = ''
		n = 4
		while n > 0:
			read_data = sys.stdin.read(n)
			if 0 == len(read_data):
				raise StopIteration
			data += read_data
			n -= len(read_data)
		if data:
			return unpack('i', data)[0]
		else:
			# check eof?
			raise StopIteration
	def read_string(self):
		n = self.read_int()
		if 0 == n:
			return ''
		data = ''
		while n > 0:
			read_data = sys.stdin.read(n)
			if 0 == len(read_data):
				raise StopIteration
			data += read_data
			n -= len(read_data)
		if data:
			return data
		else:
			raise SystemError
	def read_json(self):
		json_str = self.read_string()
		if json_str:
			return json.loads(json_str)
		else:
			return None
	def write_int(self, val):
		data = pack('i', val)
		sys.stdout.write(data)
		sys.stdout.flush()
	def write_string(self, val):
		self.write_int(len(val))
		sys.stdout.write(val)
		sys.stdout.flush()
		# log
	def write_json(self, val):
		s = json.dumps(val)
		self.write_string(s)
	def write_exception(self, e, auth_done = False):
		self.write_json(_convert_exception(e, auth_done))

class AzureBlobStoreageApi(object):
	def __init__(self):
		from azure.storage import BlobService

		blob_service_opt = {
			"account_name": os.environ.get('AZURE_ACCESS_KEY'),
			"account_key": os.environ.get('AZURE_SECRET_KEY'),
			"protocol": os.environ.get('AZURE_SCHEME', 'https')
			}

		if os.environ.get('AZURE_HOST_BASE') == 'china':
			blob_service_opt["host_base"] = '.blob.core.chinacloudapi.cn'
		elif os.environ.get('AZURE_HOST_BASE'):
			blob_service_opt["host_base"] = os.environ.get('AZURE_HOST_BASE')

		# os.environ['AZURE_DEBUG'] = 'yes'
		self._azure = BlobService(**blob_service_opt)

		del os.environ['AZURE_ACCESS_KEY']
		del os.environ['AZURE_SECRET_KEY']
		log_debug('agent created')

	def createContainer(self, in_json):
		kwargs = {
			'fail_on_exist': True
		}

		if in_json.get('PublicAccess'):
			kwargs['x_ms_blob_public_access'] = in_json['PublicAccess']

		res = self._azure.create_container(in_json['container'], **kwargs)

		return {'success': res}
	def getContainerProperties(self, in_json):
		kwargs = {}

		res = self._azure.get_container_properties(in_json['container'])

		return {
			'success': True,
			'LastModified': _convert_time_str(res['last-modified']),
			'ETag': res['etag']
		}
	def listContainers(self, in_json):
		kwargs = {}
		# FIXME add prefix and marker to ta

		for key in ('Prefix', 'Marker'):
			if in_json.get(key):
				kwargs[key.lower()] = in_json[key]

		res = self._azure.list_containers(**kwargs)

		out_json = {
			'success': True,
			'container': []
		}
		for rec in res:
			out_json['container'].append({
				'Name': rec.name
			})

		return out_json
	def deleteContainer(self, in_json):
		kwargs = {
			'fail_not_exist': True
		}

		res = self._azure.delete_container(in_json['container'], **kwargs)

		return {'success': res}

	def createBlockBlob(self, in_json):
		kwargs = {}

		res = self._azure.put_block_blob_from_path(
				in_json['container'],
				in_json['blob'],
				in_json['fileInput'],
				**kwargs)

		return {'success': True,
				'Properties': _convert_properties_by_put(res)
		}
	def listBlobs(self, in_json):
		kwargs = {}
		for key in ('Delimiter', 'Prefix', 'Marker'):
			if in_json.get(key):
				kwargs[key.lower()] = in_json[key]

		res = self._azure.list_blobs(in_json['container'], **kwargs)

		targetType = in_json.get('TargetType', 'all')
		count = 0
		out_json = {
			'success': True,
			'folder': [],
			'file': []
		}

		if targetType == 'folder' or targetType == 'all':
			for rec in res.prefixes:
				out_json['folder'].append({'Name': rec.name})
				count += 1
		if targetType == 'file' or targetType == 'all':
			for rec in res.blobs:
				out_json['file'].append({
					'Name': rec.name,
					'Properties': _convert_properties(rec.properties)
				})
				count += 1
		if res.next_marker:
			out_json['NextMarker'] = res.next_marker
		out_json['count'] = count
		return out_json
	def getBlobProperties(self, in_json):
		kwargs = {}

		res = self._azure.get_blob_properties(
				in_json['container'],
				in_json['blob'],
				**kwargs)

		return {
			'success': True,
			'Properties': _convert_properties_by_dict(res),
		}
	def deleteBlob(self, in_json):
		kwargs = {}
		kwargs['x_ms_delete_snapshots'] = 'include'

		res = self._azure.delete_blob(
				in_json['container'],
				in_json['blob'],
				**kwargs)

		return {'success': True}
	def getBlob(self, in_json):
		kwargs = {}

		if in_json.get('RangeStart') and in_json.get('RangeEnd'):
			kwargs['x_ms_range'] = 'bytes=' + in_json.get('RangeStart') + '-' + in_json.get('RangeEnd')

		with open(in_json['fileOutput'], "wb") as stream:
			res = self._azure.get_blob(
				in_json['container'],
				in_json['blob'],
				**kwargs)
			stream.write(res)

		return {'success': True}

	def createBlobBlock(self, in_json):
		kwargs = {}

		# FIXME do seek here, do not chunk in c++
		with open(in_json['fileInput'], "r") as stream:
			data = stream.read()
			self._azure.put_block(
					in_json['container'],
					in_json['blob'],
					data,
					_convert_block_id(in_json['idPrefix'], in_json['counter']),
					**kwargs)

		return {'success': True}
	def listBlobBlocks(self, in_json):
		kwargs = {}

		res = self._azure.get_block_list(
				in_json['container'],
				in_json['blob'],
				blocklisttype = 'all',
				**kwargs)

		out_json = {
			'success': True,
			'Uncommitted': {},
			'Committed': {}
		}
		for rec in res.committed_blocks:
			out_json['Committed'][rec.id] = rec.size
		for rec in res.uncommitted_blocks:
			out_json['Uncommitted'][rec.id] = rec.size

		return out_json
	def commitBlobBlocks(self, in_json):
		kwargs = {}

		res = self._azure.put_block_list(
				in_json['container'],
				in_json['blob'],
				map(lambda x, y: _convert_block_id(x, y), [in_json['idPrefix']] * in_json['counter'], range(0, in_json['counter'])),
				**kwargs)

		return {'success': True,
				'Properties': _convert_properties_by_commit(res)
		}

def start_server():
	io = SimpleIO()

	try:
		api = AzureBlobStoreageApi()
	except Exception as e:
		io.write_exception(e)
		return False

	io.write_string('start')
	auth_done = False

	while True:
		try:
			in_json = io.read_json()
			fn_name = in_json['fn']
			fn = getattr(api, fn_name)
			del in_json['fn']

			if fn is None:
				raise SystemError('no such fn: ' + fn_name)

			#log_debug('\033[34mexecute: ' + fn_name + ' ' + json.dumps(in_json) + '\033[0m');

			res = fn(in_json)
			if not auth_done and res['success']:
				auth_done = True
			io.write_json(res)
		except StopIteration:
			break
		except Exception as e:
			io.write_exception(e, auth_done)
			continue

if __name__ == '__main__':
	# add include path
	script_dir = os.path.dirname(os.path.realpath(sys.argv[0]))
	sys.path.insert(0, script_dir)
	sys.path.insert(1, script_dir + '/azure-sdk-for-python')

	res = start_server()
	sys.exit(0 if res else 1)