diff --git a/.github/workflows/run_automate_tests.yaml b/.github/workflows/run_automate_tests.yaml index 05b978d57..aa8bfd46f 100644 --- a/.github/workflows/run_automate_tests.yaml +++ b/.github/workflows/run_automate_tests.yaml @@ -19,7 +19,7 @@ jobs: id: dload_jmeter timeout-minutes: 1 env: - JMETER_VERSION: 5.3 + JMETER_VERSION: 5.4.3 run: | wget --no-verbose --no-check-certificate -O /tmp/apache-jmeter.tgz https://downloads.apache.org/jmeter/binaries/apache-jmeter-$JMETER_VERSION.tgz - uses: actions/upload-artifact@v2 @@ -121,7 +121,7 @@ jobs: - name: install jmeter timeout-minutes: 10 env: - JMETER_VERSION: 5.3 + JMETER_VERSION: 5.4.3 run: | cd /tmp && tar -xvf apache-jmeter.tgz echo "jmeter.save.saveservice.output_format=xml" >> /tmp/apache-jmeter-$JMETER_VERSION/user.properties @@ -213,7 +213,7 @@ jobs: - name: install jmeter timeout-minutes: 10 env: - JMETER_VERSION: 5.3 + JMETER_VERSION: 5.4.3 run: | cd /tmp && tar -xvf apache-jmeter.tgz echo "jmeter.save.saveservice.output_format=xml" >> /tmp/apache-jmeter-$JMETER_VERSION/user.properties @@ -316,7 +316,7 @@ jobs: - name: install jmeter timeout-minutes: 10 env: - JMETER_VERSION: 5.3 + JMETER_VERSION: 5.4.3 run: | cd /tmp && tar -xvf apache-jmeter.tgz echo "jmeter.save.saveservice.output_format=xml" >> /tmp/apache-jmeter-$JMETER_VERSION/user.properties @@ -413,7 +413,7 @@ jobs: - name: install jmeter timeout-minutes: 10 env: - JMETER_VERSION: 5.3 + JMETER_VERSION: 5.4.3 run: | cd /tmp && tar -xvf apache-jmeter.tgz echo "jmeter.save.saveservice.output_format=xml" >> /tmp/apache-jmeter-$JMETER_VERSION/user.properties diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 780e8e1be..fe6a96ed4 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -25,6 +25,14 @@ File format: * Force shutdown of processes that cannot answer takeover event [#7026] * Support set keepalive via queryString & Body HTTP API. * `topic` parameter in bridge configuration can have `${node}` substitution (just like in `clientid` parameter) +* Add UTF-8 string validity check in `strict_mode` for MQTT packet. + When set to true, invalid UTF-8 strings will cause the client to be disconnected. i.e. client ID, topic name. [#7261] +* Changed systemd service restart delay from 10 seconds to 60 seconds. +* MQTT-SN gateway supports initiative to synchronize registered topics after session resumed. [#7300] +* Add load control app for future development. +* Change the precision of float to 17 digits after the decimal point when formatting a + float using payload templates of rule actions. The old precision is 10 digits before + this change. ### Bug fixes @@ -38,6 +46,11 @@ File format: * Fix the MQTT-SN message replay when the topic is not registered to the client [#6970] * Fix rpc get node info maybe crash when other nodes is not ready. * Fix false alert level log “cannot_find_plugins” caused by duplicate plugin names in `loaded_plugins` files. +* Prompt user how to change the dashboard's initial default password when emqx start. +* Fix errno=13 'Permission denied' Cannot create FIFO boot error in Amazon Linux 2022 (el8 package) +* Fix user or appid created, name only allow `^[A-Za-z]+[A-Za-z0-9-_]*$` +* Fix subscribe http api crash by bad_qos `/mqtt/subscribe`,`/mqtt/subscribe_batch`. +* Send DISCONNECT packet with reason code 0x98 if connection has been kicked [#7309] ## v4.3.12 ### Important changes diff --git a/apps/emqx_auth_mongo/rebar.config b/apps/emqx_auth_mongo/rebar.config index 78442c00b..c89c15d3c 100644 --- a/apps/emqx_auth_mongo/rebar.config +++ b/apps/emqx_auth_mongo/rebar.config @@ -1,6 +1,6 @@ {deps, %% NOTE: mind poolboy version when updating mongodb-erlang version - [{mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.10"}}}, + [{mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.12"}}}, %% mongodb-erlang uses a special fork https://github.com/comtihon/poolboy.git %% (which has overflow_ttl feature added). %% However, it references `{branch, "master}` (commit 9c06a9a on 2021-04-07). diff --git a/apps/emqx_auth_mongo/src/emqx_auth_mongo.appup.src b/apps/emqx_auth_mongo/src/emqx_auth_mongo.appup.src index 96a5bd810..de7107e99 100644 --- a/apps/emqx_auth_mongo/src/emqx_auth_mongo.appup.src +++ b/apps/emqx_auth_mongo/src/emqx_auth_mongo.appup.src @@ -2,10 +2,12 @@ {VSN, [{"4.4.0", [{load_module,emqx_auth_mongo_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]}, {load_module,emqx_acl_mongo,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{"4.4.0", [{load_module,emqx_auth_mongo_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]}, {load_module,emqx_acl_mongo,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}] }. diff --git a/apps/emqx_auth_mongo/src/emqx_auth_mongo.erl b/apps/emqx_auth_mongo/src/emqx_auth_mongo.erl index cd1d21b42..307aa3f7f 100644 --- a/apps/emqx_auth_mongo/src/emqx_auth_mongo.erl +++ b/apps/emqx_auth_mongo/src/emqx_auth_mongo.erl @@ -129,6 +129,9 @@ query_multi(Pool, Collection, SelectorList) -> lists:reverse(lists:flatten(lists:foldl(fun(Selector, Acc1) -> Batch = ecpool:with_client(Pool, fun(Conn) -> case mongo_api:find(Conn, Collection, Selector, #{}) of + {error, Reason} -> + ?LOG(error, "[MongoDB] query_multi failed, got error: ~p", [Reason]), + []; [] -> []; {ok, Cursor} -> mc_cursor:foldl(fun(O, Acc2) -> [O|Acc2] end, [], Cursor, 1000) diff --git a/apps/emqx_management/include/emqx_mgmt.hrl b/apps/emqx_management/include/emqx_mgmt.hrl index 26c566f50..8e6b8f4bb 100644 --- a/apps/emqx_management/include/emqx_mgmt.hrl +++ b/apps/emqx_management/include/emqx_mgmt.hrl @@ -31,5 +31,6 @@ -define(ERROR13, 113). %% User already exist -define(ERROR14, 114). %% OldPassword error -define(ERROR15, 115). %% bad topic +-define(ERROR16, 116). %% bad QoS -define(VERSIONS, ["4.0", "4.1", "4.2", "4.3", "4.4"]). diff --git a/apps/emqx_management/src/emqx_mgmt_api_apps.erl b/apps/emqx_management/src/emqx_mgmt_api_apps.erl index cca0b41f0..abcfe1961 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_apps.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_apps.erl @@ -68,7 +68,7 @@ add_app(_Bindings, Params) -> end. del_app(#{appid := AppId}, _Params) -> - case emqx_mgmt_auth:del_app(AppId) of + case emqx_mgmt_auth:del_app(emqx_mgmt_util:urldecode(AppId)) of ok -> minirest:return(); {error, Reason} -> minirest:return({error, Reason}) end. @@ -77,7 +77,7 @@ list_apps(_Bindings, _Params) -> minirest:return({ok, [format(Apps)|| Apps <- emqx_mgmt_auth:list_apps()]}). lookup_app(#{appid := AppId}, _Params) -> - case emqx_mgmt_auth:lookup_app(AppId) of + case emqx_mgmt_auth:lookup_app(emqx_mgmt_util:urldecode(AppId)) of {AppId, AppSecret, Name, Desc, Status, Expired} -> minirest:return({ok, #{app_id => AppId, secret => AppSecret, @@ -94,7 +94,7 @@ update_app(#{appid := AppId}, Params) -> Desc = proplists:get_value(<<"desc">>, Params), Status = proplists:get_value(<<"status">>, Params), Expired = proplists:get_value(<<"expired">>, Params), - case emqx_mgmt_auth:update_app(AppId, Name, Desc, Status, Expired) of + case emqx_mgmt_auth:update_app(emqx_mgmt_util:urldecode(AppId), Name, Desc, Status, Expired) of ok -> minirest:return(); {error, Reason} -> minirest:return({error, Reason}) end. diff --git a/apps/emqx_management/src/emqx_mgmt_api_pubsub.erl b/apps/emqx_management/src/emqx_mgmt_api_pubsub.erl index 1d89f237a..076ee366e 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_pubsub.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_pubsub.erl @@ -154,6 +154,8 @@ do_subscribe(ClientId, _Topics, _QoS) when not is_binary(ClientId) -> {ok, ?ERROR8, <<"bad clientid: must be string">>}; do_subscribe(_ClientId, [], _QoS) -> {ok, ?ERROR15, bad_topic}; +do_subscribe(_ClientId, _Topic, QoS) when QoS =/= 0 andalso QoS =/= 1 andalso QoS =/= 2 -> + {ok, ?ERROR16, bad_qos}; do_subscribe(ClientId, Topics, QoS) -> TopicTable = parse_topic_filters(Topics, QoS), case emqx_mgmt:subscribe(ClientId, TopicTable) of diff --git a/apps/emqx_management/src/emqx_mgmt_auth.erl b/apps/emqx_management/src/emqx_mgmt_auth.erl index c05cbf581..5f2b96dff 100644 --- a/apps/emqx_management/src/emqx_mgmt_auth.erl +++ b/apps/emqx_management/src/emqx_mgmt_auth.erl @@ -16,6 +16,8 @@ -module(emqx_mgmt_auth). +-include_lib("emqx/include/logger.hrl"). + %% Mnesia Bootstrap -export([mnesia/1]). -boot_mnesia({mnesia, [boot]}). @@ -89,36 +91,45 @@ add_app(AppId, Name, Desc, Status, Expired) when is_binary(AppId) -> -> {ok, appsecret()} | {error, term()}). add_app(AppId, Name, Secret, Desc, Status, Expired) when is_binary(AppId) -> - Secret1 = generate_appsecret_if_need(Secret), - App = #mqtt_app{id = AppId, - secret = Secret1, - name = Name, - desc = Desc, - status = Status, - expired = Expired}, - AddFun = fun() -> - case mnesia:wread({mqtt_app, AppId}) of - [] -> mnesia:write(App); - _ -> mnesia:abort(alread_existed) - end - end, - case mnesia:transaction(AddFun) of - {atomic, ok} -> {ok, Secret1}; - {aborted, Reason} -> {error, Reason} + case emqx_misc:is_sane_id(AppId) of + ok -> + Secret1 = generate_appsecret_if_need(Secret), + App = #mqtt_app{id = AppId, + secret = Secret1, + name = Name, + desc = Desc, + status = Status, + expired = Expired}, + AddFun = fun() -> + case mnesia:wread({mqtt_app, AppId}) of + [] -> mnesia:write(App); + _ -> mnesia:abort(already_existed) + end + end, + case mnesia:transaction(AddFun) of + {atomic, ok} -> {ok, Secret1}; + {aborted, Reason} -> {error, Reason} + end; + {error, Reason} -> {error, Reason} end. force_add_app(AppId, Name, Secret, Desc, Status, Expired) -> - AddFun = fun() -> - mnesia:write(#mqtt_app{id = AppId, - secret = Secret, - name = Name, - desc = Desc, - status = Status, - expired = Expired}) - end, - case mnesia:transaction(AddFun) of - {atomic, ok} -> ok; - {aborted, Reason} -> {error, Reason} + case emqx_misc:is_sane_id(AppId) of + ok -> + AddFun = fun() -> + mnesia:write(#mqtt_app{ + id = AppId, + secret = Secret, + name = Name, + desc = Desc, + status = Status, + expired = Expired}) + end, + case mnesia:transaction(AddFun) of + {atomic, ok} -> ok; + {aborted, Reason} -> {error, Reason} + end; + {error, Reason} -> {error, Reason} end. -spec(generate_appsecret_if_need(binary() | undefined) -> binary()). @@ -207,4 +218,3 @@ is_authorized(AppId, AppSecret) -> is_expired(undefined) -> true; is_expired(Expired) -> Expired >= erlang:system_time(second). - diff --git a/apps/emqx_management/src/emqx_mgmt_sup.erl b/apps/emqx_management/src/emqx_mgmt_sup.erl index f3f5545f2..8ad0bc963 100644 --- a/apps/emqx_management/src/emqx_mgmt_sup.erl +++ b/apps/emqx_management/src/emqx_mgmt_sup.erl @@ -27,4 +27,3 @@ start_link() -> init([]) -> {ok, {{one_for_one, 1, 5}, []}}. - diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index 698cbf605..5181731d1 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -172,6 +172,16 @@ , sha256/1 ]). +%% gzip Funcs +-export([ gzip/1 + , gunzip/1 + ]). + +%% zip Funcs +-export([ zip/1 + , unzip/1 + ]). + %% Data encode and decode -export([ base64_encode/1 , base64_decode/1 @@ -790,6 +800,26 @@ sha256(S) when is_binary(S) -> hash(Type, Data) -> emqx_misc:bin2hexstr_a_f_lower(crypto:hash(Type, Data)). +%%------------------------------------------------------------------------------ +%% gzip Funcs +%%------------------------------------------------------------------------------ + +gzip(S) when is_binary(S) -> + zlib:gzip(S). + +gunzip(S) when is_binary(S) -> + zlib:gunzip(S). + +%%------------------------------------------------------------------------------ +%% zip Funcs +%%------------------------------------------------------------------------------ + +zip(S) when is_binary(S) -> + zlib:zip(S). + +unzip(S) when is_binary(S) -> + zlib:unzip(S). + %%------------------------------------------------------------------------------ %% Data encode and decode Funcs %%------------------------------------------------------------------------------ diff --git a/apps/emqx_rule_engine/src/emqx_rule_utils.erl b/apps/emqx_rule_engine/src/emqx_rule_utils.erl index 3791b1386..67436ea5e 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_utils.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_utils.erl @@ -60,8 +60,8 @@ ]}). -define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})"). - -define(EX_WITHE_CHARS, "\\s"). %% Space and CRLF +-define(FLOAT_PRECISION, 17). -type(uri_string() :: iodata()). @@ -336,12 +336,12 @@ bool(Bool) -> error({invalid_boolean, Bool}). number_to_binary(Int) when is_integer(Int) -> integer_to_binary(Int); number_to_binary(Float) when is_float(Float) -> - float_to_binary(Float, [{decimals, 10}, compact]). + float_to_binary(Float, [{decimals, ?FLOAT_PRECISION}, compact]). number_to_list(Int) when is_integer(Int) -> integer_to_list(Int); number_to_list(Float) when is_float(Float) -> - float_to_list(Float, [{decimals, 10}, compact]). + float_to_list(Float, [{decimals, ?FLOAT_PRECISION}, compact]). parse_nested(Attr) -> case string:split(Attr, <<".">>, all) of diff --git a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl index 32421df32..350a57051 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl @@ -609,6 +609,29 @@ prop_hash_fun() -> (64 == byte_size(apply_func(sha256, [S]))) end). + +%%------------------------------------------------------------------------------ +%% Test cases for gzip funcs +%%------------------------------------------------------------------------------ + +t_gzip_funcs(_) -> + ?PROPTEST(prop_gzip_fun). + +prop_gzip_fun() -> + ?FORALL(S, binary(), + S == apply_func(gunzip, [apply_func(gzip, [S])])). + +%%------------------------------------------------------------------------------ +%% Test cases for zip funcs +%%------------------------------------------------------------------------------ + +t_zip_funcs(_) -> + ?PROPTEST(prop_zip_fun). + +prop_zip_fun() -> + ?FORALL(S, binary(), + S == apply_func(unzip, [apply_func(zip, [S])])). + %%------------------------------------------------------------------------------ %% Test cases for base64 %%------------------------------------------------------------------------------ @@ -822,4 +845,3 @@ all() -> suite() -> [{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}]. - diff --git a/apps/emqx_sn/etc/emqx_sn.conf b/apps/emqx_sn/etc/emqx_sn.conf index 6572812c1..655ef4028 100644 --- a/apps/emqx_sn/etc/emqx_sn.conf +++ b/apps/emqx_sn/etc/emqx_sn.conf @@ -51,3 +51,10 @@ mqtt.sn.username = mqtt_sn_user ## ## Value: String mqtt.sn.password = abc + +## Whether to initiate all subscribed topic registration messages to the +## client after the Session has been taken over by a new channel. +## +## Value: Boolean +## Default: false +#mqtt.sn.subs_resume = false diff --git a/apps/emqx_sn/priv/emqx_sn.schema b/apps/emqx_sn/priv/emqx_sn.schema index a585c1037..bd0995c77 100644 --- a/apps/emqx_sn/priv/emqx_sn.schema +++ b/apps/emqx_sn/priv/emqx_sn.schema @@ -56,6 +56,11 @@ end}. {datatype, string} ]}. +{mapping, "mqtt.sn.subs_resume", "emqx_sn.subs_resume", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + {translation, "emqx_sn.username", fun(Conf) -> Username = cuttlefish:conf_get("mqtt.sn.username", Conf), list_to_binary(Username) diff --git a/apps/emqx_sn/src/emqx_sn.appup.src b/apps/emqx_sn/src/emqx_sn.appup.src index 6a4eb66d1..269605afa 100644 --- a/apps/emqx_sn/src/emqx_sn.appup.src +++ b/apps/emqx_sn/src/emqx_sn.appup.src @@ -1,26 +1,52 @@ %% -*- mode: erlang -*- {VSN, [ - {<<"4\\.3\\.[4-5]">>,[ + {"4.3.5",[ {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, - {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, + {update,emqx_sn_gateway,{advanced,["4.3.5"]}} ]}, - {<<"4.3.[2-3]">>,[ + {"4.3.4",[ + {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, + {update,emqx_sn_gateway,{advanced,["4.3.4"]}} + ]}, + {"4.3.3",[ {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, - {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, + {update,emqx_sn_gateway,{advanced,["4.3.3"]}} + ]}, + {"4.3.2",[ + {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, + {update,emqx_sn_gateway,{advanced,["4.3.2"]}} ]}, {<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]} ], [ - {<<"4\\.3\\.[4-5]">>,[ + {"4.3.5",[ {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, - {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, + {update,emqx_sn_gateway,{advanced,["4.3.5"]}} ]}, - {<<"4.3.[2-3]">>,[ + {"4.3.4",[ + {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, + {update,emqx_sn_gateway,{advanced,["4.3.4"]}} + ]}, + {"4.3.3",[ {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, - {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, + {update,emqx_sn_gateway,{advanced,["4.3.3"]}} + ]}, + {"4.3.2",[ + {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, + {update,emqx_sn_gateway,{advanced,["4.3.2"]}} ]}, {<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]} ]}. diff --git a/apps/emqx_sn/src/emqx_sn_app.erl b/apps/emqx_sn/src/emqx_sn_app.erl index 9575523f8..e7c86d5b7 100644 --- a/apps/emqx_sn/src/emqx_sn_app.erl +++ b/apps/emqx_sn/src/emqx_sn_app.erl @@ -122,12 +122,14 @@ listeners_confs() -> EnableQos3 = application:get_env(emqx_sn, enable_qos3, false), EnableStats = application:get_env(emqx_sn, enable_stats, false), IdleTimeout = application:get_env(emqx_sn, idle_timeout, 30000), + SubsResume = application:get_env(emqx_sn, subs_resume, false), [{udp, ListenOn, [{gateway_id, GwId}, {username, Username}, {password, Password}, {enable_qos3, EnableQos3}, {enable_stats, EnableStats}, {idle_timeout, IdleTimeout}, + {subs_resume, SubsResume}, {max_connections, 1024000}, {max_conn_rate, 1000}, {udp_options, []}]}]. diff --git a/apps/emqx_sn/src/emqx_sn_frame.erl b/apps/emqx_sn/src/emqx_sn_frame.erl index 28a20956e..7ec18dd4a 100644 --- a/apps/emqx_sn/src/emqx_sn_frame.erl +++ b/apps/emqx_sn/src/emqx_sn_frame.erl @@ -339,9 +339,9 @@ format(?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)) -> format(?SN_PINGREQ_MSG(ClientId)) -> io_lib:format("SN_PINGREQ(ClientId=~s)", [ClientId]); format(?SN_PINGRESP_MSG()) -> - "SN_PINGREQ()"; + "SN_PINGRESP()"; format(?SN_DISCONNECT_MSG(Duration)) -> - io_lib:format("SN_DISCONNECT(Duration=~s)", [Duration]); + io_lib:format("SN_DISCONNECT(Duration=~w)", [Duration]); format(#mqtt_sn_message{type = Type, variable = Var}) -> io_lib:format("mqtt_sn_message(type=~s, Var=~w)", diff --git a/apps/emqx_sn/src/emqx_sn_gateway.erl b/apps/emqx_sn/src/emqx_sn_gateway.erl index 265607229..80a1339c0 100644 --- a/apps/emqx_sn/src/emqx_sn_gateway.erl +++ b/apps/emqx_sn/src/emqx_sn_gateway.erl @@ -48,6 +48,7 @@ , wait_for_will_topic/3 , wait_for_will_msg/3 , connected/3 + , registering/3 , asleep/3 , awake/3 ]). @@ -96,7 +97,10 @@ has_pending_pingresp = false :: boolean(), %% Store all qos0 messages for waiting REGACK %% Note: QoS1/QoS2 messages will kept inflight queue - pending_topic_ids = #{} :: pending_msgs() + pending_topic_ids = #{} :: pending_msgs(), + subs_resume = false, + waiting_sync_topics = [], + previous_outgoings_and_state = undefined }). -define(INFO_KEYS, [socktype, peername, sockname, sockstate]). %, active_n]). @@ -126,6 +130,9 @@ Reason =:= asleep_timeout; Reason =:= keepalive_timeout). +-define(RETRY_TIMEOUT, 5000). +-define(MAX_RETRY_TIMES, 3). + %%-------------------------------------------------------------------- %% Exported APIs %%-------------------------------------------------------------------- @@ -152,6 +159,7 @@ init([{_, SockPid, Sock}, Peername, Options]) -> Password = proplists:get_value(password, Options, undefined), EnableQos3 = proplists:get_value(enable_qos3, Options, false), IdleTimeout = proplists:get_value(idle_timeout, Options, 30000), + SubsResume = proplists:get_value(subs_resume, Options, false), EnableStats = proplists:get_value(enable_stats, Options, false), case inet:sockname(Sock) of {ok, Sockname} -> @@ -168,7 +176,8 @@ init([{_, SockPid, Sock}, Peername, Options]) -> asleep_timer = emqx_sn_asleep_timer:init(), enable_stats = EnableStats, enable_qos3 = EnableQos3, - idle_timeout = IdleTimeout + idle_timeout = IdleTimeout, + subs_resume = SubsResume }, emqx_logger:set_metadata_peername(esockd:format(Peername)), {ok, idle, State, [IdleTimeout]}; @@ -379,6 +388,13 @@ connected(cast, {outgoing, Packet}, State) -> connected(cast, {connack, ConnAck}, State) -> {keep_state, handle_outgoing(ConnAck, State)}; +connected(cast, {register, TopicNames, BlockedOutgoins}, State) -> + NState = State#state{ + waiting_sync_topics = TopicNames, + previous_outgoings_and_state = {BlockedOutgoins, ?FUNCTION_NAME} + }, + {next_state, registering, NState, [next_event(shooting)]}; + connected(cast, {shutdown, Reason, Packet}, State) -> stop(Reason, handle_outgoing(Packet, State)); @@ -392,6 +408,80 @@ connected(cast, {close, Reason}, State) -> connected(EventType, EventContent, State) -> handle_event(EventType, EventContent, connected, State). +registering(cast, shooting, + State = #state{ + channel = Channel, + waiting_sync_topics = [], + previous_outgoings_and_state = {Outgoings, StateName}}) -> + Session = emqx_channel:get_session(Channel), + ClientInfo = emqx_channel:info(clientinfo, Channel), + {Outgoings2, NChannel} = + case emqx_session:dequeue(ClientInfo, Session) of + {ok, NSession} -> + {[], emqx_channel:set_session(NSession, Channel)}; + {ok, Pubs, NSession} -> + emqx_channel:do_deliver( + Pubs, + emqx_channel:set_session(NSession, Channel) + ) + end, + NState = State#state{ + channel = NChannel, + previous_outgoings_and_state = undefined}, + {next_state, StateName, NState, outgoing_events(Outgoings ++ Outgoings2)}; + +registering(cast, shooting, + State = #state{ + clientid = ClientId, + waiting_sync_topics = [TopicName | Remainings]}) -> + TopicId = emqx_sn_registry:lookup_topic_id(ClientId, TopicName), + NState = send_register( + TopicName, + TopicId, + 16#FFFF, %% FIXME: msgid ? + State#state{waiting_sync_topics = [{TopicId, TopicName, 0} | Remainings]} + ), + {keep_state, NState, {{timeout, wait_regack}, ?RETRY_TIMEOUT, nocontent}}; + +registering(cast, {incoming, ?SN_REGACK_MSG(TopicId, _MsgId, ?SN_RC_ACCEPTED)}, + State = #state{waiting_sync_topics = [{TopicId, TopicName, _} | Remainings]}) -> + ?LOG(debug, "Register topic name ~s with id ~w successfully!", [TopicName, TopicId]), + {keep_state, State#state{waiting_sync_topics = Remainings}, [next_event(shooting)]}; + +registering(cast, {incoming, ?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)}, + State = #state{waiting_sync_topics = [{TopicId, TopicName, _} | Remainings]}) -> + ?LOG(error, "client does not accept register TopicName=~s, TopicId=~p, MsgId=~p, ReturnCode=~p", + [TopicName, TopicId, MsgId, ReturnCode]), + {keep_state, State#state{waiting_sync_topics = Remainings}, [next_event(shooting)]}; + +registering(cast, {incoming, Packet}, + State = #state{previous_outgoings_and_state = {_, StateName}}) + when is_record(Packet, mqtt_sn_message) -> + apply(?MODULE, StateName, [cast, {incoming, Packet}, State]); + +registering({timeout, wait_regack}, _, + State = #state{waiting_sync_topics = [{TopicId, TopicName, Times} | Remainings]}) + when Times < ?MAX_RETRY_TIMES -> + ?LOG(warning, "Waiting REGACK timeout for TopicName=~s, TopicId=~w, try it again(~w)", + [TopicName, TopicId, Times+1]), + NState = send_register( + TopicName, + TopicId, + 16#FFFF, %% FIXME: msgid? + State#state{waiting_sync_topics = [{TopicId, TopicName, Times + 1} | Remainings]} + ), + {keep_state, NState, {{timeout, wait_regack}, ?RETRY_TIMEOUT, nocontent}}; + +registering({timeout, wait_regack}, _, + State = #state{waiting_sync_topics = [{TopicId, TopicName, ?MAX_RETRY_TIMES} | _]}) -> + ?LOG(error, "Retry register TopicName=~s, TopicId=~w reached the max retry times", + [TopicId, TopicName]), + NState = send_message(?SN_DISCONNECT_MSG(undefined), State), + stop(reached_max_retry_times, NState); + +registering(EventType, EventContent, State) -> + handle_event(EventType, EventContent, ?FUNCTION_NAME, State). + asleep(cast, {incoming, ?SN_DISCONNECT_MSG(Duration)}, State) -> State0 = send_message(?SN_DISCONNECT_MSG(undefined), State), case Duration of @@ -519,10 +609,13 @@ handle_event(info, {datagram, SockPid, Data}, StateName, stop(frame_error, State) end; -handle_event(info, {deliver, _Topic, Msg}, asleep, - State = #state{channel = Channel}) -> +handle_event(info, {deliver, _Topic, Msg}, StateName, + State = #state{channel = Channel}) + when StateName == alseep; + StateName == registering -> % section 6.14, Support of sleeping clients - ?LOG(debug, "enqueue downlink message in asleep state, msg: ~0p", [Msg]), + ?LOG(debug, "enqueue downlink message in ~s state, msg: ~0p", + [StateName, Msg]), Session = emqx_session:enqueue(emqx_channel:info(clientinfo, Channel), Msg, emqx_channel:get_session(Channel)), {keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}}; @@ -593,8 +686,31 @@ terminate(Reason, _StateName, #state{channel = Channel}) -> emqx_channel:terminate(Reason, Channel), ok. -code_change(_Vsn, StateName, State, _Extra) -> - {ok, StateName, State}. +%% in the emqx_sn:v4.3.6, we have added two new fields in the state last: +%% - waiting_sync_topics +%% - previous_outgoings_and_state +code_change({down, _Vsn}, StateName, State, [ToVsn]) -> + case re:run(ToVsn, "4\\.3\\.[2-5]") of + {match, _} -> + NState0 = lists:droplast( + lists:droplast( + lists:droplast(tuple_to_list(State)))), + NState = list_to_tuple(NState0), + {ok, StateName, NState}; + _ -> + {ok, StateName, State} + end; + +code_change(_Vsn, StateName, State, [FromVsn]) -> + case re:run(FromVsn, "4\\.3\\.[2-5]") of + {match, _} -> + NState = list_to_tuple( + tuple_to_list(State) ++ [false, [], undefined] + ), + {ok, StateName, NState}; + _ -> + {ok, StateName, State} + end. %%-------------------------------------------------------------------- %% Handle Call/Info @@ -643,6 +759,9 @@ outgoing_event(Packet) when is_record(Packet, mqtt_packet); outgoing_event(Action) -> next_event(Action). +next_event(Content) -> + {next_event, cast, Content}. + close_socket(State = #state{sockstate = closed}) -> State; close_socket(State = #state{socket = _Socket}) -> %ok = gen_udp:close(Socket), @@ -1058,6 +1177,38 @@ handle_incoming(#mqtt_packet{variable = #mqtt_packet_puback{}} = Packet, awake, Result = channel_handle_in(Packet, State), handle_return(Result, State, [try_goto_asleep]); +handle_incoming( + #mqtt_packet{ + variable = #mqtt_packet_connect{ + clean_start = false} + } = Packet, + _, + State = #state{subs_resume = SubsResume}) -> + Result = channel_handle_in(Packet, State), + case {SubsResume, Result} of + {true, {ok, Replies, NChannel}} -> + case maps:get( + subscriptions, + emqx_channel:info(session, NChannel) + ) of + Subs when map_size(Subs) == 0 -> + handle_return(Result, State); + Subs -> + TopicNames = lists:filter( + fun(T) -> not emqx_topic:wildcard(T) + end, maps:keys(Subs)), + {ConnackEvents, Outgoings} = split_connack_replies( + Replies), + Events = outgoing_events( + ConnackEvents ++ + [{register, TopicNames, Outgoings}] + ), + {keep_state, State#state{channel = NChannel}, Events} + end; + _ -> + handle_return(Result, State) + end; + handle_incoming(Packet, _StName, State) -> Result = channel_handle_in(Packet, State), handle_return(Result, State). @@ -1167,9 +1318,6 @@ inc_outgoing_stats(Type) -> false -> ok end. -next_event(Content) -> - {next_event, cast, Content}. - inc_counter(Key, Inc) -> _ = emqx_pd:inc_counter(Key, Inc), ok. @@ -1183,3 +1331,8 @@ maybe_send_puback(?QOS_0, _TopicId, _MsgId, _ReasonCode, State) -> State; maybe_send_puback(_QoS, TopicId, MsgId, ReasonCode, State) -> send_message(?SN_PUBACK_MSG(TopicId, MsgId, ReasonCode), State). + +%% Replies = [{event, connected}, {connack, ConnAck}, {outgoing, Pkts}] +split_connack_replies([A = {event, connected}, + B = {connack, _ConnAck} | Outgoings]) -> + {[A, B], Outgoings}. diff --git a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl index e78ad5938..017722a30 100644 --- a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl @@ -84,6 +84,12 @@ set_special_confs(emqx_sn) -> set_special_confs(_App) -> ok. +restart_emqx_sn(#{subs_resume := Bool}) -> + application:set_env(emqx_sn, subs_resume, Bool), + _ = application:stop(emqx_sn), + _ = application:ensure_all_started(emqx_sn), + ok. + %%-------------------------------------------------------------------- %% Test cases %%-------------------------------------------------------------------- @@ -986,108 +992,6 @@ t_delivery_qos1_register_invalid_topic_id(_) -> ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), gen_udp:close(Socket). -t_delivery_takeover_and_re_register(_) -> - MsgId = 1, - {ok, Socket} = gen_udp:open(0, [binary]), - send_connect_msg(Socket, <<"test">>, 0), - ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, - receive_response(Socket)), - - send_subscribe_msg_normal_topic(Socket, ?QOS_1, <<"topic-a">>, MsgId+1), - <<_, ?SN_SUBACK, 2#00100000, - TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), - - send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2), - <<_, ?SN_SUBACK, 2#01000000, - TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), - - _ = emqx:publish( - emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)), - _ = emqx:publish( - emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"test-b">>)), - - <<_, ?SN_PUBLISH, 2#00100000, - TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket), - send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED), - - <<_, ?SN_PUBLISH, 2#01000000, - TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket), - send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED), - - send_disconnect_msg(Socket, undefined), - ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), - gen_udp:close(Socket), - - %% offline messages will be queued into the MQTT-SN session - _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m1">>)), - _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)), - _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m3">>)), - _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m1">>)), - _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m2">>)), - _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)), - - {ok, NSocket} = gen_udp:open(0, [binary]), - send_connect_msg(NSocket, <<"test">>, 0), - ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, - receive_response(NSocket)), - - %% qos1 - - %% received the resume messages - <<_, ?SN_PUBLISH, 2#00100000, - TopicIdA:16, MsgIdA0:16, "m1">> = receive_response(NSocket), - %% only one qos1/qos2 inflight - ?assertEqual(udp_receive_timeout, receive_response(NSocket)), - send_puback_msg(NSocket, TopicIdA, MsgIdA0, ?SN_RC_INVALID_TOPIC_ID), - %% recv register - <<_, ?SN_REGISTER, - TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket), - send_regack_msg(NSocket, TopicIdA, RegMsgIdA), - %% received the replay messages - <<_, ?SN_PUBLISH, 2#00100000, - TopicIdA:16, MsgIdA1:16, "m1">> = receive_response(NSocket), - send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED), - - <<_, ?SN_PUBLISH, 2#00100000, - TopicIdA:16, MsgIdA2:16, "m2">> = receive_response(NSocket), - send_puback_msg(NSocket, TopicIdA, MsgIdA2, ?SN_RC_ACCEPTED), - - <<_, ?SN_PUBLISH, 2#00100000, - TopicIdA:16, MsgIdA3:16, "m3">> = receive_response(NSocket), - send_puback_msg(NSocket, TopicIdA, MsgIdA3, ?SN_RC_ACCEPTED), - - %% qos2 - <<_, ?SN_PUBLISH, 2#01000000, - TopicIdB:16, MsgIdB0:16, "m1">> = receive_response(NSocket), - %% only one qos1/qos2 inflight - ?assertEqual(udp_receive_timeout, receive_response(NSocket)), - send_puback_msg(NSocket, TopicIdB, MsgIdB0, ?SN_RC_INVALID_TOPIC_ID), - %% recv register - <<_, ?SN_REGISTER, - TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), - send_regack_msg(NSocket, TopicIdB, RegMsgIdB), - %% received the replay messages - <<_, ?SN_PUBLISH, 2#01000000, - TopicIdB:16, MsgIdB1:16, "m1">> = receive_response(NSocket), - send_pubrec_msg(NSocket, MsgIdB1), - <<_, ?SN_PUBREL, MsgIdB1:16>> = receive_response(NSocket), - send_pubcomp_msg(NSocket, MsgIdB1), - - <<_, ?SN_PUBLISH, 2#01000000, - TopicIdB:16, MsgIdB2:16, "m2">> = receive_response(NSocket), - send_puback_msg(NSocket, TopicIdB, MsgIdB2, ?SN_RC_ACCEPTED), - - <<_, ?SN_PUBLISH, 2#01000000, - TopicIdB:16, MsgIdB3:16, "m3">> = receive_response(NSocket), - send_puback_msg(NSocket, TopicIdB, MsgIdB3, ?SN_RC_ACCEPTED), - - %% no more messages - ?assertEqual(udp_receive_timeout, receive_response(NSocket)), - - send_disconnect_msg(NSocket, undefined), - ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket)), - gen_udp:close(NSocket). - t_will_case01(_) -> QoS = 1, Duration = 1, @@ -1892,6 +1796,324 @@ t_broadcast_test1(_) -> timer:sleep(600), gen_udp:close(Socket). +t_register_subs_resume_on(_) -> + restart_emqx_sn(#{subs_resume => true}), + MsgId = 1, + {ok, Socket} = gen_udp:open(0, [binary]), + send_connect_msg(Socket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(Socket)), + + send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-a">>, MsgId+1), + <<_, ?SN_SUBACK, 2#01000000, + TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2), + <<_, ?SN_SUBACK, 2#01000000, + TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + _ = emqx:publish( + emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)), + _ = emqx:publish( + emqx_message:make(test, ?QOS_1, <<"topic-b">>, <<"test-b">>)), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket), + send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket), + send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED), + + send_disconnect_msg(Socket, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + gen_udp:close(Socket), + + %% offline messages will be queued into the MQTT-SN session + _ = emqx:publish(emqx_message:make(test, ?QOS_0, <<"topic-a">>, <<"m1">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-a">>, <<"m3">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_0, <<"topic-b">>, <<"m1">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-b">>, <<"m2">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)), + + {ok, NSocket} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket)), + + %% receive subs register requests + <<_, ?SN_REGISTER, + TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket), + send_regack_msg(NSocket, TopicIdA, RegMsgIdA), + + <<_, ?SN_REGISTER, + TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), + send_regack_msg(NSocket, TopicIdB, RegMsgIdB), + + %% receive the queued messages + + <<_, ?SN_PUBLISH, 2#00000000, + TopicIdA:16, 0:16, "m1">> = receive_response(NSocket), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA1:16, "m2">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdA:16, MsgIdA2:16, "m3">> = receive_response(NSocket), + send_pubrec_msg(NSocket, MsgIdA2), + <<_, ?SN_PUBREL, MsgIdA2:16>> = receive_response(NSocket), + send_pubcomp_msg(NSocket, MsgIdA2), + + + <<_, ?SN_PUBLISH, 2#00000000, + TopicIdB:16, 0:16, "m1">> = receive_response(NSocket), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdB:16, MsgIdB1:16, "m2">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdB, MsgIdB1, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB2:16, "m3">> = receive_response(NSocket), + send_pubrec_msg(NSocket, MsgIdB2), + <<_, ?SN_PUBREL, MsgIdB2:16>> = receive_response(NSocket), + send_pubcomp_msg(NSocket, MsgIdB2), + + %% no more messages + ?assertEqual(udp_receive_timeout, receive_response(NSocket)), + + gen_udp:close(NSocket), + {ok, NSocket1} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket1, <<"test">>), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket1)), + send_disconnect_msg(NSocket1, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)), + gen_udp:close(NSocket1), + restart_emqx_sn(#{subs_resume => false}). + +t_register_subs_resume_off(_) -> + MsgId = 1, + {ok, Socket} = gen_udp:open(0, [binary]), + send_connect_msg(Socket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(Socket)), + + send_subscribe_msg_normal_topic(Socket, ?QOS_1, <<"topic-a">>, MsgId+1), + <<_, ?SN_SUBACK, 2#00100000, + TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2), + <<_, ?SN_SUBACK, 2#01000000, + TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + _ = emqx:publish( + emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)), + _ = emqx:publish( + emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"test-b">>)), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket), + send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket), + send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED), + + send_disconnect_msg(Socket, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + gen_udp:close(Socket), + + %% offline messages will be queued into the MQTT-SN session + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m1">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m3">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m1">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m2">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)), + + {ok, NSocket} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket)), + + %% qos1 + + %% received the resume messages + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA0:16, "m1">> = receive_response(NSocket), + %% only one qos1/qos2 inflight + ?assertEqual(udp_receive_timeout, receive_response(NSocket)), + send_puback_msg(NSocket, TopicIdA, MsgIdA0, ?SN_RC_INVALID_TOPIC_ID), + %% recv register + <<_, ?SN_REGISTER, + TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket), + send_regack_msg(NSocket, TopicIdA, RegMsgIdA), + %% received the replay messages + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA1:16, "m1">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA2:16, "m2">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdA, MsgIdA2, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA3:16, "m3">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdA, MsgIdA3, ?SN_RC_ACCEPTED), + + %% qos2 + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB0:16, "m1">> = receive_response(NSocket), + %% only one qos1/qos2 inflight + ?assertEqual(udp_receive_timeout, receive_response(NSocket)), + send_puback_msg(NSocket, TopicIdB, MsgIdB0, ?SN_RC_INVALID_TOPIC_ID), + %% recv register + <<_, ?SN_REGISTER, + TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), + send_regack_msg(NSocket, TopicIdB, RegMsgIdB), + %% received the replay messages + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB1:16, "m1">> = receive_response(NSocket), + send_pubrec_msg(NSocket, MsgIdB1), + <<_, ?SN_PUBREL, MsgIdB1:16>> = receive_response(NSocket), + send_pubcomp_msg(NSocket, MsgIdB1), + + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB2:16, "m2">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdB, MsgIdB2, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB3:16, "m3">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdB, MsgIdB3, ?SN_RC_ACCEPTED), + + %% no more messages + ?assertEqual(udp_receive_timeout, receive_response(NSocket)), + + gen_udp:close(NSocket), + {ok, NSocket1} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket1, <<"test">>), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket1)), + send_disconnect_msg(NSocket1, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)), + gen_udp:close(NSocket1). + +t_register_skip_failure_topic_name_and_reach_max_retry_times(_) -> + restart_emqx_sn(#{subs_resume => true}), + MsgId = 1, + {ok, Socket} = gen_udp:open(0, [binary]), + send_connect_msg(Socket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(Socket)), + + send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-a">>, MsgId+1), + <<_, ?SN_SUBACK, 2#01000000, + TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2), + <<_, ?SN_SUBACK, 2#01000000, + TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + send_disconnect_msg(Socket, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + gen_udp:close(Socket), + + {ok, NSocket} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket)), + + %% receive subs register requests + + %% registered failured topic-name will be skipped + <<_, ?SN_REGISTER, + TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket), + send_regack_msg(NSocket, TopicIdA, RegMsgIdA, ?SN_RC_INVALID_TOPIC_ID), + + %% the gateway try to shutdown this client if it reached max-retry-times + %% + %% times-0 + <<_, ?SN_REGISTER, + TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), + %% times-1 + timer:sleep(5000), %% RETYRY_TIMEOUT + <<_, ?SN_REGISTER, + TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), + %% times-2 + timer:sleep(5000), %% RETYRY_TIMEOUT + <<_, ?SN_REGISTER, + TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), + %% just a ping + send_pingreq_msg(NSocket, <<"test">>), + ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(NSocket)), + %% times-3 + timer:sleep(5000), %% RETYRY_TIMEOUT + <<_, ?SN_REGISTER, + TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), + %% shutdown due to reached max retry times + timer:sleep(5000), %% RETYRY_TIMEOUT + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket)), + gen_udp:close(NSocket), + restart_emqx_sn(#{subs_resume => false}). + +t_register_enqueue_delivering_messages(_) -> + restart_emqx_sn(#{subs_resume => true}), + MsgId = 1, + {ok, Socket} = gen_udp:open(0, [binary]), + send_connect_msg(Socket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(Socket)), + + send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-a">>, MsgId+1), + <<_, ?SN_SUBACK, 2#01000000, + TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + send_disconnect_msg(Socket, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + gen_udp:close(Socket), + + emqx_logger:set_log_level(debug), + + {ok, NSocket} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket)), + + %% receive subs register requests + + %% registered failured topic-name will be skipped + <<_, ?SN_REGISTER, + TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket), + + _ = emqx:publish(emqx_message:make(test, ?QOS_0, <<"topic-a">>, <<"m1">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)), + + send_regack_msg(NSocket, TopicIdA, RegMsgIdA, ?SN_RC_ACCEPTED), + + %% receive the queued messages + + <<_, ?SN_PUBLISH, 2#00000000, + TopicIdA:16, 0:16, "m1">> = receive_response(NSocket), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA1:16, "m2">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED), + + %% no more messages + ?assertEqual(udp_receive_timeout, receive_response(NSocket)), + + gen_udp:close(NSocket), + {ok, NSocket1} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket1, <<"test">>), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket1)), + send_disconnect_msg(NSocket1, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)), + gen_udp:close(NSocket1), + restart_emqx_sn(#{subs_resume => false}). + %%-------------------------------------------------------------------- %% Helper funcs %%-------------------------------------------------------------------- @@ -1983,9 +2205,12 @@ send_register_msg(Socket, TopicName, MsgId) -> ok = gen_udp:send(Socket, ?HOST, ?PORT, RegisterPacket). send_regack_msg(Socket, TopicId, MsgId) -> + send_regack_msg(Socket, TopicId, MsgId, ?SN_RC_ACCEPTED). + +send_regack_msg(Socket, TopicId, MsgId, Rc) -> Length = 7, MsgType = ?SN_REGACK, - Packet = <>, + Packet = <>, ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet). send_publish_msg_normal_topic(Socket, QoS, MsgId, TopicId, Data) -> diff --git a/deploy/charts/emqx/templates/StatefulSet.yaml b/deploy/charts/emqx/templates/StatefulSet.yaml index ee341c743..c0f4e85ad 100644 --- a/deploy/charts/emqx/templates/StatefulSet.yaml +++ b/deploy/charts/emqx/templates/StatefulSet.yaml @@ -27,7 +27,6 @@ spec: namespace: {{ .Release.Namespace }} labels: app.kubernetes.io/name: {{ include "emqx.name" . }} - helm.sh/chart: {{ include "emqx.chart" . }} app.kubernetes.io/instance: {{ .Release.Name }} app.kubernetes.io/managed-by: {{ .Release.Service }} annotations: @@ -197,8 +196,23 @@ spec: httpGet: path: /status port: {{ .Values.emqxConfig.EMQX_MANAGEMENT__LISTENER__HTTP | default 8081 }} - initialDelaySeconds: 5 + initialDelaySeconds: 10 periodSeconds: 5 + failureThreshold: 30 + livenessProbe: + httpGet: + path: /status + port: {{ .Values.emqxConfig.EMQX_MANAGEMENT__LISTENER__HTTP | default 8081 }} + initialDelaySeconds: 60 + periodSeconds: 30 + failureThreshold: 10 + lifecycle: + preStop: + exec: + command: + - "/opt/emqx/bin/emqx_ctl" + - "cluster" + - "leave" {{- with .Values.nodeSelector }} nodeSelector: {{- toYaml . | nindent 8 }} diff --git a/deploy/packages/emqx.service b/deploy/packages/emqx.service index ef9abfb01..def74a1a4 100644 --- a/deploy/packages/emqx.service +++ b/deploy/packages/emqx.service @@ -7,11 +7,18 @@ User=emqx Group=emqx Type=forking Environment=HOME=/var/lib/emqx -ExecStart=/usr/bin/emqx start + +# Must use a 'bash' wrap for some OS +# errno=13 'Permission denied' +# Cannot create FIFO ... for writing +ExecStart=bash /usr/bin/emqx start + LimitNOFILE=1048576 -ExecStop=/usr/bin/emqx stop +ExecStop=bash /usr/bin/emqx stop Restart=on-failure -RestartSec=5s + +# When clustered, give the peers enough time to get this node's 'DOWN' event +RestartSec=60s [Install] WantedBy=multi-user.target diff --git a/deploy/packages/rpm/emqx.service b/deploy/packages/rpm/emqx.service deleted file mode 100644 index ef9abfb01..000000000 --- a/deploy/packages/rpm/emqx.service +++ /dev/null @@ -1,17 +0,0 @@ -[Unit] -Description=emqx daemon -After=network.target - -[Service] -User=emqx -Group=emqx -Type=forking -Environment=HOME=/var/lib/emqx -ExecStart=/usr/bin/emqx start -LimitNOFILE=1048576 -ExecStop=/usr/bin/emqx stop -Restart=on-failure -RestartSec=5s - -[Install] -WantedBy=multi-user.target diff --git a/deploy/packages/rpm/emqx.service b/deploy/packages/rpm/emqx.service new file mode 120000 index 000000000..2fc64d79d --- /dev/null +++ b/deploy/packages/rpm/emqx.service @@ -0,0 +1 @@ +../emqx.service \ No newline at end of file diff --git a/lib-ce/emqx_dashboard/etc/emqx_dashboard.conf b/lib-ce/emqx_dashboard/etc/emqx_dashboard.conf index f67b88c17..2d59264a1 100644 --- a/lib-ce/emqx_dashboard/etc/emqx_dashboard.conf +++ b/lib-ce/emqx_dashboard/etc/emqx_dashboard.conf @@ -3,12 +3,17 @@ ##-------------------------------------------------------------------- ## Default user's login name. -## +## For safety, it should be changed as soon as possible. +## Please use the './bin/emqx_ctl admins' CLI to change it. +## Then comment `dashboard.default_user.login/password` from here ## Value: String dashboard.default_user.login = admin ## Default user's password. -## +## The initial default password for 'dashboard.default_user.login'" +## For safety, it should be changed as soon as possible. +## Please use the './bin/emqx_ctl admins' CLI to change it. +## Then comment `dashboard.default_user.login/password` from here ## Value: String dashboard.default_user.password = public @@ -127,4 +132,3 @@ dashboard.listener.http.ipv6_v6only = false ## ## Value: on | off ## dashboard.listener.https.honor_cipher_order = on - diff --git a/lib-ce/emqx_dashboard/src/emqx_dashboard_admin.erl b/lib-ce/emqx_dashboard/src/emqx_dashboard_admin.erl index 94c5c3cda..420380f88 100644 --- a/lib-ce/emqx_dashboard/src/emqx_dashboard_admin.erl +++ b/lib-ce/emqx_dashboard/src/emqx_dashboard_admin.erl @@ -21,6 +21,7 @@ -behaviour(gen_server). -include("emqx_dashboard.hrl"). +-include_lib("emqx/include/logger.hrl"). -boot_mnesia({mnesia, [boot]}). -copy_mnesia({mnesia, [copy]}). @@ -77,18 +78,24 @@ start_link() -> -spec(add_user(binary(), binary(), binary()) -> ok | {error, any()}). add_user(Username, Password, Tags) when is_binary(Username), is_binary(Password) -> - Admin = #mqtt_admin{username = Username, password = hash(Password), tags = Tags}, - return(mnesia:transaction(fun add_user_/1, [Admin])). + case emqx_misc:is_sane_id(Username) of + ok -> + Admin = #mqtt_admin{username = Username, password = hash(Password), tags = Tags}, + return(mnesia:transaction(fun add_user_/1, [Admin])); + {error, Reason} -> {error, Reason} + end. force_add_user(Username, Password, Tags) -> - AddFun = fun() -> - mnesia:write(#mqtt_admin{username = Username, - password = Password, - tags = Tags}) - end, - case mnesia:transaction(AddFun) of - {atomic, ok} -> ok; - {aborted, Reason} -> {error, Reason} + case emqx_misc:is_sane_id(Username) of + ok -> + AddFun = fun() -> + mnesia:write(#mqtt_admin{username = Username, password = Password, tags = Tags}) + end, + case mnesia:transaction(AddFun) of + {atomic, ok} -> ok; + {aborted, Reason} -> {error, Reason} + end; + {error, Reason} -> {error, Reason} end. %% @private @@ -218,11 +225,34 @@ binenv(Key) -> iolist_to_binary(application:get_env(emqx_dashboard, Key, "")). add_default_user(Username, Password) when ?EMPTY_KEY(Username) orelse ?EMPTY_KEY(Password) -> - igonre; + ignore; add_default_user(Username, Password) -> case lookup_user(Username) of [] -> add_user(Username, Password, <<"administrator">>); - _ -> ok - end. - + _ -> + case check(Username, Password) of + ok -> + ?LOG(warning, + "[Dashboard] The initial default password for dashboard 'admin' user in emqx_dashboard.conf\n" + "For safety, it should be changed as soon as possible.\n" + "Please use the './bin/emqx_ctl admins' CLI to change it.\n" + "Then remove `dashboard.default_user.login/password` from emqx_dashboard.conf" + ); + {error, _} -> + %% We can't force add default, + %% otherwise passwords that have been updated via HTTP API will be reset after reboot. + ?LOG(warning, + "[Dashboard] dashboard.default_user.password in the plugins/emqx_dashboard.conf\n" + "does not match the password in the database(mnesia).\n" + "1. If you have already changed the password via the HTTP API or `./bin/emqx_ctl admins`," + "this warning has no effect.\n" + "You should remove the `dashboard.default_user.login/password` from emqx_dashboard.conf " + "to resolve this warning.\n" + "2. If you just want to update the password by manually changing the configuration file,\n" + "you need to delete the old user and password using `emqx_ctl admins del ~s` first\n" + "the new password in emqx_dashboard.conf can take effect after reboot.", + []) + end + end, + ok. diff --git a/lib-ce/emqx_dashboard/src/emqx_dashboard_api.erl b/lib-ce/emqx_dashboard/src/emqx_dashboard_api.erl index e1c89efbb..4dbf2517f 100644 --- a/lib-ce/emqx_dashboard/src/emqx_dashboard_api.erl +++ b/lib-ce/emqx_dashboard/src/emqx_dashboard_api.erl @@ -77,9 +77,10 @@ auth(_Bindings, Params) -> Password = proplists:get_value(<<"password">>, Params), return(emqx_dashboard_admin:check(Username, Password)). -change_pwd(#{username := Username}, Params) -> +change_pwd(#{username := Username0}, Params) -> OldPwd = proplists:get_value(<<"old_pwd">>, Params), NewPwd = proplists:get_value(<<"new_pwd">>, Params), + Username = emqx_mgmt_util:urldecode(Username0), return(emqx_dashboard_admin:change_password(Username, OldPwd, NewPwd)). create(_Bindings, Params) -> @@ -96,14 +97,13 @@ list(_Bindings, _Params) -> update(#{name := Username}, Params) -> Tags = proplists:get_value(<<"tags">>, Params), - return(emqx_dashboard_admin:update_user(Username, Tags)). + return(emqx_dashboard_admin:update_user(emqx_mgmt_util:urldecode(Username), Tags)). delete(#{name := <<"admin">>}, _Params) -> return({error, <<"Cannot delete admin">>}); delete(#{name := Username}, _Params) -> - return(emqx_dashboard_admin:remove_user(Username)). + return(emqx_dashboard_admin:remove_user(emqx_mgmt_util:urldecode(Username))). row(#mqtt_admin{username = Username, tags = Tags}) -> #{username => Username, tags => Tags}. - diff --git a/lib-ce/emqx_dashboard/test/emqx_dashboard_SUITE.erl b/lib-ce/emqx_dashboard/test/emqx_dashboard_SUITE.erl index 71a7692be..8a5a47a24 100644 --- a/lib-ce/emqx_dashboard/test/emqx_dashboard_SUITE.erl +++ b/lib-ce/emqx_dashboard/test/emqx_dashboard_SUITE.erl @@ -68,9 +68,13 @@ t_overview(_) -> t_admins_add_delete(_) -> ok = emqx_dashboard_admin:add_user(<<"username">>, <<"password">>, <<"tag">>), ok = emqx_dashboard_admin:add_user(<<"username1">>, <<"password1">>, <<"tag1">>), + ok = emqx_dashboard_admin:add_user(<<"1username1">>, <<"password1">>, <<"tag1">>), + {error, _} = emqx_dashboard_admin:add_user(<<"u/sername1">>, <<"password1">>, <<"tag1">>), + {error, _} = emqx_dashboard_admin:add_user(<<"/username1">>, <<"password1">>, <<"tag1">>), Admins = emqx_dashboard_admin:all_users(), - ?assertEqual(3, length(Admins)), + ?assertEqual(4, length(Admins)), ok = emqx_dashboard_admin:remove_user(<<"username1">>), + ok = emqx_dashboard_admin:remove_user(<<"1username1">>), Users = emqx_dashboard_admin:all_users(), ?assertEqual(2, length(Users)), ok = emqx_dashboard_admin:change_password(<<"username">>, <<"password">>, <<"pwd">>), @@ -78,13 +82,14 @@ t_admins_add_delete(_) -> ?assert(request_dashboard(get, api_path("brokers"), auth_header_("username", "pwd"))), ok = emqx_dashboard_admin:remove_user(<<"username">>), - ?assertNotEqual(true, request_dashboard(get, api_path("brokers"), auth_header_("username", "pwd"))). + ?assertNotEqual(true, request_dashboard(get, api_path("brokers"), + auth_header_("username", "pwd"))). t_rest_api(_Config) -> {ok, Res0} = http_get("users"), - - ?assertEqual([#{<<"username">> => <<"admin">>, - <<"tags">> => <<"administrator">>}], get_http_data(Res0)), + Users = get_http_data(Res0), + ?assert(lists:member(#{<<"username">> => <<"admin">>, <<"tags">> => <<"administrator">>}, + Users)), AssertSuccess = fun({ok, Res}) -> ?assertEqual(#{<<"code">> => 0}, json(Res)) diff --git a/lib-ce/emqx_modules/src/emqx_mod_delayed.erl b/lib-ce/emqx_modules/src/emqx_mod_delayed.erl index ac5be58b2..53280becc 100644 --- a/lib-ce/emqx_modules/src/emqx_mod_delayed.erl +++ b/lib-ce/emqx_modules/src/emqx_mod_delayed.erl @@ -53,11 +53,18 @@ { key , msg }). +-type delayed_message() :: #delayed_message{}. -define(TAB, ?MODULE). -define(SERVER, ?MODULE). -define(MAX_INTERVAL, 4294967). +-type state() :: #{ publish_at := non_neg_integer() + , timer := timer:tref() | undefined + , stats_timer => timer:tref() | undefined + , stats_fun => function() + }. + %%-------------------------------------------------------------------- %% Mnesia bootstrap %%-------------------------------------------------------------------- @@ -124,7 +131,7 @@ on_message_publish(Msg) -> start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). --spec(store(#delayed_message{}) -> ok). +-spec(store(delayed_message()) -> ok). store(DelayedMsg) -> gen_server:call(?SERVER, {store, DelayedMsg}, infinity). @@ -134,7 +141,9 @@ store(DelayedMsg) -> init([]) -> {ok, ensure_stats_event( - ensure_publish_timer(#{timer => undefined, publish_at => 0}))}. + ensure_publish_timer(#{timer => undefined, + publish_at => 0, + stats_timer => undefined}))}. handle_call({store, DelayedMsg = #delayed_message{key = Key}}, _From, State) -> ok = mnesia:dirty_write(?TAB, DelayedMsg), @@ -163,8 +172,9 @@ handle_info(Info, State) -> ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. -terminate(_Reason, #{timer := TRef}) -> - emqx_misc:cancel_timer(TRef). +terminate(_Reason, #{timer := PublishTimer} = State) -> + emqx_misc:cancel_timer(PublishTimer), + emqx_misc:cancel_timer(maps:get(stats_timer, State, undefined)). code_change({down, Vsn}, State, _Extra) when Vsn =:= "4.3.0" -> NState = maps:with([timer, publish_at], State), @@ -179,12 +189,14 @@ code_change(Vsn, State, _Extra) when Vsn =:= "4.3.0" -> %%-------------------------------------------------------------------- %% Ensure the stats +-spec ensure_stats_event(state()) -> state(). ensure_stats_event(State) -> StatsFun = emqx_stats:statsfun('delayed.count', 'delayed.max'), {ok, StatsTimer} = timer:send_interval(timer:seconds(1), stats), State#{stats_fun => StatsFun, stats_timer => StatsTimer}. %% Ensure publish timer +-spec ensure_publish_timer(state()) -> state(). ensure_publish_timer(State) -> ensure_publish_timer(mnesia:dirty_first(?TAB), State). @@ -222,4 +234,3 @@ do_publish(Key = {Ts, _Id}, Now, Acc) when Ts =< Now -> -spec(delayed_count() -> non_neg_integer()). delayed_count() -> mnesia:table_info(?TAB, size). - diff --git a/lib-ce/emqx_modules/src/emqx_modules.appup.src b/lib-ce/emqx_modules/src/emqx_modules.appup.src index 45a9194ad..f3f49f24f 100644 --- a/lib-ce/emqx_modules/src/emqx_modules.appup.src +++ b/lib-ce/emqx_modules/src/emqx_modules.appup.src @@ -1,4 +1,5 @@ -%% -*-: erlang -*- +%% -*- mode: erlang -*- +%% Unless you know what you are doing, DO NOT edit manually!! {VSN, [{"4.4.0", [{load_module, emqx_mod_presence, brutal_purge, soft_purge, []}, diff --git a/rebar.config b/rebar.config index cd597faed..10ba754e9 100644 --- a/rebar.config +++ b/rebar.config @@ -56,6 +56,7 @@ , {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1 , {getopt, "1.0.1"} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.15.0"}}} + , {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.2.0"}}} ]}. {xref_ignores, diff --git a/scripts/update-appup.sh b/scripts/update-appup.sh index df4f56e74..be417aafd 100755 --- a/scripts/update-appup.sh +++ b/scripts/update-appup.sh @@ -38,7 +38,9 @@ PREV_TAG="$(git describe --tag --match "${TAG_PREFIX}*" | grep -oE "${TAG_PREFIX PREV_VERSION="${PREV_TAG#[e|v]}" shift 1 -ESCRIPT_ARGS=() +# bash 3.2 treat empty array as unbound, so we can't use 'ESCRIPT_ARGS=()' here, +# but must add an empty-string element to the array +ESCRIPT_ARGS=( '' ) while [ "$#" -gt 0 ]; do case $1 in -h|--help) @@ -87,12 +89,15 @@ if [ ! -d "${PREV_TAG}" ]; then fi popd +# bash 3.2 does not allow empty array, so we had to add an empty string in the ESCRIPT_ARGS array, +# this in turn makes quoting "${ESCRIPT_ARGS[@]}" problematic, hence disable SC2068 check here +# shellcheck disable=SC2068 ./scripts/update_appup.escript \ --src-dirs "${SRC_DIRS}/**" \ --release-dir "_build/${PROFILE}/lib" \ --prev-release-dir "${PREV_DIR_BASE}/${PREV_TAG}/emqx/lib" \ --skip-build \ - "${ESCRIPT_ARGS[@]}" "$PREV_VERSION" + ${ESCRIPT_ARGS[@]} "$PREV_VERSION" if [ "${IS_CHECK:-}" = 'yes' ]; then diffs="$(git diff --name-only | grep -E '\.appup\.src' || true)" diff --git a/scripts/update_appup.escript b/scripts/update_appup.escript index 19b546f43..840f63509 100755 --- a/scripts/update_appup.escript +++ b/scripts/update_appup.escript @@ -272,8 +272,9 @@ do_merge_update_actions(App, {New0, Changed0, Deleted0}, OldActions) -> New = New0 -- AlreadyHandled, Changed = Changed0 -- AlreadyHandled, Deleted = Deleted0 -- AlreadyHandled, + HasRestart = contains_restart_application(App, OldActions), Actions = - case contains_restart_application(App, OldActions) of + case HasRestart of true -> []; false -> @@ -285,7 +286,12 @@ do_merge_update_actions(App, {New0, Changed0, Deleted0}, OldActions) -> OldActionsWithStop ++ Actions ++ OldActionsAfterStop ++ - [{delete_module, M} || M <- Deleted] ++ + case HasRestart of + true -> + []; + false -> + [{delete_module, M} || M <- Deleted] + end ++ AppSpecific. %% If an entry restarts an application, there's no need to use @@ -318,6 +324,8 @@ process_old_action({add_module, Module}) -> [Module]; process_old_action({delete_module, Module}) -> [Module]; +process_old_action({update, Module, _Change}) -> + [Module]; process_old_action(LoadModule) when is_tuple(LoadModule) andalso element(1, LoadModule) =:= load_module -> element(2, LoadModule); diff --git a/src/emqx.app.src b/src/emqx.app.src index 4530e071c..54b9dfc01 100644 --- a/src/emqx.app.src +++ b/src/emqx.app.src @@ -9,7 +9,15 @@ {vsn, "4.4.2"}, % strict semver, bump manually! {modules, []}, {registered, []}, - {applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon]}, + {applications, [ kernel + , stdlib + , gproc + , gen_rpc + , esockd + , cowboy + , sasl + , lc + , os_mon]}, {mod, {emqx_app,[]}}, {env, []}, {licenses, ["Apache-2.0"]}, diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 4678aca89..7b4d5617a 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -43,7 +43,6 @@ [{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, - {delete_module,emqx_relup}, {load_module,emqx_os_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 4ac7ece05..f60da82e7 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -949,9 +949,23 @@ return_sub_unsub_ack(Packet, Channel) -> -> {reply, Reply :: term(), channel()} | {shutdown, Reason :: term(), Reply :: term(), channel()} | {shutdown, Reason :: term(), Reply :: term(), emqx_types:packet(), channel()}). -handle_call(kick, Channel) -> - Channel1 = ensure_disconnected(kicked, Channel), - disconnect_and_shutdown(kicked, ok, Channel1); +handle_call(kick, Channel = #channel{ + conn_state = ConnState, + will_msg = WillMsg, + conninfo = #{proto_ver := ProtoVer} + }) -> + (WillMsg =/= undefined) andalso publish_will_msg(WillMsg), + Channel1 = case ConnState of + connected -> ensure_disconnected(kicked, Channel); + _ -> Channel + end, + case ProtoVer == ?MQTT_PROTO_V5 andalso ConnState == connected of + true -> + shutdown(kicked, ok, + ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), Channel1); + _ -> + shutdown(kicked, ok, Channel1) + end; handle_call(discard, Channel) -> disconnect_and_shutdown(discarded, ok, Channel); @@ -1022,7 +1036,7 @@ handle_info({sock_closed, Reason}, Channel = clientinfo = ClientInfo = #{zone := Zone}}) -> emqx_zone:enable_flapping_detect(Zone) andalso emqx_flapping:detect(ClientInfo), - Channel1 = ensure_disconnected(Reason, mabye_publish_will_msg(Channel)), + Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)), case maybe_shutdown(Reason, Channel1) of {ok, Channel2} -> {ok, {event, disconnected}, Channel2}; Shutdown -> Shutdown @@ -1685,9 +1699,9 @@ ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo, %%-------------------------------------------------------------------- %% Maybe Publish will msg -mabye_publish_will_msg(Channel = #channel{will_msg = undefined}) -> +maybe_publish_will_msg(Channel = #channel{will_msg = undefined}) -> Channel; -mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) -> +maybe_publish_will_msg(Channel = #channel{will_msg = WillMsg}) -> case will_delay_interval(WillMsg) of 0 -> ok = publish_will_msg(WillMsg), @@ -1697,10 +1711,10 @@ mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) -> end. will_delay_interval(WillMsg) -> - maps:get('Will-Delay-Interval', emqx_message:get_header(properties, WillMsg), 0). + maps:get('Will-Delay-Interval', + emqx_message:get_header(properties, WillMsg, #{}), 0). publish_will_msg(Msg) -> - %% TODO check if we should discard result here _ = emqx_broker:publish(Msg), ok. diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 4b06f35a8..86c32be25 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -409,7 +409,7 @@ kick_session(Action, ClientId, ChanPid) -> kick_session(ClientId) -> case lookup_channels(ClientId) of [] -> - ?LOG(warning, "kiecked_an_unknown_session ~ts", [ClientId]), + ?LOG(warning, "kicked_an_unknown_session ~ts", [ClientId]), ok; ChanPids -> case length(ChanPids) > 1 of diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index 17daf9809..48e8b71fb 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -221,8 +221,9 @@ packet(Header, Variable) -> packet(Header, Variable, Payload) -> #mqtt_packet{header = Header, variable = Variable, payload = Payload}. -parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) -> - {ProtoName, Rest} = parse_utf8_string(FrameBin), +parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, + #{strict_mode := StrictMode}) -> + {ProtoName, Rest} = parse_utf8_string(FrameBin, StrictMode), <> = Rest, % Note: Crash when reserved flag doesn't equal to 0, there is no strict % compliance with the MQTT5.0. @@ -236,8 +237,8 @@ parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) -> KeepAlive : 16/big, Rest2/binary>> = Rest1, - {Properties, Rest3} = parse_properties(Rest2, ProtoVer), - {ClientId, Rest4} = parse_utf8_string(Rest3), + {Properties, Rest3} = parse_properties(Rest2, ProtoVer, StrictMode), + {ClientId, Rest4} = parse_utf8_string(Rest3, StrictMode), ConnPacket = #mqtt_packet_connect{proto_name = ProtoName, proto_ver = ProtoVer, is_bridge = (BridgeTag =:= 8), @@ -249,14 +250,14 @@ parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) -> properties = Properties, clientid = ClientId }, - {ConnPacket1, Rest5} = parse_will_message(ConnPacket, Rest4), - {Username, Rest6} = parse_utf8_string(Rest5, bool(UsernameFlag)), - {Passsword, <<>>} = parse_utf8_string(Rest6, bool(PasswordFlag)), + {ConnPacket1, Rest5} = parse_will_message(ConnPacket, Rest4, StrictMode), + {Username, Rest6} = parse_utf8_string(Rest5, StrictMode, bool(UsernameFlag)), + {Passsword, <<>>} = parse_utf8_string(Rest6, StrictMode, bool(PasswordFlag)), ConnPacket1#mqtt_packet_connect{username = Username, password = Passsword}; -parse_packet(#mqtt_packet_header{type = ?CONNACK}, - <>, #{version := Ver}) -> - {Properties, <<>>} = parse_properties(Rest, Ver), +parse_packet(#mqtt_packet_header{type = ?CONNACK}, <>, + #{strict_mode := StrictMode, version := Ver}) -> + {Properties, <<>>} = parse_properties(Rest, Ver, StrictMode), #mqtt_packet_connack{ack_flags = AckFlags, reason_code = ReasonCode, properties = Properties @@ -264,21 +265,22 @@ parse_packet(#mqtt_packet_header{type = ?CONNACK}, parse_packet(#mqtt_packet_header{type = ?PUBLISH, qos = QoS}, Bin, #{strict_mode := StrictMode, version := Ver}) -> - {TopicName, Rest} = parse_utf8_string(Bin), + {TopicName, Rest} = parse_utf8_string(Bin, StrictMode), {PacketId, Rest1} = case QoS of ?QOS_0 -> {undefined, Rest}; _ -> parse_packet_id(Rest) end, (PacketId =/= undefined) andalso StrictMode andalso validate_packet_id(PacketId), - {Properties, Payload} = parse_properties(Rest1, Ver), + {Properties, Payload} = parse_properties(Rest1, Ver, StrictMode), Publish = #mqtt_packet_publish{topic_name = TopicName, packet_id = PacketId, properties = Properties }, {Publish, Payload}; -parse_packet(#mqtt_packet_header{type = PubAck}, <>, #{strict_mode := StrictMode}) +parse_packet(#mqtt_packet_header{type = PubAck}, <>, + #{strict_mode := StrictMode}) when ?PUBACK =< PubAck, PubAck =< ?PUBCOMP -> StrictMode andalso validate_packet_id(PacketId), #mqtt_packet_puback{packet_id = PacketId, reason_code = 0}; @@ -287,7 +289,7 @@ parse_packet(#mqtt_packet_header{type = PubAck}, < StrictMode andalso validate_packet_id(PacketId), - {Properties, <<>>} = parse_properties(Rest, Ver), + {Properties, <<>>} = parse_properties(Rest, Ver, StrictMode), #mqtt_packet_puback{packet_id = PacketId, reason_code = ReasonCode, properties = Properties @@ -296,7 +298,7 @@ parse_packet(#mqtt_packet_header{type = PubAck}, <>, #{strict_mode := StrictMode, version := Ver}) -> StrictMode andalso validate_packet_id(PacketId), - {Properties, Rest1} = parse_properties(Rest, Ver), + {Properties, Rest1} = parse_properties(Rest, Ver, StrictMode), TopicFilters = parse_topic_filters(subscribe, Rest1), ok = validate_subqos([QoS || {_, #{qos := QoS}} <- TopicFilters]), #mqtt_packet_subscribe{packet_id = PacketId, @@ -307,7 +309,7 @@ parse_packet(#mqtt_packet_header{type = ?SUBSCRIBE}, <>, #{strict_mode := StrictMode, version := Ver}) -> StrictMode andalso validate_packet_id(PacketId), - {Properties, Rest1} = parse_properties(Rest, Ver), + {Properties, Rest1} = parse_properties(Rest, Ver, StrictMode), ReasonCodes = parse_reason_codes(Rest1), #mqtt_packet_suback{packet_id = PacketId, properties = Properties, @@ -317,7 +319,7 @@ parse_packet(#mqtt_packet_header{type = ?SUBACK}, <>, #{strict_mode := StrictMode, version := Ver}) -> StrictMode andalso validate_packet_id(PacketId), - {Properties, Rest1} = parse_properties(Rest, Ver), + {Properties, Rest1} = parse_properties(Rest, Ver, StrictMode), TopicFilters = parse_topic_filters(unsubscribe, Rest1), #mqtt_packet_unsubscribe{packet_id = PacketId, properties = Properties, @@ -332,7 +334,7 @@ parse_packet(#mqtt_packet_header{type = ?UNSUBACK}, <>, parse_packet(#mqtt_packet_header{type = ?UNSUBACK}, <>, #{strict_mode := StrictMode, version := Ver}) -> StrictMode andalso validate_packet_id(PacketId), - {Properties, Rest1} = parse_properties(Rest, Ver), + {Properties, Rest1} = parse_properties(Rest, Ver, StrictMode), ReasonCodes = parse_reason_codes(Rest1), #mqtt_packet_unsuback{packet_id = PacketId, properties = Properties, @@ -340,115 +342,119 @@ parse_packet(#mqtt_packet_header{type = ?UNSUBACK}, <>, - #{version := ?MQTT_PROTO_V5}) -> - {Properties, <<>>} = parse_properties(Rest, ?MQTT_PROTO_V5), + #{strict_mode := StrictMode, version := ?MQTT_PROTO_V5}) -> + {Properties, <<>>} = parse_properties(Rest, ?MQTT_PROTO_V5, StrictMode), #mqtt_packet_disconnect{reason_code = ReasonCode, properties = Properties }; parse_packet(#mqtt_packet_header{type = ?AUTH}, <>, - #{version := ?MQTT_PROTO_V5}) -> - {Properties, <<>>} = parse_properties(Rest, ?MQTT_PROTO_V5), + #{strict_mode := StrictMode, version := ?MQTT_PROTO_V5}) -> + {Properties, <<>>} = parse_properties(Rest, ?MQTT_PROTO_V5, StrictMode), #mqtt_packet_auth{reason_code = ReasonCode, properties = Properties}. parse_will_message(Packet = #mqtt_packet_connect{will_flag = true, - proto_ver = Ver}, Bin) -> - {Props, Rest} = parse_properties(Bin, Ver), - {Topic, Rest1} = parse_utf8_string(Rest), + proto_ver = Ver}, + Bin, StrictMode) -> + {Props, Rest} = parse_properties(Bin, Ver, StrictMode), + {Topic, Rest1} = parse_utf8_string(Rest, StrictMode), {Payload, Rest2} = parse_binary_data(Rest1), {Packet#mqtt_packet_connect{will_props = Props, will_topic = Topic, will_payload = Payload }, Rest2}; -parse_will_message(Packet, Bin) -> {Packet, Bin}. +parse_will_message(Packet, Bin, _StrictMode) -> {Packet, Bin}. -compile({inline, [parse_packet_id/1]}). parse_packet_id(<>) -> {PacketId, Rest}. -parse_properties(Bin, Ver) when Ver =/= ?MQTT_PROTO_V5 -> +parse_properties(Bin, Ver, _StrictMode) when Ver =/= ?MQTT_PROTO_V5 -> {#{}, Bin}; %% TODO: version mess? -parse_properties(<<>>, ?MQTT_PROTO_V5) -> +parse_properties(<<>>, ?MQTT_PROTO_V5, _StrictMode) -> {#{}, <<>>}; -parse_properties(<<0, Rest/binary>>, ?MQTT_PROTO_V5) -> +parse_properties(<<0, Rest/binary>>, ?MQTT_PROTO_V5, _StrictMode) -> {#{}, Rest}; -parse_properties(Bin, ?MQTT_PROTO_V5) -> +parse_properties(Bin, ?MQTT_PROTO_V5, StrictMode) -> {Len, Rest} = parse_variable_byte_integer(Bin), <> = Rest, - {parse_property(PropsBin, #{}), Rest1}. + {parse_property(PropsBin, #{}, StrictMode), Rest1}. -parse_property(<<>>, Props) -> +parse_property(<<>>, Props, _StrictMode) -> Props; -parse_property(<<16#01, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Payload-Format-Indicator' => Val}); -parse_property(<<16#02, Val:32/big, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Message-Expiry-Interval' => Val}); -parse_property(<<16#03, Bin/binary>>, Props) -> - {Val, Rest} = parse_utf8_string(Bin), - parse_property(Rest, Props#{'Content-Type' => Val}); -parse_property(<<16#08, Bin/binary>>, Props) -> - {Val, Rest} = parse_utf8_string(Bin), - parse_property(Rest, Props#{'Response-Topic' => Val}); -parse_property(<<16#09, Len:16/big, Val:Len/binary, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Correlation-Data' => Val}); -parse_property(<<16#0B, Bin/binary>>, Props) -> +parse_property(<<16#01, Val, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Payload-Format-Indicator' => Val}, StrictMode); +parse_property(<<16#02, Val:32/big, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Message-Expiry-Interval' => Val}, StrictMode); +parse_property(<<16#03, Bin/binary>>, Props, StrictMode) -> + {Val, Rest} = parse_utf8_string(Bin, StrictMode), + parse_property(Rest, Props#{'Content-Type' => Val}, StrictMode); +parse_property(<<16#08, Bin/binary>>, Props, StrictMode) -> + {Val, Rest} = parse_utf8_string(Bin, StrictMode), + parse_property(Rest, Props#{'Response-Topic' => Val}, StrictMode); +parse_property(<<16#09, Len:16/big, Val:Len/binary, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Correlation-Data' => Val}, StrictMode); +parse_property(<<16#0B, Bin/binary>>, Props, StrictMode) -> {Val, Rest} = parse_variable_byte_integer(Bin), - parse_property(Rest, Props#{'Subscription-Identifier' => Val}); -parse_property(<<16#11, Val:32/big, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Session-Expiry-Interval' => Val}); -parse_property(<<16#12, Bin/binary>>, Props) -> - {Val, Rest} = parse_utf8_string(Bin), - parse_property(Rest, Props#{'Assigned-Client-Identifier' => Val}); -parse_property(<<16#13, Val:16, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Server-Keep-Alive' => Val}); -parse_property(<<16#15, Bin/binary>>, Props) -> - {Val, Rest} = parse_utf8_string(Bin), - parse_property(Rest, Props#{'Authentication-Method' => Val}); -parse_property(<<16#16, Len:16/big, Val:Len/binary, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Authentication-Data' => Val}); -parse_property(<<16#17, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Request-Problem-Information' => Val}); -parse_property(<<16#18, Val:32, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Will-Delay-Interval' => Val}); -parse_property(<<16#19, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Request-Response-Information' => Val}); -parse_property(<<16#1A, Bin/binary>>, Props) -> - {Val, Rest} = parse_utf8_string(Bin), - parse_property(Rest, Props#{'Response-Information' => Val}); -parse_property(<<16#1C, Bin/binary>>, Props) -> - {Val, Rest} = parse_utf8_string(Bin), - parse_property(Rest, Props#{'Server-Reference' => Val}); -parse_property(<<16#1F, Bin/binary>>, Props) -> - {Val, Rest} = parse_utf8_string(Bin), - parse_property(Rest, Props#{'Reason-String' => Val}); -parse_property(<<16#21, Val:16/big, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Receive-Maximum' => Val}); -parse_property(<<16#22, Val:16/big, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Topic-Alias-Maximum' => Val}); -parse_property(<<16#23, Val:16/big, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Topic-Alias' => Val}); -parse_property(<<16#24, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Maximum-QoS' => Val}); -parse_property(<<16#25, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Retain-Available' => Val}); -parse_property(<<16#26, Bin/binary>>, Props) -> - {Pair, Rest} = parse_utf8_pair(Bin), + parse_property(Rest, Props#{'Subscription-Identifier' => Val}, StrictMode); +parse_property(<<16#11, Val:32/big, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Session-Expiry-Interval' => Val}, StrictMode); +parse_property(<<16#12, Bin/binary>>, Props, StrictMode) -> + {Val, Rest} = parse_utf8_string(Bin, StrictMode), + parse_property(Rest, Props#{'Assigned-Client-Identifier' => Val}, StrictMode); +parse_property(<<16#13, Val:16, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Server-Keep-Alive' => Val}, StrictMode); +parse_property(<<16#15, Bin/binary>>, Props, StrictMode) -> + {Val, Rest} = parse_utf8_string(Bin, StrictMode), + parse_property(Rest, Props#{'Authentication-Method' => Val}, StrictMode); +parse_property(<<16#16, Len:16/big, Val:Len/binary, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Authentication-Data' => Val}, StrictMode); +parse_property(<<16#17, Val, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Request-Problem-Information' => Val}, StrictMode); +parse_property(<<16#18, Val:32, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Will-Delay-Interval' => Val}, StrictMode); +parse_property(<<16#19, Val, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Request-Response-Information' => Val}, StrictMode); +parse_property(<<16#1A, Bin/binary>>, Props, StrictMode) -> + {Val, Rest} = parse_utf8_string(Bin, StrictMode), + parse_property(Rest, Props#{'Response-Information' => Val}, StrictMode); +parse_property(<<16#1C, Bin/binary>>, Props, StrictMode) -> + {Val, Rest} = parse_utf8_string(Bin, StrictMode), + parse_property(Rest, Props#{'Server-Reference' => Val}, StrictMode); +parse_property(<<16#1F, Bin/binary>>, Props, StrictMode) -> + {Val, Rest} = parse_utf8_string(Bin, StrictMode), + parse_property(Rest, Props#{'Reason-String' => Val}, StrictMode); +parse_property(<<16#21, Val:16/big, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Receive-Maximum' => Val}, StrictMode); +parse_property(<<16#22, Val:16/big, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Topic-Alias-Maximum' => Val}, StrictMode); +parse_property(<<16#23, Val:16/big, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Topic-Alias' => Val}, StrictMode); +parse_property(<<16#24, Val, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Maximum-QoS' => Val}, StrictMode); +parse_property(<<16#25, Val, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Retain-Available' => Val}, StrictMode); +parse_property(<<16#26, Bin/binary>>, Props, StrictMode) -> + {Pair, Rest} = parse_utf8_pair(Bin, StrictMode), case maps:find('User-Property', Props) of {ok, UserProps} -> UserProps1 = lists:append(UserProps, [Pair]), - parse_property(Rest, Props#{'User-Property' := UserProps1}); + parse_property(Rest, Props#{'User-Property' := UserProps1}, StrictMode); error -> - parse_property(Rest, Props#{'User-Property' => [Pair]}) + parse_property(Rest, Props#{'User-Property' => [Pair]}, StrictMode) end; -parse_property(<<16#27, Val:32, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Maximum-Packet-Size' => Val}); -parse_property(<<16#28, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Wildcard-Subscription-Available' => Val}); -parse_property(<<16#29, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Subscription-Identifier-Available' => Val}); -parse_property(<<16#2A, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Shared-Subscription-Available' => Val}). +parse_property(<<16#27, Val:32, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Maximum-Packet-Size' => Val}, StrictMode); +parse_property(<<16#28, Val, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Wildcard-Subscription-Available' => Val}, StrictMode); +parse_property(<<16#29, Val, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Subscription-Identifier-Available' => Val}, StrictMode); +parse_property(<<16#2A, Val, Bin/binary>>, Props, StrictMode) -> + parse_property(Bin, Props#{'Shared-Subscription-Available' => Val}, StrictMode); +parse_property(<>, _Props, _StrictMode) -> + error(#{invalid_property_code => Property}). +%% TODO: invalid property in specific packet. parse_variable_byte_integer(Bin) -> parse_variable_byte_integer(Bin, 1, 0). @@ -470,20 +476,53 @@ parse_topic_filters(unsubscribe, Bin) -> parse_reason_codes(Bin) -> [Code || <> <= Bin]. -parse_utf8_pair(<>) -> - {{Key, Val}, Rest}. +%%-------------------- +%% parse utf8 pair +parse_utf8_pair( <> + , true) -> + {{validate_utf8(Key), validate_utf8(Val)}, Rest}; +parse_utf8_pair( <> + , false) -> + {{Key, Val}, Rest}; +parse_utf8_pair(<>, _StrictMode) + when LenK > byte_size(Rest) -> + error(user_property_not_enough_bytes); +parse_utf8_pair(<>, _StrictMode) + when LenV > byte_size(Rest) -> + error(malformed_user_property_value); +parse_utf8_pair(Bin, _StrictMode) + when 4 > byte_size(Bin) -> + error(user_property_not_enough_bytes). -parse_utf8_string(Bin, false) -> +%%-------------------- +%% parse utf8 string +parse_utf8_string(Bin, _StrictMode, false) -> {undefined, Bin}; -parse_utf8_string(Bin, true) -> - parse_utf8_string(Bin). +parse_utf8_string(Bin, StrictMode, true) -> + parse_utf8_string(Bin, StrictMode). -parse_utf8_string(<>) -> - {Str, Rest}. +parse_utf8_string(<>, true) -> + {validate_utf8(Str), Rest}; +parse_utf8_string(<>, false) -> + {Str, Rest}; +parse_utf8_string(<>, _) + when Len > byte_size(Rest) -> + error(malformed_utf8_string); +parse_utf8_string(Bin, _) + when 2 > byte_size(Bin) -> + error(malformed_utf8_string_length). parse_binary_data(<>) -> - {Data, Rest}. + {Data, Rest}; +parse_binary_data(<>) + when Len > byte_size(Rest) -> + error(malformed_binary_data); +parse_binary_data(Bin) + when 2 > byte_size(Bin) -> + error(malformed_binary_data_length). %%-------------------------------------------------------------------- %% Serialize MQTT Packet @@ -821,3 +860,52 @@ fixqos(?PUBREL, 0) -> 1; fixqos(?SUBSCRIBE, 0) -> 1; fixqos(?UNSUBSCRIBE, 0) -> 1; fixqos(_Type, QoS) -> QoS. + +validate_utf8(Bin) -> + case unicode:characters_to_binary(Bin) of + {error, _, _} -> + error(utf8_string_invalid); + {incomplete, _, _} -> + error(utf8_string_invalid); + Bin when is_binary(Bin) -> + case validate_mqtt_utf8_char(Bin) of + true -> Bin; + false -> error(utf8_string_invalid) + end + end. + +%% Is the utf8 string respecting UTF-8 characters defined by MQTT Spec? +%% i.e. contains invalid UTF-8 char or control char +validate_mqtt_utf8_char(<<>>) -> + true; +%% ==== 1-Byte UTF-8 invalid: [[U+0000 .. U+001F] && [U+007F]] +validate_mqtt_utf8_char(<>) + when B1 >= 16#20, B1 =< 16#7E -> + validate_mqtt_utf8_char(Bs); +validate_mqtt_utf8_char(<>) + when B1 >= 16#00, B1 =< 16#1F; + B1 =:= 16#7F -> + %% [U+0000 .. U+001F] && [U+007F] + false; +%% ==== 2-Bytes UTF-8 invalid: [U+0080 .. U+009F] +validate_mqtt_utf8_char(<>) + when B1 =:= 16#C2; + B2 >= 16#A0, B2 =< 16#BF; + B1 > 16#C3, B1 =< 16#DE; + B2 >= 16#80, B2 =< 16#BF -> + validate_mqtt_utf8_char(Bs); +validate_mqtt_utf8_char(<<16#C2, B2, _Bs/binary>>) + when B2 >= 16#80, B2 =< 16#9F -> + %% [U+0080 .. U+009F] + false; +%% ==== 3-Bytes UTF-8 invalid: [U+D800 .. U+DFFF] +validate_mqtt_utf8_char(<>) + when B1 >= 16#E0, B1 =< 16#EE; + B1 =:= 16#EF -> + validate_mqtt_utf8_char(Bs); +validate_mqtt_utf8_char(<<16#ED, _B2, _B3, _Bs/binary>>) -> + false; +%% ==== 4-Bytes UTF-8 +validate_mqtt_utf8_char(<>) + when B1 =:= 16#0F -> + validate_mqtt_utf8_char(Bs). diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index 01495460f..eb38d33b8 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -54,6 +54,26 @@ , hexstr2bin/1 ]). +-export([ is_sane_id/1 + ]). + +-define(VALID_STR_RE, "^[A-Za-z0-9]+[A-Za-z0-9-_]*$"). + +-spec is_sane_id(list() | binary()) -> ok | {error, Reason::binary()}. +is_sane_id(Str) -> + StrLen = len(Str), + case StrLen > 0 andalso StrLen =< 256 of + true -> + case re:run(Str, ?VALID_STR_RE) of + nomatch -> {error, <<"required: " ?VALID_STR_RE>>}; + _ -> ok + end; + false -> {error, <<"0 < Length =< 256">>} + end. + +len(Bin) when is_binary(Bin) -> byte_size(Bin); +len(Str) when is_list(Str) -> length(Str). + -define(OOM_FACTOR, 1.25). %% @doc Parse v4 or v6 string format address to tuple. @@ -304,4 +324,30 @@ hexchar2int(I) when I >= $a andalso I =< $f -> I - $a + 10. ipv6_probe_test() -> ?assertEqual([{ipv6_probe, true}], ipv6_probe([])). +is_sane_id_test() -> + ?assertMatch({error, _}, is_sane_id("")), + ?assertMatch({error, _}, is_sane_id("_")), + ?assertMatch({error, _}, is_sane_id("_aaa")), + ?assertMatch({error, _}, is_sane_id("lkad/oddl")), + ?assertMatch({error, _}, is_sane_id("lkad*oddl")), + ?assertMatch({error, _}, is_sane_id("script>lkadoddl")), + ?assertMatch({error, _}, is_sane_id("