Compare commits
4 Commits
16d4b96c55
...
7685c36677
| Author | SHA1 | Date |
|---|---|---|
|
|
7685c36677 | |
|
|
1044d6224a | |
|
|
9aec9332fc | |
|
|
e00a4587f9 |
|
|
@ -0,0 +1,38 @@
|
||||||
|
# Communication pattern exercises
|
||||||
|
|
||||||
|
These scripts were developed for the courses 31380 and 31725 by
|
||||||
|
- Lasse Orda (Initial implementation)
|
||||||
|
- Tue Vissing Jensen (Updates, maintainer)
|
||||||
|
|
||||||
|
## Included scripts
|
||||||
|
|
||||||
|
### RPC
|
||||||
|
|
||||||
|
`rpc_example_runner.py text1 text2 text3 text4` ---
|
||||||
|
Requires that `rpc/rpc_example_server.py` is running.
|
||||||
|
Returns a the list of text strings given, but in reversed order.
|
||||||
|
|
||||||
|
`rpc_sync_pi_runner.py N` ---
|
||||||
|
Requires that `rpc/rpc_pi_server.py` is running.
|
||||||
|
Estimate pi by throwing N points in the unit circle, with the server taking over half the work.
|
||||||
|
|
||||||
|
`rpc_async_pi_runner.py N` ---
|
||||||
|
Requires that `rpc/rpc_pi_server.py` is running.
|
||||||
|
Estimate pi by throwing N points in the unit circle, with the server taking over half the work simultaneously.
|
||||||
|
|
||||||
|
### Pub/Sub
|
||||||
|
|
||||||
|
`pub_server.py` ---
|
||||||
|
A server which periodically publishes the current time.
|
||||||
|
|
||||||
|
`sub_client.py` ---
|
||||||
|
Subscribes to the server's messages and prints them. Exits after 5 messages.
|
||||||
|
|
||||||
|
### Broadcast
|
||||||
|
|
||||||
|
`broadcast_receiver.py`
|
||||||
|
|
||||||
|
`broadcast_listener.py`
|
||||||
|
|
||||||
|
`broadcaster.py` ---
|
||||||
|
Periodically broadcasts
|
||||||
|
|
@ -0,0 +1,43 @@
|
||||||
|
"""
|
||||||
|
Script which listens for messages on a hardcoded port 8881
|
||||||
|
|
||||||
|
@Author: orda
|
||||||
|
"""
|
||||||
|
import socket
|
||||||
|
import sys
|
||||||
|
from parse import parse
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
if len(sys.argv) > 1:
|
||||||
|
port = sys.argv[1]
|
||||||
|
int(port)
|
||||||
|
else:
|
||||||
|
port = 8881
|
||||||
|
|
||||||
|
my_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
|
my_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||||
|
my_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
||||||
|
my_socket.bind(('', port))
|
||||||
|
|
||||||
|
formatting_string = "Today's lottery number: {number}"
|
||||||
|
|
||||||
|
print('listener started...')
|
||||||
|
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
message, address = my_socket.recvfrom(port)
|
||||||
|
dmessage = message.decode('utf-8') # Decode to utf-8
|
||||||
|
print(f'received: {dmessage}, from: {address}')
|
||||||
|
# decoded will be a pair of a tuple and a dictionary which reflect the
|
||||||
|
# "reverse" of using .format on the first string.
|
||||||
|
decoded = parse(formatting_string, dmessage)
|
||||||
|
if decoded:
|
||||||
|
print(f' Decoded into: {decoded.named}')
|
||||||
|
print(f' Check that the string matches: {formatting_string.format(*decoded.fixed, **decoded.named)}')
|
||||||
|
finally:
|
||||||
|
my_socket.close()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
|
@ -0,0 +1,38 @@
|
||||||
|
"""
|
||||||
|
Script which broadcasts random integers to a hardcoded port 8881
|
||||||
|
|
||||||
|
@Author: orda
|
||||||
|
"""
|
||||||
|
|
||||||
|
import socket
|
||||||
|
import random
|
||||||
|
import time
|
||||||
|
import sys
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
if len(sys.argv) > 1:
|
||||||
|
port = sys.argv[1]
|
||||||
|
int(port)
|
||||||
|
else:
|
||||||
|
port = 8881
|
||||||
|
|
||||||
|
my_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
|
my_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
||||||
|
# Allow reuse in case we exited ungracefully
|
||||||
|
my_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||||
|
print('broadcaster started...')
|
||||||
|
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
# Make a random number and send it out.
|
||||||
|
number = random.randint(1, 101)
|
||||||
|
print("sending: ", number)
|
||||||
|
my_socket.sendto(f"Today's lottery number: {number}".encode('utf-8'), ('<broadcast>', 8881))
|
||||||
|
time.sleep(1)
|
||||||
|
finally:
|
||||||
|
my_socket.close()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
|
@ -0,0 +1,17 @@
|
||||||
|
import zmq
|
||||||
|
|
||||||
|
ip = "localhost"
|
||||||
|
heartbeat_publish_port = 10002
|
||||||
|
|
||||||
|
|
||||||
|
# Get a context we can use to make sockets
|
||||||
|
context = zmq.Context()
|
||||||
|
# Socket to talk to server
|
||||||
|
socket = context.socket(zmq.SUB)
|
||||||
|
socket.connect(f"tcp://{ip}:{heartbeat_publish_port}")
|
||||||
|
|
||||||
|
topicfilter = "HEARTBEAT"
|
||||||
|
socket.setsockopt_string(zmq.SUBSCRIBE, topicfilter)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
print(socket.recv_string())
|
||||||
|
|
@ -0,0 +1,31 @@
|
||||||
|
import zmq
|
||||||
|
from xmlrpc.server import SimpleXMLRPCServer
|
||||||
|
|
||||||
|
# Other connect on the server port.
|
||||||
|
heartbeat_server_port = 10001
|
||||||
|
heartbeat_publish_port = 10002
|
||||||
|
|
||||||
|
# Make a context which we use to make sockets from
|
||||||
|
context = zmq.Context()
|
||||||
|
# Make a new socket. We want to publish on this socket.
|
||||||
|
socket = context.socket(zmq.PUB)
|
||||||
|
# Bind the socket (inside our program) to a port (on our machine)
|
||||||
|
# We can now send messages
|
||||||
|
socket.bind(f"tcp://*:{heartbeat_publish_port}")
|
||||||
|
|
||||||
|
|
||||||
|
# Make a function for others to call letting us know they are alive.
|
||||||
|
def send_heartbeat(sender: str):
|
||||||
|
print(f"Received heartbeat from {sender}")
|
||||||
|
# Publish who is alive now
|
||||||
|
socket.send_string(f"HEARTBEAT;{sender}")
|
||||||
|
# Return something just to show we succeeded
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
# Make an RPC server to serve that function to others
|
||||||
|
server = SimpleXMLRPCServer(('localhost', heartbeat_server_port))
|
||||||
|
# Register the function
|
||||||
|
server.register_function(send_heartbeat, 'send_heartbeat')
|
||||||
|
# Start up the server
|
||||||
|
server.serve_forever()
|
||||||
|
|
@ -0,0 +1,11 @@
|
||||||
|
from xmlrpc.client import ServerProxy
|
||||||
|
from time import sleep
|
||||||
|
|
||||||
|
heartbeat_server_port = 10001
|
||||||
|
|
||||||
|
|
||||||
|
with ServerProxy(f'http://localhost:{heartbeat_server_port}') as proxy:
|
||||||
|
# Periodically send a heartbeat
|
||||||
|
while True:
|
||||||
|
proxy.send_heartbeat('BATTERY')
|
||||||
|
sleep(1)
|
||||||
|
|
@ -0,0 +1,32 @@
|
||||||
|
import zmq
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import random
|
||||||
|
|
||||||
|
port = "5556"
|
||||||
|
if len(sys.argv) > 1:
|
||||||
|
port = sys.argv[1]
|
||||||
|
int(port)
|
||||||
|
|
||||||
|
# Make a context which we use to make sockets from
|
||||||
|
context = zmq.Context()
|
||||||
|
# Make a new socket. We want to publish on this socket.
|
||||||
|
socket = context.socket(zmq.PUB)
|
||||||
|
# Bind the socket (inside our program) to a port (on our machine)
|
||||||
|
# We can now send messages
|
||||||
|
socket.bind(f"tcp://*:{port}")
|
||||||
|
|
||||||
|
topics = ('TIME', 'RANDOM')
|
||||||
|
messages = {}
|
||||||
|
while True:
|
||||||
|
# Time to publish the latest time!
|
||||||
|
messages['TIME'] = time.ctime()
|
||||||
|
messages['RANDOM'] = random.randint(1,10)
|
||||||
|
# Note the use of XXX_string here;
|
||||||
|
# the non-_stringy methods only work with bytes.
|
||||||
|
for topic in topics:
|
||||||
|
message = messages.get(topic, '')
|
||||||
|
if not message: continue
|
||||||
|
socket.send_string(f"{topic};{message}")
|
||||||
|
print(f"Published topic {topic}: {message}")
|
||||||
|
time.sleep(1)
|
||||||
|
|
@ -0,0 +1,47 @@
|
||||||
|
import zmq
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
from zmq.utils.monitor import recv_monitor_message
|
||||||
|
|
||||||
|
port = "5556"
|
||||||
|
if len(sys.argv) > 1:
|
||||||
|
port = sys.argv[1]
|
||||||
|
int(port)
|
||||||
|
|
||||||
|
context = zmq.Context()
|
||||||
|
socket = context.socket(zmq.PUB)
|
||||||
|
socket.bind(f"tcp://*:{port}")
|
||||||
|
|
||||||
|
# Get a monitoring socket where we can sniff information about new subscribers.
|
||||||
|
monitor = socket.get_monitor_socket()
|
||||||
|
|
||||||
|
sub_list = set()
|
||||||
|
|
||||||
|
topic = "TIME"
|
||||||
|
while True:
|
||||||
|
# Run through monitoring messages and check if we have new subscribers
|
||||||
|
# Note you can delete this entire
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
# We include a NOBLOCK flag here to not hang until a status message comes in.
|
||||||
|
# If no messages are ready, zmq.Again will be raised, which we catch below.
|
||||||
|
status = recv_monitor_message(monitor, flags=zmq.NOBLOCK)
|
||||||
|
print(f"Status: {status}")
|
||||||
|
if status['event'] == zmq.EVENT_ACCEPTED:
|
||||||
|
# New subscriber, add them to our list of subscribers.
|
||||||
|
print(f"Subscriber '{status['value']}' has joined :D")
|
||||||
|
sub_list.add(status['value'])
|
||||||
|
if status['event'] == zmq.EVENT_DISCONNECTED:
|
||||||
|
# Someone left, remove them from our list.
|
||||||
|
print(f"Subscriber '{status['value']}' has left :(")
|
||||||
|
sub_list.remove(status['value'])
|
||||||
|
except zmq.Again as e:
|
||||||
|
# No more new subscribers - let's stop looking for them
|
||||||
|
break
|
||||||
|
# Time to publish the latest time!
|
||||||
|
messagedata = time.ctime()
|
||||||
|
# Note the use of XXX_string here;
|
||||||
|
# the non-_string-y methods only work with bytes.
|
||||||
|
socket.send_string(f"{topic};{messagedata}")
|
||||||
|
print(f"Published topic {topic}: {messagedata} to subscribers: {sub_list}")
|
||||||
|
time.sleep(1)
|
||||||
|
|
@ -0,0 +1,31 @@
|
||||||
|
import sys
|
||||||
|
import zmq
|
||||||
|
|
||||||
|
ip = "localhost"
|
||||||
|
port = "5556"
|
||||||
|
if len(sys.argv) > 1:
|
||||||
|
port = sys.argv[1]
|
||||||
|
int(port)
|
||||||
|
|
||||||
|
# Socket to talk to server
|
||||||
|
context = zmq.Context()
|
||||||
|
socket = context.socket(zmq.SUB)
|
||||||
|
|
||||||
|
print(f"Collecting updates from time server at tcp://localhost:{port}")
|
||||||
|
socket.connect(f"tcp://{ip}:{port}")
|
||||||
|
|
||||||
|
# Filter by topic
|
||||||
|
topicfilters = ("TIME", "RANDOM")
|
||||||
|
socket.setsockopt_string(zmq.SUBSCRIBE, topicfilters[0])
|
||||||
|
socket.setsockopt_string(zmq.SUBSCRIBE, topicfilters[1])
|
||||||
|
|
||||||
|
# Process 5 updates
|
||||||
|
topic_list = []
|
||||||
|
for update_nbr in range(5):
|
||||||
|
string = socket.recv_string()
|
||||||
|
topic, messagedata = string.split(';')
|
||||||
|
topic_list.append(messagedata)
|
||||||
|
print(f"Received on topic {topic}: {messagedata}")
|
||||||
|
socket.close()
|
||||||
|
t_str = "\n".join(topic_list)
|
||||||
|
print(f"All the times we received: \n{t_str}")
|
||||||
|
|
@ -0,0 +1,50 @@
|
||||||
|
import sys
|
||||||
|
import zmq
|
||||||
|
from time import sleep
|
||||||
|
|
||||||
|
"""
|
||||||
|
This version of the subscriber doesn't hang
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
ip = "localhost"
|
||||||
|
port = "5556"
|
||||||
|
if len(sys.argv) > 1:
|
||||||
|
port = sys.argv[1]
|
||||||
|
int(port)
|
||||||
|
|
||||||
|
# Make a context we can use to get a socket
|
||||||
|
context = zmq.Context()
|
||||||
|
# Grab a socket that can connect to the server
|
||||||
|
socket = context.socket(zmq.SUB)
|
||||||
|
|
||||||
|
# Connect to the server by saying where we can get our measurements from
|
||||||
|
print(f"Collecting updates from time server at tcp://localhost:{port}")
|
||||||
|
socket.connect(f"tcp://{ip}:{port}")
|
||||||
|
|
||||||
|
# Filter by topic - we are only interested in knowing about the time
|
||||||
|
topicfilter = "TIME"
|
||||||
|
socket.setsockopt_string(zmq.SUBSCRIBE, topicfilter)
|
||||||
|
|
||||||
|
# Process 5 updates
|
||||||
|
topic_list = []
|
||||||
|
# We need to keep track of how many we've received
|
||||||
|
# since the loop may end with no messages
|
||||||
|
try:
|
||||||
|
while len(topic_list) < 5:
|
||||||
|
try: # Need to use a try block since zmq uses a
|
||||||
|
# giving the flag NOBLOCK indicates to zmq that we don't want to
|
||||||
|
# hang waiting for a message.
|
||||||
|
string = socket.recv_string(zmq.NOBLOCK)
|
||||||
|
except zmq.Again:
|
||||||
|
print("No message this time :(")
|
||||||
|
continue
|
||||||
|
topic, messagedata = string.split(';')
|
||||||
|
topic_list.append(messagedata)
|
||||||
|
print(f"Received on topic {topic}: {messagedata}")
|
||||||
|
#
|
||||||
|
finally
|
||||||
|
sleep(0.2)
|
||||||
|
socket.close()
|
||||||
|
t_str = "\n".join(topic_list)
|
||||||
|
print(f"All the times we received: \n{t_str}")
|
||||||
|
|
@ -0,0 +1,12 @@
|
||||||
|
aiohttp==3.9.5
|
||||||
|
aiosignal==1.3.1
|
||||||
|
attrs==23.2.0
|
||||||
|
frozenlist==1.4.1
|
||||||
|
greenlet==3.0.3
|
||||||
|
idna==3.7
|
||||||
|
msgpack==1.0.8
|
||||||
|
multidict==6.0.5
|
||||||
|
parse==1.20.2
|
||||||
|
pynvim==0.5.0
|
||||||
|
pyzmq==26.0.3
|
||||||
|
yarl==1.9.4
|
||||||
Binary file not shown.
Binary file not shown.
|
|
@ -0,0 +1,186 @@
|
||||||
|
"""
|
||||||
|
XML-RPC Client with asyncio.
|
||||||
|
|
||||||
|
This module adapt the ``xmlrpc.client`` module of the standard library to
|
||||||
|
work with asyncio.
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import aiohttp
|
||||||
|
import inspect
|
||||||
|
import functools
|
||||||
|
|
||||||
|
from xmlrpc import client as xmlrpc
|
||||||
|
|
||||||
|
def coroutine(fn):
|
||||||
|
if inspect.iscoroutinefunction(fn):
|
||||||
|
return fn
|
||||||
|
|
||||||
|
@functools.wraps(fn)
|
||||||
|
async def _wrapper(*args, **kwargs):
|
||||||
|
return fn(*args, **kwargs)
|
||||||
|
|
||||||
|
return _wrapper
|
||||||
|
|
||||||
|
|
||||||
|
__ALL__ = ['ServerProxy', 'Fault', 'ProtocolError']
|
||||||
|
|
||||||
|
# you don't have to import xmlrpc.client from your code
|
||||||
|
Fault = xmlrpc.Fault
|
||||||
|
ProtocolError = xmlrpc.ProtocolError
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
PY35 = sys.version_info >= (3, 5)
|
||||||
|
|
||||||
|
|
||||||
|
class _Method:
|
||||||
|
# some magic to bind an XML-RPC method to an RPC server.
|
||||||
|
# supports "nested" methods (e.g. examples.getStateName)
|
||||||
|
def __init__(self, send, name):
|
||||||
|
self.__send = send
|
||||||
|
self.__name = name
|
||||||
|
|
||||||
|
def __getattr__(self, name):
|
||||||
|
return _Method(self.__send, "%s.%s" % (self.__name, name))
|
||||||
|
|
||||||
|
@coroutine
|
||||||
|
def __call__(self, *args):
|
||||||
|
ret = yield from self.__send(self.__name, args)
|
||||||
|
return ret
|
||||||
|
|
||||||
|
|
||||||
|
class AioTransport(xmlrpc.Transport):
|
||||||
|
"""
|
||||||
|
``xmlrpc.Transport`` subclass for asyncio support
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, session, use_https, *, use_datetime=False,
|
||||||
|
use_builtin_types=False, loop, headers=None, auth=None):
|
||||||
|
super().__init__(use_datetime, use_builtin_types)
|
||||||
|
self.use_https = use_https
|
||||||
|
self._loop = loop
|
||||||
|
self._session = session
|
||||||
|
|
||||||
|
self.auth = auth
|
||||||
|
|
||||||
|
if not headers:
|
||||||
|
headers = {'User-Agent': 'python/aioxmlrpc',
|
||||||
|
'Accept': 'text/xml',
|
||||||
|
'Content-Type': 'text/xml'}
|
||||||
|
|
||||||
|
self.headers = headers
|
||||||
|
|
||||||
|
@coroutine
|
||||||
|
def request(self, host, handler, request_body, verbose=False):
|
||||||
|
"""
|
||||||
|
Send the XML-RPC request, return the response.
|
||||||
|
This method is a coroutine.
|
||||||
|
"""
|
||||||
|
url = self._build_url(host, handler)
|
||||||
|
response = None
|
||||||
|
try:
|
||||||
|
response = yield from self._session.request(
|
||||||
|
'POST', url, headers=self.headers, data=request_body, auth=self.auth)
|
||||||
|
body = yield from response.text()
|
||||||
|
if response.status != 200:
|
||||||
|
raise ProtocolError(url, response.status,
|
||||||
|
body, response.headers)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
raise
|
||||||
|
except ProtocolError:
|
||||||
|
raise
|
||||||
|
except Exception as exc:
|
||||||
|
log.error('Unexpected error', exc_info=True)
|
||||||
|
if response is not None:
|
||||||
|
errcode = response.status
|
||||||
|
headers = response.headers
|
||||||
|
else:
|
||||||
|
errcode = 0
|
||||||
|
headers = {}
|
||||||
|
|
||||||
|
raise ProtocolError(url, errcode, str(exc), headers)
|
||||||
|
return self.parse_response(body)
|
||||||
|
|
||||||
|
def parse_response(self, body):
|
||||||
|
"""
|
||||||
|
Parse the xmlrpc response.
|
||||||
|
"""
|
||||||
|
p, u = self.getparser()
|
||||||
|
p.feed(body)
|
||||||
|
p.close()
|
||||||
|
return u.close()
|
||||||
|
|
||||||
|
def _build_url(self, host, handler):
|
||||||
|
"""
|
||||||
|
Build a url for our request based on the host, handler and use_http
|
||||||
|
property
|
||||||
|
"""
|
||||||
|
scheme = 'https' if self.use_https else 'http'
|
||||||
|
return '%s://%s%s' % (scheme, host, handler)
|
||||||
|
|
||||||
|
|
||||||
|
class ServerProxy(xmlrpc.ServerProxy):
|
||||||
|
"""
|
||||||
|
``xmlrpc.ServerProxy`` subclass for asyncio support
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, uri, session=None, encoding=None, verbose=False,
|
||||||
|
allow_none=False, use_datetime=False, use_builtin_types=False,
|
||||||
|
loop=None, auth=None, headers=None):
|
||||||
|
self._loop = loop or asyncio.get_event_loop()
|
||||||
|
|
||||||
|
if session:
|
||||||
|
self._session = session
|
||||||
|
self._close_session = False
|
||||||
|
else:
|
||||||
|
self._close_session = True
|
||||||
|
self._session = aiohttp.ClientSession(loop=self._loop)
|
||||||
|
|
||||||
|
transport = AioTransport(use_https=uri.startswith('https://'),
|
||||||
|
loop=self._loop,
|
||||||
|
session=self._session,
|
||||||
|
auth=auth,
|
||||||
|
headers=headers)
|
||||||
|
|
||||||
|
super().__init__(uri, transport, encoding, verbose, allow_none,
|
||||||
|
use_datetime, use_builtin_types)
|
||||||
|
|
||||||
|
@coroutine
|
||||||
|
def __request(self, methodname, params):
|
||||||
|
# call a method on the remote server
|
||||||
|
request = xmlrpc.dumps(params, methodname, encoding=self.__encoding,
|
||||||
|
allow_none=self.__allow_none).encode(self.__encoding)
|
||||||
|
|
||||||
|
response = yield from self.__transport.request(
|
||||||
|
self.__host,
|
||||||
|
self.__handler,
|
||||||
|
request,
|
||||||
|
verbose=self.__verbose
|
||||||
|
)
|
||||||
|
|
||||||
|
if len(response) == 1:
|
||||||
|
response = response[0]
|
||||||
|
|
||||||
|
return response
|
||||||
|
|
||||||
|
@coroutine
|
||||||
|
def close(self):
|
||||||
|
if self._close_session:
|
||||||
|
yield from self._session.close()
|
||||||
|
|
||||||
|
def __getattr__(self, name):
|
||||||
|
return _Method(self.__request, name)
|
||||||
|
|
||||||
|
if PY35:
|
||||||
|
|
||||||
|
@coroutine
|
||||||
|
def __aenter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
@coroutine
|
||||||
|
def __aexit__(self, exc_type, exc_val, exc_tb):
|
||||||
|
if self._close_session:
|
||||||
|
yield from self._session.close()
|
||||||
|
|
@ -0,0 +1,17 @@
|
||||||
|
from xmlrpc.client import ServerProxy
|
||||||
|
import sys
|
||||||
|
|
||||||
|
|
||||||
|
# Create the proxy in a nice way so it gets closed when we are done.
|
||||||
|
with ServerProxy('http://localhost:9000') as proxy:
|
||||||
|
# Ensure we got enough arguments coming in
|
||||||
|
assert len(sys.argv) >= 3, "Must supply at least 2 arguments.\n" + \
|
||||||
|
"Usage: rpc_sync_client.py function argument1 [argument2 ...]"
|
||||||
|
# Split incoming arguments into the name of the function to call and
|
||||||
|
# the arguments to supply to that function. Note that sys.argv[0] is
|
||||||
|
# the name of the script itself.
|
||||||
|
scriptname, function, *arguments = sys.argv
|
||||||
|
# Get the indicated remote function.
|
||||||
|
remote_function = getattr(proxy, function)
|
||||||
|
# Print the result of executing the remote function.
|
||||||
|
print(remote_function(arguments))
|
||||||
|
|
@ -0,0 +1,26 @@
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
from xmlrpc.server import SimpleXMLRPCServer
|
||||||
|
|
||||||
|
|
||||||
|
def reverse_list(l):
|
||||||
|
logging.debug(f'Call received: reverse_list({l!r}), calculating for 1 second')
|
||||||
|
time.sleep(1)
|
||||||
|
return l[::-1]
|
||||||
|
|
||||||
|
def allcaps_list(l):
|
||||||
|
logging.debug(f'Call received: allcaps({l!r}), calculating for 1 second')
|
||||||
|
return [i.upper() for i in l]
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
logging.basicConfig(level=logging.DEBUG)
|
||||||
|
server = SimpleXMLRPCServer(('localhost', 9000), logRequests=True)
|
||||||
|
# Register the function we are serving
|
||||||
|
server.register_function(reverse_list, 'reverse')
|
||||||
|
server.register_function(allcaps_list, 'allcaps')
|
||||||
|
try:
|
||||||
|
print("Use Control-C to exit")
|
||||||
|
# Start serving our functions
|
||||||
|
server.serve_forever()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print("Exiting")
|
||||||
|
|
@ -0,0 +1,58 @@
|
||||||
|
import asyncio
|
||||||
|
import sys
|
||||||
|
from statistics import mean
|
||||||
|
from math import pi
|
||||||
|
from time import time
|
||||||
|
# Import the asynchronous version of ServerProxy
|
||||||
|
from aioxmlrpc.client import ServerProxy
|
||||||
|
# Import the asynchronous version of estimate_pi
|
||||||
|
from util import estimate_pi_async
|
||||||
|
|
||||||
|
|
||||||
|
# Asynchronous call to the slave
|
||||||
|
async def remote_estimate(n):
|
||||||
|
print(f"Requesting that slave estimate pi with {n} throws.")
|
||||||
|
# Create the proxy in a nice way so it gets closed when we are done.
|
||||||
|
async with ServerProxy('http://localhost:9000') as proxy:
|
||||||
|
pi_remote = await proxy.estimate_pi(n)
|
||||||
|
# print(f"Result of remote estimation: pi={pi_remote:.010f}")
|
||||||
|
print(pi_remote)
|
||||||
|
return pi_remote
|
||||||
|
|
||||||
|
|
||||||
|
# Asynchronous call to ourselves
|
||||||
|
async def local_estimate(n):
|
||||||
|
print(f"Master begins estimating pi with {n} throws.")
|
||||||
|
pi_local = await estimate_pi_async(n)
|
||||||
|
print(f"Result of local estimation: pi={pi_local:.010f}")
|
||||||
|
return pi_local
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
# Ensure we got enough arguments coming in
|
||||||
|
assert len(sys.argv) >= 2, "Must supply at least 1 argument.\n" + \
|
||||||
|
"Usage: rpc_sync_pi_master.py N [argument2 ...]"
|
||||||
|
# Split incoming arguments into the number of throws to use.
|
||||||
|
# Note that sys.argv[0] is the name of the script itself.
|
||||||
|
scriptname, N, *arguments = sys.argv
|
||||||
|
|
||||||
|
# split the workload between ourselves and the remote
|
||||||
|
# note: // is integer division
|
||||||
|
N = int(N)
|
||||||
|
N_remote = N // 2
|
||||||
|
N_local = N - N_remote
|
||||||
|
start_time = time()
|
||||||
|
|
||||||
|
# ASYNC MAGIC BEGIN
|
||||||
|
# Gather up all tasks we have to do, and tell the event loop to
|
||||||
|
# run until they are complete.
|
||||||
|
futures = asyncio.gather(remote_estimate(N_remote), local_estimate(N_local))
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
results = loop.run_until_complete(futures)
|
||||||
|
# ASYNC MAGIC END
|
||||||
|
|
||||||
|
pi_remote, pi_local = results
|
||||||
|
|
||||||
|
pi_m = mean([pi_remote, pi_local])
|
||||||
|
print(f"Mean estimation result: pi ={pi_m:.010f}")
|
||||||
|
print(f"Relative error: {100*(pi_m/pi - 1):.010f}%")
|
||||||
|
print(f"Total time to execute: {time() - start_time} sec")
|
||||||
|
|
@ -0,0 +1,21 @@
|
||||||
|
import logging
|
||||||
|
from xmlrpc.server import SimpleXMLRPCServer
|
||||||
|
from util import estimate_pi
|
||||||
|
|
||||||
|
|
||||||
|
def local_estimate_pi(n, *args):
|
||||||
|
logging.debug(f'Call received: estimate_pi({n!r})')
|
||||||
|
return estimate_pi(n)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
logging.basicConfig(level=logging.DEBUG)
|
||||||
|
server = SimpleXMLRPCServer(('localhost', 9000), logRequests=True)
|
||||||
|
# Register the function we are serving
|
||||||
|
server.register_function(local_estimate_pi, 'estimate_pi')
|
||||||
|
try:
|
||||||
|
print("Use Control-C to exit")
|
||||||
|
# Start serving our functions
|
||||||
|
server.serve_forever()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print("Exiting")
|
||||||
|
|
@ -0,0 +1,35 @@
|
||||||
|
import sys
|
||||||
|
from util import estimate_pi
|
||||||
|
from statistics import mean
|
||||||
|
from math import pi
|
||||||
|
from time import time
|
||||||
|
# Import the synchronous version of ServerProxy
|
||||||
|
from xmlrpc.client import ServerProxy
|
||||||
|
|
||||||
|
|
||||||
|
# Create the proxy in a nice way so it gets closed when we are done.
|
||||||
|
with ServerProxy('http://localhost:9000') as proxy:
|
||||||
|
# Ensure we got enough arguments coming in
|
||||||
|
assert len(sys.argv) >= 2, "Must supply at least 1 argument.\n" + \
|
||||||
|
"Usage: rpc_sync_pi_master.py N [argument2 ...]"
|
||||||
|
# Split incoming arguments into the number of throws to use.
|
||||||
|
# Note that sys.argv[0] is the name of the script itself.
|
||||||
|
scriptname, N, *arguments = sys.argv
|
||||||
|
|
||||||
|
# split the workload between ourselves and the remote
|
||||||
|
# note: // is integer division
|
||||||
|
N = int(N)
|
||||||
|
N_remote = N // 2
|
||||||
|
N_local = N - N_remote
|
||||||
|
start_time = time()
|
||||||
|
|
||||||
|
print(f"Requesting that slave estimate pi with {N_remote} throws.")
|
||||||
|
pi_remote = proxy.estimate_pi(N_remote)
|
||||||
|
print(f"Result of remote estimation: pi={pi_remote:.010f}")
|
||||||
|
print(f"Master begins estimating pi with {N_local} throws.")
|
||||||
|
pi_local = estimate_pi(N_local)
|
||||||
|
print(f"Result of local estimation: pi={pi_local:.010f}")
|
||||||
|
pi_m = mean([pi_remote, pi_local])
|
||||||
|
print(f"Mean estimation result: pi ={pi_m:.010f}")
|
||||||
|
print(f"Relative error: {100*(pi_m/pi - 1):.010f}%")
|
||||||
|
print(f"Total time to execute: {time() - start_time} sec")
|
||||||
|
|
@ -0,0 +1,55 @@
|
||||||
|
import asyncio
|
||||||
|
from random import random
|
||||||
|
from itertools import chain, islice
|
||||||
|
|
||||||
|
|
||||||
|
def get_chunks_it(l, n):
|
||||||
|
""" Chunks an iterator `l` in size `n`
|
||||||
|
Args:
|
||||||
|
l (Iterator[Any]): an iterator
|
||||||
|
n (int): size of
|
||||||
|
Returns:
|
||||||
|
Generator[Any]
|
||||||
|
"""
|
||||||
|
iterator = iter(l)
|
||||||
|
for first in iterator:
|
||||||
|
yield chain([first], islice(iterator, n - 1))
|
||||||
|
|
||||||
|
|
||||||
|
def estimate_pi(n):
|
||||||
|
"""
|
||||||
|
Estimates pi by throwing a point (x,y) randomly *n* times in
|
||||||
|
the unit square and counting the number of hits where
|
||||||
|
x^2 + Y^2 <= 1.
|
||||||
|
Pi is then approximated as 4 * no. hits / n.
|
||||||
|
input:
|
||||||
|
*n* (int): The number of times to throw the point
|
||||||
|
output:
|
||||||
|
*estimate* (float): The estimate for pi found here
|
||||||
|
"""
|
||||||
|
hits = sum(int(random()**2 + random()**2 <= 1) for _ in range(n))
|
||||||
|
estimate = 4 * hits / n
|
||||||
|
return estimate
|
||||||
|
|
||||||
|
|
||||||
|
async def estimate_pi_async(n):
|
||||||
|
"""
|
||||||
|
Estimates pi by throwing a point (x,y) randomly *n* times in
|
||||||
|
the unit square and counting the number of hits where
|
||||||
|
x^2 + Y^2 <= 1.
|
||||||
|
Pi is then approximated as 4 * no. hits / n.
|
||||||
|
input:
|
||||||
|
*n* (int): The number of times to throw the point
|
||||||
|
output:
|
||||||
|
*estimate* (float): The estimate for pi found here
|
||||||
|
|
||||||
|
**Note:**
|
||||||
|
This is an asynchronous implementation that throws
|
||||||
|
100 points before awaiting to relinquish control.
|
||||||
|
"""
|
||||||
|
hits = 0
|
||||||
|
for chunk in get_chunks_it(range(n), 100):
|
||||||
|
await asyncio.sleep(0) # Relinquish control so something else can run
|
||||||
|
hits += sum(int(random()**2 + random()**2 <= 1) for _ in chunk)
|
||||||
|
estimate = 4 * hits / n
|
||||||
|
return estimate
|
||||||
Loading…
Reference in New Issue