Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Aplica-se a:SQL Server
SSIS Integration Runtime em Azure Data Factory
O Microsoft SQL Server Integration Services oferece aos programadores a capacidade de escrever componentes de destino personalizados que podem ligar-se e armazenar dados em qualquer fonte de dados personalizada. Componentes de destino personalizados são úteis quando precisa de se ligar a fontes de dados que não podem ser acedidas usando um dos componentes de origem existentes incluídos nos Serviços de Integração.
Os componentes de destino têm uma ou mais entradas e zero saídas. No momento do design, criam e configuram ligações e leem metadados das colunas da fonte de dados externa. Durante a execução, ligam-se à sua fonte de dados externa e adicionam linhas que são recebidas dos componentes a montante no fluxo de dados para a fonte de dados externa. Se a fonte de dados externa existir antes da execução do componente, o componente de destino deve também garantir que os tipos de dados das colunas que o componente recebe correspondem aos tipos de dados das colunas da fonte externa.
Esta secção discute os detalhes de como desenvolver componentes de destino e fornece exemplos de código para clarificar conceitos importantes. Para uma visão geral do desenvolvimento de componentes de fluxo de dados, consulte Desenvolvimento de um Componente de Fluxo de Dados Personalizado.
Tempo de estruturação
Implementar a funcionalidade de design time de um componente de destino envolve especificar uma ligação a uma fonte de dados externa e validar que o componente foi corretamente configurado. Por definição, um componente de destino tem uma entrada e possivelmente uma saída de erro.
Criação do componente
Os componentes de destino ligam-se a fontes de dados externas através de ConnectionManager objetos definidos num pacote. O componente de destino indica a sua necessidade de um gestor de ligação ao Designer SSIS, e aos utilizadores do componente, adicionando um elemento à RuntimeConnectionCollection coleção do ComponentMetaData. Esta coleção serve dois propósitos: primeiro, anuncia a necessidade de um gestor de ligação ao SSIS Designer; Depois, depois de o utilizador ter selecionado ou criado um gestor de conexões, este mantém uma referência ao gestor de conexões no pacote que está a ser usado pelo componente. Quando um IDTSRuntimeConnection100 é adicionado à coleção, o Editor Avançado mostra o separador Propriedades de Ligação , para pedir ao utilizador que selecione ou crie uma ligação no pacote para utilização pelo componente.
O exemplo de código seguinte mostra uma implementação de ProvideComponentProperties que adiciona uma entrada, e depois adiciona um IDTSRuntimeConnection100 objeto ao RuntimeConnectionCollection.
using System;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime;
namespace Microsoft.Samples.SqlServer.Dts
{
[DtsPipelineComponent(DisplayName = "Destination Component",ComponentType =ComponentType.DestinationAdapter)]
public class DestinationComponent : PipelineComponent
{
public override void ProvideComponentProperties()
{
// Reset the component.
base.RemoveAllInputsOutputsAndCustomProperties();
ComponentMetaData.RuntimeConnectionCollection.RemoveAll();
IDTSInput100 input = ComponentMetaData.InputCollection.New();
input.Name = "Input";
IDTSRuntimeConnection100 connection = ComponentMetaData.RuntimeConnectionCollection.New();
connection.Name = "ADO.net";
}
}
}
Imports System
Imports System.Data
Imports Microsoft.Data.SqlClient
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Imports Microsoft.SqlServer.Dts.Runtime
Namespace Microsoft.Samples.SqlServer.Dts
<DtsPipelineComponent(DisplayName:="Destination Component", ComponentType:=ComponentType.DestinationAdapter)> _
Public Class DestinationComponent
Inherits PipelineComponent
Public Overrides Sub ProvideComponentProperties()
' Reset the component.
Me.RemoveAllInputsOutputsAndCustomProperties()
ComponentMetaData.RuntimeConnectionCollection.RemoveAll()
Dim input As IDTSInput100 = ComponentMetaData.InputCollection.New()
input.Name = "Input"
Dim connection As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection.New()
connection.Name = "ADO.net"
End Sub
End Class
End Namespace
Ligação a uma fonte de dados externa
Depois de adicionar uma ligação ao RuntimeConnectionCollection, sobrepõe-se o AcquireConnections método para estabelecer uma ligação à fonte de dados externa. Este método é chamado em tempo de design e em tempo de execução. O componente deve estabelecer uma ligação ao gestor de ligação especificado pela ligação em tempo de execução e, subsequentemente, à fonte de dados externa. Uma vez estabelecida, o componente deve armazenar a ligação internamente em cache e libertá-la quando ReleaseConnections for chamada. Os programadores sobrepõem este método e libertam a ligação estabelecida pelo componente durante AcquireConnections. Ambos estes métodos, ReleaseConnections e AcquireConnections, são chamados em tempo de projeto e em tempo de execução.
O exemplo de código seguinte mostra um componente que se liga a uma ligação ADO.NET no AcquireConnections método, e depois fecha a ligação em ReleaseConnections.
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
private SqlConnection sqlConnection;
public override void AcquireConnections(object transaction)
{
if (ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager != null)
{
ConnectionManager cm = Microsoft.SqlServer.Dts.Runtime.DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager);
ConnectionManagerAdoNet cmado = cm.InnerObject as ConnectionManagerAdoNet;
if (cmado == null)
throw new Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.");
sqlConnection = cmado.AcquireConnection(transaction) as SqlConnection;
sqlConnection.Open();
}
}
public override void ReleaseConnections()
{
if (sqlConnection != null && sqlConnection.State != ConnectionState.Closed)
sqlConnection.Close();
}
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper
Private sqlConnection As SqlConnection
Public Overrides Sub AcquireConnections(ByVal transaction As Object)
If IsNothing(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager) = False Then
Dim cm As ConnectionManager = DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager)
Dim cmado As ConnectionManagerAdoNet = CType(cm.InnerObject,ConnectionManagerAdoNet)
If IsNothing(cmado) Then
Throw New Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.")
End If
sqlConnection = CType(cmado.AcquireConnection(transaction), SqlConnection)
sqlConnection.Open()
End If
End Sub
Public Overrides Sub ReleaseConnections()
If IsNothing(sqlConnection) = False And sqlConnection.State <> ConnectionState.Closed Then
sqlConnection.Close()
End If
End Sub
Validação do componente
Os desenvolvedores de componentes de destino devem realizar a validação conforme descrito na Validação de Componentes. Além disso, devem verificar se as propriedades do tipo de dado das colunas definidas na coleção de colunas de entrada do componente correspondem às colunas da fonte de dados externa. Por vezes, verificar as colunas de entrada contra a fonte de dados externa pode ser impossível ou indesejável, como quando o componente ou o SSIS Designer está desconectado, ou quando viagens de ida e volta ao servidor não são aceitáveis. Nestas situações, as colunas na coleção de colunas de entrada ainda podem ser validadas usando o ExternalMetadataColumnCollection do objeto de entrada.
Esta coleção existe tanto em objetos de entrada como de saída e deve ser preenchida pelo desenvolvedor de componentes a partir das colunas na fonte de dados externa. Esta coleção pode ser usada para validar as colunas de entrada quando o SSIS Designer está offline, quando o componente está desligado ou quando a ValidateExternalMetadata propriedade é falsa.
O código de exemplo seguinte adiciona uma coluna de metadados externa baseada numa coluna de entrada existente.
private void AddExternalMetaDataColumn(IDTSInput100 input,IDTSInputColumn100 inputColumn)
{
// Set the properties of the external metadata column.
IDTSExternalMetadataColumn100 externalColumn = input.ExternalMetadataColumnCollection.New();
externalColumn.Name = inputColumn.Name;
externalColumn.Precision = inputColumn.Precision;
externalColumn.Length = inputColumn.Length;
externalColumn.DataType = inputColumn.DataType;
externalColumn.Scale = inputColumn.Scale;
// Map the external column to the input column.
inputColumn.ExternalMetadataColumnID = externalColumn.ID;
}
Private Sub AddExternalMetaDataColumn(ByVal input As IDTSInput100, ByVal inputColumn As IDTSInputColumn100)
' Set the properties of the external metadata column.
Dim externalColumn As IDTSExternalMetadataColumn100 = input.ExternalMetadataColumnCollection.New()
externalColumn.Name = inputColumn.Name
externalColumn.Precision = inputColumn.Precision
externalColumn.Length = inputColumn.Length
externalColumn.DataType = inputColumn.DataType
externalColumn.Scale = inputColumn.Scale
' Map the external column to the input column.
inputColumn.ExternalMetadataColumnID = externalColumn.ID
End Sub
Tempo de execução
Durante a execução, o componente de destino recebe uma chamada ao ProcessInput método sempre que um full PipelineBuffer está disponível do componente a montante. Este método é chamado repetidamente até que não haja mais buffers disponíveis e a EndOfRowset propriedade seja verdadeira. Durante este método, os componentes de destino leem colunas e linhas no buffer e adicionam-nas à fonte de dados externa.
Localização das colunas no buffer
O buffer de entrada de um componente contém todas as colunas definidas nas coleções de colunas de saída dos componentes a montante do componente no fluxo de dados. Por exemplo, se um componente de origem fornecer três colunas na sua saída, e o componente seguinte adicionar uma coluna de saída adicional, o buffer fornecido ao componente de destino contém quatro colunas, mesmo que o componente de destino escreva apenas duas colunas.
A ordem das colunas no buffer de entrada não é definida pelo índice da coluna na coleção de colunas de entrada do componente de destino. As colunas podem ser localizadas de forma fiável numa linha de buffer apenas usando o FindColumnByLineageID método do BufferManager. Este método localiza a coluna que tem o ID de linhagem especificado no buffer especificado e devolve a sua localização na linha. Os índices das colunas de entrada são tipicamente localizados durante o PreExecute método e armazenados em cache pelo desenvolvedor para uso posterior durante ProcessInputo período .
O exemplo de código seguinte encontra a localização das colunas de entrada no buffer durante PreExecute e armazena-as num array. O array é posteriormente usado durante ProcessInput a leitura dos valores das colunas no buffer.
int[] cols;
public override void PreExecute()
{
IDTSInput100 input = ComponentMetaData.InputCollection[0];
cols = new int[input.InputColumnCollection.Count];
for (int x = 0; x < input.InputColumnCollection.Count; x++)
{
cols[x] = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection[x].LineageID);
}
}
Private cols As Integer()
Public Overrides Sub PreExecute()
Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)
ReDim cols(input.InputColumnCollection.Count)
For x As Integer = 0 To input.InputColumnCollection.Count
cols(x) = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection(x).LineageID)
Next x
End Sub
Linhas de processamento
Uma vez localizadas as colunas de entrada no buffer, podem ser lidas e escritas na fonte de dados externa.
Enquanto o componente de destino escreve linhas na fonte de dados externa, pode querer atualizar os contadores de desempenho "Rows read" ou "BLOB bytes read" chamando o IncrementPipelinePerfCounter método. Para mais informações, consulte Contadores de Desempenho.
O exemplo seguinte mostra um componente que lê linhas do buffer fornecido em ProcessInput. Os índices das colunas no buffer estavam localizados durante PreExecute no exemplo de código anterior.
public override void ProcessInput(int inputID, PipelineBuffer buffer)
{
while (buffer.NextRow())
{
foreach (int col in cols)
{
if (!buffer.IsNull(col))
{
// TODO: Read the column data.
}
}
}
}
Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)
While (buffer.NextRow())
For Each col As Integer In cols
If buffer.IsNull(col) = False Then
' TODO: Read the column data.
End If
Next col
End While
End Sub
Exemplo
O exemplo seguinte mostra um componente de destino simples que utiliza um gestor de ligações de ficheiros para guardar dados binários do fluxo de dados em ficheiros. Este exemplo não demonstra todos os métodos e funcionalidades discutidos neste tópico. Demonstra os métodos importantes que cada componente de destino personalizado deve sobrepor, mas não contém código para validação em tempo de design.
using System;
using System.IO;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
namespace BlobDst
{
[DtsPipelineComponent(DisplayName = "BLOB Extractor Destination", Description = "Writes values of BLOB columns to files")]
public class BlobDst : PipelineComponent
{
string m_DestDir;
int m_FileNameColumnIndex = -1;
int m_BlobColumnIndex = -1;
public override void ProvideComponentProperties()
{
IDTSInput100 input = ComponentMetaData.InputCollection.New();
input.Name = "BLOB Extractor Destination Input";
input.HasSideEffects = true;
IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection.New();
conn.Name = "FileConnection";
}
public override void AcquireConnections(object transaction)
{
IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection[0];
m_DestDir = (string)conn.ConnectionManager.AcquireConnection(null);
if (m_DestDir.Length > 0 && m_DestDir[m_DestDir.Length - 1] != '\\')
m_DestDir += "\\";
}
public override IDTSInputColumn100 SetUsageType(int inputID, IDTSVirtualInput100 virtualInput, int lineageID, DTSUsageType usageType)
{
IDTSInputColumn100 inputColumn = base.SetUsageType(inputID, virtualInput, lineageID, usageType);
IDTSCustomProperty100 custProp;
custProp = inputColumn.CustomPropertyCollection.New();
custProp.Name = "IsFileName";
custProp.Value = (object)false;
custProp = inputColumn.CustomPropertyCollection.New();
custProp.Name = "IsBLOB";
custProp.Value = (object)false;
return inputColumn;
}
public override void PreExecute()
{
IDTSInput100 input = ComponentMetaData.InputCollection[0];
IDTSInputColumnCollection100 inputColumns = input.InputColumnCollection;
IDTSCustomProperty100 custProp;
foreach (IDTSInputColumn100 column in inputColumns)
{
custProp = column.CustomPropertyCollection["IsFileName"];
if ((bool)custProp.Value == true)
{
m_FileNameColumnIndex = (int)BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID);
}
custProp = column.CustomPropertyCollection["IsBLOB"];
if ((bool)custProp.Value == true)
{
m_BlobColumnIndex = (int)BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID);
}
}
}
public override void ProcessInput(int inputID, PipelineBuffer buffer)
{
while (buffer.NextRow())
{
string strFileName = buffer.GetString(m_FileNameColumnIndex);
int blobLength = (int)buffer.GetBlobLength(m_BlobColumnIndex);
byte[] blobData = buffer.GetBlobData(m_BlobColumnIndex, 0, blobLength);
strFileName = TranslateFileName(strFileName);
// Make sure directory exists before creating file.
FileInfo fi = new FileInfo(strFileName);
if (!fi.Directory.Exists)
fi.Directory.Create();
// Write the data to the file.
FileStream fs = new FileStream(strFileName, FileMode.Create, FileAccess.Write, FileShare.None);
fs.Write(blobData, 0, blobLength);
fs.Close();
}
}
private string TranslateFileName(string fileName)
{
if (fileName.Length > 2 && fileName[1] == ':')
return m_DestDir + fileName.Substring(3, fileName.Length - 3);
else
return m_DestDir + fileName;
}
}
}
Imports System
Imports System.IO
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Namespace BlobDst
<DtsPipelineComponent(DisplayName="BLOB Extractor Destination", Description="Writes values of BLOB columns to files")> _
Public Class BlobDst
Inherits PipelineComponent
Private m_DestDir As String
Private m_FileNameColumnIndex As Integer = -1
Private m_BlobColumnIndex As Integer = -1
Public Overrides Sub ProvideComponentProperties()
Dim input As IDTSInput100 = ComponentMetaData.InputCollection.New
input.Name = "BLOB Extractor Destination Input"
input.HasSideEffects = True
Dim conn As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection.New
conn.Name = "FileConnection"
End Sub
Public Overrides Sub AcquireConnections(ByVal transaction As Object)
Dim conn As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection(0)
m_DestDir = CType(conn.ConnectionManager.AcquireConnection(Nothing), String)
If m_DestDir.Length > 0 AndAlso Not (m_DestDir(m_DestDir.Length - 1) = "\"C) Then
m_DestDir += "\"
End If
End Sub
Public Overrides Function SetUsageType(ByVal inputID As Integer, ByVal virtualInput As IDTSVirtualInput100, ByVal lineageID As Integer, ByVal usageType As DTSUsageType) As IDTSInputColumn100
Dim inputColumn As IDTSInputColumn100 = MyBase.SetUsageType(inputID, virtualInput, lineageID, usageType)
Dim custProp As IDTSCustomProperty100
custProp = inputColumn.CustomPropertyCollection.New
custProp.Name = "IsFileName"
custProp.Value = CType(False, Object)
custProp = inputColumn.CustomPropertyCollection.New
custProp.Name = "IsBLOB"
custProp.Value = CType(False, Object)
Return inputColumn
End Function
Public Overrides Sub PreExecute()
Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)
Dim inputColumns As IDTSInputColumnCollection100 = input.InputColumnCollection
Dim custProp As IDTSCustomProperty100
For Each column As IDTSInputColumn100 In inputColumns
custProp = column.CustomPropertyCollection("IsFileName")
If CType(custProp.Value, Boolean) = True Then
m_FileNameColumnIndex = CType(BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID), Integer)
End If
custProp = column.CustomPropertyCollection("IsBLOB")
If CType(custProp.Value, Boolean) = True Then
m_BlobColumnIndex = CType(BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID), Integer)
End If
Next
End Sub
Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)
While buffer.NextRow
Dim strFileName As String = buffer.GetString(m_FileNameColumnIndex)
Dim blobLength As Integer = CType(buffer.GetBlobLength(m_BlobColumnIndex), Integer)
Dim blobData As Byte() = buffer.GetBlobData(m_BlobColumnIndex, 0, blobLength)
strFileName = TranslateFileName(strFileName)
Dim fi As FileInfo = New FileInfo(strFileName)
' Make sure directory exists before creating file.
If Not fi.Directory.Exists Then
fi.Directory.Create
End If
' Write the data to the file.
Dim fs As FileStream = New FileStream(strFileName, FileMode.Create, FileAccess.Write, FileShare.None)
fs.Write(blobData, 0, blobLength)
fs.Close
End While
End Sub
Private Function TranslateFileName(ByVal fileName As String) As String
If fileName.Length > 2 AndAlso fileName(1) = ":"C Then
Return m_DestDir + fileName.Substring(3, fileName.Length - 3)
Else
Return m_DestDir + fileName
End If
End Function
End Class
End Namespace
Conteúdo relacionado
Desenvolvimento de um Componente Fonte Personalizado
Criação de um Destino com o Componente Script