// continuity/Rai.PacketMediator/PacketDistributorService.cs
//
// 173 lines
// 6.4 KiB
// C#

// Copyright (c) 2023 Timothy Schenk. Subject to the GNU AGPL Version 3 License.
using System.Collections.Concurrent;
using System.Collections.Immutable;
using System.Diagnostics;
using System.Reflection;
using DotNext.Collections.Generic;
using DotNext.Linq.Expressions;
using DotNext.Metaprogramming;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.VisualBasic.CompilerServices;
namespace Rai.PacketMediator;
using static CodeGenerator;
/// <summary>
/// Hosted service that discovers packet types and packet handlers in the supplied assemblies,
/// builds one deserializer per operation code, and dispatches raw packets to the matching handler.
/// </summary>
/// <typeparam name="T">Enum type used as the packet operation code.</typeparam>
/// <typeparam name="S">Session type handed to the handlers together with each packet.</typeparam>
public class PacketDistributorService<T, S> : Microsoft.Extensions.Hosting.IHostedService, IDisposable where T : Enum
{
    private readonly ActivitySource _activitySource;
    private readonly ConcurrentQueue<ValueTuple<byte[], T, S>> _concurrentQueue;
    private readonly IServiceProvider _serviceProvider;
    private readonly Assembly[] _sourcesContainingPackets;

    // Both lookup tables start out empty (instead of null) so that a packet submitted before
    // StartAsync has run is ignored rather than failing with a NullReferenceException.
    private ImmutableDictionary<T, Func<byte[], IIncomingPacket>> _deserializationMap =
        ImmutableDictionary<T, Func<byte[], IIncomingPacket>>.Empty;

    private ConcurrentDictionary<T, IPacketHandler<S>> _packetHandlersInstantiation = new();

    /// <summary>
    /// Creates the service; discovery of packets and handlers is deferred to <see cref="StartAsync"/>.
    /// </summary>
    /// <param name="serviceProvider">Provider used to resolve or construct the handler instances.</param>
    /// <param name="sourcesContainingPackets">Assemblies scanned for packet and handler types.</param>
    /// <exception cref="ArgumentNullException">Either argument is <c>null</c>.</exception>
    public PacketDistributorService(IServiceProvider serviceProvider,
        Assembly[] sourcesContainingPackets)
    {
        ArgumentNullException.ThrowIfNull(serviceProvider);
        ArgumentNullException.ThrowIfNull(sourcesContainingPackets);

        _concurrentQueue = new ConcurrentQueue<ValueTuple<byte[], T, S>>();
        _serviceProvider = serviceProvider;
        _sourcesContainingPackets = sourcesContainingPackets;
        _activitySource = new ActivitySource(nameof(PacketMediator));
    }

    /// <summary>
    /// Collects every concrete <see cref="IIncomingPacket"/> implementation that carries a
    /// <see cref="PacketIdAttribute{T}"/>, keyed by the attribute's code.
    /// </summary>
    /// <returns>Map from operation code to packet type.</returns>
    /// <exception cref="ArgumentException">Two packet types declare the same code.</exception>
    private Dictionary<T, Type> RetrievePacketsDictionary()
    {
        using var activity = _activitySource.StartActivity();

        var packetContract = typeof(IIncomingPacket);
        var packetsWithId = _sourcesContainingPackets
            .SelectMany(assembly => assembly.GetTypes())
            .Where(type => type is { IsInterface: false, IsAbstract: false } &&
                           packetContract.IsAssignableFrom(type))
            .Select(type => new { Type = type, Attribute = type.GetCustomAttribute<PacketIdAttribute<T>>() })
            .Where(item => item.Attribute is not null)
            .ToDictionary(item => item.Attribute!.Code, item => item.Type);

        activity?.AddTag("AmountOfPackets", packetsWithId.Count);
        return packetsWithId;
    }

    /// <summary>
    /// Collects every concrete class assignable to <see cref="IPacketHandler{S}"/> and keys it by the
    /// code of the <see cref="PacketIdAttribute{T}"/> found on a generic argument of one of its
    /// handler interfaces.
    /// </summary>
    /// <remarks>
    /// The previous check compared <c>GetGenericTypeDefinition()</c> (always an open generic definition)
    /// with <c>typeof(IPacketHandler&lt;S&gt;)</c> (a closed constructed type), which can never be equal,
    /// so no handler was ever discovered. Assignability is used instead.
    /// NOTE(review): this assumes the packet-typed handler interface derives from
    /// <c>IPacketHandler&lt;S&gt;</c> - confirm against its declaration.
    /// </remarks>
    /// <returns>Map from operation code to handler type.</returns>
    /// <exception cref="ArgumentException">Two handler types resolve to the same code.</exception>
    private Dictionary<T, Type> GetAllPacketHandlersWithId()
    {
        using var activity = _activitySource.StartActivity();

        var handlerContract = typeof(IPacketHandler<S>);
        var packetHandlersWithId = new Dictionary<T, Type>();

        foreach (var candidate in _sourcesContainingPackets.SelectMany(assembly => assembly.GetTypes()))
        {
            if (candidate is not { IsClass: true, IsAbstract: false } ||
                !handlerContract.IsAssignableFrom(candidate))
            {
                continue;
            }

            // The operation code lives on the packet type that is used as a generic argument
            // of the handler interface, not on the handler class itself.
            var packetId = candidate.GetInterfaces()
                .Where(contract => contract.IsGenericType && handlerContract.IsAssignableFrom(contract))
                .SelectMany(contract => contract.GetGenericArguments())
                .SelectMany(argument => argument.GetCustomAttributes<PacketIdAttribute<T>>())
                .FirstOrDefault();

            if (packetId is null)
            {
                continue;
            }

            packetHandlersWithId.Add(packetId.Code, candidate);
        }

        activity?.AddTag("AmountOfPacketHandlers", packetHandlersWithId.Count);
        return packetHandlersWithId;
    }

    /// <summary>
    /// Compiles a delegate that creates a new instance of <paramref name="packetType"/>, calls its
    /// <c>Deserialize</c> method with the raw bytes and returns the instance.
    /// </summary>
    /// <param name="packetType">Concrete packet type to instantiate.</param>
    /// <returns>Compiled deserializer for <paramref name="packetType"/>.</returns>
    private static Func<byte[], IIncomingPacket> CompileDeserializer(Type packetType)
    {
        return Lambda<Func<byte[], IIncomingPacket>>(fun =>
        {
            var argPacketData = fun[0];
            var newPacket = packetType.New();
            var packetVariable = DeclareVariable(packetType, "packet");
            Assign(packetVariable, newPacket);
            Call(packetVariable, nameof(IIncomingPacket.Deserialize), argPacketData);
            Return(packetVariable);
        }).Compile();
    }

    /// <summary>Disposes the activity source owned by this service.</summary>
    public void Dispose()
    {
        GC.SuppressFinalize(this);
        _activitySource.Dispose();
    }

    /// <summary>
    /// Discovers packets and handlers, instantiates the handlers and compiles the deserializers.
    /// </summary>
    /// <param name="cancellationToken">Not observed; all work completes synchronously.</param>
    /// <returns>A completed task.</returns>
    /// <exception cref="IncompleteInitialization">No packet types or no handler types were found.</exception>
    /// <exception cref="InvalidCastException">A resolved handler instance is not an <see cref="IPacketHandler{S}"/>.</exception>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        // NOTE(review): IncompleteInitialization presumably resolves through the
        // Microsoft.VisualBasic.CompilerServices using directive; kept so the thrown type is unchanged.
        var packetDictionary = RetrievePacketsDictionary();
        if (packetDictionary.Count == 0)
        {
            throw new IncompleteInitialization();
        }

        var packetHandlers = GetAllPacketHandlersWithId();
        if (packetHandlers.Count == 0)
        {
            throw new IncompleteInitialization();
        }

        var handlerInstances = new ConcurrentDictionary<T, IPacketHandler<S>>();
        foreach (var (packetId, handlerType) in packetHandlers)
        {
            var instance = ActivatorUtilities.GetServiceOrCreateInstance(_serviceProvider, handlerType);

            // Direct cast: a mismatch fails loudly here instead of storing a null handler
            // that would only blow up later while dispatching.
            handlerInstances[packetId] = (IPacketHandler<S>)instance;
        }

        var deserializers = packetDictionary.ToImmutableDictionary(
            entry => entry.Key,
            entry => CompileDeserializer(entry.Value));

        // Publish the tables only once both are fully built.
        _packetHandlersInstantiation = handlerInstances;
        _deserializationMap = deserializers;
        return Task.CompletedTask;
    }

    /// <summary>
    /// Enqueues a raw packet and then processes the packet at the head of the queue.
    /// </summary>
    /// <param name="packetData">Raw packet bytes handed to the deserializer.</param>
    /// <param name="operationCode">Operation code selecting deserializer and handler.</param>
    /// <param name="session">Session forwarded to the handler.</param>
    public async Task AddPacketAsync(byte[] packetData, T operationCode, S session)
    {
        _concurrentQueue.Enqueue((packetData, operationCode, session));
        await DequeueRawPacketAsync();
    }

    /// <summary>Takes one item from the queue, if any, and dispatches it.</summary>
    private async Task DequeueRawPacketAsync()
    {
        if (_concurrentQueue.TryDequeue(out var item))
        {
            await InvokePacketHandlerAsync(item);
        }
    }

    /// <summary>
    /// Deserializes the packet and awaits its handler. Packets whose operation code has no
    /// registered deserializer or no registered handler are dropped.
    /// </summary>
    /// <param name="valueTuple">Raw bytes, operation code and session of the packet.</param>
    private async Task InvokePacketHandlerAsync((byte[], T, S) valueTuple)
    {
        var (packetData, operationCode, session) = valueTuple;

        // A packet type may exist without a handler; look both up defensively instead of
        // indexing, which would throw KeyNotFoundException for such packets.
        if (!_deserializationMap.TryGetValue(operationCode, out var deserialize) ||
            !_packetHandlersInstantiation.TryGetValue(operationCode, out var handler))
        {
            return;
        }

        IIncomingPacket packet;
        using (var packetParsingActivity = _activitySource.StartActivity("PacketParsing"))
        {
            packetParsingActivity?.SetTag("PacketId", operationCode);
            packet = deserialize(packetData);
        }

        using (var packetHandlerActivity = _activitySource.StartActivity("PacketHandler"))
        {
            packetHandlerActivity?.SetTag("PacketId", operationCode);
            await handler.TryHandleAsync(packet, session);
        }
    }

    /// <summary>No shutdown work is required.</summary>
    /// <param name="cancellationToken">Not observed.</param>
    /// <returns>A completed task.</returns>
    public Task StopAsync(CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }
}