assíncrono

Aplicações em tempo real com PHP usando WebSockets

Neste artigo veremos uma introdução a WebSockets com a criação de um servidor em PHP e usando o navegador do usuário como cliente.

Antes, entretanto, recomendo que você assista o vídeo O que são WebSockets? gravado pelo Akira Hanashiro (aqui da TreinaWeb) que explica de uma forma bem sucinta.

E, se você tiver interesse, também recomendo a leitura do artigo Uma introdução a TCP, UDP e Sockets para se ter a base do que é um socket na arquitetura TCP/IP.

Desenvolvedor PHP Júnior
Formação: Desenvolvedor PHP Júnior
Nesta formação você aprenderá todos os fundamentos necessário para iniciar do modo correto com a linguagem PHP, uma das mais utilizadas no mercado. Além dos conceitos de base, você também conhecerá as características e a sintaxe da linguagem de forma prática.
CONHEÇA A FORMAÇÃO

Ok, mas o que é um WebSocket?

WebSocket é um protocolo que permite a criação de um canal de comunicação cliente-servidor com transmissão bidirecional onde ambos os lados (cliente e servidor) podem transmitir dados simultaneamente. WebSocket veio para suprir as deficiências do protocolo Http para esse objetivo. O protocolo Http, que por sinal, é unidirecional (a transmissão ocorre só de uma ponta para a outra), onde o cliente envia a requisição e o servidor retorna a resposta, finalizando ali a conexão. Ou seja, se o cliente envia 5 requisições Http para o servidor, 5 conexões TCP independentes são abertas, enquanto que com WebSocket uma única conexão TCP é aberta e ela fica disponível para troca de dados a qualquer momento (uma conexão persistente, até que um dos lados decida fechá-la).

É muito importante pontuar, também, que Socket e WebSocket são coisas diferentes. Um WebSocket é um protocolo que roda em cima de sockets TCP, enquanto que um socket é uma abstração, uma porta lógica de comunicação entre duas pontas numa rede.

Benefícios de usar WebSocket em detrimento a Http

Se existe a necessidade de uma conexão permanecer aberta por um longo tempo para uma troca de dados constante, WebSocket é uma ótima escolha. Uma conexão Http é relativamente pesada, ela transmite não só os dados, mas também cabeçalhos. Além disso, possui um curto tempo de vida e não mantém estado (stateless).

Já uma conexão WebSocket, depois do handshake entre o cliente e o servidor, ela permanece aberta até que uma das partes decida fechá-la. E foco dela é na transmissão dos dados, cabeçalhos não são transmitidos pra lá e pra cá.

Então, em resumo, os benefícios em relação ao Http são: baixa latência, conexão persistente e full-duplex (transmissão bidirecional).

Principais casos de uso para WebSockets

Home Brokers (onde cotações são atualizadas a todo instante), feeds de redes sociais, aplicativos de bate-papo, ferramentas de edição colaborativa, jogos multi-player etc.

Criando o primeiro servidor

Neste artigo usaremos a library Ratchet que nos permite criar um servidor WebSocket assíncrono, e para isso ela utiliza o event loop do ReactPHP. Outra opção seria utilizarmos o servidor de WebSocket do Swoole. Mas o bom de usar o ReactPHP é que não precisamos instalar nenhuma extensão específica na nossa instalação do PHP.

Crie uma pasta chamada websocket-demo e dentro dela crie o arquivo composer.json:

{
    "require": {
        "cboden/ratchet": "^0.4.1"
    }
}

Instale as dependências:

$ composer install

Agora, crie um arquivo server.php com a seguinte implementação:

<?php

require './vendor/autoload.php';

use Ratchet\Server\EchoServer;

$app = new Ratchet\App('localhost', 9980);
$app->route('/echo', new EchoServer, ['*']);
$app->run();

Esse é o servidor mais primitivo que existe, é um “Echo Server”, ele envia de volta tudo o que o cliente manda pra ele. Nem tivemos a necessidade de implementá-lo, pois ele já vem junto com o Ratchet.

Por fim, crie um arquivo index.html com a seguinte implementação:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>WebSocket EchoServer</title>
</head>
<body>
<label for="input">Digite aqui: </label>
<input id="input" type="text" placeholder="Digite aqui"/>

<div id="response"></div>
<script>
    let input = document.getElementById('input');
    let response = document.getElementById('response');
    const socket = new WebSocket('ws://localhost:9980/echo');

    // Ao estabelecer a conexão enviamos uma mensagem pro servidor
    socket.addEventListener('open', function () {
        socket.send('Conexão estabelecida.');
    });

    // Callback disparado sempre que o servidor retornar uma mensagem
    socket.addEventListener('message', function (event) {
        response.insertAdjacentHTML('beforeend', "<p><b>Servidor diz: </b>" + event.data + "</p>");
    });

    input.addEventListener('keyup', function (event) {
        if (event.keyCode === 13) {
            socket.send(this.value);
            this.value = '';
        }
    });
</script>
</body>
</html>

Para testar, primeiro de tudo temos que ter o servidor rodando. Na raiz do projeto execute:

$ php server.php

Vai iniciar o servidor na porta 9980. Por fim, basta executar o arquivo index.html e interagir escrevendo mensagens e apertando enter para enviá-las. Tudo o que o servidor receber de input, ele retornará de volta para o cliente.

O objeto WebSocket é nativo e presente em todos os navegadores modernos. Iniciamos o servidor de forma “mágica” usando o Ratchet, mas a realidade é que por debaixo dos panos ele precisa abstrair algumas importantes coisas como o handshake inicial, as trocas das mensagens (Data Frames) que são criptografadas etc. Toda a dinâmica do funcionamento de um servidor de Websocket pode ser lida nesse documento: Escrevendo um servidor WebSocket.

Agora vamos criar o nosso próprio wrapper, nossa própria implementação de um servidor. Primeiro, na raiz do projeto, crie uma pasta chamada src. Em seguida, altere o composer.json para:

{
    "require": {
        "cboden/ratchet": "^0.4.1"
    },
    "autoload": {
        "psr-4": {
            "Chat\\": "src"
        }
    }
}

Agora crie um arquivo ChatServer.php dentro da pasta src com a seguinte implementação:

<?php

namespace Chat;

use Exception;
use SplObjectStorage;
use Ratchet\ConnectionInterface;
use Ratchet\MessageComponentInterface;

final class ChatServer implements MessageComponentInterface
{
    private $clients;

    public function __construct()
    {
        $this->clients = new SplObjectStorage();
    }

    public function onOpen(ConnectionInterface $conn): void
    {
        $this->clients->attach($conn);
    }

    public function onMessage(ConnectionInterface $from, $msg): void
    {
        foreach ($this->clients as $client) {
            $client->send($msg);
        }
    }

    public function onClose(ConnectionInterface $conn): void
    {
        $this->clients->detach($conn);
    }

    public function onError(ConnectionInterface $conn, Exception $exception): void
    {
        $conn->close();
    }
}

Estamos implementando a interface MessageComponentInterface. A diferença desse servidor pro EchoServer, é que neste estamos guardando as conexões que são estabelecidas e, quando uma mensagem é recebida, enviamos ela de volta para toda as conexões abertas (que é o que o fazemos no método onMessage).

Para testar a implementação, crie um arquivo chat.html no projeto:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>WebSocket Simple Chat</title>
</head>
<body>
<p>
    <label for="nome">Seu nome: </label>
    <input id="nome" type="text" placeholder="Seu nome"/>
</p>
<p>
    <label for="input">Sua mensagem: </label>
    <input id="input" type="text" placeholder="Sua mensagem"/>
</p>
<hr>
<div id="chat"></div>
<script>
    let chat = document.getElementById('chat');
    let input = document.getElementById('input');
    const nome = document.getElementById('nome');
    const socket = new WebSocket('ws://localhost:9990/chat');

    // Ao receber mensagens do servidor
    socket.addEventListener('message', function (event) {
        // Deserializamos o objeto
        const data = JSON.parse(event.data);
        // Escrevemos no DOM
        chat.insertAdjacentHTML('beforeend', "<p><b>" + data.nome + " diz: </b>" + data.mensagem + "</p>");
    });

    // Ao enviar uma mensagem
    input.addEventListener('keyup', function (event) {
        if (event.keyCode === 13) {
            // Objeto com os dados que serão trafegados
            const data = {
                nome: nome.value,
                mensagem: this.value,
            };

            // Serializamos o objeto para json
            socket.send(JSON.stringify(data));

            this.value = '';
        }
    });
</script>
</body>
</html>

Antes de iniciar o servidor é importante executar o dump-autoload do Composer, para que a classe ChatServer seja reconhecida:

$ composer dump-autoload

Por fim, inicie o servidor:

$ php chat.php

Você pode testar abrindo duas instâncias do chat e usando nomes diferentes:

Os exemplos completos você pode ver nesse repositório: https://github.com/KennedyTedesco/websocket-demo

Só o navegador pode ser cliente do meu servidor?

Não, qualquer outra aplicação pode ser cliente de um servidor WebSocket. Você pode, por exemplo, criar uma aplicação cliente em PHP usando a library Pawl.

Palavras finais

O Ratchet simplifica e muito a criação de servidores de WebSocket em PHP, não obstante, ele também suporta sub-protocolos como o Wamp. Em um próximo artigo pretendo abordar um exemplo utilizando-o.

Até a próxima!

Corrotinas e código assíncrono em PHP usando Generators

Nos últimos artigos eu tenho escrito sobre programação assíncrona em PHP com ReactPHP e Swoole. O Swoole tem seu mecanismo interno próprio de corrotinas, mas o que muita gente não sabe é que podemos trabalhar com corrotinas usando PHP puro, através de generators.

Eu achei que fosse interessante escrever sobre isso até mesmo como uma forma de apresentar às pessoas o conceito de generators e também como uma forma de instigá-las a buscar mais sobre o tema.

A ideia desse artigo é construir um simples (muito simples) scheduler de corrotinas bem primitivo e didático. Em outros artigos eu escrevi de forma mais abrangente sobre generators, código síncrono, assíncrono, corrotinas etc, conceitos esses que serão importantes você ter para entender o funcionamento do nosso exemplo aqui. Por esse motivo, eu recomendo que você leia esses artigos:

Opcionalmente, se você tiver interesse, também recomendo a leitura deste artigo:

Esse artigo que você está lendo é, de certa forma, uma ramificação deste:

Portanto, o artigo Generators no PHP é realmente uma leitura necessária para entender os códigos que desenvolveremos aqui.

Desenvolvedor PHP Júnior
Formação: Desenvolvedor PHP Júnior
Nesta formação você aprenderá todos os fundamentos necessário para iniciar do modo correto com a linguagem PHP, uma das mais utilizadas no mercado. Além dos conceitos de base, você também conhecerá as características e a sintaxe da linguagem de forma prática.
CONHEÇA A FORMAÇÃO

Gênesis

Sabemos que generators são como se fossem funções que podem ser interrompidas e resumidas a qualquer momento. Também sabemos que assincronismo é sobre fluxo de execução. Captando essas duas ideias centrais, podemos chegar na seguinte conclusão lógica: se um generator pode ser interrompido para que outro seja executado, eu posso usar isso para manipular o fluxo de uma execução e então atingir uma modelagem de código assíncrono.

Primeiro vamos ver um exemplo de código síncrono:

<?php

declare(strict_types=1);

$booksTask = static function () {
    for ($i = 1; $i <= 4; ++$i) {
        echo "Book $i\n";
    }
};

$moviesTask = static function () {
    for ($i = 1; $i <= 8; ++$i) {
        echo "Movie $i\n";
    }
};

$booksTask();
$moviesTask();

O resultado dessa execução:

Book 1
Book 2
Book 3
Book 4
Movie 1
Movie 2
Movie 3
Movie 4
Movie 5
Movie 6
Movie 7
Movie 8

A execução é síncrona, linear e previsível. Temos duas tarefas, mas a segunda ($moviesTask) só terá oportunidade de desempenhar seu trabalho depois que a primeira ($booksTask) terminar tudo o que tem para ser feito.

Podemos transformar esse exemplo em um código de modelo assíncrono usando generators e trabalhando com a ideia de que cada task é uma corrotina. Para isso, dois princípios são importantíssimos:

  • Uma Task (tarefa) será apenas um decorator de um generator;
  • Um Scheduler cuidará da fila de tarefas e da execução delas;

O exemplo pode ser encontrado no GitHub: https://github.com/KennedyTedesco/coroutines-php

Uma tarefa será representada pela classe Task:

<?php

declare(strict_types=1);

namespace Coral;

use Generator;

final class Task
{
    private $coroutine;
    protected $firstYield = true;

    public function __construct(Generator $coroutine)
    {
        $this->coroutine = $coroutine;
    }

    public function run(): void
    {
        if ($this->firstYield) {
            $this->firstYield = false;
            $this->coroutine->current();
        } else {
            $this->coroutine->next();
        }
    }

    public function finished(): bool
    {
        return ! $this->coroutine->valid();
    }
}

A tarefa é apenas um decorator de um generator.

E o Scheduler será representado pela classe de seu próprio nome:

<?php

declare(strict_types=1);

namespace Coral;

use SplQueue;

final class Scheduler
{
    private $tasks;

    public function __construct()
    {
        $this->tasks = new SplQueue();
    }

    public function schedule(Task $task): void
    {
        $this->tasks->enqueue($task);
    }

    public function handle(): void
    {
        while (! $this->tasks->isEmpty()) {
            /** @var Task $task */
            $task = $this->tasks->dequeue();

            $task->run();
            if (! $task->finished()) {
                $this->schedule($task);
            }
        }
    }
}

O Scheduler mantém uma fila de tarefas a serem executadas e possui um método público schedule() para adicionar tarefas nessa fila. O método handle() itera nas tarefas executando-as. Antes de entrarmos em mais detalhes, ao executar o exemplo:

<?php

declare(strict_types=1);

require 'vendor/autoload.php';

use Coral\Task;
use Coral\Scheduler;

$scheduler = new Scheduler();

$booksTask = static function () {
    for ($i = 1; $i <= 4; ++$i) {
        echo "Book $i\n";

        yield;
    }
};

$moviesTask = static function () {
    for ($i = 1; $i <= 8; ++$i) {
        echo "Movie $i\n";

        yield;
    }
};

$scheduler->schedule(new Task($booksTask()));
$scheduler->schedule(new Task($moviesTask()));

$scheduler->handle();

Nota: As duas tarefas retornam um generator, que depois é passado para o construtor de Task.

Temos o seguinte resultado:

Book 1
Movie 1
Book 2
Movie 2
Book 3
Movie 3
Book 4
Movie 4
Movie 5
Movie 6
Movie 7
Movie 8

Diferentemente do exemplo síncrono mostrado anteriormente, neste temos a alternância da execução das tarefas, no sentido de que são colaborativas, uma abre espaço para que a outra também tenha oportunidade de ser executada. Isso acontece pois o Scheduler executa a tarefa, o valor corrente dela é impresso, nisso ela volta novamente para a fila do Scheduler para ser executada novamente em outro momento. As tarefas sempre voltam para a fila enquanto ainda tiverem valores a serem processados:

$task->run();
if (! $task->finished()) {
    $this->schedule($task);
}

Essa é uma estratégia para mantê-las em sua essência colaborativas, ou seja, a tarefa abre mão do seu tempo de execução para que outra tarefa também tenha oportunidade.

Considerações finais

Este foi um simples exemplo de como podemos ter uma operação assíncrona utilizando generators. E aqui nem estamos nos referindo a multiplexing de I/O, mas poderíamos usar esse mesmo conceito de generators e implementar I/O não bloqueante (assíncrono) usando algum padrão como o Reactor, usado pelo ReactPHP ou algo mais “simples” usando diretamente a função stream_select(). Inclusive, O Nikita Popov (desenvolvedor do core do PHP) escreveu exatamente sobre isso no artigo Cooperative multitasking using coroutines (in PHP!), que por sinal, é a principal referência desse artigo aqui. Recomendo essa leitura pois ele também fez com que o generator se comunicasse com outros generators e com o Scheduler (que é a realmente essência de uma corrotina), numa espécie de canal de comunicação, o que torna as coisas ainda mais poderosas. O framework assíncrono Amp faz um uso bem intensivo de generators, também vale a pena testá-lo.

Ah, não poderia deixar de pontuar novamente: se você tem interesse por programação assíncrona com PHP, vale a pena a leitura desses artigos:

Até a próxima!

Concorrência, Paralelismo, Processos, Threads, programação síncrona e assíncrona

Concorrência, paralelismo, processos, threads, programação síncrona e assíncrona, são assuntos que permeiam o dia a dia dos desenvolvedores. A ideia desse artigo é descomplicar um pouco o que esses conceitos significam e como eles se relacionam.

Monotarefa versus Multitarefa

