diff --git a/apt/aptevent_daemon.in b/apt/aptevent_daemon.in index 7eaf3ae12ae43bf3f4a56da6ef6f191d2f2b56bd..df7eb5a24a5ab3c82cc5b4c4e3b5c7deee91342b 100644 --- a/apt/aptevent_daemon.in +++ b/apt/aptevent_daemon.in @@ -166,12 +166,24 @@ if (!event_subscribe($localhandle, \&callback, $tuple)) { # Flag to know when there are no more events to process. # my $gotone; +my @notifications = (); sub callback($$$) { - my ($handle, $note, $data) = @_; + my ($handle, $notification, $data) = @_; $gotone++; + my $clone = event_notification_clone($handle, $notification); + if (!$clone) { + print STDERR "Could not clone notification\n"; + return; + } + unshift(@notifications, $clone); +} + +sub HandleNotification($$) +{ + my ($handle, $note) = @_; my $time = time(); my $site = event_notification_get_site($handle, $note); my $slice = event_notification_get_string($handle, $note, "slice"); @@ -198,7 +210,7 @@ sub callback($$$) if (0 && $slice !~ /stoller/); $count++; - if ($debug || $verbose) { + if ($debug || $verbose || $slice =~ /XXpurpnurp/) { print "Event: $count $time $site $type $urn $slice $details\n"; } my $instance = APT_Instance->LookupBySlice($slice); @@ -364,7 +376,25 @@ while (1) $gotone = 0; event_poll($localhandle); } - event_poll_blocking($localhandle, 100); + if (@notifications) { + while (@notifications) { + my $notification = pop(@notifications); + HandleNotification($localhandle, $notification); + event_notification_free($localhandle, $notification); + + # + # Keep the incoming queue drained! If the socket buffer + # fills up cause we are running slow, we lose events and + # pubsubd starts throwing errors back to the sender. + # + $gotone = 1; + while ($gotone) { + $gotone = 0; + event_poll($localhandle); + } + } + } + event_poll_blocking($localhandle, 1000); } exit(0); diff --git a/protogeni/event/igevent_daemon.in b/protogeni/event/igevent_daemon.in index 9fe7435fa1b80a03206b157059b76681baf02374..61ef49b6bfe0b7d604c3e2394ac9667232a9bc63 100644 --- a/protogeni/event/igevent_daemon.in +++ b/protogeni/event/igevent_daemon.in @@ -424,23 +424,38 @@ my $counter = 0; while (1) { + $counter++; + $gotone = 1; while ($gotone) { $gotone = 0; event_poll($localhandle); } - while (@notifications) { - my $notification = pop(@notifications); - HandleNotification($localhandle, $notification); - event_notification_free($localhandle, $notification); + if (@notifications) { + while (@notifications) { + my $notification = pop(@notifications); + HandleNotification($localhandle, $notification); + event_notification_free($localhandle, $notification); + + # + # Keep the incoming queue drained! If the socket buffer + # fills up cause we are running slow, we lose events and + # pubsubd starts throwing errors back to the sender, and + # that is bad. + # + $gotone = 1; + while ($gotone) { + $gotone = 0; + event_poll($localhandle); + } + } } - event_poll_blocking($localhandle, 1000); # - # Every 60 seconds compute new status for all slices. This might + # Periodically compute new status for all slices. This might # generate new events for the loop above. # - if ($counter++ >= 10) { + if ($counter >= 120) { $counter = 0; my @slices; @@ -457,6 +472,8 @@ while (1) } GeniUtil::FlushCaches(); } + + event_poll_blocking($localhandle, 1000); } exit(0);