Technology Topics by Brains

ブレインズテクノロジーの研究開発機関「未来工場」で働くエンジニアが、先端オープン技術、機械学習×データ分析(異常検知、予兆検知)に関する取組みをご紹介します。

Pythonによる時系列データの異常検知

インターン生の松井(B4)です.時系列データの異常検知手法をまとめました.

入門 機械学習による異常検知という本の7章が時系列データの異常検知を扱っています.(本書の内容をまとめたWeb記事もあります.)
www.coronasha.co.jp
この本のサンプルコードはすべてRで書かれているため,Python (+numpy, scikit-learn) で書き直してみました.

後半では,深層学習を用いた時系列データの異常検知手法について,知られている所をまとめました.

k近傍法による異常部位検出

時系列データの異常検知手法の中でも比較的シンプルなやり方です.
訓練用の時系列データをスライド窓によりベクトル化しておき,k近傍法を用いて新たな時系列の異常度を計算します.

k=1として最近傍法を実装したコードがこちら.

# -*- coding: utf-8 -*-
import numpy as np
import matplotlib.pyplot as plt
from sklearn.neighbors import NearestNeighbors

def main():
    data = np.loadtxt("your path/qtdbsel102.txt",delimiter="\t")

    train_data = data[1:3000, 2]
    test_data = data[3001:6000, 2]

    width = 100
    nk = 1

    train = embed(train_data, width)
    test = embed(test_data, width)

    neigh = NearestNeighbors(n_neighbors=nk)
    neigh.fit(train)
    d = neigh.kneighbors(test)[0]

    # 距離をmax1にするデータ整形
    mx = np.max(d)
    d = d / mx

    # プロット
    test_for_plot = data[3001+width:6000, 2]
    fig = plt.figure()
    ax1 = fig.add_subplot(111)
    ax2 = ax1.twinx()

    p1, = ax1.plot(d, '-b')
    ax1.set_ylabel('distance')
    ax1.set_ylim(0, 1.2)
    p2, = ax2.plot(test_for_plot, '-g')
    ax2.set_ylabel('original')
    ax2.set_ylim(0, 12.0)
    plt.title("Nearest Neighbors")
    ax1.legend([p1, p2], ["distance", "original"])
    plt.savefig('./results/knn.png')
    plt.show()


def embed(lst, dim):
    emb = np.empty((0,dim), float)
    for i in range(lst.size - dim + 1):
        tmp = np.array(lst[i:i+dim])[::-1].reshape((1,-1)) 
        emb = np.append( emb, tmp, axis=0)
    return emb

if __name__ == '__main__':
    main()

心電図のデータをサンプルとして用いています.
周期的で正常な心電が見られる部分を訓練データとし,乱れている部分を含むようにテストデータの区間を取りました.
結果のプロットは以下のようになりました.
f:id:Nori_matsu:20170930222409p:plain
異常を含む区間で異常度 (グラフ中distanceと表記) が高くなっていることがわかります.

k近傍法では,注目波形パターンが,訓練データ中の波形パターン群の中でk番目に近いものと,どれだけ異なるかを表しています.
この手法は,綺麗な周期的データが予測される場合には有用ですが,周期的に繰り返しつつも長期的に上昇していくようなトレンドを持つ波形の場合にうまくいきません.

特異スペクトル変換法による変化点検知

一方,ある点の前後の波形を比較して変化点検出を行う特異スペクトル変換法という手法は,波形の長期的なトレンドにも強いと考えられます.
特異スペクトル変換法については,こちらのWebサイトにわかりやすくまとまっています.
numpyを用いて特異スペクトル変換法を実装したコードはこちら.

# -*- coding: utf-8 -*-
import numpy as np
import matplotlib.pyplot as plt

def main():
    data = np.loadtxt("your path/qtdbsel102.txt",delimiter="\t")

    train_data = data[1:3000, 2]
    test_data = data[3001:6000, 2]

    w = 50 # width
    m = 2
    k = w/2
    L = k/2 # lag
    Tt = test_data.size
    score = np.zeros(Tt)

    for t in range(w+k, Tt-L+1+1):
        tstart = t-w-k+1
        tend = t-1
        X1 = embed(test_data[tstart:tend], w).T[::-1, :] # trajectory matrix
        X2 = embed(test_data[(tstart+L):(tend+L)], w).T[::-1, :] # test matrix

        U1, s1, V1 = np.linalg.svd(X1, full_matrices=True)
        U1 = U1[:,0:m]
        U2, s2, V2 = np.linalg.svd(X2, full_matrices=True)
        U2 = U2[:,0:m]

        U, s, V = np.linalg.svd(U1.T.dot(U2), full_matrices=True)
        sig1 = s[0]
        score[t] = 1 - np.square(sig1)

    # 変化度をmax1にするデータ整形
    mx = np.max(score)
    score = score / mx

    # プロット
    test_for_plot = data[3001:6000, 2]
    fig = plt.figure()
    ax1 = fig.add_subplot(111)
    ax2 = ax1.twinx()

    p1, = ax1.plot(score, '-b')
    ax1.set_ylabel('degree of change')
    ax1.set_ylim(0, 1.2)
    p2, = ax2.plot(test_for_plot, '-g')
    ax2.set_ylabel('original')
    ax2.set_ylim(0, 12.0)
    plt.title("Singular Spectrum Transformation")
    ax1.legend([p1, p2], ["degree of change", "original"])
    plt.savefig('./results/sst.png')
    plt.show()


def embed(lst, dim):
    emb = np.empty((0,dim), float)
    for i in range(lst.size - dim + 1):
        tmp = np.array(lst[i:i+dim]).reshape((1,-1))
        emb = np.append( emb, tmp, axis=0)
    return emb

if __name__ == '__main__':
    main()

結果のプロットは以下のようになりました.
f:id:Nori_matsu:20170930230428p:plain
波形が乱れている点において,より鋭く異常度のピークが出ています.

どちらの手法でよりうまく異常検知ができるかはデータの性質に依存しますが,k近傍法の方が計算量は少なくて済むようです.

深層学習を用いた異常検知手法

LSTM (Long short-term memory) を用いた手法

時系列データを深層学習させる手法としては,RNNや,それを発展させたLSTMが知られています.
RNNで来月の航空会社の乗客数を予測する
わかるLSTM ~ 最近の動向と共に

LSTMは,与えられた波形から,次の時刻での値を予測するモデルです.
LSTMを用いて時系列データの異常検知をするアプローチとしては,異常例のデータセットが十分に集められるかそうでないかによって,二種類のアプローチがあります.
異常例のデータセットが少ない場合,直前までの波形から予測された値が,実際の値とどの程度異なるかによって,異常判定ができます.
一方,異常例のデータセットが十分にある場合には,LSTMを直接分類器として用いる方法も用いられるようです.
How to Use LSTM Networks for Time-series Anomaly Detection - Quora

Autoencoder (自己符号化器) を用いた手法

Autoencoderは,ニューラルネットワークを用いて次元圧縮をする手法であり,入力データをそのまま出力 (=復元) させるように教師あり学習をすることで,信号の特徴を抽出することができます.
deepage.net
正常な波形をデータセットとしてAutoencoderを学習させれば,テストデータとして正常波形を用いた時,元の形に近い波形を出力することができます.
一方このモデルに異常な波形を入力した場合,異常な波形に対する特徴を学習していないために,出力波形は大きく異なるでしょう.
このように,部分時系列データをうまく復元できたかどうかによって,異常判定を行います.

