using System.Collections.Concurrent;
using System.Reflection;
using MassTransit.Internals;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.VisualBasic.CompilerServices;
using Newtonsoft.Json;
using Server.PacketHandlers;
using Server.Packets;

namespace Server.Services;

// NOTE(review): in the reviewed copy of this file the generic type arguments on ConcurrentQueue,
// Dictionary and GetCustomAttribute (and possibly ILogger) appear to have been stripped; only
// `IPacketHandler<>` kept its brackets. Those tokens are reproduced exactly as seen because the
// key type and the attribute type cannot be derived from this file alone - confirm against the
// canonical source before merging.

/// <summary>
/// Hosted service that builds two lookup tables from the executing assembly (operation code to
/// packet type, and operation code to packet handler type) and dispatches every packet passed to
/// <see cref="AddPacket"/> to the matching handler on a background task.
/// </summary>
public class PacketDistributorService : IHostedService
{
    /// <summary>Name of the handler method that is looked up and invoked through reflection.</summary>
    private const string HandlerMethodName = "HandleAsync";

    private readonly ConcurrentQueue _concurrentQueue;
    private readonly ILogger _logger;
    private readonly Dictionary _packetsTypes;
    private readonly Dictionary _packetHandlers;
    private readonly IServiceProvider _serviceProvider;

    /// <summary>
    /// Creates the service and eagerly scans the executing assembly for packets and packet handlers.
    /// </summary>
    /// <param name="logger">Logger used for all diagnostics of this service.</param>
    /// <param name="serviceProvider">Provider used to resolve or create packet handler instances.</param>
    /// <exception cref="IncompleteInitialization">No packet or no packet handler type was found.</exception>
    public PacketDistributorService(ILogger logger, IServiceProvider serviceProvider)
    {
        this._concurrentQueue = new ConcurrentQueue();
        this._logger = logger;
        this._serviceProvider = serviceProvider;

        // The lookup tables are assigned exactly once; no throw-away placeholder instances are created.
        var executingAssembly = Assembly.GetExecutingAssembly();
        this._packetsTypes = this.GetPacketsWithId(executingAssembly);
        this._packetHandlers = this.GetAllPacketHandlersWithId(executingAssembly);
    }

    /// <summary>
    /// Collects every non-interface type that carries the custom attribute and implements
    /// <c>IPacket</c>, keyed by the attribute's <c>Code</c>.
    /// </summary>
    /// <param name="executingAssembly">Assembly whose types are scanned.</param>
    /// <returns>The packet types keyed by their code.</returns>
    /// <exception cref="IncompleteInitialization">The assembly contains no matching type.</exception>
    private Dictionary GetPacketsWithId(Assembly executingAssembly)
    {
        var packetsWithId = executingAssembly.GetTypes().AsParallel()
            .Where(type => type.GetCustomAttribute() != null &&
                           type.HasInterface(typeof(IPacket)) &&
                           !type.IsInterface)
            .ToDictionary(packet => packet.GetCustomAttribute()!.Code);

        if (packetsWithId.Count == 0)
        {
            this._logger.LogCritical("No Packets have been found");
            throw new IncompleteInitialization();
        }

        // Plain loop: writing a few trace lines does not need a parallel query.
        foreach (var packet in packetsWithId)
        {
            this._logger.LogTrace("Packet with ID: {PacketID} has been added as {PacketName}", packet.Key,
                packet.Value.FullName);
        }

        return packetsWithId;
    }

    /// <summary>
    /// Collects every concrete class that implements <c>IPacketHandler&lt;&gt;</c>, keyed by the
    /// <c>Code</c> of the attribute found on the handled packet type (the first generic argument).
    /// </summary>
    /// <param name="assembly">Assembly whose types are scanned.</param>
    /// <returns>The packet handler types keyed by the code of the packet they handle.</returns>
    /// <exception cref="IncompleteInitialization">The assembly contains no matching type.</exception>
    private Dictionary GetAllPacketHandlersWithId(Assembly assembly)
    {
        // NOTE(review): unlike GetPacketsWithId, nothing filters out handled packet types that lack
        // the attribute, so such a handler makes this key selector fail during start-up.
        var packetHandlersWithId = assembly.GetTypes().AsParallel()
            .Where(type => type is { IsClass: true, IsAbstract: false } &&
                           FindPacketHandlerInterface(type) is not null)
            .ToDictionary(type => FindPacketHandlerInterface(type)!
                .GetGenericArguments()[0].GetCustomAttribute().Code);

        if (packetHandlersWithId.Count == 0)
        {
            this._logger.LogCritical("No PacketHandlers have been found");
            throw new IncompleteInitialization();
        }

        foreach (var packetHandler in packetHandlersWithId)
        {
            this._logger.LogTrace("PacketHandler with ID: {PacketID} has been added as {PacketName}",
                packetHandler.Key, packetHandler.Value.FullName);
        }

        return packetHandlersWithId;
    }

