【Spark】Pyspark RDD,yoga11s
cpugpu芯片开发光刻机
智能终端演进
1
文件名:【Spark】Pyspark RDD,yoga11s
【Spark】Pyspark RDD
1. RDD算子1.1 文件 <=> rdd对象1.2 map、foreach、mapPartitions、foreach Partitions1.3 flatMap 先map再解除嵌套1.4 reduceByKey、reduce、fold 分组聚合1.5 mapValue 二元组value进行map操作1.6 groupBy、groupByKey1.7 filter、distinct 过滤筛选1.8 union 合并1.9 join、leftOuterJoin、rightOuterJoin 连接1.10 intersection 交集1.11 sortBy、sortByKey 排序1.12 countByKey 统计key出现次数1.13 first、take、top、count 取元素1.14 takeOrdered 排序取前n个1.15 takeSample 随机抽取 from pyspark import SparkConf, SparkContextconf = SparkConf().setAppName('test')\.setMaster('local[*]')sc = SparkContext(conf=conf) 1. RDD算子 1.1 文件 <=> rdd对象 # 集合对象 -> rdd (集合对象,分区数默认cpu核数)rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)print(rdd.glom().collect(), rdd.getNumPartitions())# [[1, 2], [3, 4], [5, 6]] 3# 文件 -> rddrdd = sc.textFile("./data.csv")print(rdd.collect())# ['1, 2, 3, 4, 5, 6']# rdd -> 文件rdd = sc.parallelize([1, 2, 3], 3)rdd.saveAsTextFile('./output')''' 生成output文件夹里面有按分区存储的多个文件''' 1.2 map、foreach、mapPartitions、foreach Partitions # map函数rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)rdd2 = rdd.map(lambda x: (x, 1))print(rdd2.map(lambda x: x[0] + x[1]).collect())# [2, 3, 4, 5, 6, 7] # foreach # 同map,但无返回值,且不改变原元素# 另有foreachPartitionsrdd = sc.parallelize([1, 2, 3])rdd.foreach(lambda x: print(x))# 1 3 2rdd.foreach(lambda x: -x)rdd.collect()# [1, 2, 3] # mapPartitions''' map 一次调出一个元素进行计算,io次数多mapPartitions 一次将一个分区的所有元素调出计算s'''rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)def func(iter):# 相较于map时间复杂度没优化,空间复杂度优化res = list()for it in iter:res.append(it * 10)return resrdd.mapPartitions(func).collect()# [10, 20, 30, 40, 50, 60] 1.3 flatMap 先map再解除嵌套 # flatMap 先执行map操作,再解除嵌套(降维 softmax前flatten)rdd = sc.textFile("./data.csv")print(rdd.collect())rdd.flatMap(lambda x: x.split(' ')).collect() 1.4 reduceByKey、reduce、fold 分组聚合 # reduceByKey 按照key分组,再对组内value完成聚合逻辑# key-value型(二元元组)rddrdd = sc.parallelize([('a', 1), ('a', 2), ('b', 3)])print(rdd.reduceByKey(lambda a, b: a + b).collect())# [('b', 3), ('a', 3)] # reduce 只聚合# 不返回rdd rdd = sc.parallelize(range(1, 3))print(rdd.reduce(lambda a, b: a + b))# 3print(sc.parallelize([('a', 1), ('a', 1)]).reduce(lambda a, b: a + b))# ('a', 1, 'a', 1) # fold 带初值的reducerdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)print(rdd.fold(10, lambda a, b: a + b))''' [[1, 2], [3, 4], [5, 6]]10 + 1 + 2 = 1310 + 3 + 4 = 1710 + 5 + 6 = 2110 + 13 + 17 + 21 = 61> 61''' 1.5 mapValue 二元组value进行map操作 # mapValues 对二元组内的value执行map操作, 没有分组操作rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 3)])rdd.mapValues(lambda x: x * 10).collect() 1.6 groupBy、groupByKey groupBy、groupByKey、reduceByKey区别 # groupBy 多元组皆可进行分组,可选择按哪一个值分组# reduceByKey 分组后(ByKey)对value进行聚合(reduce),二元组第一个值为keyrdd = sc.parallelize([('a', 1), ('a', 2), ('b', 3), ('b', 4)])# 分组 按第一个值rdd2 = rdd.groupBy(lambda x: x[0])print(rdd2.collect())'''返回的是迭代器,需进一步转换[('a', <pyspark.resultiterable.ResultIterable object at 0x106178370>), ('b', <pyspark.resultiterable.ResultIterable object at 0x1060abe50>)]'''rdd3 = rdd2.map(lambda x: (x[0], list(x[1])))print(rdd3.collect())''' [('a', [('a', 1), ('a', 2)]), ('b', [('b', 3), ('b', 4)])]''' # groupByKey# 自动按照key分组,分组后没有聚合操作,只允许二元组rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 3), ('b', 4)])rdd2 = rdd.groupByKey()rdd2.map(lambda x: (x[0], list(x[1]))).collect()# [('a', [1, 2]), ('b', [3, 4])] 1.7 filter、distinct 过滤筛选 # filter 过滤器# 过滤条件True则保留rdd = sc.parallelize([1, 2, 3, 4, 5])rdd.filter(lambda x: x > 3).collect()# [4, 5] # distinct 去重rdd = sc.parallelize([1, 1, 1, 1, 2, 3, 'a', 'a'])rdd.distinct().collect()# [[1, 'a', 2, 3] 1.8 union 合并 # union 合并两个rdd# 元素凑在一起,不考虑重复rdd_a = sc.parallelize([1, 1, 2, 3])rdd_b = sc.parallelize([2, 3, ('a', 1), ('b', 2)])rdd_a.union(rdd_b).collect()# [1, 1, 2, 3, 2, 3, ('a', 1), ('b', 2)] 1.9 join、leftOuterJoin、rightOuterJoin 连接 # join JOIN操作# 只用于二元组,相同key进行关联rdd_a = sc.parallelize([('a', 1), ('a', 2), ('b', 3)])rdd_b = sc.parallelize([('a', 1), ('b', 2), ('c', 3)])print(rdd_a.join(rdd_b).collect())''' 内连接 取交集[('b', (3, 2)), ('a', (1, 1)), ('a', (2, 1))]'''print(rdd_a.leftOuterJoin(rdd_b).collect())''' 左连接 取交集和左边全部[('b', (3, 2)), ('a', (1, 1)), ('a', (2, 1))]'''print(rdd_a.rightOuterJoin(rdd_b).collect())''' 右连接 取交集和右边全部[('b', (3, 2)), ('c', (None, 3)), ('a', (1, 1)), ('a', (2, 1))]''' 1.10 intersection 交集 # intersection 取交集# 区别于join,没有按key连接的操作rdd_a = sc.parallelize([('a', 1), ('a', 2), ('b', 3)])rdd_b = sc.parallelize([('a', 1), ('b', 2), ('c', 3)])rdd_a.intersection(rdd_b).collect()# [('a', 1)] 1.11 sortBy、sortByKey 排序 # sortBy# func 指定排序元素的方法# ascending True生序,False降序# numPartitions 用多少分区排序rdd = sc.parallelize([[1, 2, 3], [7, 8, 9],[4, 5, 6]])rdd.sortBy(lambda x: x[1], ascending=True, numPartitions=3).collect()''' [[1, 2, 3], [4, 5, 6], [7, 8, 9]]''' # sortByKey 针对kv型rdd''' ascending True升序,False降序numPartitions 全局有序要设为1,否则只能保证分区内有序keyfunc 对key进行处理,再排序'''rdd = sc.parallelize([('a', 1), ('c', 2), ('B', 3)])print(rdd.sortByKey(ascending=True, numPartitions=1).collect())''' [('B', 3), ('a', 1), ('c', 2)]'''print(rdd.sortByKey(ascending=True, numPartitions=1, keyfunc=lambda k: str(k).lower()).collect())''' [('a', 1), ('B', 3), ('c', 2)]''' 1.12 countByKey 统计key出现次数 # countByKey 统计key出现次数,可多元元组# 返回dict 不是rddrdd = sc.parallelize([('a', 1, 2), ('a'), ('b', 1)])rdd.countByKey() 1.13 first、take、top、count 取元素 # first 取第一个元素rdd = sc.parallelize([('a', 1, 2), ('a'), ('b', 1)])print(rdd.first() )# ('a', 1, 2)# take 取前n个元素print(rdd.take(2))# [('a', 1, 2), 'a']# count 返回元素个数print(rdd.count())# top 降序排序取前n个rdd = sc.parallelize([2, 4, 1, 6])print(rdd.top(2))# [6, 4] 1.14 takeOrdered 排序取前n个 # takeOrdered 排序取前n个''' param1: nparam2: func取数前更改元素,不更改元素本身,不传func,默认升序(取前n最小值)func = lambda x: -x 变为降序,取前n最大值,和top相同'''rdd = sc.parallelize([2, 4, 1, 6])rdd.takeOrdered(2) # [1, 2]rdd.takeOrdered(2, lambda x: -x) # [6, 4] 1.15 takeSample 随机抽取 # takeSample 随机抽取元素''' param1: True随机有放回抽样,Fasle不放回抽样 param2: 抽样个数param3: 随机数种子'''rdd = sc.parallelize([1])rdd.takeSample(True, 2)
同类推荐
-

-【UVA - 10382】Watering Grass(贪心,区间覆盖问题,思维),t57
查看 -

【Project3】技术总结,天堂网2015
查看 -

【Proteus仿真】【51单片机】贪吃蛇游戏,720p高清
查看 -

【Qt】QMainWidget中的栏和菜单,z60t(qt widget添加菜单栏)
查看 -

【Qt】顶层窗口和普通窗口区别以及用法,聚划算官网首页(qt 顶层窗口)
查看 -

【Qt之QTableWidget和QTreeWidget】树悬停、选择样式及表格表头和首行间隔线,泡泡网笔记本
查看 -

【Qt开发流程】之事件过滤器及sendEvent和postEvent,索爱w810c
查看 -

【Qt绘制仪表盘】,lg t320
查看 -

【ROS导航Navigation】五 - 导航相关的消息 - 地图 - 里程计 - 坐标变换 - 定位 - 目标点和路径规划 - 激光雷达 - 相机,el71
查看
控制面板
网站分类
搜索
最新留言
文章归档
网站收藏
友情链接