reactphp

Aplicações em tempo real com PHP usando WebSockets

Neste artigo veremos uma introdução a WebSockets com a criação de um servidor em PHP e usando o navegador do usuário como cliente.

Antes, entretanto, recomendo que você assista o vídeo O que são WebSockets? gravado pelo Akira Hanashiro (aqui da TreinaWeb) que explica de uma forma bem sucinta.

E, se você tiver interesse, também recomendo a leitura do artigo Uma introdução a TCP, UDP e Sockets para se ter a base do que é um socket na arquitetura TCP/IP.

Desenvolvedor PHP Júnior
Formação: Desenvolvedor PHP Júnior
Nesta formação você aprenderá todos os fundamentos necessário para iniciar do modo correto com a linguagem PHP, uma das mais utilizadas no mercado. Além dos conceitos de base, você também conhecerá as características e a sintaxe da linguagem de forma prática.
CONHEÇA A FORMAÇÃO

Ok, mas o que é um WebSocket?

WebSocket é um protocolo que permite a criação de um canal de comunicação cliente-servidor com transmissão bidirecional onde ambos os lados (cliente e servidor) podem transmitir dados simultaneamente. WebSocket veio para suprir as deficiências do protocolo Http para esse objetivo. O protocolo Http, que por sinal, é unidirecional (a transmissão ocorre só de uma ponta para a outra), onde o cliente envia a requisição e o servidor retorna a resposta, finalizando ali a conexão. Ou seja, se o cliente envia 5 requisições Http para o servidor, 5 conexões TCP independentes são abertas, enquanto que com WebSocket uma única conexão TCP é aberta e ela fica disponível para troca de dados a qualquer momento (uma conexão persistente, até que um dos lados decida fechá-la).

É muito importante pontuar, também, que Socket e WebSocket são coisas diferentes. Um WebSocket é um protocolo que roda em cima de sockets TCP, enquanto que um socket é uma abstração, uma porta lógica de comunicação entre duas pontas numa rede.

Benefícios de usar WebSocket em detrimento a Http

Se existe a necessidade de uma conexão permanecer aberta por um longo tempo para uma troca de dados constante, WebSocket é uma ótima escolha. Uma conexão Http é relativamente pesada, ela transmite não só os dados, mas também cabeçalhos. Além disso, possui um curto tempo de vida e não mantém estado (stateless).

Já uma conexão WebSocket, depois do handshake entre o cliente e o servidor, ela permanece aberta até que uma das partes decida fechá-la. E foco dela é na transmissão dos dados, cabeçalhos não são transmitidos pra lá e pra cá.

Então, em resumo, os benefícios em relação ao Http são: baixa latência, conexão persistente e full-duplex (transmissão bidirecional).

Principais casos de uso para WebSockets

Home Brokers (onde cotações são atualizadas a todo instante), feeds de redes sociais, aplicativos de bate-papo, ferramentas de edição colaborativa, jogos multi-player etc.

Criando o primeiro servidor

Neste artigo usaremos a library Ratchet que nos permite criar um servidor WebSocket assíncrono, e para isso ela utiliza o event loop do ReactPHP. Outra opção seria utilizarmos o servidor de WebSocket do Swoole. Mas o bom de usar o ReactPHP é que não precisamos instalar nenhuma extensão específica na nossa instalação do PHP.

Crie uma pasta chamada websocket-demo e dentro dela crie o arquivo composer.json:

{
    "require": {
        "cboden/ratchet": "^0.4.1"
    }
}

Instale as dependências:

$ composer install

Agora, crie um arquivo server.php com a seguinte implementação:

<?php

require './vendor/autoload.php';

use Ratchet\Server\EchoServer;

$app = new Ratchet\App('localhost', 9980);
$app->route('/echo', new EchoServer, ['*']);
$app->run();

Esse é o servidor mais primitivo que existe, é um “Echo Server”, ele envia de volta tudo o que o cliente manda pra ele. Nem tivemos a necessidade de implementá-lo, pois ele já vem junto com o Ratchet.

Por fim, crie um arquivo index.html com a seguinte implementação:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>WebSocket EchoServer</title>
</head>
<body>
<label for="input">Digite aqui: </label>
<input id="input" type="text" placeholder="Digite aqui"/>

<div id="response"></div>
<script>
    let input = document.getElementById('input');
    let response = document.getElementById('response');
    const socket = new WebSocket('ws://localhost:9980/echo');

    // Ao estabelecer a conexão enviamos uma mensagem pro servidor
    socket.addEventListener('open', function () {
        socket.send('Conexão estabelecida.');
    });

    // Callback disparado sempre que o servidor retornar uma mensagem
    socket.addEventListener('message', function (event) {
        response.insertAdjacentHTML('beforeend', "<p><b>Servidor diz: </b>" + event.data + "</p>");
    });

    input.addEventListener('keyup', function (event) {
        if (event.keyCode === 13) {
            socket.send(this.value);
            this.value = '';
        }
    });
</script>
</body>
</html>

Para testar, primeiro de tudo temos que ter o servidor rodando. Na raiz do projeto execute:

$ php server.php

Vai iniciar o servidor na porta 9980. Por fim, basta executar o arquivo index.html e interagir escrevendo mensagens e apertando enter para enviá-las. Tudo o que o servidor receber de input, ele retornará de volta para o cliente.

O objeto WebSocket é nativo e presente em todos os navegadores modernos. Iniciamos o servidor de forma “mágica” usando o Ratchet, mas a realidade é que por debaixo dos panos ele precisa abstrair algumas importantes coisas como o handshake inicial, as trocas das mensagens (Data Frames) que são criptografadas etc. Toda a dinâmica do funcionamento de um servidor de Websocket pode ser lida nesse documento: Escrevendo um servidor WebSocket.

Agora vamos criar o nosso próprio wrapper, nossa própria implementação de um servidor. Primeiro, na raiz do projeto, crie uma pasta chamada src. Em seguida, altere o composer.json para:

{
    "require": {
        "cboden/ratchet": "^0.4.1"
    },
    "autoload": {
        "psr-4": {
            "Chat\\": "src"
        }
    }
}

Agora crie um arquivo ChatServer.php dentro da pasta src com a seguinte implementação:

<?php

namespace Chat;

use Exception;
use SplObjectStorage;
use Ratchet\ConnectionInterface;
use Ratchet\MessageComponentInterface;

final class ChatServer implements MessageComponentInterface
{
    private $clients;

    public function __construct()
    {
        $this->clients = new SplObjectStorage();
    }

    public function onOpen(ConnectionInterface $conn): void
    {
        $this->clients->attach($conn);
    }

    public function onMessage(ConnectionInterface $from, $msg): void
    {
        foreach ($this->clients as $client) {
            $client->send($msg);
        }
    }

    public function onClose(ConnectionInterface $conn): void
    {
        $this->clients->detach($conn);
    }

    public function onError(ConnectionInterface $conn, Exception $exception): void
    {
        $conn->close();
    }
}

Estamos implementando a interface MessageComponentInterface. A diferença desse servidor pro EchoServer, é que neste estamos guardando as conexões que são estabelecidas e, quando uma mensagem é recebida, enviamos ela de volta para toda as conexões abertas (que é o que o fazemos no método onMessage).

Para testar a implementação, crie um arquivo chat.html no projeto:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>WebSocket Simple Chat</title>
</head>
<body>
<p>
    <label for="nome">Seu nome: </label>
    <input id="nome" type="text" placeholder="Seu nome"/>
</p>
<p>
    <label for="input">Sua mensagem: </label>
    <input id="input" type="text" placeholder="Sua mensagem"/>
</p>
<hr>
<div id="chat"></div>
<script>
    let chat = document.getElementById('chat');
    let input = document.getElementById('input');
    const nome = document.getElementById('nome');
    const socket = new WebSocket('ws://localhost:9990/chat');

    // Ao receber mensagens do servidor
    socket.addEventListener('message', function (event) {
        // Deserializamos o objeto
        const data = JSON.parse(event.data);
        // Escrevemos no DOM
        chat.insertAdjacentHTML('beforeend', "<p><b>" + data.nome + " diz: </b>" + data.mensagem + "</p>");
    });

    // Ao enviar uma mensagem
    input.addEventListener('keyup', function (event) {
        if (event.keyCode === 13) {
            // Objeto com os dados que serão trafegados
            const data = {
                nome: nome.value,
                mensagem: this.value,
            };

            // Serializamos o objeto para json
            socket.send(JSON.stringify(data));

            this.value = '';
        }
    });
</script>
</body>
</html>

Antes de iniciar o servidor é importante executar o dump-autoload do Composer, para que a classe ChatServer seja reconhecida:

$ composer dump-autoload

Por fim, inicie o servidor:

$ php chat.php

Você pode testar abrindo duas instâncias do chat e usando nomes diferentes:

Os exemplos completos você pode ver nesse repositório: https://github.com/KennedyTedesco/websocket-demo

Só o navegador pode ser cliente do meu servidor?

Não, qualquer outra aplicação pode ser cliente de um servidor WebSocket. Você pode, por exemplo, criar uma aplicação cliente em PHP usando a library Pawl.

Palavras finais

O Ratchet simplifica e muito a criação de servidores de WebSocket em PHP, não obstante, ele também suporta sub-protocolos como o Wamp. Em um próximo artigo pretendo abordar um exemplo utilizando-o.

Até a próxima!

Corrotinas e código assíncrono em PHP usando Generators

Nos últimos artigos eu tenho escrito sobre programação assíncrona em PHP com ReactPHP e Swoole. O Swoole tem seu mecanismo interno próprio de corrotinas, mas o que muita gente não sabe é que podemos trabalhar com corrotinas usando PHP puro, através de generators.

Eu achei que fosse interessante escrever sobre isso até mesmo como uma forma de apresentar às pessoas o conceito de generators e também como uma forma de instigá-las a buscar mais sobre o tema.

A ideia desse artigo é construir um simples (muito simples) scheduler de corrotinas bem primitivo e didático. Em outros artigos eu escrevi de forma mais abrangente sobre generators, código síncrono, assíncrono, corrotinas etc, conceitos esses que serão importantes você ter para entender o funcionamento do nosso exemplo aqui. Por esse motivo, eu recomendo que você leia esses artigos:

Opcionalmente, se você tiver interesse, também recomendo a leitura deste artigo:

Esse artigo que você está lendo é, de certa forma, uma ramificação deste:

Portanto, o artigo Generators no PHP é realmente uma leitura necessária para entender os códigos que desenvolveremos aqui.

Desenvolvedor PHP Júnior
Formação: Desenvolvedor PHP Júnior
Nesta formação você aprenderá todos os fundamentos necessário para iniciar do modo correto com a linguagem PHP, uma das mais utilizadas no mercado. Além dos conceitos de base, você também conhecerá as características e a sintaxe da linguagem de forma prática.
CONHEÇA A FORMAÇÃO

