chore: cleanup opentelemetry

This commit is contained in:
Timothy Schenk 2024-02-05 11:43:58 +01:00
parent bae1f812cf
commit b9144a3ff0
Signed by: rainote
SSH key fingerprint: SHA256:pnkNSDwpAnaip00xaZlVFHKKsS7T8UtOomMzvs0yITE
2 changed files with 29 additions and 33 deletions

View file

@ -4,6 +4,7 @@ using Continuity.AuthServer.Packets;
using JetBrains.Annotations; using JetBrains.Annotations;
using MassTransit; using MassTransit;
using NetCoreServer; using NetCoreServer;
using OpenTelemetry.Trace;
using Rai.PacketMediator; using Rai.PacketMediator;
using Wonderking.Packets; using Wonderking.Packets;
@ -14,14 +15,23 @@ public class PacketConsumer : IConsumer<RawPacket>
{ {
private readonly PacketDistributorService<OperationCode, TcpSession> _distributorService; private readonly PacketDistributorService<OperationCode, TcpSession> _distributorService;
public PacketConsumer(PacketDistributorService<OperationCode, TcpSession> distributorService) private readonly TracerProvider _tracerProvider;
public PacketConsumer(PacketDistributorService<OperationCode, TcpSession> distributorService, TracerProvider tracerProvider)
{ {
_distributorService = distributorService; _distributorService = distributorService;
_tracerProvider = tracerProvider;
} }
public Task Consume(ConsumeContext<RawPacket> context) public Task Consume(ConsumeContext<RawPacket> context)
{ {
return _distributorService.AddPacketAsync(context.Message.MessageBody, context.Message.OperationCode, var tracer = _tracerProvider.GetTracer("Rai.PacketMediator");
context.Message.Session);
using var scope = tracer.StartActiveSpan("PacketHandler");
scope.SetAttribute("PacketId", context.Message.OperationCode.ToString());
scope.SetAttribute("SessionId", context.Message.Session.Id.ToString());
scope.SetAttribute("PacketSize", context.Message.MessageBody.Length);
return _distributorService.AddPacketAsync(context.Message.MessageBody, context.Message.OperationCode, context.Message.Session);
} }
} }

View file

@ -2,7 +2,6 @@
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Immutable; using System.Collections.Immutable;
using System.Diagnostics;
using System.Reflection; using System.Reflection;
using System.Threading.Channels; using System.Threading.Channels;
using DotNext.Collections.Generic; using DotNext.Collections.Generic;
@ -15,7 +14,8 @@ namespace Rai.PacketMediator;
using static CodeGenerator; using static CodeGenerator;
public class PacketDistributorService<TPacketIdEnum, TSession> : Microsoft.Extensions.Hosting.IHostedService, IDisposable where TPacketIdEnum : Enum public class PacketDistributorService<TPacketIdEnum, TSession> : Microsoft.Extensions.Hosting.IHostedService
where TPacketIdEnum : Enum
{ {
private readonly Channel<ValueTuple<byte[], TPacketIdEnum, TSession>> _channel; private readonly Channel<ValueTuple<byte[], TPacketIdEnum, TSession>> _channel;
@ -26,35 +26,35 @@ public class PacketDistributorService<TPacketIdEnum, TSession> : Microsoft.Exten
Func<byte[], IIncomingPacket>> _deserializationMap; Func<byte[], IIncomingPacket>> _deserializationMap;
private ConcurrentDictionary<TPacketIdEnum, IPacketHandler<TSession>?> _packetHandlersInstantiation; private ConcurrentDictionary<TPacketIdEnum, IPacketHandler<TSession>?> _packetHandlersInstantiation;
private readonly ActivitySource _activitySource;
public PacketDistributorService(IServiceProvider serviceProvider, public PacketDistributorService(IServiceProvider serviceProvider,
Assembly[] sourcesContainingPackets) Assembly[] sourcesContainingPackets)
{ {
_channel = Channel.CreateUnbounded<ValueTuple<byte[], TPacketIdEnum, TSession>>(); _channel = Channel.CreateUnbounded<ValueTuple<byte[], TPacketIdEnum, TSession>>(new UnboundedChannelOptions
{
AllowSynchronousContinuations = false,
SingleReader = false,
SingleWriter = false
});
_serviceProvider = serviceProvider; _serviceProvider = serviceProvider;
_sourcesContainingPackets = sourcesContainingPackets; _sourcesContainingPackets = sourcesContainingPackets;
_activitySource = new ActivitySource(nameof(PacketMediator));
} }
private Dictionary<TPacketIdEnum, Type> RetrievePacketsDictionary() private Dictionary<TPacketIdEnum, Type> RetrievePacketsDictionary()
{ {
using var activity = this._activitySource.StartActivity();
var packetsWithId = this._sourcesContainingPackets.SelectMany(a => a.GetTypes() var packetsWithId = this._sourcesContainingPackets.SelectMany(a => a.GetTypes()
.Where(type => type is { IsInterface: false, IsAbstract: false } && .Where(type => type is { IsInterface: false, IsAbstract: false } &&
type.GetInterfaces().Contains(typeof(IIncomingPacket))) type.GetInterfaces().Contains(typeof(IIncomingPacket)))
.Select(type => new { Type = type, Attribute = type.GetCustomAttribute<PacketIdAttribute<TPacketIdEnum>>() }) .Select(type =>
new { Type = type, Attribute = type.GetCustomAttribute<PacketIdAttribute<TPacketIdEnum>>() })
.Where(item => item.Attribute is not null) .Where(item => item.Attribute is not null)
.ToDictionary(item => item.Attribute!.Code, item => item.Type)).ToDictionary(); .ToDictionary(item => item.Attribute!.Code, item => item.Type)).ToDictionary();
activity?.AddTag("AmountOfPackets", packetsWithId.Count);
return packetsWithId; return packetsWithId;
} }
private Dictionary<TPacketIdEnum, Type> GetAllPacketHandlersWithId() private Dictionary<TPacketIdEnum, Type> GetAllPacketHandlersWithId()
{ {
using var activity = this._activitySource.StartActivity();
var packetHandlersWithId = this._sourcesContainingPackets.SelectMany(assembly => assembly.GetTypes() var packetHandlersWithId = this._sourcesContainingPackets.SelectMany(assembly => assembly.GetTypes()
.Where(t => .Where(t =>
t is { IsClass: true, IsAbstract: false } && Array.Exists(t t is { IsClass: true, IsAbstract: false } && Array.Exists(t
@ -65,21 +65,16 @@ public class PacketDistributorService<TPacketIdEnum, TSession> : Microsoft.Exten
Type = type, Type = type,
PacketId = type PacketId = type
.GetInterfaces().First(t1 => .GetInterfaces().First(t1 =>
t1 is { IsGenericType: true } && t1.GetGenericTypeDefinition() == typeof(IPacketHandler<TSession>)) t1 is { IsGenericType: true } &&
t1.GetGenericTypeDefinition() == typeof(IPacketHandler<TSession>))
.GetGenericArguments().First(t => t.GetCustomAttributes<PacketIdAttribute<TPacketIdEnum>>().Any()) .GetGenericArguments().First(t => t.GetCustomAttributes<PacketIdAttribute<TPacketIdEnum>>().Any())
.GetCustomAttributes<PacketIdAttribute<TPacketIdEnum>>().First().Code .GetCustomAttributes<PacketIdAttribute<TPacketIdEnum>>().First().Code
}) })
.ToDictionary( .ToDictionary(
x => x.PacketId, x => x.Type x => x.PacketId, x => x.Type
)).ToDictionary(); )).ToDictionary();
activity?.AddTag("AmountOfPacketHandlers", packetHandlersWithId.Count);
return packetHandlersWithId;
}
public void Dispose() return packetHandlersWithId;
{
GC.SuppressFinalize(this);
_activitySource.Dispose();
} }
public Task StartAsync(CancellationToken cancellationToken) public Task StartAsync(CancellationToken cancellationToken)
@ -148,25 +143,16 @@ public class PacketDistributorService<TPacketIdEnum, TSession> : Microsoft.Exten
private async Task InvokePacketHandlerAsync((byte[], TPacketIdEnum, TSession) valueTuple) private async Task InvokePacketHandlerAsync((byte[], TPacketIdEnum, TSession) valueTuple)
{ {
IIncomingPacket packet;
var (packetData, operationCode, session) = valueTuple; var (packetData, operationCode, session) = valueTuple;
if (!_deserializationMap.TryGetValue(operationCode, out var func)) if (!_deserializationMap.TryGetValue(operationCode, out var func))
{ {
return; return;
} }
using (var packetParsingActivity = _activitySource.StartActivity("PacketParsing")) var packet = func(packetData);
{
packetParsingActivity?.SetTag("PacketId", operationCode);
packet = func(packetData);
}
using (var packetHandlerActivity = _activitySource.StartActivity("PacketHandler")) // ! I don't see how it's possibly null here.
{ await _packetHandlersInstantiation[operationCode]?.TryHandleAsync(packet, session)!;
packetHandlerActivity?.SetTag("PacketId", operationCode);
// ! I don't see how it's possibly null here.
await _packetHandlersInstantiation[operationCode]?.TryHandleAsync(packet, session)!;
}
} }
public Task StopAsync(CancellationToken cancellationToken) public Task StopAsync(CancellationToken cancellationToken)