読者です 読者をやめる 読者になる 読者になる

Technology Topics by Brains

ブレインズテクノロジーの研究開発機関「未来工場」で働くエンジニアが、先端オープン技術、機械学習×データ分析(異常検知、予兆検知)に関する取組みをご紹介します。

プログラミング言語あれこれ

こんにちは。(年齢順で)ブレインズNo.3のこんのです。
今回は当社のプロダクトでどんなプログラミング言語が使われているのか紹介したいと思います。

プロダクトで使われているプログラミング言語

当社ホームページをご覧頂いている方はすでにご存知かと思いますが、当社はエンタープライズ向けに2種類のプロダクトをご提供しています。ひとつは検索エンジンのNeuron。もうひとつは機械学習を活用したリアルタイム大規模データ分析プラットフォームのImpulseです。
どちらもオープンソースを利用し、低コストで提供している製品です。

さて、これらの製品で使われているプログラミング言語ですが、使っているオープンソースのライブラリに依存しています。
早速見てみましょう。
f:id:bti-konno:20151112151428j:plain

上記グラフは、各プロダクトの最新のマスタのソースの実際のコードの行数をプログラミング言語別でカウントしたものです(コメント行や空行は除いています)。
検索エンジンのNeuronでは Java が圧倒的に多く、次いで Javascript やHTML/CSSが使われています(Javascriptは利用しているオープンソースのライブラリもあわせてカウントしてしまっているため多く見えています)。
データ分析のImpulseでは Python が多く、次いで JavaScalaJavascript が続きます。簡潔な表現で処理を記述できる言語を主に用いているため、Neuronと比べて格段にコード量が少なくなっているように見えますが、実際はNeuron以上にリソースを投入して開発を行っています。
JavaPythonScalaに自信のある方はぜひ当社に応募してみませんか?

すぐにネタがつきてしまいました。
これだけではちょっと短いので、社員にアンケートを取ってみました。

社員のプログラミング言語あれこれ

当社社員はみんな前職がIT系出身であるため、なんらかのプログラミング言語をさわっています。
そこでまずは好きなプログラミング言語を(無理やり)聞いてみました。
f:id:bti-konno:20151112154257j:plain

簡潔でわかりやすい記法が好感触なのかPythonが1位です。やはり静的に型が決まっていないと嫌だ、というひとが投票したのでしょうか?次点はJavaでした。ちょっと変わったところではShell scirptを上げているひともいます。やはりワンライナーで結果が出るのがいいのでしょうか。


どれくらいのバックグランドを持って開発に携わっているのか?さわったことのあるプログラミング言語とその経験年数も聞いてみました。前職や大学で触ったことがある、というのも上げてもらっています。
f:id:bti-konno:20151112154844j:plain

結構みんないろいろな言語を触っています。
1年未満が多いですが、この中にはImpulse開発で初めてさわった言語ももちろんあります。
言語を1つきちんと習得していれば、初めての言語でも慣れれば書けるようになるようです。


最後に、今後どんなプログラミング言語を勉強したいかを上げてもらいました。1つ以上上げてもらっています。
f:id:bti-konno:20151112155734j:plain

ScalaPythonは当社プロダクトで使っているため、想定通りでした。次いで Go がきたのは以外でした。最近 Go 界隈が盛り上がっているのでしょうか? HaskellErlangLISPといった関数型言語の勉強をしたいというひともいますね。現在メインで使われているオブジェクト指向言語とは違った概念の言語を勉強し、見識を広げたいということでしょうか。

以上、プログラミング言語のあれこれを見てきました。
当社にご興味持った方、当社のことをもっとよく知りたい方はご連絡ください。

機械学習アルゴリズム実装シリーズ [線形回帰編]

機械学習アルゴリズム実装 線形回帰編


こんにちわ。
ブレインズテクノロジー最年少樋口です。

、、、と言っても、もうすぐ27なのですが...orz


今回から何回かに分け、「機械学習アルゴリズム実装シリーズ」と題して機械学習アルゴリズムPythonで実装をしていきます。もちろん、scikit-learnなどの機械学習ライブラリを使うのではなく重要な部分は全て自分で実装するという試みです。なるべく難しい数式などの説明は最小限にして、機械学習アルゴリズムを少しずつ解説・実装していきたいと思いますので、「これから機械学習いじってみたいけど、何したらいいか分からない。。。」って人の助けになれたら嬉しいです!(と言いつつ自分の備忘録代わりにしているのは秘密ですw)

因みにここで書いている事はCourseraという有名なオンライン学習サイトのMachine Learning by Stanford Universityで学んだ事を元に書いています。Machine Learning以外にもAlgorithmなど豊富なコンテンツが揃っているのでぜひ一度受講してみてください。(基本的に全部無料です!)www.coursera.org

機械学習とは

ということで、早速始めます。が、その前に簡単に機械学習についてほんのちょっとだけ説明を。

定義としてはいろんな言い方がされていますが、自分としては
What is Machine Learning: A Tour of Authoritative Definitions and a Handy One-Liner You Can Use - Machine Learning Mastery
で引用として述べられている一文がしっくりきました。

The field of machine learning is concerned with the question of how to construct computer programs that automatically improve with experience.


要するに、

どうやってコンピュータープログラムに自動的改善を行わせるのかを主題としているのが機械学習という分野である

ということみたいです。プログラムが自動的に自身を改善してくれるとは夢の様なことですねw

機械学習には大きく分けて教師あり学習と教師なし学習の2種類がありますが、更に教師あり学習は回帰と分類、教師なし学習はクラスタリングという種類にカテゴリー分けができます。

[教師あり学習]

  • 回帰(Regression)
  • 分類(Classification)

[教師なし学習]

厳密に言うと半教師学習というものもあったりしますが、ここではとりあえず「まぁ、大きく分けたら3つあんのね。日本語でおk」という程度で大丈夫かと。勉強していくうちに詳細は分かってきます。
以下簡単に、種類ごとにまとめてみました。

教師あり 教師なし
回帰 単線形回帰, 重線形回帰 -
分類 Logistic Regression, SVM, Neural Network K-means, Mean-shift

ちなみにMATLABで有名なMathWorksさんのページにもここらへんの説明や分かりやすい図が載っているので参照してみてください。jp.mathworks.com

それぞれの特徴としては、

  • 回帰:連続する値を予測する
  • 分類:離散値を予測する

という違いがあります。具体的例としてそれぞれ以下に例を示しておきます。

[回帰]
過去から現在までの商品の売れ行きから将来の売れ行きを予測
cpuリソースなどの時間毎の使用量予測

[分類]
画像認識(猫なのか、犬なのか、車なのか)など
ニュースカテゴリ分け

他にもいろんなところで機械学習が使われています。
もっと機械学習について学びたい場合は以下、専門の本をどうぞ。実践的なテクニックと共に深く学べます。machinelearningmastery.com

データを見る

ここまで、機械学習とはなんぞや的な部分を書いてきましたが、ここからは今回実装する線形回帰とはなんぞや的なところに入っていきます。まず以下の例を見て下さい。
f:id:mhigu:20151013151606p:plain
X軸、Y軸に (x, y) = (0, 0) (1, 1) (2, 2) (3, 3) の点がそれぞれプロットされています。では、このデータからx=4 の時はどのような点を取りうる事が予想できるでしょうか。

単純に考えればy=4と予想できると思います。直感的にも分かりやすいですよね。数式で表すなら、高校の時に習った直線を表す式y = ax+ba,bを求めてx=4を代入するだけですね。

では次の例です。車のエンジンサイズを横軸(X軸)、それに対する車の値段を縦軸(Y軸)として表示しています。x=290 の時、y の値はいくつになるでしょうか。
f:id:mhigu:20151026122355p:plain
(データソースは以下参照)
UCI Machine Learning Repository: Automobile Data Set

先ほどの例とは違って単純には求まりそうにはありませんが、y=ax+b の式にab の値を適切に設定すればおおよその値は求まりそうな気もします。図で示すとだいたい以下のような赤で示された直線を求めることが出来れば車の値段y を求めることができそうです。
f:id:mhigu:20151027114013p:plain
このように、今までに観測されたデータから線形モデルを使ってある値を予測することを線形回帰といいます。

線形回帰理論編

では実際に線形回帰の中身(ab の決め方)はどうなってんの?ってところの説明に移りたいと思います。まずは先ほどの車のエンジンサイズと値段データの表記を以下の様に定めます。

  • x_i : 車のエンジンのサイズ (i番目) ※入力値
  • y_i : 車の値段(i番目)
  • m : サンプルデータの数(今回はm=205)

このデータに対して予測を行うモデルを以下のような数式で表します。因みにモデルとは先ほどのy=ax+bで表した数式の事を指します。英語ではhypothesisという呼び方をするみたいです。

  • h_θ(x)=θ_0+θ_1x

今回の問題はエンジンサイズx=290 の時の車の値段を求めることでした。ただし、それに先立ちまずはθ=(θ_0, θ_1) の値を適切に設定しなければなりません。どういうアプローチをすればよいでしょうか。例として、θ_0=0, θ_1=30 の時を考えてみます。直線を図示すると
f:id:mhigu:20151026135530p:plain
のようになり、直感的にあまり適切ではないように見えます。
ではどれぐらい適切でないのでしょう。一つの方法として、青で図示されている各点に対するh_θ(x) で求まる値との差分の2乗を計算することで表せます。

[誤差]=\frac{1}{m}\{(y_1 - 30x_1) + (y_2 - 30x_2) + .... + (y_m - 30x_m)\}^2
(具体的な値の計算はここでは省きます。)

これを一般化すると,

  • j(θ_0, θ_1)=\frac{1}{m}\sum_{k=1}^{m} \{y_k-(θ_0 + θ_1x_k)\}^2

となります。このj(θ) を目的関数(Cost function)と呼びます。誤差は原則として0に近ければ近いほど良いので(※Over fittingを考慮しない)、目的はこの目的関数を最小にするθ_0, θ_1 の値を求める事になります。


いろいろと定義して来たので一度整理しておくと、

  • 問題

  エンジンの大きさがx=290 の時、車の値段y の値を求める

  • モデル

  h_θ(x)=θ_0+θ_1x

  • パラメータ

  θ_0, θ_1

  • 目的関数

  J(θ_0, θ_1)=\frac{1}{2m}\sum_{k=1}^{m} \{y_k-(θ_0 + θ_1x_k)\}^2
  ※後の計算単純化のため全体を2で割っておく

  • 目的

  J(θ_0, θ_1)を最小化するθ_0, θ_1を見つける。
以上がここまでのまとめです。

最急降下法

ここからは目的であるJ(θ_0, θ_1)の最小化をするためのアルゴリズムの説明をしていきます。この部分が教師データ(これまでの観測データ)から実際に自動的改善を行う部分、つまりは機械学習の肝となる部分です。今回は最急降下法(Gradient Descent)というアルゴリズムを使います。これ以外にも、目的関数を最小化するためのアルゴリズムはありますが、シンプルであり線形回帰以外でも使われることのあるアルゴリズムなので覚えておくのも良いかと思います。

最急降下法は以下の手順で処理を進めます。
1. 任意のθ_0, θ_1を設定する
2. J(θ_0, θ_1)が最小となるようにθ_0, θ_1を動かす
3. θ_0, θ_1を更新して2.のプロセスへ戻る

これを収束まで繰り返します。

式で表すと、θ_0, θ_1は以下計算によりにより更新されていきます。
θ_j-α\frac{∂}{∂θ_j}J(θ_0, θ_1) ※αは学習率

今回の例ではモデルがh_θ(x)=θ_0+θ_1xなので、パラメータはθ_0, θ_1の2つでした。そのためj=0, 1 の場合を考えればよく、J(θ_0, θ_1)をそれぞれで偏微分した結果は以下の様になります(具体的な計算は省略)。

j=0の時
(次の) θ_0=θ_0-α\sum_{k=1}^{m}\frac{1}{m}\{y_k-(θ_0 + θ_1x_k)\}
j=1の時
(次の) θ_1=θ_1-α\sum_{k=1}^{m}\frac{1}{m}\{y_k-(θ_0 + θ_1x_k)\}x_k

「はて、偏微分てなんぞや」という人は難しいことは考えずに、とりあえず最終的にはこんな式になるのかと思ってもらえればよいです。
これを繰り返し計算していくことで目的関数J(θ_0,θ_1)を最小とする、θ_0, θ_1の値が求まります。あとはこれをコードに落として、データを入れてあげれば線形回帰が動くはずです。。。

実装!!!

さてここまでで一通りの線形回帰の処理の流れを解説してきましたので、いよいよいよいよ実装に移りたいと思います。といっても実装自体はここまでに説明してきた数式をコードに落とすだけですので、説明はコードに書いてしまいたいと思います。最急降下法メソッドの部分は行列計算をしているため少し分かりにくいかもしれませんが、数式とコードを見比べてどう実装されているのか、データがどう処理されているかに注意して追っていけば分かるかと思います。
※ちなみに今回はデータをcsvの形で読み、データクレンジングは事前にしてあります。csvの形式は以下の構造になっています。

エンジンサイズ 車の値段
130 13495
~ ~
~ ~

[コード]

# -*- coding:utf-8 -*-
import numpy as np


def gradient_descent(x, y, iter_num, alpha=0.01):
    """
    最急降下法メソッド
    :param x: テストデータarray
    :param y: 目標データarray
    :param iter_num: イテレーションの回数
    :param alpha: 学習率
    :return: 学習後のθ
    """
    theta = np.array([0, 0])  # 1x2行列
    m = len(x)  # データ数

    # ----以下で目的関数Jの偏微分を計算して繰り返しθの値を更新する
    for i in range(iter_num):
        h = np.dot(x, theta)  # 1x205行列   h(θ) = θ0 + θ1X を計算
        J = np.sum(np.square(h-y))/(2*m)  # 目的関数。Jはスカラー値になる。
        print("J=", J)
        tmp1 = theta[0] - alpha*np.sum(h-y)/m   # θ0の計算
        tmp2 = theta[1] - alpha*np.sum((h-y)*x[:, 1])/m  # θ1の計算
        theta[0] = tmp1   # θ0の更新
        theta[1] = tmp2   # θ1の更新

    return theta

if __name__ == "__main__":

    print("Linear Regression for Car Price Evaluation!")

    # -----データの読込、変数初期化
    alpha = 0.0001  #学習率α この値によっては適切に学習できない場合がある
    iter_num = 30  #イテレーションの回数
    data = np.loadtxt("car_2.csv", delimiter=",")
    engine_size = data[:, 0]    # 1x205行列
    x = np.c_[np.ones((len(engine_size),)), engine_size]  # 205x2 最急降下法関数で行列計算を行うため列を足している。
    y = data[:, 1]  # 1x205行列

    # -----最急降下法処理をiter_num分実施
    theta = gradient_descent(x, y, iter_num, alpha)
    print("[theta_0 theta_1] = ", theta)

    # -----後処理
    ans = np.dot(theta, np.array([1, 290]))  # x=290の時のyを計算
    print("Answer(x=290): ", ans)

実行時の結果を以下に示します。

$ python sample.py
Linear Regression for Car Price Evaluation!
J= 118634960.46
J= 84567747.8682
J= 60867761.8781
J= 44969646.0721
J= 33868970.9527
J= 26389948.1617
J= 21570569.0423
J= 18134456.6841
J= 15760941.5348
J= 14182798.9677
J= 13186249.2811
J= 12350034.3706
J= 11881632.6393
J= 11484490.8085
J= 11312642.3557
J= 11158608.8781
J= 11022390.3756
J= 10903986.8483
J= 10803398.296
J= 10803398.296
J= 10803398.296
J= 10803398.296
J= 10803398.296
J= 10803398.296
J= 10803398.296
J= 10803398.296
J= 10803398.296
J= 10803398.296
J= 10803398.296
J= 10803398.296
[theta_0 theta_1] = [  0 105]
Answer(x=290):  30450

結果を見ると最終的に、目的関数の値が収束して動いていないことが確認できます。これにより適切なJ(θ_0, θ_1)の値が定まりました。
求めたJ(θ_0, θ_1)で結果を図示してみました。

[横軸:エンジンサイズ(x), 縦軸:車の値段(y)]
f:id:mhigu:20151027194528p:plain
直感的には、xの大きい時にあまり正確に予測できていないように見えますが、アルゴリズム的にはこれが最適解という事になります。
ただし、この結果は学習率やイテレーションの回数を調整した結果ですので、学習率やイテレーションの回数を変えると結果は変わってきます。特に学習率を変えると収束せずに発散したりします。どうしてそうなるのかは今回は説明しませんが是非考えて見て下さい。


以上で今回のBlogを終わりたいと思いますが、今回の例では過学習を考慮せず、正則化についても説明せず進めてきました。他にもモデルはもっといいものがあるかもしれませんし、考慮すべきことはたくさんあります。その辺は今後また良いタイミングで説明できたらと思います。
ではノシ

【レポート】AWS re:Invent 2015(AWSの新サービスが発表されました:Day2)

f:id:fujiwarakazunari:20151009090526j:plainデータ分析サービス・プロダクト担当の藤原です。

昨日に続き、本日も米国ラスベガスからAWS最大のグローバルカンファレンス「re:Invent」をレポートします。
本日の基調講演(Day2)でも新たなサービスが発表されましたので、その内容を簡単にご紹介します。

1. Amazon Kinesis Analytics
ストリーミングデータに対するSQLベースの時系列分析処理エンジン

2. 新しいインスタンスタイプの提供
X1という強力なハイパワーマシンと、t2.nanoというより小さなインスタンスタイプを提供

3. Amazon EC2 Container Serviceの機能拡張(Amazon EC2 Container Registry、他)
・Container Registry:フルマネージドのコンテナ管理(イメージの作成、管理、配布)
・Container Service CLI:各種コンテナ処理のコマンドライン実行への対応

4. AWS Lambdaの機能拡張(Python for Lambda、他)
VPCサポート
・実行時間の拡張(最大5分に)
・スケジュール実行
・バージョン管理
Python対応

5. AWS Mobile Hub
モバイルアプリ開発におけるビルド、テスト、モニタリングを含む統合管理サービス

そして

6. AWS IoT
SDKs, Device Gateway, Rules, Shadowという一連の機能郡によって構成されたIoT向けのプラットフォームサービス

特に、午前中のキーノートでの発表後、即座に午後からのテクニカルブレイクアウトセッションのメニューに「NEW LAUNCH! AWS IoT xxxxxxxx」というタイトルのセッションがいくつも追加され、どの会場も長蛇の列でした。グローバル規模でIoT分野の関心度の高さが感じられます
セッション聴講後、スピーカーがデモで利用していた「Amazon IoT Button」というIoTデバイス機器が配布されたのでこちらをGet。
帰国後、社内の開発メンバーと共有したいと思います。

AWSの情報はすぐに世界中に配信されるため、現地でなくても最新の情報は得られると思いますが、やはり現地で感じたのはグローバル規模で利用されているAWSの様々なサービスや事例の凄さと、それに関わるITエンジニアの熱気ですね。
また、AWSを活用したエコシステムを開発・提供するパートナーの多さにも圧倒されました。
弊社も、こうした様々なAWSサービスを活用したエコシステムの開発に取り組んでいきたいと思います。

【レポート】AWS re:Invent 2015(AWSの新サービスが発表されました:Day1)

f:id:fujiwarakazunari:20151008074735j:plainデータ分析サービス・プロダクト担当の藤原です。

AWS re:Invent」をご存知でしょうか?

Amazon Web Service(AWS)で毎年開催されるグローバルデベロッパーカンファレンス「AWS re:Invent」で、今年も米国ラスベガスで10月6日〜10月9日の日程で開催されています。

AWS re:Invent では、実践ワークショップ、技術セッション、アーキテクチャから運用まであらゆるテーマを扱う詳細な技術コンテンツなど、300を超えるセッションや、グローバルのAWSパートナー企業による展示会が催されます。

弊社もAWSのテクノロジーパートナーとして、AWSを活用したグローバル規模での各種ソリューションやエコシステムの最新動向を調査すべく、このカンファレンスに参加しています。

今回は、現地から本日の基調講演(ラスベガス現地時間:10月6日 8:30〜)で発表されたAWSの新サービスについて、速報レポートをご紹介します。

毎年、このre:Inventでは、基調講演(Day1, Day2)で新サービスが発表されるのが特徴です。Day1の本日は、AWSのAndy Jassy氏から8つの新サービスについて発表がありました。

1. Amazon QuickSight
・SPICEというインメモリ分析処理と優れたビジュアライズダッシュボードを備えたビッグデータ用BIサービス

2. Amazon Kinesis Firehose
Kinesisに収集されたストリーミングデータをS3やRedshiftにそのまま移行できる機能の拡張

3. Amazon Import/Export Snowball
・大規模データのストレージ・移行サービス
(50TB格納できるハードウェアの提供と移動、AWSリソースへのデータロードの一連のオペレーションをサービス提供)

4. RDB for MiriaDB
RDBでのMiriaDBのサポート開始

5. AWS Database Migration Service
・オンプレからのデータマイグレーションサービス、1TBの移行が約3ドル

6. AWS Schema Conversion Tool
・データマイグレーション時の変換ツール

7. AWS Config Rule
コンプライアンスガイドラインに基づくAWS Configのルール設定と監視、通知機能の拡張

8. AWS Inspector
・セキュリティ・監査に関する自動アセスメントとレポーティングのサービス


QuickSightは、データ分析に必要な「収集」「蓄積」「可視化/分析」において、これまでマネジメントコンソール以外には提供されていなかった「可視化」の強化を果たし、一貫したデータ分析環境が整備されたといえます。

Database Migration ServiceやSchema Conversion Toolといったマイグレーションサービスの提供によって、これまで以上に、ユーザー企業におけるデータのクラウド移行が促進されるのではないでしょうか。

これらの新サービスの詳細は、Amazon Web Service ブログにて掲載されていますので、ご興味のある方はそちらをご確認ください。

明日の基調講演(Day2)でも新しい発表があるかもしれませんので、またレポートしたいと思います。

ここにハマった!DynamoDB

はじめまして。Impulse開発チームの木村です。

今回は、Amazon DynamoDBを、 Apache Cassandraと同じように扱おうとした際に、ハマった点とその解決策を紹介します。

なお、DynamoDBの操作には、AWS SDK for JavaScript (Node.js)を使用しています。

テーブル定義編

テーブルをまとめる機能がない

ハマった点

Cassandraでいうkeyspaceに相当するものがないため、 テーブルをグループでまとめて整理できない。

解決策

テーブル名にプレフィックスを付けて整理する。

テーブル名に.,_,-を使えるため、これらを区切りとして用いる。

参考


複合primary keyに使える属性は、最大で2つ

ハマった点

「3つ以上の属性の複合primary key」を定義できない。

解決策

primary keyとして使用可能なのは、以下のいずれか。

  • Hash属性
  • Hash属性とRange属性のペア

このため、まずは、2つ以下の属性でprimary keyが定義できる設計を検討してみる。

検討の結果、3つ(以上)の属性の複合primary keyが必要な場合は、たとえば以下のように擬似的に実現する。

  1. 複合primary keyとして使用したい属性の値を元に、 テーブル内で一意となる値を生成(「idを生成」「値を結合」など)する
  2. 「生成した値」を代入する属性をテーブルに定義して、primary keyとして扱う

日付・時刻型がない

ハマった点

属性の型に、日付と時刻を表す型がない。

解決策

文字列型または数値型を使って、日付・時刻型を代替する。

  • 文字列型の場合
    • ISO 8601に従って、日付・時刻を表現
    • Range属性として使って、query結果を時刻順にソートしたい場合は、 タイムゾーンを統一して、かつ表記揺れをなくす必要がある
  • 数値型の場合
    • UNIX時間で、日付・時刻を表現
      • 閏秒を気にする場合は、また別の方法が必要
    • Range属性として使えば、query結果は時刻順にソートされる

参考


NS/SS/BS型は、配列ではない

ハマった点

NS/SS/BS型の値には、同じ要素は1つまでしか入らない。

解決策

NS/SS/BS型はSetを表すため、そのような動作となる。

配列が必要な際は、L型を使う。

参考


AttributeDefinitionsにkey属性以外を入れてはならない

ハマった点

createTableでテーブルを作成する際、 (CREATE TABLEにおけるカラム定義の気分で) AttributeDefinitionsに全ての属性を含めると、 ValidationExceptionが起きる。

解決策

AttributeDefinitionsには、key属性のみを含める。

(つまり、テーブルの属性定義には、key属性のみが含まれる)

参考


データ取得編

Range属性のみのkey条件指定はできない

ハマった点

(テーブル全体から範囲検索しようとして) Range属性のみへkey条件指定してqueryを行うと、ValidationExceptionが起きる。

解決策

key条件指定で有効なのは、

  • Hash属性のみへ条件指定
  • Hash属性とRange属性の両方へ条件指定

のいずれか。 そのため「Range属性のみへkey条件指定」は行えない。

ただし、「テーブル全体から範囲検索」の実現方法は、例えば以下の方法が考えられる。

  • scanを行う
    • FilterExpressionで、Range属性に対して条件範囲を指定
    • scanが、queryと比べて重い操作であることに注意
  • テーブル内のアイテムすべてで、Hash属性の値を同じにする
    • key条件として、Hash属性にその値を指定し、Range属性に範囲を指定して、queryを行う
    • Hash属性がすべて同じだと、すべてのアイテムが同じpartitionに保存されることに注意

もちろん、「テーブル全体から範囲検索する必要のない」設計の場合は、このハマりは回避できる。

参考


SQL(ライクな)文が使えない

ハマった点

SELECT value1, value2 FROM sample_table WHERE hash_key = 1 AND range_key >= 2

といったSQL文を、queryに指定できない。

解決策

SQL文の内容を、(可能な場合は)queryのパラメータに翻訳する。

たとえば、上記のSQL文では以下のよう。

{
  // SELECT
  "Select": "SPECIFIC_ATTRIBUTES",
  "ProjectionExpression": "value1, value2",

  // FROM
  "TableName": "sample_table",

  // WHERE
  "ExpressionAttributeValues": {
    ":hash": {"N": 1},
    ":from": {"N": 2}
  },
  "KeyConditionExpression": "hash_key = :hash AND range_key >= :from"
}

Expressionに、数値や文字列を直接書けない

ハマった点

条件指定に使うExpression(KeyConditionExpressionFilterExpressionなど) に数値や文字列を書くと、ValidationExceptionが起きる。

ハマる例:

{
  "KeyConditionExpression": "hash_key = 1"
}

解決策

ExpressionAttributeValuesを使い、値の代わりとなるプレースホルダーを作成して、 Expressionにはそのプレースホルダー名を書く。

ハマらない例:

{
  "ExpressionAttributeValues": {
    // 値のプレースホルダー名は、':'から始める約束
    ":hash": {"N": 1},
  },
  "KeyConditionExpression": "hash_key = :hash"
}

参考


Range属性に対して2つの条件を指定できない

ハマった点

KeyConditionExpression内で、Range属性に条件を2つ指定すると、 ValidationExceptionが起きる。

ハマる例:

{
  "ExpressionAttributeValues": {
    ":hash": {"N": 1},
    ":from": {"N": 100},
    ":to"  : {"N": 200}
  },
  "KeyConditionExpression": "hash_key = :hash AND range_key >= :from AND range_key <= :to"
}

解決策

KeyConditionExpressionでの条件指定は、1つのkeyに1つの条件まで。

この例の場合は、BETWEENを使うことで制限を回避する。

ハマらない例:

{
  "ExpressionAttributeValues": {
    ":hash": {"N": 1},
    ":from": {"N": 100},
    ":to"  : {"N": 200}
  },
  "KeyConditionExpression": "hash_key = :hash AND range_key BETWEEN :from AND: to"
}

Expressionには、含めてはならない予約語がある

ハマった点

Expression(KeyConditionExpressionProjectionExpressionなど)に含まれる属性名が予約語と被ると、ValidationExceptionが起きる。

ハマる例:

{
  "ExpressionAttributeValues": {
    ":y": {"N": 2015},
  },
  // yearは予約語
  "KeyConditionExpression": "year = :y"
}

解決策

ExpressionAttributeNamesを使い、属性名の代わりとなるプレースホルダーを作成して、 Expressionにはそのプレースホルダー名を記述する。

ハマらない例:

{
  "ExpressionAttributeNames": {
    // 属性名のプレースホルダー名は、'#'から始める約束
    "#year": "year"
  },
  "ExpressionAttributeValues": {
    ":y": {"N": 2015},
  },
  // yearは予約語
  "KeyConditionExpression": "#year = :y"
}

参考


一度に取得できるテーブル名の数に限界がある

ハマった点

listTablesが返すテーブル名の数は、最大100個。

このため、すべてのテーブル名を一度に取得できないことがある。

解決策

ExclusiveStartTableNameを適宜指定して、listTablesを繰り返し実行することで、 すべてのテーブル名を取得する。

  • 最初の実行では、ExclusiveStartTableNamenullを指定
  • 2回目以降の実行では、ExclusiveStartTableNameに 「ひとつ前に実行したlistTablesのレスポンスに含まれるLastEvaluatedTableNameの値」を指定
  • テーブル名を最後まで取得した際は、LastEvaluatedTableNameの値がnullとなる

参考


一度に取得できるアイテムの数に限界がある

ハマった点

query/scanが返すアイテムの数は、最大で1MBを超えない数まで。

このため、すべてのアイテムを一度に取得できないことがある。

解決策

ExclusiveStartKeyを適宜指定して、query/scanを繰り返し実行することで、 すべてのアイテムを取得する。

  • 最初の実行では、ExclusiveStartKeynullを指定
  • 2回目以降の実行では、ExclusiveStartKeyに「ひとつ前に実行したquery/scanのレスポンスに含まれるLastEvaluatedKeyの値」を指定
  • テーブル名を最後まで取得した際は、LastEvaluatedKeyの値がnullとなる

ちなみに「AWS SDK for Java ドキュメント API」のquery/scanでは、 この繰り返しをAPIが裏で行ってくれる。

参考


データ追加・更新編

空文字列を代入できない

ハマった点

S型の属性に空文字列を代入しようとすると、ValidationExceptionが起きる。

解決策

DynamoDBの制限のため、空文字列は代入できない。

空文字列を代入せずに済む仕様にする必要がある。

参考


batchWriteItemで、一度にputできるアイテム数に限界がある

ハマった点

batchWriteItemが一度に受け付けるRequest数は、最大25個。

26個以上のアイテムは一度にputできない。

解決策

アイテムは、25個以下ずつputする。

参考


batchWriteItemで、同じアイテムへのRequestは重複不可

ハマった点

batchWriteItemの実行時、 同じアイテムへのRequestがRequestItems内で重複すると、 ValidationExceptionが起きる。

同じアイテムかどうかは、同じkey属性値かどうかで決まる。

以下の、どのパターンでも起きる。

  • PutRequest間で重複
  • DeleteRequest間で重複
  • PutRequestDeleteRequest間で重複

解決策

重複しないようにする。

たとえば、 「オブジェクトの配列から複数のRequestを自動生成する場合」などに注意。 Requestの生成前か生成後に、重複排除のためのfilteringを行うなどして対処する。

参考


batchWriteItem成功時、全Requestが成功したとは限らない

ハマった点

batchWriteItemの成功は、すべてのRequestの成功を保証しない。

解決策

失敗したRequestを特定して、ケアする必要がある。

batchWriteItemのレスポンスに含まれるUnprocessedItemsに、 失敗したRequestがリストアップされるので、 これを用いて、batchWriteItemを再び実行する。

参考


おわりに

開発中にDynamoDBでハマった点を紹介しました。

新しいものに触った時はハマるのが常ですが、 ドキュメントとソースを見ながら、ググりながら、 落ち着いて理解/実験していけば、何とかなるものです(と信じたい)。

ハマりすぎて落ち着けなくなったら、帰りましょう。

【お知らせ】日経BP主催「Cloud Days Tokyo 2015 秋」に出展します

https://exponet.nikkeibp.co.jp/itev/images/logo/cloud.png


データ分析サービス・プロダクト担当の藤原です。

来る2015年9月30日(水)〜10月2日(金)、東京ビッグサイトで開催されるクラウドサービスの総合展「Cloud Days Tokyo 2015 秋」に、協賛出展いたします。
弊社としては初めての大規模な展示会での出展となるので、社員一同とても楽しみにしてます。

今回は、弊社のリアルタイム大規模データ分析プラットフォーム「Impulse」、企業におけるITシステム運用の強化を目的とした統合ログ管理・分析ソリューション「Grasper」を、AWSAmazon Web Services)ブース内にてご紹介します。

展示ブースでは、各サービスの提供機能の詳細、活用シーン、事例紹介等をお届けしたいと思います。
もちろん、営業担当だけではなく、技術メンバーもスタンバイしておりますので、弊社が提供する機械学習を利用した異常検知の実現方式やSparkを始めとする様々なオープンテクノロジーの活用状況等もご紹介できるかと思います。

展示ブースでのご案内に加え、サービスに関するプレゼンテーション(私がプレゼンします!)も予定しておりますので、お時間がありましたら是非会場へお越しください。

【開催概要】

  • 日時:2015年9月30日(水)-10月2日(金) 10:00~17:30
  • 会場:東京ビッグサイト 東4-6ホール、会議棟1-26(AWSブース内※)
  • 主催:日経BP
  • 入場料:事前登録で入場料は無料(当日入場料 3,000円)

(※)ブレインズテクノロジーは、AWSのパートナー企業として共同出展します


【プレゼンスケジュール】

  • 9/30(水)16:25-16:35
  • 10/1(木)13:05-13:15
  • 10/2(金)11:05-11:15


Cloud Daysへの事前登録のお申込みはこちらからどうぞ。
http://itpro.nikkeibp.co.jp/expo/2015/regist/

ご来場の際には、「AWSブース」「ブレインズテクノロジーブース」に、是非お立ち寄り下さい。

pysparkのWindow Functions(その1)

はじめまして、ブレインズテクノロジーのポンセです。

pysparkにWindow Functions(ウィンドウ関数)の機能がSpark 1.4で追加されました。
pyspark.sql module — PySpark 1.4.1 documentation

このWindow Functions、ランキングや移動平均値等々の集計を行うときに非常に便利です。
技術ブログ1回目はこのWindow Functionsについて簡単に書きたいと思います。

Window Functions(ウィンドウ関数)とは

Window Functionsはテーブルのグループごとに集計を行う機能ですが、
通常の集計関数とは異なり、inputの行毎に結果が返ります。

と言ってもわかりにくいので、何ができるのか下の例でみてみます。

Sparkのversionは1.4.1です。

>>> df = sqlCtx.createDataFrame([
  ('2015-08-01 00:00:00','cpu',40), ('2015-08-01 00:00:00','memory',50), ('2015-08-01 00:00:00','disk_io',70), 
  ('2015-08-01 01:00:00','cpu',20), ('2015-08-01 01:00:00','memory',90), ('2015-08-01 01:00:00','disk_io',70), 
  ('2015-08-01 02:00:00','cpu',100), ('2015-08-01 02:00:00','memory',55), ('2015-08-01 02:00:00','disk_io',72),
  ('2015-08-01 03:00:00','cpu',30), ('2015-08-01 03:00:00','memory',60), ('2015-08-01 03:00:00','disk_io',72), 
  ('2015-08-01 04:00:00','cpu',50), ('2015-08-01 04:00:00','memory',65), ('2015-08-01 04:00:00','disk_io',76),
  ('2015-08-01 05:00:00','cpu',45), ('2015-08-01 05:00:00','memory',40), ('2015-08-01 05:00:00','disk_io',90)
], ['timestamp','metrics','value'])
>>> df.show()
+-------------------+-------+-----+
|          timestamp|metrics|value|
+-------------------+-------+-----+
|2015-08-01 00:00:00|    cpu|   40|
|2015-08-01 00:00:00| memory|   50|
|2015-08-01 00:00:00|disk_io|   70|
|2015-08-01 01:00:00|    cpu|   20|
|2015-08-01 01:00:00| memory|   90|
|2015-08-01 01:00:00|disk_io|   70|
|2015-08-01 02:00:00|    cpu|  100|
|2015-08-01 02:00:00| memory|   55|
|2015-08-01 02:00:00|disk_io|   72|
|2015-08-01 03:00:00|    cpu|   30|
|2015-08-01 03:00:00| memory|   60|
|2015-08-01 03:00:00|disk_io|   72|
|2015-08-01 04:00:00|    cpu|   50|
|2015-08-01 04:00:00| memory|   65|
|2015-08-01 04:00:00|disk_io|   76|
|2015-08-01 05:00:00|    cpu|   45|
|2015-08-01 05:00:00| memory|   40|
|2015-08-01 05:00:00|disk_io|   90|
+-------------------+-------+-----+


上のようなカラムがtimestamp, metrics, valueからなるテーブルに対して、metricsごとでのvalueの平均値を取りたいとします。

通常の集計関数を使う場合は以下のようになります。

>>> df.groupBy('metrics').avg('value').collect()
[Row(metrics=u'disk_io', AVG(value)=75.0), 
Row(metrics=u'cpu', AVG(value)=47.5), 
Row(metrics=u'memory', AVG(value)=60.0)]

各metrics毎での平均値が単一のレコードで返ってきます。

Window Functionsを使うと以下のようになります。

>>> from pyspark.sql.window import Window
>>> from pyspark.sql import functions as F
>>> window_over_metrics = Window.partitionBy('metrics')
>>> df = df.withColumn('average', F.avg('value').over(window_over_metrics))
>>> df.show()
+-------------------+-------+-----+-------+
|          timestamp|metrics|value|average|
+-------------------+-------+-----+-------+
|2015-08-01 00:00:00|disk_io|   70|   75.0|
|2015-08-01 01:00:00|disk_io|   70|   75.0|
|2015-08-01 02:00:00|disk_io|   72|   75.0|
|2015-08-01 03:00:00|disk_io|   72|   75.0|
|2015-08-01 04:00:00|disk_io|   76|   75.0|
|2015-08-01 05:00:00|disk_io|   90|   75.0|
|2015-08-01 00:00:00|    cpu|   40|   47.5|
|2015-08-01 01:00:00|    cpu|   20|   47.5|
|2015-08-01 02:00:00|    cpu|  100|   47.5|
|2015-08-01 03:00:00|    cpu|   30|   47.5|
|2015-08-01 04:00:00|    cpu|   50|   47.5|
|2015-08-01 05:00:00|    cpu|   45|   47.5|
|2015-08-01 00:00:00| memory|   50|   60.0|
|2015-08-01 01:00:00| memory|   90|   60.0|
|2015-08-01 02:00:00| memory|   55|   60.0|
|2015-08-01 03:00:00| memory|   60|   60.0|
|2015-08-01 04:00:00| memory|   65|   60.0|
|2015-08-01 05:00:00| memory|   40|   60.0|
+-------------------+-------+-----+-------+


