
[Feature] LimitEnumerator #424

@N1koKouloufakos

What

An enumerator that stops after a certain number of elements have been yielded.

Why

Our use case: we iterate through results from an API and want each run of the job to process at most 10_000 results. Because the job runs periodically, every run needs a hard, well-defined stopping point.

The ThrottleEnumerator is very similar, but when its condition is hit it re-enqueues the job. I want an enumerator that shuts the job down instead.

How

Because the job can be interrupted at any time, and we want the limit respected across interruptions and retries, I've added a counter that is serialized with the job and deserialized when it resumes. That way, the enumerator knows how many more elements it may yield, no matter how many times the job has been re-enqueued.
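To make the idea concrete, here is a minimal plain-Ruby sketch with no ActiveJob or job-iteration involved. The hypothetical `run_slice` helper stands in for one job execution that is interrupted after `budget` items, and a plain Hash stands in for the serialized job data. Because the counter travels with the cursor, the total across all runs never exceeds the limit.

```ruby
# Hypothetical stand-in for one job run: it resumes from the serialized cursor
# and counter, processes items until the limit is reached or the run is
# "interrupted" after `budget` items, then returns the data to serialize.
def run_slice(items, job_data, limit:, budget:, processed:)
  cursor = job_data.fetch("cursor", 0)
  counter = job_data.fetch("counter", 0)
  done_this_run = 0

  while cursor < items.size
    return { "finished" => true } if counter >= limit # limit reached: stop for good

    # Simulated interruption: persist cursor AND counter, then "re-enqueue".
    return { "cursor" => cursor, "counter" => counter } if done_this_run >= budget

    processed << items[cursor]
    cursor += 1
    counter += 1
    done_this_run += 1
  end

  { "finished" => true } # source exhausted before the limit
end

items = (1..25).to_a
processed = []
job_data = {}
runs = 0

# Keep "re-enqueuing" until a run reports that it finished.
until job_data["finished"]
  job_data = run_slice(items, job_data, limit: 10, budget: 4, processed: processed)
  runs += 1
end

p processed # => [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
p runs      # => 3
```

If the counter were dropped from the serialized data (reset to 0 on every run), each run would get a fresh allowance of 10 and all 25 items would eventually be processed, which is exactly the failure mode the persisted counter prevents.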

Code

# typed: true
# frozen_string_literal: true

module JobIteration
  module Limitable
    attr_accessor :counter

    def serialize
      super.merge("counter" => counter)
    end

    def deserialize(job_data)
      super
      @counter = job_data["counter"]
    end

    # This method could probably be moved into enumerator_builder.rb
    def build_limit_enumerator(enum, limit:)
      LimitEnumerator.new(enum, self, limit: limit, counter: counter || 0).to_enum
    end

    # LimitEnumerator allows you to limit iterations
    # to a specific number of items.
    # @example
    #   def build_enumerator(_params, cursor:)
    #     build_limit_enumerator(
    #       enumerator_builder.active_record_on_batches(
    #         Account.inactive,
    #         cursor: cursor
    #       ),
    #       limit: 1_000,
    #     )
    #   end
    # The enumerator above behaves like the wrapped active_record_on_batches enumerator
    # until 1_000 items (here, batches) have been yielded. When the next item is requested,
    # it does `throw :abort` instead, which quietly shuts down the job.
    # The counter is serialized into the job's metadata whenever the job is serialized, so when
    # an interrupted job is re-enqueued, it remembers how many items it has already yielded.
    class LimitEnumerator
      attr_accessor :enum, :job, :limit, :counter

      def initialize(enum, job, limit:, counter:)
        @enum = enum
        @job = job
        @limit = limit
        @counter = counter
      end

      def to_enum
        Enumerator.new do |yielder|
          @enum.each do |*val|
            if should_stop?
              ActiveSupport::Notifications.instrument("limited.iteration", job_class: @job.class.name)
              throw(:abort)
            end

            @counter += 1
            @job.counter = @counter # keep the job's copy in sync so `serialize` persists it
            yielder.yield(*val)
          end
        end
      end

      def should_stop?
        @counter >= @limit
      end
    end
  end
end
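To check the boundary behaviour of `to_enum` in isolation, here is a stripped-down, hypothetical `SimpleLimitEnumerator` that keeps the same loop but drops the job and ActiveSupport dependencies. A bare `catch(:abort)` stands in for however job-iteration handles the thrown symbol. The sketch shows that exactly `limit` elements are yielded and that the abort fires only when the element after the limit is requested.

```ruby
# Hypothetical, dependency-free copy of LimitEnumerator#to_enum's loop.
class SimpleLimitEnumerator
  attr_reader :counter

  def initialize(enum, limit:, counter: 0)
    @enum = enum
    @limit = limit
    @counter = counter
  end

  def to_enum
    Enumerator.new do |yielder|
      @enum.each do |*val|
        throw(:abort) if @counter >= @limit # checked before each element, like should_stop?
        @counter += 1
        yielder.yield(*val)
      end
    end
  end
end

seen = []
limited = SimpleLimitEnumerator.new((1..5).each, limit: 3)

# catch returns nil when :abort is thrown without a value,
# or the block's last value if iteration ends normally.
result = catch(:abort) do
  limited.to_enum.each { |n| seen << n }
  :completed
end

p seen            # => [1, 2, 3]
p result          # => nil
p limited.counter # => 3
```

Passing a non-zero `counter:` (as `build_limit_enumerator` does after deserialization) simply shrinks the remaining allowance, and a source shorter than the limit finishes normally without throwing.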

Usage:

class MyLimitedIterationJob < ActiveJob::Base
  include JobIteration::Iteration
  include JobIteration::Limitable

  def build_enumerator(cursor:)
    build_limit_enumerator(
      enumerator_builder.active_record_on_batches(
        Account.inactive,
        cursor: cursor,
      ),
      limit: 1_000,
    )
  end

  def each_iteration(batch)
    # Do work on each batch of inactive accounts
  end
end
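One detail worth keeping in mind with the usage above: `active_record_on_batches` yields whole batches, so `limit: 1_000` caps the number of batches, not the number of records. The plain-Ruby sketch below works through the arithmetic for the 10_000-result target from the use case, with `each_slice` standing in for a batch enumerator and an assumed batch size of 100 (both illustrative, not taken from the proposal).

```ruby
# Stand-in for a batch enumerator: 25_000 hypothetical records in batches of 100.
records = (1..25_000)
batch_size = 100 # assumed batch size, for illustration only
batches = records.each_slice(batch_size)

# To cap a run at about 10_000 records, the limit must be expressed in batches.
target_records = 10_000
batch_limit = target_records.fdiv(batch_size).ceil

processed_records = batches.first(batch_limit).sum(&:size)

p batch_limit       # => 100
p processed_records # => 10000
```

When the target is not a multiple of the batch size, rounding up means the final batch can overshoot the target by up to `batch_size - 1` records.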
