Metadata-Version: 2.4
Name: emr-livy-mcp
Version: 0.1.0
Summary: MCP server for Apache Livy on Amazon EMR with per-request credential refresh
Requires-Python: >=3.10
Requires-Dist: boto3>=1.34.0
Requires-Dist: httpx>=0.27.0
Requires-Dist: mcp[cli]>=1.0.0
Description-Content-Type: text/markdown

# EMR Livy MCP Server

A Model Context Protocol (MCP) server for interacting with Apache Livy on Amazon EMR, with automatic credential refresh.

---

## Overview

This MCP server exposes tools for:
- Getting temporary EMR session credentials
- Managing Livy sessions (create / get / delete)
- Submitting and polling PySpark statements
- Running end-to-end queries with auto credential refresh

---

## Configuration

### Environment variables

| Variable | Required | Description |
|----------|----------|-------------|
| `EMR_CLUSTER_ID` | Yes | EMR cluster ID (e.g. `j-...`) |
| `EMR_ROLE_ARN` | Yes | Execution role ARN passed to `get-cluster-session-credentials` |
| `LIVY_URL` | Yes | Livy base URL (no trailing slash), e.g. `https://livy.example.com` |
| `EMR_REGION` | No | AWS region for the EMR API (default: `us-west-2`) |
| `EMR_AK` / `EMR_SK` | No | Optional access-key override. If unset, keys come from boto3's default credential chain (standard AWS env vars, shared credentials file, profile, instance role) |
| `EMR_AWS_ACCESS_KEY_ID` / `EMR_AWS_SECRET_ACCESS_KEY` | No | Same as `EMR_AK` / `EMR_SK`, with explicit names |
| `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` | No | Standard AWS SDK variables (used when `EMR_AK` / `EMR_SK` are not set) |
| `EMR_AWS_SESSION_TOKEN` or `AWS_SESSION_TOKEN` | No | Optional session token for temporary keys |

```bash
export EMR_CLUSTER_ID="j-xx"
export EMR_ROLE_ARN="arn:aws:iam::<account-id>:role/<role_name>"
export LIVY_URL="https://livy.example.com"
# Optional — overrides default us-west-2:
# export EMR_REGION="us-west-2"
# Optional — otherwise use AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY (or profile, SSO, etc.):
# export EMR_AK="AKIA..."
# export EMR_SK="..."
```
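The credential precedence in the table can be sketched as a helper that builds keyword arguments for `boto3.client("emr", ...)`. This is an illustrative sketch, not the server's actual code: the name `resolve_emr_client_kwargs` is hypothetical, and checking `EMR_AK` / `EMR_SK` before the `EMR_AWS_*` pair is an assumed order.

```python
import os
from typing import Any, Dict, Mapping

DEFAULT_REGION = "us-west-2"

# Explicit key-pair overrides, checked in this (assumed) order.
OVERRIDE_PAIRS = (
    ("EMR_AK", "EMR_SK"),
    ("EMR_AWS_ACCESS_KEY_ID", "EMR_AWS_SECRET_ACCESS_KEY"),
)


def resolve_emr_client_kwargs(env: Mapping[str, str] = os.environ) -> Dict[str, Any]:
    """Build kwargs for boto3.client("emr", **kwargs) from environment variables."""
    kwargs: Dict[str, Any] = {"region_name": env.get("EMR_REGION") or DEFAULT_REGION}
    for ak_name, sk_name in OVERRIDE_PAIRS:
        access_key, secret_key = env.get(ak_name), env.get(sk_name)
        if access_key and secret_key:
            kwargs["aws_access_key_id"] = access_key
            kwargs["aws_secret_access_key"] = secret_key
            token = env.get("EMR_AWS_SESSION_TOKEN") or env.get("AWS_SESSION_TOKEN")
            if token:
                kwargs["aws_session_token"] = token
            break
    # No override pair: leave keys out so boto3's default chain
    # (AWS_* env vars, shared credentials, profile, instance role) applies.
    return kwargs
```

With this shape, `boto3.client("emr", **resolve_emr_client_kwargs())` only receives explicit keys when an override pair is set; otherwise boto3 resolves credentials on its own.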

### Run with `uvx` (recommended for users)

Works like `uvx opensearch-mcp-server-py`: **no clone, no `pip install`, no `--from`**. `uvx` downloads **`emr-livy-mcp`** from PyPI (or from your default / `UV_INDEX` mirror) into an isolated tool environment and runs its `emr-livy-mcp` console script.

**Requirement:** the **`emr-livy-mcp`** distribution must be published to an index `uv` can see (public PyPI or an internal index — set `UV_INDEX` / `UV_DEFAULT_INDEX` in MCP `env` if needed).

**CLI:**

```bash
uvx emr-livy-mcp
```

This package requires **Python ≥3.10**. If your default `python` is older, pin a newer interpreter with `--python` (many MCP server configs do the same):

```bash
uvx --python 3.12 emr-livy-mcp
```

**Cursor `~/.cursor/mcp.json`:**

```json
{
  "mcpServers": {
    "emr-livy": {
      "command": "uvx",
      "args": ["--python", "3.12", "emr-livy-mcp"],
      "env": {
        "UV_NATIVE_TLS": "1",
        "EMR_CLUSTER_ID": "<cluster id>",
        "EMR_ROLE_ARN": "<execution role arn>",
        "LIVY_URL": "https://livy.example.com"
      }
    }
  }
}
```

### Develop this repo (maintainers)

Uses **[uv](https://docs.astral.sh/uv/)** ([`uv.lock`](uv.lock), [`.python-version`](.python-version)):

```bash
cd dp/jup-mcp
uv sync
uv run emr-livy-mcp --help
uv add <package>
```

**Publish** so `uvx emr-livy-mcp` works for everyone:

```bash
uv build
UV_PUBLISH_TOKEN=pypi-... uv publish --index pypi
```

(`uv publish --index pypi` relies on the `publish-url` set on `[[tool.uv.index]]` in `pyproject.toml`. The older `[[tool.uv.publish-index]]` table is ignored by current uv, which caused the “No indexes were found” error. Plain `uv publish` with a token also works for uploading to the default PyPI.)

---

## API Endpoints & Tools

### 1. `get_credentials`

Fetches temporary username/password credentials from EMR.  
**Call this before any Livy operation. The credentials expire after about a minute, so refresh them before each call.**

**Parameters:** none

**Returns:**
```json
{
    "Credentials": {
        "UsernamePassword": {
            "Username": "xxx",
            "Password": "yyy"
        }
    },
    "ExpiresAt": "2026-04-11T23:33:38.905000+08:00"
}
```

**Underlying AWS CLI:**
```bash
aws emr get-cluster-session-credentials \
  --cluster-id $EMR_CLUSTER_ID \
  --execution-role-arn $EMR_ROLE_ARN \
  --region ${EMR_REGION:-us-west-2}
```
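In Python, the same call maps to `get_cluster_session_credentials` on a boto3 EMR client. A minimal sketch, assuming a hypothetical helper name `fetch_livy_credentials` and passing the client in so it can be replaced by a stub:

```python
from typing import Any, Tuple


def fetch_livy_credentials(emr_client: Any, cluster_id: str, role_arn: str) -> Tuple[str, str]:
    """Return (username, password) from EMR GetClusterSessionCredentials."""
    response = emr_client.get_cluster_session_credentials(
        ClusterId=cluster_id,
        ExecutionRoleArn=role_arn,
    )
    # Same shape as the JSON above: Credentials.UsernamePassword.{Username, Password}
    user_pass = response["Credentials"]["UsernamePassword"]
    return user_pass["Username"], user_pass["Password"]
```

Pass `boto3.client("emr", region_name="us-west-2")` as `emr_client`. Because the credentials are short-lived, call the helper again before each Livy request instead of storing its result.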

---

### 2. `create_session`

Creates a new Livy session and waits until it reaches `idle` state.

**Parameters:**

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| `kind` | string | No | `pyspark` | Session type. One of: `pyspark`, `spark`, `sparkr`, `sql` |

**Returns:**
```json
{
  "id": 22,
  "name": null,
  "appId": null,
  "owner": "xxx",
  "proxyUser": "yyy",
  "state": "starting",
  "kind": "pyspark",
  "appInfo": {"driverLogUrl": null, "sparkUiUrl": null},
  "log": ["stdout: ", "\nstderr: ", "\nYARN Diagnostics: "]
}
```

**Underlying REST call:**
```
POST /sessions
Body: { "kind": "pyspark" }
```
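Creating a session and waiting for `idle` amounts to one `POST` followed by `GET` polling. The sketch below assumes a hypothetical `send(method, path, body)` callable that performs one authenticated Livy request (fetching fresh credentials each time) and returns the decoded JSON. The poll interval, timeout, and set of failure states are illustrative choices, not the server's actual settings.

```python
import time
from typing import Callable, Dict, Optional

# Hypothetical: performs one authenticated Livy request and returns parsed JSON.
Send = Callable[[str, str, Optional[dict]], Dict]

FAILED_SESSION_STATES = {"dead", "error", "killed"}


def create_session(
    send: Send,
    kind: str = "pyspark",
    poll_seconds: float = 5.0,
    max_wait_seconds: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict:
    """POST /sessions, then poll GET /sessions/{id} until the state is idle."""
    session = send("POST", "/sessions", {"kind": kind})
    deadline = time.monotonic() + max_wait_seconds
    while session["state"] != "idle":
        if session["state"] in FAILED_SESSION_STATES:
            raise RuntimeError(f"Session is in state {session['state']}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Session {session['id']} not idle after {max_wait_seconds}s")
        sleep(poll_seconds)
        session = send("GET", f"/sessions/{session['id']}", None)
    return session
```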

---

### 3. `get_session`

Gets the current status of a Livy session.

**Parameters:**

| Name | Type | Required | Description |
|------|------|----------|-------------|
| `session_id` | integer | Yes | Session ID returned by `create_session` |

**Returns:**
```json
{
  "session_id": 22,
  "state": "idle",
  "app_id": "application_xxx",
  "owner": "XXXXXXXXXXXXXXXXXXXX"
}
```

**Underlying REST call:**
```
GET /sessions/{session_id}
```
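The tool's return value is a trimmed view of Livy's session object. A sketch of that mapping, assuming the same kind of hypothetical `send(method, path, body)` helper that performs one authenticated request and returns parsed JSON:

```python
from typing import Any, Callable, Dict, Optional

# Hypothetical: performs one authenticated Livy request and returns parsed JSON.
Send = Callable[[str, str, Optional[dict]], Dict[str, Any]]


def summarize_session(livy_session: Dict[str, Any]) -> Dict[str, Any]:
    """Keep only the fields get_session returns, renamed to snake_case."""
    return {
        "session_id": livy_session["id"],
        "state": livy_session["state"],
        "app_id": livy_session.get("appId"),
        "owner": livy_session.get("owner"),
    }


def get_session(send: Send, session_id: int) -> Dict[str, Any]:
    return summarize_session(send("GET", f"/sessions/{session_id}", None))
```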

---

### 4. `delete_session`

Deletes and terminates a Livy session.

**Parameters:**

| Name | Type | Required | Description |
|------|------|----------|-------------|
| `session_id` | integer | Yes | Session ID to terminate |

**Returns:**
```json
{
  "session_id": 22,
  "deleted": true
}
```

**Underlying REST call:**
```
DELETE /sessions/{session_id}
```
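A sketch of `delete_session`, again assuming a hypothetical `send(method, path, body)` helper. It additionally assumes that `send` raises `LookupError` on HTTP 404, so a missing session maps to the `NOT_FOUND` code listed under Error Handling.

```python
from typing import Any, Callable, Dict, Optional

# Hypothetical: one authenticated Livy request; assumed to raise LookupError on HTTP 404.
Send = Callable[[str, str, Optional[dict]], Dict[str, Any]]


def delete_session(send: Send, session_id: int) -> Dict[str, Any]:
    """DELETE /sessions/{id} and report the outcome in the tool's result shape."""
    try:
        send("DELETE", f"/sessions/{session_id}", None)
    except LookupError:
        return {"error": f"Session {session_id} not found", "code": "NOT_FOUND"}
    return {"session_id": session_id, "deleted": True}
```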

---

### 5. `submit_statement`

Submits a PySpark code statement to an existing session.

**Parameters:**

| Name | Type | Required | Description |
|------|------|----------|-------------|
| `session_id` | integer | Yes | Target session ID |
| `code` | string | Yes | PySpark code to execute (multiline supported) |

**Returns:**
```json
{
  "id": 0,
  "code": "print(1+1)",
  "state": "waiting",
  "output": null,
  "progress": 0.0,
  "started": 0,
  "completed": 0
}
```

**Underlying REST call:**
```
POST /sessions/{session_id}/statements
Body: { "code": "<pyspark code>" }
```
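Multiline code needs no special handling: it travels as an ordinary JSON string with escaped newlines. A sketch assuming a hypothetical `send(method, path, body)` helper that JSON-encodes `body` itself (as httpx's `json=` argument does); `statement_body` only shows what goes over the wire.

```python
import json
from typing import Any, Callable, Dict, Optional

# Hypothetical: one authenticated Livy request; JSON-encodes `body` like httpx's json=.
Send = Callable[[str, str, Optional[dict]], Dict[str, Any]]


def statement_body(code: str) -> str:
    """Serialized request body; newlines in `code` become JSON \\n escapes."""
    return json.dumps({"code": code})


def submit_statement(send: Send, session_id: int, code: str) -> Dict[str, Any]:
    """POST the code and return Livy's statement object (id, state, output, ...)."""
    return send("POST", f"/sessions/{session_id}/statements", {"code": code})
```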

---

### 6. `get_statement`

Gets the result of a submitted statement.

**Parameters:**

| Name | Type | Required | Description |
|------|------|----------|-------------|
| `session_id` | integer | Yes | Session ID |
| `statement_id` | integer | Yes | Statement ID returned by `submit_statement` |

**Returns (success):**
```json
{
  "statement_id": 0,
  "state": "available",
  "progress": 1.0,
  "output": {
    "status": "ok",
    "data": {
      "text/plain": "+----------+-----------+\n|run_ip    |report_date|\n+----------+-----------+"
    }
  }
}
```

**Returns (still running):**
```json
{
  "statement_id": 0,
  "state": "running",
  "progress": 0.5,
  "output": null
}
```

**Statement states:**

| State | Meaning |
|-------|---------|
| `waiting` | Queued, not yet started |
| `running` | Currently executing |
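| `available` | Completed; the result is in `output` (check `output.status`: `ok` or `error`) |
| `error` | Execution failed |
| `cancelling` | Cancellation in progress |
| `cancelled` | Statement was cancelled |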

**Underlying REST call:**
```
GET /sessions/{session_id}/statements/{statement_id}
```
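Deciding whether to keep polling, and whether a finished statement actually succeeded, can be sketched as below. In Livy, a Python exception typically still ends in state `available` with `output.status` set to `error`, so the sketch checks both fields. The helper names are hypothetical.

```python
from typing import Any, Dict

FINISHED_STATES = {"available", "error", "cancelled"}


def summarize_statement(stmt: Dict[str, Any]) -> Dict[str, Any]:
    """Map Livy's statement JSON to the get_statement result shape."""
    return {
        "statement_id": stmt["id"],
        "state": stmt["state"],
        "progress": stmt.get("progress"),
        "output": stmt.get("output"),
    }


def is_finished(stmt: Dict[str, Any]) -> bool:
    """True once polling can stop."""
    return stmt["state"] in FINISHED_STATES


def succeeded(stmt: Dict[str, Any]) -> bool:
    """Success requires state 'available' AND output.status == 'ok'."""
    output = stmt.get("output") or {}
    return stmt["state"] == "available" and output.get("status") == "ok"
```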

---

### 7. `run_query` ⭐ (high-level)

End-to-end tool: creates a session (or reuses `reuse_session_id`), submits the code, polls until the statement finishes, and returns the output.  
**Automatically refreshes credentials before every poll.**

**Parameters:**

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| `code` | string | Yes | — | PySpark code to run |
| `kind` | string | No | `pyspark` | Session type |
| `poll_interval_seconds` | integer | No | `30` | How often to poll for results |
| `max_wait_seconds` | integer | No | `1800` | Maximum time to wait before returning `TIMEOUT` (1800 s = 30 minutes) |
| `reuse_session_id` | integer | No | `null` | Reuse an existing session instead of creating a new one |

**Returns:**
```json
{
  "session_id": 22,
  "statement_id": 0,
  "state": "available",
  "output": "result text here"
}
```
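The `run_query` flow can be approximated by the loop below. This is a sketch under stated assumptions, not the server's implementation: `send(method, path, body)` is a hypothetical helper that fetches fresh credentials and performs one Livy request, and `sleep` / `clock` are injectable so the loop can be exercised without real waiting.

```python
import time
from typing import Any, Callable, Dict, Optional, Set

# Hypothetical: fetches fresh credentials, performs one Livy request, returns parsed JSON.
Send = Callable[[str, str, Optional[dict]], Dict[str, Any]]


def run_query(
    send: Send,
    code: str,
    kind: str = "pyspark",
    poll_interval_seconds: float = 30,
    max_wait_seconds: float = 1800,
    reuse_session_id: Optional[int] = None,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Dict[str, Any]:
    deadline = clock() + max_wait_seconds

    def wait_for(path: str, done: Set[str]) -> Dict[str, Any]:
        # Every GET goes through send(), so credentials are refreshed per poll.
        while True:
            obj = send("GET", path, None)
            if obj["state"] in done:
                return obj
            if clock() >= deadline:
                raise TimeoutError(f"{path} still in state {obj['state']}")
            sleep(poll_interval_seconds)

    try:
        if reuse_session_id is None:
            session_id = send("POST", "/sessions", {"kind": kind})["id"]
        else:
            session_id = reuse_session_id
        session = wait_for(f"/sessions/{session_id}", {"idle", "dead", "error", "killed"})
        if session["state"] != "idle":
            return {"error": f"Session is in state {session['state']}", "code": "SESSION_FAILED"}
        stmt_id = send("POST", f"/sessions/{session_id}/statements", {"code": code})["id"]
        stmt = wait_for(f"/sessions/{session_id}/statements/{stmt_id}",
                        {"available", "error", "cancelled"})
    except TimeoutError as exc:
        return {"error": str(exc), "code": "TIMEOUT"}

    output = stmt.get("output") or {}
    if stmt["state"] != "available" or output.get("status") != "ok":
        return {"error": f"Statement ended in state {stmt['state']}",
                "code": "STATEMENT_FAILED", "output": output}
    return {
        "session_id": session_id,
        "statement_id": stmt_id,
        "state": stmt["state"],
        "output": output.get("data", {}).get("text/plain", ""),
    }
```

A single deadline covers both session startup and statement execution, which matches `max_wait_seconds` being one end-to-end budget.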

---

## Example Usage (Claude Code prompt)

```
Use the EMR Livy MCP server to run this PySpark query and show me the results:

from pyspark.sql import functions as F
lower = '2026-01-07'
upper = '2026-01-07'
df = spark.table('xxx')
df = df.where(F.col('report_date').between(lower, upper))
df = df.where(F.col('farm_name') == 'xx').where(F.col('pool_worker') == 'xxx')
df = df.select('run_ip', 'report_date')
df = df.orderBy('report_date', 'run_ip')
df.show(100, False)
```

---

## Credential Refresh Strategy

Because EMR temporary credentials expire in ~1 minute, the MCP server follows this rule:

> **Call `get_credentials` immediately before every single Livy HTTP request.**

```
get_credentials → create_session
get_credentials → poll session state (repeat)
get_credentials → submit_statement
get_credentials → poll statement state (repeat every N seconds)
```

Do **not** cache or reuse credentials across calls.
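One way to enforce this rule is to wrap the HTTP client so that every request fetches new credentials and sends them as HTTP Basic auth (EMR describes these session credentials as HTTP basic credentials). A minimal sketch: `make_send` and `basic_auth_header` are hypothetical names, and `http` is any object with an httpx-style `request(method, url, json=..., headers=...)` method.

```python
import base64
from typing import Any, Callable, Dict, Optional, Tuple

FetchCredentials = Callable[[], Tuple[str, str]]  # returns (username, password)


def basic_auth_header(username: str, password: str) -> str:
    """Encode an HTTP Basic Authorization header value."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return f"Basic {token}"


def make_send(http: Any, fetch_credentials: FetchCredentials) -> Callable[..., Dict[str, Any]]:
    """Wrap an httpx-style client so each request uses freshly fetched credentials."""

    def send(method: str, path: str, body: Optional[dict] = None) -> Dict[str, Any]:
        username, password = fetch_credentials()  # fetched per request, never cached
        response = http.request(
            method,
            path,
            json=body,
            headers={"Authorization": basic_auth_header(username, password)},
        )
        response.raise_for_status()
        return response.json()

    return send
```

With httpx, `http` could be `httpx.Client(base_url=LIVY_URL, verify=False)` and `fetch_credentials` a zero-argument wrapper around the EMR credentials call.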

---

## Error Handling

On failure, every tool returns an object with an `error` message and a `code`:

```json
{
  "error": "Session is in state dead",
  "code": "SESSION_FAILED"
}
```

| Error Code | Meaning |
|------------|---------|
| `CREDENTIAL_FAILED` | The EMR `get-cluster-session-credentials` call failed; check the IAM role, cluster ID, and region |
| `SESSION_FAILED` | Session entered the `dead` or `error` state |
| `STATEMENT_FAILED` | Code execution failed; check `output` for the traceback |
| `TIMEOUT` | Exceeded `max_wait_seconds` |
| `NOT_FOUND` | Session or statement ID does not exist |

---

## Notes

- Livy runs on port `8998` with a self-signed certificate, so all requests disable certificate verification (`-k` / `verify=False`)
- The `session_id` increments globally on the cluster; do not assume it starts at 0
- Sessions stay alive after a query completes; pass `reuse_session_id` to `run_query` to reuse them
- Call `delete_session` when you are done to free cluster resources
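Releasing sessions reliably is easiest with a context manager that issues `DELETE` even when the work inside fails. A sketch assuming a hypothetical `send(method, path, body)` helper; with httpx, the underlying client would be created with `verify=False` to accept the self-signed certificate. The sketch does not wait for the session to become `idle`.

```python
from contextlib import contextmanager
from typing import Any, Callable, Dict, Iterator, Optional

# Hypothetical: one authenticated Livy request returning parsed JSON.
Send = Callable[[str, str, Optional[dict]], Dict[str, Any]]


@contextmanager
def livy_session(send: Send, kind: str = "pyspark") -> Iterator[int]:
    """Create a session, yield its id, and always delete it on exit."""
    session_id = send("POST", "/sessions", {"kind": kind})["id"]
    try:
        yield session_id
    finally:
        # Runs on normal exit and when the body raises.
        send("DELETE", f"/sessions/{session_id}", None)
```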