summaryrefslogtreecommitdiffstats
path: root/bin/lts-missing-uploads.py
blob: 1c76c2f7b7656467b87ef85e71f743cd187f2ee7 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#!/usr/bin/env python3
#
# Copyright 2016 Chris Lamb <lamby@debian.org>
#
# This file is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This file is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this file.  If not, see <https://www.gnu.org/licenses/>.

import re
import sys
import gzip
import datetime
import eventlet
import requests
import dateutil.relativedelta

from debian.deb822 import Sources
from debian.debian_support import Version

class LTSMissingUploads(object):
    """Warn about DLA announcements that are newer than the archive.

    Scans the last ``MONTHS`` monthly indices of the debian-lts-announce
    list, extracts the version announced for each source package, and
    compares it with the version listed in the ``SOURCES`` index.  Every
    discrepancy is reported as a ``W:`` line on stderr.
    """

    # Number of monthly list indices to scan, counting back from the
    # current month (inclusive).
    MONTHS = 6
    # Gzipped Sources index the announced versions are compared against.
    SOURCES = 'http://security.debian.org/dists/stretch/updates/main/source/Sources.gz'

    # Matches one announcement entry on a monthly index page.
    re_line = re.compile(
        r'(?P<suffix>msg\d+.html).*\[DLA (?P<dla>[\d-]+)\] (?P<source>[^\s]+) security update.*'
    )
    # Matches the "Version : ..." line of an individual announcement.
    re_version = re.compile(r'^Version.*: (?P<version>.*)')

    def __init__(self):
        # Up to 10 announcements are fetched concurrently.
        self.pool = eventlet.GreenPool(10)
        self.session = requests.session()

    def main(self, *args):
        """Run the check and return the process exit code (always 0).

        ``args`` is accepted for the command-line entry point but unused.
        """
        self.info("Getting last {} month(s) of LTS announcements", self.MONTHS)

        dlas = self._collect_dlas()

        if not dlas:
            return 0

        self._report(dlas, self.get_sources())

        return 0

    def _collect_dlas(self):
        """Return ``{source: dla}`` for the newest parsable DLA per source."""
        dlas = {}

        def download(x):
            self.info("{source}: parsing announcement from {url} ...", **x)
            versions = self.get_dla(x['url'])

            if not versions:
                # Without this guard the IndexError would only surface as
                # a traceback from the green thread.
                self.warn(
                    "{}: DLA-{} announcement has no Version field <{}>",
                    x['source'],
                    x['dla'],
                    x['url'],
                )
                return

            x.update(versions[0])
            x['version'] = x['version'].strip()
            dlas[x['source']] = x

        # Computed once so every iteration counts back from the same month.
        first_of_month = datetime.datetime.now(
            datetime.timezone.utc,
        ).replace(day=1)

        for idx in range(self.MONTHS):
            dt = first_of_month - \
                dateutil.relativedelta.relativedelta(months=idx)

            self.info(
                "Getting announcements for {}/{:02} ...",
                dt.year,
                dt.month,
            )

            # ``dlas`` is only filled in once a download finishes, so track
            # what has been queued this month as well; otherwise two DLAs
            # for one source race and the older one may win.
            queued = set()

            # Prefer later DLAs with reversed(..)
            for x in reversed(self.get_dlas(dt.year, dt.month)):
                source = x['source']
                if source in dlas or source in queued:
                    continue
                queued.add(source)
                self.pool.spawn_n(download, x)
            self.pool.waitall()

        return dlas

    def _report(self, dlas, sources):
        """Warn for each DLA whose version the archive does not satisfy."""
        for source, dla in sorted(dlas.items()):
            try:
                dla_version = Version(dla['version'])
            except ValueError:
                self.warn("{}: DLA-{} announced with invalid version: {} <{}>",
                    source,
                    dla['dla'],
                    dla['version'],
                    dla['url'],
                )
                continue

            if source not in sources:
                # A package absent from the archive is a missing upload
                # too; report it instead of crashing with KeyError.
                self.warn("{}: DLA-{} announced version {} but LTS has no such source package <{}>",
                    source,
                    dla['dla'],
                    dla_version,
                    dla['url'],
                )
                continue

            archive_version = Version(sources[source])

            if dla_version <= archive_version:
                continue

            self.warn("{}: DLA-{} announced version {} but LTS has {} <{}>",
                source,
                dla['dla'],
                dla_version,
                archive_version,
                dla['url'],
            )

    def get_dlas(self, year, month):
        """Return the DLA entries found on the list index for a month.

        Each entry is a dict with ``suffix``, ``dla``, ``source`` and the
        absolute ``url`` of the announcement.
        """
        url = 'https://lists.debian.org/debian-lts-announce/{}/{:02}/'.format(
            year,
            month,
        )

        result = self.parse(url, self.re_line)

        # Prepend URL as the indices have relative URIs
        for x in result:
            x['url'] = '{}{}'.format(url, x['suffix'])

        return result

    def get_dla(self, url):
        """Return the ``{'version': ...}`` matches found at ``url``."""
        return self.parse(url, self.re_version)

    def get_sources(self):
        """Download ``SOURCES`` and return ``{package: version}``.

        When a package has several stanzas, the highest version is kept.

        Raises ``requests.HTTPError`` if the download fails.
        """
        self.info("Downloading Sources from {} ...", self.SOURCES)

        response = self.session.get(self.SOURCES)
        response.raise_for_status()

        val = gzip.decompress(response.content).decode('utf-8')

        result = {}
        for x in Sources.iter_paragraphs(val):
            package, version = x['Package'], x['Version']
            if package not in result or \
                    Version(version) > Version(result[package]):
                result[package] = version

        return result

    def parse(self, url, pattern):
        """Fetch ``url`` and return ``groupdict()`` for each matching line.

        Lines are decoded as UTF-8 with undecodable bytes replaced, so one
        stray byte in a page cannot abort the whole run.
        """
        lines = self.session.get(url).content.splitlines()
        matches = (
            pattern.search(x.decode('utf-8', errors='replace'))
            for x in lines
        )

        return [m.groupdict() for m in matches if m is not None]

    ##

    def warn(self, msg, *args, **kwargs):
        """Print ``msg`` formatted with the arguments to stderr as a warning."""
        print("W: " + msg.format(*args, **kwargs), file=sys.stderr)

    def info(self, msg, *args, **kwargs):
        """Print ``msg`` formatted with the arguments to stderr as progress."""
        print("I: " + msg.format(*args, **kwargs), file=sys.stderr)

if __name__ == '__main__':
    # Swap in eventlet's green socket module before any request is made.
    eventlet.monkey_patch(socket=True)

    # Ctrl-C ends the run with exit status 1 instead of a traceback;
    # otherwise the status is whatever main() returns.
    try:
        exit_code = LTSMissingUploads().main(*sys.argv[1:])
    except KeyboardInterrupt:
        exit_code = 1

    sys.exit(exit_code)

© 2014-2024 Faster IT GmbH | imprint | privacy policy