This quickstart builds on the previous one, in which we trained a machine learning model and saved it to Beneath. Here we cover the next steps: loading the model and applying it to a table in real time.
Specifically, we'll load our model from a Beneath checkpoint and use the Pipeline API to derive a new table. We'll walk through the pipeline script in sections; the full script is in make_predictions.py.
If you haven’t yet familiarized yourself with Beneath pipelines, you should start with the quickstart Use the Pipeline API.
At the top of the file, we initialize a Beneath pipeline and define the schema for a new table of predictions:
import beneath
import pickle

PREDICTIONS_SCHEMA = """
  type Predictions @schema {
    id: Int! @key
    prediction: Float!
  }
"""

# initialize a Beneath Pipeline
p = beneath.Pipeline(parse_args=True, disable_checkpoints=True)
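For reference, a record conforming to this schema is just a Python dict with those two fields. For example (values are illustrative):

example_record = {"id": 1, "prediction": 0.73}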
Next we create a function to load and deserialize the model, which we saved to Beneath in the previous quickstart. Again we use a Beneath checkpointer, but this time to load the model rather than save it. Every Beneath pipeline embeds a client, so the checkpointer is easy to access.
Deserializing the model converts the byte string back into the Python classifier object originally created with sklearn. We use pickle, which we imported above, to deserialize the model.
# rudimentary cache to store the predictive model
_clf = None

# load the ML model from Beneath
async def get_clf():
    global _clf
    if _clf is None:
        checkpointer = await p.client.checkpointer(
            project_path="USERNAME/PROJECT_NAME",
            metatable_name="predictive-model",
        )
        s = await checkpointer.get("clf_serialized")
        _clf = pickle.loads(s)
    return _clf
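If the serialization step is new to you, here is a minimal, self-contained sketch of the pickle round-trip. It uses a throwaway scikit-learn model as a stand-in for the classifier trained in the previous quickstart:

import pickle

from sklearn.dummy import DummyClassifier

# stand-in model (the real one was trained in the previous quickstart)
model = DummyClassifier(strategy="most_frequent").fit([[0], [1]], [0, 1])

s = pickle.dumps(model)     # classifier -> byte string (what was checkpointed)
restored = pickle.loads(s)  # byte string -> classifier (what get_clf does)

assert (restored.predict([[0]]) == model.predict([[0]])).all()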
Now that we’ve loaded our machine learning model, we can use it in a function to make predictions:
# use the ML model to make predictions and emit records that
# conform to the PREDICTIONS_SCHEMA
async def predict_outcome(record):
    clf = await get_clf()
    # assemble the feature vector (fill in your own feature columns)
    X = [[record["FEATURE_1"], record["FEATURE_2"], ...]]
    prediction = clf.predict_proba(X)[0][0]
    yield {
        "id": record["id"],
        "prediction": prediction,
    }
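Because predict_outcome is an async generator, you can smoke-test it on a single record before wiring it into the pipeline. A rough sketch (the feature names and values are placeholders, and get_clf still needs an authenticated Beneath environment to fetch the checkpoint):

import asyncio

async def smoke_test():
    # placeholder record shaped like a row of the features table
    record = {"id": 1, "FEATURE_1": 0.5, "FEATURE_2": 1.2}
    async for out in predict_outcome(record):
        print(out)  # e.g. {"id": 1, "prediction": 0.73}

asyncio.run(smoke_test())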
Here we construct the Beneath pipeline that reads our table of features, applies the predict_outcome function, and outputs the results to a new table named predictions:
if __name__ == "__main__":
    p.description = (
        "A pipeline that makes predictions based on the features table"
    )
    # consume the features table
    features = p.read_table("USERNAME/PROJECT_NAME/features")
    # derive a new table and write to Beneath
    predictions = p.apply(features, predict_outcome)
    p.write_table(
        predictions,
        "predictions",
        schema=PREDICTIONS_SCHEMA,
    )
    # run the pipeline
    p.main()
Run the pipeline, then check out the web console to see the predictions stream in!
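Once records are flowing, you can also read the derived table back with the Python client. A sketch, assuming the client's easy_read helper and your own USERNAME/PROJECT_NAME:

import asyncio

import beneath

async def main():
    # load the predictions table into a DataFrame
    df = await beneath.easy_read("USERNAME/PROJECT_NAME/predictions")
    print(df)

asyncio.run(main())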