distro-tools/apollo/server/routes/api_updateinfo.py

327 lines
12 KiB
Python
Raw Permalink Normal View History

2023-02-02 23:27:32 +00:00
import datetime
from typing import Optional
from xml.etree import ElementTree as ET
from fastapi import APIRouter, Response
from slugify import slugify
from apollo.db import AdvisoryAffectedProduct
from apollo.server.settings import COMPANY_NAME, MANAGING_EDITOR, UI_URL, get_setting
from apollo.rpmworker.repomd import NEVRA_RE, NVRA_RE, EPOCH_RE
from common.fastapi import RenderErrorTemplateException
router = APIRouter(tags=["updateinfo"])
@router.get("/{product_name}/{repo}/updateinfo.xml")
async def get_updateinfo(
product_name: str,
repo: str,
req_arch: Optional[str] = None,
):
filters = {
"name": product_name,
"advisory__packages__repo_name": repo,
}
if req_arch:
filters["arch"] = req_arch
affected_products = await AdvisoryAffectedProduct.filter(
**filters
).prefetch_related(
"advisory",
"advisory__cves",
"advisory__fixes",
"advisory__packages",
"supported_product",
).all()
if not affected_products:
raise RenderErrorTemplateException("No advisories found", 404)
ui_url = await get_setting(UI_URL)
managing_editor = await get_setting(MANAGING_EDITOR)
company_name = await get_setting(COMPANY_NAME)
advisories = {}
for affected_product in affected_products:
advisory = affected_product.advisory
if advisory.name not in advisories:
advisories[advisory.name] = {
"advisory":
advisory,
"arch":
affected_product.arch,
"major_version":
affected_product.major_version,
"minor_version":
affected_product.minor_version,
"supported_product_name":
affected_product.supported_product.name,
}
tree = ET.Element("updates")
for _, adv in advisories.items():
advisory = adv["advisory"]
product_arch = adv["arch"]
major_version = adv["major_version"]
minor_version = adv["minor_version"]
supported_product_name = adv["supported_product_name"]
update = ET.SubElement(tree, "update")
# Set update attributes
update.set("from", managing_editor)
update.set("status", "final")
if advisory.kind == "Security":
update.set("type", "security")
elif advisory.kind == "Bug Fix":
update.set("type", "bugfix")
elif advisory.kind == "Enhancement":
update.set("type", "enhancement")
update.set("version", "2")
# Add id
ET.SubElement(update, "id").text = advisory.name
# Add title
ET.SubElement(update, "title").text = advisory.synopsis
# Add description
ET.SubElement(update, "description").text = advisory.description
# Add time
time_format = "%Y-%m-%d %H:%M:%S"
issued = ET.SubElement(update, "issued")
issued.set("date", advisory.published_at.strftime(time_format))
updated = ET.SubElement(update, "updated")
updated.set("date", advisory.updated_at.strftime(time_format))
2023-02-02 23:27:32 +00:00
# Add rights
now = datetime.datetime.utcnow()
ET.SubElement(
update, "rights"
).text = f"Copyright {now.year} {company_name}"
# Add release name
release_name = f"{supported_product_name} {major_version}"
if minor_version:
release_name += f".{minor_version}"
ET.SubElement(update, "release").text = release_name
# Add pushcount
ET.SubElement(update, "pushcount").text = "1"
# Add severity
ET.SubElement(update, "severity").text = advisory.severity
# Add summary
ET.SubElement(update, "summary").text = advisory.topic
# Add description
ET.SubElement(update, "description").text = advisory.description
# Add solution
ET.SubElement(update, "solution").text = ""
# Add references
references = ET.SubElement(update, "references")
for cve in advisory.cves:
reference = ET.SubElement(references, "reference")
reference.set(
"href",
f"https://cve.mitre.org/cgi-bin/cvename.cgi?name={cve.cve}",
)
reference.set("id", cve.cve)
reference.set("type", "cve")
reference.set("title", cve.cve)
for fix in advisory.fixes:
reference = ET.SubElement(references, "reference")
reference.set("href", fix.source)
reference.set("id", fix.ticket_id)
reference.set("type", "bugzilla")
reference.set("title", fix.description)
# Add UI self reference
reference = ET.SubElement(references, "reference")
reference.set("href", f"{ui_url}/{advisory.name}")
reference.set("id", advisory.name)
reference.set("type", "self")
reference.set("title", advisory.name)
# Add packages
packages = ET.SubElement(update, "pkglist")
suffixes_to_skip = [
"-debuginfo",
"-debugsource",
"-debuginfo-common",
"-debugsource-common",
]
2023-02-02 23:27:32 +00:00
pkg_name_map = {}
for pkg in advisory.packages:
name = pkg.package_name
if pkg.module_name:
name = f"{pkg.module_name}:{pkg.package_name}:{pkg.module_stream}"
if name not in pkg_name_map:
pkg_name_map[name] = []
2023-02-02 23:27:32 +00:00
pkg_name_map[name].append(pkg)
2023-02-02 23:27:32 +00:00
pkg_src_rpm = {}
for top_pkg in advisory.packages:
2023-02-04 01:15:01 +00:00
name = top_pkg.package_name
if top_pkg.module_name:
name = f"{top_pkg.module_name}:{top_pkg.package_name}:{top_pkg.module_stream}"
if name not in pkg_src_rpm:
for pkg in pkg_name_map[name]:
2023-02-02 23:27:32 +00:00
nvra_no_epoch = EPOCH_RE.sub("", pkg.nevra)
nvra = NVRA_RE.search(nvra_no_epoch)
if nvra:
2023-02-04 01:15:01 +00:00
nvr_name = nvra.group(1)
nvr_arch = nvra.group(4)
if pkg.package_name == nvr_name and nvr_arch == "src":
2023-02-02 23:27:32 +00:00
src_rpm = nvra_no_epoch
if not src_rpm.endswith(".rpm"):
src_rpm += ".rpm"
2023-02-04 01:15:01 +00:00
pkg_src_rpm[name] = src_rpm
2023-02-02 23:27:32 +00:00
# Collection list, may be more than one if module RPMs are involved
collections = {}
no_default_collection = False
default_collection_short = slugify(f"{product_name}-{repo}-rpms")
2023-02-02 23:27:32 +00:00
# Check if this is an actual module advisory, if so we need to split the
# collections, and module RPMs need to go into their own collection based on
# module name, while non-module RPMs go into the main collection (if any)
2023-02-02 23:27:32 +00:00
for pkg in advisory.packages:
if pkg.product_name != product_name:
continue
2023-02-03 00:57:57 +00:00
if pkg.repo_name != repo:
2023-02-03 00:52:48 +00:00
continue
2023-02-02 23:27:32 +00:00
if pkg.module_name:
collection_short = f"{default_collection_short}__{pkg.module_name}"
if collection_short not in collections:
collections[collection_short] = {
"packages": [],
"module_context": pkg.module_context,
"module_name": pkg.module_name,
"module_stream": pkg.module_stream,
"module_version": pkg.module_version,
}
no_default_collection = True
collections[collection_short]["packages"].append(pkg)
else:
if no_default_collection:
continue
if default_collection_short not in collections:
collections[default_collection_short] = {
"packages": [],
}
collections[default_collection_short]["packages"].append(pkg)
if no_default_collection and default_collection_short in collections:
del collections[default_collection_short]
2023-02-04 01:15:01 +00:00
collections_added = 0
for collection_short, info in collections.items():
# Create collection
collection = ET.Element("collection")
collection.set("short", collection_short)
# Set short to name as well
ET.SubElement(collection, "name").text = collection_short
if "module_name" in info:
module_element = ET.SubElement(collection, "module")
module_element.set("name", info["module_name"])
module_element.set("stream", info["module_stream"])
module_element.set("version", info["module_version"])
module_element.set("context", info["module_context"])
module_element.set("arch", product_arch)
added_pkg_count = 0
for pkg in info["packages"]:
if pkg.nevra.endswith(".src.rpm"):
continue
name = pkg.package_name
epoch = "0"
if NEVRA_RE.match(pkg.nevra):
nevra = NEVRA_RE.search(pkg.nevra)
name = nevra.group(1)
epoch = nevra.group(2)
version = nevra.group(3)
release = nevra.group(4)
arch = nevra.group(5)
elif NVRA_RE.match(pkg.nevra):
nvra = NVRA_RE.search(pkg.nevra)
name = nvra.group(1)
version = nvra.group(2)
release = nvra.group(3)
arch = nvra.group(4)
else:
continue
p_name = pkg.package_name
if pkg.module_name:
p_name = f"{pkg.module_name}:{pkg.package_name}:{pkg.module_stream}"
if p_name not in pkg_src_rpm:
continue
if arch != product_arch and arch != "noarch":
if arch != "x86_64":
continue
if arch == "x86_64" and product_arch != "i686":
continue
skip = False
for suffix in suffixes_to_skip:
if name.endswith(suffix):
skip = True
break
if skip:
continue
package = ET.SubElement(collection, "package")
package.set("name", name)
package.set("arch", arch)
package.set("epoch", epoch)
package.set("version", version)
package.set("release", release)
package.set("src", pkg_src_rpm[p_name])
# Add filename element
ET.SubElement(package,
"filename").text = EPOCH_RE.sub("", pkg.nevra)
# Add checksum
ET.SubElement(
package, "sum", type=pkg.checksum_type
).text = pkg.checksum
added_pkg_count += 1
if added_pkg_count > 0:
packages.append(collection)
2023-02-04 01:15:01 +00:00
collections_added += 1
if collections_added == 0:
tree.remove(update)
2023-02-02 23:27:32 +00:00
ET.indent(tree)
xml_str = ET.tostring(
tree,
encoding="unicode",
method="xml",
short_empty_elements=True,
)
2023-02-02 23:27:32 +00:00
return Response(content=xml_str, media_type="application/xml")