其他分享
首页 > 其他分享> > griffin measures【griffin官方文档】

griffin measures【griffin官方文档】

作者:互联网

目录

注1:0.5.0版本

注2:来自官方原文档,方便查阅。做了一些标记,突出重点。

Measures

measures to calculate data quality metrics.

Accuracy measure

accuracy measure is to compare source and target content, given corresponding mapping relationship.

Introduction

How to measure accuracy dimension of one target dataset T, given source of truth as golden dataset S.

To measure accuracy quality of target dataset T, basic approach is to calculate discrepancy between target and source datasets by going through their contents,
examining whether all fields are exactly matched as below,

                Count(source.field1 == target.field1 && source.field2 == target.field2 && ...source.fieldN == target.fieldN)
Accuracy  =     ---------------------------------------------------------------------------------------------------------------
                Count(source)

Since two datasets are too big to fit in one box, so our approach is to leverage map reduce programming model by distributed computing.

The real challenge is how to make this comparing algorithm generic enough to release data analysts and data scientists from coding burdens, and at the same time, it keeps flexibility to cover most of accuracy requirements.

Traditional way is to use SQL based join to calculate this, like scripts in hive.

But this SQL based solution can be improved since it has not considered unique natures of source dataset and target dataset in this context.

Our approach is to provide a generic accuracy measure, after taking into consideration of special natures of source dataset and target dataset.

Our implementation is in scala, leveraging scala's declarative capability to cater for various requirements, and running in spark cluster.

To make it concrete, schema for Source is as below

|-- uid: string (nullable = true)
|-- site_id: string (nullable = true)
|-- page_id: string (nullable = true)
|-- curprice: string (nullable = true)
|-- itm: string (nullable = true)
|-- itmcond: string (nullable = true)
|-- itmtitle: string (nullable = true)
|-- l1: string (nullable = true)
|-- l2: string (nullable = true)
|-- leaf: string (nullable = true)
|-- meta: string (nullable = true)
|-- st: string (nullable = true)
|-- dc: string (nullable = true)
|-- tr: string (nullable = true)
|-- eventtimestamp: string (nullable = true)
|-- cln: string (nullable = true)
|-- siid: string (nullable = true)
|-- ciid: string (nullable = true)
|-- sellerid: string (nullable = true)
|-- pri: string (nullable = true)
|-- pt: string (nullable = true)
|-- dt: string (nullable = true)
|-- hour: string (nullable = true)

and schema for target is below as

|-- uid: string (nullable = true)
|-- page_id: string (nullable = true)
|-- site_id: string (nullable = true)
|-- js_ev_mak: string (nullable = true)
|-- js_ev_orgn: string (nullable = true)
|-- curprice: string (nullable = true)
|-- itm: string (nullable = true)
|-- itmcond: string (nullable = true)
|-- itmtitle: string (nullable = true)
|-- l1: string (nullable = true)
|-- l2: string (nullable = true)
|-- leaf: string (nullable = true)
|-- meta: string (nullable = true)
|-- st: string (nullable = true)
|-- dc: string (nullable = true)
|-- tr: string (nullable = true)
|-- eventtimestamp: string (nullable = true)
|-- cln: string (nullable = true)
|-- siid: string (nullable = true)
|-- ciid: string (nullable = true)
|-- sellerid: string (nullable = true)
|-- product_ref_id: string (nullable = true)
|-- product_type: string (nullable = true)
|-- is_bu: string (nullable = true)
|-- is_udid: string (nullable = true)
|-- is_userid: string (nullable = true)
|-- is_cguid: string (nullable = true)
|-- dt: string (nullable = true)
|-- hour: string (nullable = true)

Accuracy Measure In Deep

【1.把原始数据转换成键值对格式 2.将用户定义的规则转换成 scala 程序,进行处理】

Pre-Process phase (transform raw data)

For efficient, we will convert our raw record to some key-value pair , after that, we just need to compare values which have the same key.

Since two dataset might have different names for the same field, and fields might come in different order, we will keep original information in associative map for later process.

The records will look like,

((uid,eventtimestamp)->(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...)

and to track where are the data from, we add one labeling tag here.

for source dataset, we add label tag "__source__" and for target dataset, we add label tag "__target__".

((uid,eventtimestamp)->("__source__",(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...)))
((uid,eventtimestamp)->("__target__",(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...)))

Ideally, in dataset, applying those composite keys, we should be able to get unique records for every composite key.

but the reality is , for various unknown reasons, dataset might have duplicate records given one unique composite key.
To cover this problem, and to track all records from source node, we will append all duplicate records in a list during this step.

The record will look like after pre process ,

((uid,eventtimestamp)->List(("__source__",(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...)),...,("__source__",(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...))))

To save all records from target node, we will insert all records in a set during this step.

The record will look like after pre process ,

((uid,eventtimestamp)->Set(("__target__",(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...)),...,("__target__",(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...))))

【List:有序的,可重复的;Set:是不可重复的,无序的】

Aggregate and Comparing phase

Union source and target together, execute one aggregate for all, we can apply rules defined by users to check whether records in source and target are matched or not.

aggregate { (List(sources),Set(targets)) =>
 if(foreach element from List(sources) in Set(targets)) emit true
 else emit false
}

We can also execute one aggregate to count the mismatch records in source

aggregate (missedCount = 0) { (List(sources), Set(targets)) =>
 foreach (element in List(sources)) {
  if (element in Set(targets)) continue
  else missedCount += 1
 }
}

Benefits

Further discussion

标签:griffin,string,nullable,--,measures,value,source,文档,true
来源: https://www.cnblogs.com/wasaier/p/15751876.html