mirror of
https://github.com/peridotbuild/pv2.git
synced 2024-12-22 10:48:31 +00:00
Add importutil module
Adds the importutil module that allows targetting a specific source RPM file and importing and tagging. It aims to keep the same structure of git.centos.org. Other changes: * constants.py: New constants added for git and rpm * error.py: New git and rpm error classes added * fileutil.py: * Add filter_files_inverse (matches everything but the filter) * Add get_magic_file (returns magic data from a file) * Add get_magic_content (returns magic data from data/content) * generic.py: Add safe_encoding to return a urlquote string * processor.py: * Add run_proc_foreground_shell to support shell calls * Add run_proc_no_output_shell to support shell calls * rpmutil.py: * get_rpm_header now supports verify_signature parameter (default false). If set to true and key is not available, raises exception. * Add verify_rpm_signature, which allows local rpm verification without ingesting the whole header into a usable object. * Add add_rpm_key, which enables a user to add a key to the rpm keyring.
This commit is contained in:
parent
e48a54db3a
commit
185d144567
@ -1,5 +1,7 @@
|
||||
# -*-:python; coding:utf-8; -*-
|
||||
# author: Louis Abel <label@rockylinux.org>
|
||||
"""
|
||||
srpmproc handler. this may end up not being used at all.
|
||||
Import module
|
||||
"""
|
||||
|
||||
from .operation import Import, SrpmImport
|
302
importer/operation.py
Normal file
302
importer/operation.py
Normal file
@ -0,0 +1,302 @@
|
||||
# -*-:python; coding:utf-8; -*-
|
||||
# author: Louis Abel <label@rockylinux.org>
|
||||
"""
|
||||
Importer accessories
|
||||
"""
|
||||
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
from pv2.util import gitutil, fileutil, rpmutil, processor, generic
|
||||
from pv2.util import error as err
|
||||
|
||||
__all__ = [
|
||||
'Import',
|
||||
'SrpmImport'
|
||||
]
|
||||
# todo: add in logging and replace print with log
|
||||
|
||||
class Import:
|
||||
"""
|
||||
Import an SRPM
|
||||
"""
|
||||
@staticmethod
|
||||
def remove_everything(local_repo_path):
|
||||
"""
|
||||
Removes all files from a repo. This is on purpose to ensure that an
|
||||
import is "clean"
|
||||
|
||||
Ignores .git and .gitignore
|
||||
"""
|
||||
file_list = fileutil.filter_files_inverse(local_repo_path, lambda file: '.git' in file)
|
||||
for file in file_list:
|
||||
if os.path.isfile(file) or os.path.islink(file):
|
||||
os.remove(file)
|
||||
elif os.path.isdir(file):
|
||||
shutil.rmtree(file)
|
||||
|
||||
@staticmethod
|
||||
def unpack_srpm(srpm_path, local_repo_path):
|
||||
"""
|
||||
Unpacks an srpm to the local repo path
|
||||
"""
|
||||
command_to_send = [
|
||||
'rpm',
|
||||
'-i',
|
||||
srpm_path,
|
||||
'--define',
|
||||
f"'%_topdir {local_repo_path}'"
|
||||
]
|
||||
command_to_send = ' '.join(command_to_send)
|
||||
processor.run_proc_no_output_shell(command_to_send)
|
||||
|
||||
@staticmethod
|
||||
def generate_metadata(repo_path: str, repo_name: str, file_dict: dict):
|
||||
"""
|
||||
Generates .repo.metadata file
|
||||
"""
|
||||
with open(f'{repo_path}/.{repo_name}.metadata', 'w+', encoding='utf-8') as meta:
|
||||
for name, sha in file_dict.items():
|
||||
meta.write(f'{sha} {name}\n')
|
||||
|
||||
meta.close()
|
||||
|
||||
@staticmethod
|
||||
def generate_filesum(repo_path: str, repo_name: str, srpm_hash: str):
|
||||
"""
|
||||
Generates the file that has the original sha256sum of the package this
|
||||
came from.
|
||||
"""
|
||||
with open(f'{repo_path}/.{repo_name}.checksum', 'w+', encoding='utf-8') as checksum:
|
||||
checksum.write(srpm_hash)
|
||||
checksum.close()
|
||||
|
||||
@staticmethod
|
||||
def get_dict_of_lookaside_files(local_repo_path):
|
||||
"""
|
||||
Returns a dict of files that are part of sources and are binary.
|
||||
"""
|
||||
source_dict = {}
|
||||
for file in os.scandir(f'{local_repo_path}/SOURCES'):
|
||||
full_path = f'{local_repo_path}/SOURCES/{file.name}'
|
||||
magic = fileutil.get_magic_file(full_path)
|
||||
if magic.encoding == 'binary':
|
||||
source_dict[f'SOURCES/{file.name}'] = fileutil.get_checksum(full_path)
|
||||
|
||||
return source_dict
|
||||
|
||||
@staticmethod
|
||||
def get_srpm_metadata(srpm_path, verify=False):
|
||||
"""
|
||||
Gets the rpm metadata
|
||||
"""
|
||||
hdr = rpmutil.get_rpm_header(file_name=srpm_path,
|
||||
verify_signature=verify)
|
||||
|
||||
metadata = rpmutil.get_rpm_metadata_from_hdr(hdr)
|
||||
return metadata
|
||||
|
||||
@staticmethod
|
||||
def import_lookaside(repo_path: str, repo_name: str, branch: str, file_dict: dict):
|
||||
"""
|
||||
Attempts to move the lookaside files if they don't exist to their
|
||||
hashed name.
|
||||
"""
|
||||
dest_dir = f'/var/www/html/sources/{repo_name}/{branch}'
|
||||
if not os.path.exists(dest_dir):
|
||||
os.makedirs(dest_dir, 0o755)
|
||||
for name, sha in file_dict.items():
|
||||
source_path = f'{repo_path}/{name}'
|
||||
dest_path = f'{dest_dir}/{sha}'
|
||||
if os.path.exists(dest_path):
|
||||
print(f'{dest_path} already exists, skipping')
|
||||
os.remove(source_path)
|
||||
else:
|
||||
shutil.move(src=source_path, dst=dest_path)
|
||||
|
||||
class SrpmImport(Import):
|
||||
"""
|
||||
Import class for importing rpms to a git service
|
||||
"""
|
||||
# pylint: disable=too-many-arguments
|
||||
def __init__(
|
||||
self,
|
||||
git_url_path: str,
|
||||
srpm_path: str,
|
||||
release: str = '',
|
||||
branch: str = '',
|
||||
git_user: str = 'git',
|
||||
org: str = 'rpms',
|
||||
verify_signature: bool = False
|
||||
):
|
||||
"""
|
||||
Init the class.
|
||||
|
||||
Set the org to something else if needed. Note that if you are using
|
||||
subgroups, do not start with a leading slash (e.g. some_group/rpms)
|
||||
"""
|
||||
self.__srpm_path = srpm_path
|
||||
self.__srpm_hash = fileutil.get_checksum(srpm_path)
|
||||
self.__srpm_metadata = self.get_srpm_metadata(srpm_path,
|
||||
verify_signature)
|
||||
self.__release = release
|
||||
|
||||
pkg_name = self.__srpm_metadata['name']
|
||||
git_url = f'ssh://{git_user}@{git_url_path}/{org}/{pkg_name}.git'
|
||||
self.__git_url = git_url
|
||||
|
||||
if len(release) == 0:
|
||||
self.__release = self.__get_srpm_release_version
|
||||
|
||||
if not self.__release:
|
||||
raise err.RpmInfoError('The dist tag does not contain elX or elXY')
|
||||
|
||||
self.__branch = branch
|
||||
if len(branch) == 0:
|
||||
self.__branch = f'c{release}'
|
||||
print(f'Warning: Branch name not specified, defaulting to {self.__branch}')
|
||||
|
||||
def __get_srpm_release_version(self):
|
||||
"""
|
||||
Gets the release version from the srpm
|
||||
"""
|
||||
regex = r'.el(\d+)'
|
||||
dist_tag = self.__srpm_metadata['release']
|
||||
regex_search = re.search(regex, dist_tag)
|
||||
if regex_search:
|
||||
return regex_search.group(1)
|
||||
|
||||
return None
|
||||
|
||||
def pkg_import(self):
|
||||
"""
|
||||
Actually perform the import
|
||||
"""
|
||||
check_repo = gitutil.lsremote(self.git_url)
|
||||
git_repo_path = f'/var/tmp/{self.rpm_name}'
|
||||
branch = self.__branch
|
||||
repo_tags = []
|
||||
# If we return None, we need to assume that this is a brand new repo,
|
||||
# so we will try to set it up accordingly. If we return refs, we'll see
|
||||
# if the branch we want to work with exists. If it does not exist,
|
||||
# we'll do a straight clone, and then create an orphan branch.
|
||||
if check_repo:
|
||||
# check for specific ref name
|
||||
ref_check = f'refs/heads/{branch}' in check_repo
|
||||
# if our check is correct, clone it. if not, clone normally and
|
||||
# orphan.
|
||||
if ref_check:
|
||||
repo = gitutil.clone(
|
||||
git_url_path=self.git_url,
|
||||
repo_name=self.rpm_name_replace,
|
||||
branch=branch
|
||||
)
|
||||
else:
|
||||
repo = gitutil.clone(
|
||||
git_url_path=self.git_url,
|
||||
repo_name=self.rpm_name_replace,
|
||||
branch=None
|
||||
)
|
||||
gitutil.checkout(repo, branch=self.__branch, orphan=True)
|
||||
# Remove everything, plain and simple. Only needed for clone.
|
||||
self.remove_everything(repo.working_dir)
|
||||
for tag_name in repo.tags:
|
||||
repo_tags.append(tag_name.name)
|
||||
else:
|
||||
print('Repo may not exist or is private. Try to import anyway.')
|
||||
repo = gitutil.init(
|
||||
git_url_path=self.git_url,
|
||||
repo_name=self.rpm_name,
|
||||
to_path=git_repo_path,
|
||||
branch=branch
|
||||
)
|
||||
|
||||
self.unpack_srpm(self.srpm_path, git_repo_path)
|
||||
sources = self.get_dict_of_lookaside_files(git_repo_path)
|
||||
self.generate_metadata(git_repo_path, self.rpm_name, sources)
|
||||
self.generate_filesum(git_repo_path, self.rpm_name, self.srpm_hash)
|
||||
self.import_lookaside(git_repo_path, self.rpm_name, branch, sources)
|
||||
gitutil.add_all(repo)
|
||||
# pylint: disable=line-too-long
|
||||
import_tag = generic.safe_encoding(f'imports/{branch}/{self.rpm_name}-{self.rpm_version}-{self.rpm_release}')
|
||||
commit_msg = f'import {self.rpm_name}-{self.rpm_version}-{self.rpm_release}'
|
||||
|
||||
# Raise an error if the tag already exists. Force the importer to tag
|
||||
# manually.
|
||||
if import_tag in repo_tags:
|
||||
raise err.GitCommitError(f'Git tag already exists: {import_tag}')
|
||||
|
||||
verify = repo.is_dirty()
|
||||
if verify:
|
||||
gitutil.commit(repo, commit_msg)
|
||||
ref = gitutil.tag(repo, import_tag, commit_msg)
|
||||
gitutil.push(repo, ref=ref)
|
||||
return True
|
||||
|
||||
# The most recent commit is assumed to be tagged also. We will not
|
||||
# push. Force the importer to tag manually.
|
||||
print('Nothing to push')
|
||||
return False
|
||||
|
||||
@property
|
||||
def git_url(self):
|
||||
"""
|
||||
Returns git_url
|
||||
"""
|
||||
return self.__git_url
|
||||
|
||||
@property
|
||||
def srpm_path(self):
|
||||
"""
|
||||
Returns srpm_path
|
||||
"""
|
||||
return self.__srpm_path
|
||||
|
||||
@property
|
||||
def srpm_hash(self):
|
||||
"""
|
||||
Returns the sha256sum of an unpacked srpm
|
||||
"""
|
||||
return self.__srpm_hash
|
||||
|
||||
@property
|
||||
def rpm_name(self):
|
||||
"""
|
||||
Returns name of srpm
|
||||
"""
|
||||
return self.__srpm_metadata['name']
|
||||
|
||||
@property
|
||||
def rpm_version(self):
|
||||
"""
|
||||
Returns version of srpm
|
||||
"""
|
||||
return self.__srpm_metadata['version']
|
||||
|
||||
@property
|
||||
def rpm_release(self):
|
||||
"""
|
||||
Returns release of srpm
|
||||
"""
|
||||
return self.__srpm_metadata['release']
|
||||
|
||||
@property
|
||||
def part_of_module(self):
|
||||
"""
|
||||
Returns if part of module
|
||||
"""
|
||||
regex = r'.+\.module\+'
|
||||
dist_tag = self.__srpm_metadata['release']
|
||||
regex_search = re.search(regex, dist_tag)
|
||||
if regex_search:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
@property
|
||||
def rpm_name_replace(self):
|
||||
"""
|
||||
Returns a "fixed" version of the RPM name
|
||||
"""
|
||||
new_name = self.__srpm_metadata['name'].replace('+', 'plus')
|
||||
return new_name
|
@ -5,9 +5,15 @@ Mock and mock accessories
|
||||
"""
|
||||
|
||||
# import all thingies here
|
||||
from .config import (DnfConfig, DnfRepoConfig, MockConfig, MockPluginConfig,
|
||||
MockBindMountPluginConfig, MockChrootFileConfig,
|
||||
MockChrootScanPluginConfig, MockMacroConfig,
|
||||
MockMacroFileConfig, MockShowrcPluginConfig)
|
||||
from .config import (DnfConfig,
|
||||
DnfRepoConfig,
|
||||
MockConfig,
|
||||
MockPluginConfig,
|
||||
MockBindMountPluginConfig,
|
||||
MockChrootFileConfig,
|
||||
MockChrootScanPluginConfig,
|
||||
MockMacroConfig,
|
||||
MockMacroFileConfig,
|
||||
MockShowrcPluginConfig)
|
||||
from .error import MockErrorParser
|
||||
from .runner import MockResult, MockRunner, MockErrorResulter
|
||||
|
@ -84,6 +84,17 @@ class ErrorConstants:
|
||||
MOCK_ERR_UNEXPECTED = 9198
|
||||
# Generic error
|
||||
MOCK_ERR_GENERIC = 9199
|
||||
# Git Generic Error
|
||||
GIT_ERR_GENERAL = 9300
|
||||
GIT_ERR_COMMIT = 9301
|
||||
GIT_ERR_PUSH = 9302
|
||||
GIT_ERR_INIT = 9303
|
||||
GIT_ERR_CHECKOUT = 9304
|
||||
|
||||
# RPM errors
|
||||
RPM_ERR_OPEN = 9400
|
||||
RPM_ERR_SIG = 9401
|
||||
RPM_ERR_INFO = 9402
|
||||
|
||||
# pylint: disable=too-few-public-methods
|
||||
class MockConstants:
|
||||
|
@ -116,3 +116,47 @@ class MockSignalReceivedError(MockGenericError):
|
||||
Mock had a SIG received
|
||||
"""
|
||||
fault_code = errconst.MOCK_ERR_BUILD_HUP
|
||||
|
||||
class GitCommitError(GenericError):
|
||||
"""
|
||||
There was an issue pushing to git
|
||||
"""
|
||||
fault_code = errconst.GIT_ERR_COMMIT
|
||||
|
||||
class GitPushError(GenericError):
|
||||
"""
|
||||
There was an issue pushing to git
|
||||
"""
|
||||
fault_code = errconst.GIT_ERR_PUSH
|
||||
|
||||
class GitInitError(GenericError):
|
||||
"""
|
||||
There was an issue pushing to git
|
||||
"""
|
||||
fault_code = errconst.GIT_ERR_INIT
|
||||
|
||||
class GitCheckoutError(GenericError):
|
||||
"""
|
||||
There was an issue pushing to git
|
||||
"""
|
||||
fault_code = errconst.GIT_ERR_CHECKOUT
|
||||
|
||||
class RpmOpenError(GenericError):
|
||||
"""
|
||||
There was an issue opening the RPM
|
||||
"""
|
||||
fault_code = errconst.RPM_ERR_OPEN
|
||||
|
||||
class RpmSigError(GenericError):
|
||||
"""
|
||||
There was an issue opening the RPM because the signature could not be
|
||||
verified
|
||||
"""
|
||||
fault_code = errconst.RPM_ERR_SIG
|
||||
|
||||
class RpmInfoError(GenericError):
|
||||
"""
|
||||
There was an issue opening the RPM because the signature could not be
|
||||
verified
|
||||
"""
|
||||
fault_code = errconst.RPM_ERR_INFO
|
||||
|
@ -4,24 +4,37 @@ File functions
|
||||
|
||||
import os
|
||||
import hashlib
|
||||
import magic
|
||||
from pv2.util import error as err
|
||||
|
||||
# File utilities
|
||||
__all__ = [
|
||||
'filter_files',
|
||||
'get_checksum'
|
||||
'filter_files_inverse',
|
||||
'get_checksum',
|
||||
'get_magic_file',
|
||||
'get_magic_content'
|
||||
]
|
||||
|
||||
def filter_files(directory_path: str, filter_filename: str) -> list:
|
||||
"""
|
||||
Filter out specified files
|
||||
"""
|
||||
# it's literally 101/100 ...
|
||||
# pylint: disable=line-too-long
|
||||
return_list = []
|
||||
for file in os.listdir(directory_path):
|
||||
if filter_filename(file):
|
||||
return_list.append(os.path.join(directory_path, file))
|
||||
for file in os.scandir(directory_path):
|
||||
if filter_filename(file.name):
|
||||
return_list.append(os.path.join(directory_path, file.name))
|
||||
|
||||
return return_list
|
||||
|
||||
def filter_files_inverse(directory_path: str, filter_filename: str) -> list:
|
||||
"""
|
||||
Filter out specified files (inverse)
|
||||
"""
|
||||
return_list = []
|
||||
for file in os.scandir(directory_path):
|
||||
if not filter_filename(file.name):
|
||||
return_list.append(os.path.join(directory_path, file.name))
|
||||
|
||||
return return_list
|
||||
|
||||
@ -53,3 +66,19 @@ def get_checksum(file_path: str, hashtype: str = 'sha256') -> str:
|
||||
return checksum.hexdigest()
|
||||
except IOError as exc:
|
||||
raise err.GenericError(f'Could not open or process file {file_path}: {exc})')
|
||||
|
||||
def get_magic_file(file_path: str):
|
||||
"""
|
||||
Returns the magic data from a file. Use this to get mimetype and other info
|
||||
you'd get by just running `file`
|
||||
"""
|
||||
detect = magic.detect_from_filename(file_path)
|
||||
return detect
|
||||
|
||||
def get_magic_content(data):
|
||||
"""
|
||||
Returns the magic data from content. Use this to get mimetype and other info
|
||||
you'd get by just running `file` on a file (but only pass read file data)
|
||||
"""
|
||||
detect = magic.detect_from_content(data)
|
||||
return detect
|
||||
|
@ -3,6 +3,7 @@ Generic functions
|
||||
"""
|
||||
import datetime
|
||||
import hashlib
|
||||
from urllib.parse import quote as urlquote
|
||||
from pv2.util import error as err
|
||||
|
||||
# General utilities
|
||||
@ -76,3 +77,12 @@ def generate_password_hash(password: str, salt: str, hashtype: str = 'sha256') -
|
||||
hasher = hashlib.new(hashtype)
|
||||
hasher.update((salt + password).encode('utf-8'))
|
||||
return str(hasher.hexdigest())
|
||||
|
||||
def safe_encoding(data: str) -> str:
|
||||
"""
|
||||
Does url quoting for safe encoding
|
||||
"""
|
||||
quoter = urlquote(data)
|
||||
# the urllib library currently doesn't reserve this
|
||||
quoter = quoter.replace('~', '%7e')
|
||||
return quoter
|
||||
|
156
util/gitutil.py
Normal file
156
util/gitutil.py
Normal file
@ -0,0 +1,156 @@
|
||||
# -*-:python; coding:utf-8; -*-
|
||||
# author: Louis Abel <label@rockylinux.org>
|
||||
"""
|
||||
Git Utilities and Accessories
|
||||
"""
|
||||
|
||||
import os
|
||||
import git as rawgit
|
||||
from git import Repo
|
||||
from git import exc as gitexc
|
||||
from pv2.util import error as err
|
||||
|
||||
__all__ = [
|
||||
'add_all',
|
||||
'clone',
|
||||
'commit',
|
||||
'init',
|
||||
'push',
|
||||
'tag',
|
||||
'lsremote'
|
||||
]
|
||||
|
||||
def add_all(repo):
|
||||
"""
|
||||
Add all files to repo
|
||||
"""
|
||||
try:
|
||||
repo.git.add(all=True)
|
||||
except Exception as exc:
|
||||
raise err.GitCommitError('Unable to add files') from exc
|
||||
|
||||
def checkout(repo, branch: str, orphan: bool = False):
|
||||
"""
|
||||
Checkout a branch for some reason or another
|
||||
|
||||
Only set orphan to true if this is a brand new branch that never existed
|
||||
and you want to avoid tracking from another branch.
|
||||
"""
|
||||
|
||||
# We are NOT using repo.heads.NAME.checkout() because it does not play
|
||||
# very well with branches that have dashes in the name
|
||||
try:
|
||||
if orphan:
|
||||
repo.git.checkout('--orphan', branch)
|
||||
else:
|
||||
repo.git.checkout(branch)
|
||||
except repo.git.exc.CheckoutError as exc:
|
||||
raise err.GitCheckoutError('Unable to checkout that branch.') from exc
|
||||
|
||||
def clone(
|
||||
git_url_path: str,
|
||||
repo_name: str,
|
||||
to_path: str = None,
|
||||
branch: str = None
|
||||
):
|
||||
"""
|
||||
clone a repo. if branch is None, it will just clone the repo in general and
|
||||
you'll be expected to checkout.
|
||||
"""
|
||||
if not to_path:
|
||||
clone_path = f'/var/tmp/{repo_name}'
|
||||
|
||||
try:
|
||||
repo = Repo.clone_from(
|
||||
url=git_url_path,
|
||||
to_path=clone_path,
|
||||
branch=branch
|
||||
)
|
||||
# pylint: disable=no-member
|
||||
except gitexc.CommandError as exc:
|
||||
raise err.GitInitError(f'Repo could not be cloned: {exc.stderr}') from exc
|
||||
|
||||
return repo
|
||||
|
||||
def commit(repo, message: str):
|
||||
"""
|
||||
create a commit message (no tag)
|
||||
"""
|
||||
try:
|
||||
repo.index.commit(message=message)
|
||||
# pylint: disable=no-member
|
||||
except gitexc.CommandError as exc:
|
||||
raise err.GitCommitError('Unable to create commit') from exc
|
||||
|
||||
def init(
|
||||
git_url_path: str,
|
||||
repo_name: str,
|
||||
to_path: str = None,
|
||||
branch: str = None
|
||||
):
|
||||
"""
|
||||
init a git repo
|
||||
"""
|
||||
path_way = to_path
|
||||
if not to_path:
|
||||
path_way = f'/var/tmp/{repo_name}'
|
||||
|
||||
if os.path.exists(path_way):
|
||||
raise err.GenericError(f'File or directory already exists: {path_way}')
|
||||
|
||||
try:
|
||||
repo = Repo.init(path_way, initial_branch=branch)
|
||||
repo.create_remote(
|
||||
name='origin',
|
||||
url=git_url_path
|
||||
)
|
||||
# pylint: disable=no-member
|
||||
except gitexc.CommandError as exc:
|
||||
raise err.GitInitError('Could not generate git repository') from exc
|
||||
|
||||
return repo
|
||||
|
||||
|
||||
def push(repo, ref=None):
|
||||
"""
|
||||
push what we want
|
||||
|
||||
if ref is not none (aka an object), we'll push the commit first and
|
||||
then the tag ref, this way the commits and tags are in sync.
|
||||
"""
|
||||
active_branch = f'{repo.active_branch.name}:{repo.active_branch.name}'
|
||||
try:
|
||||
if ref:
|
||||
repo.remote('origin').push(active_branch).raise_if_error()
|
||||
repo.remote('origin').push(ref).raise_if_error()
|
||||
else:
|
||||
repo.remote('origin').push(active_branch).raise_if_error()
|
||||
# pylint: disable=no-member
|
||||
except gitexc.CommandError as exc:
|
||||
raise err.GitPushError('Unable to push commit to remote') from exc
|
||||
|
||||
def tag(repo, tag_name:str, message: str):
|
||||
"""
|
||||
make a tag with message
|
||||
"""
|
||||
ref = repo.create_tag(tag_name, message=message)
|
||||
return ref
|
||||
|
||||
def lsremote(url):
|
||||
"""
|
||||
Helps check if a repo exists, and if it does, return references. If not,
|
||||
return None and assume it doesn't exist.
|
||||
"""
|
||||
remote_refs = {}
|
||||
git_cmd = rawgit.cmd.Git()
|
||||
try:
|
||||
git_cmd.ls_remote(url)
|
||||
# pylint: disable=no-member
|
||||
except gitexc.CommandError as exc:
|
||||
print(f'Repo does not exist or is not accessible: {exc.stderr}')
|
||||
return None
|
||||
|
||||
for ref in git_cmd.ls_remote(url).split('\n'):
|
||||
hash_ref_list = ref.split('\t')
|
||||
remote_refs[hash_ref_list[1]] = hash_ref_list[0]
|
||||
return remote_refs
|
@ -24,6 +24,19 @@ def run_proc_foreground(command: list):
|
||||
|
||||
return processor
|
||||
|
||||
def run_proc_foreground_shell(command: str):
|
||||
"""
|
||||
Takes in the command in the form of a list and runs it via subprocess.
|
||||
Everything should be in the foreground. The return is just for the exit
|
||||
code.
|
||||
"""
|
||||
try:
|
||||
processor = subprocess.run(args=command, shell=True, check=False)
|
||||
except Exception as exc:
|
||||
raise err.GenericError(f'There was an error with your command: {exc}')
|
||||
|
||||
return processor
|
||||
|
||||
def run_proc_no_output(command: list):
|
||||
"""
|
||||
Output will be stored in stdout and stderr as needed.
|
||||
@ -42,6 +55,26 @@ def run_proc_no_output(command: list):
|
||||
|
||||
return processor
|
||||
|
||||
def run_proc_no_output_shell(command: str):
|
||||
"""
|
||||
Output will be stored in stdout and stderr as needed.
|
||||
"""
|
||||
try:
|
||||
if sys.version_info <= (3, 6):
|
||||
processor = subprocess.run(args=command, check=False,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
universal_newlines=True,
|
||||
shell=True)
|
||||
else:
|
||||
processor = subprocess.run(args=command, check=False, capture_output=True,
|
||||
text=True, shell=True)
|
||||
except Exception as exc:
|
||||
raise err.GenericError(f'There was an error with your command: {exc}')
|
||||
|
||||
return processor
|
||||
|
||||
|
||||
def popen_proc_no_output(command: list):
|
||||
"""
|
||||
This opens a process, but is non-blocking.
|
||||
|
@ -28,14 +28,16 @@ __all__ = [
|
||||
'get_exclu_from_package',
|
||||
'get_rpm_hdr_size',
|
||||
'split_rpm_by_header',
|
||||
'get_all_rpm_header_keys'
|
||||
'get_all_rpm_header_keys',
|
||||
'verify_rpm_signature',
|
||||
'add_rpm_key'
|
||||
]
|
||||
|
||||
# NOTES TO THOSE RUNNING PYLINT OR ANOTHER TOOL
|
||||
#
|
||||
# It is normal that your linter will say that "rpm" does not have some sort of
|
||||
# RPMTAG member or otherwise. You will find when you run this module in normal
|
||||
# circumstances, everything is returned as normal. You are free to ignore all
|
||||
# circumstances, everything is returned as normal. You are free to ignore those
|
||||
# linting errors.
|
||||
|
||||
def is_debug_package(file_name: str) -> bool:
|
||||
@ -57,7 +59,7 @@ def is_debug_package(file_name: str) -> bool:
|
||||
|
||||
return bool(re.search(r'-debug(info|source)', file_name))
|
||||
|
||||
def get_rpm_header(file_name: str):
|
||||
def get_rpm_header(file_name: str, verify_signature: bool = False):
|
||||
"""
|
||||
Gets RPM header metadata. This is a vital component to getting RPM
|
||||
information for usage later.
|
||||
@ -69,11 +71,18 @@ def get_rpm_header(file_name: str):
|
||||
raise err.GenericError("You must have the rpm python bindings installed")
|
||||
|
||||
trans_set = rpm.TransactionSet()
|
||||
# this is harmless.
|
||||
# pylint: disable=protected-access
|
||||
trans_set.setVSFlags(rpm._RPMVSF_NOSIGNATURES | rpm._RPMVSF_NODIGESTS)
|
||||
if not verify_signature:
|
||||
# this is harmless.
|
||||
# pylint: disable=protected-access
|
||||
trans_set.setVSFlags(rpm._RPMVSF_NOSIGNATURES | rpm._RPMVSF_NODIGESTS)
|
||||
|
||||
with open(file_name, 'rb') as rpm_package:
|
||||
hdr = trans_set.hdrFromFdno(rpm_package)
|
||||
try:
|
||||
hdr = trans_set.hdrFromFdno(rpm_package)
|
||||
# pylint: disable=no-member
|
||||
except rpm.error as exc:
|
||||
print(exc)
|
||||
raise err.RpmOpenError('RPM could not be opened: Public key is not available.')
|
||||
return hdr
|
||||
|
||||
# pylint: disable=too-many-locals
|
||||
@ -196,6 +205,7 @@ def get_rpm_metadata_from_hdr(hdr) -> dict:
|
||||
'release': generic.to_unicode(header_data[rpm.RPMTAG_RELEASE]),
|
||||
'epoch': found_epoch,
|
||||
'arch': pkg_arch,
|
||||
'signature': header_data[rpm.RPMTAG_RSAHEADER],
|
||||
}
|
||||
for key, rpmkey, in (('archivesize', rpm.RPMTAG_ARCHIVESIZE),
|
||||
('packagesize', rpm.RPMTAG_SIZE)):
|
||||
@ -355,3 +365,34 @@ def quick_bump(file_name: str, user: str, comment: str):
|
||||
bumprel = ['rpmdev-bumpspec', '-D', '-u', user, '-c', comment, file_name]
|
||||
success = processor.run_check_call(bumprel)
|
||||
return success
|
||||
|
||||
def verify_rpm_signature(file_name: str) -> bool:
|
||||
"""
|
||||
Returns a boolean on if the RPM signature can be verified by what is
|
||||
currently imported into the RPM keyring.
|
||||
"""
|
||||
trans_set = rpm.TransactionSet()
|
||||
with open(file_name, 'rb') as rpm_package:
|
||||
try:
|
||||
trans_set.hdrFromFdno(rpm_package)
|
||||
# pylint: disable=bare-except
|
||||
except:
|
||||
return False
|
||||
return True
|
||||
|
||||
def add_rpm_key(file_name: str):
|
||||
"""
|
||||
Adds a RPM signing signature to the keyring
|
||||
"""
|
||||
with open(file_name, 'rb') as key:
|
||||
keydata = key.read()
|
||||
keydata.close()
|
||||
|
||||
try:
|
||||
# pylint: disable=no-member
|
||||
pubkey = rpm.pubkey(keydata)
|
||||
keyring = rpm.keyring()
|
||||
keyring.addKey(pubkey)
|
||||
# pylint: disable=no-member
|
||||
except rpm.error as exc:
|
||||
raise err.RpmSigError(f'Unable to import signature: {exc}')
|
||||
|
Loading…
Reference in New Issue
Block a user