Configurar o buffer de dados e a persistência de disco para fluxos de dados

Quando um fluxo de dados envia mensagens para um ponto de extremidade de destino, como Hubs de Eventos do Azure, Microsoft Fabric Real-Time Intelligence, Kafka ou outro serviço de nuvem, o destino ou a rede pode ficar indisponível. Operações do Azure IoT usa o modelo de assinatura do agente MQTT local para armazenar mensagens em buffer e tentar novamente a entrega.

Este artigo explica como os fluxos de dados protegem mensagens durante interrupções de destino e como configurar o buffer do agente e a persistência de disco para uma proteção mais forte.

Como os dados armazenam mensagens em buffer durante interrupções no destino

Quando você utiliza o corretor MQTT local como um ponto de origem em um fluxo de dados, seja direta ou indiretamente através de ativos que publicam nele, o fluxo de dados recebe mensagens do corretor MQTT como assinante. O fluxo de dados reconhece cada mensagem de origem depois que a mensagem é processada e entregue com êxito no destino ou depois que a mensagem é descartada intencionalmente devido à filtragem, validação de esquema ou expiração da mensagem.

Se o ponto de extremidade de destino não estiver disponível, a entrega não poderá ser concluída. Nesse caso, o fluxo de dados não reconhece a mensagem de origem. O broker MQTT mantém a mensagem na fila do assinante e o fluxo de dados tenta a entrega novamente. Quando a conectividade é restaurada, o fluxo de dados envia mensagens enfileiradas para o destino e as reconhece após a entrega bem-sucedida.

O caminho da mensagem é: publicador ou ativo para agente MQTT, agente MQTT para fluxo de dados e fluxo de dados para o ponto de extremidade de destino.

  1. O agente MQTT entrega uma mensagem para o fluxo de dados.
  2. O fluxo de dados envia a mensagem para o destino final.
  3. Se o envio for bem-sucedido, o fluxo de dados reconhecerá a mensagem de origem e o agente a removerá da fila do assinante.
  4. Se o envio falhar, o fluxo de dados não reconhecerá a mensagem de origem. O corretor o mantém na fila.
  5. O fluxo de dados tenta a entrega novamente até que ela seja bem-sucedida, a mensagem expire ou um limite configurado se aplique.

Importante

O buffer de fluxo de dados está limitado. As mensagens enfileiradas estão sujeitas ao perfil de memória do agente MQTT, aos limites de fila do assinante, ao tamanho do buffer de mensagens com suporte em disco, à configuração de persistência e à expiração da mensagem ou da sessão. Defina essas configurações para a duração máxima da interrupção e a taxa de transferência que você precisa tolerar.

Camadas de configuração de proteção de dados

Use as camadas de configuração a seguir para controlar como as mensagens são armazenadas em buffer e protegidas durante interrupções de destino.

Camada Contra o que ele protege Comportamento padrão Ação do cliente
Repetição de fluxo de dados e confirmação retida Destino temporário ou interrupção de rede Integrado para o agente MQTT local e fontes suportadas por ativos Não é necessária nenhuma configuração
Fila de assinantes do agente MQTT Mensagens recebidas por uma assinatura de fluxo de dados, mas ainda não confirmadas Armazenado na memória Configurar perfil de memória e limites de fila de assinante
Buffer de mensagens com backup de disco Grandes pendências temporárias que excedem a memória disponível Disabled Configurar o intermediário na implantação com um buffer de mensagens respaldado por disco
Persistência do broker MQTT Reinicialização do broker ou do pod enquanto as mensagens estão na fila Desabilitadas por padrão Habilitar persistência do agente e persistência de fila de assinantes
Fluxo de dados requestDiskPersistence Solicitação por cada fluxo de dados para armazenamento persistente de fila de subscritor Disabled Habilitar requestDiskPersistence no fluxo de dados ou no grafo de fluxo de dados e habilitar a persistência de filas dinâmicas de assinantes no agente
Expiração de mensagem e sessão Comportamento de armazenamento e reprodução limitados Configurável Definir expiração e limites com base na sua tolerância a perdas e na janela de interrupção

