PHP 8.0.0 Released!

Класс parallel\Channel

(0.9.0)

Небуферизованные каналы

Небуферизованный канал будет блокировать вызовы parallel\Channel::send() до тех пор, пока не будет получатель и блокировать вызовы parallel\Channel::recv() до тех пор, пока не будет отправитель. Это означает, что небуферизованный канал - это не только способ обмена данными между задачами, но и простой метод синхронизации.

Небуферизованный канал - это самый быстрый способ обмена данными между задачами, требующий наименьшего количества копирования.

Буферизованные каналы

Буферизованный канал не будет блокироваться при вызовах parallel\Channel::send() до тех пор, пока не будет достигнута емкость, вызовы parallel\Channel::recv() будет блокироваться, пока в буфере не появятся данные.

Замыкания поверх каналов

Мощная особенность параллельных каналов состоит в том, что они позволяют обмениваться замыканиями между задачами (и средами выполнения).

Когда замыкание отправляется по каналу, оно буферизуется, не меняет буферизацию канала, передающего замыкание, но оно влияет на статическую область видимости внутри замыкания: одно и то же замыкание, отправленое в разные среды выполнения или в одну и ту же среду выполнения, не будет делиться своей статической областью.

Это означает, что всякий раз, когда выполняется замыкание, которое было передано каналом, статическое состояние будет таким, каким оно было при буферизации замыкания.

Анонимные каналы

Конструктор анонимного канала позволяет программисту избегать присвоения имен каждому каналу: parallel генерирует уникальное имя для анонимных каналов.

Обзор классов

final parallel\Channel {
/* Анонимный конструктор */
public __construct ( )
public __construct ( int $capacity )
/* Доступ */
public make ( string $name ) : Channel
public make ( string $name , int $capacity ) : Channel
public open ( string $name ) : Channel
/* Совместное использование */
public recv ( ) : mixed
public send ( mixed $value ) : void
/* Закрытие */
public close ( ) : void
/* Константа для бесконечной буферизации */
const Infinite ;
}

Содержание

add a note add a note

User Contributed Notes 3 notes

up
2
rustysun
1 year ago
an example used unbuffered channel.
<?php

use parallel\{Channel,Runtime};

$sum=function(array $a, Channel $ch) {
   
$sum=0;
    foreach (
$a as $v) {
       
$sum+=$v;
    }
   
$ch->send($sum);
};
try {
   
$a=[7, 2, 8, 1, 4, 0, 9, 10];
   
//unbuffered channel
   
$runtime=new Runtime;
   
$ch2=new Channel;
   
$runtime->run($sum, [array_slice($a, 0, $num), $ch2]);
   
$runtime->run($sum, [array_slice($a, $num), $ch2]);
   
//receive from channel
   
$x=$ch2->recv();
   
$y=$ch2->recv();
   
$ch2->close();
    echo
"\nch2:", $x, "\t", $y, "\t", $x + $y, "\n";
} catch(
Error $err) {
    echo
"\nError:", $err->getMessage();
} catch(
Exception $e) {
    echo
"\nException:", $e->getMessage();
}

//output
//ch2:18  23      41
up
2
hdvianna
10 months ago
This is an example of using a channel to produce data for consumers. In this example, the producer Runtime instance will send the time in seconds in which the consumers shall sleep.

<?php

use parallel\{Runtime, Channel};

main($argv);

function
main(array $argv)
{
    if (
count($argv) !== 3) {
        echo
"Type: hello-parallel.php <number-of-tasks> <maximum-time-of-sleep (in seconds)>" . PHP_EOL;
        echo
"Example: hello-parallel.php 5 3" . PHP_EOL;
        die;
    } else {
       
$numberOfTasks = intval($argv[1]);
       
$maximumTimeOfSleep = intval($argv[2]);
       
$t1 = microtime(true);
       
parallelize($numberOfTasks, $maximumTimeOfSleep);
       
$endTime = microtime(true) - $t1;
        echo
PHP_EOL."Finished $numberOfTasks task(s) in {$endTime}s".PHP_EOL;
    }
}

function
parallelize(int $numberOfTasks, int $maximumTimeOfSleep)
{
   
$channel = new Channel();

   
$taskIds = array_map(function () use ($maximumTimeOfSleep) {
        return
$id = uniqid("task::");
    },
range(0, $numberOfTasks - 1));

   
$timesToSleep = array_map(function () use ($maximumTimeOfSleep) {
        return
rand(1, $maximumTimeOfSleep);
    },
$taskIds);

   
$producer = new Runtime();
   
$producerFuture = $producer->run(function (Channel $channel, array $timesToSleep) {
        foreach (
$timesToSleep as $timeToSleep) {
           
$channel->send($timeToSleep);
        }
    }, [
$channel, $timesToSleep]);

   
$consumerFutures = array_map(function (string $id) use ($channel) {
       
$runtime = new Runtime();
        return
$runtime->run(function (string $id, Channel $channel) {
           
$timeToSleep = $channel->recv();
            echo
"Hello from $id. I will sleep for $timeToSleep second(s).".PHP_EOL;
           
sleep($timeToSleep);
            echo
"$id slept for $timeToSleep second(s).".PHP_EOL;
            return
$timeToSleep;
        }, [
$id, $channel]);
    },
$taskIds);

   
wait($consumerFutures);
   
wait([$producerFuture]);
}

function
wait(array $futures)
{
    return
array_map(function ($future) {
        return
$future->value();
    },
$futures);
}
up
0
rustysun
1 year ago
<?php
use parallel\Channel;

function
sum(array $a, Channel $ch) {
   
$sum=0;
    foreach (
$a as $v) {
       
$sum+=$v;
    }
   
$ch->send($sum);
}

try {
   
$a=[7, 2, 8, 1, 4, 0, 9, 10];
   
$ch1=Channel::make('sum', 2);
   
$ch2=new Channel;
   
$num=count($a) / 2;
   
sum(array_slice($a, 0, $num), $ch1);
   
sum(array_slice($a, $num), $ch1);

   
//receive from channel
   
$x=$ch1->recv();
   
$y=$ch1->recv();
   
$ch1->close();
    echo
"\nch1:", $x, "\t", $y, "\t", $x + $y, "\n";
} catch(
Error $err) {
    echo
"\nError:", $err->getMessage();
} catch(
Exception $e) {
    echo
"\nException:", $e->getMessage();
}
To Top