Location-based analytics for taxi sharing

Index

  1. Introduction
  2. Algorithm
  3. Implementation
     3.1 Data frame creation
     3.2 Midpoints, clustering and grouping
     3.3 Cartesian product and initial filtering
     3.4 Finding common area and sorting
     3.5 Group by tripid and get the top 10
     3.6 Finding the fares
  4. Conclusion
  5. References

1. Introduction

Location-based analytics has become one of the most important research areas in present-day computer science. According to a survey, most cars will soon be connected to the Internet of Things. Connected vehicles are projected to generate 25 GB of data per hour, which can be analyzed to provide real-time monitoring and applications, leading to new concepts of mobility and vehicle usage. One of the 10 major areas in which big data is currently being used to excellent advantage is in improving cities.

Uber is using big data to perfect its processes in everything from calculating pricing to finding the optimal positioning of cars to maximize profits. In this project, we use public Uber trip data to build such an algorithm: trip details are fetched, the most feasible shareable trips are identified, and fares are calculated for those trips and suggested to the users. The Apache Spark platform is used with the Scala programming language.

2. Algorithm

The algorithm has the following steps:

Step 1: Fetch the data.

Step 2: Find the midpoint of each trip.

Step 3: Get the k-means cluster output.

Step 4: Group by the prediction (cluster number).

Step 5: Get the Cartesian product of the trips in each cluster.

Step 6: Keep only the pairs whose pickup points are less than half a mile apart; delete the rest.

Step 7: Find the common area of the trip circles.

Step 8: Sort by area.

Step 9: Get the top 10 highest areas for each trip.

Step 10: For each pair of trips among those top 10, find the distances of the two trips.


A flow chart of the same algorithm is shown below:

[Figure: Flow chart of the algorithm]

3. Implementation


3.1 Data frame creation

In this project the data set was prepared by modifying existing data obtained from Uber; it contains trips in and around New York City. The data set is given as direct input and saved as a data frame.


scala> import org.apache.spark.sql.types._

scala> val schema = StructType(Array(StructField("dt", TimestampType, true),
     |   StructField("lat", DoubleType, true),
     |   StructField("lon", DoubleType, true),
     |   StructField("lat1", DoubleType, true),
     |   StructField("lon1", DoubleType, true),
     |   StructField("tripid", StringType, true)))

schema: org.apache.spark.sql.types.StructType = StructType(StructField(dt,TimestampType,true), StructField(lat,DoubleType,true), StructField(lon,DoubleType,true), StructField(lat1,DoubleType,true), StructField(lon1,DoubleType,true), StructField(tripid,StringType,true))

As shown above, the schema contains the trip id, starting location, and ending location of each trip. Even though date and time are present, they are not used in this project.

scala> val df = spark.read.option("header", true).option("timestampFormat", "MM/dd/yyyy HH:mm:ss").schema(schema).csv("/home/raj/Downloads/uber-raw-data-apr14.csv")

df: org.apache.spark.sql.DataFrame = [dt: timestamp, lat: double ... 4 more fields]

The above command reads the data from the file into the data frame df.

3.2 Midpoints, clustering and grouping

To find trips that are near each other and cluster them, the midpoint of each trip's starting and ending points is computed.

A method called mid is declared to find the midpoint of two doubles, and it is registered as a user-defined function (UDF); a sketch follows.
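The original definition is not shown here, so the following is a minimal sketch, assuming a plain arithmetic mean (a reasonable approximation at city scale):

import org.apache.spark.sql.functions.udf

// Midpoint of two coordinates as a simple average; at NYC distances this is
// close enough to the true geographic midpoint for clustering purposes.
def mid(a: Double, b: Double): Double = (a + b) / 2.0

// Register the helper as a Spark UDF so it can be applied to columns.
val midudf = udf(mid _)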

val df2 = df.withColumn("lat_mid", midudf(col("lat"), col("lat1")))

val df3 = df2.withColumn("lon_mid", midudf(col("lon"), col("lon1")))

Two new columns, lat_mid and lon_mid, are added with the respective midpoint outputs. The modified data frame is shown in the picture above.

The k-means clustering algorithm is used to cluster the above data frame based on the midpoints of the trips.

Note: The clustering is done on midpoints in order to find relevant shareable trips. This midpoint method may group together bidirectional trips (trips heading in opposite directions), but the filtering that follows removes such pairs.
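For reference, the clustering code below relies on the standard Spark ML imports:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.clustering.KMeans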

val featureCols = Array("lat_mid", "lon_mid")

val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")

val df4 = assembler.transform(df3)

val Array(trainingData, testData) = df4.randomSplit(Array(0.7, 0.3), 5043)

val kmeans = new KMeans().setK(8).setFeaturesCol("features").setPredictionCol("prediction")

I set the number of clusters to 8, since the trips we are dealing with are confined to the New York area. A new column named prediction is created, and the cluster number is stored in it.

val model = kmeans.fit(df4)

model.clusterCenters.foreach(println)

val categories = model.transform(df4)


As shown in the picture above, a new column is formed with the prediction number.

val preds = categories.select("prediction").distinct.collect.flatMap(_.toSeq)

val byPreds = preds.map(pred => categories.where($"prediction" <=> pred))

A new array called byPreds is formed, which holds the trips grouped by cluster, one data frame per cluster.

In the picture above, when we access byPreds(0), all the trips grouped into the first cluster are shown. In the same way, each cluster's trips are stored in byPreds as a separate data frame.

Note: byPreds is an Array[DataFrame].

3.3 Cartesian product and initial filtering

A new collection byPredsNear is formed by self-joining each cluster's data frame to itself and then filtering the resulting pairs: rows where a trip is joined with itself are dropped, as are pairs whose pickup points are too far apart.


To elaborate on the filtering, consider the following example: if there are three trips (A1-B1, A2-B2, A3-B3) in a cluster, the Cartesian product yields 9 rows for that cluster. We have to delete the row where A1-B1 is paired with A1-B1, and likewise for the other trips. A method called filterSameTrip is written to do this job.

Now consider A3-B3: its midpoint may be the same as the other trips' midpoints, but the trip heads in the opposite direction, so it should not be paired with the first two trips. To remove such trips, the pickup points are compared and required to be within a particular distance of each other. A method called filterDist2 is written to calculate the distance and return a Boolean (it takes a Row as input and returns a Boolean). Sketches of both predicates follow.
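The method bodies are not shown in the article; below is a minimal sketch of the two predicates, assuming a haversine distance helper and assuming the joined row lays out trip1's ten columns (dt, lat, lon, lat1, lon1, tripid, lat_mid, lon_mid, features, prediction) before trip2's, so the field indices here are assumptions:

import org.apache.spark.sql.Row

// Haversine great-circle distance in miles between two (lat, lon) points.
def haversineMiles(lat1: Double, lon1: Double, lat2: Double, lon2: Double): Double = {
  val r = 3958.8 // mean Earth radius in miles
  val dLat = math.toRadians(lat2 - lat1)
  val dLon = math.toRadians(lon2 - lon1)
  val a = math.pow(math.sin(dLat / 2), 2) +
    math.cos(math.toRadians(lat1)) * math.cos(math.toRadians(lat2)) *
    math.pow(math.sin(dLon / 2), 2)
  2 * r * math.asin(math.sqrt(a))
}

// Drop rows where a trip is paired with itself: compare the two tripid fields.
val filterSameTrip = (row: Row) => row.getString(5) != row.getString(15)

// Keep only pairs whose pickup points are less than half a mile apart.
val filterDist2 = (row: Row) =>
  haversineMiles(row.getDouble(1), row.getDouble(2),
    row.getDouble(11), row.getDouble(12)) < 0.5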


val byPredsNear = byPreds.map(g => g.as("trip1").join(g.as("trip2")).filter(filterSameTrip).filter(filterDist2))

After executing the above line, byPredsNear holds the filtered Cartesian product of the trips.


The modified data frame looks as shown in the picture above.

3.4 Finding common area and sorting

For the remaining rows, the common circular area between the two trips is found, and a new column is added with the areas.

To find the areas, I declared two methods: filterDist, which returns the distance between two points, and findCommonArea, which finds the common circular area between two trips; a sketch follows.
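findCommonArea's body is not shown either; here is a sketch under the assumption that each trip is modeled as a circle centered at its midpoint with radius equal to the pickup-to-midpoint distance, using the standard circle-circle intersection (lens) formula and the haversineMiles helper sketched earlier:

// Common area of the two trip circles, in square miles.
def findCommonArea(lat1: Double, lon1: Double, latMid1: Double, lonMid1: Double,
                   lat2: Double, lon2: Double, latMid2: Double, lonMid2: Double): Double = {
  val r1 = haversineMiles(lat1, lon1, latMid1, lonMid1) // radius of trip1's circle
  val r2 = haversineMiles(lat2, lon2, latMid2, lonMid2) // radius of trip2's circle
  val d  = haversineMiles(latMid1, lonMid1, latMid2, lonMid2) // distance between centers
  if (d >= r1 + r2) 0.0 // disjoint circles: no overlap
  else if (d <= math.abs(r1 - r2)) { // smaller circle entirely inside the larger
    val r = math.min(r1, r2)
    math.Pi * r * r
  } else { // partial overlap: standard lens area
    r1 * r1 * math.acos((d * d + r1 * r1 - r2 * r2) / (2 * d * r1)) +
      r2 * r2 * math.acos((d * d + r2 * r2 - r1 * r1) / (2 * d * r2)) -
      0.5 * math.sqrt((-d + r1 + r2) * (d + r1 - r2) * (d - r1 + r2) * (d + r1 + r2))
  }
}

val findCommonUdf = udf(findCommonArea _)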


val dfWithArea = byPredsNear.map(g => g.withColumn("area", findCommonUdf(col("trip1.lat"), col("trip1.lon"), col("trip1.lat_mid"), col("trip1.lon_mid"), col("trip2.lat"), col("trip2.lon"), col("trip2.lat_mid"), col("trip2.lon_mid"))))

When the above code is executed, a new area column is added to each data frame in byPredsNear, and the result is stored in dfWithArea.

The new area column can be seen in the picture above.

3.5 Group by tripid and get the top 10

Within each cluster, I grouped all the rows by the first trip's id, which gives an array (clusters) of arrays (trip ids) of data frames.

val byTrips = dfWithArea.map(clus => {
  val ids = clus.select("trip1.tripid").distinct.collect.flatMap(_.toSeq)
  ids.map(id => clus.where($"trip1.tripid" <=> id))
})

In the picture above, we can observe that printing byTrips(0)(0) shows all the candidate shared trips for T401 (observe the first tripid in all the rows). Every cluster's trips are grouped in the same way.

Then, inside each inner group (i.e., for each trip id), the areas are sorted and the top 10 trips are kept.

val selectedTrips = byTrips.map(g => (g.map(c => c.na.drop(Array("area")).orderBy(desc("area")).limit(10))))

In the picture above, selectedTrips(0)(0).show is executed, which shows the top 10 trips by common area.

3.6 Finding the fares

To find the fares, I wrote a method called findFares. It calculates the minimum-distance routing for the shared trip and computes the total fare based on the assumption that 1 mile costs $2.35; a sketch is given after the note below.

Note: Real-time fare calculation will differ because time also comes into play; this fare calculation is only an approximation.
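The routing logic inside findFares is not shown; the sketch below assumes the driver picks up rider 1 and then rider 2, tries both dropoff orders, keeps the shorter combined route, and prices it at the assumed $2.35 per mile, reusing the haversineMiles helper:

// Approximate shared-trip fare: shortest of the two dropoff orderings, at $2.35/mile.
def findFare(lat1: Double, lon1: Double, lat1End: Double, lon1End: Double,
             lat2: Double, lon2: Double, lat2End: Double, lon2End: Double): Double = {
  val ratePerMile = 2.35
  val pickupLeg = haversineMiles(lat1, lon1, lat2, lon2) // pickup 1 -> pickup 2
  // Order A: drop rider 1 first, then rider 2.
  val routeA = pickupLeg +
    haversineMiles(lat2, lon2, lat1End, lon1End) +
    haversineMiles(lat1End, lon1End, lat2End, lon2End)
  // Order B: drop rider 2 first, then rider 1.
  val routeB = pickupLeg +
    haversineMiles(lat2, lon2, lat2End, lon2End) +
    haversineMiles(lat2End, lon2End, lat1End, lon1End)
  math.min(routeA, routeB) * ratePerMile
}

val findfareUdf = udf(findFare _)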

val stripsWithfares = selectedTrips.map(f=>(f.map(g => g.withColumn("fare in dollars", findfareUdf(col("trip1.lat"), col("trip1.lon"), col("trip1.lat1"), col("trip1.lon1"), col("trip2.lat"), col("trip2.lon"), col("trip2.lat1"), col("trip2.lon1"))))))


After executing the above code, a new column, fare in dollars, is included, and we can see the fare estimate for each trip. These 10 trips can be sent to the user as suggestions on the dashboard.



4. Conclusion

Through this algorithm, applications like Uber Pool can be customized for user convenience instead of being determined by the driver. Fares will also be optimized for greater efficiency for the user. With k balanced clusters over n trips, each cluster of about n/k trips contributes (n/k)² pairs to the self-join, so the present cost of the algorithm is roughly O(n²/k); it can be reduced further if cases are found where redundancy can be eliminated. The trip filtering algorithm also has several other uses. When combined with demographic data, it is conceivable that a predictive model for where drivers should be on a given night can be obtained, so that users sharing the same destination (on the night of a concert or other public event) can find Uber Pool rides that maximize earnings for the driver as well as savings for the user.

5. References

https://www.mapr.com/blog/monitoring-real-time-uber-data-using-spark-machine-learning-streaming-and-kafka-api-part-1