Gênesis

Sabemos que generators são como se fossem funções que podem ser interrompidas e resumidas a qualquer momento. Também sabemos que assincronismo é sobre fluxo de execução. Captando essas duas ideias centrais, podemos chegar na seguinte conclusão lógica: se um generator pode ser interrompido para que outro seja executado, eu posso usar isso para manipular o fluxo de uma execução e então atingir uma modelagem de código assíncrono.

Primeiro vamos ver um exemplo de código síncrono:

<?php

declare(strict_types=1);

$booksTask = static function () {
    for ($i = 1; $i <= 4; ++$i) {
        echo "Book $i\n";
    }
};

$moviesTask = static function () {
    for ($i = 1; $i <= 8; ++$i) {
        echo "Movie $i\n";
    }
};

$booksTask();
$moviesTask();

O resultado dessa execução:

Book 1
Book 2
Book 3
Book 4
Movie 1
Movie 2
Movie 3
Movie 4
Movie 5
Movie 6
Movie 7
Movie 8

A execução é síncrona, linear e previsível. Temos duas tarefas, mas a segunda ($moviesTask) só terá oportunidade de desempenhar seu trabalho depois que a primeira ($booksTask) terminar tudo o que tem para ser feito.

Podemos transformar esse exemplo em um código de modelo assíncrono usando generators e trabalhando com a ideia de que cada task é uma corrotina. Para isso, dois princípios são importantíssimos:

  • Uma Task (tarefa) será apenas um decorator de um generator;
  • Um Scheduler cuidará da fila de tarefas e da execução delas;

O exemplo pode ser encontrado no GitHub: https://github.com/KennedyTedesco/coroutines-php

Uma tarefa será representada pela classe Task:

<?php

declare(strict_types=1);

namespace Coral;

use Generator;

final class Task
{
    private $coroutine;
    protected $firstYield = true;

    public function __construct(Generator $coroutine)
    {
        $this->coroutine = $coroutine;
    }

    public function run(): void
    {
        if ($this->firstYield) {
            $this->firstYield = false;
            $this->coroutine->current();
        } else {
            $this->coroutine->next();
        }
    }

    public function finished(): bool
    {
        return ! $this->coroutine->valid();
    }
}

A tarefa é apenas um decorator de um generator.

E o Scheduler será representado pela classe de seu próprio nome:

<?php

declare(strict_types=1);

namespace Coral;

use SplQueue;

final class Scheduler
{
    private $tasks;

    public function __construct()
    {
        $this->tasks = new SplQueue();
    }

    public function schedule(Task $task): void
    {
        $this->tasks->enqueue($task);
    }

    public function handle(): void
    {
        while (! $this->tasks->isEmpty()) {
            /** @var Task $task */
            $task = $this->tasks->dequeue();

            $task->run();
            if (! $task->finished()) {
                $this->schedule($task);
            }
        }
    }
}

O Scheduler mantém uma fila de tarefas a serem executadas e possui um método público schedule() para adicionar tarefas nessa fila. O método handle() itera nas tarefas executando-as. Antes de entrarmos em mais detalhes, ao executar o exemplo:

<?php

declare(strict_types=1);

require 'vendor/autoload.php';

use Coral\Task;
use Coral\Scheduler;

$scheduler = new Scheduler();

$booksTask = static function () {
    for ($i = 1; $i <= 4; ++$i) {
        echo "Book $i\n";

        yield;
    }
};

$moviesTask = static function () {
    for ($i = 1; $i <= 8; ++$i) {
        echo "Movie $i\n";

        yield;
    }
};

$scheduler->schedule(new Task($booksTask()));
$scheduler->schedule(new Task($moviesTask()));

$scheduler->handle();

Nota: As duas tarefas retornam um generator, que depois é passado para o construtor de Task.

Temos o seguinte resultado:

Book 1
Movie 1
Book 2
Movie 2
Book 3
Movie 3
Book 4
Movie 4
Movie 5
Movie 6
Movie 7
Movie 8

Diferentemente do exemplo síncrono mostrado anteriormente, neste temos a alternância da execução das tarefas, no sentido de que são colaborativas, uma abre espaço para que a outra também tenha oportunidade de ser executada. Isso acontece pois o Scheduler executa a tarefa, o valor corrente dela é impresso, nisso ela volta novamente para a fila do Scheduler para ser executada novamente em outro momento. As tarefas sempre voltam para a fila enquanto ainda tiverem valores a serem processados:

$task->run();
if (! $task->finished()) {
    $this->schedule($task);
}

Essa é uma estratégia para mantê-las em sua essência colaborativas, ou seja, a tarefa abre mão do seu tempo de execução para que outra tarefa também tenha oportunidade.

Considerações finais

Este foi um simples exemplo de como podemos ter uma operação assíncrona utilizando generators. E aqui nem estamos nos referindo a multiplexing de I/O, mas poderíamos usar esse mesmo conceito de generators e implementar I/O não bloqueante (assíncrono) usando algum padrão como o Reactor, usado pelo ReactPHP ou algo mais “simples” usando diretamente a função stream_select(). Inclusive, O Nikita Popov (desenvolvedor do core do PHP) escreveu exatamente sobre isso no artigo Cooperative multitasking using coroutines (in PHP!), que por sinal, é a principal referência desse artigo aqui. Recomendo essa leitura pois ele também fez com que o generator se comunicasse com outros generators e com o Scheduler (que é a realmente essência de uma corrotina), numa espécie de canal de comunicação, o que torna as coisas ainda mais poderosas. O framework assíncrono Amp faz um uso bem intensivo de generators, também vale a pena testá-lo.

Ah, não poderia deixar de pontuar novamente: se você tem interesse por programação assíncrona com PHP, vale a pena a leitura desses artigos:

Até a próxima!

Trabalhando com corrotinas, canais e explorando um pouco mais o scheduler de corrotinas do Swoole

Neste artigo veremos de forma prática os aspectos essenciais do modelo de programação concorrente CSP (communicating sequential processes) com Swoole, usando Coroutine (corrotina), Channel (canal) e Defer (execução tardia). Se você já programou em Go verá muitas similaridades.

Antes, entretanto, é fundamental que você leia o artigo Introdução ao Swoole, framework PHP assíncrono baseado em corrotinas, pois ele introduz toda a teoria fundamental para que possamos criar os nossos primeiros exemplos e adentrar um pouco mais nas possibilidades que o Swoole nos oferece.

Em uma execução sequencial e síncrona de duas funções, teríamos:

<?php

function a() {
    sleep(1);
    echo 'a';
}

function b() {
    sleep(2);
    echo 'b';
}

a();
b();

O resultado é bem previsível, aguarda um segundo, imprime a, aguarda dois segundos e imprime b.

Para que possamos executar uma tarefa dentro de uma corrotina, usamos a função go(). O exemplo acima poderia ser reescrito para:

<?php

go(static function () {
    sleep(1);
    echo 'a';
});

go(static function () {
    sleep(2);
    echo 'b';
});

O problema que temos agora é que a função sleep() do PHP é bloqueante, assim como são as funções de stream, por exemplo.

Recomendação de leitura: Streams no PHP

Esse exemplo terá o exato mesmo comportamento que o anterior. Ele demorará três segundos pra finalizar a sua execução. Podemos resolver isso de duas formas, sendo que a primeira é adicionando a instrução Swoole\Runtime::enableCoroutine(); no exemplo:

<?php

Swoole\Runtime::enableCoroutine();

go(static function () {
    sleep(1);
    echo 'a';
});

go(static function () {
    sleep(2);
    echo 'b';
});

Este é um hook “mágico” que fará com que o Swoole execute algumas funções que são nativamente síncronas mas de forma assíncrona (não bloqueante). E isso vale para a sleep(), como vale para as funções relacionadas a streams.

Agora sim, esse exemplo será executado em dois segundos. Ao invés da execução consumir a soma dos dois tempos das corrotinas, ela passa a consumir o tempo da maior.

Então, temos a seguinte relação:

  • No modelo síncrono gasta-se o tempo de: (a + b)
  • No modelo concorrente gasta-se: MAX(a, b)

A outra forma de resolver o problema anterior sem que precisemos aplicar o hook enableCoroutine(), é executando a função sleep() assíncrona da API do Swoole:

<?php

use Swoole\Coroutine\System;

go(static function () {
    System::sleep(1);
    echo 'a';
});

go(static function () {
    System::sleep(2);
    echo 'b';
});

Outras funções disponíveis na API de corrotinas:

System::sleep(100);
System::fread($fp);
System::gethostbyname('www.google.com');
// Entre outras

Você verá muitos System::sleep() até o final desse artigo, pois é uma forma de emular uma operação de I/O (que é sabido que é mais custosa que uma operação de CPU).

Executando os exemplos

Se você usa Linux ou macOS pode instalar o Swoole diretamente no seu ambiente:

https://www.swoole.co.uk/docs/get-started/installation

Ou você pode usar o Docker, que é a opção escolhida desse artigo. Algumas das imagens disponíveis para o Swoole:

Para esse artigo eu estou usando como base a imagem do swoole-by-examples. Todos os exemplos desse artigo estão disponíveis nesse repositório:

https://github.com/KennedyTedesco/swoole-coroutines

Basta que você clone-o em seu computador e então execute o comando abaixo para inicializar o container:

Docker - Fundamentos
Curso de Docker - Fundamentos
CONHEÇA O CURSO
$ docker-compose up -d

E para executar o exemplo anteriormente criado:

$ docker-compose exec client bash -c "time php ./co1.php"

O resultado no terminal será:

ab
real    0m2.031s
user    0m0.010s
sys 0m0.010s

Usamos time na execução para que possamos ter a informação do tempo gasto.

Uma coisa importante de se pontuar é que esse projeto tem como dependência no composer o swoole-ide-helper que ajuda a sua IDE ou editor de código reconhecer as assinaturas das classes e métodos do Swoole. Mas é bom sempre lembrar que a documentação é outro ótimo lugar para conhecer outros detalhes e características das APIs.

Voltando …

Um importante conceito de concorrência é que não é sobre execução ordenada, a ordem de execução das tarefas não é garantida, são vários os fatores que influenciam. Então, o observe o exemplo abaixo em que executamos 5000 corrotinas:

<?php

use Swoole\Coroutine\System;

for ($i = 0; $i < 5000; $i++) {
    go(static function () use ($i) {
        System::sleep(1);
        echo "$i\n";
    });
}

Para executá-lo:

$ docker-compose exec client bash -c "time php ./co2.php"

Sempre que você executá-lo, terá um retorno diferente. Nesse exemplo criamos 5000 corrotinas que foram executadas em cerca de 1s. Não fossem executadas de forma concorrente gastaríamos 5000 segundos.

Outras formas de criar corrotinas

A função go() é muito conveniente para a criação de corrotinas, bastando que passemos para ela uma função anônima representando a tarefa. No entanto, existem outras formas de utilizá-la. O primeiro parâmetro dela espera por um callable:

