编程语言
首页 > 编程语言> > ElasticSearch-Java-low-level-rest-client官方文档翻译

ElasticSearch-Java-low-level-rest-client官方文档翻译

作者:互联网

       人肉翻译,非谷歌机翻,部分地方添加了个人的理解,并做了分割,如有错误请在评论指出。转载请指明原链接,尊重个人劳动成果。

       High-Level-Rest-Client基于Low-Level-Rest-Client封装,Client配置方面基于Low-Level,而API使用上基于High-Level。

       翻译的版本为6.5.4版本的Elasticsearch,部分不重要的内容(如Maven/Gradle坐标、License等不在本文出现)。

       在实际的配置过程中,查看源码发现LowLevelClient完全基于HttpAsyncClient来实现的,内部具体组装Client的细节,如果有时间会写另外一篇博文分享出来。

  低级别rest-client包含了如下特性:  

1、初始化客户端

1.1.基本设置

        RestClient实例只需要通过RestClientBuilder类进行构建就行,如下所示,可以提供多个连接实例的HttpHost(每一个HttpHost都是一个结点Node):
1 RestClient restClient = RestClient.builder(
2     new HttpHost("localhost", 9200, "http"),
3     new HttpHost("localhost", 9201, "http")).build();
        RestClient是线程安全的,并且和它依附的应用有相同的生命周期。需要注意的是,当不需要Client的时候,需要关闭它以便正确的释放其持有的资源(这些资源有底层的http client实例与它的线程):
1 restClient.close();

 

 1.2.参数化设置

   RestClientBuilder同样允许参数化的设置如下参数来用于构建RestClient实例:
1 RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
2 Header[] defaultHeaders = new Header[]{new BasicHeader("header", "value")};
3 builder.setDefaultHeaders(defaultHeaders);
1 RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
2 builder.setMaxRetryTimeoutMillis(10000); 
1 RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
2 builder.setFailureListener(new RestClient.FailureListener() {
3     @Override
4     public void onFailure(Node node) {
5         
6     }
7 });
1 RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
2 builder.setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS);
1 RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
2 builder.setRequestConfigCallback(
3     new RestClientBuilder.RequestConfigCallback() {
4         @Override
5         public RequestConfig.Builder customizeRequestConfig(
6                 RequestConfig.Builder requestConfigBuilder) {
7             return requestConfigBuilder.setSocketTimeout(10000); 
8         }
9     });
1 RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
2 builder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
3         @Override
4         public HttpAsyncClientBuilder customizeHttpClient(
5                 HttpAsyncClientBuilder httpClientBuilder) {
6             return httpClientBuilder.setProxy(
7                 new HttpHost("proxy", 9000, "http"));  
8         }
9     });

 

2、执行请求

2.1.同步与异步API

        一旦Client创建,就可以通过performRequest或performRequestAsync来执行请求: (1)performRequest是同步请求,它会阻塞调用线程,在请求成功或者请求失败抛出异常之后,返回Response;
1 Request request = new Request(
2     "GET",   //HTTP方法,支持GET、POST、HEAD等
3     "/");    //es服务端点endpoint
4 Response response = restClient.performRequest(request);
(2)performRequestAsync是异步请求,它接收一个ResponseListener参数,当请求成功或者请求失败抛出异常之后,该Listener会被调起并传入Response或Exception到相应方法;
 1 Request request = new Request(
 2     "GET",   //HTTP方法,支持GET、POST、HEAD等
 3     "/");    //es服务端点endpoint
 4 restClient.performRequestAsync(request, new ResponseListener() {
 5     @Override
 6     public void onSuccess(Response response) {
 7         //成功回调
 8     }
 9 
10     @Override
11     public void onFailure(Exception exception) {
12         //失败回调
13     }
14 });
        另外,你可以向Request对象中添加请求参数,如下:
1 request.addParameter("pretty", "true");

        如果将HttpEntity设置为String,那么ContentType会被默认设置为application/json:

2.2.请求可选项

        RequestOptions这个类用于设置请求可选项,我们建议在同一个应用中,对于使用同样的请求可选项的Request共享一个RequestOption实例,你可以通过单例来实现:
1 private static final RequestOptions COMMON_OPTIONS;
2 static {
3     RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
4     builder.addHeader("Authorization", "Bearer " + TOKEN);  //设置请求头的校验信息
5     builder.setHttpAsyncResponseConsumerFactory( //设置Response的缓冲池大小          
6         new HeapBufferedResponseConsumerFactory(30 * 1024 * 1024 * 1024)); 
7     COMMON_OPTIONS = builder.build();
8 }
        配置好RequestOptions之后,我们在每一个共享该配置的Request中,简单设置一下该对象即可:
1 request.setOptions(COMMON_OPTIONS);

        如果有某些请求需要对OPTIONS做一定程度的变更,可以通过如下方式增量处理而不是重新写一遍设置代码:

1 RequestOptions.Builder options = COMMON_OPTIONS.toBuilder();
2 options.addHeader("cats", "knock things off of other things");
3 request.setOptions(options); 

2.3.批量并行异步请求

        下面是一个使用闭锁并行执行异步请求的例子。在实际的编程中,你一般会倾向于使用_bulk这个API来替代,当然了,下面的例子依然具有参考性:

 1 final CountDownLatch latch = new CountDownLatch(documents.length);
 2 for (int i = 0; i < documents.length; i++) {
 3     Request request = new Request("PUT", "/posts/doc/" + i);
 4     //let's assume that the documents are stored in an HttpEntity array
 5     request.setEntity(documents[i]);
 6     restClient.performRequestAsync(
 7             request,
 8             new ResponseListener() {
 9                 @Override
10                 public void onSuccess(Response response) {
11                     
12                     latch.countDown();
13                 }
14 
15                 @Override
16                 public void onFailure(Exception exception) {
17                     
18                     latch.countDown();
19                 }
20             }
21     );
22 }
23 latch.await();

 

3、读取响应

3.1.获取响应对象

        响应对象,无论是通过同步还是异步获取到的(同步通过API返回值获取,异步通过回调方法中的参数获取),都是同一个类的对象。该对象包装了http client原始的响应对象以及一些附加信息:
1 Response response = restClient.performRequest(new Request("GET", "/"));
2 RequestLine requestLine = response.getRequestLine(); 
3 HttpHost host = response.getHost(); 
4 int statusCode = response.getStatusLine().getStatusCode(); 
5 Header[] headers = response.getHeaders(); 
6 String responseBody = EntityUtils.toString(response.getEntity()); 
         如果执行请求失败的话,你将会得到一个异常,异常有以下几种: (1)IOException:通讯问题,例如SocketTimeoutException; (2)ResponseException:响应已经返回,但是其状态码不为2XX时将抛出这个异常。ResponseException源自于一个有效的http Response,因此该异常会将源Response暴露出来,并允许你获取它。         注意:HEAD请求如果返回了404,ResponseException是不会被抛出的,因为对于HEAD来说资源未找到也是该方法期望的返回值之一。所有其他的HTTP方法对于404返回码都会抛出一个ResponseException,除非在ignore参数中包含了404。ignore是一个特殊的参数,该参数不会发送给es,它包含了一个分割的error status code列表。当Response的状态码在这个列表中时,它会被当成Response而不是作为Exception。对于GET方法相关的API,这是一个相当有用的特性,比如GET查询一个文档返回404,我们期望其不返回一个异常,而是一个没有document的response对象。         注意:low-level-client并不会给出json相关的编组与解组API。你可以自由选择相关的库来进行json的编组与解组。         底层的Apache Async Http Client库附带了不同的org.apache.http.HttpEntity实现,这些实现允许以不同的格式(流,字节数组,字符串等)提供请求主体。至于读取响应体,HttpEntity#getContent方法很方便,它返回从先前缓冲的响应体读取的InputStream。作为替代方案,可以提供一个自定义的org.apache.http.nio.protocol.HttpAsyncResponseConsumer来控制字节的读取和缓冲方式。  

4、通用配置

        在初始化客户端中,我们看到了RestClientBuilder支持提供一个RequestConfigCallback与一个能够用于个性化异步请求的HttpClientConfigCallback。这些回调函数能够让你在不覆盖每一个RestClient初始化默认值的情况下修改某些指定行为。本小节我们描述一些需要为low-level-client添加额外配置的公共场景。

4.1.Timeouts

        在初始化客户端的参数化设置就提过了,可以通过如下形式进行设置:
 1 RestClientBuilder builder = RestClient.builder(
 2     new HttpHost("localhost", 9200))
 3     .setRequestConfigCallback(
 4         new RestClientBuilder.RequestConfigCallback() {
 5             @Override
 6             public RequestConfig.Builder customizeRequestConfig(
 7                     RequestConfig.Builder requestConfigBuilder) {
 8                 return requestConfigBuilder
 9                     .setConnectTimeout(5000) //默认值1S
10                     .setSocketTimeout(60000); //默认值30S
11             }
12         })
13     .setMaxRetryTimeoutMillis(60000); //设置完SocketTimeout后,一般需要联动修改这个值

4.2.线程数

        ApacheHttpAsyncClient基于IOReactor模式,在启动时会创建一个分发线程和一组工作线程,它们由Connection Manager使用,工作线程的默认数量与能够检测到的CPU核心数一致(Runtime.getRuntime().availableProcessors()的返回结果)。可以通过如下方法修改工作线程数:
 1 RestClientBuilder builder = RestClient.builder(
 2     new HttpHost("localhost", 9200))
 3     .setHttpClientConfigCallback(new HttpClientConfigCallback() {
 4         @Override
 5         public HttpAsyncClientBuilder customizeHttpClient(
 6                 HttpAsyncClientBuilder httpClientBuilder) {
 7             return httpClientBuilder.setDefaultIOReactorConfig(
 8                 IOReactorConfig.custom()
 9                     .setIoThreadCount(1)
10                     .build());
11         }
12     }); 

4.3.身份验证

        使用如下的方式设置身份验证相关的请求参数:
 1 final CredentialsProvider credentialsProvider =
 2     new BasicCredentialsProvider();
 3 credentialsProvider.setCredentials(AuthScope.ANY,
 4     new UsernamePasswordCredentials("user", "password"));
 5 
 6 RestClientBuilder builder = RestClient.builder(
 7     new HttpHost("localhost", 9200))
 8     .setHttpClientConfigCallback(new HttpClientConfigCallback() {
 9         @Override
10         public HttpAsyncClientBuilder customizeHttpClient(
11                 HttpAsyncClientBuilder httpClientBuilder) {
12             return httpClientBuilder
13                 .setDefaultCredentialsProvider(credentialsProvider);
14         }
15     });
        抢占式(这里翻译不太确定,结合上下文,应该指的是在第一次请求就发送身份验证信息)身份验证可以被禁用,这意味着每个HTTP请求都会在没有身份验证请求头的情况下发送,以查看它是否被接受,并且在收到HTTP 401响应后,它将使用含有身份验证请求头的报文重新发送完全相同的请求。 如果您希望这样做,那么您可以通过HttpAsyncClientBuilder禁用它:
 1 final CredentialsProvider credentialsProvider =
 2     new BasicCredentialsProvider();
 3 credentialsProvider.setCredentials(AuthScope.ANY,
 4     new UsernamePasswordCredentials("user", "password"));
 5 
 6 RestClientBuilder builder = RestClient.builder(
 7     new HttpHost("localhost", 9200))
 8     .setHttpClientConfigCallback(new HttpClientConfigCallback() {
 9         @Override
10         public HttpAsyncClientBuilder customizeHttpClient(
11                 HttpAsyncClientBuilder httpClientBuilder) {
12             httpClientBuilder.disableAuthCaching();  /* 禁用抢占式身份验证 */
13             return httpClientBuilder
14                 .setDefaultCredentialsProvider(credentialsProvider);
15         }
16     });

4.4.SSL通信

        加密通信同样可以通过HttpClientConfigCallback进行配置。HttpAsyncClientBuilder通过3个暴露的API:setSSLContext、setSSLSessionStrategy、setConnectionManager进行加密通信的设置。下面是一个配置示例(如果没有显式配置,则会使用系统的默认配置):
 1 KeyStore truststore = KeyStore.getInstance("jks");
 2 try (InputStream is = Files.newInputStream(keyStorePath)) {
 3     truststore.load(is, keyStorePass.toCharArray());
 4 }
 5 SSLContextBuilder sslBuilder = SSLContexts.custom()
 6     .loadTrustMaterial(truststore, null);
 7 final SSLContext sslContext = sslBuilder.build();
 8 RestClientBuilder builder = RestClient.builder(
 9     new HttpHost("localhost", 9200, "https"))
10     .setHttpClientConfigCallback(new HttpClientConfigCallback() {
11         @Override
12         public HttpAsyncClientBuilder customizeHttpClient(
13                 HttpAsyncClientBuilder httpClientBuilder) {
14             return httpClientBuilder.setSSLContext(sslContext);
15         }
16     });

4.5.节点选择器

        客户端默认情况下会以轮询的形式将请求发送到每一个初始化时配置的节点上。在初始化时,我们可以提供一个节点选择器NodeSelector。当嗅探开启时这个功能会很好用,以防只有HTTP请求才能访问专用主节点。 对于每个请求,客户端将运行最终配置的节点选择器以过滤候选节点,然后从列表中选择下一个节点选择器。下面是一个基于机架进行节点选择的例子:
 1 RestClientBuilder builder = RestClient.builder(
 2         new HttpHost("localhost", 9200, "http"));
 3 builder.setNodeSelector(new NodeSelector() { 
 4     @Override
 5     public void select(Iterable<Node> nodes) {
 6         /*
 7          * Prefer any node that belongs to rack_one. If none is around
 8          * we will go to another rack till it's time to try and revive
 9          * some of the nodes that belong to rack_one.
10          */
11         boolean foundOne = false;
12         for (Node node : nodes) {
13             String rackId = node.getAttributes().get("rack_id").get(0);
14             if ("rack_one".equals(rackId)) {
15                 foundOne = true;
16                 break;
17             }
18         }
19         if (foundOne) {
20             Iterator<Node> nodesIt = nodes.iterator();
21             while (nodesIt.hasNext()) {
22                 Node node = nodesIt.next();
23                 String rackId = node.getAttributes().get("rack_id").get(0);
24                 if ("rack_one".equals(rackId) == false) {
25                     nodesIt.remove();
26                 }
27             }
28         }
29     }
30 });
        上面的例子中,优先选择一号机架上的节点。当一号机架没有任何节点时,才会选择其他机架节点。        需要注意的是,如果一个NodeSelector无法提供一个始终如一的(或者说稳定的)节点列表,那么轮询发送请求至不同node的行为将会变得不可预测或失效。在上面的例子中,轮询行为会很好的工作,因为这个例子中的代码可以很好的(或比较稳定的)获取到哪些结点可用(这些获取到的可用结点能够影响轮询行为的可预测性)。因此,这也提醒我们NodeSelector不应该依赖于任何外部因素,否则轮询行为将会变的不可预测。  

5、嗅探Sniffer

        允许从一个运行的ES集群中自动发现结点,并且将发现的结点设置入一个已存在的RestClient。默认的,它使用节点信息api检索属于集群的节点,并使用jackson解析获得的json响应。         要使用该功能,必须引入依赖,该依赖版本同rest-client版本一样,跟着es的发布版本走的:
1 <dependency>
2     <groupId>org.elasticsearch.client</groupId>
3     <artifactId>elasticsearch-rest-client-sniffer</artifactId>
4     <version>6.5.4</version>
5 </dependency>

5.1.基本设置

        在初始化的RestClient的时候将一个Sniffer与其进行关联。Sniffer嗅探器会为RestClient提供一个周期性的拉取es所有结点列表的功能(默认5分钟拉取一次),拉取到节点列表后,会调用RestClient.setNodes的API将其更新入RestClient。
1 RestClient restClient = RestClient.builder(
2     new HttpHost("localhost", 9200, "http"))
3     .build();
4 Sniffer sniffer = Sniffer.builder(restClient).build(); 
        Sniffer应该具有同RestClient一样的生命周期,也就是说,在关闭RestClient时,也需要关闭Sniffer以释放其占用的资源:
1 sniffer.close();
2 restClient.close(); 

5.2.定制刷新间隔

        上面我们提到了,Sniffer默认5分钟刷新一次,我们可以在初始化Sniffer的时候改变这个时间(单为:毫秒):
1 RestClient restClient = RestClient.builder(
2     new HttpHost("localhost", 9200, "http"))
3     .build();
4 Sniffer sniffer = Sniffer.builder(restClient)
5     .setSniffIntervalMillis(60000).build(); //设置Sniffer刷新间隔
        另外,我们可以在请求失败上启用Sniffer,即在每次请求节点失败后,可以让Sniffer执行一次节点更新(区别于Sniffer固定间隔的刷新)。我们需要为RestClient提供一个SnifferOnFailureListener,并在Client初始化的时候设置这个监听器,同时在创建Sniffer之后也需要将其设置给这个监听器。这样一来,每次失败之后,监听器监听到失败就会委托Sniffer去执行节点刷新:
 1 SniffOnFailureListener sniffOnFailureListener =
 2     new SniffOnFailureListener(); //创建Sniffer失败监听器
 3 RestClient restClient = RestClient.builder(
 4     new HttpHost("localhost", 9200))
 5     .setFailureListener(sniffOnFailureListener) //为RestClient设置监听器
 6     .build();
 7 Sniffer sniffer = Sniffer.builder(restClient)
 8     .setSniffAfterFailureDelayMillis(30000) //设置失败之后的额外调度延迟
 9     .build();
10 sniffOnFailureListener.setSniffer(sniffer); //为监听器设置Sniffer

5.3.设置节点通信协议

        Sniffer的节点列表刷新行为使用的是NodeInfo API,这个API只能返回节点的IP和PORT信息,对于连接节点使用到的协议无法返回,因此默认认为节点通信使用的是HTTP协议。在使用HTTPS协议的情况下,我们需要手工的设置一个ElasticsearchNodesSniffer实例用来告知Sniffer节点通信的协议是HTTPS:
1 RestClient restClient = RestClient.builder(
2         new HttpHost("localhost", 9200, "http"))
3         .build();
4 NodesSniffer nodesSniffer = new ElasticsearchNodesSniffer(
5         restClient,
6         ElasticsearchNodesSniffer.DEFAULT_SNIFF_REQUEST_TIMEOUT,
7         ElasticsearchNodesSniffer.Scheme.HTTPS);
8 Sniffer sniffer = Sniffer.builder(restClient)
9         .setNodesSniffer(nodesSniffer).build();
        通过同样的方式,我们可以设置SnifferRequestTimeout这个参数的值(默认1S)。当调用NodeInfoAPI时,这个超时参数会作为查询字符串中的参数,它告诉ES服务器查询节点列表的超时时间。因此当服务器侧出现超时的时候仍然会返回一个有效的响应结果(服务器侧查询节点列表的超时,不代表Sniffer请求自身的超时),这个响应结果可能只包含节点列表的一个子集,即在超时时间内能获取到的节点列表数据:
1 RestClient restClient = RestClient.builder(
2     new HttpHost("localhost", 9200, "http"))
3     .build();
4 NodesSniffer nodesSniffer = new ElasticsearchNodesSniffer(
5     restClient,
6     TimeUnit.SECONDS.toMillis(5),
7     ElasticsearchNodesSniffer.Scheme.HTTP);
8 Sniffer sniffer = Sniffer.builder(restClient)
9     .setNodesSniffer(nodesSniffer).build(); 

5.4.自定义NodesSniffer

        有时候,我们需要自定义一个NodesSniffer完成一些个性化的特殊功能,比如从一个内部资源中获取节点列表(如文件)而不是从ES服务器集群中获取节点列表:
 1 RestClient restClient = RestClient.builder(
 2     new HttpHost("localhost", 9200, "http"))
 3     .build();
 4 NodesSniffer nodesSniffer = new NodesSniffer() {
 5         @Override
 6         public List<Node> sniff() throws IOException {
 7             return null; 
 8         }
 9     };
10 Sniffer sniffer = Sniffer.builder(restClient)
11     .setNodesSniffer(nodesSniffer).build();

 

标签:Java,RestClient,level,builder,rest,new,Sniffer,节点,请求
来源: https://www.cnblogs.com/manayi/p/10645907.html