フィルターは、関数の実行方法とタイミングを制御して可視性を提供することで、セキュリティを強化します。 これは、ソリューションがエンタープライズ対応であることを確信できるように、責任ある AI 原則を作業に導入するために必要です。
たとえば、承認フローが開始される前に、フィルターを利用してアクセス許可を検証します。 フィルターが実行され、承認を送信しようとしているユーザーのアクセス許可が確認されます。 つまり、プロセスを開始できるのは、選択したユーザーグループだけです。
フィルターの良い例は ここに フィルターに関するセマンティック カーネルのブログ記事で提供されています。
フィルターには次の 3 種類があります。
関数呼び出しフィルター - このフィルターは、
KernelFunctionが呼び出されるたびに実行されます。 次のことができます。- 実行中の関数とその引数に関する情報へのアクセス
- 関数の実行中の例外の処理
- 関数の結果をオーバーライドすることは、実行前(たとえば、キャッシュシナリオの場合)や実行後(たとえば、責任あるAIシナリオの場合)に行うことができます。
- 障害が発生した場合の関数の再試行 ( 代替 AI モデルへの切り替えなど)
プロンプト レンダリング フィルター - このフィルターは、プロンプトのレンダリング操作の前にトリガーされ、次の機能が有効になります。
- AI に送信されるプロンプトの表示と変更 (RAG や PII の編集など)
- 関数の結果をオーバーライドして AI へのプロンプト送信を防止する (セ マンティック キャッシュの場合など)
関数呼び出しフィルター - このフィルターは、
KernelFunctionが呼び出されるたびに実行されます。 次のことができます。- 実行中の関数とその引数に関する情報へのアクセス
- 関数の実行中の例外の処理
- 実行前(たとえばキャッシュシナリオの場合)または実行後(たとえば責任あるAIシナリオの場合)における関数結果のオーバーライド
- 障害が発生した場合の関数の再試行 ( 代替 AI モデルへの切り替えなど)
プロンプト レンダリング フィルター - このフィルターは、プロンプトのレンダリング操作の前にトリガーされ、次の機能が有効になります。
- AI に送信されるプロンプトの表示と変更
- 関数の結果をオーバーライドして AI へのプロンプト送信を防止する (セ マンティック キャッシュの場合など)
関数呼び出しフィルター - このフィルターは、
KernelFunctionが呼び出されるたびに実行されます。 次のことができます。- 実行中の関数とその引数に関する情報へのアクセス
- 関数の実行中の例外の処理
- 関数の結果のオーバーライドは、実行前(たとえば、キャッシュシナリオの場合)や実行後(たとえば、責任あるAIシナリオの場合)に行われます。
- エラーが発生した場合の関数の再試行
プロンプト レンダリング フィルター - このフィルターは、プロンプトのレンダリング操作の前にトリガーされ、次の機能が有効になります。
- AI に送信されるプロンプトの表示と変更
- 関数の結果をオーバーライドして AI へのプロンプト送信を防止する
-
自動関数呼び出しフィルター - 関数呼び出しフィルターと同様に、このフィルターは
automatic function callingのスコープ内で動作し、チャット履歴、実行されるすべての関数の一覧、反復カウンターなどの追加のコンテキストを提供します。 また、自動関数呼び出しプロセスの終了も可能になります (たとえば、3 つの計画された関数の 2 番目から目的の結果が得られた場合など)。
各フィルターには、関数の実行またはプロンプトのレンダリングに関するすべての関連情報を含む context オブジェクトが含まれています。 さらに、各フィルターには、パイプラインまたは関数自体で次のフィルターを実行するための next デリゲート/コールバックがあり、関数の実行を制御できます (悪意のあるプロンプトや引数の場合など)。 同じ種類の複数のフィルターを登録できます。それぞれ独自の責任を持つ必要があります。
フィルターでは、 next デリゲートの呼び出しは、次に登録されたフィルターまたは元の操作 (関数の呼び出しまたはプロンプトのレンダリング) に進むのに不可欠です。
nextを呼び出さないと、操作は実行されません。
フィルターを使用するには、まずフィルターを定義してから、依存関係の挿入または適切なKernel プロパティを使用してKernel オブジェクトに追加します。 依存関係の挿入を使用する場合、フィルターの順序は保証されないため、複数のフィルターでは実行順序が予測できない場合があります。
フィルターを使用するには、必要なパラメーターを持つ関数を定義し、Kernel メソッドを使用してadd_filter オブジェクトに登録するか (FilterTypes値またはその文字列に相当する値を渡す)、@kernel.filterデコレーターを使用してフィルターを定義して登録します。
関数呼び出しフィルター
このフィルターは、プロンプトまたはメソッドから作成された関数であるかどうかに関係なく、Semantic Kernel関数が呼び出されるたびにトリガーされます。
/// <summary>
/// Example of function invocation filter to perform logging before and after function invocation.
/// </summary>
public sealed class LoggingFilter(ILogger logger) : IFunctionInvocationFilter
{
public async Task OnFunctionInvocationAsync(FunctionInvocationContext context, Func<FunctionInvocationContext, Task> next)
{
logger.LogInformation("FunctionInvoking - {PluginName}.{FunctionName}", context.Function.PluginName, context.Function.Name);
await next(context);
logger.LogInformation("FunctionInvoked - {PluginName}.{FunctionName}", context.Function.PluginName, context.Function.Name);
}
}
依存関係の挿入を使用してフィルターを追加します。
IKernelBuilder builder = Kernel.CreateBuilder();
builder.Services.AddSingleton<IFunctionInvocationFilter, LoggingFilter>();
Kernel kernel = builder.Build();
Kernel プロパティを使用してフィルターを追加します。
kernel.FunctionInvocationFilters.Add(new LoggingFilter(logger));
コードの例
import logging
from typing import Awaitable, Callable
from semantic_kernel.filters import FilterTypes, FunctionInvocationContext
logger = logging.getLogger(__name__)
async def logger_filter(context: FunctionInvocationContext, next: Callable[[FunctionInvocationContext], Awaitable[None]]) -> None:
logger.info(f"FunctionInvoking - {context.function.plugin_name}.{context.function.name}")
await next(context)
logger.info(f"FunctionInvoked - {context.function.plugin_name}.{context.function.name}")
# Add filter to the kernel
kernel.add_filter(FilterTypes.FUNCTION_INVOCATION, logger_filter)
@kernel.filterデコレーターを使用して、フィルターを直接登録することもできます。
@kernel.filter(FilterTypes.FUNCTION_INVOCATION)
async def logger_filter(context: FunctionInvocationContext, next: Callable[[FunctionInvocationContext], Awaitable[None]]) -> None:
logger.info(f"FunctionInvoking - {context.function.plugin_name}.{context.function.name}")
await next(context)
logger.info(f"FunctionInvoked - {context.function.plugin_name}.{context.function.name}")
コードの例
詳細については、近日公開予定です。
プロンプト レンダリング フィルター
このフィルターは、プロンプトから作成された関数が呼び出されたときなど、プロンプトのレンダリング操作中にのみ呼び出されます。 メソッドから作成されたSemantic Kernel関数に対してはトリガーされません。
/// <summary>
/// Example of prompt render filter which overrides rendered prompt before sending it to AI.
/// </summary>
public class SafePromptFilter : IPromptRenderFilter
{
public async Task OnPromptRenderAsync(PromptRenderContext context, Func<PromptRenderContext, Task> next)
{
// Example: get function information
var functionName = context.Function.Name;
await next(context);
// Example: override rendered prompt before sending it to AI
context.RenderedPrompt = "Safe prompt";
}
}
依存関係の挿入を使用してフィルターを追加します。
IKernelBuilder builder = Kernel.CreateBuilder();
builder.Services.AddSingleton<IPromptRenderFilter, SafePromptFilter>();
Kernel kernel = builder.Build();
Kernel プロパティを使用してフィルターを追加します。
kernel.PromptRenderFilters.Add(new SafePromptFilter());
コードの例
from typing import Awaitable, Callable
from semantic_kernel.filters import FilterTypes, PromptRenderContext
async def safe_prompt_filter(
context: PromptRenderContext,
next: Callable[[PromptRenderContext], Awaitable[None]],
) -> None:
# Example: get function information
function_name = context.function.name
await next(context)
# Example: override the rendered prompt before sending it to the AI
context.rendered_prompt = f"Safe prompt: {context.rendered_prompt or ''}"
# Register the filter on the kernel
kernel.add_filter(FilterTypes.PROMPT_RENDERING, safe_prompt_filter)
@kernel.filterデコレーターを使用して、フィルターを直接登録することもできます。
@kernel.filter(FilterTypes.PROMPT_RENDERING)
async def prompt_rendering_filter(context: PromptRenderContext, next):
await next(context)
context.rendered_prompt = f"You pretend to be Mosscap, but you are Papssom who is the opposite of Moscapp in every way {context.rendered_prompt or ''}"
コードの例
詳細については、近日公開予定です。
自動関数呼び出しフィルター
このフィルターは、関数の自動呼び出しプロセス中にのみ呼び出されます。 このプロセスの外部で関数が呼び出されたときにはトリガーされません。
/// <summary>
/// Example of auto function invocation filter which terminates function calling process as soon as we have the desired result.
/// </summary>
public sealed class EarlyTerminationFilter : IAutoFunctionInvocationFilter
{
public async Task OnAutoFunctionInvocationAsync(AutoFunctionInvocationContext context, Func<AutoFunctionInvocationContext, Task> next)
{
// Call the function first.
await next(context);
// Get a function result from context.
var result = context.Result.GetValue<string>();
// If the result meets the condition, terminate the process.
// Otherwise, the function calling process will continue.
if (result == "desired result")
{
context.Terminate = true;
}
}
}
依存関係の挿入を使用してフィルターを追加します。
IKernelBuilder builder = Kernel.CreateBuilder();
builder.Services.AddSingleton<IAutoFunctionInvocationFilter, EarlyTerminationFilter>();
Kernel kernel = builder.Build();
Kernel プロパティを使用してフィルターを追加します。
kernel.AutoFunctionInvocationFilters.Add(new EarlyTerminationFilter());
コードの例
from semantic_kernel.filters import FilterTypes, AutoFunctionInvocationContext
@kernel.filter(FilterTypes.AUTO_FUNCTION_INVOCATION)
async def auto_function_invocation_filter(context: AutoFunctionInvocationContext, next):
await next(context)
if context.function_result == "desired result":
context.terminate = True
他のフィルターの種類と同様に、 kernel.add_filterを使用してフィルターを登録することもできます。
kernel.add_filter(FilterTypes.AUTO_FUNCTION_INVOCATION, auto_function_invocation_filter)
コードの例
詳細については、近日公開予定です。
ストリーミング呼び出しと非ストリーミング呼び出し
Semantic Kernelの関数は、ストリーミングと非ストリーミングの 2 つの方法で呼び出すことができます。 ストリーミング モードでは、関数は通常、 IAsyncEnumerable<T>を返しますが、非ストリーミング モードでは FunctionResultを返します。 この違いは、フィルターで結果をオーバーライドする方法に影響します。ストリーミング モードでは、新しい関数の結果値は IAsyncEnumerable<T>型である必要があります。非ストリーミング モードでは、単に T型にすることができます。 返す必要がある結果の種類を判断するには、フィルター コンテキスト モデルで context.IsStreaming フラグを使用できます。
/// <summary>Filter that can be used for both streaming and non-streaming invocation modes at the same time.</summary>
public sealed class DualModeFilter : IFunctionInvocationFilter
{
public async Task OnFunctionInvocationAsync(FunctionInvocationContext context, Func<FunctionInvocationContext, Task> next)
{
// Call next filter in pipeline or actual function.
await next(context);
// Check which function invocation mode is used.
if (context.IsStreaming)
{
// Return IAsyncEnumerable<string> result in case of streaming mode.
var enumerable = context.Result.GetValue<IAsyncEnumerable<string>>();
context.Result = new FunctionResult(context.Result, OverrideStreamingDataAsync(enumerable!));
}
else
{
// Return just a string result in case of non-streaming mode.
var data = context.Result.GetValue<string>();
context.Result = new FunctionResult(context.Result, OverrideNonStreamingData(data!));
}
}
private async IAsyncEnumerable<string> OverrideStreamingDataAsync(IAsyncEnumerable<string> data)
{
await foreach (var item in data)
{
yield return $"{item} - updated from filter";
}
}
private string OverrideNonStreamingData(string data)
{
return $"{data} - updated from filter";
}
}
でフィルターを使用する IChatCompletionService
IChatCompletionServiceがKernelではなく直接使用される場合、フィルターは、Kernel オブジェクトがチャット完了サービス メソッドにパラメーターとして渡された場合にのみ呼び出されます。フィルターは、Kernel インスタンスにアタッチされるためです。
Kernel kernel = Kernel.CreateBuilder()
.AddOpenAIChatCompletion("gpt-4", "api-key")
.Build();
kernel.FunctionInvocationFilters.Add(new MyFilter());
IChatCompletionService chatCompletionService = kernel.GetRequiredService<IChatCompletionService>();
// Passing a Kernel here is required to trigger filters.
ChatMessageContent result = await chatCompletionService.GetChatMessageContentAsync(chatHistory, executionSettings, kernel);
Semantic Kernelの関数は、ストリーミングと非ストリーミングの 2 つの方法で呼び出すことができます。 ストリーミング モードでは、関数は通常、AsyncGenerator[T]がストリーミング コンテンツ タイプの一種であるT オブジェクトを返しますが、非ストリーミング モードではFunctionResultを返します。 この違いは、フィルターで結果をオーバーライドする方法に影響します。ストリーミング モードでは、新しい関数の結果値も AsyncGenerator[T]型である必要があります。 返す必要がある結果の種類を決定するには、すべてのフィルター コンテキスト モデルで context.is_streaming フラグを使用できます。
そのため、ストリーミング関数呼び出し用の単純なロガー フィルターを構築するには、次のように使用します。
@kernel.filter(FilterTypes.FUNCTION_INVOCATION)
async def streaming_exception_handling(
context: FunctionInvocationContext,
next: Callable[[FunctionInvocationContext], Awaitable[None]],
):
await next(context)
if not context.is_streaming:
return
async def override_stream(stream):
try:
async for partial in stream:
yield partial
except Exception as e:
yield [
StreamingChatMessageContent(role=AuthorRole.ASSISTANT, content=f"Exception caught: {e}", choice_index=0)
]
stream = context.result.value
context.result = FunctionResult(function=context.result.function, value=override_stream(stream))
コードの例
詳細については、近日公開予定です。
順序付け
依存関係の挿入を使用する場合、フィルターの順序は保証されません。 フィルターの順序が重要な場合は、適切なプロパティを使用して Kernel オブジェクトに直接フィルターを追加することをお勧めします。 この方法では、実行時にフィルターを追加、削除、または並べ替えることができます。
フィルターは、 add_filter または @kernel.filter デコレーターを使用して、カーネル オブジェクトに追加された順序で実行されます。 実行順序は動作に影響を与える可能性があるため、フィルターの順序を慎重に管理することが重要です。
次の例を確認してください。
def func():
print('function')
@kernel.filter(FilterTypes.FUNCTION_INVOCATION)
async def filter1(context: FunctionInvocationContext, next):
print('before filter 1')
await next(context)
print('after filter 1')
@kernel.filter(FilterTypes.FUNCTION_INVOCATION)
async def filter2(context: FunctionInvocationContext, next):
print('before filter 2')
await next(context)
print('after filter 2')
関数を実行すると、出力は次のようになります。
before filter 1
before filter 2
function
after filter 2
after filter 1