/**
 * @param callable $func
 * @param ...$params
 * @return mixed
 */
function go(callable $func, ...$params){}

Portanto, poderíamos passar o nome de uma função:

<?php

use Swoole\Coroutine\System;

function someTask(int $i) : void {
    System::sleep(1);

    echo "$i\n";
}

for ($i = 0; $i < 1000; $i++) {
    go('someTask', $i);
}

Para executá-lo:

$ docker-compose exec client bash -c "time php ./co3.php"

Como também poderíamos passar a instância de um objeto invocável:

<?php

use Swoole\Coroutine\System;

final class SomeTask
{
    public function __invoke(int $i): void
    {
        System::sleep(1);

        echo "$i\n";
    }
}

for ($i = 0; $i < 1000; $i++) {
    go(new SomeTask, $i);
}

Para executá-lo:

$ docker-compose exec client bash -c "time php ./co4.php"

E as outras formas possíveis são:

<?php

use Swoole\Coroutine\System;

function someTask(int $i): void {
    System::sleep(1);

    echo "$i\n";
}

co::create('someTask', 1);

swoole_coroutine_create('someTask', 2);

Swoole\Coroutine::create('someTask', 3);

Para executá-lo:

$ docker-compose exec client bash -c "time php ./co5.php"

E todas elas aceitam um valor callable, são formas alternativas a go().

Outro conceito importante sobre corrotinas no Swoole é que o scheduler delas não é multi-threaded como em Go. Apenas uma corrotina é executada por vez, não são executadas em paralelo. Por exemplo, se temos duas tarefas e a tarefa 1 é executada, se tem um sleep(1) nela, essa tarefa é pausada e então a tarefa 2 é executada, depois o scheduler volta para a tarefa 1. Eventos de I/O pausam/resumem a execução das corrotinas a todo instante.

Canais

Outro ponto fundamental do modelo CSP são os canais. As corrotinas representam as atividades do programa e os canais representam as conexões entre elas. Um canal é basicamente um sistema de comunicação que permite uma corrotina enviar valores para outra. Em Go um canal precisa ter um tipo especificado previamente, enquanto que no Swoole podemos armazenar qualquer tipo de dado.

Um exemplo:

<?php

use Swoole\Coroutine\System;
use Swoole\Coroutine\Channel;

$chan = new Channel();

go(static function () use ($chan) {
    // Cria 10.000 corrotinas
    for ($i = 0; $i < 10000; $i++) {
        go(static function () use ($i, $chan) {
            // Emula uma operação de I/O
            System::sleep(1);

            // Adiciona o valor processado no canal
            $chan->push([
                'index' => $i,
                'value' => random_int(1, 10000),
            ]);
        });
    }
});

go(static function () use ($chan) {
    while (true) {
        $data = $chan->pop();
        echo "{$data['index']} -> {$data['value']}\n";
    }
});

Para executá-lo:

$ docker-compose exec client bash -c "time php ./chan1.php"

Usamos o método push() para adicionar um item no canal, que no caso foi um array, mas poderia ser um inteiro, uma string etc. E usamos pop() para extrair um valor do canal. O while(true) dentro dessa corrotina em especial não é um problema, nisso que estamos realizando uma operação no canal, o estado dessa corrotina é controlado, ela não toma pra ela todo o tempo da CPU. Mas veremos mais adiante que operações pesadas de CPU podem impedir que outras corrotinas tenham a chance de serem executadas, mas isso pode ser resolvido se ativarmos o scheduler preemptivo do Swoole.

Avaliando URLs de forma concorrente

Um dos bons exemplos para visualizarmos na prática concorrência é quando envolvemos operações de rede na jogada. O exemplo que veremos a seguir, apesar de não tão sofisticado, foi desenvolvido para que possamos fazer uso de corrotinas, canais e defer.

<?php

use Swoole\Coroutine\Channel;
use Swoole\Coroutine\System;
use Swoole\Coroutine\Http\Client;

function httpHead(string $url) {
    $client = new Client($url, 80);
    $client->get('/');

    return $client;
}

$chan = new Channel();

go(static function () use ($chan) {
    // Abre um ponteiro para o arquivo
    $fp = fopen('sites.txt', 'rb');

    // Atrasa o fechamento do ponteiro do arquivo para o final da corrotina
    defer(static function () use ($fp) {
        fclose($fp);
    });

    while (feof($fp) === false) {
        // Lê linha a linha do arquivo
        $url = trim(System::fgets($fp));

        if ($url !== '') {
            // Cria uma corrotina para requisitar a URL e trazer o status code dela
            go(static function () use ($url, $chan) {
                $response = httpHead($url);

                // Insere no canal a resposta
                $chan->push([
                    'url' => $url,
                    'statusCode' => $response->statusCode,
                ]);
            });
        }
    }
});

// Corrotina que lê os valores do canal e imprime no output
go(static function () use ($chan) {
    while (true) {
        $data = $chan->pop();
        echo "{$data['url']} -> {$data['statusCode']}\n";
    }
});

Para executá-lo:

$ docker-compose exec client bash -c "time php ./http1.php"

Na função httpHead() estamos usando o cliente HTTP de corrotina do Swoole, a documentação dele pode ser consultada aqui.

Na primeira corrotina abrimos um ponteiro para o arquivo onde as URLs estão localizadas. A função defer() define uma tarefa para ser executada ao final da corrotina, então a estamos utilizamos para fechar o ponteiro de arquivo aberto anteriormente.

Iteramos sobre cada linha do arquivo usando a função assíncrona co::fgets() da própria API de corrotina e então, pra cada URL, criamos uma nova corrotina para fazer uma requisição HEAD e obter o código http da resposta. Essa corrotina envia para um canal o resultado, canal este que é utilizado pela segunda corrotina, que imprime todos os valores contidos nele.

O cliente HTTP padrão do Swoole não possui uma API muito rica e não é tão intuitivo de se usar, para isso existe a biblioteca saber que encapsula toda a parte complicada, oferecendo uma API bem intuitiva e de alto nível para se trabalhar com requisições http concorrentes. Se você tiver interesse em praticar, recomendo alterar o exemplo anterior para usar a saber.

E como ficam as tarefas que fazem um uso intensivo de CPU?

Corrotinas são conhecidas por operarem por cooperação (a tarefa é dona do seu ciclo de vida, tendo o poder de se liberar do scheduler no fim de sua operação) em detrimento à preempção.

Esse diagrama ilustra melhor esse cenário:

Enquanto nossas tarefas fazem mais uso de I/O que de CPU, tá tudo bem, pois deixamos os reactors fazerem a mágica. Agora, e se tivermos tarefas de uso pesado de CPU? O modo padrão do scheduler funcionar pode não ser o mais “justo” dependendo do caso, por exemplo:

<?php

use Swoole\Coroutine\System;

// Tarefa 1
go(static function() {
    System::sleep(1);
    for ($i = 0; $i <= 10; $i++) {
        echo "N{$i}";
    }
});

// Tarefa 2
go(static function() {
    $i = 0;
    while (true) {
        $i++;
    }
});

Para executá-lo:

$ docker-compose exec client bash -c "time php ./scheduler1.php"

Ao executar esse exemplo, você notará que a primeira tarefa não terá a oportunidade de executar a sua lógica de imprimir N1, N2 etc, pois quando ela é despachada pelo scheduler para um worker, a primeira linha dela é System::sleep(1); que simula uma operação de I/O, isso faz com que ela seja pausada para que outra tarefa da fila seja executada. O problema é que a tarefa 2 não é muito espirituosa, ela fica num loop infinito incrementando uma variável, com isso, ela não deixa nenhuma oportunidade para que a outra tarefa irmã seja executada, ou seja, ela não é tão colaborativa assim.

Já sabemos que uma tarefa é pausada quando ela está aguardando por alguma operação de I/O para dar oportunidade a outra tarefa desempenhar o seu trabalho. Podemos emular isso na prática usando como base o exemplo anterior:

<?php

use Swoole\Coroutine\System;

// Tarefa 1
go(static function() {
    System::sleep(1);
    for ($i = 0; $i <= 10; $i++) {
        echo "N{$i}";
    }
});

// Tarefa 2
go(static function() {
    $i = 0;
    while (true) {
        $i++;

        // Quando estiver no centésimo loop, emula uma operação de I/O
        if ($i === 100) {
            echo "{$i} -> ";

            System::sleep(1);
        }
    }
});

Para executá-lo:

$ docker-compose exec client bash -c "time php ./scheduler2.php"

O resultado da execução desse exemplo é:

100 -> N0N1N2N3N4N5N6N7N8N9N10

No centésimo loop emulamos uma operação de I/O de 1 segundo, que fez com que a tarefa fosse pausada dando oportunidade para a tarefa 1 voltar a ser executada.

Como as corrotinas possuem controle do seu ciclo de vida, é possível que uma corrotina deliberadamente peça a suspensão do seu direito de execução para dar espaço a outra corrotina. É o que vemos nesse exemplo:

<?php

// Tarefa 1
$firstTaskId = go(static function() {
    echo 'a';
    co::yield();
    echo 'b';
    co::yield();
    echo 'c';
});

// Tarefa 2
go(static function() use($firstTaskId) {
    $i = 0;
    while (true) {
        $i++;

        if ($i === 1000 || $i === 2000) {
            echo " {$i} ";

            co::resume($firstTaskId);
        }
    }
});

Para executá-lo:

$ docker-compose exec client bash -c "time php ./scheduler3.php"

O resultado:

a 1000 b 2000 c

Quando criamos uma corrotina imediatamente recebemos o id dela, por isso definimos a variável $firstTaskId. A primeira tarefa imprime a e então abre mão do seu direito de execução, o que faz com que a segunda tarefa seja executada. Quando o contador chega em 1000, a segunda tarefa abre mão do seu direito de execução para que especificamente a primeira tarefa volte a ser executada e então b é impresso. Mas depois de imprimir b, a primeira tarefa novamente abre mão do seu direito de execução e então chegamos no contador 2000 da segunda tarefa que a resume novamente imprimindo, por fim, c.

Ok, mas e se existisse uma forma do scheduler cuidar dessas questões e não deixar que uma tarefa “sacana” tome todo o tempo da CPU dedicado ao processo? Existe, é possível ativarmos o modo preemptivo. Quando ativamos o modo preemptivo no scheduler, ele passa a funcionar de forma parecida com o scheduler do sistema operacional, dando um tempo justo pra cada linha de execução, sem deixar que uma tarefa impeça as outras de serem executadas. Esse modo preemptivo foi adicionado recentemente e ele parece ter um impacto positivo em aplicações de alto porte que envolvem uma mistura considerável de tarefas CPU bound e I/O bound. Talvez pra sua aplicação não mude muita coisa, ou talvez mude, você teria que testar essa carga nos dois modos (cooperativo e preemptivo) e então ver qual faz mais sentido pro seu caso de uso.

