Kafka
Kafka プラグインを使用すると、 Kafka(英語) イベントストリーミングプロセスを監視し、コンシューマー、プロデューサー、トピックを作成できます。 また、スキーマレジストリに接続し、スキーマを作成および更新することもできます。
Kafka プラグインをインストールする
この機能は、インストールして有効にする必要がある Kafka(英語) プラグインに依存しています。
Ctrl+Alt+S を押して設定を開き、 を選択します。
Marketplace タブを開き、 Kafka プラグインを見つけて、 インストール をクリックします (プロンプトが表示されたら、IDE を再起動します)。
Kafka プラグインを使用すると、次のことが可能になります。
接続先:
Kafka プラグインがインストールされ有効になっている場合は、 Kafka ツールウィンドウ () を使用して Kafka に接続し、操作することができます。 あるいは、 リモートファイルシステム または Zeppelin プラグインがインストールされ有効になっている場合は、 ビッグデータツール ツールウィンドウ () を使用して Kafka 接続にアクセスすることもできます。
Kafka に接続する
クラウドプロバイダーを使用して Kafka に接続する
Confluent クラスターに接続する
Kafka ツールウィンドウを開きます: 。
(新規接続) をクリックします。
「名前 」フィールドに、他の接続と区別するための接続の名前を入力します。
構成ソース リストで クラウド を選択し、次に プロバイダー リストで Confluent を選択します。
https://confluent.cloud/home(英語) に進みます。 ページの右側で、設定メニューをクリックし、 環境 を選択し、クラスターを選択して、 を選択します。
クライアントの構成スニペットをコピーする ブロックで、Kafka API キーを指定し、 コピー をクリックします。
IDE に戻り、コピーしたプロパティを 構成 フィールドに貼り付けます。
設定を入力したら、 接続のテスト をクリックして、すべての構成パラメーターが正しいことを確認します。 次に OK をクリックします。

オプションで、次を設定できます。
接続を有効にする: この接続を無効にする場合は、チェックボックスをオフにしてください。 デフォルトでは、新しく作成された接続は有効になっています。
プロジェクト単位 ごと: これらの接続設定を現在のプロジェクトでのみ有効にするには、チェックボックスをオンにしてください。 この接続を他のプロジェクトでも表示したい場合は、チェックボックスをオフにしてください。
AWS MSK クラスターに接続する
Kafka ツールウィンドウを開きます: 。
(新規接続) をクリックします。
「名前 」フィールドに、他の接続と区別するための接続の名前を入力します。
構成ソース リストで クラウド を選択し、次に プロバイダー リストで AWS MSK を選択します。
「Bootstrap サーバー 」フィールドに、Kafka ブローカーの URL、または URL のコンマ区切りリストを入力します。
AWS 認証 リストで認証方法を選択します。
デフォルトの資格情報プロバイダーチェーン: デフォルトのプロバイダーチェーンの資格情報を使用します。 チェーンの詳細については、「デフォルトの資格情報プロバイダーチェーンの使用(英語) 」を参照してください。
資格情報ファイルからのプロファイル: ファイルからプロファイルを選択します。
明示的なアクセスキーと秘密鍵: 資格情報を手動で入力します。
オプションで、 スキーマレジストリに接続できます。
Kafka への接続中に SSH トンネルを使用する場合は、 トンネリングを有効にする を選択し、 SSH 構成 リストで SSH 構成を選択するか、新しい構成を作成します。
設定を入力したら、 接続のテスト をクリックして、すべての構成パラメーターが正しいことを確認します。 次に OK をクリックします。

