Combining Operational and Analytical Big Data Using Couchbase and Spark: A Market Basket Analysis Example

Couchbase is emerging as a platform of choice in the enterprise NoSQL market. Couchbase is engineered to handle the operational aspects of big data. However, the platform is continually being enhanced to integrate with related technologies that address the analytical aspects of big data, and that integration offers organizations disruptive new capabilities. This post will examine one such integration: Couchbase with Spark.

Couchbase has made integrating with their platform extremely easy, offering SDKs for many languages and connectors to a variety of external systems. Among them is the Couchbase Spark Connector, an integration tool that makes reading and writing Couchbase data from Spark simple.

Spark is a cluster computing platform that provides a fast and efficient way to crunch data. It is a general-purpose computational tool that sports a wide array of features to make data processing easier, including Spark SQL, MLlib, and Spark Streaming.

By integrating Spark with Couchbase we can leverage the power and analytical capabilities of Spark to gain insights into our data, while leaving Couchbase resources free to keep our applications and data stores running optimally.

With Couchbase and Spark integrated via the Couchbase Spark Connector, we can use Spark to tackle data analytics scenarios like Market Basket Analysis (MBA). MBA analyzes customer purchases to find insights into which products are commonly bought together.

The Scenario

To demonstrate Couchbase and Spark integration let’s lay out an online marketplace scenario driven by a Couchbase backend. Couchbase can drive all the parts of our marketplace including a user store, product management console, inventory tracker, and purchase and transaction records. For this example we will focus on customer purchases.

We’ll start with the orders our customers have placed. An order can consist of many things, like payment details, billing address, shipping address, and shipping options, but we’ll keep it simple and only include the list of items the customer purchased.

Our online marketplace works great, but we would like to add some recommendation functionality based on order history from our entire customer base. We’ve chosen to use Spark to do some MBA to produce recommendations.

After we have done our MBA, we will want to store the information we gathered back into Couchbase in a manner that lets us take advantage of it efficiently later. With MBA data alongside our operational data in Couchbase, we can implement product recommendations in our online marketplace.
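For instance, once the recommendation documents exist (their exact shape is defined later in this post), the marketplace application can fetch them at page-render time with an ordinary key-value lookup. Here is a minimal sketch, assuming the 2.x Couchbase Java SDK and the product_recommendation:: key convention used later in this post:

import com.couchbase.client.java.CouchbaseCluster

// Open the bucket that holds both our operational data and the recommendations
val cluster = CouchbaseCluster.create("localhost")
val bucket = cluster.openBucket("retail")

// Fetch the precomputed recommendations for the product being viewed
val doc = bucket.get("product_recommendation::35")
if (doc != null) {
  // Each entry holds a recommended product ID and an affinity score
  println(doc.content().getArray("recommendations"))
}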

Setting up the Environment

For this tutorial we will need a running Couchbase cluster as well as a Spark installation. We recommend spinning up a virtual machine in your favorite VM manager, but both Couchbase and Spark can also run on your desktop. Installation instructions for both can be found on the Couchbase and Spark download pages.

We will need a Couchbase bucket to store our data and recommendations in. A simple Couchbase bucket should suffice; we named ours ‘retail’, and you will see it referred to that way from here on out.

When you have Couchbase and Spark up and running, it’s time to get some data! You can find a simple market basket data set example here: http://fimi.ua.ac.be/data/.

Note: Please visit this page for full details on the data set and its permissible usages.
The dataset originated with this paper:

Brijs T., Swinnen G., Vanhoof K., and Wets G. (1999), The use of association rules for product assortment decisions: a case study, in: Proceedings of the Fifth International Conference on Knowledge Discovery and Data Mining, San Diego (USA), August 15-18, pp. 254-260. ISBN: 1-58113-143-7.

The retail data set is 23,668 rows, where each row represents a customer’s basket at a marketplace. Each row is a space-delimited list of ID numbers, and each unique ID number corresponds to a unique product. You can download a compressed file or just copy and paste the data directly into a file; either way, save the retail data into a file called ‘retail.txt’.
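Each line of the file looks something like this (illustrative values, not actual rows from the data set), with each number being the ID of one product in a customer’s basket:

38 39 47 48
39 41 48
32 39 48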

We need to do a small amount of data munging to get this example data set into order documents. Feel free to take a crack at putting this data into Couchbase yourself, but if you need a little help, here is a simple Python script that does the job pretty quickly. (You will need the Couchbase Python SDK installed for this script to work; instructions on how to get the Couchbase Python SDK are here: http://docs.couchbase.com/developer/python-2.0/download-links.html )

Simply copy this Python script into the directory where your retail data file is located and run it. In just a few seconds you will have 23,668 order documents in your Couchbase cluster.

from couchbase.bucket import Bucket

# Connect directly to the 'retail' bucket on the local cluster
bucket = Bucket("couchbase://localhost/retail")

inserts = dict()
with open('retail.txt', 'r') as f:
    # Each line of retail.txt is one customer's basket
    for count, line in enumerate(f, start=1):
        # Parse the space-delimited product IDs into a list of ints
        items = [int(x) for x in line.strip().split(" ")]
        inserts["order::%d" % count] = {'type': 'order', 'items': items}

# Batch-insert all order documents in one call
bucket.insert_multi(inserts)

The schema for the order documents looks like this:

order::12345
{
   type: 'order',
   items: [
       235,
       6345,
       35
   ]
}

The document key is simply a concatenation of 'order::' and the line number from retail.txt.

The last thing we will need to do to prepare our data is to create a View in Couchbase so we can retrieve all of our order documents. We created a new design document called ‘retail’ and a new View called ‘allOrders’. The simple View looks like this:

function (doc, meta) {
   if (doc.type == 'order')
       emit(meta.id, null);
}
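If you prefer to create the design document programmatically rather than through the Couchbase UI, here is a sketch of that approach, assuming the view management API of the 2.x Couchbase Java SDK:

import com.couchbase.client.java.CouchbaseCluster
import com.couchbase.client.java.view.{DefaultView, DesignDocument}
import scala.collection.JavaConversions._

val bucket = CouchbaseCluster.create("localhost").openBucket("retail")

// The same map function as above, registered under retail/allOrders
val mapFunction =
  """function (doc, meta) {
    |   if (doc.type == 'order')
    |       emit(meta.id, null);
    |}""".stripMargin

val designDoc = DesignDocument.create("retail", List(DefaultView.create("allOrders", mapFunction)))
bucket.bucketManager().insertDesignDocument(designDoc)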

Market Basket Analysis

Once our environment is all set up, and we have some data, we can get on with the fun stuff! You can find the Couchbase Spark Connector in Couchbase Labs along with instructions on how to get it: https://github.com/couchbaselabs/couchbase-spark-connector.

The first thing we need to do is set up the configuration so that Spark can talk to Couchbase.

import org.apache.spark.{SparkConf, SparkContext}
import com.couchbase.spark._
import scala.collection.JavaConverters._
import scala.collection.JavaConversions._

// Run Spark locally, pointed at a local Couchbase cluster and our 'retail' bucket
val sparkConf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("MBA")
  .set("com.couchbase.nodes", "localhost")
  .set("com.couchbase.bucket.retail", "")

val sc = new SparkContext(sparkConf)

The configuration here is fairly simple: we run our Spark job locally, connecting to a local Couchbase cluster and using our retail bucket. It’s also important to note some of the key imports:

  • com.couchbase.spark._ provides all of the Couchbase features
  • scala.collection.JavaConverters._ and scala.collection.JavaConversions._ are especially important when converting between Scala and Java object types (a short illustration follows this list).
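As a minimal illustration of why those conversions matter (arbitrary values, not tied to the retail data): the Couchbase SDK hands back Java collections such as java.util.List, and asScala wraps them so that Scala operations like map and for-comprehensions work:

import scala.collection.JavaConverters._

val javaList = new java.util.ArrayList[Integer]()
javaList.add(35)
javaList.add(39)

// asScala exposes the Java list as a Scala collection
val doubled = javaList.asScala.map(i => i * 2) // Buffer(70, 78)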

The next step is to get all of our order documents out of Couchbase for processing:

import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.view.ViewQuery

// Query the 'allOrders' view for all order document IDs, then fetch the documents
val order_docs = sc.couchbaseView(ViewQuery.from("retail", "allOrders"))
  .map(row => row.id)
  .couchbaseGet[JsonDocument]()

We are using our allOrders View here to grab all of our order document keys, then retrieving all of the matching documents.

Now that we have a list of documents, the first thing we want to compute is the total number of times each product has been purchased. This information will be useful later on when we compute the affinity values for each recommendation.

val purchase_counts = order_docs
  .flatMap(doc => for {
    // Emit an (id, 1) pair for every item in every order
    item <- doc.content().getArray("items").asScala
  } yield (item.asInstanceOf[Integer], 1))
  .reduceByKey((a, b) => a + b) // sum the 1s to get a total per product

Here we are using some bread-and-butter Spark functions (flatMap and reduceByKey) to count the number of times each product has been purchased: we first explode all the order lists into a single list, then reduce that list down to produce counts per product ID.
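If this flatMap/reduceByKey pattern is unfamiliar, here is a minimal, self-contained sketch of the same idea applied to three hypothetical baskets (made-up IDs, independent of the retail data set):

// Three hypothetical baskets of product IDs
val baskets = sc.parallelize(Seq(Seq(1, 2), Seq(1, 3), Seq(1, 2, 3)))

val counts = baskets
  .flatMap(items => items.map(item => (item, 1))) // one (id, 1) pair per purchase
  .reduceByKey((a, b) => a + b)                   // sum the 1s per product ID

// counts.collect() => Array((1,3), (2,2), (3,2)), in some order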

NOTE: We have to do some type conversion work here. While Scala runs on the JVM (and thus working with Scala and Java together is doable), when it comes to passing objects back and forth you have to do a little work.

We want to use the order documents again to start to compute our recommendations. Start by creating a list of all the pairs of products that were purchased together.

val product_pairs = order_docs
  .flatMap(doc => for {
    // Pair every item with every other item in the same order; the
    // item1 < item2 guard keeps each unordered pair exactly once
    item1 <- doc.content().getArray("items").asScala
    item2 <- doc.content().getArray("items").asScala
    if item1.asInstanceOf[Integer] < item2.asInstanceOf[Integer]
  } yield ((item1.asInstanceOf[Integer], item2.asInstanceOf[Integer]), 1))
  .reduceByKey((a, b) => a + b) // count the occurrences of each pair

We are using the same flatMap and reduceByKey pattern here, only this time we explode each order list twice to create a comprehensive list of pairs of products purchased together; for example, a basket of [1, 2, 3] yields the pairs (1,2), (1,3), and (2,3). Then our reduce step counts all of the times each specific pair was purchased together.

At the moment what we have is a list of all product pairs that were purchased together, and the number of times those pairs occurred. We can refine this further to start aggregating recommendation information.

val recommendations = product_pairs
  // Emit each pair relationship in both directions: (a, (b, count)) and (b, (a, count))
  .flatMap(tuple => List((tuple._1._1, (tuple._1._2, tuple._2)), (tuple._1._2, (tuple._1._1, tuple._2))))
  .groupByKey()
  // For each product, keep only its top 3 most frequently co-purchased products
  .map(product => (product._1, product._2.toList.sortBy(pair_count => -pair_count._2).take(3)))
  // Attach each product's total purchase count for the affinity calculation
  .join(purchase_counts)

Currently our data only shows each product pair relationship in one direction. We want to create the relationship in both directions, then group all of the pair relationships together for each individual product. We also want to reduce our data set down to the most recommended items; we do this by sorting each product’s recommendation list and taking the top 3. Finally, we join this set with purchase_counts so that, for each product, we have both its total number of purchases and its list of recommendations. One pair is traced through this pipeline below.
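To make the shape of the data concrete, here is one hypothetical pair traced through the pipeline; the numbers are made up, but chosen to be consistent with the sample output shown later in this post:

// ((35, 39), 12)                                        pair bought together 12 times
// flatMap    -> (35, (39, 12)) and (39, (35, 12))       relationship in both directions
// groupByKey -> (35, [(39, 12), (48, 10), (32, 5), ...])
// map        -> (35, List((39, 12), (48, 10), (32, 5))) sorted desc, top 3 kept
// join       -> (35, (List((39, 12), (48, 10), (32, 5)), 26))   with 26 total purchases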

The last thing we need to do in our Spark script is to save our recommendations back to Couchbase.

import com.couchbase.client.java.document.json.{JsonArray, JsonObject}

val documents = recommendations
  .map(recommendation => {
    val total_purchases = recommendation._2._2.toDouble
    // Build one JsonObject per recommended product, with its affinity score
    val json_items = for {
      item <- recommendation._2._1
    } yield JsonObject.empty().put("product", item._1).put("affinity", item._2 / total_purchases)

    // Key each document by product ID, mirroring the order:: key convention
    ("product_recommendation::" + recommendation._1,
      JsonObject.empty()
        .put("type", "product_recommendation")
        .put("recommendations", JsonArray.from(json_items: java.util.List[JsonObject])))
  })

documents.toCouchbaseDocument[JsonDocument].saveToCouchbase("retail")

For each of our product recommendations we are building a JsonObject that houses the list of recommended products along with an affinity value: the fraction of the original product’s purchases in which the recommended product was also purchased. We finish by using the Couchbase Spark Connector methods to create Couchbase documents and save them into our retail bucket.
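As a quick sanity check of the arithmetic (again using numbers consistent with the sample document in the next section): if product 35 was purchased 26 times in total, and 12 of those baskets also contained product 39, then the affinity of product 39 for product 35 is:

// affinity = purchases of the pair / total purchases of the original product
val affinity = 12.0 / 26.0 // ≈ 0.4615, matching the sample output below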

Checking the Results

The fastest way to look at the recommendations is to open the Couchbase UI and browse the documents in the retail bucket. You can look up specific document IDs in the lookup box. Here is an example:

product_recommendation::35
{
 "type": "product_recommendation",
 "recommendations": [
   {
     "product": 39,
     "affinity": 0.4615384615384616
   },
   {
     "product": 48,
     "affinity": 0.3846153846153846
   },
   {
     "product": 32,
     "affinity": 0.1923076923076923
   }
 ]
}

You can see our top 3 recommendations for product ID 35. Looking at this recommendation, it might be a good idea to suggest product ID 39 to any users interested in product 35: 46% of the time that product 35 is purchased, product 39 is also purchased.

Conclusion

By combining the flexibility and scalability of Couchbase with the strong analytical capabilities of Spark, we can create systems that generate real value from our data. This post provided an example to get you started integrating Couchbase with Spark using Couchbase’s Spark Connector. Have other Couchbase integration use cases you’d like us to examine? If so, share your request in the comments.

NOTE: A full version of the code as well as a pre-configured environment to run it in can be found here: https://github.com/Avalon-Consulting-LLC/couchbase-spark-mba.

About Andy Kruth

Andy is the Manager of Avalon's Hadoop COE, a group focused on providing knowledge and expertise in the world of Big Data, including the Hadoop ecosystem and NoSQL technologies.
