108 changes: 108 additions & 0 deletions pipelines/dspa-kubeflow/pipeline.py
@@ -0,0 +1,108 @@
from typing import List, NamedTuple
from kfp import dsl, compiler

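# Base images for the components, pinned by digest for reproducible runs:
# a minimal Python image, and a data-science runtime used where scikit-learn is needed.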
PYTHON_IMAGE = "registry.access.redhat.com/ubi10/python-312-minimal@sha256:d68ed3504e63368dba411301af9df8d20d767864a61b58fe47ec7195bb8a4d13" # noqa
DATA_SCIENCE_IMAGE = "registry.redhat.io/rhoai/odh-pipeline-runtime-datascience-cpu-py312-rhel9@sha256:81293ba4e8adaed7e90ceaf03852739169f6fae7c98d1b41a953c5bf26b76522" # noqa


# TODO: define the component to process the dataset
def process_data() -> NamedTuple("outputs", [("texts", List[str]), ("labels", List[int])]):
# Sample dataset
dataset = [
("I love this!", "positive"),
("I hate this!", "negative"),
("This is awesome! ", "positive"),
("This is terrible!", "negative"),
("I really enjoyed the experience.", "positive"),
("I did not like the experience at all. ", "negative"),
(" I love it.", "positive"),
("I hate it.", "negative"),
("I like it.", "positive"),
("I don't like it.", "negative"),
("I like this.", "positive"),
(" That is not good.", "negative"),
("That is so cool, love it.", "positive"),
("I had a bad experience.", "negative"),
("That is awesome", "positive"),
(" I am worried about it. ", "negative"),
("This is wonderful", "positive"),
("This is terrible, don't even try", "negative"),
("This is amazing!", "positive"),
("That is not my cup of tea.", "negative"),
("I like it so much .", "positive"),
]

# Separate texts and labels into different lists
texts = [sample[0].strip() for sample in dataset]
labels = [sample[1] for sample in dataset]

# Convert labels into numbers
class_ids = {"positive": 1, "negative": 0}
labels = [class_ids[label] for label in labels]

# Return texts and labels
outputs = NamedTuple("outputs", texts=List[str], labels=List[int])
return outputs(texts, labels)

# TODO: define the component to train the model
def train_model(texts: list, labels: list) -> float:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import make_pipeline
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# Split dataset into training and test sets (even though it's tiny)
X_train, X_test, y_train, y_test = train_test_split(
texts, labels, test_size=0.2, random_state=83
)

    # Create a pipeline with a TF-IDF vectorizer and a Multinomial Naive Bayes classifier
    pipeline = make_pipeline(
        TfidfVectorizer(),  # Converts text into TF-IDF features
        MultinomialNB(),  # Classifies the TF-IDF features
    )

# Train the model
pipeline.fit(X_train, y_train)

# Make predictions
predictions = pipeline.predict(X_test)

# Evaluate the model
accuracy = accuracy_score(y_test, predictions)

print("Test texts:", X_test)
print("Test predictions:", predictions)
    print("Expected labels:", y_test)

return accuracy


# TODO: define the component to verify the accuracy of the model
def verify_accuracy(accuracy: float, threshold: float):
import sys

if accuracy >= threshold:
print("Model trained successfully")
print(f"Accuracy: {accuracy * 100:.2f}%")
else:
print(f"The model did not achieve the minimum accuracy of {threshold * 100:.2f}%.")
print(f"Accuracy: {accuracy * 100:.2f}%")
sys.exit(1)

# TODO: define the pipeline
def pipeline():
# TODO: Load and preprocess data

# TODO: Train the model

    # TODO: Verify the model accuracy
    pass


if __name__ == "__main__":
outfile = "pipeline.yaml"
# TODO: compile the pipeline
print(
"Pipeline compiled.\n"
f"Use the RHOAI dashboard to import the '{outfile}' file"
)
134 changes: 134 additions & 0 deletions pipelines/dspa-kubeflow/solution/pipeline-manifest.py
@@ -0,0 +1,134 @@
from typing import List, NamedTuple
from kfp import dsl, compiler
from kfp.compiler.compiler_utils import KubernetesManifestOptions

PYTHON_IMAGE = "registry.access.redhat.com/ubi10/python-312-minimal@sha256:d68ed3504e63368dba411301af9df8d20d767864a61b58fe47ec7195bb8a4d13" # noqa
DATA_SCIENCE_IMAGE = "registry.redhat.io/rhoai/odh-pipeline-runtime-datascience-cpu-py312-rhel9@sha256:81293ba4e8adaed7e90ceaf03852739169f6fae7c98d1b41a953c5bf26b76522" # noqa

# Component to process the dataset
@dsl.component(base_image=DATA_SCIENCE_IMAGE)
def process_data() -> NamedTuple("outputs", [("texts", List[str]), ("labels", List[int])]):
# Sample dataset
dataset = [
("I love this!", "positive"),
("I hate this!", "negative"),
("This is awesome! ", "positive"),
("This is terrible!", "negative"),
("I really enjoyed the experience.", "positive"),
("I did not like the experience at all. ", "negative"),
(" I love it.", "positive"),
("I hate it.", "negative"),
("I like it.", "positive"),
("I don't like it.", "negative"),
("I like this.", "positive"),
(" That is not good.", "negative"),
("That is so cool, love it.", "positive"),
("I had a bad experience.", "negative"),
("That is awesome", "positive"),
(" I am worried about it. ", "negative"),
("This is wonderful", "positive"),
("This is terrible, don't even try", "negative"),
("This is amazing!", "positive"),
("That is not my cup of tea.", "negative"),
("I like it so much .", "positive"),
]

# Separate texts and labels into different lists
texts = [sample[0].strip() for sample in dataset]
labels = [sample[1] for sample in dataset]

# Convert labels into numbers
class_ids = {"positive": 1, "negative": 0}
labels = [class_ids[label] for label in labels]

# Return texts and labels
outputs = NamedTuple("outputs", texts=List[str], labels=List[int])
return outputs(texts, labels)


# Component to train the model
@dsl.component(base_image=DATA_SCIENCE_IMAGE)
def train_model(texts: list, labels: list) -> float:
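    # Imports live inside the function: each component runs in its own
    # container, and only the function body is shipped to that container.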
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import make_pipeline
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# Split dataset into training and test sets (even though it's tiny)
X_train, X_test, y_train, y_test = train_test_split(
texts, labels, test_size=0.2, random_state=83
)

    # Create a pipeline with a TF-IDF vectorizer and a Multinomial Naive Bayes classifier
    pipeline = make_pipeline(
        TfidfVectorizer(),  # Converts text into TF-IDF features
        MultinomialNB(),  # Classifies the TF-IDF features
    )

# Train the model
pipeline.fit(X_train, y_train)

# Make predictions
predictions = pipeline.predict(X_test)

# Evaluate the model
accuracy = accuracy_score(y_test, predictions)

print("Test texts:", X_test)
print("Test predictions:", predictions)
    print("Expected labels:", y_test)

return accuracy


# Component to verify the accuracy of the model
@dsl.component(base_image=PYTHON_IMAGE)
def verify_accuracy(accuracy: float, threshold: float):
import sys

if accuracy >= threshold:
print("Model trained successfully")
print(f"Accuracy: {accuracy * 100:.2f}%")
else:
print(f"The model did not achieve the minimum accuracy of {threshold * 100:.2f}%.")
print(f"Accuracy: {accuracy * 100:.2f}%")
sys.exit(1)


# The pipeline
@dsl.pipeline(name="sentiment-analysis")
def pipeline():
# Load and preprocess data
data_processing_task = process_data()
texts = data_processing_task.outputs["texts"]
labels = data_processing_task.outputs["labels"]

# Train the model
train_task = train_model(texts=texts, labels=labels)
accuracy = train_task.output

# Verify the model accuracy
verify_accuracy(accuracy=accuracy, threshold=0.5)


if __name__ == "__main__":
outfile = "pipeline-manifest.yaml"
# Compile the pipeline
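    # kubernetes_manifest_format emits Kubernetes pipeline manifests instead
    # of plain IR YAML, so the result can be applied through the Kubernetes
    # API (for example, `kubectl apply -f pipeline-manifest.yaml`).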
compiler.Compiler().compile(
pipeline,
outfile,
kubernetes_manifest_format=True,
kubernetes_manifest_options=KubernetesManifestOptions(
pipeline_name="sentiment-analysis",
pipeline_display_name="Sentiment Analysis v3",
pipeline_version_name="sentiment-analysis-v3",
pipeline_version_display_name="Sentiment Analysis v3",
namespace="dspa-kubeflow",
include_pipeline_manifest=True
)
)
print(
"Pipeline compiled.\n"
f"Use the Kubernetes API to import the '{outfile}' manifest file"
)

121 changes: 121 additions & 0 deletions pipelines/dspa-kubeflow/solution/pipeline.py
@@ -0,0 +1,121 @@
from typing import List, NamedTuple
from kfp import dsl, compiler


PYTHON_IMAGE = "registry.access.redhat.com/ubi10/python-312-minimal@sha256:d68ed3504e63368dba411301af9df8d20d767864a61b58fe47ec7195bb8a4d13" # noqa
DATA_SCIENCE_IMAGE = "registry.redhat.io/rhoai/odh-pipeline-runtime-datascience-cpu-py312-rhel9@sha256:81293ba4e8adaed7e90ceaf03852739169f6fae7c98d1b41a953c5bf26b76522" # noqa

# Component to process the dataset
@dsl.component(base_image=DATA_SCIENCE_IMAGE)
def process_data() -> NamedTuple("outputs", [("texts", List[str]), ("labels", List[int])]):
# Sample dataset
dataset = [
("I love this!", "positive"),
("I hate this!", "negative"),
("This is awesome! ", "positive"),
("This is terrible!", "negative"),
("I really enjoyed the experience.", "positive"),
("I did not like the experience at all. ", "negative"),
(" I love it.", "positive"),
("I hate it.", "negative"),
("I like it.", "positive"),
("I don't like it.", "negative"),
("I like this.", "positive"),
(" That is not good.", "negative"),
("That is so cool, love it.", "positive"),
("I had a bad experience.", "negative"),
("That is awesome", "positive"),
(" I am worried about it. ", "negative"),
("This is wonderful", "positive"),
("This is terrible, don't even try", "negative"),
("This is amazing!", "positive"),
("That is not my cup of tea.", "negative"),
("I like it so much .", "positive"),
]

# Separate texts and labels into different lists
texts = [sample[0].strip() for sample in dataset]
labels = [sample[1] for sample in dataset]

# Convert labels into numbers
class_ids = {"positive": 1, "negative": 0}
labels = [class_ids[label] for label in labels]

# Return texts and labels
outputs = NamedTuple("outputs", texts=List[str], labels=List[int])
return outputs(texts, labels)


# Component to train the model
@dsl.component(base_image=DATA_SCIENCE_IMAGE)
def train_model(texts: list, labels: list) -> float:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import make_pipeline
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# Split dataset into training and test sets (even though it's tiny)
X_train, X_test, y_train, y_test = train_test_split(
texts, labels, test_size=0.2, random_state=83
)

    # Create a pipeline with a TF-IDF vectorizer and a Multinomial Naive Bayes classifier
    pipeline = make_pipeline(
        TfidfVectorizer(),  # Converts text into TF-IDF features
        MultinomialNB(),  # Classifies the TF-IDF features
    )

# Train the model
pipeline.fit(X_train, y_train)

# Make predictions
predictions = pipeline.predict(X_test)

# Evaluate the model
accuracy = accuracy_score(y_test, predictions)

print("Test texts:", X_test)
print("Test predictions:", predictions)
    print("Expected labels:", y_test)

return accuracy


# Component to verify the accuracy of the model
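# This step needs only the standard library, so the minimal Python image is enough.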
@dsl.component(base_image=PYTHON_IMAGE)
def verify_accuracy(accuracy: float, threshold: float):
import sys

if accuracy >= threshold:
print("Model trained successfully")
print(f"Accuracy: {accuracy * 100:.2f}%")
else:
print(f"The model did not achieve the minimum accuracy of {threshold * 100:.2f}%.")
print(f"Accuracy: {accuracy * 100:.2f}%")
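        # Exiting non-zero fails this task and, with it, the pipeline run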
sys.exit(1)


# The pipeline
@dsl.pipeline(name="sentiment-analysis")
def pipeline():
# Load and preprocess data
data_processing_task = process_data()
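    # Each field of the NamedTuple is exposed as a named output on the task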
texts = data_processing_task.outputs["texts"]
labels = data_processing_task.outputs["labels"]

# Train the model
train_task = train_model(texts=texts, labels=labels)
accuracy = train_task.output

# Verify the model accuracy
verify_accuracy(accuracy=accuracy, threshold=0.5)


if __name__ == "__main__":
outfile = "pipeline.yaml"
# Compile the pipeline
compiler.Compiler().compile(pipeline, outfile)
print(
"Pipeline compiled.\n"
f"Use the RHOAI dashboard to import the '{outfile}' file"
)
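
As an alternative to the dashboard import, the compiled package can also be uploaded or run programmatically with the KFP client. A minimal sketch, assuming a reachable pipeline API endpoint and a valid token (KFP_HOST and TOKEN below are placeholders, not values from this repository):

import kfp

KFP_HOST = "https://ds-pipeline-example.apps.example.com"  # placeholder endpoint
TOKEN = "REPLACE_ME"  # placeholder bearer token

client = kfp.Client(host=KFP_HOST, existing_token=TOKEN)

# Register the compiled package so it appears in the pipelines list...
client.upload_pipeline(
    pipeline_package_path="pipeline.yaml",
    pipeline_name="sentiment-analysis",
)

# ...or start a one-off run directly from the package.
client.create_run_from_pipeline_package(
    "pipeline.yaml",
    arguments={},
    run_name="sentiment-analysis-run",
)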