Skip to content

[7.16] Removing perf_hooks from task manager (#117294) #117331

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 0 additions & 16 deletions x-pack/plugins/task_manager/server/lib/fill_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
* 2.0.
*/

import { performance } from 'perf_hooks';
import { Observable } from 'rxjs';
import { concatMap, last } from 'rxjs/operators';
import { ClaimOwnershipResult } from '../queries/task_claiming';
Expand Down Expand Up @@ -57,7 +56,6 @@ export async function fillPool(
converter: (taskInstance: ConcreteTaskInstance) => TaskManagerRunner,
run: (tasks: TaskManagerRunner[]) => Promise<TaskPoolRunResult>
): Promise<TimedFillPoolResult> {
performance.mark('fillPool.start');
return new Promise((resolve, reject) => {
const stopTaskTimer = startTaskTimer();
const augmentTimingTo = (
Expand All @@ -76,12 +74,6 @@ export async function fillPool(
res,
async ({ docs, stats }) => {
if (!docs.length) {
performance.mark('fillPool.bailNoTasks');
performance.measure(
'fillPool.activityDurationUntilNoTasks',
'fillPool.start',
'fillPool.bailNoTasks'
);
return asOk({ result: TaskPoolRunResult.NoTaskWereRan, stats });
}
return asOk(
Expand All @@ -106,20 +98,12 @@ export async function fillPool(
({ result, stats }) => {
switch (result) {
case TaskPoolRunResult.RanOutOfCapacity:
performance.mark('fillPool.bailExhaustedCapacity');
performance.measure(
'fillPool.activityDurationUntilExhaustedCapacity',
'fillPool.start',
'fillPool.bailExhaustedCapacity'
);
return augmentTimingTo(FillPoolResult.RanOutOfCapacity, stats);
case TaskPoolRunResult.RunningAtCapacity:
performance.mark('fillPool.cycle');
return augmentTimingTo(FillPoolResult.RunningAtCapacity, stats);
case TaskPoolRunResult.NoTaskWereRan:
return augmentTimingTo(FillPoolResult.NoTasksClaimed, stats);
default:
performance.mark('fillPool.cycle');
return augmentTimingTo(FillPoolResult.PoolFilled, stats);
}
},
Expand Down
14 changes: 0 additions & 14 deletions x-pack/plugins/task_manager/server/polling/task_poller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
* This module contains the logic for polling the task manager index for new work.
*/

import { performance } from 'perf_hooks';
import { after } from 'lodash';
import { Subject, merge, of, Observable, combineLatest, timer } from 'rxjs';
import { mapTo, filter, scan, concatMap, tap, catchError, switchMap } from 'rxjs/operators';

Expand Down Expand Up @@ -113,7 +111,6 @@ export function createTaskPoller<T, H>({
// take as many argumented calls as we have capacity for and call `work` with
// those arguments. If the queue is empty this will still trigger work to be done
concatMap(async (set: Set<T>) => {
closeSleepPerf();
return mapResult<H, Error, Result<H, PollingError<T>>>(
await promiseResult<H, Error>(
timeoutPromiseAfter<H, Error>(
Expand All @@ -126,7 +123,6 @@ export function createTaskPoller<T, H>({
(err: Error) => asPollingError<T>(err, PollingErrorType.WorkError)
);
}),
tap(openSleepPerf),
// catch errors during polling for work
catchError((err: Error) => of(asPollingError<T>(err, PollingErrorType.WorkError)))
);
Expand Down Expand Up @@ -177,13 +173,3 @@ export class PollingError<T> extends Error {
this.data = data;
}
}

const openSleepPerf = () => {
performance.mark('TaskPoller.sleep');
};
// we only want to close after an open has been called, as we're counting the time *between* work cycles
// so we'll ignore the first call to `closeSleepPerf` but we will run every subsequent call
const closeSleepPerf = after(2, () => {
performance.mark('TaskPoller.poll');
performance.measure('TaskPoller.sleepDuration', 'TaskPoller.sleep', 'TaskPoller.poll');
});
5 changes: 0 additions & 5 deletions x-pack/plugins/task_manager/server/task_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
*/
import { Observable, Subject } from 'rxjs';
import moment, { Duration } from 'moment';
import { performance } from 'perf_hooks';
import { padStart } from 'lodash';
import { Logger } from '../../../../src/core/server';
import { TaskRunner } from './task_running';
Expand Down Expand Up @@ -111,7 +110,6 @@ export class TaskPool {
public run = async (tasks: TaskRunner[]): Promise<TaskPoolRunResult> => {
const [tasksToRun, leftOverTasks] = partitionListByCount(tasks, this.availableWorkers);
if (tasksToRun.length) {
performance.mark('attemptToRun_start');
await Promise.all(
tasksToRun
.filter((taskRunner) => !this.tasksInPool.has(taskRunner.id))
Expand All @@ -130,9 +128,6 @@ export class TaskPool {
.catch((err) => this.handleFailureOfMarkAsRunning(taskRunner, err));
})
);

performance.mark('attemptToRun_stop');
performance.measure('taskPool.attemptToRun', 'attemptToRun_start', 'attemptToRun_stop');
}

if (leftOverTasks.length) {
Expand Down
13 changes: 0 additions & 13 deletions x-pack/plugins/task_manager/server/task_running/task_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

import apm from 'elastic-apm-node';
import { withSpan } from '@kbn/apm-utils';
import { performance } from 'perf_hooks';
import { identity, defaults, flow } from 'lodash';
import {
Logger,
Expand Down Expand Up @@ -313,7 +312,6 @@ export class TaskManagerRunner implements TaskRunner {
}`
);
}
performance.mark('markTaskAsRunning_start');

const apmTrans = apm.startTransaction('taskManager', 'taskManager markTaskAsRunning');

Expand Down Expand Up @@ -372,12 +370,10 @@ export class TaskManagerRunner implements TaskRunner {
}

if (apmTrans) apmTrans.end('success');
performanceStopMarkingTaskAsRunning();
this.onTaskEvent(asTaskMarkRunningEvent(this.id, asOk(this.instance.task)));
return true;
} catch (error) {
if (apmTrans) apmTrans.end('failure');
performanceStopMarkingTaskAsRunning();
this.onTaskEvent(asTaskMarkRunningEvent(this.id, asErr(error)));
if (!SavedObjectsErrorHelpers.isConflictError(error)) {
if (!SavedObjectsErrorHelpers.isNotFoundError(error)) {
Expand Down Expand Up @@ -617,15 +613,6 @@ function howManyMsUntilOwnershipClaimExpires(ownershipClaimedUntil: Date | null)
return ownershipClaimedUntil ? ownershipClaimedUntil.getTime() - Date.now() : 0;
}

function performanceStopMarkingTaskAsRunning() {
performance.mark('markTaskAsRunning_stop');
performance.measure(
'taskRunner.markTaskAsRunning',
'markTaskAsRunning_start',
'markTaskAsRunning_stop'
);
}

// A type that extracts the Instance type out of TaskRunningStage
// This helps us to better communicate to the developer what the expected "stage"
// in a specific place in the code might be
Expand Down