Os primeiros sistemas operacionais suportavam a execução de apenas uma tarefa por vez. Nesse modelo, o processador, a memória e os periféricos ficavam dedicados a uma única tarefa. Tínhamos um fluxo bem linear, como pode ser visto nesse diagrama:

Apenas no término da execução de uma tarefa que outra poderia ser carregada na memória e então executada.

O problema desse modelo é que enquanto o processo realizava uma operação de I/O para, por exemplo, ler algum dado do disco, o processador ficava ocioso. Ademais, uma operação do processador é infinitamente mais rápida que qualquer uma de leitura ou escrita em periféricos.

Para se ter ideia, quando falamos de uma operação que a CPU executa, lidamos com nanosegundos, enquanto em uma operação de rede consideramos mile segundos.

Se você “pingar” o Google observará isso:

$ ping google.com.br
PING google.com.br (216.58.202.3): 56 data bytes
64 bytes from 216.58.202.3: icmp_seq=0 ttl=52 time=20.845 ms
...

Conforme as aplicações foram evoluindo, começou a se tornar um problema. Um exemplo clássico é um editor de texto que precisa executar diversas tarefas simultaneamente como, por exemplo, formatar o texto selecionado e verificar a ortografia dele, duas tarefas (simplificando) sendo executadas sobre a mesma “massa” de dados que é o texto selecionado.

A solução encontrada para resolver esse problema foi permitir ao processador suspender a execução de uma tarefa que estivesse aguardando dados externos ou algum evento e passar a executar outra tarefa. Em outro momento de tempo, quando os dados estivessem disponíveis, a tarefa suspensa poderia ser retomada do ponto exato de onde ela havia parado. Nesse modelo, mais de um programa é carregado na memória. O mecanismo que permite a retirada de um recurso (o processador, por exemplo) de uma tarefa, é chamado de preempção.

Sistemas preemptivos são mais produtivos, ademais, várias tarefas podem estar em execução ao mesmo intervalo de tempo alternando entre si o uso dos recursos da forma mais justa que for possível. Nesse tipo de sistema as tarefas alteram de estado e contexto a todo instante.

Os estados de uma tarefa num sistema preemptivo:

  • Nova: A tarefa está sendo criada (carregada na memória);
  • Pronta: A tarefa está em memória aguardando a disponibilidade do processador para ser executada pela primeira vez ou voltar a ser executada (na hipótese de que ela foi substituída por outra tarefa, devido à preempção);
  • Executando: O processador está executando a tarefa e alterando o seu estado;
  • Suspensa: A tarefa não pode ser executada no momento por depender de dados externos ainda não disponíveis (dados solicitados à rede ou ao disco, por exemplo.);
  • Terminada: A execução da tarefa foi finalizada e ela já pode sair da memória;

O diagrama de estado das tarefas com preempção de tempo:

Quantum pode ser entendido como o tempo que o sistema operacional dá para que os processos usem a CPU. Quando o quantum de um processo termina, mesmo que ele ainda não tenha terminado a execução de suas instruções, o contexto dele é trocado, é salvo na sua pilha de execução onde ele parou, os dados necessários e então ele volta pro estado de “pronto”, até que o sistema operacional através do seu escalonador de processos volte a “emprestar” a CPU pra ele (e então ele volta pro estado de “executando”). Trocas de contexto (de pronto pra executando etc) acontecem a todo momento. Milhares delas. É uma tarefa custosa para o sistema operacional, mas que é necessária para que a CPU não fique ociosa.

Um core (núcleo) do processador executa uma tarefa por vez, cabendo ao escalonador do sistema operacional cuidar dessa fila de tarefas, decidir quem tem prioridade, quem tem o quantum disponível etc.

O que é um processo?

Um processo pode ser visto como um container de recursos utilizados por uma ou mais tarefas. Processos são isolados entre si (inclusive, através de mecanismos de proteção a nível de hardware), não compartilham memória, possuem níveis de operação e quais chamadas de sistemas podem executar. Como os recursos são atribuídos aos processos, as tarefas fazem o uso deles a partir do processo. Dessa forma, uma tarefa de um processo A não consegue acessar um recurso (a memória, por exemplo) de uma tarefa do processo B.

Um processo é uma entidade ativa, é a “instância” de um programa (entidade passiva). Podemos fazer analogia com orientação a objetos onde o programa seria a estrutura da classe e um processo seria um objeto instância dessa classe.

O kernel do sistema operacional possui descritores de processos, denominados PCBs (Process Control Blocks) e eles armazenam informações referentes aos processos ativos e cada processo possui um identificador único no sistema, conhecido como PID (Process IDentifier).

As tarefas de um processo podem trocar informações com facilidade, pois compartilham a mesma área de memória. No entanto, tarefas de processos distintos não conseguem essa comunicação facilmente, pois estão em áreas diferentes de memória. Esse problema é resolvido com chamadas de sistema do kernel que permitem a comunicação entre processos (IPC – Inter-Process Communication).

O que é uma thread?

Os processos podem ter uma série de threads associadas e as threads de um processo são conhecidas como threads de usuário, por executarem no modo-usuário e não no modo-kernel. Uma thread é uma “linha” de execução dentro de um processo. Cada thread tem o seu próprio estado de processador e a sua própria pilha, mas compartilha a memória atribuída ao processo com as outras threads “irmãs” (filhas do mesmo processo).

O núcleo (kernel) dos sistemas operacionais também implementa threads, mas essas são chamadas de threads de kernel (ou kernel-threads). Elas controlam atividades internas que o sistema operacional precisa executar/cuidar.

Concorrência e paralelismo

É comum achar que concorrência e paralelismo são a mesma coisa, mas não são. Rob Pike – um dos criadores da linguagem Go – em uma apresentação pontuou:

“Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once.”

Concorrência é sobre lidar com várias coisas ao mesmo tempo e paralelismo é sobre fazer várias coisas ao mesmo tempo. Concorrência é um conceito mais a nível de software e paralelismo mais a nível de hardware.

Concorrência é sobre a execução sequencial e disputada de um conjunto de tarefas independentes. Sob o ponto de vista de um sistema operacional, o responsável por esse gerenciamento é o escalonador de processos. Já sob o ponto de vista de concorrência em uma linguagem de programação como Go, por exemplo, o responsável é o scheduler interno da linguagem. Escalonadores preemptivos (como é o caso dos sistemas operacionais modernos) favorecem a concorrência pausando e resumindo tarefas (no caso de sistemas operacionais estamos falando de processos e threads no que chamamos de trocas de contexto) para que todas tenham a oportunidade de serem executadas.

Podemos fazer uma analogia em que concorrência (no contexto de um sistema operacional) é uma fila de drive thru em que carros estão disputando o recurso do atendimento, um por vez. Mas esse drive thru é especial, diferente do da vida real, ele é baseado num sistema preemptivo onde os carros possuem um tempo máximo em que podem ficar ali parados pelo atendimento, passando esse tempo, o carro tem que obrigatoriamente sair para dar oportunidade pro próximo carro da fila:

Paralelismo é sobre a execução paralela de tarefas, ou seja, mais de uma por vez (de forma simultânea), a depender da quantidade de núcleos (cores) do processador. Quanto mais núcleos, mais tarefas paralelas podem ser executadas. É uma forma de distribuir processamento em mais de um núcleo.

Paralelismo é aquele pedágio que permite que carros progridam em diferentes fluxos simultaneamente:

Também podemos dizer paralelismo é uma forma de atingir concorrência e que cada linha de execução paralela também é concorrente, pois os núcleos estarão sendo disputados por várias outras linhas de execução e quem gerencia o que o núcleo vai executar em dado momento do tempo é o escalonador de processos. Paralelismo implica concorrência, mas o contrário não é verdadeiro, pois é possível ter concorrência sem paralelismo, é só pensar no caso de uso de uma única thread gerenciando milhares de tarefas, pausando e resumindo-as, esse é um modelo de concorrência sem paralelismo. Já o scheduler de corrotinas da linguagem Go é um exemplo de scheduler multi threaded, ele paraleliza a execução das corrotinas, ou seja, é um modelo de alta concorrência que faz uso dos núcleos do processador para paralelizar as execuções.

Do ponto de vista de um processador que possui mais de um núcleo (que é o padrão atualmente), um processo poderia ter, por exemplo, três threads rodando simultaneamente em três diferentes núcleos:

Síncrono e assíncrono

Síncrono e assíncrono são modelos de programação que estão intimamente ligados ao fluxo de execução, eles determinam como o código será escrito e como ele rodará. No modelo síncrono uma operação precisa ser finalizada para que outra tenha a oportunidade de ser executada. É um modelo linear, previsível, onde a execução acontece etapa por etapa e ele é base padrão da maior parte das linguagens de programação. Por exemplo, em PHP:

<?php

function tarefa1() {
    echo "tarefa1\n";
}

function tarefa2() {
    echo "tarefa2\n";
}

tarefa2();
tarefa1();

O resultado é previsível:

tarefa2
tarefa1

Já no modelo assíncrono, uma operação não precisa esperar a outra ser finalizada, ao contrário disso, elas alternam o controle da execução entre si. É um modelo não previsível e que não garante a ordem da execução. É um modelo que favorece a concorrência.

Fazendo uma analogia, no modelo síncrono se você precisa colocar roupas para lavar e lavar louças, primeiro você poderia colocar as roupas para lavar e esperaria o tempo que fosse necessário até finalizar, para só depois lavar as louças. Enquanto que no modelo assíncrono você poderia colocar as roupas na máquina de lavar roupas e já começaria a lavar as louças no mesmo instante, pois você não precisa ficar ocioso esperando a máquina de lavar processar seu resultado para desempenhar outra tarefa, você pode pegar o “resultado” da máquina de lavar em um momento futuro.

Pensando no modelo síncrono de como as tarefas são executadas durante o tempo:

A tarefa 2 precisa obrigatoriamente esperar a tarefa 1 ser finalizada antes de ter a oportunidade de ser executada.

Já no modelo assíncrono nós temos a alternância da execução das tarefas, elas concorrem entre si:

Em verde as tarefas que iniciaram, em cinza a alternância de execução durante o tempo e em vermelho a finalização da execução delas. Fiz questão de não colocar em ordem pois assincronismo não garante ordem, assincronismo é sobre a execução de tarefas independentes.

Observe que tanto no modelo síncrono quanto no assíncrono uma tarefa é executada por vez, a diferença é que no modelo assíncrono a tarefa não precisa esperar o resultado da outra para poder ter seu tempo de execução. Assincronismo não necessariamente implica em paralelismo (se pensar em assincronismo num modelo multi-thread, sim, aí atinge paralelismo), assincronismo é uma forma de atingir concorrência.

Existem diferentes formas de se atingir assincronismo, sendo que a principal é a orientada a eventos (event loop com o padrão Reactor), modelo usado por NodeJS, Nginx, Swoole, ReactPHP entre outros. No caso do NodeJS e ReactPHP, por exemplo, eles fazem isso de forma single-thread, mas também é possível atingir assincronismo num modelo orientado a threads (multi-thread).

Em termos gerais, usar o modelo assíncrono é vantagem para a maior parte dos aplicativos que fazem uso intensivo de operações de I/O (leitura e escrita de arquivos, acesso a rede etc). Enquanto que o modelo de paralelizar processamento é mais indicado quando as tarefas são mais orientadas à CPU, quando elas precisam de mais poder de processamento/cálculo que leitura e escrita em periféricos (na rede, ou sistema de arquivos etc).

Palavras finais

Esses assuntos são extensos e poderiam render bem mais conteúdo, mas isso deixaria a leitura engessada e complicada demais. Eu ainda acredito que a melhor forma de entendê-los realmente bem, é aplicá-los usando linguagens e frameworks. Por exemplo, no caso do PHP que é síncrono por padrão e que (ainda, na versão 7.4) não possui nenhum mecanismo a nível da linguagem para operações assíncronas, você pode estudar ReactPHP ou Swoole. A linguagem Go é uma boa pedida para entender na prática concorrência e paralelismo. E para estudar I/O assíncrono, NodeJS pode ser uma boa pedida.

Se você se interessa pelos pormenores do assunto relacionado a processos e threads e como o sistema operacional gerencia isso, o livro Sistemas Operacionais Modernos do Tanenbaum é uma ótima referência de estudo.

Até mais!

Trabalhando com corrotinas, canais e explorando um pouco mais o scheduler de corrotinas do Swoole

Neste artigo veremos de forma prática os aspectos essenciais do modelo de programação concorrente CSP (communicating sequential processes) com Swoole, usando Coroutine (corrotina), Channel (canal) e Defer (execução tardia). Se você já programou em Go verá muitas similaridades.

Antes, entretanto, é fundamental que você leia o artigo Introdução ao Swoole, framework PHP assíncrono baseado em corrotinas, pois ele introduz toda a teoria fundamental para que possamos criar os nossos primeiros exemplos e adentrar um pouco mais nas possibilidades que o Swoole nos oferece.

Em uma execução sequencial e síncrona de duas funções, teríamos:

<?php

function a() {
    sleep(1);
    echo 'a';
}

function b() {
    sleep(2);
    echo 'b';
}

a();
b();

O resultado é bem previsível, aguarda um segundo, imprime a, aguarda dois segundos e imprime b.

Para que possamos executar uma tarefa dentro de uma corrotina, usamos a função go(). O exemplo acima poderia ser reescrito para:

<?php

go(static function () {
    sleep(1);
    echo 'a';
});

go(static function () {
    sleep(2);
    echo 'b';
});

O problema que temos agora é que a função sleep() do PHP é bloqueante, assim como são as funções de stream, por exemplo.

Recomendação de leitura: Streams no PHP

Esse exemplo terá o exato mesmo comportamento que o anterior. Ele demorará três segundos pra finalizar a sua execução. Podemos resolver isso de duas formas, sendo que a primeira é adicionando a instrução Swoole\Runtime::enableCoroutine(); no exemplo:

<?php

Swoole\Runtime::enableCoroutine();

go(static function () {
    sleep(1);
    echo 'a';
});

go(static function () {
    sleep(2);
    echo 'b';
});

Este é um hook “mágico” que fará com que o Swoole execute algumas funções que são nativamente síncronas mas de forma assíncrona (não bloqueante). E isso vale para a sleep(), como vale para as funções relacionadas a streams.

Agora sim, esse exemplo será executado em dois segundos. Ao invés da execução consumir a soma dos dois tempos das corrotinas, ela passa a consumir o tempo da maior.

Então, temos a seguinte relação:

  • No modelo síncrono gasta-se o tempo de: (a + b)
  • No modelo concorrente gasta-se: MAX(a, b)

A outra forma de resolver o problema anterior sem que precisemos aplicar o hook enableCoroutine(), é executando a função sleep() assíncrona da API do Swoole:

<?php

use Swoole\Coroutine\System;

go(static function () {
    System::sleep(1);
    echo 'a';
});

go(static function () {
    System::sleep(2);
    echo 'b';
});

Outras funções disponíveis na API de corrotinas:

System::sleep(100);
System::fread($fp);
System::gethostbyname('www.google.com');
// Entre outras

Você verá muitos System::sleep() até o final desse artigo, pois é uma forma de emular uma operação de I/O (que é sabido que é mais custosa que uma operação de CPU).

Executando os exemplos

Se você usa Linux ou macOS pode instalar o Swoole diretamente no seu ambiente:

https://www.swoole.co.uk/docs/get-started/installation

Ou você pode usar o Docker, que é a opção escolhida desse artigo. Algumas das imagens disponíveis para o Swoole:

Para esse artigo eu estou usando como base a imagem do swoole-by-examples. Todos os exemplos desse artigo estão disponíveis nesse repositório:

https://github.com/KennedyTedesco/swoole-coroutines

Basta que você clone-o em seu computador e então execute o comando abaixo para inicializar o container:

Docker - Fundamentos
Curso de Docker - Fundamentos
CONHEÇA O CURSO
$ docker-compose up -d

E para executar o exemplo anteriormente criado:

$ docker-compose exec client bash -c "time php ./co1.php"

O resultado no terminal será:

ab
real    0m2.031s
user    0m0.010s
sys 0m0.010s

Usamos time na execução para que possamos ter a informação do tempo gasto.

Uma coisa importante de se pontuar é que esse projeto tem como dependência no composer o swoole-ide-helper que ajuda a sua IDE ou editor de código reconhecer as assinaturas das classes e métodos do Swoole. Mas é bom sempre lembrar que a documentação é outro ótimo lugar para conhecer outros detalhes e características das APIs.

Voltando …

Um importante conceito de concorrência é que não é sobre execução ordenada, a ordem de execução das tarefas não é garantida, são vários os fatores que influenciam. Então, o observe o exemplo abaixo em que executamos 5000 corrotinas:

<?php

use Swoole\Coroutine\System;

for ($i = 0; $i < 5000; $i++) {
    go(static function () use ($i) {
        System::sleep(1);
        echo "$i\n";
    });
}

Para executá-lo:

$ docker-compose exec client bash -c "time php ./co2.php"

