diff --git a/Continuity.AuthServer/Consumers/PacketConsumer.cs b/Continuity.AuthServer/Consumers/PacketConsumer.cs index 0a17e02..a8288e2 100644 --- a/Continuity.AuthServer/Consumers/PacketConsumer.cs +++ b/Continuity.AuthServer/Consumers/PacketConsumer.cs @@ -4,6 +4,7 @@ using Continuity.AuthServer.Packets; using JetBrains.Annotations; using MassTransit; using NetCoreServer; +using OpenTelemetry.Trace; using Rai.PacketMediator; using Wonderking.Packets; @@ -14,14 +15,23 @@ public class PacketConsumer : IConsumer { private readonly PacketDistributorService _distributorService; - public PacketConsumer(PacketDistributorService distributorService) + private readonly TracerProvider _tracerProvider; + + public PacketConsumer(PacketDistributorService distributorService, TracerProvider tracerProvider) { _distributorService = distributorService; + _tracerProvider = tracerProvider; } public Task Consume(ConsumeContext context) { - return _distributorService.AddPacketAsync(context.Message.MessageBody, context.Message.OperationCode, - context.Message.Session); + var tracer = _tracerProvider.GetTracer("Rai.PacketMediator"); + + using var scope = tracer.StartActiveSpan("PacketHandler"); + scope.SetAttribute("PacketId", context.Message.OperationCode.ToString()); + scope.SetAttribute("SessionId", context.Message.Session.Id.ToString()); + scope.SetAttribute("PacketSize", context.Message.MessageBody.Length); + + return _distributorService.AddPacketAsync(context.Message.MessageBody, context.Message.OperationCode, context.Message.Session); } } diff --git a/Rai.PacketMediator/PacketDistributorService.cs b/Rai.PacketMediator/PacketDistributorService.cs index fb0500f..8a87d95 100644 --- a/Rai.PacketMediator/PacketDistributorService.cs +++ b/Rai.PacketMediator/PacketDistributorService.cs @@ -2,7 +2,6 @@ using System.Collections.Concurrent; using System.Collections.Immutable; -using System.Diagnostics; using System.Reflection; using System.Threading.Channels; using DotNext.Collections.Generic; @@ -15,7 +14,8 @@ namespace Rai.PacketMediator; using static CodeGenerator; -public class PacketDistributorService : Microsoft.Extensions.Hosting.IHostedService, IDisposable where TPacketIdEnum : Enum +public class PacketDistributorService : Microsoft.Extensions.Hosting.IHostedService + where TPacketIdEnum : Enum { private readonly Channel> _channel; @@ -26,35 +26,35 @@ public class PacketDistributorService : Microsoft.Exten Func> _deserializationMap; private ConcurrentDictionary?> _packetHandlersInstantiation; - private readonly ActivitySource _activitySource; public PacketDistributorService(IServiceProvider serviceProvider, Assembly[] sourcesContainingPackets) { - _channel = Channel.CreateUnbounded>(); + _channel = Channel.CreateUnbounded>(new UnboundedChannelOptions + { + AllowSynchronousContinuations = false, + SingleReader = false, + SingleWriter = false + }); _serviceProvider = serviceProvider; _sourcesContainingPackets = sourcesContainingPackets; - _activitySource = new ActivitySource(nameof(PacketMediator)); } private Dictionary RetrievePacketsDictionary() { - using var activity = this._activitySource.StartActivity(); var packetsWithId = this._sourcesContainingPackets.SelectMany(a => a.GetTypes() .Where(type => type is { IsInterface: false, IsAbstract: false } && type.GetInterfaces().Contains(typeof(IIncomingPacket))) - .Select(type => new { Type = type, Attribute = type.GetCustomAttribute>() }) + .Select(type => + new { Type = type, Attribute = type.GetCustomAttribute>() }) .Where(item => item.Attribute is not null) .ToDictionary(item => item.Attribute!.Code, item => item.Type)).ToDictionary(); - activity?.AddTag("AmountOfPackets", packetsWithId.Count); - return packetsWithId; } private Dictionary GetAllPacketHandlersWithId() { - using var activity = this._activitySource.StartActivity(); var packetHandlersWithId = this._sourcesContainingPackets.SelectMany(assembly => assembly.GetTypes() .Where(t => t is { IsClass: true, IsAbstract: false } && Array.Exists(t @@ -65,21 +65,16 @@ public class PacketDistributorService : Microsoft.Exten Type = type, PacketId = type .GetInterfaces().First(t1 => - t1 is { IsGenericType: true } && t1.GetGenericTypeDefinition() == typeof(IPacketHandler)) + t1 is { IsGenericType: true } && + t1.GetGenericTypeDefinition() == typeof(IPacketHandler)) .GetGenericArguments().First(t => t.GetCustomAttributes>().Any()) .GetCustomAttributes>().First().Code }) .ToDictionary( x => x.PacketId, x => x.Type )).ToDictionary(); - activity?.AddTag("AmountOfPacketHandlers", packetHandlersWithId.Count); - return packetHandlersWithId; - } - public void Dispose() - { - GC.SuppressFinalize(this); - _activitySource.Dispose(); + return packetHandlersWithId; } public Task StartAsync(CancellationToken cancellationToken) @@ -148,25 +143,16 @@ public class PacketDistributorService : Microsoft.Exten private async Task InvokePacketHandlerAsync((byte[], TPacketIdEnum, TSession) valueTuple) { - IIncomingPacket packet; var (packetData, operationCode, session) = valueTuple; if (!_deserializationMap.TryGetValue(operationCode, out var func)) { return; } - using (var packetParsingActivity = _activitySource.StartActivity("PacketParsing")) - { - packetParsingActivity?.SetTag("PacketId", operationCode); - packet = func(packetData); - } + var packet = func(packetData); - using (var packetHandlerActivity = _activitySource.StartActivity("PacketHandler")) - { - packetHandlerActivity?.SetTag("PacketId", operationCode); - // ! I don't see how it's possibly null here. - await _packetHandlersInstantiation[operationCode]?.TryHandleAsync(packet, session)!; - } + // ! I don't see how it's possibly null here. + await _packetHandlersInstantiation[operationCode]?.TryHandleAsync(packet, session)!; } public Task StopAsync(CancellationToken cancellationToken)