#!/usr/bin/python3 # # Fetch CVE descriptions from the MITRE CVE v5 and update the DB # # TODO: dedup code from check-new-issues # TODO: change DB schema and remove unused columns and rename table # # Copyright © 2023 Emilio Pozuelo Monfort # # This file is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 2 of the License, or # (at your option) any later version. # # This file is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this file. If not, see . import io import json import os.path import sys import zipfile import requests import setup_paths import security_db cve5_zip = None def get_cve5_zipfile(): global cve5_zip if cve5_zip is None: cve5_file_url = 'https://github.com/CVEProject/cvelistV5/archive/refs/heads/main.zip' r = requests.get(cve5_file_url) cve5_zip_data = io.BytesIO(r.content) cve5_zip = zipfile.ZipFile(cve5_zip_data) return cve5_zip def read_cve5_file(): cve5s = {} z = get_cve5_zipfile() for fname in z.namelist(): if os.path.basename(fname).startswith('CVE-'): #debug("found record " + fname) cve_id = os.path.basename(fname)[:-5] cve5s[cve_id] = fname return cve5s def get_cve5(cve_id): global cve5_zip if cve_id not in cve5s: return None fname = cve5s[cve_id] #logging.info('loading file') f = cve5_zip.open(fname) #logging.info('loading json') return json.load(f) def get_cve5_description(cve_id): cve = get_cve5(cve_id) desc = None if cve: if 'descriptions' in cve['containers']['cna']: desc = [desc['value'] for desc in cve['containers']['cna']['descriptions'] if desc['lang'].startswith('en')] if desc: desc = desc[0] # for some reason descriptions may contain new lines desc = desc.replace('\n', ' ') # and some contain leading spaces desc = desc.strip() return desc base = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) db_file = os.path.join(base, 'data/security.db') db = security_db.DB(db_file) data = [] cve5s = read_cve5_file() for cve_id, cve_file in cve5s.items(): cve_desc = get_cve5_description(cve_id) # The DB schema requires the desc to not be NULL if not cve_desc: cve_desc = "" data.append((cve_id, cve_desc, "", # discovered "", # published "", # severity "", # range local "", # range remote "", # range user init "", # loss avail "", # loss conf "", # loss int "", # loss sec prot user "", # loss sec prot admin "", # loss sec prot other )) # Sort afterwards to increase locality in the insert process data.sort() cursor = db.writeTxn() db.updateNVD(cursor, data, False) db.commit(cursor)