読者です 読者をやめる 読者になる 読者になる

Technology Topics by Brains

ブレインズテクノロジーの研究開発機関「未来工場」で働くエンジニアが、先端オープン技術、機械学習×データ分析(異常検知、予兆検知)に関する取組みをご紹介します。

pysparkのWindow Functions(その1)

はじめまして、ブレインズテクノロジーのポンセです。

pysparkにWindow Functions(ウィンドウ関数)の機能がSpark 1.4で追加されました。
pyspark.sql module — PySpark 1.4.1 documentation

このWindow Functions、ランキングや移動平均値等々の集計を行うときに非常に便利です。
技術ブログ1回目はこのWindow Functionsについて簡単に書きたいと思います。

Window Functions(ウィンドウ関数)とは

Window Functionsはテーブルのグループごとに集計を行う機能ですが、
通常の集計関数とは異なり、inputの行毎に結果が返ります。

と言ってもわかりにくいので、何ができるのか下の例でみてみます。

Sparkのversionは1.4.1です。

>>> df = sqlCtx.createDataFrame([
  ('2015-08-01 00:00:00','cpu',40), ('2015-08-01 00:00:00','memory',50), ('2015-08-01 00:00:00','disk_io',70), 
  ('2015-08-01 01:00:00','cpu',20), ('2015-08-01 01:00:00','memory',90), ('2015-08-01 01:00:00','disk_io',70), 
  ('2015-08-01 02:00:00','cpu',100), ('2015-08-01 02:00:00','memory',55), ('2015-08-01 02:00:00','disk_io',72),
  ('2015-08-01 03:00:00','cpu',30), ('2015-08-01 03:00:00','memory',60), ('2015-08-01 03:00:00','disk_io',72), 
  ('2015-08-01 04:00:00','cpu',50), ('2015-08-01 04:00:00','memory',65), ('2015-08-01 04:00:00','disk_io',76),
  ('2015-08-01 05:00:00','cpu',45), ('2015-08-01 05:00:00','memory',40), ('2015-08-01 05:00:00','disk_io',90)
], ['timestamp','metrics','value'])
>>> df.show()
+-------------------+-------+-----+
|          timestamp|metrics|value|
+-------------------+-------+-----+
|2015-08-01 00:00:00|    cpu|   40|
|2015-08-01 00:00:00| memory|   50|
|2015-08-01 00:00:00|disk_io|   70|
|2015-08-01 01:00:00|    cpu|   20|
|2015-08-01 01:00:00| memory|   90|
|2015-08-01 01:00:00|disk_io|   70|
|2015-08-01 02:00:00|    cpu|  100|
|2015-08-01 02:00:00| memory|   55|
|2015-08-01 02:00:00|disk_io|   72|
|2015-08-01 03:00:00|    cpu|   30|
|2015-08-01 03:00:00| memory|   60|
|2015-08-01 03:00:00|disk_io|   72|
|2015-08-01 04:00:00|    cpu|   50|
|2015-08-01 04:00:00| memory|   65|
|2015-08-01 04:00:00|disk_io|   76|
|2015-08-01 05:00:00|    cpu|   45|
|2015-08-01 05:00:00| memory|   40|
|2015-08-01 05:00:00|disk_io|   90|
+-------------------+-------+-----+


上のようなカラムがtimestamp, metrics, valueからなるテーブルに対して、metricsごとでのvalueの平均値を取りたいとします。

通常の集計関数を使う場合は以下のようになります。

>>> df.groupBy('metrics').avg('value').collect()
[Row(metrics=u'disk_io', AVG(value)=75.0), 
Row(metrics=u'cpu', AVG(value)=47.5), 
Row(metrics=u'memory', AVG(value)=60.0)]

各metrics毎での平均値が単一のレコードで返ってきます。

Window Functionsを使うと以下のようになります。

>>> from pyspark.sql.window import Window
>>> from pyspark.sql import functions as F
>>> window_over_metrics = Window.partitionBy('metrics')
>>> df = df.withColumn('average', F.avg('value').over(window_over_metrics))
>>> df.show()
+-------------------+-------+-----+-------+
|          timestamp|metrics|value|average|
+-------------------+-------+-----+-------+
|2015-08-01 00:00:00|disk_io|   70|   75.0|
|2015-08-01 01:00:00|disk_io|   70|   75.0|
|2015-08-01 02:00:00|disk_io|   72|   75.0|
|2015-08-01 03:00:00|disk_io|   72|   75.0|
|2015-08-01 04:00:00|disk_io|   76|   75.0|
|2015-08-01 05:00:00|disk_io|   90|   75.0|
|2015-08-01 00:00:00|    cpu|   40|   47.5|
|2015-08-01 01:00:00|    cpu|   20|   47.5|
|2015-08-01 02:00:00|    cpu|  100|   47.5|
|2015-08-01 03:00:00|    cpu|   30|   47.5|
|2015-08-01 04:00:00|    cpu|   50|   47.5|
|2015-08-01 05:00:00|    cpu|   45|   47.5|
|2015-08-01 00:00:00| memory|   50|   60.0|
|2015-08-01 01:00:00| memory|   90|   60.0|
|2015-08-01 02:00:00| memory|   55|   60.0|
|2015-08-01 03:00:00| memory|   60|   60.0|
|2015-08-01 04:00:00| memory|   65|   60.0|
|2015-08-01 05:00:00| memory|   40|   60.0|
+-------------------+-------+-----+-------+


