Using the Data Preparation Module

Data Preparation Overview

The AI & Analytics Engine's data wrangling module aims to:

  1. Build a data processing pipeline that can be applied to new data in a consistent and reproducible manner
  2. Provide AI-powered insights into the data and make smart recommendations on how to transform the data for better machine learning outcomes

We focus on building reproducible data pipelines to ensure data processing is efficient, consistent, and robust. We believe this yields long-term benefits in maintainability, and it encourages a more disciplined and structured coding style. Therefore, if you are familiar with other data wrangling systems, such as pandas, dplyr, or data.table, expect some adjustments to your approach when wrangling data on the AI & Analytics Engine platform.

Furthermore, we strive to clearly explain why each action was recommended. We believe that by laying out the rationale behind a recommendation, we make it easier for you to trust it and feel comfortable adopting the AI-powered recommendations.

Understand

To fully appreciate the material covered in this page, you will need some familiarity with the concepts of

The Data Preparation Flow

The data preparation process on the AI & Analytics Engine follows this structure:

  1. Upload a dataset to a project on the AI & Analytics Engine platform (the Engine)
  2. Create a new recipe tied to the dataset
  3. Confirm the recommended schema (this step can be skipped if the dataset already has a schema)
  4. Apply the recommended column-type casting action to generate a schema for the dataset; the committed actions will be added to the recipe
  5. Request more insights and recommendations from the Engine
  6. Choose from the recommended actions and add manual actions
  7. Commit the actions to produce a new temporary dataset; the committed actions will be added to the recipe
  8. Repeat the last three steps until either there are no more recommendations or you are happy with the data prepared thus far
  9. Finalize the recipe which will compile the committed actions into a reproducible data processing pipeline
  10. The dataset resulting from applying the recipe will also become available for building machine learning models

Note: only the dataset created at the recipe-finalization step can be accessed. All other datasets created in intermediate steps are temporary and inaccessible.
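
If you plan to drive this flow through the SDK (covered in Data Preparation via API below), the steps map roughly onto the helper functions in the aia_util script listed at the end of this page. Below is a condensed sketch only; the file name, project ID, target column, and dataset/recipe names are placeholders, and each call is explained in detail later.

from aiaengine import api
import aia_util

client = api.Client()

# steps 1-2: upload the data and create a recipe tied to it
new_dataset = aia_util.upload_data(client, path="data.csv", project_id=project_id, name="My Data", description="")
recipe_obj = aia_util.create_new_recipe(client, new_dataset.id, target_col="target", name="My Recipe", description="")

# steps 5-7: request insights, then commit the recommended actions
actions = aia_util.get_insights(client, recipe_obj)
recipe_obj = aia_util.commit_actions(client, recipe_obj, committed_actions=actions, wait=True)

# step 9: finalize the recipe into a reproducible pipeline
aia_util.finalize_recipe(client, recipe_obj, "My Cleaned Data")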

Data Preparation via Web Graphical User Interface (GUI)

You can follow the previous sections to create a project and add a dataset.

Data Table Viewer and Confirm Schema

Once the data has been added, you can create a new recipe on the dataset, which will display the dataset in the UI. The first task you should complete is to confirm a schema. To do that, click on "Problems",

and then click on "Select Solution" to confirm the schema.

Next, you can click on "Review" and then "Commit Actions".

At this point, you will see an animation on the progress bar, indicating that the committed actions are being carried out.

New Iteration

Once that's done, you may click "Continue" to request more recommendations from the AI & Analytics Engine. You will be greeted with a reminder to set a target. Setting a target is useful, as the Engine can tailor its recommendations to the selected target.

You may click "No" to dismiss the target prompt and instead choose a target with the target selector located near the top of the data table. Once the target is selected, proceed with "Continue".

New Recommendations

You should see that another iteration has appeared, with an animation on the progress bar indicating that the Engine is creating recommendations.

Once the animation stops, you should see a set of recommendations for the dataset in the "Problems" panel.

Selecting Solutions

You can click into any of the "Problems" and decide whether you wish to adopt the recommendation; if you do, click on the "Select Solution" button.

The solutions you have selected will be marked as "Problem: Selected".

As with schema confirmation, you need to click "Review" followed by "Commit Actions" on the right panel and then "Continue" on the left panel to commit the actions and request a new iteration.

Finalize the recipe and dataset

You may keep requesting recommendations until the platform has none left for the dataset, at which point you can click "Finalize" to finalize the recipe.

You will be asked to name the recipe; once confirmed, you will move on to building machine learning models, covered in the next section.

Data Preparation via API

We can interact with the AI & Analytics Engine via the Python SDK.

Initialize

As usual, we import the SDK. You also need to save the script from the Code for data preparation section below as a file named aia_util.py so that it can be imported as aia_util.

from aiaengine import api
import aia_util

We then establish a connection to the server via a client

client = api.Client()

# path to the file you wish to upload
data_file = "german_credit.csv"

The client will be used extensively in future steps.

Obtain Project ID

In order to add a dataset to the platform, we first need to nominate the project to which the data should belong. A project is uniquely identified by its project ID, which is different from the project name. We can find the project ID by searching for the project name as below

project_name_to_search_for = "Project 1"
project_id = aia_util.search_project_by_name(client, project_name_to_search_for)

Uploading Data to the platform

We can upload a dataset to the Engine using the upload_data helper, which wraps the SDK's util.create_dataset function

from datetime import datetime
time_stamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")

new_dataset = aia_util.upload_data(client,
                                   project_id=project_id,
                                   name='German Credit ' + time_stamp,
                                   description="",
                                   path=data_file)


dataset_id = new_dataset.id
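
Uploading triggers an analysis of the dataset on the Engine. If you want to block until the analysis completes before creating a recipe, the get_resultant_data_and_wait helper in aia_util polls for the 'analysed' status. A sketch, assuming the SDK exposes a dataset.GetDatasetRequest message analogous to the recipe.GetRecipeRequest used elsewhere in the script:

from aiaengine.api import dataset

# poll until the uploaded dataset has been analysed (assumed request type)
aia_util.get_resultant_data_and_wait(client, dataset.GetDatasetRequest(id=dataset_id))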

Create a recipe

We can create a recipe as below

recipe_obj = aia_util.create_new_recipe(
    client,
    dataset_id,
    target_col = "default_n24m",
    name = "Process German Credit Data",
    description = "Creating a Data Preparation Recipe/Pipeline"
)
recipe_obj

Get Insights

We can ask the platform for insights

# get list of recommended actions of the first iteration
recommended_actions = aia_util.get_insights(client, recipe_obj)

The platform's UI visualizes the recommended actions for you. However, we also provide the data behind the visualizations so that you can build your own visualizations if you wish. Running the below in a Jupyter notebook session will display the visualizations of the recommendations

aia_util.visualize_recommendations(recommended_actions)
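
If you prefer to work with the raw data directly, each recommendation is a plain dict. The sketch below assumes the same layout that render_problem_insight in aia_util relies on, where each recommendation's problem description is a list of insights of type 'vis', 'text', or 'datatable':

# inspect the raw structure behind the visualizations
for rec in recommended_actions:
    if "problem" in rec:
        for insight in rec["problem"]["desc"]:
            print(insight["type"])  # 'vis', 'text', or 'datatable'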

Commit Actions

You can modify the recommended actions before committing them, or commit them directly. Once committed, the action interpreter performs the actions on the dataset, producing a temporary dataset.
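
Since get_insights returns a plain Python list of recommendations, adopting only a subset of them is ordinary list manipulation, for example:

# illustrative only: keep just the first recommendation
subset_actions = recommended_actions[:1]

Here we commit all of the recommended actions: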

recipe_obj = aia_util.commit_actions(
    client,
    recipe_obj,
    committed_actions=recommended_actions,
    wait=True)

Next Iteration and Manual Actions

You can now request more recommendations by adding a new iteration to the recipe

recipe_obj = aia_util.add_iteration_to_recipe(client, recipe_obj)
recommended_actions2 = aia_util.get_insights(client, recipe_obj)
aia_util.visualize_recommendations(recommended_actions2)

You can add manual actions from the Actions Catalogue by concatenating them with the recommended actions. In the example below, we add the drop-columns action as a manual action; it drops the column default.

# drop the column "default"
drop_column_action = aia_util.drop_columns_actions(aia_util.get_dataset_id(recipe_obj), ["default"])
committed_actions2 = recommended_actions2 + drop_column_action
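
The aia_util script also provides helpers for other manual actions, which can be constructed and concatenated the same way. For example, a datetime feature-extraction action; the column name application_date below is purely hypothetical:

# hypothetical: extract year and month features from a datetime column
datetime_actions = aia_util.create_datetime_features(
    aia_util.get_dataset_id(recipe_obj),
    date_col="application_date",
    features=["year", "month"]
)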

We commit the recommended actions together with the manual action

recipe_obj = aia_util.commit_actions(
    client,
    recipe_obj,
    committed_actions=committed_actions2,
    wait=True
)

recipe_obj

The user can continue until the platform no longer generates recommendations.
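
This continuation can be scripted as a loop. A minimal sketch, assuming get_insights returns an empty list once the Engine has no further recommendations:

# keep iterating while the Engine still returns recommendations
while True:
    recipe_obj = aia_util.add_iteration_to_recipe(client, recipe_obj)
    actions = aia_util.get_insights(client, recipe_obj)
    if not actions:
        break
    recipe_obj = aia_util.commit_actions(client, recipe_obj, committed_actions=actions, wait=True)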

Show the final result

To obtain the result of iteration two, the user can use the get_dataframe function as below. The function returns a pandas.DataFrame.

# fetch the result of the latest iteration as a DataFrame
df = aia_util.get_dataframe(
    client,
    aia_util.get_dataset_id(recipe_obj)
)

df.head()

Finalize the Recipe

Once the user is content with the recipe, or the platform can no longer generate recommendations, the user can finalize the recipe and package it up as a reproducible data processing pipeline.

completed_dataset_name = "German Credit Data (Cleaned)" + time_stamp

complete_recipe_response = aia_util.finalize_recipe(client, recipe_obj, completed_dataset_name)

output_dataset_id = complete_recipe_response.dataset_id

finalize_df = aia_util.get_dataframe(client, output_dataset_id)

print(finalize_df.head())

Code for data preparation

This Python script is meant to be imported and used in conjunction with the code above.

File name: aia_util.py

from typing import List

# plotting and data-handling dependencies
import plotly
import json
import pandas as pd
from io import StringIO
import os
import requests
from time import sleep

# display() is used by visualize_recommendations in notebook sessions
from IPython.display import display

from aiaengine import api
from aiaengine.api import project
from aiaengine.api import recipe
from aiaengine.api import file
from aiaengine.api import dataset
from aiaengine import util


from core.input_selection import NameIn
import directives.historical_summ_agg as hsa
from core import PRIMARY_DATASET_ALIAS
import core.recommendation as cr
from directives.extract_from_datetime import *
import directives.drop_columns as dc

TIMEOUT = 60

def get_recipe_and_wait(client, recipe_id, step, expected_status, iteration, timeout=TIMEOUT, debug=False):
    """Wait until the iteration is in expected status or timeout
    Return the recipe object
    """
    current_step_status = ""
    while True:
        get_recipe_response = client.recipes.GetRecipe(recipe.GetRecipeRequest(id=recipe_id))
        current_step = get_recipe_response.iterations[iteration - 1].step
        status = get_recipe_response.iterations[iteration - 1].status

        if f"{current_step}: {status}" != current_step_status:
            current_step_status = f"{current_step}: {status}"
            print("")
            print(current_step_status, end="")

        print(".", end="")

        if current_step == step and status == expected_status:
            print("") # need this
            return get_recipe_response
        if timeout == 0:
            raise Exception('Timeout waiting for iteration {} to reach status {}'.format(iteration, expected_status))
        timeout -= 1
        if debug:
            if timeout % 5 == 0:
                print("for DEBUGGING: start")
                # print the full response every fifth poll for debugging
                print(get_recipe_response)
                print("for DEBUGGING: end")

        sleep(1) # wait for 1 second


def get_resultant_data_and_wait(client, dataset_request, timeout=TIMEOUT):
    """Wait until the dataset ready
    """
    while True:
        print(".", end="")
        if client.datasets.GetDataset(dataset_request).status == 'analysed':
            return True
        if timeout == 0:
            raise Exception('Timeout when waiting for dataset')
        timeout -= 1

        sleep(1) # wait for 1 s

def commit_actions(client, recipe_obj, committed_actions, wait=True):
    recipe_id = recipe_obj.id
    iteration = len(recipe_obj.iterations)

    commit_actions_request = recipe.CommitActionsRequest(
        id=recipe_id, # id of the recipe
        iteration=iteration, # remember that iteration number starts from '1', not '0'
        committed_actions=json.dumps(committed_actions))
    commit_actions_response = client.recipes.CommitActions(commit_actions_request)

    recipe_obj = get_recipe_and_wait(client, recipe_id=recipe_id, step="file_format", expected_status='success', iteration=iteration)
    return recipe_obj

def render_problem_insight(insight):
    if insight['type'] == 'vis' and insight['data']['renderer'] == 'plotly':
        item = insight['data']['info']
        return plotly.io.from_json(json.dumps(item))
    if insight['type'] == 'text':
        return insight['data']
    if insight['type'] == 'datatable':
        table_data = insight['data']
        # pandas' 'split' orientation expects a 'data' key, not 'rows'
        if "rows" in table_data.keys():
            row_data = table_data.pop('rows')
            table_data.update({'data': row_data})
        s = StringIO()
        json.dump(table_data, s)
        s.seek(0)
        return pd.read_json(s, orient='split')

def create_recipe_util(client, create_recipe_request, timeout=TIMEOUT):
    print("creating a new blank recipe.", end="")
    while True:
        # retry on transient errors until the recipe is created or we give up
        try:
            create_recipe_response = client.recipes.CreateRecipe(create_recipe_request)
            return create_recipe_response
        except Exception:
            print(".", end="")
            timeout = timeout - 1
            if timeout == 0:
                raise ValueError("Timed out creating a recipe")


def get_dataframe(client, output_dataset_id):
    # download the dataset's parquet files and concatenate them into one DataFrame
    file_urls = util.get_dataset_file_download_urls(client, output_dataset_id)
    df = pd.concat([pd.read_parquet(url) for url in file_urls])
    return df

def get_recommendations(client, recipe_id, iteration):
    get_recommended_actions_request = recipe.GetRecommendedActionsUrlRequest(id=recipe_id, iteration=iteration)
    get_recommended_actions_response = client.recipes.GetRecommendedActionsUrl(get_recommended_actions_request)

    # download the recommended-actions JSON from the returned URL
    recommended_actions = json.loads(requests.get(get_recommended_actions_response.url).content)
    return recommended_actions

def get_insights(client, recipe_obj):
    recipe_id = recipe_obj.id
    iteration = len(recipe_obj.iterations)
    get_recipe_response = get_recipe_and_wait(client, recipe_id=recipe_id, step='recommendation', expected_status='success', iteration=iteration)
    get_recommended_actions_request = recipe.GetRecommendedActionsUrlRequest(id=recipe_id, iteration=iteration)
    get_recommended_actions_response = client.recipes.GetRecommendedActionsUrl(get_recommended_actions_request)
    recommended_actions = json.loads(requests.get(get_recommended_actions_response.url).content)
    return recommended_actions

def visualize_recommendations(recommended_actions):
    # render every insight attached to each recommendation's problem description
    ui_output = [render_problem_insight(insight) for rec in recommended_actions if "problem" in rec for insight in rec['problem']['desc']]

    for elem in ui_output:
        display(elem)

def create_new_recipe(client, dataset_id, target_col, name, description):
    # create a new recipe on the dataset, using the supplied name and description
    create_recipe_request = recipe.CreateRecipeRequest(
        name=name,
        description=description,
        datasets=[recipe.InputDataset(id=dataset_id, target_columns=[target_col])]
    )
    create_recipe_response = create_recipe_util(client, create_recipe_request)
    return create_recipe_response

def add_iteration_to_recipe(client, recipe_obj):
    # reuse the target columns from the recipe's input dataset
    create_iteration_request = recipe.CreateRecipeIterationRequest(id=recipe_obj.id, target_columns=recipe_obj.datasets[0].target_columns)
    client.recipes.CreateRecipeIteration(create_iteration_request)

    iteration = len(recipe_obj.iterations) + 1

    recipe_obj = get_recipe_and_wait(client, recipe_id=recipe_obj.id, step='recommendation', expected_status='success', iteration=iteration)
    return recipe_obj


def create_historical_summ(dataset_id, col, fns, timeframes, id_cols, date_col):
    rec = cr.Recommendation(agent_id="data_doctor")
    rec.add_dataset(PRIMARY_DATASET_ALIAS, dataset_id)
    rec.set_solution("Create historical variables")

    directive = hsa.HistoricalSummAggDirective({
        "fns": [fns],
        "timeframe": [timeframes],
        "id_cols": id_cols,
        "date_col": date_col},
        action_input=[{"name": col, "dataset": PRIMARY_DATASET_ALIAS}],
        action_output=[{"name": col + "_" + f + "_" + t, "dataset": PRIMARY_DATASET_ALIAS} for f, t in zip(fns, timeframes)]
    )

    rec.add_directive(directive)

    return [rec.get_recommendation_json()]

def create_datetime_features(dataset_id, date_col, features=["year", "month", "day", "dayName", "dayOfWeek", "monthName", "quarter", "timestamp", "amPm"]):
    rec = cr.Recommendation(agent_id="zj_manual")
    rec.add_dataset(PRIMARY_DATASET_ALIAS, dataset_id)
    rec.set_solution("Create datetime feature")

    for feature in features:
        # build e.g. ExtractYearDirective(...) dynamically from the feature name
        feature1 = feature[0].upper() + feature[1:]
        code = (f"Extract{feature1}Directive({{}}," +
                "[{'name':" +
                f"'{date_col}', 'dataset': PRIMARY_DATASET_ALIAS}}], " +
                "[{'name':" +
                f"'{date_col + '_' + feature}', 'dataset': PRIMARY_DATASET_ALIAS}}]" +
                ")")

        rec.add_directive(eval(code))

    return [rec.get_recommendation_json()]


def get_dataset_id(recipe_obj):
    # return the dataset produced by the latest iteration
    iterations = recipe_obj.iterations
    return iterations[-1].dataset_id

def drop_columns_actions(dataset_id: str, cols: List[str]):
    rec = cr.Recommendation(agent_id = "zj_manual")
    rec.add_dataset(PRIMARY_DATASET_ALIAS, dataset_id)

    directive = dc.DropColumnsDirective(
        action_input=[
            {
                'dataset': PRIMARY_DATASET_ALIAS,
                'key': 'operand',
                'select': NameIn(names=cols).to_dict(),
            }],
        action_output=None,
        parameters=None
    )
    rec.add_directive(directive)

    return [rec.get_recommendation_json()]

def upload_data(client, path, project_id, name, description):
    """Upload a local file to the cloud"""
    _, ext = os.path.splitext(path)
    if ext.lower() not in (".csv", ".parquet"):
        raise ValueError("Unsupported file type: {}".format(ext))

    return util.create_dataset(
        client,
        project_id=project_id,
        name=name,
        description=description,
        data_files=[path],
        )

def finalize_recipe(client, recipe_obj, completed_dataset_name):
    complete_recipe_request = recipe.CompleteRecipeRequest(id=recipe_obj.id, dataset_name=completed_dataset_name)
    complete_recipe_response = client.recipes.CompleteRecipe(complete_recipe_request)
    return complete_recipe_response

def search_project_by_name(client, project_name):
    """Return the ID of the first project whose name matches, case-insensitively"""
    projects = client.projects.ListUserProjects(project.ListUserProjectsRequest())
    project_ids = [p.id for p in projects.projects if p.name.lower() == project_name.lower()]
    if not project_ids:
        raise ValueError("No project named '{}' found".format(project_name))
    return project_ids[0]