読者です 読者をやめる 読者になる 読者になる

Technology Topics by Brains

ブレインズテクノロジーの研究開発機関「未来工場」で働くエンジニアが、先端オープン技術、機械学習×データ分析(異常検知、予兆検知)に関する取組みをご紹介します。

【Spark】Window Functions(その2)

どうも、ポンセです。
前回の続きです(タイトルを微妙に変えていますが)。

SparkというよりSQLのWindow関数周りの話になっている気がしますが、気にせず書きます。

今回はSQLの形式で書きたいと思います。

ROWS

前回と同様にグループ単位での平均値を行毎で算出するのですが、
windowの行数を指定して集計したいとします。

そんな時は ROWS BETWEEN ~ AND ~ 。

データ
sample
+-------------------+-------+-----+
|          timestamp|metrics|value|
+-------------------+-------+-----+
|2015-08-01 00:00:00|    cpu|   40|
|2015-08-01 00:00:00| memory|   50|
|2015-08-01 00:00:00|disk_io|   70|
|2015-08-01 01:00:00|    cpu|   20|
|2015-08-01 01:00:00| memory|   90|
|2015-08-01 01:00:00|disk_io|   70|
|2015-08-01 02:00:00|    cpu|  100|
|2015-08-01 02:00:00| memory|   55|
|2015-08-01 02:00:00|disk_io|   72|
|2015-08-01 03:00:00|    cpu|   30|
|2015-08-01 03:00:00| memory|   60|
|2015-08-01 03:00:00|disk_io|   72|
|2015-08-01 04:00:00|    cpu|   50|
|2015-08-01 04:00:00| memory|   65|
|2015-08-01 04:00:00|disk_io|   76|
|2015-08-01 05:00:00|    cpu|   45|
|2015-08-01 05:00:00| memory|   40|
|2015-08-01 05:00:00|disk_io|   90|
+-------------------+-------+-----+
SQL
SELECT
  timestamp,
  metrics,
  value,
  AVG(value) OVER (PARTITION BY metrics ORDER BY unix_timestamp(timestamp) ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as average
FROM
  sample
結果
+-------------------+-------+-----+------------------+
|          timestamp|metrics|value|           average|
+-------------------+-------+-----+------------------+
|2015-08-01 00:00:00|    cpu|   40|              30.0|
|2015-08-01 01:00:00|    cpu|   20|53.333333333333336|
|2015-08-01 02:00:00|    cpu|  100|              50.0|<- ***
|2015-08-01 03:00:00|    cpu|   30|              60.0|
|2015-08-01 04:00:00|    cpu|   50|41.666666666666664|
|2015-08-01 05:00:00|    cpu|   45|              47.5|
|2015-08-01 00:00:00|disk_io|   70|              70.0|
|2015-08-01 01:00:00|disk_io|   70| 70.66666666666667|
|2015-08-01 02:00:00|disk_io|   72| 71.33333333333333|
|2015-08-01 03:00:00|disk_io|   72| 73.33333333333333|
|2015-08-01 04:00:00|disk_io|   76| 79.33333333333333|
|2015-08-01 05:00:00|disk_io|   90|              83.0|
|2015-08-01 00:00:00| memory|   50|              70.0|
|2015-08-01 01:00:00| memory|   90|              65.0|
|2015-08-01 02:00:00| memory|   55| 68.33333333333333|
|2015-08-01 03:00:00| memory|   60|              60.0|
|2015-08-01 04:00:00| memory|   65|              55.0|
|2015-08-01 05:00:00| memory|   40|              52.5|
+-------------------+-------+-----+------------------+

例えば3行目を見てみます。
3行目はmetrics cpuに対して、20(1つ前の行)、 100(現在行)、30(1つ後ろの行)の平均値(すなわち50)となっています。
このようにROWS BETWEEN ~ AND ~ の構文で行数単位での集計が可能となります。

RANGE

今度はある値より大きいまたは小さい行でグルーピングして集計したいとします。

そんな時は RANGE BETWEEN ~ AND ~ 。

データ

データは上記例とは違い、timestampの間隔をバラバラにしています。

ex_sample
+-------------------+-------+-----+
|          timestamp|metrics|value|
+-------------------+-------+-----+
|2015-08-01 00:00:00|    cpu|   40|
|2015-08-01 00:02:00| memory|   50|
|2015-08-01 00:01:00|disk_io|   70|
|2015-08-01 00:05:00|    cpu|   20|
|2015-08-01 00:04:00| memory|   90|
|2015-08-01 00:04:00|disk_io|   70|
|2015-08-01 00:07:00|    cpu|  100|
|2015-08-01 00:10:00| memory|   55|
|2015-08-01 00:15:00|disk_io|   72|
|2015-08-01 00:15:00|    cpu|   30|
|2015-08-01 00:30:00| memory|   60|
|2015-08-01 00:20:00|disk_io|   72|
|2015-08-01 00:27:00|    cpu|   50|
|2015-08-01 00:35:00| memory|   65|
|2015-08-01 00:24:00|disk_io|   76|
|2015-08-01 00:32:00|    cpu|   45|
|2015-08-01 00:54:00| memory|   40|
|2015-08-01 00:40:00|disk_io|   90|
+-------------------+-------+-----+
SQL
SELECT
  timestamp,
  unix_timestamp(timestamp) as unix_time,
  metrics,
  value,
  AVG(value) OVER (PARTITION BY metrics ORDER BY unix_timestamp(timestamp) RANGE BETWEEN 600 PRECEDING AND CURRENT ROW) as average
FROM
  ex_sample
結果
+-------------------+----------+-------+-----+------------------+
|          timestamp| unix_time|metrics|value|           average|
+-------------------+----------+-------+-----+------------------+
|2015-08-01 00:00:00|1438354800|    cpu|   40|              40.0|
|2015-08-01 00:05:00|1438355100|    cpu|   20|              30.0|
|2015-08-01 00:07:00|1438355220|    cpu|  100|53.333333333333336|
|2015-08-01 00:15:00|1438355700|    cpu|   30|              50.0|<- ***
|2015-08-01 00:27:00|1438356420|    cpu|   50|              50.0|
|2015-08-01 00:32:00|1438356720|    cpu|   45|              47.5|
|2015-08-01 00:01:00|1438354860|disk_io|   70|              70.0|
|2015-08-01 00:04:00|1438355040|disk_io|   70|              70.0|
|2015-08-01 00:15:00|1438355700|disk_io|   72|              72.0|
|2015-08-01 00:20:00|1438356000|disk_io|   72|              72.0|
|2015-08-01 00:24:00|1438356240|disk_io|   76| 73.33333333333333|
|2015-08-01 00:40:00|1438357200|disk_io|   90|              90.0|
|2015-08-01 00:02:00|1438354920| memory|   50|              50.0|
|2015-08-01 00:04:00|1438355040| memory|   90|              70.0|
|2015-08-01 00:10:00|1438355400| memory|   55|              65.0|
|2015-08-01 00:30:00|1438356600| memory|   60|              60.0|
|2015-08-01 00:35:00|1438356900| memory|   65|              62.5|
|2015-08-01 00:54:00|1438358040| memory|   40|              40.0|
+-------------------+----------+-------+-----+------------------+

わかりやすくするために、unixtimeの行を追加しています。

例えば4行目を見てみますと、
この行ではORDER BY句で指定しているunixtimeに対して、600秒前から現在行すなわち、30(現在行)、100(1つ前)、20(2つ前)の平均(すなわち50)となっています。
このようにRANGE BETWEEN ~ AND ~ の構文で特定値より大きいまたは小さい行による集計が可能となります。

まとめ

・ROWSとRANGEを使って行数指定、値の幅指定を行ってSparkで集計してみました(全然大規模なデータじゃないけど)。

さいごに

Window関数に加えて、Spark1.5では Date time や String 周りの関数がかなりサポートされているので、集計周りは相当充実してきた印象です。もうすぐ1.6もリリースされそうなのでそちらにも注目していきたいところですね。

今回の例で使ったpythonのコードを載せておきます。

df = sqlCtx.createDataFrame([
  ('2015-08-01 00:00:00','cpu',40), ('2015-08-01 00:00:00','memory',50), ('2015-08-01 00:00:00','disk_io',70), 
  ('2015-08-01 01:00:00','cpu',20), ('2015-08-01 01:00:00','memory',90), ('2015-08-01 01:00:00','disk_io',70), 
  ('2015-08-01 02:00:00','cpu',100), ('2015-08-01 02:00:00','memory',55), ('2015-08-01 02:00:00','disk_io',72),
  ('2015-08-01 03:00:00','cpu',30), ('2015-08-01 03:00:00','memory',60), ('2015-08-01 03:00:00','disk_io',72), 
  ('2015-08-01 04:00:00','cpu',50), ('2015-08-01 04:00:00','memory',65), ('2015-08-01 04:00:00','disk_io',76),
  ('2015-08-01 05:00:00','cpu',45), ('2015-08-01 05:00:00','memory',40), ('2015-08-01 05:00:00','disk_io',90)
], ['timestamp','metrics','value'])

df.registerTempTable('sample')

q = """
SELECT
  timestamp,
  metrics,
  value,
  AVG(value) OVER (PARTITION BY metrics ORDER BY unix_timestamp(timestamp) ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as average
FROM
  sample
"""

sqlContext.sql(q).show()



ex_df = sqlCtx.createDataFrame([
  ('2015-08-01 00:00:00','cpu',40), ('2015-08-01 00:02:00','memory',50), ('2015-08-01 00:01:00','disk_io',70), 
  ('2015-08-01 00:05:00','cpu',20), ('2015-08-01 00:04:00','memory',90), ('2015-08-01 00:04:00','disk_io',70), 
  ('2015-08-01 00:07:00','cpu',100), ('2015-08-01 00:10:00','memory',55), ('2015-08-01 00:15:00','disk_io',72),
  ('2015-08-01 00:15:00','cpu',30), ('2015-08-01 00:30:00','memory',60), ('2015-08-01 00:20:00','disk_io',72), 
  ('2015-08-01 00:27:00','cpu',50), ('2015-08-01 00:35:00','memory',65), ('2015-08-01 00:24:00','disk_io',76),
  ('2015-08-01 00:32:00','cpu',45), ('2015-08-01 00:54:00','memory',40), ('2015-08-01 00:40:00','disk_io',90)
], ['timestamp','metrics','value'])

ex_df.registerTempTable('ex_sample')


q = """
SELECT
  timestamp,
  unix_timestamp(timestamp) as unix_time,
  metrics,
  value,
  AVG(value) OVER (PARTITION BY metrics ORDER BY unix_timestamp(timestamp) RANGE BETWEEN 600 PRECEDING AND CURRENT ROW) as average
FROM
  ex_sample
"""

sqlContext.sql(q).show()