summaryrefslogtreecommitdiffstats
path: root/bin/grab-cve-in-fix
blob: 98ea9cd47645840b23f318ad33bb8b66e576acf8 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
grab-cve-in-fix - #1001451

- queries the latest version of source:<package_name> in unstable
- extracts all mentioned CVE IDs from the change
- creates a correctly formatted CVE snippet with the recorded fixes that
  can be reviewed and merged into the main data/CVE/list
"""

#
#  Copyright 2021-2022 Neil Williams <codehelp@debian.org>
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 2 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software
#  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
#  MA 02110-1301, USA.
#

# pylint: disable=too-few-public-methods,line-too-long,too-many-instance-attributes,too-many-branches

# Examples:
# --archive https://lists.debian.org/debian-devel-changes/2021/12/msg01280.html
# --tracker https://tracker.debian.org/news/1285227/accepted-freerdp2-241dfsg1-1-source-into-unstable/

import argparse
import os
import glob
import logging
import re
import sys
import requests

# depends on python3-apt
import apt_pkg

# depends on python3-debian
from debian.deb822 import Changes

import setup_paths  # noqa # pylint: disable=unused-import
from sectracker.parsers import (
    sourcepackages,
    FlagAnnotation,
    StringAnnotation,
    PackageAnnotation,
    Bug,
    cvelist,
    writecvelist,
)


class ParseChanges:
    """
    Base for parsing DEB822 content into a CVE list

    Subclasses implement parse() to fill self.parsed (the lines of a
    DEB822 .changes document) or to set source_package, cves and
    unstable_version directly, then call the helpers below to look up
    the CVEs in data/CVE/list and write a <source_package>.list snippet.
    """

    # Matches a CVE ID anywhere in the changelog text.
    _CVE_RE = re.compile(r"CVE-[0-9]{4}-[0-9]+")

    def __init__(self, url):
        """
        Set up empty parser state and the shared console logger.

        url is stored unmodified; its meaning is defined by the subclass.
        """
        self.url = url
        self.source_package = None
        self.cves = []
        self.bugs = {}
        self.parsed = []
        self.unstable_version = None
        self.tracker_base = "https://security-tracker.debian.org/tracker/source-package/"
        self.logger = logging.getLogger("grab-cve-in-fix")
        self.logger.setLevel(logging.DEBUG)
        # The named logger is shared by every instance: only attach the
        # console handler once, otherwise each message is repeated for
        # every parser object that has been created.
        if not self.logger.handlers:
            ch_log = logging.StreamHandler()
            ch_log.setLevel(logging.DEBUG)
            formatter = logging.Formatter("%(name)s - %(levelname)s - %(message)s")
            ch_log.setFormatter(formatter)
            self.logger.addHandler(ch_log)
        apt_pkg.init_system()  # pylint: disable=c-extension-no-member

    def _read_cvelist(self):
        """
        Look up each ID from self.cves in data/CVE/list and record the
        matching entries in self.bugs, keyed by CVE ID.

        Changes the working directory to the parent of the directory
        holding this script so that the relative path can be used.
        Logs an error for a CVE without any package annotation and a
        warning for a CVE not listed against self.source_package.
        """
        os.chdir(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
        data, _ = cvelist("data/CVE/list")  # pylint: disable=no-value-for-parameter
        requested = self.cves or []  # cves may be None if --cves was omitted
        wanted = set(requested)
        # Single pass over the (large) CVE list instead of one pass per CVE.
        found = {bug.header.name: bug for bug in data if bug.header.name in wanted}
        for cve in requested:
            if cve in found:
                self.bugs[cve] = found[cve]
        package_checks = {}
        cve_notes = {}
        for cve, bug in self.bugs.items():
            self.logger.info("%s: %s", bug.header.name, bug.header.description)
            for line in bug.annotations:
                if isinstance(line, PackageAnnotation):
                    package_checks.setdefault(cve, []).append(line.package)
                if isinstance(line, (StringAnnotation, FlagAnnotation)):
                    cve_notes.setdefault(cve, []).append(line.type)
            if cve not in package_checks:
                self.logger.error("CVE %s is not attributed to a Debian package: %s", cve, cve_notes.get(cve, ""))
            elif self.source_package not in package_checks[cve]:
                self.logger.warning(
                    "%s is listed against %s, not %s", cve, list(set(package_checks[cve])), self.source_package
                )
        if not requested:
            self.logger.warning(
                "no CVEs found in the changes output " "for %s %s",
                self.source_package,
                self.unstable_version,
            )

    def parse(self):
        """Parser-specific code to pick out the DEB822 content"""
        raise NotImplementedError

    def _read_changes(self):
        """
        Parse self.parsed as a DEB822 Changes document and set
        source_package, unstable_version and the CVE IDs mentioned in
        the Changes field. Does nothing if self.parsed is empty.
        """
        if not self.parsed:
            return
        rel = Changes(self.parsed)
        changes = rel.get("Changes")
        if not changes:
            self.logger.error("%s %s\n", rel, self.parsed)
            return
        self.source_package = rel.get("Source")
        self.unstable_version = rel.get("Version")
        # A CVE ID cannot span a line break, so one scan of the whole
        # field finds the same IDs, in the same order, as a per-line scan.
        self.cves += self._CVE_RE.findall(changes)

    def _apply_unstable_version(self, cve, bug):
        """
        Return a copy of bug with the unstable annotation(s) for
        self.source_package set to self.unstable_version, or None if
        nothing needed to change.
        """
        annotations = list(bug.annotations)
        changed = False
        for index, line in enumerate(annotations):
            if not isinstance(line, PackageAnnotation):
                continue  # skip notes etc.
            if line.release:  # only update unstable
                continue
            if line.package != self.source_package:
                self.logger.info(
                    "Ignoring %s annotation for %s",
                    cve,
                    line.package,
                )
                continue  # allow for removed, old or alternate pkg names
            if line.version:
                vcompare = apt_pkg.version_compare(  # pylint: disable=c-extension-no-member
                    line.version, self.unstable_version
                )
                if vcompare > 0:
                    self.logger.error(
                        "%s is listed as fixed in %s which is newer than %s",
                        cve,
                        line.version,
                        self.unstable_version,
                    )
                    continue
                if vcompare == 0:
                    self.logger.info(
                        "%s already has annotation for - %s %s",
                        cve,
                        self.source_package,
                        line.version,
                    )
                    continue
                self.logger.info("Updating %s to %s", line.version, self.unstable_version)
            # Replace by position so that every change for this CVE ends
            # up in the same Bug and the CVE is written out only once.
            annotations[index] = line._replace(version=self.unstable_version)
            changed = True
        if not changed:
            return None
        return Bug(bug.file, bug.header, tuple(annotations))

    def add_unstable_version(self):
        """
        Writes out a CVE file snippet with the filename:
        ./<src_package>.list
        Fails if the file already exists.

        Prints error if any of the listed CVEs are not found
        for the specified source_package.

        If a new version is set, the fixed version for the CVE will
        be updated to that version. Uses python3-apt to only update
        if the version is declared, by apt, to be newer.

        A typo in the CVE ID *may* cause a CVE to be declared as
        fixed in the wrong source package. This is complicated by
        the need to allow for embedded copies and removed packages.

        Returns 0 on success or when there is nothing to write and -1
        if the snippet file already exists or no version is known.
        """
        cve_file = f"{self.source_package}.list"
        cves = sorted(set(self.cves or ()), reverse=True)
        if not cves:
            return 0
        if not self.unstable_version:
            # Without a version there is nothing to record and the
            # version comparison below cannot be made.
            self.logger.critical(
                "No unstable version is known for %s - unable to record fixes for %s",
                self.source_package,
                cves,
            )
            return -1
        modified = []
        for cve in cves:
            bug = self.bugs.get(cve)
            if bug is None:
                self.logger.error(
                    "%s was not found in the Security Tracker CVE list! Check %s%s - "
                    "possible typo in the package changelog? Check the list of CVEs "
                    "in the security tracker and use this script again, in offline mode."
                    " ./bin/grab-cve-in-fix --src %s --cves corrected-cve",
                    cve,
                    self.tracker_base,
                    self.source_package,
                    self.source_package,
                )
                continue
            mod_bug = self._apply_unstable_version(cve, bug)
            if mod_bug is not None:
                modified.append(mod_bug)
        if not modified:
            return 0
        if os.path.exists(cve_file):
            self.logger.critical("%s already exists", cve_file)
            return -1
        for cve in modified:
            self.logger.info(
                "Writing to ./%s with update for %s - %s %s",
                cve_file,
                cve.header.name,
                self.source_package,
                self.unstable_version,
            )
        with open(cve_file, "a") as snippet:
            writecvelist(modified, snippet)
        return 0


class ParseSources(ParseChanges):
    """Read latest version in unstable from updated local Sources files"""

    def parse(self):
        """
        Support to pick up unstable_version from the local packages cache.

        Also supports explicitly setting the version for times when
        the package has received an unrelated update in unstable.

        source_package and cves must be set by the caller before parse()
        is used; self.url is the directory holding the sid*Sources files.

        Returns 0 once the CVE list has been processed, or 1 if the
        source package or CVEs were not specified or the package is not
        found in any of the local Sources files.
        """
        # Both values are needed whether or not the version is forced:
        # without this check a forced version with no CVEs would try to
        # iterate over None.
        if not self.source_package or not self.cves:
            self.logger.error("for offline use, specify both --src and --cves options")
            return 1

        if self.unstable_version:
            self.logger.info("Using forced version: %s", self.unstable_version)
        else:
            self.logger.info("Retrieving data from local packages data...")
            # self.url contains pkgdir which needs to contain Sources files
            os.chdir(self.url)
            for srcs_file in glob.glob("sid*Sources"):
                srcs = sourcepackages(srcs_file)  # pylint: disable=no-value-for-parameter
                if srcs.get(self.source_package):
                    self.unstable_version = srcs[self.source_package].version
                    # src package is only listed in one Sources file
                    break
            if not self.unstable_version:
                # Nothing can be recorded without a version to compare against.
                self.logger.error(
                    "Unable to find %s in the sid Sources files in %s - "
                    "run 'make update-packages' or use --force-version",
                    self.source_package,
                    self.url,
                )
                return 1
        self._read_cvelist()
        self.add_unstable_version()
        return 0


class ParseTrackerAccepted(ParseChanges):
    """
    Download and parse Accepted tracker NEWS

    e.g. https://tracker.debian.org/news/1285227/accepted-freerdp2-241dfsg1-1-source-into-unstable/
    """

    MARKER = '<div class="email-news-body">'
    # Seconds to wait for the server; requests waits forever by default.
    TIMEOUT = 30

    def parse(self):
        """
        Fetch self.url, extract the changes text which follows MARKER
        and process the CVEs mentioned in it.

        Returns 0 after processing, or 2 if the page cannot be retrieved.
        """
        self.logger.info("Retrieving data from distro-tracker...")
        try:
            req = requests.get(self.url, timeout=self.TIMEOUT)
        except requests.RequestException as exc:
            self.logger.error("Unable to retrieve %s: %s", self.url, exc)
            return 2
        if req.status_code != requests.codes.ok:  # pylint: disable=no-member
            self.logger.error("Unable to retrieve %s: HTTP status %s", self.url, req.status_code)
            return 2
        self.parsed = []
        for line in req.text.splitlines():
            # skip everything ahead of the line which opens the news body
            if not self.parsed and not line.startswith(self.MARKER):
                continue
            line = line.replace(self.MARKER, "").replace("<pre>", "")
            if line.startswith("\t"):
                line = line.replace("\t", "")
            self.parsed.append(line)
            if line.startswith("</pre>"):
                break
        self._read_changes()
        self._read_cvelist()
        self.add_unstable_version()
        return 0


class ParseDDChanges(ParseChanges):
    """
    Download and parse an email in the debian-devel-changes archive

    e.g. https://lists.debian.org/debian-devel-changes/2021/12/msg01280.html
    """

    # Seconds to wait for the server; requests waits forever by default.
    TIMEOUT = 30

    def parse(self):
        """
        Fetch self.url, extract the text from the first line starting
        with <pre> up to the line starting with </pre> and process the
        CVEs mentioned in it.

        Returns 0 after processing, or 3 if the page cannot be retrieved.
        """
        self.logger.info("Retrieving data from debian-devel-changes archive...")
        try:
            req = requests.get(self.url, timeout=self.TIMEOUT)
        except requests.RequestException as exc:
            self.logger.error("Unable to retrieve %s: %s", self.url, exc)
            return 3
        if req.status_code != requests.codes.ok:  # pylint: disable=no-member
            self.logger.error("Unable to retrieve %s: HTTP status %s", self.url, req.status_code)
            return 3
        # start from a clean slate, as the distro-tracker parser does
        self.parsed = []
        for line in req.text.splitlines():
            if not self.parsed and not line.startswith("<pre>"):
                continue
            pars = line.replace("<pre>", "")
            self.parsed.append(pars)
            if line.startswith("</pre>"):
                break
        self._read_changes()
        self._read_cvelist()
        self.add_unstable_version()
        return 0


class ParseDDStdIn(ParseChanges):
    """
    Parser for a debian-devel-changes email which is supplied
    on STDIN
    """

    MARKER = "-----BEGIN PGP SIGNED MESSAGE-----"

    def parse(self):
        """
        Read all of STDIN and keep every line from the first one that
        starts with MARKER onwards, then process the CVEs found there.

        Returns 1 if no content was collected, otherwise 0.
        """
        self.logger.info("Retrieving data STDIN ...")
        # Collection starts at the marker line (or immediately, if some
        # content has already been stored in self.parsed).
        collecting = bool(self.parsed)
        for text in sys.stdin.read().splitlines():
            collecting = collecting or text.startswith(self.MARKER)
            if collecting:
                self.parsed.append(text)
        if self.parsed:
            self._read_changes()
            self._read_cvelist()
            self.add_unstable_version()
            return 0
        self.logger.warning("Unable to find PGP marker - unsigned content?")
        return 1


def main():
    """
    Command line entry point: pick a parser based on the options given.

    The options are checked in this order and the first one set wins:
    1: --input reads an email from debian-devel-changes on stdin
    2: --archive parses the email from the debian-devel-changes archive
    3: --tracker looks up the information using tracker.d.o
    4: otherwise fall back to the local apt-cache data populated by
       'make update-packages'
        data/packages/sid__main_Sources
        data/packages/sid__contrib_Sources
        data/packages/sid__non-free_Sources

    Returns the value from the selected parser, or -1 if the local
    packages directory does not exist.
    """
    parser = argparse.ArgumentParser(
        description="Grab CVE data from a package upload for manual review",
        usage="%(prog)s [-h] [[--input] | [--archive URL] | [--tracker TRACKER]] | "
        "[[--src SRC] & [--cves [CVES ...]]]",
        epilog="Data is written to a new <source_package>.list " "file which can be used with './bin/merge-cve-files'",
    )

    online_group = parser.add_argument_group(
        "Online - query one of distro-tracker or " "debian-devel-changes mail archive or debian-devel-changes email"
    )
    online_group.add_argument(
        "--input",
        action="store_true",
        help="Read from a debian-devel-changes email on STDIN",
    )
    online_group.add_argument(
        "--archive",
        help="URL of debian-devel-changes " "announcement in the list archive",
    )
    online_group.add_argument(
        "--tracker",
        help="URL of tracker.debian.org 'Accepted NEWS' page for unstable",
    )

    offline_group = parser.add_argument_group(
        "Offline - run 'make update-packages' first & specify source package and CVE list"
    )
    offline_group.add_argument("--src", help="Source package name to look up version in local packages files")
    offline_group.add_argument(
        "--force-version",
        help="Explicitly set the fixed version, in case sid has moved ahead.",
    )
    offline_group.add_argument("--cves", nargs="*", help="CVE ID tag with version from local packages files")

    args = parser.parse_args()

    # Online modes in priority order; the option value is handed to the
    # parser class as its url argument.
    online_modes = (
        (args.input, ParseDDStdIn),
        (args.archive, ParseDDChanges),
        (args.tracker, ParseTrackerAccepted),
    )
    for option_value, parser_class in online_modes:
        if option_value:
            return parser_class(option_value).parse()

    # Offline mode needs the local packages directory.
    pkg_dir = os.path.join(".", "data", "packages")
    if not os.path.exists(pkg_dir):
        logging.getLogger("grab-cve-in-fix").error("Unable to parse package data!")
        return -1

    sources = ParseSources(pkg_dir)
    sources.source_package = args.src
    sources.cves = args.cves
    if args.force_version:
        sources.unstable_version = args.force_version
    return sources.parse()


# When run as a script, use the value returned by main() as the exit status.
if __name__ == "__main__":
    sys.exit(main())

© 2014-2024 Faster IT GmbH | imprint | privacy policy