API Compat should support compat pagination

This commit is contained in:
Mustafa Gezen 2023-02-02 16:55:02 +01:00
parent 04eabd2b54
commit e7af833e6f
Signed by untrusted user who does not match committer: mustafa
GPG Key ID: DCDF010D946438C1

View File

@ -3,16 +3,20 @@ This module implements the compatibility API for Apollo V2 advisories
""" """
import datetime import datetime
from typing import TypeVar, Generic, Optional from typing import TypeVar, Generic, Optional, Any, Sequence
from tortoise import connections from tortoise import connections
from fastapi import APIRouter, Depends, Query, Response from fastapi import APIRouter, Depends, Query, Response
from fastapi.exceptions import HTTPException from fastapi.exceptions import HTTPException
from fastapi_pagination.links import Page from fastapi_pagination import pagination_ctx
from fastapi_pagination import Params from fastapi_pagination.bases import BasePage
from fastapi_pagination.default import Page
from fastapi_pagination.types import GreaterEqualOne, GreaterEqualZero
from fastapi_pagination.ext.tortoise import create_page from fastapi_pagination.ext.tortoise import create_page
from pydantic import BaseModel
from rssgen.feed import RssGenerator from rssgen.feed import RssGenerator
from apollo.db import Advisory, RedHatIndexState from apollo.db import Advisory, RedHatIndexState
@ -26,21 +30,51 @@ router = APIRouter(tags=["v2_compat"])
T = TypeVar("T") T = TypeVar("T")
class Pagination(Page[T], Generic[T]): class CompatParams(BaseModel):
page: int = Query(0, ge=0, description="Page number")
limit: int = Query(20, ge=1, le=100, description="Page size")
def get_offset(self) -> int:
print(self.limit * self.page)
return self.limit * self.page
def get_size(self) -> int:
return self.limit
class Pagination(BasePage[T], Generic[T]):
lastUpdated: Optional[str] # noqa # pylint: disable=invalid-name lastUpdated: Optional[str] # noqa # pylint: disable=invalid-name
page: GreaterEqualZero
size: GreaterEqualOne
__params_type__ = CompatParams
@classmethod
def create(
cls,
items: Sequence[T],
params: CompatParams,
*,
total: Optional[int] = None,
**kwargs: Any,
) -> Page[T]:
if not isinstance(params, CompatParams):
raise ValueError("Pagination should be used with CompatParams")
return cls(
total=total,
items=items,
page=params.page,
size=params.limit,
**kwargs,
)
class Config: class Config:
allow_population_by_field_name = True allow_population_by_field_name = True
fields = {"items": {"alias": "advisories"}} fields = {"items": {"alias": "advisories"}}
class CompatParams(Params):
limit: int = Query(50, ge=1, le=100, description="Page size")
def get_size(self) -> int:
return self.limit if self.limit else self.size
def v3_advisory_to_v2( def v3_advisory_to_v2(
advisory: Advisory, advisory: Advisory,
include_rpms=True, include_rpms=True,
@ -93,9 +127,11 @@ def v3_advisory_to_v2(
rpms[name] = [] rpms[name] = []
rpms[name].append(pkg.nevra) rpms[name].append(pkg.nevra)
published_at = advisory.published_at.isoformat("T"
).replace("+00:00", "") + "Z"
return Advisory_Pydantic_V2( return Advisory_Pydantic_V2(
id=advisory.id, id=advisory.id,
publishedAt=advisory.published_at, publishedAt=published_at,
name=advisory.name, name=advisory.name,
synopsis=advisory.synopsis, synopsis=advisory.synopsis,
description=advisory.description, description=advisory.description,
@ -190,7 +226,7 @@ async def fetch_advisories_compat(
a, [ a, [
keyword, keyword,
params.get_size(), params.get_size(),
params.get_size() * (params.page - 1), params.get_offset(),
product, product,
before, before,
after, after,
@ -206,12 +242,19 @@ async def fetch_advisories_compat(
if results[1]: if results[1]:
count = results[1][0]["total"] count = results[1][0]["total"]
return (count, results[1]) advisories = [Advisory(**x) for x in results[1]]
return (
count,
advisories,
)
@router.get( @router.get(
"/", "/",
response_model=Pagination[Advisory_Pydantic_V2], response_model=Pagination[Advisory_Pydantic_V2],
dependencies=[
Depends(pagination_ctx(Pagination[Advisory_Pydantic_V2], CompatParams))
]
) )
async def list_advisories_compat_v2( async def list_advisories_compat_v2(
params: CompatParams = Depends(), params: CompatParams = Depends(),
@ -224,8 +267,6 @@ async def list_advisories_compat_v2(
severity: str = Query(default=None, alias="filters.severity"), severity: str = Query(default=None, alias="filters.severity"),
kind: str = Query(default=None, alias="filters.type"), kind: str = Query(default=None, alias="filters.type"),
): ):
params.page = params.page + 1
state = await RedHatIndexState.first() state = await RedHatIndexState.first()
fetch_adv = await fetch_advisories_compat( fetch_adv = await fetch_advisories_compat(
@ -243,8 +284,7 @@ async def list_advisories_compat_v2(
advisories = [] advisories = []
for adv in fetch_adv[1]: for adv in fetch_adv[1]:
advisory = Advisory(**adv) await adv.fetch_related(
await advisory.fetch_related(
"packages", "packages",
"cves", "cves",
"fixes", "fixes",
@ -253,7 +293,7 @@ async def list_advisories_compat_v2(
"packages__supported_product", "packages__supported_product",
"packages__supported_products_rh_mirror", "packages__supported_products_rh_mirror",
) )
advisories.append(advisory) advisories.append(adv)
v2_advisories: list[Advisory_Pydantic_V2] = [] v2_advisories: list[Advisory_Pydantic_V2] = []
for advisory in advisories: for advisory in advisories:
@ -261,7 +301,8 @@ async def list_advisories_compat_v2(
page = create_page(v2_advisories, count, params) page = create_page(v2_advisories, count, params)
page.lastUpdated = state.last_indexed_at.isoformat("T").replace( page.lastUpdated = state.last_indexed_at.isoformat("T").replace(
"+00:00", "" "+00:00",
"",
) + "Z" ) + "Z"
return page return page
@ -292,6 +333,7 @@ async def list_advisories_compat_v2_rss(
) )
count = fetch_adv[0] count = fetch_adv[0]
advisories = fetch_adv[1] advisories = fetch_adv[1]
advisories.reverse()
ui_url = await get_setting(UI_URL) ui_url = await get_setting(UI_URL)
company_name = await get_setting(COMPANY_NAME) company_name = await get_setting(COMPANY_NAME)
@ -308,11 +350,10 @@ async def list_advisories_compat_v2_rss(
fg.managingEditor(f"{managing_editor} ({company_name})") fg.managingEditor(f"{managing_editor} ({company_name})")
if count != 0: if count != 0:
fg.pubDate(advisories[0]["published_at"]) fg.pubDate(advisories[0].published_at)
fg.lastBuildDate(advisories[0]["published_at"]) fg.lastBuildDate(advisories[0].published_at)
for adv in advisories: for advisory in advisories:
advisory = Advisory(**adv)
fe = fg.add_entry() fe = fg.add_entry()
fe.title(f"{advisory.name}: {advisory.synopsis}") fe.title(f"{advisory.name}: {advisory.synopsis}")
fe.link(href=f"{ui_url}/{advisory.name}", rel="alternate") fe.link(href=f"{ui_url}/{advisory.name}", rel="alternate")