Data for this Lecture
Basic Column Methods
withColumn
Concatenation and Correlation
groupBy
We will use the imdb_top_1000 dataset in this lecture
You can download it from Blackboard
path = "/FileStore/tables/imdb_top_1000.csv"
df = spark.read.csv(path, header=True, inferSchema=True)
display(df)
There are three main methods to select particular columns:
→ df.column_name
→ df["column_name"]
→ df.select("column_name")
display(df.Series_Title) # returns a Column object
print(type(df.Series_Title)) # dot notation fails for column names containing spaces
display(df["Series_Title"]) # returns a Column object
print(type(df["Series_Title"]))
display(df.select("Series_Title")) # returns a DataFrame object
print(type(df.select("Series_Title")))
display(df["Series_Title","Released_Year"]) # returns a DataFrame
print(type(df["Series_Title","Released_Year"]))
display(df.select("Series_Title","Released_Year"))
print(type(df.select("Series_Title","Released_Year")))
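As noted above, dot notation cannot be used when a column name contains a space; a minimal sketch, using a hypothetical renamed column purely for illustration:
# rename Released_Year to a name containing a space, just to demonstrate the point
df_space = df.withColumnRenamed("Released_Year", "Release Year")
# df_space.Release Year would be a syntax error, but bracket notation and select() still work
display(df_space["Release Year"])
display(df_space.select("Release Year"))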
The drop() method can be used to remove unnecessary columns
It returns a new DataFrame with all remaining columns
display(df.drop("Genre", "Meta_score"))
display(df) # columns are back, why?
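The columns are back because drop() returns a new DataFrame instead of modifying df in place; a minimal sketch of keeping the result (df_slim is just an illustrative name):
# assign the returned DataFrame to keep the dropped columns out
df_slim = df.drop("Genre", "Meta_score")
display(df_slim) # no Genre or Meta_score here
display(df)      # the original df is unchanged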
The withColumn() method can be used to:
→ Create a new column
→ Change an existing column
→ Convert a column datatype
→ Derive a new column from existing ones (see the sketch after the cast example below)
from pyspark.sql.functions import lit
# inserts a new column with the same value in all rows
display(df.withColumn("Country", lit("USA")))
from pyspark.sql.functions import regexp_replace, col
display(df.withColumn("Gross", regexp_replace("Gross",",", "")))
from pyspark.sql.functions import regexp_replace, col
from pyspark.sql.types import IntegerType
display(df.describe("Gross"))
df.printSchema()
df = df.withColumn("Gross", regexp_replace("Gross",",", "")).\
withColumn("Gross", col("Gross").cast(IntegerType()))
display(df)
#col explicitly returns a Column object
#df["Gross"] isn't always explicitly treated as a Column object
display(df.describe("Gross"))
df.printSchema()
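The last use listed earlier, deriving a new column from existing ones, follows the same pattern; a minimal sketch (Gross_Per_Vote is just an illustrative name), assuming Gross has been cast to an integer as above:
# derive a new numeric column from two existing ones
display(df.withColumn("Gross_Per_Vote", col("Gross") / col("No_of_Votes")))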
The withColumnRenamed() method changes the name of an existing column
display(df.withColumnRenamed("Series_Title","Title").\
withColumnRenamed("No_of_Votes","Votes"))
The concat() function can be used to concatenate values from multiple columns
It works with string and array columns
It does not accept a separator/delimiter
from pyspark.sql.functions import concat, col
df2 = spark.createDataFrame([([1, 2], [3, 4], [5]), \
([1, 2], [3], [4])], \
['a', 'b', 'c'])
display(df2)
display(df2.withColumn("allData", concat(col("a"), col("b"), col("c"))))
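concat also works on string columns; a minimal sketch on the IMDB data (Title_Year is just an illustrative name):
from pyspark.sql.functions import concat, col, lit
# cast Released_Year in case it was inferred as a number, then join the pieces into one string
display(df.withColumn("Title_Year", \
        concat(col("Series_Title"), lit(" ("), col("Released_Year").cast("string"), lit(")"))))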
The concat_ws() function can be used to concatenate strings from multiple columns
Its first argument must be a string separator
The remaining arguments are the column names
from pyspark.sql.functions import concat_ws
display(df.withColumn("Rating", \
concat_ws(" : ", "IMDB_Rating", "Meta_score"))\
.drop("IMDB_Rating", "Meta_score"))
The corr() method can be used to obtain the correlation between two columns
It takes the column names as arguments
It does not work on string-type columns
print(df.stat.corr("IMDB_Rating", "Gross"))
print(df.stat.corr("IMDB_Rating", "Meta_Score"))
The groupBy() method can be used to categorize rows of data into groups
It returns a GroupedData object, which provides a number of aggregate functions: count(), min(), max(), sum(), avg(), etc.
Note that the aggregate functions return DataFrames
It is possible to group data based on:
→ Single-column values
→ Multiple-column values
from pyspark.sql.functions import sum, avg, max, count, min
display(df.groupBy("Released_Year").count())
display(df.groupBy("Meta_score").count())
display(df.groupBy("Meta_score", "Released_Year").count())
It is possible to get multiple aggregates at once
This requires using the .agg() method
Columns are named as: function(column)
This can be overridden with the .alias() method
display(df.groupBy("Released_Year")\
.agg(avg("Meta_score"), \
max("Meta_score"), \
min("Meta_score")))
display(df.groupBy("Released_Year")\
.agg(avg("Meta_score").alias("avg"), \
max("Meta_score").alias("max"), \
min("Meta_score").alias("min")))