Skip to content

[WIP] Added pub sub message converter #352

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion .editorconfig
Original file line number Diff line number Diff line change
@@ -11,4 +11,32 @@ insert_final_newline = true
csharp_style_var_for_built_in_types = true:error
csharp_style_var_when_type_is_apparent = true:error
csharp_style_var_elsewhere = true:error
csharp_prefer_braces = true
csharp_prefer_braces = true:warning
csharp_new_line_before_open_brace = none



######################################################################
# Start by defining the naming symbols (groups) for fields...
######################################################################
# allowed by design guidelines, but naming is not specified by naming guidelines
dotnet_naming_symbols.private_fields.applicable_kinds = field
dotnet_naming_symbols.private_fields.applicable_accessibilities = private, internal, protected_internal

######################################################################
# Now define the styles that will be applied to those naming symbols...
######################################################################
# prefix_with_underscore_pascal_case
dotnet_naming_style.prefix_with_underscore_pascal_case.capitalization = pascal_case
dotnet_naming_style.prefix_with_underscore_pascal_case.required_prefix = _

######################################################################
# Naming Rules are matched in the order listed, and only the first match is applied
# Use this to match allowed field types, then match all other field types with the invalid style
# Explicitly mark the field type that is user-preference, to allow simple changing to camelCase
# or other settings...
######################################################################
# Fields that are private can be formatted entirely by user preference
dotnet_naming_rule.private_fields_rule.symbols = private_fields
dotnet_naming_rule.private_fields_rule.style = prefix_with_underscore_pascal_case
dotnet_naming_rule.private_fields_rule.severity = warning
2 changes: 1 addition & 1 deletion Fritz.StreamTools/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM mcr.microsoft.com/dotnet/core/aspnet:2.2 AS base
FROM mcr.microsoft.com/dotnet/core/aspnet:3.1 AS base
WORKDIR /app
EXPOSE 80

12 changes: 8 additions & 4 deletions Fritz.StreamTools/Services/GitHubService.cs
Original file line number Diff line number Diff line change
@@ -15,6 +15,8 @@ public class GitHubService : IHostedService
{

public static DateTime LastUpdate = DateTime.MinValue;
private CancellationToken _Token;
private Task _RunningTask;

public GitHubService(IServiceProvider services, ILogger<GitHubService> logger)
{
@@ -27,22 +29,24 @@ public GitHubService(IServiceProvider services, ILogger<GitHubService> logger)

public Task StartAsync(CancellationToken cancellationToken)
{
return MonitorUpdates(cancellationToken);
_Token = cancellationToken;
_RunningTask = MonitorUpdates();
return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}

private async Task MonitorUpdates(CancellationToken cancellationToken)
private async Task MonitorUpdates()
{
var lastRequest = DateTime.Now;
using (var scope = Services.CreateScope())
{
var repo = scope.ServiceProvider.GetService(typeof(GitHubRepository)) as GitHubRepository;
var mcGithubFaceClient = scope.ServiceProvider.GetService(typeof(GithubyMcGithubFaceClient)) as GithubyMcGithubFaceClient;
while (!cancellationToken.IsCancellationRequested)
while (!_Token.IsCancellationRequested)
{
if (repo != null)
{
@@ -56,7 +60,7 @@ private async Task MonitorUpdates(CancellationToken cancellationToken)
mcGithubFaceClient?.UpdateGitHub("", "", 0);
}
}
await Task.Delay(500, cancellationToken);
await Task.Delay(500, _Token);
}
}
}
6 changes: 5 additions & 1 deletion Fritz.StreamTools/Services/TwitchPubSubService.cs
Original file line number Diff line number Diff line change
@@ -19,6 +19,8 @@ public class TwitchPubSubService : IHostedService
{

private IServiceProvider _ServiceProvider;
private CancellationToken _Token;
private Task _BackgroundTask;
private readonly Twitch.PubSub.Proxy _Proxy;
private readonly ConfigurationSettings _Configuration;

@@ -32,7 +34,9 @@ public TwitchPubSubService(IServiceProvider serviceProvider, Twitch.PubSub.Proxy
public Task StartAsync(CancellationToken cancellationToken)
{
_Proxy.OnChannelPointsRedeemed += _Proxy_OnChannelPointsRedeemed;
return _Proxy.StartAsync(new TwitchTopic[] { TwitchTopic.ChannelPoints(_Configuration.UserId) }, cancellationToken);
_Token = cancellationToken;
_BackgroundTask = _Proxy.StartAsync(new TwitchTopic[] { TwitchTopic.ChannelPoints(_Configuration.UserId) }, _Token);
return Task.CompletedTask;
}

private void _Proxy_OnChannelPointsRedeemed(object sender, ChannelRedemption e)
6 changes: 5 additions & 1 deletion Fritz.StreamTools/Startup.cs
Original file line number Diff line number Diff line change
@@ -49,17 +49,21 @@ public void Configure(IApplicationBuilder app, IHostEnvironment env, IConfigurat
}

app.UseHsts();
app.UseHttpsRedirection();
//app.UseHttpsRedirection();

app.UseStaticFiles();

app.UseRouting();

app.UseEndpoints(endpoints =>
{

endpoints.MapHub<FollowerHub>("/followerstream");
endpoints.MapHub<GithubyMcGithubFace>("/github");
endpoints.MapHub<AttentionHub>("/attentionhub");

endpoints.MapRazorPages();

endpoints.MapDefaultControllerRoute();

});
9 changes: 7 additions & 2 deletions Fritz.StreamTools/StartupServices/ConfigureServices.cs
Original file line number Diff line number Diff line change
@@ -49,7 +49,7 @@ public static void Execute(IServiceCollection services, IConfiguration configura
// Add the SentimentSink
//services.AddSingleton<Fritz.Chatbot.Commands.SentimentSink>();

services.AddSingleton<IHostedService, FritzBot>();
services.AddHostedService<FritzBot>();

services.AddSingleton(new GitHubClient(new ProductHeaderValue("Fritz.StreamTools")));
FritzBot.RegisterCommands(services);
@@ -119,6 +119,7 @@ private static void AddStreamingServices(this IServiceCollection services, IConf
c => !bool.TryParse(c["StreamServices:Fake:Enabled"], out var enabled) || !enabled); // Test to disable

services.AddSingleton<StreamService>();

}

/// <summary>
@@ -133,7 +134,7 @@ private static void AddStreamService<TStreamService>(this IServiceCollection ser
IConfiguration configuration,
Func<IConfiguration, ILoggerFactory, TStreamService> factory,
Func<IConfiguration, bool> isDisabled)
where TStreamService : class, IStreamService
where TStreamService : class, IStreamService, IHostedService
{

// Don't configure this service if it is disabled
@@ -174,6 +175,8 @@ private static void AddAspNetFeatures(this IServiceCollection services)

}).AddMessagePackProtocol();

services.AddRazorPages();

services.AddMvc()
.SetCompatibilityVersion(CompatibilityVersion.Version_3_0);

@@ -182,7 +185,9 @@ private static void AddAspNetFeatures(this IServiceCollection services)
private static void RegisterTwitchPubSub(this IServiceCollection services) {

services.AddSingleton<Twitch.PubSub.Proxy>();

services.AddHostedService<TwitchPubSubService>();

//var provider = services.BuildServiceProvider();

//var pubSub = new TwitchPubSubService(
1 change: 1 addition & 0 deletions Fritz.StreamTools/appsettings.json
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@
"ClientId": "",
"UserId": "",
"ChatToken": "",
"PubSubAuthToken": "",
"ChatBotName": "FritzBot_"
},
"Mixer": {
2 changes: 2 additions & 0 deletions Fritz.Twitch/ConfigurationSettings.cs
Original file line number Diff line number Diff line change
@@ -17,6 +17,8 @@ public class ConfigurationSettings

public virtual string OAuthToken { get; set; }

public virtual string PubSubAuthToken { get; set; }

[Obsolete]
public string Channel { get => ChannelName; set => ChannelName = value; }

153 changes: 76 additions & 77 deletions Fritz.Twitch/PubSub/Proxy.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Linq;
@@ -11,22 +10,25 @@
using System.Threading.Tasks;
using System.Timers;

namespace Fritz.Twitch.PubSub
{
namespace Fritz.Twitch.PubSub {

/// <summary>
/// Manage interactions with the Twitch pubsub API
/// </summary>
public class Proxy : IDisposable
{
public class Proxy : IDisposable {

private ClientWebSocket _Socket;
private System.Timers.Timer _PingTimer;
private System.Timers.Timer _PongTimer;
private System.Timers.Timer _ReconnectTimer = new System.Timers.Timer();
private ConfigurationSettings _Configuration;
private ILogger _Logger;
private static bool _Reconnect;

private static readonly TimeSpan[] _ReconnectTimeouts = new TimeSpan[] {
TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(30), TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(5)
};

public Proxy(IOptions<ConfigurationSettings> settings, ILoggerFactory loggerFactory)
{

@@ -37,10 +39,10 @@ public Proxy(IOptions<ConfigurationSettings> settings, ILoggerFactory loggerFact

}

public async Task StartAsync(IEnumerable<TwitchTopic> topics, CancellationToken token)
{
public async Task StartAsync(IEnumerable<TwitchTopic> topics, CancellationToken token) {

_Topics = topics;
_Reconnect = false;

// Start a timer to manage the connection over the websocket
_PingTimer = new System.Timers.Timer(TimeSpan.FromSeconds(30).TotalMilliseconds);
@@ -49,17 +51,15 @@ public async Task StartAsync(IEnumerable<TwitchTopic> topics, CancellationToken

await StartListening(topics);

while (!token.IsCancellationRequested)
{
while (!token.IsCancellationRequested) {

var buffer = new byte[1024];
var messageBuffer = new ArraySegment<byte>(buffer);
var completeMessage = new StringBuilder();

var result = await _Socket.ReceiveAsync(messageBuffer, token);
completeMessage.Append(Encoding.UTF8.GetString(messageBuffer));
while (!result.EndOfMessage)
{
while (!result.EndOfMessage) {
buffer = new byte[1024];
messageBuffer = new ArraySegment<byte>(buffer);
result = await _Socket.ReceiveAsync(messageBuffer, token);
@@ -71,62 +71,81 @@ public async Task StartAsync(IEnumerable<TwitchTopic> topics, CancellationToken
break;
}

try
{
try {
HandleMessage(completeMessage.ToString());
} catch (UnhandledPubSubMessageException) {
}
catch (UnhandledPubSubMessageException) {
// do nothing
} catch (Exception e) {
}
catch (Exception e) {
_Logger.LogError(e, "Error while parsing message from Twitch: " + completeMessage.ToString());
_Logger.LogError("Reconnecting...");
_Reconnect = true;
}

if (_Reconnect) {
if (!_ReconnectTimeouts.Any(t => t.TotalMilliseconds == _ReconnectTimer.Interval)) {
_ReconnectTimer.Interval = _ReconnectTimeouts[0].TotalMilliseconds;
_Logger.LogError($"Unable to connect to Twitch PubSub. Reconnecting in {_ReconnectTimeouts[0].TotalSeconds} seconds");
}
else if (_ReconnectTimeouts.Last().TotalMilliseconds == _ReconnectTimer.Interval) {
_Reconnect = false;
_Logger.LogError("Unable to connect to Twitch PubSub. Ceasing attempting to connect");
} else {

for (var i=0; i<_ReconnectTimeouts.Length; i++) {
if (_ReconnectTimeouts[i].TotalMilliseconds == _ReconnectTimer.Interval) {
_Logger.LogError($"Unable to connect to Twitch PubSub. Reconnecting in {_ReconnectTimeouts[i + 1].TotalSeconds} seconds");
_ReconnectTimer.Interval = _ReconnectTimeouts[i + 1].TotalMilliseconds;
break;
}
}


}

await Task.Delay((int)_ReconnectTimer.Interval);
break;

}

}

if (_Reconnect) _ = Task.Run(() => StartAsync(topics, token));
// if (_Reconnect) _ = Task.Run(() => StartAsync(topics, token));

}


private delegate bool OnReceivedMessage(string message);
private List<OnReceivedMessage> _Strategies = new List<OnReceivedMessage>();
private delegate bool OnReceivedMessage(IPubSubReceiveMessage message);
private readonly List<OnReceivedMessage> _Strategies = new List<OnReceivedMessage>();

private void HandleMessage(string receivedMessage)
{
private void HandleMessage(string receivedMessage) {
var message = JsonConvert.DeserializeObject<IPubSubReceiveMessage>(receivedMessage);

var jDoc = JObject.Parse(receivedMessage);
var messageType = jDoc["type"].Value<string>();
if (messageType == "RESPONSE" && jDoc["error"].Value<string>() != "")
{
throw new Exception("Unable to connect");
} else if (messageType == "RESPONSE") {
if (message is ResponseReceiveMessage response) {
if (!string.IsNullOrWhiteSpace(response.Error)) {
throw new Exception($"Unable to connect: {response.Error}");
}
return;
}

foreach (var strategy in _Strategies)
{
if (strategy(receivedMessage)) return;
foreach (var strategy in _Strategies) {
if (strategy(message)) {
return;
}
}

throw new UnhandledPubSubMessageException();

}

private async Task StartListening(IEnumerable<TwitchTopic> topics)
{

private async Task StartListening(IEnumerable<TwitchTopic> topics) {
_Socket = new ClientWebSocket();

var message = new PubSubListen
{
data = new PubSubListen.PubSubListenData
{
auth_token = _Configuration.OAuthToken,
auth_token = _Configuration.PubSubAuthToken,
topics = topics.Select(t => t.TopicString).ToArray()
}
};
@@ -136,31 +155,30 @@ await _Socket.ConnectAsync(new Uri("wss://pubsub-edge.twitch.tv:443"), Cancellat

}

private void _PingTimer_Elapsed(object sender, ElapsedEventArgs e)
{
private void _PingTimer_Elapsed(object sender, ElapsedEventArgs e) {
var message = @"{ ""type"": ""PING"" }";
SendMessageOnSocket(message).GetAwaiter().GetResult();
_PongTimer = new System.Timers.Timer(TimeSpan.FromSeconds(10).TotalMilliseconds);
_PongTimer.Elapsed += _PongTimer_Elapsed;
_PongTimer.Start();
_PingAcknowledged = false;

// TODO: handle the lack of returned PONG message
// TODO: handle the lack of returned PONG message

}

private void _PongTimer_Elapsed(object sender, ElapsedEventArgs e)
{
private void _PongTimer_Elapsed(object sender, ElapsedEventArgs e) {
if (!_PingAcknowledged) {
_Reconnect = true;
_PongTimer.Dispose();
}
}

private Task SendMessageOnSocket(string message)
{
private Task SendMessageOnSocket(string message) {

if (_Socket.State != WebSocketState.Open) return Task.CompletedTask;
if (_Socket.State != WebSocketState.Open) {
return Task.CompletedTask;
}

var byteArray = Encoding.ASCII.GetBytes(message);
return _Socket.SendAsync(byteArray, WebSocketMessageType.Text, true, CancellationToken.None);
@@ -183,9 +201,9 @@ private void InitializeMethodStrategies() {

}

private bool HandlePongMessage(string message) {
private bool HandlePongMessage(IPubSubReceiveMessage message) {

if (message.Contains(@"""PONG""")) {
if (message is PongReceiveMessage) {
_PingAcknowledged = true;
_PongTimer.Stop();
_PongTimer.Dispose();
@@ -197,9 +215,9 @@ private bool HandlePongMessage(string message) {

}

private bool HandleReconnectMessage(string message) {
private bool HandleReconnectMessage(IPubSubReceiveMessage message) {

if (message.Contains(@"""RECONNECT""")) {
if (message is ReconnectReceiveMessage) {

_Reconnect = true;

@@ -210,26 +228,12 @@ private bool HandleReconnectMessage(string message) {

}

private bool HandleChannelPointsMessage(string message) {
private bool HandleChannelPointsMessage(IPubSubReceiveMessage message) {

var jDoc = JObject.Parse(message);

if (jDoc["type"].Value<string>() == "MESSAGE" && jDoc["data"]["topic"].Value<string>().StartsWith("channel-points-channel-v1") ) {

var innerMessage = jDoc["data"]["message"].Value<string>();

PubSubRedemptionMessage messageObj = null;
try
{
messageObj = JsonConvert.DeserializeObject<PubSubRedemptionMessage>(innerMessage);
} catch (Exception e) {
_Logger.LogError(e, "Error while deserializing the message");
_Logger.LogInformation("Message contents: " + innerMessage);
}
_Logger.LogWarning($"Channel Points redeemed: {innerMessage}");
OnChannelPointsRedeemed?.Invoke(null, messageObj?.data);
if (message is ChannelPointsReceiveMessage channelPointsMessage) {
_Logger.LogWarning($"Channel Points redeemed: {channelPointsMessage.Data.Message}");
OnChannelPointsRedeemed?.Invoke(null, channelPointsMessage?.Data?.Message);
return true;

}

return false;
@@ -239,35 +243,30 @@ private bool HandleChannelPointsMessage(string message) {
#endregion

#region IDisposable Support
private bool disposedValue = false; // To detect redundant calls
private bool _DisposedValue = false; // To detect redundant calls
private bool _PingAcknowledged;
private IEnumerable<TwitchTopic> _Topics;

protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
protected virtual void Dispose(bool disposing) {
if (!_DisposedValue) {
if (disposing) {
_PingTimer.Dispose();
_PongTimer.Dispose();
_PongTimer?.Dispose();
}

_Socket.Dispose();

disposedValue = true;
_DisposedValue = true;
}
}

~Proxy()
{
~Proxy() {
// Do not change this code. Put cleanup code in Dispose(bool disposing) above.
Dispose(false);
}

// This code added to correctly implement the disposable pattern.
public void Dispose()
{
public void Dispose() {
// Do not change this code. Put cleanup code in Dispose(bool disposing) above.
Dispose(true);
// TODO: uncomment the following line if the finalizer is overridden above.
92 changes: 92 additions & 0 deletions Fritz.Twitch/PubSub/PubSubMessageJsonConverter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Text;

namespace Fritz.Twitch.PubSub {
public class PubSubMessageJsonConverter : JsonConverter {
public override bool CanConvert(Type objectType) {
return objectType.IsGenericType && objectType.GetGenericTypeDefinition() == typeof(PubSubReceiveMessage<>);
}

public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer) {
var jObject = JObject.Load(reader);
var type = jObject["type"].Value<string>().ToLower();

if (type == "message") {
var topicPrefix = jObject["data"]["topic"].Value<string>().TopicToTopicPrefix();
switch (topicPrefix) {
case "channel-points-channel-v1":
return jObject.ToObject<ChannelPointsReceiveMessage>();
default:
return null;
}
}
else if (type == "response") {
return jObject.ToObject<ResponseReceiveMessage>();
}
else if (type == "pong") {
return jObject.ToObject<PongReceiveMessage>();
}
else if (type == "reconnect") {
return jObject.ToObject<ReconnectReceiveMessage>();
}

return null;
}

public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer) {
throw new NotImplementedException();
}
}


public class PubSubReceiveMessage<MessageType> : IPubSubReceiveMessage {
public string Type { get; private set; }

public MessageData<MessageType> Data { get; private set; }

public string Error { get; private set; }

public string TopicPrefix {
get {
return Data?.Topic?.TopicToTopicPrefix() ?? string.Empty;
}
}
}

[JsonConverter(typeof(PubSubMessageJsonConverter))]
public interface IPubSubReceiveMessage {
string Type { get; }
string Error { get; }
string TopicPrefix { get; }
}

public class MessageData<MessageType> {
public string Topic { get; set; }
public MessageType Message { get; set; }
}

public class ChannelPointsReceiveMessage : PubSubReceiveMessage<ChannelRedemption> { }
public class ResponseReceiveMessage : PubSubReceiveMessage<string> { }

public class PongReceiveMessage : PubSubReceiveMessage<string> { }
public class ReconnectReceiveMessage : PubSubReceiveMessage<string> { }



public static class PubSubUtils {
public static string TopicToTopicPrefix(this string fullTopic) {
var index = fullTopic.IndexOf('.') - 1;
if (index <= 0) {
index = fullTopic.Length;
}

var topicPrefix = fullTopic.Substring(0, index);
return topicPrefix;
}
}


}