#!/usr/bin/python3
#
# Interactive command to iterate over new CVEs in order to triage
# them.
#
# Based on a previous Perl script written by Stefan Fritsch and others.
#
# Copyright © 2023 Emilio Pozuelo Monfort
#
# This file is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This file is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this file. If not, see <https://www.gnu.org/licenses/>.

import argparse
import collections
import json
import logging
import os
import re
import readline
import subprocess
import sys
import tempfile
import textwrap
import zipfile

import requests

import setup_paths  # noqa
from sectracker import parsers
from bugs import temp_bug_name

logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s')
#logging.getLogger().setLevel("DEBUG")


def debug(s):
    """Print *s* when the script runs in verbose (-v) mode."""
    if args.verbose:
        print(s)


def get_annotation(annotations, ann_type):
    """Return the first annotation of type *ann_type*, or None."""
    for ann in annotations:
        if isinstance(ann, ann_type):
            return ann


def ann_is_todo_check(ann):
    """Return True if *ann* is a 'TODO: check' string annotation."""
    if isinstance(ann, parsers.StringAnnotation):
        if ann.type == "TODO" and ann.description == "check":
            return True
    return False


def read_packages_file(file):
    """Read a one-package-per-line file, skipping comments and blanks."""
    packages = []
    with open(file) as f:
        for line in f:
            if line.startswith('#'):
                continue
            line = line.strip()
            if line != "":
                packages.append(line)
    return packages


def read_wnpp_file(wnpp_file):
    """Parse the wnpp_rm dump into a dict mapping package -> bug line."""
    wnpp = {}
    wnpp_re = re.compile(r"^([\w.-]+): ((?:ITP|RFP) .+)$")
    with open(wnpp_file) as f:
        for line in f:
            m = wnpp_re.match(line)
            if m:
                package, bug = m.group(1, 2)
                wnpp[package] = bug
    return wnpp


def print_urls(cve_id):
    """Print the reference URLs of the CVE 5.0 record for *cve_id*."""
    cve = get_cve5(cve_id)
    if cve:
        cna = cve['containers']['cna']
        if 'references' in cna:
            for ref in cna['references']:
                print('Reference: ' + ref['url'])
    print("")


def get_cve5_description(cve_id):
    """Return the English description from the CVE 5.0 record, or None."""
    cve = get_cve5(cve_id)
    desc = None
    if cve:
        if 'descriptions' in cve['containers']['cna']:
            desc = [desc['value']
                    for desc in cve['containers']['cna']['descriptions']
                    if desc['lang'].startswith('en')]
            if desc:
                desc = desc[0]
                # for some reason descriptions may contain new lines
                desc = desc.replace('\n', ' ')
                # and some contain leading spaces
                desc = desc.strip()
    return desc


def save_datafile(cves, datafile):
    """Write the list of CVE entries back to data/CVE/list."""
    debug(f'writing {len(cves)} to {datafile}')
    with open(datafile, "w") as f:
        parsers.writecvelist(cves, f)


def print_cve(cve):
    """Print a single CVE entry in data/CVE/list format."""
    cvelist = [cve]
    parsers.writecvelist(cvelist, sys.stdout)


def get_cve5(cve_id):
    """Load and return the CVE 5.0 JSON record for *cve_id*, or None.

    Records are read lazily from the globally opened MITRE zip file.
    """
    global cve5_zip
    if cve_id not in cve5s:
        return None
    fname = cve5s[cve_id]
    logging.info('loading file')
    # close the zip member handle once the JSON is parsed
    with cve5_zip.open(fname) as f:
        logging.info('loading json')
        return json.load(f)


def read_cve5_file(f):
    """Index the MITRE zip *f*, mapping CVE ids to their member names."""
    cve5s = {}
    # use the parameter rather than the global it happens to alias
    z = zipfile.ZipFile(f)
    for fname in z.namelist():
        if os.path.basename(fname).startswith('CVE-'):
            debug("found record " + fname)
            cve_id = os.path.basename(fname)[:-5]  # strip ".json"
            cve5s[cve_id] = fname
    return cve5s


# returns the first Debian bug associated to the CVE, or None
def get_cve_bug(cve):
    for ann in cve.annotations:
        if isinstance(ann, parsers.PackageAnnotation):
            for flag in ann.flags:
                if isinstance(flag, parsers.PackageBugAnnotation):
                    return flag.bug
    return None


def get_cve_name(cve):
    """Return a unique name for *cve*, hashing CVE-XXXX placeholders."""
    if 'XXXX' not in cve.header.name:
        return cve.header.name
    # for XXXX entries, we can't use the name in CVE/list as it will
    # cause collitions, so we use the hashed name instead.
    bug = get_cve_bug(cve) or 0
    desc = cve.header.description
    return temp_bug_name(bug, desc)


def parse_cves():
    """Parse data/CVE/list into an ordered name -> entry mapping."""
    cvelist = parsers.cvelist(datafile)
    # we want to use a dict to easily lookup and replace a cve, but we need
    # to keep order from the list above for when we write the keys back.
    cves = collections.OrderedDict()
    for cve in cvelist:
        name = get_cve_name(cve)
        cves[name] = cve
    return cves


def auto_nfu(name):
    """Return a NOT-FOR-US product string for *name*, or None.

    Matches the CVE description against known non-Debian products
    (WordPress plugins/themes and a list of proprietary vendors).
    """
    debug(f'checking nfu for {name}')
    desc = get_cve5_description(name)
    if not desc:
        return None
    wordpress_re = re.compile(r".*in\s+the\s+(.+)\s+(plugin|theme)\s+(?:[\w\d.]+\s+)?(?:(?:(?:before|through)\s+)?[\w\d.]+\s+)?for\s+[Ww]ord[Pp]ress.*")
    m = wordpress_re.match(desc)
    if m:
        name, type = m.group(1, 2)
        return f"{name} {type} for WordPress"
    nfu_re = re.compile(r".*\b(FS\s+.+?\s+Clone|Meinberg\s+LANTIME|Ecava\s+IntegraXor|Foxit\s+Reader|Cambium\s+Networks\s+.+?\s+firmware|Trend\s+Micro|(?:SAP|IBM|EMC|NetApp|Micro\sFocus).+?(?=tool|is|version|[\d(,])).*")
    m = nfu_re.match(desc)
    if m:
        name = m.group(1)
        name = name.strip()
        return name
    return None


# one-deep memoization of the last apt-cache search
apt_cache_cache = []
apt_cache_cache_term = ""


def apt_cache(term):
    """Run `apt-cache search term`, caching the most recent result."""
    global apt_cache_cache_term
    global apt_cache_cache
    if term == apt_cache_cache_term:
        return apt_cache_cache
    cmd = subprocess.run(['apt-cache', 'search', term],
                         text=True, capture_output=True)
    apt_cache_cache = cmd.stdout.strip().split('\n')
    apt_cache_cache_term = term
    return apt_cache_cache


