Compare commits

..

4 Commits

Author SHA1 Message Date
DBras 7685c36677 step 2.3: multiple topics 2024-06-13 11:29:57 +02:00
DBras 1044d6224a step 1.5: abandon due to differing async versions 2024-06-13 11:19:23 +02:00
DBras 9aec9332fc step 1.2: allcaps 2024-06-13 11:02:46 +02:00
DBras e00a4587f9 add task files 2024-06-13 10:52:45 +02:00
20 changed files with 748 additions and 0 deletions

38
Readme.md Normal file
View File

@ -0,0 +1,38 @@
# Communication pattern exercises
These scripts were developed for the courses 31380 and 31725 by
- Lasse Orda (Initial implementation)
- Tue Vissing Jensen (Updates, maintainer)
## Included scripts
### RPC
`rpc_example_runner.py text1 text2 text3 text4` ---
Requires that `rpc/rpc_example_server.py` is running.
Returns a the list of text strings given, but in reversed order.
`rpc_sync_pi_runner.py N` ---
Requires that `rpc/rpc_pi_server.py` is running.
Estimate pi by throwing N points in the unit circle, with the server taking over half the work.
`rpc_async_pi_runner.py N` ---
Requires that `rpc/rpc_pi_server.py` is running.
Estimate pi by throwing N points in the unit circle, with the server taking over half the work simultaneously.
### Pub/Sub
`pub_server.py` ---
A server which periodically publishes the current time.
`sub_client.py` ---
Subscribes to the server's messages and prints them. Exits after 5 messages.
### Broadcast
`broadcast_receiver.py`
`broadcast_listener.py`
`broadcaster.py` ---
Periodically broadcasts

View File

@ -0,0 +1,43 @@
"""
Script which listens for messages on a hardcoded port 8881
@Author: orda
"""
import socket
import sys
from parse import parse
def main():
if len(sys.argv) > 1:
port = sys.argv[1]
int(port)
else:
port = 8881
my_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
my_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
my_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
my_socket.bind(('', port))
formatting_string = "Today's lottery number: {number}"
print('listener started...')
try:
while True:
message, address = my_socket.recvfrom(port)
dmessage = message.decode('utf-8') # Decode to utf-8
print(f'received: {dmessage}, from: {address}')
# decoded will be a pair of a tuple and a dictionary which reflect the
# "reverse" of using .format on the first string.
decoded = parse(formatting_string, dmessage)
if decoded:
print(f' Decoded into: {decoded.named}')
print(f' Check that the string matches: {formatting_string.format(*decoded.fixed, **decoded.named)}')
finally:
my_socket.close()
if __name__ == "__main__":
main()

38
broadcast/broadcaster.py Normal file
View File

@ -0,0 +1,38 @@
"""
Script which broadcasts random integers to a hardcoded port 8881
@Author: orda
"""
import socket
import random
import time
import sys
def main():
if len(sys.argv) > 1:
port = sys.argv[1]
int(port)
else:
port = 8881
my_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
my_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
# Allow reuse in case we exited ungracefully
my_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
print('broadcaster started...')
try:
while True:
# Make a random number and send it out.
number = random.randint(1, 101)
print("sending: ", number)
my_socket.sendto(f"Today's lottery number: {number}".encode('utf-8'), ('<broadcast>', 8881))
time.sleep(1)
finally:
my_socket.close()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,17 @@
import zmq
ip = "localhost"
heartbeat_publish_port = 10002
# Get a context we can use to make sockets
context = zmq.Context()
# Socket to talk to server
socket = context.socket(zmq.SUB)
socket.connect(f"tcp://{ip}:{heartbeat_publish_port}")
topicfilter = "HEARTBEAT"
socket.setsockopt_string(zmq.SUBSCRIBE, topicfilter)
while True:
print(socket.recv_string())

View File

@ -0,0 +1,31 @@
import zmq
from xmlrpc.server import SimpleXMLRPCServer
# Other connect on the server port.
heartbeat_server_port = 10001
heartbeat_publish_port = 10002
# Make a context which we use to make sockets from
context = zmq.Context()
# Make a new socket. We want to publish on this socket.
socket = context.socket(zmq.PUB)
# Bind the socket (inside our program) to a port (on our machine)
# We can now send messages
socket.bind(f"tcp://*:{heartbeat_publish_port}")
# Make a function for others to call letting us know they are alive.
def send_heartbeat(sender: str):
print(f"Received heartbeat from {sender}")
# Publish who is alive now
socket.send_string(f"HEARTBEAT;{sender}")
# Return something just to show we succeeded
return 0
# Make an RPC server to serve that function to others
server = SimpleXMLRPCServer(('localhost', heartbeat_server_port))
# Register the function
server.register_function(send_heartbeat, 'send_heartbeat')
# Start up the server
server.serve_forever()

