import copy
import logging
from collections import OrderedDict
import click
from frkl.frkl import dict_merge
from frutils.defaults import *
from .finders import LucifierFinder
from .lucifiers import Lucifier
from .defaults import *
from frutils.frutils_cli import clean_user_input, convert_args_to_dict, parse_args_dict
from frutils import ordered_load
log = logging.getLogger("lucify")
[docs]class LucifierCommand(click.MultiCommand):
"""Class to assemble and hold a :class:`~luci.lucifiers.Lucifier` object, and create a dictlet-cli-command according to user input.
Internally, this uses stevedore to load the lucifier extension that matches the selected command name.
Args:
name: the name (alias) of the selected command
lucifier_defaults (dict): the default metadata dict with which to initialize the Lucifier
"""
def __init__(self, name, lucifier_defaults, dictlet=None, **kwargs):
self.lucifier_defaults = lucifier_defaults
self.dictlet = dictlet
# creating lucifier object
self.lucifier_finder = LucifierFinder()
log.debug("loading lucifier: {}".format(name))
lucifier_details = self.lucifier_finder.get_dictlet_details(name)
self.lucifier = lucifier_details["class"](
name=lucifier_details.get("name", None),
init_metadata=self.lucifier_defaults,
)
# those are the vars that are added to every 'child' dictlet (if not overwritten by the dictlet)
self.default_dictlet_vars = self.lucifier.get_key_path_value(
LUCIFIER_CLI_DEFAULT_DICTLET_VARS.format(KEY_LUCI_NAME)
)
if not self.default_dictlet_vars:
self.default_dictlet_vars = OrderedDict()
# this part is about creating the cli options for the lucifier dynamically
self.lucifier_param_metadata = self.lucifier.get_metadata().get(
KEY_LUCI_NAME, {}
)
help_string = self.lucifier_param_metadata.get(KEY_DOC_GROUP, {}).get(
KEY_HELP_NAME, None
)
if not help_string:
help_string = self.lucifier.__doc__
if help_string:
kwargs["help"] = help_string
short_help = self.lucifier_param_metadata.get(KEY_DOC_GROUP, {}).get(
KEY_SHORT_HELP_NAME, None
)
if short_help:
kwargs["short_help"] = short_help
epilog = self.lucifier_param_metadata.get(KEY_DOC_GROUP, {}).get(
KEY_EPILOG_NAME, None
)
if epilog:
kwargs["epilog"] = epilog
self.lucifier_command_vars = self.lucifier_param_metadata.get(
KEY_VARS_GROUP, {}
)
if not isinstance(self.lucifier_command_vars, dict):
self.lucifier_command_vars = convert_args_to_dict(
self.lucifier_command_vars
)
kwargs["params"] = parse_args_dict(self.lucifier_command_vars)
# initializing the parent click.MultiCommand, using the options we just created
super(LucifierCommand, self).__init__(**kwargs)
self.lucifier_finder = self.lucifier.get_default_dictlet_finder()
# prepare command_cache. this is used to minimize the amount of times dictlets have to be created,
# depending on whether ``luci`` is called with the ``--help`` option or not.
# this should also improves performance when tab-completion is used
self.command_cache = None
[docs] def get_command_cache(self, ctx):
"""Creates the command cache that contains all auto-detectable dictlets.
Non-auto-detectable dictlets can still be loaded if the user specifies their exact path/name.
Args:
ctx (click.Context): the context
"""
if self.command_cache is None:
self.command_cache = OrderedDict()
command_names = self.lucifier_finder.get_all_dictlets().keys()
for n in command_names:
command = self.get_command_direct(ctx, n)
if command:
self.command_cache[n] = command
return self.command_cache
[docs] def list_commands(self, ctx):
"""Lists all auto-detectable dictlets.
Args:
ctx (click.Context): the context
Returns:
list: a listof dictlet-names/paths
"""
if self.dictlet:
return [self.dictlet]
else:
return self.get_command_cache(ctx).keys()
[docs] def get_command(self, ctx, name):
"""Returns a command function that is enriched with the metadata of the specified dictlet.
If the command_cache for this object was created, it is used to retrieve the result, otherwise the
:meth:`get_command_direct` method is used.
Args:
ctx (click.Context): the context
name (str): the name/path of/to the dictlet
Returns:
function: the function callback for click to use to create a (sub-)command-line interface
"""
if self.dictlet:
result = self.get_command_direct(ctx, name, no_cache=True)
return result
if self.command_cache is None:
return self.get_command_direct(ctx, name, no_cache=True)
return self.get_command_cache(ctx)[name]
[docs] def get_command_direct(self, ctx, name, no_cache=False):
"""
Args:
ctx (click.Context): the context
name (str): the name/path of/to the dictlet
no_cache (bool): whether to involve cache or not (depending on the use-case, one might be faster than the other)
Returns:
function: the function callback for click to use to create a (sub-)command-line interface
"""
try:
# parses and adds user input to the current lucifier
lucifier_params = ctx.params
lucifier_user_input = clean_user_input(
lucifier_params, self.lucifier_command_vars
)
self.lucifier.add_metadata(lucifier_user_input)
# circumvent cache or not
# the former is better if the dictlet-path is specified directly by the user, otherwise
# all auto-discoverable dictlets would have to be loaded
if no_cache:
abspath = os.path.abspath(name)
dictlet_details = self.lucifier_finder.find_available_dictlets(
paths=[abspath]
).get(abspath)
else:
dictlet_details = self.lucifier_finder.get_dictlet_details(name)
if not dictlet_details:
log.debug("can't load dictlet: {}".format(name))
raise click.ClickException("Can't load dictlet: {}".format(name))
# add the dictlet to the lucifier, to extract it's metadata
d_type = dictlet_details.get("type", None)
if d_type is None:
log.debug(
"type for dictlet '{}' is not specified, ignoring.".format(name)
)
return None
try:
dictlet_metadata = self.lucifier.overlay_dictlet(
name, dictlet_details, add_dictlet=False
)
except (Exception) as e:
log.debug("Could not parse dictlet '{}': {}".format(name, e))
log.debug(e, exc_info=1)
if not no_cache:
return None
else:
raise click.ClickException(
"Could not parse dictlet '{}': {}\n{}\n".format(
name,
e,
"For more details you might want to try to increase verbosity (--verbosity DEBUG)",
)
)
# the luci-specific part of the dictlet metadata
# used to create the dictlet command-line interface
luci_metadata = dictlet_metadata.get(KEY_LUCI_NAME, {})
command_doc = luci_metadata.get(KEY_DOC_GROUP, {})
command_vars = luci_metadata.get(KEY_VARS_GROUP, {})
if not isinstance(command_vars, dict):
command_vars = convert_args_to_dict(command_vars)
if command_vars:
c_vars = dict_merge(
self.default_dictlet_vars, command_vars, copy_dct=True
)
else:
c_vars = self.default_dictlet_vars
click_options = parse_args_dict(c_vars)
@click.command(name=name)
def command(*args, **kwargs):
# here's where the lucifier in question actually does it's work
# adding user input
user_input = clean_user_input(kwargs, c_vars)
merged = OrderedDict()
dict_merge(merged, user_input, copy_dct=False)
self.lucifier.add_metadata(merged)
# now we add the dictlet 'for real', having all the 'base' metadata in place
self.lucifier.overlay_dictlet(name, dictlet_details, add_dictlet=True)
# some debug variables
debug_vars = {
"metadata_user": user_input,
"metadata_defaults": self.lucifier_defaults,
"metadata_dictlet": dictlet_metadata,
"metadata_luci": luci_metadata,
"metadata_base": self.lucifier.get_metadata(),
}
self.lucifier.add_metadata({"__debug__": debug_vars})
# and off we go
self.lucifier.process()
# assemble cli
command.params = click_options
command.help = command_doc.get(KEY_HELP_NAME)
command.short_help = command_doc.get(KEY_SHORT_HELP_NAME)
command.epilog = command_doc.get(KEY_EPILOG_NAME)
return command
except (click.ClickException) as e:
raise e
except (Exception) as e:
# import traceback
# traceback.print_exc()
log.debug(e)
click.echo("Could not load dictlet '{}': {}".format(name, e))
[docs]class CliLucifier(Lucifier):
"""Lucifier to create a command-line interface with options and argument(s).
This Lucifier is a special case, as it take a class that extend :class:`~luci.lucifiers.Lucifier` as an 'input-dictlet'.
Args:
ctx (dict): the context passed from the parent :class:`~click.MultiCommand`
"""
LUCI_CLI = {
"__lucify__": {
"doc": {
"short help": "creating a command-line interface",
"epilog": "the 'cli' lucifier is part of the luci package",
},
"lucifiers": {
"cli": {
"reader": {
"name": "class",
"config": {"class": {"keys": {"__luci__": "__luci__"}}},
}
}
},
}
}
def __init__(self, ctx=None, dictlet=None, **kwargs):
self.dictlet = dictlet
# adding global metadata from config file (if it exists)
self.init_config = None
if os.path.exists(LUCI_CONFIG_FILE):
try:
with open(LUCI_CONFIG_FILE) as cf:
self.init_config = ordered_load(cf)
except (Exception) as e:
log.debug(e, exc_info=1)
log.info(
"Could not load config file '{}', ignoring...".format(
LUCI_CONFIG_FILE
)
)
if not self.init_config:
self.init_config = OrderedDict()
# adding potential metadata specified using the '--defaults' luci cli option
defaults = OrderedDict()
if ctx is not None:
defaults_tuple = ctx.params.get("defaults", None)
if defaults_tuple:
for d_dict in defaults_tuple:
dict_merge(defaults, d_dict, copy_dct=False)
if defaults:
dict_merge(self.init_config, defaults, copy_dct=False)
copy_init_config = copy.deepcopy(self.init_config)
super(CliLucifier, self).__init__(
name="cli", init_metadata=copy_init_config, **kwargs
)
self.ctx = ctx
[docs] def process_dictlet(self, metadata, dictlet_details=None):
"""Assembles the :class:`click.Multicommand` that represents a lucifier.
This uses the name of the stevedore extension that was selected as command-name, and the default values that were also used for this 'parent' CliLucifier as initial metadata. The 'child' lucifier will be assembled in the :class:`LucifierCommand constructor <luci.cli_lucifier.LucifierCommand>`.
Returns:
function: the command
"""
if len(self.dictlets) != 1:
raise Exception(
"CliLucifier can't have more than one input dictlet (aka LucifierClass)"
)
lucifier_name = list(self.dictlets.keys())[-1]
@click.command(
cls=LucifierCommand,
name=lucifier_name,
dictlet=self.dictlet,
subcommand_metavar="DICTLET",
lucifier_defaults=self.init_config,
)
@click.pass_context
def lucifier_command(ctx, **kwargs):
pass
return lucifier_command
[docs] def get_dictlet(self, dictlet_path):
pass