読者です 読者をやめる 読者になる 読者になる

Technology Topics by Brains

ブレインズテクノロジーの研究開発機関「未来工場」で働くエンジニアが、先端オープン技術、機械学習×データ分析(異常検知、予兆検知)に関する取組みをご紹介します。

SORACOM Funnel ときどき 異常検知

本記事はSORACOMサービスリリース1周年記念ブログの11月2日分です。



皆さん、こんにちは。ブレインズテクノロジーの林です。
今日はSORACOMリリース1周年のブログに寄稿させていただくことになり、以前から気になっていたSORACOM Funnelに関する記事を書かせていただきます。

KYOSOの辻様の記事(RaspberryPi +SORACOM FunnelでIoTデータを閉域網でS3やDynamoDBに格納する)ですでに詳しく説明いただたいておりますので、重複する部分は割愛して、SORACOM Funnelを利用するメリット、データを活用する部分にフォーカスして寄稿いたします。

SORACOM Funnel を利用するメリット

SORACOM Funnelを利用しない場合の、IoTデータの収集〜可視化するまでの流れは、以下の通りです。各デバイスからエージェントを介して、AWS IoT や 独自のGatewayへデータを送信し、ハブとしてKinesisを利用し、各データアプリケーションが可視化・分析等を行うのが一般的な構成になるかと思います。

f:id:brains-tech:20161102022443p:plain

が、しかし、SORACOM Funnelを利用すれば、アーキテクチャーで考慮しなければならないポイントがかなり減ります。

  • 通信経路・データセキュリティに関する考慮(閉域網での接続)
  • データを収集する部分の可用性・拡張性に対する考慮(AWS IoTであれば問題ないが・・・)
  • バイスと収集層との通信プロトコルに関する検討(mqtt / tcp / udp ?)

f:id:brains-tech:20161102022451p:plain

他にも勿論あると思いますし、SORACOMだからこそ可能な、SIM単位での通信ON/OFFの制御、アクセスコントロールの割当の容易さなど、メリットをあげればキリがないですね。つまり、異常検知・データ分析を主戦場とする我々とデバイスの距離が縮まったような気がします。

というわけで、SORACOM Funnel を利用してデバイスからデータを収集し、データの特性分析を行い、異常検知を行う流れをやってみたいと思います。

SORACOM Funnel を介した 異常検知

今回は、RaspberryPi上にセンサーデータが集まるという過程で、SORACOM Air → SORACOM Funnelを経由して、Kinesisへ流れているデータをImpulseが蓄積、分析、リアルタイムの異常検知をかけていくシナリオです。

f:id:brains-tech:20161102030130p:plain

※ Device〜Kinesisへの設定方法は、前述のKYOSOの辻様の記事(RaspberryPi +SORACOM FunnelでIoTデータを閉域網でS3やDynamoDBに格納する)をご参照ください。


複数個の照度センサーデータを一定間隔:5分でデータを送信し、収集したデータ特性を分析し、相関崩れによる異常検知を行うまでの流れになります。照度センサー4つを時系列でほぼ同じような値を取ることを前提として、データ送信をしています。(夜間帯は低く、日中にかけて照度の値が大きくなるような形)

まずは、RaspBerryPi から以下のデータをSORACOM Funnel のエンドポイントに対してデータを送信します。

{
    "description": "illuminance_data"
    , "illuminance_4": 3.88
    , "illuminance_3": 3.67
    , "illuminance_2": 4.63
    , "illuminance_1": 4.59
    , "time": 1477752900
}

そうすると、Kinesisへ以下の通りデータが入ってきます。
各種情報が付与され、送り出したデータは payloads に入っています。
便利ですね!!

{
    "operatorId": "xxxxxxxxxxxx"
   ,"timestamp": 1478055514828
   ,"destination": {
       "resourceUrl": "https://kinesis.us-west-2.amazonaws.com/soracom_funnel"
      ,"service": "kinesis"
      ,"provider": "aws"
    }
    ,"credentialsId": "funnel_kinesis"
    ,"payloads":{
        "description": "illuminance_data"
       ,"illuminance_4": 3.88
       ,"illuminance_3": 3.67
       ,"illuminance_2": 4.63
       ,"illuminance_1": 4.59
       ,"time": 1477752900
     }
     ,"sourceProtocol": "http"
     ,"imsi": "9999999999999999"
}

さて、Kinesisに流されたデータをImpulseへ取り込み、データの特性を分析します。図の右側の通り、特性分析を行うと、照度センサー1〜4の値は非常に高い相関を示していることがわかります。

f:id:brains-tech:20161102134833p:plain

収集データから異常検知を行うために、シュミレーションを行って初期モデルを作成します。作成されたモデルで、Impulseのリアルタイム検知機能を有効にします。有効にした後、RaspberryPiから流すデータの照度を一つだけ小さくすると、相関が崩れたとImpulseが判定し、"異常(いつもと違う)"と判定てくれます。

f:id:brains-tech:20161102134839p:plain

まとめ

SORACOM / SORACOM Funnelを利用することで、本来やりたいことに注力できるようになります。
つまり、データの分析・活用という部分に注力することができるようになるわけです。今後も継続して、新しい技術をウォッチしていこうと思います。

ビバ、SORACOM

おまけ

今回使ったRaspberryPi + SORACOM Sim のセットの写真です。
f:id:brains-tech:20161102162950j:plain

ET/IoT総合技術展・関西 出展レポート

イベント

こんにちは!マーケティング担当の安部です。

今日8月8日はブレインズテクノロジーの創立記念日
早いもので9年目に突入しました…!
ブログをご覧の皆様、日々応援してくださっている皆様、ありがとうございます。
「最先端のオープンテクノロジーで、エンタープライズの世界に技術革新を提供する」明るい未来を創造する技術集団を、引き続きよろしくお願いします!

 

さて、今回のブログは先日の投稿に続き出展レポートです。

当社は2016年7月7日(木)~7月8日(金)にグランフロント大阪で行われた「ET/oT総合技術展・関西」にAWSさんのブースから出展しました。
(オージス総研さん、東海ソフトさんともご一緒させていただきました。)

ブレインズ初の関西出展です!

 

クラウド×機械学習×異常検知

今回も、デモを中心に異常検知のソリューションをご紹介しました。

f:id:brains-tech:20160726143153j:plain

前回のInteropと比べ、来場者の方とブースでじっくりお話しする時間が多かったです。「IoT総合技術展」の名のとおり、IoTの実用的なソリューションを求めている企業が多いことを改めて実感できました。

来場者の方だけでなく、出展企業の方々からもImpulseの機械学習を使った異常検知を高く評価していただき、ビジネスアライアンスの検討に繋がったことも大きな収穫でした。 

FA機器への対応についての問い合わせがあった際、隣のブースの東海ソフトさん(写真では見切れていますが…)と連携して一緒にお話しすることもありました。

f:id:brains-tech:20160726143321j:plain

また、今回はブースでのご紹介以外に「AWSで実現する IoT向け故障予兆検知・分析サービス 『Impulse』のご紹介 」というタイトルでセッションも行いました。
立ち見にも関わらずたくさんの方が聴いてくださり、なんと2日連続でボーナスセッションをゲットしました。

f:id:brains-tech:20160726143218j:plain

 最後に

今回も連日100名近いお客さまにご来場頂き大盛況で終えることができました。
改ましてご来場頂きました皆様、出展の機会を頂いたAWSの皆様、共同出展でお世話になった皆様、ありがとうございました!

Impulseの次回出展は…ご期待ください! 

Interop Tokyo 2016 出展レポート

イベント

こんにちは!マーケティング担当の安部です。

当社は2016年6月8日(水)~6月10日(金)幕張メッセで行われたInterop Tokyo 2016に出展いたしました。
連日100名を超えるお客様にご来場いただき、当社製品「Impulse」をご紹介することができました。ありがとうございました!

イベントが終了して早くも一ヶ月が経とうとしていますね…

レポートが遅くなってスミマセン。

f:id:brains-tech:20160701192406j:plain

当社のブースは、IoT Worldのエリアに出展しました。

リアルタイム予測分析プラットフォーム Impulse

「Impulse」は、リアルタイムの分析プラットフォームです。
*詳細はこちらをどうぞ!
Impulse(リアルタイム大規模データ分析基盤) | 製品&サービス | ブレインズテクノロジー株式会社

今回は、IoT用途として「照度センサーを使った異常検知」と「不良品検出ソリューション」のデモを紹介しました。

f:id:brains-tech:20160701192521j:plain

Best of Show Award」特別賞受賞

そして・・・突如舞い込んだビッグニュース。

300近くエントリーされた新製品の中、また、名だたる企業がファイナリストに選ばれる中…まさかの受賞でした。
初出展の企業がアワードを受賞するのは、初めての出来事だそうです。

f:id:brains-tech:20160701192527j:plain f:id:brains-tech:20160701192814j:plain


審査員の方々からは、

- 機械学習を実用性の高い分野でサービスとして展開していること

- 利用者のメリットが明確であること

を高く評価して頂きました。


f:id:brains-tech:20160701192448j:plain

ブースを訪れた方々からは、

機械学習ってよく聞くけど、結局良くわからない…」
「なんとなく興味はあるけど、実際どう使われているの?」

というコメントを多く耳にしました。

私たちはImpulseを「異常検知」という実用性の高い分野でサービス展開し、多くの企業の皆さまが機械学習活用の恩恵を享受できる努力を続けていきたいと思っています。

 

最後に

来場してくれた方のブログに・・・Impulseが紹介されていました。
Interop2016で見つけたヤンデレ彼女用IoTまとめ [Interop Tokyo 2016] | ツチノコブログ


今後も様々なイベントへ出展し、異常検知に関わるソリューションを紹介していきます。Impulseの「これから」にご期待ください!

機械学習アルゴリズム実装シリーズ vol 2[ロジスティック回帰編]

Machine Learning

こんにちは。ブレインズテクノロジー樋口です。
機械学習アルゴリズムを一から実装するシリーズ2回目。今回のお題はロジスティック回帰(Logistic Regression)の実装。
前回と同様あまり難しい説明はなしに実装していくつもりですが、もっと詳しいこと教えんかい( ゚д゚ )クワッ!!となったら以下のような専門書をどうぞ。
machinelearningmastery.com

