From b45bbf676c120708df1d4d0941e5bc8dab922340 Mon Sep 17 00:00:00 2001 From: zhanghongtong Date: Fri, 4 Mar 2022 18:06:10 +0800 Subject: [PATCH 01/39] fix(helm): delete needless labels for support helm upgrade --- deploy/charts/emqx/templates/StatefulSet.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/deploy/charts/emqx/templates/StatefulSet.yaml b/deploy/charts/emqx/templates/StatefulSet.yaml index 73ce5ef77..f90c2142f 100644 --- a/deploy/charts/emqx/templates/StatefulSet.yaml +++ b/deploy/charts/emqx/templates/StatefulSet.yaml @@ -27,7 +27,6 @@ spec: namespace: {{ .Release.Namespace }} labels: app.kubernetes.io/name: {{ include "emqx.name" . }} - helm.sh/chart: {{ include "emqx.chart" . }} app.kubernetes.io/instance: {{ .Release.Name }} app.kubernetes.io/managed-by: {{ .Release.Service }} annotations: From 8057282d7d4bfff0f13f510157b65edf27b60981 Mon Sep 17 00:00:00 2001 From: zhanghongtong Date: Wed, 9 Mar 2022 10:04:35 +0800 Subject: [PATCH 02/39] build(helm): add preStop command for container update probe for container --- deploy/charts/emqx/templates/StatefulSet.yaml | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/deploy/charts/emqx/templates/StatefulSet.yaml b/deploy/charts/emqx/templates/StatefulSet.yaml index d593cefb9..992528bbe 100644 --- a/deploy/charts/emqx/templates/StatefulSet.yaml +++ b/deploy/charts/emqx/templates/StatefulSet.yaml @@ -205,8 +205,23 @@ spec: httpGet: path: /status port: {{ .Values.emqxConfig.EMQX_MANAGEMENT__LISTENER__HTTP | default 8081 }} - initialDelaySeconds: 5 + initialDelaySeconds: 10 periodSeconds: 5 + failureThreshold: 30 + livenessProbe: + httpGet: + path: /status + port: {{ .Values.emqxConfig.EMQX_MANAGEMENT__LISTENER__HTTP | default 8081 }} + initialDelaySeconds: 60 + periodSeconds: 30 + failureThreshold: 10 + lifecycle: + preStop: + exec: + command: + - "/opt/emqx/bin/emqx_ctl" + - "cluster" + - "leave" {{- with .Values.nodeSelector }} nodeSelector: {{- toYaml . | nindent 8 }} From ca5637b4a3dba8f3599cf2bfcdc7087a2f7a3c23 Mon Sep 17 00:00:00 2001 From: Chris Date: Wed, 9 Mar 2022 10:57:29 +0100 Subject: [PATCH 03/39] fix: update mongodb client to include deadloop fix --- apps/emqx_auth_mongo/rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_auth_mongo/rebar.config b/apps/emqx_auth_mongo/rebar.config index 78442c00b..c89c15d3c 100644 --- a/apps/emqx_auth_mongo/rebar.config +++ b/apps/emqx_auth_mongo/rebar.config @@ -1,6 +1,6 @@ {deps, %% NOTE: mind poolboy version when updating mongodb-erlang version - [{mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.10"}}}, + [{mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.12"}}}, %% mongodb-erlang uses a special fork https://github.com/comtihon/poolboy.git %% (which has overflow_ttl feature added). %% However, it references `{branch, "master}` (commit 9c06a9a on 2021-04-07). From cedeff4dab699ee2f1104272f68b24bc2614b373 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 9 Mar 2022 11:18:02 +0100 Subject: [PATCH 04/39] build(appup): fix module delete instructions when there is a application_restart instruction, there is no need to add module delete instructions --- scripts/update_appup.escript | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/scripts/update_appup.escript b/scripts/update_appup.escript index 19b546f43..9083b62f1 100755 --- a/scripts/update_appup.escript +++ b/scripts/update_appup.escript @@ -272,8 +272,9 @@ do_merge_update_actions(App, {New0, Changed0, Deleted0}, OldActions) -> New = New0 -- AlreadyHandled, Changed = Changed0 -- AlreadyHandled, Deleted = Deleted0 -- AlreadyHandled, + HasRestart = contains_restart_application(App, OldActions), Actions = - case contains_restart_application(App, OldActions) of + case HasRestart of true -> []; false -> @@ -285,7 +286,12 @@ do_merge_update_actions(App, {New0, Changed0, Deleted0}, OldActions) -> OldActionsWithStop ++ Actions ++ OldActionsAfterStop ++ - [{delete_module, M} || M <- Deleted] ++ + case HasRestart of + true -> + []; + false -> + [{delete_module, M} || M <- Deleted] + end ++ AppSpecific. %% If an entry restarts an application, there's no need to use From c0b688b51fd147c1df080b679bc820fa659cccd9 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 10 Mar 2022 11:23:44 +0800 Subject: [PATCH 05/39] feat(frame): utf-8 string check in `strict_mode` --- src/emqx_frame.erl | 296 +++++++++++++++++++++++++++++---------------- 1 file changed, 192 insertions(+), 104 deletions(-) diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index 17daf9809..48e8b71fb 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -221,8 +221,9 @@ packet(Header, Variable) -> packet(Header, Variable, Payload) -> #mqtt_packet{header = Header, variable = Variable, payload = Payload}. -parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) -> - {ProtoName, Rest} = parse_utf8_string(FrameBin), +parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, + #{strict_mode := StrictMode}) -> + {ProtoName, Rest} = parse_utf8_string(FrameBin, StrictMode), <> = Rest, % Note: Crash when reserved flag doesn't equal to 0, there is no strict % compliance with the MQTT5.0. @@ -236,8 +237,8 @@ parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) -> KeepAlive : 16/big, Rest2/binary>> = Rest1, - {Properties, Rest3} = parse_properties(Rest2, ProtoVer), - {ClientId, Rest4} = parse_utf8_string(Rest3), + {Properties, Rest3} = parse_properties(Rest2, ProtoVer, StrictMode), + {ClientId, Rest4} = parse_utf8_string(Rest3, StrictMode), ConnPacket = #mqtt_packet_connect{proto_name = ProtoName, proto_ver = ProtoVer, is_bridge = (BridgeTag =:= 8), @@ -249,14 +250,14 @@ parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) -> properties = Properties, clientid = ClientId }, - {ConnPacket1, Rest5} = parse_will_message(ConnPacket, Rest4), - {Username, Rest6} = parse_utf8_string(Rest5, bool(UsernameFlag)), - {Passsword, <<>>} = parse_utf8_string(Rest6, bool(PasswordFlag)), + {ConnPacket1, Rest5} = parse_will_message(ConnPacket, Rest4, StrictMode), + {Username, Rest6} = parse_utf8_string(Rest5, StrictMode, bool(UsernameFlag)), + {Passsword, <<>>} = parse_utf8_string(Rest6, StrictMode, bool(PasswordFlag)), ConnPacket1#mqtt_packet_connect{username = Username, password = Passsword}; -parse_packet(#mqtt_packet_header{type = ?CONNACK}, - <>, #{version := Ver}) -> - {Properties, <<>>} = parse_properties(Rest, Ver), +parse_packet(#mqtt_packet_header{type = ?CONNACK}, <>, + #{strict_mode := StrictMode, version := Ver}) -> + {Properties, <<>>} = parse_properties(Rest, Ver, StrictMode), #mqtt_packet_connack{ack_flags = AckFlags, reason_code = ReasonCode, properties = Properties @@ -264,21 +265,22 @@ parse_packet(#mqtt_packet_header{type = ?CONNACK}, parse_packet(#mqtt_packet_header{type = ?PUBLISH, qos = QoS}, Bin, #{strict_mode := StrictMode, version := Ver}) -> - {TopicName, Rest} = parse_utf8_string(Bin), + {TopicName, Rest} = parse_utf8_string(Bin, StrictMode), {PacketId, Rest1} = case QoS of ?QOS_0 -> {undefined, Rest}; _ -> parse_packet_id(Rest) end, (PacketId =/= undefined) andalso StrictMode andalso validate_packet_id(PacketId), - {Properties, Payload} = parse_properties(Rest1, Ver), + {Properties, Payload} = parse_properties(Rest1, Ver, StrictMode), Publish = #mqtt_packet_publish{topic_name = TopicName, packet_id = PacketId, properties = Properties }, {Publish, Payload}; -parse_packet(#mqtt_packet_header{type = PubAck}, <>, #{strict_mode := StrictMode}) +parse_packet(#mqtt_packet_header{type = PubAck}, <>, + #{strict_mode := StrictMode}) when ?PUBACK =< PubAck, PubAck =< ?PUBCOMP -> StrictMode andalso validate_packet_id(PacketId), #mqtt_packet_puback{packet_id = PacketId, reason_code = 0}; @@ -287,7 +289,7 @@ parse_packet(#mqtt_packet_header{type = PubAck}, < StrictMode andalso validate_packet_id(PacketId), - {Properties, <<>>} = parse_properties(Rest, Ver), + {Properties, <<>>} = parse_properties(Rest, Ver, StrictMode), #mqtt_packet_puback{packet_id = PacketId, reason_code = ReasonCode, properties = Properties @@ -296,7 +298,7 @@ parse_packet(#mqtt_packet_header{type = PubAck}, <>, #{strict_mode := StrictMode, version := Ver}) -> StrictMode andalso validate_packet_id(PacketId), - {Properties, Rest1} = parse_properties(Rest, Ver), + {Properties, Rest1} = parse_properties(Rest, Ver, StrictMode), TopicFilters = parse_topic_filters(subscribe, Rest1), ok = validate_subqos([QoS || {_, #{qos := QoS}} <- TopicFilters]), #mqtt_packet_subscribe{packet_id = PacketId, @@ -307,7 +309,7 @@ parse_packet(#mqtt_packet_header{type = ?SUBSCRIBE}, <>, #{strict_mode := StrictMode, version := Ver}) -> StrictMode andalso validate_packet_id(PacketId), - {Properties, Rest1} = parse_properties(Rest, Ver), + {Properties, Rest1} = parse_properties(Rest, Ver, StrictMode), ReasonCodes = parse_reason_codes(Rest1), #mqtt_packet_suback{packet_id = PacketId, properties = Properties, @@ -317,7 +319,7 @@ parse_packet(#mqtt_packet_header{type = ?SUBACK}, <>, #{strict_mode := StrictMode, version := Ver}) -> StrictMode andalso validate_packet_id(PacketId), - {Properties, Rest1} = parse_properties(Rest, Ver), + {Properties, Rest1} = parse_properties(Rest, Ver, StrictMode), TopicFilters = parse_topic_filters(unsubscribe, Rest1), #mqtt_packet_unsubscribe{packet_id = PacketId, properties = Properties, @@ -332,7 +334,7 @@ parse_packet(#mqtt_packet_header{type = ?UNSUBACK}, <>, parse_packet(#mqtt_packet_header{type = ?UNSUBACK}, <>, #{strict_mode := StrictMode, version := Ver}) -> StrictMode andalso validate_packet_id(PacketId), - {Properties, Rest1} = parse_properties(Rest, Ver), + {Properties, Rest1} = parse_properties(Rest, Ver, StrictMode), ReasonCodes = parse_reason_codes(Rest1), #mqtt_packet_unsuback{packet_id = PacketId, properties = Properties, @@ -340,115 +342,119 @@ parse_packet(#mqtt_packet_header{type = ?UNSUBACK}, <>, - #{version := ?MQTT_PROTO_V5}) -> - {Properties, <<>>} = parse_properties(Rest, ?MQTT_PROTO_V5), + #{strict_mode := StrictMode, version := ?MQTT_PROTO_V5}) -> + {Properties, <<>>} = parse_properties(Rest, ?MQTT_PROTO_V5, StrictMode), #mqtt_packet_disconnect{reason_code = ReasonCode, properties = Properties }; parse_packet(#mqtt_packet_header{type = ?AUTH}, <>, - #{version := ?MQTT_PROTO_V5}) -> - {Properties, <<>>} = parse_properties(Rest, ?MQTT_PROTO_V5), + #{strict_mode := StrictMode, version := ?MQTT_PROTO_V5}) -> + {Properties, <<>>} = parse_properties(Rest, ?MQTT_PROTO_V5, StrictMode), #mqtt_packet_auth{reason_code = ReasonCode, properties = Properties}. parse_will_message(Packet = #mqtt_packet_connect{will_flag = true, - proto_ver = Ver}, Bin) -> - {Props, Rest} = parse_properties(Bin, Ver), - {Topic, Rest1} = parse_utf8_string(Rest), + proto_ver = Ver}, + Bin, StrictMode) -> + {Props, Rest} = parse_properties(Bin, Ver, StrictMode), + {Topic, Rest1} = parse_utf8_string(Rest, StrictMode), {Payload, Rest2} = parse_binary_data(Rest1), {Packet#mqtt_packet_connect{will_props = Props, will_topic = Topic, will_payload = Payload }, Rest2}; -parse_will_message(Packet, Bin) -> {Packet, Bin}. +parse_will_message(Packet, Bin, _StrictMode) -> {Packet, Bin}. -compile({inline, [parse_packet_id/1]}). parse_packet_id(<>) -> {PacketId, Rest}. -parse_properties(Bin, Ver) when Ver =/= ?MQTT_PROTO_V5 -> +parse_properties(Bin, Ver, _StrictMode) when Ver =/= ?MQTT_PROTO_V5 -> {#{}, Bin}; %% TODO: version mess? -parse_properties(<<>>, ?MQTT_PROTO_V5) -> +parse_properties(<<>>, ?MQTT_PROTO_V5, _StrictMode) -> {#{}, <<>>}; -parse_properties(<<0, Rest/binary>>, ?MQTT_PROTO_V5) -> +parse_properties(<<0, Rest/binary>>, ?MQTT_PROTO_V5, _StrictMode) -> {#{}, Rest}; -parse_properties(Bin, ?MQTT_PROTO_V5) -> +parse_properties(Bin, ?MQTT_PROTO_V5, StrictMode) -> {Len, Rest} = parse_variable_byte_integer(Bin), <> = Rest, - {parse_property(PropsBin, #{}), Rest1}. + {parse_property(PropsBin, #{}, StrictMode), Rest1}. -parse_property(<<>>, Props) -> +parse_property(<<>>, Props, _StrictMode) -> Props; -parse_property(<<16#01, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Payload-Format-Indicator' => Val}); -parse_property(<<16#02, Val:32/big, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Message-Expiry-Interval' => Val}); -parse_property(<<16#03, Bin/binary>>, Props) -> - {Val, Rest} = parse_utf8_string(Bin), - parse_property(Rest, Props#{'Content-Type' => Val}); -parse_property(<<16#08, Bin/binary>>, Props) -> - {Val, Rest} = parse_utf8_string(Bin), - parse_property(Rest, Props#{'Response-Topic' => Val}); -parse_property(<<16#09, Len:16/big, Val:Len/binary, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Correlation-Data' => Val}); -parse_property(<<16#0B, Bin/binary>>, Props) -> +parse_property(<<16#01, Val, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Payload-Format-Indicator' => Val}, StrictMode); +parse_property(<<16#02, Val:32/big, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Message-Expiry-Interval' => Val}, StrictMode); +parse_property(<<16#03, Bin/binary>>, Props, StrictMode) -> + {Val, Rest} = parse_utf8_string(Bin, StrictMode), + parse_property(Rest, Props#{'Content-Type' => Val}, StrictMode); +parse_property(<<16#08, Bin/binary>>, Props, StrictMode) -> + {Val, Rest} = parse_utf8_string(Bin, StrictMode), + parse_property(Rest, Props#{'Response-Topic' => Val}, StrictMode); +parse_property(<<16#09, Len:16/big, Val:Len/binary, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Correlation-Data' => Val}, StrictMode); +parse_property(<<16#0B, Bin/binary>>, Props, StrictMode) -> {Val, Rest} = parse_variable_byte_integer(Bin), - parse_property(Rest, Props#{'Subscription-Identifier' => Val}); -parse_property(<<16#11, Val:32/big, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Session-Expiry-Interval' => Val}); -parse_property(<<16#12, Bin/binary>>, Props) -> - {Val, Rest} = parse_utf8_string(Bin), - parse_property(Rest, Props#{'Assigned-Client-Identifier' => Val}); -parse_property(<<16#13, Val:16, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Server-Keep-Alive' => Val}); -parse_property(<<16#15, Bin/binary>>, Props) -> - {Val, Rest} = parse_utf8_string(Bin), - parse_property(Rest, Props#{'Authentication-Method' => Val}); -parse_property(<<16#16, Len:16/big, Val:Len/binary, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Authentication-Data' => Val}); -parse_property(<<16#17, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Request-Problem-Information' => Val}); -parse_property(<<16#18, Val:32, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Will-Delay-Interval' => Val}); -parse_property(<<16#19, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Request-Response-Information' => Val}); -parse_property(<<16#1A, Bin/binary>>, Props) -> - {Val, Rest} = parse_utf8_string(Bin), - parse_property(Rest, Props#{'Response-Information' => Val}); -parse_property(<<16#1C, Bin/binary>>, Props) -> - {Val, Rest} = parse_utf8_string(Bin), - parse_property(Rest, Props#{'Server-Reference' => Val}); -parse_property(<<16#1F, Bin/binary>>, Props) -> - {Val, Rest} = parse_utf8_string(Bin), - parse_property(Rest, Props#{'Reason-String' => Val}); -parse_property(<<16#21, Val:16/big, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Receive-Maximum' => Val}); -parse_property(<<16#22, Val:16/big, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Topic-Alias-Maximum' => Val}); -parse_property(<<16#23, Val:16/big, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Topic-Alias' => Val}); -parse_property(<<16#24, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Maximum-QoS' => Val}); -parse_property(<<16#25, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Retain-Available' => Val}); -parse_property(<<16#26, Bin/binary>>, Props) -> - {Pair, Rest} = parse_utf8_pair(Bin), + parse_property(Rest, Props#{'Subscription-Identifier' => Val}, StrictMode); +parse_property(<<16#11, Val:32/big, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Session-Expiry-Interval' => Val}, StrictMode); +parse_property(<<16#12, Bin/binary>>, Props, StrictMode) -> + {Val, Rest} = parse_utf8_string(Bin, StrictMode), + parse_property(Rest, Props#{'Assigned-Client-Identifier' => Val}, StrictMode); +parse_property(<<16#13, Val:16, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Server-Keep-Alive' => Val}, StrictMode); +parse_property(<<16#15, Bin/binary>>, Props, StrictMode) -> + {Val, Rest} = parse_utf8_string(Bin, StrictMode), + parse_property(Rest, Props#{'Authentication-Method' => Val}, StrictMode); +parse_property(<<16#16, Len:16/big, Val:Len/binary, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Authentication-Data' => Val}, StrictMode); +parse_property(<<16#17, Val, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Request-Problem-Information' => Val}, StrictMode); +parse_property(<<16#18, Val:32, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Will-Delay-Interval' => Val}, StrictMode); +parse_property(<<16#19, Val, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Request-Response-Information' => Val}, StrictMode); +parse_property(<<16#1A, Bin/binary>>, Props, StrictMode) -> + {Val, Rest} = parse_utf8_string(Bin, StrictMode), + parse_property(Rest, Props#{'Response-Information' => Val}, StrictMode); +parse_property(<<16#1C, Bin/binary>>, Props, StrictMode) -> + {Val, Rest} = parse_utf8_string(Bin, StrictMode), + parse_property(Rest, Props#{'Server-Reference' => Val}, StrictMode); +parse_property(<<16#1F, Bin/binary>>, Props, StrictMode) -> + {Val, Rest} = parse_utf8_string(Bin, StrictMode), + parse_property(Rest, Props#{'Reason-String' => Val}, StrictMode); +parse_property(<<16#21, Val:16/big, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Receive-Maximum' => Val}, StrictMode); +parse_property(<<16#22, Val:16/big, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Topic-Alias-Maximum' => Val}, StrictMode); +parse_property(<<16#23, Val:16/big, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Topic-Alias' => Val}, StrictMode); +parse_property(<<16#24, Val, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Maximum-QoS' => Val}, StrictMode); +parse_property(<<16#25, Val, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Retain-Available' => Val}, StrictMode); +parse_property(<<16#26, Bin/binary>>, Props, StrictMode) -> + {Pair, Rest} = parse_utf8_pair(Bin, StrictMode), case maps:find('User-Property', Props) of {ok, UserProps} -> UserProps1 = lists:append(UserProps, [Pair]), - parse_property(Rest, Props#{'User-Property' := UserProps1}); + parse_property(Rest, Props#{'User-Property' := UserProps1}, StrictMode); error -> - parse_property(Rest, Props#{'User-Property' => [Pair]}) + parse_property(Rest, Props#{'User-Property' => [Pair]}, StrictMode) end; -parse_property(<<16#27, Val:32, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Maximum-Packet-Size' => Val}); -parse_property(<<16#28, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Wildcard-Subscription-Available' => Val}); -parse_property(<<16#29, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Subscription-Identifier-Available' => Val}); -parse_property(<<16#2A, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Shared-Subscription-Available' => Val}). +parse_property(<<16#27, Val:32, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Maximum-Packet-Size' => Val}, StrictMode); +parse_property(<<16#28, Val, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Wildcard-Subscription-Available' => Val}, StrictMode); +parse_property(<<16#29, Val, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Subscription-Identifier-Available' => Val}, StrictMode); +parse_property(<<16#2A, Val, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Shared-Subscription-Available' => Val}, StrictMode); +parse_property(<>, _Props, _StrictMode) -> + error(#{invalid_property_code => Property}). +%% TODO: invalid property in specific packet. parse_variable_byte_integer(Bin) -> parse_variable_byte_integer(Bin, 1, 0). @@ -470,20 +476,53 @@ parse_topic_filters(unsubscribe, Bin) -> parse_reason_codes(Bin) -> [Code || <> <= Bin]. -parse_utf8_pair(<>) -> - {{Key, Val}, Rest}. +%%-------------------- +%% parse utf8 pair +parse_utf8_pair( <> + , true) -> + {{validate_utf8(Key), validate_utf8(Val)}, Rest}; +parse_utf8_pair( <> + , false) -> + {{Key, Val}, Rest}; +parse_utf8_pair(<>, _StrictMode) + when LenK > byte_size(Rest) -> + error(user_property_not_enough_bytes); +parse_utf8_pair(<>, _StrictMode) + when LenV > byte_size(Rest) -> + error(malformed_user_property_value); +parse_utf8_pair(Bin, _StrictMode) + when 4 > byte_size(Bin) -> + error(user_property_not_enough_bytes). -parse_utf8_string(Bin, false) -> +%%-------------------- +%% parse utf8 string +parse_utf8_string(Bin, _StrictMode, false) -> {undefined, Bin}; -parse_utf8_string(Bin, true) -> - parse_utf8_string(Bin). +parse_utf8_string(Bin, StrictMode, true) -> + parse_utf8_string(Bin, StrictMode). -parse_utf8_string(<>) -> - {Str, Rest}. +parse_utf8_string(<>, true) -> + {validate_utf8(Str), Rest}; +parse_utf8_string(<>, false) -> + {Str, Rest}; +parse_utf8_string(<>, _) + when Len > byte_size(Rest) -> + error(malformed_utf8_string); +parse_utf8_string(Bin, _) + when 2 > byte_size(Bin) -> + error(malformed_utf8_string_length). parse_binary_data(<>) -> - {Data, Rest}. + {Data, Rest}; +parse_binary_data(<>) + when Len > byte_size(Rest) -> + error(malformed_binary_data); +parse_binary_data(Bin) + when 2 > byte_size(Bin) -> + error(malformed_binary_data_length). %%-------------------------------------------------------------------- %% Serialize MQTT Packet @@ -821,3 +860,52 @@ fixqos(?PUBREL, 0) -> 1; fixqos(?SUBSCRIBE, 0) -> 1; fixqos(?UNSUBSCRIBE, 0) -> 1; fixqos(_Type, QoS) -> QoS. + +validate_utf8(Bin) -> + case unicode:characters_to_binary(Bin) of + {error, _, _} -> + error(utf8_string_invalid); + {incomplete, _, _} -> + error(utf8_string_invalid); + Bin when is_binary(Bin) -> + case validate_mqtt_utf8_char(Bin) of + true -> Bin; + false -> error(utf8_string_invalid) + end + end. + +%% Is the utf8 string respecting UTF-8 characters defined by MQTT Spec? +%% i.e. contains invalid UTF-8 char or control char +validate_mqtt_utf8_char(<<>>) -> + true; +%% ==== 1-Byte UTF-8 invalid: [[U+0000 .. U+001F] && [U+007F]] +validate_mqtt_utf8_char(<>) + when B1 >= 16#20, B1 =< 16#7E -> + validate_mqtt_utf8_char(Bs); +validate_mqtt_utf8_char(<>) + when B1 >= 16#00, B1 =< 16#1F; + B1 =:= 16#7F -> + %% [U+0000 .. U+001F] && [U+007F] + false; +%% ==== 2-Bytes UTF-8 invalid: [U+0080 .. U+009F] +validate_mqtt_utf8_char(<>) + when B1 =:= 16#C2; + B2 >= 16#A0, B2 =< 16#BF; + B1 > 16#C3, B1 =< 16#DE; + B2 >= 16#80, B2 =< 16#BF -> + validate_mqtt_utf8_char(Bs); +validate_mqtt_utf8_char(<<16#C2, B2, _Bs/binary>>) + when B2 >= 16#80, B2 =< 16#9F -> + %% [U+0080 .. U+009F] + false; +%% ==== 3-Bytes UTF-8 invalid: [U+D800 .. U+DFFF] +validate_mqtt_utf8_char(<>) + when B1 >= 16#E0, B1 =< 16#EE; + B1 =:= 16#EF -> + validate_mqtt_utf8_char(Bs); +validate_mqtt_utf8_char(<<16#ED, _B2, _B3, _Bs/binary>>) -> + false; +%% ==== 4-Bytes UTF-8 +validate_mqtt_utf8_char(<>) + when B1 =:= 16#0F -> + validate_mqtt_utf8_char(Bs). From dce513df0e1cf0cdccd32cf6d3cfbec880983095 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 10 Mar 2022 11:39:44 +0800 Subject: [PATCH 06/39] test(frame): malformed utf-8 packet --- test/emqx_frame_SUITE.erl | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/test/emqx_frame_SUITE.erl b/test/emqx_frame_SUITE.erl index 4b43c8506..d98786e99 100644 --- a/test/emqx_frame_SUITE.erl +++ b/test/emqx_frame_SUITE.erl @@ -45,6 +45,7 @@ groups() -> t_parse_frame_too_large, t_parse_frame_malformed_variable_byte_integer, t_parse_frame_variable_byte_integer, + t_parse_malformed_utf8_string, t_parse_frame_proxy_protocol %% proxy_protocol_config_disabled packet. ]}, {connect, [parallel], @@ -144,6 +145,23 @@ t_parse_frame_variable_byte_integer(_) -> ?catch_error(malformed_variable_byte_integer, emqx_frame:parse_variable_byte_integer(Bin)). +t_parse_malformed_utf8_string(_) -> + MalformedPacket = <<16,31,0,4, + %% Specification name, should be "MQTT" + %% 77,81,84,84, + %% malformed 1-Byte UTF-8 in (U+0000 .. U+001F] && [U+007F]) + 16#00,16#01,16#1F,16#7F, + + 4,194,0,60, + 0,4,101,109, + 113,120,0,5, + 97,100,109,105, + 110,0,6,112, + 117,98,108,105, + 99>>, + ParseState = emqx_frame:initial_parse_state(#{strict_mode => true}), + ?catch_error(utf8_string_invalid, emqx_frame:parse(MalformedPacket, ParseState)). + t_parse_frame_proxy_protocol(_) -> BinList = [ <<"PROXY TCP4 ">>, <<"PROXY TCP6 ">>, <<"PROXY UNKNOWN">> , <<"\r\n\r\n\0\r\nQUIT\n">>], From 8ea84e4a01c836ab8c79165ecc1c6837b8f21f0e Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 10 Mar 2022 12:12:42 +0800 Subject: [PATCH 07/39] chore(appup): update appup.src --- src/emqx.appup.src | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 542c02aa1..95e2f48f2 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -3,6 +3,7 @@ {VSN, [{"4.3.13", [{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, + {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -15,6 +16,7 @@ {load_module,emqx_connection,brutal_purge,soft_purge,[]}]}, {"4.3.12", [{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, + {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -405,6 +407,7 @@ {<<".*">>,[]}], [{"4.3.13", [{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, + {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -417,6 +420,7 @@ {load_module,emqx_connection,brutal_purge,soft_purge,[]}]}, {"4.3.12", [{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, + {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, From 892600f43f7dd38721e4fb43eec0fe354aa684a9 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 10 Mar 2022 13:12:35 +0800 Subject: [PATCH 08/39] docs: update CHANGES-4.3.md --- CHANGES-4.3.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 9063c7129..efd3c5db7 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -23,8 +23,9 @@ File format: to force an immediate reload of all certificates after the files are updated on disk. * Refactor the ExProto so that anonymous clients can also be displayed on the dashboard [#6983] * Force shutdown of processe that cannot answer takeover event [#7026] - * `topic` parameter in bridge configuration can have `${node}` substitution (just like in `clientid` parameter) +* Add UTF-8 string validity check in `strict_mode` for MQTT packet. + When set to true, invalid UTF-8 strings will cause the client to be disconnected. i.e. client ID, topic name. [#7261] ### Bug fixes From f82550ddc6375d3dcf76b4e2bd91b26b343e5b37 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Thu, 10 Mar 2022 23:01:58 +0800 Subject: [PATCH 09/39] chore: Prompt user how to change the dashboard default password when emqx start --- CHANGES-4.3.md | 1 + .../src/emqx_dashboard_admin.erl | 32 ++++++++++++++++--- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index efd3c5db7..6f991e9c0 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -39,6 +39,7 @@ File format: * Fix the MQTT-SN message replay when the topic is not registered to the client [#6970] * Fix rpc get node info maybe crash when other nodes is not ready. * Fix false alert level log “cannot_find_plugins” caused by duplicate plugin names in `loaded_plugins` files. +* Prompt user how to change the dashboard's initial default password when emqx start. ## v4.3.12 ### Important changes diff --git a/lib-ce/emqx_dashboard/src/emqx_dashboard_admin.erl b/lib-ce/emqx_dashboard/src/emqx_dashboard_admin.erl index 94c5c3cda..c9e9bee3c 100644 --- a/lib-ce/emqx_dashboard/src/emqx_dashboard_admin.erl +++ b/lib-ce/emqx_dashboard/src/emqx_dashboard_admin.erl @@ -21,6 +21,7 @@ -behaviour(gen_server). -include("emqx_dashboard.hrl"). +-include_lib("emqx/include/logger.hrl"). -boot_mnesia({mnesia, [boot]}). -copy_mnesia({mnesia, [copy]}). @@ -218,11 +219,34 @@ binenv(Key) -> iolist_to_binary(application:get_env(emqx_dashboard, Key, "")). add_default_user(Username, Password) when ?EMPTY_KEY(Username) orelse ?EMPTY_KEY(Password) -> - igonre; + ignore; add_default_user(Username, Password) -> case lookup_user(Username) of [] -> add_user(Username, Password, <<"administrator">>); - _ -> ok - end. - + _ -> + case check(Username, Password) of + ok -> + ?LOG(warning, + "[Dashboard] The initial default password for dashboard 'admin' user in emqx_dashboard.conf\n" + "For safety, it should be changed as soon as possible.\n" + "Please use the './bin/emqx_ctl admins' CLI to change it.\n" + "Then remove `dashboard.default_user.login/password` from emqx_dashboard.conf" + ); + {error, _} -> + %% We can't force add default, + %% otherwise passwords that have been updated via HTTP API will be reset after reboot. + ?LOG(warning, + "[Dashboard] dashboard.default_user.password in the plugins/emqx_dashboard.conf\n" + "does not match the password in the database(mnesia).\n" + "1. If you have already changed the password via the HTTP API or `./bin/emqx_ctl admins`," + "this warning has no effect.\n" + "You should remove the `dashboard.default_user.login/password` from emqx_dashboard.conf " + "to resolve this warning.\n" + "2. If you just want to update the password by manually changing the configuration file,\n" + "you need to delete the old user and password using `emqx_ctl admins del ~s` first\n" + "the new password in emqx_dashboard.conf can take effect after reboot.", + []) + end + end, + ok. From 223642e62b50247a78e62cacb30a71e156e89b62 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 14 Mar 2022 07:59:38 +0100 Subject: [PATCH 10/39] chore: ensure bash wraper in emqx.service, restart wait 60 seconds --- CHANGES-4.3.md | 2 ++ deploy/packages/emqx.service | 13 ++++++++++--- deploy/packages/rpm/emqx.service | 18 +----------------- 3 files changed, 13 insertions(+), 20 deletions(-) mode change 100644 => 120000 deploy/packages/rpm/emqx.service diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 6f991e9c0..04447e220 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -26,6 +26,7 @@ File format: * `topic` parameter in bridge configuration can have `${node}` substitution (just like in `clientid` parameter) * Add UTF-8 string validity check in `strict_mode` for MQTT packet. When set to true, invalid UTF-8 strings will cause the client to be disconnected. i.e. client ID, topic name. [#7261] +* Changed systemd service restart delay from 10 seconds to 60 seconds. ### Bug fixes @@ -40,6 +41,7 @@ File format: * Fix rpc get node info maybe crash when other nodes is not ready. * Fix false alert level log “cannot_find_plugins” caused by duplicate plugin names in `loaded_plugins` files. * Prompt user how to change the dashboard's initial default password when emqx start. +* Fix errno=13 'Permission denied' Cannot create FIFO boot error in Amazon Linux 2022 (el8 package) ## v4.3.12 ### Important changes diff --git a/deploy/packages/emqx.service b/deploy/packages/emqx.service index ef9abfb01..def74a1a4 100644 --- a/deploy/packages/emqx.service +++ b/deploy/packages/emqx.service @@ -7,11 +7,18 @@ User=emqx Group=emqx Type=forking Environment=HOME=/var/lib/emqx -ExecStart=/usr/bin/emqx start + +# Must use a 'bash' wrap for some OS +# errno=13 'Permission denied' +# Cannot create FIFO ... for writing +ExecStart=bash /usr/bin/emqx start + LimitNOFILE=1048576 -ExecStop=/usr/bin/emqx stop +ExecStop=bash /usr/bin/emqx stop Restart=on-failure -RestartSec=5s + +# When clustered, give the peers enough time to get this node's 'DOWN' event +RestartSec=60s [Install] WantedBy=multi-user.target diff --git a/deploy/packages/rpm/emqx.service b/deploy/packages/rpm/emqx.service deleted file mode 100644 index ef9abfb01..000000000 --- a/deploy/packages/rpm/emqx.service +++ /dev/null @@ -1,17 +0,0 @@ -[Unit] -Description=emqx daemon -After=network.target - -[Service] -User=emqx -Group=emqx -Type=forking -Environment=HOME=/var/lib/emqx -ExecStart=/usr/bin/emqx start -LimitNOFILE=1048576 -ExecStop=/usr/bin/emqx stop -Restart=on-failure -RestartSec=5s - -[Install] -WantedBy=multi-user.target diff --git a/deploy/packages/rpm/emqx.service b/deploy/packages/rpm/emqx.service new file mode 120000 index 000000000..2fc64d79d --- /dev/null +++ b/deploy/packages/rpm/emqx.service @@ -0,0 +1 @@ +../emqx.service \ No newline at end of file From 55eddfa16cc26f7d93a7c82d6f0439485d8fab48 Mon Sep 17 00:00:00 2001 From: Chris Date: Mon, 14 Mar 2022 10:18:18 +0100 Subject: [PATCH 11/39] fix: typo in log message --- src/emqx_cm.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 3a12b5a24..4451d43d0 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -399,7 +399,7 @@ kick_session(Action, ClientId, ChanPid) -> kick_session(ClientId) -> case lookup_channels(ClientId) of [] -> - ?LOG(warning, "kiecked_an_unknown_session ~ts", [ClientId]), + ?LOG(warning, "kicked_an_unknown_session ~ts", [ClientId]), ok; ChanPids -> case length(ChanPids) > 1 of From 440523138bd653b37d0729884e35b940341fd16b Mon Sep 17 00:00:00 2001 From: Chris Date: Mon, 14 Mar 2022 09:20:10 +0100 Subject: [PATCH 12/39] fix: prevent crash when mongodb connection times out --- apps/emqx_auth_mongo/src/emqx_auth_mongo.app.src | 2 +- apps/emqx_auth_mongo/src/emqx_auth_mongo.appup.src | 10 ++++++++-- apps/emqx_auth_mongo/src/emqx_auth_mongo.erl | 3 +++ 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/apps/emqx_auth_mongo/src/emqx_auth_mongo.app.src b/apps/emqx_auth_mongo/src/emqx_auth_mongo.app.src index bf9a5e54c..9eb14fd7d 100644 --- a/apps/emqx_auth_mongo/src/emqx_auth_mongo.app.src +++ b/apps/emqx_auth_mongo/src/emqx_auth_mongo.app.src @@ -1,6 +1,6 @@ {application, emqx_auth_mongo, [{description, "EMQ X Authentication/ACL with MongoDB"}, - {vsn, "4.3.1"}, % strict semver, bump manually! + {vsn, "4.3.2"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_auth_mongo_sup]}, {applications, [kernel,stdlib,mongodb,ecpool]}, diff --git a/apps/emqx_auth_mongo/src/emqx_auth_mongo.appup.src b/apps/emqx_auth_mongo/src/emqx_auth_mongo.appup.src index 24e29d65c..449125709 100644 --- a/apps/emqx_auth_mongo/src/emqx_auth_mongo.appup.src +++ b/apps/emqx_auth_mongo/src/emqx_auth_mongo.appup.src @@ -1,11 +1,17 @@ %% -*- mode: erlang -*- {VSN, - [{"4.3.0", + [{"4.3.1", + [{load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]}]}, + {"4.3.0", [{load_module,emqx_auth_mongo_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]}, {load_module,emqx_acl_mongo,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{"4.3.0", + [{"4.3.1", + [{load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]}]}, + {"4.3.0", [{load_module,emqx_auth_mongo_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]}, {load_module,emqx_acl_mongo,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}] }. diff --git a/apps/emqx_auth_mongo/src/emqx_auth_mongo.erl b/apps/emqx_auth_mongo/src/emqx_auth_mongo.erl index cd1d21b42..307aa3f7f 100644 --- a/apps/emqx_auth_mongo/src/emqx_auth_mongo.erl +++ b/apps/emqx_auth_mongo/src/emqx_auth_mongo.erl @@ -129,6 +129,9 @@ query_multi(Pool, Collection, SelectorList) -> lists:reverse(lists:flatten(lists:foldl(fun(Selector, Acc1) -> Batch = ecpool:with_client(Pool, fun(Conn) -> case mongo_api:find(Conn, Collection, Selector, #{}) of + {error, Reason} -> + ?LOG(error, "[MongoDB] query_multi failed, got error: ~p", [Reason]), + []; [] -> []; {ok, Cursor} -> mc_cursor:foldl(fun(O, Acc2) -> [O|Acc2] end, [], Cursor, 1000) From 3201d1121266020a12a3508b5e0ef290b1b19548 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 14 Mar 2022 17:44:42 +0800 Subject: [PATCH 13/39] feat(mqttsn): introduce subs_resume option As the mqtt-sn v1.2 spec metioned, the gateway will be able to sync the subscriptions topic-name registry to client when the client resume it's session port from: https://github.com/emqx/emqx-sn/pull/195 --- apps/emqx_sn/etc/emqx_sn.conf | 7 ++ apps/emqx_sn/priv/emqx_sn.schema | 5 + apps/emqx_sn/src/emqx_sn_frame.erl | 4 +- apps/emqx_sn/src/emqx_sn_gateway.erl | 144 +++++++++++++++++++++++++-- 4 files changed, 151 insertions(+), 9 deletions(-) diff --git a/apps/emqx_sn/etc/emqx_sn.conf b/apps/emqx_sn/etc/emqx_sn.conf index 6572812c1..655ef4028 100644 --- a/apps/emqx_sn/etc/emqx_sn.conf +++ b/apps/emqx_sn/etc/emqx_sn.conf @@ -51,3 +51,10 @@ mqtt.sn.username = mqtt_sn_user ## ## Value: String mqtt.sn.password = abc + +## Whether to initiate all subscribed topic registration messages to the +## client after the Session has been taken over by a new channel. +## +## Value: Boolean +## Default: false +#mqtt.sn.subs_resume = false diff --git a/apps/emqx_sn/priv/emqx_sn.schema b/apps/emqx_sn/priv/emqx_sn.schema index a585c1037..bd0995c77 100644 --- a/apps/emqx_sn/priv/emqx_sn.schema +++ b/apps/emqx_sn/priv/emqx_sn.schema @@ -56,6 +56,11 @@ end}. {datatype, string} ]}. +{mapping, "mqtt.sn.subs_resume", "emqx_sn.subs_resume", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + {translation, "emqx_sn.username", fun(Conf) -> Username = cuttlefish:conf_get("mqtt.sn.username", Conf), list_to_binary(Username) diff --git a/apps/emqx_sn/src/emqx_sn_frame.erl b/apps/emqx_sn/src/emqx_sn_frame.erl index 28a20956e..7ec18dd4a 100644 --- a/apps/emqx_sn/src/emqx_sn_frame.erl +++ b/apps/emqx_sn/src/emqx_sn_frame.erl @@ -339,9 +339,9 @@ format(?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)) -> format(?SN_PINGREQ_MSG(ClientId)) -> io_lib:format("SN_PINGREQ(ClientId=~s)", [ClientId]); format(?SN_PINGRESP_MSG()) -> - "SN_PINGREQ()"; + "SN_PINGRESP()"; format(?SN_DISCONNECT_MSG(Duration)) -> - io_lib:format("SN_DISCONNECT(Duration=~s)", [Duration]); + io_lib:format("SN_DISCONNECT(Duration=~w)", [Duration]); format(#mqtt_sn_message{type = Type, variable = Var}) -> io_lib:format("mqtt_sn_message(type=~s, Var=~w)", diff --git a/apps/emqx_sn/src/emqx_sn_gateway.erl b/apps/emqx_sn/src/emqx_sn_gateway.erl index 265607229..d5c574f8c 100644 --- a/apps/emqx_sn/src/emqx_sn_gateway.erl +++ b/apps/emqx_sn/src/emqx_sn_gateway.erl @@ -48,6 +48,7 @@ , wait_for_will_topic/3 , wait_for_will_msg/3 , connected/3 + , registering/3 , asleep/3 , awake/3 ]). @@ -96,7 +97,9 @@ has_pending_pingresp = false :: boolean(), %% Store all qos0 messages for waiting REGACK %% Note: QoS1/QoS2 messages will kept inflight queue - pending_topic_ids = #{} :: pending_msgs() + pending_topic_ids = #{} :: pending_msgs(), + waiting_sync_topics = [], + previous_outgoings_and_state = undefined }). -define(INFO_KEYS, [socktype, peername, sockname, sockstate]). %, active_n]). @@ -126,6 +129,9 @@ Reason =:= asleep_timeout; Reason =:= keepalive_timeout). +-define(RETRY_TIMEOUT, 5000). +-define(MAX_RETRY_TIMES, 3). + %%-------------------------------------------------------------------- %% Exported APIs %%-------------------------------------------------------------------- @@ -379,6 +385,13 @@ connected(cast, {outgoing, Packet}, State) -> connected(cast, {connack, ConnAck}, State) -> {keep_state, handle_outgoing(ConnAck, State)}; +connected(cast, {register, TopicNames, BlockedOutgoins}, State) -> + NState = State#state{ + waiting_sync_topics = TopicNames, + previous_outgoings_and_state = {BlockedOutgoins, ?FUNCTION_NAME} + }, + {next_state, registering, NState, [next_event(shooting)]}; + connected(cast, {shutdown, Reason, Packet}, State) -> stop(Reason, handle_outgoing(Packet, State)); @@ -392,6 +405,80 @@ connected(cast, {close, Reason}, State) -> connected(EventType, EventContent, State) -> handle_event(EventType, EventContent, connected, State). +registering(cast, shooting, + State = #state{ + channel = Channel, + waiting_sync_topics = [], + previous_outgoings_and_state = {Outgoings, StateName}}) -> + Session = emqx_channel:get_session(Channel), + ClientInfo = emqx_channel:info(clientinfo, Channel), + {Outgoings2, NChannel} = + case emqx_session:dequeue(ClientInfo, Session) of + {ok, NSession} -> + {[], emqx_channel:set_session(NSession, Channel)}; + {ok, Pubs, NSession} -> + emqx_channel:do_deliver( + Pubs, + emqx_channel:set_session(NSession, Channel) + ) + end, + NState = State#state{ + channel = NChannel, + previous_outgoings_and_state = undefined}, + {next_state, StateName, NState, outgoing_events(Outgoings ++ Outgoings2)}; + +registering(cast, shooting, + State = #state{ + clientid = ClientId, + waiting_sync_topics = [TopicName | Remainings]}) -> + TopicId = emqx_sn_registry:lookup_topic_id(ClientId, TopicName), + NState = send_register( + TopicName, + TopicId, + 16#FFFF, %% FIXME: msgid ? + State#state{waiting_sync_topics = [{TopicId, TopicName, 0} | Remainings]} + ), + {keep_state, NState, {{timeout, wait_regack}, ?RETRY_TIMEOUT, nocontent}}; + +registering(cast, {incoming, ?SN_REGACK_MSG(TopicId, _MsgId, ?SN_RC_ACCEPTED)}, + State = #state{waiting_sync_topics = [{TopicId, TopicName, _} | Remainings]}) -> + ?LOG(debug, "Register topic name ~s with id ~w successfully!", [TopicName, TopicId]), + {keep_state, State#state{waiting_sync_topics = Remainings}, [next_event(shooting)]}; + +registering(cast, {incoming, ?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)}, + State = #state{waiting_sync_topics = [{TopicId, TopicName, _} | Remainings]}) -> + ?LOG(error, "client does not accept register TopicName=~s, TopicId=~p, MsgId=~p, ReturnCode=~p", + [TopicName, TopicId, MsgId, ReturnCode]), + {keep_state, State#state{waiting_sync_topics = Remainings}, [next_event(shooting)]}; + +registering(cast, {incoming, Packet}, + State = #state{previous_outgoings_and_state = {_, StateName}}) + when is_record(Packet, mqtt_sn_message) -> + apply(?MODULE, StateName, [cast, {incoming, Packet}, State]); + +registering({timeout, wait_regack}, _, + State = #state{waiting_sync_topics = [{TopicId, TopicName, Times} | Remainings]}) + when Times < ?MAX_RETRY_TIMES -> + ?LOG(warning, "Waiting REGACK timeout for TopicName=~s, TopicId=~w, try it again(~w)", + [TopicName, TopicId, Times+1]), + NState = send_register( + TopicName, + TopicId, + 16#FFFF, %% FIXME: msgid? + State#state{waiting_sync_topics = [{TopicId, TopicName, Times + 1} | Remainings]} + ), + {keep_state, NState, {{timeout, wait_regack}, ?RETRY_TIMEOUT, nocontent}}; + +registering({timeout, wait_regack}, _, + State = #state{waiting_sync_topics = [{TopicId, TopicName, ?MAX_RETRY_TIMES} | _]}) -> + ?LOG(error, "Retry register TopicName=~s, TopicId=~w reached the max retry times", + [TopicId, TopicName]), + NState = send_message(?SN_DISCONNECT_MSG(undefined), State), + stop(reached_max_retry_times, NState); + +registering(EventType, EventContent, State) -> + handle_event(EventType, EventContent, ?FUNCTION_NAME, State). + asleep(cast, {incoming, ?SN_DISCONNECT_MSG(Duration)}, State) -> State0 = send_message(?SN_DISCONNECT_MSG(undefined), State), case Duration of @@ -519,10 +606,13 @@ handle_event(info, {datagram, SockPid, Data}, StateName, stop(frame_error, State) end; -handle_event(info, {deliver, _Topic, Msg}, asleep, - State = #state{channel = Channel}) -> +handle_event(info, {deliver, _Topic, Msg}, StateName, + State = #state{channel = Channel}) + when StateName == alseep; + StateName == registering -> % section 6.14, Support of sleeping clients - ?LOG(debug, "enqueue downlink message in asleep state, msg: ~0p", [Msg]), + ?LOG(debug, "enqueue downlink message in ~s state, msg: ~0p", + [StateName, Msg]), Session = emqx_session:enqueue(emqx_channel:info(clientinfo, Channel), Msg, emqx_channel:get_session(Channel)), {keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}}; @@ -643,6 +733,9 @@ outgoing_event(Packet) when is_record(Packet, mqtt_packet); outgoing_event(Action) -> next_event(Action). +next_event(Content) -> + {next_event, cast, Content}. + close_socket(State = #state{sockstate = closed}) -> State; close_socket(State = #state{socket = _Socket}) -> %ok = gen_udp:close(Socket), @@ -1058,6 +1151,38 @@ handle_incoming(#mqtt_packet{variable = #mqtt_packet_puback{}} = Packet, awake, Result = channel_handle_in(Packet, State), handle_return(Result, State, [try_goto_asleep]); +handle_incoming( + #mqtt_packet{ + variable = #mqtt_packet_connect{ + clean_start = false} + } = Packet, + _, + State) -> + Result = channel_handle_in(Packet, State), + case {subs_resume(), Result} of + {true, {ok, Replies, NChannel}} -> + case maps:get( + subscriptions, + emqx_channel:info(session, NChannel) + ) of + Subs when map_size(Subs) == 0 -> + handle_return(Result, State); + Subs -> + TopicNames = lists:filter( + fun(T) -> not emqx_topic:wildcard(T) + end, maps:keys(Subs)), + {ConnackEvents, Outgoings} = split_connack_replies( + Replies), + Events = outgoing_events( + ConnackEvents ++ + [{register, TopicNames, Outgoings}] + ), + {keep_state, State#state{channel = NChannel}, Events} + end; + _ -> + handle_return(Result, State) + end; + handle_incoming(Packet, _StName, State) -> Result = channel_handle_in(Packet, State), handle_return(Result, State). @@ -1167,9 +1292,6 @@ inc_outgoing_stats(Type) -> false -> ok end. -next_event(Content) -> - {next_event, cast, Content}. - inc_counter(Key, Inc) -> _ = emqx_pd:inc_counter(Key, Inc), ok. @@ -1183,3 +1305,11 @@ maybe_send_puback(?QOS_0, _TopicId, _MsgId, _ReasonCode, State) -> State; maybe_send_puback(_QoS, TopicId, MsgId, ReasonCode, State) -> send_message(?SN_PUBACK_MSG(TopicId, MsgId, ReasonCode), State). + +subs_resume() -> + application:get_env(emqx_sn, subs_resume, false). + +%% Replies = [{event, connected}, {connack, ConnAck}, {outgoing, Pkts}] +split_connack_replies([A = {event, connected}, + B = {connack, _ConnAck} | Outgoings]) -> + {[A, B], Outgoings}. From d4c1b3acc634fcd796149620a749c8d02c5cc99b Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 14 Mar 2022 17:45:46 +0800 Subject: [PATCH 14/39] test(mqttsn): more tests for topic register and subs_resume --- apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl | 427 ++++++++++++++----- 1 file changed, 324 insertions(+), 103 deletions(-) diff --git a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl index 9ff519fb5..9fd731f28 100644 --- a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl @@ -862,108 +862,6 @@ t_delivery_qos1_register_invalid_topic_id(_) -> ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), gen_udp:close(Socket). -t_delivery_takeover_and_re_register(_) -> - MsgId = 1, - {ok, Socket} = gen_udp:open(0, [binary]), - send_connect_msg(Socket, <<"test">>, 0), - ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, - receive_response(Socket)), - - send_subscribe_msg_normal_topic(Socket, ?QOS_1, <<"topic-a">>, MsgId+1), - <<_, ?SN_SUBACK, 2#00100000, - TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), - - send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2), - <<_, ?SN_SUBACK, 2#01000000, - TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), - - _ = emqx:publish( - emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)), - _ = emqx:publish( - emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"test-b">>)), - - <<_, ?SN_PUBLISH, 2#00100000, - TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket), - send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED), - - <<_, ?SN_PUBLISH, 2#01000000, - TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket), - send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED), - - send_disconnect_msg(Socket, undefined), - ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), - gen_udp:close(Socket), - - %% offline messages will be queued into the MQTT-SN session - _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m1">>)), - _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)), - _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m3">>)), - _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m1">>)), - _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m2">>)), - _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)), - - {ok, NSocket} = gen_udp:open(0, [binary]), - send_connect_msg(NSocket, <<"test">>, 0), - ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, - receive_response(NSocket)), - - %% qos1 - - %% received the resume messages - <<_, ?SN_PUBLISH, 2#00100000, - TopicIdA:16, MsgIdA0:16, "m1">> = receive_response(NSocket), - %% only one qos1/qos2 inflight - ?assertEqual(udp_receive_timeout, receive_response(NSocket)), - send_puback_msg(NSocket, TopicIdA, MsgIdA0, ?SN_RC_INVALID_TOPIC_ID), - %% recv register - <<_, ?SN_REGISTER, - TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket), - send_regack_msg(NSocket, TopicIdA, RegMsgIdA), - %% received the replay messages - <<_, ?SN_PUBLISH, 2#00100000, - TopicIdA:16, MsgIdA1:16, "m1">> = receive_response(NSocket), - send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED), - - <<_, ?SN_PUBLISH, 2#00100000, - TopicIdA:16, MsgIdA2:16, "m2">> = receive_response(NSocket), - send_puback_msg(NSocket, TopicIdA, MsgIdA2, ?SN_RC_ACCEPTED), - - <<_, ?SN_PUBLISH, 2#00100000, - TopicIdA:16, MsgIdA3:16, "m3">> = receive_response(NSocket), - send_puback_msg(NSocket, TopicIdA, MsgIdA3, ?SN_RC_ACCEPTED), - - %% qos2 - <<_, ?SN_PUBLISH, 2#01000000, - TopicIdB:16, MsgIdB0:16, "m1">> = receive_response(NSocket), - %% only one qos1/qos2 inflight - ?assertEqual(udp_receive_timeout, receive_response(NSocket)), - send_puback_msg(NSocket, TopicIdB, MsgIdB0, ?SN_RC_INVALID_TOPIC_ID), - %% recv register - <<_, ?SN_REGISTER, - TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), - send_regack_msg(NSocket, TopicIdB, RegMsgIdB), - %% received the replay messages - <<_, ?SN_PUBLISH, 2#01000000, - TopicIdB:16, MsgIdB1:16, "m1">> = receive_response(NSocket), - send_pubrec_msg(NSocket, MsgIdB1), - <<_, ?SN_PUBREL, MsgIdB1:16>> = receive_response(NSocket), - send_pubcomp_msg(NSocket, MsgIdB1), - - <<_, ?SN_PUBLISH, 2#01000000, - TopicIdB:16, MsgIdB2:16, "m2">> = receive_response(NSocket), - send_puback_msg(NSocket, TopicIdB, MsgIdB2, ?SN_RC_ACCEPTED), - - <<_, ?SN_PUBLISH, 2#01000000, - TopicIdB:16, MsgIdB3:16, "m3">> = receive_response(NSocket), - send_puback_msg(NSocket, TopicIdB, MsgIdB3, ?SN_RC_ACCEPTED), - - %% no more messages - ?assertEqual(udp_receive_timeout, receive_response(NSocket)), - - send_disconnect_msg(NSocket, undefined), - ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket)), - gen_udp:close(NSocket). - t_will_case01(_) -> QoS = 1, Duration = 1, @@ -1725,6 +1623,326 @@ t_broadcast_test1(_) -> timer:sleep(600), gen_udp:close(Socket). +t_register_subs_resume_on(_) -> + application:set_env(emqx_sn, subs_resume, true), + MsgId = 1, + {ok, Socket} = gen_udp:open(0, [binary]), + send_connect_msg(Socket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(Socket)), + + send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-a">>, MsgId+1), + <<_, ?SN_SUBACK, 2#01000000, + TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2), + <<_, ?SN_SUBACK, 2#01000000, + TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + _ = emqx:publish( + emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)), + _ = emqx:publish( + emqx_message:make(test, ?QOS_1, <<"topic-b">>, <<"test-b">>)), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket), + send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket), + send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED), + + send_disconnect_msg(Socket, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + gen_udp:close(Socket), + + %% offline messages will be queued into the MQTT-SN session + _ = emqx:publish(emqx_message:make(test, ?QOS_0, <<"topic-a">>, <<"m1">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-a">>, <<"m3">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_0, <<"topic-b">>, <<"m1">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-b">>, <<"m2">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)), + + {ok, NSocket} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket)), + + %% receive subs register requests + <<_, ?SN_REGISTER, + TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket), + send_regack_msg(NSocket, TopicIdA, RegMsgIdA), + + <<_, ?SN_REGISTER, + TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), + send_regack_msg(NSocket, TopicIdB, RegMsgIdB), + + %% receive the queued messages + + <<_, ?SN_PUBLISH, 2#00000000, + TopicIdA:16, 0:16, "m1">> = receive_response(NSocket), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA1:16, "m2">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdA:16, MsgIdA2:16, "m3">> = receive_response(NSocket), + send_pubrec_msg(NSocket, MsgIdA2), + <<_, ?SN_PUBREL, MsgIdA2:16>> = receive_response(NSocket), + send_pubcomp_msg(NSocket, MsgIdA2), + + + <<_, ?SN_PUBLISH, 2#00000000, + TopicIdB:16, 0:16, "m1">> = receive_response(NSocket), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdB:16, MsgIdB1:16, "m2">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdB, MsgIdB1, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB2:16, "m3">> = receive_response(NSocket), + send_pubrec_msg(NSocket, MsgIdB2), + <<_, ?SN_PUBREL, MsgIdB2:16>> = receive_response(NSocket), + send_pubcomp_msg(NSocket, MsgIdB2), + + %% no more messages + ?assertEqual(udp_receive_timeout, receive_response(NSocket)), + + application:set_env(emqx_sn, subs_resume, false), + + gen_udp:close(NSocket), + {ok, NSocket1} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket1, <<"test">>), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket1)), + send_disconnect_msg(NSocket1, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)), + gen_udp:close(NSocket1). + +t_register_subs_resume_off(_) -> + MsgId = 1, + {ok, Socket} = gen_udp:open(0, [binary]), + send_connect_msg(Socket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(Socket)), + + send_subscribe_msg_normal_topic(Socket, ?QOS_1, <<"topic-a">>, MsgId+1), + <<_, ?SN_SUBACK, 2#00100000, + TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2), + <<_, ?SN_SUBACK, 2#01000000, + TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + _ = emqx:publish( + emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)), + _ = emqx:publish( + emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"test-b">>)), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket), + send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket), + send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED), + + send_disconnect_msg(Socket, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + gen_udp:close(Socket), + + %% offline messages will be queued into the MQTT-SN session + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m1">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m3">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m1">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m2">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)), + + {ok, NSocket} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket)), + + %% qos1 + + %% received the resume messages + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA0:16, "m1">> = receive_response(NSocket), + %% only one qos1/qos2 inflight + ?assertEqual(udp_receive_timeout, receive_response(NSocket)), + send_puback_msg(NSocket, TopicIdA, MsgIdA0, ?SN_RC_INVALID_TOPIC_ID), + %% recv register + <<_, ?SN_REGISTER, + TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket), + send_regack_msg(NSocket, TopicIdA, RegMsgIdA), + %% received the replay messages + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA1:16, "m1">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA2:16, "m2">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdA, MsgIdA2, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA3:16, "m3">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdA, MsgIdA3, ?SN_RC_ACCEPTED), + + %% qos2 + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB0:16, "m1">> = receive_response(NSocket), + %% only one qos1/qos2 inflight + ?assertEqual(udp_receive_timeout, receive_response(NSocket)), + send_puback_msg(NSocket, TopicIdB, MsgIdB0, ?SN_RC_INVALID_TOPIC_ID), + %% recv register + <<_, ?SN_REGISTER, + TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), + send_regack_msg(NSocket, TopicIdB, RegMsgIdB), + %% received the replay messages + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB1:16, "m1">> = receive_response(NSocket), + send_pubrec_msg(NSocket, MsgIdB1), + <<_, ?SN_PUBREL, MsgIdB1:16>> = receive_response(NSocket), + send_pubcomp_msg(NSocket, MsgIdB1), + + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB2:16, "m2">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdB, MsgIdB2, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB3:16, "m3">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdB, MsgIdB3, ?SN_RC_ACCEPTED), + + %% no more messages + ?assertEqual(udp_receive_timeout, receive_response(NSocket)), + + gen_udp:close(NSocket), + {ok, NSocket1} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket1, <<"test">>), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket1)), + send_disconnect_msg(NSocket1, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)), + gen_udp:close(NSocket1). + +t_register_skip_failure_topic_name_and_reach_max_retry_times(_) -> + application:set_env(emqx_sn, subs_resume, true), + MsgId = 1, + {ok, Socket} = gen_udp:open(0, [binary]), + send_connect_msg(Socket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(Socket)), + + send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-a">>, MsgId+1), + <<_, ?SN_SUBACK, 2#01000000, + TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2), + <<_, ?SN_SUBACK, 2#01000000, + TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + send_disconnect_msg(Socket, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + gen_udp:close(Socket), + + {ok, NSocket} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket)), + + %% receive subs register requests + + %% registered failured topic-name will be skipped + <<_, ?SN_REGISTER, + TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket), + send_regack_msg(NSocket, TopicIdA, RegMsgIdA, ?SN_RC_INVALID_TOPIC_ID), + + %% the gateway try to shutdown this client if it reached max-retry-times + %% + %% times-0 + <<_, ?SN_REGISTER, + TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), + %% times-1 + timer:sleep(5000), %% RETYRY_TIMEOUT + <<_, ?SN_REGISTER, + TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), + %% times-2 + timer:sleep(5000), %% RETYRY_TIMEOUT + <<_, ?SN_REGISTER, + TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), + %% just a ping + send_pingreq_msg(NSocket, <<"test">>), + ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(NSocket)), + %% times-3 + timer:sleep(5000), %% RETYRY_TIMEOUT + <<_, ?SN_REGISTER, + TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), + %% shutdown due to reached max retry times + timer:sleep(5000), %% RETYRY_TIMEOUT + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket)), + application:set_env(emqx_sn, subs_resume, false), + gen_udp:close(NSocket). + +t_register_enqueue_delivering_messages(_) -> + application:set_env(emqx_sn, subs_resume, true), + MsgId = 1, + {ok, Socket} = gen_udp:open(0, [binary]), + send_connect_msg(Socket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(Socket)), + + send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-a">>, MsgId+1), + <<_, ?SN_SUBACK, 2#01000000, + TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + send_disconnect_msg(Socket, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + gen_udp:close(Socket), + + emqx_logger:set_log_level(debug), + + {ok, NSocket} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket)), + + %% receive subs register requests + + %% registered failured topic-name will be skipped + <<_, ?SN_REGISTER, + TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket), + + _ = emqx:publish(emqx_message:make(test, ?QOS_0, <<"topic-a">>, <<"m1">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)), + + send_regack_msg(NSocket, TopicIdA, RegMsgIdA, ?SN_RC_ACCEPTED), + + %% receive the queued messages + + <<_, ?SN_PUBLISH, 2#00000000, + TopicIdA:16, 0:16, "m1">> = receive_response(NSocket), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA1:16, "m2">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED), + + %% no more messages + ?assertEqual(udp_receive_timeout, receive_response(NSocket)), + + application:set_env(emqx_sn, subs_resume, false), + + gen_udp:close(NSocket), + {ok, NSocket1} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket1, <<"test">>), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket1)), + send_disconnect_msg(NSocket1, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)), + gen_udp:close(NSocket1). + %%-------------------------------------------------------------------- %% Helper funcs %%-------------------------------------------------------------------- @@ -1816,9 +2034,12 @@ send_register_msg(Socket, TopicName, MsgId) -> ok = gen_udp:send(Socket, ?HOST, ?PORT, RegisterPacket). send_regack_msg(Socket, TopicId, MsgId) -> + send_regack_msg(Socket, TopicId, MsgId, ?SN_RC_ACCEPTED). + +send_regack_msg(Socket, TopicId, MsgId, Rc) -> Length = 7, MsgType = ?SN_REGACK, - Packet = <>, + Packet = <>, ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet). send_publish_msg_normal_topic(Socket, QoS, MsgId, TopicId, Data) -> From fcf1178f3b0e80c34fee70e707eba55ca26937f7 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 14 Mar 2022 17:48:17 +0800 Subject: [PATCH 15/39] chore: update CHANGES-4.3.md --- CHANGES-4.3.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 04447e220..a8f0ac8ba 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -27,6 +27,7 @@ File format: * Add UTF-8 string validity check in `strict_mode` for MQTT packet. When set to true, invalid UTF-8 strings will cause the client to be disconnected. i.e. client ID, topic name. [#7261] * Changed systemd service restart delay from 10 seconds to 60 seconds. +* MQTT-SN gateway supports initiative to synchronize registered topics after session resumed. [#7300] ### Bug fixes From f8b7b9415d16d13ba594355e24f59eff7e8942c1 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 14 Mar 2022 18:36:23 +0800 Subject: [PATCH 16/39] chore(mqttsn): update appup.src --- apps/emqx_sn/src/emqx_sn.appup.src | 34 +++++++++++++++++++++------- apps/emqx_sn/src/emqx_sn_gateway.erl | 25 ++++++++++++++++++-- 2 files changed, 49 insertions(+), 10 deletions(-) diff --git a/apps/emqx_sn/src/emqx_sn.appup.src b/apps/emqx_sn/src/emqx_sn.appup.src index 6a4eb66d1..7b0bc91c3 100644 --- a/apps/emqx_sn/src/emqx_sn.appup.src +++ b/apps/emqx_sn/src/emqx_sn.appup.src @@ -1,26 +1,44 @@ %% -*- mode: erlang -*- {VSN, [ - {<<"4\\.3\\.[4-5]">>,[ + {"4.3.5",[ {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, - {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} + {update,emqx_sn_gateway,{advance,["4.3.5"]}} ]}, - {<<"4.3.[2-3]">>,[ + {"4.3.4",[ + {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {update,emqx_sn_gateway,{advance,["4.3.4"]}} + ]}, + {"4.3.3",[ {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, - {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} + {update,emqx_sn_gateway,{advance, ["4.3.3"]}} + ]}, + {"4.3.2",[ + {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {update,emqx_sn_gateway,{advance, ["4.3.2"]}} ]}, {<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]} ], [ - {<<"4\\.3\\.[4-5]">>,[ + {"4.3.5",[ {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, - {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} + {update,emqx_sn_gateway,{advance,["4.3.5"]}} ]}, - {<<"4.3.[2-3]">>,[ + {"4.3.4",[ + {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {update,emqx_sn_gateway,{advance,["4.3.4"]}} + ]}, + {"4.3.3",[ {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, - {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} + {update,emqx_sn_gateway,{advance, ["4.3.3"]}} + ]}, + {"4.3.2",[ + {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {update,emqx_sn_gateway,{advance, ["4.3.2"]}} ]}, {<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]} ]}. diff --git a/apps/emqx_sn/src/emqx_sn_gateway.erl b/apps/emqx_sn/src/emqx_sn_gateway.erl index d5c574f8c..3e8c18dc9 100644 --- a/apps/emqx_sn/src/emqx_sn_gateway.erl +++ b/apps/emqx_sn/src/emqx_sn_gateway.erl @@ -683,8 +683,29 @@ terminate(Reason, _StateName, #state{channel = Channel}) -> emqx_channel:terminate(Reason, Channel), ok. -code_change(_Vsn, StateName, State, _Extra) -> - {ok, StateName, State}. +%% in the emqx_sn:v4.3.6, we have added two new fields in the state last: +%% - waiting_sync_topics +%% - previous_outgoings_and_state +code_change({down, _Vsn}, StateName, State, [ToVsn]) -> + case re:run(ToVsn, "4\\.3\\.[2-5]") of + {match, _} -> + NState0 = lists:droplast(lists:droplast(tuple_to_list(State))), + NState = list_to_tuple(lists:reverse(NState0)), + {ok, StateName, NState}; + _ -> + {ok, StateName, State} + end; + +code_change(_Vsn, StateName, State, [FromVsn]) -> + case re:run(FromVsn, "4\\.3\\.[2-5]") of + {match, _} -> + NState = list_to_tuple( + tuple_to_list(State) ++ [[], undefined] + ), + {ok, StateName, NState}; + _ -> + {ok, StateName, State} + end. %%-------------------------------------------------------------------- %% Handle Call/Info From 3823ab8693420892f0b32be78cc29d007a70d5d6 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 14 Mar 2022 19:10:05 +0800 Subject: [PATCH 17/39] fix: typos in emqx_sn.appup.src --- apps/emqx_sn/src/emqx_sn.appup.src | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/apps/emqx_sn/src/emqx_sn.appup.src b/apps/emqx_sn/src/emqx_sn.appup.src index 7b0bc91c3..7a8679db1 100644 --- a/apps/emqx_sn/src/emqx_sn.appup.src +++ b/apps/emqx_sn/src/emqx_sn.appup.src @@ -3,42 +3,42 @@ [ {"4.3.5",[ {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, - {update,emqx_sn_gateway,{advance,["4.3.5"]}} + {update,emqx_sn_gateway,{advanced,["4.3.5"]}} ]}, {"4.3.4",[ {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, - {update,emqx_sn_gateway,{advance,["4.3.4"]}} + {update,emqx_sn_gateway,{advanced,["4.3.4"]}} ]}, {"4.3.3",[ {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, - {update,emqx_sn_gateway,{advance, ["4.3.3"]}} + {update,emqx_sn_gateway,{advanced,["4.3.3"]}} ]}, {"4.3.2",[ {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, - {update,emqx_sn_gateway,{advance, ["4.3.2"]}} + {update,emqx_sn_gateway,{advanced,["4.3.2"]}} ]}, {<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]} ], [ {"4.3.5",[ {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, - {update,emqx_sn_gateway,{advance,["4.3.5"]}} + {update,emqx_sn_gateway,{advanced,["4.3.5"]}} ]}, {"4.3.4",[ {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, - {update,emqx_sn_gateway,{advance,["4.3.4"]}} + {update,emqx_sn_gateway,{advanced,["4.3.4"]}} ]}, {"4.3.3",[ {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, - {update,emqx_sn_gateway,{advance, ["4.3.3"]}} + {update,emqx_sn_gateway,{advanced,["4.3.3"]}} ]}, {"4.3.2",[ {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, - {update,emqx_sn_gateway,{advance, ["4.3.2"]}} + {update,emqx_sn_gateway,{advanced,["4.3.2"]}} ]}, {<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]} ]}. From af65310ce77f680e6573c0379bdc63a2ff42b5a7 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 15 Mar 2022 10:40:37 +0800 Subject: [PATCH 18/39] chore(script): update_appup.escript support the update cmd We often use the advanced directive `update` when hot upgrading gen_server, gen_statem, and other such processes, and it will be parsed as: ``` {suspend,[Mod]}, {load,{Mod,brutal_purge,brutal_purge}}, {code_change,up,[{Mod,[Extra]}]}, {resume,[Mod]}, ``` So, we should treat the update instruction as having completed the upgrade of this module. --- scripts/update_appup.escript | 2 ++ 1 file changed, 2 insertions(+) diff --git a/scripts/update_appup.escript b/scripts/update_appup.escript index 9083b62f1..840f63509 100755 --- a/scripts/update_appup.escript +++ b/scripts/update_appup.escript @@ -324,6 +324,8 @@ process_old_action({add_module, Module}) -> [Module]; process_old_action({delete_module, Module}) -> [Module]; +process_old_action({update, Module, _Change}) -> + [Module]; process_old_action(LoadModule) when is_tuple(LoadModule) andalso element(1, LoadModule) =:= load_module -> element(2, LoadModule); From a3d8981635b0179b421fe968ff530b93c30a5c1a Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 15 Mar 2022 11:28:40 +0800 Subject: [PATCH 19/39] refactor(mqttsn): assign subs_resume to mqtt-sn client process state --- apps/emqx_sn/src/emqx_sn.appup.src | 8 ++++++ apps/emqx_sn/src/emqx_sn_app.erl | 2 ++ apps/emqx_sn/src/emqx_sn_gateway.erl | 20 ++++++++------- apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl | 26 +++++++++++--------- 4 files changed, 36 insertions(+), 20 deletions(-) diff --git a/apps/emqx_sn/src/emqx_sn.appup.src b/apps/emqx_sn/src/emqx_sn.appup.src index 7a8679db1..269605afa 100644 --- a/apps/emqx_sn/src/emqx_sn.appup.src +++ b/apps/emqx_sn/src/emqx_sn.appup.src @@ -3,20 +3,24 @@ [ {"4.3.5",[ {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, {update,emqx_sn_gateway,{advanced,["4.3.5"]}} ]}, {"4.3.4",[ {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, {update,emqx_sn_gateway,{advanced,["4.3.4"]}} ]}, {"4.3.3",[ {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, {update,emqx_sn_gateway,{advanced,["4.3.3"]}} ]}, {"4.3.2",[ {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, {update,emqx_sn_gateway,{advanced,["4.3.2"]}} ]}, {<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]} @@ -24,20 +28,24 @@ [ {"4.3.5",[ {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, {update,emqx_sn_gateway,{advanced,["4.3.5"]}} ]}, {"4.3.4",[ {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, {update,emqx_sn_gateway,{advanced,["4.3.4"]}} ]}, {"4.3.3",[ {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, {update,emqx_sn_gateway,{advanced,["4.3.3"]}} ]}, {"4.3.2",[ {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, {update,emqx_sn_gateway,{advanced,["4.3.2"]}} ]}, {<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]} diff --git a/apps/emqx_sn/src/emqx_sn_app.erl b/apps/emqx_sn/src/emqx_sn_app.erl index 9575523f8..e7c86d5b7 100644 --- a/apps/emqx_sn/src/emqx_sn_app.erl +++ b/apps/emqx_sn/src/emqx_sn_app.erl @@ -122,12 +122,14 @@ listeners_confs() -> EnableQos3 = application:get_env(emqx_sn, enable_qos3, false), EnableStats = application:get_env(emqx_sn, enable_stats, false), IdleTimeout = application:get_env(emqx_sn, idle_timeout, 30000), + SubsResume = application:get_env(emqx_sn, subs_resume, false), [{udp, ListenOn, [{gateway_id, GwId}, {username, Username}, {password, Password}, {enable_qos3, EnableQos3}, {enable_stats, EnableStats}, {idle_timeout, IdleTimeout}, + {subs_resume, SubsResume}, {max_connections, 1024000}, {max_conn_rate, 1000}, {udp_options, []}]}]. diff --git a/apps/emqx_sn/src/emqx_sn_gateway.erl b/apps/emqx_sn/src/emqx_sn_gateway.erl index 3e8c18dc9..80a1339c0 100644 --- a/apps/emqx_sn/src/emqx_sn_gateway.erl +++ b/apps/emqx_sn/src/emqx_sn_gateway.erl @@ -98,6 +98,7 @@ %% Store all qos0 messages for waiting REGACK %% Note: QoS1/QoS2 messages will kept inflight queue pending_topic_ids = #{} :: pending_msgs(), + subs_resume = false, waiting_sync_topics = [], previous_outgoings_and_state = undefined }). @@ -158,6 +159,7 @@ init([{_, SockPid, Sock}, Peername, Options]) -> Password = proplists:get_value(password, Options, undefined), EnableQos3 = proplists:get_value(enable_qos3, Options, false), IdleTimeout = proplists:get_value(idle_timeout, Options, 30000), + SubsResume = proplists:get_value(subs_resume, Options, false), EnableStats = proplists:get_value(enable_stats, Options, false), case inet:sockname(Sock) of {ok, Sockname} -> @@ -174,7 +176,8 @@ init([{_, SockPid, Sock}, Peername, Options]) -> asleep_timer = emqx_sn_asleep_timer:init(), enable_stats = EnableStats, enable_qos3 = EnableQos3, - idle_timeout = IdleTimeout + idle_timeout = IdleTimeout, + subs_resume = SubsResume }, emqx_logger:set_metadata_peername(esockd:format(Peername)), {ok, idle, State, [IdleTimeout]}; @@ -689,8 +692,10 @@ terminate(Reason, _StateName, #state{channel = Channel}) -> code_change({down, _Vsn}, StateName, State, [ToVsn]) -> case re:run(ToVsn, "4\\.3\\.[2-5]") of {match, _} -> - NState0 = lists:droplast(lists:droplast(tuple_to_list(State))), - NState = list_to_tuple(lists:reverse(NState0)), + NState0 = lists:droplast( + lists:droplast( + lists:droplast(tuple_to_list(State)))), + NState = list_to_tuple(NState0), {ok, StateName, NState}; _ -> {ok, StateName, State} @@ -700,7 +705,7 @@ code_change(_Vsn, StateName, State, [FromVsn]) -> case re:run(FromVsn, "4\\.3\\.[2-5]") of {match, _} -> NState = list_to_tuple( - tuple_to_list(State) ++ [[], undefined] + tuple_to_list(State) ++ [false, [], undefined] ), {ok, StateName, NState}; _ -> @@ -1178,9 +1183,9 @@ handle_incoming( clean_start = false} } = Packet, _, - State) -> + State = #state{subs_resume = SubsResume}) -> Result = channel_handle_in(Packet, State), - case {subs_resume(), Result} of + case {SubsResume, Result} of {true, {ok, Replies, NChannel}} -> case maps:get( subscriptions, @@ -1327,9 +1332,6 @@ maybe_send_puback(?QOS_0, _TopicId, _MsgId, _ReasonCode, State) -> maybe_send_puback(_QoS, TopicId, MsgId, ReasonCode, State) -> send_message(?SN_PUBACK_MSG(TopicId, MsgId, ReasonCode), State). -subs_resume() -> - application:get_env(emqx_sn, subs_resume, false). - %% Replies = [{event, connected}, {connack, ConnAck}, {outgoing, Pkts}] split_connack_replies([A = {event, connected}, B = {connack, _ConnAck} | Outgoings]) -> diff --git a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl index 9fd731f28..cdecc06bb 100644 --- a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl @@ -79,6 +79,12 @@ set_special_confs(emqx_sn) -> set_special_confs(_App) -> ok. +restart_emqx_sn(#{subs_resume := Bool}) -> + application:set_env(emqx_sn, subs_resume, Bool), + _ = application:stop(emqx_sn), + _ = application:ensure_all_started(emqx_sn), + ok. + %%-------------------------------------------------------------------- %% Test cases %%-------------------------------------------------------------------- @@ -1624,7 +1630,7 @@ t_broadcast_test1(_) -> gen_udp:close(Socket). t_register_subs_resume_on(_) -> - application:set_env(emqx_sn, subs_resume, true), + restart_emqx_sn(#{subs_resume => true}), MsgId = 1, {ok, Socket} = gen_udp:open(0, [binary]), send_connect_msg(Socket, <<"test">>, 0), @@ -1710,8 +1716,6 @@ t_register_subs_resume_on(_) -> %% no more messages ?assertEqual(udp_receive_timeout, receive_response(NSocket)), - application:set_env(emqx_sn, subs_resume, false), - gen_udp:close(NSocket), {ok, NSocket1} = gen_udp:open(0, [binary]), send_connect_msg(NSocket1, <<"test">>), @@ -1719,7 +1723,8 @@ t_register_subs_resume_on(_) -> receive_response(NSocket1)), send_disconnect_msg(NSocket1, undefined), ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)), - gen_udp:close(NSocket1). + gen_udp:close(NSocket1), + restart_emqx_sn(#{subs_resume => false}). t_register_subs_resume_off(_) -> MsgId = 1, @@ -1829,7 +1834,7 @@ t_register_subs_resume_off(_) -> gen_udp:close(NSocket1). t_register_skip_failure_topic_name_and_reach_max_retry_times(_) -> - application:set_env(emqx_sn, subs_resume, true), + restart_emqx_sn(#{subs_resume => true}), MsgId = 1, {ok, Socket} = gen_udp:open(0, [binary]), send_connect_msg(Socket, <<"test">>, 0), @@ -1883,11 +1888,11 @@ t_register_skip_failure_topic_name_and_reach_max_retry_times(_) -> %% shutdown due to reached max retry times timer:sleep(5000), %% RETYRY_TIMEOUT ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket)), - application:set_env(emqx_sn, subs_resume, false), - gen_udp:close(NSocket). + gen_udp:close(NSocket), + restart_emqx_sn(#{subs_resume => false}). t_register_enqueue_delivering_messages(_) -> - application:set_env(emqx_sn, subs_resume, true), + restart_emqx_sn(#{subs_resume => true}), MsgId = 1, {ok, Socket} = gen_udp:open(0, [binary]), send_connect_msg(Socket, <<"test">>, 0), @@ -1932,8 +1937,6 @@ t_register_enqueue_delivering_messages(_) -> %% no more messages ?assertEqual(udp_receive_timeout, receive_response(NSocket)), - application:set_env(emqx_sn, subs_resume, false), - gen_udp:close(NSocket), {ok, NSocket1} = gen_udp:open(0, [binary]), send_connect_msg(NSocket1, <<"test">>), @@ -1941,7 +1944,8 @@ t_register_enqueue_delivering_messages(_) -> receive_response(NSocket1)), send_disconnect_msg(NSocket1, undefined), ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)), - gen_udp:close(NSocket1). + gen_udp:close(NSocket1), + restart_emqx_sn(#{subs_resume => false}). %%-------------------------------------------------------------------- %% Helper funcs From 81ed61b001a4476ceb84380ceb30e324df1c64c8 Mon Sep 17 00:00:00 2001 From: William Yang Date: Tue, 15 Mar 2022 11:39:23 +0100 Subject: [PATCH 20/39] feat: add load control app --- CHANGES-4.3.md | 1 + rebar.config | 1 + src/emqx.app.src | 10 +++++++++- src/emqx_os_mon.erl | 11 ++++++++++- 4 files changed, 21 insertions(+), 2 deletions(-) diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 04447e220..11debe5e7 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -27,6 +27,7 @@ File format: * Add UTF-8 string validity check in `strict_mode` for MQTT packet. When set to true, invalid UTF-8 strings will cause the client to be disconnected. i.e. client ID, topic name. [#7261] * Changed systemd service restart delay from 10 seconds to 60 seconds. +* Add load control app for future development. ### Bug fixes diff --git a/rebar.config b/rebar.config index 131aafebb..2003321bb 100644 --- a/rebar.config +++ b/rebar.config @@ -56,6 +56,7 @@ , {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1 , {getopt, "1.0.1"} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.15.0"}}} + , {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.2.0"}}} ]}. {xref_ignores, diff --git a/src/emqx.app.src b/src/emqx.app.src index 66ab590ca..2f58c52a8 100644 --- a/src/emqx.app.src +++ b/src/emqx.app.src @@ -9,7 +9,15 @@ {vsn, "4.3.14"}, % strict semver, bump manually! {modules, []}, {registered, []}, - {applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon]}, + {applications, [ kernel + , stdlib + , gproc + , gen_rpc + , esockd + , cowboy + , sasl + , lc + , os_mon]}, {mod, {emqx_app,[]}}, {env, []}, {licenses, ["Apache-2.0"]}, diff --git a/src/emqx_os_mon.erl b/src/emqx_os_mon.erl index e7565947c..a8419bc82 100644 --- a/src/emqx_os_mon.erl +++ b/src/emqx_os_mon.erl @@ -48,6 +48,7 @@ ]). -include("emqx.hrl"). +-include_lib("lc/include/lc.hrl"). -define(OS_MON, ?MODULE). @@ -88,7 +89,15 @@ get_sysmem_high_watermark() -> memsup:get_sysmem_high_watermark(). set_sysmem_high_watermark(Float) -> - memsup:set_sysmem_high_watermark(Float / 100). + V = Float/100, + case load_ctl:get_config() of + #{ ?MEM_MON_F0 := true } = OldLC -> + ok = load_ctl:put_config(OldLC#{ ?MEM_MON_F0 => true + , ?MEM_MON_F1 => V}); + _ -> + skip + end, + memsup:set_sysmem_high_watermark(V). get_procmem_high_watermark() -> memsup:get_procmem_high_watermark(). From b44512cdab40bb0ea2eb27efc36d8bcacbdd38e1 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 16 Mar 2022 10:20:09 +0800 Subject: [PATCH 21/39] fix: Add string legitimacy check. --- apps/emqx_management/src/emqx_mgmt_auth.erl | 64 +++++++++++-------- .../src/emqx_dashboard_admin.erl | 26 +++++--- .../test/emqx_dashboard_SUITE.erl | 4 +- src/emqx_misc.erl | 45 +++++++++++++ 4 files changed, 100 insertions(+), 39 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_auth.erl b/apps/emqx_management/src/emqx_mgmt_auth.erl index c05cbf581..6793c9885 100644 --- a/apps/emqx_management/src/emqx_mgmt_auth.erl +++ b/apps/emqx_management/src/emqx_mgmt_auth.erl @@ -89,36 +89,45 @@ add_app(AppId, Name, Desc, Status, Expired) when is_binary(AppId) -> -> {ok, appsecret()} | {error, term()}). add_app(AppId, Name, Secret, Desc, Status, Expired) when is_binary(AppId) -> - Secret1 = generate_appsecret_if_need(Secret), - App = #mqtt_app{id = AppId, - secret = Secret1, - name = Name, - desc = Desc, - status = Status, - expired = Expired}, - AddFun = fun() -> - case mnesia:wread({mqtt_app, AppId}) of - [] -> mnesia:write(App); - _ -> mnesia:abort(alread_existed) - end - end, - case mnesia:transaction(AddFun) of - {atomic, ok} -> {ok, Secret1}; - {aborted, Reason} -> {error, Reason} + case emqx_misc:valid_str(Name) of + ok -> + Secret1 = generate_appsecret_if_need(Secret), + App = #mqtt_app{id = AppId, + secret = Secret1, + name = Name, + desc = Desc, + status = Status, + expired = Expired}, + AddFun = fun() -> + case mnesia:wread({mqtt_app, AppId}) of + [] -> mnesia:write(App); + _ -> mnesia:abort(alread_existed) + end + end, + case mnesia:transaction(AddFun) of + {atomic, ok} -> {ok, Secret1}; + {aborted, Reason} -> {error, Reason} + end; + {error, Reason} -> {error, Reason} end. force_add_app(AppId, Name, Secret, Desc, Status, Expired) -> - AddFun = fun() -> - mnesia:write(#mqtt_app{id = AppId, - secret = Secret, - name = Name, - desc = Desc, - status = Status, - expired = Expired}) - end, - case mnesia:transaction(AddFun) of - {atomic, ok} -> ok; - {aborted, Reason} -> {error, Reason} + case emqx_misc:valid_str(Name) of + ok -> + AddFun = fun() -> + mnesia:write(#mqtt_app{ + id = AppId, + secret = Secret, + name = Name, + desc = Desc, + status = Status, + expired = Expired}) + end, + case mnesia:transaction(AddFun) of + {atomic, ok} -> ok; + {aborted, Reason} -> {error, Reason} + end; + {error, Reason} -> {error, Reason} end. -spec(generate_appsecret_if_need(binary() | undefined) -> binary()). @@ -207,4 +216,3 @@ is_authorized(AppId, AppSecret) -> is_expired(undefined) -> true; is_expired(Expired) -> Expired >= erlang:system_time(second). - diff --git a/lib-ce/emqx_dashboard/src/emqx_dashboard_admin.erl b/lib-ce/emqx_dashboard/src/emqx_dashboard_admin.erl index c9e9bee3c..7ce1f564f 100644 --- a/lib-ce/emqx_dashboard/src/emqx_dashboard_admin.erl +++ b/lib-ce/emqx_dashboard/src/emqx_dashboard_admin.erl @@ -78,18 +78,24 @@ start_link() -> -spec(add_user(binary(), binary(), binary()) -> ok | {error, any()}). add_user(Username, Password, Tags) when is_binary(Username), is_binary(Password) -> - Admin = #mqtt_admin{username = Username, password = hash(Password), tags = Tags}, - return(mnesia:transaction(fun add_user_/1, [Admin])). + case emqx_misc:valid_str(Username) of + ok -> + Admin = #mqtt_admin{username = Username, password = hash(Password), tags = Tags}, + return(mnesia:transaction(fun add_user_/1, [Admin])); + {error, Reason} -> {error, Reason} + end. force_add_user(Username, Password, Tags) -> - AddFun = fun() -> - mnesia:write(#mqtt_admin{username = Username, - password = Password, - tags = Tags}) - end, - case mnesia:transaction(AddFun) of - {atomic, ok} -> ok; - {aborted, Reason} -> {error, Reason} + case emqx_misc:valid_str(Username) of + ok -> + AddFun = fun() -> + mnesia:write(#mqtt_admin{username = Username, password = Password, tags = Tags}) + end, + case mnesia:transaction(AddFun) of + {atomic, ok} -> ok; + {aborted, Reason} -> {error, Reason} + end; + {error, Reason} -> {error, Reason} end. %% @private diff --git a/lib-ce/emqx_dashboard/test/emqx_dashboard_SUITE.erl b/lib-ce/emqx_dashboard/test/emqx_dashboard_SUITE.erl index ef2e747fa..3bca197f0 100644 --- a/lib-ce/emqx_dashboard/test/emqx_dashboard_SUITE.erl +++ b/lib-ce/emqx_dashboard/test/emqx_dashboard_SUITE.erl @@ -67,6 +67,9 @@ t_overview(_) -> t_admins_add_delete(_) -> ok = emqx_dashboard_admin:add_user(<<"username">>, <<"password">>, <<"tag">>), ok = emqx_dashboard_admin:add_user(<<"username1">>, <<"password1">>, <<"tag1">>), + {error, _} = emqx_dashboard_admin:add_user(<<"1username1">>, <<"password1">>, <<"tag1">>), + {error, _} = emqx_dashboard_admin:add_user(<<"u/sername1">>, <<"password1">>, <<"tag1">>), + {error, _} = emqx_dashboard_admin:add_user(<<"/username1">>, <<"password1">>, <<"tag1">>), Admins = emqx_dashboard_admin:all_users(), ?assertEqual(3, length(Admins)), ok = emqx_dashboard_admin:remove_user(<<"username1">>), @@ -165,4 +168,3 @@ api_path(Path) -> json(Data) -> {ok, Jsx} = emqx_json:safe_decode(Data, [return_maps]), Jsx. - diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index eb6a25377..8c76ff4a6 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -52,6 +52,26 @@ , hexstr2bin/1 ]). +-export([ valid_str/1 + ]). + +-define(VALID_STR_RE, "^[A-Za-z]+[A-Za-z0-9-_]*$"). + +-spec valid_str(list() | binary()) -> ok | {error, Reason::binary()}. +valid_str(Str) -> + StrLen = len(Str), + case StrLen > 0 andalso StrLen =< 256 of + true -> + case re:run(Str, ?VALID_STR_RE) of + nomatch -> {error, <<"required: " ?VALID_STR_RE>>}; + _ -> ok + end; + false -> {error, <<"0 < Length =< 256">>} + end. + +len(Bin) when is_binary(Bin) -> byte_size(Bin); +len(Str) when is_list(Str) -> length(Str). + -define(OOM_FACTOR, 1.25). %% @doc Parse v4 or v6 string format address to tuple. @@ -309,4 +329,29 @@ hexchar2int(I) when I >= $a andalso I =< $f -> I - $a + 10. ipv6_probe_test() -> ?assertEqual([{ipv6_probe, true}], ipv6_probe([])). +valid_str_test() -> + ?assertMatch({error, _}, valid_str("")), + ?assertMatch({error, _}, valid_str("_")), + ?assertMatch({error, _}, valid_str("_aaa")), + ?assertMatch({error, _}, valid_str("lkad/oddl")), + ?assertMatch({error, _}, valid_str("lkad*oddl")), + ?assertMatch({error, _}, valid_str("