Sempre que você executá-lo, terá um retorno diferente. Nesse exemplo criamos 5000 corrotinas que foram executadas em cerca de 1s. Não fossem executadas de forma concorrente gastaríamos 5000 segundos.

Outras formas de criar corrotinas

A função go() é muito conveniente para a criação de corrotinas, bastando que passemos para ela uma função anônima representando a tarefa. No entanto, existem outras formas de utilizá-la. O primeiro parâmetro dela espera por um callable:

/**
 * @param callable $func
 * @param ...$params
 * @return mixed
 */
function go(callable $func, ...$params){}

Portanto, poderíamos passar o nome de uma função:

<?php

use Swoole\Coroutine\System;

function someTask(int $i) : void {
    System::sleep(1);

    echo "$i\n";
}

for ($i = 0; $i < 1000; $i++) {
    go('someTask', $i);
}

Para executá-lo:

$ docker-compose exec client bash -c "time php ./co3.php"

Como também poderíamos passar a instância de um objeto invocável:

<?php

use Swoole\Coroutine\System;

final class SomeTask
{
    public function __invoke(int $i): void
    {
        System::sleep(1);

        echo "$i\n";
    }
}

for ($i = 0; $i < 1000; $i++) {
    go(new SomeTask, $i);
}

Para executá-lo:

$ docker-compose exec client bash -c "time php ./co4.php"

E as outras formas possíveis são:

<?php

use Swoole\Coroutine\System;

function someTask(int $i): void {
    System::sleep(1);

    echo "$i\n";
}

co::create('someTask', 1);

swoole_coroutine_create('someTask', 2);

Swoole\Coroutine::create('someTask', 3);

Para executá-lo:

$ docker-compose exec client bash -c "time php ./co5.php"

E todas elas aceitam um valor callable, são formas alternativas a go().

Outro conceito importante sobre corrotinas no Swoole é que o scheduler delas não é multi-threaded como em Go. Apenas uma corrotina é executada por vez, não são executadas em paralelo. Por exemplo, se temos duas tarefas e a tarefa 1 é executada, se tem um sleep(1) nela, essa tarefa é pausada e então a tarefa 2 é executada, depois o scheduler volta para a tarefa 1. Eventos de I/O pausam/resumem a execução das corrotinas a todo instante.

Canais

Outro ponto fundamental do modelo CSP são os canais. As corrotinas representam as atividades do programa e os canais representam as conexões entre elas. Um canal é basicamente um sistema de comunicação que permite uma corrotina enviar valores para outra. Em Go um canal precisa ter um tipo especificado previamente, enquanto que no Swoole podemos armazenar qualquer tipo de dado.

Um exemplo:

<?php

use Swoole\Coroutine\System;
use Swoole\Coroutine\Channel;

$chan = new Channel();

go(static function () use ($chan) {
    // Cria 10.000 corrotinas
    for ($i = 0; $i < 10000; $i++) {
        go(static function () use ($i, $chan) {
            // Emula uma operação de I/O
            System::sleep(1);

            // Adiciona o valor processado no canal
            $chan->push([
                'index' => $i,
                'value' => random_int(1, 10000),
            ]);
        });
    }
});

go(static function () use ($chan) {
    while (true) {
        $data = $chan->pop();
        echo "{$data['index']} -> {$data['value']}\n";
    }
});

Para executá-lo:

$ docker-compose exec client bash -c "time php ./chan1.php"

Usamos o método push() para adicionar um item no canal, que no caso foi um array, mas poderia ser um inteiro, uma string etc. E usamos pop() para extrair um valor do canal. O while(true) dentro dessa corrotina em especial não é um problema, nisso que estamos realizando uma operação no canal, o estado dessa corrotina é controlado, ela não toma pra ela todo o tempo da CPU. Mas veremos mais adiante que operações pesadas de CPU podem impedir que outras corrotinas tenham a chance de serem executadas, mas isso pode ser resolvido se ativarmos o scheduler preemptivo do Swoole.

Avaliando URLs de forma concorrente

Um dos bons exemplos para visualizarmos na prática concorrência é quando envolvemos operações de rede na jogada. O exemplo que veremos a seguir, apesar de não tão sofisticado, foi desenvolvido para que possamos fazer uso de corrotinas, canais e defer.

<?php

use Swoole\Coroutine\Channel;
use Swoole\Coroutine\System;
use Swoole\Coroutine\Http\Client;

function httpHead(string $url) {
    $client = new Client($url, 80);
    $client->get('/');

    return $client;
}

$chan = new Channel();

go(static function () use ($chan) {
    // Abre um ponteiro para o arquivo
    $fp = fopen('sites.txt', 'rb');

    // Atrasa o fechamento do ponteiro do arquivo para o final da corrotina
    defer(static function () use ($fp) {
        fclose($fp);
    });

    while (feof($fp) === false) {
        // Lê linha a linha do arquivo
        $url = trim(System::fgets($fp));

        if ($url !== '') {
            // Cria uma corrotina para requisitar a URL e trazer o status code dela
            go(static function () use ($url, $chan) {
                $response = httpHead($url);

                // Insere no canal a resposta
                $chan->push([
                    'url' => $url,
                    'statusCode' => $response->statusCode,
                ]);
            });
        }
    }
});

// Corrotina que lê os valores do canal e imprime no output
go(static function () use ($chan) {
    while (true) {
        $data = $chan->pop();
        echo "{$data['url']} -> {$data['statusCode']}\n";
    }
});

Para executá-lo:

$ docker-compose exec client bash -c "time php ./http1.php"

Na função httpHead() estamos usando o cliente HTTP de corrotina do Swoole, a documentação dele pode ser consultada aqui.

Na primeira corrotina abrimos um ponteiro para o arquivo onde as URLs estão localizadas. A função defer() define uma tarefa para ser executada ao final da corrotina, então a estamos utilizamos para fechar o ponteiro de arquivo aberto anteriormente.

Iteramos sobre cada linha do arquivo usando a função assíncrona co::fgets() da própria API de corrotina e então, pra cada URL, criamos uma nova corrotina para fazer uma requisição HEAD e obter o código http da resposta. Essa corrotina envia para um canal o resultado, canal este que é utilizado pela segunda corrotina, que imprime todos os valores contidos nele.

O cliente HTTP padrão do Swoole não possui uma API muito rica e não é tão intuitivo de se usar, para isso existe a biblioteca saber que encapsula toda a parte complicada, oferecendo uma API bem intuitiva e de alto nível para se trabalhar com requisições http concorrentes. Se você tiver interesse em praticar, recomendo alterar o exemplo anterior para usar a saber.

E como ficam as tarefas que fazem um uso intensivo de CPU?

Corrotinas são conhecidas por operarem por cooperação (a tarefa é dona do seu ciclo de vida, tendo o poder de se liberar do scheduler no fim de sua operação) em detrimento à preempção.

Esse diagrama ilustra melhor esse cenário:

Enquanto nossas tarefas fazem mais uso de I/O que de CPU, tá tudo bem, pois deixamos os reactors fazerem a mágica. Agora, e se tivermos tarefas de uso pesado de CPU? O modo padrão do scheduler funcionar pode não ser o mais “justo” dependendo do caso, por exemplo:

<?php

use Swoole\Coroutine\System;

// Tarefa 1
go(static function() {
    System::sleep(1);
    for ($i = 0; $i <= 10; $i++) {
        echo "N{$i}";
    }
});

// Tarefa 2
go(static function() {
    $i = 0;
    while (true) {
        $i++;
    }
});

Para executá-lo:

$ docker-compose exec client bash -c "time php ./scheduler1.php"

Ao executar esse exemplo, você notará que a primeira tarefa não terá a oportunidade de executar a sua lógica de imprimir N1, N2 etc, pois quando ela é despachada pelo scheduler para um worker, a primeira linha dela é System::sleep(1); que simula uma operação de I/O, isso faz com que ela seja pausada para que outra tarefa da fila seja executada. O problema é que a tarefa 2 não é muito espirituosa, ela fica num loop infinito incrementando uma variável, com isso, ela não deixa nenhuma oportunidade para que a outra tarefa irmã seja executada, ou seja, ela não é tão colaborativa assim.

Já sabemos que uma tarefa é pausada quando ela está aguardando por alguma operação de I/O para dar oportunidade a outra tarefa desempenhar o seu trabalho. Podemos emular isso na prática usando como base o exemplo anterior:

<?php

use Swoole\Coroutine\System;

// Tarefa 1
go(static function() {
    System::sleep(1);
    for ($i = 0; $i <= 10; $i++) {
        echo "N{$i}";
    }
});

// Tarefa 2
go(static function() {
    $i = 0;
    while (true) {
        $i++;

        // Quando estiver no centésimo loop, emula uma operação de I/O
        if ($i === 100) {
            echo "{$i} -> ";

            System::sleep(1);
        }
    }
});

Para executá-lo:

$ docker-compose exec client bash -c "time php ./scheduler2.php"

O resultado da execução desse exemplo é:

100 -> N0N1N2N3N4N5N6N7N8N9N10

No centésimo loop emulamos uma operação de I/O de 1 segundo, que fez com que a tarefa fosse pausada dando oportunidade para a tarefa 1 voltar a ser executada.

Como as corrotinas possuem controle do seu ciclo de vida, é possível que uma corrotina deliberadamente peça a suspensão do seu direito de execução para dar espaço a outra corrotina. É o que vemos nesse exemplo:

<?php

// Tarefa 1
$firstTaskId = go(static function() {
    echo 'a';
    co::yield();
    echo 'b';
    co::yield();
    echo 'c';
});

// Tarefa 2
go(static function() use($firstTaskId) {
    $i = 0;
    while (true) {
        $i++;

        if ($i === 1000 || $i === 2000) {
            echo " {$i} ";

            co::resume($firstTaskId);
        }
    }
});

Para executá-lo:

$ docker-compose exec client bash -c "time php ./scheduler3.php"

O resultado:

a 1000 b 2000 c

Quando criamos uma corrotina imediatamente recebemos o id dela, por isso definimos a variável $firstTaskId. A primeira tarefa imprime a e então abre mão do seu direito de execução, o que faz com que a segunda tarefa seja executada. Quando o contador chega em 1000, a segunda tarefa abre mão do seu direito de execução para que especificamente a primeira tarefa volte a ser executada e então b é impresso. Mas depois de imprimir b, a primeira tarefa novamente abre mão do seu direito de execução e então chegamos no contador 2000 da segunda tarefa que a resume novamente imprimindo, por fim, c.

Ok, mas e se existisse uma forma do scheduler cuidar dessas questões e não deixar que uma tarefa “sacana” tome todo o tempo da CPU dedicado ao processo? Existe, é possível ativarmos o modo preemptivo. Quando ativamos o modo preemptivo no scheduler, ele passa a funcionar de forma parecida com o scheduler do sistema operacional, dando um tempo justo pra cada linha de execução, sem deixar que uma tarefa impeça as outras de serem executadas. Esse modo preemptivo foi adicionado recentemente e ele parece ter um impacto positivo em aplicações de alto porte que envolvem uma mistura considerável de tarefas CPU bound e I/O bound. Talvez pra sua aplicação não mude muita coisa, ou talvez mude, você teria que testar essa carga nos dois modos (cooperativo e preemptivo) e então ver qual faz mais sentido pro seu caso de uso.

De qualquer forma, voltando no nosso caso hipotético do while(true), usando o modo preemptivo, temos:

<?php

ini_set('swoole.enable_preemptive_scheduler', 1);

go(static function() {
    $i = 0;
    while (true) {
        $i++;
    }
});

go(static function() {
    for ($i = 0; $i <= 10; $i++) {
        echo "N{$i}";
    }
});

Para executá-lo:

$ docker-compose exec client bash -c "time php ./scheduler4.php"

O resultado:

N0N1N2N3N4N5N6N7N8N9N10

Veja que a primeira tarefa é um while (true) , mas como o modo preemptivo foi ativado, ela terá um tempo de CPU em milissegundos (no máximo 10ms) e então terá que abrir espaço para que outra tarefa seja executada, depois o tempo da CPU volta pra ela novamente, algo controlado automaticamente pelo scheduler.

Aninhamento de corrotinas

Como já vimos anteriormente, é possível aninharmos corrotinas, criando novas sub-corrotinas. Um bom exemplo para entender a ordem de execução de corrotinas aninhadas:

<?php

go(static function () { //T1
    echo "[init]\n";
    go(static function () { //T2
        go(static function () { //T3
            echo "co3\n";
        });

        echo "co2\n";
    });

    echo "co1\n";
});

Para executá-lo:

$ docker-compose exec client bash -c "time php ./co6.php"

O resultado:

[init]
co3
co2
co1

Agora, a história muda quando as corrotinas realizam ou emulam alguma operação de I/O:

<?php

use Swoole\Coroutine\System;

go(static function () { //T1
    echo "[init]\n";
    go(static function () { //T2
        System::sleep(3);
        go(static function () { //T3
            System::sleep(2);
            echo "co3\n";
        });

        echo "co2\n";
    });

    System::sleep(1);
    echo "co1\n";
});

Para executá-lo:

$ docker-compose exec client bash -c "time php ./co7.php"

O resultado será:

[init]
co1
co2
co3

WaitGroup

Com um “grupo de espera” podemos aguardar a finalização de algumas corrotinas antes que executemos alguma outra instrução:

<?php

use Swoole\Coroutine\System;
use Swoole\Coroutine\WaitGroup;

$wg = new WaitGroup();

go(static function () use ($wg) {
    $wg->add(3);

    go(static function () use ($wg) {
        System::sleep(3);
        echo "T1\n";
        $wg->done();
    });

    go(static function () use ($wg) {
        System::sleep(2);
        echo "T2\n";
        $wg->done();
    });

    go(static function () use ($wg) {
        System::sleep(1);
        echo "T3\n";
        $wg->done();
    });

    // Aguarda a execução das corrotinas do grupo antes de executar as instruções abaixo
    $wg->wait();

    echo "\n---- \ ----\n";
    go(static function () {
        echo "\n[FIM]\n";
    });
});

Para executá-lo:

$ docker-compose exec client bash -c "time php ./co8.php"

O resultado será:

T3
T2
T1

---- \ ----

[FIM]

O método add() é para incrementar o contador de quantas corrotinas estão no grupo de espera, ele pode ser usado quantas vezes forem necessárias.

Devo me preocupar com race conditions?

Em implementações em que o scheduler usa o modelo multi-thread, como em Go, o desenvolvedor precisa se preocupar com o acesso aos recursos globais compartilhados, para garantir que duas ou mais corrotinas não os acessem ao mesmo tempo, o que invariavelmente causaria race conditions (condições de corrida). Mas esse não é o caso quando usamos Swoole, pois o scheduler dele é single-thread, portanto, não há necessidade de lockings.

Esse exemplo em Go que cria 5k gorrotinas incrementando uma variável global:

package main

import (
    "fmt"
    "sync"
    "time"
)

var count int
var waitGroup sync.WaitGroup

func main() {
    for i := 0; i < 5000; i++ {
        waitGroup.Add(1)
        go increment()
    }

    waitGroup.Wait()
    fmt.Println(count)
}

func increment() {
    time.Sleep(1 * time.Second)

    count++
    waitGroup.Done()
}

Para executá-lo:

$ time go run go1.go

Você pode testá-lo inúmeras vezes e verá que sempre terá um resultado diferente de 5.000, exatamente por causa das race conditions que acontecem, uma gorrotina atropelando a outra na hora de acessar a variável global.

Go implementa uma ferramenta para identificar race conditions, bastando adicionar o parâmetro -race na execução:

$ time go run -race go1.go

Ele indicará que o programa é uma “fábrica” de race conditions:

==================
WARNING: DATA RACE
Read at 0x000001229360 by goroutine 8:
  main.increment()
...
Found 4 data race(s)
exit status 66

Para que as evitemos, podemos usar mutexes ou operações atômicas. Vamos com a primeira opção que é bem simples de assimilar:

package main

import (
    "fmt"
    "sync"
    "time"
)

var count int
var mu sync.Mutex
var waitGroup sync.WaitGroup

func main() {
    for i := 0; i < 5000; i++ {
        waitGroup.Add(1)
        go increment()
    }

    waitGroup.Wait()
    fmt.Println(count)
}

func increment() {
    time.Sleep(1 * time.Second)

    mu.Lock()
    count++
    mu.Unlock()

    waitGroup.Done()
}

Observe que envolvemos a operação de incremento com mu.Lock() e mu.Unlock() para garantir um único acesso por vez à variável global.

Ao executar novamente o exemplo:

$ time go run -race go1.go

O resultado:

5000

E não teremos nenhum erro da ferramenta de verificação de race conditions.

Podemos ter o mesmo exemplo escrito no Swoole sem que precisemos fazer nada de especial (em relação a locks etc):

<?php

use Swoole\Coroutine\System;
use Swoole\Coroutine\WaitGroup;

$count = 0;