結び

時系列データから異常検知を行うためのアルゴリズムとして,k近傍法,特異スペクトル変換法を実装しました.
また,深層学習を用いたアプローチも紹介しました.
Autoencoderによる異常検知アルゴリズムの実装も行いたいところでしたが,時間切れとなってしまったので今回はここまでで...orz
リアルタイムで異常検知を行うためには,精度のみならず,どれだけの計算時間が許されるかという所にも着目して,アルゴリズムを選択する必要があるようです.

SparkのDeep Learning Pipelinesを使ってみた

Impulse開発チームの塚田です。

今回は、DatabricksのDeep Learning Pipelinesを、spark-shell上で触ってみました。 内容はほぼ下記を実行したものなので、英語余裕で読めるぜ!って方はこちらを見てください。

Deep Learning Pipelines — Databricks Documentation

Deep Learning Pipelines

Deep Learning PipelinesはSparkでDeep Learningをスケーラブルに実行するためのhigh-level APIを提供するパッケージです。 TensorFlowやTensorFlowをバックエンドとしたKerasをサポートしています。 現在は画像データに特化した下記を実行することができます。

  • Sparkで画像を操作する
    • 画像をDataFrameとして読み込むAPI
  • 転移学習
    • 学習済みモデルを特徴抽出器として使用
  • Deep Learningのモデルを使ったtrasform
    • 下記を使用してTransformerを構成
      • 学習済みモデル(Inception v3)
      • TensorFlow Graph
      • Keras
  • モデルをSQL functionとしてデプロイする
  • 分散ハイパーパラメータチューニング (comming soon...)

今回は下記を使って、猫と犬の判別をしてみたいと思います。

  • Sparkで画像を操作する
  • 転移学習

実行環境

ハードウェアの概要
機種名 MacBook Pro
機種ID MacBookPro13,2
プロセッサ名 Intel Core i5
プロセッサ速度 2.9 GHz
プロセッサの個数 1
コアの総数 2
二次キャッシュ(コア単位) 256 KB
三次キャッシュ 4 MB
メモリ 16 GB

準備

使用するのは猫と犬の画像です。 下記から取得しました。(Kaggleへの登録が必要です)

https://www.kaggle.com/c/dogs-vs-cats-redux-kernels-edition

Dataからtrain.zipをダウンロードしてください。 解凍後、適当な枚数を選りすぐり下記のようにします。 (猫、犬ともに30枚ずつで実行しました)

- /images/
  - cat/
    - cat.0.jpg
    - ...
    - cat.29.jpg
  - dog/
    - dog.0.jpg
    - ...
    - dog.29.jpg

Pythonパッケージのインストール

下記のインストールが必要です

pip install tensorflow keras h5py

spark-shell実行

pythonAPIしかないので、pysparkを実行します。 packagesにspark-deep-learningを指定します。

pyspark --packages databricks:spark-deep-learning:0.1.0-spark2.1-s_2.11 --driver-memory 2G --executor-memory 2G

画像ファイルから入力データを作成

学習に使う画像データはSparkのDataFrameとして取り込みます。 ディレクトリを指定すると直下にある画像ファイルを読み込みます。

# データの読み込み
from sparkdl import readImages
from pyspark.sql.functions import lit

cat_df = readImages("/images/cat").withColumn("label", lit(1))
dog_df = readImages("/images/dog").withColumn("label", lit(0))
cat_train, cat_test = cat_df.randomSplit([0.6, 0.4], 0)
dog_train, dog_test = dog_df.randomSplit([0.6, 0.4], 0)
train_df = cat_train.unionAll(dog_train)
test_df = cat_test.unionAll(dog_test)

中身を見てみると、画像のサイズ等の情報と中身のバイナリが入ってそうですね。

