

Topic: Critical Vulnerabilities Patched in Fortinet & Ivanti Products
Context: High-impact Remote Code Execution (RCE) vulnerabilities in edge networking devices require real-time detection and automated alerting to prevent exploitation.
Before implementing the monitoring engine, ensure you have the following:
- Environment variable management (.env files).
- git for version control.
# Create virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install required production-grade libraries
pip install requests python-dotenv pydantic structlog
# Initialize project
npm init -y
# Install dependencies
# axios: HTTP client, dotenv: env management, zod: schema validation, winston: logging
npm install axios dotenv zod winston
npm install --save-dev typescript @types/node ts-node
We will build a Vulnerability Scanner Engine that queries the NVD API specifically for Fortinet and Ivanti keywords to identify critical RCEs.
scanner.py
import os
import requests
import structlog
from typing import List, Dict
from dotenv import load_dotenv
from pydantic import BaseModel, ValidationError
# Initialize structured logging for production observability.
# This module-level logger is the one the scanner class calls for its
# info/error events.
logger = structlog.get_logger()
# Read a local .env file (if any) so that os.getenv("NVD_API_KEY") can pick
# up the key without it being hardcoded in the source.
load_dotenv()
# Data Model for validated CVE data
class VulnerabilityReport(BaseModel):
    """Validated summary of a single CVE record returned by the scanner.

    All four fields are required strings; pydantic raises ``ValidationError``
    when one is missing or has the wrong type.
    """

    # CVE identifier as taken from the record's "id" field.
    cve_id: str
    # Human-readable description text (the scanner shortens it).
    description: str
    # Base severity label, or "UNKNOWN" when the scanner finds none.
    severity: str
    # Vendor keyword whose search produced this record.
    vendor: str
class VulnerabilityScanner:
    """Fetch CRITICAL-severity CVEs from the NVD CVE API for a fixed vendor list.

    Attributes:
        api_key: Value of the ``NVD_API_KEY`` environment variable, or ``None``
            when unset (requests are then sent without an ``apiKey`` header).
        base_url: NVD CVE API 2.0 endpoint that is queried.
        target_vendors: Keywords passed to ``keywordSearch``, one request each.
    """

    # Number of description characters kept before an ellipsis is appended.
    _MAX_DESCRIPTION_LENGTH = 200

    def __init__(self):
        self.api_key = os.getenv("NVD_API_KEY")
        self.base_url = "https://services.nvd.nist.gov/rest/json/cves/2.0"
        self.target_vendors = ["Fortinet", "Ivanti"]

    def fetch_critical_vulnerabilities(self) -> List[VulnerabilityReport]:
        """Query the NVD API once per target vendor for CRITICAL CVEs.

        A failed request is logged and that vendor is skipped. A record that
        cannot be turned into a valid report is logged and skipped on its own,
        so the remaining records of the same vendor are still returned.

        Returns:
            The validated reports of all vendors, in request order.
        """
        reports = []
        # The header set does not depend on the vendor, so build it once.
        headers = {"apiKey": self.api_key} if self.api_key else {}

        for vendor in self.target_vendors:
            logger.info("querying_nvd", vendor=vendor)
            # Constructing query parameters for specific vendor and critical severity
            params = {
                "keywordSearch": vendor,
                "cvssV3Severity": "CRITICAL",
            }
            try:
                response = requests.get(
                    self.base_url, params=params, headers=headers, timeout=10
                )
                response.raise_for_status()
                vulnerabilities = response.json().get("vulnerabilities") or []
            except requests.exceptions.RequestException as e:
                logger.error("api_request_failed", vendor=vendor, error=str(e))
                continue
            except Exception as e:
                # Boundary handler: e.g. a body that is not a JSON object.
                logger.error("unexpected_error", vendor=vendor, error=str(e))
                continue

            for item in vulnerabilities:
                # Errors are handled per record so that one malformed entry
                # does not discard every later entry for this vendor.
                try:
                    reports.append(self._build_report(item, vendor))
                except ValidationError as e:
                    logger.error(
                        "data_validation_failed", vendor=vendor, error=str(e)
                    )
                except Exception as e:
                    logger.error("unexpected_error", vendor=vendor, error=str(e))
        return reports

    @staticmethod
    def _first_dict(entries) -> Dict:
        """Return the first dict found in *entries*, or ``{}`` if there is none."""
        if isinstance(entries, list):
            for entry in entries:
                if isinstance(entry, dict):
                    return entry
        return {}

    def _build_report(self, item: Dict, vendor: str) -> VulnerabilityReport:
        """Convert one raw ``vulnerabilities`` entry into a validated report.

        Raises:
            ValidationError: If the extracted fields do not satisfy the model
                (for example when the record carries no CVE id).
        """
        cve = item.get("cve") or {}

        # Prefer the English description; fall back to the first one listed.
        descriptions = cve.get("descriptions") or []
        english = [
            d for d in descriptions
            if isinstance(d, dict) and d.get("lang") == "en"
        ]
        desc = self._first_dict(english or descriptions).get("value")
        desc = desc or "No description"
        # Only mark the text as shortened when it actually was.
        if len(desc) > self._MAX_DESCRIPTION_LENGTH:
            desc = desc[: self._MAX_DESCRIPTION_LENGTH] + "..."

        # The cvssV3Severity filter also matches records scored only with
        # CVSS v3.0, so look there when no v3.1 metric is present.
        metrics = cve.get("metrics") or {}
        metric = self._first_dict(metrics.get("cvssMetricV31")) or self._first_dict(
            metrics.get("cvssMetricV30")
        )
        severity = (metric.get("cvssData") or {}).get("baseSeverity") or "UNKNOWN"

        # Validate using Pydantic to ensure data integrity
        return VulnerabilityReport(
            cve_id=cve.get("id"),
            description=desc,
            severity=severity,
            vendor=vendor,
        )
