Skip to content

feat: Log Signal Exp Config and Monitoring #9947

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 27 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions e2e_tests/tests/cluster/test_log_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from determined.experimental import client
from tests import api_utils
from tests import experiment as exp
from tests.cluster import utils
from tests.experiment import noop


Expand Down Expand Up @@ -152,3 +153,33 @@ def test_log_policy_exclude_slurm(should_match: bool) -> None:
) # Job fails to start up the second restart since all nodes are excluded.
else:
assert times_ran == 2


@pytest.mark.e2e_cpu
@pytest.mark.parametrize("should_match", [True, False])
def test_log_policy_matched(should_match: bool) -> None:
sess = api_utils.user_session()
regex = r"executing.*action.*exit.*code.*7"
if not should_match:
regex = r"(.*) this should not match (.*)"

expected_policy = "Test"
config = {
"log_policies": [{"name": expected_policy, "pattern": regex}],
}

exp_ref = noop.create_experiment(sess, [noop.Exit(7)], config=config)
assert exp_ref.wait(interval=0.01) == client.ExperimentState.ERROR

searchRes = utils.get_run_by_exp_id(sess, exp_ref.id)
runPolicyMatched = searchRes.runs[0].logPolicyMatched

trialRes = bindings.get_GetTrial(sess, trialId=searchRes.runs[0].id)
trialPolicyMatched = trialRes.trial.logPolicyMatched

if should_match:
assert runPolicyMatched == expected_policy
assert trialPolicyMatched == expected_policy
else:
assert runPolicyMatched is None
assert trialPolicyMatched is None
28 changes: 28 additions & 0 deletions e2e_tests/tests/cluster/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from typing_extensions import Literal # noqa:I2041

from determined.common import api
from determined.common.api import bindings
from tests import command
from tests import config as conf
from tests import detproc
Expand Down Expand Up @@ -197,3 +198,30 @@ def set_master_port(config: str) -> None:
lc = conf.load_config(config_path=config)
port = get_master_port(lc)
conf.MASTER_PORT = port