各行に対して、metricsごとの平均値が返ります。

行毎に結果が返るということは以下のように使うことも可能です。

>>> df.withColumn('value - average', df['value'] - F.avg('value').over(window_over_metrics)).show()
+-------------------+-------+-----+-------+---------------+
|          timestamp|metrics|value|average|value - average|
+-------------------+-------+-----+-------+---------------+
|2015-08-01 00:00:00|disk_io|   70|   75.0|           -5.0|
|2015-08-01 01:00:00|disk_io|   70|   75.0|           -5.0|
|2015-08-01 02:00:00|disk_io|   72|   75.0|           -3.0|
|2015-08-01 03:00:00|disk_io|   72|   75.0|           -3.0|
|2015-08-01 04:00:00|disk_io|   76|   75.0|            1.0|
|2015-08-01 05:00:00|disk_io|   90|   75.0|           15.0|
|2015-08-01 00:00:00|    cpu|   40|   47.5|           -7.5|
|2015-08-01 01:00:00|    cpu|   20|   47.5|          -27.5|
|2015-08-01 02:00:00|    cpu|  100|   47.5|           52.5|
|2015-08-01 03:00:00|    cpu|   30|   47.5|          -17.5|
|2015-08-01 04:00:00|    cpu|   50|   47.5|            2.5|
|2015-08-01 05:00:00|    cpu|   45|   47.5|           -2.5|
|2015-08-01 00:00:00| memory|   50|   60.0|          -10.0|
|2015-08-01 01:00:00| memory|   90|   60.0|           30.0|
|2015-08-01 02:00:00| memory|   55|   60.0|           -5.0|
|2015-08-01 03:00:00| memory|   60|   60.0|            0.0|
|2015-08-01 04:00:00| memory|   65|   60.0|            5.0|
|2015-08-01 05:00:00| memory|   40|   60.0|          -20.0|
+-------------------+-------+-----+-------+---------------+


各value値と平均値との差を新たなカラムとして追加することができました。
平均値との差が〜より大きいや小さい等のフィルターを組み合わせることで、特定の行に対する調査も簡単にできそうです。

また、Window FunctionsではorderByも指定できます。

>>> window_over_metrics = Window.partitionBy('metrics').orderBy('timestamp')
>>> df.withColumn('average', F.avg('value').over(window_over_metrics)).show()
+-------------------+-------+-----+------------------+
|          timestamp|metrics|value|           average|
+-------------------+-------+-----+------------------+
|2015-08-01 00:00:00|disk_io|   70|              70.0|
|2015-08-01 01:00:00|disk_io|   70|              70.0|
|2015-08-01 02:00:00|disk_io|   72| 70.66666666666667|
|2015-08-01 03:00:00|disk_io|   72|              71.0|
|2015-08-01 04:00:00|disk_io|   76|              72.0|
|2015-08-01 05:00:00|disk_io|   90|              75.0|
|2015-08-01 00:00:00|    cpu|   40|              40.0|
|2015-08-01 01:00:00|    cpu|   20|              30.0|
|2015-08-01 02:00:00|    cpu|  100|53.333333333333336|
|2015-08-01 03:00:00|    cpu|   30|              47.5|
|2015-08-01 04:00:00|    cpu|   50|              48.0|
|2015-08-01 05:00:00|    cpu|   45|              47.5|
|2015-08-01 00:00:00| memory|   50|              50.0|
|2015-08-01 01:00:00| memory|   90|              70.0|
|2015-08-01 02:00:00| memory|   55|              65.0|
|2015-08-01 03:00:00| memory|   60|             63.75|
|2015-08-01 04:00:00| memory|   65|              64.0|
|2015-08-01 05:00:00| memory|   40|              60.0|
+-------------------+-------+-----+------------------+

先ほど算出した平均値とは値が変わりました。
このaverageの値は移動平均の値となっています。

orderByにtimestampのカラムを指定することにより、各metricsごとに、
timestampの最初の行からの平均値を計算しています。


これまでの例は平均値でしたが、これ以外にも様々な集計関数がpysparkでサポートされています。
pyspark.sql module — PySpark 1.4.1 documentation

まとめ

SparkでもWindow Functionsがサポートされて、簡単で高度な集計が可能になりました。
まだ使ったことがない人はぜひ試してみてください!!

次回(その2)は、今回すっとばしたWindow Functionsの構文やFrameという概念、さらにUDFとの絡みなど細かい話に踏み込めればと思っています。
(全く違うことを書くかもしれませんが…)

さいごに

ブレインズテクノロジーでは、Sparkの様々なライブラリや機械学習の技術を駆使して、日々大規模データと戦っています。またApache Spark以外にもelasticsearchやfluentdなど、様々なOSSを組み合わせてデータ分析基盤Impulseを構築し、AWS上で展開しています。

Impulseについては、使っている技術を中心に今後紹介していきます。

参考

databricks.com