#!/usr/bin/env python3
'''
Input: 500000 x 19
Drop missing values
Convert ArrDelay into a 0/1 delay indicator  (done)
Reference: the dlsa project
Carry over the commonly used variables, add dummy columns (carrier etc.)
Passed: final data is 500000 x 180
'''
################################################### Start Spark
import findspark
findspark.init('/usr/lib/spark-current')

from pyspark.sql import SparkSession  # a SparkSession wraps the SparkConf
spark = SparkSession.builder.appName("chenxi session").getOrCreate()

################################################### Read the data
# Define the schema
from pyspark.sql.types import *

schema_sdf = StructType([
    StructField('Year', IntegerType(), True),
    StructField('Month', IntegerType(), True),
    StructField('DayofMonth', IntegerType(), True),
    StructField('DayOfWeek', IntegerType(), True),
    StructField('DepTime', DoubleType(), True),
    StructField('CRSDepTime', DoubleType(), True),
    StructField('ArrTime', DoubleType(), True),
    StructField('CRSArrTime', DoubleType(), True),
    StructField('UniqueCarrier', StringType(), True),
    StructField('FlightNum', StringType(), True),
    StructField('TailNum', StringType(), True),
    StructField('ActualElapsedTime', DoubleType(), True),
    StructField('CRSElapsedTime', DoubleType(), True),
    StructField('AirTime', DoubleType(), True),
    StructField('ArrDelay', DoubleType(), True),
    StructField('DepDelay', DoubleType(), True),
    StructField('Origin', StringType(), True),
    StructField('Dest', StringType(), True),
    StructField('Distance', DoubleType(), True),
    StructField('TaxiIn', DoubleType(), True),
    StructField('TaxiOut', DoubleType(), True),
    StructField('Cancelled', IntegerType(), True),
    StructField('CancellationCode', StringType(), True),
    StructField('Diverted', IntegerType(), True),
    StructField('CarrierDelay', DoubleType(), True),
    StructField('WeatherDelay', DoubleType(), True),
    StructField('NASDelay', DoubleType(), True),
    StructField('SecurityDelay', DoubleType(), True),
    StructField('LateAircraftDelay', DoubleType(), True)
])
air00 = spark.read.options(header='true').schema(schema_sdf).csv("/data/airdelay_small.csv")
# This is a Spark DataFrame, not the pandas one

use_columns = [
    'ArrDelay',           # double
    'Year',               # int
    'Month',              # int
    'DayofMonth',         # int
    'DayOfWeek',          # int
    'DepTime',            # double
    'CRSDepTime',         # double
    'CRSArrTime',         # double
    'UniqueCarrier',      # str
    'ActualElapsedTime',  # double
    'Origin',             # str
    'Dest',               # str
    'Distance'            # double
]
air = air00.select(use_columns).na.drop()

##################################################### Response variable ########################
def delay(x):
    if x > 0:
        return 1
    else:
        return 0
# Reference: https://blog.csdn.net/wulishinian/article/details/105817409
# (the different ways to create a new column in Spark; a plain Python function cannot be applied directly)
import pyspark.sql.functions as F
yfunc = F.udf(delay, IntegerType())  # like pandas apply: applies delay() to every value of the column
# (the UDF return type must match what delay() actually returns, i.e. an integer;
#  otherwise Spark silently fills the column with nulls)
air = air.withColumn("delay_or_not", yfunc("ArrDelay"))
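# Alternative sketch (not used above): the same 0/1 column can also be built with the
# built-in when/otherwise, which avoids the Python UDF serialization overhead.
# air = air.withColumn("delay_or_not",
#                      F.when(F.col("ArrDelay") > 0, 1).otherwise(0))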
##################################################### Explanatory variables #########################
# Note: pyspark does not seem to handle blank lines / line breaks well when run line by line
#################### First collapse rare categories into "others"
# Use the instructor's code to decide which categories should be merged into "others"
import pickle
import pandas as pd
import numpy as np
import os
from collections import Counter
def dummy_factors_counts(pdf, dummy_columns):
    '''Function to count unique dummy factors for given dummy columns

    pdf: pandas data frame
    dummy_columns: list. Numeric or strings are both accepted.

    return: dict same as dummy columns
    '''
    # Check if current argument is numeric or string
    pdf_columns = pdf.columns  # Fetch data frame header

    dummy_columns_isint = all(isinstance(item, int) for item in dummy_columns)
    # isinstance() checks whether item is an int;
    # all() returns True only when every element of the iterable is truthy
    if dummy_columns_isint:
        dummy_columns_names = [pdf_columns[i] for i in dummy_columns]
    else:
        dummy_columns_names = dummy_columns

    factor_counts = {}
    for i in dummy_columns_names:
        factor_counts[i] = (pdf[i]).value_counts().to_dict()  # count the distinct values in each column

    return factor_counts
### Merge two dictionaries and add up the counts of identical keys (both dicts hold sub-dictionaries)
def cumsum_dicts(dict1, dict2):
    '''Merge two dictionaries and accumulate the sum for the same key
    where each dictionary contains sub-dictionaries with elements and counts.
    '''
    # If only one dict is supplied, do nothing.
    if len(dict1) == 0:
        dict_new = dict2
    elif len(dict2) == 0:
        dict_new = dict1
    else:
        dict_new = {}
        for i in dict1.keys():
            dict_new[i] = dict(Counter(dict1[i]) + Counter(dict2[i]))

    return dict_new
    # Counter is Python's counting class: it returns a dict-like object of value counts,
    # ordered by descending frequency
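# Tiny illustration of the Counter addition used above: counts of the same key accumulate.
assert dict(Counter({'WN': 2, 'AA': 1}) + Counter({'WN': 3})) == {'WN': 5, 'AA': 1}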
def select_dummy_factors(dummy_dict, keep_top, replace_with, pickle_file):
    '''Merge dummy key with frequency in the given file

    dummy_dict: dummy information in a dictionary format
    keep_top: list
    '''
    dummy_columns_name = list(dummy_dict)  # the dict keys are already the dummy column names
    # nobs = sum(dummy_dict[dummy_columns_name[1]].values())  # not used

    factor_set = {}             # The full dummy sets (note: an empty dict, not a set)
    factor_selected = {}        # Used dummy sets
    factor_dropped = {}         # Dropped dummy sets
    factor_selected_names = {}  # Final revised factors

    for i in range(len(dummy_columns_name)):
        column_i = dummy_columns_name[i]  # the current column

        factor_set[column_i] = list((dummy_dict[column_i]).keys())  # possible values of column i
        factor_counts = list((dummy_dict[column_i]).values())       # counts of those values
        factor_cumsum = np.cumsum(factor_counts)                    # cumulative counts
        factor_cumpercent = factor_cumsum / factor_cumsum[-1]       # cumulative proportion

        factor_selected_index = np.where(factor_cumpercent <= keep_top[i])  # keep_top is user-specified
        factor_dropped_index = np.where(factor_cumpercent > keep_top[i])

        factor_selected[column_i] = list(
            np.array(factor_set[column_i])[factor_selected_index])  # the retained values for this column
        factor_dropped[column_i] = list(
            np.array(factor_set[column_i])[factor_dropped_index])

        # Replace dropped dummies with indicators like `others`
        if len(factor_dropped_index[0]) == 0:
            factor_new = []
        else:
            factor_new = [replace_with]

        factor_new.extend(factor_selected[column_i])  # extend appends all elements of another sequence at once

        factor_selected_names[column_i] = [column_i + '_' + str(x) for x in factor_new]

    dummy_info = {
        'factor_set': factor_set,
        'factor_selected': factor_selected,
        'factor_dropped': factor_dropped,
        'factor_selected_names': factor_selected_names}

    pickle.dump(dummy_info, open(os.path.expanduser(pickle_file), 'wb'))
    print("dummy_info saved in:\t" + pickle_file)

    return dummy_info  # returns a dictionary with the dummy-processing information
'''
pickle provides a simple persistence mechanism: an object can be stored on disk as a file.

pickle.dump(obj, file[, protocol])
    Serializes the object and writes the resulting data stream to the file object.
    The protocol argument selects the serialization format: protocol 0 is the old
    ASCII (text) format, while higher protocols are binary; in Python 3 the default
    is pickle.DEFAULT_PROTOCOL, which is a binary protocol.

pickle.load(file)
    Deserializes: parses the data in the file back into a Python object.

Note that when calling load(file), Python must be able to find the definitions of the
classes of the stored objects, otherwise an error is raised.
'''
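'''sample: minimal pickle round trip for a dict like dummy_info
(the path below is only a throwaway example, not the file used later)
with open('/tmp/dummy_info_demo.pkl', 'wb') as f:
    pickle.dump({'UniqueCarrier': {'WN': 100, 'AA': 80}}, f)
with open('/tmp/dummy_info_demo.pkl', 'rb') as f:
    restored = pickle.load(f)
# restored == {'UniqueCarrier': {'WN': 100, 'AA': 80}}
'''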
def select_dummy_factors_from_file(file, header, dummy_columns, keep_top,
                                   replace_with, pickle_file):
    '''Memory constrained algorithm to select dummy factors from a large file

    Select the dummy levels from a big file under a memory constraint -- a genuinely
    out-of-core algorithm. Inputs: the file path, whether it has a header, the columns
    to turn into dummies, and the proportion of observations to keep per column.
    '''
    dummy_dict = {}
    buffer_num = 0
    with open(file) as f:
        while True:
            buffer = f.readlines(1024000)  # Returns *at most* 1024000 bytes, maybe less
            if len(buffer) == 0:
                break
            else:
                buffer_list = [x.strip().split(",") for x in buffer]
                buffer_num += 1
                if ((buffer_num == 1) and (header is True)):
                    buffer_header = buffer_list[0]
                    buffer_starts = 1
                else:
                    buffer_starts = 0

                buffer_pdf = pd.DataFrame(buffer_list[buffer_starts:])
                if header is True:
                    buffer_pdf.columns = buffer_header

                dummy_dict_new = dummy_factors_counts(buffer_pdf, dummy_columns)
                dummy_dict = cumsum_dicts(dummy_dict, dummy_dict_new)

    dummy_info = select_dummy_factors(dummy_dict, keep_top, replace_with, pickle_file)

    return dummy_info
'''
##### Inspect the category counts to decide whether an "others" level is needed
air.groupby('Month').count().show()         # no big differences in proportions
air.groupby('DayofMonth').count().show(31)  # no big differences in proportions
air.groupby('DayOfWeek').count().show()     # no big differences in proportions
air.groupby('Year').count().collect()       # no big differences in proportions
air.groupby('UniqueCarrier').count().collect()
air.groupby('UniqueCarrier').count().orderBy('count').show(50)
m = air.groupby('Origin').count()
# air.groupby('UniqueCarrier').count().rdd.foreach(print)
#   why does this print nothing? (foreach runs on the executors, so the output goes to
#   the executors' stdout, not to the driver)
m.orderBy(m['count'].desc()).collect()
m.sort(F.desc('count')).collect()
'''
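'''
A sketch of the same check with explicit proportions (assumes the `air` DataFrame above):
total = air.count()
(air.groupBy('Origin')
    .count()
    .withColumn('share', F.col('count') / F.lit(total))
    .orderBy(F.desc('count'))
    .show(50))
'''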
if __name__ == "__main__":
    # User settings
    file = os.path.expanduser("/home/devel/data/airdelay_small.csv")
    header = True
    dummy_columns = ['UniqueCarrier', 'Origin', 'Dest']
    keep_top = [0.8, 0.8, 0.8]
    replace_with = '00_OTHERS'
    pickle_file = os.path.expanduser(
        "/home/devel/students/2020211004chenxi/1112work/airdelay_dummy_info_latest.pkl")

    dummy_info = select_dummy_factors_from_file(file, header, dummy_columns,
                                                keep_top, replace_with, pickle_file)
# The category values that should be recoded as "others"
drop_uc = dummy_info['factor_dropped']['UniqueCarrier']
drop_o = dummy_info['factor_dropped']['Origin']
drop_d = dummy_info['factor_dropped']['Dest']
sle_uc = dummy_info['factor_selected']['UniqueCarrier']
sle_o = dummy_info['factor_selected']['Origin']
sle_d = dummy_info['factor_selected']['Dest']

######################## Recode the rare categories as "others" via a dictionary
# Build the replacement dictionary
drop_all = drop_uc + drop_o + drop_d
sle_all = sle_uc + sle_o + sle_d
v = ["others"] * len(drop_all)
dic = dict(zip(sle_all + drop_all, sle_all + v))

air11 = air.na.replace(dic, subset='UniqueCarrier')  # with a dict, no extra value argument is needed
air11 = air11.na.replace(dic, subset='Origin')
air11 = air11.na.replace(dic, subset='Dest')
print('Data after replacing rare categories with "others"\n')
air11.show(10)  # the recoded result
# The error seen here earlier was perhaps caused by too little memory?

########################################################### One-hot encoding ################
'''sample
from pyspark.ml.feature import OneHotEncoder, StringIndexer
indexer = StringIndexer(inputCol='Month', outputCol='MonthIndex')
model = indexer.fit(air)
indexed = model.transform(air)
onehotencoder = OneHotEncoder(inputCol='MonthIndex', outputCol='MonthVec')
oncoded = onehotencoder.transform(indexed)
oncoded.show(5)
'''
# https://github.com/spark-in-action/first-edition/blob/master/ch08/python/ch08-listings.py
# First build string indices
def indexStringColumns(df, cols):
    from pyspark.ml.feature import StringIndexer
    # variable newdf will be updated several times
    newdf = df
    for c in cols:
        si = StringIndexer(inputCol=c, outputCol=c + "-num")
        sm = si.fit(newdf)
        newdf = sm.transform(newdf).drop(c)
        newdf = newdf.withColumnRenamed(c + "-num", c)
    return newdf
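'''sample: what indexStringColumns does to a single column (toy data):
StringIndexer assigns 0.0 to the most frequent label, 1.0 to the next, and so on.
toy = spark.createDataFrame([("WN",), ("WN",), ("AA",)], ["UniqueCarrier"])
StringIndexer(inputCol="UniqueCarrier", outputCol="UniqueCarrier-num").fit(toy).transform(toy).show()
# WN (more frequent) -> 0.0, AA -> 1.0
'''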
# One-hot encode based on the indices
def oneHotEncodeColumns(df, cols):
    from pyspark.ml.feature import OneHotEncoder
    # Note: this direct transform works with the Spark 2.x OneHotEncoder;
    # in Spark 3.x OneHotEncoder is an Estimator and needs .fit() first (see the sample below)
    newdf = df
    for c in cols:
        onehotenc = OneHotEncoder(inputCol=c, outputCol=c + "-onehot", dropLast=False)
        newdf = onehotenc.transform(newdf).drop(c)
        newdf = newdf.withColumnRenamed(c + "-onehot", c)
    return newdf

cols = ['Year', 'Month', 'DayofMonth', 'DayOfWeek', 'UniqueCarrier', 'Origin', 'Dest']
dff = indexStringColumns(air11, cols)
dfhot = oneHotEncodeColumns(dff, cols)
print('Encoded form\n')
dfhot.take(2)
'''
Error from an earlier run (the column was requested as "DayofWeek" while the schema
has "DayOfWeek", with a capital O):
py4j.protocol.Py4JJavaError: An error occurred while calling o1290.transform.
: java.lang.IllegalArgumentException: Field "DayofWeek" does not exist
Available fields: ArrDelay, DepTime, CRSDepTime, CRSArrTime, ActualElapsedTime,
Distance, DayOfWeek, UniqueCarrier, Origin, Dest, Year, Month, DayofMonth
'''
# Assemble into a form MLlib can use directly: once all string columns have been turned into
# numeric vectors they can be merged into a single features column
from pyspark.ml.feature import VectorAssembler
feature_cols = [c for c in dfhot.columns if c not in ('ArrDelay', 'delay_or_not')]
# exclude the response columns from the design matrix
va = VectorAssembler(outputCol="features", inputCols=feature_cols)
lpoints = va.transform(dfhot).select("delay_or_not", "features")
print('Final result\n')
lpoints.take(2)
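'''sample: a sketch of the same helper for Spark 3.x, where OneHotEncoder is an Estimator
and must be fitted before transforming (the single-column inputCol/outputCol params still work):
def oneHotEncodeColumns3(df, cols):
    from pyspark.ml.feature import OneHotEncoder
    newdf = df
    for c in cols:
        enc = OneHotEncoder(inputCol=c, outputCol=c + "-onehot", dropLast=False)
        newdf = enc.fit(newdf).transform(newdf).drop(c)
        newdf = newdf.withColumnRenamed(c + "-onehot", c)
    return newdf
'''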
'''
Question 1: one-hot encoding gives no direct way to choose the reference group; by default
the last category in the count ordering acts as the baseline, which is not necessarily the
one with the fewest observations -- after replacing rare levels with "others" this should
not matter much.
Question 2: what if a plain matrix is required? -- splitting the vector column into separate
columns should work; there are suitable ways to write this, but it was not achieved here.

### Tried to split a vector column into separate columns, but got an error saying numpy is missing?
# vectors = lpoints.select("features").rdd.map(lambda row: row.features)  # a PipelinedRDD

Question: after quitting, all previous operations are lost -- is there something like screen,
or a way to save the workspace?
'''
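'''sample: a sketch for Question 2, assuming Spark >= 3.0 where
pyspark.ml.functions.vector_to_array is available -- turn the ml Vector column into an
array column, then expand it into one column per feature:
from pyspark.ml.functions import vector_to_array
arr = lpoints.withColumn("f", vector_to_array("features"))
n_features = len(arr.first()["f"])
wide = arr.select("delay_or_not", *[F.col("f")[i].alias("x" + str(i)) for i in range(n_features)])
wide.show(2)

For the last question: one common workaround is to persist intermediate DataFrames to disk,
e.g. dfhot.write.parquet("..."), and read them back with spark.read.parquet() in a later session.
'''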