終末 A.I.

データいじりや機械学習するエンジニアのブログ

データテストライブラリー「Deequ」を触ってみた

DeequはAWSがリリースしているデータテストを行うためのライブラリです(Deequの説明ではUnit Testと表現されています)。

ここで言うデータテストは、ETL処理やデータマート作成処理などの意図通り動いているどうか、取り込んだデータが昔と変化していないかを確認するための検証処理のことを指しています。

ETL処理などを最初に作成したタイミングでは、その処理が意図したものになっているか確認すると思います。一方で、日次のバッチ処理や、動き続けているストリーム処理について、本当に意図したようにデータが加工されているかどうかは、通常の方法では処理自体が成功したかどうかくらいしか確認するすべがありません。

しかし、日々のデータ処理は簡単に意図しないデータを生み出してしまう可能性があります。気づいたらデータの中身が変わっていて、変換処理が意図しない動作をしてしまっていたり、そもそもソースデータがおかしくなっていて重要な指標がずれてしまう、というようなことも考えられるでしょう。

そのような時に役に立つのがデータテストです。データテストでは、Nullを許容しないはずのカラムに何故かNullか入ってくるようになっていないか、過去のデータと比較して極端にデータの数が変化していないか、などを調べることを含む概念です。一言でいうと、データが意図しないものになっていないかを確認する処理、と言えます。

目次

Deequの何がいいのか

AWSのDeequは、そんなデータテストを簡単に実施できるようにするためのScala製ライブラリです。PythonラッパーであるPyDeequもあります。 Deequは、例えばデータ変換ツールであるdbtでもサポートしていますが、それと比べると、下記のような点が特徴としてあげられます。

  • Sparkベースでできている
    • SQLクエリで直接アクセスできない、ファイルだけがあるようなデータにも適用できる
    • プラグラムベースでしか実現しにくいような処理でも比較的組み込みやすい
  • プリセットのテスト関数が豊富に組み込まれている
    • 手元のデータ単体にフォーカスしたテスト関数だけで40個ほどプリセットである
    • AnomalyDetectionという、過去のデータの状態も参照してテストするための処理も組み込まれている
  • 必要なテスト処理をカラムごとにレコメンドしてくれる機能も組み込まれている

個人的に特にいいのは、AnomalyDetectionの機能が最初から組み込まれている点です。言わずもがなデータは日々変わりますので、実データについて何が正解かを決めにくい、という問題がテストを実装するにあたって存在します。

そのため、今までのデータと比べてどうかというのが、データがおかしくなっているかどうかを判断するための1つの重要な指標になります。特定の指標について経年変化をもとに「違和感」を自動で検出できる仕組みは非常に重要です。

Deequは、このようなAnomalyDetectionと、一般的なデータ単体のテストを、1つのライブラリで完結して行うことができるようになっています。

データテストの実行

基本的な使い方

データテストの基本的な実行は、PyDeequでは下記のように書くことで実現できます。

サンプルコード1

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pydeequ.checks import *
from pydeequ.verification import *

df = spark.createDataFrame(data=[
        (1, "Test1", "foo"), 
        (2, "Test2", "foo"), 
        (3, "Test3", "bar"), 
        (4, "Test4", "baz"), 
        (5, "Test5", "baz"), 
        (6, "Test6", "bar"), 
        (7, "Test7", None), 
        (8, "Test8", "bar"), 
], schema=StructType([
        StructField("a", IntegerType(), True),
        StructField("b", StringType(), True),
        StructField("c", StringType(), True),
]))

check_warning = Check(spark, CheckLevel.Warning, "Warning Check")
check_error = Check(spark, CheckLevel.Error, "Error Check")

checkResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        check_warning.isComplete("a") \
        .isComplete("b") \
        .isComplete("c")) \
    .addCheck(
        check_error.isPositive("a") \
        .isUnique("b") \
        .isContainedIn("c", ["foo", "bar", "baz"])) \
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show(truncate=False)

実行結果は下記のようになります。

結果1

+-------------+-----------+------------+-----------------------------------------------------------------------------------------------------------+-----------------+------------------------------------------------------+
|check        |check_level|check_status|constraint                                                                                                 |constraint_status|constraint_message                                    |
+-------------+-----------+------------+-----------------------------------------------------------------------------------------------------------+-----------------+------------------------------------------------------+
|Warning Check|Warning    |Warning     |CompletenessConstraint(Completeness(a,None))                                                               |Success          |                                                      |
|Warning Check|Warning    |Warning     |CompletenessConstraint(Completeness(b,None))                                                               |Success          |                                                      |
|Warning Check|Warning    |Warning     |CompletenessConstraint(Completeness(c,None))                                                               |Failure          |Value: 0.875 does not meet the constraint requirement!|
|Error Check  |Error      |Success     |ComplianceConstraint(Compliance(a is positive,COALESCE(CAST(a AS DECIMAL(20,10)), 1.0) > 0,None))          |Success          |                                                      |
|Error Check  |Error      |Success     |UniquenessConstraint(Uniqueness(List(b),None))                                                             |Success          |                                                      |
|Error Check  |Error      |Success     |ComplianceConstraint(Compliance(c contained in foo,bar,baz,`c` IS NULL OR `c` IN ('foo','bar','baz'),None))|Success          |                                                      |
+-------------+-----------+------------+-----------------------------------------------------------------------------------------------------------+-----------------+------------------------------------------------------+

VerificationSuiteとCheckの2種類のオブジェクトから構成され、Checkオブジェクトにメソッドチェーンの形でテストしたい項目を追加していき、addCheckでCheckオブジェクトをVerificationSuiteに登録、runでテストを実行できます。

Checkがテストする内容は、関数とカラム名によって決まります。 isComplete("a") であれば、カラムaの値がすべてNullでないことを検証してくれます。検証の結果は、結果が格納されたDataFrameのconstraint_statusカラムに格納され、成功であればSuccess、失敗であればFailureが入ります。

Checkオブジェクトには、WarningとErrorの2種類のCheckLevelと、そのCheckについての説明を指定することができます。同じCheckオブジェクトに紐付いたテストが1つでも失敗すると、結果のcheck_statusは、設定されているCheckLevelに応じた値が入るようになります。

Checkオブジェクト毎にどのようなテストを追加するかは後処理でどのようにしたいかによって分けるのが良いでしょう。ErrorLevelでオブジェクトを分ける、カラムごとにオブジェクトを分ける、テスト項目ごとにオブジェクトを分ける、などがパターンとして考えられます。

複数カラムの関係をテストする

Deequでは、特定カラムだけでなく、複数のカラムの組み合わせについてテストすることもできます。

下記のように、テスト関数の引数が異なるだけで、基本的な使い方は単一カラムのテストの場合と使い方は変わりません。

サンプルコード2

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pydeequ.checks import *
from pydeequ.verification import *

df = spark.createDataFrame(data=[
        (1, 10, "foo"), 
        (2, 20, "foo"), 
        (3, 30, "bar"), 
        (4, 40, "baz"), 
        (5, 50, "baz"), 
        (6, 60, "bar"), 
        (7, 70, None), 
        (8, 80, "bar"), 
], schema=StructType([
        StructField("a", IntegerType(), True),
        StructField("b", IntegerType(), True),
        StructField("c", StringType(), True),
]))

check_warning = Check(spark, CheckLevel.Warning, "Warning Check")

checkResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        check_warning.isLessThan("a", "b") \
        .hasCorrelation("a", "b", lambda x: x >= 1.0) \
        .hasUniqueness(["b", "c"], lambda x: x >= 1.0)) \
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show(truncate=False)

isLessThanは、文字通りの意味でaの値が対応するbの値よりも小さいことを確認するテストです。

hasCorrelationは、指定した2つのカラムの相関係数が、指定したassertion関数を満たすことを確認するテストです。

hasUniquenessは、hasCorrelationと同様で、指定したカラムすべてを考慮してユニークかどうかを判定し、その結果がassertion関数を満たすことを確認するテストです。

制約条件を満たさないことをテストする

has系のテスト関数は、上記で書いたようにassertion関数を満たすかどうかでテスト結果が決まります。

このテスト関数の性質を利用して、条件を満たさないことをテストすることも可能です。

サンプルコード3

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pydeequ.checks import *
from pydeequ.verification import *

df = spark.createDataFrame(data=[
        (1, 10, "foo"), 
        (2, 20, "foo"), 
        (3, 30, "bar"), 
        (4, 40, "baz"), 
        (5, 50, "baz"), 
        (6, 60, "bar"), 
        (7, 70, None), 
        (8, 80, "bar"), 
], schema=StructType([
        StructField("a", IntegerType(), True),
        StructField("b", IntegerType(), True),
        StructField("c", StringType(), True),
]))

check_warning = Check(spark, CheckLevel.Warning, "Warning Check")

checkResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        check_warning.containsEmail("c", lambda x: x == 0.0) \
        .hasMin("a", lambda x: x > 0)) \
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show(truncate=False)

hasMinは、指定したカラムの最小値が条件を満たすことを確認するテストです。0より小さくないことを確認する場合は、assertion関数で最小値が0より大きくなることを確認すればよいです。

containsEmailは、デフォルトでは指定したカラムのすべての値がEmailの形式を満たすかを確認するテスト関数です。assertion関数をしていした場合、Emailの値を含むカラムの割合を確認することができます。つまりこの値が0であることを確認すれば、指定したカラムにEmail形式の文字列を含まないことをテストすることができます。

カスタマイズした内容をテストする

satisfies関数を使う事により、任意のSQLを満たすかどうか、もしくは、レコードのうち何割が満たすかをテストすることができます。

サンプルコード4

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pydeequ.checks import *
from pydeequ.verification import *

df = spark.createDataFrame(data=[
        (1, 10, "foo"), 
        (2, 20, "foo"), 
        (3, 30, "bar"), 
        (4, 40, "baz"), 
        (5, 50, "baz"), 
        (6, 60, "bar"), 
        (7, 70, None), 
        (8, 80, "bar"), 
], schema=StructType([
        StructField("a", IntegerType(), True),
        StructField("b", IntegerType(), True),
        StructField("c", StringType(), True),
]))

check_warning = Check(spark, CheckLevel.Warning, "Warning Check")

checkResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        check_warning.satisfies("b % 10 = 0", "B is 10 dividable", lambda x: x == 1.0) \
        .satisfies("rlike(c, '[0-9]+?')", "C is not contained numeric", lambda x: x == 0.0) \
        .satisfies("b / a = 10", "b / a is 10", lambda x: x == 1.0)) \
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show(truncate=False)

satisfies関数の引数は、条件のSQL文、条件の説明文、条件を満たすレコードの割合のassertion関数の3つが必要です。

SQL文は、Sparkのfilter関数で使用できるSQLの条件文であれば何でも使用することができます。rlikeで正規表現でマッチさせたり、複数のカラム間の関係を条件にすることもできます。

説明文は、下記の結果の出力内に使用される文字列ですので、後で識別できるものであれば何でも大丈夫です。

assertion関数は、pythonのsatisfies関数の定義ではOptionalですが、指定していないとエラーになってしまいます。すべてのレコードを満たすことを確認する場合は lambda x: x == 1.0 を、満たさないことを確認する場合は lambda x: x == 0.0 を指定しておく必要があります。

結果4

+-------------+-----------+------------+-------------------------------------------------------------------------------------+-----------------+------------------+
|check        |check_level|check_status|constraint                                                                           |constraint_status|constraint_message|
+-------------+-----------+------------+-------------------------------------------------------------------------------------+-----------------+------------------+
|Warning Check|Warning    |Success     |ComplianceConstraint(Compliance(B is 10 dividable,b % 10 = 0,None))                  |Success          |                  |
|Warning Check|Warning    |Success     |ComplianceConstraint(Compliance(C is not contained numeric,rlike(c, '[0-9]+?'),None))|Success          |                  |
|Warning Check|Warning    |Success     |ComplianceConstraint(Compliance(b / a is 10,b / a = 10,None))                        |Success          |                  |
+-------------+-----------+------------+-------------------------------------------------------------------------------------+-----------------+------------------+

AnomalyDetectionの実行

基本的な使い方

AnomalyDetectionの基本的な実行は、PyDeequでは下記のように書くことで実現できます。

サンプルコード5

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pydeequ.analyzers import *
from pydeequ.anomaly_detection import *
from pydeequ.repository import *
from pydeequ.verification import *
import datetime

now = datetime.datetime.now().timestamp()
repo = InMemoryMetricsRepository(spark)
yesterdaysKey = ResultKey(spark, int(now * 1000) - 24 * 60 * 60 * 1000)

df_yesterday = spark.createDataFrame(data=[
        (1, 10, "foo"), 
        (2, 20, "foo"), 
        (3, 30, "bar"), 
        (4, 40, "baz"), 
        (5, 50, "baz"), 
        (6, 60, "bar"), 
        (7, 70, None), 
        (8, 80, "bar"), 
], schema=StructType([
        StructField("a", IntegerType(), True),
        StructField("b", IntegerType(), True),
        StructField("c", StringType(), True),
]))

checkResult = AnalysisRunner(spark) \
    .onData(df_yesterday) \
    .useRepository(repo) \
    .saveOrAppendResult(yesterdaysKey) \
    .addAnalyzer(Size()) \
    .addAnalyzer(Maximum("a")) \
    .run()

df = spark.createDataFrame(data=[
        (1, 10, "foo"), 
        (2, 20, "foo"), 
        (3, 30, "bar"), 
        (4, 40, "baz"), 
        (5, 50, "baz"), 
        (6, 60, "bar"), 
        (7, 70, None), 
        (8, 80, "bar"), 
        (9, 90, "bar"), 
        (10, 100, "bar"), 
], schema=StructType([
        StructField("a", IntegerType(), True),
        StructField("b", IntegerType(), True),
        StructField("c", StringType(), True),
]))

todaysKey = ResultKey(spark, int(now * 1000))
checkResult = VerificationSuite(spark) \
    .onData(df) \
    .useRepository(repo) \
    .saveOrAppendResult(todaysKey) \
    .addAnomalyCheck(AbsoluteChangeStrategy(-1.0, 1.0), Size()) \
    .addAnomalyCheck(AbsoluteChangeStrategy(-2.0, 2.0), Size()) \
    .addAnomalyCheck(RelativeRateOfChangeStrategy(0.9, 1.1), Maximum("a")) \
    .addAnomalyCheck(RelativeRateOfChangeStrategy(0.7, 1.3), Maximum("a")) \
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show(truncate=False)

repo.load() \
  .getSuccessMetricsAsDataFrame() \
  .show()

AnomalyDetectionでは、通常のテストと違い、Checkの代わりに、AnomalyDetectionの方法を指定するオブジェクト、Analyzerというテスト項目を指定するオブジェクト、Repositoryという今までのテスト項目の結果を保存してくれるオブジェクト、ResultKeyといういつの時間のデータを保存するかを指定するオブジェクトが必要になります。

基本的にはRepositoryのオブジェクトに保存されている直前(オブジェクトに保存されている順番で直前)のデータを各テスト項目について比較して結果を出力します。Repositoryオブジェクトへの結果の保存は、AnalyzerRunnerクラスを用いて、Analyzerによるテスト項目の計算のみを行いその結果を保存する方法と、VerificationSuiteでAnomalyDetectionも行いながら、テスト項目の計算結果を保存する2つのアプローチがあります。Repositoryオブジェクトが管理しているファイルやメモリ上のオブジェクトの結果は、保存するたびに結果が追記されていくため、VerificationSuiteでの保存は用途に使い分けるのが良さそうです。

AbsoluteChangeStrategyは、保存されている直前の値と比較して、変化量がmaxRateDecreaseとmaxRateIncreaseの範囲に収まっているかどうかを確認します。

RelativeRateOfChangeStrategyは、保存されている直前の値と比較して、変化率がmaxRateDecreaseとmaxRateIncreaseの範囲に収まっているかどうかを確認します。

結果5

+---------------------------------+-----------+------------+----------------------------------+-----------------+-----------------------------------------------------+
|check                            |check_level|check_status|constraint                        |constraint_status|constraint_message                                   |
+---------------------------------+-----------+------------+----------------------------------+-----------------+-----------------------------------------------------+
|Anomaly check for Size(None)     |Warning    |Warning     |AnomalyConstraint(Size(None))     |Failure          |Value: 10.0 does not meet the constraint requirement!|
|Anomaly check for Size(None)     |Warning    |Success     |AnomalyConstraint(Size(None))     |Success          |                                                     |
|Anomaly check for Maximum(a,None)|Warning    |Warning     |AnomalyConstraint(Maximum(a,None))|Failure          |Value: 10.0 does not meet the constraint requirement!|
|Anomaly check for Maximum(a,None)|Warning    |Success     |AnomalyConstraint(Maximum(a,None))|Success          |                                                     |
+---------------------------------+-----------+------------+----------------------------------+-----------------+-----------------------------------------------------+

テスト項目は、Analyzerクラスの種類と指定されているカラムの組み合わせで決まります。 サンプルコードでは、todaysKey に関連して実行されている addAnomalyCheck には Size()Maximum("a") が指定されているものが2つありますが、下記のようにレポジトリに保存されている内容は、1つのみとなります。

保存内容5

+-------+--------+-------+-----+-------------+
| entity|instance|   name|value| dataset_date|
+-------+--------+-------+-----+-------------+
|Dataset|       *|   Size|  8.0|1628216988556|
| Column|       a|Maximum|  8.0|1628216988556|
|Dataset|       *|   Size| 10.0|1628303388556|
| Column|       a|Maximum| 10.0|1628303388556|
+-------+--------+-------+-----+-------------+

傾向の変化を検出する

AnomalyDetectionでは、直前だけでなく過去の複数のデータを使って、テストを行うことができます。

サンプルコード6

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pydeequ.analyzers import *
from pydeequ.anomaly_detection import *
from pydeequ.repository import *
from pydeequ.verification import *
import datetime

now = datetime.datetime.now().timestamp()
repo = InMemoryMetricsRepository(spark)

for i in range(24):
  yesterdaysKey = ResultKey(spark, int(now * 1000) - (24 - i) * 60 * 60 * 1000)

  df_yesterday = spark.createDataFrame(data=[
          (1, 10, "foo"), 
          (2, 20, "foo"), 
          (3, 30, "bar"), 
          (4, 40, "baz"), 
          (5, 50, "baz"), 
          (6, 60, "bar"), 
          (7, 70, None), 
          (8, 80, "bar"), 
  ], schema=StructType([
          StructField("a", IntegerType(), True),
          StructField("b", IntegerType(), True),
          StructField("c", StringType(), True),
  ]))

  checkResult = AnalysisRunner(spark) \
      .onData(df_yesterday) \
      .useRepository(repo) \
      .saveOrAppendResult(yesterdaysKey) \
      .addAnalyzer(Mean("a")) \
      .addAnalyzer(Completeness("c")) \
      .run()

df = spark.createDataFrame(data=[
        (1, 10, "foo"), 
        (2, 20, "foo"), 
        (3, 30, "bar"), 
        (4, 40, "baz"), 
        (5, 50, None), 
        (6, 60, None), 
        (7, 70, None), 
        (8, 80, "bar"), 
        (9, 90, None), 
        (1000, 100, None), 
], schema=StructType([
        StructField("a", IntegerType(), True),
        StructField("b", IntegerType(), True),
        StructField("c", StringType(), True),
]))

todaysKey = ResultKey(spark, int(now * 1000))
checkResult = VerificationSuite(spark) \
    .onData(df) \
    .useRepository(repo) \
    .saveOrAppendResult(todaysKey) \
    .addAnomalyCheck(OnlineNormalStrategy(), Mean("a")) \
    .addAnomalyCheck(OnlineNormalStrategy(), Completeness("c")) \
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show(truncate=False)

repo.load() \
  .getSuccessMetricsAsDataFrame() \
  .show()

OnlineNormalStrategyは、Analyzerで指定されたテスト項目の過去の値の平均と標準偏差を計算し、今回のテスト項目の値が mean - lowerDeviationFactor *stdDev と mean + upperDeviationFactor * stDev の間に収まっているかどうかを確認します。

ignoreAnomalies で履歴データ内の外れ値を無視して平均と標準偏差を計算してくれることを期待しますが、現状のDeequ側の実装では残念ながらそれらは無視されず、平均と標準偏差の計算に考慮されてしまいます。また、ウィンドウサイズのようなものを指定することができないため、Repositoryに保存されているデータを渡す前に特定日以前は切っておくというような操作が必要になります。

結果6

+--------------------------------------+-----------+------------+---------------------------------------+-----------------+------------------------------------------------------+
|check                                 |check_level|check_status|constraint                             |constraint_status|constraint_message                                    |
+--------------------------------------+-----------+------------+---------------------------------------+-----------------+------------------------------------------------------+
|Anomaly check for Mean(a,None)        |Warning    |Warning     |AnomalyConstraint(Mean(a,None))        |Failure          |Value: 104.5 does not meet the constraint requirement!|
|Anomaly check for Completeness(c,None)|Warning    |Warning     |AnomalyConstraint(Completeness(c,None))|Failure          |Value: 0.5 does not meet the constraint requirement!  |
+--------------------------------------+-----------+------------+---------------------------------------+-----------------+------------------------------------------------------+

季節性を考慮した変化の検出する

履歴をもとに AnomalyDetection を行う場合は、周期性を考慮して行いたいケースが多くあります。Deequでは、週単位、年単位の周期を考慮してテストすることができます。

サンプルコード7

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pydeequ.analyzers import *
from pydeequ.anomaly_detection import *
from pydeequ.repository import *
from pydeequ.verification import *
import datetime

now = datetime.datetime.now().timestamp()
repo = InMemoryMetricsRepository(spark)

for k in range(2):
  for i in range(1, 7):
    yesterdaysKey = ResultKey(spark, int(now * 1000) - (k*7+i) * 24 * 60 * 60 * 1000)

    df_yesterday = spark.createDataFrame(data=[
            (1, 10, "foo"), 
            (2, 20, "foo"), 
            (3, 30, "bar"), 
            (4, 40, "baz"), 
            (5, 50, "baz"), 
            (6, 60, "bar"), 
            (7, 70, None), 
            (8, 80, "bar"), 
    ], schema=StructType([
            StructField("a", IntegerType(), True),
            StructField("b", IntegerType(), True),
            StructField("c", StringType(), True),
    ]))

    checkResult = AnalysisRunner(spark) \
        .onData(df_yesterday) \
        .useRepository(repo) \
        .saveOrAppendResult(yesterdaysKey) \
        .addAnalyzer(Mean("a")) \
        .addAnalyzer(Completeness("c")) \
        .run()
  yesterdaysKey = ResultKey(spark, int(now * 1000) - (k*7+7) * 24 * 60 * 60 * 1000)

  df_yesterday = spark.createDataFrame(data=[
          (1, 10, "foo"), 
          (2, 20, "foo"), 
          (3, 30, "bar"), 
          (4, 40, "baz"), 
          (5, 50, None), 
          (6, 60, None), 
          (7, 70, None), 
          (8, 80, None), 
  ], schema=StructType([
          StructField("a", IntegerType(), True),
          StructField("b", IntegerType(), True),
          StructField("c", StringType(), True),
  ]))

  checkResult = AnalysisRunner(spark) \
      .onData(df_yesterday) \
      .useRepository(repo) \
      .saveOrAppendResult(yesterdaysKey) \
      .addAnalyzer(Mean("a")) \
      .addAnalyzer(Completeness("c")) \
      .run()

df = spark.createDataFrame(data=[
        (1, 10, "foo"), 
        (2, 20, "foo"), 
        (3, 30, "bar"), 
        (4, 40, "baz"), 
        (5, 50, None), 
        (6, 60, None), 
        (7, 70, None), 
        (8, 80, "bar"), 
        (9, 90, None), 
        (1000, 100, None), 
], schema=StructType([
        StructField("a", IntegerType(), True),
        StructField("b", IntegerType(), True),
        StructField("c", StringType(), True),
]))

todaysKey = ResultKey(spark, int(now * 1000))
checkResult = VerificationSuite(spark) \
    .onData(df) \
    .useRepository(repo) \
    .saveOrAppendResult(todaysKey) \
    .addAnomalyCheck(HoltWinters(MetricInterval.Daily, SeriesSeasonality.Weekly), Mean("a")) \
    .addAnomalyCheck(HoltWinters(MetricInterval.Daily, SeriesSeasonality.Weekly), Completeness("c")) \
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show(truncate=False)

repo.load() \
  .getSuccessMetricsAsDataFrame() \
  .show()

HoltWintersは、データの頻度と周期の2つを指定することで、該当のテスト項目が履歴データから外れたものでないかを確認することができます。

結果7

+--------------------------------------+-----------+------------+---------------------------------------+-----------------+------------------------------------------------------+
|check                                 |check_level|check_status|constraint                             |constraint_status|constraint_message                                    |
+--------------------------------------+-----------+------------+---------------------------------------+-----------------+------------------------------------------------------+
|Anomaly check for Mean(a,None)        |Warning    |Warning     |AnomalyConstraint(Mean(a,None))        |Failure          |Value: 104.5 does not meet the constraint requirement!|
|Anomaly check for Completeness(c,None)|Warning    |Success     |AnomalyConstraint(Completeness(c,None))|Success          |                                                      |
+--------------------------------------+-----------+------------+---------------------------------------+-----------------+------------------------------------------------------+

Repositoryの格納内容

Repositoryには下記のように、すべての時間のすべてのAnalyzerのテスト結果が1つのファイル内(メモリだと1つのオブジェクト)に格納されます。

保存内容

[
  {
    "resultKey": {
      "dataSetDate": 1628402595296,
      "tags": {}
    },
    "analyzerContext": {
      "metricMap": [
        {
          "analyzer": {
            "analyzerName": "Size"
          },
          "metric": {
            "metricName": "DoubleMetric",
            "entity": "Dataset",
            "instance": "*",
            "name": "Size",
            "value": 8.0
          }
        },
        {
          "analyzer": {
            "analyzerName": "Maximum",
            "column": "a"
          },
          "metric": {
            "metricName": "DoubleMetric",
            "entity": "Column",
            "instance": "a",
            "name": "Maximum",
            "value": 8.0
          }
        }
      ]
    }
  },
  {
    "resultKey": {
      "dataSetDate": 1628316195296,
      "tags": {}
    },
    "analyzerContext": {
      "metricMap": [
        {
          "analyzer": {
            "analyzerName": "Size"
          },
          "metric": {
            "metricName": "DoubleMetric",
            "entity": "Dataset",
            "instance": "*",
            "name": "Size",
            "value": 10.0
          }
        },
        {
          "analyzer": {
            "analyzerName": "Maximum",
            "column": "a"
          },
          "metric": {
            "metricName": "DoubleMetric",
            "entity": "Column",
            "instance": "a",
            "name": "Maximum",
            "value": 10.0
          }
        }
      ]
    }
  },
]

その他の機能

データのプロファイリング

Profilerを用いることで、どのようなデータが格納されるカラムかを簡単に確認することができます。出力された値をもとにどのようなテスト項目があると良さそうかを、自動もしくは手動で判定するのに使うことが主な用途として想定されます。

サンプルコード8

from pydeequ.profiles import *

df = spark.createDataFrame(data=[
        (1, 10, "foo"), 
        (2, 20, "foo"), 
        (3, 30, "bar"), 
        (4, 40, "baz"), 
        (5, 50, "baz"), 
        (6, 60, "bar"), 
        (7, 70, None), 
        (8, 80, "bar"), 
        (9, 90, "bar"), 
        (10, 100, "bar"), 
], schema=StructType([
        StructField("a", IntegerType(), True),
        StructField("b", IntegerType(), True),
        StructField("c", StringType(), True),
]))

result = ColumnProfilerRunner(spark) \
    .onData(df) \
    .run()

for col, profile in result.profiles.items():
    print(profile)

取得できる主な項目は、completeness、approximateNumDistinctValues、dataType、histogram、meanなどの統計量です。

completenessはNullでない値の割合、approximateNumDistinctValuesは推定した値の種類、dataTypeはデータの型、histogramはどの値がどれだけ(数と割合)含まれているかの配列、統計量は数字カラムのみでそのカラム全体の平均や最大値などを返してくれます。

結果8

NumericProfiles for column: a: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 10,
    "dataType": "Integral",
    "isDataTypeInferred": false,
    "typeCounts": {},
    "histogram": [
        [
            "8",
            1,
            0.1
        ],
        [
            "4",
            1,
            0.1
        ],
        [
            "9",
            1,
            0.1
        ],
        [
            "5",
            1,
            0.1
        ],
        [
            "10",
            1,
            0.1
        ],
        [
            "6",
            1,
            0.1
        ],
        [
            "1",
            1,
            0.1
        ],
        [
            "2",
            1,
            0.1
        ],
        [
            "7",
            1,
            0.1
        ],
        [
            "3",
            1,
            0.1
        ]
    ],
    "kll": "None",
    "mean": 5.5,
    "maximum": 10.0,
    "minimum": 1.0,
    "sum": 55.0,
    "stdDev": 2.8722813232690143,
    "approxPercentiles": []
}
NumericProfiles for column: b: {
    "completeness": 1.0,
    "approximateNumDistinctValues": 10,
    "dataType": "Integral",
    "isDataTypeInferred": false,
    "typeCounts": {},
    "histogram": [
        [
            "100",
            1,
            0.1
        ],
        [
            "40",
            1,
            0.1
        ],
        [
            "90",
            1,
            0.1
        ],
        [
            "50",
            1,
            0.1
        ],
        [
            "10",
            1,
            0.1
        ],
        [
            "80",
            1,
            0.1
        ],
        [
            "60",
            1,
            0.1
        ],
        [
            "20",
            1,
            0.1
        ],
        [
            "70",
            1,
            0.1
        ],
        [
            "30",
            1,
            0.1
        ]
    ],
    "kll": "None",
    "mean": 55.0,
    "maximum": 100.0,
    "minimum": 10.0,
    "sum": 550.0,
    "stdDev": 28.722813232690143,
    "approxPercentiles": []
}
StandardProfiles for column: c: {
    "completeness": 0.9,
    "approximateNumDistinctValues": 3,
    "dataType": "String",
    "isDataTypeInferred": false,
    "typeCounts": {
        "Boolean": 0,
        "Fractional": 0,
        "Integral": 0,
        "Unknown": 1,
        "String": 9
    },
    "histogram": [
        [
            "bar",
            5,
            0.5
        ],
        [
            "baz",
            2,
            0.2
        ],
        [
            "foo",
            2,
            0.2
        ],
        [
            "NullValue",
            1,
            0.1
        ]
    ]
}

テスト項目のレコメンデーション

ConstraintSuggestionは、Profilerからさらに進んで、Checkオブジェクトにどのようなテスト項目を追加したほうが良さそうかを、レコメンドしてくれます。

サンプルコード9

from pydeequ.suggestions import *

df = spark.createDataFrame(data=[
        (1, 10, "foo"), 
        (2, 20, "foo"), 
        (3, 30, "bar"), 
        (4, 40, "baz"), 
        (5, 50, "baz"), 
        (6, 60, "bar"), 
        (7, 70, None), 
        (8, 80, "bar"), 
        (9, 90, "bar"), 
        (10, 100, "bar"), 
], schema=StructType([
        StructField("a", IntegerType(), True),
        StructField("b", IntegerType(), True),
        StructField("c", StringType(), True),
]))

suggestionResult = ConstraintSuggestionRunner(spark) \
             .onData(df) \
             .addConstraintRule(DEFAULT()) \
             .run()

