Aia util
This Python script is meant to be imported and used in conjunction with the data preparation steps, as outlined in the data preparation guide and in the SDK quick start.
from typing import List, Optional
# code to plot thing
import plotly
import json
import pandas as pd
from io import StringIO
import os
import json
import requests
from time import sleep
import pandas as pd
from aiaengine import api
from aiaengine.api import project
from aiaengine.api import recipe
from aiaengine.api import file
from aiaengine.api import dataset
from aiaengine import util
# Default polling/retry budget shared by helpers in this file: it is the default
# `timeout` of get_resultant_data_and_wait (one-second polls) and of
# create_recipe_util (number of creation attempts).
TIMEOUT = 60
def get_recipe(client, recipe_id):
    """Fetch a recipe by id.

    Builds a ``GetRecipeRequest`` for ``recipe_id`` and returns the response
    of ``client.recipes.GetRecipe`` unchanged.
    """
    request = recipe.GetRecipeRequest(id=recipe_id)
    return client.recipes.GetRecipe(request)
def get_recipe_and_wait(client, recipe_id, iteration, step, expected_status, timeout=120, verbose=True):
    """Poll a recipe until one of its iterations reaches the expected step and status.

    The recipe is re-fetched with ``get_recipe`` and the function sleeps one
    second between polls.

    Args:
        client: API client, passed through to ``get_recipe``.
        recipe_id: Id of the recipe to poll.
        iteration: Iteration number to watch (numbering starts at 1).
        step: Step name the iteration must be in.
        expected_status: Status the iteration must report for that step.
        timeout: Maximum number of one-second waits before giving up.
        verbose: When true, print the step/status each time it changes and a
            dot for every poll.

    Returns:
        The recipe object from the poll in which the condition was met.

    Raises:
        Exception: If the iteration did not reach the expected step and
            status within the polling budget.
    """
    last_step = ""
    last_status = ""
    countdown = timeout
    while True:
        get_recipe_response = get_recipe(client, recipe_id)
        try:
            watched = get_recipe_response.iterations[iteration - 1]
        except IndexError:
            # The requested iteration is not present on the recipe (yet);
            # treat it as "not ready" and keep polling instead of crashing.
            current_step = None
            status = None
        else:
            current_step = watched.step
            status = watched.status

        if current_step == step and status == expected_status:
            return get_recipe_response

        if countdown <= 0:
            raise Exception(
                'Timeout when waiting for iteration {} to reach step {!r} with status {!r} '
                '(last seen: step {!r}, status {!r})'.format(
                    iteration, step, expected_status, current_step, status))
        countdown -= 1

        if verbose:
            # Start a new progress line only when the step or status changed.
            if current_step != last_step or last_status != status:
                print("")
                print(f'step={current_step}, status={status}', end='')
            print(".", end="")

        last_step = current_step
        last_status = status
        sleep(1)  # wait for 1 second before polling again
def get_resultant_data_and_wait(client, dataset_request, timeout=TIMEOUT):
    """Block until the requested dataset reports the status 'analysed'.

    Prints one dot per poll and sleeps one second between polls. Returns
    True once the dataset is ready; raises Exception when ``timeout`` waits
    have been used up without the dataset becoming ready.
    """
    remaining = timeout
    print(".", end="")
    while client.datasets.GetDataset(dataset_request).status != 'analysed':
        if remaining <= 0:
            raise Exception('Timeout when waiting for dataset')
        remaining -= 1
        sleep(1)  # pause for one second, then poll again
        print(".", end="")
    return True
def commit_actions(client, recipe_id, iteration: Optional[int] = None, target_columns=None, verbose=True):
    """Commit the queued actions of a recipe iteration.

    Args:
        client: API client exposing ``recipes.CommitActions``.
        recipe_id: Id of the recipe.
        iteration: Iteration number (starts at 1, not 0). Defaults to the
            recipe's current iteration as reported by
            ``_get_current_iteration``.
        target_columns: Target columns sent with the commit request.
            Defaults to an empty list.
        verbose: When true, print progress hints.

    Returns:
        The response of ``client.recipes.CommitActions``.
    """
    if iteration is None:
        iteration = _get_current_iteration(client, recipe_id)
    if target_columns is None:
        # Fresh list per call; avoids sharing one mutable default between calls.
        target_columns = []
    if verbose:
        print("commit actions")
    commit_actions_request = recipe.CommitActionsRequest(
        id=recipe_id,  # id of the recipe
        iteration=iteration,  # remember that iteration number starts from '1', not '0'
        target_columns=target_columns,
    )
    commit_actions_response = client.recipes.CommitActions(commit_actions_request)
    if verbose:
        print("you can wait for commit actions to complete with `wait_for_commit_actions(client, recipe_id)`")
    return commit_actions_response
