hive导致表锁的SQL和租户信息获取
作者:互联网
hive导致表锁的SQL和租户信息获取
在多租户平台中,同一张表可能有多个租户具有访问权限,hive提供锁机制来保障数据的安全。但如果某个租户占用锁的时间较长,其他租户的作业就会滞后。如果能截取到导致表锁的SQL和租户信息,就可以提供给局方进行业务流程的优化,加快作业执行效率。本文档记录一个可实现的方法,供参考。
获取表锁信息
场景:hive使用分布式协调服务(Zookeeper)提供的分布式锁,hive底层由mapreduce执行,yarn统一资源调度,本文档由python语言实现
关键代码
# xxxx
# 2021/11/4 8:54
import configparser
import os
import time
from kazoo.client import KazooClient
import ConnMysqlTmpl
'''
#获取zookeeper集群信息
ZookeeperInfo.ini文件为:
[zookeeper-conn]
url=zk_host_name:port
'''
def zookeeper_info():
    """Read the ZooKeeper connection settings from ``ZookeeperInfo.ini``.

    The file is looked up at ``<parent of the current working
    directory>/config/ZookeeperInfo.ini``, so the result depends on where
    the process was started from, not on where this file lives.

    Returns:
        list: The value of every option visible in the ``[zookeeper-conn]``
        section, in the order ``configparser`` reports them.

    Raises:
        configparser.NoSectionError: If the file cannot be read or has no
            ``[zookeeper-conn]`` section (``ConfigParser.read`` silently
            skips files it cannot open).
    """
    parser = configparser.ConfigParser()
    base_dir = os.path.dirname(os.path.abspath('.'))
    parser.read(base_dir + "/config/ZookeeperInfo.ini")
    # Only the values are needed; callers index the result positionally.
    return [value for _, value in parser.items("zookeeper-conn")]
'''
#定义一个zookeeper类,实现zk集群的链接和关闭
'''
class Zookeeper:
    """Small wrapper that opens and closes a kazoo ``KazooClient`` session."""

    # Class-level default so conn_close() is safe even when connection()
    # was never called or did not succeed.
    conn_path = None

    def connection(self, zk_url):
        """Open a ZooKeeper session.

        Args:
            zk_url: Host string handed to ``KazooClient``.

        Returns:
            The started ``KazooClient``, or ``None`` when connecting failed
            (the error is printed, not raised).
        """
        self.zk_url = zk_url
        try:
            client = KazooClient(self.zk_url)
            client.start()
        except Exception as e:
            self.conn_path = None
            # The original message said "SSH", but this is a ZooKeeper failure.
            print(f"Zookeeper连接异常, 错误如下: {e}")
            return None
        self.conn_path = client
        return client

    def conn_close(self):
        """Stop the session opened by ``connection``; do nothing if none is open."""
        client = self.conn_path
        if client is None:
            return
        self.conn_path = None
        client.stop()
        # close() frees the resources a stopped client still holds.
        client.close()
        print("已关闭Zookeeper连接...")
'''
#获取需要监控的表
HiveMonLockInfo文件格式随意,只要在zk中拿到需要监控的表路径即可
'''
def monitor_info():
    """Load the list of monitored tables from ``datainfo/HiveMonLockInfo``.

    The path is relative to the current working directory and the file is
    read as UTF-8.

    Returns:
        list: The raw lines of the file, trailing newlines included. An
        empty list is returned (after printing a notice) when the file does
        not exist, so callers can always iterate over the result.
    """
    filename = 'datainfo/HiveMonLockInfo'
    try:
        with open(filename, 'r', encoding='utf8') as rfile:
            return rfile.readlines()
    except FileNotFoundError:
        print("未找到文件:【 {0} 】信息,请联系后台管理员处理".format(filename))
        # Previously this path fell through and returned None, which made
        # the caller's ``for`` loop raise TypeError.
        return []
'''
#获取表锁列表
'''
def get_lock():
    """Collect the ZooKeeper paths of the lock nodes under every monitored table.

    Each entry returned by ``monitor_info()`` is appended to
    ``/hive_zookeeper_namespace/`` to form a table node; every child of that
    node whose name contains ``LOCK`` is reported.

    Returns:
        list: Full ZooKeeper paths of the lock nodes found. Empty when the
        connection could not be opened or nothing is locked.
    """
    # Local import: the race between listing and deletion is expected here.
    from kazoo.exceptions import NoNodeError

    zk = Zookeeper()
    zk_conn = zk.connection(zookeeper_info()[0])
    if zk_conn is None:
        # connection() has already printed the failure.
        return []

    locks = []
    try:
        # ``or []`` keeps this safe if monitor_info() yields nothing usable.
        for entry in monitor_info() or []:
            table_name = entry.strip()
            if not table_name:
                # A blank line would otherwise query the namespace root.
                continue
            table_path = "/hive_zookeeper_namespace/" + table_name
            try:
                children = zk_conn.get_children(table_path)
            except NoNodeError:
                # No node for this table means nobody holds a lock on it.
                print(table_path, "无锁情况", get_time())
                continue
            locks.extend(
                table_path + "/" + child for child in children if "LOCK" in child
            )
    finally:
        # Always release the session, even if ZooKeeper raised mid-scan.
        zk.conn_close()
    return locks
'''
#获取每个表锁的具体信息
table_locks_info为入库执行的语句
'''
def _escape_sql_text(text):
    """Backslash-escape backslashes, newlines and single quotes in *text*."""
    # Backslashes go first so the escapes added below are not doubled again.
    return text.replace("\\", "\\\\").replace("\n", "\\n").replace("'", "\\'")


def get_lock_info():
    """Record query id, SQL text and creation time for every current table lock.

    For each lock path returned by ``get_lock()`` the node is read from
    ZooKeeper and one row is written through
    ``ConnMysqlTmpl.table_locks_info(query_id, sql, created_at, table)``.
    Failures of that call are printed and do not stop the loop.
    """
    # Local import: lock nodes can vanish between listing and reading them.
    from kazoo.exceptions import NoNodeError

    all_locks = get_lock()
    print(all_locks)
    if not all_locks:
        print("所监控的表无锁情况~", get_time())
        return

    # Only connect once we know there is something to read.
    zk = Zookeeper()
    zk_conn = zk.connection(zookeeper_info()[0])
    if zk_conn is None:
        # connection() has already printed the failure.
        return

    try:
        for path in all_locks:
            try:
                raw_data, stat = zk_conn.get(path)
            except NoNodeError:
                # The lock was released after get_lock() listed it.
                continue

            # Decode the payload itself instead of parsing str() of the
            # (bytes, ZnodeStat) tuple, which turned non-ASCII SQL into
            # \x.. escapes and leaked repr punctuation into the fields.
            fields = (raw_data or b"").decode("utf-8", errors="replace").split(":")
            if len(fields) < 4:
                print(f"锁节点数据格式异常,已跳过:{path}")
                continue
            query_id = fields[0]
            # NOTE(review): field 3 is taken as the SQL text, as before; SQL
            # that itself contains ':' is cut at its first colon. Confirm the
            # lock payload layout before widening this.
            # presumably table_locks_info() embeds this in a quoted SQL
            # literal, hence the escaping; verify against ConnMysqlTmpl.
            sql_info = _escape_sql_text(fields[3])

            # ctime is in milliseconds; time.localtime() expects seconds.
            created_at = time.strftime(
                "%Y-%m-%d %H:%M:%S", time.localtime(stat.ctime / 1000)
            )
            try:
                # Component 3 of the path is the table name when monitored
                # entries look like "<db>/<table>" -- TODO confirm.
                ConnMysqlTmpl.table_locks_info(
                    query_id, sql_info, created_at, path.split("/")[3]
                )
            except Exception as e:
                print(f"执行SQL失败,请排查:{e}")
    finally:
        zk.conn_close()
def get_time():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    # strftime() uses the current local time when no time tuple is given.
    return time.strftime("%Y-%m-%d %H:%M:%S")
if __name__ == '__main__':
    # Script entry point: take one snapshot of the current table locks.
    get_lock_info()
主要说明
#获取所有锁node信息
#获取query_id和sql语句
all_sql_info = str(all_info).split("ZnodeStat")[0]
query_id = (all_sql_info.split(":")[0])[3:]
sql_info_tmp = all_sql_info.split(":")[3]
sql_info_tmp1=sql_info_tmp.replace("\n","\\n")
sql_info=sql_info_tmp1.replace("\'","\\'")
以上代码用于获取SQL语句及其对应的query_id。
获取运行作业的query_id和租户
关键代码
import requests
import configparser
import os
import AppSourceInfo
import ConnMysqlTmpl
from json import JSONDecodeError
from requests.auth import HTTPBasicAuth
'''
#获取配置信息:访问url
'''
def get_app_info():
    """Read the job-query connection settings from ``AppExecSqlInfo.ini``.

    The file is looked up at ``<parent of the current working
    directory>/config/AppExecSqlInfo.ini``, so the result depends on where
    the process was started from.

    Returns:
        list: The value of every option visible in the ``[appexecsql-conn]``
        section, in the order ``configparser`` reports them.

    Raises:
        configparser.NoSectionError: If the file cannot be read or has no
            ``[appexecsql-conn]`` section (``ConfigParser.read`` silently
            skips files it cannot open).
    """
    parser = configparser.ConfigParser()
    base_dir = os.path.dirname(os.path.abspath("."))
    parser.read(base_dir + "/config/AppExecSqlInfo.ini")
    # Only the values are needed; callers index the result positionally.
    return [value for _, value in parser.items("appexecsql-conn")]
'''
#获取运行状态下作业的query_id和租户
'''
def job_info():
    """Store the Hive query id and queue of every application that is listed.

    For each id returned by ``AppSourceInfo.get_each_app1()`` the job
    configuration is fetched from
    ``<conn_info[0]><app_id>/ws/v1/mapreduce/jobs/<job_id>/conf`` with HTTP
    basic auth (``conn_info[1]``/``conn_info[2]``). The ``hive.query.id``
    and ``mapreduce.job.queuename`` properties are collected into a list,
    which is passed to ``ConnMysqlTmpl.user_app_query_info1``. Every failure
    is printed and the loop moves on to the next application.
    """
    # Without a timeout one unresponsive endpoint would block the whole run.
    request_timeout = 30  # seconds

    conn_info = get_app_info()
    app_ids = AppSourceInfo.get_each_app1()
    if not app_ids:
        print("获取app_id信息失败,请排查~")
        return

    for app_id in app_ids:
        try:
            conf_url = (
                conn_info[0] + app_id + "/ws/v1/mapreduce/jobs/"
                + str(app_id).replace("application", "job") + "/conf"
            )
            response = requests.get(
                conf_url,
                auth=HTTPBasicAuth(conn_info[1], conn_info[2]),
                timeout=request_timeout,
            )
            try:
                info_json = response.json()
            except ValueError:
                # ValueError covers both json and simplejson decode errors.
                print("{0}作业非MapReduce任务,请排查~".format(app_id))
                continue

            # NOTE(review): values are appended in the order the properties
            # appear in the response, and a missing property shortens the
            # list; confirm user_app_query_info1() tolerates both.
            record = []
            for prop in info_json["conf"]["property"]:
                # Extraction of hive.query.string and
                # mapreduce.jdbc.input.query was already disabled here.
                if prop["name"] == "hive.query.id":
                    record.append(prop["value"] or "0")
                elif prop["name"] == "mapreduce.job.queuename":
                    # The queue name is taken from the fifth '/'-separated
                    # part of the property's second "source" entry.
                    source_path = str(prop["source"][1])
                    record.append(source_path.split("/")[4])
            print(record)
            print(len(record))
            try:
                ConnMysqlTmpl.user_app_query_info1(record)
            except Exception as e:
                print(f"数据入库失败,请排查:{e}")
        except Exception as e:
            print("获取app信息失败,请排查 {0}".format(e))
主要说明
· 执行前(ACCEPTED)状态下的作业,内容编码存在异常,无法通过yarn rest api进行读取
· 执行完成(FINISHED)的作业在无异常情况下都会及时地释放锁,如果集群开启了日志服务,可以访问jobhistoryserver服务读取对应日志
获取运行状态下作业的信息,链接为:
http://resourcemanager:port/applicationID/ws/v1/mapreduce/jobs/jobId/conf
将返回的数据进行json格式转换:
info_json=all_info_tmp.json()
result=info_json["conf"]
all_info=result["property"]
获取query_id:
if line["name"] == "hive.query.id":
#list.append(str(line["value"]))
query_id_tmp = line["value"]
获取资源队列名称:
elif line["name"] == "mapreduce.job.queuename":
#list.append(str(line["value"]))
user_info = (line["source"])[1]
user = str(user_info).split("/")
queue_name = user[4]
list.append(queue_name)
效果呈现
-- Match each recorded lock with the job that holds it, by query_id,
-- to show the locking SQL together with the queue that ran it.
SELECT
    a.query_id,
    a.queue_name,
    b.sql_info,
    b.table_name,
    b.ctime
FROM
    user_app_query_info a
    INNER JOIN zookeeper_lock_info b
        ON a.query_id = b.query_id;
两个表根据query_id进行关联,获取到锁表情况下,锁表的SQL语句和租户信息。
标签:info,SQL,list,hive,sql,query,line,表锁,id 来源: https://www.cnblogs.com/zxyax132620/p/15514885.html