其他分享
首页 > 其他分享> > doris stream load 导入

doris stream load 导入

作者:互联网

官网地址:https://doris.apache.org/zh-CN/administrator-guide/load-data/stream-load-manual.html#基本原理

doris 可以通过insert into 语句插入单条,或者批量插入,但是正式环境不推荐。

通过http client 进行数据导入

1.curl 格式 ,这个格式也是尝试了好多次 8030 是FE的http端口,8040是BE的http端口

注意文件的内容最后一样不要有空行,光标不能在空行

 

密码和用户名,都是doris的用户名和密码,不是服务器的。

-T 后面是上传文件的路基,加上后缀名。我看官网上没有加,

-H label 保证每次不重复 多个参数的时候 一个个的写

  //密码为fe的密码
  curl --location-trusted -u root -T /home/label.csv -H "label:label12"  http://192.168.0.18:8030/api/example_db/table4/_stream_load  
  curl --location-trusted -u root -T /home/label.csv -H "label:label14" -H "column_separator:,"  http://192.168.0.18:8030/api/example_db/table3/_stream_load

 

2.http client  

   1.doc 文件

      FE的请求老是报 no valid Basic authorization  网上说的307 跳转导致的 我这边是直接调用的BE 的机器

 public static void StreamLoadBEDoc()
        {
            string result = "";
            string host = "192.168.0.74";//be
            int port = 8040;//be接口 fe8030
            string database = "example_db";
            string table = "table3";
            string user = "root";
            string passwd = "";
            string load_file_name = "E:/label.csv";
            string label = Guid.NewGuid().ToString();
            string loadUrl = string.Format("http://{0}:{1}/api/{2}/{3}/_stream_load", host, port, database, table);
            HttpWebRequest request = BuildRequestDoc(loadUrl, user, passwd, label);
            //文件数据
            FileStream rdr = new FileStream(load_file_name, FileMode.Open);
            request.ContentLength = rdr.Length;
            Stream reqStream = request.GetRequestStream();
            byte[] inData = new byte[rdr.Length];
            int bytesRead = rdr.Read(inData, 0, Convert.ToInt32(rdr.Length));
            reqStream.Write(inData, 0, Convert.ToInt32(rdr.Length));
            rdr.Close();
            WebResponse response = request.GetResponse();
            Stream stream = response.GetResponseStream();
            if (stream != null)
            {

                var reader = new StreamReader(stream, Encoding.UTF8);
                result = reader.ReadToEnd();
                stream.Dispose();
                stream = null;
                reader.Dispose();
                reader = null;
            }
            request.Abort();
            request = null;
            response.Dispose();
            response = null;
        }
 public static HttpWebRequest BuildRequestDoc(string url, string user, string passwd, string label)
        {
            HttpWebRequest client = (HttpWebRequest)WebRequest.Create(url);
            client.Headers.Set("Authorization", "Basic " + Convert.ToBase64String(Encoding.UTF8.GetBytes($"{user}:{passwd}")));
            client.Headers.Set("Expect", "100-continue");//固定
            client.Headers.Set("label", label);//labid 保持唯一
            client.Headers.Set("column_separator", ",");//列分隔符
            client.Headers.Set("Content-Type", "text/plain; charset=UTF-8");
            client.Method = HttpMethod.Put.Method;//put请求

            client.ServicePoint.ConnectionLimit = int.MaxValue; //加上请求设置,提高效率
            client.ServicePoint.Expect100Continue = false;
            client.ServicePoint.UseNagleAlgorithm = false;
            client.AllowWriteStreamBuffering = false;
            client.Proxy = null; //不使用代理
            client.KeepAlive = false; //不建立持久性连接
            return client;
        }

  2.json 传输

     json 和doc 差不多

     

 public static void StreamLoadBEJson(string jsonData)
        {
            string result = "";
            string host = "192.168.0.74";//be
            int port = 8040;//be接口
            string database = "example_db";
            string table = "table3";
            string user = "root";
            string passwd = "";
            string label = Guid.NewGuid().ToString();
            string loadUrl = string.Format("http://{0}:{1}/api/{2}/{3}/_stream_load", host, port, database, table);
            HttpWebRequest request = BuildRequestJson(loadUrl, user, passwd, label);
            //文件数据
            Stream outstream;
            byte[] _buffer = Encoding.GetEncoding("utf-8").GetBytes(jsonData);
            outstream = request.GetRequestStream();
            outstream.Write(_buffer, 0, _buffer.Length);
            outstream.Close();

            WebResponse response = request.GetResponse();
            Stream stream = response.GetResponseStream();
            if (stream != null)
            {
                var reader = new StreamReader(stream, Encoding.UTF8);
                result = reader.ReadToEnd();
                stream.Dispose();
                stream = null;
                reader.Dispose();
                reader = null;
            }
            request.Abort();
            request = null;
            response.Dispose();
            response = null;
        }
 public static HttpWebRequest BuildRequestJson(string url, string user, string passwd, string label)
        {
            HttpWebRequest client = (HttpWebRequest)WebRequest.Create(url);
            client.Headers.Set("Authorization", "Basic " + Convert.ToBase64String(Encoding.UTF8.GetBytes($"{user}:{passwd}")));
            client.Headers.Set("Expect", "100-continue");//固定
            client.Headers.Set("label", label);//labid 保持唯一
            client.Headers.Set("column_separator", ",");//列分隔符
            client.Headers.Set("Content-Type", "text/plain; charset=UTF-8");
            client.Method = HttpMethod.Put.Method;//put请求
            client.Headers.Set("format", "json"); //json数据
            client.Headers.Set("strip_outer_array", "true");//序列化是数组的json格式
            client.ServicePoint.ConnectionLimit = int.MaxValue; //加上请求设置,提高效率
            client.ServicePoint.Expect100Continue = false;
            client.ServicePoint.UseNagleAlgorithm = false;
            client.AllowWriteStreamBuffering = false;
            client.Proxy = null; //不使用代理
            client.KeepAlive = false; //不建立持久性连接
            return client;
        }

 

返回格式

{
    "TxnId": 1003,
    "Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a0149bee",
    "Status": "Success",
    "ExistingJobStatus": "FINISHED", // optional
    "Message": "OK",
    "NumberTotalRows": 1000000,
    "NumberLoadedRows": 1000000,
    "NumberFilteredRows": 1,
    "NumberUnselectedRows": 0,
    "LoadBytes": 40888898,
    "LoadTimeMs": 2144,
    "BeginTxnTimeMs": 1,
    "StreamLoadPutTimeMs": 2,
    "ReadDataTimeMs": 325,
    "WriteDataTimeMs": 1933,
    "CommitAndPublishTimeMs": 106,
    "ErrorURL": "http://192.168.1.1:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005"
}

status :success 就成功了

 

唯一遗憾的是FE的链接没有测试通过。不知道有没有老铁解决了。

标签:load,Set,string,stream,label,Headers,client,doris
来源: https://www.cnblogs.com/elsons/p/15801156.html