Skip to content

refactor: Update AWS Glue Databrew StartJobRun step to use integration pattern input #176

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Nov 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 15 additions & 16 deletions src/stepfunctions/steps/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,10 +550,14 @@ class GlueDataBrewStartJobRunStep(Task):
Creates a Task state that starts a DataBrew job. See `Manage AWS Glue DataBrew Jobs with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-databrew.html>`_ for more details.
"""

def __init__(self, state_id, wait_for_completion=True, **kwargs):
def __init__(self, state_id, integration_pattern=IntegrationPattern.WaitForCompletion, **kwargs):
"""
Args:
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
integration_pattern (stepfunctions.steps.integration_resources.IntegrationPattern, optional): Service integration pattern used to call the integrated service. Supported integration patterns (default: WaitForCompletion):

* WaitForCompletion: Wait for the Databrew job to complete before going to the next state. (See `Run A Job <https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-sync>`_ for more details.)
* CallAndContinue: Call StartJobRun and progress to the next state (See `Request Response <https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-default>`_ for more details.)
comment (str, optional): Human-readable comment or description. (default: None)
timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
Expand All @@ -563,23 +567,18 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs):
parameters (dict, optional): The value of this field becomes the effective input for the state.
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True)
"""
if wait_for_completion:
"""
Example resource arn: arn:aws:states:::databrew:startJobRun.sync
"""

kwargs[Field.Resource.value] = get_service_integration_arn(GLUE_DATABREW_SERVICE_NAME,
GlueDataBrewApi.StartJobRun,
IntegrationPattern.WaitForCompletion)
else:
"""
Example resource arn: arn:aws:states:::databrew:startJobRun
"""
supported_integ_patterns = [IntegrationPattern.WaitForCompletion, IntegrationPattern.CallAndContinue]

kwargs[Field.Resource.value] = get_service_integration_arn(GLUE_DATABREW_SERVICE_NAME,
GlueDataBrewApi.StartJobRun)
is_integration_pattern_valid(integration_pattern, supported_integ_patterns)
kwargs[Field.Resource.value] = get_service_integration_arn(GLUE_DATABREW_SERVICE_NAME,
GlueDataBrewApi.StartJobRun,
integration_pattern)
"""

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit (non-blocking): would be good to have the comment precede the Field.Resource assignment.

Example resource arns:
- CallAndContinue: arn: arn:aws:states:::databrew:startJobRun
- WaitForCompletion: arn: arn:aws:states:::databrew:startJobRun.sync
"""

super(GlueDataBrewStartJobRunStep, self).__init__(state_id, **kwargs)

Expand Down
42 changes: 36 additions & 6 deletions tests/unit/test_service_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,8 +677,8 @@ def test_emr_modify_instance_group_by_name_step_creation():


@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
def test_databrew_start_job_run_step_creation_sync():
step = GlueDataBrewStartJobRunStep('Start Glue DataBrew Job Run - Sync', parameters={
def test_databrew_start_job_run_step_creation_default():
step = GlueDataBrewStartJobRunStep('Start Glue DataBrew Job Run - default', parameters={
"Name": "MyWorkflowJobRun"
})

Expand All @@ -693,10 +693,30 @@ def test_databrew_start_job_run_step_creation_sync():


@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
def test_databrew_start_job_run_step_creation():
step = GlueDataBrewStartJobRunStep('Start Glue DataBrew Job Run', wait_for_completion=False, parameters={
"Name": "MyWorkflowJobRun"
})
def test_databrew_start_job_run_step_creation_wait_for_completion():
step = GlueDataBrewStartJobRunStep(
'Start Glue DataBrew Job Run - WaitForCompletion', integration_pattern=IntegrationPattern.WaitForCompletion,
parameters={
"Name": "MyWorkflowJobRun"
})

assert step.to_dict() == {
'Type': 'Task',
'Resource': 'arn:aws:states:::databrew:startJobRun.sync',
'Parameters': {
'Name': 'MyWorkflowJobRun'
},
'End': True
}


@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
def test_databrew_start_job_run_step_creation_call_and_continue():
step = GlueDataBrewStartJobRunStep(
'Start Glue DataBrew Job Run - CallAndContinue',
integration_pattern=IntegrationPattern.CallAndContinue, parameters={
"Name": "MyWorkflowJobRun"
})

assert step.to_dict() == {
'Type': 'Task',
Expand All @@ -708,6 +728,16 @@ def test_databrew_start_job_run_step_creation():
}


@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
def test_databrew_start_job_run_step_creation_wait_for_task_token_raises_error():
error_message = re.escape(f"Integration Pattern ({IntegrationPattern.WaitForTaskToken.name}) is not supported for this step - "
f"Please use one of the following: "
f"{[IntegrationPattern.WaitForCompletion.name, IntegrationPattern.CallAndContinue.name]}")
with pytest.raises(ValueError, match=error_message):
GlueDataBrewStartJobRunStep('Start Glue DataBrew Job Run - WaitForTaskToken',
integration_pattern=IntegrationPattern.WaitForTaskToken)


@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
def test_eks_create_cluster_step_creation_call_and_continue():
step = EksCreateClusterStep("Create Eks cluster - CallAndContinue", integration_pattern=IntegrationPattern.CallAndContinue,
Expand Down