def render_problem_insight(insight):
    """Convert one insight dict into a displayable object.

    Supported insights, selected by ``insight['type']``:

    * ``'vis'`` with ``insight['data']['renderer'] == 'plotly'``: the figure
      description in ``insight['data']['info']`` is loaded with
      ``plotly.io.from_json``.
    * ``'text'``: ``insight['data']`` is returned as is.
    * ``'datatable'``: ``insight['data']`` is parsed into a pandas DataFrame
      using the ``split`` orientation; a ``rows`` key, when present, is
      treated as the ``data`` key of that orientation.

    Args:
        insight: Dict with at least the keys ``type`` and ``data``.

    Returns:
        A plotly figure, the text payload, a DataFrame, or None when the
        insight type (or renderer) is not one of the supported ones.
    """
    kind = insight['type']
    if kind == 'vis' and insight['data']['renderer'] == 'plotly':
        return plotly.io.from_json(json.dumps(insight['data']['info']))
    if kind == 'text':
        return insight['data']
    if kind == 'datatable':
        # Work on a shallow copy so the caller's insight is left untouched
        # when 'rows' is renamed to 'data'.
        table = dict(insight['data'])
        if 'rows' in table:
            table['data'] = table.pop('rows')
        return pd.read_json(StringIO(json.dumps(table)), orient='split')
    return None
def create_recipe_util(client, create_recipe_request, timeout=TIMEOUT):
    """Create a recipe, retrying when the creation call fails.

    Args:
        client: API client exposing ``recipes.CreateRecipe``.
        create_recipe_request: Request object passed to ``CreateRecipe``.
        timeout: Maximum number of attempts. A non-positive value still
            results in a single attempt.

    Returns:
        The response of the first successful ``CreateRecipe`` call.

    Raises:
        ValueError: If every attempt failed; the last underlying error is
            attached as the cause.
    """
    print("creating a new blank recipe.", end="")
    attempts_left = timeout
    while True:
        try:
            return client.recipes.CreateRecipe(create_recipe_request)
        except Exception as err:
            # Only ordinary errors are retried; KeyboardInterrupt and
            # SystemExit propagate so the user can abort the loop.
            print(".", end="")
            attempts_left -= 1
            if attempts_left <= 0:
                raise ValueError("Time out creating a recipe") from err
        # Pause between attempts, like the other wait helpers in this file,
        # instead of retrying in a tight loop.
        sleep(1)
def get_dataframe(client, output_dataset_id):
    """Load a dataset into a single pandas DataFrame.

    Asks ``util.get_dataset_file_download_urls`` for the dataset's file URLs,
    reads each one as parquet and concatenates the parts in order.
    """
    frames = []
    for url in util.get_dataset_file_download_urls(client, output_dataset_id):
        frames.append(pd.read_parquet(url))
    return pd.concat(frames)
def _download_recommended_actions(client, recipe_id, iteration, verbose=False):
    """Resolve the recommended-actions URL of an iteration and download its JSON."""
    url_request = recipe.GetRecommendedActionsUrlRequest(id=recipe_id, iteration=iteration)
    url_response = client.recipes.GetRecommendedActionsUrl(url_request)
    if verbose:
        print('Get recommendations from', url_response.url)
    # Bound the download time and fail loudly on an HTTP error instead of
    # trying to JSON-decode an error page.
    http_response = requests.get(url_response.url, timeout=TIMEOUT)
    http_response.raise_for_status()
    return json.loads(http_response.content)


def get_recommendations(client, recipe_id, iteration=None):
    """Download the recommended actions of a recipe iteration.

    Args:
        client: API client exposing ``recipes.GetRecommendedActionsUrl``.
        recipe_id: Id of the recipe.
        iteration: Iteration number; defaults to the recipe's current
            iteration as reported by ``_get_current_iteration``.

    Returns:
        The decoded JSON document found at the recommended-actions URL.

    Raises:
        requests.HTTPError: If the download answers with an HTTP error status.
    """
    if iteration is None:
        iteration = _get_current_iteration(client, recipe_id)
    return _download_recommended_actions(client, recipe_id, iteration, verbose=True)


