The other day I was looking at a script that ran a number of more or less independent jobs, four at a time.
I’ve reproduced the core of the script as best I can remember it.
Job
It has a class to represent the jobs themselves.
```perl
package Job;
use Moose;

has identifier => (
    is       => 'ro',
    required => 1,
);

has cmd => (
    is       => 'ro',
    required => 1,
);

no Moose;
__PACKAGE__->meta->make_immutable;
```
Job Manager
It also has a class that tries to keep four jobs running in parallel wherever possible.
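```perl
package JobManager;
use Moose;
use feature 'say';    # say() needs this; Moose only turns on strict and warnings
use POSIX 'strftime';

# Start time of the manager (HHMMSS), used to tag the output file names
has identifier => (
    is      => 'ro',
    default => sub { strftime('%H%M%S', localtime(time())); },
);

# Maximum number of jobs allowed to run at the same time
has max_processes => (
    is      => 'ro',
    default => 4,
);

# Private counter used to number jobs sequentially
has _job_id => (
    is       => 'ro',
    writer   => '_set_job_id',
    init_arg => undef,
    default  => 1,
);

# Jobs waiting for a free slot, in FIFO order
has queued_jobs => (
    is      => 'ro',
    traits  => ['Array'],
    isa     => 'ArrayRef[Job]',
    default => sub { [] },
    handles => {
        enqueue_job       => 'push',
        dequeue_job       => 'shift',
        exist_queued_jobs => 'count',
    },
);

# Jobs currently running, keyed by child PID
has running_jobs => (
    is      => 'ro',
    traits  => ['Hash'],
    isa     => 'HashRef[Job]',
    default => sub { {} },
    handles => {
        add_running_job    => 'set',
        delete_running_job => 'delete',
        num_jobs           => 'count',
    },
);

# Return the next job number as a zero-padded string ("01", "02", ...)
sub next_job_id
{
    my $self = shift;
    my $job_id = $self->_job_id();
    $self->_set_job_id($job_id + 1);
    return sprintf "%02d", $job_id;
}

# Fork a child that runs the job's command, with output sent to /tmp
sub run_job
{
    my ($self, $job) = @_;
    my ($identifier, $cmd) = ($job->identifier(), $job->cmd());
    my $pid = fork();
    if (! defined($pid)) {
        # fork failed: the job is reported and dropped, not retried
        say "Failed to run job $identifier";
    } elsif ($pid) {
        # parent: remember the child so main_loop can reap it
        say "Running job $identifier ($pid)";
        $self->add_running_job($pid, $job);
    } else {
        # child: run the command, then exit instead of returning to the caller
        system("$cmd > /tmp/$identifier.output 2>&1");
        exit;
    }
}

# Create a job; run it now if a slot is free, otherwise queue it
sub add_job
{
    my ($self, $name, $cmd) = @_;
    my $job = Job->new(
        identifier => (sprintf "%s_${name}_%s",
                       $self->next_job_id(), $self->identifier()),
        cmd        => $cmd);
    # >= (not >) so that at most max_processes jobs run at once
    if ($self->num_jobs() >= $self->max_processes()) {
        $self->enqueue_job($job);
    } else {
        $self->run_job($job);
    }
}

# Reap children as they exit and refill the free slots from the queue
sub main_loop
{
    my $self = shift;
    while (1) {
        my $pid = wait();
        last if ($pid < 0);    # -1 means there are no children left
        say "Child $pid has exited";
        $self->delete_running_job($pid);
        while ($self->num_jobs() < $self->max_processes()) {
            last unless $self->exist_queued_jobs();
            $self->run_job($self->dequeue_job());
        }
    }
}

no Moose;
__PACKAGE__->meta->make_immutable;
```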
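Without Moose, the scheduling idea at the heart of JobManager is small. Start children until the limit is reached, block in wait() until one exits, then refill the freed slot from the queue. Here is a minimal core-Perl sketch of that loop. It is not the script's code: run_limited(), its arguments, and the use of code references as jobs are all assumptions made for illustration. It also records the peak number of live children, so you can check that the limit really holds.

```perl
use strict;
use warnings;

# Hypothetical helper: run each code reference in its own child process,
# keeping at most $max children alive at once.
# Returns (peak number of live children, number of children reaped).
sub run_limited {
    my ($max, @queue) = @_;
    my %running;                      # pid => 1 for each live child
    my ($peak, $finished) = (0, 0);

    while (@queue || %running) {
        # Start jobs until the limit is reached or the queue is empty.
        while (@queue && keys(%running) < $max) {
            my $job = shift @queue;
            my $pid = fork();
            die "fork failed: $!" unless defined $pid;
            if ($pid == 0) {          # child: run the job, then exit
                $job->();
                exit 0;
            }
            $running{$pid} = 1;       # parent: remember the child
            my $live = keys %running;
            $peak = $live if $live > $peak;
        }
        # Block until any child exits, then free its slot.
        my $pid = wait();
        last if $pid < 0;             # no children left
        delete $running{$pid};
        $finished++;
    }
    return ($peak, $finished);
}

# Six jobs that each sleep for a fraction of a second, two at a time.
my @jobs = map { my $n = $_; sub { select(undef, undef, undef, 0.1 * $n) } } 1 .. 6;
my ($peak, $finished) = run_limited(2, @jobs);
print "peak=$peak finished=$finished\n";
```

With six jobs and a limit of two, this should report a peak of 2 and six reaped children. Because the fork and the wait share one loop, a slot is refilled as soon as any child exits. The loop does not wait for a whole batch to finish first.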
Test Code
Here is the test code I used to check that I had reconstructed the script more or less correctly, followed by the output files it produced.
```perl
my $manager = JobManager->new();
$manager->add_job('echo', 'sleep 10 ; echo hello');      # one long job
for (1..9) {
    $manager->add_job('echo', 'sleep 2 ; echo hello');   # nine short jobs
}
$manager->main_loop();
```
```
jared@localhost $ ls -ltr /tmp/*echo*
-rw-r--r-- 1 jared jared 6 2011-07-03 19:32 /tmp/05_echo_193228.output
-rw-r--r-- 1 jared jared 6 2011-07-03 19:32 /tmp/04_echo_193228.output
-rw-r--r-- 1 jared jared 6 2011-07-03 19:32 /tmp/03_echo_193228.output
-rw-r--r-- 1 jared jared 6 2011-07-03 19:32 /tmp/02_echo_193228.output
-rw-r--r-- 1 jared jared 6 2011-07-03 19:32 /tmp/08_echo_193228.output
-rw-r--r-- 1 jared jared 6 2011-07-03 19:32 /tmp/07_echo_193228.output
-rw-r--r-- 1 jared jared 6 2011-07-03 19:32 /tmp/06_echo_193228.output
-rw-r--r-- 1 jared jared 6 2011-07-03 19:32 /tmp/10_echo_193228.output
-rw-r--r-- 1 jared jared 6 2011-07-03 19:32 /tmp/09_echo_193228.output
-rw-r--r-- 1 jared jared 6 2011-07-03 19:32 /tmp/01_echo_193228.output
```
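The file names follow the identifier scheme in add_job: the two-digit job number from next_job_id, then the job name, then the manager's start time as HHMMSS. Because `ls -ltr` lists the oldest-modified files first, job 01 appears last; it is the only job that sleeps for 10 seconds. As a small worked example, the hypothetical helper below (job_identifier is not part of the script) builds the same name from a fixed broken-down time, so the result is predictable.

```perl
use strict;
use warnings;
use POSIX qw(strftime);

# Hypothetical helper mirroring add_job's naming scheme:
# zero-padded job number, job name, manager start time as HHMMSS.
sub job_identifier {
    my ($job_id, $name, @time) = @_;
    my $stamp = strftime('%H%M%S', @time);
    return sprintf "%02d_%s_%s", $job_id, $name, $stamp;
}

# 19:32:28 on 2011-07-03, as (sec, min, hour, mday, mon, year) in the
# form localtime() returns: mon counts from 0 and year from 1900.
my @start = (28, 32, 19, 3, 6, 111);
my $path = '/tmp/' . job_identifier(1, 'echo', @start) . '.output';
print "$path\n";    # prints /tmp/01_echo_193228.output
```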
Conclusion
I took two lessons away.
Parallel::Queue would have greatly simplified the core of this script. How many other CPAN modules would help my code just as much, if only I knew about them?
fork() is nice and easy to deal with. The code to manage the processes isn’t hugely complicated and seems pretty robust (be careful, though: I may not have reproduced that robustness here).
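The process handling could be sturdier in one place: exit statuses. The child in run_job always exits with status 0 after system(), and main_loop never looks at `$?`, so a failing command looks exactly like a successful one. Below is a minimal sketch of one way to surface failures. It assumes the child exec()s the command through /bin/sh instead of calling system(). After wait() returns, `$?` holds that child's wait status, and `$? >> 8` is its exit code. run_and_collect() is a hypothetical name, and this version ignores children killed by a signal.

```perl
use strict;
use warnings;

# Hypothetical helper: run each shell command in its own child and
# return a hash of command => exit code.
sub run_and_collect {
    my (@cmds) = @_;
    my %cmd_for;                              # pid => command
    for my $cmd (@cmds) {
        my $pid = fork();
        die "fork failed: $!" unless defined $pid;
        if ($pid == 0) {
            # exec replaces the child, so its exit status is the command's;
            # exec only returns if it failed to start /bin/sh
            exec('/bin/sh', '-c', $cmd) or exit 127;
        }
        $cmd_for{$pid} = $cmd;
    }
    my %status;
    while ((my $pid = wait()) > 0) {
        next unless exists $cmd_for{$pid};
        # The high byte of the wait status in $? is the exit code
        $status{ $cmd_for{$pid} } = $? >> 8;
    }
    return %status;
}

my %status = run_and_collect('true', 'exit 3', 'false');
printf "%-8s => %d\n", $_, $status{$_} for sort keys %status;
```

Using exec() rather than system() in the child also saves one process per job. The parent then waits directly on the shell running the command.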