Distributed Computing 1015-1021: Distributed Regression Analysis


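I. Getting Started! A Huge Amount of Information Today

The experts flexed their muscles.

The regression part still requires some mathematical background.

There is also a little prose after the code.

First, a handy feature for Linux servers.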

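screen -DR followed by a session name opens a window that keeps running after you disconnect.

Ctrl+A, followed by:

  1. c creates a new window
  2. n switches to the next window
  3. d returns to the main shell (the window is detached, not closed)
    The exit command closes the window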

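You all learn fast: you can now open the vim editor and even exit it. (lol)

No, seriously: you now have the ability to talk to data.

If we want to stand out, we certainly cannot compete on coding ability, but we have a professional advantage: the ability to analyze data. We understand models, and we understand prediction.

As for why we use Hadoop: the data has grown very large.

Here is a scenario.

Scenario: US used cars, the Kaggle us-used-car data set, with 3 million records and 66 variables in total.

Response variable: price. The main task is to find out what affects price.

First, many of these variables have missing data, which has to be dealt with. After uploading the data to the server, pick out the variables with few missing values.

Today's homework:
Use this data set to clean up a set of variables that can be used for regression. R has plenty of ready-made tools for this.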

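II. Regression Analysis on a Distributed System

How do we run a regression on a distributed system? What is different? (The data is read row by row.)

The old data was n by p, with n small (100) and p small (10).

The data now is n by p, with n large (3 million) and p equal to 66 columns. In practice there are many more columns, because a dummy variable with many levels takes up many columns. p may well exceed 1,000.

The raw data is 9 GB, and stored as double precision it needs 60 GB of memory. Running the job requires twice that space, which is impossible on a single machine. But we have a distributed system.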
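As a rough check on these orders of magnitude, the sketch below computes the size of a dense double-precision matrix (8 bytes per value). The column counts are illustrative assumptions: 66 is the raw column count, and 1,000 stands in for a design matrix after dummy expansion. The helper name dense_bytes is hypothetical.

```python
def dense_bytes(n_rows, n_cols, bytes_per_value=8):
    """Bytes needed to hold an n_rows x n_cols matrix of doubles."""
    return n_rows * n_cols * bytes_per_value

n = 3_000_000
for p in (66, 1000):  # raw column count, and a hypothetical expanded design
    print(p, dense_bytes(n, p) / 1e9, "GB")
```

The size grows linearly in p, so the dummy expansion, not the row count alone, is what pushes the matrix beyond a single machine.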

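beta is not large, but the hat matrix cannot be computed at all. The hardest part of solving this problem is the computation:

$ \hat{\beta} = (X^{t}X)^{-1}X^{t}Y $

Once we have the target, the rest is simple.

Question 1: how do we compute $X^{t}Y$?

If X has only one column, this is a $1 \times n$ vector times an $n \times 1$ vector: algebraically, multiply the corresponding entries and sum. Seen before transposing, that means multiplying the elements within each row. If X has two columns, the result is a 2 by 1 vector: the first entry is the sum of the products of X's first column with Y, and the second entry is the sum of the products of X's second column with Y (inner products).

Question 2: how do we construct $X^{t}X$?

The result is a p by p matrix whose (i, j) entry is the inner product of column i and column j of X, that is, the sum of the products of their corresponding elements. i may equal j.

Looking at the answers, we find that every computation happens within a row! Isn't that convenient? Just compute row by row!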
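A minimal NumPy sketch of this row-wise idea, using simulated data (the sizes, seed and coefficients are assumptions for illustration, not taken from the course). It accumulates each row's contribution to X'X and X'Y in a single pass and then solves for the coefficients.

```python
import numpy as np

rng = np.random.default_rng(0)          # fixed seed, illustration only
n, p = 200, 3                           # small hypothetical sizes
X = rng.normal(size=(n, p))
beta = np.array([1.0, 2.0, 3.0])        # hypothetical true coefficients
Y = X @ beta + 0.3 * rng.normal(size=n)

xtx = np.zeros((p, p))
xty = np.zeros(p)
for x_row, y_i in zip(X, Y):            # one pass, one row at a time
    xtx += np.outer(x_row, x_row)       # this row's contribution to X'X
    xty += x_row * y_i                  # this row's contribution to X'Y

beta_hat = np.linalg.solve(xtx, xty)    # solve the normal equations
print(beta_hat)
```

Because the two sums only ever see one row at a time, the rows can be split across machines and the partial sums added together afterwards.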

III. Homework Code

1. Simple Linear Regression

Here is my long and clumsy estimation procedure, starting with the readme.

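readme.txt
First run Rscript reg.r to create reg.csv (the regression data set)
Then run MapReduce
Both the mapper and process files work row by row; a single process (the reducer) is enough to do the summing

Below is the R code used to generate the data. The code fixes the values of beta, so they can be compared with the final result.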

#! /usr/bin/env Rscript

n = 1000
p = 10
x = matrix(rnorm(n*p), n, p)
e = rnorm(n)
beta = c(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
y = x%*%beta+0.3*e
mydata = cbind(x, y)
dim(mydata)
write.table(mydata, "linear.csv", sep = "," , row.names = FALSE, col.names = FALSE)
colnames(mydata) = c("x1", "x2", "x3", "x4", "x5", "x6", "x7", "x8", "x9", "x10", "y")
mydata = data.frame(mydata)
myfit <- lm(y~x1+x2+x3+x4+x5+x6+x7+x8+x9+x10, mydata)
myfit$coefficients

This code is the mapper.

#!/usr/bin/env python3
# Goal: handle the reading; R cannot easily store data of mixed types

import sys
import numpy as np

# Split on double quotes first, then replace the commas inside quoted string fields with another symbol
def commakiller(abc):
    i = 1  # odd-indexed pieces are the quoted fields
    while i < len(abc):
        abc[i] = '"' + abc[i].replace(",", "***") + '"'
        i = i + 2
    return "".join(abc)

for line in sys.stdin:
    abc = line.rstrip("\r\n").split('"')  # drop the newline at the end of each row
    data = commakiller(abc).split(",")
    p = len(data)
    if p <= 1:
        continue
    datak = list(map(float, data))
    x, y = datak[:p-1], datak[p-1]  # the p-th column is taken to be the response
    # This row's contribution to X'y, marked with a leading "*"
    xty = [xi * y for xi in x]
    print("*", ",".join(str(v) for v in xty))
    # This row's contribution to X'X: the outer product of the row with itself
    xtx = np.outer(x, x)
    print(",".join(",".join(str(k) for k in qq) for qq in xtx.tolist()))

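The mapper separates the values with commas and writes them to standard output.
A pipe then feeds the mapper's output into process.py (the reducer, shown in the next block).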

#!/usr/bin/env python3
# Reducer: add up the per-row X'y and X'X contributions, then estimate beta
import sys
import numpy as np

xty = None  # running sum of X'y (length p)
xtx = None  # running sum of the flattened X'X (length p*p)
for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    if line[0] == "*":  # lines marked with "*" carry X'y
        row = np.array(list(map(float, line[1:].split(","))))
        xty = row if xty is None else xty + row
    else:  # everything else is a flattened X'X
        # Summing line by line does not depend on the order in which lines arrive;
        # the original version dropped the last X'X line and relied on line order
        row = np.array(list(map(float, line.split(","))))
        xtx = row if xtx is None else xtx + row

if xty is None or xtx is None:
    sys.exit("no input")

p = len(xty)
xtx = xtx.reshape(p, p)
print(np.dot(np.linalg.inv(xtx), xty))

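The principle is the same as described earlier: compute xtx and xty, then invert (the generated matrix is sometimes singular, in which case just regenerate the data).

Finally, our main shell script, which is largely unchanged (don't run it as is: I changed the file names).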
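Regenerating the data is one way out. Another option, sketched here as an assumption rather than what the homework code does, is to avoid the explicit inverse and fall back to a least-squares solve when X'X is exactly singular. The function name solve_normal_equations is hypothetical.

```python
import numpy as np

def solve_normal_equations(xtx, xty):
    """Solve (X'X) beta = X'y, falling back to a minimum-norm solution."""
    xtx = np.asarray(xtx, dtype=float)
    xty = np.asarray(xty, dtype=float)
    try:
        return np.linalg.solve(xtx, xty)
    except np.linalg.LinAlgError:
        # Singular X'X: lstsq returns the minimum-norm least-squares solution
        beta, *_ = np.linalg.lstsq(xtx, xty, rcond=None)
        return beta

if __name__ == "__main__":
    print(solve_normal_equations([[2.0, 0.0], [0.0, 4.0]], [2.0, 4.0]))
```

Note that a nearly singular matrix will not raise an error, so checking the condition number with np.linalg.cond is still worthwhile before trusting the coefficients.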

#!/bin/bash

PWD=$(cd $(dirname $0); pwd)
cd $PWD 1> /dev/null 2>&1

hadoop fs -put linear.csv /user/devel/hehe/reg

TASKNAME=linear-hehe
HADOOP_INPUT_DIR=/user/devel/hehe/reg/linear.csv
HADOOP_OUTPUT_DIR=/user/devel/hehe/output/1020output

echo $HADOOP_HOME
echo $HADOOP_INPUT_DIR
echo $HADOOP_OUTPUT_DIR

hadoop fs -rm -r $HADOOP_OUTPUT_DIR

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-3.1.3.jar \
-D mapred.job.name=$TASKNAME \
-D mapred.job.priority=HIGH \
-D stream.memory.limit=1000 \
-D mapred.reduce.tasks=1 \
-D mapred.job.map.capacity=100 \
-input ${HADOOP_INPUT_DIR} \
-output ${HADOOP_OUTPUT_DIR} \
-mapper "$PWD/mapper.py" \
-reducer "$PWD/process.py" \
-file "$PWD/mapper.py" \
-file "$PWD/process.py"


if [ $? -ne 0 ]; then
echo 'error'
exit 1
fi
hadoop fs -touchz ${HADOOP_OUTPUT_DIR}/done

hadoop fs -ls $HADOOP_OUTPUT_DIR | cat

exit 0

2. Cleaning the Used-Car Data

The used-car data comes from Kaggle.

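This file applies only to the used-car data

Only OLS regression is performed; GLM would require weighted cross-products, which is not implemented

The computation succeeded locally on 50,000 rows; it has not yet been tested on Hadoop

na.py cleans the data, computes each variable's mean and standard deviation, and reports the suitable columns. The results for the 3 million original rows are stored in vars.txt

mapper.py performs a simple normal imputation and computes the cross-product matrices (xtx and xty)

reducer.py computes the coefficient estimates betahat; the result is stored in result1.txt

Let's start with na.py, which handles NAs and similar issues in the data. It is a bit long, and it is run on its own during processing.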

#!/usr/bin/env python3
## Task: for each column, compute the NA count, mean, standard deviation, totals, and value breakdown
import sys
import re  # regular expressions
import math
from operator import itemgetter

# Split on double quotes first, then replace the commas inside quoted string fields with another symbol
def commakiller(abc):
    i = 1  # odd-indexed pieces are the quoted fields
    while i < len(abc):
        abc[i] = '"' + abc[i].replace(",", "***") + '"'
        i = i + 2
    return "".join(abc)

# Numeric values that carry a unit ("in", "seats", "gal"): strip the unit and convert.
# Returns None for genuine strings; raises ValueError when the conversion fails.
# Only these units are handled; a value like '148 lb-ft @ 200 RPM' could not be parsed, so that column was dropped.
def strip_unit(value):
    for unit in ("in", "seats", "gal"):
        if (unit in value[-(len(unit) + 3):]) and (re.findall("[a-z]" + unit, value) == []):
            return float(re.sub(unit, "", value, 1))
    return None

# Rows with stray quotes can yield more fields than the header has names
def colname(key):
    return names[key] if key < len(names) else "(no name)"

times = 1
na_count = {}
num_count = {}; var_count = {}; n_count = {}
strvalue = {}
for line in sys.stdin:
    items = commakiller(line.rstrip("\r\n").split('"')).split(",")
    if times == 1:
        times += 1
        names = items  # header row, used later when looping over columns
        continue
    ## Counting and imputing NAs requires a pass over the whole data set
    for key, value in enumerate(items):
        missing = (value == "") or (value == "--")
        # get() returns the default 0 when the key is new, so every column gets a count
        na_count[key] = na_count.get(key, 0) + int(missing)
        if missing:
            continue
        # Descriptive statistics for the remaining values
        try:
            val_num = float(value)  # plain numeric variable
        except ValueError:
            try:
                val_num = strip_unit(value)  # numeric variable with a unit
            except ValueError:
                continue  # conversion failed, skip this value
        if val_num is None:
            # String variable: store the levels in a dict and count each one
            strvalue.setdefault(key, {})
            strvalue[key][value] = strvalue[key].get(value, 0) + 1
        else:
            num_count[key] = num_count.get(key, 0) + val_num       # running sum
            var_count[key] = var_count.get(key, 0) + val_num ** 2  # running sum of squares
            n_count[key] = n_count.get(key, 0) + 1                 # number of numeric values
    times += 1

### 1. Handle NAs
abort = []; fix = []; perf = []
lendata = times - 2  # number of data rows (times started at 1 and counted the header)

print(",".join(str(k) for k in names))  # variable names: 1 line
for key, count in sorted(na_count.items(), key=itemgetter(0)):
    print('%s\t%s\t%s' % (key, count, times))  # NA count per column: 66 lines
    if key == 0:
        abort.append(key)  # the ID column is dropped outright
        continue
    ratio = count / lendata
    if ratio >= 0.3:
        abort.append(key)  # drop columns with 30% or more missing
    elif ratio > 0:
        fix.append(key)
    else:
        perf.append(key)

print("Columns to drop: %s\nColumns to impute: %s\nComplete columns: %s" % ((",".join(str(k) for k in abort)), (",".join(str(k) for k in fix)), (",".join(str(k) for k in perf))))

### 2. Numeric variables
for num, total in sorted(num_count.items(), key=itemgetter(0)):
    if num in abort:
        print('Dropped column %d\t%s\t-first column or too many NAs' % (num, colname(num)))
        continue
    n = n_count[num]  # per-column count; the original reused the last column's count for every column
    xbar = total / n
    if xbar > 100000:
        print('Dropped column %d\t%s\t-xbar above 100000' % (num, colname(num)))
        continue
    sdlist = math.sqrt(max(var_count[num] / n - xbar ** 2, 0.0))  # E[X^2] - (E[X])^2
    print('%s\t%s\t%s\t%s' % (num, colname(num), xbar, sdlist))  # num is the column number
## Problem: many columns show up with very large means for unclear reasons and need filtering

### 3. String variables
for key in strvalue:  # loop over every string column (the keys are column numbers)
    if key in abort:
        print('Dropped column %d\t%s\t-too many NAs' % (key, colname(key)))
        continue
    if len(strvalue[key]) > 10:  # drop variables with more than 10 levels
        print('Dropped column %d\t%s\t-more than 10 levels' % (key, colname(key)))
        continue  # when working on a subset of rows, a variable with many levels may survive; that is fine
    print('Column %d' % key)
    for level, count in sorted(strvalue[key].items(), key=itemgetter(0)):
        print('%s\t%s\t%s' % (level, colname(key), count))

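The processing above really requires a thorough understanding of what the raw data means and what form it starts in. Before processing the data, sample and inspect it wherever possible, or use whatever information is available to plan ahead.

na.py reports which columns should or should not be dropped, along with each column's mean and standard deviation. The results are as follows: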
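One way to draw such a sample from a file too large to load is reservoir sampling, which keeps k uniformly chosen lines in a single pass. This is a sketch under my own assumptions (the helper name reservoir_sample and the demo data are made up); it is not part of na.py.

```python
import random

def reservoir_sample(lines, k, seed=None):
    """Return k lines chosen uniformly at random from an iterable of unknown length."""
    rng = random.Random(seed)
    sample = []
    for i, line in enumerate(lines):
        if i < k:
            sample.append(line)          # fill the reservoir first
        else:
            j = rng.randint(0, i)        # inclusive on both ends
            if j < k:
                sample[j] = line         # replace with probability k / (i + 1)
    return sample

if __name__ == "__main__":
    rows = ("row %d" % i for i in range(10000))  # stand-in for sys.stdin
    for row in reservoir_sample(rows, 5, seed=1):
        print(row)
```

Passing sys.stdin as the first argument would sample from a piped file without ever holding more than k lines in memory.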

[devel@emr-header-1 1026]$ cat vars.txt
vin,back_legroom,bed,bed_height,bed_length,body_type,cabin,city,city_fuel_economy,combine_fuel_economy,daysonmarket,dealer_zip,description,engine_cylinders,engine_displacement,engine_type,exterior_color,fleet,frame_damaged,franchise_dealer,franchise_make,front_legroom,fuel_tank_volume,fuel_type,has_accidents,height,highway_fuel_economy,horsepower,interior_color,isCab,is_certified,is_cpo,is_new,is_oemcpo,latitude,length,listed_date,listing_color,listing_id,longitude,main_picture_url,major_options,make_name,maximum_seating,mileage,model_name,owner_count,power,price,salvage,savings_amount,seller_rating,sp_id,sp_name,theft_title,torque,transmission,transmission_display,trimId,trim_name,vehicle_damage_category,wheel_system,wheel_system_display,wheelbase,width,year

0 0 3000600
1 242727 3000600
2 2980472 3000600
3 3000040 3000600
4 2579859 3000600
5 13543 3000600
6 2936507 3000600
7 0 3000600
8 491285 3000600
9 3000040 3000600
10 0 3000600
11 0 3000600
12 77901 3000600
13 100578 3000600
14 172383 3000600
15 100578 3000600
16 0 3000600
17 1426595 3000600
18 1426595 3000600
19 0 3000600
20 572568 3000600
21 175452 3000600
22 160673 3000600
23 82721 3000600
24 1426595 3000600
25 159733 3000600
26 491266 3000600
27 172383 3000600
28 2 3000600
29 1426595 3000600
30 2999953 3000600
31 2817055 3000600
32 0 3000600
33 2864591 3000600
34 0 3000600
35 159722 3000600
36 0 3000600
37 0 3000600
38 0 3000600
39 0 3000600
40 369087 3000600
41 200042 3000600
42 0 3000600
43 159766 3000600
44 144387 3000600
45 0 3000600
46 1517012 3000600
47 481415 3000600
48 0 3000600
49 1426595 3000600
50 0 3000600
51 40828 3000600
52 52 3000600
53 0 3000600
54 1426595 3000600
55 517782 3000600
56 64166 3000600
57 64166 3000600
58 115826 3000600
59 116293 3000600
60 2999953 3000600
61 146731 3000600
62 146731 3000600
63 159698 3000600
64 159746 3000600
65 0 3000600
66 0 3000600
67 0 3000600
68 0 3000600
69 0 3000600
70 0 3000600
71 0 3000600
72 0 3000600
73 0 3000600
74 0 3000600
75 0 3000600
76 0 3000600
77 0 3000600
78 0 3000600
79 0 3000600
80 0 3000600
81 0 3000600
82 0 3000600
83 0 3000600
84 0 3000600
85 0 3000600
86 0 3000600
87 0 3000600
88 0 3000600
89 0 3000600
90 0 3000600
91 0 3000600
92 0 3000600
93 0 3000600
94 0 3000600
95 0 3000600
Columns to drop: 0,2,3,4,6,9,17,18,24,29,30,31,33,46,49,54,60
Columns to impute: 1,5,8,12,13,14,15,20,21,22,23,25,26,27,28,35,40,41,43,44,47,51,52,55,56,57,58,59,61,62,63,64
Complete columns: 7,10,11,16,19,32,34,36,37,38,39,42,45,48,50,53,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95
Dropped column 0 vin -first column or too many NAs
1 back_legroom 34.88554152025584 10.803048024003107
Dropped column 4 bed_length -first column or too many NAs
8 city_fuel_economy 18.973479961834286 11.637300445996951
10 daysonmarket 76.0455595699392 108.87863896938518
11 dealer_zip 50669.81150830218 27375.94715300726
12 description 0.17846959890341893 255.2549902888531
14 engine_displacement 2797.285075413276 1481.0219220754384
16 exterior_color 2.260508282512925 442.6922512862191
21 front_legroom 39.72592702320007 10.02756775400742
22 fuel_tank_volume 17.608113546641402 6.7668815295322435
25 height 62.35115022029862 16.541684822918757
26 highway_fuel_economy 24.641676545249798 13.020379389747855
27 horsepower 233.69342021376397 105.1379926203423
28 interior_color 2.2194684128069095 559.1713000695153
34 latitude 36.97614874552056 5.025653349279904
35 length 183.34090699859522 47.80448637471194
Dropped column 38 listing_id -xbar above 100000
39 longitude -90.62260148008761 13.967765609990069
43 maximum_seating 5.18447183379052 1.6826429709038224
44 mileage 29640.29273455067 73067.93566116491
45 model_name 58.31043968221012 333.09392240515734
Dropped column 46 owner_count -first column or too many NAs
48 price 29926.953581444013 19568.717865222385
50 savings_amount 550.8380996594346 1079.351149695568
51 seller_rating 4.211379641169585 0.7130308234825646
Dropped column 52 sp_id -xbar above 100000
59 trim_name 0.04460616030332584 10.331173669741302
63 wheelbase 109.1642761329817 29.529077769800306
64 width 74.1947137895581 19.1476096014168
65 year 2017.2940512877597 29.89588340213624
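Dropped column 0 vin -too many NAs
Dropped column 5 body_type -more than 10 levels
Dropped column 7 city -more than 10 levels
Dropped column 12 description -more than 10 levels
Dropped column 13 engine_cylinders -more than 10 levels
Dropped column 15 engine_type -more than 10 levels
Dropped column 16 exterior_color -more than 10 levels
Dropped column 19 franchise_dealer -more than 10 levels
Dropped column 20 franchise_make -more than 10 levels
Dropped column 23 fuel_type -more than 10 levels
Dropped column 28 interior_color -more than 10 levels
Dropped column 32 is_new -more than 10 levels
Dropped column 36 listed_date -more than 10 levels
Dropped column 37 listing_color -more than 10 levels
Dropped column 40 main_picture_url -more than 10 levels
Dropped column 41 major_options -more than 10 levels
Dropped column 42 make_name -more than 10 levels
Dropped column 45 model_name -more than 10 levels
Dropped column 47 power -more than 10 levels
Dropped column 53 sp_name -more than 10 levels
Dropped column 55 torque -more than 10 levels
Dropped column 56 transmission -more than 10 levels
Dropped column 57 transmission_display -more than 10 levels
Dropped column 58 trimId -more than 10 levels
Dropped column 59 trim_name -more than 10 levels
Dropped column 61 wheel_system -more than 10 levels
Dropped column 62 wheel_system_display -more than 10 levels
Dropped column 17 fleet -too many NAs
Dropped column 18 frame_damaged -too many NAs
Dropped column 24 has_accidents -too many NAs
Dropped column 29 isCab -too many NAs
Dropped column 49 salvage -too many NAs
Dropped column 54 theft_title -too many NAs
Dropped column 31 is_cpo -too many NAs
Dropped column 33 is_oemcpo -too many NAs
Dropped column 6 cabin -too many NAs
Dropped column 2 bed -too many NAs
Dropped column 1 back_legroom -more than 10 levels
Dropped column 3 bed_height -too many NAs
Dropped column 4 bed_length -too many NAs
Dropped column 8 city_fuel_economy -more than 10 levels
Dropped column 9 combine_fuel_economy -too many NAs
Dropped column 10 daysonmarket -more than 10 levels
Dropped column 11 dealer_zip -more than 10 levels
Dropped column 14 engine_displacement -more than 10 levels
Dropped column 21 front_legroom -more than 10 levels
Dropped column 25 height -more than 10 levels
Dropped column 26 highway_fuel_economy -more than 10 levels
Dropped column 27 horsepower -more than 10 levels
Dropped column 22 fuel_tank_volume -more than 10 levels
Dropped column 30 is_certified -too many NAs
Dropped column 34 latitude -more than 10 levels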
35
'Backup Camera' length 1
4-wheel antilock length 1
Traction control - ABS and driveline length 1
automatic high beam on/off|Glass length 1
body-color (Not available on Double Cab models.)|Glass length 1
driver 8-way power|Seats length 1
front passenger length 1
heated driver and front passenger|Console front center with 2 cup holders and storage length 1
includes rear storage drawer (Excludes storage drawer with (GAT) All Terrain with (ABD) 5-passenger seating.)|Power outlet length 2
38
2 in front door panel listing_id 1
3-prong household style located on the rear of center console|Cup holders 2 in front center console listing_id 1
Xenon headlights"***V6***3600.0***V6***Silver***True***False***False******42.1 in***19 gal***Gasoline***False***59.1 in***28.0***304.0***Gray (Dark Grey)***True*********False******31.8552***202 in***2020-08-27***SILVER***280340040***-106.028***https://static.cargurus.com/images/forsale/2020/08/26/00/06/2016_cadillac_xts-pic-1844722391038826724-152x114.jpeg***"['Leather Seats' listing_id 1
deep-tinted|Wipers listing_id 1
folding|Dead pedal listing_id 1
rear (Requires Crew Cab or Double Cab model. Deleted with (ZW9) pickup box delete.)|Bumper listing_id1
tilt and telescopic|Display listing_id 2
top|Tailgate listing_id 1
39
'Navigation System' longitude 1
2 bottle holders in front door panel longitude 1
2 in front door panel longitude 1
EZ-Lift and Lower (Deleted when (ZW9) pickup box delete is ordered.)|Remote Locking Tailgate|Radio longitude 1
driver instrument information enhanced longitude 2
driver|Steering wheel longitude 1
front chrome|CornerStep longitude 1
front intermittent longitude 1
43
'Heated Seats' maximum_seating 1
3-channel programmable|Defogger maximum_seating 2
3-passenger (includes child seat top tether anchor)|Instrumentation maximum_seating 1
HD|Floor covering maximum_seating 1
chrome|4X4 chrome badge (Included and only available with 4X4 models.)|Grille surround maximum_seating 1
driver instrument information enhanced maximum_seating 1
miles/kilometers|Driver Information Center maximum_seating 1
tilt and telescopic|Display maximum_seating 1
44
'Android Auto' mileage 1
6-gauge cluster featuring speedometer mileage 1
chrome|Headlamps mileage 1
color-keyed carpeting|Driver Information Center mileage 1
driver instrument information enhanced mileage 1
enhanced mileage 1
one color|Sensor mileage 1
rear-window electric|Cup holders 2 in front center console mileage 2
Dropped column 46 owner_count -too many NAs
48
'Bluetooth' price 1
10 total|Lighting price 2
3-channel programmable|Air conditioning price 1
cargo box with switch on center switch bank (Deleted when (ZW9) pickup box delete is ordered.) (Deleted with (ZW9) pickup box delete.)|Fog lamps price 1
power price 1
power with driver and passenger Express-Down/Up|Cruise control price 1
right front passenger and rear seat occupants|Defogger price 1
voltage and oil pressure|Driver Information Center price 1
50
'Backup Camera' savings_amount 1
cargo compartment savings_amount 2
halogen|Mirror caps savings_amount 1
inside rearview manual day/night|Lighting savings_amount 1
programmable|Pedals savings_amount 1
right front passenger and rear seat occupants (Dual-zone climate control when (GAT) All Terrain is ordered. Tri-zone climate control on all other models.)|Defogger savings_amount 1
steering wheel mounted|Mirror savings_amount 1
warning messages and vehicle information|Windows savings_amount 1
51
'Remote Start']"***Cadillac***5 seats***64070.0***XTS***3.0***"304 hp @ 6 seller_rating 1
body-color (Included and only available with (GAT) All Terrain HD Package.) (Included and only available with (GAT) All Terrain Package and mirror caps will be Black.)|Mirror caps seller_rating 1
inside rearview manual day/night seller_rating 1
interior with theater dimming seller_rating 1
power with driver express up and down and express down on all other windows|Visors seller_rating 1
power-adjustable for accelerator and brake|Climate control seller_rating 1
rear-window electric|Mirror seller_rating 1
52
cargo compartment sp_id 1
chrome|Glass sp_id 1
driver and front passenger illuminated vanity mirrors|Assist handle sp_id 1
frameless|Visors sp_id 1
inside rearview auto-dimming|Lighting sp_id 1
second row reading lamps integrated into dome light sp_id 2
tri-zone automatic with individual climate settings for driver sp_id 1
800 RPM"***19950.0***False***65*********private seller***False***"264 lb-ft @ 5 sp_id 1
Dropped column 60 vehicle_damage_category -too many NAs
63
'Bluetooth' wheelbase 1
GMC Smart Driver wheelbase 2
front reading lamps|Shift knob wheelbase 1
frontal and side impact for driver and front passenger driver inboard seat-mounted side-impact wheelbase1
interior with dome light wheelbase 1
rear child security|Teen Driver mode a configurable feature that lets you activate customizable vehicle settings associated with a key fob wheelbase 1
tachometer wheelbase 1
64
'Backup Camera' width 1
Marketplace and more (Limitations apply. Not transferable. Standard connectivity available to original purchaser for ten years from the date of initial vehicle purchase for model year 2018 or newer GMC vehicles. See onstar.com for details and further plan limitations. Connected Access does not include emergency or security services. Availability and additional services enables by Connected Access are subject to change.)|Rear Vision Camera|Door locks width 2
driver side knee and head curtain side-impact for all rows in outboard seating positions (Always use safety belts and the correct child restraints. Children are safer when properly secured in a rear seat in the appropriate child restraint. See the Owner's Manual for more information.)|Rear Vision Camera|Door locks width 1
driver- and passenger-side door switch with delayed entry feature width 1
leather-wrapped|Brake width 1
to encourage safe driving behavior. It can limit certain vehicle features width 1
voltage and oil pressure|Driver Information Center width 1
65
'CarPlay']"***Chevrolet***6 seats***42921.0***Silverado 2500HD***1.0******56995.0***False***4549***4.814814814814815***285608***Platinum Auto Group***False******A***Automatic***t78815***LT Crew Cab 4WD******4WD***Four-Wheel Drive***153.7 in***80.5 in***2019
" year
1
4.2-inch diagonal color display includes driver personalization year
1
and it prevents certain safety systems from being turned off. An in-vehicle report gives you information on your teen's driving habits and helps you to continue to coach your new driver|Tire pressure monitoring system|Horn year
1
cargo lights year
1
parking year
1
rear child security|Rear seat reminder|Teen Driver configurable feature that lets you activate customizable vehicle settings associated with a key fob year
2
rear child security|Teen Driver mode a configurable feature that lets you activate customizable vehicle settings associated with a key fob year
1
66

The results above are the basis for our processing.

Next, mapper.py.

#!/usr/bin/env python3
# Goal: impute NAs, then compute each row's products xtx and xty

import sys
import re
import numpy as np

# Split on double quotes first, then replace the commas inside quoted string fields with another symbol
def commakiller(abc):
    i = 1  # odd-indexed pieces are the quoted fields
    while i < len(abc):
        abc[i] = '"' + abc[i].replace(",", "***") + '"'
        i = i + 2
    return "".join(abc)

# Column number -> [mean, standard deviation], taken from the na.py results.
# price (column 48) is listed last so that it ends up as the response.
num_list = {'1':[34.886,10.803],'8':[18.973,11.637],'10':[76.046,108.879],'11':[50669.812,27375.947],'14':[2797.2851,1481.0219],'21':[39.726,10.028],'22':[17.608,6.767],'25':[62.351,16.542],'26':[24.642,13.020],'27':[233.693,105.138],'35':[183.341,47.804],'43':[5.184,1.683],'44':[29640.293,73067.936],'64':[74.195,19.148],'51':[4.211,0.713],'63':[109.164,29.529],'48':[29926.954,19568.718]}

# Convert a field to float, stripping a trailing unit where needed; raises ValueError otherwise
def to_float(value):
    try:
        return float(value)
    except ValueError:
        for unit in ("in", "seats", "gal"):
            if (unit in value[-(len(unit) + 3):]) and (re.findall("[a-z]" + unit, value) == []):
                return float(re.sub(unit, "", value, 1))
        raise

p = len(num_list)  # number of variables, response included
times = 1
for line in sys.stdin:
    abc = line.rstrip("\r\n").split('"')
    items = commakiller(abc).split(",")
    if times == 1:
        times += 1
        names = items  # column names
        continue
    times += 1
    if p <= 1:
        continue
    ## 1. NA fix
    # Reset for every row, and read the columns in num_list order;
    # the original kept appending across rows and did not place price last
    nafixed = []
    for key, (mu, sigma) in num_list.items():
        col = int(key)
        value = items[col] if col < len(items) else ""
        try:
            if value == "" or value == "--":
                raise ValueError("missing value")
            nafixed.append(to_float(value))
        except ValueError:
            # Missing or unconvertible values are treated as NA and imputed from N(mu, sigma)
            nafixed.append(np.random.normal(mu, sigma))
    ## 2. Compute XTX & XTY
    xty = []
    for i in range(p-1):
        xty.append(nafixed[i] * nafixed[p-1])  # the p-th value is the response, price (set in advance)
    print("*", ",".join(str(i) for i in xty))
    xtx = np.outer(nafixed[0:(p-1)], nafixed[0:(p-1)])  # outer product
    print(",".join(",".join(str(k) for k in qq) for qq in xtx.tolist()))

The mapper computes the xtx and xty for each row; on standard output the two are told apart by whether the line starts with "*".

Next, the reducer.
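Printing a p by p block for every input row produces a lot of intermediate output. One common variation, shown here as a sketch and not as what mapper.py does, is to accumulate the sums inside the mapper and emit them once at the end, keeping the same "*" marker convention. The function names, and the assumption that rows arrive as already-cleaned numeric lists with the response last, are mine.

```python
import numpy as np

def combine_rows(rows):
    """Accumulate X'X and X'y over rows whose last entry is the response."""
    xtx, xty = None, None
    for row in rows:
        row = np.asarray(row, dtype=float)
        x, y = row[:-1], row[-1]
        if xtx is None:
            xtx = np.zeros((len(x), len(x)))
            xty = np.zeros(len(x))
        xtx += np.outer(x, x)
        xty += x * y
    return xtx, xty

def emit(xtx, xty):
    """Format the sums as two lines: '* ' plus X'y, then the flattened X'X."""
    return ["* " + ",".join(str(v) for v in xty),
            ",".join(str(v) for v in xtx.ravel())]

if __name__ == "__main__":
    demo = [[1.0, 2.0, 3.0], [0.5, 1.5, 2.0]]  # two made-up rows
    for out in emit(*combine_rows(demo)):
        print(out)
```

Each mapper then writes two lines in total instead of two lines per row, which cuts the volume that has to be sorted and shipped to the reducer.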

#!/usr/bin/env python3
# Reducer: add up the per-row X'y and X'X contributions, then estimate beta
import sys
import numpy as np

xty = None  # running sum of X'y (length p)
xtx = None  # running sum of the flattened X'X (length p*p)
for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    if line[0] == "*":  # lines marked with "*" carry X'y
        row = np.array(list(map(float, line[1:].split(","))))
        xty = row if xty is None else xty + row
    else:  # everything else is a flattened X'X
        # Summing line by line does not depend on the order in which lines arrive;
        # the original version dropped the last X'X line and relied on line order
        row = np.array(list(map(float, line.split(","))))
        xtx = row if xtx is None else xtx + row

if xty is None or xtx is None:
    sys.exit("no input")

p = len(xty)
xtx = xtx.reshape(p, p)
print(xtx, '\n')
print(xty, '\n')
try:
    print(np.dot(np.linalg.inv(xtx), xty))
except np.linalg.LinAlgError:
    print("xtx is singular, no estimate computed", file=sys.stderr)

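For various reasons the distributed run did not go through (everyone was running jobs and the system was congested), so I only ran a single-machine test on 50,000 rows. The final result: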

[devel@emr-header-1 1026]$ cat result1.txt
[-1.37866211e+01 -3.97949219e+00 4.95000000e+01 -1.43750000e+01
-1.50000000e+01 -1.48950195e+00 -8.51562500e-01 -3.92000000e+02
-6.60400391e-01 5.33750000e+01 1.35742188e-01 7.20000000e+03
-9.96875000e+00 -4.97070312e-01 9.94726562e+00 -8.00781250e-02]

Finally, the main file.

#!/bin/bash

PWD=$(cd $(dirname $0); pwd)
cd $PWD 1> /dev/null 2>&1

hadoop fs -put used_cars_5w.csv /user/devel/2020210995wangyuanhe/reg

TASKNAME=lm-usedcars-hehe
HADOOP_INPUT_DIR=/user/devel/2020210995wangyuanhe/reg/used_cars_5w.csv
HADOOP_OUTPUT_DIR=/user/devel/2020210995wangyuanhe/output/1026output

echo $HADOOP_HOME
echo $HADOOP_INPUT_DIR
echo $HADOOP_OUTPUT_DIR

hadoop fs -rm -r $HADOOP_OUTPUT_DIR

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-3.1.3.jar \
-D mapred.job.name=$TASKNAME \
-D mapred.job.priority=NORMAL \
-D mapred.reduce.tasks=1 \
-file "$PWD/mapper.py" \
-file "$PWD/reducer.py" \
-input ${HADOOP_INPUT_DIR} \
-output ${HADOOP_OUTPUT_DIR} \
-mapper "$PWD/mapper.py" \
-reducer "$PWD/reducer.py"


if [ $? -ne 0 ]; then
echo 'error'
exit 1
fi
hadoop fs -touchz ${HADOOP_OUTPUT_DIR}/done

hadoop fs -ls $HADOOP_OUTPUT_DIR | cat

exit 0

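With that, our use of the distributed system is basically done.

IV. Summary

First, here is what the system looked like when it got stuck.

Classmates' code was poorly written, jobs would not run once submitted, the system ran out of resources... there were many reasons, but the bottom line is that it got stuck.

The code above left out one trick:

tail -n 99 used_cars_data.csv
takes just a portion of the rows (here, the last 99).

We can read a large file one record at a time, just like stdin, which lets us operate on the data row by row.

A slice is the amount of data read in at a time, such as 1024 KB or 2,000 rows. You can do this when you want to use an integrated tool, stopping when the while loop reaches the last row.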

But this is too slow, because every read has to search for its position again and the data cannot be kept in memory.
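One way around the repeated searching, sketched here under my own assumptions, is to keep a single iterator over the input and pull fixed-size slices from it, so each line is read exactly once. The helper name read_in_slices and the 2,000-row default are illustrative; the input can be any iterable of lines, such as an open file or sys.stdin.

```python
from itertools import islice

def read_in_slices(lines, slice_size=2000):
    """Yield lists of at most slice_size lines until the input is exhausted."""
    it = iter(lines)
    while True:
        chunk = list(islice(it, slice_size))
        if not chunk:  # reached the end of the input
            break
        yield chunk

if __name__ == "__main__":
    fake_file = ("line %d\n" % i for i in range(4500))  # stand-in for an open file
    for chunk in read_in_slices(fake_file):
        print(len(chunk))
```

Only one slice is held in memory at a time, and the loop stops by itself when a slice comes back empty.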

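SAS, however, can process large amounts of data in very limited memory by operating on one row at a time (it helps to have money to burn).

On a distributed system, the stdin form is best: standard input and standard output.

A new problem that came up during processing:
I have a very long data set, and I know that one of its columns is a dummy variable.

What needs counting: how many dummy variables are there, how many types (levels) does each have, and what share does each level take? How can this be detected automatically? (Frequency counts.)

Set up an "other" category, for example when I want to guarantee that the "other" category accounts for more than 20% of the rows.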
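A sketch of one possible reading of this rule: count the frequency of each level, then fold the rarest levels into an "other" category until it holds at least the requested share. The function name lump_rare_levels, the 20% default and the toy data are assumptions for illustration, not the instructor's program.

```python
from collections import Counter

def lump_rare_levels(values, min_other_share=0.2, other="other"):
    """Fold the rarest levels into `other` until it holds at least min_other_share."""
    counts = Counter(values)
    total = sum(counts.values())
    kept = dict(counts)
    other_count = 0
    # Walk through the levels from rarest to most common
    for level, count in sorted(counts.items(), key=lambda kv: kv[1]):
        if other_count / total >= min_other_share:
            break
        other_count += count
        del kept[level]
    shares = {level: count / total for level, count in kept.items()}
    if other_count:
        shares[other] = other_count / total
    return shares

if __name__ == "__main__":
    toy = ["a"] * 50 + ["b"] * 30 + ["c"] * 10 + ["d"] * 6 + ["e"] * 4
    print(lump_rare_levels(toy))
```

On a distributed system the Counter step is the part that would be spread over mappers; the lumping itself only needs the final frequency table.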

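Common problems in everyone's code:

1. Programming for Python rather than programming for MapReduce

Hadoop supports streaming, which turns the data into a stream. Every operation should happen inside the first loop, the one over the input lines! Only then can all the data be processed. If the work cannot be done at that indentation level, the code is not ready for a distributed system.

2. The logging module in Python was introduced for recording information. Don't print intermediate steps carelessly.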
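A minimal sketch of that advice with the standard logging module: diagnostics go to stderr, so stdout carries only the data lines that the next stage reads. The logger name "mapper" and the helper parse_row are hypothetical.

```python
import logging
import sys

# Log to stderr so that stdout carries only the data lines
logging.basicConfig(stream=sys.stderr, level=logging.INFO,
                    format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("mapper")  # hypothetical logger name

def parse_row(line):
    """Parse a comma-separated numeric row; return None and log if it fails."""
    try:
        return [float(v) for v in line.strip().split(",")]
    except ValueError:
        log.warning("skipping unparseable row: %r", line[:50])
        return None

if __name__ == "__main__":
    for raw in ["1,2.5\n", "oops,3\n"]:
        row = parse_row(raw)
        if row is not None:
            print(",".join(str(v) for v in row))
```

In a streaming job a stray print on stdout becomes part of the mapper output, which is why the warning is routed through the logger instead.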

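The instructor's program: find all the dummy variables in a large data set and extract the top levels of each.

It can be used for this.

How MapReduce plays out on a distributed system

The term heard most often: key-value pairs. Think of them in terms of standard input and output. Any row is split into two parts, a key and a value, and the correspondence between the two must be respected. In general, the key-value mapping has a standard form.

Map(key, value) -> list(key2, value2)

Say we have names indexed by student ID, and we want to count the strokes in each name. The input is (student ID, name) and the output is (student ID, stroke count).

How do key-value pairs show up in practice? Suppose a data set records the temperature of some region along with the device that recorded it. The location serves as the key and the corresponding temperature is the value. (CSV data just happens to use newlines as record separators.)

Suppose we pull out the data we care about (for example):

1950,0
1950,22
1950,-11

This is not yet a key-value pair: it is two values on one line, not a key and a value. After some computation, though, we get new key-value pairs!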
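A small sketch of that step: a map function that turns each "year,temperature" line into an explicit (key, value) pair and prints it tab-separated, the default key/value separator in Hadoop streaming. The function name map_temperature is hypothetical.

```python
def map_temperature(line):
    """Turn a 'year,temperature' line into a (key, value) pair."""
    year, temp = line.strip().split(",")
    return year, int(temp)

if __name__ == "__main__":
    for line in ["1950,0", "1950,22", "1950,-11"]:
        key, value = map_temperature(line)
        print("%s\t%s" % (key, value))  # tab-separated key and value
```

Here the key happens to be the first field, but the map function is free to build any key it likes from the contents of the line.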

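The key after the map step may have changed and no longer be the original key. A CSV file is really one record per newline, and cat likewise prints one record per line. If the data consists of key-value pairs that are not separated by newlines, you have to identify the keys yourself and write your own map function.

If the data is very large, we cannot hold it in memory.

It goes, for instance, to the swap partition, caches, page files... all of which live on disk. The map output needs to be sorted to some extent (mainly shuffled); after shuffling, the data is spread evenly and the load is balanced.

Finally, the path through the code should be:

Input -> Map -> shuffle -> reduce -> output

Every operation works on rows, in standard key-value form! Sorting can (and must) happen between map and reduce; this step needs disk, or a machine with a lot of memory, and involves frequent reads and writes.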
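The whole path can be imitated on one machine, which is handy for testing before submitting a job. The sketch below maps "year,temperature" lines to pairs, sorts them by key as a stand-in for the shuffle, and reduces each group to one value. Taking the maximum per year is my choice of reducer for illustration, and the 1949 rows are made up.

```python
from itertools import groupby
from operator import itemgetter

def run_pipeline(lines):
    """Map 'year,temp' lines to pairs, sort by key, then reduce to the max per year."""
    # Map: one (key, value) pair per input line
    pairs = []
    for line in lines:
        year, temp = line.strip().split(",")
        pairs.append((year, int(temp)))
    # Shuffle: sorting brings equal keys next to each other
    pairs.sort(key=itemgetter(0))
    # Reduce: one output pair per key
    return [(year, max(t for _, t in group))
            for year, group in groupby(pairs, key=itemgetter(0))]

if __name__ == "__main__":
    print(run_pipeline(["1950,0", "1949,111", "1950,22", "1950,-11", "1949,78"]))
```

The reducer relies on equal keys being adjacent, which is exactly what the sort between map and reduce provides.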

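MapReduce can also be split up: map only, with no reduce.

This matters a lot for data cleaning, where you only need to split, not merge. Break the whole into parts and every resource can get to work together (just kidding)!

You can also have many mappers and many reducers, splitting the data into many parts, as long as every part keeps its key-value correspondence. Or use many mappers with a single reducer; then the reduce task is light and the merging can happen there.

(To be continued)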


Permalink: https://konelane.github.io/2020/10/21/201015hadoop/
