Skip to content

feat: full support for opendal and sync configurations between .env and docker-compose #11754

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions api/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,8 @@ DB_DATABASE=dify
STORAGE_TYPE=opendal

# Apache OpenDAL storage configuration, refer to https://github.com/apache/opendal
STORAGE_OPENDAL_SCHEME=fs
# OpenDAL FS
OPENDAL_SCHEME=fs
OPENDAL_FS_ROOT=storage
# OpenDAL S3
OPENDAL_S3_ROOT=/
OPENDAL_S3_BUCKET=your-bucket-name
OPENDAL_S3_ENDPOINT=https://s3.amazonaws.com
OPENDAL_S3_ACCESS_KEY_ID=your-access-key
OPENDAL_S3_SECRET_ACCESS_KEY=your-secret-key
OPENDAL_S3_REGION=your-region
OPENDAL_S3_SERVER_SIDE_ENCRYPTION=

# S3 Storage configuration
S3_USE_AWS_MANAGED_IAM=false
Expand Down
10 changes: 0 additions & 10 deletions api/app_factory.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import logging
import os
import time

from configs import dify_config
Expand All @@ -17,15 +16,6 @@ def create_flask_app_with_configs() -> DifyApp:
dify_app = DifyApp(__name__)
dify_app.config.from_mapping(dify_config.model_dump())

# populate configs into system environment variables
for key, value in dify_app.config.items():
if isinstance(value, str):
os.environ[key] = value
elif isinstance(value, int | float | bool):
os.environ[key] = str(value)
elif value is None:
os.environ[key] = ""

return dify_app


Expand Down
46 changes: 2 additions & 44 deletions api/configs/middleware/storage/opendal_storage_config.py
Original file line number Diff line number Diff line change
@@ -1,51 +1,9 @@
from enum import StrEnum
from typing import Literal

from pydantic import Field
from pydantic_settings import BaseSettings


class OpenDALScheme(StrEnum):
FS = "fs"
S3 = "s3"


