Mass Update

* Util Module
  * Provides: color class (for specialty stdout logging)
  * Provides: constants classes for rpm, errors, and mock
  * Provides: error classes for generic error handling and future fault
    handler
  * Provides: generic classes for generic, repeatable use cases
  * Provides: rpmutil with rpm utilities that range from basic to
    advanced metadata handling
* Add mock module
  * Can generate a usable mock config based on input provided
  * Can generate mock plugin configuration as provided
    * cache related plugins are hardcoded as disabled
  * Supports plugins: chroot scanning, embedding files, bind mounts
  * Can generate basic dnf configs with repo information
  * (Currently limited) Error handler
  * Runs mock commands (such as build, buildsrpm, init, shell)
* Add modularity module (very limited, doesn't really do much)
* Add peridotpb example (does nothing, will likely be its own thing)
* Add MIT license
This commit is contained in:
Louis Abel 2023-06-14 00:39:36 -07:00
parent 771e79c637
commit e48a54db3a
Signed by: label
GPG Key ID: 6735C0E1BD65D048
22 changed files with 2663 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
__pycache__/

18
LICENSE Normal file
View File

@ -0,0 +1,18 @@
Copyright 2023 Louis Abel <label@rockylinux.org>
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the “Software”), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

106
README.md
View File

@ -1,3 +1,109 @@
# Platform POC
A POC for builder nodes or developer purposes.
## Examples of pv2.util
```
[label@sani buildsys]$ python3
Python 3.11.3 (main, Apr 5 2023, 00:00:00) [GCC 13.0.1 20230401 (Red Hat 13.0.1-0)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from pv2.util import rpmutil
>>> rpm_header = rpmutil.get_rpm_header('/tmp/golang-1.19.4-1.el9.src.rpm')
>>> generic = rpmutil.get_rpm_metadata_from_hdr(rpm_header)
>>> generic['excludearch']
[]
>>> generic['exclusivearch']
['x86_64', 'aarch64', 'ppc64le', 's390x']
# Or the actual definition itself to skip the above
>>> rpmutil.get_exclu_from_package(rpm_header)
{'ExcludeArch': [], 'ExclusiveArch': ['x86_64', 'aarch64', 'ppc64le', 's390x']}
```
```
[label@sani buildsys]$ python3
Python 3.11.3 (main, Apr 5 2023, 00:00:00) [GCC 13.0.1 20230401 (Red Hat 13.0.1-0)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from pv2.util import rpmutil
>>> rpm_header = rpmputil.get_rpm_header('/tmp/rocky-release-8.9-1.4.el8.noarch.rpm')
>>> generic = rpmutil.get_rpm_metadata_from_hdr(rpm_header)
>>> generic.keys()
dict_keys(['changelog_xml', 'files', 'obsoletes', 'provides', 'conflicts', 'requires', 'vendor', 'buildhost', 'filetime', 'description', 'license', 'nvr', 'nevra', 'name', 'version', 'release', 'epoch', 'arch', 'archivesize', 'packagesize'])
>>> generic['buildhost']
'ord1-prod-a64build003.svc.aws.rockylinux.org'
>>> generic['description']
'Rocky Linux release files.'
>>> generic['nvr']
'rocky-release-8.9-1.4.el8'
>>> generic['files']
['/etc/centos-release', '/etc/issue', '/etc/issue.net', '/etc/os-release', '/etc/redhat-release', '/etc/rocky-release', '/etc/rocky-release-upstream', '/etc/system-release', '/etc/system-release-cpe', '/usr/lib/os-release', '/usr/lib/rpm/macros.d/macros.dist', '/usr/lib/systemd/system-preset/85-display-manager.preset', '/usr/lib/systemd/system-preset/90-default.preset', '/usr/lib/systemd/system-preset/99-default-disable.preset', '/usr/share/doc/rocky-release/COMMUNITY-CHARTER', '/usr/share/doc/rocky-release/Contributors', '/usr/share/licenses/rocky-release/LICENSE', '/usr/share/man/man1/rocky.1.gz', '/usr/share/redhat-release', '/usr/share/rocky-release/EULA']
```
## Examples of pv2.mock
```
[label@sani buildsys]$ python3
Python 3.11.3 (main, Apr 5 2023, 00:00:00) [GCC 13.0.1 20230401 (Red Hat 13.0.1-0)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from pv2.mock.config import DnfConfig, DnfRepoConfig, MockConfig, MockPluginConfig, MockChrootFileConfig, MockMacroFileConfig
>>> repo = DnfRepoConfig(repoid='devel', name='baseos', priority='99', baseurl='http://dl.rockylinux.org/pub/rocky/9/devel/x86_64/os', enabled=True, gpgcheck=False)
>>> repo_list = [repo]
>>> dnf_base_config = DnfConfig(repositories=repo_list)
>>> mock_config = MockConfig(root='rocky-9-x86_64-example', target_arch='x86_64', dist='.el9', distribution='Rocky Linux', dnf_config=dnf_base_config, releasever='9')
>>> mock_config.export_mock_config('/tmp/ex.cfg')
[label@sani buildsys]$ cat /tmp/ex.cfg
config_opts["root"] = "rocky-9-x86_64-example"
config_opts["chroot_setup_cmd"] = "install bash bzip2 coreutils cpio diffutils findutils gawk glibc-minimal-langpack grep gzip info make patch redhat-rpm-config rpm-build sed shadow-utils system-release tar unzip util-linux which xz"
config_opts["dist"] = "el9"
config_opts["legal_host_arches"] = ('x86_64',)
config_opts["macros"]["%_host"] = "x86_64-redhat-linux-gnu"
config_opts["macros"]["%_host_cpu"] = "x86_64"
config_opts["macros"]["%_rpmfilename"] = "%%{NAME}-%%{VERSION}-%%{RELEASE}.%%{ARCH}.rpm"
config_opts["macros"]["%_vendor"] = "redhat"
config_opts["macros"]["%_vendor_host"] = "redhat"
config_opts["macros"]["%packager"] = "Default Packager <packager@noone.home>"
config_opts["macros"]["%vendor"] = "Default Vendor"
config_opts["print_main_output"] = True
config_opts["releasever"] = "9"
config_opts["rpmbuild_networking"] = False
config_opts["target_arch"] = "x86_64"
config_opts["use_host_resolv"] = False
config_opts["files"]["/etc/rpm/macros.xx"] = """
%dist %{!?distprefix0:%{?distprefix}}%{expand:%{lua:for i=0,9999 do print("%{?distprefix" .. i .."}") end}}.el9%{?distsuffix}%{?with_bootstrap:~bootstrap}
%distribution Rocky Linux
"""
config_opts["dnf.conf"] = """
[main]
assumeyes=1
best=1
debuglevel=1
gpgcheck=0
install_weak_deps=0
keepcache=1
logfile=/var/log/yum.log
mdpolicy=group:primary
metadata_expire=0
obsoletes=1
protected_packages=
reposdir=/dev/null
retries=20
rpm_verbosity=info
syslog_device=
syslog_ident=peridotbuilder
user_agent=peridotbuilder
[devel]
baseurl=http://dl.rockylinux.org/pub/rocky/9/devel/x86_64/os
enabled=1
gpgcheck=0
name=baseos
priority=99
repoid=devel
"""
```

13
mock/__init__.py Normal file
View File

@ -0,0 +1,13 @@
# -*-:python; coding:utf-8; -*-
# author: Louis Abel <label@rockylinux.org>
"""
Mock and mock accessories
"""
# import all thingies here
from .config import (DnfConfig, DnfRepoConfig, MockConfig, MockPluginConfig,
MockBindMountPluginConfig, MockChrootFileConfig,
MockChrootScanPluginConfig, MockMacroConfig,
MockMacroFileConfig, MockShowrcPluginConfig)
from .error import MockErrorParser
from .runner import MockResult, MockRunner, MockErrorResulter

872
mock/config.py Normal file
View File

@ -0,0 +1,872 @@
# -*- mode:python; coding:utf-8; -*-
# Louis Abel <label@rockylinux.org>
"""
Utility functions for mock configuration.
"""
import collections
import copy
import json
import re
import hashlib
from configparser import ConfigParser
from io import StringIO, IOBase
from pv2.util import error as err
from pv2.util import constants as const
from pv2.util import generic as generic_util
# List all classes in this module
__all__ = [
'DnfConfig',
'DnfRepoConfig',
'MockConfig',
'MockPluginConfig',
'MockBindMountPluginConfig',
'MockChrootScanPluginConfig',
'MockChrootFileConfig',
'MockConfigUtils',
'MockMacroConfig',
'MockMacroFileConfig',
]
# pylint: disable=too-few-public-methods
class MockConfigUtils:
"""
Mock config utilities. Provides basic things needed when making a mock
config.
"""
@staticmethod
def config_string(value):
"""
Converts a given value to a mock compatible string
Value should be:
* bool
* int
* string
* list
* tuple
* None
"""
# If the value being sent is none, a boolean, int, or tuple, just
# straight up return it as a string.
if value is None or isinstance(value, (bool, int, tuple)):
return str(value)
# If it's a string or a list, return it as a json string/list. We make
# sure we convert it properly and going through json makes sure it
# comes out right.
if isinstance(value, (str, list)):
return json.dumps(value)
# Error out if a value was sent that is not supported.
raise err.ProvidedValueError(f'{type(value)}: {value} is not supported.')
@staticmethod
def gen_config_string(name: str, status: bool) -> str:
"""
Generates a output string to enable a plugin
"""
config_name = copy.copy(name)
config_status = __class__.config_string(status)
output = f'config_opts["plugin_conf"]["{config_name}_enable"] = {config_status}\n'
return output
@staticmethod
def gen_config_string_with_opts(name: str, status: bool, opts: dict) -> str:
"""
Generates a output string to add options to an enabled plugin
"""
config_name = copy.copy(name)
config_status = __class__.config_string(status)
config_opts = copy.copy(opts)
output = f'config_opts["plugin_conf"]["{config_name}_enable"] = {config_status}\n'
if not status:
return output
output += f'config_opts["plugin_conf"]["{config_name}_opts"] = {{}}\n'
# If plugin options were provided, we try to go through and spit them
# out properly. Some documented plugins use nested dictionaries and the
# value being a string. This helps with that.
for key, option in sorted(config_opts):
key_config = __class__.config_string(key)
option_config = __class__.config_string(option)
# pylint: disable=line-too-long
output += f'config_opts["plugin_conf"]["{config_name}_opts"][{key_config}] = {option_config}\n'
return output
@staticmethod
def gen_config_option(option, value, append=False) -> str:
"""
Helps generate the 'config_opts' part of a mock configuration.
"""
outter = ''
option = __class__.config_string(option)
# If a dictionary, get all key value pairs and splay them out into
# strings (sending to config_string).
if isinstance(value, dict):
for key, val in sorted(value.items()):
key_name = __class__.config_string(key)
val_name = __class__.config_string(val)
outter += f'config_opts[{option}][{key_name}] = {val_name}\n'
# Some options/plugins use .append for whatever reason. Setting
# append to True will allow this portion to work and play it out into a
# string.
elif append:
value_str = __class__.config_string(value)
outter += f'config_opts[{option}].append({value_str})\n'
# Some options are just options in general, a key value string. This
# covers the rest.
else:
value_str = __class__.config_string(value)
# pylint: disable=consider-using-f-string
outter += f'config_opts[{option}] = {value_str}\n'
return outter
class DnfConfigurator:
"""
Base class for dnf configuration generation. Should only contain static
classes.
"""
@staticmethod
def gen_config_section(section, opts):
"""
Generate a config section using the config parser and data we're
receiving. This should be able to handle both [main] and repo sections.
"""
# A dnf configuration is key=value, sort of like an ini file.
# ConfigParser gets us close to that.
config = ConfigParser()
config.add_section(section)
for key, value in sorted(opts.items()):
# Continue if repositoryid was caught. We already added the section
# above.
if key == 'repositoryid':
continue
# Based on the key we received, we'll determine how the value will
# be presented. For example, for cases of the key/values being
# boolean options, regardless of what's received as the truthy
# value, we'll convert it to a string integer. The rest are
# strings in general.
if key in const.MockConstants.MOCK_DNF_BOOL_OPTIONS:
config.set(section, key, generic_util.gen_bool_option(value))
elif key in const.MockConstants.MOCK_DNF_STR_OPTIONS:
config.set(section, key, str(value))
elif key in const.MockConstants.MOCK_DNF_LIST_OPTIONS:
config.set(section, key, value.strip())
elif key == 'baseurl':
if isinstance(value, (list, tuple)):
value = "\n ".join(value)
config.set(section, key, value.strip())
else:
config.set(section, key, generic_util.trim_non_empty_string(key, value))
# Export the configuration we made into a file descriptor for use in
# DnfConfig.
file_descriptor = StringIO()
config.write(file_descriptor, space_around_delimiters=False)
file_descriptor.flush()
file_descriptor.seek(0)
return file_descriptor.read()
class DnfConfig(DnfConfigurator):
"""
This helps with the base configuration part of a mock config.
"""
# All these arguments are used. Everything made here is typically pushed
# into MockConfig.
# pylint: disable=too-many-locals,too-many-arguments,unused-argument
def __init__(
self,
debuglevel=1,
retries=20,
obsoletes=True,
gpgcheck=False,
assumeyes=True,
keepcache=True,
best=True,
syslog_ident='peridotbuilder',
syslog_device='',
metadata_expire=0,
install_weak_deps=False,
protected_packages='',
reposdir='/dev/null',
logfile='/var/log/yum.log',
mdpolicy='group:primary',
rpmverbosity='info',
repositories=None,
module_platform_id=None,
user_agent='peridotbuilder',
exclude=None,
):
if rpmverbosity not in const.MockConstants.MOCK_RPM_VERBOSITY:
raise err.ProvidedValueError(f'{rpmverbosity} is not set to a valid value')
# The repodata setup is a bit weird. What we do is we go through all
# "locals" for this class and build everything into a dictionary. We
# later send this and the repositories dictionary to gen_config_section.
self.__repodata = {}
for (key, value) in iter(list(locals().items())):
if key not in ['self', 'repositories'] and value is not None:
self.__repodata[key] = value
self.__repositories = {}
if repositories:
for repo in repositories:
self.add_repo_slot(repo)
def add_repo_slot(self, repo):
"""
Adds a repository as needed for mock.
DnfRepoConfig object is expected for repo.
"""
if not isinstance(repo, DnfRepoConfig):
raise err.ProvidedValueError(f'This type of repo is not supported: {type(repo)}')
if repo.name in self.__repositories:
raise err.ExistsValueError(f'Repository already added: {repo.name}')
self.__repositories[repo.name] = repo
def gen_config(self) -> str:
"""
Generates the configuration that will be used for mock.
Call this to generate the configuration.
"""
outter = 'config_opts["dnf.conf"] = """\n'
outter += self.gen_config_section('main', self.__repodata)
# Each "repo" instance as a gen_config() command as DnfRepoConfig has
# that method.
for repo_name in sorted(self.__repositories.keys()):
outter += self.__repositories[repo_name].gen_config()
outter += '"""\n'
return outter
class DnfRepoConfig(DnfConfigurator):
"""
This helps with the repo configs that would be in a mock config.
"""
# pylint: disable=too-many-arguments,unused-argument
def __init__(self,
repoid,
name,
priority,
baseurl=None,
enabled=True,
gpgcheck=None,
gpgkey=None,
sslverify=None,
module_hotfixes=None
):
"""
Basic dnf repo init, tailored for peridot usage. Mirror lists are *not*
supported in this class.
repoid: str
A unique name for the repository.
name: str
Human readable repo description
priority: str
Repository priority. Recommended to set if emulating koji tagging
and/or doing bootstrapping of some sort.
baseurl: str or list
A URL to the directory where the repo is located. repodata must be
there. Multiple URL's can be provided as a list.
enabled: bool or int
Enabled (True or 1) or disabled (False or 0) for this repository.
More than likely if you've added some extra repository, you want it
enabled. Otherwise, why are you adding it? For aesthetic reasons?
gpgcheck: bool or int
Perform a GPG check on packages if set to True/1.
gpgkey: str or None
Some URL or location of the repo gpg key
sslverify: str or None
Enable SSL certificate verification if set to 1.
"""
self.__repoconf = {}
for (key, value) in locals().items():
if key != 'self' and value is not None:
self.__repoconf[key] = value
def gen_config(self) -> str:
"""
Generates the dnf repo config
Returns a string
"""
section = generic_util.trim_non_empty_string(
'repoid',
self.__repoconf['repoid']
)
return self.gen_config_section(section, self.__repoconf)
@property
def name(self):
"""
Repo name
"""
return self.__repoconf['name']
# All mock classes
class MockConfig(MockConfigUtils):
"""
Mock configuration file generator
"""
# pylint: disable=too-many-locals,too-many-arguments,unused-argument
def __init__(
self,
target_arch,
root=None,
chroot_setup_cmd=None,
chroot_setup_cmd_pkgs=None,
dist=None,
releasever=None,
package_manager: str = 'dnf',
enable_networking: bool = False,
files=None,
macros=None,
dnf_config=None,
basedir=None,
print_main_output: bool = True,
target_vendor: str = 'redhat',
vendor: str = 'Default Vendor',
packager: str = 'Default Packager <packager@noone.home>',
distsuffix=None,
distribution=None,
**kwargs
):
"""
Mock config init
target_arch: string (config_opts['target_arch'])
files: list (optional)
dist: must be a string with starting characters . and alphanumeric character
macros: dict expected, key should start with '%'
target_vendor: typically 'redhat' and shouldn't be changed in most cases
vendor: packaging vendor, e.g. Rocky Enterprise Software Foundation
packager: the packager, e.g. Release Engineering <releng@rockylinux.org>
chroot_setup_cmd_pkgs: list of packages for the chroot
"""
# A dist value must be defined. This dist value is typically what we
# see as the %{dist} macro in RPM distributions. For EL and Fedora,
# they usually start with a "." and then continue with an alphanumeric
# character.
if not dist:
raise err.MissingValueError('The dist value is NOT defined')
if dist and not re.match(r'^\.[a-zA-Z0-9]', dist):
raise err.ProvidedValueError('The dist value does not start with a ' +
'. and alphanumeric character')
# A releasever value must be defined. This is basically the version of
# the EL we're building for.
if not releasever:
raise err.MissingValueError('The releasever value is NOT defined.')
if releasever and not re.match(r'^[0-9]+', releasever):
raise err.ProvidedValueError('The releasever value does not start ' +
'with a number.')
# Set chroot defaults if necessary. In the constants module, we have a
# list of the most basic package set required. In the event that
# someone is building a mock config to use, they can set the
# chroot_setup_cmd if they wish to something other than install
# (usually this is almost never the case). More importantly, the
# packages actually installed into the chroot can be set. Some projects
# in peridot can potentially dictate this to something other than the
# defaults.
if not chroot_setup_cmd:
chroot_setup_cmd = const.MockConstants.MOCK_DEFAULT_CHROOT_SETUP_CMD
if not chroot_setup_cmd_pkgs:
chroot_setup_cmd_pkgs = const.MockConstants.MOCK_DEFAULT_CHROOT_BUILD_PKGS
# Each mock chroot needs a name. We do not arbitrarily generate any.
# The admin must be specific on what they want.
if not root:
raise err.MissingValueError('The mock root name was not provided.')
# Here we are building the basic mock configuration. We push most of it
# into dictionaries and then later translate it all into strings.
legal_host_arches = self.determine_legal_host_arches(target_arch)
interpreted_dist = self.determine_dist_macro(dist)
chroot_pkgs = ' '.join(chroot_setup_cmd_pkgs)
chroot_setup_cmd_string = chroot_setup_cmd + ' ' + chroot_pkgs
default_macros = {
'%_rpmfilename': '%%{NAME}-%%{VERSION}-%%{RELEASE}.%%{ARCH}.rpm',
'%_host': f'{target_arch}-{target_vendor}-linux-gnu',
'%_host_cpu': target_arch,
'%_vendor': target_vendor,
'%_vendor_host': target_vendor,
'%vendor': vendor,
'%packager': packager,
}
self.__config_opts = {
'root': root,
'target_arch': target_arch,
'legal_host_arches': legal_host_arches,
'chroot_setup_cmd': chroot_setup_cmd_string,
'dist': dist.strip('.'),
'releasever': releasever,
'basedir': basedir,
'use_host_resolv': enable_networking,
'rpmbuild_networking': enable_networking,
'print_main_output': print_main_output,
'macros': default_macros,
}
self.__config_opts.update(**kwargs)
self.__extra_config_opts = collections.defaultdict(list)
self.__files = {}
self.__macros = {}
self.__plugins = {}
if files:
for chroot_file in files:
self.add_file(chroot_file)
# Set absolute default macros for each build. This is a partial carry
# over from peridot v1. We add these to an /etc/rpm/macros... file on
# purpose. Otherwise, if they are set as macros in config_opts, they
# are placed in /builddir/.rpmmacros, which cannot be overriden. Doing
# this ensures we can override these macros (e.g. for modules)
starter_macros = {
'%dist': interpreted_dist,
'%distribution': distribution,
}
self.add_macros(starter_macros, macro_file='/etc/rpm/macros.xx')
if macros:
self.add_macros(macros)
# Set the absolute disabled plugins for each build. These three are
# disabled on purpose. Do NOT alter these. Do NOT attempt to override
# them. There should never be a reason to ever have these enabled in a
# build system nor in development tools that use this module.
yum_cache_plugin = MockPluginConfig(name='yum_cache', enable=False)
root_cache_plugin = MockPluginConfig(name='root_cache', enable=False)
ccache_plugin = MockPluginConfig(name='ccache', enable=False)
self.add_plugin(yum_cache_plugin)
self.add_plugin(root_cache_plugin)
self.add_plugin(ccache_plugin)
self.__dnf_config = dnf_config
def add_file(self, chroot_file):
"""
Adds a chroot file to the configuration.
"""
if chroot_file.file in self.__files:
raise err.ProvidedValueError(f'file {chroot_file.file} is already added')
self.__files[chroot_file.file] = chroot_file
def add_macros(self, macro_set, macro_file='/etc/macros/macros.zz'):
"""
Adds a set of macros to a mock configuration. This generates a file
that will be placed into the mock chroot, rather than
/builddir/.rpmmacros made by config_opts.
"""
macro_data = ''
for key, value in macro_set.items():
if '%' not in key:
macro_name = f'%{key}'
else:
macro_name = key
if not value:
continue
macro_value = value
macro_data += f'{macro_name} {macro_value}\n'
macro_config = MockMacroFileConfig(content=macro_data, file=macro_file)
returned_content = macro_config.gen_config()
self.__macros[macro_file] = returned_content
def add_plugin(self, plugin):
"""
Adds a mock plugin to the configuration.
"""
if plugin.name in self.__plugins:
raise err.ProvidedValueError(f'plugin {plugin.name} is already configured')
self.__plugins[plugin.name] = plugin
def module_install(self, module_name):
"""
Adds a module to module_install
"""
if 'module_install' not in self.__config_opts:
self.__config_opts['module_install'] = []
if module_name in self.__config_opts['module_install']:
raise err.ExistsValueError(f'{module_name} is already provided in module_install')
self.__config_opts['module_install'].append(module_name)
def module_enable(self, module_name):
"""
Adds a module to module_enable
"""
if 'module_enable' not in self.__config_opts:
self.__config_opts['module_enable'] = []
if module_name in self.__config_opts['module_enable']:
raise err.ExistsValueError(f'{module_name} is already provided in module_enable')
self.__config_opts['module_enable'].append(module_name)
def add_config_opt(self, key: str, value: str):
"""
Use this to add additional options not covered by this module
"""
self.__extra_config_opts[key].append(value)
@staticmethod
def determine_dist_macro(dist: str) -> str:
"""
Return a string of the interpreted dist macro. This will typically
match current EL release packages.
"""
# We don't want a case where we are sending "~bootstrap" as the dist
# already. So we're stripping it and letting the build figure it out
# for itself. The macro with_bootstrap conditional should dictate it.
if "~bootstrap" in dist:
starting_dist = dist.replace('~bootstrap', '')
else:
starting_dist = dist
# This is the current dist value that is used in current EL's. It will
# likely change over time. This value is *also* provided in
# system-release, but having it here is to make sure it *is* here just
# in case. This is especially useful when bootstrapping from ELN or
# stream.
# pylint: disable=line-too-long,consider-using-f-string
dist_value = '%{{!?distprefix0:%{{?distprefix}}}}%{{expand:%{{lua:for i=0,9999 do print("%{{?distprefix" .. i .."}}") end}}}}{0}%{{?distsuffix}}%{{?with_bootstrap:~bootstrap}}'.format(starting_dist)
return dist_value
# pylint: disable=too-many-return-statements
@staticmethod
def determine_legal_host_arches(target_arch: str) -> tuple:
"""
Return a tuple of acceptable arches for a given architecture. This will
appear as a list in the final mock config.
"""
# The legal_host_arches is typically a tuple of supported arches for a
# given platform. Based on the target_arch sent, we'll set the legal
# arches.
# We can easily use "switch" here but we are accounting for python 3.9
# at this time, which does not have it.
if target_arch == "x86_64":
return const.MockConstants.MOCK_X86_64_LEGAL_ARCHES
if target_arch in ['i386', 'i486', 'i586', 'i686']:
return const.MockConstants.MOCK_I686_LEGAL_ARCHES
if target_arch in "aarch64":
return const.MockConstants.MOCK_AARCH64_LEGAL_ARCHES
if target_arch in "armv7hl":
return const.MockConstants.MOCK_ARMV7HL_LEGAL_ARCHES
if target_arch in "ppc64le":
return const.MockConstants.MOCK_PPC64LE_LEGAL_ARCHES
if target_arch in "s390x":
return const.MockConstants.MOCK_S390X_LEGAL_ARCHES
if target_arch in "riscv64":
return const.MockConstants.MOCK_RISCV64_LEGAL_ARCHES
# This shouldn't happen, but who knows
if target_arch in "noarch":
return const.MockConstants.MOCK_NOARCH_LEGAL_ARCHES
return err.ProvidedValueError(f'Legal arches not found for {target_arch}.')
def set_dnf_config(self, dnf_config):
"""
Adds a dnf config section
"""
self.__dnf_config = dnf_config
# Disabling until I can figure out a better way to handle this
# pylint: disable=too-many-branches
def export_mock_config(self, config_file, root=None):
"""
Exports the mock configuration to a file.
"""
if not root:
if self.__config_opts.get('root'):
root = self.__config_opts.get('root')
else:
raise err.MissingValueError('root value is missing. This should ' +
'not have happened and is likely the ' +
'result of this module being ' +
'modified and not tested.')
if not isinstance(config_file, str):
if isinstance(config_file, IOBase):
raise err.ProvidedValueError('config_file must be a string. it cannot ' \
'be an open file handle.')
raise err.ProvidedValueError('config_file must be a string.')
# This is where we'll write the file. We'll go through each
# configuration option, generate their configs as they're found, and
# write them. It should look close, if not identical to a typical mock
# configuration.
with open(config_file, 'w', encoding='utf-8') as file_descriptor:
try:
if root:
file_descriptor.write(self.gen_config_option('root', root))
for option, value in sorted(self.__config_opts.items()):
if option == 'root' or value is None:
continue
file_descriptor.write(self.gen_config_option(option, value))
for option, value_list in sorted(self.__extra_config_opts.items()):
for value in value_list:
file_descriptor.write(self.gen_config_option(option, value, append=True))
for plugin in self.__plugins.values():
file_descriptor.write(plugin.gen_config())
for macro_file in self.__macros.values():
file_descriptor.write(macro_file)
for chroot_file in self.__files.values():
file_descriptor.write(chroot_file.gen_config())
if self.__dnf_config:
file_descriptor.write(self.__dnf_config.gen_config())
except Exception as exc:
raise err.ConfigurationError('There was an error exporting the mock ' \
f'configuration: {exc}')
finally:
file_descriptor.close()
@property
def mock_config_hash(self):
"""
Creates a hash sum of the configuration. Could be used for tracking
and/or comparison purposes.
This may not currently work at this time.
"""
hasher = hashlib.sha256()
file_descriptor = StringIO()
self.export_mock_config(file_descriptor)
file_descriptor.seek(0)
hasher.update(file_descriptor.read().encode('utf-8'))
file_descriptor.close()
return hasher.hexdigest()
### Start Plugins
class MockPluginConfig(MockConfigUtils):
"""
Mock plugin configuration helper. For cases where some plugin doesn't have
some sort of class in this module.
"""
def __init__(
self,
name: str,
enable: bool,
**kwargs
):
"""
Plugin config init. Used to enable/disable plugins. Additional plugin
options can be defined in kwargs (may or may not work)
name: plugin name, string
enable: boolean
"""
self.name = copy.copy(name)
self.enable = enable
self.opts = copy.copy(kwargs)
def gen_config(self):
"""
Helps add a plugin configuration to mock
"""
plugin_name = self.name
config_string_status = self.enable
outter = self.gen_config_string_with_opts(
name=plugin_name,
status=config_string_status,
opts=self.opts
)
return outter
class MockBindMountPluginConfig(MockConfigUtils):
"""
Mock plugin configuration helper
"""
def __init__(
self,
enable: bool,
mounts: list
):
"""
Plugin config init. Used to enable/disable bind mount plugin.
enable: boolean
mounts: list of tuples
"""
self.name = 'bind_mount'
self.enable = enable
self.mounts = mounts
def gen_config(self):
"""
Helps add a plugin configuration to mock
"""
bind_config_status = self.config_string(self.enable)
# Documentation wants a ['dirs'] section added, so we're obliging.
outter = self.gen_config_string(name='bind_mount', status=bind_config_status)
if not self.enable or not self.mounts:
return outter
for local_path, mock_chroot_path in self.mounts:
# pylint: disable=line-too-long
outter += f'config_opts["plugin_conf"]["bind_mount_opts"]["dirs"].append(("{local_path}", "{mock_chroot_path}"))\n'
return outter
class MockChrootScanPluginConfig(MockConfigUtils):
"""
Helps setup the chroot scan plugin.
"""
def __init__(
self,
enable,
**kwargs
):
"""
Inits the plugin configuration.
enable: bool
kwargs: additional options can be sent in here
"""
self.name = 'chroot_scan'
self.enable = enable
self.opts = copy.copy(kwargs)
def gen_config(self):
"""
Helps add a plugin configuration to mock
"""
chroot_config_status = self.enable
# This one is weird. The documentation specifically wants a "dict" as a
# string... Not really clear why. But we'll roll with it.
outter = self.gen_config_string(
name='chroot_scan',
status=chroot_config_status
)
opts_dict = {}
for key, option in sorted(self.opts.items()):
opts_dict[key] = option
outter += f'config_opts["plugin_conf"]["chroot_scan_opts"] = {opts_dict}\n'
return outter
class MockShowrcPluginConfig(MockConfigUtils):
"""
Helps enable the showrc plugin. Useful for showing defined rpm macros for a
build.
"""
def __init__(self, enable):
"""
Inits the plugin configuration.
enable: bool
"""
self.name = 'showrc'
self.enable = enable
def gen_config(self):
"""
Helps add a plugin configuration to mock
"""
showrc_config_status = self.enable
outter = f'config_opts["plugin_conf"]["showrc_enable"] = {showrc_config_status}\n'
return outter
### End Plugins
class MockChrootFileConfig:
"""
Helps embed files into a mock chroot. May be useful to trick builds if
necessary but also could help with things like secureboot if needed.
"""
def __init__(
self,
file: str,
content=None
):
"""
Create a file to embed into the mock root
"""
if not content:
raise err.MissingValueError('Macro content was not provided')
self.file = file
self._content = content
def gen_config(self):
"""
Return a string to be added to mock config
"""
return f'config_opts["files"]["{self.file}"] = """{self._content}\n"""\n\n'
class MockMacroConfig:
"""
Helps add macros into a mock configuration. This is a typical staple of
builds. In most cases, you won't need this and instead will use
MockMacroFileConfig.
"""
def __init__(
self,
name: str,
value: str
):
"""
init the class
"""
self.name = name
self.value = value
def gen_config(self):
"""
Generate the macro option
"""
return f'config_opts["macros"]["{self.name}"] = "{self.value}"'
class MockMacroFileConfig:
"""
Helps add macros into a mock configuration into a file instead.
"""
def __init__(
self,
file: str = '/etc/rpm/macros.zz',
content=None
):
"""
Create a macro file to embed into the mock root
"""
if not content:
raise err.MissingValueError('Macro content was not provided')
self.file = file
self._content = content
def gen_config(self):
"""
Return a string to be added to mock config
"""
return f'config_opts["files"]["{self.file}"] = """\n\n{self._content}\n"""\n\n'

100
mock/error.py Normal file
View File

@ -0,0 +1,100 @@
# -*-:python; coding:utf-8; -*-
# author: Louis Abel <label@rockylinux.org>
"""
Mock Error Classes (mainly for parsing, if we care)
"""
import os
import re
from pv2.util import constants as const
from pv2.util import generic as generic_util
# list every error class that's enabled
__all__ = [
'MockErrorParser'
]
class MockErrorChecks:
"""
Static methods of all error checks
"""
@staticmethod
def analyze_log(checks, log_file):
"""
Go through the list of checks and verify the log file
All checks are listed throughout the class below this one.
"""
log_file_name = os.path.basename(log_file)
result_dict = {}
with open(log_file_name, 'rb') as file_descriptor:
for line_number, line in enumerate(file_descriptor, 1):
for check in checks:
result = check(line)
if result:
error_code, error_message = result
result_dict = {
'error_code': error_code,
'error_message': error_message,
'file_name': log_file_name,
'line': line_number
}
return result_dict
@staticmethod
def check_error(regex, message, error_code, line):
"""
Does the actual regex verification
"""
result = re.search(regex, generic_util.to_unicode(line))
if result:
return error_code, message.format(*result.groups())
return None
@staticmethod
def unmet_dep(line):
"""
Searches for a dependency error in the root log
"""
regex = r'Error:\s+No\s+Package\s+found\s+for\s+(.*?)$'
message_template = 'No package(s) found for "{0}"'
verify_pattern = __class__.check_error(regex,
message_template,
const.MockConstants.MOCK_EXIT_DNF_ERROR,
line)
return verify_pattern
class MockErrorParser(MockErrorChecks):
"""
Helps provide checking definitions to find errors and report them. This
could be used in the case of having a generic error (like 1) from mock and
needing to find the real reason.
"""
def __init__(
self,
root_log,
build_log
):
"""
Initialize parser
"""
self._root_log = root_log
self._build_log = build_log
def check_for_error(self):
"""
Checks for errors
"""
# we'll get this eventually
#build_log_check = []
root_log_check = [
self.unmet_dep
]
# pylint: disable=line-too-long
return self.analyze_log(root_log_check, self._root_log)

