Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Survive minor db drops #427

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 54 additions & 40 deletions lib/dynflow/persistence_adapters/sequel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,36 +71,39 @@ def find_execution_plans(options = {})
paginate(table(table_name), options),
options),
options[:filters])
data_set.all.map { |record| execution_plan_column_map(load_data(record, table_name)) }
records = with_retry { data_set.all }
records.map { |record| execution_plan_column_map(load_data(record, table_name)) }
end

def find_execution_plan_counts(options = {})
filter(:execution_plan, table(:execution_plan), options[:filters]).count
with_retry { filter(:execution_plan, table(:execution_plan), options[:filters]).count }
end

def delete_execution_plans(filters, batch_size = 1000, backup_dir = nil)
count = 0
filter(:execution_plan, table(:execution_plan), filters).each_slice(batch_size) do |plans|
uuids = plans.map { |p| p.fetch(:uuid) }
@db.transaction do
table(:delayed).where(execution_plan_uuid: uuids).delete
with_retry do
filter(:execution_plan, table(:execution_plan), filters).each_slice(batch_size) do |plans|
uuids = plans.map { |p| p.fetch(:uuid) }
@db.transaction do
table(:delayed).where(execution_plan_uuid: uuids).delete

steps = table(:step).where(execution_plan_uuid: uuids)
backup_to_csv(:step, steps, backup_dir, 'steps.csv') if backup_dir
steps.delete
steps = table(:step).where(execution_plan_uuid: uuids)
backup_to_csv(:step, steps, backup_dir, 'steps.csv') if backup_dir
steps.delete

output_chunks = table(:output_chunk).where(execution_plan_uuid: uuids).delete
output_chunks = table(:output_chunk).where(execution_plan_uuid: uuids).delete

actions = table(:action).where(execution_plan_uuid: uuids)
backup_to_csv(:action, actions, backup_dir, 'actions.csv') if backup_dir
actions.delete
actions = table(:action).where(execution_plan_uuid: uuids)
backup_to_csv(:action, actions, backup_dir, 'actions.csv') if backup_dir
actions.delete

execution_plans = table(:execution_plan).where(uuid: uuids)
backup_to_csv(:execution_plan, execution_plans, backup_dir, 'execution_plans.csv') if backup_dir
count += execution_plans.delete
execution_plans = table(:execution_plan).where(uuid: uuids)
backup_to_csv(:execution_plan, execution_plans, backup_dir, 'execution_plans.csv') if backup_dir
count += execution_plans.delete
end
end
return count
end
return count
end

def load_execution_plan(execution_plan_id)
Expand All @@ -113,30 +116,37 @@ def save_execution_plan(execution_plan_id, value)

def delete_delayed_plans(filters, batch_size = 1000)
count = 0
filter(:delayed, table(:delayed), filters).each_slice(batch_size) do |plans|
uuids = plans.map { |p| p.fetch(:execution_plan_uuid) }
@db.transaction do
count += table(:delayed).where(execution_plan_uuid: uuids).delete
with_retry do
filter(:delayed, table(:delayed), filters).each_slice(batch_size) do |plans|
uuids = plans.map { |p| p.fetch(:execution_plan_uuid) }
@db.transaction do
count += table(:delayed).where(execution_plan_uuid: uuids).delete
end
end
end
count
end

def find_old_execution_plans(age)
table_name = :execution_plan
table(table_name)
.where(::Sequel.lit('ended_at <= ? AND state = ?', age, 'stopped'))
.all.map { |plan| execution_plan_column_map(load_data plan, table_name) }
records = with_retry do
table(table_name)
.where(::Sequel.lit('ended_at <= ? AND state = ?', age, 'stopped'))
.all
end
records.map { |plan| execution_plan_column_map(load_data plan, table_name) }
end

def find_past_delayed_plans(time)
table_name = :delayed
table(table_name)
.where(::Sequel.lit('start_at <= ? OR (start_before IS NOT NULL AND start_before <= ?)', time, time))
.where(:frozen => false)
.order_by(:start_at)
.all
.map { |plan| load_data(plan, table_name) }
records = with_retry do
table(table_name)
.where(::Sequel.lit('start_at <= ? OR (start_before IS NOT NULL AND start_before <= ?)', time, time))
.where(:frozen => false)
.order_by(:start_at)
.all
end
records.map { |plan| load_data(plan, table_name) }
end

def load_delayed_plan(execution_plan_id)
Expand Down Expand Up @@ -203,28 +213,30 @@ def save_envelope(data)

def pull_envelopes(receiver_id)
connector_feature!
db.transaction do
data_set = table(:envelope).where(receiver_id: receiver_id).all
envelopes = data_set.map { |record| load_data(record) }
with_retry do
db.transaction do
data_set = table(:envelope).where(receiver_id: receiver_id).all
envelopes = data_set.map { |record| load_data(record) }

table(:envelope).where(id: data_set.map { |d| d[:id] }).delete
return envelopes
table(:envelope).where(id: data_set.map { |d| d[:id] }).delete
return envelopes
end
end
end

def push_envelope(envelope)
connector_feature!
table(:envelope).insert(prepare_record(:envelope, envelope))
with_retry { table(:envelope).insert(prepare_record(:envelope, envelope)) }
end

def prune_envelopes(receiver_ids)
connector_feature!
table(:envelope).where(receiver_id: receiver_ids).delete
with_retry { table(:envelope).where(receiver_id: receiver_ids).delete }
end

def prune_undeliverable_envelopes
connector_feature!
table(:envelope).where(receiver_id: table(:coordinator_record).select(:id)).invert.delete
with_retry { table(:envelope).where(receiver_id: table(:coordinator_record).select(:id)).invert.delete }
end

def coordinator_feature!
Expand All @@ -245,7 +257,7 @@ def update_coordinator_record(class_name, record_id, value)

def delete_coordinator_record(class_name, record_id)
coordinator_feature!
table(:coordinator_record).where(class: class_name, id: record_id).delete
with_retry { table(:coordinator_record).where(class: class_name, id: record_id).delete }
end

def find_coordinator_records(options)
Expand All @@ -257,7 +269,9 @@ def find_coordinator_records(options)
if exclude_owner_id
data_set = data_set.exclude(:owner_id => exclude_owner_id)
end
data_set.all.map { |record| load_data(record) }
with_retry do
data_set.all.map { |record| load_data(record) }
end
end

def to_hash
Expand Down