1)
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder.appName("Emp").getOrCreate()
emp = spark.read.csv("emp_data.csv", header=True, inferSchema=True)
emp.show()
# Count missing values per column
missing_counts = emp.select([count(when(col(c).isNull() | isnan(col(c)), c)).alias(c) for c in emp.columns])
missing_counts.show()
# Fill or drop missing values
emp = emp.fillna({"LastName": "Unknown"})
emp = emp.dropna(subset=["EmpID", "StartDate"])
# Remove outliers and invalid values
emp = emp.filter((col("Current Employee Rating") >= 1) & (col("Current Employee Rating") <= 5))
emp = emp.filter(length(col("LocationCode")) <= 6)
emp = emp.dropDuplicates()
# Employee count per department and title
df = emp.groupBy("DepartmentType", "Title").count().orderBy("DepartmentType", "Title")
df.show()
emp.show()
# This is an easier approach to the last question, but it does not use max() as the question asks. Study at your own risk.
df = emp.withColumn(
    "Performance Score",
    when(col("Performance Score") == "Exceeds", 3)
    .when(col("Performance Score") == "Fully Meets", 2)
    .when(col("Performance Score") == "Needs Improvement", 1)
    .otherwise(0)
)
highest_perf = (
    df.orderBy("DepartmentType", col("Performance Score").desc())
    .dropDuplicates(["DepartmentType"])
    .select("DepartmentType", "EmpID", "FirstName", "LastName", "Performance Score")
)
highest_perf.show()
# This version answers the question as asked (using max), but it is more involved.
# Step 1: Map performance scores to numeric values
df = emp.withColumn(
    "Performance Score",
    when(col("Performance Score") == "Exceeds", 3)
    .when(col("Performance Score") == "Fully Meets", 2)
    .when(col("Performance Score") == "Needs Improvement", 1)
    .otherwise(0)
)
# Step 2: Find the maximum performance score for each department
max_scores = df.groupBy("DepartmentType").agg(max("Performance Score").alias("MaxPerformanceScore"))
# Step 3: Alias DataFrames and join with qualified column names
df_alias = df.alias("df")
max_scores_alias = max_scores.alias("max_scores")
highest_perf = df_alias.join(
    max_scores_alias,
    (col("df.DepartmentType") == col("max_scores.DepartmentType")) &
    (col("df.Performance Score") == col("max_scores.MaxPerformanceScore"))
).select(
    col("df.DepartmentType"),
    col("df.EmpID"),
    col("df.FirstName"),
    col("df.LastName"),
    col("df.Performance Score")
)
# Step 4: Drop duplicates based on DepartmentType
highest_perf = highest_perf.dropDuplicates(["DepartmentType"])
# Step 5: Show the results
highest_perf.show()
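# For reference only (not required by the question): the same greatest-per-group result can be
# obtained with a window function instead of the groupBy + join above. This is a sketch that
# assumes the numerically mapped df from Step 1.
from pyspark.sql import Window
dept_window = Window.partitionBy("DepartmentType").orderBy(col("Performance Score").desc())
highest_perf_window = (
    df.withColumn("rank", row_number().over(dept_window))  # rank 1 = highest score in the department
    .filter(col("rank") == 1)
    .select("DepartmentType", "EmpID", "FirstName", "LastName", "Performance Score")
)
highest_perf_window.show()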
2)
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Initialize Spark session
spark = SparkSession.builder.appName("DataCleaning").getOrCreate()
# Load dataset (replace the path with the actual file location if needed)
df = spark.read.csv("Sales Data.csv", header=True, inferSchema=True)
# Count missing values per column
df.select([count(when(col(c).isNull() | isnan(c), c)).alias(c) for c in df.columns]).show()
# Decide whether to fill or drop rows with missing values
# For numerical columns, replace nulls with the column mean
numerical_columns = ["Sales", "Quantity Ordered"]
for col_name in numerical_columns:
    mean_value = df.select(mean(col_name)).collect()[0][0]
    df = df.fillna({col_name: mean_value})
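# Optional alternative (a sketch, not required by the question): pyspark.ml.feature.Imputer can
# fill numeric nulls with the column mean in a single pass. It assumes both columns are already
# numeric types; the transform line is left commented so the loop above remains the active fix.
from pyspark.ml.feature import Imputer
imputer = Imputer(
    inputCols=["Sales", "Quantity Ordered"],
    outputCols=["Sales_imputed", "Quantity_Ordered_imputed"],
    strategy="mean",
)
# df = imputer.fit(df).transform(df)  # adds the two *_imputed columns; uncomment to try it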
# Drop rows with null values in critical columns
df = df.dropna()
df = df.dropDuplicates()
# Cast numerical columns to appropriate types
df = df.withColumn("Sales", col("Sales").cast("float"))
df = df.withColumn("Quantity Ordered", col("Quantity Ordered").cast("integer"))
df = df.withColumn("Price Each", col("Price Each").cast("float"))
# Verify schema
df.printSchema()
# Remove rows with negative values in specific columns
columns_to_check = ["Sales", "Price Each", "Quantity Ordered"]
for col_name in columns_to_check:
    df = df.filter(col(col_name) >= 0)
df.show()
# Calculate total sales per product
df.groupBy("Product").agg(sum("Sales").alias("Total Sales")).show()
3)
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder.appName("jobs").getOrCreate()
jobs = spark.read.csv("/content/Cleaned_DS_Jobs.csv", header=True, inferSchema=True)
# Extract the lower and upper bounds of the salary range from "Salary Estimate"
jobs = jobs.withColumn("min_salary", regexp_extract(col("Salary Estimate"), r"(\d+)-(\d+)", 1))
jobs = jobs.withColumn("max_salary", regexp_extract(col("Salary Estimate"), r"(\d+)-(\d+)", 2))
jobs = jobs.withColumn("avg_sal", (col("min_salary") + col("max_salary")) / 2)  # overwrite jobs with the new avg_sal column
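# Note (optional sketch): regexp_extract returns strings, so the arithmetic above relies on
# Spark's implicit string-to-double cast. Casting explicitly keeps the schema clean:
jobs = jobs.withColumn("min_salary", col("min_salary").cast("int")) \
           .withColumn("max_salary", col("max_salary").cast("int"))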
# Replace invalid ratings (0 or -1) with 1
jobs = jobs.withColumn("Rating", when((col("Rating") == 0) | (col("Rating") == -1), 1).otherwise(col("Rating")))
# Replace remaining nulls in every column with -1
for column in jobs.columns:
    jobs = jobs.withColumn(column, when(col(column).isNull(), -1).otherwise(col(column)))
jobs.write.csv("jobs.csv", header=True, mode="overwrite")
jobs.show()
# Average salary per job title and per company size
grp_data = jobs.groupBy("Job Title").agg(avg("avg_sal").alias("AVG_SAL"))
grp_data.show()
grp_cmp = jobs.groupBy("Size").agg(avg("avg_sal"))
grp_cmp.show()
4)
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder.appName("DataCleaning").getOrCreate()
df = spark.read.csv("audible_uncleaned.csv", header=True, inferSchema=True)
# Count missing values per column
missing_counts = df.select([count(when(col(c).isNull() | isnan(col(c)), c)).alias(c) for c in df.columns])
missing_counts.show()
# Fill missing author/narrator values, drop rows without a stars value
df = df.fillna({"author": "Unknown", "narrator": "Unknown"}).dropna(subset=["stars"])
# Keep prices within a sensible range
df = df.filter((col("price") >= 100) & (col("price") <= 2000))
# Strip the "Writtenby:" / "Narratedby:" prefixes
df = df.withColumn("author", regexp_replace(col("author"), "^Writtenby:", "")) \
    .withColumn("narrator", regexp_replace(col("narrator"), "^Narratedby:", ""))
# Alternative using split (kept for reference):
# df = df.withColumn(
#     "time_in_minutes",
#     when(
#         col("time").rlike(r"\d+ hrs and \d+ mins"),  # Case 1: Both hours and minutes
#         (split(col("time"), " hrs and ")[0].cast("int") * 60) +
#         split(col("time"), " hrs and ")[1].substr(1, 2).cast("int")
#     ).when(
#         col("time").rlike(r"\d+ hrs"),  # Case 2: Only hours
#         split(col("time"), " hrs")[0].cast("int") * 60
#     ).otherwise(0)  # Default to 0 if format is invalid
# )
df = df.withColumn(
    "time_in_minutes",
    when(
        col("time").rlike(r"\d+ hrs and \d+ mins"),  # Case 1: Both hours and minutes
        regexp_extract(col("time"), r"(\d+) hrs", 1).cast("int") * 60 +  # Extract hours
        regexp_extract(col("time"), r"(\d+) mins", 1).cast("int")  # Extract minutes
    ).when(
        col("time").rlike(r"\d+ hrs"),  # Case 2: Only hours
        regexp_extract(col("time"), r"(\d+) hrs", 1).cast("int") * 60
    ).when(
        col("time").rlike(r"\d+ mins"),  # Case 3: Only minutes
        regexp_extract(col("time"), r"(\d+) mins", 1).cast("int")
    ).otherwise(0)  # Default to 0 if format is invalid
)
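# Quick sanity check (a sketch with hypothetical values, not part of the dataset):
# "2 hrs and 30 mins" -> 150, "2 hrs" -> 120, "45 mins" -> 45.
spark.createDataFrame([("2 hrs and 30 mins",), ("2 hrs",), ("45 mins",)], ["time"]) \
    .withColumn(
        "time_in_minutes",
        when(col("time").rlike(r"\d+ hrs and \d+ mins"),
             regexp_extract(col("time"), r"(\d+) hrs", 1).cast("int") * 60 +
             regexp_extract(col("time"), r"(\d+) mins", 1).cast("int"))
        .when(col("time").rlike(r"\d+ hrs"),
              regexp_extract(col("time"), r"(\d+) hrs", 1).cast("int") * 60)
        .when(col("time").rlike(r"\d+ mins"),
              regexp_extract(col("time"), r"(\d+) mins", 1).cast("int"))
        .otherwise(0)
    ).show()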
df = df.drop("time")
df = df.withColumn("releasedate", to_date(col("releasedate"), "dd-MM-yyyy"))
star_pattern = r"(\d+(\.\d+)?) out of 5 stars"  # extracts '5' or '4.5'
rating_pattern = r"(\d+)\s+ratings"  # extracts '34', '41', etc.
# Extract stars and ratings into separate columns
df = df.withColumn("extracted_stars", regexp_extract(col("stars"), star_pattern, 1).cast("float"))
df = df.withColumn("ratings", regexp_extract(col("stars"), rating_pattern, 1).cast("int"))
# Keep 'Not rated yet' where no rating exists
df = df.withColumn("extracted_stars", when(col("stars").contains("Not rated yet"), "Not rated yet").otherwise(col("extracted_stars")))
df = df.withColumn("ratings", when(col("stars").contains("Not rated yet"), "Not rated yet").otherwise(col("ratings")))
# Strip currency symbols from the price and cast it to an integer
df = df.withColumn(
    "price",
    regexp_replace(col("price"), "[$,€£Rs ]+", "")
)
df = df.withColumn("price", col("price").cast("int"))
# Count titles per language
grouped_df = df.groupBy("language").count()
grouped_df.show()
df.show()
df.write.csv("cleaned_audiobooks.csv", header=True, mode="overwrite")
5)
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Initialize Spark Session
spark = SparkSession.builder.appName("CrimeDataCleaning").getOrCreate()
# Load dataset
df = spark.read.csv("crime_data.csv", header=True, inferSchema=True)
# Identify missing values
missing_counts = df.select([count(when(col(c).isNull() | isnan(col(c)), c)).alias(c) for c in df.columns])
missing_counts.show()
# Handle missing values
df = df.fillna({
    "Vict Sex": "Unknown",
    "Vict Age": 0,
    "Crime Code": "000",
    "AREA NAME": "Unknown"
}).dropna()
# Remove outliers
df = df.filter((col("Vict Age") >= 0) & (col("Vict Age") <= 120))
# Standardize TIME OCC column
df = df.withColumn("TIME_OCC", lpad(col("TIME OCC").cast(StringType()), 4, "0"))
df = df.withColumn("TIME_OCC", concat_ws(":", col("TIME_OCC").substr(1, 2), col("TIME_OCC").substr(3, 2)))
df = df.drop("TIME OCC")
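# Worked example (a sketch with a hypothetical value): a military-style time such as 930
# is left-padded to "0930", then split into "09" and "30" and joined as "09:30".
spark.createDataFrame([(930,)], ["TIME OCC"]) \
    .withColumn("TIME_OCC", lpad(col("TIME OCC").cast(StringType()), 4, "0")) \
    .withColumn("TIME_OCC", concat_ws(":", col("TIME_OCC").substr(1, 2), col("TIME_OCC").substr(3, 2))) \
    .show()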
# Clean AREA NAME column
df = df.withColumn("AREA NAME", regexp_replace(col("AREA NAME"), r"[\\/|//]", ""))
# Remove time portion from Date Rptd column
# df = df.withColumn("Date Rptd", split(col("Date Rptd"), " ").getItem(0))
df = df.withColumn("Date Rptd", split(col("Date Rptd"), " ")[0])
# Standardize Date Rptd column
df = df.withColumn(
    "Date Rptd",
    coalesce(
        to_date(col("Date Rptd"), "dd-MM-yyyy"),
        to_date(col("Date Rptd"), "MM/dd/yyyy")
    )
)
df = df.withColumn("Date Rptd", date_format(col("Date Rptd"), "yyyy-MM-dd"))
# df = df.withColumn("DATE OCC", split(col("DATE OCC"), " ").getItem(0))
df = df.withColumn("DATE OCC", split(col("DATE OCC"), " ")[0])
# Standardize DATE OCC column
df = df.withColumn(
    "DATE OCC",
    coalesce(
        to_date(col("DATE OCC"), "dd-MM-yyyy"),
        to_date(col("DATE OCC"), "MM/dd/yyyy")
    )
)
df = df.withColumn("DATE OCC", date_format(col("DATE OCC"), "yyyy-MM-dd"))
# Clean Crime Code column
df = df.withColumn("Crime Code", regexp_extract(col("Crime Code"), r"(\d+)", 1))
# Create derived columns
df = df.withColumn("Year", year(col("Date Rptd")))
df = df.withColumn("Month", month(col("Date Rptd")))
# Group by AREA NAME and count crimes
grouped_df = df.groupBy("AREA NAME").count()
grouped_df.show()
# Save cleaned data
df.write.csv("cleaned_crime_data.csv", header=True, mode="overwrite")
# Show cleaned data
df.show()
6)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, expr, median, when, regexp_replace, regexp_extract, split, lower, trim
# Initialize Spark session
spark = SparkSession.builder.appName("DataCleaning").getOrCreate()
# Load the dataset
file_path = "movie_dataset_uncleaned.csv"  # Update with your actual path
df = spark.read.csv(file_path, header=True, inferSchema=True)
# Remove parentheses from Year column
df = df.withColumn("Year", regexp_replace(col("Year"), r"[()]", ""))
# Handle negative Year cases (-YYYY means End_Year is YYYY and Start_Year should be YYYY - 5)
df = df.withColumn("End_Year", when(col("Year").rlike(r"^-\d{4}$"), regexp_extract(col("Year"), r"-(\d{4})", 1).cast("int")))
df = df.withColumn("Start_Year", when(col("End_Year").isNotNull(), col("End_Year") - 5))
# Process other Year cases
df = df.withColumn("Start_Year", when(col("Start_Year").isNull(), regexp_extract(col("Year"), r"(\d{4})", 1).cast("int")).otherwise(col("Start_Year")))
df = df.withColumn("End_Year", when(col("End_Year").isNull(), regexp_extract(col("Year"), r"(\d{4})$", 1).cast("int")).otherwise(col("End_Year")))
# If Start_Year is missing but End_Year exists, calculate Start_Year as End_Year - 5
df = df.withColumn("Start_Year", when(col("Start_Year").isNull(), col("End_Year") - 5).otherwise(col("Start_Year")))
# If End_Year is missing but Start_Year exists, calculate End_Year as Start_Year + 5
df = df.withColumn("End_Year", when(col("End_Year").isNull(), col("Start_Year") + 5).otherwise(col("End_Year")))
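# Worked examples (hypothetical Year strings, after the parentheses have been removed):
#   "2010-2015" -> Start_Year 2010, End_Year 2015
#   "-2015"     -> End_Year 2015, Start_Year 2010 (End_Year - 5)
#   "2012-"     -> Start_Year 2012, End_Year 2017 (Start_Year + 5, no trailing year)
#   "2012"      -> Start_Year 2012, End_Year 2012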
# Remove rows where both Start_Year and End_Year are missing
df = df.dropna(subset=["Start_Year", "End_Year"])
# Handle missing values
df = df.fillna({"Rating": df.select(mean(col("Rating"))).collect()[0][0]})
df = df.fillna({"Votes": df.select(median(col("Votes"))).collect()[0][0]})
df = df.fillna({"RunTime": 0})
df = df.fillna({"Genre": "Unknown"})
df = df.fillna({"Gross": "$0.00M"})
# Remove outliers in Rating column (ensure values are between 1 and 10)
df = df.filter((col("Rating") >= 1) & (col("Rating") <= 10))
# Clean Genre column: keep only the part before the first '$' separator
# df = df.withColumn("Final_Genre", trim(lower(split(col("Genre"), r"\$")[0])))
df = df.withColumn("Final_Genre", split(col("Genre"), r"\$")[0])
df = df.withColumn("Final_Genre", when(col("Final_Genre").isNull(), "unknown").otherwise(col("Final_Genre")))
# Save cleaned data with overwrite mode
df.write.csv("cleaned_movie_dataset.csv", header=True, mode="overwrite")
# Show sample cleaned data
df.show()
7)
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Initialize Spark Session
spark = SparkSession.builder.appName("DataCleaning").getOrCreate()
# Load dataset
df = spark.read.csv("phone_usage_india.csv", header=True, inferSchema=True)
# 1. Remove Outliers in the Age Column
df = df.filter((col("Age") >= 18) & (col("Age") <= 90))
# 2. Add a new column named Age_Category
df = df.withColumn(
    "Age_Category",
    when((col("Age") >= 18) & (col("Age") <= 35), "Young")
    .when((col("Age") >= 36) & (col("Age") <= 60), "Middle-aged")
    .when((col("Age") >= 61) & (col("Age") <= 90), "Senior")
    .otherwise("Unknown")
)
# 3. Replace missing values with "Unknown" for Location column
df = df.fillna({"Location": "Unknown"})
# 4. Convert Screen Time to Minutes
df = df.withColumn("Screen_Time_Minutes", col("Screen Time (hrs/day)") * 60)
df = df.withColumn("Social_Media_Time_Minutes", col("Social Media Time (hrs/day)") * 60)
df = df.withColumn("Streaming_Time_Minutes", col("Streaming Time (hrs/day)") * 60)
df = df.withColumn("Gaming_Time_Minutes", col("Gaming Time (hrs/day)") * 60)
# 5. Add a new column named Screen_Time_Category
df = df.withColumn(
    "Screen_Time_Category",
    when(col("Screen_Time_Minutes") <= 180, "Low Usage")
    .when((col("Screen_Time_Minutes") > 180) & (col("Screen_Time_Minutes") <= 360), "Moderate Usage")
    .otherwise("High Usage")
)
# 6. Group by "Primary Use" and aggregate "Monthly Recharge Cost (INR)"
grouped_df = df.groupBy("Primary Use").agg(
    sum("Monthly Recharge Cost (INR)").alias("Total_Monthly_Recharge")
)
# Show the results
df.show() # Display the cleaned dataset
grouped_df.show() # Display the aggregated results
# Optionally, save the cleaned data to a CSV file
df.write.csv("cleaned_data.csv", header=True, mode="overwrite")
8)
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Initialize Spark Session
spark = SparkSession.builder.appName("EmployeeDataCleaning").getOrCreate()
# Load dataset
df = spark.read.csv("HR_Analytics.csv", header=True, inferSchema=True)
# 1. Handle missing values in BusinessTravel, DistanceFromHome, and EducationField columns
# Replace nulls in BusinessTravel with "Unknown"
df = df.fillna({"BusinessTravel": "Unknown"})
# Replace nulls in DistanceFromHome with the mean value of the column
mean_distance = df.select(mean(col("DistanceFromHome"))).collect()[0][0]
df = df.fillna({"DistanceFromHome": mean_distance})
# Replace nulls in EducationField with "other"
df = df.fillna({"EducationField": "other"})
# 2. Check for duplicate rows based on EmpID and remove them
df = df.dropDuplicates(subset=["EmpID"])
# 3. Remove outliers in the DistanceFromHome column (values > 50)
df = df.filter(col("DistanceFromHome") <= 50)
# 4. Add a new column PercentSalaryHike_Category categorizing PercentSalaryHike
df = df.withColumn(
    "PercentSalaryHike_Category",
    when((col("PercentSalaryHike") >= 0) & (col("PercentSalaryHike") <= 10), "Low")
    .when((col("PercentSalaryHike") >= 11) & (col("PercentSalaryHike") <= 20), "Moderate")
    .otherwise("High")
)
# 5. Standardize the JobSatisfaction column with descriptive categories
df = df.withColumn(
    "JobSatisfaction",
    when(col("JobSatisfaction") == 1, "Poor")
    .when(col("JobSatisfaction") == 2, "Below Average")
    .when(col("JobSatisfaction") == 3, "Average")
    .when(col("JobSatisfaction") == 4, "Excellent")
    .otherwise("Unknown")
)
# 6. Transform the SalarySlab column by removing "k" and converting to numerical values
df = df.withColumn("SalarySlab", regexp_replace(col("SalarySlab"), "k", "").cast("int") * 1000)
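# Worked example (hypothetical value): a SalarySlab of "15k" becomes "15", which is cast to 15 and multiplied by 1000, giving 15000.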
# Drop the unnamed extra columns (_c15 to _c18)
df = df.drop("_c15", "_c16", "_c17", "_c18")
# 7. Save the cleaned dataset in CSV format
df.write.csv("cleaned_employee_data.csv", header=True, mode="overwrite")
# Show the cleaned data
df.show()
9)
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# Initialize Spark Session
spark = SparkSession.builder.appName("StudentDataCleaning").getOrCreate()
# Load the dataset into a PySpark DataFrame
df = spark.read.csv("student-info.csv", header=True, inferSchema=True)
# Show the initial DataFrame to verify proper loading
df.show()
# 1. Replace binary values in the 'school' column
df = df.withColumn("school", when(col("school") == "GP", "Gabriel Pereira")
                   .when(col("school") == "MS", "Mousinho da Silveira")
                   .otherwise(col("school")))
# 2. Replace binary values in the 'address' column
df = df.withColumn("address", when(col("address") == "U", "Urban")
                   .when(col("address") == "R", "Rural")
                   .otherwise(col("address")))
# 3. Replace binary values in the 'family_size' column
df = df.withColumn("family_size", when(col("family_size") == "LE3", "Less or Equal to 3")
                   .when(col("family_size") == "GT3", "Greater than 3")
                   .otherwise(col("family_size")))
# 4. Replace numeric values in the 'mother_edu' column
df = df.withColumn("mother_edu", when(col("mother_edu") == 0, "None")
                   .when(col("mother_edu") == 1, "Primary Education (4th Grade)")
                   .when(col("mother_edu") == 2, "5th to 9th Grade")
                   .when(col("mother_edu") == 3, "Secondary Education")
                   .when(col("mother_edu") == 4, "Higher Education")
                   .otherwise(col("mother_edu")))
# Perform the same transformation for the 'father_edu' column
df = df.withColumn("father_edu", when(col("father_edu") == 0, "None")
                   .when(col("father_edu") == 1, "Primary Education (4th Grade)")
                   .when(col("father_edu") == 2, "5th to 9th Grade")
                   .when(col("father_edu") == 3, "Secondary Education")
                   .when(col("father_edu") == 4, "Higher Education")
                   .otherwise(col("father_edu")))
# 5. Handle missing values in the 'mother_job' column by replacing all missing entries with 'other'
df = df.fillna({"mother_job": "other"})
# 6. Replace numeric values in the 'traveltime' column with descriptive labels
df = df.withColumn("traveltime", when(col("traveltime") == 1, "15 min")
                   .when(col("traveltime") == 2, "30 min")
                   .when(col("traveltime") == 3, "45 min")
                   .when(col("traveltime") == 4, "60 min")
                   .otherwise(col("traveltime")))
# 7. Handle missing values in the 'internet' column by replacing all missing entries with 'unknown'
df = df.fillna({"internet": "unknown"})
# 8. Remove rows in the 'age' column that contain outliers (age not in the range of 15 to 22)
df = df.filter((col("age") >= 15) & (col("age") <= 22))
# Show the cleaned data
df.show()
# Optionally, save the cleaned data to a CSV file
df.write.csv("cleaned_student_data.csv", header=True, mode="overwrite")
10)
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import functions as F
# Initialize Spark Session
spark = SparkSession.builder.appName("SalesDataCleaning").getOrCreate()
# Load the dataset into a PySpark DataFrame
df = spark.read.csv("Real_estate.csv", header=True, inferSchema=True)
# Show the initial DataFrame to verify proper loading
df.show()
# 1. Standardize the Date Recorded column to a consistent format (YYYY-MM-DD)
df = df.withColumn(
    "Date Recorded",
    coalesce(
        to_date(col("Date Recorded"), "MM-dd-yyyy"),  # Try converting MM-DD-YYYY format
        to_date(col("Date Recorded"), "MM/dd/yyyy")   # If that fails, try MM/DD/YYYY format
    )
)
# Standardize the date format to YYYY-MM-DD
df = df.withColumn("Date Recorded", date_format(col("Date Recorded"), "yyyy-MM-dd"))
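# Worked example (hypothetical values): "07-14-2021" and "07/14/2021" both end up as
# "2021-07-14"; a value matching neither pattern becomes null and can be inspected later.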
# 2. Calculate the Sales Ratio for each row and add it as a new column
df = df.withColumn("Sales_Ratio", col("Assessed Value") / col("Sale Amount"))
# 3. Handle missing values in the Residential Type column by replacing all missing entries with 'Unknown'
df = df.fillna({"Residential Type": "Unknown"})
# 4. Remove rows in the List Year column that contain outliers, retaining only rows where the year is between 2001 and 2022
df = df.filter((col("List Year") >= 2001) & (col("List Year") <= 2022))
# 5. Handle missing values in the Town column by replacing all missing entries with the mode (most frequent value)
mode_value = df.groupBy("Town").count().orderBy(desc("count")).first()[0]
df = df.fillna({"Town": mode_value})
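# Side note (a sketch, assuming Spark 3.4 or newer): the built-in mode() aggregate returns the
# most frequent value in one expression, e.g.:
# mode_value = df.select(mode("Town")).first()[0]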
# Show the cleaned data
df.show()
# Optionally, save the cleaned data to a CSV file
df.write.csv("cleaned_sales_data.csv", header=True, mode="overwrite")