Merge pull request #45 from mbuhot/feature/add_new_retrying_state_to_state_machine

Add new retry state to state machine
mbuhot authored Oct 24, 2019
2 parents 76a4c1d + 9aa3aa1 commit 91d32e1
Showing 10 changed files with 301 additions and 36 deletions.
12 changes: 10 additions & 2 deletions README.md
@@ -202,12 +202,19 @@ Control the time for which the job is reserved while waiting for a worker to pic
config :ecto_job, :reservation_timeout, 15_000
```

Control the timeout for job execution before a job will be made available for retry. Begins when job is picked up by worker. Keep in mind, for jobs that are expected to retry quickly, any configured `execution_timeout` will only retry a job as quickly as the `poll_interval`. The default is `300_000` ms (5 mins).
Control the delay between retries following a job execution failure. Keep in mind that, for jobs expected to retry quickly, any configured `retry_timeout` will only retry a job as quickly as the `poll_interval` allows. The default is `30_000` ms (30 seconds).

```
config :ecto_job, :retry_timeout, 30_000
```

Control the timeout for job execution before an "IN_PROGRESS" job is assumed to have failed. The timeout begins when the job is picked up by a worker. As with `retry_timeout`, any configured `execution_timeout` will only retry a job as quickly as the `poll_interval` allows. The default is `300_000` ms (5 mins).

```
config :ecto_job, :execution_timeout, 300_000
```


You can control whether logs are on or off and the log level. The default is `true` and `:info`.

```
@@ -252,7 +259,8 @@ Once a consumer is given a job, it increments the attempt counter and updates the
If the job is being retried, the expiry will be the initial timeout * the attempt counter.

If successful, the consumer can delete the job from the queue using the preloaded multi passed to the `perform/2` job handler.
If an exception is raised in the worker or a successful processing attempt fails to successfully commit the preloaded multi, the job is not deleted and remains in the "IN_PROGRESS" state until it expires.
If an exception is raised in the worker, or a processing attempt fails to commit the preloaded multi, the job is transitioned to the "RETRY" state and scheduled to run again after `retry_timeout` * attempt counter.
If the process is killed or is otherwise unable to transition the job to "RETRY", the job will remain "IN_PROGRESS" until the `execution_timeout` expires.

Jobs in the "RESERVED" or "IN_PROGRESS" state past the expiry time will be returned to the "AVAILABLE" state.
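
To make the retry flow concrete, here is an editorial sketch (not part of this change) of a queue module; `MyApp.JobQueue`, `MyApp.Repo`, and `do_work/1` are assumed names:

```
# Hypothetical sketch; MyApp.JobQueue, MyApp.Repo and do_work/1 are assumed names.
defmodule MyApp.JobQueue do
  use EctoJob.JobQueue, table_name: "jobs"

  def perform(multi, params) do
    multi
    |> Ecto.Multi.run(:work, fn _repo, _changes -> do_work(params) end)
    |> MyApp.Repo.transaction()

    # {:ok, changes}             -> the preloaded delete commits and the job is removed
    # {:error, :work, reason, _} -> the worker moves the job to "RETRY"
    #                               (or "FAILED" once max_attempts is exceeded)
    # an exception               -> the job is moved to "RETRY", then the error is re-raised
  end

  defp do_work(_params), do: {:ok, :done}
end
```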

2 changes: 1 addition & 1 deletion examples/ecto_job_demo/config/config.exs
@@ -9,7 +9,7 @@ config :ecto_job_demo, ecto_repos: [EctoJobDemo.Repo]
config :ecto_job_demo, EctoJobDemo.Repo,
adapter: Ecto.Adapters.Postgres,
username: "postgres",
password: "postgres",
password: "password",
database: "ecto_job_demo",
hostname: "localhost",
pool_size: 10
3 changes: 3 additions & 0 deletions lib/ecto_job/config.ex
@@ -24,6 +24,7 @@ defmodule EctoJob.Config do
- `poll_interval`: (Default `60_000`) Time in milliseconds between polling the `JobQueue` for scheduled jobs or jobs due to be retried
- `reservation_timeout`: (Default `60_000`) Time in ms during which a `RESERVED` job state is held while waiting for a worker to start the job. Subsequent polls will return the job to the `AVAILABLE` state for retry.
- `execution_timeout`: (Default `300_000`) Time in ms that a worker is allotted to hold a job in the `IN_PROGRESS` state before subsequent polls return a job to the `AVAILABLE` state for retry. The timeout is extended by `execution_timeout` for every retry attempt until `max_attempts` is reached for a given job.
- `retry_timeout`: (Default `30_000`) Time in ms that a job will stay in the `RETRY` state before subsequent polls return a job to the `AVAILABLE` state for retry. The timeout is extended by `retry_timeout` for every retry attempt until `max_attempts` is reached for a given job.
- `notifications_listen_timeout`: (Default `5_000`) Time in milliseconds that Notifications.listen!/3 is allotted to start listening to notifications from postgrex for new jobs
"""

@@ -36,6 +37,7 @@ defmodule EctoJob.Config do
log_level: :info,
poll_interval: 60_000,
reservation_timeout: 60_000,
retry_timeout: 30_000,
execution_timeout: 300_000,
notifications_listen_timeout: 5_000

@@ -54,6 +56,7 @@
log_level: :info,
poll_interval: 60_000,
reservation_timeout: 60_000,
retry_timeout: 30_000,
execution_timeout: 300_000,
notifications_listen_timeout: 5_000
}
54 changes: 47 additions & 7 deletions lib/ecto_job/job_queue.ex
@@ -36,6 +36,7 @@ defmodule EctoJob.JobQueue do
- `"AVAILABLE"`: The job is availble to be run by the next available worker
- `"RESERVED"`: The job has been reserved by a worker for execution
- `"IN_PROGRESS"`: The job is currently being worked
- `"RETRY"`: The job has failed and it's waiting for a retry
- `"FAILED"`: The job has exceeded the `max_attempts` and will not be retried again
"""
@type state :: String.t()
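
As an editorial aid (not part of this commit), the transitions implied by these states and the functions in this module can be sketched roughly as follows; a successful run deletes the job row outright rather than moving it to another state:

```
# Rough sketch of the implied state machine (illustrative, not exhaustive).
%{
  "SCHEDULED"   => ["AVAILABLE"],                    # activate_scheduled_jobs/3
  "AVAILABLE"   => ["RESERVED"],                     # reserved for a worker
  "RESERVED"    => ["IN_PROGRESS", "AVAILABLE"],     # update_job_in_progress/4, or reservation expiry
  "IN_PROGRESS" => ["RETRY", "FAILED", "AVAILABLE"], # job_failed/4, or execution expiry
  "RETRY"       => ["AVAILABLE"]                     # activate_scheduled_jobs/3
}
# "FAILED" is terminal unless manually requeued (which resets the job to "SCHEDULED").
```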
@@ -62,13 +63,18 @@
@doc """
Job execution callback to be implemented by each `JobQueue` module.
The return type is the same as `Ecto.Repo.transaction/1`.
## Example
@impl true
def perform(multi, params = %{"type" => "new_user"}), do: NewUser.perform(multi, params)
def perform(multi, params = %{"type" => "sync_crm"}), do: SyncCRM.perform(multi, params)
"""
@callback perform(multi :: Multi.t(), params :: map) :: any()
@callback perform(multi :: Multi.t(), params :: map) ::
{:ok, any()}
| {:error, any()}
| {:error, Ecto.Multi.name(), any(), %{required(Ecto.Multi.name()) => any()}}

defmacro __using__(opts) do
table_name = Keyword.fetch!(opts, :table_name)
@@ -207,7 +213,7 @@
end

@doc """
Updates all jobs in the `"SCHEDULED"` state with scheduled time <= now to `"AVAILABLE"` state.
Updates all jobs in the `"SCHEDULED"` and `"RETRY"` state with scheduled time <= now to `"AVAILABLE"` state.
Returns the number of jobs updated.
"""
@@ -217,7 +223,7 @@
repo.update_all(
Query.from(
job in schema,
where: job.state == "SCHEDULED",
where: job.state in ["SCHEDULED", "RETRY"],
where: job.schedule <= ^now
),
set: [state: "AVAILABLE", updated_at: now]
@@ -339,7 +345,7 @@
set: [
attempt: job.attempt + 1,
state: "IN_PROGRESS",
expires: progress_expiry(now, job.attempt + 1, timeout_ms),
expires: increase_time(now, job.attempt + 1, timeout_ms),
updated_at: now
]
)
@@ -351,10 +357,44 @@
end

@doc """
Computes the expiry time for an `"IN_PROGRESS"` job based on the current time and attempt counter.
Transitions a job from `"IN_PROGRESS"` to `"RETRY"` or `"FAILED"` after an execution failure.
If the job has exceeded the configured `max_attempts`, the state moves to `"FAILED"`;
otherwise the job is transitioned to `"RETRY"` and its schedule time is updated so the
job will be picked up again.
"""
@spec job_failed(repo, job, DateTime.t(), integer) :: {:ok, job} | :error
def job_failed(repo, job = %schema{}, now, retry_timeout_ms) do
updates =
if job.attempt >= job.max_attempts do
[state: "FAILED", expires: nil]
else
[state: "RETRY", schedule: increase_time(now, job.attempt + 1, retry_timeout_ms)]
end

{count, results} =
repo.update_all(
Query.from(
j in schema,
where: j.id == ^job.id,
where: j.state == "IN_PROGRESS",
where: j.attempt == ^job.attempt,
select: j
),
set: updates
)

case {count, results} do
{0, _} -> :error
{1, [job]} -> {:ok, job}
end
end

@doc """
Computes the expiry time for `"IN_PROGRESS"` jobs and the schedule time for `"RETRY"` jobs, based on the current time and attempt counter.
"""
@spec progress_expiry(DateTime.t(), integer, integer) :: DateTime.t()
def progress_expiry(now = %DateTime{}, attempt, timeout_ms) do
@spec increase_time(DateTime.t(), integer, integer) :: DateTime.t()
def increase_time(now = %DateTime{}, attempt, timeout_ms) do
timeout_ms |> Kernel.*(attempt) |> Integer.floor_div(1000) |> advance_seconds(now)
end
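
A worked example of this arithmetic (editorial, not part of the diff): the delay grows linearly with the attempt counter, so a `timeout_ms` of `30_000` with an attempt counter of 2 yields a time 60 seconds after `now`:

```
# Illustrative only; mirrors the DateTime style used in the tests.
now = DateTime.from_naive!(~N[2019-10-24T12:00:00.000000Z], "Etc/UTC")
EctoJob.JobQueue.increase_time(now, 2, 30_000)
# => 60 seconds later: 2019-10-24 12:01:00Z  (30_000 ms * 2 / 1000 = 60 s)
```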

56 changes: 45 additions & 11 deletions lib/ecto_job/worker.ex
@@ -10,30 +10,64 @@ defmodule EctoJob.Worker do
@doc """
Equivalent to `start_link(config, job, DateTime.utc_now())`
"""
@spec start_link(Config.t, EctoJob.JobQueue.job()) :: {:ok, pid}
@spec start_link(Config.t(), EctoJob.JobQueue.job()) :: {:ok, pid}
def start_link(config, job), do: start_link(config, job, DateTime.utc_now())

@doc """
Start a worker process given a repo module and a job struct
This may fail if the job reservation has expired, in which case the job will be
reactivated by the producer.
"""
@spec start_link(Config.t, EctoJob.JobQueue.job(), DateTime.t()) :: {:ok, pid}
def start_link(config = %Config{repo: repo, execution_timeout: timeout}, job = %queue{}, now) do
Task.start_link(fn ->
with {:ok, job} <- JobQueue.update_job_in_progress(repo, job, now, timeout) do
queue.perform(JobQueue.initial_multi(job), job.params)
log_duration(config, job, now)
notify_completed(repo, job)
@spec start_link(Config.t(), EctoJob.JobQueue.job(), DateTime.t()) :: {:ok, pid}
def start_link(config, job, now) do
Task.start_link(fn -> do_work(config, job, now) end)
end

@doc false
@spec do_work(Config.t(), EctoJob.JobQueue.job(), DateTime.t()) :: :ok | {:error, any()}
def do_work(config = %Config{repo: repo, execution_timeout: exec_timeout}, job, now) do
with {:ok, in_progress_job} <- JobQueue.update_job_in_progress(repo, job, now, exec_timeout),
:ok <- run_job(config, in_progress_job) do
log_duration(config, in_progress_job, now)
notify_completed(repo, in_progress_job)
else
{:error, reason} -> {:error, reason}
end
end

@spec run_job(Config.t(), EctoJob.JobQueue.job()) :: :ok
defp run_job(%Config{repo: repo, retry_timeout: timeout}, job = %queue{}) do
job_failed = fn ->
_ = JobQueue.job_failed(repo, job, DateTime.utc_now(), timeout)
:ok
end

try do
case queue.perform(JobQueue.initial_multi(job), job.params) do
{:ok, _value} -> :ok
{:error, _value} -> job_failed.()
{:error, _failed_operation, _failed_value, _changes_so_far} -> job_failed.()
end
end)
rescue
e ->
# An exception occurred, make an attempt to put the job into the RETRY state
# before propagating the exception
stacktrace = System.stacktrace()
job_failed.()
reraise(e, stacktrace)
end
end

@spec log_duration(Config.t, EctoJob.JobQueue.job(), DateTime.t()) :: :ok
defp log_duration(%Config{log: true, log_level: log_level}, _job = %queue{id: id}, start = %DateTime{}) do
@spec log_duration(Config.t(), EctoJob.JobQueue.job(), DateTime.t()) :: :ok
defp log_duration(
%Config{log: true, log_level: log_level},
_job = %queue{id: id},
start = %DateTime{}
) do
duration = DateTime.diff(DateTime.utc_now(), start, :microsecond)
Logger.log(log_level, fn -> "#{queue}[#{id}] done: #{duration} µs" end)
end

defp log_duration(_config, _job, _start), do: :ok

@spec notify_completed(repo, EctoJob.JobQueue.job()) :: :ok
97 changes: 86 additions & 11 deletions test/job_queue_test.exs
@@ -86,17 +86,16 @@ defmodule EctoJob.JobQueueTest do
|> Ecto.Multi.to_list()

assert [
requeue_job:
{
:update,
%Ecto.Changeset{
action: :update,
data: %EctoJob.Test.JobQueue{},
changes: %{attempt: 0, state: "SCHEDULED"}
},
[]
}
] = multi
requeue_job: {
:update,
%Ecto.Changeset{
action: :update,
data: %EctoJob.Test.JobQueue{},
changes: %{attempt: 0, state: "SCHEDULED"}
},
[]
}
] = multi
end

test "Returns an error when job status is not FAILED" do
@@ -147,6 +146,22 @@
assert count == 0
assert Repo.get(EctoJob.Test.JobQueue, job.id).state == "RESERVED"
end

test "Updates a scheduled RETRY job to AVAILABLE" do
schedule = DateTime.from_naive!(~N[2017-08-17T12:23:34.000000Z], "Etc/UTC")
now = DateTime.from_naive!(~N[2017-08-17T12:24:00.000000Z], "Etc/UTC")

%{id: id} =
EctoJob.Test.JobQueue.new(%{})
|> Map.put(:state, "RETRY")
|> Map.put(:schedule, schedule)
|> Repo.insert!()

count = EctoJob.JobQueue.activate_scheduled_jobs(Repo, EctoJob.Test.JobQueue, now)

assert count == 1
assert Repo.get(EctoJob.Test.JobQueue, id).state == "AVAILABLE"
end
end

describe "JobQueue.activate_expired_jobs" do
@@ -397,4 +412,64 @@
)
end
end

describe "JobQueue.job_failed" do
test "Does not update state if different than IN_PROGRESS" do
expiry = DateTime.from_naive!(~N[2017-08-17T12:23:34.000000Z], "Etc/UTC")
now = DateTime.from_naive!(~N[2017-08-17T12:20:00.000000Z], "Etc/UTC")

job =
EctoJob.Test.JobQueue.new(%{})
|> Map.put(:state, "AVAILABLE")
|> Map.put(:expires, expiry)
|> Repo.insert!()

assert :error = EctoJob.JobQueue.job_failed(Repo, job, now, @default_timeout)
assert job.state == "AVAILABLE"
assert job.attempt == 0
end

test "Moves from IN_PROGRESS to RETRY and increase the schedule" do
expiry = DateTime.from_naive!(~N[2017-08-17T12:23:34.000000Z], "Etc/UTC")
schedule = DateTime.from_naive!(~N[2017-08-17T12:23:34.000000Z], "Etc/UTC")
now = DateTime.from_naive!(~N[2017-08-17T12:20:00.000000Z], "Etc/UTC")

job =
EctoJob.Test.JobQueue.new(%{})
|> Map.put(:state, "IN_PROGRESS")
|> Map.put(:attempt, 1)
|> Map.put(:schedule, schedule)
|> Map.put(:expires, expiry)
|> Repo.insert!()

{:ok, new_job} = EctoJob.JobQueue.job_failed(Repo, job, now, @default_timeout)

assert new_job.state == "RETRY"
assert new_job.attempt == 1
assert DateTime.compare(expiry, new_job.expires) == :eq
assert DateTime.compare(schedule, new_job.schedule) == :lt
assert Repo.all(Query.from(EctoJob.Test.JobQueue, where: [state: "IN_PROGRESS"])) == []
end

test "Moves from IN_PROGRESS to FAILED after max attempts" do
expiry = DateTime.from_naive!(~N[2017-08-17T12:23:34.000000Z], "Etc/UTC")
schedule = DateTime.from_naive!(~N[2017-08-17T12:23:34.000000Z], "Etc/UTC")
now = DateTime.from_naive!(~N[2017-08-17T12:20:00.000000Z], "Etc/UTC")

job =
EctoJob.Test.JobQueue.new(%{}, max_attempts: 5)
|> Map.put(:state, "IN_PROGRESS")
|> Map.put(:attempt, 5)
|> Map.put(:schedule, schedule)
|> Map.put(:expires, expiry)
|> Repo.insert!()

{:ok, new_job} = EctoJob.JobQueue.job_failed(Repo, job, now, @default_timeout)

assert new_job.state == "FAILED"
assert new_job.attempt == 5
assert DateTime.compare(schedule, new_job.schedule) == :eq
assert Repo.all(Query.from(EctoJob.Test.JobQueue, where: [state: "IN_PROGRESS"])) == []
end
end
end
9 changes: 9 additions & 0 deletions test/support/exception_job_queue.ex
@@ -0,0 +1,9 @@
defmodule EctoJob.Test.ExceptionJobQueue do
# credo:disable-for-this-file

use EctoJob.JobQueue, table_name: "jobs"

def perform(_multi, _params) do
raise "Exception"
end
end
7 changes: 3 additions & 4 deletions test/support/job_queue.ex
@@ -1,9 +1,8 @@
defmodule EctoJob.Test.JobQueue do
# credo:disable-for-this-file

@moduledoc false
use EctoJob.JobQueue, table_name: "jobs"

def perform(multi, params) do
IO.inspect({multi, params})
def perform(multi, _params) do
EctoJob.Test.Repo.transaction(multi)
end
end
11 changes: 11 additions & 0 deletions test/support/transaction_fail_job_queue.ex
@@ -0,0 +1,11 @@
defmodule EctoJob.Test.TransactionFailJobQueue do
# credo:disable-for-this-file

use EctoJob.JobQueue, table_name: "jobs"

def perform(multi, _params) do
multi
|> Ecto.Multi.run(:send, fn _,_ -> {:error, "Error"} end)
|> EctoJob.Test.Repo.transaction()
end
end