empty branch

This commit is contained in:
DBras 2024-06-13 10:47:31 +02:00
parent deeb335eca
commit 16d4b96c55
59 changed files with 0 additions and 5915 deletions

Binary file not shown.

View File

@ -1 +0,0 @@
Makes folder visible for git.

File diff suppressed because it is too large Load Diff

View File

@ -1 +0,0 @@
Makes folder visible for git.

View File

@ -1,47 +0,0 @@
import syslab
from json import dump
from time import sleep, time
# Defines location and name of the measurements file
LOG_FILE = f'data/measurements/measurements_{time():.00f}.json'
print(f"Logging to file {LOG_FILE}")
# Set up a connection to the switchboards
sb3192 = syslab.SwitchBoard('319-2')
sb3193 = syslab.SwitchBoard('319-3')
sb33012 = syslab.SwitchBoard("330-12")
sb1172 = syslab.SwitchBoard('117-2')
# Convenience function to
def take_measurements():
measurements = {
"pcc_p": sb3192.getActivePower('Grid'),
"pcc_q": sb3192.getReactivePower('Grid'),
"pv319_p": sb3192.getActivePower('PV'),
"pv319_q": sb3192.getReactivePower('PV'),
"dumpload_p": sb3192.getActivePower('Dumpload'),
"dumpload_q": sb3192.getReactivePower('Dumpload'),
"gaia_p": sb33012.getActivePower('Gaia'),
"gaia_q": sb33012.getReactivePower('Gaia'),
"pv330_p": sb33012.getActivePower('PV_1'),
"pv330_q": sb33012.getReactivePower('PV_1'),
"b2b_p": sb3193.getActivePower('ABB_Sec'),
"b2b_q": sb3193.getReactivePower('ABB_Sec'),
"battery_p": sb1172.getActivePower('Battery'),
"battery_q": sb1172.getReactivePower('Battery'),
}
return [{'unit': k, 'value': meas.value, 'time': meas.timestampMicros/1e6} for k, meas in measurements.items()]
while True:
measurement = take_measurements()
# Open the output file in "append" mode which adds lines to the end
with open(LOG_FILE, 'a') as file:
for m in measurement:
# Convert the dictionary m to a json string and put it
# in the file.
dump(m, file)
# Write a newline for each measurement to make loading easier
file.write('\n')
sleep(1)

View File

@ -1,139 +0,0 @@
import pandas as pd
import json
import matplotlib.pyplot as plt
import os
from datetime import timedelta
## Read the measurements data file ##
DATA_MEAS_DIR = 'data\measurements'
# Always plot latest datafile - replace [-1] with another index if you want to plot a specific file.
MEAS_LOG_FILE = sorted(os.listdir(DATA_MEAS_DIR))[-1]
# Store each dictionary of the measurements json in a list
with open(os.path.join(DATA_MEAS_DIR, MEAS_LOG_FILE)) as f:
meas_data = [json.loads(line) for line in f]
# Use setpoint logger (only necessary for part two of the exercise "collecting fresh data")
use_setpoint_log = False
## Read the setpoints data file ##
if use_setpoint_log:
DATA_SP_DIR = 'data\setpoints'
# Always plot latest datafile
SP_LOG_FILE = sorted(os.listdir(DATA_SP_DIR))[-1]
# Store each dictionary of the setpoints json in a list
with open(os.path.join(DATA_SP_DIR, SP_LOG_FILE)) as f:
sp_data = [json.loads(line) for line in f]
# Merge measurements and setpoints in one list
data = meas_data + sp_data
else:
data = meas_data
# Construct a dataframe and pivot it to obtain a dataframe with a column per unit, and a row per timestamp.
df = pd.DataFrame.from_records(data)
df_pivot = df.pivot_table(values='value', columns='unit', index='time')
# Plot the data. Note, that the data will mostly not be plotted with lines.
plt.ion() # Turn interactive mode on
plt.figure()
ax1 = plt.subplot(211) # Make two separate figures
ax2 = plt.subplot(212)
df_pivot[[c for c in df_pivot.columns if "_p" in c]].plot(marker='.', ax=ax1, linewidth=3)
df_pivot[[c for c in df_pivot.columns if "_q" in c]].plot(marker='.', ax=ax2, linewidth=3)
plt.show(block=True)
## TODO Q1: Your code here
## TODO Q2:
# Convert time column (index) of df_pivot to datetime
# TODO Your code here
# Hint1: You can use pandas to_numeric() to prepare the index for pandas to_datetime function
# Hint2: Remember to define the unit within pandas to_datetime function
# Resample the data
# TODO Your code here
# Interpolate the measurements
# TODO Your code here
# Hint: For part two of the exercise ("collecting fresh data") the nan rows after a setpoint
# in the recorded step function should be filled with the value of the setpoint until the row of the next setpoint is reached
# You can use the df.fillna(method="ffill") function for that purpose. However, the measurements should still be interpolated!
# Plot the resampled data
# TODO Your code here
## TODO Q3: Your code here
## TODO Q4: Your code here
## Part two: "Collecting fresh data"
# Hint 1: You can build up on the "read_and_plot_data.py" from day 2
# Hint 2: Yoy may want to store your response metric functions from day 2 in the "util.py" and import all of them with
# "from util import *"
if use_setpoint_log:
# Add a column to df_pivot containing the reference/target signal
# TODO your code here
# Loop over all steps and extract T_1, T_2 and the step size
results = {}
for idx in range(0, len(sp_data)-1):
label = f"Step_{sp_data[idx]['value']}kW"
# Extract T_1 and T_2 from the setpoint JSON
# TODO your code here
# Change timestamp format
T_1 = pd.to_datetime(pd.to_numeric(T_1), unit="s").round("0.1S")
T_2 = pd.to_datetime(pd.to_numeric(T_2), unit="s").round("0.1S")
# To ensure we are not considering values of the next load step
T_2 = T_2 - timedelta(seconds=0.2)
# define measured output y and target setpoint r
# TODO your code here
# Derive step direction from the setpoint data
if ...: # TODO your code here
Positive_step = True
else:
Positive_step = False
# Collect response metrics results
results[label] = {
# TODO your code here
}
pd.DataFrame.from_dict(results).plot(kind='bar')
plt.title("Metrics")
plt.tight_layout()
plt.savefig('data/test_metrics'+MEAS_LOG_FILE[-10:]+'.png')
plt.show(block=True)

View File

@ -1,12 +0,0 @@
beautifulsoup4==4.12.3
certifi==2024.6.2
charset-normalizer==3.3.2
greenlet==3.0.3
idna==3.7
msgpack==1.0.8
pynvim==0.5.0
requests==2.32.3
soupsieve==2.5
# Editable Git install with no remote (syslab==0.3.0)
-e /home/daniel/Dropbox/DTU/F24/46045/syslab/syslab-python
urllib3==2.2.1

View File

@ -1,109 +0,0 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
\.DS_Store
# Pycharm
.idea/

View File

@ -1,27 +0,0 @@
# SYSLAB Python Interface
This project provides a Python interface to several SYSLAB components.
## Required software
- Python (>=3.7)
- python-requests (>=2.18)
- python-bs4 (>= 4.7)
## Installation
To install the package in your local path, run
```shell
python setup.py install
```
Alternatively, copy the `syslab` folder into your project directory.
# Contributors
- Anders Thavlov: Initial implementation
- Oliver Gehrke
- Daniel Esteban Morales Bondy
- Tue Vissing Jensen
- Federico Zarelli

View File

@ -1,11 +0,0 @@
from syslab import SwitchBoard
name = '319-2'
SB_connection = SwitchBoard(name)
print("Let's look at what is going on in the switchboard {}.".format(name))
for bay in range(SB_connection.getNumBays()):
print(SB_connection.getBayName(bay),' : ',SB_connection.getActivePower(bay))

View File

@ -1,99 +0,0 @@
# Notes on SOAP implementation
- Units should reflect SOAP methods onto their own namespace (1)
- CompositeMeasurement should convert to/from SOAP
- Type checking via client.get\_type('ns0:compositeMeasurement').elements
-
# Notes about this module - to be discussed
SYSLAB\_Unit.py has a whole bunch of static methods - should these be split into a util library instead?
Generally, many places where static methods are used for things that should perhaps just be functions...
The following files are empty:
- BattOpMode.py
- FlowBatteryState.py
- GaiaWindTurbine.py
To check the methods available, use, e.g.:
http://syslab-33.syslab.dk:8080/typebased_WebService_HeatSubstation/HeatSwitchboardWebService/716-h1/resourceNames
To figure out this URL, look at software.xml for the corresponding machine, and use this template:
http://(machineName).syslab.dk:(port)/(interfaceName)/(shortServerName)/(unitname)/resourceNames
| field | corresponds to | notes |
| ----- | -------------- | ----- |
| machineName | N/A | Look this up on the wiki |
| port | N/A | Dynamically allocated, starting at 8080 - good luck! |
| interfaceName | typeBasedWebService, interfaceName | |
| shortServerName | typeBasedWebService, serverClass | Remove the "Server" at the end |
| unitname | dataLogger, unit | Also defined as "name" in hardware.xml |
-------------------------
SYSLAB COMMON
Broadcast event logger:
Transcode to python:
https://git.elektro.dtu.dk/syslab/syslab-common/-/blob/master/src/main/java/risoe/syslab/comm/broadcast/BroadcastLogSender.java
broadcast log sender
:: transcode the "send()" method to python. It byte-encodes the message for UDP.
Java:
send(String origin, byte[] origIP, long timestamp, int ploadType, String message,
int level, int flags, String[] tags)
------------
Python:
----------.
def send(origin, origIP, timestamp, ploadType, message, level, flags, tags):
ploadbytes = message[:min(1024, len(message))].encode()
origbytes = origin[:min(32, len(origin))].encode()
tagbytes = tagsToBytes(tags, 256)
pktlen = 2 + 2 + 1 + len(origbytes) + 4 + 2 + 2 + 8 + 1 + 2 + len(ploadbytes) + len(tagbytes)
buf = bytearray(pktlen)
buf[0] = BroadcastLogConstants.BROADCASTLOG_PKTID >> 8
buf[1] = BroadcastLogConstants.BROADCASTLOG_PKTID & 0xff
buf[2] = (pktlen >> 8) & 0xff
buf[3] = pktlen & 0xff
buf[4] = len(origbytes)
buf[5:5+len(origbytes)] = origbytes
writePtr = 5 + len(origbytes)
buf[writePtr:writePtr+4] = origIP
writePtr += 4
buf[writePtr] = (level >> 8) & 0xff
buf[writePtr+1] = level & 0xff
buf[writePtr+2] = (flags >> 8) & 0xff
buf[writePtr+3] = flags & 0xff
for i in range(8):
buf[writePtr+7-i] = timestamp & 0xff
timestamp >>= 8
writePtr += 8
buf[writePtr] = ploadType & 0xff
buf[writePtr+1] = (len(ploadbytes) >> 8) & 0xff
buf[writePtr+2] = len(ploadbytes) & 0xff
buf[writePtr+3:writePtr+3+len(ploadbytes)] = ploadbytes
writePtr += len(ploadbytes)
buf[writePtr:writePtr+len(tagbytes)] = tagbytes
pack = n
pack = DatagramPacket(buf, len(buf), InetAddress.getByName("localhost"), 4445)
sock.send(pack)
------------
broadcast log receiver
+ needs a logger
listener ist interface for receiver
gui wall (SYSLAB Userspacce)
broadcast log displet
https://git.elektro.dtu.dk/syslab/syslab-userspace/-/blob/master/src/main/java/risoe/syslab/gui/wall/displets/BroadcastLogDisplet.java
... maybe extend with simple log file writer.

View File

@ -1,2 +0,0 @@
requests >= 2.18
beautifulsoup4 >= 3.7

View File

@ -1,10 +0,0 @@
[flake8]
ignore =
max-line-length = 79
max-complexity = 11
[pytest]
addopts = --doctest-glob="*.rst"
[wheel]
universal = True

View File

@ -1,36 +0,0 @@
from setuptools import setup, find_packages
setup(
name='syslab',
version='0.3.0',
author='Tue Vissing Jensen',
author_email='tvjens at elektro.dtu.dk',
description=('SYSLAB webservice client library.'),
long_description=(''),
url='https://www.syslab.dk',
install_requires=[
'requests>=2.18',
'beautifulsoup4>=3.7',
],
packages=find_packages(exclude=['tests*']),
include_package_data=True,
entry_points={
'console_scripts': [
],
},
classifiers=[
'Development Status :: 3 - Alpha',
'Environment :: Console',
'Intended Audience :: Science/Research',
'License :: Other/Proprietary License',
'Natural Language :: English',
'Operating System :: OS Independent',
'Programming Language :: Python',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Topic :: Scientific/Engineering',
'Topic :: Software Development :: Libraries :: Python Modules',
],
)

View File

@ -1,44 +0,0 @@
# -*- coding: utf-8 -*-
"""
SYSLAB library
~~~~~~~~~~~~~~~~~~~~~
SYSLAB is a Python 3 library for control and monitoring of units in SYSLAB. Usage of this library requires the running
computer to be located in the SYSLAB network, either physically or virtually, ie. using a VPN connection.
"""
__title__ = 'syslab'
__version__ = '0.3.0'
__author__ = 'Anders Thavlov, Daniel Esteban Morales Bondy, Tue Vissing Jensen'
__maintainer__ = 'Tue Vissing Jensen'
__license__ = ''
__copyright__ = 'DTU'
# Raise error if we are not on Python 3
import sys
if not (sys.version_info.major == 3 and sys.version_info.minor >= 5):
raise RuntimeError("Must be using at least Python 3.5")
# DataTypes
from .core.datatypes import CompositeMeasurement
from .core.datatypes import CompositeBoolean
# Physical units
from .physical import \
SwitchBoard, \
Dumpload, \
Photovoltaics, \
Battery, \
HeatSwitchBoard, \
WindTurbine, \
DieselGenerator, \
B2BConverter, \
MeteoMast, \
EVSE
# Logging utilities
#from .comm.LogUtils import e

