#!/usr/bin/python3 import argparse import collections from datetime import datetime, timedelta import os import re import subprocess import sys try: import humanfriendly except ImportError: humanfriendly = None def format_date(timestamp): date_to_format = datetime.utcfromtimestamp(timestamp) delta = datetime.utcnow() - date_to_format output = date_to_format.strftime('%Y-%m-%d %H:%M') if delta.days > 1: output += ' ({} days ago)'.format(delta.days) elif delta.days == 1: output += ' (yesterday)' return output parser = argparse.ArgumentParser(description="Review DSA/DLA needed queues") parser.add_argument('--lts', action='store_true', help='Review dla-needed.txt instead of dsa-needed.txt') parser.add_argument('-v', '--verbose', action='store_true', help='Show more information, e.g. notes, commit author and per user stats') parser.add_argument('--quiet', action='store_true', help='Do not output anything but errors') parser.add_argument('--sort-by', default='last-update', choices=('last-update', 'claimed-date'), help='Sort by last-update (default) or by claimed-date') parser.add_argument('--skip-unclaimed', action='store_true', help='Skip unclaimed packages in the review') if humanfriendly: parser.add_argument('--unclaim', default=None, metavar='N', nargs='?', const='1w', help='Automatically unclaim entries older than specified delta (default: %(default)s)') else: parser.add_argument('--unclaim', default=None, metavar='N', type=int, nargs='?', const=604800, help='Automatically unclaim entries older than N seconds (default: %(default)s)') parser.add_argument('--exclude', nargs='+', metavar='PACKAGE', default=[], help='completely ignore packages specified PACKAGE') parser.add_argument('--exist-in', nargs='+', metavar='SUITE', default=[], help='query rmadison for existence in SUITE') args = parser.parse_args() if args.verbose and args.quiet: args.error("--verbose and --quiet contradiction") if args.unclaim: if humanfriendly: unclaim_delta = timedelta(seconds=humanfriendly.parse_timespan(args.unclaim)) else: unclaim_delta = timedelta(seconds=args.unclaim) if args.lts: dsa_dla_needed = 'data/dla-needed.txt' else: dsa_dla_needed = 'data/dsa-needed.txt' if not os.path.exists(dsa_dla_needed): args.error("ERROR: {} not found\n".format(dsa_dla_needed)) if not os.path.exists(".git"): args.error("ERROR: works only in a git checkout\n") process = subprocess.Popen(["git", "blame", "--line-porcelain", "--", dsa_dla_needed], stdout=subprocess.PIPE) context = {} in_preamble = True all_entries = [] per_user = collections.defaultdict(list) entry = None for line in process.stdout: line = line.decode('utf-8') res = re.search(r'^([0-9a-f]{40}) \d+', line) if res: context['commit'] = res.group(1) if line.startswith('author '): context['author'] = line.strip().split()[1] elif line.startswith('author-time '): context['author-time'] = int(line.strip().split()[1]) elif line.startswith('summary '): context['summary'] = line.strip().split(maxsplit=1)[1] elif line.startswith("\t"): line = line[1:] if line.startswith("--"): in_preamble = False entry = None elif in_preamble: continue elif line[0] == ' ' or line[0] == "\t": entry['note'] += line if context['author-time'] > entry['last-update']: entry['last-update'] = context['author-time'] entry['last-update-author'] = context['author'] entry['last-update-summary'] = context['summary'] entry['last-update-commit'] = context['commit'] else: res = re.match(r'^(\S+)(?:\s+\((.*)\)\s*)?$', line) entry = { 'pkg': res.group(1), 'claimed-by': res.group(2), 'claimed-date': context['author-time'], 'last-update': context['author-time'], 'last-update-author': context['author'], 'last-update-summary': context['summary'], 'last-update-commit': context['commit'], 'author': context['author'], 'note': '', } if entry['pkg'] not in args.exclude: per_user[entry['claimed-by']].append(entry['pkg']) all_entries.append(entry) retcode = process.wait() if retcode != 0: sys.stderr.write("WARNING: git blame returned error code {}\n".format(retcode)) all_entries.sort(key=lambda x: x[args.sort_by]) if args.exist_in: process = subprocess.Popen(["rmadison", "-u", "debian", "-a", "source", "-s", ",".join(args.exist_in), " ".join([entry['pkg'] for entry in all_entries])], stdout=subprocess.PIPE) rmadison = [] for line in process.stdout: pkg, ver, suite, arch = [f.strip() for f in line.decode('utf-8').split('|')] rmadison.append((pkg, suite)) for entry in all_entries: in_suites = set() for suite in args.exist_in: if (entry['pkg'], suite) in rmadison: in_suites.add(suite) missing_from = set(args.exist_in).difference(in_suites) if len(missing_from) > 0: entry.update({'missing-from': ",".join(missing_from)}) else: entry.update({'missing-from': None}) retcode = process.wait() if retcode != 0: sys.stderr.write("WARNING: rmadison returned error code {}\n".format(retcode)) unclaim_pkgs = [] for entry in all_entries: if args.skip_unclaimed and not entry['claimed-by']: continue args.quiet or print("Package: {}".format(entry['pkg'])) if entry['missing-from']: args.quiet or print("Missing-From: {}".format(entry['missing-from'])) if entry['claimed-by']: args.quiet or print("Claimed-By: {}".format(entry['claimed-by'])) args.quiet or print("Claimed-Date: {}".format(format_date(entry['claimed-date']))) if args.unclaim: if entry['last-update'] > entry['claimed-date']: date_to_format = datetime.utcfromtimestamp(entry['last-update']) else: date_to_format = datetime.utcfromtimestamp(entry['claimed-date']) if datetime.utcnow() - date_to_format > unclaim_delta: unclaim_pkgs.append(entry['pkg']) args.quiet or print("Unclaimed: idle for more than {}: {}".format(unclaim_delta, datetime.utcnow() - date_to_format)) else: args.quiet or print("Unclaimed-Since: {}".format(format_date(entry['claimed-date']))) if entry['last-update'] > entry['claimed-date']: args.quiet or print("Last-Update: {}".format(format_date(entry['last-update']))) if not args.verbose: args.quiet or print("") continue print("Last-Update-Author: {}".format(entry['last-update-author'])) print("Last-Update-Summary: {}".format(entry['last-update-summary'])) print("Last-Update-Commit: {}".format(entry['last-update-commit'])) if entry['note']: print("Notes:\n{}".format(entry['note'])) else: print("") if args.unclaim: args.quiet or print("Editing file to unclaim: {}".format(", ".join(unclaim_pkgs))) in_preamble = True with open(dsa_dla_needed) as orig, open(dsa_dla_needed + '.new', 'w') as new: for line in orig: if line.startswith('--'): in_preamble = False if in_preamble: new.write(line) # do not touch preamble else: # look for packages to unclaim in this line for pkg in unclaim_pkgs: if line.startswith(pkg + " ("): new.write(pkg + "\n") break else: # nothing found, write untouched line new.write(line) os.rename(dsa_dla_needed + '.new', dsa_dla_needed) if args.verbose: # sort by number of claimed packages items = sorted(per_user.items(), key=lambda x: len(x[1])) for user, pkgs in items: print("User: {}\nPackages: {}\nCount: {}\n".format(user, ", ".join(pkgs), len(pkgs)))