mirror of
https://github.com/resf/distro-tools.git
synced 2024-11-24 14:11:26 +00:00
421 lines
12 KiB
Python
421 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import os
|
|
import argparse
|
|
import asyncio
|
|
import logging
|
|
import hashlib
|
|
import gzip
|
|
from dataclasses import dataclass
|
|
import time
|
|
from urllib.parse import quote
|
|
from xml.etree import ElementTree as ET
|
|
|
|
import aiohttp
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger("apollo_tree")
|
|
|
|
NS = {
|
|
"": "http://linux.duke.edu/metadata/repo",
|
|
"rpm": "http://linux.duke.edu/metadata/rpm"
|
|
}
|
|
|
|
|
|
@dataclass
|
|
class Repository:
|
|
base_repomd: str = None
|
|
source_repomd: str = None
|
|
debug_repomd: str = None
|
|
arch: str = None
|
|
|
|
|
|
async def scan_path(
|
|
base_path: str,
|
|
fmt: str,
|
|
ignore_repos: list[str],
|
|
ignore_arches: list[str],
|
|
):
|
|
"""
|
|
Scan base path for repositories
|
|
The format string can contain $reponame and $arch
|
|
When we reach $reponame, that means we have found a repository
|
|
Follow the path further into the tree until $arch is found
|
|
That determines the architecture of the repository
|
|
"""
|
|
|
|
repos = {}
|
|
|
|
# First we need to find the root.
|
|
# Construct root by prepending base_path and all parts until $reponame
|
|
# Then we can walk the tree from there
|
|
root = base_path
|
|
parts = fmt.split("/")
|
|
repo_first = True
|
|
if "$reponame" in parts:
|
|
for part in parts:
|
|
parts.pop(0)
|
|
if part == "$reponame":
|
|
break
|
|
if part == "$arch":
|
|
repo_first = False
|
|
break
|
|
root = os.path.join(root, part)
|
|
|
|
logger.info("Found root: %s", root)
|
|
|
|
# Walk the base path
|
|
for directory in os.listdir(root):
|
|
current_parts = parts
|
|
if repo_first:
|
|
repo_name = directory
|
|
if repo_name in ignore_repos:
|
|
logger.info("Ignoring repo: %s", repo_name)
|
|
continue
|
|
logger.info("Found repo: %s", repo_name)
|
|
else:
|
|
arch = directory
|
|
if arch in ignore_arches:
|
|
logger.info("Ignoring arch: %s", arch)
|
|
continue
|
|
logger.info("Found arch: %s", arch)
|
|
repo_base = os.path.join(root, directory)
|
|
|
|
if repo_first:
|
|
repos[repo_name] = []
|
|
|
|
# Construct repo base until we reach $arch
|
|
if "$arch" in current_parts:
|
|
for part in current_parts:
|
|
if (part == "$arch" and repo_first) or part == "$reponame":
|
|
break
|
|
repo_base = os.path.join(repo_base, part)
|
|
current_parts.pop(0)
|
|
|
|
logger.warning("Searching for arches in %s", repo_base)
|
|
|
|
if not os.path.isdir(repo_base):
|
|
logger.warning("Path is not a directory: %s, skipping", repo_base)
|
|
continue
|
|
|
|
# All dirs in repo_base is an architecture
|
|
for arch_ in os.listdir(repo_base):
|
|
if repo_first:
|
|
arch = arch_
|
|
else:
|
|
repo_name = arch_
|
|
# Now append each combination + rest of parts as repo_info
|
|
if repo_first:
|
|
if arch in ignore_arches:
|
|
logger.info("Ignoring arch: %s", arch)
|
|
continue
|
|
logger.info("Found arch: %s", arch)
|
|
else:
|
|
if repo_name in ignore_repos:
|
|
logger.info("Ignoring repo: %s", repo_name)
|
|
continue
|
|
logger.info("Found repo: %s", repo_name)
|
|
|
|
if repo_first:
|
|
found_path = f"{repo_base}/{arch}/{'/'.join(current_parts[1:])}"
|
|
else:
|
|
found_path = f"{repo_base}/{repo_name}/{'/'.join(current_parts[1:])}"
|
|
|
|
# Verify that the path exists
|
|
if not os.path.exists(found_path):
|
|
logger.warning("Path does not exist: %s, skipping", found_path)
|
|
continue
|
|
|
|
repo = {
|
|
"name": repo_name,
|
|
"arch": arch,
|
|
"found_path": found_path,
|
|
}
|
|
if repo_name not in repos:
|
|
repos[repo_name] = []
|
|
repos[repo_name].append(repo)
|
|
|
|
return repos
|
|
|
|
|
|
async def fetch_updateinfo_from_apollo(
|
|
repo: dict,
|
|
product_name: str,
|
|
api_base: str = None,
|
|
) -> str:
|
|
pname_arch = product_name.replace("$arch", repo["arch"])
|
|
if not api_base:
|
|
api_base = "https://apollo.build.resf.org/api/v3/updateinfo"
|
|
api_url = f"{api_base}/{quote(pname_arch)}/{quote(repo['name'])}/updateinfo.xml"
|
|
api_url += f"?req_arch={repo['arch']}"
|
|
|
|
logger.info("Fetching updateinfo from %s", api_url)
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(api_url) as resp:
|
|
if resp.status != 200 and resp.status != 404:
|
|
logger.warning(
|
|
"Failed to fetch updateinfo from %s, skipping", api_url
|
|
)
|
|
return None
|
|
if resp.status != 200:
|
|
raise Exception(f"Failed to fetch updateinfo from {api_url}")
|
|
return await resp.text()
|
|
|
|
|
|
async def gzip_updateinfo(updateinfo: str) -> dict:
|
|
# Gzip updateinfo, get both open and closed size as
|
|
# well as the sha256sum for both
|
|
|
|
# First get the sha256sum and size of the open updateinfo
|
|
sha256sum = hashlib.sha256(updateinfo.encode("utf-8")).hexdigest()
|
|
size = len(updateinfo)
|
|
|
|
# Then gzip it and get hash and size
|
|
gzipped = gzip.compress(updateinfo.encode("utf-8"), mtime=0)
|
|
gzipped_sha256sum = hashlib.sha256(gzipped).hexdigest()
|
|
gzipped_size = len(gzipped)
|
|
|
|
return {
|
|
"sha256sum": sha256sum,
|
|
"size": size,
|
|
"gzipped_sha256sum": gzipped_sha256sum,
|
|
"gzipped_size": gzipped_size,
|
|
"gzipped": gzipped,
|
|
}
|
|
|
|
|
|
async def write_updateinfo_to_file(
|
|
repomd_xml_path: str, updateinfo: dict
|
|
) -> str:
|
|
# Write updateinfo to file
|
|
repomd_dir = os.path.dirname(repomd_xml_path)
|
|
gzipped_sum = updateinfo["gzipped_sha256sum"]
|
|
updateinfo_path = os.path.join(
|
|
repomd_dir, f"{gzipped_sum}-updateinfo.xml.gz"
|
|
)
|
|
with open(updateinfo_path, "wb") as f:
|
|
f.write(updateinfo["gzipped"])
|
|
|
|
return updateinfo_path
|
|
|
|
|
|
async def update_repomd_xml(repomd_xml_path: str, updateinfo: dict):
|
|
# Update repomd.xml with new updateinfo
|
|
gzipped_sum = updateinfo["gzipped_sha256sum"]
|
|
updateinfo_path = f"{gzipped_sum}-updateinfo.xml.gz"
|
|
|
|
# Parse repomd.xml
|
|
ET.register_namespace("", NS[""])
|
|
repomd_xml = ET.parse(repomd_xml_path).getroot()
|
|
|
|
# Iterate over data and find type="updateinfo" and delete it
|
|
existing_updateinfo_path = None
|
|
for data in repomd_xml.findall("data", NS):
|
|
data_type = data.attrib["type"]
|
|
if not data_type:
|
|
logger.warning("No type found in data, skipping")
|
|
continue
|
|
if data_type == "updateinfo":
|
|
# Get the location of the updateinfo file
|
|
location = data.find("location", NS)
|
|
location_href = location.attrib["href"]
|
|
existing_updateinfo_path = os.path.abspath(
|
|
os.path.join(repomd_xml_path, "../..", location_href)
|
|
)
|
|
|
|
# Delete the data element
|
|
repomd_xml.remove(data)
|
|
|
|
break
|
|
|
|
# Create new data element and set type to updateinfo
|
|
data = ET.Element("data")
|
|
data.set("type", "updateinfo")
|
|
|
|
# Add checksum, open-checksum, location, timestamp, size and open-size
|
|
checksum = ET.SubElement(data, "checksum")
|
|
checksum.set("type", "sha256")
|
|
checksum.text = updateinfo["gzipped_sha256sum"]
|
|
|
|
open_checksum = ET.SubElement(data, "open-checksum")
|
|
open_checksum.set("type", "sha256")
|
|
open_checksum.text = updateinfo["sha256sum"]
|
|
|
|
location = ET.SubElement(data, "location")
|
|
location.set("href", f"repodata/{updateinfo_path}")
|
|
|
|
timestamp = ET.SubElement(data, "timestamp")
|
|
timestamp.text = str(int(time.time()))
|
|
|
|
size = ET.SubElement(data, "size")
|
|
size.text = str(updateinfo["gzipped_size"])
|
|
|
|
open_size = ET.SubElement(data, "open-size")
|
|
open_size.text = str(updateinfo["size"])
|
|
|
|
# Add data to repomd.xml
|
|
repomd_xml.append(data)
|
|
|
|
# Create string
|
|
ET.indent(repomd_xml)
|
|
xml_str = ET.tostring(
|
|
repomd_xml,
|
|
xml_declaration=True,
|
|
encoding="utf-8",
|
|
short_empty_elements=True,
|
|
)
|
|
|
|
# Prepend declaration with double quotes
|
|
xml_str = xml_str.decode("utf-8")
|
|
xml_str = xml_str.replace("'", "\"")
|
|
xml_str = xml_str.replace("utf-8", "UTF-8")
|
|
|
|
# "Fix" closing tags to not have a space
|
|
xml_str = xml_str.replace(" />", "/>")
|
|
|
|
# Add xmlns:rpm
|
|
xml_str = xml_str.replace(
|
|
"repo\">",
|
|
f"repo\" xmlns:rpm=\"{NS['rpm']}\">",
|
|
)
|
|
|
|
# Write to repomd.xml
|
|
logger.info("Writing to %s", repomd_xml_path)
|
|
with open(repomd_xml_path, "w", encoding="utf-8") as f:
|
|
f.write(xml_str)
|
|
|
|
# Delete old updateinfo file if not the same as the new one
|
|
if existing_updateinfo_path:
|
|
updinfo_base = os.path.basename(existing_updateinfo_path)
|
|
if updinfo_base != updateinfo_path:
|
|
try:
|
|
logger.info("Deleting %s", existing_updateinfo_path)
|
|
os.remove(existing_updateinfo_path)
|
|
except FileNotFoundError:
|
|
logger.warning("File %s not found", existing_updateinfo_path)
|
|
|
|
|
|
async def run_apollo_tree(
|
|
base_format: str,
|
|
manual: bool,
|
|
auto_scan: bool,
|
|
path: str,
|
|
ignore: list[str],
|
|
ignore_arch: list[str],
|
|
product_name: str,
|
|
):
|
|
if manual:
|
|
raise Exception("Manual mode not implemented yet")
|
|
|
|
if auto_scan:
|
|
repos = await scan_path(
|
|
path,
|
|
base_format,
|
|
ignore,
|
|
ignore_arch,
|
|
)
|
|
|
|
for _, repo_variants in repos.items():
|
|
for repo in repo_variants:
|
|
updateinfo = await fetch_updateinfo_from_apollo(
|
|
repo,
|
|
product_name,
|
|
)
|
|
if not updateinfo:
|
|
logger.warning("No updateinfo found for %s", repo["name"])
|
|
continue
|
|
|
|
gzipped = await gzip_updateinfo(updateinfo)
|
|
await write_updateinfo_to_file(
|
|
repo["found_path"],
|
|
gzipped,
|
|
)
|
|
await update_repomd_xml(
|
|
repo["found_path"],
|
|
gzipped,
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser(
|
|
prog="apollo_tree",
|
|
description="Apollo updateinfo.xml publisher (Local file tree)",
|
|
epilog="(C) 2023 Rocky Enterprise Software Foundation, Inc.",
|
|
)
|
|
parser.add_argument(
|
|
"-b",
|
|
"--base-format",
|
|
default="$reponame/$arch/os/repodata/repomd.xml",
|
|
help="Format for main repo.xml file",
|
|
)
|
|
parser.add_argument(
|
|
"-m",
|
|
"--manual",
|
|
action="store_true",
|
|
help="Manual mode",
|
|
)
|
|
parser.add_argument(
|
|
"-r",
|
|
"--repos",
|
|
nargs="+",
|
|
action="append",
|
|
default=[],
|
|
help="Repositories to publish (manual mode), format: <arch>:<repomd>",
|
|
)
|
|
parser.add_argument(
|
|
"-a",
|
|
"--auto-scan",
|
|
default=True,
|
|
action="store_true",
|
|
help="Automatically scan for repos",
|
|
)
|
|
parser.add_argument(
|
|
"-p",
|
|
"--path",
|
|
help="Default path to scan for repos",
|
|
)
|
|
parser.add_argument(
|
|
"-i",
|
|
"--ignore",
|
|
nargs="+",
|
|
action="append",
|
|
default=[],
|
|
help="Repos to ignore in auto-scan mode",
|
|
)
|
|
parser.add_argument(
|
|
"-x",
|
|
"--ignore-arch",
|
|
nargs="+",
|
|
action="append",
|
|
default=[],
|
|
help="Arches to ignore in auto-scan mode",
|
|
)
|
|
parser.add_argument(
|
|
"-n",
|
|
"--product-name",
|
|
required=True,
|
|
help="Product name",
|
|
)
|
|
|
|
p_args = parser.parse_args()
|
|
if p_args.auto_scan and p_args.manual:
|
|
parser.error("Cannot use --auto-scan and --manual together")
|
|
|
|
if p_args.manual and not p_args.repos:
|
|
parser.error("Must specify repos to publish in manual mode")
|
|
|
|
if p_args.auto_scan and not p_args.path:
|
|
parser.error("Must specify path to scan for repos in auto-scan mode")
|
|
|
|
asyncio.run(
|
|
run_apollo_tree(
|
|
p_args.base_format,
|
|
p_args.manual,
|
|
p_args.auto_scan,
|
|
p_args.path,
|
|
[y for x in p_args.ignore for y in x],
|
|
[y for x in p_args.ignore_arch for y in x],
|
|
p_args.product_name,
|
|
)
|
|
)
|