#!/usr/bin/python3 -tt # # Author: Toshio Kuratomi # Copyright 2012 Toshio Kuratomi # License: GPLv2+ # # This copyrighted material is made available to anyone wishing to use, modify, # copy, or redistribute it subject to the terms and conditions of the GNU # General Public License v.2, or (at your option) any later version. This # program is distributed in the hope that it will be useful, but WITHOUT ANY # WARRANTY expressed or implied, including the implied warranties of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General # Public License for more details. You should have received a copy of the GNU # General Public License along with this program; if not, write to the Free # Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA # 02110-1301, USA. Any Red Hat trademarks that are incorporated in the source # code or documentation are not subject to the GNU General Public License and # may only be used or replicated with the express permission of Red Hat, Inc. # # Add to crontab # # sudo crontab -e # */1 * * * * /home/badger/bin/nanny.py # Sample every minute __version__ = '1.0.0' __version_info__ = ['1','0','0'] import os import re import sys import copy import sqlite3 import argparse import datetime import subprocess TIMED_LOGINS = ('username', 'to', 'check', 'and', 'logout') DEFAULT_SAMPLES = 30 DB_FILE = '/var/lib/nanny/timed_logins.sqlite' DBUS_SESSION_RE = re.compile('^DBUS_SESSION_BUS_ADDRESS=\(.*\)$') def initialize_db(): '''Create database directory and database if they don't exist''' nanny_dir = os.path.dirname(DB_FILE) if not os.path.isdir(nanny_dir): os.makedirs(nanny_dir, mode=0o755) if not os.path.exists(DB_FILE): # Create the DB_FILE db = sqlite3.connect(DB_FILE) cursor = db.cursor() # sqlite3 doesn't seem to substitute ? in create table statements cursor.execute('create table times (username text, date timestamp,' ' quota integer default %s, samples integer default 0)' % DEFAULT_SAMPLES) db.commit() def take_sample(): '''Determine who is currently logged in''' # universal_newlines to get back str instead of bytes cmd = subprocess.Popen(['who'], stdout=subprocess.PIPE, universal_newlines=True) output = cmd.communicate() for record in output[0].split('\n'): fields = record.split() if fields and fields[1].startswith(':'): yield fields[0], fields[1] def update_sample(username, quota=None, increment=1): today = datetime.datetime.today() db = sqlite3.connect(DB_FILE) cursor = db.cursor() cursor.execute('select samples, quota from times where username = ? and date = ?', (username, today.strftime('%Y-%m-%d'))) samples = cursor.fetchall() if len(samples) == 0: if quota is None: quota = DEFAULT_SAMPLES cursor.execute('insert into times (username, date, quota, samples)' ' values(?, ?, ?, ?)', (username, today.strftime('%Y-%m-%d'), quota, increment)) elif len(samples) > 1: if quota is None: quota = DEFAULT_SAMPLES total = increment for sample in samples: total = total + sample[0] if sample[1] != DEFAULT_SAMPLES: quota = sample[1] cursor.execute('delete from times where username = ? and date = ?', (username, today.strftime('%Y-%m-%d'))) cursor.execute('insert into times (username, date, quota, samples)' ' values(?, ?, ?, ?)', (username, today.strftime('%Y-%m-%d'), quota, total)) else: total = samples[0][0] + increment if quota is None: quota = samples[0][1] cursor.execute('update times set samples = ?, quota = ? where' ' username = ? and date = ?', (total, quota, username, today.strftime('%Y-%m-%d'))) db.commit() db.close() def notify(username, message, display=':0'): '''Use the notification daemon to notify the person their session is ending''' env = copy.copy(os.environ) env['DISPLAY'] = display # The notification daemon requires you're able to talk to the user's dbus directory = os.path.expanduser('~{}/.dbus/session-bus'.format(username)) newest = None newest_ts = 0 for filename in os.listdir(directory): stats = os.stat(os.path.join(directory, filename)) if stats.st_mtime > newest_ts: newest_ts = stats.st_mtime newest = os.path.join(directory, filename) for line in open(newest).readline(): match = DBUS_SESSION_RE.match(line) if match: env['DBUS_SESSION_BUS_ADDRESS'] = match.group(1) continue cmd = subprocess.Popen(['sudo', '-u', username, 'notify-send', '-t', '1000', message], env=env) cmd.wait() def check_sample(username, display): today = datetime.datetime.today() db = sqlite3.connect(DB_FILE) cursor = db.cursor() # look at sample for today cursor.execute('select samples, quota from times where username = ? and date = ?', (username, today.strftime('%Y-%m-%d'))) # if the total sample is over max samples = cursor.fetchall() if samples and samples[0][0] > samples[0][1]: # killall processes with that username cmd = subprocess.Popen(['pkill', '-u', username]) cmd.communicate() elif samples and (samples[0][0] + 2) == samples[0][1]: # Warn that the time is almost up notify(username, 'Two minutes left', display=display) db.close() def sample_handler(args): for person, display in take_sample(): update_sample(person) if person in TIMED_LOGINS: check_sample(person, display) def list_handler(args): '''List used quotas for today''' today = datetime.datetime.today() db = sqlite3.connect(DB_FILE) cursor = db.cursor() cursor.execute('select username, samples, quota from times where date = ?', (today.strftime('%Y-%m-%d'),)) print('Quota\tUsed\tUser') for record in cursor.fetchall(): quota = 'n/a' if record[0] in TIMED_LOGINS: quota = record[2] print('{}\t{}\t{}'.format(quota, record[1], record[0])) def set_handler(args): '''Set new quotas for people''' update_sample(args.username, args.quota, increment=0) # Silly argparse, exit() is for programs, not libraries! def handle_errors(msg): raise Exception() def parse_args(args=None): '''Parse the arguments and setup for processing''' parser = argparse.ArgumentParser( description='Manage how long a user can be logged in', epilog='''This program should be setup to run from cron (as root) every minute. It will then start recording who is logged into the computer every time it runs. There are a few variables to fill in at the top of the program. These will allow you to set which users will be logged out if they use up their time and set some defaults. The list and set subcommands allow you to change values on a given day. ''') subparsers = parser.add_subparsers() samp_parser = subparsers.add_parser('sample', help='Record who is logged in at this time [default]') samp_parser.set_defaults(handler=sample_handler) set_parser = subparsers.add_parser('set', help='Set a new time quota for a user') set_parser.set_defaults(handler=set_handler) set_parser.add_argument('username', action='store', help='Account to set a new quota for') set_parser.add_argument('quota', action='store', type=int, help='Amount to set quota to') list_parser = subparsers.add_parser('list', help='List quotas and time used today for users on the system') list_parser.set_defaults(handler=list_handler) old_error = parser.error parser.error = handle_errors exception = False try: parsed_args = parser.parse_args(args) except: exception = True if exception: parser.error = old_error if args == None: # sys.argv args = copy.copy(sys.argv) args[0] = 'sample' else: if args[0] not in ('sample', 'set'): args.insert(0, 'sample') parsed_args = parser.parse_args(args) return parsed_args if __name__ == '__main__': initialize_db() args = parse_args() args.handler(args)