423
mock/runner.py Normal file
View File

@ -0,0 +1,423 @@
# -*- mode:python; coding:utf-8; -*-
# Louis Abel <label@rockylinux.org>
"""
Mock runners and limited error handler
"""
import os
import re
import logging
from pv2.util import error as err
from pv2.util import fileutil
from pv2.util import constants as const
from pv2.util import processor
__all__ = [
'MockRunner',
'MockResult'
]
class MockRunner:
"""
Mock runner definitions
"""
def __init__(self, config_path: str):
"""
Initialize the runner
"""
self.logger = logging.getLogger(self.__module__)
self.config_path = config_path
def init(self, resultdir=None, quiet=None, isolation=None, foreground=False):
"""
Inits a mock root
"""
return self.__run_mock(mock_call='init', resultdir=resultdir,
quiet=quiet, isolation=isolation,
foreground=foreground)
# pylint: disable=too-many-arguments
def shell(
self,
command: str,
resultdir=None,
quiet=None,
isolation=None,
foreground=False
):
"""
Runs shell for a given mock root
"""
return self.__run_mock(mock_call='shell', mock_arg=command,
resultdir=resultdir, quiet=quiet,
isolation=isolation, foreground=foreground)
def clean(self, quiet=None, isolation=None, foreground=False):
"""
Clean up the mock root
"""
try:
self.__run_mock(mock_call='clean', quiet=quiet,
isolation=isolation, foreground=foreground)
except MockErrorResulter as exc:
self.logger.error('Unable to run clean on %s', self.config_path)
self.logger.error('Output:\n%s\n', exc)
self.__run_mock(mock_call='clean')
# pylint: disable=too-many-arguments
def buildsrpm(
self,
spec: str,
sources: str,
resultdir=None,
definitions=None,
timeout=None,
quiet=None,
isolation=None,
foreground=False
):
"""
Builds a source RPM, but does not actually build the package
"""
return self.__run_mock(
mock_call='buildsrpm',
spec=spec,
sources=sources,
resultdir=resultdir,
definitions=definitions,
rpmbuild_timeout=timeout,
quiet=quiet,
target='noarch',
isolation=isolation,
foreground=foreground
)
# pylint: disable=too-many-arguments
def build(
self,
srpm_path: str,
resultdir=None,
definitions=None,
timeout=None,
quiet=None,
isolation=None,
foreground=False
):
"""
Builds a given source package
"""
return self.__run_mock(
mock_call='rebuild',
mock_arg=srpm_path,
resultdir=resultdir,
rpmbuild_timeout=timeout,
definitions=definitions,
quiet=quiet,
isolation=isolation,
foreground=foreground
)
def __determine_resultdir(self):
"""
Receives no input. This should figure out where the resultdir
will ultimately be.
"""
mock_debug_args = [
'mock',
'--root', self.config_path,
'--debug-config-expanded'
]
mock_debug_run = processor.run_proc_no_output(command=mock_debug_args)
regex = r"^config_opts\['resultdir'\] = '(.*)'"
regex_search = re.search(regex, mock_debug_run.stdout, re.MULTILINE)
if regex_search:
return regex_search.group(1)
return None
# pylint: disable=too-many-locals,too-many-branches
def __run_mock(
self,
mock_call: str,
mock_arg: str = '',
resultdir=None,
foreground=False,
**kwargs
):
"""
Actually run mock.
mock_call should be the command being used (such as rebuild, shell, and
so on)
mock_arg is a string, and can be an additional argument (some mock
commands do not need an additional argument, thus default is an empty
string)
kwargs can be any set of additional arguments to add to mock as
key:value pairs. for example, lets say your function accepts an
argument like isolation and you set it to 'simple', the kwargs.items()
block will parse it as `--isolation simple`. if your function does not
require an argument, and it's not a matter of it being true or false,
you would send it as argument='' to ensure that an additional list item
is not added after the argument.
"""
# Note: You will notice that everything appears to be separate list
# items. This is on purpose to try to make sure subprocess is happy.
# Don't try to simplify it.
initial_args = [
'mock',
'--root', self.config_path,
f'--{mock_call}', mock_arg
]
if resultdir:
initial_args.append('--resultdir')
initial_args.append(resultdir)
# As you probably noticed, not all options being sent by the other
# methods are accounted for, so we are using kwargs to deal with them
# instead. This is because not all mock commands use the same options
# (or get the same effects out of them if they can be specified). But
# we are firm on on the ones that should be set.
for option, argument in kwargs.items():
if argument is None:
continue
# If we are sending mock specific macro definitions that are not in
# the config, this is how you do it. It's expected that definitions
# is a dict with only key value pairs.
if option == 'definitions':
for macro, value in argument.items():
initial_args.append('--define')
# Macro definitions require quotes between name and value.
# DO NOT UNDO THIS.
initial_args.append(f"'{macro} {value}'")
# "quiet" is a weird one because it doesn't accept a value in mock.
# We purposely set it to "None" so it gets passed over (way above).
# Setting to True will make this flag appear.
elif option == 'quiet':
initial_args.append('--quiet')
elif option == 'isolation':
if argument in ('simple', 'nspawn', 'simple'):
initial_args.append('--isolation')
initial_args.append(str(argument))
else:
raise err.ProvidedValueError(f'{argument} is an invalid isolation option.')
# If we're not covering the obvious ones above that we need special
# care for, this is where the rest happens. If an argument is sent
# with an empty string, it'll just show up as --option. Any
# argument will make it show up as --option argument.
else:
initial_args.append(f'--{option}')
if len(str(argument)) > 0:
initial_args.append(str(argument))
# Might not need this. This just makes sure our list is in order.
initial_args = [arg for arg in initial_args if arg]
mock_command = ' '.join(initial_args)
self.logger.info('The following mock command will be executed: %s', mock_command)
# If foreground is enabled, all output from mock will show up in the
# user's terminal (or wherever the output is being sent). This means
# stdout and stderr will NOT contain any data. It may be better to set
# "quiet" instead of foreground and then stream the actual log files
# themselves, but this requires you to be specific on the resultdir to
# find and stream them.
if foreground:
mock_run = processor.run_proc_foreground(command=initial_args)
else:
mock_run = processor.run_proc_no_output(command=initial_args)
# Assign vars based on what happened above.
mock_config = self.config_path
exit_code = mock_run.returncode
stdout = mock_run.stdout
stderr = mock_run.stderr
# If a resultdir wasn't presented, we try to look for it. We do this by
# running mock's debug commands to get the correct value and regex it
# out.
if not resultdir:
resultdir = self.__determine_resultdir()
if exit_code != 0:
raise MockErrorResulter(
mock_command,
exit_code,
resultdir)
return MockResult(
mock_command,
mock_config,
exit_code,
stdout,
stderr,
resultdir)
class MockResult:
"""
Mock result parser
"""
# pylint: disable=too-many-arguments
def __init__(
self,
mock_command,
mock_config,
exit_code,
stdout,
stderr,
resultdir=None
):
"""
Initialize the mock result parser
"""
self.mock_command = mock_command
self.mock_config = mock_config
self.exit_code = exit_code
self.__stdout = stdout
self.__stderr = stderr
self.resultdir = resultdir
@property
def srpm(self):
"""
Turns a string (or None) of the build source RPM package
"""
return next(iter(fileutil.filter_files(
self.resultdir,
lambda file: file.endswith('src.rpm'))),
None
)
@property
def rpms(self):
"""
Returns a list of RPM package paths in the resultdir.
"""
return fileutil.filter_files(
self.resultdir,
lambda file: re.search(r'(?<!\.src)\.rpm$', file)
)
@property
def logs(self):
"""
Returns a list of mock log files
"""
mock_log_files = fileutil.filter_files(self.resultdir,
lambda file: file.endswith('.log'))
# If we are using the chroot scan plugin, then let's search for other
# logs that we may have cared about in this build.
chroot_scan_dir = os.path.join(self.resultdir, 'chroot_scan')
if os.path.exists(chroot_scan_dir):
for dir_name, _, files in os.walk(chroot_scan_dir):
for file in files:
if file.endswith('.log'):
mock_log_files.append(os.path.join(os.path.abspath(dir_name), file))
return mock_log_files
@property
def stdout(self):
"""
Returns stdout
"""
return self.__stdout
@property
def stderr(self):
"""
Returns stdout
"""
return self.__stderr
# Is there a better way to do this?
# Note that this isn't in pv2.util.error because this *may* be used to parse
# logs at some point, and we do not want to add additional parsers to
# pv2.util.error or have it import mock modules if it's not actually required.
# I don't want to have to import more than needed in pv2.util.error.
class MockErrorResulter(Exception):
"""
Mock error result checker.
Takes in an exception and reports the exit code.
"""
def __init__(
self,
mock_command,
exit_code,
resultdir=None,
result_message=None
):
"""
Initialize the MockError class this way.
"""
# We probably don't need to do this, but it doesn't hurt. There should
# always be a resultdir to reference.
self.build_log = None
self.root_log = None
if resultdir:
self.build_log = os.path.join(resultdir, 'build.log')
self.root_log = os.path.join(resultdir, 'root.log')
if not result_message:
result_message = f'Command {mock_command} exited with code ' \
f'{exit_code}. Please review build.log and root.log ' \
f'located in the main root ({resultdir}) or bootstrap root.'
# This is awkward. I can't think of a better way to do this.
if exit_code == const.MockConstants.MOCK_EXIT_ERROR:
#error_object = errmock(self.root_log, self.build_log)
#error_dict = error_object.check_for_error()
#if len(error_dict) > 0:
# result_message = f'Command {mock_command} exited with code ' \
# '{error_dict["error_code"]}: {error_dict["error_message"]}'
# pylint: disable=non-parent-init-called
err.MockGenericError.__init__(self, result_message)
# Send to log parser to figure out what it actually is, and use the
# above to report it.
elif exit_code == const.MockConstants.MOCK_EXIT_SETUID:
# pylint: disable=non-parent-init-called
result_message = 'Either setuid/setgid is not available or ' \
'another error occurred (such as a bootstrap init failure). ' \
'Please review build.log or root.log, in the main root ' \
f'({resultdir}) or bootstrap root if applicable.'
err.MockGenericError.__init__(self, result_message)
elif exit_code == const.MockConstants.MOCK_EXIT_INVCONF:
# pylint: disable=non-parent-init-called
err.MockInvalidConfError.__init__(self, result_message)
elif exit_code == const.MockConstants.MOCK_EXIT_INVARCH:
# pylint: disable=non-parent-init-called
err.MockInvalidArchError.__init__(self, result_message)
elif exit_code in (const.MockConstants.MOCK_EXIT_DNF_ERROR,
const.MockConstants.MOCK_EXIT_EXTERNAL_DEP):
# pylint: disable=non-parent-init-called
err.MockDnfError.__init__(self, result_message)
elif exit_code == const.MockConstants.MOCK_EXIT_RESULTDIR_NOT_CREATED:
# pylint: disable=non-parent-init-called
err.MockResultdirError.__init__(self, result_message)
elif exit_code in (const.MockConstants.MOCK_EXIT_SIGHUP_RECEIVED,
const.MockConstants.MOCK_EXIT_SIGPIPE_RECEIVED,
const.MockConstants.MOCK_EXIT_SIGTERM_RECEIVED):
# pylint: disable=non-parent-init-called
err.MockSignalReceivedError.__init__(self, result_message)
else:
result_message = 'An unexpected mock error was caught. Review ' \
f'stdout/stderr or other logs to determine the issue. ' \
f'\n\nMock command: {mock_command}'
# pylint: disable=non-parent-init-called
err.MockUnexpectedError.__init__(self, result_message)

