Source code for luci.lucifiers

import logging
from collections import OrderedDict

import click
import os
import requests
from six import string_types

from frutils import flatten_lists, get_key_path_value, replace_string
from frutils.frutils_cli import (
    create_flag_var,
    create_output_type_var,
    create_pager_var,
    output,
    output_item,
)
from frutils.defaults import KEY_LUCI_NAME
from .lucify import Lucifier

# Module-level logger shared by all lucifiers in this file; registered under
# the fixed name "lucify".
log = logging.getLogger("lucify")


class MetadataLucifier(Lucifier):
    """Print the metadata extracted from a dictlet to stdout.

    The following dictlet vars (read from the ``__dictlet__`` key of the
    processed metadata) control the output:

    Args:
        luci_metadata (bool): also show the dictlet vars and the 'luci'-specific metadata
        output_type (str): output format, falls back to 'yaml' if not set
        pager (bool): whether to pipe the output through a pager
    """

    LUCI_METADATA = {
        "__lucify__": {
            "doc": {
                "short_help": "direct output of processed luci metadata",
                "epilog": "the 'metadata' lucifier is part of the luci package",
            },
            "vars": {
                "__dictlet__.output_type": create_output_type_var(),
                "__dictlet__.pager": create_pager_var(),
                "__dictlet__.luci_metadata": {
                    "type": bool,
                    "required": False,
                    "default": False,
                    "doc": {
                        "help": "whether to print internal metadata (mostly useful for debugging purposes)"
                    },
                    "click": {
                        "option": {
                            "param_decls": ["--luci-metadata", "-l"],
                            "type": bool,
                            "is_flag": True,
                        }
                    },
                },
            },
        }
    }

    def __init__(self, **kwargs):
        super(MetadataLucifier, self).__init__(**kwargs)

    def get_default_metadata(self):
        """Return the static lucifier configuration (``LUCI_METADATA``)."""
        return MetadataLucifier.LUCI_METADATA

    def process_dictlet(self, metadata, dictlet_details):
        """Print the dictlet metadata and return it.

        The ``__dictlet__`` entry and the entry stored under ``KEY_LUCI_NAME``
        are removed from ``metadata`` (in place) before it is printed.

        Args:
            metadata (dict): the processed dictlet metadata
            dictlet_details: details about the dictlet (not used here)

        Returns:
            dict: ``metadata`` without the two popped entries
        """
        options = metadata.pop("__dictlet__", {})
        show_internal = options.get("luci_metadata", False)
        use_pager = options.get("pager", False)
        fmt = options.get("output_type", "yaml")
        luci_part = metadata.pop(KEY_LUCI_NAME, {})

        if show_internal:
            # internal sections first, each below its own bold heading
            internal_sections = (
                ("dictlet vars:", options),
                ("luci metadata:", luci_part),
            )
            for heading, payload in internal_sections:
                click.echo("")
                click.secho(heading, bold=True)
                click.echo("")
                output(payload, output_type=fmt, pager=use_pager)

            # heading for the main section, which is printed unconditionally below
            click.echo("")
            click.secho("dictlet metadata:", bold=True)
            click.echo("")

        output(metadata, output_type=fmt, safe=False, pager=use_pager)
        return metadata
