using System.Collections.Concurrent; using System.Reflection; using System.Text; using MassTransit.Internals; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.VisualBasic.CompilerServices; using Newtonsoft.Json; using FieldAccessException = System.FieldAccessException; namespace Server; public class PacketDistributorService : IHostedService { private readonly ConcurrentQueue _concurrentQueue; private readonly ILogger _logger; private readonly Dictionary _packetsTypes; private readonly Dictionary _packetHandlers; private readonly IServiceProvider _serviceProvider; public PacketDistributorService(ILogger logger, IServiceProvider serviceProvider) { _concurrentQueue = new ConcurrentQueue(); _logger = logger; _serviceProvider = serviceProvider; _packetHandlers = new Dictionary(); _packetsTypes = new Dictionary(); var packetsWithId = Assembly.GetEntryAssembly()?.GetTypes().AsParallel() .Where(type => type.GetCustomAttribute() != null && type.HasInterface(typeof(IPacket)) && !type.IsInterface) //.Select(Activator.CreateInstance).Cast() .ToDictionary(packet => packet.GetCustomAttribute()!.Code); if (packetsWithId == null || packetsWithId.Count == 0) { _logger.LogCritical("No Packets have been found"); throw new IncompleteInitialization(); } packetsWithId.AsParallel().ForAll(packet => { _logger.LogTrace("Packet with ID: {PacketID} has been added as {PacketName}", packet.Key, packet.Value.FullName); }); _packetsTypes = packetsWithId; var packetHandlersWithId = Assembly.GetEntryAssembly()?.GetTypes().AsParallel().Where(t => t is { IsClass: true, IsAbstract: false } && t .GetInterfaces().Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IPacketHandler<>))).ToDictionary(type => type.GetInterfaces().First(t => t.IsGenericType && t.GetGenericTypeDefinition() == typeof(IPacketHandler<>)) .GetGenericArguments()[0].GetCustomAttribute().Code); if (packetHandlersWithId == null || packetHandlersWithId.Count == 0) { _logger.LogCritical("No PacketHandlers have been found"); throw new IncompleteInitialization(); } packetHandlersWithId.AsParallel().ForAll(packetHandler => { _logger.LogTrace("Packet with ID: {PacketID} has been added as {PacketName}", packetHandler.Key, packetHandler.Value.FullName); }); _packetHandlers = packetHandlersWithId; } public Task StartAsync(CancellationToken cancellationToken) { return Task.CompletedTask; } public void AddPacket(RawPacket rawPacket) { _logger.LogInformation("Packet with ID: {MessageOperationCode} has been received", rawPacket.OperationCode); _concurrentQueue.Enqueue(rawPacket); Parallel.Invoke(DequeueAndProcessAsync); } private async void DequeueAndProcessAsync() { if (_concurrentQueue.TryDequeue(out var item)) { Parallel.Invoke(() => { _logger.LogTrace("[{TempId}] Packet with ID: {MessageOperationCode} is being dequeued", item.Session.Id, item.OperationCode); var packetType = _packetsTypes[item.OperationCode]; var packet = Activator.CreateInstance(packetType); packetType.GetFields().AsParallel().ForAll(field => { var typeOfField = field.FieldType; var fieldOffsetAttribute = field.GetCustomAttribute(); if (fieldOffsetAttribute == null) throw new FieldAccessException(); object value; _logger.LogDebug("Type of field of {fieldName}: {Name}", field.Name, typeOfField.Name); value = typeOfField.Name switch { nameof(String) => Encoding.ASCII.GetString(item.MessageBody, fieldOffsetAttribute.Offset, fieldOffsetAttribute.Size), "Byte[]" => new ArraySegment(item.MessageBody, fieldOffsetAttribute.Offset, fieldOffsetAttribute.Size).ToArray(), nameof(Boolean) => BitConverter.ToBoolean(item.MessageBody, fieldOffsetAttribute.Offset), nameof(Byte) => item.MessageBody[fieldOffsetAttribute.Offset], nameof(Int16) => BitConverter.ToInt16(item.MessageBody, fieldOffsetAttribute.Offset), nameof(UInt16) => BitConverter.ToUInt16(item.MessageBody, fieldOffsetAttribute.Offset), nameof(Int32) => BitConverter.ToInt32(item.MessageBody, fieldOffsetAttribute.Offset), nameof(UInt32) => BitConverter.ToUInt32(item.MessageBody, fieldOffsetAttribute.Offset), nameof(Double) => BitConverter.ToDouble(item.MessageBody, fieldOffsetAttribute.Offset), nameof(Single) => BitConverter.ToSingle(item.MessageBody, fieldOffsetAttribute.Offset), _ => throw new InvalidOperationException() }; field.SetValue(packet, value); }); var packetHandler = ActivatorUtilities.GetServiceOrCreateInstance(_serviceProvider, _packetHandlers[item.OperationCode]); packetHandler.GetType().GetMethod("Handle")?.Invoke(packetHandler, new[] { packet }); _logger.LogDebug("Packet data {PacketData}", JsonConvert.SerializeObject(packet)); _logger.LogTrace("[{TempId}] Packet with ID: {MessageOperationCode} has finished", item.Session.Id, item.OperationCode); }); } else { await Task.Delay(100); // Delay to prevent busy-waiting, can be adjusted based on needs } } public Task StopAsync(CancellationToken cancellationToken) { return Task.CompletedTask; } }