Skip to content

Commit 129680d

Browse files
authored
Persist worker name on claimed run (#3122)
1 parent 842397b commit 129680d

File tree

10 files changed

+164
-49
lines changed

10 files changed

+164
-49
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ and this project adheres to
4040
- When claiming a run, a worker name can optionally be provided to the
4141
adaptor that is responsible for claiming runs.
4242
[#3079](https://github.com/OpenFn/lightning/issues/3079)
43+
- Persist worker name provided by worker when claiming a run.
44+
[#3079](https://github.com/OpenFn/lightning/issues/3079)
4345

4446
### Changed
4547

assets/package-lock.json

+21-21
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

assets/package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
"devDependencies": {
4747
"@eslint-community/eslint-plugin-eslint-comments": "^4.4.1",
4848
"@eslint/js": "^9.21.0",
49-
"@openfn/ws-worker": "^1.13.0",
49+
"@openfn/ws-worker": "^1.13.2",
5050
"@total-typescript/ts-reset": "^0.6.1",
5151
"@tsconfig/node-lts": "^22.0.1",
5252
"@tsconfig/recommended": "^1.0.8",

lib/lightning/extensions/run_queue.ex

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ defmodule Lightning.Extensions.RunQueue do
1313
| {:error, Ecto.Multi.name(), any(),
1414
%{required(Ecto.Multi.name()) => any()}}
1515

16-
@callback claim(demand :: non_neg_integer()) ::
16+
@callback claim(demand :: non_neg_integer(), worker_name :: String.t()) ::
1717
{:ok, [Lightning.Run.t()]}
1818

1919
@callback dequeue(run :: Lightning.Run.t()) ::

lib/lightning/runs.ex

+2-2
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ defmodule Lightning.Runs do
4040
# all implementation should default to 1.
4141
# """
4242
@impl Lightning.Extensions.RunQueue
43-
def claim(demand \\ 1) do
44-
RunQueue.claim(demand)
43+
def claim(demand \\ 1, worker_name) do
44+
RunQueue.claim(demand, worker_name)
4545
end
4646

4747
# @doc """

lib/lightning/services/run_queue.ex

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ defmodule Lightning.Services.RunQueue do
1717
end
1818

1919
@impl true
20-
def claim(demand) do
21-
adapter().claim(demand)
20+
def claim(demand, worker_name) do
21+
adapter().claim(demand, worker_name)
2222
end
2323

2424
@impl true

lib/lightning_web/channels/worker_channel.ex

+10-2
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,12 @@ defmodule LightningWeb.WorkerChannel do
2020
end
2121

2222
@impl true
23-
def handle_in("claim", %{"demand" => demand}, socket) do
24-
case Runs.claim(demand) do
23+
def handle_in(
24+
"claim",
25+
%{"demand" => demand, "worker_name" => worker_name},
26+
socket
27+
) do
28+
case Runs.claim(demand, sanitise_worker_name(worker_name)) do
2529
{:ok, runs} ->
2630
runs =
2731
runs
@@ -43,6 +47,10 @@ defmodule LightningWeb.WorkerChannel do
4347
end
4448
end
4549

50+
defp sanitise_worker_name(""), do: nil
51+
52+
defp sanitise_worker_name(worker_name), do: worker_name
53+
4654
defp run_options(run) do
4755
Ecto.assoc(run, :workflow)
4856
|> Lightning.Repo.one()

test/lightning/runs_test.exs

+40-17
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,11 @@ defmodule Lightning.RunsTest do
4141
end
4242

4343
describe "claim/1" do
44-
test "claims a run from the queue" do
44+
setup do
45+
%{worker_name: "my.worker.name"}
46+
end
47+
48+
test "claims a run from the queue", %{worker_name: worker_name} do
4549
%{triggers: [trigger]} =
4650
workflow = insert(:simple_workflow) |> with_snapshot()
4751

@@ -51,15 +55,32 @@ defmodule Lightning.RunsTest do
5155
dataclip: params_with_assocs(:dataclip)
5256
)
5357

54-
assert {:ok, [claimed]} = Runs.claim()
58+
assert {:ok, [claimed]} = Runs.claim(worker_name)
5559

5660
assert claimed.id == run.id
5761
assert claimed.state == :claimed
5862

59-
assert {:ok, []} = Runs.claim()
63+
assert {:ok, []} = Runs.claim(worker_name)
6064
end
6165

62-
test "claims a run from the queue having parallel runs disabled" do
66+
test "persists worker name when claiming", %{worker_name: worker_name} do
67+
%{triggers: [trigger]} =
68+
workflow = insert(:simple_workflow) |> with_snapshot()
69+
70+
{:ok, %{runs: [run]}} =
71+
WorkOrders.create_for(trigger,
72+
workflow: workflow,
73+
dataclip: params_with_assocs(:dataclip)
74+
)
75+
76+
Runs.claim(worker_name)
77+
78+
assert %{worker_name: ^worker_name} = Repo.get!(Run, run.id)
79+
end
80+
81+
test "claims a run from the queue having parallel runs disabled", %{
82+
worker_name: worker_name
83+
} do
6384
project1 = insert(:project, concurrency: 1)
6485
project2 = insert(:project)
6586

@@ -82,26 +103,26 @@ defmodule Lightning.RunsTest do
82103
{run, %{trigger: trigger, workflow: workflow}}
83104
end)
84105

85-
assert {:ok, [%{id: ^run1_id, state: :claimed}]} = Runs.claim()
86-
assert {:ok, [%{id: ^run2a_id, state: :claimed}]} = Runs.claim()
87-
assert {:ok, []} = Runs.claim()
106+
assert {:ok, [%{id: ^run1_id, state: :claimed}]} = Runs.claim(worker_name)
107+
assert {:ok, [%{id: ^run2a_id, state: :claimed}]} = Runs.claim(worker_name)
108+
assert {:ok, []} = Runs.claim(worker_name)
88109

89110
{:ok, %{runs: [%{id: run2b_id}]}} =
90111
WorkOrders.create_for(trigger2,
91112
workflow: workflow2,
92113
dataclip: params_with_assocs(:dataclip)
93114
)
94115

95-
assert {:ok, [%{id: ^run2b_id, state: :claimed}]} = Runs.claim()
116+
assert {:ok, [%{id: ^run2b_id, state: :claimed}]} = Runs.claim(worker_name)
96117

97118
Repo.get!(Run, run1_id)
98119
|> Ecto.Changeset.change(%{state: :success})
99120
|> Repo.update!()
100121

101-
assert {:ok, [%{id: ^run3_id, state: :claimed}]} = Runs.claim()
122+
assert {:ok, [%{id: ^run3_id, state: :claimed}]} = Runs.claim(worker_name)
102123
end
103124

104-
test "claims with demand" do
125+
test "claims with demand", %{worker_name: worker_name} do
105126
%{triggers: [trigger]} =
106127
workflow = insert(:simple_workflow) |> with_snapshot()
107128

@@ -117,20 +138,22 @@ defmodule Lightning.RunsTest do
117138
run
118139
end)
119140

120-
assert {:ok, [claimed_1, claimed_2]} = Runs.claim(2)
141+
assert {:ok, [claimed_1, claimed_2]} = Runs.claim(2, worker_name)
121142

122143
assert claimed_1.id == run_1.id
123144
assert claimed_1.state == :claimed
124145
assert claimed_2.id == run_2.id
125146
assert claimed_2.state == :claimed
126147

127-
assert {:ok, [claimed_3]} = Runs.claim(2)
148+
assert {:ok, [claimed_3]} = Runs.claim(2, worker_name)
128149

129150
assert claimed_3.id == run_3.id
130151
assert claimed_3.state == :claimed
131152
end
132153

133-
test "claims with demand for all immediate run" do
154+
test "claims with demand for all immediate run", %{
155+
worker_name: worker_name
156+
} do
134157
%{triggers: [trigger]} =
135158
workflow = insert(:simple_workflow) |> with_snapshot()
136159

@@ -168,19 +191,19 @@ defmodule Lightning.RunsTest do
168191
|> then(fn {:ok, %{run: run}} -> {i, run} end)
169192
end)
170193

