#!/bin/sh

. "$TESTSUITE_LIB_UTILS"
. "$(dirname "$0")/find-kafka.sh"

# Keep Kafka data on a per-user RAM disk whenever that directory can be
# created; a successful mkdir replaces whatever KAFKA_TMPDIR held before.
RAMDISK="/mnt/ramdisk/$USER"
if mkdir -p "$RAMDISK" 2> /dev/null; then
    KAFKA_TMPDIR="${RAMDISK}/_kafka${WORKER_SUFFIX_PATH}"
fi

echo "Kafka data directory: $KAFKA_TMPDIR"

# Mandatory settings: report the first one that is empty via die.
[ -n "$KAFKA_TMPDIR" ] || die "KAFKA_TMPDIR must be set or RAM-disk must be enabled"
[ -n "$KAFKA_HOME" ] || die "KAFKA_HOME must be set"
[ -n "$KAFKA_SERVER_HOST" ] || die "KAFKA_SERVER_HOST must be set"
[ -n "$KAFKA_SERVER_PORT" ] || die "KAFKA_SERVER_PORT must be set"
[ -n "$KAFKA_CONTROLLER_PORT" ] || die "KAFKA_CONTROLLER_PORT must be set"

# Everything below lives under KAFKA_TMPDIR.

# Working copy of the Kafka distribution and the files used inside it.
KAFKA_SOURCE_DIR="${KAFKA_TMPDIR}/kafka"
KAFKA_BIN_DIR="${KAFKA_SOURCE_DIR}/bin"
KAFKA_DEFAULT_SOURCE_CONF_FILE="${KAFKA_SOURCE_DIR}/config/server.properties"
KAFKA_KRAFT_SOURCE_CONF_FILE="${KAFKA_SOURCE_DIR}/config/kraft/server.properties"
KAFKA_CONF_FILE="${KAFKA_SOURCE_DIR}/config/kraft/testsuite-kafka-server.properties"

# Kafka calls its data directory "logs".
KAFKA_LOGS_DIR="${KAFKA_TMPDIR}/logs"

KAFKA_LOG_FILE="${KAFKA_TMPDIR}/kafka.log"
KAFKA_PID_FILE="${KAFKA_TMPDIR}/kafka.pid"

# Deliberately not the default node.id, so that stopping the testsuite
# broker does not kill every other Kafka broker running on the machine.
KAFKA_NODE_ID="77"

echo "Kafka source directory: $KAFKA_SOURCE_DIR"
echo "Using Kafka node.id $KAFKA_NODE_ID"

# Succeeds (status 0) when netstat reports a TCP socket whose local address
# ends with port $1.
# The pattern is anchored to the end of the local-address column, so that
# checking e.g. 9092 is not satisfied by a socket on 90921.
_kafka_port_in_use() {
    if [ "$(uname)" = "Darwin" ]; then
        # Local address is matched as "host.port" on Darwin.
        netstat -p tcp -n | awk '{print $4}' | grep -q "\.$1\$"
    else
        # Local address is matched as "host:port" elsewhere.
        netstat --listening --tcp --numeric | awk '{print $4}' | grep -q ":$1\$"
    fi
}

