Data for this Lecture
Basic Data Cleaning
Filtering
When
We will use the temperatures dataset in this lecture
You can download it from Blackboard
path = "/FileStore/tables/temperatures.csv"
df = spark.read.csv(path, header=True, inferSchema=True)
# make sure that the temp column is a double
from pyspark.sql.types import DoubleType
df = df.withColumn("temp", df["temp"].cast(DoubleType()))
df.printSchema()
display(df)
.count()
returns the number of rows
print(df.count())
You can use .distinct()
to remove duplicate rows
It returns a new dataframe without the duplicate rows
Two rows are duplicates if all columns are the same
print(df.count())
df = df.distinct()
print(df.count())
You can use .dropDuplicates()
to remove rows where a provided list of columns is identical
The column names must be provided as a Python list: ["col1","col2"]
print(df.dropDuplicates(["city","country"]).count())
display(df.dropDuplicates(["city","country"]))
You can use .dropna()
to remove rows with null values
You can provide a subset of columns as an argument
display(df.dropna())
display(df.dropna(subset=["city","country"]))
The .filter()
method can be used to return a new dataframe with only the rows that match a condition:
→ A value in a column being greater than, less than, or equal to a given value
→ A relationship between data in two columns
The .where()
method is simply an alias for .filter(), as shown below
display(df.filter(df["temp"] > 35))
display(df.filter(df["temp"] < -15))
display(df.filter(df["country"] == "Mexico"))
The .filter()
method can take as its argument either a Column expression, or a string with an SQL expression
from pyspark.sql.functions import col
display(df.filter(df["temp"] > 35))
display(df.filter(col("temp") > 35))
display(df.filter("temp > 35"))
The .filter()
method can also be used with string patterns
Both SQL and regex patterns can be used:
like()
for SQL patterns
rlike()
for regex patterns
display(df.filter(df["country"].rlike("United")).\
dropDuplicates(["city"])) #regex
display(df.filter(df["country"].like("United%")).\
dropDuplicates(["city"])) #sql like
The .filter()
method returns a new dataframe
This allows for chaining methods
You can also use logical operators within filters:
&
and
|
or
~
not
display(df.filter(df["country"] == "United States").\
filter(df["city"] == "Miami"))
display(df.filter(df["country"] == "Mexico").\
filter(df["temp"] > 25))
display(df.filter((df["temp"] < -12) | (df["temp"] > 35)))
display(df.filter(~(df["temp"] > 0)))
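As a sketch, two chained filters can also be combined into a single filter with the & operator; this returns the same rows as the chained Mexico example above:
# parentheses around each condition are required because of operator precedence
display(df.filter((df["country"] == "Mexico") & (df["temp"] > 25)))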
You can use chaining with other methods as well
The .isin(LIST)
method can be used to select only the rows of data that match an element from a list
The .isin(LIST) == False
selects the rows of data that do not match any element from a list
li=["France", "Germany"]
display(df.filter(df["country"].isin(li)))
display(df.filter(df["country"].isin(li) == False))
Aggregates can be applied directly to a DataFrame
Technically, Spark runs a df.groupBy().agg()
command
Aggregates are often used after filtering, through method chaining
from pyspark.sql.functions import avg, min, max, stddev
display(df.filter((df["country"]=="United States")&(df["city"]=="Miami"))\
.agg(avg("temp"), min("temp"), max("temp"), stddev("temp")))
display(df.filter((df["country"]=="Canada")&(df["city"]=="Toronto"))\
.agg(avg("temp"), min("temp"), max("temp"), stddev("temp")))
A construct similar to if/elif/else
exists in Spark
It allows values in a column to be changed row by row based on a condition, or ...
values in a new column to be set based on a condition
This is accomplished by pairing withColumn
with when/when/otherwise
from pyspark.sql.functions import when
new_df = df.withColumn("opinion", \
when(df["temp"] < 0, "very cold").\
when(df["temp"] < 15, "cold").\
when(df["temp"] < 25, "nice").\
otherwise("hot"))
display(new_df)
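The same pattern can also overwrite an existing column instead of creating a new one; a minimal sketch that, purely as an illustrative assumption, nulls out temperatures below -60:
# reusing the "temp" column name changes its values in place;
# the -60 threshold is only an illustrative assumption
cleaned_df = df.withColumn("temp", \
when(df["temp"] < -60, None).\
otherwise(df["temp"]))
display(cleaned_df)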