📘 Practical 2
Title: NASA Web Log Analysis using PySpark
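
Dataset: NASA Kennedy Space Center HTTP access log for July 1995, in the standard
Common Log Format. A representative line (illustrative, not read by this script) looks like:
199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245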

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, min, max, to_date, to_timestamp, hour
import re

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("NASAWebLogEDA") \
    .getOrCreate()

# Path to the raw log file (adjust for your environment)
log_file_path = "/content/NASA_access_log_Jul95"

# Read the raw log as an RDD of text lines
logs_rdd = spark.sparkContext.textFile(log_file_path)

# Common Log Format fields: host, identd, userid, [timestamp], "method endpoint protocol", status, size
log_pattern = r'^(\S+) (\S+) (\S+) \[(.*?)\] "(\S+) (\S+) (\S+)" (\d{3}) (\S+)'

def parse_line(line):
    """Parse one log line into a tuple of fields, or return None if it doesn't match."""
    match = re.match(log_pattern, line)
    if not match:
        return None

    host, _, _, datetime, method, endpoint, protocol, status, content_size = match.groups()

    # A '-' content size means no response body was returned; treat it as 0 bytes
    if content_size == '-':
        content_size = 0

    return (
        host,
        datetime,
        method,
        endpoint,
        protocol,
        int(status),
        int(content_size)
    )
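
# Quick sanity check of the parser on a single illustrative line (not read from the file)
sample_line = '199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245'
print(parse_line(sample_line))
# Expected: ('199.72.81.55', '01/Jul/1995:00:00:01 -0400', 'GET', '/history/apollo/', 'HTTP/1.0', 200, 6245)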

# Parse every line and drop the ones the regex could not match
parsed_logs_rdd = logs_rdd.map(parse_line).filter(lambda x: x is not None)
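
# Optional parse-coverage check: how many raw lines the regex failed to match
# (a small number of malformed lines in this log is normal)
total_lines = logs_rdd.count()
parsed_lines = parsed_logs_rdd.count()
print("Lines dropped by the parser:", total_lines - parsed_lines)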

columns = [
    "host",
    "datetime",
    "method",
    "endpoint",
    "protocol",
    "status",
    "content_size"
]

# Convert the parsed RDD of tuples into a DataFrame with named columns
logs_df = parsed_logs_rdd.toDF(columns)
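
# Optional: preview the inferred schema and a few rows before running the analyses
logs_df.printSchema()
logs_df.show(5, truncate=False)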

# a. Content size stats

logs_df.select(
    avg("content_size").alias("avg_size"),
    min("content_size").alias("min_size"),
    max("content_size").alias("max_size")
).show()

# b. HTTP status code analysis

logs_df.groupBy("status").count().show()

# c. Top 10 hosts

logs_df.groupBy("host") \
.count() \
.orderBy(col("count").desc()) \
.show(10)

# d. Top 20 endpoints

logs_df.groupBy("endpoint") \
.count() \
.orderBy(col("count").desc()) \
.show(20)

# e. Top 10 error endpoints

logs_df.filter(col("status") >= 400) \
.groupBy("endpoint") \
.count() \
.orderBy(col("count").desc()) \
.show(10)

# f. Unique hosts

unique_hosts = logs_df.select("host").distinct().count()

print("Unique hosts:", unique_hosts)

# g. Average requests per host per day

# Extract the calendar date from the raw Apache timestamp string
logs_df = logs_df.withColumn(
    "date",
    to_date(col("datetime"), "dd/MMM/yyyy:HH:mm:ss Z")
)
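
# Note: on Spark 3.x the stricter datetime parser can reject this timestamp pattern;
# if the "date" column comes back null, enabling the legacy parser is one workaround:
# spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")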

requests_per_host_per_day = logs_df.groupBy("host", "date").count()

# Average of the per-host daily request counts across all (host, day) pairs
requests_per_host_per_day.agg(avg("count").alias("avg_daily_requests_per_host")).show()

# h. Top 20 endpoints with 404

logs_df.filter(col("status") == 404) \
.groupBy("endpoint") \
.count() \
.orderBy(col("count").desc()) \
.show(20)

# i. Top 20 hosts generating 404

logs_df.filter(col("status") == 404) \
.groupBy("host") \
.count() \
.orderBy(col("count").desc()) \
.show(20)

# j. 404 errors per day

errors_per_day = logs_df.filter(col("status") == 404) \
    .groupBy("date") \
    .count()

errors_per_day.show()

# k. Top 3 days with most 404 errors

errors_per_day.orderBy(col("count").desc()).show(3)

# l. Hourly 404 errors

# hour() needs a timestamp; to_date() truncates to midnight, so parse with to_timestamp() here
logs_df = logs_df.withColumn(
    "hour",
    hour(to_timestamp(col("datetime"), "dd/MMM/yyyy:HH:mm:ss Z"))
)

logs_df.filter(col("status") == 404) \
.groupBy("hour") \
.count() \
.orderBy("hour") \
.show()

spark.stop()