class OpenDALStorageConfig(BaseSettings):
STORAGE_OPENDAL_SCHEME: str = Field(
default=OpenDALScheme.FS.value,
OPENDAL_SCHEME: str = Field(
default="fs",
description="OpenDAL scheme.",
)
# FS
OPENDAL_FS_ROOT: str = Field(
default="storage",
description="Root path for local storage.",
)
# S3
OPENDAL_S3_ROOT: str = Field(
default="/",
description="Root path for S3 storage.",
)
OPENDAL_S3_BUCKET: str = Field(
default="",
description="S3 bucket name.",
)
OPENDAL_S3_ENDPOINT: str = Field(
default="https://s3.amazonaws.com",
description="S3 endpoint URL.",
)
OPENDAL_S3_ACCESS_KEY_ID: str = Field(
default="",
description="S3 access key ID.",
)
OPENDAL_S3_SECRET_ACCESS_KEY: str = Field(
default="",
description="S3 secret access key.",
)
OPENDAL_S3_REGION: str = Field(
default="",
description="S3 region.",
)
OPENDAL_S3_SERVER_SIDE_ENCRYPTION: Literal["aws:kms", ""] = Field(
default="",
description="S3 server-side encryption.",
)
92 changes: 6 additions & 86 deletions api/extensions/ext_storage.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import logging
from collections.abc import Callable, Generator, Mapping
from collections.abc import Callable, Generator
from typing import Union

from flask import Flask

from configs import dify_config
from configs.middleware.storage.opendal_storage_config import OpenDALScheme
from dify_app import DifyApp
from extensions.storage.base_storage import BaseStorage
from extensions.storage.storage_type import StorageType
Expand All @@ -23,21 +22,17 @@ def init_app(self, app: Flask):
def get_storage_factory(storage_type: str) -> Callable[[], BaseStorage]:
match storage_type:
case StorageType.S3:
from extensions.storage.opendal_storage import OpenDALStorage
from extensions.storage.aws_s3_storage import AwsS3Storage

kwargs = _load_s3_storage_kwargs()
return lambda: OpenDALStorage(scheme=OpenDALScheme.S3, **kwargs)
return AwsS3Storage
case StorageType.OPENDAL:
from extensions.storage.opendal_storage import OpenDALStorage

scheme = OpenDALScheme(dify_config.STORAGE_OPENDAL_SCHEME)
kwargs = _load_opendal_storage_kwargs(scheme)
return lambda: OpenDALStorage(scheme=scheme, **kwargs)
return lambda: OpenDALStorage(dify_config.OPENDAL_SCHEME)
case StorageType.LOCAL:
from extensions.storage.opendal_storage import OpenDALStorage

kwargs = _load_local_storage_kwargs()
return lambda: OpenDALStorage(scheme=OpenDALScheme.FS, **kwargs)
return lambda: OpenDALStorage(scheme="fs", root=dify_config.STORAGE_LOCAL_PATH)
case StorageType.AZURE_BLOB:
from extensions.storage.azure_blob_storage import AzureBlobStorage

Expand Down Expand Up @@ -75,7 +70,7 @@ def get_storage_factory(storage_type: str) -> Callable[[], BaseStorage]:

return SupabaseStorage
case _:
raise ValueError(f"Unsupported storage type {storage_type}")
raise ValueError(f"unsupported storage type {storage_type}")

def save(self, filename, data):
try:
Expand Down Expand Up @@ -130,81 +125,6 @@ def delete(self, filename):
raise e


def _load_s3_storage_kwargs() -> Mapping[str, str]:
    """
    Build the OpenDAL S3 operator kwargs from the ``S3_*`` values in dify_config.

    Settings whose value is not a string are dropped. Extra kwargs are then
    layered on for AWS managed IAM and for Cloudflare R2 endpoints; in both
    cases a key that is already present is left untouched.
    """
    candidates = {
        "root": "/",
        "bucket": dify_config.S3_BUCKET_NAME,
        "endpoint": dify_config.S3_ENDPOINT,
        "access_key_id": dify_config.S3_ACCESS_KEY,
        "secret_access_key": dify_config.S3_SECRET_KEY,
        "region": dify_config.S3_REGION,
    }
    # Keep only settings that actually carry a string value.
    s3_kwargs = {name: value for name, value in candidates.items() if isinstance(value, str)}

    # For AWS managed IAM
    if dify_config.S3_USE_AWS_MANAGED_IAM:
        from extensions.storage.opendal_storage import S3_SSE_WITH_AWS_MANAGED_IAM_KWARGS

        logger.debug("Using AWS managed IAM role for S3")
        for name, value in S3_SSE_WITH_AWS_MANAGED_IAM_KWARGS.items():
            s3_kwargs.setdefault(name, value)

    # For Cloudflare R2
    endpoint = s3_kwargs.get("endpoint")
    if endpoint:
        from extensions.storage.opendal_storage import S3_R2_COMPATIBLE_KWARGS, is_r2_endpoint

        if is_r2_endpoint(endpoint):
            logger.debug("Using R2 for OpenDAL S3")
            for name, value in S3_R2_COMPATIBLE_KWARGS.items():
                s3_kwargs.setdefault(name, value)

    return s3_kwargs


def _load_local_storage_kwargs() -> Mapping[str, str]:
    """
    Build the kwargs for local storage: a single ``root`` entry taken from
    ``dify_config.STORAGE_LOCAL_PATH``.
    """
    local_root = dify_config.STORAGE_LOCAL_PATH
    return {"root": local_root}


def _load_opendal_storage_kwargs(scheme: OpenDALScheme) -> Mapping[str, str]:
"""
Load the kwargs for OpenDAL storage based on the given scheme.
"""
match scheme:
case OpenDALScheme.FS:
kwargs = {
"root": dify_config.OPENDAL_FS_ROOT,
}
case OpenDALScheme.S3:
# Load OpenDAL S3-related configs
kwargs = {
"root": dify_config.OPENDAL_S3_ROOT,
"bucket": dify_config.OPENDAL_S3_BUCKET,
"endpoint": dify_config.OPENDAL_S3_ENDPOINT,
"access_key_id": dify_config.OPENDAL_S3_ACCESS_KEY_ID,
"secret_access_key": dify_config.OPENDAL_S3_SECRET_ACCESS_KEY,
"region": dify_config.OPENDAL_S3_REGION,
}

# For Cloudflare R2
if kwargs.get("endpoint"):
from extensions.storage.opendal_storage import S3_R2_COMPATIBLE_KWARGS, is_r2_endpoint

if is_r2_endpoint(kwargs["endpoint"]):
logger.debug("Using R2 for OpenDAL S3")
kwargs = {**kwargs, **{k: v for k, v in S3_R2_COMPATIBLE_KWARGS.items() if k not in kwargs}}
case _:
logger.warning(f"Unrecognized OpenDAL scheme: {scheme}, will fall back to default.")
kwargs = {}
return kwargs


storage = Storage()


Expand Down
62 changes: 40 additions & 22 deletions api/extensions/storage/opendal_storage.py
Original file line number Diff line number Diff line change
@@ -1,46 +1,57 @@
import logging
import os
from collections.abc import Generator
from pathlib import Path
from urllib.parse import urlparse

import opendal
from dotenv import dotenv_values

from configs.middleware.storage.opendal_storage_config import OpenDALScheme
from extensions.storage.base_storage import BaseStorage

S3_R2_HOSTNAME = "r2.cloudflarestorage.com"
S3_R2_COMPATIBLE_KWARGS = {
"delete_max_size": "700",
"disable_stat_with_override": "true",
"region": "auto",
}
S3_SSE_WITH_AWS_MANAGED_IAM_KWARGS = {
"server_side_encryption": "aws:kms",
}
logger = logging.getLogger(__name__)


def is_r2_endpoint(endpoint: str) -> bool:
if not endpoint:
return False
def _get_opendal_kwargs(*, scheme: str, env_file_path: str = ".env", prefix: str = "OPENDAL_"):
kwargs = {}
config_prefix = prefix + scheme.upper() + "_"
for key, value in os.environ.items():
if key.startswith(config_prefix):
kwargs[key[len(config_prefix) :].lower()] = value

parsed_url = urlparse(endpoint)
return bool(parsed_url.hostname and parsed_url.hostname.endswith(S3_R2_HOSTNAME))
file_env_vars = dotenv_values(env_file_path)
for key, value in file_env_vars.items():
if key.startswith(config_prefix) and key[len(config_prefix) :].lower() not in kwargs and value:
kwargs[key[len(config_prefix) :].lower()] = value

return kwargs


class OpenDALStorage(BaseStorage):
def __init__(self, scheme: OpenDALScheme, **kwargs):
if scheme == OpenDALScheme.FS:
Path(kwargs["root"]).mkdir(parents=True, exist_ok=True)
def __init__(self, scheme: str, **kwargs):
kwargs = kwargs or _get_opendal_kwargs(scheme=scheme)

if scheme == "fs":
root = kwargs.get("root", "storage")
Path(root).mkdir(parents=True, exist_ok=True)

# self.op = opendal.Operator(scheme=scheme, **kwargs)
self.op = opendal.Operator(scheme=scheme, **kwargs)
logger.debug(f"opendal operator created with scheme {scheme}")
retry_layer = opendal.layers.RetryLayer(max_times=3, factor=2.0, jitter=True)
self.op = self.op.layer(retry_layer)
logger.debug("added retry layer to opendal operator")

def save(self, filename: str, data: bytes) -> None:
    """Write ``data`` to the path ``filename`` through the OpenDAL operator."""
    operator = self.op
    operator.write(path=filename, bs=data)
    logger.debug(f"file {filename} saved")

def load_once(self, filename: str) -> bytes:
if not self.exists(filename):
raise FileNotFoundError("File not found")

return self.op.read(path=filename)
content = self.op.read(path=filename)
logger.debug(f"file {filename} loaded")
return content

def load_stream(self, filename: str) -> Generator:
if not self.exists(filename):
Expand All @@ -50,23 +61,30 @@ def load_stream(self, filename: str) -> Generator:
file = self.op.open(path=filename, mode="rb")
while chunk := file.read(batch_size):
yield chunk
logger.debug(f"file {filename} loaded as stream")

def download(self, filename: str, target_filepath: str):
    """
    Copy the stored file ``filename`` to the local path ``target_filepath``.

    Raises FileNotFoundError when ``self.exists(filename)`` is false.
    """
    if not self.exists(filename):
        raise FileNotFoundError("File not found")

    destination = Path(target_filepath)
    # The target is opened before the read, as in the original statement order.
    with destination.open("wb") as out_file:
        payload = self.op.read(path=filename)
        out_file.write(payload)
    logger.debug(f"file {filename} downloaded to {target_filepath}")

def exists(self, filename: str) -> bool:
# FIXME: this is a workaround because the opendal python-binding does not have an exists method and offers no
# better error handling here; once the opendal python-binding provides an exists method, we should use it.
# For more, see https://github.com/apache/opendal/blob/main/bindings/python/src/operator.rs
try:
return self.op.stat(path=filename).mode.is_file()
except Exception as e:
res = self.op.stat(path=filename).mode.is_file()
logger.debug(f"file {filename} checked")
return res
except Exception:
return False

def delete(self, filename: str):
    """Delete ``filename`` if it exists; otherwise only log that it was skipped."""
    if not self.exists(filename):
        logger.debug(f"file {filename} not found, skip delete")
        return

    self.op.delete(path=filename)
    logger.debug(f"file {filename} deleted")
20 changes: 0 additions & 20 deletions api/tests/unit_tests/configs/test_opendal_config_parse.py

This file was deleted.

5 changes: 1 addition & 4 deletions api/tests/unit_tests/oss/opendal/test_opendal.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
import os
from collections.abc import Generator
from pathlib import Path

import pytest

from configs.middleware.storage.opendal_storage_config import OpenDALScheme
from extensions.storage.opendal_storage import OpenDALStorage
from tests.unit_tests.oss.__mock.base import (
get_example_data,
get_example_filename,
get_example_filepath,
get_opendal_bucket,
)

Expand All @@ -19,7 +16,7 @@ class TestOpenDAL:
def setup_method(self, *args, **kwargs):
"""Executed before each test method."""
self.storage = OpenDALStorage(
scheme=OpenDALScheme.FS,
scheme="fs",
root=get_opendal_bucket(),
)

Expand Down
Loading
Loading