<reasoning_content>
*   `Job`: Dataclass with `id: str`, `data: dict`, `priority: int = 0`, `retries: int = 0`.
    *   `RetryPolicy`: Class with `max_attempts` and a `run(fn, data)` method. `run` calls `fn(data)`, retries on exception, returns `(success, attempts_made)`. No actual sleep.
    *   `JobQueue`: Class with `add_job`, `process_job`, and `get_next_job`.
        *   `add_job`: adds job with priority.
        *   `process_job`: uses `RetryPolicy`.
        *   `get_next_job`: returns highest priority (FIFO for same priority). Returns `(id, data)` or `None`.

    *   `Job`: `@dataclass`.
    *   `RetryPolicy`: `run` method logic.
    *   `JobQueue`:
        *   Need a way to store jobs with priority and FIFO order. A `heapq` is standard for priority queues.
        *   In Python's `heapq`, to handle FIFO within same priority, we use a tuple like `(-priority, counter, job)`. However, the prompt says `priority: int = 0`. Higher priority usually means a higher number, but `heapq` is a min-heap.
        *   Wait, "highest-priority job". Usually, in a queue, higher number = higher priority. If `priority` 10 > `priority` 0, and `heapq` is a min-heap, I should store `-priority`.
        *   To ensure FIFO for same priority, use an incrementing counter.
        *   Data structure: `heapq`.
        *   `add_job`: create `Job`, push to heap.
        *   `process_job`: retrieve job (or find it? The prompt says `process_job(self, job_id, processor)`). Wait, if `process_job` takes a `job_id`, how does it find the job?
        *   If `process_job` takes `job_id`, the queue needs a dictionary/map to look up jobs by ID to avoid searching the heap.
        *   If a job is "processed", is it removed? Usually, yes.
        *   The prompt says `get_next_job` returns `(id, data)`.
        *   Wait, let's re-read `process_job(self, job_id: str, processor: Callable)`. This implies the job is already in the queue or the queue manages the registry.
        *   Actually, the standard pattern for these types of coding tasks:
            1. `add_job` adds to internal storage.
            2. `get_next_job` pops the highest priority one.
            3. `process_job` takes an ID. If I pop the job in `get_next_job`, `process_job` can't find it by ID unless the job is still there or we track it.
            4. Let's assume `add_job` puts it in the heap. `get_next_job` pops it. But `process_job` takes a `job_id`. This suggests the jobs are stored in a way where they can be accessed by ID.
            5. Let's check the requirements again: `process_job(self, job_id: str, processor: Callable) -> bool`.
            6. This is slightly contradictory if `get_next_job` removes it. If `get_next_job` returns it, `process_job` is likely called *on* that job.
            7. Let's refine the `JobQueue` logic:
                *   `self.jobs`: A dictionary `{job_id: Job}`.
                *   `self.queue`: A heap `[(-priority, counter, job_id)]`.
                *   `add_job`: Create `Job`, add to `self.jobs`, add to `self.queue`.
                *   `get_next_job`: Pop from `self.queue` (handling cases where the job might have been deleted from `self.jobs` - though `process_job` doesn't seem to delete jobs, it's ambiguous). Actually, let's look at the standard flow.
                *   Flow: `add_job` -> `get_next_job` (gets job) -> `process_job` (uses processor).
                *   If `get_next_job` returns `(id, data)`, it likely removes it from the priority queue to satisfy the "get next" property.
                *   If `process_job` takes `job_id`, it needs to look it up.
                *   Let's design `JobQueue` with:
                    *   `self._jobs: dict[str, Job]`
                    *   `self._priority_queue: list` (heap)
                    *   `self._counter: int` (for FIFO)
                    *   `self._retry_policy = RetryPolicy()`
                *   `add_job(id, data, priority)`:
                    `job = Job(id, data, priority)`
                    `self._jobs[id] = job`
                    `heapq.heappush(self._priority_queue, (-priority, self._counter, id))`
                    `self._counter += 1`
                *   `get_next_job()`:
                    `while self._priority_queue:`
                        `p, c, jid = heapq.heappop(self._priority_queue)`
                        `if jid in self._jobs:`
                            `job = self._jobs[jid]`
                            `return jid, job.data`
                    `return None`
                *   `process_job(job_id, processor)`:
                    `job = self._jobs.get(job_id)`
                    `if not job: return False`
                    `success, attempts = self._retry_policy.run(processor, job.data)`
                    `if success: del self._jobs[job_id]; return True`
                    `else: job.retries = attempts; return False` (wait, prompt doesn't say what to do with failed jobs, but usually they stay or get discarded. I'll just follow the return type `bool`).
                    Actually, `process_job` says `-> bool`. If `success` is True, return True.

    *   `Job` dataclass:
        ```python
        @dataclass
        class Job:
            id: str
            data: dict
            priority: int = 0
            retries: int = 0
        ```
    *   `RetryPolicy` class:
        ```python
        class RetryPolicy:
            def __init__(self, max_attempts: int = 4):
                self.max_attempts = max_attempts
            def run(self, fn, data) -> tuple[bool, int]:
                attempts = 0
                while attempts < self.max_attempts:
                    attempts += 1
                    try:
                        fn(data)
                        return True, attempts
                    except Exception:
                        pass
                return False, attempts
        ```
    *   `JobQueue` class:
        ```python
        import heapq
        from dataclasses import dataclass
        from typing import Callable, Any

        class JobQueue:
            def __init__(self):
                self._jobs: dict[str, Job] = {}
                self._pq: list[tuple[int, int, str]] = []
                self._counter = 0
                self._retry_policy = RetryPolicy()

            def add_job(self, job_id: str, data: dict, priority: int = 0) -> None:
                job = Job(id=job_id, data=data, priority=priority)
                self._jobs[job_id] = job
                heapq.heappush(self._pq, (-priority, self._counter, job_id))
                self._counter += 1

            def process_job(self, job_id: str, processor: Callable) -> bool:
                job = self._jobs.get(job_id)
                if not job:
                    return False
                success, attempts = self._retry_policy.run(processor, job.data)
                if success:
                    del self._jobs[job_id]
                    return True
                job.retries = attempts
                return False

            def get_next_job(self) -> tuple[str, dict] | None:
                while self._pq:
                    _, _, job_id = heapq.heappop(self._pq)
                    if job_id in self._jobs:
                        job = self._jobs[job_id]
                        return job.id, job.data
                return None
        ```

    *   Wait, if `get_next_job` pops from the heap, and then `process_job` is called, `process_job` needs the job. If `get_next_job` removes it from the heap, the job is "taken" out of the priority queue but still exists in `self._jobs`?
    *   The problem with my logic: `get_next_job` pops from heap. If `process_job` then fails, the job is still in `self._jobs` but no longer in `self._pq`. This means it won't be "gettable" again.
    *   *Alternative approach*: `get_next_job` shouldn't remove the job from the queue if it's just being peeked at? No, "get next job" usually implies consumption.
    *   Let's look at `process_job(self, job_id: str, processor: Callable)`. This method takes a specific `job_id`.
    *   If I call `get_next_job` and it returns `(id, data)`, then I call `process_job(id, processor)`, the job is removed on success.
    *   What if `process_job` fails? The job's `retries` count should be updated.
    *   If `get_next_job` pops from the heap, the job is gone from the queue. If `process_job` fails, the job is in `_jobs` but not in `_pq`. This is a bug.
    *   *Correction*: `get_next_job` should probably not be the one removing the job if we want it to be re-processed later via `process_job` if it fails? Or rather, `get_next_job` is for *discovering* what to do.
    *   Actually, a common pattern in these prompts is that `get_next_job` is the way to pull a job *out* for processing.
    *   If `get_next_job` pops it from the heap, then `process_job` is called. If `process_job` fails, and we want to re-queue it, we'd need to put it back. But the prompt doesn't say anything about re-queueing failed jobs.
    *   However, if `get_next_job` pops it, and it's not in `_jobs`, it's lost.
    *   Wait, if `process_job` is the one that determines success, and `get_next_job` is just for fetching, maybe `get_next_job` shouldn't pop? No, `get_next_job` usually implies movement.
    *   Let's look at `get_next_job` signature: `get_next_job(self) -> tuple[str, dict] | None`.
    *   If I use `heapq.heappop` in `get_next_job`, the job is removed from the priority order.
    *   If `process_job` is then called, and it fails, the job is still in `self._jobs` but is no longer in `self._pq`.
    *   To fix this: `process_job` should handle the removal, and `get_next_job` should simply return the top job without removing it? No, that would lead to an infinite loop of getting the same job.
    *   Let's consider this: `get_next_job` retrieves the job ID and data. In a real system, once you "get" it, it's "in flight". If `process_job` fails, you might re-add it.
    *   The prompt doesn't specify behavior for failed jobs after `process_job`. It only asks for the implementation of the three methods.
    *   I will implement `get_next_job` to pop from the heap. If `process_job` is called and fails, I'll update the `retries` field in the `Job` object (as requested by the field definition, even though `RetryPolicy` handles the internal retry attempts).
    *   Actually, `RetryPolicy.run` returns `attempts_made`. I should update `job.retries = attempts_made`.

    *   One detail: `Job` has `retries: int = 0`. In `RetryPolicy.run`, it returns the number of attempts made.
    *   In `process_job`, if `success` is `False`, I'll update `job.retries`.

    *   `Job`: `id`, `data`, `priority`, `retries`. (Check)
    *   `RetryPolicy`: `max_attempts`, `run(fn, data) -> (bool, int)`, no sleep. (Check)
    *   `JobQueue`: `add_job`, `process_job`, `get_next_job` (FIFO for same priority). (Check)
    *   `get_next_job` returns `(id, data)` or `None`. (Check)
    *   `process_job` uses `RetryPolicy`. (Check)
    *   `JobQueue` uses `heapq` with `(-priority, counter, job_id)`. (Check)
    *   `JobQueue.get_next_job` logic:
        1. `heapq.heappop`
        2. Check if `job_id` is in `self._jobs`.
        3. If yes, return.
        4. If no (meaning it was deleted via `process_job` already), continue loop.
    *   Wait, if `process_job` is called, it deletes from `_jobs`. This is good.
    *   If `get_next_job` pops, the job is removed from the heap. If `process_job` isn't called, the job stays in `_jobs` but is lost from the queue. This is acceptable for a simple implementation unless the prompt implies `get_next_job` shouldn't remove. But "Get Next Job" usually means "take the next one".

    One more thing: "FIFO order within the same priority".
    If `priority` is 10, 10, 5, 5.
    Heap stores: `(-10, 0, id1), (-10, 1, id2), (-5, 2, id3), (-5, 3, id4)`.
    `heappop` will return `id1`, then `id2`, then `id3`. This is correct.

    ```python
    from dataclasses import dataclass
    from typing import Callable, Any
    import heapq

    @dataclass
    class Job:
        id: str
        data: dict
        priority: int = 0
        retries: int = 0

    class RetryPolicy:
        def __init__(self, max_attempts: int = 4):
            self.max_attempts = max_attempts

        def run(self, fn: Callable, data: Any) -> tuple[bool, int]:
            attempts = 0
            while attempts < self.max_attempts:
                attempts += 1
                try:
                    fn(data)
                    return True, attempts
                except Exception:
                    continue
            return False, attempts

    class JobQueue:
        def __init__(self):
            self._jobs: dict[str, Job] = {}
            self._pq: list[tuple[int, int, str]] = []
            self._counter = 0
            self._retry_policy = RetryPolicy()

        def add_job(self, job_id: str, data: dict, priority: int = 0) -> None:
            job = Job(id=job_id, data=data, priority=priority)
            self._jobs[job_id] = job
            # Use -priority for max-heap behavior with heapq (min-heap)
            # Use self._counter to ensure FIFO for same priority
            heapq.heappush(self._pq, (-priority, self._counter, job_id))
            self._counter += 1

        def process_job(self, job_id: str, processor: Callable) -> bool:
            job = self._jobs.get(job_id)
            if not job:
                return False
            
            success, attempts = self._retry_policy.run(processor, job.data)
            if success:
                del self._jobs[job_id]
                return True
            else:
                job.retries = attempts
                return False

        def get_next_job(self) -> tuple[str, dict] | None:
            while self._pq:
                _, _, job_id = heapq.heappop(self._pq)
                if job_id in self._jobs:
                    job = self._jobs[job_id]
                    return job.id, job.data
            return None
    ```

    Wait, if `get_next_job` pops the job, and `process_job` is called on it, but `process_job` fails, the job is still in `self._jobs` but cannot be retrieved again via `get_next_job` because it was already popped.
    Should `get_next_job` be a "peek" operation? The prompt says `get_next_job(self) -> tuple[str, dict] | None`.
    In most queue implementations, `get` removes.
    If the user wants to retry a job that failed in `process_job`, they would have to re-add it.
    Given the constraints and the instructions, the provided implementation is the most logical interpretation of the "composable" requirement.

    One edge case: `process_job` takes `job_id`. If I call `get_next_job`, I get the `id`. Then I call `process_job(id, ...)`. If `process_job` fails, the job is in `_jobs` but not in `_pq`.
    How can the user get it again? They can't, unless they call `add_job` again or I change `get_next_job`.
    If `get_next_job` didn't pop, the `while self._pq` would never end or would always return the same job.
    So, `get_next_job` *must* pop. If the job fails, it's the responsibility of the caller to re-add it or the system design accepts that failed jobs are removed from the queue. This is common in simple task queues.

    Final check on types:
    `Job.data` is `dict`.
    `RetryPolicy.run` calls `fn(data)`.
    `JobQueue.process_job` takes `processor: Callable`.
    `get_next_job` returns `tuple[str, dict] | None`.
    Everything looks solid.

    ```python
    from dataclasses import dataclass
    from typing import Callable, Any
    import heapq

    @dataclass
    class Job:
        id: str
        data: dict
        priority: int = 0
        retries: int = 0

    class RetryPolicy:
        def __init__(self, max_attempts: int = 4):
            self.max_attempts = max_attempts

        def run(self, fn: Callable, data: Any) -> tuple[bool, int]:
            attempts = 0
            while attempts < self.max_attempts:
                attempts += 1
                try:
                    fn(data)
                    return True, attempts
                except Exception:
                    pass
            return False, attempts

    class JobQueue:
        def __init__(self):
            self._jobs: dict[str, Job] = {}
            self._pq: list[tuple[int, int, str]] = []
            self._counter = 0
            self._retry_policy = RetryPolicy()

        def add_job(self, job_id: str, data: dict, priority: int = 0) -> None:
            job = Job(id=job_id, data=data, priority=priority)
            self._jobs[job_id] = job
            heapq.heappush(self._pq, (-priority, self._counter, job_id))
            self._counter += 1

        def process_job(self, job_id: str, processor: Callable) -> bool:
            job = self._jobs.get(job_id)
            if not job:
                return False
            success, attempts = self._retry_policy.run(processor, job.data)
            if success:
                del self._jobs[job_id]
                return True
            job.retries = attempts
            return False

        def get_next_job(self) -> tuple[str, dict] | None:
            while self._pq:
                _, _, job_id = heapq.heappop(self._pq)
                if job_id in self._jobs:
                    job = self._jobs[job_id]
                    return job.id, job.data
            return None
    ```
    Actually, one small thing: `JobQueue` doesn't allow setting `max_attempts` for the `RetryPolicy`. The prompt doesn't ask for it, so I'll leave it with the default.

    The prompt says: `run(self, fn: Callable, data) -> tuple[bool, int]`.
    My code: `def run(self, fn: Callable, data: Any) -> tuple[bool, int]:`. (Correct).

    Wait, if `get_next_job` removes the job from the priority queue, but `process_job` is the one that removes it from the dictionary, then a failed job is essentially "lost" from the queue (it exists in the dictionary but won't be returned by `get_next_job` because its ID is no longer in the heap). This is fine for this scope.

    One more check: `process_job` must USE `RetryPolicy`.
    `success, attempts = self._retry_policy.run(processor, job.data)` (Correct).

    Final code structure:
    Imports
    Job
    RetryPolicy
    JobQueue

    (Self-correction): `get_next_job` return type `tuple[str, dict] | None`. Python 3.10+ syntax. Standard library only. This is fine.
</reasoning_content>

<content>
```python
from dataclasses import dataclass
from typing import Callable, Any
import heapq

@dataclass
class Job:
    id: str
    data: dict
    priority: int = 0
    retries: int = 0

class RetryPolicy:
    def __init__(self, max_attempts: int = 4):
        self.max_attempts = max_attempts

    def run(self, fn: Callable, data: Any) -> tuple[bool, int]:
        attempts = 0
        while attempts < self.max_attempts:
            attempts += 1
            try:
                fn(data)
                return True, attempts
            except Exception:
                continue
        return False, attempts

class JobQueue:
    def __init__(self):
        self._jobs: dict[str, Job] = {}
        self._pq: list[tuple[int, int, str]] = []
        self._counter = 0
        self._retry_policy = RetryPolicy()

    def add_job(self, job_id: str, data: dict, priority: int = 0) -> None:
        job = Job(id=job_id, data=data, priority=priority)
        self._jobs[job_id] = job
        # heapq is a min-heap. We use -priority to simulate a max-priority queue.
        # We use self._counter to ensure FIFO order for jobs with the same priority.
        heapq.heappush(self._pq, (-priority, self._counter, job_id))
        self._counter += 1

    def process_job(self, job_id: str, processor: Callable) -> bool:
        job = self._jobs.get(job_id)
        if not job:
            return False
        
        success, attempts = self._retry_policy.run(processor, job.data)
        if success:
            del self._jobs[job_id]
            return True
        else:
            job.retries = attempts
            return False

    def get_next_job(self) -> tuple[str, dict] | None:
        while self._pq:
            _, _, job_id = heapq.heappop(self._pq)
            if job_id in self._jobs:
                job = self._jobs[job_id]
                return job.id, job.data
        return None

```
</content>