Commit cf473562 authored by Valentin Reis's avatar Valentin Reis
Browse files

Merge branch 'doc-formats' into 'master'

[fix] fixes mypy annotations and removes dead code from pynrm

See merge request !68
parents ec10788d c7efb222
Pipeline #11787 passed with stages
in 3 minutes and 48 seconds
......@@ -35,19 +35,6 @@ class Action(NamedTuple):
lib = nrm.sharedlib.UnsafeLib(os.environ["PYNRMSO"])
def _doesntThrow(f, *args, **kwargs):
f(*args, **kwargs)
return True
return False
def _exitCodeBool(*args, **kwargs):
return _doesntThrow(subprocess.check_call, *args, **kwargs)
class CPD:
def __init__(self, cpd: str):
self.cpd = cpd
......@@ -149,24 +136,21 @@ class NRMD:
def actuate(self, actionList: List[Action]) -> None:
""" Upstream request: Run an available action """
actionList = [(str(a.actuatorID), float(a.actuatorValue)) for a in actionList]
if lib.action(self.commonOpts, actionList):
if not lib.action(self.commonOpts, actionList):
raise (Exception("couldn't actuate"))
def get_cpd(self) -> str:
def get_cpd(self) -> CPD:
""" Upstream request: Obtain the current Control Problem Description """
return CPD(lib.cpd(self.commonOpts))
def get_state(self) -> str:
def get_state(self) -> NRMState:
""" Upstream request: Obtain the current daemon state """
return NRMState(lib.state(self.commonOpts))
def all_finished(self) -> None:
def all_finished(self) -> bool:
""" Upstream request: Checks NRM to see whether all tasks are finished. """
return lib.finished(self.commonOpts)
def upstream_recv(self) -> {}:
def upstream_recv(self) -> dict:
""" Upstream listen: Receive a message from NRM's upstream API. """
return json.loads(self.upstreampub.recv())
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment