Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions aspnetcore/signalr/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,11 @@ The following table describes options for configuring SignalR hubs:
| `EnableDetailedErrors` | `false` | If `true`, detailed exception messages are returned to clients when an exception is thrown in a Hub method. The default is `false` because these exception messages can contain sensitive information. |
| `StreamBufferCapacity` | `10` | The maximum number of items that can be buffered for client upload streams. If this limit is reached, the processing of invocations is blocked until the server processes stream items.|
| `MaximumReceiveMessageSize` | 32 KB | Maximum size of a single incoming hub message. Increasing the value might increase the risk of [Denial of service (DoS) attacks](https://developer.mozilla.org/docs/Glossary/DOS_attack). |
| `MaximumParallelInvocationsPerClient` | 1 | The maximum number of hub methods that each client can call in parallel before queueing. |
| `DisableImplicitFromServicesParameters` | `false` | Hub method arguments are resolved from DI if possible. |
| `MaximumParallelInvocationsPerClient` | 1 | The maximum number of hub methods that each client can call in parallel before queueing. This limit does not apply to streaming hub invocations. |
| `DisableImplicitFromServicesParameters` | `false` | Hub method arguments will be resolved from DI if possible. |

> [!NOTE]
> `MaximumParallelInvocationsPerClient` does not apply to streaming hub invocations. Streaming invocations are expected to be long-running and can run concurrently. Use [hub filters](xref:signalr/hub-filters) to enforce per-connection streaming concurrency limits.

Options can be configured for all hubs by providing an options delegate to the `AddSignalR` call in `Program.cs`.

Expand Down
5 changes: 4 additions & 1 deletion aspnetcore/signalr/configuration/includes/configuration5.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ The following table describes options for configuring SignalR hubs:
| `EnableDetailedErrors` | `false` | If `true`, detailed exception messages are returned to clients when an exception is thrown in a Hub method. The default is `false` because these exception messages can contain sensitive information. |
| `StreamBufferCapacity` | `10` | The maximum number of items that can be buffered for client upload streams. If this limit is reached, the processing of invocations is blocked until the server processes stream items.|
| `MaximumReceiveMessageSize` | 32 KB | Maximum size of a single incoming hub message. Increasing the value may increase the risk of [Denial of service (DoS) attacks](https://developer.mozilla.org/docs/Glossary/DOS_attack). |
| `MaximumParallelInvocationsPerClient` | 1 | The maximum number of hub methods that each client can call in parallel before queueing. |
| `MaximumParallelInvocationsPerClient` | 1 | The maximum number of hub methods that each client can call in parallel before queueing. This limit does not apply to streaming hub invocations. |

> [!NOTE]
> `MaximumParallelInvocationsPerClient` does not apply to streaming hub invocations. Streaming invocations are expected to be long-running and can run concurrently. Use [hub filters](xref:signalr/hub-filters) to enforce per-connection streaming concurrency limits.

Options can be configured for all hubs by providing an options delegate to the `AddSignalR` call in `Startup.ConfigureServices`.

Expand Down
5 changes: 4 additions & 1 deletion aspnetcore/signalr/configuration/includes/configuration6.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ The following table describes options for configuring SignalR hubs:
| `EnableDetailedErrors` | `false` | If `true`, detailed exception messages are returned to clients when an exception is thrown in a Hub method. The default is `false` because these exception messages can contain sensitive information. |
| `StreamBufferCapacity` | `10` | The maximum number of items that can be buffered for client upload streams. If this limit is reached, the processing of invocations is blocked until the server processes stream items.|
| `MaximumReceiveMessageSize` | 32 KB | Maximum size of a single incoming hub message. Increasing the value may increase the risk of [Denial of service (DoS) attacks](https://developer.mozilla.org/docs/Glossary/DOS_attack). |
| `MaximumParallelInvocationsPerClient` | 1 | The maximum number of hub methods that each client can call in parallel before queueing. |
| `MaximumParallelInvocationsPerClient` | 1 | The maximum number of hub methods that each client can call in parallel before queueing. This limit does not apply to streaming hub invocations. |

> [!NOTE]
> `MaximumParallelInvocationsPerClient` does not apply to streaming hub invocations. Streaming invocations are expected to be long-running and can run concurrently. Use [hub filters](xref:signalr/hub-filters) to enforce per-connection streaming concurrency limits.

Options can be configured for all hubs by providing an options delegate to the `AddSignalR` call in `Program.cs`.

Expand Down
5 changes: 4 additions & 1 deletion aspnetcore/signalr/configuration/includes/configuration7.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,12 @@ The following table describes options for configuring SignalR hubs:
| `EnableDetailedErrors` | `false` | If `true`, detailed exception messages are returned to clients when an exception is thrown in a Hub method. The default is `false` because these exception messages can contain sensitive information. |
| `StreamBufferCapacity` | `10` | The maximum number of items that can be buffered for client upload streams. If this limit is reached, the processing of invocations is blocked until the server processes stream items.|
| `MaximumReceiveMessageSize` | 32 KB | Maximum size of a single incoming hub message. Increasing the value may increase the risk of [Denial of service (DoS) attacks](https://developer.mozilla.org/docs/Glossary/DOS_attack). |
| `MaximumParallelInvocationsPerClient` | 1 | The maximum number of hub methods that each client can call in parallel before queueing. |
| `MaximumParallelInvocationsPerClient` | 1 | The maximum number of hub methods that each client can call in parallel before queueing. This limit does not apply to streaming hub invocations. |
| `DisableImplicitFromServicesParameters` | `false` | Hub method arguments will be resolved from DI if possible. |

> [!NOTE]
> `MaximumParallelInvocationsPerClient` does not apply to streaming hub invocations. Streaming invocations are expected to be long-running and can run concurrently. Use [hub filters](xref:signalr/hub-filters) to enforce per-connection streaming concurrency limits.

Options can be configured for all hubs by providing an options delegate to the `AddSignalR` call in `Program.cs`.

```csharp
Expand Down
119 changes: 119 additions & 0 deletions aspnetcore/signalr/hubs.md
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,125 @@ public class ChatHub : Hub
> [!NOTE]
> This feature makes use of <xref:Microsoft.Extensions.DependencyInjection.IServiceProviderIsService>, which is optionally implemented by DI implementations. If the app's DI container doesn't support this feature, injecting services into hub methods isn't supported.

## Limit per-connection streaming invocations
Copy link
Copy Markdown
Contributor

@wadepickett wadepickett May 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new section would need to move to after the sub section Keyed Services support in Dependancy Injection.
"Keyed services support" is conceptually a subtopic of "Inject services into a hub"

So the organizaiton would like like this:

## Inject services into a hub
(existing content)
### Keyed services support in Dependency Injection
(existing content)

## Limit per-connection streaming invocations
(new content)

## Handle events for a connection


`HubOptions.MaximumParallelInvocationsPerClient` controls non-streaming hub invocations only. It does not apply to streaming hub invocations. Streaming invocations are expected to be long-running and can run concurrently. To enforce a per-connection limit for streaming invocations, use a hub filter to track active stream-returning hub methods.

```csharp
using System.Collections.Concurrent;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we also need:
using System.Threading.Channels;


public class StreamConcurrencyFilter : IHubFilter
{
private readonly ConcurrentDictionary<string, int> _activeStreams = new();
private readonly int _maxConcurrentStreams;

public StreamConcurrencyFilter(int maxConcurrentStreams = 1)
{
_maxConcurrentStreams = maxConcurrentStreams;
}

public async ValueTask<object?> InvokeMethodAsync(
HubInvocationContext invocationContext,
Func<HubInvocationContext, ValueTask<object?>> next)
{
if (!IsStreamingInvocation(invocationContext.HubMethod.ReturnType))
{
return await next(invocationContext);
}

var connectionId = invocationContext.Context.ConnectionId;
if (connectionId is null)
{
return await next(invocationContext);
}

var activeStreams = _activeStreams.AddOrUpdate(
connectionId,
1,
(_, current) => current + 1);

if (activeStreams > _maxConcurrentStreams)
{
Decrement(connectionId);
throw new HubException($"The connection is limited to {_maxConcurrentStreams} concurrent streaming invocations.");
}

try
Copy link
Copy Markdown
Contributor

@wadepickett wadepickett May 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to look at this a few times, but I think the limit would never be honored.

The count is decremented the moment the stream starts delivering, not when the stream actually finishes.
Every stream increments the counter, then decrements it almost instantly as soon as the stream object is returned. So by the time a second stream request comes in, the first one has already decremented. The counter is back to zero even though the first stream is still actively running.

So it seems for example, we could start 100 concurrent streams even though there is a limit of 1, because the counter never stays elevated long enough to block as intended, so no actual concurrency control.

@BrennanConroy, what would be ther correct pattern to suggest and document here? Also, should this be using InvokeStreamAsync as opposed to InvokeMethodAsync?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BrennanConroy, could you weigh in on the correct pattern here for above? It is not clear to me what it should be.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My initial thought would have been code in the Hub itself to handle this: (pseudo code) e.g.

class MyHub : Hub
{
    public IAsyncEnumerable<string> Stream()
    {
        return WithLimit(GetStream());
    }

    private async IAsyncEnumerable<string> WithLimit(IAsyncEnumerable<string> originalStream)
    {
        // initial limit check to throw before wrapping if at limit
        if (...) throw;

        IncrementStreamCount();

        try
        {
            await foreach (var item in originalStream)
            {
                yield return item;
            }
        }
        finally
        {
            DecrementStreamCount();
        }
    }
}

The hub filter is an interesting idea, unfortunately you would need to use some reflection to achieve the same behavior:

Reflection IHubFilter Pseudo Code
public sealed class StreamConcurrencyFilter : IHubFilter
{
    private readonly ConcurrentDictionary<string, int> _activeStreams = new();
    private readonly int _maxConcurrentStreams;

    public StreamConcurrencyFilter(int maxConcurrentStreams = 1)
    {
        if (maxConcurrentStreams < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxConcurrentStreams));
        }

        _maxConcurrentStreams = maxConcurrentStreams;
    }

    public async ValueTask<object?> InvokeMethodAsync(
        HubInvocationContext invocationContext,
        Func<HubInvocationContext, ValueTask<object?>> next)
    {
        var streamItemType = GetStreamItemType(invocationContext.HubMethod.ReturnType);
        if (streamItemType is null)
        {
            return await next(invocationContext);
        }

        var connectionId = invocationContext.Context.ConnectionId;

        // Reserve a slot before invoking the method. If the cap is exceeded,
        // release the slot and fail the invocation so no stream is started.
        if (_activeStreams.AddOrUpdate(connectionId, 1, (_, current) => current + 1)
            > _maxConcurrentStreams)
        {
            Decrement(connectionId);
            throw new HubException(
                $"The connection is limited to {_maxConcurrentStreams} concurrent streaming invocations.");
        }

        object? result;
        try
        {
            result = await next(invocationContext);
        }
        catch
        {
            Decrement(connectionId);
            throw;
        }

        if (result is null)
        {
            Decrement(connectionId);
            return null;
        }

        // Wrap the result so the slot is released only after the client finishes
        // consuming the stream (or the connection is aborted).
        return WrapMethod
            .MakeGenericMethod(streamItemType)
            .Invoke(this, new[] { result, connectionId });
    }

    private static readonly MethodInfo WrapMethod = typeof(StreamConcurrencyFilter)
        .GetMethod(nameof(Wrap), BindingFlags.Instance | BindingFlags.NonPublic)!;

    private object Wrap<T>(object result, string connectionId)
    {
        return result switch
        {
            IAsyncEnumerable<T> asyncEnumerable => TrackAsyncEnumerable(asyncEnumerable, connectionId),
            ChannelReader<T> channelReader => TrackChannelReader(channelReader, connectionId),
            _ => ReleaseAndReturn(result, connectionId),
        };
    }

    private async IAsyncEnumerable<T> TrackAsyncEnumerable<T>(
        IAsyncEnumerable<T> source,
        string connectionId,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        try
        {
            await foreach (var item in source.WithCancellation(cancellationToken))
            {
                yield return item;
            }
        }
        finally
        {
            Decrement(connectionId);
        }
    }

    private ChannelReader<T> TrackChannelReader<T>(ChannelReader<T> source, string connectionId)
    {
        var channel = Channel.CreateUnbounded<T>(new UnboundedChannelOptions
        {
            SingleReader = true,
            SingleWriter = true,
        });

        _ = Task.Run(async () =>
        {
            try
            {
                await foreach (var item in source.ReadAllAsync())
                {
                    await channel.Writer.WriteAsync(item);
                }

                channel.Writer.TryComplete();
            }
            catch (Exception ex)
            {
                channel.Writer.TryComplete(ex);
            }
            finally
            {
                Decrement(connectionId);
            }
        });

        return channel.Reader;
    }

    private object ReleaseAndReturn(object result, string connectionId)
    {
        Decrement(connectionId);
        return result;
    }

    private static Type? GetStreamItemType(Type returnType)
    {
        if (returnType is null)
        {
            return null;
        }

        if (returnType.IsGenericType)
        {
            var def = returnType.GetGenericTypeDefinition();
            if (def == typeof(IAsyncEnumerable<>) || def == typeof(ChannelReader<>))
            {
                return returnType.GetGenericArguments()[0];
            }

            if (def == typeof(Task<>) || def == typeof(ValueTask<>))
            {
                return GetStreamItemType(returnType.GetGenericArguments()[0]);
            }
        }

        return null;
    }

    private void Decrement(string connectionId)
    {
        while (_activeStreams.TryGetValue(connectionId, out var current))
        {
            if (current <= 1)
            {
                if (_activeStreams.TryRemove(new KeyValuePair<string, int>(connectionId, current)))
                {
                    return;
                }
            }
            else if (_activeStreams.TryUpdate(connectionId, current - 1, current))
            {
                return;
            }
        }
    }
}

{
return await next(invocationContext);
}
finally
{
Decrement(connectionId);
}
}

private static bool IsStreamingInvocation(Type returnType)
{
if (returnType is null)
{
return false;
}

if (returnType.IsGenericType)
{
var genericDefinition = returnType.GetGenericTypeDefinition();
if (genericDefinition == typeof(IAsyncEnumerable<>) ||
genericDefinition == typeof(ChannelReader<>))
{
return true;
}

if (genericDefinition == typeof(Task<>))
{
return IsStreamingInvocation(returnType.GetGenericArguments()[0]);
}
}

return false;
}

private void Decrement(string connectionId)
{
while (true)
{
if (!_activeStreams.TryGetValue(connectionId, out var current))
{
return;
}

if (current <= 1)
{
if (_activeStreams.TryRemove(connectionId, out _))
{
return;
}

continue;
}

if (_activeStreams.TryUpdate(connectionId, current - 1, current))
{
return;
}
}
}
}
```

Register the filter as a singleton so the active-stream count is shared across hub invocations for the same connection:

```csharp
builder.Services.AddSingleton(new StreamConcurrencyFilter(maxConcurrentStreams: 2));
builder.Services.AddSignalR(options =>
{
options.AddFilter<StreamConcurrencyFilter>();
});
```

> [!NOTE]
> This filter applies only to methods that return `IAsyncEnumerable<T>` or `ChannelReader<T>` and does not change the behavior of non-streaming hub methods.

### Keyed services support in Dependency Injection

*Keyed services* refers to a mechanism for registering and retrieving Dependency Injection (DI) services using keys. A service is associated with a key by calling <xref:Microsoft.Extensions.DependencyInjection.ServiceCollectionServiceExtensions.AddKeyedSingleton%2A> (or `AddKeyedScoped` or `AddKeyedTransient`) to register it. Access a registered service by specifying the key with the [`[FromKeyedServices]`](xref:Microsoft.Extensions.DependencyInjection.FromKeyedServicesAttribute) attribute. The following code shows how to use keyed services:
Expand Down
Loading