This commit is contained in:
Mustafa Gezen 2023-03-06 05:49:20 +01:00
parent 5b8935e387
commit 76dc39fc6b

View File

@ -44,14 +44,14 @@ class OSVEvent(BaseModel):
class OSVRangeDatabaseSpecific(BaseModel): class OSVRangeDatabaseSpecific(BaseModel):
pass yum_repository: str
class OSVRange(BaseModel): class OSVRange(BaseModel):
type: str type: str
repo: str repo: Optional[str]
events: list[OSVEvent] events: list[OSVEvent]
database_specific: OSVRangeDatabaseSpecific database_specific: Optional[OSVRangeDatabaseSpecific]
class OSVEcosystemSpecific(BaseModel): class OSVEcosystemSpecific(BaseModel):
@ -65,9 +65,9 @@ class OSVAffectedDatabaseSpecific(BaseModel):
class OSVAffected(BaseModel): class OSVAffected(BaseModel):
package: OSVPackage package: OSVPackage
ranges: list[OSVRange] ranges: list[OSVRange]
versions: list[str] versions: Optional[list[str]]
ecosystem_specific: OSVEcosystemSpecific ecosystem_specific: Optional[OSVEcosystemSpecific]
database_specific: OSVAffectedDatabaseSpecific database_specific: Optional[OSVAffectedDatabaseSpecific]
class OSVReference(BaseModel): class OSVReference(BaseModel):
@ -91,10 +91,10 @@ class OSVAdvisory(BaseModel):
published: str published: str
withdrawn: Optional[str] withdrawn: Optional[str]
aliases: list[str] aliases: list[str]
related: list[str] related: Optional[list[str]]
summary: str summary: str
details: str details: str
severity: list[OSVSeverity] severity: Optional[list[OSVSeverity]]
affected: list[OSVAffected] affected: list[OSVAffected]
references: list[OSVReference] references: list[OSVReference]
credits: list[OSVCredit] credits: list[OSVCredit]
@ -128,7 +128,7 @@ def to_osv_advisory(ui_url: str, advisory: Advisory) -> OSVAdvisory:
pkg_name_map[product_name][arch][name].append((pkg, nevra)) pkg_name_map[product_name][arch][name].append((pkg, nevra))
for product_name, arches in pkg_name_map.items(): for product_name, arches in pkg_name_map.items():
for arch, affected_arches in arches.items(): for _, affected_arches in arches.items():
if not affected_arches: if not affected_arches:
continue continue
@ -136,6 +136,9 @@ def to_osv_advisory(ui_url: str, advisory: Advisory) -> OSVAdvisory:
for pkg in affected_packages: for pkg in affected_packages:
x = pkg[0] x = pkg[0]
nevra = pkg[1] nevra = pkg[1]
# Only process "src" packages
if nevra.group(5) != "src":
continue
ver_rel = f"{nevra.group(3)}-{nevra.group(4)}" ver_rel = f"{nevra.group(3)}-{nevra.group(4)}"
slugified = slugify(x.supported_product.variant) slugified = slugify(x.supported_product.variant)
@ -148,7 +151,7 @@ def to_osv_advisory(ui_url: str, advisory: Advisory) -> OSVAdvisory:
) )
epoch = nevra.group(2) epoch = nevra.group(2)
purl = f"pkg:rpm/{slugified}/{pkg_name}@{ver_rel}?arch={arch}&distro={slugified_distro}&epoch={epoch}" purl = f"pkg:rpm/{slugified}/{pkg_name}@{ver_rel}?distro={slugified_distro}&epoch={epoch}"
affected = OSVAffected( affected = OSVAffected(
package=OSVPackage( package=OSVPackage(
@ -159,17 +162,18 @@ def to_osv_advisory(ui_url: str, advisory: Advisory) -> OSVAdvisory:
ranges=[ ranges=[
OSVRange( OSVRange(
type="ECOSYSTEM", type="ECOSYSTEM",
repo=x.repo_name,
events=[ events=[
OSVEvent(introduced="0"), OSVEvent(introduced="0"),
OSVEvent(fixed=ver_rel), OSVEvent(fixed=ver_rel),
], ],
database_specific=OSVRangeDatabaseSpecific(), database_specific=OSVRangeDatabaseSpecific(
yum_repository=x.repo_name,
),
) )
], ],
versions=[], versions=None,
ecosystem_specific=OSVEcosystemSpecific(), ecosystem_specific=None,
database_specific=OSVAffectedDatabaseSpecific(), database_specific=None,
) )
affected_pkgs.append(affected) affected_pkgs.append(affected)
@ -184,19 +188,32 @@ def to_osv_advisory(ui_url: str, advisory: Advisory) -> OSVAdvisory:
if advisory.red_hat_advisory: if advisory.red_hat_advisory:
osv_credits.append(OSVCredit(name="Red Hat")) osv_credits.append(OSVCredit(name="Red Hat"))
# Calculate severity by finding the highest CVSS score
highest_cvss_base_score = 0.0
final_score_vector = None
for x in advisory.cves:
# Convert cvss3_scoring_vector to a float
base_score = x.cvss3_base_score
if base_score and base_score != "UNKNOWN":
base_score = float(base_score)
if base_score > highest_cvss_base_score:
highest_cvss_base_score = base_score
final_score_vector = x.cvss3_scoring_vector
severity = None
if final_score_vector:
severity = [OSVSeverity(type="CVSS_V3", score=final_score_vector)]
return OSVAdvisory( return OSVAdvisory(
id=advisory.name, id=advisory.name,
modified=to_rfc3339_date(advisory.updated_at), modified=to_rfc3339_date(advisory.updated_at),
published=to_rfc3339_date(advisory.published_at), published=to_rfc3339_date(advisory.published_at),
withdrawn=None, withdrawn=None,
aliases=[x.cve for x in advisory.cves], aliases=[x.cve for x in advisory.cves],
related=[], related=None,
summary=advisory.synopsis, summary=advisory.synopsis,
details=advisory.description, details=advisory.description,
severity=[ severity=severity,
OSVSeverity(type="CVSS_V3", score=x.cvss3_scoring_vector)
for x in advisory.cves
],
affected=affected_pkgs, affected=affected_pkgs,
references=references, references=references,
credits=osv_credits, credits=osv_credits,
@ -204,7 +221,7 @@ def to_osv_advisory(ui_url: str, advisory: Advisory) -> OSVAdvisory:
) )
@router.get("/", response_model=Pagination[OSVAdvisory]) @router.get("/", response_model=Pagination[OSVAdvisory], response_model_exclude_none=True)
async def get_advisories_osv( async def get_advisories_osv(
params: Params = Depends(), params: Params = Depends(),
product: Optional[str] = None, product: Optional[str] = None,
@ -214,7 +231,6 @@ async def get_advisories_osv(
synopsis: Optional[str] = None, synopsis: Optional[str] = None,
keyword: Optional[str] = None, keyword: Optional[str] = None,
severity: Optional[str] = None, severity: Optional[str] = None,
kind: Optional[str] = None,
): ):
fetch_adv = await fetch_advisories( fetch_adv = await fetch_advisories(
params.get_size(), params.get_size(),
@ -226,7 +242,7 @@ async def get_advisories_osv(
cve, cve,
synopsis, synopsis,
severity, severity,
kind, kind="Security",
fetch_related=True, fetch_related=True,
) )
count = fetch_adv[0] count = fetch_adv[0]
@ -237,9 +253,9 @@ async def get_advisories_osv(
return create_page(osv_advisories, count, params) return create_page(osv_advisories, count, params)
@router.get("/{advisory_id}", response_model=OSVAdvisory) @router.get("/{advisory_id}", response_model=OSVAdvisory, response_model_exclude_none=True)
async def get_advisory_osv(advisory_id: str): async def get_advisory_osv(advisory_id: str):
advisory = await Advisory.filter(name=advisory_id).prefetch_related( advisory = await Advisory.filter(name=advisory_id, type="Security").prefetch_related(
"packages", "packages",
"cves", "cves",
"fixes", "fixes",