# Copies the Kafka distribution into KAFKA_SOURCE_DIR, patches and formats
# a KRaft config, starts the broker as a daemon and creates the topics
# requested in KAFKA_START_TOPICS.
#
# Globals read: KAFKA_HOME, KAFKA_SOURCE_DIR, KAFKA_LOGS_DIR, KAFKA_TMPDIR,
#   KAFKA_BIN_DIR, KAFKA_CONF_FILE, KAFKA_KRAFT_SOURCE_CONF_FILE,
#   KAFKA_DEFAULT_SOURCE_CONF_FILE, KAFKA_NODE_ID, KAFKA_SERVER_HOST,
#   KAFKA_SERVER_PORT, KAFKA_CONTROLLER_PORT,
#   KAFKA_START_TOPICS ("name:partitions" entries separated by ';').
# Globals written: CLUSTER_UUID_FILE.
# Exits the shell with status 1 when the server or controller port is
# already taken.
start_kafka() {
    # It is possible that a Kafka server is already launched on the user's
    # machine. In such cases the testsuite may unexpectedly start to
    # interact with that server without any errors.
    # To avoid this, refuse to start if the ports are bound by anything else.
    if _kafka_port_in_use "$KAFKA_SERVER_PORT"; then
        echo "Maybe Kafka server is already running on the $KAFKA_SERVER_PORT port."
        echo "Choose another port: specify TESTSUITE_KAFKA_SERVER_PORT environment variable."
        exit 1
    elif _kafka_port_in_use "$KAFKA_CONTROLLER_PORT"; then
        echo "Maybe Kafka controller is already running on the $KAFKA_CONTROLLER_PORT port."
        echo "Choose another port: specify TESTSUITE_KAFKA_CONTROLLER_PORT environment variable."
        exit 1
    fi

    mkdir -p "$KAFKA_SOURCE_DIR"
    mkdir -p "$KAFKA_LOGS_DIR"

    echo "Copying Kafka home to tmp (to patch server configs): $KAFKA_HOME -> $KAFKA_SOURCE_DIR"
    cp -r "$KAFKA_HOME"/* "$KAFKA_SOURCE_DIR/"
    mkdir -p "$(dirname "$KAFKA_CONF_FILE")"
    # Prefer the KRaft sample config; fall back to the default one
    # when the distribution does not ship config/kraft.
    cp "$KAFKA_KRAFT_SOURCE_CONF_FILE" "$KAFKA_CONF_FILE" \
        || cp "$KAFKA_DEFAULT_SOURCE_CONF_FILE" "$KAFKA_CONF_FILE"

    echo "Generating cluster UUID..."
    CLUSTER_UUID_FILE="$KAFKA_TMPDIR/cluster_uuid.txt"
    "$KAFKA_BIN_DIR/kafka-storage.sh" random-uuid > "$CLUSTER_UUID_FILE"
    echo "Cluster UUID is $(cat "$CLUSTER_UUID_FILE")"

    # kafka-storage.sh does not support the --override option for
    # log.dirs, node.id and controller.quorum.voters,
    # so substitute them directly in the config file.
    echo "Patching server.properties. Setting logs dir to $KAFKA_LOGS_DIR"
    # '|' as the delimiter: the logs path contains slashes.
    sed -i.backup "s|log\.dirs=.*|log.dirs=$KAFKA_LOGS_DIR|" "$KAFKA_CONF_FILE"
    sed -i.backup "s/node\.id=.*/node.id=$KAFKA_NODE_ID/" "$KAFKA_CONF_FILE"
    sed -i.backup "s/controller\.quorum\.voters=.*/controller.quorum.voters=$KAFKA_NODE_ID@$KAFKA_SERVER_HOST:$KAFKA_CONTROLLER_PORT/" "$KAFKA_CONF_FILE"
    patched_config=$(grep -E "(log\.dirs|node\.id|controller\.quorum\.voters)" "$KAFKA_CONF_FILE")
    # printf, not echo: whether echo expands "\n" depends on the shell.
    printf 'Patched server config:\n%s\n' "$patched_config"

    echo "Formatting logs dir..."
    # --standalone appeared in Kafka 4.0 and is required there;
    # retry without it for releases that do not know the flag.
    "$KAFKA_BIN_DIR/kafka-storage.sh" format \
        --cluster-id "$(cat "$CLUSTER_UUID_FILE")" \
        --config "$KAFKA_CONF_FILE" \
        --standalone \
        --ignore-formatted || "$KAFKA_BIN_DIR/kafka-storage.sh" format \
            --cluster-id "$(cat "$CLUSTER_UUID_FILE")" \
            --config "$KAFKA_CONF_FILE" \
            --ignore-formatted

    echo "Running Kafka broker..."
    "$KAFKA_BIN_DIR/kafka-server-start.sh" -daemon \
        "$KAFKA_CONF_FILE" \
        --override auto.create.topics.enable=true \
        --override listeners="PLAINTEXT://$KAFKA_SERVER_HOST:$KAFKA_SERVER_PORT,CONTROLLER://$KAFKA_SERVER_HOST:$KAFKA_CONTROLLER_PORT" \
        --override advertised.listeners="PLAINTEXT://$KAFKA_SERVER_HOST:$KAFKA_SERVER_PORT" \
        --override controller.quorum.voters="$KAFKA_NODE_ID@$KAFKA_SERVER_HOST:$KAFKA_CONTROLLER_PORT" \
        --override offsets.topic.num.partitions=1

    # There is no basic healthcheck for a started broker,
    # but if the broker can create a topic it is almost certainly alive.
    echo "Creating test topic to ensure that broker is started..."
    "$KAFKA_BIN_DIR/kafka-topics.sh" \
        --bootstrap-server="$KAFKA_SERVER_HOST:$KAFKA_SERVER_PORT" \
        --create --topic test-topic \
        --partitions=1 --replication-factor=1

    echo "Deleting test topic..."
    "$KAFKA_BIN_DIR/kafka-topics.sh" \
        --bootstrap-server="$KAFKA_SERVER_HOST:$KAFKA_SERVER_PORT" \
        --delete --topic test-topic

    # Word splitting of the substitution is intentional: one entry per word.
    for topic_partition in $(echo "$KAFKA_START_TOPICS" | tr ';' '\n')
    do
        topic=$(echo "$topic_partition" | cut -d ':' -f 1)
        partitions_count=$(echo "$topic_partition" | cut -d ':' -f 2)

        echo "Creating topic $topic with $partitions_count partitions"

        "$KAFKA_BIN_DIR/kafka-topics.sh" \
            --bootstrap-server="$KAFKA_SERVER_HOST:$KAFKA_SERVER_PORT" \
            --create --topic "$topic" \
            --partitions="$partitions_count" --replication-factor=1
    done
}

