分布式1119-Spark做些实战-以及为之后的实战铺路

目录

讲解作业,炒鸡复杂(其实也罢了)的air-delay数据清洗。

附带可以认识spark的强大之处。

一、作业集锦

上一次作业里提出,我们从kaggle上download了一份巨大的数据,一共有五百万行,但只有19列。老师希望大家能处理好这个数据,清洗到能够建模的地步。

我记录了一些汇报亮点,但是大部分都消散在那节课中了。

1.数据展示

1
2
air.groupby('Month').count().collect() ## collect可以看到所有列
# count默认不排序

累计求和百分比,计算占比较大的类,作为重要的变量
结合sql语句
spark命令结合sql

2.哑变量处理

陈曦同学:对老师的代码的理解:
students/2020211004chenxi/1112work

3.引入sql

周童给出了引入sql的写法:

1
2
3
4
sql_accumulated = f"""{参数}
trueselect *
truetruefrom ( select {col_name}
"""

4.分类变量的问题

注意,3分类只要两个变量,否则有共线性。有k个变量都有可能哑变量,总体应该drop掉2的k次方-k个。全都是0-1的话,就如此。计算机里叫onehot,统计就叫哑变量。

特别地,onehot存储的形式就会改变:[40,38]一共40个数,第38个位为1

5.小心使用toPandas

有一种储存方式是选择将sdf转化成pandas,toPandas对于count都是单机的操作。如果数据量不大,可以这样,因为这样会把数据存上master。

6.某种转化因子变量的方法(我的期末作业里是另一种方式)

用一个if else,把所有factor变成0-1,不过这样生成的矩阵就不是稀疏矩阵(spark里可以)

某同学构建了:是否延误-各种定性变量的不同取值情况列联表。

列联表,这看着像统计人干的。

——李丰老师

7.自己写个新函数get_sdummies

pandas里有个getdummy函数,于是老师写了一个sdummies,即get_sdummies。

输入spark的df,只能具体的哪一个dummycol做修改,保持累计比例,自动删除那一列,最后有一个dummy_info=[]

如何在spark上自己生成?

1,清理

2,多少行,对所有dummy列循环

如果info空,则创建一个新的,放入所有变量

3,spark里的数据框根据对应的dummycol做一个计数和排序。

对于所有count从上往下求和,分母是所有的行

1
Window.partitionby.orderby.rowsbetween(-sys.maxsize,0)

就能获取前%多少的dummy变量

cumperc是累计求和除以总行数。累计百分比只保留(filter)小于我的top值

于是就能找到topdummy,且不用算到结束,算到出结果就停止

二、成果展示

下面是李丰老师与cx同学代码的解析,太强了,点个赞!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
#! /usr/bin/env python3
'''
500000*19 去掉缺失值
转化成delay 判断是否延误 √
dlsa-project 参考
常用变量继承,增加dummy列(航空公司)
通过
500000*180
'''
###################################################开启
import findspark
findspark.init('/usr/lib/spark-current')
from pyspark.sql import SparkSession
##session封装了conf
spark = SparkSession.builder.appName("chenxi session").getOrCreate()
###################################################读入数据
#处理schema
from pyspark.sql.types import *
schema_sdf = StructType([
StructField('Year', IntegerType(), True),
StructField('Month', IntegerType(), True),
StructField('DayofMonth', IntegerType(), True),
StructField('DayOfWeek', IntegerType(), True),
StructField('DepTime', DoubleType(), True),
StructField('CRSDepTime', DoubleType(), True),
StructField('ArrTime', DoubleType(), True),
StructField('CRSArrTime', DoubleType(), True),
StructField('UniqueCarrier', StringType(), True),
StructField('FlightNum', StringType(), True),
StructField('TailNum', StringType(), True),
StructField('ActualElapsedTime', DoubleType(), True),
StructField('CRSElapsedTime', DoubleType(), True),
StructField('AirTime', DoubleType(), True),
StructField('ArrDelay', DoubleType(), True),
StructField('DepDelay', DoubleType(), True),
StructField('Origin', StringType(), True),
StructField('Dest', StringType(), True),
StructField('Distance', DoubleType(), True),
StructField('TaxiIn', DoubleType(), True),
StructField('TaxiOut', DoubleType(), True),
StructField('Cancelled', IntegerType(), True),
StructField('CancellationCode', StringType(), True),
StructField('Diverted', IntegerType(), True),
StructField('CarrierDelay', DoubleType(), True),
StructField('WeatherDelay', DoubleType(), True),
StructField('NASDelay', DoubleType(), True),
StructField('SecurityDelay', DoubleType(), True),
StructField('LateAircraftDelay', DoubleType(), True)
])

air00 = spark.read.options(header='true').schema(schema_sdf).csv("/data/airdelay_small.csv") #这是spark dataframe,不是pd的那个
use_columns=[
'ArrDelay', #double
'Year', #int
'Month', #int
'DayofMonth', #int
'DayOfWeek', #int
'DepTime', #double
'CRSDepTime', #double
'CRSArrTime', #double
'UniqueCarrier', #str
'ActualElapsedTime', #double',
'Origin',#str
'Dest', #str
'Distance' #double
]
air=air00.select(use_columns).na.drop()
#####################################################处理因变量########################
def delay(x):
if x>0:
return 1
else :
return 0

#参考 https://blog.csdn.net/wulishinian/article/details/105817409 spark中生成新列的各种方法,不能直接定义了
import pyspark.sql.functions as F
yfunc = F.udf(delay, StringType())#类似apply的使用,对该列每个数做个操作
air = air.withColumn("delay_or_not", yfunc("ArrDelay"))

#####################################################处理自变量#########################
#注意,pyspark好像识别不了空行和换行(在一行一行跑的时候)
####################先把一些列转成others
#使用老师给的代码统计哪些类别归入others
import pickle
import pandas as pd
import numpy as np
import os
from collections import Counter

def dummy_factors_counts(pdf, dummy_columns):
'''Function to count unique dummy factors for given dummy columns
pdf: pandas data frame
dummy_columns: list. Numeric or strings are both accepted.
return: dict same as dummy columns
'''
# Check if current argument is numeric or string
pdf_columns = pdf.columns # Fetch data frame header
dummy_columns_isint = all(isinstance(item, int) for item in dummy_columns)
#isinstance() 判断item是否是int
#all()用于判断给定的可迭代参数 iterable 中的所有元素是否都为 TRUE,如果是返回 True,否则返回 False
if dummy_columns_isint:
dummy_columns_names = [pdf_columns[i] for i in dummy_columns]
else:
dummy_columns_names = dummy_columns
factor_counts = {}
for i in dummy_columns_names:
factor_counts[i] = (pdf[i]).value_counts().to_dict()
#统计每一列里的不同值的个数
return factor_counts

###合并两个字典,并计算同一key的和(两个字典都有子字典)
def cumsum_dicts(dict1, dict2):
'''Merge two dictionaries and accumulate the sum for the same key where each dictionary
containing sub-dictionaries with elements and counts.
'''
# If only one dict is supplied, do nothing.
if len(dict1) == 0:
dict_new = dict2
elif len(dict2) == 0:
dict_new = dict1
else:
dict_new = {}
for i in dict1.keys():
dict_new[i] = dict(Counter(dict1[i]) + Counter(dict2[i]))
return dict_new
#counter是python计数器类,返回元素取值的字典,且按频数降序

def select_dummy_factors(dummy_dict, keep_top, replace_with, pickle_file):
'''Merge dummy key with frequency in the given file
dummy_dict: dummy information in a dictionary format
keep_top: list
'''
dummy_columns_name = list(dummy_dict)#本身词典里就是取值
# nobs = sum(dummy_dict[dummy_columns_name[1]].values())#没用到
factor_set = {} # The full dummy sets——————注意,是空字典,不是集合
factor_selected = {} # Used dummy sets
factor_dropped = {} # Dropped dummy sets
factor_selected_names = {} # Final revised factors
for i in range(len(dummy_columns_name)):
column_i = dummy_columns_name[i] #给出列来
factor_set[column_i] = list((dummy_dict[column_i]).keys())#第i列的可能取值表
factor_counts = list((dummy_dict[column_i]).values())#第i列的值的个数
factor_cumsum = np.cumsum(factor_counts)#累加
factor_cumpercent = factor_cumsum / factor_cumsum[-1]#累积比率
factor_selected_index = np.where(factor_cumpercent <= keep_top[i])#top这个是给定的
factor_dropped_index = np.where(factor_cumpercent > keep_top[i])
factor_selected[column_i] = list(
np.array(factor_set[column_i])[factor_selected_index])#一列有一堆可用取值
factor_dropped[column_i] = list(
np.array(factor_set[column_i])[factor_dropped_index])
# Replace dropped dummies with indicators like `others`
if len(factor_dropped_index[0]) == 0:
factor_new = []
else:
factor_new = [replace_with]
factor_new.extend(factor_selected[column_i])#extend列表末尾一次性追加另一个序列中的多个值
factor_selected_names[column_i] = [column_i + '_' + str(x) for x in factor_new]
dummy_info = {
'factor_set': factor_set,
'factor_selected': factor_selected,
'factor_dropped': factor_dropped,
'factor_selected_names': factor_selected_names}
pickle.dump(dummy_info, open(os.path.expanduser(pickle_file), 'wb'))
print("dummy_info saved in:\t" + pickle_file)
return dummy_info #返回了一个包含处理信息的字典

'''
pickle提供了一个简单的持久化功能。可以将对象以文件的形式存放在磁盘上
pickle.dump(obj, file[, protocol])
  序列化对象,并将结果数据流写入到文件对象中。参数protocol是序列化模式,默认值为0,表示以文本的形式序列化。protocol的值还可以是1或2,表示以二进制的形式序列化。
  pickle.load(file)
  反序列化对象。将文件中的数据解析为一个Python对象。

其中要注意的是,在load(file)的时候,要让python能够找到类的定义,否则会报错:
'''


def select_dummy_factors_from_file(file, header, dummy_columns, keep_top,
replace_with, pickle_file):
'''Memory constrained algorithm to select dummy factors from a large file
对大文件使用内存约束算法选择dummy,一个真正的分布式的算法
要输入文件路径、表头,要变成哑变量的列,保留的比例,
'''
dummy_dict = {}
buffer_num = 0
with open(file) as f:
while True:
buffer = f.readlines(
1024000) # Returns *at most* 1024000 bytes, maybe less
if len(buffer) == 0:
break
else:
buffer_list = [x.strip().split(",") for x in buffer]
buffer_num += 1
if ((buffer_num == 1) and (header is True)):
buffer_header = buffer_list[0]
buffer_starts = 1
else:
buffer_starts = 0
buffer_pdf = pd.DataFrame(buffer_list[buffer_starts:])
if header is True:
buffer_pdf.columns = buffer_header
dummy_dict_new = dummy_factors_counts(buffer_pdf,
dummy_columns)
dummy_dict = cumsum_dicts(dummy_dict, dummy_dict_new)
dummy_info = select_dummy_factors(dummy_dict, keep_top, replace_with,
pickle_file)
return (dummy_info)

'''
#####看一下情况确定要不要+others列
air.groupby('Month').count().show() #没有比例差异
air.groupby('DayofMonth').count().show(31) #没有比例差异
air.groupby('DayofWeek').count().show()#没有比例差异
air.groupby('Year').count().collect()#没有比例差异
air.groupby('UniqueCarrier').count().collect()#
air.groupby('UniqueCarrier').count().orderBy('count').show(50)
m=air.groupby('Origin').count()
#air.groupby('UniqueCarrier').count().rdd.foreach(print) 为什么打印不出来?
m.orderBy(-m('count')).collect()
m.sort(desc('count')).collect()
.collect()
'''

if __name__ == "__main__":

# User settings
file = os.path.expanduser("/home/devel/data/airdelay_small.csv")
header = True
dummy_columns = ['UniqueCarrier', 'Origin', 'Dest']
keep_top = [0.8, 0.8, 0.8]
replace_with = '00_OTHERS'
pickle_file = os.path.expanduser("/home/devel/students/2020211004chenxi/1112work/airdelay_dummy_info_latest.pkl")
dummy_info = select_dummy_factors_from_file(file, header, dummy_columns,keep_top, replace_with,pickle_file)

#得到应该记为others的列名
drop_uc=dummy_info['factor_dropped']['UniqueCarrier']
drop_o=dummy_info['factor_dropped']['Origin']
drop_d=dummy_info['factor_dropped']['Dest']
sle_uc=dummy_info['factor_selected']['UniqueCarrier']
sle_o=dummy_info['factor_selected']['Origin']
sle_d=dummy_info['factor_selected']['Dest']
########################使用字典把很小的类别更改成others
#生成字典
drop_all=drop_uc+drop_o+drop_d
sle_all=sle_uc+sle_o+sle_d
v=["others"]*len(drop_all)
dic=dict(zip(sle_all+drop_all,sle_all+v))
air11=air.na.replace(dic,1,'UniqueCarrier')
air11=air11.na.replace(dic,1,'Origin')
air11=air11.na.replace(dic,1,'Dest')
print('替换others后的数据\n')
air11.show(10)#更改后的结果
#报错好像是内存太小?
###########################################################独热编码################
'''sample
from pyspark.ml.feature import OneHotEncoder,StringIndexer
indexer = StringIndexer(inputCol='Month', outputCol='MonthIndex')
model = indexer.fit(air)
indexed = model.transform(air)
onehotencoder = OneHotEncoder(inputCol='MonthIndex', outputCol='MonthVec')
oncoded = onehotencoder.transform(indexed)
oncoded.show(5)
'''
#https://github.com/spark-in-action/first-edition/blob/master/ch08/python/ch08-listings.py
#先生成index
def indexStringColumns(df, cols):
from pyspark.ml.feature import StringIndexer
#variable newdf will be updated several times
newdf = df
for c in cols:
si = StringIndexer(inputCol=c, outputCol=c+"-num")
sm = si.fit(newdf)
newdf = sm.transform(newdf).drop(c)
newdf = newdf.withColumnRenamed(c+"-num", c)
return newdf

#根据index进行独热编码
def oneHotEncodeColumns(df, cols):
from pyspark.ml.feature import OneHotEncoder
newdf = df
for c in cols:
onehotenc = OneHotEncoder(inputCol=c, outputCol=c+"-onehot", dropLast=False)
newdf = onehotenc.transform(newdf).drop(c)
newdf = newdf.withColumnRenamed(c+"-onehot", c)
return newdf
cols=['Year','Month','DayofMonth','DayOfWeek','UniqueCarrier','Origin','Dest']
dff=indexStringColumns(air11,['Year','Month','DayofMonth','DayOfWeek','UniqueCarrier','Origin','Dest'])
dfhot = oneHotEncodeColumns(dff, cols)
print('编码后形式\n')
dfhot.take(2)
'''
py4j.protocol.Py4JJavaError: An error occurred while calling o1290.transform.
: java.lang.IllegalArgumentException: Field "DayofWeek" does not exist
Available fields: ArrDelay, DepTime, CRSDepTime, CRSArrTime, ActualElapsedTime, Distance, DayOfWeek, UniqueCarrier, Origin, Dest, Year, Month, DayofMonth
'''
#转换成能使用的形式
from pyspark.ml.feature import VectorAssembler#把所有字符型向量转换成数值型的后,可以合并,能直接在MLlib里用
va = VectorAssembler(outputCol="features", inputCols=dfhot.columns[0:])#取除最后一列外的所有值
lpoints = va.transform(dfhot).select("features")
print('最终结果\n')
lpoints.take(2)

'''
问题1:独热编码没有办法设定基准组,默认count中的最后一个是基准组,并不是数据最少的那个————如果替换了others,问题应该不是很大
问题2:要求是矩阵形式怎么办?——可以尝试vector分列,有合适的写法,但是没能实现
###尝试把一个vector分列,但是报错没有numpy?
#vectors = lpoints.select("features").rdd.map(lambda row: row.features)#PipelinedRDD
#疑问:退出后就没有以前的操作了,怎么办? 有没有类似screen或者保存工作空间的操作?
'''

三、稍微讲了点新课

1.需求

机器学习,可分成一些步骤:

Featurization-特征选取。

40个观测,并非特征越多,模型越好(要选,steplm之类)

下来变换清理数据。

0-1,降维……等等问题都涌现出来。

这些都叫“特征工程”,这个变量矩阵本身就是分布式的(spark)。

最后使用模型建模,得到想要的信息等等。

2.Pipelines(管道)

spark通过管道把不同的流程结合起来。

pipline来自于python机器学习模块中scikit-learn。

persistence(工具性)模型存储,加载。

utilities:线性代数,统计学等等。

旧版本的mllib中,基于rdd形式。现在逐渐转化成df(好处是能和sql结合)。这是由于sql很难被直接用在rdd形式上。

不过,根据上面所说的,其实可以将df转化(Transformer、Estimator)

Pipeline 提供了一个能够完整工作流的链(Parameter)

比如有个文本数据:

pipeline:

————1.Tokenizer————2.hashingTF———3.logistic regression

pipeline的流:

0.5Rawtext————1.5words————2.5feature vectors

(数字表示时间顺序)

课件以逻辑回归为例,regparam是惩罚。fit结果,不用规定x和y,因为默认的需要标记y为label,x标记为features。(机器学习的默认规则,那些函数的默认参数都是features,头大)

(未完待续,下节课讲文本处理)


本文链接: https://konelane.github.io/2020/11/19/201119hadoop/

-- EOF --

¥^¥请氦核牛饮一盒奶~suki