added new URLs for oval data. (#19)

This commit is contained in:
dferrisctrliq 2023-08-08 05:05:25 -07:00 committed by GitHub
parent 91bdc7ce41
commit 02e062f0d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -42,28 +42,30 @@ async def get_last_indexed_date() -> Optional[str]:
async def fetch_mapped_oval() -> dict[str, ET.ElementTree]: async def fetch_mapped_oval() -> dict[str, ET.ElementTree]:
# Download the oval_url using aiohttp, decompress using bzip and parse # Download the oval_url using aiohttp, decompress using bzip and parse
oval_url = ( oval_urls = (
"https://access.redhat.com/security/data/oval/com.redhat.rhsa-all.xml.bz2" 'https://access.redhat.com/security/data/oval/v2/RHEL8/rhel-8.oval.xml.bz2',
'https://access.redhat.com/security/data/oval/v2/RHEL9/rhel-9.oval.xml.bz2',
) )
async with aiohttp.ClientSession() as session: def_map = {}
async with session.get(oval_url) as response: for url in oval_urls:
if response.status == 200: async with aiohttp.ClientSession() as session:
data = await response.read() async with session.get(url) as response:
tree = ET.fromstring(bz2.decompress(data)) if response.status == 200:
data = await response.read()
tree = ET.fromstring(bz2.decompress(data))
# Index by advisory name # Index by advisory name
def_map = {} definitions = tree.findall("definitions/definition", OVAL_NS)
definitions = tree.findall("definitions/definition", OVAL_NS) for definition in definitions:
for definition in definitions: def_id = definition.attrib["id"]
def_id = definition.attrib["id"] id_split = def_id.split(":")
id_split = def_id.split(":") name = f"{id_split[1].split('.')[2].upper()}-{id_split[3][0:4]}:{id_split[3][4:]}"
name = f"{id_split[1].split('.')[2].upper()}-{id_split[3][0:4]}:{id_split[3][4:]}" def_map[name] = definition
def_map[name] = definition
return def_map else:
else: raise Exception("Failed to fetch OVAL data")
raise Exception("Failed to fetch OVAL data")
return def_map
@activity.defn @activity.defn
async def get_rh_advisories(from_timestamp: str = None) -> None: async def get_rh_advisories(from_timestamp: str = None) -> None: