Chapter 6. MapReduce Basics (2)


  • Using user-defined options - custom options, mapper, and driver implementation, execution

* API description

Class GenericOptionsParser

- Options

-conf : adds the specified file to the list of configuration resources

-D : sets a new value for a property in the Hadoop configuration

-fs : specifies a new namenode host and port

-jt : specifies a new jobtracker host and port

-files : takes a comma-separated list of files; copies the local files to the HDFS shared file system

-libjars : copies the local jar files to the HDFS shared file system and adds them to the MapReduce task classpath

-archives : takes a comma-separated list of archives; copies the local archives to the HDFS shared file system and unarchives them

- Examples

 $ bin/hadoop dfs -fs darwin:8020 -ls /data
 list /data directory in dfs with namenode darwin:8020
 $ bin/hadoop dfs -D fs.default.name=darwin:8020 -ls /data
 list /data directory in dfs with namenode darwin:8020
     
 $ bin/hadoop dfs -conf hadoop-site.xml -ls /data
 list /data directory in dfs with conf specified in hadoop-site.xml
     
 $ bin/hadoop job -D mapred.job.tracker=darwin:50020 -submit job.xml
 submit a job to job tracker darwin:50020
     
 $ bin/hadoop job -jt darwin:50020 -submit job.xml
 submit a job to job tracker darwin:50020
     
 $ bin/hadoop job -jt local -submit job.xml
 submit a job to local runner
 $ bin/hadoop jar -libjars testlib.jar -archives test.tgz -files file.txt inputjar args
 job submission with libjars, files and archive

- Constructor
GenericOptionsParser(Configuration conf, String[] args) 
          Create a GenericOptionsParser to parse only the generic Hadoop arguments.
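
※ As a minimal sketch of using the constructor directly (the class name ParserDemo and the workType property are illustrations, not part of the Hadoop API):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.GenericOptionsParser;

public class ParserDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Strips the generic options (-D, -fs, -jt, ...) and applies them to conf
        String[] remaining = new GenericOptionsParser(conf, args).getRemainingArgs();
        // e.g. given "-D workType=departure input output", conf.get("workType")
        // returns "departure" and remaining is {"input", "output"}
        System.out.println("workType = " + conf.get("workType"));
        for (String arg : remaining) {
            System.out.println("remaining arg: " + arg);
        }
    }
}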

※ GenericOptionsParser parses the generic Hadoop options and applies them to the Configuration object it receives through its constructor.

※ When developing MapReduce programs, GenericOptionsParser is rarely used on its own; it is usually combined with the Tool interface (which extends Configurable, the supplier of the Configuration object) and the ToolRunner class, which uses GenericOptionsParser internally.

Interface Tool

→ An interface that adds support for handling generic command-line options via GenericOptionsParser.

public interface Tool
extends Configurable
Extends the Configurable interface.

int run(String[] args)
        throws Exception

Defines the run method.

Class ToolRunner

→ A helper class that runs classes implementing the Tool interface
→ Uses GenericOptionsParser to parse the options the user passed on the command line and to populate a Configuration object
→ Passes the Configuration object to the Tool, then invokes the Tool's run method

static int run(Configuration conf, Tool tool, String[] args)
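
※ Conceptually, ToolRunner.run does little more than the following; a simplified sketch of the flow described above (error handling omitted), not the actual Hadoop source:

public static int run(Configuration conf, Tool tool, String[] args) throws Exception {
    if (conf == null) {
        conf = new Configuration();
    }
    // Parse the generic options (-D, -fs, -jt, -files, -libjars, -archives, ...)
    GenericOptionsParser parser = new GenericOptionsParser(conf, args);
    // Hand the populated Configuration to the Tool (Tool extends Configurable)
    tool.setConf(conf);
    // Run the Tool with only the remaining application-specific arguments
    return tool.run(parser.getRemainingArgs());
}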

* Writing the code

- DelayCount.java
package pj01.hadoop.chapter05;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DelayCount extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        String[] otherArgs = new GenericOptionsParser(getConf(), args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: DelayCount  ");
            System.exit(2);
        }
        Job job = new Job(getConf(), "DelayCount");
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        job.setJarByClass(DelayCount.class);
        job.setMapperClass(DelayCountMapper.class);
        job.setReducerClass(DelayCountReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.waitForCompletion(true);
        return 0;
    }
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new DelayCount(), args);
        System.out.println("## RESULT:" + res);
    }
}

※ Previously everything was implemented in main; with user-defined options, the job setup is implemented in the run method.
※ A class implementing the Tool interface must implement a run method that returns an int.
※ getRemainingArgs() extracts only the remaining arguments, which specify the input and output paths.



- DelayCountMapper.java
package pj01.hadoop.chapter05;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class DelayCountMapper extends
        Mapper<LongWritable, Text, Text, IntWritable> {
    private String workType;
    private final static IntWritable outputValue = new IntWritable(1);
    private Text outputKey = new Text();
    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        workType = context.getConfiguration().get("workType");
    }
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        if (key.get() > 0) {
            String[] colums = value.toString().split(",");
            if (colums != null && colums.length > 0) {
                try {
                    if (workType.equals("departure")) {
                        if (!colums[15].equals("NA")) {
                            int depDelayTime = Integer.parseInt(colums[15]);
                            if (depDelayTime &gt; 0) {
                                outputKey.set(colums[0+ "," + colums[1]);
                                context.write(outputKey, outputValue);
                            }
                        }
                    } else if (workType.equals("arrival")) {
                        if (!colums[14].equals("NA")) {
                            int arrDelayTime = Integer.parseInt(colums[14]);
                            if (arrDelayTime > 0) {
                                outputKey.set(colums[0] + "," + colums[1]);
                                context.write(outputKey, outputValue);
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

- DelayCountReducer.java
(see Chapter 4)

* Execution
[hduser@hdstudy01 hadoop]$ hadoop jar alzio-hadoop-examples.jar -D workType=departure input departure_delay_count
17/06/13 00:10:54 INFO input.FileInputFormat: Total input paths to process : 22
17/06/13 00:10:54 INFO util.NativeCodeLoader: Loaded the native-hadoop library
17/06/13 00:10:54 WARN snappy.LoadSnappy: Snappy native library not loaded
17/06/13 00:10:55 INFO mapred.JobClient: Running job: job_201706091514_0002
17/06/13 00:10:56 INFO mapred.JobClient:  map 0% reduce 0%
17/06/13 00:11:14 INFO mapred.JobClient:  map 1% reduce 0%
17/06/13 00:11:29 INFO mapred.JobClient:  map 2% reduce 0%
17/06/13 00:11:43 INFO mapred.JobClient:  map 3% reduce 0%
(omitted)
17/06/13 00:35:31 INFO mapred.JobClient:  map 100% reduce 80%
17/06/13 00:35:34 INFO mapred.JobClient:  map 100% reduce 87%
17/06/13 00:35:37 INFO mapred.JobClient:  map 100% reduce 94%
17/06/13 00:35:40 INFO mapred.JobClient:  map 100% reduce 100%
17/06/13 00:35:42 INFO mapred.JobClient: Job complete: job_201706091514_0002
17/06/13 00:35:42 INFO mapred.JobClient: Counters: 29
17/06/13 00:35:42 INFO mapred.JobClient:   Map-Reduce Framework
17/06/13 00:35:42 INFO mapred.JobClient:     Spilled Records=134256366
17/06/13 00:35:42 INFO mapred.JobClient:     Map output materialized bytes=658461244
17/06/13 00:35:42 INFO mapred.JobClient:     Reduce input records=49680792
17/06/13 00:35:42 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=360901206016
17/06/13 00:35:42 INFO mapred.JobClient:     Map input records=122846665
17/06/13 00:35:42 INFO mapred.JobClient:     SPLIT_RAW_BYTES=21018
17/06/13 00:35:42 INFO mapred.JobClient:     Map output bytes=559098544
17/06/13 00:35:42 INFO mapred.JobClient:     Reduce shuffle bytes=658461244
17/06/13 00:35:42 INFO mapred.JobClient:     Physical memory (bytes) snapshot=33694904320
17/06/13 00:35:42 INFO mapred.JobClient:     Reduce input groups=254
17/06/13 00:35:42 INFO mapred.JobClient:     Combine output records=0
17/06/13 00:35:42 INFO mapred.JobClient:     Reduce output records=254
17/06/13 00:35:42 INFO mapred.JobClient:     Map output records=49680792
17/06/13 00:35:42 INFO mapred.JobClient:     Combine input records=0
17/06/13 00:35:42 INFO mapred.JobClient:     CPU time spent (ms)=857820
17/06/13 00:35:42 INFO mapred.JobClient:     Total committed heap usage (bytes)=24400076800
17/06/13 00:35:42 INFO mapred.JobClient:   File Input Format Counters
17/06/13 00:35:42 INFO mapred.JobClient:     Bytes Read=11962658585
17/06/13 00:35:42 INFO mapred.JobClient:   FileSystemCounters
17/06/13 00:35:42 INFO mapred.JobClient:     HDFS_BYTES_READ=11962679603
17/06/13 00:35:42 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=1788443612
17/06/13 00:35:42 INFO mapred.JobClient:     FILE_BYTES_READ=1119149315
17/06/13 00:35:42 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=3619
17/06/13 00:35:42 INFO mapred.JobClient:   Job Counters
17/06/13 00:35:42 INFO mapred.JobClient:     Launched map tasks=186
17/06/13 00:35:42 INFO mapred.JobClient:     Launched reduce tasks=1
17/06/13 00:35:42 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=1408288
17/06/13 00:35:42 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
17/06/13 00:35:42 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=2895292
17/06/13 00:35:42 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
17/06/13 00:35:42 INFO mapred.JobClient:     Data-local map tasks=186
17/06/13 00:35:42 INFO mapred.JobClient:   File Output Format Counters
17/06/13 00:35:42 INFO mapred.JobClient:     Bytes Written=3619
## RESULT:0
[hduser@hdstudy01 hadoop]$

* Checking the results
[hduser@hdstudy01 hadoop]$ hadoop fs -lsr departure_delay_count
-rw-r--r--   1 hduser supergroup          0 2017-06-13 00:35 /user/hduser/departure_delay_count/_SUCCESS
drwxr-xr-x   - hduser supergroup          0 2017-06-13 00:10 /user/hduser/departure_delay_count/_logs
drwxr-xr-x   - hduser supergroup          0 2017-06-13 00:10 /user/hduser/departure_delay_count/_logs/history
-rw-r--r--   1 hduser supergroup     543034 2017-06-13 00:10 /user/hduser/departure_delay_count/_logs/history/job_201706091514_0002_1497280254916_hduser_DelayCount
-rw-r--r--   1 hduser supergroup      50020 2017-06-13 00:10 /user/hduser/departure_delay_count/_logs/history/job_201706091514_0002_conf.xml
-rw-r--r--   1 hduser supergroup       3619 2017-06-13 00:35 /user/hduser/departure_delay_count/part-r-00000
[hduser@hdstudy01 hadoop]$
[hduser@hdstudy01 hadoop]$ hadoop fs -cat departure_delay_count/part-r-00000 | head -10
1987,10 175568
1987,11 177218
1987,12 218858
1988,1  198610
1988,10 162211
1988,11 175123
1988,12 189137
1988,2  177939
1988,3  187141
1988,4  159216


  • Using counters - custom options, mapper, and driver implementation, execution
※ MapReduce provides the Counter API so that job progress can be monitored.
※ Built-in counters report the input/output record counts and bytes of the map, reduce, and combine phases.
※ Built-in counters also report how many map and reduce tasks ran or failed, and the bytes read and written.
※ Developers can define their own counters, which is useful for checking that the map and reduce logic behaves as expected.

* Writing the code
- DelayCounters.java (written as a separate top-level enum class)
package pj01.hadoop.chapter05;
public enum DelayCounters {
    not_available_arrival, scheduled_arrival, early_arrival,
    not_available_departure, scheduled_departure, early_departure;
}
 ※ Counter group name: DelayCounters
 ※ Counter names: the six enum fields

- DelayCountMapperWithCounter.java
package pj01.hadoop.chapter05;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class DelayCountMapperWithCounter extends
        Mapper<LongWritable, Text, Text, IntWritable> {
    private String workType;
    private final static IntWritable outputValue = new IntWritable(1);
    private Text outputKey = new Text();
    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        workType = context.getConfiguration().get("workType");
    }
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        if (key.get() > 0) {
            String[] colums = value.toString().split(",");
            if (colums != null && colums.length > 0) {
                try {
                    if (workType.equals("departure")) {
                        if (!colums[15].equals("NA")) {
                            int depDelayTime = Integer.parseInt(colums[15]);
                            if (depDelayTime > 0) {
                                outputKey.set("D," + colums[0+ ","
                                        + colums[1]);
                                context.write(outputKey, outputValue);
                            } else if (depDelayTime == 0) {
                                context.getCounter(DelayCounters.scheduled_departure)
                                        .increment(1);
                            } else if (depDelayTime < 0) {
                                context.getCounter(
                                        DelayCounters.early_departure)
                                        .increment(1);
                            }
                        } else {
                            context.getCounter(
                                    DelayCounters.not_available_departure)
                                    .increment(1);
                        }
                    } else if (workType.equals("arrival")) {
                        if (!colums[14].equals("NA")) {
                            int arrDelayTime = Integer.parseInt(colums[14]);
                            if (arrDelayTime > 0) {
                                outputKey.set("A," + colums[0+ ","
                                        + colums[1]);
                                context.write(outputKey, outputValue);
                            } else if (arrDelayTime == 0) {
                                context.getCounter(
                                        DelayCounters.scheduled_arrival)
                                        .increment(1);
                            } else if (arrDelayTime < 0) {
                                context.getCounter(DelayCounters.early_arrival)
                                        .increment(1);
                            }
                        } else {
                            context.getCounter(
                                    DelayCounters.not_available_arrival)
                                    .increment(1);
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

※ A counter is obtained by passing its enum constant as the parameter to the context object's getCounter method.
※ The counters (defined in the enum class) are incremented by 1 for each matching case.
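※ The counter totals can also be read back in the driver once the job finishes. A minimal sketch, placed inside run() after job.waitForCompletion(true) (the printed format is just an illustration):

// Read the aggregated custom counters from the finished job
org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
for (DelayCounters c : DelayCounters.values()) {
    long total = counters.findCounter(c).getValue();
    System.out.println(c.name() + " = " + total);
}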

- DelayCountWithCounter.java
package pj01.hadoop.chapter05;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DelayCountWithCounter extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        String[] otherArgs = new GenericOptionsParser(getConf(), args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: DelayCountWithCounter <in> <out>");
            System.exit(2);
        }
        Job job = new Job(getConf(), "DelayCountWithCounter");
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        job.setJarByClass(DelayCountWithCounter.class);
        job.setMapperClass(DelayCountMapperWithCounter.class);
        job.setReducerClass(DelayCountReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.waitForCompletion(true);
        return 0;
    }
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(),
                new DelayCountWithCounter(), args);
        System.out.println("## RESULT:" + res);
    }
}


* Execution
[hduser@hdstudy01 hadoop]$ hadoop jar alzio-hadoop-examples.jar input delay_count_mos
17/06/13 09:01:50 INFO input.FileInputFormat: Total input paths to process : 22
17/06/13 09:01:51 INFO util.NativeCodeLoader: Loaded the native-hadoop library
17/06/13 09:01:51 WARN snappy.LoadSnappy: Snappy native library not loaded
17/06/13 09:01:51 INFO mapred.JobClient: Running job: job_201706091514_0007
17/06/13 09:01:52 INFO mapred.JobClient:  map 0% reduce 0%
17/06/13 09:29:17 INFO mapred.JobClient:  map 1% reduce 0%
17/06/13 09:29:38 INFO mapred.JobClient:  map 2% reduce 0%
17/06/13 09:30:00 INFO mapred.JobClient:  map 3% reduce 0%
(omitted)
17/06/13 10:00:05 INFO mapred.JobClient:  map 100% reduce 95%
17/06/13 10:00:08 INFO mapred.JobClient:  map 100% reduce 98%
17/06/13 10:00:10 INFO mapred.JobClient:  map 100% reduce 100%
17/06/13 10:00:13 INFO mapred.JobClient: Job complete: job_201706091514_0007
17/06/13 10:00:14 INFO mapred.JobClient: Counters: 35
17/06/13 10:00:14 INFO mapred.JobClient:   Map-Reduce Framework
17/06/13 10:00:14 INFO mapred.JobClient:     Spilled Records=336979000
17/06/13 10:00:14 INFO mapred.JobClient:     Map output materialized bytes=1513032768
17/06/13 10:00:14 INFO mapred.JobClient:     Reduce input records=99190250
17/06/13 10:00:14 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=361090170880
17/06/13 10:00:14 INFO mapred.JobClient:     Map input records=122846665
17/06/13 10:00:14 INFO mapred.JobClient:     SPLIT_RAW_BYTES=21018
17/06/13 10:00:14 INFO mapred.JobClient:     Map output bytes=1314651152
17/06/13 10:00:14 INFO mapred.JobClient:     Reduce shuffle bytes=1513032768
17/06/13 10:00:14 INFO mapred.JobClient:     Physical memory (bytes) snapshot=35178119168
17/06/13 10:00:14 INFO mapred.JobClient:     Reduce input groups=508
17/06/13 10:00:14 INFO mapred.JobClient:     Combine output records=0
17/06/13 10:00:14 INFO mapred.JobClient:     Reduce output records=0
17/06/13 10:00:14 INFO mapred.JobClient:     Map output records=99190250
17/06/13 10:00:14 INFO mapred.JobClient:     Combine input records=0
17/06/13 10:00:14 INFO mapred.JobClient:     CPU time spent (ms)=1223500
17/06/13 10:00:14 INFO mapred.JobClient:     Total committed heap usage (bytes)=24384839680
17/06/13 10:00:14 INFO mapred.JobClient:   pj01.hadoop.chapter06.DelayCounters
17/06/13 10:00:14 INFO mapred.JobClient:     scheduled_arrival=26216520
17/06/13 10:00:14 INFO mapred.JobClient:     early_arrival=44542943
17/06/13 10:00:14 INFO mapred.JobClient:     not_available_departure=2293613
17/06/13 10:00:14 INFO mapred.JobClient:     early_departure=44612589
17/06/13 10:00:14 INFO mapred.JobClient:     scheduled_departure=26259648
17/06/13 10:00:14 INFO mapred.JobClient:     not_available_arrival=2577721
17/06/13 10:00:14 INFO mapred.JobClient:   File Input Format Counters
17/06/13 10:00:14 INFO mapred.JobClient:     Bytes Read=11962658585
17/06/13 10:00:14 INFO mapred.JobClient:   FileSystemCounters
17/06/13 10:00:14 INFO mapred.JobClient:     HDFS_BYTES_READ=11962679603
17/06/13 10:00:14 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=5149360037
17/06/13 10:00:14 INFO mapred.JobClient:     FILE_BYTES_READ=3625193707
17/06/13 10:00:14 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=7238
17/06/13 10:00:14 INFO mapred.JobClient:   Job Counters
17/06/13 10:00:14 INFO mapred.JobClient:     Launched map tasks=186
17/06/13 10:00:14 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=1768311
17/06/13 10:00:14 INFO mapred.JobClient:     Launched reduce tasks=1
17/06/13 10:00:14 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
17/06/13 10:00:14 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=3595771
17/06/13 10:00:14 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
17/06/13 10:00:14 INFO mapred.JobClient:     Data-local map tasks=186
17/06/13 10:00:14 INFO mapred.JobClient:   File Output Format Counters
17/06/13 10:00:14 INFO mapred.JobClient:     Bytes Written=0
## RESULT:0
[hduser@hdstudy01 hadoop]$
※ The counter values appear in the job log (under the pj01.hadoop.chapter06.DelayCounters group).
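※ An individual counter can also be queried from the command line; assuming the Hadoop 1.x job CLI and the job id above, something like:

$ hadoop job -counter job_201706091514_0007 pj01.hadoop.chapter06.DelayCounters early_departure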

* Checking the results







  • Writing multiple output files - MultipleOutputs, mapper, reducer, and driver implementation, execution
※ Lets the parallel-processing logic easily generate multiple output datasets.
※ Defined in org.apache.hadoop.mapreduce.lib.output.

* Usage
Job job = new Job();

 FileInputFormat.setInputPath(job, inDir);
 FileOutputFormat.setOutputPath(job, outDir);

 job.setMapperClass(MOMap.class);
 job.setReducerClass(MOReduce.class);
 ...

 // Defines additional single text based output 'text' for the job
 MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
 LongWritable.class, Text.class);

 // Defines additional sequence-file based output 'sequence' for the job
 MultipleOutputs.addNamedOutput(job, "seq",
   SequenceFileOutputFormat.class,
   LongWritable.class, Text.class);
 ...

 job.waitForCompletion(true);
 ...

※ The MultipleOutputs.addNamedOutput method creates multiple output collectors, and each named output is configured with its own name, output format, and key/value types.
※ Data written through MultipleOutputs is stored separately from the data the MapReduce job generates by default (it goes into files named like departure-r-00000 rather than the default part-r-00000).
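※ Note that the job's default output file is created even when nothing is written to it, which is why an empty part-r-00000 appears in the results below. If that empty file is unwanted, Hadoop also provides LazyOutputFormat; a hedged sketch (behavior may vary by Hadoop version):

import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Used in the driver in place of job.setOutputFormatClass(TextOutputFormat.class);
// the default part-r-NNNNN file is then only created if a record is actually
// written to the default collector.
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);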

* Writing the code

※ The only difference from the previous code is that a 'D' or 'A' marker is prepended to the key.

- DelayCountMapperWithMultipleOutputs.java
package pj01.hadoop.chapter05;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class DelayCountMapperWithMultipleOutputs extends
        Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable outputValue = new IntWritable(1);
    private Text outputKey = new Text();
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        if (key.get() > 0) {
            String[] colums = value.toString().split(",");
            if (colums != null && colums.length > 0) {
                try {
                    if (!colums[15].equals("NA")) {
                        int depDelayTime = Integer.parseInt(colums[15]);
                        if (depDelayTime > 0) {
                            outputKey.set("D," + colums[0+ "," + colums[1]);
                            context.write(outputKey, outputValue);
                        } else if (depDelayTime == 0) {
                            context.getCounter(
                                    DelayCounters.scheduled_departure)
                                    .increment(1);
                        } else if (depDelayTime < 0) {
                            context.getCounter(DelayCounters.early_departure)
                                    .increment(1);
                        }
                    } else {
                        context.getCounter(
                                DelayCounters.not_available_departure)
                                .increment(1);
                    }
                    if (!colums[14].equals("NA")) {
                        int arrDelayTime = Integer.parseInt(colums[14]);
                        if (arrDelayTime > 0) {
                            outputKey.set("A," + colums[0+ "," + colums[1]);
                            context.write(outputKey, outputValue);
                        } else if (arrDelayTime == 0) {
                            context.getCounter(DelayCounters.scheduled_arrival)
                                    .increment(1);
                        } else if (arrDelayTime < 0) {
                            context.getCounter(DelayCounters.early_arrival)
                                    .increment(1);
                        }
                    } else {
                        context.getCounter(DelayCounters.not_available_arrival)
                                .increment(1);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

- DelayCountReducerWithMultipleOutputs.java
package pj01.hadoop.chapter05;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
public class DelayCountReducerWithMultipleOutputs extends
        Reducer<Text, IntWritable, Text, IntWritable> {
    private MultipleOutputs<Text, IntWritable> mos;
    private Text outputKey = new Text();
    private IntWritable result = new IntWritable();
    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        mos = new MultipleOutputs<Text, IntWritable>(context);
    }
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        String[] colums = key.toString().split(",");
        outputKey.set(colums[1] + "," + colums[2]);
        if (colums[0].equals("D")) {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            result.set(sum);
            mos.write("departure", outputKey, result);
        } else {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            result.set(sum);
            mos.write("arrival", outputKey, result);
        }
    }
    @Override
    public void cleanup(Context context) throws IOException,
            InterruptedException {
        mos.close();
    }
}

- DelayCountWithMultipleOutputs.java
package pj01.hadoop.chapter05;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DelayCountWithMultipleOutputs extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        String[] otherArgs = new GenericOptionsParser(getConf(), args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err
                    .println("Usage: DelayCountWithMultipleOutputs <in> <out>");
            System.exit(2);
        }
        Job job = new Job(getConf(), "DelayCountWithMultipleOutputs");
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        job.setJarByClass(DelayCountWithMultipleOutputs.class);
        job.setMapperClass(DelayCountMapperWithMultipleOutputs.class);
        job.setReducerClass(DelayCountReducerWithMultipleOutputs.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        MultipleOutputs.addNamedOutput(job, "departure",
                TextOutputFormat.class, Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, "arrival", TextOutputFormat.class,
                Text.class, IntWritable.class);
        job.waitForCompletion(true);
        return 0;
    }
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(),
                new DelayCountWithMultipleOutputs(), args);
        System.out.println("## RESULT:" + res);
    }
}



* Checking the results
[hduser@hdstudy01 hadoop]$ hadoop fs -lsr delay_count_mos
-rw-r--r--   1 hduser supergroup          0 2017-06-13 10:00 /user/hduser/delay_count_mos/_SUCCESS
drwxr-xr-x   - hduser supergroup          0 2017-06-13 09:01 /user/hduser/delay_count_mos/_logs
drwxr-xr-x   - hduser supergroup          0 2017-06-13 09:01 /user/hduser/delay_count_mos/_logs/history
-rw-r--r--   1 hduser supergroup     691750 2017-06-13 09:01 /user/hduser/delay_count_mos/_logs/history/job_201706091514_0007_1497312111810_hduser_DelayCountWithMultipleOutputs
-rw-r--r--   1 hduser supergroup      51489 2017-06-13 09:01 /user/hduser/delay_count_mos/_logs/history/job_201706091514_0007_conf.xml
-rw-r--r--   1 hduser supergroup       3619 2017-06-13 09:59 /user/hduser/delay_count_mos/arrival-r-00000
-rw-r--r--   1 hduser supergroup       3619 2017-06-13 09:59 /user/hduser/delay_count_mos/departure-r-00000
-rw-r--r--   1 hduser supergroup          0 2017-06-13 09:59 /user/hduser/delay_count_mos/part-r-00000
[hduser@hdstudy01 hadoop]$
[hduser@hdstudy01 hadoop]$ hadoop fs -cat delay_count_mos/arrival-r-00000 | head -10
1987,10 175092
1987,11 176511
1987,12 217478
1988,1  196945
1988,10 161683
1988,11 174450
1988,12 188210
1988,2  177016
1988,3  186422
1988,4  158561
[hduser@hdstudy01 hadoop]$ hadoop fs -cat delay_count_mos/departure-r-00000 | head -10
1987,10 175568
1987,11 177218
1987,12 218858
1988,1  198610
1988,10 162211
1988,11 175123
1988,12 189137
1988,2  177939
1988,3  187141
1988,4  159216



  • Chaining
※ When writing real MapReduce programs, you sometimes need more than one mapper or reducer. Hadoop provides ChainMapper and ChainReducer so that a single MapReduce job can run multiple mappers and reducers.

※ Because several jobs no longer have to be run separately, the I/O between jobs is reduced.

※ ChainMapper and ChainReducer invoke the mappers and the reducer in a chain.

* How ChainMapper works
 The first mapper runs, and its output is passed as the input of the second mapper.
 The second mapper's output is passed as the input of the next mapper in sequence.
 The output of the last mapper becomes the final map output.

static <K1,V1,K2,V2> void addMapper(JobConf job, Class<? extends Mapper<K1,V1,K2,V2>> klass, Class<? extends K1> inputKeyClass, Class<? extends V1> inputValueClass, Class<? extends K2> outputKeyClass, Class<? extends V2> outputValueClass, boolean byValue, JobConf mapperConf)

addMapper : adds a Mapper class to the chain job's JobConf

※ How ChainReducer works
 The same chain pattern as ChainMapper

static <K1,V1,K2,V2> void addMapper(JobConf job, Class<? extends Mapper<K1,V1,K2,V2>> klass, Class<? extends K1> inputKeyClass, Class<? extends V1> inputValueClass, Class<? extends K2> outputKeyClass, Class<? extends V2> outputValueClass, boolean byValue, JobConf mapperConf)
          Adds a Mapper class to the chain job's JobConf.

static <K1,V1,K2,V2> void setReducer(JobConf job, Class<? extends Reducer<K1,V1,K2,V2>> klass, Class<? extends K1> inputKeyClass, Class<? extends V1> inputValueClass, Class<? extends K2> outputKeyClass, Class<? extends V2> outputValueClass, boolean byValue, JobConf reducerConf)
          Sets the Reducer class to the chain job's JobConf.

setReducer : sets the Reducer class on the chain job's JobConf
addMapper : used to add a Mapper that runs after the Reducer

* Example
... 
conf.setJobName("chain");
conf.setInputFormat(TextInputFormat.class); 
conf.setOutputFormat(TextOutputFormat.class); 
JobConf mapAConf = new JobConf(false);
 ... 
ChainMapper.addMapper(conf, AMap.class, LongWritable.class,
    Text.class, Text.class, Text.class, true, mapAConf);
JobConf mapBConf = new JobConf(false);
 ...
ChainMapper.addMapper(conf, BMap.class, Text.class,
    Text.class, LongWritable.class, Text.class, false, mapBConf);
JobConf reduceConf = new JobConf(false);
 ...
ChainReducer.setReducer(conf, XReduce.class, LongWritable.class,
    Text.class, Text.class, Text.class, true, reduceConf);
ChainReducer.addMapper(conf, CMap.class, Text.class,
    Text.class, LongWritable.class, Text.class, false, null);
ChainReducer.addMapper(conf, DMap.class, LongWritable.class,
    Text.class, LongWritable.class, LongWritable.class, true, null);
FileInputFormat.setInputPaths(conf, inDir); 
FileOutputFormat.setOutputPath(conf, outDir);
 ... 
JobClient jc = new JobClient(conf);
RunningJob job = jc.submitJob(conf);
 ...
※ Note that from the third mapper onward (the mappers that run after the reducer), the ChainReducer class is used to add them.
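
※ The example above uses the old org.apache.hadoop.mapred API (JobConf/JobClient). Hadoop 2.x and later also ship ChainMapper and ChainReducer in org.apache.hadoop.mapreduce.lib.chain, which work with the Job object used elsewhere in this chapter. A hedged sketch (AMap, BMap, and XReduce are placeholders; availability depends on your Hadoop version):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;

// Inside a driver's run method; conf is the job Configuration.
// Note the new API drops the byValue flag of the old signatures.
Job job = new Job(conf, "chain");
ChainMapper.addMapper(job, AMap.class, LongWritable.class, Text.class,
        Text.class, Text.class, new Configuration(false));
ChainMapper.addMapper(job, BMap.class, Text.class, Text.class,
        LongWritable.class, Text.class, new Configuration(false));
ChainReducer.setReducer(job, XReduce.class, LongWritable.class, Text.class,
        Text.class, Text.class, new Configuration(false));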
