feat: switching to channels

This commit is contained in:
Timothy Schenk 2024-02-05 10:36:26 +01:00
parent 3615059a96
commit 1668f0f73d
Signed by: rainote
SSH key fingerprint: SHA256:pnkNSDwpAnaip00xaZlVFHKKsS7T8UtOomMzvs0yITE

View file

@ -4,6 +4,7 @@ using System.Collections.Concurrent;
using System.Collections.Immutable; using System.Collections.Immutable;
using System.Diagnostics; using System.Diagnostics;
using System.Reflection; using System.Reflection;
using System.Threading.Channels;
using DotNext.Collections.Generic; using DotNext.Collections.Generic;
using DotNext.Linq.Expressions; using DotNext.Linq.Expressions;
using DotNext.Metaprogramming; using DotNext.Metaprogramming;
@ -16,11 +17,10 @@ using static CodeGenerator;
public class PacketDistributorService<T, S> : Microsoft.Extensions.Hosting.IHostedService, IDisposable where T : Enum public class PacketDistributorService<T, S> : Microsoft.Extensions.Hosting.IHostedService, IDisposable where T : Enum
{ {
private readonly ConcurrentQueue<ValueTuple<byte[], T, S>> _concurrentQueue; private readonly Channel<ValueTuple<byte[], T, S>> _channel;
private readonly IServiceProvider _serviceProvider; private readonly IServiceProvider _serviceProvider;
private readonly Assembly[] _sourcesContainingPackets; private readonly Assembly[] _sourcesContainingPackets;
private readonly TaskFactory _taskFactory;
private ImmutableDictionary<T, private ImmutableDictionary<T,
Func<byte[], IIncomingPacket>> _deserializationMap; Func<byte[], IIncomingPacket>> _deserializationMap;
@ -31,11 +31,10 @@ public class PacketDistributorService<T, S> : Microsoft.Extensions.Hosting.IHost
public PacketDistributorService(IServiceProvider serviceProvider, public PacketDistributorService(IServiceProvider serviceProvider,
Assembly[] sourcesContainingPackets) Assembly[] sourcesContainingPackets)
{ {
_concurrentQueue = new ConcurrentQueue<ValueTuple<byte[], T, S>>(); _channel = Channel.CreateUnbounded<ValueTuple<byte[], T, S>>();
_serviceProvider = serviceProvider; _serviceProvider = serviceProvider;
_sourcesContainingPackets = sourcesContainingPackets; _sourcesContainingPackets = sourcesContainingPackets;
_activitySource = new ActivitySource(nameof(PacketMediator)); _activitySource = new ActivitySource(nameof(PacketMediator));
_taskFactory = new TaskFactory();
} }
private Dictionary<T, Type> RetrievePacketsDictionary() private Dictionary<T, Type> RetrievePacketsDictionary()
@ -126,20 +125,24 @@ public class PacketDistributorService<T, S> : Microsoft.Extensions.Hosting.IHost
} }
_deserializationMap = tempDeserializationMap.ToImmutableDictionary(); _deserializationMap = tempDeserializationMap.ToImmutableDictionary();
_ = this.DequeueRawPacketAsync();
return Task.CompletedTask; return Task.CompletedTask;
} }
public async Task AddPacketAsync(byte[] packetData, T operationCode, S session) public async Task AddPacketAsync(byte[] packetData, T operationCode, S session)
{ {
_concurrentQueue.Enqueue((packetData, operationCode, session)); await _channel.Writer.WriteAsync((packetData, operationCode, session));
await DequeueRawPacketAsync();
} }
private async Task DequeueRawPacketAsync() private async Task DequeueRawPacketAsync()
{ {
if (_concurrentQueue.TryDequeue(out var item)) while (await _channel.Reader.WaitToReadAsync())
{ {
await InvokePacketHandlerAsync(item); while (_channel.Reader.TryRead(out var item))
{
await InvokePacketHandlerAsync(item);
}
} }
} }