go(static function() use(&$count) {
    $wg = new WaitGroup();

    for ($i = 0; $i < 5000; $i++) {
        $wg->add(1);

        go(static function () use($wg, &$count) {
            System::sleep(1);
            $count++;

            $wg->done();
        });
    }

    $wg->wait();

    echo $count;
});

Para executá-lo:

$ docker-compose exec client bash -c "time php ./co9.php"

Usamos WaitGroup para fazer paralelo com a implementação em Go, mas nesse exemplo em especial, poderíamos ter cortado essa etapa e escrito assim:

<?php

use Swoole\Coroutine\System;

$count = 0;

Co\run(static function() use(&$count) {
    for ($i = 0; $i < 5000; $i++) {
        go(static function () use(&$count) {
            System::sleep(1);
            $count++;
        });
    }
});

echo $count;

Para executá-lo:

$ docker-compose exec client bash -c "time php ./co10.php"

Esse exemplo produz o mesmo resultado que o anterior. Co\run aguarda as corrotinas serem finalizadas antes de seguir o fluxo da execução.

O que mais posso fazer com corrotinas?

Muito mais. Os clientes de corrotinas atualmente implementados/suportados pelo Swoole:

E, claro, lembre-se sempre de acompanhar a documentação.

O que mais posso fazer com o Swoole?

Recomendo acompanhar a lista awesome-swoole. Muita coisa boa, frameworks, libraries etc.

Considerações finais

Vimos a teoria essencial de corrotinas que são atualmente o principal mecanismo interno do Swoole e cada vez mais ganharão importância em seu core.

Nos próximos artigos exploraremos outras APIs do Swoole. Até breve!

Introdução ao Swoole, framework PHP assíncrono baseado em corrotinas

O Swoole é um framework de programação assíncrona para PHP e já se posiciona como uma opção estável e confiável para se desenvolver de forma concorrente com alta performance e uso equilibrado de recursos. Ele implementa I/O assíncrono orientado a eventos baseado no padrão Reactor (o mesmo utilizado por ReactPHP, NodeJS, Netty do Java, Twisted do Python entre outros). O Swoole é escrito em C e disponibilizado como uma extensão para PHP.

O Swoole permite que escrevamos aplicações altamente performáticas e concorrentes usando TCP, UDP, Unix Socket, HTTP e WebSockets sem que precisemos ter grandes conhecimentos de baixo nível sobre assincronismo e/ou sobre o kernel do Linux. Ele fornece uma completa API de alto nível com foco na produtividade. O Swoole é usado em grandes projetos enterprises, principalmente na China sendo Baidu (maior portal de busca da China) e Tencent (um dos maiores portais de serviços da China) os principais casos de uso.

É muito importante e recomendado que antes de continuar aqui, você leia o artigo sobre Introdução à programação assíncrona em PHP usando o ReactPHP, pois ele vai te introduzir alguns importantes conceitos de como é o modelo tradicional síncrono de se desenvolver em PHP e como funciona o modelo assíncrono usando o padrão Reactor.

Para o que veio e para o que não veio o Swoole

O Swoole não veio para substituir a abordagem tradicional e síncrona de como a maioria do projetos em PHP são escritos. Ao contrário disso, ele veio pra suprir os outros casos de uso como, por exemplo, a criação de servidores TCP, RPC, Websockets etc. Ele veio para resolver o problema de servidores que precisem de alta concorrência. Para se ter ideia, um único servidor escrito usando o Swoole consegue lidar com 1 milhão de conexões (C1000K). Também podemos dizer que o Swoole veio para que você não precise mudar toda sua stack e linguagem para resolver as necessidades acima listadas.

Muitas formas de se atingir assincronismo

Síncrono ou assíncrono é sobre o fluxo de execução. É possível atingir assincronismo de diversas formas como, por exemplo, uma fila de mensagens (Amazon SQS, Redis etc) onde você a alimenta com as tarefas (jobs) e no seu servidor você tem uma determinada quantidade de worker processes consumindo essa fila e executando as tarefas de forma concorrente e/ou paralela (a depender dos núcleos da CPU).

Também é possível atingir assincronismo com Ajax, lembra de requisição assíncrona onde é possível enviar a requisição e não ficar esperando pela resposta para poder fazer outra? Também é possível criando novos processos (uma opção mais pesada) ou criando novas threads (mais leve que processos, mas ainda assim, não tão leve quanto corrotinas que veremos adiante). Todas essas coisas podem ser trabalhadas para se chegar no objetivo de atingir assincronismo.

No entanto, o modelo assíncrono do Swoole é focado em nonblocking I/O e I/O multiplexing que é mais popularmente conhecido como event driven (com eventos de leitura e escrita a partir do monitoramento de descritores de arquivos) e usa chamadas de sistema como select, pool ou epool. Além disso, o Swoole implementa corrotinas com o modelo CSP (Communicating sequential processes) que é bem conhecido na linguagem Go com go, chan e defer. Se você já trabalhou com concorrência em Go, terá muita facilidade para assimilar em como as coisas são feitas no Swoole.

A arquitetura tradicional com PHP-FPM

Na arquitetura tradicional da maior parte das aplicações escritas em PHP, temos um servidor web e um process manager baseado no protocolo FastCGI, que no caso do PHP o oficial é o PHP-FPM. No lado do servidor web, o mais usado atualmente é o Nginx, que por sinal, também usa o padrão Reactor com I/O multiplexing (epool) para conseguir responder a milhares de conexões simultâneas, diferentemente do Apache (outro popular servidor) que implementa um modelo híbrido multi-process e multi-thread (quando usando Worker MPM em detrimento ao Prefork MPM).

É relativamente comum ler que o PHP-FPM é multi-thread, mas na realidade ele é multi-process. As requisições são iniciadas pelo servidor web (Nginx, por exemplo) que as redireciona para o PHP-FPM via o protocolo binário FastCGI. O Master Process do PHP-FPM recebe essas requisições e as aloca em um novo worker process. O PHP-FPM gerencia pools de worker processes, cada pool pode gerenciar um determinado número de processos filhos (worker processes) para lidar com as requisições (e esse número depende das configurações do PHP-FPM e da quantidade de memória disponível). Todo esse processo é bloqueante por natureza, ou seja, enquanto o script estiver sendo executado (acessando I/O, por exemplo). Só no final quando a resposta é processada e retornada que o processo filho criado pelo PHP-FPM é reciclado.

Uma nota sobre pools: A ideia de uma pool é pré-alocar uma determinada quantidade de processos ou threads (no caso de uma Thread Pool) para que fiquem em espera para realizarem alguma tarefa em um tempo futuro. É uma forma de evitar desperdício de tempo com a alocação de um novo processo ou thread, que não é uma operação muito leve no nível do kernel. Além disso, é uma forma de determinar recursos finitos para o número de clientes que se espera ter. Por exemplo, se o servidor tem 8GB de memória, mas só podemos usar 6GB para as pools que receberão as requisições, então as configuramos para consumirem no máximo isso, o que vai limitar o número de processos / threads que elas poderão pré-alocar.

Em um cenário onde há necessidade de alta concorrência para responder à milhares ou dezenas de milhares de conexões, todo esse processo tradicional é muito custoso (gasta-se muito tempo com troca de contexto), muita alocação de memória e a concorrência fica limitada à quantidade de processos que sua máquina consegue manejar. Não obstante, é lento, pois a cada nova requisição todo o código precisa ser inicializado do zero novamente. Mas esse modelo tradicional também tem seus benefícios, a depender do ponto de vista. Por exemplo, é um modelo stateless, o tempo de vida de uma requisição é curto, assim que o resultado é preparado e retornado, tudo é finalizado e retirado da memória, isso diminui as chances de memory leaks.

Abaixo o diagrama de como funciona essa arquitetura tradicional. O Nginx recebe as requisições da Web (normalmente advindas das portas 80 e 443) e então as encaminha para o socket FastCGI do PHP-FPM que maneja as execuções nas pools de worker processes.

O fluxo tradicional com PHP-FPM é mais ou menos assim:

  • Recebe a requisição;
  • Carrega os códigos (processo léxico, de parser, compilação para opcodes etc);
  • Inicializa os objetos e variáveis;
  • Executa o código;
  • Retorna a resposta;
  • Recicla os recursos, liberando o worker process para outra requisição;

Para a maioria das aplicações de uso menos intensivo de I/O, esse modelo é perfeito e estável. Mas quando precisamos de um uso mais intensivo como, por exemplo, para lidar com milhares ou centenas de milhares de conexões simultâneas, é impossível que tenhamos centenas de milhares de processos sendo criados para cuidar dessas requisições. Muito menos poderíamos criar centenas de milhares de threads, dada a limitação do número de threads por processo imposta pelo sistema operacional e também devido ao overhead que isso causaria. Mesmo que fosse possível resolver o problema usando um número menor de threads, trabalhar com threads não é simples, impõe muitos problemas de comunicação e sincronia para lidar.

É nesse ponto que entram as soluções de I/O não bloqueante com multiplexing de I/O. É aqui que ReactPHP, Nginx, NodeJS, Netty, Swoole, Go etc, resolvem o problema, cada um na sua maneira. Mas a forma mais comum e mais utilizada é usando uma abordagem orientada a eventos com o padrão Reactor.

O ciclo de vida de uma requisição em um servidor do Swoole se limita a bem menos etapas, pois depois do first load, ele mantém os recursos em memória:

  • Recebe a requisição;
  • Executa o código;
  • Retorna a resposta;

A arquitetura com Swoole

Em um cenário de alta concorrência e em um modelo de I/O não bloqueante como é o caso do Swoole, ele deixa de lidar apenas com uma requisição bloqueante (como funciona no PHP-FPM) e ganha o poder de lidar com várias requisições ao mesmo tempo, de forma não bloqueante, graças aos reactors.

O Swoole roda em modo CLI e ele forka um determinado número de processos a depender da quantidade de núcleos da sua CPU. Veja um panorama da arquitetura do Swoole:

O padrão Reactor no Swoole é multi-thread e assíncrono, igual comentamos anteriormente, ele faz uso da chamada de sistema epool. O Main Reactor é uma thread, assim como os reactors auxiliares. O Main Reactor é o que fica ouvindo o socket por novas conexões, ele faz balanceamento de carga entre os reactors auxiliares.

  • Master: o processo principal, o processo pai, o que forkará o processo Manager e que criará as threads do Reactor.

  • Manager: processo gerenciador, o que forka e gerencia os processos workers. Os reactors estarão em constante comunicação com o Manager (através do processo Master).

  • Worker: processo de trabalho, onde as tarefas são executadas;

  • Task Worker: processo de trabalho de tarefa assíncrona, é um auxiliador do Worker, ele trabalha principalmente processando tarefas de sincronização de longa data;

O mais importante aqui é entendermos que quando iniciamos um servidor do Swoole, 2 + n + m processos são criados, ou seja, o processo Master, o processo Manager e n refere-se aos processos Workers e m refere-se aos processos Task Workers, sendo que n e m serão relativos à quantidade de núcleos do seu processador. Se o seu processador tiver 6 núcleos, ele forkará 8 processos, sendo 3 Workers e 3 Task Workers.

A árvore de processo da execução de um servidor do Swoole em uma máquina de 6 núcleos:

 | |   \-+= 05535 kennedytedesco php server.php (Master)
 | |     \-+- 05536 kennedytedesco php server.php (Manager)
 | |       |--- 05537 kennedytedesco php server.php (worker / task worker)
 | |       |--- 05538 kennedytedesco php server.php (worker / task worker)
 | |       |--- 05539 kennedytedesco php server.php (worker / task worker)
 | |       |--- 05540 kennedytedesco php server.php (worker / task worker)
 | |       |--- 05541 kennedytedesco php server.php (worker / task worker)
 | |       \--- 05542 kennedytedesco php server.php (worker / task worker)

Quando trabalhamos de forma assíncrona não sabemos em qual tempo futuro a nossa resposta estará pronta e então passamos a usar os famosos callbacks. Com o tempo, é comum termos muitos callbacks aninhados, é aí que chegamos no famoso Callback Hell. Por exemplo, em NodeJS:

fs.readdir(source, function (err, files) {
  if (err) {
    console.log('Error finding files: ' + err)
  } else {
    files.forEach(function (filename, fileIndex) {
      console.log(filename)
      gm(source + filename).size(function (err, values) {
        if (err) {
          console.log('Error identifying file size: ' + err)
        } else {
          console.log(filename + ' : ' + values)
          aspect = (values.width / values.height)
          widths.forEach(function (width, widthIndex) {
            height = Math.round(width / aspect)
            console.log('resizing ' + filename + 'to ' + height + 'x' + height)
            this.resize(width, height).write(dest + 'w' + width + '_' + filename, function(err) {
              if (err) console.log('Error writing file: ' + err)
            })
          }.bind(this))
        }
      })
    })
  }
})

Existem várias formas que diferentes linguagens/frameworks usam para resolver esse problema, como: Promises (recomendo a leitura do artigo de Promises no ReactPHP) , yield/generators, instruções Async / Await (como em C#).

Em Swoole a forma mais elegante e eficiente de resolver esse problema é usando corrotinas. Corrotinas são uma importante parte do núcleo do Swoole.

Um pouco da sintaxe de corrotinas:

<?php

use Swoole\Coroutine\System;

go(static function () {
    System::sleep(1);
    echo 'a';
});

go(static function () {
    System::sleep(2);
    echo 'b';
});

Corrotina (Coroutine)

Programação concorrente é a composição de atividades independentes e o modelo tradicional de se aplicar concorrência é o multi threaded com memória compartilhada. Mas existem muitos desafios nas trocas mensagens, sincronização, sem contar que todas essas operações são custosas a nível de consumo de recursos. E se existisse uma forma mais simples e bem mais leve de se ter concorrência? Sim, existe! As corrotinas se propõem a resolver esse problema.

O uso de concorrência para “ocultar” a latência das operações I/O se faz cada vez mais necessário com aplicativos que necessitam servir dezenas ou centenas de milhares de clientes simultaneamente. Corrotinas são uma forma de se aplicar concorrência, onde valores são passados entre atividades independentes (coroutines). Já os canais (channels) são o mecanismo que uma corrotina tem para se comunicar com outra corrotina (passando valores).

Há o senso de que corrotinas são uma espécie de “threads de peso leve”. Alguns autores chamam corrotinas de “green threads”, pra dar essa ideia de que são leves, consomem menos recursos. Mas isso não pode ser levado no sentido literal, pois corrotinas não são threads. Todas as operações de uma corrotina acontecem no modo do usuário (user mode), não envolve diretamente o kernel (kernel mode) como acontece com as threads, isso faz com que o custo de criação e consumo de recursos aconteça em uma escala muito menor. O paralelo de se comparar corrotinas com threads é útil apenas para que entendamos quando devemos utilizá-las. Basicamente todo caso de uso que teríamos com threads, podemos fazer com corrotinas. O Swoole por padrão cria uma corrotina para cada requisição recebida, esse que também é o padrão em Go. Teremos um artigo focado só em corrotinas.

Desde a versão 4 o Swoole provê um completo suporte à corrotinas, num modelo relativamente parecido com o que se implementa em Go, mas com algumas diferenças internas fundamentais, pois no Swoole uma única corrotina é executada por vez através de um scheduler single threaded. Enquanto em Go o scheduler de corrotinas é multi threaded, ele paraleliza a execução das corrotinas (e internamente usa locks e mutex para controlar a sincronização).

Nota: Scheduling é um mecanismo que atribui tarefas para serem executadas nos workers, é o mecanismo que gerencia qual tarefa será executada em um dado momento, e quem aplica esse mecanismo é o scheduler. Num modelo mais tradicional, pensando no scheduler de um sistema operacional, tarefa seria uma thread e worker seria um núcleo da CPU. No caso do Swoole, tarefa é uma corrotina e worker é um worker process.

Ambos os modelos tem seus prós e contras. O “contra” do modelo do Swoole é que para compartilhar variáveis globais e recursos entre diferentes processos do Swoole, precisa-se pensar um pouco mais, se vai usar IPC (Inter Process Communication) ou outra abordagem (Table, Atomic etc). Mas esse costuma ser um caso de uso bem mais específico. Na maior parte dos casos o desenvolvedor não precisará ter esse tipo de preocupação.

O pró do modelo do Swoole é que para atualizar recursos compartilhados entre corrotinas, não precisamos nos preocupar em implementar locks (pra cuidar do acesso concorrente a um mesmo recurso), uma vez que a execução é single-thread e que as corrotinas são executadas uma por vez. O fato de de o scheduler de corrotinas operar em uma única thread pode dar uma sensação de que ele não é tão eficiente, mas é importante ressaltar que as corrotinas são pausadas/resumidas a partir de eventos de I/O, então a alternância de execução delas pelo scheduler ocorre de maneira muito alta, não dando muito tempo para ociosidade.

Falando sobre custos e as dificuldades de cada abordagem, temos esse comparativo:

Multi-processoMulti-threadCorrotinas
CriaçãoChamada de sistema fork()Pthreads API pthread_create()Função go()
Custo de SchedulingAltoModeradoExtremamente baixo
ConcorrênciaCentenas de processosMilhares de threadsCentenas de milhares de corrotinas
Dificuldade de desenvolvimentoAltaMuito altaBaixa

Executando os primeiros exemplos

O melhor recurso para visualizar alguns exemplos do que é possível fazer com o Swoole é o Swoole by Examples. Clone esse repositório na sua máquina e então execute o comando abaixo para inicializar os containers:

Observação: É necessário que você tenha Docker instalado na sua máquina.

Docker - Fundamentos
Curso de Docker - Fundamentos
CONHEÇA O CURSO
$ docker-compose up -d

O primeiro exemplo que vamos executar trata-se de uma execução síncrona (tradicional):

<?php

(function () {
    sleep(2);
    echo "1";
})();

(function () {
    sleep(1);
    echo "2";
})();  

Execute no terminal:

$ docker-compose exec client bash -c "time ./io/blocking-io.php"

O resultado será:

12
real    0m3.029s
user    0m0.010s
sys 0m0.010s

O script levou cerca de 3 segundos pra executar e retornou 12. A função sleep() foi usada para simular um bloqueio de I/O.

O mesmo exemplo escrito de forma assíncrona usando corrotinas:

<?php

go(function () {
    co::sleep(2);
    echo "1";
});

go(function () {
    co::sleep(1);
    echo "2";
});

Execute:

$ docker-compose exec client bash -c "time ./io/non-blocking-io.php"

O resultado será:

21
real    0m2.033s
user    0m0.020s
sys 0m0.000s

Executou em cerca de dois segundos, ou seja, ele custou o tempo da maior execução para retornar os resultados, ao invés de executar cada uma função de forma bloqueante, o que custaria 3 segundos.

Benchmarking

Usando a ferramenta wrk gerei benchmarking de quatro servidores “hello world” rodando em diferentes plataformas:

1) Servidor embutido do PHP;