    /// <summary>
    /// Returns the first closed <c>IPacketHandler&lt;&gt;</c> interface implemented by
    /// <paramref name="type"/>, or <c>null</c> when the type implements none.
    /// </summary>
    private static Type? FindPacketHandlerInterface(Type type) =>
        type.GetInterfaces()
            .FirstOrDefault(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IPacketHandler<>));

    /// <summary>Nothing to start; packets are dispatched as they are added.</summary>
    public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;

    /// <summary>
    /// Queues <paramref name="rawPacket"/> and schedules a background task that dequeues and handles
    /// one packet.
    /// </summary>
    /// <param name="rawPacket">The received packet.</param>
    public void AddPacket(RawPacket rawPacket)
    {
        // Logged before scheduling so this line cannot appear after the handler's own trace output.
        this._logger.LogInformation("Packet with ID: {MessageOperationCode} has been received",
            rawPacket.OperationCode);

        this._concurrentQueue.Enqueue(rawPacket);

        // Deliberate fire-and-forget: DequeueRawPacketAsync catches and logs its own failures.
        _ = Task.Run(() => this.DequeueRawPacketAsync());
    }

    /// <summary>
    /// Takes one packet from the queue, if any, and handles it. Failures are logged here because no
    /// caller observes the task this method runs on.
    /// </summary>
    private async Task DequeueRawPacketAsync()
    {
        if (!this._concurrentQueue.TryDequeue(out var item))
        {
            // Nothing queued; there is no polling loop, so there is nothing to wait for either.
            return;
        }

        try
        {
            await this.InvokePacketHandlerAsync(item);
        }
        catch (Exception exception)
        {
            this._logger.LogError(exception, "Handling the packet with ID: {MessageOperationCode} has failed",
                item?.OperationCode);
        }
    }

    /// <summary>
    /// Deserializes <paramref name="item"/> into its packet type, resolves the matching handler and
    /// invokes its <c>HandleAsync</c> method with the packet and the session, awaiting the result
    /// when the method returns a <see cref="Task"/>.
    /// </summary>
    /// <param name="item">The dequeued packet; a <c>null</c> value is logged and skipped.</param>
    private async Task InvokePacketHandlerAsync(RawPacket? item)
    {
        if (item is null)
        {
            this._logger.LogWarning("A dequeued packet was null and has been skipped");
            return;
        }

        this._logger.LogTrace("[{TempId}] Packet with ID: {MessageOperationCode} is being dequeued",
            item.Session.Id, item.OperationCode);

        // Operation codes without a registered packet or handler are reported instead of
        // surfacing as a KeyNotFoundException from the dictionary indexer.
        if (!this._packetsTypes.TryGetValue(item.OperationCode, out var packetType))
        {
            this._logger.LogWarning("[{TempId}] No packet is registered for ID: {MessageOperationCode}",
                item.Session.Id, item.OperationCode);
            return;
        }

        if (!this._packetHandlers.TryGetValue(item.OperationCode, out var packetHandlerType))
        {
            this._logger.LogWarning("[{TempId}] No packet handler is registered for ID: {MessageOperationCode}",
                item.Session.Id, item.OperationCode);
            return;
        }

        var packet = (IPacket)Activator.CreateInstance(packetType)!;
        packet.Deserialize(item.MessageBody);

        var packetHandler = ActivatorUtilities.GetServiceOrCreateInstance(this._serviceProvider, packetHandlerType);
        var handleMethod = packetHandler.GetType().GetMethod(HandlerMethodName);
        if (handleMethod is null)
        {
            this._logger.LogError("[{TempId}] {PacketHandlerName} has no {HandlerMethodName} method",
                item.Session.Id, packetHandler.GetType().FullName, HandlerMethodName);
            return;
        }

        // Await the handler so its failures reach the caller's catch block and the "finished"
        // trace below is only written once the handler is really done.
        // NOTE(review): assumes HandleAsync returns Task; any other return type is not awaited.
        if (handleMethod.Invoke(packetHandler, new object[] { packet, item.Session }) is Task handling)
        {
            await handling;
        }

        // Serializing the packet is only worth the cost when the debug line is actually written.
        if (this._logger.IsEnabled(LogLevel.Debug))
        {
            this._logger.LogDebug("Packet data {PacketData}", JsonConvert.SerializeObject(packet));
        }

        this._logger.LogTrace("[{TempId}] Packet with ID: {MessageOperationCode} has finished",
            item.Session.Id, item.OperationCode);
    }

    /// <summary>Nothing to stop; no long-running loop is owned by this service.</summary>
    public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}