From b2c3d8366da17b8829516a7580519136090ac46b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Mon, 3 Dec 2018 13:57:37 +0800 Subject: [PATCH 1/7] Add logs for malformed acl configuration file --- src/emqx_access_rule.erl | 10 +++++++--- src/emqx_acl_internal.erl | 28 ++++++++++++++++++---------- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/src/emqx_access_rule.erl b/src/emqx_access_rule.erl index 3fb6dd7ef..1d3154735 100644 --- a/src/emqx_access_rule.erl +++ b/src/emqx_access_rule.erl @@ -34,16 +34,20 @@ -export([match/3]). -define(ALLOW_DENY(A), ((A =:= allow) orelse (A =:= deny))). +-define(PUBSUB(A), ((A =:= subscribe) orelse (A =:= publish) orelse (A =:= pubsub))). %% @doc Compile Access Rule. compile({A, all}) when ?ALLOW_DENY(A) -> {A, all}; -compile({A, Who, Access, Topic}) when ?ALLOW_DENY(A), is_binary(Topic) -> +compile({A, Who, Access, Topic}) when ?ALLOW_DENY(A), ?PUBSUB(Access), is_binary(Topic) -> {A, compile(who, Who), Access, [compile(topic, Topic)]}; -compile({A, Who, Access, TopicFilters}) when ?ALLOW_DENY(A) -> - {A, compile(who, Who), Access, [compile(topic, Topic) || Topic <- TopicFilters]}. +compile({A, Who, Access, TopicFilters}) when ?ALLOW_DENY(A), ?PUBSUB(Access) -> + {A, compile(who, Who), Access, [compile(topic, Topic) || Topic <- TopicFilters]}; + +compile(Rule) -> + emqx_logger:error("[ACCESS_RULE] Malformed rule: ~p", [Rule]). compile(who, all) -> all; diff --git a/src/emqx_acl_internal.erl b/src/emqx_acl_internal.erl index eee7e6c18..383967030 100644 --- a/src/emqx_acl_internal.erl +++ b/src/emqx_acl_internal.erl @@ -46,18 +46,24 @@ all_rules() -> -spec(init([File :: string()]) -> {ok, #{}}). init([File]) -> _ = emqx_tables:new(?ACL_RULE_TAB, [set, public, {read_concurrency, true}]), - true = load_rules_from_file(File), + ok = load_rules_from_file(File), {ok, #{acl_file => File}}. load_rules_from_file(AclFile) -> - {ok, Terms} = file:consult(AclFile), - Rules = [emqx_access_rule:compile(Term) || Term <- Terms], - lists:foreach(fun(PubSub) -> - ets:insert(?ACL_RULE_TAB, {PubSub, - lists:filter(fun(Rule) -> filter(PubSub, Rule) end, Rules)}) - end, [publish, subscribe]), - ets:insert(?ACL_RULE_TAB, {all_rules, Terms}). - + case file:consult(AclFile) of + {ok, Terms} -> + Rules = [emqx_access_rule:compile(Term) || Term <- Terms], + lists:foreach(fun(PubSub) -> + ets:insert(?ACL_RULE_TAB, {PubSub, + lists:filter(fun(Rule) -> filter(PubSub, Rule) end, Rules)}) + end, [publish, subscribe]), + ets:insert(?ACL_RULE_TAB, {all_rules, Terms}), + ok; + {error, Reason} -> + emqx_logger:error("[ACL_INTERNAL] Consult failed: ~p", [Reason]), + {error, Reason} + end. + filter(_PubSub, {allow, all}) -> true; filter(_PubSub, {deny, all}) -> @@ -100,9 +106,11 @@ match(Credentials, Topic, [Rule|Rules]) -> -spec(reload_acl(state()) -> ok | {error, term()}). reload_acl(#{acl_file := AclFile}) -> case catch load_rules_from_file(AclFile) of - true -> + ok -> emqx_logger:info("Reload acl_file ~s successfully", [AclFile]), ok; + {error, Error} -> + {error, Error}; {'EXIT', Error} -> {error, Error} end. From 35e699e54ea83df6bbcfa5865ef9a3ff169ff29b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Tue, 4 Dec 2018 16:11:25 +0800 Subject: [PATCH 2/7] Make sure test case of emqx_banned passes --- src/emqx_banned.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqx_banned.erl b/src/emqx_banned.erl index 49a698384..aae4f8c7a 100644 --- a/src/emqx_banned.erl +++ b/src/emqx_banned.erl @@ -105,7 +105,7 @@ code_change(_OldVsn, State, _Extra) -> -ifdef(TEST). ensure_expiry_timer(State) -> - State#{expiry_timer := emqx_misc:start_timer(timer:seconds(2), expire)}. + State#{expiry_timer := emqx_misc:start_timer(timer:seconds(1), expire)}. -else. ensure_expiry_timer(State) -> State#{expiry_timer := emqx_misc:start_timer(timer:minutes(5), expire)}. From ec2e28977600d131cf3b5eca4e5be5fbc71d3f23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Mon, 10 Dec 2018 11:13:25 +0800 Subject: [PATCH 3/7] Fix crash in emqx_acl_internal:filter/2 --- src/emqx_access_rule.erl | 3 ++- src/emqx_acl_internal.erl | 6 ++++-- test/emqx_access_SUITE.erl | 3 ++- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/emqx_access_rule.erl b/src/emqx_access_rule.erl index 1d3154735..47b20676b 100644 --- a/src/emqx_access_rule.erl +++ b/src/emqx_access_rule.erl @@ -47,7 +47,8 @@ compile({A, Who, Access, TopicFilters}) when ?ALLOW_DENY(A), ?PUBSUB(Access) -> {A, compile(who, Who), Access, [compile(topic, Topic) || Topic <- TopicFilters]}; compile(Rule) -> - emqx_logger:error("[ACCESS_RULE] Malformed rule: ~p", [Rule]). + emqx_logger:error("[ACCESS_RULE] Malformed rule: ~p", [Rule]), + {error, bad_rule}. compile(who, all) -> all; diff --git a/src/emqx_acl_internal.erl b/src/emqx_acl_internal.erl index 383967030..328993a78 100644 --- a/src/emqx_acl_internal.erl +++ b/src/emqx_acl_internal.erl @@ -60,10 +60,12 @@ load_rules_from_file(AclFile) -> ets:insert(?ACL_RULE_TAB, {all_rules, Terms}), ok; {error, Reason} -> - emqx_logger:error("[ACL_INTERNAL] Consult failed: ~p", [Reason]), + emqx_logger:error("[ACL_INTERNAL] Failed to read ~s: ~p", [AclFile, Reason]), {error, Reason} end. - + +filter(_PubSub, {error, _}) -> + false; filter(_PubSub, {allow, all}) -> true; filter(_PubSub, {deny, all}) -> diff --git a/test/emqx_access_SUITE.erl b/test/emqx_access_SUITE.erl index b49c90d80..5d0bcf049 100644 --- a/test/emqx_access_SUITE.erl +++ b/test/emqx_access_SUITE.erl @@ -355,7 +355,8 @@ compile_rule(_) -> {deny, all, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} = compile({deny, all, subscribe, ["$SYS/#", "#"]}), {allow, all} = compile({allow, all}), - {deny, all} = compile({deny, all}). + {deny, all} = compile({deny, all}), + {error, bad_rule} = compile({test, malformed}). match_rule(_) -> User = #{client_id => <<"testClient">>, username => <<"TestUser">>, peername => {{127,0,0,1}, 2948}}, From 52e2c56ce1433b6fc3107e0d20d449f6f1112af3 Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Fri, 14 Dec 2018 14:58:53 +0800 Subject: [PATCH 4/7] Change default configs for max-connections --- etc/emqx.conf | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 7876be59b..6370e7214 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -197,7 +197,7 @@ node.async_threads = 32 ## Value: Number [1024-134217727] ## ## vm.args: +P Number -node.process_limit = 256000 +node.process_limit = 2048000 ## Sets the maximum number of simultaneously existing ports for this system. ## @@ -206,7 +206,7 @@ node.process_limit = 256000 ## Value: Number [1024-134217727] ## ## vm.args: +Q Number -node.max_ports = 256000 +node.max_ports = 1024000 ## Set the distribution buffer busy limit (dist_buf_busy_limit). ## @@ -879,7 +879,7 @@ listener.tcp.internal.acceptors = 4 ## Maximum number of concurrent MQTT/TCP connections. ## ## Value: Number -listener.tcp.internal.max_connections = 10240000 +listener.tcp.internal.max_connections = 1024000 ## Maximum internal connections per second. ## From 675edf3fab850c99c87123a1b8a970d8f06aea30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Tue, 18 Dec 2018 12:01:41 +0800 Subject: [PATCH 5/7] Fix a bug that will not send a will message in some cases --- src/emqx_session.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 12718b9fc..cb1d04780 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -963,7 +963,8 @@ ensure_expire_timer(State) -> ensure_will_delay_timer(State = #state{will_msg = #message{headers = #{'Will-Delay-Interval' := WillDelayInterval}}}) -> State#state{will_delay_timer = emqx_misc:start_timer(WillDelayInterval * 1000, will_delay)}; -ensure_will_delay_timer(State) -> +ensure_will_delay_timer(State = #state{will_msg = WillMsg}) -> + send_willmsg(WillMsg), State. ensure_stats_timer(State = #state{enable_stats = true, stats_timer = undefined, From b7a39f25f29b7476540cc8b88f7940dd18ca0772 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Tue, 18 Dec 2018 12:02:18 +0800 Subject: [PATCH 6/7] Revert "Fix a bug that will not send a will message in some cases" This reverts commit 675edf3fab850c99c87123a1b8a970d8f06aea30. --- src/emqx_session.erl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index cb1d04780..12718b9fc 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -963,8 +963,7 @@ ensure_expire_timer(State) -> ensure_will_delay_timer(State = #state{will_msg = #message{headers = #{'Will-Delay-Interval' := WillDelayInterval}}}) -> State#state{will_delay_timer = emqx_misc:start_timer(WillDelayInterval * 1000, will_delay)}; -ensure_will_delay_timer(State = #state{will_msg = WillMsg}) -> - send_willmsg(WillMsg), +ensure_will_delay_timer(State) -> State. ensure_stats_timer(State = #state{enable_stats = true, stats_timer = undefined, From dc06c0beab77f30a6063dc966d6d1ee924913db1 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 18 Dec 2018 15:11:04 +0800 Subject: [PATCH 7/7] Remove 'topic_alias_maximum' from session's state --- src/emqx_protocol.erl | 53 +++++++++--------------- src/emqx_session.erl | 95 +++++++++++++++++++------------------------ 2 files changed, 62 insertions(+), 86 deletions(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 573b913f7..5751f78fe 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -619,14 +619,12 @@ maybe_use_username_as_clientid(ClientId, undefined, _PState) -> ClientId; maybe_use_username_as_clientid(ClientId, Username, #pstate{zone = Zone}) -> case emqx_zone:get_env(Zone, use_username_as_clientid, false) of - true -> - Username; - false -> - ClientId + true -> Username; + false -> ClientId end. %%------------------------------------------------------------------------------ -%% Assign a clientid +%% Assign a clientId maybe_assign_client_id(PState = #pstate{client_id = <<>>, ack_props = AckProps}) -> ClientId = emqx_guid:to_base62(emqx_guid:gen()), @@ -650,41 +648,30 @@ try_open_session(PState = #pstate{zone = Zone, clean_start => CleanStart, will_msg => WillMsg }, - SessAttrs1 = lists:foldl(fun set_session_attrs/2, SessAttrs, [{max_inflight, PState}, {expiry_interval, PState}, {topic_alias_maximum, PState}]), + SessAttrs1 = lists:foldl(fun set_session_attrs/2, SessAttrs, [{max_inflight, PState}, {expiry_interval, PState}]), case emqx_sm:open_session(SessAttrs1) of {ok, SPid} -> {ok, SPid, false}; Other -> Other end. -set_session_attrs({max_inflight, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps}}, SessAttrs) -> - maps:put(max_inflight, if - ProtoVer =:= ?MQTT_PROTO_V5 -> - get_property('Receive-Maximum', ConnProps, 65535); - true -> - emqx_zone:get_env(Zone, max_inflight, 65535) - end, SessAttrs); -set_session_attrs({expiry_interval, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps, clean_start = CleanStart}}, SessAttrs) -> - maps:put(expiry_interval, if - ProtoVer =:= ?MQTT_PROTO_V5 -> - get_property('Session-Expiry-Interval', ConnProps, 0); - true -> - case CleanStart of - true -> 0; - false -> - emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff) - end - end, SessAttrs); -set_session_attrs({topic_alias_maximum, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps}}, SessAttrs) -> - maps:put(topic_alias_maximum, if - ProtoVer =:= ?MQTT_PROTO_V5 -> - get_property('Topic-Alias-Maximum', ConnProps, 0); - true -> - emqx_zone:get_env(Zone, max_topic_alias, 0) - end, SessAttrs); -set_session_attrs({_, #pstate{}}, SessAttrs) -> - SessAttrs. +set_session_attrs({max_inflight, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}}, SessAttrs) -> + maps:put(max_inflight, get_property('Receive-Maximum', ConnProps, 65535), SessAttrs); +set_session_attrs({max_inflight, #pstate{zone = Zone}}, SessAttrs) -> + maps:put(max_inflight, emqx_zone:get_env(Zone, max_inflight, 65535), SessAttrs); + +set_session_attrs({expiry_interval, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}}, SessAttrs) -> + maps:put(expiry_interval, get_property('Session-Expiry-Interval', ConnProps, 0), SessAttrs); + +set_session_attrs({expiry_interval, #pstate{zone = Zone, clean_start = CleanStart}}, SessAttrs) -> + maps:put(expiry_interval, case CleanStart of + true -> 0; + false -> emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff) + end, SessAttrs); + +set_session_attrs(_, SessAttrs) -> + SessAttrs. authenticate(Credentials, Password) -> case emqx_access_control:authenticate(Credentials, Password) of diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 12718b9fc..fde76f0b6 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -147,8 +147,6 @@ %% Created at created_at :: erlang:timestamp(), - topic_alias_maximum :: pos_integer(), - will_msg :: emqx:message(), will_delay_timer :: reference() | undefined @@ -341,7 +339,6 @@ init([Parent, #{zone := Zone, clean_start := CleanStart, expiry_interval := ExpiryInterval, max_inflight := MaxInflight, - topic_alias_maximum := TopicAliasMaximum, will_msg := WillMsg}]) -> emqx_logger:set_metadata_client_id(ClientId), process_flag(trap_exit, true), @@ -367,7 +364,6 @@ init([Parent, #{zone := Zone, deliver_stats = 0, enqueue_stats = 0, created_at = os:timestamp(), - topic_alias_maximum = TopicAliasMaximum, will_msg = WillMsg }, emqx_sm:register_session(ClientId, attrs(State)), @@ -518,22 +514,23 @@ handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight end; %% RESUME: -handle_cast({resume, #{conn_pid := ConnPid, - will_msg := WillMsg, - expiry_interval := SessionExpiryInterval, - max_inflight := MaxInflight, - topic_alias_maximum := TopicAliasMaximum}}, State = #state{client_id = ClientId, - conn_pid = OldConnPid, - clean_start = CleanStart, - retry_timer = RetryTimer, - await_rel_timer = AwaitTimer, - expiry_timer = ExpireTimer, - will_delay_timer = WillDelayTimer}) -> +handle_cast({resume, #{conn_pid := ConnPid, + will_msg := WillMsg, + expiry_interval := ExpiryInterval, + max_inflight := MaxInflight}}, + State = #state{client_id = ClientId, + conn_pid = OldConnPid, + clean_start = CleanStart, + retry_timer = RetryTimer, + await_rel_timer = AwaitTimer, + expiry_timer = ExpireTimer, + will_delay_timer = WillDelayTimer}) -> ?LOG(info, "Resumed by connection ~p ", [ConnPid], State), %% Cancel Timers - lists:foreach(fun emqx_misc:cancel_timer/1, [RetryTimer, AwaitTimer, ExpireTimer, WillDelayTimer]), + lists:foreach(fun emqx_misc:cancel_timer/1, + [RetryTimer, AwaitTimer, ExpireTimer, WillDelayTimer]), case kick(ClientId, OldConnPid, ConnPid) of ok -> ?LOG(warning, "Connection ~p kickout ~p", [ConnPid, OldConnPid], State); @@ -542,19 +539,18 @@ handle_cast({resume, #{conn_pid := ConnPid, true = link(ConnPid), - State1 = State#state{conn_pid = ConnPid, - binding = binding(ConnPid), - old_conn_pid = OldConnPid, - clean_start = false, - retry_timer = undefined, - awaiting_rel = #{}, - await_rel_timer = undefined, - expiry_timer = undefined, - expiry_interval = SessionExpiryInterval, - inflight = emqx_inflight:update_size(MaxInflight, State#state.inflight), - topic_alias_maximum = TopicAliasMaximum, - will_delay_timer = undefined, - will_msg = WillMsg}, + State1 = State#state{conn_pid = ConnPid, + binding = binding(ConnPid), + old_conn_pid = OldConnPid, + clean_start = false, + retry_timer = undefined, + awaiting_rel = #{}, + await_rel_timer = undefined, + expiry_timer = undefined, + expiry_interval = ExpiryInterval, + inflight = emqx_inflight:update_size(MaxInflight, State#state.inflight), + will_delay_timer = undefined, + will_msg = WillMsg}, %% Clean Session: true -> false??? CleanStart andalso emqx_sm:set_session_attrs(ClientId, attrs(State1)), @@ -573,9 +569,10 @@ handle_cast(Msg, State) -> %% Batch dispatch handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) -> - {noreply, lists:foldl(fun(Msg, NewState) -> - element(2, handle_info({dispatch, Topic, Msg}, NewState)) - end, State, Msgs)}; + noreply(lists:foldl( + fun(Msg, St) -> + element(2, handle_info({dispatch, Topic, Msg}, St)) + end, State, Msgs)); %% Dispatch message handle_info({dispatch, Topic, Msg = #message{}}, State) -> @@ -584,12 +581,11 @@ handle_info({dispatch, Topic, Msg = #message{}}, State) -> %% Require ack, but we do not have connection %% negative ack the message so it can try the next subscriber in the group ok = emqx_shared_sub:nack_no_connection(Msg), - noreply(State); + {noreply, State}; false -> - handle_dispatch(Topic, Msg, State) + noreply(handle_dispatch(Topic, Msg, State)) end; - %% Do nothing if the client has been disconnected. handle_info({timeout, Timer, retry_delivery}, State = #state{conn_pid = undefined, retry_timer = Timer}) -> noreply(State#state{retry_timer = undefined}); @@ -663,25 +659,17 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%------------------------------------------------------------------------------ -has_connection(#state{conn_pid = Pid}) -> is_pid(Pid) andalso is_process_alive(Pid). +has_connection(#state{conn_pid = Pid}) -> + is_pid(Pid) andalso is_process_alive(Pid). -handle_dispatch(Topic, Msg = #message{headers = Headers}, - State = #state{subscriptions = SubMap, - topic_alias_maximum = TopicAliasMaximum - }) -> - TopicAlias = maps:get('Topic-Alias', Headers, undefined), - if - TopicAlias =:= undefined orelse TopicAlias =< TopicAliasMaximum -> - noreply(case maps:find(Topic, SubMap) of - {ok, #{nl := Nl, qos := QoS, rap := Rap, subid := SubId}} -> - run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}, {subid, SubId}], Msg, State); - {ok, #{nl := Nl, qos := QoS, rap := Rap}} -> - run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}], Msg, State); - error -> - dispatch(emqx_message:unset_flag(dup, Msg), State) - end); - true -> - noreply(State) +handle_dispatch(Topic, Msg, State = #state{subscriptions = SubMap}) -> + case maps:find(Topic, SubMap) of + {ok, #{nl := Nl, qos := QoS, rap := Rap, subid := SubId}} -> + run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}, {subid, SubId}], Msg, State); + {ok, #{nl := Nl, qos := QoS, rap := Rap}} -> + run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}], Msg, State); + error -> + dispatch(emqx_message:unset_flag(dup, Msg), State) end. suback(_From, undefined, _ReasonCodes) -> @@ -1011,3 +999,4 @@ noreply(State) -> shutdown(Reason, State) -> {stop, {shutdown, Reason}, State}. +