2) Servidor em NodeJS;

3) Servidor em Go;

4) Servidor Swoole;

O exemplo do servidor embutido do PHP:

<?php
// vanilla.php

echo "Hello World";

Iniciei o servidor embutido:

$ php -S 127.0.0.1:8101 vanilla.php

E então rodei o benchmarking:

$ wrk -t4 -c200 -d10s http://127.0.0.1:8101

O resultado foi:

Running 10s test @ http://127.0.0.1:8101
  4 threads and 200 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     3.46ms    9.56ms 141.27ms   98.92%
    Req/Sec     1.41k     1.70k    9.47k    85.45%
  8871 requests in 10.05s, 1.47MB read
  Socket errors: connect 0, read 9275, write 0, timeout 0
Requests/sec:    882.46
Transfer/sec:    149.95KB

Conseguimos 882 requisições por segundo.

No servidor escrito em NodeJS:

const http = require('http');

const server = http.createServer((req, res) => {
    res.end('Hello World')
});

server.listen(8101, '0.0.0.0');

O resultado foi:

Running 10s test @ http://127.0.0.1:8101
  4 threads and 200 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     4.05ms    1.09ms  43.57ms   98.31%
    Req/Sec    12.49k     1.46k   13.49k    97.03%
  502227 requests in 10.10s, 53.16MB read
  Socket errors: connect 0, read 110, write 0, timeout 0
Requests/sec:  49720.54
Transfer/sec:      5.26MB

Consegui cerca de 50 mil requisições por segundo.

No servidor escrito em Go:

package main

import (
    "fmt"
    "net/http"
)

func main() {
    http.HandleFunc("/", HelloServer)
    _ = http.ListenAndServe(":8101", nil)
}

func HelloServer(w http.ResponseWriter, r *http.Request) {
    _, _ = fmt.Fprintf(w, "Hello World")
}

O resultado foi:

Running 10s test @ http://127.0.0.1:8101
  4 threads and 200 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     1.05ms  624.03us  42.09ms   92.46%
    Req/Sec    47.05k     2.31k   50.59k    95.50%
  1873010 requests in 10.00s, 228.64MB read
  Socket errors: connect 0, read 48, write 0, timeout 0
Requests/sec: 187280.40
Transfer/sec:     22.86MB

Cerca de 188 mil requisições por segundo.

Agora, no servidor do Swoole:

<?php

use Swoole\Http\Server;
use Swoole\Http\Request;
use Swoole\Http\Response;

$server = new Server('0.0.0.0', 8101);

$server->on('request', static function (Request $request, Response $response) {
    $response->header('Content-Type', 'text/plain');
    $response->end('Hello World');
});

$server->start();

O resultado foi:

Running 10s test @ http://127.0.0.1:8101
  4 threads and 200 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     0.87ms  660.42us  42.95ms   98.60%
    Req/Sec    48.55k     3.12k   53.67k    88.75%
  1933132 requests in 10.01s, 304.19MB read
  Socket errors: connect 0, read 41, write 0, timeout 0
Requests/sec: 193149.91
Transfer/sec:     30.39MB

Não, você não está vendo errado, foram 193 mil requisições por segundo, um pouco mais do que conseguimos com Go. É um número impressionante.

Esse tipo de benchmarking com exemplos simples e hipotéticos não possuem um valor tão significativo, eles precisam ser relativizados, pois as coisas mudam em uma aplicação real que faz um uso mais intensivo de I/O. Ainda assim, é possível enxergar um espectro do que a tecnologia pode alcançar e também nos ajuda a fazer comparações com outras plataformas, como fizemos com NodeJS e Go.

Considerações finais

Esse artigo teve como objetivo fazer um comparativo entre o modelo tradicional síncrono com o assíncrono não bloqueante orientado a eventos. Também foram passadas as principais ideias por trás do Swoole.

Por fim, é interessante informar que o Swoole permite em sua API que implementemos outros padrões como Thread pool pattern. O Swoole também permite que criemos aplicações multi-process.

Leitura sugerida

Dando continuidade aos estudos de Swoole, sugiro a leitura do artigo sequência: Trabalhando com corrotinas, canais e explorando um pouco mais o scheduler de corrotinas do Swoole

Trabalhando com Sockets no ReactPHP

No primeiro artigo da série fizemos uma Introdução à programação assíncrona em PHP usando o ReactPHP, depois vimos sobre Promises no ReactPHP. Hoje falaremos sobre Sockets, um assunto muito relevante para a criação de aplicações cliente-servidor.

PHP Básico
Curso de PHP Básico
CONHEÇA O CURSO

Antes, entretanto, tem dois artigos que muito recomendo você ler antes de continuar neste, que são estes:

Criando o primeiro servidor

No seu projeto, instale o componente:

$ composer require react/socket:^1.3

Não vamos instalar manualmente o componente Event Loop pois ele já é uma dependência de react/socket.

Já estamos prontos para criar o nosso primeiro servidor, baseado no do artigo sobre Programação de Sockets em PHP:

<?php

require './vendor/autoload.php';

use React\Socket\Server;
use React\EventLoop\Factory;
use React\Socket\ConnectionInterface;
use React\Stream\WritableResourceStream;

$loop = Factory::create();
$socket = new Server('127.0.0.1:7181', $loop);
$stdout = new WritableResourceStream(\STDOUT, $loop);

$socket->on('connection', static function (ConnectionInterface $connection) {
    $connection->write("Client [{$connection->getRemoteAddress()}] connected \n");
});

$stdout->write("Listening on: {$socket->getAddress()}\n");

$loop->run();

No construtor de Server informamos o IP (local) e a porta que o nosso servidor rodará, não informando o protocolo antes do IP, ele considera por padrão como sendo tcp. Depois, passamos a ouvir o evento connection e informamos um handler que é executado sempre que uma nova conexão é estabelecida com o nosso servidor. Esse handler recebe como parâmetro um objeto do tipo ConnectionInterface que nos permite trabalhar com a conexão que foi realizada. No caso, sempre que um cliente se conectar ao nosso servidor, vamos imprimir no lado dele a mensagem que definimos no método write().

Para testar esse exemplo, basta que você inicie o servidor:

$ php index.php

E para se conectar ao servidor, se seu sistema operacional é baseado em Unix, em outra janela do terminal:

telnet 127.0.0.1 7181

Ao se conectar, no lado do servidor teremos:

$ ~/D/w/reactphp&gt; php index.php
Listening on: tcp://127.0.0.1:7181

No lado do cliente:

$ telnet 127.0.0.1 7181
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Client [tcp://127.0.0.1:50353] connected

O objeto $connection também implementa a interface EventEmitterInterface que nos permite ouvir alguns eventos relacionados à conexão, por exemplo, através do evento data conseguimos receber os dados enviados pelo cliente.

Para testarmos de forma efetiva o evento data, desenvolveremos o exemplo “Echo Server” do artigo Programação de Sockets em PHP:

<?php

require './vendor/autoload.php';

use React\Socket\Server;
use React\EventLoop\Factory;
use React\Socket\ConnectionInterface;
use React\Stream\WritableResourceStream;

$loop = Factory::create();
$socket = new Server('127.0.0.1:7181', $loop);
$stdout = new WritableResourceStream(\STDOUT, $loop);

$socket->on('connection', static function (ConnectionInterface $connection) {
    $connection->write("Client [{$connection->getRemoteAddress()}] connected \n");

    $connection->on('data', static function ($data) use ($connection) {
        $connection->write("Server says: {$data}");
    });
});

$stdout->write("Listening on: {$socket->getAddress()}\n");

$loop->run();

Se você comparar essa implementação com a que implementamos usando apenas o “PHP puro” (no artigo Programação de Sockets em PHP), verá que essa é bem mais simples. O RectPHP abstrai toda a parte complicada de lidar com streams/buffers, oferecendo uma API de bem alto nível.

No terminal do cliente ao interagir com o servidor (submeter algumas entradas):

$ telnet 127.0.0.1 7181
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Client [tcp://127.0.0.1:50397] connected
Hello
Server says: Hello
World
Server says: World

Diferentemente da nossa implementação pura, os servidores criados usando o ReactPHP aceitam múltiplos clientes, sem que façamos nada de especial a respeito (a não ser que tenhamos algum caso de uso mais específico, que é o caso do nosso próximo exemplo).

PHP Intermediário
Curso de PHP Intermediário
CONHEÇA O CURSO

Desenvolvendo um chat

Avançando um pouquinho mais, vamos criar um simples chat que funcionará pelo terminal. Crie um novo projeto, no meu caso, vou chamá-lo de “reactchat”. Nele, instale a dependência do reactphp/socket:

$ composer require react/socket:^1.3

A diferença desse exemplo para o que criamos no artigo Programação de Sockets em PHP será:

  • Usará o componente de socket do ReactPHP;
  • Um membro poderá citar outro (usando @) para enviar uma mensagem privada;
  • O ReactPHP abstrai em muitos aspectos a programação de sockets, portanto, bem mais simples de desenvolver e manter do que usando as funções nativas da extensão Streams do PHP.

Usaremos como referência o exemplo desenvolvido pelo Sergey Zhuk no artigo: Build A Simple Chat With ReactPHP Socket: Server.

Publiquei o exemplo no Github, você pode obtê-lo de lá: KennedyTedesco/reactchat

O index.php inicia o servidor:

<?php

// https://github.com/KennedyTedesco/reactchat

require './vendor/autoload.php';

use ReactChat\Chat;
use ReactChat\Member;
use React\Socket\Server;
use React\EventLoop\Factory;
use React\Socket\ConnectionInterface;
use React\Stream\WritableResourceStream;

$loop = Factory::create();
$socket = new Server('127.0.0.1:7181', $loop);
$stdout = new WritableResourceStream(\STDOUT, $loop);

$chat = new Chat();

$socket->on('connection', static function (ConnectionInterface $connection) use ($chat) {
    $member = new Member($connection);
    $member->write('Informe o seu nome: ');

    $connection->on('data', static function ($data) use ($member, $chat) {
        if ($data !== '' && $member->getName() === null) {
            // Define o nome do membro
            $member->setName(\str_replace(["\r", "\n"], '', $data));
            // Adiciona o membro ao chat
            $chat->addMember($member);
        }
    });
});

$stdout->write("Listening on: {$socket->getAddress()}\n");

$loop->run();

Não obstante, quando uma conexão é estabelecida, a primeira coisa que ele faz é pedir o nome, considerando que é alguém que está querendo entrar no chat. Essa pessoa só é adicionada ao chat depois que definir o nome.

A classe Member encapsula uma conexão e oferece alguns métodos úteis para se trabalhar com essa conexão:

<?php

// https://github.com/KennedyTedesco/reactchat

declare(strict_types=1);

namespace ReactChat;

use Closure;
use React\Socket\ConnectionInterface;

final class Member
{
    private $name;
    private $connection;

    public function __construct(ConnectionInterface $connection)
    {
        $this->connection = $connection;
    }

    public function getName() : ?string
    {
        return $this->name;
    }

    public function setName(string $name) : void
    {
        $this->name = $name;
    }

    public function write(string $data) : void
    {
        $data = \str_replace(["\r", "\n"], '', $data);

        $this->connection->write($data . \PHP_EOL);
    }

    public function onData(Closure $handler) : void
    {
        $this->connection->on('data', $handler);
    }

    public function onClose(Closure $handler) : void
    {
        $this->connection->on('close', $handler);
    }
}

E é no Chat que adicionamos novos membros e definimos os handlers para os eventos de data e close das conexões dos membros.

<?php

// https://github.com/KennedyTedesco/reactchat

declare(strict_types=1);

namespace ReactChat;

use SplObjectStorage;

final class Chat
{
    private $members;

    public function __construct()
    {
        $this->members = new SplObjectStorage();
    }

    public function addMember(Member $member) : void
    {
        $this->members->attach($member);

        $this->newMessageTo("Bem-vindo(a), {$member->getName()}", $member);
        $this->newMessageToAll("{$member->getName()} entrou na sala;", $member);

        $member->onData(function ($data) use ($member) {
            $this->newMessage("{$member->getName()} diz: {$data}", $member);
        });

        $member->onClose(function () use ($member) {
            $this->members->detach($member);
            $this->newMessage("{$member->getName()} saiu da sala.", $member);
        });
    }

    private function newMessage(string $message, Member $exceptMember) : void
    {
        // Se um membro foi citado usando @, envia a mensagem apenas para ele (mensagem privada).
        // Caso contrário, envia a mensagem para todos os membros
        $mentionedMember = $this->getMentionedMember($message);

        if ($mentionedMember instanceof Member) {
            $this->newMessageTo($message, $mentionedMember);
        } else {
            $this->newMessageToAll($message, $exceptMember);
        }
    }

    private function newMessageTo(string $message, Member $to) : void
    {
        // Envia para um membro específico
        foreach ($this->members as $member) {
            if ($member === $to) {
                $member->write($message);

                break;
            }
        }
    }

    private function newMessageToAll(string $message, Member $exceptMember) : void
    {
        // Envia para todos, exceto para o $exceptMember (quem está enviando a mensagem)
        foreach ($this->members as $member) {
            if ($member !== $exceptMember) {
                $member->write($message);
            }
        }
    }

    private function getMentionedMember(string $message) : ?Member
    {
        \preg_match('/\B@(\w+)/i', $message, $matches);

        $nameMentioned = $matches[1] ?? null;

        if ($nameMentioned !== null) {
            /** @var Member $member */
            foreach ($this->members as $member) {
                if ($member->getName() === $nameMentioned) {
                    return $member;
                }
            }
        }

        return null;
    }
}

Se você acompanhou os outros artigos da série, não terá dificuldade para entender o que está acontecendo nesse exemplo. O ReactPHP torna as coisas bem simples do nosso lado.

O método addMember() é o mais importante da classe, pois ele adiciona um membro na pool de conexões, de tal forma que ele passará a receber as mensagens do chat e também poderá interagir com os outros membros.

O método getMentionedMember() avalia por uma regex se algum pattern @nome foi destacado na mensagem que será enviada, se sim, ele procura e retorna o membro que possui esse nome. Ele é o núcleo do funcionamento do envio das mensagens privadas lá no método newMessage().

Para testar, tudo o que você precisa é iniciar o servidor:

$ php index.php

Depois, abra umas três abas no seu terminal para inicializar alguns clientes do chat (dando nomes diferentes para eles):

telnet 127.0.0.1 7181

Você pode, inclusive, testar o envio de mensagens privadas usando @nomeDoMembro.

PHP Avançado
Curso de PHP Avançado
CONHEÇA O CURSO

Concluindo

Ao invés de utilizarmos o terminal para a conexão dos membros do chat, poderíamos usar uma interface web e, para se conectar ao servidor, utilizaríamos WebSockets que já é uma realidade estável em todos os principais navegadores. Inclusive, esse deverá ser o assunto do próximo artigo da série. =D

Até a próxima!

Programação de Sockets em PHP

Neste artigo veremos uma introdução à programação de sockets com o PHP. Antes, recomendo que primeiro você leia o artigo Uma introdução a TCP, UDP e Sockets que discorre sobre a base teórica fundamental.

O PHP dispõe de uma extensão chamada Sockets, que foi introduzida ainda no PHP 4 e ela tem uma boa fidelidade em relação à forma como sockets são programados em C, além dela expor a possibilidade de se trabalhar com algumas características do sistema operacional, o que pode ser útil dependendo do que se precisa desenvolver. No entanto, essa extensão nos impõe alguns problemas e dificuldades, o primeiro é que ela não vem instalada por padrão, sendo necessário na compilação do PHP adicionar a flag -enable-sockets para ativá-la. Ela também não possui uma boa usabilidade para o desenvolvedor, é uma extensão difícil de usar, de bem mais baixo nível. Além disso, os resources abertos por ela só podem ser acessados pelas funções da mesma família, ou seja, as que iniciam pelo nome socket_ e isso é um fator que limita o desenvolvimento, principalmente porque o PHP dispõe de várias funções para trabalhar com streams como stream_get_contents(), fwrite(), fread() etc, que não podem ser utilizadas junto com as funções da extensão Sockets.

PHP Básico
Curso de PHP Básico
CONHEÇA O CURSO

Mas a solução para os nossos problemas está na extensão Stream, nativa do PHP e que já vem habilitada por padrão. Como o próprio nome sugere, essa extensão lida com toda a abstração de I/O (input e output) do PHP e o detalhe mais importante pra nós é que desde o PHP 5 essa extensão nos permite trabalhar com TCP/IP Sockets e também com Unix Sockets. Os sockets criados usando as funções da extensão Stream (as que iniciam por stream_socket_) nos permitem usar as clássicas funções para lidar com arquivos, o que torna o desenvolvimento bem mais intuitivo.

Leitura recomendada antes de continuar: Streams no PHP.

Em resumo, temos duas formas de trabalhar com Sockets no PHP:

  • Usando as funções da extensão Sockets, que são funções de mais baixo nível, que nos expõem uma variedade maior de possibilidades ao lidar com algumas características de sockets a nível do sistema operacional, mas que prejudica o desenvolvimento por impor um grau maior de dificuldade;
  • Usando as funções da extensão Stream, que é uma extensão habilitada por padrão e utilizada por tudo que envolve streams no PHP. Essa extensão expõem funções de mais alto nível, mas ela pode ser um fator limitante se for preciso trabalhar com características de mais baixo nível, mas isso não costuma ser um problema para a grande maioria das aplicações que implementam o uso de sockets.

Echo Server

O primeiro e mais acessível exemplo que podemos construir para entender como um servidor de socket opera é o de um “Echo Server”, ele basicamente envia de volta tudo o que recebe do cliente. Crie um novo diretório onde normalmente você guarda os seus projetos e então crie um arquivo index.php com a seguinte implementação:

<?php

declare(strict_types=1);

// Queremos que o PHP reporte apenas erros graves, estamos explicitamente ignorando warnings aqui.
// Warnings que, por sinal, acontecem bastante ao se trabalhar com sockets.
error_reporting(E_ERROR | E_PARSE);

// Inicia o servidor na porta 7181
$server = stream_socket_server('tcp://127.0.0.1:7181', $errno, $errstr);

// Em caso de falha, para por aqui.
if ($server === false) {
    fwrite(STDERR, "Error: $errno: $errstr");

    exit(1);
}

// Sucesso, servidor iniciado.
fwrite(STDERR, sprintf("Listening on: %s\n", stream_socket_get_name($server, false)));

// Looping infinito para "escutar" novas conexões
while (true) {
    // Aceita uma conexão ao nosso socket da porta 7181
    // O valor -1 seta um timeout infinito para a função receber novas conexões (socket accept timeout) e isso significa que a execução ficará bloqueada aqui até que uma conexão seja aceita;
    $connection = stream_socket_accept($server, -1, $clientAddress);

    // Se a conexão foi devidamente estabelecida, vamos interagir com ela.
    if ($connection) {
        fwrite(STDERR, "Client [{$clientAddress}] connected \n");

        // Lê 2048 bytes por vez (leitura por "chunks") enquanto o cliente enviar.
        // Quando os dados não forem mais enviados, fread() retorna false e isso é o que interrompe o loop.
        // fread() também retornará false quando o cliente interromper a conexão.
        while ($buffer = fread($connection, 2048)) {
            if ($buffer !== '') {
                // Escreve na conexão do cliente
                fwrite($connection, "Server says: $buffer");
            }
        }

        // Fecha a conexão com o cliente
        fclose($connection);
    }
}

Na raiz do projeto, utilizando o terminal, execute o servidor:

$ php index.php

O resultado será algo como:

kennedytedesco@kennedy-pc ~/D/w/sockets&gt; php index.php
Listening on: 127.0.0.1:7181

Como o próprio output explicita, agora temos um servidor rodando na porta 7181 do host local e ele está pronto para receber uma conexão de um cliente.

Para conectar ao servidor aberto, vou utilizar o cliente Telnet disponível no meu sistema operacional, bastando executar:

$ telnet 127.0.0.1 7181

Tendo sucesso na conexão:

kennedytedesco@kennedy-pc ~&gt; telnet 127.0.0.1 7181
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.

E o servidor imediatamente imprime qual cliente se conectou:

kennedytedesco@kennedy-pc ~/D/w/sockets&gt; php index.php
Listening on: 127.0.0.1:7181
Client [127.0.0.1:52357] connected

A porta do cliente é conhecida pelo servidor no momento que esse se conecta a ele. A cada nova conexão uma porta aleatória é associada ao cliente.

A partir desse momento, o cliente pode interagir com o servidor, enviando mensagens, por exemplo:

kennedytedesco@kennedy-pc ~&gt; telnet 127.0.0.1 7181
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Hello World!
Server says: Hello World!

O cliente enviou “Hello World” e recebeu como resposta “Server says: Hello World!“, essa é a dinâmica do nosso servidor, ele envia de volta tudo o que recebe. Apertando as teclas de atalho CTRL + C no terminal onde o servidor está rodando, você consegue pará-lo e, consequentemente, isso também desconecta o cliente.

Essa é a dinâmica da interação entre o cliente e o servidor:

O nosso servidor opera da seguinte maneira:

  • 1) Ele abre um socket, que na prática diz ao sistema operacional que todos os dados recebidos por esse socket devem ser enviados para o nosso programa;
  • 2) Ele passa a “ouvir” por conexões de clientes;
  • 3) Ele aceita uma conexão quando ela é estabelecida. Nesse momento, a comunicação entre cliente-servidor é possível;
  • 4) Ele troca dados com o cliente;
  • 5) Ele fecha a conexão ou deixa que o cliente a finalize;
  • 6) Quando uma conexão é finalizada, ele volta para o passo 2) e todo esse ciclo é executado enquanto novas conexões existirem e/ou enquanto o servidor estiver em execução.
