lab-lab

1)

from pyspark.sql import *

from pyspark.sql.functions import *

from pyspark.sql.types import *

spark = SparkSession.builder.appName("Emp").getOrCreate()

emp = spark.read.csv("emp_data.csv", header=True, inferSchema=True)

emp.show()

missing_counts = emp.select([count(when(col(c).isNull() | isnan(col(c)), c)).alias(c) for c in emp.columns])

missing_counts.show()

emp = emp.fillna({"LastName": "Unknown"})

emp = emp.dropna(subset=["EmpID", "StartDate"])

emp = emp.filter((col("Current Employee Rating") >= 1) & (col("Current Employee Rating") <= 5))

emp = emp.filter(length(col("LocationCode")) <= 6)

emp = emp.dropDuplicates()

df = emp.groupBy("DepartmentType", "Title").count().orderBy("DepartmentType", "Title")

df.show()

emp.show()

# This is a simpler solution to the last question, but it does not use max() as the question asks. Study at your own risk.

df = emp.withColumn(
    "Performance Score",
    when(col("Performance Score") == "Exceeds", 3)
    .when(col("Performance Score") == "Fully Meets", 2)
    .when(col("Performance Score") == "Needs Improvement", 1)
    .otherwise(0)
)

highest_perf = (
    df.orderBy("DepartmentType", col("Performance Score").desc())
    .dropDuplicates(["DepartmentType"])
    .select("DepartmentType", "EmpID", "FirstName", "LastName", "Performance Score")
)

highest_perf.show()

# This is the correct solution (it uses max() as the question asks), though it is more involved.

# Step 1: Map performance scores to numeric values

df = emp.withColumn(
    "Performance Score",
    when(col("Performance Score") == "Exceeds", 3)
    .when(col("Performance Score") == "Fully Meets", 2)
    .when(col("Performance Score") == "Needs Improvement", 1)
    .otherwise(0)
)

# Step 2: Find the maximum performance score for each department

max_scores = df.groupBy("DepartmentType").agg(max("Performance Score").alias("MaxPerformanceScore"))

# Step 3: Alias DataFrames and join with qualified column names

df_alias = df.alias("df")

max_scores_alias = max_scores.alias("max_scores")

highest_perf = df_alias.join(
    max_scores_alias,
    (col("df.DepartmentType") == col("max_scores.DepartmentType")) &
    (col("df.Performance Score") == col("max_scores.MaxPerformanceScore"))
).select(
    col("df.DepartmentType"),
    col("df.EmpID"),
    col("df.FirstName"),
    col("df.LastName"),
    col("df.Performance Score")
)

# Step 4: Drop duplicates based on DepartmentType

highest_perf = highest_perf.dropDuplicates(["DepartmentType"])

# Step 5: Show the results

highest_perf.show()
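# Alternative sketch (not part of the original answer): the same per-department maximum
# via a window function, assuming the same column names as above.
from pyspark.sql.window import Window

dept_window = Window.partitionBy("DepartmentType")
highest_perf_win = (
    df.withColumn("MaxPerformanceScore", max("Performance Score").over(dept_window))
    .filter(col("Performance Score") == col("MaxPerformanceScore"))
    .dropDuplicates(["DepartmentType"])
    .select("DepartmentType", "EmpID", "FirstName", "LastName", "Performance Score")
)
highest_perf_win.show()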

2)

from pyspark.sql import *

from pyspark.sql.functions import *

from pyspark.sql.types import *

# Initialize Spark session

spark = SparkSession.builder.appName("DataCleaning").getOrCreate()

# Load dataset (replace 'path_to_file' with the actual path)

df = spark.read.csv("Sales Data.csv", header=True, inferSchema=True)

df.select([count(when(col(c).isNull() | isnan(c), c)).alias(c) for c in df.columns]).show()

# Decide whether to fill or drop rows with missing values

# For numerical columns, replace nulls with mean values

numerical_columns = ['Sales', 'Quantity Ordered']

for col_name in numerical_columns:

  mean_value = df.select(mean(col_name)).collect()[0][0]

  df = df.fillna({col_name: mean_value})
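# Equivalent single-pass sketch (same column list): collect both means in one
# aggregation instead of one collect() per column.
mean_row = df.select([mean(c).alias(c) for c in numerical_columns]).first()
df = df.fillna({c: mean_row[c] for c in numerical_columns})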

# Drop rows with null values in critical columns

df = df.dropna()

df = df.dropDuplicates()

# Cast numerical columns to appropriate types

df = df.withColumn("Sales", col("Sales").cast("float"))

df = df.withColumn("Quantity Ordered", col("Quantity Ordered").cast("integer"))

df = df.withColumn("Price Each", col("Price Each").cast("float"))

# Verify schema

df.printSchema()

# Remove rows with negative values in specific columns

columns_to_check = ['Sales', 'Price Each', 'Quantity Ordered']

for col_name in columns_to_check:

  df = df.filter(col(col_name) >= 0)

df.show()

# Calculate total sales per product

df.groupBy("Product").agg(sum("Sales").alias("Total Sales")).show()

3)

from pyspark.sql import *

from pyspark.sql.functions import *

from pyspark.sql.types import *

spark = SparkSession.builder.appName("jobs").getOrCreate()

jobs = spark.read.csv("/content/Cleaned_DS_Jobs.csv", header=True, inferSchema=True)

jobs = jobs.withColumn("min_salary", regexp_extract(col("Salary Estimate"), r"(\d+)-(\d+)", 1))

jobs = jobs.withColumn("max_salary", regexp_extract(col("Salary Estimate"), r"(\d+)-(\d+)", 2))
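# regexp_extract returns strings; Spark coerces them for the arithmetic below, but an
# explicit cast keeps the schema clean (optional sketch using the same column names).
jobs = jobs.withColumn("min_salary", col("min_salary").cast("int"))
jobs = jobs.withColumn("max_salary", col("max_salary").cast("int"))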

jobs = jobs.withColumn("avg_sal", (col("min_salary") + col("max_salary")) / 2)  # Overwrite jobs DataFrame with the new column

jobs = jobs.withColumn("Rating", when((col("Rating") == 0) | (col("Rating") == -1), 1).otherwise(col("Rating")))

for column in jobs.columns:

    jobs = jobs.withColumn(column, when(col(column).isNull(), -1).otherwise(col(column)))

jobs.write.csv("jobs.csv", header=True, mode="overwrite")

jobs.show()

grp_data = jobs.groupBy("Job Title").agg(avg("avg_sal").alias("AVG_SAL"))

grp_data.show()

grp_cmp = jobs.groupBy("Size").agg(avg("avg_sal"))

grp_cmp.show()

4)

from pyspark.sql import *

from pyspark.sql.functions import *

from pyspark.sql.types import *

spark = SparkSession.builder.appName("DataCleaning").getOrCreate()

df = spark.read.csv("audible_uncleaned.csv", header=True, inferSchema=True)

missing_counts = df.select([count(when(col(c).isNull() | isnan(col(c)), c)).alias(c) for c in df.columns])

missing_counts.show()

df = df.fillna({'author': 'Unknown', 'narrator': 'Unknown'}).dropna(subset=['stars'])

df = df.filter((col("price") >= 100) & (col("price") <= 2000))

df = df.withColumn("author", regexp_replace(col("author"), "^Writtenby:", "")) \
    .withColumn("narrator", regexp_replace(col("narrator"), "^Narratedby:", ""))

# df = df.withColumn(
#     "time_in_minutes",
#     when(
#         col("time").rlike(r"\d+ hrs and \d+ mins"),  # Case 1: Both hours and minutes
#         (split(col("time"), " hrs and ")[0].cast("int") * 60) +
#         split(col("time"), " hrs and ")[1].substr(1, 2).cast("int")
#     ).when(
#         col("time").rlike(r"\d+ hrs"),  # Case 2: Only hours
#         split(col("time"), " hrs")[0].cast("int") * 60
#     ).otherwise(0)  # Default to 0 if format is invalid
# )

df = df.withColumn(
    "time_in_minutes",
    when(
        col("time").rlike(r"\d+ hrs and \d+ mins"),  # Case 1: Both hours and minutes
        regexp_extract(col("time"), r"(\d+) hrs", 1).cast("int") * 60 +  # Extract hours
        regexp_extract(col("time"), r"(\d+) mins", 1).cast("int")        # Extract minutes
    ).when(
        col("time").rlike(r"\d+ hrs"),  # Case 2: Only hours
        regexp_extract(col("time"), r"(\d+) hrs", 1).cast("int") * 60
    ).when(
        col("time").rlike(r"\d+ mins"),  # Case 3: Only minutes
        regexp_extract(col("time"), r"(\d+) mins", 1).cast("int")
    ).otherwise(0)  # Default to 0 if format is invalid
)
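# Optional spot check before dropping the raw column: compare a few parsed durations.
df.select("time", "time_in_minutes").show(5, truncate=False)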

df = df.drop("time")

df = df.withColumn("releasedate", to_date(col("releasedate"), "dd-MM-yyyy"))

star_pattern = r"(\d+(\.\d+)?) out of 5 stars"  # Extracts '5' or '4.5'

rating_pattern = r"(\d+)\s+ratings"  # Extracts '34', '41', etc.

# Extract stars and ratings into separate columns

df = df.withColumn("extracted_stars", regexp_extract(col("stars"), star_pattern, 1).cast("float"))

df = df.withColumn("ratings", regexp_extract(col("stars"), rating_pattern, 1).cast("int"))

# Replace 'Not rated yet' for stars if needed

df = df.withColumn("extracted_stars", when(col("stars").contains("Not rated yet"), "Not rated yet").otherwise(col("extracted_stars")))

df = df.withColumn("ratings", when(col("stars").contains("Not rated yet"), "Not rated yet").otherwise(col("ratings")))

df = df.withColumn(
    "price",
    regexp_replace(col("price"), "[$,€£Rs ]+", "")
)

df = df.withColumn("price", col("price").cast("int"))
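# If raw prices carry decimals (e.g. "839.00"), a direct int cast may come back null on
# some Spark versions; casting through float first avoids that (optional sketch):
# df = df.withColumn("price", col("price").cast("float").cast("int"))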

grouped_df = df.groupBy("language").count()

grouped_df.show()

# df.show()

df.show()

df.write.csv("cleaned_audiobooks.csv", header=True, mode="overwrite")

5)

from pyspark.sql import SparkSession

from pyspark.sql.functions import *

from pyspark.sql.types import *

# Initialize Spark Session

spark = SparkSession.builder.appName("CrimeDataCleaning").getOrCreate()

# Load dataset

df = spark.read.csv("crime_data.csv", header=True, inferSchema=True)

# Identify missing values

missing_counts = df.select([count(when(col(c).isNull() | isnan(col(c)), c)).alias(c) for c in df.columns])

missing_counts.show()

# Handle missing values

df = df.fillna({
    'Vict Sex': 'Unknown',
    'Vict Age': 0,
    'Crime Code': '000',
    'AREA NAME': 'Unknown'
}).dropna()

# Remove outliers

df = df.filter((col("Vict Age") >= 0) & (col("Vict Age") <= 120))

# Standardize TIME OCC column

df = df.withColumn("TIME_OCC", lpad(col("TIME OCC").cast(StringType()), 4, '0'))

df = df.withColumn("TIME_OCC", concat_ws(":", col("TIME_OCC").substr(1, 2), col("TIME_OCC").substr(3, 2)))

df = df.drop("TIME OCC")

# Clean AREA NAME column

df = df.withColumn("AREA NAME", regexp_replace(col("AREA NAME"), r"[\\/|//]", ""))

# Remove time portion from Date Rptd column

# df = df.withColumn("Date Rptd", split(col("Date Rptd"), " ").getItem(0))

df = df.withColumn("Date Rptd", split(col("Date Rptd"), " ")[0])

# Standardize Date Rptd column

df = df.withColumn(
    "Date Rptd",
    coalesce(
        to_date(col("Date Rptd"), "dd-MM-yyyy"),
        to_date(col("Date Rptd"), "MM/dd/yyyy")
    )
)

df = df.withColumn("Date Rptd", date_format(col("Date Rptd"), "yyyy-MM-dd"))

# df = df.withColumn("DATE OCC", split(col("DATE OCC"), " ").getItem(0))

df = df.withColumn("DATE OCC", split(col("DATE OCC"), " ")[0])

# Standardize DATE OCC column

df = df.withColumn(
    "DATE OCC",
    coalesce(
        to_date(col("DATE OCC"), "dd-MM-yyyy"),
        to_date(col("DATE OCC"), "MM/dd/yyyy")
    )
)

df = df.withColumn("DATE OCC", date_format(col("DATE OCC"), "yyyy-MM-dd"))
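# Optional check: rows where neither date format matched are left null by coalesce().
print(df.filter(col("Date Rptd").isNull() | col("DATE OCC").isNull()).count())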

# Clean Crime Code column

df = df.withColumn("Crime Code", regexp_extract(col("Crime Code"), r"(\d+)", 1))

# Create derived columns

df = df.withColumn("Year", year(col("Date Rptd")))

df = df.withColumn("Month", month(col("Date Rptd")))

# Group by AREA NAME and count crimes

grouped_df = df.groupBy("AREA NAME").count()

grouped_df.show()

# Save cleaned data

df.write.csv("cleaned_crime_data.csv", header=True, mode="overwrite")

# Show cleaned data

df.show()

6)

from pyspark.sql import SparkSession

from pyspark.sql.functions import col, mean, expr, median, when, regexp_replace, regexp_extract, split, lower, trim

# Initialize Spark session

spark = SparkSession.builder.appName("DataCleaning").getOrCreate()

# Load the dataset

file_path = "movie_dataset_uncleaned.csv"  # Update with your actual path

df = spark.read.csv(file_path, header=True, inferSchema=True)

# Remove parentheses from Year column

df = df.withColumn("Year", regexp_replace(col("Year"), r"[()]", ""))

# Handle negative Year cases (-YYYY means End_Year is YYYY and Start_Year should be YYYY - 5)

df = df.withColumn("End_Year", when(col("Year").rlike(r"^-\d{4}$"), regexp_extract(col("Year"), r"-(\d{4})", 1).cast("int")))

df = df.withColumn("Start_Year", when(col("End_Year").isNotNull(), col("End_Year") - 5))

# Process other Year cases

df = df.withColumn("Start_Year", when(col("Start_Year").isNull(), regexp_extract(col("Year"), r"(\d{4})", 1).cast("int")).otherwise(col("Start_Year")))

df = df.withColumn("End_Year", when(col("End_Year").isNull(), regexp_extract(col("Year"), r"(\d{4})$", 1).cast("int")).otherwise(col("End_Year")))

# If Start_Year is missing but End_Year exists, calculate Start_Year as End_Year - 5

df = df.withColumn("Start_Year", when(col("Start_Year").isNull(), col("End_Year") - 5).otherwise(col("Start_Year")))

# If End_Year is missing but Start_Year exists, calculate End_Year as Start_Year + 5

df = df.withColumn("End_Year", when(col("End_Year").isNull(), col("Start_Year") + 5).otherwise(col("End_Year")))

# Remove rows where both Start_Year and End_Year are missing

df = df.dropna(subset=["Start_Year", "End_Year"])
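# Sanity check (optional): after the fixes above, Start_Year should never exceed End_Year.
df.filter(col("Start_Year") > col("End_Year")).show()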

# Handle missing values

df = df.fillna({'Rating': df.select(mean(col("Rating"))).collect()[0][0]})

df = df.fillna({'Votes': df.select(median(col("Votes"))).collect()[0][0]})

df = df.fillna({'RunTime': 0})

df = df.fillna({'Genre': 'Unknown'})

df = df.fillna({'Gross': '$0.00M'})

# Remove outliers in Rating column (ensure values are between 1 and 10)

df = df.filter((col("Rating") >= 1) & (col("Rating") <= 10))

# Clean Genre Column

# df = df.withColumn("Final_Genre", trim(lower(split(col("Genre"), r"\$")[0])))

df = df.withColumn("Final_Genre", split(col("Genre"), r"\$")[0])

df = df.withColumn("Final_Genre", when(col("Final_Genre").isNull(), "unknown").otherwise(col("Final_Genre")))

# Save cleaned data with overwrite mode

df.write.csv("cleaned_movie_dataset.csv", header=True, mode="overwrite")

# Show sample cleaned data

df.show()

7)

from pyspark.sql import SparkSession

from pyspark.sql.functions import *

from pyspark.sql.types import *

# Initialize Spark Session

spark = SparkSession.builder.appName("DataCleaning").getOrCreate()

# Load dataset

df = spark.read.csv("phone_usage_india.csv", header=True, inferSchema=True)

# 1. Remove Outliers in the Age Column

df = df.filter((col("Age") >= 18) & (col("Age") <= 90))

# 2. Add a new column named Age_Category

df = df.withColumn(
    "Age_Category",
    when((col("Age") >= 18) & (col("Age") <= 35), "Young")
    .when((col("Age") >= 36) & (col("Age") <= 60), "Middle-aged")
    .when((col("Age") >= 61) & (col("Age") <= 90), "Senior")
    .otherwise("Unknown")
)

# 3. Replace missing values with “Unknown” for Location column

df = df.fillna({"Location": "Unknown"})

# 4. Convert Screen Time to Minutes

df = df.withColumn("Screen_Time_Minutes", col("Screen Time (hrs/day)") * 60)

df = df.withColumn("Social_Media_Time_Minutes", col("Social Media Time (hrs/day)") * 60)

df = df.withColumn("Streaming_Time_Minutes", col("Streaming Time (hrs/day)") * 60)

df = df.withColumn("Gaming_Time_Minutes", col("Gaming Time (hrs/day)") * 60)
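# Equivalent loop sketch over the same four source columns, handy if more "(hrs/day)"
# columns are added later:
# for c in ["Screen Time (hrs/day)", "Social Media Time (hrs/day)",
#           "Streaming Time (hrs/day)", "Gaming Time (hrs/day)"]:
#     new_name = c.replace(" Time (hrs/day)", "_Time_Minutes").replace(" ", "_")
#     df = df.withColumn(new_name, col(c) * 60)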

# 5. Add a new column named Screen_Time_Category

df = df.withColumn(
    "Screen_Time_Category",
    when(col("Screen_Time_Minutes") <= 180, "Low Usage")
    .when((col("Screen_Time_Minutes") > 180) & (col("Screen_Time_Minutes") <= 360), "Moderate Usage")
    .otherwise("High Usage")
)

# 6. Group by “Primary Use” and aggregate “Monthly Recharge Cost (INR)”

grouped_df = df.groupBy("Primary Use").agg(
    sum("Monthly Recharge Cost (INR)").alias("Total_Monthly_Recharge")
)

# Show the results

df.show() # Display the cleaned dataset

grouped_df.show() # Display the aggregated results

# Optionally, save the cleaned data to a CSV file

df.write.csv("cleaned_data.csv", header=True, mode="overwrite")

8)

from pyspark.sql import SparkSession

from pyspark.sql.functions import *

from pyspark.sql.types import *

# Initialize Spark Session

spark = SparkSession.builder.appName("EmployeeDataCleaning").getOrCreate()

# Load dataset

df = spark.read.csv("HR_Analytics.csv", header=True, inferSchema=True)

# 1. Handle missing values in BusinessTravel, DistanceFromHome, and EducationField columns

# Replace nulls in BusinessTravel with “Unknown”

df = df.fillna({"BusinessTravel": "Unknown"})

# Replace nulls in DistanceFromHome with the mean value of the column

mean_distance = df.select(mean(col("DistanceFromHome"))).collect()[0][0]

df = df.fillna({"DistanceFromHome": mean_distance})

# Replace nulls in EducationField with “other”

df = df.fillna({"EducationField": "other"})

# 2. Check for duplicate rows based on EmpID and remove them

df = df.dropDuplicates(subset=["EmpID"])

# 3. Remove outliers in the DistanceFromHome column (values > 50)

df = df.filter(col("DistanceFromHome") <= 50)

# 4. Add a new column PercentSalaryHike_Category categorizing PercentSalaryHike

df = df.withColumn(
    "PercentSalaryHike_Category",
    when((col("PercentSalaryHike") >= 0) & (col("PercentSalaryHike") <= 10), "Low")
    .when((col("PercentSalaryHike") >= 11) & (col("PercentSalaryHike") <= 20), "Moderate")
    .otherwise("High")
)

# 5. Standardize the JobSatisfaction column with descriptive categories

df = df.withColumn(
    "JobSatisfaction",
    when(col("JobSatisfaction") == 1, "Poor")
    .when(col("JobSatisfaction") == 2, "Below Average")
    .when(col("JobSatisfaction") == 3, "Average")
    .when(col("JobSatisfaction") == 4, "Excellent")
    .otherwise("Unknown")
)

# 6. Transform the SalarySlab column by removing “k” and converting to numerical values

df = df.withColumn("SalarySlab", regexp_replace(col("SalarySlab"), "k", "").cast("int") * 1000)

df = df.drop("_c15", "_c16", "_c17", "_c18")
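# Optional check: SalarySlab should now be numeric (e.g. "15k" becomes 15000).
df.select("SalarySlab").distinct().show()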

# 7. Save the cleaned dataset in CSV format

df.write.csv("cleaned_employee_data.csv", header=True, mode="overwrite")

# Show the cleaned data

df.show()

9)

from pyspark.sql import SparkSession

from pyspark.sql.functions import *

# Initialize Spark Session

spark = SparkSession.builder.appName("StudentDataCleaning").getOrCreate()

# Load the dataset into a PySpark DataFrame

df = spark.read.csv("student-info.csv", header=True, inferSchema=True)

# Show the initial DataFrame to verify proper loading

df.show()

# 1. Replace binary values in the 'school' column

df = df.withColumn("school", when(col("school") == "GP", "Gabriel Pereira")
                   .when(col("school") == "MS", "Mousinho da Silveira")
                   .otherwise(col("school")))

# 2. Replace binary values in the 'address' column

df = df.withColumn("address", when(col("address") == "U", "Urban")
                   .when(col("address") == "R", "Rural")
                   .otherwise(col("address")))

# 3. Replace binary values in the 'family_size' column

df = df.withColumn("family_size", when(col("family_size") == "LE3", "Less or Equal to 3")
                   .when(col("family_size") == "GT3", "Greater than 3")
                   .otherwise(col("family_size")))

# 4. Replace numeric values in the 'mother_edu' column

df = df.withColumn("mother_edu", when(col("mother_edu") == 0, "None")
                   .when(col("mother_edu") == 1, "Primary Education (4th Grade)")
                   .when(col("mother_edu") == 2, "5th to 9th Grade")
                   .when(col("mother_edu") == 3, "Secondary Education")
                   .when(col("mother_edu") == 4, "Higher Education")
                   .otherwise(col("mother_edu")))

# Perform the same transformation for the 'father_edu' column

df = df.withColumn("father_edu", when(col("father_edu") == 0, "None")
                   .when(col("father_edu") == 1, "Primary Education (4th Grade)")
                   .when(col("father_edu") == 2, "5th to 9th Grade")
                   .when(col("father_edu") == 3, "Secondary Education")
                   .when(col("father_edu") == 4, "Higher Education")
                   .otherwise(col("father_edu")))
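# The two mappings above differ only in the column name; a compact loop sketch
# (hypothetical refactor, same labels as above):
# edu_labels = {0: "None", 1: "Primary Education (4th Grade)", 2: "5th to 9th Grade",
#               3: "Secondary Education", 4: "Higher Education"}
# for c in ["mother_edu", "father_edu"]:
#     mapped = col(c)
#     for code, label in edu_labels.items():
#         mapped = when(col(c) == code, label).otherwise(mapped)
#     df = df.withColumn(c, mapped)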

# 5. Handle missing values in the 'mother_job' column by replacing all missing entries with 'other'

df = df.fillna({"mother_job": "other"})

# 6. Replace numeric values in the 'traveltime' column with descriptive labels

df = df.withColumn("traveltime", when(col("traveltime") == 1, "15 min")
                   .when(col("traveltime") == 2, "30 min")
                   .when(col("traveltime") == 3, "45 min")
                   .when(col("traveltime") == 4, "60 min")
                   .otherwise(col("traveltime")))

# 7. Handle missing values in the 'internet' column by replacing all missing entries with 'unknown'

df = df.fillna({"internet": "unknown"})

# 8. Remove rows in the 'age' column that contain outliers (age not in the range of 15 to 22)

df = df.filter((col("age") >= 15) & (col("age") <= 22))

# Show the cleaned data

df.show()

# Optionally, save the cleaned data to a CSV file

df.write.csv("cleaned_student_data.csv", header=True, mode="overwrite")

10)

from pyspark.sql import SparkSession

from pyspark.sql.functions import *

from pyspark.sql import functions as F

# Initialize Spark Session

spark = SparkSession.builder.appName("SalesDataCleaning").getOrCreate()

# Load the dataset into a PySpark DataFrame

df = spark.read.csv("Real_estate.csv", header=True, inferSchema=True)

# Show the initial DataFrame to verify proper loading

df.show()

# 1. Standardize the Date Recorded column to a consistent format (YYYY-MM-DD)

df = df.withColumn(
    "Date Recorded",
    coalesce(
        to_date(col("Date Recorded"), "MM-dd-yyyy"),  # Try converting MM-DD-YYYY format
        to_date(col("Date Recorded"), "MM/dd/yyyy")   # If that fails, try MM/DD/YYYY format
    )
)

# Standardize the date format to YYYY-MM-DD

df = df.withColumn("Date Recorded", date_format(col("Date Recorded"), "yyyy-MM-dd"))

# 2. Calculate the Sales Ratio for each row and add it as a new column

df = df.withColumn("Sales_Ratio", col("Assessed Value") / col("Sale Amount"))

# 3. Handle missing values in the Residential Type column by replacing all missing entries with ‘Unknown’

df = df.fillna({"Residential Type": "Unknown"})

# 4. Remove rows in the List Year column that contain outliers, retaining only rows where the year is between 2001 and 2022

df = df.filter((col("List Year") >= 2001) & (col("List Year") <= 2022))

# 5. Handle missing values in the Town column by replacing all missing entries with the mode (most frequent value)

mode_value = df.groupBy("Town").count().orderBy(desc("count")).first()[0]
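# If Town itself contains nulls, a null group can win the count; excluding nulls first
# is safer (optional sketch):
# mode_value = (df.filter(col("Town").isNotNull())
#               .groupBy("Town").count().orderBy(desc("count")).first()[0])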

df = df.fillna({"Town": mode_value})

# Show the cleaned data

df.show()

# Optionally, save the cleaned data to a CSV file

df.write.csv("cleaned_sales_data.csv", header=True, mode="overwrite")
