# LoadDensity

<p align="center">
  <strong>Multi-protocol load and stress automation: Locust + WebSocket + gRPC + MQTT + raw sockets, plus a JSON-driven action executor with batteries included.</strong>
</p>

<p align="center">
  <a href="https://pypi.org/project/je-load-density/"><img src="https://img.shields.io/pypi/v/je_load_density" alt="PyPI Version"></a>
  <a href="https://pypi.org/project/je-load-density/"><img src="https://img.shields.io/pypi/pyversions/je_load_density" alt="Python Version"></a>
  <a href="https://github.com/Integration-Automation/LoadDensity/blob/main/LICENSE"><img src="https://img.shields.io/github/license/Integration-Automation/LoadDensity" alt="License"></a>
  <a href="https://loaddensity.readthedocs.io/en/latest/"><img src="https://readthedocs.org/projects/loaddensity/badge/?version=latest" alt="Documentation Status"></a>
</p>

<p align="center">
  <a href="README/README_zh-TW.md">繁體中文</a> |
  <a href="README/README_zh-CN.md">简体中文</a>
</p>

---

LoadDensity (`je_load_density`) started as a Locust wrapper and grew into a full multi-protocol load framework: HTTP, FastHttp, WebSocket, gRPC, MQTT, and raw TCP/UDP user templates behind one JSON-driven action executor, plus modules for parameterised data, scenario flow, reports, observability, distributed runners, recording, persistent storage, and an MCP control surface so Claude can drive load tests end-to-end. Every executor command has a deterministic name (`LD_*`) and a single dispatch point, so an action JSON can mix protocols, exporters, and reports in the same script.

> **Optional dependencies, opt-in install** — every protocol driver and exporter ships behind a `pip install je_load_density[<extra>]` extra. The base install footprint is unchanged for users who only need HTTP load testing.

## Table of Contents

- [Highlights](#highlights)
- [Installation](#installation)
- [Architecture](#architecture)
- [Quick Start](#quick-start)
- [Core API](#core-api)
- [Action Executor](#action-executor)
- [User Templates](#user-templates)
  - [HTTP / FastHttp](#http--fasthttp)
  - [WebSocket](#websocket)
  - [gRPC](#grpc)
  - [MQTT](#mqtt)
  - [Raw TCP / UDP](#raw-tcp--udp)
- [Parameter Resolver](#parameter-resolver)
- [Scenario Modes](#scenario-modes)
- [Assertions & Extractors](#assertions--extractors)
- [Reports](#reports)
- [Observability](#observability)
- [Distributed Master / Worker](#distributed-master--worker)
- [HAR Record / Replay](#har-record--replay)
- [Persistent Records (SQLite)](#persistent-records-sqlite)
- [MCP Server (for Claude)](#mcp-server-for-claude)
- [Hardened Control Socket](#hardened-control-socket)
- [GUI](#gui)
- [CLI Usage](#cli-usage)
- [Test Record](#test-record)
- [Exception Handling](#exception-handling)
- [Logging](#logging)
- [Supported Platforms](#supported-platforms)
- [License](#license)

## Highlights

- **One executor, six protocols** — HTTP, FastHttp, WebSocket, gRPC, MQTT, raw TCP/UDP — all dispatched from the same `LD_start_test` command via a `user` key.
- **JSON-driven** — Every test is an action JSON list; the same script can be hand-authored, generated by HAR import, scheduled by an MCP tool, or sent over the control socket.
- **Parameter resolver** — `${var.x}`, `${env.X}`, `${csv.source.col}`, `${faker.method}`, plus built-in `${uuid()}`, `${now()}`, `${randint(min,max)}` helpers; values can also be extracted from responses and reused downstream.
- **Scenario flow** — Declare tasks as `sequence` (default), `weighted`, or `conditional` (`run_if` / `skip_if` predicates) without touching Python.
- **Six report formats** — HTML, JSON, XML, CSV, JUnit XML, and a percentile-summary JSON. The summary covers totals, failure rate, and per-name p50 / p90 / p95 / p99 latencies for trend tracking.
- **Three exporters** — Prometheus HTTP endpoint, InfluxDB line-protocol UDP/HTTP sink, and OpenTelemetry OTLP gRPC exporter.
- **Distributed runners** — `runner_mode="master"` / `"worker"` for cross-machine load with the same start_test API.
- **HAR record / replay** — Convert real browser traffic into a runnable action JSON with regex include/exclude filters.
- **Persistent records** — Optional SQLite sink with run / record / metadata schema for cross-run regression checks.
- **MCP server** — `python -m je_load_density.mcp_server` exposes 11 tools so Claude (Desktop, Code, any MCP client) can drive LoadDensity end-to-end.
- **Hardened control socket** — Length-prefixed framing, optional TLS, shared-secret token (env or arg), with backwards-compatible legacy mode for existing IDE integrations such as PyBreeze.
- **Live GUI** — Optional PySide6 GUI with a live stats panel (RPS / avg / p95 / failures), translated to English, Traditional Chinese, Japanese, and Korean.
- **CLI subcommands** — `run` / `run-dir` / `run-str` / `init` / `serve`. Legacy `-e/-d/-c/--execute_str` flags remain for downstream tools.

## Installation

```bash
pip install je_load_density
```

Pulls in [Locust](https://locust.io/) and `defusedxml` — nothing else.

### Optional extras

| Extra | Adds |
|-------|------|
| `gui` | PySide6 + qt-material (graphical front-end) |
| `websocket` | `websocket-client` (WebSocket user template) |
| `grpc` | `grpcio` + `protobuf` (gRPC user template) |
| `mqtt` | `paho-mqtt` (MQTT user template) |
| `prometheus` | `prometheus-client` (Prometheus exporter) |
| `opentelemetry` | OpenTelemetry SDK + OTLP gRPC exporter |
| `metrics` | `prometheus` + `opentelemetry` bundle |
| `faker` | `Faker` (powers `${faker.method}` placeholders) |
| `mcp` | `mcp` SDK (drives the MCP server) |
| `all` | Everything above |

```bash
pip install "je_load_density[gui]"
pip install "je_load_density[mqtt,grpc,websocket]"
pip install "je_load_density[metrics]"
pip install "je_load_density[mcp]"
pip install "je_load_density[all]"
```

### Development install

```bash
git clone https://github.com/Integration-Automation/LoadDensity.git
cd LoadDensity
pip install -e ".[all]"
pip install -r requirements.txt
```

## Architecture

```mermaid
flowchart TD
    subgraph Entry["Entry Surfaces"]
        CLI[CLI]
        MCP[MCP Server]
        GUI[GUI]
        SOCK[Control Socket]
    end

    Entry -- "action JSON" --> EXEC["Action Executor<br/>(LD_* dispatch + safe builtins)"]
    EXEC -- "start_test" --> WRAPPER["locust_wrapper_proxy<br/>(per-protocol task store)"]

    WRAPPER --> HTTP["HTTP / FastHttp"]
    WRAPPER --> WS["WebSocket"]
    WRAPPER --> GRPC["gRPC"]
    WRAPPER --> MQTT["MQTT"]
    WRAPPER --> RAW["Raw TCP / UDP"]

    HTTP -- "Locust events" --> BUS([Locust Event Bus])
    WS --> BUS
    GRPC --> BUS
    MQTT --> BUS
    RAW --> BUS

    BUS --> REC["test_record_instance"]
    BUS --> METRICS["Prometheus / InfluxDB / OTel"]

    REC --> REPORTS["HTML / JSON / XML / CSV / JUnit / Summary reports"]
    REC --> SQLITE[("SQLite persistence<br/>(cross-run comparison)")]
```

The dependency direction always points from the action layer down to Locust, never the other way around.

## Quick Start

### HTTP load test in Python

```python
from je_load_density import start_test

start_test(
    user_detail_dict={"user": "fast_http_user"},
    user_count=50,
    spawn_rate=10,
    test_time=30,
    variables={"base": "https://httpbin.org"},
    tasks=[
        {"method": "get",  "request_url": "${var.base}/get"},
        {"method": "post", "request_url": "${var.base}/post",
         "json": {"hello": "world"},
         "assertions": [{"type": "status_code", "value": 200}]},
    ],
)
```

### Action JSON

```json
{"load_density": [
  ["LD_register_variables", {"variables": {"base": "https://httpbin.org"}}],
  ["LD_start_test", {
    "user_detail_dict": {"user": "fast_http_user"},
    "user_count": 20, "spawn_rate": 10, "test_time": 30,
    "tasks": [
      {"method": "get",  "request_url": "${var.base}/get"},
      {"method": "post", "request_url": "${var.base}/post",
       "json": {"hello": "world"}}
    ]
  }],
  ["LD_generate_summary_report", {"report_name": "smoke"}]
]}
```

Run via the CLI:

```bash
python -m je_load_density run smoke.json
```

## Core API

```python
from je_load_density import (
    start_test, prepare_env, create_env,
    execute_action, execute_files, executor, add_command_to_executor,
    test_record_instance, locust_wrapper_proxy,
    register_variable, register_variables,
    register_csv_source, register_csv_sources,
    parameter_resolver, resolve,
    har_to_action_json, har_to_tasks, load_har,
    persist_records, list_runs, fetch_run_records,
    start_prometheus_exporter, stop_prometheus_exporter,
    start_influxdb_sink, stop_influxdb_sink,
    start_opentelemetry_exporter, stop_opentelemetry_exporter,
    start_load_density_socket_server,
    generate_html_report, generate_json_report, generate_xml_report,
    generate_csv_report, generate_junit_report, generate_summary_report,
    build_summary,
    create_project_dir, callback_executor, read_action_json,
)
```

`__all__` documents the full public surface in `je_load_density/__init__.py`.

## Action Executor

The action executor maps command strings to callable functions. Every action is a list:

```python
["command_name"]                        # No parameters
["command_name", {"key": "value"}]      # Keyword arguments
["command_name", [arg1, arg2]]          # Positional arguments
```

The top-level document is either a bare list or `{"load_density": [...]}`.

### Built-in `LD_*` commands

| Group | Commands |
|-------|----------|
| Core | `LD_start_test`, `LD_execute_action`, `LD_execute_files`, `LD_add_package_to_executor`, `LD_start_socket_server` |
| Reports | `LD_generate_html(_report)`, `LD_generate_json(_report)`, `LD_generate_xml(_report)`, `LD_generate_csv_report`, `LD_generate_junit_report`, `LD_generate_summary_report`, `LD_summary` |
| Persistence | `LD_persist_records`, `LD_list_runs`, `LD_fetch_run_records`, `LD_clear_records` |
| Parameters | `LD_register_variable(s)`, `LD_register_csv_source(s)`, `LD_clear_resolver` |
| Recording | `LD_load_har`, `LD_har_to_tasks`, `LD_har_to_action_json` |
| Metrics | `LD_start/stop_prometheus_exporter`, `LD_start/stop_influxdb_sink`, `LD_start/stop_opentelemetry_exporter` |

Safe Python built-ins (`print`, `len`, `range`, …) are also accepted; `eval`, `exec`, `compile`, `__import__`, `breakpoint`, `open`, and `input` are explicitly blocked.

### Custom commands

```python
from je_load_density import add_command_to_executor

def slack_notify(message: str) -> None:
    ...

add_command_to_executor({"LD_slack_notify": slack_notify})
```

## User Templates

Every template registers under `start_test` via `user_detail_dict={"user": "<key>"}`. Tasks share the same shape across HTTP, WebSocket, gRPC, MQTT, and raw socket users; only the protocol-specific fields differ.

### HTTP / FastHttp

```python
start_test(
    user_detail_dict={"user": "fast_http_user"},
    user_count=50, spawn_rate=10, test_time=60,
    variables={"base": "https://api.example.com"},
    tasks=[
        {"method": "post", "request_url": "${var.base}/login",
         "json": {"email": "u@example.com", "password": "secret"},
         "extract": [{"var": "auth", "from": "json_path", "path": "data.token"}]},
        {"method": "get", "request_url": "${var.base}/profile",
         "headers": {"Authorization": "Bearer ${var.auth}"},
         "assertions": [{"type": "status_code", "value": 200}]},
    ],
)
```

### WebSocket

`pip install je_load_density[websocket]`

```python
start_test(
    user_detail_dict={"user": "websocket_user"},
    user_count=10, spawn_rate=5, test_time=60,
    tasks=[
        {"method": "connect", "request_url": "wss://echo.example.com/socket"},
        {"method": "sendrecv", "payload": '{"ping": 1}', "expect": "pong"},
        {"method": "close"},
    ],
)
```

### gRPC

`pip install je_load_density[grpc]`

```python
start_test(
    user_detail_dict={"user": "grpc_user"},
    user_count=20, spawn_rate=5, test_time=60,
    tasks=[{
        "name": "say_hello",
        "target": "localhost:50051",
        "stub_path": "pkg.greeter_pb2_grpc.GreeterStub",
        "request_path": "pkg.greeter_pb2.HelloRequest",
        "method": "SayHello",
        "payload": {"name": "world"},
        "metadata": [["x-token", "abc"]],
        "timeout": 5,
    }],
)
```

`stub_path` and `request_path` are validated against a strict identifier regex before `importlib.import_module`, so traversal-style attacks are rejected.

### MQTT

`pip install je_load_density[mqtt]`

```python
start_test(
    user_detail_dict={"user": "mqtt_user"},
    user_count=10, spawn_rate=5, test_time=60,
    tasks=[
        {"method": "connect",   "broker": "127.0.0.1:1883"},
        {"method": "subscribe", "topic":  "telemetry/in", "qos": 1},
        {"method": "publish",   "topic":  "telemetry/out", "payload": "ping", "qos": 1},
        {"method": "disconnect"},
    ],
)
```

### Raw TCP / UDP

Stdlib only; nothing to install.

```python
start_test(
    user_detail_dict={"user": "socket_user"},
    user_count=20, spawn_rate=5, test_time=60,
    tasks=[
        {"protocol": "tcp", "target": "127.0.0.1:9000",
         "payload": "PING\n", "expect_bytes": 64,
         "expect_substring": "PONG"},
        {"protocol": "udp", "target": "127.0.0.1:9000",
         "payload": "hex:DEADBEEF", "expect_bytes": 4},
    ],
)
```

## Parameter Resolver

Placeholders are expanded automatically on every task:

| Placeholder | Resolves to |
|-------------|-------------|
| `${var.NAME}` | Value passed to `register_variable(s)` |
| `${env.NAME}` | Environment variable `NAME` |
| `${csv.SOURCE.COL}` | Next row from CSV source `SOURCE` (cycles by default) |
| `${faker.METHOD}` | `Faker().METHOD()` (lazy import) |
| `${uuid()}` | New UUID 4 string |
| `${now()}` | Local ISO-8601 timestamp (seconds) |
| `${randint(min, max)}` | Cryptographically-strong random int |

```python
from je_load_density import register_variable, register_csv_source

register_variable("base", "https://api.example.com")
register_csv_source("users", "users.csv")
```

Or from action JSON:

```json
["LD_register_variables", {"variables": {"base": "https://api.example.com"}}]
["LD_register_csv_sources", {"sources": [{"name": "users", "file_path": "users.csv"}]}]
```

Unknown placeholders are left in place so missing data is visible during a dry run.

## Scenario Modes

```json
{
  "mode": "weighted",
  "tasks": [
    {"method": "get", "request_url": "/products", "weight": 3},
    {"method": "get", "request_url": "/expensive", "weight": 1}
  ]
}
```

| Mode | Behaviour |
|------|-----------|
| `sequence` | Run every task in order each tick (default) |
| `weighted` | Pick one task per tick by `weight` |
| `conditional` | Use `run_if` / `skip_if` predicates evaluated against the parameter resolver |

Predicates: `bool`, `"${var.x}"`, `{"equals": [a,b]}`, `{"not_equals": [a,b]}`, `{"in": [needle, haystack]}`, `{"truthy": value}`.

## Assertions & Extractors

Both run under Locust's `catch_response`; failed assertions surface in every report.

```json
{
  "method": "post",
  "request_url": "${var.base}/login",
  "json": {"email": "u@example.com", "password": "secret"},
  "assertions": [
    {"type": "status_code", "value": 200},
    {"type": "json_path", "path": "data.role", "value": "admin"}
  ],
  "extract": [
    {"var": "auth_token", "from": "json_path", "path": "data.token"},
    {"var": "request_id", "from": "header",    "name": "X-Request-Id"}
  ]
}
```

Assertion types: `status_code`, `contains`, `not_contains`, `json_path`, `header`. Extractor sources: `json_path`, `header`, `status_code`.

## Reports

Six formats consumed from `test_record_instance`:

```python
from je_load_density import (
    generate_html_report, generate_json_report, generate_xml_report,
    generate_csv_report, generate_junit_report, generate_summary_report,
)

generate_html_report("report")           # report.html
generate_json_report("report")           # report_success.json + report_failure.json
generate_xml_report("report")            # report_success.xml  + report_failure.xml
generate_csv_report("report")            # report.csv
generate_junit_report("report-junit")    # report-junit.xml (CI)
generate_summary_report("report-sum")    # totals + per-name p50/p90/p95/p99
```

## Observability

```python
from je_load_density import (
    start_prometheus_exporter, start_influxdb_sink, start_opentelemetry_exporter,
)

start_prometheus_exporter(port=9646, addr="127.0.0.1")
start_influxdb_sink(transport="udp", host="influxdb", port=8089)
start_opentelemetry_exporter(endpoint="http://otel-collector:4317",
                             service_name="loaddensity")
```

| Sink | Metrics |
|------|---------|
| Prometheus | `loaddensity_requests_total`, `loaddensity_request_latency_ms`, `loaddensity_response_bytes` |
| InfluxDB | `loaddensity_request` line-protocol points (UDP or HTTP) |
| OTel | `loaddensity.requests`, `loaddensity.request.latency`, `loaddensity.response.size` |

All three are loaded lazily and gated by the matching install extra.

## Distributed Master / Worker

```python
# master
start_test(
    user_detail_dict={"user": "fast_http_user"},
    runner_mode="master",
    master_bind_host="0.0.0.0", master_bind_port=5557,
    expected_workers=4,
    web_ui_dict={"host": "0.0.0.0", "port": 8089},
    user_count=400, spawn_rate=40, test_time=600,
    tasks=[...],
)

# worker
start_test(
    user_detail_dict={"user": "fast_http_user"},
    runner_mode="worker",
    master_host="10.0.0.10", master_port=5557,
    tasks=[...],
)
```

The master waits up to 60 s for `expected_workers` workers to register before starting the load ramp.

## HAR Record / Replay

```python
from je_load_density import load_har, har_to_action_json

har = load_har("recording.har")
action_json = har_to_action_json(
    har,
    user="fast_http_user",
    user_count=20, spawn_rate=10, test_time=120,
    include=[r"api\.example\.com"],
    exclude=[r"\.svg$"],
)
```

Captures from Chrome / Firefox DevTools, mitmproxy, Charles, etc. all work. Status codes flow through as `status_code` assertions on every generated task.

## Persistent Records (SQLite)

```python
from je_load_density import persist_records, list_runs, fetch_run_records

run_id = persist_records(
    "loadtests.db",
    label="checkout-2026-04-28",
    metadata={"branch": "dev", "commit": "abc1234"},
)
for row in list_runs("loadtests.db", limit=10):
    print(row)
```

Schema is created lazily; an empty file is fine. Indexes on `run_id` and `name` keep cross-run queries fast.

## MCP Server (for Claude)

```bash
pip install "je_load_density[mcp]"
python -m je_load_density.mcp_server
```

Wire it into Claude Desktop / Code:

```json
{
  "mcpServers": {
    "loaddensity": {
      "command": "python",
      "args": ["-m", "je_load_density.mcp_server"]
    }
  }
}
```

Eleven tools are exposed: `run_test`, `run_action_json`, `create_project`, `list_executor_commands`, `import_har`, `generate_reports`, `summary`, `persist_records`, `list_runs`, `fetch_run`, `clear_records`.

## Hardened Control Socket

```bash
python -m je_load_density serve \
    --host 0.0.0.0 --port 9940 --framed \
    --token "$LOAD_DENSITY_SOCKET_TOKEN" \
    --tls-cert /etc/loaddensity/server.crt \
    --tls-key /etc/loaddensity/server.key
```

- 4-byte big-endian length-prefixed frames (1 MiB cap)
- Optional TLS (cert/key on disk; `ssl.create_default_context`, TLS 1.2+ minimum)
- Shared-secret token compared with `hmac.compare_digest`; once configured, all payloads must use `{"token": "...", "command": [...]}` and may set `"op": "quit"` to stop the server
- Token also reads from `LOAD_DENSITY_SOCKET_TOKEN` env var
- Legacy unauthenticated mode preserved for backwards compatibility

## GUI

```bash
pip install "je_load_density[gui]"
```

```python
import sys
from PySide6.QtWidgets import QApplication
from je_load_density.gui.main_window import LoadDensityUI

app = QApplication(sys.argv)
window = LoadDensityUI()
window.show()
sys.exit(app.exec())
```

The GUI ships English, Traditional Chinese, Japanese, and Korean translations and a live stats panel that polls `test_record_instance` once a second (RPS, average / p95 latency, failure count).

## CLI Usage

```
python -m je_load_density run FILE              # execute one action JSON file
python -m je_load_density run-dir DIR           # execute every .json in DIR
python -m je_load_density run-str JSON          # execute an inline JSON string
python -m je_load_density init PATH             # scaffold a project skeleton
python -m je_load_density serve [--host ...]    # start the control socket
```

Legacy single-flag form (`-e/-d/-c/--execute_str`) is still accepted for backwards compatibility with downstream tools.

## Test Record

`test_record_instance.test_record_list` and `error_record_list` collect every request with `Method`, `test_url`, `name`, `status_code`, `response_time_ms`, `response_length`, and (for failures) `error`. Reports and the SQLite sink read directly from these lists.

## Exception Handling

```
LoadDensityTestException
├── LoadDensityTestJsonException
├── LoadDensityGenerateJsonReportException
├── LoadDensityTestExecuteException
├── LoadDensityAssertException
├── LoadDensityHTMLException
├── LoadDensityAddCommandException
├── XMLException → XMLTypeException
└── CallbackExecutorException
```

All custom exceptions inherit from `LoadDensityTestException`; catching that one class covers the public surface.

## Logging

LoadDensity exposes a single configured logger (`load_density_logger`) under `je_load_density.utils.logging.loggin_instance`. Hook it into your existing log infrastructure with the standard `logging` module APIs.

## Supported Platforms

| Platform | Status |
|----------|--------|
| Windows 10 / 11 | Fully supported |
| macOS | Fully supported |
| Ubuntu / Linux | Fully supported |
| Raspberry Pi | Tested on 3B+ and later |

Python 3.10+ required.

## License

MIT — see [LICENSE](LICENSE).
