Skip to content

Commit

Permalink
Merge pull request #826 from sirosen/complete-flow-run-example
Browse files Browse the repository at this point in the history
Add example scripts for creating and running flows
  • Loading branch information
sirosen authored Aug 29, 2023
2 parents d7703f8 + 341ae99 commit 9b832e5
Show file tree
Hide file tree
Showing 6 changed files with 407 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Documentation
~~~~~~~~~~~~~

- Add an example script which handles creating and running a **flow**. (:pr:`NUMBER`)
53 changes: 53 additions & 0 deletions docs/examples/create_and_run_flow/index.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
Create & Run a Flow
===================

These examples guide you through creating and running a **flow** using the Globus
Flows service.

Note that users are restricted to only creating one **flow** unless covered by a
Globus subscription. Therefore, these examples will also include an option to
delete your **flow**.


Create and Delete Hello World Flow
----------------------------------

This script provides commands to create, list, and delete your **flows**.
The **flow** definition used for it is simple and baked into the script, but this
demonstrates the minimal **flow** creation and deletion process.

.. literalinclude:: manage_flow_minimal.py
:caption: ``manage_flow_minimal.py`` [:download:`download <manage_flow_minimal.py>`]
:language: python


Run a Flow
----------

This next example is distinct. It runs a flow but has no capability to create
or delete a flow. Note how ``SpecificFlowClient`` is used -- this class allows
users to access the flow-specific scope and provides the methods associated
with that scope of running **flows** and resuming **runs**.

The login code is slightly different from the previous example, as it has to
key off of the Flow ID in order to act appropriately.

.. literalinclude:: run_flow_minimal.py
:caption: ``run_flow_minimal.py`` [:download:`download <run_flow_minimal.py>`]
:language: python


Create, Delete, and Run Flows
-----------------------------

The following example combines the previous two.
It has to further enhance the login code to account for the two different
styles of login which it supports, but minimal other adjustments are needed.

Depending on the operation chosen, either the ``FlowsClient`` or the
``SpecificFlowClient`` will be used, and the login flow will also be
appropriately parametrized.

.. literalinclude:: manage_flow.py
:caption: ``manage_flow.py`` [:download:`download <manage_flow.py>`]
:language: python
150 changes: 150 additions & 0 deletions docs/examples/create_and_run_flow/manage_flow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
#!/usr/bin/env python
import argparse
import os
import sys

import globus_sdk
from globus_sdk.tokenstorage import SimpleJSONFileAdapter

MY_FILE_ADAPTER = SimpleJSONFileAdapter(os.path.expanduser("~/.sdk-manage-flow.json"))

# tutorial client ID
# we recommend replacing this with your own client for any production use-cases
CLIENT_ID = "61338d24-54d5-408f-a10d-66c06b59f6d2"

NATIVE_CLIENT = globus_sdk.NativeAppAuthClient(CLIENT_ID)


def do_login_flow(scope):
NATIVE_CLIENT.oauth2_start_flow(requested_scopes=scope, refresh_tokens=True)
authorize_url = NATIVE_CLIENT.oauth2_get_authorize_url()
print(f"Please go to this URL and login:\n\n{authorize_url}\n")
auth_code = input("Please enter the code here: ").strip()
tokens = NATIVE_CLIENT.oauth2_exchange_code_for_tokens(auth_code)
return tokens


def get_authorizer(flow_id=None):
if flow_id:
resource_server = flow_id
scope = globus_sdk.SpecificFlowClient(flow_id).scopes.user
else:
resource_server = globus_sdk.FlowsClient.resource_server
scope = globus_sdk.FlowsClient.scopes.manage_flows

# try to load the tokens from the file, possibly returning None
if MY_FILE_ADAPTER.file_exists():
tokens = MY_FILE_ADAPTER.get_token_data(resource_server)
else:
tokens = None

if tokens is None:
# do a login flow, getting back initial tokens
response = do_login_flow(scope)
# now store the tokens and pull out the correct token
MY_FILE_ADAPTER.store(response)
tokens = response.by_resource_server[resource_server]

return globus_sdk.RefreshTokenAuthorizer(
tokens["refresh_token"],
NATIVE_CLIENT,
access_token=tokens["access_token"],
expires_at=tokens["expires_at_seconds"],
on_refresh=MY_FILE_ADAPTER.on_refresh,
)


def get_flows_client():
return globus_sdk.FlowsClient(authorizer=get_authorizer())


def get_specific_flow_client(flow_id):
authorizer = get_authorizer(flow_id)
return globus_sdk.SpecificFlowClient(flow_id, authorizer=authorizer)


def create_flow(args):
flows_client = get_flows_client()
print(
flows_client.create_flow(
title=args.title,
definition={
"StartAt": "DoIt",
"States": {
"DoIt": {
"Type": "Action",
"ActionUrl": "https://actions.globus.org/hello_world",
"Parameters": {
"echo_string": "Hello, Asynchronous World!",
},
"End": True,
}
},
},
input_schema={},
subtitle="A flow created by the SDK tutorial",
)
)


def delete_flow(args):
flows_client = get_flows_client()
print(flows_client.delete_flow(args.flow_id))


