Code for data preparation

This Python script is meant to be imported and used alongside the data preparation steps outlined in the data preparation guide and in the SDK quick start.

from typing import List, Optional

# plotly is used to render the plot-type insights
import plotly
import json
import pandas as pd
from io import StringIO
import os
import json
import requests
from time import sleep
import pandas as pd

from aiaengine import api
from aiaengine.api import project
from aiaengine.api import recipe
from aiaengine.api import file
from aiaengine.api import dataset
from aiaengine import util

TIMEOUT = 60

def get_recipe(client, recipe_id):
    """Fetch a recipe by its id.

    Builds a ``GetRecipeRequest`` for ``recipe_id`` and returns the response
    of ``client.recipes.GetRecipe`` unchanged.
    """
    request = recipe.GetRecipeRequest(id=recipe_id)
    return client.recipes.GetRecipe(request)

def get_recipe_and_wait(client, recipe_id, iteration, step, expected_status, timeout=120, verbose=True):
    """Poll a recipe until one iteration reaches the expected step and status.

    The recipe is re-fetched with ``get_recipe`` once per loop, with a one
    second sleep between fetches.

    Args:
        client: client object passed through to ``get_recipe``.
        recipe_id: id of the recipe to poll.
        iteration: 1-based iteration number (``iterations[iteration - 1]``
            is inspected).
        step: step name the iteration must be in.
        expected_status: status the iteration must have in that step.
        timeout: number of one-second waits allowed before giving up.
        verbose: when true, print the step/status whenever it changes and a
            dot for every poll.

    Returns:
        The response of the last ``get_recipe`` call, i.e. the one in which
        the iteration matched ``step`` and ``expected_status``.

    Raises:
        Exception: if the iteration did not reach the expected step and
            status before the countdown ran out.
    """
    last_step = ""
    last_status = ""
    countdown = timeout
    while True:
        get_recipe_response = get_recipe(client, recipe_id)
        current = get_recipe_response.iterations[iteration - 1]
        current_step = current.step
        status = current.status
        if current_step == step and status == expected_status:
            return get_recipe_response
        if countdown <= 0:
            # Report both what was awaited and what was last observed so the
            # message is actionable.
            raise Exception(
                'Timeout when waiting for iteration {} to reach step {} in status {} '
                '(last seen: step {}, status {})'.format(
                    iteration, step, expected_status, current_step, status))
        countdown -= 1
        # Start a new output line only when the step or status changed.
        if verbose and (current_step != last_step or last_status != status):
            print("")
            print(f'step={current_step}, status={status}', end='')

        if verbose:
            print(".", end="")
        last_step = current_step
        last_status = status

        sleep(1)  # wait for 1 second before polling again


def get_resultant_data_and_wait(client, dataset_request, timeout=TIMEOUT):
    """Block until the requested dataset reports status ``'analysed'``.

    A dot is printed before every status check and the loop sleeps one
    second between checks. Returns ``True`` once the dataset is analysed;
    raises ``Exception`` when the countdown started at ``timeout`` runs out.
    """
    def _poll_ready():
        # One progress dot per status check.
        print(".", end="")
        return client.datasets.GetDataset(dataset_request).status == 'analysed'

    remaining = timeout
    while not _poll_ready():
        if remaining <= 0:
            raise Exception('Timeout when waiting for dataset')
        remaining -= 1
        sleep(1)  # pause one second between checks
    return True

def commit_actions(client, recipe_id, iteration: Optional[int] = None, target_columns=None, verbose=True):
    """Commit the queued actions of a recipe iteration.

    Args:
        client: client object exposing ``recipes.CommitActions``.
        recipe_id: id of the recipe.
        iteration: 1-based iteration number; when ``None`` the latest
            iteration of the recipe is used.
        target_columns: target columns sent with the request; defaults to an
            empty list.
        verbose: when true, print progress messages.

    Returns:
        The response of ``client.recipes.CommitActions``.
    """
    if iteration is None:
        iteration = _get_current_iteration(client, recipe_id)
    # A fresh list per call avoids sharing one mutable default between calls.
    if target_columns is None:
        target_columns = []

    if verbose:
        print("commit actions")
    commit_actions_request = recipe.CommitActionsRequest(
        id=recipe_id,  # id of the recipe
        iteration=iteration,  # remember that iteration number starts from '1', not '0'
        target_columns=target_columns)
    commit_actions_response = client.recipes.CommitActions(commit_actions_request)

    if verbose:
        print("you can wait for commit actions to complete with `wait_for_commit_actions(client, recipe_id)`")
    return commit_actions_response


def render_problem_insight(insight):
    """Turn one insight dictionary into a displayable object.

    Args:
        insight: dict with a ``'type'`` key and a ``'data'`` key. It is not
            modified by this function.

    Returns:
        * for type ``'vis'`` with renderer ``'plotly'``: the result of
          ``plotly.io.from_json`` applied to ``insight['data']['info']``;
        * for type ``'text'``: ``insight['data']`` as is;
        * for type ``'datatable'``: a pandas ``DataFrame`` read from the
          table data in ``orient='split'`` layout, where a ``'rows'`` key is
          treated as the ``'data'`` key;
        * ``None`` for anything else.
    """
    kind = insight['type']
    if kind == 'vis' and insight['data']['renderer'] == 'plotly':
        return plotly.io.from_json(json.dumps(insight['data']['info']))
    if kind == 'text':
        return insight['data']
    if kind == 'datatable':
        # Work on a shallow copy so the caller's insight is left untouched.
        table_data = dict(insight['data'])
        if 'rows' in table_data:
            # The 'split' orientation expects the rows under the key 'data'.
            table_data['data'] = table_data.pop('rows')
        return pd.read_json(StringIO(json.dumps(table_data)), orient='split')
    return None

def create_recipe_util(client, create_recipe_request, timeout = TIMEOUT):
    """Create a recipe, retrying while the create call fails.

    Args:
        client: client object exposing ``recipes.CreateRecipe``.
        create_recipe_request: request passed to ``CreateRecipe``.
        timeout: maximum number of attempts; a one second pause separates
            consecutive attempts.

    Returns:
        The response of the first successful ``CreateRecipe`` call.

    Raises:
        ValueError: when every attempt failed; the last underlying error is
            attached as the cause.
    """
    print("creating a new blank recipe.", end = "")
    attempts_left = timeout
    while True:
        try:
            return client.recipes.CreateRecipe(create_recipe_request)
        except Exception as err:
            # `Exception` (not a bare except) lets Ctrl-C / SystemExit through.
            print(".", end="")
            attempts_left -= 1
            # `<=` so a zero or negative timeout cannot loop forever.
            if attempts_left <= 0:
                raise ValueError("Time out creating a recipe") from err
        sleep(1)  # pause before retrying instead of hammering the service


def get_dataframe(client, output_dataset_id):
    """Load a dataset into a single pandas DataFrame.

    Every download URL returned by ``util.get_dataset_file_download_urls``
    is read as a parquet file and the resulting frames are concatenated.
    """
    frames = []
    for url in util.get_dataset_file_download_urls(client, output_dataset_id):
        frames.append(pd.read_parquet(url))
    return pd.concat(frames)


def get_recommendations(client, recipe_id, iteration=None):
    """Download the recommended actions of a recipe iteration.

    When ``iteration`` is ``None`` the latest iteration of the recipe is
    used. The URL of the recommendations is requested from the client,
    printed, fetched with ``requests`` and its content parsed as JSON.
    """
    if iteration is None:
        iteration = _get_current_iteration(client, recipe_id)

    url_request = recipe.GetRecommendedActionsUrlRequest(id=recipe_id, iteration=iteration)
    url_response = client.recipes.GetRecommendedActionsUrl(url_request)

    print('Get recommendations from', url_response.url)
    payload = requests.get(url_response.url).content
    return json.loads(payload)