PHP Intermediário
Curso de PHP Intermediário
CONHEÇA O CURSO

Voltando ao nosso exemplo, definimos um timeout de -1 na stream_socket_accept e, conforme explicado no código, isso fará o fluxo da execução ficar bloqueado nessa parte até que uma nova conexão seja aceita/estabelecida. No entanto, podemos alterar o exemplo e definir um timeout de, por exemplo, 1 segundo. É que o vamos fazer. Os comentários definidos anteriormente foram suprimidos, pra melhorar a legibilidade:

<?php

declare(strict_types=1);

error_reporting(E_ERROR | E_PARSE);

$server = stream_socket_server('tcp://127.0.0.1:7181', $errno, $errstr);

if ($server === false) {
    fwrite(STDERR, "Error: $errno: $errstr");

    exit(1);
}

fwrite(STDERR, sprintf("Listening on: %s\n", stream_socket_get_name($server, false)));

while (true) {
    $connection = stream_socket_accept($server, 1, $clientAddress);

    if ($connection) {
        fwrite(STDERR, "Client [{$clientAddress}] connected \n");

        while ($buffer = fread($connection, 2048)) {
            if ($buffer !== '') {
                fwrite($connection, "Server says: $buffer");
            }
        }

        fclose($connection);
    } else {
        fwrite(STDERR, "Aguardando ... \n");
    }
}

Nessa alteração, definimos um timeout de 1 segundo e adicionamos esse else na verificação se a conexão foi aceita:

} else {
  fwrite(STDERR, "Aguardando ... \n");
}

Dessa forma, ao iniciar o servidor, a cada 1 segundo ele vai imprimir “Aguardando …” até que uma conexão seja aceita:

kennedytedesco@kennedy-pc ~/D/w/sockets&gt; php index.php
Listening on: 127.0.0.1:7181
Aguardando ...
Aguardando ...
Aguardando ...
Aguardando ...
Aguardando ...
Aguardando ...
Client [127.0.0.1:54215] connected

A ideia por trás de definir um timeout não negativo para a stream_socket_accept() é a de liberar o fluxo da execução do programa ali dentro do while, para que ele possa desempenhar outras tarefas que julgar necessárias.

Aceitando múltiplos clientes

Você deve ter percebido que o nosso servidor só aceita um cliente por vez. Se você tentar abrir um terceiro terminal e se conectar a ele, verá que a conexão não será devidamente estabelecida se outra estiver aberta. Podemos resolver isso de duas formas:

  • 1) Abrir múltiplos processos “ouvindo” o mesmo socket (mas isso só vai funcionar bem em sistemas baseados em Unix, além disso, nem sempre é possível sair forkando processos, sem contar o custo disso pro sistema operacional que é alto (uso de memória e trocas de contexto), sendo mais efetivo trabalhar com threads ou coroutines);
  • 2) Manter um único processo (single-thread) mas receber todas as conexões e monitorar a mudança de estado delas para então decidir se vamos ler e/ou escrever;

Vamos implementar a segunda opção, o “segredo” dela está no uso da função stream_select() que executa uma chamada de sistema select que monitora um ou mais recursos (resources) por mudanças, dessa forma, conseguimos alternar entre diferentes recursos de forma não bloqueante. Esse que é um conceito intimamente relacionado à programação assíncrona. Recomendo, aproveitando o gatilho, a leitura do artigo Introdução à programação assíncrona em PHP usando o ReactPHP.

A implementação do nosso Echo Server que aceita múltiplas conexões de forma não bloqueante:

<?php

declare(strict_types=1);

error_reporting(E_ERROR | E_PARSE);

final class EchoSocketServer
{
    private $except;
    private $server;

    private $buffers = [];
    private $writable = [];
    private $readable = [];
    private $connections = [];

    public function __construct(string $uri)
    {
        $this->server = stream_socket_server($uri);
        stream_set_blocking($this->server, false);

        if ($this->server === false) {
            exit(1);
        }
    }

    public function run() : void
    {
        while (true) {
            $this->except = null;
            $this->writable = $this->connections;
            $this->readable = $this->connections;

            // Adiciona a stream do servidor no array de streams de somente leitura,
            // para que consigamos aceitar novas conexões quando disponíveis;
            $this->readable[] = $this->server;

            // Em um looping infinito, a stream_select() retornará quantas streams foram modificadas,
            // a partir disso iteramos sobre elas (tanto as de escrita quanto de leitura), lendo ou escrevendo.
            // A stream_select() recebe os arrays por referência e ela os zera (remove seus itens) até que uma stream muda de estado,
            // quando isso acontece, a stream_select() volta com essa stream para o array, é nesse momento que conseguimos iterar escrevendo ou lendo.
            if (stream_select($this->readable, $this->writable, $this->except, 0, 0) > 0) {
                $this->readFromStreams();
                $this->writeToStreams();
                $this->release();
            }
        }
    }

    private function readFromStreams() : void
    {
        foreach ($this->readable as $stream) {
            // Se essa $stream é a do servidor, então uma nova conexão precisa ser aceita;
            if ($stream === $this->server) {
                $this->acceptConnection($stream);

                continue;
            }

            // Uma stream é um resource, tipo especial do PHP,
            // quando aplicamos um casting de inteiro nela, obtemos o id desse resource;
            $key = (int) $stream;

            // Armazena no nosso array de buffer os dados recebidos;
            if (isset($this->buffers[$key])) {
                $this->buffers[$key] .= fread($stream, 4096);
            } else {
                $this->buffers[$key] = '';
            }
        }
    }

    private function writeToStreams() : void
    {
        foreach ($this->writable as $stream) {
            $key = (int) $stream;
            $buffer = $this->buffers[$key] ?? null;

            if ($buffer && $buffer !== '') {
                // Escreve no cliente o que foi recebido;
                $bytesWritten = fwrite($stream, "Server says: {$this->buffers[$key]}", 2048);

                // Imediatamente remove do buffer a parte que foi escrita;
                $this->buffers[$key] = substr($this->buffers[$key], $bytesWritten);
            }
        }
    }

    private function release() : void
    {
        foreach ($this->connections as $key => $connection) {
            // Quando uma conexão é fechada, ela entra no modo EOF (end-of-file),
            // usamos a feof() pra verificar esse estado e então devidamente executar fclose().
            if (feof($connection)) {
                fwrite(STDERR, sprintf("Client [%s] closed the connection; \n", stream_socket_get_name($connection, true)));

                fclose($connection);
                unset($this->connections[$key]);
            }
        }
    }

    private function acceptConnection($stream) : void
    {
        $connection = stream_socket_accept($stream, 0, $clientAddress);

        if ($connection) {
            stream_set_blocking($connection, false);
            $this->connections[(int) $connection] = $connection;

            fwrite(STDERR, sprintf("Client [%s] connected; \n", $clientAddress));
        }
    }
}

$server = new EchoSocketServer('tcp://127.0.0.1:7181');
$server->run();

Você pode testar agora abrir várias conexões ao servidor que ele responderá corretamente para cada uma delas. A implementação em sua essência não difere tanto da primeira que fizemos, com a diferença que agora estamos armazenamento as streams em arrays, controlando os buffers e monitorando as mudanças das streams pela stream_select().

Recomendação de leitura

Se você quer aprender programação assíncrona no PHP usando o ReactPHP, recomendo a leitura:

PHP Avançado
Curso de PHP Avançado
CONHEÇA O CURSO

Concluindo

A última implementação que fizemos não foi muito trivial e ela também nem é a melhor opção para resolver o nosso problema. Uma opção melhor e que prefiro é usar programação assíncrona com o padrão Reactor (o mesmo usado por NodeJS e ReactPHP), torna o trabalho com sockets bem mais organizado. Para isso, temos bons recursos no ecossistema PHP, como:

Além disso, se você precisa trabalhar com sockets usando a extensão ext-socket (aquela de mais baixo nível que precisa estar previamente compilada), a library php-socket-raw fornece uma interface orientada a objetos que abstrai grande parte da dificuldade.

Até a próxima!

Promises no ReactPHP

No último artigo – Introdução à programação assíncrona em PHP usando o ReactPHP – tivemos uma introdução à programação assíncrona com PHP.

No artigo de hoje vamos comentar sobre Promises, que no ReactPHP trata-se de um componente. Promises no ReactPHP são uma implementação da API CommonJS Promises/A. Se você trabalha com JavaScript, vai notar muita similaridade com o que veremos a seguir.

PHP Básico
Curso de PHP Básico
CONHEÇA O CURSO

Afinal, o que é uma Promise?

Uma Promise é uma abstração que encapsula um resultado de uma execução assíncrona para ser utilizado quando ele estiver disponível. Promises desempenham a mesma tarefa que os Callbacks dos quais já estamos acostumados, só que de maneira bem mais elegante (principalmente quando lidamos com múltiplos callbacks de forma encadeada).

Dois conceitos importantes que temos que conhecer: Promise e Deferred. Já vimos que uma Promise representa o resultado de um código assíncrono, já Deferred representa a computação que vai gerar esse resultado em algum momento futuro. Um objeto Deferred sempre vai ter uma Promise associada que vai representar (encapsular) o resultado. Usar um objeto Deferred é uma forma de separar a Promise de quem vai resolvê-la ou rejeitá-la (em algum momento futuro).

Uma Promise possui três estados possíveis:

  • unfulfilled: É o estado inicial, pois o valor retornado de Deferred ainda é desconhecido.
  • fulfilled: Esse estado indica que a Promise está devidamente “alimentada” do resultado da computação de Deferred.
  • failed: Houve uma exceção durante a execução do Deferred (o executor pode ter chamado reject()).

Já um objeto Deferred possui dois métodos para mudar o estado da sua Promise:

  • resolve(string $result): Quando o código é executado com sucesso, esse método altera o estado da Promise para fulfilled (enviando para ela o resultado que ela encapsulára);
  • reject(string $reason): A execução do código falhou, altera o estado da Promise para failed.

Uma Promise também possui métodos para que definamos handlers (callbacks a serem executados) para as mudanças de estados dela, são esses: then(), done(), otherwise() e always().

Antes de iniciarmos, certifique-se de instalar o componente React Promise no seu projeto:

