# GrapheonRL Complex Demo -- Multi-omics Joint Analysis Pipeline
#
# Simulates a real joint WGS + RNA-seq analysis pipeline for 3 patients.
# Two completely independent processing tracks merge at integration.
#
# ── WGS track (per patient, 3 parallel lanes) ───────────────────────────
#   fastqc_wgs → trim_wgs → bwa_map → sort_wgs_bam → markdup
#             → bqsr → haplotype_caller
#             (barrier) → merge_gvcf → genotype
#             → filter_snv ──┐
#             → filter_indel─┤→ functional_annotate
#             → annotate_snv─┘
#
# ── RNA-seq track (per patient, 3 parallel lanes) ───────────────────────
#   fastqc_rna → trim_rna → star_align → sort_rna_bam → featurecounts
#             (barrier) → merge_counts → normalize → deseq2
#             → volcano_plot ─┐
#             → go_enrichment─┘
#
# ── Shared reference preparation ────────────────────────────────────────
#   build_genome_index   (shared by WGS alignment)
#   build_transcriptome  (shared by RNA alignment)
#
# ── Integration (joins both tracks) ────────────────────────────────────
#   eqtl_prep → eqtl_analysis → integrate_results
#   multiqc → final_report
#
# Total: 58 tasks   Critical path: ~30s   8-core budget
#
# Scheduling challenges:
#  - STAR align uses ALL 8 cores → only one patient can align at a time
#  - Two independent synchronisation barriers (merge_gvcf, merge_counts)
#  - Post-barrier scatter (filter_snv + filter_indel run in parallel)
#  - WGS and RNA tracks compete for cores during per-patient phases

import time, os

PATIENTS = ["P1", "P2", "P3"]

onstart:
    for d in ["results/ref", "results/qc", "results/wgs",
              "results/rna", "results/integration", "results/reports"]:
        os.makedirs(d, exist_ok=True)
    with open("results/.start_time", "w") as f:
        f.write(str(time.time()))

onsuccess:
    with open("results/.end_time", "w") as f:
        f.write(str(time.time()))
    start = float(open("results/.start_time").read())
    end   = float(open("results/.end_time").read())
    print(f"\n  Complex pipeline wall time: {end - start:.1f}s  "
          f"({len(PATIENTS)} patients, WGS + RNA-seq, 58 tasks)")

rule all:
    input: "results/reports/final_report.txt"

# ── Shared reference preparation ─────────────────────────────────────────

rule build_genome_index:
    output: "results/ref/genome.idx"
    threads: 4
    resources: mem_mb=8192
    shell: "sleep 3 && echo 'genome_index' > {output}"

rule build_transcriptome:
    output: "results/ref/transcriptome.idx"
    threads: 2
    resources: mem_mb=4096
    shell: "sleep 2 && echo 'transcriptome_index' > {output}"

# ── WGS per-patient processing ───────────────────────────────────────────

rule fastqc_wgs:
    output: "results/qc/{patient}_wgs_qc.txt"
    threads: 1
    resources: mem_mb=1024
    shell: "sleep 1 && echo 'wgs_qc:{wildcards.patient}' > {output}"

rule trim_wgs:
    input:  "results/qc/{patient}_wgs_qc.txt"
    output: "results/wgs/{patient}_wgs_trimmed.fq"
    threads: 2
    resources: mem_mb=2048
    shell: "sleep 2 && echo 'trimmed:{wildcards.patient}' > {output}"

rule bwa_map:
    input:
        ref   = "results/ref/genome.idx",
        reads = "results/wgs/{patient}_wgs_trimmed.fq"
    output: "results/wgs/{patient}_mapped.bam"
    threads: 4
    resources: mem_mb=8192
    shell: "sleep 4 && echo 'mapped:{wildcards.patient}' > {output}"

rule sort_wgs_bam:
    input:  "results/wgs/{patient}_mapped.bam"
    output: "results/wgs/{patient}_sorted.bam"
    threads: 2
    resources: mem_mb=4096
    shell: "sleep 2 && echo 'wgs_sorted:{wildcards.patient}' > {output}"