# Stops the broker that was started with KAFKA_CONF_FILE and node.id
# KAFKA_NODE_ID, then removes KAFKA_TMPDIR.
#
# Globals read: KAFKA_CONF_FILE, KAFKA_NODE_ID, KAFKA_TMPDIR.
# Globals written: PIDS, PathToConfig, NID (plus loop variables pid, i).
stop_kafka() {
    # kafka-server-stop.sh does not expect several brokers on one host
    # and tries to kill all of them.
    # The approach below is a bit odd as well,
    # but it is safer than kafka-server-stop.sh.

    # Official Apache solution to find all brokers on the host.
    PIDS=$(ps ax | grep ' kafka\.Kafka ' | grep java | grep -v grep | awk '{print $1}' | xargs)
    echo "Potential brokers pids: $PIDS"

    # Find the process which was started as
    # bin/kafka-server-start.sh $KAFKA_CONF_FILE
    for pid in $PIDS; do
        echo "Checking pid: $pid"
        # Select the row by exact pid: a substring match would also pick up
        # other processes (pid 123 vs 1234) and yield several config paths.
        # With the --override options stripped, the last field is the config path.
        PathToConfig=$(ps ax \
            | awk -v pid="$pid" '$1 == pid' \
            | grep ' kafka\.Kafka ' | grep java | grep -v grep \
            | sed 's/--override .*=[^ ]*//g' \
            | awk 'NF>1{print $NF}')
        echo "Current path to config: $PathToConfig"
        if [ "$PathToConfig" = "$KAFKA_CONF_FILE" ]; then
            echo "Found candidate: $pid -> $PathToConfig"
            # Value of the first line that starts with "node.id=".
            NID=$(sed -n '/^node\.id=/ { s/^node\.id=//p; q; }' "$PathToConfig")
            echo "Current candidate node.id: $NID"
            if [ "$NID" = "$KAFKA_NODE_ID" ]; then
                echo "Killing broker: $pid"
                # Re-send SIGTERM once a second, at most 10 times, until the
                # signal can no longer be delivered (kill fails).
                i=1
                while [ "$i" -le 10 ]; do
                    if kill -s TERM "$pid" 2>/dev/null; then
                        echo "Retrying kill $pid after 1s..."
                        sleep 1
                    else
                        echo "Broker killed after $i tries"
                        break
                    fi
                    i=$((i + 1))
                done
                # Last resort; the error is silenced because the process
                # is normally gone by now.
                kill -s KILL "$pid" 2>/dev/null
            fi
        else
            echo "Skipping $pid"
        fi
    done

    rm -rf -- "$KAFKA_TMPDIR"
}

# "start" entry point: runs start_kafka without forwarding any arguments.
start() { start_kafka; }

# "stop" entry point: runs stop_kafka without forwarding any arguments.
stop() { stop_kafka; }

# Hand the command-line arguments over to script_main.
# NOTE(review): script_main is not defined in this file; presumably it comes
# from the sourced $TESTSUITE_LIB_UTILS and dispatches to start/stop — confirm.
script_main "$@"