ロジスティック回帰とは

ロジスティック回帰(Logistic Regression)は分類問題を解決するための手法で教師あり学習に分類されます。
元々は統計の分野で発展してきたもので、応用先としては
・e-mailのスパム判定
・病理標本の悪性判定
トランザクションの正常性判定

などに使われているようです。
# 名前には回帰(Regression)とついてるんですけど分類なんですね。。。

ロジスティック回帰[理論編]

前回、線形回帰ではモデルを以下のような数式で表しました。
h_θ(x)=θ_0+θ_1x
ロジスティック回帰の場合は上記の式を更にシグモイド関数という関数に入れます。式で表わすと以下のようになります。
h_θ(x)=g(θ^TX) -①
g_θ(z)=\frac{1}{1+e^{-z}}
※ちみなにシグモイド関数は以下のような波形を描きます。
f:id:brains-tech:20160307110209p:plain
シグモイド関数 - Wikipedia
jp.mathworks.com



①により、データを入力すると0~1の間の値を返してきて、それがあるクラスに当てはまる確率となります。
例えば、ある特徴量x_0, x_1を入力した時、出力された値がh_θ(X)=0.7だった場合、それが当該クラスである確率は70%であるといった具合です。

では、①の式が適切な値を出力してくれるようにθの値を学習していくためにはどうすればよいでしょうか。

前回と同じく、目的関数を定めた後その目的関数を最小とするような θの値を求めていきます。

ロジスティック回帰では目的関数に以下の式を使用します。
J(θ_0, θ_1)=-\frac{1}{m}\sum_{i=1}^{m} (y^i\log(h_θ(x^i)) + (1-y^i)\log(1-h_θ(x^i)) -②
xは特徴量, yはラベル

ここからは前回と同じです。最急降下法を使って、このJ(θ_0, θ_1)を最小にする\thetaを求めます。
ここで偏微分とかいろいろ出てくるのですが、とりあえず置いておき、つまるところ以下の式を計算していくことになります。
次のθ_j=θ_j-α\sum_{i=1}^{m} ((h_θ(x^i)-y^i)x^i_j) -③

データを見る

以上がかなり簡単な理論のお話です。
移行は実装に入っていきますが、まずは今回のデータを見てみましょう。
以下の様なデータを用意しました。
f:id:brains-tech:20160403195901p:plain

赤、青でプロットされた点があり、それぞれがクラス(赤: class 1, 青: class 0)を表しています。
今回はこの2つのクラスのデータを学習して、新しい点が入力された時そのデータが赤クラスである確率はどれくらいなのか、つまりどちらのクラスなのか分類したいと思います。
※実装がメインですので、シンプルなデータにしてあります。

実装!

ここまで説明してきたことをコードに落とすと以下のようになります。
コメントで説明を入れてありますが、分かりにくいところは少し動かしてみることをオススメします。
[コード]

# -*- coding:utf-8 -*-
import numpy as np
import matplotlib.pyplot as plt


def gradient_decent(theta, X, y, iter_num, alpha):
    """
    最急降下法メソッド
    与えられたiter_num分だけ最急降下法を適用してJ(θ)を最小とするthetaを求める
    :param theta: モデルのパラメータθ
    :param X: 入力データ(学習に使われるデータ)
    :param y: 入力データに対するラベル
    :param iter_num: 最急降下法の適用回数
    :param alpha: 学習率
    :return: theta, J (学習後のtheta, 学習途中での目的関数の値)
    """

    m = float(len(X))
    J = []
    for i in range(iter_num):
        # 現在のthetaでh_thetaを計算 (
        h = sigmoid(np.inner(theta, X))  # --- ①式
        # 目的関数を可視化のために計算
        J.append(1/m * np.sum((-y*np.log(h) - (1-y)*np.log(1-h))))  # --- ②式
        # 以下で, ③式を計算してthetaを更新していく
        theta1_tmp = 1/m * np.sum((h-y)*X[:, 0])
        theta2_tmp = 1/m * np.sum((h-y)*X[:, 1])
        theta3_tmp = 1/m * np.sum((h-y)*X[:, 2])
        theta = theta - alpha*np.array([theta1_tmp, theta2_tmp, theta3_tmp])
        print "theta: ", theta
    return theta, J


def sigmoid(x):
    """
    与えられたデータをsigmoid関数へ入れて計算結果を返す
    sigmoid関数
    :param x: 入力データ
    :return: sigmoid関数適用後のデータ
    """
    return 1/(1 + np.exp(-x))
    

def main():
    # サンプルデータのロード
    # 前述した散布図のデータ
    sample_data = np.load("./data.npy")

    # ラベルデータのロード
    # 赤: 1, 青: 0 として表現
    y = np.load("./label.npy")

    # 学習データの作成
    x_tmp = np.ones(len(sample_data))
    X = np.c_[x_tmp, sample_data]

    # パラメータの初期化
    # thataは任意の整数
    # alphaは学習率で予めこちらで調整した値が代入されています
    # iter_numの回数により何回最急降下法が回るかが決まります
    theta = np.array([1,1,1])
    alpha = 0.5
    iter_num = 100

    # 最急降下法を使って学習を行う
    theta, J = gradient_decent(theta, X, y, iter_num, alpha)
    plt.plot(J)
    plt.show()


if __name__ == "__main__":
    main()


[実行結果]

$ python lr.py
theta:  [ 0.8319953   0.68474584  0.81423634]
theta:  [ 0.67634834  0.38081935  0.63610057]
theta:  [ 0.53901998  0.09768263  0.47012794]
theta:  [ 0.42658727 -0.1536833   0.32244438]
theta:  [ 0.34236108 -0.36727612  0.19797763]
theta:  [ 0.28391004 -0.54596135  0.09699565]
theta:  [ 0.24549959 -0.69715343  0.01567327]
theta:  [ 0.22146887 -0.82779591 -0.05084846]
theta:  [ 0.20750074 -0.94295183 -0.10667272]
theta:  [ 0.6395262  -2.837131   -1.09546083]
...
...
...
theta:  [ 0.89455876 -3.43082868 -1.50299413]
theta:  [ 0.90015444 -3.44389053 -1.51225235]
theta:  [ 0.90569562 -3.45683949 -1.52143847]
theta:  [ 0.91118314 -3.46967788 -1.53055366]
theta:  [ 0.91661788 -3.48240793 -1.53959907]
theta:  [ 0.92200064 -3.49503179 -1.54857581]
theta:  [ 0.92733226 -3.50755156 -1.55748496]
theta:  [ 0.93261354 -3.51996927 -1.56632758]

[目的関数J(θ)の推移]
f:id:mhigu:20160314074359p:plain

学習の結果、目的関数の値が小さくなり収束していっていることが分かります。

確認のために学習したモデルに値を入力して結果を観測します。
以下にテスト用に別に作ったスクリプトを記載します。
x_0=-2, x_1=1の場合

# -*- coding:utf-8 -*-

import numpy as np
import matplotlib.pyplot as plt


def sigmoid(x):
    # 与えられたデータをシグモイド関数へ入れて
    # 計算された結果を返す
    return 1/(1 + np.exp(-x))
    

def test():

    # プロット用にデータをロード
    X = np.load("data.npy")
    label = np.load("label.npy")

    # theta:  [ 0.93261354 -3.51996927 -1.56632758] を利用
    theta = np.array([0.93261354, -3.51996927, -1.56632578])

    # x0=2, x1=1を設定
    x = np.array([1, -2, 1])
    class_1_rate = sigmoid(np.dot(theta, x))
    class_num = -1

    # モデルから返ってきた値が0.5より大きければ赤(class 1)
    # 0.5以下であれば青(class 0) 
    if class_1_rate > 0.5:
        class_num = 1
    elif class_1_rate <= 0.5:
        class_num = 0
    print "[RESULT]"
    print "################"
    print "Model Output: {0}".format(class_1_rate)
    print "Input Data is class {0}".format(class_num)
    print "################"

    # 散布図をプロット
    plt.scatter(X[:,0], X[:,1], c = label, s= 100)
    plt.scatter(x[1], x[2], marker='^', color='g', s=150)
    plt.show()

if __name__ == "__main__":
    test()

[実行結果]

[RESULT]
################
Model Output: 0.998351478053
Input Data is class 1
################

ちゃんと動いているように見えます。。。
散布図を作って見るとしっかり分類できていますね。(緑の三角がinputしたものです)
f:id:brains-tech:20160404000807p:plain
ひとまずは実装完了です。
が、本来であれば、過学習を考慮して罰則項を足してモデルを作ったり、特徴量を取るときに正規化を行ったりと様々工夫が入ります。興味のある方はそのへんまで是非是非調べてみてください。

結び

今回は分類問題を解くための教師あり機械学習アルゴリズム、ロジスティック回帰を実装してきました。
分類問題を解くための機械学習アルゴリズムには、ロジスティック回帰の他にも、SVMニューラルネットワークなどたくさんあります。最近ディープラーニングがすごく話題になってますが、ニューラルネットワークはその走りですよね。次回は、そのニューラルネットワークを今回のロジスティック回帰との違いに着目しながら実装していきたいと思います。

ではでは(((((((((((っ・ω・)っ ブーン

Spark1.6.0のDataset APIを触ってみた

Spark

あけましておめでとうございます。 Impulse開発チームの木村です。

今回は、Spark 1.6.0で導入されたDataset APIを、spark-shell上で触ってみました。

Dataset APIとは

Dataset APIは、RDDやDataFrameと同じく、データのまとまりを扱うためのAPIです。 RDDとDataFrame双方の長所を合わせ持つAPIとして、開発が進められています。 (なお、ver1.6.0でのDataset API導入は、あくまで実験的なものであり、ver2.0.0での本リリースが予定されているようです。)

対応するIssueによれば、Datasetの要件として次が掲げられています。

  • Fast
    • ほとんどのケースで、RDD以上のパフォーマンス
  • Typesafe
  • Support for a variety of object models
    • デフォルトで様々なオブジェクトを、Datasetの要素としてエンコード可能
  • Java Compatible
  • Interoperates with DataFrames
    • DatasetとDataFrame間での(お決まりのコードを書く必要のない)シームレスな変換

ver1.6.0時点では、ScalaJavaからDataset APIを使用できます。 Pythonのサポートは、先のリリースとなるそうです。

Datasetの作り方

  • Seqから作る

toDSを使って簡単に、SeqをDatasetに変換できます。

// spark-shell上では、sqlContext.implicits._のimportは不要

// 数が要素のDataset
scala> val numberDS = Seq(1, 2, 3).toDS()
ds: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> numberDS.show()
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
+-----+

// case classのインスタンスが要素のDataset
scala> :paste
case class Player(
  name: String,
  weapon: String,
  rank: Int
)
defined class Player

scala> :paste
val playerDS = Seq(
  Player("Ponce", "Splattershot Jr.", 9), 
  Player("Paciorek", "E-Liter 3K", 50),
  Player("Petrick", "Splattershot Jr.", 12)
).toDS()
playerDS: org.apache.spark.sql.Dataset[Player] = [name: string, weapon: string, rank: int]

scala> playerDS.show()
+--------+----------------+----+
|    name|          weapon|rank|
+--------+----------------+----+
|   Ponce|Splattershot Jr.|   9|
|Paciorek|      E-Liter 3K|  50|
| Petrick|Splattershot Jr.|  12|
+--------+----------------+----+
  • DataFrameから作る

DataFrame#as[T]を使って、DataFrameをDatasetに変換できます。

// 適当なDataFrameを作成
scala> val df = sqlContext.read.json(sc.parallelize("""{"x": 11, "y": 22}""" :: Nil))
df: org.apache.spark.sql.DataFrame = [x: bigint, y: bigint]

scala> case class Coordinate(x: Long, y: Long)
defined class Coordinate

// DataFrameのカラムとcase classのフィールドの対応は、名前で判断される
scala> val ds = df.as[Coordinate]
ds: org.apache.spark.sql.Dataset[Coordinate] = [x: bigint, y: bigint]

scala> ds.show()
+---+---+
|  x|  y|
+---+---+
| 11| 22|
+---+---+

Datasetへの操作

今までと似た感覚で操作を行えます。

  • フィルタ
scala> val numberDS = Seq(1, 2, 3).toDS()
numberDS: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> numberDS.show()
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
+-----+

scala> :paste
numberDS
  .filter(_ >= 2)
  .show()
+-----+
|value|
+-----+
|    2|
|    3|
+-----+
  • 集計
scala> :paste
case class Player(
  name: String,
  weapon: String,
  rank: Int
)
defined class Player

scala> :paste
val playerDS = Seq(
  Player("Ponce", "Splattershot Jr.", 9), 
  Player("Paciorek", "E-Liter 3K", 50),
  Player("Petrick", "Splattershot Jr.", 12)
).toDS()
playerDS: org.apache.spark.sql.Dataset[Player] = [name: string, weapon: string, rank: int]

scala> playerDS.show()
+--------+----------------+----+
|    name|          weapon|rank|
+--------+----------------+----+
|   Ponce|Splattershot Jr.|   9|
|Paciorek|      E-Liter 3K|  50|
| Petrick|Splattershot Jr.|  12|
+--------+----------------+----+

scala> :paste
playerDS
  .groupBy(_.weapon)
  .agg(count("weapon"))
  .show()
+----------------+-------------+
|           value|count(weapon)|
+----------------+-------------+
|      E-Liter 3K|            1|
|Splattershot Jr.|            2|
+----------------+-------------+

ただし、RDDやDataFrameに存在する操作の一部(orderBydropなど)は、現時点では実装されていません。 このような操作を行う場合は、一度RDDやDataFrameに変換する必要があります。 (Dataset#rddRDDに、Dataset#toDFでDataFrameに変換することができます)

おわりに

Dataset APIについて紹介しました。

Sparkはバージョンアップの間隔が短く、新しい機能がどんどん出てきます。 追っていくのは比較的大変ですが、良い新機能はどんどん取り入れるべく、精進していこうと思います。

(deprecatedが頻発しないといいなあ…)

参考

World of IoTに出展したデモ

AWS soracom

こんにちは。おじさんチームのこんのです。
今回は、去る12月16日〜18日に東京ビッグサイトで開催されたWorld of IoT | SEMICON Japanで出展したデモについて紹介したいと思います。
なお、この3日間のイベントで、200名を超える方々が弊社のImpulseについて興味を持ち、お話を聞きにきてくださいました。ありがとうございます!!

デモは、弊社サービスのひとつである、リアルタイム大規模データ分析基盤Impulseを使った異常検知です。
Impulseとはなんぞや?という方は下記ページをご覧ください。
www.brains-tech.co.jp

太陽光発電パネルの発電量の故障予兆検知を想定し、それに似せた構成を取りました。
なお、今回のデモにあたり、アットマークテクノさんからArmadillo-IoTをお貸しいただきました。
アットマークテクノさんのご厚意に、この場を借りてお礼申し上げます!

f:id:bti-konno:20151218230257p:plain

概要構成は上記図のとおりです。ちょっとだけ詳しく流れを説明すると、

  1. 4つ並べた照度センサー(太陽光発電パネルの代わり)で照度(発電量の代わり)を測定し、その照度をアットマークテクノさんのArmadillo-IoTで収集します。
  2. 照度センサーから受け取ったデータは、アットマークテクノさんに作成いただいたプログラムにてファイルにJSON形式で追記されていきます。
  3. 上記ファイルを fluentd で監視し、6秒おきに溜まったデータを MQTT でAWS IoTにPublishします。
  4. AWS IoT から弊社Impulseにデータを流し込みます。
  5. Impulseでは正常状態を学習させておき、学習したモデルから入力されたデータが大きく外れていないかを検知します。
  6. 異常を検知した場合、AWS IoTのRasberry Pi2のShadowを書き換えます。
  7. Raspberry Pi2 は、Shadowが書き換わっていた場合、Arduino、LED、スピーカーを使って異常を知らせます。

Armadillo-IoTは、開発環境で使われるDebian(SDカード)で起動するようにしています。開発環境DebianでArmadillo-IoTを使用するとき、SWAP領域を作ることを忘れないでください。これがないとC/C++のソースからのコンパイルがかなりの割合で失敗します。ご注意ください。
また、dateが2000年スタートになっています。気付かずに証明書を入れてMQTTSで通信しようとするとSSLエラーがでます。日付時刻修正をお忘れなく。

fluentdからAWS IoTにMQTTでPublishするPluginは、yuuna/fluent-plugin-mqtt · GitHub を使わせていただきました。
yuunaさんにこの場を借りてお礼を申し上げます!

今回SORACOMさんのモバイルネットワークを使ってAWS IoTへ連携していますが、SORACOM Beamは使っていません。
# SORACOMさんの回線を使うことに決めたのがイベント前日ということもあり・・・。
# つまり、SORACOMさんのセットアップはものすごく簡単ということですね、はい。
# Armadillo-IoT、Raspberry Pi両方ともSORACOMさん接続です。
# Armadillo-IoTの3Gセットアップはアットマークテクノさんのマニュアルページに丁寧に記載されています。
# Raspberry Pi2は dietposterさんのQiitaを参考にしました。
# dietposterさん、ありがとうございます m(_"_)m

AWS IoTでは、PublishされたトピックのメッセージをそのままKinesisに流し込んでいます。

ImpulseではこのKinesisから流し込まれた値を取り込み、正常状態から外れていないかを指定された秒数間隔でチェックします。今回は同じような環境にある複数の照度センサーでしたので、Impulseのデータ特性分析から相関性が高い状態にあることがわかっています。つまり、Impulseではこの相関性が崩れた時に異常と判定します。
異常判定時は、Lambdaを通してRaspberry Pi2のShadowを書き換えています。

Raspberry Pi2では、AWS IoT SDK(node.js版)を使って、Shadowが書き換わった際に呼び出される function で(GPIO経由で)LEDを点灯させ、ミニディスプレイ付きArduino(実際には SparkFun Electronics のMicroView)にてメッセージを表示し、スピーカーから異常検知したことを伝えるようにしています。
f:id:bti-konno:20151219001132p:plain
スピーカーからお知らせする内容は、OpenJTalk を使って予め合成音声(合成音声には名工大のメイちゃんを使わせていただきました)を作っておいたものをaplayで再生しています。
なお、node.js(Javascript)からGPIOへ簡単にアクセスするために、Intelが開発している mraa ライブラリをインストールしています。mraa はIntel Edison等で主に使われていますが、Raspberry Piもカバー範囲に入っています。おすすめです。また、実際のコードは、AWS IoT Handsonで公開しているサンプルプログラムを若干改良しただけで済んでいます。

ざっくりと簡単に紹介いたしました。
いかがでしたか?
このような簡単な準備、構成で Impulse による異常検知ができてしまいます。

イベントの期間中、デモをお見せしたお客さまからこんなことを言われました。
「人がやっても同じではないか?Impulseは何がすごいのか?」
お客さま、いいことをおっしゃってくれました。それです。それなんです。
機械学習を活用することで、[人と同じこと]がコンピューター上でできるようになったんです。
さらにコンピューターでやることで、[人以上]に処理することができるのです。
確かに10個やその程度の数であれば人でやったほうが早いかもしれません。それが数百、数千、数万、それ以上あった場合はどうでしょう?膨大な数のデータソースを監視し異常を見つけ出すのは、人では途方も無いコストをかけなければ難しいでしょう。あるいは、それを長期にわたって絶え間なく監視し続けなければならないとしたらどうでしょう?やはり人でやるのは難しいと思います。人と同じように異常を見つけ出し、人以上に(長期間、膨大な数を)処理できる。それこそがImpulse、といいますか機械学習を活用したサービスが提供できるメリットなのです。

センサーでこれから何かしらデータを集めようとしているみなさま、分析しようとしているみなさま、故障予兆検知をしたいというニーズをお持ちのみなさま、Impulseであれば収集・蓄積・可視化から異常検知まで一通りの機能を提供できます。ぜひブレインズテクノロジーにご相談ください。
きっとお役に立てると思います。

【Spark】Window Functions(その2)

Spark

どうも、ポンセです。
前回の続きです(タイトルを微妙に変えていますが)。

SparkというよりSQLのWindow関数周りの話になっている気がしますが、気にせず書きます。

今回はSQLの形式で書きたいと思います。

ROWS

前回と同様にグループ単位での平均値を行毎で算出するのですが、
windowの行数を指定して集計したいとします。

そんな時は ROWS BETWEEN ~ AND ~ 。

データ
sample
+-------------------+-------+-----+
|          timestamp|metrics|value|
+-------------------+-------+-----+
|2015-08-01 00:00:00|    cpu|   40|
|2015-08-01 00:00:00| memory|   50|
|2015-08-01 00:00:00|disk_io|   70|
|2015-08-01 01:00:00|    cpu|   20|
|2015-08-01 01:00:00| memory|   90|
|2015-08-01 01:00:00|disk_io|   70|
|2015-08-01 02:00:00|    cpu|  100|
|2015-08-01 02:00:00| memory|   55|
|2015-08-01 02:00:00|disk_io|   72|
|2015-08-01 03:00:00|    cpu|   30|
|2015-08-01 03:00:00| memory|   60|
|2015-08-01 03:00:00|disk_io|   72|
|2015-08-01 04:00:00|    cpu|   50|
|2015-08-01 04:00:00| memory|   65|
|2015-08-01 04:00:00|disk_io|   76|
|2015-08-01 05:00:00|    cpu|   45|
|2015-08-01 05:00:00| memory|   40|
|2015-08-01 05:00:00|disk_io|   90|
+-------------------+-------+-----+
SQL
SELECT
  timestamp,
  metrics,
  value,
  AVG(value) OVER (PARTITION BY metrics ORDER BY unix_timestamp(timestamp) ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as average
FROM
  sample
結果
+-------------------+-------+-----+------------------+
|          timestamp|metrics|value|           average|
+-------------------+-------+-----+------------------+
|2015-08-01 00:00:00|    cpu|   40|              30.0|
|2015-08-01 01:00:00|    cpu|   20|53.333333333333336|
|2015-08-01 02:00:00|    cpu|  100|              50.0|<- ***
|2015-08-01 03:00:00|    cpu|   30|              60.0|
|2015-08-01 04:00:00|    cpu|   50|41.666666666666664|
|2015-08-01 05:00:00|    cpu|   45|              47.5|
|2015-08-01 00:00:00|disk_io|   70|              70.0|
|2015-08-01 01:00:00|disk_io|   70| 70.66666666666667|
|2015-08-01 02:00:00|disk_io|   72| 71.33333333333333|
|2015-08-01 03:00:00|disk_io|   72| 73.33333333333333|
|2015-08-01 04:00:00|disk_io|   76| 79.33333333333333|
|2015-08-01 05:00:00|disk_io|   90|              83.0|
|2015-08-01 00:00:00| memory|   50|              70.0|
|2015-08-01 01:00:00| memory|   90|              65.0|
|2015-08-01 02:00:00| memory|   55| 68.33333333333333|
|2015-08-01 03:00:00| memory|   60|              60.0|
|2015-08-01 04:00:00| memory|   65|              55.0|
|2015-08-01 05:00:00| memory|   40|              52.5|
+-------------------+-------+-----+------------------+

例えば3行目を見てみます。
3行目はmetrics cpuに対して、20(1つ前の行)、 100(現在行)、30(1つ後ろの行)の平均値(すなわち50)となっています。
このようにROWS BETWEEN ~ AND ~ の構文で行数単位での集計が可能となります。

RANGE

今度はある値より大きいまたは小さい行でグルーピングして集計したいとします。

そんな時は RANGE BETWEEN ~ AND ~ 。

データ

データは上記例とは違い、timestampの間隔をバラバラにしています。

ex_sample
+-------------------+-------+-----+
|          timestamp|metrics|value|
+-------------------+-------+-----+
|2015-08-01 00:00:00|    cpu|   40|
|2015-08-01 00:02:00| memory|   50|
|2015-08-01 00:01:00|disk_io|   70|
|2015-08-01 00:05:00|    cpu|   20|
|2015-08-01 00:04:00| memory|   90|
|2015-08-01 00:04:00|disk_io|   70|
|2015-08-01 00:07:00|    cpu|  100|
|2015-08-01 00:10:00| memory|   55|
|2015-08-01 00:15:00|disk_io|   72|
|2015-08-01 00:15:00|    cpu|   30|
|2015-08-01 00:30:00| memory|   60|
|2015-08-01 00:20:00|disk_io|   72|
|2015-08-01 00:27:00|    cpu|   50|
|2015-08-01 00:35:00| memory|   65|
|2015-08-01 00:24:00|disk_io|   76|
|2015-08-01 00:32:00|    cpu|   45|
|2015-08-01 00:54:00| memory|   40|
|2015-08-01 00:40:00|disk_io|   90|
+-------------------+-------+-----+
SQL
SELECT
  timestamp,
  unix_timestamp(timestamp) as unix_time,
  metrics,
  value,
  AVG(value) OVER (PARTITION BY metrics ORDER BY unix_timestamp(timestamp) RANGE BETWEEN 600 PRECEDING AND CURRENT ROW) as average
FROM
  ex_sample
結果
+-------------------+----------+-------+-----+------------------+
|          timestamp| unix_time|metrics|value|           average|
+-------------------+----------+-------+-----+------------------+
|2015-08-01 00:00:00|1438354800|    cpu|   40|              40.0|
|2015-08-01 00:05:00|1438355100|    cpu|   20|              30.0|
|2015-08-01 00:07:00|1438355220|    cpu|  100|53.333333333333336|
|2015-08-01 00:15:00|1438355700|    cpu|   30|              50.0|<- ***
|2015-08-01 00:27:00|1438356420|    cpu|   50|              50.0|
|2015-08-01 00:32:00|1438356720|    cpu|   45|              47.5|
|2015-08-01 00:01:00|1438354860|disk_io|   70|              70.0|
|2015-08-01 00:04:00|1438355040|disk_io|   70|              70.0|
|2015-08-01 00:15:00|1438355700|disk_io|   72|              72.0|
|2015-08-01 00:20:00|1438356000|disk_io|   72|              72.0|
|2015-08-01 00:24:00|1438356240|disk_io|   76| 73.33333333333333|
|2015-08-01 00:40:00|1438357200|disk_io|   90|              90.0|
|2015-08-01 00:02:00|1438354920| memory|   50|              50.0|
|2015-08-01 00:04:00|1438355040| memory|   90|              70.0|
|2015-08-01 00:10:00|1438355400| memory|   55|              65.0|
|2015-08-01 00:30:00|1438356600| memory|   60|              60.0|
|2015-08-01 00:35:00|1438356900| memory|   65|              62.5|
|2015-08-01 00:54:00|1438358040| memory|   40|              40.0|
+-------------------+----------+-------+-----+------------------+

わかりやすくするために、unixtimeの行を追加しています。

例えば4行目を見てみますと、
この行ではORDER BY句で指定しているunixtimeに対して、600秒前から現在行すなわち、30(現在行)、100(1つ前)、20(2つ前)の平均(すなわち50)となっています。
このようにRANGE BETWEEN ~ AND ~ の構文で特定値より大きいまたは小さい行による集計が可能となります。

まとめ

・ROWSとRANGEを使って行数指定、値の幅指定を行ってSparkで集計してみました(全然大規模なデータじゃないけど)。

さいごに

Window関数に加えて、Spark1.5では Date time や String 周りの関数がかなりサポートされているので、集計周りは相当充実してきた印象です。もうすぐ1.6もリリースされそうなのでそちらにも注目していきたいところですね。

今回の例で使ったpythonのコードを載せておきます。

df = sqlCtx.createDataFrame([
  ('2015-08-01 00:00:00','cpu',40), ('2015-08-01 00:00:00','memory',50), ('2015-08-01 00:00:00','disk_io',70), 
  ('2015-08-01 01:00:00','cpu',20), ('2015-08-01 01:00:00','memory',90), ('2015-08-01 01:00:00','disk_io',70), 
  ('2015-08-01 02:00:00','cpu',100), ('2015-08-01 02:00:00','memory',55), ('2015-08-01 02:00:00','disk_io',72),
  ('2015-08-01 03:00:00','cpu',30), ('2015-08-01 03:00:00','memory',60), ('2015-08-01 03:00:00','disk_io',72), 
  ('2015-08-01 04:00:00','cpu',50), ('2015-08-01 04:00:00','memory',65), ('2015-08-01 04:00:00','disk_io',76),
  ('2015-08-01 05:00:00','cpu',45), ('2015-08-01 05:00:00','memory',40), ('2015-08-01 05:00:00','disk_io',90)
], ['timestamp','metrics','value'])

df.registerTempTable('sample')

q = """
SELECT
  timestamp,
  metrics,
  value,
  AVG(value) OVER (PARTITION BY metrics ORDER BY unix_timestamp(timestamp) ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as average
FROM
  sample
"""

sqlContext.sql(q).show()



ex_df = sqlCtx.createDataFrame([
  ('2015-08-01 00:00:00','cpu',40), ('2015-08-01 00:02:00','memory',50), ('2015-08-01 00:01:00','disk_io',70), 
  ('2015-08-01 00:05:00','cpu',20), ('2015-08-01 00:04:00','memory',90), ('2015-08-01 00:04:00','disk_io',70), 
  ('2015-08-01 00:07:00','cpu',100), ('2015-08-01 00:10:00','memory',55), ('2015-08-01 00:15:00','disk_io',72),
  ('2015-08-01 00:15:00','cpu',30), ('2015-08-01 00:30:00','memory',60), ('2015-08-01 00:20:00','disk_io',72), 
  ('2015-08-01 00:27:00','cpu',50), ('2015-08-01 00:35:00','memory',65), ('2015-08-01 00:24:00','disk_io',76),
  ('2015-08-01 00:32:00','cpu',45), ('2015-08-01 00:54:00','memory',40), ('2015-08-01 00:40:00','disk_io',90)
], ['timestamp','metrics','value'])

ex_df.registerTempTable('ex_sample')


q = """
SELECT
  timestamp,
  unix_timestamp(timestamp) as unix_time,
  metrics,
  value,
  AVG(value) OVER (PARTITION BY metrics ORDER BY unix_timestamp(timestamp) RANGE BETWEEN 600 PRECEDING AND CURRENT ROW) as average
FROM
  ex_sample
"""

sqlContext.sql(q).show()