diff --git a/apollo/workflow/poll_mirror_errata.go b/apollo/workflow/poll_mirror_errata.go index 5a6ee61..f0fe044 100644 --- a/apollo/workflow/poll_mirror_errata.go +++ b/apollo/workflow/poll_mirror_errata.go @@ -79,12 +79,24 @@ func (c *Controller) processErrataShortCodeProduct(shortCode *apollodb.ShortCode var newLastSync *time.Time + parentBeginTx, err := c.db.Begin() + if err != nil { + return fmt.Errorf("could not begin transaction: %v", err) + } + parentTx := c.db.UseTransaction(parentBeginTx) + rollbackParent := true + defer func() { + if rollbackParent { + _ = parentBeginTx.Rollback() + } + }() + for _, advisory := range advisories { if newLastSync == nil { parsedTime, err := time.Parse(time.RFC3339, advisory.PublicationDate) if err == nil { newLastSync = &parsedTime - _ = c.db.UpdateMirrorStateErrata(shortCode.Code, newLastSync) + _ = parentTx.UpdateMirrorStateErrata(shortCode.Code, newLastSync) } } @@ -124,6 +136,7 @@ func (c *Controller) processErrataShortCodeProduct(shortCode *apollodb.ShortCode } if err != sql.ErrNoRows { logrus.Errorf("an unknown error occurred: %v", err) + _ = beginTx.Rollback() return fmt.Errorf("an unknown error occurred") } @@ -148,7 +161,6 @@ func (c *Controller) processErrataShortCodeProduct(shortCode *apollodb.ShortCode logrus.Infof("Added %s to %s (%s)", cve, shortCode.Code, advisory.Name) } } else if strings.HasPrefix(advisory.Name, "RHBA") || strings.HasPrefix(advisory.Name, "RHEA") { - doRollback := false _, err := tx.GetAffectedProductByAdvisory(advisory.Name) if err != nil { if err == sql.ErrNoRows { @@ -158,6 +170,7 @@ func (c *Controller) processErrataShortCodeProduct(shortCode *apollodb.ShortCode } if err != sql.ErrNoRows { logrus.Errorf("an unknown error occurred: %v", err) + _ = beginTx.Rollback() return fmt.Errorf("an unknown error occurred") } @@ -165,9 +178,8 @@ func (c *Controller) processErrataShortCodeProduct(shortCode *apollodb.ShortCode resourceUrl := fmt.Sprintf("https://access.redhat.com/errata/%s", advisory.Name) _, err = tx.CreateCVE(advisory.Name, product.ShortCode, &sourceBy, &resourceUrl, types.NullJSONText{}) if err != nil { - logrus.Errorf("Could not create cve: %v", err) _ = beginTx.Rollback() - continue + return fmt.Errorf("could not create cve: %v", err) } for _, srpm := range advisory.AffectedPackages { @@ -196,35 +208,38 @@ func (c *Controller) processErrataShortCodeProduct(shortCode *apollodb.ShortCode } _, err := tx.CreateAffectedProduct(product.ID, advisory.Name, int(apollopb.AffectedProduct_STATE_FIXED_UPSTREAM), product.CurrentFullVersion, pkg, &advisory.Name) if err != nil { - logrus.Errorf("Could not create affected product for srpm: %v", err) - doRollback = true - break + _ = beginTx.Rollback() + return fmt.Errorf("could not create affected product for srpm: %v", err) } } - if doRollback { - _ = beginTx.Rollback() - continue - } logrus.Infof("Added %s to %s", advisory.Name, shortCode.Code) } else { - logrus.Errorf("Could not get affected product by advisory: %v", err) - continue + _ = beginTx.Rollback() + return fmt.Errorf("Could not get affected product by advisory: %v", err) } } } } else { + _ = beginTx.Rollback() logrus.Errorf("Could not fetch advisory: %v", err) - continue + return err } } err = beginTx.Commit() if err != nil { logrus.Errorf("Could not commit new advisory tx: %v", err) - continue + return err } } + rollbackParent = false + err = parentBeginTx.Commit() + if err != nil { + logrus.Errorf("Could not commit parent tx: %v", err) + return err + } + return nil }