Data for this Lecture
Basic Column Methods
withColumn
Concatenation and Correlation
groupBy
We will use the imdb_top_1000 dataset in this lecture
You can download it from Blackboard
path = "/FileStore/tables/imdb_top_1000.csv" df = spark.read.csv(path, header=True, inferSchema=True) display(df)
There are three main methods to select particular columns:
→ df.column_name
→ df["column_name"]
→ df.select("column_name")
display(df.Series_Title)                       # returns a Column object
print(type(df.Series_Title))                   # dot notation fails if the column name contains spaces
display(df["Series_Title"])                    # returns a Column object
print(type(df["Series_Title"]))
display(df.select("Series_Title"))             # returns a DataFrame object
print(type(df.select("Series_Title")))
display(df["Series_Title", "Released_Year"])   # returns a DataFrame
print(type(df["Series_Title", "Released_Year"]))
display(df.select("Series_Title", "Released_Year"))
print(type(df.select("Series_Title", "Released_Year")))
The drop() method can be used to remove unnecessary columns
It returns a new DataFrame with all remaining columns
display(df.drop("Genre", "Meta_score"))
display(df)   # columns are back, why?
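The columns reappear because DataFrame transformations such as drop() return a new DataFrame rather than modifying df in place. To keep the change, assign the result to a variable, as in this minimal sketch (the name df_slim is just an illustrative choice):

df_slim = df.drop("Genre", "Meta_score")   # keep the returned DataFrame
display(df_slim)                           # Genre and Meta_score are gone here
display(df)                                # the original df is unchanged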
The withColumn() method can be used to:
→ Create a new column
→ Change an existing column
→ Convert a column datatype
→ Derive a new column from existing ones
from pyspark.sql.functions import lit

# inserts a new column with the same value in all rows
display(df.withColumn("Country", lit("USA")))
from pyspark.sql.functions import regexp_replace, col

display(df.withColumn("Gross", regexp_replace("Gross", ",", "")))
from pyspark.sql.functions import regexp_replace, col
from pyspark.sql.types import *   # imports IntegerType

display(df.describe("Gross"))
df.printSchema()

df = (df.withColumn("Gross", regexp_replace("Gross", ",", ""))
        .withColumn("Gross", col("Gross").cast(IntegerType())))
display(df)

# col explicitly returns a Column object
# df["Gross"] isn't always explicitly treated as a Column object
display(df.describe("Gross"))
df.printSchema()
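The last use case, deriving a new column from existing ones, follows the same pattern. Below is a minimal sketch that assumes Gross has already been cleaned and cast to an integer as above; the column name Gross_Millions is just an illustrative choice:

from pyspark.sql.functions import col, round

# derive a new column from the existing Gross column: gross earnings in millions, rounded to 2 decimals
display(df.withColumn("Gross_Millions", round(col("Gross") / 1000000, 2)))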
The withColumnRenamed() method changes the name of an existing column
display(df.withColumnRenamed("Series_Title", "Title")
          .withColumnRenamed("No_of_Votes", "Votes"))
The concat() function can be used to concatenate values from multiple columns
It works with string and array columns
It does not allow a delimiter to be specified
from pyspark.sql.functions import concat, col

df2 = spark.createDataFrame([([1, 2], [3, 4], [5]),
                             ([1, 2], [3], [4])],
                            ['a', 'b', 'c'])
display(df2)
display(df2.withColumn("allData", concat(col("a"), col("b"), col("c"))))
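concat() also works on ordinary string columns. Because it takes no separator argument, any separator has to be supplied explicitly as a literal column. A minimal sketch on the IMDB data (the column name Title_Genre is just an illustrative choice):

from pyspark.sql.functions import concat, col, lit

# join two string columns, passing " - " as a literal value since concat has no separator parameter
display(df.withColumn("Title_Genre", concat(col("Series_Title"), lit(" - "), col("Genre"))))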
The concat_ws() function can be used to concatenate strings from multiple columns
Its first argument must be a string separator
The remaining arguments are the column names
from pyspark.sql.functions import concat_ws

display(df.withColumn("Rating",
                      concat_ws(" : ", "IMDB_Rating", "Meta_score"))
          .drop("IMDB_Rating", "Meta_score"))
The corr() method can be used to obtain the correlation between two columns
It takes the column names as arguments
It does not work on StringType columns
print(df.stat.corr("IMDB_Rating", "Gross"))
print(df.stat.corr("IMDB_Rating", "Meta_score"))
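Because corr() only works on numeric columns, it helps to check the inferred column types before calling it. A quick sketch using the DataFrame's dtypes attribute:

# list (column name, data type) pairs to see which columns are numeric and safe to pass to corr()
print(df.dtypes)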
The groupBy() method can be used to categorize rows of data into groups
It returns a GroupedData object, which provides a number of aggregate functions: count, min, max, sum, avg, etc.
Note that the aggregate functions return DataFrames
It is possible to group data based on:
→ Single-column values
→ Multiple column values
from pyspark.sql.functions import sum, avg, max, count, min

display(df.groupBy("Released_Year").count())
display(df.groupBy("Meta_score").count())
display(df.groupBy("Meta_score", "Released_Year").count())
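count() is not the only aggregate available; GroupedData also exposes functions such as avg(), min(), max(), and sum(), which take the column(s) to aggregate as arguments. A minimal sketch, assuming Gross has already been cast to an integer as in the withColumn section:

display(df.groupBy("Released_Year").avg("Gross"))          # average gross per release year
display(df.groupBy("Released_Year").max("No_of_Votes"))    # highest vote count per release year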
It is possible to get multiple aggregates at once
This requires using the .agg() method
The resulting columns are named function(column)
This can be overridden with the .alias() method
display(df.groupBy("Released_Year")\ .agg(avg("Meta_score"), \ max("Meta_score"), \ min("Meta_score"))) display(df.groupBy("Released_Year")\ .agg(avg("Meta_score").alias("avg"), \ max("Meta_score").alias("max"), \ min("Meta_score").alias("min")))
display(df.groupBy("Released_Year")\ .agg(avg("Meta_score"), \ max("Meta_score"), \ min("Meta_score"))) display(df.groupBy("Released_Year")\ .agg(avg("Meta_score").alias("avg"), \ max("Meta_score").alias("max"), \ min("Meta_score").alias("min")))