Skip to content

feat[langsmith]: transparent handoff between @traceable HOF and LangChain #5339

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file not shown.
Binary file not shown.
Binary file not shown.
2 changes: 1 addition & 1 deletion examples/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
"ioredis": "^5.3.2",
"js-yaml": "^4.1.0",
"langchain": "workspace:*",
"langsmith": "^0.1.1",
"langsmith": "^0.1.30",
"ml-distance": "^4.0.0",
"mongodb": "^6.3.0",
"pg": "^8.11.0",
Expand Down
2 changes: 1 addition & 1 deletion langchain-core/langchain.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ function abs(relativePath) {
}

export const config = {
internals: [/node\:/, /js-tiktoken/],
internals: [/node\:/, /js-tiktoken/, /langsmith/],
entrypoints: {
agents: "agents",
caches: "caches",
Expand Down
2 changes: 1 addition & 1 deletion langchain-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
"camelcase": "6",
"decamelize": "1.2.0",
"js-tiktoken": "^1.0.12",
"langsmith": "~0.1.7",
"langsmith": "~0.1.30",
"ml-distance": "^4.0.0",
"mustache": "^4.2.0",
"p-queue": "^6.6.2",
Expand Down
10 changes: 8 additions & 2 deletions langchain-core/src/callbacks/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ export class CallbackManager

name = "callback_manager";

public readonly _parentRunId?: string;
public _parentRunId?: string;

constructor(
parentRunId?: string,
Expand Down Expand Up @@ -1010,7 +1010,13 @@ export class CallbackManager
)
) {
if (tracingV2Enabled) {
callbackManager.addHandler(await getTracingV2CallbackHandler(), true);
const tracerV2 = await getTracingV2CallbackHandler();
callbackManager.addHandler(tracerV2, true);

// handoff between langchain and langsmith/traceable
// override the parent run ID
callbackManager._parentRunId =
tracerV2.getTraceableRunTree()?.id ?? callbackManager._parentRunId;
}
}
}
Expand Down
124 changes: 121 additions & 3 deletions langchain-core/src/runnables/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ import { z } from "zod";
import pRetry from "p-retry";
import { v4 as uuidv4 } from "uuid";

import {
type TraceableFunction,
isTraceableFunction,
} from "langsmith/singletons/traceable";
import type { RunnableInterface, RunnableBatchOptions } from "./types.js";
import {
CallbackManager,
Expand Down Expand Up @@ -48,6 +52,7 @@ import {
consumeAsyncIterableInContext,
consumeIteratorInContext,
isAsyncIterable,
isIterableIterator,
isIterator,
} from "./iter.js";

Expand Down Expand Up @@ -2062,6 +2067,91 @@ export class RunnableMap<
}
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
type AnyTraceableFunction = TraceableFunction<(...any: any[]) => any>;

/**
* A runnable that wraps a traced LangSmith function.
*/
export class RunnableTraceable<RunInput, RunOutput> extends Runnable<
RunInput,
RunOutput
> {
lc_serializable = false;

lc_namespace = ["langchain_core", "runnables"];

protected func: AnyTraceableFunction;

constructor(fields: { func: AnyTraceableFunction }) {
super(fields);

if (!isTraceableFunction(fields.func)) {
throw new Error(
"RunnableTraceable requires a function that is wrapped in traceable higher-order function"
);
}

this.func = fields.func;
}

async invoke(input: RunInput, options?: Partial<RunnableConfig>) {
const [config] = this._getOptionsList(options ?? {}, 1);
const callbacks = await getCallbackManagerForConfig(config);

return (await this.func(
patchConfig(config, { callbacks }),
input
)) as RunOutput;
}

async *_streamIterator(
input: RunInput,
options?: Partial<RunnableConfig>
): AsyncGenerator<RunOutput> {
const result = await this.invoke(input, options);

if (isAsyncIterable(result)) {
for await (const item of result) {
yield item as RunOutput;
}
return;
}

if (isIterator(result)) {
while (true) {
const state: IteratorResult<unknown> = result.next();
if (state.done) break;
yield state.value as RunOutput;
}
return;
}

yield result;
}

static from(func: AnyTraceableFunction) {
return new RunnableTraceable({ func });
}
}