def read_embedded_copies():
    """Parse data/embedded-code-copies into embed_code / embed_pkg.

    embed_code maps an embedded code name to the packages embedding it;
    embed_pkg maps a package to the codes it embeds.
    """
    emb_file = "data/embedded-code-copies"
    with open(emb_file) as f:
        comment_section = True
        code = None
        pkg = None
        for line in f.readlines():
            if re.match(r'^---BEGIN', line):
                comment_section = False
                continue
            if not comment_section:
                debug(line)
                if m := re.match(r'^([\w][\w+-.]+)', line):
                    # unindented line: a new embedded code section
                    code = m.group(1).lower()
                    debug("code: " + code)
                    pkg = None
                    if code in embed_code:
                        # was a Perl-ism: "... $code" never interpolated
                        syntax_error(f"Duplicate embedded code {code}")
                elif line.strip() == "":
                    code = None
                    pkg = None
                    debug("empty line, resetting")
                elif m := re.match(r'^\s+(?:\[\w+\]\s+)?-\s+(\w[\w.-]+)', line):
                    # indented "- package" line under the current code
                    pkg = m.group(1)
                    debug("pkg: " + pkg)
                    line = line.strip()
                    if code not in embed_code:
                        embed_code[code] = {}
                    if pkg not in embed_code[code]:
                        embed_code[code][pkg] = {}
                    # don't clobber codes recorded for this pkg under
                    # earlier sections
                    if pkg not in embed_pkg:
                        embed_pkg[pkg] = {}
                    embed_code[code][pkg][line] = True
                    embed_pkg[pkg][code] = True
                elif re.match(r'^\s+(?:NOTE|TODO)', line):
                    # note should follow a pkg line, which should have already
                    # been processed
                    pass
                else:
                    syntax_error(f"Cannot parse {line}")


def set_cve_nfu(name, desc):
    """Mark the CVE *name* as NOT-FOR-US with product description *desc*."""
    cve = cves[name]
    # remove todo: check annotation...
    cve.annotations = [ann for ann in cve.annotations
                       if not ann_is_todo_check(ann)]
    # ... and add a NFU annotation
    ann = parsers.StringAnnotation(0, "NOT-FOR-US", desc)
    cve.annotations.append(ann)


def syntax_error(s):
    """Abort with a parse error in embedded-code-copies."""
    print("embedded-code-copies: " + s, file=sys.stderr)
    sys.exit(1)


def search_embed(text):
    """Print embedded-code-copies matches for *text*; return 1 if any."""
    found = 0
    text = text.lower()
    if text in embed_code:
        print(f"{text} is embedded by: "
              + " ".join(sorted(embed_code[text].keys())))
        found = 1
    if text in embed_pkg:
        print(f"{text} embeds: " + " ".join(sorted(embed_pkg[text].keys())))
        found = 1
    return found


def wnpp_to_candidates():
    """Yield 'pkg (bug #n)' completion candidates from the wnpp data."""
    for pkg, line in wnpp.items():
        # there might be more than one bug, so only take the first
        bugline = line.split('|')[0]
        type, bug = bugline.split(" ")
        if re.match(r'^(?:RFP|ITP)$', type):
            yield f"{pkg} (bug #{bug})"


def print_stats():
    """Print a one-line summary of entries, temp issues and todos."""
    temp_cves = [e for e in cves.keys() if 'TEMP' in e]
    print(f"{len(cves)} CVEs", end="")
    print(f", {len(temp_cves)} temp issues", end="")
    if num_todo > 0:
        print(f", {num_todo} todos", end="")
    if num_missing_bug > 0:
        print(f", {num_missing_bug} entries with missing bug reference",
              end="")
    print("")


def print_commands():
    """Print the interactive command help."""
    print(''' * s or blank line to skip to next issue
 * h to repeat this help output of the list of commands
 * f name to do "apt-file search name"
 * c name to do "apt-cache search name"
 * w name to look up name in wnpp
 * m package to search data/embedded-code-copies for "package"
 * r package to launch an editor with a report of the issue against "package"
 * g issue to go to the given issue, even if it's not a todo
 * d to display the issue information again
 * v or e to launch an editor with the current item
 * !command to execute a command
 * - package-entry to add an entry for "package" and launch an editor (e.g. - poppler )
 * n to mark the issue as NOT-FOR-US:
 * q to save and quit
 * CTRL-C to quit without saving
 * everything else is inserted as product name for a NOT-FOR-US''')


parser = argparse.ArgumentParser(description="review new CVE entries")
parser.add_argument('-l', '--list', action='store_true',
                    help='Only list issues')