De qualquer forma, voltando no nosso caso hipotético do while(true), usando o modo preemptivo, temos:

<?php

ini_set('swoole.enable_preemptive_scheduler', 1);

go(static function() {
    $i = 0;
    while (true) {
        $i++;
    }
});

go(static function() {
    for ($i = 0; $i <= 10; $i++) {
        echo "N{$i}";
    }
});

Para executá-lo:

$ docker-compose exec client bash -c "time php ./scheduler4.php"

O resultado:

N0N1N2N3N4N5N6N7N8N9N10

Veja que a primeira tarefa é um while (true) , mas como o modo preemptivo foi ativado, ela terá um tempo de CPU em milissegundos (no máximo 10ms) e então terá que abrir espaço para que outra tarefa seja executada, depois o tempo da CPU volta pra ela novamente, algo controlado automaticamente pelo scheduler.

Aninhamento de corrotinas

Como já vimos anteriormente, é possível aninharmos corrotinas, criando novas sub-corrotinas. Um bom exemplo para entender a ordem de execução de corrotinas aninhadas:

<?php

go(static function () { //T1
    echo "[init]\n";
    go(static function () { //T2
        go(static function () { //T3
            echo "co3\n";
        });

        echo "co2\n";
    });

    echo "co1\n";
});

Para executá-lo:

$ docker-compose exec client bash -c "time php ./co6.php"

O resultado:

[init]
co3
co2
co1

Agora, a história muda quando as corrotinas realizam ou emulam alguma operação de I/O:

<?php

use Swoole\Coroutine\System;

go(static function () { //T1
    echo "[init]\n";
    go(static function () { //T2
        System::sleep(3);
        go(static function () { //T3
            System::sleep(2);
            echo "co3\n";
        });

        echo "co2\n";
    });

    System::sleep(1);
    echo "co1\n";
});

Para executá-lo:

$ docker-compose exec client bash -c "time php ./co7.php"

O resultado será:

[init]
co1
co2
co3

WaitGroup

Com um “grupo de espera” podemos aguardar a finalização de algumas corrotinas antes que executemos alguma outra instrução:

<?php

use Swoole\Coroutine\System;
use Swoole\Coroutine\WaitGroup;

$wg = new WaitGroup();

go(static function () use ($wg) {
    $wg->add(3);

    go(static function () use ($wg) {
        System::sleep(3);
        echo "T1\n";
        $wg->done();
    });

    go(static function () use ($wg) {
        System::sleep(2);
        echo "T2\n";
        $wg->done();
    });

    go(static function () use ($wg) {
        System::sleep(1);
        echo "T3\n";
        $wg->done();
    });

    // Aguarda a execução das corrotinas do grupo antes de executar as instruções abaixo
    $wg->wait();

    echo "\n---- \ ----\n";
    go(static function () {
        echo "\n[FIM]\n";
    });
});

Para executá-lo:

$ docker-compose exec client bash -c "time php ./co8.php"

O resultado será:

T3
T2
T1

---- \ ----

[FIM]

O método add() é para incrementar o contador de quantas corrotinas estão no grupo de espera, ele pode ser usado quantas vezes forem necessárias.

Devo me preocupar com race conditions?

Em implementações em que o scheduler usa o modelo multi-thread, como em Go, o desenvolvedor precisa se preocupar com o acesso aos recursos globais compartilhados, para garantir que duas ou mais corrotinas não os acessem ao mesmo tempo, o que invariavelmente causaria race conditions (condições de corrida). Mas esse não é o caso quando usamos Swoole, pois o scheduler dele é single-thread, portanto, não há necessidade de lockings.

Esse exemplo em Go que cria 5k gorrotinas incrementando uma variável global:

package main

import (
    "fmt"
    "sync"
    "time"
)

var count int
var waitGroup sync.WaitGroup

func main() {
    for i := 0; i < 5000; i++ {
        waitGroup.Add(1)
        go increment()
    }

    waitGroup.Wait()
    fmt.Println(count)
}

func increment() {
    time.Sleep(1 * time.Second)

    count++
    waitGroup.Done()
}

Para executá-lo:

$ time go run go1.go

Você pode testá-lo inúmeras vezes e verá que sempre terá um resultado diferente de 5.000, exatamente por causa das race conditions que acontecem, uma gorrotina atropelando a outra na hora de acessar a variável global.

Go implementa uma ferramenta para identificar race conditions, bastando adicionar o parâmetro -race na execução:

$ time go run -race go1.go

Ele indicará que o programa é uma “fábrica” de race conditions:

==================
WARNING: DATA RACE
Read at 0x000001229360 by goroutine 8:
  main.increment()
...
Found 4 data race(s)
exit status 66

Para que as evitemos, podemos usar mutexes ou operações atômicas. Vamos com a primeira opção que é bem simples de assimilar:

package main

import (
    "fmt"
    "sync"
    "time"
)

var count int
var mu sync.Mutex
var waitGroup sync.WaitGroup

func main() {
    for i := 0; i < 5000; i++ {
        waitGroup.Add(1)
        go increment()
    }

    waitGroup.Wait()
    fmt.Println(count)
}

func increment() {
    time.Sleep(1 * time.Second)

    mu.Lock()
    count++
    mu.Unlock()

    waitGroup.Done()
}

Observe que envolvemos a operação de incremento com mu.Lock() e mu.Unlock() para garantir um único acesso por vez à variável global.

Ao executar novamente o exemplo:

$ time go run -race go1.go

O resultado:

5000

E não teremos nenhum erro da ferramenta de verificação de race conditions.

Podemos ter o mesmo exemplo escrito no Swoole sem que precisemos fazer nada de especial (em relação a locks etc):

<?php

use Swoole\Coroutine\System;
use Swoole\Coroutine\WaitGroup;

$count = 0;

go(static function() use(&$count) {
    $wg = new WaitGroup();

    for ($i = 0; $i < 5000; $i++) {
        $wg->add(1);

        go(static function () use($wg, &$count) {
            System::sleep(1);
            $count++;

            $wg->done();
        });
    }

    $wg->wait();

    echo $count;
});

Para executá-lo:

$ docker-compose exec client bash -c "time php ./co9.php"

Usamos WaitGroup para fazer paralelo com a implementação em Go, mas nesse exemplo em especial, poderíamos ter cortado essa etapa e escrito assim:

<?php

use Swoole\Coroutine\System;

$count = 0;

Co\run(static function() use(&$count) {
    for ($i = 0; $i < 5000; $i++) {
        go(static function () use(&$count) {
            System::sleep(1);
            $count++;
        });
    }
});

echo $count;

Para executá-lo:

$ docker-compose exec client bash -c "time php ./co10.php"

Esse exemplo produz o mesmo resultado que o anterior. Co\run aguarda as corrotinas serem finalizadas antes de seguir o fluxo da execução.

O que mais posso fazer com corrotinas?

Muito mais. Os clientes de corrotinas atualmente implementados/suportados pelo Swoole:

E, claro, lembre-se sempre de acompanhar a documentação.

O que mais posso fazer com o Swoole?

Recomendo acompanhar a lista awesome-swoole. Muita coisa boa, frameworks, libraries etc.

Considerações finais

Vimos a teoria essencial de corrotinas que são atualmente o principal mecanismo interno do Swoole e cada vez mais ganharão importância em seu core.

Nos próximos artigos exploraremos outras APIs do Swoole. Até breve!

Trabalhando com Sockets no ReactPHP

No primeiro artigo da série fizemos uma Introdução à programação assíncrona em PHP usando o ReactPHP, depois vimos sobre Promises no ReactPHP. Hoje falaremos sobre Sockets, um assunto muito relevante para a criação de aplicações cliente-servidor.

PHP Básico
Curso de PHP Básico
CONHEÇA O CURSO

Antes, entretanto, tem dois artigos que muito recomendo você ler antes de continuar neste, que são estes:

Criando o primeiro servidor

No seu projeto, instale o componente:

$ composer require react/socket:^1.3

Não vamos instalar manualmente o componente Event Loop pois ele já é uma dependência de react/socket.

Já estamos prontos para criar o nosso primeiro servidor, baseado no do artigo sobre Programação de Sockets em PHP:

<?php

require './vendor/autoload.php';

use React\Socket\Server;
use React\EventLoop\Factory;
use React\Socket\ConnectionInterface;
use React\Stream\WritableResourceStream;

$loop = Factory::create();
$socket = new Server('127.0.0.1:7181', $loop);
$stdout = new WritableResourceStream(\STDOUT, $loop);

$socket->on('connection', static function (ConnectionInterface $connection) {
    $connection->write("Client [{$connection->getRemoteAddress()}] connected \n");
});

$stdout->write("Listening on: {$socket->getAddress()}\n");

$loop->run();

No construtor de Server informamos o IP (local) e a porta que o nosso servidor rodará, não informando o protocolo antes do IP, ele considera por padrão como sendo tcp. Depois, passamos a ouvir o evento connection e informamos um handler que é executado sempre que uma nova conexão é estabelecida com o nosso servidor. Esse handler recebe como parâmetro um objeto do tipo ConnectionInterface que nos permite trabalhar com a conexão que foi realizada. No caso, sempre que um cliente se conectar ao nosso servidor, vamos imprimir no lado dele a mensagem que definimos no método write().

Para testar esse exemplo, basta que você inicie o servidor:

$ php index.php

E para se conectar ao servidor, se seu sistema operacional é baseado em Unix, em outra janela do terminal:

telnet 127.0.0.1 7181

Ao se conectar, no lado do servidor teremos:

$ ~/D/w/reactphp&gt; php index.php
Listening on: tcp://127.0.0.1:7181

No lado do cliente:

