From eb53d366e9032faef940d9b34ff4f18b98ce3a43 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 3 Sep 2018 17:57:53 +0800 Subject: [PATCH 01/21] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 2f9e31cd8..41c6ce8b6 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # *EMQ X* - MQTT Broker -*EMQ X* broker is a fully open source, highly scalable, highly available distributed message broker for IoT, M2M and Mobile applications that can handle tens of millions of concurrent clients. +*EMQ X* broker is a fully open source, highly scalable, highly available distributed MQTT messaging broker for IoT, M2M and Mobile applications that can handle tens of millions of concurrent clients. Starting from 3.0 release, *EMQ X* broker fully supports MQTT V5.0 protocol specifications and backward compatible with MQTT V3.1 and V3.1.1, as well as other communication protocols such as MQTT-SN, CoAP, LwM2M, WebSocket and STOMP. The 3.0 release of the *EMQ X* broker can scaled to 10+ million concurrent MQTT connections on one cluster. From 96122cf966d477eae1d2c88f4b9d2a88194abef2 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 4 Sep 2018 19:14:25 +0800 Subject: [PATCH 02/21] Rename 'already_existed' to 'already_exists' --- src/emqx_access_control.erl | 2 +- src/emqx_tracer.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 8301bd8d8..140f82bb6 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -154,7 +154,7 @@ init([]) -> handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) -> Mods = lookup_mods(Type), reply(case lists:keymember(Mod, 1, Mods) of - true -> {error, already_existed}; + true -> {error, already_exists}; false -> case catch Mod:init(Opts) of {ok, ModState} -> diff --git a/src/emqx_tracer.erl b/src/emqx_tracer.erl index 65e6f6378..44ad6c26f 100644 --- a/src/emqx_tracer.erl +++ b/src/emqx_tracer.erl @@ -78,7 +78,7 @@ init([]) -> handle_call({start_trace, Who, LogFile}, _From, State = #state{level = Level, traces = Traces}) -> case catch lager:trace_file(LogFile, [Who], Level, ?OPTIONS) of {ok, exists} -> - {reply, {error, already_existed}, State}; + {reply, {error, already_exists}, State}; {ok, Trace} -> {reply, ok, State#state{traces = maps:put(Who, {Trace, LogFile}, Traces)}}; {error, Reason} -> From c8b92a59b167b4b292528158cf55ea8dec97132f Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Wed, 5 Sep 2018 19:06:34 +0800 Subject: [PATCH 03/21] check topic alias --- src/emqx_mqtt_caps.erl | 10 +++++-- src/emqx_protocol.erl | 66 ++++++++++++++++++++++-------------------- 2 files changed, 42 insertions(+), 34 deletions(-) diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index fdc29fae8..baec9920c 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -43,7 +43,9 @@ {mqtt_wildcard_subscription, true}]). -define(PUBCAP_KEYS, [max_qos_allowed, - mqtt_retain_available]). + mqtt_retain_available, + mqtt_topic_alias + ]). -define(SUBCAP_KEYS, [max_qos_allowed, max_topic_levels, mqtt_shared_subscription, @@ -60,6 +62,11 @@ do_check_pub(Props = #{qos := QoS}, [{max_qos_allowed, MaxQoS}|Caps]) -> true -> {error, ?RC_QOS_NOT_SUPPORTED}; false -> do_check_pub(Props, Caps) end; +do_check_pub(Props = #{ topic_alias := TopicAlias}, [{max_topic_alias, MaxTopicAlias}| Caps]) -> + case TopicAlias =< MaxTopicAlias andalso TopicAlias > 0 of + false -> {error, ?RC_TOPIC_ALIAS_INVALID}; + true -> do_check_pub(Props, Caps) + end; do_check_pub(#{retain := true}, [{mqtt_retain_available, false}|_Caps]) -> {error, ?RC_RETAIN_NOT_SUPPORTED}; do_check_pub(Props, [{mqtt_retain_available, _}|Caps]) -> @@ -136,4 +143,3 @@ with_env(Zone, Key, InitFun) -> Caps; ZoneCaps -> ZoneCaps end. - diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index ec104799e..898d0a04e 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -33,36 +33,36 @@ -export([shutdown/2]). -record(pstate, { - zone, - sendfun, - peername, - peercert, - proto_ver, - proto_name, - ackprops, - client_id, - is_assigned, - conn_pid, - conn_props, - ack_props, - username, - session, - clean_start, - topic_aliases, - packet_size, - will_topic, - will_msg, - keepalive, - mountpoint, - is_super, - is_bridge, - enable_ban, - enable_acl, - recv_stats, - send_stats, - connected, - connected_at - }). + zone, + sendfun, + peername, + peercert, + proto_ver, + proto_name, + ackprops, + client_id, + is_assigned, + conn_pid, + conn_props, + ack_props, + username, + session, + clean_start, + topic_aliases, + packet_size, + will_topic, + will_msg, + keepalive, + mountpoint, + is_super, + is_bridge, + enable_ban, + enable_acl, + recv_stats, + send_stats, + connected, + connected_at + }). -type(state() :: #pstate{}). -export_type([state/0]). @@ -631,9 +631,11 @@ check_publish(Packet, PState) -> run_check_steps([fun check_pub_caps/2, fun check_pub_acl/2], Packet, PState). -check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, retain = R}}, +check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, retain = Retain}, + variable = #mqtt_packet_publish{ properties = Properties}}, #pstate{zone = Zone}) -> - emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => R}). + #{'Topic-Alias' := TopicAlias} = Properties, + emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain, topic_alias => TopicAlias}). check_pub_acl(_Packet, #pstate{is_super = IsSuper, enable_acl = EnableAcl}) when IsSuper orelse (not EnableAcl) -> From 46359214589adc41f8ac5caaff405163ac660e8f Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 5 Sep 2018 23:21:06 +0800 Subject: [PATCH 04/21] Rewrite the hooks module --- src/emqx_hooks.erl | 199 ++++++++++++++++++++++++--------------------- 1 file changed, 106 insertions(+), 93 deletions(-) diff --git a/src/emqx_hooks.erl b/src/emqx_hooks.erl index 0aa3e274c..64c1dc596 100644 --- a/src/emqx_hooks.erl +++ b/src/emqx_hooks.erl @@ -16,142 +16,160 @@ -behaviour(gen_server). --export([start_link/0]). +-export([start_link/0, stop/0]). %% Hooks API --export([add/3, add/4, delete/2, run/2, run/3, lookup/1]). +-export([add/2, add/3, add/4, del/2, run/2, run/3, lookup/1]). %% gen_server Function Exports --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). --record(state, {}). +-type(hookpoint() :: atom()). +-type(action() :: function() | mfa()). +-type(filter() :: function() | mfa()). --type(hooktag() :: atom() | string() | binary()). +-record(callback, {action :: action(), + filter :: filter(), + priority :: integer()}). --export_type([hooktag/0]). +-record(hook, {name :: hookpoint(), callbacks :: list(#callback{})}). --record(callback, {tag :: hooktag(), - function :: function(), - init_args = [] :: list(any()), - priority = 0 :: integer()}). - --record(hook, {name :: atom(), callbacks = [] :: list(#callback{})}). +-export_type([hookpoint/0, action/0, filter/0]). -define(TAB, ?MODULE). +-define(SERVER, ?MODULE). +-spec(start_link() -> emqx_types:startlink_ret()). start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + gen_server:start_link({local, ?SERVER}, ?MODULE, [], [{hibernate_after, 60000}]). -%%-------------------------------------------------------------------- +-spec(stop() -> ok). +stop() -> + gen_server:stop(?SERVER, normal, infinity). + +%%------------------------------------------------------------------------------ %% Hooks API -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ --spec(add(atom(), function() | {hooktag(), function()}, list(any())) -> ok). -add(HookPoint, Function, InitArgs) when is_function(Function) -> - add(HookPoint, {undefined, Function}, InitArgs, 0); +%% @doc Register a callback +-spec(add(hookpoint(), action() | #callback{}) -> emqx_types:ok_or_error(already_exists)). +add(HookPoint, Callback) when is_record(Callback, callback) -> + gen_server:call(?SERVER, {add, HookPoint, Callback}, infinity); +add(HookPoint, Action) when is_function(Action); is_tuple(Action) -> + add(HookPoint, #callback{action = Action, priority = 0}). -add(HookPoint, {Tag, Function}, InitArgs) when is_function(Function) -> - add(HookPoint, {Tag, Function}, InitArgs, 0). +-spec(add(hookpoint(), action(), filter() | integer() | list()) + -> emqx_types:ok_or_error(already_exists)). +add(HookPoint, Action, InitArgs) when is_function(Action), is_list(InitArgs) -> + add(HookPoint, #callback{action = {Action, InitArgs}, priority = 0}); +add(HookPoint, Action, Filter) when is_function(Filter); is_tuple(Filter) -> + add(HookPoint, #callback{action = Action, filter = Filter, priority = 0}); +add(HookPoint, Action, Priority) when is_integer(Priority) -> + add(HookPoint, #callback{action = Action, priority = Priority}). --spec(add(atom(), function() | {hooktag(), function()}, list(any()), integer()) -> ok). -add(HookPoint, Function, InitArgs, Priority) when is_function(Function) -> - add(HookPoint, {undefined, Function}, InitArgs, Priority); -add(HookPoint, {Tag, Function}, InitArgs, Priority) when is_function(Function) -> - gen_server:call(?MODULE, {add, HookPoint, {Tag, Function}, InitArgs, Priority}). +-spec(add(hookpoint(), action(), filter(), integer()) + -> emqx_types:ok_or_error(already_exists)). +add(HookPoint, Action, Filter, Priority) -> + add(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}). --spec(delete(atom(), function() | {hooktag(), function()}) -> ok). -delete(HookPoint, Function) when is_function(Function) -> - delete(HookPoint, {undefined, Function}); -delete(HookPoint, {Tag, Function}) when is_function(Function) -> - gen_server:call(?MODULE, {delete, HookPoint, {Tag, Function}}). +%% @doc Unregister a callback. +-spec(del(hookpoint(), action()) -> ok). +del(HookPoint, Action) -> + gen_server:cast(?SERVER, {del, HookPoint, Action}). -%% @doc Run hooks without Acc. +%% @doc Run hooks. -spec(run(atom(), list(Arg :: any())) -> ok | stop). run(HookPoint, Args) -> run_(lookup(HookPoint), Args). +%% @doc Run hooks with Accumulator. -spec(run(atom(), list(Arg :: any()), any()) -> any()). run(HookPoint, Args, Acc) -> run_(lookup(HookPoint), Args, Acc). %% @private -run_([#callback{function = Fun, init_args = InitArgs} | Callbacks], Args) -> - case apply(Fun, lists:append([Args, InitArgs])) of +run_([#callback{action = Action, filter = Filter} | Callbacks], Args) -> + case filtered(Filter, Args) orelse execute(Action, Args) of + true -> run_(Callbacks, Args); ok -> run_(Callbacks, Args); stop -> stop; _Any -> run_(Callbacks, Args) end; - run_([], _Args) -> ok. %% @private -run_([#callback{function = Fun, init_args = InitArgs} | Callbacks], Args, Acc) -> - case apply(Fun, lists:append([Args, [Acc], InitArgs])) of +run_([#callback{action = Action, filter = Filter} | Callbacks], Args, Acc) -> + Args1 = Args ++ [Acc], + case filtered(Filter, Args1) orelse execute(Action, Args1) of + true -> run_(Callbacks, Args, Acc); ok -> run_(Callbacks, Args, Acc); {ok, NewAcc} -> run_(Callbacks, Args, NewAcc); stop -> {stop, Acc}; {stop, NewAcc} -> {stop, NewAcc}; _Any -> run_(Callbacks, Args, Acc) end; - run_([], _Args, Acc) -> {ok, Acc}. --spec(lookup(atom()) -> [#callback{}]). +filtered(undefined, _Args) -> + false; +filtered(Filter, Args) -> + execute(Filter, Args). + +execute(Action, Args) when is_function(Action) -> + erlang:apply(Action, Args); +execute({Fun, InitArgs}, Args) when is_function(Fun) -> + erlang:apply(Fun, Args ++ InitArgs); +execute({M, F, A}, Args) -> + erlang:apply(M, F, Args ++ A). + +%% @doc Lookup callbacks. +-spec(lookup(hookpoint()) -> [#callback{}]). lookup(HookPoint) -> case ets:lookup(?TAB, HookPoint) of - [#hook{callbacks = Callbacks}] -> Callbacks; + [#hook{callbacks = Callbacks}] -> + Callbacks; [] -> [] end. -%%-------------------------------------------------------------------- +%%----------------------------------------------------------------------------- %% gen_server callbacks -%%-------------------------------------------------------------------- +%%----------------------------------------------------------------------------- init([]) -> - _ = emqx_tables:new(?TAB, [set, protected, {keypos, #hook.name}, - {read_concurrency, true}]), - {ok, #state{}}. + _ = emqx_tables:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}]), + {ok, #{}}. -handle_call({add, HookPoint, {Tag, Function}, InitArgs, Priority}, _From, State) -> - Callback = #callback{tag = Tag, function = Function, - init_args = InitArgs, priority = Priority}, - {reply, - case ets:lookup(?TAB, HookPoint) of - [#hook{callbacks = Callbacks}] -> - case contain_(Tag, Function, Callbacks) of - false -> - insert_hook_(HookPoint, add_callback_(Callback, Callbacks)); - true -> - {error, already_hooked} - end; - [] -> - insert_hook_(HookPoint, [Callback]) - end, State}; +handle_call({add, HookPoint, Callback = #callback{action = Action}}, _From, State) -> + Reply = case lists:keyfind(Action, 2, Callbacks = lookup(HookPoint)) of + true -> + {error, already_exists}; + false -> + insert_hook(HookPoint, add_callback(Callback, Callbacks)) + end, + {reply, Reply, State}; -handle_call({delete, HookPoint, {Tag, Function}}, _From, State) -> - {reply, - case ets:lookup(?TAB, HookPoint) of - [#hook{callbacks = Callbacks}] -> - case contain_(Tag, Function, Callbacks) of - true -> - insert_hook_(HookPoint, del_callback_(Tag, Function, Callbacks)); - false -> - {error, not_found} - end; - [] -> - {error, not_found} - end, State}; +handle_call({del, HookPoint, Action}, _From, State) -> + case lists:keydelete(Action, 2, lookup(HookPoint)) of + [] -> + ets:delete(?TAB, HookPoint); + Callbacks -> + insert_hook(HookPoint, Callbacks) + end, + {reply, ok, State}; handle_call(Req, _From, State) -> - {reply, {error, {unexpected_request, Req}}, State}. + emqx_logger:error("[Hooks] unexpected call: ~p", [Req]), + {reply, ignored, State}. -handle_cast(_Msg, State) -> +handle_cast(Msg, State) -> + emqx_logger:error("[Hooks] unexpected msg: ~p", [Msg]), {noreply, State}. -handle_info(_Info, State) -> +handle_info(Info, State) -> + emqx_logger:error("[Hooks] unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> @@ -160,26 +178,21 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%-------------------------------------------------------------------- +%%----------------------------------------------------------------------------- %% Internal functions -%%-------------------------------------------------------------------- +%%----------------------------------------------------------------------------- -insert_hook_(HookPoint, Callbacks) -> +insert_hook(HookPoint, Callbacks) -> ets:insert(?TAB, #hook{name = HookPoint, callbacks = Callbacks}), ok. -add_callback_(Callback, Callbacks) -> - lists:keymerge(#callback.priority, Callbacks, [Callback]). +add_callback(C, Callbacks) -> + add_callback(C, Callbacks, []). -del_callback_(Tag, Function, Callbacks) -> - lists:filter( - fun(#callback{tag = Tag1, function = Func1}) -> - not ((Tag =:= Tag1) andalso (Function =:= Func1)) - end, Callbacks). - -contain_(_Tag, _Function, []) -> - false; -contain_(Tag, Function, [#callback{tag = Tag, function = Function}|_Callbacks]) -> - true; -contain_(Tag, Function, [_Callback | Callbacks]) -> - contain_(Tag, Function, Callbacks). +add_callback(C, [], Acc) -> + lists:reverse([C|Acc]); +add_callback(C1 = #callback{priority = P1}, [C2 = #callback{priority = P2}|More], Acc) + when P1 =< P2 -> + add_callback(C1, More, [C2|Acc]); +add_callback(C1, More, Acc) -> + lists:append(lists:reverse(Acc), [C1 | More]). From 5e3aed0b7366aa68d6744187bda04f4de3d41977 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 6 Sep 2018 09:10:47 +0800 Subject: [PATCH 05/21] Add ok_or_error/1 type --- src/emqx_types.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/emqx_types.erl b/src/emqx_types.erl index d31f37303..960aa699a 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -17,7 +17,7 @@ -include("emqx.hrl"). -export_type([zone/0]). --export_type([startlink_ret/0]). +-export_type([startlink_ret/0, ok_or_error/1]). -export_type([pubsub/0, topic/0, subid/0, subopts/0]). -export_type([client_id/0, username/0, password/0, peername/0, protocol/0]). -export_type([credentials/0, session/0]). @@ -29,6 +29,7 @@ -type(zone() :: atom()). -type(startlink_ret() :: {ok, pid()} | ignore | {error, term()}). +-type(ok_or_error(Reason) :: ok | {error, Reason}). -type(pubsub() :: publish | subscribe). -type(topic() :: binary()). -type(subid() :: binary() | atom()). From 876a983e93f7982b5624bd6c1b303e82f92c3c95 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Thu, 6 Sep 2018 13:37:26 +0800 Subject: [PATCH 06/21] Pub Packet delivered from client to server cannot contain sub id --- src/emqx_packet.erl | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index 715526964..c3eab87e1 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -69,9 +69,9 @@ validate_packet_id(0) -> validate_packet_id(_) -> true. -validate_properties(?SUBSCRIBE, #{'Subscription-Identifier' := I}) - when I =< 0; I >= 16#FFFFFFF -> - error(subscription_identifier_invalid); +validate_properties(?SUBSCRIBE, #{'Subscription-Identifier' := _I}) -> + %% when I =< 0; I >= 16#FFFFFFF -> + error(protocol_error); validate_properties(?PUBLISH, # {'Topic-Alias':= I}) when I =:= 0 -> error(topic_alias_invalid); @@ -236,4 +236,3 @@ format_password(_Password) -> '******'. i(true) -> 1; i(false) -> 0; i(I) when is_integer(I) -> I. - From 9189d4ff41f00f6011fb2cc37eb11bebbb78bdea Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Thu, 6 Sep 2018 14:24:07 +0800 Subject: [PATCH 07/21] catch topic_alias_invalid reasoncode --- src/emqx_protocol.erl | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 898d0a04e..025fe9c93 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -312,9 +312,12 @@ process_packet(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PSt case check_publish(Packet, PState) of {ok, PState1} -> do_publish(Packet, PState1); + {error, ?RC_TOPIC_ALIAS_INVALID} -> + ?LOG(error, "Protocol error - ~p", [?RC_TOPIC_ALIAS_INVALID], PState), + {error, ?RC_TOPIC_ALIAS_INVALID, PState}; {error, ReasonCode} -> ?LOG(warning, "Cannot publish qos0 message to ~s for ~s", [Topic, ReasonCode], PState), - {ok, PState} + {error, ReasonCode, PState} end; process_packet(Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), PState) -> From c145cb89f406d4c481bedf459976fa93f09d9e84 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Thu, 6 Sep 2018 15:45:18 +0800 Subject: [PATCH 08/21] add validate_properties for PUBLISH and fix error for SUB --- src/emqx_packet.erl | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index c3eab87e1..fc90cf492 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -69,12 +69,14 @@ validate_packet_id(0) -> validate_packet_id(_) -> true. -validate_properties(?SUBSCRIBE, #{'Subscription-Identifier' := _I}) -> - %% when I =< 0; I >= 16#FFFFFFF -> - error(protocol_error); -validate_properties(?PUBLISH, # {'Topic-Alias':= I}) +validate_properties(?SUBSCRIBE, #{'Subscription-Identifier' := I}) + when I =< 0; I >= 16#FFFFFFF -> + error(subscription_identifier_invalid); +validate_properties(?PUBLISH, #{'Topic-Alias':= I}) when I =:= 0 -> error(topic_alias_invalid); +validate_properties(?PUBLISH, #{'Subscription-Identifier' := _I}) -> + error(protocol_error); validate_properties(_, _) -> true. From 2a751055808c7b77bdb375e022582b82bfe091a0 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 6 Sep 2018 16:27:16 +0800 Subject: [PATCH 09/21] Improve the Hooks's design --- src/emqx.erl | 65 ++++++++++++++++++-------------- src/emqx_hooks.erl | 38 +++++++++++++------ src/emqx_mod_presence.erl | 24 ++++++------ src/emqx_mod_rewrite.erl | 24 +++++++----- test/emqx_broker_SUITE.erl | 58 +--------------------------- test/emqx_hooks_SUITE.erl | 77 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 166 insertions(+), 120 deletions(-) create mode 100644 test/emqx_hooks_SUITE.erl diff --git a/src/emqx.erl b/src/emqx.erl index 217611171..72f1d6f81 100644 --- a/src/emqx.erl +++ b/src/emqx.erl @@ -29,16 +29,16 @@ -export([get_subopts/2, set_subopts/3]). %% Hooks API --export([hook/4, hook/3, unhook/2, run_hooks/2, run_hooks/3]). +-export([hook/2, hook/3, hook/4, unhook/2, run_hooks/2, run_hooks/3]). %% Shutdown and reboot -export([shutdown/0, shutdown/1, reboot/0]). -define(APP, ?MODULE). -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Bootstrap, is_running... -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% @doc Start emqx application -spec(start() -> {ok, list(atom())} | {error, term()}). @@ -62,9 +62,9 @@ is_running(Node) -> Pid when is_pid(Pid) -> true end. -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% PubSub API -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ -spec(subscribe(emqx_topic:topic() | string()) -> ok). subscribe(Topic) -> @@ -97,9 +97,9 @@ unsubscribe(Topic, SubId) when is_atom(SubId); is_binary(SubId) -> unsubscribe(Topic, SubPid) when is_pid(SubPid) -> emqx_broker:unsubscribe(iolist_to_binary(Topic), SubPid). -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% PubSub management API -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ -spec(get_subopts(emqx_topic:topic() | string(), emqx_types:subscriber()) -> emqx_types:subopts()). @@ -128,36 +128,43 @@ subscribed(Topic, SubPid) when is_pid(SubPid) -> subscribed(Topic, SubId) when is_atom(SubId); is_binary(SubId) -> emqx_broker:subscribed(iolist_to_binary(Topic), SubId). -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Hooks API -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ --spec(hook(atom(), function() | {emqx_hooks:hooktag(), function()}, list(any())) - -> ok | {error, term()}). -hook(Hook, TagFunction, InitArgs) -> - emqx_hooks:add(Hook, TagFunction, InitArgs). +-spec(hook(emqx_hooks:hookpoint(), emqx_hooks:action()) -> ok | {error, already_exists}). +hook(HookPoint, Action) -> + emqx_hooks:add(HookPoint, Action). --spec(hook(atom(), function() | {emqx_hooks:hooktag(), function()}, list(any()), integer()) - -> ok | {error, term()}). -hook(Hook, TagFunction, InitArgs, Priority) -> - emqx_hooks:add(Hook, TagFunction, InitArgs, Priority). +-spec(hook(emqx_hooks:hookpoint(), emqx_hooks:action(), emqx_hooks:filter() | integer()) + -> ok | {error, already_exists}). +hook(HookPoint, Action, Priority) when is_integer(Priority) -> + emqx_hooks:add(HookPoint, Action, Priority); +hook(HookPoint, Action, Filter) when is_function(Filter); is_tuple(Filter) -> + emqx_hooks:add(HookPoint, Action, Filter); +hook(HookPoint, Action, InitArgs) when is_list(InitArgs) -> + emqx_hooks:add(HookPoint, Action, InitArgs). --spec(unhook(atom(), function() | {emqx_hooks:hooktag(), function()}) - -> ok | {error, term()}). -unhook(Hook, TagFunction) -> - emqx_hooks:delete(Hook, TagFunction). +-spec(hook(emqx_hooks:hookpoint(), emqx_hooks:action(), emqx_hooks:filter(), integer()) + -> ok | {error, already_exists}). +hook(HookPoint, Action, Filter, Priority) -> + emqx_hooks:add(HookPoint, Action, Filter, Priority). --spec(run_hooks(atom(), list(any())) -> ok | stop). -run_hooks(Hook, Args) -> - emqx_hooks:run(Hook, Args). +-spec(unhook(emqx_hooks:hookpoint(), emqx_hooks:action()) -> ok). +unhook(HookPoint, Action) -> + emqx_hooks:del(HookPoint, Action). --spec(run_hooks(atom(), list(any()), any()) -> {ok | stop, any()}). -run_hooks(Hook, Args, Acc) -> - emqx_hooks:run(Hook, Args, Acc). +-spec(run_hooks(emqx_hooks:hookpoint(), list(any())) -> ok | stop). +run_hooks(HookPoint, Args) -> + emqx_hooks:run(HookPoint, Args). -%%-------------------------------------------------------------------- +-spec(run_hooks(emqx_hooks:hookpoint(), list(any()), any()) -> {ok | stop, any()}). +run_hooks(HookPoint, Args, Acc) -> + emqx_hooks:run(HookPoint, Args, Acc). + +%%------------------------------------------------------------------------------ %% Shutdown and reboot -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ shutdown() -> shutdown(normal). diff --git a/src/emqx_hooks.erl b/src/emqx_hooks.erl index 64c1dc596..073c12870 100644 --- a/src/emqx_hooks.erl +++ b/src/emqx_hooks.erl @@ -134,16 +134,16 @@ lookup(HookPoint) -> [] -> [] end. -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% gen_server callbacks -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ init([]) -> _ = emqx_tables:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}]), {ok, #{}}. handle_call({add, HookPoint, Callback = #callback{action = Action}}, _From, State) -> - Reply = case lists:keyfind(Action, 2, Callbacks = lookup(HookPoint)) of + Reply = case lists:keymember(Action, 2, Callbacks = lookup(HookPoint)) of true -> {error, already_exists}; false -> @@ -151,18 +151,18 @@ handle_call({add, HookPoint, Callback = #callback{action = Action}}, _From, Stat end, {reply, Reply, State}; -handle_call({del, HookPoint, Action}, _From, State) -> - case lists:keydelete(Action, 2, lookup(HookPoint)) of +handle_call(Req, _From, State) -> + emqx_logger:error("[Hooks] unexpected call: ~p", [Req]), + {reply, ignored, State}. + +handle_cast({del, HookPoint, Action}, State) -> + case del_callback(Action, lookup(HookPoint)) of [] -> ets:delete(?TAB, HookPoint); Callbacks -> insert_hook(HookPoint, Callbacks) end, - {reply, ok, State}; - -handle_call(Req, _From, State) -> - emqx_logger:error("[Hooks] unexpected call: ~p", [Req]), - {reply, ignored, State}. + {noreply, State}; handle_cast(Msg, State) -> emqx_logger:error("[Hooks] unexpected msg: ~p", [Msg]), @@ -178,9 +178,9 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Internal functions -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ insert_hook(HookPoint, Callbacks) -> ets:insert(?TAB, #hook{name = HookPoint, callbacks = Callbacks}), ok. @@ -196,3 +196,17 @@ add_callback(C1 = #callback{priority = P1}, [C2 = #callback{priority = P2}|More] add_callback(C1, More, Acc) -> lists:append(lists:reverse(Acc), [C1 | More]). +del_callback(Action, Callbacks) -> + del_callback(Action, Callbacks, []). + +del_callback(_Action, [], Acc) -> + lists:reverse(Acc); +del_callback(Action, [#callback{action = Action} | Callbacks], Acc) -> + del_callback(Action, Callbacks, Acc); +del_callback(Action = {M, F}, [#callback{action = {M, F, _A}} | Callbacks], Acc) -> + del_callback(Action, Callbacks, Acc); +del_callback(Func, [#callback{action = {Func, _A}} | Callbacks], Acc) -> + del_callback(Func, Callbacks, Acc); +del_callback(Action, [Callback | Callbacks], Acc) -> + del_callback(Action, Callbacks, [Callback | Acc]). + diff --git a/src/emqx_mod_presence.erl b/src/emqx_mod_presence.erl index 812c3267d..59c675f9a 100644 --- a/src/emqx_mod_presence.erl +++ b/src/emqx_mod_presence.erl @@ -19,24 +19,24 @@ -include("emqx.hrl"). -export([load/1, unload/1]). + -export([on_client_connected/4, on_client_disconnected/3]). +-define(ATTR_KEYS, [clean_start, proto_ver, proto_name, keepalive]). + load(Env) -> emqx_hooks:add('client.connected', fun ?MODULE:on_client_connected/4, [Env]), emqx_hooks:add('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]). on_client_connected(#{client_id := ClientId, username := Username, - peername := {IpAddr, _}}, ConnAck, ConnInfo, Env) -> + peername := {IpAddr, _}}, ConnAck, ConnAttrs, Env) -> + Attrs = lists:filter(fun({K, _}) -> lists:member(K, ?ATTR_KEYS) end, ConnAttrs), case emqx_json:safe_encode([{clientid, ClientId}, {username, Username}, {ipaddress, iolist_to_binary(esockd_net:ntoa(IpAddr))}, - {clean_start, proplists:get_value(clean_start, ConnInfo)}, - {proto_ver, proplists:get_value(proto_ver, ConnInfo)}, - {proto_name, proplists:get_value(proto_name, ConnInfo)}, - {keepalive, proplists:get_value(keepalive, ConnInfo)}, {connack, ConnAck}, - {ts, os:system_time(second)}]) of + {ts, os:system_time(second)} | Attrs]) of {ok, Payload} -> emqx:publish(message(qos(Env), topic(connected, ClientId), Payload)); {error, Reason} -> @@ -55,20 +55,20 @@ on_client_disconnected(#{client_id := ClientId, username := Username}, Reason, E end. unload(_Env) -> - emqx_hooks:delete('client.connected', fun ?MODULE:on_client_connected/4), - emqx_hooks:delete('client.disconnected', fun ?MODULE:on_client_disconnected/3). + emqx_hooks:del('client.connected', fun ?MODULE:on_client_connected/4), + emqx_hooks:del('client.disconnected', fun ?MODULE:on_client_disconnected/3). message(QoS, Topic, Payload) -> - Msg = emqx_message:make(?MODULE, QoS, Topic, iolist_to_binary(Payload)), - emqx_message:set_flag(sys, Msg). + emqx_message:set_flag( + sys, emqx_message:make( + ?MODULE, QoS, Topic, iolist_to_binary(Payload))). topic(connected, ClientId) -> emqx_topic:systop(iolist_to_binary(["clients/", ClientId, "/connected"])); topic(disconnected, ClientId) -> emqx_topic:systop(iolist_to_binary(["clients/", ClientId, "/disconnected"])). -qos(Env) -> - proplists:get_value(qos, Env, 0). +qos(Env) -> proplists:get_value(qos, Env, 0). reason(Reason) when is_atom(Reason) -> Reason; reason({Error, _}) when is_atom(Error) -> Error; diff --git a/src/emqx_mod_rewrite.erl b/src/emqx_mod_rewrite.erl index a9ff334ce..29dbb660c 100644 --- a/src/emqx_mod_rewrite.erl +++ b/src/emqx_mod_rewrite.erl @@ -21,11 +21,15 @@ -export([rewrite_subscribe/3, rewrite_unsubscribe/3, rewrite_publish/2]). -load(Rules0) -> - Rules = compile(Rules0), - emqx_hooks:add('client.subscribe', fun ?MODULE:rewrite_subscribe/3, [Rules]), - emqx_hooks:add('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/3, [Rules]), - emqx_hooks:add('message.publish', fun ?MODULE:rewrite_publish/2, [Rules]). +%%------------------------------------------------------------------------------ +%% Load/Unload +%%------------------------------------------------------------------------------ + +load(RawRules) -> + Rules = compile(RawRules), + emqx_hooks:add('client.subscribe', fun ?MODULE:rewrite_subscribe/3, [Rules]), + emqx_hooks:add('client.unsubscribe', fun ?MODULE:rewrite_unsubscribe/3, [Rules]), + emqx_hooks:add('message.publish', fun ?MODULE:rewrite_publish/2, [Rules]). rewrite_subscribe(_Credentials, TopicTable, Rules) -> {ok, [{match_rule(Topic, Rules), Opts} || {Topic, Opts} <- TopicTable]}. @@ -37,13 +41,13 @@ rewrite_publish(Message = #message{topic = Topic}, Rules) -> {ok, Message#message{topic = match_rule(Topic, Rules)}}. unload(_) -> - emqx_hooks:delete('client.subscribe', fun ?MODULE:rewrite_subscribe/3), - emqx_hooks:delete('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/3), - emqx_hooks:delete('message.publish', fun ?MODULE:rewrite_publish/2). + emqx_hooks:del('client.subscribe', fun ?MODULE:rewrite_subscribe/3), + emqx_hooks:del('client.unsubscribe', fun ?MODULE:rewrite_unsubscribe/3), + emqx_hooks:del('message.publish', fun ?MODULE:rewrite_publish/2). -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Internal functions -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ match_rule(Topic, []) -> Topic; diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index e23330f7b..0bd3dc599 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -20,7 +20,6 @@ -define(APP, emqx). -include_lib("eunit/include/eunit.hrl"). - -include_lib("common_test/include/ct.hrl"). -include("emqx.hrl"). @@ -32,7 +31,6 @@ all() -> {group, broker}, {group, metrics}, {group, stats}, - {group, hook}, {group, alarms}]. groups() -> @@ -43,10 +41,8 @@ groups() -> t_shared_subscribe, 'pubsub#', 'pubsub+']}, {session, [sequence], [start_session]}, - {broker, [sequence], [hook_unhook]}, {metrics, [sequence], [inc_dec_metric]}, {stats, [sequence], [set_get_stat]}, - {hook, [sequence], [add_delete_hook, run_hooks]}, {alarms, [sequence], [set_alarms]} ]. @@ -165,8 +161,6 @@ start_session(_) -> %%-------------------------------------------------------------------- %% Broker Group %%-------------------------------------------------------------------- -hook_unhook(_) -> - ok. %%-------------------------------------------------------------------- %% Metric Group @@ -178,61 +172,11 @@ inc_dec_metric(_) -> %%-------------------------------------------------------------------- %% Stats Group %%-------------------------------------------------------------------- + set_get_stat(_) -> emqx_stats:setstat('retained/max', 99), 99 = emqx_stats:getstat('retained/max'). -%%-------------------------------------------------------------------- -%% Hook Test -%%-------------------------------------------------------------------- - -add_delete_hook(_) -> - ok = emqx:hook(test_hook, fun ?MODULE:hook_fun1/1, []), - ok = emqx:hook(test_hook, {tag, fun ?MODULE:hook_fun2/1}, []), - {error, already_hooked} = emqx:hook(test_hook, {tag, fun ?MODULE:hook_fun2/1}, []), - Callbacks = [{callback, undefined, fun ?MODULE:hook_fun1/1, [], 0}, - {callback, tag, fun ?MODULE:hook_fun2/1, [], 0}], - Callbacks = emqx_hooks:lookup(test_hook), - ok = emqx:unhook(test_hook, fun ?MODULE:hook_fun1/1), - ct:print("Callbacks: ~p~n", [emqx_hooks:lookup(test_hook)]), - ok = emqx:unhook(test_hook, {tag, fun ?MODULE:hook_fun2/1}), - {error, not_found} = emqx:unhook(test_hook1, {tag, fun ?MODULE:hook_fun2/1}), - [] = emqx_hooks:lookup(test_hook), - - ok = emqx:hook(emqx_hook, fun ?MODULE:hook_fun1/1, [], 9), - ok = emqx:hook(emqx_hook, {"tag", fun ?MODULE:hook_fun2/1}, [], 8), - Callbacks2 = [{callback, "tag", fun ?MODULE:hook_fun2/1, [], 8}, - {callback, undefined, fun ?MODULE:hook_fun1/1, [], 9}], - Callbacks2 = emqx_hooks:lookup(emqx_hook), - ok = emqx:unhook(emqx_hook, fun ?MODULE:hook_fun1/1), - ok = emqx:unhook(emqx_hook, {"tag", fun ?MODULE:hook_fun2/1}), - [] = emqx_hooks:lookup(emqx_hook). - -run_hooks(_) -> - ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun3/4, [init]), - ok = emqx:hook(foldl_hook, {tag, fun ?MODULE:hook_fun3/4}, [init]), - ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun4/4, [init]), - ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun5/4, [init]), - {stop, [r3, r2]} = emqx:run_hooks(foldl_hook, [arg1, arg2], []), - {ok, []} = emqx:run_hooks(unknown_hook, [], []), - - ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun6/2, [initArg]), - ok = emqx:hook(foreach_hook, {tag, fun ?MODULE:hook_fun6/2}, [initArg]), - ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun7/2, [initArg]), - ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun8/2, [initArg]), - stop = emqx:run_hooks(foreach_hook, [arg]). - -hook_fun1([]) -> ok. -hook_fun2([]) -> {ok, []}. - -hook_fun3(arg1, arg2, _Acc, init) -> ok. -hook_fun4(arg1, arg2, Acc, init) -> {ok, [r2 | Acc]}. -hook_fun5(arg1, arg2, Acc, init) -> {stop, [r3 | Acc]}. - -hook_fun6(arg, initArg) -> ok. -hook_fun7(arg, initArg) -> any. -hook_fun8(arg, initArg) -> stop. - set_alarms(_) -> AlarmTest = #alarm{id = <<"1">>, severity = error, title="alarm title", summary="alarm summary"}, emqx_alarm_mgr:set_alarm(AlarmTest), diff --git a/test/emqx_hooks_SUITE.erl b/test/emqx_hooks_SUITE.erl new file mode 100644 index 000000000..b5b278e31 --- /dev/null +++ b/test/emqx_hooks_SUITE.erl @@ -0,0 +1,77 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_hooks_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +all() -> + [add_delete_hook, run_hooks]. + +add_delete_hook(_) -> + {ok, _} = emqx_hooks:start_link(), + ok = emqx:hook(test_hook, fun ?MODULE:hook_fun1/1, []), + ok = emqx:hook(test_hook, fun ?MODULE:hook_fun2/1, []), + ?assertEqual({error, already_exists}, + emqx:hook(test_hook, fun ?MODULE:hook_fun2/1, [])), + Callbacks = [{callback, {fun ?MODULE:hook_fun1/1, []}, undefined, 0}, + {callback, {fun ?MODULE:hook_fun2/1, []}, undefined, 0}], + ?assertEqual(Callbacks, emqx_hooks:lookup(test_hook)), + ok = emqx:unhook(test_hook, fun ?MODULE:hook_fun1/1), + ok = emqx:unhook(test_hook, fun ?MODULE:hook_fun2/1), + timer:sleep(1000), + ?assertEqual([], emqx_hooks:lookup(test_hook)), + + ok = emqx:hook(emqx_hook, {?MODULE, hook_fun2, []}, 8), + ok = emqx:hook(emqx_hook, {?MODULE, hook_fun1, []}, 9), + Callbacks2 = [{callback, {?MODULE, hook_fun1, []}, undefined, 9}, + {callback, {?MODULE, hook_fun2, []}, undefined, 8}], + ?assertEqual(Callbacks2, emqx_hooks:lookup(emqx_hook)), + ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun1, []}), + ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun2, []}), + timer:sleep(1000), + ?assertEqual([], emqx_hooks:lookup(emqx_hook)), + ok = emqx_hooks:stop(). + +run_hooks(_) -> + {ok, _} = emqx_hooks:start_link(), + ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun3/4, [init]), + ok = emqx:hook(foldl_hook, {?MODULE, hook_fun3, [init]}), + ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun4/4, [init]), + ok = emqx:hook(foldl_hook, fun ?MODULE:hook_fun5/4, [init]), + {stop, [r3, r2]} = emqx:run_hooks(foldl_hook, [arg1, arg2], []), + {ok, []} = emqx:run_hooks(unknown_hook, [], []), + + ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun6/2, [initArg]), + {error, already_exists} = emqx:hook(foreach_hook, fun ?MODULE:hook_fun6/2, [initArg]), + ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun7/2, [initArg]), + ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun8/2, [initArg]), + stop = emqx:run_hooks(foreach_hook, [arg]), + ok = emqx_hooks:stop(). + +hook_fun1([]) -> ok. +hook_fun2([]) -> {ok, []}. + +hook_fun3(arg1, arg2, _Acc, init) -> ok. +hook_fun4(arg1, arg2, Acc, init) -> {ok, [r2 | Acc]}. +hook_fun5(arg1, arg2, Acc, init) -> {stop, [r3 | Acc]}. + +hook_fun6(arg, initArg) -> ok. +hook_fun7(arg, initArg) -> any. +hook_fun8(arg, initArg) -> stop. + From 324cc15dd43da40872e67ccb3d10981107e0977b Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 6 Sep 2018 17:38:01 +0800 Subject: [PATCH 10/21] Update README --- README.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/README.md b/README.md index 41c6ce8b6..b45ac510d 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,4 @@ -# *EMQ X* - MQTT Broker - +# EMQ X Broker *EMQ X* broker is a fully open source, highly scalable, highly available distributed MQTT messaging broker for IoT, M2M and Mobile applications that can handle tens of millions of concurrent clients. From d99d0a22d0370a7179e12acd6c2765d26a0fb925 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 6 Sep 2018 17:54:32 +0800 Subject: [PATCH 11/21] Rename 'ignore' to 'ignored' --- src/emqx_access_control.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 140f82bb6..1b9d76937 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -183,7 +183,7 @@ handle_call(stop, _From, State) -> handle_call(Req, _From, State) -> emqx_logger:error("[AccessControl] unexpected request: ~p", [Req]), - {reply, ignore, State}. + {reply, ignored, State}. handle_cast(Msg, State) -> emqx_logger:error("[AccessControl] unexpected msg: ~p", [Msg]), From edf654727c47409508db4c9bb96fa06713603b25 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 6 Sep 2018 18:09:22 +0800 Subject: [PATCH 12/21] Rename emqx_mqtt_properties module to emqx_mqtt_props --- src/emqx_client.erl | 6 +++--- src/{emqx_mqtt_properties.erl => emqx_mqtt_props.erl} | 2 +- ...tt_properties_SUITE.erl => emqx_mqtt_props_SUITE.erl} | 9 +++++---- 3 files changed, 9 insertions(+), 8 deletions(-) rename src/{emqx_mqtt_properties.erl => emqx_mqtt_props.erl} (99%) rename test/{emqx_mqtt_properties_SUITE.erl => emqx_mqtt_props_SUITE.erl} (73%) diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 192569ca4..85d6ca59d 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -159,7 +159,7 @@ start_link() -> start_link([]). start_link(Options) when is_map(Options) -> start_link(maps:to_list(Options)); start_link(Options) when is_list(Options) -> - ok = emqx_mqtt_properties:validate( + ok = emqx_mqtt_props:validate( proplists:get_value(properties, Options, #{})), case start_client(with_owner(Options)) of {ok, Client} -> @@ -265,7 +265,7 @@ publish(Client, Topic, Payload, Opts) when is_binary(Topic), is_list(Opts) -> -> ok | {ok, packet_id()} | {error, term()}). publish(Client, Topic, Properties, Payload, Opts) when is_binary(Topic), is_map(Properties), is_list(Opts) -> - ok = emqx_mqtt_properties:validate(Properties), + ok = emqx_mqtt_props:validate(Properties), Retain = proplists:get_bool(retain, Opts), QoS = ?QOS_I(proplists:get_value(qos, Opts, ?QOS_0)), publish(Client, #mqtt_msg{qos = QoS, @@ -541,7 +541,7 @@ mqtt_connect(State = #state{client_id = ClientId, will_msg = WillMsg, properties = Properties}) -> ?WILL_MSG(WillQoS, WillRetain, WillTopic, WillProps, WillPayload) = WillMsg, - ConnProps = emqx_mqtt_properties:filter(?CONNECT, Properties), + ConnProps = emqx_mqtt_props:filter(?CONNECT, Properties), send(?CONNECT_PACKET( #mqtt_packet_connect{proto_ver = ProtoVer, proto_name = ProtoName, diff --git a/src/emqx_mqtt_properties.erl b/src/emqx_mqtt_props.erl similarity index 99% rename from src/emqx_mqtt_properties.erl rename to src/emqx_mqtt_props.erl index 643156013..33acb360b 100644 --- a/src/emqx_mqtt_properties.erl +++ b/src/emqx_mqtt_props.erl @@ -13,7 +13,7 @@ %% limitations under the License. %% @doc MQTT5 Properties --module(emqx_mqtt_properties). +-module(emqx_mqtt_props). -include("emqx_mqtt.hrl"). diff --git a/test/emqx_mqtt_properties_SUITE.erl b/test/emqx_mqtt_props_SUITE.erl similarity index 73% rename from test/emqx_mqtt_properties_SUITE.erl rename to test/emqx_mqtt_props_SUITE.erl index a8301d1f4..8d3b16a14 100644 --- a/test/emqx_mqtt_properties_SUITE.erl +++ b/test/emqx_mqtt_props_SUITE.erl @@ -12,7 +12,7 @@ %% See the License for the specific language governing permissions and %% limitations under the License. --module(emqx_mqtt_properties_SUITE). +-module(emqx_mqtt_props_SUITE). -compile(export_all). -compile(nowarn_export_all). @@ -22,6 +22,7 @@ all() -> [t_mqtt_properties_all]. t_mqtt_properties_all(_) -> - Props = emqx_mqtt_properties:filter(?CONNECT, #{'Session-Expiry-Interval' => 1, 'Maximum-Packet-Size' => 255}), - ok = emqx_mqtt_properties:validate(Props), - #{} = emqx_mqtt_properties:filter(?CONNECT, #{'Maximum-QoS' => ?QOS_2}). + Props = emqx_mqtt_props:filter(?CONNECT, #{'Session-Expiry-Interval' => 1, 'Maximum-Packet-Size' => 255}), + ok = emqx_mqtt_props:validate(Props), + #{} = emqx_mqtt_props:filter(?CONNECT, #{'Maximum-QoS' => ?QOS_2}). + From 7c688a483949a217b0587cb0b3780f8393c5e6df Mon Sep 17 00:00:00 2001 From: HuangDan Date: Thu, 6 Sep 2018 18:09:58 +0800 Subject: [PATCH 13/21] Add test case for mqtt5 connect packet --- test/emqx_SUITE.erl | 63 +++++------------ test/emqx_mqtt_packet_SUITE.erl | 117 ++++++++++++++++++++++++++++++++ 2 files changed, 133 insertions(+), 47 deletions(-) create mode 100644 test/emqx_mqtt_packet_SUITE.erl diff --git a/test/emqx_SUITE.erl b/test/emqx_SUITE.erl index a56af1a8a..a08305a30 100644 --- a/test/emqx_SUITE.erl +++ b/test/emqx_SUITE.erl @@ -17,8 +17,6 @@ -compile(export_all). -compile(nowarn_export_all). --include("emqx_mqtt.hrl"). - -define(APP, emqx). -include_lib("eunit/include/eunit.hrl"). @@ -52,9 +50,7 @@ -define(PUBPACKET, ?PUBLISH_PACKET(?PUBQOS, <<"sub/topic">>, ?PACKETID, <<"publish">>)). all() -> - [{group, connect}%, - % {group, cleanSession} - ]. + [{group, connect}]. groups() -> [{connect, [non_parallel_tests], @@ -64,11 +60,7 @@ groups() -> mqtt_connect_with_ssl_oneway, mqtt_connect_with_ssl_twoway, mqtt_connect_with_ws - ]}, - {cleanSession, [sequence], - [cleanSession_validate] - } - ]. + ]}]. init_per_suite(Config) -> emqx_ct_broker_helpers:run_setup_steps(), @@ -109,15 +101,17 @@ mqtt_connect_with_ssl_oneway(_) -> emqx_ct_broker_helpers:change_opts(ssl_oneway), emqx:start(), ClientSsl = emqx_ct_broker_helpers:client_ssl(), - {ok, #ssl_socket{tcp = Sock, ssl = SslSock}} + {ok, #ssl_socket{tcp = _Sock1, ssl = SslSock} = Sock} = emqx_client_sock:connect("127.0.0.1", 8883, [{ssl_opts, ClientSsl}], 3000), -%% Packet = raw_send_serialise(?CLIENT), -%% ssl:send(SslSock, Packet), -%% receive Data -> -%% ct:log("Data:~p~n", [Data]) -%% after 30000 -> -%% ok -%% end, + Packet = raw_send_serialise(?CLIENT), + emqx_client_sock:setopts(Sock, [{active, once}]), + emqx_client_sock:send(Sock, Packet), + ?assert( + receive {ssl, _, ConAck}-> + {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(ConAck), true + after 1000 -> + false + end), ssl:close(SslSock). mqtt_connect_with_ssl_twoway(_Config) -> @@ -131,11 +125,12 @@ mqtt_connect_with_ssl_twoway(_Config) -> emqx_client_sock:setopts(Sock, [{active, once}]), emqx_client_sock:send(Sock, Packet), timer:sleep(500), + ?assert( receive {ssl, _, Data}-> - {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(Data) + {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(Data), true after 1000 -> - ok - end, + false + end), emqx_client_sock:close(Sock). mqtt_connect_with_ws(_Config) -> @@ -162,32 +157,6 @@ mqtt_connect_with_ws(_Config) -> {close, _} = rfc6455_client:close(WS), ok. -cleanSession_validate(_) -> - {ok, C1} = emqttc:start_link([{host, "localhost"}, - {port, 1883}, - {client_id, <<"c1">>}, - {clean_sess, false}]), - timer:sleep(10), - emqttc:subscribe(C1, <<"topic">>, qos0), - emqttc:disconnect(C1), - {ok, Pub} = emqttc:start_link([{host, "localhost"}, - {port, 1883}, - {client_id, <<"pub">>}]), - - emqttc:publish(Pub, <<"topic">>, <<"m1">>, [{qos, 0}]), - timer:sleep(10), - {ok, C11} = emqttc:start_link([{host, "localhost"}, - {port, 1883}, - {client_id, <<"c1">>}, - {clean_sess, false}]), - timer:sleep(100), - receive {publish, _Topic, M1} -> - ?assertEqual(<<"m1">>, M1) - after 1000 -> false - end, - emqttc:disconnect(Pub), - emqttc:disconnect(C11). - raw_send_serialise(Packet) -> emqx_frame:serialize(Packet). diff --git a/test/emqx_mqtt_packet_SUITE.erl b/test/emqx_mqtt_packet_SUITE.erl new file mode 100644 index 000000000..8bc41cb37 --- /dev/null +++ b/test/emqx_mqtt_packet_SUITE.erl @@ -0,0 +1,117 @@ +%%%=================================================================== +%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. +%%% +%%% Licensed under the Apache License, Version 2.0 (the "License"); +%%% you may not use this file except in compliance with the License. +%%% You may obtain a copy of the License at +%%% +%%% http://www.apache.org/licenses/LICENSE-2.0 +%%% +%%% Unless required by applicable law or agreed to in writing, software +%%% distributed under the License is distributed on an "AS IS" BASIS, +%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%%% See the License for the specific language governing permissions and +%%% limitations under the License. +%%%=================================================================== + +-module(emqx_mqtt_packet_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-import(emqx_frame, [serialize/1]). + +-include("emqx_mqtt.hrl"). + +-include_lib("eunit/include/eunit.hrl"). + +-define(INVALID_RESERVED, 1). + +-define(CONNECT_INVALID_PACKET(Var), + #mqtt_packet{header = #mqtt_packet_header{type = ?INVALID_RESERVED}, + variable = Var}). + +-define(CASE1_PROTOCOL_NAME, ?CONNECT_PACKET(#mqtt_packet_connect{ + proto_name = <<"MQTC">>, + client_id = <<"mqtt_protocol_name">>, + username = <<"admin">>, + password = <<"public">>})). + +-define(CASE2_PROTOCAL_VER, ?CONNECT_PACKET(#mqtt_packet_connect{ + client_id = <<"mqtt_client">>, + proto_ver = 6, + username = <<"admin">>, + password = <<"public">>})). + +-define(CASE3_PROTOCAL_INVALID_RESERVED, ?CONNECT_INVALID_PACKET(#mqtt_packet_connect{ + client_id = <<"mqtt_client">>, + proto_ver = 5, + username = <<"admin">>, + password = <<"public">>})). + +-define(PROTOCOL5, ?CONNECT_PACKET(#mqtt_packet_connect{ + proto_ver = 5, + keepalive = 60, + properties = #{'Message-Expiry-Interval' => 3600}, + client_id = <<"mqtt_client">>, + will_topic = <<"will_tipic">>, + will_payload = <<"will message">>, + username = <<"admin">>, + password = <<"public">>})). + + + +all() -> [{group, connect}]. + +groups() -> [{connect, [sequence], + [case1_protocol_name, + case2_protocol_ver%, + %TOTO case3_invalid_reserved + ]}]. + +init_per_suite(Config) -> + emqx_ct_broker_helpers:run_setup_steps(), + Config. + +end_per_suite(_Config) -> + emqx_ct_broker_helpers:run_teardown_steps(). + +init_per_group(_Group, Config) -> + Config. + +end_per_group(_Group, _Config) -> + ok. + +case1_protocol_name(_) -> + {ok, Sock} = emqx_client_sock:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}], 3000), + MqttPacket = serialize(?CASE1_PROTOCOL_NAME), + emqx_client_sock:send(Sock, MqttPacket), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, ?CONNACK_PACKET(?CONNACK_PROTO_VER), _} = raw_recv_pase(Data), + Disconnect = gen_tcp:recv(Sock, 0), + ?assertEqual({error, closed}, Disconnect). + +case2_protocol_ver(_) -> + {ok, Sock} = emqx_client_sock:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}], 3000), + Packet = serialize(?CASE2_PROTOCAL_VER), + emqx_client_sock:send(Sock, Packet), + {ok, Data} = gen_tcp:recv(Sock, 0), + %% case1 Unacceptable protocol version + {ok, ?CONNACK_PACKET(?CONNACK_PROTO_VER), _} = raw_recv_pase(Data), + Disconnect = gen_tcp:recv(Sock, 0), + ?assertEqual({error, closed}, Disconnect). + +case3_invalid_reserved(_) -> + {ok, Sock} = emqx_client_sock:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}], 3000), + Packet = serialize(?CASE3_PROTOCAL_INVALID_RESERVED), + emqx_client_sock:send(Sock, Packet), + {ok, Data} = gen_tcp:recv(Sock, 0), + %% case1 Unacceptable protocol version + ct:log("Data:~p~n", [raw_recv_pase(Data)]), + {ok, ?CONNACK_PACKET(?CONNACK_PROTO_VER), _} = raw_recv_pase(Data), + Disconnect = gen_tcp:recv(Sock, 0), + ?assertEqual({error, closed}, Disconnect). + +raw_recv_pase(P) -> + emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE, + version => ?MQTT_PROTO_V4} }). From 5774ba542c67b7be0ebd086858ff3122ecf987a0 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 6 Sep 2018 18:10:01 +0800 Subject: [PATCH 14/21] Rename the emqx_mqtt_properties SUITE to emqx_mqtt_props --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 7202915cc..e12a516b1 100644 --- a/Makefile +++ b/Makefile @@ -37,7 +37,7 @@ EUNIT_OPTS = verbose CT_SUITES = emqx emqx_banned emqx_connection emqx_session emqx_access emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight \ emqx_json emqx_keepalive emqx_lib emqx_metrics emqx_misc emqx_mod emqx_mqtt_caps \ - emqx_mqtt_compat emqx_mqtt_properties emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ + emqx_mqtt_compat emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_zone \ emqx_mountpoint emqx_listeners emqx_protocol From 328d035dab7240d5a305a5024e4a4cfd2495c9e2 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 6 Sep 2018 18:43:47 +0800 Subject: [PATCH 15/21] Replace 'state' record with map --- src/emqx_pool.erl | 18 ++++++++---------- src/emqx_pool_sup.erl | 8 ++++++-- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/src/emqx_pool.erl b/src/emqx_pool.erl index 8d927ddd9..276352797 100644 --- a/src/emqx_pool.erl +++ b/src/emqx_pool.erl @@ -23,16 +23,14 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {pool, id}). - -define(POOL, ?MODULE). %% @doc Start pooler supervisor. start_link() -> emqx_pool_sup:start_link(?POOL, random, {?MODULE, start_link, []}). -%% @doc Start pool --spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, term()}). +%% @doc Start pool. +-spec(start_link(atom(), pos_integer()) -> emqx_types:startlink_ret()). start_link(Pool, Id) -> gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)}, ?MODULE, [Pool, Id], []). @@ -49,13 +47,13 @@ async_submit(Fun) -> worker() -> gproc_pool:pick_worker(pool). -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% gen_server callbacks -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ init([Pool, Id]) -> true = gproc_pool:connect_worker(Pool, {Pool, Id}), - {ok, #state{pool = Pool, id = Id}}. + {ok, #{pool => Pool, id => Id}}. handle_call({submit, Fun}, _From, State) -> {reply, catch run(Fun), State}; @@ -79,15 +77,15 @@ handle_info(Info, State) -> emqx_logger:error("[Pool] unexpected info: ~p", [Info]), {noreply, State}. -terminate(_Reason, #state{pool = Pool, id = Id}) -> +terminate(_Reason, #{pool := Pool, id := Id}) -> true = gproc_pool:disconnect_worker(Pool, {Pool, Id}). code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Internal functions -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ run({M, F, A}) -> erlang:apply(M, F, A); diff --git a/src/emqx_pool_sup.erl b/src/emqx_pool_sup.erl index b71c15f1e..b371549c0 100644 --- a/src/emqx_pool_sup.erl +++ b/src/emqx_pool_sup.erl @@ -26,8 +26,12 @@ spec(Args) -> -spec(spec(any(), list()) -> supervisor:child_spec()). spec(ChildId, Args) -> - {ChildId, {?MODULE, start_link, Args}, - transient, infinity, supervisor, [?MODULE]}. + #{id => ChildId, + start => {?MODULE, start_link, Args}, + restart => transient, + shutdown => infinity, + type => supervisor, + modules => [?MODULE]}. -spec(start_link(atom() | tuple(), atom(), mfa()) -> {ok, pid()} | {error, term()}). start_link(Pool, Type, MFA) -> From 765ab5ad7b78450adae749266f24d6a50b50ecaa Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Thu, 6 Sep 2018 19:09:29 +0800 Subject: [PATCH 16/21] Add condition to handle when mqx_topic_alias do not exist --- src/emqx_mqtt_caps.erl | 2 ++ src/emqx_protocol.erl | 60 +++++++++++++++++------------------ test/emqx_mqtt_caps_SUITE.erl | 32 ++++++++++++------- 3 files changed, 52 insertions(+), 42 deletions(-) diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index baec9920c..b8b7a5b3a 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -69,6 +69,8 @@ do_check_pub(Props = #{ topic_alias := TopicAlias}, [{max_topic_alias, MaxTopicA end; do_check_pub(#{retain := true}, [{mqtt_retain_available, false}|_Caps]) -> {error, ?RC_RETAIN_NOT_SUPPORTED}; +do_check_pub(Props, [{max_topic_alias, _} | Caps]) -> + do_check_pub(Props, Caps); do_check_pub(Props, [{mqtt_retain_available, _}|Caps]) -> do_check_pub(Props, Caps). diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 025fe9c93..38f55d204 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -33,36 +33,36 @@ -export([shutdown/2]). -record(pstate, { - zone, - sendfun, - peername, - peercert, - proto_ver, - proto_name, - ackprops, - client_id, - is_assigned, - conn_pid, - conn_props, - ack_props, - username, - session, - clean_start, - topic_aliases, - packet_size, - will_topic, - will_msg, - keepalive, - mountpoint, - is_super, - is_bridge, - enable_ban, - enable_acl, - recv_stats, - send_stats, - connected, - connected_at - }). + zone, + sendfun, + peername, + peercert, + proto_ver, + proto_name, + ackprops, + client_id, + is_assigned, + conn_pid, + conn_props, + ack_props, + username, + session, + clean_start, + topic_aliases, + packet_size, + will_topic, + will_msg, + keepalive, + mountpoint, + is_super, + is_bridge, + enable_ban, + enable_acl, + recv_stats, + send_stats, + connected, + connected_at + }). -type(state() :: #pstate{}). -export_type([state/0]). diff --git a/test/emqx_mqtt_caps_SUITE.erl b/test/emqx_mqtt_caps_SUITE.erl index 8b840b91c..1d4c81b8d 100644 --- a/test/emqx_mqtt_caps_SUITE.erl +++ b/test/emqx_mqtt_caps_SUITE.erl @@ -38,7 +38,7 @@ t_get_set_caps(_) -> mqtt_wildcard_subscription => true }, Caps2 = Caps#{max_packet_size => 1048576}, - case emqx_mqtt_caps:get_caps(zone) of + case emqx_mqtt_caps:get_caps(zone) of Caps -> ok; Caps2 -> ok end, @@ -63,20 +63,28 @@ t_check_pub(_) -> {ok, _} = emqx_zone:start_link(), PubCaps = #{ max_qos_allowed => ?QOS_1, - mqtt_retain_available => false + mqtt_retain_available => false, + max_topic_alias => 4 }, emqx_zone:set_env(zone, '$mqtt_pub_caps', PubCaps), timer:sleep(100), + ct:log("~p", [emqx_mqtt_caps:get_caps(zone, publish)]), BadPubProps1 = #{ qos => ?QOS_2, retain => false - }, + }, {error, ?RC_QOS_NOT_SUPPORTED} = emqx_mqtt_caps:check_pub(zone, BadPubProps1), BadPubProps2 = #{ qos => ?QOS_1, retain => true - }, + }, {error, ?RC_RETAIN_NOT_SUPPORTED} = emqx_mqtt_caps:check_pub(zone, BadPubProps2), + BadPubProps3 = #{ + qos => ?QOS_1, + retain => false, + topic_alias => 5 + }, + {error, ?RC_TOPIC_ALIAS_INVALID} = emqx_mqtt_caps:check_pub(zone, BadPubProps3), PubProps = #{ qos => ?QOS_1, retain => false @@ -94,18 +102,18 @@ t_check_sub(_) -> mqtt_wildcard_subscription => true }, - ok = do_check_sub([{<<"client/stat">>, Opts}], [{<<"client/stat">>, Opts}]), + ok = do_check_sub([{<<"client/stat">>, Opts}], [{<<"client/stat">>, Opts}]), ok = do_check_sub(Caps#{max_qos_allowed => ?QOS_1}, [{<<"client/stat">>, Opts}], [{<<"client/stat">>, Opts#{qos => ?QOS_1}}]), - ok = do_check_sub(Caps#{max_topic_levels => 1}, - [{<<"client/stat">>, Opts}], + ok = do_check_sub(Caps#{max_topic_levels => 1}, + [{<<"client/stat">>, Opts}], [{<<"client/stat">>, Opts#{rc => ?RC_TOPIC_FILTER_INVALID}}]), - ok = do_check_sub(Caps#{mqtt_shared_subscription => false}, - [{<<"client/stat">>, Opts}], + ok = do_check_sub(Caps#{mqtt_shared_subscription => false}, + [{<<"client/stat">>, Opts}], [{<<"client/stat">>, Opts#{rc => ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}}]), - ok = do_check_sub(Caps#{mqtt_wildcard_subscription => false}, - [{<<"vlient/+/dsofi">>, Opts}], + ok = do_check_sub(Caps#{mqtt_wildcard_subscription => false}, + [{<<"vlient/+/dsofi">>, Opts}], [{<<"vlient/+/dsofi">>, Opts#{rc => ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}}]). - + From d9ad29476a60873c7cb74e153b12f55fd5c99aaa Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 7 Sep 2018 10:23:43 +0800 Subject: [PATCH 17/21] Code Review: Update the zone module 1. Add force_reload/1 API 2. Change the default reload interval to 5 minutes --- src/emqx_zone.erl | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/src/emqx_zone.erl b/src/emqx_zone.erl index 209f0323c..8344ba150 100644 --- a/src/emqx_zone.erl +++ b/src/emqx_zone.erl @@ -26,10 +26,9 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {timer}). - -define(TAB, ?MODULE). +-spec(start_link() -> emqx_types:startlink_ret()). start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). @@ -58,7 +57,7 @@ set_env(Zone, Key, Val) -> init([]) -> _ = emqx_tables:new(?TAB, [set, {read_concurrency, true}]), - {ok, element(2, handle_info(reload, #state{}))}. + {ok, element(2, handle_info(reload, #{timer => undefined}))}. handle_call(Req, _From, State) -> emqx_logger:error("[Zone] unexpected call: ~p", [Req]), @@ -72,11 +71,9 @@ handle_cast(Msg, State) -> emqx_logger:error("[Zone] unexpected cast: ~p", [Msg]), {noreply, State}. -handle_info(reload, State) -> - lists:foreach( - fun({Zone, Opts}) -> - [ets:insert(?TAB, {{Zone, Key}, Val}) || {Key, Val} <- Opts] - end, emqx_config:get_env(zones, [])), +handle_info({timeout, TRef, reload}, State = #{timer := TRef}) -> + [ets:insert(?TAB, [{{Zone, Key}, Val} || {Key, Val} <- Opts]) + || {Zone, Opts} <- emqx_config:get_env(zones, [])], {noreply, ensure_reload_timer(State), hibernate}; handle_info(Info, State) -> @@ -94,5 +91,5 @@ code_change(_OldVsn, State, _Extra) -> %%------------------------------------------------------------------------------ ensure_reload_timer(State) -> - State#state{timer = erlang:send_after(10000, self(), reload)}. + State#{timer := emqx_misc:start_timer(timer:minutes(5), reload)}. From 304a24ca6a0a8f0d2c6b6e035d69e1d5e2c6e7f3 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 7 Sep 2018 10:26:14 +0800 Subject: [PATCH 18/21] Code Review: Update the zone module 1. Add force_reload/0 management API 2. Change the reload interval to 5 minutes --- src/emqx_zone.erl | 37 +++++++++++++++++++++++++++++-------- test/emqx_zone_SUITE.erl | 14 ++++++++++---- 2 files changed, 39 insertions(+), 12 deletions(-) diff --git a/src/emqx_zone.erl b/src/emqx_zone.erl index 8344ba150..dd183dbdf 100644 --- a/src/emqx_zone.erl +++ b/src/emqx_zone.erl @@ -21,16 +21,20 @@ -export([start_link/0]). -export([get_env/2, get_env/3]). -export([set_env/3]). +-export([force_reload/0]). +%% for test +-export([stop/0]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -define(TAB, ?MODULE). +-define(SERVER, ?MODULE). -spec(start_link() -> emqx_types:startlink_ret()). start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). -spec(get_env(emqx_types:zone() | undefined, atom()) -> undefined | term()). get_env(undefined, Key) -> @@ -49,7 +53,15 @@ get_env(Zone, Key, Def) -> -spec(set_env(emqx_types:zone(), atom(), term()) -> ok). set_env(Zone, Key, Val) -> - gen_server:cast(?MODULE, {set_env, Zone, Key, Val}). + gen_server:cast(?SERVER, {set_env, Zone, Key, Val}). + +-spec(force_reload() -> ok). +force_reload() -> + gen_server:call(?SERVER, force_reload). + +-spec(stop() -> ok). +stop() -> + gen_server:stop(?SERVER, normal, infinity). %%------------------------------------------------------------------------------ %% gen_server callbacks @@ -59,6 +71,10 @@ init([]) -> _ = emqx_tables:new(?TAB, [set, {read_concurrency, true}]), {ok, element(2, handle_info(reload, #{timer => undefined}))}. +handle_call(force_reload, _From, State) -> + _ = do_reload(), + {reply, ok, State}; + handle_call(Req, _From, State) -> emqx_logger:error("[Zone] unexpected call: ~p", [Req]), {reply, ignored, State}. @@ -71,10 +87,9 @@ handle_cast(Msg, State) -> emqx_logger:error("[Zone] unexpected cast: ~p", [Msg]), {noreply, State}. -handle_info({timeout, TRef, reload}, State = #{timer := TRef}) -> - [ets:insert(?TAB, [{{Zone, Key}, Val} || {Key, Val} <- Opts]) - || {Zone, Opts} <- emqx_config:get_env(zones, [])], - {noreply, ensure_reload_timer(State), hibernate}; +handle_info(reload, State) -> + _ = do_reload(), + {noreply, ensure_reload_timer(State#{timer := undefined}), hibernate}; handle_info(Info, State) -> emqx_logger:error("[Zone] unexpected info: ~p", [Info]), @@ -90,6 +105,12 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%------------------------------------------------------------------------------ -ensure_reload_timer(State) -> - State#{timer := emqx_misc:start_timer(timer:minutes(5), reload)}. +do_reload() -> + [ets:insert(?TAB, [{{Zone, Key}, Val} || {Key, Val} <- Opts]) + || {Zone, Opts} <- emqx_config:get_env(zones, [])]. + +ensure_reload_timer(State = #{timer := undefined}) -> + State#{timer := erlang:send_after(timer:minutes(5), self(), reload)}; +ensure_reload_timer(State) -> + State. diff --git a/test/emqx_zone_SUITE.erl b/test/emqx_zone_SUITE.erl index 282acc3e5..83c2ceaab 100644 --- a/test/emqx_zone_SUITE.erl +++ b/test/emqx_zone_SUITE.erl @@ -18,15 +18,21 @@ -compile(nowarn_export_all). -include("emqx_mqtt.hrl"). +-include_lib("eunit/include/eunit.hrl"). all() -> [t_set_get_env]. t_set_get_env(_) -> - emqx_zone:start_link(), - ok = emqx_zone:set_env(china, language, chinese), - timer:sleep(100), % make sure set_env/3 is okay + application:set_env(emqx, zones, [{china, [{language, chinese}]}]), + {ok, _} = emqx_zone:start_link(), + ct:print("~p~n", [ets:tab2list(emqx_zone)]), chinese = emqx_zone:get_env(china, language), cn470 = emqx_zone:get_env(china, ism_band, cn470), undefined = emqx_zone:get_env(undefined, delay), - 500 = emqx_zone:get_env(undefined, delay, 500). + 500 = emqx_zone:get_env(undefined, delay, 500), + application:set_env(emqx, zones, [{zone1, [{key, val}]}]), + ?assertEqual(undefined, emqx_zone:get_env(zone1, key)), + emqx_zone:force_reload(), + ?assertEqual(val, emqx_zone:get_env(zone1, key)), + emqx_zone:stop(). From 3a94d7ddaec62cec1c019342f86eb3c0f1c0deed Mon Sep 17 00:00:00 2001 From: spring2maz Date: Thu, 6 Sep 2018 22:41:17 +0200 Subject: [PATCH 19/21] Generate a config file for testing Prior to this change, the template file etc/emqx.conf is used directly in testing, as a result, mustache style directories are created e.g. `{{ platform_log_dir }}` which should have been replaced with a config varialbe e.g. `log` In this change, Makefile targets are added as `ct` dependency to download bbmustach, load the template input, replace with variableds defined in 'vars' file, finally to etc/gen.emqx.conf. The direct usage of etc/emqx.conf in test code are replaced with gen.emqx.conf --- .gitignore | 2 ++ Makefile | 27 +++++++++++++++++++++++++-- test/emqx_ct_broker_helpers.erl | 2 +- test/emqx_listeners_SUITE.erl | 2 +- vars | 9 +++++++++ 5 files changed, 38 insertions(+), 4 deletions(-) create mode 100644 vars diff --git a/.gitignore b/.gitignore index d1b8a289e..0322aaddc 100644 --- a/.gitignore +++ b/.gitignore @@ -32,3 +32,5 @@ rebar3.crashdump .DS_Store rebar.config emqx.iml +bbmustache/ +etc/gen.emqx.conf diff --git a/Makefile b/Makefile index e12a516b1..452d7c57d 100644 --- a/Makefile +++ b/Makefile @@ -51,6 +51,29 @@ DIALYZER_OPTS := --verbose --statistics -Werror_handling -Wrace_conditions #-Wun include erlang.mk -app.config:: - ./deps/cuttlefish/cuttlefish -l info -e etc/ -c etc/emqx.conf -i priv/emqx.schema -d data/ +clean:: gen-clean + +.PHONY: gen-clean +gen-clean: + @rm -rf bbmustache + @rm -f etc/gen.emqx.conf + +bbmustache: + $(verbose) git clone https://github.com/soranoba/bbmustache.git && pushd bbmustache && ./rebar3 compile && popd + +# This hack is to generate a conf file for testing +# relx overlay is used for release +etc/gen.emqx.conf: bbmustache etc/emqx.conf + $(verbose) erl -noshell -pa bbmustache/_build/default/lib/bbmustache/ebin -eval \ + "{ok, Temp} = file:read_file('etc/emqx.conf'), \ + {ok, Vars0} = file:consult('vars'), \ + Vars = [{atom_to_list(N), list_to_binary(V)} || {N, V} <- Vars0], \ + Targ = bbmustache:render(Temp, Vars), \ + ok = file:write_file('etc/gen.emqx.conf', Targ), \ + halt(0)." + +app.config: etc/gen.emqx.conf + $(verbose) ./deps/cuttlefish/cuttlefish -l info -e etc/ -c etc/gen.emqx.conf -i priv/emqx.schema -d data/ + +ct: app.config diff --git a/test/emqx_ct_broker_helpers.erl b/test/emqx_ct_broker_helpers.erl index ba1883ecc..62a91df54 100644 --- a/test/emqx_ct_broker_helpers.erl +++ b/test/emqx_ct_broker_helpers.erl @@ -63,7 +63,7 @@ run_teardown_steps() -> generate_config() -> Schema = cuttlefish_schema:files([local_path(["priv", "emqx.schema"])]), - Conf = conf_parse:file([local_path(["etc", "emqx.conf"])]), + Conf = conf_parse:file([local_path(["etc", "gen.emqx.conf"])]), cuttlefish_generator:map(Schema, Conf). get_base_dir(Module) -> diff --git a/test/emqx_listeners_SUITE.erl b/test/emqx_listeners_SUITE.erl index 6086e98c2..9d85583ab 100644 --- a/test/emqx_listeners_SUITE.erl +++ b/test/emqx_listeners_SUITE.erl @@ -49,7 +49,7 @@ restart_listeners(_) -> generate_config() -> Schema = cuttlefish_schema:files([local_path(["priv", "emqx.schema"])]), - Conf = conf_parse:file([local_path(["etc", "emqx.conf"])]), + Conf = conf_parse:file([local_path(["etc", "gen.emqx.conf"])]), cuttlefish_generator:map(Schema, Conf). set_app_env({App, Lists}) -> diff --git a/vars b/vars new file mode 100644 index 000000000..fedd69a45 --- /dev/null +++ b/vars @@ -0,0 +1,9 @@ +%% vars here are for test only, not intended for release + +{platform_bin_dir, "bin"}. +{platform_data_dir, "data"}. +{platform_etc_dir, "etc"}. +{platform_lib_dir, "lib"}. +{platform_log_dir, "log"}. +{platform_plugins_dir, "plugins"}. + From dd8513ad35bd24a02f37eac2c2103322b52eb25b Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Fri, 7 Sep 2018 14:10:16 +0800 Subject: [PATCH 20/21] Update for banned API Use `mnesia:foldl` to traverse mnesia rather than `mnesia:first` and `mnesia:next`, as a badarg exception would occur if the record was deleted while travering the whole table. --- include/emqx.hrl | 15 +++++++++------ src/emqx_banned.erl | 21 ++++++--------------- test/emqx_banned_SUITE.erl | 10 +++++----- 3 files changed, 20 insertions(+), 26 deletions(-) diff --git a/include/emqx.hrl b/include/emqx.hrl index 34e41b0a1..bca6fe519 100644 --- a/include/emqx.hrl +++ b/include/emqx.hrl @@ -148,13 +148,16 @@ %%-------------------------------------------------------------------- %% Banned %%-------------------------------------------------------------------- +-type(banned_who() :: {client_id, binary()} + | {username, binary()} + | {ip_address, inet:ip_address()}). -record(banned, { - key, - reason, - by, - desc, - until}). + who :: banned_who(), + reason :: binary(), + by :: binary(), + desc :: binary(), + until :: integer() + }). -endif. - diff --git a/src/emqx_banned.erl b/src/emqx_banned.erl index 444f07dad..8f1c3156f 100644 --- a/src/emqx_banned.erl +++ b/src/emqx_banned.erl @@ -85,7 +85,7 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) -> - mnesia:async_dirty(fun expire_banned_items/1, [erlang:timestamp()]), + mnesia:async_dirty(fun expire_banned_items/1, [erlang:system_time(second)]), {noreply, ensure_expiry_timer(State), hibernate}; handle_info(Info, State) -> @@ -106,17 +106,8 @@ ensure_expiry_timer(State) -> State#{expiry_timer := emqx_misc:start_timer(timer:minutes(5), expire)}. expire_banned_items(Now) -> - expire_banned_item(mnesia:first(?TAB), Now). - -expire_banned_item('$end_of_table', _Now) -> - ok; -expire_banned_item(Key, Now) -> - case mnesia:read(?TAB, Key) of - [#banned{until = undefined}] -> - ok; - [B = #banned{until = Until}] when Until < Now -> - mnesia:delete_object(?TAB, B, sticky_write); - _ -> ok - end, - expire_banned_item(mnesia:next(?TAB, Key), Now). - + mnesia:foldl(fun + (B = #banned{until = Until}, _Acc) when Until < Now -> + mnesia:delete_object(?TAB, B, sticky_write); + (_, _Acc) -> ok + end, ok, ?TAB). diff --git a/test/emqx_banned_SUITE.erl b/test/emqx_banned_SUITE.erl index 9fae880d4..c8eab87b6 100644 --- a/test/emqx_banned_SUITE.erl +++ b/test/emqx_banned_SUITE.erl @@ -28,14 +28,14 @@ all() -> [t_banned_all]. t_banned_all(_) -> emqx_ct_broker_helpers:run_setup_steps(), emqx_banned:start_link(), - {MegaSecs, Secs, MicroSecs} = erlang:timestamp(), - ok = emqx_banned:add(#banned{key = {client_id, <<"TestClient">>}, + TimeNow = erlang:system_time(second), + ok = emqx_banned:add(#banned{who = {client_id, <<"TestClient">>}, reason = <<"test">>, by = <<"banned suite">>, - desc = <<"test">>, - until = {MegaSecs, Secs + 10, MicroSecs}}), + desc = <<"test">>, + until = TimeNow + 10}), % here is not expire banned test because its check interval is greater than 5 mins, but its effect has been confirmed timer:sleep(100), ?assert(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})), emqx_banned:del({client_id, <<"TestClient">>}), - ?assertNot(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})). \ No newline at end of file + ?assertNot(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})). From 1c5615c957573f84e1813c5dc1ae0862f1cc1ed0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Fri, 7 Sep 2018 17:22:24 +0800 Subject: [PATCH 21/21] Stop emqx_zone when emqx_mqtt_caps test over --- test/emqx_mqtt_caps_SUITE.erl | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/test/emqx_mqtt_caps_SUITE.erl b/test/emqx_mqtt_caps_SUITE.erl index 8b840b91c..85f6fae1d 100644 --- a/test/emqx_mqtt_caps_SUITE.erl +++ b/test/emqx_mqtt_caps_SUITE.erl @@ -57,7 +57,8 @@ t_get_set_caps(_) -> mqtt_shared_subscription => true, mqtt_wildcard_subscription => true }, - SubCaps = emqx_mqtt_caps:get_caps(zone, subscribe). + SubCaps = emqx_mqtt_caps:get_caps(zone, subscribe), + emqx_zone:stop(). t_check_pub(_) -> {ok, _} = emqx_zone:start_link(), @@ -81,7 +82,8 @@ t_check_pub(_) -> qos => ?QOS_1, retain => false }, - ok = emqx_mqtt_caps:check_pub(zone, PubProps). + ok = emqx_mqtt_caps:check_pub(zone, PubProps), + emqx_zone:stop(). t_check_sub(_) -> {ok, _} = emqx_zone:start_link(), @@ -104,7 +106,8 @@ t_check_sub(_) -> [{<<"client/stat">>, Opts#{rc => ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}}]), ok = do_check_sub(Caps#{mqtt_wildcard_subscription => false}, [{<<"vlient/+/dsofi">>, Opts}], - [{<<"vlient/+/dsofi">>, Opts#{rc => ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}}]). + [{<<"vlient/+/dsofi">>, Opts#{rc => ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}}]), + emqx_zone:stop().