Source code for luci.readers

# -*- coding: utf-8 -*-

import abc
import copy
import logging
import os
import re
from collections import OrderedDict

import six
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap

from frutils import (
    add_key_to_dict,
    append_key_to_dict,
    dict_merge,
    ordered_load,
    replace_string,
)
from frutils.defaults import (
    DEFAULT_EXCLUDE_DIRS,
    JINJA_DELIMITER_PROFILES,
    KEY_LUCI_NAME,
)
from .defaults import *
from .exceptions import DictletParseException

log = logging.getLogger("lucify")


[docs]def create_dictlet_reader(dictlet_reader_name, dictlet_reader_config=None): """Create a dictlet reader with the provided config. """ from .plugins import load_dictlet_reader_extension if dictlet_reader_config is None: dictlet_reader_config = {} dictlet_reader = load_dictlet_reader_extension( dictlet_reader_name, init_params=dictlet_reader_config ) return dictlet_reader
# classes
[docs]@six.add_metaclass(abc.ABCMeta) class DictletReader(object): """Abstract base class for a reader that can parse a dictlet of a certain format.""" def __init__(self, **kwargs): if "metadata_key" in kwargs.keys(): self.metadata_key = kwargs["metadata_key"] else: self.metadata_key = KEY_LUCI_NAME
[docs] @abc.abstractmethod def get_content(self, dictlet_path, current_metadata): pass
[docs] @abc.abstractmethod def process_content( self, content, current_vars, luci_metadata, luci_metadata_key_name=KEY_LUCI_NAME ): """Processes a set of lines in a certain format. Returns: dict: the values of the processed dictlet """ pass
[docs] def read_dictlet(self, dictlet_details, current_metadata, luci_metadata={}): log.debug("reading dictlet: {}".format(dictlet_details)) cm = copy.deepcopy(current_metadata) try: content = self.get_content(dictlet_details, cm) except (DictletParseException) as dpe: raise dpe except (Exception) as e: raise DictletParseException(e, dictlet_details) dictlet_details["content"] = content result = self.process_content( content, cm, luci_metadata, luci_metadata_key_name=self.metadata_key ) dict_merge(current_metadata, result, copy_dct=False) return current_metadata
[docs]class ClassDictletReader(DictletReader): """Reader that reads class properties for :class:`~lucify.Lucifier` sub-classes.""" def __init__(self, **kwargs): super(ClassDictletReader, self).__init__(**kwargs)
[docs] def get_content(self, dictlet_details, current_metadata): dictlet_class = dictlet_details.get("class", None) if not dictlet_class: log.debug( "No 'class' property in dictlet details: {}".format(dictlet_details) ) return None log.debug("Reading class: {}".format(dictlet_class)) dictlet_obj = dictlet_class(init_metadata=current_metadata) return dict(dictlet_obj.metadata)
[docs] def process_content( self, content, current_metadata, luci_metadata, luci_metadata_key_name=KEY_LUCI_NAME, ): reader_keys = ( luci_metadata.get("reader_config", {}).get("class", {}).get("keys", {}) ) for key, mapped_key in reader_keys.items(): value = content.get(key, None) if value is None: log.debug( "No value in class for attribute '{}', skipping...".format(key) ) continue temp = OrderedDict() add_key_to_dict(temp, mapped_key, value, split_token=".") dict_merge(current_metadata, temp, copy_dct=False) return current_metadata
[docs]class TextFileDictletReader(DictletReader): """Parent class for readers that read text files. """ def __init__(self, **kwargs): super(TextFileDictletReader, self).__init__(**kwargs)
[docs] @staticmethod def parse_lucify_line(line, luci_key_word=KEY_LUCI_NAME): tokens = re.findall(r"[\w']+", line) lucify = False # ignore_prefix = "" started = None for token in tokens: token = token.lower() if not lucify and not token == DICTLET_READER_MARKER_STRING: continue if not lucify and token == DICTLET_READER_MARKER_STRING: lucify = True continue if token in [DICTLET_START_MARKER_STRING, luci_key_word]: if started is not None: raise Exception( "Both '{}' and '{}' tokens in line, can't process: {}".format( DICTLET_STOP_MARKER_STRING, DICTLET_START_MARKER_STRING, line, ) ) started = True elif token == DICTLET_STOP_MARKER_STRING: if started is not None: raise Exception( "Both '{}' and '{}' tokens in line, can't process: {}".format( DICTLET_START_MARKER_STRING, DICTLET_STOP_MARKER_STRING, line, ) ) started = False else: log.debug("ignoring token: '{}'".format(token)) result = {"line": line} if started is None: started = True result["process"] = started return result
[docs] @abc.abstractmethod def process_lines(self, current_content, current_vars): pass
[docs] def get_content(self, dictlet_details, current_metadata): dictlet_path = dictlet_details.get("path", None) if not dictlet_path: raise Exception("No path provided for details: {}".format(dictlet_details)) dictlet_path = os.path.realpath(dictlet_path) if not os.path.exists(dictlet_path): raise Exception("Path does not exist: {}".format(dictlet_path)) file_type = dictlet_details.get("type", None) if file_type is None: if os.path.isdir(dictlet_path): file_type = "folder" elif os.path.isfile(dictlet_path): file_type = "file" else: raise Exception( "Can't determine file-type for: {}".format(dictlet_path) ) if file_type == "file": if os.path.isfile(dictlet_path): with open(dictlet_path) as dictlet_file: content = dictlet_file.read() return content else: raise Exception("No file for path: {}".format(dictlet_path)) elif file_type == "folder": if os.path.isdir(dictlet_path): return None else: raise Exception("No folder for path: {}".format(dictlet_path))
[docs] def process_content( self, content_orig, current_metadata, luci_metadata, luci_metadata_key_name=KEY_LUCI_NAME, ): ignore_prefix = None all_lines = [] no_marker = True if ( luci_metadata_key_name not in content_orig and DICTLET_READER_MARKER_STRING not in content_orig ): log.debug("No maker found in file, treating all content as metadata") ignore_prefix = "" no_marker = True elif ( luci_metadata_key_name in content_orig and DICTLET_READER_MARKER_STRING not in content_orig ): matches = [ line for line in content_orig.split("\n") if luci_metadata_key_name in line ] if len(matches) > 1: raise Exception( "More than one line containing marker string '{}' found, can't process...".format( luci_metadata_key_name ) ) ignore_prefix = matches[0].partition(luci_metadata_key_name)[0] no_marker = False log.debug( "'{}'-marker found in file, using '{}' as ignore_prefix".format( luci_metadata_key_name, ignore_prefix ) ) else: log.debug( "Using '{}'-keyword to determine ignore_prefix".format( DICTLET_READER_MARKER_STRING ) ) matches = { line.partition(DICTLET_READER_MARKER_STRING)[0] for line in content_orig.split("\n") if DICTLET_READER_MARKER_STRING in line } if len(matches) != 1: raise Exception( "Different prefixes for lucify marker strings: {}".format(matches) ) ignore_prefix = next(iter(matches)) no_marker = False process = False ln = 0 for line in content_orig.split("\n"): ln = ln + 1 is_lucify_line = False if DICTLET_READER_MARKER_STRING in line: is_lucify_line = True details = TextFileDictletReader.parse_lucify_line( line, self.metadata_key ) process = details["process"] if (not process and luci_metadata_key_name in line) or no_marker: process = True if process and ( line.startswith(ignore_prefix) or (ignore_prefix.strip() and line.startswith(ignore_prefix.strip())) ): all_lines.append((ln, is_lucify_line, True, line)) else: all_lines.append((ln, is_lucify_line, False, line)) process_lines = [ line[3][len(ignore_prefix) :] # noqa: E203 for line in all_lines if not line[1] and line[2] ] content_lines = [line[3] for line in all_lines if not line[1] and not line[2]] sensible_content = [ line[3] for line in all_lines if self.is_content_line(line[3], sensible=True) ] current_vars = self.process_lines(process_lines, current_metadata) register_vars = luci_metadata.get("register", {}) for key, value in register_vars.items(): if key == "content": append_key_to_dict(current_vars, value, content_lines) elif key == "metadata": append_key_to_dict(current_vars, value, process_lines) elif key == "content_filtered": append_key_to_dict(current_vars, value, sensible_content) elif key == "all": append_key_to_dict(current_vars, value, content_orig.split("\n")) return current_vars
[docs] def is_content_line(self, line, sensible=True): return not line.startswith("#")
[docs]class YamlDictletReader(DictletReader): def __init__(self, type="dict", **kwargs): super(YamlDictletReader, self).__init__(**kwargs) self.yaml = YAML() self.yaml.default_flow_style = False
[docs] def get_content(self, dictlet_details, current_metadata): dictlet_type = dictlet_details.get("type", "file") if not dictlet_type == "file": raise Exception("Unsupported dictlet type: {}".format(dictlet_type)) path = dictlet_details.get("path", None) if path is None: raise Exception("No path provided for dictlet: {}".format(dictlet_details)) if not os.path.isfile(os.path.realpath(path)): raise Exception("Dictlet not a file: {}".format(path)) with open(path, "r") as f: content = self.yaml.load(f) return content
[docs] def process_content( self, content, current_vars, luci_metadata, luci_metadata_key_name=KEY_LUCI_NAME ): dict_merge(current_vars, content, copy_dct=False) return current_vars
[docs]class LucifyYamlDictionaryDictletReader(TextFileDictletReader): """Reader for yaml-formatet dictlets. Basically, yaml-structures that contain a dict as it's root element. The yaml-structure can be contained in the files comments. """ SHORT_HELP = "reader for yaml formatted templates" def __init__(self, delimiter_profile=JINJA_DELIMITER_PROFILES["luci"], **kwargs): super(LucifyYamlDictionaryDictletReader, self).__init__(**kwargs) self.delimiter_profile = delimiter_profile
[docs] def process_lines(self, content, current_vars): log.debug("Processing: {}".format(content)) # now, I know this isn't really the most # optimal way of doing this, # but I don't really care that much about execution speed yet, # plus I really want to be able to use variables used in previous # lines of the content last_whitespaces = 0 current_lines = "" temp_vars = copy.deepcopy(current_vars) for line in content: if line.strip().startswith("#"): continue whitespaces = len(line) - len(line.lstrip(" ")) current_lines = "{}{}\n".format(current_lines, line) if whitespaces <= last_whitespaces: temp = replace_string( current_lines, temp_vars, **self.delimiter_profile ) if not temp.strip(): continue temp_dict = ordered_load(temp) temp_vars = dict_merge(temp_vars, temp_dict, copy_dct=False) last_whitespaces = whitespaces temp = replace_string(current_lines, temp_vars, **self.delimiter_profile) temp_dict = ordered_load(temp) temp_vars = dict_merge(temp_vars, temp_dict, copy_dct=False) dict_merge(current_vars, temp_vars, copy_dct=False) log.debug("Vars after processing: {}".format(current_vars)) return current_vars
[docs]class FilesInFolderReader(DictletReader): """DictletReader to find all files in a folder and use their paths and filenames for metadata. """ def __init__(self, base_url_var_name="base_url", **kwargs): super(FilesInFolderReader, self).__init__(**kwargs) self.use_first_parent_as_type = True self.base_url_var_name = base_url_var_name
[docs] def is_usable_file(self, path): f = os.path.basename(path) result = not f.startswith(".") return result
[docs] def get_content(self, dictlet_details, current_metadata): """Finds all files in this folder (recursively). """ path = dictlet_details["path"] result = CommentedMap() dictlet_type = dictlet_details.get("type", None) if dictlet_type is None: if os.path.isdir(os.path.realpath(path)): dictlet_type = "folder" else: dictlet_type = "file" if dictlet_type != "folder": raise Exception("Dictlet is not a folder.") for root, dirnames, filenames in os.walk(path, topdown=True): dirnames[:] = [d for d in dirnames if d not in DEFAULT_EXCLUDE_DIRS] for filename in [ f for f in filenames if self.is_usable_file(os.path.join(root, f)) ]: file_path = os.path.join(root, filename) if filename in result.keys(): log.warn( "Duplicate package name '{}', this might cause upredictable behaviour.".format( filename ) ) file_path = os.path.relpath(file_path, path) result[filename] = file_path return result
[docs] def process_content( self, content, current_metadata, luci_metadata, luci_metadata_key_name=None ): for pkg_name, path in content.items(): url = os.path.join( "{{{{ {} }}}}".format(self.base_url_var_name), "{}".format(path) ) if pkg_name in current_metadata.keys(): log.warn( "Duplicate package name '{}', this might cause upredictable behaviour.".format( pkg_name ) ) current_metadata[pkg_name] = {"download": {"url": url}} return current_metadata
[docs]class MetadataFolderReader(DictletReader): """DictletReader to read all yaml files in a directory and collect all content in one dict. Args: meta_file_name (str): if the folder structure contains metadata files to augment final data (see above), specify their file-name here. If this is 'None', no metadata will be augmented merge_keys (bool): whether to merge (True) or overwrite (False) values if the same key (aka filename in the default implementation) is already present. use_relative_paths_as_keys (bool): whether to use relative (True) or absolute (False) paths to the yaml file for the metadata keys **kwargs (dict): will be forwarded to the constructor of the underlying DictletReader parent class """ def __init__( self, meta_file_name=None, merge_keys=False, use_relative_paths_as_keys=True, safe_load=True, **kwargs ): super(MetadataFolderReader, self).__init__(**kwargs) self.meta_file_name = meta_file_name self.use_meta_file = meta_file_name is not None self.merge_keys = merge_keys self.use_relative_paths_as_keys = use_relative_paths_as_keys self.safe_load = safe_load
[docs] def is_usable_file(self, path): """Utility method determine whether a file contains metadata relevant for lupkg or not. By default, this returns true if the filename in question ends with either ".yaml" or ".yml", and does not start with a '.'. Can be overwrittern by a child class. """ f = os.path.basename(path) result = ( ( (self.use_meta_file and f != self.meta_file_name) or not self.use_meta_file ) and (f.endswith("yaml") or f.endswith("yml")) and not f.startswith(".") ) return result
[docs] def get_content(self, dictlet_details, current_metadata): """Reads all yaml files under this folder (recursively). """ path = dictlet_details["path"] abs_path = os.path.abspath(os.path.realpath(path)) result = CommentedMap() meta_dicts = {path: CommentedMap()} dictlet_type = dictlet_details.get("type", None) if dictlet_type is None: if os.path.isdir(os.path.realpath(path)): dictlet_type = "folder" else: dictlet_type = "file" if self.safe_load: yaml = YAML(typ="safe") else: yaml = YAML() if dictlet_type == "folder": for root, dirnames, filenames in os.walk(path, topdown=True): dirnames[:] = [d for d in dirnames if d not in DEFAULT_EXCLUDE_DIRS] if self.use_meta_file: for dirname in dirnames: parent = os.path.join(root, dirname) parent_dict = meta_dicts[root] meta_path = os.path.join(parent, self.meta_file_name) if not os.path.isfile(meta_path): log.debug( "No meta file '{}', using parent metadata for this folder.".format( meta_path ) ) meta_dicts[parent] = parent_dict else: log.debug("Reading meta file: {}".format(meta_path)) with open(meta_path, "r") as fi: # temp = ordered_load(fi) temp = yaml.load(fi) if not temp: meta_dicts[parent] = parent_dict else: temp = dict_merge(parent_dict, temp, copy_dct=True) meta_dicts[parent] = temp for filename in [ f for f in filenames if self.is_usable_file(os.path.join(root, f)) ]: if self.use_meta_file: parent_meta = meta_dicts[root] path_to_md = os.path.join(root, filename) log.debug("Reading: {}".format(path_to_md)) with open(path_to_md, "r") as fi: # yaml_dict = ordered_load(fi) yaml_dict = yaml.load(fi) if not yaml_dict: log.info("file '{}' is empty, ignoring".format(path_to_md)) continue if self.use_meta_file: merged = dict_merge(parent_meta, yaml_dict, copy_dct=True) else: merged = yaml_dict relpath = os.path.relpath(path_to_md, path) if self.use_relative_paths_as_keys: result[relpath] = merged else: result[os.path.join(abs_path, relpath)] = merged elif dictlet_type == "file": with open(path, "r") as fi: # yaml_dict = ordered_load(fi) yaml_dict = yaml.load(fi) if not yaml_dict: log.info("file '{}' is empty, ignoring".format(path)) else: result[path] = yaml_dict return result
[docs] def process_content( self, content, current_metadata, luci_metadata, luci_metadata_key_name=None ): for rel_path, metadata in content.items(): path_tokens = rel_path.split(os.path.sep) file_name = os.path.splitext(path_tokens[-1])[0] if file_name in current_metadata.keys(): if not self.merge_keys: log.debug( "Duplicate description for key '{}': overwriting existing value".format( file_name ) ) current_metadata[file_name] = metadata else: log.debug( "Duplicate description for key '{}': merging with existing value".format( file_name ) ) dict_merge(current_metadata[file_name], metadata, copy_dct=False) else: current_metadata[file_name] = metadata return current_metadata
[docs]def PICK_ALL_FILES_FUNCTION(path): """Default implementation of an 'is_*_file' method.""" if path.endswith(".lupkg"): return False else: return os.path.basename(path)
[docs]def PICK_ALL_FILES_WITH_LUPKG_EXTENSION(path): """Filter that picks files that end with '.lupkg'.""" if os.path.basename(path) != "meta.lupkg" and path.endswith(".lupkg"): return os.path.splitext(os.path.basename(path))[0] else: return False
# def PICK_ALL_FILES_NAMED_META_LUPKG(path): # """Filter that picks files called 'meta.lupkg'.""" # # return os.path.basename(path) == "meta.lupkg" PKG_NAME_STRATEGIES = ["filename", "basename", "foldername", "path", "path_basename"]
[docs]class LupkgFolderReader(DictletReader): """Feature-ful folder reader used for lupkg. This one reads folders containing either only files (in which case the file-name is used as pkg-name, only metadata-files, or a combination of both. If the latter. the file and (side-car) metadata file need to have the same file-name and can only differ in the extension. TODO: list pkg name strategies Args: use_files (bool): whether to add non-metadata-files to the index is_file_function (func): the function to be used to determine whether a file is to be added use_metadata_files (bool): whether to add packages described in metadata files to the index is_metadata_file_function (func): the function to be used to determine whether a metadata file is to be used use_parent_metadata_files (bool): whether to inherit metadata by placing 'non-package' metadata files in the folder tree parent_meta_file_name (str): the name of potential meta parent files include_relative_path_to_files (bool): whether to include the relative paths to a file in the file url dict use_subfolders_as_tags (bool): whether to automatically add subfolder names as tags to the metadata safe_load (bool): whether to 'safe' load yaml reader_config_filename (str): filename in root of folder to contain default reader settings """ def __init__( self, use_files=None, get_pkg_name_function=None, use_metadata_files=None, get_pkg_name_function_metadata=None, use_parent_metadata_files=None, parent_meta_file_name=None, include_relative_path_to_files=None, use_subfolders_as_tags=None, safe_load=True, reader_config_filename=".lupkg_repo", **kwargs ): super(LupkgFolderReader, self).__init__(**kwargs) if use_parent_metadata_files and not use_metadata_files: raise Exception( "Invalid configuration. Can't use parent metadata but not 'normal' metadata files." ) self.use_files = use_files if get_pkg_name_function is None: get_pkg_name_function = PICK_ALL_FILES_FUNCTION self.get_pkg_name_function = get_pkg_name_function self.use_metadata_files = use_metadata_files if get_pkg_name_function_metadata is None: get_pkg_name_function_metadata = PICK_ALL_FILES_WITH_LUPKG_EXTENSION self.get_pkg_name_function_metadata = get_pkg_name_function_metadata self.use_parent_metadata_files = use_parent_metadata_files if parent_meta_file_name is None: parent_meta_file_name = "meta.lupkg" self.parent_meta_file_name = parent_meta_file_name self.include_relative_path_to_files = include_relative_path_to_files self.use_subfolders_as_tags = use_subfolders_as_tags self.safe_load = safe_load self.reader_config_filename = reader_config_filename
[docs] def get_content(self, dictlet_details, current_metadata): path = dictlet_details["path"] abs_path = os.path.abspath(os.path.realpath(path)) result = CommentedMap() meta_dicts = {path: CommentedMap()} dictlet_type = dictlet_details.get("type", None) if dictlet_type is None: if os.path.isdir(os.path.realpath(path)): dictlet_type = "folder" else: dictlet_type = "file" if self.safe_load: yaml = YAML(typ="safe") else: yaml = YAML() if dictlet_type == "file": if self.use_files and self.get_pkg_name_function(path): pass elif self.use_metadata_files and self.get_pkg_name_function_metadata(path): pass else: return result with open(path, "r") as fi: yaml_dict = yaml.load(fi) if not yaml_dict: log.info("file '{}' is empty, ignoring".format(path)) else: result[path] = yaml_dict return result use_files = self.use_files use_metadata_files = self.use_metadata_files use_parent_metadata_files = self.use_parent_metadata_files use_subfolders_as_tags = self.use_subfolders_as_tags include_relative_path_to_files = self.include_relative_path_to_files if self.reader_config_filename: default_reader_config_file = os.path.join( abs_path, self.reader_config_filename ) if os.path.exists(default_reader_config_file) and os.path.isfile( default_reader_config_file ): yaml_read = YAML(typ="safe") with open(default_reader_config_file) as f: conf = yaml_read.load(f) reader_conf = conf.get("reader", {}) if use_files is None: if "use_files" in reader_conf.keys(): use_files = reader_conf["use_files"] if use_metadata_files is None: if "use_metadata_files" in reader_conf.keys(): use_metadata_files = reader_conf["use_metadata_files"] if use_parent_metadata_files is None: if "use_parent_metadata_files" in reader_conf.keys(): use_parent_metadata_files = reader_conf[ "use_parent_metadata_files" ] if use_subfolders_as_tags is None: if "use_subfolders_as_tags" in reader_conf.keys(): use_subfolders_as_tags = reader_conf["use_subfolders_as_tags"] if include_relative_path_to_files is None: if "include_relative_path_to_files" in reader_conf.keys(): include_relative_path_to_files = reader_conf[ "include_relative_path_to_files" ] if use_files is None: use_files = True if use_metadata_files is None: use_metadata_files = False if use_parent_metadata_files is None: use_parent_metadata_files = False if use_subfolders_as_tags is None: use_subfolders_as_tags = False if include_relative_path_to_files is None: include_relative_path_to_files = False # folder for root, dirnames, filenames in os.walk(abs_path, topdown=True): dirnames[:] = [d for d in dirnames if d not in DEFAULT_EXCLUDE_DIRS] if use_parent_metadata_files: for dirname in dirnames: parent = os.path.join(root, dirname) parent_dict = meta_dicts[root] meta_path = os.path.join(parent, self.parent_meta_file_name) if not os.path.exists(meta_path): log.debug( "No meta file '{}', using parent metadata for this folder.".format( meta_path ) ) meta_dicts[parent] = parent_dict else: log.debug("Reading meta file: {}".format(meta_path)) with open(meta_path, "r") as fi: # temp = ordered_load(fi) temp = yaml.load(fi) if not temp: meta_dicts[parent] = parent_dict else: temp = dict_merge(parent_dict, temp, copy_dct=True) meta_dicts[parent] = temp for filename in filenames: path_to_file = os.path.join(root, filename) abs_path_file = os.path.abspath(path_to_file) rel_path_to_file = os.path.relpath(abs_path_file, abs_path) if use_files: pkg_name = self.get_pkg_name_function(abs_path_file) if pkg_name is not None and pkg_name is not False: full_filename = os.path.basename(abs_path_file) # rel_parent_folder = os.path.dirname(path_to_file) # TODO: check rel_parent_folder # abs_parent_folder = os.path.dirname(abs_path_file) # foldername = os.path.basename(abs_parent_folder) basename, ext = os.path.splitext(full_filename) # path_basename = os.path.join(rel_parent_folder, basename) result.setdefault(pkg_name, {}) temp = { "file_name": full_filename, "url": "{{{{ base_url }}}}{}{}".format( os.path.sep, rel_path_to_file ), } if include_relative_path_to_files: temp["path"] = rel_path_to_file result.setdefault(pkg_name, {}).setdefault("urls", []).append( temp ) if use_metadata_files: md_pkg_name = self.get_pkg_name_function_metadata(abs_path_file) if md_pkg_name is not None and md_pkg_name is not False: if use_parent_metadata_files: parent_meta = meta_dicts[root] log.debug("Reading: {}".format(abs_path_file)) with open(abs_path_file, "r") as fi: # yaml_dict = ordered_load(fi) yaml_dict = yaml.load(fi) if not yaml_dict: log.info( "file '{}' is empty, ignoring".format(abs_path_file) ) continue if use_parent_metadata_files: merged = dict_merge(parent_meta, yaml_dict, copy_dct=True) else: merged = yaml_dict if ( "urls" not in merged.keys() and "versions" not in merged.keys() ): if pkg_name not in result.keys(): raise Exception( "No urls or versions specified for package: {}".format( pkg_name ) ) merged["urls"] = result[pkg_name]["urls"] metadata_pkg_name = merged.get("lupkg", {}).get("name", None) if metadata_pkg_name is not None: if metadata_pkg_name != md_pkg_name: raise Exception( "Can't determine unique package name between: {} - {}".format( metadata_pkg_name, md_pkg_name ) ) pkg_dict = result.setdefault(md_pkg_name, {}) if "metadata" in pkg_dict.keys(): raise Exception( "Duplicate metadata for package: {}".format(md_pkg_name) ) pkg_dict["metadata"] = merged return result
[docs] def process_content( self, content, current_metadata, luci_metadata, luci_metadata_key_name=None ): for pkg_name, details in content.items(): if pkg_name in current_metadata.keys(): raise Exception("Duplicate key: {}".format(pkg_name)) if "metadata" in details.keys(): metadata = copy.deepcopy(details.get("metadata", {})) else: metadata = {"urls": copy.deepcopy(details.get("urls", []))} # urls = details.get("urls", []) # # if self.use_metadata_files: # metadata = copy.deepcopy(details.get("metadata", {})) # if "urls" not in metadata.keys() and "versions" not in metadata.keys(): # metadata["urls"] = urls # else: # metadata = {"urls": urls} metadata.setdefault("lupkg", {})["name"] = pkg_name current_metadata[pkg_name] = metadata return current_metadata