This is part 2 in my occasional
AnyEvent series
As we already discovered, signals don’t work in Strawberry Perl [on Windows]. As they are so useful, we really want a way to emulate them. I can think of a few approaches, but the most obvious is to have a userspace "kernel" sitting on a socket which all processes register with.
The kernel will respond to a few commands
register: [args]
list_processes # similar to ps
signal: [pid] [message] # similar to kill, but more flexible
A quick disclaimer: for the sake of brevity, I’ve omitted authentication and error handling.
Kernel Code
The server again uses AnyEvent; start_listen(), prepare_handler() and _accept_handler() should look pretty familiar.
Twiggy uses AE::io and checks errno for its conditions. I guess this is done for speed, as AnyEvent::Handle is much easier to use.
# Wrap a freshly accepted client socket in an AnyEvent::Handle and process
# the single command line that the client sends.
#
# Valid commands are:
#   register: [args]          - keep the connection open as a registered process
#   list_processes            - write the process table, then close
#   signal: [pid] [message]   - forward a message to a registered process,
#                               then close
#
# Parameters:
#   $self - the kernel object (holds the pid counter and the watchers table)
#   $sock - the connected client socket
sub watch_socket
{
    my ($self, $sock) = @_;

    # Closure variable: stays -1 until this connection registers.
    my $pid = -1;

    my $handle;
    $handle = AnyEvent::Handle->new(
        fh       => $sock,
        on_error => sub {
            say "Error: $! (PID $pid)";
            # Forget the registration too; otherwise a failed connection
            # leaves a stale entry holding a destroyed handle.
            delete $self->{watchers}{$pid};
            $handle->destroy;
        },
        on_eof => sub {
            say "Disconnected (PID $pid)";
            delete $self->{watchers}{$pid};
            $handle->destroy;
        },
    );

    $handle->push_read(line => sub {
        my ($handle, $command) = @_;
        DEBUG && say "Received [$command]";

        if ($command =~ /^register:\s+(.+)/) {
            my $args = $1;
            $pid = ++$self->{pid};
            $self->register_watcher($handle, $pid, $args);
            # A registered connection stays open: don't set on_drain(...)
            return;
        }
        elsif ($command eq 'list_processes') {
            $self->list_processes($handle);
        }
        elsif ($command =~ /^signal:\s+(\d+)\s+(.+)/) {
            # Named $target_pid so it does not shadow the closure's $pid.
            my ($target_pid, $message) = ($1, $2);
            $self->send_signal($handle, $target_pid, $message);
        }
        else {
            say 'Error: unrecognised command';
            $handle->push_write("Invalid command [$command]" . ${cr});
        }

        # For all commands apart from register, we want to close the
        # socket after sending the response, which we do by setting
        # on_drain(...)
        $handle->on_drain(sub { $handle->destroy(); });
    });
}
register_watcher
register_watcher stores the handle and the information about the process in the object, and informs the process which PID it has been given. (I am still considering having the registering process send $$ to use as the PID.)
# Record a newly registered process in the kernel's watcher table and tell
# the process which PID it has been assigned.
#
# Parameters:
#   $self   - the kernel object
#   $handle - AnyEvent::Handle for the registering connection
#   $pid    - PID allocated to this process
#   $args   - free-form text the process supplied with "register:"
sub register_watcher
{
    my ($self, $handle, $pid, $args) = @_;

    say "Registering [$args] (PID $pid)" if DEBUG;

    $handle->keepalive(1);

    my %entry = (handle => $handle, args => $args);
    $self->{watchers}{$pid} = \%entry;

    $handle->push_write("Connected. (PID $pid)" . $cr);

    # An on_read callback has to be installed, otherwise disconnects from
    # the client side are not always detected.
    $handle->on_read(sub { });
}
list_processes
list_processes returns a list of the processes that are registered with the kernel. This is similar to, but uglier than, ps(1).
# Write one "PID: args" line per registered process to the requesting
# connection, in ascending numeric PID order.
#
# Parameters:
#   $self   - the kernel object
#   $handle - AnyEvent::Handle of the connection that asked for the list
sub list_processes
{
    my ($self, $handle) = @_;

    $handle->push_write(join(': ', $_, $self->{watchers}{$_}{args}) . $cr)
        for sort { $a <=> $b } keys %{ $self->{watchers} };
}
send_signal
send_signal looks up the handle belonging to the process in $self->{watchers}. If it is registered, it is possible to send any message to the process, otherwise it returns an error message. I’ve added special handling for a disconnect message, allowing the kernel to deregister the target process and close its connection once the message has been sent.
This is the dangerous bit. kill(1) has some protection in that you are only able to send signals to processes you own (unless you are root). The implementation, as presented here, allows anyone who can telnet to the kernel port to send any message to any process that is registered. It would be necessary to add a layer of authentication and process ownership if the goal really was to provide something similar to unix signals.
# Forward a message to a registered process, or report to the sender that
# the PID is unknown. The message 'disconnect' additionally deregisters the
# target and closes its connection once the message has been written.
#
# Parameters:
#   $self    - the kernel object
#   $handle  - AnyEvent::Handle of the sender (receives error replies)
#   $pid     - PID of the process to signal
#   $message - text to deliver
sub send_signal
{
    my ($self, $handle, $pid, $message) = @_;

    if (exists $self->{watchers}{$pid}) {
        my $target = $self->{watchers}{$pid}{handle};
        $target->push_write(join('', 'signal: ', $message, $cr));

        if ($message eq 'disconnect') {
            delete $self->{watchers}{$pid};
            # Close the target only after the signal line has gone out.
            $target->on_drain(sub { $target->destroy(); });
        }
    }
    else {
        say "Error: PID $pid does not exist";
        $handle->push_write("PID $pid does not exist" . $cr);
    }
}
kernel.pl – complete source
#!perl
use 5.010;
use strict;
use warnings;
package Kernel;
# Debug output is switched on by setting the KERNEL_DEBUG environment
# variable to a true value.
use constant DEBUG => $ENV{KERNEL_DEBUG};
# Line terminator (CR LF) appended to every line written to a client.
my $cr = "\015\012";
use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Socket;
# Constructor. Returns a kernel object whose PID counter starts at 0 and
# whose table of registered processes (watchers) is empty.
sub new
{
    my ($class) = @_;

    return bless { pid => 0, watchers => {} }, $class;
}
# Record a newly registered process in the kernel's watcher table and tell
# the process which PID it has been assigned.
#
# Parameters:
#   $self   - the kernel object
#   $handle - AnyEvent::Handle for the registering connection
#   $pid    - PID allocated to this process
#   $args   - free-form text the process supplied with "register:"
sub register_watcher
{
    my ($self, $handle, $pid, $args) = @_;

    say "Registering [$args] (PID $pid)" if DEBUG;

    my %entry = (handle => $handle, args => $args);
    $self->{watchers}{$pid} = \%entry;

    $handle->push_write("Connected. (PID $pid)" . $cr);

    # An empty on_read callback is installed on the handle.
    # NOTE(review): presumably needed so client-side disconnects are
    # noticed - confirm against AnyEvent::Handle.
    $handle->on_read(sub { });
}
# Write one "PID: args" line per registered process to the requesting
# connection, in ascending PID order.
#
# Parameters:
#   $self   - the kernel object
#   $handle - AnyEvent::Handle of the connection that asked for the list
sub list_processes
{
    my ($self, $handle) = @_;

    # PIDs are integers, so compare numerically; a plain sort is a string
    # sort and would list PID 10 before PID 2.
    foreach my $pid (sort { $a <=> $b } keys %{$self->{watchers}}) {
        my $args = $self->{watchers}{$pid}{args};
        $handle->push_write("$pid: $args" . ${cr});
    }
}
# Forward a message to a registered process, or report to the sender that
# the PID is unknown. The message 'disconnect' additionally deregisters the
# target and closes its connection once the message has been written.
#
# Parameters:
#   $self    - the kernel object
#   $handle  - AnyEvent::Handle of the sender (receives error replies)
#   $pid     - PID of the process to signal
#   $message - text to deliver
sub send_signal
{
    my ($self, $handle, $pid, $message) = @_;

    if (exists $self->{watchers}{$pid}) {
        my $target = $self->{watchers}{$pid}{handle};
        $target->push_write(join('', 'signal: ', $message, $cr));

        if ($message eq 'disconnect') {
            delete $self->{watchers}{$pid};
            # Close the target only after the signal line has gone out.
            $target->on_drain(sub { $target->destroy(); });
        }
    }
    else {
        say "Error: PID $pid does not exist";
        $handle->push_write("PID $pid does not exist" . $cr);
    }
}
# Wrap a freshly accepted client socket in an AnyEvent::Handle and process
# the single command line that the client sends.
#
# Valid commands are:
#   register: [args]          - keep the connection open as a registered process
#   list_processes            - write the process table, then close
#   signal: [pid] [message]   - forward a message to a registered process,
#                               then close
#
# Parameters:
#   $self - the kernel object (holds the pid counter and the watchers table)
#   $sock - the connected client socket
sub watch_socket
{
    my ($self, $sock) = @_;

    # Closure variable: stays -1 until this connection registers.
    my $pid = -1;

    my $handle;
    $handle = AnyEvent::Handle->new(
        fh       => $sock,
        on_error => sub {
            say "Error: $! (PID $pid)";
            # Forget the registration too; otherwise a failed connection
            # leaves a stale entry holding a destroyed handle.
            delete $self->{watchers}{$pid};
            $handle->destroy();
        },
        on_eof => sub {
            say "Disconnected (PID $pid)";
            delete $self->{watchers}{$pid};
            $handle->destroy();
        },
    );

    $handle->push_read(line => sub {
        my ($handle, $command) = @_;
        DEBUG && say "Received [$command]";

        if ($command =~ /^register:\s+(.+)/) {
            my $args = $1;
            $pid = ++$self->{pid};
            # keepalive() takes a boolean; called without an argument it
            # does not enable keepalive, so pass an explicit true value.
            $handle->keepalive(1);
            $self->register_watcher($handle, $pid, $args);
            # A registered connection stays open: no on_drain handler.
            return;
        }
        elsif ($command eq 'list_processes') {
            $self->list_processes($handle);
        }
        elsif ($command =~ /^signal:\s+(\d+)\s+(.+)/) {
            # Named $target_pid so it does not shadow the closure's $pid.
            my ($target_pid, $message) = ($1, $2);
            $self->send_signal($handle, $target_pid, $message);
        }
        else {
            say 'Error: unrecognised command';
            $handle->push_write("Invalid command [$command]" . ${cr});
        }

        # Every command except register closes the socket once the
        # response has been written out.
        $handle->on_drain(sub { $handle->destroy(); });
    });
}
# Prepare callback handed to tcp_server; logs the listening address when
# DEBUG is enabled.
#
# Parameters:
#   $fh    - the listening socket
#   $host  - address the socket is bound to
#   $port  - port the socket is bound to
#
# Returns 0. tcp_server uses this callback's return value as the listen
# queue length, with 0 selecting the default. Without the explicit return,
# enabling DEBUG made the sub return warn's result (1), i.e. a backlog of 1.
sub prepare_handler
{
    my ($fh, $host, $port) = @_;
    DEBUG && warn "Listening on $host:$port\n";
    return 0;
}
# Build the accept callback passed to tcp_server. The returned closure
# hands every accepted socket to watch_socket() on this kernel object.
sub _accept_handler
{
    my ($self) = @_;

    my $on_accept = sub {
        my ($sock, $peer_host, $peer_port) = @_;

        warn "Accepted connection from $peer_host:$peer_port\n" if DEBUG;

        # Nothing to watch without a socket.
        unless ($sock) {
            warn '$sock undefined' . "\n";
            return;
        }

        $self->watch_socket($sock);
    };

    return $on_accept;
}
# Start the TCP server on $host:$port and keep what tcp_server returns in
# $self->{server}.
#
# Parameters:
#   $self - the kernel object
#   $host - address to listen on
#   $port - port to listen on
sub start_listen
{
    my ($self, $host, $port) = @_;

    my $on_accept = $self->_accept_handler();
    my $server    = tcp_server($host, $port, $on_accept, \&prepare_handler);

    $self->{server} = $server;
}
# Script entry point: create a kernel, start listening, run the event loop.
package main;
# NOTE(review): presumably an undef host makes tcp_server listen on all
# interfaces - confirm against the AnyEvent::Socket documentation.
my $host = undef;
my $port = 12345;
# $kernel stores the value returned by tcp_server in $kernel->{server}.
my $kernel = Kernel->new();
$kernel->start_listen($host, $port);
# Wait on a fresh condition variable; nothing in this file ever signals it.
AE::cv->recv();
Read Full Post »