Skip to content

Add graphql introspection module #2515

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 142 additions & 0 deletions bbot/modules/graphql_introspection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import json
from pathlib import Path
from bbot.modules.base import BaseModule


class graphql_introspection(BaseModule):
watched_events = ["URL"]
produced_events = ["FINDING"]
flags = ["safe", "active", "web-basic"]
meta = {
"description": "Perform GraphQL introspection on a target",
"created_date": "2025-07-01",
"author": "@mukesh-dream11",
}
options = {
"graphql_endpoint_urls": ["", "graphql", "v1/graphql"],
"output_folder": "",
}
options_desc = {
"graphql_endpoint_urls": "List of GraphQL endpoint to suffix to the target URL",
"output_folder": "Folder to save the GraphQL schemas to",
}

async def setup(self):
output_folder = self.config.get("output_folder", "")
if output_folder:
self.output_dir = Path(output_folder) / "graphql-schemas"
else:
self.output_dir = self.scan.home / "graphql-schemas"
self.helpers.mkdir(self.output_dir)
return True

async def handle_event(self, event):
if self.helpers.url_depth(event.data) > 1:
return

for endpoint_url in self.config.get("graphql_endpoint_urls", []):
url = f"{event.data}{endpoint_url}"
request_args = {
"url": url,
"method": "POST",
"json": {
"query": """\
query IntrospectionQuery {
__schema {
queryType {
name
}
mutationType {
name
}
types {
name
kind
description
fields(includeDeprecated: true) {
name
description
type {
... TypeRef
}
isDeprecated
deprecationReason
}
interfaces {
... TypeRef
}
possibleTypes {
... TypeRef
}
enumValues(includeDeprecated: true) {
name
description
isDeprecated
deprecationReason
}
ofType {
... TypeRef
}
}
}
}

fragment TypeRef on __Type {
kind
name
ofType {
kind
name
ofType {
kind
name
ofType {
kind
name
ofType {
kind
name
ofType {
kind
name
ofType {
kind
name
ofType {
kind
name
}
}
}
}
}
}
}
}"""
},
}
response = await self.helpers.request(**request_args)
if not response or response.status_code != 200:
self.debug(f"Failed to get GraphQL schema for {url} (status code {response.status_code})")
continue
try:
response_json = response.json()
except json.JSONDecodeError:
self.debug(f"Failed to parse JSON for {url}")
continue
if response_json.get("data", {}).get("__schema", {}).get("types", []):
schema_output_dir = url.rstrip("/").replace(":", "-").replace("/", "-")
schema_output_dir = self.output_dir / schema_output_dir
self.helpers.mkdir(schema_output_dir)

filename = "schema.json"
with open(schema_output_dir / filename, "w") as f:
json.dump(response_json, f)
await self.emit_event(
{"url": url, "description": "GraphQL schema"},
"FINDING",
event,
context=f"{{module}} found GraphQL schema at {url}",
)
# return, because we only want to find one schema per target
return
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from .base import ModuleTestBase


class TestGraphQLIntrospectionNon200(ModuleTestBase):
targets = ["http://127.0.0.1:8888"]
modules_overrides = ["graphql_introspection"]

async def setup_after_prep(self, module_test):
module_test.set_expect_requests(
expect_args={"method": "POST", "uri": "/"},
respond_args={"response_data": "ok"},
)

def check(self, module_test, events):
assert all(e.type != "FINDING" for e in events), "should have raised 0 events"


class TestGraphQLIntrospection(ModuleTestBase):
targets = ["http://127.0.0.1:8888"]
modules_overrides = ["graphql_introspection"]

async def setup_after_prep(self, module_test):
module_test.set_expect_requests(
expect_args={"method": "POST", "uri": "/"},
respond_args={
"response_data": """{"data": {"__schema": {"types": ["dummy"]}}}""",
},
)

def check(self, module_test, events):
finding = [e for e in events if e.type == "FINDING"]
assert finding, "should have raised 1 FINDING event"
assert finding[0].data["url"] == "http://127.0.0.1:8888/"
assert finding[0].data["description"] == "GraphQL schema"