Commit 61dd9317 authored by Leigh Stoller's avatar Leigh Stoller

OK, another try at keeping the input event queue drained, so that the

recv socket buffer does not fill up and reject events. This one has
promise, it worked on a 1000 node experiment.
parent 077cb20e
......@@ -166,12 +166,24 @@ if (!event_subscribe($localhandle, \&callback, $tuple)) {
# Flag to know when there are no more events to process.
#
my $gotone;
my @notifications = ();
sub callback($$$)
{
my ($handle, $note, $data) = @_;
my ($handle, $notification, $data) = @_;
$gotone++;
my $clone = event_notification_clone($handle, $notification);
if (!$clone) {
print STDERR "Could not clone notification\n";
return;
}
unshift(@notifications, $clone);
}
sub HandleNotification($$)
{
my ($handle, $note) = @_;
my $time = time();
my $site = event_notification_get_site($handle, $note);
my $slice = event_notification_get_string($handle, $note, "slice");
......@@ -198,7 +210,7 @@ sub callback($$$)
if (0 && $slice !~ /stoller/);
$count++;
if ($debug || $verbose) {
if ($debug || $verbose || $slice =~ /XXpurpnurp/) {
print "Event: $count $time $site $type $urn $slice $details\n";
}
my $instance = APT_Instance->LookupBySlice($slice);
......@@ -364,7 +376,25 @@ while (1)
$gotone = 0;
event_poll($localhandle);
}
event_poll_blocking($localhandle, 100);
if (@notifications) {
while (@notifications) {
my $notification = pop(@notifications);
HandleNotification($localhandle, $notification);
event_notification_free($localhandle, $notification);
#
# Keep the incoming queue drained! If the socket buffer
# fills up cause we are running slow, we lose events and
# pubsubd starts throwing errors back to the sender.
#
$gotone = 1;
while ($gotone) {
$gotone = 0;
event_poll($localhandle);
}
}
}
event_poll_blocking($localhandle, 1000);
}
exit(0);
......
......@@ -424,23 +424,38 @@ my $counter = 0;
while (1)
{
$counter++;
$gotone = 1;
while ($gotone) {
$gotone = 0;
event_poll($localhandle);
}
while (@notifications) {
my $notification = pop(@notifications);
HandleNotification($localhandle, $notification);
event_notification_free($localhandle, $notification);
if (@notifications) {
while (@notifications) {
my $notification = pop(@notifications);
HandleNotification($localhandle, $notification);
event_notification_free($localhandle, $notification);
#
# Keep the incoming queue drained! If the socket buffer
# fills up cause we are running slow, we lose events and
# pubsubd starts throwing errors back to the sender, and
# that is bad.
#
$gotone = 1;
while ($gotone) {
$gotone = 0;
event_poll($localhandle);
}
}
}
event_poll_blocking($localhandle, 1000);
#
# Every 60 seconds compute new status for all slices. This might
# Periodically compute new status for all slices. This might
# generate new events for the loop above.
#
if ($counter++ >= 10) {
if ($counter >= 120) {
$counter = 0;
my @slices;
......@@ -457,6 +472,8 @@ while (1)
}
GeniUtil::FlushCaches();
}
event_poll_blocking($localhandle, 1000);
}
exit(0);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment