diff --git a/.gitignore b/.gitignore index 9c7c8fd..bee1797 100644 --- a/.gitignore +++ b/.gitignore @@ -27,3 +27,5 @@ Gemfile.lock !/bench/.env*.erb /bench/k6 test/dummy/solid_cable_test +test/dummy/solid_cable_test* +test/dummy/solid_cable_development* diff --git a/app/models/solid_cable/channel.rb b/app/models/solid_cable/channel.rb new file mode 100644 index 0000000..2b57c31 --- /dev/null +++ b/app/models/solid_cable/channel.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +module SolidCable + class Channel < SolidCable::Record + scope :for, ->(channel) { where(channel_hash: channel_hash_for(channel)) } + + def increment_subscribers! + update!(subscribers: subscribers + 1) + end + + def decrement_subscribers! + update!(subscribers: [ subscribers - 1, 0 ].max) + end + end +end diff --git a/app/models/solid_cable/channel_hashable.rb b/app/models/solid_cable/channel_hashable.rb new file mode 100644 index 0000000..2f026b9 --- /dev/null +++ b/app/models/solid_cable/channel_hashable.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +module SolidCable::ChannelHashable + extend ActiveSupport::Concern + + class_methods do + def channel_hashes_for(channels) + channels.map { |channel| channel_hash_for(channel) } + end + + # Need to unpack this as a signed integer since Postgresql and SQLite + # don't support unsigned integers + def channel_hash_for(channel) + Digest::SHA256.digest(channel.to_s).unpack1("q>") + end + end +end diff --git a/app/models/solid_cable/message.rb b/app/models/solid_cable/message.rb index 193d45a..bda3095 100644 --- a/app/models/solid_cable/message.rb +++ b/app/models/solid_cable/message.rb @@ -12,18 +12,10 @@ class Message < SolidCable::Record class << self def broadcast(channel, payload) - insert({ created_at: Time.current, channel:, payload:, - channel_hash: channel_hash_for(channel) }) - end - - def channel_hashes_for(channels) - channels.map { |channel| channel_hash_for(channel) } - end + channel_hash = channel_hash_for(channel) + insert({ created_at: Time.current, channel:, payload:, channel_hash: }) - # Need to unpack this as a signed integer since Postgresql and SQLite - # don't support unsigned integers - def channel_hash_for(channel) - Digest::SHA256.digest(channel.to_s).unpack1("q>") + ::SolidCable::Channel.find_by(channel_hash:)&.subscribers.to_i end end end diff --git a/app/models/solid_cable/record.rb b/app/models/solid_cable/record.rb index a36c424..aa6ecc9 100644 --- a/app/models/solid_cable/record.rb +++ b/app/models/solid_cable/record.rb @@ -2,6 +2,8 @@ module SolidCable class Record < ActiveRecord::Base + include ChannelHashable + self.abstract_class = true connects_to(**SolidCable.connects_to) if SolidCable.connects_to.present? diff --git a/bin/rails b/bin/rails new file mode 100755 index 0000000..2f03747 --- /dev/null +++ b/bin/rails @@ -0,0 +1,14 @@ +#!/usr/bin/env ruby +# This command will automatically be run when you run "rails" with Rails gems +# installed from the root of your application. + +ENGINE_ROOT = File.expand_path('..', __dir__) +ENGINE_PATH = File.expand_path('../lib/active_insights/engine', __dir__) +APP_PATH = File.expand_path('../test/dummy/config/application', __dir__) + +# Set up gems listed in the Gemfile. +ENV['BUNDLE_GEMFILE'] ||= File.expand_path('../Gemfile', __dir__) +require 'bundler/setup' if File.exist?(ENV['BUNDLE_GEMFILE']) + +require 'rails/all' +require 'rails/engine/commands' diff --git a/lib/action_cable/subscription_adapter/solid_cable.rb b/lib/action_cable/subscription_adapter/solid_cable.rb index 401c273..160dece 100644 --- a/lib/action_cable/subscription_adapter/solid_cable.rb +++ b/lib/action_cable/subscription_adapter/solid_cable.rb @@ -15,9 +15,9 @@ def initialize(*) end def broadcast(channel, payload) - ::SolidCable::Message.broadcast(channel, payload) - - ::SolidCable::TrimJob.perform_now if ::SolidCable.autotrim? + ::SolidCable::Message.broadcast(channel, payload).tap do + ::SolidCable::TrimJob.perform_now if ::SolidCable.autotrim? + end end def subscribe(channel, callback, success_callback = nil) @@ -64,11 +64,13 @@ def shutdown def add_channel(channel, on_success) channels.add(channel) + ::SolidCable::Channel.for(channel).first_or_create.increment_subscribers! event_loop.post(&on_success) if on_success end def remove_channel(channel) channels.delete(channel) + ::SolidCable::Channel.for(channel).first_or_create.decrement_subscribers! end def invoke_callback(*) diff --git a/lib/generators/solid_cable/add_channels/USAGE b/lib/generators/solid_cable/add_channels/USAGE new file mode 100644 index 0000000..20625bc --- /dev/null +++ b/lib/generators/solid_cable/add_channels/USAGE @@ -0,0 +1,5 @@ +Description: + Adds Solid Cable channel migration + +Example: + bin/rails generate solid_cable:add_channels diff --git a/lib/generators/solid_cable/add_channels/add_channels_generator.rb b/lib/generators/solid_cable/add_channels/add_channels_generator.rb new file mode 100644 index 0000000..2c77d6a --- /dev/null +++ b/lib/generators/solid_cable/add_channels/add_channels_generator.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +require "rails/generators" +require "rails/generators/active_record" + +class SolidCable::AddChannelsGenerator < Rails::Generators::Base + include ActiveRecord::Generators::Migration + + source_root File.expand_path("templates", __dir__) + + def copy_files + migration_template "db/migrate/create_channels.rb", + "db/cable_migrate/create_channels.rb" + end +end diff --git a/lib/generators/solid_cable/add_channels/templates/db/migrate/create_channels.rb b/lib/generators/solid_cable/add_channels/templates/db/migrate/create_channels.rb new file mode 100644 index 0000000..1162027 --- /dev/null +++ b/lib/generators/solid_cable/add_channels/templates/db/migrate/create_channels.rb @@ -0,0 +1,12 @@ +# frozen_string_literal: true + +class CreateChannels < ActiveRecord::Migration[7.2] + def change + create_table "solid_cable_channels", force: :cascade do |t| + t.integer "channel_hash", limit: 8, null: false + t.integer "subscribers", default: 0, null: false + t.datetime "created_at", null: false + t.index [ "channel_hash" ], name: "index_solid_cable_channels_on_channel_hash" + end + end +end diff --git a/lib/generators/solid_cable/install/templates/db/cable_schema.rb b/lib/generators/solid_cable/install/templates/db/cable_schema.rb index 2366660..294bf06 100644 --- a/lib/generators/solid_cable/install/templates/db/cable_schema.rb +++ b/lib/generators/solid_cable/install/templates/db/cable_schema.rb @@ -1,4 +1,11 @@ ActiveRecord::Schema[7.1].define(version: 1) do + create_table "solid_cable_channels", force: :cascade do |t| + t.integer "channel_hash", limit: 8, null: false + t.integer "subscribers", default: 0, null: false + t.datetime "created_at", null: false + t.index ["channel_hash"], name: "index_solid_cable_channels_on_channel_hash" + end + create_table "solid_cable_messages", force: :cascade do |t| t.binary "channel", limit: 1024, null: false t.binary "payload", limit: 536870912, null: false diff --git a/lib/tasks/solid_cable_tasks.rake b/lib/tasks/solid_cable_tasks.rake index a92f45a..cddf4a7 100644 --- a/lib/tasks/solid_cable_tasks.rake +++ b/lib/tasks/solid_cable_tasks.rake @@ -9,4 +9,8 @@ namespace :solid_cable do task :update do Rails::Command.invoke :generate, [ "solid_cable:update" ] end + + task :add_channels do + Rails::Command.invoke :generate, [ "solid_cable:add_channels" ] + end end diff --git a/test/dummy/db/schema.rb b/test/dummy/db/schema.rb index 229f6a9..8d2c470 100644 --- a/test/dummy/db/schema.rb +++ b/test/dummy/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[7.2].define(version: 2024_09_12_130854) do +ActiveRecord::Schema[7.2].define(version: 2024_10_11_202426) do create_table "solid_cable_messages", force: :cascade do |t| t.binary "channel", limit: 1024, null: false t.binary "payload", limit: 536870912, null: false @@ -20,4 +20,11 @@ t.index ["channel_hash"], name: "index_solid_cable_messages_on_channel_hash" t.index ["created_at"], name: "index_solid_cable_messages_on_created_at" end + + create_table "solid_cable_channels", force: :cascade do |t| + t.integer "channel_hash", limit: 8, null: false + t.integer "subscribers", default: 0, null: false + t.datetime "created_at", null: false + t.index ["channel_hash"], name: "index_solid_cable_channels_on_channel_hash" + end end diff --git a/test/dummy/solid_cable_test-shm b/test/dummy/solid_cable_test-shm deleted file mode 100644 index fe9ac28..0000000 Binary files a/test/dummy/solid_cable_test-shm and /dev/null differ diff --git a/test/dummy/solid_cable_test-wal b/test/dummy/solid_cable_test-wal deleted file mode 100644 index e69de29..0000000 diff --git a/test/lib/action_cable/subscription_adapter/solid_cable_test.rb b/test/lib/action_cable/subscription_adapter/solid_cable_test.rb index 6dedad8..548f8ea 100644 --- a/test/lib/action_cable/subscription_adapter/solid_cable_test.rb +++ b/test/lib/action_cable/subscription_adapter/solid_cable_test.rb @@ -33,6 +33,21 @@ class ActionCable::SubscriptionAdapter::SolidCableTest < ActionCable::TestCase [@rx_adapter, @tx_adapter].uniq.compact.each(&:shutdown) end + test "broadcast return value with no subscribers" do + subscribers = @tx_adapter.broadcast("channel", "hello world") + + assert_equal 0, subscribers + end + + test "broadcast return value with a subscriber" do + subscribe_as_queue("channel") do |queue| + subscribers = @tx_adapter.broadcast("channel", "hello world") + + assert_equal 1, subscribers + queue.clear + end + end + test "subscribe_and_unsubscribe" do subscribe_as_queue("channel") do |queue| end