import logging
from collections import OrderedDict
import click
import os
import requests
from six import string_types
from frutils import flatten_lists, get_key_path_value, replace_string
from frutils.frutils_cli import (
create_flag_var,
create_output_type_var,
create_pager_var,
output,
output_item,
)
from frutils.defaults import KEY_LUCI_NAME
from .lucify import Lucifier
log = logging.getLogger("lucify")
[docs]class DebugLucifier(Lucifier):
"""Lucifier to be used when debugging dictlets.
It let's you print debug information of both metadata and content of a dictlet to stdout.
"""
LUCI_DEBUG = {
"__lucify__": {
"register": {
"content": "__debug__.content.content",
"content_filtered": "__debug__.content.filtered",
"metadata": "__debug__.content.metadata",
"all": "__debug__.content.all",
},
"doc": {
"short_help": "displays relevant parsing information for dictlets",
"epilog": "the 'debug' lucifier is part of the luci package",
},
"vars": OrderedDict(
{
"__dictlet__.debug.all": create_flag_var(
"display all available information", ["--all", "-a"]
),
"__dictlet__.debug.all_metadata": create_flag_var(
"display all available metadata information",
["--all-metadata", "-ma"],
),
"__dictlet__.debug.all_lines": create_flag_var(
"display all line-related information ('content', 'content_filtered', 'metadata')",
["--all-lines", "-l"],
),
"__dictlet__.debug.metadata": create_flag_var(
"display the merged metadata)", ["--metadata", "-m"]
),
"__dictlet__.debug.metadata_defaults": create_flag_var(
"display default vars", ["--metadata-defaults", "-md"]
),
"__dictlet__.debug.metadata_user": create_flag_var(
"display user-input arguments", ["--metadata-user", "-mu"]
),
"__dictlet__.debug.metadata_luci": create_flag_var(
"display luci-related metadata", ["--metadata-luci", "-ml"]
),
"__dictlet__.debug.metadata_dictlet": create_flag_var(
"display processed dictlet metadata",
["--metadata-dictlet", "-mt"],
),
"__dictlet__.debug.lines_content_all": create_flag_var(
"display all lines of a template", ["--lines-all", "-la"]
),
"__dictlet__.debug.lines_metadata": create_flag_var(
"display all lines of a template that were processed by luci",
["--lines-metadata", "-lm"],
),
"__dictlet__.debug.lines_content": create_flag_var(
"display all lines of a template that were skipped by luci processing and will be used as content",
["--lines-content", "-lc"],
),
"__dictlet__.debug.lines_content_filtered": create_flag_var(
"display all lines of a template that were skipped by luci processing and are pre-filtered",
["--lines-filtered", "-lf"],
),
"__dictlet__.output_type": create_output_type_var(),
"__dictlet__.pager": create_pager_var(),
}
),
}
}
def __init__(self, **kwargs):
super(DebugLucifier, self).__init__(**kwargs)
[docs] def process_control_options(self, dictlet_properties):
"""Just a utility method to set the output options specified in the user input."""
self.all = dictlet_properties["all"]
self.all_metadata = dictlet_properties["all_metadata"]
self.all_lines = dictlet_properties["all_lines"]
self.metadata_merged = dictlet_properties["metadata"]
self.metadata_luci = dictlet_properties["metadata_luci"]
self.metadata_user = dictlet_properties["metadata_user"]
self.metadata_defaults = dictlet_properties["metadata_defaults"]
self.metadata_dictlet = dictlet_properties["metadata_dictlet"]
# self.parse_results = dictlet_properties.get("parse_results", {})
self.content_lines = dictlet_properties["lines_content"]
self.content_filtered_lines = dictlet_properties["lines_content_filtered"]
self.metadata_lines = dictlet_properties["lines_metadata"]
self.all_content_lines = dictlet_properties["lines_content_all"]
if (
not self.all
and not self.all_metadata
and not self.all_lines
and not self.metadata_merged
and not self.metadata_luci
and not self.metadata_user
and not self.metadata_defaults
and not self.metadata_dictlet
and not self.content_lines
and not self.content_filtered_lines
and not self.metadata_lines
and not self.all_content_lines
):
self.metadata_merged = True
self.content_lines = True
if self.all:
self.all_lines = True
self.all_metadata = True
# self.parse_results = True
if self.all_lines:
self.content_lines = True
self.content_filtered_lines = True
self.metadata_lines = True
self.all_content_lines = True
if self.all_metadata:
self.metadata_merged = True
self.metadata_luci = True
self.metadata_user = True
self.metadata_defaults = True
self.metadata_dictlet = True
# disable for now
# self.parsed_results = False
[docs] def process_dictlet(self, metadata, dictlet_details):
dictlet_options = metadata.pop("__dictlet__", {})
output_type = dictlet_options.get("output_type", "yaml")
pager = dictlet_options.get("pager", False)
self.process_control_options(dictlet_options["debug"])
debug_vars = self.metadata.get("__debug__", {})
metadata_defaults_vars = debug_vars.get("metadata_defaults", {})
metadata_user = debug_vars.get("metadata_user", {})
metadata_luci = debug_vars.get("metadata_luci", {})
metadata_dictlet = debug_vars.get("metadata_dictlet", {})
dictlet_debug_vars = metadata.get("__debug__", {})
content_lines = dictlet_debug_vars.get("content", {}).get("content", [])
content_filtered_lines = dictlet_debug_vars.get("content", {}).get(
"filtered", []
)
all_lines = dictlet_debug_vars.get("content", {}).get("all", [])
metadata_content_lines = dictlet_debug_vars.get("content", {}).get(
"metadata", []
)
click.echo("")
# if self.parse_results:
# self.output_item("parsed results", parse_results, self.format, self.pager)
result = {}
# metadata
if self.metadata_defaults:
output_item(
"metadata: extra vars", metadata_defaults_vars, output_type, pager
)
result["metadata_defaults"] = metadata_defaults_vars
if self.metadata_user:
output_item("metadata: user vars", metadata_user, output_type, pager)
result["metadata_user"] = metadata_user
if self.metadata_luci:
output_item("metadata: luci", metadata_luci, output_type, pager)
result["metadata_luci"] = metadata_luci
if self.metadata_dictlet:
output_item("metadata: dictlet", metadata_dictlet, output_type, pager)
result["metadata_dictlet"] = metadata_dictlet
if self.metadata_merged:
output_item("metadata (merged)", metadata, output_type, pager)
result["metadata_merged"] = metadata
# content
if self.all_lines:
output_item("all lines", "\n".join(flatten_lists(all_lines)), "raw", pager)
result["all_lines"] = all_lines
if self.content_lines:
output_item(
"content lines", "\n".join(flatten_lists(content_lines)), "raw", pager
)
result["content_lines"] = content_lines
if self.content_filtered_lines:
output_item(
"content filtered lines",
"\n".join(flatten_lists(content_filtered_lines)),
"raw",
pager,
)
result["content_filtered_lines"] = content_filtered_lines
if self.metadata_lines:
output_item(
"metadata lines",
"\n".join(flatten_lists(metadata_content_lines)),
"raw",
pager,
)
result["metadata_lines"] = metadata_content_lines
return result
[docs]class CatLucifier(Lucifier):
"""Very simple lucifier, outputs the part of the dictlet that is not processed by luci.
"""
LUCI_CAT = {
"__lucify__": {
"register": {
"content_filtered": "__cat__.data",
"metadata": "__cat__.metadata",
},
"doc": {
"short_help": "output of the dictlet (minus the luci parts)",
"epilog": "the 'cat' lucifier is part of the luci package",
},
"vars": {
"__cat__.display_luci_contents": {
"type": bool,
"required": False,
"default": False,
"doc": {"help": "print the whole dictlet, including luci metadata"},
"click": {
"option": {
"is_flag": True,
"param_decls": ["--luci-content", "-l"],
}
},
},
"__cat__.pager": {
"type": bool,
"required": False,
"default": False,
"doc": {"help": "whether to use a pager for display"},
"click": {
"option": {
"param_decls": ["--pager", "-p"],
"type": bool,
"is_flag": True,
}
},
},
},
}
}
DEFAULT_READER = "skip"
def __init__(self, **kwargs):
super(CatLucifier, self).__init__(**kwargs)
[docs] def process_dictlet(self, metadata, dictlet_details):
luci_content = get_key_path_value(metadata, "__cat__.display_luci_contents")
pager = get_key_path_value(metadata, "__cat__.pager")
if luci_content:
processed = get_key_path_value(metadata, "__cat__.metadata")
result = "\n".join(flatten_lists(processed))
else:
data = get_key_path_value(metadata, "__cat__.data")
result = "\n".join(flatten_lists(data))
output(result, output_type="raw", safe=False, pager=pager)
return result
[docs]class TemplateLucifier(Lucifier):
"""Lucifier that uses the dictlet metadata as variables to replace strings in the dictlet content.
"""
LUCI_TEMPLATE = {
"__lucify__": {
"register": {
"content": "__template__.content.all",
"metadata": "__template__.content.data",
"content_filtered": "__template__.content.filtered",
},
"doc": {
"short_help": "use luci metadata for (jinja2) templating, in the same dictlet",
"epilog": "the 'template' lucifier is part of the luci package",
},
"vars": {
"__user__.output_type": create_output_type_var(),
"__user__.pager": create_pager_var(),
"__user__.all": {
"type": bool,
"doc": {
"help": "whether to include parts that were being used by luci to extract metadata in the output"
},
"click": {
"option": {"is_flag": True, "param_decls": ["--all", "-a"]}
},
},
},
}
}
def __init__(self, output_type=None, pager=None, all=False, **kwargs):
super(TemplateLucifier, self).__init__(**kwargs)
[docs] def process_dictlet(self, metadata, dictlet_details):
output_type = get_key_path_value(
metadata, "__user__.output_type", default_value="raw"
)
pager = get_key_path_value(metadata, "__user__.pager", default_value=False)
all = get_key_path_value(metadata, "__user__.all", default_value=False)
content_all = get_key_path_value(metadata, "__template__.content.all")
# content_data = get_key_path_value(metadata, "__template__.content.data")
content_sensible = get_key_path_value(metadata, "__template__.content.filtered")
if not all:
template = content_sensible
else:
template = content_all
for t in template:
result = replace_string("\n".join(t), metadata)
output(result, output_type=output_type, pager=pager)
[docs]class DownloadLucifier(Lucifier):
"""Lucifier that can update a template by downloading and replacing it on filesystem.
The dictlet needs to contain a metadata key 'url' with a child key 'default' and the remote url to download as value. It can also, optionally, contain a child key 'version' and a templated string like 'https://dl.com/dictlet-{{ version }}.yml' as value. The latter will be used if the '--version' option is specified (as described below).
This lucifier adds a few command-line options to the dictlet-part of the command (not the lucifier part, as is normal -- mostly because I like it better that way in this case):
- target ('--target', '-t'): a target file
- replace ('--replace', '-r'): whether to replace an existing file
- version ('--version): whether to download a specific version of the file
By default, this lucifier will print out the content of the remote file. If the '--replace' option is specified, the template itself will be replaced with the remote file. If the '--target' option is specified, the remote file will be downloaded to it's value instead.
"""
LUCI_DOWNLOAD = {
"__lucify__": {
"lucifiers": {
"download": {
"reader": {"name": "yaml", "config": {"metadata_key": "url"}}
}
},
"doc": {
"short_help": "re-download/update a dictlet",
"epilog": "the 'template' lucifier is part of the luci package",
},
"vars": {"__user__.pager": create_pager_var()},
"cli": {
"default_dictlet_vars": {
"__download__.replace": {
"type": bool,
"required": False,
"doc": {
"help": "if set, the target file will be overwritten (if '-t' is not specified, the original dictlet will be overwritten)"
},
"click": {
"option": {
"param_decls": ["--replace", "-r"],
"is_flag": True,
}
},
},
"__download__.target": {
"type": str,
"required": False,
"doc": {
"help": "if specified, the url content will be saved into the specified file, otherwise the dictlet itself will be replaced"
},
"click": {
"option": {
"type": str,
"param_decls": ["--target", "-t"],
"metavar": "PATH",
}
},
},
"__download__.version": {
"type": str,
"default": "default",
"required": False,
"alias": "version",
"doc": {
"help": "the version of the dictlet to download (optional)"
},
"click": {"option": {"metavar": "VERSION"}},
},
},
# "__download__.backup": {
# "meta": {
# "type": bool,
# "default": False,
# "required": False
# },
# "doc": {
# "help": "whether to backup the dictlet or not"
# },
# "cli": {
# "option": {
# "is_flag": True
# }
# }
# }
},
}
}
def __init__(self, **kwargs):
super(DownloadLucifier, self).__init__(**kwargs)
[docs] def process_dictlet(self, metadata, dictlet_details):
# utils.output(metadata, output_type="yaml")
# utils.output(dictlet_details, output_type="yaml")
pager = get_key_path_value(metadata, "__user__.pager", default_value=False)
version = get_key_path_value(
metadata, "__download__.version", default_value="default"
)
replace = get_key_path_value(
metadata, "__download__.replace", default_value=False
)
target = get_key_path_value(metadata, "__download__.target", default_value=None)
if target:
target_file = target
if os.path.isdir(os.path.realpath(target_file)):
dictlet_file = dictlet_details.get("path", None)
if not dictlet_file:
raise Exception("Dictlet file path not found in metadata.")
target_file = os.path.join(target_file, os.path.basename(dictlet_file))
else:
if replace:
target_file = dictlet_details.get("path", None)
if not target_file:
raise Exception("Dictlet file path not found in metadata.")
else:
target_file = None
if target_file:
log.debug("target file: {}".format(target_file))
else:
log.debug("no target file specified, only outputting content to stdout")
if version == "default":
if isinstance(metadata.get("url", None), dict):
url = metadata.get("url", {}).get("default", None)
elif isinstance(metadata.get("url", None), string_types):
url = metadata.get("url")
else:
if "url" not in metadata.keys():
raise Exception(
"Could not determine url, make sure to include the metadata key 'url' in your dictlet."
)
else:
raise Exception(
"'url' key provided, but value type not valid: {}".format(
metadata.get("url")
)
)
else:
url = metadata.get("url", {}).get("version", None)
rep_dict = {"version": version}
url = replace_string(url, rep_dict)
if not url:
raise Exception(
"Could not determine url, make sure to include the metadata key 'url' in your dictlet."
)
if target_file and os.path.exists(target_file):
if not replace:
click.echo("")
click.echo(
"file '{}' already exists, and '--replace' option not specified, doing nothing...".format(
target_file
)
)
return
if target_file:
click.echo("")
click.echo("Downloading: {}".format(url))
verify_ssl = True
disable_cache = True
headers = {}
if disable_cache:
headers = {
"Cache-Control": "no-cache,no-store,must-revalidate,max-age=0",
"Expires": "0",
}
try:
log.debug("downloading url: {}".format(url))
r = requests.get(url, verify=verify_ssl, headers=headers)
r.raise_for_status()
content = r.text
except Exception as e:
raise Exception("Could not download file: {}".format(url), e)
if not target_file:
output(content, output_type="raw", pager=pager)
else:
# TODO: implement backup of original file
with open(target_file, "w") as w:
click.echo("Saving file: {}".format(target_file))
w.write(content)
click.echo("Done.")