Skip to content

Commit 4928568

Browse files
feat: Queuer-side progress indication (#190)
Some refinements and performance improvements are likely needed, but this is good enough to be merged into main.
1 parent ceb6d9e commit 4928568

File tree

13 files changed

+588
-197
lines changed

13 files changed

+588
-197
lines changed

poetry.lock

+10-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ shortuuid = "^1.0.8"
2626
yaspin = "^2.1.0"
2727
ffmpeg-python = "^0.2.0"
2828
asciimatics = "^1.14.0"
29+
cryptohash = "^1.0.5"
2930

3031
[tool.poetry.dev-dependencies]
3132
mkdocs-material = "^7.3.6"

resolve_proxy_encoder/app/broker.py

+323
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,323 @@
import hashlib
import json
import logging
import time
from decimal import DivisionByZero
from typing import Union
from urllib.parse import urlparse

import redis
from cryptohash import sha1
from rich.console import Group
from rich.live import Live
from rich.progress import (
    BarColumn,
    Progress,
    SpinnerColumn,
    TextColumn,
    TimeRemainingColumn,
)

from resolve_proxy_encoder.settings.manager import SettingsManager
20+
21+
class RedisConnection:

    """
    Redis connection class.

    Pulls connection settings from config into new Redis connection instance.

    Attributes:
        connection (Redis, None): Connection object used to interface with Redis.
        settings (SettingsManager): Configuration used for connection.
    """

    def __init__(self, settings: "SettingsManager"):
        self.connection: Union[redis.Redis, None] = None
        self.settings = settings

    @staticmethod
    def _parse_broker_url(broker_url: str):
        """
        Extract host and port from a ``redis://`` broker URL.

        Uses urllib rather than hand-rolled string splitting so URLs
        without an explicit port (or with auth credentials) don't raise.

        Returns:
            tuple[str, int]: (host, port), defaulting to ("localhost", 6379).
        """
        parsed = urlparse(broker_url)
        host = parsed.hostname or "localhost"
        port = parsed.port or 6379
        return host, port

    def get_connection(self):
        """
        Initialise Redis connection.

        Returns:
            connection(Redis, None): Connection object used to interface with Redis.
        """

        host, port = self._parse_broker_url(str(self.settings["broker"]["url"]))

        # decode_responses=False: callers expect raw bytes back from Redis
        self.connection = redis.Redis(host=host, port=port, decode_responses=False)
        return self.connection
51+
52+
53+
class ProgressTracker:
    def __init__(self, settings: "SettingsManager", callable_tasks):
        """
        Track encoding progress of all tasks in a group.

        Args:
            settings: Configuration; provides broker connection details
                and the app log level.
            callable_tasks: Full list of queued tasks; its length is needed
                for accurate progress bar rendering.
        """

        self.redis = RedisConnection(settings).get_connection()

        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(settings["app"]["loglevel"])

        self.callable_tasks = callable_tasks

        # Sets give O(1) membership tests and de-duplicate for free
        self.matched_task_ids = set()
        self.active_workers = set()

        # Checksums of every Redis payload already processed, so stale
        # (unchanged) values can be skipped cheaply
        self.data_checksums = set()

        # Per-task progress bookkeeping, keyed by task id
        self.progress_latest_data = {}
        self.prog_percentages = {}
        self.last_task_average = 0

        self.completed_tasks = 0
        self.group_id = None

    def __define_progress_bars(self):
        """Create the rich progress bars and group them for live rendering."""

        # Single line showing the most recent task event (picked up/finished/failed)
        self.last_status = Progress(
            TextColumn("{task.fields[last_status]}"),
        )

        self.worker_spinner = Progress(
            SpinnerColumn(),
            # TODO: Get individual worker names instead of host machines
            # labels: enhancement
            TextColumn("[cyan]Using {task.fields[active_workers]} workers"),
        )

        self.average_progress = Progress(
            TextColumn("[cyan][progress.description]{task.description}"),
            BarColumn(),
            TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
            TextColumn("[yellow]ETA:[/]"),
            TimeRemainingColumn(),
        )

        self.overall_progress = Progress(
            TextColumn("[cyan]{task.description}"),
            # TODO: Fix bar column not lining up with task_progress bars
            # Maybe we can make a spacer text column for all the bars and truncate long filenames with [...]?
            # labels: bug
            BarColumn(),
            TextColumn("[cyan]({task.completed} of {task.total})"),
        )

        # Create group of renderables
        self.progress_group = Group(
            self.last_status,
            "",  # This actually works as a spacer lol
            self.worker_spinner,
            self.average_progress,
            self.overall_progress,
        )

    def __init_progress_bars(self):
        """Register one task per bar and keep their ids for later updates."""

        # NOTE: these two ids were previously crossed (worker id taken from the
        # status bar and vice versa); it only worked because both were task 0.
        self.last_status_id = self.last_status.add_task(
            description="Last task event status",
            last_status="",
        )
        self.worker_id = self.worker_spinner.add_task(
            description="Active worker count",
            active_workers=0,
        )

        self.average_id = self.average_progress.add_task(
            description="Average task progress",
            total=100,  # percentage
        )

        self.overall_id = self.overall_progress.add_task(
            description="Total task progress",
            total=len(self.callable_tasks),
        )

    def get_new_data(self, key):
        """
        Fetch and decode a JSON payload from Redis.

        Returns:
            dict | None: The decoded payload, or None when the key is
            missing, the value isn't valid JSON, or the data is stale
            (seen before).
        """

        data = self.redis.get(key)
        if data is None:
            self.logger.debug(f"[yellow]Could not get value from key: '{key}'")
            return None

        # TODO: This shouldn't be returning invalid JSON?
        # Not sure why but every 8th poll returns a value that isn't None, but isn't JSON.
        # labels: Enhancement

        try:
            # JSONDecodeError subclasses ValueError, so this catches both
            data = json.loads(data)
        except ValueError:
            self.logger.debug(f"[yellow]Could not decode value from key {key}")
            return None

        if not self.__data_is_new(data):
            self.logger.debug(
                f"Fetching redis key: '{key}' returned stale data:\n{data}"
            )
            return None

        return data

    def __data_is_new(self, data):
        """
        Return True if `data` hasn't been seen before, recording its checksum.

        Only a SHA-1 of the payload's string form is stored (not the payload
        itself) to keep the memory footprint small.

        NOTE: this previously compared the raw payload against the checksum
        list, so every payload was reported as new — fixed to compare checksums.
        """

        checksum = hashlib.sha1(str(data).encode("utf-8")).hexdigest()

        if checksum in self.data_checksums:
            return False

        self.data_checksums.add(checksum)
        return True

    def handle_task_event(self, key):
        """
        Process a single celery task-meta event.

        Updates the active worker count, overall completion count and the
        "last status" line. Events belonging to other queuers' groups are
        ignored.
        """

        data = self.get_new_data(key)
        if data is None:
            return

        # Is this one of our tasks, or another queuer's?
        if self.group_id != data["group_id"]:
            return

        # Track task and worker
        self.matched_task_ids.add(data["task_id"])
        self.active_workers.add(data["worker"])

        # Update discrete task progress
        if data["status"] in ("SUCCESS", "FAILURE"):
            self.completed_tasks += 1
            self.overall_progress.update(
                task_id=self.overall_id,
                completed=self.completed_tasks,
                total=len(self.callable_tasks),
            )

        # Print task event updates
        worker = data["worker"]
        # NOTE(review): assumes first task arg carries "file_name" — confirm against worker payload
        file_name = data["args"][0]["file_name"]

        switch = {
            "SUCCESS": f"[bold green] :green_circle: {worker}[/] -> finished '{file_name}'",
            "FAILURE": f"[bold red] :red_circle: {worker}[/] -> [red]failed '{file_name}'",
            "STARTED": f"[bold cyan] :blue_circle: {worker}[/] -> picked up '{file_name}'",
        }

        # Statuses outside the switch (e.g. RETRY) used to raise KeyError; skip them
        message = switch.get(data["status"])
        if message is not None:
            self.status = message

            # Update spinner last status
            self.last_status.update(
                task_id=self.last_status_id,
                last_status=message,
            )
        else:
            self.logger.debug(f"[yellow]Unhandled task status: '{data['status']}'")

        # Update worker spinner
        self.worker_spinner.update(
            task_id=self.worker_id,
            active_workers=len(self.active_workers),
        )

    def handle_task_progress(self, key):
        """
        Process a single task-progress update and refresh the average bar.

        Only tasks already matched to our group (seen via a task event)
        are tracked.
        """

        data = self.get_new_data(key)
        if not data:
            self.logger.debug(f"[magenta]Progress data: {data}")
            return

        # If task is registered, track it
        if data["task_id"] not in self.matched_task_ids:
            return

        completed = data.get("completed")
        total = data.get("total")

        # Store all vals for future purposes maybe?
        self.progress_latest_data[data["task_id"]] = [completed, total]

        # Guard missing/zero totals before dividing
        if completed is None or not total:
            self.logger.debug(f"[magenta]Skipping progress update with no total: {data}")
            return

        # Get up-to-date average
        percentage = round(completed / total * 100)
        self.prog_percentages[data["task_id"]] = percentage
        active_task_average = round(
            sum(self.prog_percentages.values()) / len(self.prog_percentages)
        )
        try:
            total_task_average = round(
                active_task_average
                / (len(self.callable_tasks) - self.completed_tasks)
            )
        # Integer division raises ZeroDivisionError; decimal.DivisionByZero
        # (a subclass) was previously caught here and never matched
        except ZeroDivisionError:
            total_task_average = 0

        # Log debug
        self.logger.debug(f"[magenta]Current task percentage: {percentage}")
        self.logger.debug(
            f"[magenta]Active tasks average percentage: {active_task_average}"
        )
        self.logger.debug(
            f"[magenta]Total tasks average percentage: {total_task_average}\n"
        )

        # TODO: Better way to prevent progress going backward on task pickup?
        # labels: enhancement
        if total_task_average > self.last_task_average:

            # Update average progress bar
            self.average_progress.update(
                task_id=self.average_id,
                completed=total_task_average,
            )
            # Remember the high-water mark (was never updated before,
            # so the guard above always compared against 0)
            self.last_task_average = total_task_average

    def report_progress(self, results, loop_delay=1):
        """
        Poll Redis and render live progress until the task group finishes.

        Args:
            results: Celery group result; `.id` identifies our group and
                `.ready()` signals completion.
            loop_delay: Seconds to sleep between Redis polls. Fetch timeouts
                should stay shorter than this so we don't outpace ourselves.

        Returns:
            The same `results` object, for chaining.
        """

        self.group_id = results.id

        self.__define_progress_bars()
        self.__init_progress_bars()

        with Live(self.progress_group):

            while not results.ready():

                # scan_iter never yields None; the filter is belt-and-braces
                task_events = [
                    k for k in self.redis.scan_iter("celery-task-meta*") if k is not None
                ]
                progress_events = [
                    k for k in self.redis.scan_iter("task-progress*") if k is not None
                ]

                for te in task_events:
                    self.handle_task_event(te)

                for pe in progress_events:
                    self.handle_task_progress(pe)

                # Let's be nice to the server ;)
                time.sleep(loop_delay)

            # Hide the progress bars after finish
            self.worker_spinner.update(task_id=self.worker_id, visible=False)
            self.last_status.update(task_id=self.last_status_id, visible=False)
            self.average_progress.update(task_id=self.average_id, visible=False)
            self.overall_progress.update(task_id=self.overall_id, visible=False)

        return results

0 commit comments

Comments
 (0)