Compare commits
No commits in common. "7685c36677db532359e8125af943eabebc95dceb" and "16d4b96c558aeba0c27b8fdb77dae9e8a2d7f8ca" have entirely different histories.
7685c36677
...
16d4b96c55
38
Readme.md
38
Readme.md
|
|
@ -1,38 +0,0 @@
|
||||||
# Communication pattern exercises
|
|
||||||
|
|
||||||
These scripts were developed for the courses 31380 and 31725 by
|
|
||||||
- Lasse Orda (Initial implementation)
|
|
||||||
- Tue Vissing Jensen (Updates, maintainer)
|
|
||||||
|
|
||||||
## Included scripts
|
|
||||||
|
|
||||||
### RPC
|
|
||||||
|
|
||||||
`rpc_example_runner.py text1 text2 text3 text4` ---
|
|
||||||
Requires that `rpc/rpc_example_server.py` is running.
|
|
||||||
Returns a the list of text strings given, but in reversed order.
|
|
||||||
|
|
||||||
`rpc_sync_pi_runner.py N` ---
|
|
||||||
Requires that `rpc/rpc_pi_server.py` is running.
|
|
||||||
Estimate pi by throwing N points in the unit circle, with the server taking over half the work.
|
|
||||||
|
|
||||||
`rpc_async_pi_runner.py N` ---
|
|
||||||
Requires that `rpc/rpc_pi_server.py` is running.
|
|
||||||
Estimate pi by throwing N points in the unit circle, with the server taking over half the work simultaneously.
|
|
||||||
|
|
||||||
### Pub/Sub
|
|
||||||
|
|
||||||
`pub_server.py` ---
|
|
||||||
A server which periodically publishes the current time.
|
|
||||||
|
|
||||||
`sub_client.py` ---
|
|
||||||
Subscribes to the server's messages and prints them. Exits after 5 messages.
|
|
||||||
|
|
||||||
### Broadcast
|
|
||||||
|
|
||||||
`broadcast_receiver.py`
|
|
||||||
|
|
||||||
`broadcast_listener.py`
|
|
||||||
|
|
||||||
`broadcaster.py` ---
|
|
||||||
Periodically broadcasts
|
|
||||||
|
|
@ -1,43 +0,0 @@
|
||||||
"""
|
|
||||||
Script which listens for messages on a hardcoded port 8881
|
|
||||||
|
|
||||||
@Author: orda
|
|
||||||
"""
|
|
||||||
import socket
|
|
||||||
import sys
|
|
||||||
from parse import parse
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
|
||||||
if len(sys.argv) > 1:
|
|
||||||
port = sys.argv[1]
|
|
||||||
int(port)
|
|
||||||
else:
|
|
||||||
port = 8881
|
|
||||||
|
|
||||||
my_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
||||||
my_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
||||||
my_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
|
||||||
my_socket.bind(('', port))
|
|
||||||
|
|
||||||
formatting_string = "Today's lottery number: {number}"
|
|
||||||
|
|
||||||
print('listener started...')
|
|
||||||
|
|
||||||
try:
|
|
||||||
while True:
|
|
||||||
message, address = my_socket.recvfrom(port)
|
|
||||||
dmessage = message.decode('utf-8') # Decode to utf-8
|
|
||||||
print(f'received: {dmessage}, from: {address}')
|
|
||||||
# decoded will be a pair of a tuple and a dictionary which reflect the
|
|
||||||
# "reverse" of using .format on the first string.
|
|
||||||
decoded = parse(formatting_string, dmessage)
|
|
||||||
if decoded:
|
|
||||||
print(f' Decoded into: {decoded.named}')
|
|
||||||
print(f' Check that the string matches: {formatting_string.format(*decoded.fixed, **decoded.named)}')
|
|
||||||
finally:
|
|
||||||
my_socket.close()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
|
|
@ -1,38 +0,0 @@
|
||||||
"""
|
|
||||||
Script which broadcasts random integers to a hardcoded port 8881
|
|
||||||
|
|
||||||
@Author: orda
|
|
||||||
"""
|
|
||||||
|
|
||||||
import socket
|
|
||||||
import random
|
|
||||||
import time
|
|
||||||
import sys
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
|
||||||
if len(sys.argv) > 1:
|
|
||||||
port = sys.argv[1]
|
|
||||||
int(port)
|
|
||||||
else:
|
|
||||||
port = 8881
|
|
||||||
|
|
||||||
my_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
||||||
my_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
|
||||||
# Allow reuse in case we exited ungracefully
|
|
||||||
my_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
||||||
print('broadcaster started...')
|
|
||||||
|
|
||||||
try:
|
|
||||||
while True:
|
|
||||||
# Make a random number and send it out.
|
|
||||||
number = random.randint(1, 101)
|
|
||||||
print("sending: ", number)
|
|
||||||
my_socket.sendto(f"Today's lottery number: {number}".encode('utf-8'), ('<broadcast>', 8881))
|
|
||||||
time.sleep(1)
|
|
||||||
finally:
|
|
||||||
my_socket.close()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
|
|
@ -1,17 +0,0 @@
|
||||||
import zmq
|
|
||||||
|
|
||||||
ip = "localhost"
|
|
||||||
heartbeat_publish_port = 10002
|
|
||||||
|
|
||||||
|
|
||||||
# Get a context we can use to make sockets
|
|
||||||
context = zmq.Context()
|
|
||||||
# Socket to talk to server
|
|
||||||
socket = context.socket(zmq.SUB)
|
|
||||||
socket.connect(f"tcp://{ip}:{heartbeat_publish_port}")
|
|
||||||
|
|
||||||
topicfilter = "HEARTBEAT"
|
|
||||||
socket.setsockopt_string(zmq.SUBSCRIBE, topicfilter)
|
|
||||||
|
|
||||||
while True:
|
|
||||||
print(socket.recv_string())
|
|
||||||
|
|
@ -1,31 +0,0 @@
|
||||||
import zmq
|
|
||||||
from xmlrpc.server import SimpleXMLRPCServer
|
|
||||||
|
|
||||||
# Other connect on the server port.
|
|
||||||
heartbeat_server_port = 10001
|
|
||||||
heartbeat_publish_port = 10002
|
|
||||||
|
|
||||||
# Make a context which we use to make sockets from
|
|
||||||
context = zmq.Context()
|
|
||||||
# Make a new socket. We want to publish on this socket.
|
|
||||||
socket = context.socket(zmq.PUB)
|
|
||||||
# Bind the socket (inside our program) to a port (on our machine)
|
|
||||||
# We can now send messages
|
|
||||||
socket.bind(f"tcp://*:{heartbeat_publish_port}")
|
|
||||||
|
|
||||||
|
|
||||||
# Make a function for others to call letting us know they are alive.
|
|
||||||
def send_heartbeat(sender: str):
|
|
||||||
print(f"Received heartbeat from {sender}")
|
|
||||||
# Publish who is alive now
|
|
||||||
socket.send_string(f"HEARTBEAT;{sender}")
|
|
||||||
# Return something just to show we succeeded
|
|
||||||
return 0
|
|
||||||
|
|
||||||
|
|
||||||
# Make an RPC server to serve that function to others
|
|
||||||
server = SimpleXMLRPCServer(('localhost', heartbeat_server_port))
|
|
||||||
# Register the function
|
|
||||||
server.register_function(send_heartbeat, 'send_heartbeat')
|
|
||||||
# Start up the server
|
|
||||||
server.serve_forever()
|
|
||||||
|
|
@ -1,11 +0,0 @@
|
||||||
from xmlrpc.client import ServerProxy
|
|
||||||
from time import sleep
|
|
||||||
|
|
||||||
heartbeat_server_port = 10001
|
|
||||||
|
|
||||||
|
|
||||||
with ServerProxy(f'http://localhost:{heartbeat_server_port}') as proxy:
|
|
||||||
# Periodically send a heartbeat
|
|
||||||
while True:
|
|
||||||
proxy.send_heartbeat('BATTERY')
|
|
||||||
sleep(1)
|
|
||||||
|
|
@ -1,32 +0,0 @@
|
||||||
import zmq
|
|
||||||
import sys
|
|
||||||
import time
|
|
||||||
import random
|
|
||||||
|
|
||||||
port = "5556"
|
|
||||||
if len(sys.argv) > 1:
|
|
||||||
port = sys.argv[1]
|
|
||||||
int(port)
|
|
||||||
|
|
||||||
# Make a context which we use to make sockets from
|
|
||||||
context = zmq.Context()
|
|
||||||
# Make a new socket. We want to publish on this socket.
|
|
||||||
socket = context.socket(zmq.PUB)
|
|
||||||
# Bind the socket (inside our program) to a port (on our machine)
|
|
||||||
# We can now send messages
|
|
||||||
socket.bind(f"tcp://*:{port}")
|
|
||||||
|
|
||||||
topics = ('TIME', 'RANDOM')
|
|
||||||
messages = {}
|
|
||||||
while True:
|
|
||||||
# Time to publish the latest time!
|
|
||||||
messages['TIME'] = time.ctime()
|
|
||||||
messages['RANDOM'] = random.randint(1,10)
|
|
||||||
# Note the use of XXX_string here;
|
|
||||||
# the non-_stringy methods only work with bytes.
|
|
||||||
for topic in topics:
|
|
||||||
message = messages.get(topic, '')
|
|
||||||
if not message: continue
|
|
||||||
socket.send_string(f"{topic};{message}")
|
|
||||||
print(f"Published topic {topic}: {message}")
|
|
||||||
time.sleep(1)
|
|
||||||
|
|
@ -1,47 +0,0 @@
|
||||||
import zmq
|
|
||||||
import sys
|
|
||||||
import time
|
|
||||||
from zmq.utils.monitor import recv_monitor_message
|
|
||||||
|
|
||||||
port = "5556"
|
|
||||||
if len(sys.argv) > 1:
|
|
||||||
port = sys.argv[1]
|
|
||||||
int(port)
|
|
||||||
|
|
||||||
context = zmq.Context()
|
|
||||||
socket = context.socket(zmq.PUB)
|
|
||||||
socket.bind(f"tcp://*:{port}")
|
|
||||||
|
|
||||||
# Get a monitoring socket where we can sniff information about new subscribers.
|
|
||||||
monitor = socket.get_monitor_socket()
|
|
||||||
|
|
||||||
sub_list = set()
|
|
||||||
|
|
||||||
topic = "TIME"
|
|
||||||
while True:
|
|
||||||
# Run through monitoring messages and check if we have new subscribers
|
|
||||||
# Note you can delete this entire
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
# We include a NOBLOCK flag here to not hang until a status message comes in.
|
|
||||||
# If no messages are ready, zmq.Again will be raised, which we catch below.
|
|
||||||
status = recv_monitor_message(monitor, flags=zmq.NOBLOCK)
|
|
||||||
print(f"Status: {status}")
|
|
||||||
if status['event'] == zmq.EVENT_ACCEPTED:
|
|
||||||
# New subscriber, add them to our list of subscribers.
|
|
||||||
print(f"Subscriber '{status['value']}' has joined :D")
|
|
||||||
sub_list.add(status['value'])
|
|
||||||
if status['event'] == zmq.EVENT_DISCONNECTED:
|
|
||||||
# Someone left, remove them from our list.
|
|
||||||
print(f"Subscriber '{status['value']}' has left :(")
|
|
||||||
sub_list.remove(status['value'])
|
|
||||||
except zmq.Again as e:
|
|
||||||
# No more new subscribers - let's stop looking for them
|
|
||||||
break
|
|
||||||
# Time to publish the latest time!
|
|
||||||
messagedata = time.ctime()
|
|
||||||
# Note the use of XXX_string here;
|
|
||||||
# the non-_string-y methods only work with bytes.
|
|
||||||
socket.send_string(f"{topic};{messagedata}")
|
|
||||||
print(f"Published topic {topic}: {messagedata} to subscribers: {sub_list}")
|
|
||||||
time.sleep(1)
|
|
||||||
|
|
@ -1,31 +0,0 @@
|
||||||
import sys
|
|
||||||
import zmq
|
|
||||||
|
|
||||||
ip = "localhost"
|
|
||||||
port = "5556"
|
|
||||||
if len(sys.argv) > 1:
|
|
||||||
port = sys.argv[1]
|
|
||||||
int(port)
|
|
||||||
|
|
||||||
# Socket to talk to server
|
|
||||||
context = zmq.Context()
|
|
||||||
socket = context.socket(zmq.SUB)
|
|
||||||
|
|
||||||
print(f"Collecting updates from time server at tcp://localhost:{port}")
|
|
||||||
socket.connect(f"tcp://{ip}:{port}")
|
|
||||||
|
|
||||||
# Filter by topic
|
|
||||||
topicfilters = ("TIME", "RANDOM")
|
|
||||||
socket.setsockopt_string(zmq.SUBSCRIBE, topicfilters[0])
|
|
||||||
socket.setsockopt_string(zmq.SUBSCRIBE, topicfilters[1])
|
|
||||||
|
|
||||||
# Process 5 updates
|
|
||||||
topic_list = []
|
|
||||||
for update_nbr in range(5):
|
|
||||||
string = socket.recv_string()
|
|
||||||
topic, messagedata = string.split(';')
|
|
||||||
topic_list.append(messagedata)
|
|
||||||
print(f"Received on topic {topic}: {messagedata}")
|
|
||||||
socket.close()
|
|
||||||
t_str = "\n".join(topic_list)
|
|
||||||
print(f"All the times we received: \n{t_str}")
|
|
||||||
|
|
@ -1,50 +0,0 @@
|
||||||
import sys
|
|
||||||
import zmq
|
|
||||||
from time import sleep
|
|
||||||
|
|
||||||
"""
|
|
||||||
This version of the subscriber doesn't hang
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
ip = "localhost"
|
|
||||||
port = "5556"
|
|
||||||
if len(sys.argv) > 1:
|
|
||||||
port = sys.argv[1]
|
|
||||||
int(port)
|
|
||||||
|
|
||||||
# Make a context we can use to get a socket
|
|
||||||
context = zmq.Context()
|
|
||||||
# Grab a socket that can connect to the server
|
|
||||||
socket = context.socket(zmq.SUB)
|
|
||||||
|
|
||||||
# Connect to the server by saying where we can get our measurements from
|
|
||||||
print(f"Collecting updates from time server at tcp://localhost:{port}")
|
|
||||||
socket.connect(f"tcp://{ip}:{port}")
|
|
||||||
|
|
||||||
# Filter by topic - we are only interested in knowing about the time
|
|
||||||
topicfilter = "TIME"
|
|
||||||
socket.setsockopt_string(zmq.SUBSCRIBE, topicfilter)
|
|
||||||
|
|
||||||
# Process 5 updates
|
|
||||||
topic_list = []
|
|
||||||
# We need to keep track of how many we've received
|
|
||||||
# since the loop may end with no messages
|
|
||||||
try:
|
|
||||||
while len(topic_list) < 5:
|
|
||||||
try: # Need to use a try block since zmq uses a
|
|
||||||
# giving the flag NOBLOCK indicates to zmq that we don't want to
|
|
||||||
# hang waiting for a message.
|
|
||||||
string = socket.recv_string(zmq.NOBLOCK)
|
|
||||||
except zmq.Again:
|
|
||||||
print("No message this time :(")
|
|
||||||
continue
|
|
||||||
topic, messagedata = string.split(';')
|
|
||||||
topic_list.append(messagedata)
|
|
||||||
print(f"Received on topic {topic}: {messagedata}")
|
|
||||||
#
|
|
||||||
finally
|
|
||||||
sleep(0.2)
|
|
||||||
socket.close()
|
|
||||||
t_str = "\n".join(topic_list)
|
|
||||||
print(f"All the times we received: \n{t_str}")
|
|
||||||
|
|
@ -1,12 +0,0 @@
|
||||||
aiohttp==3.9.5
|
|
||||||
aiosignal==1.3.1
|
|
||||||
attrs==23.2.0
|
|
||||||
frozenlist==1.4.1
|
|
||||||
greenlet==3.0.3
|
|
||||||
idna==3.7
|
|
||||||
msgpack==1.0.8
|
|
||||||
multidict==6.0.5
|
|
||||||
parse==1.20.2
|
|
||||||
pynvim==0.5.0
|
|
||||||
pyzmq==26.0.3
|
|
||||||
yarl==1.9.4
|
|
||||||
Binary file not shown.
Binary file not shown.
|
|
@ -1,186 +0,0 @@
|
||||||
"""
|
|
||||||
XML-RPC Client with asyncio.
|
|
||||||
|
|
||||||
This module adapt the ``xmlrpc.client`` module of the standard library to
|
|
||||||
work with asyncio.
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
import sys
|
|
||||||
import asyncio
|
|
||||||
import logging
|
|
||||||
import aiohttp
|
|
||||||
import inspect
|
|
||||||
import functools
|
|
||||||
|
|
||||||
from xmlrpc import client as xmlrpc
|
|
||||||
|
|
||||||
def coroutine(fn):
|
|
||||||
if inspect.iscoroutinefunction(fn):
|
|
||||||
return fn
|
|
||||||
|
|
||||||
@functools.wraps(fn)
|
|
||||||
async def _wrapper(*args, **kwargs):
|
|
||||||
return fn(*args, **kwargs)
|
|
||||||
|
|
||||||
return _wrapper
|
|
||||||
|
|
||||||
|
|
||||||
__ALL__ = ['ServerProxy', 'Fault', 'ProtocolError']
|
|
||||||
|
|
||||||
# you don't have to import xmlrpc.client from your code
|
|
||||||
Fault = xmlrpc.Fault
|
|
||||||
ProtocolError = xmlrpc.ProtocolError
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
|
||||||
PY35 = sys.version_info >= (3, 5)
|
|
||||||
|
|
||||||
|
|
||||||
class _Method:
|
|
||||||
# some magic to bind an XML-RPC method to an RPC server.
|
|
||||||
# supports "nested" methods (e.g. examples.getStateName)
|
|
||||||
def __init__(self, send, name):
|
|
||||||
self.__send = send
|
|
||||||
self.__name = name
|
|
||||||
|
|
||||||
def __getattr__(self, name):
|
|
||||||
return _Method(self.__send, "%s.%s" % (self.__name, name))
|
|
||||||
|
|
||||||
@coroutine
|
|
||||||
def __call__(self, *args):
|
|
||||||
ret = yield from self.__send(self.__name, args)
|
|
||||||
return ret
|
|
||||||
|
|
||||||
|
|
||||||
class AioTransport(xmlrpc.Transport):
|
|
||||||
"""
|
|
||||||
``xmlrpc.Transport`` subclass for asyncio support
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, session, use_https, *, use_datetime=False,
|
|
||||||
use_builtin_types=False, loop, headers=None, auth=None):
|
|
||||||
super().__init__(use_datetime, use_builtin_types)
|
|
||||||
self.use_https = use_https
|
|
||||||
self._loop = loop
|
|
||||||
self._session = session
|
|
||||||
|
|
||||||
self.auth = auth
|
|
||||||
|
|
||||||
if not headers:
|
|
||||||
headers = {'User-Agent': 'python/aioxmlrpc',
|
|
||||||
'Accept': 'text/xml',
|
|
||||||
'Content-Type': 'text/xml'}
|
|
||||||
|
|
||||||
self.headers = headers
|
|
||||||
|
|
||||||
@coroutine
|
|
||||||
def request(self, host, handler, request_body, verbose=False):
|
|
||||||
"""
|
|
||||||
Send the XML-RPC request, return the response.
|
|
||||||
This method is a coroutine.
|
|
||||||
"""
|
|
||||||
url = self._build_url(host, handler)
|
|
||||||
response = None
|
|
||||||
try:
|
|
||||||
response = yield from self._session.request(
|
|
||||||
'POST', url, headers=self.headers, data=request_body, auth=self.auth)
|
|
||||||
body = yield from response.text()
|
|
||||||
if response.status != 200:
|
|
||||||
raise ProtocolError(url, response.status,
|
|
||||||
body, response.headers)
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
raise
|
|
||||||
except ProtocolError:
|
|
||||||
raise
|
|
||||||
except Exception as exc:
|
|
||||||
log.error('Unexpected error', exc_info=True)
|
|
||||||
if response is not None:
|
|
||||||
errcode = response.status
|
|
||||||
headers = response.headers
|
|
||||||
else:
|
|
||||||
errcode = 0
|
|
||||||
headers = {}
|
|
||||||
|
|
||||||
raise ProtocolError(url, errcode, str(exc), headers)
|
|
||||||
return self.parse_response(body)
|
|
||||||
|
|
||||||
def parse_response(self, body):
|
|
||||||
"""
|
|
||||||
Parse the xmlrpc response.
|
|
||||||
"""
|
|
||||||
p, u = self.getparser()
|
|
||||||
p.feed(body)
|
|
||||||
p.close()
|
|
||||||
return u.close()
|
|
||||||
|
|
||||||
def _build_url(self, host, handler):
|
|
||||||
"""
|
|
||||||
Build a url for our request based on the host, handler and use_http
|
|
||||||
property
|
|
||||||
"""
|
|
||||||
scheme = 'https' if self.use_https else 'http'
|
|
||||||
return '%s://%s%s' % (scheme, host, handler)
|
|
||||||
|
|
||||||
|
|
||||||
class ServerProxy(xmlrpc.ServerProxy):
|
|
||||||
"""
|
|
||||||
``xmlrpc.ServerProxy`` subclass for asyncio support
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, uri, session=None, encoding=None, verbose=False,
|
|
||||||
allow_none=False, use_datetime=False, use_builtin_types=False,
|
|
||||||
loop=None, auth=None, headers=None):
|
|
||||||
self._loop = loop or asyncio.get_event_loop()
|
|
||||||
|
|
||||||
if session:
|
|
||||||
self._session = session
|
|
||||||
self._close_session = False
|
|
||||||
else:
|
|
||||||
self._close_session = True
|
|
||||||
self._session = aiohttp.ClientSession(loop=self._loop)
|
|
||||||
|
|
||||||
transport = AioTransport(use_https=uri.startswith('https://'),
|
|
||||||
loop=self._loop,
|
|
||||||
session=self._session,
|
|
||||||
auth=auth,
|
|
||||||
headers=headers)
|
|
||||||
|
|
||||||
super().__init__(uri, transport, encoding, verbose, allow_none,
|
|
||||||
use_datetime, use_builtin_types)
|
|
||||||
|
|
||||||
@coroutine
|
|
||||||
def __request(self, methodname, params):
|
|
||||||
# call a method on the remote server
|
|
||||||
request = xmlrpc.dumps(params, methodname, encoding=self.__encoding,
|
|
||||||
allow_none=self.__allow_none).encode(self.__encoding)
|
|
||||||
|
|
||||||
response = yield from self.__transport.request(
|
|
||||||
self.__host,
|
|
||||||
self.__handler,
|
|
||||||
request,
|
|
||||||
verbose=self.__verbose
|
|
||||||
)
|
|
||||||
|
|
||||||
if len(response) == 1:
|
|
||||||
response = response[0]
|
|
||||||
|
|
||||||
return response
|
|
||||||
|
|
||||||
@coroutine
|
|
||||||
def close(self):
|
|
||||||
if self._close_session:
|
|
||||||
yield from self._session.close()
|
|
||||||
|
|
||||||
def __getattr__(self, name):
|
|
||||||
return _Method(self.__request, name)
|
|
||||||
|
|
||||||
if PY35:
|
|
||||||
|
|
||||||
@coroutine
|
|
||||||
def __aenter__(self):
|
|
||||||
return self
|
|
||||||
|
|
||||||
@coroutine
|
|
||||||
def __aexit__(self, exc_type, exc_val, exc_tb):
|
|
||||||
if self._close_session:
|
|
||||||
yield from self._session.close()
|
|
||||||
|
|
@ -1,17 +0,0 @@
|
||||||
from xmlrpc.client import ServerProxy
|
|
||||||
import sys
|
|
||||||
|
|
||||||
|
|
||||||
# Create the proxy in a nice way so it gets closed when we are done.
|
|
||||||
with ServerProxy('http://localhost:9000') as proxy:
|
|
||||||
# Ensure we got enough arguments coming in
|
|
||||||
assert len(sys.argv) >= 3, "Must supply at least 2 arguments.\n" + \
|
|
||||||
"Usage: rpc_sync_client.py function argument1 [argument2 ...]"
|
|
||||||
# Split incoming arguments into the name of the function to call and
|
|
||||||
# the arguments to supply to that function. Note that sys.argv[0] is
|
|
||||||
# the name of the script itself.
|
|
||||||
scriptname, function, *arguments = sys.argv
|
|
||||||
# Get the indicated remote function.
|
|
||||||
remote_function = getattr(proxy, function)
|
|
||||||
# Print the result of executing the remote function.
|
|
||||||
print(remote_function(arguments))
|
|
||||||
|
|
@ -1,26 +0,0 @@
|
||||||
import logging
|
|
||||||
import time
|
|
||||||
from xmlrpc.server import SimpleXMLRPCServer
|
|
||||||
|
|
||||||
|
|
||||||
def reverse_list(l):
|
|
||||||
logging.debug(f'Call received: reverse_list({l!r}), calculating for 1 second')
|
|
||||||
time.sleep(1)
|
|
||||||
return l[::-1]
|
|
||||||
|
|
||||||
def allcaps_list(l):
|
|
||||||
logging.debug(f'Call received: allcaps({l!r}), calculating for 1 second')
|
|
||||||
return [i.upper() for i in l]
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
logging.basicConfig(level=logging.DEBUG)
|
|
||||||
server = SimpleXMLRPCServer(('localhost', 9000), logRequests=True)
|
|
||||||
# Register the function we are serving
|
|
||||||
server.register_function(reverse_list, 'reverse')
|
|
||||||
server.register_function(allcaps_list, 'allcaps')
|
|
||||||
try:
|
|
||||||
print("Use Control-C to exit")
|
|
||||||
# Start serving our functions
|
|
||||||
server.serve_forever()
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
print("Exiting")
|
|
||||||
|
|
@ -1,58 +0,0 @@
|
||||||
import asyncio
|
|
||||||
import sys
|
|
||||||
from statistics import mean
|
|
||||||
from math import pi
|
|
||||||
from time import time
|
|
||||||
# Import the asynchronous version of ServerProxy
|
|
||||||
from aioxmlrpc.client import ServerProxy
|
|
||||||
# Import the asynchronous version of estimate_pi
|
|
||||||
from util import estimate_pi_async
|
|
||||||
|
|
||||||
|
|
||||||
# Asynchronous call to the slave
|
|
||||||
async def remote_estimate(n):
|
|
||||||
print(f"Requesting that slave estimate pi with {n} throws.")
|
|
||||||
# Create the proxy in a nice way so it gets closed when we are done.
|
|
||||||
async with ServerProxy('http://localhost:9000') as proxy:
|
|
||||||
pi_remote = await proxy.estimate_pi(n)
|
|
||||||
# print(f"Result of remote estimation: pi={pi_remote:.010f}")
|
|
||||||
print(pi_remote)
|
|
||||||
return pi_remote
|
|
||||||
|
|
||||||
|
|
||||||
# Asynchronous call to ourselves
|
|
||||||
async def local_estimate(n):
|
|
||||||
print(f"Master begins estimating pi with {n} throws.")
|
|
||||||
pi_local = await estimate_pi_async(n)
|
|
||||||
print(f"Result of local estimation: pi={pi_local:.010f}")
|
|
||||||
return pi_local
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
# Ensure we got enough arguments coming in
|
|
||||||
assert len(sys.argv) >= 2, "Must supply at least 1 argument.\n" + \
|
|
||||||
"Usage: rpc_sync_pi_master.py N [argument2 ...]"
|
|
||||||
# Split incoming arguments into the number of throws to use.
|
|
||||||
# Note that sys.argv[0] is the name of the script itself.
|
|
||||||
scriptname, N, *arguments = sys.argv
|
|
||||||
|
|
||||||
# split the workload between ourselves and the remote
|
|
||||||
# note: // is integer division
|
|
||||||
N = int(N)
|
|
||||||
N_remote = N // 2
|
|
||||||
N_local = N - N_remote
|
|
||||||
start_time = time()
|
|
||||||
|
|
||||||
# ASYNC MAGIC BEGIN
|
|
||||||
# Gather up all tasks we have to do, and tell the event loop to
|
|
||||||
# run until they are complete.
|
|
||||||
futures = asyncio.gather(remote_estimate(N_remote), local_estimate(N_local))
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
results = loop.run_until_complete(futures)
|
|
||||||
# ASYNC MAGIC END
|
|
||||||
|
|
||||||
pi_remote, pi_local = results
|
|
||||||
|
|
||||||
pi_m = mean([pi_remote, pi_local])
|
|
||||||
print(f"Mean estimation result: pi ={pi_m:.010f}")
|
|
||||||
print(f"Relative error: {100*(pi_m/pi - 1):.010f}%")
|
|
||||||
print(f"Total time to execute: {time() - start_time} sec")
|
|
||||||
|
|
@ -1,21 +0,0 @@
|
||||||
import logging
|
|
||||||
from xmlrpc.server import SimpleXMLRPCServer
|
|
||||||
from util import estimate_pi
|
|
||||||
|
|
||||||
|
|
||||||
def local_estimate_pi(n, *args):
|
|
||||||
logging.debug(f'Call received: estimate_pi({n!r})')
|
|
||||||
return estimate_pi(n)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
logging.basicConfig(level=logging.DEBUG)
|
|
||||||
server = SimpleXMLRPCServer(('localhost', 9000), logRequests=True)
|
|
||||||
# Register the function we are serving
|
|
||||||
server.register_function(local_estimate_pi, 'estimate_pi')
|
|
||||||
try:
|
|
||||||
print("Use Control-C to exit")
|
|
||||||
# Start serving our functions
|
|
||||||
server.serve_forever()
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
print("Exiting")
|
|
||||||
|
|
@ -1,35 +0,0 @@
|
||||||
import sys
|
|
||||||
from util import estimate_pi
|
|
||||||
from statistics import mean
|
|
||||||
from math import pi
|
|
||||||
from time import time
|
|
||||||
# Import the synchronous version of ServerProxy
|
|
||||||
from xmlrpc.client import ServerProxy
|
|
||||||
|
|
||||||
|
|
||||||
# Create the proxy in a nice way so it gets closed when we are done.
|
|
||||||
with ServerProxy('http://localhost:9000') as proxy:
|
|
||||||
# Ensure we got enough arguments coming in
|
|
||||||
assert len(sys.argv) >= 2, "Must supply at least 1 argument.\n" + \
|
|
||||||
"Usage: rpc_sync_pi_master.py N [argument2 ...]"
|
|
||||||
# Split incoming arguments into the number of throws to use.
|
|
||||||
# Note that sys.argv[0] is the name of the script itself.
|
|
||||||
scriptname, N, *arguments = sys.argv
|
|
||||||
|
|
||||||
# split the workload between ourselves and the remote
|
|
||||||
# note: // is integer division
|
|
||||||
N = int(N)
|
|
||||||
N_remote = N // 2
|
|
||||||
N_local = N - N_remote
|
|
||||||
start_time = time()
|
|
||||||
|
|
||||||
print(f"Requesting that slave estimate pi with {N_remote} throws.")
|
|
||||||
pi_remote = proxy.estimate_pi(N_remote)
|
|
||||||
print(f"Result of remote estimation: pi={pi_remote:.010f}")
|
|
||||||
print(f"Master begins estimating pi with {N_local} throws.")
|
|
||||||
pi_local = estimate_pi(N_local)
|
|
||||||
print(f"Result of local estimation: pi={pi_local:.010f}")
|
|
||||||
pi_m = mean([pi_remote, pi_local])
|
|
||||||
print(f"Mean estimation result: pi ={pi_m:.010f}")
|
|
||||||
print(f"Relative error: {100*(pi_m/pi - 1):.010f}%")
|
|
||||||
print(f"Total time to execute: {time() - start_time} sec")
|
|
||||||
55
rpc/util.py
55
rpc/util.py
|
|
@ -1,55 +0,0 @@
|
||||||
import asyncio
|
|
||||||
from random import random
|
|
||||||
from itertools import chain, islice
|
|
||||||
|
|
||||||
|
|
||||||
def get_chunks_it(l, n):
|
|
||||||
""" Chunks an iterator `l` in size `n`
|
|
||||||
Args:
|
|
||||||
l (Iterator[Any]): an iterator
|
|
||||||
n (int): size of
|
|
||||||
Returns:
|
|
||||||
Generator[Any]
|
|
||||||
"""
|
|
||||||
iterator = iter(l)
|
|
||||||
for first in iterator:
|
|
||||||
yield chain([first], islice(iterator, n - 1))
|
|
||||||
|
|
||||||
|
|
||||||
def estimate_pi(n):
|
|
||||||
"""
|
|
||||||
Estimates pi by throwing a point (x,y) randomly *n* times in
|
|
||||||
the unit square and counting the number of hits where
|
|
||||||
x^2 + Y^2 <= 1.
|
|
||||||
Pi is then approximated as 4 * no. hits / n.
|
|
||||||
input:
|
|
||||||
*n* (int): The number of times to throw the point
|
|
||||||
output:
|
|
||||||
*estimate* (float): The estimate for pi found here
|
|
||||||
"""
|
|
||||||
hits = sum(int(random()**2 + random()**2 <= 1) for _ in range(n))
|
|
||||||
estimate = 4 * hits / n
|
|
||||||
return estimate
|
|
||||||
|
|
||||||
|
|
||||||
async def estimate_pi_async(n):
|
|
||||||
"""
|
|
||||||
Estimates pi by throwing a point (x,y) randomly *n* times in
|
|
||||||
the unit square and counting the number of hits where
|
|
||||||
x^2 + Y^2 <= 1.
|
|
||||||
Pi is then approximated as 4 * no. hits / n.
|
|
||||||
input:
|
|
||||||
*n* (int): The number of times to throw the point
|
|
||||||
output:
|
|
||||||
*estimate* (float): The estimate for pi found here
|
|
||||||
|
|
||||||
**Note:**
|
|
||||||
This is an asynchronous implementation that throws
|
|
||||||
100 points before awaiting to relinquish control.
|
|
||||||
"""
|
|
||||||
hits = 0
|
|
||||||
for chunk in get_chunks_it(range(n), 100):
|
|
||||||
await asyncio.sleep(0) # Relinquish control so something else can run
|
|
||||||
hits += sum(int(random()**2 + random()**2 <= 1) for _ in chunk)
|
|
||||||
estimate = 4 * hits / n
|
|
||||||
return estimate
|
|
||||||
Loading…
Reference in New Issue