A fila de assinantes do agente MQTT local é armazenada na memória por padrão. Você pode configurar o agente MQTT para usar o disco de duas maneiras diferentes:

  • Buffer de mensagens com backup em disco: utiliza o disco como um buffer auxiliar quando as filas crescem além da memória disponível. Essa configuração ajuda com pendências temporárias maiores, mas não é a mesma que a persistência durável entre as reinicializações do agente.
  • Persistência do broker MQTT: Persiste os dados do broker selecionado, incluindo filas de assinantes, para que as mensagens enfileiradas possam sobreviver a reinicializações ou perda de energia.

Para detalhes de configuração do broker, consulte:

Escolher uma configuração de buffer

Escolha uma configuração com base na duração da interrupção e nos requisitos de durabilidade para sua carga de trabalho.

  • Para interrupções de rede ou nuvem transitórias curtas, a fila de assinantes em memória, por padrão, pode ser suficiente.
  • Para maior taxa de transferência ou interrupções temporárias mais longas, configure o buffer de mensagens com backup de disco.
  • Para a reinicialização ou proteção contra perda de energia, habilite a persistência do broker MQTT e a persistência da fila do assinante e habilite requestDiskPersistence no fluxo de dados ou no grafo de fluxo de dados.
  • Para ambientes de armazenamento limitados, configure limites de fila de assinantes, expiração de mensagens e monitoramento para que o agente imponha limites de fila e remova ou rejeite mensagens de acordo com sua política.

Exemplo: Interrupção no destino

Suponha que você crie um fluxo de dados usando o agente MQTT local padrão como o ponto de extremidade de origem e Hubs de Eventos do Azure como o ponto de extremidade de destino. Se a conectividade entre o fluxo de dados e o Hubs de Eventos do Azure for perdida, o fluxo de dados tentará enviar novamente e não reconhecerá as mensagens de origem. O broker MQTT enfileira as mensagens não confirmadas. Com as configurações padrão, a fila é armazenada na memória. Com o buffer de mensagens armazenado em disco, a fila pode ser transferida para o disco. Com a persistência do broker e o fluxo de dados requestDiskPersistence, as mensagens enfileiradas podem sobreviver às reinicializações do broker, sujeitas aos limites de persistência, expiração e armazenamento configurados.

Habilitar a persistência de disco para um fluxo de dados

A persistência de disco permite que fluxos de dados e grafos de fluxo de dados mantenham o estado de processamento entre reinicializações. Quando você habilita esse recurso, o agente MQTT mantém dados, como mensagens na fila do assinante, em disco. Essa abordagem ajuda a garantir que a origem de dados do fluxo de dados não perca dados enfileirados durante interrupções de energia ou reinicializações do broker. O agente mantém um desempenho ideal porque a persistência é configurada por fluxo de dados, portanto, somente os fluxos de dados que precisam dele usam esse recurso.

O fluxo de dados solicita persistência durante a assinatura usando uma propriedade de usuário MQTTv5. Esse recurso só funciona quando:

  • O fluxo de dados usa o corretor ou ativo MQTT como fonte.
  • O broker MQTT tem a persistência habilitada, com o modo de persistência dinâmica definido para o tipo de dados Enabled, como filas de assinante.

Para obter detalhes sobre a configuração de persistência do agente MQTT, consulte Configurar a persistência do agente MQTT.

A configuração aceita Enabled ou Disabled. Disabled é o padrão.

Configurar um fluxo de dados

Ao criar ou editar um fluxo de dados, selecione Edit e, em seguida, selecione Sim ao lado de Solicitar persistência de dados.

Configurar para um grafo de fluxo de dados

Ao criar ou editar um grafo de fluxo de dados, selecione Editar e selecione Sim ao lado de Solicitar persistência de dados.