from ..core.SyslabUnit import SyslabUnit from ..core.datatypes import CompositeMeasurement, CompositeBoolean, CompositeStatus from ..core.datatypes import CommonDeviceConfig from ..core.datatypes import DELV, WYEA, GPSL, PNPL, LNPL, HLTH, ConvOpMode, ConvState from typing import List, Union from time import time def cast_to_cm(m: Union[CompositeMeasurement, float]): if type(m) == float: # TODO: Is there a better way to estimate precision of time.time? request = CompositeMeasurement(m, timestampMicros=time()*1e6, timePrecision=1000) elif type(m) == CompositeMeasurement: request = m else: raise TypeError(f"Unknown request type: {type(m)}") return request class B2BConverter(SyslabUnit): """ Class covering back-to-back converters in SYSLAB. A full list of available switchboards can be found by calling 'SwitchBoard.getAvailableSwitchBoards()' Alternatively, the user may specify a host and port to connect to via the *host* and *port* arguments. """ _CONVERTERS = { 'ABBB2B': ('syslab-04.syslab.dk', '8080', 'B2BConverter'), } MAXP = 60 MAXQ = 60 @staticmethod def getAvailableUnits(): return list(B2BConverter.__CONVERTERS.keys()) def __init__(self, which=None, host=None, port=None, unit_name=None): baseurl = 'http://{host}:{port}/typebased_WebService_ABBConverter/ABBConverterWebService/{unit_name}/' super().__init__( baseurl=baseurl, which=which, units=self._CONVERTERS, host=host, port=port, unit_name=unit_name, unit_type="B2BConverter") # Inventory functions # Configuration class used for HMI layout def getNodeConfiguration(self, ) -> CommonDeviceConfig: return self._request_resource('getNodeConfiguration') # Component description def getConverterName(self, ) -> str: return self._request_resource('getConverterName') def getConverterLogicalNameplate(self, ) -> LNPL: raise NotImplementedError def getConverterPhysicalNameplate(self, ) -> PNPL: raise NotImplementedError def getConverterHealth(self, ) -> HLTH: raise NotImplementedError def getGPSLocation(self, ) -> GPSL: raise NotImplementedError # Operating characteristics (corresponding to COPR/DRAT node) def getRatedP(self, ) -> float: raise NotImplementedError # return self._request_resource('getRatedP') def getRatedS(self, ) -> float: raise NotImplementedError # return self._request_resource('getRatedS') def getRatedQ(self, ) -> float: raise NotImplementedError # return self._request_resource('getRatedQ') def getRatedU(self, ) -> float: raise NotImplementedError # return self._request_resource('getRatedU') def getRatedI(self, ) -> float: raise NotImplementedError # return self._request_resource('getRatedI') def getRatedf(self, ) -> float: raise NotImplementedError # return self._request_resource('getRatedf') # Operating mode settings (corresponding to DOPM node) def getAvailableOperatingModes(self, ) -> List[ConvOpMode]: raise NotImplementedError def getCurrentOperatingMode(self, ) -> ConvOpMode: return self._request_resource('getCurrentOperatingMode') def setOperatingMode(self, mode: ConvOpMode) -> None: raise NotImplementedError def isPSetpointEnabled(self, ) -> CompositeBoolean: return self._request_resource('isPSetpointEnabled') def isQSetpointEnabled(self, ) -> CompositeBoolean: return self._request_resource('isQSetpointEnabled') def isUSetpointEnabled(self, ) -> CompositeBoolean: return self._request_resource('isUSetpointEnabled') def isfSetpointEnabled(self, ) -> CompositeBoolean: return self._request_resource('isfSetpointEnabled') # Status information (corresponding to DPST node) def getConverterStatus(self, ) -> ConvState: return self._request_resource('getConverterStatus') # Alarms information def hasActiveFault(self, ) -> CompositeBoolean: raise NotImplementedError def hasActiveWarning(self, ) -> CompositeBoolean: raise NotImplementedError def resetAlarms(self, ) -> None: raise NotImplementedError def getActiveEventCode(self, ) -> CompositeStatus: raise NotImplementedError # DER controller characteristics (corresponding to DRCT node) def setActivePower(self, m: Union[CompositeMeasurement, float]) -> None: """ Send a request for the converter's output active power. Requires the converter to be in PQ mode. :param m: [kW] Requested active power. If a float, will be converted to a CompositeMeasurement with the current system time as timestamp.) :return: None """ P_UpperLimit = 15.0 P_LowerLimit = -15.0 return self._request_resource('setActivePower', (), 'put', cast_to_cm(max(P_LowerLimit, min(m, P_UpperLimit))).parseToJSON()) def setFrequency(self, m: Union[CompositeMeasurement, float]) -> None: """ Send a request for the converter's output power. Requires the converter to be in UF mode. :param m: [Hz] Requested active power. If a float, will be converted to a CompositeMeasurement with the current system time as timestamp.) :return: None """ return self._request_resource('seFrequency', (), 'put', cast_to_cm(m).parseToJSON()) def setReactivePower(self, m: Union[CompositeMeasurement, float]) -> None: """ Send a request for the converter's output reactive power. Requires the converter to be in PQ mode. :param m: [kVA] Requested active power. If a float, will be converted to a CompositeMeasurement with the current system time as timestamp.) :return: None """ Q_UpperLimit = 15.0 Q_LowerLimit = -15.0 return self._request_resource('setReactivePower', (), 'put', cast_to_cm(max(Q_LowerLimit, min(m, Q_UpperLimit))).parseToJSON()) def setVoltage(self, m: Union[CompositeMeasurement, float]) -> None: """ Send a request for the converter's output voltage. Requires the converter to be in UF mode. :param m: [V] Requested active power. If a float, will be converted to a CompositeMeasurement with the current system time as timestamp.) :return: None """ return self._request_resource('setVoltage', (), 'put', cast_to_cm(m).parseToJSON()) def getActivePowerSetpoint(self, ) -> CompositeMeasurement: raise NotImplementedError def getFrequencySetpoint(self, ) -> CompositeMeasurement: raise NotImplementedError def getReactivePowerSetpoint(self, ) -> CompositeMeasurement: raise NotImplementedError def getVoltageSetpoint(self, ) -> CompositeMeasurement: raise NotImplementedError # Synchronisation (corresponding to RSYN node) def synchronize(self, ) -> None: raise NotImplementedError def unsynchronize(self, ) -> None: raise NotImplementedError def setDroopEnable(self, b: CompositeBoolean) -> None: raise NotImplementedError def setLoadEnable(self, b: CompositeBoolean) -> None: raise NotImplementedError def getVoltageDroopPct(self, ) -> CompositeMeasurement: raise NotImplementedError def getFrequencyDroopPct(self, ) -> CompositeMeasurement: raise NotImplementedError def setVoltageDroopPct(self, pct: CompositeMeasurement) -> None: raise NotImplementedError def setFrequencyDroopPct(self, pct: CompositeMeasurement) -> None: raise NotImplementedError # Reciprocating Engine (corresponds to DCIP node) def startConverter(self, ) -> None: return self._request_resource('startConverter', (), 'put') def softStopConverter(self, ) -> None: return self._request_resource('softStopConverter', (), 'put') def stopConverter(self, ) -> None: return self._request_resource('stopConverter', (), 'put') # AC quantities (corresponds to MMXU nodes) def getActivePowerOutput(self, ) -> CompositeMeasurement: return self._request_resource('getActivePowerOutput') def getReactivePowerOutput(self, ) -> CompositeMeasurement: return self._request_resource('getReactivePowerOutput') def getOutputFrequency(self, ) -> CompositeMeasurement: return self._request_resource('getOutputFrequency') def getOutputInterphaseVoltages(self, ) -> DELV: return self._request_resource('getOutputInterphaseVoltages') def getOutputPhaseCurrents(self, ) -> WYEA: raise NotImplementedError def getRectifierInterphaseVoltages(self, ) -> DELV: return self._request_resource('getRectifierInterphaseVoltages') def getRectifierPhaseCurrents(self, ) -> WYEA: return self._request_resource('getRectifierPhaseCurrents') def getSyncBusInterphaseVoltages(self, ) -> DELV: raise NotImplementedError class ABBB2BConverter(B2BConverter): def __init__(self): super(ABBB2BConverter, self).__init__(which='ABBB2B')