View File

@ -1,59 +0,0 @@
###############################################################
# SYSLAB remote logging utilities v0.9
# collect and record log events
#
# Author: Kai Heussen
# Date: 2023/06/18
###############################################################
import logging
import logging.handlers
import time
import syslab.config as config
event_log_formats = '%(asctime)s - %(name)s - %(levelname)s - %(filename)s - %(funcName)s - %(message)s'
simple_log_formats = '%(asctime)s - %(name)s - %(message)s'
def setup_event_logger(loggername=config.EVLOG_NAME, host='localhost', port=config.REMOTE_PORT_EV, level=logging.INFO, formats=simple_log_formats): # UDP
handler1 = logging.handlers.DatagramHandler(config.REMOTE_IP_EV, config.REMOTE_PORT_EV)
handler2 = logging.handlers.DatagramHandler(host, port)
handler3 = logging.StreamHandler()
formatter = logging.Formatter(formats)
handler3.setFormatter(formatter)
loggr = logging.getLogger(loggername)
loggr.setLevel(level)
loggr.addHandler(handler1)
loggr.addHandler(handler2)
loggr.addHandler(handler3)
return loggr
def setup_local_logger(loggername=config.EVLOG_NAME, host='localhost', port=config.REMOTE_PORT_EV, level=logging.INFO, formats=simple_log_formats): # UDP
handler = logging.handlers.DatagramHandler(host, port)
formatter = logging.Formatter(formats)
handler.setFormatter(formatter)
loggr = logging.getLogger(loggername)
loggr.setLevel(level)
loggr.addHandler(handler)
return loggr
def setup_udp_logger(loggername=config.SPLOG_NAME, host=config.REMOTE_IP_SP, port=config.REMOTE_PORT_SP, level=logging.INFO, formats=config.LOG_FORMATS): # UDP
handler = logging.handlers.DatagramHandler(host, port)
formatter = logging.Formatter(formats)
handler.setFormatter(formatter)
loggr = logging.getLogger(loggername)
loggr.setLevel(level)
loggr.addHandler(handler)
return loggr
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(levelname)-8s %(message)s')
# logging.getLogger().addHandler(logging.handlers.DatagramHandler('10.42.242.3', 51010))
logger = setup_local_logger()
while True:
logger.debug("This shouldn't show up")
logger.info("This should show up")
time.sleep(3)

View File

@ -1,2 +0,0 @@
#from BroadcastLogger import BroadcastLogSender
#from LogUtils import setup_udp_logger

View File

@ -1,80 +0,0 @@
###############################################################
# SYSLAB remote logger v0.9
# utility to display and record log events
# usage: move bash to target log directory, then call
# python <path to syslab-python>/syslab/comm/logrec.py
#
# Author: Kai Heussen
# Date: 2023/06/18
###############################################################
import pickle
import logging
import socket
from syslab import config
# import sys
from time import time
# from json import dump
BASIC = True
FILE = True
FILE_RAW = False # TODO: write a JSON formatter - based on https://stackoverflow.com/questions/50144628/python-logging-into-file-as-a-dictionary-or-json
logtype = "EV"
time=time()
logfile =f"syslab_{logtype}_log_{time:.00f}.txt"
logfile_raw =f"syslab_{logtype}_log_{time:.00f}.raw" # not yet json
DEFAULT_PORT = config.REMOTE_PORT_SP if logtype == "SP" else config.REMOTE_PORT_EV
port = DEFAULT_PORT
#simple_formats = '%(asctime)s - %(name)s - %(message)s'
simple_formats = '%(asctime)s - %(module)s - %(message)s'
#formats='foo: %(levelname)s - %(module)s.%(funcName)s - %(message)s'
event_log_formatter = '%(asctime)s - %(name)s - %(levelname)s - %(filename)s - %(module)s - %(funcName)s - %(message)s'
formats = simple_formats
formatter = logging.Formatter(formats)
if BASIC:
logging.basicConfig(format=formats)
else:
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
handler.setFormatter(formatter)
logging.getLogger().addHandler(handler)
if FILE:
handler = logging.FileHandler(logfile)
handler.setLevel(logging.INFO)
handler.setFormatter(formatter)
logging.getLogger().addHandler(handler)
logger = logging.getLogger()
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind(('', port))
print(f'Listening for log records on port {port}...')
if FILE:
print(f'recording log entries in file: {logfile}')
if FILE_RAW:
print(f'recording raw log entries in file: {logfile_raw}')
try:
while True:
d, _ = s.recvfrom(1024)
msg = pickle.loads(d[4:])
logrec = logging.makeLogRecord(msg)
logger.handle(logrec)
if FILE_RAW:
with open(logfile_raw, 'a') as file:
# dump(logrec, file) # requires a JSON formatter
file.write(f'{logrec.__str__() }\n') # Write a newline for each measurement to make loading easier
#print(log)
finally:
s.close()
if FILE_RAW:
file.close()

View File

@ -1,19 +0,0 @@
DEBUG = False
SPLOG_NAME = 'SetPointLogger'
EVLOG_NAME = 'EventLog'
REMOTE_LOGGER = True
REMOTE_IP_SP = '10.42.242.3' # UI Machine 03
REMOTE_IP_EV = '10.42.242.3' # UI Machine 03
REMOTE_PORT_SP = 51010
REMOTE_PORT_EV = 51020
LOCAL_IP = 'localhost'
LOCAL_PORT_SP = 51010
LOCAL_PORT_EV = 51020
LOCAL_LOGGER = False
LOG_FORMATS = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' # '%(asctime)s %(levelname)-8s %(message)s'
TEST_LOCAL = False
if TEST_LOCAL:
REMOTE_PORT_SP = LOCAL_PORT_SP
REMOTE_PORT_EV = LOCAL_PORT_EV
REMOTE_IP_SP = LOCAL_IP
REMOTE_IP_EV = LOCAL_IP

View File

@ -1,306 +0,0 @@
from .datatypes import CompositeBoolean, BattOpMode, CompositeMeasurement
from .datatypes import FlowBatteryState, ConvState, ConvOpMode
from .datatypes import HeatCirculationPumpMode, HeatCirculationPumpState
import syslab.config as config
class SyslabUnit:
__HIDDEN_RESOURCES = {
'authenticate': ('boolean', 'String', 'String'),
'isAuthenticated': ('boolean',),
'checkConnection': ('boolean',),
'logout': ('void',)}
def __init__(self, baseurl, units=None, which=None, host=None, port=None, unit_name=None, unit_type=""):
"""
Initialize a proxy to the given unit.
:param baseurl: Example: 'http://{host}:{port}/typebased_WebService_Battery/VRBBatteryWebService/{unit_name}/'
:param units: dictionary of units to be loaded in the format {'which': ('host', 'port', 'unit_name')}
:param which: string indicating which unit we pick from _units_
:param host: (optional) override host
:param port: (optional) override port
:param unit_name: (optional) override unit_name
:param unit_type: (optional) Used to indicate the type of the unit.
"""
if units is None:
units = {}
if which is not None:
if which not in units:
raise TypeError(
'The {unit_type} should be one of: "{list_of_units}"'.format(
unit_type=unit_type,
list_of_units='", "'.join(units.keys())
))
else:
host, port, unit_name = units.get(which)
if host is not None:
host = host
if port is not None:
port = port
assert host is not None, "When assigning custom port, host must not be None."
assert port is not None, "When assigning custom host, port must not be None."
assert unit_name is not None, "When assigning custom host and port, unit_name must not be None."
url = baseurl.format(host=host, port=port, unit_name=unit_name)
print(url)
self._url = url
self._resources = parse_resources(url)
self._resources = {**self._resources, **SyslabUnit.__HIDDEN_RESOURCES}
# TODO: Do type checking on these types. Ignore for now
self._complex_arg_types = ['BattOpMode', 'CompositeMeasurement', 'HeatCirculationPumpMode']
if config.REMOTE_LOGGER:
#import logger
if config.DEBUG:
print(f"Setting up remote logger with default IP: {config.REMOTE_IP} and port: {config.REMOTE_PORT}")
from uuid import uuid1, getnode
from ..comm.LogUtils import setup_udp_logger
self.__logger = setup_udp_logger(config.SPLOG_NAME)
self.__session_id = uuid1().__str__()
self.__host_id = getnode().__str__()
# TODO: Set up local setpoints Logger
if config.LOCAL_LOGGER:
pass
def __request_int(self, resource, method, args, body) -> int:
return int(send_request(self._url, resource, method, args, body))
def __request_float(self, resource, method, args, body) -> float:
return float(send_request(self._url, resource, method, args, body))
def __request_boolean(self, resource, method, args, body) -> bool:
return bool(send_request(self._url, resource, method, args, body))
def __request_string(self, resource, method, args, body) -> str:
return send_request(self._url, resource, method, args, body)
def __request_void(self, resource, method, args, body) -> None:
send_request(self._url, resource, method, args, body)
def __request_composite_boolean(self, resource, method, args, body) -> CompositeBoolean:
if self.__get_resource_return_type(resource) == 'CompositeBoolean':
json_string = self.__request_string(resource, method, args, body)
result = CompositeBoolean.parseFromJSON(json_string)
else:
raise TypeError('Error: resource "{0}" does not return a CompositeBoolean.'.format(resource))
return result
def __request_composite_measurement(self, resource, method, args, body) -> CompositeMeasurement:
if self.__get_resource_return_type(resource) == 'CompositeMeasurement':
json_string = self.__request_string(resource, method, args, body)
result = CompositeMeasurement.parseFromJSON(json_string)
else:
raise TypeError('Error: resource "{0}" does not return a CompositeMeasurement.'.format(resource))
return result
def _request_resource(self, resource, args=(), method='get', body=None, check_types=True):
if not type(resource) is str:
raise TypeError("Resource should be a string, found {0}.".format(type(resource)))
if resource not in self._resources:
# print(resource)
# FIXME: Workaround for metmast, where string-based methods are suffixed by a '2' and int-based methods by a '1'
if not resource[-1] in ('1', '2'):
raise ValueError('Unknown resource: {0} - no such resource found for {1}'.format(resource, self._url))
if type(args) is int or type(args) is float or type(args) is str:
args = (args,)
if config.DEBUG:
print("Resource: ", resource, " and args: ", args, "with method: ", method)
if config.REMOTE_LOGGER and method == 'put':
logstr = f'SessionID:{self.__session_id}||SenderID:{self.__host_id}||Unit-URL:{self._url}||Setterfcn:{resource}||ArgValue:{args}'
self.__logger.info(logstr)
return_type = self.__get_resource_return_type(resource)
self.__check_argument(resource, args)
result = None
if check_types:
try:
if return_type == 'CompositeMeasurement':
result = self.__request_composite_measurement(resource, method, args, body)
elif return_type == 'CompositeBoolean':
result = self.__request_composite_boolean(resource, method, args, body)
elif return_type == 'String':
result = self.__request_string(resource, method, args, body)
elif return_type == 'int':
result = self.__request_int(resource, method, args, body)
elif return_type == 'boolean':
result = self.__request_boolean(resource, method, args, body)
elif return_type == 'float' or return_type == 'double':
result = self.__request_float(resource, method, args, body)
elif return_type == 'void':
self.__request_string(resource, method, args, body)
elif return_type == 'BattOpMode':
result = BattOpMode.parseFromJSON(self.__request_string(resource, method, args, body))
elif return_type == 'FlowBatteryState':
result = FlowBatteryState.parseFromJSON(self.__request_string(resource, method, args, body))
elif return_type == 'ConvState':
result = ConvState.parseFromJSON(self.__request_string(resource, method, args, body))
elif return_type == 'EVSEState':
result = EVSEState.parseFromJSON(self.__request_string(resource, method, args, body))
elif return_type == 'ConvOpMode':
result = ConvOpMode.parseFromJSON(self.__request_string(resource, method, args, body))
elif return_type == 'HeatCirculationPumpMode':
result = HeatCirculationPumpMode.parseFromJSON(self.__request_string(resource, method, args, body))
elif return_type == 'HeatCirculationPumpState':
result = HeatCirculationPumpState.parseFromJSON(self.__request_string(resource, method, args, body))
elif return_type == 'WYEV' or return_type == 'WYEA' or return_type == 'DELV':
import json
result = json.loads(self.__request_string(resource, method, args, body))
else:
raise TypeError(
'TypeError: The return type {0} by {1} is unknown or not supported yet.'.format(return_type,
resource))
except KeyError as e:
# raise e
raise ValueError('{0} - no such resource found for {1}'.format(resource, self._url))
else:
import json
return_string = self.__request_string(resource, method, args, body)
try:
result = json.loads(return_string)
except RecursionError:
raise RecursionError(
"Maximum recursion depth exceeded while decoding "
"a JSON array from a unicode string. Length: {0}".format(
len(return_string)))
return result
def __get_resource_return_type(self, resource):
if not resource[-1] in ('1', '2'):
# FIXME: Workaround for metmast, where string-based methods are suffixed by a '2' and int-based methods by a '1'
return self._resources[resource][0]
else:
return self._resources[resource[:-1]][0]
def __get_resource_argument_types(self, resource):
if not resource[-1] in ('1', '2'):
# FIXME: Workaround for metmast, where string-based methods are suffixed by a '2' and int-based methods by a '1'
return self._resources[resource][1:]
else:
return self._resources[resource[:-1]][1:]
def __check_argument(self, resource, args):
arg_types = self.__get_resource_argument_types(resource)
try:
# TODO Ignore complex arguments (for now)
for complex_arg_type in self._complex_arg_types:
if complex_arg_type in arg_types:
return
if len(args) == len(arg_types):
for arg, arg_type in zip(args, arg_types):
if arg_type == 'String' and type(arg) is not str:
raise TypeError()
elif arg_type == 'int' and type(arg) is not int:
raise TypeError()
elif (arg_type == 'float' or arg_type == 'double') and type(arg) is not float and type(
arg) is not int:
raise TypeError()
else:
raise TypeError()
except TypeError:
raise TypeError(
'The resource "{0}" requires exactly {1} argument(s) of type {2}.'.format(resource, len(arg_types),
arg_types))
def parse_resources(url):
import re
from ast import literal_eval
result = {}
resources = send_request(url, 'resourceNames', 'get')
resources = literal_eval(resources)
for resource in resources:
try:
# TODO - handle java arrays, ie, []
m = re.match("(\w+)\[?\]? (\w+)(\([\w ,]*\))?", resource).groups()
except AttributeError as e:
print(e)
continue
args = ''
if m[2] is not None:
args = tuple(m[2].replace(' ', '').replace('(', '').replace(')', '').split(','))
result[m[1]] = (m[0],) + args
else:
result[m[1]] = (m[0],)
if config.DEBUG:
print(f'Key: {m[1]} - {m[0]} -- {args}')
return result
def send_request(url, resource, method, args=None, body=None):
from requests import request
from requests.exceptions import ConnectionError, RequestException
if not type(url) is str:
raise TypeError('URL should be a string, found {0}'.format(type(url)))
if not type(resource) is str:
raise TypeError('URL should be a string, found {0}'.format(type(resource)))
kwargs = {}
kwargs.setdefault('allow_redirects', True)
kwargs.setdefault('headers', {'accept': 'application/json', 'content-type': 'application/json'})
if not url.endswith('/'):
url += '/'
full_url = url + resource + '/'
if args is not None and len(args) > 0:
full_url += '/'.join(tuple(str(x) for x in args))
if config.DEBUG:
print(f'Calling: {full_url}')
try:
response = request(method, full_url, data=body, **kwargs)
except ConnectionError as e:
print('Unable to connect to host: {0}.'.format(url))
raise e
except RequestException as e:
print('Request error from host: {0}.'.format(full_url))
raise e
if 200 <= response.status_code < 300:
result = response.text
elif response.status_code == 404:
raise ConnectionError(
'Resource not found on host or arguments are not formatted correctly: {0}'.format(response.text))
elif response.status_code == 405:
raise ConnectionError('Method not allowed on host: \n {0}'.format(response.text))
elif response.status_code == 500:
from pprint import pprint
# TODO: Handle exception
print('Exception on server:')
pprint(response.json())
raise
else:
raise ConnectionError(
'Unable to successfully connect to host. Returned with HTTP status code {0}.\n Content: {1}'.format(
response.status_code, response.text))
if config.DEBUG:
print(f'Succesfully called {full_url}')
print(f'Returned: {result}')
return result

