Recommendation Engine


The Internet has become a large e-marketplace. Millions of users browse millions of pages (or items) that are relevant to them. A user’s affinity to an item can be measured by many parameters, such as time spent on the item, likes, or visit frequency. A recommendation engine suggests items that a user may find interesting but is not yet aware of (has not yet seen or visited), based on the user’s previous affinity to other items and on other users’ affinity to the item in question. The ultimate goal of a recommendation engine is to present relevant content to the user.

Recommendation Engine

A recommendation engine predicts unknown user-item associations from known ones. A user-item association is a user’s affinity to an item, and many users will have affinities to many different items. The engine builds a prediction model from the known user-item associations (training data) and then uses it to predict the unknown associations (test data).


An example of a user-item association matrix: users have rated some of the items, and the recommendation engine predicts the unknown associations. For example, consider the predictions for user U5:


Based on these predictions, item I5 looks most interesting to user U5.
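Picking the most interesting item from a set of predictions amounts to taking the maximum. The following is a minimal Java sketch of that step. The item ids and scores are made-up values, chosen only so that I5 ranks highest; they are not the figures from the original matrix, and the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class BestPrediction {

	// Returns the item id with the highest predicted association, or null if the map is empty.
	static String bestItem(Map<String, Double> predictions) {
		String best = null;
		double bestScore = Double.NEGATIVE_INFINITY;
		for (Map.Entry<String, Double> e : predictions.entrySet()) {
			if (e.getValue() > bestScore) {
				bestScore = e.getValue();
				best = e.getKey();
			}
		}
		return best;
	}

	public static void main(String[] args) {
		// Hypothetical predicted associations for user U5 (illustrative values only)
		Map<String, Double> predictionsForU5 = new LinkedHashMap<String, Double>();
		predictionsForU5.put("I2", 0.31);
		predictionsForU5.put("I5", 0.87);
		predictionsForU5.put("I7", 0.52);
		System.out.println("Recommend " + bestItem(predictionsForU5) + " to U5");
	}
}
```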

Collaborative Filtering Technique

Recommender engines based on collaborative filtering typically use a matrix factorization model, commonly fitted with the Alternating Least Squares (ALS) method, to fill in the missing entries of the user-item association matrix. Item ratings are influenced by a set of factors, such as a user’s choices and interests, called latent factors.
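In a matrix factorization model, each user and each item is described by a short vector of latent factors, and a missing entry is estimated as the dot product of the two vectors. A minimal Java sketch of that prediction step follows. The factor values are invented for illustration and the class name is hypothetical; in practice the vectors come out of the training step.

```java
final class LatentFactorPrediction {

	// Predicted association = dot product of the user's and the item's factor vectors.
	static double predict(double[] userFactors, double[] itemFactors) {
		if (userFactors.length != itemFactors.length) {
			throw new IllegalArgumentException("factor vectors must have the same rank");
		}
		double sum = 0.0;
		for (int k = 0; k < userFactors.length; k++) {
			sum += userFactors[k] * itemFactors[k];
		}
		return sum;
	}

	public static void main(String[] args) {
		// Hypothetical rank-2 factors (illustrative values only)
		double[] user = {1.0, 0.5};
		double[] item = {3.0, 2.0};
		System.out.println(predict(user, item)); // 1.0*3.0 + 0.5*2.0 = 4.0
	}
}
```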

These factors vary from user to user, so it is difficult to estimate their impact on ratings directly. Collaborative filtering uses mathematical techniques to infer them from the known ratings. Latent factors are hidden characteristics of users or items that help explain an item’s rating. For example, for a movie rating, latent factors could relate to age, sex and movie genre. Using more latent factors lets the model capture finer detail, but it also increases the computation cost and the risk of overfitting, so more is not always better.
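To make the idea of inferring latent factors concrete, here is a deliberately simplified sketch of alternating least squares with a single latent factor per user and per item. It is not Spark's implementation: the rank is fixed at 1, the regularization is a plain lambda term, and the ratings are invented. The class name and the NaN convention for missing entries are assumptions of this sketch. The observed ratings are taken from the product of [1, 2, 3] and [2, 1, 4], whose two hidden cells are 4 and 6, so the fit should approximately recover those values.

```java
import java.util.Arrays;

final class RankOneAls {

	// Alternating least squares for a rank-1 model r[i][j] ~ u[i] * v[j].
	// Missing entries are marked with Double.NaN and skipped.
	// lambda must be positive so the denominators never reach zero.
	// Returns {u, v}.
	static double[][] fit(double[][] r, int iterations, double lambda) {
		int users = r.length;
		int items = r[0].length;
		double[] u = new double[users];
		double[] v = new double[items];
		Arrays.fill(v, 1.0); // simple deterministic starting point

		for (int it = 0; it < iterations; it++) {
			// Hold v fixed and solve the regularized least-squares problem for each u[i]
			for (int i = 0; i < users; i++) {
				double num = 0.0;
				double den = lambda;
				for (int j = 0; j < items; j++) {
					if (!Double.isNaN(r[i][j])) {
						num += r[i][j] * v[j];
						den += v[j] * v[j];
					}
				}
				u[i] = num / den;
			}
			// Hold u fixed and solve for each v[j]
			for (int j = 0; j < items; j++) {
				double num = 0.0;
				double den = lambda;
				for (int i = 0; i < users; i++) {
					if (!Double.isNaN(r[i][j])) {
						num += r[i][j] * u[i];
						den += u[i] * u[i];
					}
				}
				v[j] = num / den;
			}
		}
		return new double[][] {u, v};
	}

	public static void main(String[] args) {
		double nan = Double.NaN;
		// Hypothetical ratings with two missing cells
		double[][] ratings = {
			{2, 1, nan},
			{4, 2, 8},
			{nan, 3, 12}
		};
		double[][] uv = fit(ratings, 100, 1e-6);
		System.out.println("user 0, item 2: " + uv[0][0] * uv[1][2]);
		System.out.println("user 2, item 0: " + uv[0][2] * uv[1][0]);
	}
}
```

With more than one latent factor, each of the two inner steps becomes a small linear system per user or per item instead of a single division, but the alternation is the same.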

Collaborative Filtering with Apache Spark

Apache Spark provides two variations of collaborative filtering: explicit feedback and implicit feedback. With explicit feedback, the algorithm treats the entries of the user-item matrix as explicit preferences given by users. In real-world use cases, however, preferences are often available only as implicit feedback (views, clicks, likes, etc.). Spark MLlib handles implicit feedback differently from explicit feedback: rather than modelling the ratings matrix directly, it treats the values as indications of how strongly a preference was observed. Collaborative filtering in Apache Spark takes several parameters; their definitions and usage can be found in the Collaborative Filtering specifications.
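The implicit-feedback approach referenced by Spark's documentation (Hu, Koren and Volinsky, "Collaborative Filtering for Implicit Feedback Datasets") separates each observation into a binary preference and a confidence level that grows with the observed value. The sketch below shows only that transformation, in plain Java; it does not call Spark, and the class name, the view counts and the alpha value are illustrative assumptions.

```java
final class ImplicitFeedback {

	// Binary preference: did the user interact with the item at all?
	static int preference(double observed) {
		return observed > 0 ? 1 : 0;
	}

	// Confidence in that preference grows linearly with the observed value.
	static double confidence(double observed, double alpha) {
		return 1.0 + alpha * observed;
	}

	public static void main(String[] args) {
		double alpha = 1.0; // illustrative value
		double[] viewCounts = {0, 1, 12}; // hypothetical view counts
		for (double views : viewCounts) {
			System.out.println(views + " views -> preference " + preference(views)
					+ ", confidence " + confidence(views, alpha));
		}
	}
}
```

A single view and twelve views both yield a preference of 1, but the model is asked to fit the second one much more closely.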

A Movie Recommender example

In this example we will implement a movie recommender that recommends a list of 10 movies you have not rated yet.

Inputs: ratings.csv, movies.csv, and the number of iterations for the algorithm (10 in the sample command)

ratings.csv – a file containing ratings of movies given by different users

Format: user-id,movie-id,rating

Add your ratings to this file with user id 0.
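As a small illustration of the ratings format, the following Java sketch parses one line into its three fields. The class name and the sample line "0,1,4.5" are hypothetical; the line simply stands for "user 0 gave movie 1 a rating of 4.5".

```java
final class RatingLine {
	final int userId;
	final int movieId;
	final double rating;

	RatingLine(int userId, int movieId, double rating) {
		this.userId = userId;
		this.movieId = movieId;
		this.rating = rating;
	}

	// Parses one "user-id,movie-id,rating" line.
	// Throws IllegalArgumentException (NumberFormatException is a subclass) on malformed input.
	static RatingLine parse(String line) {
		String[] parts = line.trim().split(",");
		if (parts.length < 3) {
			throw new IllegalArgumentException("expected user-id,movie-id,rating but got: " + line);
		}
		return new RatingLine(
				Integer.parseInt(parts[0].trim()),
				Integer.parseInt(parts[1].trim()),
				Double.parseDouble(parts[2].trim()));
	}

	public static void main(String[] args) {
		RatingLine r = parse("0,1,4.5"); // hypothetical line for user id 0
		System.out.println("user=" + r.userId + " movie=" + r.movieId + " rating=" + r.rating);
	}
}
```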

Sample data – ratings.csv


movies.csv – a file containing movie descriptions

Format: movie-id,movie-name

Sample data – movies.csv

1,Toy Story (1995)
2,Jumanji (1995)
3,Grumpier Old Men (1995)
4,Waiting to Exhale (1995)
5,Father of the Bride Part II (1995)
6,Heat (1995)
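The movie file is only used to turn a movie id back into a name. A minimal Java sketch of that lookup, using the sample lines above, could look as follows. It assumes each line has exactly the two fields of the stated format; splitting with a limit of 2 is a choice of this sketch so that a comma inside a movie name is kept as part of the name. The class name is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class MovieCatalog {

	// Builds a movie-id -> movie-name map from "movie-id,movie-name" lines.
	// Lines without a comma are skipped.
	static Map<Integer, String> parse(String[] lines) {
		Map<Integer, String> names = new LinkedHashMap<Integer, String>();
		for (String line : lines) {
			String[] parts = line.split(",", 2); // at most two pieces: id, rest of line
			if (parts.length < 2) {
				continue;
			}
			names.put(Integer.parseInt(parts[0].trim()), parts[1].trim());
		}
		return names;
	}

	public static void main(String[] args) {
		String[] sample = {
			"1,Toy Story (1995)",
			"2,Jumanji (1995)",
			"6,Heat (1995)"
		};
		Map<Integer, String> names = parse(sample);
		System.out.println(names.get(6));
	}
}
```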

A sample dataset can be downloaded from the MovieLens dataset page. Edit it to match the formats above and insert your own ratings under user id 0.

Output should be the top N movie recommendations for user id 0, sorted by predicted rating in descending order.

A sample output, showing the top 5 recommendations for the user:

MovieId 	Movie Name 			Predicted Rating
200		Avengers			 0.70
267		Frozen				 0.60
78		Dark Knight		 	 0.55
89		Minions 			 0.53
5000		Man of Steel 			 0.50
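Producing such a list from a set of predictions is a sort in descending order followed by a cut at N. The plain-Java sketch below shows that step on the movie ids and predicted ratings from the sample output, fed in an arbitrary order. The class and method names are hypothetical and no Spark API is involved.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

final class TopN {

	static final class Prediction {
		final int movieId;
		final double rating;

		Prediction(int movieId, double rating) {
			this.movieId = movieId;
			this.rating = rating;
		}
	}

	// Returns the n predictions with the highest rating, best first.
	static List<Prediction> topN(List<Prediction> predictions, int n) {
		List<Prediction> sorted = new ArrayList<Prediction>(predictions);
		Collections.sort(sorted, new Comparator<Prediction>() {
			public int compare(Prediction a, Prediction b) {
				return Double.compare(b.rating, a.rating); // descending
			}
		});
		return new ArrayList<Prediction>(sorted.subList(0, Math.min(n, sorted.size())));
	}

	public static void main(String[] args) {
		List<Prediction> predictions = new ArrayList<Prediction>();
		predictions.add(new Prediction(89, 0.53));
		predictions.add(new Prediction(200, 0.70));
		predictions.add(new Prediction(5000, 0.50));
		predictions.add(new Prediction(267, 0.60));
		predictions.add(new Prediction(78, 0.55));
		for (Prediction p : topN(predictions, 3)) {
			System.out.println(p.movieId + "\t" + p.rating);
		}
	}
}
```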

Steps to run with Apache Spark

To run the above example with Apache Spark, you need at least a default installation of Spark.

  1. Create a jar from the Java source code provided below.
  2. From the SPARK_HOME directory, run the following command.
bin/spark-submit --jars mllib/spark-mllib_2.10-1.0.0.jar --class "mllib.example.RecommendationEngine" --master local[4] spark-mllib-example/example.jar spark-mllib-example/ml-latest-small/ratings.csv spark-mllib-example/ml-latest-small/movies.csv 10


Make sure that all required jars exist in the paths specified in the command.


Java Source Code

package mllib.example;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;

import scala.Tuple2;

public class RecommendationEngine {

	public static void main(String[] args) {
		// Turn off unnecessary logging
		Logger.getLogger("org").setLevel(Level.OFF);
		Logger.getLogger("akka").setLevel(Level.OFF);

		// Create Java spark context
		SparkConf conf = new SparkConf().setAppName("Collaborative Filtering Example");
		JavaSparkContext sc = new JavaSparkContext(conf);

		// Read user-item rating file. format - userId,itemId,rating
		JavaRDD<String> userItemRatingsFile = sc.textFile(args[0]);
		// Read item description file. format - itemId, itemName, Other Fields,..
		JavaRDD<String> itemDescriptionFile = sc.textFile(args[1]);

		// Map file to Ratings(user,item,rating) tuples
		JavaRDD<Rating> ratings = userItemRatingsFile.map(new Function<String, Rating>() {
			public Rating call(String s) {
				String[] sarray = s.split(",");
				return new Rating(Integer.parseInt(sarray[0]), Integer
						.parseInt(sarray[1]), Double.parseDouble(sarray[2]));
			}
		});

		// Create tuples(itemId,ItemDescription), will be used later to get names of item from itemId
		JavaPairRDD<Integer, String> itemDescription = itemDescriptionFile.mapToPair(
				new PairFunction<String, Integer, String>() {
			public Tuple2<Integer, String> call(String t) throws Exception {
				String[] s = t.split(",");
				return new Tuple2<Integer, String>(Integer.parseInt(s[0]), s[1]);
			}
		});

		// Build the recommendation model using ALS.
		// trainImplicit treats the ratings as implicit-feedback strengths;
		// ALS.train is the explicit-feedback counterpart.
		int rank = 10; // 10 latent factors
		int numIterations = Integer.parseInt(args[2]); // number of iterations
		MatrixFactorizationModel model = ALS.trainImplicit(JavaRDD.toRDD(ratings),
				rank, numIterations);

		// Item ids already rated by the target user (user id 0)
		JavaRDD<Integer> ratedByUser = ratings.filter(new Function<Rating, Boolean>() {
			public Boolean call(Rating r) {
				return r.user() == 0;
			}
		}).map(new Function<Rating, Integer>() {
			public Integer call(Rating r) {
				return r.product();
			}
		});

		// All item ids that appear in the ratings, minus those already rated by user 0
		JavaRDD<Integer> notRatedByUser = ratings.map(new Function<Rating, Integer>() {
			public Integer call(Rating r) {
				return r.product();
			}
		}).distinct().subtract(ratedByUser);

		// Create user-item tuples for the items that are not rated by the user with user id 0
		JavaRDD<Tuple2<Object, Object>> itemsNotRatedByUser = notRatedByUser
				.map(new Function<Integer, Tuple2<Object, Object>>() {
					public Tuple2<Object, Object> call(Integer r) {
						return new Tuple2<Object, Object>(0, r);
					}
				});

		// Predict the ratings of the items not rated by user for the user
		JavaRDD<Rating> recommendations = model.predict(itemsNotRatedByUser.rdd()).toJavaRDD();

		// Sort the recommendations by rating in descending order
		recommendations = recommendations.sortBy(new Function<Rating, Double>() {
			public Double call(Rating v1) throws Exception {
				return v1.rating();
			}
		}, false, 1);

		// Get top 10 recommendations
		JavaRDD<Rating> topRecommendations = sc.parallelize(recommendations.take(10));

		// Join top 10 recommendations with item descriptions
		JavaRDD<Tuple2<Rating, String>> recommendedItems = topRecommendations.mapToPair(
				new PairFunction<Rating, Integer, Rating>() {
			public Tuple2<Integer, Rating> call(Rating t) throws Exception {
				return new Tuple2<Integer, Rating>(t.product(), t);
			}
		}).join(itemDescription).values();

		// The join does not preserve the sort order, so collect the (at most 10) rows
		// and sort them again on the driver
		List<Tuple2<Rating, String>> top =
				new ArrayList<Tuple2<Rating, String>>(recommendedItems.collect());
		Collections.sort(top, new Comparator<Tuple2<Rating, String>>() {
			public int compare(Tuple2<Rating, String> a, Tuple2<Rating, String> b) {
				return Double.compare(b._1().rating(), a._1().rating());
			}
		});

		// Print the top recommendations for user 0: item id, predicted rating, item name
		for (Tuple2<Rating, String> t : top) {
			System.out.println(t._1().product() + "\t" + t._1().rating() + "\t" + t._2());
		}

		sc.stop();
	}
}
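The step that determines which items a user has not rated yet is a set difference: every item that appears in the ratings, minus the items that user has rated. Filtering out only the rows that belong to the user is not enough, because an item the user rated is usually rated by other users too. The plain-Java sketch below illustrates the set difference on a handful of made-up (user, item) pairs; the class name is hypothetical and no Spark API is involved.

```java
import java.util.Set;
import java.util.TreeSet;

final class UnratedItems {

	// Each pair is {userId, itemId}. Returns the item ids that appear in the data
	// but were never rated by the given user, in ascending order.
	static Set<Integer> unratedBy(int user, int[][] userItemPairs) {
		Set<Integer> all = new TreeSet<Integer>();
		Set<Integer> rated = new TreeSet<Integer>();
		for (int[] pair : userItemPairs) {
			all.add(pair[1]);
			if (pair[0] == user) {
				rated.add(pair[1]);
			}
		}
		all.removeAll(rated);
		return all;
	}

	public static void main(String[] args) {
		// Hypothetical (user, item) pairs; item 1 is rated by user 0 and by user 1
		int[][] pairs = {{0, 1}, {1, 1}, {1, 2}, {2, 3}, {0, 3}, {2, 4}};
		System.out.println(unratedBy(0, pairs));
	}
}
```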

