Compare commits
4 Commits
16d4b96c55
...
7685c36677
| Author | SHA1 | Date |
|---|---|---|
|
|
7685c36677 | |
|
|
1044d6224a | |
|
|
9aec9332fc | |
|
|
e00a4587f9 |
|
|
@ -0,0 +1,38 @@
|
|||
# Communication pattern exercises
|
||||
|
||||
These scripts were developed for the courses 31380 and 31725 by
|
||||
- Lasse Orda (Initial implementation)
|
||||
- Tue Vissing Jensen (Updates, maintainer)
|
||||
|
||||
## Included scripts
|
||||
|
||||
### RPC
|
||||
|
||||
`rpc_example_runner.py text1 text2 text3 text4` ---
|
||||
Requires that `rpc/rpc_example_server.py` is running.
|
||||
Returns a the list of text strings given, but in reversed order.
|
||||
|
||||
`rpc_sync_pi_runner.py N` ---
|
||||
Requires that `rpc/rpc_pi_server.py` is running.
|
||||
Estimate pi by throwing N points in the unit circle, with the server taking over half the work.
|
||||
|
||||
`rpc_async_pi_runner.py N` ---
|
||||
Requires that `rpc/rpc_pi_server.py` is running.
|
||||
Estimate pi by throwing N points in the unit circle, with the server taking over half the work simultaneously.
|
||||
|
||||
### Pub/Sub
|
||||
|
||||
`pub_server.py` ---
|
||||
A server which periodically publishes the current time.
|
||||
|
||||
`sub_client.py` ---
|
||||
Subscribes to the server's messages and prints them. Exits after 5 messages.
|
||||
|
||||
### Broadcast
|
||||
|
||||
`broadcast_receiver.py`
|
||||
|
||||
`broadcast_listener.py`
|
||||
|
||||
`broadcaster.py` ---
|
||||
Periodically broadcasts
|
||||
|
|
@ -0,0 +1,43 @@
|
|||
"""
|
||||
Script which listens for messages on a hardcoded port 8881
|
||||
|
||||
@Author: orda
|
||||
"""
|
||||
import socket
|
||||
import sys
|
||||
from parse import parse
|
||||
|
||||
|
||||
def main():
|
||||
if len(sys.argv) > 1:
|
||||
port = sys.argv[1]
|
||||
int(port)
|
||||
else:
|
||||
port = 8881
|
||||
|
||||
my_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
my_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
my_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
||||
my_socket.bind(('', port))
|
||||
|
||||
formatting_string = "Today's lottery number: {number}"
|
||||
|
||||
print('listener started...')
|
||||
|
||||
try:
|
||||
while True:
|
||||
message, address = my_socket.recvfrom(port)
|
||||
dmessage = message.decode('utf-8') # Decode to utf-8
|
||||
print(f'received: {dmessage}, from: {address}')
|
||||
# decoded will be a pair of a tuple and a dictionary which reflect the
|
||||
# "reverse" of using .format on the first string.
|
||||
decoded = parse(formatting_string, dmessage)
|
||||
if decoded:
|
||||
print(f' Decoded into: {decoded.named}')
|
||||
print(f' Check that the string matches: {formatting_string.format(*decoded.fixed, **decoded.named)}')
|
||||
finally:
|
||||
my_socket.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -0,0 +1,38 @@
|
|||
"""
|
||||
Script which broadcasts random integers to a hardcoded port 8881
|
||||
|
||||
@Author: orda
|
||||
"""
|
||||
|
||||
import socket
|
||||
import random
|
||||
import time
|
||||
import sys
|
||||
|
||||
|
||||
def main():
|
||||
if len(sys.argv) > 1:
|
||||
port = sys.argv[1]
|
||||
int(port)
|
||||
else:
|
||||
port = 8881
|
||||
|
||||
my_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
my_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
||||
# Allow reuse in case we exited ungracefully
|
||||
my_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
print('broadcaster started...')
|
||||
|
||||
try:
|
||||
while True:
|
||||
# Make a random number and send it out.
|
||||
number = random.randint(1, 101)
|
||||
print("sending: ", number)
|
||||
my_socket.sendto(f"Today's lottery number: {number}".encode('utf-8'), ('<broadcast>', 8881))
|
||||
time.sleep(1)
|
||||
finally:
|
||||
my_socket.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -0,0 +1,17 @@
|
|||
import zmq
|
||||
|
||||
ip = "localhost"
|
||||
heartbeat_publish_port = 10002
|
||||
|
||||
|
||||
# Get a context we can use to make sockets
|
||||
context = zmq.Context()
|
||||
# Socket to talk to server
|
||||
socket = context.socket(zmq.SUB)
|
||||
socket.connect(f"tcp://{ip}:{heartbeat_publish_port}")
|
||||
|
||||
topicfilter = "HEARTBEAT"
|
||||
socket.setsockopt_string(zmq.SUBSCRIBE, topicfilter)
|
||||
|
||||
while True:
|
||||
print(socket.recv_string())
|
||||
|
|
@ -0,0 +1,31 @@
|
|||
import zmq
|
||||
from xmlrpc.server import SimpleXMLRPCServer
|
||||
|
||||
# Other connect on the server port.
|
||||
heartbeat_server_port = 10001
|
||||
heartbeat_publish_port = 10002
|
||||
|
||||
# Make a context which we use to make sockets from
|
||||
context = zmq.Context()
|
||||
# Make a new socket. We want to publish on this socket.
|
||||
socket = context.socket(zmq.PUB)
|
||||
# Bind the socket (inside our program) to a port (on our machine)
|
||||
# We can now send messages
|
||||
socket.bind(f"tcp://*:{heartbeat_publish_port}")
|
||||
|
||||
|
||||
# Make a function for others to call letting us know they are alive.
|
||||
def send_heartbeat(sender: str):
|
||||
print(f"Received heartbeat from {sender}")
|
||||
# Publish who is alive now
|
||||
socket.send_string(f"HEARTBEAT;{sender}")
|
||||
# Return something just to show we succeeded
|
||||
return 0
|
||||
|
||||
|
||||
# Make an RPC server to serve that function to others
|
||||
server = SimpleXMLRPCServer(('localhost', heartbeat_server_port))
|
||||
# Register the function
|
||||
server.register_function(send_heartbeat, 'send_heartbeat')
|
||||
# Start up the server
|
||||
server.serve_forever()
|
||||
|
|
@ -0,0 +1,11 @@
|
|||
from xmlrpc.client import ServerProxy
|
||||
from time import sleep
|
||||
|
||||
heartbeat_server_port = 10001
|
||||
|
||||
|
||||
with ServerProxy(f'http://localhost:{heartbeat_server_port}') as proxy:
|
||||
# Periodically send a heartbeat
|
||||
while True:
|
||||
proxy.send_heartbeat('BATTERY')
|
||||
sleep(1)
|
||||
|
|
@ -0,0 +1,32 @@
|
|||
import zmq
|
||||
import sys
|
||||
import time
|
||||
import random
|
||||
|
||||
port = "5556"
|
||||
if len(sys.argv) > 1:
|
||||
port = sys.argv[1]
|
||||
int(port)
|
||||
|
||||
# Make a context which we use to make sockets from
|
||||
context = zmq.Context()
|
||||
# Make a new socket. We want to publish on this socket.
|
||||
socket = context.socket(zmq.PUB)
|
||||
# Bind the socket (inside our program) to a port (on our machine)
|
||||
# We can now send messages
|
||||
socket.bind(f"tcp://*:{port}")
|
||||
|
||||
topics = ('TIME', 'RANDOM')
|
||||
messages = {}
|
||||
while True:
|
||||
# Time to publish the latest time!
|
||||
messages['TIME'] = time.ctime()
|
||||
messages['RANDOM'] = random.randint(1,10)
|
||||
# Note the use of XXX_string here;
|
||||
# the non-_stringy methods only work with bytes.
|
||||
for topic in topics:
|
||||
message = messages.get(topic, '')
|
||||
if not message: continue
|
||||
socket.send_string(f"{topic};{message}")
|
||||
print(f"Published topic {topic}: {message}")
|
||||
time.sleep(1)
|
||||
|
|
@ -0,0 +1,47 @@
|
|||
import zmq
|
||||
import sys
|
||||
import time
|
||||
from zmq.utils.monitor import recv_monitor_message
|
||||
|
||||
port = "5556"
|
||||
if len(sys.argv) > 1:
|
||||
port = sys.argv[1]
|
||||
int(port)
|
||||
|
||||
context = zmq.Context()
|
||||
socket = context.socket(zmq.PUB)
|
||||
socket.bind(f"tcp://*:{port}")
|
||||
|
||||
# Get a monitoring socket where we can sniff information about new subscribers.
|
||||
monitor = socket.get_monitor_socket()
|
||||
|
||||
sub_list = set()
|
||||
|
||||
topic = "TIME"
|
||||
while True:
|
||||
# Run through monitoring messages and check if we have new subscribers
|
||||
# Note you can delete this entire
|
||||
while True:
|
||||
try:
|
||||
# We include a NOBLOCK flag here to not hang until a status message comes in.
|
||||
# If no messages are ready, zmq.Again will be raised, which we catch below.
|
||||
status = recv_monitor_message(monitor, flags=zmq.NOBLOCK)
|
||||
print(f"Status: {status}")
|
||||
if status['event'] == zmq.EVENT_ACCEPTED:
|
||||
# New subscriber, add them to our list of subscribers.
|
||||
print(f"Subscriber '{status['value']}' has joined :D")
|
||||
sub_list.add(status['value'])
|
||||
if status['event'] == zmq.EVENT_DISCONNECTED:
|
||||
# Someone left, remove them from our list.
|
||||
print(f"Subscriber '{status['value']}' has left :(")
|
||||
sub_list.remove(status['value'])
|
||||
except zmq.Again as e:
|
||||
# No more new subscribers - let's stop looking for them
|
||||
break
|
||||
# Time to publish the latest time!
|
||||
messagedata = time.ctime()
|
||||
# Note the use of XXX_string here;
|
||||
# the non-_string-y methods only work with bytes.
|
||||
socket.send_string(f"{topic};{messagedata}")
|
||||
print(f"Published topic {topic}: {messagedata} to subscribers: {sub_list}")
|
||||
time.sleep(1)
|
||||
|
|
@ -0,0 +1,31 @@
|
|||
import sys
|
||||
import zmq
|
||||
|
||||
ip = "localhost"
|
||||
port = "5556"
|
||||
if len(sys.argv) > 1:
|
||||
port = sys.argv[1]
|
||||
int(port)
|
||||
|
||||
# Socket to talk to server
|
||||
context = zmq.Context()
|
||||
socket = context.socket(zmq.SUB)
|
||||
|
||||
print(f"Collecting updates from time server at tcp://localhost:{port}")
|
||||
socket.connect(f"tcp://{ip}:{port}")
|
||||
|
||||
# Filter by topic
|
||||
topicfilters = ("TIME", "RANDOM")
|
||||
socket.setsockopt_string(zmq.SUBSCRIBE, topicfilters[0])
|
||||
socket.setsockopt_string(zmq.SUBSCRIBE, topicfilters[1])
|
||||
|
||||
# Process 5 updates
|
||||
topic_list = []
|
||||
for update_nbr in range(5):
|
||||
string = socket.recv_string()
|
||||
topic, messagedata = string.split(';')
|
||||
topic_list.append(messagedata)
|
||||
print(f"Received on topic {topic}: {messagedata}")
|
||||
socket.close()
|
||||
t_str = "\n".join(topic_list)
|
||||
print(f"All the times we received: \n{t_str}")
|
||||
|
|
@ -0,0 +1,50 @@
|
|||
import sys
|
||||
import zmq
|
||||
from time import sleep
|
||||
|
||||
"""
|
||||
This version of the subscriber doesn't hang
|
||||
"""
|
||||
|
||||
|
||||
ip = "localhost"
|
||||
port = "5556"
|
||||
if len(sys.argv) > 1:
|
||||
port = sys.argv[1]
|
||||
int(port)
|
||||
|
||||
# Make a context we can use to get a socket
|
||||
context = zmq.Context()
|
||||
# Grab a socket that can connect to the server
|
||||
socket = context.socket(zmq.SUB)
|
||||
|
||||
# Connect to the server by saying where we can get our measurements from
|
||||
print(f"Collecting updates from time server at tcp://localhost:{port}")
|
||||
socket.connect(f"tcp://{ip}:{port}")
|
||||
|
||||
# Filter by topic - we are only interested in knowing about the time
|
||||
topicfilter = "TIME"
|
||||
socket.setsockopt_string(zmq.SUBSCRIBE, topicfilter)
|
||||
|
||||
# Process 5 updates
|
||||
topic_list = []
|
||||
# We need to keep track of how many we've received
|
||||
# since the loop may end with no messages
|
||||
try:
|
||||
while len(topic_list) < 5:
|
||||
try: # Need to use a try block since zmq uses a
|
||||
# giving the flag NOBLOCK indicates to zmq that we don't want to
|
||||
# hang waiting for a message.
|
||||
string = socket.recv_string(zmq.NOBLOCK)
|
||||
except zmq.Again:
|
||||
print("No message this time :(")
|
||||
continue
|
||||
topic, messagedata = string.split(';')
|
||||
topic_list.append(messagedata)
|
||||
print(f"Received on topic {topic}: {messagedata}")
|
||||
#
|
||||
finally
|
||||
sleep(0.2)
|
||||
socket.close()
|
||||
t_str = "\n".join(topic_list)
|
||||
print(f"All the times we received: \n{t_str}")
|
||||
|
|
@ -0,0 +1,12 @@
|
|||
aiohttp==3.9.5
|
||||
aiosignal==1.3.1
|
||||
attrs==23.2.0
|
||||
frozenlist==1.4.1
|
||||
greenlet==3.0.3
|
||||
idna==3.7
|
||||
msgpack==1.0.8
|
||||
multidict==6.0.5
|
||||
parse==1.20.2
|
||||
pynvim==0.5.0
|
||||
pyzmq==26.0.3
|
||||
yarl==1.9.4
|
||||
Binary file not shown.
Binary file not shown.
|
|
@ -0,0 +1,186 @@
|
|||
"""
|
||||
XML-RPC Client with asyncio.
|
||||
|
||||
This module adapt the ``xmlrpc.client`` module of the standard library to
|
||||
work with asyncio.
|
||||
|
||||
"""
|
||||
|
||||
import sys
|
||||
import asyncio
|
||||
import logging
|
||||
import aiohttp
|
||||
import inspect
|
||||
import functools
|
||||
|
||||
from xmlrpc import client as xmlrpc
|
||||
|
||||
def coroutine(fn):
|
||||
if inspect.iscoroutinefunction(fn):
|
||||
return fn
|
||||
|
||||
@functools.wraps(fn)
|
||||
async def _wrapper(*args, **kwargs):
|
||||
return fn(*args, **kwargs)
|
||||
|
||||
return _wrapper
|
||||
|
||||
|
||||
__ALL__ = ['ServerProxy', 'Fault', 'ProtocolError']
|
||||
|
||||
# you don't have to import xmlrpc.client from your code
|
||||
Fault = xmlrpc.Fault
|
||||
ProtocolError = xmlrpc.ProtocolError
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
PY35 = sys.version_info >= (3, 5)
|
||||
|
||||
|
||||
class _Method:
|
||||
# some magic to bind an XML-RPC method to an RPC server.
|
||||
# supports "nested" methods (e.g. examples.getStateName)
|
||||
def __init__(self, send, name):
|
||||
self.__send = send
|
||||
self.__name = name
|
||||
|
||||
def __getattr__(self, name):
|
||||
return _Method(self.__send, "%s.%s" % (self.__name, name))
|
||||
|
||||
@coroutine
|
||||
def __call__(self, *args):
|
||||
ret = yield from self.__send(self.__name, args)
|
||||
return ret
|
||||
|
||||
|
||||
class AioTransport(xmlrpc.Transport):
|
||||
"""
|
||||
``xmlrpc.Transport`` subclass for asyncio support
|
||||
"""
|
||||
|
||||
def __init__(self, session, use_https, *, use_datetime=False,
|
||||
use_builtin_types=False, loop, headers=None, auth=None):
|
||||
super().__init__(use_datetime, use_builtin_types)
|
||||
self.use_https = use_https
|
||||
self._loop = loop
|
||||
self._session = session
|
||||
|
||||
self.auth = auth
|
||||
|
||||
if not headers:
|
||||
headers = {'User-Agent': 'python/aioxmlrpc',
|
||||
'Accept': 'text/xml',
|
||||
'Content-Type': 'text/xml'}
|
||||
|
||||
self.headers = headers
|
||||
|
||||
@coroutine
|
||||
def request(self, host, handler, request_body, verbose=False):
|
||||
"""
|
||||
Send the XML-RPC request, return the response.
|
||||
This method is a coroutine.
|
||||
"""
|
||||
url = self._build_url(host, handler)
|
||||
response = None
|
||||
try:
|
||||
response = yield from self._session.request(
|
||||
'POST', url, headers=self.headers, data=request_body, auth=self.auth)
|
||||
body = yield from response.text()
|
||||
if response.status != 200:
|
||||
raise ProtocolError(url, response.status,
|
||||
body, response.headers)
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except ProtocolError:
|
||||
raise
|
||||
except Exception as exc:
|
||||
log.error('Unexpected error', exc_info=True)
|
||||
if response is not None:
|
||||
errcode = response.status
|
||||
headers = response.headers
|
||||
else:
|
||||
errcode = 0
|
||||
headers = {}
|
||||
|
||||
raise ProtocolError(url, errcode, str(exc), headers)
|
||||
return self.parse_response(body)
|
||||
|
||||
def parse_response(self, body):
|
||||
"""
|
||||
Parse the xmlrpc response.
|
||||
"""
|
||||
p, u = self.getparser()
|
||||
p.feed(body)
|
||||
p.close()
|
||||
return u.close()
|
||||
|
||||
def _build_url(self, host, handler):
|
||||
"""
|
||||
Build a url for our request based on the host, handler and use_http
|
||||
property
|
||||
"""
|
||||
scheme = 'https' if self.use_https else 'http'
|
||||
return '%s://%s%s' % (scheme, host, handler)
|
||||
|
||||
|
||||
class ServerProxy(xmlrpc.ServerProxy):
|
||||
"""
|
||||
``xmlrpc.ServerProxy`` subclass for asyncio support
|
||||
"""
|
||||
|
||||
def __init__(self, uri, session=None, encoding=None, verbose=False,
|
||||
allow_none=False, use_datetime=False, use_builtin_types=False,
|
||||
loop=None, auth=None, headers=None):
|
||||
self._loop = loop or asyncio.get_event_loop()
|
||||
|
||||
if session:
|
||||
self._session = session
|
||||
self._close_session = False
|
||||
else:
|
||||
self._close_session = True
|
||||
self._session = aiohttp.ClientSession(loop=self._loop)
|
||||
|
||||
transport = AioTransport(use_https=uri.startswith('https://'),
|
||||
loop=self._loop,
|
||||
session=self._session,
|
||||
auth=auth,
|
||||
headers=headers)
|
||||
|
||||
super().__init__(uri, transport, encoding, verbose, allow_none,
|
||||
use_datetime, use_builtin_types)
|
||||
|
||||
@coroutine
|
||||
def __request(self, methodname, params):
|
||||
# call a method on the remote server
|
||||
request = xmlrpc.dumps(params, methodname, encoding=self.__encoding,
|
||||
allow_none=self.__allow_none).encode(self.__encoding)
|
||||
|
||||
response = yield from self.__transport.request(
|
||||
self.__host,
|
||||
self.__handler,
|
||||
request,
|
||||
verbose=self.__verbose
|
||||
)
|
||||
|
||||
if len(response) == 1:
|
||||
response = response[0]
|
||||
|
||||
return response
|
||||
|
||||
@coroutine
|
||||
def close(self):
|
||||
if self._close_session:
|
||||
yield from self._session.close()
|
||||
|
||||
def __getattr__(self, name):
|
||||
return _Method(self.__request, name)
|
||||
|
||||
if PY35:
|
||||
|
||||
@coroutine
|
||||
def __aenter__(self):
|
||||
return self
|
||||
|
||||
@coroutine
|
||||
def __aexit__(self, exc_type, exc_val, exc_tb):
|
||||
if self._close_session:
|
||||
yield from self._session.close()
|
||||
|
|
@ -0,0 +1,17 @@
|
|||
from xmlrpc.client import ServerProxy
|
||||
import sys
|
||||
|
||||
|
||||
# Create the proxy in a nice way so it gets closed when we are done.
|
||||
with ServerProxy('http://localhost:9000') as proxy:
|
||||
# Ensure we got enough arguments coming in
|
||||
assert len(sys.argv) >= 3, "Must supply at least 2 arguments.\n" + \
|
||||
"Usage: rpc_sync_client.py function argument1 [argument2 ...]"
|
||||
# Split incoming arguments into the name of the function to call and
|
||||
# the arguments to supply to that function. Note that sys.argv[0] is
|
||||
# the name of the script itself.
|
||||
scriptname, function, *arguments = sys.argv
|
||||
# Get the indicated remote function.
|
||||
remote_function = getattr(proxy, function)
|
||||
# Print the result of executing the remote function.
|
||||
print(remote_function(arguments))
|
||||
|
|
@ -0,0 +1,26 @@
|
|||
import logging
|
||||
import time
|
||||
from xmlrpc.server import SimpleXMLRPCServer
|
||||
|
||||
|
||||
def reverse_list(l):
|
||||
logging.debug(f'Call received: reverse_list({l!r}), calculating for 1 second')
|
||||
time.sleep(1)
|
||||
return l[::-1]
|
||||
|
||||
def allcaps_list(l):
|
||||
logging.debug(f'Call received: allcaps({l!r}), calculating for 1 second')
|
||||
return [i.upper() for i in l]
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
server = SimpleXMLRPCServer(('localhost', 9000), logRequests=True)
|
||||
# Register the function we are serving
|
||||
server.register_function(reverse_list, 'reverse')
|
||||
server.register_function(allcaps_list, 'allcaps')
|
||||
try:
|
||||
print("Use Control-C to exit")
|
||||
# Start serving our functions
|
||||
server.serve_forever()
|
||||
except KeyboardInterrupt:
|
||||
print("Exiting")
|
||||
|
|
@ -0,0 +1,58 @@
|
|||
import asyncio
|
||||
import sys
|
||||
from statistics import mean
|
||||
from math import pi
|
||||
from time import time
|
||||
# Import the asynchronous version of ServerProxy
|
||||
from aioxmlrpc.client import ServerProxy
|
||||
# Import the asynchronous version of estimate_pi
|
||||
from util import estimate_pi_async
|
||||
|
||||
|
||||
# Asynchronous call to the slave
|
||||
async def remote_estimate(n):
|
||||
print(f"Requesting that slave estimate pi with {n} throws.")
|
||||
# Create the proxy in a nice way so it gets closed when we are done.
|
||||
async with ServerProxy('http://localhost:9000') as proxy:
|
||||
pi_remote = await proxy.estimate_pi(n)
|
||||
# print(f"Result of remote estimation: pi={pi_remote:.010f}")
|
||||
print(pi_remote)
|
||||
return pi_remote
|
||||
|
||||
|
||||
# Asynchronous call to ourselves
|
||||
async def local_estimate(n):
|
||||
print(f"Master begins estimating pi with {n} throws.")
|
||||
pi_local = await estimate_pi_async(n)
|
||||
print(f"Result of local estimation: pi={pi_local:.010f}")
|
||||
return pi_local
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Ensure we got enough arguments coming in
|
||||
assert len(sys.argv) >= 2, "Must supply at least 1 argument.\n" + \
|
||||
"Usage: rpc_sync_pi_master.py N [argument2 ...]"
|
||||
# Split incoming arguments into the number of throws to use.
|
||||
# Note that sys.argv[0] is the name of the script itself.
|
||||
scriptname, N, *arguments = sys.argv
|
||||
|
||||
# split the workload between ourselves and the remote
|
||||
# note: // is integer division
|
||||
N = int(N)
|
||||
N_remote = N // 2
|
||||
N_local = N - N_remote
|
||||
start_time = time()
|
||||
|
||||
# ASYNC MAGIC BEGIN
|
||||
# Gather up all tasks we have to do, and tell the event loop to
|
||||
# run until they are complete.
|
||||
futures = asyncio.gather(remote_estimate(N_remote), local_estimate(N_local))
|
||||
loop = asyncio.get_event_loop()
|
||||
results = loop.run_until_complete(futures)
|
||||
# ASYNC MAGIC END
|
||||
|
||||
pi_remote, pi_local = results
|
||||
|
||||
pi_m = mean([pi_remote, pi_local])
|
||||
print(f"Mean estimation result: pi ={pi_m:.010f}")
|
||||
print(f"Relative error: {100*(pi_m/pi - 1):.010f}%")
|
||||
print(f"Total time to execute: {time() - start_time} sec")
|
||||
|
|
@ -0,0 +1,21 @@
|
|||
import logging
|
||||
from xmlrpc.server import SimpleXMLRPCServer
|
||||
from util import estimate_pi
|
||||
|
||||
|
||||
def local_estimate_pi(n, *args):
|
||||
logging.debug(f'Call received: estimate_pi({n!r})')
|
||||
return estimate_pi(n)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
server = SimpleXMLRPCServer(('localhost', 9000), logRequests=True)
|
||||
# Register the function we are serving
|
||||
server.register_function(local_estimate_pi, 'estimate_pi')
|
||||
try:
|
||||
print("Use Control-C to exit")
|
||||
# Start serving our functions
|
||||
server.serve_forever()
|
||||
except KeyboardInterrupt:
|
||||
print("Exiting")
|
||||
|
|
@ -0,0 +1,35 @@
|
|||
import sys
|
||||
from util import estimate_pi
|
||||
from statistics import mean
|
||||
from math import pi
|
||||
from time import time
|
||||
# Import the synchronous version of ServerProxy
|
||||
from xmlrpc.client import ServerProxy
|
||||
|
||||
|
||||
# Create the proxy in a nice way so it gets closed when we are done.
|
||||
with ServerProxy('http://localhost:9000') as proxy:
|
||||
# Ensure we got enough arguments coming in
|
||||
assert len(sys.argv) >= 2, "Must supply at least 1 argument.\n" + \
|
||||
"Usage: rpc_sync_pi_master.py N [argument2 ...]"
|
||||
# Split incoming arguments into the number of throws to use.
|
||||
# Note that sys.argv[0] is the name of the script itself.
|
||||
scriptname, N, *arguments = sys.argv
|
||||
|
||||
# split the workload between ourselves and the remote
|
||||
# note: // is integer division
|
||||
N = int(N)
|
||||
N_remote = N // 2
|
||||
N_local = N - N_remote
|
||||
start_time = time()
|
||||
|
||||
print(f"Requesting that slave estimate pi with {N_remote} throws.")
|
||||
pi_remote = proxy.estimate_pi(N_remote)
|
||||
print(f"Result of remote estimation: pi={pi_remote:.010f}")
|
||||
print(f"Master begins estimating pi with {N_local} throws.")
|
||||
pi_local = estimate_pi(N_local)
|
||||
print(f"Result of local estimation: pi={pi_local:.010f}")
|
||||
pi_m = mean([pi_remote, pi_local])
|
||||
print(f"Mean estimation result: pi ={pi_m:.010f}")
|
||||
print(f"Relative error: {100*(pi_m/pi - 1):.010f}%")
|
||||
print(f"Total time to execute: {time() - start_time} sec")
|
||||
|
|
@ -0,0 +1,55 @@
|
|||
import asyncio
|
||||
from random import random
|
||||
from itertools import chain, islice
|
||||
|
||||
|
||||
def get_chunks_it(l, n):
|
||||
""" Chunks an iterator `l` in size `n`
|
||||
Args:
|
||||
l (Iterator[Any]): an iterator
|
||||
n (int): size of
|
||||
Returns:
|
||||
Generator[Any]
|
||||
"""
|
||||
iterator = iter(l)
|
||||
for first in iterator:
|
||||
yield chain([first], islice(iterator, n - 1))
|
||||
|
||||
|
||||
def estimate_pi(n):
|
||||
"""
|
||||
Estimates pi by throwing a point (x,y) randomly *n* times in
|
||||
the unit square and counting the number of hits where
|
||||
x^2 + Y^2 <= 1.
|
||||
Pi is then approximated as 4 * no. hits / n.
|
||||
input:
|
||||
*n* (int): The number of times to throw the point
|
||||
output:
|
||||
*estimate* (float): The estimate for pi found here
|
||||
"""
|
||||
hits = sum(int(random()**2 + random()**2 <= 1) for _ in range(n))
|
||||
estimate = 4 * hits / n
|
||||
return estimate
|
||||
|
||||
|
||||
async def estimate_pi_async(n):
|
||||
"""
|
||||
Estimates pi by throwing a point (x,y) randomly *n* times in
|
||||
the unit square and counting the number of hits where
|
||||
x^2 + Y^2 <= 1.
|
||||
Pi is then approximated as 4 * no. hits / n.
|
||||
input:
|
||||
*n* (int): The number of times to throw the point
|
||||
output:
|
||||
*estimate* (float): The estimate for pi found here
|
||||
|
||||
**Note:**
|
||||
This is an asynchronous implementation that throws
|
||||
100 points before awaiting to relinquish control.
|
||||
"""
|
||||
hits = 0
|
||||
for chunk in get_chunks_it(range(n), 100):
|
||||
await asyncio.sleep(0) # Relinquish control so something else can run
|
||||
hits += sum(int(random()**2 + random()**2 <= 1) for _ in chunk)
|
||||
estimate = 4 * hits / n
|
||||
return estimate
|
||||
Loading…
Reference in New Issue