Spark Dataframe Analytics

A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.
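The "named columns plus distributed rows" idea can be pictured with a toy model in plain Python. This is only an illustration under simplifying assumptions. The class `ToyDataFrame`, its methods, and the sample rows are hypothetical. A real Spark DataFrame keeps its partitions on executors and evaluates queries lazily through an optimizer rather than storing Python lists.

```python
from typing import Any, Dict, List, Sequence, Tuple


class ToyDataFrame:
    """Hypothetical stand-in for a DataFrame: a list of column names
    plus rows split into partitions (as a cluster would hold them)."""

    def __init__(self, columns: Sequence[str], partitions: List[List[Tuple[Any, ...]]]):
        self.columns = list(columns)
        self.partitions = partitions
        # Map each column name to its position inside a row tuple.
        self._index: Dict[str, int] = {name: i for i, name in enumerate(self.columns)}

    def column(self, name: str) -> List[Any]:
        """Collect one named column from every partition, in order."""
        i = self._index[name]
        return [row[i] for part in self.partitions for row in part]

    def count(self) -> int:
        """Count rows by summing the per-partition counts."""
        return sum(len(part) for part in self.partitions)


# Two hypothetical partitions of (business_id, state, stars) rows.
df = ToyDataFrame(
    ["business_id", "state", "stars"],
    [[("b1", "AZ", 4.5), ("b2", "NV", 5.0)], [("b3", "AZ", 3.0)]],
)
print(df.count())          # 3
print(df.column("state"))  # ['AZ', 'NV', 'AZ']
```

Addressing data by column name rather than by position is what lets the queries later in this post be written as `groupBy("state")` or `agg({"cool": "mean"})`.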

First, start the PySpark console with CSV support. Spark 1.x has no built-in CSV data source, so to load a DataFrame from a CSV file we use the spark-csv package (Spark 2.0 and later can read CSV natively). See http://spark-packages.org/package/databricks/spark-csv for how to use it.

Go to the Spark installation directory and run:

bin/pyspark --packages com.databricks:spark-csv_2.10:1.3.0
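The argument to `--packages` is a Maven coordinate of the form `groupId:artifactId:version` (several can be passed, separated by commas). The `_2.10` suffix on the artifact ID names the Scala version the package was built for, which has to match the Scala version of your Spark build. The hypothetical helper below only splits such a coordinate to show its parts; Spark performs its own resolution when it downloads the package.

```python
from typing import NamedTuple


class Coordinate(NamedTuple):
    group_id: str
    artifact_id: str
    version: str
    scala_version: str  # "" when the artifact ID has no _<scala> suffix


def parse_coordinate(coord: str) -> Coordinate:
    """Split a groupId:artifactId:version string (hypothetical helper)."""
    group_id, artifact_id, version = coord.split(":")
    _, sep, suffix = artifact_id.rpartition("_")
    # Treat a trailing "_<major>.<minor>" as the Scala binary version.
    scala = suffix if sep and suffix.replace(".", "").isdigit() else ""
    return Coordinate(group_id, artifact_id, version, scala)


c = parse_coordinate("com.databricks:spark-csv_2.10:1.3.0")
print(c.scala_version, c.version)  # 2.10 1.3.0
```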

This launches the PySpark console. We will analyze Yelp data, which is available at https://github.com/cloudera/hue/blob/master/apps/search/examples/collections/solr_configs_yelp_demo/index_data.csv
Download the file and load it into a DataFrame:

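# sqlCtx.load() is deprecated since Spark 1.4; the newer equivalent is
# sqlCtx.read.format('com.databricks.spark.csv').options(header='true', inferSchema='true').load(path)
yelp_df = sqlCtx.load(source='com.databricks.spark.csv',
                      header='true',
                      inferSchema='true',
                      path='file:///Users/logankim/Desktop/index_data.csv')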
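Roughly, `header = 'true'` takes the column names from the first line, and `inferSchema = 'true'` scans the data to guess each column's type instead of treating every value as a string. The sketch below imitates that idea with Python's `csv` module on a few hypothetical lines. The type-promotion order (integer, then double, then boolean, else string) is a simplifying assumption, not spark-csv's exact rules.

```python
import csv
import io
from typing import Callable, Dict, List

# Hypothetical sample in the spirit of the Yelp file.
SAMPLE = """business_id,stars,review_count,open
b1,4.5,12,True
b2,5,30,False
"""


def guess_type(values: List[str]) -> str:
    """Pick the narrowest type that parses every value in a column."""
    def all_parse(parse: Callable[[str], object]) -> bool:
        try:
            for v in values:
                parse(v)
            return True
        except ValueError:
            return False

    def parse_bool(v: str) -> bool:
        if v.lower() not in ("true", "false"):
            raise ValueError(v)
        return v.lower() == "true"

    if all_parse(int):
        return "integer"
    if all_parse(float):
        return "double"
    if all_parse(parse_bool):
        return "boolean"
    return "string"


def infer_schema(text: str) -> Dict[str, str]:
    reader = csv.reader(io.StringIO(text))
    header = next(reader)  # header = 'true': the first line names the columns
    rows = list(reader)
    # inferSchema = 'true': inspect every value of each column to choose a type.
    return {name: guess_type([r[i] for r in rows]) for i, name in enumerate(header)}


print(infer_schema(SAMPLE))
# {'business_id': 'string', 'stars': 'double', 'review_count': 'integer', 'open': 'boolean'}
```

Because every value must be inspected before the types are known, schema inference costs an extra pass over the file compared with reading all columns as strings.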

Type the following to see the schema:

yelp_df.printSchema()
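`printSchema()` prints the schema as a tree, one line per column with its type and nullability. As a rough sketch of that layout for a flat schema (the helper `format_schema` and the column list are hypothetical; the real output comes from the DataFrame's own schema, and nested fields are indented further):

```python
from typing import List, Tuple


def format_schema(fields: List[Tuple[str, str, bool]]) -> str:
    """Render (name, type, nullable) triples in a printSchema-like layout."""
    lines = ["root"]
    for name, dtype, nullable in fields:
        lines.append(f" |-- {name}: {dtype} (nullable = {str(nullable).lower()})")
    return "\n".join(lines)


print(format_schema([("business_id", "string", True), ("stars", "double", True)]))
```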

Then run the following simple analyses:

  • Find the mean of the “cool” column across the whole dataset
yelp_df.agg({"cool":"mean"}).collect()
  • Considering only records with a “review count” of 10 or more and only venues that are still open (see the “open” column), find the average of the “cool” column for venues with 5 “stars”
yelp_df.filter(yelp_df.review_count >= 10).filter(yelp_df.stars==5).filter(yelp_df.open == "True").agg({"cool":"mean"}).collect()
  • Considering only records with a “review count” of 10 or more and only venues that are still open (see the “open” column), count the records for each “state”
from pyspark.sql.functions import desc
yelp_df.filter(yelp_df.review_count >= 10).filter(yelp_df.open == "True").groupBy("state").count().orderBy(desc("count")).show()
  • Considering the complete dataset, what is the maximum number of reviews per venue (identified by “business_id”)?
from pyspark.sql.functions import desc
yelp_df.groupBy("business_id").count().orderBy(desc("count")).show()
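To see exactly what each of the queries above computes, here they are mirrored in plain Python over a handful of hypothetical rows (the values are invented for illustration, not taken from the Yelp file; the dictionary rows and helper names are assumptions of this sketch). Two points carry over to Spark: chained `filter` calls act as one combined AND condition, and the mean aggregate skips null values.

```python
from collections import Counter
from statistics import mean
from typing import Any, Dict, List, Optional

# Hypothetical rows: one per review, with venue attributes repeated on each row.
ROWS: List[Dict[str, Any]] = [
    {"business_id": "b1", "state": "AZ", "stars": 5, "review_count": 12, "open": "True", "cool": 2},
    {"business_id": "b1", "state": "AZ", "stars": 5, "review_count": 12, "open": "True", "cool": None},
    {"business_id": "b2", "state": "NV", "stars": 5, "review_count": 40, "open": "False", "cool": 7},
    {"business_id": "b3", "state": "NV", "stars": 4, "review_count": 15, "open": "True", "cool": 1},
    {"business_id": "b4", "state": "AZ", "stars": 5, "review_count": 3, "open": "True", "cool": 9},
]


def avg(values: List[Optional[float]]) -> Optional[float]:
    """Like SQL avg: ignore nulls; return None if nothing is left."""
    present = [v for v in values if v is not None]
    return mean(present) if present else None


# 1. agg({"cool": "mean"}) over every row.
overall_cool = avg([r["cool"] for r in ROWS])

# 2. Chained filters behave as a single AND-ed condition.
open_busy = [r for r in ROWS if r["review_count"] >= 10 and r["open"] == "True"]
five_star_cool = avg([r["cool"] for r in open_busy if r["stars"] == 5])

# 3. groupBy("state").count(), sorted by count in descending order.
by_state = Counter(r["state"] for r in open_busy).most_common()

# 4. groupBy("business_id").count(); the top row after sorting is the maximum.
per_venue = Counter(r["business_id"] for r in ROWS)
max_reviews = max(per_venue.values())

print(overall_cool, five_star_cool, by_state, max_reviews)
# 4.75 2 [('AZ', 2), ('NV', 1)] 2
```

In PySpark the filter chain can equally be written as one condition, for example `yelp_df.filter((yelp_df.review_count >= 10) & (yelp_df.open == "True"))`; the parentheses are required because Python's `&` binds more tightly than the comparison operators.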

For more details, see the PySpark SQL API documentation at http://spark.apache.org/docs/latest/api/python/pyspark.sql.html