mirror of
https://github.com/resf/distro-tools.git
synced 2024-12-22 18:58:30 +00:00
196 lines
5.2 KiB
Python
196 lines
5.2 KiB
Python
|
# pylint: disable=invalid-name
|
||
|
"""
|
||
|
This module provides a Python interface to the Red Hat Errata API.
|
||
|
"""
|
||
|
from __future__ import annotations
|
||
|
from enum import Enum
|
||
|
from dataclasses import dataclass
|
||
|
from urllib.parse import quote
|
||
|
|
||
|
import aiohttp
|
||
|
from dataclass_wizard import JSONWizard
|
||
|
from yarl import URL
|
||
|
|
||
|
DEFAULT_URL = "https://access.redhat.com/hydra/rest/search/kcs"
|
||
|
|
||
|
|
||
|
class DocumentKind(str, Enum):
|
||
|
"""
|
||
|
The kind of document.
|
||
|
"""
|
||
|
ERRATA = "Errata"
|
||
|
|
||
|
|
||
|
class Distro(str, Enum):
|
||
|
"""
|
||
|
The distribution.
|
||
|
"""
|
||
|
RHEL = "Red Hat Enterprise Linux"
|
||
|
|
||
|
|
||
|
class Architecture(str, Enum):
|
||
|
"""
|
||
|
The architecture.
|
||
|
"""
|
||
|
X86_64 = "x86_64"
|
||
|
AARCH64 = "aarch64"
|
||
|
PPC64 = "ppc64"
|
||
|
PPC64LE = "ppc64le"
|
||
|
S390X = "s390x"
|
||
|
|
||
|
|
||
|
@dataclass
|
||
|
class PortalProduct:
|
||
|
"""
|
||
|
Red Hat advisory product
|
||
|
"""
|
||
|
variant: str
|
||
|
name: str
|
||
|
major_version: int
|
||
|
minor_version: int
|
||
|
arch: str
|
||
|
|
||
|
|
||
|
@dataclass
|
||
|
class Advisory(JSONWizard):
|
||
|
"""
|
||
|
An advisory.
|
||
|
"""
|
||
|
documentKind: str = None
|
||
|
uri: str = None
|
||
|
view_uri: str = None
|
||
|
language: str = None
|
||
|
id: str = None
|
||
|
portal_description: str = None
|
||
|
abstract: str = None
|
||
|
allTitle: str = None
|
||
|
sortTitle: str = None
|
||
|
portal_title: list[str] = None
|
||
|
lastModifiedDate: str = None
|
||
|
displayDate: str = None
|
||
|
portal_advisory_type: str = None
|
||
|
portal_synopsis: str = None
|
||
|
portal_severity: str = None
|
||
|
portal_type: str = None
|
||
|
portal_package: list[str] = None
|
||
|
portal_CVE: list[str] = None
|
||
|
portal_BZ: list[str] = None
|
||
|
portal_publication_date: str = None
|
||
|
portal_requires_subscription: str = None
|
||
|
portal_product_names: list[str] = None
|
||
|
title: str = None
|
||
|
portal_child_ids: list[str] = None
|
||
|
portal_product_filter: list[str] = None
|
||
|
boostProduct: str = None
|
||
|
boostVersion: int = None
|
||
|
detectedProducts: list[str] = None
|
||
|
caseCount: int = None
|
||
|
caseCount_365: int = None
|
||
|
timestamp: str = None
|
||
|
body: list[str] = None
|
||
|
_version_: int = None
|
||
|
|
||
|
__products: list[PortalProduct] = None
|
||
|
|
||
|
def get_products(self) -> list[PortalProduct]:
|
||
|
if self.__products:
|
||
|
return self.__products
|
||
|
|
||
|
self.__products = []
|
||
|
for product in self.portal_product_filter:
|
||
|
try:
|
||
|
if product.startswith("Red Hat Enterprise Linux"):
|
||
|
variant, name, version, arch = product.split("|")
|
||
|
major_version = version
|
||
|
if "." in version:
|
||
|
version_split = version.split(".")
|
||
|
major_version = int(version_split[0])
|
||
|
minor_version = int(version_split[1])
|
||
|
else:
|
||
|
major_version = int(version)
|
||
|
minor_version = None
|
||
|
self.__products.append(
|
||
|
PortalProduct(
|
||
|
variant, name, major_version, minor_version, arch
|
||
|
)
|
||
|
)
|
||
|
except ValueError:
|
||
|
pass
|
||
|
|
||
|
return self.__products
|
||
|
|
||
|
def affects_rhel_version_arch(
|
||
|
self, major_version: int, minor_version: int | None, arch: Architecture
|
||
|
) -> bool:
|
||
|
"""
|
||
|
Returns whether this advisory affects the given RHEL version and architecture.
|
||
|
"""
|
||
|
for product in self.get_products():
|
||
|
if product.variant == "Red Hat Enterprise Linux" and product.major_version == major_version and product.minor_version == minor_version and product.arch == arch.value:
|
||
|
return True
|
||
|
|
||
|
return False
|
||
|
|
||
|
|
||
|
class API:
|
||
|
"""
|
||
|
The Red Hat Errata API.
|
||
|
"""
|
||
|
|
||
|
url = None
|
||
|
|
||
|
def __init__(self, url=DEFAULT_URL):
|
||
|
if not url:
|
||
|
url = DEFAULT_URL
|
||
|
self.url = url
|
||
|
|
||
|
async def search(
|
||
|
self,
|
||
|
kind: DocumentKind = DocumentKind.ERRATA,
|
||
|
sort_asc: bool = False,
|
||
|
rows: int = 10,
|
||
|
query: str = "*:*",
|
||
|
distro: str = "Red%5C+Hat%5C+Enterprise%5C+Linux%7C%2A%7C%2A%7C%2A",
|
||
|
detected_product: str = "rhel",
|
||
|
from_date: str = None
|
||
|
) -> list[Advisory]:
|
||
|
params = ""
|
||
|
|
||
|
# Set query
|
||
|
params += f"q={query}"
|
||
|
|
||
|
# Set rows
|
||
|
params += f"&rows={rows}"
|
||
|
|
||
|
# Set sorting
|
||
|
sorting = "portal_publication_date+asc" if sort_asc else "portal_publication_date+desc"
|
||
|
params += f"&sort={sorting}"
|
||
|
|
||
|
# Set start
|
||
|
params += "&start=0"
|
||
|
|
||
|
# Set distribution
|
||
|
params += f"&fq=portal_product_filter:{distro}"
|
||
|
|
||
|
# Set from-to
|
||
|
if from_date:
|
||
|
params += f"&fq=portal_publication_date%3A%5B{quote(from_date)}%20TO%20NOW%5D"
|
||
|
|
||
|
# Set document kind
|
||
|
params += f"&fq=documentKind:{kind.value}"
|
||
|
|
||
|
# Set detected product
|
||
|
if detected_product:
|
||
|
params += f"&fq=detectedProducts:{detected_product}"
|
||
|
|
||
|
async with aiohttp.ClientSession() as session:
|
||
|
async with session.get(
|
||
|
URL(f"{self.url}?{params}", encoded=True)
|
||
|
) as response:
|
||
|
body = await response.json()
|
||
|
if response.status != 200:
|
||
|
raise Exception((await response.text()))
|
||
|
elif body.get("response", {}).get("numFound", 0) == 0:
|
||
|
return []
|
||
|
return Advisory.from_list(list(body["response"]["docs"]))
|