for item in suggestionResult['constraint_suggestions']:
  print(item)
  print()

レコメンドされる対象のルールは、addConstraintRuleで指定する事ができ、DEFAULTではsuggestionsモジュール配下のすべてのルールが含まれます。含まれるルールには、Nullでないかを確認するテストを追加すべきか判定する CompleteIfCompleteRule 、負の値が含まれていないかを確認するテストを追加すべきかを判定する NonNegativeNumbersRule などがあります。

constraint_suggestions の各結果には、constraint_name、column_name、current_value、description、suggesting_rule、rule_description、code_for_constraintの各値が含まれます。利用上もっとも重要なのが code_for_constraint で、Checkオブジェクトに該当のテスト項目を追加するための実装がそのまま記載されています。

結果9

{'constraint_name': 'CompletenessConstraint(Completeness(b,None))', 'column_name': 'b', 'current_value': 'Completeness: 1.0', 'description': "'b' is not null", 'suggesting_rule': 'CompleteIfCompleteRule()', 'rule_description': 'If a column is complete in the sample, we suggest a NOT NULL constraint', 'code_for_constraint': '.isComplete("b")'}

{'constraint_name': "ComplianceConstraint(Compliance('b' has no negative values,b >= 0,None))", 'column_name': 'b', 'current_value': 'Minimum: 10.0', 'description': "'b' has no negative values", 'suggesting_rule': 'NonNegativeNumbersRule()', 'rule_description': 'If we see only non-negative numbers in a column, we suggest a corresponding constraint', 'code_for_constraint': '.isNonNegative("b")'}

{'constraint_name': 'UniquenessConstraint(Uniqueness(List(b),None))', 'column_name': 'b', 'current_value': 'ApproxDistinctness: 1.0', 'description': "'b' is unique", 'suggesting_rule': 'UniqueIfApproximatelyUniqueRule()', 'rule_description': 'If the ratio of approximate num distinct values in a column is close to the number of records (within the error of the HLL sketch), we suggest a UNIQUE constraint', 'code_for_constraint': '.isUnique("b")'}

{'constraint_name': 'CompletenessConstraint(Completeness(a,None))', 'column_name': 'a', 'current_value': 'Completeness: 1.0', 'description': "'a' is not null", 'suggesting_rule': 'CompleteIfCompleteRule()', 'rule_description': 'If a column is complete in the sample, we suggest a NOT NULL constraint', 'code_for_constraint': '.isComplete("a")'}

{'constraint_name': "ComplianceConstraint(Compliance('a' has no negative values,a >= 0,None))", 'column_name': 'a', 'current_value': 'Minimum: 1.0', 'description': "'a' has no negative values", 'suggesting_rule': 'NonNegativeNumbersRule()', 'rule_description': 'If we see only non-negative numbers in a column, we suggest a corresponding constraint', 'code_for_constraint': '.isNonNegative("a")'}

{'constraint_name': 'UniquenessConstraint(Uniqueness(List(a),None))', 'column_name': 'a', 'current_value': 'ApproxDistinctness: 1.0', 'description': "'a' is unique", 'suggesting_rule': 'UniqueIfApproximatelyUniqueRule()', 'rule_description': 'If the ratio of approximate num distinct values in a column is close to the number of records (within the error of the HLL sketch), we suggest a UNIQUE constraint', 'code_for_constraint': '.isUnique("a")'}

{'constraint_name': "ComplianceConstraint(Compliance('c' has value range 'bar', 'baz', 'foo' for at least 99.0% of values,`c` IN ('bar', 'baz', 'foo'),None))", 'column_name': 'c', 'current_value': 'Compliance: 0.9999999999999999', 'description': "'c' has value range 'bar', 'baz', 'foo' for at least 99.0% of values", 'suggesting_rule': 'FractionalCategoricalRangeRule(0.9)', 'rule_description': 'If we see a categorical range for most values in a column, we suggest an IS IN (...) constraint that should hold for most values', 'code_for_constraint': '.isContainedIn("c", ["bar", "baz", "foo"], lambda x: x >= 0.99, "It should be above 0.99!")'}

{'constraint_name': 'CompletenessConstraint(Completeness(c,None))', 'column_name': 'c', 'current_value': 'Completeness: 0.9', 'description': "'c' has less than 29% missing values", 'suggesting_rule': 'RetainCompletenessRule()', 'rule_description': 'If a column is incomplete in the sample, we model its completeness as a binomial variable, estimate a confidence interval and use this to define a lower bound for the completeness', 'code_for_constraint': '.hasCompleteness("c", lambda x: x >= 0.71, "It should be above 0.71!")'}