mirror of
https://codeberg.org/forgejo/forgejo.git
synced 2026-06-22 10:02:15 +00:00
chore(services/actions/task.go): re-enable nilnil lint (#13089)
Related issue: https://codeberg.org/forgejo/forgejo/issues/11261 ### Tests for Go changes - I added test coverage for Go changes... - [ ] in their respective `*_test.go` for unit tests. - [ ] in the `tests/integration` directory if it involves interactions with a live Forgejo server. - I ran... - [x] `make pr-go` before pushing ### Documentation - [ ] I created a pull request [to the documentation](https://codeberg.org/forgejo/docs) to explain to Forgejo users how to use this change. - [x] I did not document these changes and I do not expect someone else to do it. ### Release notes - [ ] This change will be noticed by a Forgejo user or admin (feature, bug fix, performance, etc.). I suggest to include a release note for this change. - [x] This change is not visible to a Forgejo user or admin (refactor, dependency upgrade, etc.). I think there is no need to add a release note for this change. Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/13089 Reviewed-by: Mathieu Fenniak <mfenniak@noreply.codeberg.org>
This commit is contained in:
parent
5bd855709b
commit
481387a652
5 changed files with 140 additions and 74 deletions
|
|
@ -321,9 +321,6 @@ linters:
|
|||
- path: services/actions/context.go
|
||||
linters:
|
||||
- nilnil
|
||||
- path: services/actions/task.go
|
||||
linters:
|
||||
- nilnil
|
||||
- path: services/actions/trust.go
|
||||
linters:
|
||||
- nilnil
|
||||
|
|
|
|||
|
|
@ -166,27 +166,30 @@ func (s *Service) FetchTask(
|
|||
// if the task version in request is not equal to the version in db,
|
||||
// it means there may still be some tasks not be assigned.
|
||||
// try to pick a task for the runner that send the request.
|
||||
if t, ok, err := actions_service.PickTask(ctx, runner, requestKey, nil); err != nil {
|
||||
log.Error("pick task failed: %v", err)
|
||||
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("pick task: %w", err))
|
||||
} else if ok {
|
||||
if t, err := actions_service.PickTask(ctx, runner, requestKey, nil); err != nil {
|
||||
if !(actions_service.IsNoTaskAvailable(err)) {
|
||||
log.Error("pick task failed: %v", err)
|
||||
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("pick task: %w", err))
|
||||
}
|
||||
} else {
|
||||
task = t
|
||||
|
||||
taskCapacity := req.Msg.GetTaskCapacity()
|
||||
taskCapacity-- // remove 1 for the task already fetched as `task`
|
||||
for taskCapacity > 0 {
|
||||
if t, ok, err := actions_service.PickTask(ctx, runner, requestKey, nil); err != nil {
|
||||
// Don't return an error to the client/runner -- we've already assigned one-or-more tasks to the runner
|
||||
// and if we don't return them, they can't be picked up by another runner and will become zombie tasks.
|
||||
// Log the error and return the tasks we've assigned so far.
|
||||
log.Error("pick task failed: %v", err)
|
||||
break
|
||||
} else if ok {
|
||||
additionalTasks = append(additionalTasks, t)
|
||||
taskCapacity--
|
||||
} else {
|
||||
t, err := actions_service.PickTask(ctx, runner, requestKey, nil)
|
||||
if err != nil {
|
||||
if !(actions_service.IsNoTaskAvailable(err)) {
|
||||
// Don't return an error to the client/runner -- we've already assigned one-or-more tasks to the runner
|
||||
// and if we don't return them, they can't be picked up by another runner and will become zombie tasks.
|
||||
// Log the error and return the tasks we've assigned so far.
|
||||
log.Error("pick task failed: %v", err)
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
additionalTasks = append(additionalTasks, t)
|
||||
taskCapacity--
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -233,10 +236,12 @@ func (s *Service) FetchSingleTask(
|
|||
handle = req.Msg.Handle
|
||||
}
|
||||
|
||||
if t, ok, err := actions_service.PickTask(ctx, runner, requestKey, handle); err != nil {
|
||||
log.Error("pick task failed: %v", err)
|
||||
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("pick task: %w", err))
|
||||
} else if ok {
|
||||
if t, err := actions_service.PickTask(ctx, runner, requestKey, handle); err != nil {
|
||||
if !(actions_service.IsNoTaskAvailable(err)) {
|
||||
log.Error("pick task failed: %v", err)
|
||||
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("pick task: %w", err))
|
||||
}
|
||||
} else {
|
||||
task = t
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,7 +22,9 @@ import (
|
|||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
)
|
||||
|
||||
func PickTask(ctx context.Context, runner *actions_model.ActionRunner, requestKey, handle *string) (*runnerv1.Task, bool, error) {
|
||||
var ErrEphemeralRunnerHasAssignedTask = errors.New("ephemeral runner already has an assigned task")
|
||||
|
||||
func PickTask(ctx context.Context, runner *actions_model.ActionRunner, requestKey, handle *string) (*runnerv1.Task, error) {
|
||||
var (
|
||||
task *runnerv1.Task
|
||||
job *actions_model.ActionRunJob
|
||||
|
|
@ -32,23 +34,18 @@ func PickTask(ctx context.Context, runner *actions_model.ActionRunner, requestKe
|
|||
hasRunnerAssignedTask, err := actions_model.HasTaskForRunner(ctx, runner.ID)
|
||||
// Let the runner retry the request, do not allow to proceed
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// if runner has task, dont assign new task
|
||||
if hasRunnerAssignedTask {
|
||||
return nil, false, nil
|
||||
return nil, ErrEphemeralRunnerHasAssignedTask
|
||||
}
|
||||
}
|
||||
|
||||
if err := db.WithTx(ctx, func(ctx context.Context) error {
|
||||
t, err := actions_model.CreateTaskForRunner(ctx, runner, requestKey, handle)
|
||||
if err != nil {
|
||||
if errors.Is(err, actions_model.ErrNoMatchingJobFound) ||
|
||||
errors.Is(err, actions_model.ErrNoJobUpdated) {
|
||||
return nil
|
||||
}
|
||||
|
||||
return fmt.Errorf("CreateTaskForRunner: %w", err)
|
||||
}
|
||||
|
||||
|
|
@ -93,16 +90,18 @@ func PickTask(ctx context.Context, runner *actions_model.ActionRunner, requestKe
|
|||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
if task == nil {
|
||||
return nil, false, nil
|
||||
return nil, err
|
||||
}
|
||||
|
||||
CreateCommitStatus(ctx, job)
|
||||
|
||||
return task, true, nil
|
||||
return task, nil
|
||||
}
|
||||
|
||||
func IsNoTaskAvailable(err error) bool {
|
||||
return errors.Is(err, ErrEphemeralRunnerHasAssignedTask) ||
|
||||
errors.Is(err, actions_model.ErrNoMatchingJobFound) ||
|
||||
errors.Is(err, actions_model.ErrNoJobUpdated)
|
||||
}
|
||||
|
||||
func RecoverTasks(ctx context.Context, tasks []*actions_model.ActionTask) ([]*runnerv1.Task, error) {
|
||||
|
|
|
|||
|
|
@ -5,22 +5,51 @@ package integration
|
|||
|
||||
import (
|
||||
"net/url"
|
||||
"strings"
|
||||
"testing"
|
||||
"testing/fstest"
|
||||
|
||||
actions_model "forgejo.org/models/actions"
|
||||
repo_model "forgejo.org/models/repo"
|
||||
unit_model "forgejo.org/models/unit"
|
||||
"forgejo.org/models/unittest"
|
||||
user_model "forgejo.org/models/user"
|
||||
"forgejo.org/modules/setting"
|
||||
"forgejo.org/modules/util"
|
||||
files_service "forgejo.org/services/repository/files"
|
||||
"forgejo.org/tests"
|
||||
"forgejo.org/tests/forgery"
|
||||
|
||||
"code.forgejo.org/xorm/xorm/convert"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func createFetchTaskTestRepository(
|
||||
t *testing.T,
|
||||
owner *user_model.User,
|
||||
workflowFileName,
|
||||
workflowFileContent string,
|
||||
) *repo_model.Repository {
|
||||
t.Helper()
|
||||
|
||||
fileSystem := forgery.MapFS{
|
||||
".forgejo/workflows/" + workflowFileName: &fstest.MapFile{
|
||||
Data: []byte(workflowFileContent),
|
||||
},
|
||||
}
|
||||
|
||||
opts := &forgery.CreateRepositoryOptions{
|
||||
LatestSha: new(string),
|
||||
Name: "repo-many-tasks",
|
||||
Files: fileSystem,
|
||||
}
|
||||
|
||||
repo := forgery.CreateRepository(t, owner, opts)
|
||||
|
||||
var unitConfig convert.Conversion
|
||||
forgery.EnableRepoUnit(t, repo, unit_model.TypeActions, unitConfig)
|
||||
|
||||
return repo
|
||||
}
|
||||
|
||||
func TestActionFetchTask_TaskCapacity(t *testing.T) {
|
||||
if !setting.Database.Type.IsSQLite3() {
|
||||
// mock repo runner only supported on SQLite testing
|
||||
|
|
@ -31,13 +60,7 @@ func TestActionFetchTask_TaskCapacity(t *testing.T) {
|
|||
user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})
|
||||
|
||||
// create the repo
|
||||
repo, _, f := tests.CreateDeclarativeRepo(t, user2, "repo-many-tasks",
|
||||
[]unit_model.Type{unit_model.TypeActions}, nil,
|
||||
[]*files_service.ChangeRepoFile{
|
||||
{
|
||||
Operation: "create",
|
||||
TreePath: ".forgejo/workflows/matrix.yml",
|
||||
ContentReader: strings.NewReader(`
|
||||
repo := createFetchTaskTestRepository(t, user2, "matrix.yml", `
|
||||
on:
|
||||
push:
|
||||
jobs:
|
||||
|
|
@ -52,11 +75,7 @@ jobs:
|
|||
steps:
|
||||
- run: echo ${{ matrix.d1 }} ${{ matrix.d2 }} ${{ matrix.d3 }}
|
||||
- run: sleep 2
|
||||
`),
|
||||
},
|
||||
},
|
||||
)
|
||||
defer f()
|
||||
`)
|
||||
|
||||
runner := newMockRunner()
|
||||
runner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner", []string{"ubuntu-latest"})
|
||||
|
|
@ -100,13 +119,7 @@ func TestActionFetchTask_Idempotent(t *testing.T) {
|
|||
user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})
|
||||
|
||||
// create the repo
|
||||
repo, _, f := tests.CreateDeclarativeRepo(t, user2, "repo-many-tasks",
|
||||
[]unit_model.Type{unit_model.TypeActions}, nil,
|
||||
[]*files_service.ChangeRepoFile{
|
||||
{
|
||||
Operation: "create",
|
||||
TreePath: ".forgejo/workflows/matrix.yml",
|
||||
ContentReader: strings.NewReader(`
|
||||
repo := createFetchTaskTestRepository(t, user2, "matrix.yml", `
|
||||
on:
|
||||
push:
|
||||
jobs:
|
||||
|
|
@ -117,11 +130,7 @@ jobs:
|
|||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- run: sleep 2
|
||||
`),
|
||||
},
|
||||
},
|
||||
)
|
||||
defer f()
|
||||
`)
|
||||
|
||||
runner := newMockRunner()
|
||||
runner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner", []string{"ubuntu-latest"})
|
||||
|
|
@ -202,13 +211,7 @@ func TestActionFetchTask_RequestedJob(t *testing.T) {
|
|||
user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})
|
||||
|
||||
// create the repo
|
||||
repo, _, f := tests.CreateDeclarativeRepo(t, user2, "repo-many-tasks",
|
||||
[]unit_model.Type{unit_model.TypeActions}, nil,
|
||||
[]*files_service.ChangeRepoFile{
|
||||
{
|
||||
Operation: "create",
|
||||
TreePath: ".forgejo/workflows/simple.yml",
|
||||
ContentReader: strings.NewReader(`
|
||||
repo := createFetchTaskTestRepository(t, user2, "simple.yml", `
|
||||
on:
|
||||
push:
|
||||
jobs:
|
||||
|
|
@ -224,11 +227,7 @@ jobs:
|
|||
runs-on: debian
|
||||
steps:
|
||||
- run: echo OK
|
||||
`),
|
||||
},
|
||||
},
|
||||
)
|
||||
defer f()
|
||||
`)
|
||||
|
||||
debianRunner := newMockRunner()
|
||||
debianRunner.registerAsRepoRunner(t, user2.Name, repo.Name, "debian-runner", []string{"debian"})
|
||||
|
|
@ -277,3 +276,59 @@ jobs:
|
|||
assert.Contains(t, string(task.GetWorkflowPayload()), "name: job1")
|
||||
})
|
||||
}
|
||||
|
||||
func TestActionFetchTask_EphemeralRunnerAssignedAlready(t *testing.T) {
|
||||
if !setting.Database.Type.IsSQLite3() {
|
||||
// mock repo runner only supported on SQLite testing
|
||||
t.Skip()
|
||||
}
|
||||
|
||||
onApplicationRun(t, func(t *testing.T, u *url.URL) {
|
||||
user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})
|
||||
|
||||
// create the repo
|
||||
repo := createFetchTaskTestRepository(t, user2, "simple.yml", `
|
||||
on:
|
||||
push:
|
||||
jobs:
|
||||
job1:
|
||||
runs-on: debian
|
||||
steps:
|
||||
- run: echo OK
|
||||
job2:
|
||||
runs-on: debian
|
||||
steps:
|
||||
- run: echo OK
|
||||
job3:
|
||||
runs-on: debian
|
||||
steps:
|
||||
- run: echo OK
|
||||
`)
|
||||
|
||||
ephemeralDebianRunner := newMockRunner()
|
||||
ephemeralDebianRunner.registerAsEphemeralRepoRunner(t, user2.Name, repo.Name, "debian-runner-ephemeral", []string{"debian"})
|
||||
|
||||
normalDebianRunner := newMockRunner()
|
||||
normalDebianRunner.registerAsRepoRunner(t, user2.Name, repo.Name, "debian-runner-normal", []string{"debian"})
|
||||
|
||||
job1 := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{RepoID: repo.ID, Name: "job1"})
|
||||
job2 := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{RepoID: repo.ID, Name: "job2"})
|
||||
job3 := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{RepoID: repo.ID, Name: "job3"})
|
||||
|
||||
assert.NotEmpty(t, job1.Handle)
|
||||
assert.NotEmpty(t, job2.Handle)
|
||||
assert.NotEmpty(t, job3.Handle)
|
||||
|
||||
// Fetch a task for the ephemeral runner. This will only create one task even tho we have three waiting jobs
|
||||
task, additionalTasks := ephemeralDebianRunner.maybeFetchTaskWithTaskCapacity(t, 3)
|
||||
require.NotNil(t, task)
|
||||
assert.Contains(t, string(task.GetWorkflowPayload()), "name: job1")
|
||||
require.Empty(t, additionalTasks)
|
||||
|
||||
// Fetch a task for the normal runner. This will only create two tasks even tho we set the capacity to three
|
||||
task, additionalTasks = normalDebianRunner.maybeFetchTaskWithTaskCapacity(t, 3)
|
||||
require.NotNil(t, task)
|
||||
assert.Contains(t, string(task.GetWorkflowPayload()), "name: job2")
|
||||
require.Len(t, additionalTasks, 1)
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -150,6 +150,16 @@ func (r *mockRunner) maybeFetchTask(t *testing.T) *runnerv1.Task {
|
|||
return resp.Msg.Task
|
||||
}
|
||||
|
||||
func (r *mockRunner) maybeFetchTaskWithTaskCapacity(t *testing.T, taskCapacity int64) (*runnerv1.Task, []*runnerv1.Task) {
|
||||
resp, err := r.client.runnerServiceClient.FetchTask(t.Context(), connect.NewRequest(&runnerv1.FetchTaskRequest{
|
||||
TasksVersion: r.lastTasksVersion,
|
||||
TaskCapacity: &taskCapacity,
|
||||
}))
|
||||
require.NoError(t, err)
|
||||
r.lastTasksVersion = resp.Msg.TasksVersion
|
||||
return resp.Msg.Task, resp.Msg.AdditionalTasks
|
||||
}
|
||||
|
||||
func (r *mockRunner) maybeFetchSingleTask(t *testing.T, handle *string) *runnerv1.Task {
|
||||
resp, err := r.client.runnerServiceClient.FetchSingleTask(t.Context(), connect.NewRequest(&runnerv1.FetchSingleTaskRequest{
|
||||
TasksVersion: r.lastTasksVersion,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue