PHP Velho Oeste 2024

The parallel\Events class

(0.9.0)

The Event Loop

The Event loop monitors the state of sets of futures and or channels (targets) in order to perform read (parallel\Future::value(), parallel\Channel::recv()) and write (parallel\Channel::send()) operations as the targets become available and the operations may be performed without blocking the event loop.

Class synopsis

final class parallel\Events implements Countable, Traversable {
/* Input */
public setInput(Input $input): void
/* Targets */
public addChannel(parallel\Channel $channel): void
public addFuture(string $name, parallel\Future $future): void
public remove(string $target): void
/* Behaviour */
public setBlocking(bool $blocking): void
public setTimeout(int $timeout): void
/* Polling */
}

Table of Contents

add a note

User Contributed Notes 3 notes

up
16
s dot laufer at homegear dot email
4 years ago
<?php
/**
Example showing the usage of events.

The documentation still is very thin, so I'm not sure, the example is the best solution. But it works.
*/
use parallel\{Channel,Runtime,Events,Events\Event};

$myThread = function(Channel $channel) {
$events = new Events();
$events->addChannel($channel);
//$events->setBlocking(false); //Uncomment to don't block on Events::poll()
$events->setTimeout(1000000); //Comment when not blocking

while(true)
{
/*
...
Your code.
...
*/

//Read all available events
try
{
$event = NULL;
do
{
$event = $events->poll(); //Returns non-null if there is an event
if($event && $event->source == 'myChannel')
{
//It seems, the target gets deleted after returning an event,
//so add it again.
$events->addChannel($channel);
if(
$event->type == Event\Type::Read)
{
if(
is_array($event->value) && count($event->value) > 0)
{
if(
$event->value['name'] == 'stop')
{
echo
'Stopping thread';
return;
//Stop
}
else
{
echo
'Event: '.$event->value['name'].' => '.$event->value['value'].PHP_EOL;
}
}
}
else if(
$event->type == Event\Type::Close) return; //Stop
}
}
while(
$event);
}
catch(
Events\Error\Timeout $ex)
{
//Timeout
echo 'Timeout'.PHP_EOL;
}
}
};

class
MyClass {
private
$runtime;
private
$future;
private
$channel;

public function
start() {
//Create runtime
$this->runtime = new Runtime();

//Create buffered channel.
//Buffered channels don't block on Channel::send().
//Note that target names need to be unique within the process.
$this->channel = Channel::make('myChannel', Channel::Infinite);

global
$myThread;
$this->future = $this->runtime->run($myThread, [$this->channel]);
}

public function
stop() {
$this->channel->send(['name' => 'stop', 'value' => true]);

$this->future->value(); //Wait for thread to finish
$this->channel->close();
}

public function
emit(string $name, $value)
{
$this->channel->send(['name' => $name, 'value' => $value]);
}
}

$a = new MyClass();
$a->start();

for(
$i = 0; $i < 5; $i++)
{
$a->emit('test', $i);
sleep(0.5);
}

sleep(2);

for(
$i = 5; $i < 10; $i++)
{
$a->emit('test', $i);
sleep(0.5);
}

$a->stop();
?>
up
1
gam6itko
2 years ago
<?php

// example below shows how to stop child thread from main thread

use parallel\{Channel, Events, Events\Event\Type, Runtime};

$fnThread = static function (Channel $channel) {
$events = new Events();
$events->addChannel($channel);
$events->setBlocking(false); // don't block on Events::poll()

while (true) {
if (
$event = $events->poll()) {
//It seems, the target gets deleted after returning an event, so add it again.
$events->addChannel($channel);

if (
Type::Read === $event->type) {
echo
"- received value: ".$event->value.PHP_EOL;
} elseif (
Type::Close === $event->type) {
echo
"- receiving close event".PHP_EOL;
return
"i'm done";
}

}
echo
"-\n";
usleep(500_000);
}
};

// main thread
$runtime = new Runtime();
$channel = new Channel();
$future = $runtime->run($fnThread, [$channel]);

$channel->send('message1');
sleep(2);
$channel->send('message2');
sleep(2);

echo
"closing channel\n";
$channel->close();
echo
"future said: ".$future->value();
echo
PHP_EOL;
up
1
gam6itko
2 years ago
<?php

// this example shows how Events handle Future events

use parallel\{Events, Events\Event, Runtime};

$events = new Events();

$runtime = new Runtime();

//Read (type: 1)
$future = $runtime->run(
static function (
string $name) {
return
"Future#$name result";
},
[
'Read']
);
$events->addFuture("Future#Read", $future);

//Cancel (type: 4)
$future = $runtime->run(
static function (
string $name) {
throw new
\Exception("Exception#$name");
},
[
'Cancel']
);
$events->addFuture("Future#Cancel", $future);

//Kill (type: 5)
$future = $runtime->run(
static function () {
sleep(100000);
},
[]
);
$events->addFuture("Future#Kill", $future);
$future->cancel(); //kill it

//Error (type: 6)
$future = $runtime->run(
static function () {
$memoryEater = [];
$i = 0;
while (++
$i) {
$memoryEater[] = $i;
}
}
);
$events->addFuture("Future#Error", $future);

// reading events

/** @var Event $event */
foreach ($events as $i => $event) {
echo
str_pad('', 50, '=') . " EVENT_$i\n";
var_dump($event);
echo
"\n";
}
To Top