class DebugLucifier(Lucifier):
    """Lucifier to be used when debugging dictlets.

    It lets you print debug information of both metadata and content of a
    dictlet to stdout.
    """

    LUCI_DEBUG = {
        "__lucify__": {
            "register": {
                "content": "__debug__.content.content",
                "content_filtered": "__debug__.content.filtered",
                "metadata": "__debug__.content.metadata",
                "all": "__debug__.content.all",
            },
            "doc": {
                "short_help": "displays relevant parsing information for dictlets",
                "epilog": "the 'debug' lucifier is part of the luci package",
            },
            # Built from a list of pairs (not a dict literal) so the declared
            # order is kept even on interpreters without ordered dicts.
            "vars": OrderedDict(
                [
                    (
                        "__dictlet__.debug.all",
                        create_flag_var(
                            "display all available information", ["--all", "-a"]
                        ),
                    ),
                    (
                        "__dictlet__.debug.all_metadata",
                        create_flag_var(
                            "display all available metadata information",
                            ["--all-metadata", "-ma"],
                        ),
                    ),
                    (
                        "__dictlet__.debug.all_lines",
                        create_flag_var(
                            "display all line-related information ('content', 'content_filtered', 'metadata')",
                            ["--all-lines", "-l"],
                        ),
                    ),
                    (
                        "__dictlet__.debug.metadata",
                        create_flag_var(
                            "display the merged metadata)", ["--metadata", "-m"]
                        ),
                    ),
                    (
                        "__dictlet__.debug.metadata_defaults",
                        create_flag_var(
                            "display default vars", ["--metadata-defaults", "-md"]
                        ),
                    ),
                    (
                        "__dictlet__.debug.metadata_user",
                        create_flag_var(
                            "display user-input arguments", ["--metadata-user", "-mu"]
                        ),
                    ),
                    (
                        "__dictlet__.debug.metadata_luci",
                        create_flag_var(
                            "display luci-related metadata", ["--metadata-luci", "-ml"]
                        ),
                    ),
                    (
                        "__dictlet__.debug.metadata_dictlet",
                        create_flag_var(
                            "display processed dictlet metadata",
                            ["--metadata-dictlet", "-mt"],
                        ),
                    ),
                    (
                        "__dictlet__.debug.lines_content_all",
                        create_flag_var(
                            "display all lines of a template", ["--lines-all", "-la"]
                        ),
                    ),
                    (
                        "__dictlet__.debug.lines_metadata",
                        create_flag_var(
                            "display all lines of a template that were processed by luci",
                            ["--lines-metadata", "-lm"],
                        ),
                    ),
                    (
                        "__dictlet__.debug.lines_content",
                        create_flag_var(
                            "display all lines of a template that were skipped by luci processing and will be used as content",
                            ["--lines-content", "-lc"],
                        ),
                    ),
                    (
                        "__dictlet__.debug.lines_content_filtered",
                        create_flag_var(
                            "display all lines of a template that were skipped by luci processing and are pre-filtered",
                            ["--lines-filtered", "-lf"],
                        ),
                    ),
                    ("__dictlet__.output_type", create_output_type_var()),
                    ("__dictlet__.pager", create_pager_var()),
                ]
            ),
        }
    }

    def __init__(self, **kwargs):
        super(DebugLucifier, self).__init__(**kwargs)

    def get_default_metadata(self):
        """Return the static lucifier configuration (``LUCI_DEBUG``)."""
        return DebugLucifier.LUCI_DEBUG

    def process_control_options(self, dictlet_properties):
        """Store the output flags from the user input on this instance.

        If no flag at all is set, the merged metadata and the content lines
        are selected. The umbrella flags ('all', 'all_lines', 'all_metadata')
        are expanded into the individual flags they stand for.

        Args:
            dictlet_properties (dict): the 'debug' vars of the dictlet; every
                flag key read below must be present

        Raises:
            KeyError: if one of the expected flag keys is missing
        """
        self.all = dictlet_properties["all"]
        self.all_metadata = dictlet_properties["all_metadata"]
        self.all_lines = dictlet_properties["all_lines"]

        self.metadata_merged = dictlet_properties["metadata"]
        self.metadata_luci = dictlet_properties["metadata_luci"]
        self.metadata_user = dictlet_properties["metadata_user"]
        self.metadata_defaults = dictlet_properties["metadata_defaults"]
        self.metadata_dictlet = dictlet_properties["metadata_dictlet"]

        self.content_lines = dictlet_properties["lines_content"]
        self.content_filtered_lines = dictlet_properties["lines_content_filtered"]
        self.metadata_lines = dictlet_properties["lines_metadata"]
        self.all_content_lines = dictlet_properties["lines_content_all"]

        requested = (
            self.all,
            self.all_metadata,
            self.all_lines,
            self.metadata_merged,
            self.metadata_luci,
            self.metadata_user,
            self.metadata_defaults,
            self.metadata_dictlet,
            self.content_lines,
            self.content_filtered_lines,
            self.metadata_lines,
            self.all_content_lines,
        )
        if not any(requested):
            # nothing requested explicitly: fall back to a sensible default view
            self.metadata_merged = True
            self.content_lines = True

        if self.all:
            self.all_lines = True
            self.all_metadata = True

        if self.all_lines:
            self.content_lines = True
            self.content_filtered_lines = True
            self.metadata_lines = True
            self.all_content_lines = True

        if self.all_metadata:
            self.metadata_merged = True
            self.metadata_luci = True
            self.metadata_user = True
            self.metadata_defaults = True
            self.metadata_dictlet = True

    def process_dictlet(self, metadata, dictlet_details):
        """Print the requested debug sections and return them.

        Args:
            metadata (dict): the processed dictlet metadata; its '__dictlet__'
                entry is removed in place
            dictlet_details: details about the dictlet (not used here)

        Returns:
            dict: one entry per section that was printed

        Raises:
            KeyError: if the dictlet vars contain no 'debug' entry
        """
        dictlet_options = metadata.pop("__dictlet__", {})
        output_type = dictlet_options.get("output_type", "yaml")
        pager = dictlet_options.get("pager", False)

        self.process_control_options(dictlet_options["debug"])

        # NOTE(review): these values are read from 'self.metadata', not from the
        # 'metadata' argument -- presumably set up by the Lucifier base class;
        # confirm this is intended.
        debug_vars = self.metadata.get("__debug__", {})
        metadata_defaults_vars = debug_vars.get("metadata_defaults", {})
        metadata_user = debug_vars.get("metadata_user", {})
        metadata_luci = debug_vars.get("metadata_luci", {})
        metadata_dictlet = debug_vars.get("metadata_dictlet", {})

        registered_lines = metadata.get("__debug__", {}).get("content", {})
        content_lines = registered_lines.get("content", [])
        content_filtered_lines = registered_lines.get("filtered", [])
        all_lines = registered_lines.get("all", [])
        metadata_content_lines = registered_lines.get("metadata", [])

        click.echo("")

        result = {}

        # metadata sections: (enabled, title, result key, value)
        metadata_sections = (
            (
                self.metadata_defaults,
                "metadata: extra vars",
                "metadata_defaults",
                metadata_defaults_vars,
            ),
            (self.metadata_user, "metadata: user vars", "metadata_user", metadata_user),
            (self.metadata_luci, "metadata: luci", "metadata_luci", metadata_luci),
            (
                self.metadata_dictlet,
                "metadata: dictlet",
                "metadata_dictlet",
                metadata_dictlet,
            ),
            (self.metadata_merged, "metadata (merged)", "metadata_merged", metadata),
        )
        for enabled, title, key, value in metadata_sections:
            if enabled:
                output_item(title, value, output_type, pager)
                result[key] = value

        # content sections, always printed raw: (enabled, title, result key, lines)
        line_sections = (
            # Gated on 'all_content_lines' so that '--lines-all' on its own
            # prints this section; '--all-lines' and '--all' switch that flag
            # on as well, so they behave as before.
            (self.all_content_lines, "all lines", "all_lines", all_lines),
            (self.content_lines, "content lines", "content_lines", content_lines),
            (
                self.content_filtered_lines,
                "content filtered lines",
                "content_filtered_lines",
                content_filtered_lines,
            ),
            (
                self.metadata_lines,
                "metadata lines",
                "metadata_lines",
                metadata_content_lines,
            ),
        )
        for enabled, title, key, lines in line_sections:
            if enabled:
                output_item(title, "\n".join(flatten_lists(lines)), "raw", pager)
                result[key] = lines

        return result