if __name__ == "__main__":
    # Run a single scan, then print a summary header followed by a two-line
    # entry (identity/severity, then details) for each finding.
    results = VulnerabilityScanner().fetch_critical_vulnerabilities()
    print(f"\n--- Found {len(results)} Critical Vulnerabilities ---")
    for r in results:
        print(
            f"[{r.vendor}] {r.cve_id} | Severity: {r.severity}",
            f"Details: {r.description}\n",
            sep="\n",
        )
scanner.ts
import axios from 'axios';
import * as dotenv from 'dotenv';
import { z } from 'zod';
import winston from 'winston';
// Load variables from .env into process.env before anything reads them.
dotenv.config();

// Configure production logging.
// The threshold is taken from LOG_LEVEL when it names one of winston's npm
// levels; any other value (or none) falls back to 'info' so that a typo in
// the environment cannot silently disable all log output.
const requestedLevel = (process.env.LOG_LEVEL || '').toLowerCase();
const logLevel = Object.prototype.hasOwnProperty.call(
  winston.config.npm.levels,
  requestedLevel
)
  ? requestedLevel
  : 'info';

const logger = winston.createLogger({
  level: logLevel,
  format: winston.format.json(),
  transports: [new winston.transports.Console()],
});
// Define schema for type-safe vulnerability objects.
// Every field is a required string; the scanner builds a plain object with
// these four keys and validates it against this schema before keeping it.
const VulnerabilitySchema = z.object({
cveId: z.string(),
description: z.string(),
severity: z.string(),
vendor: z.string(),
});
// Static type derived from the schema, so the compile-time shape always
// matches what is validated at runtime.
type Vulnerability = z.infer<typeof VulnerabilitySchema>;
/**
 * Queries the NVD CVE API for CRITICAL-severity CVEs, one request per vendor
 * keyword, and returns the records that pass schema validation.
 */
class VulnerabilityScanner {
  private readonly apiKey: string | undefined;
  private readonly baseUrl = 'https://services.nvd.nist.gov/rest/json/cves/2.0';
  private readonly vendors = ['Fortinet', 'Ivanti'];

