Skip to content

CRT.sh - Use direct Postgres db connection #2310

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 46 additions & 20 deletions bbot/modules/crt.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import time
import asyncpg

from bbot.modules.templates.subdomain_enum import subdomain_enum


Expand All @@ -11,30 +14,53 @@ class crt(subdomain_enum):
"author": "@TheTechromancer",
}

base_url = "https://crt.sh"
deps_pip = ["asyncpg"]

db_host = "crt.sh"
db_port = 5432
db_user = "guest"
db_name = "certwatch"
reject_wildcards = False

async def setup(self):
self.cert_ids = set()
self.db_conn = None
return await super().setup()

async def request_url(self, query):
params = {"q": f"%.{query}", "output": "json"}
url = self.helpers.add_get_params(self.base_url, params).geturl()
return await self.api_request(url, timeout=self.http_timeout + 30)

async def parse_results(self, r, query):
results = set()
j = r.json()
for cert_info in j:
if not type(cert_info) == dict:
continue
cert_id = cert_info.get("id")
if cert_id:
if hash(cert_id) not in self.cert_ids:
self.cert_ids.add(hash(cert_id))
domain = cert_info.get("name_value")
if domain:
for d in domain.splitlines():
results.add(d.lower())
if not self.db_conn:
self.db_conn = await asyncpg.connect(
host=self.db_host, port=self.db_port, user=self.db_user, database=self.db_name
)

sql = """
WITH ci AS (
SELECT array_agg(DISTINCT sub.NAME_VALUE) NAME_VALUES
FROM (
SELECT DISTINCT cai.CERTIFICATE, cai.NAME_VALUE
FROM certificate_and_identities cai
WHERE plainto_tsquery('certwatch', $1) @@ identities(cai.CERTIFICATE)
AND cai.NAME_VALUE ILIKE ('%.' || $1)
LIMIT 50000
) sub
GROUP BY sub.CERTIFICATE
)
SELECT DISTINCT unnest(NAME_VALUES) as name_value FROM ci;
"""
start = time.time()
results = await self.db_conn.fetch(sql, query)
end = time.time()
self.verbose(f"SQL query executed in: {end - start} seconds with {len(results):,} results")
return results

async def parse_results(self, results, query):
domains = set()
for row in results:
domain = row["name_value"]
if domain:
for d in domain.splitlines():
domains.add(d.lower())
return domains

async def cleanup(self):
if self.db_conn:
await self.db_conn.close()
22 changes: 16 additions & 6 deletions bbot/test/test_step_2/module_tests/test_module_crt.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,22 @@

class TestCRT(ModuleTestBase):
async def setup_after_prep(self, module_test):
module_test.module.abort_if = lambda e: False
for t in self.targets:
module_test.httpx_mock.add_response(
url="https://crt.sh?q=%25.blacklanternsecurity.com&output=json",
json=[{"id": 1, "name_value": "asdf.blacklanternsecurity.com\nzzzz.blacklanternsecurity.com"}],
)
class AsyncMock:
async def fetch(self, *args, **kwargs):
print("mock_fetch", args, kwargs)
return [
{"name_value": "asdf.blacklanternsecurity.com"},
{"name_value": "zzzz.blacklanternsecurity.com"},
]

async def close(self):
pass

async def mock_connect(*args, **kwargs):
print("mock_connect", args, kwargs)
return AsyncMock()

module_test.monkeypatch.setattr("asyncpg.connect", mock_connect)

def check(self, module_test, events):
assert any(e.data == "asdf.blacklanternsecurity.com" for e in events), "Failed to detect subdomain"
Expand Down