import syslab from time import sleep, time from dataclasses import dataclass from util import clamp, cast_to_cm import sys import zmq import logging # Make a context that we use to set up sockets context = zmq.Context() # Set up a socket to subscribe to the tester reference_in_socket = context.socket(zmq.SUB) reference_in_socket.connect(f"tcp://localhost:5000") # Ensure we only see message on the setpoint reference reference_in_socket.subscribe("setpoint_reference") # Used to log the incoming reference setpoints logging.basicConfig(level=logging.INFO) @dataclass class PIDController: Kp: float = 0.0 # P factor Ki: float = 0.0 # I factor Kd: float = 0.0 # D factor r: float = 0.0 # Reference which we want the measured y to be Pterm: float = 0.0 # P part Iterm: float = 0.0 # Integral part Dterm: float = 0.0 # Differential part previous_error: float = 0.0 # used to calculate differential part previous_Iterm: float = 0.0 # used to calculate integral part current_time: float = None # used to calculate differential and integral part previous_time: float = None # used to calculate differential and integral part u_max: float = None # used to calculate controller input saturation u_min: float = None # used to calculate controller input saturation u_p: float = 0.0 u: float = 0.0 def __post_init__(self): self.previous_time = self.previous_time if self.previous_time is not None else time() self.current_time = self.current_time if self.current_time is not None else time() def set_reference(self, new_reference): self.r = new_reference def update(self, new_y_meas, x_batt, current_time=None): # Ensure we have the time elapsed since our last value; we'll use that for the integral and differential parts self.current_time = current_time if current_time is not None else time() delta_time = self.current_time - self.previous_time # Filter the incoming value y_2 = self._filter_input(new_y_meas) # Calculate the error to the setpoint error = self._calculate_error(self.r, y_2) # Apply splitting factor # TODO step 1.2: Familiarize with the usage of the splitting factor error = error - (1-x_batt)*max(error, 0) # Calculate the PID terms Pterm = self._calc_Pterm(error) Iterm = self._calc_Iterm(error, delta_time) Dterm = self._calc_Dterm(error, delta_time) # Calculate our raw response self.u_p = Pterm + Iterm + Dterm # Filter our response self.u = self._filter_output(self.u_p) u_new = clamp(self.u_min, self.u, self.u_max) # Remember values for next update if self.u == u_new: self.previous_Iterm = Iterm else: # in saturation - don't sum up Iterm self.u = u_new self.previous_time = self.current_time self.previous_error = error # Return the filtered response return self.u def _filter_input(self, new_y_hat): # optional code to filter the measurement signal return new_y_hat def _filter_output(self, u_p): # optional code to filter the input / actuation signal return u_p def _calculate_error(self, y_hat_2): # TODO step 1.2: calculate and return the error between reference and measurement return self.r - y_hat_2 def _calc_Pterm(self, error): # TODO step 1.2: calculate the proportional term based on the error and self.Kp return error * self.Kp def _calc_Iterm(self, error, delta_time): # TODO step 1.2: calculate the integral term based on error, last error and deltaT, and self.Ki return 0.0 def _calc_Dterm(self, error, delta_time): # calculate the proportional term based on error, last error and deltaT return self.Kd * 0.0 if __name__ == "__main__": # "main" method if this class is run as a script BATT_MIN_P, BATT_MAX_P, = -15, 15 #LOAD_MIN_P, LOAD_MAX_P = 0, 20 pid = PIDController(Kp=0.4, Ki=0.8, Kd=0.0, # 0.5 , 0.2 / 0.4 , 0.1, Kp 0.7 u_min=BATT_MIN_P, u_max=BATT_MAX_P) # Establish connection to the switchboard to get the pcc power measurement (that's the usual practice, however, vswitchboard currently not working) # sb = syslab.SwitchBoard('vswitchboard') # Instead, establish connection to all units to replace switchboard measurements gaia = syslab.WindTurbine("vgaia1") dumpload = syslab.Dumpload("vload1") mobload1 = syslab.Dumpload("vmobload1") pv319 = syslab.Photovoltaics("vpv319") batt = syslab.Battery('vbatt1') # Initiate reference signal r_old = 0.0 r = 0.0 try: while True: # We loop over the broadcast queue to empty it and only keep the latest value try: while True: # Receive reference setpoint incoming_str = reference_in_socket.recv_string(flags=zmq.NOBLOCK) # The incoming string will look like "setpoint_reference;0.547", # so we split it up and take the last bit forward. r = float(incoming_str.split(" ")[-1]) #logging.info(f"New reference setpoint: {r}") # An error will indicate that the queue is empty so we move along. except zmq.Again as e: pass # Overwrite setpoint reference r if different from previous value pid.set_reference(r) if r != r_old else None # Store reference for comparison in next loop r_old = r # To ensure accurate start_time = time() # Get measurements of the current power exchange at the grid connection (=Point of Common Coupling (PCC)) #pcc_p = sb.getActivePower(0) pcc_p = cast_to_cm(-(gaia.getActivePower().value - dumpload.getActivePower().value - mobload1.getActivePower().value + batt.getActivePower().value + pv319.getACActivePower().value)) # Calculate new requests using PID controller p_request_total = pid.update(pcc_p.value) # Ensure requests do not exceed limits of battery (extra safety, should be ensured by controller) batt_p = clamp(BATT_MIN_P, p_request_total, BATT_MAX_P) # # Send new setpoints batt.setActivePower(batt_p) #print(f"Measured: P:{pcc_p.value:.02f}, want to set: P {batt_p:.02f}") print(f"Setpoint:{r:.02f} Measured: P:{pcc_p.value:.02f}, want to set: P {batt_p:.02f}") # Run the loop again once at most a second has passed. sleep(1) finally: # When closing, set all setpoints to 0. batt.setActivePower(0.0) reference_in_socket.close()