View File

@ -1 +0,0 @@
from . import *

View File

@ -1,47 +0,0 @@
class BattOpMode:
_OPMODES = {
0: 'BATT_UNKNOWN',
1: 'BATT_AUTO',
2: 'BATT_AUTO_SUBMODE1',
3: 'BATT_AUTO_SUBMODE2',
4: 'BATT_AUTO_SUBMODE3',
5: 'BATT_AUTO_SUBMODE4',
6: 'BATT_MANUAL',
7: 'BATT_OFF',
8: 'BATT_NOBMS',
9: 'BATT_WITHBMS', }
def __init__(self, mode):
assert mode in BattOpMode.__OPMODES
self._mode = mode
@property
def mode(self):
return self._mode
def modeAsString(self):
return BattOpMode.__OPMODES[self.mode]
def __str__(self):
return 'BattOpMode: {0} ({1})'.format(self.mode, BattOpMode.__OPMODES[self.mode])
def parseToJSON(self):
from json import dumps
jsonObj = {};
jsonObj['mode'] = self.mode;
return dumps(jsonObj)
@staticmethod
def parseFromJSON(jsonString):
from json import JSONDecodeError, loads
if not type(jsonString) is str: raise TypeError('jsonString should be a string, found {0}'.format(type(jsonString)))
# result = None ## Appears unused
try:
jsonObj = loads(jsonString)
return BattOpMode(jsonObj.get('mode'))
except JSONDecodeError as e:
print('Not a valid JSON string: {0}.'.format(jsonString))
raise e

View File

@ -1,5 +0,0 @@
# TODO: Implement
class CommonDeviceConfig:
pass

View File

@ -1,68 +0,0 @@
class CompositeBoolean:
def __init__(self, value, timestampMicros, timePrecision=0, quality=0, validity=0, source=0):
self._value = value
self._timestampMicros = timestampMicros
self._timePrecision = timePrecision if timePrecision != None else 0
self._quality = quality if quality != None else 0
self._validity = validity if validity != None else 0
self._source = source if source != None else 0
@property
def value(self):
return self._value
@property
def timestampMicros(self):
return self._timestampMicros
@property
def timePrecision(self):
return self._timePrecision
@property
def quality(self):
return self._quality
@property
def validity(self):
return self._validity
@property
def source(self):
return self._source
def __repr__(self):
return "{0}(".format(self.__class__.__name__) + \
"{0!r}, {1!r}, ".format(self._value, self._timestampMicros) + \
"{0!r}, {1!r}, ".format(self._timePrecision, self._quality) + \
"{0!r}, {1!r})".format(self._validity, self._source)
def __str__(self):
from datetime import datetime
return 'CompositeBoolean: {0} (@time: {1} UTC) '.format(self.value, datetime.utcfromtimestamp(self.timestampMicros/1000000))
def parseToJSON(self):
from json import dumps
jsonObj = {};
jsonObj['value'] = self.value;
jsonObj['timestampMicros'] = self.timestampMicros;
jsonObj['timePrecision'] = self.timePrecision
jsonObj['quality'] = self.quality
jsonObj['validity'] = self.validity
jsonObj['source'] = self.source;
return dumps(jsonObj)
@staticmethod
def parseFromJSON(jsonString):
from json import JSONDecodeError, loads
if not type(jsonString) is str: raise TypeError('jsonString should be a string, found {0}'.format(type(jsonString)))
try:
jsonObj = loads(jsonString)
result = CompositeBoolean(jsonObj.get('value'), jsonObj.get('timestampMicros'),
jsonObj.get('timePrecision'),
jsonObj.get('quality'), jsonObj.get('validity'), jsonObj.get('source'))
return result
except JSONDecodeError as e:
print('Not a valid JSON string: {0}.'.format(jsonString))
raise e

View File

@ -1,75 +0,0 @@
MICROS_PER_SECOND = 1000000
def none_to_zero(x):
if x is None:
return 0
return x
class CompositeMeasurement:
def __init__(self, value, timestampMicros, timePrecision=0, quality=0, validity=0, source=0):
self._value = value
self._timestampMicros = timestampMicros
self._timePrecision = none_to_zero(timePrecision)
self._quality = quality if quality != None else 0
self._validity = validity if validity != None else 0
self._source = source if source != None else 0
@property
def value(self):
return self._value
@property
def timestampMicros(self):
return self._timestampMicros
@property
def timePrecision(self):
return self._timePrecision
@property
def quality(self):
return self._quality
@property
def validity(self):
return self._validity
@property
def source(self):
return self._source
def __str__(self):
from datetime import datetime
return 'CompositeMeasurement: {0} (@time: {1})'.format(self.value, datetime.fromtimestamp(self.timestampMicros/MICROS_PER_SECOND))
def __repr__(self):
return "{0}(".format(self.__class__.__name__) + \
"{0!r}, {1!r}, ".format(self._value, self._timestampMicros) + \
"{0!r}, {1!r}, ".format(self._timePrecision, self._quality) + \
"{0!r}, {1!r})".format(self._validity, self._source)
def parseToJSON(self):
from json import dumps
jsonObj = {};
jsonObj['value'] = self.value;
jsonObj['timestampMicros'] = self.timestampMicros;
jsonObj['timePrecision'] = self.timePrecision
jsonObj['quality'] = self.quality
jsonObj['validity'] = self.validity
jsonObj['source'] = self.source;
return dumps(jsonObj)
@staticmethod
def parseFromJSON(jsonString):
from json import JSONDecodeError, loads
if not type(jsonString) is str: raise TypeError('jsonString should be a string, found {0}'.format(type(jsonString)))
try:
jsonObj = loads(jsonString)
result = CompositeMeasurement(jsonObj.get('value'), jsonObj.get('timestampMicros'),
jsonObj.get('timePrecision'),
jsonObj.get('quality'), jsonObj.get('validity'), jsonObj.get('source'))
return result
except JSONDecodeError as e:
print('Not a valid JSON string: {0}.'.format(jsonString))
raise e

View File

@ -1,5 +0,0 @@
# TODO: Add Processing here
class CompositeStatus:
pass

View File

@ -1,132 +0,0 @@
from .CompositeStatus import CompositeStatus
class ConvOpMode:
_OPMODES = {
0: 'CONV_UNKNOWN',
1: 'CONV_PQ',
2: 'CONV_UF',
3: 'CONV_OFF',
}
def __init__(self, status, timestampMicros, timePrecision=0, quality=0, validity=0, source=0):
assert status in ConvOpMode._OPMODES
self._status = status
self._timestampMicros = timestampMicros
self._timePrecision = timePrecision
self._quality = quality
self._validity = validity
self._source = source
@property
def mode(self) -> int:
return self._status
def modeAsString(self):
return ConvOpMode._OPMODES[self.mode]
def __str__(self):
return f'ConvOpMode({self.mode}; {ConvOpMode._OPMODES[self.mode]})'
def parseToJSON(self):
from json import dumps
jsonObj = {'mode': self.mode}
return dumps(jsonObj)
@staticmethod
def parseFromJSON(jsonString):
from json import JSONDecodeError, loads
if not type(jsonString) is str:
raise TypeError('jsonString should be a string, found {0}'.format(type(jsonString)))
try:
jsonObj = loads(jsonString)
return ConvOpMode(**jsonObj.get('mode'))
except JSONDecodeError as e:
print('Not a valid JSON string: {0}.'.format(jsonString))
raise e
class ConvState:
_STATES = {
0: 'CONV_UNKNOWN',
1: 'CONV_STOPPED',
2: 'CONV_STARTING',
4: 'CONV_RUNNING',
8: 'CONV_STOPPING',
16: 'CONV_SYNCED',
32: 'CONV_DROOP',
64: 'CONV_LOADENABLED',
128: 'CONV_INHIBITED',
1024: 'CONV_READY',
2048: 'CONV_WARNING',
4096: 'CONV_ALARM'}
def __init__(self, status, timestampMicros, timePrecision=0, quality=0, validity=0, source=0):
self._status = status
self._timestampMicros = timestampMicros
self._timePrecision = timePrecision
self._quality = quality
self._validity = validity
self._source = source
@property
def status(self):
return self._status
@property
def timestampMicros(self):
return self._timestampMicros
@property
def timePrecision(self):
return self._timePrecision
@property
def quality(self):
return self._quality
@property
def validity(self):
return self._validity
@property
def source(self):
return self._source
def statusAsString(self):
return ConvState._STATES[self.status]
def __repr__(self):
return "{0}(".format(self.__class__.__name__) + \
"{0!r}, {1!r}, ".format(self._status, self._timestampMicros) + \
"{0!r}, {1!r}, ".format(self._timePrecision, self._quality) + \
"{0!r}, {1!r})".format(self._validity, self._source)
def __str__(self):
from datetime import datetime
return 'FlowBatteryState: {0} : {1} (@time: {2}))'.format(
self._status,
# status & statuscode evaluates to True if statuscode is 2^k and in the binary expansion of status
";".join([label for statuscode, label in ConvState._STATES.items() if self._status & statuscode]),
datetime.fromtimestamp(self.timestampMicros / 1000000))
def parseToJSON(self):
from json import dumps
json_obj = {'status': self._status}
return dumps(json_obj)
@staticmethod
def parseFromJSON(jsonString):
from json import JSONDecodeError, loads
if not type(jsonString) is str: raise TypeError('jsonString should be a string, found {0}'.format(type(jsonString)))
# result = None
try:
jsonObj = loads(jsonString).get('status')
return ConvState(
jsonObj.get('status'), jsonObj.get('timestampMicros'),
jsonObj.get('timePrecision'),
jsonObj.get('quality'), jsonObj.get('validity'), jsonObj.get('source'))
except JSONDecodeError as e:
print('Not a valid JSON string in parsing Converter Status: {0}.'.format(jsonString))
raise e

View File

