#!/usr/bin/env python3
#
# Copyright 2016 Chris Lamb
#
# This file is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This file is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this file.  If not, see <http://www.gnu.org/licenses/>.

import re
import sys
import gzip
import datetime
import eventlet
import requests
import dateutil.relativedelta

from debian.deb822 import Sources
from debian.debian_support import Version


class LTSMissingUploads(object):
    """Report LTS announcements (DLAs) that the archive has not caught up to.

    For each source package announced on debian-lts-announce during the
    last ``MONTHS`` months, the version given in the newest announcement is
    compared with the version listed in the ``SOURCES`` index.  A warning is
    printed for every package whose announced version is newer.
    """

    # Number of monthly list-archive indices to scan, counting back from
    # the current month.
    MONTHS = 6

    # Gzipped Sources index the announced versions are compared against.
    SOURCES = 'http://security.debian.org/dists/stretch/updates/main/source/Sources.gz'

    # Matches one entry of a monthly index page; captures the relative link
    # to the message, the DLA identifier and the source package name.
    re_line = re.compile(
        r'(?P<suffix>msg\d+.html).*\[DLA (?P<dla>[\d-]+)\] (?P<source>[^\s]+) security update.*'
    )

    # Matches the "Version ... : <version>" line of an announcement.
    re_version = re.compile(r'^Version.*: (?P<version>.*)')

    def __init__(self):
        # At most ten announcement downloads run concurrently.
        self.pool = eventlet.GreenPool(10)
        self.session = requests.session()

    def main(self, *args):
        """Run the check and return the process exit status.

        ``args`` (the command-line arguments) are accepted but unused.
        Always returns 0; findings are reported as ``W:`` lines on stderr.
        """
        self.info("Getting last {} month(s) of LTS annoucements", self.MONTHS)

        dlas = {}

        # Sources already claimed by a newer announcement.  This is recorded
        # when a download is *scheduled*: ``dlas`` is only filled in once the
        # green thread finishes, so testing ``dlas`` alone would let several
        # announcements for the same source race within one month.
        seen = set()

        def download(x):
            self.info("{source}: parsing announcement from {url} ...", **x)

            versions = self.get_dla(x['url'])
            if not versions:
                self.warn(
                    "{}: DLA-{} announcement has no parseable version <{}>",
                    x['source'],
                    x['dla'],
                    x['url'],
                )
                return

            x.update(versions[0])
            dlas[x['source']] = x

        first_of_month = datetime.datetime.now(
            datetime.timezone.utc,
        ).replace(day=1)

        for idx in range(self.MONTHS):
            dt = first_of_month - \
                dateutil.relativedelta.relativedelta(months=idx)

            self.info(
                "Getting announcements for {}/{:02} ...",
                dt.year,
                dt.month,
            )

            # Prefer later DLAs with reversed(..)
            for x in reversed(self.get_dlas(dt.year, dt.month)):
                if x['source'] in seen:
                    continue
                seen.add(x['source'])
                self.pool.spawn_n(download, x)

            self.pool.waitall()

        if not dlas:
            return 0

        sources = self.get_sources()

        for source, dla in sorted(dlas.items()):
            try:
                dla_version = Version(dla['version'])
            except ValueError:
                self.warn(
                    "{}: DLA-{} announced with invalid version: {} <{}>",
                    source,
                    dla['dla'],
                    dla['version'],
                    dla['url'],
                )
                continue

            # An announced package that is absent from the index is reported
            # rather than aborting the whole run with a KeyError.
            if source not in sources:
                self.warn(
                    "{}: DLA-{} announced version {} but LTS has no such "
                    "source package <{}>",
                    source,
                    dla['dla'],
                    dla_version,
                    dla['url'],
                )
                continue

            archive_version = Version(sources[source])

            if dla_version <= archive_version:
                continue

            self.warn(
                "{}: DLA-{} announced version {} but LTS has {} <{}>",
                source,
                dla['dla'],
                dla_version,
                archive_version,
                dla['url'],
            )

        return 0

    def get_dlas(self, year, month):
        """Return the announcements listed on one monthly index page.

        Each entry is a dict with the keys ``suffix``, ``dla`` and ``source``
        captured by ``re_line``, plus ``url``, the absolute address of the
        announcement.  Entries are in page order.
        """
        url = 'https://lists.debian.org/debian-lts-announce/{}/{:02}/'.format(
            year,
            month,
        )

        result = self.parse(url, self.re_line)

        # Prepend URL as the indices have relative URIs
        for x in result:
            x['url'] = '{}{}'.format(url, x['suffix'])

        return result

    def get_dla(self, url):
        """Return a list of ``{'version': ...}`` dicts found at ``url``.

        The list has one entry per line matching ``re_version`` and is empty
        when the page contains no such line.
        """
        return self.parse(url, self.re_version)

    def get_sources(self):
        """Download ``SOURCES`` and return a ``{package: version}`` dict.

        Raises ``requests.HTTPError`` if the download does not succeed.
        """
        self.info("Downloading Sources from {} ...", self.SOURCES)

        response = self.session.get(self.SOURCES)
        response.raise_for_status()

        val = gzip.decompress(response.content).decode('utf-8')

        return {x['Package']: x['Version'] for x in Sources.iter_paragraphs(val)}

    def parse(self, url, pattern):
        """Fetch ``url`` and return the group dicts of all matching lines.

        ``pattern`` is searched for in every line of the response body; the
        ``groupdict()`` of each match is collected in page order.
        """
        result = []

        for x in self.session.get(url).content.splitlines():
            # Undecodable bytes are replaced so that a single odd character
            # in an archived mail does not abort the run.
            m = pattern.search(x.decode('utf8', errors='replace'))

            if m is not None:
                result.append(m.groupdict())

        return result

    ##

    def warn(self, msg, *args, **kwargs):
        """Print a formatted ``W:`` message to stderr."""
        print("W: " + msg.format(*args, **kwargs), file=sys.stderr)

    def info(self, msg, *args, **kwargs):
        """Print a formatted ``I:`` message to stderr."""
        print("I: " + msg.format(*args, **kwargs), file=sys.stderr)


if __name__ == '__main__':
    eventlet.monkey_patch(socket=True)

    try:
        sys.exit(LTSMissingUploads().main(*sys.argv[1:]))
    except KeyboardInterrupt:
        sys.exit(1)