$ composer require react/promise

Vamos então ao primeiro exemplo do uso de um objeto Deferred e de sua Promise:

<?php

require './vendor/autoload.php';

use React\Promise\Deferred;

$deferred = new Deferred();
$promise = $deferred->promise();

$promise->done(function ($data) {
    echo 'Resultado: ' . $data;
});

$deferred->resolve('Olá mundo!'); // Resultado: Olá mundo!

Criamos um objeto Deferred e definimos um handler para o método done() da Promise associada a ele. Por fim, resolvemos a operação usando o método resolve(), método este que “altera” o estado da Promise (para fulfilled) e fez o nosso handler (a função anônima que definimos no primeiro argumento do método done()) ser executado.

No caso de o objeto Deferred explicitamente rejeitar uma operação alterando o estado da Promise para failed, podemos usar o segundo argumento do método done() para definir um handler a ser executado em caso de rejeição:

O método done() aceita como argumento handlers tanto para o estado fulfilled quanto para o failed:

public function done(callable $onFulfilled = null, callable $onRejected = null)

No primeiro argumento informamos o handler que será executado quando o estado da Promise mudar para fulfilled, no segundo argumento, um handler para ser executado quando o estado da Promise for alterado para failed.

Por exemplo, vamos informar um handler pro segundo argumento da Promise e então vamos executar o método reject() do Deferred:

<?php

require './vendor/autoload.php';

use React\Promise\Deferred;

$deferred = new Deferred();
$promise = $deferred->promise();

$promise->done(function ($data) {
    echo 'Resultado: ' . $data;
}, function($reason) {
    echo 'Motivo da falha: ' . $reason;
});

$deferred->reject('Erro interno'); // Motivo da falha: Erro interno

Portanto, temos a seguinte relação:

Promises podem ser encadeadas, ou seja, o valor de uma promise resolvida pode ser encaminhado para a próxima da sequência. Isso pode ser atingido usando os seguintes métodos: then() e otherwise(). O método then() é parecido com o done(), com a diferença que ele retorna uma nova promise, enquanto o done() sempre retorna null. E o método otherwise() é uma forma de definir um handler para quando o estado da Promise for failed (e ele retorna uma nova Promise).

Poderíamos, então, trocar o done() por then() no exemplo anterior:

<?php

require './vendor/autoload.php';

use React\Promise\Deferred;

$deferred = new Deferred();
$promise = $deferred->promise();

$promise->then(function ($data) {
    echo 'Resultado: ' . $data;
}, function($reason) {
    echo 'Motivo da falha: ' . $reason;
});

$deferred->reject('Erro interno'); // Motivo da falha: Erro interno

Ou, uma vez que then() retorna uma nova Promise, poderíamos usar o otherwise() de forma encadeada:

<?php

require './vendor/autoload.php';

use React\Promise\Deferred;

$deferred = new Deferred();
$promise = $deferred->promise();

$promise
    ->then(function ($data) {
        echo 'Resultado: ' . $data;
    })
    ->otherwise(function($reason) {
        echo 'Motivo da falha: ' . $reason;
    });

$deferred->reject('Erro interno'); // Motivo da falha: Erro interno

Outro exemplo com o encadeamento de Promises:

<?php

require './vendor/autoload.php';

use React\Promise\Deferred;

$deferred = new Deferred();

$deferred->promise()
    ->then(function ($data) {
        return "Olá, {$data}";
    })->then(function($data) {
        return "{$data}Web";
    })->then(function($data) {
        return strtoupper($data);
    })->then(function($data) {
        echo "{$data}!";
    });

$deferred->resolve('Treina'); // OLÁ, TREINAWEB!

Na programação assíncrona faz muito sentido uma Promise retornar outra Promise, pois não temos o resultado da operação de imediato (quando envolve I/O), o que temos é uma “promessa” de que em algum momento ele poderá vir a estar disponível. Nesse sentido, then() é um mecanismo para aplicar uma transformação em uma Promise e gerar uma nova Promise a partir dessa transformação.

PHP Intermediário
Curso de PHP Intermediário
CONHEÇA O CURSO

Outro exemplo de encadeamento de then() com otherwise():

<?php

require './vendor/autoload.php';

use React\Promise\Deferred;

$deferred = new Deferred();

$deferred->promise()
    ->then(function ($data) {
        return "Olá, {$data}";
    })->then(function($data) {
        throw new InvalidArgumentException("{$data}Web");
    })->otherwise(function(InvalidArgumentException $exception) {
        return strtoupper($exception->getMessage());
    })->done(function($data) {
        echo "{$data}!";
    });

$deferred->resolve('Treina'); // OLÁ, TREINAWEB!

Nesse exemplo o nosso handler em otherwise() só vai ser invocado se ele receber uma exceção do tipo InvalidArgumentException (e fizemos ele receber).

Outro exemplo:

<?php

require './vendor/autoload.php';

use React\Promise\Deferred;

$deferred = new Deferred();

$deferred->promise()
    ->then(function ($data) {
        return "Olá, {$data}";
    })->then(function($data) {
        throw new RuntimeException("{$data}Web");
    })->otherwise(function(InvalidArgumentException $exception) {
        return strtoupper($exception->getMessage());
    })->otherwise(function(Exception $exception) {
        return strtolower($exception->getMessage());
    })->done(function($data) {
        echo "{$data}!";
    });

$deferred->resolve('Treina'); // olá, treinaweb!

Nesse exemplo, como a promise lança uma exceção do tipo RuntimeException, apenas o handler que está esperando por uma exceção genérica (do tipo Exception) será invocado:

...
})->otherwise(function(Exception $exception) {
  return strtolower($exception->getMessage());
})
...

E, claro, se nenhuma promise passar pelo estado failed os nossos handlers que lidam com as falhas não serão invocados:

<?php

require './vendor/autoload.php';

use React\Promise\Deferred;

$deferred = new Deferred();

$deferred->promise()
    ->then(function ($data) {
        return "Olá, {$data}";
    })->then(function($data) {
        // throw new RuntimeException("{$data}Web");
        return "{$data}Web";
    })->otherwise(function(InvalidArgumentException $exception) {
        return strtoupper($exception->getMessage());
    })->otherwise(function(Exception $exception) {
        return strtolower($exception->getMessage());
    })->done(function($data) {
        echo "{$data}!";
    });

$deferred->resolve('Treina'); // Olá, TreinaWeb!

Agora que já entendemos o funcionamento do objeto Deferred e como as Promises funcionam, podemos verificar como seria resolver (ou rejeitar) uma Promise sem um objeto Deferred:

<?php

require './vendor/autoload.php';

use React\Promise\Promise;

$promise = new Promise(function(Closure $resolve, Closure $reject) {
    if (\random_int(1, 1000000) % 2 === 0) {
        $resolve('Gerou um número par.');
    } else {
        $reject('Gerou um número ímpar.');
    }
});

$promise->then(function($data) {
    echo 'Sucesso: ' . $data;
})->otherwise(function($reason) {
    echo 'Falha: ' . $reason;
});

Nesse exemplo instanciamos um objeto Promise passando para ele uma função anônima que vai cuidar da computação que precisamos realizar. Apenas para fins didáticos, estamos gerando um número inteiro aleatório e então verificamos se ele é par, se verdadeiro, resolvemos a Promise, caso contrário, a rejeitamos. Nisso que passamos a função anônima no construtor da classe Promise, ela é invocada e recebe duas novas funções anônimas como argumento: $resolve e $reject. São essas funções que invocamos no término da nossa operação para decidir o estado da Promise, se vai ser fulfilled ou failed. Por fim, apenas definimos handlers que serão executados em caso de sucesso ou falha e, para isso, usamos os métodos then() e otherwise().

Observe que a diferença dessa abordagem (de instanciar um objeto Promise diretamente) para a que usa um objeto Deferred, é que nessa última invertemos o controle de quem resolve a Promise, ou seja, essa responsabilidade fica com o objeto Deferred.

Agora que já vimos o essencial do funcionamento das promises, podemos fazer um paralelo de como seria um código que usa callbacks versus um que usa promises (usando JavaScript como linguagem de referência):

request('http://www.treinaweb.com.br', function (error, response) {
    if (error) {
        // Aqui a gente lida com o erro.
    } else {
        request('http://www.treinaweb.com.br/' + response.path, function (error, response) {
            if (serror) {
                // Aqui a gente lida com o erro.
            } else {
                // Aqui lidaríamos com o sucesso da requisição.
            }
        });
    }
});

Nesse modelo estamos sempre aninhando callbacks uns dentro dos outros, o que nos torna suscetíveis ao mal do callback hell. O mesmo exemplo acima usando a biblioteca axios que trabalha com Promises ficaria assim:

axios.get('http://www.treinaweb.com.br')
    .then(function (response) {
        return axios.get('http://www.treinaweb.com.br/' + response.path);
    })
    .then(function (response) {
        // Lida com a resposta da requisição anterior
    })
    .catch(function (error) {
        // Lida com alguma exceção, se existir.
    });

No artigo de introdução ao ReactPHP mostramos um exemplo que usava a library reactphp-buzz. Veremos outro exemplo com ela, pois ela faz uso intensivo de promises. Primeiro instale-a como dependência do projeto:

$ composer require clue/buzz-react:^2.6

O nosso exemplo vai de forma assíncrona imprimir o bairro de alguns CEPs usando a API pública do Postmon:

<?php

require './vendor/autoload.php';

use Clue\React\Buzz\Browser;
use React\EventLoop\Factory;
use Psr\Http\Message\ResponseInterface;

$browser = new Browser(
    $loop = Factory::create()
);

$ceps = [
    '01311200', // Bela Vista
    '70630904', // Setor Militar Urbano
    '70165900', // Zona Cívico-Administrativa
    '32685888', // Erro, cep não existe.
];

foreach ($ceps as $cep) {
    $browser->get("https://api.postmon.com.br/v1/cep/{$cep}")
        ->then(function (ResponseInterface $response) {
            $endereco = \json_decode($response->getBody());

            echo $endereco->bairro . PHP_EOL;
        })
        ->otherwise(function (\Exception $exception) use ($cep) {
            echo 'Erro no CEP: ' . $cep . PHP_EOL;
        });
}

$loop->run();

Esse exemplo demonstra que o método get() que realiza uma requisição HTTP retorna uma Promise. Se você executar esse exemplo várias vezes, verá que não terá uma ordem definida para a impressão dos resultados, ademais, um resultado só é impresso quando ele fica pronto, não seguindo uma ordem linear (que é o que estamos acostumados na programação síncrona).

PHP Avançado
Curso de PHP Avançado
CONHEÇA O CURSO

Para finalizar, é importante pontuar que Promises por si só não fazem nossos códigos serem assíncronos, elas são apenas mecanismos para encapsular os resultados.

Até a próxima!

Introdução à programação assíncrona em PHP usando o ReactPHP

Antes de entrarmos no comparativo do modelo síncrono versus assíncrono, veremos uma introdução, o essencial, sobre como uma requisição funciona em uma aplicação PHP tradicional.

A maior parte das aplicações escritas em PHP funcionam no clássico modelo de requisição e resposta de curto tempo de vida. Uma requisição é feita, o código é interpretado e depois compilado, a execução é realizada, dados são retornados e tudo é descarregado da memória na sequência, tudo acontece de forma isolada, sem compartilhar contexto. De forma simplificada, esse é o ciclo de vida da execução de um script no PHP a cada nova requisição feita.

Você pode estar imaginando que isso é muito custoso, ter sempre que passar pela interpretação e compilação a cada nova requisição. Você está certo. Mas o PHP implementa mecanismos que otimizam esse processo, para não ter que interpretar e compilar o código o tempo todo. O PHP interpreta os códigos e os compila (de forma implícita, ou seja, quando ele julga necessário) para bytecodes (uma versão intermediária de código) e coloca isso em memória compartilhada quando ele percebe que aquela parte é muito requisitada/utilizada (tarefa da extensão nativa OPCache). Além disso, o PHP implementa outros mecanismos (de mais baixo nível) de otimização da execução desse código intermediário.

De qualquer forma, mesmo com os mecanismos de otimização, a essência do modelo de requisição e resposta se mantém a mesma. O diagrama abaixo exemplifica como funciona esse ciclo de execução:

Esse diagrama encurtou propositalmente uma etapa, a que passa pelo PHP-FPM, um gerenciador de processos, muito usado junto ao Nginx (servidor web), pois poderíamos ter um artigo só para falar sobre ele. A ideia aqui é entender o básico de como a requisição passa pelo servidor web e depois é retornada para o cliente. O PHP-FPM dispõe de pools de processos, ele cria, controla e encerra, de acordo com a demanda (o que o Nginx está encaminhando pra ele) e capacidade do hardware para tal (memória, principalmente).

Modelo síncrono

Num ambiente síncrono (tradicional) as instruções (partes) do programa são executadas uma por uma e apenas uma por vez, de forma sequencial:

<?php

echo "Hello ";

sleep(4); // Espera 4 segundos

echo "World";

Esse script vai demorar 4 segundos para ser executado e finalizado. A execução acontece linha por linha, de forma bloqueante. Se uma instrução precisa aguardar algum tempo (seja para ler algo do disco ou fazer alguma operação na rede), isso terá de ser concluído para que a próxima instrução seja executada e até mesmo para que ela use os dados previamente recuperados/preparados. Ou seja, parte-se da premissa de que a instrução anterior precisa ter sido concluída com sucesso (sem erros) para que uma nova seja executada (dependência).

Esse modelo funciona muito bem pra operações que usam mais CPU que I/O, pois a resolução de uma operação na CPU é muito mais eficiente do que uma operação que envolva I/O (uma requisição na rede, a leitura de um arquivo, esse tipo de operação é de alta latência).

Só que pense no seguinte problema: você decidiu implementar uma tarefa que precisa verificar se os links de um site estão todos online. No modelo síncrono, teríamos que partir da primeira requisição, aguardar o resultado dela (momento de ociosidade do programa) e então partir para próxima seguindo o mesmo fluxo até a última (sempre de forma sequencial e uma só iniciando após a finalização da outra).

Mas, não seria melhor ao invés de esperarmos a primeira requisição ser finalizada já inicializarmos as outras requisições e depois de um tempo voltar para pegar os resultados produzidos por elas? Pois bem, essa é a ideia central do modelo assíncrono, ele minimiza a ociosidade do programa alternando entre as tarefas. Num código assíncrono as tarefas são intercaladas sem precisar envolver novas threads, ou seja, de forma single-thread (como funciona o PHP e NodeJS, por exemplo).

Modelo assíncrono

Um código assíncrono lida com dependências e ordem de execução de eventos, ou seja, lida basicamente com tempo. É comum associar assincronismo com paralelismo, pois o assincronismo dá essa sensação que muita coisa está sendo executada no mesmo instante de tempo, no entanto, ao invés disso, no assincronismo muita coisa é feita ao mesmo tempo (concorrentemente) só que uma coisa por vez, nunca no mesmo instante de tempo (o fluxo de execução alterna entre as tarefas). Não existe paralelismo num código assíncrono, ou seja, um código assíncrono não tem suas tarefas distribuídas em múltiplas unidades de processamento, igual comentamos anteriormente, é single-thread (apesar de ser possível atingir paralelismo com assincronismo num ambiente multi-thread, mas foge do escopo do nosso artigo e normalmente necessita de algum caso de uso bem específico, devido às dificuldades técnicas de se implementar e sincronizar a comunicação).

Um código assíncrono continua executando uma tarefa por vez, ele apenas não fica preso em ociosidade enquanto uma tarefa ainda está aguardando algum resultado de I/O, por exemplo. Ao invés de ficar “bloqueado” aguardando, ele alterna de tarefa, inicia outros trabalhos e volta nas outras tarefas em um tempo futuro quando elas estiverem prontas. Fazendo uma analogia, vamos supor que você tem uma tarefa que precisa fazer duas requisições na internet, o seu código assíncrono vai lidar dessa forma:

“Faça essa primeira requisição, mas não vou ficar aqui esperando o resultado, me avise quando tudo estiver pronto. Enquanto isso, deixa eu executar a segunda requisição aqui.”

Enquanto no código síncrono seria:

“Faça essa primeira requisição. Eu terei que ficar esperando essa resposta, pois necessito dela para continuar o meu fluxo de trabalho. [ … algum tempo depois …] Obrigado pela resposta, agora, por gentileza, execute essa segunda requisição? Ficarei aqui aguardando o resultado dela. [… algum tempo depois …] Obrigado pela resposta. Agora posso concluir meu trabalho.”

Se você desenvolve um código síncrono para resolver uma operação matemática e porta esse código para um modelo assíncrono, você vai notar que ambos serão executados praticamente no mesmo tempo, sem nenhum levar vantagem sobre o outro. Agora, a história muda completamente se o seu problema precisa realizar alguma operação I/O (que naturalmente é bloqueante) ou quando ele precisa aguardar algum tempo por alguma coisa, nesse tipo de caso, o modelo assíncrono leva muita vantagem, como mostra esse diagrama:

Veja que nesse diagrama as tarefas alternam entre si, o modelo assíncrono tenta sempre evitar ociosidade/espera/bloqueio. Ele só fica bloqueado/aguardando quando nenhuma tarefa pode fazer nenhum progresso, aí ele precisa receber alguma chamada para voltar à sua operação.

A abordagem assíncrona não é a solução para todos os problemas, mas em comparação com o modelo síncrono, ela performa melhor principalmente nos seguintes cenários:

  • Quando o programa contém tarefas que fazem uso intensivo de I/O;
  • Quando o programa contém tarefas independentes, ou seja, quando umas não precisam esperar pelas outras para realizar seus trabalhos (e essas passam por algum estado de progresso em suas atividades).

Quando não faz tanto sentido:

  • Uma aplicação que faz um uso intensivo da CPU em que as operações são dependentes, ou seja, uma precisa ser finalizada para que a outra entre em cena;
  • Uma aplicação que realiza grandes operações de I/O mas que o uso da aplicação em si é infrequente e não há necessidade de escalar;

Levando para exemplos do “mundo real” você vai ver com frequência o uso de programação assíncrona para:

  • Uma API em que o usuário faz uma requisição e precisa de uma resposta rápida sem que precise esperar alguma operação ser finalizada (essa operação pode continuar rodando lá no servidor enquanto o usuário já obteve a resposta dele). Nesse sentido, a interface do usuário não fica congelada esperando uma resposta de uma operação que ele não precisa esperar por ela.
  • Data Streaming (dá pra construir, por exemplo, até um servidor de streaming de vídeo);
  • Aplicação de monitoramento;
  • Criação de chats;
  • Etc;

Apesar de termos dado exemplos clássicos aqui, é perfeitamente possível integrar código assíncrono numa aplicação tradicional (de abordagem síncrona) se você perceber que em determinado momento requisições externas precisam ser feitas ou alguma operação importante que envolva I/O, você pode estudar a possibilidade de implementar um código assíncrono nessa parte para obter o benefício da não ociosidade e melhorar o tempo de resposta do seu usuário. Não existem “regras estritas” aqui, você vai precisar avaliar caso a caso e decidir o que achar melhor. Mas, certamente, é mais comum ver scripts que rodam em linha de comando (CLI) utilizarem a abordagem assíncrona.

PHP Assíncrono

O PHP não dispõe (mas há a intenção de se implementar isso em algum momento da versão 8 do PHP) de mecanismos nativos para lidar com código assíncrono, diferente do JavaScript e C#, por exemplo. Por isso bibliotecas como Amp e ReactPHP se tornaram relevantes, pois elas abstraem isso. Nesse artigo introdutório, usaremos os componentes do ReactPHP.

O ReactPHP é baseado no padrão Reactor (o mesmo usado pelo NodeJS) que é uma implementação de uma arquitetura orientada a eventos (event-driven). A ideia é permitir que iniciemos múltiplas operações I/O sem que precisemos esperar pela finalização delas (não bloqueante). Ao invés disso, somos notificados quando algo importante acontecer e reagimos a esse evento com um callback (se você programa em JavaScript certamente já está familiarizado com isso).

O ReactPHP possui uma série de componentes independentes e o principal deles, que é o seu core é o EventLoop, ele é a base para o funcionamento de todos os outros componentes que o ReactPHP disponibiliza. O componente EventLoop é uma implementação padrão Reactor.

O Event Loop é basicamente um while infinito que faz o papel de ser o Scheduler das operações. Ele sequencialmente processa a fila de eventos e cuida da execução dos callbacks. Ele é o único código sendo executado sincronamente, nenhum outro código é executado em paralelo. E, como já dissemos anteriormente, ele roda em uma única thread. O fato do seu processador ter 16 núcleos ou 1, em nada vai interferir, a execução do Event Loop continuará sendo single-thread. A ideia por trás do ReactPHP é fazer um bom uso do tempo da CPU (sem cair na ociosidade com as operações de I/O) e não exatamente em paralelizar processos (o que demandaria diversos outros problemas de comunicação, troca de estados, trocas de contexto por parte do sistema operacional além, claro, de recursos de hardware).

O funcionamento é mais ou menos assim:

  • Você registra um evento;
  • Você passa a “ouví-lo” (listening);
  • Quando esse evento é disparado, você reage a ele via um handler e executa algum código.

Diagrama simplificado de funcionamento de um Event Loop:

ReactPHP

O ReactPHP possui quatro implementações possíveis de Event Loop e ele por padrão escolhe qual usar a partir da análise das extensões instaladas no seu ambiente PHP.

As implementações são:

  • StreamSelectLoop – Essa implementação funciona nativamente no PHP sem precisar de nenhuma extensão específica, ela executa chamadas de sistema select que resolvem a implementação do event loop, mesmo que não na performance das opções que serão mostradas abaixo.
  • LibEventLoop – Essa opção usa a extensão libevent do repositório pecl.
  • LibEvLoop – Usa a extensão libev. Funciona de forma similar à libevent citada acima.
  • ExtEventLoop – Usa a extensão event. Funciona de forma similar à libevent citada anteriormente. Essa é a minha extensão de escolha, por ser a mais atualizada e um dos desenvolvedores dela também trabalha no core do PHP. Você pode ver mais detalhes sobre ela clicando aqui.

Mas, calma lá! Não é escopo nosso, por enquanto, se preocupar com tudo isso. Se você vai desenvolver uma aplicação para produção que vai usar ReactPHP, ótimo, eu lhe recomendaria muitíssimamente instalar a extensão event. Mas, para o nosso objetivo didático, vamos deixar que o próprio ReactPHP escolha a melhor implementação pra gente, baseando-se no que temos instalado em nosso ambiente. Se não tivermos nenhuma das três últimas extensões, ele vai usar a primeira implementação, a **StreamSelectLoop **(standalone).

Ele possui uma factory que decide qual das implementações acima será usada:

$loop = React\EventLoop\Factory::create();

As soluções escritas usando o ReactPHP não dependem de nada específico de nenhuma das implementações acima, ou seja, não importa qual a implementação será usada, o ReactPHP se comportará da mesma maneira, as interfaces são as mesmas para todas. Isso nos dá a liberdade de não nos preocuparmos em instalar uma extensão para desenvolvermos alguns testes.

Timers

Timer são úteis para executar um determinado código em um momento futuro. Funcionam da mesma forma que setTimeout() and setInterval() do JavaScript.

Por exemplo, o Event Loop dispõe do método addPeriodicTimer() que faz com que o callback informado seja executado repetidamente a cada determinado intervalo de tempo.

Vamos criar o nosso primeiro exemplo? Tudo o que você precisa fazer é criar um diretório no local onde normalmente você escreve suas aplicações. Dentro dessa pasta, crie um arquivo composer.json com o seguinte conteúdo:

{
    "require": {
        "react/event-loop": "^1.1"
    }
}

Pelo terminal, acesse esse diretório e execute:

$ composer install

Por fim, no mesmo diretório, crie um arquivo index.php com o seguinte conteúdo:

<?php

require './vendor/autoload.php';

$loop = React\EventLoop\Factory::create();

$loop->addPeriodicTimer(1, static function () {
    static $count;

    if (null === $count) {
        $count = 0;
    }

    echo $count++ . PHP_EOL;
});

$loop->run();

// Output:

// 0
// 1
// 2
// 3
// 4
// 5
// ...

Para executar o exemplo:

$ php index.php

E você verá o resultado no seu terminal. A cada um segundo o callback (a função anônima que definimos no segundo argumento de addPeriodicTimer() é executada).

Outro método é o addTimer():

<?php

require './vendor/autoload.php';

$loop = React\EventLoop\Factory::create();

$loop->addTimer(2, static function () {
    echo 'World';
});

echo 'Hello ';
$loop->run();

// Output: Hello World

Nesse caso, o callback será executado uma única vez, num tempo futuro (em dois segundos).

Esse exemplo é, de certa forma, parecido com esse, escrito em JavasCript:

setTimeout(function () { 
  console.log('World');
}, 2);

console.log('Hello ');

Ambos os exemplos mostram que não estão seguindo o fluxo síncrono de execução, ademais, uma parte do código foi programada para ser executada em outro momento e isso não bloqueou a linha de execução.

Streams

A documentação do PHP define Streams como uma forma de generalizar arquivo, rede e outras operações que compartilham um conjunto comum de funções e usos. Em outras palavras, streams representam coleções de dados que podem não estar completamente disponíveis de imediato e também não possuem a limitação de ter que caber na memória, isso faz com que streams sejam uma ferramenta poderosa para lidar com grandes quantidades de dados que podem ser obtidas por partes (chunks). Grande parte do que é feito no PHP é sobre streams. Ao ler um arquivo, lidamos com streams. Ao retornar um output para o cliente, lidamos com streams. Ao obter dados de uma conexão TCP/IP, também estamos lidando com streams.

Temos três tipos de Streams:

  • Readable – Esse tipo permite ler (apenas leitura) os dados de uma fonte;
  • Writable – Esse tipo permite escrever (apenas escrita) dados em uma fonte;
  • Duplex (Readable e Writable ao mesmo tempo) – Esse tipo permite ler e/ou escrever (ambos) dados, como é o caso do protocolo TCP/IP (full-duplex).

Por exemplo, vamos supor que você precise avaliar linha a linha um arquivo de log que possui 1GB, se fizer assim:

$log = file_get_content("error.log")

O PHP tentará carregar o arquivo inteiro na memória (e enquanto não for carregado, nada mais pode ser executado, bloqueante por natureza), o que fatalmente acarretará em um erro e a execução do script será interrompida.

Usando a interface Readable Resource Stream do ReactPHP atingimos esse objetivo de forma não bloqueante, performática e com o mínimo uso de memória.

Vamos testar isso na prática? Criaremos um arquivo (na raiz do projeto) com os 10 milhões de números, um por linha, se você usa um sistema baseado em Unix, consegue atingir esse objetivo executando:

$ awk 'BEGIN { n = 1; while (n < 10000000) print (n++) }' > numeros.txt

No arquivo index.php, execute:

$content = file_get_contents('numeros.txt');

echo 'Memória utilizada: ' . (memory_get_peak_usage(true)/1024/1024);

// Memória utilizada: 77.24

O PHP tentará alocar cerca de ~78MB na memória. Se você tentar limitar o consumo de memória pelo script, terá um estouro:

<?php

ini_set('memory_limit', '12M');

$content = file_get_contents('numeros.txt');

// PHP Fatal error: Allowed memory size of 12582912 bytes exhausted (tried to allocate 78897112 bytes)

Agora vamos usar a classe ReadableResourceStrea do ReactPHP:

<?php

require './vendor/autoload.php';

use React\Stream\ReadableResourceStream;

$loop = React\EventLoop\Factory::create();

$stream = new ReadableResourceStream(
    fopen('numeros.txt', 'rb'), $loop
);

$stream->on('data', function ($chunk) {
    // echo "$chunk\n";
});

$stream->on('end', function () {
    echo 'Memória utilizada: ' . (memory_get_peak_usage(true)/1024/1024);
});

$loop->run();

// Memória utilizada: 2

O pico de consumo de memória ficou em 2MB (assim que a informação fica disponível no buffer, já a utilizamos, liberando-o). Poderíamos processar aí um arquivo bem maior, de dezenas ou centenas de gigabytes.

Veja que nesse exemplo implementamos dois eventos: data e end. No data recebemos os chunks (partes) do arquivo que está sendo lido. Em end executamos um callback quando o processo é finalizado.

Nesse exemplo usamos fopen(), função nativa do PHP (que trabalha com streams), mas em uma aplicação verdadeiramente assíncrona, ao invés disso, devemos usar o componente Filesystem do ReacPHP pois, se tiver uma disputa na leitura do arquivo, a aplicação pode ficar congelada (ler qualquer coisa do sistema de arquivos é bloqueante por natureza). Com esse componente, teríamos algo como:

<?php

require './vendor/autoload.php';

use React\Filesystem\Filesystem;

$loop = React\EventLoop\Factory::create();
$filesystem = Filesystem::create($loop);

$filesystem->file('numeros.txt')->open('rb')->then(function($stream) {
    $stream->on('data', function ($chunk) {
        // echo "$chunk\n";
    });

    $stream->on('end', function () {
        //
    });
});

$loop->run();

Ah, para rodar esse exemplo é necessário que você instale o componente no seu projeto:

$ composer require react/filesystem

Se você já usou promises no JavaScript deve ter notado o método then() ali em cima. O conceito é o mesmo. O método open() retorna uma promise e no método then() executamos um callback quando ela (a “promessa”) é cumprida.

Lembra do exemplo que citamos lá no começo do artigo sobre uma tarefa que verifica se os links de um site estão online? Pois bem, ela poderia ser implementada usando a library reactphp-buzz, pois ela abstrai todo o essencial para se fazer requisições HTTP assíncronas.

Um protótipo de como isso poderia ser implementado de forma síncrona:

<?php

$time_start = microtime(true);

function urlsFromHtml(string $html) : array
{
    $dom = new DOMDocument();

    libxml_use_internal_errors(true);
    $dom->loadHTML($html);
    libxml_use_internal_errors(false);

    $urls = [];

    foreach ($dom->getElementsByTagName('a') as $node) {
        $urls[] = $node->getAttribute('href');
    }

    return $urls;
}

function getUrlStatusCode(string $url) : int
{
    $curl = curl_init();

    curl_setopt($curl, CURLOPT_RETURNTRANSFER, true);
    curl_setopt($curl, CURLOPT_CUSTOMREQUEST, 'HEAD');
    curl_setopt($curl, CURLOPT_HEADER, 1);
    curl_setopt($curl, CURLOPT_NOBODY, true);
    curl_setopt($curl, CURLOPT_URL, $url);

    curl_exec($curl);
    $code = curl_getinfo($curl, CURLINFO_HTTP_CODE);
    curl_close($curl);

    return $code;
}

function getUrlContent(string $url)
{
    $curl = curl_init($url);

    curl_setopt($curl, CURLOPT_RETURNTRANSFER, true);
    $html = curl_exec($curl);
    curl_close($curl);

    return $html;
}

$urls = urlsFromHtml(
    getUrlContent('https://www.globo.com')
);

foreach ($urls as $url) {
    $status = getUrlStatusCode($url) === 200 ? ' [online]' : ' [offline]';

    echo "{$url} -> {$status} \n";
}

echo 'Tempo total de execução: ' . round(microtime(true) - $time_start);

Dessa forma demora cerca de 150 segundos para “pingar” todas as URLS extraídas. Agora, a mesma implementação usando ReactPHP e e a library reactphp-buzz:

Primeiro instale a dependência dela no projeto:

$ composer require clue/buzz-react:^2.6
<?php

require './vendor/autoload.php';

use Psr\Http\Message\ResponseInterface;

$loop = React\EventLoop\Factory::create();
$browser = new Clue\React\Buzz\Browser($loop);

function urlsFromHtml(string $html) : array
{
    $dom = new DOMDocument();

    libxml_use_internal_errors(true);
    $dom->loadHTML($html);
    libxml_use_internal_errors(false);

    $urls = [];

    foreach ($dom->getElementsByTagName('a') as $node) {
        $urls[] = $node->getAttribute('href');
    }

    return $urls;
}

$browser->get('https://www.globo.com')->then(function (ResponseInterface $response) use ($loop, $browser) {
    $urls = urlsFromHtml($response->getBody());
    foreach ($urls as $url) {
        $browser->head($url)->then(function (ResponseInterface $response) use ($url) {
            $status = $response->getStatusCode() === 200 ? ' [online]' : ' [offline]';

            echo "{$url} -> {$status} \n";
        });
    }
});

$time_start = microtime(true);

$loop->run();

echo 'Tempo total de execução: ' . round(microtime(true) - $time_start);

Já de forma assíncrona custou apenas 18 segundos. Lembrando que esse é apenas um exemplo para comparar a diferença entre os dois modelos.

Recomendação de leitura: Promises no ReactPHP

O outro artigo da série foi publicado, ele trata o uso de Promises com o ReactPHP. Você pode acessá-lo clicando aqui.

Concluindo

Esse foi um artigo introdutório sobre programação assíncrona e ReactPHP. Deu pra notar como é poderoso manipular streams, muitas possibilidades são abertas. Tem muito mais o que podemos explorar como Ticks, Promises, trabalhar com sistema de arquivos, trabalhar com websockets, usar funcionalidades do sistema operacional através de processos filhos, entre muitas outras coisas. Existem diversos projetos opensource desenvolvidos em cima do ReactPHP para atingir objetivos diversos, conforme você pode ver no site oficial.

© 2004 - 2019 TreinaWeb Tecnologia LTDA - CNPJ: 06.156.637/0001-58 Av. Paulista, 1765, Conj 71 e 72 - Bela Vista - São Paulo - SP - 01311-200