feat(hooks): emqx hooks support put

This commit is contained in:
zhanghongtong 2021-07-30 18:27:51 +08:00 committed by Rory Z
parent 4adfe35e61
commit 9a03138e53
2 changed files with 61 additions and 17 deletions

View File

@ -20,6 +20,7 @@
-include("logger.hrl").
-include("types.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
-logger_header("[Hooks]").
@ -32,6 +33,8 @@
, add/3
, add/4
, put/2
, put/3
, put/4
, del/2
, run/2
, run_fold/3
@ -130,11 +133,25 @@ add(HookPoint, Action, Filter, Priority) when is_integer(Priority) ->
%% @doc Like add/2, it register a callback, discard 'already_exists' error.
-spec(put(hookpoint(), action() | #callback{}) -> ok).
put(HookPoint, Callback) ->
put(HookPoint, Callback) when is_record(Callback, callback) ->
case add(HookPoint, Callback) of
ok -> ok;
{error, already_exists} -> ok
end.
{error, already_exists} ->
gen_server:call(?SERVER, {put, HookPoint, Callback}, infinity)
end;
put(HookPoint, Action) when is_function(Action); is_tuple(Action) ->
?MODULE:put(HookPoint, #callback{action = Action, priority = 0}).
-spec(put(hookpoint(), action(), filter() | integer() | list()) -> ok).
put(HookPoint, Action, {_M, _F, _A} = Filter) ->
?MODULE:put(HookPoint, #callback{action = Action, filter = Filter, priority = 0});
put(HookPoint, Action, Priority) when is_integer(Priority) ->
?MODULE:put(HookPoint, #callback{action = Action, priority = Priority}).
-spec(put(hookpoint(), action(), filter(), integer()) -> ok).
put(HookPoint, Action, Filter, Priority) when is_integer(Priority) ->
?MODULE:put(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}).
%% @doc Unregister a callback.
-spec(del(hookpoint(), action() | {module(), atom()}) -> ok).
@ -215,15 +232,20 @@ init([]) ->
ok = emqx_tables:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}]),
{ok, #{}}.
handle_call({add, HookPoint, Callback = #callback{action = Action}}, _From, State) ->
Reply = case lists:keymember(Action, #callback.action, Callbacks = lookup(HookPoint)) of
true ->
{error, already_exists};
false ->
insert_hook(HookPoint, add_callback(Callback, Callbacks))
handle_call({add, HookPoint, Callback = #callback{action = {M, F, _}}}, _From, State) ->
Reply = case lists:any(fun (#callback{action = {M0, F0, _}}) ->
M0 =:= M andalso F0 =:= F
end, Callbacks = lookup(HookPoint)) of
true -> {error, already_exists};
false -> insert_hook(HookPoint, add_callback(Callback, Callbacks))
end,
{reply, Reply, State};
handle_call({put, HookPoint, Callback = #callback{action = {M, F, _}}}, _From, State) ->
Callbacks = del_callback({M, F}, lookup(HookPoint)),
Reply = update_hook(HookPoint, add_callback(Callback, Callbacks)),
{reply, Reply, State};
handle_call(Req, _From, State) ->
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
@ -257,6 +279,10 @@ code_change(_OldVsn, State, _Extra) ->
insert_hook(HookPoint, Callbacks) ->
ets:insert(?TAB, #hook{name = HookPoint, callbacks = Callbacks}), ok.
update_hook(HookPoint, Callbacks) ->
Ms = ets:fun2ms(fun ({hook, K, V}) when K =:= HookPoint -> {hook, K, Callbacks} end),
ets:select_replace(emqx_hooks, Ms),
ok.
add_callback(C, Callbacks) ->
add_callback(C, Callbacks, []).

View File

@ -38,15 +38,22 @@ all() -> emqx_ct:all(?MODULE).
% t_add(_) ->
% error('TODO').
t_add_del_hook(_) ->
t_add_put_del_hook(_) ->
{ok, _} = emqx_hooks:start_link(),
ok = emqx:hook(test_hook, {?MODULE, hook_fun1, []}),
ok = emqx:hook(test_hook, {?MODULE, hook_fun2, []}),
?assertEqual({error, already_exists},
emqx:hook(test_hook, {?MODULE, hook_fun2, []})),
Callbacks = [{callback, {?MODULE, hook_fun1, []}, undefined, 0},
{callback, {?MODULE, hook_fun2, []}, undefined, 0}],
?assertEqual(Callbacks, emqx_hooks:lookup(test_hook)),
Callbacks0 = [{callback, {?MODULE, hook_fun1, []}, undefined, 0},
{callback, {?MODULE, hook_fun2, []}, undefined, 0}],
?assertEqual(Callbacks0, emqx_hooks:lookup(test_hook)),
ok = emqx_hooks:put(test_hook, {?MODULE, hook_fun1, [test]}),
ok = emqx_hooks:put(test_hook, {?MODULE, hook_fun2, [test]}),
Callbacks1 = [{callback, {?MODULE, hook_fun1, [test]}, undefined, 0},
{callback, {?MODULE, hook_fun2, [test]}, undefined, 0}],
?assertEqual(Callbacks1, emqx_hooks:lookup(test_hook)),
ok = emqx:unhook(test_hook, {?MODULE, hook_fun1}),
ok = emqx:unhook(test_hook, {?MODULE, hook_fun2}),
timer:sleep(200),
@ -61,10 +68,21 @@ t_add_del_hook(_) ->
{callback, {?MODULE, hook_fun8, []}, undefined, 8},
{callback, {?MODULE, hook_fun2, []}, undefined, 2}],
?assertEqual(Callbacks2, emqx_hooks:lookup(emqx_hook)),
ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun2, []}),
ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun8, []}),
ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun9, []}),
ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun10, []}),
ok = emqx_hooks:put(emqx_hook, {?MODULE, hook_fun8, [test]}, 3),
ok = emqx_hooks:put(emqx_hook, {?MODULE, hook_fun2, [test]}, 4),
ok = emqx_hooks:put(emqx_hook, {?MODULE, hook_fun10, [test]}, 1),
ok = emqx_hooks:put(emqx_hook, {?MODULE, hook_fun9, [test]}, 2),
Callbacks3 = [{callback, {?MODULE, hook_fun2, [test]}, undefined, 4},
{callback, {?MODULE, hook_fun8, [test]}, undefined, 3},
{callback, {?MODULE, hook_fun9, [test]}, undefined, 2},
{callback, {?MODULE, hook_fun10, [test]}, undefined, 1}],
?assertEqual(Callbacks3, emqx_hooks:lookup(emqx_hook)),
ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun2, [test]}),
ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun8, [test]}),
ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun9, [test]}),
ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun10, [test]}),
timer:sleep(200),
?assertEqual([], emqx_hooks:lookup(emqx_hook)),
ok = emqx_hooks:stop().