Profiler CLI #1623

Open · wants to merge 11 commits into base: feature/synapse_data_collector
Changes from 3 commits
5 changes: 4 additions & 1 deletion labs.yml
@@ -67,8 +67,11 @@ commands:
   - name: debug-me
     description: "[INTERNAL] Debug SDK connectivity"
   - name: install-assessment
-    description: "Install Assessment"
+    description: "[EXPERIMENTAL] Install Assessment"
Review thread on install-assessment:

Collaborator: What does Install Assessment do?

Collaborator (Author): Maybe configure-assessment would be a better name; it configures credentials.

@gueniai (Collaborator, Jun 3, 2025): These credentials are the same ones we would use for Reconcile, no? Can we call it configure-source-credentials or something purposeful like that?

   - name: install-transpile
     description: "Install Transpile"
   - name: install-reconcile
     description: "Install Reconcile"
+  - name: profile
+    description: "[EXPERIMENTAL] Profile the database"

4 changes: 2 additions & 2 deletions src/databricks/labs/remorph/assessments/pipeline.py
@@ -39,6 +39,7 @@ def __init__(self, config: PipelineConfig, executor: DatabaseManager | None):
         self.config = config
         self.executor = executor
         self.db_path_prefix = Path(config.extract_folder)
+        self._create_dir(self.db_path_prefix)

     def execute(self) -> list[StepExecutionResult]:
         logging.info(f"Pipeline initialized with config: {self.config.name}, version: {self.config.version}")

@@ -164,7 +165,6 @@ def _run_python_script(venv_python, script_path, db_path, credential_config):
         raise RuntimeError(f"Script execution failed with exit code {process.returncode}")

     def _save_to_db(self, result, step_name: str, mode: str, batch_size: int = 1000):
-        self._create_dir(self.db_path_prefix)
         db_path = str(self.db_path_prefix / DB_NAME)

         with duckdb.connect(db_path) as conn:

@@ -195,7 +195,7 @@ def _create_dir(dir_path: Path):
         dir_path.mkdir(parents=True, exist_ok=True)

     @staticmethod
-    def load_config_from_yaml(file_path: str) -> PipelineConfig:
+    def load_config_from_yaml(file_path: str | Path) -> PipelineConfig:
         with open(file_path, 'r', encoding='utf-8') as file:
             data = yaml.safe_load(file)
             steps = [Step(**step) for step in data['steps']]
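A quick sketch of what the widened signature allows: callers can now hand load_config_from_yaml a Path directly instead of stringifying it first. The config filename below is illustrative:

    from pathlib import Path

    from databricks.labs.remorph.assessments.pipeline import PipelineClass

    # Both calls now satisfy the str | Path annotation; the path is made up.
    config = PipelineClass.load_config_from_yaml("configs/pipeline_config.yml")
    config = PipelineClass.load_config_from_yaml(Path("configs") / "pipeline_config.yml")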
77 changes: 77 additions & 0 deletions src/databricks/labs/remorph/assessments/profiler.py
@@ -0,0 +1,77 @@
import logging
from pathlib import Path

from databricks.labs.remorph.assessments.pipeline import PipelineClass
from databricks.labs.remorph.assessments.profiler_config import PipelineConfig
from databricks.labs.remorph.connections.database_manager import DatabaseManager
from databricks.labs.remorph.connections.credential_manager import (
    create_credential_manager,
)
from databricks.labs.remorph.connections.env_getter import EnvGetter

# Maps a supported platform to the pipeline config shipped with the package.
_PLATFORM_TO_SOURCE_TECHNOLOGY = {
    "Synapse": "src/databricks/labs/remorph/resources/assessments/synapse/pipeline_config.yml",
}

# Whether a live database connection must be set up before profiling.
_CONNECTOR_REQUIRED = {
    "Synapse": False,
}

PRODUCT_NAME = "remorph"
PRODUCT_PATH_PREFIX = Path.home() / ".databricks" / "labs" / PRODUCT_NAME / "lib"

logger = logging.getLogger(__name__)


class Profiler:

    @classmethod
    def supported_source_technologies(cls) -> list[str]:
        return list(_PLATFORM_TO_SOURCE_TECHNOLOGY.keys())

    @staticmethod
    def path_modifier(config_file: str | Path) -> PipelineConfig:
        # TODO: Make this work for installs during developer mode
        config = PipelineClass.load_config_from_yaml(config_file)
        for step in config.steps:
            step.extract_source = f"{PRODUCT_PATH_PREFIX}/{step.extract_source}"
        return config

    def profile(self, platform: str, extractor: DatabaseManager | None = None):
        config_path = _PLATFORM_TO_SOURCE_TECHNOLOGY.get(platform)
        if not config_path:
            raise ValueError(f"Unsupported platform: {platform}")
        self._execute(platform, config_path, extractor)

    def _setup_extractor(self, platform: str) -> DatabaseManager | None:
        if not _CONNECTOR_REQUIRED[platform]:
            return None
        cred_manager = create_credential_manager(PRODUCT_NAME, EnvGetter())
        connect_config = cred_manager.get_credentials(platform)
        return DatabaseManager(platform, connect_config)

    def _execute(self, platform: str, config_path: str, extractor=None):
        try:
            config_full_path = self._locate_config(config_path)
            config = Profiler.path_modifier(config_full_path)

            if extractor is None:
                extractor = self._setup_extractor(platform)

            results = PipelineClass(config, extractor).execute()

            for result in results:
                logger.info(f"Step: {result.step_name}, Status: {result.status}, Error: {result.error_message}")

        except FileNotFoundError as e:
            logger.error(f"Configuration file not found for source {platform}: {e}")
            raise FileNotFoundError(f"Configuration file not found for source {platform}: {e}") from e
        except Exception as e:
            logger.error(f"Error executing pipeline for source {platform}: {e}")
            raise RuntimeError(f"Pipeline execution failed for source {platform}: {e}") from e

    def _locate_config(self, config_path: str) -> Path:
        config_file = PRODUCT_PATH_PREFIX / config_path
        if not config_file.exists():
            raise FileNotFoundError(f"Configuration file not found: {config_file}")
        return config_file
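A minimal usage sketch of the class above, assuming a regular labs installation so that _locate_config can resolve the config under PRODUCT_PATH_PREFIX:

    from databricks.labs.remorph.assessments.profiler import Profiler

    profiler = Profiler()
    print(Profiler.supported_source_technologies())  # ['Synapse']

    # extractor defaults to None; since _CONNECTOR_REQUIRED["Synapse"] is False,
    # _setup_extractor also returns None and the pipeline runs without a live connection.
    profiler.profile("Synapse")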
17 changes: 17 additions & 0 deletions src/databricks/labs/remorph/cli.py
@@ -21,6 +21,8 @@
     PROFILER_SOURCE_SYSTEM,
 )

+from databricks.labs.remorph.assessments.profiler import Profiler
+
 from databricks.labs.remorph.__about__ import __version__

 from databricks.labs.remorph.config import TranspileConfig

@@ -335,5 +337,20 @@ def analyze(w: WorkspaceClient, source_directory: str, report_file: str):
     Analyzer.analyze(Path(input_folder), Path(output_file), source_tech)


+@remorph.command()
+def profile(w: WorkspaceClient):
+    """Run the Profiler"""
+    with_user_agent_extra("cmd", "profiler")
+    ctx = ApplicationContext(w)
+    prompts = ctx.prompts
+    source_tech = prompts.choice("Select the source technology", Profiler.supported_source_technologies())
+    with_user_agent_extra("profiler_source_tech", make_alphanum_or_semver(source_tech))
+    user = ctx.current_user
+    logger.debug(f"User: {user}")
+    profiler = Profiler()
+    # TODO: Add extractor logic to ApplicationContext instead of creating it inside the Profiler class
+    profiler.profile(source_tech, None)
+
+
 if __name__ == "__main__":
     remorph()
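One hedged way to act on the TODO above, moving extractor construction into ApplicationContext so the CLI passes it explicitly; the method name and wiring here are assumptions, not part of this diff:

    from databricks.labs.remorph.connections.credential_manager import create_credential_manager
    from databricks.labs.remorph.connections.database_manager import DatabaseManager
    from databricks.labs.remorph.connections.env_getter import EnvGetter


    class ApplicationContext:  # hypothetical extension, not in this PR
        def profiler_extractor(self, platform: str) -> DatabaseManager:
            # Mirrors Profiler._setup_extractor, so the Profiler stops building
            # its own connections.
            cred_manager = create_credential_manager("remorph", EnvGetter())
            return DatabaseManager(platform, cred_manager.get_credentials(platform))

The command body would then end with profiler.profile(source_tech, ctx.profiler_extractor(source_tech)).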
@@ -43,7 +43,6 @@ def _create_connector(db_type: str, config: dict[str, Any]) -> DatabaseConnector:
         "snowflake": SnowflakeConnector,
         "mssql": MSSQLConnector,
         "tsql": MSSQLConnector,
-        "synapse": MSSQLConnector,
     }

     connector_class = connectors.get(db_type.lower())
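The net effect of the deletion in the connector factory, sketched with stand-in classes (the stubs and the assertion are illustrative, not the module's real code):

    class SnowflakeConnector: ...  # stand-ins for the real connector classes
    class MSSQLConnector: ...

    connectors = {
        "snowflake": SnowflakeConnector,
        "mssql": MSSQLConnector,
        "tsql": MSSQLConnector,
    }

    # "synapse" no longer resolves to MSSQLConnector; Synapse extraction now
    # goes through the Profiler pipeline instead of this factory.
    assert connectors.get("synapse") is None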
45 changes: 45 additions & 0 deletions tests/integration/assessments/test_profiler.py
@@ -0,0 +1,45 @@
from pathlib import Path
from unittest.mock import patch

import pytest

from databricks.labs.remorph.assessments.profiler import Profiler


def test_supported_source_technologies():
Contributor (review comment on this line):

    Remember to add -> None to the test declarations, otherwise the bodies aren't type-checked.

"""Test that supported source technologies are correctly returned"""
profiler = Profiler()
supported_platforms = profiler.supported_source_technologies()
assert isinstance(supported_platforms, list)
assert "Synapse" in supported_platforms


def test_profile_unsupported_platform():
"""Test that profiling an unsupported platform raises ValueError"""
profiler = Profiler()
with pytest.raises(ValueError, match="Unsupported platform: InvalidPlatform"):
profiler.profile("InvalidPlatform")


@patch(
    'databricks.labs.remorph.assessments.profiler._PLATFORM_TO_SOURCE_TECHNOLOGY',
    {"Synapse": "tests/resources/assessments/pipeline_config_main.yml"},
)
@patch('databricks.labs.remorph.assessments.profiler.PRODUCT_PATH_PREFIX', Path(__file__).parent / "../../../")
def test_profile_execution():
    """Test successful profiling execution using actual pipeline configuration"""
    profiler = Profiler()
    profiler.profile("Synapse")
    assert Path("/tmp/profiler_main/profiler_extract.db").exists(), "Profiler extract database should be created"


@patch(
    'databricks.labs.remorph.assessments.profiler._PLATFORM_TO_SOURCE_TECHNOLOGY',
    {"Synapse": "tests/resources/assessments/synapse/pipeline_config_main.yml"},
)
def test_profile_execution_with_invalid_config():
    """Test profiling execution with invalid configuration"""
    with patch('pathlib.Path.exists', return_value=False):
        profiler = Profiler()
        with pytest.raises(FileNotFoundError):
            profiler.profile("Synapse")
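Applying the Contributor's suggestion above would look like this; only the annotation changes (one test shown as a sketch):

    def test_supported_source_technologies() -> None:
        """Test that supported source technologies are correctly returned"""
        profiler = Profiler()
        supported_platforms = profiler.supported_source_technologies()
        assert isinstance(supported_platforms, list)
        assert "Synapse" in supported_platforms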
13 changes: 13 additions & 0 deletions tests/resources/assessments/pipeline_config_main.yml
@@ -0,0 +1,13 @@
name: ExamplePipeline
version: "1.0"
extract_folder: /tmp/profiler_main/
steps:
  - name: random_data
    type: python
    extract_source: tests/resources/assessments/db_extract.py
    mode: overwrite
    frequency: daily
    flag: active
    dependencies:
      - pandas
      - duckdb
- duckdb