BDA420


Columns and groupBy

Summary

Data for this Lecture

Basic Column Methods

withColumn

Concatenation and Correlation

groupBy

Data for this Lecture

Data for this Lecture

We will use the imdb_top_1000 dataset in this lecture

You can download it from Blackboard

Data for this Lecture

path = "/FileStore/tables/imdb_top_1000.csv"
# header=True: use the first row as column names; inferSchema=True: guess column types
df = spark.read.csv(path, header=True, inferSchema=True)
display(df)

Basic Column Methods

Selecting columns

There are three main methods to select particular columns:

  df.column_name

  df["column_name"]

  df.select("column_name")

Selecting Columns

display(df.Series_Title)     # returns a Column object
print(type(df.Series_Title)) # dot notation fails if the name contains spaces

display(df["Series_Title"]) # returns a Column object
print(type(df["Series_Title"]))

display(df.select("Series_Title")) # returns a DataFrame object
print(type(df.select("Series_Title")))

display(df["Series_Title","Released_Year"]) # returns a DataFrame
print(type(df["Series_Title","Released_Year"]))

display(df.select("Series_Title","Released_Year"))
print(type(df.select("Series_Title","Released_Year")))
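Dot notation only works when the column name is a valid Python identifier. A minimal sketch, assuming a hypothetical toy DataFrame whose second column name contains a space:

df3 = spark.createDataFrame([(1, "The Godfather")], ["id", "movie title"])

# df3.movie title would be a SyntaxError: dot notation cannot express the space
display(df3["movie title"])        # bracket notation still works
display(df3.select("movie title")) # so does select()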

Removing Columns

The drop() method can be used to remove unnecessary columns

It returns a new DataFrame containing the remaining columns

display(df.drop("Genre", "Meta_score"))
display(df) # columns are back, why?
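The answer: DataFrames are immutable. drop() returns a new DataFrame and leaves df untouched, so the result must be assigned to a variable to persist. A minimal sketch (df_slim is a hypothetical name):

df_slim = df.drop("Genre", "Meta_score") # capture the new DataFrame
display(df_slim)                         # Genre and Meta_score are gone
display(df)                              # the original still has every column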

withColumn

withColumn

The withColumn() method can be used to:

  Create a new column

  Change an existing column

  Convert a column datatype

  Derive a new column from existing ones (see the sketch after the conversion example below)

withColumn - adding

from pyspark.sql.functions import lit
#inserts new column with the same value in all rows
display(df.withColumn("Country", lit("USA")))

withColumn - Changing

from pyspark.sql.functions import regexp_replace

# strip the thousands separators from Gross, e.g. "1,234,567" -> "1234567"
display(df.withColumn("Gross", regexp_replace("Gross", ",", "")))

withColumn - Converting

from pyspark.sql.functions import regexp_replace, col
from pyspark.sql.types import IntegerType

display(df.describe("Gross"))
df.printSchema()

df = df.withColumn("Gross", regexp_replace("Gross",",", "")).\
        withColumn("Gross", col("Gross").cast(IntegerType()))
display(df)
# col("Gross") explicitly returns a Column object resolved against the current
# DataFrame; df["Gross"] is bound to the original df, which can behave
# unexpectedly inside chained transformations

display(df.describe("Gross"))
df.printSchema()
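The last use listed earlier, deriving a new column from existing ones, follows the same pattern. A minimal sketch, assuming Gross has already been cast to an integer as above (Gross_Millions is a hypothetical column name):

from pyspark.sql.functions import col, round

# derive the gross earnings in millions from the existing Gross column
display(df.withColumn("Gross_Millions", round(col("Gross") / 1000000, 1)))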

withColumnRenamed

withColumnRenamed() changes the name of an existing column

display(df.withColumnRenamed("Series_Title","Title").\
         withColumnRenamed("No_of_Votes","Votes"))

Concatenating and correlating columns

Concat

The concat() function can be used to concatenate values from multiple columns

It works with string and array columns

It does not accept a separator; use concat_ws() for that

Concat

from pyspark.sql.functions import concat, col

df2 = spark.createDataFrame([([1, 2], [3, 4], [5]), \
                            ([1, 2], [3], [4])], \
                             ['a', 'b', 'c'])
display(df2)

display(df2.withColumn("allData", concat(col("a"), col("b"), col("c"))))

Concat_WS

The concat_ws() function (concat with separator) can be used to concatenate strings from multiple columns

Its first argument must be a string separator

The remaining arguments are the column names

Concat_WS

from pyspark.sql.functions import concat_ws

display(df.withColumn("Rating", \
           concat_ws(" : ", "IMDB_Rating", "Meta_score"))\
          .drop("IMDB_Rating", "Meta_score"))

Corr

The corr() method, accessed through df.stat, computes the Pearson correlation between two columns

It takes the two column names as arguments

It does not work on string-type columns, which is why Gross was cast to an integer earlier

print(df.stat.corr("IMDB_Rating", "Gross"))
print(df.stat.corr("IMDB_Rating", "Meta_Score"))

groupBy

GroupBy

The groupBy() method can be used to categorize rows of data into groups

It returns a GroupedData object, which provides a number of aggregate methods:

    count, min, max, sum, avg, etc.

Note that the aggregate methods return DataFrames

GroupBy

It is possible to group data based on:

  The values of a single column

  The values of multiple columns

groupBy

from pyspark.sql.functions import sum, avg, max, count, min
# note: these imports shadow Python's built-in sum, max, and min

display(df.groupBy("Released_Year").count())
display(df.groupBy("Meta_score").count())
display(df.groupBy("Meta_score", "Released_Year").count())
  

GroupBy

It is possible to compute multiple aggregates at once

This requires the .agg() method

By default, the result columns are named function(column)

These names can be overridden with the .alias() method.

groupBy

display(df.groupBy("Released_Year")\
          .agg(avg("Meta_score"), \
               max("Meta_score"), \
               min("Meta_score")))
            
display(df.groupBy("Released_Year")\
          .agg(avg("Meta_score").alias("avg"), \
               max("Meta_score").alias("max"), \
               min("Meta_score").alias("min")))

Reading Material

withColumn Documentation

withColumn by Example

groupBy Documentation

groupedData Documentation

groupBy by Example

groupBy Tutorial