View File

@ -0,0 +1,11 @@
from xmlrpc.client import ServerProxy
from time import sleep
heartbeat_server_port = 10001
with ServerProxy(f'http://localhost:{heartbeat_server_port}') as proxy:
# Periodically send a heartbeat
while True:
proxy.send_heartbeat('BATTERY')
sleep(1)

32
pubsub/pub_server.py Normal file
View File

@ -0,0 +1,32 @@
import zmq
import sys
import time
import random
port = "5556"
if len(sys.argv) > 1:
port = sys.argv[1]
int(port)
# Make a context which we use to make sockets from
context = zmq.Context()
# Make a new socket. We want to publish on this socket.
socket = context.socket(zmq.PUB)
# Bind the socket (inside our program) to a port (on our machine)
# We can now send messages
socket.bind(f"tcp://*:{port}")
topics = ('TIME', 'RANDOM')
messages = {}
while True:
# Time to publish the latest time!
messages['TIME'] = time.ctime()
messages['RANDOM'] = random.randint(1,10)
# Note the use of XXX_string here;
# the non-_stringy methods only work with bytes.
for topic in topics:
message = messages.get(topic, '')
if not message: continue
socket.send_string(f"{topic};{message}")
print(f"Published topic {topic}: {message}")
time.sleep(1)

View File

@ -0,0 +1,47 @@
import zmq
import sys
import time
from zmq.utils.monitor import recv_monitor_message
port = "5556"
if len(sys.argv) > 1:
port = sys.argv[1]
int(port)
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind(f"tcp://*:{port}")
# Get a monitoring socket where we can sniff information about new subscribers.
monitor = socket.get_monitor_socket()
sub_list = set()
topic = "TIME"
while True:
# Run through monitoring messages and check if we have new subscribers
# Note you can delete this entire
while True:
try:
# We include a NOBLOCK flag here to not hang until a status message comes in.
# If no messages are ready, zmq.Again will be raised, which we catch below.
status = recv_monitor_message(monitor, flags=zmq.NOBLOCK)
print(f"Status: {status}")
if status['event'] == zmq.EVENT_ACCEPTED:
# New subscriber, add them to our list of subscribers.
print(f"Subscriber '{status['value']}' has joined :D")
sub_list.add(status['value'])
if status['event'] == zmq.EVENT_DISCONNECTED:
# Someone left, remove them from our list.
print(f"Subscriber '{status['value']}' has left :(")
sub_list.remove(status['value'])
except zmq.Again as e:
# No more new subscribers - let's stop looking for them
break
# Time to publish the latest time!
messagedata = time.ctime()
# Note the use of XXX_string here;
# the non-_string-y methods only work with bytes.
socket.send_string(f"{topic};{messagedata}")
print(f"Published topic {topic}: {messagedata} to subscribers: {sub_list}")
time.sleep(1)

31
pubsub/sub_client.py Normal file
View File

@ -0,0 +1,31 @@
import sys
import zmq
ip = "localhost"
port = "5556"
if len(sys.argv) > 1:
port = sys.argv[1]
int(port)
# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)
print(f"Collecting updates from time server at tcp://localhost:{port}")
socket.connect(f"tcp://{ip}:{port}")
# Filter by topic
topicfilters = ("TIME", "RANDOM")
socket.setsockopt_string(zmq.SUBSCRIBE, topicfilters[0])
socket.setsockopt_string(zmq.SUBSCRIBE, topicfilters[1])
# Process 5 updates
topic_list = []
for update_nbr in range(5):
string = socket.recv_string()
topic, messagedata = string.split(';')
topic_list.append(messagedata)
print(f"Received on topic {topic}: {messagedata}")
socket.close()
t_str = "\n".join(topic_list)
print(f"All the times we received: \n{t_str}")

View File