各行に対して、metricsごとの平均値が返ります。

行毎に結果が返るということは以下のように使うことも可能です。

>>> df.withColumn('value - average', df['value'] - F.avg('value').over(window_over_metrics)).show()
+-------------------+-------+-----+-------+---------------+
|          timestamp|metrics|value|average|value - average|
+-------------------+-------+-----+-------+---------------+
|2015-08-01 00:00:00|disk_io|   70|   75.0|           -5.0|
|2015-08-01 01:00:00|disk_io|   70|   75.0|           -5.0|
|2015-08-01 02:00:00|disk_io|   72|   75.0|           -3.0|
|2015-08-01 03:00:00|disk_io|   72|   75.0|           -3.0|
|2015-08-01 04:00:00|disk_io|   76|   75.0|            1.0|
|2015-08-01 05:00:00|disk_io|   90|   75.0|           15.0|
|2015-08-01 00:00:00|    cpu|   40|   47.5|           -7.5|
|2015-08-01 01:00:00|    cpu|   20|   47.5|          -27.5|
|2015-08-01 02:00:00|    cpu|  100|   47.5|           52.5|
|2015-08-01 03:00:00|    cpu|   30|   47.5|          -17.5|
|2015-08-01 04:00:00|    cpu|   50|   47.5|            2.5|
|2015-08-01 05:00:00|    cpu|   45|   47.5|           -2.5|
|2015-08-01 00:00:00| memory|   50|   60.0|          -10.0|
|2015-08-01 01:00:00| memory|   90|   60.0|           30.0|
|2015-08-01 02:00:00| memory|   55|   60.0|           -5.0|
|2015-08-01 03:00:00| memory|   60|   60.0|            0.0|
|2015-08-01 04:00:00| memory|   65|   60.0|            5.0|
|2015-08-01 05:00:00| memory|   40|   60.0|          -20.0|
+-------------------+-------+-----+-------+---------------+


各value値と平均値との差を新たなカラムとして追加することができました。
平均値との差が〜より大きいや小さい等のフィルターを組み合わせることで、特定の行に対する調査も簡単にできそうです。

また、Window FunctionsではorderByも指定できます。

>>> window_over_metrics = Window.partitionBy('metrics').orderBy('timestamp')
>>> df.withColumn('average', F.avg('value').over(window_over_metrics)).show()
+-------------------+-------+-----+------------------+
|          timestamp|metrics|value|           average|
+-------------------+-------+-----+------------------+
|2015-08-01 00:00:00|disk_io|   70|              70.0|
|2015-08-01 01:00:00|disk_io|   70|              70.0|
|2015-08-01 02:00:00|disk_io|   72| 70.66666666666667|
|2015-08-01 03:00:00|disk_io|   72|              71.0|
|2015-08-01 04:00:00|disk_io|   76|              72.0|
|2015-08-01 05:00:00|disk_io|   90|              75.0|
|2015-08-01 00:00:00|    cpu|   40|              40.0|
|2015-08-01 01:00:00|    cpu|   20|              30.0|
|2015-08-01 02:00:00|    cpu|  100|53.333333333333336|
|2015-08-01 03:00:00|    cpu|   30|              47.5|
|2015-08-01 04:00:00|    cpu|   50|              48.0|
|2015-08-01 05:00:00|    cpu|   45|              47.5|
|2015-08-01 00:00:00| memory|   50|              50.0|
|2015-08-01 01:00:00| memory|   90|              70.0|
|2015-08-01 02:00:00| memory|   55|              65.0|
|2015-08-01 03:00:00| memory|   60|             63.75|
|2015-08-01 04:00:00| memory|   65|              64.0|
|2015-08-01 05:00:00| memory|   40|              60.0|
+-------------------+-------+-----+------------------+

先ほど算出した平均値とは値が変わりました。
このaverageの値は移動平均の値となっています。

orderByにtimestampのカラムを指定することにより、各metricsごとに、
timestampの最初の行からの平均値を計算しています。


これまでの例は平均値でしたが、これ以外にも様々な集計関数がpysparkでサポートされています。
pyspark.sql module — PySpark 1.4.1 documentation

まとめ

SparkでもWindow Functionsがサポートされて、簡単で高度な集計が可能になりました。
まだ使ったことがない人はぜひ試してみてください!!

次回(その2)は、今回すっとばしたWindow Functionsの構文やFrameという概念、さらにUDFとの絡みなど細かい話に踏み込めればと思っています。
(全く違うことを書くかもしれませんが…)

さいごに

ブレインズテクノロジーでは、Sparkの様々なライブラリや機械学習の技術を駆使して、日々大規模データと戦っています。またApache Spark以外にもelasticsearchやfluentdなど、様々なOSSを組み合わせてデータ分析基盤Impulseを構築し、AWS上で展開しています。

Impulseについては、使っている技術を中心に今後紹介していきます。

参考

databricks.com