Skip to content

Commit

Permalink
Merge pull request #77 from mbuhot/fix-job-queue-specs
Browse files Browse the repository at this point in the history
Adapt JobQueue typespecs to params_type
  • Loading branch information
mbuhot authored Sep 15, 2020
2 parents 6e26eea + fde010c commit d8cf50f
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 11 deletions.
38 changes: 27 additions & 11 deletions lib/ecto_job/job_queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ defmodule EctoJob.JobQueue do
@typedoc """
A job `Ecto.Schema` struct.
"""
@type params :: map() | any()
@type job :: %{
__struct__: module,
__meta__: Ecto.Schema.Metadata.t(),
Expand All @@ -53,7 +54,7 @@ defmodule EctoJob.JobQueue do
schedule: DateTime.t() | nil,
attempt: integer,
max_attempts: integer | nil,
params: map() | any(),
params: params(),
notify: String.t() | nil,
priority: integer,
inserted_at: DateTime.t() | nil,
Expand All @@ -71,7 +72,7 @@ defmodule EctoJob.JobQueue do
def perform(multi, params = %{"type" => "new_user"}), do: NewUser.perform(multi, params)
def perform(multi, params = %{"type" => "sync_crm"}), do: SyncCRM.perform(multi, params)
"""
@callback perform(multi :: Multi.t(), params :: map() | any()) ::
@callback perform(multi :: Multi.t(), params :: params()) ::
{:ok, any()}
| {:error, any()}
| {:error, Ecto.Multi.name(), any(), %{required(Ecto.Multi.name()) => any()}}
Expand All @@ -89,7 +90,21 @@ defmodule EctoJob.JobQueue do
params_type: params_type
] do
use Ecto.Schema
require EctoJob.JobQueue.Helpers

@behaviour EctoJob.JobQueue
@before_compile EctoJob.JobQueue

params_spec =
case params_type do
:map -> :map
:binary -> {:term, [], []}
end

@type params :: unquote(params_spec)

@table_name table_name
@params_type params_type

if schema_prefix do
@schema_prefix schema_prefix
Expand All @@ -98,8 +113,13 @@ defmodule EctoJob.JobQueue do
if timestamps_opts do
@timestamps_opts timestamps_opts
end
end
end

schema table_name do
@doc false
defmacro __before_compile__(_env) do
quote do
schema @table_name do
# SCHEDULED, RESERVED, IN_PROGRESS, FAILED
field(:state, :string)
# Time at which reserved/in_progress jobs can be reset to SCHEDULED
Expand All @@ -111,7 +131,7 @@ defmodule EctoJob.JobQueue do
# Maximum attempts before this job is FAILED
field(:max_attempts, :integer)
# Job params, serialized as JSONB or Elixir/Erlang term
field(:params, params_type)
field(:params, @params_type)
# Payload used to notify that job has completed
field(:notify, :string)
# Used to prioritize the job execution
Expand Down Expand Up @@ -160,15 +180,15 @@ defmodule EctoJob.JobQueue do
- `:priority` (integer): lower numbers run first; default is 0
- `:notify` (string): payload to use for Postgres notification upon job completion
"""
@spec new(map() | any(), Keyword.t()) :: EctoJob.JobQueue.job()
@spec new(params(), Keyword.t()) :: EctoJob.JobQueue.job()
def new(params, opts \\ []) do
%__MODULE__{
state: if(opts[:schedule], do: "SCHEDULED", else: "AVAILABLE"),
expires: nil,
schedule: Keyword.get(opts, :schedule, DateTime.utc_now()),
attempt: 0,
max_attempts: opts[:max_attempts],
params: serialize_params(params, unquote(params_type)),
params: EctoJob.JobQueue.Helpers.__serialize_params__(params, @params_type),
notify: opts[:notify],
priority: Keyword.get(opts, :priority, 0)
}
Expand All @@ -186,7 +206,7 @@ defmodule EctoJob.JobQueue do
|> MyApp.Job.enqueue("send_welcome_email", %{"type" => "SendWelcomeEmail", "user" => user_params})
|> MyApp.Repo.transaction()
"""
@spec enqueue(Multi.t(), term, map, Keyword.t()) :: Multi.t()
@spec enqueue(Multi.t(), term, params(), Keyword.t()) :: Multi.t()
def enqueue(multi, name, params, opts \\ []) do
Multi.insert(multi, name, new(params, opts))
end
Expand All @@ -212,10 +232,6 @@ defmodule EctoJob.JobQueue do
end

def requeue(_, _, _), do: {:error, :non_failed_job}

@spec serialize_params(map() | any(), atom()) :: map() | binary()
defp serialize_params(params, :binary), do: :erlang.term_to_binary(params)
defp serialize_params(params, :map), do: params
end
end

Expand Down
6 changes: 6 additions & 0 deletions lib/ecto_job/job_queue/helpers.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
defmodule EctoJob.JobQueue.Helpers do
@moduledoc false

def __serialize_params__(params, :map) when is_map(params), do: params
def __serialize_params__(params, :binary), do: :erlang.term_to_binary(params)
end

0 comments on commit d8cf50f

Please sign in to comment.