2.7K Views
February 27, 20
スライド概要
2020-02-27 Serverless Meetup Tokyo #16
https://serverless.connpass.com/event/165352/
#serverlesstokyo
FaaSで小さくはじめるIoTリアルタイムデータ処理
秋葉原生まれ大手町育ちの歌って踊れる江戸っ子インフラエンジニア。 0と1が紡ぐ「ゆるやかなつながり」に魅せられ早20年、 SNSとCGMの力で世界を幸福にするのがライフワーク。 市民、幸福は義務です。 あなたは幸福ですか?
FaaSで小さくはじめる IoTリアルタイムデータ処理 仲山昌宏 (@nekoruri)
仲山 昌宏 / @nekoruri • • • • サーバーレス極振り屋さん 株式会社WHERE (2016-) SecHack365 トレーナー (2017-) セキュリティ・キャンプ 全国大会 講師 (2015-) • 技術系同人誌サークル 「めもおきば」「ssmjp同人部」 • #ssmjp / #qpstudy /#serverlesstokyo • Microsoft MVP (Azure) • Minecrafter (筋肉整地担当) 2
「あいおーてぃー」の目的 デバイスから データを収集 デバイスを 操作 判断結果を 人に伝える データを処理 分析・判断
「あいおーてぃー」の目的 • EXBeacon (BLEメッシュネットワーク) デバイスから データを収集 「現場」へのフィードバック デバイスを 操作 • SORACOM、LPWA エッジコンピューティング データを処理 現実の状態を推測・意味付け (デジタルツイン) 「人」へのフィードバック 判断結果を 人に伝える 分析・判断 様々な文脈情報と紐付けて 価値のある情報に分析、 デバイスや人への判断を下す
「あいおーてぃー」の目的 デバイスから データを収集 デバイスを 操作 判断結果を 人に伝える データを処理 分析・判断 クラウド側のおしごと
「あいおーてぃー」の目的 デバイスから データを収集 デバイスを 操作 BIツール 外部連携 判断結果を 人に伝える データを処理 分析・判断 クラウド側のおしごと IoTデータ処理 • 即時性(リアルタイム) • 正確性(バッチ)
IoTのデータ処理 即時性が必要なもの 正確性や大量のデータが必要なもの • ひと・ものがどこにいる? • 稼働データ • いまいる場所を知りたい • 場所に紐付けた危険判断 (危険エリアへの滞在など) • 故障の検知 • 現場にいる人への指示 • 動いているデバイスの制御 • 生産管理や課金請求 • 故障予測 • MLに突っ込む大量の生データ
IoTのデータ処理 即時性が必要なもの 正確性や大量のデータが必要なもの • ひと・ものがどこにいる? • 稼働データ • いまいる場所を知りたい • 場所に紐付けた危険判断 (危険エリアへの滞在など) • 故障の検知 • 現場にいる人への指示 • 動いているデバイスの制御 • 生産管理や課金請求 • 故障予測 • MLに突っ込む大量の生データ
リアルタイムデータ処理 • DBでがんばる • とにかく生データをDBに入れてクエリ等で頑張る • ストリームデータ処理エンジン • Spark Streaming、Apache Flink • Apache Beam (プログラミングモデル) • Google Cloud Dataflow (↑のフルマネージドサービス) • PubSub + FaaS + データストア • Azure: EventHubs + Functions • AWS: Kinesis + Lambda • とにかく手軽かつ軽率安易にはじめられる
IoTあるあるユースケース • センサーから上がってくる温度データをリアルタイムで見たい • 温度センサーの前処理 • 電圧⇒温度
IoTあるあるユースケース • センサーから上がってくる温度データをリアルタイムで見たい • 温度センサーの前処理 • 電圧⇒温度 • 誤差の吸収 • 計測誤差:センサーの配線にノイズが乗って電圧が変動する • 環境誤差:人が通ると温度がふらつく
IoTあるあるユースケース • 誤差の吸収 • まあやりかたはたくさんあります • 一番手軽なのは移動平均 • 移動平均……? • 過去N件あるいはM秒以内の値の平均値
基本的なアーキテクチャ PubSub • 生データを集める • バッチ用のアーカイブ 保存 • EventHubs(Azure) • Kinesis Data Streaming(AWS) 関数 データストア • 逐次的にデータを分析 • 判断結果に基づいて行 動 • Azure Functions • AWS Lambda • 処理結果を貯める • CosmosDB(Azure) • DynamoDB(AWS)
Azure: EventHubs + Functions • EventHubsにデータを集める • Azure IoTHubも基本的には一緒 • その話は今回はしません • EventHubsからFunctionsに連携 • Trigger • Output binding
EventHubs Trigger module.exports = async function (context, eventHubMessages) { eventHubMessages.forEach((message, index) => { context.log(`Processed message ${message}`); }); };
EventHubs output binding
module.exports = async function(context) {
const timeStamp = new Date().toISOString();
const message = 'Message created at: ' + timeStamp;
const message = [];
messages.push("1 " + message);
messages.push("2 " + message);
return messages;
};
FaaS is STATELESS! • 移動平均は直近の過去のデータが必要 • 戦略1:CosmosDBやRDBに過去のデータを入れる • デバイスごとに、タイムスタンプで履歴を入れていく • パーティションキー:デバイスのID • レンジキー:タイムスタンプ • デバイスを指定して、直近のデータを取得 • 移動平均を取得してDBに戻す • WRITEが高い • CosmosDBもDynamoDBも書き込み課金が比較的高い
FaaS is STATELESS! • 戦略2:Redisに過去のデータを入れる • 移動平均の直近データ、昔のは要らないのでは? ⇒ いわゆる「状態」データをメモリキャッシュストアに入れる • 結果データやマスタデータはCosmosDB等できちんと永続化
Redis • ちょうはやいメモリキャッシュストア • 参考 • redis、それは危険なほどのスピード https://ameblo.jp/principia-ca/entry-11197286812.html • 時系列データベースという概念をクラウドの技で再構築する https://speakerdeck.com/yuukit/the-rebuild-of-time-seriesdatabase-on-aws
Redisのデータ型 • 基本はKVS (Key-Value-Store) • いくつかのデータ型が存在 • • • • • 文字列 リスト セット(重複しないリスト) ソート済みリスト ハッシュ • 今回はハッシュを利用
データ構造 • Valueに履歴を含むJSONをまとめて突っ込む { "history": [ { "timestamp": 1582798224, "value": 24, "movavg": 24 }, { "timestamp": 1582798224, "value": 25, "movavg": 24.5}, { "timestamp": 1582798224, "value": 26, "movavg": 25 } ] } • Redisから取得 ⇒追加データを使って移動平均を計算 ⇒移動平均の期間外のデータを削除 ⇒新しい履歴データをRedisに更新
気になる性能 • おさらい: 「ちょうはやい」メモリキャッシュストア • ただし:全データがメモリ上に載っかること • メモリにさえ載っていれば、 1コアで秒数万クエリ以上さばける
横道:タイムスタンプ • デバイスで計測したときの時刻 • デバイスの時計、信頼できますか? • NTPへの通信路は安定していますか? • クラウドに上がってきたときの時刻 • できる限り入口で付ける (SORACOM Funnelだと受信時点で付けてくれる) • PubSubに積まれた時刻は取得可能 • データを処理するときの現在時刻 • クラウド内で「処理」するまでの遅延が入る:数100ミリ秒~数十秒 • 処理が詰まるとPubSubに積まれていく(MAX1~7日)
状態を扱うアーキテクチャ メモリキャッシュ • 中間データ(状態)を 保持する • Redis (各社にマネージドサー ビスもあり) PubSub • 生データを集める • バッチ用のアーカイブ 保存 • EventHubs(Azure) • Kinesis Data Streaming(AWS) 関数 データストア • 逐次的にデータを分析 • 判断結果に基づいて行 動 • Azure Functions • AWS Lambda • 処理結果を貯める • CosmosDB(Azure) • DynamoDB(AWS)
ところでそれもっと楽にできるよね? • ストリーミング処理エンジンのSQL (Stream Anaytics) SELECT AVG(Value) AS AverageValue, DeviceId INTO Output FROM Input GROUP BY DeviceId, TumblingWindow(minute, 5) • DSL (Spark Streaming on Scala) sc.parallelize(1 to 100, 10) .sliding(3) .map(curSlice => (curSlice.sum / curSlice.size)) .collect()
ところでそれもっと楽に実装できるよね? • ストリーミング処理エンジンのSQL (Stream Anaytics) SELECT AVG(Value) AS AverageValue, DeviceId INTO Output FROM Input GROUP BY DeviceId, TumblingWindow(minute, 5) • DSL (Spark Streaming on Scala) sc.parallelize(1 to 100, 10) .sliding(3) .map(curSlice => (curSlice.sum / curSlice.size)) .collect()
まとめ • PubSubとFaaSで手軽にリアルタイムデータ処理が書ける • 直近の状態データを持ちたくなったら、 Redisなどメモリキャッシュを併用すると吉 • 失いたくないデータはお金を払ってきちんと永続化 • 選択肢はFaaS意外にもあるので常に手札は複数もとう • フルマネージドサービス極振りはいいぞ