Original source: https://blog.csdn.net/u010003835/category_7599581.html?spm=1001.2014.3001.5482

Test table and test data
+----------------------------------------------------+
| createtab_stmt |
+----------------------------------------------------+
| CREATE TABLE `datacube_salary_org`( |
| `company_name` string COMMENT '????', |
| `dep_name` string COMMENT '????', |
| `user_id` bigint COMMENT '??id', |
| `user_name` string COMMENT '????', |
| `salary` decimal(10,2) COMMENT '??', |
| `create_time` date COMMENT '????', |
| `update_time` date COMMENT '????') |
| PARTITIONED BY ( |
| `pt` string COMMENT '????') |
| ROW FORMAT SERDE |
| 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' |
| WITH SERDEPROPERTIES ( |
| 'field.delim'=',', |
| 'serialization.format'=',') |
| STORED AS INPUTFORMAT |
| 'org.apache.hadoop.mapred.TextInputFormat' |
| OUTPUTFORMAT |
| 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' |
| LOCATION |
| 'hdfs://cdh-manager:8020/user/hive/warehouse/data_warehouse_test.db/datacube_salary_org' |
| TBLPROPERTIES ( |
| 'transient_lastDdlTime'='1586310488') |
+----------------------------------------------------+
+-----------------------------------+-------------------------------+------------------------------+--------------------------------+-----------------------------+----------------------------------+----------------------------------+-------------------------+
| datacube_salary_org.company_name | datacube_salary_org.dep_name | datacube_salary_org.user_id | datacube_salary_org.user_name | datacube_salary_org.salary | datacube_salary_org.create_time | datacube_salary_org.update_time | datacube_salary_org.pt |
+-----------------------------------+-------------------------------+------------------------------+--------------------------------+-----------------------------+----------------------------------+----------------------------------+-------------------------+
| s.zh | engineer | 1 | szh | 28000.00 | 2020-04-07 | 2020-04-07 | 20200405 |
| s.zh | engineer | 2 | zyq | 26000.00 | 2020-04-03 | 2020-04-03 | 20200405 |
| s.zh | tester | 3 | gkm | 20000.00 | 2020-04-07 | 2020-04-07 | 20200405 |
| x.qx | finance | 4 | pip | 13400.00 | 2020-04-07 | 2020-04-07 | 20200405 |
| x.qx | finance | 5 | kip | 24500.00 | 2020-04-07 | 2020-04-07 | 20200405 |
| x.qx | finance | 6 | zxxc | 13000.00 | 2020-04-07 | 2020-04-07 | 20200405 |
| x.qx | kiccp | 7 | xsz | 8600.00 | 2020-04-07 | 2020-04-07 | 20200405 |
| s.zh | engineer | 1 | szh | 28000.00 | 2020-04-07 | 2020-04-07 | 20200406 |
| s.zh | engineer | 2 | zyq | 26000.00 | 2020-04-03 | 2020-04-03 | 20200406 |
| s.zh | tester | 3 | gkm | 20000.00 | 2020-04-07 | 2020-04-07 | 20200406 |
| x.qx | finance | 4 | pip | 13400.00 | 2020-04-07 | 2020-04-07 | 20200406 |
| x.qx | finance | 5 | kip | 24500.00 | 2020-04-07 | 2020-04-07 | 20200406 |
| x.qx | finance | 6 | zxxc | 13000.00 | 2020-04-07 | 2020-04-07 | 20200406 |
| x.qx | kiccp | 7 | xsz | 8600.00 | 2020-04-07 | 2020-04-07 | 20200406 |
| s.zh | enginer | 1 | szh | 28000.00 | 2020-04-07 | 2020-04-07 | 20200407 |
| s.zh | enginer | 2 | zyq | 26000.00 | 2020-04-03 | 2020-04-03 | 20200407 |
| s.zh | tester | 3 | gkm | 20000.00 | 2020-04-07 | 2020-04-07 | 20200407 |
| x.qx | finance | 4 | pip | 13400.00 | 2020-04-07 | 2020-04-07 | 20200407 |
| x.qx | finance | 5 | kip | 24500.00 | 2020-04-07 | 2020-04-07 | 20200407 |
| x.qx | finance | 6 | zxxc | 13000.00 | 2020-04-07 | 2020-04-07 | 20200407 |
| x.qx | kiccp | 7 | xsz | 8600.00 | 2020-04-07 | 2020-04-07 | 20200407 |
+-----------------------------------+-------------------------------+------------------------------+--------------------------------+-----------------------------+----------------------------------+----------------------------------+-------------------------+

 

Scenario 4: Controlling the number of tasks / files in a job
1) Controlling the number of Mappers

2) Controlling the number of Reducers

3) Controlling the number of files output by Mappers and Reducers

 

1) Controlling the number of Mappers
Note: by default, one file corresponds to one Map task.

Maximum input size per Map:
set mapred.max.split.size=256000000;
256,000,000 ~= 256 MB

Minimum split size on a single node:
set mapred.min.split.size.per.node=100000000;
100,000,000 ~= 100 MB

Minimum split size within a single rack (switch):
set mapred.min.split.size.per.rack=100000000;
100,000,000 ~= 100 MB

Merge small files before the Map phase:
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
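
As a minimal sketch (the parameter values are illustrative, not recommendations), the four settings above can be combined in one session so that many small input files are packed into fewer, larger splits before the Map phase:

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set mapred.max.split.size=256000000;           -- max input per Map, ~256 MB
set mapred.min.split.size.per.node=100000000;  -- min split size per node, ~100 MB
set mapred.min.split.size.per.rack=100000000;  -- min split size per rack, ~100 MB

SELECT COUNT(1) FROM datacube_salary_org;      -- any query over the test table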
To use these settings well, we need to clearly understand the split process in MapReduce.

Reference article: https://blog.csdn.net/weixin_39879326/article/details/80074955

 

Input Split:

Before the map computation begins, MapReduce computes input splits from the input files. Each input split corresponds to one map task. An input split does not store the data itself; it stores the split length and an array of positions recording where the data is located.

 

The default block size is 128 MB in Hadoop 2.x and 64 MB in Hadoop 1.x. It can be set with dfs.block.size in hdfs-site.xml; note that the unit is bytes.

The split size range can be set in mapred-site.xml via mapred.min.split.size and mapred.max.split.size. minSplitSize defaults to 1 byte, and maxSplitSize defaults to Long.MAX_VALUE = 9223372036854775807.

So how large is a split, exactly?

minSize=max{minSplitSize,mapred.min.split.size}

maxSize=mapred.max.split.size

splitSize=max{minSize,min{maxSize,blockSize}}


So when we do not set the split size range, the split size is determined by, and equal to, the block size.
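
A quick worked example under the Hadoop 2.x defaults (the numbers are purely illustrative): minSplitSize = 1 B, mapred.max.split.size = Long.MAX_VALUE, blockSize = 128 MB, so

splitSize = max{1 B, min{Long.MAX_VALUE, 128 MB}} = 128 MB

and a 1 GB file is read as 8 splits, i.e. 8 map tasks. If mapred.max.split.size is lowered to 64 MB, splitSize becomes max{1 B, min{64 MB, 128 MB}} = 64 MB and the same file produces 16 splits.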

 

block_size: the HDFS block size (64 MB by default on Hadoop 1.x, 128 MB on 2.x), configurable via dfs.block.size

total_size: the total size of the input files

input_file_num: the number of input files

 

(1) Default number of maps

If nothing is configured, the default number of maps depends on block_size.

default_num = total_size / block_size;

 

(2) Desired number of maps

The parameter mapred.map.tasks sets the number of maps the developer wants, but it only takes effect when it is larger than default_num.

goal_num = mapred.map.tasks;

 

(3) Setting the data size processed per task

mapred.min.split.size sets the amount of data each task processes, but it only takes effect when it is larger than block_size.

split_size = max(mapred.min.split.size, block_size);

split_num = total_size / split_size;

 

(4) Computed number of maps

compute_map_num = min(split_num, max(default_num, goal_num))

 

Beyond these settings, MapReduce also has to obey one more rule: the data processed by a single map cannot span files, which implies min_map_num >= input_file_num. So the final number of maps is:

final_map_num = max(compute_map_num, input_file_num)
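
A quick worked example with made-up numbers: total_size = 10 GB, block_size = 128 MB, input_file_num = 3, mapred.map.tasks left unset, mapred.min.split.size = 256 MB.

default_num = 10 GB / 128 MB = 80
split_size = max(256 MB, 128 MB) = 256 MB
split_num = 10 GB / 256 MB = 40
compute_map_num = min(40, max(80, goal_num)) = min(40, 80) = 40
final_map_num = max(40, 3) = 40

So raising mapred.min.split.size cuts the map count from 80 to 40 in this case.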

 

From the analysis above, setting the number of maps can be summed up in a few points:

(1) To increase the number of maps, set mapred.map.tasks to a larger value.

(2) To decrease the number of maps, set mapred.min.split.size to a larger value.

(3) If the input contains many small files and you still want fewer maps, merge the small files into larger files first and then apply rule (2).
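
A minimal sketch of rules (1) and (2) in a Hive session (the values are illustrative only):

-- rule (1): ask for more maps; only effective when larger than default_num
set mapred.map.tasks=100;

-- rule (2): ask for fewer maps by forcing larger splits; only effective when larger than block_size
set mapred.min.split.size=536870912;  -- 512 MB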

 

Note: the points above apply only when the files themselves are splittable.

If a file is not splittable, for example because it uses a compression codec that does not support splitting, it is still processed by a single Map.

 

 

 

 

2) Controlling the number of Reducers
Amount of data processed per Reducer:
set hive.exec.reducers.bytes.per.reducer=500000000;
500,000,000 ~= 500 MB

Specify the number of Reducers directly:
set mapred.reduce.tasks=20;

Note: the value given by set mapred.reduce.tasks=20; is only a reference; a query with ORDER BY, for example, will still be processed by a single Reducer!
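
When mapred.reduce.tasks is left at its default of -1, Hive estimates the reducer count itself from the input size, roughly as follows (the exact behavior varies by version):

estimated_reducers = min( ceil(total_input_bytes / hive.exec.reducers.bytes.per.reducer),
                          hive.exec.reducers.max )

-- e.g. with ~10 GB of input and bytes.per.reducer = 500,000,000, about 20 reducers would be launched.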

The SQL is as follows:

set mapred.reduce.tasks=20;

SELECT
    type
   ,COUNT(1) AS num
FROM
(
    SELECT
        'a' AS type
       ,total_salary
    FROM datacube_salary_basic_aggr AS a
    UNION ALL
    SELECT
        'b' AS type
       ,total_salary
    FROM datacube_salary_company_aggr AS b
    UNION ALL
    SELECT
        'c' AS type
       ,total_salary
    FROM datacube_salary_dep_aggr AS c
    UNION ALL
    SELECT
        'd' AS type
       ,total_salary
    FROM datacube_salary_total_aggr AS d
) AS tmp
GROUP BY
    type
ORDER BY num
;
The EXPLAIN result:

+----------------------------------------------------+
| Explain |
+----------------------------------------------------+
| STAGE DEPENDENCIES: |
| Stage-1 is a root stage |
| Stage-2 depends on stages: Stage-1 |
| Stage-0 depends on stages: Stage-2 |
| |
| STAGE PLANS: |
| Stage: Stage-1 |
| Map Reduce |
| Map Operator Tree: |
| TableScan |
| alias: a |
| Statistics: Num rows: 7 Data size: 2086 Basic stats: COMPLETE Column stats: COMPLETE |
| Select Operator |
| expressions: 'a' (type: string) |
| outputColumnNames: _col0 |
| Statistics: Num rows: 7 Data size: 595 Basic stats: COMPLETE Column stats: COMPLETE |
| Union |
| Statistics: Num rows: 17 Data size: 1445 Basic stats: COMPLETE Column stats: COMPLETE |
| Select Operator |
| expressions: _col0 (type: string) |
| outputColumnNames: _col0 |
| Statistics: Num rows: 17 Data size: 1445 Basic stats: COMPLETE Column stats: COMPLETE |
| Group By Operator |
| aggregations: count(1) |
| keys: _col0 (type: string) |
| mode: hash |
| outputColumnNames: _col0, _col1 |
| Statistics: Num rows: 1 Data size: 93 Basic stats: COMPLETE Column stats: COMPLETE |
| Reduce Output Operator |
| key expressions: _col0 (type: string) |
| sort order: + |
| Map-reduce partition columns: _col0 (type: string) |
| Statistics: Num rows: 1 Data size: 93 Basic stats: COMPLETE Column stats: COMPLETE |
| value expressions: _col1 (type: bigint) |
| TableScan |
| alias: b |
| Statistics: Num rows: 2 Data size: 400 Basic stats: COMPLETE Column stats: COMPLETE |
| Select Operator |
| expressions: 'b' (type: string) |
| outputColumnNames: _col0 |
| Statistics: Num rows: 2 Data size: 170 Basic stats: COMPLETE Column stats: COMPLETE |
| Union |
| Statistics: Num rows: 17 Data size: 1445 Basic stats: COMPLETE Column stats: COMPLETE |
| Select Operator |
| expressions: _col0 (type: string) |
| outputColumnNames: _col0 |
| Statistics: Num rows: 17 Data size: 1445 Basic stats: COMPLETE Column stats: COMPLETE |
| Group By Operator |
| aggregations: count(1) |
| keys: _col0 (type: string) |
| mode: hash |
| outputColumnNames: _col0, _col1 |
| Statistics: Num rows: 1 Data size: 93 Basic stats: COMPLETE Column stats: COMPLETE |
| Reduce Output Operator |
| key expressions: _col0 (type: string) |
| sort order: + |
| Map-reduce partition columns: _col0 (type: string) |
| Statistics: Num rows: 1 Data size: 93 Basic stats: COMPLETE Column stats: COMPLETE |
| value expressions: _col1 (type: bigint) |
| TableScan |
| alias: c |
| Statistics: Num rows: 4 Data size: 1160 Basic stats: COMPLETE Column stats: COMPLETE |
| Select Operator |
| expressions: 'c' (type: string) |
| outputColumnNames: _col0 |
| Statistics: Num rows: 4 Data size: 340 Basic stats: COMPLETE Column stats: COMPLETE |
| Union |
| Statistics: Num rows: 17 Data size: 1445 Basic stats: COMPLETE Column stats: COMPLETE |
| Select Operator |
| expressions: _col0 (type: string) |
| outputColumnNames: _col0 |
| Statistics: Num rows: 17 Data size: 1445 Basic stats: COMPLETE Column stats: COMPLETE |
| Group By Operator |
| aggregations: count(1) |
| keys: _col0 (type: string) |
| mode: hash |
| outputColumnNames: _col0, _col1 |
| Statistics: Num rows: 1 Data size: 93 Basic stats: COMPLETE Column stats: COMPLETE |
| Reduce Output Operator |
| key expressions: _col0 (type: string) |
| sort order: + |
| Map-reduce partition columns: _col0 (type: string) |
| Statistics: Num rows: 1 Data size: 93 Basic stats: COMPLETE Column stats: COMPLETE |
| value expressions: _col1 (type: bigint) |
| TableScan |
| alias: d |
| Statistics: Num rows: 4 Data size: 448 Basic stats: COMPLETE Column stats: COMPLETE |
| Select Operator |
| expressions: 'd' (type: string) |
| outputColumnNames: _col0 |
| Statistics: Num rows: 4 Data size: 340 Basic stats: COMPLETE Column stats: COMPLETE |
| Union |
| Statistics: Num rows: 17 Data size: 1445 Basic stats: COMPLETE Column stats: COMPLETE |
| Select Operator |
| expressions: _col0 (type: string) |
| outputColumnNames: _col0 |
| Statistics: Num rows: 17 Data size: 1445 Basic stats: COMPLETE Column stats: COMPLETE |
| Group By Operator |
| aggregations: count(1) |
| keys: _col0 (type: string) |
+----------------------------------------------------+
| Explain |
+----------------------------------------------------+
| mode: hash |
| outputColumnNames: _col0, _col1 |
| Statistics: Num rows: 1 Data size: 93 Basic stats: COMPLETE Column stats: COMPLETE |
| Reduce Output Operator |
| key expressions: _col0 (type: string) |
| sort order: + |
| Map-reduce partition columns: _col0 (type: string) |
| Statistics: Num rows: 1 Data size: 93 Basic stats: COMPLETE Column stats: COMPLETE |
| value expressions: _col1 (type: bigint) |
| Reduce Operator Tree: |
| Group By Operator |
| aggregations: count(VALUE._col0) |
| keys: KEY._col0 (type: string) |
| mode: mergepartial |
| outputColumnNames: _col0, _col1 |
| Statistics: Num rows: 1 Data size: 93 Basic stats: COMPLETE Column stats: COMPLETE |
| File Output Operator |
| compressed: false |
| table: |
| input format: org.apache.hadoop.mapred.SequenceFileInputFormat |
| output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat |
| serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe |
| |
| Stage: Stage-2 |
| Map Reduce |
| Map Operator Tree: |
| TableScan |
| Reduce Output Operator |
| key expressions: _col1 (type: bigint) |
| sort order: + |
| Statistics: Num rows: 1 Data size: 93 Basic stats: COMPLETE Column stats: COMPLETE |
| value expressions: _col0 (type: string) |
| Reduce Operator Tree: |
| Select Operator |
| expressions: VALUE._col0 (type: string), KEY.reducesinkkey0 (type: bigint) |
| outputColumnNames: _col0, _col1 |
| Statistics: Num rows: 1 Data size: 93 Basic stats: COMPLETE Column stats: COMPLETE |
| File Output Operator |
| compressed: false |
| Statistics: Num rows: 1 Data size: 93 Basic stats: COMPLETE Column stats: COMPLETE |
| table: |
| input format: org.apache.hadoop.mapred.SequenceFileInputFormat |
| output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat |
| serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe |
| |
| Stage: Stage-0 |
| Fetch Operator |
| limit: -1 |
| Processor Tree: |
| ListSink |
| |
+----------------------------------------------------+
151 rows selected (0.181 seconds)
We can see that Stage-2 is a global sort stage!

Now let's actually run the SQL:

INFO : Total jobs = 2
INFO : Launching Job 1 out of 2
INFO : Starting task [Stage-1:MAPRED] in parallel
INFO : Launching Job 2 out of 2
INFO : Starting task [Stage-2:MAPRED] in parallel
INFO : MapReduce Jobs Launched:
INFO : Stage-Stage-1: Map: 4 Reduce: 20 Cumulative CPU: 42.35 sec HDFS Read: 140563 HDFS Write: 2000 SUCCESS
INFO : Stage-Stage-2: Map: 1 Reduce: 1 Cumulative CPU: 3.98 sec HDFS Read: 12389 HDFS Write: 151 SUCCESS
INFO : Total MapReduce CPU Time Spent: 46 seconds 330 msec
INFO : Completed executing command(queryId=hive_20200411132830_d964b0ba-b7d8-4526-aabd-99a84ecc726b); Time taken: 144.075 seconds
INFO : OK
+-------+------+
| type  | num  |
+-------+------+
| b     | 2    |
| c     | 4    |
| d     | 4    |
| a     | 7    |
+-------+------+
4 rows selected (144.21 seconds)
We can see that the ORDER BY adds an extra job, and in that job the number of reducers is not the 20 we configured but 1.

 

 

 

Also, blindly increasing the number of Reducers does nothing for small jobs; on the contrary, it adds elapsed time!

This is because requesting Reducers carries extra time overhead.

Take the following SQL as an example:

EXPLAIN
SELECT
    type
   ,COUNT(1) AS num
FROM
(
    SELECT
        'a' AS type
       ,total_salary
    FROM datacube_salary_basic_aggr AS a
    UNION ALL
    SELECT
        'b' AS type
       ,total_salary
    FROM datacube_salary_company_aggr AS b
    UNION ALL
    SELECT
        'c' AS type
       ,total_salary
    FROM datacube_salary_dep_aggr AS c
    UNION ALL
    SELECT
        'd' AS type
       ,total_salary
    FROM datacube_salary_total_aggr AS d
) AS tmp
GROUP BY
    type
;
set mapred.reduce.tasks=1;

INFO : Total jobs = 1
INFO : Launching Job 1 out of 1
INFO : Starting task [Stage-1:MAPRED] in parallel
INFO : MapReduce Jobs Launched:
INFO : Stage-Stage-1: Map: 4 Reduce: 1 Cumulative CPU: 10.05 sec HDFS Read: 46686 HDFS Write: 151 SUCCESS
INFO : Total MapReduce CPU Time Spent: 10 seconds 50 msec
INFO : Completed executing command(queryId=hive_20200411131939_bcd3a7a2-6610-4d87-a00f-28159cd04238); Time taken: 38.014 seconds
INFO : OK
+-------+------+
| type  | num  |
+-------+------+
| a     | 7    |
| b     | 2    |
| c     | 4    |
| d     | 4    |
+-------+------+
4 rows selected (38.137 seconds)
set mapred.reduce.tasks=20;

INFO : Total jobs = 1
INFO : Launching Job 1 out of 1
INFO : Starting task [Stage-1:MAPRED] in parallel
INFO : MapReduce Jobs Launched:
INFO : Stage-Stage-1: Map: 4 Reduce: 20 Cumulative CPU: 43.24 sec HDFS Read: 157843 HDFS Write: 1804 SUCCESS
INFO : Total MapReduce CPU Time Spent: 43 seconds 240 msec
INFO : Completed executing command(queryId=hive_20200411132129_f6428764-ae55-44cb-9f0e-0c4bee9f1356); Time taken: 120.029 seconds
INFO : OK
+-------+------+
| type  | num  |
+-------+------+
| d     | 4    |
| a     | 7    |
| b     | 2    |
| c     | 4    |
+-------+------+
4 rows selected (120.276 seconds)
We can see that with 20 reducers the job actually took more time (about 120 s, versus 38 s with a single reducer)!

 

 

3) Controlling the number of MR job output files
This mainly addresses the following problems:

Where do small files in the map input come from, and how can they be avoided by merging small files before the map phase?
Too many reducers make the job produce too many small files. Tip: normally, one reducer corresponds to one output file.
Too many small files degrade NameNode performance.

Related parameters:

Merge small files at the end of a map-only job:
set hive.merge.mapfiles=true;

Merge small files at the end of a MapReduce job:
set hive.merge.mapredfiles=true;

Target size of the merged files:
set hive.merge.size.per.task=256000000;
256,000,000 ~= 256 MB

When the average size of the output files is below this value, start a separate MapReduce job to merge them:
set hive.merge.smallfiles.avgsize=16000000;
16,000,000 ~= 16 MB
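
A minimal sketch putting these merge parameters together in one session (the values are illustrative, and the INSERT below is just a placeholder query writing to a hypothetical new partition):

set hive.merge.mapfiles=true;                 -- merge after map-only jobs
set hive.merge.mapredfiles=true;              -- merge after map-reduce jobs
set hive.merge.size.per.task=256000000;       -- target merged file size, ~256 MB
set hive.merge.smallfiles.avgsize=16000000;   -- trigger the merge job when avg output file < ~16 MB

INSERT OVERWRITE TABLE datacube_salary_org PARTITION (pt='20200408')
SELECT company_name, dep_name, user_id, user_name, salary, create_time, update_time
FROM datacube_salary_org
WHERE pt = '20200407';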
--------------------------------
Copyright notice: this is an original article by the CSDN blogger 「高达一号」, licensed under the CC 4.0 BY-SA license. Please include the original source link and this notice when reposting.
Original link: https://blog.csdn.net/u010003835/article/details/105494261

