From a76a878f6b327860adb6a0c2795522545304e51b Mon Sep 17 00:00:00 2001 From: Elizabeth Schneider Date: Wed, 1 Jan 2020 09:08:18 -0700 Subject: [PATCH] Added pub sub message converter --- .editorconfig | 30 +++- Fritz.StreamTools/Dockerfile | 2 +- Fritz.StreamTools/Services/GitHubService.cs | 12 +- .../Services/TwitchPubSubService.cs | 6 +- Fritz.StreamTools/Startup.cs | 6 +- .../StartupServices/ConfigureServices.cs | 9 +- Fritz.StreamTools/appsettings.json | 1 + Fritz.Twitch/ConfigurationSettings.cs | 2 + Fritz.Twitch/PubSub/Proxy.cs | 153 +++++++++--------- .../PubSub/PubSubMessageJsonConverter.cs | 92 +++++++++++ 10 files changed, 226 insertions(+), 87 deletions(-) create mode 100644 Fritz.Twitch/PubSub/PubSubMessageJsonConverter.cs diff --git a/.editorconfig b/.editorconfig index e280fbae..4ba5c817 100644 --- a/.editorconfig +++ b/.editorconfig @@ -11,4 +11,32 @@ insert_final_newline = true csharp_style_var_for_built_in_types = true:error csharp_style_var_when_type_is_apparent = true:error csharp_style_var_elsewhere = true:error -csharp_prefer_braces = true +csharp_prefer_braces = true:warning +csharp_new_line_before_open_brace = none + + + +###################################################################### +# Start by defining the naming symbols (groups) for fields... +###################################################################### +# allowed by design guidelines, but naming is not specified by naming guidelines +dotnet_naming_symbols.private_fields.applicable_kinds = field +dotnet_naming_symbols.private_fields.applicable_accessibilities = private, internal, protected_internal + +###################################################################### +# Now define the styles that will be applied to those naming symbols... +###################################################################### +# prefix_with_underscore_pascal_case +dotnet_naming_style.prefix_with_underscore_pascal_case.capitalization = pascal_case +dotnet_naming_style.prefix_with_underscore_pascal_case.required_prefix = _ + +###################################################################### +# Naming Rules are matched in the order listed, and only the first match is applied +# Use this to match allowed field types, then match all other field types with the invalid style +# Explicitly mark the field type that is user-preference, to allow simple changing to camelCase +# or other settings... +###################################################################### +# Fields that are private can be formatted entirely by user preference +dotnet_naming_rule.private_fields_rule.symbols = private_fields +dotnet_naming_rule.private_fields_rule.style = prefix_with_underscore_pascal_case +dotnet_naming_rule.private_fields_rule.severity = warning diff --git a/Fritz.StreamTools/Dockerfile b/Fritz.StreamTools/Dockerfile index ffaf73d3..2c16c3d8 100644 --- a/Fritz.StreamTools/Dockerfile +++ b/Fritz.StreamTools/Dockerfile @@ -1,4 +1,4 @@ -FROM mcr.microsoft.com/dotnet/core/aspnet:2.2 AS base +FROM mcr.microsoft.com/dotnet/core/aspnet:3.1 AS base WORKDIR /app EXPOSE 80 diff --git a/Fritz.StreamTools/Services/GitHubService.cs b/Fritz.StreamTools/Services/GitHubService.cs index e0eb5fec..65734888 100644 --- a/Fritz.StreamTools/Services/GitHubService.cs +++ b/Fritz.StreamTools/Services/GitHubService.cs @@ -15,6 +15,8 @@ public class GitHubService : IHostedService { public static DateTime LastUpdate = DateTime.MinValue; + private CancellationToken _Token; + private Task _RunningTask; public GitHubService(IServiceProvider services, ILogger logger) { @@ -27,7 +29,9 @@ public GitHubService(IServiceProvider services, ILogger logger) public Task StartAsync(CancellationToken cancellationToken) { - return MonitorUpdates(cancellationToken); + _Token = cancellationToken; + _RunningTask = MonitorUpdates(); + return Task.CompletedTask; } public Task StopAsync(CancellationToken cancellationToken) @@ -35,14 +39,14 @@ public Task StopAsync(CancellationToken cancellationToken) return Task.CompletedTask; } - private async Task MonitorUpdates(CancellationToken cancellationToken) + private async Task MonitorUpdates() { var lastRequest = DateTime.Now; using (var scope = Services.CreateScope()) { var repo = scope.ServiceProvider.GetService(typeof(GitHubRepository)) as GitHubRepository; var mcGithubFaceClient = scope.ServiceProvider.GetService(typeof(GithubyMcGithubFaceClient)) as GithubyMcGithubFaceClient; - while (!cancellationToken.IsCancellationRequested) + while (!_Token.IsCancellationRequested) { if (repo != null) { @@ -56,7 +60,7 @@ private async Task MonitorUpdates(CancellationToken cancellationToken) mcGithubFaceClient?.UpdateGitHub("", "", 0); } } - await Task.Delay(500, cancellationToken); + await Task.Delay(500, _Token); } } } diff --git a/Fritz.StreamTools/Services/TwitchPubSubService.cs b/Fritz.StreamTools/Services/TwitchPubSubService.cs index 3d4c16f8..50009a94 100644 --- a/Fritz.StreamTools/Services/TwitchPubSubService.cs +++ b/Fritz.StreamTools/Services/TwitchPubSubService.cs @@ -19,6 +19,8 @@ public class TwitchPubSubService : IHostedService { private IServiceProvider _ServiceProvider; + private CancellationToken _Token; + private Task _BackgroundTask; private readonly Twitch.PubSub.Proxy _Proxy; private readonly ConfigurationSettings _Configuration; @@ -32,7 +34,9 @@ public TwitchPubSubService(IServiceProvider serviceProvider, Twitch.PubSub.Proxy public Task StartAsync(CancellationToken cancellationToken) { _Proxy.OnChannelPointsRedeemed += _Proxy_OnChannelPointsRedeemed; - return _Proxy.StartAsync(new TwitchTopic[] { TwitchTopic.ChannelPoints(_Configuration.UserId) }, cancellationToken); + _Token = cancellationToken; + _BackgroundTask = _Proxy.StartAsync(new TwitchTopic[] { TwitchTopic.ChannelPoints(_Configuration.UserId) }, _Token); + return Task.CompletedTask; } private void _Proxy_OnChannelPointsRedeemed(object sender, ChannelRedemption e) diff --git a/Fritz.StreamTools/Startup.cs b/Fritz.StreamTools/Startup.cs index 535853ec..ea2a8bcd 100644 --- a/Fritz.StreamTools/Startup.cs +++ b/Fritz.StreamTools/Startup.cs @@ -49,10 +49,12 @@ public void Configure(IApplicationBuilder app, IHostEnvironment env, IConfigurat } app.UseHsts(); - app.UseHttpsRedirection(); + //app.UseHttpsRedirection(); app.UseStaticFiles(); + app.UseRouting(); + app.UseEndpoints(endpoints => { @@ -60,6 +62,8 @@ public void Configure(IApplicationBuilder app, IHostEnvironment env, IConfigurat endpoints.MapHub("/github"); endpoints.MapHub("/attentionhub"); + endpoints.MapRazorPages(); + endpoints.MapDefaultControllerRoute(); }); diff --git a/Fritz.StreamTools/StartupServices/ConfigureServices.cs b/Fritz.StreamTools/StartupServices/ConfigureServices.cs index 108bb5d5..cf63f5e5 100644 --- a/Fritz.StreamTools/StartupServices/ConfigureServices.cs +++ b/Fritz.StreamTools/StartupServices/ConfigureServices.cs @@ -49,7 +49,7 @@ public static void Execute(IServiceCollection services, IConfiguration configura // Add the SentimentSink //services.AddSingleton(); - services.AddSingleton(); + services.AddHostedService(); services.AddSingleton(new GitHubClient(new ProductHeaderValue("Fritz.StreamTools"))); FritzBot.RegisterCommands(services); @@ -119,6 +119,7 @@ private static void AddStreamingServices(this IServiceCollection services, IConf c => !bool.TryParse(c["StreamServices:Fake:Enabled"], out var enabled) || !enabled); // Test to disable services.AddSingleton(); + } /// @@ -133,7 +134,7 @@ private static void AddStreamService(this IServiceCollection ser IConfiguration configuration, Func factory, Func isDisabled) - where TStreamService : class, IStreamService + where TStreamService : class, IStreamService, IHostedService { // Don't configure this service if it is disabled @@ -174,6 +175,8 @@ private static void AddAspNetFeatures(this IServiceCollection services) }).AddMessagePackProtocol(); + services.AddRazorPages(); + services.AddMvc() .SetCompatibilityVersion(CompatibilityVersion.Version_3_0); @@ -182,7 +185,9 @@ private static void AddAspNetFeatures(this IServiceCollection services) private static void RegisterTwitchPubSub(this IServiceCollection services) { services.AddSingleton(); + services.AddHostedService(); + //var provider = services.BuildServiceProvider(); //var pubSub = new TwitchPubSubService( diff --git a/Fritz.StreamTools/appsettings.json b/Fritz.StreamTools/appsettings.json index 4c7c8d05..71fb9286 100644 --- a/Fritz.StreamTools/appsettings.json +++ b/Fritz.StreamTools/appsettings.json @@ -17,6 +17,7 @@ "ClientId": "", "UserId": "", "ChatToken": "", + "PubSubAuthToken": "", "ChatBotName": "FritzBot_" }, "Mixer": { diff --git a/Fritz.Twitch/ConfigurationSettings.cs b/Fritz.Twitch/ConfigurationSettings.cs index d8656241..a4b4ebc9 100644 --- a/Fritz.Twitch/ConfigurationSettings.cs +++ b/Fritz.Twitch/ConfigurationSettings.cs @@ -17,6 +17,8 @@ public class ConfigurationSettings public virtual string OAuthToken { get; set; } + public virtual string PubSubAuthToken { get; set; } + [Obsolete] public string Channel { get => ChannelName; set => ChannelName = value; } diff --git a/Fritz.Twitch/PubSub/Proxy.cs b/Fritz.Twitch/PubSub/Proxy.cs index 4ff94fe0..b1af7f06 100644 --- a/Fritz.Twitch/PubSub/Proxy.cs +++ b/Fritz.Twitch/PubSub/Proxy.cs @@ -1,7 +1,6 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Newtonsoft.Json; -using Newtonsoft.Json.Linq; using System; using System.Collections.Generic; using System.Linq; @@ -11,22 +10,25 @@ using System.Threading.Tasks; using System.Timers; -namespace Fritz.Twitch.PubSub -{ +namespace Fritz.Twitch.PubSub { /// /// Manage interactions with the Twitch pubsub API /// - public class Proxy : IDisposable - { + public class Proxy : IDisposable { private ClientWebSocket _Socket; private System.Timers.Timer _PingTimer; private System.Timers.Timer _PongTimer; + private System.Timers.Timer _ReconnectTimer = new System.Timers.Timer(); private ConfigurationSettings _Configuration; private ILogger _Logger; private static bool _Reconnect; + private static readonly TimeSpan[] _ReconnectTimeouts = new TimeSpan[] { + TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(30), TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(5) + }; + public Proxy(IOptions settings, ILoggerFactory loggerFactory) { @@ -37,10 +39,10 @@ public Proxy(IOptions settings, ILoggerFactory loggerFact } - public async Task StartAsync(IEnumerable topics, CancellationToken token) - { + public async Task StartAsync(IEnumerable topics, CancellationToken token) { _Topics = topics; + _Reconnect = false; // Start a timer to manage the connection over the websocket _PingTimer = new System.Timers.Timer(TimeSpan.FromSeconds(30).TotalMilliseconds); @@ -49,8 +51,7 @@ public async Task StartAsync(IEnumerable topics, CancellationToken await StartListening(topics); - while (!token.IsCancellationRequested) - { + while (!token.IsCancellationRequested) { var buffer = new byte[1024]; var messageBuffer = new ArraySegment(buffer); @@ -58,8 +59,7 @@ public async Task StartAsync(IEnumerable topics, CancellationToken var result = await _Socket.ReceiveAsync(messageBuffer, token); completeMessage.Append(Encoding.UTF8.GetString(messageBuffer)); - while (!result.EndOfMessage) - { + while (!result.EndOfMessage) { buffer = new byte[1024]; messageBuffer = new ArraySegment(buffer); result = await _Socket.ReceiveAsync(messageBuffer, token); @@ -71,62 +71,81 @@ public async Task StartAsync(IEnumerable topics, CancellationToken break; } - try - { + try { HandleMessage(completeMessage.ToString()); - } catch (UnhandledPubSubMessageException) { + } + catch (UnhandledPubSubMessageException) { // do nothing - } catch (Exception e) { + } + catch (Exception e) { _Logger.LogError(e, "Error while parsing message from Twitch: " + completeMessage.ToString()); _Logger.LogError("Reconnecting..."); _Reconnect = true; } if (_Reconnect) { + if (!_ReconnectTimeouts.Any(t => t.TotalMilliseconds == _ReconnectTimer.Interval)) { + _ReconnectTimer.Interval = _ReconnectTimeouts[0].TotalMilliseconds; + _Logger.LogError($"Unable to connect to Twitch PubSub. Reconnecting in {_ReconnectTimeouts[0].TotalSeconds} seconds"); + } + else if (_ReconnectTimeouts.Last().TotalMilliseconds == _ReconnectTimer.Interval) { + _Reconnect = false; + _Logger.LogError("Unable to connect to Twitch PubSub. Ceasing attempting to connect"); + } else { + + for (var i=0; i<_ReconnectTimeouts.Length; i++) { + if (_ReconnectTimeouts[i].TotalMilliseconds == _ReconnectTimer.Interval) { + _Logger.LogError($"Unable to connect to Twitch PubSub. Reconnecting in {_ReconnectTimeouts[i + 1].TotalSeconds} seconds"); + _ReconnectTimer.Interval = _ReconnectTimeouts[i + 1].TotalMilliseconds; + break; + } + } + + + } + + await Task.Delay((int)_ReconnectTimer.Interval); break; + } } - if (_Reconnect) _ = Task.Run(() => StartAsync(topics, token)); + // if (_Reconnect) _ = Task.Run(() => StartAsync(topics, token)); } - private delegate bool OnReceivedMessage(string message); - private List _Strategies = new List(); + private delegate bool OnReceivedMessage(IPubSubReceiveMessage message); + private readonly List _Strategies = new List(); - private void HandleMessage(string receivedMessage) - { + private void HandleMessage(string receivedMessage) { + var message = JsonConvert.DeserializeObject(receivedMessage); - var jDoc = JObject.Parse(receivedMessage); - var messageType = jDoc["type"].Value(); - if (messageType == "RESPONSE" && jDoc["error"].Value() != "") - { - throw new Exception("Unable to connect"); - } else if (messageType == "RESPONSE") { + if (message is ResponseReceiveMessage response) { + if (!string.IsNullOrWhiteSpace(response.Error)) { + throw new Exception($"Unable to connect: {response.Error}"); + } return; } - foreach (var strategy in _Strategies) - { - if (strategy(receivedMessage)) return; + foreach (var strategy in _Strategies) { + if (strategy(message)) { + return; + } } throw new UnhandledPubSubMessageException(); - } - private async Task StartListening(IEnumerable topics) - { - + private async Task StartListening(IEnumerable topics) { _Socket = new ClientWebSocket(); var message = new PubSubListen { data = new PubSubListen.PubSubListenData { - auth_token = _Configuration.OAuthToken, + auth_token = _Configuration.PubSubAuthToken, topics = topics.Select(t => t.TopicString).ToArray() } }; @@ -136,8 +155,7 @@ await _Socket.ConnectAsync(new Uri("wss://pubsub-edge.twitch.tv:443"), Cancellat } - private void _PingTimer_Elapsed(object sender, ElapsedEventArgs e) - { + private void _PingTimer_Elapsed(object sender, ElapsedEventArgs e) { var message = @"{ ""type"": ""PING"" }"; SendMessageOnSocket(message).GetAwaiter().GetResult(); _PongTimer = new System.Timers.Timer(TimeSpan.FromSeconds(10).TotalMilliseconds); @@ -145,22 +163,22 @@ private void _PingTimer_Elapsed(object sender, ElapsedEventArgs e) _PongTimer.Start(); _PingAcknowledged = false; - // TODO: handle the lack of returned PONG message + // TODO: handle the lack of returned PONG message } - private void _PongTimer_Elapsed(object sender, ElapsedEventArgs e) - { + private void _PongTimer_Elapsed(object sender, ElapsedEventArgs e) { if (!_PingAcknowledged) { _Reconnect = true; _PongTimer.Dispose(); } } - private Task SendMessageOnSocket(string message) - { + private Task SendMessageOnSocket(string message) { - if (_Socket.State != WebSocketState.Open) return Task.CompletedTask; + if (_Socket.State != WebSocketState.Open) { + return Task.CompletedTask; + } var byteArray = Encoding.ASCII.GetBytes(message); return _Socket.SendAsync(byteArray, WebSocketMessageType.Text, true, CancellationToken.None); @@ -183,9 +201,9 @@ private void InitializeMethodStrategies() { } - private bool HandlePongMessage(string message) { + private bool HandlePongMessage(IPubSubReceiveMessage message) { - if (message.Contains(@"""PONG""")) { + if (message is PongReceiveMessage) { _PingAcknowledged = true; _PongTimer.Stop(); _PongTimer.Dispose(); @@ -197,9 +215,9 @@ private bool HandlePongMessage(string message) { } - private bool HandleReconnectMessage(string message) { + private bool HandleReconnectMessage(IPubSubReceiveMessage message) { - if (message.Contains(@"""RECONNECT""")) { + if (message is ReconnectReceiveMessage) { _Reconnect = true; @@ -210,26 +228,12 @@ private bool HandleReconnectMessage(string message) { } - private bool HandleChannelPointsMessage(string message) { + private bool HandleChannelPointsMessage(IPubSubReceiveMessage message) { - var jDoc = JObject.Parse(message); - - if (jDoc["type"].Value() == "MESSAGE" && jDoc["data"]["topic"].Value().StartsWith("channel-points-channel-v1") ) { - - var innerMessage = jDoc["data"]["message"].Value(); - - PubSubRedemptionMessage messageObj = null; - try - { - messageObj = JsonConvert.DeserializeObject(innerMessage); - } catch (Exception e) { - _Logger.LogError(e, "Error while deserializing the message"); - _Logger.LogInformation("Message contents: " + innerMessage); - } - _Logger.LogWarning($"Channel Points redeemed: {innerMessage}"); - OnChannelPointsRedeemed?.Invoke(null, messageObj?.data); + if (message is ChannelPointsReceiveMessage channelPointsMessage) { + _Logger.LogWarning($"Channel Points redeemed: {channelPointsMessage.Data.Message}"); + OnChannelPointsRedeemed?.Invoke(null, channelPointsMessage?.Data?.Message); return true; - } return false; @@ -239,35 +243,30 @@ private bool HandleChannelPointsMessage(string message) { #endregion #region IDisposable Support - private bool disposedValue = false; // To detect redundant calls + private bool _DisposedValue = false; // To detect redundant calls private bool _PingAcknowledged; private IEnumerable _Topics; - protected virtual void Dispose(bool disposing) - { - if (!disposedValue) - { - if (disposing) - { + protected virtual void Dispose(bool disposing) { + if (!_DisposedValue) { + if (disposing) { _PingTimer.Dispose(); - _PongTimer.Dispose(); + _PongTimer?.Dispose(); } _Socket.Dispose(); - disposedValue = true; + _DisposedValue = true; } } - ~Proxy() - { + ~Proxy() { // Do not change this code. Put cleanup code in Dispose(bool disposing) above. Dispose(false); } // This code added to correctly implement the disposable pattern. - public void Dispose() - { + public void Dispose() { // Do not change this code. Put cleanup code in Dispose(bool disposing) above. Dispose(true); // TODO: uncomment the following line if the finalizer is overridden above. diff --git a/Fritz.Twitch/PubSub/PubSubMessageJsonConverter.cs b/Fritz.Twitch/PubSub/PubSubMessageJsonConverter.cs new file mode 100644 index 00000000..3b09b192 --- /dev/null +++ b/Fritz.Twitch/PubSub/PubSubMessageJsonConverter.cs @@ -0,0 +1,92 @@ +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Fritz.Twitch.PubSub { + public class PubSubMessageJsonConverter : JsonConverter { + public override bool CanConvert(Type objectType) { + return objectType.IsGenericType && objectType.GetGenericTypeDefinition() == typeof(PubSubReceiveMessage<>); + } + + public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer) { + var jObject = JObject.Load(reader); + var type = jObject["type"].Value().ToLower(); + + if (type == "message") { + var topicPrefix = jObject["data"]["topic"].Value().TopicToTopicPrefix(); + switch (topicPrefix) { + case "channel-points-channel-v1": + return jObject.ToObject(); + default: + return null; + } + } + else if (type == "response") { + return jObject.ToObject(); + } + else if (type == "pong") { + return jObject.ToObject(); + } + else if (type == "reconnect") { + return jObject.ToObject(); + } + + return null; + } + + public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer) { + throw new NotImplementedException(); + } + } + + + public class PubSubReceiveMessage : IPubSubReceiveMessage { + public string Type { get; private set; } + + public MessageData Data { get; private set; } + + public string Error { get; private set; } + + public string TopicPrefix { + get { + return Data?.Topic?.TopicToTopicPrefix() ?? string.Empty; + } + } + } + + [JsonConverter(typeof(PubSubMessageJsonConverter))] + public interface IPubSubReceiveMessage { + string Type { get; } + string Error { get; } + string TopicPrefix { get; } + } + + public class MessageData { + public string Topic { get; set; } + public MessageType Message { get; set; } + } + + public class ChannelPointsReceiveMessage : PubSubReceiveMessage { } + public class ResponseReceiveMessage : PubSubReceiveMessage { } + + public class PongReceiveMessage : PubSubReceiveMessage { } + public class ReconnectReceiveMessage : PubSubReceiveMessage { } + + + + public static class PubSubUtils { + public static string TopicToTopicPrefix(this string fullTopic) { + var index = fullTopic.IndexOf('.') - 1; + if (index <= 0) { + index = fullTopic.Length; + } + + var topicPrefix = fullTopic.Substring(0, index); + return topicPrefix; + } + } + + +}