From c89079261393f9a94599c9c3367ab0a8d21cc590 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Wed, 17 Oct 2018 17:24:05 +0800 Subject: [PATCH 1/8] Fix the init_proc_mng_policy bug Prior to this change, when the plugin like emqx_sn_gateway which has no zone run the init_proc_mng_policy function, it would trigger error and application crash. This change add a case to avoid crash. --- src/emqx_misc.erl | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index 03c42510c..852113a8c 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -63,8 +63,14 @@ proc_stats(Pid) -> -define(DISABLED, 0). init_proc_mng_policy(Zone) -> - #{max_heap_size := MaxHeapSizeInBytes} = ShutdownPolicy = - emqx_zone:get_env(Zone, force_shutdown_policy), + #{max_heap_size := MaxHeapSizeInBytes} + = ShutdownPolicy + = case Zone of + undefined -> + #{max_heap_size => 0}; + _ -> + emqx_zone:get_env(Zone, force_shutdown_policy) + end, MaxHeapSize = MaxHeapSizeInBytes div erlang:system_info(wordsize), _ = erlang:process_flag(max_heap_size, MaxHeapSize), % zero is discarded erlang:put(force_shutdown_policy, ShutdownPolicy), @@ -106,4 +112,3 @@ is_enabled(Max) -> is_integer(Max) andalso Max > ?DISABLED. proc_info(Key) -> {Key, Value} = erlang:process_info(self(), Key), Value. - From 599121052acdc3b24e74877dfba3fc572f4ee1f2 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Thu, 18 Oct 2018 10:41:01 +0800 Subject: [PATCH 2/8] Add test case for init_proc_mng_policy --- test/emqx_misc_SUITE.erl | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/test/emqx_misc_SUITE.erl b/test/emqx_misc_SUITE.erl index 766691869..87f51746a 100644 --- a/test/emqx_misc_SUITE.erl +++ b/test/emqx_misc_SUITE.erl @@ -25,7 +25,7 @@ {backlog, 512}, {nodelay, true}]). -all() -> [t_merge_opts]. +all() -> [t_merge_opts, t_init_proc_mng_policy]. t_merge_opts(_) -> Opts = emqx_misc:merge_opts(?SOCKOPTS, [raw, @@ -43,3 +43,10 @@ t_merge_opts(_) -> {nodelay, false}, {packet, raw}, {reuseaddr, true}] = lists:sort(Opts). + +t_init_proc_mng_policy(_) -> + application:set_env(emqx, zones, [{policy, [{force_shutdown_policy, #{max_heap_size => 1}}]}]), + {ok, _} = emqx_zone:start_link(), + ok = emqx_misc:init_proc_mng_policy(policy), + ok = emqx_misc:init_proc_mng_policy(undefined), + emqx_zone:stop(). From a748e8f1d8a0ebcefcaccf736057fa0659a1be44 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Thu, 18 Oct 2018 13:24:06 +0800 Subject: [PATCH 3/8] Refactor send_fun in protocol and other connection module Prior to this change, in the send function, the packet is forced to use emqx:serialize to serialize packet, it is a wrong design because other plugins which need to transform the mqtt packet to other packets can not use their own serialize function to serialize packet. This change solve the problem issued above. --- src/emqx_connection.erl | 4 ++-- src/emqx_frame.erl | 3 +-- src/emqx_protocol.erl | 22 +++++++++++----------- src/emqx_ws_connection.erl | 4 ++-- 4 files changed, 16 insertions(+), 17 deletions(-) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index ccb5f59fa..37665ac10 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -165,7 +165,8 @@ init_limiter({Rate, Burst}) -> esockd_rate_limit:new(Rate, Burst). send_fun(Transport, Socket, Peername) -> - fun(Data) -> + fun(Serialize, Packet, Options) -> + Data = Serialize(Packet, Options), try Transport:async_send(Socket, Data) of ok -> ?LOG(debug, "SEND ~p", [iolist_to_binary(Data)], #state{peername = Peername}), @@ -408,4 +409,3 @@ maybe_gc(#state{}, {publish, _PacketId, #message{payload = Payload}}) -> ok = emqx_gc:inc(1, Oct); maybe_gc(_, _) -> ok. - diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index aa7aad064..075f0a11e 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -130,7 +130,7 @@ parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) -> WillQoS : 2, WillFlag : 1, CleanStart : 1, - 0 : 1, + 0 : 1, KeepAlive : 16/big, Rest2/binary>> = Rest1, @@ -634,4 +634,3 @@ fixqos(?PUBREL, 0) -> 1; fixqos(?SUBSCRIBE, 0) -> 1; fixqos(?UNSUBSCRIBE, 0) -> 1; fixqos(_Type, QoS) -> QoS. - diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 1fb80c0ce..93a1a8b64 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -407,13 +407,13 @@ process_packet(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), process_packet(?PACKET(?PINGREQ), PState) -> send(?PACKET(?PINGRESP), PState); -process_packet(?DISCONNECT_PACKET(?RC_SUCCESS, #{'Session-Expiry-Interval' := Interval}), +process_packet(?DISCONNECT_PACKET(?RC_SUCCESS, #{'Session-Expiry-Interval' := Interval}), PState = #pstate{session = SPid, conn_props = #{'Session-Expiry-Interval' := OldInterval}}) -> case Interval =/= 0 andalso OldInterval =:= 0 of - true -> + true -> deliver({disconnect, ?RC_PROTOCOL_ERROR}, PState), {error, protocol_error, PState#pstate{will_msg = undefined}}; - false -> + false -> emqx_session:update_expiry_interval(SPid, Interval), %% Clean willmsg {stop, normal, PState#pstate{will_msg = undefined}} @@ -495,13 +495,13 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone, 'Subscription-Identifier-Available' => 1, 'Shared-Subscription-Available' => flag(Shared)}, - Props1 = if - MaxQoS =:= ?QOS_2 -> + Props1 = if + MaxQoS =:= ?QOS_2 -> Props; true -> maps:put('Maximum-QoS', MaxQoS, Props) end, - + Props2 = if IsAssigned -> Props1#{'Assigned-Client-Identifier' => ClientId}; true -> Props1 @@ -555,7 +555,7 @@ deliver({disconnect, _ReasonCode}, PState) -> -spec(send(emqx_mqtt_types:packet(), state()) -> {ok, state()} | {error, term()}). send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun}) -> trace(send, Packet, PState), - case SendFun(emqx_frame:serialize(Packet, #{version => Ver})) of + case SendFun(fun emqx_frame:serialize/2, Packet, #{version => Ver}) of ok -> emqx_metrics:sent(Packet), {ok, inc_stats(send, Type, PState)}; @@ -601,17 +601,17 @@ set_session_attrs({max_inflight, #pstate{zone = Zone, proto_ver = ProtoVer, conn maps:put(max_inflight, if ProtoVer =:= ?MQTT_PROTO_V5 -> maps:get('Receive-Maximum', ConnProps, 65535); - true -> + true -> emqx_zone:get_env(Zone, max_inflight, 65535) end, SessAttrs); set_session_attrs({expiry_interval, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps, clean_start = CleanStart}}, SessAttrs) -> maps:put(expiry_interval, if ProtoVer =:= ?MQTT_PROTO_V5 -> maps:get('Session-Expiry-Interval', ConnProps, 0); - true -> + true -> case CleanStart of true -> 0; - false -> + false -> emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff) end end, SessAttrs); @@ -619,7 +619,7 @@ set_session_attrs({topic_alias_maximum, #pstate{zone = Zone, proto_ver = ProtoVe maps:put(topic_alias_maximum, if ProtoVer =:= ?MQTT_PROTO_V5 -> maps:get('Topic-Alias-Maximum', ConnProps, 0); - true -> + true -> emqx_zone:get_env(Zone, max_topic_alias, 0) end, SessAttrs); set_session_attrs({_, #pstate{}}, SessAttrs) -> diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index fa08fa1bb..1907695af 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -144,7 +144,8 @@ websocket_init(#state{request = Req, options = Options}) -> idle_timeout = IdleTimout}}. send_fun(WsPid) -> - fun(Data) -> + fun(Serialize, Packet, Options) -> + Data = Serialize(Packet, Options), BinSize = iolist_size(Data), emqx_metrics:inc('bytes/sent', BinSize), put(send_oct, get(send_oct) + BinSize), @@ -299,4 +300,3 @@ stop(Error, State) -> wsock_stats() -> [{Key, get(Key)} || Key <- ?SOCK_STATS]. - From df713959ab3c06983c1687d6ad5a2eed6c684fa0 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Thu, 18 Oct 2018 13:57:27 +0800 Subject: [PATCH 4/8] Refactor init_proc_mng_policy. If there is no zone, it is unnecessary to add proc_mng_policy. --- src/emqx_misc.erl | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index 852113a8c..cf4a555ca 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -62,15 +62,11 @@ proc_stats(Pid) -> -define(DISABLED, 0). +init_proc_mng_policy(undefined) -> ok; init_proc_mng_policy(Zone) -> #{max_heap_size := MaxHeapSizeInBytes} = ShutdownPolicy - = case Zone of - undefined -> - #{max_heap_size => 0}; - _ -> - emqx_zone:get_env(Zone, force_shutdown_policy) - end, + = emqx_zone:get_env(Zone, force_shutdown_policy), MaxHeapSize = MaxHeapSizeInBytes div erlang:system_info(wordsize), _ = erlang:process_flag(max_heap_size, MaxHeapSize), % zero is discarded erlang:put(force_shutdown_policy, ShutdownPolicy), From 387f2468c0fc509d828545db6f26cf773b5f93c0 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Thu, 18 Oct 2018 15:01:45 +0800 Subject: [PATCH 5/8] Add SendFun case for emqx-sn --- src/emqx_protocol.erl | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 93a1a8b64..3638e94e2 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -562,6 +562,9 @@ send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun {binary, _Data} -> emqx_metrics:sent(Packet), {ok, inc_stats(send, Type, PState)}; + {datagram, _Peer, _Data} -> + emqx_metrics:sent(Packet), + {ok, inc_stats(send, Type, PState)}; {error, Reason} -> {error, Reason} end. From d10edfe025cfcd84fba4fb6dc1cde85b57d5faf7 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Fri, 19 Oct 2018 16:05:59 +0800 Subject: [PATCH 6/8] Resolve merge conflict --- test/emqx_misc_SUITE.erl | 52 ---------------------------------------- 1 file changed, 52 deletions(-) delete mode 100644 test/emqx_misc_SUITE.erl diff --git a/test/emqx_misc_SUITE.erl b/test/emqx_misc_SUITE.erl deleted file mode 100644 index 87f51746a..000000000 --- a/test/emqx_misc_SUITE.erl +++ /dev/null @@ -1,52 +0,0 @@ -%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_misc_SUITE). - --include_lib("eunit/include/eunit.hrl"). - --compile(export_all). --compile(nowarn_export_all). - --define(SOCKOPTS, [binary, - {packet, raw}, - {reuseaddr, true}, - {backlog, 512}, - {nodelay, true}]). - -all() -> [t_merge_opts, t_init_proc_mng_policy]. - -t_merge_opts(_) -> - Opts = emqx_misc:merge_opts(?SOCKOPTS, [raw, - binary, - {backlog, 1024}, - {nodelay, false}, - {max_clients, 1024}, - {acceptors, 16}]), - ?assertEqual(1024, proplists:get_value(backlog, Opts)), - ?assertEqual(1024, proplists:get_value(max_clients, Opts)), - [binary, raw, - {acceptors, 16}, - {backlog, 1024}, - {max_clients, 1024}, - {nodelay, false}, - {packet, raw}, - {reuseaddr, true}] = lists:sort(Opts). - -t_init_proc_mng_policy(_) -> - application:set_env(emqx, zones, [{policy, [{force_shutdown_policy, #{max_heap_size => 1}}]}]), - {ok, _} = emqx_zone:start_link(), - ok = emqx_misc:init_proc_mng_policy(policy), - ok = emqx_misc:init_proc_mng_policy(undefined), - emqx_zone:stop(). From 35460d8227c2362e3b5a8735007557df73db9529 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Mon, 22 Oct 2018 09:04:03 +0800 Subject: [PATCH 7/8] Refactor send_fun --- Makefile | 3 +-- src/emqx_connection.erl | 4 ++-- src/emqx_protocol.erl | 2 +- src/emqx_ws_connection.erl | 4 ++-- 4 files changed, 6 insertions(+), 7 deletions(-) diff --git a/Makefile b/Makefile index 368aff6a9..01ca4885a 100644 --- a/Makefile +++ b/Makefile @@ -36,7 +36,7 @@ EUNIT_OPTS = verbose ## emqx_trie emqx_router emqx_frame emqx_mqtt_compat CT_SUITES = emqx emqx_zone emqx_banned emqx_connection emqx_session emqx_access emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight \ - emqx_json emqx_keepalive emqx_lib emqx_metrics emqx_misc emqx_mod emqx_mqtt_caps \ + emqx_json emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mqtt_caps \ emqx_mqtt_compat emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm \ emqx_mountpoint emqx_listeners emqx_protocol emqx_pool emqx_shared_sub @@ -138,4 +138,3 @@ dep-vsn-check: {[], []} -> halt(0); \ {Rebar, Mk} -> erlang:error({deps_version_discrepancy, [{rebar, Rebar}, {mk, Mk}]}) \ end." - diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 37665ac10..6c3d4c7e6 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -165,8 +165,8 @@ init_limiter({Rate, Burst}) -> esockd_rate_limit:new(Rate, Burst). send_fun(Transport, Socket, Peername) -> - fun(Serialize, Packet, Options) -> - Data = Serialize(Packet, Options), + fun(Packet, Options) -> + Data = emqx_frame:serialize(Packet, Options), try Transport:async_send(Socket, Data) of ok -> ?LOG(debug, "SEND ~p", [iolist_to_binary(Data)], #state{peername = Peername}), diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 3638e94e2..2c128810b 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -555,7 +555,7 @@ deliver({disconnect, _ReasonCode}, PState) -> -spec(send(emqx_mqtt_types:packet(), state()) -> {ok, state()} | {error, term()}). send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun}) -> trace(send, Packet, PState), - case SendFun(fun emqx_frame:serialize/2, Packet, #{version => Ver}) of + case SendFun(Packet, #{version => Ver}) of ok -> emqx_metrics:sent(Packet), {ok, inc_stats(send, Type, PState)}; diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 1907695af..407525601 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -144,8 +144,8 @@ websocket_init(#state{request = Req, options = Options}) -> idle_timeout = IdleTimout}}. send_fun(WsPid) -> - fun(Serialize, Packet, Options) -> - Data = Serialize(Packet, Options), + fun(Packet, Options) -> + Data = emqx_frame:serialize(Packet, Options), BinSize = iolist_size(Data), emqx_metrics:inc('bytes/sent', BinSize), put(send_oct, get(send_oct) + BinSize), From 4ea57c2bf95c9331a26e1a823b47589f6b19dea1 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Sat, 3 Nov 2018 22:56:11 +0800 Subject: [PATCH 8/8] Delete redundant case clauses in sendfun Prior to this change, there are ok, {binary, _Data}. {datagram, _Peer, _Dara} case clauses, and the {binary, _Data} and {datagram, _Peer, _Data} are unnecessary cases This change delete these two cases and add ok in the end of funtion in send_fun of emqx_ws_connection. --- src/emqx_protocol.erl | 6 ------ src/emqx_ws_connection.erl | 3 ++- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 3e5db983f..9ce66bfb8 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -593,12 +593,6 @@ send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun ok -> emqx_metrics:sent(Packet), {ok, inc_stats(send, Type, PState)}; - {binary, _Data} -> - emqx_metrics:sent(Packet), - {ok, inc_stats(send, Type, PState)}; - {datagram, _Peer, _Data} -> - emqx_metrics:sent(Packet), - {ok, inc_stats(send, Type, PState)}; {error, Reason} -> {error, Reason} end. diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 407525601..7ff0b55a8 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -150,7 +150,8 @@ send_fun(WsPid) -> emqx_metrics:inc('bytes/sent', BinSize), put(send_oct, get(send_oct) + BinSize), put(send_cnt, get(send_cnt) + 1), - WsPid ! {binary, iolist_to_binary(Data)} + WsPid ! {binary, iolist_to_binary(Data)}, + ok end. stat_fun() ->