Skip to content

Commit

Permalink
Use inotify to detect when there's work to be done
Browse files Browse the repository at this point in the history
Rather than signals (which can only be sent by a process with the same
uid), use inotify to detect when there's work to process in the upload or
staging directories, or when a change has been made in the relarea.
  • Loading branch information
jon-turney committed Mar 21, 2024
1 parent 8b01a6e commit a7b9859
Showing 1 changed file with 87 additions and 66 deletions.
153 changes: 87 additions & 66 deletions calm/calm.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import sys
import tempfile
import time
from enum import Flag, auto, unique

import xtarfile

Expand Down Expand Up @@ -660,54 +661,38 @@ def do_output(args, state):
# daemonization loop
#

@unique
class Event(Flag):
    """Bit flags naming the kinds of work the daemon loop has pending.

    Members can be OR-ed together to request several actions at once;
    ``Event(0)`` is the empty set and is falsy, meaning "nothing to do".
    """

    # uploaded packages need to be (re-)processed
    read_uploads = auto()
    # the release area needs to be (re-)read
    read_relarea = auto()


def do_daemon(args, state):
import daemon
import inotify.adapters
import lockfile.pidlockfile

logging.getLogger('inotify.adapters').propagate = False

context = daemon.DaemonContext(
stdout=sys.stdout,
stderr=sys.stderr,
umask=0o002,
pidfile=lockfile.pidlockfile.PIDLockFile(args.daemon))

# XXX: running flag isn't actually doing anything anymore so can be removed
running = True
read_relarea = True
read_uploads = True
last_signal = None

# signals! the first, and best, interprocess communications mechanism! :)
def sigusr1(signum, frame):
logging.debug("SIGUSR1")
nonlocal last_signal
last_signal = signum
nonlocal read_uploads
read_uploads = True

def sigusr2(signum, frame):
logging.debug("SIGUSR2")
nonlocal last_signal
last_signal = signum
nonlocal read_relarea
read_relarea = True

def sigalrm(signum, frame):
logging.debug("SIGALRM")
nonlocal last_signal
last_signal = signum
nonlocal read_relarea
read_relarea = True
nonlocal read_uploads
read_uploads = True
# do all actions initially
action = Event.read_uploads | Event.read_relarea
saw_events = False

def sigterm(signum, frame):
logging.debug("SIGTERM")
nonlocal running
running = False
raise InterruptedError

context.signal_map = {
signal.SIGUSR1: sigusr1,
signal.SIGUSR2: sigusr2,
signal.SIGALRM: sigalrm,
signal.SIGTERM: sigterm,
}

Expand All @@ -718,60 +703,96 @@ def sigterm(signum, frame):

state.packages = {}

# watch for changes in relarea, upload and staging directories
i = inotify.adapters.InotifyTrees([args.rel_area, args.homedir, args.stagingdir],
mask=inotify.constants.IN_CREATE | inotify.constants.IN_DELETE | inotify.constants.IN_CLOSE_WRITE | inotify.constants.IN_ATTRIB | inotify.constants.IN_MOVED_TO,
block_duration_s=60)

try:
while running:
with mail_logs(state):
# re-read relarea on SIGALRM or SIGUSR2
if read_relarea:
if last_signal != signal.SIGALRM:
irk.irk("calm processing release area")
read_relarea = False
state.packages = process_relarea(args, state)

if not state.packages:
logging.error("errors in relarea, not processing uploads or writing setup.ini")
else:
if read_uploads:
if last_signal != signal.SIGALRM:
irk.irk("calm processing uploads")
# read uploads on SIGUSR1
read_uploads = False
state.packages = process_uploads(args, state)

do_output(args, state)
if action:
with mail_logs(state):
if Event.read_relarea in action:
if saw_events:
irk.irk("calm processing release area")
state.packages = process_relarea(args, state)

if not state.packages:
logging.error("errors in relarea, not processing uploads or writing setup.ini")
else:
if Event.read_uploads in action:
if saw_events:
irk.irk("calm processing uploads")
state.packages = process_uploads(args, state)

# if there is more work to do, but don't spin if we
# can't do anything because relarea is bad
if read_uploads:
continue
do_output(args, state)

# if there is more work to do
if read_relarea:
continue
if saw_events:
irk.irk("calm processing done")

# we wake at a 10 minute offset from the next 240 minute boundary
# (i.e. at :10 past every fourth hour) to check the state of the
# release area, in case someone has ninja-ed in a change there...
interval = 240 * 60
offset = 10 * 60
delay = interval - ((time.time() - offset) % interval)
signal.alarm(int(delay))
next_scan_time = time.time() + delay

if action:
logging.info("next rescan in %d seconds" % (delay))

action = Event(0)
saw_events = False
depth = args.rel_area.count(os.path.sep) + 1

# It would be nice to use inotify.adapters' timeout feature so
# we go at least a few seconds without events, to ensure that we
# don't start processing in the middle of a flurry of events.
# Unfortunately, that goes back to waiting for the full
# block_duration_s if timeout hasn't expired...
for event in i.event_gen(yield_nones=True):
if event is not None:
logging.debug("inotify event %s" % str(event))
saw_events = True
(_, type_names, path, filename) = event
if path.startswith(args.rel_area):
# ignore sha512.sum and modifications to setup.*
# files in the arch directory
if (filename != 'sha512.sum') and (path.count(os.path.sep) > depth):
action |= Event.read_relarea
elif path.startswith(args.stagingdir):
action |= Event.read_uploads
elif (path.startswith(args.homedir)) and (filename == "!ready"):
action |= Event.read_uploads
else:
# None means no more events are currently available, so
# break to process actions
break

# wait until interrupted by a signal
if last_signal != signal.SIGALRM:
irk.irk("calm processing done")
logging.info("sleeping for %d seconds" % (delay))
signal.pause()
logging.info("woken")
if not saw_events:
if time.time() > next_scan_time:
logging.debug("scheduled rescan")
action |= (Event.read_uploads | Event.read_relarea)

if action:
logging.info("woken, actions %s" % action)

except InterruptedError:
# inotify module has the annoying behaviour of eating any EINTR
# returned by the poll on inotify fd, assuming it's indicating a
# timeout rather than a signal
#
# so we arrange for signals to raise an InterruptedError
# exception, to pop out here
irk.irk("calm daemon stopped by SIGTERM")

# cancel any pending alarm
signal.alarm(0)
except Exception as e:
with BufferingSMTPHandler(toaddrs=args.email, subject='calm stopping due to unhandled exception'):
logging.error("exception %s" % (type(e).__name__), exc_info=True)
irk.irk("calm daemon stopped due to unhandled exception")

else:
irk.irk("calm daemon stopped")
irk.irk("calm daemon stopped for unknown reason")

logging.info("calm daemon stopped")

Expand Down

0 comments on commit a7b9859

Please sign in to comment.