Feat/clean message records #10588

Merged (2 commits) on Nov 18, 2024
5 changes: 5 additions & 0 deletions api/configs/feature/__init__.py
@@ -616,6 +616,11 @@ class DataSetConfig(BaseSettings):
        default=False,
    )

+    PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING: PositiveInt = Field(
+        description="Interval in days for message cleanup operations - plan: sandbox",
+        default=30,
+    )
+

class WorkspaceConfig(BaseSettings):
    """
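
The new field sits next to the other dataset-cleanup settings in DataSetConfig, so it should be tunable the same way, via a PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING value in the api service's environment/.env, with the Field default of 30 days applying otherwise. A minimal sketch (not part of the PR) for checking the effective value, assuming it runs inside the api package:

from configs import dify_config

# 30 unless PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING is overridden in the environment / .env
print(dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING)
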
5 changes: 5 additions & 0 deletions api/extensions/ext_celery.py
@@ -68,6 +68,7 @@ def __call__(self, *args: object, **kwargs: object) -> object:
"schedule.clean_unused_datasets_task",
"schedule.create_tidb_serverless_task",
"schedule.update_tidb_serverless_status_task",
"schedule.clean_messages",
]
day = dify_config.CELERY_BEAT_SCHEDULER_TIME
beat_schedule = {
@@ -87,6 +88,10 @@ def __call__(self, *args: object, **kwargs: object) -> object:
"task": "schedule.update_tidb_serverless_status_task.update_tidb_serverless_status_task",
"schedule": crontab(minute="30", hour="*"),
},
"clean_messages": {
"task": "schedule.clean_messages.clean_messages",
"schedule": timedelta(days=day),
},
}
celery_app.conf.update(beat_schedule=beat_schedule, imports=imports)

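With this entry registered, Celery Beat should enqueue schedule.clean_messages.clean_messages every CELERY_BEAT_SCHEDULER_TIME days (the same day-based interval the other periodic tasks use), and the task runs on the "dataset" queue, so a worker consuming that queue has to be up for the cleanup to execute. A rough sanity check of the registration, assuming the Celery app object is exposed as app.celery as the schedule modules expect:

from app import celery  # assumption: module-level `celery` attribute in api/app.py

entry = celery.conf.beat_schedule["clean_messages"]
print(entry["task"])      # "schedule.clean_messages.clean_messages"
print(entry["schedule"])  # timedelta(days=<CELERY_BEAT_SCHEDULER_TIME>)
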
31 changes: 31 additions & 0 deletions (new Alembic migration in api/migrations/versions/: add_created_at_index_for_messages)
@@ -0,0 +1,31 @@
"""add_created_at_index_for_messages

Revision ID: 01d6889832f7
Revises: 09a8d1878d9b
Create Date: 2024-11-12 09:25:05.527827

"""
from alembic import op
import models as models
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '01d6889832f7'
down_revision = '09a8d1878d9b'
branch_labels = None
depends_on = None


def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    with op.batch_alter_table('messages', schema=None) as batch_op:
        batch_op.create_index('message_created_at_idx', ['created_at'], unique=False)
    # ### end Alembic commands ###


def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    with op.batch_alter_table('messages', schema=None) as batch_op:
        batch_op.drop_index('message_created_at_idx')
    # ### end Alembic commands ###
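
Applying the migration (for example with the usual Flask-Migrate flask db upgrade in the api service) adds the index to existing databases. On PostgreSQL the batch operation boils down to a single CREATE INDEX; a standalone sketch of that equivalent with a placeholder connection string, for illustration only:

from sqlalchemy import create_engine, text

engine = create_engine("postgresql://user:pass@localhost/dify")  # placeholder DSN
with engine.begin() as conn:
    # equivalent of batch_op.create_index('message_created_at_idx', ['created_at'], unique=False)
    conn.execute(text("CREATE INDEX IF NOT EXISTS message_created_at_idx ON messages (created_at)"))
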
1 change: 1 addition & 0 deletions api/models/model.py
@@ -719,6 +719,7 @@ class Message(db.Model):
db.Index("message_end_user_idx", "app_id", "from_source", "from_end_user_id"),
db.Index("message_account_idx", "app_id", "from_source", "from_account_id"),
db.Index("message_workflow_run_id_idx", "conversation_id", "workflow_run_id"),
db.Index("message_created_at_idx", "created_at"),
)

id = db.Column(StringUUID, server_default=db.text("uuid_generate_v4()"))
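
Declaring the index on the model keeps the ORM metadata in line with the migration above: databases created from the models get it directly, existing ones get it via Alembic. The query shape it is meant to serve is the age-based scan in clean_messages.py below; roughly, assuming an api app context (e.g. flask shell) and an illustrative 30-day cutoff:

import datetime

from extensions.ext_database import db
from models.model import Message

cutoff = datetime.datetime.now() - datetime.timedelta(days=30)
stale = (
    db.session.query(Message)
    .filter(Message.created_at < cutoff)  # range predicate that message_created_at_idx can serve
    .order_by(Message.created_at.desc())
)
print(stale.count())
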
79 changes: 79 additions & 0 deletions api/schedule/clean_messages.py
@@ -0,0 +1,79 @@
import datetime
import time

import click
from werkzeug.exceptions import NotFound

import app
from configs import dify_config
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models.model import (
    App,
    Message,
    MessageAgentThought,
    MessageAnnotation,
    MessageChain,
    MessageFeedback,
    MessageFile,
)
from models.web import SavedMessage
from services.feature_service import FeatureService


@app.celery.task(queue="dataset")
def clean_messages():
    click.echo(click.style("Start clean messages.", fg="green"))
    start_at = time.perf_counter()
    plan_sandbox_clean_message_day = datetime.datetime.now() - datetime.timedelta(
        days=dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING
    )
    page = 1
    while True:
        try:
            # Main query with join and filter
            messages = (
                db.session.query(Message)
                .filter(Message.created_at < plan_sandbox_clean_message_day)
                .order_by(Message.created_at.desc())
                .paginate(page=page, per_page=100)
            )

        except NotFound:
            break
        if messages.items is None or len(messages.items) == 0:
            break
        for message in messages.items:
            app = App.query.filter_by(id=message.app_id).first()
            features_cache_key = f"features:{app.tenant_id}"
            plan_cache = redis_client.get(features_cache_key)
            if plan_cache is None:
                features = FeatureService.get_features(app.tenant_id)
                redis_client.setex(features_cache_key, 600, features.billing.subscription.plan)
                plan = features.billing.subscription.plan
            else:
                plan = plan_cache.decode()
            if plan == "sandbox":
                # clean related message
                db.session.query(MessageFeedback).filter(MessageFeedback.message_id == message.id).delete(
                    synchronize_session=False
                )
                db.session.query(MessageAnnotation).filter(MessageAnnotation.message_id == message.id).delete(
                    synchronize_session=False
                )
                db.session.query(MessageChain).filter(MessageChain.message_id == message.id).delete(
                    synchronize_session=False
                )
                db.session.query(MessageAgentThought).filter(MessageAgentThought.message_id == message.id).delete(
                    synchronize_session=False
                )
                db.session.query(MessageFile).filter(MessageFile.message_id == message.id).delete(
                    synchronize_session=False
                )
                db.session.query(SavedMessage).filter(SavedMessage.message_id == message.id).delete(
                    synchronize_session=False
                )
                db.session.query(Message).filter(Message.id == message.id).delete()
                db.session.commit()
    end_at = time.perf_counter()
    click.echo(click.style("Cleaned unused dataset from db success latency: {}".format(end_at - start_at), fg="green"))
14 changes: 6 additions & 8 deletions api/schedule/clean_unused_datasets_task.py
@@ -22,7 +22,6 @@ def clean_unused_datasets_task():
    start_at = time.perf_counter()
    plan_sandbox_clean_day = datetime.datetime.now() - datetime.timedelta(days=plan_sandbox_clean_day_setting)
    plan_pro_clean_day = datetime.datetime.now() - datetime.timedelta(days=plan_pro_clean_day_setting)
-    page = 1
    while True:
        try:
            # Subquery for counting new documents
@@ -62,14 +61,13 @@ def clean_unused_datasets_task():
                    func.coalesce(document_subquery_old.c.document_count, 0) > 0,
                )
                .order_by(Dataset.created_at.desc())
-                .paginate(page=page, per_page=50)
+                .paginate(page=1, per_page=50)
            )

        except NotFound:
            break
        if datasets.items is None or len(datasets.items) == 0:
            break
-        page += 1
        for dataset in datasets:
            dataset_query = (
                db.session.query(DatasetQuery)
@@ -92,7 +90,6 @@
                    click.echo(
                        click.style("clean dataset index error: {} {}".format(e.__class__.__name__, str(e)), fg="red")
                    )
-    page = 1
    while True:
        try:
            # Subquery for counting new documents
@@ -132,14 +129,13 @@
                    func.coalesce(document_subquery_old.c.document_count, 0) > 0,
                )
                .order_by(Dataset.created_at.desc())
-                .paginate(page=page, per_page=50)
+                .paginate(page=1, per_page=50)
            )

        except NotFound:
            break
        if datasets.items is None or len(datasets.items) == 0:
            break
-        page += 1
        for dataset in datasets:
            dataset_query = (
                db.session.query(DatasetQuery)
@@ -149,11 +145,13 @@
            if not dataset_query or len(dataset_query) == 0:
                try:
                    features_cache_key = f"features:{dataset.tenant_id}"
-                    plan = redis_client.get(features_cache_key)
-                    if plan is None:
+                    plan_cache = redis_client.get(features_cache_key)
+                    if plan_cache is None:
                        features = FeatureService.get_features(dataset.tenant_id)
                        redis_client.setex(features_cache_key, 600, features.billing.subscription.plan)
+                        plan = features.billing.subscription.plan
+                    else:
+                        plan = plan_cache.decode()
                    if plan == "sandbox":
                        # remove index
                        index_processor = IndexProcessorFactory(dataset.doc_form).init_index_processor()
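
Dropping the page = 1 / page += 1 bookkeeping also fixes a row-skipping problem: each pass disables the documents of (or otherwise processes) the datasets it just fetched, so the remaining candidates shift down into page 1 and advancing the page number would jump over them. Re-reading the first page until it comes back empty is the usual pattern when the result set shrinks as it is processed; an illustrative sketch of the idea, generic code rather than anything from this PR:

def drain(fetch_first_page, process):
    """Repeatedly fetch the first page of matching rows and process them.

    Only terminates if process() removes each row from the filtered set,
    e.g. by deleting it or flipping the flag the query filters on.
    """
    while True:
        batch = fetch_first_page()
        if not batch:
            break
        for row in batch:
            process(row)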