Data Preparation Guide

Learn how to prepare data in Uniflow for ML pipelines on Michelangelo using Ray's distributed processing capabilities.

What You'll Learn

  • Apply preprocessing at scale with Ray
  • Create train/validation/test splits
  • Handle large datasets efficiently

Preprocessing Patterns

Distributed Preprocessing with Ray

import ray.data as rd
from michelangelo.sdk.workflow.variables import DatasetVariable

# Build a lazy preprocessing pipeline over the Parquet source
dataset = (
    rd.read_parquet("s3://bucket/data.parquet")
    .map_batches(clean_missing_values, batch_size=1000)
    .map_batches(normalize_features)
    .map_batches(encode_categories)
)

# Split into train/validation sets and wrap them for downstream tasks
train_ds, val_ds = dataset.train_test_split(test_size=0.2)
train_dv = DatasetVariable(value=train_ds)
val_dv = DatasetVariable(value=val_ds)
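
The pipeline above calls clean_missing_values, normalize_features, and encode_categories without defining them. A minimal sketch of what such batch functions could look like, assuming pandas-format batches (pass batch_format="pandas" to map_batches) and hypothetical column names and constants:

import pandas as pd

FEATURE_MEAN, FEATURE_STD = 0.0, 1.0                # placeholder stats; compute offline in practice
CATEGORY_TO_ID = {"red": 0, "green": 1, "blue": 2}  # fixed vocabulary, hypothetical

def clean_missing_values(batch: pd.DataFrame) -> pd.DataFrame:
    # Fill numeric gaps and drop rows that lost their label
    batch = batch.fillna({"feature_a": 0.0})
    return batch.dropna(subset=["label"])

def normalize_features(batch: pd.DataFrame) -> pd.DataFrame:
    # Use precomputed statistics so every batch is scaled identically
    batch["feature_a"] = (batch["feature_a"] - FEATURE_MEAN) / FEATURE_STD
    return batch

def encode_categories(batch: pd.DataFrame) -> pd.DataFrame:
    # Map known category values to integer codes; unseen values become -1
    batch["category"] = batch["category"].map(CATEGORY_TO_ID).fillna(-1).astype(int)
    return batch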

Common Preprocessing Functions

Task                 | Implementation Pattern           | Notes
Missing Values       | df.fillna() or df.dropna()       | Use inside map_batches
Normalization        | StandardScaler or MinMaxScaler   | Apply per batch for efficiency
Categorical Encoding | pd.get_dummies() or LabelEncoder | Maintain consistent encoding
Text Tokenization    | HuggingFace tokenizers           | For NLP workflows
Image Preprocessing  | torchvision.transforms           | For computer vision
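
The "maintain consistent encoding" note deserves emphasis: pd.get_dummies only creates columns for values present in a given batch, so applying it naively inside map_batches can produce a different schema per batch. A small sketch of one way around this, using a fixed, hypothetical vocabulary:

import pandas as pd

KNOWN_COLORS = ["red", "green", "blue"]  # fixed vocabulary, hypothetical

def encode_color(batch: pd.DataFrame) -> pd.DataFrame:
    # A Categorical with fixed categories makes get_dummies emit the same
    # columns for every batch, even when some values are absent
    batch["color"] = pd.Categorical(batch["color"], categories=KNOWN_COLORS)
    return pd.get_dummies(batch, columns=["color"])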

Data Splitting Strategies

Random Split

# 70% train, 15% validation, 15% test
train_ds, temp_ds = dataset.train_test_split(test_size=0.3)
val_ds, test_ds = temp_ds.train_test_split(test_size=0.5)
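
By default train_test_split takes the split in row order, so any ordering in the source data leaks into the partitions. A sketch of a shuffled, seeded split for more representative and reproducible partitions, assuming your Ray version supports the shuffle and seed arguments:

# Shuffle before splitting and fix the seed for reproducibility
train_ds, temp_ds = dataset.train_test_split(test_size=0.3, shuffle=True, seed=42)
val_ds, test_ds = temp_ds.train_test_split(test_size=0.5, shuffle=True, seed=42)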

Temporal Split (Time Series)

# Split chronologically: older rows train, a later window validates, the newest rows test
train_ds = dataset.filter(lambda x: x["date"] <= "2023-01-01")
val_ds = dataset.filter(lambda x: "2023-01-01" < x["date"] <= "2023-06-01")
test_ds = dataset.filter(lambda x: x["date"] > "2023-06-01")

DatasetVariable: Michelangelo's Dataset Abstraction

Michelangelo provides DatasetVariable to handle datasets across different frameworks with automatic storage and serialization.

Flexible Dataset Usage

Framework         | Usage                              | Load Method
Ray Datasets      | DatasetVariable(value=ray_dataset) | load_ray_dataset()
Pandas DataFrames | DatasetVariable(value=pandas_df)   | load_pandas_dataframe()
Spark DataFrames  | DatasetVariable(value=spark_df)    | load_spark_dataframe()

Direct Dataset Usage

Framework         | Direct Usage            | When to Use
Ray Datasets      | rd.read_parquet(...)    | Large-scale processing
Pandas DataFrames | pd.read_csv(...)        | Small datasets
Spark DataFrames  | spark.read.parquet(...) | Large-scale processing

@uniflow.task()
def process_data_directly(data_path: str):
    # Preprocess and split on the Ray Dataset directly, without DatasetVariable
    processed = rd.read_parquet(data_path).map_batches(preprocessing_function)
    train_ds, test_ds = processed.train_test_split(test_size=0.2)
    return train_ds, test_ds

Creating DatasetVariables

import ray.data as rd
import pandas as pd
from michelangelo.sdk.workflow.variables import DatasetVariable

# From a Ray Dataset
ray_dataset = rd.read_parquet("s3://bucket/data.parquet")
dataset_var = DatasetVariable(value=ray_dataset)

# From a pandas DataFrame
pandas_df = pd.read_csv("local_file.csv")
dataset_var = DatasetVariable(value=pandas_df)

# From a Spark DataFrame (assumes an existing SparkSession named spark)
spark_df = spark.read.parquet("s3://bucket/data.parquet")
dataset_var = DatasetVariable(value=spark_df)

Automatic Storage in Uniflow Tasks

@uniflow.task()
def prepare_training_data(data_path: str):
    dataset = rd.read_parquet(data_path).map_batches(clean_and_normalize)
    train_ds, val_ds = dataset.train_test_split(test_size=0.2)

    # Wrap each split and persist it so downstream tasks can load it
    train_dv = DatasetVariable(value=train_ds)
    train_dv.save_ray_dataset()
    val_dv = DatasetVariable(value=val_ds)
    val_dv.save_ray_dataset()

    return {
        "train": train_dv,
        "validation": val_dv,
    }


@uniflow.task()
def use_prepared_data(datasets: dict):
    # Rehydrate the stored Ray Datasets, then access them through .value
    datasets["train"].load_ray_dataset()
    datasets["validation"].load_ray_dataset()
    train_data = datasets["train"].value
    val_data = datasets["validation"].value

Integration with Trainer SDK

trainer_param = LightningTrainerParam(
    create_model=create_model_function,
    model_kwargs=model_config,
    train_data=train_dv.value,
    validation_data=val_dv.value,
    batch_size=32,
    num_epochs=10,
)

trainer = LightningTrainer(trainer_param)
result = trainer.train(run_config, scaling_config)
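
The snippet assumes run_config and scaling_config already exist. If, as the names suggest, these are Ray Train's RunConfig and ScalingConfig, they might be constructed as follows (the exact types the Michelangelo trainer expects are an assumption here):

from ray.train import RunConfig, ScalingConfig

# Hypothetical configuration: 4 GPU workers, named run
scaling_config = ScalingConfig(num_workers=4, use_gpu=True)
run_config = RunConfig(name="data-prep-guide-demo")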

Best Practices

  • Use Parquet for large datasets; it is columnar and splits cleanly across Ray workers
  • Process in batches rather than row by row (map_batches instead of map)
  • Validate data after preprocessing (see the sketch after this list)
  • Leverage uniflow tasks for caching and reproducibility
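
A minimal sketch of what post-preprocessing validation can look like, with hypothetical column names and thresholds; count() and take() are standard Ray Dataset methods used here for spot checks:

def validate_processed_data(ds):
    # Hypothetical sanity checks; adapt column names and ranges to your schema
    assert ds.count() > 0, "preprocessing produced an empty dataset"
    for row in ds.take(100):  # small sample of rows for spot checks
        assert row["label"] is not None, "null label survived cleaning"
        assert 0.0 <= row["feature_a"] <= 1.0, "feature_a is not normalized to [0, 1]"

validate_processed_data(train_ds)
validate_processed_data(val_ds)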

Common Issues

  • Out of memory → Reduce the map_batches batch_size or switch to Spark
  • Slow preprocessing → Increase num_cpus (see the sketch after this list)
  • Inconsistent results → Ensure deterministic preprocessing (fixed seeds, consistent encodings)
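
On the num_cpus point, a sketch of where that setting typically lives, assuming it refers to Ray's per-task resource request on map_batches; this helps when the preprocessing function is itself multi-threaded, otherwise throughput usually comes from adding CPUs to the cluster:

dataset = rd.read_parquet("s3://bucket/data.parquet").map_batches(
    expensive_preprocessing_fn,  # hypothetical CPU-heavy function
    batch_size=1000,
    num_cpus=2,  # reserve two CPUs per batch task
)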