forked from mirrors/forgejo
fix: make package cleanup work again (#12446)
- Regression of forgejo/forgejo!11776 (and forgejo/forgejo!11881) - Scope of the transaction is moved to a per-package cleanup rule basis. This is also a enhancement for scaling (already deployed on Codeberg for a while). - Package cleanup is now run with `RetryTx`, because rebuilding repository files runs `RetryTx` and it could indicate to retry the whole transaction. - Previously it would error and say running `RetryTx` in a transaction was not possible, this is now possible. Nested `RetryTx` is always allowed, matching of which errors to retry is still the responsible of the inner `RetryTx`. Co-authored-by: Mathieu Fenniak <mathieu@fenniak.net> Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/12446 Reviewed-by: Mathieu Fenniak <mfenniak@noreply.codeberg.org>
This commit is contained in:
parent
69cf1f3333
commit
bf958fa355
4 changed files with 171 additions and 75 deletions
|
|
@ -510,31 +510,57 @@ type RetryConfig struct {
|
||||||
AttemptCount int
|
AttemptCount int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var ErrNestedRetryTxFailure = errors.New("(nested)")
|
||||||
|
|
||||||
|
type nestedRetryTxState int
|
||||||
|
|
||||||
|
var nestedRetryTx nestedRetryTxState
|
||||||
|
|
||||||
// Execute the given function in a transaction. RetryConfig will retry the function on an error, if it matches the
|
// Execute the given function in a transaction. RetryConfig will retry the function on an error, if it matches the
|
||||||
// ErrorIs parameter, up to the total of AttemptCount number of tries. RetryTx cannot be invoked when already within a
|
// ErrorIs parameter, up to the total of AttemptCount number of tries. RetryTx cannot be invoked when already within a
|
||||||
// transaction and will return an error immediately.
|
// transaction and will return an error immediately.
|
||||||
|
//
|
||||||
|
// ErrNestedRetryTxFailure is an error type that will occur when RetryTx is nested within each other, and indicates that
|
||||||
|
// an inner RetryTx encountered an error that matched its error list.
|
||||||
func RetryTx(ctx context.Context, config RetryConfig, f func(ctx context.Context) error) error {
|
func RetryTx(ctx context.Context, config RetryConfig, f func(ctx context.Context) error) error {
|
||||||
if InTransaction(ctx) {
|
matchError := func(err error) bool {
|
||||||
|
for _, possibleError := range config.ErrorIs {
|
||||||
|
if errors.Is(err, possibleError) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Accept `ErrNestedRetryTxFailure` as error to retry on, means that a nested
|
||||||
|
// RetryTx indicated to retry the whole transaction.
|
||||||
|
config.ErrorIs = append(config.ErrorIs, ErrNestedRetryTxFailure)
|
||||||
|
|
||||||
|
withinRetryTx, present := ctx.Value(nestedRetryTx).(bool)
|
||||||
|
if present && withinRetryTx {
|
||||||
|
// If a caller already started `RetryTx`, then we assume we don't have to actually perform retries here -- we
|
||||||
|
// can attempt the requested function once, and if an error is returned that matches the configured error list,
|
||||||
|
// we'll return that error + ErrNestedRetryTxFailure wrapping.
|
||||||
|
err := f(ctx)
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
|
} else if matchError(err) {
|
||||||
|
return fmt.Errorf("nested RetryTx; internal Tx failed with error that won't be retried: %w %w", err, ErrNestedRetryTxFailure)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
} else if InTransaction(ctx) {
|
||||||
return errors.New("unsupported operation: attempted to use RetryTx while already within a transaction")
|
return errors.New("unsupported operation: attempted to use RetryTx while already within a transaction")
|
||||||
} else if config.AttemptCount == 0 {
|
} else if config.AttemptCount == 0 {
|
||||||
return errors.New("unsupported operation: attempted to use RetryTx with 0 attempts")
|
return errors.New("unsupported operation: attempted to use RetryTx with 0 attempts")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
innerCtx := context.WithValue(ctx, nestedRetryTx, true)
|
||||||
var lastError error
|
var lastError error
|
||||||
for range config.AttemptCount {
|
for range config.AttemptCount {
|
||||||
err := WithTx(ctx, f)
|
err := WithTx(innerCtx, f)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
} else if !matchError(err) {
|
||||||
|
|
||||||
foundMatch := false
|
|
||||||
for _, possibleError := range config.ErrorIs {
|
|
||||||
if errors.Is(err, possibleError) {
|
|
||||||
foundMatch = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !foundMatch {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -275,4 +275,48 @@ func TestRetryTx(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.Equal(t, 2, attemptCount)
|
assert.Equal(t, 2, attemptCount)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("nested", func(t *testing.T) {
|
||||||
|
attemptCount := 0
|
||||||
|
testError := errors.New("hello")
|
||||||
|
err := db.RetryTx(t.Context(), db.RetryConfig{
|
||||||
|
AttemptCount: 2,
|
||||||
|
}, func(ctx context.Context) error {
|
||||||
|
attemptCount++
|
||||||
|
return db.RetryTx(ctx, db.RetryConfig{
|
||||||
|
AttemptCount: 2,
|
||||||
|
ErrorIs: []error{testError},
|
||||||
|
}, func(ctx context.Context) error {
|
||||||
|
if attemptCount == 2 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return testError
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, 2, attemptCount)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("inner RetryTx decides on error", func(t *testing.T) {
|
||||||
|
attemptCount := 0
|
||||||
|
testError := errors.New("hello")
|
||||||
|
err := db.RetryTx(t.Context(), db.RetryConfig{
|
||||||
|
AttemptCount: 2,
|
||||||
|
ErrorIs: []error{},
|
||||||
|
}, func(ctx context.Context) error {
|
||||||
|
attemptCount++
|
||||||
|
return db.RetryTx(ctx, db.RetryConfig{
|
||||||
|
AttemptCount: 2,
|
||||||
|
}, func(ctx context.Context) error {
|
||||||
|
if attemptCount == 2 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return testError
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
require.ErrorIs(t, err, testError)
|
||||||
|
assert.Equal(t, 1, attemptCount)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -33,78 +33,68 @@ func CleanupTask(ctx context.Context, olderThan time.Duration) error {
|
||||||
return CleanupExpiredData(ctx, olderThan)
|
return CleanupExpiredData(ctx, olderThan)
|
||||||
}
|
}
|
||||||
|
|
||||||
func ExecuteCleanupRules(outerCtx context.Context) error {
|
func ExecuteCleanupRules(ctx context.Context) error {
|
||||||
ctx, committer, err := db.TxContext(outerCtx)
|
return packages_model.IterateEnabledCleanupRules(ctx, func(ctx context.Context, pcr *packages_model.PackageCleanupRule) error {
|
||||||
if err != nil {
|
// We have no errors to retry on, because we have no evidence we need any in
|
||||||
return err
|
// this area. What we do retry on is when a nested `db.RetryTx` indicates to
|
||||||
}
|
// retry the whole transaction.
|
||||||
defer committer.Close()
|
return db.RetryTx(ctx, db.RetryConfig{
|
||||||
|
AttemptCount: 3,
|
||||||
err = packages_model.IterateEnabledCleanupRules(ctx, func(ctx context.Context, pcr *packages_model.PackageCleanupRule) error {
|
}, func(ctx context.Context) error {
|
||||||
select {
|
versionsToRemove, err := GetCleanupTargets(ctx, pcr, true)
|
||||||
case <-outerCtx.Done():
|
if err != nil {
|
||||||
return db.ErrCancelledf("While processing package cleanup rules")
|
return fmt.Errorf("CleanupRule [%d]: GetCleanupTargets failed: %w", pcr.ID, err)
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
versionsToRemove, err := GetCleanupTargets(ctx, pcr, true)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("CleanupRule [%d]: GetCleanupTargets failed: %w", pcr.ID, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
anyVersionDeleted := false
|
|
||||||
packageWithVersionDeleted := make(map[int64]bool) // set of Package.ID's where at least one package version was removed
|
|
||||||
for _, ct := range versionsToRemove {
|
|
||||||
if err := packages_service.DeletePackageVersionAndReferences(ctx, ct.PackageVersion); err != nil {
|
|
||||||
return fmt.Errorf("CleanupRule [%d]: DeletePackageVersionAndReferences failed: %w", pcr.ID, err)
|
|
||||||
}
|
}
|
||||||
packageWithVersionDeleted[ct.Package.ID] = true
|
|
||||||
anyVersionDeleted = true
|
|
||||||
}
|
|
||||||
|
|
||||||
if pcr.Type == packages_model.TypeCargo {
|
anyVersionDeleted := false
|
||||||
for packageID := range packageWithVersionDeleted {
|
packageWithVersionDeleted := make(map[int64]bool) // set of Package.ID's where at least one package version was removed
|
||||||
owner, err := user_model.GetUserByID(ctx, pcr.OwnerID)
|
for _, ct := range versionsToRemove {
|
||||||
if err != nil {
|
if err := packages_service.DeletePackageVersionAndReferences(ctx, ct.PackageVersion); err != nil {
|
||||||
return fmt.Errorf("GetUserByID failed: %w", err)
|
return fmt.Errorf("CleanupRule [%d]: DeletePackageVersionAndReferences failed: %w", pcr.ID, err)
|
||||||
}
|
}
|
||||||
if err := cargo_service.UpdatePackageIndexIfExists(ctx, owner, owner, packageID); err != nil {
|
packageWithVersionDeleted[ct.Package.ID] = true
|
||||||
return fmt.Errorf("CleanupRule [%d]: cargo.UpdatePackageIndexIfExists failed: %w", pcr.ID, err)
|
anyVersionDeleted = true
|
||||||
|
}
|
||||||
|
|
||||||
|
if pcr.Type == packages_model.TypeCargo {
|
||||||
|
for packageID := range packageWithVersionDeleted {
|
||||||
|
owner, err := user_model.GetUserByID(ctx, pcr.OwnerID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("GetUserByID failed: %w", err)
|
||||||
|
}
|
||||||
|
if err := cargo_service.UpdatePackageIndexIfExists(ctx, owner, owner, packageID); err != nil {
|
||||||
|
return fmt.Errorf("CleanupRule [%d]: cargo.UpdatePackageIndexIfExists failed: %w", pcr.ID, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if anyVersionDeleted {
|
if anyVersionDeleted {
|
||||||
switch pcr.Type {
|
switch pcr.Type {
|
||||||
case packages_model.TypeDebian:
|
case packages_model.TypeDebian:
|
||||||
if err := debian_service.BuildAllRepositoryFiles(ctx, pcr.OwnerID); err != nil {
|
if err := debian_service.BuildAllRepositoryFiles(ctx, pcr.OwnerID); err != nil {
|
||||||
return fmt.Errorf("CleanupRule [%d]: debian.BuildAllRepositoryFiles failed: %w", pcr.ID, err)
|
return fmt.Errorf("CleanupRule [%d]: debian.BuildAllRepositoryFiles failed: %w", pcr.ID, err)
|
||||||
}
|
}
|
||||||
case packages_model.TypeAlpine:
|
case packages_model.TypeAlpine:
|
||||||
if err := alpine_service.BuildAllRepositoryFiles(ctx, pcr.OwnerID); err != nil {
|
if err := alpine_service.BuildAllRepositoryFiles(ctx, pcr.OwnerID); err != nil {
|
||||||
return fmt.Errorf("CleanupRule [%d]: alpine.BuildAllRepositoryFiles failed: %w", pcr.ID, err)
|
return fmt.Errorf("CleanupRule [%d]: alpine.BuildAllRepositoryFiles failed: %w", pcr.ID, err)
|
||||||
}
|
}
|
||||||
case packages_model.TypeRpm:
|
case packages_model.TypeRpm:
|
||||||
if err := rpm_service.BuildAllRepositoryFiles(ctx, pcr.OwnerID); err != nil {
|
if err := rpm_service.BuildAllRepositoryFiles(ctx, pcr.OwnerID); err != nil {
|
||||||
return fmt.Errorf("CleanupRule [%d]: rpm.BuildAllRepositoryFiles failed: %w", pcr.ID, err)
|
return fmt.Errorf("CleanupRule [%d]: rpm.BuildAllRepositoryFiles failed: %w", pcr.ID, err)
|
||||||
}
|
}
|
||||||
case packages_model.TypeArch:
|
case packages_model.TypeArch:
|
||||||
if err := arch_service.BuildAllRepositoryFiles(ctx, pcr.OwnerID); err != nil {
|
if err := arch_service.BuildAllRepositoryFiles(ctx, pcr.OwnerID); err != nil {
|
||||||
return fmt.Errorf("CleanupRule [%d]: arch.BuildAllRepositoryFiles failed: %w", pcr.ID, err)
|
return fmt.Errorf("CleanupRule [%d]: arch.BuildAllRepositoryFiles failed: %w", pcr.ID, err)
|
||||||
}
|
}
|
||||||
case packages_model.TypeAlt:
|
case packages_model.TypeAlt:
|
||||||
if err := alt_service.BuildAllRepositoryFiles(ctx, pcr.OwnerID); err != nil {
|
if err := alt_service.BuildAllRepositoryFiles(ctx, pcr.OwnerID); err != nil {
|
||||||
return fmt.Errorf("CleanupRule [%d]: alt.BuildAllRepositoryFiles failed: %w", pcr.ID, err)
|
return fmt.Errorf("CleanupRule [%d]: alt.BuildAllRepositoryFiles failed: %w", pcr.ID, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
return nil
|
||||||
return nil
|
})
|
||||||
})
|
})
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return committer.Commit()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type CleanupTarget struct {
|
type CleanupTarget struct {
|
||||||
|
|
|
||||||
|
|
@ -527,6 +527,42 @@ func TestPackageCleanup(t *testing.T) {
|
||||||
|
|
||||||
duration, _ := time.ParseDuration("-1h")
|
duration, _ := time.ParseDuration("-1h")
|
||||||
|
|
||||||
|
t.Run("Debian", func(t *testing.T) {
|
||||||
|
defer tests.PrintCurrentTest(t)()
|
||||||
|
// Debian does a repository rebuild.
|
||||||
|
|
||||||
|
distribution := "forgejo"
|
||||||
|
component := "main"
|
||||||
|
architecture := "amd64"
|
||||||
|
packageName := "runner"
|
||||||
|
packageDescription := "Forgejo Runner"
|
||||||
|
|
||||||
|
rootURL := fmt.Sprintf("/api/packages/%s/debian", user.Name)
|
||||||
|
uploadURL := fmt.Sprintf("%s/pool/%s/%s/upload", rootURL, distribution, component)
|
||||||
|
|
||||||
|
req := NewRequestWithBody(t, "PUT", uploadURL,
|
||||||
|
createDebianArchive(packageName, "1.0.0", architecture, packageDescription)).
|
||||||
|
AddBasicAuth(user.Name)
|
||||||
|
MakeRequest(t, req, http.StatusCreated)
|
||||||
|
|
||||||
|
resp := MakeRequest(t, NewRequestf(t, "GET", "%s/dists/%s/%s/binary-%s/Packages", rootURL, distribution, component, architecture), http.StatusOK)
|
||||||
|
assert.Contains(t, resp.Body.String(), "pool/forgejo/main/runner_1.0.0_amd64.deb")
|
||||||
|
|
||||||
|
pcr, err := packages_model.InsertCleanupRule(t.Context(), &packages_model.PackageCleanupRule{
|
||||||
|
Enabled: true,
|
||||||
|
RemovePattern: `.+`,
|
||||||
|
OwnerID: user.ID,
|
||||||
|
Type: packages_model.TypeDebian,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.NoError(t, packages_cleanup_service.CleanupTask(t.Context(), duration))
|
||||||
|
|
||||||
|
MakeRequest(t, NewRequestf(t, "GET", "%s/dists/%s/%s/binary-%s/Packages", rootURL, distribution, component, architecture), http.StatusNotFound)
|
||||||
|
|
||||||
|
require.NoError(t, packages_model.DeleteCleanupRuleByID(t.Context(), pcr.ID))
|
||||||
|
})
|
||||||
|
|
||||||
t.Run("Common", func(t *testing.T) {
|
t.Run("Common", func(t *testing.T) {
|
||||||
defer tests.PrintCurrentTest(t)()
|
defer tests.PrintCurrentTest(t)()
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue