Private
Public Access
1
0

new: dev: initialize the GTK client #11

This commit is contained in:
2026-04-28 08:28:51 +02:00
parent 199bb7e525
commit 6484133199
29 changed files with 2788 additions and 0 deletions

View File

View File

@@ -0,0 +1,93 @@
"""
This file is part of the SplendidBear Websites' projects.
Copyright (c) 2026 @ www.splendidbear.org
For the full copyright and license information, please view the LICENSE
file that was distributed with this source code.
"""
from __future__ import annotations
from mineseeker.api import client
class AuthError(Exception):
"""Raised on authentication failure."""
class TotpRequired(Exception):
"""Raised when the server requires a TOTP code after password login."""
def login(username: str, password: str) -> None:
"""
Authenticate with username + password via the dedicated JSON endpoint
POST /api/auth/login, which bypasses the reCAPTCHA gate.
Raises:
TotpRequired server confirmed credentials but TOTP is required next.
AuthError credentials wrong, account inactive, or server error.
"""
session = client.get_session()
resp = session.post(
client.url("/api/auth/login"),
json={"username": username, "password": password},
# The endpoint sets a session cookie; follow any redirects
allow_redirects=True,
)
# Non-2xx means a hard server error (500 etc.) — let it propagate
if resp.status_code >= 500:
resp.raise_for_status()
data = resp.json()
if not data.get("success"):
raise AuthError(data.get("error", "Login failed."))
if data.get("requiresTwoFactor"):
raise TotpRequired()
def submit_totp(code: str) -> None:
"""
Submit the 6-digit TOTP code after login() raises TotpRequired.
The scheb/2fa bundle processes POST /2fa_check directly as a firewall
listener — no CSRF token required, no JSON body. The code goes as a
form-encoded field named _auth_code, same as the browser form.
The LoginCaptchaListener already skips /2fa_check paths.
Raises:
AuthError if the code is wrong or the session is no longer in
IS_AUTHENTICATED_2FA_IN_PROGRESS state.
"""
session = client.get_session()
resp = session.post(
client.url("/2fa_check"),
data={"_auth_code": code},
headers={"Content-Type": "application/x-www-form-urlencoded"},
allow_redirects=True,
)
resp.raise_for_status()
# If we land back on /2fa the code was wrong
if "/2fa" in resp.url:
raise AuthError("Invalid authentication code.")
def login_as_guest() -> None:
"""
Start an anonymous session.
A GET to the homepage is enough for Symfony to create a session and
assign the anon_<session_id> identity used by ResolveUserNamesService.
"""
session = client.get_session()
resp = session.get(client.url("/"), headers={"Accept": "text/html"})
resp.raise_for_status()
def logout() -> None:
"""Discard the local session (client-side only)."""
client.reset_session()

View File

@@ -0,0 +1,52 @@
"""
This file is part of the SplendidBear Websites' projects.
Copyright (c) 2026 @ www.splendidbear.org
For the full copyright and license information, please view the LICENSE
file that was distributed with this source code.
"""
from __future__ import annotations
import requests
from mineseeker import config
# Module-level singleton session shared by all API modules.
# Holds cookies (Symfony session cookie after login) across all requests.
_session: requests.Session | None = None
def get_session() -> requests.Session:
global _session
if _session is None:
_session = requests.Session()
_session.headers.update({
"Accept": "application/json",
"Content-Type": "application/json",
})
return _session
def reset_session() -> None:
"""Discard the current session (logout / new guest session)."""
global _session
_session = None
def url(path: str) -> str:
"""Build an absolute URL from a server-relative path."""
return f"{config.BASE_URL}/{path.lstrip('/')}"
def get(path: str, **kwargs) -> requests.Response:
resp = get_session().get(url(path), **kwargs)
resp.raise_for_status()
return resp
def post(path: str, json: dict | None = None, **kwargs) -> requests.Response:
resp = get_session().post(url(path), json=json, **kwargs)
resp.raise_for_status()
return resp

View File

@@ -0,0 +1,117 @@
"""
This file is part of the SplendidBear Websites' projects.
Copyright (c) 2026 @ www.splendidbear.org
For the full copyright and license information, please view the LICENSE
file that was distributed with this source code.
"""
from __future__ import annotations
import base64
import json
from mineseeker.api import client
def fetch_token() -> dict:
"""
GET /api/game/token
Returns { "mercureJwt": str, "gameAssoc": str }
"""
resp = client.get("/api/game/token")
return resp.json()
def connect(game_assoc: str) -> dict:
"""
GET /api/game/connect/{gameAssoc}
Returns the decoded connect-information dict.
"""
resp = client.get(f"/api/game/connect/{game_assoc}")
raw = resp.text.strip()
decoded = base64.b64decode(raw).decode("utf-8")
return json.loads(decoded)
def start(game_assoc: str) -> bool:
"""POST /api/game/start — initialise the grid for a new game."""
resp = client.post("/api/game/start", json={"gameAssoc": game_assoc})
return bool(resp.json().get("success", False))
def join(game_assoc: str) -> bool:
"""POST /api/game/join/{gameAssoc} — announce this player's presence."""
resp = client.post(f"/api/game/join/{game_assoc}", json={})
return bool(resp.json().get("success", False))
def step(
game_assoc: str,
coords: list[int],
player: str,
bomb: bool,
resign: str | None,
step_elapsed: float,
) -> dict:
"""
POST /api/game/step/{gameAssoc}
Returns the full step result dict published by TopicManager::publish().
"""
resp = client.post(
f"/api/game/step/{game_assoc}",
json={
"coords": coords,
"player": player,
"bomb": bomb,
"resign": resign,
"stepElapsed": step_elapsed,
},
)
return resp.json()
def leave(game_assoc: str) -> None:
"""POST /api/game/leave/{gameAssoc} — fire-and-forget on window close."""
try:
client.post(f"/api/game/leave/{game_assoc}", json={})
except Exception:
pass # best-effort
def heartbeat(game_assoc: str, color: str) -> None:
"""POST /api/game/heartbeat/{gameAssoc} — keep-alive ping."""
try:
client.post(f"/api/game/heartbeat/{game_assoc}", json={"color": color})
except Exception:
pass # best-effort
def waiting() -> list[dict]:
"""GET /api/game/waiting — list of waiting players in the lobby."""
resp = client.get("/api/game/waiting")
return resp.json()
def challenge(target_game_assoc: str, challenger_game_assoc: str) -> None:
"""POST /api/game/challenge/{targetGameAssoc}"""
client.post(
f"/api/game/challenge/{target_game_assoc}",
json={"challengerGameAssoc": challenger_game_assoc},
)
def challenge_respond(
challenger_game_assoc: str,
accepted: bool,
target_game_assoc: str,
) -> None:
"""POST /api/game/challenge/respond/{challengerGameAssoc}"""
client.post(
f"/api/game/challenge/respond/{challenger_game_assoc}",
json={
"accepted": accepted,
"targetGameAssoc": target_game_assoc,
},
)

View File

@@ -0,0 +1,163 @@
"""
This file is part of the SplendidBear Websites' projects.
Copyright (c) 2026 @ www.splendidbear.org
For the full copyright and license information, please view the LICENSE
file that was distributed with this source code.
"""
from __future__ import annotations
import json
import logging
import threading
import time
from collections.abc import Callable
from typing import Any
import requests
from gi.repository import GLib
from mineseeker import config
from mineseeker.api import client
from mineseeker.constants import SSE_RECONNECT_INITIAL, SSE_RECONNECT_MAX
log = logging.getLogger(__name__)
class SseListener:
"""
Opens a Mercure SSE connection in a daemon thread and dispatches
parsed JSON messages back to the GTK main thread via GLib.idle_add().
Message routing mirrors useServerCommunication.jsx handleMercureMessage():
payload.type == "challenge" → on_challenge(payload)
payload.type == "challenge-response" → on_challenge_response(payload)
payload.type == "heartbeat" → on_heartbeat(payload)
"data" key present → on_topic(payload)
"msg" key present → on_unsubscribe(payload)
(none of the above) → on_subscribe(payload)
"""
def __init__(
self,
game_assoc: str,
mercure_jwt: str,
*,
on_subscribe: Callable[[dict], Any] | None = None,
on_unsubscribe: Callable[[dict], Any] | None = None,
on_topic: Callable[[dict], Any] | None = None,
on_challenge: Callable[[dict], Any] | None = None,
on_challenge_response: Callable[[dict], Any] | None = None,
on_heartbeat: Callable[[dict], Any] | None = None,
) -> None:
self._game_assoc = game_assoc
self._mercure_jwt = mercure_jwt
self._handlers = {
"subscribe": on_subscribe,
"unsubscribe": on_unsubscribe,
"topic": on_topic,
"challenge": on_challenge,
"challenge-response": on_challenge_response,
"heartbeat": on_heartbeat,
}
self._stop_event = threading.Event()
self._thread: threading.Thread | None = None
# ------------------------------------------------------------------
# Public control
# ------------------------------------------------------------------
def start(self) -> None:
"""Start the background SSE listener thread."""
self._stop_event.clear()
self._thread = threading.Thread(
target=self._run, daemon=True, name="sse-listener"
)
self._thread.start()
def stop(self) -> None:
"""Signal the background thread to stop."""
self._stop_event.set()
# ------------------------------------------------------------------
# Background thread
# ------------------------------------------------------------------
def _build_url(self) -> str:
topic = f"mineseeker/channel/{self._game_assoc}"
return f"{config.MERCURE_URL}?topic={topic}"
def _run(self) -> None:
backoff = SSE_RECONNECT_INITIAL
while not self._stop_event.is_set():
try:
self._stream()
backoff = SSE_RECONNECT_INITIAL # reset on clean disconnect
except Exception as exc:
if self._stop_event.is_set():
break
log.warning("SSE connection lost (%s), reconnecting in %.1fs", exc, backoff)
time.sleep(backoff)
backoff = min(backoff * 2, SSE_RECONNECT_MAX)
def _stream(self) -> None:
"""Open the SSE stream and process events until stopped or error."""
url = self._build_url()
headers = {
"Authorization": f"Bearer {self._mercure_jwt}",
"Accept": "text/event-stream",
"Cache-Control": "no-cache",
}
# Use the requests session from client.py so cookies are included
resp = client.get_session().get(
url, headers=headers, stream=True, timeout=(10, None)
)
resp.raise_for_status()
# Parse the raw SSE stream manually (sseclient-py would also work
# but avoids an extra dependency on GLib-aware loops)
data_lines: list[str] = []
for raw_line in resp.iter_lines(decode_unicode=True):
if self._stop_event.is_set():
break
if raw_line.startswith("data:"):
data_lines.append(raw_line[5:].lstrip(" "))
elif raw_line == "" and data_lines:
# Empty line signals end of event — dispatch it
payload_str = "\n".join(data_lines)
data_lines = []
try:
payload = json.loads(payload_str)
GLib.idle_add(self._dispatch, payload)
except json.JSONDecodeError:
log.debug("Non-JSON SSE data ignored: %s", payload_str)
def _dispatch(self, payload: dict) -> bool:
"""Called on the GTK main thread via GLib.idle_add."""
msg_type = payload.get("type")
if msg_type == "challenge":
self._call("challenge", payload)
elif msg_type == "challenge-response":
self._call("challenge-response", payload)
elif msg_type == "heartbeat":
self._call("heartbeat", payload)
elif "data" in payload:
self._call("topic", payload)
elif "msg" in payload:
self._call("unsubscribe", payload)
else:
self._call("subscribe", payload)
return GLib.SOURCE_REMOVE # run once only
def _call(self, key: str, payload: dict) -> None:
handler = self._handlers.get(key)
if handler is not None:
try:
handler(payload)
except Exception:
log.exception("Error in SSE handler '%s'", key)