$ telnet 127.0.0.1 7181
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Client [tcp://127.0.0.1:50353] connected

O objeto $connection também implementa a interface EventEmitterInterface que nos permite ouvir alguns eventos relacionados à conexão, por exemplo, através do evento data conseguimos receber os dados enviados pelo cliente.

Para testarmos de forma efetiva o evento data, desenvolveremos o exemplo “Echo Server” do artigo Programação de Sockets em PHP:

<?php

require './vendor/autoload.php';

use React\Socket\Server;
use React\EventLoop\Factory;
use React\Socket\ConnectionInterface;
use React\Stream\WritableResourceStream;

$loop = Factory::create();
$socket = new Server('127.0.0.1:7181', $loop);
$stdout = new WritableResourceStream(\STDOUT, $loop);

$socket->on('connection', static function (ConnectionInterface $connection) {
    $connection->write("Client [{$connection->getRemoteAddress()}] connected \n");

    $connection->on('data', static function ($data) use ($connection) {
        $connection->write("Server says: {$data}");
    });
});

$stdout->write("Listening on: {$socket->getAddress()}\n");

$loop->run();

Se você comparar essa implementação com a que implementamos usando apenas o “PHP puro” (no artigo Programação de Sockets em PHP), verá que essa é bem mais simples. O RectPHP abstrai toda a parte complicada de lidar com streams/buffers, oferecendo uma API de bem alto nível.

No terminal do cliente ao interagir com o servidor (submeter algumas entradas):

$ telnet 127.0.0.1 7181
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Client [tcp://127.0.0.1:50397] connected
Hello
Server says: Hello
World
Server says: World

Diferentemente da nossa implementação pura, os servidores criados usando o ReactPHP aceitam múltiplos clientes, sem que façamos nada de especial a respeito (a não ser que tenhamos algum caso de uso mais específico, que é o caso do nosso próximo exemplo).

PHP Intermediário
Curso de PHP Intermediário
CONHEÇA O CURSO

Desenvolvendo um chat

Avançando um pouquinho mais, vamos criar um simples chat que funcionará pelo terminal. Crie um novo projeto, no meu caso, vou chamá-lo de “reactchat”. Nele, instale a dependência do reactphp/socket:

$ composer require react/socket:^1.3

A diferença desse exemplo para o que criamos no artigo Programação de Sockets em PHP será:

  • Usará o componente de socket do ReactPHP;
  • Um membro poderá citar outro (usando @) para enviar uma mensagem privada;
  • O ReactPHP abstrai em muitos aspectos a programação de sockets, portanto, bem mais simples de desenvolver e manter do que usando as funções nativas da extensão Streams do PHP.

Usaremos como referência o exemplo desenvolvido pelo Sergey Zhuk no artigo: Build A Simple Chat With ReactPHP Socket: Server.

Publiquei o exemplo no Github, você pode obtê-lo de lá: KennedyTedesco/reactchat

O index.php inicia o servidor:

<?php

// https://github.com/KennedyTedesco/reactchat

require './vendor/autoload.php';

use ReactChat\Chat;
use ReactChat\Member;
use React\Socket\Server;
use React\EventLoop\Factory;
use React\Socket\ConnectionInterface;
use React\Stream\WritableResourceStream;

$loop = Factory::create();
$socket = new Server('127.0.0.1:7181', $loop);
$stdout = new WritableResourceStream(\STDOUT, $loop);

$chat = new Chat();

$socket->on('connection', static function (ConnectionInterface $connection) use ($chat) {
    $member = new Member($connection);
    $member->write('Informe o seu nome: ');

    $connection->on('data', static function ($data) use ($member, $chat) {
        if ($data !== '' && $member->getName() === null) {
            // Define o nome do membro
            $member->setName(\str_replace(["\r", "\n"], '', $data));
            // Adiciona o membro ao chat
            $chat->addMember($member);
        }
    });
});

$stdout->write("Listening on: {$socket->getAddress()}\n");

$loop->run();

Não obstante, quando uma conexão é estabelecida, a primeira coisa que ele faz é pedir o nome, considerando que é alguém que está querendo entrar no chat. Essa pessoa só é adicionada ao chat depois que definir o nome.

A classe Member encapsula uma conexão e oferece alguns métodos úteis para se trabalhar com essa conexão:

<?php

// https://github.com/KennedyTedesco/reactchat

declare(strict_types=1);

namespace ReactChat;

use Closure;
use React\Socket\ConnectionInterface;

final class Member
{
    private $name;
    private $connection;

    public function __construct(ConnectionInterface $connection)
    {
        $this->connection = $connection;
    }

    public function getName() : ?string
    {
        return $this->name;
    }

    public function setName(string $name) : void
    {
        $this->name = $name;
    }

    public function write(string $data) : void
    {
        $data = \str_replace(["\r", "\n"], '', $data);

        $this->connection->write($data . \PHP_EOL);
    }

    public function onData(Closure $handler) : void
    {
        $this->connection->on('data', $handler);
    }

    public function onClose(Closure $handler) : void
    {
        $this->connection->on('close', $handler);
    }
}

E é no Chat que adicionamos novos membros e definimos os handlers para os eventos de data e close das conexões dos membros.

<?php

// https://github.com/KennedyTedesco/reactchat

declare(strict_types=1);

namespace ReactChat;

use SplObjectStorage;

final class Chat
{
    private $members;

    public function __construct()
    {
        $this->members = new SplObjectStorage();
    }

    public function addMember(Member $member) : void
    {
        $this->members->attach($member);

        $this->newMessageTo("Bem-vindo(a), {$member->getName()}", $member);
        $this->newMessageToAll("{$member->getName()} entrou na sala;", $member);

        $member->onData(function ($data) use ($member) {
            $this->newMessage("{$member->getName()} diz: {$data}", $member);
        });

        $member->onClose(function () use ($member) {
            $this->members->detach($member);
            $this->newMessage("{$member->getName()} saiu da sala.", $member);
        });
    }

    private function newMessage(string $message, Member $exceptMember) : void
    {
        // Se um membro foi citado usando @, envia a mensagem apenas para ele (mensagem privada).
        // Caso contrário, envia a mensagem para todos os membros
        $mentionedMember = $this->getMentionedMember($message);

        if ($mentionedMember instanceof Member) {
            $this->newMessageTo($message, $mentionedMember);
        } else {
            $this->newMessageToAll($message, $exceptMember);
        }
    }

    private function newMessageTo(string $message, Member $to) : void
    {
        // Envia para um membro específico
        foreach ($this->members as $member) {
            if ($member === $to) {
                $member->write($message);

                break;
            }
        }
    }

    private function newMessageToAll(string $message, Member $exceptMember) : void
    {
        // Envia para todos, exceto para o $exceptMember (quem está enviando a mensagem)
        foreach ($this->members as $member) {
            if ($member !== $exceptMember) {
                $member->write($message);
            }
        }
    }

    private function getMentionedMember(string $message) : ?Member
    {
        \preg_match('/\B@(\w+)/i', $message, $matches);

        $nameMentioned = $matches[1] ?? null;

        if ($nameMentioned !== null) {
            /** @var Member $member */
            foreach ($this->members as $member) {
                if ($member->getName() === $nameMentioned) {
                    return $member;
                }
            }
        }

        return null;
    }
}

Se você acompanhou os outros artigos da série, não terá dificuldade para entender o que está acontecendo nesse exemplo. O ReactPHP torna as coisas bem simples do nosso lado.

O método addMember() é o mais importante da classe, pois ele adiciona um membro na pool de conexões, de tal forma que ele passará a receber as mensagens do chat e também poderá interagir com os outros membros.

O método getMentionedMember() avalia por uma regex se algum pattern @nome foi destacado na mensagem que será enviada, se sim, ele procura e retorna o membro que possui esse nome. Ele é o núcleo do funcionamento do envio das mensagens privadas lá no método newMessage().

Para testar, tudo o que você precisa é iniciar o servidor:

$ php index.php

Depois, abra umas três abas no seu terminal para inicializar alguns clientes do chat (dando nomes diferentes para eles):

telnet 127.0.0.1 7181

Você pode, inclusive, testar o envio de mensagens privadas usando @nomeDoMembro.

PHP Avançado
Curso de PHP Avançado
CONHEÇA O CURSO

Concluindo

Ao invés de utilizarmos o terminal para a conexão dos membros do chat, poderíamos usar uma interface web e, para se conectar ao servidor, utilizaríamos WebSockets que já é uma realidade estável em todos os principais navegadores. Inclusive, esse deverá ser o assunto do próximo artigo da série. =D

Até a próxima!

Promises no ReactPHP

No último artigo – Introdução à programação assíncrona em PHP usando o ReactPHP – tivemos uma introdução à programação assíncrona com PHP.

No artigo de hoje vamos comentar sobre Promises, que no ReactPHP trata-se de um componente. Promises no ReactPHP são uma implementação da API CommonJS Promises/A. Se você trabalha com JavaScript, vai notar muita similaridade com o que veremos a seguir.

PHP Básico
Curso de PHP Básico
CONHEÇA O CURSO

Afinal, o que é uma Promise?

Uma Promise é uma abstração que encapsula um resultado de uma execução assíncrona para ser utilizado quando ele estiver disponível. Promises desempenham a mesma tarefa que os Callbacks dos quais já estamos acostumados, só que de maneira bem mais elegante (principalmente quando lidamos com múltiplos callbacks de forma encadeada).

Dois conceitos importantes que temos que conhecer: Promise e Deferred. Já vimos que uma Promise representa o resultado de um código assíncrono, já Deferred representa a computação que vai gerar esse resultado em algum momento futuro. Um objeto Deferred sempre vai ter uma Promise associada que vai representar (encapsular) o resultado. Usar um objeto Deferred é uma forma de separar a Promise de quem vai resolvê-la ou rejeitá-la (em algum momento futuro).

Uma Promise possui três estados possíveis:

  • unfulfilled: É o estado inicial, pois o valor retornado de Deferred ainda é desconhecido.
  • fulfilled: Esse estado indica que a Promise está devidamente “alimentada” do resultado da computação de Deferred.
  • failed: Houve uma exceção durante a execução do Deferred (o executor pode ter chamado reject()).

Já um objeto Deferred possui dois métodos para mudar o estado da sua Promise:

  • resolve(string $result): Quando o código é executado com sucesso, esse método altera o estado da Promise para fulfilled (enviando para ela o resultado que ela encapsulára);
  • reject(string $reason): A execução do código falhou, altera o estado da Promise para failed.

Uma Promise também possui métodos para que definamos handlers (callbacks a serem executados) para as mudanças de estados dela, são esses: then(), done(), otherwise() e always().

Antes de iniciarmos, certifique-se de instalar o componente React Promise no seu projeto:

$ composer require react/promise

Vamos então ao primeiro exemplo do uso de um objeto Deferred e de sua Promise:

<?php

require './vendor/autoload.php';

use React\Promise\Deferred;

$deferred = new Deferred();
$promise = $deferred->promise();

$promise->done(function ($data) {
    echo 'Resultado: ' . $data;
});

$deferred->resolve('Olá mundo!'); // Resultado: Olá mundo!

Criamos um objeto Deferred e definimos um handler para o método done() da Promise associada a ele. Por fim, resolvemos a operação usando o método resolve(), método este que “altera” o estado da Promise (para fulfilled) e fez o nosso handler (a função anônima que definimos no primeiro argumento do método done()) ser executado.

No caso de o objeto Deferred explicitamente rejeitar uma operação alterando o estado da Promise para failed, podemos usar o segundo argumento do método done() para definir um handler a ser executado em caso de rejeição:

O método done() aceita como argumento handlers tanto para o estado fulfilled quanto para o failed:

public function done(callable $onFulfilled = null, callable $onRejected = null)

No primeiro argumento informamos o handler que será executado quando o estado da Promise mudar para fulfilled, no segundo argumento, um handler para ser executado quando o estado da Promise for alterado para failed.

Por exemplo, vamos informar um handler pro segundo argumento da Promise e então vamos executar o método reject() do Deferred:

<?php

require './vendor/autoload.php';

use React\Promise\Deferred;

$deferred = new Deferred();
$promise = $deferred->promise();

$promise->done(function ($data) {
    echo 'Resultado: ' . $data;
}, function($reason) {
    echo 'Motivo da falha: ' . $reason;
});

$deferred->reject('Erro interno'); // Motivo da falha: Erro interno

Portanto, temos a seguinte relação:

Promises podem ser encadeadas, ou seja, o valor de uma promise resolvida pode ser encaminhado para a próxima da sequência. Isso pode ser atingido usando os seguintes métodos: then() e otherwise(). O método then() é parecido com o done(), com a diferença que ele retorna uma nova promise, enquanto o done() sempre retorna null. E o método otherwise() é uma forma de definir um handler para quando o estado da Promise for failed (e ele retorna uma nova Promise).

Poderíamos, então, trocar o done() por then() no exemplo anterior:

<?php

require './vendor/autoload.php';

use React\Promise\Deferred;

$deferred = new Deferred();
$promise = $deferred->promise();

$promise->then(function ($data) {
    echo 'Resultado: ' . $data;
}, function($reason) {
    echo 'Motivo da falha: ' . $reason;
});

$deferred->reject('Erro interno'); // Motivo da falha: Erro interno

Ou, uma vez que then() retorna uma nova Promise, poderíamos usar o otherwise() de forma encadeada:

<?php

require './vendor/autoload.php';

use React\Promise\Deferred;

$deferred = new Deferred();
$promise = $deferred->promise();

$promise
    ->then(function ($data) {
        echo 'Resultado: ' . $data;
    })
    ->otherwise(function($reason) {
        echo 'Motivo da falha: ' . $reason;
    });

$deferred->reject('Erro interno'); // Motivo da falha: Erro interno

Outro exemplo com o encadeamento de Promises:

<?php

require './vendor/autoload.php';

use React\Promise\Deferred;

$deferred = new Deferred();

$deferred->promise()
    ->then(function ($data) {
        return "Olá, {$data}";
    })->then(function($data) {
        return "{$data}Web";
    })->then(function($data) {
        return strtoupper($data);
    })->then(function($data) {
        echo "{$data}!";
    });

$deferred->resolve('Treina'); // OLÁ, TREINAWEB!

Na programação assíncrona faz muito sentido uma Promise retornar outra Promise, pois não temos o resultado da operação de imediato (quando envolve I/O), o que temos é uma “promessa” de que em algum momento ele poderá vir a estar disponível. Nesse sentido, then() é um mecanismo para aplicar uma transformação em uma Promise e gerar uma nova Promise a partir dessa transformação.

PHP Intermediário
Curso de PHP Intermediário
CONHEÇA O CURSO

Outro exemplo de encadeamento de then() com otherwise():

<?php

require './vendor/autoload.php';

use React\Promise\Deferred;

$deferred = new Deferred();

$deferred->promise()
    ->then(function ($data) {
        return "Olá, {$data}";
    })->then(function($data) {
        throw new InvalidArgumentException("{$data}Web");
    })->otherwise(function(InvalidArgumentException $exception) {
        return strtoupper($exception->getMessage());
    })->done(function($data) {
        echo "{$data}!";
    });

$deferred->resolve('Treina'); // OLÁ, TREINAWEB!

Nesse exemplo o nosso handler em otherwise() só vai ser invocado se ele receber uma exceção do tipo InvalidArgumentException (e fizemos ele receber).

Outro exemplo:

<?php

require './vendor/autoload.php';

use React\Promise\Deferred;

$deferred = new Deferred();

$deferred->promise()
    ->then(function ($data) {
        return "Olá, {$data}";
    })->then(function($data) {
        throw new RuntimeException("{$data}Web");
    })->otherwise(function(InvalidArgumentException $exception) {
        return strtoupper($exception->getMessage());
    })->otherwise(function(Exception $exception) {
        return strtolower($exception->getMessage());
    })->done(function($data) {
        echo "{$data}!";
    });

$deferred->resolve('Treina'); // olá, treinaweb!

Nesse exemplo, como a promise lança uma exceção do tipo RuntimeException, apenas o handler que está esperando por uma exceção genérica (do tipo Exception) será invocado:

...
})->otherwise(function(Exception $exception) {
  return strtolower($exception->getMessage());
})
...

