Advanced Join in Spark

Apache Spark is a fast, general-purpose cluster computing system. First, download Spark. Then launch its interactive Python shell, bin/pyspark, by running the following in the Spark directory:

./bin/pyspark

The gennum files contain show names and their viewer counts; the genchan files contain show names and their channels. We want to find the total number of viewers, across all shows, for each channel.
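
Concretely, each line is a comma-separated pair: a gennum line holds show,views and a genchan line holds show,channel. The show name and values below are hypothetical, just to illustrate the two formats:

Hourly_Sports,21
Hourly_Sports,DEF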

If you already have those six files, copy them into your Spark directory:

join2_genchanA.txt
join2_genchanB.txt
join2_genchanC.txt
join2_gennumA.txt
join2_gennumB.txt
join2_gennumC.txt

If you do not have these six files, see http://kimlogan.github.io/2016/01/02/hadoop-join/ to review how to generate them.

Read the show views files

show_views_file = sc.textFile("join2_gennum?.txt")  # the ? wildcard matches a single character, so this reads the A, B and C files
show_views_file.take(2)

Parse the show views files

def split_show_views(line):
    # each line is "show,views"; return a (show, views) pair with views as an int
    kv = line.split(",")
    show = kv[0]
    views = int(kv[1])
    return (show, views)

show_views = show_views_file.map(split_show_views)
show_views.collect()
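
For example, applied to one hypothetical input line, the parser returns a (show, views) pair:

split_show_views("Hourly_Sports,21")  # -> ('Hourly_Sports', 21)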

Read the channel files

show_channel_file = sc.textFile("join2_genchan?.txt")
show_channel_file.take(2)

Parse the channel files

def split_show_channel(line):
    # each line is "show,channel"; return a (show, channel) pair
    kv = line.split(",")
    show = kv[0]
    channel = kv[1]
    return (show, channel)

show_channel = show_channel_file.map(split_show_channel)
show_channel.collect()

Join the two datasets

joined_dataset = show_channel.join(show_views)  # inner join on the show name
joined_dataset.collect()
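
Spark's join pairs records that share a key, so every element of the joined RDD has the shape (show, (channel, views)). With the hypothetical values from above, one element would look like:

('Hourly_Sports', ('DEF', 21))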

Extract channel as key

def extract_channel_views(show_views_channel):
    # input is (show, (channel, views)); re-key the record by channel
    channel = show_views_channel[1][0]
    views = int(show_views_channel[1][1])
    return (channel, views)

channel_views = joined_dataset.map(extract_channel_views)
channel_views.collect()
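
The RDD now holds one (channel, views) pair per show, so the same channel appears many times, e.g. (hypothetical values):

[('DEF', 21), ('DEF', 34), ('ABC', 12), ...]

The reduceByKey below collapses these repeated keys into one total per channel.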

Sum the views for each channel

def sum_views(a, b):
    # reduceByKey calls this on pairs of view counts that share a channel
    return a + b

channel_views.reduceByKey(sum_views).collect()
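
Equivalently, since the combiner is just addition, Python's built-in operator.add can be passed straight to reduceByKey:

from operator import add
channel_views.reduceByKey(add).collect()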