5장. 맵리듀스 기초(1)

분석용 데이터 준비 - ASA 통계데이터 다운


  • 분석용 데이터 제공 사이트 접속 및 다운

stat-computing.org/dataexpo/2009



  • 데이터 준비

- 다운받은 bz2 파일들을 하둡홈/data 폴더를 생성하여 복사해둔다.
 $ cd $HADOOP_HOME/data
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
[hduser@hdstudy01 data]$ ll
합계 1668620
-rw-rw-r--1 hduser hduser  12652442 2017-06-11 08:56 1987.csv.bz2
-rw-rw-r--1 hduser hduser  49499025 2017-06-11 08:56 1988.csv.bz2
-rw-rw-r--1 hduser hduser  49202298 2017-06-11 08:56 1989.csv.bz2
-rw-rw-r--1 hduser hduser  52041322 2017-06-11 08:56 1990.csv.bz2
-rw-rw-r--1 hduser hduser  49877448 2017-06-11 08:56 1991.csv.bz2
-rw-rw-r--1 hduser hduser  50040946 2017-06-11 08:56 1992.csv.bz2
-rw-rw-r--1 hduser hduser  50111774 2017-06-11 08:56 1993.csv.bz2
-rw-rw-r--1 hduser hduser  51123887 2017-06-11 08:56 1994.csv.bz2
-rw-rw-r--1 hduser hduser  74881752 2017-06-11 08:56 1995.csv.bz2
-rw-rw-r--1 hduser hduser  75887707 2017-06-11 08:56 1996.csv.bz2
-rw-rw-r--1 hduser hduser  76705687 2017-06-11 08:57 1997.csv.bz2
-rw-rw-r--1 hduser hduser  76683506 2017-06-11 08:57 1998.csv.bz2
-rw-rw-r--1 hduser hduser  79449438 2017-06-11 08:57 1999.csv.bz2
-rw-rw-r--1 hduser hduser  82537924 2017-06-11 08:57 2000.csv.bz2
-rw-rw-r--1 hduser hduser  83478700 2017-06-11 08:57 2001.csv.bz2
-rw-rw-r--1 hduser hduser  75907218 2017-06-11 08:57 2002.csv.bz2
-rw-rw-r--1 hduser hduser  95326801 2017-06-11 08:57 2003.csv.bz2
-rw-rw-r--1 hduser hduser 110825331 2017-06-11 08:57 2004.csv.bz2
-rw-rw-r--1 hduser hduser 112450321 2017-06-11 08:57 2005.csv.bz2
-rw-rw-r--1 hduser hduser 115019195 2017-06-11 08:57 2006.csv.bz2
-rw-rw-r--1 hduser hduser 121249243 2017-06-11 08:57 2007.csv.bz2
-rw-rw-r--1 hduser hduser 113753229 2017-06-11 08:57 2008.csv.bz2
cs

- 압축해제
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
[hduser@hdstudy01 data]$ bzip2 --*.bz2
  1987.csv.bz2: done
  1988.csv.bz2: done
  1989.csv.bz2: done
  1990.csv.bz2: done
  1991.csv.bz2: done
  1992.csv.bz2: done
  1993.csv.bz2: done
  1994.csv.bz2: done
  1995.csv.bz2: done
  1996.csv.bz2: done
  1997.csv.bz2: done
  1998.csv.bz2: done
  1999.csv.bz2: done
  2000.csv.bz2: done
  2001.csv.bz2: done
  2002.csv.bz2: done
  2003.csv.bz2: done
  2004.csv.bz2: done
  2005.csv.bz2: done
  2006.csv.bz2: done
  2007.csv.bz2: done
  2008.csv.bz2: done
cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
[hduser@hdstudy01 data]$ ll
합계 11747404
-rw-rw-r--1 hduser hduser 127162942 2017-06-11 08:56 1987.csv
-rw-rw-r--1 hduser hduser 501039472 2017-06-11 08:56 1988.csv
-rw-rw-r--1 hduser hduser 486518821 2017-06-11 08:56 1989.csv
-rw-rw-r--1 hduser hduser 509194687 2017-06-11 08:56 1990.csv
-rw-rw-r--1 hduser hduser 491210093 2017-06-11 08:56 1991.csv
-rw-rw-r--1 hduser hduser 492313731 2017-06-11 08:56 1992.csv
-rw-rw-r--1 hduser hduser 490753652 2017-06-11 08:56 1993.csv
-rw-rw-r--1 hduser hduser 501558665 2017-06-11 08:56 1994.csv
-rw-rw-r--1 hduser hduser 530751568 2017-06-11 08:56 1995.csv
-rw-rw-r--1 hduser hduser 533922363 2017-06-11 08:56 1996.csv
-rw-rw-r--1 hduser hduser 540347861 2017-06-11 08:57 1997.csv
-rw-rw-r--1 hduser hduser 538432875 2017-06-11 08:57 1998.csv
-rw-rw-r--1 hduser hduser 552926022 2017-06-11 08:57 1999.csv
-rw-rw-r--1 hduser hduser 570151613 2017-06-11 08:57 2000.csv
-rw-rw-r--1 hduser hduser 600411462 2017-06-11 08:57 2001.csv
-rw-rw-r--1 hduser hduser 530507013 2017-06-11 08:57 2002.csv
-rw-rw-r--1 hduser hduser 626745242 2017-06-11 08:57 2003.csv
-rw-rw-r--1 hduser hduser 669879113 2017-06-11 08:57 2004.csv
-rw-rw-r--1 hduser hduser 671027265 2017-06-11 08:57 2005.csv
-rw-rw-r--1 hduser hduser 672068096 2017-06-11 08:57 2006.csv
-rw-rw-r--1 hduser hduser 702878193 2017-06-11 08:57 2007.csv
-rw-rw-r--1 hduser hduser 689413344 2017-06-11 08:57 2008.csv
cs

- HDFS업로드
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
[hduser@hdstudy01 data]$ hadoop fs -put ./ input
[hduser@hdstudy01 data]$ hadoop fs -ls input
Found 22 items
-rw-r--r--   1 hduser supergroup  127162942 2017-06-11 09:18 /user/hduser/input/1987.csv
-rw-r--r--   1 hduser supergroup  501039472 2017-06-11 09:17 /user/hduser/input/1988.csv
-rw-r--r--   1 hduser supergroup  486518821 2017-06-11 09:19 /user/hduser/input/1989.csv
-rw-r--r--   1 hduser supergroup  509194687 2017-06-11 09:15 /user/hduser/input/1990.csv
-rw-r--r--   1 hduser supergroup  491210093 2017-06-11 09:14 /user/hduser/input/1991.csv
-rw-r--r--   1 hduser supergroup  492313731 2017-06-11 09:18 /user/hduser/input/1992.csv
-rw-r--r--   1 hduser supergroup  490753652 2017-06-11 09:19 /user/hduser/input/1993.csv
-rw-r--r--   1 hduser supergroup  501558665 2017-06-11 09:16 /user/hduser/input/1994.csv
-rw-r--r--   1 hduser supergroup  530751568 2017-06-11 09:19 /user/hduser/input/1995.csv
-rw-r--r--   1 hduser supergroup  533922363 2017-06-11 09:18 /user/hduser/input/1996.csv
-rw-r--r--   1 hduser supergroup  540347861 2017-06-11 09:16 /user/hduser/input/1997.csv
-rw-r--r--   1 hduser supergroup  538432875 2017-06-11 09:15 /user/hduser/input/1998.csv
-rw-r--r--   1 hduser supergroup  552926022 2017-06-11 09:15 /user/hduser/input/1999.csv
-rw-r--r--   1 hduser supergroup  570151613 2017-06-11 09:17 /user/hduser/input/2000.csv
-rw-r--r--   1 hduser supergroup  600411462 2017-06-11 09:16 /user/hduser/input/2001.csv
-rw-r--r--   1 hduser supergroup  530507013 2017-06-11 09:17 /user/hduser/input/2002.csv
-rw-r--r--   1 hduser supergroup  626745242 2017-06-11 09:15 /user/hduser/input/2003.csv
-rw-r--r--   1 hduser supergroup  669879113 2017-06-11 09:18 /user/hduser/input/2004.csv
-rw-r--r--   1 hduser supergroup  671027265 2017-06-11 09:17 /user/hduser/input/2005.csv
-rw-r--r--   1 hduser supergroup  672068096 2017-06-11 09:16 /user/hduser/input/2006.csv
-rw-r--r--   1 hduser supergroup  702878193 2017-06-11 09:16 /user/hduser/input/2007.csv
-rw-r--r--   1 hduser supergroup  689413344 2017-06-11 09:18 /user/hduser/input/2008.csv
cs


  • 항공 출발 지연 데이터 분석 - 매퍼, 리듀서, 드라이버 구현 & Jar파일 추출