@ -1,83 +0,0 @@
class EVSEState:
_STATES = {
0: 'EVSE_UNKNOWN',
1: 'EVSE_NO_EV',
2: 'EVSE_EV_STOPPED',
4: 'EVSE_EV_READY',
8: 'EVSE_EV_STARTING',
16: 'EVSE_EV_CHARGING',
32: 'EVSE_EV_DISCHARGING',
64: 'EVSE_EV_PAUSED',
128: 'EVSE_EV_ALARM',
256: 'EVSE_EV_ESD', }
def __init__(self, status, timestampMicros, timePrecision=0, quality=0, validity=0, source=0):
self._status = status
self._timestampMicros = timestampMicros
self._timePrecision = timePrecision
self._quality = quality
self._validity = validity
self._source = source
@property
def status(self):
return self._status
@property
def timestampMicros(self):
return self._timestampMicros
@property
def timePrecision(self):
return self._timePrecision
@property
def quality(self):
return self._quality
@property
def validity(self):
return self._validity
@property
def source(self):
return self._source
def statusAsString(self):
return EVSEState.__STATES[self.status]
def __repr__(self):
return "{0}(".format(self.__class__.__name__) + \
"{0!r}, {1!r}, ".format(self._status, self._timestampMicros) + \
"{0!r}, {1!r}, ".format(self._timePrecision, self._quality) + \
"{0!r}, {1!r})".format(self._validity, self._source)
def __str__(self):
from datetime import datetime
return 'EVSEState: {0} : {1} (@time: {2}))'.format(
self.status,
# status & statuscode evaluates to True if statuscode is 2^k and in the binary expansion of status
";".join([label for statuscode, label in EVSEState._STATES.items() if self._status & statuscode]),
datetime.fromtimestamp(self.timestampMicros / 1000000))
def parseToJSON(self):
from json import dumps
jsonObj = {'status': self.status}
return dumps(jsonObj)
@staticmethod
def parseFromJSON(jsonString):
from json import JSONDecodeError, loads
if not type(jsonString) is str: raise TypeError('jsonString should be a string, found {0}'.format(type(jsonString)))
# result = None
try:
jsonObj = loads(jsonString).get('status')
return EVSEState(
jsonObj.get('status'), jsonObj.get('timestampMicros'),
jsonObj.get('timePrecision'),
jsonObj.get('quality'), jsonObj.get('validity'), jsonObj.get('source'))
except JSONDecodeError as e:
print('Not a valid JSON string: {0}.'.format(jsonString))
raise e

View File

@ -1,88 +0,0 @@
class FlowBatteryState:
_STATES = {
0: 'BATT_UNKNOWN',
1: 'BATT_STOPPED',
2: 'BATT_STARTING',
4: 'BATT_FLOODING',
8: 'BATT_RUNNING',
16: 'BATT_STOPPING',
32: 'BATT_DRAINING',
64: 'BATT_PUMPSRAMP',
128: 'BATT_TANKVALVESOP',
256: 'BATT_PUMPSRUN',
512: 'BATT_DCBREAKER',
1024: 'PCS_READY',
2048: 'PCS_RUNNING',
4096: 'BATT_ALARM',
8192: 'BATT_ESD'}
def __init__(self, status, timestampMicros, timePrecision=0, quality=0, validity=0, source=0):
self._status = status
self._timestampMicros = timestampMicros
self._timePrecision = timePrecision
self._quality = quality
self._validity = validity
self._source = source
@property
def status(self):
return self._status
@property
def timestampMicros(self):
return self._timestampMicros
@property
def timePrecision(self):
return self._timePrecision
@property
def quality(self):
return self._quality
@property
def validity(self):
return self._validity
@property
def source(self):
return self._source
def statusAsString(self):
return FlowBatteryState.__STATES[self.status]
def __repr__(self):
return "{0}(".format(self.__class__.__name__) + \
"{0!r}, {1!r}, ".format(self._status, self._timestampMicros) + \
"{0!r}, {1!r}, ".format(self._timePrecision, self._quality) + \
"{0!r}, {1!r})".format(self._validity, self._source)
def __str__(self):
from datetime import datetime
return 'FlowBatteryState: {0} : {1} (@time: {2}))'.format(
self.status,
# status & statuscode evaluates to True if statuscode is 2^k and in the binary expansion of status
";".join([label for statuscode, label in FlowBatteryState._STATES.items() if self._status & statuscode]),
datetime.fromtimestamp(self.timestampMicros / 1000000))
def parseToJSON(self):
from json import dumps
jsonObj = {'status': self.status}
return dumps(jsonObj)
@staticmethod
def parseFromJSON(jsonString):
from json import JSONDecodeError, loads
if not type(jsonString) is str: raise TypeError('jsonString should be a string, found {0}'.format(type(jsonString)))
# result = None
try:
jsonObj = loads(jsonString).get('status')
return FlowBatteryState(
jsonObj.get('status'), jsonObj.get('timestampMicros'),
jsonObj.get('timePrecision'),
jsonObj.get('quality'), jsonObj.get('validity'), jsonObj.get('source'))
except JSONDecodeError as e:
print('Not a valid JSON string: {0}.'.format(jsonString))
raise e

View File

@ -1,89 +0,0 @@
class HeatCirculationPumpMode:
__OPMODES = {
-1: 'PUMP_MODE_UNKNOWN',
0: 'PUMP_MODE_CONSTANT_SPEED',
1: 'PUMP_MODE_CONSTANT_FREQUENCY',
3: 'PUMP_MODE_CONSTANT_HEAD',
4: 'PUMP_MODE_CONSTANT_PRESSURE',
5: 'PUMP_MODE_CONSTANT_DIFF_PRESSURE',
6: 'PUMP_MODE_PROPORTIONAL_PRESSURE',
7: 'PUMP_MODE_CONSTANT_FLOW',
8: 'PUMP_MODE_CONSTANT_TEMP',
10: 'PUMP_MODE_CONSTANT_LEVEL',
128: 'PUMP_MODE_AUTO_ADAPT',
129: 'PUMP_MODE_FLOW_ADAPT' }
__OPMODES_R = {v: k for k, v in __OPMODES.items()}
def __init__(self, status, timestampMicros, timePrecision=0, quality=0, validity=0, source=0):
"""
@Input:
mode (int or str): Pump operating mode, can be both int and string
To get list of available modes: HeatCirculationPumpMode.opmodes()
"""
if status in HeatCirculationPumpMode.__OPMODES:
self._status = status
elif mode in HeatCirculationPumpMode.__OPMODES_R:
self._status = HeatCirculationPumpMode.__OPMODES_R[status]
else:
raise AssertionError('HeatCirculationPumpMode not recognized: \'{0}\''.format(mode))
self._timestampMicros = timestampMicros
self._timePrecision = timePrecision
self._quality = quality
self._validity = validity
self._source = source
@staticmethod
def opmodes():
return HeatCirculationPumpMode.__OPMODES.copy()
@property
def mode(self):
return self._status
@property
def modeAsString(self):
return HeatCirculationPumpMode.__OPMODES[self.mode]
def __repr__(self):
return "{0}(".format(self.__class__.__name__) + \
"{0!r}, {1!r}, ".format(self._status, self._timestampMicros) + \
"{0!r}, {1!r}, ".format(self._timePrecision, self._quality) + \
"{0!r}, {1!r})".format(self._validity, self._source)
def __str__(self):
return 'HeatCirculationPumpMode: {0} ({1})'.format(self.mode, self.modeAsString)
def parseToJSON(self):
from json import dumps
jsonObj = {};
jsonObj['mode'] = {
'status': self._status,
'timestampMicros': self._timestampMicros,
'timePrecision': self._timePrecision,
'quality': self._quality,
'validity': self._validity,
'source': self._source,
}
return dumps(jsonObj)
@staticmethod
def parseFromJSON(jsonString):
from json import JSONDecodeError, loads
if not type(jsonString) is str: raise TypeError('jsonString should be a string, found {0}'.format(type(jsonString)))
# result = None ## Appears unused
try:
jsonObj = loads(jsonString).get('mode')
return HeatCirculationPumpMode(
jsonObj.get('status'),
jsonObj.get('timestampMicros'),
jsonObj.get('timePrecision'),
jsonObj.get('quality'),
jsonObj.get('validity'),
jsonObj.get('source'))
except JSONDecodeError as e:
print('Not a valid JSON string: {0}.'.format(jsonString))
raise e

View File

@ -1,83 +0,0 @@
class HeatCirculationPumpState:
__OPSTATES = {
-1: 'PUMP_STATE_UNKNOWN',
0: 'PUMP_STATE_STOPPED',
1: 'PUMP_STATE_RUNNING',
2: 'PUMP_STATE_ERROR',}
__OPSTATES_R = {v: k for k, v in __OPSTATES.items()}
def __init__(self, status, timestampMicros, timePrecision=0, quality=0, validity=0, source=0):
"""
@Input:
status (int or str): Pump operating mode, can be both int and string
To get list of available modes: HeatCirculationPumpState.opstates()
"""
if status in HeatCirculationPumpState.__OPSTATES:
self._status = status
elif state in HeatCirculationPumpState.__OPSTATES_R:
self._status = HeatCirculationPumpState.__OPSTATES_R[status]
else:
raise AssertionError('HeatCirculationPumpState not recognized: \'{0}\''.format(mode))
self._timestampMicros = timestampMicros
self._timePrecision = timePrecision
self._quality = quality
self._validity = validity
self._source = source
@staticmethod
def opstates():
return HeatCirculationPumpState.__OPSTATES.copy()
@property
def state(self):
return self._status
@property
def stateAsString(self):
return HeatCirculationPumpState.__OPSTATES[self._status]
def __repr__(self):
return "{0}(".format(self.__class__.__name__) + \
"{0!r}, {1!r}, ".format(self._status, self._timestampMicros) + \
"{0!r}, {1!r}, ".format(self._timePrecision, self._quality) + \
"{0!r}, {1!r})".format(self._validity, self._source)
def __str__(self):
return 'HeatCirculationPumpState: {0} ({1})'.format(self.state, self.stateAsString)
def parseToJSON(self):
from json import dumps
jsonObj = {}
jsonObj['state'] = {
'status': self._status,
'timestampMicros': self._timestampMicros,
'timePrecision': self._timePrecision,
'quality': self._quality,
'validity': self._validity,
'source': self._source,
}
return dumps(jsonObj)
@staticmethod
def parseFromJSON(jsonString):
from json import JSONDecodeError, loads
if not type(jsonString) is str: raise TypeError('jsonString should be a string, found {0}'.format(type(jsonString)))
# result = None ## Appears unused
try:
jsonObj = loads(jsonString).get('state')
return HeatCirculationPumpState(
jsonObj.get('status'),
jsonObj.get('timestampMicros'),
jsonObj.get('timePrecision'),
jsonObj.get('quality'),
jsonObj.get('validity'),
jsonObj.get('source'))
except JSONDecodeError as e:
print('Not a valid JSON string: {0}.'.format(jsonString))
raise e

View File

@ -1,73 +0,0 @@
from dataclasses import dataclass
from . import CompositeMeasurement
from typing import Optional
# TODO: Add processing to these.
@dataclass
class DELV:
phaseAB: CompositeMeasurement
phaseBC: CompositeMeasurement
phaseCA: CompositeMeasurement
phaseAverage: CompositeMeasurement
@staticmethod
def parseFromJSON(json_string):
from json import JSONDecodeError, loads
if not type(json_string) is str:
raise TypeError('jsonString should be a string, found {0}'.format(type(json_string)))
try:
jsonObj = loads(json_string)
phaseABcm = CompositeMeasurement(**jsonObj.get('phaseAB'))
phaseBCcm = CompositeMeasurement(**jsonObj.get('phaseBC'))
phaseCAcm = CompositeMeasurement(**jsonObj.get('phaseCA'))
phaseAVGcm = CompositeMeasurement(**jsonObj.get('phaseAverage'))
return DELV(phaseABcm, phaseBCcm, phaseCAcm, phaseAVGcm)
except JSONDecodeError as e:
print('Not a valid JSON string: {0}.'.format(json_string))
raise e
class GPSL:
pass
class HLTH:
pass
class LNPL:
pass
class PNPL:
pass
@dataclass()
class WYEA:
phaseAB: CompositeMeasurement
phaseBC: CompositeMeasurement
phaseCA: CompositeMeasurement
neutral: Optional[CompositeMeasurement] = None
@staticmethod
def parseFromJSON(json_string):
from json import JSONDecodeError, loads
if not type(json_string) is str:
raise TypeError('jsonString should be a string, found {0}'.format(type(json_string)))
try:
jsonObj = loads(json_string)
phaseAcm = CompositeMeasurement(**jsonObj.get('phaseA'))
phaseBcm = CompositeMeasurement(**jsonObj.get('phaseB'))
phaseCcm = CompositeMeasurement(**jsonObj.get('phaseC'))
neutral = jsonObj.get('neutral')
if neutral is not None:
neutral = CompositeMeasurement(**neutral)
return WYEA(phaseAcm, phaseBcm, phaseCcm, neutral)
except JSONDecodeError as e:
print('Not a valid JSON string: {0}.'.format(json_string))
raise e

View File

@ -1,16 +0,0 @@
__all__ = []
# Composite Data Types
from .CompositeBoolean import CompositeBoolean
from .CompositeMeasurement import CompositeMeasurement
from .CompositeStatus import CompositeStatus
# SYSLAB-specific data types
from .Identifiers import DELV, GPSL, HLTH, LNPL, PNPL, WYEA
# Unit-specific data types
from .BattOpMode import BattOpMode
from .FlowBatteryState import FlowBatteryState
from .HeatCirculationPumpMode import HeatCirculationPumpMode
from .HeatCirculationPumpState import HeatCirculationPumpState
from .ConverterTypes import ConvState, ConvOpMode

View File

@ -1,27 +0,0 @@
def get_flexhouse(simulated=False, physical=False, simulate_on_dumpload_ID="", simulate_on_battery=False):
"""
Return an instantiated object which operates as a flexhouse.
input:
simulated (bool): Whether the flexhouse should be simulated
physical (bool): Whether the flexhouse should be the real flexhouse
simulate_on_dumpload_ID (string): The ID of the dumpload on which to simulate the flexhouse
simulate_on_battery (bool): Whether to simulate on battery
return:
flexhouse: A flexhouse object
"""
if simulated:
if simulate_on_battery:
from .virtual.FlexHouse_sim_batt import FlexHouse_sim_batt
return FlexHouse_sim_batt('batt1')
else:
from .virtual.FlexHouse_sim import FlexHouse_sim
assert simulate_on_dumpload_ID != "", "Must supply an ID string for the dumpload used in Flexhouse simulation if not simulating on battery"
return FlexHouse_sim(simulate_on_dumpload_ID)
elif physical:
from .physical.FlexHouse_real import FlexHouse_real
return FlexHouse_real()
else:
raise Exception('Must define if FlexHouse instance is real or simulated')

View File

@ -1,258 +0,0 @@
from ..core.SyslabUnit import SyslabUnit
from ..core.datatypes import CompositeMeasurement, CompositeBoolean, CompositeStatus
from ..core.datatypes import CommonDeviceConfig
from ..core.datatypes import DELV, WYEA, GPSL, PNPL, LNPL, HLTH, ConvOpMode, ConvState
from typing import List, Union
from time import time
def cast_to_cm(m: Union[CompositeMeasurement, float]):
if type(m) == float:
# TODO: Is there a better way to estimate precision of time.time?
request = CompositeMeasurement(m, timestampMicros=time()*1e6, timePrecision=1000)
elif type(m) == CompositeMeasurement:
request = m
else:
raise TypeError(f"Unknown request type: {type(m)}")
return request
class B2BConverter(SyslabUnit):
"""
Class covering back-to-back converters in SYSLAB.
A full list of available switchboards can be found by calling 'SwitchBoard.getAvailableSwitchBoards()'
Alternatively, the user may specify a host and port to connect to via the *host* and *port* arguments.
"""
_CONVERTERS = {
'ABBB2B': ('syslab-04.syslab.dk', '8080', 'B2BConverter'),
}
MAXP = 60
MAXQ = 60
@staticmethod
def getAvailableUnits():
return list(B2BConverter.__CONVERTERS.keys())
def __init__(self, which=None, host=None, port=None, unit_name=None):
baseurl = 'http://{host}:{port}/typebased_WebService_ABBConverter/ABBConverterWebService/{unit_name}/'
super().__init__(
baseurl=baseurl,
which=which,
units=self._CONVERTERS,
host=host,
port=port,
unit_name=unit_name,
unit_type="B2BConverter")
# Inventory functions
# Configuration class used for HMI layout
def getNodeConfiguration(self, ) -> CommonDeviceConfig:
return self._request_resource('getNodeConfiguration')
# Component description
def getConverterName(self, ) -> str:
return self._request_resource('getConverterName')
def getConverterLogicalNameplate(self, ) -> LNPL:
raise NotImplementedError
def getConverterPhysicalNameplate(self, ) -> PNPL:
raise NotImplementedError
def getConverterHealth(self, ) -> HLTH:
raise NotImplementedError
def getGPSLocation(self, ) -> GPSL:
raise NotImplementedError
# Operating characteristics (corresponding to COPR/DRAT node)
def getRatedP(self, ) -> float:
raise NotImplementedError
# return self._request_resource('getRatedP')
def getRatedS(self, ) -> float:
raise NotImplementedError
# return self._request_resource('getRatedS')
def getRatedQ(self, ) -> float:
raise NotImplementedError
# return self._request_resource('getRatedQ')
def getRatedU(self, ) -> float:
raise NotImplementedError
# return self._request_resource('getRatedU')
def getRatedI(self, ) -> float:
raise NotImplementedError
# return self._request_resource('getRatedI')
def getRatedf(self, ) -> float:
raise NotImplementedError
# return self._request_resource('getRatedf')
# Operating mode settings (corresponding to DOPM node)
def getAvailableOperatingModes(self, ) -> List[ConvOpMode]:
raise NotImplementedError
def getCurrentOperatingMode(self, ) -> ConvOpMode:
return self._request_resource('getCurrentOperatingMode')
def setOperatingMode(self, mode: ConvOpMode) -> None:
raise NotImplementedError
def isPSetpointEnabled(self, ) -> CompositeBoolean:
return self._request_resource('isPSetpointEnabled')
def isQSetpointEnabled(self, ) -> CompositeBoolean:
return self._request_resource('isQSetpointEnabled')
def isUSetpointEnabled(self, ) -> CompositeBoolean:
return self._request_resource('isUSetpointEnabled')
def isfSetpointEnabled(self, ) -> CompositeBoolean:
return self._request_resource('isfSetpointEnabled')
# Status information (corresponding to DPST node)
def getConverterStatus(self, ) -> ConvState:
return self._request_resource('getConverterStatus')
# Alarms information
def hasActiveFault(self, ) -> CompositeBoolean:
raise NotImplementedError
def hasActiveWarning(self, ) -> CompositeBoolean:
raise NotImplementedError
def resetAlarms(self, ) -> None:
raise NotImplementedError
def getActiveEventCode(self, ) -> CompositeStatus:
raise NotImplementedError
# DER controller characteristics (corresponding to DRCT node)
def setActivePower(self, m: Union[CompositeMeasurement, float]) -> None:
"""
Send a request for the converter's output active power. Requires the converter to be in PQ mode.
:param m: [kW] Requested active power. If a float, will be converted to a CompositeMeasurement with the current system time as timestamp.)
:return: None
"""
P_UpperLimit = 15.0
P_LowerLimit = -15.0
return self._request_resource('setActivePower', (), 'put', cast_to_cm(max(P_LowerLimit, min(m, P_UpperLimit))).parseToJSON())
def setFrequency(self, m: Union[CompositeMeasurement, float]) -> None:
"""
Send a request for the converter's output power. Requires the converter to be in UF mode.
:param m: [Hz] Requested active power. If a float, will be converted to a CompositeMeasurement with the current system time as timestamp.)
:return: None
"""
return self._request_resource('seFrequency', (), 'put', cast_to_cm(m).parseToJSON())
def setReactivePower(self, m: Union[CompositeMeasurement, float]) -> None:
"""
Send a request for the converter's output reactive power. Requires the converter to be in PQ mode.
:param m: [kVA] Requested active power. If a float, will be converted to a CompositeMeasurement with the current system time as timestamp.)
:return: None
"""
Q_UpperLimit = 15.0
Q_LowerLimit = -15.0
return self._request_resource('setReactivePower', (), 'put', cast_to_cm(max(Q_LowerLimit, min(m, Q_UpperLimit))).parseToJSON())
def setVoltage(self, m: Union[CompositeMeasurement, float]) -> None:
"""
Send a request for the converter's output voltage. Requires the converter to be in UF mode.
:param m: [V] Requested active power. If a float, will be converted to a CompositeMeasurement with the current system time as timestamp.)
:return: None
"""
return self._request_resource('setVoltage', (), 'put', cast_to_cm(m).parseToJSON())
def getActivePowerSetpoint(self, ) -> CompositeMeasurement:
raise NotImplementedError
def getFrequencySetpoint(self, ) -> CompositeMeasurement:
raise NotImplementedError
def getReactivePowerSetpoint(self, ) -> CompositeMeasurement:
raise NotImplementedError
def getVoltageSetpoint(self, ) -> CompositeMeasurement:
raise NotImplementedError
# Synchronisation (corresponding to RSYN node)
def synchronize(self, ) -> None:
raise NotImplementedError
def unsynchronize(self, ) -> None:
raise NotImplementedError
def setDroopEnable(self, b: CompositeBoolean) -> None:
raise NotImplementedError
def setLoadEnable(self, b: CompositeBoolean) -> None:
raise NotImplementedError
def getVoltageDroopPct(self, ) -> CompositeMeasurement:
raise NotImplementedError
def getFrequencyDroopPct(self, ) -> CompositeMeasurement:
raise NotImplementedError
def setVoltageDroopPct(self, pct: CompositeMeasurement) -> None:
raise NotImplementedError
def setFrequencyDroopPct(self, pct: CompositeMeasurement) -> None:
raise NotImplementedError
# Reciprocating Engine (corresponds to DCIP node)
def startConverter(self, ) -> None:
return self._request_resource('startConverter', (), 'put')
def softStopConverter(self, ) -> None:
return self._request_resource('softStopConverter', (), 'put')
def stopConverter(self, ) -> None:
return self._request_resource('stopConverter', (), 'put')
# AC quantities (corresponds to MMXU nodes)
def getActivePowerOutput(self, ) -> CompositeMeasurement:
return self._request_resource('getActivePowerOutput')
def getReactivePowerOutput(self, ) -> CompositeMeasurement:
return self._request_resource('getReactivePowerOutput')
def getOutputFrequency(self, ) -> CompositeMeasurement:
return self._request_resource('getOutputFrequency')
def getOutputInterphaseVoltages(self, ) -> DELV:
return self._request_resource('getOutputInterphaseVoltages')
def getOutputPhaseCurrents(self, ) -> WYEA:
raise NotImplementedError
def getRectifierInterphaseVoltages(self, ) -> DELV:
return self._request_resource('getRectifierInterphaseVoltages')
def getRectifierPhaseCurrents(self, ) -> WYEA:
return self._request_resource('getRectifierPhaseCurrents')
def getSyncBusInterphaseVoltages(self, ) -> DELV:
raise NotImplementedError
class ABBB2BConverter(B2BConverter):
def __init__(self):
super(ABBB2BConverter, self).__init__(which='ABBB2B')

View File

@ -1,85 +0,0 @@
from ..core.SyslabUnit import SyslabUnit
from ..core.datatypes.FlowBatteryState import FlowBatteryState
from ..core.datatypes.BattOpMode import BattOpMode
class Battery(SyslabUnit):
"""The Battery class represents a battery in SYSLAB.
The Battery class is instantiated using a string with the unique name of the battery, ie. 'which'
A full list of available batteries can be found by calling 'Battery.getAvailableBatteries()'
Alternatively, the user may specify a host and port to connect to via the *host* and *port* arguments.
"""
__BATTERIES = {
'batt1': ('syslab-12.syslab.dk', '8080', 'batt1'),
'battemu': ('syslab-31.syslab.dk', '8080', 'battemu'),
'battfh1': ('syslab-s01.syslab.dk', '8080', 'battfh1'),
'simlab-15': ('192.168.0.115', '8080', 'batt1'),
'vbatt1': ('simlab-12', '8080', 'batt1'),
}
def __init__(self, which=None, host=None, port=None, unitname=None):
baseurl = 'http://{host}:{port}/typebased_WebService_Battery/VRBBatteryWebService/{unit_name}/'
super().__init__(
baseurl=baseurl,
which=which,
units=self.__BATTERIES,
host=host,
port=port,
unit_name=unitname,
unit_type="Battery")
def startBattery(self):
return self._request_resource('startBattery', (), 'put')
def stopBattery(self):
return self._request_resource('stopBattery', (), 'put')
def getActivePower(self):
return self._request_resource('getACActivePower')
def setActivePower(self, setPoint):
P_UpperLimit = 15.0
P_LowerLimit = -15.0
return self._request_resource('setP', (max(P_LowerLimit, min(setPoint, P_UpperLimit))), 'put')
def getReactivePower(self):
return self._request_resource('getACReactivePower')
def setReactivePower(self, setPoint):
return self._request_resource('setQ', (setPoint), 'put')
def getFrequency(self):
return self._request_resource('getACFrequency')
def getRemainingFloodTime(self):
return min(self._request_resource('getRemainingFloodTime'), self._request_resource('getRemainingDrainTime'))
def getCurrentOperatingMode(self):
return self._request_resource('getCurrentOperatingMode')
def getCurrentOperatingState(self):
return self._request_resource('getCurrentOperatingState')
def setOperatingMode(self, mode):
return self._request_resource('setOperatingMode', (), 'put', BattOpMode(mode).parseToJSON())
def getSOC(self):
return self._request_resource('getSOC')
def getRatedActivePower(self):
return self._request_resource('getRatedP')
def getRatedReactivePower(self):
return self._request_resource('getRatedQ')
def getName(self):
return self._request_resource('getBatteryName')
@staticmethod
def getAvailableBatteries():
return list(Battery.__BATTERIES.keys())

View File

@ -1,100 +0,0 @@
from ..core.SyslabUnit import SyslabUnit
class DieselGenerator(SyslabUnit):
"""The DieselGenerator class represents a photovoltaic panel array in SYSLAB.
The DieselGenerator class is instantiated using a string with the unique name of the dumpload, ie. 'which'
A full list of available panel arrays can be found by calling 'DieselGenerator.getAvailableDieselGenerator()'
Alternatively, the user may specify a host and port to connect to via the *host* and *port* arguments.
"""
__UNITS = {
'diesel319': ('syslab-02.syslab.dk', '8080', 'genset1'),
}
def __init__(self, which=None, host=None, port=None, unitname=None):
baseurl = 'http://{host}:{port}/typebased_WebService_Diesel/DEIFDieselGensetWebService/{unit_name}/'
super().__init__(
baseurl=baseurl,
which=which,
units=self.__UNITS,
host=host,
port=port,
unit_name=unitname,
unit_type="DieselGenerator")
def getRatedP(self):
return self._request_resource('getRatedP')
def getRatedQ(self):
return self._request_resource('getRatedQ')
def getRatedS(self):
return self._request_resource('getRatedS')
def getRatedU(self):
return self._request_resource('getRatedU')
def getRatedf(self):
return self._request_resource('getRatedf')
def isGeneratorBreakerClosed(self):
return self._request_resource('isGeneratorBreakerClosed')
def isGensetRunning(self):
return self._request_resource('isGensetRunning')
def isGensetSynchronized(self):
return self._request_resource('isGensetSynchronized')
def isSynchronising(self):
return self._request_resource('isSynchronising')
def closeGB(self):
return self._request_resource('closeGB', method='put')
def openGB(self):
return self._request_resource('openGB', method='put')
def startGenset(self):
return self._request_resource('startGenset', method='put')
def stopGenset(self):
return self._request_resource('stopGenset', method='put')
def setTargetActivePower(self, setpoint):
return self._request_resource('setTargetActivePower', setpoint, 'put')
def setTargetReactivePower(self, setpoint):
return self._request_resource('setTargetReactivePower', setpoint, 'put')
def getActivePower(self):
return self._request_resource('getActivePower')
def getReactivePower(self):
return self._request_resource('getReactivePower')
#TODO: Implement
def getCurrentGensetMode(self):
pass
#TODO: Implement
def getCurrentRunningMode(self):
pass
#TODO: Implement
def setGensetMode(self, mode):
pass
#TODO: Implement
def setRunningMode(self, mode):
pass
@staticmethod
def getAvailableDieselGenerators():
return list(DieselGenerator.__UNITS.keys())

