Chapter 6. MapReduce Basics (2)
- Using user-defined options - implementing and running the user-defined options, mapper, and driver
* API description
Class GenericOptionsParser
- Options
-conf : specifies an application configuration file to add to the configuration
-D : sets a new value for a property defined in the Hadoop configuration files
-fs : overrides the namenode host and port
-jt : overrides the jobtracker host and port
-files : takes a comma-separated list of files; copies the local files to the shared file system (HDFS) so the tasks can use them
-libjars : copies local jar files to the shared file system (HDFS) and adds them to the MapReduce task classpath
-archives : takes a list of archive files; copies the local archives to the shared file system (HDFS) and unarchives them
- Examples
$ bin/hadoop dfs -fs darwin:8020 -ls /data
list /data directory in dfs with namenode darwin:8020
$ bin/hadoop dfs -D fs.default.name=darwin:8020 -ls /data
list /data directory in dfs with namenode darwin:8020
$ bin/hadoop dfs -conf hadoop-site.xml -ls /data
list /data directory in dfs with conf specified in hadoop-site.xml
$ bin/hadoop job -D mapred.job.tracker=darwin:50020 -submit job.xml
submit a job to job tracker darwin:50020
$ bin/hadoop job -jt darwin:50020 -submit job.xml
submit a job to job tracker darwin:50020
$ bin/hadoop job -jt local -submit job.xml
submit a job to local runner
$ bin/hadoop jar -libjars testlib.jar
-archives test.tgz -files file.txt inputjar args
job submission with libjars, files and archive
- Constructor
GenericOptionsParser(Configuration conf, String[] args) : Creates a GenericOptionsParser to parse only the generic Hadoop arguments.
※ The GenericOptionsParser class applies the configuration settings it parses to the Configuration object handed to (or created by) it.
※ When developing a MapReduce program, GenericOptionsParser is rarely used on its own; instead it is usually combined with the Tool interface (which exposes the Configuration object used by GenericOptionsParser via Configurable) and the ToolRunner class (which declares a GenericOptionsParser internally).
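Before looking at Tool and ToolRunner, here is a minimal sketch of what using GenericOptionsParser directly looks like; the workType property is the one the example below relies on, and the rest is illustrative:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.GenericOptionsParser;

// Apply the generic options (-D, -files, ...) to the Configuration and keep the rest.
Configuration conf = new Configuration();
GenericOptionsParser parser = new GenericOptionsParser(conf, args);
String[] remaining = parser.getRemainingArgs(); // e.g. { "input", "departure_delay_count" }
String workType = conf.get("workType");         // supplied on the command line with -D workType=departure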
Interface Tool
→ An interface that adds support for console (command-line) configuration options via GenericOptionsParser.
Extends the Configurable interface
Defines the run method
Class ToolRunner
→ A helper class that runs classes implementing the Tool interface.
→ Uses GenericOptionsParser to parse the options the user set on the command line and applies them to the Configuration object.
→ Passes the Configuration object to the Tool implementation and then calls its run method.
static int run(Configuration conf, Tool tool, String[] args)
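Roughly speaking, ToolRunner.run wires these pieces together along the following lines (a simplified sketch of the behavior described above, not the actual Hadoop source):
// Simplified view of ToolRunner.run(conf, tool, args):
GenericOptionsParser parser = new GenericOptionsParser(conf, args); // apply -D, -files, ... to conf
tool.setConf(conf);                                                 // Tool extends Configurable
return tool.run(parser.getRemainingArgs());                         // only the application args reach run()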
* Writing the code
- DelayCount.java
package pj01.hadoop.chapter05;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DelayCount extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        String[] otherArgs = new GenericOptionsParser(getConf(), args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: DelayCount <in> <out>");
            System.exit(2);
        }

        Job job = new Job(getConf(), "DelayCount");
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        job.setJarByClass(DelayCount.class);
        job.setMapperClass(DelayCountMapper.class);
        job.setReducerClass(DelayCountReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.waitForCompletion(true);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new DelayCount(), args);
        System.out.println("## RESULT:" + res);
    }
}
※ Previously everything was implemented in main; in the user-defined-option version, the job setup is implemented in the run method.
※ A class implementing the Tool interface must implement a run method that returns an int.
※ The getRemainingArgs() method extracts only the remaining arguments, i.e. the ones specifying the input and output paths.
- DelayCountMapper.java
package pj01.hadoop.chapter05;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DelayCountMapper extends
        Mapper<LongWritable, Text, Text, IntWritable> {

    private String workType;
    private final static IntWritable outputValue = new IntWritable(1);
    private Text outputKey = new Text();

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        workType = context.getConfiguration().get("workType");
    }

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        if (key.get() > 0) {
            String[] colums = value.toString().split(",");
            if (colums != null && colums.length > 0) {
                try {
                    if (workType.equals("departure")) {
                        if (!colums[15].equals("NA")) {
                            int depDelayTime = Integer.parseInt(colums[15]);
                            if (depDelayTime > 0) {
                                outputKey.set(colums[0] + "," + colums[1]);
                                context.write(outputKey, outputValue);
                            }
                        }
                    } else if (workType.equals("arrival")) {
                        if (!colums[14].equals("NA")) {
                            int arrDelayTime = Integer.parseInt(colums[14]);
                            if (arrDelayTime > 0) {
                                outputKey.set(colums[0] + "," + colums[1]);
                                context.write(outputKey, outputValue);
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
- DelayCountReducer.java
(See Chapter 4.)
* Execution
[hduser@hdstudy01 hadoop]$ hadoop jar alzio-hadoop-examples.jar -D workType=departure input departure_delay_count
17/06/13 00:10:54 INFO input.FileInputFormat: Total input paths to process : 22
17/06/13 00:10:54 INFO util.NativeCodeLoader: Loaded the native-hadoop library
17/06/13 00:10:54 WARN snappy.LoadSnappy: Snappy native library not loaded
17/06/13 00:10:55 INFO mapred.JobClient: Running job: job_201706091514_0002
17/06/13 00:10:56 INFO mapred.JobClient: map 0% reduce 0%
17/06/13 00:11:14 INFO mapred.JobClient: map 1% reduce 0%
17/06/13 00:11:29 INFO mapred.JobClient: map 2% reduce 0%
17/06/13 00:11:43 INFO mapred.JobClient: map 3% reduce 0%
(omitted)
17/06/13 00:35:31 INFO mapred.JobClient: map 100% reduce 80%
17/06/13 00:35:34 INFO mapred.JobClient: map 100% reduce 87%
17/06/13 00:35:37 INFO mapred.JobClient: map 100% reduce 94%
17/06/13 00:35:40 INFO mapred.JobClient: map 100% reduce 100%
17/06/13 00:35:42 INFO mapred.JobClient: Job complete: job_201706091514_0002
17/06/13 00:35:42 INFO mapred.JobClient: Counters: 29
17/06/13 00:35:42 INFO mapred.JobClient: Map-Reduce Framework
17/06/13 00:35:42 INFO mapred.JobClient: Spilled Records=134256366
17/06/13 00:35:42 INFO mapred.JobClient: Map output materialized bytes=658461244
17/06/13 00:35:42 INFO mapred.JobClient: Reduce input records=49680792
17/06/13 00:35:42 INFO mapred.JobClient: Virtual memory (bytes) snapshot=360901206016
17/06/13 00:35:42 INFO mapred.JobClient: Map input records=122846665
17/06/13 00:35:42 INFO mapred.JobClient: SPLIT_RAW_BYTES=21018
17/06/13 00:35:42 INFO mapred.JobClient: Map output bytes=559098544
17/06/13 00:35:42 INFO mapred.JobClient: Reduce shuffle bytes=658461244
17/06/13 00:35:42 INFO mapred.JobClient: Physical memory (bytes) snapshot=33694904320
17/06/13 00:35:42 INFO mapred.JobClient: Reduce input groups=254
17/06/13 00:35:42 INFO mapred.JobClient: Combine output records=0
17/06/13 00:35:42 INFO mapred.JobClient: Reduce output records=254
17/06/13 00:35:42 INFO mapred.JobClient: Map output records=49680792
17/06/13 00:35:42 INFO mapred.JobClient: Combine input records=0
17/06/13 00:35:42 INFO mapred.JobClient: CPU time spent (ms)=857820
17/06/13 00:35:42 INFO mapred.JobClient: Total committed heap usage (bytes)=24400076800
17/06/13 00:35:42 INFO mapred.JobClient: File Input Format Counters
17/06/13 00:35:42 INFO mapred.JobClient: Bytes Read=11962658585
17/06/13 00:35:42 INFO mapred.JobClient: FileSystemCounters
17/06/13 00:35:42 INFO mapred.JobClient: HDFS_BYTES_READ=11962679603
17/06/13 00:35:42 INFO mapred.JobClient: FILE_BYTES_WRITTEN=1788443612
17/06/13 00:35:42 INFO mapred.JobClient: FILE_BYTES_READ=1119149315
17/06/13 00:35:42 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=3619
17/06/13 00:35:42 INFO mapred.JobClient: Job Counters
17/06/13 00:35:42 INFO mapred.JobClient: Launched map tasks=186
17/06/13 00:35:42 INFO mapred.JobClient: Launched reduce tasks=1
17/06/13 00:35:42 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=1408288
17/06/13 00:35:42 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
17/06/13 00:35:42 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=2895292
17/06/13 00:35:42 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
17/06/13 00:35:42 INFO mapred.JobClient: Data-local map tasks=186
17/06/13 00:35:42 INFO mapred.JobClient: File Output Format Counters
17/06/13 00:35:42 INFO mapred.JobClient: Bytes Written=3619
## RESULT:0
[hduser@hdstudy01 hadoop]$
* Checking the results
[hduser@hdstudy01 hadoop]$ hadoop fs -lsr departure_delay_count
-rw-r--r-- 1 hduser supergroup 0 2017-06-13 00:35 /user/hduser/departure_delay_count/_SUCCESS
drwxr-xr-x - hduser supergroup 0 2017-06-13 00:10 /user/hduser/departure_delay_count/_logs
drwxr-xr-x - hduser supergroup 0 2017-06-13 00:10 /user/hduser/departure_delay_count/_logs/history
-rw-r--r-- 1 hduser supergroup 543034 2017-06-13 00:10 /user/hduser/departure_delay_count/_logs/history/job_201706091514_0002_1497280254916_hduser_DelayCount
-rw-r--r-- 1 hduser supergroup 50020 2017-06-13 00:10 /user/hduser/departure_delay_count/_logs/history/job_201706091514_0002_conf.xml
-rw-r--r-- 1 hduser supergroup 3619 2017-06-13 00:35 /user/hduser/departure_delay_count/part-r-00000
[hduser@hdstudy01 hadoop]$
[hduser@hdstudy01 hadoop]$ hadoop fs -cat departure_delay_count/part-r-00000 | head -10
1987,10 175568
1987,11 177218
1987,12 218858
1988,1 198610
1988,10 162211
1988,11 175123
1988,12 189137
1988,2 177939
1988,3 187141
1988,4 159216
- Using counters - implementing and running the counters, mapper, and driver
※ The built-in counters let you check the number of input/output records and bytes for the map, reduce, and combiner phases.
※ The built-in counters also show how many map and reduce tasks were launched or failed, and the bytes read and written.
※ Since developers can define their own counters, counters are useful for checking that the map and reduce logic behaves as expected.
* Writing the code
- DelayCounters.java (written as a separate top-level enum class)
package pj01.hadoop.chapter05;
public enum DelayCounters {
not_available_arrival, scheduled_arrival, early_arrival,
not_available_departure, scheduled_departure, early_departure;
}
※ Counter names: the six enum fields above.
- DelayCountMapperWithCounter.java
package pj01.hadoop.chapter05;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DelayCountMapperWithCounter extends
        Mapper<LongWritable, Text, Text, IntWritable> {

    private String workType;
    private final static IntWritable outputValue = new IntWritable(1);
    private Text outputKey = new Text();

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        workType = context.getConfiguration().get("workType");
    }

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        if (key.get() > 0) {
            String[] colums = value.toString().split(",");
            if (colums != null && colums.length > 0) {
                try {
                    if (workType.equals("departure")) {
                        if (!colums[15].equals("NA")) {
                            int depDelayTime = Integer.parseInt(colums[15]);
                            if (depDelayTime > 0) {
                                outputKey.set("D," + colums[0] + "," + colums[1]);
                                context.write(outputKey, outputValue);
                            } else if (depDelayTime == 0) {
                                context.getCounter(DelayCounters.scheduled_departure)
                                        .increment(1);
                            } else if (depDelayTime < 0) {
                                context.getCounter(DelayCounters.early_departure)
                                        .increment(1);
                            }
                        } else {
                            context.getCounter(DelayCounters.not_available_departure)
                                    .increment(1);
                        }
                    } else if (workType.equals("arrival")) {
                        if (!colums[14].equals("NA")) {
                            int arrDelayTime = Integer.parseInt(colums[14]);
                            if (arrDelayTime > 0) {
                                outputKey.set("A," + colums[0] + "," + colums[1]);
                                context.write(outputKey, outputValue);
                            } else if (arrDelayTime == 0) {
                                context.getCounter(DelayCounters.scheduled_arrival)
                                        .increment(1);
                            } else if (arrDelayTime < 0) {
                                context.getCounter(DelayCounters.early_arrival)
                                        .increment(1);
                            }
                        } else {
                            context.getCounter(DelayCounters.not_available_arrival)
                                    .increment(1);
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
※ A counter is obtained from the context object by passing the counter (enum constant) to the getCounter method.
※ The counters defined in the enum class are incremented by 1 according to each case.
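※ For reference, counters do not have to be declared in an enum: the context also provides a String-based getCounter(group, name) overload, which is convenient for ad-hoc or dynamically named counters. A small sketch (the group and counter names are made up for illustration):
// Inside map() or reduce(); "delay_stats" and "parse_error" are hypothetical names.
context.getCounter("delay_stats", "parse_error").increment(1);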
- DelayCountWithCounter.java
package pj01.hadoop.chapter05;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DelayCountWithCounter extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        String[] otherArgs = new GenericOptionsParser(getConf(), args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: DelayCountWithCounter <in> <out>");
            System.exit(2);
        }

        Job job = new Job(getConf(), "DelayCountWithCounter");
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        job.setJarByClass(DelayCountWithCounter.class);
        job.setMapperClass(DelayCountMapperWithCounter.class);
        job.setReducerClass(DelayCountReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.waitForCompletion(true);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(),
                new DelayCountWithCounter(), args);
        System.out.println("## RESULT:" + res);
    }
}
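The user-defined counter totals show up in the job log below, but they can also be read in the driver once the job finishes, via Job.getCounters(). A small sketch, assuming it replaces the waitForCompletion/return lines at the end of run() above:
// Wait for the job, then read one of the user-defined counters.
job.waitForCompletion(true);
org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
long earlyDepartures = counters.findCounter(DelayCounters.early_departure).getValue();
System.out.println("early_departure = " + earlyDepartures);
return 0;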
* Execution
[hduser@hdstudy01 hadoop]$ hadoop jar alzio-hadoop-examples.jar input delay_count_mos
17/06/13 09:01:50 INFO input.FileInputFormat: Total input paths to process : 22
17/06/13 09:01:51 INFO util.NativeCodeLoader: Loaded the native-hadoop library
17/06/13 09:01:51 WARN snappy.LoadSnappy: Snappy native library not loaded
17/06/13 09:01:51 INFO mapred.JobClient: Running job: job_201706091514_0007
17/06/13 09:01:52 INFO mapred.JobClient: map 0% reduce 0%
17/06/13 09:29:17 INFO mapred.JobClient: map 1% reduce 0%
17/06/13 09:29:38 INFO mapred.JobClient: map 2% reduce 0%
17/06/13 09:30:00 INFO mapred.JobClient: map 3% reduce 0%
(omitted)
17/06/13 10:00:05 INFO mapred.JobClient: map 100% reduce 95%
17/06/13 10:00:08 INFO mapred.JobClient: map 100% reduce 98%
17/06/13 10:00:10 INFO mapred.JobClient: map 100% reduce 100%
17/06/13 10:00:13 INFO mapred.JobClient: Job complete: job_201706091514_0007
17/06/13 10:00:14 INFO mapred.JobClient: Counters: 35
17/06/13 10:00:14 INFO mapred.JobClient: Map-Reduce Framework
17/06/13 10:00:14 INFO mapred.JobClient: Spilled Records=336979000
17/06/13 10:00:14 INFO mapred.JobClient: Map output materialized bytes=1513032768
17/06/13 10:00:14 INFO mapred.JobClient: Reduce input records=99190250
17/06/13 10:00:14 INFO mapred.JobClient: Virtual memory (bytes) snapshot=361090170880
17/06/13 10:00:14 INFO mapred.JobClient: Map input records=122846665
17/06/13 10:00:14 INFO mapred.JobClient: SPLIT_RAW_BYTES=21018
17/06/13 10:00:14 INFO mapred.JobClient: Map output bytes=1314651152
17/06/13 10:00:14 INFO mapred.JobClient: Reduce shuffle bytes=1513032768
17/06/13 10:00:14 INFO mapred.JobClient: Physical memory (bytes) snapshot=35178119168
17/06/13 10:00:14 INFO mapred.JobClient: Reduce input groups=508
17/06/13 10:00:14 INFO mapred.JobClient: Combine output records=0
17/06/13 10:00:14 INFO mapred.JobClient: Reduce output records=0
17/06/13 10:00:14 INFO mapred.JobClient: Map output records=99190250
17/06/13 10:00:14 INFO mapred.JobClient: Combine input records=0
17/06/13 10:00:14 INFO mapred.JobClient: CPU time spent (ms)=1223500
17/06/13 10:00:14 INFO mapred.JobClient: Total committed heap usage (bytes)=24384839680
17/06/13 10:00:14 INFO mapred.JobClient: pj01.hadoop.chapter06.DelayCounters
17/06/13 10:00:14 INFO mapred.JobClient: scheduled_arrival=26216520
17/06/13 10:00:14 INFO mapred.JobClient: early_arrival=44542943
17/06/13 10:00:14 INFO mapred.JobClient: not_available_departure=2293613
17/06/13 10:00:14 INFO mapred.JobClient: early_departure=44612589
17/06/13 10:00:14 INFO mapred.JobClient: scheduled_departure=26259648
17/06/13 10:00:14 INFO mapred.JobClient: not_available_arrival=2577721
17/06/13 10:00:14 INFO mapred.JobClient: File Input Format Counters
17/06/13 10:00:14 INFO mapred.JobClient: Bytes Read=11962658585
17/06/13 10:00:14 INFO mapred.JobClient: FileSystemCounters
17/06/13 10:00:14 INFO mapred.JobClient: HDFS_BYTES_READ=11962679603
17/06/13 10:00:14 INFO mapred.JobClient: FILE_BYTES_WRITTEN=5149360037
17/06/13 10:00:14 INFO mapred.JobClient: FILE_BYTES_READ=3625193707
17/06/13 10:00:14 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=7238
17/06/13 10:00:14 INFO mapred.JobClient: Job Counters
17/06/13 10:00:14 INFO mapred.JobClient: Launched map tasks=186
17/06/13 10:00:14 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=1768311
17/06/13 10:00:14 INFO mapred.JobClient: Launched reduce tasks=1
17/06/13 10:00:14 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
17/06/13 10:00:14 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=3595771
17/06/13 10:00:14 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
17/06/13 10:00:14 INFO mapred.JobClient: Data-local map tasks=186
17/06/13 10:00:14 INFO mapred.JobClient: File Output Format Counters
17/06/13 10:00:14 INFO mapred.JobClient: Bytes Written=0
## RESULT:0
[hduser@hdstudy01 hadoop]$
* Checking the results
※ The six user-defined counters can be seen in the job log above, grouped under DelayCounters (e.g. early_departure=44612589, not_available_departure=2293613).
- Writing multiple output files - implementing and running MultipleOutputs, the mapper, reducer, and driver
※ Defined in org.apache.hadoop.mapreduce.lib.output.
* Usage
Job job = new Job();
FileInputFormat.setInputPath(job, inDir);
FileOutputFormat.setOutputPath(job, outDir);
job.setMapperClass(MOMap.class);
job.setReducerClass(MOReduce.class);
...
// Defines additional single text based output 'text' for the job
MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
LongWritable.class, Text.class);
// Defines additional sequence-file based output 'sequence' for the job
MultipleOutputs.addNamedOutput(job, "seq",
SequenceFileOutputFormat.class,
LongWritable.class, Text.class);
...
job.waitForCompletion(true);
...
※ The MultipleOutputs.addNamedOutput method registers multiple named output collectors, and for each one sets the output name, output format, and key and value types.
※ Data written through MultipleOutputs is stored separately from the data produced by the regular MapReduce job output (files named <namedOutput>-r-00000, rather than part-r-00000, are created).
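※ For reference, MultipleOutputs.write also has overloads that take a baseOutputPath, which lets a named output be written under its own file prefix or subdirectory inside the job output directory. A small sketch (the "departure/part" path is an illustrative choice, not what this chapter's reducer does):
// Produces files like <outdir>/departure/part-r-00000 instead of <outdir>/departure-r-00000.
mos.write("departure", outputKey, result, "departure/part");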
* Writing the code
※ Compared with the earlier mapper, the differences are that the keys are prefixed with the D or A marker and that departure and arrival delays are handled together in one mapper.
- DelayCountMapperWithMultipleOutputs.java
package pj01.hadoop.chapter05;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DelayCountMapperWithMultipleOutputs extends
        Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable outputValue = new IntWritable(1);
    private Text outputKey = new Text();

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        if (key.get() > 0) {
            String[] colums = value.toString().split(",");
            if (colums != null && colums.length > 0) {
                try {
                    if (!colums[15].equals("NA")) {
                        int depDelayTime = Integer.parseInt(colums[15]);
                        if (depDelayTime > 0) {
                            outputKey.set("D," + colums[0] + "," + colums[1]);
                            context.write(outputKey, outputValue);
                        } else if (depDelayTime == 0) {
                            context.getCounter(DelayCounters.scheduled_departure)
                                    .increment(1);
                        } else if (depDelayTime < 0) {
                            context.getCounter(DelayCounters.early_departure)
                                    .increment(1);
                        }
                    } else {
                        context.getCounter(DelayCounters.not_available_departure)
                                .increment(1);
                    }

                    if (!colums[14].equals("NA")) {
                        int arrDelayTime = Integer.parseInt(colums[14]);
                        if (arrDelayTime > 0) {
                            outputKey.set("A," + colums[0] + "," + colums[1]);
                            context.write(outputKey, outputValue);
                        } else if (arrDelayTime == 0) {
                            context.getCounter(DelayCounters.scheduled_arrival)
                                    .increment(1);
                        } else if (arrDelayTime < 0) {
                            context.getCounter(DelayCounters.early_arrival)
                                    .increment(1);
                        }
                    } else {
                        context.getCounter(DelayCounters.not_available_arrival)
                                .increment(1);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
- DelayCountReducerWithMultipleOutputs.java
package pj01.hadoop.chapter05;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class DelayCountReducerWithMultipleOutputs extends
        Reducer<Text, IntWritable, Text, IntWritable> {

    private MultipleOutputs<Text, IntWritable> mos;
    private Text outputKey = new Text();
    private IntWritable result = new IntWritable();

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        mos = new MultipleOutputs<Text, IntWritable>(context);
    }

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        String[] colums = key.toString().split(",");
        outputKey.set(colums[1] + "," + colums[2]);
        if (colums[0].equals("D")) {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            result.set(sum);
            mos.write("departure", outputKey, result);
        } else {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            result.set(sum);
            mos.write("arrival", outputKey, result);
        }
    }

    @Override
    public void cleanup(Context context) throws IOException,
            InterruptedException {
        mos.close();
    }
}
- DelayCountWithMultipleOutputs.java
package pj01.hadoop.chapter05;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DelayCountWithMultipleOutputs extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        String[] otherArgs = new GenericOptionsParser(getConf(), args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err
                    .println("Usage: DelayCountWithMultipleOutputs <in> <out>");
            System.exit(2);
        }

        Job job = new Job(getConf(), "DelayCountWithMultipleOutputs");
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        job.setJarByClass(DelayCountWithMultipleOutputs.class);
        job.setMapperClass(DelayCountMapperWithMultipleOutputs.class);
        job.setReducerClass(DelayCountReducerWithMultipleOutputs.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        MultipleOutputs.addNamedOutput(job, "departure",
                TextOutputFormat.class, Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, "arrival", TextOutputFormat.class,
                Text.class, IntWritable.class);

        job.waitForCompletion(true);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(),
                new DelayCountWithMultipleOutputs(), args);
        System.out.println("## RESULT:" + res);
    }
}
* Checking the results
[hduser@hdstudy01 hadoop]$ hadoop fs -lsr delay_count_mos
-rw-r--r-- 1 hduser supergroup 0 2017-06-13 10:00 /user/hduser/delay_count_mos/_SUCCESS
drwxr-xr-x - hduser supergroup 0 2017-06-13 09:01 /user/hduser/delay_count_mos/_logs
drwxr-xr-x - hduser supergroup 0 2017-06-13 09:01 /user/hduser/delay_count_mos/_logs/history
-rw-r--r-- 1 hduser supergroup 691750 2017-06-13 09:01 /user/hduser/delay_count_mos/_logs/history/job_201706091514_0007_1497312111810_hduser_DelayCountWithMultipleOutputs
-rw-r--r-- 1 hduser supergroup 51489 2017-06-13 09:01 /user/hduser/delay_count_mos/_logs/history/job_201706091514_0007_conf.xml
-rw-r--r-- 1 hduser supergroup 3619 2017-06-13 09:59 /user/hduser/delay_count_mos/arrival-r-00000
-rw-r--r-- 1 hduser supergroup 3619 2017-06-13 09:59 /user/hduser/delay_count_mos/departure-r-00000
-rw-r--r-- 1 hduser supergroup 0 2017-06-13 09:59 /user/hduser/delay_count_mos/part-r-00000
[hduser@hdstudy01 hadoop]$
[hduser@hdstudy01 hadoop]$ hadoop fs -cat delay_count_mos/arrival-r-00000 | head -10
1987,10 175092
1987,11 176511
1987,12 217478
1988,1 196945
1988,10 161683
1988,11 174450
1988,12 188210
1988,2 177016
1988,3 186422
1988,4 158561
[hduser@hdstudy01 hadoop]$ hadoop fs -cat delay_count_mos/departure-r-00000 | head -10
1987,10 175568
1987,11 177218
1987,12 218858
1988,1 198610
1988,10 162211
1988,11 175123
1988,12 189137
1988,2 177939
1988,3 187141
1988,4 159216
- Chaining
※ Because several jobs no longer need to be run separately, the amount of intermediate I/O is reduced.
※ ChainMapper and ChainReducer invoke mappers and the reducer in a chained fashion.
* How ChainMapper works
After the first mapper runs, its output is passed as the input to the second mapper.
The second mapper's output is passed as the input to the next mapper in the chain.
The output of the last mapper becomes the final map output.
static void addMapper(JobConf job, Class<? extends Mapper<K1,V1,K2,V2>> klass, Class<? extends K1> inputKeyClass, Class<? extends V1> inputValueClass, Class<? extends K2> outputKeyClass, Class<? extends V2> outputValueClass, boolean byValue, JobConf mapperConf)
addMapper : adds a Mapper class to the chain job's JobConf.
※ How ChainReducer works
Works in the same chained fashion as ChainMapper.
static void addMapper(JobConf job, Class<? extends Mapper<K1,V1,K2,V2>> klass, Class<? extends K1> inputKeyClass, Class<? extends V1> inputValueClass, Class<? extends K2> outputKeyClass, Class<? extends V2> outputValueClass, boolean byValue, JobConf mapperConf) : Adds a Mapper class to the chain job's JobConf.
static void setReducer(JobConf job, Class<? extends Reducer<K1,V1,K2,V2>> klass, Class<? extends K1> inputKeyClass, Class<? extends V1> inputValueClass, Class<? extends K2> outputKeyClass, Class<? extends V2> outputValueClass, boolean byValue, JobConf reducerConf) : Sets the Reducer class for the chain job's JobConf.
setReducer : sets the Reducer class on the chain job's JobConf.
addMapper : used to add mappers that run after the reducer.
* Example
...
conf.setJobName("chain");
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
JobConf mapAConf = new JobConf(false);
...
ChainMapper.addMapper(conf, AMap.class, LongWritable.class,
Text.class, Text.class, Text.class, true, mapAConf);
JobConf mapBConf = new JobConf(false);
...
ChainMapper.addMapper(conf, BMap.class, Text.class,
Text.class, LongWritable.class, Text.class, false, mapBConf);
JobConf reduceConf = new JobConf(false);
...
ChainReducer.setReducer(conf, XReduce.class, LongWritable.class,
Text.class, Text.class, Text.class, true, reduceConf);
ChainReducer.addMapper(conf, CMap.class, Text.class,
Text.class, LongWritable.class, Text.class, false, null);
ChainReducer.addMapper(conf, DMap.class, LongWritable.class,
Text.class, LongWritable.class, LongWritable.class, true, null);
FileInputFormat.setInputPaths(conf, inDir);
FileOutputFormat.setOutputPath(conf, outDir);
...
JobClient jc = new JobClient(conf);
RunningJob job = jc.submitJob(conf);
...



