distro-tools/apollo/rherrata/__init__.py

200 lines
5.4 KiB
Python
Raw Normal View History

2023-02-01 21:37:16 +00:00
# pylint: disable=invalid-name
"""
This module provides a Python interface to the Red Hat Errata API.
"""
from __future__ import annotations
from enum import Enum
from dataclasses import dataclass
from urllib.parse import quote
import aiohttp
from dataclass_wizard import JSONWizard
from yarl import URL
DEFAULT_URL = "https://access.redhat.com/hydra/rest/search/kcs"
class DocumentKind(str, Enum):
"""
The kind of document.
"""
ERRATA = "Errata"
class Distro(str, Enum):
"""
The distribution.
"""
RHEL = "Red Hat Enterprise Linux"
class Architecture(str, Enum):
"""
The architecture.
"""
X86_64 = "x86_64"
AARCH64 = "aarch64"
PPC64 = "ppc64"
PPC64LE = "ppc64le"
S390X = "s390x"
@dataclass
class PortalProduct:
"""
Red Hat advisory product
"""
variant: str
name: str
major_version: int
minor_version: int
arch: str
@dataclass
class Advisory(JSONWizard):
"""
An advisory.
"""
documentKind: str = None
uri: str = None
view_uri: str = None
language: str = None
id: str = None
portal_description: str = None
abstract: str = None
allTitle: str = None
sortTitle: str = None
portal_title: list[str] = None
lastModifiedDate: str = None
displayDate: str = None
portal_advisory_type: str = None
portal_synopsis: str = None
portal_severity: str = None
portal_type: str = None
portal_package: list[str] = None
portal_CVE: list[str] = None
portal_BZ: list[str] = None
portal_publication_date: str = None
portal_requires_subscription: str = None
portal_product_names: list[str] = None
title: str = None
portal_child_ids: list[str] = None
portal_product_filter: list[str] = None
boostProduct: str = None
boostVersion: int = None
detectedProducts: list[str] = None
caseCount: int = None
caseCount_365: int = None
timestamp: str = None
body: list[str] = None
_version_: int = None
__products: list[PortalProduct] = None
def get_products(self) -> list[PortalProduct]:
if self.__products:
return self.__products
self.__products = []
for product in self.portal_product_filter:
try:
if product.startswith("Red Hat Enterprise Linux"):
variant, name, version, arch = product.split("|")
major_version = version
if "." in version:
version_split = version.split(".")
major_version = int(version_split[0])
minor_version = int(version_split[1])
else:
major_version = int(version)
minor_version = None
self.__products.append(
PortalProduct(
variant, name, major_version, minor_version, arch
)
)
except ValueError:
pass
return self.__products
def affects_rhel_version_arch(
self, major_version: int, minor_version: int | None, arch: Architecture
) -> bool:
"""
Returns whether this advisory affects the given RHEL version and architecture.
"""
for product in self.get_products():
is_variant = product.variant == "Red Hat Enterprise Linux"
is_major_version = product.major_version == major_version
is_minor_version = product.minor_version == minor_version
is_arch = product.arch == arch.value
if is_variant and is_major_version and is_minor_version and is_arch:
2023-02-01 21:37:16 +00:00
return True
return False
class API:
"""
The Red Hat Errata API.
"""
url = None
def __init__(self, url=DEFAULT_URL):
if not url:
url = DEFAULT_URL
self.url = url
async def search(
self,
kind: DocumentKind = DocumentKind.ERRATA,
sort_asc: bool = False,
rows: int = 10,
query: str = "*:*",
distro: str = "Red%5C+Hat%5C+Enterprise%5C+Linux%7C%2A%7C%2A%7C%2A",
detected_product: str = "rhel",
from_date: str = None
) -> list[Advisory]:
params = ""
# Set query
params += f"q={query}"
# Set rows
params += f"&rows={rows}"
# Set sorting
sorting = "portal_publication_date+asc" if sort_asc else "portal_publication_date+desc"
params += f"&sort={sorting}"
# Set start
params += "&start=0"
# Set distribution
params += f"&fq=portal_product_filter:{distro}"
# Set from-to
if from_date:
params += f"&fq=portal_publication_date%3A%5B{quote(from_date)}%20TO%20NOW%5D"
# Set document kind
params += f"&fq=documentKind:{kind.value}"
# Set detected product
if detected_product:
params += f"&fq=detectedProducts:{detected_product}"
async with aiohttp.ClientSession() as session:
async with session.get(
URL(f"{self.url}?{params}", encoded=True)
) as response:
body = await response.json()
if response.status != 200:
raise Exception((await response.text()))
elif body.get("response", {}).get("numFound", 0) == 0:
return []
return Advisory.from_list(list(body["response"]["docs"]))