하둡 맵리듀스는 클러스터 환경에서 대량의 데이터를 병렬로 처리하는 응용 프로그램을 쉽게 작성할 수 있는 소프트웨어 프레임워크입니다.

  맵리듀스 작업은 일반적으로 입력 데이터를 독립적인 청크로 분할(Split)하여 다수의 노드에서 병렬로 맵 작업을 수행합니다. 맵 작업의 결과물은 Shuffling 되어 리듀스 작업의 입력으로 전달되며, 리듀스 작업의 결과물이 최종 결과 데이터 셋이 됩니다.
  이러한 맵 리듀스 과정에서의 입/출력 데이터는 <key, value> 쌍의 집합 형태로 표현합니다.

  대표적인 맵리듀스의 예제인 단어세기 프로그램을 기준으로 맵리듀스의 논리적 데이터 흐름을 살펴보겠습니다.

  단어세기 프로그램은 입력 데이터에서 각 단어의 건수를 계산하여 출력하는 프로그램입니다.

  해당 프로그램은 먼저 입력 경로 내 존재하는 텍스트 파일을 읽어 문자열 데이터를 분할(Split)하여 각 맵에 분배하여 줍니다. 데이터를 분할하는 기준은 InputFormat으로 정의할 수 있습니다. InputFormat의 기본은 TextInputFormat이며, 이 경우 개행을 기준으로 데이터를 분할하므로 맵은 한 번에 한 줄의 문자열을 처리하게 됩니다.

  맵은 입력받은 문자열을 공백 기준으로 분리(Splitting)하여 <단어,1> 형태의 <key, value> 쌍을 생성합니다. 아래 목록은 맵 연산의 결과 예시입니다.
  <Hadoop, 1>
  <MapRecdce, 1>
  <software, 1> ...
  <MapRecdce, 1> ...

  맵의 출력은 키를 기준으로 정렬된 후 리듀스에 전달됩니다. 

  만약 프로그램 내 컴바이너(Combiner)가 정의되어 있다면 리듀스 연산에 앞서 컴바이너 연산이 선행됩니다. 이때 정렬된 맵의 출력 값은 컴바이너의 입력값이 되며, 컴바이너의 출력 값이 리듀스에 전달되게 됩니다.

  컴바이너는 네트워크 사용량을 줄이기 위한 목적으로 리듀스에 전달하는 데이터의 크기를 최소화하기 위해 사용됩니다. 일반적으로 연산 내용이 리듀스와 동일하며 로컬 내 맵 결과 데이터만을 대상으로 리듀스 작업을 미리 진행하는 단계라고 이해하시면 됩니다.

  아래는 컴바이너 연산까지 적용된 맵 연산 결과 예시입니다.
  <Hadoop, 1>
  <MapRecdce, 2>
  <software, 1> ...

  위에서 언급했듯이 맵 연산 결과 데이터는 리듀스의 입력 데이터로 전달됩니다. 이때 키 값이 같은 맵의 출력 결과물은 하나의 리듀스가 키 값의 건수를 계산할 수 있도록 동일한 리듀스로 전달됩니다(Shuffling). 

  리듀스에서는 입력 값으로 전달받은 맵 연산 결과 데이터를 읽어 단어별로 건수를 계산하게 됩니다. <key, value> 구조인 입력 값을 읽어 key가 동일한 데이터의 value를 합한 결과를 <key, value> 쌍 구조로 출력하게 됩니다.

  아래는 리듀스 연산 후 출력되는 최종 결과 데이터의 예입니다.
  <Hadoop, 10>
  <MapRecdce, 18>
  <job, 23> ...

 

  지금까지 살펴본 맵 리듀스 연산을 Java로 구현한 소스 코드는 아래와 같습니다.

package org.myorg;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class WordCount {

   public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
     private final static IntWritable one = new IntWritable(1);
     private Text word = new Text();

     public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
       String line = value.toString();
       StringTokenizer tokenizer = new StringTokenizer(line);
       while (tokenizer.hasMoreTokens()) {
         word.set(tokenizer.nextToken());
         output.collect(word, one);
       }
     }
   }

   public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
     public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
       int sum = 0;
       while (values.hasNext()) {
         sum += values.next().get();
       }
       output.collect(key, new IntWritable(sum));
     }
   }

   public static void main(String[] args) throws Exception {
     JobConf conf = new JobConf(WordCount.class);
     conf.setJobName("wordcount");

     conf.setOutputKeyClass(Text.class);
     conf.setOutputValueClass(IntWritable.class);

     conf.setMapperClass(Map.class);
     conf.setCombinerClass(Reduce.class);
     conf.setReducerClass(Reduce.class);

     conf.setInputFormat(TextInputFormat.class);
     conf.setOutputFormat(TextOutputFormat.class);

     FileInputFormat.setInputPaths(conf, new Path(args[0]));
     FileOutputFormat.setOutputPath(conf, new Path(args[1]));

     JobClient.runJob(conf);
   }
}

[Ref.] https://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html

+ Recent posts