From e23a83e859f32cf78e97b1b7b1239675603a4aec Mon Sep 17 00:00:00 2001 From: Mustafa Gezen Date: Sun, 6 Nov 2022 04:39:28 +0100 Subject: [PATCH] Fix Apollo skipping errata if failure happens after updating last sync When polling RH Errata, Apollo sets the last sync date to the first advisory we process (advisories are synced in descending order). If a failure happens while processing newer advisories, the sync is already set and the system skips over advisories it has NOT yet processed. This PR sets the mirror state in a parent transaction, then commits that transaction if no failures occur. Skipping advisories when an error occurs is also reworked, where the first failure fails the workflow. The workflow will retry on its normal schedule. --- apollo/workflow/poll_mirror_errata.go | 45 ++++++++++++++++++--------- 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/apollo/workflow/poll_mirror_errata.go b/apollo/workflow/poll_mirror_errata.go index 5a6ee61..f0fe044 100644 --- a/apollo/workflow/poll_mirror_errata.go +++ b/apollo/workflow/poll_mirror_errata.go @@ -79,12 +79,24 @@ func (c *Controller) processErrataShortCodeProduct(shortCode *apollodb.ShortCode var newLastSync *time.Time + parentBeginTx, err := c.db.Begin() + if err != nil { + return fmt.Errorf("could not begin transaction: %v", err) + } + parentTx := c.db.UseTransaction(parentBeginTx) + rollbackParent := true + defer func() { + if rollbackParent { + _ = parentBeginTx.Rollback() + } + }() + for _, advisory := range advisories { if newLastSync == nil { parsedTime, err := time.Parse(time.RFC3339, advisory.PublicationDate) if err == nil { newLastSync = &parsedTime - _ = c.db.UpdateMirrorStateErrata(shortCode.Code, newLastSync) + _ = parentTx.UpdateMirrorStateErrata(shortCode.Code, newLastSync) } } @@ -124,6 +136,7 @@ func (c *Controller) processErrataShortCodeProduct(shortCode *apollodb.ShortCode } if err != sql.ErrNoRows { logrus.Errorf("an unknown error occurred: %v", err) + _ = beginTx.Rollback() return fmt.Errorf("an unknown error occurred") } @@ -148,7 +161,6 @@ func (c *Controller) processErrataShortCodeProduct(shortCode *apollodb.ShortCode logrus.Infof("Added %s to %s (%s)", cve, shortCode.Code, advisory.Name) } } else if strings.HasPrefix(advisory.Name, "RHBA") || strings.HasPrefix(advisory.Name, "RHEA") { - doRollback := false _, err := tx.GetAffectedProductByAdvisory(advisory.Name) if err != nil { if err == sql.ErrNoRows { @@ -158,6 +170,7 @@ func (c *Controller) processErrataShortCodeProduct(shortCode *apollodb.ShortCode } if err != sql.ErrNoRows { logrus.Errorf("an unknown error occurred: %v", err) + _ = beginTx.Rollback() return fmt.Errorf("an unknown error occurred") } @@ -165,9 +178,8 @@ func (c *Controller) processErrataShortCodeProduct(shortCode *apollodb.ShortCode resourceUrl := fmt.Sprintf("https://access.redhat.com/errata/%s", advisory.Name) _, err = tx.CreateCVE(advisory.Name, product.ShortCode, &sourceBy, &resourceUrl, types.NullJSONText{}) if err != nil { - logrus.Errorf("Could not create cve: %v", err) _ = beginTx.Rollback() - continue + return fmt.Errorf("could not create cve: %v", err) } for _, srpm := range advisory.AffectedPackages { @@ -196,35 +208,38 @@ func (c *Controller) processErrataShortCodeProduct(shortCode *apollodb.ShortCode } _, err := tx.CreateAffectedProduct(product.ID, advisory.Name, int(apollopb.AffectedProduct_STATE_FIXED_UPSTREAM), product.CurrentFullVersion, pkg, &advisory.Name) if err != nil { - logrus.Errorf("Could not create affected product for srpm: %v", err) - doRollback = true - break + _ = beginTx.Rollback() + return fmt.Errorf("could not create affected product for srpm: %v", err) } } - if doRollback { - _ = beginTx.Rollback() - continue - } logrus.Infof("Added %s to %s", advisory.Name, shortCode.Code) } else { - logrus.Errorf("Could not get affected product by advisory: %v", err) - continue + _ = beginTx.Rollback() + return fmt.Errorf("Could not get affected product by advisory: %v", err) } } } } else { + _ = beginTx.Rollback() logrus.Errorf("Could not fetch advisory: %v", err) - continue + return err } } err = beginTx.Commit() if err != nil { logrus.Errorf("Could not commit new advisory tx: %v", err) - continue + return err } } + rollbackParent = false + err = parentBeginTx.Commit() + if err != nil { + logrus.Errorf("Could not commit parent tx: %v", err) + return err + } + return nil }