import numpy as np
import matplotlib.pyplot as plt
import os

%pylab inline

Populating the interactive namespace from numpy and matplotlib

Setting up your Environment

Note that these instructions only apply to Python 2; as of this writing, no Python 3 version has been created.

For a single-user, standalone setup, first install Apache Spark and verify that the installation works.

Now follow these directions: http://ramhiser.com/2015/02/01/configuring-ipython-notebook-support-for-pyspark/, which will let you use Spark in the IPython notebook.

If all went according to plan, you should now have access to the SparkContext variable, sc.

sc

<pyspark.context.SparkContext at 0x7f66d7f46550>

Using the test instructions from the link above, we can demonstrate some basic functionality.

spark_home = os.environ.get('SPARK_HOME', None)
text_file = sc.textFile(spark_home + "/README.md")
# Split each line into words, pair each word with 1, and sum the counts per word.
word_counts = text_file \
    .flatMap(lambda line: line.split()) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)
word_counts.collect()[:10] # just print first 10

[(u'all', 1),
 (u'when', 1),
 (u'"local"', 1),
 (u'including', 3),
 (u'computation', 1),
 (u'Spark](#building-spark).', 1),
 (u'using:', 1),
 (u'guidance', 3),
 (u'Scala,', 1),
 (u'environment', 1)]

What is Spark?

Spark is an in-memory computational framework. It uses immutable, lazily evaluated objects called RDDs (short for Resilient Distributed Datasets). It provides an API for Java, Python, and Scala. We will be using the PySpark API.

For more, see the links in the sections below.

SparkContext Tools

I won't go over every operation here; rather, I will list the more important RDD methods in alphabetical order. For a fuller reference, see http://spark.apache.org/docs/1.3.0/api/scala/index.html#org.apache.spark.rdd.RDD.

  • collect: Returns the RDD's elements to the driver as a Python list
  • count: Counts the items in the RDD
  • distinct: Returns just the unique elements
  • filter: Keeps only the items that satisfy a given predicate
  • map: Applies a function to each item
  • reduce: Collapses the RDD into a single item using some aggregating function. In Python, such a function is defined as f(agg, nxt), where agg is the aggregate we're keeping track of and nxt is the next item in the list. A simple example is applying lambda agg, nxt: agg + nxt to the list range(10), which yields sum(range(10)) = 45. (See the sketch after this list.)

Again, this is not a complete list. A more complete reference is available at the above link.
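
To make these concrete, here is a minimal sketch (using a throwaway RDD built with sc.parallelize rather than a file) that exercises most of the operations listed above; the results in the comments assume the small input shown:

rdd = sc.parallelize([3, 1, 4, 1, 5, 9, 2, 6])

rdd.count()                                 # 8 items in total
rdd.distinct().count()                      # 7 unique items (the duplicate 1 is dropped)
rdd.filter(lambda x: x % 2 == 0).collect()  # [4, 2, 6]
rdd.map(lambda x: x * 10).collect()         # [30, 10, 40, 10, 50, 90, 20, 60]
rdd.reduce(lambda agg, nxt: agg + nxt)      # 31, the sum of all the items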

Another thing to note is that Spark uses lazy evaluation, meaning that no work is actually done until some terminating action is performed. Two notable actions are collect and count.
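
For example (a small sketch, not from the original notebook), building up a chain of transformations returns a new RDD immediately without touching the data; only when an action such as count is called does Spark actually run the job:

# Chaining transformations returns a new RDD right away; nothing is computed yet.
squares = sc.parallelize(range(1000)).map(lambda x: x ** 2)

# count is an action, so the pipeline actually executes here.
squares.count()  # 1000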

Playing with Data

Using the dataset available here: https://github.com/umbrae/reddit-top-2.5-million/, we will demonstrate simple CSV usage.

# Keep only well-formed rows (22 comma-separated fields), take the title
# column (index 4), and keep the titles that start with "Found".
sc.textFile('data/reddit-top-2.5-million/data/pics.csv')\
    .filter(lambda line: len(line.split(',')) == 22)\
    .map(lambda line: line.split(',')[4])\
    .filter(lambda line: line.startswith("Found"))\
    .collect()

[u'Found this while looking for fridges at Sears',
 u'Found a hidden room in our attic from WW2!',
 u'Found on a german ATM..',
 u"Found this on my porch this morning. I'm keeping it.",
 u'Found this on my bike after it started raining; I love Portland',
 u'Found this in San Antonio. Who would do this?',
 u'Found MJ on the cover of an old danish book',
 u"Found out Bill Nye used to go to my mom's parties in the 80's"]

This collects every post title that starts with the word "Found". Spark does not have a built-in CSV parser, which means you have to handle the separator (and any quoting) yourself.
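
As a rough workaround (a sketch, not part of the original notebook), you can hand each line to Python's csv module inside a map so that quoted commas do not break the naive split; the column count (22) and the title column (index 4) are taken from the cell above:

import csv

def parse_csv_line(line):
    # The Python 2 csv module wants byte strings, so encode the unicode line first.
    return next(csv.reader([line.encode('utf-8')]))

sc.textFile('data/reddit-top-2.5-million/data/pics.csv')\
    .map(parse_csv_line)\
    .filter(lambda fields: len(fields) == 22)\
    .map(lambda fields: fields[4])\
    .filter(lambda title: title.startswith("Found"))\
    .collect()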

Now let's try doing something a little more useful. The following cell pulls out all the post scores and plots them.

# Column 1 holds each post's score; collect()[1:] drops the CSV header row.
post_scores = np.array(sc.textFile('data/reddit-top-2.5-million/data/pics.csv')\
                        .filter(lambda line: len(line.split(',')) == 22)\
                        .map(lambda line: line.split(',')[1])\
                        .collect()[1:], dtype=int)
plt.figure(figsize=(8, 4))
plt.plot(range(len(post_scores)), post_scores)
plt.show()

[Figure: line plot of the post scores]

Using a different dataset, available here: https://xato.net/passwords/ten-million-passwords, we can look at commonly used passwords. First, we establish how many entries the file contains.

sc.textFile('data/10-million-combos.txt').count()

10000000

Now, for each letter, we count how many passwords start with that letter. We lowercase each password first so that passwords starting with an uppercase letter are included too.

count_dict = {}
for i in xrange(97, 123):  # ASCII codes for 'a' through 'z'
    letter = chr(i)
    # Each line is expected to be two tab-separated fields, with the password
    # second; count the passwords whose lowercased form starts with this letter.
    count = sc.textFile('data/10-million-combos.txt')\
                .map(lambda line: line.split('\t'))\
                .filter(lambda tup: len(tup) == 2)\
                .filter(lambda tup: tup[1].lower().startswith(letter))\
                .count()
    count_dict[letter] = count
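
A note on efficiency: the loop above scans the ten-million-line file 26 times, once per letter. As a sketch of one alternative (assuming the same tab-separated layout with the password in the second field), a single pass can map each password to its first letter and count everything at once with countByValue:

first_letter_counts = sc.textFile('data/10-million-combos.txt')\
    .map(lambda line: line.split('\t'))\
    .filter(lambda tup: len(tup) == 2)\
    .map(lambda tup: tup[1].lower()[:1])\
    .filter(lambda c: 'a' <= c <= 'z')\
    .countByValue()  # action: returns a dictionary of first letter -> count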

Either way, we end up with a dictionary of counts, one per letter.

count_dict

{'a': 492295,
 'b': 501189,
 'c': 451278,
 'd': 384153,
 'e': 186717,
 'f': 289502,
 'g': 300124,
 'h': 250859,
 'i': 142208,
 'j': 258590,
 'k': 258585,
 'l': 286000,
 'm': 520827,
 'n': 219139,
 'o': 113788,
 'p': 402553,
 'q': 95947,
 'r': 311934,
 's': 660511,
 't': 344515,
 'u': 65313,
 'v': 159223,
 'w': 168473,
 'x': 50076,
 'y': 79976,
 'z': 94804}

We can then plot these counts as a bar chart.

# Sort the (letter, count) pairs by count so the bars increase left to right.
values = sorted(count_dict.items(), key=lambda tup: tup[1])

width = 1.5
fig = plt.figure(figsize=(8, 6))
ax = fig.add_axes([0.1, 0.1, 0.8, 0.8])
ax.bar(np.arange(26) * width, [t[1] for t in values], width=width, color='g', alpha=0.75)
ax.set_xticks(np.arange(26) * width + 0.75)  # center each tick under its bar
ax.set_xticklabels([t[0] for t in values])

plt.show()

[Figure: bar chart of password counts by first letter, sorted ascending]