rule markdup:
    input:  "results/wgs/{patient}_sorted.bam"
    output:
        bam     = "results/wgs/{patient}_dedup.bam",
        metrics = "results/qc/{patient}_dedup_metrics.txt"
    threads: 2
    resources: mem_mb=4096
    shell: "sleep 2 && echo 'dedup:{wildcards.patient}' > {output.bam} && echo 'metrics' > {output.metrics}"

rule bqsr:
    input:
        bam = "results/wgs/{patient}_dedup.bam",
        ref = "results/ref/genome.idx"
    output: "results/wgs/{patient}_recal.bam"
    threads: 2
    resources: mem_mb=4096
    shell: "sleep 2 && echo 'recal:{wildcards.patient}' > {output}"

rule haplotype_caller:
    input:
        bam = "results/wgs/{patient}_recal.bam",
        ref = "results/ref/genome.idx"
    output: "results/wgs/{patient}.gvcf"
    threads: 4
    resources: mem_mb=8192
    shell: "sleep 4 && echo 'gvcf:{wildcards.patient}' > {output}"

# ── WGS joint genotyping (barrier) ───────────────────────────────────────

rule merge_gvcf:
    input:  expand("results/wgs/{patient}.gvcf", patient=PATIENTS)
    output: "results/wgs/cohort.gvcf"
    threads: 2
    resources: mem_mb=4096
    shell: "sleep 2 && cat {input} > {output}"

rule genotype:
    input:
        gvcf = "results/wgs/cohort.gvcf",
        ref  = "results/ref/genome.idx"
    output: "results/wgs/cohort.vcf"
    threads: 4
    resources: mem_mb=8192
    shell: "sleep 3 && echo 'genotyped' > {output}"

# ── Variant filtering scatter (parallel after genotyping) ────────────────

rule filter_snv:
    input:  "results/wgs/cohort.vcf"
    output: "results/wgs/filtered_snv.vcf"
    threads: 2
    resources: mem_mb=2048
    shell: "sleep 2 && echo 'snv_filtered' > {output}"

rule filter_indel:
    input:  "results/wgs/cohort.vcf"
    output: "results/wgs/filtered_indel.vcf"
    threads: 2
    resources: mem_mb=2048
    shell: "sleep 2 && echo 'indel_filtered' > {output}"

rule annotate_snv:
    input:  "results/wgs/filtered_snv.vcf"
    output: "results/wgs/annotated_snv.vcf"
    threads: 2
    resources: mem_mb=2048
    shell: "sleep 2 && echo 'snv_annotated' > {output}"

rule functional_annotate:
    input:
        snv   = "results/wgs/annotated_snv.vcf",
        indel = "results/wgs/filtered_indel.vcf"
    output: "results/wgs/functional.vcf"
    threads: 2
    resources: mem_mb=4096
    shell: "sleep 3 && echo 'functional_annotation' > {output}"

# ── RNA-seq per-patient processing ───────────────────────────────────────

rule fastqc_rna:
    output: "results/qc/{patient}_rna_qc.txt"
    threads: 1
    resources: mem_mb=1024
    shell: "sleep 1 && echo 'rna_qc:{wildcards.patient}' > {output}"

rule trim_rna:
    input:  "results/qc/{patient}_rna_qc.txt"
    output: "results/rna/{patient}_rna_trimmed.fq"
    threads: 2
    resources: mem_mb=2048
    shell: "sleep 2 && echo 'rna_trimmed:{wildcards.patient}' > {output}"

rule star_align:
    input:
        ref   = "results/ref/transcriptome.idx",
        reads = "results/rna/{patient}_rna_trimmed.fq"
    output: "results/rna/{patient}_star.bam"
    threads: 8            # STAR uses all cores: scheduling pressure point
    resources: mem_mb=32768
    shell: "sleep 4 && echo 'star:{wildcards.patient}' > {output}"

rule sort_rna_bam:
    input:  "results/rna/{patient}_star.bam"
    output: "results/rna/{patient}_rna_sorted.bam"
    threads: 2
    resources: mem_mb=4096
    shell: "sleep 2 && echo 'rna_sorted:{wildcards.patient}' > {output}"

rule featurecounts:
    input:
        bam = "results/rna/{patient}_rna_sorted.bam",
        ref = "results/ref/transcriptome.idx"
    output: "results/rna/{patient}_counts.txt"
    threads: 2
    resources: mem_mb=2048
    shell: "sleep 2 && echo 'counts:{wildcards.patient}' > {output}"

# ── RNA-seq aggregation (barrier) ────────────────────────────────────────

rule merge_counts:
    input:  expand("results/rna/{patient}_counts.txt", patient=PATIENTS)
    output: "results/rna/merged_counts.txt"
    threads: 1
    resources: mem_mb=1024
    shell: "sleep 1 && cat {input} > {output}"

rule normalize:
    input:  "results/rna/merged_counts.txt"
    output: "results/rna/normalized.txt"
    threads: 2
    resources: mem_mb=4096
    shell: "sleep 2 && echo 'normalized' > {output}"

rule deseq2:
    input:  "results/rna/normalized.txt"
    output: "results/rna/deseq2_results.txt"
    threads: 4
    resources: mem_mb=8192
    shell: "sleep 3 && echo 'deseq2_results' > {output}"

# ── RNA-seq downstream scatter ────────────────────────────────────────────

rule volcano_plot:
    input:  "results/rna/deseq2_results.txt"
    output: "results/rna/volcano.pdf"
    threads: 1
    resources: mem_mb=1024
    shell: "sleep 1 && echo 'volcano_plot' > {output}"

rule go_enrichment:
    input:  "results/rna/deseq2_results.txt"
    output: "results/rna/go_enrichment.txt"
    threads: 2
    resources: mem_mb=2048
    shell: "sleep 2 && echo 'go_enrichment' > {output}"

# ── Integration (joins WGS + RNA-seq) ────────────────────────────────────

rule eqtl_prep:
    input:
        vcf  = "results/wgs/functional.vcf",
        expr = "results/rna/normalized.txt"
    output: "results/integration/eqtl_input.txt"
    threads: 2
    resources: mem_mb=4096
    shell: "sleep 2 && echo 'eqtl_prepared' > {output}"

rule eqtl_analysis:
    input:  "results/integration/eqtl_input.txt"
    output: "results/integration/eqtl_results.txt"
    threads: 4
    resources: mem_mb=8192
    shell: "sleep 4 && echo 'eqtl_results' > {output}"

rule integrate_results:
    input:
        eqtl = "results/integration/eqtl_results.txt",
        go   = "results/rna/go_enrichment.txt",
        vcf  = "results/wgs/functional.vcf"
    output: "results/integration/integrated.txt"
    threads: 2
    resources: mem_mb=4096
    shell: "sleep 2 && echo 'integrated' > {output}"

# ── QC + final report ────────────────────────────────────────────────────

rule multiqc:
    input:
        wgs_qc  = expand("results/qc/{patient}_wgs_qc.txt",      patient=PATIENTS),
        rna_qc  = expand("results/qc/{patient}_rna_qc.txt",      patient=PATIENTS),
        dedup   = expand("results/qc/{patient}_dedup_metrics.txt",patient=PATIENTS)
    output: "results/reports/multiqc.html"
    threads: 1
    resources: mem_mb=2048
    shell: "sleep 2 && echo 'multiqc_report' > {output}"

rule final_report:
    input:
        integrated = "results/integration/integrated.txt",
        volcano    = "results/rna/volcano.pdf",
        multiqc    = "results/reports/multiqc.html"
    output: "results/reports/final_report.txt"
    threads: 1
    resources: mem_mb=1024
    shell: "sleep 1 && echo 'FINAL REPORT COMPLETE' > {output}"