class CatLucifier(Lucifier):
    """Minimal lucifier that prints the lines of a dictlet.

    By default the lines that were not processed by luci are printed; with
    the '--luci-content' flag the lines registered as luci metadata are
    printed instead.
    """

    LUCI_CAT = {
        "__lucify__": {
            "register": {
                "content_filtered": "__cat__.data",
                "metadata": "__cat__.metadata",
            },
            "doc": {
                "short_help": "output of the dictlet (minus the luci parts)",
                "epilog": "the 'cat' lucifier is part of the luci package",
            },
            "vars": {
                "__cat__.display_luci_contents": {
                    "type": bool,
                    "required": False,
                    "default": False,
                    "doc": {"help": "print the whole dictlet, including luci metadata"},
                    "click": {
                        "option": {
                            "is_flag": True,
                            "param_decls": ["--luci-content", "-l"],
                        }
                    },
                },
                "__cat__.pager": {
                    "type": bool,
                    "required": False,
                    "default": False,
                    "doc": {"help": "whether to use a pager for display"},
                    "click": {
                        "option": {
                            "param_decls": ["--pager", "-p"],
                            "type": bool,
                            "is_flag": True,
                        }
                    },
                },
            },
        }
    }

    DEFAULT_READER = "skip"

    def __init__(self, **kwargs):
        super(CatLucifier, self).__init__(**kwargs)

    def get_default_metadata(self):
        """Return the static lucifier configuration (``LUCI_CAT``)."""
        return CatLucifier.LUCI_CAT

    def process_dictlet(self, metadata, dictlet_details):
        """Print the selected dictlet lines as raw text and return that text.

        Args:
            metadata (dict): the processed dictlet metadata, including the
                registered '__cat__' values
            dictlet_details: details about the dictlet (not used here)

        Returns:
            str: the printed lines, joined with newlines
        """
        show_luci_part = get_key_path_value(metadata, "__cat__.display_luci_contents")
        use_pager = get_key_path_value(metadata, "__cat__.pager")

        # pick which registered set of lines to print
        source_key = "__cat__.metadata" if show_luci_part else "__cat__.data"
        lines = get_key_path_value(metadata, source_key)
        text = "\n".join(flatten_lists(lines))

        output(text, output_type="raw", safe=False, pager=use_pager)
        return text
class TemplateLucifier(Lucifier):
    """Render the dictlet content as a template, using the dictlet's own
    metadata as the replacement variables.
    """

    LUCI_TEMPLATE = {
        "__lucify__": {
            "register": {
                "content": "__template__.content.all",
                "metadata": "__template__.content.data",
                "content_filtered": "__template__.content.filtered",
            },
            "doc": {
                "short_help": "use luci metadata for (jinja2) templating, in the same dictlet",
                "epilog": "the 'template' lucifier is part of the luci package",
            },
            "vars": {
                "__user__.output_type": create_output_type_var(),
                "__user__.pager": create_pager_var(),
                "__user__.all": {
                    "type": bool,
                    "doc": {
                        "help": "whether to include parts that were being used by luci to extract metadata in the output"
                    },
                    "click": {
                        "option": {"is_flag": True, "param_decls": ["--all", "-a"]}
                    },
                },
            },
        }
    }

    def __init__(self, output_type=None, pager=None, all=False, **kwargs):
        # 'output_type', 'pager' and 'all' are accepted but not used here;
        # only the remaining keyword arguments reach the base class
        super(TemplateLucifier, self).__init__(**kwargs)

    def get_default_metadata(self):
        """Return the static lucifier configuration (``LUCI_TEMPLATE``)."""
        return TemplateLucifier.LUCI_TEMPLATE

    def process_dictlet(self, metadata, dictlet_details):
        """Render every registered content chunk and print the result.

        Args:
            metadata (dict): the processed dictlet metadata; used both to look
                up the registered content and as the replacement variables
            dictlet_details: details about the dictlet (not used here)
        """
        fmt = get_key_path_value(metadata, "__user__.output_type", default_value="raw")
        use_pager = get_key_path_value(metadata, "__user__.pager", default_value=False)
        include_all = get_key_path_value(metadata, "__user__.all", default_value=False)

        full_content = get_key_path_value(metadata, "__template__.content.all")
        filtered_content = get_key_path_value(metadata, "__template__.content.filtered")

        # with '--all' the lines luci consumed for metadata are rendered as well
        chunks = full_content if include_all else filtered_content

        for chunk in chunks:
            rendered = replace_string("\n".join(chunk), metadata)
            output(rendered, output_type=fmt, pager=use_pager)
class DownloadLucifier(Lucifier):
    """Lucifier that can update a template by downloading and replacing it on filesystem.

    The dictlet needs to contain a metadata key 'url' with a child key
    'default' and the remote url to download as value (a plain string value
    for 'url' is accepted as well). It can also, optionally, contain a child
    key 'version' and a templated string like
    'https://dl.com/dictlet-{{ version }}.yml' as value. The latter will be
    used if the '--version' option is specified (as described below).

    This lucifier adds a few command-line options to the dictlet-part of the
    command (not the lucifier part, as is normal -- mostly because I like it
    better that way in this case):

     - target ('--target', '-t'): a target file
     - replace ('--replace', '-r'): whether to replace an existing file
     - version ('--version'): whether to download a specific version of the file

    By default, this lucifier will print out the content of the remote file.
    If the '--replace' option is specified, the template itself will be
    replaced with the remote file. If the '--target' option is specified, the
    remote file will be downloaded to its value instead.
    """

    LUCI_DOWNLOAD = {
        "__lucify__": {
            "lucifiers": {
                "download": {
                    "reader": {"name": "yaml", "config": {"metadata_key": "url"}}
                }
            },
            "doc": {
                "short_help": "re-download/update a dictlet",
                "epilog": "the 'download' lucifier is part of the luci package",
            },
            "vars": {"__user__.pager": create_pager_var()},
            "cli": {
                "default_dictlet_vars": {
                    "__download__.replace": {
                        "type": bool,
                        "required": False,
                        "doc": {
                            "help": "if set, the target file will be overwritten (if '-t' is not specified, the original dictlet will be overwritten)"
                        },
                        "click": {
                            "option": {
                                "param_decls": ["--replace", "-r"],
                                "is_flag": True,
                            }
                        },
                    },
                    "__download__.target": {
                        "type": str,
                        "required": False,
                        "doc": {
                            "help": "if specified, the url content will be saved into the specified file, otherwise the dictlet itself will be replaced"
                        },
                        "click": {
                            "option": {
                                "type": str,
                                "param_decls": ["--target", "-t"],
                                "metavar": "PATH",
                            }
                        },
                    },
                    "__download__.version": {
                        "type": str,
                        "default": "default",
                        "required": False,
                        "alias": "version",
                        "doc": {
                            "help": "the version of the dictlet to download (optional)"
                        },
                        "click": {"option": {"metavar": "VERSION"}},
                    },
                },
                # TODO: a '__download__.backup' flag (backup of the dictlet
                # before it is replaced) is planned but not implemented yet
            },
        }
    }

    def __init__(self, **kwargs):
        super(DownloadLucifier, self).__init__(**kwargs)

    def get_default_metadata(self):
        """Return the static lucifier configuration (``LUCI_DOWNLOAD``)."""
        return DownloadLucifier.LUCI_DOWNLOAD

    @staticmethod
    def _resolve_target_file(target, replace, dictlet_details):
        """Return the path to write to, or None if the content goes to stdout."""
        if target:
            if not os.path.isdir(os.path.realpath(target)):
                return target
            # target is a directory: keep the file name of the dictlet
            dictlet_file = dictlet_details.get("path", None)
            if not dictlet_file:
                raise Exception("Dictlet file path not found in metadata.")
            return os.path.join(target, os.path.basename(dictlet_file))

        if not replace:
            return None

        # no target, but '--replace': overwrite the dictlet itself
        dictlet_file = dictlet_details.get("path", None)
        if not dictlet_file:
            raise Exception("Dictlet file path not found in metadata.")
        return dictlet_file

    @staticmethod
    def _resolve_url(metadata, version):
        """Return the url to download for the requested version."""
        url_value = metadata.get("url", None)

        if version == "default":
            if isinstance(url_value, dict):
                return url_value.get("default", None)
            if isinstance(url_value, string_types):
                return url_value
            if "url" not in metadata:
                raise Exception(
                    "Could not determine url, make sure to include the metadata key 'url' in your dictlet."
                )
            raise Exception(
                "'url' key provided, but value type not valid: {}".format(url_value)
            )

        # A versioned download needs a dict value holding a 'version' template.
        # Check this up front instead of calling '.get' on a plain string or
        # handing None to 'replace_string'.
        template = None
        if isinstance(url_value, dict):
            template = url_value.get("version", None)
        if not template:
            raise Exception(
                "Could not determine url for version '{}', make sure the metadata key 'url' in your dictlet contains a 'version' template.".format(
                    version
                )
            )
        return replace_string(template, {"version": version})

    def process_dictlet(self, metadata, dictlet_details):
        """Download the remote file and either print it or save it to a file.

        Args:
            metadata (dict): the processed dictlet metadata; provides the 'url'
                key as well as the '__download__' and '__user__' options
            dictlet_details (dict): details about the dictlet; its 'path' entry
                is used when the dictlet itself is replaced or when the target
                is a directory

        Returns:
            None

        Raises:
            Exception: if the target path or the url can't be determined, or
                if the download fails
        """
        pager = get_key_path_value(metadata, "__user__.pager", default_value=False)
        version = get_key_path_value(
            metadata, "__download__.version", default_value="default"
        )
        replace = get_key_path_value(
            metadata, "__download__.replace", default_value=False
        )
        target = get_key_path_value(metadata, "__download__.target", default_value=None)

        target_file = self._resolve_target_file(target, replace, dictlet_details)
        if target_file:
            log.debug("target file: {}".format(target_file))
        else:
            log.debug("no target file specified, only outputting content to stdout")

        url = self._resolve_url(metadata, version)
        if not url:
            raise Exception(
                "Could not determine url, make sure to include the metadata key 'url' in your dictlet."
            )

        if target_file and os.path.exists(target_file) and not replace:
            click.echo("")
            click.echo(
                "file '{}' already exists, and '--replace' option not specified, doing nothing...".format(
                    target_file
                )
            )
            return

        if target_file:
            # progress messages only when saving to a file, so that stdout
            # carries nothing but the content otherwise
            click.echo("")
            click.echo("Downloading: {}".format(url))

        # ask caches along the way not to serve a stale copy
        headers = {
            "Cache-Control": "no-cache,no-store,must-revalidate,max-age=0",
            "Expires": "0",
        }

        try:
            log.debug("downloading url: {}".format(url))
            r = requests.get(url, verify=True, headers=headers)
            r.raise_for_status()
            content = r.text
        except Exception as e:
            raise Exception("Could not download file: {}".format(url), e)

        if not target_file:
            output(content, output_type="raw", pager=pager)
        else:
            # TODO: implement backup of original file
            with open(target_file, "w") as w:
                click.echo("Saving file: {}".format(target_file))
                w.write(content)
            click.echo("Done.")