def list_flows():
flows_client = get_flows_client()
for flow in flows_client.list_flows(filter_role="flow_owner"):
print(f"title: {flow['title']}")
print(f"id: {flow['id']}")
print()


def run_flow(args):
flow_client = get_specific_flow_client(args.flow_id)
print(flow_client.run_flow({}))


def logout():
for tokendata in MY_FILE_ADAPTER.get_by_resource_server().values():
for tok_key in ("access_token", "refresh_token"):
token = tokendata[tok_key]
NATIVE_CLIENT.oauth2_revoke_token(token)

os.remove(MY_FILE_ADAPTER.filename)


def main():
parser = argparse.ArgumentParser()
parser.add_argument("action", choices=["logout", "create", "delete", "list", "run"])
parser.add_argument("-f", "--flow-id", help="Flow ID for delete and run")
parser.add_argument("-t", "--title", help="Name for create")
args = parser.parse_args()

try:
if args.action == "logout":
logout()
elif args.action == "create":
if args.title is None:
parser.error("create requires --title")
create_flow(args)
elif args.action == "delete":
if args.flow_id is None:
parser.error("delete requires --flow-id")
delete_flow(args)
elif args.action == "list":
list_flows()
elif args.action == "run":
if args.flow_id is None:
parser.error("run requires --flow-id")
run_flow(args)
else:
raise NotImplementedError()
except globus_sdk.FlowsAPIError as e:
print(f"API Error: {e.code} {e.message}")
print(e.text)
sys.exit(1)


if __name__ == "__main__":
main()
122 changes: 122 additions & 0 deletions docs/examples/create_and_run_flow/manage_flow_minimal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
#!/usr/bin/env python

import argparse
import os
import sys

import globus_sdk
from globus_sdk.tokenstorage import SimpleJSONFileAdapter

MY_FILE_ADAPTER = SimpleJSONFileAdapter(os.path.expanduser("~/.sdk-manage-flow.json"))

SCOPES = [globus_sdk.FlowsClient.scopes.manage_flows]
RESOURCE_SERVER = globus_sdk.FlowsClient.resource_server

# tutorial client ID
# we recommend replacing this with your own client for any production use-cases
CLIENT_ID = "61338d24-54d5-408f-a10d-66c06b59f6d2"

NATIVE_CLIENT = globus_sdk.NativeAppAuthClient(CLIENT_ID)


def do_login_flow():
NATIVE_CLIENT.oauth2_start_flow(requested_scopes=SCOPES, refresh_tokens=True)
authorize_url = NATIVE_CLIENT.oauth2_get_authorize_url()
print(f"Please go to this URL and login:\n\n{authorize_url}\n")
auth_code = input("Please enter the code here: ").strip()
tokens = NATIVE_CLIENT.oauth2_exchange_code_for_tokens(auth_code)
return tokens


def get_authorizer():
# try to load the tokens from the file, possibly returning None
if MY_FILE_ADAPTER.file_exists():
tokens = MY_FILE_ADAPTER.get_token_data(RESOURCE_SERVER)
else:
tokens = None

if tokens is None:
# do a login flow, getting back initial tokens
response = do_login_flow()
# now store the tokens and pull out the correct token
MY_FILE_ADAPTER.store(response)
tokens = response.by_resource_server[RESOURCE_SERVER]

return globus_sdk.RefreshTokenAuthorizer(
tokens["refresh_token"],
NATIVE_CLIENT,
access_token=tokens["access_token"],
expires_at=tokens["expires_at_seconds"],
on_refresh=MY_FILE_ADAPTER.on_refresh,
)


def get_flows_client():
return globus_sdk.FlowsClient(authorizer=get_authorizer())


def create_flow(args):
flows_client = get_flows_client()
print(
flows_client.create_flow(
title=args.title,
definition={
"StartAt": "DoIt",
"States": {
"DoIt": {
"Type": "Action",
"ActionUrl": "https://actions.globus.org/hello_world",
"Parameters": {
"echo_string": "Hello, Asynchronous World!",
},
"End": True,
}
},
},
input_schema={},
subtitle="A flow created by the SDK tutorial",
)
)


def delete_flow(args):
flows_client = get_flows_client()
print(flows_client.delete_flow(args.flow_id))


def list_flows():
flows_client = get_flows_client()
for flow in flows_client.list_flows(filter_role="flow_owner"):
print(f"title: {flow['title']}")
print(f"id: {flow['id']}")
print()


def main():
parser = argparse.ArgumentParser()
parser.add_argument("action", choices=["create", "delete", "list"])
parser.add_argument("-f", "--flow-id", help="Flow ID for delete")
parser.add_argument("-t", "--title", help="Name for create")
args = parser.parse_args()

try:
if args.action == "create":
if args.title is None:
parser.error("create requires --title")
create_flow(args)
elif args.action == "delete":
if args.flow_id is None:
parser.error("delete requires --flow-id")
delete_flow(args)
elif args.action == "list":
list_flows()
else:
raise NotImplementedError()
except globus_sdk.FlowsAPIError as e:
print(f"API Error: {e.code} {e.message}")
print(e.text)
sys.exit(1)


if __name__ == "__main__":
main()
Loading

0 comments on commit 9b832e5

Please sign in to comment.