// Copyright (c) 2023 Timothy Schenk. Subject to the GNU AGPL Version 3 License.

using System.Collections.Concurrent;
using System.Collections.Immutable;
using System.Diagnostics;
using System.Reflection;
using Continuity.AuthServer.LoggerMessages;
using Continuity.AuthServer.PacketHandlers;
using Continuity.AuthServer.Packets;
using DotNext.Collections.Generic;
using DotNext.Linq.Expressions;
using DotNext.Metaprogramming;
using MassTransit.Internals;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.VisualBasic.CompilerServices;
using Newtonsoft.Json;
using Wonderking.Packets;

namespace Continuity.AuthServer.Services;

using static CodeGenerator;
using static ExpressionBuilder;

/// <summary>
/// Hosted service that turns incoming <see cref="RawPacket"/>s into typed <see cref="IPacket"/>
/// instances and hands them to the packet handler registered for their operation code.
/// </summary>
/// <remarks>
/// On start-up the service scans for packet types and packet handler types via reflection,
/// compiles one deserialization delegate per packet type and instantiates every handler once.
/// </remarks>
// NOTE(review): the generic type arguments in this file (OperationCode, PacketIdAttribute,
// Func<byte[], IPacket>, ILogger<PacketDistributorService>) were reconstructed from usage - confirm.
public class PacketDistributorService : IHostedService, IDisposable
{
    private readonly ActivitySource _activitySource;
    private readonly ConcurrentQueue<RawPacket> _concurrentQueue;
    private readonly ILogger<PacketDistributorService> _logger;
    private readonly IServiceProvider _serviceProvider;

    // Both maps start out empty so that a packet arriving before StartAsync has run is
    // reported as "unknown packet type" instead of causing a NullReferenceException.
    private ImmutableDictionary<OperationCode, Func<byte[], IPacket>> _deserializationMap;
    private ConcurrentDictionary<OperationCode, IPacketHandler> _packetHandlersInstantiation;

    /// <summary>
    /// Creates the service with an empty packet queue and empty lookup tables.
    /// </summary>
    /// <param name="logger">Logger used for all diagnostics of this service.</param>
    /// <param name="serviceProvider">Provider used to resolve or create the packet handlers.</param>
    public PacketDistributorService(ILogger<PacketDistributorService> logger, IServiceProvider serviceProvider)
    {
        _concurrentQueue = new ConcurrentQueue<RawPacket>();
        _logger = logger;
        _serviceProvider = serviceProvider;
        _activitySource = new ActivitySource(nameof(Server));
        _deserializationMap = ImmutableDictionary<OperationCode, Func<byte[], IPacket>>.Empty;
        _packetHandlersInstantiation = new ConcurrentDictionary<OperationCode, IPacketHandler>();
    }

    /// <summary>
    /// Discovers packet and handler types, instantiates the handlers and compiles the
    /// deserialization delegates.
    /// </summary>
    /// <param name="cancellationToken">Not observed; start-up runs synchronously.</param>
    /// <returns>A completed task.</returns>
    /// <exception cref="IncompleteInitialization">No packet types or no packet handlers were found.</exception>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        // typeof(T).Assembly is never null, unlike Assembly.GetAssembly(Type).
        var packetsTypes = GetPacketsWithId(typeof(IPacket).Assembly);
        var packetHandlers = GetAllPacketHandlersWithId(Assembly.GetExecutingAssembly());

        var handlerInstances = new ConcurrentDictionary<OperationCode, IPacketHandler>();
        packetHandlers.ForEach(x =>
        {
            var instance = ActivatorUtilities.GetServiceOrCreateInstance(_serviceProvider, x.Value);
            if (instance is IPacketHandler packetHandler)
            {
                handlerInstances.TryAdd(x.Key, packetHandler);
            }
            else
            {
                // Never store a null handler: it would only fail later, while dispatching.
                _logger.LogWarning(
                    "Packet handler type {PacketHandlerType} does not implement IPacketHandler and was skipped",
                    x.Value.FullName);
            }
        });
        _packetHandlersInstantiation = handlerInstances;

        var deserializers = ImmutableDictionary.CreateBuilder<OperationCode, Func<byte[], IPacket>>();
        foreach (var packetsType in packetsTypes)
        {
            var deserializer = CreateDeserializer(packetsType.Value);
            _logger.PacketCreationFunctionCreated(packetsType.Key);
            deserializers.Add(packetsType.Key, deserializer);
        }

        _deserializationMap = deserializers.ToImmutable();
        return Task.CompletedTask;
    }

    /// <summary>
    /// Stops the service. There is nothing to shut down, so this completes immediately.
    /// </summary>
    /// <param name="cancellationToken">Not observed.</param>
    /// <returns>A completed task.</returns>
    public Task StopAsync(CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Accepts a raw packet and schedules its processing on the thread pool.
    /// </summary>
    /// <param name="rawPacket">The packet as received, including its operation code, body and session.</param>
    public void AddPacket(RawPacket rawPacket)
    {
        // Log before dispatching so "received" cannot be emitted after "dequeued".
        _logger.PacketReceived(rawPacket.OperationCode);
        _concurrentQueue.Enqueue(rawPacket);
        DequeueRawPacket();
    }

    /// <summary>
    /// Disposes the activity source owned by this service.
    /// </summary>
    public void Dispose()
    {
        _activitySource.Dispose();
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Compiles a delegate that creates a new instance of <paramref name="packetType"/> and
    /// calls <see cref="IPacket.Deserialize"/> on it with the raw packet body.
    /// </summary>
    private static Func<byte[], IPacket> CreateDeserializer(Type packetType)
    {
        return Lambda<Func<byte[], IPacket>>(fun =>
        {
            var argPacketData = fun[0];
            var newPacket = packetType.New();

            var packetVariable = DeclareVariable(packetType, "packet");
            Assign(packetVariable, newPacket);
            Call(packetVariable, nameof(IPacket.Deserialize), argPacketData);

            Return(packetVariable);
        }).Compile();
    }

    /// <summary>
    /// Tells whether <paramref name="candidate"/> is a constructed <c>IPacketHandler&lt;T&gt;</c>.
    /// </summary>
    private static bool IsPacketHandlerInterface(Type candidate)
    {
        return candidate.IsGenericType && candidate.GetGenericTypeDefinition() == typeof(IPacketHandler<>);
    }

    /// <summary>
    /// Finds all concrete <see cref="IPacket"/> implementations in an "Incoming" namespace that
    /// carry a packet id attribute, keyed by the attribute's code.
    /// </summary>
    /// <exception cref="IncompleteInitialization">No matching packet type was found.</exception>
    private Dictionary<OperationCode, Type> GetPacketsWithId(Assembly executingAssembly)
    {
        // Types without the packet id attribute are filtered out before the dictionary is
        // built, which is why the null-forgiving operator on Attribute is safe below.
        var packetsWithId = executingAssembly.GetTypes().AsParallel()
            .Where(type => type.HasInterface(typeof(IPacket)) && type is { IsInterface: false, IsAbstract: false })
            .Where(type => type.Namespace?.Contains("Incoming", StringComparison.Ordinal) ?? false)
            .Select(type => new { Type = type, Attribute = type.GetCustomAttribute<PacketIdAttribute>() })
            .Where(item => item.Attribute is not null)
            .ToDictionary(item => item.Attribute!.Code, item => item.Type);

        if (packetsWithId.Count == 0)
        {
            _logger.NoPacketsFound();
            throw new IncompleteInitialization();
        }

        foreach (var packet in packetsWithId)
        {
            _logger.PacketWithIdAdded(packet.Key, packet.Value.FullName);
        }

        return packetsWithId;
    }

    /// <summary>
    /// Finds all concrete classes implementing <c>IPacketHandler&lt;T&gt;</c> whose packet type
    /// <c>T</c> carries a packet id attribute, keyed by the attribute's code.
    /// </summary>
    /// <exception cref="IncompleteInitialization">No matching packet handler was found.</exception>
    private Dictionary<OperationCode, Type> GetAllPacketHandlersWithId(Assembly assembly)
    {
        // PacketId is null when the type has no IPacketHandler<T> interface or when T has no
        // packet id attribute; both cases are filtered out before the dictionary is built.
        var packetHandlersWithId = assembly.GetTypes().AsParallel()
            .Where(type => type is { IsClass: true, IsAbstract: false })
            .Select(type => new
            {
                Type = type,
                PacketId = Array.Find(type.GetInterfaces(), IsPacketHandlerInterface)
                    ?.GetGenericArguments()[0]
                    .GetCustomAttribute<PacketIdAttribute>()?.Code,
            })
            .Where(x => x.PacketId is not null)
            .ToDictionary(x => x.PacketId!.Value, x => x.Type);

        if (packetHandlersWithId.Count == 0)
        {
            _logger.NoPacketHandlersFound();
            throw new IncompleteInitialization();
        }

        foreach (var packetHandler in packetHandlersWithId)
        {
            _logger.PacketHandlerWithIdAdded(packetHandler.Key, packetHandler.Value.FullName);
        }

        return packetHandlersWithId;
    }

    /// <summary>
    /// Takes the next queued packet, if any, and queues its processing on the thread pool.
    /// </summary>
    private void DequeueRawPacket()
    {
        if (_concurrentQueue.TryDequeue(out var item))
        {
            ThreadPool.QueueUserWorkItem(InvokePacketHandler, item, preferLocal: false);
        }
    }

    /// <summary>
    /// Thread pool entry point for a single packet.
    /// </summary>
    private void InvokePacketHandler(RawPacket item)
    {
        // Discarding the task is safe: ProcessPacketAsync catches and logs every exception.
        _ = ProcessPacketAsync(item);
    }

    /// <summary>
    /// Deserializes a raw packet and awaits the handler registered for its operation code.
    /// </summary>
    private async Task ProcessPacketAsync(RawPacket item)
    {
        try
        {
            _logger.PacketDequeued(item.Session.Id, item.OperationCode);

            if (!_deserializationMap.TryGetValue(item.OperationCode, out var deserialize))
            {
                _logger.PacketTypeNotFound(item.OperationCode);
                return;
            }

            if (!_packetHandlersInstantiation.TryGetValue(item.OperationCode, out var packetHandler))
            {
                _logger.LogWarning(
                    "No packet handler is registered for operation code {OperationCode}",
                    item.OperationCode);
                return;
            }

            IPacket packet;
            using (var packetParsingActivity = _activitySource.StartActivity("PacketParsing"))
            {
                packetParsingActivity?.SetTag("PacketId", item.OperationCode);
                packet = deserialize(item.MessageBody);
            }

            using (var packetHandlerActivity = _activitySource.StartActivity("PacketHandler"))
            {
                packetHandlerActivity?.SetTag("PacketId", item.OperationCode);

                // Awaited so the activity spans the whole handler run and failures are observed.
                await packetHandler.TryHandleAsync(packet, item.Session);
            }

            _logger.PacketData(JsonConvert.SerializeObject(packet));
            _logger.PacketFinished(item.Session.Id, item.OperationCode);
        }
        catch (Exception exception)
        {
            // This is the outermost frame of a thread pool work item: an exception escaping
            // from here (for example from deserializing a malformed packet) would otherwise
            // be unhandled.
            _logger.LogError(
                exception,
                "Processing the packet with operation code {OperationCode} failed",
                item.OperationCode);
        }
    }
}