From eb53d366e9032faef940d9b34ff4f18b98ce3a43 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 3 Sep 2018 17:57:53 +0800 Subject: [PATCH 1/7] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 2f9e31cd8..41c6ce8b6 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # *EMQ X* - MQTT Broker -*EMQ X* broker is a fully open source, highly scalable, highly available distributed message broker for IoT, M2M and Mobile applications that can handle tens of millions of concurrent clients. +*EMQ X* broker is a fully open source, highly scalable, highly available distributed MQTT messaging broker for IoT, M2M and Mobile applications that can handle tens of millions of concurrent clients. Starting from 3.0 release, *EMQ X* broker fully supports MQTT V5.0 protocol specifications and backward compatible with MQTT V3.1 and V3.1.1, as well as other communication protocols such as MQTT-SN, CoAP, LwM2M, WebSocket and STOMP. The 3.0 release of the *EMQ X* broker can scaled to 10+ million concurrent MQTT connections on one cluster. From 96122cf966d477eae1d2c88f4b9d2a88194abef2 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 4 Sep 2018 19:14:25 +0800 Subject: [PATCH 2/7] Rename 'already_existed' to 'already_exists' --- src/emqx_access_control.erl | 2 +- src/emqx_tracer.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 8301bd8d8..140f82bb6 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -154,7 +154,7 @@ init([]) -> handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) -> Mods = lookup_mods(Type), reply(case lists:keymember(Mod, 1, Mods) of - true -> {error, already_existed}; + true -> {error, already_exists}; false -> case catch Mod:init(Opts) of {ok, ModState} -> diff --git a/src/emqx_tracer.erl b/src/emqx_tracer.erl index 65e6f6378..44ad6c26f 100644 --- a/src/emqx_tracer.erl +++ b/src/emqx_tracer.erl @@ -78,7 +78,7 @@ init([]) -> handle_call({start_trace, Who, LogFile}, _From, State = #state{level = Level, traces = Traces}) -> case catch lager:trace_file(LogFile, [Who], Level, ?OPTIONS) of {ok, exists} -> - {reply, {error, already_existed}, State}; + {reply, {error, already_exists}, State}; {ok, Trace} -> {reply, ok, State#state{traces = maps:put(Who, {Trace, LogFile}, Traces)}}; {error, Reason} -> From 46359214589adc41f8ac5caaff405163ac660e8f Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 5 Sep 2018 23:21:06 +0800 Subject: [PATCH 3/7] Rewrite the hooks module --- src/emqx_hooks.erl | 199 ++++++++++++++++++++++++--------------------- 1 file changed, 106 insertions(+), 93 deletions(-) diff --git a/src/emqx_hooks.erl b/src/emqx_hooks.erl index 0aa3e274c..64c1dc596 100644 --- a/src/emqx_hooks.erl +++ b/src/emqx_hooks.erl @@ -16,142 +16,160 @@ -behaviour(gen_server). --export([start_link/0]). +-export([start_link/0, stop/0]). %% Hooks API --export([add/3, add/4, delete/2, run/2, run/3, lookup/1]). +-export([add/2, add/3, add/4, del/2, run/2, run/3, lookup/1]). %% gen_server Function Exports --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). --record(state, {}). +-type(hookpoint() :: atom()). +-type(action() :: function() | mfa()). +-type(filter() :: function() | mfa()). --type(hooktag() :: atom() | string() | binary()). +-record(callback, {action :: action(), + filter :: filter(), + priority :: integer()}). --export_type([hooktag/0]). +-record(hook, {name :: hookpoint(), callbacks :: list(#callback{})}). --record(callback, {tag :: hooktag(), - function :: function(), - init_args = [] :: list(any()), - priority = 0 :: integer()}). - --record(hook, {name :: atom(), callbacks = [] :: list(#callback{})}). +-export_type([hookpoint/0, action/0, filter/0]). -define(TAB, ?MODULE). +-define(SERVER, ?MODULE). +-spec(start_link() -> emqx_types:startlink_ret()). start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + gen_server:start_link({local, ?SERVER}, ?MODULE, [], [{hibernate_after, 60000}]). -%%-------------------------------------------------------------------- +-spec(stop() -> ok). +stop() -> + gen_server:stop(?SERVER, normal, infinity). + +%%------------------------------------------------------------------------------ %% Hooks API -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ --spec(add(atom(), function() | {hooktag(), function()}, list(any())) -> ok). -add(HookPoint, Function, InitArgs) when is_function(Function) -> - add(HookPoint, {undefined, Function}, InitArgs, 0); +%% @doc Register a callback +-spec(add(hookpoint(), action() | #callback{}) -> emqx_types:ok_or_error(already_exists)). +add(HookPoint, Callback) when is_record(Callback, callback) -> + gen_server:call(?SERVER, {add, HookPoint, Callback}, infinity); +add(HookPoint, Action) when is_function(Action); is_tuple(Action) -> + add(HookPoint, #callback{action = Action, priority = 0}). -add(HookPoint, {Tag, Function}, InitArgs) when is_function(Function) -> - add(HookPoint, {Tag, Function}, InitArgs, 0). +-spec(add(hookpoint(), action(), filter() | integer() | list()) + -> emqx_types:ok_or_error(already_exists)). +add(HookPoint, Action, InitArgs) when is_function(Action), is_list(InitArgs) -> + add(HookPoint, #callback{action = {Action, InitArgs}, priority = 0}); +add(HookPoint, Action, Filter) when is_function(Filter); is_tuple(Filter) -> + add(HookPoint, #callback{action = Action, filter = Filter, priority = 0}); +add(HookPoint, Action, Priority) when is_integer(Priority) -> + add(HookPoint, #callback{action = Action, priority = Priority}). --spec(add(atom(), function() | {hooktag(), function()}, list(any()), integer()) -> ok). -add(HookPoint, Function, InitArgs, Priority) when is_function(Function) -> - add(HookPoint, {undefined, Function}, InitArgs, Priority); -add(HookPoint, {Tag, Function}, InitArgs, Priority) when is_function(Function) -> - gen_server:call(?MODULE, {add, HookPoint, {Tag, Function}, InitArgs, Priority}). +-spec(add(hookpoint(), action(), filter(), integer()) + -> emqx_types:ok_or_error(already_exists)). +add(HookPoint, Action, Filter, Priority) -> + add(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}). --spec(delete(atom(), function() | {hooktag(), function()}) -> ok). -delete(HookPoint, Function) when is_function(Function) -> - delete(HookPoint, {undefined, Function}); -delete(HookPoint, {Tag, Function}) when is_function(Function) -> - gen_server:call(?MODULE, {delete, HookPoint, {Tag, Function}}). +%% @doc Unregister a callback. +-spec(del(hookpoint(), action()) -> ok). +del(HookPoint, Action) -> + gen_server:cast(?SERVER, {del, HookPoint, Action}). -%% @doc Run hooks without Acc. +%% @doc Run hooks. -spec(run(atom(), list(Arg :: any())) -> ok | stop). run(HookPoint, Args) -> run_(lookup(HookPoint), Args). +%% @doc Run hooks with Accumulator. -spec(run(atom(), list(Arg :: any()), any()) -> any()). run(HookPoint, Args, Acc) -> run_(lookup(HookPoint), Args, Acc). %% @private -run_([#callback{function = Fun, init_args = InitArgs} | Callbacks], Args) -> - case apply(Fun, lists:append([Args, InitArgs])) of +run_([#callback{action = Action, filter = Filter} | Callbacks], Args) -> + case filtered(Filter, Args) orelse execute(Action, Args) of + true -> run_(Callbacks, Args); ok -> run_(Callbacks, Args); stop -> stop; _Any -> run_(Callbacks, Args) end; - run_([], _Args) -> ok. %% @private -run_([#callback{function = Fun, init_args = InitArgs} | Callbacks], Args, Acc) -> - case apply(Fun, lists:append([Args, [Acc], InitArgs])) of +run_([#callback{action = Action, filter = Filter} | Callbacks], Args, Acc) -> + Args1 = Args ++ [Acc], + case filtered(Filter, Args1) orelse execute(Action, Args1) of + true -> run_(Callbacks, Args, Acc); ok -> run_(Callbacks, Args, Acc); {ok, NewAcc} -> run_(Callbacks, Args, NewAcc); stop -> {stop, Acc}; {stop, NewAcc} -> {stop, NewAcc}; _Any -> run_(Callbacks, Args, Acc) end; - run_([], _Args, Acc) -> {ok, Acc}. --spec(lookup(atom()) -> [#callback{}]). +filtered(undefined, _Args) -> + false; +filtered(Filter, Args) -> + execute(Filter, Args). + +execute(Action, Args) when is_function(Action) -> + erlang:apply(Action, Args); +execute({Fun, InitArgs}, Args) when is_function(Fun) -> + erlang:apply(Fun, Args ++ InitArgs); +execute({M, F, A}, Args) -> + erlang:apply(M, F, Args ++ A). + +%% @doc Lookup callbacks. +-spec(lookup(hookpoint()) -> [#callback{}]). lookup(HookPoint) -> case ets:lookup(?TAB, HookPoint) of - [#hook{callbacks = Callbacks}] -> Callbacks; + [#hook{callbacks = Callbacks}] -> + Callbacks; [] -> [] end. -%%-------------------------------------------------------------------- +%%----------------------------------------------------------------------------- %% gen_server callbacks -%%-------------------------------------------------------------------- +%%----------------------------------------------------------------------------- init([]) -> - _ = emqx_tables:new(?TAB, [set, protected, {keypos, #hook.name}, - {read_concurrency, true}]), - {ok, #state{}}. + _ = emqx_tables:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}]), + {ok, #{}}. -handle_call({add, HookPoint, {Tag, Function}, InitArgs, Priority}, _From, State) -> - Callback = #callback{tag = Tag, function = Function, - init_args = InitArgs, priority = Priority}, - {reply, - case ets:lookup(?TAB, HookPoint) of - [#hook{callbacks = Callbacks}] -> - case contain_(Tag, Function, Callbacks) of - false -> - insert_hook_(HookPoint, add_callback_(Callback, Callbacks)); - true -> - {error, already_hooked} - end; - [] -> - insert_hook_(HookPoint, [Callback]) - end, State}; +handle_call({add, HookPoint, Callback = #callback{action = Action}}, _From, State) -> + Reply = case lists:keyfind(Action, 2, Callbacks = lookup(HookPoint)) of + true -> + {error, already_exists}; + false -> + insert_hook(HookPoint, add_callback(Callback, Callbacks)) + end, + {reply, Reply, State}; -handle_call({delete, HookPoint, {Tag, Function}}, _From, State) -> - {reply, - case ets:lookup(?TAB, HookPoint) of - [#hook{callbacks = Callbacks}] -> - case contain_(Tag, Function, Callbacks) of - true -> - insert_hook_(HookPoint, del_callback_(Tag, Function, Callbacks)); - false -> - {error, not_found} - end; - [] -> - {error, not_found} - end, State}; +handle_call({del, HookPoint, Action}, _From, State) -> + case lists:keydelete(Action, 2, lookup(HookPoint)) of + [] -> + ets:delete(?TAB, HookPoint); + Callbacks -> + insert_hook(HookPoint, Callbacks) + end, + {reply, ok, State}; handle_call(Req, _From, State) -> - {reply, {error, {unexpected_request, Req}}, State}. + emqx_logger:error("[Hooks] unexpected call: ~p", [Req]), + {reply, ignored, State}. -handle_cast(_Msg, State) -> +handle_cast(Msg, State) -> + emqx_logger:error("[Hooks] unexpected msg: ~p", [Msg]), {noreply, State}. -handle_info(_Info, State) -> +handle_info(Info, State) -> + emqx_logger:error("[Hooks] unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> @@ -160,26 +178,21 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%-------------------------------------------------------------------- +%%----------------------------------------------------------------------------- %% Internal functions -%%-------------------------------------------------------------------- +%%----------------------------------------------------------------------------- -insert_hook_(HookPoint, Callbacks) -> +insert_hook(HookPoint, Callbacks) -> ets:insert(?TAB, #hook{name = HookPoint, callbacks = Callbacks}), ok. -add_callback_(Callback, Callbacks) -> - lists:keymerge(#callback.priority, Callbacks, [Callback]). +add_callback(C, Callbacks) -> + add_callback(C, Callbacks, []). -del_callback_(Tag, Function, Callbacks) -> - lists:filter( - fun(#callback{tag = Tag1, function = Func1}) -> - not ((Tag =:= Tag1) andalso (Function =:= Func1)) - end, Callbacks). - -contain_(_Tag, _Function, []) -> - false; -contain_(Tag, Function, [#callback{tag = Tag, function = Function}|_Callbacks]) -> - true; -contain_(Tag, Function, [_Callback | Callbacks]) -> - contain_(Tag, Function, Callbacks). +add_callback(C, [], Acc) -> + lists:reverse([C|Acc]); +add_callback(C1 = #callback{priority = P1}, [C2 = #callback{priority = P2}|More], Acc) + when P1 =< P2 -> + add_callback(C1, More, [C2|Acc]); +add_callback(C1, More, Acc) -> + lists:append(lists:reverse(Acc), [C1 | More]). From 5e3aed0b7366aa68d6744187bda04f4de3d41977 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 6 Sep 2018 09:10:47 +0800 Subject: [PATCH 4/7] Add ok_or_error/1 type --- src/emqx_types.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/emqx_types.erl b/src/emqx_types.erl index d31f37303..960aa699a 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -17,7 +17,7 @@ -include("emqx.hrl"). -export_type([zone/0]). --export_type([startlink_ret/0]). +-export_type([startlink_ret/0, ok_or_error/1]). -export_type([pubsub/0, topic/0, subid/0, subopts/0]). -export_type([client_id/0, username/0, password/0, peername/0, protocol/0]). -export_type([credentials/0, session/0]). @@ -29,6 +29,7 @@ -type(zone() :: atom()). -type(startlink_ret() :: {ok, pid()} | ignore | {error, term()}). +-type(ok_or_error(Reason) :: ok | {error, Reason}). -type(pubsub() :: publish | subscribe). -type(topic() :: binary()). -type(subid() :: binary() | atom()). From 2a751055808c7b77bdb375e022582b82bfe091a0 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 6 Sep 2018 16:27:16 +0800 Subject: [PATCH 5/7] Improve the Hooks's design --- src/emqx.erl | 65 ++++++++++++++++++-------------- src/emqx_hooks.erl | 38 +++++++++++++------ src/emqx_mod_presence.erl | 24 ++++++------ src/emqx_mod_rewrite.erl | 24 +++++++----- test/emqx_broker_SUITE.erl | 58 +--------------------------- test/emqx_hooks_SUITE.erl | 77 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 166 insertions(+), 120 deletions(-) create mode 100644 test/emqx_hooks_SUITE.erl diff --git a/src/emqx.erl b/src/emqx.erl index 217611171..72f1d6f81 100644 --- a/src/emqx.erl +++ b/src/emqx.erl @@ -29,16 +29,16 @@ -export([get_subopts/2, set_subopts/3]). %% Hooks API --export([hook/4, hook/3, unhook/2, run_hooks/2, run_hooks/3]). +-export([hook/2, hook/3, hook/4, unhook/2, run_hooks/2, run_hooks/3]). %% Shutdown and reboot -export([shutdown/0, shutdown/1, reboot/0]). -define(APP, ?MODULE). -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Bootstrap, is_running... -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% @doc Start emqx application -spec(start() -> {ok, list(atom())} | {error, term()}). @@ -62,9 +62,9 @@ is_running(Node) -> Pid when is_pid(Pid) -> true end. -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% PubSub API -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ -spec(subscribe(emqx_topic:topic() | string()) -> ok). subscribe(Topic) -> @@ -97,9 +97,9 @@ unsubscribe(Topic, SubId) when is_atom(SubId); is_binary(SubId) -> unsubscribe(Topic, SubPid) when is_pid(SubPid) -> emqx_broker:unsubscribe(iolist_to_binary(Topic), SubPid). -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% PubSub management API -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ -spec(get_subopts(emqx_topic:topic() | string(), emqx_types:subscriber()) -> emqx_types:subopts()). @@ -128,36 +128,43 @@ subscribed(Topic, SubPid) when is_pid(SubPid) -> subscribed(Topic, SubId) when is_atom(SubId); is_binary(SubId) -> emqx_broker:subscribed(iolist_to_binary(Topic), SubId). -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Hooks API -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ --spec(hook(atom(), function() | {emqx_hooks:hooktag(), function()}, list(any())) - -> ok | {error, term()}). -hook(Hook, TagFunction, InitArgs) -> - emqx_hooks:add(Hook, TagFunction, InitArgs). +-spec(hook(emqx_hooks:hookpoint(), emqx_hooks:action()) -> ok | {error, already_exists}). +hook(HookPoint, Action) -> + emqx_hooks:add(HookPoint, Action). --spec(hook(atom(), function() | {emqx_hooks:hooktag(), function()}, list(any()), integer()) - -> ok | {error, term()}). -hook(Hook, TagFunction, InitArgs, Priority) -> - emqx_hooks:add(Hook, TagFunction, InitArgs, Priority). +-spec(hook(emqx_hooks:hookpoint(), emqx_hooks:action(), emqx_hooks:filter() | integer()) + -> ok | {error, already_exists}). +hook(HookPoint, Action, Priority) when is_integer(Priority) -> + emqx_hooks:add(HookPoint, Action, Priority); +hook(HookPoint, Action, Filter) when is_function(Filter); is_tuple(Filter) -> + emqx_hooks:add(HookPoint, Action, Filter); +hook(HookPoint, Action, InitArgs) when is_list(InitArgs) -> + emqx_hooks:add(HookPoint, Action, InitArgs). --spec(unhook(atom(), function() | {emqx_hooks:hooktag(), function()}) - -> ok | {error, term()}). -unhook(Hook, TagFunction) -> - emqx_hooks:delete(Hook, TagFunction). +-spec(hook(emqx_hooks:hookpoint(), emqx_hooks:action(), emqx_hooks:filter(), integer()) + -> ok | {error, already_exists}). +hook(HookPoint, Action, Filter, Priority) -> + emqx_hooks:add(HookPoint, Action, Filter, Priority). --spec(run_hooks(atom(), list(any())) -> ok | stop). -run_hooks(Hook, Args) -> - emqx_hooks:run(Hook, Args). +-spec(unhook(emqx_hooks:hookpoint(), emqx_hooks:action()) -> ok). +unhook(HookPoint, Action) -> + emqx_hooks:del(HookPoint, Action). --spec(run_hooks(atom(), list(any()), any()) -> {ok | stop, any()}). -run_hooks(Hook, Args, Acc) -> - emqx_hooks:run(Hook, Args, Acc). +-spec(run_hooks(emqx_hooks:hookpoint(), list(any())) -> ok | stop). +run_hooks(HookPoint, Args) -> + emqx_hooks:run(HookPoint, Args). -%%-------------------------------------------------------------------- +-spec(run_hooks(emqx_hooks:hookpoint(), list(any()), any()) -> {ok | stop, any()}). +run_hooks(HookPoint, Args, Acc) -> + emqx_hooks:run(HookPoint, Args, Acc). + +%%------------------------------------------------------------------------------ %% Shutdown and reboot -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ shutdown() -> shutdown(normal). diff --git a/src/emqx_hooks.erl b/src/emqx_hooks.erl index 64c1dc596..073c12870 100644 --- a/src/emqx_hooks.erl +++ b/src/emqx_hooks.erl @@ -134,16 +134,16 @@ lookup(HookPoint) -> [] -> [] end. -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% gen_server callbacks -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ init([]) -> _ = emqx_tables:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}]), {ok, #{}}. handle_call({add, HookPoint, Callback = #callback{action = Action}}, _From, State) -> - Reply = case lists:keyfind(Action, 2, Callbacks = lookup(HookPoint)) of + Reply = case lists:keymember(Action, 2, Callbacks = lookup(HookPoint)) of true -> {error, already_exists}; false -> @@ -151,18 +151,18 @@ handle_call({add, HookPoint, Callback = #callback{action = Action}}, _From, Stat end, {reply, Reply, State}; -handle_call({del, HookPoint, Action}, _From, State) -> - case lists:keydelete(Action, 2, lookup(HookPoint)) of +handle_call(Req, _From, State) -> + emqx_logger:error("[Hooks] unexpected call: ~p", [Req]), + {reply, ignored, State}. + +handle_cast({del, HookPoint, Action}, State) -> + case del_callback(Action, lookup(HookPoint)) of [] -> ets:delete(?TAB, HookPoint); Callbacks -> insert_hook(HookPoint, Callbacks) end, - {reply, ok, State}; - -handle_call(Req, _From, State) -> - emqx_logger:error("[Hooks] unexpected call: ~p", [Req]), - {reply, ignored, State}. + {noreply, State}; handle_cast(Msg, State) -> emqx_logger:error("[Hooks] unexpected msg: ~p", [Msg]), @@ -178,9 +178,9 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Internal functions -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ insert_hook(HookPoint, Callbacks) -> ets:insert(?TAB, #hook{name = HookPoint, callbacks = Callbacks}), ok. @@ -196,3 +196,17 @@ add_callback(C1 = #callback{priority = P1}, [C2 = #callback{priority = P2}|More] add_callback(C1, More, Acc) -> lists:append(lists:reverse(Acc), [C1 | More]). +del_callback(Action, Callbacks) -> + del_callback(Action, Callbacks, []). + +del_callback(_Action, [], Acc) -> + lists:reverse(Acc); +del_callback(Action, [#callback{action = Action} | Callbacks], Acc) -> + del_callback(Action, Callbacks, Acc); +del_callback(Action = {M, F}, [#callback{action = {M, F, _A}} | Callbacks], Acc) -> + del_callback(Action, Callbacks, Acc); +del_callback(Func, [#callback{action = {Func, _A}} | Callbacks], Acc) -> + del_callback(Func, Callbacks, Acc); +del_callback(Action, [Callback | Callbacks], Acc) -> + del_callback(Action, Callbacks, [Callback | Acc]). + diff --git a/src/emqx_mod_presence.erl b/src/emqx_mod_presence.erl index 812c3267d..59c675f9a 100644 --- a/src/emqx_mod_presence.erl +++ b/src/emqx_mod_presence.erl @@ -19,24 +19,24 @@ -include("emqx.hrl"). -export([load/1, unload/1]). + -export([on_client_connected/4, on_client_disconnected/3]). +-define(ATTR_KEYS, [clean_start, proto_ver, proto_name, keepalive]). + load(Env) -> emqx_hooks:add('client.connected', fun ?MODULE:on_client_connected/4, [Env]), emqx_hooks:add('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]). on_client_connected(#{client_id := ClientId, username := Username, - peername := {IpAddr, _}}, ConnAck, ConnInfo, Env) -> + peername := {IpAddr, _}}, ConnAck, ConnAttrs, Env) -> + Attrs = lists:filter(fun({K, _}) -> lists:member(K, ?ATTR_KEYS) end, ConnAttrs), case emqx_json:safe_encode([{clientid, ClientId}, {username, Username}, {ipaddress, iolist_to_binary(esockd_net:ntoa(IpAddr))}, - {clean_start, proplists:get_value(clean_start, ConnInfo)}, - {proto_ver, proplists:get_value(proto_ver, ConnInfo)}, - {proto_name, proplists:get_value(proto_name, ConnInfo)}, - {keepalive, proplists:get_value(keepalive, ConnInfo)}, {connack, ConnAck}, - {ts, os:system_time(second)}]) of + {ts, os:system_time(second)} | Attrs]) of {ok, Payload} -> emqx:publish(message(qos(Env), topic(connected, ClientId), Payload)); {error, Reason} -> @@ -55,20 +55,20 @@ on_client_disconnected(#{client_id := ClientId, username := Username}, Reason, E end. unload(_Env) -> - emqx_hooks:delete('client.connected', fun ?MODULE:on_client_connected/4), - emqx_hooks:delete('client.disconnected', fun ?MODULE:on_client_disconnected/3). + emqx_hooks:del('client.connected', fun ?MODULE:on_client_connected/4), + emqx_hooks:del('client.disconnected', fun ?MODULE:on_client_disconnected/3). message(QoS, Topic, Payload) -> - Msg = emqx_message:make(?MODULE, QoS, Topic, iolist_to_binary(Payload)), - emqx_message:set_flag(sys, Msg). + emqx_message:set_flag( + sys, emqx_message:make( + ?MODULE, QoS, Topic, iolist_to_binary(Payload))). topic(connected, ClientId) -> emqx_topic:systop(iolist_to_binary(["clients/", ClientId, "/connected"])); topic(disconnected, ClientId) -> emqx_topic:systop(iolist_to_binary(["clients/", ClientId, "/disconnected"])). -qos(Env) -> - proplists:get_value(qos, Env, 0). +qos(Env) -> proplists:get_value(qos, Env, 0). reason(Reason) when is_atom(Reason) -> Reason; reason({Error, _}) when is_atom(Error) -> Error; diff --git a/src/emqx_mod_rewrite.erl b/src/emqx_mod_rewrite.erl index a9ff334ce..29dbb660c 100644 --- a/src/emqx_mod_rewrite.erl +++ b/src/emqx_mod_rewrite.erl @@ -21,11 +21,15 @@ -export([rewrite_subscribe/3, rewrite_unsubscribe/3, rewrite_publish/2]). -load(Rules0) -> - Rules = compile(Rules0), - emqx_hooks:add('client.subscribe', fun ?MODULE:rewrite_subscribe/3, [Rules]), - emqx_hooks:add('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/3, [Rules]), - emqx_hooks:add('message.publish', fun ?MODULE:rewrite_publish/2, [Rules]). +%%------------------------------------------------------------------------------ +%% Load/Unload +%%------------------------------------------------------------------------------ + +load(RawRules) -> + Rules = compile(RawRules), + emqx_hooks:add('client.subscribe', fun ?MODULE:rewrite_subscribe/3, [Rules]), + emqx_hooks:add('client.unsubscribe', fun ?MODULE:rewrite_unsubscribe/3, [Rules]), + emqx_hooks:add('message.publish', fun ?MODULE:rewrite_publish/2, [Rules]). rewrite_subscribe(_Credentials, TopicTable, Rules) -> {ok, [{match_rule(Topic, Rules), Opts} || {Topic, Opts} <- TopicTable]}. @@ -37,13 +41,13 @@ rewrite_publish(Message = #message{topic = Topic}, Rules) -> {ok, Message#message{topic = match_rule(Topic, Rules)}}. unload(_) -> - emqx_hooks:delete('client.subscribe', fun ?MODULE:rewrite_subscribe/3), - emqx_hooks:delete('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/3), - emqx_hooks:delete('message.publish', fun ?MODULE:rewrite_publish/2). + emqx_hooks:del('client.subscribe', fun ?MODULE:rewrite_subscribe/3), + emqx_hooks:del('client.unsubscribe', fun ?MODULE:rewrite_unsubscribe/3), + emqx_hooks:del('message.publish', fun ?MODULE:rewrite_publish/2). -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Internal functions -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ match_rule(Topic, []) -> Topic; diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index e23330f7b..0bd3dc599 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -20,7 +20,6 @@ -define(APP, emqx). -include_lib("eunit/include/eunit.hrl"). - -include_lib("common_test/include/ct.hrl"). -include("emqx.hrl"). @@ -32,7 +31,6 @@ all() -> {group, broker}, {group, metrics}, {group, stats}, - {group, hook}, {group, alarms}]. groups() -> @@ -43,10 +41,8 @@ groups() -> t_shared_subscribe, 'pubsub#', 'pubsub+']}, {session, [sequence], [start_session]}, - {broker, [sequence], [hook_unhook]}, {metrics, [sequence], [inc_dec_metric]}, {stats, [sequence], [set_get_stat]}, - {hook, [sequence], [add_delete_hook, run_hooks]}, {alarms, [sequence], [set_alarms]} ]. @@ -165,8 +161,6 @@ start_session(_) -> %%-------------------------------------------------------------------- %% Broker Group %%-------------------------------------------------------------------- -hook_unhook(_) -> - ok. %%-------------------------------------------------------------------- %% Metric Group @@ -178,61 +172,11 @@ inc_dec_metric(_) -> %%-------------------------------------------------------------------- %% Stats Group %%-------------------------------------------------------------------- + set_get_stat(_) -> emqx_stats:setstat('retained/max', 99), 99 = emqx_stats:getstat('retained/max'). -%%-------------------------------------------------------------------- -%% Hook Test -%%-------------------------------------------------------------------- - -add_delete_hook(_) -> - ok = emqx:hook(test_hook, fun ?MODULE:hook_fun1/1, []), - ok = emqx:hook(test_hook, {tag, fun ?MODULE:hook_fun2/1}, []), - {error, already_hooked} = emqx:hook(test_hook, {tag, fun ?MODULE:hook_fun2/1}, []), - Callbacks = [{callback, undefined, fun ?MODULE:hook_fun1/1, [], 0}, - {callback, tag, fun ?MODULE:hook_fun2/1, [], 0}], - Callbacks = emqx_hooks:lookup(test_hook), - ok = emqx:unhook(test_hook, fun ?MODULE:hook_fun1/1), - ct:print("Callbacks: ~p~n", [emqx_hooks:lookup(test_hook)]), - ok = emqx:unhook(test_hook, {tag, fun ?MODULE:hook_fun2/1}), - {error, not_found} = emqx:unhook(test_hook1, {tag, fun ?MODULE:hook_fun2/1}), - [] = emqx_hooks:lookup(test_hook), - - ok = emqx:hook(emqx_hook, fun ?MODULE:hook_fun1/1, [], 9), - ok = emqx:hook(emqx_hook, {"tag", fun ?MODULE:hook_fun2/1}, [], 8), - Callbacks2 = [{callback, "tag", fun ?MODULE:hook_fun2/1, [], 8}, - {callback, undefined, fun ?MODULE:hook_fun1/1, [], 9}], - Callbacks2 = emqx_hooks:lookup(emqx_hook), - ok = emqx:unhook(emqx_hook, fun ?MODULE:hook_fun1/1), - ok = emqx:unhook(emqx_hook, {"tag", fun ?MODULE:hook_fun2/1}), - [] = emqx_hooks:lookup(emqx_hook). - -run_hooks(_) -> - ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun3/4, [init]), - ok = emqx:hook(foldl_hook, {tag, fun ?MODULE:hook_fun3/4}, [init]), - ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun4/4, [init]), - ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun5/4, [init]), - {stop, [r3, r2]} = emqx:run_hooks(foldl_hook, [arg1, arg2], []), - {ok, []} = emqx:run_hooks(unknown_hook, [], []), - - ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun6/2, [initArg]), - ok = emqx:hook(foreach_hook, {tag, fun ?MODULE:hook_fun6/2}, [initArg]), - ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun7/2, [initArg]), - ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun8/2, [initArg]), - stop = emqx:run_hooks(foreach_hook, [arg]). - -hook_fun1([]) -> ok. -hook_fun2([]) -> {ok, []}. - -hook_fun3(arg1, arg2, _Acc, init) -> ok. -hook_fun4(arg1, arg2, Acc, init) -> {ok, [r2 | Acc]}. -hook_fun5(arg1, arg2, Acc, init) -> {stop, [r3 | Acc]}. - -hook_fun6(arg, initArg) -> ok. -hook_fun7(arg, initArg) -> any. -hook_fun8(arg, initArg) -> stop. - set_alarms(_) -> AlarmTest = #alarm{id = <<"1">>, severity = error, title="alarm title", summary="alarm summary"}, emqx_alarm_mgr:set_alarm(AlarmTest), diff --git a/test/emqx_hooks_SUITE.erl b/test/emqx_hooks_SUITE.erl new file mode 100644 index 000000000..b5b278e31 --- /dev/null +++ b/test/emqx_hooks_SUITE.erl @@ -0,0 +1,77 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_hooks_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +all() -> + [add_delete_hook, run_hooks]. + +add_delete_hook(_) -> + {ok, _} = emqx_hooks:start_link(), + ok = emqx:hook(test_hook, fun ?MODULE:hook_fun1/1, []), + ok = emqx:hook(test_hook, fun ?MODULE:hook_fun2/1, []), + ?assertEqual({error, already_exists}, + emqx:hook(test_hook, fun ?MODULE:hook_fun2/1, [])), + Callbacks = [{callback, {fun ?MODULE:hook_fun1/1, []}, undefined, 0}, + {callback, {fun ?MODULE:hook_fun2/1, []}, undefined, 0}], + ?assertEqual(Callbacks, emqx_hooks:lookup(test_hook)), + ok = emqx:unhook(test_hook, fun ?MODULE:hook_fun1/1), + ok = emqx:unhook(test_hook, fun ?MODULE:hook_fun2/1), + timer:sleep(1000), + ?assertEqual([], emqx_hooks:lookup(test_hook)), + + ok = emqx:hook(emqx_hook, {?MODULE, hook_fun2, []}, 8), + ok = emqx:hook(emqx_hook, {?MODULE, hook_fun1, []}, 9), + Callbacks2 = [{callback, {?MODULE, hook_fun1, []}, undefined, 9}, + {callback, {?MODULE, hook_fun2, []}, undefined, 8}], + ?assertEqual(Callbacks2, emqx_hooks:lookup(emqx_hook)), + ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun1, []}), + ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun2, []}), + timer:sleep(1000), + ?assertEqual([], emqx_hooks:lookup(emqx_hook)), + ok = emqx_hooks:stop(). + +run_hooks(_) -> + {ok, _} = emqx_hooks:start_link(), + ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun3/4, [init]), + ok = emqx:hook(foldl_hook, {?MODULE, hook_fun3, [init]}), + ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun4/4, [init]), + ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun5/4, [init]), + {stop, [r3, r2]} = emqx:run_hooks(foldl_hook, [arg1, arg2], []), + {ok, []} = emqx:run_hooks(unknown_hook, [], []), + + ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun6/2, [initArg]), + {error, already_exists} = emqx:hook(foreach_hook, fun ?MODULE:hook_fun6/2, [initArg]), + ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun7/2, [initArg]), + ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun8/2, [initArg]), + stop = emqx:run_hooks(foreach_hook, [arg]), + ok = emqx_hooks:stop(). + +hook_fun1([]) -> ok. +hook_fun2([]) -> {ok, []}. + +hook_fun3(arg1, arg2, _Acc, init) -> ok. +hook_fun4(arg1, arg2, Acc, init) -> {ok, [r2 | Acc]}. +hook_fun5(arg1, arg2, Acc, init) -> {stop, [r3 | Acc]}. + +hook_fun6(arg, initArg) -> ok. +hook_fun7(arg, initArg) -> any. +hook_fun8(arg, initArg) -> stop. + From 324cc15dd43da40872e67ccb3d10981107e0977b Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 6 Sep 2018 17:38:01 +0800 Subject: [PATCH 6/7] Update README --- README.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/README.md b/README.md index 41c6ce8b6..b45ac510d 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,4 @@ -# *EMQ X* - MQTT Broker - +# EMQ X Broker *EMQ X* broker is a fully open source, highly scalable, highly available distributed MQTT messaging broker for IoT, M2M and Mobile applications that can handle tens of millions of concurrent clients. From d99d0a22d0370a7179e12acd6c2765d26a0fb925 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 6 Sep 2018 17:54:32 +0800 Subject: [PATCH 7/7] Rename 'ignore' to 'ignored' --- src/emqx_access_control.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 140f82bb6..1b9d76937 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -183,7 +183,7 @@ handle_call(stop, _From, State) -> handle_call(Req, _From, State) -> emqx_logger:error("[AccessControl] unexpected request: ~p", [Req]), - {reply, ignore, State}. + {reply, ignored, State}. handle_cast(Msg, State) -> emqx_logger:error("[AccessControl] unexpected msg: ~p", [Msg]),