
Using the Data Preparation Module

Data Preparation Overview

The AI & Analytics Engine's data wrangling module aims to:

  1. Build a data processing pipeline that can be applied to new data in a consistent and reproducible manner,
  2. Provide AI-powered insights into the data and make smart recommendations on how to transform the data for better machine learning outcomes,
  3. Provide clear explanations as to why the presented actions were recommended, so that users can place trust in the recommendations and feel comfortable adopting them.

In this document you will find detailed explanations of all the Engine's data wrangling capabilities. There are instructions for using both the GUI and the API.

Key Concepts

To fully appreciate the material covered on this page, you will need some familiarity with the following concepts:

  • Schema: A schema is attached to a dataset and contains the column names and the type of each column. A dataset must have a schema before actions can be applied to it. If the user uploads a dataset without a schema (e.g. a CSV or parquet file), the platform will infer a schema for the dataset and recommend the appropriate casting actions to convert the columns into the right types.
  • Data wrangling: Before data can be used to train machine learning models, it must be transformed into a more suitable form. This step is called data wrangling, and involves tasks such as removing irrelevant data to save training time, and removing or imputing missing values.
  • Recipe: In the Engine, the chain of data transformations applied to a dataset is referred to as a recipe. Once complete, this recipe becomes reusable on other datasets with a compliant schema.
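To make the schema concept concrete, here is a purely illustrative sketch of the information a schema conveys. The dictionary layout below is hypothetical, not the Engine's actual schema format:

# Hypothetical illustration only -- not the Engine's actual schema format.
# A schema pairs each column name with the type of data it holds:
inferred_schema = {
    "columns": [
        {"name": "age",           "type": "numeric"},
        {"name": "credit_amount", "type": "numeric"},
        {"name": "purpose",       "type": "categorical"},
    ]
}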

The Data Preparation Flow

The data preparation process on the AI & Analytics Engine follows this structure:

  1. Upload a dataset to a project on the AI & Analytics Engine platform (the Engine)
  2. Create a new recipe tied to the dataset
  3. Confirm/edit the recommended schema
  4. Repeat the following until there are no more recommendations or you are satisfied:
    1. View insights and recommendations from the Engine
    2. Choose from the recommended actions and add manual actions
    3. Commit the actions to produce a new temporary dataset; the committed actions will be added to the recipe
  5. Finalize the recipe which will compile the committed actions into a reproducible data processing pipeline
  6. The processed dataset as a result of applying the recipe will become available for building machine learning models
  7. Reuse the recipe to conveniently prepare new incoming data for prediction

Note: only the dataset created at the recipe-finalization step can be accessed. All datasets created in intermediate steps are temporary and inaccessible.

Data Preparation via Web Graphical User Interface (GUI)

You can follow the previous sections to create a project and add a dataset.

Create a Recipe

When a dataset is first imported, the option is given to create a new data wrangling recipe. If a recipe has previously been created for a dataset and you wish to create a new one, you can do so from the dataset's details page:

Data Wrangling Window

When the recipe is first created, the dataset is analyzed. This typically takes less than a minute; the duration depends on the number of columns your tabular dataset contains. Once complete, we see the following screen:

This contains the following:

  1. Search bar. This can be used to find and display particular columns of your choice in the dataset.
  2. Dataset viewport. The first 1000 rows of the tabular dataset are shown, so that you can make informed decisions about which actions to apply next. Along the top are the column names, each with an icon indicating the type of data the column contains. The target column is highlighted for convenience. This view is refreshed whenever actions are queued.
  3. Actions dialogue box. This is the main interface to the data wrangling process. In the currently opened "Suggestions" tab, we see:
    1. Field to enter the target column (if provided, better recommendations are given)
    2. Insight generated by the Engine; click to expand or collapse
    3. Recommended actions to address insight
    4. "See analysis" button, click to see a detailed explanation of the provided recommendations

Get Insights

Insights are generated when the recipe is first created and whenever actions are committed. You can see these insights in the "Suggestions" tab of the actions dialogue box. If a target column is chosen, the Engine is able to give better insights tailored to the target.

The first recommended action will always be to cast each column to a particular type, unless the dataset's schema already matches the schema inferred by the Engine.

Upon clicking "See analysis", we can view the Engine's justifications for the suggestions it has provided:

In this case, an analysis of the values in each column is shown, and the Engine has decided whether each column seems more numeric or categorical in nature. See the actions catalogue for when such actions are recommended and why.

Add Actions

Suggested actions can be added to the action queue by clicking the adjacent plus icon. Upon doing so, you will be brought to the "Recipe" tab where you will see the action in the action queue. Actions can similarly be added from the "Add Actions" tab.

Whenever actions in the queue are added, removed, or edited, the Engine generates a preview of the dataset with the actions applied. Note that for some actions that act on the dataset globally, this preview on a subset of the data may not be 100% accurate -- see the actions catalogue.

Suggested action / manual action dialogue boxes

From the "Queued actions" drop down list in the "Recipe" tab, you can edit and remove actions currently in the queue by clicking the respective icons. The "edit action" dialogue box (rightmost image above) is also displayed when actions are added manually from the "Add Actions" tab.

Commit Actions

Once satisfied with the actions currently in the queue, click "Commit Action" to apply them to the entire dataset. The queued actions will then appear in the "Committed actions" drop down list with a spinner indicating they are being applied.

The Engine will then also analyze the new processed dataset and generate new suggestions tailored to the target column (if provided). This will typically take 1-2 minutes, though it is heavily dependent on the size of the dataset.

Finalize Recipe

When an action is first committed, the "Finalize & End" button is no longer greyed out. When satisfied with the dataset, click the button, name the completed dataset, and click "Yes" to confirm. The dataset will then be saved and made available for training models, and the recipe is saved as a reusable pipeline of actions to apply to new incoming datasets of the same shape.

Reuse Recipe

Once a recipe has been finalized, it becomes available for reuse to make predicting on new data convenient. To reuse a recipe, first upload the new dataset as described in the Importing Data page. Upon doing so, you will be prompted to choose whether to create a new recipe or use an existing one. Select "Use an existing recipe" and choose the recipe you wish to reuse from the drop down list.

The data wrangling pipeline in the recipe will then be applied to the dataset, and once processing has finished, the dataset will become available for use.

Data Preparation via API

We can interact with the AI & Analytics Engine via the Python SDK.

Initialize

As usual, we import the SDK. You will also need to save the code from the Code for data preparation section below into a file named aia_util.py so that it can be imported as aia_util:

import json  # needed later when queueing actions

from aiaengine import api
import aia_util

We then establish a connection to the server via a client:

client = api.Client()

# path to the file you wish to upload
data_file = "german_credit.csv"

The client will be used extensively in future steps.

Obtain Project ID

In order to add a dataset to the platform, we first need to nominate the project to which the data should belong. A project is uniquely identified by its project ID, which is different from the project name. We can find the project ID by searching for the project name as follows:

project_name_to_search_for = "Project 1"
project_id = aia_util.search_project_by_name(client, project_name_to_search_for)

Uploading Data to the Platform

We can upload a dataset to the Engine using the upload_data helper, which wraps the SDK's create_dataset:

from datetime import datetime
time_stamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")

new_dataset = aia_util.upload_data(client,
                              project_id=project_id,
                              name='German Credit ' + time_stamp,
                              description="",
                              path=data_file)


dataset_id = new_dataset.id

Create a Recipe

We can create a recipe as follows:

recipe_obj = aia_util.create_new_recipe(
    client,
    dataset_id,
    target_col = "default_n24m",
    name = "Process German Credit Data",
    description = "Creating a Data Preparation Recipe/Pipeline"
)
recipe_obj

Get Insights

We can ask the platform for insights:

# get list of recommended actions of the first iteration
recommended_actions = aia_util.get_insights(client, recipe_obj)

The platform's UI visualizes the recommended actions for you. However, we also provide the data behind the visualizations so that you can create your own visualizations if you wish. Running the following in a Jupyter notebook session will display a visualization of the recommendations:

aia_util.visualize_recommendations(recommended_actions)

Pre-commit actions: Queue, Update, and Delete Actions

To apply an action to a dataset you need to first queue the action. When you queue an action, the platform checks if the action is valid. At this stage, the action hasn't been applied to the whole dataset yet. Instead, the action has been applied to a smaller subset of the dataset for sense checking.

iteration = 1

actions = []

for ra in recommended_actions:
    actions = actions + ra['solution']['actions']

add_actions_response = aia_util.queue_actions(
    client,
    recipe_obj.id,
    actions=json.dumps(actions),
    iteration=iteration)

You can queue multiple actions one after another by calling the aia_util.queue_actions function repeatedly; the actions are executed in the order in which they are queued, as sketched below.
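For example, rather than queueing all the recommended actions in a single call as above, you could queue them one recommendation at a time. A sketch, reusing the recipe_obj, iteration, and recommended_actions from the previous step:

# queue each recommendation's actions in its own call; they run in queue order
for ra in recommended_actions:
    aia_util.queue_actions(
        client,
        recipe_obj.id,
        actions=json.dumps(ra['solution']['actions']),
        iteration=iteration)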

You can update an action that has been queued, even if it is sandwiched between other actions. This can be done as below (replacement_actions and from_index are placeholders for your own values):

from aiaengine.api import recipe  # the request messages live in this module

client.recipes.UpdateActions(recipe.UpdateActionsRequest(
    id=recipe_obj.id,
    iteration=iteration,
    actions=replacement_actions,
    from_index=from_index))

The from_index parameter is the action index from which the queued actions will be replaced with replacement_actions.

To delete actions from the queue, use the same code as above: choose a from_index, and make replacement_actions the actions from from_index onwards with the unwanted action removed.
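For instance, suppose three actions are queued and you want to delete the second one. A sketch, where queued_actions is a name we introduce here for the list of actions currently in the queue, and we assume UpdateActionsRequest accepts the same JSON-encoded actions string as AddActionsRequest:

# queued_actions = [a0, a1, a2]; delete a1 by replacing everything from index 1 with [a2]
from_index = 1
replacement_actions = json.dumps(queued_actions[from_index + 1:])

client.recipes.UpdateActions(recipe.UpdateActionsRequest(
    id=recipe_obj.id,
    iteration=iteration,
    actions=replacement_actions,
    from_index=from_index))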

Commit Actions

Once you are done with all the actions that you want queued, you can apply them to the entire dataset with a commit command. Once an action is committed, it is final: you cannot edit a recipe's committed actions the way you can edit queued actions.

Once committed, the action interpreter will perform the actions on the dataset, which results in a temporary dataset. The code to commit the actions:

aia_util.commit_actions(client, recipe_obj.id, iteration)
aia_util.wait_for_commit_actions(client, recipe_obj.id, iteration)
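Just as in the GUI flow, the Engine then analyzes the processed dataset and prepares a new round of suggestions. A sketch of moving on to the next iteration with the add_iteration_to_recipe helper from the Code for data preparation section, then fetching the fresh insights:

# start the next iteration, wait for its recommendations, then fetch them
recipe_obj = aia_util.add_iteration_to_recipe(client, recipe_obj)
recommended_actions = aia_util.get_insights(client, recipe_obj)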

Show the final result

To obtain the result of the latest iteration, the user can use the get_dataframe function as below. The function returns a pandas.DataFrame.

# download the latest processed dataset as a DataFrame
df = aia_util.get_dataframe(
    client,
    aia_util.get_dataset_id(recipe_obj)
)

df.head()

Finalize the Recipe

Once the user is content with the recipe, or the platform can no longer generate more recommendations, the user can finalize the recipe and package it up as a reproducible data processing pipeline:

completed_dataset_name = "German Credit Data (Prepared) " + time_stamp

complete_recipe_response = aia_util.finalize_recipe(client, recipe_obj, completed_dataset_name)

output_dataset_id = complete_recipe_response.dataset_id

finalize_df = aia_util.get_dataframe(client, output_dataset_id)

print(finalize_df.head())

Code for data preparation

This Python script is meant to be imported and used in conjunction with the code above.

File name: aia_util.py

from typing import List, Optional

import os
import json
import requests
from io import StringIO
from time import sleep

import pandas as pd
import plotly
from IPython.display import display  # used by visualize_recommendations in notebooks

from aiaengine import api
from aiaengine.api import project
from aiaengine.api import recipe
from aiaengine.api import file
from aiaengine.api import dataset
from aiaengine import util
from core.input_selection import NameIn

TIMEOUT = 60

def get_recipe(client, recipe_id):
    get_recipe_response = client.recipes.GetRecipe(recipe.GetRecipeRequest(id=recipe_id))
    return get_recipe_response

def get_recipe_and_wait(client, recipe_id, iteration, step, expected_status, timeout=120, verbose=True):
    """Wait until the iteration is in expected status or timeout

    Return the recipe object
    """
    last_step = ""
    last_status = ""
    while True:
        get_recipe_response = client.recipes.GetRecipe(recipe.GetRecipeRequest(id=recipe_id))
        current_step = get_recipe_response.iterations[iteration - 1].step
        status = get_recipe_response.iterations[iteration - 1].status
        if current_step == step and status == expected_status:
            return get_recipe_response
        if timeout == 0:
            raise Exception('Timeout when waiting for iteration {} in status {}'.format(iteration, status))
        timeout -= 1
        if verbose and (current_step != last_step or last_status != status):
            print("")
            print(f'step={current_step}, status={status}', end='')

        if verbose:
            print(".", end="")
        last_step = current_step
        last_status = status

        sleep(1) # wait for 1 second


def get_resultant_data_and_wait(client, dataset_request, timeout=TIMEOUT):
    """Wait until the dataset ready
    """
    while True:
        print(".", end="")
        if client.datasets.GetDataset(dataset_request).status == 'analysed':
            return True
        if timeout == 0:
            raise Exception('Timeout when waiting for dataset')
        timeout -= 1

        sleep(1) # wait for 1 s

def commit_actions(client, recipe_id, iteration: Optional[int] = None, target_columns=[], verbose=True):
    if iteration is None:
        iteration = get_current_iteration(client, recipe_id)

    if verbose:
        print("commit actions")
    commit_actions_request = recipe.CommitActionsRequest(
        id=recipe_id, # id of the recipe
        iteration=iteration, # remember that iteration number starts from '1', not '0'
        target_columns=target_columns)
    commit_actions_response = client.recipes.CommitActions(commit_actions_request)

    if verbose:
        print("you can wait for commit actions to complete with `wait_for_commit_actions(client, recipe_id)`")
    return commit_actions_response


def render_problem_insight(insight):
    if insight['type'] == 'vis' and insight['data']['renderer'] == 'plotly':
        item = insight['data']['info']
        return plotly.io.from_json(json.dumps(item))
    if insight['type'] == 'text':
        return insight['data']
    if insight['type'] == 'datatable':
        table_data = insight['data']
        # some payloads use 'rows' where pandas' "split" orient expects 'data'
        if "rows" in table_data.keys():
            row_data = table_data.pop('rows')
            table_data.update({'data': row_data})
        s = StringIO()
        json.dump(table_data, s)
        s.seek(0)
        return pd.read_json(s, orient='split')

def create_recipe_util(client, create_recipe_request, timeout = TIMEOUT):
    print("creating a new blank recipe.", end = "")
    while True:
        # retry on transient errors until the timeout is exhausted
        try:
            create_recipe_response = client.recipes.CreateRecipe(create_recipe_request)
            return create_recipe_response
        except Exception:
            print(".", end="")
            timeout = timeout - 1
            if timeout == 0:
                raise ValueError("Timed out creating a recipe")
            sleep(1)  # wait before retrying


def get_dataframe(client, output_dataset_id):
    file_urls = util.get_dataset_file_download_urls(client, output_dataset_id)
    df = pd.concat([pd.read_parquet(url) for url in file_urls])
    return df


def get_recommendations(client, recipe_id, iteration=None):
    if iteration is None:
        iteration = get_current_iteration(client, recipe_id)

    get_recommended_actions_request = recipe.GetRecommendedActionsUrlRequest(id=recipe_id, iteration=iteration)
    get_recommended_actions_response = client.recipes.GetRecommendedActionsUrl(get_recommended_actions_request)

    print('Get recommendations from', get_recommended_actions_response.url)
    recommended_actions = json.loads(requests.get(get_recommended_actions_response.url).content)
    return recommended_actions


def get_insights(client, recipe_obj, **kwargs):
    recipe_id = recipe_obj.id
    iteration = len(recipe_obj.iterations)
    get_recipe_response = get_recipe_and_wait(client, recipe_id=recipe_id, step='recommendation', expected_status='success', iteration=iteration, **kwargs)
    get_recommended_actions_request = recipe.GetRecommendedActionsUrlRequest(id=recipe_id, iteration=iteration)
    get_recommended_actions_response = client.recipes.GetRecommendedActionsUrl(get_recommended_actions_request)
    recommended_actions = json.loads(requests.get(get_recommended_actions_response.url).content)
    return recommended_actions

def visualize_recommendations(recommended_actions):
    """Render each recommendation's problem insights in a Jupyter notebook"""
    ui_output = [render_problem_insight(insight)
                 for rec in recommended_actions if "problem" in rec
                 for insight in rec['problem']['desc']]

    for elem in ui_output:
        display(elem)

def create_new_recipe(client, dataset_id, target_col, name, description):
    # create a new Recipe
    create_recipe_request = recipe.CreateRecipeRequest(
        name=name,
        description=description,
        datasets=[recipe.InputDataset(id=dataset_id, target_columns=[target_col])]
    )
    create_recipe_response = create_recipe_util(client, create_recipe_request)
    return create_recipe_response

def add_iteration_to_recipe(client, recipe_obj):
    # carry the target columns over from the recipe's dataset
    create_iteration_request = recipe.CreateRecipeIterationRequest(id=recipe_obj.id, target_columns = recipe_obj.datasets[0].target_columns)
    create_iteration_response = client.recipes.CreateRecipeIteration(create_iteration_request)

    # the new iteration's number is one past the current count
    iteration = len(recipe_obj.iterations) + 1

    recipe_obj = get_recipe_and_wait(client,recipe_id=recipe_obj.id, step='recommendation', expected_status='success', iteration=iteration)
    return recipe_obj

# The helpers below depend on the Engine's internal packages
import directives.historical_summ_agg as hsa
from core import PRIMARY_DATASET_ALIAS
import core.recommendation as cr

def create_historical_summ(dataset_id, col, fns, timeframes, id_cols, date_col):
    rec = cr.Recommendation(agent_id = "data_doctor")
    rec.add_dataset(PRIMARY_DATASET_ALIAS, dataset_id)
    rec.set_solution("Create historical variables")

    directive =  hsa.HistoricalSummAggDirective({
        "fns": [fns],
        "timeframe": [timeframes],
        "id_cols": id_cols,
        "date_col": date_col},
        action_input = [{"name": col, "dataset": PRIMARY_DATASET_ALIAS}],
        action_output = [{"name": col+"_"+f+"_"+t, "dataset": PRIMARY_DATASET_ALIAS} for f,t in zip(fns, timeframes)]
    )

    rec.add_directive(directive)

    return [rec.get_recommendation_json()]


from directives.extract_from_datetime import *

def create_datetime_features(dataset_id, date_col, features = ["year", "month", "day", "dayName", "dayOfWeek", "monthName", "quarter", "timestamp", "amPm"]):
    rec = cr.Recommendation(agent_id = "zj_manual")
    rec.add_dataset(PRIMARY_DATASET_ALIAS, dataset_id)
    rec.set_solution("Create datetime feature")

    for feature in features:
        # build e.g. ExtractYearDirective(...) from the feature name
        feature1 = feature[0].upper() + feature[1:]
        code = (f"Extract{feature1}Directive({{}}," +
              "[{'name':" +
              f"'{date_col}', 'dataset': PRIMARY_DATASET_ALIAS}}], " +
              "[{'name':" +
              f"'{date_col+'_'+feature}', 'dataset': PRIMARY_DATASET_ALIAS}}]" +
              ")")

        rec.add_directive(eval(code))

    return [rec.get_recommendation_json()]

import directives.drop_columns as dc


def get_dataset_id(recipe_obj):
    """Return the dataset id of the recipe's latest iteration"""
    iterations = recipe_obj.iterations
    return iterations[len(iterations) - 1].dataset_id

def drop_columns_actions(dataset_id: str, cols: List[str]):
    rec = cr.Recommendation(agent_id = "zj_manual")
    rec.add_dataset(PRIMARY_DATASET_ALIAS, dataset_id)

    directive = dc.DropColumnsDirective(
        action_input=[
            {
                'dataset': PRIMARY_DATASET_ALIAS,
                'key': 'operand',
                'select': NameIn(names=cols).to_dict(),
            }],
        action_output=None,
        parameters=None
    )
    rec.add_directive(directive)

    return [rec.get_recommendation_json()]

def upload_data(client, path, project_id, name, description):
    """Upload a local CSV or parquet file to the Engine"""
    return util.create_dataset(
        client,
        project_id=project_id,
        name=name,
        description=description,
        data_files=[path],
        )

def finalize_recipe(client, recipe_obj, completed_dataset_name):
    complete_recipe_request = recipe.CompleteRecipeRequest(id=recipe_obj.id, dataset_name=completed_dataset_name)
    complete_recipe_response = client.recipes.CompleteRecipe(complete_recipe_request)
    return complete_recipe_response

def search_project_by_name(client, project_name):
    projects = client.projects.ListUserProjects(project.ListUserProjectsRequest())
    project_ids = [p.id for p in projects.projects if p.name.lower() == project_name.lower()]
    if not project_ids:
        raise ValueError(f"No project found with name '{project_name}'")
    return project_ids[0]

def queue_actions(client, recipe_id, actions, iteration=None, verbose=True):
    if iteration is None:
        iteration = get_current_iteration(client, recipe_id)

    if verbose:
        print("queueing actions")
    add_actions_request = recipe.AddActionsRequest(
        id=recipe_id, # id of the recipe
        iteration=iteration, # remember that iteration number starts from '1', not '0'
        actions=actions)

    add_actions_response = client.recipes.AddActions(add_actions_request)

    if verbose:
        print("checking validity of queueing actions")
    if add_actions_response.invalid_index != -1:
        raise Exception(add_actions_response.error)

    print("checks successful")

    return add_actions_response


def get_current_iteration(client, recipe_id):
    tmp = get_recipe(client, recipe_id)
    iteration = len(tmp.iterations)
    return iteration

def wait_for_commit_actions(client, recipe_id, iteration=None, verbose=True):
    if iteration is None:
        iteration = get_current_iteration(client, recipe_id)

    # wait for iteration to be in file_format success
    if verbose:
        print("wait for commit actions to be carried out")

    get_recipe_response = get_recipe_and_wait(client, recipe_id=recipe_id, iteration=iteration, step="file_format", expected_status='success', verbose = verbose)
    print("done")


def wait_for_suggestions(client, recipe_id, iteration=None, verbose=True):
    if iteration is None:
        iteration = get_current_iteration(client, recipe_id)

    # wait for the iteration's recommendations to be ready
    if verbose:
        print("wait for new recommendations to be ready")
    get_recipe_response = get_recipe_and_wait(client, recipe_id=recipe_id, iteration=iteration, step="recommendation", expected_status='success', verbose=verbose)
    print("done")
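As a usage note, the manual-action helpers above (drop_columns_actions, create_datetime_features, create_historical_summ) produce recommendation JSON that can be queued like the Engine's own suggestions. A sketch, assuming the recommendation JSON exposes the same 'solution' -> 'actions' shape as the Engine's recommendations, and using a hypothetical date column named "application_date":

# Illustrative usage of the manual-action helpers above.
# Assumes the recommendation JSON has the same 'solution' -> 'actions'
# shape as the Engine's recommendations; "application_date" is hypothetical.
recs = create_datetime_features(
    get_dataset_id(recipe_obj),
    date_col="application_date",
    features=["year", "month", "dayOfWeek"])
actions = recs[0]['solution']['actions']
queue_actions(client, recipe_obj.id, actions=json.dumps(actions))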