@ -0,0 +1,50 @@
import sys
import zmq
from time import sleep
"""
This version of the subscriber doesn't hang
"""
ip = "localhost"
port = "5556"
if len(sys.argv) > 1:
port = sys.argv[1]
int(port)
# Make a context we can use to get a socket
context = zmq.Context()
# Grab a socket that can connect to the server
socket = context.socket(zmq.SUB)
# Connect to the server by saying where we can get our measurements from
print(f"Collecting updates from time server at tcp://localhost:{port}")
socket.connect(f"tcp://{ip}:{port}")
# Filter by topic - we are only interested in knowing about the time
topicfilter = "TIME"
socket.setsockopt_string(zmq.SUBSCRIBE, topicfilter)
# Process 5 updates
topic_list = []
# We need to keep track of how many we've received
# since the loop may end with no messages
try:
while len(topic_list) < 5:
try: # Need to use a try block since zmq uses a
# giving the flag NOBLOCK indicates to zmq that we don't want to
# hang waiting for a message.
string = socket.recv_string(zmq.NOBLOCK)
except zmq.Again:
print("No message this time :(")
continue
topic, messagedata = string.split(';')
topic_list.append(messagedata)
print(f"Received on topic {topic}: {messagedata}")
#
finally
sleep(0.2)
socket.close()
t_str = "\n".join(topic_list)
print(f"All the times we received: \n{t_str}")

12
requirements.txt Normal file
View File

@ -0,0 +1,12 @@
aiohttp==3.9.5
aiosignal==1.3.1
attrs==23.2.0
frozenlist==1.4.1
greenlet==3.0.3
idna==3.7
msgpack==1.0.8
multidict==6.0.5
parse==1.20.2
pynvim==0.5.0
pyzmq==26.0.3
yarl==1.9.4

Binary file not shown.

Binary file not shown.

186
rpc/aioxmlrpc/client.py Normal file
View File

@ -0,0 +1,186 @@
"""
XML-RPC Client with asyncio.
This module adapt the ``xmlrpc.client`` module of the standard library to
work with asyncio.
"""
import sys
import asyncio
import logging
import aiohttp
import inspect
import functools
from xmlrpc import client as xmlrpc
def coroutine(fn):
if inspect.iscoroutinefunction(fn):
return fn
@functools.wraps(fn)
async def _wrapper(*args, **kwargs):
return fn(*args, **kwargs)
return _wrapper
__ALL__ = ['ServerProxy', 'Fault', 'ProtocolError']
# you don't have to import xmlrpc.client from your code
Fault = xmlrpc.Fault
ProtocolError = xmlrpc.ProtocolError
log = logging.getLogger(__name__)
PY35 = sys.version_info >= (3, 5)
class _Method:
# some magic to bind an XML-RPC method to an RPC server.
# supports "nested" methods (e.g. examples.getStateName)
def __init__(self, send, name):
self.__send = send
self.__name = name
def __getattr__(self, name):
return _Method(self.__send, "%s.%s" % (self.__name, name))
@coroutine
def __call__(self, *args):
ret = yield from self.__send(self.__name, args)
return ret
class AioTransport(xmlrpc.Transport):
"""
``xmlrpc.Transport`` subclass for asyncio support
"""
def __init__(self, session, use_https, *, use_datetime=False,
use_builtin_types=False, loop, headers=None, auth=None):
super().__init__(use_datetime, use_builtin_types)
self.use_https = use_https
self._loop = loop
self._session = session
self.auth = auth
if not headers:
headers = {'User-Agent': 'python/aioxmlrpc',
'Accept': 'text/xml',
'Content-Type': 'text/xml'}
self.headers = headers
@coroutine
def request(self, host, handler, request_body, verbose=False):
"""
Send the XML-RPC request, return the response.
This method is a coroutine.
"""
url = self._build_url(host, handler)
response = None
try:
response = yield from self._session.request(
'POST', url, headers=self.headers, data=request_body, auth=self.auth)
body = yield from response.text()
if response.status != 200:
raise ProtocolError(url, response.status,
body, response.headers)
except asyncio.CancelledError:
raise
except ProtocolError:
raise
except Exception as exc:
log.error('Unexpected error', exc_info=True)
if response is not None:
errcode = response.status
headers = response.headers
else:
errcode = 0
headers = {}
raise ProtocolError(url, errcode, str(exc), headers)
return self.parse_response(body)
def parse_response(self, body):
"""
Parse the xmlrpc response.
"""
p, u = self.getparser()
p.feed(body)
p.close()
return u.close()
def _build_url(self, host, handler):
"""
Build a url for our request based on the host, handler and use_http
property
"""
scheme = 'https' if self.use_https else 'http'
return '%s://%s%s' % (scheme, host, handler)
class ServerProxy(xmlrpc.ServerProxy):
"""
``xmlrpc.ServerProxy`` subclass for asyncio support
"""
def __init__(self, uri, session=None, encoding=None, verbose=False,
allow_none=False, use_datetime=False, use_builtin_types=False,
loop=None, auth=None, headers=None):
self._loop = loop or asyncio.get_event_loop()
if session:
self._session = session
self._close_session = False
else:
self._close_session = True
self._session = aiohttp.ClientSession(loop=self._loop)
transport = AioTransport(use_https=uri.startswith('https://'),
loop=self._loop,
session=self._session,
auth=auth,
headers=headers)
super().__init__(uri, transport, encoding, verbose, allow_none,
use_datetime, use_builtin_types)
@coroutine
def __request(self, methodname, params):
# call a method on the remote server
request = xmlrpc.dumps(params, methodname, encoding=self.__encoding,
allow_none=self.__allow_none).encode(self.__encoding)
response = yield from self.__transport.request(
self.__host,
self.__handler,
request,
verbose=self.__verbose
)
if len(response) == 1:
response = response[0]
return response
@coroutine
def close(self):
if self._close_session:
yield from self._session.close()
def __getattr__(self, name):
return _Method(self.__request, name)
if PY35:
@coroutine
def __aenter__(self):
return self
@coroutine
def __aexit__(self, exc_type, exc_val, exc_tb):
if self._close_session:
yield from self._session.close()

17
rpc/rpc_example_runner.py Normal file
View File

@ -0,0 +1,17 @@
from xmlrpc.client import ServerProxy
import sys
# Create the proxy in a nice way so it gets closed when we are done.
with ServerProxy('http://localhost:9000') as proxy:
# Ensure we got enough arguments coming in
assert len(sys.argv) >= 3, "Must supply at least 2 arguments.\n" + \
"Usage: rpc_sync_client.py function argument1 [argument2 ...]"
# Split incoming arguments into the name of the function to call and
# the arguments to supply to that function. Note that sys.argv[0] is
# the name of the script itself.
scriptname, function, *arguments = sys.argv
# Get the indicated remote function.
remote_function = getattr(proxy, function)
# Print the result of executing the remote function.
print(remote_function(arguments))

26
rpc/rpc_example_server.py Normal file
View File

@ -0,0 +1,26 @@
import logging
import time
from xmlrpc.server import SimpleXMLRPCServer
def reverse_list(l):
logging.debug(f'Call received: reverse_list({l!r}), calculating for 1 second')
time.sleep(1)
return l[::-1]
def allcaps_list(l):
logging.debug(f'Call received: allcaps({l!r}), calculating for 1 second')
return [i.upper() for i in l]
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
server = SimpleXMLRPCServer(('localhost', 9000), logRequests=True)
# Register the function we are serving
server.register_function(reverse_list, 'reverse')
server.register_function(allcaps_list, 'allcaps')
try:
print("Use Control-C to exit")
# Start serving our functions
server.serve_forever()
except KeyboardInterrupt:
print("Exiting")

View File

@ -0,0 +1,58 @@
import asyncio
import sys
from statistics import mean
from math import pi
from time import time
# Import the asynchronous version of ServerProxy
from aioxmlrpc.client import ServerProxy
# Import the asynchronous version of estimate_pi
from util import estimate_pi_async
# Asynchronous call to the slave
async def remote_estimate(n):
print(f"Requesting that slave estimate pi with {n} throws.")
# Create the proxy in a nice way so it gets closed when we are done.
async with ServerProxy('http://localhost:9000') as proxy:
pi_remote = await proxy.estimate_pi(n)
# print(f"Result of remote estimation: pi={pi_remote:.010f}")
print(pi_remote)
return pi_remote
# Asynchronous call to ourselves
async def local_estimate(n):
print(f"Master begins estimating pi with {n} throws.")
pi_local = await estimate_pi_async(n)
print(f"Result of local estimation: pi={pi_local:.010f}")
return pi_local
if __name__ == "__main__":
# Ensure we got enough arguments coming in
assert len(sys.argv) >= 2, "Must supply at least 1 argument.\n" + \
"Usage: rpc_sync_pi_master.py N [argument2 ...]"
# Split incoming arguments into the number of throws to use.
# Note that sys.argv[0] is the name of the script itself.
scriptname, N, *arguments = sys.argv
# split the workload between ourselves and the remote
# note: // is integer division
N = int(N)
N_remote = N // 2
N_local = N - N_remote
start_time = time()
# ASYNC MAGIC BEGIN
# Gather up all tasks we have to do, and tell the event loop to
# run until they are complete.
futures = asyncio.gather(remote_estimate(N_remote), local_estimate(N_local))
loop = asyncio.get_event_loop()
results = loop.run_until_complete(futures)
# ASYNC MAGIC END
pi_remote, pi_local = results
pi_m = mean([pi_remote, pi_local])
print(f"Mean estimation result: pi ={pi_m:.010f}")
print(f"Relative error: {100*(pi_m/pi - 1):.010f}%")
print(f"Total time to execute: {time() - start_time} sec")

21
rpc/rpc_pi_server.py Normal file
View File

@ -0,0 +1,21 @@
import logging
from xmlrpc.server import SimpleXMLRPCServer
from util import estimate_pi
def local_estimate_pi(n, *args):
logging.debug(f'Call received: estimate_pi({n!r})')
return estimate_pi(n)
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
server = SimpleXMLRPCServer(('localhost', 9000), logRequests=True)
# Register the function we are serving
server.register_function(local_estimate_pi, 'estimate_pi')
try:
print("Use Control-C to exit")
# Start serving our functions
server.serve_forever()
except KeyboardInterrupt:
print("Exiting")

35
rpc/rpc_pi_sync_runner.py Normal file
View File

@ -0,0 +1,35 @@
import sys
from util import estimate_pi
from statistics import mean
from math import pi
from time import time
# Import the synchronous version of ServerProxy
from xmlrpc.client import ServerProxy
# Create the proxy in a nice way so it gets closed when we are done.
with ServerProxy('http://localhost:9000') as proxy:
# Ensure we got enough arguments coming in
assert len(sys.argv) >= 2, "Must supply at least 1 argument.\n" + \
"Usage: rpc_sync_pi_master.py N [argument2 ...]"
# Split incoming arguments into the number of throws to use.
# Note that sys.argv[0] is the name of the script itself.
scriptname, N, *arguments = sys.argv
# split the workload between ourselves and the remote
# note: // is integer division
N = int(N)
N_remote = N // 2
N_local = N - N_remote
start_time = time()
print(f"Requesting that slave estimate pi with {N_remote} throws.")
pi_remote = proxy.estimate_pi(N_remote)
print(f"Result of remote estimation: pi={pi_remote:.010f}")
print(f"Master begins estimating pi with {N_local} throws.")
pi_local = estimate_pi(N_local)
print(f"Result of local estimation: pi={pi_local:.010f}")
pi_m = mean([pi_remote, pi_local])
print(f"Mean estimation result: pi ={pi_m:.010f}")
print(f"Relative error: {100*(pi_m/pi - 1):.010f}%")
print(f"Total time to execute: {time() - start_time} sec")

55
rpc/util.py Normal file
View File

@ -0,0 +1,55 @@
import asyncio
from random import random
from itertools import chain, islice
def get_chunks_it(l, n):
""" Chunks an iterator `l` in size `n`
Args:
l (Iterator[Any]): an iterator
n (int): size of
Returns:
Generator[Any]
"""
iterator = iter(l)
for first in iterator:
yield chain([first], islice(iterator, n - 1))
def estimate_pi(n):
"""
Estimates pi by throwing a point (x,y) randomly *n* times in
the unit square and counting the number of hits where
x^2 + Y^2 <= 1.
Pi is then approximated as 4 * no. hits / n.
input:
*n* (int): The number of times to throw the point
output:
*estimate* (float): The estimate for pi found here
"""
hits = sum(int(random()**2 + random()**2 <= 1) for _ in range(n))
estimate = 4 * hits / n
return estimate
async def estimate_pi_async(n):
"""
Estimates pi by throwing a point (x,y) randomly *n* times in
the unit square and counting the number of hits where
x^2 + Y^2 <= 1.
Pi is then approximated as 4 * no. hits / n.
input:
*n* (int): The number of times to throw the point
output:
*estimate* (float): The estimate for pi found here
**Note:**
This is an asynchronous implementation that throws
100 points before awaiting to relinquish control.
"""
hits = 0
for chunk in get_chunks_it(range(n), 100):
await asyncio.sleep(0) # Relinquish control so something else can run
hits += sum(int(random()**2 + random()**2 <= 1) for _ in chunk)
estimate = 4 * hits / n
return estimate