Motivation
The Internet has become a large e-marketplace. Millions of users browse millions of pages (or items) that are relevant to them. A user's affinity to an item can be measured by many signals, such as time spent on the item, likes, and visit frequency. Based on a user's previous affinity to other items, and other users' affinity to this item, a recommendation engine can suggest an item the user may find interesting but is not yet aware of (or has not yet seen or visited). The end goal of a recommendation engine is to present the user with relevant content.
Recommendation Engine
A recommendation engine makes predictions for unknown user-item associations based on known ones. A user-item association is a user's affinity to an item, and different users have affinities to different items. The engine builds a prediction model from the known user-item associations (the training data) and then predicts the unknown associations (the test data).
Consider a user-item association matrix in which users have rated some of the items; the recommendation engine predicts the unknown associations. For example, if the predicted ratings for user U5 are highest for item I5, then I5 is the most interesting item to recommend to U5.
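As a toy illustration of that last step (the item names and predicted ratings below are made up, not taken from a real model), picking the recommendation is just an argmax over the predicted ratings:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PickBestItem {
    // Return the item with the highest predicted rating
    static String bestItem(Map<String, Double> predictions) {
        return predictions.entrySet().stream()
                .max(Map.Entry.comparingByValue())
                .get().getKey();
    }

    public static void main(String[] args) {
        // Hypothetical predicted ratings for user U5 on items U5 has not rated yet
        Map<String, Double> predictions = new LinkedHashMap<>();
        predictions.put("I2", 2.1);
        predictions.put("I4", 3.4);
        predictions.put("I5", 4.7);
        System.out.println("Recommend " + bestItem(predictions) + " to U5");
    }
}
```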
Collaborative Filtering Technique
Collaborative filtering based recommender engines typically use a matrix factorization model (trained with the Alternating Least Squares method) to fill in the missing entries of the user-item association matrix. Item ratings are influenced by a set of hidden factors, such as a user's tastes and interests, called latent factors.
These factors vary from user to user, so it is difficult to estimate their impact on ratings directly. Collaborative filtering uses mathematical techniques to infer these latent factors from the data. For example, for a movie rating, latent factors could capture age, sex, and movie genre. Up to a point, more latent factors allow better predictions, though too many can overfit the training data.
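To make the latent-factor idea concrete, here is a minimal sketch: in a matrix factorization model, each user and each item is represented by a vector of k latent factors, and the predicted rating is their dot product. The factor values below are made up for illustration.

```java
public class LatentFactorSketch {
    // Predicted rating = dot product of user and item latent-factor vectors
    static double predict(double[] userFactors, double[] itemFactors) {
        double rating = 0.0;
        for (int i = 0; i < userFactors.length; i++) {
            rating += userFactors[i] * itemFactors[i];
        }
        return rating;
    }

    public static void main(String[] args) {
        // Hypothetical 3-factor vectors (say, affinity to action, comedy, drama)
        double[] user  = {0.9, 0.2, 0.4};
        double[] movie = {0.8, 0.1, 0.3};
        // 0.9*0.8 + 0.2*0.1 + 0.4*0.3 = 0.86
        System.out.println(predict(user, movie));
    }
}
```

ALS learns exactly these vectors: it alternately fixes the user factors and solves a least-squares problem for the item factors, then vice versa.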
Collaborative Filtering with Apache Spark
Apache Spark provides two variations of collaborative filtering: explicit feedback and implicit feedback. With explicit feedback, the collaborative filtering algorithm treats the user-item matrix as explicit preferences given by users (e.g. star ratings). In the real world, a user's preferences are often available only as implicit feedback (views, clicks, likes, etc.), and Apache Spark MLlib processes implicit feedback differently from the standard explicit-feedback approach. Collaborative filtering in Apache Spark takes various parameters; their definitions and usage can be found in the MLlib collaborative filtering documentation.
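The implicit-feedback variant (MLlib implements the Hu/Koren/Volinsky approach) does not treat the numbers as ratings. It treats them as observation counts and derives a confidence level, roughly confidence = 1 + alpha * count. A minimal sketch of that weighting; the alpha value here is an illustrative choice taken from the original implicit-ALS paper, not necessarily MLlib's default:

```java
public class ImplicitConfidence {
    // Confidence that a user actually prefers an item, given an implicit
    // observation count (views, clicks, ...): c = 1 + alpha * r
    static double confidence(double observations, double alpha) {
        return 1.0 + alpha * observations;
    }

    public static void main(String[] args) {
        double alpha = 40.0; // value suggested in the original implicit-ALS paper
        System.out.println(confidence(0, alpha));  // never observed -> low confidence
        System.out.println(confidence(10, alpha)); // observed often -> high confidence
    }
}
```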
A Movie Recommender example
In this example we will implement a movie recommender that recommends a list of 10 movies you have not rated yet.
Inputs: ratings.csv, movies.csv, and the number of ALS iterations (10 in the sample command below)
ratings.csv – a file containing ratings of movies given by different users
Format: user-id,movie-id,rating
Add your ratings to this file with user id 0.
Sample data – ratings.csv
0,1,5.0
0,3,3.0
0,5,4.5
0,6,5.0
0,10,4.0
1,6,2.0
1,22,3.0
2,32,2.0
2,50,5.0
3,110,4.0
4,164,3.0
4,198,3.0
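Each line of ratings.csv is parsed by splitting on commas, exactly as the full source code at the end of this post does. A standalone sketch of that parsing step:

```java
public class RatingLineParser {
    // Holds one parsed record: user-id,movie-id,rating
    static class Rating {
        final int userId, movieId;
        final double rating;
        Rating(int u, int m, double r) { userId = u; movieId = m; rating = r; }
    }

    static Rating parse(String line) {
        String[] parts = line.split(",");
        return new Rating(Integer.parseInt(parts[0]),
                          Integer.parseInt(parts[1]),
                          Double.parseDouble(parts[2]));
    }

    public static void main(String[] args) {
        Rating r = parse("0,1,5.0");
        System.out.println("user " + r.userId + " rated movie " + r.movieId + " as " + r.rating);
    }
}
```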
movies.csv – a file containing movie description
Format: movie-id,movie-name
Sample data – movies.csv
1,Toy Story (1995)
2,Jumanji (1995)
3,Grumpier Old Men (1995)
4,Waiting to Exhale (1995)
5,Father of the Bride Part II (1995)
6,Heat (1995)
A sample dataset can be downloaded from MovieLensDataset. Edit the dataset to match the format above and insert your own ratings as user-id 0.
Output should be the top N movie recommendations for user id 0, sorted by predicted rating in descending order.
A sample output, showing the top 5 recommendations for the user, could look like:

MovieId  Movie Name    Predicted Rating
200      Avengers      0.70
267      Frozen        0.60
78       Dark Knight   0.55
89       Minions       0.53
5000     Man of Steel  0.50
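The top-N step itself is just a descending sort on predicted rating followed by a truncation. A plain-Java sketch of that logic, using the (made-up) movie ids and predictions from the sample output:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TopN {
    // Return the n (movieId, predictedRating) pairs with the highest ratings
    static List<Map.Entry<Integer, Double>> topN(Map<Integer, Double> predicted, int n) {
        List<Map.Entry<Integer, Double>> sorted = new ArrayList<>(predicted.entrySet());
        // Sort by predicted rating, highest first
        sorted.sort((a, b) -> Double.compare(b.getValue(), a.getValue()));
        return sorted.subList(0, Math.min(n, sorted.size()));
    }

    public static void main(String[] args) {
        Map<Integer, Double> predicted = new HashMap<>();
        predicted.put(200, 0.70);
        predicted.put(78, 0.55);
        predicted.put(267, 0.60);
        for (Map.Entry<Integer, Double> e : topN(predicted, 2)) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }
}
```

The Spark code at the end of the post does the same thing at scale with `sortBy(..., false, ...)` followed by `take(10)`.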
Steps to run with Apache Spark
To run the above example you need at least a default installation of Apache Spark.
- Create a jar from the Java source file provided below.
- From the SPARK_HOME directory, run the following command. Make sure the required libraries exist in the directories mentioned in the command.
bin/spark-submit --jars mllib/spark-mllib_2.10-1.0.0.jar --class "mllib.example.RecommendationEngine" --master local[4] spark-mllib-example/example.jar spark-mllib-example/ml-latest-small/ratings.csv spark-mllib-example/ml-latest-small/movies.csv 10
References
- https://databricks-training.s3.amazonaws.com/movie-recommendation-with-mllib.html
- https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html
- http://www.dima.tu-berlin.de/
Java Source Code
package mllib.example;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
import scala.Tuple2;

public class RecommendationEngine {

    public static void main(String[] args) {
        // Turn off unnecessary logging
        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);

        // Create Java Spark context
        SparkConf conf = new SparkConf().setAppName("Collaborative Filtering Example");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Read user-item rating file. Format: userId,itemId,rating
        JavaRDD<String> userItemRatingsFile = sc.textFile(args[0]);

        // Read item description file. Format: itemId,itemName
        JavaRDD<String> itemDescriptionFile = sc.textFile(args[1]);

        // Map each line to a Rating(user, item, rating) tuple
        JavaRDD<Rating> ratings = userItemRatingsFile.map(new Function<String, Rating>() {
            public Rating call(String s) {
                String[] sarray = s.split(",");
                return new Rating(Integer.parseInt(sarray[0]),
                        Integer.parseInt(sarray[1]),
                        Double.parseDouble(sarray[2]));
            }
        });

        // Create (itemId, itemDescription) tuples, used later to look up item names
        JavaPairRDD<Integer, String> itemDescription = itemDescriptionFile.mapToPair(
                new PairFunction<String, Integer, String>() {
                    @Override
                    public Tuple2<Integer, String> call(String t) throws Exception {
                        String[] s = t.split(",");
                        return new Tuple2<Integer, String>(Integer.parseInt(s[0]), s[1]);
                    }
                });

        // Build the recommendation model using ALS
        int rank = 10; // 10 latent factors
        int numIterations = Integer.parseInt(args[2]);
        MatrixFactorizationModel model = ALS.trainImplicit(JavaRDD.toRDD(ratings), rank, numIterations);

        // Create (user, item) tuples from the ratings
        JavaRDD<Tuple2<Object, Object>> userProducts = ratings
                .map(new Function<Rating, Tuple2<Object, Object>>() {
                    public Tuple2<Object, Object> call(Rating r) {
                        return new Tuple2<Object, Object>(r.user(), r.product());
                    }
                });

        // Items already rated by our user (userId = 0)
        JavaRDD<Integer> itemsRatedByUser = userProducts
                .filter(new Function<Tuple2<Object, Object>, Boolean>() {
                    @Override
                    public Boolean call(Tuple2<Object, Object> v1) throws Exception {
                        return ((Integer) v1._1).intValue() == 0;
                    }
                }).map(new Function<Tuple2<Object, Object>, Integer>() {
                    @Override
                    public Integer call(Tuple2<Object, Object> v1) throws Exception {
                        return (Integer) v1._2;
                    }
                });

        // Items not yet rated by user 0: all distinct items minus the items user 0 rated
        JavaRDD<Integer> notRatedByUser = userProducts
                .map(new Function<Tuple2<Object, Object>, Integer>() {
                    @Override
                    public Integer call(Tuple2<Object, Object> v1) throws Exception {
                        return (Integer) v1._2;
                    }
                }).distinct().subtract(itemsRatedByUser);

        // Create (user, item) tuples for the items not rated by user 0
        JavaRDD<Tuple2<Object, Object>> itemsNotRatedByUser = notRatedByUser
                .map(new Function<Integer, Tuple2<Object, Object>>() {
                    public Tuple2<Object, Object> call(Integer r) {
                        return new Tuple2<Object, Object>(0, r);
                    }
                });

        // Predict ratings for the items user 0 has not rated
        JavaRDD<Rating> recommendations = model.predict(itemsNotRatedByUser.rdd()).toJavaRDD().distinct();

        // Sort the recommendations by predicted rating, descending
        recommendations = recommendations.sortBy(new Function<Rating, Double>() {
            @Override
            public Double call(Rating v1) throws Exception {
                return v1.rating();
            }
        }, false, 1);

        // Take the top 10 recommendations
        JavaRDD<Rating> topRecommendations = sc.parallelize(recommendations.take(10));

        // Join the top 10 recommendations with the item descriptions
        JavaRDD<Tuple2<Rating, String>> recommendedItems = topRecommendations.mapToPair(
                new PairFunction<Rating, Integer, Rating>() {
                    @Override
                    public Tuple2<Integer, Rating> call(Rating t) throws Exception {
                        return new Tuple2<Integer, Rating>(t.product(), t);
                    }
                }).join(itemDescription).values();

        // Print the top recommendations for user 0
        recommendedItems.foreach(new VoidFunction<Tuple2<Rating, String>>() {
            @Override
            public void call(Tuple2<Rating, String> t) throws Exception {
                System.out.println(t._1.product() + "\t" + t._1.rating() + "\t" + t._2);
            }
        });
    }
}