Job queue, matrix builder, concurrency control #709
Draft
f-f wants to merge 5 commits into trh/compilers-in-metadata from f-f/concurrent-jobs-2
Commits
20d6b21 Update database schemas and add job executor loop (thomashoneyman)
4b9743c Split Server module into Env, Router, JobExecutor, and Main (fsoikin)
2fe9635 Fix up build (fsoikin)
a4f1047 Run job executor (fsoikin)
dfd7e78 Fix integration tests (f-f)
@@ -0,0 +1,90 @@

```purescript
module Registry.App.Main where

import Registry.App.Prelude hiding ((/))

import Data.DateTime (diff)
import Data.Time.Duration (Milliseconds(..), Seconds(..))
import Effect.Aff as Aff
import Effect.Class.Console as Console
import Fetch.Retry as Fetch.Retry
import Node.Process as Process
import Registry.App.Server.Env (ServerEnv, createServerEnv)
import Registry.App.Server.JobExecutor as JobExecutor
import Registry.App.Server.Router as Router

main :: Effect Unit
main = do
  createServerEnv # Aff.runAff_ case _ of
    Left error -> do
      Console.log $ "Failed to start server: " <> Aff.message error
      Process.exit' 1
    Right env -> do
      case env.vars.resourceEnv.healthchecksUrl of
        Nothing -> Console.log "HEALTHCHECKS_URL not set, healthcheck pinging disabled"
        Just healthchecksUrl -> Aff.launchAff_ $ healthcheck healthchecksUrl
      Aff.launchAff_ $ jobExecutor env
      Router.runRouter env
  where
  healthcheck :: String -> Aff Unit
  healthcheck healthchecksUrl = loop limit
    where
    limit = 10
    oneMinute = Aff.Milliseconds (1000.0 * 60.0)
    fiveMinutes = Aff.Milliseconds (1000.0 * 60.0 * 5.0)

    loop n = do
      Fetch.Retry.withRetryRequest healthchecksUrl {} >>= case _ of
        Succeeded { status } | status == 200 -> do
          Aff.delay fiveMinutes
          loop n

        Cancelled | n >= 0 -> do
          Console.warn $ "Healthchecks cancelled, will retry..."
          Aff.delay oneMinute
          loop (n - 1)

        Failed error | n >= 0 -> do
          Console.warn $ "Healthchecks failed, will retry: " <> Fetch.Retry.printRetryRequestError error
          Aff.delay oneMinute
          loop (n - 1)

        Succeeded { status } | status /= 200, n >= 0 -> do
          Console.error $ "Healthchecks returned non-200 status, will retry: " <> show status
          Aff.delay oneMinute
          loop (n - 1)

        Cancelled -> do
          Console.error
            "Healthchecks cancelled and failure limit reached, will not retry."

        Failed error -> do
          Console.error $ "Healthchecks failed and failure limit reached, will not retry: " <> Fetch.Retry.printRetryRequestError error

        Succeeded _ -> do
          Console.error "Healthchecks returned non-200 status and failure limit reached, will not retry."

  jobExecutor :: ServerEnv -> Aff Unit
  jobExecutor env = do
    loop initialRestartDelay
    where
    initialRestartDelay = Milliseconds 100.0

    loop restartDelay = do
      start <- nowUTC
      result <- JobExecutor.runJobExecutor env
      end <- nowUTC

      Console.error case result of
        Left error -> "Job executor failed: " <> Aff.message error
        Right _ -> "Job executor exited for no reason."

      -- This is a heuristic: if the executor keeps crashing immediately, we
      -- restart with an exponentially increasing delay, but once the executor
      -- had a run longer than a minute, we start over with a small delay.
      let
        nextRestartDelay
          | end `diff` start > Seconds 60.0 = initialRestartDelay
          | otherwise = restartDelay <> restartDelay

      Aff.delay nextRestartDelay
      loop nextRestartDelay
```
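The restart heuristic in `jobExecutor` may be easier to follow when traced on concrete run durations. The sketch below is written in Python rather than PureScript so that it runs standalone. `next_restart_delay` and `simulate` are hypothetical names that mirror the guard in the diff; they are not code from this PR.

```python
from datetime import timedelta

# Values taken from the diff: 100 ms initial delay, 60 s "stable run" threshold.
INITIAL_RESTART_DELAY = timedelta(milliseconds=100)
STABLE_RUN = timedelta(seconds=60)


def next_restart_delay(run_duration: timedelta, restart_delay: timedelta) -> timedelta:
    """Reset to the initial delay after a long run, otherwise double the delay."""
    if run_duration > STABLE_RUN:
        return INITIAL_RESTART_DELAY
    return restart_delay + restart_delay


def simulate(run_durations):
    """Return the delay waited after each run, starting from the initial delay."""
    delay = INITIAL_RESTART_DELAY
    waited = []
    for run in run_durations:
        delay = next_restart_delay(run, delay)
        waited.append(delay)
    return waited


if __name__ == "__main__":
    crash = timedelta(seconds=1)
    healthy = timedelta(minutes=5)
    for d in simulate([crash, crash, crash, healthy, crash]):
        # Integer milliseconds: 200, 400, 800, 100, 200
        print(d // timedelta(milliseconds=1), "ms")
```

If the sketch matches the diff, two properties follow: the first wait after an immediate crash is 200 ms rather than 100 ms, because the delay is doubled before it is used, and the doubling has no upper bound while the executor keeps crashing.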
Are we sure we want to just delete incomplete jobs? Should they instead be re-inserted into the queue? For example, reset startedAt to NULL so the job is picked up again.

We may also want to put more effort into making partially completed jobs recoverable: make operations as close to idempotent as possible, and have a way to sweep through, catch any partially complete operations, and decide whether to roll them back or to retry and complete them.
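One way to picture the re-queue option is a single UPDATE that clears startedAt on unfinished jobs. The sketch below is an illustration in Python with SQLite, not this PR's schema or code. Only `startedAt` comes from the comment above; the `jobs` table, the `jobId`, `createdAt`, and `finishedAt` columns, and both function names are assumptions.

```python
import sqlite3

# Hypothetical schema: only startedAt is named in the review comment.
SCHEMA = """
CREATE TABLE jobs (
  jobId      TEXT PRIMARY KEY,
  createdAt  TEXT NOT NULL,
  startedAt  TEXT,
  finishedAt TEXT
)
"""


def requeue_incomplete_jobs(conn: sqlite3.Connection) -> int:
    """Clear startedAt on jobs that were started but never finished.

    Returns the number of jobs put back in the queue.
    """
    with conn:  # commits on success, rolls back on error
        cursor = conn.execute(
            "UPDATE jobs SET startedAt = NULL "
            "WHERE startedAt IS NOT NULL AND finishedAt IS NULL"
        )
    return cursor.rowcount


def next_job(conn: sqlite3.Connection):
    """Return the id of the oldest job that has not been started, or None."""
    row = conn.execute(
        "SELECT jobId FROM jobs WHERE startedAt IS NULL "
        "ORDER BY createdAt LIMIT 1"
    ).fetchone()
    return row[0] if row else None


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    conn.executemany(
        "INSERT INTO jobs VALUES (?, ?, ?, ?)",
        [
            ("done", "2024-01-01T00:00:00Z", "2024-01-01T00:01:00Z", "2024-01-01T00:02:00Z"),
            ("interrupted", "2024-01-01T00:03:00Z", "2024-01-01T00:04:00Z", None),
            ("waiting", "2024-01-01T00:05:00Z", None, None),
        ],
    )
    print(requeue_incomplete_jobs(conn))  # 1
    print(next_job(conn))  # interrupted
```

Running the reset a second time changes nothing, so the reset itself is idempotent. Whether re-running the job is safe still depends on the job's own operations being idempotent, which is the second point raised above.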