Source code for luci.cli_lucifier

import copy
import logging
from collections import OrderedDict

import click
from frkl.frkl import dict_merge

from frutils.defaults import *
from .finders import LucifierFinder
from .lucifiers import Lucifier

from .defaults import *
from frutils.frutils_cli import clean_user_input, convert_args_to_dict, parse_args_dict
from frutils import ordered_load

log = logging.getLogger("lucify")


[docs]class LucifierCommand(click.MultiCommand): """Class to assemble and hold a :class:`~luci.lucifiers.Lucifier` object, and create a dictlet-cli-command according to user input. Internally, this uses stevedore to load the lucifier extension that matches the selected command name. Args: name: the name (alias) of the selected command lucifier_defaults (dict): the default metadata dict with which to initialize the Lucifier """ def __init__(self, name, lucifier_defaults, dictlet=None, **kwargs): self.lucifier_defaults = lucifier_defaults self.dictlet = dictlet # creating lucifier object self.lucifier_finder = LucifierFinder() log.debug("loading lucifier: {}".format(name)) lucifier_details = self.lucifier_finder.get_dictlet_details(name) self.lucifier = lucifier_details["class"]( name=lucifier_details.get("name", None), init_metadata=self.lucifier_defaults, ) # those are the vars that are added to every 'child' dictlet (if not overwritten by the dictlet) self.default_dictlet_vars = self.lucifier.get_key_path_value( LUCIFIER_CLI_DEFAULT_DICTLET_VARS.format(KEY_LUCI_NAME) ) if not self.default_dictlet_vars: self.default_dictlet_vars = OrderedDict() # this part is about creating the cli options for the lucifier dynamically self.lucifier_param_metadata = self.lucifier.get_metadata().get( KEY_LUCI_NAME, {} ) help_string = self.lucifier_param_metadata.get(KEY_DOC_GROUP, {}).get( KEY_HELP_NAME, None ) if not help_string: help_string = self.lucifier.__doc__ if help_string: kwargs["help"] = help_string short_help = self.lucifier_param_metadata.get(KEY_DOC_GROUP, {}).get( KEY_SHORT_HELP_NAME, None ) if short_help: kwargs["short_help"] = short_help epilog = self.lucifier_param_metadata.get(KEY_DOC_GROUP, {}).get( KEY_EPILOG_NAME, None ) if epilog: kwargs["epilog"] = epilog self.lucifier_command_vars = self.lucifier_param_metadata.get( KEY_VARS_GROUP, {} ) if not isinstance(self.lucifier_command_vars, dict): self.lucifier_command_vars = convert_args_to_dict( self.lucifier_command_vars ) kwargs["params"] = parse_args_dict(self.lucifier_command_vars) # initializing the parent click.MultiCommand, using the options we just created super(LucifierCommand, self).__init__(**kwargs) self.lucifier_finder = self.lucifier.get_default_dictlet_finder() # prepare command_cache. this is used to minimize the amount of times dictlets have to be created, # depending on whether ``luci`` is called with the ``--help`` option or not. # this should also improves performance when tab-completion is used self.command_cache = None
[docs] def get_command_cache(self, ctx): """Creates the command cache that contains all auto-detectable dictlets. Non-auto-detectable dictlets can still be loaded if the user specifies their exact path/name. Args: ctx (click.Context): the context """ if self.command_cache is None: self.command_cache = OrderedDict() command_names = self.lucifier_finder.get_all_dictlets().keys() for n in command_names: command = self.get_command_direct(ctx, n) if command: self.command_cache[n] = command return self.command_cache
[docs] def list_commands(self, ctx): """Lists all auto-detectable dictlets. Args: ctx (click.Context): the context Returns: list: a listof dictlet-names/paths """ if self.dictlet: return [self.dictlet] else: return self.get_command_cache(ctx).keys()
[docs] def get_command(self, ctx, name): """Returns a command function that is enriched with the metadata of the specified dictlet. If the command_cache for this object was created, it is used to retrieve the result, otherwise the :meth:`get_command_direct` method is used. Args: ctx (click.Context): the context name (str): the name/path of/to the dictlet Returns: function: the function callback for click to use to create a (sub-)command-line interface """ if self.dictlet: result = self.get_command_direct(ctx, name, no_cache=True) return result if self.command_cache is None: return self.get_command_direct(ctx, name, no_cache=True) return self.get_command_cache(ctx)[name]
[docs] def get_command_direct(self, ctx, name, no_cache=False): """ Args: ctx (click.Context): the context name (str): the name/path of/to the dictlet no_cache (bool): whether to involve cache or not (depending on the use-case, one might be faster than the other) Returns: function: the function callback for click to use to create a (sub-)command-line interface """ try: # parses and adds user input to the current lucifier lucifier_params = ctx.params lucifier_user_input = clean_user_input( lucifier_params, self.lucifier_command_vars ) self.lucifier.add_metadata(lucifier_user_input) # circumvent cache or not # the former is better if the dictlet-path is specified directly by the user, otherwise # all auto-discoverable dictlets would have to be loaded if no_cache: abspath = os.path.abspath(name) dictlet_details = self.lucifier_finder.find_available_dictlets( paths=[abspath] ).get(abspath) else: dictlet_details = self.lucifier_finder.get_dictlet_details(name) if not dictlet_details: log.debug("can't load dictlet: {}".format(name)) raise click.ClickException("Can't load dictlet: {}".format(name)) # add the dictlet to the lucifier, to extract it's metadata d_type = dictlet_details.get("type", None) if d_type is None: log.debug( "type for dictlet '{}' is not specified, ignoring.".format(name) ) return None try: dictlet_metadata = self.lucifier.overlay_dictlet( name, dictlet_details, add_dictlet=False ) except (Exception) as e: log.debug("Could not parse dictlet '{}': {}".format(name, e)) log.debug(e, exc_info=1) if not no_cache: return None else: raise click.ClickException( "Could not parse dictlet '{}': {}\n{}\n".format( name, e, "For more details you might want to try to increase verbosity (--verbosity DEBUG)", ) ) # the luci-specific part of the dictlet metadata # used to create the dictlet command-line interface luci_metadata = dictlet_metadata.get(KEY_LUCI_NAME, {}) command_doc = luci_metadata.get(KEY_DOC_GROUP, {}) command_vars = luci_metadata.get(KEY_VARS_GROUP, {}) if not isinstance(command_vars, dict): command_vars = convert_args_to_dict(command_vars) if command_vars: c_vars = dict_merge( self.default_dictlet_vars, command_vars, copy_dct=True ) else: c_vars = self.default_dictlet_vars click_options = parse_args_dict(c_vars) @click.command(name=name) def command(*args, **kwargs): # here's where the lucifier in question actually does it's work # adding user input user_input = clean_user_input(kwargs, c_vars) merged = OrderedDict() dict_merge(merged, user_input, copy_dct=False) self.lucifier.add_metadata(merged) # now we add the dictlet 'for real', having all the 'base' metadata in place self.lucifier.overlay_dictlet(name, dictlet_details, add_dictlet=True) # some debug variables debug_vars = { "metadata_user": user_input, "metadata_defaults": self.lucifier_defaults, "metadata_dictlet": dictlet_metadata, "metadata_luci": luci_metadata, "metadata_base": self.lucifier.get_metadata(), } self.lucifier.add_metadata({"__debug__": debug_vars}) # and off we go self.lucifier.process() # assemble cli command.params = click_options command.help = command_doc.get(KEY_HELP_NAME) command.short_help = command_doc.get(KEY_SHORT_HELP_NAME) command.epilog = command_doc.get(KEY_EPILOG_NAME) return command except (click.ClickException) as e: raise e except (Exception) as e: # import traceback # traceback.print_exc() log.debug(e) click.echo("Could not load dictlet '{}': {}".format(name, e))
[docs]class CliLucifier(Lucifier): """Lucifier to create a command-line interface with options and argument(s). This Lucifier is a special case, as it take a class that extend :class:`~luci.lucifiers.Lucifier` as an 'input-dictlet'. Args: ctx (dict): the context passed from the parent :class:`~click.MultiCommand` """ LUCI_CLI = { "__lucify__": { "doc": { "short help": "creating a command-line interface", "epilog": "the 'cli' lucifier is part of the luci package", }, "lucifiers": { "cli": { "reader": { "name": "class", "config": {"class": {"keys": {"__luci__": "__luci__"}}}, } } }, } } def __init__(self, ctx=None, dictlet=None, **kwargs): self.dictlet = dictlet # adding global metadata from config file (if it exists) self.init_config = None if os.path.exists(LUCI_CONFIG_FILE): try: with open(LUCI_CONFIG_FILE) as cf: self.init_config = ordered_load(cf) except (Exception) as e: log.debug(e, exc_info=1) log.info( "Could not load config file '{}', ignoring...".format( LUCI_CONFIG_FILE ) ) if not self.init_config: self.init_config = OrderedDict() # adding potential metadata specified using the '--defaults' luci cli option defaults = OrderedDict() if ctx is not None: defaults_tuple = ctx.params.get("defaults", None) if defaults_tuple: for d_dict in defaults_tuple: dict_merge(defaults, d_dict, copy_dct=False) if defaults: dict_merge(self.init_config, defaults, copy_dct=False) copy_init_config = copy.deepcopy(self.init_config) super(CliLucifier, self).__init__( name="cli", init_metadata=copy_init_config, **kwargs ) self.ctx = ctx
[docs] def get_default_metadata(self): return CliLucifier.LUCI_CLI
[docs] def process_dictlet(self, metadata, dictlet_details=None): """Assembles the :class:`click.Multicommand` that represents a lucifier. This uses the name of the stevedore extension that was selected as command-name, and the default values that were also used for this 'parent' CliLucifier as initial metadata. The 'child' lucifier will be assembled in the :class:`LucifierCommand constructor <luci.cli_lucifier.LucifierCommand>`. Returns: function: the command """ if len(self.dictlets) != 1: raise Exception( "CliLucifier can't have more than one input dictlet (aka LucifierClass)" ) lucifier_name = list(self.dictlets.keys())[-1] @click.command( cls=LucifierCommand, name=lucifier_name, dictlet=self.dictlet, subcommand_metavar="DICTLET", lucifier_defaults=self.init_config, ) @click.pass_context def lucifier_command(ctx, **kwargs): pass return lucifier_command
[docs] def get_dictlet(self, dictlet_path): pass