>>> train_df.show(100, False)
+---------------------------+---------------------------+-----+
|filePath                   |image                      |label|
+---------------------------+---------------------------+-----+
|file:/images/cat/cat.19.jpg|[RGB,223,320,3,[B@5db7511c]|1    |
|file:/images/cat/cat.2.jpg |[RGB,396,312,3,[B@3562ebce]|1    |
|file:/images/cat/cat.23.jpg|[RGB,256,334,3,[B@5d07ef8a]|1    |
|file:/images/cat/cat.25.jpg|[RGB,500,345,3,[B@55547685]|1    |
|file:/images/cat/cat.26.jpg|[RGB,374,500,3,[B@320cf282]|1    |
|file:/images/cat/cat.27.jpg|[RGB,479,370,3,[B@10925396]|1    |
|file:/images/cat/cat.28.jpg|[RGB,270,286,3,[B@720b85b1]|1    |
|file:/images/cat/cat.29.jpg|[RGB,375,499,3,[B@41d352b] |1    |
|file:/images/cat/cat.3.jpg |[RGB,414,500,3,[B@51d05fc8]|1    |
|file:/images/cat/cat.6.jpg |[RGB,303,400,3,[B@53dca887]|1    |
|file:/images/cat/cat.8.jpg |[RGB,345,461,3,[B@7dd49940]|1    |
|file:/images/cat/cat.9.jpg |[RGB,425,320,3,[B@100d5fbc]|1    |
|file:/images/cat/cat.0.jpg |[RGB,374,500,3,[B@43707c6a]|1    |
|file:/images/cat/cat.10.jpg|[RGB,499,489,3,[B@75de6c13]|1    |
|file:/images/cat/cat.11.jpg|[RGB,410,431,3,[B@2c99f571]|1    |
|file:/images/cat/cat.13.jpg|[RGB,315,499,3,[B@5693afe1]|1    |
|file:/images/cat/cat.15.jpg|[RGB,353,405,3,[B@16401a75]|1    |
|file:/images/cat/cat.16.jpg|[RGB,258,448,3,[B@10f8525a]|1    |
|file:/images/dog/dog.19.jpg|[RGB,225,299,3,[B@73a91f49]|0    |
|file:/images/dog/dog.2.jpg |[RGB,199,187,3,[B@388e5a7a]|0    |
|file:/images/dog/dog.23.jpg|[RGB,403,499,3,[B@2e0b6cac]|0    |
|file:/images/dog/dog.25.jpg|[RGB,375,499,3,[B@7cb391b5]|0    |
|file:/images/dog/dog.26.jpg|[RGB,224,300,3,[B@47db9a3d]|0    |
|file:/images/dog/dog.27.jpg|[RGB,375,499,3,[B@d4c4536] |0    |
|file:/images/dog/dog.28.jpg|[RGB,432,287,3,[B@78d7363f]|0    |
|file:/images/dog/dog.29.jpg|[RGB,376,500,3,[B@18093ea9]|0    |
|file:/images/dog/dog.3.jpg |[RGB,375,499,3,[B@39855e7] |0    |
|file:/images/dog/dog.6.jpg |[RGB,488,499,3,[B@6c12c557]|0    |
|file:/images/dog/dog.8.jpg |[RGB,500,469,3,[B@2db57024]|0    |
|file:/images/dog/dog.9.jpg |[RGB,500,368,3,[B@7c98ff4d]|0    |
|file:/images/dog/dog.0.jpg |[RGB,375,499,3,[B@3d6eba54]|0    |
|file:/images/dog/dog.10.jpg|[RGB,292,269,3,[B@472cf8d3]|0    |
|file:/images/dog/dog.11.jpg|[RGB,101,135,3,[B@419e9442]|0    |
|file:/images/dog/dog.13.jpg|[RGB,428,362,3,[B@7ac779ab]|0    |
|file:/images/dog/dog.15.jpg|[RGB,374,500,3,[B@3233c0bd]|0    |
|file:/images/dog/dog.16.jpg|[RGB,380,500,3,[B@12f95c48]|0    |
+---------------------------+---------------------------+-----+

転移学習

Deep Learning Pipelinesにより学習済みモデルを使用して学習を行います。 現状は下記のモデルがサポートされています。

  • InceptionV3

InceptionV3の出力を特徴量とし、決定木により猫と犬を分類します。 DeepImageFeaturizerインスタンスを作ってPipelineを作るだけですね。

# 学習
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
from sparkdl import DeepImageFeaturizer 

featurizer = DeepImageFeaturizer(inputCol="image", outputCol="features", modelName="InceptionV3")
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
p = Pipeline(stages=[featurizer, dt])

p_model = p.fit(train_df)

出来上がったモデルを使ってtransformします。

# テスト
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

tested_df = p_model.transform(test_df)
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(tested_df.select("prediction", "label"))))
Test set accuracy = 0.75

何も考えずに作った割には合ってる気もしますが、もうちょっと中身について見ていきます。

DataFlameの中身を見てみると、DeepImageFeaturizerの出力がfeaturesカラムに入っています。

>>> tested_df.show()
+--------------------+--------------------+-----+--------------------+-------------+-----------+----------+
|            filePath|               image|label|            features|rawPrediction|probability|prediction|
+--------------------+--------------------+-----+--------------------+-------------+-----------+----------+
|file:/images/cat/...|[RGB,374,500,3,[B...|    1|[0.0,0.8633406162...|   [0.0,17.0]|  [0.0,1.0]|       1.0|
|file:/images/cat/...|[RGB,374,500,3,[B...|    1|[0.0,0.0,0.0,0.0,...|   [0.0,17.0]|  [0.0,1.0]|       1.0|
|file:/images/cat/...|[RGB,499,431,3,[B...|    1|[0.0,0.0,0.229576...|   [0.0,17.0]|  [0.0,1.0]|       1.0|
|file:/images/cat/...|[RGB,345,500,3,[B...|    1|[0.0,0.0516236834...|   [0.0,17.0]|  [0.0,1.0]|       1.0|
|file:/images/cat/...|[RGB,374,500,3,[B...|    1|[0.0,0.0,0.0,0.0,...|   [0.0,17.0]|  [0.0,1.0]|       1.0|
|file:/images/cat/...|[RGB,375,499,3,[B...|    1|[0.12656563520431...|   [0.0,17.0]|  [0.0,1.0]|       1.0|
|file:/images/cat/...|[RGB,144,175,3,[B...|    1|[0.0,0.0,0.195755...|   [18.0,0.0]|  [1.0,0.0]|       0.0|
|file:/images/cat/...|[RGB,499,495,3,[B...|    1|[0.0,0.0,0.0,0.0,...|   [18.0,0.0]|  [1.0,0.0]|       0.0|
|file:/images/cat/...|[RGB,280,300,3,[B...|    1|[0.79150134325027...|   [0.0,17.0]|  [0.0,1.0]|       1.0|
|file:/images/cat/...|[RGB,224,300,3,[B...|    1|[1.65047085285186...|   [0.0,17.0]|  [0.0,1.0]|       1.0|
|file:/images/cat/...|[RGB,267,320,3,[B...|    1|[0.86000078916549...|   [18.0,0.0]|  [1.0,0.0]|       0.0|
|file:/images/cat/...|[RGB,375,499,3,[B...|    1|[0.0,0.0,0.0,0.14...|   [0.0,17.0]|  [0.0,1.0]|       1.0|
|file:/images/dog/...|[RGB,348,215,3,[B...|    0|[0.0,0.1367801278...|   [18.0,0.0]|  [1.0,0.0]|       0.0|
|file:/images/dog/...|[RGB,332,500,3,[B...|    0|[0.21633519232273...|   [18.0,0.0]|  [1.0,0.0]|       0.0|
|file:/images/dog/...|[RGB,499,415,3,[B...|    0|[0.0,0.0,0.884287...|   [18.0,0.0]|  [1.0,0.0]|       0.0|
|file:/images/dog/...|[RGB,371,499,3,[B...|    0|[0.93737035989761...|   [18.0,0.0]|  [1.0,0.0]|       0.0|
|file:/images/dog/...|[RGB,500,274,3,[B...|    0|[0.56643348932266...|    [0.0,1.0]|  [0.0,1.0]|       1.0|
|file:/images/dog/...|[RGB,287,300,3,[B...|    0|[0.10650488734245...|   [18.0,0.0]|  [1.0,0.0]|       0.0|
|file:/images/dog/...|[RGB,376,499,3,[B...|    0|[0.11714683473110...|   [18.0,0.0]|  [1.0,0.0]|       0.0|
|file:/images/dog/...|[RGB,264,299,3,[B...|    0|[0.0,0.0,0.0,0.0,...|    [0.0,1.0]|  [0.0,1.0]|       1.0|
|file:/images/dog/...|[RGB,499,327,3,[B...|    0|[0.21827600896358...|   [0.0,17.0]|  [0.0,1.0]|       1.0|
|file:/images/dog/...|[RGB,161,98,3,[B@...|    0|[0.04027611017227...|   [18.0,0.0]|  [1.0,0.0]|       0.0|
|file:/images/dog/...|[RGB,386,500,3,[B...|    0|[0.0,0.0,1.286264...|   [18.0,0.0]|  [1.0,0.0]|       0.0|
|file:/images/dog/...|[RGB,335,272,3,[B...|    0|[0.0,1.5273251533...|   [18.0,0.0]|  [1.0,0.0]|       0.0|
+--------------------+--------------------+-----+--------------------+-------------+-----------+----------+

featuresの中身は配列になっています。 長さは131072です。

>>> len(tested_df.head()['features'])
131072

決定木は下記の用になっています。

>>> print(p_model.stages[1].toDebugString)
DecisionTreeClassificationModel (uid=DecisionTreeClassifier_46609281014c140920a8) of depth 2 with 5 nodes
  If (feature 38387 <= 0.6593263149261475)
   If (feature 26 <= 0.5600484609603882)
    Predict: 0.0
   Else (feature 26 > 0.5600484609603882)
    Predict: 1.0
  Else (feature 38387 > 0.6593263149261475)
   Predict: 1.0

featuresの数の割に木が短いです。 単純に学習データを多くすれば精度を上げられそうです。 (今回はとりあえず動かすだけなので大分サンプル数を絞っています)

また、indexからfeaturesのラベルを逆引きする手段がDeepImageFeaturizerになさそうなので、26と38387が何の値なのかさっぱりわかりません。 indexとラベルのマッピングが取れる手段が欲しいですね。

おわりに

Deep Learning Pipelinesを使うとDeep Learningのことを大して知らなくても実行できます。

まだ画像しか扱えませんし使える学習済みモデルも少ないですが、その辺の選択肢が増えて来ると夢が広がりますよね。

参考

dockerコンテナのレイテンシ

はじめましてブレインズテクノロジーの貴明です。 今回はDockerにおけるコンテナ間通信のレイテンシについて調べたためその内容について記事にしました。

経緯

「1コンテナ1プロセス」というDockerの原則に基づきコンテナを運用していると、確かに何のコンテナが何の役割を果たしているのか分かりやすいのですが コンテナが増えてくると果たしてそこで発生しているオーバーヘッドがどの程度なのか気になるところ。
基本的にレイテンシが起きるのは、CPU/ディスク/ネットワークとなりますが、今回はまずネットワーク(コンテナ間の通信)において発生しているレイテンシを単純な構成下で計測してみようという試みが本記事となります。
(今回はまず単一ホスト下における計測とし、ホスト間のレイテンシは次回!)

やりたいこと

同コンテナ内から発行されたクエリの処理時間と、別コンテナ経由で発行されたクエリの処理時間差から、コンテナ間通信におけるレイテンシを計測する。
具体的にはPostgreSQLに対してinsert文、およびselect文を複数回実行し、その処理時間差をtimeコマンドにて計測。 (単純なinsert文1,000回, select文10,000回実行) f:id:nakazawa-takaaki:20170817181226p:plain

前提となる環境

  • MacBook Pro (13-inch, 2017,Four Thunderbolt 3 Ports)
  • プロセッサ: 3.1 GHz Intel Core i5
  • メモリ: 16 GB 2133 MHz LPDDR3
  • グラフィックス: Intel Iris Plus Graphics 650 1536 MB
  • OS: MacOS Sierra (10.12.6)
  • Docker version 17.06.0-ce, build 02c1d87
  • docker-compose version 1.14.0, build c7bdf9e
  • Dockerリソース割り当て
    • CPUs: 2
    • Memory: 7.0 GB

結果

結果からいうと以下のようになりました。 やり方によるとは思われますが、今回の試みからはおよそ処理時間にして1%のレイテンシが発生している結果となりました。 実行方法は後述。

処理 同コンテナ処理時間(s) 別コンテナ経由処理時間(s) 処理時間増加比率
insert 11.741 11.908 101.4%
select 6.258 6.351 101.5%

結果詳細

具体的なtimeコマンドによる計測結果は以下の通り。

insert

time type 同コンテナ 別コンテナ経由 処理時間増加比率
real 0m11.741s 0m11.908s 101.4%
user 0m0.000s 0m0.040s -
sys 0m0.110s 0m0.100s 90.9%

select

time type 同コンテナ 別コンテナ経由 処理時間増加比率
real 0m6.258s 0m6.351s 101.5%
user 0m3.160s 0m3.370s 106.6%
sys 0m0.520s 0m0.270s 51.9%

実行方法

まずは以下の内容のDockerfileを用意

FROM postgres:9.6
COPY init.sql /docker-entrypoint-initdb.d/.
COPY create.sh test.sh insert.sql select.sql /tmp/

中でcopyしている各ファイルは以下の通り

  • init.sql: テスト用のDB作成
  • create.sh: テスト用のテーブル作成(今回はint型の1カラムのみとした)
  • insert.sql: テスト用のsql(insert文1,000行)
  • select.sql: テスト用のsql(select文10,000行)

以下のdocker-compose.ymlを用意して

version: '3'

services:
  db:
    build: .
    image: test/postgres:latest
    container_name: db
    ports:
      - "5432:5432"
    tty: true

  outerdb:
    image: test/postgres:latest
    container_name: outerdb
    tty: true

dockerイメージを作成、起動

docker-compose build
docker-compose up -d

sql発行のテストは以下の要領(同コンテナからのsql発行)

docker exec -it db bash
bash /tmp/create.sh
bash /tmp/test.sh localhost

sql発行のテスト(別コンテナからのsql発行)

docker exec -it db bash
bash /tmp/create.sh
exit
docker exec -it outerdb bash
bash /tmp/test.sh db

使ったもの

今回使ったDockerFileやシェルスクリプトなどはgithubへ上げております。

Spark Summit 2017 San Francisco

f:id:brains_aoki:20170607131148j:plain

こんにちは、データアナリストの青木とエンジニアの樋口です。

引き続き、Spark Summit 2017 San Fransiscoの記事です。Keynoteやセッションで特に興味深かったものを紹介していきます。

Keynote

Coming in Spark 2.2

まずは、Spark2.2に関する情報がきました。注目点は以下。

  • コストベースSQLの最適化

  • structured streamingがproduction-readyとなった

  • pip install pyspark が可能となる

すでにgitではv.2.2.0-rc4のtagが打たれていることから、リリース間近なようですね。

続いて大きな発表がありました。

Two new open source from Databricks

おそらくこの発表がSummitの目玉だったようです。 今後Databricksは以下2つについて、特に力を入れていくとのこと。

  1. Deep Learning

  2. Streaming Performance

Deep Learning

みんな大好きDeep Learning。画像分野を中心に華やかな成果を上げてはいますが、TensorFlowやKrasなど既存のDeep LearningオープンソースはまだまだAPIとしてlow-level。これに対してDatabricksはDeep Learning Pipelinesというhigh-levelなAPIを用意するぜ!とのことでした。これにより、

  • よくあるユースケースについては、コード数行でモデルを作成できる

  • Spark上で自動的に分散処理してくれる

  • バッチやストリーミングアプリ、SparkSQLにモデルをデプロイできる

ができるようになるとのことです。以下のDatabricksの記事に詳しい説明があります。

databricks.com

Streaming Performance

Spark Streamingにおけるデータに対し、DataFrameやSparkSQLなど、バッチと同様のhigh-levelのAPIを提供しつつ、レイテンシーを抑えるとのこと。microbatchの処理はやめて、continuous progressという新たな処理仕様になるようですが、既存のコード自体は変える必要はないようです。リアルタイム処理にSpark Streamingをコアとしている弊社にとってはうれしい話かもしれません。

Streaming PerformanceについてもDatabricksの以下の記事が詳しいです。

databricks.com

www.youtube.com

そのほか、DatabricksのServerlessの話もありました。

databricks.com

www.youtube.com

次はセッション。 各セッションの動画については来週末くらいにuploadされるみたいなので、その際にまたlinkと共に更新します。

Sessions

NEXT GENERATION WORKSHOP CAR DIAGNOSTICS AT BMW POWERED BY APACHE SPARK

f:id:mhigu:20170608101515p:plain:w160 f:id:mhigu:20170608101816p:plain:w160 f:id:mhigu:20170608101638p:plain:w160

概要

現在の販売店での車両診断は、手動で生成された決定木(人が作って来た条件分岐)に基づいていて、車種の多様化、車両システムの複雑化(hybrid, connective)に伴い、すでに限界に達しているとのこと。BMWでは、自動車やワークショップから入手できるデータを利用して、交換するべき部品や取るべき行動を予測できるモデルを作成し、そこから得られた結果をWeb-Applicationとして提供して、複雑化した車両整備を現場の人だけで解決できるような形にして出力しているそうです。

感想

これこそ、IoTの事例といった感じでしょうか。このアプリケーションを作り始めた理由としては、概要にも少し書いてありますが、車両システムの複雑化により整備者の知識だけでは交換すべき部品が分からないという背景があったそうです。この傾向は車業界に限らず発生することだと思うので、似たような事例が今後も出てくることになりそうです。impulseは異常検知に特化したソフトウェアとして作っているため、もしかしたら、impulseでも、、、?

DEEP LEARNING IN SECURITY—AN EMPIRICAL EXAMPLE IN USER AND ENTITY BEHAVIOR ANALYTICS (UEBA)

f:id:mhigu:20170608100119j:plain:w160 f:id:mhigu:20170608095516j:plain:w160 f:id:mhigu:20170608095526j:plain:w160

概要

このプレゼンテーションでは、顧客の攻撃検出例を使ってDeep Learningがどのように適用されて、どうセキュリティ問題を解決したのか、経験の共有、また、ディープラーニングや一般的な機械学習をより広範なセキュリティに展開するための課題とガイドラインについても説明していました。Deep Learningについては、以下の2つの実例が挙げられていました。

  • 畳み込みニューラルネットワーク(CNN)を使用するユーザ行動異常検出ソリューション
  • LSTMを使用するステートフルなユーザーリスクスコアリングシステム

感想

セキュリティ業界では、ルールベースの検知が限界とされて久しいですが、今回のプレゼンテーションではDeep Learning(CNN, LSTM)を使って異常検知をした実例を説明してくれました。CNNへの入力は、ユーザの通信状態(プロトコル毎)の画像。出力はどのユーザかのid。idがいつもと違うものになれば異常とする算段なのですが、これだけだと検知精度は良くないらしいです。対策として、CNNと併せてLSTMへ、フィッシングメールがあるかどうか、不審なDNSクエリがあるか、大量データ送信があるかなどをエンコードしたベクトルを入力して各段階でスコアを算出し、実際の検知を行っているとのこと。また、プレゼンの最後にセキュリティの異常検知はDeep Learningだけでは解決不能で企業のルール等も鑑みながらロジックを組み立てており、Deep Learningは異常検知の機能の一部分でしかないことを強調していた事が非常に印象的でした。

RAY: A CLUSTER COMPUTING ENGINE FOR REINFORCEMENT LEARNING APPLICATIONS

概要

UC Berkeleyの学生たちによるSpark をベースとした強化学習のフレームワーク(Ray)についてのプレゼン

機械学習が成熟するにつれて、標準的な教師あり学習は学習データの量と正確性という観点から十分な手段ではなくなりました。

昨今の機会学習アプリケーションでは、静的に学習したモデルから単一の予測を作成して提供する代わりに、一連のアクションを実行した結果を再評価する事で、動的環境の変化に対応する必要性が高まっています。強化学習という手段はこの要求に対して良い成果を出すことが知られています。 ただし、これらのアプリケーションは非常に厳しい計算要件を必要とし、動的なグラフ計算もサポートする必要があります。彼らは、この要件に対してスケールアウトが容易であるSparkをコンピューティングエンジンとして選択しており、そのアーキテクチャー及び一部実装方法についてがプレゼンの内容となっています。

感想

強化学習は違ったパラメータでシミュレーションを行いそれを評価してルールを更新し、再度違ったパラメータでシミュレーションを行い再評価して。。。という事をするのですが、彼らはそれを並列化して大量にシミュレーションを行いたいと、、、これこそ、Sparkの出番!!という感じで、単純にああなるほどね。と納得しました。

また講演では、人の走るという行為を強化学習でシミュレーションするサンプルアプリケーションのデモ動画が流されていて、最初の方は一歩進むのもままならない状態のものが、1歩・2歩と進むようになり、最終的には(不格好ながらも)ずっと走り続けるようになる結果を見て、単純におもしろいなと思いました。

GoogleのAlphaGoが強化学習とDeep Learningを利用して強くなったことはあまりに有名ですが、エンタープライズの世界でも(Sparkをベースとした!?)強化学習によりブレークスルーが起きるか要注目ですね。

BIG DATA AT AUDI: ROOT CAUSE ANALYSIS IN AN AUTOMOTIVE PAINT SHOP USING MLLIB

f:id:brains_aoki:20170608135540p:plain:w160f:id:brains_aoki:20170608135631p:plain:w160 f:id:brains_aoki:20170608135714p:plain:w160

概要

アウディの自動車塗装において取得できる、約2500のセンサーデータを活用した話。塗装工程は非常に複雑かつ、温度や湿度等の影響をうけるため、どの工程でどのような失敗が起きるのかセンサーデータにSparkのDataFrame APIやMLlibを使って原因を突き止めることを試みたとのことです。

感想

個人的に興味深かったのは、時刻調整や補正に関する話です。自動車が塗装されるタイミングにおけるセンサーデータの時刻調整の話や、ある時間間隔におけるデータの代表値を平均か回帰で補正するのかなど、弊社も似たようなことをやっているなと感じました。こういった親近感のような感想を得られたことは、ある意味収穫ではありました。

終わりに

Keynoteでもありましたが、これからSparkはDeep Learningのサポート・ストリーミングのマイクロバッチ廃止など、更に進化していきます。個人的にはDeep Learningの機能は早く使ってみたいですし、とても楽しみにしています。 今後もまだまだ、Sparkの進化から目が離せませんね!

【レポート】Spark Summit 2017 開幕!!!

Spark Summit 2017 San Francisco

f:id:brains_aoki:20170607004557j:plain

こんにちは、データアナリストの青木とエンジニアの樋口です。

6月5日から合計3日間アメリカのサンフランシスコでSpark Summit2017が開催されています。

spark-summit.org

Spark Summit2017では、3,000人以上のエンジニア、データサイエンティスト、研究者やビジネスプロフェッショナルが参加しており、データサイエンスやエンタープライズ、マシンラーニングなどの分野で、170を超えるセッションがあります。

ブレインズテクノロジーではデータ分析・機械学習を事業のコアとしている事もありSparkの動向には常に注目しています。

その中で、CTOから「ちょっと、きみたち行ってきたら?」というありがたい鶴の一声を頂いたのではるばるアメリカへ来ています!

セッションの内容等、順次アップして行く予定ですが、まずは雰囲気だけ!!

f:id:brains_aoki:20170607004445j:plain f:id:brains_aoki:20170607004526j:plain f:id:brains_aoki:20170607010319j:plain

(P.S サンフランシスコ、最高!)

f:id:brains_aoki:20170607010353j:plain

【レポート】ケイ・オプティコム様 IBMユーザ論文「AIを利用したサイレント故障検知の取組み」で金賞受賞!

データ分析サービス・プロダクト担当の藤原です。

弊社のリアルタイム大規模データ分析基盤「Impulse」を導入いただきましたケイ・オプティコム様が、ネットワークインフラ運用において機械学習を活用してサイレント故障の検知を実現した取組みについて、IBMが主催するIBMユーザー研究会の論文を執筆され、「金賞」を受賞されました!
http://www.uken.or.jp/symp/symp55/program/paperlist.shtml

関西エリアを中心として大規模なネットワークインフラの安定稼働のため、ネットワークのサイレント故障の迅速な検知を実現する手段として、機械学習に着目され、弊社Impulseを活用して分析基盤を整備された取組みの詳細がまとめられた内容です。

5月18〜19日には「IBMユーザー・シンポジウム」として、約1,000人の来場者が訪れる大規模なイベントが京都で開催されました。
www.uken.or.jp

機械学習の活用に関する具体的な取組み事例ということもあり、ケイ・オプティコム様のプレゼン会場は超満員。
聴講されていた方々も非常に真剣な様子で、その内容に聞き入っておられました。

今回の金賞受賞は、ケイ・オプティコム様が「AI・機械学習」といった技術を、いち早く「システムインフラ・ネットワーク運用」という分野で、実際の現場に適用されたという先進的な取組みであったという点が高い評価に繋がっているではないかと思います。
そうした貴重な取組みに対して、技術的な面で貢献できたことを弊社としても非常に嬉しく感じております。

今後もケイ・オプティコム様を始めとした多くの導入企業様の事業に貢献できるよう、Impulseの更なる進化に取り組んでいきたいと思います。

f:id:fujiwarakazunari:20170531135705j:plain
※シンポジウムでの表彰後の様子:株式会社ケイ・オプティコム 谷岡様(右)と赤井様(左)

Apache Zeppelin & Spark SQLでサーバのログデータを整形・可視化する

こんにちは。春休みにブレインズテクノロジーのインターンシップに参加した、現在学部4年生の松井です。
インターン中にやったこと、ハマったことなどをまとめてみました。

やったこと

Apache Zeppelinというブラウザ上で動くインタラクティブシェルを用いて、S3に置かれているサーバ4台のメトリクス1週間分のログデータを可視化しました。
可視化までのステップを大雑把に区切ると以下のような感じです。

  1. S3に置かれているログデータを必要な分ローカルに保存
  2. ローカルに保存したデータをDataframeとして読み出し
  3. DataFrameに対し、意味のある情報の抜き出し・データ整形といった処理を行う
  4. SQL文を記述してグラフ出力
Spark SQLについて

Apache Zeppelinには分散処理のフレームワークであるApache Sparkのインタプリタが組み込まれており、データの整形・可視化にはApache Sparkの1コンポーネントであるSpark SQLを使いました。
Spark SQLは、関係データベースと似ているDataFrameというデータ構造を使っています。
DataFrameのAPIScala, Java, Python, Rで利用することができ、今回のコードはすべてScalaで記述しています。

実行環境について

メモリ16GBのMacBook Proで実行しました。
Zeppelinのデフォルトの設定だと、メモリ領域やヒープ領域が足りずOutOfMemoryErrorやStackOverflowErrorが出たので、InterpreterからSparkインタプリタの設定を変更してみましたがどうも上手くいかず。。。
最終的には、設定ファイルconf/zeppelin-env.shに以下の行を追加することで解決しました。

export ZEPPELIN_JAVA_OPTS="-Dspark.executor.memory=8g -Dspark.cores.max=16" # Additional jvm options. for example, export ZEPPELIN_JAVA_OPTS="-Dspark.executor.memory=8g -Dspark.cores.max=16"
export ZEPPELIN_MEM="-Xms4096m -Xmx4096m -Xss4096k -XX:PermSize=256m -XX:MaxPermSize=256m" # Zeppelin jvm mem options Default -Xms1024m -Xmx1024m -XX:MaxPermSize=512m

ちなみに、同ファイルにS3へのアクセスキーなども記述する必要があります。

export AWS_SECRET_ACCESS_KEY=####
export AWS_ACCESS_KEY_ID=####
export AWS_DEFAULT_REGION=####

S3に置かれているログデータをローカルに保存

まず、取得する期間やパスなどを初期変数として定めておきます。この変数は今後も使います。

///初期変数
//startDateのhh/mm/ssは無視されて、各日のデータはすべて取得される
//サーバからローカルに保存する場合も、ローカルからDataFrameに保存する場合も、同じこれらの初期変数を使う
val dayRange = z.input("ログ取得日数", 7).toString.toInt
val baseKey = z.input("S3パス", "Your Server's Path").toString
val baseLocalPath = z.input("保存先パス", "Your Local Path").toString
val startDate = z.input("ログ取得開始日時(yyyy/MM/dd HH:mm:ss)", "2017/02/21 23:00:00").toString

日付のパースなどを行い、4台あるサーバのデータをそれぞれローカルに保存していきます。

///S3からログデータ取得、ローカルに保存
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}
import java.text.SimpleDateFormat
import java.util.Calendar
import java.util.TimeZone
import java.util.ArrayList

import org.apache.http.NameValuePair
import org.apache.http.client.entity.UrlEncodedFormEntity
import org.apache.http.client.methods.HttpPost
import org.apache.http.impl.client.DefaultHttpClient
import org.apache.http.message.BasicNameValuePair

//Dateのパース
var sdf:SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd/HH/mm")
sdf.setTimeZone(TimeZone.getTimeZone("JST"))
var nowdt:Calendar = Calendar.getInstance
nowdt.setTimeZone(TimeZone.getTimeZone("JST"))
if(startDate.length > 0) {
  var startDateTmp = startDate.split(" ")
  var sYear = startDateTmp(0).split("/")(0).toInt
  var sMonth = startDateTmp(0).split("/")(1).toInt-1
  var sDay = startDateTmp(0).split("/")(2).toInt
  var sHour = startDateTmp(1).split(":")(0).toInt
  var sMinute = startDateTmp(1).split(":")(1).toInt
  var sSecond = startDateTmp(1).split(":")(2).toInt
  nowdt.set(sYear,sMonth,sDay,sHour,sMinute,sSecond)
}
var nowdate = sdf.format(nowdt.getTime)

def saveAsTextFile(rdd:RDD[String], localPath:String) = {
  rdd.isEmpty() // InvalidInputException (matches 0 files) 対策
  rdd.saveAsTextFile(localPath,classOf[org.apache.hadoop.io.compress.GzipCodec])
  println("save as text file =>"+localPath)

}

def get(inputPath:String, host:String) = {
  try {
    var textFileRDD = sc.textFile(baseKey+inputPath+"*")
    saveAsTextFile(textFileRDD.repartition(1), baseLocalPath+inputPath+"0")
  } catch {
    case faee:org.apache.hadoop.mapred.FileAlreadyExistsException => throw faee
  }
}

def getDate(year:Int, month:Int, day:Int, add:Int) : String = {
  val resDate = "%tY/%<tm/%<td" format new java.util.GregorianCalendar(year, month-1, day-add);
  return resDate;
}

def zeroPadding2(str:String) : String = {
  var res = str
  if(str.length == 1)
    res = "0"+str
  return res
}

val nowdatesplit = nowdate.split("/")
val nowYear = nowdatesplit(0).toInt
val nowMonth = nowdatesplit(1).toInt
val nowDay = nowdatesplit(2).toInt
val nowHour = nowdatesplit(3).toInt
val nowMinuteStr = nowdatesplit(4).substring(0,1)+"0" // example 27 -> 20

//ログデータを10分ごとに切り出しまとめて保存
try {

  for(addDay <- 0 to (dayRange-1)) {

    for(addHour <- ((24-1) to 0 by -1)) { //(nowHour to 0 by -1)

      for(addMinute <- List(50,40,30,20,10,0)) {

        var analysisDate = getDate(nowYear, nowMonth, nowDay, addDay)
        var fileDate = analysisDate.replaceAll("/","")
        var dateSplit = analysisDate.split("/")
        var year = dateSplit(0)
        var month = dateSplit(1)
        var day = dateSplit(2)

        var hourStr = zeroPadding2(addHour.toString)
        var minuteStr = zeroPadding2(addMinute.toString)
        var minutePath = minuteStr.substring(0,1)

        get("SERVER-MG/"+year+"/"+month+"/"+day+"/metricbeat_"+fileDate+hourStr+minutePath, "SERVER-MG")
        get("SERVER-SR01/"+year+"/"+month+"/"+day+"/metricbeat_"+fileDate+hourStr+minutePath, "SERVER-SR01")
        get("SERVER-SR02/"+year+"/"+month+"/"+day+"/metricbeat_"+fileDate+hourStr+minutePath, "SERVER-SR02")
        get("SERVER-SR03/"+year+"/"+month+"/"+day+"/metricbeat_"+fileDate+hourStr+minutePath, "SERVER-SR03")

      }
    }
  }
} catch {
  // 取得するログの日付を遡り取得済みであれば処理終了
  case faee:org.apache.hadoop.mapred.FileAlreadyExistsException => println("exit : "+faee)
}

ローカルに保存したデータをDataframeとして読み出し

前の処理でローカルに保存したログデータを、まずは1つのDataframeに格納します。

///ローカルに保存してあるログデータをDataFrame形式で読み込み
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}
import java.text.SimpleDateFormat
import java.util.Calendar
import java.util.TimeZone
import java.util.ArrayList

import org.apache.http.NameValuePair
import org.apache.http.client.entity.UrlEncodedFormEntity
import org.apache.http.client.methods.HttpPost
import org.apache.http.impl.client.DefaultHttpClient
import org.apache.http.message.BasicNameValuePair

//Dateのパースは同様
var sdf:SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd/HH/mm")
sdf.setTimeZone(TimeZone.getTimeZone("JST"))
var nowdt:Calendar = Calendar.getInstance
nowdt.setTimeZone(TimeZone.getTimeZone("JST"))
if(startDate.length > 0) {
  var startDateTmp = startDate.split(" ")
  var sYear = startDateTmp(0).split("/")(0).toInt
  var sMonth = startDateTmp(0).split("/")(1).toInt-1
  var sDay = startDateTmp(0).split("/")(2).toInt
  var sHour = startDateTmp(1).split(":")(0).toInt
  var sMinute = startDateTmp(1).split(":")(1).toInt
  var sSecond = startDateTmp(1).split(":")(2).toInt
  nowdt.set(sYear,sMonth,sDay,sHour,sMinute,sSecond)
}
var nowdate = sdf.format(nowdt.getTime)

def getDate(year:Int, month:Int, day:Int, add:Int) : String = {
  val resDate = "%tY/%<tm/%<td" format new java.util.GregorianCalendar(year, month-1, day-add);
  return resDate;
}

def zeroPadding2(str:String) : String = {
  var res = str
  if(str.length == 1)
    res = "0"+str
  return res
}

val nowdatesplit = nowdate.split("/")
val nowYear = nowdatesplit(0).toInt
val nowMonth = nowdatesplit(1).toInt
val nowDay = nowdatesplit(2).toInt
val nowHourStr = nowdatesplit(3)
val nowMinuteStr = nowdatesplit(4).substring(0,1)+"0"

//全データをmetricDfに格納
var metricDf:org.apache.spark.sql.DataFrame = null

try {

  for(addDay <- (0 to (dayRange-1))) {

    for(addHour <- ((24-1) to 0 by -1)) {

      for(addMinute <- List(50,40,30,20,10,0)) { //50,40,30,20,10,0

        var analysisDate = getDate(nowYear, nowMonth, nowDay, addDay)
        var fileDate = analysisDate.replaceAll("/","")
        var dateSplit = analysisDate.split("/")
        var year = dateSplit(0)
        var month = dateSplit(1)
        var day = dateSplit(2)

        var hourStr = zeroPadding2(addHour.toString)
        var minuteStr = zeroPadding2(addMinute.toString)
        var minutePath = minuteStr.substring(0,1)
        var minutePathForDisplay = minuteStr.substring(0,2)

        var metricDfMg = sqlContext.read.json(baseLocalPath+"SERVER-MG/"+year+"/"+month+"/"+day+"/metricbeat_"+fileDate+hourStr+minutePath+"*", "SERVER-MG")
        var metricDfS1 = sqlContext.read.json(baseLocalPath+"SERVER-SR01/"+year+"/"+month+"/"+day+"/metricbeat_"+fileDate+hourStr+minutePath+"*", "SERVER-SR01")
        var metricDfS2 = sqlContext.read.json(baseLocalPath+"SERVER-SR02/"+year+"/"+month+"/"+day+"/metricbeat_"+fileDate+hourStr+minutePath+"*", "SERVER-SR02")
        var metricDfS3 = sqlContext.read.json(baseLocalPath+"SERVER-SR03/"+year+"/"+month+"/"+day+"/metricbeat_"+fileDate+hourStr+minutePath+"*", "SERVER-SR03")

        val path = year+"/"+month+"/"+day+"/metricbeat_"+fileDate+hourStr+minutePathForDisplay
        println(path)

        if (metricDf == null) {
          metricDf = metricDfMg.unionAll(metricDfS1).unionAll(metricDfS2).unionAll(metricDfS3)
          metricDf = metricDfMg
        } else {
          metricDf = metricDf.unionAll(metricDfMg).unionAll(metricDfS1).unionAll(metricDfS2).unionAll(metricDfS3)
        }

        metricDfMg = null; metricDfS1 = null; metricDfS2 = null; metricDfS3 = null;

      }
    }
  }
} catch {
  case iie:java.io.IOException => println(iie)
}

DataFrameに対して、意味のある情報の抜き出し・データ整形といった処理を行う

今回取り出したい情報は、CPU利用率、ディスクI/O、ファイルシステム利用率、メモリ利用率、ネットワーク流量の5つです。
metricDfには余計な情報も含まれているので、これらの情報のみを抜き出します。
さらに、メトリクスには累計ネットワーク流量が記録されていたので、これを使って単位時間あたりのネットワーク流量を計算します。

まずは前の処理で得たmetricDfから上記の項目ごとの切り出しを行い、それぞれの項目ごとにDataFrameを作成します。

import sqlContext.implicits._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{unix_timestamp, to_date}

//DataFrameから項目ごとに切り出し、それぞれの項目ごとにDataFrame作成
var cpuDf = metricDf.where("metricset.name like '%cpu%'")
var diskDf = metricDf.where("metricset.name like '%diskio%'")
var fsDf = metricDf.where("metricset.name like '%filesystem%'")
var memoryDf = metricDf.where("metricset.name like '%memory%'")
var networkDf = metricDf.where("metricset.name like '%network%'")

//必要なカラムだけ取り出す
cpuDf = cpuDf.select("@timestamp", "beat.hostname", "system.cpu.system.pct", "system.cpu.user.pct")
val newNamesCpu = Seq("Date", "Host", "CPU_Usage_By_System", "CPU_Usage_By_User") //パーセント表示
cpuDf = cpuDf.toDF(newNamesCpu:_*)

diskDf = diskDf.select("@timestamp", "beat.hostname", "system.diskio.read.bytes", "system.diskio.write.bytes")
val newNamesDisk = Seq("Date", "Host", "Disk_IO_Reading_Bytes", "Disk_IO_Writing_Bytes") //バイト表示
diskDf = diskDf.toDF(newNamesDisk:_*)

fsDf = fsDf.select("@timestamp", "beat.hostname", "system.filesystem.used.pct")
val newNamesFs = Seq("Date", "Host", "Filesystem_Usage") //パーセント表示
fsDf = fsDf.toDF(newNamesFs:_*)

memoryDf = memoryDf.select("@timestamp", "beat.hostname", "system.memory.used.pct")
val newNamesMemory = Seq("Date", "Host", "Memory_Usage") //パーセント表示
memoryDf = memoryDf.toDF(newNamesMemory:_*)

networkDf = networkDf.select("@timestamp", "beat.hostname", "system.network.in.bytes", "system.network.out.bytes")
val newNamesNetwork = Seq("Date", "Host", "Network_In_Bytes", "Network_Out_Bytes") //バイト表示
networkDf = networkDf.toDF(newNamesNetwork:_*)

このあと、DataFrameのままSQLライクなメソッドチェーンを繋げて処理をしていたのですが、ものすごく実行時間が掛かってしまっていました。
そこで、DataFrameをRDD経由でScalaのArrayに変換し、Arrayに対して処理を行うことで、処理速度が大幅に改善されました。

DataFrameをScalaのArrayとして操作する

CPU利用率についての処理を行います。
まず、Dataをパースし、最小時間単位をhourに直します。

//Dateをmapして、最小時間単位をhourにするよう部分文字列切り出し
val DateTime = ("""([0-9]{4}-[0-9]{2}-[0-9]{2})""" + "T" + """([0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{3})""" + "Z").r

def get(s:String) = s match {
  case DateTime(d,t) => d + " " + t.slice(0,2)
}

var rowsCpu: RDD[Row] = cpuDf.rdd
var cpuDfMapped = rowsCpu.map({
  case Row(date: String, host: String, cPU_Usage_By_System: Double, cPU_Usage_By_User: Double) => (get(date), host, cPU_Usage_By_System, cPU_Usage_By_User)
}).toDF("Date", "Host", "CPU_Usage_By_System", "CPU_Usage_By_User")

1週間分のデータを可視化するので、毎時間最大値の情報のみを使うことにします。

import org.apache.spark.sql.functions.{lead, lag}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

//Cpuログをホストごとに分類 <- この部分、処理を軽くするために行っているので場合によっては分けなくてよい(groupByでDate,Hostを指定しているため)
var cpuDf_filtered_mg = cpuDfMapped.where('Host === "SERVER-MG")
var cpuDf_filtered_1 = cpuDfMapped.where('Host === "SERVER-SR01")
var cpuDf_filtered_2 = cpuDfMapped.where('Host === "SERVER-SR02")
var cpuDf_filtered_3 = cpuDfMapped.where('Host === "SERVER-SR03")

//毎時間最大値を抽出
//rowscpuDf_mgの型: Array[org.apache.spark.sql.Row]
var rowscpuDf_mg = cpuDf_filtered_mg.groupBy("Date", "Host").agg(max("CPU_Usage_By_System"), max("CPU_Usage_By_User")).sort($"Date").rdd.collect()
var rowscpuDf_1 = cpuDf_filtered_1.groupBy("Date", "Host").agg(max("CPU_Usage_By_System"), max("CPU_Usage_By_User")).sort($"Date").rdd.collect()
var rowscpuDf_2 = cpuDf_filtered_2.groupBy("Date", "Host").agg(max("CPU_Usage_By_System"), max("CPU_Usage_By_User")).sort($"Date").rdd.collect()
var rowscpuDf_3 = cpuDf_filtered_3.groupBy("Date", "Host").agg(max("CPU_Usage_By_System"), max("CPU_Usage_By_User")).sort($"Date").rdd.collect()

//cpuDf_mg: Array[(String, String, Double, Double)]
var cpuDf_mg = rowscpuDf_mg.map{u => (u.getString(0), u.getString(1), u.getDouble(2), u.getDouble(3)) }
var cpuDf_1 = rowscpuDf_1.map{u => (u.getString(0), u.getString(1), u.getDouble(2), u.getDouble(3)) }
var cpuDf_2 = rowscpuDf_2.map{u => (u.getString(0), u.getString(1), u.getDouble(2), u.getDouble(3)) }
var cpuDf_3 = rowscpuDf_3.map{u => (u.getString(0), u.getString(1), u.getDouble(2), u.getDouble(3)) }

//Arrayを統合し、DataFrameに戻しテーブル登録
var cpuDfs = cpuDf_mg ++ cpuDf_1 ++ cpuDf_2 ++ cpuDf_3
var cpuDfs_modified = sc.makeRDD(cpuDfs).toDF("Date", "Host", "CPU_Usage_By_System", "CPU_Usage_By_User")
cpuDfs_modified.registerTempTable("T_CPU")

ディスクI/O・ファイルシステム利用率・メモリ利用率の他3つのデータについての操作は、これと同様に行いました。

Scalaのコレクションメソッドmapを使った処理

ネットワーク流量についての処理を行います。
メトリクスに記録されていたネットワーク情報は、上り・下り転送量の累積バイト数であり、2^32-1バイトで循環的になっていました。
そこで、1つ前のデータとの差分を取ることで、単位時間あたりのデータ転送量を計算しました。

//Dateをmapして、最小時間単位をhourにするよう部分文字列切り出し
val DateTime = ("""([0-9]{4}-[0-9]{2}-[0-9]{2})""" + "T" + """([0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{3})""" + "Z").r

def get(s:String) = s match {
  case DateTime(d,t) => d + " " + t.slice(0,2)
}

var rowsNetwork:RDD[Row] = networkDf.rdd
var networkDfMapped = rowsNetwork.map({
  case Row(date: String, host: String, network_In_Bytes: Long, network_Out_Bytes: Long) => (get(date), host, network_In_Bytes, network_Out_Bytes)
}).toDF("Date", "Host", "Network_In_Bytes", "Network_Out_Bytes")

Scalaのコレクションのメソッドであるmapを使って差分の計算を行います。負になった場合は循環の切れ目が入っていると判断し、カウンタ値2^32 - 1を使って処理しました。
SERVER-MGサーバのデータだけであとの3台分は省略しています。

import org.apache.spark.sql.functions.{lead, lag}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

//Networkログをホストごとに分類
var networkDf_filtered_mg = networkDfMapped.where('Host === "SERVER-MG")

//毎時間最大値を抽出 
//rowsNetwork_mg: Array[org.apache.spark.sql.Row]
var rowsNetwork_mg = networkDf_filtered_mg.groupBy("Date", "Host").agg(max("Network_In_Bytes"), max("Network_Out_Bytes")).sort($"Date").rdd.collect()

//差分を計算するための関数
def delta(a:Any, b:Any): Long = {
  var atmp: Long = a.asInstanceOf[Long]
  var btmp: Long = b.asInstanceOf[Long]
  if (atmp - btmp > 0) {
    return atmp - btmp
  } else {
    return 4294967295L - btmp + atmp //4294967295Lはカウンタ値
  }
}

//tailで先頭の要素を除いたArrayを、zipでもとのArrayの同インデックス要素とのタプルにし、mapで差分を計算
var rowsNetwork_mg_delta = rowsNetwork_mg.zip(rowsNetwork_mg.tail).map{ u =>
  (u._2.apply(0).toString,
  u._2.apply(1).toString,
  delta(u._2.apply(2), u._1.apply(2)),
  delta(u._2.apply(3), u._1.apply(3)))
}

//DataFrameに戻し、列の名前を付ける
val networkDf_mg_modified = sc.makeRDD(rowsNetwork_mg_delta).toDF("Date", "Host", "Network_In_Per_DeltaTime", "Network_Out_Per_DeltaTime")

//テーブル登録
networkDf_mg_modified.registerTempTable("T_NETWORK_MG")

SQL文を記述してグラフ出力

SQL文を書くだけで可視化してくれます。

%sql
select
  *
from
  T_CPU
order by
  Date

GUI操作で、使用する列やグラフの形状を変えることができます。
結果はこのようになりました。

f:id:brains-tech:20170425160512p:plain