Skip to content

Update n8n_pipe.py to support sending image attachments to n8n, inclu… #46

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 168 additions & 14 deletions n8n_pipe.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,32 @@
"""
title: n8n Pipe Function
author: Cole Medin
contributors: Ricardo Leon (leoric-crown)
author_url: https://www.youtube.com/@ColeMedin
version: 0.1.0
version: 0.3.2

This module defines a Pipe class that utilizes N8N for an Agent
This module defines a Pipe class that utilizes N8N for an Agent with support for sending image attachments to n8n
No support for receiving image attachments from n8n.
Metadata: original filename is not preserved, mime and size are sent to n8n.
"""

from typing import Optional, Callable, Awaitable
from pydantic import BaseModel, Field
import os
import time
import requests
import base64
import io
import json

# Mapping of MIME types to file extensions
MIME_TO_EXT = {
"png": "png",
"jpeg": "jpg",
"jpg": "jpg",
"webp": "webp",
"gif": "gif",
}


def extract_event_info(event_emitter) -> tuple[Optional[str], Optional[str]]:
if not event_emitter or not event_emitter.__closure__:
Expand All @@ -23,6 +38,7 @@ def extract_event_info(event_emitter) -> tuple[Optional[str], Optional[str]]:
return chat_id, message_id
return None, None


class Pipe:
class Valves(BaseModel):
n8n_url: str = Field(
Expand Down Expand Up @@ -81,6 +97,7 @@ async def pipe(
__event_emitter__: Callable[[dict], Awaitable[None]] = None,
__event_call__: Callable[[dict], Awaitable[dict]] = None,
) -> Optional[dict]:

await self.emit_status(
__event_emitter__, "info", "/Calling N8N Workflow...", False
)
Expand All @@ -89,25 +106,162 @@ async def pipe(

# Verify a message is available
if messages:
question = messages[-1]["content"]
# Get the most recent user message by iterating in reverse
user_message = None
for msg in reversed(messages):
if msg["role"] == "user":
user_message = msg
break

if user_message and isinstance(user_message["content"], list):
# Extract text content and file attachments
question = ""
processed_attachments = []

for content_item in user_message["content"]:
if content_item["type"] == "text":
question = content_item["text"]
elif (
content_item["type"] == "image_url"
and "url" in content_item["image_url"]
):
url = content_item["image_url"]["url"]
if url.startswith("data:"):
try:
# Parse media type and base64 data
parts = url.split(";base64,")
if len(parts) == 2:
media_type_part, base64_data = parts
media_type = media_type_part.split(":").pop()
else:
print(
f"DEBUG - Skipping attachment: Malformed data URL (no ';base64,'): {url[:100]}..."
)
continue

decoded_data = base64.b64decode(base64_data)
file_size = len(decoded_data)
ext = MIME_TO_EXT.get(media_type.split("/")[-1], "bin")
filename = (
f"attachment_{len(processed_attachments)}.{ext}"
)

processed_attachments.append(
{
"filename": filename,
"mimetype": media_type,
"size": file_size,
"decoded_data": decoded_data,
}
)
except base64.binascii.Error as e:
raise Exception(
f"Base64 decode error: {e}. URL: {url[:100]}..."
)
except Exception as e:
print(f"Error processing: {e}")
else:
raise Exception(
f"Unsupported image attachment type: `content_item['type']` === {content_item['type']}"
)
else:
# Handle plain text content (backward compatibility)
question = user_message["content"] if user_message else ""
processed_attachments = []

try:
# Invoke N8N workflow
headers = {
"Authorization": f"Bearer {self.valves.n8n_bearer_token}",
"Content-Type": "application/json",
}
payload = {"sessionId": f"{chat_id}"}
payload[self.valves.input_field] = question
response = requests.post(
self.valves.n8n_url, json=payload, headers=headers
)

if processed_attachments:
print("DEBUG - Preparing multipart request")
# 1. Prepare metadata for 'filesInput' field
filesInput_metadata = [
{
"filename": att["filename"],
"mimetype": att["mimetype"],
"size": att["size"],
}
for att in processed_attachments
]

# 2. Prepare form data fields
form_data = {
"sessionId": f"{chat_id}",
self.valves.input_field: question,
"filesInput": json.dumps(filesInput_metadata),
}
print(f"DEBUG - FORM DATA: {form_data}")

# 3. Prepare files for upload
files_for_upload = {}
for i, attachment_info in enumerate(processed_attachments):
file_obj = io.BytesIO(attachment_info["decoded_data"])
# Use a consistent key like 'file_0', 'file_1' etc. n8n might expect this.
# Or use the actual filename if n8n handles it. Let's use 'file_i' for now.
file_key = f"file_{i}"
files_for_upload[file_key] = (
attachment_info["filename"],
file_obj,
attachment_info["mimetype"],
)
print(
f"DEBUG - Added file to upload: Key='{file_key}', Filename='{attachment_info['filename']}', Mimetype='{attachment_info['mimetype']}'"
)

# 4. Send multipart request
# NOTE: requests sets Content-Type automatically for multipart
response = requests.post(
self.valves.n8n_url,
data=form_data,
files=files_for_upload,
headers=headers,
)
else:
# No attachments, send standard JSON request
payload = {
"sessionId": f"{chat_id}",
self.valves.input_field: question,
}
headers["Content-Type"] = "application/json"
response = requests.post(
self.valves.n8n_url, json=payload, headers=headers
)

if response.status_code == 200:
n8n_response = response.json()[self.valves.response_field]
response_data = response.json()

# Handle case where n8n returns a list
if isinstance(response_data, list) and len(response_data) > 0:
# Assume the first item contains the relevant data
n8n_item_data = response_data[0]
elif isinstance(response_data, dict):
# n8n returns a dictionary
n8n_item_data = response_data
else:
# Unexpected response format
raise Exception(
f"Unexpected n8n response format: {type(response_data)}"
)

# Extract the main response content
n8n_response = n8n_item_data.get(
self.valves.response_field, "Error: Response field not found"
)

assistant_message = {"role": "assistant"}

if "content" in n8n_item_data:
assistant_message["content"] = n8n_item_data["content"]
else:
assistant_message["content"] = n8n_response

body["messages"].append(assistant_message)

else:
raise Exception(f"Error: {response.status_code} - {response.text}")

# Set assitant message with chain reply
body["messages"].append({"role": "assistant", "content": n8n_response})
except Exception as e:
await self.emit_status(
__event_emitter__,
Expand Down