5
models/__init__.py Normal file
View File

@ -0,0 +1,5 @@
# -*-:python; coding:utf-8; -*-
# author: Louis Abel <label@rockylinux.org>
"""
Useful models. These may not be used and may be put elsewhere.
"""

8
modularity/__init__.py Normal file
View File

@ -0,0 +1,8 @@
# -*-:python; coding:utf-8; -*-
# author: Louis Abel <label@rockylinux.org>
"""
Mock and mock accessories
"""
# import all thingies here
from .util import GenericModule,ArtifactHandler,ModuleMangler

208
modularity/util.py Normal file
View File

@ -0,0 +1,208 @@
# -*- mode:python; coding:utf-8; -*-
# Louis Abel <label@rockylinux.org>
"""
Utility functions for Modularity
"""
import datetime
import hashlib
import gi
from pv2.util import error as err
from pv2.util import constants as const
from pv2.util import generic
from pv2.util import fileutil
gi.require_version('Modulemd', '2.0')
# Note: linter says this should be at the top. but then the linter says that
# everything else should be above it. it's fine here.
# pylint: disable=wrong-import-order,wrong-import-position
from gi.repository import Modulemd
__all__ = [
'GenericModuleHandler',
'ArtifactHandler',
'ModuleMangler'
]
class GenericModuleHandler:
"""
Generic module utility functions
"""
@staticmethod
def gen_stream_prefix(major: int, minor: int, patch: int) -> int:
"""
Generates a module stream prefix if one isn't provided by some other
means.
"""
major_version = str(major)
minor_version = str(minor) if len(str(minor)) > 1 else f'0{str(minor)}'
patch_version = str(patch) if len(str(patch)) > 1 else f'0{str(patch)}'
return int(f'{major_version}{minor_version}{patch_version}')
@staticmethod
def gen_stream_version(prefix: int) -> int:
"""
Generates a module stream version. Requires an initial prefix (like
90200 or similar).
"""
timestamp = datetime.datetime.utcnow().strftime('%Y%m%d%H%M%S')
return int(f'{prefix}{timestamp}')
@staticmethod
def gen_stream_dist_prefix(major: int, minor: int, patch: int) -> str:
"""
Generates a dist prefix (elX.Y.Z)
"""
major_version = str(major)
minor_version = str(minor)
patch_version = str(patch)
return f'el{major_version}.{minor_version}.{patch_version}'
@staticmethod
def gen_stream_dist_macro(
dist_prefix: str,
stream,
index=None,
scratch_build=False
) -> str:
"""
Generates a dist macro. stream should be a Modulemd.ModuleStreamV2 object
"""
# Fedora uses + it seems, while there are others who seem to use _.
# We'll just use +
# (Hopefully I did this better than in lazybuilder)
mod_prefix = 'module+'
# If this is a scratch build, change the initial prefix. Should be like
# what MBS does.
if scratch_build:
mod_prefix = 'scrmod+'
dist_string = '.'.join([
stream.get_module_name(),
stream.get_stream_name(),
str(stream.get_version()),
str(stream.get_context())
]
).encode('utf-8')
dist_hash = hashlib.sha1(dist_string, usedforsecurity=False).hexdigest()[:8]
template = f'.{mod_prefix}{dist_prefix}+{index}+{dist_hash}'
return template
@staticmethod
def gen_stream_build_deps():
"""
Gets a module stream's build deps
"""
return 'how'
@staticmethod
def gen_stream_runtime_deps():
"""
Gets a module stream's runtime deps
"""
return 'how'
@staticmethod
def gen_xmd_data(data: dict):
"""
Generates basic XMD information
"""
xmd = {'peridot': data}
return xmd
@staticmethod
def gen_module_defaults(name):
"""
Creates a modulemd default object
"""
return Modulemd.DefaultsV1.new(name)
@staticmethod
def merge_modules(module_a, module_b):
"""
Merges two module yamls together
"""
merge_object = Modulemd.ModuleIndexMerger.new()
merge_object.associate_index(module_b, 0)
merge_object.associate_index(module_a, 0)
return merge_object.resolve()
@staticmethod
def dump_to_yaml(stream):
"""
Dumps a module stream to YAML string
"""
module_index = Modulemd.ModuleIndex.new()
module_index.add_module_stream(stream)
return module_index.dump_to_string()
@staticmethod
def get_stream_metadata(module, stream):
"""
Gets a module's general information. Expects a Modulemd.Module object
and a Modulemd.ModuleStreamV2 object.
"""
module_dict = {
'name': stream.get_module_name(),
'stream': stream.get_stream_name(),
'arch': stream.get_arch(),
'version': stream.get_version(),
'context': stream.get_context(),
'summary': stream.get_summary(),
'is_default_stream': False,
'default_profiles': [],
'yaml_template': __class__.dump_to_yaml(stream)
}
defaults = module.get_defaults()
if not defaults:
return module_dict
default_stream = defaults.get_default_stream()
module_dict['is_default_stream'] = stream.get_stream_name() == default_stream
module_dict['default_profiles'] = defaults.get_default_profiles_for_stream(
stream.get_stream_name()
)
return module_dict
# pylint: disable=too-few-public-methods
class ArtifactHandler:
"""
Handles artifacts for a module. Typically RPMs
"""
# pylint: disable=too-many-arguments
def __init__(
self,
name: str,
version: str,
release: str,
arch: str,
epoch=None
):
"""
Initialize wrapper
"""
self.name = name
self.version = version
self.release = release
self.arch = arch
self.epoch = epoch
def return_artifact(self) -> str:
"""
Returns artifact string
"""
epoch = self.epoch if self.epoch else '0'
return f'{self.name}-{epoch}:{self.version}-{self.release}.{self.arch}'
class ModuleMangler:
"""
Specific functions for dealing with module yamls.
"""
def __init__(self):
"""
Initialize class
"""

5
peridotpb/__init__.py Normal file
View File

@ -0,0 +1,5 @@
# -*-:python; coding:utf-8; -*-
# author: Louis Abel <label@rockylinux.org>
"""
The peridotpb part of everything I suppose.
"""

3
util/README.md Normal file
View File

@ -0,0 +1,3 @@
# util module
This is for pv2 utilities.

5
util/__init__.py Normal file
View File

@ -0,0 +1,5 @@
# -*-:python; coding:utf-8; -*-
# author: Louis Abel <label@rockylinux.org>
"""
Start up the util module with no defaults
"""

30
util/color.py Normal file
View File

@ -0,0 +1,30 @@
# -*-:python; coding:utf-8; -*-
# author: Louis Abel <label@rockylinux.org>
# borrowed from empanadas
"""
Color classes
"""
# RPM utilities
__all__ = [
'Color',
]
# pylint: disable=too-few-public-methods
class Color:
"""
Supported colors
"""
RED = "\033[91m"
GREEN = "\033[92m"
PURPLE = "\033[95m"
CYAN = "\033[96m"
DARKCYAN = "\033[36m"
BLUE = "\033[94m"
YELLOW = "\033[93m"
UNDERLINE = "\033[4m"
BOLD = "\033[1m"
END = "\033[0m"
INFO = "[" + BOLD + GREEN + "INFO" + END + "] "
WARN = "[" + BOLD + YELLOW + "WARN" + END + "] "
FAIL = "[" + BOLD + RED + "FAIL" + END + "] "
STAT = "[" + BOLD + CYAN + "STAT" + END + "] "

168
util/constants.py Normal file
View File

@ -0,0 +1,168 @@
# -*-:python; coding:utf-8; -*-
# author: Louis Abel <label@rockylinux.org>
"""
All constants
"""
__all__ = [
'RpmConstants',
'ErrorConstants',
'MockConstants'
]
# pylint: disable=too-few-public-methods
class RpmConstants:
"""
All RPM constants are here. These are used mainly in the rpm utility but
could be used elsewhere.
"""
RPM_HEADER_MAGIC = b'\xed\xab\xee\xdb'
RPM_TAG_HEADERSIGNATURES = 62
RPM_TAG_FILEDIGESTALGO = 5011
RPM_SIGTAG_DSA = 267
RPM_SIGTAG_RSA = 268
RPM_SIGTAG_PGP = 1002
RPM_SIGTAG_MD5 = 1004
RPM_SIGTAG_GPG = 1005
RPM_FILEDIGESTALGO_IDS = {
None: 'MD5',
1: 'MD5',
2: 'SHA1',
3: 'RIPEMD160',
8: 'SHA256',
9: 'SHA384',
10: 'SHA512',
11: 'SHA224'
}
MOCK_CLONE_DIRECTORY = '/var/peridot/peridot__rpmbuild_content'
# pylint: disable=too-few-public-methods
class ErrorConstants:
"""
All error codes as constants.
9000-9099: Generic Errors, this means not entirely specific to a process or
component.
9100-9199: Mock errors, any error that can occur in mock.
"""
# General errors
ERR_GENERAL = 9000
ERR_PROVIDED_VALUE = 9001
ERR_VALUE_EXISTS = 9002
ERR_MISSING_VALUE = 9003
ERR_CONFIGURATION = 9004
ERR_NOTFOUND = 9005
# Error in spec file
MOCK_ERR_SPEC = 9100
# Error trying to get dependencies for a build
MOCK_ERR_DEP = 9101
# Architecture is excluded - there should be no reason this appears normally.
MOCK_ERR_ARCH_EXCLUDED = 9102
# A build hung up during build
MOCK_ERR_BUILD_HUP = 9103
# Build ran out of disk space
MOCK_ERR_NO_SPACE = 9104
# Missing file error
MOCK_ERR_ENOENT = 9105
# Files were installed but not packaged
MOCK_ERR_UNPACKED_FILES = 9106
# Error in repository
MOCK_ERR_REPO = 9107
# Timeout
MOCK_ERR_ETIMEDOUT = 9108
# Changelog is not in chronological order
MOCK_ERR_CHLOG_CHRONO = 9109
# Invalid conf
MOCK_ERR_CONF_INVALID = 9110
# DNF Error
MOCK_ERR_DNF_ERROR = 9111
# Result dir generic error
MOCK_ERR_RESULTDIR_GENERIC = 9180
# Unexpected error
MOCK_ERR_UNEXPECTED = 9198
# Generic error
MOCK_ERR_GENERIC = 9199
# pylint: disable=too-few-public-methods
class MockConstants:
"""
All mock constants, usually for defaults
"""
# I'm aware this line is too long
MOCK_DEFAULT_CHROOT_BUILD_PKGS = [
'bash',
'bzip2',
'coreutils',
'cpio',
'diffutils',
'findutils',
'gawk',
'glibc-minimal-langpack',
'grep',
'gzip',
'info',
'make',
'patch',
'redhat-rpm-config',
'rpm-build',
'sed',
'shadow-utils',
'system-release',
'tar',
'unzip',
'util-linux',
'which',
'xz'
]
MOCK_DEFAULT_CHROOT_SRPM_PKGS = [
'bash',
"glibc-minimal-langpack",
"gnupg2",
"rpm-build",
"shadow-utils",
"system-release"
]
MOCK_DEFAULT_CHROOT_SETUP_CMD = 'install'
# Mock architecture related
MOCK_X86_64_LEGAL_ARCHES = ('x86_64',)
MOCK_I686_LEGAL_ARCHES = ('i386', 'i486', 'i586', 'i686', 'x86_64',)
MOCK_AARCH64_LEGAL_ARCHES = ('aarch64',)
MOCK_ARMV7HL_LEGAL_ARCHES = ('armv7hl',)
MOCK_PPC64LE_LEGAL_ARCHES = ('ppc64le',)
MOCK_S390X_LEGAL_ARCHES = ('s390x',)
MOCK_RISCV64_LEGAL_ARCHES = ('riscv64',)
# pylint: disable=line-too-long
MOCK_NOARCH_LEGAL_ARCHES = ('i386', 'i486', 'i586', 'i686', 'x86_64', 'aarch64', 'ppc64le', 's390x', 'noarch')
# Mock general config related
MOCK_DNF_BOOL_OPTIONS = ('assumeyes', 'best', 'enabled', 'gpgcheck',
'install_weak_deps', 'keepcache', 'module_hotfixes',
'obsoletes')
MOCK_DNF_STR_OPTIONS = ('debuglevel', 'retries', 'metadata_expire')
MOCK_DNF_LIST_OPTIONS = ('syslog_device', 'protected_packages')
MOCK_RPM_VERBOSITY = ('critical', 'debug', 'emergency', 'error', 'info', 'warn')
# Most mock error codes
MOCK_EXIT_SUCCESS = 0
MOCK_EXIT_ERROR = 1
MOCK_EXIT_SETUID = 2
MOCK_EXIT_INVCONF = 3
MOCK_EXIT_CMDLINE = 5
MOCK_EXIT_INVARCH = 6
MOCK_EXIT_BUILD_PROBLEM = 10
MOCK_EXIT_CMDTMOUT = 11
MOCK_EXIT_ERROR_IN_CHROOT = 20
MOCK_EXIT_DNF_ERROR = 30
MOCK_EXIT_EXTERNAL_DEP = 31
MOCK_EXIT_PKG_ERROR = 40
MOCK_EXIT_MOCK_CMDLINE = 50
MOCK_EXIT_BUILDROOT_LOCKED = 60
MOCK_EXIT_RESULTDIR_NOT_CREATED = 70
MOCK_EXIT_WEAK_DEP_NOT_INSTALLED = 120
MOCK_EXIT_SIGHUP_RECEIVED = 129
MOCK_EXIT_SIGPIPE_RECEIVED = 141
MOCK_EXIT_SIGTERM_RECEIVED = 143

