import abc
import copy
import logging
# from collections import OrderedDict
from ruamel.yaml.comments import CommentedMap
import click
import six
from frutils import dict_merge, get_key_path_value
from frutils.defaults import *
from frutils.frutils_cli import output
from .defaults import *
from .finders import create_dictlet_finder
from .readers import create_dictlet_reader
log = logging.getLogger("lucify")
[docs]@six.add_metaclass(abc.ABCMeta)
class Lucifier(object):
"""Abstract base class for an object that can execute a certain task using the added dictlet(s).
Args:
name (str): the name/alias of this lucifier
init_metadata (dict): (optional) metadata to be used as seed for this lucifiers internal metadata
**kwargs (dict): other arguments, most likely those will be ignored
"""
[docs] @staticmethod
def process_default_values(user_vars, vars_metadata):
"""Removes any keys from a dict where the user didn't specify a value.
If this is not done, those keys stay with a 'None' value, which would shadow a 'valid' value that was provided at an earlier stage.
Args:
user_vars (dict): the user input dictionary
vars_metadata (dict): the vars metadata
Returns:
dict: the cleaned dict
"""
keys_to_remove = []
for key, value in user_vars.items():
if value is not None:
continue
default_var_value = vars_metadata.get(key, {}).get(
KEY_DEFAULT_VALUE_NAME, None
)
if default_var_value is None:
keys_to_remove.append(key)
log.debug(
"Removing keys, as no default specified by user: {}".format(keys_to_remove)
)
for key in keys_to_remove:
user_vars.pop(key, None)
return user_vars
[docs] @staticmethod
def get_var_lookup_dict(vars_metadata):
"""Generates a reverse lookup dictionary out of vars metadata.
This enables the mapping of flags like "--help" to the var-name-keys in the vars metadata.
Args:
vars_metadata (dict): the vars metadata
Returns:
dict: the lookup dict
"""
lookup = {}
def add_key(var_alias, name):
if var_alias.startswith("-"):
var_alias = var_alias[1:]
# I know, I know
if var_alias.startswith("-"):
var_alias = var_alias[1:]
var_alias = var_alias.replace("-", "_").lower()
if var_alias in lookup.keys():
raise Exception(
"Alias '{}' used (at least) twice: {}, {}".format(
var_alias, lookup[var_alias], name
)
)
lookup[var_alias] = name
for var_name, md in vars_metadata.items():
if (
KEY_CLI_CLICK_PARAM_NAME
in md.get(KEY_CLI_GROUP, {}).get(KEY_CLI_CLICK_OPTIONS_NAME, {}).keys()
):
aliases = md[KEY_CLI_GROUP][KEY_CLI_CLICK_OPTIONS_NAME][
KEY_CLI_CLICK_PARAM_NAME
]
for a in aliases:
add_key(a, var_name)
if (
KEY_CLI_CLICK_PARAM_NAME
in md.get(KEY_CLI_GROUP, {}).get(KEY_CLI_CLICK_ARGUMENT_NAME, {}).keys()
):
alias = md[KEY_CLI_GROUP][KEY_CLI_CLICK_ARGUMENT_NAME][
KEY_CLI_CLICK_PARAM_NAME
]
add_key(alias, var_name)
if KEY_ALIAS_NAME in md.get(KEY_META_GROUP, {}).keys():
alias = md[KEY_META_GROUP][KEY_ALIAS_NAME]
add_key(alias, var_name)
return lookup
def __init__(self, name=None, init_metadata=None, **kwargs):
self.name = name
self.metadata_key = KEY_LUCI_NAME
self.metadata = self.get_default_metadata()
if init_metadata is not None:
self.add_metadata(init_metadata)
self.dictlets = CommentedMap()
[docs] def process_dictlet(self, metadata, dictlet_details=None):
"""The main method implemented by a lucifier, it executes the task it is written for, using the metadata accrued as configuration.
Depending on the lucifier, this may return a string, an object, or nothing (and just prints out text to stdout).
Args:
metadata: the metadata to process
dictlet_details: metadata about the dictlet itself
Returns:
object: the new (processed) metadata
"""
return metadata
[docs] def get_key_path_value(self, key_path, default_value=None):
"""Utility method to retrieve a key-path from the (current) metadata of this lucifier.
A key-path is a character-delimited (mostly using '.') string ala: key1.child_key.another_key
Args:
key_path (str): the path to the key we are interested in
default_value (object): the default value if the key is not found
Returns:
object: the value for the key-path
"""
return get_key_path_value(self.metadata, key_path, default_value=default_value)
[docs] def get_default_reader_config(self):
"""Returns an object of the default dictlet reader for this lucifier.
Returns:
dict: the reader object configuration
"""
if self.name is None:
name = ALL_LUCIFIERS_KEY
else:
name = self.name
reader_config = self.get_key_path_value(
LUCIFIER_DEFAULT_READER.format(self.metadata_key, name)
)
if not reader_config and name != ALL_LUCIFIERS_KEY:
name = ALL_LUCIFIERS_KEY
reader_config = self.get_key_path_value(
LUCIFIER_DEFAULT_READER.format(self.metadata_key, name), default_value={}
)
return reader_config
[docs] def get_default_dictlet_finder(self):
dictlet_finder_name = self.get_default_finder_config().get("name", None)
if dictlet_finder_name:
dictlet_finder_config = self.get_default_finder_config().get("config", {})
else:
dictlet_finder_name = DEFAULT_FINDER_CONFIG.get("name")
dictlet_finder_config = DEFAULT_FINDER_CONFIG.get("config", {})
return create_dictlet_finder(dictlet_finder_name, dictlet_finder_config)
[docs] def get_default_finder_config(self):
"""Returns an object of the default dictlet finder for this lucifier.
Returns:
dict: the finder object configuration
"""
if self.name is None:
name = ALL_LUCIFIERS_KEY
else:
name = self.name
finder_config = self.get_key_path_value(
LUCIFIER_DEFAULT_FINDER.format(self.metadata_key, name)
)
if not finder_config and name != ALL_LUCIFIERS_KEY:
name = ALL_LUCIFIERS_KEY
finder_config = self.get_key_path_value(
LUCIFIER_DEFAULT_FINDER.format(self.metadata_key, name), default_value={}
)
return finder_config
[docs] def process(self):
"""Processes all dictlets that were added to this Lucifier, one after the other.
"""
result = CommentedMap()
for dictlet_name, details in self.dictlets.items():
try:
p_r = self.process_dictlet(details["metadata"], dictlet_details=details)
result[dictlet_name] = p_r
except (Exception) as e:
click.echo(e)
return result
[docs] def get_default_dictlet_reader(self):
"""Returns the default dictlet reader for this lucifier."""
dictlet_reader_name = self.get_default_reader_config().get("name", None)
if dictlet_reader_name:
dictlet_reader_config = self.get_default_reader_config().get("config", {})
else:
dictlet_reader_name = DEFAULT_READER_CONFIG.get("name")
dictlet_reader_config = DEFAULT_READER_CONFIG.get("config", {})
return create_dictlet_reader(dictlet_reader_name, dictlet_reader_config)
[docs] def overlay_dictlet(
self,
dictlet_name,
dictlet_details=None,
add_dictlet=True,
add_metadata=False,
dictlet_reader_name=None,
dictlet_reader_config=None,
):
"""Overlays a dictlet on the current lucifier metadata.
If the 'add_metadata' arg is True, the dictlet and it's newly created metadata will be stored in this object.
Args:
dictlet_name (str): an alias/name for the dictlet
dictlet_details (dict): the dictlet details
add_dictlet (bool): whether to add the resulted metadata to this lucifiers' 'base'-metadata
add_metadata (bool): whether to add the dictlet and it's metadata to the lucifier permanently
dictlet_reader_name (str): the dictlet reader to use to extract metadata out of the dictlet
dictlet_reader_config (str): the config for the dictlet reader
Returns:
dict: a copy of the (merged) metadata of this dictlet
"""
if dictlet_reader_name is None:
dictlet_reader = self.get_default_dictlet_reader()
else:
dictlet_reader = create_dictlet_reader(
dictlet_reader_name, dictlet_reader_config
)
metadata_copy = copy.deepcopy(self.metadata)
current_luci_metadata = metadata_copy.get(KEY_LUCI_NAME, {})
if dictlet_details is None:
dictlet_details = self.get_default_dictlet_finder().get_dictlet_details(
dictlet_name
)
dictlet_metadata = dictlet_reader.read_dictlet(
dictlet_details, metadata_copy, current_luci_metadata
)
if add_metadata:
self.metadata = copy.deepcopy(dictlet_metadata)
luci_metadata = dictlet_metadata.pop(KEY_LUCI_NAME, {})
if add_dictlet:
dictlet_details["metadata"] = dictlet_metadata
dictlet_details["luci_metadata"] = luci_metadata
self.dictlets[dictlet_name] = dictlet_details
return copy.deepcopy(dictlet_metadata)
else:
return dictlet_metadata