BDA420


Row Operations and Filtering

Summary

Data for this Lecture

Basic Data Cleaning

Filtering

agg

when

Data for this Lecture


We will use the temperatures dataset in this lecture

You can download it from Blackboard

Data for this Lecture

path = "/FileStore/tables/temperatures.csv"
df = spark.read.csv(path, header=True, inferSchema=True)

# make sure the temp column is stored as a double
from pyspark.sql.types import DoubleType
df = df.withColumn("temp", df["temp"].cast(DoubleType()))

df.printSchema()
display(df)

Basic Data Cleaning

Count

.count() returns the number of rows

print(df.count())
        

Distinct

You can use .distinct() to remove duplicate rows

It returns a new dataframe without the duplicate rows

Two rows are duplicates if all columns are the same

print(df.count())

df = df.distinct()
print(df.count())

dropDuplicates

You can use .dropDuplicates() to remove rows where a provided list of columns is identical

The column names must be provided as a Python list: ["col1","col2"]

print(df.dropDuplicates(["city","country"]).count())
display(df.dropDuplicates(["city","country"]))

Dropna

You can use .dropna() to remove rows with null values

You can provide a subset of columns as an argument

display(df.dropna())
display(df.dropna(subset=["city","country"]))
        

Filtering

Filter

The .filter() method can be used to return a new dataframe with only the rows that match a condition:

  A column value being greater than, less than, or equal to a given value

  A relationship between the data in two columns

The .where() method is simply an alias for .filter()
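A minimal sketch of the alias and of a two-column condition, using only columns from the temperatures dataset:

# .where() and .filter() are interchangeable
display(df.filter(df["temp"] < 0))
display(df.where(df["temp"] < 0))

# a condition relating two columns:
# rows where the city shares its name with the country
display(df.where(df["city"] == df["country"]))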

Filter

display(df.filter(df["temp"] > 35))
display(df.filter(df["temp"] < -15))
display(df.filter(df["country"] == "Mexico"))

Filter

.filter() can take either a Column expression or a string containing an SQL expression as its argument

from pyspark.sql.functions import col

display(df.filter(df["temp"] > 35))
display(df.filter(col("temp") > 35))
display(df.filter("temp > 35"))

Filter

The .filter() method can also be used with string patterns

SQL and regex patterns can both be used:

    .like() for SQL patterns

    .rlike() for regex patterns

Filter

display(df.filter(df["country"].rlike("United")).\
    dropDuplicates(["city"])) #regex
display(df.filter(df["country"].like("United%")).\
    dropDuplicates(["city"])) #sql like

Filter

The .filter() method returns a new dataframe

This allows for chaining methods

You can also use logical operators within filters:

  & (and)

  | (or)

  ~ (not)

Filter

display(df.filter(df["country"] == "United States").\
    filter(df["city"] == "Miami"))
display(df.filter(df["country"] == "Mexico").\
    filter(df["temp"] > 25))
display(df.filter((df["temp"] < -12) | (df["temp"] > 35)))
display(df.filter(~(df["temp"] > 0)))

You can use chaining with other methods as well
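For example, a minimal sketch chaining .filter() with the standard .select() and .orderBy() methods:

# keep only Mexican rows, project two columns, sort hottest first
display(df.filter(df["country"] == "Mexico").\
    select("city", "temp").\
    orderBy("temp", ascending=False))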

isin

The .isin(LIST) method can be used to select only the rows of data that match an element from a list

Using .isin(LIST) == False selects the rows of data that do not match any element from the list

isin

li=["France", "Germany"]
display(df.filter(df["country"].isin(li)))
display(df.filter(df["country"].isin(li) == False))

agg

agg overview

Aggregates can be applied directly to a DataFrame

Technically, Spark runs a df.groupBy().agg() command, as sketched below

Aggregates are often used, with chaining, after filtering
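A minimal sketch of that equivalence:

from pyspark.sql.functions import avg

# both lines aggregate over the whole dataframe and return the same result
display(df.agg(avg("temp")))
display(df.groupBy().agg(avg("temp")))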

agg

from pyspark.sql.functions import avg, min, max, stddev

display(df.filter((df["country"]=="United States")&(df["city"]=="Miami"))\
          .agg(avg("temp"), min("temp"), max("temp"), stddev("temp")))

display(df.filter((df["country"]=="Canada")&(df["city"]=="Toronto"))\
          .agg(avg("temp"), min("temp"), max("temp"), stddev("temp")))

when


A construct similar to if/elif/else exists in Spark

It allows the values of an existing column to be changed based on conditions, or ...

the values of a new column to be set based on conditions

This is accomplished by pairing .withColumn() with a when()/when()/otherwise() chain

when

from pyspark.sql.functions import when

# conditions are evaluated in order, like if/elif/else; the first match wins
new_df = df.withColumn("opinion", \
            when(df["temp"] < 0, "very cold").\
            when(df["temp"] < 15, "cold").\
            when(df["temp"] < 25, "nice").\
            otherwise("hot"))
display(new_df)
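The same pattern can overwrite an existing column, since .withColumn() with an existing column name replaces that column; a minimal sketch with an illustrative threshold:

# clamp sub-zero temperatures to 0.0 and keep all other values unchanged
capped_df = df.withColumn("temp", \
               when(df["temp"] < 0, 0.0).\
               otherwise(df["temp"]))
display(capped_df)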
  

Reading Material

Filtering in Spark

Aggregates

When/Otherwise