#!/usr/bin/env python import hashlib import logging import os import shlex import sqlite3 import sys import time from argparse import ArgumentParser from pathlib import Path parser = ArgumentParser( description="Only allow a program to run N times every I seconds" ) parser.add_argument( "-n", "--count", metavar="N", type=int, default=4, help="Max executions per interval (default 3)", ) parser.add_argument( "-i", "--interval", metavar="I", type=float, default=30, help="Interval to limit execution count within (seconds, default 60)", ) parser.add_argument( "-k", "--key", type=str, help="Unique identifier for this command (defaults to command)", ) parser.add_argument( "-v", "--verbose", action="store_true", default=False, help="Enable debug output" ) parser.add_argument("cmd", help="Command to run") parser.add_argument("args", nargs="*", help="Arguments to command") args = parser.parse_args() if args.verbose: logging.basicConfig(level=logging.DEBUG) cmd_str = shlex.join([args.cmd] + args.args) if args.key is None: key = shlex.join(cmd_str) else: key = args.key key_hash = hashlib.sha256(key.encode("utf-8")).hexdigest() home = Path(os.environ["HOME"]) data_home = Path(os.environ.get("XDG_DATA_HOME", home / ".local/share")) data_home.mkdir(parents=True, exist_ok=True) data_path = data_home / "ratelimit.sqlite3" logging.debug("data file: %s", data_path) logging.debug("cmd: %s", cmd_str) now = time.monotonic() logging.debug("now: %f", now) con = sqlite3.connect(data_path) cur = con.cursor() cur.execute( """ create table if not exists ratelimit ( key_hash text not null primary key, deadline real not null, count integer not null ) """ ) cur.execute("delete from ratelimit where deadline < ?", [now]) row = cur.execute( "select deadline, count from ratelimit where key_hash = ?", [key_hash] ).fetchone() if row is None: execute = True cur.execute( "insert into ratelimit (key_hash, deadline, count) values (?, ?, ?)", [key_hash, now + args.interval, 1], ) logging.debug("cmd never executed") else: deadline, count = row logging.debug("cmd executed %d times", count) if now < deadline: if count < args.count: logging.debug("can execute %d more times", args.count - count) execute = True cur.execute( "update ratelimit set deadline = ?, count = ? where key_hash = ?", [now + args.interval, count + 1, key_hash], ) else: logging.debug( "interval hasn't passed (wait %f seconds)", deadline - now, ) execute = False else: logging.debug("interval has passed") execute = True cur.execute( "update ratelimit set deadline = ?, count = ? where key_hash = ?", [now + args.interval, 1, key_hash], ) con.commit() if execute: os.execl(args.cmd, args.cmd, *args.args) else: sys.exit(127)