How to Translate from MapReduce to Spark

The key to getting the most out of Spark is to understand the differences between its RDD API and the original Mapper and Reducer APIs.

Classic MapReduce has been Apache Hadoop's primary means of computation from the start. It was originally designed for scenarios like large-scale log processing and batch-oriented ETL operations.

As Hadoop's scope has expanded, it has become clear that MapReduce is no longer the best framework for every kind of computation. Hadoop has made room for alternatives by extracting resource management into a separate, pluggable component called YARN. As a result, projects like Impala can provide interactive SQL using new, specialized, non-MapReduce architectures.

Today, Apache Spark is one such alternative, and many say it beats MapReduce as a general-purpose computation framework for Hadoop. But if MapReduce has been so useful, can it really be replaced overnight? After all, even though the Hadoop platform now has real-time capabilities, plenty of ETL-like workloads still run on Hadoop using MapReduce.

It is entirely possible to re-implement MapReduce-like computations in Spark, and they can be simpler and often faster because Spark can optimize away spilling to disk. For MapReduce, re-implementation in Spark is something of a homecoming: Spark, after all, imitates Scala's functional programming style and APIs, and the very idea of MapReduce comes from the functional programming language Lisp.

Although Spark's main abstraction, the RDD, plainly exposes map() and reduce() operations, they are not direct analogs of Hadoop's Mapper and Reducer APIs. This often trips up developers trying to migrate their Mapper and Reducer classes to equivalent functionality in Spark.

Compared with the classic functional-language map() and reduce() operations that Scala and Spark provide, Hadoop's Mapper and Reducer APIs are actually more flexible and more complex. These differences may not be obvious to developers accustomed to MapReduce; the behaviors below are specifics of the Hadoop implementation rather than of the abstract idea of MapReduce:

  • Mappers and Reducers always use key-value pairs as input and output;
  • A Reducer reduces values per key only;
  • A Mapper or Reducer may emit 0, 1, or many key-value pairs for each input;
  • Mappers and Reducers may emit arbitrary keys or values, not just subsets or transformations of their inputs;
  • Mapper and Reducer objects have a lifecycle that spans many map() and reduce() calls. They support setup() and cleanup() methods, which can be used to take actions before or after a batch of records is processed.

This post will briefly show how to recreate each of these in Spark, and also show that it is not necessarily desirable to translate a Mapper or Reducer literally.

Key-Value Pairs as Tuples

Let’s say we need to compute the length of each line in a large text input, and report the count of lines by line length. In Hadoop MapReduce, this begins with a Mapper that produces key-value pairs in which the line length is the key, and count of 1 is the value:

```java
public class LineLengthMapper
    extends Mapper<LongWritable,Text,IntWritable,IntWritable> {
  @Override
  protected void map(LongWritable lineNumber, Text line, Context context)
      throws IOException, InterruptedException {
    context.write(new IntWritable(line.getLength()), new IntWritable(1));
  }
}
```

It’s worth noting that Mappers and Reducers only operate on key-value pairs. So the input to LineLengthMapper, provided by a TextInputFormat, is actually a pair containing the line as value, with position within the file thrown in as a key, for fun. (It’s rarely used, but something has to be the key.)
The Spark equivalent is:

```scala
lines.map(line => (line.length, 1))
```

In Spark, the input is an RDD of Strings only, not of key-value pairs. Spark’s representation of a key-value pair is a Scala tuple, created with the (a,b) syntax shown above. The result of the map() operation above is an RDD of (Int,Int) tuples. When an RDD contains tuples, it gains more methods, such as reduceByKey(), which will be essential to reproducing MapReduce behavior.
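
For context, here is a minimal sketch of how an RDD like lines might be created in the first place; the SparkContext named sc and the input path are illustrative assumptions, not part of the original example:

```scala
// Read a text file into an RDD[String]; each element is one line of text.
val lines = sc.textFile("hdfs:///path/to/input.txt")

// Pair each line's length with a count of 1, producing an RDD[(Int, Int)].
val lengthOnes = lines.map(line => (line.length, 1))
```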

Reducer and reduce() versus reduceByKey()

To produce a count of line lengths, it’s necessary to sum the counts per length in a Reducer:

```java
public class LineLengthReducer
    extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable> {
  @Override
  protected void reduce(IntWritable length, Iterable<IntWritable> counts, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable count : counts) {
      sum += count.get();
    }
    context.write(length, new IntWritable(sum));
  }
}
```

The equivalent of the Mapper and Reducer above together is a one-liner in Spark:

```scala
val lengthCounts = lines.map(line => (line.length, 1)).reduceByKey(_ + _)
```

Spark’s RDD API has a reduce() method, but it will reduce the entire set of key-value pairs to one single value. This is not what Hadoop MapReduce does. Instead, Reducers reduce all values for a key and emit a key along with the reduced value. reduceByKey() is the closer analog. But, that is not even the most direct equivalent in Spark; see groupByKey() below.

It is worth pointing out here that a Reducer’s reduce() method receives a stream of many values, and produces 0, 1 or more results. reduceByKey(), in contrast, accepts a function that turns exactly two values into exactly one — here, a simple addition function that maps two numbers to their sum. This associative function can be used to reduce many values to one for the caller. It is a simpler, narrower API for reducing values by key than what a Reducer exposes.
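
To make the contrast concrete, here is a small hedged sketch, assuming a SparkContext named sc (output ordering is not guaranteed):

```scala
val pairs = sc.parallelize(Seq((3, 1), (3, 1), (5, 1)))

// reduce() collapses the whole RDD into one value -- here, a single tuple:
val total = pairs.reduce((a, b) => (a._1 + b._1, a._2 + b._2))   // (11, 3)

// reduceByKey() reduces values per key, yielding one pair per distinct key:
val perKey = pairs.reduceByKey(_ + _)                            // (3, 2), (5, 1)
```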

Mapper and map() versus flatMap()

Now, instead consider counting the occurrences of only words beginning with an uppercase character. For each line of text in the input, a Mapper might emit 0, 1 or many key-value pairs:

```java
public class CountUppercaseMapper
    extends Mapper<LongWritable,Text,Text,IntWritable> {
  @Override
  protected void map(LongWritable lineNumber, Text line, Context context)
      throws IOException, InterruptedException {
    for (String word : line.toString().split(" ")) {
      if (Character.isUpperCase(word.charAt(0))) {
        context.write(new Text(word), new IntWritable(1));
      }
    }
  }
}
```

The equivalent in Spark is:

```scala
lines.flatMap(
  _.split(" ").filter(word => Character.isUpperCase(word(0))).map(word => (word, 1))
)
```

map() will not suffice here, because map() must produce exactly one output per input, but unlike before, one line needs to yield potentially many outputs. Again, the map() function in Spark is simpler and narrower compared to what the Mapper API supports.

The solution in Spark is to first map each line to an array of output values. The array may be empty, or have many values. Merely map()-ing lines to arrays would produce an RDD of arrays as the result, when the result should be the contents of those arrays. The result needs to be “flattened” afterward, and flatMap() does exactly this. Here, the array of words in the line is filtered and converted into tuples inside the function. In a case like this, it’s flatMap() that’s required to emulate such a Mapper, not map().
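
A tiny sketch of the difference, using the same lines RDD (result types shown in comments):

```scala
// map() keeps exactly one output per input, so the result is an RDD of arrays:
val arraysPerLine = lines.map(line => line.split(" "))    // RDD[Array[String]]

// flatMap() flattens those arrays into a single RDD of their contents:
val allWords = lines.flatMap(line => line.split(" "))     // RDD[String]
```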

groupByKey()

It’s simple to write a Reducer that then adds up the counts for each word, as before. And in Spark, again, reduceByKey() could be used to sum counts per word. But what if for some reason the output has to contain the word in all uppercase, along with a count? In MapReduce, that’s:

```java
public class CountUppercaseReducer
    extends Reducer<Text,IntWritable,Text,IntWritable> {
  @Override
  protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable count : counts) {
      sum += count.get();
    }
    context.write(new Text(word.toString().toUpperCase()), new IntWritable(sum));
  }
}
```

But reduceByKey() by itself doesn’t quite work in Spark, since it preserves the original key. To emulate this in Spark, something even more like the Reducer API is needed. Recall that Reducer’s reduce() method receives a key and Iterable of values, and then emits some transformation of those. groupByKey() and a subsequent map() can achieve this:

```scala
... .groupByKey().map { case (word, ones) => (word.toUpperCase, ones.sum) }
```

groupByKey() merely collects all values for a key together, and does not apply a reduce function. From there, any transformation can be applied to the key and Iterable of values. Here, the key is transformed to uppercase, and the values are directly summed.
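
For clarity, a sketch of the types at each step of this pipeline, wrapped in a function over (word, 1) pairs like those produced earlier (the function name is illustrative):

```scala
import org.apache.spark.SparkContext._   // pair-RDD implicits (needed on older Spark versions)
import org.apache.spark.rdd.RDD

def upperCaseCounts(wordOnes: RDD[(String, Int)]): RDD[(String, Int)] = {
  // groupByKey() pairs each word with all of its counts...
  val grouped: RDD[(String, Iterable[Int])] = wordOnes.groupByKey()
  // ...and map() then transforms the key and sums the Iterable of counts.
  grouped.map { case (word, ones) => (word.toUpperCase, ones.sum) }
}
```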

Be careful! groupByKey() works, but also collects all values for a key into memory. If a key is associated to many values, a worker could run out of memory. Although this is the most direct analog of a Reducer, it’s not necessarily the best choice in all cases. For example, Spark could have simply transformed the keys after a call to reduceByKey:

```scala
... .reduceByKey(_ + _).map { case (word, total) => (word.toUpperCase, total) }
```

It’s better to let Spark manage the reduction rather than ask it to collect all values just for us to manually sum them.

setup() and cleanup()

In MapReduce, a Mapper and Reducer can declare a setup() method, called before any input is processed, to perhaps allocate an expensive resource like a database connection, and a cleanup() method to release the resource:

```java
public class SetupCleanupMapper extends Mapper<LongWritable,Text,Text,IntWritable> {

  private Connection dbConnection;

  @Override
  protected void setup(Context context) {
    dbConnection = ...;
  }

  ...

  @Override
  protected void cleanup(Context context) {
    dbConnection.close();
  }
}
```

The Spark map() and flatMap() methods only operate on one input at a time though, and provide no means to execute code before or after transforming a batch of values. It looks possible to simply put the setup and cleanup code before and after a call to map() in Spark:

```scala
val dbConnection = ...
lines.map(... dbConnection.createStatement(...) ...)
dbConnection.close() // Wrong!
```

However, this fails for several reasons:

- It puts the object dbConnection into the map function's closure, which requires that it be serializable (for example, by implementing java.io.Serializable). An object like a database connection is generally not serializable.
- map() is a transformation, rather than an action, and is lazily evaluated. The connection can't be closed immediately here.
- Even so, it would only close the connection on the driver, not necessarily freeing resources allocated by serialized copies.

In fact, neither map() nor flatMap() is the closest counterpart to a Mapper in Spark — it’s the important mapPartitions() method. This method does not map just one value to one other value, but rather maps an Iterator of values to an Iterator of other values. It’s like a “bulk map” method. This means that the mapPartitions() function can allocate resources locally at its start, and release them when done mapping many values.

Adding setup code is simple; adding cleanup code is harder because it remains difficult to detect when the transformed iterator has been fully evaluated. For example, this does not work:

```scala
lines.mapPartitions { valueIterator =>
  val dbConnection = ...   // OK
  val transformedIterator = valueIterator.map(... dbConnection ...)
  dbConnection.close()     // Still wrong! May not have evaluated iterator
  transformedIterator
}
```

A more complete formulation (HT Tobias Pfeiffer) is roughly:

```scala
lines.mapPartitions { valueIterator =>
  if (valueIterator.isEmpty) {
    Iterator.empty
  } else {
    val dbConnection = ...
    valueIterator.map { item =>
      val transformedItem = ...
      if (!valueIterator.hasNext) {
        dbConnection.close()
      }
      transformedItem
    }
  }
}
```


Although decidedly less elegant than previous translations, it can be done.

There is no flatMapPartitions() method. However, the same effect can be achieved by calling mapPartitions(), followed by a call to flatMap(a => a) to flatten.
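
A hedged sketch of that pattern; the per-line splitting here is only illustrative, and any function returning a collection per input would do:

```scala
// mapPartitions() maps each line to a whole collection of outputs...
val collections = lines.mapPartitions { lineIterator =>
  lineIterator.map(line => line.split(" ").toSeq)   // Iterator[Seq[String]]
}
// ...and flatMap(a => a) flattens those collections into a single RDD.
val flattened = collections.flatMap(a => a)          // RDD[String]
```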

The equivalent of a Reducer with setup() and cleanup() is just a groupByKey() followed by a mapPartitions() call like the one above. Take note of the caveat about using groupByKey() above, though.
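
Putting the pieces together, a rough sketch of such a Reducer equivalent: openConnection() is a hypothetical stand-in for whatever expensive resource setup() would have created, and wordOnes is assumed to hold the (word, 1) pairs from earlier:

```scala
wordOnes.groupByKey().mapPartitions { keyedValues =>
  if (keyedValues.isEmpty) {
    Iterator.empty
  } else {
    val dbConnection = openConnection()              // setup(): once per partition
    keyedValues.map { case (word, ones) =>
      val reduced = (word.toUpperCase, ones.sum)     // the per-key "reduce"
      if (!keyedValues.hasNext) {
        dbConnection.close()                         // cleanup(): after the last key
      }
      reduced
    }
  }
}
```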

But Wait, There's More

MapReduce developers will point out that there is yet more to the API that hasn't been mentioned here:

- MapReduce supports a special type of Reducer, called a Combiner, that can reduce shuffled data size from a Mapper.
- It also supports custom partitioning via a Partitioner, and custom grouping for the purposes of the Reducer via a grouping Comparator.
- The Context objects give access to a Counter API for accumulating statistics.
- A Reducer always sees keys in sorted order within its lifecycle.
- MapReduce has its own Writable serialization scheme.
- Mappers and Reducers can emit multiple outputs at once.
- MapReduce alone has tens of tuning parameters.  

There are ways to implement or port these concepts into Spark, using APIs like Accumulators, methods like groupBy() and the partitioner argument that many of these methods accept, Java or Kryo serialization, caching, and more. To keep this post brief, the remainder will be left to a follow-up post.

The concepts behind MapReduce haven't stopped being useful; MapReduce just now has a different and potentially more powerful implementation on Hadoop, in a functional language that better matches its functional roots. Understanding the differences between Spark's RDD API and the original Mapper and Reducer APIs helps developers better understand how all of them truly work and how to use Spark's counterparts to best advantage.