  constructor() {
    // Optional: without a key the request is sent with no apiKey header.
    this.apiKey = process.env.NVD_API_KEY;
  }

  /**
   * Runs one scan over all configured vendors.
   *
   * A failed request is logged and that vendor is skipped. A record that
   * fails validation is logged and skipped on its own, so the remaining
   * records of the same vendor are still returned.
   *
   * @returns Validated vulnerability records for all vendors, in request order.
   */
  public async scan(): Promise<Vulnerability[]> {
    const allReports: Vulnerability[] = [];

    for (const vendor of this.vendors) {
      logger.info(`Scanning for vendor: ${vendor}`);
      try {
        const response = await axios.get(this.baseUrl, {
          params: {
            keywordSearch: vendor,
            cvssV3Severity: 'CRITICAL'
          },
          headers: this.apiKey ? { 'apiKey': this.apiKey } : {},
          timeout: 10000
        });

        const payload = response.data?.vulnerabilities;
        const vulnerabilities: any[] = Array.isArray(payload) ? payload : [];

        for (const item of vulnerabilities) {
          const cve = item?.cve;

          // Prefer the English description; fall back to the first one listed.
          const descriptions: any[] = Array.isArray(cve?.descriptions)
            ? cve.descriptions
            : [];
          const description =
            descriptions.find((d) => d?.lang === 'en') ?? descriptions[0];

          // The cvssV3Severity filter also matches records scored only with
          // CVSS v3.0, so look there when no v3.1 metric is present.
          const metric =
            cve?.metrics?.cvssMetricV31?.[0] ?? cve?.metrics?.cvssMetricV30?.[0];

          // Validate with Zod. safeParse reports failure as a value instead of
          // throwing, so one malformed record cannot abort the vendor's loop.
          const result = VulnerabilitySchema.safeParse({
            cveId: cve?.id,
            description: description?.value || 'No description',
            severity: metric?.cvssData?.baseSeverity || 'UNKNOWN',
            vendor: vendor
          });

          if (!result.success) {
            logger.error('Validation failed', {
              vendor,
              cveId: cve?.id,
              error: result.error.message
            });
            continue;
          }
          allReports.push(result.data);
        }
      } catch (error: unknown) {
        // Narrow before reading .message: a thrown value need not be an Error.
        const message = error instanceof Error ? error.message : String(error);
        logger.error('Scan failed', { vendor, error: message });
      }
    }

    return allReports;
  }
}
// Execution block
/** Runs a single scan and prints the findings as a table. */
async function main(): Promise<void> {
  const findings = await new VulnerabilityScanner().scan();
  console.log('--- Vulnerability Scan Results ---');
  console.table(findings);
}

void main();
Never hardcode credentials. Use a .env file in your project root.
# .env file
NVD_API_KEY=your_actual_nvd_api_key_here
LOG_LEVEL=info
SCAN_INTERVAL_MINUTES=60
NOTIFICATION_WEBHOOK_URL=https://hooks.slack.com/services/T000/B000/XXXX
Instead of just printing results, developers typically implement an "Observer" that pushes alerts to Slack or PagerDuty when a new CVE is detected.
Since NVD doesn't push webhooks, the standard pattern is a scheduled Cron job or a while True loop with an exponential backoff to avoid being banned by the API.
To avoid alerting on the same CVE every hour, developers store the cve_id in a lightweight database (like Redis or SQLite) and only alert if the ID is not present.
| Error | Cause | Fix |
|---|---|---|
| 403 Forbidden | API Key is invalid or expired. | Regenerate key in NVD portal. |
| 429 Too Many Requests | You are hitting the API too fast. | Implement time.sleep() or use an API key. |
| ValidationError | The API structure changed. | Update your Pydantic/Zod schema to match the new JSON structure. |
| TimeoutError | Network latency or large query size. | Increase the timeout parameter in your HTTP client. |
- Are secrets stored in .env and added to .gitignore?
- Is a Dockerfile ready for deployment to Kubernetes or AWS Fargate?

Source: Security Week AI
Follow ICARAX for more AI insights and tutorials.