E, claro, se nenhuma promise passar pelo estado failed os nossos handlers que lidam com as falhas não serão invocados:

<?php

require './vendor/autoload.php';

use React\Promise\Deferred;

$deferred = new Deferred();

$deferred->promise()
    ->then(function ($data) {
        return "Olá, {$data}";
    })->then(function($data) {
        // throw new RuntimeException("{$data}Web");
        return "{$data}Web";
    })->otherwise(function(InvalidArgumentException $exception) {
        return strtoupper($exception->getMessage());
    })->otherwise(function(Exception $exception) {
        return strtolower($exception->getMessage());
    })->done(function($data) {
        echo "{$data}!";
    });

$deferred->resolve('Treina'); // Olá, TreinaWeb!

Agora que já entendemos o funcionamento do objeto Deferred e como as Promises funcionam, podemos verificar como seria resolver (ou rejeitar) uma Promise sem um objeto Deferred:

<?php

require './vendor/autoload.php';

use React\Promise\Promise;

$promise = new Promise(function(Closure $resolve, Closure $reject) {
    if (\random_int(1, 1000000) % 2 === 0) {
        $resolve('Gerou um número par.');
    } else {
        $reject('Gerou um número ímpar.');
    }
});

$promise->then(function($data) {
    echo 'Sucesso: ' . $data;
})->otherwise(function($reason) {
    echo 'Falha: ' . $reason;
});

Nesse exemplo instanciamos um objeto Promise passando para ele uma função anônima que vai cuidar da computação que precisamos realizar. Apenas para fins didáticos, estamos gerando um número inteiro aleatório e então verificamos se ele é par, se verdadeiro, resolvemos a Promise, caso contrário, a rejeitamos. Nisso que passamos a função anônima no construtor da classe Promise, ela é invocada e recebe duas novas funções anônimas como argumento: $resolve e $reject. São essas funções que invocamos no término da nossa operação para decidir o estado da Promise, se vai ser fulfilled ou failed. Por fim, apenas definimos handlers que serão executados em caso de sucesso ou falha e, para isso, usamos os métodos then() e otherwise().

Observe que a diferença dessa abordagem (de instanciar um objeto Promise diretamente) para a que usa um objeto Deferred, é que nessa última invertemos o controle de quem resolve a Promise, ou seja, essa responsabilidade fica com o objeto Deferred.

Agora que já vimos o essencial do funcionamento das promises, podemos fazer um paralelo de como seria um código que usa callbacks versus um que usa promises (usando JavaScript como linguagem de referência):

request('http://www.treinaweb.com.br', function (error, response) {
    if (error) {
        // Aqui a gente lida com o erro.
    } else {
        request('http://www.treinaweb.com.br/' + response.path, function (error, response) {
            if (serror) {
                // Aqui a gente lida com o erro.
            } else {
                // Aqui lidaríamos com o sucesso da requisição.
            }
        });
    }
});

Nesse modelo estamos sempre aninhando callbacks uns dentro dos outros, o que nos torna suscetíveis ao mal do callback hell. O mesmo exemplo acima usando a biblioteca axios que trabalha com Promises ficaria assim:

axios.get('http://www.treinaweb.com.br')
    .then(function (response) {
        return axios.get('http://www.treinaweb.com.br/' + response.path);
    })
    .then(function (response) {
        // Lida com a resposta da requisição anterior
    })
    .catch(function (error) {
        // Lida com alguma exceção, se existir.
    });

No artigo de introdução ao ReactPHP mostramos um exemplo que usava a library reactphp-buzz. Veremos outro exemplo com ela, pois ela faz uso intensivo de promises. Primeiro instale-a como dependência do projeto:

$ composer require clue/buzz-react:^2.6

O nosso exemplo vai de forma assíncrona imprimir o bairro de alguns CEPs usando a API pública do Postmon:

<?php

require './vendor/autoload.php';

use Clue\React\Buzz\Browser;
use React\EventLoop\Factory;
use Psr\Http\Message\ResponseInterface;

$browser = new Browser(
    $loop = Factory::create()
);

$ceps = [
    '01311200', // Bela Vista
    '70630904', // Setor Militar Urbano
    '70165900', // Zona Cívico-Administrativa
    '32685888', // Erro, cep não existe.
];

foreach ($ceps as $cep) {
    $browser->get("https://api.postmon.com.br/v1/cep/{$cep}")
        ->then(function (ResponseInterface $response) {
            $endereco = \json_decode($response->getBody());

            echo $endereco->bairro . PHP_EOL;
        })
        ->otherwise(function (\Exception $exception) use ($cep) {
            echo 'Erro no CEP: ' . $cep . PHP_EOL;
        });
}

$loop->run();

Esse exemplo demonstra que o método get() que realiza uma requisição HTTP retorna uma Promise. Se você executar esse exemplo várias vezes, verá que não terá uma ordem definida para a impressão dos resultados, ademais, um resultado só é impresso quando ele fica pronto, não seguindo uma ordem linear (que é o que estamos acostumados na programação síncrona).

PHP Avançado
Curso de PHP Avançado
CONHEÇA O CURSO

Para finalizar, é importante pontuar que Promises por si só não fazem nossos códigos serem assíncronos, elas são apenas mecanismos para encapsular os resultados.

Até a próxima!

Introdução à programação assíncrona em PHP usando o ReactPHP

Antes de entrarmos no comparativo do modelo síncrono versus assíncrono, veremos uma introdução, o essencial, sobre como uma requisição funciona em uma aplicação PHP tradicional.

A maior parte das aplicações escritas em PHP funcionam no clássico modelo de requisição e resposta de curto tempo de vida. Uma requisição é feita, o código é interpretado e depois compilado, a execução é realizada, dados são retornados e tudo é descarregado da memória na sequência, tudo acontece de forma isolada, sem compartilhar contexto. De forma simplificada, esse é o ciclo de vida da execução de um script no PHP a cada nova requisição feita.

Você pode estar imaginando que isso é muito custoso, ter sempre que passar pela interpretação e compilação a cada nova requisição. Você está certo. Mas o PHP implementa mecanismos que otimizam esse processo, para não ter que interpretar e compilar o código o tempo todo. O PHP interpreta os códigos e os compila (de forma implícita, ou seja, quando ele julga necessário) para bytecodes (uma versão intermediária de código) e coloca isso em memória compartilhada quando ele percebe que aquela parte é muito requisitada/utilizada (tarefa da extensão nativa OPCache). Além disso, o PHP implementa outros mecanismos (de mais baixo nível) de otimização da execução desse código intermediário.

