# -*- coding: utf-8 -*-
import abc
import copy
import logging
import os
import re
from collections import OrderedDict
import six
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap
from frutils import (
add_key_to_dict,
append_key_to_dict,
dict_merge,
ordered_load,
replace_string,
)
from frutils.defaults import (
DEFAULT_EXCLUDE_DIRS,
JINJA_DELIMITER_PROFILES,
KEY_LUCI_NAME,
)
from .defaults import *
from .exceptions import DictletParseException
log = logging.getLogger("lucify")
def create_dictlet_reader(dictlet_reader_name, dictlet_reader_config=None):
"""Create a dictlet reader with the provided config.
"""
from .plugins import load_dictlet_reader_extension
if dictlet_reader_config is None:
dictlet_reader_config = {}
dictlet_reader = load_dictlet_reader_extension(
dictlet_reader_name, init_params=dictlet_reader_config
)
return dictlet_reader
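# A minimal usage sketch (illustrative only: the reader name "yaml" and the dictlet
# path are assumptions, the available names depend on which plugins
# load_dictlet_reader_extension knows about):
#
#     reader = create_dictlet_reader("yaml")
#     metadata = reader.read_dictlet({"path": "my_dictlet.yml"}, OrderedDict())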
# classes
@six.add_metaclass(abc.ABCMeta)
class DictletReader(object):
"""Abstract base class for a reader that can parse a dictlet of a certain format."""
def __init__(self, **kwargs):
if "metadata_key" in kwargs.keys():
self.metadata_key = kwargs["metadata_key"]
else:
self.metadata_key = KEY_LUCI_NAME
    @abc.abstractmethod
def get_content(self, dictlet_path, current_metadata):
pass
    @abc.abstractmethod
def process_content(
self, content, current_vars, luci_metadata, luci_metadata_key_name=KEY_LUCI_NAME
):
"""Processes a set of lines in a certain format.
Returns:
dict: the values of the processed dictlet
"""
pass
    def read_dictlet(self, dictlet_details, current_metadata, luci_metadata={}):
log.debug("reading dictlet: {}".format(dictlet_details))
cm = copy.deepcopy(current_metadata)
try:
content = self.get_content(dictlet_details, cm)
        except DictletParseException:
            raise
        except Exception as e:
raise DictletParseException(e, dictlet_details)
dictlet_details["content"] = content
result = self.process_content(
content, cm, luci_metadata, luci_metadata_key_name=self.metadata_key
)
dict_merge(current_metadata, result, copy_dct=False)
return current_metadata
class ClassDictletReader(DictletReader):
"""Reader that reads class properties for :class:`~lucify.Lucifier` sub-classes."""
def __init__(self, **kwargs):
super(ClassDictletReader, self).__init__(**kwargs)
    def get_content(self, dictlet_details, current_metadata):
dictlet_class = dictlet_details.get("class", None)
if not dictlet_class:
log.debug(
"No 'class' property in dictlet details: {}".format(dictlet_details)
)
return None
log.debug("Reading class: {}".format(dictlet_class))
dictlet_obj = dictlet_class(init_metadata=current_metadata)
return dict(dictlet_obj.metadata)
    def process_content(
self,
content,
current_metadata,
luci_metadata,
luci_metadata_key_name=KEY_LUCI_NAME,
):
reader_keys = (
luci_metadata.get("reader_config", {}).get("class", {}).get("keys", {})
)
for key, mapped_key in reader_keys.items():
value = content.get(key, None)
if value is None:
log.debug(
"No value in class for attribute '{}', skipping...".format(key)
)
continue
temp = OrderedDict()
add_key_to_dict(temp, mapped_key, value, split_token=".")
dict_merge(current_metadata, temp, copy_dct=False)
return current_metadata
class TextFileDictletReader(DictletReader):
"""Parent class for readers that read text files.
"""
def __init__(self, **kwargs):
super(TextFileDictletReader, self).__init__(**kwargs)
    @staticmethod
def parse_lucify_line(line, luci_key_word=KEY_LUCI_NAME):
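        # Scan the word tokens of a single marker line: tokens before the reader
        # marker are skipped; after it, a start token (or the luci keyword) turns
        # processing on and a stop token turns it off. If the line names neither,
        # processing defaults to on.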
tokens = re.findall(r"[\w']+", line)
lucify = False
# ignore_prefix = ""
started = None
for token in tokens:
token = token.lower()
if not lucify and not token == DICTLET_READER_MARKER_STRING:
continue
if not lucify and token == DICTLET_READER_MARKER_STRING:
lucify = True
continue
if token in [DICTLET_START_MARKER_STRING, luci_key_word]:
if started is not None:
raise Exception(
"Both '{}' and '{}' tokens in line, can't process: {}".format(
DICTLET_STOP_MARKER_STRING,
DICTLET_START_MARKER_STRING,
line,
)
)
started = True
elif token == DICTLET_STOP_MARKER_STRING:
if started is not None:
raise Exception(
"Both '{}' and '{}' tokens in line, can't process: {}".format(
DICTLET_START_MARKER_STRING,
DICTLET_STOP_MARKER_STRING,
line,
)
)
started = False
else:
log.debug("ignoring token: '{}'".format(token))
result = {"line": line}
if started is None:
started = True
result["process"] = started
return result
    @abc.abstractmethod
def process_lines(self, current_content, current_vars):
pass
    def get_content(self, dictlet_details, current_metadata):
dictlet_path = dictlet_details.get("path", None)
if not dictlet_path:
raise Exception("No path provided for details: {}".format(dictlet_details))
dictlet_path = os.path.realpath(dictlet_path)
if not os.path.exists(dictlet_path):
raise Exception("Path does not exist: {}".format(dictlet_path))
file_type = dictlet_details.get("type", None)
if file_type is None:
if os.path.isdir(dictlet_path):
file_type = "folder"
elif os.path.isfile(dictlet_path):
file_type = "file"
else:
raise Exception(
"Can't determine file-type for: {}".format(dictlet_path)
)
if file_type == "file":
if os.path.isfile(dictlet_path):
with open(dictlet_path) as dictlet_file:
content = dictlet_file.read()
return content
else:
raise Exception("No file for path: {}".format(dictlet_path))
elif file_type == "folder":
if os.path.isdir(dictlet_path):
return None
else:
raise Exception("No folder for path: {}".format(dictlet_path))
    def process_content(
self,
content_orig,
current_metadata,
luci_metadata,
luci_metadata_key_name=KEY_LUCI_NAME,
):
ignore_prefix = None
all_lines = []
no_marker = True
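        # Three ways to determine the comment prefix that hides the metadata:
        #   1. neither the luci key nor the reader marker occurs -> the whole file
        #      is treated as metadata (empty ignore_prefix)
        #   2. only the luci key occurs -> whatever precedes it on that (single)
        #      line is used as ignore_prefix
        #   3. the reader marker occurs -> all marker lines must share the same
        #      prefix, which is used as ignore_prefix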
if (
luci_metadata_key_name not in content_orig
and DICTLET_READER_MARKER_STRING not in content_orig
):
log.debug("No maker found in file, treating all content as metadata")
ignore_prefix = ""
no_marker = True
elif (
luci_metadata_key_name in content_orig
and DICTLET_READER_MARKER_STRING not in content_orig
):
matches = [
line
for line in content_orig.split("\n")
if luci_metadata_key_name in line
]
if len(matches) > 1:
raise Exception(
"More than one line containing marker string '{}' found, can't process...".format(
luci_metadata_key_name
)
)
ignore_prefix = matches[0].partition(luci_metadata_key_name)[0]
no_marker = False
log.debug(
"'{}'-marker found in file, using '{}' as ignore_prefix".format(
luci_metadata_key_name, ignore_prefix
)
)
else:
log.debug(
"Using '{}'-keyword to determine ignore_prefix".format(
DICTLET_READER_MARKER_STRING
)
)
matches = {
line.partition(DICTLET_READER_MARKER_STRING)[0]
for line in content_orig.split("\n")
if DICTLET_READER_MARKER_STRING in line
}
if len(matches) != 1:
raise Exception(
"Different prefixes for lucify marker strings: {}".format(matches)
)
ignore_prefix = next(iter(matches))
no_marker = False
process = False
ln = 0
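        # Walk the file and classify every line into a
        # (line_number, is_lucify_line, use_as_metadata, line) tuple; marker lines
        # toggle the 'process' flag via parse_lucify_line.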
for line in content_orig.split("\n"):
ln = ln + 1
is_lucify_line = False
if DICTLET_READER_MARKER_STRING in line:
is_lucify_line = True
details = TextFileDictletReader.parse_lucify_line(
line, self.metadata_key
)
process = details["process"]
if (not process and luci_metadata_key_name in line) or no_marker:
process = True
if process and (
line.startswith(ignore_prefix)
or (ignore_prefix.strip() and line.startswith(ignore_prefix.strip()))
):
all_lines.append((ln, is_lucify_line, True, line))
else:
all_lines.append((ln, is_lucify_line, False, line))
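        # Split the classified lines: metadata lines get the ignore_prefix stripped
        # off, everything else is kept verbatim as content.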
process_lines = [
line[3][len(ignore_prefix) :] # noqa: E203
for line in all_lines
if not line[1] and line[2]
]
content_lines = [line[3] for line in all_lines if not line[1] and not line[2]]
sensible_content = [
line[3]
for line in all_lines
if self.is_content_line(line[3], sensible=True)
]
current_vars = self.process_lines(process_lines, current_metadata)
register_vars = luci_metadata.get("register", {})
for key, value in register_vars.items():
if key == "content":
append_key_to_dict(current_vars, value, content_lines)
elif key == "metadata":
append_key_to_dict(current_vars, value, process_lines)
elif key == "content_filtered":
append_key_to_dict(current_vars, value, sensible_content)
elif key == "all":
append_key_to_dict(current_vars, value, content_orig.split("\n"))
return current_vars
    def is_content_line(self, line, sensible=True):
return not line.startswith("#")
class YamlDictletReader(DictletReader):
def __init__(self, type="dict", **kwargs):
super(YamlDictletReader, self).__init__(**kwargs)
self.yaml = YAML()
self.yaml.default_flow_style = False
    def get_content(self, dictlet_details, current_metadata):
dictlet_type = dictlet_details.get("type", "file")
if not dictlet_type == "file":
raise Exception("Unsupported dictlet type: {}".format(dictlet_type))
path = dictlet_details.get("path", None)
if path is None:
raise Exception("No path provided for dictlet: {}".format(dictlet_details))
if not os.path.isfile(os.path.realpath(path)):
raise Exception("Dictlet not a file: {}".format(path))
with open(path, "r") as f:
content = self.yaml.load(f)
return content
    def process_content(
self, content, current_vars, luci_metadata, luci_metadata_key_name=KEY_LUCI_NAME
):
dict_merge(current_vars, content, copy_dct=False)
return current_vars
class LucifyYamlDictionaryDictletReader(TextFileDictletReader):
"""Reader for yaml-formatet dictlets.
Basically, yaml-structures that contain a dict as it's root element. The yaml-structure
can be contained in the files comments.
"""
SHORT_HELP = "reader for yaml formatted templates"
def __init__(self, delimiter_profile=JINJA_DELIMITER_PROFILES["luci"], **kwargs):
super(LucifyYamlDictionaryDictletReader, self).__init__(**kwargs)
self.delimiter_profile = delimiter_profile
    def process_lines(self, content, current_vars):
log.debug("Processing: {}".format(content))
# now, I know this isn't really the most
# optimal way of doing this,
# but I don't really care that much about execution speed yet,
# plus I really want to be able to use variables used in previous
# lines of the content
last_whitespaces = 0
current_lines = ""
temp_vars = copy.deepcopy(current_vars)
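        # Accumulate lines and, whenever the indentation falls back to (or below) the
        # previous level, render the accumulated chunk as a template against the vars
        # collected so far and merge the resulting yaml into temp_vars.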
for line in content:
if line.strip().startswith("#"):
continue
whitespaces = len(line) - len(line.lstrip(" "))
current_lines = "{}{}\n".format(current_lines, line)
if whitespaces <= last_whitespaces:
temp = replace_string(
current_lines, temp_vars, **self.delimiter_profile
)
if not temp.strip():
continue
temp_dict = ordered_load(temp)
temp_vars = dict_merge(temp_vars, temp_dict, copy_dct=False)
last_whitespaces = whitespaces
temp = replace_string(current_lines, temp_vars, **self.delimiter_profile)
temp_dict = ordered_load(temp)
temp_vars = dict_merge(temp_vars, temp_dict, copy_dct=False)
dict_merge(current_vars, temp_vars, copy_dct=False)
log.debug("Vars after processing: {}".format(current_vars))
return current_vars
class FilesInFolderReader(DictletReader):
"""DictletReader to find all files in a folder and use their paths and filenames for metadata.
"""
def __init__(self, base_url_var_name="base_url", **kwargs):
super(FilesInFolderReader, self).__init__(**kwargs)
self.use_first_parent_as_type = True
self.base_url_var_name = base_url_var_name
    def is_usable_file(self, path):
f = os.path.basename(path)
result = not f.startswith(".")
return result
    def get_content(self, dictlet_details, current_metadata):
"""Finds all files in this folder (recursively).
"""
path = dictlet_details["path"]
result = CommentedMap()
dictlet_type = dictlet_details.get("type", None)
if dictlet_type is None:
if os.path.isdir(os.path.realpath(path)):
dictlet_type = "folder"
else:
dictlet_type = "file"
if dictlet_type != "folder":
raise Exception("Dictlet is not a folder.")
for root, dirnames, filenames in os.walk(path, topdown=True):
dirnames[:] = [d for d in dirnames if d not in DEFAULT_EXCLUDE_DIRS]
for filename in [
f for f in filenames if self.is_usable_file(os.path.join(root, f))
]:
file_path = os.path.join(root, filename)
if filename in result.keys():
                    log.warning(
                        "Duplicate package name '{}', this might cause unpredictable behaviour.".format(
                            filename
                        )
                    )
file_path = os.path.relpath(file_path, path)
result[filename] = file_path
return result
    def process_content(
self, content, current_metadata, luci_metadata, luci_metadata_key_name=None
):
for pkg_name, path in content.items():
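            # the quadruple braces survive str.format as a literal jinja-style
            # placeholder, e.g. '{{ base_url }}'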
url = os.path.join(
"{{{{ {} }}}}".format(self.base_url_var_name), "{}".format(path)
)
if pkg_name in current_metadata.keys():
                log.warning(
                    "Duplicate package name '{}', this might cause unpredictable behaviour.".format(
                        pkg_name
                    )
                )
current_metadata[pkg_name] = {"download": {"url": url}}
return current_metadata
def PICK_ALL_FILES_FUNCTION(path):
"""Default implementation of an 'is_*_file' method."""
if path.endswith(".lupkg"):
return False
else:
return os.path.basename(path)
def PICK_ALL_FILES_WITH_LUPKG_EXTENSION(path):
"""Filter that picks files that end with '.lupkg'."""
if os.path.basename(path) != "meta.lupkg" and path.endswith(".lupkg"):
return os.path.splitext(os.path.basename(path))[0]
else:
return False
# def PICK_ALL_FILES_NAMED_META_LUPKG(path):
# """Filter that picks files called 'meta.lupkg'."""
#
# return os.path.basename(path) == "meta.lupkg"
PKG_NAME_STRATEGIES = ["filename", "basename", "foldername", "path", "path_basename"]
class LupkgFolderReader(DictletReader):
"""Feature-ful folder reader used for lupkg.
    This one reads folders containing either only files (in which case the file-name is used as pkg-name),
    only metadata-files, or a combination of both. In the latter case, the file and the (side-car) metadata file
    need to have the same file-name and can only differ in the extension.
TODO: list pkg name strategies
Args:
use_files (bool): whether to add non-metadata-files to the index
        get_pkg_name_function (func): the function used to derive a package name from a file path (return False to skip the file)
        use_metadata_files (bool): whether to add packages described in metadata files to the index
        get_pkg_name_function_metadata (func): the function used to derive a package name from a metadata file path (return False to skip the file)
use_parent_metadata_files (bool): whether to inherit metadata by placing 'non-package' metadata files in the folder tree
parent_meta_file_name (str): the name of potential meta parent files
include_relative_path_to_files (bool): whether to include the relative paths to a file in the file url dict
use_subfolders_as_tags (bool): whether to automatically add subfolder names as tags to the metadata
safe_load (bool): whether to 'safe' load yaml
reader_config_filename (str): filename in root of folder to contain default reader settings
"""
def __init__(
self,
use_files=None,
get_pkg_name_function=None,
use_metadata_files=None,
get_pkg_name_function_metadata=None,
use_parent_metadata_files=None,
parent_meta_file_name=None,
include_relative_path_to_files=None,
use_subfolders_as_tags=None,
safe_load=True,
reader_config_filename=".lupkg_repo",
**kwargs
):
super(LupkgFolderReader, self).__init__(**kwargs)
if use_parent_metadata_files and not use_metadata_files:
raise Exception(
"Invalid configuration. Can't use parent metadata but not 'normal' metadata files."
)
self.use_files = use_files
if get_pkg_name_function is None:
get_pkg_name_function = PICK_ALL_FILES_FUNCTION
self.get_pkg_name_function = get_pkg_name_function
self.use_metadata_files = use_metadata_files
if get_pkg_name_function_metadata is None:
get_pkg_name_function_metadata = PICK_ALL_FILES_WITH_LUPKG_EXTENSION
self.get_pkg_name_function_metadata = get_pkg_name_function_metadata
self.use_parent_metadata_files = use_parent_metadata_files
if parent_meta_file_name is None:
parent_meta_file_name = "meta.lupkg"
self.parent_meta_file_name = parent_meta_file_name
self.include_relative_path_to_files = include_relative_path_to_files
self.use_subfolders_as_tags = use_subfolders_as_tags
self.safe_load = safe_load
self.reader_config_filename = reader_config_filename
    def get_content(self, dictlet_details, current_metadata):
path = dictlet_details["path"]
abs_path = os.path.abspath(os.path.realpath(path))
result = CommentedMap()
meta_dicts = {path: CommentedMap()}
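        # meta_dicts maps every visited folder to the metadata it inherits from its
        # parent folders (only used when parent metadata files are enabled)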
dictlet_type = dictlet_details.get("type", None)
if dictlet_type is None:
if os.path.isdir(os.path.realpath(path)):
dictlet_type = "folder"
else:
dictlet_type = "file"
if self.safe_load:
yaml = YAML(typ="safe")
else:
yaml = YAML()
if dictlet_type == "file":
if self.use_files and self.get_pkg_name_function(path):
pass
elif self.use_metadata_files and self.get_pkg_name_function_metadata(path):
pass
else:
return result
with open(path, "r") as fi:
yaml_dict = yaml.load(fi)
if not yaml_dict:
log.info("file '{}' is empty, ignoring".format(path))
else:
result[path] = yaml_dict
return result
use_files = self.use_files
use_metadata_files = self.use_metadata_files
use_parent_metadata_files = self.use_parent_metadata_files
use_subfolders_as_tags = self.use_subfolders_as_tags
include_relative_path_to_files = self.include_relative_path_to_files
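        # options not passed to the constructor may be filled in from a reader config
        # file in the repository root; anything still unset falls back to the
        # hard-coded defaults below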
if self.reader_config_filename:
default_reader_config_file = os.path.join(
abs_path, self.reader_config_filename
)
if os.path.exists(default_reader_config_file) and os.path.isfile(
default_reader_config_file
):
yaml_read = YAML(typ="safe")
with open(default_reader_config_file) as f:
conf = yaml_read.load(f)
reader_conf = conf.get("reader", {})
if use_files is None:
if "use_files" in reader_conf.keys():
use_files = reader_conf["use_files"]
if use_metadata_files is None:
if "use_metadata_files" in reader_conf.keys():
use_metadata_files = reader_conf["use_metadata_files"]
if use_parent_metadata_files is None:
if "use_parent_metadata_files" in reader_conf.keys():
use_parent_metadata_files = reader_conf[
"use_parent_metadata_files"
]
if use_subfolders_as_tags is None:
if "use_subfolders_as_tags" in reader_conf.keys():
use_subfolders_as_tags = reader_conf["use_subfolders_as_tags"]
if include_relative_path_to_files is None:
if "include_relative_path_to_files" in reader_conf.keys():
include_relative_path_to_files = reader_conf[
"include_relative_path_to_files"
]
if use_files is None:
use_files = True
if use_metadata_files is None:
use_metadata_files = False
if use_parent_metadata_files is None:
use_parent_metadata_files = False
if use_subfolders_as_tags is None:
use_subfolders_as_tags = False
if include_relative_path_to_files is None:
include_relative_path_to_files = False
# folder
for root, dirnames, filenames in os.walk(abs_path, topdown=True):
dirnames[:] = [d for d in dirnames if d not in DEFAULT_EXCLUDE_DIRS]
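            # pruning dirnames in-place (with topdown=True) stops os.walk from descending
            # into excluded directories; the block below pre-computes the inherited
            # metadata for every subfolder before its files are read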
if use_parent_metadata_files:
for dirname in dirnames:
parent = os.path.join(root, dirname)
parent_dict = meta_dicts[root]
meta_path = os.path.join(parent, self.parent_meta_file_name)
if not os.path.exists(meta_path):
log.debug(
"No meta file '{}', using parent metadata for this folder.".format(
meta_path
)
)
meta_dicts[parent] = parent_dict
else:
log.debug("Reading meta file: {}".format(meta_path))
with open(meta_path, "r") as fi:
# temp = ordered_load(fi)
temp = yaml.load(fi)
if not temp:
meta_dicts[parent] = parent_dict
else:
temp = dict_merge(parent_dict, temp, copy_dct=True)
meta_dicts[parent] = temp
for filename in filenames:
path_to_file = os.path.join(root, filename)
abs_path_file = os.path.abspath(path_to_file)
rel_path_to_file = os.path.relpath(abs_path_file, abs_path)
if use_files:
pkg_name = self.get_pkg_name_function(abs_path_file)
if pkg_name is not None and pkg_name is not False:
full_filename = os.path.basename(abs_path_file)
# rel_parent_folder = os.path.dirname(path_to_file)
# TODO: check rel_parent_folder
# abs_parent_folder = os.path.dirname(abs_path_file)
# foldername = os.path.basename(abs_parent_folder)
basename, ext = os.path.splitext(full_filename)
# path_basename = os.path.join(rel_parent_folder, basename)
result.setdefault(pkg_name, {})
temp = {
"file_name": full_filename,
"url": "{{{{ base_url }}}}{}{}".format(
os.path.sep, rel_path_to_file
),
}
if include_relative_path_to_files:
temp["path"] = rel_path_to_file
result.setdefault(pkg_name, {}).setdefault("urls", []).append(
temp
)
if use_metadata_files:
md_pkg_name = self.get_pkg_name_function_metadata(abs_path_file)
if md_pkg_name is not None and md_pkg_name is not False:
if use_parent_metadata_files:
parent_meta = meta_dicts[root]
log.debug("Reading: {}".format(abs_path_file))
with open(abs_path_file, "r") as fi:
# yaml_dict = ordered_load(fi)
yaml_dict = yaml.load(fi)
if not yaml_dict:
log.info(
"file '{}' is empty, ignoring".format(abs_path_file)
)
continue
if use_parent_metadata_files:
merged = dict_merge(parent_meta, yaml_dict, copy_dct=True)
else:
merged = yaml_dict
if (
"urls" not in merged.keys()
and "versions" not in merged.keys()
):
                            # look up the urls collected for the side-car's companion file
                            if md_pkg_name not in result.keys():
                                raise Exception(
                                    "No urls or versions specified for package: {}".format(
                                        md_pkg_name
                                    )
                                )
                            merged["urls"] = result[md_pkg_name]["urls"]
metadata_pkg_name = merged.get("lupkg", {}).get("name", None)
if metadata_pkg_name is not None:
if metadata_pkg_name != md_pkg_name:
raise Exception(
"Can't determine unique package name between: {} - {}".format(
metadata_pkg_name, md_pkg_name
)
)
pkg_dict = result.setdefault(md_pkg_name, {})
if "metadata" in pkg_dict.keys():
raise Exception(
"Duplicate metadata for package: {}".format(md_pkg_name)
)
pkg_dict["metadata"] = merged
return result
    def process_content(
self, content, current_metadata, luci_metadata, luci_metadata_key_name=None
):
for pkg_name, details in content.items():
if pkg_name in current_metadata.keys():
raise Exception("Duplicate key: {}".format(pkg_name))
if "metadata" in details.keys():
metadata = copy.deepcopy(details.get("metadata", {}))
else:
metadata = {"urls": copy.deepcopy(details.get("urls", []))}
# urls = details.get("urls", [])
#
# if self.use_metadata_files:
# metadata = copy.deepcopy(details.get("metadata", {}))
# if "urls" not in metadata.keys() and "versions" not in metadata.keys():
# metadata["urls"] = urls
# else:
# metadata = {"urls": urls}
metadata.setdefault("lupkg", {})["name"] = pkg_name
current_metadata[pkg_name] = metadata
return current_metadata