Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Aplica-se a:SQL Server
SSIS Integration Runtime em Azure Data Factory
O SQL Server Integration Services dá aos programadores a capacidade de escrever componentes de origem que podem ligar-se a fontes de dados personalizadas e fornecer dados dessas fontes para outros componentes numa tarefa de fluxo de dados. A capacidade de criar fontes personalizadas é valiosa quando se tem de se ligar a fontes de dados que não podem ser acedidas usando uma das fontes existentes dos Serviços de Integração.
Os componentes de origem têm uma ou mais saídas e zero entradas. No momento do design, os componentes de origem são usados para criar e configurar ligações, ler metadados de colunas da fonte de dados externa e configurar as colunas de saída da fonte com base na fonte de dados externa. Durante a execução, ligam-se à fonte de dados externa e adicionam linhas a um buffer de saída. A tarefa de fluxo de dados fornece então este buffer de linhas de dados para os componentes a jusante.
Para uma visão geral do desenvolvimento de componentes de fluxo de dados, consulte Desenvolvimento de um Componente de Fluxo de Dados Personalizado.
Tempo de estruturação
Implementar a funcionalidade de design time de um componente de origem envolve especificar uma ligação a uma fonte de dados externa, adicionar e configurar colunas de saída que reflitam a fonte de dados e validar que o componente está pronto para executar. Por definição, um componente fonte não tem entradas e uma ou mais saídas assíncronas.
Criação do componente
Os componentes de origem ligam-se a fontes de dados externas usando ConnectionManager objetos definidos num pacote. Indicam a sua necessidade de um gestor de conexões adicionando um elemento à RuntimeConnectionCollection coleção da ComponentMetaData propriedade. Esta coleção serve dois propósitos — conter referências a gestores de ligação no pacote usado pelo componente e anunciar a necessidade de um gestor de ligação ao designer. Quando um IDTSRuntimeConnection100 é adicionado à coleção, o Editor Avançado mostra o separador Propriedades de Ligação , que permite aos utilizadores selecionar ou criar uma ligação no pacote.
O seguinte exemplo de código mostra uma implementação de ProvideComponentProperties que adiciona uma saída e adiciona um IDTSRuntimeConnection100 objeto ao RuntimeConnectionCollection.
using System;
using System.Collections;
using System.Data;
using Microsoft.Data.SqlClient;
using System.Data.OleDb;
using Microsoft.SqlServer.Dts.Runtime;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
namespace Microsoft.Samples.SqlServer.Dts
{
[DtsPipelineComponent(DisplayName = "MySourceComponent",ComponentType = ComponentType.SourceAdapter)]
public class MyComponent : PipelineComponent
{
public override void ProvideComponentProperties()
{
// Reset the component.
base.RemoveAllInputsOutputsAndCustomProperties();
ComponentMetaData.RuntimeConnectionCollection.RemoveAll();
IDTSOutput100 output = ComponentMetaData.OutputCollection.New();
output.Name = "Output";
IDTSRuntimeConnection100 connection = ComponentMetaData.RuntimeConnectionCollection.New();
connection.Name = "ADO.NET";
}
Imports System.Data
Imports Microsoft.Data.SqlClient
Imports Microsoft.SqlServer.Dts.Runtime
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
<DtsPipelineComponent(DisplayName:="MySourceComponent", ComponentType:=ComponentType.SourceAdapter)> _
Public Class MySourceComponent
Inherits PipelineComponent
Public Overrides Sub ProvideComponentProperties()
' Allow for resetting the component.
RemoveAllInputsOutputsAndCustomProperties()
ComponentMetaData.RuntimeConnectionCollection.RemoveAll()
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.New()
output.Name = "Output"
Dim connection As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection.New()
connection.Name = "ADO.NET"
End Sub
End Class
Ligação a uma fonte de dados externa
Depois de uma ligação ter sido adicionada ao RuntimeConnectionCollection, sobrepõe-se o AcquireConnections método para estabelecer uma ligação à fonte de dados externa. Este método é chamado tanto durante o tempo de conceção como de execução. O componente deve estabelecer uma ligação ao gestor de ligação especificado pela ligação em tempo de execução e, subsequentemente, à fonte de dados externa.
Depois de estabelecida a ligação, deve ser armazenada em cache internamente pelo componente e libertada quando o ReleaseConnections método for chamado. O ReleaseConnections método é chamado em tempo de conceção e execução, tal como o AcquireConnections método. Os programadores sobrepõem este método e libertam a ligação estabelecida pelo componente durante AcquireConnections.
O exemplo de código seguinte mostra um componente que se liga a uma ligação ADO.NET no AcquireConnections método e fecha a ligação no ReleaseConnections método.
private SqlConnection sqlConnection;
public override void AcquireConnections(object transaction)
{
if (ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager != null)
{
ConnectionManager cm = Microsoft.SqlServer.Dts.Runtime.DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager);
ConnectionManagerAdoNet cmado = cm.InnerObject as ConnectionManagerAdoNet;
if (cmado == null)
throw new Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.");
sqlConnection = cmado.AcquireConnection(transaction) as SqlConnection;
sqlConnection.Open();
}
}
public override void ReleaseConnections()
{
if (sqlConnection != null && sqlConnection.State != ConnectionState.Closed)
sqlConnection.Close();
}
Private sqlConnection As SqlConnection
Public Overrides Sub AcquireConnections(ByVal transaction As Object)
If Not IsNothing(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager) Then
Dim cm As ConnectionManager = Microsoft.SqlServer.Dts.Runtime.DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager)
Dim cmado As ConnectionManagerAdoNet = CType(cm.InnerObject, ConnectionManagerAdoNet)
If IsNothing(cmado) Then
Throw New Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.")
End If
sqlConnection = CType(cmado.AcquireConnection(transaction), SqlConnection)
sqlConnection.Open()
End If
End Sub
Public Overrides Sub ReleaseConnections()
If Not IsNothing(sqlConnection) And sqlConnection.State <> ConnectionState.Closed Then
sqlConnection.Close()
End If
End Sub
Criação e configuração de colunas de saída
As colunas de saída de um componente de origem refletem as colunas da fonte de dados externa que o componente adiciona ao fluxo de dados durante a execução. No momento do design, adiciona-se colunas de saída depois de o componente ter sido configurado para se ligar a uma fonte de dados externa. O método de design-time que um componente usa para adicionar as colunas à sua coleção de saída pode variar consoante as necessidades do componente, embora não devam ser adicionadas durante Validate ou AcquireConnections. Por exemplo, um componente que contém uma instrução SQL numa propriedade personalizada que controla o conjunto de dados do componente pode adicionar as suas colunas de saída durante o SetComponentProperty método. O componente verifica se tem uma ligação em cache e, se tiver, liga-se à fonte de dados e gera as suas colunas de saída.
Depois de criada uma coluna de saída, defina as propriedades do seu tipo de dado chamando o SetDataTypeProperties método. Este método é necessário porque as DataTypepropriedades , Length, Precision, e CodePage são de apenas leitura e cada propriedade depende das definições da outra. Este método reforça a necessidade de estes valores serem definidos de forma consistente, e a tarefa de fluxo de dados valida que estão corretamente definidos.
O DataType de da coluna determina os valores que são definidos para as outras propriedades. A tabela seguinte mostra os requisitos sobre as propriedades dependentes para cada DataType. Os tipos de dados não listados têm as suas propriedades dependentes definidas a zero.
| Tipo de Dados | Length | Scale | Precisão | Página de código |
|---|---|---|---|---|
| DT_DECIMAL | 0 | Maior que 0 e menor ou igual a 28. | 0 | 0 |
| DT_CY | 0 | 0 | 0 | 0 |
| DT_NUMERIC | 0 | Maior que 0 e menor ou igual a 28, e menor que Precisão. | Maior ou igual a 1 e menor ou igual a 38. | 0 |
| DT_BYTES | Maior que 0. | 0 | 0 | 0 |
| DT_STR | Maior que 0 e inferior a 8000. | 0 | 0 | Não é 0, e tem uma página de código válida. |
| DT_WSTR | Superior a 0 e inferior a 4000. | 0 | 0 | 0 |
Como as restrições às propriedades dos tipos de dados baseiam-se no tipo de dados da coluna de saída, deve escolher o tipo de dado SSIS correto quando trabalha com tipos geridos. A classe base fornece três métodos auxiliares, ConvertBufferDataTypeToFitManaged, BufferTypeToDataRecordType, , e DataRecordTypeToBufferType, para ajudar os desenvolvedores de componentes geridos a selecionar um tipo de dado SSIS dado um tipo gerido. Estes métodos convertem tipos de dados geridos em tipos de dados SSIS, e vice-versa.
O exemplo de código seguinte mostra como a coleção de colunas de saída de um componente é povoada com base no esquema de uma tabela. Os métodos auxiliares da classe base são usados para definir o tipo de dados da coluna, e as propriedades dependentes são definidas com base no tipo de dado.
SqlCommand sqlCommand;
private void CreateColumnsFromDataTable()
{
// Get the output.
IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
// Start clean, and remove the columns from both collections.
output.OutputColumnCollection.RemoveAll();
output.ExternalMetadataColumnCollection.RemoveAll();
this.sqlCommand = sqlConnection.CreateCommand();
this.sqlCommand.CommandType = CommandType.Text;
this.sqlCommand.CommandText = (string)ComponentMetaData.CustomPropertyCollection["SqlStatement"].Value;
SqlDataReader schemaReader = this.sqlCommand.ExecuteReader(CommandBehavior.SchemaOnly);
DataTable dataTable = schemaReader.GetSchemaTable();
// Walk the columns in the schema,
// and for each data column create an output column and an external metadata column.
foreach (DataRow row in dataTable.Rows)
{
IDTSOutputColumn100 outColumn = output.OutputColumnCollection.New();
IDTSExternalMetadataColumn100 exColumn = output.ExternalMetadataColumnCollection.New();
// Set column data type properties.
bool isLong = false;
DataType dt = DataRecordTypeToBufferType((Type)row["DataType"]);
dt = ConvertBufferDataTypeToFitManaged(dt, ref isLong);
int length = 0;
int precision = (short)row["NumericPrecision"];
int scale = (short)row["NumericScale"];
int codepage = dataTable.Locale.TextInfo.ANSICodePage;
switch (dt)
{
// The length cannot be zero, and the code page property must contain a valid code page.
case DataType.DT_STR:
case DataType.DT_TEXT:
length = precision;
precision = 0;
scale = 0;
break;
case DataType.DT_WSTR:
length = precision;
codepage = 0;
scale = 0;
precision = 0;
break;
case DataType.DT_BYTES:
precision = 0;
scale = 0;
codepage = 0;
break;
case DataType.DT_NUMERIC:
length = 0;
codepage = 0;
if (precision > 38)
precision = 38;
if (scale > 6)
scale = 6;
break;
case DataType.DT_DECIMAL:
length = 0;
precision = 0;
codepage = 0;
break;
default:
length = 0;
precision = 0;
codepage = 0;
scale = 0;
break;
}
// Set the properties of the output column.
outColumn.Name = (string)row["ColumnName"];
outColumn.SetDataTypeProperties(dt, length, precision, scale, codepage);
}
}
Private sqlCommand As SqlCommand
Private Sub CreateColumnsFromDataTable()
' Get the output.
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)
' Start clean, and remove the columns from both collections.
output.OutputColumnCollection.RemoveAll()
output.ExternalMetadataColumnCollection.RemoveAll()
Me.sqlCommand = sqlConnection.CreateCommand()
Me.sqlCommand.CommandType = CommandType.Text
Me.sqlCommand.CommandText = CStr(ComponentMetaData.CustomPropertyCollection("SqlStatement").Value)
Dim schemaReader As SqlDataReader = Me.sqlCommand.ExecuteReader(CommandBehavior.SchemaOnly)
Dim dataTable As DataTable = schemaReader.GetSchemaTable()
' Walk the columns in the schema,
' and for each data column create an output column and an external metadata column.
For Each row As DataRow In dataTable.Rows
Dim outColumn As IDTSOutputColumn100 = output.OutputColumnCollection.New()
Dim exColumn As IDTSExternalMetadataColumn100 = output.ExternalMetadataColumnCollection.New()
' Set column data type properties.
Dim isLong As Boolean = False
Dim dt As DataType = DataRecordTypeToBufferType(CType(row("DataType"), Type))
dt = ConvertBufferDataTypeToFitManaged(dt, isLong)
Dim length As Integer = 0
Dim precision As Integer = CType(row("NumericPrecision"), Short)
Dim scale As Integer = CType(row("NumericScale"), Short)
Dim codepage As Integer = dataTable.Locale.TextInfo.ANSICodePage
Select Case dt
' The length cannot be zero, and the code page property must contain a valid code page.
Case DataType.DT_STR
Case DataType.DT_TEXT
length = precision
precision = 0
scale = 0
Case DataType.DT_WSTR
length = precision
codepage = 0
scale = 0
precision = 0
Case DataType.DT_BYTES
precision = 0
scale = 0
codepage = 0
Case DataType.DT_NUMERIC
length = 0
codepage = 0
If precision > 38 Then
precision = 38
End If
If scale > 6 Then
scale = 6
End If
Case DataType.DT_DECIMAL
length = 0
precision = 0
codepage = 0
Case Else
length = 0
precision = 0
codepage = 0
scale = 0
End Select
' Set the properties of the output column.
outColumn.Name = CStr(row("ColumnName"))
outColumn.SetDataTypeProperties(dt, length, precision, scale, codepage)
Next
End Sub
Validação do componente
Deve validar um componente de origem e verificar se as colunas definidas nas suas coleções de colunas de saída correspondem às colunas da fonte de dados externa. Por vezes, verificar as colunas de saída em relação à fonte de dados externa pode ser impossível, como num estado desligado, ou quando é preferível evitar longas viagens de ida e volta ao servidor. Nestas situações, as colunas na saída ainda podem ser validadas usando o ExternalMetadataColumnCollection do objeto de saída. Para mais informações, consulte Validação de um Componente de Fluxo de Dados.
Esta coleção existe tanto em objetos de entrada como de saída e pode preenchê-la com as colunas da fonte de dados externa. Pode usar esta coleção para validar as colunas de saída quando o SSIS Designer está offline, quando o componente está desligado ou quando a ValidateExternalMetadata propriedade é falsa. A coleção deve ser preenchida primeiro ao mesmo tempo que as colunas de saída são criadas. Adicionar colunas externas de metadados à coleção é relativamente fácil porque a coluna de metadados externa deve inicialmente corresponder à coluna de saída. As propriedades do tipo de dado da coluna já devem ter sido definidas corretamente, e as propriedades podem ser copiadas diretamente para o IDTSExternalMetadataColumn100 objeto.
O código de exemplo seguinte adiciona uma coluna externa de metadados baseada numa coluna de saída recém-criada. Assume que a coluna de saída já foi criada.
private void CreateExternalMetaDataColumn(IDTSOutput100 output, IDTSOutputColumn100 outputColumn)
{
// Set the properties of the external metadata column.
IDTSExternalMetadataColumn100 externalColumn = output.ExternalMetadataColumnCollection.New();
externalColumn.Name = outputColumn.Name;
externalColumn.Precision = outputColumn.Precision;
externalColumn.Length = outputColumn.Length;
externalColumn.DataType = outputColumn.DataType;
externalColumn.Scale = outputColumn.Scale;
// Map the external column to the output column.
outputColumn.ExternalMetadataColumnID = externalColumn.ID;
}
Private Sub CreateExternalMetaDataColumn(ByVal output As IDTSOutput100, ByVal outputColumn As IDTSOutputColumn100)
' Set the properties of the external metadata column.
Dim externalColumn As IDTSExternalMetadataColumn100 = output.ExternalMetadataColumnCollection.New()
externalColumn.Name = outputColumn.Name
externalColumn.Precision = outputColumn.Precision
externalColumn.Length = outputColumn.Length
externalColumn.DataType = outputColumn.DataType
externalColumn.Scale = outputColumn.Scale
' Map the external column to the output column.
outputColumn.ExternalMetadataColumnID = externalColumn.ID
End Sub
Tempo de execução
Durante a execução, os componentes adicionam linhas aos buffers de saída que são criados pela tarefa de fluxo de dados e fornecidos ao componente em PrimeOutput. Chamado uma vez para componentes fonte, o método recebe um buffer de saída para cada IDTSOutput100 componente que está ligado a um componente a jusante.
Localização das colunas no buffer
O buffer de saída de um componente contém as colunas definidas pelo componente e quaisquer colunas adicionadas à saída de um componente a jusante. Por exemplo, se um componente de origem fornece três colunas na sua saída, e o componente seguinte adiciona uma quarta coluna de saída, o buffer de saída fornecido para uso pelo componente de origem contém estas quatro colunas.
A ordem das colunas numa linha de buffer não é definida pelo índice da coluna de saída na coleção de colunas de saída. Uma coluna de saída só pode ser localizada com precisão numa linha de buffer usando o FindColumnByLineageID método do BufferManager. Este método localiza a coluna com o ID de linhagem especificado no buffer especificado e devolve a sua localização na linha. Os índices das colunas de saída estão tipicamente localizados no PreExecute método e armazenados para uso durante PrimeOutput.
O exemplo de código seguinte encontra a localização das colunas de saída no buffer de saída durante uma chamada para PreExecute, e armazena-as numa estrutura interna. O nome da coluna também está guardado na estrutura e é usado no exemplo de código para o PrimeOutput método na secção seguinte deste tópico.
ArrayList columnInformation;
private struct ColumnInfo
{
public int BufferColumnIndex;
public string ColumnName;
}
public override void PreExecute()
{
this.columnInformation = new ArrayList();
IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
foreach (IDTSOutputColumn100 col in output.OutputColumnCollection)
{
ColumnInfo ci = new ColumnInfo();
ci.BufferColumnIndex = BufferManager.FindColumnByLineageID(output.Buffer, col.LineageID);
ci.ColumnName = col.Name;
columnInformation.Add(ci);
}
}
Public Overrides Sub PreExecute()
Me.columnInformation = New ArrayList()
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)
For Each col As IDTSOutputColumn100 In output.OutputColumnCollection
Dim ci As ColumnInfo = New ColumnInfo()
ci.BufferColumnIndex = BufferManager.FindColumnByLineageID(output.Buffer, col.LineageID)
ci.ColumnName = col.Name
columnInformation.Add(ci)
Next
End Sub
Linhas de processamento
As linhas são adicionadas ao buffer de saída chamando o AddRow método, que cria uma nova linha de buffer com valores vazios nas suas colunas. O componente atribui então valores às colunas individuais. Os buffers de saída fornecidos a um componente são criados e monitorizados pela tarefa de fluxo de dados. À medida que ficam cheios, as filas do buffer são movidas para o componente seguinte. Não há forma de determinar quando um lote de linhas foi enviado para o componente seguinte porque o movimento das linhas pela tarefa de fluxo de dados é transparente para o desenvolvedor do componente, e a RowCount propriedade é sempre zero nos buffers de saída. Quando um componente de origem termina de adicionar linhas ao seu buffer de saída, notifica a tarefa de fluxo de dados chamando o SetEndOfRowset método do PipelineBuffer, e as linhas restantes do buffer são passadas para o componente seguinte.
Enquanto o componente de origem lê linhas da fonte de dados externa, pode querer atualizar os contadores de desempenho "Rows read" ou "BLOB bytes read" chamando o IncrementPipelinePerfCounter método. Para mais informações, consulte Contadores de Desempenho.
O exemplo de código seguinte mostra um componente que adiciona linhas a um buffer de saída em PrimeOutput. Os índices das colunas de saída no buffer foram localizados usando PreExecute o exemplo de código anterior.
public override void PrimeOutput(int outputs, int[] outputIDs, PipelineBuffer[] buffers)
{
IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
PipelineBuffer buffer = buffers[0];
SqlDataReader dataReader = sqlCommand.ExecuteReader();
// Loop over the rows in the DataReader,
// and add them to the output buffer.
while (dataReader.Read())
{
// Add a row to the output buffer.
buffer.AddRow();
for (int x = 0; x < columnInformation.Count; x++)
{
ColumnInfo ci = (ColumnInfo)columnInformation[x];
int ordinal = dataReader.GetOrdinal(ci.ColumnName);
if (dataReader.IsDBNull(ordinal))
buffer.SetNull(ci.BufferColumnIndex);
else
{
buffer[ci.BufferColumnIndex] = dataReader[ci.ColumnName];
}
}
}
buffer.SetEndOfRowset();
}
Public Overrides Sub PrimeOutput(ByVal outputs As Integer, ByVal outputIDs As Integer(), ByVal buffers As PipelineBuffer())
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)
Dim buffer As PipelineBuffer = buffers(0)
Dim dataReader As SqlDataReader = sqlCommand.ExecuteReader()
' Loop over the rows in the DataReader,
' and add them to the output buffer.
While (dataReader.Read())
' Add a row to the output buffer.
buffer.AddRow()
For x As Integer = 0 To columnInformation.Count
Dim ci As ColumnInfo = CType(columnInformation(x), ColumnInfo)
Dim ordinal As Integer = dataReader.GetOrdinal(ci.ColumnName)
If (dataReader.IsDBNull(ordinal)) Then
buffer.SetNull(ci.BufferColumnIndex)
Else
buffer(ci.BufferColumnIndex) = dataReader(ci.ColumnName)
End If
Next
End While
buffer.SetEndOfRowset()
End Sub
Exemplo
O exemplo seguinte mostra um componente de origem simples que utiliza um gestor de ligação de ficheiros para carregar o conteúdo binário dos ficheiros no fluxo de dados. Este exemplo não demonstra todos os métodos e funcionalidades discutidos neste tópico. Demonstra os métodos importantes que cada componente fonte personalizado deve sobrepor, mas não contém código para validação em tempo de design.
using System;
using System.IO;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
namespace BlobSrc
{
[DtsPipelineComponent(DisplayName = "BLOB Inserter Source", Description = "Inserts files into the data flow as BLOBs")]
public class BlobSrc : PipelineComponent
{
IDTSConnectionManager100 m_ConnMgr;
int m_FileNameColumnIndex = -1;
int m_FileBlobColumnIndex = -1;
public override void ProvideComponentProperties()
{
IDTSOutput100 output = ComponentMetaData.OutputCollection.New();
output.Name = "BLOB File Inserter Output";
IDTSOutputColumn100 column = output.OutputColumnCollection.New();
column.Name = "FileName";
column.SetDataTypeProperties(DataType.DT_WSTR, 256, 0, 0, 0);
column = output.OutputColumnCollection.New();
column.Name = "FileBLOB";
column.SetDataTypeProperties(DataType.DT_IMAGE, 0, 0, 0, 0);
IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection.New();
conn.Name = "FileConnection";
}
public override void AcquireConnections(object transaction)
{
IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection[0];
m_ConnMgr = conn.ConnectionManager;
}
public override void ReleaseConnections()
{
m_ConnMgr = null;
}
public override void PreExecute()
{
IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
m_FileNameColumnIndex = (int)BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection[0].LineageID);
m_FileBlobColumnIndex = (int)BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection[1].LineageID);
}
public override void PrimeOutput(int outputs, int[] outputIDs, PipelineBuffer[] buffers)
{
string strFileName = (string)m_ConnMgr.AcquireConnection(null);
while (strFileName != null)
{
buffers[0].AddRow();
buffers[0].SetString(m_FileNameColumnIndex, strFileName);
FileInfo fileInfo = new FileInfo(strFileName);
byte[] fileData = new byte[fileInfo.Length];
FileStream fs = new FileStream(strFileName, FileMode.Open, FileAccess.Read, FileShare.Read);
fs.Read(fileData, 0, fileData.Length);
buffers[0].AddBlobData(m_FileBlobColumnIndex, fileData);
strFileName = (string)m_ConnMgr.AcquireConnection(null);
}
buffers[0].SetEndOfRowset();
}
}
}
Imports System
Imports System.IO
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper
Namespace BlobSrc
<DtsPipelineComponent(DisplayName="BLOB Inserter Source", Description="Inserts files into the data flow as BLOBs")> _
Public Class BlobSrc
Inherits PipelineComponent
Private m_ConnMgr As IDTSConnectionManager100
Private m_FileNameColumnIndex As Integer = -1
Private m_FileBlobColumnIndex As Integer = -1
Public Overrides Sub ProvideComponentProperties()
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.New
output.Name = "BLOB File Inserter Output"
Dim column As IDTSOutputColumn100 = output.OutputColumnCollection.New
column.Name = "FileName"
column.SetDataTypeProperties(DataType.DT_WSTR, 256, 0, 0, 0)
column = output.OutputColumnCollection.New
column.Name = "FileBLOB"
column.SetDataTypeProperties(DataType.DT_IMAGE, 0, 0, 0, 0)
Dim conn As IDTSRuntimeConnection90 = ComponentMetaData.RuntimeConnectionCollection.New
conn.Name = "FileConnection"
End Sub
Public Overrides Sub AcquireConnections(ByVal transaction As Object)
Dim conn As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection(0)
m_ConnMgr = conn.ConnectionManager
End Sub
Public Overrides Sub ReleaseConnections()
m_ConnMgr = Nothing
End Sub
Public Overrides Sub PreExecute()
Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)
m_FileNameColumnIndex = CType(BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection(0).LineageID), Integer)
m_FileBlobColumnIndex = CType(BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection(1).LineageID), Integer)
End Sub
Public Overrides Sub PrimeOutput(ByVal outputs As Integer, ByVal outputIDs As Integer(), ByVal buffers As PipelineBuffer())
Dim strFileName As String = CType(m_ConnMgr.AcquireConnection(Nothing), String)
While Not (strFileName Is Nothing)
buffers(0).AddRow
buffers(0).SetString(m_FileNameColumnIndex, strFileName)
Dim fileInfo As FileInfo = New FileInfo(strFileName)
Dim fileData(fileInfo.Length) As Byte
Dim fs As FileStream = New FileStream(strFileName, FileMode.Open, FileAccess.Read, FileShare.Read)
fs.Read(fileData, 0, fileData.Length)
buffers(0).AddBlobData(m_FileBlobColumnIndex, fileData)
strFileName = CType(m_ConnMgr.AcquireConnection(Nothing), String)
End While
buffers(0).SetEndOfRowset
End Sub
End Class
End Namespace
Conteúdo relacionado
Desenvolvimento de um Componente de Destino Personalizado
Criação de uma Fonte com o Componente Script