Compartilhar via


Integrar o Azure Functions ao Azure Data Explorer usando associações de entrada e saída (versão prévia)

Importante

Este conector pode ser usado em Real-Time Intelligence no Microsoft Fabric. Use as instruções incluídas neste artigo com as seguintes exceções:

O Azure Functions permite que você execute código sem servidor na nuvem em um agendamento ou em resposta a um evento. Usando associações de entrada e saída do Azure Data Explorer para o Azure Functions, você pode integrar o Azure Data Explorer aos fluxos de trabalho para ingerir dados e executar consultas em seu cluster.

Pré-requisitos

Experimente a integração usando o projeto de exemplo.

Como usar associações do Azure Data Explorer para o Azure Functions

Para obter informações sobre como usar associações do Azure Data Explorer para o Azure Functions, consulte os seguintes artigos:

Cenários para usar associações do Azure Data Explorer para o Azure Functions

As seções a seguir descrevem alguns cenários comuns para usar associações do Azure Data Explorer para o Azure Functions.

Associações de entrada

As associações de entrada executam uma consulta KQL (Linguagem de Consulta Kusto) ou uma função KQL, opcionalmente com parâmetros, e retornam a saída para a função.

As seções a seguir descrevem como usar associações de entrada em alguns cenários comuns.

Cenário 1: um endpoint HTTP para consultar dados de um cluster

Use associações de entrada quando precisar expor dados do Azure Data Explorer por meio de uma API REST. Nesse cenário, você usa um gatilho HTTP do Azure Functions para consultar dados em seu cluster. O cenário é útil em situações em que você precisa fornecer acesso programático aos dados do Azure Data Explorer para aplicativos ou serviços externos. Expor seus dados por meio de uma API REST permite que os aplicativos consumam prontamente os dados sem exigir que eles se conectem diretamente ao cluster.

O código define uma função com um gatilho HTTP e uma associação de entrada do Azure Data Explorer. A associação de entrada especifica a consulta a ser executada na tabela Produtos no banco de dados productsdb. A função usa a coluna productId como o predicado passado como um parâmetro.

{
    [FunctionName("GetProduct")]
    public static async Task<IActionResult> RunAsync(
        [HttpTrigger(AuthorizationLevel.User, "get", Route = "getproducts/{productId}")]
        HttpRequest req,
        [Kusto(Database:"productsdb" ,
        KqlCommand = "declare query_parameters (productId:long);Products | where ProductID == productId" ,
        KqlParameters = "@productId={productId}",
        Connection = "KustoConnectionString")]
        IAsyncEnumerable<Product> products)
    {
        IAsyncEnumerator<Product> enumerator = products.GetAsyncEnumerator();
        var productList = new List<Product>();
        while (await enumerator.MoveNextAsync())
        {
            productList.Add(enumerator.Current);
        }
        await enumerator.DisposeAsync();
        return new OkObjectResult(productList);
    }
}

Você invoca a função da seguinte maneira:

curl https://myfunctionapp.azurewebsites.net/api/getproducts/1

Cenário 2: um gatilho agendado para exportar dados de um cluster

O cenário a seguir é aplicável em situações em que os dados precisam ser exportados em um agendamento baseado em tempo.

O código define uma função com um gatilho de temporizador que exporta uma agregação de dados de vendas do banco de dados productsdb para um arquivo CSV no Armazenamento de Blobs do Azure.

public static async Task Run([TimerTrigger("0 0 1 * * *")] TimerInfo myTimer,
    [Kusto(ConnectionStringSetting = "KustoConnectionString",
            DatabaseName = "productsdb",
            Query = "ProductSales | where OrderDate >= ago(1d) | summarize Sales = sum(ProductSales) by ProductName | top 10 by Sales desc")] IEnumerable<dynamic> queryResults,
[Blob("salescontainer/productsblob.csv", FileAccess.Write, Connection = "BlobStorageConnection")] CloudBlockBlob outputBlob,
ILogger log)
{
    // Write the query results to a CSV file
    using (var stream = new MemoryStream())
    using (var writer = new StreamWriter(stream))
    using (var csv = new CsvWriter(writer, CultureInfo.InvariantCulture))
    {
        csv.WriteRecords(queryResults);
        writer.Flush();
        stream.Position = 0;
        await outputBlob.UploadFromStreamAsync(stream);
    }
}

Vinculações de saída

Os vínculos de saída pegam uma ou mais linhas e as inserem em uma tabela do Azure Data Explorer.

As seções a seguir descrevem como usar associações de saída em alguns cenários comuns.

Cenário 1: endpoint HTTP para capturar dados em um cluster

Use esse cenário quando precisar processar solicitações HTTP de entrada e ingerir os dados em seu cluster. Usando uma associação de saída, você pode gravar dados de entrada da solicitação em tabelas do Azure Data Explorer.

O código define uma função com um gatilho HTTP e uma associação de saída do Azure Data Explorer. Essa função usa uma carga JSON no corpo da solicitação HTTP e a grava na tabela de produtos no banco de dados productsdb.

public static IActionResult Run(
    [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "addproductuni")]
    HttpRequest req, ILogger log,
    [Kusto(Database:"productsdb" ,
    TableName ="products" ,
    Connection = "KustoConnectionString")] out Product product)
{
    log.LogInformation($"AddProduct function started");
    string body = new StreamReader(req.Body).ReadToEnd();
    product = JsonConvert.DeserializeObject<Product>(body);
    string productString = string.Format(CultureInfo.InvariantCulture, "(Name:{0} ID:{1} Cost:{2})",
                product.Name, product.ProductID, product.Cost);
    log.LogInformation("Ingested product {}", productString);
    return new CreatedResult($"/api/addproductuni", product);
}

Você invoca a função da seguinte maneira:

curl -X POST https://myfunctionapp.azurewebsites.net/api/addproductuni -d '{"Name":"Product1","ProductID":1,"Cost":100,"ActivatedOn":"2023-01-02T00:00:00"}'

Cenário 2: Ingerir dados do RabbitMQ ou de outros sistemas de mensagens com suporte no Azure

Use esse cenário quando precisar ingerir dados de um sistema de mensagens em seu cluster. Usando uma associação de saída, você pode ingerir dados de entrada do sistema de mensagens em tabelas do Azure Data Explorer.

O código define uma função com um gatilho RabbitMQ. A função ingere mensagens, dados no formato JSON, na tabela products no banco de dados productsdb.

public class QueueTrigger
{
    [FunctionName("QueueTriggerBinding")]
    [return: Kusto(Database: "productsdb",
                TableName = "products",
                Connection = "KustoConnectionString")]
    public static Product Run(
        [RabbitMQTrigger(queueName: "bindings.products.queue", ConnectionStringSetting = "rabbitMQConnectionAppSetting")] Product product,
        ILogger log)
    {
        log.LogInformation($"Dequeued product {product.ProductID}");
        return product;
    }
}

Para obter mais informações sobre as funções, consulte a Documentação do Azure Functions. A extensão do Azure Data Explorer está disponível em: