diff --git a/src/emqx_pool.erl b/src/emqx_pool.erl index 276352797..52df7fec3 100644 --- a/src/emqx_pool.erl +++ b/src/emqx_pool.erl @@ -17,7 +17,8 @@ -behaviour(gen_server). -export([start_link/0, start_link/2]). --export([submit/1, async_submit/1]). +-export([submit/1, submit/2]). +-export([async_submit/1, async_submit/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -25,6 +26,8 @@ -define(POOL, ?MODULE). +-type(task() :: fun() | mfa() | {fun(), Args :: list(any())}). + %% @doc Start pooler supervisor. start_link() -> emqx_pool_sup:start_link(?POOL, random, {?MODULE, start_link, []}). @@ -34,16 +37,33 @@ start_link() -> start_link(Pool, Id) -> gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)}, ?MODULE, [Pool, Id], []). -%% @doc Submit work to the pool --spec(submit(fun()) -> any()). -submit(Fun) -> - gen_server:call(worker(), {submit, Fun}, infinity). +%% @doc Submit work to the pool. +-spec(submit(task()) -> any()). +submit(Task) -> + call({submit, Task}). -%% @doc Submit work to the pool asynchronously --spec(async_submit(fun()) -> ok). -async_submit(Fun) -> - gen_server:cast(worker(), {async_submit, Fun}). +-spec(submit(fun(), list(any())) -> any()). +submit(Fun, Args) -> + call({submit, {Fun, Args}}). +%% @private +call(Req) -> + gen_server:call(worker(), Req, infinity). + +%% @doc Submit work to the pool asynchronously. +-spec(async_submit(task()) -> ok). +async_submit(Task) -> + cast({async_submit, Task}). + +-spec(async_submit(fun(), list(any())) -> ok). +async_submit(Fun, Args) -> + cast({async_submit, {Fun, Args}}). + +%% @private +cast(Msg) -> + gen_server:cast(worker(), Msg). + +%% @private worker() -> gproc_pool:pick_worker(pool). @@ -55,15 +75,15 @@ init([Pool, Id]) -> true = gproc_pool:connect_worker(Pool, {Pool, Id}), {ok, #{pool => Pool, id => Id}}. -handle_call({submit, Fun}, _From, State) -> - {reply, catch run(Fun), State}; +handle_call({submit, Task}, _From, State) -> + {reply, catch run(Task), State}; handle_call(Req, _From, State) -> emqx_logger:error("[Pool] unexpected call: ~p", [Req]), {reply, ignored, State}. -handle_cast({async_submit, Fun}, State) -> - try run(Fun) +handle_cast({async_submit, Task}, State) -> + try run(Task) catch _:Error:Stacktrace -> emqx_logger:error("[Pool] error: ~p, ~p", [Error, Stacktrace]) end, @@ -78,7 +98,7 @@ handle_info(Info, State) -> {noreply, State}. terminate(_Reason, #{pool := Pool, id := Id}) -> - true = gproc_pool:disconnect_worker(Pool, {Pool, Id}). + gproc_pool:disconnect_worker(Pool, {Pool, Id}). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -89,6 +109,8 @@ code_change(_OldVsn, State, _Extra) -> run({M, F, A}) -> erlang:apply(M, F, A); +run({F, A}) when is_function(F), is_list(A) -> + erlang:apply(F, A); run(Fun) when is_function(Fun) -> Fun(). diff --git a/src/emqx_topic.erl b/src/emqx_topic.erl index b3b417717..6540bbef2 100644 --- a/src/emqx_topic.erl +++ b/src/emqx_topic.erl @@ -188,8 +188,8 @@ parse(Topic = <<"$share/", Topic1/binary>>, Options) -> case binary:split(Topic1, <<"/">>) of [<<>>] -> error({invalid_topic, Topic}); [_] -> error({invalid_topic, Topic}); - [Group, Topic2] -> - case binary:match(Group, [<<"/">>, <<"+">>, <<"#">>]) of + [Group, Topic2] -> + case binary:match(Group, [<<"/">>, <<"+">>, <<"#">>]) of nomatch -> {Topic2, maps:put(share, Group, Options)}; _ -> error({invalid_topic, Topic}) end