De qualquer forma, mesmo com os mecanismos de otimização, a essência do modelo de requisição e resposta se mantém a mesma. O diagrama abaixo exemplifica como funciona esse ciclo de execução:

Esse diagrama encurtou propositalmente uma etapa, a que passa pelo PHP-FPM, um gerenciador de processos, muito usado junto ao Nginx (servidor web), pois poderíamos ter um artigo só para falar sobre ele. A ideia aqui é entender o básico de como a requisição passa pelo servidor web e depois é retornada para o cliente. O PHP-FPM dispõe de pools de processos, ele cria, controla e encerra, de acordo com a demanda (o que o Nginx está encaminhando pra ele) e capacidade do hardware para tal (memória, principalmente).

Modelo síncrono

Num ambiente síncrono (tradicional) as instruções (partes) do programa são executadas uma por uma e apenas uma por vez, de forma sequencial:

<?php

echo "Hello ";

sleep(4); // Espera 4 segundos

echo "World";

Esse script vai demorar 4 segundos para ser executado e finalizado. A execução acontece linha por linha, de forma bloqueante. Se uma instrução precisa aguardar algum tempo (seja para ler algo do disco ou fazer alguma operação na rede), isso terá de ser concluído para que a próxima instrução seja executada e até mesmo para que ela use os dados previamente recuperados/preparados. Ou seja, parte-se da premissa de que a instrução anterior precisa ter sido concluída com sucesso (sem erros) para que uma nova seja executada (dependência).

Esse modelo funciona muito bem pra operações que usam mais CPU que I/O, pois a resolução de uma operação na CPU é muito mais eficiente do que uma operação que envolva I/O (uma requisição na rede, a leitura de um arquivo, esse tipo de operação é de alta latência).

Só que pense no seguinte problema: você decidiu implementar uma tarefa que precisa verificar se os links de um site estão todos online. No modelo síncrono, teríamos que partir da primeira requisição, aguardar o resultado dela (momento de ociosidade do programa) e então partir para próxima seguindo o mesmo fluxo até a última (sempre de forma sequencial e uma só iniciando após a finalização da outra).

Mas, não seria melhor ao invés de esperarmos a primeira requisição ser finalizada já inicializarmos as outras requisições e depois de um tempo voltar para pegar os resultados produzidos por elas? Pois bem, essa é a ideia central do modelo assíncrono, ele minimiza a ociosidade do programa alternando entre as tarefas. Num código assíncrono as tarefas são intercaladas sem precisar envolver novas threads, ou seja, de forma single-thread (como funciona o PHP e NodeJS, por exemplo).

Modelo assíncrono

Um código assíncrono lida com dependências e ordem de execução de eventos, ou seja, lida basicamente com tempo. É comum associar assincronismo com paralelismo, pois o assincronismo dá essa sensação que muita coisa está sendo executada no mesmo instante de tempo, no entanto, ao invés disso, no assincronismo muita coisa é feita ao mesmo tempo (concorrentemente) só que uma coisa por vez, nunca no mesmo instante de tempo (o fluxo de execução alterna entre as tarefas). Não existe paralelismo num código assíncrono, ou seja, um código assíncrono não tem suas tarefas distribuídas em múltiplas unidades de processamento, igual comentamos anteriormente, é single-thread (apesar de ser possível atingir paralelismo com assincronismo num ambiente multi-thread, mas foge do escopo do nosso artigo e normalmente necessita de algum caso de uso bem específico, devido às dificuldades técnicas de se implementar e sincronizar a comunicação).

Um código assíncrono continua executando uma tarefa por vez, ele apenas não fica preso em ociosidade enquanto uma tarefa ainda está aguardando algum resultado de I/O, por exemplo. Ao invés de ficar “bloqueado” aguardando, ele alterna de tarefa, inicia outros trabalhos e volta nas outras tarefas em um tempo futuro quando elas estiverem prontas. Fazendo uma analogia, vamos supor que você tem uma tarefa que precisa fazer duas requisições na internet, o seu código assíncrono vai lidar dessa forma:

“Faça essa primeira requisição, mas não vou ficar aqui esperando o resultado, me avise quando tudo estiver pronto. Enquanto isso, deixa eu executar a segunda requisição aqui.”

Enquanto no código síncrono seria:

“Faça essa primeira requisição. Eu terei que ficar esperando essa resposta, pois necessito dela para continuar o meu fluxo de trabalho. [ … algum tempo depois …] Obrigado pela resposta, agora, por gentileza, execute essa segunda requisição? Ficarei aqui aguardando o resultado dela. [… algum tempo depois …] Obrigado pela resposta. Agora posso concluir meu trabalho.”

Se você desenvolve um código síncrono para resolver uma operação matemática e porta esse código para um modelo assíncrono, você vai notar que ambos serão executados praticamente no mesmo tempo, sem nenhum levar vantagem sobre o outro. Agora, a história muda completamente se o seu problema precisa realizar alguma operação I/O (que naturalmente é bloqueante) ou quando ele precisa aguardar algum tempo por alguma coisa, nesse tipo de caso, o modelo assíncrono leva muita vantagem, como mostra esse diagrama:

Veja que nesse diagrama as tarefas alternam entre si, o modelo assíncrono tenta sempre evitar ociosidade/espera/bloqueio. Ele só fica bloqueado/aguardando quando nenhuma tarefa pode fazer nenhum progresso, aí ele precisa receber alguma chamada para voltar à sua operação.

A abordagem assíncrona não é a solução para todos os problemas, mas em comparação com o modelo síncrono, ela performa melhor principalmente nos seguintes cenários:

  • Quando o programa contém tarefas que fazem uso intensivo de I/O;
  • Quando o programa contém tarefas independentes, ou seja, quando umas não precisam esperar pelas outras para realizar seus trabalhos (e essas passam por algum estado de progresso em suas atividades).

Quando não faz tanto sentido:

  • Uma aplicação que faz um uso intensivo da CPU em que as operações são dependentes, ou seja, uma precisa ser finalizada para que a outra entre em cena;
  • Uma aplicação que realiza grandes operações de I/O mas que o uso da aplicação em si é infrequente e não há necessidade de escalar;

Levando para exemplos do “mundo real” você vai ver com frequência o uso de programação assíncrona para:

  • Uma API em que o usuário faz uma requisição e precisa de uma resposta rápida sem que precise esperar alguma operação ser finalizada (essa operação pode continuar rodando lá no servidor enquanto o usuário já obteve a resposta dele). Nesse sentido, a interface do usuário não fica congelada esperando uma resposta de uma operação que ele não precisa esperar por ela.
  • Data Streaming (dá pra construir, por exemplo, até um servidor de streaming de vídeo);
  • Aplicação de monitoramento;
  • Criação de chats;
  • Etc;

Apesar de termos dado exemplos clássicos aqui, é perfeitamente possível integrar código assíncrono numa aplicação tradicional (de abordagem síncrona) se você perceber que em determinado momento requisições externas precisam ser feitas ou alguma operação importante que envolva I/O, você pode estudar a possibilidade de implementar um código assíncrono nessa parte para obter o benefício da não ociosidade e melhorar o tempo de resposta do seu usuário. Não existem “regras estritas” aqui, você vai precisar avaliar caso a caso e decidir o que achar melhor. Mas, certamente, é mais comum ver scripts que rodam em linha de comando (CLI) utilizarem a abordagem assíncrona.

PHP Assíncrono

O PHP não dispõe (mas há a intenção de se implementar isso em algum momento da versão 8 do PHP) de mecanismos nativos para lidar com código assíncrono, diferente do JavaScript e C#, por exemplo. Por isso bibliotecas como Amp e ReactPHP se tornaram relevantes, pois elas abstraem isso. Nesse artigo introdutório, usaremos os componentes do ReactPHP.

O ReactPHP é baseado no padrão Reactor (o mesmo usado pelo NodeJS) que é uma implementação de uma arquitetura orientada a eventos (event-driven). A ideia é permitir que iniciemos múltiplas operações I/O sem que precisemos esperar pela finalização delas (não bloqueante). Ao invés disso, somos notificados quando algo importante acontecer e reagimos a esse evento com um callback (se você programa em JavaScript certamente já está familiarizado com isso).

O ReactPHP possui uma série de componentes independentes e o principal deles, que é o seu core é o EventLoop, ele é a base para o funcionamento de todos os outros componentes que o ReactPHP disponibiliza. O componente EventLoop é uma implementação padrão Reactor.

O Event Loop é basicamente um while infinito que faz o papel de ser o Scheduler das operações. Ele sequencialmente processa a fila de eventos e cuida da execução dos callbacks. Ele é o único código sendo executado sincronamente, nenhum outro código é executado em paralelo. E, como já dissemos anteriormente, ele roda em uma única thread. O fato do seu processador ter 16 núcleos ou 1, em nada vai interferir, a execução do Event Loop continuará sendo single-thread. A ideia por trás do ReactPHP é fazer um bom uso do tempo da CPU (sem cair na ociosidade com as operações de I/O) e não exatamente em paralelizar processos (o que demandaria diversos outros problemas de comunicação, troca de estados, trocas de contexto por parte do sistema operacional além, claro, de recursos de hardware).

O funcionamento é mais ou menos assim:

  • Você registra um evento;
  • Você passa a “ouví-lo” (listening);
  • Quando esse evento é disparado, você reage a ele via um handler e executa algum código.

Diagrama simplificado de funcionamento de um Event Loop:

ReactPHP

O ReactPHP possui quatro implementações possíveis de Event Loop e ele por padrão escolhe qual usar a partir da análise das extensões instaladas no seu ambiente PHP.

As implementações são:

  • StreamSelectLoop – Essa implementação funciona nativamente no PHP sem precisar de nenhuma extensão específica, ela executa chamadas de sistema select que resolvem a implementação do event loop, mesmo que não na performance das opções que serão mostradas abaixo.
  • LibEventLoop – Essa opção usa a extensão libevent do repositório pecl.
  • LibEvLoop – Usa a extensão libev. Funciona de forma similar à libevent citada acima.
  • ExtEventLoop – Usa a extensão event. Funciona de forma similar à libevent citada anteriormente. Essa é a minha extensão de escolha, por ser a mais atualizada e um dos desenvolvedores dela também trabalha no core do PHP. Você pode ver mais detalhes sobre ela clicando aqui.

Mas, calma lá! Não é escopo nosso, por enquanto, se preocupar com tudo isso. Se você vai desenvolver uma aplicação para produção que vai usar ReactPHP, ótimo, eu lhe recomendaria muitíssimamente instalar a extensão event. Mas, para o nosso objetivo didático, vamos deixar que o próprio ReactPHP escolha a melhor implementação pra gente, baseando-se no que temos instalado em nosso ambiente. Se não tivermos nenhuma das três últimas extensões, ele vai usar a primeira implementação, a **StreamSelectLoop **(standalone).

Ele possui uma factory que decide qual das implementações acima será usada:

$loop = React\EventLoop\Factory::create();

