#!/usr/bin/env python3 # Copyright 2014 Hewlett-Packard Development Company, L.P. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import argparse import collections import functools import json import logging import os import re import sys import yaml from diskimage_builder import logging_config logger = logging.getLogger(__name__) def get_element_installtype(element_name): default = os.environ.get("DIB_DEFAULT_INSTALLTYPE", "source") return os.environ.get( "DIB_INSTALLTYPE_%s" % element_name.replace('-', '_'), default) def _is_arch_in_list(strlist): """Checks if os.environ['ARCH'] is in comma separated strlist""" strlist = strlist.split(',') map(str.strip, strlist) return os.environ['ARCH'] in strlist def _valid_for_arch(pkg_name, arch, not_arch): """Filter out incorrect ARCH versions""" if arch is None and not_arch is None: # nothing specified; always OK return True if arch and not_arch: print("package-installs configuration error: arch and not_arch " "given for package [%s]" % pkg_name) sys.exit(1) # if we have an arch list, our current arch must be in it # to install. if arch: return _is_arch_in_list(arch) # if we don't have an explicit arch list, we should # install unless we are in the not-arch list. return not _is_arch_in_list(not_arch) def _when(statements): '''evaulate a when: statement Evaluate statements of the form when: ENVIRONMENT_VARIABLE[!]=value Returns True if the package should be installed, False otherwise If the ENVIRONMENT_VARIABLE is unset, raises an error ''' # No statement means install if statements is None: return True if not isinstance(statements, (list, tuple)): statements = [statements] result = [] for s in statements: # FOO = BAR # var op val match = re.match( r"(?P[\w]+)(\s*)(?P=|!=)(\s*)(?P.*)", s) if not match: print("Malformed when line: <%s>" % s) sys.exit(1) match = match.groupdict() var = match['var'] op = match['op'] val = match['val'] if var not in os.environ: raise RuntimeError("The variable <%s> is not set" % var) logger.debug("... when eval %s%s%s against <%s>" % (var, op, val, os.environ[var])) if op == '=': if val == os.environ[var]: result.append(True) continue elif op == '!=': if val != os.environ[var]: result.append(True) continue else: print("Malformed when op: %s" % op) sys.exit(1) result.append(False) return all(result) def collect_data(data, objs, element_name): for pkg_name, params in objs.items(): if not params: params = [{}] if not isinstance(params, (list, tuple)): params = [params] for param in params: logger.debug("Considering %s/%s param:%s" % (element_name, pkg_name, param)) phase = param.get('phase', 'install.d') installs = ["install"] if 'uninstall' in param or 'build-only' in param: # We don't add the package to the uninstall list if # something else has requested we install it without # removing it. in_install = any( map(lambda x: x[0] == pkg_name, data[phase]["install"])) not_in_uninstall = all( map(lambda x: x[0] != pkg_name, data[phase]["uninstall"])) if in_install and not_in_uninstall: if 'build-only' not in param: # Just skip further processing as we have no uninstall # work to do continue else: if 'uninstall' in param: installs = ["uninstall"] if 'build-only' in param: installs = ["install", "uninstall"] else: # Remove any uninstallations if we are trying to install # the package without uninstallation elsewhere. data[phase]["uninstall"] = [ x for x in data[phase]["uninstall"] if x[0] != pkg_name] # Filter out incorrect installtypes installtype = param.get('installtype', None) elem_installtype = get_element_installtype(element_name) valid_installtype = (installtype is None or installtype == elem_installtype) if not valid_installtype: logger.debug("... skipping due to installtype") continue valid_arch = _valid_for_arch(pkg_name, param.get('arch', None), param.get('not-arch', None)) if not valid_arch: logger.debug("... skipping due to arch match") continue dib_py_version = str(param.get('dib_python_version', '')) dib_py_version_env = os.environ.get('DIB_PYTHON_VERSION', '') valid_dib_python_version = (dib_py_version == '' or dib_py_version == dib_py_version_env) if not valid_dib_python_version: logger.debug("... skipping due to python version") continue # True means install, false skip if _when(param.get('when', None)) is False: logger.debug("... skipped due to when: failures") continue for install in installs: logger.debug("... installing for '%s'" % install) data[phase][install].append((pkg_name, element_name)) return data def main(): parser = argparse.ArgumentParser( description="Produce a single packages-installs file from all of" " the available package-installs files") parser.add_argument('--elements', required=True, help="Which elements to squash") parser.add_argument('--path', required=True, help="Elements path to search for elements") parser.add_argument('outfile', help="Location of the output file") args = parser.parse_args() logging_config.setup() # Replicate the logic of finding the first element, because we can't # operate on the post-copied hooks dir, since we lose element context element_dirs = list() for element_name in args.elements.split(): for elements_dir in args.path.split(':'): potential_path = os.path.join(elements_dir, element_name) if os.path.exists(potential_path): element_dirs.append((elements_dir, element_name)) logger.debug("element_dirs -> %s" % element_dirs) # Collect the merge of all of the existing install files in the elements # that are the first on the ELEMENT_PATH final_dict = collections.defaultdict( functools.partial(collections.defaultdict, list)) for (elements_dir, element_name) in element_dirs: for file_type in ('json', 'yaml'): target_file = os.path.join( elements_dir, element_name, "package-installs.%s" % file_type) if not os.path.exists(target_file): continue logger.info("Squashing install file: %s" % target_file) try: objs = json.load(open(target_file)) except ValueError: objs = yaml.safe_load(open(target_file)) final_dict = collect_data(final_dict, objs, element_name) logger.debug("final_dict -> %s" % final_dict) # Write the resulting file with open(args.outfile, 'w') as outfile: json.dump( final_dict, outfile, indent=True, separators=(',', ': '), sort_keys=False) if __name__ == '__main__': main()