doris stream load 导入
作者:互联网
官网地址:https://doris.apache.org/zh-CN/administrator-guide/load-data/stream-load-manual.html#基本原理
doris 可以通过insert into 语句插入单条,或者批量插入,但是正式环境不推荐。
通过http client 进行数据导入
1.curl 格式 ,这个格式也是尝试了好多次 8030 是FE的http端口,8040是BE的http端口
注意:文件内容的最后一行不要是空行,光标不能停在空行上。
密码和用户名,都是doris的用户名和密码,不是服务器的。
-T 后面是上传文件的路径,要加上后缀名(我看官网上的示例没有加)。
-H label 保证每次不重复 多个参数的时候 一个个的写
//密码为fe的密码
curl --location-trusted -u root -T /home/label.csv -H "label:label12" http://192.168.0.18:8030/api/example_db/table4/_stream_load
curl --location-trusted -u root -T /home/label.csv -H "label:label14" -H "column_separator:," http://192.168.0.18:8030/api/example_db/table3/_stream_load
2.http client
1.doc 文件
请求 FE 时老是报 no valid Basic authorization。网上说是 307 跳转导致的:FE 会把请求重定向到 BE,而 HttpWebRequest 自动跟随重定向时不会再带上 Authorization 头。所以我这边是直接调用 BE 的机器。
/// <summary>
/// Uploads a local CSV file to a Doris backend (BE) node through the
/// <c>_stream_load</c> HTTP endpoint and reads the JSON reply.
/// </summary>
/// <remarks>
/// Host, port, database, table, credentials and file path are hard-coded sample values.
/// The file is streamed to the request instead of being loaded into one buffer, and
/// every stream is released even when the upload fails.
/// </remarks>
public static void StreamLoadBEDoc()
{
    string host = "192.168.0.74";            // BE node
    int port = 8040;                         // BE HTTP port (the FE HTTP port is 8030)
    string database = "example_db";
    string table = "table3";
    string user = "root";
    string passwd = "";
    string load_file_name = "E:/label.csv";
    string label = Guid.NewGuid().ToString(); // a fresh label for every load

    string loadUrl = string.Format("http://{0}:{1}/api/{2}/{3}/_stream_load", host, port, database, table);
    HttpWebRequest request = BuildRequestDoc(loadUrl, user, passwd, label);

    // Read-only access is enough and also works for write-protected files.
    using (FileStream file = new FileStream(load_file_name, FileMode.Open, FileAccess.Read, FileShare.Read))
    {
        request.ContentLength = file.Length;

        using (Stream reqStream = request.GetRequestStream())
        {
            // CopyTo loops until end of file; a single Read call is not guaranteed
            // to return the whole file and cannot address files above 2 GB.
            file.CopyTo(reqStream);
        }
    }

    using (WebResponse response = request.GetResponse())
    using (Stream stream = response.GetResponseStream())
    {
        if (stream != null)
        {
            using (StreamReader reader = new StreamReader(stream, Encoding.UTF8))
            {
                // JSON reply of the load; its "Status" field reports whether the load succeeded.
                // The method is void, so the text is only drained here.
                string result = reader.ReadToEnd();
            }
        }
    }
}
/// <summary>
/// Creates the HTTP PUT request used for a plain-text (CSV) stream load.
/// </summary>
/// <param name="url">Full <c>_stream_load</c> URL of the target table.</param>
/// <param name="user">Doris user name for HTTP Basic authentication.</param>
/// <param name="passwd">Doris password for HTTP Basic authentication.</param>
/// <param name="label">Load label; must be unique for every load.</param>
/// <returns>The configured request; no body has been written to it yet.</returns>
public static HttpWebRequest BuildRequestDoc(string url, string user, string passwd, string label)
{
    string credentials = user + ":" + passwd;
    string basicToken = Convert.ToBase64String(Encoding.UTF8.GetBytes(credentials));

    HttpWebRequest request = (HttpWebRequest)WebRequest.Create(url);

    WebHeaderCollection headers = request.Headers;
    headers["Authorization"] = "Basic " + basicToken;
    headers["Expect"] = "100-continue";                     // fixed value
    headers["label"] = label;                               // keep the label unique
    headers["column_separator"] = ",";                      // column delimiter of the uploaded text
    headers["Content-Type"] = "text/plain; charset=UTF-8";

    request.Method = HttpMethod.Put.Method;                 // stream load is a PUT request

    // Connection tuning intended to speed up the request.
    // NOTE(review): automatic Expect handling is switched off here although the
    // Expect header is set by hand above - confirm both are intended.
    ServicePoint endpoint = request.ServicePoint;
    endpoint.ConnectionLimit = int.MaxValue;
    endpoint.Expect100Continue = false;
    endpoint.UseNagleAlgorithm = false;

    request.AllowWriteStreamBuffering = false;
    request.Proxy = null;                                   // do not use a proxy
    request.KeepAlive = false;                              // no persistent connection

    return request;
}
2.json 传输
json 和doc 差不多
/// <summary>
/// Sends a JSON document to a Doris backend (BE) node through the
/// <c>_stream_load</c> HTTP endpoint and reads the JSON reply.
/// </summary>
/// <param name="jsonData">JSON text to load, encoded as UTF-8 for the request body.</param>
/// <exception cref="ArgumentNullException"><paramref name="jsonData"/> is null.</exception>
/// <remarks>
/// Host, port, database, table and credentials are hard-coded sample values.
/// </remarks>
public static void StreamLoadBEJson(string jsonData)
{
    if (jsonData == null)
    {
        throw new ArgumentNullException(nameof(jsonData));
    }

    string host = "192.168.0.74";            // BE node
    int port = 8040;                         // BE HTTP port
    string database = "example_db";
    string table = "table3";
    string user = "root";
    string passwd = "";
    string label = Guid.NewGuid().ToString(); // a fresh label for every load

    string loadUrl = string.Format("http://{0}:{1}/api/{2}/{3}/_stream_load", host, port, database, table);
    HttpWebRequest request = BuildRequestJson(loadUrl, user, passwd, label);

    byte[] payload = Encoding.UTF8.GetBytes(jsonData);

    // The request builder disables write buffering, so the body length has to be
    // announced before the request stream is opened.
    request.ContentLength = payload.Length;

    using (Stream outstream = request.GetRequestStream())
    {
        outstream.Write(payload, 0, payload.Length);
    }

    using (WebResponse response = request.GetResponse())
    using (Stream stream = response.GetResponseStream())
    {
        if (stream != null)
        {
            using (StreamReader reader = new StreamReader(stream, Encoding.UTF8))
            {
                // JSON reply of the load; its "Status" field reports whether the load succeeded.
                // The method is void, so the text is only drained here.
                string result = reader.ReadToEnd();
            }
        }
    }
}
/// <summary>
/// Creates the HTTP PUT request used for a JSON stream load.
/// </summary>
/// <param name="url">Full <c>_stream_load</c> URL of the target table.</param>
/// <param name="user">Doris user name for HTTP Basic authentication.</param>
/// <param name="passwd">Doris password for HTTP Basic authentication.</param>
/// <param name="label">Load label; must be unique for every load.</param>
/// <returns>The configured request; no body has been written to it yet.</returns>
public static HttpWebRequest BuildRequestJson(string url, string user, string passwd, string label)
{
    string credentials = user + ":" + passwd;
    string basicToken = Convert.ToBase64String(Encoding.UTF8.GetBytes(credentials));

    HttpWebRequest request = (HttpWebRequest)WebRequest.Create(url);

    WebHeaderCollection headers = request.Headers;
    headers["Authorization"] = "Basic " + basicToken;
    headers["Expect"] = "100-continue";                     // fixed value
    headers["label"] = label;                               // keep the label unique
    headers["column_separator"] = ",";                      // column delimiter
    headers["Content-Type"] = "text/plain; charset=UTF-8";

    request.Method = HttpMethod.Put.Method;                 // stream load is a PUT request

    headers["format"] = "json";                             // the body is JSON data
    headers["strip_outer_array"] = "true";                  // the JSON is serialized as an array

    // Connection tuning intended to speed up the request.
    // NOTE(review): automatic Expect handling is switched off here although the
    // Expect header is set by hand above - confirm both are intended.
    ServicePoint endpoint = request.ServicePoint;
    endpoint.ConnectionLimit = int.MaxValue;
    endpoint.Expect100Continue = false;
    endpoint.UseNagleAlgorithm = false;

    request.AllowWriteStreamBuffering = false;
    request.Proxy = null;                                   // do not use a proxy
    request.KeepAlive = false;                              // no persistent connection

    return request;
}
返回格式
{
    "TxnId": 1003,
    "Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a0149bee",
    "Status": "Success",
    "ExistingJobStatus": "FINISHED", // optional
    "Message": "OK",
    "NumberTotalRows": 1000000,
    "NumberLoadedRows": 1000000,
    "NumberFilteredRows": 1,
    "NumberUnselectedRows": 0,
    "LoadBytes": 40888898,
    "LoadTimeMs": 2144,
    "BeginTxnTimeMs": 1,
    "StreamLoadPutTimeMs": 2,
    "ReadDataTimeMs": 325,
    "WriteDataTimeMs": 1933,
    "CommitAndPublishTimeMs": 106,
    "ErrorURL": "http://192.168.1.1:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005"
}
返回结果中的 Status 为 Success,就表示导入成功了。
唯一遗憾的是FE的链接没有测试通过。不知道有没有老铁解决了。
标签:load,Set,string,stream,label,Headers,client,doris 来源: https://www.cnblogs.com/elsons/p/15801156.html