parallel\run

(1.0.0)

parallel\run: Execution

Description

parallel\run(Closure $task): ?Future

Schedules task for execution in parallel.

parallel\run(Closure $task, array $argv): ?Future

Schedules task for execution in parallel, passing argv to it at execution time.

Automatic Scheduling

If a \parallel\Runtime that was created and cached by a previous call to parallel\run() is idle, it will be used to execute the task. If no \parallel\Runtime is idle, parallel will create and cache a new \parallel\Runtime.

Note:

\parallel\Runtime objects created by the programmer are not used for automatic scheduling.
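
The scheduling behaviour described above can be sketched as follows. This is a minimal illustration, assuming the parallel extension is loaded on a thread-safe (ZTS) PHP build; the $task closure and the input values are made up for the example.

```php
<?php
// Illustrative task: squares its argument.
$task = function (int $n) {
    return $n * $n;
};

$futures = [];
foreach ([2, 3, 4] as $n) {
    // Each call reuses an idle cached Runtime, or creates and caches a new one.
    $futures[] = \parallel\run($task, [$n]);
}

$squares = [];
foreach ($futures as $future) {
    // value() blocks until the corresponding task has finished.
    $squares[] = $future->value();
}

print_r($squares);
```

Because the three calls are made before any result is collected, the tasks may run concurrently; the order of $squares follows the order of the calls, not the order of completion.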

Parameters

task

A Closure with specific characteristics.

argv

An array of arguments with specific characteristics to be passed to task at execution time.

Task Characteristics

Closures scheduled for parallel execution must not:

  • accept or return by reference
  • accept or return internal objects (see notes)
  • execute any of a limited set of prohibited instructions

Instructions prohibited in Closures intended for parallel execution are:

  • yield
  • use by-reference
  • declare class
  • declare named function

Note:

Nested closures may use yield or by-reference, but must not contain class or named function declarations.

Note:

No instructions are prohibited in the files that the task may include.

Arguments Characteristics

Arguments must not:

  • contain references
  • contain resources
  • contain internal objects (see notes)

Note:

In the case of file stream resources, the resource will be cast to its file descriptor and passed as an int where possible. This is not supported on Windows.
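
Since references are not allowed, arguments reach the task as copies. The sketch below illustrates this with a nested array, assuming the parallel extension is loaded; the $config array and its keys are hypothetical.

```php
<?php
// Hypothetical argument: a nested array of scalars, which is legal to pass.
$config = ['name' => 'report', 'ids' => [1, 2, 3]];

$future = \parallel\run(function (array $config) {
    // This modifies the task's private copy only.
    $config['ids'][] = 4;
    return count($config['ids']);
}, [$config]);

$countInTask = $future->value();
$countInParent = count($config['ids']);

echo "task saw $countInTask ids, parent still has $countInParent\n";
```

Any data the task needs to hand back therefore has to travel through the return value (via the Future) or through a channel.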

Internal Objects Notes

Internal objects generally use a custom structure that cannot be copied safely by value. PHP currently lacks the mechanics to do this (without serialization), so only objects that do not use a custom structure may be shared.

Some internal objects do not use a custom structure, for example parallel\Events\Event, and so may be shared.

Closures are a special kind of internal object that supports being copied by value, and so may be shared.

Channels are central to writing parallel code and support concurrent access and execution by necessity, and so may be shared.

Warning

A user class that extends an internal class may use a custom structure as defined by the internal class, in which case it cannot be copied safely by value, and so may not be shared.
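
As a sketch of sharing a channel with a task, the following passes an anonymous, unbuffered parallel\Channel in argv and reads three messages from it in the parent. It assumes the parallel extension is loaded; the message values are illustrative.

```php
<?php
use parallel\Channel;

// Anonymous, unbuffered channel: each send() waits for a matching recv().
$channel = new Channel();

\parallel\run(function ($channel) {
    foreach (['a', 'b', 'c'] as $message) {
        $channel->send($message);
    }
}, [$channel]);

$received = [];
for ($i = 0; $i < 3; $i++) {
    // recv() blocks until the task has sent the next message.
    $received[] = $channel->recv();
}

print_r($received);
```

The parent reads exactly as many messages as the task sends, which keeps the example free of any end-of-stream handling.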

Return Values

Warning

The returned Future must not be ignored when the task contains a return or throw statement.

Exceptions

Warning

Throws parallel\Runtime\Error\Closed if parallel\Runtime was closed.

Warning

Throws parallel\Runtime\Error\IllegalFunction if task is a closure created from an internal function.

Warning

Throws parallel\Runtime\Error\IllegalInstruction if task contains illegal instructions.

Warning

Throws parallel\Runtime\Error\IllegalParameter if task accepts or argv contains illegal variables.

Warning

Throws parallel\Runtime\Error\IllegalReturn if task returns illegally.
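
The errors listed above can be observed with a small helper such as the one sketched below. scheduleError() is a hypothetical function written for this illustration, not part of parallel; it catches \Throwable and reports the class of whatever was thrown, so it makes no assumption about the exact class raised in each case. That an ArrayObject argument is rejected is an assumption based on the notes on internal objects.

```php
<?php
// Hypothetical helper: schedules a task and returns the class name of any
// error raised while scheduling it, or null if scheduling succeeded.
function scheduleError(Closure $task, array $argv = []): ?string
{
    try {
        \parallel\run($task, $argv);
        return null;
    } catch (\Throwable $error) {
        return get_class($error);
    }
}

// yield is one of the prohibited instructions.
$yieldError = scheduleError(function () {
    yield 1;
});
echo "yield: ", $yieldError ?? "no error", "\n";

// Assumption: ArrayObject is an internal object with a custom structure,
// so it is expected to be refused as an argument.
$objectError = scheduleError(function ($list) {
}, [new ArrayObject()]);
echo "ArrayObject argument: ", $objectError ?? "no error", "\n";
```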


User Contributed Notes 3 notes

john_2885 at yahoo dot com
5 years ago
Here's a more substantial example of how to use the run functional API.

<?php
/*********************************************
* Sample parallel functional API
*
* Scenario
* -------------------------------------------
* Given a large number of rows of
* data to process, divide the work amongst
* a set of workers. Each worker is responsible
* for finishing their assigned task.
*
* In the code below, assume we have arbitrary
* start and end IDs (rows) - we will try to
* divide the number of IDs (rows) evenly
* across 8 workers. The workers will get the
* following batches to process to completion:
*
* Total number of IDs (rows): 1371129
* Each worker will get 171392 IDs to process
*
* Worker 1: IDs from 11001 to 182393
* Worker 2: IDs from 182393 to 353785
* Worker 3: IDs from 353785 to 525177
* Worker 4: IDs from 525177 to 696569
* Worker 5: IDs from 696569 to 867961
* Worker 6: IDs from 867961 to 1039353
* Worker 7: IDs from 1039353 to 1210745
* Worker 8: IDs from 1210745 to 1382130
*
* Each worker then processes 5000 rows at a time
* until they are done with their assigned work
*
*********************************************/

use \parallel\{Runtime, Future, Channel, Events};

$minId = 11001;
$maxId = 1382130;
$workers = 8;
$totalIds = $maxId - $minId;
// Try to divide IDs evenly across the number of workers
$batchSize = (int) ceil($totalIds / $workers); // ceil() returns a float; the worker closure expects ints
// The last batch gets whatever is left over
$lastBatch = $totalIds % $batchSize;
// The number of IDs (rows) to divide the overall
// task into sub-batches
$rowsToFetch = 5000;

print "Total IDs: " . $totalIds . "\n";
print "Batch Size: " . $batchSize . "\n";
print "Last Batch: " . $lastBatch . "\n";

$producer = function (int $worker, int $startId, int $endId, int $fetchSize) {
    $tempMinId = $startId;
    $tempMaxId = min($tempMinId + $fetchSize, $endId);
    $fetchCount = 1;

    print "Worker " . $worker . " working on IDs from " . $startId . " to " . $endId . "\n";

    while ($tempMinId < $endId) {
        // Simulate processing one sub-batch with a random delay of 0.5 to 1 second
        usleep(rand(500000, 1000000));
        print "Worker " . $worker . " finished batch " . $fetchCount . " from ID " . $tempMinId . " to " . $tempMaxId . "\n";

        // Now we move on to the next sub-batch for this worker,
        // never going past the end of the assigned range
        $tempMinId = $tempMaxId;
        $tempMaxId = min($tempMinId + $fetchSize, $endId);

        // Introduce some timing randomness
        sleep(rand(1, 5));
        $fetchCount++;
    }

    // This worker has completed their entire batch
    print "Worker " . $worker . " finished\n";
};

// Create our workers and have them start working on their task
// In this case, it's a set of 171392 IDs to process
for ($i = 0; $i < $workers; $i++) {
    $startId = $minId + ($i * $batchSize);
    $endId = $startId + $batchSize;
    if ($i == ($workers - 1)) {
        // The last worker takes whatever is left over
        $endId = $maxId;
    }
    \parallel\run($producer, array(($i + 1), $startId, $endId, $rowsToFetch));
}

?>
anonymous user
4 years ago
Although function declarations are not allowed inside the code executed in a thread, include is allowed. So if we want to declare a function, we can put it in another file and include that file from the task.
# main.php
<?php
$runtime = new parallel\Runtime();
$future = $runtime->run(function () {
    // Named functions may be declared in an included file
    include "included.php";
    return add(1, 3);
}, []);
echo $future->value();
# output: 4
# included.php
<?php
function add($a, $b) {
    return $a + $b;
}
Thierry Kauffmann
3 years ago
<?php

/**
* Sample parallel functional API
* using a generator instead of a static list of items to process
*
* Items to process in parallel come from a generator
* It could be anything : e.g fetch a mysql array, a DirectoryIterator...
* Thus the number of items to process in parallel is NOT known in advance
*
* This algorithm attributes items to each parallel thread dynamically
* As soon as a thread has finished working
* It is assigned a new item to process
* until all items are processed (generator closes)
*
* In this example we process 50 items in 5 parallel threads
* It produces output in this form (output changes at each run) :
*
* ThreadId: 1 => Item: 1 (Start)
* ThreadId: 2 => Item: 2 (Start)
* ThreadId: 3 => Item: 3 (Start)
* ThreadId: 4 => Item: 4 (Start)
* ThreadId: 5 => Item: 5 (Start)
* ThreadId: 5 => Item: 5 Sleep: 3s (End)
* ThreadId: 5 => Item: 6 (Start)
* ThreadId: 3 => Item: 3 Sleep: 4s (End)
* ThreadId: 3 => Item: 7 (Start)
* ThreadId: 2 => Item: 2 Sleep: 6s (End)
* ThreadId: 2 => Item: 8 (Start)
* ...
* ThreadId: 4 => Item: 44 Sleep: 6s (End)
* ThreadId: 4 => Item: 49 (Start)
* ThreadId: 3 => Item: 46 Sleep: 5s (End)
* ThreadId: 3 => Item: 50 (Start)
* ThreadId: 2 => Item: 43 Sleep: 9s (End)
* Destroy ThreadId: 2
* ThreadId: 1 => Item: 47 Sleep: 5s (End)
* Destroy ThreadId: 1
* ThreadId: 4 => Item: 49 Sleep: 7s (End)
* Destroy ThreadId: 4
* ThreadId: 5 => Item: 48 Sleep: 10s (End)
* Destroy ThreadId: 5
* ThreadId: 3 => Item: 50 Sleep: 10s (End)
* Destroy ThreadId: 3
*/

use \parallel\{Runtime, Future, Channel, Events};

// Generate list of items to process with a generator
function generator(int $item_count) {
    for ($i = 1; $i <= $item_count; $i++) {
        yield $i;
    }
}

function testConcurrency(int $concurrency, int $item_count) {
    $generator = generator($item_count);

    // Function executing in each thread. Take a nap for a random time, for example!
    $producer = function (int $item_id) {
        $seconds = rand(1, 10);
        sleep($seconds);
        return ['item_id' => $item_id, 'sleep_seconds' => $seconds];
    };

    // Fill up threads with initial 'inactive' state
    $threads = array_fill(1, $concurrency, ['is_active' => false]);

    while (true) {
        // Loop through threads until all threads are finished
        foreach ($threads as $thread_id => $thread) {
            if (!$thread['is_active'] and $generator->valid()) {
                // Thread is inactive and generator still has values: run something in the thread
                $item_id = $generator->current();
                $threads[$thread_id]['run'] = \parallel\run($producer, [$item_id]);
                echo "ThreadId: $thread_id => Item: $item_id (Start)\n";
                $threads[$thread_id]['is_active'] = true;
                $generator->next();
            } elseif (!isset($threads[$thread_id]['run'])) {
                // Destroy threads with no pending task once the generator is exhausted
                echo "Destroy ThreadId: $thread_id\n";
                unset($threads[$thread_id]);
            } elseif ($threads[$thread_id]['run']->done()) {
                // Thread finished. Get results
                $item = $threads[$thread_id]['run']->value();
                echo "ThreadId: $thread_id => Item: {$item['item_id']} Sleep: {$item['sleep_seconds']}s (End)\n";

                if (!$generator->valid()) {
                    // Generator is closed, so destroy the thread
                    echo "Destroy ThreadId: $thread_id\n";
                    unset($threads[$thread_id]);
                } else {
                    // Thread is ready to run again; drop the finished Future
                    // so its result cannot be reported a second time
                    $threads[$thread_id]['is_active'] = false;
                    unset($threads[$thread_id]['run']);
                }
            }
        }

        // Escape loop when all threads are destroyed
        if (empty($threads)) break;
    }
}

$concurrency = 5;
$item_count = 50;

testConcurrency($concurrency, $item_count);

?>