Module liquer.commands
This module is responsible for registering commands.
Commands are composed of a command executable and command metadata, which are collected in a command registry. CommandRegistry is a singleton that can be obtained by get_command_registry().
Command metadata (CommandMetadata tuple) contain informations about the command and its arguments (type, parsing and editing of each argument). These metadata are a basis for parsing of arguments as well as command editor creation.
Argument parsers are responsible for parsing command arguments into desired types. ArgumentParser has a parse method, which takes argument metadata and list of arguments (typically supplied as list of strings resulting from liquer.parser.decode). Parsing may extract arbitrary amount of arguments and thus support more complex data structures. Parse method returns a tuple with the parsed argument and remaining unparsed arguments. Argument parsers which do not need multiple instances (typical case) have predefined constants. Multiple argument parsers may be collected in a SequenceArgumentParser by use of + operator (e.g. INT_AP + FLOAT_AP).
Though commands can be registered with a low level method 'register' of CommandRegistry (which allows the greatest flexibility), the "mainstream" way of command registration is by simply decorating a function with @command or @first_command.
Expand source code
"""This module is responsible for registering commands.
Commands are composed of a command executable and command metadata, which are collected in a command registry.
CommandRegistry is a singleton that can be obtained by get_command_registry().
Command metadata (CommandMetadata tuple) contain informations about the command and its arguments
(type, parsing and editing of each argument). These metadata are a basis for parsing of arguments as well as command editor creation.
Argument parsers are responsible for parsing command arguments into desired types.
ArgumentParser has a parse method, which takes argument metadata and list of arguments (typically supplied as list of strings
resulting from liquer.parser.decode). Parsing may extract arbitrary amount of arguments and thus support more complex data structures.
Parse method returns a tuple with the parsed argument and remaining unparsed arguments.
Argument parsers which do not need multiple instances (typical case) have predefined constants.
Multiple argument parsers may be collected in a SequenceArgumentParser by use of + operator (e.g. INT_AP + FLOAT_AP).
Though commands can be registered with a low level method 'register' of CommandRegistry (which allows the greatest flexibility),
the "mainstream" way of command registration is by simply decorating a function with @command or @first_command.
"""
import traceback
from collections import namedtuple
import inspect
from liquer.state import State
from liquer.parser import encode
import marshal
import pickle
import types
import traceback
import base64
from liquer.parser import (
StringActionParameter,
ActionParameter,
QueryException,
ExpandedActionParameter,
)
CommandMetadata = namedtuple(
"CommandMetadata",
[
"name",
"label",
"module",
"doc",
"state_argument",
"arguments",
"attributes",
"version",
],
)
_remote_registration = False
def is_remote_registration_enabled():
"""Returns true if remote registration is enabled.
This is a flag that a web interface can use to enable or disable the remote command registration.
WARNING: Remote command registration allows to deploy arbitrary python code on LiQuer server,
therefore it is a HUGE SECURITY RISK and it only should be used if other security measures are taken
(e.g. on localhost or intranet where only trusted users have access).
This is on by default on Jupyter server extension.
"""
return _remote_registration
def enable_remote_registration():
"""Enable remote registration service
WARNING: Remote command registration allows to deploy arbitrary python code on LiQuer server,
therefore it is a HUGE SECURITY RISK and it only should be used if other security measures are taken
(e.g. on localhost or intranet where only trusted users have access).
This is on by default on Jupyter server extension.
"""
global _remote_registration
_remote_registration = True
def disable_remote_registration():
"Disable remote registration service"
global _remote_registration
_remote_registration = True
class RegisterRemoteMixin:
def register_remote_serialized(self, b):
"""Helper method used to register serialized command in remote registration"""
if is_remote_registration_enabled():
try:
f, metadata, modify = self.decode_registration(b)
self.register_command(f, metadata, modify=modify)
ns = metadata.attributes.get("ns", "root")
return dict(
message=f"Function {f.__name__} in namespace {ns} is registered as command",
status="OK",
)
except:
return dict(
message="Error while registering command",
traceback=traceback.format_exc(),
status="ERROR",
)
else:
return dict(
message="Remote command registration is disabled.", status="ERROR"
)
@classmethod
def encode_registration(cls, f, metadata, modify=False):
code = marshal.dumps(f.__code__)
return b"B" + pickle.dumps(
(
code,
f.__name__,
f.__defaults__,
f.__closure__,
metadata._asdict(),
modify,
)
)
@classmethod
def encode_registration_base64(cls, f, metadata, modify=False):
return b"E" + base64.urlsafe_b64encode(
cls.encode_registration(f, metadata, modify)
)
@classmethod
def decode_registration(cls, b):
assert type(b) == bytes
if b[0] == b"E"[0]:
print("DECODE E:", b[:20])
print()
b = base64.urlsafe_b64decode(b[1:])
print("DECODE B:", b[:20])
print()
print()
assert b[0] == b"B"[0]
b = b[1:]
code, name, defaults, closure, metadata, modify = pickle.loads(b)
bc = marshal.loads(code)
f = types.FunctionType(bc, globals(), name, defaults, closure)
return (f, CommandMetadata(**metadata), modify)
class RemoteCommandRegistry(RegisterRemoteMixin, object):
"""Remote command registry allows to register commands into a remote LiQuer server, e.g. Jupyter server extension"""
def __init__(self, url, use_get_method=False):
self.url = url
self.use_get_method = use_get_method
def register_command(self, f, metadata, modify=False):
import requests
url = self.url
url += "" if url[-1] == "/" else "/"
if self.use_get_method:
b = self.encode_registration_base64(f, metadata, modify)
encoded = b.decode("ascii")
response = requests.get(url=url + encoded)
else:
b = self.encode_registration(f, metadata, modify)
response = requests.post(url=url, data=b)
if response.ok:
response = response.json()
if response["status"] != "OK":
print(response.get("traceback", ""))
raise Exception(
"Remote registration failed: "
+ response.get("message", f"Error registering {f.__name__}")
)
else:
response.raise_for_status()
class CommandRegistry(RegisterRemoteMixin, object):
"""Class responsible for registering all commands and their metadata"""
def __init__(self):
"""Create empty command registry"""
self.executables = {}
self.metadata = {}
self.namespaces = {}
def is_doubleregistered(self, executable, metadata):
"""Returns True if the same function is already registered as a command.
Another function registered under the same name would return False.
"""
name = metadata.name
ns = metadata.attributes.get("ns", "root")
self.executables[ns] = self.executables.get(ns, {})
self.metadata[ns] = self.metadata.get(ns, {})
if name in self.executables[ns]:
registered = self.metadata[ns][name]
return (
name == registered.name
and executable.inner_id() == self.executables[ns][name].inner_id()
)
return False
def register_command(self, f, metadata, modify=False):
parser = argument_parser_from_command_metadata(metadata)
if metadata.state_argument is None:
executable = FirstCommandExecutable(f, metadata, parser)
else:
executable = CommandExecutable(f, metadata, parser)
return self.register(executable, metadata)
def register(self, executable, metadata, modify=False):
"""Create command
executable is an CommandExecutable of the command,
metadata is CommandMetadata
"""
name = metadata.name
ns = metadata.attributes.get("ns", "root")
modify = modify or metadata.attributes.get("modify_command", False)
can_register = modify
self.executables[ns] = self.executables.get(ns, {})
self.metadata[ns] = self.metadata.get(ns, {})
if name in self.executables[ns]:
if self.is_doubleregistered(executable, metadata):
can_register = True
else:
can_register = True
if can_register:
self.executables[ns][name] = executable
self.metadata[ns][name] = metadata
else:
raise Exception(f"Command {name} is already registered")
def as_dict(self):
"""Returns dictionary representation of the registry, safe to serialize as json"""
return {
ns: {name: cmd._asdict() for name, cmd in metadata.items()}
for ns, metadata in self.metadata.items()
}
def evaluate_command_old(self, state, qcommand: list):
if not state.is_volatile():
state = state.clone()
command_name = qcommand[0]
ns, command, metadata = self.resolve_command(state, command_name)
if command is None:
print(f"Unknown command: {command_name}")
return state.with_data(None).log_error(
message=f"Unknown command: {command_name}"
)
else:
try:
state = command(state, *qcommand[1:])
except Exception as e:
traceback.print_exc()
state.log_exception(message=str(e), traceback=traceback.format_exc())
state.exception = e
arguments = getattr(state, "arguments", None)
state.metadata["commands"].append(qcommand)
state.metadata["extended_commands"].append(
dict(
command_name=command_name,
ns=ns,
qcommand=qcommand,
command_metadata=metadata._asdict(),
arguments=arguments,
)
)
state.query = encode(state.metadata["commands"])
state.metadata["attributes"] = {
key: value
for key, value in state.metadata["attributes"].items()
if key[0].isupper()
}
if metadata is not None:
state.metadata["attributes"].update(metadata.attributes)
return state
def resolve_command(self, state, command_name):
for ns in state.vars.get("active_namespaces", ["root"]):
if ns not in self.executables:
print(f"Unknown namespace: {ns}")
continue
if command_name in self.executables[ns]:
break
if command_name in self.executables[ns]:
command = self.executables[ns][command_name]
if command_name in self.metadata[ns]:
return ns, command, self.metadata[ns][command_name]
else:
print(f"Unknown command (metadata): {command_name}")
return ns, command, None
return None, None, None
_command_registry = None
def command_registry():
"""Return global the command registry object"""
global _command_registry
if _command_registry is None:
_command_registry = CommandRegistry()
return _command_registry
def remote_command_registry(url, use_get_method=False):
"""Configure the command registry to use remote LiQuer server
Provide url to the registration service
"""
global _command_registry
_command_registry = RemoteCommandRegistry(url, use_get_method=use_get_method)
return _command_registry
def reset_command_registry():
"""Create empty global command registry"""
global _command_registry
_command_registry = CommandRegistry()
return _command_registry
class ArgumentParserException(QueryException):
def __init__(self, message, position=None, query=None):
super().__init__(message, position, query)
class ArgumentParser(object):
is_argv = False
def __add__(self, ap):
return SequenceArgumentParser(self, ap)
def parse(self, metadata, args):
return args[0], args[1:]
def parse_meta(self, metadata, args, context=None):
query = None if context is None else context.raw_query
if isinstance(args[0], str):
return args[0], (args[0], metadata), args[1:]
elif isinstance(args[0], StringActionParameter):
return args[0].string, (args[0].string, metadata), args[1:]
elif isinstance(args[0], ExpandedActionParameter):
return args[0].value, (args[0].value, metadata), args[1:]
elif isinstance(args[0], ActionParameter):
raise ArgumentParserException(
f"Unsupported action parameter {repr(args[0])} in ArgumentParser",
position=args[0].position,
query=query,
)
else:
return args[0], (args[0], metadata), args[1:]
class SequenceArgumentParser(ArgumentParser):
def __init__(self, *sequence):
self.sequence = list(sequence)
def __add__(self, ap):
return SequenceArgumentParser(*(self.sequence + [ap]))
def __iadd__(self, ap):
self.sequence.append(ap)
return self
def parse(self, metadata, args):
parsed_arguments = []
for ap, meta in zip(self.sequence, metadata):
parsed, args = ap.parse(meta, args)
if ap.is_argv:
parsed_arguments.extend(parsed)
else:
parsed_arguments.append(parsed)
return parsed_arguments, args
def parse_meta(self, metadata, args, context=None):
parsed_arguments = []
parsed_meta = []
for ap, meta in zip(self.sequence, metadata):
parsed, argmeta, args = ap.parse_meta(meta, args, context=context)
if ap.is_argv:
parsed_arguments.extend(parsed)
else:
parsed_arguments.append(parsed)
parsed_meta.append(argmeta)
return parsed_arguments, parsed_meta, args
class ContextArgumentParser(ArgumentParser):
def parse(self, metadata, args):
return None, args
def parse_meta(self, metadata, args, context=None):
return context, (context, metadata), args
class IntArgumentParser(ArgumentParser):
def parse(self, metadata, args):
return int(args[0]), args[1:]
def parse_meta(self, metadata, args, context=None):
query = None if context is None else context.raw_query
if isinstance(args[0], str):
try:
value = int(args[0])
except:
raise ArgumentParserException(
f"""Error parsing integer argument '{metadata["name"]}' from string {repr(args[0])}""",
query=query,
)
elif isinstance(args[0], StringActionParameter):
try:
value = int(args[0].string)
except:
raise ArgumentParserException(
f"""Error parsing integer argument '{metadata["name"]}' from {repr(args[0].string)}""",
position=args[0].position,
query=query,
)
elif isinstance(args[0], ExpandedActionParameter):
try:
value = int(args[0].value)
except:
raise ArgumentParserException(
f"""Error parsing integer argument '{metadata["name"]}' from link {repr(args[0].link.encode())}""",
position=args[0].position,
query=query,
)
elif isinstance(args[0], ActionParameter):
raise ArgumentParserException(
f"Unsupported action parameter type {repr(args[0])} in IntArgumentParser",
position=args[0].position,
query=query,
)
else:
try:
value = int(args[0])
except:
raise ArgumentParserException(
f"""Error parsing integer argument '{metadata["name"]}' from object {repr(args[0])}""",
query=query,
)
return value, (value, metadata), args[1:]
class FloatArgumentParser(ArgumentParser):
def parse(self, metadata, args):
return float(args[0]), args[1:]
def parse_meta(self, metadata, args, context=None):
query = None if context is None else context.raw_query
if isinstance(args[0], str):
try:
value = float(args[0])
except:
raise ArgumentParserException(
f"""Error parsing float argument '{metadata["name"]}' from string {repr(args[0])}""",
query=query,
)
elif isinstance(args[0], StringActionParameter):
try:
value = float(args[0].string)
except:
raise ArgumentParserException(
f"""Error parsing float argument '{metadata["name"]}' from {repr(args[0].string)}""",
position=args[0].position,
query=query,
)
elif isinstance(args[0], ExpandedActionParameter):
try:
value = float(args[0].value)
except:
raise ArgumentParserException(
f"""Error parsing float argument '{metadata["name"]}' from link {repr(args[0].link.encode())}""",
position=args[0].position,
query=query,
)
elif isinstance(args[0], ActionParameter):
raise ArgumentParserException(
f"Unsupported action parameter type {repr(args[0])} in FloatArgumentParser",
position=args[0].position,
query=query,
)
else:
try:
value = float(args[0])
except:
raise ArgumentParserException(
f"""Error parsing float argument '{metadata["name"]}' from object {repr(args[0])}""",
query=query,
)
return value, (value, metadata), args[1:]
class BooleanArgumentParser(ArgumentParser):
def parse(self, metadata, args):
return (
dict(
y=True,
yes=True,
n=False,
no=False,
t=True,
true=True,
f=False,
false=False,
).get(str(args[0]).lower(), False),
args[1:],
)
def parse_meta(self, metadata, args, context=None):
query = None if context is None else context.raw_query
def to_bool(x):
return dict(
y=True,
yes=True,
n=False,
no=False,
t=True,
true=True,
f=False,
false=False,
).get(str(x).lower(), False)
if isinstance(args[0], str):
try:
value = to_bool(args[0])
except:
raise ArgumentParserException(
f"""Error parsing boolean argument '{metadata["name"]}' from string {repr(args[0])}""",
query=query,
)
elif isinstance(args[0], StringActionParameter):
try:
value = to_bool(args[0].string)
except:
raise ArgumentParserException(
f"""Error parsing boolean argument '{metadata["name"]}' from value {repr(args[0].string)}""",
position=args[0].position,
query=query,
)
elif isinstance(args[0], ExpandedActionParameter):
try:
value = to_bool(args[0].value)
except:
raise ArgumentParserException(
f"""Error parsing boolean argument '{metadata["name"]}' from link {repr(args[0].link.encode())}""",
position=args[0].position,
query=query,
)
elif isinstance(args[0], ActionParameter):
raise ArgumentParserException(
f"Unsupported action parameter type {repr(args[0])} in BooleanArgumentParser",
position=args[0].position,
query=query,
)
else:
try:
value = dict(
y=True,
yes=True,
n=False,
no=False,
t=True,
true=True,
f=False,
false=False,
).get(str(args[0]).lower(), False)
except:
raise ArgumentParserException(
f"""Error parsing boolean argument '{metadata["name"]}' from object {repr(args[0])}""",
query=query,
)
return value, (value, metadata), args[1:]
class ListArgumentParser(ArgumentParser):
def parse(self, metadata, args):
return args, []
def parse_meta(self, metadata, args, context=None):
query = None if context is None else context.raw_query
value = []
for x in args:
if isinstance(x, str):
value.append(x)
elif isinstance(x, StringActionParameter):
value.append(x.string)
elif isinstance(x, ExpandedActionParameter):
value.append(x.value)
elif isinstance(x, ActionParameter):
raise ArgumentParserException(
f"Unsupported action parameter type {repr(x)} in ListArgumentParser",
position=x.position,
query=query,
)
else:
raise ArgumentParserException(
f"Unsupported action parameter object {repr(x)} in ListArgumentParser",
query=query,
)
return value, (value, metadata), []
class ArgvArgumentParser(ListArgumentParser):
is_argv = True
GENERIC_AP = ArgumentParser()
CONTEXT_AP = ContextArgumentParser()
INT_AP = IntArgumentParser()
FLOAT_AP = FloatArgumentParser()
BOOLEAN_AP = BooleanArgumentParser()
LIST_AP = ListArgumentParser()
ARGV_AP = ArgvArgumentParser()
def identifier_to_label(identifier):
"""Tries to convert an identifier to a more human readable text label.
Replaces underscores by spaces and may do other tweaks.
"""
txt = identifier.replace("_", " ")
txt = txt.replace(" id", "ID")
txt = dict(url="URL").get(txt, txt)
txt = txt[0].upper() + txt[1:]
return txt
def callable_hash(f):
import hashlib
h = hashlib.md5()
h.update(f.__code__.co_code)
separator = b"__sep3024325ab2a7__" # random string of bytes
for x in f.__code__.co_consts:
try:
h.update(pickle.dumps(x))
except:
continue
h.update(separator)
sig = inspect.signature(f)
for argname in list(sig.parameters):
h.update(argname.encode("utf-8"))
h.update(separator)
p = sig.parameters[argname]
if p.default != inspect.Parameter.empty:
try:
h.update(pickle.dumps(x))
except:
continue
h.update(separator)
return h.hexdigest()
def command_metadata_from_callable(f, has_state_argument=True, attributes=None):
"""Extract command metadata structure from a callable.
Function interprets function name, document string, argument names and annotations into command metadata.
"""
name = f.__name__
doc = f.__doc__
module = f.__module__
annotations = f.__annotations__
if attributes is None:
attributes = dict(ns=None)
if doc is None:
doc = ""
arguments = []
sig = inspect.signature(f)
for argname in list(sig.parameters):
arg = dict(name=argname, label=identifier_to_label(argname))
if argname == "context":
p = sig.parameters[argname]
if p.default != inspect.Parameter.empty:
if p.default is not None:
raise Exception(
f"Default value for context in command {name} should be None"
)
arg["default"] = p.default
if argname in annotations:
raise Exception(
f"Context in command {name} should not have type annotations"
)
arg["multiple"] = False
arg["optional"] = True
arg["type"] = "context"
arg["editor"] = "ignore"
arguments.append(arg)
continue
arg_type = None
if argname in annotations:
arg_annotation = annotations[argname]
if type(arg_annotation) == type:
arg_type = arg_annotation.__name__
p = sig.parameters[argname]
if p.default != inspect.Parameter.empty:
arg["default"] = p.default
arg["optional"] = True
if arg_type is None and p.default is not None:
arg_type = type(p.default).__name__
else:
arg["optional"] = False
arg["multiple"] = p.kind is inspect.Parameter.VAR_POSITIONAL
arg_editor = None
if arg_type is not None:
arg_editor = arg_type
arg["type"] = arg_type
arg["editor"] = arg_editor
arguments.append(arg)
if has_state_argument and len(arguments):
state_argument = arguments[0]
state_argument["pass_state"] = state_argument["name"] == "state"
arguments = arguments[1:]
else:
state_argument = None
return CommandMetadata(
name=name,
label=identifier_to_label(name),
module=module,
doc=doc,
state_argument=state_argument,
arguments=arguments,
attributes=attributes,
version=callable_hash(f),
)
def argument_parser_from_command_metadata(command_metadata):
"""Create argument parser from command metadata"""
ap = SequenceArgumentParser()
for arg in command_metadata.arguments:
if arg.get("multiple", False):
ap += ARGV_AP
break
arg_type = arg.get("type")
ap += dict(
context=CONTEXT_AP,
str=GENERIC_AP,
int=INT_AP,
float=FLOAT_AP,
bool=BOOLEAN_AP,
list=LIST_AP,
).get(arg_type, GENERIC_AP)
return ap
class CommandExecutable(object):
"""Wrapper around a registered command
Adapts arbitrary function to be used as a command.
Function needs to be described by a command metadata structure and accompanied by an argument parser.
This decodes all the arguments, executes the command and (if needed) wraps the result as a State.
"""
def __init__(self, f, metadata, argument_parser):
self.f = f
self.metadata = metadata
self.argument_parser = argument_parser
def inner_id(self):
return id(self.f)
def parse_argv(self, args, kwargs=None, context=None):
if kwargs is None:
kwargs = {}
query = None if context is None else context.raw_query
args = list(args)
try:
position = args[-1].position
except:
position = None
used_kwargs = []
for i, a in list(enumerate(self.metadata.arguments))[len(args) :]:
try:
position = args[i].position
except:
position = None
if a["name"] in kwargs:
used_kwargs.append(a["name"])
args.append(kwargs[a["name"]])
if context is not None:
context.debug(f"Using keyword argument {repr(a['name'])}")
else:
if not a.get("multiple", False) and a["name"] != "context":
if "default" in a:
args.append(a["default"])
else:
raise ArgumentParserException(
f"Expected '{a['name']}' argument for '{self.metadata.name}', no default",
position=position,
query=query,
)
if context is not None:
unused = [x for x in kwargs.keys() if x not in used_kwargs]
if len(unused):
context.warning(f"Unused keyword arguments: {unused}")
try:
argv, argmeta, remainder = self.argument_parser.parse_meta(
self.metadata.arguments, args, context=context
)
except ArgumentParserException as e:
raise ArgumentParserException(
f"{e.original_message} while executing '{self.metadata.name}'",
position=e.position,
query=query,
) from e
if len(remainder) != 0:
position = (
remainder[0].position
if isinstance(remainder[0], ActionParameter)
else None
)
raise ArgumentParserException(
f"Too many arguments for '{self.metadata.name}': {repr(remainder)}",
position=position,
query=query,
)
return argv, argmeta
def __call__(self, state, *args, context=None, **kwargs):
argv, argmeta = self.parse_argv(args, kwargs=kwargs, context=context)
state_arg = state if self.metadata.state_argument["pass_state"] else state.get()
result = self.f(state_arg, *argv)
if isinstance(result, State):
result.arguments = argmeta
return result
else:
state.arguments = argmeta
return state.with_data(result)
class FirstCommandExecutable(CommandExecutable):
"""Wrapper around a registered first command"""
def __call__(self, state, *args, context=None, **kwargs):
argv, argmeta = self.parse_argv(args, kwargs=kwargs, context=context)
result = self.f(*argv)
if isinstance(result, State):
result.arguments = argmeta
return result
else:
state.arguments = argmeta
return state.with_data(result)
def command(*arg, **kwarg):
"""Register a callable as a command.
Callable is expected to take a state data as a first argument.
This function typically can be used as a decorator.
As a decorator it can be used directly (@command) or it can have parameters,
e.g. @command(ns="MyNameSpace")
"""
if len(arg) == 1:
assert callable(arg[0])
f = arg[0]
if "ns" not in kwarg:
kwarg["ns"] = "root"
metadata = command_metadata_from_callable(f, attributes=kwarg)
command_registry().register_command(f, metadata)
return f
else:
assert len(arg) == 0
return lambda f, attributes=kwarg: command(f, **attributes)
def first_command(*arg, **kwarg):
"""Register a callable as a command.
Unlike in command(), callable is expected NOT to take a state data as a first argument.
Thus first_command can be a first command in the query - does not require a state to be applied on.
However, first_command is a perfectly valid command, so it can as well be used inside the query and
then the state passed to the command is ignored (and not passed to f).
This typically can be used as a decorator.
"""
if len(arg) == 1:
assert callable(arg[0])
f = arg[0]
if "ns" not in kwarg:
kwarg["ns"] = "root"
metadata = command_metadata_from_callable(
f, has_state_argument=False, attributes=kwarg
)
command_registry().register_command(f, metadata)
return f
else:
assert len(arg) == 0
return lambda f, attributes=kwarg: first_command(f, **attributes)
Functions
def argument_parser_from_command_metadata(command_metadata)
-
Create argument parser from command metadata
Expand source code
def argument_parser_from_command_metadata(command_metadata): """Create argument parser from command metadata""" ap = SequenceArgumentParser() for arg in command_metadata.arguments: if arg.get("multiple", False): ap += ARGV_AP break arg_type = arg.get("type") ap += dict( context=CONTEXT_AP, str=GENERIC_AP, int=INT_AP, float=FLOAT_AP, bool=BOOLEAN_AP, list=LIST_AP, ).get(arg_type, GENERIC_AP) return ap
def callable_hash(f)
-
Expand source code
def callable_hash(f): import hashlib h = hashlib.md5() h.update(f.__code__.co_code) separator = b"__sep3024325ab2a7__" # random string of bytes for x in f.__code__.co_consts: try: h.update(pickle.dumps(x)) except: continue h.update(separator) sig = inspect.signature(f) for argname in list(sig.parameters): h.update(argname.encode("utf-8")) h.update(separator) p = sig.parameters[argname] if p.default != inspect.Parameter.empty: try: h.update(pickle.dumps(x)) except: continue h.update(separator) return h.hexdigest()
def command(*arg, **kwarg)
-
Register a callable as a command. Callable is expected to take a state data as a first argument.
This function typically can be used as a decorator. As a decorator it can be used directly (@command) or it can have parameters, e.g. @command(ns="MyNameSpace")
Expand source code
def command(*arg, **kwarg): """Register a callable as a command. Callable is expected to take a state data as a first argument. This function typically can be used as a decorator. As a decorator it can be used directly (@command) or it can have parameters, e.g. @command(ns="MyNameSpace") """ if len(arg) == 1: assert callable(arg[0]) f = arg[0] if "ns" not in kwarg: kwarg["ns"] = "root" metadata = command_metadata_from_callable(f, attributes=kwarg) command_registry().register_command(f, metadata) return f else: assert len(arg) == 0 return lambda f, attributes=kwarg: command(f, **attributes)
def command_metadata_from_callable(f, has_state_argument=True, attributes=None)
-
Extract command metadata structure from a callable. Function interprets function name, document string, argument names and annotations into command metadata.
Expand source code
def command_metadata_from_callable(f, has_state_argument=True, attributes=None): """Extract command metadata structure from a callable. Function interprets function name, document string, argument names and annotations into command metadata. """ name = f.__name__ doc = f.__doc__ module = f.__module__ annotations = f.__annotations__ if attributes is None: attributes = dict(ns=None) if doc is None: doc = "" arguments = [] sig = inspect.signature(f) for argname in list(sig.parameters): arg = dict(name=argname, label=identifier_to_label(argname)) if argname == "context": p = sig.parameters[argname] if p.default != inspect.Parameter.empty: if p.default is not None: raise Exception( f"Default value for context in command {name} should be None" ) arg["default"] = p.default if argname in annotations: raise Exception( f"Context in command {name} should not have type annotations" ) arg["multiple"] = False arg["optional"] = True arg["type"] = "context" arg["editor"] = "ignore" arguments.append(arg) continue arg_type = None if argname in annotations: arg_annotation = annotations[argname] if type(arg_annotation) == type: arg_type = arg_annotation.__name__ p = sig.parameters[argname] if p.default != inspect.Parameter.empty: arg["default"] = p.default arg["optional"] = True if arg_type is None and p.default is not None: arg_type = type(p.default).__name__ else: arg["optional"] = False arg["multiple"] = p.kind is inspect.Parameter.VAR_POSITIONAL arg_editor = None if arg_type is not None: arg_editor = arg_type arg["type"] = arg_type arg["editor"] = arg_editor arguments.append(arg) if has_state_argument and len(arguments): state_argument = arguments[0] state_argument["pass_state"] = state_argument["name"] == "state" arguments = arguments[1:] else: state_argument = None return CommandMetadata( name=name, label=identifier_to_label(name), module=module, doc=doc, state_argument=state_argument, arguments=arguments, attributes=attributes, version=callable_hash(f), )
def command_registry()
-
Return global the command registry object
Expand source code
def command_registry(): """Return global the command registry object""" global _command_registry if _command_registry is None: _command_registry = CommandRegistry() return _command_registry
def disable_remote_registration()
-
Disable remote registration service
Expand source code
def disable_remote_registration(): "Disable remote registration service" global _remote_registration _remote_registration = True
def enable_remote_registration()
-
Enable remote registration service
WARNING: Remote command registration allows to deploy arbitrary python code on LiQuer server, therefore it is a HUGE SECURITY RISK and it only should be used if other security measures are taken (e.g. on localhost or intranet where only trusted users have access). This is on by default on Jupyter server extension.
Expand source code
def enable_remote_registration(): """Enable remote registration service WARNING: Remote command registration allows to deploy arbitrary python code on LiQuer server, therefore it is a HUGE SECURITY RISK and it only should be used if other security measures are taken (e.g. on localhost or intranet where only trusted users have access). This is on by default on Jupyter server extension. """ global _remote_registration _remote_registration = True
def first_command(*arg, **kwarg)
-
Register a callable as a command. Unlike in command(), callable is expected NOT to take a state data as a first argument. Thus first_command can be a first command in the query - does not require a state to be applied on. However, first_command is a perfectly valid command, so it can as well be used inside the query and then the state passed to the command is ignored (and not passed to f). This typically can be used as a decorator.
Expand source code
def first_command(*arg, **kwarg): """Register a callable as a command. Unlike in command(), callable is expected NOT to take a state data as a first argument. Thus first_command can be a first command in the query - does not require a state to be applied on. However, first_command is a perfectly valid command, so it can as well be used inside the query and then the state passed to the command is ignored (and not passed to f). This typically can be used as a decorator. """ if len(arg) == 1: assert callable(arg[0]) f = arg[0] if "ns" not in kwarg: kwarg["ns"] = "root" metadata = command_metadata_from_callable( f, has_state_argument=False, attributes=kwarg ) command_registry().register_command(f, metadata) return f else: assert len(arg) == 0 return lambda f, attributes=kwarg: first_command(f, **attributes)
def identifier_to_label(identifier)
-
Tries to convert an identifier to a more human readable text label. Replaces underscores by spaces and may do other tweaks.
Expand source code
def identifier_to_label(identifier): """Tries to convert an identifier to a more human readable text label. Replaces underscores by spaces and may do other tweaks. """ txt = identifier.replace("_", " ") txt = txt.replace(" id", "ID") txt = dict(url="URL").get(txt, txt) txt = txt[0].upper() + txt[1:] return txt
def is_remote_registration_enabled()
-
Returns true if remote registration is enabled. This is a flag that a web interface can use to enable or disable the remote command registration.
WARNING: Remote command registration allows to deploy arbitrary python code on LiQuer server, therefore it is a HUGE SECURITY RISK and it only should be used if other security measures are taken (e.g. on localhost or intranet where only trusted users have access). This is on by default on Jupyter server extension.
Expand source code
def is_remote_registration_enabled(): """Returns true if remote registration is enabled. This is a flag that a web interface can use to enable or disable the remote command registration. WARNING: Remote command registration allows to deploy arbitrary python code on LiQuer server, therefore it is a HUGE SECURITY RISK and it only should be used if other security measures are taken (e.g. on localhost or intranet where only trusted users have access). This is on by default on Jupyter server extension. """ return _remote_registration
def remote_command_registry(url, use_get_method=False)
-
Configure the command registry to use remote LiQuer server Provide url to the registration service
Expand source code
def remote_command_registry(url, use_get_method=False): """Configure the command registry to use remote LiQuer server Provide url to the registration service """ global _command_registry _command_registry = RemoteCommandRegistry(url, use_get_method=use_get_method) return _command_registry
def reset_command_registry()
-
Create empty global command registry
Expand source code
def reset_command_registry(): """Create empty global command registry""" global _command_registry _command_registry = CommandRegistry() return _command_registry
Classes
class ArgumentParser
-
Expand source code
class ArgumentParser(object): is_argv = False def __add__(self, ap): return SequenceArgumentParser(self, ap) def parse(self, metadata, args): return args[0], args[1:] def parse_meta(self, metadata, args, context=None): query = None if context is None else context.raw_query if isinstance(args[0], str): return args[0], (args[0], metadata), args[1:] elif isinstance(args[0], StringActionParameter): return args[0].string, (args[0].string, metadata), args[1:] elif isinstance(args[0], ExpandedActionParameter): return args[0].value, (args[0].value, metadata), args[1:] elif isinstance(args[0], ActionParameter): raise ArgumentParserException( f"Unsupported action parameter {repr(args[0])} in ArgumentParser", position=args[0].position, query=query, ) else: return args[0], (args[0], metadata), args[1:]
Subclasses
- BooleanArgumentParser
- ContextArgumentParser
- FloatArgumentParser
- IntArgumentParser
- ListArgumentParser
- SequenceArgumentParser
Class variables
var is_argv
Methods
def parse(self, metadata, args)
-
Expand source code
def parse(self, metadata, args): return args[0], args[1:]
def parse_meta(self, metadata, args, context=None)
-
Expand source code
def parse_meta(self, metadata, args, context=None): query = None if context is None else context.raw_query if isinstance(args[0], str): return args[0], (args[0], metadata), args[1:] elif isinstance(args[0], StringActionParameter): return args[0].string, (args[0].string, metadata), args[1:] elif isinstance(args[0], ExpandedActionParameter): return args[0].value, (args[0].value, metadata), args[1:] elif isinstance(args[0], ActionParameter): raise ArgumentParserException( f"Unsupported action parameter {repr(args[0])} in ArgumentParser", position=args[0].position, query=query, ) else: return args[0], (args[0], metadata), args[1:]
class ArgumentParserException (message, position=None, query=None)
-
Base class for all exceptions in liquer parser.
Expand source code
class ArgumentParserException(QueryException): def __init__(self, message, position=None, query=None): super().__init__(message, position, query)
Ancestors
- QueryException
- builtins.Exception
- builtins.BaseException
class ArgvArgumentParser
-
Expand source code
class ArgvArgumentParser(ListArgumentParser): is_argv = True
Ancestors
Class variables
var is_argv
class BooleanArgumentParser
-
Expand source code
class BooleanArgumentParser(ArgumentParser): def parse(self, metadata, args): return ( dict( y=True, yes=True, n=False, no=False, t=True, true=True, f=False, false=False, ).get(str(args[0]).lower(), False), args[1:], ) def parse_meta(self, metadata, args, context=None): query = None if context is None else context.raw_query def to_bool(x): return dict( y=True, yes=True, n=False, no=False, t=True, true=True, f=False, false=False, ).get(str(x).lower(), False) if isinstance(args[0], str): try: value = to_bool(args[0]) except: raise ArgumentParserException( f"""Error parsing boolean argument '{metadata["name"]}' from string {repr(args[0])}""", query=query, ) elif isinstance(args[0], StringActionParameter): try: value = to_bool(args[0].string) except: raise ArgumentParserException( f"""Error parsing boolean argument '{metadata["name"]}' from value {repr(args[0].string)}""", position=args[0].position, query=query, ) elif isinstance(args[0], ExpandedActionParameter): try: value = to_bool(args[0].value) except: raise ArgumentParserException( f"""Error parsing boolean argument '{metadata["name"]}' from link {repr(args[0].link.encode())}""", position=args[0].position, query=query, ) elif isinstance(args[0], ActionParameter): raise ArgumentParserException( f"Unsupported action parameter type {repr(args[0])} in BooleanArgumentParser", position=args[0].position, query=query, ) else: try: value = dict( y=True, yes=True, n=False, no=False, t=True, true=True, f=False, false=False, ).get(str(args[0]).lower(), False) except: raise ArgumentParserException( f"""Error parsing boolean argument '{metadata["name"]}' from object {repr(args[0])}""", query=query, ) return value, (value, metadata), args[1:]
Ancestors
Methods
def parse(self, metadata, args)
-
Expand source code
def parse(self, metadata, args): return ( dict( y=True, yes=True, n=False, no=False, t=True, true=True, f=False, false=False, ).get(str(args[0]).lower(), False), args[1:], )
def parse_meta(self, metadata, args, context=None)
-
Expand source code
def parse_meta(self, metadata, args, context=None): query = None if context is None else context.raw_query def to_bool(x): return dict( y=True, yes=True, n=False, no=False, t=True, true=True, f=False, false=False, ).get(str(x).lower(), False) if isinstance(args[0], str): try: value = to_bool(args[0]) except: raise ArgumentParserException( f"""Error parsing boolean argument '{metadata["name"]}' from string {repr(args[0])}""", query=query, ) elif isinstance(args[0], StringActionParameter): try: value = to_bool(args[0].string) except: raise ArgumentParserException( f"""Error parsing boolean argument '{metadata["name"]}' from value {repr(args[0].string)}""", position=args[0].position, query=query, ) elif isinstance(args[0], ExpandedActionParameter): try: value = to_bool(args[0].value) except: raise ArgumentParserException( f"""Error parsing boolean argument '{metadata["name"]}' from link {repr(args[0].link.encode())}""", position=args[0].position, query=query, ) elif isinstance(args[0], ActionParameter): raise ArgumentParserException( f"Unsupported action parameter type {repr(args[0])} in BooleanArgumentParser", position=args[0].position, query=query, ) else: try: value = dict( y=True, yes=True, n=False, no=False, t=True, true=True, f=False, false=False, ).get(str(args[0]).lower(), False) except: raise ArgumentParserException( f"""Error parsing boolean argument '{metadata["name"]}' from object {repr(args[0])}""", query=query, ) return value, (value, metadata), args[1:]
class CommandExecutable (f, metadata, argument_parser)
-
Wrapper around a registered command Adapts arbitrary function to be used as a command. Function needs to be described by a command metadata structure and accompanied by an argument parser. This decodes all the arguments, executes the command and (if needed) wraps the result as a State.
Expand source code
class CommandExecutable(object): """Wrapper around a registered command Adapts arbitrary function to be used as a command. Function needs to be described by a command metadata structure and accompanied by an argument parser. This decodes all the arguments, executes the command and (if needed) wraps the result as a State. """ def __init__(self, f, metadata, argument_parser): self.f = f self.metadata = metadata self.argument_parser = argument_parser def inner_id(self): return id(self.f) def parse_argv(self, args, kwargs=None, context=None): if kwargs is None: kwargs = {} query = None if context is None else context.raw_query args = list(args) try: position = args[-1].position except: position = None used_kwargs = [] for i, a in list(enumerate(self.metadata.arguments))[len(args) :]: try: position = args[i].position except: position = None if a["name"] in kwargs: used_kwargs.append(a["name"]) args.append(kwargs[a["name"]]) if context is not None: context.debug(f"Using keyword argument {repr(a['name'])}") else: if not a.get("multiple", False) and a["name"] != "context": if "default" in a: args.append(a["default"]) else: raise ArgumentParserException( f"Expected '{a['name']}' argument for '{self.metadata.name}', no default", position=position, query=query, ) if context is not None: unused = [x for x in kwargs.keys() if x not in used_kwargs] if len(unused): context.warning(f"Unused keyword arguments: {unused}") try: argv, argmeta, remainder = self.argument_parser.parse_meta( self.metadata.arguments, args, context=context ) except ArgumentParserException as e: raise ArgumentParserException( f"{e.original_message} while executing '{self.metadata.name}'", position=e.position, query=query, ) from e if len(remainder) != 0: position = ( remainder[0].position if isinstance(remainder[0], ActionParameter) else None ) raise ArgumentParserException( f"Too many arguments for '{self.metadata.name}': {repr(remainder)}", position=position, query=query, ) return argv, argmeta def __call__(self, state, *args, context=None, **kwargs): argv, argmeta = self.parse_argv(args, kwargs=kwargs, context=context) state_arg = state if self.metadata.state_argument["pass_state"] else state.get() result = self.f(state_arg, *argv) if isinstance(result, State): result.arguments = argmeta return result else: state.arguments = argmeta return state.with_data(result)
Subclasses
Methods
def inner_id(self)
-
Expand source code
def inner_id(self): return id(self.f)
def parse_argv(self, args, kwargs=None, context=None)
-
Expand source code
def parse_argv(self, args, kwargs=None, context=None): if kwargs is None: kwargs = {} query = None if context is None else context.raw_query args = list(args) try: position = args[-1].position except: position = None used_kwargs = [] for i, a in list(enumerate(self.metadata.arguments))[len(args) :]: try: position = args[i].position except: position = None if a["name"] in kwargs: used_kwargs.append(a["name"]) args.append(kwargs[a["name"]]) if context is not None: context.debug(f"Using keyword argument {repr(a['name'])}") else: if not a.get("multiple", False) and a["name"] != "context": if "default" in a: args.append(a["default"]) else: raise ArgumentParserException( f"Expected '{a['name']}' argument for '{self.metadata.name}', no default", position=position, query=query, ) if context is not None: unused = [x for x in kwargs.keys() if x not in used_kwargs] if len(unused): context.warning(f"Unused keyword arguments: {unused}") try: argv, argmeta, remainder = self.argument_parser.parse_meta( self.metadata.arguments, args, context=context ) except ArgumentParserException as e: raise ArgumentParserException( f"{e.original_message} while executing '{self.metadata.name}'", position=e.position, query=query, ) from e if len(remainder) != 0: position = ( remainder[0].position if isinstance(remainder[0], ActionParameter) else None ) raise ArgumentParserException( f"Too many arguments for '{self.metadata.name}': {repr(remainder)}", position=position, query=query, ) return argv, argmeta
class CommandMetadata (name, label, module, doc, state_argument, arguments, attributes, version)
-
CommandMetadata(name, label, module, doc, state_argument, arguments, attributes, version)
Ancestors
- builtins.tuple
Instance variables
var arguments
-
Alias for field number 5
var attributes
-
Alias for field number 6
var doc
-
Alias for field number 3
var label
-
Alias for field number 1
var module
-
Alias for field number 2
var name
-
Alias for field number 0
var state_argument
-
Alias for field number 4
var version
-
Alias for field number 7
class CommandRegistry
-
Class responsible for registering all commands and their metadata
Create empty command registry
Expand source code
class CommandRegistry(RegisterRemoteMixin, object): """Class responsible for registering all commands and their metadata""" def __init__(self): """Create empty command registry""" self.executables = {} self.metadata = {} self.namespaces = {} def is_doubleregistered(self, executable, metadata): """Returns True if the same function is already registered as a command. Another function registered under the same name would return False. """ name = metadata.name ns = metadata.attributes.get("ns", "root") self.executables[ns] = self.executables.get(ns, {}) self.metadata[ns] = self.metadata.get(ns, {}) if name in self.executables[ns]: registered = self.metadata[ns][name] return ( name == registered.name and executable.inner_id() == self.executables[ns][name].inner_id() ) return False def register_command(self, f, metadata, modify=False): parser = argument_parser_from_command_metadata(metadata) if metadata.state_argument is None: executable = FirstCommandExecutable(f, metadata, parser) else: executable = CommandExecutable(f, metadata, parser) return self.register(executable, metadata) def register(self, executable, metadata, modify=False): """Create command executable is an CommandExecutable of the command, metadata is CommandMetadata """ name = metadata.name ns = metadata.attributes.get("ns", "root") modify = modify or metadata.attributes.get("modify_command", False) can_register = modify self.executables[ns] = self.executables.get(ns, {}) self.metadata[ns] = self.metadata.get(ns, {}) if name in self.executables[ns]: if self.is_doubleregistered(executable, metadata): can_register = True else: can_register = True if can_register: self.executables[ns][name] = executable self.metadata[ns][name] = metadata else: raise Exception(f"Command {name} is already registered") def as_dict(self): """Returns dictionary representation of the registry, safe to serialize as json""" return { ns: {name: cmd._asdict() for name, cmd in metadata.items()} for ns, metadata in self.metadata.items() } def evaluate_command_old(self, state, qcommand: list): if not state.is_volatile(): state = state.clone() command_name = qcommand[0] ns, command, metadata = self.resolve_command(state, command_name) if command is None: print(f"Unknown command: {command_name}") return state.with_data(None).log_error( message=f"Unknown command: {command_name}" ) else: try: state = command(state, *qcommand[1:]) except Exception as e: traceback.print_exc() state.log_exception(message=str(e), traceback=traceback.format_exc()) state.exception = e arguments = getattr(state, "arguments", None) state.metadata["commands"].append(qcommand) state.metadata["extended_commands"].append( dict( command_name=command_name, ns=ns, qcommand=qcommand, command_metadata=metadata._asdict(), arguments=arguments, ) ) state.query = encode(state.metadata["commands"]) state.metadata["attributes"] = { key: value for key, value in state.metadata["attributes"].items() if key[0].isupper() } if metadata is not None: state.metadata["attributes"].update(metadata.attributes) return state def resolve_command(self, state, command_name): for ns in state.vars.get("active_namespaces", ["root"]): if ns not in self.executables: print(f"Unknown namespace: {ns}") continue if command_name in self.executables[ns]: break if command_name in self.executables[ns]: command = self.executables[ns][command_name] if command_name in self.metadata[ns]: return ns, command, self.metadata[ns][command_name] else: print(f"Unknown command (metadata): {command_name}") return ns, command, None return None, None, None
Ancestors
Methods
def as_dict(self)
-
Returns dictionary representation of the registry, safe to serialize as json
Expand source code
def as_dict(self): """Returns dictionary representation of the registry, safe to serialize as json""" return { ns: {name: cmd._asdict() for name, cmd in metadata.items()} for ns, metadata in self.metadata.items() }
def evaluate_command_old(self, state, qcommand: list)
-
Expand source code
def evaluate_command_old(self, state, qcommand: list): if not state.is_volatile(): state = state.clone() command_name = qcommand[0] ns, command, metadata = self.resolve_command(state, command_name) if command is None: print(f"Unknown command: {command_name}") return state.with_data(None).log_error( message=f"Unknown command: {command_name}" ) else: try: state = command(state, *qcommand[1:]) except Exception as e: traceback.print_exc() state.log_exception(message=str(e), traceback=traceback.format_exc()) state.exception = e arguments = getattr(state, "arguments", None) state.metadata["commands"].append(qcommand) state.metadata["extended_commands"].append( dict( command_name=command_name, ns=ns, qcommand=qcommand, command_metadata=metadata._asdict(), arguments=arguments, ) ) state.query = encode(state.metadata["commands"]) state.metadata["attributes"] = { key: value for key, value in state.metadata["attributes"].items() if key[0].isupper() } if metadata is not None: state.metadata["attributes"].update(metadata.attributes) return state
def is_doubleregistered(self, executable, metadata)
-
Returns True if the same function is already registered as a command. Another function registered under the same name would return False.
Expand source code
def is_doubleregistered(self, executable, metadata): """Returns True if the same function is already registered as a command. Another function registered under the same name would return False. """ name = metadata.name ns = metadata.attributes.get("ns", "root") self.executables[ns] = self.executables.get(ns, {}) self.metadata[ns] = self.metadata.get(ns, {}) if name in self.executables[ns]: registered = self.metadata[ns][name] return ( name == registered.name and executable.inner_id() == self.executables[ns][name].inner_id() ) return False
def register(self, executable, metadata, modify=False)
-
Create command executable is an CommandExecutable of the command, metadata is CommandMetadata
Expand source code
def register(self, executable, metadata, modify=False): """Create command executable is an CommandExecutable of the command, metadata is CommandMetadata """ name = metadata.name ns = metadata.attributes.get("ns", "root") modify = modify or metadata.attributes.get("modify_command", False) can_register = modify self.executables[ns] = self.executables.get(ns, {}) self.metadata[ns] = self.metadata.get(ns, {}) if name in self.executables[ns]: if self.is_doubleregistered(executable, metadata): can_register = True else: can_register = True if can_register: self.executables[ns][name] = executable self.metadata[ns][name] = metadata else: raise Exception(f"Command {name} is already registered")
def register_command(self, f, metadata, modify=False)
-
Expand source code
def register_command(self, f, metadata, modify=False): parser = argument_parser_from_command_metadata(metadata) if metadata.state_argument is None: executable = FirstCommandExecutable(f, metadata, parser) else: executable = CommandExecutable(f, metadata, parser) return self.register(executable, metadata)
def resolve_command(self, state, command_name)
-
Expand source code
def resolve_command(self, state, command_name): for ns in state.vars.get("active_namespaces", ["root"]): if ns not in self.executables: print(f"Unknown namespace: {ns}") continue if command_name in self.executables[ns]: break if command_name in self.executables[ns]: command = self.executables[ns][command_name] if command_name in self.metadata[ns]: return ns, command, self.metadata[ns][command_name] else: print(f"Unknown command (metadata): {command_name}") return ns, command, None return None, None, None
Inherited members
class ContextArgumentParser
-
Expand source code
class ContextArgumentParser(ArgumentParser): def parse(self, metadata, args): return None, args def parse_meta(self, metadata, args, context=None): return context, (context, metadata), args
Ancestors
Methods
def parse(self, metadata, args)
-
Expand source code
def parse(self, metadata, args): return None, args
def parse_meta(self, metadata, args, context=None)
-
Expand source code
def parse_meta(self, metadata, args, context=None): return context, (context, metadata), args
class FirstCommandExecutable (f, metadata, argument_parser)
-
Wrapper around a registered first command
Expand source code
class FirstCommandExecutable(CommandExecutable): """Wrapper around a registered first command""" def __call__(self, state, *args, context=None, **kwargs): argv, argmeta = self.parse_argv(args, kwargs=kwargs, context=context) result = self.f(*argv) if isinstance(result, State): result.arguments = argmeta return result else: state.arguments = argmeta return state.with_data(result)
Ancestors
class FloatArgumentParser
-
Expand source code
class FloatArgumentParser(ArgumentParser): def parse(self, metadata, args): return float(args[0]), args[1:] def parse_meta(self, metadata, args, context=None): query = None if context is None else context.raw_query if isinstance(args[0], str): try: value = float(args[0]) except: raise ArgumentParserException( f"""Error parsing float argument '{metadata["name"]}' from string {repr(args[0])}""", query=query, ) elif isinstance(args[0], StringActionParameter): try: value = float(args[0].string) except: raise ArgumentParserException( f"""Error parsing float argument '{metadata["name"]}' from {repr(args[0].string)}""", position=args[0].position, query=query, ) elif isinstance(args[0], ExpandedActionParameter): try: value = float(args[0].value) except: raise ArgumentParserException( f"""Error parsing float argument '{metadata["name"]}' from link {repr(args[0].link.encode())}""", position=args[0].position, query=query, ) elif isinstance(args[0], ActionParameter): raise ArgumentParserException( f"Unsupported action parameter type {repr(args[0])} in FloatArgumentParser", position=args[0].position, query=query, ) else: try: value = float(args[0]) except: raise ArgumentParserException( f"""Error parsing float argument '{metadata["name"]}' from object {repr(args[0])}""", query=query, ) return value, (value, metadata), args[1:]
Ancestors
Methods
def parse(self, metadata, args)
-
Expand source code
def parse(self, metadata, args): return float(args[0]), args[1:]
def parse_meta(self, metadata, args, context=None)
-
Expand source code
def parse_meta(self, metadata, args, context=None): query = None if context is None else context.raw_query if isinstance(args[0], str): try: value = float(args[0]) except: raise ArgumentParserException( f"""Error parsing float argument '{metadata["name"]}' from string {repr(args[0])}""", query=query, ) elif isinstance(args[0], StringActionParameter): try: value = float(args[0].string) except: raise ArgumentParserException( f"""Error parsing float argument '{metadata["name"]}' from {repr(args[0].string)}""", position=args[0].position, query=query, ) elif isinstance(args[0], ExpandedActionParameter): try: value = float(args[0].value) except: raise ArgumentParserException( f"""Error parsing float argument '{metadata["name"]}' from link {repr(args[0].link.encode())}""", position=args[0].position, query=query, ) elif isinstance(args[0], ActionParameter): raise ArgumentParserException( f"Unsupported action parameter type {repr(args[0])} in FloatArgumentParser", position=args[0].position, query=query, ) else: try: value = float(args[0]) except: raise ArgumentParserException( f"""Error parsing float argument '{metadata["name"]}' from object {repr(args[0])}""", query=query, ) return value, (value, metadata), args[1:]
class IntArgumentParser
-
Expand source code
class IntArgumentParser(ArgumentParser): def parse(self, metadata, args): return int(args[0]), args[1:] def parse_meta(self, metadata, args, context=None): query = None if context is None else context.raw_query if isinstance(args[0], str): try: value = int(args[0]) except: raise ArgumentParserException( f"""Error parsing integer argument '{metadata["name"]}' from string {repr(args[0])}""", query=query, ) elif isinstance(args[0], StringActionParameter): try: value = int(args[0].string) except: raise ArgumentParserException( f"""Error parsing integer argument '{metadata["name"]}' from {repr(args[0].string)}""", position=args[0].position, query=query, ) elif isinstance(args[0], ExpandedActionParameter): try: value = int(args[0].value) except: raise ArgumentParserException( f"""Error parsing integer argument '{metadata["name"]}' from link {repr(args[0].link.encode())}""", position=args[0].position, query=query, ) elif isinstance(args[0], ActionParameter): raise ArgumentParserException( f"Unsupported action parameter type {repr(args[0])} in IntArgumentParser", position=args[0].position, query=query, ) else: try: value = int(args[0]) except: raise ArgumentParserException( f"""Error parsing integer argument '{metadata["name"]}' from object {repr(args[0])}""", query=query, ) return value, (value, metadata), args[1:]
Ancestors
Methods
def parse(self, metadata, args)
-
Expand source code
def parse(self, metadata, args): return int(args[0]), args[1:]
def parse_meta(self, metadata, args, context=None)
-
Expand source code
def parse_meta(self, metadata, args, context=None): query = None if context is None else context.raw_query if isinstance(args[0], str): try: value = int(args[0]) except: raise ArgumentParserException( f"""Error parsing integer argument '{metadata["name"]}' from string {repr(args[0])}""", query=query, ) elif isinstance(args[0], StringActionParameter): try: value = int(args[0].string) except: raise ArgumentParserException( f"""Error parsing integer argument '{metadata["name"]}' from {repr(args[0].string)}""", position=args[0].position, query=query, ) elif isinstance(args[0], ExpandedActionParameter): try: value = int(args[0].value) except: raise ArgumentParserException( f"""Error parsing integer argument '{metadata["name"]}' from link {repr(args[0].link.encode())}""", position=args[0].position, query=query, ) elif isinstance(args[0], ActionParameter): raise ArgumentParserException( f"Unsupported action parameter type {repr(args[0])} in IntArgumentParser", position=args[0].position, query=query, ) else: try: value = int(args[0]) except: raise ArgumentParserException( f"""Error parsing integer argument '{metadata["name"]}' from object {repr(args[0])}""", query=query, ) return value, (value, metadata), args[1:]
class ListArgumentParser
-
Expand source code
class ListArgumentParser(ArgumentParser): def parse(self, metadata, args): return args, [] def parse_meta(self, metadata, args, context=None): query = None if context is None else context.raw_query value = [] for x in args: if isinstance(x, str): value.append(x) elif isinstance(x, StringActionParameter): value.append(x.string) elif isinstance(x, ExpandedActionParameter): value.append(x.value) elif isinstance(x, ActionParameter): raise ArgumentParserException( f"Unsupported action parameter type {repr(x)} in ListArgumentParser", position=x.position, query=query, ) else: raise ArgumentParserException( f"Unsupported action parameter object {repr(x)} in ListArgumentParser", query=query, ) return value, (value, metadata), []
Ancestors
Subclasses
Methods
def parse(self, metadata, args)
-
Expand source code
def parse(self, metadata, args): return args, []
def parse_meta(self, metadata, args, context=None)
-
Expand source code
def parse_meta(self, metadata, args, context=None): query = None if context is None else context.raw_query value = [] for x in args: if isinstance(x, str): value.append(x) elif isinstance(x, StringActionParameter): value.append(x.string) elif isinstance(x, ExpandedActionParameter): value.append(x.value) elif isinstance(x, ActionParameter): raise ArgumentParserException( f"Unsupported action parameter type {repr(x)} in ListArgumentParser", position=x.position, query=query, ) else: raise ArgumentParserException( f"Unsupported action parameter object {repr(x)} in ListArgumentParser", query=query, ) return value, (value, metadata), []
class RegisterRemoteMixin
-
Expand source code
class RegisterRemoteMixin: def register_remote_serialized(self, b): """Helper method used to register serialized command in remote registration""" if is_remote_registration_enabled(): try: f, metadata, modify = self.decode_registration(b) self.register_command(f, metadata, modify=modify) ns = metadata.attributes.get("ns", "root") return dict( message=f"Function {f.__name__} in namespace {ns} is registered as command", status="OK", ) except: return dict( message="Error while registering command", traceback=traceback.format_exc(), status="ERROR", ) else: return dict( message="Remote command registration is disabled.", status="ERROR" ) @classmethod def encode_registration(cls, f, metadata, modify=False): code = marshal.dumps(f.__code__) return b"B" + pickle.dumps( ( code, f.__name__, f.__defaults__, f.__closure__, metadata._asdict(), modify, ) ) @classmethod def encode_registration_base64(cls, f, metadata, modify=False): return b"E" + base64.urlsafe_b64encode( cls.encode_registration(f, metadata, modify) ) @classmethod def decode_registration(cls, b): assert type(b) == bytes if b[0] == b"E"[0]: print("DECODE E:", b[:20]) print() b = base64.urlsafe_b64decode(b[1:]) print("DECODE B:", b[:20]) print() print() assert b[0] == b"B"[0] b = b[1:] code, name, defaults, closure, metadata, modify = pickle.loads(b) bc = marshal.loads(code) f = types.FunctionType(bc, globals(), name, defaults, closure) return (f, CommandMetadata(**metadata), modify)
Subclasses
Static methods
def decode_registration(b)
-
Expand source code
@classmethod def decode_registration(cls, b): assert type(b) == bytes if b[0] == b"E"[0]: print("DECODE E:", b[:20]) print() b = base64.urlsafe_b64decode(b[1:]) print("DECODE B:", b[:20]) print() print() assert b[0] == b"B"[0] b = b[1:] code, name, defaults, closure, metadata, modify = pickle.loads(b) bc = marshal.loads(code) f = types.FunctionType(bc, globals(), name, defaults, closure) return (f, CommandMetadata(**metadata), modify)
def encode_registration(f, metadata, modify=False)
-
Expand source code
@classmethod def encode_registration(cls, f, metadata, modify=False): code = marshal.dumps(f.__code__) return b"B" + pickle.dumps( ( code, f.__name__, f.__defaults__, f.__closure__, metadata._asdict(), modify, ) )
def encode_registration_base64(f, metadata, modify=False)
-
Expand source code
@classmethod def encode_registration_base64(cls, f, metadata, modify=False): return b"E" + base64.urlsafe_b64encode( cls.encode_registration(f, metadata, modify) )
Methods
def register_remote_serialized(self, b)
-
Helper method used to register serialized command in remote registration
Expand source code
def register_remote_serialized(self, b): """Helper method used to register serialized command in remote registration""" if is_remote_registration_enabled(): try: f, metadata, modify = self.decode_registration(b) self.register_command(f, metadata, modify=modify) ns = metadata.attributes.get("ns", "root") return dict( message=f"Function {f.__name__} in namespace {ns} is registered as command", status="OK", ) except: return dict( message="Error while registering command", traceback=traceback.format_exc(), status="ERROR", ) else: return dict( message="Remote command registration is disabled.", status="ERROR" )
class RemoteCommandRegistry (url, use_get_method=False)
-
Remote command registry allows to register commands into a remote LiQuer server, e.g. Jupyter server extension
Expand source code
class RemoteCommandRegistry(RegisterRemoteMixin, object): """Remote command registry allows to register commands into a remote LiQuer server, e.g. Jupyter server extension""" def __init__(self, url, use_get_method=False): self.url = url self.use_get_method = use_get_method def register_command(self, f, metadata, modify=False): import requests url = self.url url += "" if url[-1] == "/" else "/" if self.use_get_method: b = self.encode_registration_base64(f, metadata, modify) encoded = b.decode("ascii") response = requests.get(url=url + encoded) else: b = self.encode_registration(f, metadata, modify) response = requests.post(url=url, data=b) if response.ok: response = response.json() if response["status"] != "OK": print(response.get("traceback", "")) raise Exception( "Remote registration failed: " + response.get("message", f"Error registering {f.__name__}") ) else: response.raise_for_status()
Ancestors
Methods
def register_command(self, f, metadata, modify=False)
-
Expand source code
def register_command(self, f, metadata, modify=False): import requests url = self.url url += "" if url[-1] == "/" else "/" if self.use_get_method: b = self.encode_registration_base64(f, metadata, modify) encoded = b.decode("ascii") response = requests.get(url=url + encoded) else: b = self.encode_registration(f, metadata, modify) response = requests.post(url=url, data=b) if response.ok: response = response.json() if response["status"] != "OK": print(response.get("traceback", "")) raise Exception( "Remote registration failed: " + response.get("message", f"Error registering {f.__name__}") ) else: response.raise_for_status()
Inherited members
class SequenceArgumentParser (*sequence)
-
Expand source code
class SequenceArgumentParser(ArgumentParser): def __init__(self, *sequence): self.sequence = list(sequence) def __add__(self, ap): return SequenceArgumentParser(*(self.sequence + [ap])) def __iadd__(self, ap): self.sequence.append(ap) return self def parse(self, metadata, args): parsed_arguments = [] for ap, meta in zip(self.sequence, metadata): parsed, args = ap.parse(meta, args) if ap.is_argv: parsed_arguments.extend(parsed) else: parsed_arguments.append(parsed) return parsed_arguments, args def parse_meta(self, metadata, args, context=None): parsed_arguments = [] parsed_meta = [] for ap, meta in zip(self.sequence, metadata): parsed, argmeta, args = ap.parse_meta(meta, args, context=context) if ap.is_argv: parsed_arguments.extend(parsed) else: parsed_arguments.append(parsed) parsed_meta.append(argmeta) return parsed_arguments, parsed_meta, args
Ancestors
Methods
def parse(self, metadata, args)
-
Expand source code
def parse(self, metadata, args): parsed_arguments = [] for ap, meta in zip(self.sequence, metadata): parsed, args = ap.parse(meta, args) if ap.is_argv: parsed_arguments.extend(parsed) else: parsed_arguments.append(parsed) return parsed_arguments, args
def parse_meta(self, metadata, args, context=None)
-
Expand source code
def parse_meta(self, metadata, args, context=None): parsed_arguments = [] parsed_meta = [] for ap, meta in zip(self.sequence, metadata): parsed, argmeta, args = ap.parse_meta(meta, args, context=context) if ap.is_argv: parsed_arguments.extend(parsed) else: parsed_arguments.append(parsed) parsed_meta.append(argmeta) return parsed_arguments, parsed_meta, args