46045-syslab/simlab_controller_d5_load.py

182 lines
6.9 KiB
Python

import syslab
from time import sleep, time
from dataclasses import dataclass
from util import clamp, cast_to_cm
import sys
import zmq
import logging
# Make a context that we use to set up sockets
context = zmq.Context()
# Set up a socket to subscribe to the tester
reference_in_socket = context.socket(zmq.SUB)
reference_in_socket.connect(f"tcp://localhost:5000")
# Ensure we only see message on the setpoint reference
reference_in_socket.subscribe("setpoint_reference")
# Used to log the incoming reference setpoints
logging.basicConfig(level=logging.INFO)
@dataclass
class PIDController:
Kp: float = 0.0 # P factor
Ki: float = 0.0 # I factor
Kd: float = 0.0 # D factor
r: float = 0.0 # Reference which we want the measured y to be
Pterm: float = 0.0 # P part
Iterm: float = 0.0 # Integral part
Dterm: float = 0.0 # Differential part
previous_error: float = 0.0 # used to calculate differential part
previous_Iterm: float = 0.0 # used to calculate integral part
current_time: float = None # used to calculate differential and integral part
previous_time: float = None # used to calculate differential and integral part
u_max: float = None # used to calculate controller input saturation
u_min: float = None # used to calculate controller input saturation
u_p: float = 0.0
u: float = 0.0
def __post_init__(self):
self.previous_time = self.previous_time if self.previous_time is not None else time()
self.current_time = self.current_time if self.current_time is not None else time()
def set_reference(self, new_reference):
self.r = new_reference
def update(self, new_y_meas, x_load, current_time=None):
# Ensure we have the time elapsed since our last value; we'll use that for the integral and differential parts
self.current_time = current_time if current_time is not None else time()
delta_time = self.current_time - self.previous_time
# Filter the incoming value
y_2 = self._filter_input(new_y_meas)
# Calculate the error to the setpoint
error = self._calculate_error(self.r, y_2)
# Apply splitting factor # TODO step 1.2: Familiarize with the usage of the splitting factor
error = x_load*max(error, 0)
# Calculate the PID terms
Pterm = self._calc_Pterm(error)
Iterm = self._calc_Iterm(error, delta_time)
Dterm = self._calc_Dterm(error, delta_time)
# Calculate our raw response
self.u_p = Pterm + Iterm + Dterm
# Filter our response
self.u = self._filter_output(self.u_p)
u_new = clamp(self.u_min, self.u, self.u_max)
# Remember values for next update
if self.u == u_new:
self.previous_Iterm = Iterm
else: # in saturation - don't sum up Iterm
self.u = u_new
self.previous_time = self.current_time
self.previous_error = error
# Return the filtered response
return self.u
def _filter_input(self, new_y_hat):
# optional code to filter the measurement signal
return new_y_hat
def _filter_output(self, u_p):
# optional code to filter the input / actuation signal
return u_p
def _calculate_error(self, r, y_hat_2):
# calculate the error between reference and measurement
return r - y_hat_2
def _calculate_error(self, y_hat_2):
# TODO step 1.2: calculate and return the error between reference and measurement
return 0.0
def _calc_Pterm(self, error):
# TODO step 1.2: calculate the proportional term based on the error and self.Kp
return 0.0
def _calc_Iterm(self, error, delta_time):
# TODO step 1.2: calculate the integral term based on error, last error and deltaT, and self.Ki
return 0.0
def _calc_Dterm(self, error, delta_time):
# calculate the proportional term based on error, last error and deltaT
return self.Kd * 0.0
if __name__ == "__main__":
# "main" method if this class is run as a script
BATT_MIN_P, BATT_MAX_P, = -15, 15
#LOAD_MIN_P, LOAD_MAX_P = 0, 20
pid = PIDController(Kp=0.4, Ki=0.8, Kd=0.0, # 0.5 , 0.2 / 0.4 , 0.1, Kp 0.7
u_min=BATT_MIN_P,
u_max=BATT_MAX_P)
# Establish connection to the switchboard to get the pcc power measurement (that's the usual practice, however, vswitchboard currently not working)
# sb = syslab.SwitchBoard('vswitchboard')
# Instead, establish connection to all units to replace switchboard measurements
gaia = syslab.WindTurbine("vgaia1")
dumpload = syslab.Dumpload("vload1")
mobload1 = syslab.Dumpload("vmobload1")
pv319 = syslab.Photovoltaics("vpv319")
batt = syslab.Battery('vbatt1')
# Initiate reference signal
r_old = 0.0
r = 0.0
try:
while True:
# We loop over the broadcast queue to empty it and only keep the latest value
try:
while True:
# Receive reference setpoint
incoming_str = reference_in_socket.recv_string(flags=zmq.NOBLOCK)
# The incoming string will look like "setpoint_reference;0.547",
# so we split it up and take the last bit forward.
r = float(incoming_str.split(" ")[-1])
#logging.info(f"New reference setpoint: {r}")
# An error will indicate that the queue is empty so we move along.
except zmq.Again as e:
pass
# Overwrite setpoint reference r if different from previous value
pid.set_reference(r) if r != r_old else None
# Store reference for comparison in next loop
r_old = r
# To ensure accurate
start_time = time()
# Get measurements of the current power exchange at the grid connection (=Point of Common Coupling (PCC))
#pcc_p = sb.getActivePower(0)
pcc_p = cast_to_cm(-(gaia.getActivePower().value - dumpload.getActivePower().value - mobload1.getActivePower().value + batt.getActivePower().value + pv319.getACActivePower().value))
# Calculate new requests using PID controller
p_request_total = pid.update(pcc_p.value)
# Ensure requests do not exceed limits of battery (extra safety, should be ensured by controller)
batt_p = clamp(BATT_MIN_P, p_request_total, BATT_MAX_P)
# # Send new setpoints
batt.setActivePower(batt_p)
#print(f"Measured: P:{pcc_p.value:.02f}, want to set: P {batt_p:.02f}")
print(f"Setpoint:{r:.02f} Measured: P:{pcc_p.value:.02f}, want to set: P {batt_p:.02f}")
# Run the loop again once at most a second has passed.
sleep(1)
finally:
# When closing, set all setpoints to 0.
batt.setActivePower(0.0)
reference_in_socket.close()