parser.add_argument('-f', '--full', action='store_true',
                    help='Show full CVE entries')
parser.add_argument('-u', '--unfixed', action='store_true',
                    help='Also process CVEs with unfixed issues and no bugs')
parser.add_argument('-U', '--only-unfixed', action='store_true',
                    help='Only process CVEs with unfixed issues and no bugs')
parser.add_argument('-a', '--auto', action='store_true',
                    help='Automatically process NOT-FOR-US entries')
parser.add_argument('-s', '--skip', action='store_true',
                    help='Skip automatic apt-cache/apt-file searches')
parser.add_argument('-D', '--no-download', action='store_true',
                    help='Skip downloading files')
parser.add_argument('-v', '--verbose', action='store_true',
                    help='Verbose mode')
args = parser.parse_args()

embed_code = {}
embed_pkg = {}
read_embedded_copies()

cve5_file_url = 'https://github.com/CVEProject/cvelistV5/archive/refs/heads/main.zip'
cve5_file = 'mitre.zip'
datafile = "data/CVE/list"
removed_packages_file = "data/packages/removed-packages"
ignore_bug_file = "data/packages/ignored-debian-bug-packages"
wnppurl = "https://qa.debian.org/data/bts/wnpp_rm"
wnppfile = "../wnpp_rm"
issue_re = re.compile(r'CVE-20(?:0[3-9]|[1-9][0-9])|TEMP')
auto_display_limit = 10
#$auto_display_limit = $opts{a} if defined $opts{a}
editor = 'sensible-editor'

if not args.no_download:
    debug("downloading files...")
    r = requests.get(cve5_file_url)
    with open(cve5_file, "wb") as f:
        f.write(r.content)
    r = requests.get(wnppurl)
    with open(wnppfile, "w") as f:
        f.write(r.text)

debug("reading data...")

# used by read_cve5, used as a global so that we don't have to open the
# file repeatedly, since we only read cve5s one by one on demand
cve5_zip = zipfile.ZipFile(cve5_file)

# We have CVE 5.0 JSON information coming from MITRE, we use cve5 for those
# We also have CVE information coming from our data/CVE/list, we use cve there
cves = parse_cves()
cve5s = read_cve5_file(cve5_file)

todos = []
afcache = {}
num_todo = 0
num_missing_bug = 0
wnpp = read_wnpp_file(wnppfile)
# packages that should be ignored by -u/-U
ignore_missing_bugs = read_packages_file(removed_packages_file)
ignore_missing_bugs += read_packages_file(ignore_bug_file)

seen_pkgs = {}
wnpp_candidates = list(wnpp_to_candidates())

for name, cve in cves.items():
    if not args.list:
        # collect package names for tab completion
        for ann in cve.annotations:
            if isinstance(ann, parsers.PackageAnnotation):
                if ann.kind == 'itp':
                    continue
                pkg = ann.package
                seen_pkgs[pkg] = True
    if issue_re.match(name):
        if not args.only_unfixed:
            for ann in cve.annotations:
                if ann_is_todo_check(ann):
                    todos.append(name)
                    num_todo += 1
        if args.unfixed or args.only_unfixed:
            for ann in cve.annotations:
                if not isinstance(ann, parsers.PackageAnnotation):
                    continue
                if ann.release != None and ann.release != "":
                    continue
                if ann.package in ignore_missing_bugs:
                    continue
                if ann.kind != "unfixed":
                    continue
                urgency = get_annotation(ann.flags,
                                         parsers.PackageUrgencyAnnotation)
                if urgency and urgency.severity == "unimportant":
                    continue
                bug = get_annotation(ann.flags, parsers.PackageBugAnnotation)
                if bug:
                    continue
                todos.append(name)
                num_missing_bug += 1

print_stats()

if not args.list and not args.auto:
    print("")
    print("Commands:")
    print_commands()
    print("")

if args.list:
    for todo in sorted(todos, reverse=True):
        desc = get_cve5_description(todo)
        if desc:
            indent = " "
            lines = textwrap.wrap(desc, initial_indent=indent,
                                  subsequent_indent=indent)
            # only show the first two lines of the description
            desc = "\n".join(lines[0:2])
            if args.full:
                print_cve(cves[todo])
                print(f"{desc}")
            else:
                print(f"{todo}:\n{desc}")
        else:
            print_cve(cves[todo])
    sys.exit(0)

if args.auto:
    # auto process
    for todo in todos:
        if nfu_entry := auto_nfu(todo):
            set_cve_nfu(todo, nfu_entry)
    save_datafile(cves.values(), datafile)
    sys.exit(0)


def print_full_entry(name):
    """Print a banner, references, description and entry for *name*."""
    print("======================================================")
    print(f"Name: {name}")
    print_urls(name)
    if desc := get_cve5_description(name):
        desc = "\n".join(textwrap.wrap(desc))
        print(desc)
        print("")
    print_cve(cves[name])


def edit_entry(entry, extra_text=None):
    """Open *entry* in an editor; return the edited entry or None.

    *extra_text* is appended to the temp file before editing. Returns
    None when the user left the file unchanged.
    """
    # close the fd from mkstemp to avoid leaking it
    fd, filename = tempfile.mkstemp()
    os.close(fd)
    save_datafile([entry], filename)
    with open(filename) as f:
        old_data = f.read()
    if extra_text is not None:
        with open(filename, "a") as f:
            f.write(extra_text)
    # edit the temp file we just wrote (was the literal "(unknown)")
    subprocess.run(f"{editor} {filename}", shell=True)
    with open(filename) as f:
        new_data = f.read()
    newcves = parsers.cvelist(filename)
    os.unlink(filename)
    debug(f"edit_entry: old_data\n{old_data}\n")
    debug(f"edit_entry: new_data\n{new_data}\n")
    if old_data == new_data:
        return None
    return newcves[0]


def search_wnpp(s, wantarray=False):
    """Search wnpp for *s*; print matches or return them as a list."""
    s = s.lower()
    matches = [w for w in wnpp.keys() if s in w]
    matches = sorted(matches)
    if wantarray:
        return matches
    for e in matches:
        print(f"{e}: {wnpp[e]}")
    return len(matches) > 0


def auto_search(name):
    """Heuristically search apt/wnpp/embedded copies for the issue.

    Guesses a program and/or file name from the CVE description and
    runs apt-cache / apt-file / wnpp / embedded-code-copies lookups.
    """
    desc = get_cve5_description(name) or ""
    desc = desc.strip()
    #$desc =~ s/[\s\n]+/ /g;
    prog = None
    file = None
    if m := re.match(r'^(\S+(?: [A-Z]\w*)*) \d', desc):
        prog = m.group(1)
    elif m := re.search(r' in (\S+\.\S+) in (?:the )?(\S+) ', desc):
        file = m.group(1)
        prog = m.group(2)
    elif m := re.search(r' in (?:the )?(\S+) ', desc):
        prog = m.group(1)
    if prog:
        debug("prog: " + prog)
        if not args.skip:
            prog_esc = prog
            #$prog_esc =~ tr{a-zA-Z0-9_@/-}{ }cs;
            ac = apt_cache(prog_esc)
            if len(ac) > auto_display_limit or len(ac) == 0:
                print(f"{len(ac)} results from apt-cache search {prog_esc}")
            else:
                print(f"=== apt-cache search {prog_esc}:")
                for result in ac:
                    print(result)
                print("===")
        else:
            print(f"You probably want to .c{prog}")
        for p in prog.split():
            search_embed(p)
            wr = search_wnpp(p, wantarray=True)
            if len(wr) > auto_display_limit:
                print(f"{len(wr)} results from searching '{p}' in WNPP")
            else:
                for we in wr:
                    print(f"{we}: {wnpp[we]}")
    if file and re.match(r'^(?:index|default|login|search|admin)\.(?:php3?|asp|cgi|pl)$', file):
        # generic file names are not worth an apt-file search
        return
    if file and re.search(r'(php3?|asp|cgi|pl)$', file):
        if not args.skip:
            if file not in afcache:
                file_esc = file
                #file_esc = quotemeta(file)
                print(f"doing apt-file search {file_esc}")
                cmd = subprocess.run(['apt-file', '-i', 'search', file_esc],
                                     text=True, capture_output=True)
                afcache[file] = cmd.stdout.split('\n')
                #if (scalar @{$afcache{$file}} > $auto_display_limit) {
                #    # replace with empty array to save mem
                #    my $num = scalar @{$afcache{$file}}
                #    afcache[file] = []
            if len(afcache[file]) > auto_display_limit or \
               len(afcache[file]) == 0:
                print(f"{len(afcache[file])} results from apt-file -i search {file}")
            else:
                print(f"=== apt-file -i search {file}:")
                for result in afcache[file]:
                    # print each result (was printing the whole list)
                    print(result)
                print("===")
        else:
            print(f"You probably want to .f{file}")


def present_issue(name):
    """Interactively triage the issue *name*; return True to quit."""
    quit = False
    print_full_entry(name)
    if nfu_entry := auto_nfu(name):
        set_cve_nfu(name, nfu_entry)
        print("New entry automatically set to NFU:")
        entry = cves[name]
        print_cve(entry)
        return False
    auto_search(name)
    while True:
        line = input("> ")
        line = line.strip()
        if m := re.match(r"^\s*$", line):
            # skip command
            break
        elif m := re.match(r"^s$", line):
            # skip command
            break
        elif m := re.match(r"^c\s+(.*)$", line):
            s = m.group(1).strip()
            #$s =~ tr{a-zA-Z0-9_@-}{ }cs;
            print(f"=== apt-cache search {s}")
            subprocess.run(f"apt-cache search {s} | less -FX", shell=True)
            print("===")
            continue
        elif m := re.match(r"^f\s+(.*)$", line):
            s = m.group(1).strip()
            #s = quotemeta(s)
            print(f"=== apt-file search {s}")
            subprocess.run(f"apt-file search {s} | less -FX", shell=True)
            print("===")
            continue
        elif m := re.match(r"^w\s+(.*)$", line):
            s = m.group(1).strip()
            print(f"=== wnpp lookup for '{s}':")
            search_wnpp(s)
            print("===")
            continue
        elif m := re.match(r'^m\s+(.*)$', line):
            s = m.group(1).strip()
            print(f"references to {s} in embedded-code-copies:")
            search_embed(s) or print("none")
            continue
        elif m := re.match(r"^g\s+(.+)$", line):
            n = m.group(1).strip()
            if n not in cves:
                print(f"unknown issue '{n}'")
                continue
            if present_issue(n):
                quit = True
                break
            print(f"back at {name} (you might want to type 'd')")
            continue
        elif re.match("^h$", line):
            print_commands()
            continue
        elif m := re.match(r"^!(.+)$", line):
            cmd = m.group(1)
            r = subprocess.run(cmd, shell=True)
            print(f"exit status: {r.returncode}")
            continue
        elif re.match(r"^q\s?$", line):
            quit = True
            break
        elif re.match(r"^[ve]\s?$", line):
            entry = cves[name]
            new_entry = edit_entry(entry)
            if not new_entry:
                print("Not changed.")
                continue
            else:
                cves[name] = new_entry
                print("New entry set to:")
                print_cve(new_entry)
                break
        elif re.match(r"^d\s?$", line):
            print_full_entry(name)
            continue
        elif m := re.match(r"^(\-\s+.+)$", line):
            components = m.group(1).split()
            if len(components) <= 2:
                components.append('')
            extra_text = "\t" + " ".join(components)
            old_entry = cves[name]
            if components[2] == '':
                # the entry is being worked on, drop the TODO: check
                old_entry.annotations = [ann for ann in old_entry.annotations
                                         if not ann_is_todo_check(ann)]
            new_entry = edit_entry(old_entry, extra_text=extra_text)
            # guard against an unchanged edit, which returns None
            if not new_entry:
                print("Not changed.")
                continue
            cves[name] = new_entry
            print("New entry set to:")
            print_cve(new_entry)
            break
        elif m := re.match(r'^r\s+(.*)$', line):
            pkg = m.group(1).strip()
            fd, tmpname = tempfile.mkstemp()
            os.close(fd)
            subprocess.run(f"bin/report-vuln {pkg} {name} > {tmpname}",
                           shell=True)
            subprocess.run(f"{editor} {tmpname}", shell=True)
            #os.unlink(tmpname)
            continue
        elif m := re.match(r'^n\s+(.*)$', line):
            nfu = m.group(1).strip()
            set_cve_nfu(name, nfu)
            print("New entry set to:")
            print_cve(cves[name])
            break
        else:
            # anything else is a NOT-FOR-US product name
            set_cve_nfu(name, line)
            print("New entry set to:")
            print_cve(cves[name])
            break
    return quit


