diff --git a/src/poolboy.erl b/src/poolboy.erl index db20541..bbeead2 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -5,18 +5,17 @@ -export([checkout/1, checkout/2, checkout/3, checkin/2, transaction/2, transaction/3, child_spec/2, child_spec/3, child_spec/4, start/1, - start/2, start_link/1, start_link/2, stop/1, status/1]). + start/2, start_link/1, start_link/2, stop/1, status_map/1, status/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export_type([pool/0]). -define(TIMEOUT, 5000). +-define(DEFAULT_SIZE, 5). +-define(DEFAULT_TYPE, tuple). +-define(DEFAULT_STRATEGY, lifo). +-define(DEFAULT_OVERFLOW, 10). --ifdef(pre17). --type pid_queue() :: queue(). --else. --type pid_queue() :: queue:queue(). --endif. -ifdef(OTP_RELEASE). %% this implies 21 or higher -define(EXCEPTION(Class, Reason, Stacktrace), Class:Reason:Stacktrace). @@ -37,16 +36,27 @@ -type start_ret() :: {'ok', pid()} | 'ignore' | {'error', term()}. -record(state, { - supervisor :: undefined | pid(), - workers :: undefined | pid_queue(), - waiting :: pid_queue(), + supervisor :: pid(), + worker_module :: atom(), + workers :: poolboy_worker_collection:coll() | poolboy_worker_collection:coll(pid()), + waiting :: poolboy_collection:pid_queue() | poolboy_collection:pid_queue(tuple()), monitors :: ets:tid(), - size = 5 :: non_neg_integer(), - overflow = 0 :: non_neg_integer(), - max_overflow = 10 :: non_neg_integer(), - strategy = lifo :: lifo | fifo + mrefs :: ets:tid(), + crefs :: ets:tid(), + size = ?DEFAULT_SIZE :: non_neg_integer(), + overflow :: poolboy_worker_collection:coll() | poolboy_worker_collection:coll(pid()), + max_overflow = ?DEFAULT_OVERFLOW :: non_neg_integer() }). +-type status_key() :: + state | available | overflow | monitored | waiting. + +-type state_name() :: + full | overflow | ready. + +-type status_map() :: + #{status_key() := integer() | state_name()}. + -spec checkout(Pool :: pool()) -> pid(). checkout(Pool) -> checkout(Pool, true). @@ -141,38 +151,138 @@ start_link(PoolArgs, WorkerArgs) -> stop(Pool) -> gen_server:call(Pool, stop). +-spec status_map(Pool :: pool()) -> status_map(). +status_map(Pool) -> + gen_server:call(Pool, status_map). + -spec status(Pool :: pool()) -> {atom(), integer(), integer(), integer()}. status(Pool) -> gen_server:call(Pool, status). init({PoolArgs, WorkerArgs}) -> process_flag(trap_exit, true), + + WorkerModule = worker_module(PoolArgs), + Supervisor = + case worker_supervisor(PoolArgs) of + undefined -> + start_supervisor(WorkerModule, WorkerArgs); + Sup when is_pid(Sup) -> + monitor(process, Sup), + Sup + end, + Size = pool_size(PoolArgs), + Type = pool_type(PoolArgs), + Strategy = strategy(PoolArgs), + Workers = init_workers(Supervisor, WorkerModule, Size, Type, Strategy), + + MaxOverflow = max_overflow(PoolArgs), + Overflow = init_overflow(Size, MaxOverflow, Type, Strategy), + Waiting = queue:new(), Monitors = ets:new(monitors, [private]), - init(PoolArgs, WorkerArgs, #state{waiting = Waiting, monitors = Monitors}). - -init([{worker_module, Mod} | Rest], WorkerArgs, State) when is_atom(Mod) -> - {ok, Sup} = poolboy_sup:start_link(Mod, WorkerArgs), - init(Rest, WorkerArgs, State#state{supervisor = Sup}); -init([{size, Size} | Rest], WorkerArgs, State) when is_integer(Size) -> - init(Rest, WorkerArgs, State#state{size = Size}); -init([{max_overflow, MaxOverflow} | Rest], WorkerArgs, State) when is_integer(MaxOverflow) -> - init(Rest, WorkerArgs, State#state{max_overflow = MaxOverflow}); -init([{strategy, lifo} | Rest], WorkerArgs, State) -> - init(Rest, WorkerArgs, State#state{strategy = lifo}); -init([{strategy, fifo} | Rest], WorkerArgs, State) -> - init(Rest, WorkerArgs, State#state{strategy = fifo}); -init([_ | Rest], WorkerArgs, State) -> - init(Rest, WorkerArgs, State); -init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) -> - Workers = prepopulate(Size, Sup), - {ok, State#state{workers = Workers}}. - -handle_cast({checkin, Pid}, State = #state{monitors = Monitors}) -> + MRefs = ets:new(mrefs, [private]), + CRefs = ets:new(crefs, [private]), + {ok, #state{ + supervisor = Supervisor, + worker_module = WorkerModule, + workers = Workers, + waiting = Waiting, + monitors = Monitors, + mrefs = MRefs, + crefs = CRefs, + size = Size, + overflow = Overflow, + max_overflow = MaxOverflow + }}. + +start_supervisor(undefined, _WorkerArgs) -> + error({badarg, "worker_module or worker_supervisor is required"}); +start_supervisor(WorkerModule, WorkerArgs) -> + start_supervisor(WorkerModule, WorkerArgs, 1). + +start_supervisor(WorkerModule, WorkerArgs, Retries) -> + case poolboy_sup:start_link(WorkerModule, WorkerArgs) of + {ok, NewPid} -> + NewPid; + {error, {already_started, Pid}} when Retries > 0 -> + MRef = erlang:monitor(process, Pid), + receive {'DOWN', MRef, _, _, _} -> ok + after ?TIMEOUT -> ok + end, + start_supervisor(WorkerModule, WorkerArgs, Retries - 1); + {error, Error} -> + exit({no_worker_supervisor, Error}) + end. + +init_workers(Sup, Mod, Size, Type, Strategy) -> + Fun = fun(Idx) -> new_worker(Sup, Mod, Idx) end, + poolboy_worker_collection:new(Type, Size, Strategy, Fun). + +init_overflow(Size, MaxOverflow, Type, Strategy) -> + Fun = fun(Idx) -> Size + Idx end, + poolboy_worker_collection:new(Type, MaxOverflow, Strategy, Fun). + +worker_module(PoolArgs) -> + Is = is_atom(V = proplists:get_value(worker_module, PoolArgs)), + if not Is -> undefined; true -> V end. + +worker_supervisor(PoolArgs) -> + case find_pid(V = proplists:get_value(worker_supervisor, PoolArgs)) of + Res = undefined when Res =:= V -> Res; + Res when is_pid(Res) -> Res; + Res = undefined when Res =/= V -> exit({noproc, V}); + Res -> exit({Res, V}) + end. + +find_pid(undefined) -> + undefined; +find_pid(Name) when is_atom(Name) -> + find_pid({local, Name}); +find_pid({local, Name}) -> + whereis(Name); +find_pid({global, Name}) -> + find_pid({via, global, Name}); +find_pid({via, Registry, Name}) -> + Registry:whereis_name(Name); +find_pid({Name, Node}) -> + (catch erlang:monitor_node(Node, true)), + try rpc_call(Node, erlang, whereis, [Name], ?TIMEOUT) + catch _:Reason -> Reason + end. + +rpc_call(Node, Mod, Fun, Args, Timeout) -> + case rpc:call(Node, Mod, Fun, Args, Timeout) of + {badrpc, Reason} -> exit({Reason, {Node, {Mod, Fun, Args}}}); + Result -> Result + end. + +pool_size(PoolArgs) -> + Is = is_integer(V = proplists:get_value(size, PoolArgs)), + if not Is -> ?DEFAULT_SIZE; true -> V end. + +-define(IS_COLLECTION_TYPE(T), lists:member(T, [list,array,tuple,queue])). +pool_type(PoolArgs) -> + Is = ?IS_COLLECTION_TYPE(V = proplists:get_value(type, PoolArgs)), + if not Is -> ?DEFAULT_TYPE; true -> V end. + +max_overflow(PoolArgs) -> + Is = is_integer(V = proplists:get_value(max_overflow, PoolArgs)), + if not Is -> ?DEFAULT_OVERFLOW; true -> V end. + +-define(IS_STRATEGY(S), lists:member(S, [lifo, fifo, rand])). +strategy(PoolArgs) -> + Is = ?IS_STRATEGY(V = proplists:get_value(strategy, PoolArgs)), + if not Is -> ?DEFAULT_STRATEGY; true -> V end. + +handle_cast({checkin, Pid}, State) -> + #state{monitors = Monitors, mrefs = MRefs, crefs = CRefs} = State, case ets:lookup(Monitors, Pid) of - [{Pid, _, MRef}] -> - true = erlang:demonitor(MRef), + [{Pid, CRef, MRef}] -> + true = erlang:demonitor(MRef, [flush]), true = ets:delete(Monitors, Pid), + true = ets:delete(MRefs, MRef), + true = ets:delete(CRefs, CRef), NewState = handle_checkin(Pid, State), {noreply, NewState}; [] -> @@ -180,12 +290,9 @@ handle_cast({checkin, Pid}, State = #state{monitors = Monitors}) -> end; handle_cast({cancel_waiting, CRef}, State) -> - case ets:match(State#state.monitors, {'$1', CRef, '$2'}) of - [[Pid, MRef]] -> - demonitor(MRef, [flush]), - true = ets:delete(State#state.monitors, Pid), - NewState = handle_checkin(Pid, State), - {noreply, NewState}; + case ets:lookup(State#state.crefs, CRef) of + [{CRef, Pid}] -> + handle_cast({checkin, Pid}, State); [] -> Cancel = fun({_, Ref, MRef}) when Ref =:= CRef -> demonitor(MRef, [flush]), @@ -202,37 +309,66 @@ handle_cast(_Msg, State) -> handle_call({checkout, CRef, Block}, {FromPid, _} = From, State) -> #state{supervisor = Sup, + worker_module = Mod, workers = Workers, monitors = Monitors, + mrefs = MRefs, + crefs = CRefs, overflow = Overflow, - max_overflow = MaxOverflow, - strategy = Strategy} = State, - case get_worker_with_strategy(Workers, Strategy) of - {{value, Pid}, Left} -> + max_overflow = MaxOverflow} = State, + OverflowLeft = poolboy_worker_collection:length(visible, Overflow), + case poolboy_worker_collection:checkout(Workers) of + {Pid, Left} when is_pid(Pid) -> MRef = erlang:monitor(process, FromPid), true = ets:insert(Monitors, {Pid, CRef, MRef}), + true = ets:insert(MRefs, {MRef, Pid}), + true = ets:insert(CRefs, {CRef, Pid}), {reply, Pid, State#state{workers = Left}}; - {empty, _Left} when MaxOverflow > 0, Overflow < MaxOverflow -> - {Pid, MRef} = new_worker(Sup, FromPid), + empty when MaxOverflow > 0, OverflowLeft > 0 -> + {NextIdx, NewOverflow} = poolboy_worker_collection:checkout(Overflow), + Pid = new_worker(Sup, Mod, NextIdx), + {Pid, NewerOverflow} = poolboy_worker_collection:replace(NextIdx, Pid, NewOverflow), + MRef = erlang:monitor(process, FromPid), true = ets:insert(Monitors, {Pid, CRef, MRef}), - {reply, Pid, State#state{overflow = Overflow + 1}}; - {empty, _Left} when Block =:= false -> + true = ets:insert(MRefs, {MRef, Pid}), + true = ets:insert(CRefs, {CRef, Pid}), + {reply, Pid, State#state{overflow = NewerOverflow}}; + empty when Block =:= false -> {reply, full, State}; - {empty, _Left} -> + empty -> MRef = erlang:monitor(process, FromPid), Waiting = queue:in({From, CRef, MRef}, State#state.waiting), {noreply, State#state{waiting = Waiting}} end; +handle_call(status_map, _From, State) -> + #state{workers = Workers, + monitors = Monitors, + overflow = Overflow, + max_overflow = MaxOverflow} = State, + StateName = state_name(State), + OverflowLeft = poolboy_worker_collection:length(visible, Overflow), + OverflowLevel = MaxOverflow - OverflowLeft, + {reply, #{state => StateName, + available => poolboy_worker_collection:length(visible, Workers), + overflow => OverflowLevel, + monitored => ets:info(Monitors, size), + waiting => queue:len(State#state.waiting)}, State}; handle_call(status, _From, State) -> #state{workers = Workers, monitors = Monitors, - overflow = Overflow} = State, + overflow = Overflow, + max_overflow = MaxOverflow} = State, StateName = state_name(State), - {reply, {StateName, queue:len(Workers), Overflow, ets:info(Monitors, size)}, State}; + VisibleWorkers = poolboy_worker_collection:length(visible, Workers), + OverflowLeft = poolboy_worker_collection:length(visible, Overflow), + OverflowLevel = MaxOverflow - OverflowLeft, + MonitorSize = ets:info(Monitors, size), + {reply, {StateName, VisibleWorkers, OverflowLevel, MonitorSize}, State}; handle_call(get_avail_workers, _From, State) -> - Workers = State#state.workers, - {reply, Workers, State}; + {reply, poolboy_worker_collection:all(visible, State#state.workers), State}; +handle_call(get_any_worker, _From, State) -> + {reply, poolboy_worker_collection:rand(known, State#state.workers), State}; handle_call(get_all_workers, _From, State) -> Sup = State#state.supervisor, WorkerList = supervisor:which_children(Sup), @@ -247,42 +383,44 @@ handle_call(_Msg, _From, State) -> Reply = {error, invalid_message}, {reply, Reply, State}. +handle_info({'DOWN', _, process, Pid, Reason}, State = #state{supervisor = Pid}) -> + {stop, Reason, State}; handle_info({'DOWN', MRef, _, _, _}, State) -> - case ets:match(State#state.monitors, {'$1', '_', MRef}) of - [[Pid]] -> - true = ets:delete(State#state.monitors, Pid), - NewState = handle_checkin(Pid, State), - {noreply, NewState}; + case ets:lookup(State#state.mrefs, MRef) of + [{MRef, Pid}] -> + handle_cast({checkin, Pid}, State); [] -> Waiting = queue:filter(fun ({_, _, R}) -> R =/= MRef end, State#state.waiting), {noreply, State#state{waiting = Waiting}} end; +handle_info({'EXIT', _Pid, noconnection = Reason}, State) -> + {stop, Reason, State}; +handle_info({'EXIT', Pid, Reason}, State = #state{supervisor = Pid}) -> + {stop, Reason, State}; handle_info({'EXIT', Pid, _Reason}, State) -> - #state{supervisor = Sup, - monitors = Monitors} = State, + #state{monitors = Monitors, + mrefs = MRefs, + crefs = CRefs} = State, case ets:lookup(Monitors, Pid) of - [{Pid, _, MRef}] -> - true = erlang:demonitor(MRef), + [{Pid, CRef, MRef}] -> + true = erlang:demonitor(MRef, [flush]), true = ets:delete(Monitors, Pid), - NewState = handle_worker_exit(Pid, State), - {noreply, NewState}; + true = ets:delete(MRefs, MRef), + true = ets:delete(CRefs, CRef); [] -> - case queue:member(Pid, State#state.workers) of - true -> - W = filter_worker_by_pid(Pid, State#state.workers), - {noreply, State#state{workers = queue:in(new_worker(Sup), W)}}; - false -> - {noreply, State} - end - end; - + ok + end, + NewState = handle_worker_exit(Pid, State), + {noreply, NewState}; +handle_info({nodedown, Node}, State = #state{supervisor = Sup}) + when Node == erlang:node(Sup) -> + {stop, nodedown, State}; handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, State) -> - Workers = queue:to_list(State#state.workers), - ok = lists:foreach(fun (W) -> unlink(W) end, Workers), - true = exit(State#state.supervisor, shutdown), +terminate(Reason, State = #state{supervisor = Sup}) -> + poolboy_worker_collection:filter(fun (W) -> catch not unlink(W) end, State#state.workers), + stop_supervisor(Reason, Sup), ok. code_change(_OldVsn, State, _Extra) -> @@ -296,82 +434,139 @@ start_pool(StartFun, PoolArgs, WorkerArgs) -> gen_server:StartFun(Name, ?MODULE, {PoolArgs, WorkerArgs}, []) end. -new_worker(Sup) -> - {ok, Pid} = supervisor:start_child(Sup, []), +new_worker(Sup, Mod, Index) -> + Node = erlang:node(Sup), + {ok, Pid} = + case rpc_call(Node, erlang, process_info, [Sup, registered_name], ?TIMEOUT) of + {registered_name, Name} -> + case function_exported(Node, Name, start_child, 1) of + true -> rpc_call(Node, Name, start_child, [Index], ?TIMEOUT); + false -> + case function_exported(Node, Name, start_child, 0) of + true -> rpc_call(Node, Name, start_child, [], ?TIMEOUT); + false -> + Args = child_args(Sup, Mod, Index), + supervisor:start_child(Sup, Args) + end + end; + R when R == undefined; R == [] -> + Args = child_args(Sup, Mod, Index), + supervisor:start_child(Sup, Args) + end, true = link(Pid), Pid. -new_worker(Sup, FromPid) -> - Pid = new_worker(Sup), - Ref = erlang:monitor(process, FromPid), - {Pid, Ref}. +child_args(Sup, Mod, Index) -> + Node = erlang:node(Sup), + case supervisor:get_childspec(Sup, Mod) of + {ok, #{start := {M,F,A}}} -> + case function_exported(Node, M, F, length(A) + 1) of + true -> [Index]; + false -> [] + end; + {ok, {_Id, {M,F,A}, _R, _SD, _T, _M}} -> + case function_exported(Node, M, F, length(A) + 1) of + true -> [Index]; + false -> [] + end; + _ -> [] + end. -get_worker_with_strategy(Workers, fifo) -> - queue:out(Workers); -get_worker_with_strategy(Workers, lifo) -> - queue:out_r(Workers). +function_exported(Node, Module, Name, Arity) -> + rpc_call(Node, erlang, function_exported, [Module, Name, Arity], ?TIMEOUT). dismiss_worker(Sup, Pid) -> true = unlink(Pid), supervisor:terminate_child(Sup, Pid). -filter_worker_by_pid(Pid, Workers) -> - queue:filter(fun (WPid) -> WPid =/= Pid end, Workers). - -prepopulate(N, _Sup) when N < 1 -> - queue:new(); -prepopulate(N, Sup) -> - prepopulate(N, Sup, queue:new()). - -prepopulate(0, _Sup, Workers) -> - Workers; -prepopulate(N, Sup, Workers) -> - prepopulate(N-1, Sup, queue:in(new_worker(Sup), Workers)). - handle_checkin(Pid, State) -> #state{supervisor = Sup, waiting = Waiting, monitors = Monitors, + mrefs = MRefs, + crefs = CRefs, overflow = Overflow} = State, case queue:out(Waiting) of {{value, {From, CRef, MRef}}, Left} -> true = ets:insert(Monitors, {Pid, CRef, MRef}), + true = ets:insert(MRefs, {MRef, Pid}), + true = ets:insert(CRefs, {CRef, Pid}), gen_server:reply(From, Pid), State#state{waiting = Left}; - {empty, Empty} when Overflow > 0 -> - ok = dismiss_worker(Sup, Pid), - State#state{waiting = Empty, overflow = Overflow - 1}; {empty, Empty} -> - Workers = queue:in(Pid, State#state.workers), - State#state{workers = Workers, waiting = Empty, overflow = 0} + try poolboy_worker_collection:replace(Pid, Overflow) of + {NewIdx, NewOverflow} -> + ok = dismiss_worker(Sup, Pid), + NewerOverflow = poolboy_worker_collection:checkin(NewIdx, NewOverflow), + State#state{waiting = Empty, overflow = NewerOverflow} + catch + error:enoent -> + Workers = poolboy_worker_collection:checkin(Pid, State#state.workers), + State#state{waiting = Empty, workers = Workers} + end end. handle_worker_exit(Pid, State) -> #state{supervisor = Sup, + worker_module = Mod, monitors = Monitors, - overflow = Overflow} = State, + mrefs = MRefs, + crefs = CRefs, + size = Size, + overflow = Overflow, + max_overflow = MaxOverflow} = State, + {NewWorker, Workers} = + try poolboy_worker_collection:replace(Pid, State#state.workers) + catch error:enoent -> {enoent, State#state.workers} + end, + OverflowLeft = poolboy_worker_collection:length(visible, Overflow), case queue:out(State#state.waiting) of - {{value, {From, CRef, MRef}}, LeftWaiting} -> - NewWorker = new_worker(State#state.supervisor), + {{value, {From, CRef, MRef}}, LeftWaiting} when is_pid(NewWorker) -> true = ets:insert(Monitors, {NewWorker, CRef, MRef}), + true = ets:insert(MRefs, {MRef, Pid}), + true = ets:insert(CRefs, {CRef, Pid}), gen_server:reply(From, NewWorker), - State#state{waiting = LeftWaiting}; - {empty, Empty} when Overflow > 0 -> - State#state{overflow = Overflow - 1, waiting = Empty}; - {empty, Empty} -> - W = filter_worker_by_pid(Pid, State#state.workers), - Workers = queue:in(new_worker(Sup), W), - State#state{workers = Workers, waiting = Empty} + State#state{waiting = LeftWaiting, workers = Workers}; + {{value, {From, CRef, MRef}}, LeftWaiting} when MaxOverflow > OverflowLeft -> + try + NewFun = fun(Idx) -> new_worker(Sup, Mod, Size + Idx) end, + {NewPid, NewOverflow} = poolboy_worker_collection:replace(Pid, NewFun, Overflow), + true = ets:insert(Monitors, {NewPid, CRef, MRef}), + true = ets:insert(MRefs, {MRef, Pid}), + true = ets:insert(CRefs, {CRef, Pid}), + gen_server:reply(From, NewPid), + State#state{waiting = LeftWaiting, overflow = NewOverflow} + catch error:enoent -> + State + end; + {empty, Empty} when is_pid(NewWorker) -> + NewWorkers = poolboy_worker_collection:checkin(NewWorker, Workers), + State#state{waiting = Empty, workers = NewWorkers}; + {empty, Empty} when MaxOverflow > 0 -> + {Idx, NewOverflow} = poolboy_worker_collection:replace(Pid, Overflow), + NewerOverflow = poolboy_worker_collection:checkin(Idx, NewOverflow), + State#state{waiting = Empty, overflow = NewerOverflow} end. -state_name(State = #state{overflow = Overflow}) when Overflow < 1 -> - #state{max_overflow = MaxOverflow, workers = Workers} = State, - case queue:len(Workers) == 0 of - true when MaxOverflow < 1 -> full; - true -> overflow; - false -> ready - end; -state_name(#state{overflow = MaxOverflow, max_overflow = MaxOverflow}) -> - full; -state_name(_State) -> - overflow. +state_name(State) -> + #state{workers = Workers, + overflow = Overflow, + max_overflow = MaxOverflow} = State, + case poolboy_worker_collection:length(visible, Workers) of + 0 when MaxOverflow < 1 -> full; + 0 -> + case poolboy_worker_collection:length(visible, Overflow) of + 0 -> full; + _ -> overflow + end; + _ -> ready + end. + +stop_supervisor(Reason, Pid) when is_pid(Pid) -> + case erlang:node(Pid) of + N when N == node() -> + exit(Pid, Reason); + _ when Reason =/= nodedown -> + catch gen_server:stop(Pid, Reason, ?TIMEOUT); + _ -> ok + end. diff --git a/src/poolboy_collection.erl b/src/poolboy_collection.erl new file mode 100644 index 0000000..8c8260a --- /dev/null +++ b/src/poolboy_collection.erl @@ -0,0 +1,233 @@ +-module(poolboy_collection). + +-include_lib("stdlib/include/ms_transform.hrl"). + +-export([from/2, out/1, len/1, nth/2, add/2, filter/2, replace/4, to/1]). + +-ifdef(pre17). +-type pid_queue() :: queue(). +-type pid_queue(A) :: queue(A). +-else. +-type pid_queue() :: queue:queue(). +-type pid_queue(A) :: queue:queue(A). +-endif. +-export_type([pid_queue/0, pid_queue/1]). + +-type coll_data() :: list()|{}|array:array()|pid_queue()|ets:tid(). +-type coll_data(A) :: list(A)|{A}|array:array(A)|pid_queue(A)|ets:tid(). + +-record(typed_data, { + type :: 'list' |'array' |'queue' |'tuple' |'ets', + data :: coll_data() | coll_data(any()) + }). +-type typed_data() :: #typed_data{data :: coll_data()}. +-type typed_data(A) :: #typed_data{data :: coll_data(A)}. +-export_type([typed_data/0, typed_data/1]). + + +from(List, T = list) -> + #typed_data{type = T, data = List}; +from(List, T = queue) -> + #typed_data{type = T, data = queue:from_list(List)}; +from(List, T = array) -> + #typed_data{type = T, data = array:from_list(List)}; +from(List, T = tuple) -> + #typed_data{type = T, data = list_to_tuple(List)}; +from(List, T = ets) -> + Tab = ets:new(table, [ordered_set]), + true = ets:insert_new(Tab, [{I} || I <- List]), + #typed_data{type = T, data = Tab}. + + +out(TD = #typed_data{data = Data, type = list}) -> + case Data of + [] = L -> out(TD, {empty, L}); + [H|T] -> out(TD, {{value, H}, T}) + end; +out(TD = #typed_data{data = Data, type = queue}) -> + out(TD, queue:out(Data)); +out(TD = #typed_data{data = Data, type = array}) -> + case array:sparse_size(Data) of + 0 -> out(TD, {empty, Data}); + _ -> out(TD, {{value, array:get(0, Data)}, array:reset(0, Data)}) + end; +out(TD = #typed_data{data = Data, type = tuple}) -> + case erlang:tuple_size(Data) of + 0 -> out(TD, {empty, Data}); + _ -> out(TD, {{value, element(1, Data)}, erlang:delete_element(1, Data)}) + end; +out(TD = #typed_data{data = Data, type = ets}) -> + case ets:info(Data, size) of + 0 -> {empty, TD}; + Size -> + Index = rand:uniform(Size), + [{Value}] = ets:slot(Data, Index-1), + true = ets:delete(Data, Value), + {{value, Value}, TD} + end. + +out(TD, {V, D}) -> {V, data(TD, D)}. + + +len(#typed_data{data = Data, type = list}) -> length(Data); +len(#typed_data{data = Data, type = queue}) -> queue:len(Data); +len(#typed_data{data = Data, type = array}) -> array:sparse_size(Data); +len(#typed_data{data = Data, type = tuple}) -> tuple_size(Data); +len(#typed_data{data = Data, type = ets}) -> ets:info(Data, size). + + +nth(Index, #typed_data{data = Data, type = list}) -> lists:nth(Index, Data); +nth(Index, #typed_data{data = Data, type = queue}) -> queue_nth(Index, Data); +nth(Index, #typed_data{data = Data, type = array}) -> array:get(Index-1, Data); +nth(Index, #typed_data{data = Data, type = tuple}) -> element(Index, Data); +nth(Index, #typed_data{data = Data, type = ets}) -> + [{Value}] = ets:slot(Data, Index-1), + Value. + + +add(In, TD = #typed_data{data = Data, type = list}) -> + data(TD, [In | Data]); +add(In, TD = #typed_data{data = Data, type = queue}) -> + data(TD, queue:in(In, Data)); +add(In, TD = #typed_data{data = Data, type = array}) -> + data(TD, array:set(array:size(Data), In, Data)); +add(In, TD = #typed_data{data = Data, type = tuple}) -> + data(TD, erlang:append_element(Data, In)); +add(In, TD = #typed_data{data = Data, type = ets}) -> + true = ets:insert_new(Data, {In}), + TD. + + +filter(Fun, TD = #typed_data{data = Data, type = list}) -> + data(TD, list_filter(Fun, Data)); +filter(Fun, TD = #typed_data{data = Data, type = queue}) -> + data(TD, queue:filter(Fun, Data)); +filter(Fun, TD = #typed_data{data = Data, type = array}) -> + data(TD, array_filter(Fun, Data)); +filter(Fun, TD = #typed_data{data = Data, type = tuple}) -> + data(TD, tuple_filter(Fun, Data)); +filter(Fun, TD = #typed_data{data = Data, type = ets}) -> + data(TD, ets_filter(Fun, Data)). + + +replace(Out, Index, In, TD = #typed_data{data = Data, type = list}) -> + data(TD, list_replace(Out, Index, In, Data)); +replace(Out, Index, In, TD = #typed_data{data = Data, type = queue}) -> + data(TD, queue_replace(Out, Index, In, Data)); +replace(Out, Index, In, TD = #typed_data{data = Data, type = array}) -> + data(TD, array_replace(Out, Index, In, Data)); +replace(Out, Index, In, TD = #typed_data{data = Data, type = tuple}) -> + data(TD, tuple_replace(Out, Index, In, Data)); +replace(Out, Index, In, TD = #typed_data{data = Data, type = ets}) -> + [{Out}] = ets:slot(Data, Index), + [{Out}] = ets:take(Data, Out), + true = ets:insert_new(Data, {In}), + TD. + + +to(#typed_data{data = Data, type = list}) -> Data; +to(#typed_data{data = Data, type = queue}) -> queue:to_list(Data); +to(#typed_data{data = Data, type = array}) -> array:to_list(Data); +to(#typed_data{data = Data, type = tuple}) -> tuple_to_list(Data); +to(#typed_data{data = Data, type = ets}) -> + ets:select(Data, ets:fun2ms(fun({I}) -> I end)). + + +data(TD, Data) -> TD#typed_data{data = Data}. + + + +list_filter(Fun, L) -> + list_filter(Fun, L, []). + +list_filter(_Fun, [], Acc) -> lists:reverse(Acc); +list_filter(Fun, [H | T], Acc) -> + case Fun(H) of + true -> list_filter(Fun, T, [H | Acc]); + [Else] when Else == H -> list_filter(Fun, T, [H | Acc]); + false -> list_filter(Fun, T, Acc); + [] -> list_filter(Fun, T, Acc); + [Else] -> list_filter(Fun, T, [Else | Acc]) + end. + +list_replace(O, X, I, L) -> + {L1, [O | Tl]} = lists:split(X-1, L), + L1 ++ [I| Tl]. + + + +array_filter(F, A) -> + array:sparse_map( + fun(_, V) -> + case F(V) of + true -> V; + [Else] when Else == V -> V; + false -> array:default(A); + [] -> array:default(A); + [Else] -> Else + end + end, + A). + + +array_replace(O, X, I, A) -> + O = array:get(X-1, A), + array:set(X-1, I, A). + + + +queue_nth(I, Q) -> + case queue:is_queue(Q) of + true -> + {Q1, _Q2} = queue:split(I, Q), + queue:last(Q1); + _ -> + throw(badarg) + end. + +queue_replace(O, X, I, Q) -> + {Q1, Q2} = queue:split(X-1, Q), + O = queue:get(Q2), + queue:join(queue:in(I, Q1), queue:drop(Q2)). + + + +tuple_filter(Fun, Tuple) -> + tuple_filter(Fun, Tuple, tuple_size(Tuple)). + +tuple_filter(_Fun, Tuple, 0) -> Tuple; +tuple_filter(Fun, Tuple, Index) -> + Element = element(Index, Tuple), + NewTuple = case Fun(Element) of + true -> Tuple; + [Else] when Else == Element -> Tuple; + false -> erlang:delete_element(Index, Tuple); + [] -> erlang:delete_element(Index, Tuple); + [Else] -> setelement(Index, Tuple, Else) + end, + tuple_filter(Fun, NewTuple, Index-1). + + +tuple_replace(O, X, I, Tu) -> + O = element(X, Tu), + setelement(X, Tu, I). + + + +ets_filter(Fun, Tab) -> + {Ins, Outs} = + ets:foldl( + fun({Item}, {In, Out} = Acc) -> + case Fun(Item) of + true -> Acc; + [Else] when Else == Item -> Acc; + false -> {In, [Item | Out]}; + [] -> {In, [Item | Out]}; + [Else] -> {[Else | In], [Item | Out]} + end + end, + {[], []}, + Tab), + true = lists:min([true | [ets:delete(Tab, O) || O <- Outs]]), + true = ets:insert_new(Tab, Ins), + Tab. diff --git a/src/poolboy_sup.erl b/src/poolboy_sup.erl index e6485a6..2c50b6f 100644 --- a/src/poolboy_sup.erl +++ b/src/poolboy_sup.erl @@ -6,7 +6,7 @@ -export([start_link/2, init/1]). start_link(Mod, Args) -> - supervisor:start_link(?MODULE, {Mod, Args}). + supervisor:start_link({local, Mod}, ?MODULE, {Mod, Args}). init({Mod, Args}) -> {ok, {{simple_one_for_one, 0, 1}, diff --git a/src/poolboy_worker_collection.erl b/src/poolboy_worker_collection.erl new file mode 100644 index 0000000..3fe4f84 --- /dev/null +++ b/src/poolboy_worker_collection.erl @@ -0,0 +1,109 @@ +-module(poolboy_worker_collection). + +-export([new/4, + length/2, + checkout/1, + replace/2, replace/3, + checkin/2, + filter/2, + all/2, + rand/2 + ]). + +-type strategy() :: lifo | fifo | rand. + +-record(coll, { + item_generator :: fun((non_neg_integer()) -> any()), + data :: poolboy_collection:typed_data() | poolboy_collection:typed_data(any()), + strategy :: strategy(), + indexes :: poolboy_collection:typed_data() | poolboy_collection:typed_data(any()), + rev_indexes :: #{any()=>non_neg_integer()} + }). + +-type coll() :: #coll{ + data :: poolboy_collection:typed_data(), + rev_indexes :: #{} + }. +-type coll(A) :: #coll{ + item_generator :: fun((non_neg_integer()) -> A), + data :: poolboy_collection:typed_data(A), + rev_indexes :: #{A=>non_neg_integer()} + }. + +-export_type([coll/0, coll/1]). + + +new(Type, Size, Strategy, Fun) when is_function(Fun, 1) -> + Indexes = lists:seq(1, Size), + Items = [Fun(I) || I <- Indexes], + RevIndexes = maps:from_list(lists:zip(Items, Indexes)), + IndexesType = + case Strategy of + lifo -> list; + fifo -> queue; + rand -> ets + end, + #coll{ + item_generator = Fun, + data = poolboy_collection:from(Items, Type), + strategy = Strategy, + indexes = poolboy_collection:from(Indexes, IndexesType), + rev_indexes = RevIndexes + }. + + +length(known, #coll{data=Data}) -> poolboy_collection:len(Data); +length(visible, #coll{indexes=Indexes}) -> poolboy_collection:len(Indexes). + + +checkout(Coll = #coll{indexes = Indexes, data=Data}) -> + case poolboy_collection:out(Indexes) of + {empty, _} -> empty; + {{value, Hd}, Tl} -> + {poolboy_collection:nth(Hd, Data), Coll#coll{indexes = Tl}} + end. + + +replace(Out, Coll = #coll{item_generator = In}) -> + replace(Out, In, Coll). + +replace(Out, In, Coll) when not is_function(In, 1) -> + replace(Out, fun(_) -> In end, Coll); +replace(Out, In, Coll = #coll{data = Data}) -> + case maps:take(Out, Coll#coll.rev_indexes) of + error -> error(enoent); + {OutIndex, RevIndexes} -> + NewItem = In(OutIndex), + NewData = poolboy_collection:replace(Out, OutIndex, NewItem, Data), + NewRevIndexes = maps:put(NewItem, OutIndex, RevIndexes), + {NewItem, Coll#coll{rev_indexes = NewRevIndexes, data = NewData}} + end. + + +checkin(In, Coll = #coll{indexes = Indexes, rev_indexes = RevIndexes}) -> + InIndex = maps:get(In, RevIndexes), + NewIndexes = poolboy_collection:add(InIndex, Indexes), + Coll#coll{indexes = NewIndexes}. + + +filter(Fun, #coll{data = Data}) -> + poolboy_collection:filter(Fun, Data). + + +all(known, #coll{rev_indexes = RevIndexes}) -> + maps:keys(RevIndexes); +all(visible, #coll{indexes = Indexes, data = Data}) -> + [ poolboy_collection:nth(I, Data) + || I <- poolboy_collection:to(Indexes) ]. + + +rand(known, #coll{data = Data}) -> + case poolboy_collection:len(Data) of + 0 -> empty; + L -> poolboy_collection:nth(rand:uniform(L), Data) + end; +rand(visible, #coll{indexes = Indexes, data = Data}) -> + case poolboy_collection:len(Indexes) of + 0 -> empty; + L -> poolboy_collection:nth(poolboy_collection:nth(rand:uniform(L), Indexes), Data) + end. diff --git a/src/poolboy_worker_supervisor.erl b/src/poolboy_worker_supervisor.erl new file mode 100644 index 0000000..b5a7ac0 --- /dev/null +++ b/src/poolboy_worker_supervisor.erl @@ -0,0 +1,13 @@ +-module(poolboy_worker_supervisor). + +-callback start_child() -> {ok, Pid} | + {error, Reason} when + Pid :: pid(), + Reason :: term(). + +-callback start_child(integer()) -> {ok, Pid} | + {error, Reason} when + Pid :: pid(), + Reason :: term(). + +-optional_callbacks([start_child/0, start_child/1]). diff --git a/test/poolboy_tests.erl b/test/poolboy_tests.erl index 5b27024..f434999 100644 --- a/test/poolboy_tests.erl +++ b/test/poolboy_tests.erl @@ -3,76 +3,90 @@ -include_lib("eunit/include/eunit.hrl"). pool_test_() -> - {foreach, - fun() -> + {foreachx, + fun(_) -> error_logger:tty(false) end, - fun(_) -> + fun(_, _) -> case whereis(poolboy_test) of undefined -> ok; Pid -> pool_call(Pid, stop) end, error_logger:tty(true) end, + [ {{Type, Strategy}, + fun({T, S}, _) -> + {<<(atom_to_binary(T, latin1))/binary, <<"-">>/binary, + (atom_to_binary(S, latin1))/binary, <<": ">>/binary, + Title/binary>>, fun() -> Test({T, S}) end} + end} || Type <- [list, array, tuple, queue, ets], + Strategy <- [lifo, fifo, rand], + {Title, Test} <- [ {<<"Basic pool operations">>, - fun pool_startup/0 + fun pool_startup/1 }, {<<"Pool overflow should work">>, - fun pool_overflow/0 + fun pool_overflow/1 }, {<<"Pool behaves when empty">>, - fun pool_empty/0 + fun pool_empty/1 }, {<<"Pool behaves when empty and oveflow is disabled">>, - fun pool_empty_no_overflow/0 + fun pool_empty_no_overflow/1 }, {<<"Pool behaves on worker death">>, - fun worker_death/0 + fun worker_death/1 }, {<<"Pool behaves when full and a worker dies">>, - fun worker_death_while_full/0 + fun worker_death_while_full/1 }, {<<"Pool behaves when full, a worker dies and overflow disabled">>, - fun worker_death_while_full_no_overflow/0 + fun worker_death_while_full_no_overflow/1 }, {<<"Non-blocking pool behaves when full and overflow disabled">>, - fun pool_full_nonblocking_no_overflow/0 + fun pool_full_nonblocking_no_overflow/1 }, {<<"Non-blocking pool behaves when full">>, - fun pool_full_nonblocking/0 + fun pool_full_nonblocking/1 }, {<<"Pool behaves on owner death">>, - fun owner_death/0 + fun owner_death/1 }, {<<"Worker checked-in after an exception in a transaction">>, - fun checkin_after_exception_in_transaction/0 + fun checkin_after_exception_in_transaction/1 }, {<<"Pool returns status">>, - fun pool_returns_status/0 + fun pool_returns_status/1 }, {<<"Pool demonitors previously waiting processes">>, - fun demonitors_previously_waiting_processes/0 + fun demonitors_previously_waiting_processes/1 }, {<<"Pool demonitors when a checkout is cancelled">>, - fun demonitors_when_checkout_cancelled/0 + fun demonitors_when_checkout_cancelled/1 }, {<<"Check that LIFO is the default strategy">>, - fun default_strategy_lifo/0 + fun default_strategy_lifo/1 }, {<<"Check LIFO strategy">>, - fun lifo_strategy/0 + fun lifo_strategy/1 }, {<<"Check FIFO strategy">>, - fun fifo_strategy/0 + fun fifo_strategy/1 + }, + {<<"Check RAND strategy">>, + fun rand_strategy/1 }, {<<"Pool reuses waiting monitor when a worker exits">>, - fun reuses_waiting_monitor_on_worker_exit/0 + fun reuses_waiting_monitor_on_worker_exit/1 }, {<<"Recover from timeout without exit handling">>, - fun transaction_timeout_without_exit/0}, + fun transaction_timeout_without_exit/1 + }, {<<"Recover from transaction timeout">>, - fun transaction_timeout/0} + fun transaction_timeout/1 + } + ] ] }. @@ -93,8 +107,8 @@ checkin_worker(Pid, Worker) -> timer:sleep(500). -transaction_timeout_without_exit() -> - {ok, Pid} = new_pool(1, 0), +transaction_timeout_without_exit({Type, Strategy}) -> + {ok, Pid} = new_pool(1, 0, Strategy, Type), ?assertEqual({ready,1,0,0}, pool_call(Pid, status)), WorkerList = pool_call(Pid, get_all_workers), ?assertMatch([_], WorkerList), @@ -108,8 +122,8 @@ transaction_timeout_without_exit() -> ?assertEqual({ready,1,0,0}, pool_call(Pid, status)). -transaction_timeout() -> - {ok, Pid} = new_pool(1, 0), +transaction_timeout({Type, Strategy}) -> + {ok, Pid} = new_pool(1, 0, Strategy, Type), ?assertEqual({ready,1,0,0}, pool_call(Pid, status)), WorkerList = pool_call(Pid, get_all_workers), ?assertMatch([_], WorkerList), @@ -124,50 +138,50 @@ transaction_timeout() -> ?assertEqual({ready,1,0,0}, pool_call(Pid, status)). -pool_startup() -> +pool_startup({Type, Strategy}) -> %% Check basic pool operation. - {ok, Pid} = new_pool(10, 5), - ?assertEqual(10, queue:len(pool_call(Pid, get_avail_workers))), + {ok, Pid} = new_pool(10, 5, Strategy, Type), + ?assertEqual(10, length(pool_call(Pid, get_avail_workers))), poolboy:checkout(Pid), - ?assertEqual(9, queue:len(pool_call(Pid, get_avail_workers))), + ?assertEqual(9, length(pool_call(Pid, get_avail_workers))), Worker = poolboy:checkout(Pid), - ?assertEqual(8, queue:len(pool_call(Pid, get_avail_workers))), + ?assertEqual(8, length(pool_call(Pid, get_avail_workers))), checkin_worker(Pid, Worker), - ?assertEqual(9, queue:len(pool_call(Pid, get_avail_workers))), + ?assertEqual(9, length(pool_call(Pid, get_avail_workers))), ?assertEqual(1, length(pool_call(Pid, get_all_monitors))), ok = pool_call(Pid, stop). -pool_overflow() -> +pool_overflow({Type, Strategy}) -> %% Check that the pool overflows properly. - {ok, Pid} = new_pool(5, 5), - Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 6)], - ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), + {ok, Pid} = new_pool(5, 5, Strategy, Type), + Workers = lists:reverse([poolboy:checkout(Pid) || _ <- lists:seq(0, 6)]), + ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), ?assertEqual(7, length(pool_call(Pid, get_all_workers))), [A, B, C, D, E, F, G] = Workers, checkin_worker(Pid, A), checkin_worker(Pid, B), - ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), + ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), checkin_worker(Pid, C), checkin_worker(Pid, D), - ?assertEqual(2, queue:len(pool_call(Pid, get_avail_workers))), + ?assertEqual(2, length(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), checkin_worker(Pid, E), checkin_worker(Pid, F), - ?assertEqual(4, queue:len(pool_call(Pid, get_avail_workers))), + ?assertEqual(4, length(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), checkin_worker(Pid, G), - ?assertEqual(5, queue:len(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, length(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), ?assertEqual(0, length(pool_call(Pid, get_all_monitors))), ok = pool_call(Pid, stop). -pool_empty() -> +pool_empty({Type, Strategy}) -> %% Checks that the the pool handles the empty condition correctly when %% overflow is enabled. - {ok, Pid} = new_pool(5, 2), - Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 6)], - ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), + {ok, Pid} = new_pool(5, 2, Strategy, Type), + Workers = lists:reverse([poolboy:checkout(Pid) || _ <- lists:seq(0, 6)]), + ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), ?assertEqual(7, length(pool_call(Pid, get_all_workers))), [A, B, C, D, E, F, G] = Workers, Self = self(), @@ -192,28 +206,28 @@ pool_empty() -> after 500 -> ?assert(false) end, - ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), + ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), checkin_worker(Pid, C), checkin_worker(Pid, D), - ?assertEqual(2, queue:len(pool_call(Pid, get_avail_workers))), + ?assertEqual(2, length(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), checkin_worker(Pid, E), checkin_worker(Pid, F), - ?assertEqual(4, queue:len(pool_call(Pid, get_avail_workers))), + ?assertEqual(4, length(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), checkin_worker(Pid, G), - ?assertEqual(5, queue:len(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, length(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), ?assertEqual(0, length(pool_call(Pid, get_all_monitors))), ok = pool_call(Pid, stop). -pool_empty_no_overflow() -> +pool_empty_no_overflow({Type, Strategy}) -> %% Checks the pool handles the empty condition properly when overflow is %% disabled. - {ok, Pid} = new_pool(5, 0), + {ok, Pid} = new_pool(5, 0, Strategy, Type), Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 4)], - ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), + ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), [A, B, C, D, E] = Workers, Self = self(), @@ -238,48 +252,48 @@ pool_empty_no_overflow() -> after 500 -> ?assert(false) end, - ?assertEqual(2, queue:len(pool_call(Pid, get_avail_workers))), + ?assertEqual(2, length(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), checkin_worker(Pid, C), checkin_worker(Pid, D), - ?assertEqual(4, queue:len(pool_call(Pid, get_avail_workers))), + ?assertEqual(4, length(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), checkin_worker(Pid, E), - ?assertEqual(5, queue:len(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, length(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), ?assertEqual(0, length(pool_call(Pid, get_all_monitors))), ok = pool_call(Pid, stop). -worker_death() -> +worker_death({Type, Strategy}) -> %% Check that dead workers are only restarted when the pool is not full %% and the overflow count is 0. Meaning, don't restart overflow workers. - {ok, Pid} = new_pool(5, 2), + {ok, Pid} = new_pool(5, 2, Strategy, Type), Worker = poolboy:checkout(Pid), kill_worker(Worker), - ?assertEqual(5, queue:len(pool_call(Pid, get_avail_workers))), - [A, B, C|_Workers] = [poolboy:checkout(Pid) || _ <- lists:seq(0, 6)], - ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, length(pool_call(Pid, get_avail_workers))), + [A, B, C|_Workers] = lists:reverse([poolboy:checkout(Pid) || _ <- lists:seq(0, 6)]), + ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), ?assertEqual(7, length(pool_call(Pid, get_all_workers))), kill_worker(A), - ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), + ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), ?assertEqual(6, length(pool_call(Pid, get_all_workers))), kill_worker(B), kill_worker(C), - ?assertEqual(1, queue:len(pool_call(Pid, get_avail_workers))), + ?assertEqual(1, length(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), ?assertEqual(4, length(pool_call(Pid, get_all_monitors))), ok = pool_call(Pid, stop). -worker_death_while_full() -> +worker_death_while_full({Type, Strategy}) -> %% Check that if a worker dies while the pool is full and there is a %% queued checkout, a new worker is started and the checkout serviced. %% If there are no queued checkouts, a new worker is not started. - {ok, Pid} = new_pool(5, 2), + {ok, Pid} = new_pool(5, 2, Strategy, Type), Worker = poolboy:checkout(Pid), kill_worker(Worker), - ?assertEqual(5, queue:len(pool_call(Pid, get_avail_workers))), - [A, B|_Workers] = [poolboy:checkout(Pid) || _ <- lists:seq(0, 6)], - ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, length(pool_call(Pid, get_avail_workers))), + [A, B|_Workers] = lists:reverse([poolboy:checkout(Pid) || _ <- lists:seq(0, 6)]), + ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), ?assertEqual(7, length(pool_call(Pid, get_all_workers))), Self = self(), spawn(fun() -> @@ -306,21 +320,21 @@ worker_death_while_full() -> 1000 -> ?assert(false) end, kill_worker(B), - ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), + ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), ?assertEqual(6, length(pool_call(Pid, get_all_workers))), ?assertEqual(6, length(pool_call(Pid, get_all_monitors))), ok = pool_call(Pid, stop). -worker_death_while_full_no_overflow() -> +worker_death_while_full_no_overflow({Type, Strategy}) -> %% Check that if a worker dies while the pool is full and there's no %% overflow, a new worker is started unconditionally and any queued %% checkouts are serviced. - {ok, Pid} = new_pool(5, 0), + {ok, Pid} = new_pool(5, 0, Strategy, Type), Worker = poolboy:checkout(Pid), kill_worker(Worker), - ?assertEqual(5, queue:len(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, length(pool_call(Pid, get_avail_workers))), [A, B, C|_Workers] = [poolboy:checkout(Pid) || _ <- lists:seq(0, 4)], - ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), + ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), Self = self(), spawn(fun() -> @@ -346,20 +360,20 @@ worker_death_while_full_no_overflow() -> 1000 -> ?assert(false) end, kill_worker(B), - ?assertEqual(1, queue:len(pool_call(Pid, get_avail_workers))), + ?assertEqual(1, length(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), kill_worker(C), - ?assertEqual(2, queue:len(pool_call(Pid, get_avail_workers))), + ?assertEqual(2, length(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), ?assertEqual(3, length(pool_call(Pid, get_all_monitors))), ok = pool_call(Pid, stop). -pool_full_nonblocking_no_overflow() -> +pool_full_nonblocking_no_overflow({Type, Strategy}) -> %% Check that when the pool is full, checkouts return 'full' when the %% option to use non-blocking checkouts is used. - {ok, Pid} = new_pool(5, 0), + {ok, Pid} = new_pool(5, 0, Strategy, Type), Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 4)], - ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), + ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), ?assertEqual(full, poolboy:checkout(Pid, false)), ?assertEqual(full, poolboy:checkout(Pid, false)), @@ -369,15 +383,15 @@ pool_full_nonblocking_no_overflow() -> ?assertEqual(5, length(pool_call(Pid, get_all_monitors))), ok = pool_call(Pid, stop). -pool_full_nonblocking() -> +pool_full_nonblocking({Type, Strategy}) -> %% Check that when the pool is full, checkouts return 'full' when the %% option to use non-blocking checkouts is used. - {ok, Pid} = new_pool(5, 5), + {ok, Pid} = new_pool(5, 5, Strategy, Type), Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 9)], - ?assertEqual(0, queue:len(pool_call(Pid, get_avail_workers))), + ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), ?assertEqual(10, length(pool_call(Pid, get_all_workers))), ?assertEqual(full, poolboy:checkout(Pid, false)), - A = hd(Workers), + A = lists:last(Workers), checkin_worker(Pid, A), NewWorker = poolboy:checkout(Pid, false), ?assertEqual(false, is_process_alive(A)), %% Overflow workers get shutdown @@ -386,26 +400,26 @@ pool_full_nonblocking() -> ?assertEqual(10, length(pool_call(Pid, get_all_monitors))), ok = pool_call(Pid, stop). -owner_death() -> +owner_death({Type, Strategy}) -> %% Check that a dead owner (a process that dies with a worker checked out) %% causes the pool to dismiss the worker and prune the state space. - {ok, Pid} = new_pool(5, 5), + {ok, Pid} = new_pool(5, 5, Strategy, Type), spawn(fun() -> poolboy:checkout(Pid), receive after 500 -> exit(normal) end end), timer:sleep(1000), - ?assertEqual(5, queue:len(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, length(pool_call(Pid, get_avail_workers))), ?assertEqual(5, length(pool_call(Pid, get_all_workers))), ?assertEqual(0, length(pool_call(Pid, get_all_monitors))), ok = pool_call(Pid, stop). -checkin_after_exception_in_transaction() -> - {ok, Pool} = new_pool(2, 0), - ?assertEqual(2, queue:len(pool_call(Pool, get_avail_workers))), +checkin_after_exception_in_transaction({Type, Strategy}) -> + {ok, Pool} = new_pool(2, 0, Strategy, Type), + ?assertEqual(2, length(pool_call(Pool, get_avail_workers))), Tx = fun(Worker) -> ?assert(is_pid(Worker)), - ?assertEqual(1, queue:len(pool_call(Pool, get_avail_workers))), + ?assertEqual(1, length(pool_call(Pool, get_avail_workers))), throw(it_on_the_ground), ?assert(false) end, @@ -414,11 +428,11 @@ checkin_after_exception_in_transaction() -> catch throw:it_on_the_ground -> ok end, - ?assertEqual(2, queue:len(pool_call(Pool, get_avail_workers))), + ?assertEqual(2, length(pool_call(Pool, get_avail_workers))), ok = pool_call(Pool, stop). -pool_returns_status() -> - {ok, Pool} = new_pool(2, 0), +pool_returns_status({Type, Strategy}) -> + {ok, Pool} = new_pool(2, 0, Strategy, Type), ?assertEqual({ready, 2, 0, 0}, poolboy:status(Pool)), poolboy:checkout(Pool), ?assertEqual({ready, 1, 0, 1}, poolboy:status(Pool)), @@ -426,7 +440,7 @@ pool_returns_status() -> ?assertEqual({full, 0, 0, 2}, poolboy:status(Pool)), ok = pool_call(Pool, stop), - {ok, Pool2} = new_pool(1, 1), + {ok, Pool2} = new_pool(1, 1, Strategy, Type), ?assertEqual({ready, 1, 0, 0}, poolboy:status(Pool2)), poolboy:checkout(Pool2), ?assertEqual({overflow, 0, 0, 1}, poolboy:status(Pool2)), @@ -434,7 +448,7 @@ pool_returns_status() -> ?assertEqual({full, 0, 1, 2}, poolboy:status(Pool2)), ok = pool_call(Pool2, stop), - {ok, Pool3} = new_pool(0, 2), + {ok, Pool3} = new_pool(0, 2, Strategy, Type), ?assertEqual({overflow, 0, 0, 0}, poolboy:status(Pool3)), poolboy:checkout(Pool3), ?assertEqual({overflow, 0, 1, 1}, poolboy:status(Pool3)), @@ -442,12 +456,12 @@ pool_returns_status() -> ?assertEqual({full, 0, 2, 2}, poolboy:status(Pool3)), ok = pool_call(Pool3, stop), - {ok, Pool4} = new_pool(0, 0), + {ok, Pool4} = new_pool(0, 0, Strategy, Type), ?assertEqual({full, 0, 0, 0}, poolboy:status(Pool4)), ok = pool_call(Pool4, stop). -demonitors_previously_waiting_processes() -> - {ok, Pool} = new_pool(1,0), +demonitors_previously_waiting_processes({Type, Strategy}) -> + {ok, Pool} = new_pool(1,0, Strategy, Type), Self = self(), Pid = spawn(fun() -> W = poolboy:checkout(Pool), @@ -465,8 +479,8 @@ demonitors_previously_waiting_processes() -> Pid ! ok, ok = pool_call(Pool, stop). -demonitors_when_checkout_cancelled() -> - {ok, Pool} = new_pool(1,0), +demonitors_when_checkout_cancelled({Type, Strategy}) -> + {ok, Pool} = new_pool(1, 0, Strategy, Type), Self = self(), Pid = spawn(fun() -> poolboy:checkout(Pool), @@ -481,32 +495,61 @@ demonitors_when_checkout_cancelled() -> Pid ! ok, ok = pool_call(Pool, stop). -default_strategy_lifo() -> +default_strategy_lifo({Type, lifo}) -> %% Default strategy is LIFO - {ok, Pid} = new_pool(2, 0), + {ok, Pid} = new_pool(2, 0, default, Type), Worker1 = poolboy:checkout(Pid), ok = poolboy:checkin(Pid, Worker1), Worker1 = poolboy:checkout(Pid), - poolboy:stop(Pid). + poolboy:stop(Pid); +default_strategy_lifo({_Type, _Strategy}) -> + ok. -lifo_strategy() -> - {ok, Pid} = new_pool(2, 0, lifo), +lifo_strategy({Type, lifo}) -> + {ok, Pid} = new_pool(2, 0, lifo, Type), Worker1 = poolboy:checkout(Pid), ok = poolboy:checkin(Pid, Worker1), Worker1 = poolboy:checkout(Pid), - poolboy:stop(Pid). + poolboy:stop(Pid); +lifo_strategy({_Type, _Strategy}) -> + ok. -fifo_strategy() -> - {ok, Pid} = new_pool(2, 0, fifo), +fifo_strategy({Type, fifo}) -> + {ok, Pid} = new_pool(2, 0, fifo, Type), Worker1 = poolboy:checkout(Pid), ok = poolboy:checkin(Pid, Worker1), Worker2 = poolboy:checkout(Pid), ?assert(Worker1 =/= Worker2), Worker1 = poolboy:checkout(Pid), - poolboy:stop(Pid). - -reuses_waiting_monitor_on_worker_exit() -> - {ok, Pool} = new_pool(1,0), + poolboy:stop(Pid); +fifo_strategy({_Type, _Strategy}) -> + ok. + +rand_strategy({Type, rand}) -> + {ok, Pid} = new_pool(3, 0, rand, Type), + Workers1 = + [ begin + Worker = poolboy:checkout(Pid), + ok = poolboy:checkin(Pid, Worker), + Worker + end || _ <- lists:seq(1,9) ], + Workers2 = + [ begin + Worker = poolboy:checkout(Pid), + ok = poolboy:checkin(Pid, Worker), + Worker + end || _ <- lists:seq(1,9) ], + ?assertNotEqual(Workers1, Workers2), + Workers = [ poolboy:checkout(Pid) + || _ <- lists:seq(1,3) ], + ?assertEqual(lists:usort(Workers), + lists:usort(Workers1 ++ Workers2)), + poolboy:stop(Pid); +rand_strategy({_Type, _Strategy}) -> + ok. + +reuses_waiting_monitor_on_worker_exit({Type, Strategy}) -> + {ok, Pool} = new_pool(1,0, Strategy, Type), Self = self(), Pid = spawn(fun() -> @@ -540,11 +583,24 @@ new_pool(Size, MaxOverflow) -> {worker_module, poolboy_test_worker}, {size, Size}, {max_overflow, MaxOverflow}]). -new_pool(Size, MaxOverflow, Strategy) -> +new_pool(Size, MaxOverflow, default, Type) -> + poolboy:start_link([{name, {local, poolboy_test}}, + {worker_module, poolboy_test_worker}, + {size, Size}, {max_overflow, MaxOverflow}, {type, Type}]); + +new_pool(Size, MaxOverflow, Strategy, Type) -> poolboy:start_link([{name, {local, poolboy_test}}, {worker_module, poolboy_test_worker}, - {size, Size}, {max_overflow, MaxOverflow}, + {size, Size}, {max_overflow, MaxOverflow}, {type, Type}, {strategy, Strategy}]). +pool_call(ServerRef, stop) when is_pid(ServerRef) -> + case is_process_alive(ServerRef) of + true -> + try gen_server:stop(ServerRef) + catch exit:noproc -> ok + end; + _ -> ok + end; pool_call(ServerRef, Request) -> gen_server:call(ServerRef, Request).