①DepartureDelayCountMapper.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package pj01.hadoop.chapter05;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class DepartureDelayCountMapper extends
        Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable outputValue = new IntWritable(1);
    private Text outputKey = new Text();
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        if (key.get() > 0) {
            String[] colums = value.toString().split(",");
            if (colums != null && colums.length > 0) {
                try {
                    outputKey.set(colums[0+ "," + colums[1]);
                    if (!colums[15].equals("NA")) {
                        int depDelayTime = Integer.parseInt(colums[15]);
                        if (depDelayTime > 0) {
                            context.write(outputKey, outputValue);
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
cs

②DelayCountReducer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package pj01.hadoop.chapter05;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class DelayCountReducer extends
        Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values)
            sum += value.get();
        result.set(sum);
        context.write(key, result);
    }
}
cs

③DepartureDelayCount.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package pj01.hadoop.chapter05;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class DepartureDelayCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        if (args.length != 2) {
            System.err.println("Usage: DepartureDelayCount <input> <output>");
            System.exit(2);
        }
        Job job = new Job(conf, "DepartureDelayCount");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setJarByClass(DepartureDelayCount.class);
        job.setMapperClass(DepartureDelayCountMapper.class);
        job.setReducerClass(DelayCountReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.waitForCompletion(true);
    }
}
cs

※ Jar 파일 추출은 4장. 맵리듀스 프로그래밍 참고

- 맵리듀스 프로그램 실행(상당히 오래 걸림)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
[hduser@hdstudy01 hadoop]$ hadoop jar alzio-hadoop-examples.jar input dep_delay_count
17/06/11 09:26:26 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
17/06/11 09:26:26 INFO input.FileInputFormat: Total input paths to process : 22
17/06/11 09:26:26 INFO util.NativeCodeLoader: Loaded the native-hadoop library
17/06/11 09:26:26 WARN snappy.LoadSnappy: Snappy native library not loaded
17/06/11 09:26:26 INFO mapred.JobClient: Running job: job_201706091514_0007
17/06/11 09:26:27 INFO mapred.JobClient:  map 0% reduce 0%
17/06/11 09:26:44 INFO mapred.JobClient:  map 1% reduce 0%
17/06/11 09:26:54 INFO mapred.JobClient:  map 2% reduce 0%
17/06/11 09:27:06 INFO mapred.JobClient:  map 3% reduce 0%
(중략)
17/06/11 09:45:07 INFO mapred.JobClient:  map 96% reduce 32%
17/06/11 09:45:10 INFO mapred.JobClient:  map 97% reduce 32%
17/06/11 09:45:19 INFO mapred.JobClient:  map 98% reduce 32%
17/06/11 09:45:28 INFO mapred.JobClient:  map 99% reduce 32%
17/06/11 09:45:30 INFO mapred.JobClient:  map 100% reduce 32%
17/06/11 09:45:38 INFO mapred.JobClient:  map 100% reduce 78%
17/06/11 09:45:41 INFO mapred.JobClient:  map 100% reduce 86%
17/06/11 09:45:44 INFO mapred.JobClient:  map 100% reduce 93%
17/06/11 09:45:47 INFO mapred.JobClient:  map 100% reduce 100%
17/06/11 09:45:48 INFO mapred.JobClient: Job complete: job_201706091514_0007
17/06/11 09:45:48 INFO mapred.JobClient: Counters: 29
17/06/11 09:45:48 INFO mapred.JobClient:   Map-Reduce Framework
17/06/11 09:45:48 INFO mapred.JobClient:     Spilled Records=135244821
17/06/11 09:45:48 INFO mapred.JobClient:     Map output materialized bytes=663186768
17/06/11 09:45:48 INFO mapred.JobClient:     Reduce input records=50018329
17/06/11 09:45:48 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=362867494912
17/06/11 09:45:48 INFO mapred.JobClient:     Map input records=123534991
17/06/11 09:45:48 INFO mapred.JobClient:     SPLIT_RAW_BYTES=21131
17/06/11 09:45:48 INFO mapred.JobClient:     Map output bytes=563148988
17/06/11 09:45:48 INFO mapred.JobClient:     Reduce shuffle bytes=663186768
17/06/11 09:45:48 INFO mapred.JobClient:     Physical memory (bytes) snapshot=33919832064
17/06/11 09:45:48 INFO mapred.JobClient:     Reduce input groups=255
17/06/11 09:45:48 INFO mapred.JobClient:     Combine output records=0
17/06/11 09:45:48 INFO mapred.JobClient:     Reduce output records=255
17/06/11 09:45:48 INFO mapred.JobClient:     Map output records=50018329
17/06/11 09:45:48 INFO mapred.JobClient:     Combine input records=0
17/06/11 09:45:48 INFO mapred.JobClient:     CPU time spent (ms)=661650
17/06/11 09:45:48 INFO mapred.JobClient:     Total committed heap usage (bytes)=24530169856
17/06/11 09:45:48 INFO mapred.JobClient:   File Input Format Counters
17/06/11 09:45:48 INFO mapred.JobClient:     Bytes Read=12029889933
17/06/11 09:45:48 INFO mapred.JobClient:   FileSystemCounters
17/06/11 09:45:48 INFO mapred.JobClient:     HDFS_BYTES_READ=12029911064
17/06/11 09:45:48 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=1802269808
17/06/11 09:45:48 INFO mapred.JobClient:     FILE_BYTES_READ=1128262179
17/06/11 09:45:48 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=3635
17/06/11 09:45:48 INFO mapred.JobClient:   Job Counters
17/06/11 09:45:48 INFO mapred.JobClient:     Launched map tasks=187
17/06/11 09:45:48 INFO mapred.JobClient:     Launched reduce tasks=1
17/06/11 09:45:48 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=1094855
17/06/11 09:45:48 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
17/06/11 09:45:48 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=2247614
17/06/11 09:45:48 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
17/06/11 09:45:48 INFO mapred.JobClient:     Data-local map tasks=187
17/06/11 09:45:48 INFO mapred.JobClient:   File Output Format Counters
17/06/11 09:45:48 INFO mapred.JobClient:     Bytes Written=3635
cs

- 추출된 파일 확인
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
[hduser@hdstudy01 hadoop]$ hadoop fs -ls dep_delay_count
Found 3 items
-rw-r--r--   1 hduser supergroup          0 2017-06-11 09:45 /user/hduser/dep_delay_count/_SUCCESS
drwxr-xr-x   - hduser supergroup          0 2017-06-11 09:26 /user/hduser/dep_delay_count/_logs
-rw-r--r--   1 hduser supergroup       3635 2017-06-11 09:45 /user/hduser/dep_delay_count/part-r-00000
[hduser@hdstudy01 hadoop]$ hadoop fs -cat dep_delay_count/part-r-00000 | head -10
1987,10 175568
1987,11 177218
1987,12 218858
1988,1  198610
1988,10 162211
1988,11 175123
1988,12 189137
1988,2  177939
1988,3  187141
1988,4  159216
[hduser@hdstudy01 hadoop]$
cs

항공 도착 지연 데이터 분석 - 매퍼, 리듀서, 드라이버 구현, 실행
- Java 문서 작성
① ArrivalDelayCountMapper
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package pj01.hadoop.chapter05;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class ArrivalDelayCountMapper extends
        Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable outputValue = new IntWritable(1);
    private Text outputKey = new Text();
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        if (key.get() > 0) {
            String[] colums = value.toString().split(",");
            if (colums != null && colums.length > 0) {
                try {
                    outputKey.set(colums[0+ "," + colums[1]);
                    if (!colums[14].equals("NA")) {
                        int arrDelayTime = Integer.parseInt(colums[14]);
                        if (arrDelayTime > 0) {
                            context.write(outputKey, outputValue);
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
cs

② DelayCountReducer (항공출발지연데이터분석과 동일함)
③ ArrivalDelayCount
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package pj01.hadoop.chapter05;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class ArrivalDelayCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        if (args.length != 2) {
            System.err.println("Usage: ArrivalDelayCount <input> <output>");
            System.exit(2);
        }
        Job job = new Job(conf, "ArrivalDelayCount");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setJarByClass(ArrivalDelayCount.class);
        job.setMapperClass(ArrivalDelayCountMapper.class);
        job.setReducerClass(DelayCountReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.waitForCompletion(true);
    }
}
cs

※ 나머지 과정은 항공 출발지연 데이터분석과 동일

댓글

가장 많이 본 글