Sarvesh Kumar

How To Build A Recommendation Engine Using Apache Spark

Motivation

The Internet has become a large e-marketplace. Millions of users browse millions of pages (or items) that are relevant to them. A user's affinity to an item can be measured through many signals, such as time spent on the item, likes, and visit frequency. A recommendation engine can suggest items that a user may find interesting but has not yet seen or visited, based on that user's previous affinity to other items and on other users' affinity to this item. The final goal of a recommendation engine is to present the user with relevant content.

Recommendation Engine

A recommendation engine makes predictions for unknown user-item associations based on known ones. A user-item association is a user's affinity to an item, and different users have affinities to many different items. The engine builds a prediction model from the known user-item associations (the training data) and then predicts the unknown associations (the test data).

[Figure: an example user-item association matrix with known ratings and missing, to-be-predicted entries]

The figure above shows an example of a user-item association matrix: users have rated some of the items, and the recommendation engine predicts the unknown associations. For example, suppose the predictions for user U5 are:

[Figure: predicted ratings for user U5 on the items U5 has not rated, with item I5 scoring highest]

Based on these predictions, item I5 looks the most interesting to user U5.
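
Each known entry of such a matrix is a (user, item, rating) triple. As a minimal sketch (the user, item, and rating values here are purely illustrative, using the Spark MLlib Rating class that the full example below is built on):

import org.apache.spark.mllib.recommendation.Rating;

// One known user-item association: user U1 rated item I2 with 4.5
Rating known = new Rating(1, 2, 4.5);

// An unknown association is simply an absent triple; the engine's job
// is to predict its rating value.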

Collaborative Filtering Technique

Collaborative-filtering-based recommendation engines typically use a matrix factorization model, trained with the Alternating Least Squares (ALS) method, to fill in the missing entries of the user-item association matrix. Item ratings are influenced by a set of hidden factors, such as a user's tastes and interests, called latent factors.

These factors vary from user to user, so it is difficult to estimate their impact on ratings directly. Collaborative filtering uses mathematical techniques to infer these latent factors. Latent factors are hidden attributes of users or items that together explain an item's rating; for a movie rating, for example, the latent factors could correspond to age, sex, and movie genre. In general, the more latent factors, the better the predictions, up to the point where the model starts to overfit.
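
Concretely, matrix factorization approximates the rating matrix as the product of a low-rank user-factor matrix and an item-factor matrix. As a sketch of the standard formulation (the notation below is mine, not from this post's code), ALS minimizes the regularized squared error over the observed ratings:

\min_{U,V} \sum_{(u,i)\ \mathrm{observed}} \left( r_{ui} - \mathbf{u}_u^{\top} \mathbf{v}_i \right)^2 + \lambda \left( \sum_u \lVert \mathbf{u}_u \rVert^2 + \sum_i \lVert \mathbf{v}_i \rVert^2 \right)

Here r_{ui} is the known rating of user u for item i, \mathbf{u}_u and \mathbf{v}_i are the rank-k latent factor vectors (k is the rank parameter in the code below), and \lambda controls regularization. ALS alternates between fixing the item factors and solving a least-squares problem for the user factors, then fixing the user factors and solving for the item factors, which is what gives the method its name.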

Collaborative Filtering with Apache Spark

Apache Spark provides two variants of collaborative filtering: explicit feedback and implicit feedback. With explicit feedback, the collaborative filtering algorithm treats the user-item matrix as explicit preferences given by the users, such as star ratings. In the real world, preferences are often available only as implicit feedback (views, clicks, likes, etc.), and Spark MLlib processes implicit feedback data differently from the standard explicit-feedback approach. Collaborative filtering in Apache Spark takes various parameters; their definitions and usage can be found in the MLlib Collaborative Filtering documentation.
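
As a minimal sketch of the two entry points (a fragment rather than a complete program, assuming a JavaRDD<Rating> named ratings like the one built in the full example below, and the Spark 1.x MLlib API used throughout this post):

int rank = 10;          // number of latent factors
int iterations = 10;    // ALS iterations
double lambda = 0.01;   // regularization parameter

// Explicit feedback: each rating is taken as a known preference value
MatrixFactorizationModel explicitModel =
        ALS.train(JavaRDD.toRDD(ratings), rank, iterations, lambda);

// Implicit feedback: each rating is treated as a confidence weight derived
// from observed behaviour (views, clicks, ...); alpha scales that confidence
double alpha = 1.0;
MatrixFactorizationModel implicitModel =
        ALS.trainImplicit(JavaRDD.toRDD(ratings), rank, iterations, lambda, alpha);

The example below uses trainImplicit, which is why its predicted scores come out as preference strengths rather than star ratings.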

A Movie Recommender example

In this example we will implement a movie recommender that recommends a list of 10 movies you have not rated yet.

Inputs: ratings.csv, movies.csv, and the number of iterations for the algorithm (10 in the sample code).

ratings.csv – a file containing ratings of movies given by different users

Format: user-id,movie-id,rating

Add your ratings to this file with user id 0.

Sample data – ratings.csv

0,1,5.0
0,3,3.0
0,5,4.5
0,6,5.0
0,10,4.0
1,6,2.0
1,22,3.0
2,32,2.0
2,50,5.0
3,110,4.0
4,164,3.0
4,198,3.0

movies.csv – a file containing movie descriptions

Format: movie-id,movie-name

Sample data – movies.csv

1,Toy Story (1995)
2,Jumanji (1995)
3,Grumpier Old Men (1995)
4,Waiting to Exhale (1995)
5,Father of the Bride Part II (1995)
6,Heat (1995)

A sample dataset can be downloaded from MovieLensDataset. Edit the dataset according to the format above and insert your own ratings as user-id 0.

The output is the top N movie recommendations for user-id 0, sorted by predicted rating in descending order.

A sample output, showing the top 5 recommendations for the user:

MovieId    Movie Name       Predicted Rating
200        Avengers         0.70
267        Frozen           0.60
78         Dark Knight      0.55
89         Minions          0.53
5000       Man of Steel     0.50

Steps to run with Apache Spark

To run the above example you need at least a default installation of Apache Spark.

  1. Create a jar from the Java code file provided.
  2. From the SPARK_HOME directory, run the following command, making sure the required jars exist in the directories it references:

bin/spark-submit --jars mllib/spark-mllib_2.10-1.0.0.jar --class "mllib.example.RecommendationEngine" --master local[4] spark-mllib-example/example.jar spark-mllib-example/ml-latest-small/ratings.csv spark-mllib-example/ml-latest-small/movies.csv 10

Java Source Code

package mllib.example;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;

import scala.Tuple2;


public class RecommendationEngine {

	public static void main(String[] args) {
		
		// Turn off unnecessary logging
		Logger.getLogger("org").setLevel(Level.OFF);
		Logger.getLogger("akka").setLevel(Level.OFF);
		
		// Create Java spark context
		SparkConf conf = new SparkConf().setAppName("Collaborative Filtering Example");
		JavaSparkContext sc = new JavaSparkContext(conf);

		// Read user-item rating file. format - userId,itemId,rating
		JavaRDD<String> userItemRatingsFile = sc.textFile(args[0]);
		
		// Read item description file. format - itemId, itemName, Other Fields,..
		JavaRDD<String> itemDescriptionFile = sc.textFile(args[1]);
		
		// Map file to Ratings(user,item,rating) tuples
		JavaRDD<Rating> ratings = userItemRatingsFile.map(new Function<String, Rating>() {
			public Rating call(String s) {
				String[] sarray = s.split(",");
				return new Rating(Integer.parseInt(sarray[0]), Integer
						.parseInt(sarray[1]), Double.parseDouble(sarray[2]));
			}
		});
		
		// Create tuples(itemId,ItemDescription), will be used later to get names of item from itemId
		JavaPairRDD<Integer,String> itemDescription = itemDescriptionFile.mapToPair(
				new PairFunction<String, Integer, String>() {
			@Override
			public Tuple2<Integer, String> call(String t) throws Exception {
				String[] s = t.split(",");
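				// Note: a plain comma split assumes the movie name itself
				// contains no commas (some MovieLens titles are quoted and do)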
				return new Tuple2<Integer,String>(Integer.parseInt(s[0]), s[1]);
			}
		});

		// Build the recommendation model using ALS
		
		int rank = 10; // 10 latent factors
		int numIterations = Integer.parseInt(args[2]); // number of iterations
		
		// Note: trainImplicit treats these star ratings as implicit-feedback
		// confidence values, so the predicted scores are preference strengths
		// (typically below 1.0) rather than star ratings
		MatrixFactorizationModel model = ALS.trainImplicit(JavaRDD.toRDD(ratings),
				rank, numIterations);

		// Create user-item tuples from ratings
		JavaRDD<Tuple2<Object, Object>> userProducts = ratings
				.map(new Function<Rating, Tuple2<Object, Object>>() {
					public Tuple2<Object, Object> call(Rating r) {
						return new Tuple2<Object, Object>(r.user(), r.product());
					}
				});
		
		// Item ids already rated by the target user (userId = 0)
		JavaRDD<Integer> ratedByUser = ratings.filter(new Function<Rating, Boolean>() {
			public Boolean call(Rating r) throws Exception {
				return r.user() == 0;
			}
		}).map(new Function<Rating, Integer>() {
			public Integer call(Rating r) throws Exception {
				return r.product();
			}
		});

		// Item ids rated by other users but not yet rated by user 0
		JavaRDD<Integer> notRatedByUser = userProducts.filter(new Function<Tuple2<Object,Object>, Boolean>() {
			@Override
			public Boolean call(Tuple2<Object, Object> v1) throws Exception {
				return ((Integer) v1._1).intValue() != 0;
			}
		}).map(new Function<Tuple2<Object,Object>, Integer>() {
			@Override
			public Integer call(Tuple2<Object, Object> v1) throws Exception {
				return (Integer) v1._2;
			}
		}).distinct().subtract(ratedByUser);

		// Create (user, item) tuples for the items not rated by user 0
		JavaRDD<Tuple2<Object, Object>> itemsNotRatedByUser = notRatedByUser
				.map(new Function<Integer, Tuple2<Object, Object>>() {
					public Tuple2<Object, Object> call(Integer r) {
						return new Tuple2<Object, Object>(0, r);
					}
		});
		
		// Predict ratings for the items user 0 has not rated yet
		JavaRDD<Rating> recommendations = model.predict(itemsNotRatedByUser.rdd()).toJavaRDD().distinct();
				
		// Sort the recommendations by predicted rating in descending order
		// (sortBy arguments: key function, ascending = false, 1 partition)
		recommendations = recommendations.sortBy(new Function<Rating,Double>(){
			@Override
			public Double call(Rating v1) throws Exception {
				return v1.rating();
			}
		}, false, 1);
		
		// Get top 10 recommendations
		JavaRDD<Rating> topRecommendations = sc.parallelize(recommendations.take(10));
		
		// Join top 10 recommendations with item descriptions
		JavaRDD<Tuple2<Rating, String>> recommendedItems = topRecommendations.mapToPair(
				new PairFunction<Rating, Integer, Rating>() {
			@Override
			public Tuple2<Integer, Rating> call(Rating t) throws Exception {
				return new Tuple2<Integer,Rating>(t.product(),t);
			}
		}).join(itemDescription).values();
		
		
		// Print the top recommendations for user 0
		recommendedItems.foreach(new VoidFunction<Tuple2<Rating,String>>() {
			@Override
			public void call(Tuple2<Rating, String> t) throws Exception {
				System.out.println(t._1.product() + "\t" + t._1.rating() + "\t" + t._2);
			}
		});
		
	}

}
 

Comments

  • Dipesh Malani

    Hi, I am totally new to Spark and Java and need your help with the exception below.
    I copied the code above into my Java Spark program, and it throws the following:

    Using Spark’s default log4j profile: org/apache/spark/log4j-defaults.properties
    17/01/13 16:46:27 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
    17/01/13 16:46:27 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
    17/01/13 16:46:27 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
    17/01/13 16:46:27 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK

    Thanks
    Dipesh

    • Sarvesh Kumar

      Hi Dipesh, it is a warning, not an exception. You can continue despite it.

  • ThanhRobb

    I have run the code, and this segment: "JavaRDD recommendations = model.predict(itemsNotRatedByUser.rdd()).toJavaRDD().distinct();" returns a count of 0. Please help me!!

    • Sarvesh Kumar

      Can you explain what problem you are facing? It's not clear.

  • salsabeel

    I got this error when trying to run the code:
    "at Recommendation.main(Recommendation.java:112)
    Caused by: java.lang.StackOverflowError"
    Is there a solution to this problem?

    • Sarvesh Kumar

      It is not clear what exception you are getting. Can you provide some more stack trace?

  • santhosh kumar

    Hello,

    Please provide the Scala code.

  • Gaurav

    Hi,
    I am getting the following exception:

    D:\softwares\spark-2.2.0-bin-hadoop2.7\spark-2.2.0-bin-hadoop2.7>bin\spark-submit --jars jars\spark-mllib_2.10-1.5.2.jar --class RecommendationEngine --master local[4] sparkdemo\sparktest\target\spark-test-1.0-SNAPSHOT.jar ml-latest-small\ml-latest-small\ratings.csv ml-latest-small\ml-latest-small\movies.csv 10
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/Logging
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at RecommendationEngine.main(RecommendationEngine.java:62)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    … 23 more

    I created a Maven project with the following dependencies:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.5.2</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.10</artifactId>
      <version>1.5.2</version>
    </dependency>

    Can you help me?

  • Gaurav Abburi

    Hi,
    I am getting the following error.
    D:\softwares\spark-2.2.0-bin-hadoop2.7\spark-2.2.0-bin-hadoop2.7>bin\spark-submit --class RecommendationEngine --master local[4] sparkdemo\sparktest\target\spark-test-1.0-SNAPSHOT-jar-with-dependencies.jar ml-latest-small\ml-latest-small\ratings.csv ml-latest-small\ml-latest-small\movies.csv 10
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    17/11/18 13:04:54 INFO Remoting: Starting remoting
    17/11/18 13:04:54 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.0.102:59093]
    Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.network.util.TransportConf.<init>(Lorg/apache/spark/network/util/ConfigProvider;)V
    at org.apache.spark.network.netty.SparkTransportConf$.fromSparkConf(SparkTransportConf.scala:59)
    at org.apache.spark.shuffle.IndexShuffleBlockResolver.<init>(IndexShuffleBlockResolver.scala:47)
    at org.apache.spark.shuffle.sort.SortShuffleManager.<init>(SortShuffleManager.scala:28)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.spark.SparkEnv$.instantiateClass$1(SparkEnv.scala:276)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:327)
    at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:194)
    at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:277)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:450)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
    at RecommendationEngine.main(RecommendationEngine.java:30)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)

    Can someone help me out here?