噛み砕いてKafka Streams #kafkajp

414 Views

December 16, 16

スライド概要

Apache Kafka Meetup Japan #2 https://kafka-apache-jp.connpass.com/event/45923/ 発表資料

profile-image

2023年10月からSpeaker Deckに移行しました。最新情報はこちらをご覧ください。 https://speakerdeck.com/lycorptech_jp

シェア

またはPlayer版

埋め込む »CMSなどでJSが使えない場合

関連スライド

各ページのテキスト
1.

噛み砕いてKafka Streams 2016年12月15日 ヤフー株式会社 データ&サイエンスソリューション統括本部 データプラットフォーム本部 開発1部 パイプライン 1 森谷 大輔

2.

自己紹介 • 氏名 • 森谷 大輔 @kokumutyoukan • 業務 • 次世代データパイプラインの開発 • Kafka, Storm, Cassandra, Elasticsearch • 好き • 横浜ベイスターズ • ハングリータイガー(の会会長) 2

3.

今日のゴール • おっ、調べてみるかなという気になってもらう • • • 3 Kafka Streamsを触った内容を噛み砕いて紹介 布教というわけではない 気になるところあれば遠慮なくツッコんでください

4.

アジェンダ • • • • 概要 Word Count Time, Window, Join つかってみた • まとめ 4

5.

アジェンダ • • • • 概要 Word Count Time, Window, Join つかってみた • まとめ 5

6.

Kafka Streams is 何 • • • ストリーム処理のアプリケーションを書くためのライブラリ Apache Kafka に同梱されている 0.10.0 からアップデートの目玉として追加 (2016年5月) 群雄割拠勢 6 Confluentが開発・導入促進を 頑張っている

7.

ストリーム処理アプリケーションをつくるには • よく必要になる「難しい機能」 • • • • • • • • 7 パーティショニング・拡張性 故障してもうまいこと復旧する(ステート管理) 遅れてやってきたデータもうまいこと処理する(時間の扱い) 再処理 ウィンドウ集計 方法①:素のKafka Java APIを使う 方法②:ストリーム処理フレームワークを使う 方法③:Kafka Streamsを使う

8.

①:素の Kafka Java API を使う Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(props); consumer.subscribe(topics); • お手軽 • • • • 8 Java ライブラリなのでアプリケーションを書いて jar にかためて java コ マンドで起動さえすれば良い デプロイがシンプル 覚えることはAPIの使い方だけ ただし「難しい機能」を自分で考えて実装しなければならない

9.

②:ストリーム処理フレームワークを使う • Stormなど群雄割拠勢 • 「難しい機能」を含めリッチな機能が使える • ただしフレームワークの専用クラスタが必要 • • • 9 フレームワークならではの構成、設定、書き方 デプロイ複雑 覚えることが多い

10.

③:Kafka Streamsを使う • 「Kafka Streamsはフレームワークではなく、ライブラリ」 ・サーバを分散処理モードで動かすためにセッティングし、 ・フレームワークのとりきめに従ったアプリケーションの実装をし、 ・専用のデプロイツールでデプロイしてはじめて分散処理 ・ライブラリをクラスパスに含めてjarにかためてjavaコマンドうてば動く • 「難しい機能」も抽象化されている • • 大体のパターンのストリーム処理アプリケーションを書くには充分 リアルタイム性 • 10 Spark Streamingのようなマイクロバッチではなく、Stormのような逐次処理(at least once) • レイテンシ要求が厳しい案件でもOK

11.

比較 ※独断と偏見 方法 ① 素のKafka Java APIを使う ② ストリーム処理 フレームワークを使う ③ Kafka Streams (難しい機能) 実装の簡単さ 学習 コスト 運用(デプロイ) コスト ✕ ◯ ◯ ◯ ✕ △ ◯ ✕ ◯ ※ストリーム処理フレームワークにしかない機能もある 11

12.

アジェンダ • • • • 概要 Word Count Time, Window, Join つかってみた • まとめ 12

13.

ことはじめ • ビルド設定(maven) <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>0.10.0.1</version> </dependency> • 13 APIを選ぶ • high-level DSL ←今回はこれ • low-level API

14.
[beta]
プログラム
@Test
public final void wordCount() {
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> queryStream
= builder.stream(stringSerde, stringSerde, “search-query-topic”); // 入力トピック名は複数指定可能
KStream<String, Long> wordCounts = queryStream
.flatMapValues(value -> Arrays.asList(value.split(“¥¥s+”))) // 空白区切り分割
.map((key, value) -> new KeyValue<>(value, value)) // key 毎カウント下準備
.countByKey(stringSerde, “Counts”) // KStream -> KTable
.toStream(); // KTable -> KStream
wordCounts.to(stringSerde, longSerde, “wordcount-output”); // sink トピックに結果を書く

KafkaStreams streams = new KafkaStreams(builder, props); // props は Kafka Streams の設定
streams.start(); // アプリケーション実行
}

14

15.
[beta]
入力・結果例
// 入力
producer.send(new ProducerRecord<>(“search-query-topic”, “ぬこ 飼い方”));
producer.send(new ProducerRecord<>(“search-query-topic”, “犬 飼い方”));
producer.send(new ProducerRecord<>(“search-query-topic”, “本当すこ ぬこ"));
consumer.subscribe(Arrays.asList("wordcount-output"));
while (true) {
ConsumerRecords<String, Long> records = consumer.poll(100);
for (ConsumerRecord<String, Long> record : records) {
System.out.println("record = " + record.key() + ", " + record.value());
}
}
// 出力
record = ぬこ, 1
record = 飼い方, 1
record = 犬, 1
アプリケーションの動作確認は
record = 飼い方, 2
Kafka Unit Testを使うと便利
record = 本当すこ, 1
※Kafka 公式 FAQ 参照
record = ぬこ, 2

15

16.

KStream? KTable? • KStream • record streamを扱う場合はKStreamクラスを使う • 自己完結のデータストリーム • 例えばPVログ、サーバログ、ツイート • KTable • changelog streamを扱う場合はKTableクラスを使う • 状態を持つ、keyで値が更新されるデータのストリーム • 例えばこの単語が今までに何件出現したか、のようなデータ • Stateとしてローカルに保持される 16

17.

アジェンダ • • • • 概要 Word Count Time, Window, Join つかってみた • まとめ 17

18.

Time • ストリームであるイベントが流れてきた時、そのイベントのタイムスタンプとしてどんな情 報を使うべきか 例えばイベントがツイートだとして、一時間毎のツイート数を計算したいといった場合、な んのタイムスタンプ毎に計算する? • 1. 2. 3. 4. ユーザがツイートした瞬間 ツイートをAPIからバックエンドサーバが受け取ってKafkaに投げた瞬間 Kafkaに入った瞬間 Kafka Streamsがそのイベントを処理した瞬間 Tweet! Twitter API ① 18 Kafka Streams my BE server ② ③ ④

19.

Time • ストリームであるイベントが流れてきた時、そのイベントのタイムスタンプとしてどんな情 報を使うべきか 例えばイベントがツイートだとして、一時間毎のツイート数を計算したいといった場合、な んのタイムスタンプ毎に計算する? • 1. 2. 3. 4. • • ユーザがツイートした瞬間 ツイートをAPIからバックエンドサーバが受け取ってKafkaに投げた瞬間 Kafkaに入った瞬間 Kafka Streamsがそのイベントを処理した瞬間 多くは1だと思うが、アプリケーションの仕様によって異なる Kafka Streamsでは設定項目 timestamp.extractor でどれを選択するか簡単に決められ る 19

20.

Kafka Streams的分類 • event-time • ログ内の独自タイムスタンプの場合 • • 「ユーザがツイートした瞬間」 Kafka messageに付与されているタイムスタンプを使う場合 • • • 「ツイートをAPIからバックエンドサーバが受け取ってKafkaに投げた瞬間」 broker設定 log.message.timestamp.type=CreateTime (デフォルト) このタイムスタンプはKafka0.10からMessageに付与される • • 0.9以前のproducerから投げると -1 ingestion-time • • • • 「Kafkaに入った瞬間」 log.message.timestamp.type=LogAppendTime だった場合 そのイベントがKafka Brokerに入ったときの時刻がmessageタイムスタンプに付与 processing-time • 20 「Kafka Streamsがそのイベントを処理した瞬間」

21.

timestamp.extractor Time分類 timestamp.extractor event-time(独自) 自分で実装する event-time(message) ConsumerRecordTimestampExtractor ingestion-time ConsumerRecordTimestampExtractor processing-time WallclockTimestampExtractor 設定例 import java.util.Properties; import org.apache.kafka.streams.StreamsConfig; Properties props = new Properties(); props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName()); 21

22.
[beta]
独自クラス実装例
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;
// TimestampExtractorインタフェースを実装する
public class MyEventTimeExtractor implements TimestampExtractor {
@Override public long extract(ConsumerRecord<Object, Object> record) {
// ログをパースしてtimestampを取り出す
Foo myPojo = (Foo) record.value();
if (myPojo != null) {
return myPojo.getTimestampInMillis();
} else {
// valueがnullだったらとりあえず現在時刻をいれておく
return System.currentTimeMillis();
}
}
}
http://docs.confluent.io/3.0.0/streams/developer-guide.html#timestamp-extractor (コメント以外引用)

22

23.

Window • Tumbling time window • 5分毎のユーザ毎のPV数とか KStream<String, String> viewsByUser = ユーザIDがkeyのPVログStreamなど; KTable<Windowed<String>, Long> userCounts = viewsByUser.countByKey(TimeWindows.of(”WindowName", 5 * 60 * 1000L)); • Hopping time window • 1つのイベントが複数のウィンドウにまたがる TimeWindows.of(”WindowName", 5 * 60 * 1000L).advanceBy(60 * 1000L); 23

24.
[beta]
Join
•

ストリーム処理でよくやるストリームとテーブルのJoinができる
KStream<String, String> voteRegionStream = ...(“vote-topic”)
KTable<String, String> partyTable = ...("party-topic");
KStream<String, String> voteParty
= voteRegionStream.leftJoin(
partyTable, (region, party) -> region + ”," + party);

•

k: Hillary

k: Hillary

v: California

k: candidate

v: party

Hillary

Democratic

Trump

Republican

v: California, Democratic

KTableはローカルにあり、常に最新である
•

24

メッセージ処理毎にネットワークを超えてKVSを叩く必要も、鮮度を諦めて定期的にRDBを
メモリにロードする必要もない

25.

アジェンダ • • • • 概要 Word Count Time, Window, Join つかってみた • まとめ 25

26.

Kafka Streamsで開発してみた • Kafkaクラスタから引いた全メッセージをグルーピング、ウィンドウ集計して指標をsinkに 書くシンプルなアプリ • • ローカルでテストは通った、本番デプロイいこう バグを踏む:Kafka-4160 (´・ω・`) • • • • • Kafka Streamsのフォアグラウンドスレッドとバッググラウンド ハートビートスレッドの間に単一のロックがある タスク生成中にハートビートをブロックするのでタスク生成が長い とセッションタイムアウトを超える consumerがグループから追い出されて再度タスク生成を始める 永遠に繰り返してデッドロックみたくなる 入力パーティション数が少ないと問題にならないのだが 本番では1topicあたり最大60あったため本番で初めて発覚した 26

27.

続き • • • バグは Kafka 0.10.1.0 で解消されたよ!(今client, server共に 0.10.0.1) Kafka Streams をバージョンアップすれば解決しそう > Apps built with Kafka Streams 0.10.1 only work against Kafka clusters running 0.10.1+. • 古いサーバに対しても互換性なんとかしたいとは書いてあった • • 0.10.0.1のKafkaにバグフィックスだけパッチ当ててアプリに入れるか・・・ 対象コードが 0.10.1 で大きく変わってて厳しい • やっぱりサーバあげよう ← イマココ 27 ___________ /|:: ┌──────┐ ::| /. |:: | Exception | ::| / ̄ ̄ ̄ ̄ ̄ ̄ ̄ ̄ |.... |:: | Use 0.10.1 !| ::| | マイナーバージョンアップなら…アレ? |.... |:: | .| ::| \_ ______ |.... |:: └──────┘ ::| ∨ \_| ┌────┐ .| ∧∧  ̄ ̄ ̄ ̄ ̄ ̄ ̄ ̄ ̄ ̄ ̄ ( _) / ̄ ̄ ̄ ̄ ̄旦 ̄(_, ) / \ | ̄ ̄ ̄ ̄ ̄ ̄ ̄ ̄ ̄ ̄ ̄|、_)  ̄| ̄| ̄ ̄ ̄ ̄ ̄ ̄|

28.

思ったこと • 向くユースケースなら向く • • • Kafka Streamsのコンセプトがわかってきてからシステム設計した方がいいかも 既にカッチリ決まった要件にKafka Streamsを合わせようとするとハックするはめになるかも ライブラリならアプリケーション開発を楽にしてくれなくちゃいけない • • • Kafkaの素のハイレベルAPIがそもそもかなりちょうどいい抽象化 Kafka Streams APIの利点が活かせるかどうか インターナルトピックをかなり大量に作ることを想定している(アプリのバージョンアップご とにトピックは増える) • • 28 小さめのサービス専用クラスタとかならいいが、マルチテナント向けのクラスタだとちょっと気持ち悪 いかも 現在は1 Kafkaクラスタしか指定できないが将来的には複数可能になるかも

29.

アジェンダ • • • • 概要 Word Count Time, Window, Join つかってみた • まとめ 29

30.

まとめ • • • • 30 Kafka Streamsはストリーム処理のアプリケーションを実装するためのライブ ラリ シンプルながらストリーム処理でよく必要になる、自分で実装するには難し い機能を実現する 時間軸に何を使うか開発者が選択できる Kafka Streamsが便利に使えるようにシステム設計をすると吉

31.

Appendix 31

32.

Kafka Streamsはどこで動くの? • consumerアプリケーション • 普通はKafkaクラスタの(物理的に)近くのアプリケーション専用サーバ上でJavaプロセスとして動 かすと思う • ライブラリなので何でもできるが、Kafkaとしか接続しないように全体設計すると楽そう Kafkaクラスタ source topic Kafka Connect 等 Kafka Streams internal topic sink topic Kafka Connect 等 32

33.

Configuration import java.util.Properties; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ConsumerConfig; Properties settings = new Properties(); settings.put(StreamsConfig.APPLICATION_ID_CONFIG, “my-app”); // StreamConfigのこの3つは必須 settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ”localhost:9092"); settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ”localhost:2181"); settings.put(ProducerConfig...., “”); // 必須でない settings.put(ConsumerConfig...., “”); // 必須でない application.id アプリケーション認識名. consumer group名やinternal topic名等に利用される. bootstrap.servers 接続するKafkaクラスタのhost/portペアのリスト. zookeeper.connect 接続するZooKeeperのコネクション文字列(host:port/chroot). num.stream.threads ストリーム処理のために使うスレッド数. replication.factor internal topicを作るときのレプリケーションファクタ state.dir State Storeのディレクトリパス 33 timestamp.extractor 後述

34.

フォールトトレラント • 故障時にStateを復旧させるため、Kafkaクラスタにchangelog topicという内部topicが作ら れる source part-0 Node Task changelog part-0 source part-1 Node Task changelog part-1 34

35.

フォールトトレラント • 故障時にStateを復旧させるため、Kafkaクラスタにchangelog topicという内部topicが作ら れる Node source part-0 changelog part-0 source part-1 Task Task Node Task changelog part-1 35

36.

changelog topic(おまけ) • topicはKafka Streamsアプリケーションの実行時に自動で作成される • 手動でtopicを作るときと同じような感じで、Kafka設定 auto.create.topics.enable=falseでも作成される • topic設定はcompact • • 36 同じkeyで頻繁にvalueが変わるはずだから タスク数分パーティションが作られる

37.
[beta]
プログラム(full)
@Test
public final void wordCount() {
final Serde<String> stringSerde = Serdes.String(); // Serde is Serializer/Deserializerの略、Kafka共通のクラス
final Serde<Long> longSerde = Serdes.Long(); // 基本的なビルトインをSerdesから呼べる、もちろん自作可能
KStreamBuilder builder = new KStreamBuilder();
// 入力名からKStreamを作る. 1: key Serde, 2: value Serde, 3: 入力トピック名(複数指定可能)
KStream<String, String> queryStream = builder.stream(stringSerde, stringSerde, “search-query-topic”);
KStream<String, Long> wordCounts = queryStream
// valueに対して空白区切りで文字列を分割して次に送る処理
.flatMapValues(value -> Arrays.asList(value.split(“¥¥s+”)))
// key毎カウントしたいからkeyにvalueを入れる
.map((key, value) -> new KeyValue<>(value, value))
.countByKey(stringSerde, “Counts”) // KStream -> KTable、第二引数はKTable名
.toStream(); // KTable -> KStream
wordCounts.to(stringSerde, longSerde, “wordcount-output”); // sinkトピックに結果を書く
KafkaStreams streams = new KafkaStreams(builder, props); // propsはKafka StreamsやClientの設定Properties
streams.start(); // アプリケーション実行
}37

38.

比較(full) ※独断と偏見 (難しい機能) 実装の簡単さ 学習 コスト 運用(デプロイ) コスト ① 素のKafka Java APIを使う ✕ ◯ ◯ ② ストリーム処理 フレームワークを使う ◯ ✕ ✕ ③ Kafka Streams ◯ △ ◯ 方法 38 実績 ◯ △ (差異が大きい) ✕ ドキュメント 充実度 ◯ △? △