diff --git a/apps/emqx/src/emqx_hooks.erl b/apps/emqx/src/emqx_hooks.erl index 3709c64d3..eb2da4276 100644 --- a/apps/emqx/src/emqx_hooks.erl +++ b/apps/emqx/src/emqx_hooks.erl @@ -20,6 +20,7 @@ -include("logger.hrl"). -include("types.hrl"). +-include_lib("stdlib/include/ms_transform.hrl"). -logger_header("[Hooks]"). @@ -32,6 +33,8 @@ , add/3 , add/4 , put/2 + , put/3 + , put/4 , del/2 , run/2 , run_fold/3 @@ -130,11 +133,25 @@ add(HookPoint, Action, Filter, Priority) when is_integer(Priority) -> %% @doc Like add/2, it register a callback, discard 'already_exists' error. -spec(put(hookpoint(), action() | #callback{}) -> ok). -put(HookPoint, Callback) -> +put(HookPoint, Callback) when is_record(Callback, callback) -> case add(HookPoint, Callback) of ok -> ok; - {error, already_exists} -> ok - end. + {error, already_exists} -> + gen_server:call(?SERVER, {put, HookPoint, Callback}, infinity) + end; +put(HookPoint, Action) when is_function(Action); is_tuple(Action) -> + ?MODULE:put(HookPoint, #callback{action = Action, priority = 0}). + +-spec(put(hookpoint(), action(), filter() | integer() | list()) -> ok). +put(HookPoint, Action, {_M, _F, _A} = Filter) -> + ?MODULE:put(HookPoint, #callback{action = Action, filter = Filter, priority = 0}); +put(HookPoint, Action, Priority) when is_integer(Priority) -> + ?MODULE:put(HookPoint, #callback{action = Action, priority = Priority}). + +-spec(put(hookpoint(), action(), filter(), integer()) -> ok). +put(HookPoint, Action, Filter, Priority) when is_integer(Priority) -> + ?MODULE:put(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}). + %% @doc Unregister a callback. -spec(del(hookpoint(), action() | {module(), atom()}) -> ok). @@ -215,15 +232,20 @@ init([]) -> ok = emqx_tables:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}]), {ok, #{}}. -handle_call({add, HookPoint, Callback = #callback{action = Action}}, _From, State) -> - Reply = case lists:keymember(Action, #callback.action, Callbacks = lookup(HookPoint)) of - true -> - {error, already_exists}; - false -> - insert_hook(HookPoint, add_callback(Callback, Callbacks)) +handle_call({add, HookPoint, Callback = #callback{action = {M, F, _}}}, _From, State) -> + Reply = case lists:any(fun (#callback{action = {M0, F0, _}}) -> + M0 =:= M andalso F0 =:= F + end, Callbacks = lookup(HookPoint)) of + true -> {error, already_exists}; + false -> insert_hook(HookPoint, add_callback(Callback, Callbacks)) end, {reply, Reply, State}; +handle_call({put, HookPoint, Callback = #callback{action = {M, F, _}}}, _From, State) -> + Callbacks = del_callback({M, F}, lookup(HookPoint)), + Reply = update_hook(HookPoint, add_callback(Callback, Callbacks)), + {reply, Reply, State}; + handle_call(Req, _From, State) -> ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. @@ -257,6 +279,10 @@ code_change(_OldVsn, State, _Extra) -> insert_hook(HookPoint, Callbacks) -> ets:insert(?TAB, #hook{name = HookPoint, callbacks = Callbacks}), ok. +update_hook(HookPoint, Callbacks) -> + Ms = ets:fun2ms(fun ({hook, K, V}) when K =:= HookPoint -> {hook, K, Callbacks} end), + ets:select_replace(emqx_hooks, Ms), + ok. add_callback(C, Callbacks) -> add_callback(C, Callbacks, []). diff --git a/apps/emqx/test/emqx_hooks_SUITE.erl b/apps/emqx/test/emqx_hooks_SUITE.erl index 49ba88934..be8814ca4 100644 --- a/apps/emqx/test/emqx_hooks_SUITE.erl +++ b/apps/emqx/test/emqx_hooks_SUITE.erl @@ -38,15 +38,22 @@ all() -> emqx_ct:all(?MODULE). % t_add(_) -> % error('TODO'). -t_add_del_hook(_) -> +t_add_put_del_hook(_) -> {ok, _} = emqx_hooks:start_link(), ok = emqx:hook(test_hook, {?MODULE, hook_fun1, []}), ok = emqx:hook(test_hook, {?MODULE, hook_fun2, []}), ?assertEqual({error, already_exists}, emqx:hook(test_hook, {?MODULE, hook_fun2, []})), - Callbacks = [{callback, {?MODULE, hook_fun1, []}, undefined, 0}, - {callback, {?MODULE, hook_fun2, []}, undefined, 0}], - ?assertEqual(Callbacks, emqx_hooks:lookup(test_hook)), + Callbacks0 = [{callback, {?MODULE, hook_fun1, []}, undefined, 0}, + {callback, {?MODULE, hook_fun2, []}, undefined, 0}], + ?assertEqual(Callbacks0, emqx_hooks:lookup(test_hook)), + + ok = emqx_hooks:put(test_hook, {?MODULE, hook_fun1, [test]}), + ok = emqx_hooks:put(test_hook, {?MODULE, hook_fun2, [test]}), + Callbacks1 = [{callback, {?MODULE, hook_fun1, [test]}, undefined, 0}, + {callback, {?MODULE, hook_fun2, [test]}, undefined, 0}], + ?assertEqual(Callbacks1, emqx_hooks:lookup(test_hook)), + ok = emqx:unhook(test_hook, {?MODULE, hook_fun1}), ok = emqx:unhook(test_hook, {?MODULE, hook_fun2}), timer:sleep(200), @@ -61,10 +68,21 @@ t_add_del_hook(_) -> {callback, {?MODULE, hook_fun8, []}, undefined, 8}, {callback, {?MODULE, hook_fun2, []}, undefined, 2}], ?assertEqual(Callbacks2, emqx_hooks:lookup(emqx_hook)), - ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun2, []}), - ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun8, []}), - ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun9, []}), - ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun10, []}), + + ok = emqx_hooks:put(emqx_hook, {?MODULE, hook_fun8, [test]}, 3), + ok = emqx_hooks:put(emqx_hook, {?MODULE, hook_fun2, [test]}, 4), + ok = emqx_hooks:put(emqx_hook, {?MODULE, hook_fun10, [test]}, 1), + ok = emqx_hooks:put(emqx_hook, {?MODULE, hook_fun9, [test]}, 2), + Callbacks3 = [{callback, {?MODULE, hook_fun2, [test]}, undefined, 4}, + {callback, {?MODULE, hook_fun8, [test]}, undefined, 3}, + {callback, {?MODULE, hook_fun9, [test]}, undefined, 2}, + {callback, {?MODULE, hook_fun10, [test]}, undefined, 1}], + ?assertEqual(Callbacks3, emqx_hooks:lookup(emqx_hook)), + + ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun2, [test]}), + ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun8, [test]}), + ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun9, [test]}), + ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun10, [test]}), timer:sleep(200), ?assertEqual([], emqx_hooks:lookup(emqx_hook)), ok = emqx_hooks:stop().