As soluções escritas usando o ReactPHP não dependem de nada específico de nenhuma das implementações acima, ou seja, não importa qual a implementação será usada, o ReactPHP se comportará da mesma maneira, as interfaces são as mesmas para todas. Isso nos dá a liberdade de não nos preocuparmos em instalar uma extensão para desenvolvermos alguns testes.

Timers

Timer são úteis para executar um determinado código em um momento futuro. Funcionam da mesma forma que setTimeout() and setInterval() do JavaScript.

Por exemplo, o Event Loop dispõe do método addPeriodicTimer() que faz com que o callback informado seja executado repetidamente a cada determinado intervalo de tempo.

Vamos criar o nosso primeiro exemplo? Tudo o que você precisa fazer é criar um diretório no local onde normalmente você escreve suas aplicações. Dentro dessa pasta, crie um arquivo composer.json com o seguinte conteúdo:

{
    "require": {
        "react/event-loop": "^1.1"
    }
}

Pelo terminal, acesse esse diretório e execute:

$ composer install

Por fim, no mesmo diretório, crie um arquivo index.php com o seguinte conteúdo:

<?php

require './vendor/autoload.php';

$loop = React\EventLoop\Factory::create();

$loop->addPeriodicTimer(1, static function () {
    static $count;

    if (null === $count) {
        $count = 0;
    }

    echo $count++ . PHP_EOL;
});

$loop->run();

// Output:

// 0
// 1
// 2
// 3
// 4
// 5
// ...

Para executar o exemplo:

$ php index.php

E você verá o resultado no seu terminal. A cada um segundo o callback (a função anônima que definimos no segundo argumento de addPeriodicTimer() é executada).

Outro método é o addTimer():

<?php

require './vendor/autoload.php';

$loop = React\EventLoop\Factory::create();

$loop->addTimer(2, static function () {
    echo 'World';
});

echo 'Hello ';
$loop->run();

// Output: Hello World

Nesse caso, o callback será executado uma única vez, num tempo futuro (em dois segundos).

Esse exemplo é, de certa forma, parecido com esse, escrito em JavasCript:

setTimeout(function () { 
  console.log('World');
}, 2);

console.log('Hello ');

Ambos os exemplos mostram que não estão seguindo o fluxo síncrono de execução, ademais, uma parte do código foi programada para ser executada em outro momento e isso não bloqueou a linha de execução.

Streams

A documentação do PHP define Streams como uma forma de generalizar arquivo, rede e outras operações que compartilham um conjunto comum de funções e usos. Em outras palavras, streams representam coleções de dados que podem não estar completamente disponíveis de imediato e também não possuem a limitação de ter que caber na memória, isso faz com que streams sejam uma ferramenta poderosa para lidar com grandes quantidades de dados que podem ser obtidas por partes (chunks). Grande parte do que é feito no PHP é sobre streams. Ao ler um arquivo, lidamos com streams. Ao retornar um output para o cliente, lidamos com streams. Ao obter dados de uma conexão TCP/IP, também estamos lidando com streams.

Temos três tipos de Streams:

  • Readable – Esse tipo permite ler (apenas leitura) os dados de uma fonte;
  • Writable – Esse tipo permite escrever (apenas escrita) dados em uma fonte;
  • Duplex (Readable e Writable ao mesmo tempo) – Esse tipo permite ler e/ou escrever (ambos) dados, como é o caso do protocolo TCP/IP (full-duplex).

Por exemplo, vamos supor que você precise avaliar linha a linha um arquivo de log que possui 1GB, se fizer assim:

$log = file_get_content("error.log")

O PHP tentará carregar o arquivo inteiro na memória (e enquanto não for carregado, nada mais pode ser executado, bloqueante por natureza), o que fatalmente acarretará em um erro e a execução do script será interrompida.

Usando a interface Readable Resource Stream do ReactPHP atingimos esse objetivo de forma não bloqueante, performática e com o mínimo uso de memória.

Vamos testar isso na prática? Criaremos um arquivo (na raiz do projeto) com os 10 milhões de números, um por linha, se você usa um sistema baseado em Unix, consegue atingir esse objetivo executando:

$ awk 'BEGIN { n = 1; while (n < 10000000) print (n++) }' > numeros.txt

No arquivo index.php, execute:

$content = file_get_contents('numeros.txt');

echo 'Memória utilizada: ' . (memory_get_peak_usage(true)/1024/1024);

// Memória utilizada: 77.24

O PHP tentará alocar cerca de ~78MB na memória. Se você tentar limitar o consumo de memória pelo script, terá um estouro:

<?php

ini_set('memory_limit', '12M');

$content = file_get_contents('numeros.txt');

// PHP Fatal error: Allowed memory size of 12582912 bytes exhausted (tried to allocate 78897112 bytes)

Agora vamos usar a classe ReadableResourceStrea do ReactPHP:

<?php

require './vendor/autoload.php';

use React\Stream\ReadableResourceStream;

$loop = React\EventLoop\Factory::create();

$stream = new ReadableResourceStream(
    fopen('numeros.txt', 'rb'), $loop
);

$stream->on('data', function ($chunk) {
    // echo "$chunk\n";
});

$stream->on('end', function () {
    echo 'Memória utilizada: ' . (memory_get_peak_usage(true)/1024/1024);
});

$loop->run();

// Memória utilizada: 2

O pico de consumo de memória ficou em 2MB (assim que a informação fica disponível no buffer, já a utilizamos, liberando-o). Poderíamos processar aí um arquivo bem maior, de dezenas ou centenas de gigabytes.

Veja que nesse exemplo implementamos dois eventos: data e end. No data recebemos os chunks (partes) do arquivo que está sendo lido. Em end executamos um callback quando o processo é finalizado.

Nesse exemplo usamos fopen(), função nativa do PHP (que trabalha com streams), mas em uma aplicação verdadeiramente assíncrona, ao invés disso, devemos usar o componente Filesystem do ReacPHP pois, se tiver uma disputa na leitura do arquivo, a aplicação pode ficar congelada (ler qualquer coisa do sistema de arquivos é bloqueante por natureza). Com esse componente, teríamos algo como:

<?php

require './vendor/autoload.php';

use React\Filesystem\Filesystem;

$loop = React\EventLoop\Factory::create();
$filesystem = Filesystem::create($loop);

$filesystem->file('numeros.txt')->open('rb')->then(function($stream) {
    $stream->on('data', function ($chunk) {
        // echo "$chunk\n";
    });

    $stream->on('end', function () {
        //
    });
});

$loop->run();

Ah, para rodar esse exemplo é necessário que você instale o componente no seu projeto:

$ composer require react/filesystem

Se você já usou promises no JavaScript deve ter notado o método then() ali em cima. O conceito é o mesmo. O método open() retorna uma promise e no método then() executamos um callback quando ela (a “promessa”) é cumprida.

Lembra do exemplo que citamos lá no começo do artigo sobre uma tarefa que verifica se os links de um site estão online? Pois bem, ela poderia ser implementada usando a library reactphp-buzz, pois ela abstrai todo o essencial para se fazer requisições HTTP assíncronas.

Um protótipo de como isso poderia ser implementado de forma síncrona:

<?php

$time_start = microtime(true);

function urlsFromHtml(string $html) : array
{
    $dom = new DOMDocument();

    libxml_use_internal_errors(true);
    $dom->loadHTML($html);
    libxml_use_internal_errors(false);

    $urls = [];

    foreach ($dom->getElementsByTagName('a') as $node) {
        $urls[] = $node->getAttribute('href');
    }

    return $urls;
}

function getUrlStatusCode(string $url) : int
{
    $curl = curl_init();

    curl_setopt($curl, CURLOPT_RETURNTRANSFER, true);
    curl_setopt($curl, CURLOPT_CUSTOMREQUEST, 'HEAD');
    curl_setopt($curl, CURLOPT_HEADER, 1);
    curl_setopt($curl, CURLOPT_NOBODY, true);
    curl_setopt($curl, CURLOPT_URL, $url);

    curl_exec($curl);
    $code = curl_getinfo($curl, CURLINFO_HTTP_CODE);
    curl_close($curl);

    return $code;
}

function getUrlContent(string $url)
{
    $curl = curl_init($url);

    curl_setopt($curl, CURLOPT_RETURNTRANSFER, true);
    $html = curl_exec($curl);
    curl_close($curl);

    return $html;
}

$urls = urlsFromHtml(
    getUrlContent('https://www.globo.com')
);

foreach ($urls as $url) {
    $status = getUrlStatusCode($url) === 200 ? ' [online]' : ' [offline]';

    echo "{$url} -> {$status} \n";
}

echo 'Tempo total de execução: ' . round(microtime(true) - $time_start);

Dessa forma demora cerca de 150 segundos para “pingar” todas as URLS extraídas. Agora, a mesma implementação usando ReactPHP e e a library reactphp-buzz:

Primeiro instale a dependência dela no projeto:

$ composer require clue/buzz-react:^2.6
<?php

require './vendor/autoload.php';

use Psr\Http\Message\ResponseInterface;

$loop = React\EventLoop\Factory::create();
$browser = new Clue\React\Buzz\Browser($loop);

function urlsFromHtml(string $html) : array
{
    $dom = new DOMDocument();

    libxml_use_internal_errors(true);
    $dom->loadHTML($html);
    libxml_use_internal_errors(false);

    $urls = [];

    foreach ($dom->getElementsByTagName('a') as $node) {
        $urls[] = $node->getAttribute('href');
    }

    return $urls;
}

$browser->get('https://www.globo.com')->then(function (ResponseInterface $response) use ($loop, $browser) {
    $urls = urlsFromHtml($response->getBody());
    foreach ($urls as $url) {
        $browser->head($url)->then(function (ResponseInterface $response) use ($url) {
            $status = $response->getStatusCode() === 200 ? ' [online]' : ' [offline]';

            echo "{$url} -> {$status} \n";
        });
    }
});

$time_start = microtime(true);

$loop->run();

echo 'Tempo total de execução: ' . round(microtime(true) - $time_start);

Já de forma assíncrona custou apenas 18 segundos. Lembrando que esse é apenas um exemplo para comparar a diferença entre os dois modelos.

Recomendação de leitura: Promises no ReactPHP

O outro artigo da série foi publicado, ele trata o uso de Promises com o ReactPHP. Você pode acessá-lo clicando aqui.

Concluindo

Esse foi um artigo introdutório sobre programação assíncrona e ReactPHP. Deu pra notar como é poderoso manipular streams, muitas possibilidades são abertas. Tem muito mais o que podemos explorar como Ticks, Promises, trabalhar com sistema de arquivos, trabalhar com websockets, usar funcionalidades do sistema operacional através de processos filhos, entre muitas outras coisas. Existem diversos projetos opensource desenvolvidos em cima do ReactPHP para atingir objetivos diversos, conforme você pode ver no site oficial.