# security_db.py -- simple, CVE-driven Debian security bugs database # Copyright (C) 2005 Florian Weimer # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA """This module implements a small database for tracking security bugs. Note that the database is always secondary to the text files. The database is only an implementation tool, and not used for maintaining the data. The data is kept in a SQLite 3 database. FIXME: Document the database schema once it is finished. """ import apsw import base64 import bugs import cPickle import cStringIO import glob import os import os.path import re import sys import types import zlib import debian_support import dist_config class InsertError(Exception): """Class for capturing insert errors. The 'errors' member collects all error messages. """ def __init__(self, errors): assert len(errors) > 0, errors assert type(errors) == types.ListType, errors assert type(errors[0])== types.StringType, errors self.errors = errors def __str__(self): return self.errors[0] + ' [more...]' def mergeLists(a, b): """Merges two lists.""" if type(a) == types.StringType: if a == "": a = [] else: a = a.split(',') if type(b) == types.StringType: if b == "": b = [] else: b = b.split(',') result = {} for x in a: result[x] = 1 for x in b: result[x] = 1 result = result.keys() result.sort() return result class NVDEntry: """A class for an entry in the nvd_data table. Objects have the same fileds as the table.""" def __init__(self, row, description): for x in range(len(row)): setattr(self, description[x][0], row[x]) def rangeString(self): result = [] if self.range_local: result.append("local") if self.range_remote: result.append("remote") if self.range_user_init: result.append("user-initiated") return ", ".join(result) class SchemaMismatch(Exception): """Raised to indicate a schema mismatch. The caller is expected to remove and regenerate the database.""" class DB: """Access to the security database. This is a wrapper around an SQLite database object (which is accessible as the "db" member. Most operations need a special cursor object, which can be created with a cursor object. The name "cursor" is somewhat of a misnomer because these objects are quite versatile. """ def __init__(self, name, verbose=False): self.name = name self.db = apsw.Connection(name) self.verbose = verbose self.schema_version = 21 self._initFunctions() c = self.cursor() for (v,) in c.execute("PRAGMA user_version"): if v == 0: self.initSchema() elif v == 20: self._initSchema20() elif v <> self.schema_version: if self.verbose: print "DB: schema version mismatch: expected %d, got %d" \ % (self.schema_version, v) raise SchemaMismatch, `v` # Database has been created at this point. Small race # condition here (the already opened database might refer # to an older file). self.__stat = os.stat(self.name) return assert False def __del__(self): self.db.close() def refresh(self): """Checks if the database file is still the same and reopens it if necessary.""" current = os.stat(self.name) if os.path.samestat(self.__stat, current): return self.__stat = current self.db = apsw.Connection(self.name) self._initFunctions() def cursor(self): """Creates a new database cursor. Also see the writeTxn method.""" return self.db.cursor() def writeTxn(self): """Creates a cursor for an exclusive transaction. No other process may modify the database at the same time. After finishing the work, you should invoke the commit or rollback methods below. """ c = self.cursor() c.execute("BEGIN TRANSACTION EXCLUSIVE") return c def commit(self, cursor): """Makes the changes in the transaction permanent.""" cursor.execute("COMMIT") def rollback(self, cursor): """Undos the changes in the transaction.""" cursor.execute("ROLLBACK") def initSchema(self): """Creates the database schema.""" cursor = self.cursor() # This gives us better performance (it's usually the file # system block size). This must come first to be ffective. cursor.execute("PRAGMA page_size = 4096") # Set the schema version to an invalid value which is # different from zero. We can use this to detect a partially # created schema. cursor.execute("PRAGMA user_version = 1") cursor.execute("""CREATE TABLE inodeprints (file TEXT NOT NULL PRIMARY KEY, inodeprint TEXT NOT NULL, parsed BLOB)""") cursor.execute("""CREATE TABLE version_linear_order (id INTEGER NOT NULL PRIMARY KEY, version TEXT NOT NULL UNIQUE)""") cursor.execute( """CREATE TABLE source_packages (name TEXT NOT NULL, release TEXT NOT NULL, subrelease TEXT NOT NULL, archive TEXT NOT NULL, version TEXT NOT NULL, version_id INTEGER NOT NULL DEFAULT 0, PRIMARY KEY (name, release, subrelease, archive))""") cursor.execute( """CREATE TABLE binary_packages (name TEXT NOT NULL, release TEXT NOT NULL, subrelease TEXT NOT NULL, archive TEXT NOT NULL, version TEXT NOT NULL, source TEXT NOT NULL, source_version TEXT NOT NULL, archs TEXT NOT NULL, version_id INTEGER NOT NULL DEFAULT 0, source_version_id INTEGER NOT NULL DEFAULT 0, PRIMARY KEY (name, release, subrelease, archive, version, source, source_version))""") cursor.execute( """CREATE INDEX binary_packages_source ON binary_packages(source)""") cursor.execute("""CREATE TABLE package_notes (id INTEGER NOT NULL PRIMARY KEY, bug_name TEXT NOT NULL, package TEXT NOT NULL, fixed_version TEXT CHECK (fixed_version IS NULL OR fixed_version <> ''), fixed_version_id INTEGER NOT NULL DEFAULT 0, release TEXT NOT NULL, package_kind TEXT NOT NULL DEFAULT 'unknown', urgency TEXT NOT NULL, bug_origin TEXT NOT NULL DEFAULT '')""") cursor.execute( """CREATE UNIQUE INDEX package_notes_bug ON package_notes(bug_name, package, release)""") cursor.execute( """CREATE INDEX package_notes_package ON package_notes(package)""") cursor.execute("""CREATE TABLE debian_bugs (bug INTEGER NOT NULL, note INTEGER NOT NULL, PRIMARY KEY (bug, note))""") cursor.execute("""CREATE TABLE bugs (name TEXT NOT NULL PRIMARY KEY, cve_status TEXT NOT NULL CHECK (cve_status IN ('', 'CANDIDATE', 'ASSIGNED', 'RESERVED', 'REJECTED')), not_for_us INTEGER NOT NULL CHECK (not_for_us IN (0, 1)), description TEXT NOT NULL, release_date TEXT NOT NULL, source_file TEXT NOT NULL, source_line INTEGER NOT NULL)""") cursor.execute("""CREATE TABLE bugs_notes (bug_name TEXT NOT NULL CHECK (typ <> ''), typ TEXT NOT NULL CHECK (typ IN ('TODO', 'NOTE')), release TEXT NOT NULL DEFAULT '', comment TEXT NOT NULL CHECK (comment <> ''))""") cursor.execute("""CREATE TABLE bugs_xref (source TEXT NOT NULL, target TEXT NOT NULL, PRIMARY KEY (source, target))""") cursor.execute("CREATE INDEX bugs_xref_target ON bugs_xref(target)") cursor.execute("""CREATE TABLE bug_status (bug_name TEXT NOT NULL, release TEXT NOT NULL, status TEXT NOT NULL CHECK (status IN ('vulnerable', 'fixed', 'unknown', 'partially-fixed', 'todo')), reason TEXT NOT NULL, PRIMARY KEY (bug_name, release))""") cursor.execute("""CREATE TABLE source_package_status (bug_name TEXT NOT NULL, package INTEGER NOT NULL, vulnerable INTEGER NOT NULL, urgency TEXT NOT NULL, PRIMARY KEY (bug_name, package))""") cursor.execute( """CREATE INDEX source_package_status_package ON source_package_status(package)""") cursor.execute("""CREATE TABLE binary_package_status (bug_name TEXT NOT NULL, package INTEGER NOT NULL, vulnerable INTEGER NOT NULL, urgency TEXT NOT NULL, PRIMARY KEY (bug_name, package))""") cursor.execute( """CREATE INDEX binary_package_status_package ON binary_package_status(package)""") cursor.execute( "CREATE TABLE removed_packages (name TEXT NOT NULL PRIMARY KEY)") cursor.execute( """CREATE TABLE nvd_data (cve_name TEXT NOT NULL PRIMARY KEY, cve_desc TEXT NOT NULL, discovered TEXT NOT NULL, published TEXT NOT NULL, severity TEXT NOT NULL, range_local INTEGER, range_remote INTEGER, range_user_init INTEGER, loss_avail INTEGER NOT NULL, loss_conf INTEGER NOT NULL, loss_int INTEGER NOT NULL, loss_sec_prot_user INTEGER NOT NULL, loss_sec_prot_admin INTEGER NOT NULL, loss_sec_prot_other INTEGER NOT NULL)""") cursor.execute( """CREATE TABLE debsecan_data (name TEXT NOT NULL PRIMARY KEY, data TEXT NOT NULL)""") self._initNoDSA(cursor) self._initViews(cursor) cursor.execute("PRAGMA user_version = %d" % self.schema_version) def _initSchema20(self): cursor = self.db.cursor() cursor.execute("PRAGMA user_version = 1") self._initNoDSA(cursor) self._initViews(cursor) cursor.execute("DELETE FROM inodeprints WHERE file ='data/CVE/list'") cursor.execute("PRAGMA user_version = %d" % self.schema_version) def _initNoDSA(self, cursor): cursor.execute( """CREATE TABLE package_notes_nodsa (bug_name TEXT NOT NULL, package TEXT NOT NULL, release TEXT NOT NULL, reason TEXT NOT NULL, comment TEXT NOT NULL, PRIMARY KEY (bug_name, package, release)) """) def _initViews(self, cursor): for view in ('testing_status', 'stable_status', 'oldstable_status'): try: cursor.execute('DROP VIEW ' + view) except apsw.SQLError: pass cursor.execute( """CREATE VIEW testing_status AS SELECT DISTINCT sp.name AS package, st.bug_name AS bug, sp.archive AS section, st.urgency AS urgency, (SELECT vulnerable FROM source_packages AS sidp, source_package_status AS sidst WHERE sidp.name = sp.name AND sidp.release = 'sid' AND sidp.subrelease = '' AND sidp.archive = sp.archive AND sidst.bug_name = st.bug_name AND sidst.package = sidp.rowid) AS unstable_vulnerable, COALESCE((SELECT NOT vulnerable FROM source_packages AS tsecp, source_package_status AS tsecst WHERE tsecp.name = sp.name AND tsecp.release = 'squeeze' AND tsecp.subrelease = 'security' AND tsecp.archive = sp.archive AND tsecst.bug_name = st.bug_name AND tsecst.package = tsecp.rowid), 0) AS testing_security_fixed, (SELECT range_remote FROM nvd_data WHERE cve_name = st.bug_name) AS remote, (EXISTS (SELECT * FROM package_notes_nodsa AS pnd WHERE pnd.bug_name = st.bug_name AND pnd.package = sp.name AND pnd.release = 'squeeze')) AS no_dsa FROM source_package_status AS st, source_packages AS sp WHERE st.vulnerable AND st.urgency <> 'unimportant' AND sp.rowid = st.package AND sp.release = 'squeeze' AND sp.subrelease = '' ORDER BY sp.name, st.urgency, st.bug_name""") for (name, nickname) in (('stable', 'lenny'), ('oldstable', 'etch'),): cursor.execute( """CREATE VIEW %s_status AS SELECT DISTINCT sp.name AS package, st.bug_name AS bug, sp.archive AS section, st.urgency AS urgency, (SELECT range_remote FROM nvd_data WHERE cve_name = st.bug_name) AS remote, (EXISTS (SELECT * FROM package_notes_nodsa AS pnd WHERE pnd.bug_name = st.bug_name AND pnd.package = sp.name AND pnd.release = '%s')) AS no_dsa FROM source_package_status AS st, source_packages AS sp WHERE st.vulnerable AND st.urgency <> 'unimportant' AND sp.rowid = st.package AND sp.release = '%s' AND sp.subrelease = '' AND NOT COALESCE((SELECT NOT vulnerable FROM source_packages AS secp, source_package_status AS secst WHERE secp.name = sp.name AND secp.release = '%s' AND secp.subrelease = 'security' AND secp.archive = sp.archive AND secst.bug_name = st.bug_name AND secst.package = secp.rowid), 0) ORDER BY sp.name, urgency_to_number(urgency), st.bug_name""" % (name, nickname, nickname, nickname)) def _initFunctions(self): """Registers user-defined SQLite functions.""" def string_list_add(lst, *args): for arg in args: lst.append(arg) def string_list_to_string(lst): return ', '.join(lst) def string_list_factory(): return ([], string_list_add, string_list_to_string) self.db.createaggregatefunction("string_list", string_list_factory) def string_set_add(lst, *args): for arg in args: for arch in arg.split(','): lst[arch] = True def string_set_to_archs(lst): l = lst.keys() l.sort() return ','.join(l) def string_set_factory(): return ({}, string_set_add, string_set_to_archs) self.db.createaggregatefunction("string_set", string_set_factory) urgencies = ['high', 'medium', 'low', 'unimportant'] def urgency_to_number(u): try: return urgencies.index(u) except ValueError: return 999 self.db.createscalarfunction("urgency_to_number", urgency_to_number, 1) releases = ['potato', 'woody', 'sarge', 'etch', 'lenny', 'squeeze', 'sid'] def release_to_number(u): try: return releases.index(u) except ValueError: return -1 self.db.createscalarfunction("release_to_number", release_to_number, 1) def release_name(release, subrelease, archive): if archive <> 'main': release = release + '/' + archive if subrelease: return "%s (%s)" % (release, subrelease) else: return release self.db.createscalarfunction("release_name", release_name, 3) self.db.createcollation("version", debian_support.version_compare) def filePrint(self, filename): """Returns a fingerprint string for filename.""" st = os.stat(filename) # The "1" is a version number which can be used to trigger a # re-read if the code has changed in an incompatible way. return `(st.st_size, st.st_ino, st.st_mtime, 1)` def _parseFile(self, cursor, filename): current_print = self.filePrint(filename) def do_parse(packages): if self.verbose: print " reading " + `filename` re_source = re.compile\ (r'^([a-zA-Z0-9.+-]+)(?:\s+\(([a-zA-Z0-9.+:~-]+)\))?$') data = [] for pkg in packages: pkg_name = None pkg_version = None pkg_arch = None pkg_source = None pkg_source_version = None for (name, contents) in pkg: if name == "Package": pkg_name = contents elif name == "Version": pkg_version = contents elif name == "Source": match = re_source.match(contents) if match is None: raise SyntaxError(('package %s references ' + 'invalid source package %s') % (pkg_name, `contents`)) (pkg_source, pkg_source_version) = match.groups() elif name == "Architecture": pkg_arch = contents if pkg_name is None: raise SyntaxError\ ("package record does not contain package name") if pkg_version is None: raise SyntaxError\ ("package record for %s does not contain version" % pkg_name) if pkg_arch is None: raise SyntaxError\ ("package record for %s lacks Architecture: field" % pkg_name) data.append((pkg_name, pkg_version, pkg_arch, pkg_source, pkg_source_version)) return data def toString(data): result = cStringIO.StringIO() cPickle.dump(data, result) return buffer(result.getvalue()) for (old_print, contents) in cursor.execute( "SELECT inodeprint, parsed FROM inodeprints WHERE file = ?", (filename,)): if old_print == current_print: return (True, cPickle.load(cStringIO.StringIO(contents))) result = do_parse(debian_support.PackageFile(filename)) cursor.execute("""UPDATE inodeprints SET inodeprint = ?, parsed = ? WHERE file = ?""", (current_print, toString(result), filename)) return (False, result) # No inodeprints entry, load file and add one. result = do_parse(debian_support.PackageFile(filename)) cursor.execute("""INSERT INTO inodeprints (file, inodeprint, parsed) VALUES (?, ?, ?)""", (filename, current_print, toString(result))) return (False, result) def readPackages(self, cursor, directory): """Reads a directory of package files.""" if self.verbose: print "readPackages:" self._readSourcePackages(cursor, directory) self._readBinaryPackages(cursor, directory) if self.verbose: print " finished" def _readSourcePackages(self, cursor, directory): """Reads from directory with source package files.""" re_sources = re.compile(r'.*/([a-z-]+)_([a-z-]*)_([a-z-]+)_Sources$') if self.verbose: print " reading source packages" for filename in glob.glob(directory + '/*_Sources'): match = re_sources.match(filename) if match is None: raise ValueError, "invalid file name: " + `filename` (release, subrelease, archive) = match.groups() (unchanged, parsed) = self._parseFile(cursor, filename) if unchanged: continue cursor.execute( """DELETE FROM source_packages WHERE release = ? AND subrelease = ? AND archive = ?""", (release, subrelease, archive)) self._clearVersions(cursor) def gen(): for (name, version, archs, source, source_version) in parsed: assert source is None assert source_version is None yield name, release, subrelease, archive, version cursor.executemany( """INSERT INTO source_packages (name, release, subrelease, archive, version) VALUES (?, ?, ?, ?, ?)""", gen()) def _readBinaryPackages(self, cursor, directory): """Reads from a directory with binary package files.""" re_packages \ = re.compile( r'.*/([a-z-]+)_([a-z-]*)_([a-z-]+)_([a-z0-9]+)_Packages$') if self.verbose: print " reading binary packages" # First check for any changes. filenames = glob.glob(directory + '/*_Packages') filenames.sort() changed = False for filename in filenames: changed = True for (old_print,) in cursor.execute( "SELECT inodeprint FROM inodeprints WHERE file = ?", (filename,)): if self.filePrint(filename) == old_print: changed = False if changed: break if not changed: if self.verbose: print " finished (no changes)" return # Real import. We have to re-read all Packages files even if # only some of them have changed because the database only # stores aggregated data, and there is no efficient way to # handle updates of the records related to a single file. packages = {} unchanged = True for filename in filenames: match = re_packages.match(filename) if match is None: raise ValueError, "invalid file name: " + `filename` (release, subrelease, archive, architecture) = match.groups() (unch, parsed) = self._parseFile(cursor, filename) unchanged = unchanged and unch for (name, version, arch, source, source_version) in parsed: if source is None: source = name if source_version is None: source_version = version if arch <> 'all' and arch <> architecture: raise ValueError, ("invalid architecture %s for package %s" % (arch, name)) key = (name, release, subrelease, archive, version, source, source_version) if packages.has_key(key): packages[key][arch] = 1 else: packages[key] = {arch : 1} if unchanged: if self.verbose: print " finished (no changes)" return if self.verbose: print " deleting old data" cursor.execute("DELETE FROM binary_packages") self._clearVersions(cursor) l = packages.keys() if len(l) == 0: raise ValueError, "no binary packages found" l.sort() def gen(): for key in l: archs = packages[key].keys() archs.sort() archs = ','.join(archs) yield key + (archs,) if self.verbose: print " storing binary package data" cursor.executemany( """INSERT INTO binary_packages (name, release, subrelease, archive, version, source, source_version, archs) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", gen()) def readBugs(self, cursor, path): if self.verbose: print "readBugs:" def clear_db(cleared=[False]): # Avoid clearing the database multiple times. if cleared[0]: return else: cleared[0] = True cursor.execute("DELETE FROM debian_bugs") cursor.execute("DELETE FROM bugs") cursor.execute("DELETE FROM package_notes") cursor.execute("DELETE FROM bugs_notes") cursor.execute("DELETE FROM bugs_xref") cursor.execute("DELETE FROM package_notes_nodsa") cursor.execute("DELETE FROM removed_packages") # The *_status tables are regenerated anyway, no need to # delete them here. self._clearVersions(cursor) def do_parse(source, cleared=[False]): errors = [] clear_db() if self.verbose: print " reading " + `source.name` for bug in source: try: bug.writeDB(cursor) except ValueError, e: errors.append("%s: %d: error: %s" % (bug.source_file, bug.source_line, e)) if errors: raise InsertError(errors) cursor.executemany( "INSERT OR IGNORE INTO removed_packages (name) VALUES (?)", map(lambda x: (x,), source.removed_packages.keys())) def has_changed(filename): current_print = self.filePrint(filename) for (old_print,) in cursor.execute( "SELECT inodeprint FROM inodeprints WHERE file = ?", (filename,)): if old_print == current_print: return False else: return True return True source_removed_packages = '/packages/removed-packages' sources = ((bugs.CVEFile, '/CVE/list'), (bugs.DSAFile, '/DSA/list'), (bugs.DTSAFile, '/DTSA/list'), (None, source_removed_packages)) unchanged = True for (_, name) in sources: if has_changed(path + name): unchanged = False break if unchanged: if self.verbose: print " finished (no changes)" return clear_db() def read_one(source): filename = source.name current_print = self.filePrint(filename) do_parse(source) cursor.execute( """INSERT OR REPLACE INTO inodeprints (inodeprint, file) VALUES (?, ?)""", (current_print, filename)) for (cls, name) in sources: if cls is None: continue read_one(cls(path + name)) if self.verbose: print " update removed packages" self.readRemovedPackages(cursor, path + source_removed_packages) errors = [] if self.verbose: print " check cross-references" for (bug,) in cursor.execute( """SELECT DISTINCT target FROM bugs_xref EXCEPT SELECT name FROM bugs"""): if bug[0:3] == "VU#": continue errors.append("reference to unknwown bug " + bug) if self.verbose: print " copy notes" # Copy notes from DSA/DTSA to CVE. old_source = '' for source, target in list(cursor.execute( """SELECT source, target FROM bugs_xref WHERE (source LIKE 'DTSA-%' OR source LIKE 'DSA-%') AND target LIKE 'CVE-%'""")): if source <> old_source: source_bug = bugs.BugFromDB(cursor, source) old_source = source for n in source_bug.notes: # We do not copy recursively. assert not n.bug_origin if n.release: rel = str(n.release) else: rel = '' present = False for (version, note_id) in list(cursor.execute( """SELECT fixed_version, id FROM package_notes WHERE bug_name = ? AND package = ? AND release = ?""", (target, n.package, rel))): if version is None: # The target is marked as unfixed. Our # version cannot win. present = True continue if (n.fixed_version is None or n.fixed_version > debian_support.Version(version)): # If our version is larger, it is the definitive one. # Remove the existing entry in this case. cursor.execute( "DELETE FROM debian_bugs WHERE note = ?", (note_id,)) cursor.execute( """DELETE FROM package_notes WHERE bug_name = ? AND package = ? AND release = ?""", (target, n.package, rel)) else: present = True if not present: n.writeDB(cursor, target, bug_origin=source) if errors: raise InsertError(errors) if self.verbose: print " finished" def availableReleases(self, cursor=None): """Returns a list of tuples (RELEASE, ARCHIVE, SOURCES-PRESENT, ARCHITECTURE-LIST).""" if cursor is None: cursor = self.cursor() releases = {} for r in cursor.execute( """SELECT DISTINCT release, subrelease, archive FROM source_packages"""): releases[r] = (True, []) for (rel, subrel, archive, archs) in cursor.execute( """SELECT DISTINCT release, subrelease, archive, archs FROM binary_packages"""): key = (rel, subrel, archive) if not releases.has_key(key): releases[key] = (False, []) releases[key][1][:] = mergeLists(releases[key][1], archs) result = [] for ((rel, subrel, archive), (sources, archs)) in releases.items(): result.append((rel, subrel, archive, sources, archs)) result.sort() return result def getFunnyPackageVersions(self): """Returns a list of (PACKAGE, RELEASE, ARCHIVE, VERSION, SOURCE-VERSION) tuples such that PACKAGE is both a source and binary package, but the associated version numbers are different.""" return list(self.db.cursor().execute( """SELECT DISTINCT name, release, archive, version, source_version FROM binary_packages WHERE name = source AND version <> source_version ORDER BY name, release, archive""")) def _clearVersions(self, cursor): cursor.execute("DELETE FROM version_linear_order") def _updateVersions(self, cursor): """Updates the linear version table.""" if self.verbose: print "updateVersions:" for x in cursor.execute("SELECT * FROM version_linear_order LIMIT 1"): if self.verbose: print " finished (no changes)" return if self.verbose: print " reading" versions = [] for (v,) in cursor.execute( """SELECT DISTINCT * FROM (SELECT fixed_version FROM package_notes WHERE fixed_version IS NOT NULL UNION ALL SELECT version FROM source_packages UNION ALL SELECT version FROM binary_packages UNION ALL SELECT source_version FROM binary_packages)"""): versions.append(debian_support.Version(v)) if self.verbose: print " calculating linear order" versions.sort() if self.verbose: print " storing linear order" for v in versions: cursor.execute( "INSERT INTO version_linear_order (version) VALUES (?)", (str(v),)) if self.verbose: print " updating package notes" cursor.execute( """UPDATE package_notes SET fixed_version_id = (SELECT id FROM version_linear_order WHERE version = package_notes.fixed_version) WHERE fixed_version IS NOT NULL""") if self.verbose: print " updating source packages" cursor.execute( """UPDATE source_packages SET version_id = (SELECT id FROM version_linear_order WHERE version = source_packages.version)""") if self.verbose: print " updating binary packages" cursor.execute( """UPDATE binary_packages SET version_id = (SELECT id FROM version_linear_order WHERE version = binary_packages.version), source_version_id = (SELECT id FROM version_linear_order WHERE version = binary_packages.source_version)""") if self.verbose: print " finished" def calculateVulnerabilities(self, cursor): """Calculate vulnerable packages. To each package note, a release-specific vulnerability status is attached. Currently, only squeeze/testing is processed. Returns a list strings describing inconsistencies. """ result = [] self._updateVersions(cursor) if self.verbose: print "calculateVulnerabilities:" print " checking version consistency in package notes" # The following does not work because stable->security -> # testing -> unstable propagation is no longer available. if False: # Ignore squeeze/testing because stable issues may be # fast-tracked into testing, bypassing unstable. for (bug_name, pkg_name, rel, unstable_ver, rel_ver) \ in list(cursor.execute( """SELECT a.bug_name, a.package, b.release, a.fixed_version, b.fixed_version FROM package_notes a, package_notes b WHERE a.bug_name = b.bug_name AND a.package = b.package AND a.release = '' AND b.release NOT IN ('', 'squeeze') AND a.fixed_version IS NOT NULL AND a.fixed_version_id < b.fixed_version_id""")): b = bugs.BugFromDB(cursor, bug_name) result.append("%s:%d: inconsistent versions for package %s" % (b.source_file, b.source_line, pkg_name)) result.append("%s:%d: unstable: %s" % (b.source_file, b.source_line, unstable_ver)) result.append("%s:%d: release %s: %s" % (b.source_file, b.source_line, `rel`, rel_ver)) if self.verbose: print " checking source packages" cursor.execute( """UPDATE package_notes SET package_kind = 'unknown' WHERE package_kind IN ('source', 'binary')""") cursor.execute( """UPDATE package_notes SET package_kind = 'source' WHERE package_kind = 'unknown' AND EXISTS (SELECT * FROM source_packages AS p WHERE p.name = package_notes.package)""") cursor.execute( """UPDATE package_notes SET package_kind = 'source' WHERE package_kind = 'unknown' AND EXISTS (SELECT * FROM removed_packages AS p WHERE p.name = package_notes.package)""") for (bug_name, package) in list(cursor.execute( """SELECT n.bug_name, n.package FROM package_notes AS n WHERE n.package_kind = 'itp' AND ((EXISTS (SELECT * FROM source_packages WHERE name = n.package)) OR (EXISTS (SELECT * FROM binary_packages WHERE name = n.package)))""")): b = bugs.BugFromDB(cursor, bug_name) result.append("%s:%d: ITPed package %s is in the archive" % (b.source_file, b.source_line, package)) if result: return result if self.verbose: print " remove old status" cursor.execute("DELETE FROM source_package_status") cursor.execute("DELETE FROM binary_package_status") cursor.execute("DELETE FROM bug_status") if self.verbose: print " calculate package status" print " source packages (unqualified)" cursor.execute( """INSERT INTO source_package_status SELECT n.bug_name, p.rowid, n.fixed_version IS NULL OR p.version_id < n.fixed_version_id, n.urgency FROM package_notes AS n, source_packages AS p WHERE n.release = '' AND p.name = n.package""") # Release annotations always override previous results, # therefore we use INSERT OR REPLACE. if self.verbose: print " source packages (qualified)" cursor.execute( """INSERT OR REPLACE INTO source_package_status SELECT n.bug_name, p.rowid, n.fixed_version IS NULL OR p.version_id < n.fixed_version_id, n.urgency FROM package_notes AS n, source_packages AS p WHERE p.name = n.package AND p.release = n.release""") if self.verbose: print " binary packages (from source packages)" cursor.execute( """INSERT INTO binary_package_status SELECT n.bug_name, p.rowid, n.fixed_version IS NULL OR p.source_version_id < n.fixed_version_id, n.urgency FROM package_notes AS n, binary_packages AS p WHERE n.release = '' AND p.source = n.package""") cursor.execute( """INSERT OR REPLACE INTO binary_package_status SELECT n.bug_name, p.rowid, n.fixed_version IS NULL OR p.source_version_id < n.fixed_version_id, n.urgency FROM package_notes AS n, binary_packages AS p WHERE p.source = n.package AND p.release = n.release""") # Almost the same for binary packages. We prefer interpreting # package names as source packages, so we only process the # notes which refer to binary packages. (Of course, we do not # have to add status information for binary package # separately.) if self.verbose: print " binary packages (unqualified)" cursor.execute( """INSERT INTO binary_package_status SELECT n.bug_name, p.rowid, n.fixed_version IS NULL OR p.version_id < n.fixed_version_id, n.urgency FROM package_notes AS n, binary_packages AS p WHERE n.release = '' AND p.name = n.package AND n.package_kind = 'binary'""") if self.verbose: print " binary packages (qualified)" cursor.execute( """INSERT OR REPLACE INTO binary_package_status SELECT n.bug_name, p.rowid, n.fixed_version IS NULL OR p.version_id < n.fixed_version_id, n.urgency FROM package_notes AS n, binary_packages AS p WHERE p.name = n.package AND p.release = n.release AND n.package_kind = 'binary'""") if self.verbose: print " source packages (from binary packages)" cursor.execute( """INSERT INTO source_package_status SELECT n.bug_name, s.rowid, MAX(n.fixed_version IS NULL OR b.version_id < n.fixed_version_id), MAX(n.urgency) FROM package_notes AS n, binary_packages AS b, source_packages AS s WHERE n.package_kind = 'binary' AND b.name = n.package AND s.name = b.source AND s.release = b.release AND s.subrelease = b.subrelease AND s.archive = b.archive GROUP BY n.bug_name, s.rowid, s.release""") # The "GROUP BY" is needed because we we might have multiple # rows in the binary_packages table for different # architectures. # FIXME: MAX(n.urgency) is wrong. # Calculate the release-specific bug status. if self.verbose: print " calculate release status" c = self.cursor() for (bug_name,) in cursor.execute( "SELECT name FROM bugs WHERE NOT not_for_us"): self._calcUnstable(c, bug_name) self._calcTesting(c, bug_name, 'testing', 'squeeze') self._calcTesting(c, bug_name, 'stable', 'lenny') self._calcTesting(c, bug_name, 'oldstable', 'etch') return result def _calcUnstable(self, cursor, bug_name): """Update bug_status with bug_name for unstable.""" vulnerable_packages = [] have_something = False for (package, vulnerable) in cursor.execute( """SELECT DISTINCT sp.name, st.vulnerable FROM source_package_status AS st, source_packages AS sp, package_notes AS n WHERE st.bug_name = ? AND sp.rowid = st.package AND sp.release = 'sid' AND n.bug_name = st.bug_name AND n.package = sp.name AND n.urgency <> 'unimportant' ORDER BY sp.name""", (bug_name,)): have_something = True if vulnerable: vulnerable_packages.append(package) if vulnerable_packages: if len(vulnerable_packages) == 1: pkgs = "package %s is vulnerable" % vulnerable_packages[0] else: pkgs = ("packages %s are vulnerable" % ', '.join(vulnerable_packages)) cursor.execute("""INSERT INTO bug_status (bug_name, release, status, reason) VALUES (?, 'unstable', 'vulnerable', ?)""", (bug_name, pkgs)) else: if have_something: status = "not vulnerable" else: status = "not known to be vulnerable" cursor.execute("""INSERT INTO bug_status (bug_name, release, status, reason) VALUES (?, 'unstable', 'fixed', ?)""", (bug_name, status)) def _calcTesting(self, cursor, bug_name, suite, nickname): """Update bug_status with bug_name for testing/stable.""" # Note that there is at most one source package per # note/release/subrelease triple, but we should check that # here. status = {'' : {}, 'security' : {}} for (package, note, subrelease, vulnerable) in cursor.execute( """SELECT DISTINCT sp.name, n.id, sp.subrelease, st.vulnerable FROM source_package_status AS st, source_packages AS sp, package_notes AS n WHERE st.bug_name = ? AND sp.rowid = st.package AND sp.release = ? AND sp.subrelease IN ('', 'security') AND n.bug_name = st.bug_name AND n.package = sp.name AND n.urgency <> 'unimportant' ORDER BY sp.name""", (bug_name, nickname)): status[subrelease][(package, note)] = vulnerable # Check if any packages in plain testing are vulnerable, and # if all of those have been fixed in the security archive. fixed_in_security = True pkgs = {} for ((package, note), vulnerable) in status[''].items(): if vulnerable: pkgs[package] = True if status['security'].get((package, note), True): fixed_in_security = False pkgs = pkgs.keys() pkgs.sort() if len(pkgs) == 0: if len(status[''].keys()) == 0: msg = "not known to be vulnerable" else: msg = "not vulnerable" cursor.execute("""INSERT INTO bug_status (bug_name, release, status, reason) VALUES (?, ?, 'fixed', ?)""", (bug_name, suite, msg)) return if len(pkgs) == 1: pkgs = "package " + pkgs[0] + " is " else: pkgs = "packages " + ", ".join(pkgs) + " are " if fixed_in_security: pkgs = "%sfixed in %s-security" % (pkgs, suite) if suite == 'stable': status = 'fixed' else: status = "partially-fixed" else: pkgs += "vulnerable" status = "vulnerable" cursor.execute("""INSERT INTO bug_status (bug_name, release, status, reason) VALUES (?, ?, ?, ?)""", (bug_name, suite, status, pkgs)) def calculateDebsecan0(self, release): """Create data for the debsecan tool (VERSION 0 format).""" c = self.cursor() c.execute("""CREATE TEMPORARY TABLE vulnlist ( name TEXT NOT NULL, package TEXT NOT NULL, note INTEGER NOT NULL, PRIMARY KEY (name, package) )""") # Populate the table with the unstable vulnerabilities; # override them with the release-specific status. c.execute("""INSERT INTO vulnlist SELECT bug_name, package, id FROM package_notes WHERE release = ''""") if release: c.execute("""INSERT OR REPLACE INTO vulnlist SELECT bug_name, package, id FROM package_notes WHERE release = ?""", (release,)) else: release = 'sid' c.execute("""DELETE FROM vulnlist WHERE name LIKE 'TEMP-0000000-%'""") urgency_to_flag = {'low' : 'L', 'medium' : 'M', 'high' : 'H', 'unknown' : ' '} result = ["VERSION 0\n"] for (name, package, fixed_version, kind, urgency, remote, description, note_id) in list(c.execute("""SELECT vulnlist.name, vulnlist.package, COALESCE(n.fixed_version, ''), n.package_kind, n.urgency, (SELECT range_remote FROM nvd_data WHERE cve_name = vulnlist.name) AS remote, bugs.description, n.id FROM vulnlist, bugs, package_notes AS n WHERE bugs.name = vulnlist.name AND n.id = vulnlist.note ORDER BY vulnlist.package""")): if fixed_version == '0' or urgency == 'unimportant' \ or kind not in ('source', 'binary', 'unknown'): continue # Normalize FAKE-* names a bit. The line number (which # makes the name unique) is completely useless for the # client. if name[0:5] == 'TEMP-': name = '-'.join(name.split('-')[0:2]) # Determine if a fix is available for the specific # release. fix_available = ' ' if kind == 'source': fix_available_sql = """SELECT st.vulnerable FROM source_packages AS p, source_package_status AS st WHERE p.name = ? AND p.release = ? AND p.subrelease IN ('', 'security') AND st.bug_name = ? AND st.package = p.rowid ORDER BY p.version COLLATE version DESC""" elif kind == 'binary': fix_available_sql = """SELECT st.vulnerable FROM binary_packages AS p, binary_package_status AS st WHERE p.name = ? AND p.release = ? AND p.subrelease IN ('', 'security') AND st.bug_name = ? AND st.package = p.rowid ORDER BY p.version COLLATE version DESC""" else: fix_available_sql = '' if fix_available_sql: for (v,) in c.execute(fix_available_sql, (package, release, name)): assert v is not None if not v: fix_available = 'F' break if kind == 'source': kind = 'S' elif kind == 'binary': kind = 'B' else: kind = ' ' if remote is None: remote = '?' elif remote: remote = 'R' else: remote = ' ' result.append("%s,%c%c%c%c,%s,%s,%s\n" % (name, kind, urgency_to_flag[urgency], remote, fix_available, package, fixed_version, description)) result = base64.encodestring(zlib.compress(''.join(result), 9)) c.execute( "INSERT OR REPLACE INTO debsecan_data (name, data) VALUES (?, ?)", ('release/' + release, result)) c.execute("DROP TABLE vulnlist") def calculateDebsecan1(self): """Calculates debsecan data (release-independent, VERSION 1).""" c = self.cursor() result_start = ['VERSION 1'] bug_to_index = {} bug_to_remote_flag = {} def fill_bug_to_index(): index = 0 for (bug, desc, remote) in c.execute( """SELECT DISTINCT p.bug_name, b.description, (SELECT range_remote FROM nvd_data WHERE cve_name = p.bug_name) FROM package_notes AS p, bugs AS b WHERE (p.bug_name LIKE 'CVE-%' OR p.bug_name LIKE 'TEMP-%') AND p.bug_name NOT LIKE 'TEMP-0000000-%' AND p.urgency <> 'unimportant' AND COALESCE(p.fixed_version, '') <> '0' AND p.package_kind IN ('source', 'binary', 'unknown') AND b.name = p.bug_name ORDER BY p.bug_name"""): if remote is None: remote = '?' elif remote: remote = 'R' else: remote = ' ' # Normalize FAKE-* names a bit. The line number (which # makes the name unique) is completely useless for the # client. if bug[0:5] == 'TEMP-': name = '-'.join(bug.split('-')[0:2]) else: name = bug result_start.append("%s,,%s" % (name, desc)) bug_to_index[bug] = index bug_to_remote_flag[bug] = remote index += 1 result_start.append('') fill_bug_to_index() urgency_to_flag = {'low' : 'L', 'medium' : 'M', 'high' : 'H', 'unknown' : ' '} vuln_list = [] source_packages = {} def fill_vuln_list(source_packages=source_packages): for (bug, package) in list(c.execute( """SELECT DISTINCT bug_name, package FROM package_notes WHERE (bug_name LIKE 'CVE-%' OR bug_name LIKE 'TEMP-%') AND bug_name NOT LIKE 'TEMP-0000000-%' AND package_kind IN ('source', 'binary', 'unknown') GROUP BY package, bug_name ORDER BY package, bug_name""")): # By default, unstable is unfixed even if there are # only release-specific annotations available. This # is slightly at odds with the web front end (see # data/latently-vulnerable) which does not normally # report unstable versions as vulnerable in this case. # However, in our tracking model, the main branch # (sid) cannot be non-vulnerable, while the # release-specific branches are. unstable_fixed = '' total_urgency = '' other_versions = {} is_binary = False is_unknown = False fixed_releases = {} for (release, kind, urgency, version) in list(c.execute( """SELECT release, package_kind, urgency, fixed_version FROM package_notes WHERE bug_name = ? AND package = ?""", (bug, package))): if not total_urgency: total_urgency = urgency elif total_urgency == 'unknown': if urgency <> 'unimportant': total_urgency = urgency elif urgency == 'unknown': if total_urgency == 'unimportant': total_urgency = 'unknown' elif bugs.internUrgency(urgency) \ > bugs.internUrgency(total_urgency): total_urgency = urgency if kind == 'binary': is_binary = True elif kind == 'source': source_packages[package] = True else: is_unknown = True if release == '': unstable_fixed = version if version: v_ref = debian_support.Version(version) for (v,) in c.execute("""SELECT version FROM source_packages WHERE name = ? AND release = 'sid' AND subrelease = ''""", (package,)): if debian_support.Version(v) >= v_ref: fixed_releases['sid'] = True break elif version is not None: fixed_releases[release] = True # Collect newer versions in the same release # (which are supposed to fix the same bug). v_ref = debian_support.Version(version) for (v,) in c.execute("""SELECT fixed_version FROM package_notes WHERE package = ? AND release = ?""", (package, release)): if v is None: continue if debian_support.Version(v) >= v_ref: other_versions[v] = True # The second part of this SELECT statement # covers binary-only NMUs. for (v,) in c.execute("""SELECT version FROM source_packages WHERE name = ?1 AND release = ?2 AND subrelease IN ('', 'security') UNION ALL SELECT source_version FROM binary_packages WHERE source = ?1 AND release = ?2 AND subrelease IN ('', 'security')""", (package, release)): if debian_support.Version(v) >= v_ref: other_versions[v] = True if not total_urgency: total_urgency = 'unknown' # Check if the issue does not actually mark any # packages as vulnerable. (If unstable_fixed == '0', # release-specific annotations cannot create # vulnerabilities, either.) if total_urgency == 'unimportant' or unstable_fixed == '0': continue if unstable_fixed is None: unstable_fixed = '' bs_flag = 'S' if is_binary: assert not is_unknown bs_flag = 'B' elif is_unknown: bs_flag = ' ' other_versions = other_versions.keys() other_versions.sort() other_versions = ' '.join(other_versions) vuln_list.append(("%s,%d,%c%c%c" % (package, bug_to_index[bug], bs_flag, urgency_to_flag[total_urgency], bug_to_remote_flag[bug]), fixed_releases.keys(), ",%s,%s" % (unstable_fixed, other_versions))) fill_vuln_list() source_packages = source_packages.keys() source_packages.sort() def store_value(name, value): value = base64.encodestring(zlib.compress(value, 9)) c.execute("""INSERT OR REPLACE INTO debsecan_data VALUES (?, ?)""", (name, value)) def gen_release(release): result = result_start[:] for (prefix, releases, suffix) in vuln_list: if release in releases: fixed = 'F' else: fixed = ' ' result.append(prefix + fixed + suffix) result.append('') for sp in source_packages: bp_list = [] for (bp,) in c.execute("""SELECT name FROM binary_packages WHERE source = ? AND release = ? AND subrelease = '' ORDER BY name""", (sp, release)): bp_list.append(bp) if bp_list <> [sp]: # We intentionally store the empty list, it means # that the source package is obsolete as a whole. result.append("%s,%s" % (sp, ' '.join(bp_list))) result.append('') store_value('release/1/' + release, '\n'.join(result)) for release in ('sid', 'etch', 'lenny', 'squeeze'): gen_release(release) result = result_start for (prefix, release, suffix) in vuln_list: result.append(prefix + ' ' + suffix) result.append('') result.append('') result.append('') store_value ('release/1/GENERIC', '\n'.join(result)) def calculateDebsecan(self): """Calculate all debsecan data.""" for release in ('', 'etch', 'lenny', 'squeeze'): self.calculateDebsecan0(release) self.calculateDebsecan1() def getDebsecan(self, name): """Returns the debsecan data item NAME.""" for (data,) in self.cursor().execute( "SELECT data FROM debsecan_data WHERE name = ?", (name,)): return base64.decodestring(data) else: return None def replaceNVD(self, cursor, data): """Replaces the stored NVD data.""" cursor.execute("DELETE FROM nvd_data"); cursor.executemany("INSERT INTO nvd_data VALUES (?" + (", ?" * (len(data[0]) - 1)) + ")", data) def updateNVD(self, cursor, data): """Adds (and overwrites) NVD data stored in the database. This can be used for incremental updates.""" cursor.executemany("INSERT OR REPLACE INTO nvd_data VALUES (?" + (", ?" * (len(data[0]) - 1)) + ")", data) def getNVD(self, cursor, cve_name): """Returns a dictionary with NVD data corresponding to the CVE name, or None.""" for row in cursor.execute("SELECT * FROM nvd_data WHERE cve_name = ?", (cve_name,)): return NVDEntry(row, cursor.getdescription()) return None def getSourcePackageVersions(self, cursor, pkg): """A generator which returns tuples (RELEASE-LIST, VERSION), the available versions of the source package pkg.""" for (releases, version) in cursor.execute( """SELECT string_list(release) AS releases, version FROM (SELECT release, version FROM source_packages WHERE name = ? ORDER BY release_to_number(release)) GROUP BY version""", (pkg,)): yield releases.split(', '), version def getBinaryPackageVersions(self, cursor, pkg): """A generator which returns tuples (RELEASE-LIST, SOURCE-PACKAGE, VERSION, ARCH-LIST), the available versions of the binary package pkg.""" for (releases, source, version, archs) in cursor.execute( """SELECT string_list(release) AS releases, source, version, archs FROM (SELECT release, source, version, string_set(archs) AS archs FROM binary_packages WHERE name = ? GROUP BY release, source, version ORDER BY release_to_number(release)) GROUP BY source, version, archs""", (pkg,)): yield releases.split(', '), source, version, archs.split(',') def getBinaryPackagesForSource(self, cursor, pkg): """A generator which returns tuples (PACKAGES, RELEASE-LIST, VERSION), the available binary packages built from the source package pkg.""" for (packages, releases, version, archs) in cursor.execute( """SELECT string_list(package) AS packages, releases, version, archs FROM (SELECT package, string_list(rel) AS releases, version, archs FROM (SELECT name AS package, release_name(release, subrelease, archive) AS rel, version, string_set(archs) AS archs FROM binary_packages WHERE source = ? GROUP BY name, release, subrelease, archive, version ORDER BY release_to_number(release), subrelease) GROUP BY package, version, archs ORDER BY package) GROUP BY releases, version, archs ORDER BY version COLLATE version""", (pkg,)): yield (packages.split(', '), releases.split(', '), archs.split(','), version) def getSourcePackages(self, cursor, bug): """A generator which returns tuples (SOURCE-PACKAGE, RELEASE-LIST, VERSION, VULNERABLE-FLAG) of source packages which are related to the given bug.""" for (package, releases, version, vulnerable) in cursor.execute( """SELECT package, string_list(release), version, vulnerable FROM (SELECT p.name AS package, release_name(p.release, p.subrelease, p.archive) AS release, p.version AS version, s.vulnerable AS vulnerable FROM source_package_status AS s, source_packages AS p WHERE s.bug_name = ? AND p.rowid = s.package ORDER BY release_to_number(p.release), p.subrelease) GROUP BY package, version, vulnerable ORDER BY package, version COLLATE version""", (bug,)): yield package, releases.split(', '), version, vulnerable def getBinaryPackages(self, cursor, bug): """A generator which returns tuples (BINARY-PACKAGE-LIST, RELEASE-LIST, VERSION, ARCH-LIST, VULNERABLE-FLAG) of binary packages which are related to the given bug.""" for (packages, releases, version, archs, vulnerable) in cursor.execute( """SELECT string_list(package) AS packages, releases, version, archs, vulnerable FROM (SELECT package, string_set(release) AS releases, version, archs, vulnerable FROM (SELECT p.name AS package, release_name(p.release, p.subrelease, p.archive) AS release, p.version AS version, string_set(archs) AS archs, s.vulnerable AS vulnerable FROM binary_package_status AS s, binary_packages AS p WHERE s.bug_name = ? AND p.rowid = s.package GROUP BY p.name, p.release, p.subrelease, p.archive, p.version, vulnerable ORDER BY release_to_number(p.release), p.subrelease) GROUP BY package, version, vulnerable, archs ORDER BY package) GROUP BY releases, version, vulnerable, archs ORDER BY packages, version COLLATE version""", (bug,)): yield (packages.split(', '), releases.split(','), version, archs.split(','), vulnerable) def getBugsFromDebianBug(self, cursor, number): """A generator which returns a list of tuples (BUG-NAME, URGENCY, DESCRIPTION).""" return cursor.execute( """SELECT DISTINCT bugs.name, package_notes.urgency, bugs.description FROM debian_bugs, package_notes, bugs WHERE debian_bugs.bug = ? AND package_notes.id = debian_bugs.note AND bugs.name = package_notes.bug_name ORDER BY bug_name""", (number,)) def isSourcePackage(self, cursor, pkg): """Returns a true value if pkg is a source package.""" ((flag,),) = cursor.execute( "SELECT EXISTS (SELECT * FROM source_packages WHERE name = ?)", (pkg,)) return flag def isBinaryPackage(self, cursor, pkg): """Returns a true value if pkg is a binary package.""" ((flag,),) = cursor.execute( "SELECT EXISTS (SELECT * FROM binary_packages WHERE name = ?)", (pkg,)) return flag def getBugsForSourcePackage(self, cursor, pkg, vulnerable): """Returns a generator for a list of (BUG, DESCRIPTION) pairs which have the requested status.""" return cursor.execute( """SELECT DISTINCT name, description FROM (SELECT bugs.name AS name, bugs.description AS description, MAX(st.vulnerable AND COALESCE((SELECT st2.vulnerable FROM source_packages AS sp2, source_package_status AS st2 WHERE sp2.name = sp.name AND sp2.release = sp.release AND sp2.subrelease = 'security' AND sp2.archive = sp.archive AND st2.package = sp2.rowid AND st2.bug_name = st.bug_name ORDER BY st2.vulnerable DESC), 1)) AS vulnerable FROM source_packages AS sp, source_package_status AS st, bugs WHERE sp.name = ? AND sp.release <> 'woody' AND sp.subrelease <> 'security' AND st.package = sp.rowid AND bugs.name = st.bug_name AND st.urgency <> 'unimportant' GROUP BY bugs.name, bugs.description, sp.name) WHERE vulnerable = ? ORDER BY name""", (pkg, vulnerable)) def getBugsForBinaryPackage(self, cursor, pkg, vulnerable): """Returns a generator for a list of (BUG, DESCRIPTION) pairs which have the requested status.""" return cursor.execute( """SELECT name, description FROM (SELECT bugs.name AS name, bugs.description AS description, MAX(st.vulnerable) AS vulnerable FROM binary_packages AS bp, binary_package_status AS st, bugs WHERE bp.name = ? AND bp.release <> 'woody' AND st.package = bp.rowid AND st.urgency <> 'unimportant' AND bugs.name = st.bug_name GROUP BY bugs.name, bugs.description) WHERE vulnerable = ? ORDER BY name""", (pkg, vulnerable)) def getNonBugsForBinaryPackage(self, cursor, pkg): """Returns a generator for a list of (BUG, DESCRIPTION) pairs which have the requested status.""" return cursor.execute( """SELECT DISTINCT bugs.name, bugs.description FROM binary_packages AS bp, binary_package_status AS st, bugs WHERE bp.name = ? AND st.package = bp.rowid AND st.urgency = 'unimportant' AND bugs.name = st.bug_name ORDER BY bugs.name""", (pkg,)) def getTODOs(self, cursor=None, hide_check=False): """Returns a list of pairs (BUG-NAME, DESCRIPTION).""" if cursor is None: cursor = self.cursor() if hide_check: return cursor.execute( """SELECT DISTINCT bugs.name, bugs.description FROM bugs_notes, bugs WHERE bugs_notes.typ = 'TODO' AND bugs_notes.comment <> 'check' AND bugs.name = bugs_notes.bug_name ORDER BY name """) else: return cursor.execute( """SELECT DISTINCT bugs.name, bugs.description FROM bugs_notes, bugs WHERE bugs_notes.typ = 'TODO' AND bugs.name = bugs_notes.bug_name ORDER BY name """) def getBugXrefs(self, cursor, bug): """Returns a generator for a list of bug names. The listed bugs refer to the given bug, or the bug refers to them.""" for (bug_name,) in cursor.execute( """SELECT DISTINCT bug FROM (SELECT target AS bug FROM bugs_xref WHERE source = ? UNION ALL SELECT source AS bug FROM bugs_xref WHERE target = ? UNION ALL SELECT bug_origin AS bug FROM package_notes WHERE bug_name = ? AND bug_origin <> '') WHERE bug <> ? ORDER BY bug""", (bug, bug, bug, bug)): yield bug_name def readRemovedPackages(self, cursor, filename): """Reads a file of removed packages and stores it in the database. The original contents of the removed_packages table is preserved.""" f = file(filename) re_package = re.compile(r'^\s*([a-z0-9]\S+)\s*$') # Not very good error reporting, but changes to that file are # rare. def gen(): for line in f: if line == '': break if line[0] == '#' or line == '\n': continue match = re_package.match(line) if match: yield match.groups() else: raise ValueError, "not a package: " + `line` cursor.executemany( "INSERT OR IGNORE INTO removed_packages (name) VALUES (?)", gen()) def getUnknownPackages(self, cursor): """Returns a generator for a list of unknown packages. Each entry has the form (PACKAGE, BUG-LIST).""" old_package = '' bugs = [] for (package, bug_name) in cursor.execute( """SELECT DISTINCT package, bug_name FROM package_notes WHERE package_kind = 'unknown' AND NOT EXISTS (SELECT * FROM removed_packages WHERE name = package) ORDER BY package, bug_name"""): if package <> old_package: if old_package: yield (old_package, bugs) old_package = package bugs = [] bugs.append(bug_name) if old_package: yield (old_package, bugs) def getFakeBugs(self, cursor=None): """Returns a list of pairs (BUG-NAME, DESCRIPTION).""" if cursor is None: cursor = self.cursor() return list(cursor.execute( """SELECT name, description FROM bugs WHERE name > 'TEMP-' AND name LIKE 'TEMP-%' ORDER BY name""")) def getITPs(self, cursor): """Returns a generator for a list of unknown packages. Each entry has the form (PACKAGE, BUG-LIST, DEBIAN-BUG-LIST).""" # The "|| ''" is required to convert the string_set argument # to a string. for (package, bugs, debian_bugs) in cursor.execute( """SELECT DISTINCT n.package, string_set(n.bug_name), string_set(db.bug || '') FROM package_notes AS n, debian_bugs AS db WHERE package_kind = 'itp' AND db.note = n.id GROUP BY n.package ORDER BY n.package"""): yield (package, bugs.split(','), map(int, debian_bugs.split(','))) def getEffectiveVersion(self, release, pkg, purpose, cache=None, cursor=None): """Retrieve the effective version of a source package in a release. The effective version is the version that matches the recommended sources.list file for the intended purpose. For suitable values of purpose, see dist_config. """ # The cache is structured as a (RELEASE, PACKAGE) => VAL # dict, where VAL is either a dict PURPOSE => VERSION, # a VERSION, or None. if cache is not None: sp = (release, pkg) if cache.has_key(sp): d = cache[sp] if d.__class__ == dict: return d.get(purpose, None) else: return d if cursor is None: cursor = self.cursor() rel = dist_config.releases[release] purposes = rel['purpose'] results = {} Version = debian_support.Version for (part, ver) in cursor.execute( """SELECT DISTINCT subrelease, version FROM source_packages WHERE release = ? AND name = ?""", (str(release), pkg)): ver = Version(ver) for (purpose, permitted) in purposes.items(): if part not in permitted: continue if results.has_key(purpose): oldver = results[purpose] if ver <= oldver: continue results[purpose] = ver if cache is not None: vers = set(map(str, results.values())) l = len(vers) if l == 1: for r in vers: cache[sp] = Version(r) elif l == 0: cache[sp] = None else: cache[sp] = results return results.get(purpose, None) def check(self, cursor=None): """Runs a simple consistency check and prints the results.""" if cursor is None: cursor = self.cursor() for (package, release, archive, architecture, source) in\ cursor.execute( """SELECT package, release, archive, architecture, source FROM binary_packages WHERE NOT EXISTS (SELECT * FROM source_packages AS sp WHERE sp.package = binary_packages.source AND sp.release = binary_packages.release AND sp.archive = binary_packages.archive) """): print "error: binary package without source package" print " binary package:", package print " release:", release if archive: print " archive:", archive print " architecture:", architecture print " missing source package:", source for (package, release, archive, architecture, version, source, source_version) \ in cursor.execute("""SELECT binary_packages.package, binary_packages.release, binary_packages.archive, binary_packages.architecture,binary_packages.version, sp.package, sp.version FROM binary_packages, source_packages AS sp WHERE sp.package = binary_packages.source AND sp.release = binary_packages.release AND sp.archive = binary_packages.archive AND sp.version <> binary_packages.source_version"""): relation = cmp(debian_support.Version(version), debian_support.Version(source_version)) assert relation <> 0 if relation <= 0: print "error: binary package is older than source package" else: print "warning: binary package is newer than source package" print " binary package: %s (%s)" % (package, version) print " source package: %s (%s)" % (source, source_version) print " release:", release if archive: print " archive:", archive print " architecture:", architecture def test(): assert mergeLists('', '') == [], mergeLists('', '') assert mergeLists('', []) == [] assert mergeLists('a', 'a') == ['a'] assert mergeLists('a', 'b') == ['a', 'b'] assert mergeLists('a,c', 'b') == ['a', 'b', 'c'] assert mergeLists('a,c', ['b', 'de']) == ['a', 'b', 'c', 'de'] import os db_file = 'test_security.db' try: db = DB(db_file) except SchemaMismatch: os.unlink(db_file) db = DB(db_file) cursor = db.writeTxn() db.readBugs(cursor, '../../data') db.commit(cursor) b = bugs.BugFromDB(cursor, 'CVE-2005-2491') assert b.name == 'CVE-2005-2491', b.name assert b.description == 'Integer overflow in pcre_compile.c in Perl Compatible Regular ...', b.description assert len(b.xref) == 2, b.xref assert not b.not_for_us assert 'DSA-800-1' in b.xref, b.xref assert 'DTSA-10-1' in b.xref, b.xref assert tuple(b.comments) == (('NOTE', 'gnumeric/goffice includes one as well; according to upstream not exploitable in gnumeric,'), ('NOTE', 'new copy will be included any way')),\ b.comments assert len(b.notes) == 4, len(b.notes) for n in b.notes: assert n.release is None if n.package == 'pcre3': assert n.fixed_version == debian_support.Version('6.3-0.1etch1') assert tuple(n.bugs) == (324531,), n.bugs assert n.urgency == bugs.internUrgency('medium') elif n.package == 'python2.1': assert n.fixed_version == debian_support.Version('2.1.3dfsg-3') assert len(n.bugs) == 0, n.bugs assert n.urgency == bugs.internUrgency('medium') elif n.package == 'python2.2': assert n.fixed_version == debian_support.Version('2.2.3dfsg-4') assert len(n.bugs) == 0, n.bugs assert n.urgency == bugs.internUrgency('medium') elif n.package == 'python2.3': assert n.fixed_version == debian_support.Version('2.3.5-8') assert len(n.bugs) == 0, n.bugs assert n.urgency == bugs.internUrgency('medium') else: assert False assert bugs.BugFromDB(cursor, 'DSA-311').isKernelOnly() if __name__ == "__main__": test()