
Profiler CLI #1623


Open. Wants to merge 11 commits into base: feature/synapse_data_collector
4 changes: 4 additions & 0 deletions labs.yml
@@ -61,3 +61,7 @@ commands:
default: null
- name: configure-reconcile
description: "Configure Necessary Reconcile Dependencies"
- name: execute-databse-profiler
Contributor:
Typo in "database"
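
Suggested change (a sketch of the fix the comment asks for):

- name: execute-database-profiler
  description: "[EXPERIMENTAL] Profile the source system database"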

description: "[EXPERIMENTAL] Profile the source system database"


15 changes: 9 additions & 6 deletions src/databricks/labs/lakebridge/assessments/pipeline.py
@@ -1,5 +1,5 @@
from pathlib import Path
from subprocess import run, CalledProcessError, Popen, PIPE, STDOUT
from subprocess import run, CalledProcessError, Popen, PIPE, STDOUT, DEVNULL
from dataclasses import dataclass
from enum import Enum

@@ -39,11 +39,13 @@ def __init__(self, config: PipelineConfig, executor: DatabaseManager | None):
self.config = config
self.executor = executor
self.db_path_prefix = Path(config.extract_folder)
self._create_dir(self.db_path_prefix)

def execute(self) -> list[StepExecutionResult]:
logging.info(f"Pipeline initialized with config: {self.config.name}, version: {self.config.version}")
execution_results: list[StepExecutionResult] = []
for step in self.config.steps:
logging.info(f"Executing step: {step.name}")
result = self._process_step(step)
execution_results.append(result)
logging.info(f"Step '{step.name}' completed with status: {result.status}")
@@ -107,7 +109,9 @@ def _execute_python_step(self, step: Step):
venv_python = venv_dir / "bin" / "python"
venv_pip = venv_dir / "bin" / "pip"

logger.info(f"Creating a virtual environment for Python script execution: ${venv_dir}")
logger.info(
f"Creating a virtual environment for Python script execution: ${venv_dir} for step: {step.name}"
)
if step.dependencies:
self._install_dependencies(venv_pip, step.dependencies)

@@ -118,8 +122,8 @@ def _install_dependencies(venv_pip, dependencies):
logging.info(f"Installing dependencies: {', '.join(dependencies)}")
try:
logging.debug("Upgrading local pip")
run([str(venv_pip), "install", "--upgrade", "pip"], check=True)
run([str(venv_pip), "install", *dependencies], check=True)
run([str(venv_pip), "install", "--upgrade", "pip"], check=True, stdout=DEVNULL, stderr=DEVNULL)
run([str(venv_pip), "install", *dependencies], check=True, stdout=DEVNULL, stderr=DEVNULL)
except CalledProcessError as e:
logging.error(f"Failed to install dependencies: {e.stderr}")
raise RuntimeError(f"Failed to install dependencies: {e.stderr}") from e
@@ -164,7 +168,6 @@ def _run_python_script(venv_python, script_path, db_path, credential_config):
raise RuntimeError(f"Script execution failed with exit code {process.returncode}")

def _save_to_db(self, result, step_name: str, mode: str, batch_size: int = 1000):
self._create_dir(self.db_path_prefix)
db_path = str(self.db_path_prefix / DB_NAME)

with duckdb.connect(db_path) as conn:
@@ -195,7 +198,7 @@ def _create_dir(dir_path: Path):
dir_path.mkdir(parents=True, exist_ok=True)

@staticmethod
def load_config_from_yaml(file_path: str) -> PipelineConfig:
def load_config_from_yaml(file_path: str | Path) -> PipelineConfig:
with open(file_path, 'r', encoding='utf-8') as file:
data = yaml.safe_load(file)
steps = [Step(**step) for step in data['steps']]
77 changes: 77 additions & 0 deletions src/databricks/labs/lakebridge/assessments/profiler.py
@@ -0,0 +1,77 @@
import logging
from pathlib import Path

from databricks.labs.lakebridge.assessments.pipeline import PipelineClass
from databricks.labs.lakebridge.assessments.profiler_config import PipelineConfig
from databricks.labs.lakebridge.connections.database_manager import DatabaseManager
from databricks.labs.lakebridge.connections.credential_manager import (
create_credential_manager,
)
from databricks.labs.lakebridge.connections.env_getter import EnvGetter

_PLATFORM_TO_SOURCE_TECHNOLOGY = {
"Synapse": "src/databricks/labs/lakebridge/resources/assessments/synapse/pipeline_config.yml",
}

_CONNECTOR_REQUIRED = {
"Synapse": False,
}

PRODUCT_NAME = "lakebridge"
PRODUCT_PATH_PREFIX = Path(__file__).home() / ".databricks" / "labs" / PRODUCT_NAME / "lib"

logger = logging.getLogger(__name__)


class Profiler:

@classmethod
def supported_source_technologies(cls) -> list[str]:
return list(_PLATFORM_TO_SOURCE_TECHNOLOGY.keys())

@staticmethod
def path_modifier(config_file: str | Path) -> PipelineConfig:
# TODO: Make this work for developer-mode installs
config = PipelineClass.load_config_from_yaml(config_file)
for step in config.steps:
step.extract_source = f"{PRODUCT_PATH_PREFIX}/{step.extract_source}"
return config

def profile(self, platform: str, extractor: DatabaseManager | None = None):
config_path = _PLATFORM_TO_SOURCE_TECHNOLOGY.get(platform, None)
if not config_path:
raise ValueError(f"Unsupported platform: {platform}")
self._execute(platform, config_path, extractor)

def _setup_extractor(self, platform: str) -> DatabaseManager | None:
if not _CONNECTOR_REQUIRED[platform]:
return None
cred_manager = create_credential_manager(PRODUCT_NAME, EnvGetter())
connect_config = cred_manager.get_credentials(platform)
return DatabaseManager(platform, connect_config)

def _execute(self, platform: str, config_path: str, extractor=None):
try:
config_full_path = self._locate_config(config_path)
config = Profiler.path_modifier(config_full_path)

if extractor is None:
extractor = self._setup_extractor(platform)

results = PipelineClass(config, extractor).execute()

for result in results:
logger.info(f"Step: {result.step_name}, Status: {result.status}, Error: {result.error_message}")

except FileNotFoundError as e:
logging.error(f"Configuration file not found for source {platform}: {e}")
raise FileNotFoundError(f"Configuration file not found for source {platform}: {e}") from e
except Exception as e:
logging.error(f"Error executing pipeline for source {platform}: {e}")
raise RuntimeError(f"Pipeline execution failed for source {platform} : {e}") from e

def _locate_config(self, config_path: str) -> Path:
config_file = PRODUCT_PATH_PREFIX / config_path
if not config_file.exists():
raise FileNotFoundError(f"Configuration file not found: {config_file}")
return config_file
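
A minimal usage sketch of the new class, based on the code above (it assumes a Synapse pipeline config is already installed under PRODUCT_PATH_PREFIX; no extractor is needed because _CONNECTOR_REQUIRED["Synapse"] is False):

from databricks.labs.lakebridge.assessments.profiler import Profiler

profiler = Profiler()
print(Profiler.supported_source_technologies())  # ["Synapse"]
profiler.profile("Synapse")  # raises ValueError for an unsupported platform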
16 changes: 16 additions & 0 deletions src/databricks/labs/lakebridge/cli.py
@@ -22,6 +22,7 @@
create_assessment_configurator,
PROFILER_SOURCE_SYSTEM,
)
from databricks.labs.lakebridge.assessments.profiler import Profiler

from databricks.labs.lakebridge.__about__ import __version__
from databricks.labs.lakebridge.config import TranspileConfig, LSPConfigOptionV1
@@ -445,5 +446,20 @@ def analyze(w: WorkspaceClient, source_directory: str, report_file: str):
Analyzer.analyze(Path(input_folder), Path(output_file), source_tech)


@lakebridge.command()
def database_profiler(w: WorkspaceClient):
"""Run the Profiler"""
with_user_agent_extra("cmd", "profiler")
ctx = ApplicationContext(w)
prompts = ctx.prompts
source_tech = prompts.choice("Select the source technology", Profiler.supported_source_technologies())
with_user_agent_extra("profiler_source_tech", make_alphanum_or_semver(source_tech))
user = ctx.current_user
logger.debug(f"User: {user}")
profiler = Profiler()
# TODO: Add extractor logic to ApplicationContext instead of creating inside the Profiler class
profiler.profile(source_tech, None)


if __name__ == "__main__":
lakebridge()
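
Assuming the standard Databricks Labs CLI dispatch, where command names registered in labs.yml map onto these handlers, the new command would be invoked roughly as:

databricks labs lakebridge execute-databse-profiler

Until the labs.yml typo flagged above is fixed, the registered command name carries the "databse" misspelling.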
1 change: 0 additions & 1 deletion src/databricks/labs/lakebridge/connections/database_manager.py
@@ -43,7 +43,6 @@ def _create_connector(db_type: str, config: dict[str, Any]) -> DatabaseConnector
"snowflake": SnowflakeConnector,
"mssql": MSSQLConnector,
"tsql": MSSQLConnector,
"synapse": MSSQLConnector,
}

connector_class = connectors.get(db_type.lower())
45 changes: 45 additions & 0 deletions tests/integration/assessments/test_profiler.py
@@ -0,0 +1,45 @@
from pathlib import Path
from unittest.mock import patch

import pytest

from databricks.labs.remorph.assessments.profiler import Profiler


def test_supported_source_technologies():
Contributor:
Remember to add -> None to the test declarations, otherwise the bodies aren't type-checked.
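
A sketch of the requested annotation, applied to the first test (the same change applies to each test in this file):

def test_supported_source_technologies() -> None:
    ...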

"""Test that supported source technologies are correctly returned"""
profiler = Profiler()
supported_platforms = profiler.supported_source_technologies()
assert isinstance(supported_platforms, list)
assert "Synapse" in supported_platforms


def test_profile_unsupported_platform():
"""Test that profiling an unsupported platform raises ValueError"""
profiler = Profiler()
with pytest.raises(ValueError, match="Unsupported platform: InvalidPlatform"):
profiler.profile("InvalidPlatform")


@patch(
'databricks.labs.remorph.assessments.profiler._PLATFORM_TO_SOURCE_TECHNOLOGY',
{"Synapse": "tests/resources/assessments/pipeline_config_main.yml"},
)
@patch('databricks.labs.remorph.assessments.profiler.PRODUCT_PATH_PREFIX', Path(__file__).parent / "../../../")
def test_profile_execution():
"""Test successful profiling execution using actual pipeline configuration"""
profiler = Profiler()
profiler.profile("Synapse")
assert Path("/tmp/profiler_main/profiler_extract.db").exists(), "Profiler extract database should be created"


@patch(
'databricks.labs.remorph.assessments.profiler._PLATFORM_TO_SOURCE_TECHNOLOGY',
{"Synapse": "tests/resources/assessments/synapse/pipeline_config_main.yml"},
)
def test_profile_execution_with_invalid_config():
"""Test profiling execution with invalid configuration"""
with patch('pathlib.Path.exists', return_value=False):
profiler = Profiler()
with pytest.raises(FileNotFoundError):
profiler.profile("Synapse")
13 changes: 13 additions & 0 deletions tests/resources/assessments/pipeline_config_main.yml
@@ -0,0 +1,13 @@
name: ExamplePipeline
version: "1.0"
extract_folder: /tmp/profiler_main/
steps:
- name: random_data
type: python
extract_source: tests/resources/assessments/db_extract.py
mode: overwrite
frequency: daily
flag: active
dependencies:
- pandas
- duckdb
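
The db_extract.py step script referenced by extract_source is not included in this diff. A hypothetical sketch of such a script (the argument contract and all names here are assumptions for illustration, not taken from the PR) might generate random rows with pandas and write them into the profiler's DuckDB extract:

import random
import string

import duckdb
import pandas as pd


def main() -> None:
    # Hypothetical: fabricate a small random dataset for the pipeline to persist.
    df = pd.DataFrame(
        {
            "id": range(100),
            "value": [random.choice(string.ascii_letters) for _ in range(100)],
        }
    )
    # Assumed contract: the pipeline passes the extract DB path to the script;
    # hard-coded here to match extract_folder in the config above.
    with duckdb.connect("/tmp/profiler_main/profiler_extract.db") as conn:
        conn.execute("CREATE OR REPLACE TABLE random_data AS SELECT * FROM df")


if __name__ == "__main__":
    main()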