Skip to content

Commit 83ec0ba

Browse files
wolfogrelunny
authored andcommitted
Support upload outputs and use needs context (#133)
See [Example usage of the needs context](https://docs.github.com/en/actions/learn-github-actions/contexts#example-usage-of-the-needs-context). Related to: - [actions-proto-def #5](https://gitea.com/gitea/actions-proto-def/pulls/5) - [gitea #24230](go-gitea/gitea#24230) Reviewed-on: https://gitea.com/gitea/act_runner/pulls/133 Reviewed-by: Lunny Xiao <[email protected]> Reviewed-by: Zettat123 <[email protected]> Co-authored-by: Jason Song <[email protected]> Co-committed-by: Jason Song <[email protected]>
1 parent ed86e2f commit 83ec0ba

File tree

4 files changed

+199
-25
lines changed

4 files changed

+199
-25
lines changed

internal/app/run/runner.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
package run
55

66
import (
7-
"bytes"
87
"context"
98
"encoding/json"
109
"fmt"
@@ -112,16 +111,11 @@ func (r *Runner) run(ctx context.Context, task *runnerv1.Task, reporter *report.
112111

113112
reporter.Logf("%s(version:%s) received task %v of job %v, be triggered by event: %s", r.name, ver.Version(), task.Id, task.Context.Fields["job"].GetStringValue(), task.Context.Fields["event_name"].GetStringValue())
114113

115-
workflow, err := model.ReadWorkflow(bytes.NewReader(task.WorkflowPayload))
114+
workflow, jobID, err := generateWorkflow(task)
116115
if err != nil {
117116
return err
118117
}
119118

120-
jobIDs := workflow.GetJobIDs()
121-
if len(jobIDs) != 1 {
122-
return fmt.Errorf("multiple jobs found: %v", jobIDs)
123-
}
124-
jobID := jobIDs[0]
125119
plan, err := model.CombineWorkflowPlanner(workflow).PlanJob(jobID)
126120
if err != nil {
127121
return err
@@ -209,5 +203,7 @@ func (r *Runner) run(ctx context.Context, task *runnerv1.Task, reporter *report.
209203
// add logger recorders
210204
ctx = common.WithLoggerHook(ctx, reporter)
211205

212-
return executor(ctx)
206+
execErr := executor(ctx)
207+
reporter.SetOutputs(job.Outputs)
208+
return execErr
213209
}

internal/app/run/workflow.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright 2023 The Gitea Authors. All rights reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package run
5+
6+
import (
7+
"bytes"
8+
"fmt"
9+
"sort"
10+
"strings"
11+
12+
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
13+
"github.com/nektos/act/pkg/model"
14+
"gopkg.in/yaml.v3"
15+
)
16+
17+
func generateWorkflow(task *runnerv1.Task) (*model.Workflow, string, error) {
18+
workflow, err := model.ReadWorkflow(bytes.NewReader(task.WorkflowPayload))
19+
if err != nil {
20+
return nil, "", err
21+
}
22+
23+
jobIDs := workflow.GetJobIDs()
24+
if len(jobIDs) != 1 {
25+
return nil, "", fmt.Errorf("multiple jobs found: %v", jobIDs)
26+
}
27+
jobID := jobIDs[0]
28+
29+
needJobIDs := make([]string, 0, len(task.Needs))
30+
for id, need := range task.Needs {
31+
needJobIDs = append(needJobIDs, id)
32+
needJob := &model.Job{
33+
Outputs: need.Outputs,
34+
Result: strings.ToLower(strings.TrimPrefix(need.Result.String(), "RESULT_")),
35+
}
36+
workflow.Jobs[id] = needJob
37+
}
38+
sort.Strings(needJobIDs)
39+
40+
rawNeeds := yaml.Node{
41+
Kind: yaml.SequenceNode,
42+
Content: make([]*yaml.Node, 0, len(needJobIDs)),
43+
}
44+
for _, id := range needJobIDs {
45+
rawNeeds.Content = append(rawNeeds.Content, &yaml.Node{
46+
Kind: yaml.ScalarNode,
47+
Value: id,
48+
})
49+
}
50+
51+
workflow.Jobs[jobID].RawNeeds = rawNeeds
52+
53+
return workflow, jobID, nil
54+
}

internal/app/run/workflow_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Copyright 2023 The Gitea Authors. All rights reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package run
5+
6+
import (
7+
"testing"
8+
9+
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
10+
"github.com/nektos/act/pkg/model"
11+
"github.com/stretchr/testify/require"
12+
"gotest.tools/v3/assert"
13+
)
14+
15+
func Test_generateWorkflow(t *testing.T) {
16+
type args struct {
17+
task *runnerv1.Task
18+
}
19+
tests := []struct {
20+
name string
21+
args args
22+
assert func(t *testing.T, wf *model.Workflow)
23+
want1 string
24+
wantErr bool
25+
}{
26+
{
27+
name: "has needs",
28+
args: args{
29+
task: &runnerv1.Task{
30+
WorkflowPayload: []byte(`
31+
name: Build and deploy
32+
on: push
33+
34+
jobs:
35+
job9:
36+
needs: build
37+
runs-on: ubuntu-latest
38+
steps:
39+
- uses: actions/checkout@v3
40+
- run: ./deploy --build ${{ needs.job1.outputs.output1 }}
41+
- run: ./deploy --build ${{ needs.job2.outputs.output2 }}
42+
`),
43+
Needs: map[string]*runnerv1.TaskNeed{
44+
"job1": {
45+
Outputs: map[string]string{
46+
"output1": "output1 value",
47+
},
48+
Result: runnerv1.Result_RESULT_SUCCESS,
49+
},
50+
"job2": {
51+
Outputs: map[string]string{
52+
"output2": "output2 value",
53+
},
54+
Result: runnerv1.Result_RESULT_SUCCESS,
55+
},
56+
},
57+
},
58+
},
59+
assert: func(t *testing.T, wf *model.Workflow) {
60+
assert.DeepEqual(t, wf.GetJob("job9").Needs(), []string{"job1", "job2"})
61+
},
62+
want1: "job9",
63+
wantErr: false,
64+
},
65+
}
66+
for _, tt := range tests {
67+
t.Run(tt.name, func(t *testing.T) {
68+
got, got1, err := generateWorkflow(tt.args.task)
69+
require.NoError(t, err)
70+
tt.assert(t, got)
71+
assert.Equal(t, got1, tt.want1)
72+
})
73+
}
74+
}

internal/pkg/report/reporter.go

Lines changed: 67 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,10 @@ type Reporter struct {
3131
logOffset int
3232
logRows []*runnerv1.LogRow
3333
logReplacer *strings.Replacer
34-
state *runnerv1.TaskState
35-
stateM sync.RWMutex
34+
35+
state *runnerv1.TaskState
36+
stateMu sync.RWMutex
37+
outputs sync.Map
3638
}
3739

3840
func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.Client, task *runnerv1.Task) *Reporter {
@@ -56,8 +58,8 @@ func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.C
5658
}
5759

5860
func (r *Reporter) ResetSteps(l int) {
59-
r.stateM.Lock()
60-
defer r.stateM.Unlock()
61+
r.stateMu.Lock()
62+
defer r.stateMu.Unlock()
6163
for i := 0; i < l; i++ {
6264
r.state.Steps = append(r.state.Steps, &runnerv1.StepState{
6365
Id: int64(i),
@@ -70,8 +72,8 @@ func (r *Reporter) Levels() []log.Level {
7072
}
7173

7274
func (r *Reporter) Fire(entry *log.Entry) error {
73-
r.stateM.Lock()
74-
defer r.stateM.Unlock()
75+
r.stateMu.Lock()
76+
defer r.stateMu.Unlock()
7577

7678
log.WithFields(entry.Data).Trace(entry.Message)
7779

@@ -155,9 +157,13 @@ func (r *Reporter) RunDaemon() {
155157
}
156158

157159
func (r *Reporter) Logf(format string, a ...interface{}) {
158-
r.stateM.Lock()
159-
defer r.stateM.Unlock()
160+
r.stateMu.Lock()
161+
defer r.stateMu.Unlock()
162+
163+
r.logf(format, a...)
164+
}
160165

166+
func (r *Reporter) logf(format string, a ...interface{}) {
161167
if !r.duringSteps() {
162168
r.logRows = append(r.logRows, &runnerv1.LogRow{
163169
Time: timestamppb.Now(),
@@ -166,10 +172,30 @@ func (r *Reporter) Logf(format string, a ...interface{}) {
166172
}
167173
}
168174

175+
func (r *Reporter) SetOutputs(outputs map[string]string) {
176+
r.stateMu.Lock()
177+
defer r.stateMu.Unlock()
178+
179+
for k, v := range outputs {
180+
if len(k) > 255 {
181+
r.logf("ignore output because the key is too long: %q", k)
182+
continue
183+
}
184+
if l := len(v); l > 1024*1024 {
185+
log.Println("ignore output because the value is too long:", k, l)
186+
r.logf("ignore output because the value %q is too long: %d", k, l)
187+
}
188+
if _, ok := r.outputs.Load(k); ok {
189+
continue
190+
}
191+
r.outputs.Store(k, v)
192+
}
193+
}
194+
169195
func (r *Reporter) Close(lastWords string) error {
170196
r.closed = true
171197

172-
r.stateM.Lock()
198+
r.stateMu.Lock()
173199
if r.state.Result == runnerv1.Result_RESULT_UNSPECIFIED {
174200
if lastWords == "" {
175201
lastWords = "Early termination"
@@ -191,7 +217,7 @@ func (r *Reporter) Close(lastWords string) error {
191217
Content: lastWords,
192218
})
193219
}
194-
r.stateM.Unlock()
220+
r.stateMu.Unlock()
195221

196222
return retry.Do(func() error {
197223
if err := r.ReportLog(true); err != nil {
@@ -205,9 +231,9 @@ func (r *Reporter) ReportLog(noMore bool) error {
205231
r.clientM.Lock()
206232
defer r.clientM.Unlock()
207233

208-
r.stateM.RLock()
234+
r.stateMu.RLock()
209235
rows := r.logRows
210-
r.stateM.RUnlock()
236+
r.stateMu.RUnlock()
211237

212238
resp, err := r.client.UpdateLog(r.ctx, connect.NewRequest(&runnerv1.UpdateLogRequest{
213239
TaskId: r.state.Id,
@@ -224,10 +250,10 @@ func (r *Reporter) ReportLog(noMore bool) error {
224250
return fmt.Errorf("submitted logs are lost")
225251
}
226252

227-
r.stateM.Lock()
253+
r.stateMu.Lock()
228254
r.logRows = r.logRows[ack-r.logOffset:]
229255
r.logOffset = ack
230-
r.stateM.Unlock()
256+
r.stateMu.Unlock()
231257

232258
if noMore && ack < r.logOffset+len(rows) {
233259
return fmt.Errorf("not all logs are submitted")
@@ -240,21 +266,45 @@ func (r *Reporter) ReportState() error {
240266
r.clientM.Lock()
241267
defer r.clientM.Unlock()
242268

243-
r.stateM.RLock()
269+
r.stateMu.RLock()
244270
state := proto.Clone(r.state).(*runnerv1.TaskState)
245-
r.stateM.RUnlock()
271+
r.stateMu.RUnlock()
272+
273+
outputs := make(map[string]string)
274+
r.outputs.Range(func(k, v interface{}) bool {
275+
if val, ok := v.(string); ok {
276+
outputs[k.(string)] = val
277+
}
278+
return true
279+
})
246280

247281
resp, err := r.client.UpdateTask(r.ctx, connect.NewRequest(&runnerv1.UpdateTaskRequest{
248-
State: state,
282+
State: state,
283+
Outputs: outputs,
249284
}))
250285
if err != nil {
251286
return err
252287
}
253288

289+
for _, k := range resp.Msg.SentOutputs {
290+
r.outputs.Store(k, struct{}{})
291+
}
292+
254293
if resp.Msg.State != nil && resp.Msg.State.Result == runnerv1.Result_RESULT_CANCELLED {
255294
r.cancel()
256295
}
257296

297+
var noSent []string
298+
r.outputs.Range(func(k, v interface{}) bool {
299+
if _, ok := v.(string); ok {
300+
noSent = append(noSent, k.(string))
301+
}
302+
return true
303+
})
304+
if len(noSent) > 0 {
305+
return fmt.Errorf("there are still outputs that have not been sent: %v", noSent)
306+
}
307+
258308
return nil
259309
}
260310

0 commit comments

Comments
 (0)