function assertNonTraceableFunction<RunInput, RunOutput>(
func:
| RunnableFunc<RunInput, RunOutput | Runnable<RunInput, RunOutput>>
| TraceableFunction<
RunnableFunc<RunInput, RunOutput | Runnable<RunInput, RunOutput>>
>
): asserts func is RunnableFunc<
RunInput,
RunOutput | Runnable<RunInput, RunOutput>
> {
if (isTraceableFunction(func)) {
throw new Error(
"RunnableLambda requires a function that is not wrapped in traceable higher-order function. This shouldn't happen."
);
}
}

/**
* A runnable that runs a callable.
*/
Expand All @@ -2081,14 +2171,42 @@ export class RunnableLambda<RunInput, RunOutput> extends Runnable<
>;

constructor(fields: {
func: RunnableFunc<RunInput, RunOutput | Runnable<RunInput, RunOutput>>;
func:
| RunnableFunc<RunInput, RunOutput | Runnable<RunInput, RunOutput>>
| TraceableFunction<
RunnableFunc<RunInput, RunOutput | Runnable<RunInput, RunOutput>>
>;
}) {
if (isTraceableFunction(fields.func)) {
// eslint-disable-next-line no-constructor-return
return RunnableTraceable.from(fields.func) as unknown as RunnableLambda<
RunInput,
RunOutput
>;
}

super(fields);

assertNonTraceableFunction(fields.func);
this.func = fields.func;
}

static from<RunInput, RunOutput>(
func: RunnableFunc<RunInput, RunOutput | Runnable<RunInput, RunOutput>>
): RunnableLambda<RunInput, RunOutput>;

static from<RunInput, RunOutput>(
func: TraceableFunction<
RunnableFunc<RunInput, RunOutput | Runnable<RunInput, RunOutput>>
>
): RunnableLambda<RunInput, RunOutput>;

