import datetime from typing import Optional from xml.etree import ElementTree as ET from fastapi import APIRouter, Response from slugify import slugify from apollo.db import AdvisoryAffectedProduct from apollo.server.settings import COMPANY_NAME, MANAGING_EDITOR, UI_URL, get_setting from apollo.rpmworker.repomd import NEVRA_RE, NVRA_RE, EPOCH_RE from common.fastapi import RenderErrorTemplateException router = APIRouter(tags=["updateinfo"]) @router.get("/{product_name}/{repo}/updateinfo.xml") async def get_updateinfo( product_name: str, repo: str, req_arch: Optional[str] = None, ): filters = { "name": product_name, "advisory__packages__repo_name": repo, } if req_arch: filters["arch"] = req_arch affected_products = await AdvisoryAffectedProduct.filter( **filters ).prefetch_related( "advisory", "advisory__cves", "advisory__fixes", "advisory__packages", "supported_product", ).all() if not affected_products: raise RenderErrorTemplateException("No advisories found", 404) ui_url = await get_setting(UI_URL) managing_editor = await get_setting(MANAGING_EDITOR) company_name = await get_setting(COMPANY_NAME) advisories = {} for affected_product in affected_products: advisory = affected_product.advisory if advisory.name not in advisories: advisories[advisory.name] = { "advisory": advisory, "arch": affected_product.arch, "major_version": affected_product.major_version, "minor_version": affected_product.minor_version, "supported_product_name": affected_product.supported_product.name, } tree = ET.Element("updates") for _, adv in advisories.items(): advisory = adv["advisory"] product_arch = adv["arch"] major_version = adv["major_version"] minor_version = adv["minor_version"] supported_product_name = adv["supported_product_name"] update = ET.SubElement(tree, "update") # Set update attributes update.set("from", managing_editor) update.set("status", "final") if advisory.kind == "Security": update.set("type", "security") elif advisory.kind == "Bug Fix": update.set("type", "bugfix") elif advisory.kind == "Enhancement": update.set("type", "enhancement") update.set("version", "2") # Add id ET.SubElement(update, "id").text = advisory.name # Add title ET.SubElement(update, "title").text = advisory.synopsis # Add description ET.SubElement(update, "description").text = advisory.description # Add time time_format = "%Y-%m-%d %H:%M:%S" issued = ET.SubElement(update, "issued") issued.set("date", advisory.published_at.strftime(time_format)) updated = ET.SubElement(update, "updated") updated.set("date", advisory.updated_at.strftime(time_format)) # Add rights now = datetime.datetime.utcnow() ET.SubElement( update, "rights" ).text = f"Copyright {now.year} {company_name}" # Add release name release_name = f"{supported_product_name} {major_version}" if minor_version: release_name += f".{minor_version}" ET.SubElement(update, "release").text = release_name # Add pushcount ET.SubElement(update, "pushcount").text = "1" # Add severity ET.SubElement(update, "severity").text = advisory.severity # Add summary ET.SubElement(update, "summary").text = advisory.topic # Add description ET.SubElement(update, "description").text = advisory.description # Add solution ET.SubElement(update, "solution").text = "" # Add references references = ET.SubElement(update, "references") for cve in advisory.cves: reference = ET.SubElement(references, "reference") reference.set( "href", f"https://cve.mitre.org/cgi-bin/cvename.cgi?name={cve.cve}", ) reference.set("id", cve.cve) reference.set("type", "cve") reference.set("title", cve.cve) for fix in advisory.fixes: reference = ET.SubElement(references, "reference") reference.set("href", fix.source) reference.set("id", fix.ticket_id) reference.set("type", "bugzilla") reference.set("title", fix.description) # Add UI self reference reference = ET.SubElement(references, "reference") reference.set("href", f"{ui_url}/{advisory.name}") reference.set("id", advisory.name) reference.set("type", "self") reference.set("title", advisory.name) # Add packages packages = ET.SubElement(update, "pkglist") suffixes_to_skip = [ "-debuginfo", "-debugsource", "-debuginfo-common", "-debugsource-common", ] pkg_name_map = {} for pkg in advisory.packages: name = pkg.package_name if pkg.module_name: name = f"{pkg.module_name}:{pkg.package_name}:{pkg.module_stream}" if name not in pkg_name_map: pkg_name_map[name] = [] pkg_name_map[name].append(pkg) pkg_src_rpm = {} for top_pkg in advisory.packages: name = top_pkg.package_name if top_pkg.module_name: name = f"{top_pkg.module_name}:{top_pkg.package_name}:{top_pkg.module_stream}" if name not in pkg_src_rpm: for pkg in pkg_name_map[name]: nvra_no_epoch = EPOCH_RE.sub("", pkg.nevra) nvra = NVRA_RE.search(nvra_no_epoch) if nvra: nvr_name = nvra.group(1) nvr_arch = nvra.group(4) if pkg.package_name == nvr_name and nvr_arch == "src": src_rpm = nvra_no_epoch if not src_rpm.endswith(".rpm"): src_rpm += ".rpm" pkg_src_rpm[name] = src_rpm # Collection list, may be more than one if module RPMs are involved collections = {} no_default_collection = False default_collection_short = slugify(f"{product_name}-{repo}-rpms") # Check if this is an actual module advisory, if so we need to split the # collections, and module RPMs need to go into their own collection based on # module name, while non-module RPMs go into the main collection (if any) for pkg in advisory.packages: if pkg.product_name != product_name: continue if pkg.repo_name != repo: continue if pkg.module_name: collection_short = f"{default_collection_short}__{pkg.module_name}" if collection_short not in collections: collections[collection_short] = { "packages": [], "module_context": pkg.module_context, "module_name": pkg.module_name, "module_stream": pkg.module_stream, "module_version": pkg.module_version, } no_default_collection = True collections[collection_short]["packages"].append(pkg) else: if no_default_collection: continue if default_collection_short not in collections: collections[default_collection_short] = { "packages": [], } collections[default_collection_short]["packages"].append(pkg) if no_default_collection and default_collection_short in collections: del collections[default_collection_short] collections_added = 0 for collection_short, info in collections.items(): # Create collection collection = ET.Element("collection") collection.set("short", collection_short) # Set short to name as well ET.SubElement(collection, "name").text = collection_short if "module_name" in info: module_element = ET.SubElement(collection, "module") module_element.set("name", info["module_name"]) module_element.set("stream", info["module_stream"]) module_element.set("version", info["module_version"]) module_element.set("context", info["module_context"]) module_element.set("arch", product_arch) added_pkg_count = 0 for pkg in info["packages"]: if pkg.nevra.endswith(".src.rpm"): continue name = pkg.package_name epoch = "0" if NEVRA_RE.match(pkg.nevra): nevra = NEVRA_RE.search(pkg.nevra) name = nevra.group(1) epoch = nevra.group(2) version = nevra.group(3) release = nevra.group(4) arch = nevra.group(5) elif NVRA_RE.match(pkg.nevra): nvra = NVRA_RE.search(pkg.nevra) name = nvra.group(1) version = nvra.group(2) release = nvra.group(3) arch = nvra.group(4) else: continue p_name = pkg.package_name if pkg.module_name: p_name = f"{pkg.module_name}:{pkg.package_name}:{pkg.module_stream}" if p_name not in pkg_src_rpm: continue if arch != product_arch and arch != "noarch": if arch != "x86_64": continue if arch == "x86_64" and product_arch != "i686": continue skip = False for suffix in suffixes_to_skip: if name.endswith(suffix): skip = True break if skip: continue package = ET.SubElement(collection, "package") package.set("name", name) package.set("arch", arch) package.set("epoch", epoch) package.set("version", version) package.set("release", release) package.set("src", pkg_src_rpm[p_name]) # Add filename element ET.SubElement(package, "filename").text = EPOCH_RE.sub("", pkg.nevra) # Add checksum ET.SubElement( package, "sum", type=pkg.checksum_type ).text = pkg.checksum added_pkg_count += 1 if added_pkg_count > 0: packages.append(collection) collections_added += 1 if collections_added == 0: tree.remove(update) ET.indent(tree) xml_str = ET.tostring( tree, encoding="unicode", method="xml", short_empty_elements=True, ) return Response(content=xml_str, media_type="application/xml")