View File

@ -1,112 +0,0 @@
from ..core.SyslabUnit import SyslabUnit
import warnings
class Dumpload(SyslabUnit):
"""The Dumpload class represents a dumpload in SYSLAB.
The Dumpload class is instantiated using a string with the unique name of the dumpload, ie. 'which'
A full list of available dumploads can be found by calling 'Dumpload.getAvailableDumpLoads()'
Alternatively, the user may specify a host and port to connect to via the *host* and *port* arguments.
"""
__DUMPLOADS = {
'mobload1': ('syslab-16.syslab.dk', '8080', 'mobload1'),
'mobload2': ('syslab-17.syslab.dk', '8080', 'mobload2'),
'mobload3': ('syslab-18.syslab.dk', '8080', 'mobload3'),
'load1': ('syslab-05.syslab.dk', '8080', 'load1'),
'vmobload1': ('simlab-16', '8080', 'mobload1'),
'vmobload2': ('simlab-17', '8080', 'mobload2'),
'vmobload3': ('simlab-18', '8080', 'mobload3'),
'vload1': ('simlab-05', '8080', 'load1'),
'simlab-05': ('192.168.0.105', '8080', 'mobload2'),
'simlab-11': ('192.168.0.111', '8080', 'mobload1'),
'simlab-12': ('192.168.0.112', '8080', 'load1'),
}
def __init__(self, which=None, host=None, port=None, unitname=None):
baseurl = 'http://{host}:{port}/typebased_WebService_Load/GenericLoadWebService/{unit_name}/'
super().__init__(
baseurl=baseurl,
which=which,
units=self.__DUMPLOADS,
host=host,
port=port,
unit_name=unitname,
unit_type="Battery")
def startLoad(self):
return self._request_resource('startLoad', (), 'put')
def stopLoad(self):
return self._request_resource('stopLoad', (), 'put')
def getPowerSetPoint(self):
return self._request_resource('getConstantP')
def setPowerSetPoint(self, setPoint):
"""
Set the active power setpoint for the load.
Inputs:
setPoint (float): Requested power setpoint
Outputs:
Ack (bool): Acknowledgement of receiver
"""
P_UpperLimit = 15.0
P_LowerLimit = 0
return self._request_resource('setConstantP', max(P_LowerLimit, min(setPoint, P_UpperLimit)), 'put')
def getActivePower(self):
return self._request_resource('getActivePower')
def getReactivePower(self):
"""
Get the reactive power draw from the load.
Outputs:
Q (CompositeMeasurement): Current reactive power draw, calculated from active power draw (see note)
NOTE: This is a theoretical value calculated from the relation
Q = Q_r * sin (pi * P /P_r)
where P_r and Q_r are the rated active and reactive power draw.
For control purposes, use the measured value from the switchboard
instead to get an actual measurement.
"""
warnings.warn("The output of getReactivePower from the Dumpload class is calculated from the active power draw rather than a measured value. For control purposes, use the measured power draw on the switchboard.")
return self._request_resource('getReactivePower')
def getRatedPower(self):
return self._request_resource('getRatedP')
def getRatedReactivePower(self):
return self._request_resource('getRatedQ')
def getName(self):
return self._request_resource('getLoadName')
def isLoadOn(self):
return self._request_resource('isLoadOn')
@staticmethod
def getAvailableDumploads():
return list(Dumpload.__DUMPLOADS.keys())
class MobileLoad1(Dumpload):
def __init__(self):
super(MobileLoad1, self).__init__("mobload1")
class MobileLoad2(Dumpload):
def __init__(self):
super(MobileLoad2, self).__init__("mobload2")
class MobileLoad3(Dumpload):
def __init__(self):
super(MobileLoad3, self).__init__("mobload3")

View File

@ -1,213 +0,0 @@
from ..core.SyslabUnit import SyslabUnit
from ..core.datatypes import CompositeMeasurement
from typing import Union
from time import time
def cast_to_cm(m: Union[CompositeMeasurement, float]):
if type(m) == float:
# TODO: Is there a better way to estimate precision of time.time?
request = CompositeMeasurement(m, timestampMicros=time()*1e6, timePrecision=1000)
elif type(m) == CompositeMeasurement:
request = m
else:
raise TypeError(f"Unknown request type: {type(m)}")
return request
class EVSE(SyslabUnit):
"""The EVSE class represents a charging post in SYSLAB.
The EVSE class is instantiated using a string with the unique name of the charging post, ie. 'which'
A full list of available charging posts can be found by calling 'EVSE.getAvailableChargingPosts()'
Alternatively, the user may specify a host and port to connect to via the *host* and *port* arguments.
"""
__CHARGINGPOSTS = {
'V2G-319': ('syslab-35.syslab.dk', '8080', 'Endesa_V2G'),
'EVSE-NEVIC-6': ('10.42.245.96', '8080', 'EVSE_NEVIC_6'),
}
def __init__(self, which=None, host=None, port=None, unitname=None):
baseurl = 'http://{host}:{port}/typebased_WebService_EVSE/EVSEWebService/{unit_name}/'
super().__init__(
baseurl=baseurl,
which=which,
units=self.__CHARGINGPOSTS,
host=host,
port=port,
unit_name=unitname,
unit_type="EVSE")
def isEVPresent(self):
return self._request_resource('isEVPresent')
def canRemoteStartStop(self):
return self._request_resource('canRemoteStartStop')
def canLimitChargePower(self):
return self._request_resource('canLimitChargePower')
def canDischarge(self):
return self._request_resource('canDischarge')
def hasSOC(self):
return self._request_resource('hasSOC')
def hasACMeasurements(self):
return self._request_resource('hasACMeasurements')
def hasDCMeasurements(self):
return self._request_resource('hasDCMeasurements')
def getAvailableOperatingStates(self):
return self._request_resource('getAvailableOperatingStates')
def getCurrentOperatingState(self):
return self._request_resource('getCurrentOperatingState')
def getACActivePower(self):
return self._request_resource('getACActivePower')
def getACReactivePower(self):
return self._request_resource('getACReactivePower')
def getACFrequency(self):
return self._request_resource('getACFrequency')
def getACVoltages(self):
return self._request_resource('getACVoltages')
def getACCurrents(self):
return self._request_resource('getACCurrents')
def getSOC(self):
return self._request_resource('getSOC')
def getMinSOC(self):
return self._request_resource('getMinSOC')
def getMaxSOC(self):
return self._request_resource('getMaxSOC')
def startCharge(self):
return self._request_resource('startCharge', (), 'put')
def stopCharge(self):
return self._request_resource('stopCharge', (), 'put')
def startDischarge(self):
return self._request_resource('startDischarge', (), 'put')
def stopDischarge(self):
return self._request_resource('stopDischarge', (), 'put')
def getMinimumChargePower(self):
return self._request_resource('getMinimumChargePower')
def getMaximumChargePower(self):
return self._request_resource('getMaximumChargePower')
def getMinimumDischargePower(self):
return self._request_resource('getMinimumDischargePower')
def getMaximumDischargePower(self):
return self._request_resource('getMaximumDischargePower')
def setPowerSetpoint(self, setPoint):
P_UpperLimit = 10.0
P_LowerLimit = -10.0
return self._request_resource('setP', (), 'put', cast_to_cm(max(P_LowerLimit, min(setPoint, P_UpperLimit))).parseToJSON())
def isPSetpointEnabled(self):
return self._request_resource('isPSetpointEnabled')
def getActiveEnergyImport(self): # TODO not working/implemented
return self._request_resource('getActiveEnergyImport')
def getActiveEnergyExport(self): # TODO not working/implemented
return self._request_resource('getActiveEnergyExport')
def getReactiveEnergyImport(self): # TODO not working/implemented
return self._request_resource('getReactiveEnergyImport')
def getReactiveEnergyExport(self): # TODO not working/implemented
return self._request_resource('getReactiveEnergyExport')
def getNodeConfiguration(self): # TODO not working/implemented
return self._request_resource('getNodeConfiguration')
def getEVSEName(self):
return self._request_resource('getEVSEName')
def getEVSELogicalNameplate(self): # TODO not working/implemented
return self._request_resource('getEVSELogicalNameplate')
def getEVSEPhysicalNameplate(self): # TODO not working/implemented
return self._request_resource('getEVSEPhysicalNameplate')
def getEVSEHealth(self): # TODO not working/implemented
return self._request_resource('getEVSEHealth')
def getGPSLocation(self): # TODO not working/implemented
return self._request_resource('getGPSLocation')
def getNumberOfEvents(self): # TODO not working/implemented
return self._request_resource('getNumberOfEvents')
def getEvents(self): # TODO not working/implemented
return self._request_resource('getEvents')
def getNumberOfAlarms(self): # TODO not working/implemented
return self._request_resource('getNumberOfAlarms')
def getAlarms(self): # TODO not working/implemented
return self._request_resource('getAlarms')
def getNumberOfUnacknowledgedAlarms(self): # TODO not working/implemented
return self._request_resource('getNumberOfUnacknowledgedAlarms')
def getUnacknowledgedAlarms(self): # TODO not working/implemented
return self._request_resource('getUnacknowledgedAlarms')
def acknowledgeAlarms(self): # TODO not working/implemented
return self._request_resource('acknowledgeAlarms')
def getRatedP(self):
return self._request_resource('getRatedP')
def getRatedQ(self):
return self._request_resource('getRatedQ')
def getRatedS(self):
return self._request_resource('getRatedS')
def getRatedU(self):
return self._request_resource('getRatedU')
def getRatedf(self):
return self._request_resource('getRatedf')
def getInverterName(self): # TODO not working/implemented
return self._request_resource('getInverterName')
def getInverterLogicalNameplate(self): # TODO not working/implemented
return self._request_resource('getInverterLogicalNameplate')
def getInverterHealth(self): # TODO not working/implemented
return self._request_resource('getInverterHealth')
def getDCVoltage(self): # TODO not working/implemented
return self._request_resource('getDCVoltage')
def getDCPower(self): # TODO not working/implemented
return self._request_resource('getDCPower')
def isDCContactorClosed(self): # TODO not working/implemented
return self._request_resource('isDCContactorClosed')
@staticmethod
def getAvailableEVSE():
return list(EVSE.__CHARGINGPOSTS.keys())

View File

