diff --git a/config/config.exs b/config/config.exs index e57137c..b2e62a9 100644 --- a/config/config.exs +++ b/config/config.exs @@ -42,3 +42,4 @@ config :phoenix, :json_library, Jason # Import environment specific config. This must remain at the bottom # of this file so it overrides the configuration defined above. import_config "#{config_env()}.exs" +import_config "oban.exs" diff --git a/config/oban.exs b/config/oban.exs new file mode 100644 index 0000000..d06519c --- /dev/null +++ b/config/oban.exs @@ -0,0 +1,6 @@ +import Config + +config :atlas, Oban, + repo: Atlas.Repo, + plugins: [Oban.Plugins.Pruner], + queues: [default: 10, emails: 5] diff --git a/config/test.exs b/config/test.exs index a2eddbd..d31c012 100644 --- a/config/test.exs +++ b/config/test.exs @@ -1,4 +1,6 @@ import Config +# Make Oban run jobs inline for tests +config :atlas, Oban, testing: :inline # Only in tests, remove the complexity from the password hashing algorithm config :bcrypt_elixir, :log_rounds, 1 diff --git a/lib/atlas/accounts/user_notifier.ex b/lib/atlas/accounts/user_notifier.ex index bcdf02a..18f2ef9 100644 --- a/lib/atlas/accounts/user_notifier.ex +++ b/lib/atlas/accounts/user_notifier.ex @@ -4,22 +4,17 @@ defmodule Atlas.Accounts.UserNotifier do password reset instructions, and email update instructions. """ - import Swoosh.Email - - alias Atlas.Mailer - - # Delivers the email using the application mailer. + alias Atlas.Workers.EmailWorker + # Enqueue the email using Oban defp deliver(recipient, subject, body) do - email = - new() - |> to(recipient) - |> from({"Atlas", "contact@example.com"}) - |> subject(subject) - |> text_body(body) - - with {:ok, _metadata} <- Mailer.deliver(email) do - {:ok, email} - end + job = %{ + "to" => recipient, + "subject" => subject, + "body" => body + } + + Oban.insert!(EmailWorker.new(job)) + {:ok, :enqueued} end @doc """ diff --git a/lib/atlas/application.ex b/lib/atlas/application.ex index e204bb5..b8e7670 100644 --- a/lib/atlas/application.ex +++ b/lib/atlas/application.ex @@ -5,6 +5,10 @@ defmodule Atlas.Application do use Application + defp oban_config do + Application.fetch_env!(:atlas, Oban) + end + @impl true def start(_type, _args) do children = [ @@ -15,10 +19,9 @@ defmodule Atlas.Application do # Start the Finch HTTP client for sending emails {Finch, name: Atlas.Finch}, # Start the Guardian DB token sweeper server - # This is used to clean up expired tokens from the database {Guardian.DB.Sweeper, []}, - # Start a worker by calling: Atlas.Worker.start_link(arg) - # {Atlas.Worker, arg}, + # Start Oban for background jobs + {Oban, oban_config()}, # Start to serve requests, typically the last entry AtlasWeb.Endpoint ] diff --git a/lib/atlas/workers/email_worker.ex b/lib/atlas/workers/email_worker.ex new file mode 100644 index 0000000..5dd06e7 --- /dev/null +++ b/lib/atlas/workers/email_worker.ex @@ -0,0 +1,39 @@ +defmodule Atlas.Workers.EmailWorker do + @moduledoc """ + Oban worker responsible for sending emails asynchronously using the application's mailer. + """ + use Oban.Worker, queue: :emails + + require Logger + + import Swoosh.Email + + @impl Oban.Worker + def perform(%Oban.Job{args: %{"to" => to, "subject" => subject, "body" => body}}) do + required = [to, subject, body] + + if Enum.any?(required, &nil_or_blank?/1) do + Logger.warning("Email not sent: missing required fields") + :discard + else + email = + new() + |> from({"Atlas", "contact@example.com"}) + |> to(to) + |> subject(subject) + |> text_body(body) + + case Atlas.Mailer.deliver(email) do + {:ok, _metadata} -> + Logger.info("Email sent to #{to}") + :ok + + {:error, reason} -> + Logger.error("Failed to send email to #{to}: #{inspect(reason)}") + {:error, reason} + end + end + end + + defp nil_or_blank?(val), do: is_nil(val) or (is_binary(val) and String.trim(val) == "") +end diff --git a/mix.exs b/mix.exs index 326a3ec..212548c 100644 --- a/mix.exs +++ b/mix.exs @@ -52,6 +52,9 @@ defmodule Atlas.MixProject do {:swoosh, "~> 1.5"}, {:finch, "~> 0.13"}, + # job processing + {:oban, "~> 2.17"}, + # tools {:credo, "~> 1.6", only: [:dev, :test], runtime: false}, diff --git a/mix.lock b/mix.lock index a1cbc8c..0199bb3 100644 --- a/mix.lock +++ b/mix.lock @@ -3,9 +3,9 @@ "bcrypt_elixir": {:hex, :bcrypt_elixir, "3.3.2", "d50091e3c9492d73e17fc1e1619a9b09d6a5ef99160eb4d736926fd475a16ca3", [:make, :mix], [{:comeonin, "~> 5.3", [hex: :comeonin, repo: "hexpm", optional: false]}, {:elixir_make, "~> 0.6", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "471be5151874ae7931911057d1467d908955f93554f7a6cd1b7d804cac8cef53"}, "bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"}, "castore": {:hex, :castore, "1.0.14", "4582dd7d630b48cf5e1ca8d3d42494db51e406b7ba704e81fbd401866366896a", [:mix], [], "hexpm", "7bc1b65249d31701393edaaac18ec8398d8974d52c647b7904d01b964137b9f4"}, - "corsica": {:hex, :corsica, "2.1.3", "dccd094ffce38178acead9ae743180cdaffa388f35f0461ba1e8151d32e190e6", [:mix], [{:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "616c08f61a345780c2cf662ff226816f04d8868e12054e68963e95285b5be8bc"}, "combine": {:hex, :combine, "0.10.0", "eff8224eeb56498a2af13011d142c5e7997a80c8f5b97c499f84c841032e429f", [:mix], [], "hexpm", "1b1dbc1790073076580d0d1d64e42eae2366583e7aecd455d1215b0d16f2451b"}, "comeonin": {:hex, :comeonin, "5.5.1", "5113e5f3800799787de08a6e0db307133850e635d34e9fab23c70b6501669510", [:mix], [], "hexpm", "65aac8f19938145377cee73973f192c5645873dcf550a8a6b18187d17c13ccdb"}, + "corsica": {:hex, :corsica, "2.1.3", "dccd094ffce38178acead9ae743180cdaffa388f35f0461ba1e8151d32e190e6", [:mix], [{:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "616c08f61a345780c2cf662ff226816f04d8868e12054e68963e95285b5be8bc"}, "credo": {:hex, :credo, "1.7.12", "9e3c20463de4b5f3f23721527fcaf16722ec815e70ff6c60b86412c695d426c1", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "8493d45c656c5427d9c729235b99d498bd133421f3e0a683e5c1b561471291e5"}, "db_connection": {:hex, :db_connection, "2.8.0", "64fd82cfa6d8e25ec6660cea73e92a4cbc6a18b31343910427b702838c4b33b2", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "008399dae5eee1bf5caa6e86d204dcb44242c82b1ed5e22c881f2c34da201b15"}, "decimal": {:hex, :decimal, "2.3.0", "3ad6255aa77b4a3c4f818171b12d237500e63525c2fd056699967a3e7ea20f62", [:mix], [], "hexpm", "a4d66355cb29cb47c3cf30e71329e58361cfcb37c34235ef3bf1d7bf3773aeac"}, @@ -24,6 +24,7 @@ "mint": {:hex, :mint, "1.7.1", "113fdb2b2f3b59e47c7955971854641c61f378549d73e829e1768de90fc1abf1", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "fceba0a4d0f24301ddee3024ae116df1c3f4bb7a563a731f45fdfeb9d39a231b"}, "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, + "oban": {:hex, :oban, "2.19.4", "045adb10db1161dceb75c254782f97cdc6596e7044af456a59decb6d06da73c1", [:mix], [{:ecto_sql, "~> 3.10", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:ecto_sqlite3, "~> 0.9", [hex: :ecto_sqlite3, repo: "hexpm", optional: true]}, {:igniter, "~> 0.5", [hex: :igniter, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:myxql, "~> 0.7", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.16", [hex: :postgrex, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "5fcc6219e6464525b808d97add17896e724131f498444a292071bf8991c99f97"}, "phoenix": {:hex, :phoenix, "1.7.21", "14ca4f1071a5f65121217d6b57ac5712d1857e40a0833aff7a691b7870fc9a3b", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.7", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:websock_adapter, "~> 0.5.3", [hex: :websock_adapter, repo: "hexpm", optional: false]}], "hexpm", "336dce4f86cba56fed312a7d280bf2282c720abb6074bdb1b61ec8095bdd0bc9"}, "phoenix_ecto": {:hex, :phoenix_ecto, "4.6.5", "c4ef322acd15a574a8b1a08eff0ee0a85e73096b53ce1403b6563709f15e1cea", [:mix], [{:ecto, "~> 3.5", [hex: :ecto, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.14.2 or ~> 3.0 or ~> 4.1", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:plug, "~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.16 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}], "hexpm", "26ec3208eef407f31b748cadd044045c6fd485fbff168e35963d2f9dfff28d4b"}, "phoenix_html": {:hex, :phoenix_html, "4.2.1", "35279e2a39140068fc03f8874408d58eef734e488fc142153f055c5454fd1c08", [:mix], [], "hexpm", "cff108100ae2715dd959ae8f2a8cef8e20b593f8dfd031c9cba92702cf23e053"}, @@ -35,7 +36,7 @@ "plug_crypto": {:hex, :plug_crypto, "2.1.1", "19bda8184399cb24afa10be734f84a16ea0a2bc65054e23a62bb10f06bc89491", [:mix], [], "hexpm", "6470bce6ffe41c8bd497612ffde1a7e4af67f36a15eea5f921af71cf3e11247c"}, "postgrex": {:hex, :postgrex, "0.20.0", "363ed03ab4757f6bc47942eff7720640795eb557e1935951c1626f0d303a3aed", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "d36ef8b36f323d29505314f704e21a1a038e2dc387c6409ee0cd24144e187c0f"}, "remote_ip": {:hex, :remote_ip, "1.2.0", "fb078e12a44414f4cef5a75963c33008fe169b806572ccd17257c208a7bc760f", [:mix], [{:combine, "~> 0.10", [hex: :combine, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "2ff91de19c48149ce19ed230a81d377186e4412552a597d6a5137373e5877cb7"}, - "swoosh": {:hex, :swoosh, "1.19.3", "02ad4455939f502386e4e1443d4de94c514995fd0e51b3cafffd6bd270ffe81c", [:mix], [{:bandit, ">= 1.0.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:cowboy, "~> 1.1 or ~> 2.4", [hex: :cowboy, repo: "hexpm", optional: true]}, {:ex_aws, "~> 2.1", [hex: :ex_aws, repo: "hexpm", optional: true]}, {:finch, "~> 0.6", [hex: :finch, repo: "hexpm", optional: true]}, {:gen_smtp, "~> 0.13 or ~> 1.0", [hex: :gen_smtp, repo: "hexpm", optional: true]}, {:hackney, "~> 1.9", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mail, "~> 0.2", [hex: :mail, repo: "hexpm", optional: true]}, {:mime, "~> 1.1 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mua, "~> 0.2.3", [hex: :mua, repo: "hexpm", optional: true]}, {:multipart, "~> 0.4", [hex: :multipart, repo: "hexpm", optional: true]}, {:plug, "~> 1.9", [hex: :plug, repo: "hexpm", optional: true]}, {:plug_cowboy, ">= 1.0.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:req, "~> 0.5.10 or ~> 0.6 or ~> 1.0", [hex: :req, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.2 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "04a10f8496786b744b84130e3510eb53ca51e769c39511b65023bdf4136b732f"}, + "swoosh": {:hex, :swoosh, "1.19.4", "f8563453f17795d69dbc4e58734e54b9f38652254ef0a15b5796355e10386d2b", [:mix], [{:bandit, ">= 1.0.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:cowboy, "~> 1.1 or ~> 2.4", [hex: :cowboy, repo: "hexpm", optional: true]}, {:ex_aws, "~> 2.1", [hex: :ex_aws, repo: "hexpm", optional: true]}, {:finch, "~> 0.6", [hex: :finch, repo: "hexpm", optional: true]}, {:gen_smtp, "~> 0.13 or ~> 1.0", [hex: :gen_smtp, repo: "hexpm", optional: true]}, {:hackney, "~> 1.9", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mail, "~> 0.2", [hex: :mail, repo: "hexpm", optional: true]}, {:mime, "~> 1.1 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mua, "~> 0.2.3", [hex: :mua, repo: "hexpm", optional: true]}, {:multipart, "~> 0.4", [hex: :multipart, repo: "hexpm", optional: true]}, {:plug, "~> 1.9", [hex: :plug, repo: "hexpm", optional: true]}, {:plug_cowboy, ">= 1.0.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:req, "~> 0.5.10 or ~> 0.6 or ~> 1.0", [hex: :req, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.2 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "aec5f4897ebf7fda47d520425b1a3e684c1a7e5131b4f84f722a73eaf0a4e945"}, "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, "telemetry_metrics": {:hex, :telemetry_metrics, "1.1.0", "5bd5f3b5637e0abea0426b947e3ce5dd304f8b3bc6617039e2b5a008adc02f8f", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "e7b79e8ddfde70adb6db8a6623d1778ec66401f366e9a8f5dd0955c56bc8ce67"}, "telemetry_poller": {:hex, :telemetry_poller, "1.2.0", "ba82e333215aed9dd2096f93bd1d13ae89d249f82760fcada0850ba33bac154b", [:rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7216e21a6c326eb9aa44328028c34e9fd348fb53667ca837be59d0aa2a0156e8"}, diff --git a/priv/repo/migrations/20250728141804_add_oban_jobs.exs b/priv/repo/migrations/20250728141804_add_oban_jobs.exs new file mode 100644 index 0000000..24918b6 --- /dev/null +++ b/priv/repo/migrations/20250728141804_add_oban_jobs.exs @@ -0,0 +1,11 @@ +defmodule Atlas.Repo.Migrations.AddObanJobs do + use Ecto.Migration + + def up do + Oban.Migrations.up() + end + + def down do + Oban.Migrations.down(version: 1) + end +end diff --git a/test/atlas/workers/email_worker_test.exs b/test/atlas/workers/email_worker_test.exs new file mode 100644 index 0000000..8ec2711 --- /dev/null +++ b/test/atlas/workers/email_worker_test.exs @@ -0,0 +1,42 @@ +defmodule Atlas.Workers.EmailWorkerTest do + use Atlas.DataCase, async: true + + import Swoosh.TestAssertions + alias Atlas.Workers.EmailWorker + + test "enqueues and delivers email job" do + job = %{ + "to" => "test@example.com", + "subject" => "Queued Email", + "body" => "Hello from Oban!" + } + + assert {:ok, _job} = Oban.insert(EmailWorker.new(job)) + + assert_email_sent( + to: {nil, "test@example.com"}, + subject: "Queued Email", + text_body: "Hello from Oban!" + ) + end + + test "does not deliver email with missing fields" do + job = %{"to" => nil, "subject" => nil, "body" => nil} + assert {:ok, _job} = Oban.insert(EmailWorker.new(job)) + refute_email_sent() + end + + test "enqueues and delivers multiple email jobs" do + jobs = [ + %{"to" => "a@example.com", "subject" => "A", "body" => "Body A"}, + %{"to" => "b@example.com", "subject" => "B", "body" => "Body B"} + ] + + Enum.each(jobs, fn job -> + assert {:ok, _job} = Oban.insert(EmailWorker.new(job)) + end) + + assert_email_sent(to: {nil, "a@example.com"}, subject: "A", text_body: "Body A") + assert_email_sent(to: {nil, "b@example.com"}, subject: "B", text_body: "Body B") + end +end diff --git a/test/support/fixtures/accounts_fixtures.ex b/test/support/fixtures/accounts_fixtures.ex index 1663c0e..8e8e4b7 100644 --- a/test/support/fixtures/accounts_fixtures.ex +++ b/test/support/fixtures/accounts_fixtures.ex @@ -26,8 +26,9 @@ defmodule Atlas.AccountsFixtures do end def extract_user_token(fun) do - {:ok, captured_email} = fun.(&"[TOKEN]#{&1}[TOKEN]") - [_, token | _] = String.split(captured_email.text_body, "[TOKEN]") + fun.(fn token -> "[TOKEN]#{token}[TOKEN]" end) + {:email, email_struct} = Swoosh.TestAssertions.assert_email_sent() + [_, token | _] = String.split(email_struct.text_body, "[TOKEN]") token end