def get_insights(client, recipe_obj):
    """Wait for the latest iteration's recommendations, then download them.

    The latest iteration is taken to be ``len(recipe_obj.iterations)``. The
    function first waits (via ``get_recipe_and_wait``) until that iteration
    is in step 'recommendation' with status 'success'.

    Args:
        client: API client.
        recipe_obj: Recipe object providing ``id`` and ``iterations``.

    Returns:
        The decoded JSON document found at the recommended-actions URL.

    Raises:
        requests.HTTPError: If the download answers with an HTTP error status.
    """
    recipe_id = recipe_obj.id
    iteration = len(recipe_obj.iterations)
    get_recipe_and_wait(client, recipe_id=recipe_id, iteration=iteration, step='recommendation', expected_status='success')
    return _download_recommended_actions(client, recipe_id, iteration)
def visualize_recommendations(recommended_actions):
    """Render and show every problem insight of the given recommendations.

    For each recommendation that has a ``problem`` entry, every insight in
    ``rec['problem']['desc']`` is rendered with ``render_problem_insight``.
    All insights are rendered first and then shown in order.

    Args:
        recommended_actions: Iterable of recommendation dicts.
    """
    try:
        # `display` is not imported by this file; it is expected to be
        # available without import inside an IPython/Jupyter session.
        show = display  # noqa: F821
    except NameError:
        # Outside such a session fall back to plain printing rather than
        # failing with NameError.
        show = print

    rendered = []
    for rec in recommended_actions:
        if "problem" not in rec:
            continue
        for insight in rec['problem']['desc']:
            rendered.append(render_problem_insight(insight))

    for elem in rendered:
        show(elem)
def create_new_recipe(client, dataset_id, target_col, name, description):
    """Create a recipe over one input dataset with a single target column.

    The request is submitted through ``create_recipe_util`` and its response
    is returned.
    """
    input_dataset = recipe.InputDataset(id=dataset_id, target_columns=[target_col])
    request = recipe.CreateRecipeRequest(
        name=name,
        description=description,
        datasets=[input_dataset],
    )
    return create_recipe_util(client, request)
def add_iteration_to_recipe(client, recipe_obj):
    """Add a new iteration to a recipe and wait for its recommendations.

    The new iteration reuses the target columns of the recipe's first
    dataset. Its number is one more than the iteration count of the
    ``recipe_obj`` passed in. Returns the recipe object obtained once that
    iteration is in step 'recommendation' with status 'success'.
    """
    recipe_id = recipe_obj.id
    request = recipe.CreateRecipeIterationRequest(
        id=recipe_id,
        target_columns=recipe_obj.datasets[0].target_columns,
    )
    client.recipes.CreateRecipeIteration(request)
    # The iteration just requested comes right after the ones already listed.
    new_iteration = len(recipe_obj.iterations) + 1
    return get_recipe_and_wait(
        client,
        recipe_id=recipe_id,
        step='recommendation',
        expected_status='success',
        iteration=new_iteration,
    )
def get_dataset_id(recipe_obj):
    """Return the ``dataset_id`` of the recipe's last iteration."""
    latest = recipe_obj.iterations[-1]
    return latest.dataset_id
def upload_data(client, path, project_id, name, description):
    """Upload a local file as a new dataset.

    The file is handed to ``util.create_dataset`` as the only entry of
    ``data_files``. No content type is passed on, so this function does not
    inspect the file extension.

    Args:
        client: API client passed to ``util.create_dataset``.
        path: Path of the local file to upload.
        project_id: Id of the project that will own the dataset.
        name: Name of the new dataset.
        description: Description of the new dataset.

    Returns:
        Whatever ``util.create_dataset`` returns.
    """
    return util.create_dataset(
        client,
        project_id=project_id,
        name=name,
        description=description,
        data_files=[path],
    )
def finalize_recipe(client, recipe_obj, completed_dataset_name):
    """Complete a recipe, naming the dataset it produces.

    Sends a ``CompleteRecipeRequest`` for ``recipe_obj.id`` and returns the
    response of ``client.recipes.CompleteRecipe``.
    """
    request = recipe.CompleteRecipeRequest(
        id=recipe_obj.id,
        dataset_name=completed_dataset_name,
    )
    return client.recipes.CompleteRecipe(request)
