Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
APLICA-SE A:
Azure Data Factory
Azure Synapse Analytics
Dica
Data Factory no Microsoft Fabric é a próxima geração de Azure Data Factory, com uma arquitetura mais simples, IA interna e novos recursos. Se você não estiver familiarizado com a integração de dados, comece com Fabric Data Factory. As cargas de trabalho existentes do ADF podem ser atualizadas para Fabric para acessar novos recursos em ciência de dados, análise em tempo real e relatórios.
Use a atividade Fluxo de Dados para transformar e mover dados por meio de fluxos de dados de mapeamento. Se você não estiver familiarizado com fluxos de dados, consulte Visão geral do Fluxo de Dados
Criar uma atividade de Fluxo de Dados com a interface do usuário
Para usar uma atividade Fluxo de Dados em um pipeline, conclua as seguintes etapas:
Pesquise por Fluxo de Dados no painel Atividades do pipeline e arraste uma atividade Fluxo de Dados para a área de trabalho do pipeline.
Selecione a nova atividade de Fluxo de Dados na tela se ela ainda não estiver selecionada, e a guia de Configurações para editar seus detalhes.
A chave do ponto de verificação é usada para definir o ponto de verificação quando o fluxo de dados é usado para captura de dados de alterações. Você pode substituí-la. As atividades de fluxo de dados usam um valor de guid como chave do ponto de verificação em vez de "nome do pipeline + nome da atividade", para que sempre possam continuar acompanhando o estado da captura de dados de alterações do cliente, mesmo que haja ações de renomeação. Todas as atividades de fluxo de dados existentes usam a chave de padrão antiga para fins de compatibilidade com versões anteriores. A opção de chave de ponto de verificação depois de publicar uma nova atividade de fluxo de dados com o recurso de fluxo de dados habilitado para captura de dados de alterações é mostrada abaixo.
Selecione um fluxo de dados existente ou crie um novo usando o botão Novo. Selecione outras opções conforme necessário para concluir sua configuração.
Sintaxe
{
"name": "MyDataFlowActivity",
"type": "ExecuteDataFlow",
"typeProperties": {
"dataflow": {
"referenceName": "MyDataFlow",
"type": "DataFlowReference"
},
"compute": {
"coreCount": 8,
"computeType": "General"
},
"traceLevel": "Fine",
"runConcurrently": true,
"continueOnError": true,
"staging": {
"linkedService": {
"referenceName": "MyStagingLinkedService",
"type": "LinkedServiceReference"
},
"folderPath": "my-container/my-folder"
},
"integrationRuntime": {
"referenceName": "MyDataFlowIntegrationRuntime",
"type": "IntegrationRuntimeReference"
}
}
Propriedades de tipo
| Propriedade | Descrição | Valores permitidos | Obrigatório |
|---|---|---|---|
| fluxo de dados | A referência ao Fluxo de Dados que está sendo executado | DataFlowReference | Sim |
| integrationRuntime | O ambiente de computação em que o fluxo de dados é executado. Se não for especificado, o autoresolve Azure integration runtime será usado. | IntegrationRuntimeReference | Não |
| compute.coreCount | O número de núcleos usados no cluster do Spark. Só poderá ser especificado se o runtime de integração do Azure de resolução automática for utilizado | 8, 16, 32, 48, 80, 144, 272 | Não |
| compute.computeType | O tipo de computação utilizada no cluster do Spark. Só poderá ser especificado se o runtime de integração do Azure de resolução automática for utilizado | "Geral" | Não |
| staging.linkedService | Se você estiver usando um coletor ou uma origem do Azure Synapse Analytics, especifique a conta de armazenamento utilizada para o processo de preparo do PolyBase. Se o Armazenamento do Azure estiver configurado com o ponto de extremidade de serviço de VNet, você precisará usar a autenticação de identidade gerenciada com a opção “permitir serviço confiável da Microsoft” habilitada na conta de armazenamento. Confira Impacto de usar pontos de extremidade de serviço de VNet com Armazenamento do Azure. Aprenda também as configurações necessárias para Azure Blob e Azure Data Lake Storage Gen2 respectivamente. |
LinkedServiceReference | Somente se o fluxo de dados lê ou grava em um Azure Synapse Analytics |
| staging.folderPath | Se você estiver usando um coletor ou uma origem do Azure Synapse Analytics, o caminho da pasta na conta de armazenamento de blobs utilizada para o processo de preparo do PolyBase | String | Somente se o fluxo de dados ler ou gravar no Azure Synapse Analytics |
| traceLevel | Defina o nível de registro em log da execução da atividade de fluxo de dados | Fino, grosso, nenhum | Não |
Dimensionar dinamicamente a computação de fluxo de dados em runtime
As propriedades Core Count e Compute Type podem ser definidas dinamicamente para ajustar o tamanho de seus dados de origem de entrada em runtime. Use atividades de pipeline como Lookup ou Get Metadata para localizar o tamanho dos dados do conjunto de dados de origem. Em seguida, use Adicionar Conteúdo Dinâmico nas propriedades da atividade Fluxo de Dados. Você pode escolher tamanhos de computação pequenos, médios ou grandes. Opcionalmente, escolha "Personalizado" e configure os tipos de computação e o número de núcleos manualmente.
Este é um breve tutorial em vídeo explicando essa técnica
Tempo de execução de integração do fluxo de dados
Escolha qual Integration Runtime usar para a execução da atividade Fluxo de Dados. Por padrão, o serviço usa o runtime de integração de Azure autoresolve com quatro núcleos de trabalho. Esse IR tem um tipo de computação de uso geral e é executado na mesma região que a sua instância de serviço. Para pipelines operacionalizados, é altamente recomendável criar seu próprio Azure Integration Runtime que define regiões específicas, tipo de computação, número de núcleos e TTL para a execução da atividade de fluxo de dados.
Um tipo de computação mínimo de Uso Geral com uma configuração de 8 + 8 (16 núcleos virtuais no total), e uma TTL (vida útil) de dez minutos é a recomendação mínima para a maioria das cargas de trabalho de produção. Ao definir um pequeno TTL, o Azure IR pode manter um cluster quente que não incorrerá em vários minutos de tempo de início para um cluster frio. Para obter mais informações, consulte Azure integration runtime.
Importante
A seleção do Integration Runtime na atividade Fluxo de Dados se aplica somente às execuções disparadas do pipeline. A depuração do pipeline com fluxos de dados é executada no cluster especificado na sessão de depuração.
PolyBase
Se você estiver usando um Azure Synapse Analytics como um coletor ou origem, deverá escolher um local de preparo para o carregamento em lotes do PolyBase. O PolyBase permite o carregamento em lotes em massa em vez de carregar os dados linha por linha. O PolyBase reduz drasticamente o tempo de carregamento em Azure Synapse Analytics.
Chave do ponto de verificação
Ao usar a opção de captura de alterações para fontes de fluxo de dados, o ADF mantém e gerencia o ponto de verificação para você automaticamente. A chave de ponto de verificação padrão é um hash do nome do fluxo de dados e do nome do pipeline. Se você estiver usando um padrão dinâmico para suas tabelas ou pastas de origem, talvez queira substituir esse hash e definir seu valor de chave de ponto de verificação aqui.
Nível de log
Se não precisar de cada execução de pipeline das suas atividades de fluxo de dados para registrar completamente todos os logs de telemetria detalhados, você pode definir seu nível de registros em log como "Básico" ou "Nenhum". Ao executar os fluxos de dados no modo “Detalhado” (padrão), você solicita que o serviço registre totalmente a atividade em cada nível de partição individual durante a transformação dos dados. Isso pode ser uma operação dispendiosa. Portanto, habilitar o modo detalhado somente ao solucionar problemas pode melhorar o fluxo geral de dados e o desempenho do pipeline. O modo "Básico" só registra as durações de transformação e "Nenhum" fornece apenas um resumo das durações.
Propriedades do coletor
O recurso de agrupamento em fluxos de dados permite que você defina a ordem de execução de seus coletores, bem como dos coletores de grupo juntos usando o mesmo número de grupo. Para ajudar a gerenciar os grupos, você pode pedir que o serviço execute coletores no mesmo grupo em paralelo. Você também pode definir o grupo de coletores para continuar mesmo depois que um deles encontrar um erro.
O comportamento padrão dos coletores de fluxo de dados é executar cada coletor sequencialmente, em série, e fazer com que o fluxo de dados falhe quando um erro for encontrado no coletor. Além disso, todos os coletores são padronizados para o mesmo grupo, a menos que você acesse as propriedades de fluxo de dados e defina prioridades diferentes para os coletores.
Primeira linha apenas
Essa opção só está disponível para fluxos de dados que têm coletores de cache habilitados para "Saída para atividade". A saída do fluxo de dados que é injetada diretamente em seu pipeline é limitada a 2 MB. A configuração "primeira linha somente" ajuda a limitar a saída de dados do fluxo de dados ao injetar a saída da atividade de fluxo de dados diretamente em seu pipeline.
Parametrização de fluxos de dados
Conjuntos de dados parametrizados
Se o fluxo de dados usar conjuntos de dados parametrizados, defina os valores de parâmetro na guia Configurações.
Fluxos de dados parametrizados
Se o fluxo de dados for parametrizado, defina os valores dinâmicos dos parâmetros de fluxo de dados na guia Parâmetros. Você pode usar tanto a linguagem de expressão de pipeline como a linguagem de expressão do fluxo de dados para atribuir valores de parâmetros dinâmicos ou literais. Para obter mais informações, consulte Fluxo de Dados Parameters.
Propriedades de computação parametrizadas.
Você pode parametrizar a contagem de núcleos ou o tipo de computação se usar o tempo de execução de integração de Azure autoresolve e especificar valores para compute.coreCount e compute.computeType.
Depuração de pipeline da atividade de Fluxo de Dados
Para uma execução de pipeline de depuração com uma atividade de Fluxo de Dados, você deve alternar o modo de depuração de fluxo de dados por meio do controle deslizante Depuração de Fluxo de Dados na barra superior. O modo de depuração permite executar o fluxo de dados em um cluster ativo do Spark. Para saber mais, consulte Modo de depuração.
O pipeline de depuração é executado no cluster de depuração ativa, não no ambiente do runtime de integração especificado nas configurações de atividade do Fluxo de Dados. Você pode escolher o ambiente de computação de depuração ao iniciar o modo de depuração.
Monitorando a atividade de Fluxo de Dados
A atividade Fluxo de Dados tem uma experiência especial de monitoramento em que você pode exibir informações de particionamento, tempo de estágio e linhagem de dados. Abra o painel de monitoramento usando o ícone de óculos em Ações. Para saber mais, confira Monitoramento de fluxos de dados.
Usar os resultados da atividade de Fluxo de Dados em uma atividade subsequente
A atividade de fluxo de dados gera métricas referentes ao número de linhas gravadas em cada coletor e linhas lidas de cada fonte. Esses resultados são retornados na seção output do resultado da execução de atividade. As métricas retornadas estão no formato do JSON abaixo.
{
"runStatus": {
"metrics": {
"<your sink name1>": {
"rowsWritten": <number of rows written>,
"sinkProcessingTime": <sink processing time in ms>,
"sources": {
"<your source name1>": {
"rowsRead": <number of rows read>
},
"<your source name2>": {
"rowsRead": <number of rows read>
},
...
}
},
"<your sink name2>": {
...
},
...
}
}
}
Por exemplo, para obter o número de linhas gravadas em um coletor chamado 'sink1' em uma atividade denominada 'dataflowActivity', use @activity('dataflowActivity').output.runStatus.metrics.sink1.rowsWritten.
Para obter o número de linhas lidas de uma fonte denominada 'source1' que foi utilizada nesse coletor, use @activity('dataflowActivity').output.runStatus.metrics.sink1.sources.source1.rowsRead.
Observação
Se um coletor tiver zero linhas gravadas, ele não aparecerá nas métricas. A existência pode ser verificada usando a função contains. Por exemplo, contains(activity('dataflowActivity').output.runStatus.metrics, 'sink1') verifica se alguma linha foi gravada em sink1.
Conteúdo relacionado
Veja as atividades de fluxo de controle suportadas: