# Server-sent-events action.
#
# Sets the response content type to text/event-stream and, for the event
# name "testgrep" (taken from the "event" request parameter), announces a
# pipe path to the client and then watches that path with a
# Mojo::IOLoop::Stream. The first chunk read from it is decoded as JSON,
# stored via flash, logged, and acknowledged to the client with a "hello"
# event, after which the watcher is closed.
#
# Any other (or missing) event name only sets up the long-lived response
# without emitting anything.
sub events {
    my $self = shift;

    my $timeout = 3600;    # an hour

    # Increase inactivity timeout for connection a bit, so the pipe watcher
    # below (which uses $timeout) always expires before the connection does.
    Mojo::IOLoop->stream( $self->tx->connection )->timeout( $timeout + 5 );
    $self->res->headers->content_type('text/event-stream');

    # Default to '' so a missing parameter does not raise an
    # "uninitialized value" warning in the comparison below.
    my $event = $self->param('event') // '';
    my $type  = $event eq 'testgrep' ? 'pipe' : '';

    if ( $type eq 'pipe' ) {

        # NOTE(review): fixed path. A per-request unique name (pid plus
        # timestamp) appears to have been intended at some point; confirm
        # what the writing side expects before changing this.
        my $pipe = '/tmp/JD';
        $self->write("event:${event}pipe\ndata:$pipe\n\n");

        if ( open( my $h, '+<', $pipe ) ) {
            my $stream = Mojo::IOLoop::Stream->new($h);

            # Keep a reference in the stash so the stream is not destroyed
            # when this action returns.
            $self->stash->{stream} = $stream;

            $stream->on(
                'read' => sub {
                    my ( $s, $chunk ) = @_;

                    # decode_json dies on malformed input; trap that here
                    # rather than letting it escape into the event loop.
                    my $data = eval { decode_json($chunk) };
                    if ( ref $data ne 'HASH' ) {
                        my $reason = $@ || 'payload is not a JSON object';
                        $self->app->log->error(
                            "BAD DATA ON PIPE $pipe: $reason");
                        $self->write(
                            "event:$event\ndata:Could not parse result. Try again.\n\n"
                        );
                        $s->close;
                        return;
                    }

                    # NOTE(review): these two hard-coded entries look like
                    # test data - confirm whether they are still wanted.
                    for my $i ( 2000000 .. 2000001 ) {
                        $data->{$i} = { ticked => 'foobar' };
                    }

                    # NOTE(review): the response has already started by
                    # now; verify that flash data set this late actually
                    # reaches the next request.
                    $self->flash( testgrep => $data );
                    $self->app->log->info( 'READ DATA ' . Dumper($data) );
                    $self->write("event:$event\ndata:hello\n\n");

                    # Only the first chunk is of interest.
                    $s->close;
                }
            );
            $stream->on(
                'timeout' => sub {
                    $self->app->log->info('TIMEOUT');
                    $self->write(
                        "event:$event\ndata:Took too long, got bored waiting. Try again.\n\n"
                    );
                }
            );
            $stream->on(
                'close' => sub {
                    $self->app->log->info( 'CLOSE PIPE ' . $pipe );
                }
            );
            $stream->on(
                'error' => sub {
                    my ( $s, $err ) = @_;
                    $self->app->log->info(
                        'ERROR READING PIPE ' . $pipe . ': ' . ( $err // '' ) );
                }
            );
            $stream->timeout($timeout);
            $stream->start;
        }
        else {
            # Without a valid handle there is nothing to watch; report the
            # failure instead of handing undef to Mojo::IOLoop::Stream.
            $self->app->log->error("CANNOT OPEN PIPE $pipe: $!");
            $self->write(
                "event:$event\ndata:Could not open pipe. Try again.\n\n");
        }
    }

    $self->render_later;
}    # events