分布式1112-Spark简单功能补充介绍

目录

对spark进行一些补充介绍。

两个函数可以选定

Cache()

Persist()

主动将数据放到硬盘上-内存中

Data.persist()

旧数据放在内存里,新数据放硬盘,spark帮助中有persist 的水平(默认是全放进内存的cache,假设内存很大)

lineLengths.unpersist()

persist的参数水平

Memory_only

Memory_AND_DISK

Memory_AND_DISK_SER

DISK_ONLY

MEMORY_ONLY_2

内存不够则会报错

二、Spark的一些特别之处

1. 广播

分布式最重要的是“数据共享”使得不同节点之间能够用一个数据。

比如正态分布的概率密度函数,π就如此,共享,但不修改。

broadcastVar = sc.broadcast([1, 2, 3])

broadcastVar

Sparkcontext.broadcast(v)

广播出去的变量不能修改,否则会乱。

Broadcast.value可以查看广播出去的变量。

spark中accumulator可以用于累积,在MapReduce中:

accum = sc.accumulator(0)

sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x)) # foreach有点像R的

accum.value

2. 懒人模式

spark的懒人模式:

节约计算资源

x=3 , y = 4 , z = 5

提交任务

1.2x = ?

2.4y = ?

3.2x +4y = ?

或许前两步根本不用算,于是节约了资源。

spark使用DAG有向无环图,控制最后的结果本质上要求哪些计算。实现懒人模式。

分布式就是管理人和物的一种抽象。

—— 李丰老师

3. 线性代数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import numpy as np
import scipy.sparse as sps
from pyspark.mllib.linalg import Vectors

# Use a NumPy array as a dense vector.
dv1 = np.array([1.0, 0.0, 3.0])
# Use a Python list as a dense vector.
dv2 = [1.0, 0.0, 3.0]
# Create a SparseVector.
sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0])
# Use a single-column SciPy csc_matrix as a sparse vector.
sv2 = sps.csc_matrix((np.array([1.0, 3.0]),
np.array([0, 2]),
np.array([0, 2])), shape=(3, 1))

spark的线性代数模块很强大: pyspark.mllib.linalg

spark专门提供的标签工具

做分类模型时就可以使用特有变量了

1
2
3
4
5
6
7
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint

# Create a labeled point with a positive label and a dense feature vector.
pos = LabeledPoint(1.0, [1.0, 0.0, 3.0])
# Create a labeled point with a negative label and a sparse feature vector.
neg = LabeledPoint(0.0, SparseVector(3, [0, 2], [1.0, 3.0]))

允许导入各式各样的稀疏数据。有了local就有distributed。

如果要做个逻辑回归、线性回归,能否模拟一个线性回归的数据,将其存入矩阵。

小结

spark集成了很多hive的优秀理念。

对于常见的数据框的操作,归类成不同类型的函数。

依赖于sparkSQL,有别于传统的RDD形式,因为在RDD上可以更底层地操作数据(矩阵向量……)

sparkSQL与hive结合,可以把hive的sql查询直接应用在数据框上,也允许用户自己的函数。支持读取hdfs上的数据,是个通用的多接口的形式。

sparkRDD形式数据灵活,操作很琐碎。于是spark提供了自己的dataset集合。其实就是分布式数据的综合,通过java的jvm集成的(java虚拟机,用于快速计算的技术)

dataset的api只支持scala和Java。

故如果想在spark上处理数据集,需要自己学习Scala语言(最后一节有讲,敬请期待)。

spark上的dataframe是分布式的,其实就是表,不同列之间可以是不同的数据类型。

可以对dataframe做清洗和操作。可以通过hive的表来构建,可以通过现成的表来构建。

各种语言都支持。

三、战斗案例

目标,处理分布式的DataFrame,首先启动SC。

1
2
3
4
5
6
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Python Spark").getOrCreate()
spark # test if Spark session is created or not

sc = spark.sparkContext # make a spakr context for RDD
sc

spark有read函数

1
2
sdf = spark.read.csv("/opt/apps/ecm/service/spark/2.4.4/package/spark-2.4.4-bin-hadoop2.7/examples/src/main/resources/people.txt")
sdf.show() # Displays the content of the DataFrame to stdout

这个就是分布式上的表。

json格式可以直接读取(spark.read.json)

schema

属于读取表格时的表头信息。名字,类型,缺失等等。

经常需要手写表头,因为自动容易出错。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# We specify the correct schema by hand
schema_sdf = StructType([
StructField('Year', IntegerType(), True),
StructField('Month', IntegerType(), True),
StructField('DayofMonth', IntegerType(), True),
StructField('DayOfWeek', IntegerType(), True),
StructField('DepTime', DoubleType(), True),
StructField('CRSDepTime', DoubleType(), True),
StructField('ArrTime', DoubleType(), True),
StructField('CRSArrTime', DoubleType(), True),
StructField('UniqueCarrier', StringType(), True),
StructField('FlightNum', StringType(), True),
StructField('TailNum', StringType(), True),
StructField('ActualElapsedTime', DoubleType(), True),
StructField('CRSElapsedTime', DoubleType(), True),
StructField('AirTime', DoubleType(), True),
StructField('ArrDelay', DoubleType(), True),
StructField('DepDelay', DoubleType(), True),
StructField('Origin', StringType(), True),
StructField('Dest', StringType(), True),
StructField('Distance', DoubleType(), True),
StructField('TaxiIn', DoubleType(), True),
StructField('TaxiOut', DoubleType(), True),
StructField('Cancelled', IntegerType(), True),
StructField('CancellationCode', StringType(), True),
StructField('Diverted', IntegerType(), True),
StructField('CarrierDelay', DoubleType(), True),
StructField('WeatherDelay', DoubleType(), True),
StructField('NASDelay', DoubleType(), True),
StructField('SecurityDelay', DoubleType(), True),
StructField('LateAircraftDelay', DoubleType(), True)
])

oridat = spark.read.options(header='true').schema(schema_sdf).csv("/data/airdelay_small.csv") # spark dataframe

air.describe().show()就相当于简单的描述统计

air.describe([‘ArrDelay’]).show() 看具体的列

Data.collect()可以避开懒人模式直接计算

四、作业 air-delay数据清洗

五百万 * 十九列

转化成新的df,一类是0、1,告诉大家有没有延误

arrivedelay设置成0-1变量。现有的列可以使用:里程,是不是US(0-1),是不是AA(0-1),诸如此类,相当于把原变量修改成哑变量了。

最后得到————>五百万 * 一百八十列

不要超过这么多列。(在老师github上/dlsa/blob/master/projects/logistic…)

作业,整理好这个数据。

下节课对这个数据做逻辑回归。


本文链接: https://konelane.github.io/2020/11/12/201112hadoop/

-- EOF --

¥^¥请氦核牛饮一盒奶~suki