def get_run_by_exp_id(sess: api.Session, exp_id: int) -> bindings.v1SearchRunsResponse:
return bindings.post_SearchRuns(
sess,
body=bindings.v1SearchRunsRequest(
limit=1,
filter="""{
"filterGroup": {
"children": [
{
"columnName": "experimentId",
"kind": "field",
"location": "LOCATION_TYPE_RUN",
"operator": "=",
"type": "COLUMN_TYPE_NUMBER",
"value": %s
}
],
"conjunction": "and",
"kind": "group"
},
"showArchived": false
}"""
% exp_id,
),
)
32 changes: 16 additions & 16 deletions harness/determined/common/api/bindings.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion master/internal/api_experiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -2741,7 +2741,7 @@ func (a *apiServer) SearchExperiments(
Column("searcher_metric_value").
Column("trials.external_trial_id").
ColumnExpr("nullif(trials.metadata, 'null') as metadata").
ColumnExpr("NULL as log_signal").
ColumnExpr("NULL as log_policy_matched").
Join("LEFT JOIN validations bv ON trials.best_validation_id = bv.id").
Join("LEFT JOIN validations lv ON trials.latest_validation_id = lv.id").
Join("LEFT JOIN checkpoints_v2 new_ckpt ON new_ckpt.id = trials.warm_start_checkpoint_id").
Expand Down
2 changes: 1 addition & 1 deletion master/internal/api_runs.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func getRunsColumns(q *bun.SelectQuery) *bun.SelectQuery {
'pachyderm_integration', NULLIF(e.config#>'{integrations,pachyderm}', 'null'),
'id', e.id) AS experiment`).
ColumnExpr("rm.metadata AS metadata").
ColumnExpr("r.log_signal AS log_signal").
ColumnExpr("r.log_policy_matched AS log_policy_matched").
Join("LEFT JOIN experiments AS e ON r.experiment_id=e.id").
Join("LEFT JOIN runs_metadata AS rm ON r.id=rm.run_id").
Join("LEFT JOIN users u ON e.owner_id = u.id").
Expand Down
85 changes: 79 additions & 6 deletions master/internal/api_tasks_intg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,12 +219,14 @@ func TestPostTaskLogsLogPattern(t *testing.T) {
activeConfig, err := api.m.db.ActiveExperimentConfig(trial.ExperimentID)
require.NoError(t, err)
activeConfig.RawLogPolicies = expconf.LogPoliciesConfig{
expconf.LogPolicy{RawPattern: "sub", RawAction: expconf.LogAction{
RawCancelRetries: &expconf.LogActionCancelRetries{},
}},
expconf.LogPolicy{RawPattern: `\d{5}$`, RawAction: expconf.LogAction{
RawExcludeNode: &expconf.LogActionExcludeNode{},
}},
expconf.LogPolicy{
RawPattern: ptrs.Ptr("sub"),
RawAction: &expconf.LogActionV0{Type: expconf.LogActionTypeCancelRetries},
},
expconf.LogPolicy{
RawPattern: ptrs.Ptr(`\d{5}$`),
RawAction: &expconf.LogActionV0{Type: expconf.LogActionTypeExcludeNode},
},
}

v, err := json.Marshal(activeConfig)
Expand Down Expand Up @@ -443,3 +445,74 @@ func TestGetAllocationAcceleratorData(t *testing.T) {
require.Equal(t, resp.AcceleratorData[0].ResourcePool,
a1.ResourcePool, "failed to get the correct allocation's resource pool data")
}

func TestPostTaskLogsLogSignalDataSaving(t *testing.T) {
api, curUser, ctx := setupAPITest(t, nil)
trial, task := createTestTrial(t, api, curUser)

activeConfig, err := api.m.db.ActiveExperimentConfig(trial.ExperimentID)
require.NoError(t, err)

activeConfig.RawLogPolicies = expconf.LogPoliciesConfig{
expconf.LogPolicy{
RawName: ptrs.Ptr("test"),
RawPattern: ptrs.Ptr("sub"),
},
}

v, err := json.Marshal(activeConfig)
require.NoError(t, err)

var m map[string]any
require.NoError(t, json.Unmarshal(v, &m))

_, err = db.Bun().NewUpdate().Table("experiments").
Where("id = ?", trial.ExperimentID).
Set("config = ?", m).
Exec(ctx)
require.NoError(t, err)

_, err = api.PostTaskLogs(ctx, &apiv1.PostTaskLogsRequest{
Logs: []*taskv1.TaskLog{
{
TaskId: string(task.TaskID),
AgentId: ptrs.Ptr("a1"),
Log: "stringsubstring",
},
{
TaskId: string(task.TaskID),
AgentId: ptrs.Ptr("a1"),
Log: "12345",
},
},
})
require.NoError(t, err)

runsOut := struct {
bun.BaseModel `bun:"table:runs"`
LogPolicyMatched *string `db:"log_policy_matched"`
}{}

err = db.Bun().NewSelect().Model(&runsOut).
Where("id = ?", trial.ID).
Scan(ctx)
require.NoError(t, err)
require.NotNil(t, runsOut)
require.NotNil(t, runsOut.LogPolicyMatched)

require.Equal(t, "test", *runsOut.LogPolicyMatched)

tasksOut := struct {
bun.BaseModel `bun:"table:tasks"`
LogPolicyMatched *string `db:"log_policy_matched"`
}{}
err = db.Bun().NewSelect().Model(&tasksOut).
Join("LEFT JOIN run_id_task_id AS rt on tasks.task_id = rt.task_id").
Where("run_id = ?", trial.ID).
Scan(ctx)
require.NoError(t, err)
require.NotNil(t, tasksOut)
require.NotNil(t, tasksOut.LogPolicyMatched)

require.Equal(t, "test", *tasksOut.LogPolicyMatched)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,7 @@ const (
// DefaultInvariantConfigStr is the default invariant config val used for tests.
DefaultInvariantConfigStr = `{
"description": "random description",
"resources": {"slots": 4, "max_slots": 8},
"log_policies": [
{
"pattern": "nonrepeat"
}
]
"resources": {"slots": 4, "max_slots": 8}
}`
// DefaultConstraintsStr is the default constraints val used for tests.
DefaultConstraintsStr = `{"priority_limit": 10, "resources": {"max_slots": 8}}`
Expand Down
13 changes: 8 additions & 5 deletions master/internal/configpolicy/task_config_policy_intg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,11 +470,6 @@ func TestMergeWithInvariantExperimentConfigs(t *testing.T) {
"read_only": true,
"propagation": "cluster-wide"
}
],
"log_policies": [
{
"pattern": "nonrepeat"
}
]
}`

Expand Down Expand Up @@ -672,6 +667,7 @@ func testMergeSlicesAndMaps(t *testing.T) {
},
"log_policies": [
{
"name": "nonrepeat policy",
"pattern": "nonrepeat"
}
]
Expand Down Expand Up @@ -711,6 +707,7 @@ func testMergeSlicesAndMaps(t *testing.T) {
},
"log_policies": [
{
"name": "repeat policy",
"pattern": "repeat"
}
]
Expand Down Expand Up @@ -778,9 +775,11 @@ func testMergeSlicesAndMaps(t *testing.T) {
},
"log_policies": [
{
"name": "nonrepeat policy",
"pattern": "nonrepeat"
},
{
"name": "repeat policy",
"pattern": "repeat"
}
]
Expand Down Expand Up @@ -820,6 +819,7 @@ func testMergeSlicesAndMaps(t *testing.T) {
},
"log_policies": [
{
"name": "global repeat policy",
"pattern": "gloablrepeat"
}
]
Expand Down Expand Up @@ -903,12 +903,15 @@ func testMergeSlicesAndMaps(t *testing.T) {
},
"log_policies": [
{
"name": "nonrepeat policy",
"pattern": "nonrepeat"
},
{
"name": "repeat policy",
"pattern": "repeat"
},
{
"name": "global repeat policy",
"pattern": "gloablrepeat"
}
]
Expand Down
Loading
Loading