static from<RunInput, RunOutput>(
func:
| RunnableFunc<RunInput, RunOutput | Runnable<RunInput, RunOutput>>
| TraceableFunction<
RunnableFunc<RunInput, RunOutput | Runnable<RunInput, RunOutput>>
>
): RunnableLambda<RunInput, RunOutput> {
return new RunnableLambda({
func,
Expand Down Expand Up @@ -2141,7 +2259,7 @@ export class RunnableLambda<RunInput, RunOutput> extends Runnable<
}
}
output = finalOutput as typeof output;
} else if (isIterator(output)) {
} else if (isIterableIterator(output)) {
let finalOutput: RunOutput | undefined;
for (const chunk of consumeIteratorInContext(
childConfig,
Expand Down Expand Up @@ -2233,7 +2351,7 @@ export class RunnableLambda<RunInput, RunOutput> extends Runnable<
for await (const chunk of consumeAsyncIterableInContext(config, output)) {
yield chunk as RunOutput;
}
} else if (isIterator(output)) {
} else if (isIterableIterator(output)) {
for (const chunk of consumeIteratorInContext(config, output)) {
yield chunk as RunOutput;
}
Expand Down
10 changes: 9 additions & 1 deletion langchain-core/src/runnables/iter.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { AsyncLocalStorageProviderSingleton } from "../singletons/index.js";
import { RunnableConfig } from "./config.js";

export function isIterator(thing: unknown): thing is IterableIterator<unknown> {
export function isIterableIterator(
thing: unknown
): thing is IterableIterator<unknown> {
return (
typeof thing === "object" &&
thing !== null &&
Expand All @@ -11,6 +13,12 @@ export function isIterator(thing: unknown): thing is IterableIterator<unknown> {
);
}

export const isIterator = (x: unknown): x is Iterator<unknown> =>
x != null &&
typeof x === "object" &&
"next" in x &&
typeof x.next === "function";

export function isAsyncIterable(
thing: unknown
): thing is AsyncIterable<unknown> {
Expand Down
45 changes: 45 additions & 0 deletions langchain-core/src/tracers/tracer_langchain.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import { Client } from "langsmith";
import { RunTree } from "langsmith/run_trees";
import { getCurrentRunTree } from "langsmith/singletons/traceable";

import {
BaseRun,
RunCreate,
Expand Down Expand Up @@ -57,6 +60,40 @@ export class LangChainTracer
getEnvironmentVariable("LANGCHAIN_SESSION");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey there! I noticed that the recent code changes in tracer_langchain.ts explicitly access an environment variable using the getEnvironmentVariable function. I've flagged this for your review to ensure it aligns with our environment variable usage guidelines. Keep up the great work!

this.exampleId = exampleId;
this.client = client ?? new Client({});

// if we're inside traceable, we can obtain the traceable tree
// and populate the run map, which is used to correctly
// infer dotted order and execution order
const traceableTree = this.getTraceableRunTree();
if (traceableTree) {
let rootRun: RunTree = traceableTree;
const visited = new Set<string>();
while (rootRun.parent_run) {
if (visited.has(rootRun.id)) break;
visited.add(rootRun.id);

if (!rootRun.parent_run) break;
rootRun = rootRun.parent_run as RunTree;
}
visited.clear();

const queue = [rootRun];
while (queue.length > 0) {
const current = queue.shift();
if (!current || visited.has(current.id)) continue;
visited.add(current.id);

// @ts-expect-error Types of property 'events' are incompatible.
this.runMap.set(current.id, current);
if (current.child_runs) {
queue.push(...current.child_runs);
}
}

this.client = traceableTree.client ?? this.client;
this.projectName = traceableTree.project_name ?? this.projectName;
this.exampleId = traceableTree.reference_example_id ?? this.exampleId;
}
}

private async _convertToCreate(
Expand Down Expand Up @@ -102,4 +139,12 @@ export class LangChainTracer
getRun(id: string): Run | undefined {
return this.runMap.get(id);
}

getTraceableRunTree(): RunTree | undefined {
try {
return getCurrentRunTree();
} catch {
return undefined;
}
}
}
2 changes: 1 addition & 1 deletion langchain/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -888,7 +888,7 @@
"js-yaml": "^4.1.0",
"jsonpointer": "^5.0.1",
"langchainhub": "~0.0.8",
"langsmith": "~0.1.7",
"langsmith": "~0.1.30",
"ml-distance": "^4.0.0",
"openapi-types": "^12.1.3",
"p-retry": "4",
Expand Down
4 changes: 2 additions & 2 deletions langchain/src/smith/runner_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import {
} from "langsmith";
import { EvaluationResult, RunEvaluator } from "langsmith/evaluation";
import { DataType } from "langsmith/schemas";
import type { TraceableFunction } from "langsmith/traceable";
import type { TraceableFunction } from "langsmith/singletons/traceable";
import { LLMStringEvaluator } from "../evaluation/base.js";
import { loadEvaluator } from "../evaluation/loader.js";
import { EvaluatorType } from "../evaluation/types.js";
Expand Down Expand Up @@ -165,7 +165,7 @@ class CallbackManagerRunTree extends RunTree {
this.callbackManager = callbackManager;
}

async createChild(config: RunTreeConfig): Promise<CallbackManagerRunTree> {
createChild(config: RunTreeConfig): CallbackManagerRunTree {
const child = new CallbackManagerRunTree(
{
...config,
Expand Down
2 changes: 1 addition & 1 deletion libs/langchain-community/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
"flat": "^5.0.2",
"js-yaml": "^4.1.0",
"langchain": "0.2.3",
"langsmith": "~0.1.1",
"langsmith": "~0.1.30",
"uuid": "^9.0.0",
"zod": "^3.22.3",
"zod-to-json-schema": "^3.22.5"
Expand Down
Loading
Loading