オプションで、次を設定できます。
接続を有効にする: この接続を無効にする場合は、チェックボックスをオフにしてください。 デフォルトでは、新しく作成された接続は有効になっています。
プロジェクト単位 ごと: これらの接続設定を現在のプロジェクトでのみ有効にするには、チェックボックスをオンにしてください。 この接続を他のプロジェクトでも表示したい場合は、チェックボックスをオフにしてください。
カスタム Kafka サーバーに接続する
Kafka ツールウィンドウを開きます: 。
(新規接続) をクリックします。
「名前 」フィールドに、他の接続と区別するための接続の名前を入力します。
構成ソース リストで、 カスタム を選択します。
「Bootstrap サーバー 」フィールドに、Kafka ブローカーの URL、または URL のコンマ区切りリストを入力します。
認証 で、認証方法を選択します。
なし: 認証なしで接続します。
SASL :SASL メカニズム(Plain、SCRAM-SHA-256、SCRAM-SHA-512、 Kerberos )を選択し、ユーザー名とパスワードを入力します。
SSL
ブローカーのホスト名がブローカー証明書のホスト名と一致することを確認する場合は、 サーバーのホスト名を検証する を選択します。 チェックボックスをオフにすることは、
ssl.endpoint.identification.algorithm=プロパティを追加することと同じです。トラストストアの場所 で、SSL トラストストアのロケーションへのパス(
ssl.truststore.locationプロパティ)を指定します。トラストストアのパスワード で、SSL トラストストアのパスワードへのパス(
ssl.truststore.passwordプロパティ)を指定します。キーストアクライアント認証を使用する を選択し、 鍵ストアのロケーション (
ssl.keystore.location)、 鍵ストアパスワード (ssl.keystore.password)、および 鍵パスワード (ssl.key.password) の値を指定します。
AWS IAM: Amazon MSK には AWS IAM を使用します。 AWS 認証 リストで、次のいずれかを選択します。
デフォルトの資格情報プロバイダーチェーン: デフォルトのプロバイダーチェーンの資格情報を使用します。 チェーンの詳細については、「デフォルトの資格情報プロバイダーチェーンの使用(英語) 」を参照してください。
資格情報ファイルからのプロファイル: ファイルからプロファイルを選択します。
明示的なアクセスキーと秘密鍵: 資格情報を手動で入力します。
オプションで、 スキーマレジストリに接続できます。
Kafka への接続中に SSH トンネルを使用する場合は、 トンネリングを有効にする を選択し、 SSH 構成 リストで SSH 構成を選択するか、新しい構成を作成します。
設定を入力したら、 接続のテスト をクリックして、すべての構成パラメーターが正しいことを確認します。 次に OK をクリックします。

オプションで、次を設定できます。
接続を有効にする: この接続を無効にする場合は、チェックボックスをオフにしてください。 デフォルトでは、新しく作成された接続は有効になっています。
プロジェクト単位 ごと: これらの接続設定を現在のプロジェクトでのみ有効にするには、チェックボックスをオンにしてください。 この接続を他のプロジェクトでも表示したい場合は、チェックボックスをオフにしてください。
プロパティを使用して Kafka に接続する
Kafka ツールウィンドウを開きます: 。
(新規接続) をクリックします。
「名前 」フィールドに、他の接続と区別するための接続の名前を入力します。
構成ソース リストで、 プロパティ を選択します。
「Bootstrap サーバー 」フィールドに、Kafka ブローカーの URL、または URL のコンマ区切りリストを入力します。
Kafka ブローカー構成プロパティを提供する方法を選択します。
暗黙的: 提供された構成プロパティを貼り付けます。 または、コード補完と IntelliJ IDEA が提供するクイックドキュメントを使用して手動で入力することもできます。
ファイルから: プロパティファイルを選択します。
オプションで、 スキーマレジストリに接続できます。
Kafka への接続中に SSH トンネルを使用する場合は、 トンネリングを有効にする を選択し、 SSH 構成 リストで SSH 構成を選択するか、新しい構成を作成します。
設定を入力したら、 接続のテスト をクリックして、すべての構成パラメーターが正しいことを確認します。 次に OK をクリックします。
オプションで、次を設定できます。
接続を有効にする: この接続を無効にする場合は、チェックボックスをオフにしてください。 デフォルトでは、新しく作成された接続は有効になっています。
プロジェクト単位 ごと: これらの接続設定を現在のプロジェクトでのみ有効にするには、チェックボックスをオンにしてください。 この接続を他のプロジェクトでも表示したい場合は、チェックボックスをオフにしてください。
Spring プロジェクトで Kafka に接続する
Spring プロジェクトで Kafka を使用する場合は、アプリケーションプロパティファイルの構成プロパティに基づいて、Kafka クラスターにすばやく接続 (または既存の接続を開く) できます。
少なくとも
bootstrap-serversプロパティが定義された application.properties または application.yml ファイルを開きます。ガターで、
をクリックし、 Kafka 接続を作成する を選択します。 Kafka 接続をすでに構成している場合は、このリストで選択することもできます。

さらに、 @KafkaListener でアノテーションが付けられたメソッドがある場合は、その横にある をクリックして、指定されたトピックへのメッセージをすばやく生成したり、そこからデータを使用したりできます。
Kafka サーバーへの接続を確立すると、この接続を含む新しいタブが Kafka ツールウィンドウに表示されます。 これを使用して、データの 生成と 消費 、トピックの 作成と削除を行うことができます。 スキーマレジストリに接続している場合は、スキーマを表示、作成、更新することもできます。
Kafka ツールウィンドウの任意のタブで をクリックして、接続の名前変更、削除、無効化、リフレッシュを行ったり、接続の設定を変更したりできます。

すべてのクラスタートピックが トピック セクションに表示されます。 をクリックしてお気に入りのトピックのみを表示するか、
をクリックして内部トピックを表示または非表示にすることができます。 トピックをクリックすると、パーティション、構成、スキーマに関する情報などの詳細が表示されます。
トピックを作成する
Kafka ツールウィンドウを開きます: 。
(新規接続) をクリックします。
トピック を選択し、
をクリックします (または Alt+Insert を押します)。
新しいトピックに名前を付け、パーティションの数とレプリケーション係数を指定して、「OK 」をクリックします。
トピックからレコードを削除する
Kafka ツールウィンドウを開きます: 。
(新規接続) をクリックします。
トピック でトピックを右クリックし、 トピックのクリア を選択します (またはその左側にある
をクリックします)。 OK をクリックして削除を確認します。
データの生産と消費
データを作成する
Kafka ツールウィンドウを開きます: 。
(新規接続) をクリックします。
Kafka 接続を選択し、 プロデューサー},{ をクリックします。
これにより、新しいエディタータブでプロデューサーが開きます。
トピック リストで、メッセージを書き込むトピックを選択します。
キー および 値 で、メッセージキーと値を選択します。
スキーマレジストリに接続している場合は、 スキーマレジストリ を選択して、送信されたデータを選択したスキーマと照合してチェックできます。
ランダムな値を生成できます:
をクリックすると、選択したタイプに基づいてランダムな値が生成されます。 これには、選択したスキーマレジストリに基づいて JSON オブジェクト全体の生成が含まれます。
ランダムな値を生成する柔軟性を高めるには、
${random...}変数を使用します。 JSON、Avro、Protobuf ファイルを編集するときに、値フィールドにrandomと入力し始めると、ランダムな値の自動補完オプションが表示されます。 例:"${random.integer(1,10)}"を使用して、1 から 10 までのランダムな整数を生成できます。
ヘッダー で、カスタムヘッダーを指定します。 JSON または CSV 形式で保存されている場合は、このセクションに貼り付けることができます。
フロー では、レコードフローを制御できます。
複数のレコードを同時に送信する場合は、 一度に記録 に数値を入力します。
レコードデータをランダムに生成する場合は、 ランダムなキーを生成する および ランダムな値を生成する を選択します。
レコード送信間の 間隔 をミリ秒単位で設定します。
指定されたレコード数に達した場合、または指定された時間が経過した場合に、プロデューサーがメッセージの送信を停止するようにするには、 停止条件 を指定します。
オプション で、追加のオプションを指定します。
パーティション :レコードを送信するトピックパーティションを指定します。 指定されていない場合は、デフォルトのロジックが使用されます:プロデューサーが鍵のハッシュ値をパーティション数で割った値を使います。
圧縮: プロデューサーによって生成されたデータの圧縮タイプを選択します: なし、 Gzip、 Snappy、 Lz4 、または Zstd。
冪等性: 各メッセージのコピーが 1 つだけストリームに書き込まれるようにする場合に選択します。
ACK: リーダーがローカルログにレコードを書き込み、すべてのフォロワーからの完全な確認を待たずに応答するようにする場合は、 リーダー を選択します。 リーダーが同期レプリカの完全なセットがレコードを確認するのを待機するには、 すべて を選択します。 サーバーからの確認応答を待たないように、プロデューサー用の なし を保持します。
生産 をクリックします。

データ タブ内の任意のレコードをクリックすると、その詳細が表示されます。 をクリックして統計を有効にすることもできます。
データを消費する
Kafka ツールウィンドウを開きます: 。
(新規接続) をクリックします。
Kafka 接続を選択し、 消費者 をクリックします。
これにより、新しいエディタータブでコンシューマーが開きます。
トピック リストで、サブスクライブするトピックを選択します。
キー および 値 で、使用するレコードのキーと値のデータ型を選択します。
範囲とフィルター を使用して、使用するデータを絞り込みます。
開始 リストで、データを消費する期間またはオフセットを選択します。 トピックからすべてのレコードを取得するには、 先頭から を選択します。
制限 リストで、データの受信をいつ停止するかを選択します (たとえば、トピック内のレコード数が特定の数に達したとき)。
フィルター を使用して、キー、値、またはヘッダーの部分文字列によってレコードをフィルターします。
その他 で、追加のパラメーターを設定します。
パーティション ボックスにパーティション ID またはコンマ区切りの ID リストを入力して、特定のパーティションからのレコードのみを取得します。
新しいコンシューマーを追加する場合は、 消費者団体 リストでコンシューマーグループを選択します。
消費を開始する をクリックします。

データ タブ内の任意のレコードをクリックすると、その詳細が表示されます。 をクリックして統計を有効にすることもできます。
データのエクスポート
生成または消費されたデータを CSV、TSV、JSON 形式でダウンロードできます。
プロデューサーまたはコンシューマーのプリセットを保存する
同じキー、値、ヘッダー、またはその他のパラメーターを使用してデータを頻繁に生成または使用する場合は、プリセットとして保存できます。 その後、プリセットを再利用して、プロデューサーまたはコンシューマーをすばやく作成できます。
Kafka ツールウィンドウで、 プロデューサー},{ または 消費者 をクリックします。
必要なパラメーターを指定し、プロデューサーまたはコンシューマー作成フォームの上部で
(プリセットの保存) をクリックします。
パラメーターはプリセットとして保存され、 プリセット タブで利用できます。 プリセットをクリックして適用します。
スキーマレジストリの操作
プロデューサーとコンシューマーはスキーマを使用して、レコードのキーと値の一貫性を検証し、保証できます。 Kafka プラグインはスキーマレジストリと統合され、Avro、Protobuf、JSON スキーマをサポートします。 これにより、次のことが可能になります。
スキーマレジストリに接続する
スキーマの作成、更新、削除、クローン作成
生の形式またはツリービューでスキーマをプレビューする
スキーマのバージョンを比較する
スキーマのバージョンを削除する
スキーマレジストリに接続する
クラウドプロバイダー、 カスタムサーバー 、または プロパティを使用して、Kafka ブローカーへの接続を作成します。
Confluent を使用する場合は、ブローカーレジストリプロパティとスキーマレジストリプロパティの両方を 構成 フィールドに貼り付けることができます。
それ以外の場合は、「スキーマレジストリ 」セクションを展開し、プロバイダー (Confluent または Glue) を選択します。
URL: スキーマレジストリの URL を入力します。
構成ソース: 接続パラメーターを提供する方法を選択します。
カスタム: 認証方法を選択し、資格情報を提供します。
Kafka ブローカーの SSL 設定とは異なる SSL 設定を使用する場合は、 ブローカー SSL 設定を使用する チェックボックスをオフにして、トラストストアのパスを指定します。
プロパティ: 提供された構成プロパティを貼り付けます。 または、コード補完と IntelliJ IDEA が提供する簡単なドキュメントを使用して、プロパティを手動で入力することもできます。
地域: スキーマレジストリリージョンを選択します。
AWS 認証: 認証方法を選択します:
デフォルトの資格情報プロバイダーチェーン: デフォルトのプロバイダーチェーンの資格情報を使用します。 チェーンの詳細については、「デフォルトの資格情報プロバイダーチェーンの使用(英語) 」を参照してください。
資格情報ファイルからのプロファイル: ファイルからプロファイルを選択します。
明示的なアクセスキーと秘密鍵: 資格情報を手動で入力します。
レジストリ名: 接続するスキーマレジストリの名前を入力するか、
をクリックしてリストから選択します。
設定を入力したら、 接続のテスト をクリックして、すべての構成パラメーターが正しいことを確認します。 次に OK をクリックします。
スキーマの作成
Kafka ツールウィンドウを開きます: 。
(新規接続) をクリックします。
スキーマレジストリ を選択し、
をクリックします (または Alt+Insert を押します)。
フォーマット リストで、スキーマ形式 (Avro、Protobuf、JSON) を選択します。
戦略 リストで、 命名戦略(英語)を選択し、選択した戦略に応じて、名前接尾辞を設定するか、トピックを選択します。 または、 カスタム名 を選択し、任意の名前を入力します。

スキーマをツリーおよび生のビューでプレビューできます。


スキーマのバージョンを比較する
スキーマレジストリに接続する場合は、 スキーマレジストリ のスキーマを選択します。
生のビュー に切り替えて、 比較 をクリックします。 このボタンは、スキーマに複数のバージョンがある場合に使用できます。

スキーマのバージョンを削除する
スキーマに複数のバージョンがある場合、特定のバージョンを削除できます。 スキーマレジストリは、 ソフト削除(バージョンの削除後もスキーマのメタデータと ID がレジストリから削除されない)とハード削除(スキーマ ID を含むすべてのメタデータが削除される)の 2 種類の削除(英語)をサポートしています。 選択できる方法は、Confluent スキーマレジストリと AWS Glue スキーマレジストリのどちらを使用しているかによって異なります。
Confluent Schema Registry では、デフォルトで論理的な削除が使用されます。 完全な削除 チェックボックスを選択すると、完全な削除の使用を選択できます。
AWS Glue Schema Registry は常に完全削除を使用します。
スキーマレジストリ でスキーマを選択します。
その右側にある
をクリックし、 バージョンの削除 を選択します。