From e00a4587f96a9153557f6054372d4804d6971b08 Mon Sep 17 00:00:00 2001 From: DBras Date: Thu, 13 Jun 2024 10:52:45 +0200 Subject: [PATCH] add task files --- Readme.md | 38 ++++ broadcast/broadcast_listener.py | 43 +++++ broadcast/broadcaster.py | 38 ++++ combinations/heartbeat_listener.py | 17 ++ combinations/heartbeat_server.py | 31 ++++ combinations/heartbeat_unit.py | 11 ++ pubsub/pub_server.py | 26 +++ pubsub/pub_server_with_join_msg.py | 47 +++++ pubsub/sub_client.py | 30 +++ ...client_with_non_blocking_subscribe_call.py | 50 +++++ requirements.txt | 12 ++ rpc/aioxmlrpc/client.py | 174 ++++++++++++++++++ rpc/rpc_example_runner.py | 17 ++ rpc/rpc_example_server.py | 22 +++ rpc/rpc_pi_async_runner.py | 57 ++++++ rpc/rpc_pi_server.py | 21 +++ rpc/rpc_pi_sync_runner.py | 35 ++++ rpc/util.py | 55 ++++++ 18 files changed, 724 insertions(+) create mode 100644 Readme.md create mode 100644 broadcast/broadcast_listener.py create mode 100644 broadcast/broadcaster.py create mode 100644 combinations/heartbeat_listener.py create mode 100644 combinations/heartbeat_server.py create mode 100644 combinations/heartbeat_unit.py create mode 100644 pubsub/pub_server.py create mode 100644 pubsub/pub_server_with_join_msg.py create mode 100644 pubsub/sub_client.py create mode 100644 pubsub/sub_client_with_non_blocking_subscribe_call.py create mode 100644 requirements.txt create mode 100644 rpc/aioxmlrpc/client.py create mode 100644 rpc/rpc_example_runner.py create mode 100644 rpc/rpc_example_server.py create mode 100644 rpc/rpc_pi_async_runner.py create mode 100644 rpc/rpc_pi_server.py create mode 100644 rpc/rpc_pi_sync_runner.py create mode 100644 rpc/util.py diff --git a/Readme.md b/Readme.md new file mode 100644 index 0000000..2db653d --- /dev/null +++ b/Readme.md @@ -0,0 +1,38 @@ +# Communication pattern exercises + +These scripts were developed for the courses 31380 and 31725 by +- Lasse Orda (Initial implementation) +- Tue Vissing Jensen (Updates, maintainer) + +## Included scripts + +### RPC + +`rpc_example_runner.py text1 text2 text3 text4` --- +Requires that `rpc/rpc_example_server.py` is running. +Returns a the list of text strings given, but in reversed order. + +`rpc_sync_pi_runner.py N` --- +Requires that `rpc/rpc_pi_server.py` is running. +Estimate pi by throwing N points in the unit circle, with the server taking over half the work. + +`rpc_async_pi_runner.py N` --- +Requires that `rpc/rpc_pi_server.py` is running. +Estimate pi by throwing N points in the unit circle, with the server taking over half the work simultaneously. + +### Pub/Sub + +`pub_server.py` --- +A server which periodically publishes the current time. + +`sub_client.py` --- +Subscribes to the server's messages and prints them. Exits after 5 messages. + +### Broadcast + +`broadcast_receiver.py` + +`broadcast_listener.py` + +`broadcaster.py` --- +Periodically broadcasts \ No newline at end of file diff --git a/broadcast/broadcast_listener.py b/broadcast/broadcast_listener.py new file mode 100644 index 0000000..c560167 --- /dev/null +++ b/broadcast/broadcast_listener.py @@ -0,0 +1,43 @@ +""" + Script which listens for messages on a hardcoded port 8881 + + @Author: orda +""" +import socket +import sys +from parse import parse + + +def main(): + if len(sys.argv) > 1: + port = sys.argv[1] + int(port) + else: + port = 8881 + + my_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + my_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + my_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + my_socket.bind(('', port)) + + formatting_string = "Today's lottery number: {number}" + + print('listener started...') + + try: + while True: + message, address = my_socket.recvfrom(port) + dmessage = message.decode('utf-8') # Decode to utf-8 + print(f'received: {dmessage}, from: {address}') + # decoded will be a pair of a tuple and a dictionary which reflect the + # "reverse" of using .format on the first string. + decoded = parse(formatting_string, dmessage) + if decoded: + print(f' Decoded into: {decoded.named}') + print(f' Check that the string matches: {formatting_string.format(*decoded.fixed, **decoded.named)}') + finally: + my_socket.close() + + +if __name__ == "__main__": + main() diff --git a/broadcast/broadcaster.py b/broadcast/broadcaster.py new file mode 100644 index 0000000..e64167a --- /dev/null +++ b/broadcast/broadcaster.py @@ -0,0 +1,38 @@ +""" + Script which broadcasts random integers to a hardcoded port 8881 + + @Author: orda +""" + +import socket +import random +import time +import sys + + +def main(): + if len(sys.argv) > 1: + port = sys.argv[1] + int(port) + else: + port = 8881 + + my_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + my_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + # Allow reuse in case we exited ungracefully + my_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + print('broadcaster started...') + + try: + while True: + # Make a random number and send it out. + number = random.randint(1, 101) + print("sending: ", number) + my_socket.sendto(f"Today's lottery number: {number}".encode('utf-8'), ('', 8881)) + time.sleep(1) + finally: + my_socket.close() + + +if __name__ == "__main__": + main() diff --git a/combinations/heartbeat_listener.py b/combinations/heartbeat_listener.py new file mode 100644 index 0000000..5c93f44 --- /dev/null +++ b/combinations/heartbeat_listener.py @@ -0,0 +1,17 @@ +import zmq + +ip = "localhost" +heartbeat_publish_port = 10002 + + +# Get a context we can use to make sockets +context = zmq.Context() +# Socket to talk to server +socket = context.socket(zmq.SUB) +socket.connect(f"tcp://{ip}:{heartbeat_publish_port}") + +topicfilter = "HEARTBEAT" +socket.setsockopt_string(zmq.SUBSCRIBE, topicfilter) + +while True: + print(socket.recv_string()) diff --git a/combinations/heartbeat_server.py b/combinations/heartbeat_server.py new file mode 100644 index 0000000..d058598 --- /dev/null +++ b/combinations/heartbeat_server.py @@ -0,0 +1,31 @@ +import zmq +from xmlrpc.server import SimpleXMLRPCServer + +# Other connect on the server port. +heartbeat_server_port = 10001 +heartbeat_publish_port = 10002 + +# Make a context which we use to make sockets from +context = zmq.Context() +# Make a new socket. We want to publish on this socket. +socket = context.socket(zmq.PUB) +# Bind the socket (inside our program) to a port (on our machine) +# We can now send messages +socket.bind(f"tcp://*:{heartbeat_publish_port}") + + +# Make a function for others to call letting us know they are alive. +def send_heartbeat(sender: str): + print(f"Received heartbeat from {sender}") + # Publish who is alive now + socket.send_string(f"HEARTBEAT;{sender}") + # Return something just to show we succeeded + return 0 + + +# Make an RPC server to serve that function to others +server = SimpleXMLRPCServer(('localhost', heartbeat_server_port)) +# Register the function +server.register_function(send_heartbeat, 'send_heartbeat') +# Start up the server +server.serve_forever() diff --git a/combinations/heartbeat_unit.py b/combinations/heartbeat_unit.py new file mode 100644 index 0000000..e3679b5 --- /dev/null +++ b/combinations/heartbeat_unit.py @@ -0,0 +1,11 @@ +from xmlrpc.client import ServerProxy +from time import sleep + +heartbeat_server_port = 10001 + + +with ServerProxy(f'http://localhost:{heartbeat_server_port}') as proxy: + # Periodically send a heartbeat + while True: + proxy.send_heartbeat('BATTERY') + sleep(1) diff --git a/pubsub/pub_server.py b/pubsub/pub_server.py new file mode 100644 index 0000000..fb096fe --- /dev/null +++ b/pubsub/pub_server.py @@ -0,0 +1,26 @@ +import zmq +import sys +import time + +port = "5556" +if len(sys.argv) > 1: + port = sys.argv[1] + int(port) + +# Make a context which we use to make sockets from +context = zmq.Context() +# Make a new socket. We want to publish on this socket. +socket = context.socket(zmq.PUB) +# Bind the socket (inside our program) to a port (on our machine) +# We can now send messages +socket.bind(f"tcp://*:{port}") + +topic = "TIME" +while True: + # Time to publish the latest time! + messagedata = time.ctime() + # Note the use of XXX_string here; + # the non-_stringy methods only work with bytes. + socket.send_string(f"{topic};{messagedata}") + print(f"Published topic {topic}: {messagedata}") + time.sleep(1) diff --git a/pubsub/pub_server_with_join_msg.py b/pubsub/pub_server_with_join_msg.py new file mode 100644 index 0000000..4c1d5f1 --- /dev/null +++ b/pubsub/pub_server_with_join_msg.py @@ -0,0 +1,47 @@ +import zmq +import sys +import time +from zmq.utils.monitor import recv_monitor_message + +port = "5556" +if len(sys.argv) > 1: + port = sys.argv[1] + int(port) + +context = zmq.Context() +socket = context.socket(zmq.PUB) +socket.bind(f"tcp://*:{port}") + +# Get a monitoring socket where we can sniff information about new subscribers. +monitor = socket.get_monitor_socket() + +sub_list = set() + +topic = "TIME" +while True: + # Run through monitoring messages and check if we have new subscribers + # Note you can delete this entire + while True: + try: + # We include a NOBLOCK flag here to not hang until a status message comes in. + # If no messages are ready, zmq.Again will be raised, which we catch below. + status = recv_monitor_message(monitor, flags=zmq.NOBLOCK) + print(f"Status: {status}") + if status['event'] == zmq.EVENT_ACCEPTED: + # New subscriber, add them to our list of subscribers. + print(f"Subscriber '{status['value']}' has joined :D") + sub_list.add(status['value']) + if status['event'] == zmq.EVENT_DISCONNECTED: + # Someone left, remove them from our list. + print(f"Subscriber '{status['value']}' has left :(") + sub_list.remove(status['value']) + except zmq.Again as e: + # No more new subscribers - let's stop looking for them + break + # Time to publish the latest time! + messagedata = time.ctime() + # Note the use of XXX_string here; + # the non-_string-y methods only work with bytes. + socket.send_string(f"{topic};{messagedata}") + print(f"Published topic {topic}: {messagedata} to subscribers: {sub_list}") + time.sleep(1) diff --git a/pubsub/sub_client.py b/pubsub/sub_client.py new file mode 100644 index 0000000..a3ce0a9 --- /dev/null +++ b/pubsub/sub_client.py @@ -0,0 +1,30 @@ +import sys +import zmq + +ip = "localhost" +port = "5556" +if len(sys.argv) > 1: + port = sys.argv[1] + int(port) + +# Socket to talk to server +context = zmq.Context() +socket = context.socket(zmq.SUB) + +print(f"Collecting updates from time server at tcp://localhost:{port}") +socket.connect(f"tcp://{ip}:{port}") + +# Filter by topic +topicfilter = "TIME" +socket.setsockopt_string(zmq.SUBSCRIBE, topicfilter) + +# Process 5 updates +topic_list = [] +for update_nbr in range(5): + string = socket.recv_string() + topic, messagedata = string.split(';') + topic_list.append(messagedata) + print(f"Received on topic {topic}: {messagedata}") +socket.close() +t_str = "\n".join(topic_list) +print(f"All the times we received: \n{t_str}") diff --git a/pubsub/sub_client_with_non_blocking_subscribe_call.py b/pubsub/sub_client_with_non_blocking_subscribe_call.py new file mode 100644 index 0000000..cce62ae --- /dev/null +++ b/pubsub/sub_client_with_non_blocking_subscribe_call.py @@ -0,0 +1,50 @@ +import sys +import zmq +from time import sleep + +""" + This version of the subscriber doesn't hang +""" + + +ip = "localhost" +port = "5556" +if len(sys.argv) > 1: + port = sys.argv[1] + int(port) + +# Make a context we can use to get a socket +context = zmq.Context() +# Grab a socket that can connect to the server +socket = context.socket(zmq.SUB) + +# Connect to the server by saying where we can get our measurements from +print(f"Collecting updates from time server at tcp://localhost:{port}") +socket.connect(f"tcp://{ip}:{port}") + +# Filter by topic - we are only interested in knowing about the time +topicfilter = "TIME" +socket.setsockopt_string(zmq.SUBSCRIBE, topicfilter) + +# Process 5 updates +topic_list = [] +# We need to keep track of how many we've received +# since the loop may end with no messages +try: + while len(topic_list) < 5: + try: # Need to use a try block since zmq uses a + # giving the flag NOBLOCK indicates to zmq that we don't want to + # hang waiting for a message. + string = socket.recv_string(zmq.NOBLOCK) + except zmq.Again: + print("No message this time :(") + continue + topic, messagedata = string.split(';') + topic_list.append(messagedata) + print(f"Received on topic {topic}: {messagedata}") + # +finally + sleep(0.2) +socket.close() +t_str = "\n".join(topic_list) +print(f"All the times we received: \n{t_str}") diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..9bf2d48 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,12 @@ +aiohttp==3.9.5 +aiosignal==1.3.1 +attrs==23.2.0 +frozenlist==1.4.1 +greenlet==3.0.3 +idna==3.7 +msgpack==1.0.8 +multidict==6.0.5 +parse==1.20.2 +pynvim==0.5.0 +pyzmq==26.0.3 +yarl==1.9.4 diff --git a/rpc/aioxmlrpc/client.py b/rpc/aioxmlrpc/client.py new file mode 100644 index 0000000..e612cf9 --- /dev/null +++ b/rpc/aioxmlrpc/client.py @@ -0,0 +1,174 @@ +""" +XML-RPC Client with asyncio. + +This module adapt the ``xmlrpc.client`` module of the standard library to +work with asyncio. + +""" + +import sys +import asyncio +import logging +import aiohttp + +from xmlrpc import client as xmlrpc + + +__ALL__ = ['ServerProxy', 'Fault', 'ProtocolError'] + +# you don't have to import xmlrpc.client from your code +Fault = xmlrpc.Fault +ProtocolError = xmlrpc.ProtocolError + +log = logging.getLogger(__name__) +PY35 = sys.version_info >= (3, 5) + + +class _Method: + # some magic to bind an XML-RPC method to an RPC server. + # supports "nested" methods (e.g. examples.getStateName) + def __init__(self, send, name): + self.__send = send + self.__name = name + + def __getattr__(self, name): + return _Method(self.__send, "%s.%s" % (self.__name, name)) + + @asyncio.coroutine + def __call__(self, *args): + ret = yield from self.__send(self.__name, args) + return ret + + +class AioTransport(xmlrpc.Transport): + """ + ``xmlrpc.Transport`` subclass for asyncio support + """ + + def __init__(self, session, use_https, *, use_datetime=False, + use_builtin_types=False, loop, headers=None, auth=None): + super().__init__(use_datetime, use_builtin_types) + self.use_https = use_https + self._loop = loop + self._session = session + + self.auth = auth + + if not headers: + headers = {'User-Agent': 'python/aioxmlrpc', + 'Accept': 'text/xml', + 'Content-Type': 'text/xml'} + + self.headers = headers + + @asyncio.coroutine + def request(self, host, handler, request_body, verbose=False): + """ + Send the XML-RPC request, return the response. + This method is a coroutine. + """ + url = self._build_url(host, handler) + response = None + try: + response = yield from self._session.request( + 'POST', url, headers=self.headers, data=request_body, auth=self.auth) + body = yield from response.text() + if response.status != 200: + raise ProtocolError(url, response.status, + body, response.headers) + except asyncio.CancelledError: + raise + except ProtocolError: + raise + except Exception as exc: + log.error('Unexpected error', exc_info=True) + if response is not None: + errcode = response.status + headers = response.headers + else: + errcode = 0 + headers = {} + + raise ProtocolError(url, errcode, str(exc), headers) + return self.parse_response(body) + + def parse_response(self, body): + """ + Parse the xmlrpc response. + """ + p, u = self.getparser() + p.feed(body) + p.close() + return u.close() + + def _build_url(self, host, handler): + """ + Build a url for our request based on the host, handler and use_http + property + """ + scheme = 'https' if self.use_https else 'http' + return '%s://%s%s' % (scheme, host, handler) + + +class ServerProxy(xmlrpc.ServerProxy): + """ + ``xmlrpc.ServerProxy`` subclass for asyncio support + """ + + def __init__(self, uri, session=None, encoding=None, verbose=False, + allow_none=False, use_datetime=False, use_builtin_types=False, + loop=None, auth=None, headers=None): + self._loop = loop or asyncio.get_event_loop() + + if session: + self._session = session + self._close_session = False + else: + self._close_session = True + self._session = aiohttp.ClientSession(loop=self._loop) + + transport = AioTransport(use_https=uri.startswith('https://'), + loop=self._loop, + session=self._session, + auth=auth, + headers=headers) + + super().__init__(uri, transport, encoding, verbose, allow_none, + use_datetime, use_builtin_types) + + @asyncio.coroutine + def __request(self, methodname, params): + # call a method on the remote server + request = xmlrpc.dumps(params, methodname, encoding=self.__encoding, + allow_none=self.__allow_none).encode(self.__encoding) + + response = yield from self.__transport.request( + self.__host, + self.__handler, + request, + verbose=self.__verbose + ) + + if len(response) == 1: + response = response[0] + + return response + + @asyncio.coroutine + def close(self): + if self._close_session: + yield from self._session.close() + + def __getattr__(self, name): + return _Method(self.__request, name) + + if PY35: + + @asyncio.coroutine + def __aenter__(self): + return self + + @asyncio.coroutine + def __aexit__(self, exc_type, exc_val, exc_tb): + if self._close_session: + yield from self._session.close() diff --git a/rpc/rpc_example_runner.py b/rpc/rpc_example_runner.py new file mode 100644 index 0000000..2d17881 --- /dev/null +++ b/rpc/rpc_example_runner.py @@ -0,0 +1,17 @@ +from xmlrpc.client import ServerProxy +import sys + + +# Create the proxy in a nice way so it gets closed when we are done. +with ServerProxy('http://localhost:9000') as proxy: + # Ensure we got enough arguments coming in + assert len(sys.argv) >= 3, "Must supply at least 2 arguments.\n" + \ + "Usage: rpc_sync_client.py function argument1 [argument2 ...]" + # Split incoming arguments into the name of the function to call and + # the arguments to supply to that function. Note that sys.argv[0] is + # the name of the script itself. + scriptname, function, *arguments = sys.argv + # Get the indicated remote function. + remote_function = getattr(proxy, function) + # Print the result of executing the remote function. + print(remote_function(arguments)) diff --git a/rpc/rpc_example_server.py b/rpc/rpc_example_server.py new file mode 100644 index 0000000..481e2a4 --- /dev/null +++ b/rpc/rpc_example_server.py @@ -0,0 +1,22 @@ +import logging +import time +from xmlrpc.server import SimpleXMLRPCServer + + +def reverse_list(l): + logging.debug(f'Call received: reverse_list({l!r}), calculating for 1 second') + time.sleep(1) + return l[::-1] + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + server = SimpleXMLRPCServer(('localhost', 9000), logRequests=True) + # Register the function we are serving + server.register_function(reverse_list, 'reverse') + try: + print("Use Control-C to exit") + # Start serving our functions + server.serve_forever() + except KeyboardInterrupt: + print("Exiting") diff --git a/rpc/rpc_pi_async_runner.py b/rpc/rpc_pi_async_runner.py new file mode 100644 index 0000000..c1b0a3a --- /dev/null +++ b/rpc/rpc_pi_async_runner.py @@ -0,0 +1,57 @@ +import asyncio +import sys +from statistics import mean +from math import pi +from time import time +# Import the asynchronous version of ServerProxy +from aioxmlrpc.client import ServerProxy +# Import the asynchronous version of estimate_pi +from util import estimate_pi_async + + +# Asynchronous call to the slave +async def remote_estimate(n): + print(f"Requesting that slave estimate pi with {n} throws.") + # Create the proxy in a nice way so it gets closed when we are done. + async with ServerProxy('http://localhost:9000') as proxy: + pi_remote = await proxy.estimate_pi(n) + print(f"Result of remote estimation: pi={pi_remote:.010f}") + return pi_remote + + +# Asynchronous call to ourselves +async def local_estimate(n): + print(f"Master begins estimating pi with {n} throws.") + pi_local = await estimate_pi_async(n) + print(f"Result of local estimation: pi={pi_local:.010f}") + return pi_local + +if __name__ == "__main__": + # Ensure we got enough arguments coming in + assert len(sys.argv) >= 2, "Must supply at least 1 argument.\n" + \ + "Usage: rpc_sync_pi_master.py N [argument2 ...]" + # Split incoming arguments into the number of throws to use. + # Note that sys.argv[0] is the name of the script itself. + scriptname, N, *arguments = sys.argv + + # split the workload between ourselves and the remote + # note: // is integer division + N = int(N) + N_remote = N // 2 + N_local = N - N_remote + start_time = time() + + # ASYNC MAGIC BEGIN + # Gather up all tasks we have to do, and tell the event loop to + # run until they are complete. + futures = asyncio.gather(remote_estimate(N_remote), local_estimate(N_local)) + loop = asyncio.get_event_loop() + results = loop.run_until_complete(futures) + # ASYNC MAGIC END + + pi_remote, pi_local = results + + pi_m = mean([pi_remote, pi_local]) + print(f"Mean estimation result: pi ={pi_m:.010f}") + print(f"Relative error: {100*(pi_m/pi - 1):.010f}%") + print(f"Total time to execute: {time() - start_time} sec") diff --git a/rpc/rpc_pi_server.py b/rpc/rpc_pi_server.py new file mode 100644 index 0000000..79ba186 --- /dev/null +++ b/rpc/rpc_pi_server.py @@ -0,0 +1,21 @@ +import logging +from xmlrpc.server import SimpleXMLRPCServer +from util import estimate_pi + + +def local_estimate_pi(n, *args): + logging.debug(f'Call received: estimate_pi({n!r})') + return estimate_pi(n) + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + server = SimpleXMLRPCServer(('localhost', 9000), logRequests=True) + # Register the function we are serving + server.register_function(local_estimate_pi, 'estimate_pi') + try: + print("Use Control-C to exit") + # Start serving our functions + server.serve_forever() + except KeyboardInterrupt: + print("Exiting") diff --git a/rpc/rpc_pi_sync_runner.py b/rpc/rpc_pi_sync_runner.py new file mode 100644 index 0000000..f4e7e19 --- /dev/null +++ b/rpc/rpc_pi_sync_runner.py @@ -0,0 +1,35 @@ +import sys +from util import estimate_pi +from statistics import mean +from math import pi +from time import time +# Import the synchronous version of ServerProxy +from xmlrpc.client import ServerProxy + + +# Create the proxy in a nice way so it gets closed when we are done. +with ServerProxy('http://localhost:9000') as proxy: + # Ensure we got enough arguments coming in + assert len(sys.argv) >= 2, "Must supply at least 1 argument.\n" + \ + "Usage: rpc_sync_pi_master.py N [argument2 ...]" + # Split incoming arguments into the number of throws to use. + # Note that sys.argv[0] is the name of the script itself. + scriptname, N, *arguments = sys.argv + + # split the workload between ourselves and the remote + # note: // is integer division + N = int(N) + N_remote = N // 2 + N_local = N - N_remote + start_time = time() + + print(f"Requesting that slave estimate pi with {N_remote} throws.") + pi_remote = proxy.estimate_pi(N_remote) + print(f"Result of remote estimation: pi={pi_remote:.010f}") + print(f"Master begins estimating pi with {N_local} throws.") + pi_local = estimate_pi(N_local) + print(f"Result of local estimation: pi={pi_local:.010f}") + pi_m = mean([pi_remote, pi_local]) + print(f"Mean estimation result: pi ={pi_m:.010f}") + print(f"Relative error: {100*(pi_m/pi - 1):.010f}%") + print(f"Total time to execute: {time() - start_time} sec") diff --git a/rpc/util.py b/rpc/util.py new file mode 100644 index 0000000..3165c17 --- /dev/null +++ b/rpc/util.py @@ -0,0 +1,55 @@ +import asyncio +from random import random +from itertools import chain, islice + + +def get_chunks_it(l, n): + """ Chunks an iterator `l` in size `n` + Args: + l (Iterator[Any]): an iterator + n (int): size of + Returns: + Generator[Any] + """ + iterator = iter(l) + for first in iterator: + yield chain([first], islice(iterator, n - 1)) + + +def estimate_pi(n): + """ + Estimates pi by throwing a point (x,y) randomly *n* times in + the unit square and counting the number of hits where + x^2 + Y^2 <= 1. + Pi is then approximated as 4 * no. hits / n. + input: + *n* (int): The number of times to throw the point + output: + *estimate* (float): The estimate for pi found here + """ + hits = sum(int(random()**2 + random()**2 <= 1) for _ in range(n)) + estimate = 4 * hits / n + return estimate + + +async def estimate_pi_async(n): + """ + Estimates pi by throwing a point (x,y) randomly *n* times in + the unit square and counting the number of hits where + x^2 + Y^2 <= 1. + Pi is then approximated as 4 * no. hits / n. + input: + *n* (int): The number of times to throw the point + output: + *estimate* (float): The estimate for pi found here + + **Note:** + This is an asynchronous implementation that throws + 100 points before awaiting to relinquish control. + """ + hits = 0 + for chunk in get_chunks_it(range(n), 100): + await asyncio.sleep(0) # Relinquish control so something else can run + hits += sum(int(random()**2 + random()**2 <= 1) for _ in chunk) + estimate = 4 * hits / n + return estimate