📘 Practical 2
Title: NASA Web Log Analysis using PySpark

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, min, max, to_date, to_timestamp, hour
import re

# -------------------------------
# 1. Create Spark Session
# -------------------------------
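# local[*] runs Spark locally with one worker thread per CPU core.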
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("NASAWebLogEDA") \
    .getOrCreate()

# -------------------------------
# 2. Load Log File
# -------------------------------
log_file_path = "/home/pl2/a2_39/"
logs_rdd = spark.sparkContext.textFile(log_file_path)
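
# Note: textFile accepts a single file or a directory; given a directory
# (as here) it reads every file inside, yielding one RDD element per raw
# log line.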

# -------------------------------
# 3. Define Log Pattern (Regex)
# -------------------------------
log_pattern = r'^(\S+) (\S+) (\S+) \[(.*?)\] "(\S+) (\S+) (\S+)" (\d{3}) (\S+)'
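
# A typical Common Log Format line the pattern is written against
# (example drawn from the NASA-HTTP dataset):
# in24.inetnebr.com - - [01/Aug/1995:00:00:01 -0400] "GET /shuttle/missions/sts-68/news/sts-68-mcc-05.txt HTTP/1.0" 200 1839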

# -------------------------------
# 4. Parse Each Line
# -------------------------------
def parse_line(line):
    """Parse one Common Log Format line into a tuple, or None if malformed."""
    match = re.match(log_pattern, line)
    if match:
        # groups 2 and 3 (the unused identd and userid fields) are discarded
        host, _, _, datetime, method, endpoint, protocol, status, content_size = match.groups()
        
        # CLF logs '-' when no body was sent (e.g. 304 Not Modified responses)
        if content_size == '-':
            content_size = 0
        
        return (host, datetime, method, endpoint, protocol, int(status), int(content_size))
    else:
        return None

# Apply parsing and filter invalid rows
parsed_logs_rdd = logs_rdd.map(parse_line).filter(lambda x: x is not None)
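
# Optional sanity check (diagnostic only): peek at a few parsed tuples and
# count how many raw lines the regex rejected.
print(parsed_logs_rdd.take(3))
print("Unparsed lines:", logs_rdd.count() - parsed_logs_rdd.count())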

# -------------------------------
# 5. Convert to DataFrame
# -------------------------------
columns = ["host", "datetime", "method", "endpoint", "protocol", "status", "content_size"]
logs_df = parsed_logs_rdd.toDF(columns)
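
# Optional: verify the inferred schema and preview a few rows.
logs_df.printSchema()
logs_df.show(5, truncate=False)

# Every query below would otherwise re-read and re-parse the log file, so
# caching the DataFrame is a cheap win for an interactive session.
logs_df.cache()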

# -------------------------------
# 6. Basic Statistics
# -------------------------------
logs_df.select(
    avg("content_size").alias("avg_size"),
    min("content_size").alias("min_size"),
    max("content_size").alias("max_size")
).show()

# -------------------------------
# 7. Status Code Counts
# -------------------------------
logs_df.groupBy("status").count().show()

# -------------------------------
# 8. Top Hosts
# -------------------------------
logs_df.groupBy("host") \
    .count() \
    .orderBy(col("count").desc()) \
    .show(10)

# -------------------------------
# 9. Most Requested Endpoints
# -------------------------------
logs_df.groupBy("endpoint") \
    .count() \
    .orderBy(col("count").desc()) \
    .show(20)

# -------------------------------
# 10. Error Endpoints (status >= 400)
# -------------------------------
logs_df.filter(col("status") >= 400) \
    .groupBy("endpoint") \
    .count() \
    .orderBy(col("count").desc()) \
    .show(10)

# -------------------------------
# 11. Unique Hosts
# -------------------------------
unique_hosts = logs_df.select("host").distinct().count()
print("Unique hosts:", unique_hosts)

# -------------------------------
# 12. Parse Timestamp and Extract Date
# -------------------------------
logs_df = logs_df.withColumn(
    "timestamp",
    # the bracketed field includes a timezone offset (e.g. -0400), so the
    # pattern needs the trailing "Z"; without it Spark 3's parser returns null
    to_timestamp(col("datetime"), "dd/MMM/yyyy:HH:mm:ss Z")
).withColumn("date", to_date(col("timestamp")))
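
# Optional sanity check: timestamps that fail to parse come back as null;
# a non-zero count here usually points at a pattern or locale mismatch.
print("Unparsed timestamps:", logs_df.filter(col("timestamp").isNull()).count())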

# -------------------------------
# 13. Requests per Host per Day
# -------------------------------
requests_per_host_per_day = logs_df.groupBy("host", "date").count()

# Global aggregate: average number of requests a host makes on a day it appears
requests_per_host_per_day.agg(
    avg("count").alias("avg_requests_per_host_per_day")
).show()
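
# Optional: the busiest individual host/day combinations.
requests_per_host_per_day.orderBy(col("count").desc()).show(5)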

# -------------------------------
# 14. 404 Errors - Endpoints
# -------------------------------
logs_df.filter(col("status") == 404) \
    .groupBy("endpoint") \
    .count() \
    .orderBy(col("count").desc()) \
    .show(20)

# -------------------------------
# 15. 404 Errors - Hosts
# -------------------------------
logs_df.filter(col("status") == 404) \
    .groupBy("host") \
    .count() \
    .orderBy(col("count").desc()) \
    .show(20)

# -------------------------------
# 16. 404 Errors Per Day
# -------------------------------
errors_per_day = logs_df.filter(col("status") == 404) \
    .groupBy("date") \
    .count()

errors_per_day.show()

# Top 3 days with the most 404 errors
errors_per_day.orderBy(col("count").desc()).show(3)

# -------------------------------
# 17. Hourly 404 Analysis
# -------------------------------
# hour() needs a full timestamp; applied to a to_date() result it always
# returns 0, because a date carries no time-of-day component.
logs_df = logs_df.withColumn("hour", hour(col("timestamp")))

logs_df.filter(col("status") == 404) \
    .groupBy("hour") \
    .count() \
    .orderBy("hour") \
    .show()

# -------------------------------
# 18. Stop Spark
# -------------------------------
spark.stop()