171-
assert {:ok, [claimed_1, claimed_2]} = Runs.claim(2)
194+
assert {:ok, [claimed_1, claimed_2]} = Runs.claim(2, worker_name)
172195

173196
assert claimed_1.id == runs[1].id
174197
assert claimed_1.state == :claimed
175198
assert claimed_2.id == runs[2].id
176199
assert claimed_2.state == :claimed
177200

178-
assert {:ok, [claimed_3]} = Runs.claim()
201+
assert {:ok, [claimed_3]} = Runs.claim(worker_name)
179202

180203
assert claimed_3.id == runs[3].id
181204
assert claimed_3.state == :claimed
182205

183-
assert {:ok, [claimed_4, claimed_5]} = Runs.claim(2)
206+
assert {:ok, [claimed_4, claimed_5]} = Runs.claim(2, worker_name)
184207

185208
assert claimed_4.id in [runs[4].id, second_last_run.id]
186209
assert claimed_4.state == :claimed
@@ -189,7 +212,7 @@ defmodule Lightning.RunsTest do
189212
assert claimed_5.id in [runs[4].id, second_last_run.id]
190213
assert claimed_5.state == :claimed
191214

192-
assert {:ok, [claimed_6]} = Runs.claim(2)
215+
assert {:ok, [claimed_6]} = Runs.claim(2, worker_name)
193216

194217
assert claimed_6.id == last_run.id
195218
assert claimed_6.state == :claimed
+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
defmodule Lightning.Services.RunQueueTest do
2+
use Lightning.DataCase, async: true
3+
4+
alias Lightning.Repo
5+
alias Lightning.Services.RunQueue
6+
alias Lightning.WorkOrders
7+
8+
describe "claim/2" do
9+
test "persists the worker name when claiming a run" do
10+
worker_name = "my.worker.name"
11+
12+
project1 = insert(:project)
13+
14+
%{triggers: [trigger1]} =
15+
workflow1 =
16+
insert(:simple_workflow, project: project1) |> with_snapshot()
17+
18+
{:ok, %{runs: [run1]}} =
19+
WorkOrders.create_for(trigger1,
20+
workflow: workflow1,
21+
dataclip: params_with_assocs(:dataclip)
22+
)
23+
24+
{:ok, %{runs: [run2]}} =
25+
WorkOrders.create_for(trigger1,
26+
workflow: workflow1,
27+
dataclip: params_with_assocs(:dataclip)
28+
)
29+
30+
RunQueue.claim(2, worker_name)
31+
32+
assert %{worker_name: ^worker_name} = Repo.reload!(run1)
33+
assert %{worker_name: ^worker_name} = Repo.reload!(run2)
34+
end
35+
end
36+
end

0 commit comments

Comments
 (0)