

# 🛡️ Detecting Compromised Credentials & FortiBleed IOCs
A practical guide for developers building a credential‑watchdog that checks for the 110 M+ credentials leaked in the Russian Initial‑Access‑Broker “FortiBleed” campaign.
Why this matters – The FortiBleed campaign harvested >110 M username‑password pairs from vulnerable FortiGate appliances. By continuously checking your user‑store against known breach data and IOC (Indicators of Compromise) feeds you can force password resets, block malicious IPs, and alert SOC teams before attackers leverage the stolen credentials.
<a name="step-1-prerequisites"></a>
## Step 1: Prerequisites
| Item | Minimum version | Why you need it |
|---|---|---|
| Python | 3.9+ | Core language for the reference implementation |
| Node.js | 14.x LTS (or newer) | For the JS/TS version |
| Git | any | Clone the example repo (optional) |
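| HaveIBeenPwned (HIBP) API key | optional (https://haveibeenpwned.com/API/v3) | The k‑anonymity lookup of SHA‑1 hash prefixes runs against the Pwned Passwords range endpoint, which needs no key; a key is only required for HIBP's breached‑account endpoints. No plaintext passwords leave your environment |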
| AlienVault OTX API key (optional but recommended) | free tier (https://otx.alienvault.com/api) | Provides a curated IOC feed tagged with FortiBleed (IPs, domains, file hashes) |
| dotenv library (both langs) | – | Loads secrets from a .env file without hard‑coding them |
| IDE / terminal | – | To run the scripts |
Security note – Never commit your `.env` file. Add it to `.gitignore`. Use a secret manager (AWS Secrets Manager, HashiCorp Vault, etc.) in production.
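The k-anonymity lookup mentioned in the prerequisites table sends only the first five hex characters of a password's SHA-1 digest and compares the remainder locally. A minimal sketch of that split is shown here; the function name `split_sha1` is an illustrative choice, not something the guide defines elsewhere.

```python
import hashlib
from typing import Tuple


def split_sha1(password: str) -> Tuple[str, str]:
    """Return (prefix, suffix) of the uppercase SHA-1 hex digest.

    The 5-character prefix is the only part sent to the range endpoint;
    the 35-character suffix stays local and is compared with the response.
    """
    digest = hashlib.sha1(password.encode("utf-8")).hexdigest().upper()
    return digest[:5], digest[5:]


prefix, suffix = split_sha1("password")
print(prefix)       # the part that would appear in the request URL
print(len(suffix))  # 35 hex characters that are never transmitted
```

Because many unrelated passwords share any given prefix, the service cannot tell which one was being checked.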
<a name="step-2-installation--setup"></a>
## Step 2: Installation & Setup
Clone the example repo (optional):

```bash
git clone https://github.com/icarax/fortibled-credential-watchdog.git
cd fortibled-credential-watchdog
```

Python:

```bash
# Create a virtual environment (recommended)
python -m venv .venv
source .venv/bin/activate   # on Windows: .venv\Scripts\activate
# Install dependencies
pip install --upgrade pip
pip install requests python-dotenv tqdm
```

Node.js / TypeScript:

```bash
# Initialise a new npm project (if you don't have one)
npm init -y
# Install runtime deps
npm install axios dotenv
# Install TypeScript & typings (if you want TS)
npm install --save-dev typescript @types/node ts-node
npx tsc --init   # creates a basic tsconfig.json
```

`.env` file:

```bash
# .env (do NOT commit this file!)
HIBP_API_KEY=your_haveibeenpwned_api_key_here
OTX_API_KEY=your_alienvault_otx_api_key_here   # optional
```
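Once the `.env` values are in the process environment, it helps to normalise them in one place before use. The sketch below is one possible approach using only the standard library; the `Settings` dataclass and `load_settings` helper are hypothetical names, and the inline-comment stripping is a defensive assumption for loaders that pass `# optional` through as part of the value.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Settings:
    hibp_api_key: Optional[str]
    otx_api_key: Optional[str]


def _clean(value: Optional[str]) -> Optional[str]:
    """Strip whitespace and any trailing ' # comment'; empty values become None."""
    if value is None:
        return None
    value = value.split(" #", 1)[0].strip()
    return value or None


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Read both keys from a mapping (defaults to the real environment)."""
    return Settings(
        hibp_api_key=_clean(env.get("HIBP_API_KEY")),
        otx_api_key=_clean(env.get("OTX_API_KEY")),
    )


settings = load_settings({"HIBP_API_KEY": "  abc123  ", "OTX_API_KEY": ""})
print(settings.hibp_api_key, settings.otx_api_key)
```

Passing the mapping in as a parameter keeps the function easy to test without touching real secrets.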
<a name="step-3-basic-implementation"></a>
## Step 3: Basic Implementation
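Both implementations share the same logic:

1. Load API keys from `.env`.
2. Read credentials from a file (one `username:password` pair per line).
3. Check each password against HIBP using the k‑anonymity range lookup.
4. Pull FortiBleed‑tagged IPv4 IOCs from AlienVault OTX.
5. Report compromised accounts and any IOC IPs that appear in a supplied log file.

You can run the code below against a small test file (`creds.txt`) and see immediate results.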
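The credential-reading step can be sketched as a small generator. This is an illustration under stated assumptions rather than the guide's own parser: the name `parse_credentials` is hypothetical, and it assumes the first colon separates the username from the password, so passwords may themselves contain colons.

```python
from typing import Iterable, Iterator, Tuple


def parse_credentials(lines: Iterable[str]) -> Iterator[Tuple[int, str, str]]:
    """Yield (line_number, username, password) for each well-formed line."""
    for line_number, raw in enumerate(lines, 1):
        line = raw.rstrip("\r\n")  # keep other whitespace: it may belong to the password
        if not line.strip() or line.lstrip().startswith("#"):
            continue  # blank line or comment
        username, sep, password = line.partition(":")
        if not sep or not username or not password:
            continue  # malformed: no colon, empty user, or empty password
        yield line_number, username.strip(), password


sample = ["alice:S3cr3t!\n", "# comment\n", "bob:pa:ss\n", "broken\n"]
for number, user, _password in parse_credentials(sample):
    print(number, user)  # the password is deliberately not printed
```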
### Python (`watchdog.py`)

```python
#!/usr/bin/env python3
"""
FortiBleed Credential Watchdog – Python version
------------------------------------------------
- Reads username:password lines from a file (default: creds.txt)
- Checks each password against HaveIBeenPwned (k‑anonymity)
- Pulls FortiBleed‑tagged IPv4 IOCs from AlienVault OTX
- Prints compromised accounts and any IOC IPs that appear in a supplied log file
"""
import argparse
import hashlib
import os
import re
import sys
import time
from pathlib import Path
from typing import List, Set, Tuple

import requests
from dotenv import load_dotenv
from tqdm import tqdm  # nice progress bar

# ----------------------------------------------------------------------
# Configuration
load_dotenv()  # pulls variables from .env into os.environ
HIBP_API_KEY = os.getenv("HIBP_API_KEY")  # optional: the range endpoint is keyless
OTX_API_KEY = os.getenv("OTX_API_KEY")  # optional

HIBP_URL = "https://api.pwnedpasswords.com/range/{prefix}"
OTX_URL = "https://otx.alienvault.com/api/v1/pulses/subscribed"
USER_AGENT = "FortiBleed-Watchdog/1.0 (+https://icarax.blog)"

# Separate headers per service so neither API key is sent to the other provider
HIBP_HEADERS = {"User-Agent": USER_AGENT}
if HIBP_API_KEY:
    HIBP_HEADERS["hibp-api-key"] = HIBP_API_KEY
OTX_HEADERS = {"User-Agent": USER_AGENT}
if OTX_API_KEY:
    OTX_HEADERS["X-OTX-API-KEY"] = OTX_API_KEY

REQUEST_DELAY = 1.5  # courtesy pause between HIBP requests, in seconds
IPV4_RE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")


# ----------------------------------------------------------------------
def sha1_hash(text: str) -> str:
    """Return uppercase SHA‑1 hex digest."""
    return hashlib.sha1(text.encode("utf-8")).hexdigest().upper()


def hibp_lookup(prefix: str) -> Set[str]:
    """
    Query HIBP with the first 5 characters of a SHA‑1 hash.
    Returns the set of hash suffixes (the part after the first 5 chars)
    that HIBP knows for that prefix.
    """
    url = HIBP_URL.format(prefix=prefix)
    try:
        resp = requests.get(url, headers=HIBP_HEADERS, timeout=10)
        resp.raise_for_status()
    except requests.RequestException as exc:
        tqdm.write(f"⚠️ HIBP request failed for prefix {prefix}: {exc}")
        return set()
    # HIBP returns lines like: 310A5F7F8E9C...:4
    return {line.split(":")[0] for line in resp.text.splitlines() if ":" in line}


def fetch_otx_iocs() -> Set[str]:
    """
    Pull all IPv4 IOCs from pulses tagged with 'FortiBleed'.
    Returns a set of IP strings.
    """
    if not OTX_API_KEY:
        return set()
    ips: Set[str] = set()
    try:
        resp = requests.get(
            OTX_URL,
            headers=OTX_HEADERS,
            params={"limit": 50},  # adjust as needed
            timeout=15,
        )
        resp.raise_for_status()
        data = resp.json()
        for pulse in data.get("results", []):
            # Only consider pulses that have the FortiBleed tag (case-insensitive)
            if any(t.lower() == "fortibleed" for t in pulse.get("tags", [])):
                for indicator in pulse.get("indicators", []):
                    if indicator.get("type") == "IPv4" and indicator.get("indicator"):
                        ips.add(indicator["indicator"])
    except Exception as exc:
        print(f"⚠️ OTX request failed: {exc}")
    return ips


def process_credentials(file_path: Path) -> List[Tuple[str, bool]]:
    """
    Returns list of (username, compromised_flag).
    Plaintext passwords are not kept after hashing.
    """
    results: List[Tuple[str, bool]] = []
    with file_path.open(encoding="utf-8") as f:
        for line_num, raw in enumerate(tqdm(f, desc="Checking", unit="cred"), 1):
            line = raw.rstrip("\r\n")
            if not line.strip() or line.startswith("#"):
                continue
            if ":" not in line:
                # Do not echo the line itself: it may contain a password
                tqdm.write(f"⚠️ Skipping malformed line {line_num}")
                continue
            username, password = line.split(":", 1)
            pwd_hash = sha1_hash(password)
            prefix, suffix = pwd_hash[:5], pwd_hash[5:]
            # Only the prefix is sent; the suffix is matched locally
            compromised = suffix in hibp_lookup(prefix)
            results.append((username, compromised))
            time.sleep(REQUEST_DELAY)
    return results


def main():
    parser = argparse.ArgumentParser(
        description="Check credentials against HIBP and flag FortiBleed IOCs"
    )
    parser.add_argument(
        "-f",
        "--file",
        type=Path,
        default=Path("creds.txt"),
        help="Path to username:password file",
    )
    parser.add_argument(
        "-l",
        "--log",
        type=Path,
        help="Optional log file to scan for FortiBleed IOC IPs",
    )
    args = parser.parse_args()
    if not args.file.is_file():
        sys.exit(f"❌ Credentials file not found: {args.file}")

    print("🔎 Loading FortiBleed IOCs from OTX …")
    fortibleed_ips = fetch_otx_iocs()
    print(f" Found {len(fortibleed_ips)} IPv4 IOCs tagged FortiBleed")

    print(f"🔎 Scanning credentials in {args.file} …")
    cred_results = process_credentials(args.file)
    compromised = [user for user, flagged in cred_results if flagged]

    print("\n=== COMPROMISED CREDENTIALS ===")
    if compromised:
        for user in compromised:
            # Never log the plain password in prod – just show it's compromised
            print(f"🔴 {user}:<REDACTED>")
    else:
        print("✅ No credentials found in HIBP breach corpus.")

    # Optional: scan a log for any of the FortiBleed IPs
    if args.log and args.log.is_file():
        print(f"\n🔎 Scanning log file {args.log} for FortiBleed IPs …")
        log_text = args.log.read_text(errors="replace")
        # Compare whole addresses so 1.2.3.4 does not match inside 11.2.3.45
        found_ips = sorted(fortibleed_ips & set(IPV4_RE.findall(log_text)))
        if found_ips:
            print("🚩 FortiBleed IPs observed in log:")
            for ip in found_ips:
                print(f" - {ip}")
        else:
            print("✅ No FortiBleed IOC IPs seen in the supplied log.")
    elif args.log:
        print(f"⚠️ Log file not found: {args.log}")


if __name__ == "__main__":
    main()
```
How to run

```bash
# 1️⃣ Prepare a test file (username:password per line)
echo 'alice:S3cr3t!' > creds.txt
echo 'bob:Password123' >> creds.txt
# 2️⃣ Execute
python watchdog.py -f creds.txt   # add -l /var/log/auth.log if you have a log to scan
```
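The range endpoint answers with one `SUFFIX:COUNT` line per known hash, so a password is compromised only when its own suffix appears in that list with a non-zero count. The following sketch shows that matching step offline; `parse_range_response` and `breach_count` are hypothetical helper names, and the sample body uses invented counts.

```python
import hashlib
from typing import Dict


def parse_range_response(body: str) -> Dict[str, int]:
    """Map each returned hash suffix to its breach count."""
    counts: Dict[str, int] = {}
    for line in body.splitlines():
        suffix, sep, count = line.strip().partition(":")
        if sep and count.isdigit():
            counts[suffix.upper()] = int(count)
    return counts


def breach_count(password: str, body: str) -> int:
    """Return how often `password` appears according to a range response body."""
    digest = hashlib.sha1(password.encode("utf-8")).hexdigest().upper()
    return parse_range_response(body).get(digest[5:], 0)


# Invented sample body for prefix 5BAA6; the counts are made up for illustration.
sample = (
    "0018A45C4D1DEF81644B54AB7F969B88D65:3\r\n"
    "1E4C9B93F3F0682250B6CF8331B7EE68FD8:42\r\n"
)
print(breach_count("password", sample))  # suffix of SHA-1("password") is listed above
```

Checking for the exact suffix matters: treating any non-empty response as a hit would flag every password, because each prefix range contains many unrelated hashes.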
### TypeScript (`watchdog.ts`)

```typescript
#!/usr/bin/env node
/**
 * FortiBleed Credential Watchdog – TypeScript version
 * --------------------------------------------------
 * Same functionality as the Python script, but runs on Node.js.
 * Requires Node >=14 and the packages installed in Step 2.
 */
import * as dotenv from "dotenv";
import * as fs from "fs";
import axios from "axios";
import { createHash } from "crypto";

// Load .env
dotenv.config();
const HIBP_API_KEY = process.env.HIBP_API_KEY; // optional: the range endpoint is keyless
const OTX_API_KEY = process.env.OTX_API_KEY; // optional

const HIBP_URL = (prefix: string) => `https://api.pwnedpasswords.com/range/${prefix}`;
const OTX_URL = "https://otx.alienvault.com/api/v1/pulses/subscribed";
const USER_AGENT = "FortiBleed-Watchdog/1.0 (+https://icarax.blog)";
const IPV4_RE = /\b(?:\d{1,3}\.){3}\d{1,3}\b/g;

// One client per service so neither API key is sent to the other provider
const hibpClient = axios.create({
  timeout: 10000,
  responseType: "text",
  headers: {
    "User-Agent": USER_AGENT,
    ...(HIBP_API_KEY ? { "hibp-api-key": HIBP_API_KEY } : {}),
  },
});
const otxClient = axios.create({
  timeout: 15000,
  headers: {
    "User-Agent": USER_AGENT,
    ...(OTX_API_KEY ? { "X-OTX-API-KEY": OTX_API_KEY } : {}),
  },
});

// ---------------------------------------------------------------------
// Helper functions
// ---------------------------------------------------------------------
function sha1(str: string): string {
  return createHash("sha1").update(str, "utf8").digest("hex").toUpperCase();
}

/**
 * Query HIBP with a 5-character hash prefix.
 * Returns the Set of hash suffixes known for that prefix.
 */
async function hibpLookup(prefix: string): Promise<Set<string>> {
  try {
    const { data } = await hibpClient.get<string>(HIBP_URL(prefix));
    const suffixes = new Set<string>();
    for (const line of data.split(/\r?\n/)) {
      const idx = line.indexOf(":");
      if (idx > 0) suffixes.add(line.slice(0, idx).toUpperCase());
    }
    return suffixes;
  } catch (err: any) {
    console.warn(`⚠️ HIBP lookup failed for prefix ${prefix}: ${err.message}`);
    return new Set();
  }
}

/**
 * Pull IPv4 IOCs from OTX pulses tagged with 'FortiBleed'.
 */
async function fetchOTXIocs(): Promise<Set<string>> {
  if (!OTX_API_KEY) return new Set();
  try {
    const { data } = await otxClient.get(OTX_URL, {
      params: { limit: 50 },
    });
    const ips = new Set<string>();
    for (const pulse of data.results ?? []) {
      const hasTag = pulse.tags?.some((t: string) => t.toLowerCase() === "fortibleed");
      if (!hasTag) continue;
      for (const ind of pulse.indicators ?? []) {
        if (ind.type === "IPv4" && ind.indicator) ips.add(ind.indicator);
      }
    }
    return ips;
  } catch (err: any) {
    console.warn(`⚠️ OTX request failed: ${err.message}`);
    return new Set();
  }
}

/**
 * Read credentials file (username:password per line).
 * The first colon is the separator, so passwords may contain colons.
 */
function readCredentials(filePath: string): Array<{ user: string; pass: string }> {
  const creds: Array<{ user: string; pass: string }> = [];
  fs.readFileSync(filePath, "utf8")
    .split(/\r?\n/)
    .forEach((line, i) => {
      if (!line.trim() || line.startsWith("#")) return;
      const idx = line.indexOf(":");
      if (idx <= 0 || idx === line.length - 1) {
        // Do not echo the line itself: it may contain a password
        console.warn(`⚠️ Skipping malformed line ${i + 1}`);
        return;
      }
      creds.push({ user: line.slice(0, idx), pass: line.slice(idx + 1) });
    });
  return creds;
}

/** Return the value following a short or long flag, if present. */
function argValue(args: string[], short: string, long: string): string | undefined {
  const i = args.findIndex((a) => a === short || a === long);
  return i !== -1 ? args[i + 1] : undefined;
}

/**
 * Main routine
 */
async function main() {
  const args = process.argv.slice(2);
  const credFile = argValue(args, "-f", "--file") ?? "creds.txt";
  const logFile = argValue(args, "-l", "--log");
  if (!fs.existsSync(credFile)) {
    console.error(`❌ Credentials file not found: ${credFile}`);
    process.exit(1);
  }

  console.log("🔎 Fetching FortiBleed IOCs from OTX …");
  const fortibleedIPs = await fetchOTXIocs();
  console.log(` Found ${fortibleedIPs.size} IPv4 IOCs tagged FortiBleed`);

  console.log(`🔎 Scanning credentials in ${credFile} …`);
  const creds = readCredentials(credFile);
  const compromised: string[] = [];
  for (const { user, pass } of creds) {
    const hash = sha1(pass);
    const prefix = hash.slice(0, 5);
    const suffix = hash.slice(5);
    // Only the prefix is sent; the suffix is matched locally
    const suffixes = await hibpLookup(prefix);
    if (suffixes.has(suffix)) compromised.push(user);
    // Courtesy pause between HIBP requests
    await new Promise((r) => setTimeout(r, 1500));
  }

  console.log("\n=== COMPROMISED CREDENTIALS ===");
  if (compromised.length) {
    for (const user of compromised) {
      console.log(`🔴 ${user}:<REDACTED>`);
    }
  } else {
    console.log("✅ No credentials appear in HIBP breach corpus.");
  }

  // Optional log scan
  if (logFile && fs.existsSync(logFile)) {
    console.log(`\n🔎 Scanning log ${logFile} for FortiBleed IPs …`);
    const logText = fs.readFileSync(logFile, "utf8");
    // Compare whole addresses so 1.2.3.4 does not match inside 11.2.3.45
    const logIPs = new Set(logText.match(IPV4_RE) ?? []);
    const found = [...fortibleedIPs].filter((ip) => logIPs.has(ip));
    if (found.length) {
      console.log("🚩 FortiBleed IPs observed in log:");
      for (const ip of found) console.log(` - ${ip}`);
    } else {
      console.log("✅ No FortiBleed IOC IPs seen in the supplied log.");
    }
  } else if (logFile) {
    console.warn(`⚠️ Log file not found: ${logFile}`);
  }
}

main().catch((err) => {
  console.error("❌ Unexpected error:", err);
  process.exit(1);
});
```
How to run (TS)

```bash
# Compile & run
npx ts-node watchdog.ts -f creds.txt   # add -l /var/log/auth.log if desired
```

How to run (compiled JS)

```bash
npx tsc   # produces watchdog.js
node watchdog.js -f creds.txt
```
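The log scan in both scripts is only as reliable as its IP matching: a plain substring test would report `203.0.113.7` when the log actually contains `203.0.113.77`. The sketch below shows one way to compare whole, validated addresses instead. The function name `ioc_hits` is hypothetical and the addresses come from documentation ranges, not from any real IOC feed.

```python
import ipaddress
import re
from typing import Iterable, Set

# Dotted quads that are not glued to further digits or octets on either side
_IPV4_CANDIDATE = re.compile(r"(?<!\d)(?<!\d\.)(?:\d{1,3}\.){3}\d{1,3}(?!\d|\.\d)")


def ioc_hits(log_lines: Iterable[str], ioc_ips: Set[str]) -> Set[str]:
    """Return the IOC addresses that occur as complete IPv4 addresses in the log."""
    hits: Set[str] = set()
    for line in log_lines:
        for candidate in _IPV4_CANDIDATE.findall(line):
            try:
                ip = str(ipaddress.IPv4Address(candidate))
            except ipaddress.AddressValueError:
                continue  # e.g. 999.1.1.1 is not a valid address
            if ip in ioc_ips:
                hits.add(ip)
    return hits


log = [
    "Failed password for root from 203.0.113.77 port 22",
    "Accepted publickey for deploy from 203.0.113.7: session opened",
]
print(sorted(ioc_hits(log, {"203.0.113.7"})))
```

Iterating over lines rather than reading the whole file also keeps memory use flat on large logs.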
<a name="step-4-configuration"></a>
## Step 4: Configuration
| Variable | Description | Example | Required? |
|---|---|---|---|
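| `HIBP_API_KEY` | Your HaveIBeenPwned API key | `abc123def456...` | ⚠️ Not needed by the Pwned Passwords range endpoint itself |
| `OTX_API_KEY` | AlienVault OTX API key (optional, for IOC enrichment) | `otx_abcdef123456...` | ❌ No (script works without) |
| `CREDS_FILE` (optional) | Path to username:password list; the reference scripts take this via `-f` / `--file` rather than reading the variable | `./data/creds.txt` | ❌ No (default `creds.txt`) |
| `LOG_FILE` (optional) | Path to a log you want to scan for FortiBleed IPs; passed via `-l` / `--log` in the reference scripts | `/var/log/auth.log` | ❌ No |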
Best practice – Keep these variables out of source control:

```bash
# .gitignore
.env
*.log
```

In production, replace dotenv with a secret manager (AWS Secrets Manager, GCP Secret Manager, HashiCorp Vault, Azure Key Vault) and fetch the values at runtime.
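The reference scripts take the credentials and log paths as CLI flags, while the table lists `CREDS_FILE` and `LOG_FILE` as variables. One way to reconcile the two is a simple precedence rule: flag first, then environment variable, then default. The sketch below assumes that rule; `resolve_paths` is a hypothetical helper, not part of the scripts above.

```python
import argparse
from pathlib import Path
from typing import Mapping, Optional, Sequence, Tuple


def resolve_paths(
    argv: Sequence[str], env: Mapping[str, str]
) -> Tuple[Path, Optional[Path]]:
    """Resolve (credentials_file, log_file) as CLI flag > env var > default."""
    parser = argparse.ArgumentParser()
    parser.add_argument("-f", "--file", type=Path)
    parser.add_argument("-l", "--log", type=Path)
    args = parser.parse_args(argv)

    creds = args.file or Path(env.get("CREDS_FILE") or "creds.txt")
    log = args.log or (Path(env["LOG_FILE"]) if env.get("LOG_FILE") else None)
    return creds, log


print(resolve_paths(["-f", "users.txt"], {"CREDS_FILE": "ignored.txt"}))
```

In a real script you would call it as `resolve_paths(sys.argv[1:], os.environ)`.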
<a name="step-5-common-patterns"></a>
## Step 5: Common Patterns
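Throttle requests so you send at most one HIBP call every 1.5 seconds:

```python
# Python
time.sleep(1.5)  # courtesy delay: at most 1 request per 1.5 seconds
```

```typescript
// TypeScript
await new Promise((r) => setTimeout(r, 1500));
```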
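A fixed sleep after every request always waits the full interval, even when hashing and parsing have already used up part of it. A small throttle that sleeps only for the remainder is one alternative. This is a sketch under the guide's assumed 1.5-second spacing; the `Throttle` class is a hypothetical helper with an injectable clock so it can be tested without real waiting.

```python
import time
from typing import Callable, Optional


class Throttle:
    """Ensure at least `min_interval` seconds pass between calls to wait()."""

    def __init__(
        self,
        min_interval: float,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ) -> None:
        self.min_interval = min_interval
        self._clock = clock
        self._sleep = sleep
        self._last: Optional[float] = None

    def wait(self) -> float:
        """Sleep for the remainder of the interval; return the seconds slept."""
        delay = 0.0
        if self._last is not None:
            elapsed = self._clock() - self._last
            delay = max(0.0, self.min_interval - elapsed)
        if delay:
            self._sleep(delay)
        self._last = self._clock()
        return delay


throttle = Throttle(min_interval=1.5)
print(throttle.wait())  # the first call never sleeps
```

Call `throttle.wait()` immediately before each outbound request.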
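Retry transient failures with exponential back‑off. The wrapped function must let exceptions propagate; a lookup that catches its own errors and returns an empty set never triggers a retry.

```python
import backoff  # pip install backoff
import requests


@backoff.on_exception(backoff.expo,
                      (requests.exceptions.RequestException,),
                      max_tries=5)
def hibp_lookup_with_backoff(prefix):
    # Perform the request here and call raise_for_status() without
    # catching the exception, so the decorator can see the failure.
    ...
```

```typescript
// Simple manual back‑off: 5 tries in total.
// `fn` must throw on failure for the retry to take effect.
async function withBackoff<T>(fn: () => Promise<T>, attempt = 0): Promise<T> {
  try {
    return await fn();
  } catch (e) {
    if (attempt < 4) {
      await new Promise((r) => setTimeout(r, Math.pow(2, attempt) * 500));
      return withBackoff(fn, attempt + 1);
    }
    throw e;
  }
}
```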
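If you would rather not add a dependency, the same retry idea can be written with the standard library. The sketch below is a hand-rolled variant with jitter, not the `backoff` package's implementation; `retry_with_backoff` and its parameters are hypothetical names, and `sleep` is injectable so the behaviour can be checked without waiting.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    func: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    max_tries: int = 5,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `func` until it succeeds or `max_tries` attempts have failed."""
    if max_tries < 1:
        raise ValueError("max_tries must be at least 1")
    for attempt in range(max_tries):
        try:
            return func()
        except retry_on:
            if attempt == max_tries - 1:
                raise  # out of attempts: surface the last error
            delay = base_delay * (2 ** attempt)
            # Jitter spreads out retries from clients that failed together
            sleep(delay + random.uniform(0, delay / 2))
    raise AssertionError("unreachable")


attempts = {"count": 0}


def flaky() -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


print(retry_with_backoff(flaky, sleep=lambda seconds: None))
```

With `requests`, you would pass `retry_on=(requests.exceptions.RequestException,)` and a function that calls `raise_for_status()`.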
If you process millions of credentials, many passwords repeat and many hashes share a prefix. Cache the prefix → suffix‑set mapping so each prefix is fetched only once:

```python
from functools import lru_cache


@lru_cache(maxsize=256_000)
def cached_hibp_lookup(prefix):
    # Wrap in frozenset so callers cannot mutate the cached value
    return frozenset(hibp_lookup(prefix))
```

```typescript
const cache = new Map<string, Set<string>>();

async function cachedHibpLookup(prefix: string): Promise<Set<string>> {
  if (cache.has(prefix)) return cache.get(prefix)!;
  const res = await hibpLookup(prefix);
  cache.set(prefix, res);
  return res;
}
```
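To see the effect of caching without touching the network, the fetch function can be injected. In this sketch `make_cached_lookup` and `fake_fetch` are hypothetical names and the returned suffix is a placeholder, not real breach data.

```python
from functools import lru_cache
from typing import Callable, FrozenSet, List


def make_cached_lookup(
    fetch: Callable[[str], FrozenSet[str]], maxsize: int = 4096
) -> Callable[[str], FrozenSet[str]]:
    """Wrap `fetch` so each hash prefix is requested at most once."""

    @lru_cache(maxsize=maxsize)
    def cached(prefix: str) -> FrozenSet[str]:
        return fetch(prefix)

    return cached


requested: List[str] = []


def fake_fetch(prefix: str) -> FrozenSet[str]:
    """Stand-in for the HTTP call; records which prefixes were requested."""
    requested.append(prefix)
    return frozenset({"A" * 35})  # placeholder suffix


lookup = make_cached_lookup(fake_fetch)
for prefix in ["5BAA6", "5BAA6", "00000", "5BAA6"]:
    lookup(prefix)
print(requested)
```

One caveat: `lru_cache` stores whatever the function returns, so a fetch that returns an empty set on error would cache the failure. Raising on error avoids that, because exceptions are not cached.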
Use logging (Python) or pino/winston (Node) with JSON output for easy ingestion into SIEMs.
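For the Python side, JSON output is possible with the standard `logging` module alone. The sketch below is one way to do it; the `JsonFormatter` class, the `fields` convention for extra data, and the event name are illustrative assumptions rather than an established schema.

```python
import io
import json
import logging
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each record as a single JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "ts": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "event": record.getMessage(),
        }
        # Structured fields are passed via extra={"fields": {...}}
        payload.update(getattr(record, "fields", {}))
        return json.dumps(payload, sort_keys=True)


def build_logger(stream) -> logging.Logger:
    """Return a logger that writes JSON lines to `stream`."""
    logger = logging.getLogger("watchdog")
    logger.handlers.clear()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    return logger


buffer = io.StringIO()
log = build_logger(buffer)
# Log the username only; never include the password or its hash
log.warning("credential_compromised", extra={"fields": {"user": "alice"}})
print(buffer.getvalue().strip())
```

In a deployment you would pass `sys.stderr` or a file handle instead of the in-memory buffer.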
<a name="step-6-troubleshooting"></a>
## Step 6: Troubleshooting
| Symptom | Likely cause | Fix |
|---|---|---|
| 403 Forbidden from HIBP | Missing or invalid `hibp-api-key` header, or no `User-Agent` header sent | Verify `HIBP_API_KEY` in `.env`; ensure no extra whitespace; confirm a `User-Agent` is set |
| 429 Too Many Requests | Exceeded HIBP rate limit | Increase sleep between calls (e.g., `time.sleep(2)`); consider using a paid HIBP key for higher limits |
| ECONNREFUSED / timeout | Network blocking or proxy needed | Check outbound HTTPS access to api.pwnedpasswords.com and otx.alienvault.com; configure corporate proxy if required |
| Empty IOC set from OTX | No `OTX_API_KEY`, no subscribed pulses tagged FortiBleed, or a misspelled tag in the filter | Obtain an OTX key; verify the tag spelling in the code (`fortibleed` after lowercasing, not `fortibled`) |
| “Malformed line” warnings | Credentials file not in `user:pass` format | Ensure each line has the form `user:password` (the first colon is the separator); remove empty lines |
| Script crashes with `KeyError: 'results'` | OTX API changed response shape | Inspect a sample response (`curl -H "X-OTX-API-KEY: $OTX_API_KEY" https://otx.alienvault.com/api/v1/pulses/subscribed`) and adjust parsing accordingly |
| High CPU / memory usage | Processing huge files without streaming | Stream the file line‑by‑line (the Python example does; the TypeScript example reads the whole file at once) and/or enable caching to avoid duplicate work |
Debug tip – Add print/console.log statements after each external call to see the raw status code and response body (remove in production).
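Several rows in the table come down to the OTX payload not having the expected shape. A defensive extractor that tolerates missing or `null` fields avoids such crashes. The sketch below assumes the `results` → `tags` / `indicators` layout used by the scripts in this guide; `extract_ipv4_iocs` is a hypothetical helper and the sample payload is invented, using documentation-range addresses.

```python
from typing import Any, Set


def extract_ipv4_iocs(payload: Any, tag: str = "fortibleed") -> Set[str]:
    """Collect IPv4 indicators from pulses carrying `tag` (case-insensitive)."""
    ips: Set[str] = set()
    if not isinstance(payload, dict):
        return ips
    for pulse in payload.get("results") or []:
        if not isinstance(pulse, dict):
            continue
        tags = [str(t).lower() for t in pulse.get("tags") or []]
        if tag.lower() not in tags:
            continue
        for indicator in pulse.get("indicators") or []:
            if (
                isinstance(indicator, dict)
                and indicator.get("type") == "IPv4"
                and indicator.get("indicator")
            ):
                ips.add(indicator["indicator"])
    return ips


# Invented payload for illustration only
sample_payload = {
    "results": [
        {"tags": ["FortiBleed"], "indicators": [
            {"type": "IPv4", "indicator": "198.51.100.23"},
            {"type": "domain", "indicator": "example.test"},
        ]},
        {"tags": ["unrelated"], "indicators": [
            {"type": "IPv4", "indicator": "203.0.113.9"},
        ]},
        {"tags": None},
    ]
}
print(extract_ipv4_iocs(sample_payload))
```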
<a name="step-7-production-checklist"></a>
## Step 7: Production Checklist
| ✅ Item | Why it matters |
|---|---|
| Secrets management | Never hard‑code API keys; use a vault or managed secret service and rotate keys regularly. |
| HTTPS enforcement | All outbound calls must use TLS 1.2+; disable insecure fallbacks. |
| Rate‑limit compliance | Respect HIBP pacing (the 1.5 s/request spacing used in this guide) and OTX limits; implement back‑off and retry. |
| Input validation | Reject malformed credential lines; limit file size to avoid DoS. |
| Structured, JSON logging | Feed logs to a SIEM (Splunk, Elastic, Azure Sentinel) for alerting on compromised credentials or IOC hits. |
| Alerting | Trigger an email/SMS/PagerDuty notification when any credential is found compromised or a FortiBleed IP appears in auth logs. |
| Metrics | Expose counters via Prometheus (`credentials_checked`, `credentials_compromised`, `ioc_hits`) for capacity planning. |
| Unit / integration tests | Mock HIBP and OTX responses; verify handling of 200, 403, 429, 500. |
| Containerization | Package the script in a minimal Docker image (e.g., `python:3.11-slim` or `node:20-alpine`) for reproducible deployments. |
| Automated credential rotation | Integrate with your IAM system to force password reset for any account flagged as compromised. |
| Regular IOC refresh | Schedule a cron job (or Kubernetes CronJob) to re‑pull FortiBleed IOCs every 6‑12 h and update any block‑lists/firewall rules. |
| Legal & privacy | Ensure you have the right to process the credential list (e.g., only your own users or with explicit consent). |
| Documentation | Keep a README.md with runbooks, escalation paths, and contact info for the security team. |
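The testing item in the checklist can be approached by injecting the HTTP call and stubbing it with `unittest.mock`. The sketch below shows the idea for the status codes listed there. The `lookup` function, the two exception classes and the `http_get` callable are hypothetical; they stand in for whatever wrapper your own code uses around the HTTP client.

```python
from typing import Callable, Set, Tuple
from unittest import mock


class ConfigError(Exception):
    """Raised for responses that retrying cannot fix (401/403)."""


class RetryableError(Exception):
    """Raised for responses worth retrying (429 and 5xx)."""


def lookup(prefix: str, http_get: Callable[[str], Tuple[int, str]]) -> Set[str]:
    """Return hash suffixes for `prefix`; `http_get` returns (status, body)."""
    status, body = http_get(f"https://api.pwnedpasswords.com/range/{prefix}")
    if status == 200:
        return {line.split(":")[0] for line in body.splitlines() if ":" in line}
    if status in (401, 403):
        raise ConfigError(f"HTTP {status}: check headers and credentials")
    if status == 429 or status >= 500:
        raise RetryableError(f"HTTP {status}: retry later")
    raise RuntimeError(f"Unexpected HTTP {status}")


def run_checks() -> None:
    """Exercise lookup() against stubbed 200, 403, 429 and 500 responses."""
    ok = mock.Mock(return_value=(200, "AAA:2\r\nBBB:1\r\n"))
    assert lookup("5BAA6", ok) == {"AAA", "BBB"}
    ok.assert_called_once_with("https://api.pwnedpasswords.com/range/5BAA6")

    cases = [(403, ConfigError), (429, RetryableError), (500, RetryableError)]
    for status, expected in cases:
        stub = mock.Mock(return_value=(status, ""))
        try:
            lookup("5BAA6", stub)
        except expected:
            continue
        raise AssertionError(f"HTTP {status} did not raise {expected.__name__}")


run_checks()
print("all status-code checks passed")
```

Separating retryable from non-retryable failures also gives a back-off wrapper a clear signal about which errors to retry.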
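You now have a copy‑and‑paste credential watchdog that checks passwords against HIBP using k‑anonymity, pulls FortiBleed‑tagged IPv4 IOCs from AlienVault OTX, and flags any of those IPs that appear in your logs.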
Deploy it as a scheduled job, a Lambda function, or a sidecar container alongside your authentication service, and you’ll dramatically reduce the window of exposure from the FortiBleed‑related credential dump.
Stay safe, keep those secrets rotated, and happy hunting! 🚀
Source: Security Week AI
