#!/usr/bin/env python3

import os
import argparse
import asyncio
import logging
import hashlib
import gzip
import time
from dataclasses import dataclass
from urllib.parse import quote
from xml.etree import ElementTree as ET

import aiohttp

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("apollo_tree")

NS = {
    "": "http://linux.duke.edu/metadata/repo",
    "rpm": "http://linux.duke.edu/metadata/rpm",
}


@dataclass
class Repository:
    base_repomd: str = None
    source_repomd: str = None
    debug_repomd: str = None
    arch: str = None


async def scan_path(
    base_path: str,
    fmt: str,
    ignore_dirs: list[str],
):
    """
    Scan the base path for repositories.

    The format string can contain $reponame and $arch.
    When we reach $reponame, that means we have found a repository.
    Follow the path further into the tree until $arch is found;
    that determines the architecture of the repository.
    """
    repos = {}

    # First we need to find the root.
    # Construct the root by prepending base_path and all parts until
    # $reponame (or $arch), then walk the tree from there.
    root = base_path
    parts = fmt.split("/")
    repo_first = True
    if "$reponame" in parts:
        # Consume leading literal parts and stop at the first placeholder.
        # A while-loop is used so we never mutate a list we are iterating over.
        while parts:
            part = parts.pop(0)
            if part == "$reponame":
                break
            if part == "$arch":
                repo_first = False
                break
            root = os.path.join(root, part)

    logger.info("Found root: %s", root)

    # Walk the base path
    for directory in os.listdir(root):
        if directory in ignore_dirs:
            continue

        # Work on a copy so popping below does not mutate the shared parts list
        current_parts = list(parts)

        if repo_first:
            repo_name = directory
            logger.info("Found repo: %s", repo_name)
        else:
            arch = directory
            logger.info("Found arch: %s", arch)

        repo_base = os.path.join(root, directory)
        if repo_first:
            repos[repo_name] = []

        # Construct the repo base until we reach $arch (or $reponame)
        if "$arch" in current_parts:
            while current_parts:
                part = current_parts[0]
                if (part == "$arch" and repo_first) or part == "$reponame":
                    break
                repo_base = os.path.join(repo_base, part)
                current_parts.pop(0)

        logger.warning("Searching for arches in %s", repo_base)

        # Every directory in repo_base is an architecture
        # (or a repository, if the format puts $arch first)
        for arch_ in os.listdir(repo_base):
            if repo_first:
                arch = arch_
                logger.info("Found arch: %s", arch)
            else:
                repo_name = arch_
                logger.info("Found repo: %s", repo_name)

            # Append each combination + the rest of the parts as repo info
            if repo_first:
                found_path = f"{repo_base}/{arch}/{'/'.join(current_parts[1:])}"
            else:
                found_path = f"{repo_base}/{repo_name}/{'/'.join(current_parts[1:])}"

            # Verify that the path exists
            if not os.path.exists(found_path):
                logger.warning("Path does not exist: %s, skipping", found_path)
                continue

            repo = {
                "name": repo_name,
                "arch": arch,
                "found_path": found_path,
            }
            if repo_name not in repos:
                repos[repo_name] = []
            repos[repo_name].append(repo)

    return repos
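
# Illustrative example (directory names are hypothetical): with the default
# format "$reponame/$arch/os/repodata/repomd.xml", a tree such as
#
#   /mnt/repos/BaseOS/x86_64/os/repodata/repomd.xml
#   /mnt/repos/BaseOS/aarch64/os/repodata/repomd.xml
#
# makes scan_path("/mnt/repos", fmt, []) return roughly:
#
#   {
#       "BaseOS": [
#           {"name": "BaseOS", "arch": "x86_64",
#            "found_path": "/mnt/repos/BaseOS/x86_64/os/repodata/repomd.xml"},
#           {"name": "BaseOS", "arch": "aarch64",
#            "found_path": "/mnt/repos/BaseOS/aarch64/os/repodata/repomd.xml"},
#       ],
#   }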
async def fetch_updateinfo_from_apollo(
    repo: dict,
    product_name: str,
    arch: str = None,
    api_base: str = None,
) -> str:
    if not api_base:
        api_base = "https://apollo.build.resf.org/api/v3/updateinfo"
    api_url = f"{api_base}/{quote(product_name)}/{quote(repo['name'])}/updateinfo.xml"
    if arch:
        api_url += f"?req_arch={arch}"

    logger.info("Fetching updateinfo from %s", api_url)

    async with aiohttp.ClientSession() as session:
        async with session.get(api_url) as resp:
            if resp.status != 200:
                logger.error("Failed to fetch updateinfo from %s", api_url)
                return None
            return await resp.text()


async def gzip_updateinfo(updateinfo: str) -> dict:
    # Gzip updateinfo, get both the open and closed size as
    # well as the sha256sum for both

    # First get the sha256sum and size of the open (uncompressed) updateinfo
    sha256sum = hashlib.sha256(updateinfo.encode("utf-8")).hexdigest()
    size = len(updateinfo)

    # Then gzip it and get hash and size
    gzipped = gzip.compress(updateinfo.encode("utf-8"), mtime=0)
    gzipped_sha256sum = hashlib.sha256(gzipped).hexdigest()
    gzipped_size = len(gzipped)

    return {
        "sha256sum": sha256sum,
        "size": size,
        "gzipped_sha256sum": gzipped_sha256sum,
        "gzipped_size": gzipped_size,
        "gzipped": gzipped,
    }


async def write_updateinfo_to_file(
    repomd_xml_path: str,
    updateinfo: dict,
) -> str:
    # Write the gzipped updateinfo next to repomd.xml
    repomd_dir = os.path.dirname(repomd_xml_path)
    gzipped_sum = updateinfo["gzipped_sha256sum"]
    updateinfo_path = os.path.join(
        repomd_dir,
        f"{gzipped_sum}-updateinfo.xml.gz",
    )
    with open(updateinfo_path, "wb") as f:
        f.write(updateinfo["gzipped"])

    return updateinfo_path


async def update_repomd_xml(repomd_xml_path: str, updateinfo: dict):
    # Update repomd.xml with the new updateinfo
    gzipped_sum = updateinfo["gzipped_sha256sum"]
    updateinfo_path = f"{gzipped_sum}-updateinfo.xml.gz"

    # Parse repomd.xml
    ET.register_namespace("", NS[""])
    ET.register_namespace("rpm", NS["rpm"])
    repomd_xml = ET.parse(repomd_xml_path).getroot()

    # Find the existing data element with type="updateinfo" and delete it
    existing_updateinfo_path = None
    for data in repomd_xml.findall("data", NS):
        # Use .get() so a missing attribute does not raise KeyError
        data_type = data.attrib.get("type")
        if not data_type:
            logger.warning("No type found in data, skipping")
            continue
        if data_type == "updateinfo":
            # Delete the data element
            repomd_xml.remove(data)

            # Remember the location of the old updateinfo file
            # (compare against None; an Element without children is also falsy)
            location = data.find("location", NS)
            if location is None:
                logger.warning("No location found in data, skipping")
                continue
            existing_updateinfo_path = location.attrib["href"]
            break

    # Create a new data element and set its type to updateinfo
    data = ET.Element("data")
    data.set("type", "updateinfo")

    # Add checksum, open-checksum, location, timestamp, size and open-size
    checksum = ET.SubElement(data, "checksum")
    checksum.set("type", "sha256")
    checksum.text = updateinfo["gzipped_sha256sum"]

    open_checksum = ET.SubElement(data, "open-checksum")
    open_checksum.set("type", "sha256")
    open_checksum.text = updateinfo["sha256sum"]

    location = ET.SubElement(data, "location")
    location.set("href", f"repodata/{updateinfo_path}")

    timestamp = ET.SubElement(data, "timestamp")
    timestamp.text = str(int(time.time()))

    size = ET.SubElement(data, "size")
    size.text = str(updateinfo["gzipped_size"])

    open_size = ET.SubElement(data, "open-size")
    open_size.text = str(updateinfo["size"])

    # Add the new data element to repomd.xml
    repomd_xml.append(data)

    # Serialize to a string
    ET.indent(repomd_xml)
    xml_str = ET.tostring(
        repomd_xml,
        xml_declaration=True,
        encoding="utf-8",
        short_empty_elements=True,
    )

    # Rewrite the XML declaration to use double quotes and uppercase UTF-8
    xml_str = xml_str.decode("utf-8")
    xml_str = xml_str.replace("'", "\"")
    xml_str = xml_str.replace("utf-8", "UTF-8")

    # "Fix" self-closing tags to not have a space
    xml_str = xml_str.replace(" />", "/>")

    # Add xmlns:rpm to the root element
    xml_str = xml_str.replace(
        "repo\">",
        f"repo\" xmlns:rpm=\"{NS['rpm']}\">",
    )

    # Write back to repomd.xml
    with open(repomd_xml_path, "w", encoding="utf-8") as f:
        f.write(xml_str)

    # Delete the old updateinfo file; its href is relative to the repo root
    # (the parent of the repodata/ directory), not the current directory
    if existing_updateinfo_path:
        repo_root = os.path.dirname(os.path.dirname(repomd_xml_path))
        old_path = os.path.join(repo_root, existing_updateinfo_path)
        # Don't remove the file we just wrote (unchanged content keeps the
        # same checksum and therefore the same file name)
        if os.path.basename(old_path) != updateinfo_path and os.path.exists(old_path):
            os.remove(old_path)
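
# For illustration, update_repomd_xml() adds an entry of this shape to
# repomd.xml (the checksums, timestamp and sizes below are placeholders):
#
#   <data type="updateinfo">
#     <checksum type="sha256">3f5e...</checksum>
#     <open-checksum type="sha256">9a1c...</open-checksum>
#     <location href="repodata/3f5e...-updateinfo.xml.gz"/>
#     <timestamp>1672531200</timestamp>
#     <size>4096</size>
#     <open-size>16384</open-size>
#   </data>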
async def main(args):
    base_paths = await scan_path(
        args.path,
        args.base_format,
        args.ignore,
    )

    print(args)
    pass


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog="apollo_tree",
        description="Apollo updateinfo.xml publisher (Local file tree)",
        epilog="(C) 2023 Rocky Enterprise Software Foundation, Inc.",
    )
    parser.add_argument(
        "-b",
        "--base-format",
        default="$reponame/$arch/os/repodata/repomd.xml",
        help="Format for main repomd.xml file",
    )
    parser.add_argument(
        "-m",
        "--manual",
        action="store_true",
        help="Manual mode",
    )
    parser.add_argument(
        "-r",
        "--repos",
        nargs="+",
        action="append",
        default=[],
        help="Repositories to publish (manual mode), format: :",
    )
    parser.add_argument(
        "-a",
        "--auto-scan",
        default=True,
        action="store_true",
        help="Automatically scan for repos",
    )
    parser.add_argument(
        "-p",
        "--path",
        help="Default path to scan for repos",
    )
    parser.add_argument(
        "-i",
        "--ignore",
        nargs="+",
        action="append",
        default=[],
        help="Directories in base path to ignore in auto-scan mode",
    )
    parser.add_argument(
        "-n",
        "--product-name",
        required=True,
        help="Product name",
    )

    p_args = parser.parse_args()

    if p_args.auto_scan and p_args.manual:
        parser.error("Cannot use --auto-scan and --manual together")

    if p_args.manual and not p_args.repos:
        parser.error("Must specify repos to publish in manual mode")

    if p_args.auto_scan and not p_args.path:
        parser.error("Must specify path to scan for repos in auto-scan mode")

    asyncio.run(main(p_args))
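
# Example invocations (paths and product name are hypothetical):
#
#   Auto-scan a local repository tree:
#     ./apollo_tree.py -n "Rocky Linux 8 x86_64" -p /mnt/repos-production
#
#   Same, but with a layout that puts the architecture before the repo name:
#     ./apollo_tree.py -n "Rocky Linux 8 x86_64" -p /mnt/repos-production \
#         -b '$arch/$reponame/os/repodata/repomd.xml'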