このガイドでは、Java Message Service (JMS) 2.0 API を使用してAzure Service Busとの通信を成功させるために役立つ詳細情報を提供します。
Java 開発者として、Azure Service Bus を初めて使用する場合は、次の記事をお読みください。
| 作業の開始 | 概念 |
|---|---|
Java Message Service (JMS) プログラミング モデル
Java Message Service API プログラミング モデルについては、次のセクションで説明します。
注
Azure Service Bus Premium レベル では、JMS 1.1 と JMS 2.0 がサポートされます。
Azure Service Bus - Standard レベルでは、制限付きの JMS 1.1 機能がサポートされています。 詳細については、 このドキュメントを参照してください。
JMS - 構成要素
JMS アプリケーションと通信するには、次の構成要素を使用します。
注
このガイドは、oracle Java EE 6 Tutorial for Java Message Service (JMS) に適合しています。
Java メッセージ サービス (JMS) の理解を深めるために、このチュートリアルを参照してください。
接続ファクトリ
注
azure-servicebus-jms ライブラリは、com.azure:azure-servicebus-jms () 用のjakarta.jms.* (バージョン 2.0.0 以降) と com.microsoft.azure:azure-servicebus-jms EE () のjavax.jms.* (バージョン 1.0.x) の 2 つのバリエーションで使用できます。 適切な成果物の選択に関するガイダンスについては、 Jakarta EE と javax のサポートを参照してください。
クライアントは、接続ファクトリ オブジェクトを使用して JMS プロバイダーに接続します。 接続ファクトリは、管理者が定義する一連の接続構成パラメーターをカプセル化します。
各接続ファクトリは、 ConnectionFactory、 QueueConnectionFactory、または TopicConnectionFactory インターフェイスのインスタンスです。
これらのインターフェイスは、Azure Service Busへの接続を簡略化するために、それぞれ ServiceBusJmsConnectionFactory、ServiceBusJmsQueueConnectionFactory、または ServiceBusJmsTopicConnectionFactory によって実装されます。
重要
JMS 2.0 API を使用するJavaアプリケーションは、接続文字列を使用するか、TokenCredential を使用してMicrosoft Entraに基づく認証を利用することで、Azure Service Busに接続できます。 Microsoft Entraバックアップされた認証を使用する場合は、必要に応じてロールとアクセス許可を ID に割り当てるようにします。
Azure で システム割り当てマネージド ID を 作成し、この ID を使用して TokenCredentialを作成します。
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
次のパラメーターを使用して、接続ファクトリをインスタンス化できます。
- トークン資格情報 - OAuth トークンを提供できる資格情報を表します。
- ホスト - Azure Service Bus Premium レベルの名前空間のホスト名。
- ServiceBusJmsConnectionFactorySettings プロパティバッグで、次のものが含まれます。
-
connectionIdleTimeoutMS- アイドル接続タイムアウトはミリ秒です。 -
traceFrames- デバッグ用の AMQP トレース フレームを収集するためのブール型フラグ。 - その他の構成パラメーター。
-
次の例に示すように、ファクトリを作成します。 トークン資格情報とホストは必須パラメーターですが、他のプロパティは省略可能です。
String host = "<YourNamespaceName>.servicebus.windows.net";
ConnectionFactory factory = new ServiceBusJmsConnectionFactory(tokenCredential, host, null);
JMS の送信先
宛先は、クライアントが生成するメッセージのターゲットと、クライアントが使用するメッセージのソースを指定するために使用するオブジェクトです。
送信先は、Azure Service Bus 内のエンティティにマッピングされます - キュー (ポイント間のシナリオの場合) とトピック (パブリッシュ/サブスクライブのシナリオの場合)。
接続
接続は、JMS プロバイダーとの仮想接続をカプセル化します。 Azure Service Bus では、AMQP 経由のアプリケーションと Azure Service Bus の間のステートフル接続を表します。
次の例に示すように、接続ファクトリから接続を作成します。
Connection connection = factory.createConnection();
セッション
セッションは、メッセージを生成および使用するためのシングル スレッド コンテキストです。 これを使用して、メッセージ、メッセージ プロデューサー、およびコンシューマーを作成します。 また、送信と受信をアトミック作業単位にグループ化するためのトランザクション コンテキストも提供します。
次の例に示すように、接続オブジェクトからセッションを作成します。
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
注
JMS API は、メッセージング・セッションが有効になっている Service Bus キューまたはトピックからのメッセージの受信をサポートしていません。
セッション モード
次のいずれかのモードでセッションを作成します。
| セッション モード | 行動 |
|---|---|
| Session.AUTO_ACKNOWLEDGE | セッションは、受信する呼び出しからセッションが正常に戻ったとき、またはメッセージを処理するためにセッションが呼び出したメッセージ リスナーが正常に返すときに、クライアントによるメッセージの受信を自動的に確認します。 |
| Session.CLIENT_ACKNOWLEDGE | クライアントは、メッセージの受信確認メソッドを呼び出すことによって、使用されたメッセージを確認します。 |
| Session.DUPS_OK_ACKNOWLEDGE | この受信確認モードでは、メッセージの遅延配信を行うよう求める指示がセッションに送られます。 |
| Session.SESSION_TRANSACTED | この値を引数として Connection オブジェクトの createSession(int sessionMode) メソッドに渡して、セッションでローカル トランザクションを使用するように指定します。 |
セッション モードを指定しない場合、既定値は Session.AUTO_ACKNOWLEDGE です。
JMSContext
注
JMSContext は、JMS 2.0 仕様の一部として定義されています。
JMSContext は、接続オブジェクトとセッション オブジェクトによって提供される機能を組み合わせたものです。 接続ファクトリ オブジェクトから作成します。
JMSContext context = connectionFactory.createContext();
JMSContext のモード
Session オブジェクトと同様に、セッション モードで説明したのと同じ受信確認モードを使用して JMSContext を作成できます。
JMSContext context = connectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE);
モードを指定しない場合、既定値は JMSContext.AUTO_ACKNOWLEDGE です。
JMS メッセージ プロデューサー
メッセージ プロデューサーは、JMSContext または Session を使用して作成するオブジェクトです。 送信先にメッセージを送信する場合に使用します。
次の例に示すように、スタンドアロン オブジェクトとして作成できます。
JMSProducer producer = context.createProducer();
または、メッセージを送信する必要があるときに実行時に作成することもできます。
context.createProducer().send(destination, message);
JMS メッセージ コンシューマー
メッセージ コンシューマーは、JMSContext または Session によって作成されるオブジェクトです。 これを使用して、送信先に送信されたメッセージを受信します。 次の例に示すように作成します。
JMSConsumer consumer = context.createConsumer(dest);
receive() メソッドを介した同期受信
メッセージ コンシューマーは、 receive() メソッドを介して送信先からメッセージを受信する同期的な方法を提供します。
引数またはタイムアウトを指定しない場合、または 0のタイムアウトを指定した場合、メッセージが到着しない限り、または接続が切断されない限り、コンシューマーは無期限にブロックされます (どちらか早い方)。
Message m = consumer.receive();
Message m = consumer.receive(0);
0 以外の正の引数を指定すると、コンシューマーはタイマーの有効期限が切れるまでブロックします。
Message m = consumer.receive(1000); // time out after one second.
JMS メッセージ リスナーを使用した非同期受信
メッセージ リスナーは、送信先でのメッセージの非同期処理に使用するオブジェクトです。
MessageListener インターフェイスを実装します。このインターフェイスには、特定のビジネス ロジックを実行する必要があるonMessage メソッドが含まれています。
setMessageListener メソッドを使用して、メッセージ リスナー オブジェクトをインスタンス化し、特定のメッセージ コンシューマーに対して登録する必要があります。
Listener myListener = new Listener();
consumer.setMessageListener(myListener);
トピックからの使用
JMS メッセージ・コンシューマーをキューまたはトピックである宛先に対して作成します。
キュー上のコンシューマーは、クライアント アプリケーションとAzure Service Busの間のセッション (および接続) のコンテキストに存在する単なるクライアント側オブジェクトです。
ただし、トピックに関するコンシューマーには、次の 2 つの部分があります。
- Session (または JMSContext) のコンテキストに存在する クライアント側オブジェクト 。
- Azure Service Bus 上のエンティティである サブスクリプション 。
サブスクリプションは ここに 記載されており、次のいずれかの種類にすることができます。
- 共有の永続的サブスクリプション
- 共有の非永続的サブスクリプション
- 非共有の永続的サブスクリプション
- 非共有の非永続的サブスクリプション
JMS キュー ブラウザー
JMS API は、アプリケーションがキュー内のメッセージを参照し、各メッセージのヘッダー値を表示するために使用できる QueueBrowser オブジェクトを提供します。
次の例に示すように、JMSContext を使用してキュー ブラウザーを作成できます。
QueueBrowser browser = context.createBrowser(queue);
注
JMS API には、トピックを参照するための API は用意されていません。
この制限は、トピック自体にメッセージが格納されないために存在します。 メッセージがトピックに送信されるとすぐに、適切なサブスクリプションに転送されます。
JMS メッセージ セレクター
受信側アプリケーションでは、メッセージ セレクターを使用して、受信したメッセージをフィルター処理できます。 メッセージ セレクターを使用すると、受信側のアプリケーションは、その責任を負うのではなく、JMS プロバイダー (この場合はAzure Service Bus) にメッセージをフィルター処理する作業をオフロードします。
セレクターは、次のいずれかのコンシューマーを作成するときに使用できます。
- 共有永続サブスクリプション
- 非共有の永続的なサブスクリプション
- 非永続的な共有サブスクリプション
- 非共有の非永続的サブスクリプション
- キュー コンシューマー
- キュー ブラウザー
注
Service Bus セレクターは、LIKE および BETWEEN SQL キーワードをサポートしていません。
スケジュールされたメッセージ (配信の遅延)
JMS 2.0 では、setDeliveryDelayまたはMessageProducerで JMSProducer メソッドを使用して、メッセージの将来の配信のスケジュールを設定できます。 このプロパティを設定すると、Service Busはメッセージを受け入れますが、遅延期間が経過した後にのみコンシューマーに表示されます。
MessageProducer producer = session.createProducer(queue);
// Schedule a message for delivery 30 seconds from now
producer.setDeliveryDelay(30000);
producer.send(session.createTextMessage("Scheduled message"));
完全な作業サンプルについては、azure-servicebus-jms-samples リポジトリの QueueScheduledSend.java を参照してください。
接続ファクトリの選定と信頼性
Spring Boot または JMS 接続を管理する他のフレームワークで ServiceBusJmsConnectionFactory を使用する場合は、信頼できる操作を確保するために 、送信側 と リスナー に適した接続ファクトリ ラッパーを選択します。
推奨される構成
| Role | 接続ファクトリ | なぜでしょうか |
|---|---|---|
送信者 (JmsTemplate) |
CachingConnectionFactory 包装 ServiceBusJmsConnectionFactory |
JmsTemplate は、既定で送信ごとに接続を作成して閉じます。
CachingConnectionFactory では、単一の AMQP 接続が維持され、セッションがキャッシュされるため、負荷の下でブローカー リソースを使い果たす可能性のある接続変更を回避できます。 |
リスナー (@JmsListener、 DefaultMessageListenerContainer) |
未加工の ServiceBusJmsConnectionFactory (ラップ未使用) |
各リスナー コンテナーは、独立したライフサイクルを持つ独自の AMQP 接続を取得します。 接続に失敗した場合 (トークンの有効期限、ゲートウェイのアップグレード、ネットワーク の省略)、そのリスナーのみが影響を受け、Spring によって接続が自動的に再作成されます。 |
リスナーが避けるべきこと
Warnung
リスナー コンテナーで SingleConnectionFactory を使用しないでください。 すべてのリスナーが 1 つの JMS 接続を共有するように強制されます。 何らかの理由でその接続が中断された場合、すべてのリスナーが同時に接続を失い、個別に復旧することはできません。 生の ServiceBusJmsConnectionFactory を使用して、各リスナー コンテナーが独自の接続を管理します。
CachingConnectionFactory また、キャッシュされたセッションが古い基になる接続を参照する可能性があるため、リスナー コンテナーで問題が発生する可能性もあります。 リスナーの場合、ロー・ファクトリーにより、各コンテナが独立して新規接続を確立できるようになります。
Spring Cloud Azureの既定値
spring-cloud-azure-starter-servicebus-jms (バージョン 6.2.0 以降) を使用する場合、スターターは既定でこのファクトリ分離を適用します。
spring.jms.servicebus.pool.enabled |
spring.jms.cache.enabled |
送信者ファクトリ | リスナー ファクトリ |
|---|---|---|---|
| (設定されていません) | (設定されていません) | CachingConnectionFactory |
ServiceBusJmsConnectionFactory |
| (設定されていません) | true |
CachingConnectionFactory |
CachingConnectionFactory |
| (設定されていません) | false |
ServiceBusJmsConnectionFactory |
ServiceBusJmsConnectionFactory |
true |
(設定されていません) | JmsPoolConnectionFactory |
JmsPoolConnectionFactory |
以前のバージョン (6.2.0 より前) では、送信者とリスナーの両方が既定で ServiceBusJmsConnectionFactory を使用するため、送信者は送信ごとに新しい接続を作成します。
例外リスナーの追加
例外リスナーがない場合、接続の切断は完全に無音になります。 監視できるように、送信者とリスナーの両方のファクトリに jakarta.jms.ExceptionListener を追加します。
connection.setExceptionListener(exception -> {
log.error("JMS connection error: {}", exception.getMessage(), exception);
});
Spring Boot で、 CachingConnectionFactory (送信者の場合) と DefaultJmsListenerContainerFactory (リスナーの場合) に例外リスナーを設定します。
これらのすべてのパターンを示す完全な作業サンプルについては、azure-servicebus-jms-samples リポジトリの Spring Boot JMS Resilience サンプルを参照してください。
配信不能キュー
Azure Service Bus内のすべてのキューとトピックのサブスクリプションには、デッドレターキュー (DLQ) が関連付けられています。 システムは、DLQ に配信または処理できないメッセージを自動的に移動します。 たとえば、メッセージが最大配信数を超えた場合、または有効期限 (TTL) が切れると、システムはメッセージを DLQ に移動します。
重要
TTL 期限切れのメッセージを DLQ に移動するには、キューまたはサブスクリプションでメッセージの有効期限切れ時のデッドレタリングを有効にします。 この設定を行わないと、システムは期限切れのメッセージを自動的に破棄します。 構成手順については、「キューまたはサブスクリプションの配信不能を有効にする」を参照してください。
JMS では、完全パスを構築し、それを使用して JmsQueue を作成することで、DLQ に別の宛先としてアクセスします。 特別な API は必要ありません。
キュー DLQ パスの形式:
<queue-name>/$deadletterqueue
トピック サブスクリプション DLQ パスの形式:
<topic-name>/Subscriptions/<subscription-name>/$deadletterqueue
例 - キューの配信不能キューからの使用:
import org.apache.qpid.jms.JmsQueue;
// Construct the DLQ path for a queue named "orders"
String dlqPath = "orders/$deadletterqueue";
JmsQueue dlqDestination = new JmsQueue(dlqPath);
// Create a consumer on the DLQ and receive messages
MessageConsumer dlqConsumer = session.createConsumer(dlqDestination);
Message message = dlqConsumer.receive(5000);
デッドレター メッセージには、メッセージが配信不能となった理由を説明するメタデータ プロパティが含まれます。
| 財産 | Description |
|---|---|
DeadLetterReason |
メッセージが配信不能になった理由 ( TTLExpiredException や MaxDeliveryCountExceededなど)。 |
DeadLetterErrorDescription |
配信不能の理由を人間が判読できる説明。 |
message.getStringProperty()を使用して、これらのプロパティを読み取る:
String reason = message.getStringProperty("DeadLetterReason");
String description = message.getStringProperty("DeadLetterErrorDescription");
完全な作業サンプルについては、azure-servicebus-jms-samples リポジトリの QueueDeadLetterReceive.java を参照してください。
AMQP のディスポジションと Service Bus 操作のマッピング
AMQP の処理が Service Bus 操作にどのように変換されるかを次に示します。
ACCEPTED = 1; -> Complete()
REJECTED = 2; -> DeadLetter()
RELEASED = 3; (just unlock the message in service bus, will then get redelivered)
MODIFIED_FAILED = 4; -> Abandon() which increases delivery count
MODIFIED_FAILED_UNDELIVERABLE = 5; -> Defer()
概要
この開発者ガイドでは、Java Message Service (JMS) を使用するクライアント アプリケーションJava Azure Service Busに接続する方法について説明します。
次のステップ
Azure Service Busの詳細と、Java Message Service (JMS) エンティティの詳細については、次の記事を参照してください。