diff --git a/Rai.PacketMediator/PacketDistributorService.cs b/Rai.PacketMediator/PacketDistributorService.cs index dae4797..89f2478 100644 --- a/Rai.PacketMediator/PacketDistributorService.cs +++ b/Rai.PacketMediator/PacketDistributorService.cs @@ -4,6 +4,7 @@ using System.Collections.Concurrent; using System.Collections.Immutable; using System.Diagnostics; using System.Reflection; +using System.Threading.Channels; using DotNext.Collections.Generic; using DotNext.Linq.Expressions; using DotNext.Metaprogramming; @@ -16,11 +17,10 @@ using static CodeGenerator; public class PacketDistributorService : Microsoft.Extensions.Hosting.IHostedService, IDisposable where T : Enum { - private readonly ConcurrentQueue> _concurrentQueue; + private readonly Channel> _channel; private readonly IServiceProvider _serviceProvider; private readonly Assembly[] _sourcesContainingPackets; - private readonly TaskFactory _taskFactory; private ImmutableDictionary> _deserializationMap; @@ -31,11 +31,10 @@ public class PacketDistributorService : Microsoft.Extensions.Hosting.IHost public PacketDistributorService(IServiceProvider serviceProvider, Assembly[] sourcesContainingPackets) { - _concurrentQueue = new ConcurrentQueue>(); + _channel = Channel.CreateUnbounded>(); _serviceProvider = serviceProvider; _sourcesContainingPackets = sourcesContainingPackets; _activitySource = new ActivitySource(nameof(PacketMediator)); - _taskFactory = new TaskFactory(); } private Dictionary RetrievePacketsDictionary() @@ -126,20 +125,24 @@ public class PacketDistributorService : Microsoft.Extensions.Hosting.IHost } _deserializationMap = tempDeserializationMap.ToImmutableDictionary(); + + _ = this.DequeueRawPacketAsync(); return Task.CompletedTask; } public async Task AddPacketAsync(byte[] packetData, T operationCode, S session) { - _concurrentQueue.Enqueue((packetData, operationCode, session)); - await DequeueRawPacketAsync(); + await _channel.Writer.WriteAsync((packetData, operationCode, session)); } private async Task DequeueRawPacketAsync() { - if (_concurrentQueue.TryDequeue(out var item)) + while (await _channel.Reader.WaitToReadAsync()) { - await InvokePacketHandlerAsync(item); + while (_channel.Reader.TryRead(out var item)) + { + await InvokePacketHandlerAsync(item); + } } }