Added Amazon SageMaker Notebook hook and operators #33219

Merged
merged 28 commits into from
Aug 16, 2023
Changes from 27 commits
Commits
64c7741
Added Amazon SageMaker Notebook hook and operators
ellisms Aug 8, 2023
f37d6c9
Update tests/system/providers/amazon/aws/example_sagemaker_notebook.py
ellisms Aug 9, 2023
7fca1e1
Update airflow/providers/amazon/aws/operators/sagemaker_notebook.py
ellisms Aug 9, 2023
d4353fd
Update tests/system/providers/amazon/aws/example_sagemaker_notebook.py
ellisms Aug 9, 2023
9133a65
Update tests/system/providers/amazon/aws/example_sagemaker_notebook.py
ellisms Aug 9, 2023
5c702c9
Hook and operator cleanup based on PR feedback
ellisms Aug 14, 2023
3bbeda5
Refactor sagemaker_notebook into sagemaker files
ellisms Aug 15, 2023
4a89821
Update airflow/providers/amazon/aws/operators/sagemaker.py
ellisms Aug 15, 2023
a0cfbad
Update airflow/providers/amazon/aws/operators/sagemaker.py
ellisms Aug 15, 2023
7531ecc
Update tests/providers/amazon/aws/operators/test_sagemaker_notebook.py
ellisms Aug 15, 2023
46288ef
Update airflow/providers/amazon/aws/operators/sagemaker.py
ellisms Aug 15, 2023
93a2d50
Update airflow/providers/amazon/aws/operators/sagemaker.py
ellisms Aug 15, 2023
1a6192c
Update airflow/providers/amazon/aws/operators/sagemaker.py
ellisms Aug 15, 2023
48bb1ee
Update airflow/providers/amazon/aws/operators/sagemaker.py
ellisms Aug 15, 2023
0776208
Update tests/system/providers/amazon/aws/example_sagemaker_notebook.py
ellisms Aug 15, 2023
e40eab4
Update airflow/providers/amazon/aws/operators/sagemaker.py
ellisms Aug 15, 2023
087c9e2
Update airflow/providers/amazon/aws/operators/sagemaker.py
ellisms Aug 15, 2023
a4a0234
Update airflow/providers/amazon/aws/operators/sagemaker.py
ellisms Aug 15, 2023
36494eb
Implemented feedback from PR
ellisms Aug 15, 2023
fbf70dc
pre commit changes
ellisms Aug 15, 2023
86b7728
Update airflow/providers/amazon/aws/operators/sagemaker.py
ellisms Aug 15, 2023
db11d31
Update airflow/providers/amazon/aws/operators/sagemaker.py
ellisms Aug 15, 2023
518721e
Update airflow/providers/amazon/aws/operators/sagemaker.py
ellisms Aug 15, 2023
53256f6
Update airflow/providers/amazon/aws/operators/sagemaker.py
ellisms Aug 15, 2023
9e76a01
Update airflow/providers/amazon/aws/operators/sagemaker.py
ellisms Aug 15, 2023
7a5f987
Removed SageMaker notebook hook test file
ellisms Aug 15, 2023
b3574e1
Docstring update
ellisms Aug 16, 2023
bcbecb4
Removed sagemaker_notebook from provider.yaml
ellisms Aug 16, 2023
242 changes: 242 additions & 0 deletions airflow/providers/amazon/aws/operators/sagemaker.py
@@ -37,6 +37,7 @@
from airflow.providers.amazon.aws.utils import trim_none_values
from airflow.providers.amazon.aws.utils.sagemaker import ApprovalStatus
from airflow.providers.amazon.aws.utils.tags import format_tags
from airflow.utils.helpers import prune_dict
from airflow.utils.json import AirflowJsonEncoder

if TYPE_CHECKING:
@@ -1523,3 +1524,244 @@ def execute(self, context: Context) -> str:
        arn = ans["ExperimentArn"]
        self.log.info("Experiment %s created successfully with ARN %s.", self.name, arn)
        return arn


class SageMakerCreateNotebookOperator(BaseOperator):
    """
    Create a SageMaker notebook.

    More information regarding parameters of this operator can be found here
    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker/client/create_notebook_instance.html.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:SageMakerCreateNotebookOperator`

    :param instance_name: The name of the notebook instance.
    :param instance_type: The type of instance to create.
    :param role_arn: The Amazon Resource Name (ARN) of the IAM role that SageMaker can assume to access
    :param volume_size_in_gb: Size in GB of the EBS root device volume of the notebook instance.
    :param volume_kms_key_id: The KMS key ID for the EBS root device volume.
    :param lifecycle_config_name: The name of the lifecycle configuration to associate with the notebook
    :param direct_internet_access: Whether to enable direct internet access for the notebook instance.
    :param root_access: Whether root access is enabled or disabled for users of the notebook instance.
    :param wait_for_completion: Whether or not to wait for the notebook to be InService before returning
    :param create_instance_kwargs: Additional configuration options for the create call.
    :param aws_conn_id: The AWS connection ID to use.

    :return: The ARN of the created notebook.
    """

    template_fields: Sequence[str] = (
        "instance_name",
        "instance_type",
        "role_arn",
        "volume_size_in_gb",
        "volume_kms_key_id",
        "lifecycle_config_name",
        "direct_internet_access",
        "root_access",
        "wait_for_completion",
        "create_instance_kwargs",
    )

    ui_color = "#ff7300"

    def __init__(
        self,
        *,
        instance_name: str,
        instance_type: str,
        role_arn: str,
        volume_size_in_gb: int | None = None,
        volume_kms_key_id: str | None = None,
        lifecycle_config_name: str | None = None,
        direct_internet_access: str | None = None,
        root_access: str | None = None,
        create_instance_kwargs: dict[str, Any] | None = None,
        wait_for_completion: bool = True,
        aws_conn_id: str = "aws_default",
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.instance_name = instance_name
        self.instance_type = instance_type
        self.role_arn = role_arn
        self.volume_size_in_gb = volume_size_in_gb
        self.volume_kms_key_id = volume_kms_key_id
        self.lifecycle_config_name = lifecycle_config_name
        self.direct_internet_access = direct_internet_access
        self.root_access = root_access
        self.wait_for_completion = wait_for_completion
        self.aws_conn_id = aws_conn_id
        # Use a fresh dict when none is given; a `{}` default would be shared by every operator instance.
        self.create_instance_kwargs = create_instance_kwargs or {}

        if self.create_instance_kwargs.get("tags") is not None:
            self.create_instance_kwargs["tags"] = format_tags(self.create_instance_kwargs["tags"])

    @cached_property
    def hook(self) -> SageMakerHook:
        """Create and return SageMakerHook."""
        return SageMakerHook(aws_conn_id=self.aws_conn_id)

    def execute(self, context: Context):

        # Map operator arguments to the parameter names of the create_notebook_instance call.
        create_notebook_instance_kwargs = {
            "NotebookInstanceName": self.instance_name,
            "InstanceType": self.instance_type,
            "RoleArn": self.role_arn,
            "VolumeSizeInGB": self.volume_size_in_gb,
            "KmsKeyId": self.volume_kms_key_id,
            "LifecycleConfigName": self.lifecycle_config_name,
            "DirectInternetAccess": self.direct_internet_access,
            "RootAccess": self.root_access,
        }
        if len(self.create_instance_kwargs) > 0:
            create_notebook_instance_kwargs.update(self.create_instance_kwargs)

        self.log.info("Creating SageMaker notebook %s.", self.instance_name)
        # prune_dict removes unset (None) options before the request is sent.
        response = self.hook.conn.create_notebook_instance(**prune_dict(create_notebook_instance_kwargs))

        self.log.info("SageMaker notebook created: %s", response["NotebookInstanceArn"])

        if self.wait_for_completion:
            self.log.info("Waiting for SageMaker notebook %s to be in service", self.instance_name)
            waiter = self.hook.conn.get_waiter("notebook_instance_in_service")
            waiter.wait(NotebookInstanceName=self.instance_name)

        return response["NotebookInstanceArn"]
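
The request assembly in `execute` can be tried out without AWS access. The following is a minimal sketch, assuming `prune_dict` in its default mode drops keys whose value is `None`. `drop_none` and `build_request` are hypothetical helpers written for this illustration only; they are not part of Airflow or boto3, and the ARN is a placeholder.

```python
from __future__ import annotations

from typing import Any


def drop_none(values: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical stand-in for prune_dict: keep only keys whose value is not None."""
    return {key: value for key, value in values.items() if value is not None}


def build_request(
    instance_name: str,
    instance_type: str,
    role_arn: str,
    volume_size_in_gb: int | None = None,
    root_access: str | None = None,
    extra: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Mirror the operator's mapping from Python arguments to API parameter names."""
    request: dict[str, Any] = {
        "NotebookInstanceName": instance_name,
        "InstanceType": instance_type,
        "RoleArn": role_arn,
        "VolumeSizeInGB": volume_size_in_gb,
        "RootAccess": root_access,
    }
    # Extra options are merged last, so they override the named arguments.
    request.update(extra or {})
    return drop_none(request)


request = build_request(
    "demo-notebook",
    "ml.t3.medium",
    "arn:aws:iam::123456789012:role/ExampleRole",  # placeholder ARN
    extra={"Tags": [{"Key": "team", "Value": "ml"}]},
)
print(sorted(request))
```

Because the extra options are merged after the named arguments, a key supplied through `create_instance_kwargs` takes precedence over the matching named parameter.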


class SageMakerStopNotebookOperator(BaseOperator):
    """
    Stop a notebook instance.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:SageMakerStopNotebookOperator`

    :param instance_name: The name of the notebook instance to stop.
    :param wait_for_completion: Whether or not to wait for the notebook to be stopped before returning.
    :param aws_conn_id: The AWS connection ID to use.
    """

    template_fields: Sequence[str] = ("instance_name", "wait_for_completion")

    ui_color = "#ff7300"

    def __init__(
        self,
        instance_name: str,
        wait_for_completion: bool = True,
        aws_conn_id: str = "aws_default",
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.instance_name = instance_name
        self.wait_for_completion = wait_for_completion
        self.aws_conn_id = aws_conn_id

    @cached_property
    def hook(self) -> SageMakerHook:
        """Create and return SageMakerHook."""
        return SageMakerHook(aws_conn_id=self.aws_conn_id)

    def execute(self, context):
        self.log.info("Stopping SageMaker notebook %s.", self.instance_name)
        self.hook.conn.stop_notebook_instance(NotebookInstanceName=self.instance_name)

        if self.wait_for_completion:
            self.log.info("Waiting for SageMaker notebook %s to stop", self.instance_name)
            self.hook.conn.get_waiter("notebook_instance_stopped").wait(
                NotebookInstanceName=self.instance_name
            )


class SageMakerDeleteNotebookOperator(BaseOperator):
    """
    Delete a notebook instance.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:SageMakerDeleteNotebookOperator`

    :param instance_name: The name of the notebook instance to delete.
    :param wait_for_completion: Whether or not to wait for the notebook to delete before returning.
    :param aws_conn_id: The AWS connection ID to use.
    """

    template_fields: Sequence[str] = ("instance_name", "wait_for_completion")

    ui_color = "#ff7300"

    def __init__(
        self,
        instance_name: str,
        wait_for_completion: bool = True,
        aws_conn_id: str = "aws_default",
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.instance_name = instance_name
        self.aws_conn_id = aws_conn_id
        self.wait_for_completion = wait_for_completion

    @cached_property
    def hook(self) -> SageMakerHook:
        """Create and return SageMakerHook."""
        return SageMakerHook(aws_conn_id=self.aws_conn_id)

    def execute(self, context):
        self.log.info("Deleting SageMaker notebook %s...", self.instance_name)
        self.hook.conn.delete_notebook_instance(NotebookInstanceName=self.instance_name)

        if self.wait_for_completion:
            self.log.info("Waiting for SageMaker notebook %s to delete...", self.instance_name)
            self.hook.conn.get_waiter("notebook_instance_deleted").wait(
                NotebookInstanceName=self.instance_name
            )


class SageMakerStartNoteBookOperator(BaseOperator):
    """
    Start a notebook instance.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:SageMakerStartNotebookOperator`

    :param instance_name: The name of the notebook instance to start.
    :param wait_for_completion: Whether or not to wait for the notebook to be InService before returning.
    :param aws_conn_id: The AWS connection ID to use.
    """

    template_fields: Sequence[str] = ("instance_name", "wait_for_completion")

    ui_color = "#ff7300"

    def __init__(
        self,
        instance_name: str,
        wait_for_completion: bool = True,
        aws_conn_id: str = "aws_default",
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.instance_name = instance_name
        self.aws_conn_id = aws_conn_id
        self.wait_for_completion = wait_for_completion

    @cached_property
    def hook(self) -> SageMakerHook:
        """Create and return SageMakerHook."""
        return SageMakerHook(aws_conn_id=self.aws_conn_id)

    def execute(self, context):
        self.log.info("Starting SageMaker notebook %s...", self.instance_name)
        self.hook.conn.start_notebook_instance(NotebookInstanceName=self.instance_name)

        if self.wait_for_completion:
            self.log.info("Waiting for SageMaker notebook %s to start...", self.instance_name)
            self.hook.conn.get_waiter("notebook_instance_in_service").wait(
                NotebookInstanceName=self.instance_name
            )
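
The stop, delete and start operators share one shape: issue the request, then optionally block on a waiter. The sketch below illustrates that shape with a hypothetical in-memory stub (`FakeSageMakerClient`, `FakeWaiter`) in place of the real client, so it runs without AWS credentials. The stub only records calls and does not model real polling or timeouts.

```python
from __future__ import annotations


class FakeWaiter:
    """Hypothetical stand-in for a boto3 waiter; records the wait instead of polling."""

    def __init__(self, name: str, calls: list[str]) -> None:
        self.name = name
        self.calls = calls

    def wait(self, **kwargs: str) -> None:
        self.calls.append(f"wait:{self.name}:{kwargs['NotebookInstanceName']}")


class FakeSageMakerClient:
    """Hypothetical stub exposing only the two methods this sketch needs."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def stop_notebook_instance(self, NotebookInstanceName: str) -> None:
        self.calls.append(f"stop:{NotebookInstanceName}")

    def get_waiter(self, waiter_name: str) -> FakeWaiter:
        return FakeWaiter(waiter_name, self.calls)


def stop_notebook(client, instance_name: str, wait_for_completion: bool = True) -> None:
    """Issue the stop request, then optionally block until the waiter returns."""
    client.stop_notebook_instance(NotebookInstanceName=instance_name)
    if wait_for_completion:
        client.get_waiter("notebook_instance_stopped").wait(NotebookInstanceName=instance_name)


client = FakeSageMakerClient()
stop_notebook(client, "demo-notebook")
print(client.calls)
```

With `wait_for_completion=False` only the request is recorded, which corresponds to the operator returning as soon as the API call has been accepted.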
2 changes: 2 additions & 0 deletions airflow/providers/amazon/provider.yaml
@@ -335,6 +335,7 @@ operators:
  - integration-name: Amazon SageMaker
    python-modules:
      - airflow.providers.amazon.aws.operators.sagemaker
      - airflow.providers.amazon.aws.operators.sagemaker_notebook
  - integration-name: Amazon Simple Notification Service (SNS)
    python-modules:
      - airflow.providers.amazon.aws.operators.sns
@@ -503,6 +504,7 @@ hooks:
  - integration-name: Amazon SageMaker
    python-modules:
      - airflow.providers.amazon.aws.hooks.sagemaker
      - airflow.providers.amazon.aws.hooks.sagemaker_notebook
  - integration-name: Amazon Simple Email Service (SES)
    python-modules:
      - airflow.providers.amazon.aws.hooks.ses
57 changes: 57 additions & 0 deletions docs/apache-airflow-providers-amazon/operators/sagemaker.rst
@@ -222,6 +222,63 @@ This creates an experiment so that it's ready to be associated with processing,
    :start-after: [START howto_operator_sagemaker_experiment]
    :end-before: [END howto_operator_sagemaker_experiment]

.. _howto/operator:SageMakerCreateNotebookOperator:

Create a SageMaker Notebook Instance
====================================

To create a SageMaker Notebook Instance, you can use :class:`~airflow.providers.amazon.aws.operators.sagemaker.SageMakerCreateNotebookOperator`.
This creates a SageMaker Notebook Instance ready to run Jupyter notebooks.

.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_sagemaker_notebook.py
    :language: python
    :dedent: 4
    :start-after: [START howto_operator_sagemaker_notebook_create]
    :end-before: [END howto_operator_sagemaker_notebook_create]

.. _howto/operator:SageMakerStopNotebookOperator:

Stop a SageMaker Notebook Instance
==================================

To stop a SageMaker Notebook Instance, you can use :class:`~airflow.providers.amazon.aws.operators.sagemaker.SageMakerStopNotebookOperator`.
This terminates the ML compute instance and disconnects the ML storage volume.

.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_sagemaker_notebook.py
    :language: python
    :dedent: 4
    :start-after: [START howto_operator_sagemaker_notebook_stop]
    :end-before: [END howto_operator_sagemaker_notebook_stop]

.. _howto/operator:SageMakerStartNotebookOperator:

Start a SageMaker Notebook Instance
===================================

To launch a SageMaker Notebook Instance and re-attach an ML storage volume, you can use :class:`~airflow.providers.amazon.aws.operators.sagemaker.SageMakerStartNoteBookOperator`.
This launches a new ML compute instance with the latest version of the libraries and attaches your ML storage volume.

.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_sagemaker_notebook.py
    :language: python
    :dedent: 4
    :start-after: [START howto_operator_sagemaker_notebook_start]
    :end-before: [END howto_operator_sagemaker_notebook_start]


.. _howto/operator:SageMakerDeleteNotebookOperator:

Delete a SageMaker Notebook Instance
====================================

To delete a SageMaker Notebook Instance, you can use :class:`~airflow.providers.amazon.aws.operators.sagemaker.SageMakerDeleteNotebookOperator`.
This terminates the instance and deletes the ML storage volume and network interface associated with the instance.
The instance must be stopped before it can be deleted.

.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_sagemaker_notebook.py
    :language: python
    :dedent: 4
    :start-after: [START howto_operator_sagemaker_notebook_delete]
    :end-before: [END howto_operator_sagemaker_notebook_delete]
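
The ordering constraint above (stop before delete) can be made concrete with a small state table. This is only an illustrative sketch under simplifying assumptions: it tracks three resting states and leaves out transitional ones such as Stopping or Deleting, and `ALLOWED`, `apply` and `run` are hypothetical names, not part of Airflow or the SageMaker API.

```python
from __future__ import annotations

# Allowed transitions for a simplified notebook lifecycle.
# None means the notebook does not exist yet.
ALLOWED = {
    "create": {None: "InService"},
    "stop": {"InService": "Stopped"},
    "start": {"Stopped": "InService"},
    "delete": {"Stopped": "Deleted"},
}


def apply(state, action):
    """Return the next state, or raise if the action is not valid from the current state."""
    try:
        return ALLOWED[action][state]
    except KeyError:
        raise ValueError(f"cannot {action} a notebook in state {state!r}") from None


def run(actions):
    """Apply a sequence of actions starting from a notebook that does not exist yet."""
    state = None
    for action in actions:
        state = apply(state, action)
    return state


print(run(["create", "stop", "start", "stop", "delete"]))
```

In a DAG, the same ordering would be expressed by chaining the create, stop, start, stop and delete tasks in that sequence; attempting `delete` directly after `create` is rejected by this sketch.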

Sensors
-------
