Flight Delay Prediction With InsightEdge Spark
By Danylo Hurin
We will create a solution based on a decision tree algorithm described by Carol McDonald in her MapR blog post.
InsightEdge Architecture
The following diagram shows the architecture with InsightEdge.
For real-time predictions we will use Spark Streaming combined with Apache Kafka, which will simulate an endless, continuous data flow. For the prediction part, we will use Spark Machine Learning and a decision tree algorithm. Streamed data is processed by a decision tree model, and the results are saved into the InsightEdge data grid for future use.
Our solution consists of two parts (Spark jobs):
- Model Training
- Flight Delay Prediction
Let’s take a look at these two jobs in detail. All code and instructions can be found on GitHub.
‘Model Training’ Spark Job
The model training job is a one-time job that performs the initial model training and stores the model in the data grid, so it can be used by the second job. In this post, we won’t go into too much detail about machine learning algorithms and decision tree model training. If you’d like, you can familiarize yourself with them in Carol McDonald’s blog post mentioned earlier.
The first Spark job consists of 3 simple steps:
1. Load the data (the same data set used in Carol McDonald’s blog post), split it into training and testing parts, and save the testing part for use by the second job:
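from pyspark import SparkContext
flight_data_file = ...
sc = SparkContext(appName="Flight prediction model training")
text_rdd = sc.textFile(flight_data_file)
# Split the raw lines roughly 70/30 into training and test sets
splits = text_rdd.randomSplit([0.7, 0.3])
(training_rdd, test_rdd) = (splits[0], splits[1])
# Write the test set as a single file for the second job
test_rdd.coalesce(1, True).saveAsTextFile(...)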
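Spark's `randomSplit` assigns each record to a split independently at random, so the 70/30 proportions are approximate, and passing a seed makes the split repeatable. The plain-Python sketch below mimics that behavior without Spark. It is only an illustration: `random_split` and the sample records are hypothetical names, not part of the demo code.

```python
import random
from itertools import accumulate


def random_split(records, weights, seed=None):
    """Assign each record to one split with probability proportional to its weight."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # Cumulative, normalized boundaries, e.g. [0.7, 1.0] for weights [0.7, 0.3].
    bounds = [w / total for w in accumulate(weights)]
    splits = [[] for _ in weights]
    for record in records:
        draw = rng.random()
        for index, bound in enumerate(bounds):
            if draw < bound:
                splits[index].append(record)
                break
        else:
            # Guard against floating-point rounding in the last boundary.
            splits[-1].append(record)
    return splits


# Hypothetical stand-ins for lines of the flight data file.
records = ["flight-%d" % i for i in range(1000)]
training, test = random_split(records, [0.7, 0.3], seed=42)
```

Because the assignment is random per record, the training part holds about 70% of the records rather than exactly 70%. Reusing the same seed reproduces the same split.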
2. During the second job we will convert flight data into LabeledPoint, so we will need to store integer representations of origin, destination and carrier in the data grid:
all_flights_rdd = text_rdd.map(lambda r: Utils.parse_flight(r))
carrier_mapping = dict(all_flights_rdd.map(lambda flight: flight.carrier).distinct().zipWithIndex().collect())
origin_mapping = dict(all_flights_rdd.map(lambda flight: flight.origin).distinct().zipWithIndex().collect())
destination_mapping = dict(all_flights_rdd.map(lambda flight: flight.destination).distinct().zipWithIndex().collect())
sqlc = SQLContext(sc)
save_mapping(carrier_mapping, DF_SUFFIX + ".CarrierMap", sqlc)
save_mapping(origin_mapping, DF_SUFFIX + ".OriginMap", sqlc)
save_mapping(destination_mapping, DF_SUFFIX + ".DestinationMap", sqlc)
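The `distinct().zipWithIndex()` pattern above gives every distinct carrier, origin and destination its own integer index. A minimal plain-Python sketch of the same idea follows. `build_index` and the sample carrier codes are hypothetical, and the sketch sorts the values to get a deterministic result, whereas the index order in Spark depends on how the data is partitioned.

```python
def build_index(values):
    """Map each distinct value to a consecutive integer, starting at 0."""
    return {value: index for index, value in enumerate(sorted(set(values)))}


# Hypothetical carrier codes, with repeats as in real flight data.
carriers = ["AA", "DL", "AA", "UA", "DL"]
carrier_index = build_index(carriers)

# Replace each carrier code with its integer representation.
encoded = [carrier_index[c] for c in carriers]
```

Since the indices are arbitrary, the second job has to use exactly the mappings produced during training, which is why they are stored in the data grid instead of being recomputed.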
3. Train a model and save it to the data grid:
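from pyspark.mllib.tree import DecisionTree
# Convert each parsed flight into a LabeledPoint using the integer mappings
training_data = training_rdd.map(Utils.parse_flight).map(lambda flight: Utils.create_labeled_point(flight, carrier_mapping, origin_mapping, destination_mapping))
classes_count = 2
impurity = "gini"
max_depth = 9
max_bins = 7000
# categorical_features_info maps a feature index to its number of categories (its definition is not shown in this post)
model = DecisionTree.trainClassifier(training_data, classes_count, categorical_features_info, impurity, max_depth, max_bins)
Utils.save_model_to_grid(model, sc)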
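The training snippet uses `categorical_features_info` without showing how it is built. In Spark MLlib this argument is a dictionary from feature index to the number of categories of that feature, and `maxBins` must be at least as large as the largest category count. The sketch below shows one way such a dictionary could be derived from the mappings. It is an assumption rather than the demo's actual code: the helper names, the feature positions and the sample mappings are all hypothetical.

```python
def build_categorical_info(feature_positions, mappings):
    """Return {feature index: number of categories} for the categorical features."""
    return {feature_positions[name]: len(mapping) for name, mapping in mappings.items()}


def check_max_bins(categorical_info, max_bins):
    """Return the largest category count, raising if max_bins is too small for it."""
    largest = max(categorical_info.values())
    if max_bins < largest:
        raise ValueError("max_bins=%d is smaller than %d categories" % (max_bins, largest))
    return largest


# Hypothetical positions of the categorical features inside the feature vector.
feature_positions = {"carrier": 0, "origin": 1, "destination": 2}

# Hypothetical, tiny mappings; the real ones come from the flight data.
mappings = {
    "carrier": {"AA": 0, "DL": 1, "UA": 2},
    "origin": {"ATL": 0, "JFK": 1},
    "destination": {"ATL": 0, "JFK": 1, "SFO": 2, "ORD": 3},
}

categorical_features_info = build_categorical_info(feature_positions, mappings)
largest = check_max_bins(categorical_features_info, max_bins=7000)
```

A check like this fails fast in plain Python instead of surfacing as an error inside the Spark training job.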
‘Flight Delay Prediction’ Spark Job
The second Spark job loads the model and mappings from the grid, reads data from the stream and uses the model for prediction. Predictions are stored in the grid along with the flight data.
The second Spark job also takes 3 easy steps:
1. Load the model and mappings from the data grid:
sc = SparkContext(appName="Flight delay prediction job")
model = DecisionTreeModel(Utils.load_model_from_grid(sc))
sqlc = SQLContext(sc)
carrier_mapping = load_mapping(DF_SUFFIX + ".CarrierMap", sqlc)
origin_mapping = load_mapping(DF_SUFFIX + ".OriginMap", sqlc)
destination_mapping = load_mapping(DF_SUFFIX + ".DestinationMap", sqlc)
2. Open a Kafka stream and extract the lines with flight data:
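# Create a streaming context with a 3-second batch interval
ssc = StreamingContext(sc, 3)
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
# Each Kafka record is a (key, value) pair; keep only the value
lines = kvs.map(lambda x: x[1])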
3. Parse each batch of lines (an RDD), make predictions and save them to the data grid:
def predict_and_save(rdd):
    # Skip empty micro-batches
    if not rdd.isEmpty():
        parsed_flights = rdd.map(Utils.parse_flight)
        labeled_points = parsed_flights.map(lambda flight: Utils.create_labeled_point(flight, carrier_mapping, origin_mapping, destination_mapping))
        predictions = model.predict(labeled_points.map(lambda x: x.features))
        labels_and_predictions = labeled_points.map(lambda lp: lp.label).zip(predictions).zip(parsed_flights).map(to_row())
        df = sqlc.createDataFrame(labels_and_predictions)
        df.write.format(IE_FORMAT).mode("append").save(DF_SUFFIX + ".FlightWithPrediction")

# Register the handler once it has been defined
lines.foreachRDD(predict_and_save)
Running The Demo And Examining Results
To run the demo we need to perform the following steps:
- Start up InsightEdge
- Start up Kafka and create a topic
- Submit the Model Training job
- Submit the Flight Prediction job
- Push the test data into the Kafka topic
You can find detailed instructions here to help you run the demo.
After all steps have been completed, we can examine what was stored in the data grid.
Open Zeppelin at http://127.0.0.1:8090 and import a notebook. Below you can see an example of the stored data:
- Day – day of the month
- Origin – origin airport
- Destination – destination airport
- Distance – distance between airports in miles
- Carrier – airline company
- Actual_delay_minutes – actual flight delay in minutes
- Prediction – whether our model made a correct or incorrect prediction
Since we store the prediction result alongside the actual flight delay, we can see the ratio of correct and incorrect predictions:
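As a rough illustration of how such a ratio can be computed, the sketch below compares the actual label with the predicted one for each stored row. It works on plain Python tuples instead of the data grid. The `prediction_ratio` helper and the sample `(label, prediction)` pairs are hypothetical and are not taken from the demo's results.

```python
from collections import Counter


def prediction_ratio(rows):
    """Return the share of correct and incorrect predictions among (label, prediction) pairs."""
    counts = Counter(
        "correct" if label == prediction else "incorrect"
        for label, prediction in rows
    )
    total = sum(counts.values())
    if total == 0:
        # No rows stored yet, so there is nothing to compare.
        return {"correct": 0.0, "incorrect": 0.0}
    return {key: counts[key] / total for key in ("correct", "incorrect")}


# Hypothetical rows: 1.0 means "delayed", 0.0 means "not delayed".
rows = [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)]
ratio = prediction_ratio(rows)
```

For these four sample rows, three predictions match the label, so the correct share is 0.75.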
What’s Next?
In this post we built a simple, real-time prediction application using Spark ML combined with Spark Streaming on top of InsightEdge. We haven’t built the perfect solution just yet, and there is always room to improve it, e.g.:
- You may want to look at other ML algorithms or tune the existing one to get a better prediction rate.
- Over time this model might become outdated. In order to keep it up to date we will need to come up with a model update strategy. There are two possible solutions you can use:
  - Incremental algorithms: A model built on such algorithms will update itself every time it encounters new data.
  - Periodic model retraining: Here the solution is to store incoming data, periodically perform model retraining and replace the existing model with the updated one.
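The periodic retraining strategy can be sketched in plain Python as follows. This is a minimal illustration under stated assumptions, not a design for the demo: `PeriodicRetrainer` and `train_majority_model` are hypothetical names, and the stand-in trainer simply predicts the most common label, where a real job would retrain the decision tree.

```python
from collections import Counter


def train_majority_model(records):
    """Stand-in trainer: the returned model always predicts the most common label."""
    majority = Counter(label for _, label in records).most_common(1)[0][0]
    return lambda features: majority


class PeriodicRetrainer:
    """Stores incoming records and swaps in a retrained model every `retrain_every` records."""

    def __init__(self, train, initial_records, retrain_every):
        self.train = train
        self.history = list(initial_records)
        self.retrain_every = retrain_every
        self.pending = 0
        self.model = train(self.history)
        self.version = 1

    def observe(self, features, label):
        """Store one incoming record; retrain and replace the model when due."""
        self.history.append((features, label))
        self.pending += 1
        if self.pending >= self.retrain_every:
            self.model = self.train(self.history)
            self.version += 1
            self.pending = 0


# Hypothetical data: two "not delayed" records, then three "delayed" ones arrive.
retrainer = PeriodicRetrainer(train_majority_model, [((1,), 0.0), ((2,), 0.0)], retrain_every=3)
before = retrainer.model((3,))
for features in [(4,), (5,), (6,)]:
    retrainer.observe(features, 1.0)
after = retrainer.model((3,))
```

The sketch retrains after a fixed number of records. Retraining on a time schedule follows the same pattern, with the counter replaced by a timestamp check.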