.Netcore HttpClient源码探究
作者:互联网
源码搜索与概述
搜索HttpClient源码 https://source.dot.net/#System.Net.Http/System/Net/Http/HttpClient.cs
1、HttpClient 依赖HttpClientHandler或HttpMessageHandler,HttpClientHandler也继承自HttpMessageHandler
2、HttpClientHandler依赖 SocketsHttpHandler,SocketsHttpHandler继承HttpMessageHandler,并支持跨平台
3、SocketsHttpHandler依赖HttpConnectionHandler或HttpAuthenticatedConnectionHandler,
这两个又依赖HttpConnectionPoolManager
4、HttpConnectionPoolManager维护ConcurrentDictionary<HttpConnectionKey, HttpConnectionPool>,调用HttpConnectionPool进行Send
5、HttpConnectionPool再根据Http3/http2或其他不同的配置再进行细分到不同的流去处理。
查看示例
1.可以看到HttpClient继承HttpMessageInvoker,并提供了三个不同参数的构造函数
public partial class HttpClient : HttpMessageInvoker
public HttpClient() : this(new HttpClientHandler())
{
}
public HttpClient(HttpMessageHandler handler) : this(handler, true)
{
}
public HttpClient(HttpMessageHandler handler, bool disposeHandler) : base(handler, disposeHandler)
{
_timeout = s_defaultTimeout;
_maxResponseContentBufferSize = HttpContent.MaxBufferSize;
_pendingRequestsCts = new CancellationTokenSource();
}
首先跟踪第一个构造函数:public HttpClient() : this(new HttpClientHandler())
using HttpHandlerType = System.Net.Http.SocketsHttpHandler;
namespace System.Net.Http
{
public partial class HttpClientHandler : HttpMessageHandler
{
private readonly HttpHandlerType _underlyingHandler;
private HttpMessageHandler Handler
#if TARGET_BROWSER
{ get; }
#else
=> _underlyingHandler;
#endif
protected internal override HttpResponseMessage Send(HttpRequestMessage request, CancellationToken cancellationToken) =>
Handler.Send(request, cancellationToken);
protected internal override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) =>
Handler.SendAsync(request, cancellationToken);
可知依赖 HttpHandlerType=SocketsHttpHandler
[UnsupportedOSPlatform("browser")]
public sealed class SocketsHttpHandler : HttpMessageHandler
{
private readonly HttpConnectionSettings _settings = new HttpConnectionSettings();
private HttpMessageHandlerStage? _handler;
private bool _disposed;
private void CheckDisposed()
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(SocketsHttpHandler));
}
}
private void CheckDisposedOrStarted()
{
CheckDisposed();
if (_handler != null)
{
throw new InvalidOperationException(SR.net_http_operation_started);
}
}
关注SendAsync
protected internal override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
if (request == null)
{
throw new ArgumentNullException(nameof(request), SR.net_http_handler_norequest);
}
CheckDisposed();
if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled<HttpResponseMessage>(cancellationToken);
}
HttpMessageHandler handler = _handler ?? SetupHandlerChain();
Exception? error = ValidateAndNormalizeRequest(request);
if (error != null)
{
return Task.FromException<HttpResponseMessage>(error);
}
return handler.SendAsync(request, cancellationToken);
}
//设置初始化handler:SetupHandlerChain
private HttpMessageHandlerStage SetupHandlerChain()
{
// Clone the settings to get a relatively consistent view that won't change after this point.
// (This isn't entirely complete, as some of the collections it contains aren't currently deeply cloned.)
HttpConnectionSettings settings = _settings.CloneAndNormalize();
HttpConnectionPoolManager poolManager = new HttpConnectionPoolManager(settings);
HttpMessageHandlerStage handler;
if (settings._credentials == null)
{
handler = new HttpConnectionHandler(poolManager);
}
else
{
handler = new HttpAuthenticatedConnectionHandler(poolManager);
}
// DiagnosticsHandler is inserted before RedirectHandler so that trace propagation is done on redirects as well
if (DiagnosticsHandler.IsGloballyEnabled() && settings._activityHeadersPropagator is DistributedContextPropagator propagator)
{
handler = new DiagnosticsHandler(handler, propagator, settings._allowAutoRedirect);
}
if (settings._allowAutoRedirect)
{
// Just as with WinHttpHandler, for security reasons, we do not support authentication on redirects
// if the credential is anything other than a CredentialCache.
// We allow credentials in a CredentialCache since they are specifically tied to URIs.
HttpMessageHandlerStage redirectHandler =
(settings._credentials == null || settings._credentials is CredentialCache) ?
handler :
new HttpConnectionHandler(poolManager); // will not authenticate
handler = new RedirectHandler(settings._maxAutomaticRedirections, handler, redirectHandler);
}
if (settings._automaticDecompression != DecompressionMethods.None)
{
handler = new DecompressionHandler(settings._automaticDecompression, handler);
}
// Ensure a single handler is used for all requests.
if (Interlocked.CompareExchange(ref _handler, handler, null) != null)
{
handler.Dispose();
}
return _handler;
}
从上面可以看到根据HttpConnectionSettings._credentials 的配置进行不同的初始化
HttpConnectionSettings settings = _settings.CloneAndNormalize();
HttpConnectionPoolManager poolManager = new HttpConnectionPoolManager(settings);
HttpMessageHandlerStage handler;
if (settings._credentials == null)
{
handler = new HttpConnectionHandler(poolManager);
}
else
{
handler = new HttpAuthenticatedConnectionHandler(poolManager);
}
关注HttpConnectionHandler和HttpAuthenticatedConnectionHandler,均依赖HttpConnectionPoolManager
internal sealed class HttpConnectionHandler : HttpMessageHandlerStage
{
private readonly HttpConnectionPoolManager _poolManager;
public HttpConnectionHandler(HttpConnectionPoolManager poolManager)
{
_poolManager = poolManager;
}
internal override ValueTask<HttpResponseMessage> SendAsync(HttpRequestMessage request, bool async, CancellationToken cancellationToken)
{
return _poolManager.SendAsync(request, async, doRequestAuth: false, cancellationToken);
}
protected override void Dispose(bool disposing)
{
if (disposing)
{
_poolManager.Dispose();
}
base.Dispose(disposing);
}
}
可见HttpConnectionPoolManager 维护了一个链接池。
public HttpConnectionPoolManager(HttpConnectionSettings settings)
{
_settings = settings;
_pools = new ConcurrentDictionary<HttpConnectionKey, HttpConnectionPool>();
管理链接的部分:
public ValueTask<HttpResponseMessage> SendAsyncCore(HttpRequestMessage request, Uri? proxyUri, bool async, bool doRequestAuth, bool isProxyConnect, CancellationToken cancellationToken)
{
HttpConnectionKey key = GetConnectionKey(request, proxyUri, isProxyConnect);
HttpConnectionPool? pool;
while (!_pools.TryGetValue(key, out pool))
{
pool = new HttpConnectionPool(this, key.Kind, key.Host, key.Port, key.SslHostName, key.ProxyUri);
if (_cleaningTimer == null)
{
// There's no cleaning timer, which means we're not adding connections into pools, but we still need
// the pool object for this request. We don't need or want to add the pool to the pools, though,
// since we don't want it to sit there forever, which it would without the cleaning timer.
break;
}
if (_pools.TryAdd(key, pool))
{
// We need to ensure the cleanup timer is running if it isn't
// already now that we added a new connection pool.
lock (SyncObj)
{
if (!_timerIsRunning)
{
SetCleaningTimer(_cleanPoolTimeout);
}
}
break;
}
// We created a pool and tried to add it to our pools, but some other thread got there before us.
// We don't need to Dispose the pool, as that's only needed when it contains connections
// that need to be closed.
}
return pool.SendAsync(request, async, doRequestAuth, cancellationToken);
}
可以看到由HttpConnectionPool发起SendAsync调用
https://source.dot.net/#System.Net.Http/System/Net/Http/SocketsHttpHandler/HttpConnectionPool.cs,155362accc97d7ca
public HttpConnectionPool(HttpConnectionPoolManager poolManager, HttpConnectionKind kind, string? host, int port, string? sslHostName, Uri? proxyUri)
{
_poolManager = poolManager;
_kind = kind;
_proxyUri = proxyUri;
_maxHttp11Connections = Settings._maxConnectionsPerServer;
if (host != null)
{
_originAuthority = new HttpAuthority(host, port);
}
_http2Enabled = _poolManager.Settings._maxHttpVersion >= HttpVersion.Version20;
if (IsHttp3Supported())
{
_http3Enabled = _poolManager.Settings._maxHttpVersion >= HttpVersion.Version30 && (_poolManager.Settings._quicImplementationProvider ?? QuicImplementationProviders.Default).IsSupported;
}
switch (kind)
{
case HttpConnectionKind.Http:
Debug.Assert(host != null);
Debug.Assert(port != 0);
Debug.Assert(sslHostName == null);
Debug.Assert(proxyUri == null);
_http3Enabled = false;
break;
case HttpConnectionKind.Https:
Debug.Assert(host != null);
Debug.Assert(port != 0);
Debug.Assert(sslHostName != null);
Debug.Assert(proxyUri == null);
break;
case HttpConnectionKind.Proxy:
Debug.Assert(host == null);
Debug.Assert(port == 0);
Debug.Assert(sslHostName == null);
Debug.Assert(proxyUri != null);
_http2Enabled = false;
_http3Enabled = false;
break;
case HttpConnectionKind.ProxyTunnel:
Debug.Assert(host != null);
Debug.Assert(port != 0);
Debug.Assert(sslHostName == null);
Debug.Assert(proxyUri != null);
_http2Enabled = false;
_http3Enabled = false;
break;
case HttpConnectionKind.SslProxyTunnel:
Debug.Assert(host != null);
Debug.Assert(port != 0);
Debug.Assert(sslHostName != null);
Debug.Assert(proxyUri != null);
_http3Enabled = false; // TODO: how do we tunnel HTTP3?
break;
case HttpConnectionKind.ProxyConnect:
Debug.Assert(host != null);
Debug.Assert(port != 0);
Debug.Assert(sslHostName == null);
Debug.Assert(proxyUri != null);
// Don't enforce the max connections limit on proxy tunnels; this would mean that connections to different origin servers
// would compete for the same limited number of connections.
// We will still enforce this limit on the user of the tunnel (i.e. ProxyTunnel or SslProxyTunnel).
_maxHttp11Connections = int.MaxValue;
_http2Enabled = false;
_http3Enabled = false;
break;
case HttpConnectionKind.SocksTunnel:
case HttpConnectionKind.SslSocksTunnel:
Debug.Assert(host != null);
Debug.Assert(port != 0);
Debug.Assert(proxyUri != null);
_http3Enabled = false; // TODO: SOCKS supports UDP and may be used for HTTP3
break;
default:
Debug.Fail("Unknown HttpConnectionKind in HttpConnectionPool.ctor");
break;
}
if (!_http3Enabled)
{
// Avoid parsing Alt-Svc headers if they won't be used.
_altSvcEnabled = false;
}
string? hostHeader = null;
if (_originAuthority != null)
{
// Precalculate ASCII bytes for Host header
// Note that if _host is null, this is a (non-tunneled) proxy connection, and we can't cache the hostname.
hostHeader =
(_originAuthority.Port != (sslHostName == null ? DefaultHttpPort : DefaultHttpsPort)) ?
$"{_originAuthority.IdnHost}:{_originAuthority.Port}" :
_originAuthority.IdnHost;
// Note the IDN hostname should always be ASCII, since it's already been IDNA encoded.
_hostHeaderValueBytes = Encoding.ASCII.GetBytes(hostHeader);
Debug.Assert(Encoding.ASCII.GetString(_hostHeaderValueBytes) == hostHeader);
if (sslHostName == null)
{
_http2EncodedAuthorityHostHeader = HPackEncoder.EncodeLiteralHeaderFieldWithoutIndexingToAllocatedArray(H2StaticTable.Authority, hostHeader);
_http3EncodedAuthorityHostHeader = QPackEncoder.EncodeLiteralHeaderFieldWithStaticNameReferenceToArray(H3StaticTable.Authority, hostHeader);
}
}
if (sslHostName != null)
{
_sslOptionsHttp11 = ConstructSslOptions(poolManager, sslHostName);
_sslOptionsHttp11.ApplicationProtocols = null;
if (_http2Enabled)
{
_sslOptionsHttp2 = ConstructSslOptions(poolManager, sslHostName);
_sslOptionsHttp2.ApplicationProtocols = s_http2ApplicationProtocols;
_sslOptionsHttp2Only = ConstructSslOptions(poolManager, sslHostName);
_sslOptionsHttp2Only.ApplicationProtocols = s_http2OnlyApplicationProtocols;
// Note:
// The HTTP/2 specification states:
// "A deployment of HTTP/2 over TLS 1.2 MUST disable renegotiation.
// An endpoint MUST treat a TLS renegotiation as a connection error (Section 5.4.1)
// of type PROTOCOL_ERROR."
// which suggests we should do:
// _sslOptionsHttp2.AllowRenegotiation = false;
// However, if AllowRenegotiation is set to false, that will also prevent
// renegotation if the server denies the HTTP/2 request and causes a
// downgrade to HTTP/1.1, and the current APIs don't provide a mechanism
// by which AllowRenegotiation could be set back to true in that case.
// For now, if an HTTP/2 server erroneously issues a renegotiation, we'll
// allow it.
Debug.Assert(hostHeader != null);
_http2EncodedAuthorityHostHeader = HPackEncoder.EncodeLiteralHeaderFieldWithoutIndexingToAllocatedArray(H2StaticTable.Authority, hostHeader);
_http3EncodedAuthorityHostHeader = QPackEncoder.EncodeLiteralHeaderFieldWithStaticNameReferenceToArray(H3StaticTable.Authority, hostHeader);
}
if (IsHttp3Supported())
{
if (_http3Enabled)
{
_sslOptionsHttp3 = ConstructSslOptions(poolManager, sslHostName);
_sslOptionsHttp3.ApplicationProtocols = s_http3ApplicationProtocols;
}
}
}
// Set up for PreAuthenticate. Access to this cache is guarded by a lock on the cache itself.
if (_poolManager.Settings._preAuthenticate)
{
PreAuthCredentials = new CredentialCache();
}
if (NetEventSource.Log.IsEnabled()) Trace($"{this}");
}
发起调用
public ValueTask<HttpResponseMessage> SendAsync(HttpRequestMessage request, bool async, bool doRequestAuth, CancellationToken cancellationToken)
{
if (doRequestAuth && Settings._credentials != null)
{
return AuthenticationHelper.SendWithRequestAuthAsync(request, async, Settings._credentials, Settings._preAuthenticate, this, cancellationToken);
}
return SendWithProxyAuthAsync(request, async, doRequestAuth, cancellationToken);
}
public ValueTask<HttpResponseMessage> SendWithProxyAuthAsync(HttpRequestMessage request, bool async, bool doRequestAuth, CancellationToken cancellationToken)
{
if (DoProxyAuth && ProxyCredentials is not null)
{
return AuthenticationHelper.SendWithProxyAuthAsync(request, _proxyUri!, async, ProxyCredentials, doRequestAuth, this, cancellationToken);
}
return SendWithRetryAsync(request, async, doRequestAuth, cancellationToken);
}
public async ValueTask<HttpResponseMessage> SendWithRetryAsync(HttpRequestMessage request, bool async, bool doRequestAuth, CancellationToken cancellationToken)
{
int retryCount = 0;
while (true)
{
// Loop on connection failures (or other problems like version downgrade) and retry if possible.
try
{
return await SendAndProcessAltSvcAsync(request, async, doRequestAuth, cancellationToken).ConfigureAwait(false);
}
catch (HttpRequestException e) when (e.AllowRetry == RequestRetryType.RetryOnConnectionFailure)
{
Debug.Assert(retryCount >= 0 && retryCount <= MaxConnectionFailureRetries);
if (retryCount == MaxConnectionFailureRetries)
{
if (NetEventSource.Log.IsEnabled())
{
Trace($"MaxConnectionFailureRetries limit of {MaxConnectionFailureRetries} hit. Retryable request will not be retried. Exception: {e}");
}
throw;
}
retryCount++;
if (NetEventSource.Log.IsEnabled())
{
Trace($"Retry attempt {retryCount} after connection failure. Connection exception: {e}");
}
// Eat exception and try again.
}
catch (HttpRequestException e) when (e.AllowRetry == RequestRetryType.RetryOnLowerHttpVersion)
{
// Throw if fallback is not allowed by the version policy.
if (request.VersionPolicy != HttpVersionPolicy.RequestVersionOrLower)
{
throw new HttpRequestException(SR.Format(SR.net_http_requested_version_server_refused, request.Version, request.VersionPolicy), e);
}
if (NetEventSource.Log.IsEnabled())
{
Trace($"Retrying request because server requested version fallback: {e}");
}
// Eat exception and try again on a lower protocol version.
request.Version = HttpVersion.Version11;
}
catch (HttpRequestException e) when (e.AllowRetry == RequestRetryType.RetryOnStreamLimitReached)
{
if (NetEventSource.Log.IsEnabled())
{
Trace($"Retrying request on another HTTP/2 connection after active streams limit is reached on existing one: {e}");
}
// Eat exception and try again.
}
}
}
private async ValueTask<HttpResponseMessage> SendAndProcessAltSvcAsync(HttpRequestMessage request, bool async, bool doRequestAuth, CancellationToken cancellationToken)
{
HttpResponseMessage response = await DetermineVersionAndSendAsync(request, async, doRequestAuth, cancellationToken).ConfigureAwait(false);
// Check for the Alt-Svc header, to upgrade to HTTP/3.
if (_altSvcEnabled && response.Headers.TryGetValues(KnownHeaders.AltSvc.Descriptor, out IEnumerable<string>? altSvcHeaderValues))
{
HandleAltSvc(altSvcHeaderValues, response.Headers.Age);
}
return response;
}
private async ValueTask<HttpResponseMessage> DetermineVersionAndSendAsync(HttpRequestMessage request, bool async, bool doRequestAuth, CancellationToken cancellationToken)
{
HttpResponseMessage? response;
if (IsHttp3Supported())
{
response = await TrySendUsingHttp3Async(request, async, doRequestAuth, cancellationToken).ConfigureAwait(false);
if (response is not null)
{
return response;
}
}
// We cannot use HTTP/3. Do not continue if downgrade is not allowed.
if (request.Version.Major >= 3 && request.VersionPolicy != HttpVersionPolicy.RequestVersionOrLower)
{
throw GetVersionException(request, 3);
}
response = await TrySendUsingHttp2Async(request, async, doRequestAuth, cancellationToken).ConfigureAwait(false);
if (response is not null)
{
return response;
}
// We cannot use HTTP/2. Do not continue if downgrade is not allowed.
if (request.Version.Major >= 2 && request.VersionPolicy != HttpVersionPolicy.RequestVersionOrLower)
{
throw GetVersionException(request, 2);
}
return await SendUsingHttp11Async(request, async, doRequestAuth, cancellationToken).ConfigureAwait(false);
}
[SupportedOSPlatform("windows")]
[SupportedOSPlatform("linux")]
[SupportedOSPlatform("macos")]
private async ValueTask<Http3Connection> GetHttp3ConnectionAsync(HttpRequestMessage request, HttpAuthority authority, CancellationToken cancellationToken)
{
Debug.Assert(_kind == HttpConnectionKind.Https);
Debug.Assert(_http3Enabled == true);
Http3Connection? http3Connection = Volatile.Read(ref _http3Connection);
if (http3Connection != null)
{
if (CheckExpirationOnGet(http3Connection) || http3Connection.Authority != authority)
{
// Connection expired.
if (NetEventSource.Log.IsEnabled()) http3Connection.Trace("Found expired HTTP3 connection.");
http3Connection.Dispose();
InvalidateHttp3Connection(http3Connection);
}
else
{
// Connection exists and it is still good to use.
if (NetEventSource.Log.IsEnabled()) Trace("Using existing HTTP3 connection.");
_usedSinceLastCleanup = true;
return http3Connection;
}
}
// Ensure that the connection creation semaphore is created
if (_http3ConnectionCreateLock == null)
{
lock (SyncObj)
{
if (_http3ConnectionCreateLock == null)
{
_http3ConnectionCreateLock = new SemaphoreSlim(1);
}
}
}
await _http3ConnectionCreateLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
if (_http3Connection != null)
{
// Someone beat us to creating the connection.
if (NetEventSource.Log.IsEnabled())
{
Trace("Using existing HTTP3 connection.");
}
return _http3Connection;
}
if (NetEventSource.Log.IsEnabled())
{
Trace("Attempting new HTTP3 connection.");
}
QuicConnection quicConnection;
try
{
quicConnection = await ConnectHelper.ConnectQuicAsync(request, Settings._quicImplementationProvider ?? QuicImplementationProviders.Default, new DnsEndPoint(authority.IdnHost, authority.Port), _sslOptionsHttp3!, cancellationToken).ConfigureAwait(false);
}
catch
{
// Disables HTTP/3 until server announces it can handle it via Alt-Svc.
BlocklistAuthority(authority);
throw;
}
//TODO: NegotiatedApplicationProtocol not yet implemented.
#if false
if (quicConnection.NegotiatedApplicationProtocol != SslApplicationProtocol.Http3)
{
BlocklistAuthority(authority);
throw new HttpRequestException("QUIC connected but no HTTP/3 indicated via ALPN.", null, RequestRetryType.RetryOnSameOrNextProxy);
}
#endif
http3Connection = new Http3Connection(this, _originAuthority, authority, quicConnection);
_http3Connection = http3Connection;
if (NetEventSource.Log.IsEnabled())
{
Trace("New HTTP3 connection established.");
}
return http3Connection;
}
finally
{
_http3ConnectionCreateLock.Release();
}
}
标签:cancellationToken,Netcore,handler,request,Assert,源码,Debug,null,HttpClient 来源: https://www.cnblogs.com/fancunwei/p/15126469.html