#!/usr/bin/python3 # # Parse MITRE JSON 5.0 records and update data/CVE/list # # See https://github.com/CVEProject/cve-schema # and https://github.com/CVEProject/cvelistV5 # # Copyright © 2023 Emilio Pozuelo Monfort import argparse import io import json import os import zipfile import requests import setup_paths # noqa from sectracker import parsers CVE_ZIPFILE = 'https://github.com/CVEProject/cvelistV5/archive/refs/heads/main.zip' def debug(m): if args.verbose: print(m) def get_annotation(annotations, ann_type): for ann in annotations: if isinstance(ann, ann_type): return ann def is_published(record): return record['cveMetadata']['state'] == 'PUBLISHED' def is_reserved(record): return record['cveMetadata']['state'] == 'RESERVED' def is_rejected(record): return record['cveMetadata']['state'] == 'REJECTED' def parse_record(record, cve): # remove all flags, and add the current one if needed ann = get_annotation(cve.annotations, parsers.FlagAnnotation) if ann: cve.annotations.remove(ann) if is_published(record): # no flag for published records pass elif is_reserved(record): ann = parsers.FlagAnnotation(0, 'RESERVED') cve.annotations.insert(0, ann) elif is_rejected(record): ann = parsers.FlagAnnotation(0, 'REJECTED') cve.annotations.insert(0, ann) if is_reserved(record) or is_rejected(record): if cve.header.description.startswith('('): cve.header.description = '' else: desc = [desc['value'] for desc in record['containers']['cna']['descriptions'] if desc['lang'].startswith('en')] if desc: desc = desc[0] # for some reason descriptions may contain new lines desc = desc.replace('\n', ' ') # and even non-printable characters such as \xa0 ( ) # if a character is non-ascii then return character in # ASCII-only representation. desc = "".join([ c if ord(c) < 128 else ascii(c).strip('\'') for c in desc if c.isprintable() ]) # and some contain leading spaces desc = desc.strip() if len(desc) > 70: desc = desc[:70] + ' ...' cve.header.description = f"({desc})" if not is_reserved(record) and not is_rejected(record) \ and not get_annotation(cve.annotations, parsers.StringAnnotation) \ and not get_annotation(cve.annotations, parsers.PackageAnnotation): ann = parsers.StringAnnotation(0, 'TODO', 'check') cve.annotations.append(ann) def process_record_file(f): global cve_dir global cves record = json.load(f) cve_id = record['cveMetadata']['cveId'] try: cve = cve_dir[cve_id] except KeyError: header = parsers.Header(0, cve_id, '') cve = parsers.Bug('', header, list()) cves.insert(0, cve) parse_record(record, cve) def process_record_filename(record_file): with open(record_file) as f: process_record_file(f) def process_record_dir(record_dir): for year_dir in os.listdir(record_dir): for record_file in os.listdir(year_dir): debug("processing record " + record_file) process_record_filename(record_file) debug("record processed") def process_zip_file(zip_file): z = zipfile.ZipFile(zip_file) for fname in z.namelist(): if os.path.basename(fname).startswith('CVE-'): f = z.open(fname) debug("processing record " + fname) process_record_file(f) debug("record processed") def download_zip_file(): debug("downloading zip file...") r = requests.get(CVE_ZIPFILE) debug(f"downloaded, status {r.status_code}") b = io.BytesIO(r.content) process_zip_file(b) default_workdir = os.path.join(os.path.dirname(os.path.dirname(__file__))) parser = argparse.ArgumentParser(description='Update CVE list with MITRE CVE records') parser.add_argument('-v', '--verbose', action="store_true", help='enable verbose messages') parser.add_argument('--work-dir', help='path to security-tracker repo (default: relative to the script)', default=default_workdir) parser.add_argument('file', nargs='?', help='file to process, or download records from MITRE if not specified') args = parser.parse_args() main_list = args.work_dir + '/data/CVE/list' debug("reading cve file") cves = parsers.cvelist(main_list) debug("finished reading cve file") cve_dir = { cve.header.name: cve for cve in cves } if not args.file: # no argument, we download the CVE db download_zip_file() elif args.file.endswith('.json'): debug("processing record " + args.file) process_record_filename(args.file) debug("record processed") elif args.file.endswith('.zip'): process_zip_file(args.file) else: process_record_dir(args.file) # write CVE file back with open(main_list, 'w') as f: parsers.writecvelist(cves, f)