Skip to content

Refactor code to reactive approach #17

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions Codehard.DJ/Codehard.DJ.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Codehard.Functional" Version="2.4.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="7.0.0" />
<PackageReference Include="SpotifyAPI.Web.Auth" Version="7.0.0" />
<PackageReference Include="System.Runtime.Caching" Version="7.0.0" />
<PackageReference Include="OpenAI-DotNet" Version="5.1.0" />
<PackageReference Include="Codehard.Functional" Version="2.4.0"/>
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="7.0.0"/>
<PackageReference Include="Microsoft.Extensions.Hosting" Version="7.0.0"/>
<PackageReference Include="SpotifyAPI.Web.Auth" Version="7.0.0"/>
<PackageReference Include="System.Runtime.Caching" Version="7.0.0"/>
<PackageReference Include="OpenAI-DotNet" Version="5.1.0"/>
<PackageReference Include="System.Reactive.Linq" Version="5.0.0"/>
</ItemGroup>

<ItemGroup>
Expand All @@ -24,9 +25,9 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\DJ.Domain\DJ.Domain.csproj" />
<ProjectReference Include="..\DJ.Infrastructure\DJ.Infrastructure.csproj" />
<ProjectReference Include="..\Infrastructure.Discord\Infrastructure.Discord.csproj" />
<ProjectReference Include="..\DJ.Domain\DJ.Domain.csproj"/>
<ProjectReference Include="..\DJ.Infrastructure\DJ.Infrastructure.csproj"/>
<ProjectReference Include="..\Infrastructure.Discord\Infrastructure.Discord.csproj"/>
</ItemGroup>

</Project>
154 changes: 83 additions & 71 deletions Codehard.DJ/Providers/Spotify/SpotifyProvider.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Codehard.DJ.Extensions;
using System.Reactive.Linq;
using Codehard.DJ.Extensions;
using Codehard.DJ.Providers.Models;
using Codehard.Functional;
using Microsoft.Extensions.Logging;
Expand All @@ -10,13 +11,13 @@ public class SpotifyProvider : IMusicProvider
{
private readonly SpotifyClient _client;
private readonly ILogger<SpotifyProvider> _logger;
private readonly Timer _timer;
private readonly Queue<Music> _queue;
private readonly Stack<Music> _playedStack;

private Music? _currentMusic;
private bool _disposed;
private int _volume = -1;
private readonly Action _disposer;

public SpotifyProvider(
SpotifyClient client,
Expand All @@ -30,7 +31,10 @@ public SpotifyProvider(
// so we doing the queue in memory
this._queue = new Queue<Music>();
this._playedStack = new Stack<Music>();
this._timer = new Timer(GetPlayingTrackInfoAsync, default, TimeSpan.Zero, TimeSpan.FromSeconds(3));

var disposeSubscriptionAction = GetPlayingTrackInfo();

this._disposer = disposeSubscriptionAction;
}

public event PlayStartEventHandler? PlayStartEvent;
Expand Down Expand Up @@ -265,90 +269,97 @@ public async ValueTask<bool> SetVolumeAsync(int volume, CancellationToken cancel
}
}

private async void GetPlayingTrackInfoAsync(object? state)
private Action GetPlayingTrackInfo()
{
try
var playbackStateObserver =
Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(3))
.SelectMany(_ => Observable.FromAsync(GetPlayerInfoAsync))
.Do(t => this.State = t.State);

var stateChangedSubscription =
playbackStateObserver
.DistinctUntilChanged(t => t.State)
.Subscribe(t => this.PlayerStateChangedEvent?.Invoke(this, t.State));

var playStoppedOrEndedSubscription =
playbackStateObserver
.Where(t => t.State is PlaybackState.Ended or PlaybackState.Stopped)
.Select(t => t.TrackAudio)
.Subscribe(PlayerStoppedOrEndedSubscribeHandler);

var playbackOutOfSyncSubscription =
playbackStateObserver
.Where(t =>
t.State is PlaybackState.Playing &&
t.TrackAudio!.Id != this._currentMusic?.Id)
.Select(t => t.TrackAudio)
.DistinctUntilChanged(_ => this.RemainingInQueue)
.Subscribe(PlaybackOutOfSyncHandler!);

return () =>
{
var (track, playbackState) = await IsCurrentPlaybackEndedAsync();
stateChangedSubscription.Dispose();
playStoppedOrEndedSubscription.Dispose();
playbackOutOfSyncSubscription.Dispose();
};

if (this.State != playbackState)
{
this.PlayerStateChangedEvent?.Invoke(this, playbackState);
}
async void PlaybackOutOfSyncHandler(FullTrack track)
{
TryInvokePlaybackEnded();

this.State = playbackState;
this.PlaybackOutOfSyncEvent?.Invoke(this,
new MusicPlayerEventArgs
{
Music = new Music(track.Id, track.Name, track.Artists.Select(a => new Artist(a.Id, a.Name, System.Array.Empty<string>())).ToArray(),
new Album(track.Album.Name, track.Album.Images.Select(i => i.Url).ToArray(), string.Empty), track.DurationMs, new Uri(track.Uri)),
});

switch (playbackState)
if (this.RemainingInQueue > 0)
{
case PlaybackState.Playing:
var isOutOfSync = track!.Id != this._currentMusic?.Id;

if (!isOutOfSync)
{
break;
}

TryInvokePlaybackEnded();

this.PlaybackOutOfSyncEvent?.Invoke(this, new MusicPlayerEventArgs
{
Music = new Music(
track.Id,
track.Name,
track.Artists.Select(a => new Artist(a.Id, a.Name, System.Array.Empty<string>())).ToArray(),
new Album(track.Album.Name, track.Album.Images.Select(i => i.Url).ToArray(), string.Empty),
track.DurationMs,
new Uri(track.Uri)),
});

if (this.RemainingInQueue > 0)
{
await this.StopAsync();
}

break;
case PlaybackState.Stopped:
case PlaybackState.Ended:
TryInvokePlaybackEnded();

if (this._queue.Any())
{
await this.NextAsync();
}

break;
default:
throw new NotSupportedException();
await this.StopAsync();
}
}
catch (Exception ex)
{
this._logger.LogError(ex, "An error occurred during track info gathering");
}

async Task<(FullTrack? TrackAudio, PlaybackState State)> IsCurrentPlaybackEndedAsync()
async void PlayerStoppedOrEndedSubscribeHandler(FullTrack? _)
{
var playingContext = await this._client.Player.GetCurrentPlayback();
TryInvokePlaybackEnded();

if (playingContext == null!)
if (this._queue.Any())
{
return (default, PlaybackState.Stopped);
await this.NextAsync();
}
}

if (playingContext.Item is not FullTrack currentTrack)
async Task<(FullTrack? TrackAudio, PlaybackState State)> GetPlayerInfoAsync()
{
try
{
return (default, PlaybackState.Stopped);
}
var playingContext = await this._client.Player.GetCurrentPlayback();

var trackEnded = playingContext.ProgressMs >= currentTrack.DurationMs - 1;
var trackStopped = playingContext is { IsPlaying: false, ProgressMs: 0 };
if (playingContext == null!)
{
return (default, PlaybackState.Stopped);
}

if (playingContext.Item is not FullTrack currentTrack)
{
return (default, PlaybackState.Stopped);
}

var trackEnded = playingContext.ProgressMs >= currentTrack.DurationMs - 1;
var trackStopped = playingContext is { IsPlaying: false, ProgressMs: 0 };

var playbackState =
trackEnded ? PlaybackState.Ended :
trackStopped ? PlaybackState.Stopped :
PlaybackState.Playing;
var playbackState =
trackEnded ? PlaybackState.Ended :
trackStopped ? PlaybackState.Stopped :
PlaybackState.Playing;

return (currentTrack, playbackState);
return (currentTrack, playbackState);
}
catch
{
return (null, PlaybackState.Unknown);
}
}

void TryInvokePlaybackEnded()
Expand All @@ -374,13 +385,14 @@ protected void Dispose(bool disposing)
return;
}

this._timer.Dispose();
this._disposer();

this._disposed = true;
}

public void Dispose()
{
_timer.Dispose();
this.Dispose(true);
GC.SuppressFinalize(this);
}
}