completion_commands = "f c w m r g ! v e - h n s q d".split()


def complete_line(text, state):
    """readline completer for commands, package names and wnpp entries."""
    response = None
    origline = readline.get_line_buffer()
    begin = readline.get_begidx()
    end = readline.get_endidx()
    being_completed = origline[begin:end]
    words = origline.split()
    logging.debug('origline=%s', repr(origline))
    logging.debug('begin=%s', begin)
    logging.debug('end=%s', end)
    logging.debug('being_completed=%s', being_completed)
    logging.debug('words=%s', words)
    if not words:
        current_candidates = completion_commands
    else:
        try:
            # default to no candidates so unknown first words don't
            # raise NameError below
            candidates = []
            if begin == 0:
                # first word
                candidates = completion_commands
            else:
                # later word
                first = words[0]
                if first == '-':
                    # autocomplete - pkg entries
                    if len(words) == 1 or (len(words) == 2 and being_completed):
                        candidates = list(seen_pkgs.keys())
                        candidates += wnpp_candidates
                    elif (len(words) == 2 and not being_completed) \
                            or (len(words) == 3 and being_completed):
                        # NOTE(review): this yields an empty list; the
                        # original word list appears to have been lost
                        # (likely version specifiers) — confirm upstream
                        candidates = ' '.split()
                    else:
                        candidates = ['(unimportant)', '(low)',
                                      '(medium)', '(high)']
                elif first == 'r':
                    if (len(words) == 1 and not being_completed) or \
                       (len(words) == 2 and being_completed):
                        candidates = list(seen_pkgs.keys())
            if being_completed:
                # match options with portion of input
                # being completed
                current_candidates = [w for w in candidates
                                      if w.startswith(being_completed)]
            else:
                # matching empty string so use all candidates
                current_candidates = candidates
            logging.debug('candidates=%s', current_candidates)
        except (KeyError, IndexError) as err:
            logging.error('completion error: %s', err)
            current_candidates = []
    try:
        response = current_candidates[state]
        if len(current_candidates) == 1:
            response += " "
    except IndexError:
        response = None
    logging.debug('complete(%s, %s) => %s', repr(text), state, response)
    return response


readline.set_completer(complete_line)
# we don't want '<' to be considered a delim as we use it as a word for
# e.g. version specifiers (original example lost in transit — confirm)
readline.set_completer_delims(' ')
readline.parse_and_bind('tab: complete')

for todo in sorted(todos, reverse=True):
    if present_issue(todo):
        break

save_datafile(cves.values(), datafile)