13
util/cr.py Normal file
View File

@ -0,0 +1,13 @@
"""
Parses repo metadata to get information. May be useful for getting general info
about a project's repository, like for generating a summary.
"""
#import os
#import createrepo_c as cr
__all__ = []
def _warning_cb(warning_type, message):
print(f"WARNING: {message}")
return True

118
util/error.py Normal file
View File

@ -0,0 +1,118 @@
# -*-:python; coding:utf-8; -*-
# author: Louis Abel <label@rockylinux.org>
"""
Generic Error Classes
"""
# needed imports
from pv2.util.constants import ErrorConstants as errconst
# list every error class that's enabled
__all__ = [
'GenericError',
'ProvidedValueError',
'ExistsValueError',
'MissingValueError',
'ConfigurationError',
'FileNotFound',
'MockGenericError',
'MockUnexpectedError',
'MockInvalidConfError',
'MockInvalidArchError',
'MockDnfError',
'MockResultdirError',
'MockSignalReceivedError',
]
# todo: find a way to logically use fault_code
class GenericError(Exception):
"""
Custom exceptions entrypoint
"""
fault_code = errconst.ERR_GENERAL
from_fault = False
def __str__(self):
try:
return str(self.args[0]['args'][0])
# pylint: disable=broad-exception-caught
except Exception:
try:
return str(self.args[0])
# pylint: disable=broad-exception-caught
except Exception:
return str(self.__dict__)
# Starting at this point is every error class that pv2 will deal with.
class ProvidedValueError(GenericError):
"""
What it says on the tin
"""
fault_code = errconst.ERR_PROVIDED_VALUE
class ExistsValueError(GenericError):
"""
Value being requested already exists
"""
fault_code = errconst.ERR_VALUE_EXISTS
class MissingValueError(GenericError):
"""
Value being requested already exists
"""
fault_code = errconst.ERR_MISSING_VALUE
class ConfigurationError(GenericError):
"""
Value being requested already exists
"""
fault_code = errconst.ERR_CONFIGURATION
class FileNotFound(GenericError):
"""
Value being requested already exists
"""
fault_code = errconst.ERR_NOTFOUND
class MockGenericError(GenericError):
"""
Mock error exceptions
"""
fault_code = errconst.MOCK_ERR_GENERIC
class MockUnexpectedError(MockGenericError):
"""
Mock (or the environment) experienced an unexpected error.
"""
fault_code = errconst.MOCK_ERR_UNEXPECTED
class MockInvalidConfError(MockGenericError):
"""
Mock (or the environment) experienced an error with the conf.
"""
fault_code = errconst.MOCK_ERR_CONF_INVALID
class MockInvalidArchError(MockGenericError):
"""
Mock (or the environment) didn't like the arch
"""
fault_code = errconst.MOCK_ERR_ARCH_EXCLUDED
class MockDnfError(MockGenericError):
"""
Mock (or the environment) had some kind of dnf error
"""
fault_code = errconst.MOCK_ERR_DNF_ERROR
class MockResultdirError(MockGenericError):
"""
Mock (or the environment) had some kind of error in the resultdir
"""
fault_code = errconst.MOCK_ERR_RESULTDIR_GENERIC
class MockSignalReceivedError(MockGenericError):
"""
Mock had a SIG received
"""
fault_code = errconst.MOCK_ERR_BUILD_HUP

55
util/fileutil.py Normal file
View File

@ -0,0 +1,55 @@
"""
File functions
"""
import os
import hashlib
from pv2.util import error as err
# File utilities
__all__ = [
'filter_files',
'get_checksum'
]
def filter_files(directory_path: str, filter_filename: str) -> list:
"""
Filter out specified files
"""
# it's literally 101/100 ...
# pylint: disable=line-too-long
return_list = []
for file in os.listdir(directory_path):
if filter_filename(file):
return_list.append(os.path.join(directory_path, file))
return return_list
def get_checksum(file_path: str, hashtype: str = 'sha256') -> str:
"""
Generates a checksum from the provided path by doing things in chunks. This
reduces the time needed to make the hashes and avoids memory issues.
Borrowed from empanadas with some modifications
"""
# We shouldn't be using sha1 or md5.
if hashtype in ('sha', 'sha1', 'md5'):
raise err.ProvidedValueError(f'{hashtype} is not allowed.')
try:
checksum = hashlib.new(hashtype)
except ValueError as exc:
raise err.GenericError(f'hash type not available: {ValueError}') from exc
try:
with open(file_path, 'rb') as input_file:
while True:
chunk = input_file.read(8192)
if not chunk:
break
checksum.update(chunk)
input_file.close()
return checksum.hexdigest()
except IOError as exc:
raise err.GenericError(f'Could not open or process file {file_path}: {exc})')

78
util/generic.py Normal file
View File

@ -0,0 +1,78 @@
"""
Generic functions
"""
import datetime
import hashlib
from pv2.util import error as err
# General utilities
__all__ = [
'ordered',
'conv_multibyte',
'to_unicode',
'convert_from_unix_time',
'trim_non_empty_string',
'gen_bool_option',
'generate_password_hash'
]
def to_unicode(string: str) -> str:
"""
Convert to unicode
"""
if isinstance(string, bytes):
return string.decode('utf8')
if isinstance(string, str):
return string
return str(string)
def conv_multibyte(data):
"""
Convert to multibytes
"""
potential_sum = 0
num = len(data)
for i in range(num):
potential_sum += data[i] << (8 * (num - i - 1))
return potential_sum
def ordered(data):
"""
Lazy ordering
"""
if isinstance(data, int):
return data
return ord(data)
def convert_from_unix_time(timestamp: int) -> str:
"""
Convert UNIX time to a timestamp
"""
return datetime.datetime.fromtimestamp(timestamp).strftime('%Y-%m-%dT%H:%M:%S')
def trim_non_empty_string(key, value) -> str:
"""
Verify that a given value is a non-empty string
"""
if not isinstance(value, str) or not value.strip():
raise err.ProvidedValueError(f'{key} must be a non-empty string')
return value
def gen_bool_option(value) -> str:
"""
Helps convert a value to how dnf and other configs interpret a boolean config value.
This should accept bool, string, or int and will act accordingly.
"""
return '1' if value and value != '0' else '0'
def generate_password_hash(password: str, salt: str, hashtype: str = 'sha256') -> str:
"""
Generates a password hash with a given hash type and salt
"""
if hashtype in ('sha', 'sha1', 'md5'):
raise err.ProvidedValueError(f'{hashtype} is not allowed.')
hasher = hashlib.new(hashtype)
hasher.update((salt + password).encode('utf-8'))
return str(hasher.hexdigest())

72
util/processor.py Normal file
View File

@ -0,0 +1,72 @@
# -*-:python; coding:utf-8; -*-
# author: Louis Abel <label@rockylinux.org>
"""
Provides subprocess utilities
"""
import os
import sys
import subprocess
from pv2.util import error as err
# todo: remove python 3.6 checks. nodes won't be on el8.
def run_proc_foreground(command: list):
"""
Takes in the command in the form of a list and runs it via subprocess.
Everything should be in the foreground. The return is just for the exit
code.
"""
try:
processor = subprocess.run(args=command, check=False)
except Exception as exc:
raise err.GenericError(f'There was an error with your command: {exc}')
return processor
def run_proc_no_output(command: list):
"""
Output will be stored in stdout and stderr as needed.
"""
try:
if sys.version_info <= (3, 6):
processor = subprocess.run(args=command, check=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True)
else:
processor = subprocess.run(args=command, check=False, capture_output=True,
text=True)
except Exception as exc:
raise err.GenericError(f'There was an error with your command: {exc}')
return processor
def popen_proc_no_output(command: list):
"""
This opens a process, but is non-blocking.
"""
try:
if sys.version_info <= (3, 6):
processor = subprocess.Popen(args=command, stdout=subprocess.PIPE,
universal_newlines=True)
else:
# pylint: disable=consider-using-with
processor = subprocess.Popen(args=command, stdout=subprocess.PIPE,
text=True)
except Exception as exc:
raise err.GenericError(f'There was an error with your command: {exc}')
return processor
def run_check_call(command: list) -> int:
"""
Runs subprocess check_call and returns an integer.
"""
env = os.environ
try:
subprocess.check_call(command, env=env)
except subprocess.CalledProcessError as exc:
sys.stderr.write(f'Run failed: {exc}\n')
return 1
return 0

357
util/rpmutil.py Normal file
View File

@ -0,0 +1,357 @@
# -*- mode:python; coding:utf-8; -*-
# Louis Abel <label@rockylinux.org>
"""
Utility functions for RPM's
"""
import re
import stat
import lxml.etree
from pv2.util import error as err
from pv2.util import generic
from pv2.util import processor
from pv2.util.constants import RpmConstants as rpmconst
# We should have the python rpm modules. Forcing `rpm` to be none should make
# that known to the admin that they did not setup their node correctly.
try:
import rpm
except ImportError:
rpm = None
__all__ = [
'is_debug_package',
'get_rpm_header',
'get_rpm_metadata_from_hdr',
'compare_rpms',
'is_rpm',
'get_files_from_package',
'get_exclu_from_package',
'get_rpm_hdr_size',
'split_rpm_by_header',
'get_all_rpm_header_keys'
]
# NOTES TO THOSE RUNNING PYLINT OR ANOTHER TOOL
#
# It is normal that your linter will say that "rpm" does not have some sort of
# RPMTAG member or otherwise. You will find when you run this module in normal
# circumstances, everything is returned as normal. You are free to ignore all
# linting errors.
def is_debug_package(file_name: str) -> bool:
"""
Quick utility to state if a package is a debug package
file_name: str, package filename
Returns: bool
"""
file_name_search_rpm_res = re.search(r'.*?\.rpm$', file_name, re.IGNORECASE)
file_name_search_srpm_res = re.search(r'.*?\.src\.rpm$', file_name, re.IGNORECASE)
if not file_name_search_rpm_res:
return False
if file_name_search_srpm_res:
return False
return bool(re.search(r'-debug(info|source)', file_name))
def get_rpm_header(file_name: str):
"""
Gets RPM header metadata. This is a vital component to getting RPM
information for usage later.
Returns: dict
"""
if rpm is None:
raise err.GenericError("You must have the rpm python bindings installed")
trans_set = rpm.TransactionSet()
# this is harmless.
# pylint: disable=protected-access
trans_set.setVSFlags(rpm._RPMVSF_NOSIGNATURES | rpm._RPMVSF_NODIGESTS)
with open(file_name, 'rb') as rpm_package:
hdr = trans_set.hdrFromFdno(rpm_package)
return hdr
# pylint: disable=too-many-locals
def get_rpm_metadata_from_hdr(hdr) -> dict:
"""
Asks for RPM header information and generates some basic metadata in the
form of a dict.
Currently the metadata returns the following information, and their
potential use cases:
* changelog_xml -> Provides the changelog, which could be parsed and
placed on to a build's summary page
* files -> List of all files in the package. Obtained from
get_files_from_package
* obsoletes -> Packages that this obsoletes
* provides -> Packages that this provides
* conflicts -> Packages that this conflicts with
* requires -> Packages that this requires
* vendor -> Package vendor
* buildhost -> Which system/container built it
* filetime -> When the package was built
* description -> Package description
* license -> License of the packaged software
* nvr -> NVR, excluding epoch and arch. This can be used as a build package
name, similar to how koji displays a particular build. For
example, bash-5.2.15-3.fc38
* nevra -> Full NEVRA. Could be used as a filing mechanism in a
database and/or could be used to be part of a list of what
architecture this package may belong to for a particular
build.
* name -> Package name
* version -> Package version
* release -> Package release
* epoch -> Package epoch
* arch -> Package arch
* archivesize -> Size of the archive
* packagesize -> Size of the package
"""
changelog_result = ''
header_data = hdr
file_stuff = get_files_from_package(header_data)
exclu_stuff = get_exclu_from_package(header_data)
change_logs = zip(
# pylint: disable=no-member
header_data[rpm.RPMTAG_CHANGELOGNAME],
header_data[rpm.RPMTAG_CHANGELOGTIME],
header_data[rpm.RPMTAG_CHANGELOGTEXT]
)
for name, time, text in reversed(list(change_logs)):
# I need to come back and address this
# pylint: disable=c-extension-no-member
change = lxml.etree.Element(
'changelog',
author=generic.to_unicode(name),
date=generic.to_unicode(time)
)
change.text = generic.to_unicode(text)
changelog_result += generic.to_unicode(lxml.etree.tostring(change, pretty_print=True))
# Source RPM's can be built on any given architecture, regardless of where
# they'll be built. There are also cases where an RPM may report some other
# architecture that may be multilib or not native to the system checking
# the headers. As a result, the RPM header may return erroneous information if we
# are trying to look at the metadata of a source package. So this is a hack
# to determine if we are dealing with a source package.
# pylint: disable=no-member
source_files = header_data[rpm.RPMTAG_SOURCE]
source_pkg = header_data[rpm.RPMTAG_SOURCERPM]
pkg_arch = generic.to_unicode(header_data[rpm.RPMTAG_ARCH])
if len(source_files) != 0 or not source_pkg:
pkg_arch = 'src'
# The NEVRA exhibits the same issue.
found_nevra = header_data[rpm.RPMTAG_NEVR] + '.' + pkg_arch
# This avoids epoch being None or 'None' in the dict.
found_epoch = header_data[rpm.RPMTAG_EPOCH]
if not found_epoch:
found_epoch = ''
metadata = {
'changelog_xml': changelog_result,
'files': file_stuff['file'],
'obsoletes': header_data[rpm.RPMTAG_OBSOLETENEVRS],
'provides': header_data[rpm.RPMTAG_PROVIDENEVRS],
'conflicts': header_data[rpm.RPMTAG_CONFLICTNEVRS],
'requires': header_data[rpm.RPMTAG_REQUIRENEVRS],
'vendor': generic.to_unicode(header_data[rpm.RPMTAG_VENDOR]),
'buildhost': generic.to_unicode(header_data[rpm.RPMTAG_BUILDHOST]),
'filetime': int(header_data[rpm.RPMTAG_BUILDTIME]),
'description': generic.to_unicode(header_data[rpm.RPMTAG_DESCRIPTION]),
'license': generic.to_unicode(header_data[rpm.RPMTAG_LICENSE]),
'exclusivearch': exclu_stuff['ExclusiveArch'],
'excludearch': exclu_stuff['ExcludeArch'],
'nvr': generic.to_unicode(header_data[rpm.RPMTAG_NEVR]),
'nevra': found_nevra,
'name': generic.to_unicode(header_data[rpm.RPMTAG_NAME]),
'version': generic.to_unicode(header_data[rpm.RPMTAG_VERSION]),
'release': generic.to_unicode(header_data[rpm.RPMTAG_RELEASE]),
'epoch': found_epoch,
'arch': pkg_arch,
}
for key, rpmkey, in (('archivesize', rpm.RPMTAG_ARCHIVESIZE),
('packagesize', rpm.RPMTAG_SIZE)):
value = header_data[rpmkey]
if value is not None:
value = int(value)
metadata[key] = value
return metadata
def compare_rpms(first_pkg, second_pkg) -> int:
"""
Compares package versions. Both arguments must be a dict.
Returns an int.
1 = first version is greater
0 = versions are equal
-1 = second version is greater
"""
# pylint: disable=no-member
return rpm.labelCompare(
(first_pkg['epoch'], first_pkg['version'], first_pkg['release']),
(second_pkg['epoch'], second_pkg['version'], second_pkg['release'])
)
def is_rpm(file_name: str, magic: bool = False) -> bool:
"""
Checks if a file is an RPM
"""
file_name_search_res = re.search(r'.*?\.rpm$', file_name, re.IGNORECASE)
if magic:
with open(file_name, 'rb') as file:
block = file.read(4)
file.close()
return bool(block == rpmconst.RPM_HEADER_MAGIC) and bool(file_name_search_res)
return bool(file_name_search_res)
def get_files_from_package(hdr) -> dict:
"""
hdr should be the header of the package.
returns a dict
"""
cache = {}
# pylint: disable=no-member
files = hdr[rpm.RPMTAG_FILENAMES]
fileflags = hdr[rpm.RPMTAG_FILEFLAGS]
filemodes = hdr[rpm.RPMTAG_FILEMODES]
filetuple = list(zip(files, filemodes, fileflags))
returned_files = {}
for (filename, mode, flag) in filetuple:
if mode is None or mode == '':
if 'file' not in returned_files:
returned_files['file'] = []
returned_files['file'].append(generic.to_unicode(filename))
continue
if mode not in cache:
cache[mode] = stat.S_ISDIR(mode)
filekey = 'file'
if cache[mode]:
filekey = 'dir'
elif flag is not None and (flag & 64):
filekey = 'ghost'
returned_files.setdefault(filekey, []).append(generic.to_unicode(filename))
return returned_files
def get_exclu_from_package(hdr) -> dict:
"""
Gets exclusivearch and excludedarch from an RPM's header. This mainly
applies to source packages.
"""
# pylint: disable=no-member
excluded_arches = hdr[rpm.RPMTAG_EXCLUDEARCH]
exclusive_arches = hdr[rpm.RPMTAG_EXCLUSIVEARCH]
exclu = {
'ExcludeArch': excluded_arches,
'ExclusiveArch': exclusive_arches
}
return exclu
def get_rpm_hdr_size(file_name: str, offset: int = 0, padding: bool = False) -> int:
"""
Returns the length of the rpm header in bytes
Accepts only a file name.
"""
with open(file_name, 'rb') as file_outer:
if offset is not None:
file_outer.seek(offset, 0)
magic = file_outer.read(4)
if magic != rpmconst.RPM_HEADER_MAGIC:
raise err.GenericError(f"RPM error: bad magic: {magic}")
# Skips magic, plus end of reserve (4 bytes)
file_outer.seek(offset + 8, 0)
data = [generic.ordered(x) for x in file_outer.read(8)]
start_length = generic.conv_multibyte(data[0:4])
end_length = generic.conv_multibyte(data[4:8])
hdrsize = 8 + 16 * start_length + end_length
if padding:
# signature headers are padded to a multiple of 8 bytes
hdrsize = hdrsize + (8 - (hdrsize % 8)) % 8
hdrsize = hdrsize + 8
file_outer.close()
return hdrsize
def split_rpm_by_header(hdr) -> tuple:
"""
Attempts to split an RPM name into parts. Relies on the RPM header. May
result in failures.
Only use this if you need simplicity.
Note: A package without an epoch turns None. We turn an empty string
instead.
Note: Splitting a source package will result in an erroneous "arch" field.
"""
# pylint: disable=no-member
name = hdr[rpm.RPMTAG_NAME]
version = hdr[rpm.RPMTAG_VERSION]
release = hdr[rpm.RPMTAG_RELEASE]
epoch = hdr[rpm.RPMTAG_EPOCH]
arch = hdr[rpm.RPMTAG_ARCH]
if not epoch:
epoch = ''
return name, version, release, epoch, arch
def get_all_rpm_header_keys(hdr) -> dict:
"""
Gets all applicable header keys from an RPM.
"""
returner = {}
# pylint: disable=no-member
fields = [rpm.tagnames[k] for k in hdr.keys()]
for field in fields:
hdr_key = getattr(rpm, f'RPMTAG_{field}', None)
returner[field] = hdr_key
return returner
def quick_bump(file_name: str, user: str, comment: str):
"""
Does a quick bump of a spec file. For dev purposes only.
Loosely borrowed from sig core toolkit mangler
"""
bumprel = ['rpmdev-bumpspec', '-D', '-u', user, '-c', comment, file_name]
success = processor.run_check_call(bumprel)
return success

5
util/srpmproc.py Normal file
View File

@ -0,0 +1,5 @@
# -*-:python; coding:utf-8; -*-
# author: Louis Abel <label@rockylinux.org>
"""
srpmproc handler. this may end up not being used at all.
"""