@ -1,164 +0,0 @@
from ..core.SyslabUnit import SyslabUnit
from ..core.datatypes import CompositeMeasurement, HeatCirculationPumpMode, HeatCirculationPumpState
class Valve:
"""
Convenience class which calls methods on its associated heatswitchboard
"""
def __init__(self, label, heatswitchboard):
self.label = label
self.heatswitchboard = heatswitchboard
#TODO: Add methods
class Pump:
"""
Convenience class which calls methods on its associated heatswitchboard
"""
def __init__(self, label, heatswitchboard):
self.label = label
self.heatswitchboard = heatswitchboard
#TODO: Add methods
class Meter:
"""
Convenience class which calls methods on its associated heatswitchboard
"""
def __init__(self, label, heatswitchboard):
self.label = label
self.heatswitchboard = heatswitchboard
#TODO: Add methods
class HeatSwitchBoard(SyslabUnit):
"""
The HeatSwitchBoard class represents a HeatSwitchBoard in SYSLAB.
The HeatSwitchBoard class is instantiated using a string with the unique name of the switchboard, ie. 'which'
A full list of available switchboards can be found by calling 'SwitchBoard.getAvailableSwitchBoards()'
Alternatively, the user may specify a host and port to connect to via the *host* and *port* arguments.
"""
__HEAT_SWITCH_BOARDS = {
'716-h1': ('syslab-33.syslab.dk', '8080', '716-h1'),
}
@staticmethod
def getAvailableHeatSwitchBoards():
return list(HeatSwitchBoard.__HEAT_SWITCH_BOARDS.keys())
def __init__(self, which=None, host=None, port=None, unitname=None):
baseurl = f'http://{host}:{port}/typebased_WebService_HeatSubstation/HeatSwitchboardWebService/{unit_name}/'
super().__init__(
baseurl=baseurl,
which=which,
units=self.__HEAT_SWITCH_BOARDS,
host=host,
port=port,
unit_name=unitname,
unit_type="HeatSwitchBoard")
# Inventory functions
#TODO Handle these in core/SYSLAB_unit.py by properly handling arrays. This is a bodge for now.
def getHeatMeters(self):
result = self._request_resource('getHeatMeters', check_types=False)
return result
#TODO, P1: Awaiting Valve[]
def getValves(self):
raise NotImplementedError
# Valve functions
def getValvePosition(self, valve) -> CompositeMeasurement:
return self._request_resource('getValvePosition', valve)
#TODO: Implement
#def setValvePosition(self, valve: str, position: float, timestamp=0):
def setValvePosition(self, valve: str, position: CompositeMeasurement):
"""
Sets *valve* to *position*. Position must be float in the
interval from 0.0 to 1.0
Position = 0.0 => *valve* fully closed
Position = 1.0 => *valve* fully open
"""
assert 0.0 <= position.value and position.value <= 1.0
return self._request_resource('setValvePosition', (valve), 'put',
position.parseToJSON())
#CompositeMeasurement(value=position, timestampMicros=timestamp).parseToJSON())
# Meter functions
def getBackTemperature(self, meter) -> CompositeMeasurement:
return self._request_resource('getBackTemperature', meter)
def getFlow(self, meter) -> CompositeMeasurement:
return self._request_resource('getFlow', meter)
def getThermalPower(self, meter) -> CompositeMeasurement:
return self._request_resource('getThermalPower', meter)
def getFwdTemperature(self, meter) -> CompositeMeasurement:
return self._request_resource('getFwdTemperature', meter)
def getPressure(self, meter) -> CompositeMeasurement:
return self._request_resource('getPressure', meter)
def getVolume(self, meter) -> CompositeMeasurement:
return self._request_resource('getVolume', meter)
def getMass(self, meter) -> CompositeMeasurement:
return self._request_resource('getMass', meter)
def getHeatEnergy(self, meter) -> CompositeMeasurement:
return self._request_resource('getHeatEnergy', meter)
def getCoolingEnergy(self, meter) -> CompositeMeasurement:
return self._request_resource('getCoolingEnergy', meter)
# Pump functions
def getPumpState(self, pump) -> HeatCirculationPumpState:
return self._request_resource('getPumpState', pump)
def getPumpControlMode(self, pump) -> HeatCirculationPumpMode:
return self._request_resource('getPumpControlMode', pump)
def setPumpControlMode(self, pump, mode: HeatCirculationPumpMode):
return self._request_resource('setPumpControlMode', (pump), 'put', mode.parseToJSON())
def setPumpMaxFlow(self, pump, limit: CompositeMeasurement):
return self._request_resource('setPumpMaxFlow', (pump), 'put', limit.parseToJSON())
def startPump(self, pump):
return self._request_resource('startPump', pump)
def stopPump(self, pump):
return self._request_resource('stopPump', pump)
#TODO: Split into three functions that check against current pump mode.
def setPumpSetpoint(self, pump, setpoint: CompositeMeasurement):
"""
Sets the target for the pump.
NOTE: How to interpret the setpoint depends on the mode which the pump is in.
Here be dragons.
"""
return self._request_resource('setPumpSetpoint', (pump), 'put', setpoint.parseToJSON())
def getPumpHead(self, pump) -> CompositeMeasurement:
return self._request_resource('getPumpHead', pump)
def getPumpFlow(self, pump) -> CompositeMeasurement:
return self._request_resource('getPumpFlow', pump)
def getPumpRPM(self, pump) -> CompositeMeasurement:
return self._request_resource('getPumpRPM', pump)
def getPumpRelPerformance(self, pump) -> CompositeMeasurement:
return self._request_resource('getPumpPerformance', pump)

View File

@ -1,72 +0,0 @@
from ..core.SyslabUnit import SyslabUnit
class MeteoMast(SyslabUnit):
__METMASTS = {
'metmast1': ('syslab-13.syslab.dk', '8080', 'meteo1'),
}
def __init__(self, which=None, host=None, port=None, unitname=None):
baseurl = 'http://{host}:{port}/typebased_WebService_Meteo/MeteoStationWebService/{unit_name}/'
super().__init__(
baseurl=baseurl,
which=which,
units=self.__METMASTS,
host=host,
port=port,
unit_name=unitname,
unit_type="MeteoMast")
def getAirPressure(self, instrumentIndex: int or str):
if type(instrumentIndex) is str:
return self._request_resource('getAirPressure2', instrumentIndex)
if type(instrumentIndex) is int:
return self._request_resource('getAirPressure1', instrumentIndex)
def getAirTemperature(self, instrumentIndex: int or str):
if type(instrumentIndex) is str:
return self._request_resource('getAirTemperature2', instrumentIndex)
if type(instrumentIndex) is int:
return self._request_resource('getAirTemperature1', instrumentIndex)
def getHeightAboveGround(self):
return self._request_resource('getHeightAboveGround')
def getInsolation(self, instrumentIndex: int or str):
if type(instrumentIndex) is str:
return self._request_resource('getInsolation2', instrumentIndex)
if type(instrumentIndex) is int:
return self._request_resource('getInsolation1', instrumentIndex)
def getMeteoGPSLocation(self):
return self._request_resource('getMeteoGPSLocation')
def getRelativeHumidity(self, instrumentIndex: int or str):
if type(instrumentIndex) is str:
return self._request_resource('getRelativeHumidity2', instrumentIndex)
if type(instrumentIndex) is int:
return self._request_resource('getRelativeHumidity1', instrumentIndex)
def getWindDirection(self, instrumentIndex: int or str):
if type(instrumentIndex) is str:
return self._request_resource('getWindDirection2', instrumentIndex)
if type(instrumentIndex) is int:
return self._request_resource('getWindDirection1', instrumentIndex)
def getWindSpeed(self, instrumentIndex: int or str):
if type(instrumentIndex) is str:
return self._request_resource('getWindSpeed2', instrumentIndex)
if type(instrumentIndex) is int:
return self._request_resource('getWindSpeed1', instrumentIndex)
def getInstrumentNames(self):
return self._request_resource('getInstrumentNames')
def getSupportedInstrumentTypes(self):
return self._request_resource('getSupportedInstrumentTypes')
def getInstrumentNamesForType(self, instrumentType: str):
return self._request_resource('getInstrumentNamesForType', instrumentType)

View File

@ -1,61 +0,0 @@
from ..core.SyslabUnit import SyslabUnit
class Photovoltaics(SyslabUnit):
"""The Photovoltaics class represents a photovoltaic panel array in SYSLAB.
The Photovoltaics class is instantiated using a string with the unique name of the dumpload, ie. 'which'
A full list of available panel arrays can be found by calling 'Photovoltaics.getAvailablePhotovoltaics()'
Alternatively, the user may specify a host and port to connect to via the *host* and *port* arguments.
"""
__PHOTOVOLTAICS = {
'pv319': ('syslab-24.syslab.dk', '8080', 'pv319'),
'pv715': ('syslab-10.syslab.dk', '8080', 'pv715'),
'pv117': ('syslab-07.syslab.dk', '8080', 'pv117'),
'simlab-03': ('192.168.0.103', '8080', 'pv715'),
'simlab-13': ('192.168.0.113', '8080', 'pv319'),
'vpv319': ('simlab-24', '8080', 'pv319'),
'vpv715': ('simlab-10', '8080', 'pv715'),
}
def __init__(self, which=None, host=None, port=None, unitname=None):
baseurl = 'http://{host}:{port}/typebased_WebService_PV/PVSystemWebService/{unit_name}/'
super().__init__(
baseurl=baseurl,
which=which,
units=self.__PHOTOVOLTAICS,
host=host,
port=port,
unit_name=unitname,
unit_type="Photovoltaics")
def getACActivePower(self):
return self._request_resource('getACActivePower')
def getACReactivePower(self):
return self._request_resource('getACReactivePower')
def getPacLimit(self):
return self._request_resource('getPacLimit')
def getQSetpoint(self):
return self._request_resource('getQSetpoint')
def getRatedPower(self):
return self._request_resource('getRatedP')
def getName(self):
return self._request_resource('getSystemName')
def setPacLimit(self, setPoint):
return self._request_resource('setPacLimit', setPoint, 'put')
def setQSetpoint(self, Q):
return self._request_resource('setQ', Q, 'put')
@staticmethod
def getAvailablePhotovoltaics():
return list(Photovoltaics.__PHOTOVOLTAICS.keys())

View File

@ -1,144 +0,0 @@
from ..core.SyslabUnit import SyslabUnit
class SwitchBoard(SyslabUnit):
"""
The SwitchBoard class represents a SwitchBoard in SYSLAB.
The SwitchBoard class is instantiated using a string with the unique name of the switchboard, ie. 'which'
A full list of available switchboards can be found by calling 'SwitchBoard.getAvailableSwitchBoards()'
Alternatively, the user may specify a host and port to connect to via the *host* and *port* arguments.
"""
__SWITCH_BOARDS = {
'319-2':('syslab-01.syslab.dk', '8080', '319-2'),
'319-3':('syslab-52.syslab.dk', '8080', '319-3'),
'117-2':('syslab-11.syslab.dk', '8080', '117-2'),
'117-4':('syslab-11.syslab.dk', '8080', '117-4'),
'117-5':('syslab-11.syslab.dk', '8080', '117-5'),
'117-6':('syslab-26.syslab.dk', '8080', '117-6'),
'715-2':('syslab-09.syslab.dk', '8080', '715-2'),
'716-2':('syslab-29.syslab.dk', '8080', '716-2'),
'simlab-00': ('192.168.0.1', '8080', '319-2'),
'simlab-10': ('192.168.0.2', '8080', '319-2'),
'simlab-20': ('192.168.0.3', '8080', '319-2'),
'vswitchboard': ('simlab-01', '8080', '319-2')
}
def __init__(self, which=None, host=None, port=None, unitname=None):
baseurl = 'http://{host}:{port}/typebased_WebService_Substation/StandardSubstationWebService/{unit_name}/'
super().__init__(
baseurl=baseurl,
which=which,
units=self.__SWITCH_BOARDS,
host=host,
port=port,
unit_name=unitname,
unit_type="SwitchBoard")
def getName(self):
return self._request_resource('getSwitchboardName')
def getActivePower(self, instrName):
return self._request_resource('getActivePower', instrName)
def getPhaseActivePower(self, instrName):
return self._request_resource('getPhaseActivePower', instrName)
def getReactivePower(self, instrName):
return self._request_resource('getReactivePower', instrName)
def getPhaseReactivePower(self, instrName):
return self._request_resource('getPhaseReactivePower', instrName)
def getApparentPower(self, instrName):
return self._request_resource('getApparentPower', instrName)
def getPhaseApparentPower(self, instrName):
return self._request_resource('getPhaseApparentPower', instrName)
def getPowerFactor(self, instrName):
return self._request_resource('getPowerFactor', instrName)
def getPhasePowerFactor(self, instrName):
return self._request_resource('getPhasePowerFactor', instrName)
def getActiveEnergyExport(self, instrName):
return self._request_resource('getActiveEnergyExport', instrName)
def getActiveEnergyImport(self, instrName):
return self._request_resource('getActiveEnergyImport', instrName)
def getReactiveEnergyExport(self, instrName):
return self._request_resource('getReactiveEnergyExport', instrName)
def getReactiveEnergyImport(self, instrName):
return self._request_resource('getReactiveEnergyImport', instrName)
def isAuthenticated(self):
return self._request_resource('isAuthenticated')
def getBreakerState(self, breakerName):
return self._request_resource('getBreakerState', breakerName)
def getFrequency(self, instrName):
return self._request_resource('getFrequency', instrName)
def getBayNames(self):
return self._request_resource('getBayNames')
def getBreakerName(self, busbarName, bayName):
return self._request_resource('getBreakerName', (busbarName, bayName))
def getInstrumentNames(self, bayName):
return self._request_resource('getInstrumentNamesPerBay', bayName)
def getBusbarNames(self):
return self._request_resource('getBusbarNames')
def getInterphaseVoltage(self, instrName):
return self._request_resource('getInterphaseVoltage', instrName)
def getInterphaseVoltages(self, instrName):
return self._request_resource('getInterphaseVoltages', instrName)
def getPhaseVoltage(self, instrName):
return self._request_resource('getPhaseVoltage', instrName)
def getPhaseVoltages(self, instrName):
return self._request_resource('getPhaseVoltages', instrName)
def getVoltageImbalance(self, instrName):
return self._request_resource('getVoltageImbalance', instrName)
def getPhaseCurrent(self, instrName):
return self._request_resource('getPhaseCurrent', instrName)
def getNeutralCurrent(self, instrName):
return self._request_resource('getNeutralCurrent', instrName)
def getPhaseCurrents(self, instrName):
return self._request_resource('getPhaseCurrents', instrName)
def getCurrentImbalance(self, instrName):
return self._request_resource('getCurrentImbalance', instrName)
def authenticate(self, user, password):
return self._request_resource('authenticate', (user, password), 'put')
def logout(self):
return self._request_resource('logout', (), 'put')
def closeBreaker(self, breakerName):
return self._request_resource('closeBreaker', breakerName, 'put')
def openBreaker(self, breakerName):
return self._request_resource('openBreaker', (breakerName), 'put')
@staticmethod
def getAvailableSwitchBoards():
return list(SwitchBoard.__SWITCH_BOARDS.keys())

View File

@ -1,36 +0,0 @@
from ..core.SyslabUnit import SyslabUnit
class WindTurbine(SyslabUnit):
__TURBINES = {
'gaia1': ('syslab-03.syslab.dk', '8080', 'gaia1'),
'simlab-01': ('192.168.0.101', '8080', 'gaia1'),
'vgaia1': ('simlab-03', '8080', 'gaia1'),
}
def __init__(self, which=None, host=None, port=None, unitname=None):
baseurl = 'http://{host}:{port}/typebased_WebService_WTGS/GaiaWindTurbineWebService/{unit_name}/'
super().__init__(
baseurl=baseurl,
which=which,
units=self.__TURBINES,
host=host,
port=port,
unit_name=unitname,
unit_type="WindTurbine")
def getGeneratorName(self):
return self._request_resource('getGeneratorName')
def getName(self):
return self.getGeneratorName()
def getActivePower(self):
return self._request_resource('getActivePower')
def getReactivePower(self):
return self._request_resource('getReactivePower')
def getWindspeed(self):
return self._request_resource('getWindspeedOutsideNacelle')

View File

