下载安卓APP箭头
箭头给我发消息

客服QQ:3315713922

对比Pandas,学习PySpark大数据处理

作者:匿名     来源: 大数据点击数:686发布时间: 2022-12-02 17:26:54

标签: pandasPySpark大数据

  在这篇文章中,我们将对比用于基本数据操作任务的 pandas 代码片段和它们在 PySpark 中的对应功能的代码片段。利用 pandas 数据操作技能来学习 PySpark 。

  ​常有优势的技能。如果你已经熟悉运用 Python 和 pandas 做常规数据处理,并且想学习处理大数据,那么熟悉 PySpark,并将用其做数据处理,将会是一个不错的开始。PySpark是一种适用于 Apache Spark 的 Python API,一种流行的大数据开源数据处理引擎。

  本文的前提是,假设读者在 Python 中熟练使用 pandas 操作数据。

  数据集

  从导包开始。在 PySpark 中,需要创建一个 Spark 会话 SparkSession。创建 Spark 会话后,可以从以下位置访问 Spark Web 用户界面 (Web UI):http://localhost:4040/。下面定义的应用程序名称appName为“PyDataStudio”,将显示为 Web UI 右上角的应用程序名称。本文将不会使用 Web UI,但是,如果您有兴趣了解更多信息,请查看官方文档[1]。

  复制

  1.  import pandas as pd

  2.  from pyspark.sql import SparkSession

  3.  spark = SparkSession.builder.appName('PyDataStudio').getOrCreate()

  我们将在这篇文章中使用企鹅数据集[2]。使用下面的脚本,我们将penguins.csv数据的修改版本保存在工作目录中。

  复制

  1.  from seaborn import load_dataset

  2.  (load_dataset('penguins')

  3.  .drop(columns=['bill_length_mm', 'bill_depth_mm'])

  4.  .rename(columns={'flipper_length_mm': 'flipper',

  5.  'body_mass_g': 'mass'})

  6.  .to_csv('penguins.csv', index=False))

  看一下两个库之间的语法比较。为了简洁,我们仅保留显示 PySpark 输出。

  基本使用

  两个库的数据对象都称为 DataFrame:pandas DataFrame vs PySpark DataFrame。

  导入数据并检查其形状

  复制

  1.  # pandas

  2.  df = pd.read_csv('penguins.csv')

  3.  df.shape

  4.  # PySpark

  5.  df = spark.read.csv('penguins.csv', header=True, inferSchema=True)

  6.  df.count(), len(df.columns)

  复制

  1.  (344, 5)

  使用 PySpark 导入数据时,指定header=True​数据类型用第一行作标题,并设置inferSchema=True​。可以尝试不使用这些选项导入并检查 DataFrame 及其数据类型(类似于 pandas 使用df.dtype 检查 PySpark DataFrames 的数据类型)。

  与 pandas DataFrame 不同,PySpark DataFrame 没有像.shape可以直接查看数据的形状。所以要得到数据形状,我们分别求行数和列数。

  检查有关数据的高级信息

  复制

  1.  # pandas

  2.  df.info()

  3.  # PySpark

  4.  df.printSchema()

  复制

  1.  root

  2.  |-- species: string (nullable = true)

  3.  |-- island: string (nullable = true)

  4.  |-- flipper: double (nullable = true)

  5.  |-- mass: double (nullable = true)

  6.  |-- sex: string (nullable = true)

  虽然此方法不会提供与df.info()相同的输出,但它是最接近的内置方法之一。

  查看数据的前几行

  复制

  1.  # pandas

  2.  df.head()

  3.  # PySpark

  4.  df.show(5)

  复制

  1.  +-------+---------+-------+------+------+

  2.  |species| island|flipper| mass| sex|

  3.  +-------+---------+-------+------+------+

  4.  | Adelie|Torgersen| 181.0|3750.0| Male|

  5.  | Adelie|Torgersen| 186.0|3800.0|Female|

  6.  | Adelie|Torgersen| 195.0|3250.0|Female|

  7.  | Adelie|Torgersen| null| null| null|

  8.  | Adelie|Torgersen| 193.0|3450.0|Female|

  9.  +-------+---------+-------+------+------+

  10.  only showing top 5 rows

  默认情况下,df.show()​默认显示前 20 行。PySpark DataFrame 实际上有一个名为.head()​的方法,可以查看前几行的数据,并以row对象形式打印出。运行df.head(5)提供如下输出:

  复制

  1.  df.head(5)

  .show()​方法的输出更简洁,因此在查看数据集的top行时用.show()。

  选择列

  复制

  1.  # pandas

  2.  df[['island', 'mass']].head(3)

  3.  # PySpark

  4.  df[['island', 'mass']].show(3)

  复制

  1.  +---------+------+

  2.  | island| mass|

  3.  +---------+------+

  4.  |Torgersen|3750.0|

  5.  |Torgersen|3800.0|

  6.  |Torgersen|3250.0|

  7.  +---------+------+

  8.  only showing top 3 rows

  虽然可以在这里使用的是类似于 pandas 的语法,而在 PySpark 中默认使用如下代码片段所示的方法选择列:

  复制

  1.  df.select('island', 'mass').show(3)

  2.  df.select(['island', 'mass']).show(3)

  过滤

  根据条件过滤数据

  复制

  1.  # pandas

  2.  df[df['species']=='Gentoo'].head()

  3.  # PySpark

  4.  df[df['species']=='Gentoo'].show(5)

  复制

  1.  +-------+------+-------+------+------+

  2.  |species|island|flipper| mass| sex|

  3.  +-------+------+-------+------+------+

  4.  | Gentoo|Biscoe| 211.0|4500.0|Female|

  5.  | Gentoo|Biscoe| 230.0|5700.0| Male|

  6.  | Gentoo|Biscoe| 210.0|4450.0|Female|

  7.  | Gentoo|Biscoe| 218.0|5700.0| Male|

  8.  | Gentoo|Biscoe| 215.0|5400.0| Male|

  9.  +-------+------+-------+------+------+

  10.  only showing top 5 rows

  两个库之间的语法几乎相同。要获得相同的输出,还可以使用:

  复制

  1.  df.filter(df['species']=='Gentoo').show(5) df.filter("species=='Gentoo'").show(5) 

  下面显示了一些常见的过滤器比较:

  复制

  1.  # pandas

  2.  df[df['species'].isin(['Chinstrap', 'Gentoo'])].head()

  3.  df[df['species'].str.match('G.')] .head()

  4.  df[df['flipper'].between(225,229)].head()

  5.  df[df['mass'].isnull()].head()1b df.loc[df['species']!='Gentoo'].head()

  6.  df[~df['species'].isin(['Chinstrap', 'Gentoo'])].head()

  7.  df[-df['species'].str.match('G.')].head()

  8.  df[~df['flipper'].between(225,229)].head()

  9.  df[df['mass'].notnull()].head()6 df[(df['mass']<3400) & (df['sex']=='Male')].head()

  10.  df[(df['mass']<3400) | (df['sex']=='Male')].head()

      11.

  12.  # PySpark

  13.  df[df['species'].isin(['Chinstrap', 'Gentoo'])].show(5)

  14.  df[df['species'].rlike('G.')].show(5)

  15.  df[df['flipper'].between(225,229)].show(5)

  16.  df[df['mass'].isNull()].show(5)1b df[df['species']!='Gentoo'].show(5)

  17.  df[~df['species'].isin(['Chinstrap', 'Gentoo'])].show(5)

  18.  df[~df['species'].rlike('G.')].show(5)

  19.  df[~df['flipper'].between(225,229)].show(5)

  20.  df[df['mass'].isNotNull()].show(5)

  21.  df[(df['mass']<3400) & (df['sex']=='Male')].show(5)

  22.  df[(df['mass']<3400) |(df[ 'sex']=='Male')].show(5)

  虽然~和-​在 pandas 中都可以作为否定,但在 PySpark 中仅有~能作为有效的否定。

  排序

  对数据进行排序并检查mass最小的 5 行:

  复制

  1.  # pandas

  2.  df.nsmallest(5, 'mass')

  3.  # PySpark

  4.  df[df['mass'].isNotNull()].orderBy('mass').show(5)

  复制

  1.  +---------+------+-------+------+------+

  2.  | species|island|flipper| mass| sex|

  3.  +---------+------+-------+------+------+

  4.  |Chinstrap| Dream| 192.0|2700.0|Female|

  5.  | Adelie|Biscoe| 184.0|2850.0|Female|

  6.  | Adelie|Biscoe| 181.0|2850.0|Female|

  7.  | Adelie|Biscoe| 187.0|2900.0|Female|

  8.  | Adelie| Dream| 178.0|2900.0|Female|

  9.  +---------+------+-------+------+------+

  10.  only showing top 5 rows

  Pandas的.nsmallest()和.nlargest()​方法会自动排除缺失值。而 PySpark 没有等效的方法。为了获得相同的输出,首先过滤掉缺失mass的行,然后对数据进行排序并查看前 5 行。如果没有删除数据,可以简写为:

  复制

  1.  df.orderBy(‘mass’).show(5).sort()

  代替的另一种排序方式.orderBy():

  复制

  1.  # pandas

  2.  df.nlargest(5, 'mass')

  3.  # PySpark

  4.  df.sort('mass', ascending=False).show(5)1

  复制

  1.  +-------+------+-------+------+----+

  2.  |species|island|flipper| mass| sex|

  3.  +-------+------+-------+------+----+

  4.  | Gentoo|Biscoe| 221.0|6300.0|Male|

  5.  | Gentoo|Biscoe| 230.0|6050.0|Male|

  6.  | Gentoo|Biscoe| 220.0|6000.0|Male|

  7.  | Gentoo|Biscoe| 222.0|6000.0|Male|

  8.  | Gentoo|Biscoe| 229.0|5950.0|Male|

  9.  +-------+------+-------+------+----+

  10.  only showing top 5 rows

  这些语法的变体也是等效的:

  复制

  1.  df.sort(df['mass'].desc()).show(5)

  2.  df.orderBy('mass', ascending=False).show(5)

  3.  df.orderBy(df['mass'].desc( )).show(5)

  按多列排序,如下所示:

  复制

  1.  # pandas

  2.  df.sort_values(['mass', 'flipper'], ascending=False).head()

  3.  # PySpark

  4.  df.orderBy(['mass', 'flipper'], ascending=False).show(5)

  复制

  1.  +-------+------+-------+------+----+

  2.  |species|island|flipper| mass| sex|

  3.  +-------+------+-------+------+----+

  4.  | Gentoo|Biscoe| 221.0|6300.0|Male|

  5.  | Gentoo|Biscoe| 230.0|6050.0|Male|

  6.  | Gentoo|Biscoe| 222.0|6000.0|Male|

  7.  | Gentoo|Biscoe| 220.0|6000.0|Male|

  8.  | Gentoo|Biscoe| 229.0|5950.0|Male|

  9.  +-------+------+-------+------+----+

  10.  only showing top 5 rows

  在 PySpark 中,可以在将所有列分别传参数,而不需要写成列表的形式

  复制

  1.  df.orderBy('mass', 'flipper', ascending=False).show(5)1

  要按多列但按不同方向排序:

  复制

  1.  # pandas

  2.  df.sort_values(['mass', 'flipper'], ascending=[True, False]).head()

  3.  # PySpark

  4.  df[df['mass'].isNotNull()]\\

  5.  .sort('mass', 'flipper', ascending=[True, False]).show(5)

  复制

  1.  +---------+---------+-------+------+------+

  2.  | species| island|flipper| mass| sex|

  3.  +---------+---------+-------+------+------+

  4.  |Chinstrap| Dream| 192.0|2700.0|Female|

  5.  | Adelie| Biscoe| 184.0|2850.0|Female|

  6.  | Adelie| Biscoe| 181.0|2850.0|Female|

  7.  | Adelie|Torgersen| 188.0|2900.0|Female|

  8.  | Adelie| Biscoe| 187.0|2900.0|Female|

  9.  +---------+---------+-------+------+------+

  10.  only showing top 5 rows

  pyspark的另一种写法

  复制

  1.  df[df['mass'].isNotNull()]\\

  2.  .orderBy(df['mass'].asc(), df['flipper'].desc()).show(5)

  聚合

  现在,看几个聚合数据的示例。

  简单的聚合

  二者方法类似:

  复制

  1.  # pandas

  2.  df.agg({'flipper': 'mean'})

  3.  # PySpark

  4.  df.agg({'flipper': 'mean'}).show()1

  复制

  1.  +------------------+

  2.  | avg(flipper)|

  3.  +------------------+

  4.  |200.91520467836258|

  5.  +------------------+

  多个聚合

  需要采用不同的方法:

  复制

  1.  # pandas

  2.  df.agg({'flipper': ['min', 'max']})

  3.  # PySpark

  4.  from pyspark.sql import functions as F

  5.  df.agg(F.min('flipper'), F.max('flipper')).show()1

  复制

  1.  +------------+------------+

  2.  |min(flipper)|max(flipper)|

  3.  +------------+------------+

  4.  | 172.0| 231.0|

  5.  +------------+------------+

  获取唯一值

  复制

  1.  # pandas

  2.  df['species'].unique()

  3.  # PySpark

  4.  df.select('species').distinct().show()1

  复制

  1.  +---------+

  2.  | species|

  3.  +---------+

  4.  | Gentoo|

  5.  | Adelie|

  6.  |Chinstrap|

  7.  +---------+

  要在列中获取多个不同的值:

  复制

  1.  # pandas

  2.  df['species'].nunique()

  3.  # PySpark

  4.  df.select('species').distinct().count()1

  按组聚合

  到目前为止,PySpark 使用 camelCase 驼峰命名法来表示方法和函数。.groupBy()这也是如此。这是一个简单的按聚合分组的示例:

  复制

  1.  # pandas

  2.  df.groupby('species')['mass'].mean()

  3.  # PySpark

  4.  df.groupBy('species').agg({'mass': 'mean'}).show()1

  复制

  1.  +---------+------------------+

  2.  | species| avg(mass)|

  3.  +---------+------------------+

  4.  | Gentoo| 5076.016260162602|

  5.  | Adelie| 3700.662251655629|

  6.  |Chinstrap|3733.0882352941176|

  7.  +---------+------------------+1

  这是一个聚合多个选定列的示例:

  复制

  1.  # pandas

  2.  df.groupby('species').agg({'flipper': 'sum',

  3.  'mass': 'mean'})

  4.  # PySpark

  5.  df.groupBy('species').agg({'flipper': 'sum',

  6.   'mass': 'mean'}).show()

  复制

  1.  +---------+------------+--------------+

  2.  | species|sum(flipper)| avg(mass)|

  3.  +---------+------------+--------------+

  4.  | Gentoo| 26714.0| 5076.01626016|

  5.  | Adelie| 28683.0| 3700.66225165|

  6.  |Chinstrap| 13316.0|3733.088235294|

  7.  +---------+------------+--------------+

  如果我们不指定列,它将显示所有数字列的统计信息:

  复制

  1.  # pandas

  2.  df.groupby('species').mean()

  3.  # PySpark

  4.  df.groupBy('species').mean().show()

  复制

  1.  +---------+--------------+--------------+

  2.  | species| avg(flipper)| avg(mass)|

  3.  +---------+--------------+--------------+

  4.  | Gentoo| 217.186991869| 5076.01626016|

  5.  | Adelie|189.9536423841| 3700.66225165|

  6.  |Chinstrap| 195.823529411|3733.088235294|

  7.  +---------+--------------+--------------+1

  也可以将.mean()​替换为.avg()​,即可以使用df.groupBy(‘species’).avg().show()。

  以上就是本文的所有内容,希望能够帮到你对 PySpark 语法有所了解。我们注意到,在基本任务方面,这两个库之间有很多相似之处。这使得在熟悉 pandas 工作知识的人更容易开始使用 PySpark,在处理小数据分析与挖掘后,遇到大数据分析与挖掘时,也能够轻松面对。

  参考资料

  [1]官方文档: https://spark.apache.org/docs/latest/web-ui.html

  [2]企鹅数据集: https://github.com/mwaskom/seaborn-data/blob/master/penguins.csv

  来源: 数据STUDIO

    >>>>>>点击进入大数据专题

赞(11)
踩(0)
分享到:
华为认证网络工程师 HCIE直播课视频教程