You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session. You switched accounts on another tab or window. Reload to refresh your session. Dismiss alert
"""Redis-backed progress tracking for groups of encoding tasks."""

# NOTE(review): `DivisionByZero` is no longer referenced below (integer/float
# division raises the builtin `ZeroDivisionError`); the import is retained in
# case other code relies on it being importable from this module.
from decimal import DivisionByZero
from typing import Union
from urllib.parse import unquote, urlsplit
import json
import logging
import time

import redis
from cryptohash import sha1
from rich.console import Group
from rich.live import Live
from rich.progress import (
    BarColumn,
    Progress,
    SpinnerColumn,
    TextColumn,
    TimeRemainingColumn,
)

from resolve_proxy_encoder.settings.manager import SettingsManager


class RedisConnection:
    """
    Redis connection class.

    Pulls connection settings from config into new Redis connection instance.

    Attributes:
        connection (Redis, None): Connection object used to interface with Redis.
        settings (SettingsManager): Configuration used for connection.
    """

    # Port used when the broker URL does not name one (Redis' default port).
    DEFAULT_PORT = 6379

    def __init__(self, settings: SettingsManager):
        self.connection: Union[redis.Redis, None] = None
        self.settings = settings

    def get_connection(self):
        """
        Initialise Redis connection.

        The host, port and (if present) password are read from
        `settings["broker"]["url"]`. The database index in the URL is ignored,
        so the client's default database is used.

        Returns:
            connection(Redis, None): Connection object used to interface with Redis.

        Raises:
            ValueError: If the broker URL has no host or a non-numeric port.
        """
        broker_url = str(self.settings["broker"]["url"])

        # Parse with the stdlib instead of splitting on ':' by hand, so URLs
        # with credentials or without an explicit port are handled too.
        parts = urlsplit(broker_url)
        if not parts.hostname:
            raise ValueError("Broker URL is missing a host")

        port = parts.port if parts.port is not None else self.DEFAULT_PORT
        password = unquote(parts.password) if parts.password else None

        self.connection = redis.Redis(
            host=parts.hostname,
            port=port,
            password=password,
            decode_responses=False,
        )
        return self.connection