@ -1,10 +0,0 @@
from .Battery import Battery
from .DieselGenerator import DieselGenerator
from .Dumpload import Dumpload
from .HeatSwitchBoard import HeatSwitchBoard
from .MeteoMast import MeteoMast
from .Photovoltaics import Photovoltaics
from .SwitchBoard import SwitchBoard
from .WindTurbine import WindTurbine
from .B2BConverter import B2BConverter
from .EVSE import EVSE

View File

@ -1,47 +0,0 @@
class FlexHouse_real():
def __init__(self):
pass
def setActivePower(self, power_reference):
if power_reference < -10:
power_reference = -10
if power_reference > 0:
raise ValueError("Positive power means production")
self.__setValue('flexHousePowerRef_kW',-power_reference)
def getTemperature(self):
return self.__getValue('flexHouseTemperature_C')
def getActivePower(self):
return self.__getValue('flexHousePower_kW')
def __getValue(self, key):
from requests import get, auth
import re
if not type(key) is str: raise TypeError('Key should be a string, found {0}'.format(type(key)))
if key is '': raise TypeError('Key should be an empty string')
url = 'http://whiteboard.syslab.dk/wbget.php'
r = get(url, auth=auth.HTTPBasicAuth('twinPV99', 'twinPV99'))
entries = r.text.split('\n')
result = None
for entry in entries:
entry = entry.rstrip().lstrip()
g = re.match('SYSLAB@(\d+):{0}=(.+);'.format(key), entry)
if g is None:
continue
else:
g = g.groups()
result = g[1]
break
return result
def __setValue(selv, key, value):
from requests import post, auth
url = '{0}{1}?source=SYSLAB&{2}={3}'.format('http://whiteboard.syslab.dk/', 'wbset.php', key, str(value))
post(url, auth=auth.HTTPBasicAuth('twinPV99', 'twinPV99'))

View File

@ -1,51 +0,0 @@
from ..physical.Dumpload import Dumpload
from ..virtual.MetMast import MetMast
from datetime import datetime
class FlexHouse_sim(Dumpload):
def __init__(self, which='localhost'):
super().__init__(which)
self.internal_temperature = 20 # Celsius
self.time = datetime.now()
self.mt = MetMast()
# The variables below are only relevant for implementing an advanced house model.
#self.__Th = 20 # internal heater state
#self.__Te = 19 # envelope state
#self.__Aw = 14.351 # m^2
#self.__Ce = 4.741 # kWh/C
#self.__Ch = 0.00225 # kWh/Cst
#self.__Ci = 2.555 # kWh/C
#self.__Rea = 3.265 # C/kW
#self.__Rie = 0.817 # C/kW
#self.__Ria = 37.005 # C/kW
#self.__Rih = 140.44 # C/kW
def setPowerConsumption(self, power_reference):
if power_reference > 10:
self.startLoad()
power_reference = 10.0
if power_reference < 0:
raise ValueError("Negative power means production")
if power_reference == 0:
self.stopLoad()
self.setPowerSetPoint(power_reference)
def step_sim(self):
time_now = datetime.now()
time_delta = (time_now - self.time).total_seconds()
T_ambient = self.mt.getTemperature()
irradiance = self.mt.getInsolation()/1000
# Below is a simple house model. A more complex one can be implemented
self.internal_temperature += 0.5*(self.getActivePower().value/3600*time_delta) - 0.00025*((self.internal_temperature-T_ambient)*time_delta) + 0.001*irradiance*time_delta
#self.__Th = 20 # internal heater state
#self.__Te = 19 # envelope state
self.time = time_now
def getTemperature(self):
self.step_sim()
return self.internal_temperature

View File

@ -1,52 +0,0 @@
from ..physical.Battery import Battery
from datetime import datetime
class FlexHouse_sim_batt(Battery):
def __init__(self, which='localhost'):
super().__init__(which)
self.internal_temperature = 20 # Celsius
self.time = datetime.now()
self.loss = 1 # kW
if self.getRemainingFloodTime() == 300:
#print('Battery is not started. I will initiate start sequence, please retry in 5 minutes')
self.startBattery()
raise SystemError('Battery is not started. I will initiate start sequence, please retry in 5 minutes')
if (self.getRemainingFloodTime() > 1) and (self.getRemainingFloodTime() < 300):
#print('Battery is not ready yet')
raise SystemError('Battery is not ready yet, please wait {} seconds'.format(self.getRemainingFloodTime()))
# The variables below are only relevant for implementing and advanced house model.
#self.__Th = 20 # internal heater state
#self.__Te = 19 # envelope state
#self.__Aw = 14.351 # m^2
#self.__Ce = 4.741 # kWh/C
#self.__Ch = 0.00225 # kWh/C
#self.__Ci = 2.555 # kWh/C
#self.__Rea = 3.265 # C/kW
#self.__Rie = 0.817 # C/kW
#self.__Ria = 37.005 # C/kW
#self.__Rih = 140.44 # C/kW
def setPowerConsumption(self, power_reference):
if power_reference > 10:
power_reference = 10
if power_reference < 0:
raise ValueError("Negative power means production")
self.setActivePower(power_reference)
def step_sim(self):
time_now = datetime.now()
time_delta = (time_now - self.time).total_seconds()
# Below is a simple house model. A more complex one can be implemented
self.internal_temperature += 0.5*(self.getActivePower().value/3600*time_delta) - 2*(self.loss/3600*time_delta)
#self.__Th = 20 # internal heater state
#self.__Te = 19 # envelope state
self.time = time_now
def getTemperature(self):
self.step_sim()
return self.internal_temperature

View File

@ -1,40 +0,0 @@
class MetMast():
def __init__(self):
pass
def getTemperature(self):
return self.__getValue('OutsideTemperature')
def getInsolation(self):
return self.__getValue('Insolation')
def getWindDirection(self):
return self.__getValue('WindDirection')
def getWindSpeed(self):
return self.__getValue('WindSpeed')
def __getValue(self, key):
from requests import get, auth
import re
if not type(key) is str: raise TypeError('Key should be a string, found {0}'.format(type(key)))
assert key is not '', 'Key should not be an empty string'
url = 'http://whiteboard.syslab.dk/wbget.php'
r = get(url, auth=auth.HTTPBasicAuth('twinPV99', 'twinPV99'))
entries = r.text.split('\n')
result = None
for entry in entries:
entry = entry.rstrip().lstrip()
g = re.match('SYSLAB@(\d+):{0}=(.+);'.format(key), entry)
if g is None:
continue
else:
g = g.groups()
result = g[1]
break
return result

View File

@ -1,28 +0,0 @@
from ..physical.Dumpload import Dumpload
from datetime import datetime
class WaterBoiler(Dumpload):
def __init__(self, which='localhost'):
super().__init__(which)
self.energy_state = 10 # kWh
self.energy_max = 15
self.time = datetime.now()
self.loss = 1 # kW
def step_sim(self):
time_now = datetime.now()
time_delta = (time_now - self.time).total_seconds()
power = self.getActivePower().value
self.energy_state += (power - self.loss)/3600*time_delta
self.time = time_now
def getSOC(self):
self.step_sim()
if self.energy_state > self.energy_max:
self.energy_state = self.energy_max
elif self.energy_state < 0:
self.energy_state = 0.0
return self.energy_state

View File

@ -1 +0,0 @@
from . import *

View File

@ -1,143 +0,0 @@
from syslab.whiteboard.WhiteBoardEntry import WhiteBoardEntry
class CommModule():
__BASE_URL = 'http://whiteboard.syslab.dk/'
def __init__(self, namespace):
if not type(namespace) is str or len(namespace) == 0: raise TypeError('Namespace should be an non-empty string, found "{0}"'.format(type(namespace)))
self.__namespace = namespace
self.__entries = {}
def appendValue(self, key, value):
values = self.getList(key)
if values == None:
self.publishToWhiteBoardServer(key, '[{0}]'.format(str(value)))
else:
values.append(value)
self.publishList(key, values)
def getList(self, key):
from ast import literal_eval
if not type(key) is str: raise TypeError('Key should be a string, found {0}'.format(type(key)))
entry = self.getFromWhiteBoardServer(key)
values = None
if entry is not None:
values_str = str(entry.value)
values_str = values_str.replace('[','["').replace(']','"]'.replace(',','","'))
values = literal_eval(values_str)
if type(values) is not list:
values = [values, ]
return values
def getAllEntries(self):
return self.__entries
def update(self):
from requests import get, auth
import re
url = CommModule.__BASE_URL + 'wbget.php'
r = get(url, auth=auth.HTTPBasicAuth('twinPV99', 'twinPV99'))
entries = r.text.split('\n')
result = {}
for entry in entries:
entry = entry.rstrip().lstrip()
g = re.match('SYSLAB@(\d+):{0}::(\w+)=(.+);'.format(self.__namespace), entry)
if g is None:
continue
else:
g = g.groups()
self.__entries[g[1]] = WhiteBoardEntry(g[1], g[2], g[0])
def printEntries(self):
if len(self.__entries)==0:
print('No WhiteBoard entries found for namespace: {0}'.format(self.__namespace))
return
for e in self.__entries:
print(e)
def publishList(self, key, values):
from requests import post, auth
if not type(key) is str: raise TypeError('Key should be a string, found {0}'.format(type(key)))
if not type(values) is list: raise TypeError('Values should be represented by a list'.format(type(values)))
values_str = "[{0}]".format(",".join(map(str,values)))
url = '{0}{1}?source=SYSLAB&{2}::{3}={4}'.format(CommModule.__BASE_URL, 'wbset.php', self.__namespace, str(key), values_str)
post(url, auth=auth.HTTPBasicAuth('twinPV99', 'twinPV99'))
def getFromWhiteBoardServer(self, key):
"""
Go in and read/write the value from a key from the whiteboard. The value
should use decimal dot, not comma!
"""
from requests import get, auth
import re
if not type(key) is str: raise TypeError('Key should be a string, found {0}'.format(type(key)))
if key is '': raise TypeError('Key should be an empty string')
url = CommModule.__BASE_URL+'wbget.php'
r = get(url, auth=auth.HTTPBasicAuth('twinPV99', 'twinPV99'))
entries = r.text.split('\n')
result = None
for entry in entries:
entry = entry.rstrip().lstrip()
g = re.match('SYSLAB@(\d+):{0}::{1}=(.+);'.format(self.__namespace, key), entry)
if g is None:
continue
else:
g = g.groups()
result = WhiteBoardEntry(key, g[1], g[0])
break
return result
def publishToWhiteBoardServer(self, key, value):
from requests import post, auth
if not type(key) is str: raise TypeError('Key should be a string, found {0}'.format(type(key)))
if type(value) is list or type(value) is dict or type(value) is tuple: raise TypeError('This function only supports single values, found {0}'.format(type(value)))
url = '{0}{1}?source=SYSLAB&{2}::{3}={4}'.format(CommModule.__BASE_URL, 'wbset.php', self.__namespace, str(key), str(value))
post(url, auth=auth.HTTPBasicAuth('twinPV99', 'twinPV99'))
def getPoolKeys(self):
from requests import get, auth
import re
url = CommModule.__BASE_URL+'wbget.php'
r = get(url, auth=auth.HTTPBasicAuth('twinPV99', 'twinPV99'))
entries = r.text.split('\n')
result = {}
for entry in entries:
entry = entry.rstrip().lstrip()
g = re.match('SYSLAB@(\d+):{0}::(\w+)=(.+)'.format(self.__namespace), entry)
if g is None:
continue
else:
g = g.groups()
result[g[1]] = g[2]
return result
def clearKey(self, key):
from requests import post, auth
if not type(key) is str: raise TypeError('Key should be a string, found {0}'.format(type(key)))
url = '{0}{1}?source=SYSLAB&key={2}::{3}'.format(CommModule.__BASE_URL, 'wbclean.php', self.__namespace, str(key))
post(url, auth=auth.HTTPBasicAuth('twinPV99', 'twinPV99'))
def clearAllKeys(self):
entries = self.getPoolKeys()
for key in entries.keys():
self.clearKey(key)

View File

@ -1,21 +0,0 @@
class WhiteBoardEntry:
def __init__(self, key, value, time):
self.__key = key
self.__value = value
self.__time = int(time)
@property
def key(self):
return self.__key
@property
def value(self):
return self.__value
@property
def time(self):
return self.__time
def __str__(self):
from datetime import datetime
return 'WhiteBoardEntry: {0} -> {1} (@time: {2})'.format(self.key, self.value, datetime.fromtimestamp(self.time))

View File

@ -1,23 +0,0 @@
from bs4 import BeautifulSoup
import requests
def getFromWhiteBoardServer(key):
"""
Go in and read/write the value from a key from the whiteboard. The value
should use decimal dot, not comma!
"""
url = 'http://whiteboard.syslab.dk/wbget.php?mode=html'
r = requests.get(url, auth=requests.auth.HTTPBasicAuth('twinPV99', 'twinPV99'))
data = r.text
print(data)
soup = BeautifulSoup(data)
table = soup.find('table')
key = table.find('td', text=key)
value = key.findNext('td')
return value.text
def publishToWhiteBoardServer(key, value):
url = 'http://whiteboard.syslab.dk/wbset.php?source=SYSLAB&' + key + '=' + str(value)
r = requests.post(url, auth=requests.auth.HTTPBasicAuth('twinPV99', 'twinPV99'))

View File

@ -1,3 +0,0 @@
from .CommModule import CommModule
from .WhiteBoardEntry import WhiteBoardEntry
from .Whiteboard import publishToWhiteBoardServer, getFromWhiteBoardServer

View File

@ -1,5 +0,0 @@
def clamp(a, x, b):
"""
Restrict x to lie in the range [a, b]
"""
return max(a, min(x, b))