toolkit/iso/empanadas/empanadas/util/shared.py

# These are shared utilities used throughout empanadas
import os
import json
import hashlib
import shlex
import subprocess
import yaml
import requests
import boto3
import xmltodict
import productmd.treeinfo
import productmd.composeinfo
import empanadas
import kobo.shortcuts
from empanadas.common import Color
class ArchCheck:
"""
Arches and their files
"""
archfile = {
'x86_64': [
'isolinux/vmlinuz',
'images/grub.conf',
'EFI/BOOT/BOOTX64.EFI'
],
'aarch64': [
'EFI/BOOT/BOOTAA64.EFI'
],
'ppc64le': [
'ppc/bootinfo.txt',
'ppc/ppc64/vmlinuz'
],
's390x': [
'generic.ins',
'images/generic.prm'
]
}
class Shared:
"""
Quick utilities that may be commonly used
"""
@staticmethod
def get_checksum(path, hashtype, logger):
"""
Generates a checksum for the file at the provided path by reading it in
chunks, so the whole file is never loaded into memory at once.
"""
try:
checksum = hashlib.new(hashtype)
except ValueError:
logger.error("Invalid hash type: %s" % hashtype)
return False
try:
input_file = open(path, "rb")
except IOError as e:
logger.error("Could not open file %s: %s" % (path, e))
return False
while True:
chunk = input_file.read(8192)
if not chunk:
break
checksum.update(chunk)
input_file.close()
stat = os.stat(path)
base = os.path.basename(path)
# This emulates our current syncing scripts, which run stat and sha256sum
# and produce a very specific output format.
return "%s: %s bytes\n%s (%s) = %s\n" % (
base,
stat.st_size,
hashtype.upper(),
base,
checksum.hexdigest()
)
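# Usage sketch (hypothetical path and logger):
#
#   import logging
#   logger = logging.getLogger(__name__)
#   line = Shared.get_checksum('/tmp/example.iso', 'sha256', logger)
#   if line:
#       with open('/tmp/example.iso.CHECKSUM', 'w+') as f:
#           f.write(line)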
@staticmethod
def treeinfo_new_write(
file_path,
distname,
shortname,
release,
arch,
time,
repo
):
"""
Writes very basic treeinfo for a single repository. This is usually
called on a fresh run, where each repo needs one. The basic info may be
overwritten later, either by lorax data or by a full run.
"""
ti = productmd.treeinfo.TreeInfo()
ti.release.name = distname
ti.release.short = shortname
ti.release.version = release
ti.tree.arch = arch
ti.tree.build_timestamp = time
# Variants (aka repos)
variant = productmd.treeinfo.Variant(ti)
variant.id = repo
variant.uid = repo
variant.name = repo
variant.type = "variant"
variant.paths.repository = "."
variant.paths.packages = "Packages"
ti.variants.add(variant)
ti.dump(file_path)
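# Illustrative call (all values hypothetical); writes a minimal .treeinfo
# for a single repo:
#
#   Shared.treeinfo_new_write(
#       '/mnt/repos-staging/BaseOS/x86_64/os/.treeinfo',
#       'Rocky Linux', 'Rocky', '8.6', 'x86_64', 1657000000, 'BaseOS'
#   )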
@staticmethod
def treeinfo_modify_write(data, imagemap, logger):
"""
Modifies a specific treeinfo with already available data. This is used
when modifying treeinfo for primary repos or images.
"""
arch = data['arch']
variant = data['variant']
variant_path = data['variant_path']
checksum = data['checksum']
distname = data['distname']
fullname = data['fullname']
shortname = data['shortname']
release = data['release']
timestamp = data['timestamp']
os_or_ks = ''
if '/os' in variant_path or not imagemap['disc']:
os_or_ks = 'os'
if '/kickstart' in variant_path:
os_or_ks = 'kickstart'
image = os.path.join(variant_path)
treeinfo = os.path.join(image, '.treeinfo')
discinfo = os.path.join(image, '.discinfo')
mediarepo = os.path.join(image, 'media.repo')
#imagemap = self.iso_map['images'][variant]
primary = imagemap['variant']
repos = imagemap['repos']
is_disc = False
if imagemap['disc']:
is_disc = True
discnum = 1
# load up productmd
ti = productmd.treeinfo.TreeInfo()
ti.load(treeinfo)
# Set the name
ti.release.name = distname
ti.release.short = shortname
# Set the version (the initial lorax run does this, but we are setting
# it just in case)
ti.release.version = release
# Copy the current images into a temporary variable, then clear each
# platform's dictionary. Re-add each item and path from the copy, skipping
# empty paths (and boot.iso when building a disc), and record checksums
# afterwards.
plats = ti.images.images.copy()
for platform in ti.images.images:
ti.images.images[platform] = {}
for i, p in plats[platform].items():
if not p:
continue
if 'boot.iso' in i and is_disc:
continue
ti.images.images[platform][i] = p
ti.checksums.add(p, checksum, root_dir=image)
# stage2 checksums
if ti.stage2.mainimage:
ti.checksums.add(ti.stage2.mainimage, checksum, root_dir=image)
if ti.stage2.instimage:
ti.checksums.add(ti.stage2.instimage, checksum, root_dir=image)
# If we are a disc, set the media section appropriately.
if is_disc:
ti.media.discnum = discnum
ti.media.totaldiscs = discnum
# Create variants
# Note to self: There's a lot of legacy stuff running around for
# Fedora, ELN, and RHEL in general. This is the general structure,
# apparently, but there is a chance it will change. We may need to put in
# a configuration option to deal with it at some point.
#ti.variants.variants.clear()
for y in repos:
if y in ti.variants.variants.keys():
vari = ti.variants.variants[y]
else:
vari = productmd.treeinfo.Variant(ti)
vari.id = y
vari.uid = y
vari.name = y
vari.type = "variant"
if is_disc:
vari.paths.repository = y
vari.paths.packages = y + "/Packages"
else:
if y == primary:
vari.paths.repository = "."
vari.paths.packages = "Packages"
else:
vari.paths.repository = "../../../" + y + "/" + arch + "/" + os_or_ks
vari.paths.packages = "../../../" + y + "/" + arch + "/" + os_or_ks + "/Packages"
if y not in ti.variants.variants.keys():
ti.variants.add(vari)
del vari
# Set default variant
logger.info('Writing treeinfo')
ti.dump(treeinfo, main_variant=primary)
# Set discinfo
logger.info('Writing discinfo')
Shared.discinfo_write(timestamp, fullname, arch, discinfo)
# Set media.repo
logger.info('Writing media.repo')
Shared.media_repo_write(timestamp, fullname, mediarepo)
@staticmethod
def write_metadata(
timestamp,
datestamp,
fullname,
release,
compose_id,
file_path
):
metadata = {
"header": {
"name": "empanadas",
"version": empanadas.__version__,
"type": "toolkit",
"maintainer": "SIG/Core"
},
"payload": {
"compose": {
"date": datestamp,
"id": compose_id,
"fullname": fullname,
"release": release,
"timestamp": timestamp
}
}
}
with open(file_path + ".json", "w+") as fp:
json.dump(metadata, fp, indent=4)
fp.close()
with open(file_path + ".yaml", "w+") as yp:
yaml.dump(metadata, yp)
yp.close()
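# Example invocation (values hypothetical); writes file_path + ".json" and
# file_path + ".yaml" side by side:
#
#   Shared.write_metadata(
#       1657000000, '20220711', 'Rocky Linux 8.6', '8.6',
#       'Rocky-8.6-20220711', '/mnt/compose/metadata/metadata'
#   )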
@staticmethod
def discinfo_write(timestamp, fullname, arch, file_path):
"""
Ensure discinfo is written correctly
"""
data = [
"%s" % timestamp,
"%s" % fullname,
"%s" % arch,
"ALL",
""
]
with open(file_path, "w+") as f:
f.write("\n".join(data))
f.close()
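# For reference, the resulting .discinfo is four lines plus a trailing
# newline, e.g. (hypothetical values):
#
#   1657000000
#   Rocky Linux 8.6
#   x86_64
#   ALL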
@staticmethod
def media_repo_write(timestamp, fullname, file_path):
"""
Ensure media.repo exists
"""
data = [
"[InstallMedia]",
"name=%s" % fullname,
"mediaid=%s" % timestamp,
"metadata_expire=-1",
"gpgcheck=0",
"cost=500",
"",
]
with open(file_path, "w") as f:
f.write("\n".join(data))
@staticmethod
def generate_compose_dirs(
compose_base,
shortname,
version,
date_stamp,
logger
) -> str:
"""
Generate compose dirs for full runs
"""
compose_base_dir = os.path.join(
compose_base,
"{}-{}-{}".format(
shortname,
version,
date_stamp
)
)
logger.info('Creating compose directory %s' % compose_base_dir)
if not os.path.exists(compose_base_dir):
os.makedirs(compose_base_dir)
os.makedirs(compose_base_dir + '/work')
os.makedirs(compose_base_dir + '/work/entries')
os.makedirs(compose_base_dir + '/work/logs')
os.makedirs(compose_base_dir + '/compose')
return compose_base_dir
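# With, say, shortname='Rocky', version='8.6', date_stamp='20220711'
# (hypothetical values), the directories created under compose_base are:
#
#   Rocky-8.6-20220711/work
#   Rocky-8.6-20220711/work/entries
#   Rocky-8.6-20220711/work/logs
#   Rocky-8.6-20220711/compose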
@staticmethod
def podman_cmd(logger) -> str:
"""
This returns the path to the podman command. It is used when we want to
run reposyncs in parallel, since we cannot reasonably run multiple
instances of dnf reposync directly on a single system.
"""
cmd = None
if os.path.exists("/usr/bin/podman"):
cmd = "/usr/bin/podman"
else:
logger.error(Color.FAIL + '/usr/bin/podman was not found. Good bye.')
raise SystemExit("\n\n/usr/bin/podman was not found.\n\nPlease "
" ensure that you have installed the necessary packages on "
" this system. " + Color.BOLD + "Note that docker is not "
"supported." + Color.END
)
return cmd
@staticmethod
def reposync_cmd(logger) -> str:
"""
This generates the reposync command. We do not support plain reposync on
its own and will raise an error if dnf is not available.
:return: The reposync command. If dnf exists, we'll use
"dnf reposync". Otherwise, fail immediately.
"""
cmd = None
if os.path.exists("/usr/bin/dnf"):
cmd = "/usr/bin/dnf reposync"
else:
logger.error(Color.FAIL + '/usr/bin/dnf was not found. Good bye.')
raise SystemExit("/usr/bin/dnf was not found. \n\n/usr/bin/reposync "
"is not sufficient and you are likely running on an el7 "
"system or a grossly modified EL8+ system, " + Color.BOLD +
"which tells us that you probably made changes to these tools "
"expecting them to work and got to this point." + Color.END)
return cmd
@staticmethod
def git_cmd(logger) -> str:
"""
This generates the git command. It is used when we need to pull down extra
files or do work from a git repository.
"""
cmd = None
if os.path.exists("/usr/bin/git"):
cmd = "/usr/bin/git"
else:
logger.error(Color.FAIL + '/usr/bin/git was not found. Good bye.')
raise SystemExit("\n\n/usr/bin/git was not found.\n\nPlease "
" ensure that you have installed the necessary packages on "
" this system. "
)
return cmd
@staticmethod
def mock_cmd(logger) -> str:
"""
This generates the mock command. It is used when we are building or
performing any kind of operation in mock.
"""
cmd = None
if os.path.exists("/usr/bin/mock"):
cmd = "/usr/bin/mock"
else:
logger.error(Color.FAIL + '/usr/bin/mock was not found. Good bye.')
raise SystemExit("\n\n/usr/bin/mock was not found.\n\nPlease "
" ensure that you have installed the necessary packages on "
" this system. "
)
return cmd
@staticmethod
def generate_conf(
shortname,
major_version,
repos,
repo_base_url,
project_id,
hashed,
extra_files,
gpgkey,
templates,
logger,
dest_path='/var/tmp'
) -> str:
"""
Generates the necessary repo conf file for the operation. This repo file
is intended to be temporary. By default it contains every configured repo;
if a single repo is chosen for sync, that will be the only one synced.
:param dest_path: The destination where the temporary conf goes
:param repos: The repos to create entries for
"""
fname = os.path.join(
dest_path,
"{}-{}-config.repo".format(shortname, major_version)
)
logger.info('Generating the repo configuration: %s' % fname)
if repo_base_url.startswith("/"):
logger.error("Local file syncs are not supported.")
raise SystemExit(Color.BOLD + "Local file syncs are not "
"supported." + Color.END)
prehashed = ''
if hashed:
prehashed = "hashed-"
# create dest_path
if not os.path.exists(dest_path):
os.makedirs(dest_path, exist_ok=True)
config_file = open(fname, "w+")
repolist = []
for repo in repos:
constructed_url = '{}/{}/repo/{}{}/$basearch'.format(
repo_base_url,
project_id,
prehashed,
repo,
)
constructed_url_src = '{}/{}/repo/{}{}/src'.format(
repo_base_url,
project_id,
prehashed,
repo,
)
repodata = {
'name': repo,
'baseurl': constructed_url,
'srcbaseurl': constructed_url_src,
'gpgkey': extra_files['git_raw_path'] + extra_files['gpg'][gpgkey]
}
repolist.append(repodata)
template = templates.get_template('repoconfig.tmpl')
output = template.render(repos=repolist)
config_file.write(output)
config_file.close()
return fname
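# Each entry rendered into the repo file points at URLs of this shape
# (components hypothetical):
#
#   baseurl:    <repo_base_url>/<project_id>/repo/hashed-BaseOS/$basearch
#   srcbaseurl: <repo_base_url>/<project_id>/repo/hashed-BaseOS/src
#
# The "hashed-" prefix is only present when hashed=True.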
@staticmethod
def quick_sync(src, dest, logger, tmp_dir):
"""
Does a quick sync from one place to another. This determines which method
will be used: we look for fpsync first and fall back to parallel + rsync
if that is available. It will fail if parallel is not available.
Return true or false on completion?
"""
@staticmethod
def simple_sync(src, dest):
"""
This is for simple syncs only, using rsync or copytree.
"""
@staticmethod
def fpsync_method(src, dest, tmp_dir):
"""
Runs fpsync from src to dest and returns a (message, return code) tuple.
"""
cmd = '/usr/bin/fpsync'
rsync_switches = '-av --numeric-ids --no-compress --chown=10004:10005'
if not os.path.exists(cmd):
message = 'fpsync not found'
retval = 1
return message, retval
os.makedirs(tmp_dir, exist_ok=True)
fpsync_cmd = '{} -o "{}" -n 18 -t {} {} {}'.format(
cmd,
rsync_switches,
tmp_dir,
src,
dest
)
process = subprocess.call(
shlex.split(fpsync_cmd),
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
if process != 0:
message = 'Syncing (fpsync) failed'
retval = process
return message, retval
if os.path.exists(dest):
message = 'Syncing (fpsync) succeeded'
retval = process
else:
message = 'Path synced does not seem to exist for some reason.'
retval = 1
#shutil.rmtree(tmp_dir)
return message, retval
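# The command assembled above looks roughly like this (paths are examples):
#
#   /usr/bin/fpsync -o "-av --numeric-ids --no-compress --chown=10004:10005" \
#       -n 18 -t /var/tmp/fpsync /mnt/source/ /mnt/destination/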
@staticmethod
def rsync_method(src, dest):
"""
Intended to return a string for the rsync command plus parallel. Yes,
this is a hack, and it is currently not implemented.
"""
find_cmd = '/usr/bin/find'
parallel_cmd = '/usr/bin/parallel'
rsync_cmd = '/usr/bin/rsync'
switches = '-av --chown=10004:10005 --progress --relative --human-readable'
os.makedirs(dest, exist_ok=True)
return 'Not available', 1
@staticmethod
def s3_determine_latest(s3_bucket, release, arches, filetype, name, logger):
"""
Using native s3, determine the latest artifacts and return a dict
"""
temp = []
data = {}
s3 = boto3.client('s3')
try:
s3.list_objects(Bucket=s3_bucket)['Contents']
except Exception:
logger.error(
'[' + Color.BOLD + Color.RED + 'FAIL' + Color.END + '] ' +
'Cannot access s3 bucket.'
)
raise SystemExit()
for y in s3.list_objects(Bucket=s3_bucket)['Contents']:
if filetype in y['Key'] and release in y['Key'] and name in y['Key']:
temp.append(y['Key'])
for arch in arches:
temps = []
for y in temp:
if arch in y:
temps.append(y)
temps.sort(reverse=True)
if len(temps) > 0:
data[arch] = temps[0]
return data
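# Rough usage sketch (bucket name and filters are hypothetical; boto3 must
# be able to discover AWS credentials):
#
#   latest = Shared.s3_determine_latest(
#       'example-bucket', '8.6', ['x86_64', 'aarch64'], 'tar.xz', 'Base', logger
#   )
#   # -> {'x86_64': '<newest matching key>', 'aarch64': '<newest matching key>'}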
@staticmethod
def s3_download_artifacts(force_download, s3_bucket, source, dest, logger):
"""
Download the requested artifact(s) via s3
"""
s3 = boto3.client('s3')
if os.path.exists(dest):
if not force_download:
logger.warning(
'[' + Color.BOLD + Color.YELLOW + 'WARN' + Color.END + '] ' +
'Artifact at ' + dest + ' already exists'
)
return
logger.info('Downloading ({}) to: {}'.format(source, dest))
try:
s3.download_file(
Bucket=s3_bucket,
Key=source,
Filename=dest
)
except Exception:
logger.error('There was an issue downloading from %s' % s3_bucket)
@staticmethod
def reqs_determine_latest(s3_bucket_url, release, arches, filetype, name, logger):
"""
Using requests, determine the latest artifacts and return a dict
"""
temp = []
data = {}
try:
bucket_data = requests.get(s3_bucket_url)
except requests.exceptions.RequestException as e:
logger.error('The s3 bucket http endpoint is inaccessible')
raise SystemExit(e)
resp = xmltodict.parse(bucket_data.content)
for y in resp['ListBucketResult']['Contents']:
if filetype in y['Key'] and release in y['Key'] and name in y['Key']:
temp.append(y['Key'])
for arch in arches:
temps = []
for y in temp:
if arch in y:
temps.append(y)
temps.sort(reverse=True)
if len(temps) > 0:
data[arch] = temps[0]
return data
@staticmethod
def reqs_download_artifacts(force_download, s3_bucket_url, source, dest, logger):
"""
Download the requested artifact(s) via requests only
"""
if os.path.exists(dest):
if not force_download:
logger.warning(
'[' + Color.BOLD + Color.YELLOW + 'WARN' + Color.END + '] ' +
'Artifact at ' + dest + ' already exists'
)
return
unurl = s3_bucket_url + '/' + source
logger.info('Downloading ({}) to: {}'.format(source, dest))
try:
with requests.get(unurl, allow_redirects=True) as r:
with open(dest, 'wb') as f:
f.write(r.content)
f.close()
r.close()
except requests.exceptions.RequestException as e:
logger.error('There was a problem downloading the artifact')
raise SystemExit(e)
# ISO related
@staticmethod
def get_boot_options(arch, createfrom, efi=True, hfs_compat=False):
"""
Gets boot options based on architecture, since the ISO commands are not
universal.
"""
if arch in ("armhfp",):
result = []
return result
if arch in ("aarch64",):
result = [
"-eltorito-alt-boot",
"-e",
"images/efiboot.img",
"-no-emul-boot",
]
return result
if arch in ("i386", "i686", "x86_64"):
result = [
"-b",
"isolinux/isolinux.bin",
"-c",
"isolinux/boot.cat",
"-no-emul-boot",
"-boot-load-size",
"4",
"-boot-info-table",
]
# EFI args
if arch == "x86_64":
result.extend(
[
"-eltorito-alt-boot",
"-e",
"images/efiboot.img",
"-no-emul-boot"
]
)
return result
# need to go double check if this is needed with stream 9
if arch == "ppc64le" and hfs_compat:
result = [
"-part",
"-hfs",
"-r",
"-l",
"-sysid",
"PPC",
"-no-desktop",
"-allow-multidot",
"-chrp-boot",
"-map",
os.path.join(createfrom, "mapping"),
"-hfs-bless",
"/ppc/mac"
]
return result
if arch == "ppc64le" and not hfs_compat:
result = [
"-r",
"-l",
"-sysid",
"PPC",
"-chrp-boot",
]
return result
if arch in ("s390x",):
result = [
"-eltorito-boot",
"images/cdboot.img",
"-no-emul-boot",
]
return result
raise ValueError("Architecture %s%s%s is NOT known" % (Color.BOLD, arch, Color.END))
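# For example, x86_64 yields the El Torito BIOS entry followed by the EFI
# alternate boot entry:
#
#   ['-b', 'isolinux/isolinux.bin', '-c', 'isolinux/boot.cat', '-no-emul-boot',
#    '-boot-load-size', '4', '-boot-info-table',
#    '-eltorito-alt-boot', '-e', 'images/efiboot.img', '-no-emul-boot']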
@staticmethod
def get_mkisofs_cmd(
iso,
appid=None,
volid=None,
volset=None,
exclude=None,
boot_args=None,
input_charset="utf-8",
grafts=None,
use_xorrisofs=False,
iso_level=None,
):
# I should hardcode this I think
#untranslated_filenames = True
translation_table = True
#joliet = True
#joliet_long = True
#rock = True
cmd = ["/usr/bin/xorrisofs" if use_xorrisofs else "/usr/bin/genisoimage"]
if not os.path.exists(cmd[0]):
#logger.error('%s was not found. Good bye.' % cmd[0])
raise SystemExit("\n\n" + cmd[0] + " was not found.\n\nPlease "
" ensure that you have installed the necessary packages on "
" this system. "
)
if iso_level:
cmd.extend(["-iso-level", str(iso_level)])
if appid:
cmd.extend(["-appid", appid])
#if untranslated_filenames:
cmd.append("-untranslated-filenames")
if volid:
cmd.extend(["-volid", volid])
#if joliet:
cmd.append("-J")
#if joliet_long:
cmd.append("-joliet-long")
if volset:
cmd.extend(["-volset", volset])
#if rock:
cmd.append("-rational-rock")
if not use_xorrisofs and translation_table:
cmd.append("-translation-table")
if input_charset:
cmd.extend(["-input-charset", input_charset])
if exclude:
for i in kobo.shortcuts.force_list(exclude):
cmd.extend(["-x", i])
if boot_args:
cmd.extend(boot_args)
cmd.extend(["-o", iso])
if grafts:
cmd.append("-graft-points")
cmd.extend(["-path-list", grafts])
return cmd
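# Joined with spaces by the caller, a genisoimage invocation ends up roughly
# like this (volid and paths are hypothetical; no appid or iso_level given):
#
#   /usr/bin/genisoimage -untranslated-filenames -volid Rocky-8-6-x86_64-dvd \
#       -J -joliet-long -rational-rock -translation-table -input-charset utf-8 \
#       -x ./lost+found <boot args> -o Rocky-8.6-x86_64-dvd1.iso \
#       -graft-points -path-list /path/to/graft-points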
@staticmethod
def get_make_image_cmd(opts, hfs_compat):
"""
Generates the command that actually makes the ISO image.
"""
isokwargs = {}
isokwargs["boot_args"] = Shared.get_boot_options(
opts['arch'],
os.path.join("$TEMPLATE", "config_files/ppc"),
hfs_compat=hfs_compat,
)
if opts['arch'] in ("ppc64", "ppc64le"):
isokwargs["input_charset"] = None
if opts['use_xorrisofs']:
cmd = ['/usr/bin/xorriso', '-dialog', 'on', '<', opts['graft_points']]
else:
cmd = Shared.get_mkisofs_cmd(
opts['iso_name'],
volid=opts['volid'],
exclude=["./lost+found"],
grafts=opts['graft_points'],
use_xorrisofs=False,
iso_level=opts['iso_level'],
**isokwargs
)
returned_cmd = ' '.join(cmd)
return returned_cmd
@staticmethod
def get_isohybrid_cmd(opts):
cmd = []
if not opts['use_xorrisofs']:
if opts['arch'] == "x86_64":
cmd = ["/usr/bin/isohybrid"]
cmd.append("--uefi")
cmd.append(opts['iso_name'])
returned_cmd = ' '.join(cmd)
else:
returned_cmd = ''
return returned_cmd
@staticmethod
def get_implantisomd5_cmd(opts):
"""
Implants md5 into iso
"""
cmd = ["/usr/bin/implantisomd5", "--supported-iso", opts['iso_name']]
returned_cmd = ' '.join(cmd)
return returned_cmd
@staticmethod
def get_manifest_cmd(opts):
"""
Gets an ISO manifest
"""
if opts['use_xorrisofs']:
return """/usr/bin/xorriso -dev %s --find |
tail -n+2 |
tr -d "'" |
cut -c2- | sort >> %s.manifest""" % (
shlex.quote(opts['iso_name']),
shlex.quote(opts['iso_name']),
)
else:
return "/usr/bin/isoinfo -R -f -i %s | grep -v '/TRANS.TBL$' | sort >> %s.manifest" % (
shlex.quote(opts['iso_name']),
shlex.quote(opts['iso_name']),
)
@staticmethod
def build_repo_list(
repo_base_url,
repos,
project_id,
current_arch,
compose_latest_sync,
compose_dir_is_here: bool = False,
hashed: bool = False,
):
"""
Builds the list of repo dictionaries
"""
repolist = []
prehashed = ''
if hashed:
prehashed = 'hashed-'
for name in repos:
if not compose_dir_is_here:
constructed_url = '{}/{}/repo/{}{}/{}'.format(
repo_base_url,
project_id,
prehashed,
name,
current_arch
)
else:
constructed_url = 'file://{}/{}/{}/os'.format(
compose_latest_sync,
name,
current_arch
)
repodata = {
'name': name,
'url': constructed_url
}
repolist.append(repodata)
return repolist
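# The returned structure is a list of name/url pairs, e.g. (values
# hypothetical):
#
#   [
#       {'name': 'BaseOS', 'url': '<repo_base_url>/<project_id>/repo/hashed-BaseOS/x86_64'},
#       {'name': 'AppStream', 'url': '<repo_base_url>/<project_id>/repo/hashed-AppStream/x86_64'},
#   ]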
@staticmethod
def composeinfo_write(
file_path,
distname,
shortname,
release,
release_type,
datestamp,
arches: list = [],
repos: list = []
):
"""
Write compose info similar to pungi.
arches and repos may be better suited to a dictionary; that is a
future item we will work on for 0.3.0.
"""
cijson = file_path + '.json'
ciyaml = file_path + '.yaml'
ci = productmd.composeinfo.ComposeInfo()
ci.release.name = distname
ci.release.short = shortname
ci.release.version = release
ci.release.type = release_type
ci.compose.id = '{}-{}-{}'.format(shortname, release, datestamp)
ci.compose.type = "production"
ci.compose.date = datestamp
ci.compose.respin = 0
ci.dump(cijson)
with open(cijson, 'r') as cidump:
jsonData = json.load(cidump)
cidump.close()
with open(ciyaml, 'w+') as ymdump:
yaml.dump(jsonData, ymdump)
ymdump.close()
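# Usage sketch (values hypothetical); writes file_path + '.json' and a YAML
# copy alongside it:
#
#   Shared.composeinfo_write(
#       '/mnt/compose/metadata/composeinfo',
#       'Rocky Linux', 'Rocky', '8.6', 'ga', '20220711'
#   )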