Skip to content

[QUESTION] When queuing tasks, needing object references to the actual tasks/workflow makes it very difficult to use in DI frameworks #1581

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
tolgap opened this issue Apr 20, 2025 · 3 comments · May be fixed by #1582
Labels
enhancement New feature or request

Comments

@tolgap
Copy link

tolgap commented Apr 20, 2025

Given a simple task:

export type SimpleInput = {
  Message: string;
};

export const simple = hatchet.task({
  name: 'simple',
  fn: (input: SimpleInput) => {
    return {
      TransformedMessage: input.Message.toLowerCase(),
    };
  },
});

Providing this task in a Dependency Injection framework is easy.

But when using multiple processes it gets tricky. Lets say:

Process A: Node web framework that uses the tasks call `.run` or `.runNoWait` etc.
Process B: Node worker process that creates the workers and runs the tasks

If you are using a Dependency Injection framework (NestJS in my case), it becomes very frustrating to deal with the fact that the worker process needs object references to the Hatchet tasks. If these Hatchet Tasks have been modified to be provided by a DI container, the fn of these tasks is not available statically somewhere. It gets created when the DI container is initializing.

In the worker process, I would be forced to initialize the same DI dependencies and tasks in order to receive the entire Hatchet Task object with the correct fn. This in turn would mean that Process A initializes the tasks, and Process B initializes them as well.

The part I'm having trouble with is having to also then initialize all DI dependencies in Process A and B just to: queue the tasks, and to run the tasks.

Any pointers on how I should deal with this?

@tolgap tolgap added the enhancement New feature or request label Apr 20, 2025
@tolgap tolgap changed the title [QUESTION] When creating a worker, needing object references to the actual tasks/workflow makes it very difficult to use in DI frameworks [QUESTION] When queuing tasks, needing object references to the actual tasks/workflow makes it very difficult to use in DI frameworks Apr 20, 2025
@grutt grutt linked a pull request Apr 20, 2025 that will close this issue
1 task
@grutt
Copy link
Contributor

grutt commented Apr 20, 2025

Hi @tolgap thanks for the question.

Can you help me understand:

If these Hatchet Tasks have been modified to be provided by a DI container

I've stood up the most basic nestjs integration #1582 but this is completely bypassing di at the moment.

@grutt
Copy link
Contributor

grutt commented Apr 20, 2025

I think I actually have a simple pattern here:

Modify task to be a factory

import { HatchetClient } from '@hatchet-dev/typescript-sdk';

export type SimpleInput = {
  Message: string;
};

export const simple = (client: HatchetClient) =>
  client.task({
    name: 'simple',
    fn: (input: SimpleInput) => {
      return {
        TransformedMessage: input.Message.toLowerCase(),
      };
    },
  });

Create a hatchet provider to init the client and bind it to tasks:

import { Injectable } from '@nestjs/common';
import { HatchetClient } from '@hatchet-dev/typescript-sdk';
import { simple } from './tasks';

@Injectable()
export class HatchetService {
  private hatchet: HatchetClient;

  constructor() {
    this.hatchet = HatchetClient.init();
  }

  get simple() {
    return simple(this.hatchet);
  }
}

You can then use the provider as expected

import { Controller, Get } from '@nestjs/common';
import { HatchetService } from './hatchet.service';

@Controller()
export class AppController {
  constructor(private readonly hatchet: HatchetService) {}

  @Get()
  async getHello(): Promise<string> {
    const result = await this.hatchet.simple.run({ Message: 'Hello, world!' });
    return result.TransformedMessage;
  }
}

And you can bind the client at worker start

import { simple } from './tasks/simple';
import { HatchetClient } from '@hatchet-dev/typescript-sdk';

async function main() {
  const hatchet = HatchetClient.init();

  const worker = await hatchet.worker('simple-worker', {
    workflows: [simple].map((task) => task(hatchet)),
  });

  await worker.start();
}

if (require.main === module) {
  void main();
}

Would love feedback here, I eventually want more native nestjs support

@tolgap
Copy link
Author

tolgap commented Apr 20, 2025

Hi @grutt thanks for your messages!

The suggestion you provided has some issues:

  • As you said, Task itself is not in the DI container, and you want to be able to inject task specific dependencies into the task.fn
  • It would not be possible to have REQUEST scoped Tasks
  • The worker is not setup as a NestJS application. If you setup the Task to be in the DI container and the task.fn is receiving DI dependencies, then the worker itself also has to be a NestJS application

I'm working on a package to provide a NestJS wrapper for Hatchet, but the SDK design seems hard to implement in NestJS.

If you're curious about how this would work in NestJS, here's some explanation code:

// types.ts
import { type CreateTaskWorkflowOpts } from "@hatchet-dev/typescript-sdk";

export type HatchetTaskOptions = Omit<CreateTaskWorkflowOpts, "fn">;
// constants.ts
import { DiscoveryService } from "@nestjs/core";
import { type HatchetTaskOptions } from "./types/hatchet-task-options.type";

export const HatchetTask =
  DiscoveryService.createDecorator<HatchetTaskOptions>();
// my.task.ts
import { Logger } from "@nestjs/common";
import { HatchetTask, HatchetWorkflowHost } from "nestjs-hatchet"; // <-- I'm working on this

export type MyInput = {
  message: string
}

@HatchetTask({ name: 'my-task' })
export class MyTask extends HatchetWorkflowHost<MyInput> {
  private readonly logger = new Logger(BookTask.name);

  public async perform(input: MyInput): Promise<void> {
    this.logger.log(`Input: ${JSON.stringify(input)}`);

    // Simulate some processing
    await new Promise((resolve) => setTimeout(resolve, 25));

    this.logger.log("My task completed");
  }
}

Finally we have an Explorer class that hooks into NestJS lifecycle to setup the tasks we define using the @HatchetTask decorator.

// hatchet.explorer.ts
@Injectable()
export class HatchetExplorer implements OnModuleInit {
  private readonly logger = new Logger(HatchetExplorer.name);

  constructor(
    private readonly discoveryService: DiscoveryService,
    private readonly client: HatchetClient,
  ) {}

  onModuleInit() {
    this.discoverTasks();
  }

  private discoverTasks() {
    const providers = this.discoveryService.getProviders({
      metadataKey: HatchetTask.KEY,
    });

    for (const wrapper of providers) {
      const { instance } = wrapper;
      if (!(instance instanceof HatchetWorkflowHost)) {
        throw new Error(
          `Hatchet Tasks should extend HatchetWorkflowHost class. Check ${instance.constructor?.name} class please.`,
        );
      }

      const taskFunction = instance["perform"].bind(instance);

      const taskOptions = this.discoveryService.getMetadataByDecorator(
        HatchetTask,
        wrapper,
      );

      const task = this.client.task({
        ...taskOptions,
        fn: taskFunction,
      });
      instance.hatchetTask = task
    }
  }
}

Doing it this way, I can create NestJS provider classes that can define their own DI dependencies and use it just like any other @Injectable class.

The tricky part is: if I need to enqueue a task, I always have to inject the entire MyTask, instead of having some kind of a Queue class that lives separate from the task that needs to be executed.

If we could, for instance, use the hatchetClient to enqueue tasks without having to reference (and thus inject the Task itself), it would solve the problems I'm having architectually.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants