> 文档中心 > Asp.Net Core 7 preview 4 新特性--限流中间件

Asp.Net Core 7 preview 4 新特性--限流中间件


前言

限流是应对流量暴增或某些用户恶意攻击等场景的重要手段之一,然而微软官方从未支持这一重要特性,AspNetCoreRateLimit这一第三方库限流库一般作为首选使用,然而其配置参数过于繁多,对使用者造成较大的学习成本。令人高兴的是,在刚刚发布的.NET 7 Preview 4中开始支持限流中间件。

UseRateLimiter尝鲜

  1. 安装.NET 7.0 SDK(v7.0.100-preview.4)
  2. 通过nuget包安装Microsoft.AspNetCore.RateLimiting
  3. 创建.Net7网站应用,注册中间件

全局限流并发1个

app.UseRateLimiter(new RateLimiterOptions{    Limiter = PartitionedRateLimiter.Create(resource =>    { return RateLimitPartition.CreateConcurrencyLimiter("MyLimiter",     _ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));    })});

根据不同资源不同限制并发数/api前缀的资源租约数2,等待队列长度为2,其他默认租约数1,队列长度1。

app.UseRateLimiter(new RateLimiterOptions(){    // 触发限流的响应码    DefaultRejectionStatusCode = 500,    OnRejected = async (ctx, rateLimitLease) =>    { // 触发限流回调处理    },    Limiter = PartitionedRateLimiter.Create(resource =>    { if (resource.Request.Path.StartsWithSegments("/api")) {     return RateLimitPartition.CreateConcurrencyLimiter("WebApiLimiter",  _ => new ConcurrencyLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2)); } else {     return RateLimitPartition.CreateConcurrencyLimiter("DefaultLimiter",  _ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1)); }    })});

本地测试

  1. 新建一个webapi项目,并注册限流中间件如下
using Microsoft.AspNetCore.RateLimiting;using System.Threading.RateLimiting;var builder = WebApplication.CreateBuilder(args);// Add services to the container.builder.Services.AddControllers();// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbucklebuilder.Services.AddEndpointsApiExplorer();builder.Services.AddSwaggerGen();var app = builder.Build();// Configure the HTTP request pipeline.if (app.Environment.IsDevelopment()){    app.UseSwagger();    app.UseSwaggerUI();}app.UseRateLimiter(new RateLimiterOptions{    DefaultRejectionStatusCode = 500,    OnRejected = async (ctx, lease) =>    { await Task.FromResult(ctx.Response.WriteAsync("ConcurrencyLimiter"));    },    Limiter = PartitionedRateLimiter.Create(resource =>    { return RateLimitPartition.CreateConcurrencyLimiter("MyLimiter",     _ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));    })});app.UseHttpsRedirection();app.UseAuthorization();app.MapControllers();app.Run();
  1. 启动项目,使用jmeter测试100并发,请求接口/WeatherForecast
    Asp.Net Core 7 preview 4 新特性--限流中间件
    所有请求处理成功,失败0!

这个结果是不是有点失望,其实RateLimitPartition.CreateConcurrencyLimiter创建的限流器是
ConcurrencyLimiter,后续可以实现个各种策略的限流器进行替换之。
看了ConcurrencyLimiter的实现,其实就是令牌的限流思想,上面配置的new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1)),第一个1代表令牌的个数,第二个1代表可以当桶里的令牌为空时,进入等待队列,而不是直接失败,当前面的请求结束后,会归还令牌,此时等待的请求就可以拿到令牌了,QueueProcessingOrder.NewestFirst代表最新的请求优先获取令牌,也就是获取令牌时非公平的,还有另一个枚举值QueueProcessingOrder.OldestFirst老的优先,获取令牌是公平的。只要我们获取到令牌的人干活速度快,虽然我们令牌只有1,并发就很高。
3. 测试触发失败场景
只需要让我们拿到令牌的人持有时间长点,就能轻易的触发。
Asp.Net Core 7 preview 4 新特性--限流中间件
调整jmater并发数为10
Asp.Net Core 7 preview 4 新特性--限流中间件

相应内容也是我们设置的内容。
Asp.Net Core 7 preview 4 新特性--限流中间件

ConcurrencyLimiter源码

令牌桶限流思想

获取令牌

 protected override RateLimitLease AcquireCore(int permitCount) {     // These amounts of resources can never be acquired     if (permitCount > _options.PermitLimit)     {  throw new ArgumentOutOfRangeException(nameof(permitCount), permitCount, SR.Format(SR.PermitLimitExceeded, permitCount, _options.PermitLimit));     }     ThrowIfDisposed();     // Return SuccessfulLease or FailedLease to indicate limiter state     if (permitCount == 0)     {  return _permitCount > 0 ? SuccessfulLease : FailedLease;     }     // Perf: Check SemaphoreSlim implementation instead of locking     if (_permitCount >= permitCount)     {  lock (Lock)  {      if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease))      {   return lease;      }  }     }     return FailedLease; }

尝试获取令牌核心逻辑

 private bool TryLeaseUnsynchronized(int permitCount, [NotNullWhen(true)] out RateLimitLease? lease) {     ThrowIfDisposed();     // if permitCount is 0 we want to queue it if there are no available permits     if (_permitCount >= permitCount && _permitCount != 0)     {  if (permitCount == 0)  {      // Edge case where the check before the lock showed 0 available permits but when we got the lock some permits were now available      lease = SuccessfulLease;      return true;  }  // a. if there are no items queued we can lease  // b. if there are items queued but the processing order is newest first, then we can lease the incoming request since it is the newest  if (_queueCount == 0 || (_queueCount > 0 && _options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst))  {      _idleSince = null;      _permitCount -= permitCount;      Debug.Assert(_permitCount >= 0);      lease = new ConcurrencyLease(true, this, permitCount);      return true;  }     }     lease = null;     return false; }

令牌获取失败后进入等待队列

 protected override ValueTask WaitAsyncCore(int permitCount, CancellationToken cancellationToken = default) {     // These amounts of resources can never be acquired     if (permitCount > _options.PermitLimit)     {  throw new ArgumentOutOfRangeException(nameof(permitCount), permitCount, SR.Format(SR.PermitLimitExceeded, permitCount, _options.PermitLimit));     }     // Return SuccessfulLease if requestedCount is 0 and resources are available     if (permitCount == 0 && _permitCount > 0 && !_disposed)     {  return new ValueTask(SuccessfulLease);     }     // Perf: Check SemaphoreSlim implementation instead of locking     lock (Lock)     {  if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease))  {      return new ValueTask(lease);  }  // Avoid integer overflow by using subtraction instead of addition  Debug.Assert(_options.QueueLimit >= _queueCount);  if (_options.QueueLimit - _queueCount < permitCount)  {      if (_options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst && permitCount = 0);if (!oldestRequest.Tcs.TrySetResult(FailedLease)){    // Updating queue count is handled by the cancellation code    _queueCount += oldestRequest.Count;}   }   while (_options.QueueLimit - _queueCount < permitCount);      }      else      {   // Don't queue if queue limit reached and QueueProcessingOrder is OldestFirst   return new ValueTask(QueueLimitLease);      }  }  CancelQueueState tcs = new CancelQueueState(permitCount, this, cancellationToken);  CancellationTokenRegistration ctr = default;  if (cancellationToken.CanBeCanceled)  {      ctr = cancellationToken.Register(static obj =>      {   ((CancelQueueState)obj!).TrySetCanceled();      }, tcs);  }  RequestRegistration request = new RequestRegistration(permitCount, tcs, ctr);  _queue.EnqueueTail(request);  _queueCount += permitCount;  Debug.Assert(_queueCount <= _options.QueueLimit);  return new ValueTask(request.Tcs.Task);     } }

归还令牌

 private void Release(int releaseCount) {     lock (Lock)     {  if (_disposed)  {      return;  }  _permitCount += releaseCount;  Debug.Assert(_permitCount  0)  {      RequestRegistration nextPendingRequest =   _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst   ? _queue.PeekHead()   : _queue.PeekTail();      if (_permitCount >= nextPendingRequest.Count)      {   nextPendingRequest =_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst? _queue.DequeueHead(): _queue.DequeueTail();   _permitCount -= nextPendingRequest.Count;   _queueCount -= nextPendingRequest.Count;   Debug.Assert(_permitCount >= 0);   ConcurrencyLease lease = nextPendingRequest.Count == 0 ? SuccessfulLease : new ConcurrencyLease(true, this, nextPendingRequest.Count);   // Check if request was canceled   if (!nextPendingRequest.Tcs.TrySetResult(lease))   {// Queued item was canceled so add count back_permitCount += nextPendingRequest.Count;// Updating queue count is handled by the cancellation code_queueCount += nextPendingRequest.Count;   }   nextPendingRequest.CancellationTokenRegistration.Dispose();   Debug.Assert(_queueCount >= 0);      }      else      {   break;      }  }  if (_permitCount == _options.PermitLimit)  {      Debug.Assert(_idleSince is null);      Debug.Assert(_queueCount == 0);      _idleSince = Stopwatch.GetTimestamp();  }     } }

总结

虽然这次官方对限流进行了支持,但貌似还不能支持对ip或client级别的限制支持,对于更高级的限流策略仍需要借助第三方库或自己实现,期待后续越来越完善。