class ProgressTracker:
    def __init__(self, settings: SettingsManager, callable_tasks):
        """
        Track encoding progress of all tasks in a group

        `settings` needed for connection to broker
        `callable_tasks` needed to know task count for accurate progress bar rendering
        """
        # Local is deliberately not called `redis`, to avoid shadowing the module.
        broker = RedisConnection(settings)
        self.redis = broker.get_connection()

        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(settings["app"]["loglevel"])

        self.callable_tasks = callable_tasks

        self.matched_task_ids = []  # ids of tasks belonging to our group
        self.progress_latest_data = {}  # task_id -> [completed, total]
        self.prog_percentages = {}  # task_id -> latest percentage
        self.last_task_average = 0  # highest average shown so far
        self.active_workers = []  # distinct worker names seen so far
        self.completed_tasks = 0  # tasks that reached SUCCESS or FAILURE
        self.group_id = None  # set by `report_progress`
        self.data_checksums = []  # checksums of payloads already processed

    def __define_progress_bars(self):
        """Create the rich `Progress` renderables and group them for display."""
        self.last_status = Progress(
            TextColumn("{task.fields[last_status]}"),
        )

        self.worker_spinner = Progress(
            SpinnerColumn(),
            # TODO: Get individual worker names instead of host machines
            # labels: enhancement
            TextColumn("[cyan]Using {task.fields[active_workers]} workers"),
        )

        self.average_progress = Progress(
            TextColumn("[cyan][progress.description]{task.description}"),
            BarColumn(),
            TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
            TextColumn("[yellow]ETA:[/]"),
            TimeRemainingColumn(),
        )

        self.overall_progress = Progress(
            TextColumn("[cyan]{task.description}"),
            # TODO: Fix bar column not lining up with task_progress bars
            # Maybe we can make a spacer text column for all the bars and
            # truncate long filenames with [...]?
            # labels: bug
            BarColumn(),
            TextColumn("[cyan]({task.completed} of {task.total})"),
        )

        # Create group of renderables
        self.progress_group = Group(
            self.last_status,
            "",  # This actually works as a spacer lol
            self.worker_spinner,
            self.average_progress,
            self.overall_progress,
        )

    def __init_progress_bars(self):
        """Register one task on each `Progress` and remember its task id."""
        # Each id is stored under the name of the Progress that issued it.
        # (They used to be swapped, which only worked because every Progress
        # hands out 0 as its first task id.)
        self.last_status_id = self.last_status.add_task(
            description="Last task event status",
            last_status="",
        )
        self.worker_id = self.worker_spinner.add_task(
            description="Active worker count",
            active_workers=0,
        )
        self.average_id = self.average_progress.add_task(
            description="Average task progress",
            total=100,  # percentage
        )
        self.overall_id = self.overall_progress.add_task(
            description="Total task progress",
            total=len(self.callable_tasks),
        )

    def get_new_data(self, key):
        """
        Fetch and decode the JSON value stored at `key`.

        Returns:
            The decoded value, or None if the key is missing, the value is not
            valid JSON, or the same payload has already been returned before.
        """
        data = self.redis.get(key)
        if data is None:
            self.logger.debug(f"[yellow]Could not get value from key: '{key}'")
            return None

        # TODO: This shouldn't be returning invalid JSON?
        # Not sure why but every 8th poll returns a value that isn't None, but isn't JSON.
        # labels: Enhancement
        try:
            data = json.loads(data)
        except ValueError:
            # json.JSONDecodeError is a subclass of ValueError.
            self.logger.debug(f"[yellow]Could not decode value from key {key}")
            return None

        if not self.__data_is_new(data):
            self.logger.debug(
                f"Fetching redis key: '{key}' returned stale data:\n{data}"
            )
            return None

        return data

    def __data_is_new(self, data):
        """Return True the first time a payload is seen, False on repeats."""
        checksum = sha1(str(data))

        # Compare the checksum (not the raw payload) against what we stored;
        # comparing `data` itself could never match a stored checksum.
        if checksum in self.data_checksums:
            return False

        self.data_checksums.append(checksum)
        return True

    def handle_task_event(self, key):
        """
        Process a task state event stored at `key`.

        Events from other groups are ignored. For our own group this records
        the task id and worker, counts finished tasks and refreshes the
        status line, worker spinner and overall progress bar.
        """
        data = self.get_new_data(key)
        if data is None:
            return

        # Is this one of our tasks, or another queuers?
        if data.get("group_id") != self.group_id:
            return

        worker = data["worker"]
        status = data["status"]

        # Update worker count
        if data["task_id"] not in self.matched_task_ids:
            self.matched_task_ids.append(data["task_id"])
        if worker not in self.active_workers:
            self.active_workers.append(worker)

        # Update discrete task progress
        if status in ("SUCCESS", "FAILURE"):
            self.completed_tasks += 1
            self.overall_progress.update(
                task_id=self.overall_id,
                completed=self.completed_tasks,
                total=len(self.callable_tasks),
            )

        # Print task event updates
        file_name = data["args"][0]["file_name"]
        switch = {
            "SUCCESS": f"[bold green] :green_circle: {worker}[/] -> finished '{file_name}'",
            "FAILURE": f"[bold red] :red_circle: {worker}[/] -> [red]failed '{file_name}'",
            "STARTED": f"[bold cyan] :blue_circle: {worker}[/] -> picked up '{file_name}'",
        }

        # Other states (e.g. retries) have no message; skip the status line
        # instead of raising KeyError and killing the progress display.
        message = switch.get(status)
        if message is None:
            self.logger.debug(f"[yellow]Unhandled task status: '{status}'")
        else:
            self.status = message

            # Update spinner last status
            self.last_status.update(
                task_id=self.last_status_id,
                last_status=message,
            )

        # Update worker spinner
        self.worker_spinner.update(
            task_id=self.worker_id,
            active_workers=len(self.active_workers),
        )

    def handle_task_progress(self, key):
        """
        Process a task progress payload stored at `key`.

        Only tasks previously matched by `handle_task_event` are tracked. The
        average progress bar is advanced only when the new average exceeds
        the highest value shown so far.
        """
        data = self.get_new_data(key)
        if not data:
            self.logger.debug(f"[magenta]Progress data: {data}")
            return

        # If task is registered, track it
        task_id = data["task_id"]
        if task_id not in self.matched_task_ids:
            return

        completed = data.get("completed")
        total = data.get("total")

        # Store all vals for future purposes maybe?
        self.progress_latest_data[task_id] = [completed, total]

        # A missing count or a zero total cannot be turned into a percentage.
        if completed is None or not total:
            self.logger.debug(
                f"[yellow]Incomplete progress data for task '{task_id}'"
            )
            return

        # Get up-to-date average
        percentage = round(completed / total * 100)
        self.prog_percentages[task_id] = percentage
        active_task_average = round(
            sum(self.prog_percentages.values()) / len(self.prog_percentages)
        )

        remaining_tasks = len(self.callable_tasks) - self.completed_tasks
        try:
            total_task_average = round(active_task_average / remaining_tasks)
        except ZeroDivisionError:
            # Every task has completed; nothing is left to average over.
            total_task_average = 0

        # Log debug
        self.logger.debug(f"[magenta]Current task percentage: {percentage}")
        self.logger.debug(
            f"[magenta]Active tasks average percentage: {active_task_average}"
        )
        self.logger.debug(
            f"[magenta]Total tasks average percentage: {total_task_average}\n"
        )

        # TODO: Better way to prevent progress going backward on task pickup?
        # Not sure why the task progress is going backwards.
        # It happens on new task pick up, which I thought we accounted for?
        # It doesn't seem to be off by much though.
        # labels: enhancement
        if total_task_average > self.last_task_average:
            # Remember the high-water mark so later, lower averages are ignored.
            self.last_task_average = total_task_average

            # Update average progress bar
            self.average_progress.update(
                task_id=self.average_id,
                completed=total_task_average,
            )

    def __poll_events(self):
        """Run one pass over all task-event and task-progress keys in Redis."""
        # Task events first, so newly matched task ids are known before
        # their progress payloads are examined.
        for task_event_key in self.redis.scan_iter("celery-task-meta*"):
            self.handle_task_event(task_event_key)

        for progress_key in self.redis.scan_iter("task-progress*"):
            self.handle_task_progress(progress_key)

    def report_progress(self, results, loop_delay=1):
        """
        Render live progress bars until `results` is ready.

        Args:
            results: Group result object; its `id` identifies our tasks and
                `ready()` is polled to know when to stop.
            loop_delay: Seconds to sleep between polls of Redis.

        Returns:
            The `results` object that was passed in.
        """
        # I figure timeout should be shorter than loop delay,
        # that way we know we're not outpacing ourselves
        self.group_id = results.id

        self.__define_progress_bars()
        self.__init_progress_bars()

        with Live(self.progress_group):
            while not results.ready():
                self.__poll_events()

                # Let's be nice to the server ;)
                time.sleep(loop_delay)

            # One last pass so events written during the final sleep are
            # still counted before the bars are hidden.
            self.__poll_events()

            # Hide the progress bars after finish
            self.worker_spinner.update(task_id=self.worker_id, visible=False)
            self.last_status.update(task_id=self.last_status_id, visible=False)
            self.average_progress.update(task_id=self.average_id, visible=False)
            self.overall_progress.update(task_id=self.overall_id, visible=False)

        return results
557f0ea293af5e49dc36c0075f36b5a60d132390
The text was updated successfully, but these errors were encountered:
Better way to prevent progress going backward on task pickup?
Not sure why the task progress is going backwards.
It happens on new task pick up, which I thought we accounted for?
It doesn't seem to be off by much though.
that way we know we're not outpacing ourselves
https://github.com/in03/Resolve-Proxy-Encoder/blob/49285682fbf27e4e30aefce77cfb6d5e56fe93ca/resolve_proxy_encoder/app/broker.py#L272
557f0ea293af5e49dc36c0075f36b5a60d132390
The text was updated successfully, but these errors were encountered: