Dubbo ActiveLimitFilter限流
作者:互联网
public class ActiveLimitFilter implements Filter, Filter.Listener { private static final String ACTIVE_LIMIT_FILTER_START_TIME = "active_limit_filter_start_time"; @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { URL url = invoker.getUrl(); String methodName = invocation.getMethodName(); // 最大并发量 int max = invoker.getUrl().getMethodParameter(methodName, ACTIVES_KEY, 0); final RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()); // 尝试给计数加一 if (!RpcStatus.beginCount(url, methodName, max)) { // 获取计数失败 long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), TIMEOUT_KEY, 0); long start = System.currentTimeMillis(); long remain = timeout; synchronized (rpcStatus) { while (!RpcStatus.beginCount(url, methodName, max)) { try { rpcStatus.wait(remain); } catch (InterruptedException e) { // ignore } long elapsed = System.currentTimeMillis() - start; // 等待剩余时间 remain = timeout - elapsed; if (remain <= 0) { // 等待超时 throw new RpcException(RpcException.LIMIT_EXCEEDED_EXCEPTION, "Waiting concurrent invoke timeout in client-side for service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", elapsed: " + elapsed + ", timeout: " + timeout + ". concurrent invokes: " + rpcStatus.getActive() + ". max concurrent invoke limit: " + max); } } } } invocation.put(ACTIVE_LIMIT_FILTER_START_TIME, System.currentTimeMillis()); return invoker.invoke(invocation); } @Override public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) { String methodName = invocation.getMethodName(); URL url = invoker.getUrl(); int max = invoker.getUrl().getMethodParameter(methodName, ACTIVES_KEY, 0); // 这里需要先计数减一再唤醒其他线程 RpcStatus.endCount(url, methodName, getElapsed(invocation), true); notifyFinish(RpcStatus.getStatus(url, methodName), max); } @Override public void one rror(Throwable t, Invoker<?> invoker, Invocation invocation) { String methodName = invocation.getMethodName(); URL url = invoker.getUrl(); int max = invoker.getUrl().getMethodParameter(methodName, ACTIVES_KEY, 0); if (t instanceof RpcException) { RpcException rpcException = (RpcException) t; if (rpcException.isLimitExceed()) { return; } } RpcStatus.endCount(url, methodName, getElapsed(invocation), false); notifyFinish(RpcStatus.getStatus(url, methodName), max); } private long getElapsed(Invocation invocation) { Object beginTime = invocation.get(ACTIVE_LIMIT_FILTER_START_TIME); return beginTime != null ? System.currentTimeMillis() - (Long) beginTime : 0; } private void notifyFinish(final RpcStatus rpcStatus, int max) { if (max > 0) { synchronized (rpcStatus) { rpcStatus.notifyAll(); } } } }
public class RpcStatus { /** * @param url */ public static boolean beginCount(URL url, String methodName, int max) { max = (max <= 0) ? Integer.MAX_VALUE : max; RpcStatus appStatus = getStatus(url); RpcStatus methodStatus = getStatus(url, methodName); if (methodStatus.active.get() == Integer.MAX_VALUE) { return false; } // 这里的CAS写法值得学习 for (int i; ; ) { i = methodStatus.active.get(); if (i == Integer.MAX_VALUE || i + 1 > max) { return false; } if (methodStatus.active.compareAndSet(i, i + 1)) { break; } } appStatus.active.incrementAndGet(); return true; } private static void endCount(RpcStatus status, long elapsed, boolean succeeded) { status.active.decrementAndGet(); status.total.incrementAndGet(); status.totalElapsed.addAndGet(elapsed); if (status.maxElapsed.get() < elapsed) { status.maxElapsed.set(elapsed); } if (succeeded) { if (status.succeededMaxElapsed.get() < elapsed) { status.succeededMaxElapsed.set(elapsed); } } else { status.failed.incrementAndGet(); status.failedElapsed.addAndGet(elapsed); if (status.failedMaxElapsed.get() < elapsed) { status.failedMaxElapsed.set(elapsed); } } } }
标签:status,Dubbo,methodName,max,RpcStatus,elapsed,invocation,限流,ActiveLimitFilter 来源: https://blog.csdn.net/qq_42261290/article/details/121412619