数据中台之数据血缘的具体实现
作者:互联网
目录
前言
前置知识
Hive Hook
Neo4j基础入门
数据血缘数据结构设计
数据录入测试代码
主逻辑实现
参考文章
前言
前置知识
Hive Hook
其实Apache Atlas对于Hive的元数据管理,使用的是Hive的Hooks
Pre-semantic-analyzer hooks
:在Hive在查询字符串上运行语义分析器之前调用。
Post-semantic-analyzer hooks
:在Hive在查询字符串上运行语义分析器之后调用。
Pre-driver-run hooks
:在driver执行查询之前调用。
Post-driver-run hooks
:在driver执行查询之后调用。
Pre-execution hooks
:在执行引擎执行查询之前调用。请注意,这个目的是此时已经为Hive准备了一个优化的查询计划。
Post-execution hooks
:在查询执行完成之后以及将结果返回给用户之前调用。
Failure-execution hooks
:当查询执行失败时调用。
下面看下我们将会用到的核心类
ExecuteWithHookContext可以实现3种类型的hook,分别是
pre-execution 在执行引擎执行查询之前调用
post-execution 在执行引擎执行查询之后调用
execution-failure 在执行引擎执行查询失败之后调用
public class MyExecuteWithHookContext implements ExecuteWithHookContext {
public void run(HookContext hookContext) throws Exception {
if(hookContext.getHookType()==HookContext.HookType.PRE_EXEC_HOOK){
}else if(hookContext.getHookType()==HookContext.HookType.POST_EXEC_HOOK){
}else if(hookContext.getHookType()==HookContext.HookType.ON_FAILURE_HOOK){
}
}
}
Neo4j基础入门
Neo4j
是一个高性能的,NOSQL
图形数据库,它将结构化数据存储在网络上而不是表中。它是一个嵌入式的、基于磁盘的、具备完全的事务特性的Java
持久化引擎,但是它将结构化数据存储在网络(从数学角度叫做图)上而不是表中。Neo4j
也可以被看作是一个高性能的图引擎,该引擎具有成熟数据库的所有特性。程序员工作在一个面向对象的、灵活的网络结构下而不是严格、静态的表中——但是他们可以享受到具备完全的事务特性、企业级的数据库的所有好处。
下面介绍一些基础概念名词(见图1)
1. 标签(Label)
在Neo4j
中,一个节点可以有一个以上的标签,从现实世界的角度去看,一个标签可以认为节点的某个类别,比如BOOK
、MOVIE
等等。
2. 节点(Node)
节点是指一个实实在在的对象,这个对象可以有好多的标签,表示对象的种类,也可以有好多的属性,描述其特征,节点与节点之间还可以形成多个有方向(或者没有方向)的关系。
3. 关系(Relationship)
用来描述节点与节点之间的关系,关系可以拥有属性。
4. 属性(Property)
描述节点的特性,采用的是Key-Value
结构,可以随意设定来描述节点的特征。
常用的查询语法(CQL)
1.查找指定节点、指定属性、指定关系的节点、关系
# MATCH 匹配命令
# return 后面的别名p还可以利用as 设置指定的返回值名称,如 p as userName
match (p:PERSON {name:"Mask"})-[r]-(n) return p,r,n
2. 对查找结果进行排序order by,并限制返回条数 limit
order by关键字与SQL里面是一样的操作,后面跟上需要根据排序的关键字,limit的操作是指定输出前几条
# 这里利用order by来指定返回按照Person.name来排序
# limit 表示只返回前3条数据
match(p:Person) return p order by p.name limit 3
3.删除节点delete命令
# 删除指定条件的节点 # 先通过匹配关键字match找到匹配元素,然后通过delete关键字指定删除
match(p:PERSON {name:"teacher_wange"}) delete p
# 删除节点和节点相关的关系
match (p:Person {name:"lisi"})-[r]-() delete p,r
数据血缘数据结构设计
neo4j
import org.neo4j.ogm.annotation.*;
@RelationshipEntity(type = "blood")
public class Blood {
public Blood() {// 从 Neo4j API 2.0.5开始需要无参构造函数
}
@Id
@GeneratedValue
private Long id;
private String name;
private String fromKey;
private String toKey;
private String fromTable;
private String toTable;
@StartNode // 关系开始的节点
private TableNode formNode;
@EndNode // 关系目标的节点
private TableNode toNode;
public Blood(String fromKey, String toKey, String fromTable, String toTable) {
this.fromKey = fromKey;
this.toKey = toKey;
this.fromTable = fromTable;
this.toTable = toTable;
this.name=fromTable+"->"+toTable;
}
//get and set
...
}
import org.neo4j.ogm.annotation.GeneratedValue;
import org.neo4j.ogm.annotation.Id;
import org.neo4j.ogm.annotation.NodeEntity;
import java.io.Serializable;
@NodeEntity
public class TableNode implements Serializable {
@Id
@GeneratedValue
private Long id;
private String tableName;
public TableNode( ) {// 从 Neo4j API 2.0.5开始需要无参构造函数
}
//get and set
...
}
mysql
public class DataBlood {
private Long id;
private String fromTable;
private String toTable;
private String fromKey;
private String toKey;
private String sqls;
//get and set
...
}
数据录入测试代码
<properties>
<ogm.properties>ogm-bolt.properties</ogm.properties>
</properties>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.46</version>
</dependency>
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>3.5.3</version>
</dependency>
<!-- Neo4j OGM -->
<dependency>
<groupId>org.neo4j</groupId>
<artifactId>neo4j-ogm-core</artifactId>
<version>${neo4j.ogm.version}</version>
</dependency>
<dependency>
<groupId>org.neo4j</groupId>
<artifactId>neo4j-ogm-api</artifactId>
<version>${neo4j.ogm.version}</version>
</dependency>
<dependency>
<groupId>org.neo4j</groupId>
<artifactId>neo4j-ogm-bolt-driver</artifactId>
<version>${neo4j.ogm.version}</version>
</dependency>
<dependency>
<groupId>org.neo4j</groupId>
<artifactId>neo4j-ogm-test</artifactId>
<version>${neo4j.ogm.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.neo4j</groupId>
<artifactId>neo4j-security-enterprise</artifactId>
</exclusion>
</exclusions>
</dependency>
import com.test.mapper.DataBloodMapper;
import com.test.po.Blood;
import com.test.po.DataBlood;
import com.test.po.TableNode;
import com.test.util.JsonUtil;
import org.apache.ibatis.io.Resources;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.SqlSessionFactoryBuilder;
import org.junit.Before;
import org.junit.Test;
import org.neo4j.ogm.config.*;
import org.neo4j.ogm.session.Session;
import org.neo4j.ogm.session.SessionFactory;
import java.io.IOException;
import java.io.Reader;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class HiveHookTest{
private Session neo4jSession;
private static SqlSession sqlSession;
@Before
public void init() throws IOException {
neo4j/
ConfigurationSource configurationSource = new ClasspathConfigurationSource("application.properties");
Properties properties = configurationSource.properties();
String uri = properties.get("spring.data.neo4j.uri") == null ? null : properties.get("spring.data.neo4j.uri").toString();
String username = properties.get("spring.data.neo4j.username") == null ? null : properties.get("spring.data.neo4j.username").toString();
String password = properties.get("spring.data.neo4j.password") == null ? null : properties.get("spring.data.neo4j.password").toString();
Configuration config = new Configuration.Builder()
.uri(uri)
.credentials(username, password)
.build();
SessionFactory sessionFactory = new SessionFactory(config, "com.dataqin.po");
neo4jSession = sessionFactory.openSession();
mybatis/
Reader reader = null;
reader = Resources.getResourceAsReader("mybatis-config.xml");
SqlSessionFactory sqlSessionFactory = new SqlSessionFactoryBuilder().build(reader);
sqlSession = sqlSessionFactory.openSession();
}
@Test
public void testSaveTableNode() throws Exception {
TableNode rootTable = new TableNode("rootTable");
TableNode tablename1 = new TableNode("tablename1");
TableNode tablename2 = new TableNode("tablename2");
TableNode tablename3 = new TableNode("tablename3");
neo4jSession.save(rootTable);
neo4jSession.save(tablename1);
neo4jSession.save(tablename2);
neo4jSession.save(tablename3);
Blood c1 = new Blood("test.rootTable", "test.tablename1", "rootTable", "tablename1");
c1.setFormNode(rootTable);
c1.setToNode(tablename1);
Blood c2 = new Blood("test.rootTable", "test.tablename2", "rootTable", "tablename2");
c2.setFormNode(rootTable);
c2.setToNode(tablename2);
Blood c3 = new Blood("test.tablename2", "test.tablename3", "tablename2", "tablename3");
c3.setFormNode(tablename2);
c3.setToNode(tablename3);
neo4jSession.save(c1);
neo4jSession.save(c2);
neo4jSession.save(c3);
}
@Test
public void saveDataBlood() {
DataBlood dataBlood = new DataBlood();
dataBlood.setFromKey("test.rootTable");
dataBlood.setToKey("test.tablename1");
dataBlood.setFromTable("rootTable");
dataBlood.setToTable("tablename1");
dataBlood.setToKey("rootTable");
dataBlood.setSqls("select * from a");
// 获取UserMapper接口
try {
DataBloodMapper dataBloodMapper = sqlSession.getMapper(DataBloodMapper.class);
System.out.println(dataBloodMapper.save(dataBlood));
sqlSession.commit();
} finally {
// 关闭SqlSession
sqlSession.close();
}
}
}
执行上述的testSaveTableNode后neo4j生成如下图
图1主逻辑实现
import com.dataqin.util.Constants;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.session.SessionState;
public class MyExecuteWithHookContext implements ExecuteWithHookContext {
/**
* ExecuteWithHookContext可以实现3种类型的hook,分别是
* pre-execution 在执行引擎执行查询之前调用
* post-execution 在执行引擎执行查询之后调用
* execution-failure 在执行引擎执行查询失败之后调用
* @param hookContext
* @throws Exception
*/
public void run(HookContext hookContext) throws Exception {
if(hookContext.getHookType()==HookContext.HookType.POST_EXEC_HOOK){
//采集sql的主逻辑
System.out.println("***Hello from the hook !!***");
String user = SessionState.getUserFromAuthenticator();
System.out.println(user);
SessionState session = SessionState.get();
System.out.println("CMD "+session.getCmd());
if(StringUtils.isNotEmpty(session.getCmd())){
//只有同时包含insert和select的sql语句需要采集
session.setCmd(session.getCmd().toLowerCase());
if(session.getCmd().contains(Constants.INSERT) && session.getCmd().contains(Constants.SELECT)){
//发送kafka消息通知消费端把数据入库到neo4j和mysql
}
}
}
}
}
在hive配置文件中增加如下配置
<property>
<name>hive.exec.post.hooks</name>
<value>org.apache.atlas.hive.hook.HiveHook<value/>
</property>
1.通过Hook监听Hive的各种事件,比如创建表,修改表等,然后按同时包含insert和select的sql语句的数据推送到Kafka(防止数据产生过多未及时入库而造成的数据丢失)把上述代码打包丢入hive的lib包下就会在执行hive相关的产生数据血缘的语句时发送kafka消息。
2.写个项目专门消费kafka消息并存储数据血缘到neo4j和mysql中。
3.前端传入表名时后端调用如下代码查询neo4j获取血缘数据再按照和前端约定好的数据格式进行拼装返回给前端
@Test
public void findBloods() {
Map param = new HashMap<String, String>();
param.put("tableName", "rootTable");
Iterable<Blood> cl2 = neo4jSession.query(Blood.class, "match (x)-[r]-(y) where r.fromTable={tableName} return x,r,y", param);
for (Blood cl : cl2) {
System.out.println(JsonUtil.toJson(cl));
}
}
标签:String,import,private,ogm,中台,org,neo4j,血缘,数据 来源: https://blog.51cto.com/u_13270164/2997908