continuity/Server/PacketDistributorService.cs
2023-08-09 16:23:41 +02:00

74 lines
No EOL
2.8 KiB
C#

using System.Collections.Concurrent;
using System.Reflection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace Server;
/// <summary>
/// Hosted service that queues incoming <see cref="Packet"/> instances and hands each one to the
/// <see cref="IPacketHandler"/> registered for its operation code.
/// </summary>
/// <remarks>
/// Handlers are discovered once, in the constructor, by scanning the executing assembly for
/// non-abstract types that implement <see cref="IPacketHandler"/> and carry the
/// <see cref="PacketHandler"/> attribute. Queued packets are drained by a single background
/// loop, so packets are handled one at a time in the order they were dequeued.
/// </remarks>
public class PacketDistributorService : IHostedService
{
    /// <summary>How long the consumer loop waits before polling again when the queue is empty.</summary>
    private static readonly TimeSpan IdlePollInterval = TimeSpan.FromMilliseconds(100);

    private readonly ConcurrentQueue<Packet> _concurrentQueue;
    private readonly ILogger<PacketDistributorService> _logger;
    private readonly CancellationTokenSource _cancellationTokenSource = new();
    private readonly Dictionary<OperationCode, IPacketHandler> _actions;

    /// <summary>Guards the one-time creation of <see cref="_worker"/>.</summary>
    private readonly object _workerGate = new();

    /// <summary>The single consumer loop; <c>null</c> until the first start request.</summary>
    private Task? _worker;

    /// <summary>
    /// Creates the service and builds the operation-code to handler table via reflection.
    /// </summary>
    /// <param name="logger">Logger used for registration and per-packet diagnostics.</param>
    /// <exception cref="InvalidOperationException">
    /// A discovered handler type has no public parameterless constructor.
    /// </exception>
    /// <exception cref="ArgumentException">
    /// Two handler types declare the same operation code.
    /// </exception>
    public PacketDistributorService(ILogger<PacketDistributorService> logger)
    {
        _concurrentQueue = new ConcurrentQueue<Packet>();
        _logger = logger;
        _actions = new Dictionary<OperationCode, IPacketHandler>();

        // Abstract types (and interfaces) cannot be instantiated, so they are excluded up front.
        var handlerTypes = Assembly.GetExecutingAssembly().GetTypes().Where(t =>
            !t.IsAbstract &&
            t.GetInterfaces().Contains(typeof(IPacketHandler)) &&
            t.GetCustomAttributes().Any(attribute => attribute.GetType() == typeof(PacketHandler)));

        foreach (var type in handlerTypes)
        {
            PacketHandler packetHandler = type.GetCustomAttribute<PacketHandler>()!;

            // Fail with a descriptive error instead of a NullReferenceException (missing
            // constructor) or a null entry in the handler table (failed cast).
            if (type.GetConstructor(Type.EmptyTypes)?.Invoke(null) is not IPacketHandler handler)
            {
                throw new InvalidOperationException(
                    $"Packet handler \"{type.FullName}\" must expose a public parameterless constructor.");
            }

            _actions.Add(packetHandler.Code, handler);
            _logger.LogInformation(
                "Handler for ID \"{PacketHandlerCode}\" has been added with name \"{PacketHandlerName}\"",
                packetHandler.Code, type.Name);
        }
    }

    /// <summary>Starts the background consumer loop.</summary>
    /// <param name="cancellationToken">Not used; starting the loop does not block.</param>
    /// <returns>A completed task.</returns>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        EnsureWorkerStarted();
        return Task.CompletedTask;
    }

    /// <summary>Queues a packet for processing by the background consumer loop.</summary>
    /// <param name="packet">The packet to enqueue.</param>
    /// <returns>
    /// A task that is already complete once the packet has been queued; it does not represent
    /// the processing of the packet.
    /// </returns>
    public Task AddPacket(Packet packet)
    {
        _logger.LogInformation("Packet with ID: {MessageOperationCode} has been received",
            packet.OperationCode);
        _concurrentQueue.Enqueue(packet);

        // Callers that never went through StartAsync still get their packets processed, but
        // only one loop is ever created instead of one additional polling loop per packet.
        EnsureWorkerStarted();
        return Task.CompletedTask;
    }

    /// <summary>Creates the consumer loop on first use; later calls are no-ops.</summary>
    private void EnsureWorkerStarted()
    {
        lock (_workerGate)
        {
            _worker ??= Task.Run(DequeueAndProcessAsync);
        }
    }

    /// <summary>
    /// Drains the queue until cancellation is requested, waiting
    /// <see cref="IdlePollInterval"/> between polls while the queue is empty.
    /// </summary>
    private async Task DequeueAndProcessAsync()
    {
        var token = _cancellationTokenSource.Token;
        while (!token.IsCancellationRequested)
        {
            if (_concurrentQueue.TryDequeue(out var item))
            {
                ProcessPacket(item);
                continue;
            }

            try
            {
                // Delay to prevent busy-waiting; observing the token lets shutdown cut it short.
                await Task.Delay(IdlePollInterval, token);
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }
    }

    /// <summary>
    /// Looks up the handler for the packet's operation code and invokes it. Unknown codes and
    /// handler failures are logged so that one bad packet cannot terminate the consumer loop.
    /// </summary>
    /// <param name="item">The packet that was just dequeued.</param>
    private void ProcessPacket(Packet item)
    {
        // Correlation id tying together the log lines emitted for this packet.
        var tempId = Guid.NewGuid();
        _logger.LogInformation("[{TempId}] Packet with ID: {MessageOperationCode} is being dequeued",
            tempId, item.OperationCode);

        if (!_actions.TryGetValue(item.OperationCode, out var packetHandler))
        {
            _logger.LogWarning(
                "[{TempId}] No handler is registered for packet with ID: {MessageOperationCode}; packet dropped",
                tempId, item.OperationCode);
            return;
        }

        try
        {
            packetHandler.Process(item);
        }
        catch (Exception exception)
        {
            // Boundary of the worker loop: log and keep consuming the remaining packets.
            _logger.LogError(exception,
                "[{TempId}] Handler for packet with ID: {MessageOperationCode} has failed",
                tempId, item.OperationCode);
            return;
        }

        _logger.LogInformation("[{TempId}] Packet with ID: {MessageOperationCode} has finished",
            tempId, item.OperationCode);
    }

    /// <summary>
    /// Signals the consumer loop to stop and waits for it to finish.
    /// </summary>
    /// <param name="cancellationToken">
    /// When triggered, the wait is abandoned so shutdown is not held up by a slow handler.
    /// </param>
    /// <returns>A task that completes when the loop has exited or the wait was abandoned.</returns>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        _cancellationTokenSource.Cancel();

        Task? worker;
        lock (_workerGate)
        {
            worker = _worker;
        }

        if (worker is null)
        {
            return;
        }

        // WhenAny never throws for the delay's cancellation, so an expired shutdown deadline
        // simply ends the wait instead of surfacing an exception to the host.
        await Task.WhenAny(worker, Task.Delay(Timeout.Infinite, cancellationToken));
    }
}