Fix Apollo skipping errata if failure happens after updating last sync

When polling RH Errata, Apollo sets the last sync date to the first advisory we process (advisories are synced in descending order). If a failure happens while processing newer advisories, the sync is already set and the system skips over advisories it has NOT yet processed.

This PR sets the mirror state in a parent transaction, then commits that transaction if no failures occur. Skipping advisories when an error occurs is also reworked, where the first failure fails the workflow. The workflow will retry on its normal schedule.
This commit is contained in:
Mustafa Gezen 2022-11-06 04:39:28 +01:00
parent 6e752a3704
commit e23a83e859
Signed by untrusted user who does not match committer: mustafa
GPG Key ID: DCDF010D946438C1

View File

@ -79,12 +79,24 @@ func (c *Controller) processErrataShortCodeProduct(shortCode *apollodb.ShortCode
var newLastSync *time.Time var newLastSync *time.Time
parentBeginTx, err := c.db.Begin()
if err != nil {
return fmt.Errorf("could not begin transaction: %v", err)
}
parentTx := c.db.UseTransaction(parentBeginTx)
rollbackParent := true
defer func() {
if rollbackParent {
_ = parentBeginTx.Rollback()
}
}()
for _, advisory := range advisories { for _, advisory := range advisories {
if newLastSync == nil { if newLastSync == nil {
parsedTime, err := time.Parse(time.RFC3339, advisory.PublicationDate) parsedTime, err := time.Parse(time.RFC3339, advisory.PublicationDate)
if err == nil { if err == nil {
newLastSync = &parsedTime newLastSync = &parsedTime
_ = c.db.UpdateMirrorStateErrata(shortCode.Code, newLastSync) _ = parentTx.UpdateMirrorStateErrata(shortCode.Code, newLastSync)
} }
} }
@ -124,6 +136,7 @@ func (c *Controller) processErrataShortCodeProduct(shortCode *apollodb.ShortCode
} }
if err != sql.ErrNoRows { if err != sql.ErrNoRows {
logrus.Errorf("an unknown error occurred: %v", err) logrus.Errorf("an unknown error occurred: %v", err)
_ = beginTx.Rollback()
return fmt.Errorf("an unknown error occurred") return fmt.Errorf("an unknown error occurred")
} }
@ -148,7 +161,6 @@ func (c *Controller) processErrataShortCodeProduct(shortCode *apollodb.ShortCode
logrus.Infof("Added %s to %s (%s)", cve, shortCode.Code, advisory.Name) logrus.Infof("Added %s to %s (%s)", cve, shortCode.Code, advisory.Name)
} }
} else if strings.HasPrefix(advisory.Name, "RHBA") || strings.HasPrefix(advisory.Name, "RHEA") { } else if strings.HasPrefix(advisory.Name, "RHBA") || strings.HasPrefix(advisory.Name, "RHEA") {
doRollback := false
_, err := tx.GetAffectedProductByAdvisory(advisory.Name) _, err := tx.GetAffectedProductByAdvisory(advisory.Name)
if err != nil { if err != nil {
if err == sql.ErrNoRows { if err == sql.ErrNoRows {
@ -158,6 +170,7 @@ func (c *Controller) processErrataShortCodeProduct(shortCode *apollodb.ShortCode
} }
if err != sql.ErrNoRows { if err != sql.ErrNoRows {
logrus.Errorf("an unknown error occurred: %v", err) logrus.Errorf("an unknown error occurred: %v", err)
_ = beginTx.Rollback()
return fmt.Errorf("an unknown error occurred") return fmt.Errorf("an unknown error occurred")
} }
@ -165,9 +178,8 @@ func (c *Controller) processErrataShortCodeProduct(shortCode *apollodb.ShortCode
resourceUrl := fmt.Sprintf("https://access.redhat.com/errata/%s", advisory.Name) resourceUrl := fmt.Sprintf("https://access.redhat.com/errata/%s", advisory.Name)
_, err = tx.CreateCVE(advisory.Name, product.ShortCode, &sourceBy, &resourceUrl, types.NullJSONText{}) _, err = tx.CreateCVE(advisory.Name, product.ShortCode, &sourceBy, &resourceUrl, types.NullJSONText{})
if err != nil { if err != nil {
logrus.Errorf("Could not create cve: %v", err)
_ = beginTx.Rollback() _ = beginTx.Rollback()
continue return fmt.Errorf("could not create cve: %v", err)
} }
for _, srpm := range advisory.AffectedPackages { for _, srpm := range advisory.AffectedPackages {
@ -196,35 +208,38 @@ func (c *Controller) processErrataShortCodeProduct(shortCode *apollodb.ShortCode
} }
_, err := tx.CreateAffectedProduct(product.ID, advisory.Name, int(apollopb.AffectedProduct_STATE_FIXED_UPSTREAM), product.CurrentFullVersion, pkg, &advisory.Name) _, err := tx.CreateAffectedProduct(product.ID, advisory.Name, int(apollopb.AffectedProduct_STATE_FIXED_UPSTREAM), product.CurrentFullVersion, pkg, &advisory.Name)
if err != nil { if err != nil {
logrus.Errorf("Could not create affected product for srpm: %v", err)
doRollback = true
break
}
}
if doRollback {
_ = beginTx.Rollback() _ = beginTx.Rollback()
continue return fmt.Errorf("could not create affected product for srpm: %v", err)
}
} }
logrus.Infof("Added %s to %s", advisory.Name, shortCode.Code) logrus.Infof("Added %s to %s", advisory.Name, shortCode.Code)
} else { } else {
logrus.Errorf("Could not get affected product by advisory: %v", err) _ = beginTx.Rollback()
continue return fmt.Errorf("Could not get affected product by advisory: %v", err)
} }
} }
} }
} else { } else {
_ = beginTx.Rollback()
logrus.Errorf("Could not fetch advisory: %v", err) logrus.Errorf("Could not fetch advisory: %v", err)
continue return err
} }
} }
err = beginTx.Commit() err = beginTx.Commit()
if err != nil { if err != nil {
logrus.Errorf("Could not commit new advisory tx: %v", err) logrus.Errorf("Could not commit new advisory tx: %v", err)
continue return err
} }
} }
rollbackParent = false
err = parentBeginTx.Commit()
if err != nil {
logrus.Errorf("Could not commit parent tx: %v", err)
return err
}
return nil return nil
} }