187 lines
5.2 KiB
Python
187 lines
5.2 KiB
Python
"""
|
|
XML-RPC Client with asyncio.
|
|
|
|
This module adapts the ``xmlrpc.client`` module of the standard library to
|
|
work with asyncio.
|
|
|
|
"""
|
|
|
|
import sys
|
|
import asyncio
|
|
import logging
|
|
import aiohttp
|
|
import inspect
|
|
import functools
|
|
|
|
from xmlrpc import client as xmlrpc
|
|
|
|
def coroutine(fn):
    """Return *fn* as a native coroutine function.

    Three kinds of callables are handled:

    * an ``async def`` function is returned unchanged;
    * a generator function (one written with ``yield from``) is converted
      with ``types.coroutine`` and awaited, so the caller receives the
      generator's return value instead of an un-run generator object;
    * any other callable is called and its result is returned as is.

    In the last two cases the result is an ``async def`` wrapper that carries
    the metadata of *fn* (``functools.wraps``).
    """
    # Local import: the helper stays self-contained and the module-level
    # import list does not have to change.
    import types

    if inspect.iscoroutinefunction(fn):
        return fn

    if inspect.isgeneratorfunction(fn):
        # ``types.coroutine`` allows the generator to ``yield from``
        # awaitables and makes the generator object it returns awaitable.
        generator_coroutine = types.coroutine(fn)

        @functools.wraps(fn)
        async def _generator_wrapper(*args, **kwargs):
            return await generator_coroutine(*args, **kwargs)

        return _generator_wrapper

    @functools.wraps(fn)
    async def _wrapper(*args, **kwargs):
        return fn(*args, **kwargs)

    return _wrapper
|
|
|
|
|
|
# Public API of the module.  The name has to be the lowercase ``__all__``:
# that is the only spelling ``from <module> import *`` looks at.
__all__ = ['ServerProxy', 'Fault', 'ProtocolError']
# Previous (misspelled) name, kept as an alias for code that may refer to it.
__ALL__ = __all__

# Re-exported so that callers do not have to import xmlrpc.client themselves.
Fault = xmlrpc.Fault
ProtocolError = xmlrpc.ProtocolError

log = logging.getLogger(__name__)
# True when running on Python 3.5 or later.
PY35 = sys.version_info >= (3, 5)
|
|
|
|
|
|
class _Method:
|
|
# some magic to bind an XML-RPC method to an RPC server.
|
|
# supports "nested" methods (e.g. examples.getStateName)
|
|
def __init__(self, send, name):
|
|
self.__send = send
|
|
self.__name = name
|
|
|
|
def __getattr__(self, name):
|
|
return _Method(self.__send, "%s.%s" % (self.__name, name))
|
|
|
|
@coroutine
|
|
def __call__(self, *args):
|
|
ret = yield from self.__send(self.__name, args)
|
|
return ret
|
|
|
|
|
|
class AioTransport(xmlrpc.Transport):
    """
    ``xmlrpc.Transport`` subclass for asyncio support.

    Requests are sent with ``session.request(...)`` (awaited) and the
    response text is parsed with the parser/unmarshaller pair returned by
    ``getparser()``.
    """

    def __init__(self, session, use_https, *, use_datetime=False,
                 use_builtin_types=False, loop, headers=None, auth=None):
        """
        :param session: object whose ``request`` coroutine performs the call
        :param use_https: build ``https`` urls when true, ``http`` otherwise
        :param use_datetime: forwarded to ``xmlrpc.Transport``
        :param use_builtin_types: forwarded to ``xmlrpc.Transport``
        :param loop: stored on the instance; not used by this class itself
        :param headers: headers sent with every request; a default
            ``text/xml`` set is used when empty or ``None``
        :param auth: passed as ``auth=`` to ``session.request``
        """
        super().__init__(use_datetime, use_builtin_types)
        self.use_https = use_https
        self._loop = loop
        self._session = session

        self.auth = auth

        if not headers:
            headers = {'User-Agent': 'python/aioxmlrpc',
                       'Accept': 'text/xml',
                       'Content-Type': 'text/xml'}

        self.headers = headers

    async def request(self, host, handler, request_body, verbose=False):
        """
        Send the XML-RPC request, return the parsed response.

        This method is a coroutine.

        :raises ProtocolError: when the response status is not 200, or when
            any other exception occurs while sending the request or reading
            the response (that exception is logged and chained as the cause;
            the error code is 0 if no response was received).
        """
        url = self._build_url(host, handler)
        response = None
        try:
            response = await self._session.request(
                'POST', url, headers=self.headers, data=request_body,
                auth=self.auth)
            body = await response.text()
            if response.status != 200:
                raise ProtocolError(url, response.status,
                                    body, response.headers)
        except (asyncio.CancelledError, ProtocolError):
            # Cancellation and our own protocol errors propagate untouched.
            raise
        except Exception as exc:
            log.error('Unexpected error', exc_info=True)
            if response is not None:
                errcode = response.status
                headers = response.headers
            else:
                # The failure happened before any response was received.
                errcode = 0
                headers = {}

            raise ProtocolError(url, errcode, str(exc), headers) from exc
        return self.parse_response(body)

    def parse_response(self, body):
        """
        Parse the xmlrpc response.

        Feeds ``body`` to the parser returned by ``getparser()`` and returns
        what the unmarshaller's ``close()`` returns.
        """
        p, u = self.getparser()
        p.feed(body)
        p.close()
        return u.close()

    def _build_url(self, host, handler):
        """
        Build a url for our request based on the host, handler and use_https
        property
        """
        scheme = 'https' if self.use_https else 'http'
        return '%s://%s%s' % (scheme, host, handler)