def search_project_by_name(client, project_name):
    """Return the id of the first user project whose name matches.

    The comparison ignores case (both names are lower-cased).

    Args:
        client: API client exposing ``projects.ListUserProjects``.
        project_name: Name of the project to look for.

    Returns:
        The id of the first matching project, in listing order.

    Raises:
        IndexError: If no project has that name. The exception type is kept
            for backward compatibility; the message now names the project.
    """
    wanted = project_name.lower()
    listing = client.projects.ListUserProjects(project.ListUserProjectsRequest())
    # The loop variable is deliberately not called `project`, which is the
    # imported API module used on the line above.
    for candidate in listing.projects:
        if candidate.name.lower() == wanted:
            return candidate.id
    raise IndexError(f"no project named {project_name!r} was found for this user")
def queue_actions(client, recipe_id, actions, iteration=None, verbose=True):
    """Add actions to a recipe iteration and check that they were accepted.

    Args:
        client: API client exposing ``recipes.AddActions``.
        recipe_id: Id of the recipe.
        actions: Actions to add to the iteration.
        iteration: Iteration number (starts at 1, not 0). Defaults to the
            recipe's current iteration as reported by
            ``_get_current_iteration``.
        verbose: When true, print progress messages; when false the function
            prints nothing.

    Returns:
        The response of ``client.recipes.AddActions``.

    Raises:
        Exception: If the response reports an invalid action
            (``invalid_index`` other than -1); carries the response's error.
    """
    if iteration is None:
        iteration = _get_current_iteration(client, recipe_id)
    if verbose:
        print("queueing actions")
    add_actions_request = recipe.AddActionsRequest(
        id=recipe_id,  # id of the recipe
        iteration=iteration,  # remember that iteration number starts from '1', not '0'
        actions=actions,
    )
    add_actions_response = client.recipes.AddActions(add_actions_request)
    if verbose:
        print("checking validity of queueing actions")
    # The validity check always runs, independent of verbosity.
    if add_actions_response.invalid_index != -1:
        raise Exception(add_actions_response.error)
    if verbose:
        print("checks successful")
    return add_actions_response
def _get_current_iteration(client, recipe_id):
    """Return how many iterations the recipe currently has.

    Because iteration numbers start at 1, this count is also the number of
    the latest iteration.
    """
    return len(get_recipe(client, recipe_id).iterations)
def wait_for_commit_actions(client, recipe_id, iteration=None, verbose=True):
    """Block until the committed actions of an iteration have been carried out.

    Waits, via ``get_recipe_and_wait``, for the iteration to be in step
    'file_format' with status 'success'.

    Args:
        client: API client.
        recipe_id: Id of the recipe.
        iteration: Iteration number; defaults to the recipe's current
            iteration as reported by ``_get_current_iteration``.
        verbose: When true, print progress messages; when false the function
            prints nothing.

    Raises:
        Exception: Propagated from ``get_recipe_and_wait`` on timeout.
    """
    if iteration is None:
        iteration = _get_current_iteration(client, recipe_id)
    if verbose:
        print("wait for commit actions to be carried out")
    # wait for iteration to be in file_format success
    get_recipe_and_wait(client, recipe_id=recipe_id, iteration=iteration, step='file_format', expected_status='success', verbose=verbose)
    if verbose:
        print("done")
def wait_for_suggestions(client, recipe_id, verbose=True, iteration=None):
    """Wait for iteration + 1's recommendations to be ready.

    Waits, via ``get_recipe_and_wait``, for iteration ``iteration + 1`` to be
    in step 'recommendation' with status 'success'.

    Args:
        client: API client.
        recipe_id: Id of the recipe.
        verbose: When true, print progress messages.
        iteration: Iteration whose successor is awaited; defaults to the
            recipe's current iteration as reported by
            ``_get_current_iteration``.

    Raises:
        Exception: Propagated from ``get_recipe_and_wait`` on timeout.
    """
    # NOTE(review): the previous body used an undefined `iteration`; the
    # default below mirrors wait_for_commit_actions. Confirm that waiting on
    # "current iteration + 1" is the intended target.
    if iteration is None:
        iteration = _get_current_iteration(client, recipe_id)
    if verbose:
        print("wait for new recommendations to be ready")
    get_recipe_and_wait(client, recipe_id=recipe_id, iteration=(iteration + 1), step='recommendation', expected_status='success', verbose=verbose)