This example demonstrates several aspects of MTP with pthreads. Especially worth noting is the bidirectional communication with child threads.
I could not find anything about that, so I would like to present the results of my research.
<?php
class Model
{
public $id;
public $value;
}
class Connection
extends Worker
{
protected static $link;
public function __construct($hostname, $username, $password, $database, $port = 3306)
{
$this->hostname = $hostname;
$this->username = $username;
$this->password = $password;
$this->database = $database;
$this->port = $port;
}
public function getConnection()
{
if(!self::$link)
{
echo 'Thread: '. $this->getThreadId() ." Connecting to db\n";
self::$link = new \PDO(...);
}
return self::$link;
}
}
class QueryTask
extends Threaded
{
public $data;
public $result;
protected $_complete;
public function __construct(Model $data)
{
$this->_complete = false;
$this->data = $data;
}
public function run()
{
$pdo = $this->worker->getConnection();
$text = 'Thread: '. $this->worker->getThreadId() .' Job: '. $this->data->id .' Data: '. $this->data->value;
$t = microtime(true);
// Bind the text as a parameter instead of concatenating it into the SQL string.
$stmt = $pdo->prepare("
INSERT INTO `test` (`id`, `text`) VALUES (NULL, :text)
");
$stmt->execute([':text' => $text]);
$dt = microtime(true) - $t;
$result = (int) $stmt->rowCount();
echo $text .' Result: '. $result .' Exec time: '. $dt ."s\n";
$this->result = $result;
$this->_complete = true;
}
public function isGarbage() : bool
{
return $this->_complete;
}
}
$t = microtime(true);
$pool = new Pool(5, 'Connection', [ 'localhost', 'root', 'password', 'test' ]);
$tasks = 10;
for($i=0; $i<$tasks; ++$i)
{
$object = new Model();
$object->id = $i;
$object->value = rand();
$pool->submit(new QueryTask($object));
}
$data = [];
while(1)
{
$newData = [];
$pool->collect(function(QueryTask $task) use (&$newData) {
if($task->isGarbage())
{
// Copy the data back from the finished task, keyed by job id.
$newData[ $task->data->id ] = $task->data->value;
}
return $task->isGarbage();
});
// array_replace() keeps the job ids as keys; array_merge() would renumber integer keys.
$data = array_replace($data, $newData);
if(count($data) == $tasks)
break;
usleep(100000);
}
// All results are in, so shut the workers down before printing.
$pool->shutdown();
var_dump($data);
?>
Result:
Thread: 6796 Connecting to db
Thread: 3156 Connecting to db
Thread: 9040 Connecting to db
Thread: 7748 Connecting to db
Thread: 8836 Connecting to db
Job: 0 Done in: 0.0070011615753174s
Job: 4 Done in: 0.0069999694824219s
Job: 2 Done in: 0.0090010166168213s
Job: 3 Done in: 0.0090010166168213s
Job: 1 Done in: 0.003000020980835s
Job: 5 Done in: 0.0069999694824219s
Job: 7 Done in: 0.0079998970031738s
Job: 6 Done in: 0.0049998760223389s
Job: 9 Done in: 0.0079998970031738s
Job: 8 Done in: 0.0069999694824219s
array(10) {
[0] =>
int(17730)
[1] =>
int(18771)
[2] =>
int(12944)
[3] =>
int(6025)
[4] =>
int(29582)
[5] =>
int(10159)
[6] =>
int(26556)
[7] =>
int(9029)
[8] =>
int(15002)
[9] =>
int(4043)
}
Things worth noting here:
1. Only 5 workers are constructed for 10 tasks. The last 5 tasks run on existing threads that already have a database connection set up.
2. You can "send" data to a thread by creating a new task and submitting it.
3. You can retrieve the result with the collect function.
4. You can pass a simple object to the task constructor.
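
The send/retrieve round trip from points 2 and 3 can also be tried without a database. The following is only a minimal sketch, assuming pthreads v3 on a thread-safe (ZTS) PHP 7 CLI build; SquareTask and its properties are hypothetical names made up for this illustration and are not part of the example above.

```php
<?php
// Hypothetical task: takes a number from the parent thread and squares it in a worker.
class SquareTask extends Threaded
{
    public $input;
    public $result;
    protected $done = false;

    public function __construct(int $input)
    {
        // Data "sent" to the thread travels inside the task object.
        $this->input = $input;
    }

    public function run()
    {
        // Executed in the worker thread.
        $this->result = $this->input * $this->input;
        $this->done = true;
    }

    public function isGarbage() : bool
    {
        // Tells the pool that this task may be collected.
        return $this->done;
    }
}

$inputs = [1, 2, 3, 4];
$pool = new Pool(2); // 2 default Worker threads, no custom worker class

foreach ($inputs as $n) {
    $pool->submit(new SquareTask($n));
}

$results = [];
while (count($results) < count($inputs)) {
    // The collector runs in the parent thread and copies results back.
    $pool->collect(function (SquareTask $task) use (&$results) {
        if ($task->isGarbage()) {
            $results[$task->input] = $task->result;
        }
        return $task->isGarbage();
    });
    usleep(10000);
}

$pool->shutdown();
ksort($results); // completion order is not guaranteed, so sort by input
var_dump($results);
?>
```

The parent never touches the worker threads directly: everything going in is a property set in the task constructor, and everything coming out is read from the task inside the collect callback.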