|
|
|
|
|
|
class ServerProxy(xmlrpc.ServerProxy):
    """
    ``xmlrpc.ServerProxy`` subclass for asyncio support.

    Remote methods are reached through attribute access and must be awaited.
    The proxy can be used with ``async with``; on exit (or on ``close()``)
    the HTTP session is closed only if the proxy created it itself.

    The class deliberately has the same name as its parent: private names
    such as ``self.__encoding`` are mangled to ``_ServerProxy__encoding`` and
    therefore refer to the attributes set by the parent ``__init__``.
    """

    def __init__(self, uri, session=None, encoding=None, verbose=False,
                 allow_none=False, use_datetime=False, use_builtin_types=False,
                 loop=None, auth=None, headers=None):
        """
        :param uri: url of the XML-RPC server
        :param session: existing session to use; when ``None`` an
            ``aiohttp.ClientSession`` is created and owned by the proxy
        :param loop: event loop; defaults to ``asyncio.get_event_loop()``
        :param auth: forwarded to the transport
        :param headers: forwarded to the transport

        The remaining parameters are forwarded to ``xmlrpc.ServerProxy``;
        ``use_datetime`` and ``use_builtin_types`` are also given to the
        transport, which is where response parsing takes place.
        """
        # NOTE(review): asyncio.get_event_loop() without a running loop is
        # deprecated in recent Python versions - prefer creating the proxy
        # from within a coroutine or passing ``loop`` explicitly.
        self._loop = loop or asyncio.get_event_loop()

        if session is not None:
            self._session = session
            self._close_session = False
        else:
            self._close_session = True
            self._session = aiohttp.ClientSession(loop=self._loop)

        # Compare the scheme case-insensitively so that an upper-case
        # ``HTTPS://`` uri is not sent over plain http.
        transport = AioTransport(use_https=uri.lower().startswith('https://'),
                                 use_datetime=use_datetime,
                                 use_builtin_types=use_builtin_types,
                                 loop=self._loop,
                                 session=self._session,
                                 auth=auth,
                                 headers=headers)

        super().__init__(uri, transport, encoding, verbose, allow_none,
                         use_datetime, use_builtin_types)

    async def __request(self, methodname, params):
        """
        Call a method on the remote server and return its result.

        A response holding exactly one value is unwrapped to that value.
        """
        # Characters that the target encoding cannot represent are written
        # as XML character references, as the standard library does.
        request = xmlrpc.dumps(
            params, methodname, encoding=self.__encoding,
            allow_none=self.__allow_none,
        ).encode(self.__encoding, 'xmlcharrefreplace')

        response = await self.__transport.request(
            self.__host,
            self.__handler,
            request,
            verbose=self.__verbose
        )

        if len(response) == 1:
            response = response[0]

        return response

    async def close(self):
        """Close the session, but only if it was created by this proxy."""
        if self._close_session:
            await self._session.close()

    def __getattr__(self, name):
        # Any unknown attribute is treated as the name of a remote method.
        return _Method(self.__request, name)

    # No version guard is needed here: the module already relies on
    # ``async def``, which requires Python 3.5 or later.
    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()
|