def get_insights(client, recipe_obj):
    """Return the recommended actions of the recipe's latest iteration.

    The latest iteration is taken as ``len(recipe_obj.iterations)``. The
    function first waits for that iteration to be in step
    ``'recommendation'`` with status ``'success'``, then downloads the
    recommended actions and parses them as JSON.
    """
    recipe_id = recipe_obj.id
    iteration = len(recipe_obj.iterations)

    # Block until the recommendations exist; the returned recipe is not needed.
    get_recipe_and_wait(client, recipe_id=recipe_id, iteration=iteration, step='recommendation', expected_status='success')

    url_request = recipe.GetRecommendedActionsUrlRequest(id=recipe_id, iteration=iteration)
    url_response = client.recipes.GetRecommendedActionsUrl(url_request)
    payload = requests.get(url_response.url).content
    return json.loads(payload)

def visualize_recommendations(recommended_actions):
    """Render and show every problem insight of the recommended actions.

    Args:
        recommended_actions: iterable of recommendation dicts. Entries
            without a ``'problem'`` key are skipped; for the others every
            item of ``rec['problem']['desc']`` is rendered with
            ``render_problem_insight``.

    All insights are rendered first and then shown one by one. ``display``
    is used when it is available (IPython/Jupyter sessions provide it
    without an import); otherwise the rendered objects are printed.
    """
    try:
        show = display
    except NameError:
        # Not running under IPython/Jupyter: fall back to plain printing.
        show = print

    rendered = []
    for rec in recommended_actions:
        if "problem" not in rec:
            continue
        for insight in rec['problem']['desc']:
            rendered.append(render_problem_insight(insight))

    for elem in rendered:
        show(elem)

def create_new_recipe(client, dataset_id, target_col, name, description):
    """Create a new recipe on top of a single input dataset.

    The dataset ``dataset_id`` is registered with ``target_col`` as its only
    target column. The request is submitted through ``create_recipe_util``
    and its response is returned.
    """
    input_dataset = recipe.InputDataset(id=dataset_id, target_columns=[target_col])
    request = recipe.CreateRecipeRequest(
        name=name,
        description=description,
        datasets=[input_dataset],
    )
    return create_recipe_util(client, request)

def add_iteration_to_recipe(client, recipe_obj):
    """Add an iteration to the recipe and wait for its recommendations.

    The new iteration reuses the target columns of the recipe's first
    dataset. Its number is taken as ``len(recipe_obj.iterations) + 1``. The
    function returns the recipe fetched once that iteration is in step
    ``'recommendation'`` with status ``'success'``.
    """
    target_columns = recipe_obj.datasets[0].target_columns
    request = recipe.CreateRecipeIterationRequest(id=recipe_obj.id, target_columns=target_columns)
    client.recipes.CreateRecipeIteration(request)

    # The iteration just created comes right after the ones already known.
    new_iteration = len(recipe_obj.iterations) + 1

    return get_recipe_and_wait(
        client,
        recipe_id=recipe_obj.id,
        step='recommendation',
        expected_status='success',
        iteration=new_iteration,
    )

def get_dataset_id(recipe_obj):
    """Return the ``dataset_id`` of the recipe's last iteration."""
    iterations = recipe_obj.iterations
    return iterations[-1].dataset_id

def upload_data(client, path, project_id, name, description):
    """Upload a local file as a new dataset.

    Args:
        client: client object passed to ``util.create_dataset``.
        path: path of the local file; it is passed on as the single entry
            of ``data_files``.
        project_id: id of the project that receives the dataset.
        name: name of the new dataset.
        description: description of the new dataset.

    Returns:
        Whatever ``util.create_dataset`` returns.
    """
    # The file extension is not inspected here: the content type previously
    # derived from it was never passed on to `util.create_dataset`.
    return util.create_dataset(
        client,
        project_id=project_id,
        name=name,
        description=description,
        data_files=[path],
        )

def finalize_recipe(client, recipe_obj, completed_dataset_name):
    """Complete a recipe, naming the resulting dataset.

    Sends a ``CompleteRecipeRequest`` for ``recipe_obj.id`` with
    ``completed_dataset_name`` and returns the client's response.
    """
    request = recipe.CompleteRecipeRequest(
        id=recipe_obj.id,
        dataset_name=completed_dataset_name,
    )
    return client.recipes.CompleteRecipe(request)

def search_project_by_name(client, project_name):
    """Return the id of the first user project with the given name.

    Names are compared case-insensitively (both sides are lower-cased).

    Args:
        client: client object exposing ``projects.ListUserProjects``.
        project_name: name of the project to look for.

    Returns:
        The ``id`` of the first matching project, in listing order.

    Raises:
        IndexError: if no project has that name.
    """
    listing = client.projects.ListUserProjects(project.ListUserProjectsRequest())
    wanted = project_name.lower()
    # `candidate` rather than `project`, which is the imported API module.
    for candidate in listing.projects:
        if candidate.name.lower() == wanted:
            return candidate.id
    # Same exception type as indexing an empty result list, but descriptive.
    raise IndexError(f"no project named {project_name!r} was found")

def queue_actions(client, recipe_id, actions, iteration=None, verbose=True):
    """Queue actions on a recipe iteration and check that they were accepted.

    Args:
        client: client object exposing ``recipes.AddActions``.
        recipe_id: id of the recipe.
        actions: actions sent in the ``AddActionsRequest``.
        iteration: 1-based iteration number; when ``None`` the latest
            iteration of the recipe is used.
        verbose: when true, print progress messages.

    Returns:
        The response of ``client.recipes.AddActions``.

    Raises:
        Exception: carrying ``response.error`` when the response's
            ``invalid_index`` is not ``-1``.
    """
    if iteration is None:
        iteration = _get_current_iteration(client, recipe_id)

    if verbose:
        print("queueing actions")
    add_actions_request = recipe.AddActionsRequest(
        id=recipe_id,  # id of the recipe
        iteration=iteration,  # remember that iteration number starts from '1', not '0'
        actions=actions)

    add_actions_response = client.recipes.AddActions(add_actions_request)

    if verbose:
        print("checking validity of queueing actions")
    # An invalid_index other than -1 is treated as a rejected request.
    if add_actions_response.invalid_index != -1:
        raise Exception(add_actions_response.error)

    # Honour `verbose` like every other progress message in this function.
    if verbose:
        print("checks successful")

    return add_actions_response


def _get_current_iteration(client, recipe_id):
    """Return the number of iterations the recipe currently has.

    Since iteration numbers start at 1, this is also the number of the
    latest iteration.
    """
    return len(get_recipe(client, recipe_id).iterations)

def wait_for_commit_actions(client, recipe_id, iteration=None, verbose=True):
    """Wait until the committed actions of an iteration have been carried out.

    Blocks until the iteration is in step ``'file_format'`` with status
    ``'success'`` (see ``get_recipe_and_wait`` for timeout behaviour).

    Args:
        client: client object used to fetch the recipe.
        recipe_id: id of the recipe.
        iteration: 1-based iteration number; when ``None`` the latest
            iteration of the recipe is used.
        verbose: when true, print progress messages.
    """
    if iteration is None:
        iteration = _get_current_iteration(client, recipe_id)

    # wait for iteration to be in file_format success
    if verbose:
        print("wait for commit actions to be carried out")

    get_recipe_and_wait(client, recipe_id=recipe_id, iteration=iteration, step='file_format', expected_status='success', verbose=verbose)
    # Honour `verbose` like the other progress messages.
    if verbose:
        print("done")


def wait_for_suggestions(client, recipe_id, verbose=True, iteration=None):
    """Wait for iteration + 1's recommendations to be ready.

    Args:
        client: client object used to fetch the recipe.
        recipe_id: id of the recipe.
        verbose: when true, print progress messages.
        iteration: 1-based number of the iteration whose successor is
            awaited; when ``None`` the recipe's current number of
            iterations is used.

    Returns:
        The recipe fetched once iteration ``iteration + 1`` is in step
        ``'recommendation'`` with status ``'success'``.
    """
    # NOTE(review): `iteration` used to be an undefined name here. Defaulting
    # to the current iteration count follows the sibling helpers - confirm
    # that the awaited iteration is indeed the one after it.
    if iteration is None:
        iteration = _get_current_iteration(client, recipe_id)

    if verbose:
        print("wait for new recommendations to be ready")
    # The former trailing print referenced an undefined name; the fetched
    # recipe is returned instead.
    return get_recipe_and_wait(client, recipe_id=recipe_id, iteration=(iteration + 1), step='recommendation', expected_status='success', verbose=verbose)