diff --git a/.github/workflows/run_automate_tests.yaml b/.github/workflows/run_automate_tests.yaml index 215d983a7..a9c4b1edf 100644 --- a/.github/workflows/run_automate_tests.yaml +++ b/.github/workflows/run_automate_tests.yaml @@ -19,7 +19,7 @@ jobs: id: dload_jmeter timeout-minutes: 1 env: - JMETER_VERSION: 5.3 + JMETER_VERSION: 5.4.3 run: | wget --no-verbose --no-check-certificate -O /tmp/apache-jmeter.tgz https://downloads.apache.org/jmeter/binaries/apache-jmeter-$JMETER_VERSION.tgz - uses: actions/upload-artifact@v2 @@ -117,7 +117,7 @@ jobs: - name: install jmeter timeout-minutes: 10 env: - JMETER_VERSION: 5.3 + JMETER_VERSION: 5.4.3 run: | cd /tmp && tar -xvf apache-jmeter.tgz echo "jmeter.save.saveservice.output_format=xml" >> /tmp/apache-jmeter-$JMETER_VERSION/user.properties @@ -208,7 +208,7 @@ jobs: - name: install jmeter timeout-minutes: 10 env: - JMETER_VERSION: 5.3 + JMETER_VERSION: 5.4.3 run: | cd /tmp && tar -xvf apache-jmeter.tgz echo "jmeter.save.saveservice.output_format=xml" >> /tmp/apache-jmeter-$JMETER_VERSION/user.properties @@ -310,7 +310,7 @@ jobs: - name: install jmeter timeout-minutes: 10 env: - JMETER_VERSION: 5.3 + JMETER_VERSION: 5.4.3 run: | cd /tmp && tar -xvf apache-jmeter.tgz echo "jmeter.save.saveservice.output_format=xml" >> /tmp/apache-jmeter-$JMETER_VERSION/user.properties @@ -406,7 +406,7 @@ jobs: - name: install jmeter timeout-minutes: 10 env: - JMETER_VERSION: 5.3 + JMETER_VERSION: 5.4.3 run: | cd /tmp && tar -xvf apache-jmeter.tgz echo "jmeter.save.saveservice.output_format=xml" >> /tmp/apache-jmeter-$JMETER_VERSION/user.properties diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 11debe5e7..146f29e23 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -22,12 +22,18 @@ File format: * CLI `emqx_ctl pem_cache clean` to force purge x509 certificate cache, to force an immediate reload of all certificates after the files are updated on disk. * Refactor the ExProto so that anonymous clients can also be displayed on the dashboard [#6983] -* Force shutdown of processe that cannot answer takeover event [#7026] +* Force shutdown of processes that cannot answer takeover event [#7026] * `topic` parameter in bridge configuration can have `${node}` substitution (just like in `clientid` parameter) * Add UTF-8 string validity check in `strict_mode` for MQTT packet. When set to true, invalid UTF-8 strings will cause the client to be disconnected. i.e. client ID, topic name. [#7261] * Changed systemd service restart delay from 10 seconds to 60 seconds. +* MQTT-SN gateway supports initiative to synchronize registered topics after session resumed. [#7300] * Add load control app for future development. +* Change the precision of float to 17 digits after the decimal point when formatting a + float using payload templates of rule actions. The old precision is 10 digits before + this change. [#7336] +* Return the cached resource status when querying a resource using HTTP APIs. + This is to avoid blocking the HTTP request if the resource is unavailable. [#7374] ### Bug fixes @@ -43,6 +49,10 @@ File format: * Fix false alert level log “cannot_find_plugins” caused by duplicate plugin names in `loaded_plugins` files. * Prompt user how to change the dashboard's initial default password when emqx start. * Fix errno=13 'Permission denied' Cannot create FIFO boot error in Amazon Linux 2022 (el8 package) +* Fix user or appid created, name only allow `^[A-Za-z]+[A-Za-z0-9-_]*$` +* Fix subscribe http api crash by bad_qos `/mqtt/subscribe`,`/mqtt/subscribe_batch`. +* Send DISCONNECT packet with reason code 0x98 if connection has been kicked [#7309] +* Auto subscribe to an empty topic will be simply ignored now ## v4.3.12 ### Important changes diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src index fdf2dec81..5aa48cea7 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src @@ -1,6 +1,5 @@ %% -*-: erlang -*- - -{"4.3.4", +{VSN, [ {"4.3.3", [ {load_module, emqx_bridge_mqtt, brutal_purge, soft_purge, []} diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src b/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src index e9f270397..a25d6cae1 100644 --- a/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src +++ b/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src @@ -6,15 +6,19 @@ [{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]}, {load_module,emqx_lwm2m_message,brutal_purge,soft_purge,[]}, {load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]}, - {<<"4\\.3\\.[3-5]">>, + {<<"4\\.3\\.[3-4]">>, [{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]}, - {load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]}], + {load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]}, + {"4.3.5", + [{load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]}], [{<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_lwm2m}]}, {"4.3.2", [{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]}, {load_module,emqx_lwm2m_message,brutal_purge,soft_purge,[]}, {load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]}, - {<<"4\\.3\\.[3-5]">>, + {<<"4\\.3\\.[3-4]">>, [{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]}, - {load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]}]}. + {load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]}, + {"4.3.5", + [{load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]}]}. diff --git a/apps/emqx_management/include/emqx_mgmt.hrl b/apps/emqx_management/include/emqx_mgmt.hrl index 6d510ed0c..3b54e313f 100644 --- a/apps/emqx_management/include/emqx_mgmt.hrl +++ b/apps/emqx_management/include/emqx_mgmt.hrl @@ -31,5 +31,6 @@ -define(ERROR13, 113). %% User already exist -define(ERROR14, 114). %% OldPassword error -define(ERROR15, 115). %% bad topic +-define(ERROR16, 116). %% bad QoS -define(VERSIONS, ["4.0", "4.1", "4.2", "4.3"]). \ No newline at end of file diff --git a/apps/emqx_management/src/emqx_management.app.src b/apps/emqx_management/src/emqx_management.app.src index 64f44637a..bee65781a 100644 --- a/apps/emqx_management/src/emqx_management.app.src +++ b/apps/emqx_management/src/emqx_management.app.src @@ -1,6 +1,6 @@ {application, emqx_management, [{description, "EMQ X Management API and CLI"}, - {vsn, "4.3.12"}, % strict semver, bump manually! + {vsn, "4.3.11"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_management_sup]}, {applications, [kernel,stdlib,minirest]}, diff --git a/apps/emqx_management/src/emqx_mgmt_api_apps.erl b/apps/emqx_management/src/emqx_mgmt_api_apps.erl index cca0b41f0..abcfe1961 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_apps.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_apps.erl @@ -68,7 +68,7 @@ add_app(_Bindings, Params) -> end. del_app(#{appid := AppId}, _Params) -> - case emqx_mgmt_auth:del_app(AppId) of + case emqx_mgmt_auth:del_app(emqx_mgmt_util:urldecode(AppId)) of ok -> minirest:return(); {error, Reason} -> minirest:return({error, Reason}) end. @@ -77,7 +77,7 @@ list_apps(_Bindings, _Params) -> minirest:return({ok, [format(Apps)|| Apps <- emqx_mgmt_auth:list_apps()]}). lookup_app(#{appid := AppId}, _Params) -> - case emqx_mgmt_auth:lookup_app(AppId) of + case emqx_mgmt_auth:lookup_app(emqx_mgmt_util:urldecode(AppId)) of {AppId, AppSecret, Name, Desc, Status, Expired} -> minirest:return({ok, #{app_id => AppId, secret => AppSecret, @@ -94,7 +94,7 @@ update_app(#{appid := AppId}, Params) -> Desc = proplists:get_value(<<"desc">>, Params), Status = proplists:get_value(<<"status">>, Params), Expired = proplists:get_value(<<"expired">>, Params), - case emqx_mgmt_auth:update_app(AppId, Name, Desc, Status, Expired) of + case emqx_mgmt_auth:update_app(emqx_mgmt_util:urldecode(AppId), Name, Desc, Status, Expired) of ok -> minirest:return(); {error, Reason} -> minirest:return({error, Reason}) end. diff --git a/apps/emqx_management/src/emqx_mgmt_api_pubsub.erl b/apps/emqx_management/src/emqx_mgmt_api_pubsub.erl index 53ca022bb..31b6d36f1 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_pubsub.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_pubsub.erl @@ -151,6 +151,8 @@ do_subscribe(ClientId, _Topics, _QoS) when not is_binary(ClientId) -> {ok, ?ERROR8, <<"bad clientid: must be string">>}; do_subscribe(_ClientId, [], _QoS) -> {ok, ?ERROR15, bad_topic}; +do_subscribe(_ClientId, _Topic, QoS) when QoS =/= 0 andalso QoS =/= 1 andalso QoS =/= 2 -> + {ok, ?ERROR16, bad_qos}; do_subscribe(ClientId, Topics, QoS) -> TopicTable = parse_topic_filters(Topics, QoS), case emqx_mgmt:subscribe(ClientId, TopicTable) of diff --git a/apps/emqx_management/src/emqx_mgmt_auth.erl b/apps/emqx_management/src/emqx_mgmt_auth.erl index c05cbf581..5f2b96dff 100644 --- a/apps/emqx_management/src/emqx_mgmt_auth.erl +++ b/apps/emqx_management/src/emqx_mgmt_auth.erl @@ -16,6 +16,8 @@ -module(emqx_mgmt_auth). +-include_lib("emqx/include/logger.hrl"). + %% Mnesia Bootstrap -export([mnesia/1]). -boot_mnesia({mnesia, [boot]}). @@ -89,36 +91,45 @@ add_app(AppId, Name, Desc, Status, Expired) when is_binary(AppId) -> -> {ok, appsecret()} | {error, term()}). add_app(AppId, Name, Secret, Desc, Status, Expired) when is_binary(AppId) -> - Secret1 = generate_appsecret_if_need(Secret), - App = #mqtt_app{id = AppId, - secret = Secret1, - name = Name, - desc = Desc, - status = Status, - expired = Expired}, - AddFun = fun() -> - case mnesia:wread({mqtt_app, AppId}) of - [] -> mnesia:write(App); - _ -> mnesia:abort(alread_existed) - end - end, - case mnesia:transaction(AddFun) of - {atomic, ok} -> {ok, Secret1}; - {aborted, Reason} -> {error, Reason} + case emqx_misc:is_sane_id(AppId) of + ok -> + Secret1 = generate_appsecret_if_need(Secret), + App = #mqtt_app{id = AppId, + secret = Secret1, + name = Name, + desc = Desc, + status = Status, + expired = Expired}, + AddFun = fun() -> + case mnesia:wread({mqtt_app, AppId}) of + [] -> mnesia:write(App); + _ -> mnesia:abort(already_existed) + end + end, + case mnesia:transaction(AddFun) of + {atomic, ok} -> {ok, Secret1}; + {aborted, Reason} -> {error, Reason} + end; + {error, Reason} -> {error, Reason} end. force_add_app(AppId, Name, Secret, Desc, Status, Expired) -> - AddFun = fun() -> - mnesia:write(#mqtt_app{id = AppId, - secret = Secret, - name = Name, - desc = Desc, - status = Status, - expired = Expired}) - end, - case mnesia:transaction(AddFun) of - {atomic, ok} -> ok; - {aborted, Reason} -> {error, Reason} + case emqx_misc:is_sane_id(AppId) of + ok -> + AddFun = fun() -> + mnesia:write(#mqtt_app{ + id = AppId, + secret = Secret, + name = Name, + desc = Desc, + status = Status, + expired = Expired}) + end, + case mnesia:transaction(AddFun) of + {atomic, ok} -> ok; + {aborted, Reason} -> {error, Reason} + end; + {error, Reason} -> {error, Reason} end. -spec(generate_appsecret_if_need(binary() | undefined) -> binary()). @@ -207,4 +218,3 @@ is_authorized(AppId, AppSecret) -> is_expired(undefined) -> true; is_expired(Expired) -> Expired >= erlang:system_time(second). - diff --git a/apps/emqx_management/src/emqx_mgmt_http.erl b/apps/emqx_management/src/emqx_mgmt_http.erl index 8e92b7371..ced7d10b2 100644 --- a/apps/emqx_management/src/emqx_mgmt_http.erl +++ b/apps/emqx_management/src/emqx_mgmt_http.erl @@ -118,9 +118,10 @@ handle_request(_Method, _Path, Req) -> cowboy_req:reply(400, #{<<"content-type">> => <<"text/plain">>}, <<"Not found.">>, Req). authorize_appid(Req) -> - case cowboy_req:parse_header(<<"authorization">>, Req) of - {basic, AppId, AppSecret} -> emqx_mgmt_auth:is_authorized(AppId, AppSecret); - _ -> false + try + {basic, AppId, AppSecret} = cowboy_req:parse_header(<<"authorization">>, Req), + emqx_mgmt_auth:is_authorized(AppId, AppSecret) + catch _:_ -> false end. -ifdef(EMQX_ENTERPRISE). diff --git a/apps/emqx_management/src/emqx_mgmt_sup.erl b/apps/emqx_management/src/emqx_mgmt_sup.erl index f3f5545f2..8ad0bc963 100644 --- a/apps/emqx_management/src/emqx_mgmt_sup.erl +++ b/apps/emqx_management/src/emqx_mgmt_sup.erl @@ -27,4 +27,3 @@ start_link() -> init([]) -> {ok, {{one_for_one, 1, 5}, []}}. - diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index 77b41dd4a..0ed0fee3a 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -1,29 +1,43 @@ %% -*- mode: erlang -*- +%% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.3.7",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + [{"4.3.7", + [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.6", - [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {update,emqx_rule_metrics,{advanced,["4.3.6"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.5", - [{update,emqx_rule_metrics,{advanced,["4.3.5"]}}, + [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {update,emqx_rule_metrics,{advanced,["4.3.5"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.4", - [{update,emqx_rule_metrics,{advanced,["4.3.4"]}}, + [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {update,emqx_rule_metrics,{advanced,["4.3.4"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.3", - [{update,emqx_rule_metrics,{advanced,["4.3.3"]}}, + [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {update,emqx_rule_metrics,{advanced,["4.3.3"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, @@ -31,7 +45,9 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.2", - [{update,emqx_rule_metrics,{advanced,["4.3.2"]}}, + [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {update,emqx_rule_metrics,{advanced,["4.3.2"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {apply,{emqx_stats,cancel_update,[rule_registery_stats]}}, @@ -40,7 +56,9 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.1", - [{update,emqx_rule_metrics,{advanced,["4.3.1"]}}, + [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {update,emqx_rule_metrics,{advanced,["4.3.1"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, @@ -49,7 +67,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.0", - [{update,emqx_rule_metrics,{advanced,["4.3.0"]}}, + [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {update,emqx_rule_metrics,{advanced,["4.3.0"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, @@ -59,30 +78,43 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{"4.3.7",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + [{"4.3.7", + [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.6", - [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {update,emqx_rule_metrics,{advanced,["4.3.6"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.5", - [{update,emqx_rule_metrics,{advanced,["4.3.5"]}}, + [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {update,emqx_rule_metrics,{advanced,["4.3.5"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.4", - [{update,emqx_rule_metrics,{advanced,["4.3.4"]}}, + [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {update,emqx_rule_metrics,{advanced,["4.3.4"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.3", - [{update,emqx_rule_metrics,{advanced,["4.3.3"]}}, + [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {update,emqx_rule_metrics,{advanced,["4.3.3"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, @@ -90,7 +122,9 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.2", - [{update,emqx_rule_metrics,{advanced,["4.3.2"]}}, + [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {update,emqx_rule_metrics,{advanced,["4.3.2"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {apply,{emqx_stats,cancel_update,[rule_registery_stats]}}, @@ -99,7 +133,9 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.1", - [{update,emqx_rule_metrics,{advanced,["4.3.1"]}}, + [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {update,emqx_rule_metrics,{advanced,["4.3.1"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, @@ -108,7 +144,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.0", - [{update,emqx_rule_metrics,{advanced,["4.3.0"]}}, + [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {update,emqx_rule_metrics,{advanced,["4.3.0"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 2eed25647..c8e69a17f 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -334,14 +334,11 @@ test_resource(#{type := Type, config := Config0}) -> -spec(get_resource_status(resource_id()) -> {ok, resource_status()} | {error, Reason :: term()}). get_resource_status(ResId) -> - case emqx_rule_registry:find_resource(ResId) of - {ok, #resource{type = ResType}} -> - {ok, #resource_type{on_status = {Mod, OnStatus}}} - = emqx_rule_registry:find_resource_type(ResType), - Status = fetch_resource_status(Mod, OnStatus, ResId), + case emqx_rule_registry:find_resource_params(ResId) of + {ok, #resource_params{status = Status}} -> {ok, Status}; not_found -> - {error, {resource_not_found, ResId}} + {error, resource_not_initialized} end. -spec(get_resource_params(resource_id()) -> {ok, map()} | {error, Reason :: term()}). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 80de9be45..39ac1e9c2 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -295,13 +295,8 @@ do_create_resource(Create, ParsedParams) -> list_resources(#{}, _Params) -> Data0 = lists:foldr(fun maybe_record_to_map/2, [], emqx_rule_registry:get_resources()), - Data = lists:map(fun(Res = #{id := Id}) -> - Status = lists:all(fun(Node) -> - case rpc:call(Node, emqx_rule_registry, find_resource_params, [Id]) of - {ok, #resource_params{status = #{is_alive := true}}} -> true; - _ -> false - end - end, ekka_mnesia:running_nodes()), + Data = lists:map(fun(Res = #{id := ResId}) -> + Status = get_aggregated_status(ResId), maps:put(status, Status, Res) end, Data0), return({ok, Data}). @@ -309,12 +304,23 @@ list_resources(#{}, _Params) -> list_resources_by_type(#{type := Type}, _Params) -> return_all(emqx_rule_registry:get_resources_by_type(Type)). +get_aggregated_status(ResId) -> + lists:all(fun(Node) -> + case rpc:call(Node, emqx_rule_engine, get_resource_status, [ResId]) of + {ok, #{is_alive := true}} -> true; + _ -> false + end + end, ekka_mnesia:running_nodes()). + show_resource(#{id := Id}, _Params) -> case emqx_rule_registry:find_resource(Id) of {ok, R} -> Status = [begin - {ok, St} = rpc:call(Node, emqx_rule_engine, get_resource_status, [Id]), + St = case rpc:call(Node, emqx_rule_engine, get_resource_status, [Id]) of + {ok, St0} -> St0; + {error, _} -> #{is_alive => false} + end, maps:put(node, Node, St) end || Node <- ekka_mnesia:running_nodes()], return({ok, maps:put(status, Status, record_to_map(R))}); @@ -326,8 +332,8 @@ get_resource_status(#{id := Id}, _Params) -> case emqx_rule_engine:get_resource_status(Id) of {ok, Status} -> return({ok, Status}); - {error, {resource_not_found, ResId}} -> - return({error, 400, ?ERR_NO_RESOURCE(ResId)}) + {error, resource_not_initialized} -> + return({error, 400, ?ERR_NO_RESOURCE(Id)}) end. start_resource(#{id := Id}, _Params) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index a96ee7a62..a9897437e 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -169,6 +169,16 @@ , sha256/1 ]). +%% gzip Funcs +-export([ gzip/1 + , gunzip/1 + ]). + +%% zip Funcs +-export([ zip/1 + , unzip/1 + ]). + %% Data encode and decode -export([ base64_encode/1 , base64_decode/1 @@ -784,6 +794,26 @@ sha256(S) when is_binary(S) -> hash(Type, Data) -> emqx_misc:bin2hexstr_a_f(crypto:hash(Type, Data)). +%%------------------------------------------------------------------------------ +%% gzip Funcs +%%------------------------------------------------------------------------------ + +gzip(S) when is_binary(S) -> + zlib:gzip(S). + +gunzip(S) when is_binary(S) -> + zlib:gunzip(S). + +%%------------------------------------------------------------------------------ +%% zip Funcs +%%------------------------------------------------------------------------------ + +zip(S) when is_binary(S) -> + zlib:zip(S). + +unzip(S) when is_binary(S) -> + zlib:unzip(S). + %%------------------------------------------------------------------------------ %% Data encode and decode Funcs %%------------------------------------------------------------------------------ diff --git a/apps/emqx_rule_engine/src/emqx_rule_utils.erl b/apps/emqx_rule_engine/src/emqx_rule_utils.erl index 3791b1386..67436ea5e 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_utils.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_utils.erl @@ -60,8 +60,8 @@ ]}). -define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})"). - -define(EX_WITHE_CHARS, "\\s"). %% Space and CRLF +-define(FLOAT_PRECISION, 17). -type(uri_string() :: iodata()). @@ -336,12 +336,12 @@ bool(Bool) -> error({invalid_boolean, Bool}). number_to_binary(Int) when is_integer(Int) -> integer_to_binary(Int); number_to_binary(Float) when is_float(Float) -> - float_to_binary(Float, [{decimals, 10}, compact]). + float_to_binary(Float, [{decimals, ?FLOAT_PRECISION}, compact]). number_to_list(Int) when is_integer(Int) -> integer_to_list(Int); number_to_list(Float) when is_float(Float) -> - float_to_list(Float, [{decimals, 10}, compact]). + float_to_list(Float, [{decimals, ?FLOAT_PRECISION}, compact]). parse_nested(Attr) -> case string:split(Attr, <<".">>, all) of diff --git a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl index 32421df32..350a57051 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl @@ -609,6 +609,29 @@ prop_hash_fun() -> (64 == byte_size(apply_func(sha256, [S]))) end). + +%%------------------------------------------------------------------------------ +%% Test cases for gzip funcs +%%------------------------------------------------------------------------------ + +t_gzip_funcs(_) -> + ?PROPTEST(prop_gzip_fun). + +prop_gzip_fun() -> + ?FORALL(S, binary(), + S == apply_func(gunzip, [apply_func(gzip, [S])])). + +%%------------------------------------------------------------------------------ +%% Test cases for zip funcs +%%------------------------------------------------------------------------------ + +t_zip_funcs(_) -> + ?PROPTEST(prop_zip_fun). + +prop_zip_fun() -> + ?FORALL(S, binary(), + S == apply_func(unzip, [apply_func(zip, [S])])). + %%------------------------------------------------------------------------------ %% Test cases for base64 %%------------------------------------------------------------------------------ @@ -822,4 +845,3 @@ all() -> suite() -> [{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}]. - diff --git a/apps/emqx_sn/etc/emqx_sn.conf b/apps/emqx_sn/etc/emqx_sn.conf index 6572812c1..655ef4028 100644 --- a/apps/emqx_sn/etc/emqx_sn.conf +++ b/apps/emqx_sn/etc/emqx_sn.conf @@ -51,3 +51,10 @@ mqtt.sn.username = mqtt_sn_user ## ## Value: String mqtt.sn.password = abc + +## Whether to initiate all subscribed topic registration messages to the +## client after the Session has been taken over by a new channel. +## +## Value: Boolean +## Default: false +#mqtt.sn.subs_resume = false diff --git a/apps/emqx_sn/priv/emqx_sn.schema b/apps/emqx_sn/priv/emqx_sn.schema index a585c1037..bd0995c77 100644 --- a/apps/emqx_sn/priv/emqx_sn.schema +++ b/apps/emqx_sn/priv/emqx_sn.schema @@ -56,6 +56,11 @@ end}. {datatype, string} ]}. +{mapping, "mqtt.sn.subs_resume", "emqx_sn.subs_resume", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + {translation, "emqx_sn.username", fun(Conf) -> Username = cuttlefish:conf_get("mqtt.sn.username", Conf), list_to_binary(Username) diff --git a/apps/emqx_sn/src/emqx_sn.appup.src b/apps/emqx_sn/src/emqx_sn.appup.src index 6a4eb66d1..269605afa 100644 --- a/apps/emqx_sn/src/emqx_sn.appup.src +++ b/apps/emqx_sn/src/emqx_sn.appup.src @@ -1,26 +1,52 @@ %% -*- mode: erlang -*- {VSN, [ - {<<"4\\.3\\.[4-5]">>,[ + {"4.3.5",[ {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, - {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, + {update,emqx_sn_gateway,{advanced,["4.3.5"]}} ]}, - {<<"4.3.[2-3]">>,[ + {"4.3.4",[ + {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, + {update,emqx_sn_gateway,{advanced,["4.3.4"]}} + ]}, + {"4.3.3",[ {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, - {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, + {update,emqx_sn_gateway,{advanced,["4.3.3"]}} + ]}, + {"4.3.2",[ + {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, + {update,emqx_sn_gateway,{advanced,["4.3.2"]}} ]}, {<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]} ], [ - {<<"4\\.3\\.[4-5]">>,[ + {"4.3.5",[ {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, - {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, + {update,emqx_sn_gateway,{advanced,["4.3.5"]}} ]}, - {<<"4.3.[2-3]">>,[ + {"4.3.4",[ + {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, + {update,emqx_sn_gateway,{advanced,["4.3.4"]}} + ]}, + {"4.3.3",[ {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, - {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, + {update,emqx_sn_gateway,{advanced,["4.3.3"]}} + ]}, + {"4.3.2",[ + {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, + {update,emqx_sn_gateway,{advanced,["4.3.2"]}} ]}, {<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]} ]}. diff --git a/apps/emqx_sn/src/emqx_sn_app.erl b/apps/emqx_sn/src/emqx_sn_app.erl index 9575523f8..e7c86d5b7 100644 --- a/apps/emqx_sn/src/emqx_sn_app.erl +++ b/apps/emqx_sn/src/emqx_sn_app.erl @@ -122,12 +122,14 @@ listeners_confs() -> EnableQos3 = application:get_env(emqx_sn, enable_qos3, false), EnableStats = application:get_env(emqx_sn, enable_stats, false), IdleTimeout = application:get_env(emqx_sn, idle_timeout, 30000), + SubsResume = application:get_env(emqx_sn, subs_resume, false), [{udp, ListenOn, [{gateway_id, GwId}, {username, Username}, {password, Password}, {enable_qos3, EnableQos3}, {enable_stats, EnableStats}, {idle_timeout, IdleTimeout}, + {subs_resume, SubsResume}, {max_connections, 1024000}, {max_conn_rate, 1000}, {udp_options, []}]}]. diff --git a/apps/emqx_sn/src/emqx_sn_frame.erl b/apps/emqx_sn/src/emqx_sn_frame.erl index 28a20956e..7ec18dd4a 100644 --- a/apps/emqx_sn/src/emqx_sn_frame.erl +++ b/apps/emqx_sn/src/emqx_sn_frame.erl @@ -339,9 +339,9 @@ format(?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)) -> format(?SN_PINGREQ_MSG(ClientId)) -> io_lib:format("SN_PINGREQ(ClientId=~s)", [ClientId]); format(?SN_PINGRESP_MSG()) -> - "SN_PINGREQ()"; + "SN_PINGRESP()"; format(?SN_DISCONNECT_MSG(Duration)) -> - io_lib:format("SN_DISCONNECT(Duration=~s)", [Duration]); + io_lib:format("SN_DISCONNECT(Duration=~w)", [Duration]); format(#mqtt_sn_message{type = Type, variable = Var}) -> io_lib:format("mqtt_sn_message(type=~s, Var=~w)", diff --git a/apps/emqx_sn/src/emqx_sn_gateway.erl b/apps/emqx_sn/src/emqx_sn_gateway.erl index 265607229..80a1339c0 100644 --- a/apps/emqx_sn/src/emqx_sn_gateway.erl +++ b/apps/emqx_sn/src/emqx_sn_gateway.erl @@ -48,6 +48,7 @@ , wait_for_will_topic/3 , wait_for_will_msg/3 , connected/3 + , registering/3 , asleep/3 , awake/3 ]). @@ -96,7 +97,10 @@ has_pending_pingresp = false :: boolean(), %% Store all qos0 messages for waiting REGACK %% Note: QoS1/QoS2 messages will kept inflight queue - pending_topic_ids = #{} :: pending_msgs() + pending_topic_ids = #{} :: pending_msgs(), + subs_resume = false, + waiting_sync_topics = [], + previous_outgoings_and_state = undefined }). -define(INFO_KEYS, [socktype, peername, sockname, sockstate]). %, active_n]). @@ -126,6 +130,9 @@ Reason =:= asleep_timeout; Reason =:= keepalive_timeout). +-define(RETRY_TIMEOUT, 5000). +-define(MAX_RETRY_TIMES, 3). + %%-------------------------------------------------------------------- %% Exported APIs %%-------------------------------------------------------------------- @@ -152,6 +159,7 @@ init([{_, SockPid, Sock}, Peername, Options]) -> Password = proplists:get_value(password, Options, undefined), EnableQos3 = proplists:get_value(enable_qos3, Options, false), IdleTimeout = proplists:get_value(idle_timeout, Options, 30000), + SubsResume = proplists:get_value(subs_resume, Options, false), EnableStats = proplists:get_value(enable_stats, Options, false), case inet:sockname(Sock) of {ok, Sockname} -> @@ -168,7 +176,8 @@ init([{_, SockPid, Sock}, Peername, Options]) -> asleep_timer = emqx_sn_asleep_timer:init(), enable_stats = EnableStats, enable_qos3 = EnableQos3, - idle_timeout = IdleTimeout + idle_timeout = IdleTimeout, + subs_resume = SubsResume }, emqx_logger:set_metadata_peername(esockd:format(Peername)), {ok, idle, State, [IdleTimeout]}; @@ -379,6 +388,13 @@ connected(cast, {outgoing, Packet}, State) -> connected(cast, {connack, ConnAck}, State) -> {keep_state, handle_outgoing(ConnAck, State)}; +connected(cast, {register, TopicNames, BlockedOutgoins}, State) -> + NState = State#state{ + waiting_sync_topics = TopicNames, + previous_outgoings_and_state = {BlockedOutgoins, ?FUNCTION_NAME} + }, + {next_state, registering, NState, [next_event(shooting)]}; + connected(cast, {shutdown, Reason, Packet}, State) -> stop(Reason, handle_outgoing(Packet, State)); @@ -392,6 +408,80 @@ connected(cast, {close, Reason}, State) -> connected(EventType, EventContent, State) -> handle_event(EventType, EventContent, connected, State). +registering(cast, shooting, + State = #state{ + channel = Channel, + waiting_sync_topics = [], + previous_outgoings_and_state = {Outgoings, StateName}}) -> + Session = emqx_channel:get_session(Channel), + ClientInfo = emqx_channel:info(clientinfo, Channel), + {Outgoings2, NChannel} = + case emqx_session:dequeue(ClientInfo, Session) of + {ok, NSession} -> + {[], emqx_channel:set_session(NSession, Channel)}; + {ok, Pubs, NSession} -> + emqx_channel:do_deliver( + Pubs, + emqx_channel:set_session(NSession, Channel) + ) + end, + NState = State#state{ + channel = NChannel, + previous_outgoings_and_state = undefined}, + {next_state, StateName, NState, outgoing_events(Outgoings ++ Outgoings2)}; + +registering(cast, shooting, + State = #state{ + clientid = ClientId, + waiting_sync_topics = [TopicName | Remainings]}) -> + TopicId = emqx_sn_registry:lookup_topic_id(ClientId, TopicName), + NState = send_register( + TopicName, + TopicId, + 16#FFFF, %% FIXME: msgid ? + State#state{waiting_sync_topics = [{TopicId, TopicName, 0} | Remainings]} + ), + {keep_state, NState, {{timeout, wait_regack}, ?RETRY_TIMEOUT, nocontent}}; + +registering(cast, {incoming, ?SN_REGACK_MSG(TopicId, _MsgId, ?SN_RC_ACCEPTED)}, + State = #state{waiting_sync_topics = [{TopicId, TopicName, _} | Remainings]}) -> + ?LOG(debug, "Register topic name ~s with id ~w successfully!", [TopicName, TopicId]), + {keep_state, State#state{waiting_sync_topics = Remainings}, [next_event(shooting)]}; + +registering(cast, {incoming, ?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)}, + State = #state{waiting_sync_topics = [{TopicId, TopicName, _} | Remainings]}) -> + ?LOG(error, "client does not accept register TopicName=~s, TopicId=~p, MsgId=~p, ReturnCode=~p", + [TopicName, TopicId, MsgId, ReturnCode]), + {keep_state, State#state{waiting_sync_topics = Remainings}, [next_event(shooting)]}; + +registering(cast, {incoming, Packet}, + State = #state{previous_outgoings_and_state = {_, StateName}}) + when is_record(Packet, mqtt_sn_message) -> + apply(?MODULE, StateName, [cast, {incoming, Packet}, State]); + +registering({timeout, wait_regack}, _, + State = #state{waiting_sync_topics = [{TopicId, TopicName, Times} | Remainings]}) + when Times < ?MAX_RETRY_TIMES -> + ?LOG(warning, "Waiting REGACK timeout for TopicName=~s, TopicId=~w, try it again(~w)", + [TopicName, TopicId, Times+1]), + NState = send_register( + TopicName, + TopicId, + 16#FFFF, %% FIXME: msgid? + State#state{waiting_sync_topics = [{TopicId, TopicName, Times + 1} | Remainings]} + ), + {keep_state, NState, {{timeout, wait_regack}, ?RETRY_TIMEOUT, nocontent}}; + +registering({timeout, wait_regack}, _, + State = #state{waiting_sync_topics = [{TopicId, TopicName, ?MAX_RETRY_TIMES} | _]}) -> + ?LOG(error, "Retry register TopicName=~s, TopicId=~w reached the max retry times", + [TopicId, TopicName]), + NState = send_message(?SN_DISCONNECT_MSG(undefined), State), + stop(reached_max_retry_times, NState); + +registering(EventType, EventContent, State) -> + handle_event(EventType, EventContent, ?FUNCTION_NAME, State). + asleep(cast, {incoming, ?SN_DISCONNECT_MSG(Duration)}, State) -> State0 = send_message(?SN_DISCONNECT_MSG(undefined), State), case Duration of @@ -519,10 +609,13 @@ handle_event(info, {datagram, SockPid, Data}, StateName, stop(frame_error, State) end; -handle_event(info, {deliver, _Topic, Msg}, asleep, - State = #state{channel = Channel}) -> +handle_event(info, {deliver, _Topic, Msg}, StateName, + State = #state{channel = Channel}) + when StateName == alseep; + StateName == registering -> % section 6.14, Support of sleeping clients - ?LOG(debug, "enqueue downlink message in asleep state, msg: ~0p", [Msg]), + ?LOG(debug, "enqueue downlink message in ~s state, msg: ~0p", + [StateName, Msg]), Session = emqx_session:enqueue(emqx_channel:info(clientinfo, Channel), Msg, emqx_channel:get_session(Channel)), {keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}}; @@ -593,8 +686,31 @@ terminate(Reason, _StateName, #state{channel = Channel}) -> emqx_channel:terminate(Reason, Channel), ok. -code_change(_Vsn, StateName, State, _Extra) -> - {ok, StateName, State}. +%% in the emqx_sn:v4.3.6, we have added two new fields in the state last: +%% - waiting_sync_topics +%% - previous_outgoings_and_state +code_change({down, _Vsn}, StateName, State, [ToVsn]) -> + case re:run(ToVsn, "4\\.3\\.[2-5]") of + {match, _} -> + NState0 = lists:droplast( + lists:droplast( + lists:droplast(tuple_to_list(State)))), + NState = list_to_tuple(NState0), + {ok, StateName, NState}; + _ -> + {ok, StateName, State} + end; + +code_change(_Vsn, StateName, State, [FromVsn]) -> + case re:run(FromVsn, "4\\.3\\.[2-5]") of + {match, _} -> + NState = list_to_tuple( + tuple_to_list(State) ++ [false, [], undefined] + ), + {ok, StateName, NState}; + _ -> + {ok, StateName, State} + end. %%-------------------------------------------------------------------- %% Handle Call/Info @@ -643,6 +759,9 @@ outgoing_event(Packet) when is_record(Packet, mqtt_packet); outgoing_event(Action) -> next_event(Action). +next_event(Content) -> + {next_event, cast, Content}. + close_socket(State = #state{sockstate = closed}) -> State; close_socket(State = #state{socket = _Socket}) -> %ok = gen_udp:close(Socket), @@ -1058,6 +1177,38 @@ handle_incoming(#mqtt_packet{variable = #mqtt_packet_puback{}} = Packet, awake, Result = channel_handle_in(Packet, State), handle_return(Result, State, [try_goto_asleep]); +handle_incoming( + #mqtt_packet{ + variable = #mqtt_packet_connect{ + clean_start = false} + } = Packet, + _, + State = #state{subs_resume = SubsResume}) -> + Result = channel_handle_in(Packet, State), + case {SubsResume, Result} of + {true, {ok, Replies, NChannel}} -> + case maps:get( + subscriptions, + emqx_channel:info(session, NChannel) + ) of + Subs when map_size(Subs) == 0 -> + handle_return(Result, State); + Subs -> + TopicNames = lists:filter( + fun(T) -> not emqx_topic:wildcard(T) + end, maps:keys(Subs)), + {ConnackEvents, Outgoings} = split_connack_replies( + Replies), + Events = outgoing_events( + ConnackEvents ++ + [{register, TopicNames, Outgoings}] + ), + {keep_state, State#state{channel = NChannel}, Events} + end; + _ -> + handle_return(Result, State) + end; + handle_incoming(Packet, _StName, State) -> Result = channel_handle_in(Packet, State), handle_return(Result, State). @@ -1167,9 +1318,6 @@ inc_outgoing_stats(Type) -> false -> ok end. -next_event(Content) -> - {next_event, cast, Content}. - inc_counter(Key, Inc) -> _ = emqx_pd:inc_counter(Key, Inc), ok. @@ -1183,3 +1331,8 @@ maybe_send_puback(?QOS_0, _TopicId, _MsgId, _ReasonCode, State) -> State; maybe_send_puback(_QoS, TopicId, MsgId, ReasonCode, State) -> send_message(?SN_PUBACK_MSG(TopicId, MsgId, ReasonCode), State). + +%% Replies = [{event, connected}, {connack, ConnAck}, {outgoing, Pkts}] +split_connack_replies([A = {event, connected}, + B = {connack, _ConnAck} | Outgoings]) -> + {[A, B], Outgoings}. diff --git a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl index 9ff519fb5..cdecc06bb 100644 --- a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl @@ -79,6 +79,12 @@ set_special_confs(emqx_sn) -> set_special_confs(_App) -> ok. +restart_emqx_sn(#{subs_resume := Bool}) -> + application:set_env(emqx_sn, subs_resume, Bool), + _ = application:stop(emqx_sn), + _ = application:ensure_all_started(emqx_sn), + ok. + %%-------------------------------------------------------------------- %% Test cases %%-------------------------------------------------------------------- @@ -862,108 +868,6 @@ t_delivery_qos1_register_invalid_topic_id(_) -> ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), gen_udp:close(Socket). -t_delivery_takeover_and_re_register(_) -> - MsgId = 1, - {ok, Socket} = gen_udp:open(0, [binary]), - send_connect_msg(Socket, <<"test">>, 0), - ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, - receive_response(Socket)), - - send_subscribe_msg_normal_topic(Socket, ?QOS_1, <<"topic-a">>, MsgId+1), - <<_, ?SN_SUBACK, 2#00100000, - TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), - - send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2), - <<_, ?SN_SUBACK, 2#01000000, - TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), - - _ = emqx:publish( - emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)), - _ = emqx:publish( - emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"test-b">>)), - - <<_, ?SN_PUBLISH, 2#00100000, - TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket), - send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED), - - <<_, ?SN_PUBLISH, 2#01000000, - TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket), - send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED), - - send_disconnect_msg(Socket, undefined), - ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), - gen_udp:close(Socket), - - %% offline messages will be queued into the MQTT-SN session - _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m1">>)), - _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)), - _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m3">>)), - _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m1">>)), - _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m2">>)), - _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)), - - {ok, NSocket} = gen_udp:open(0, [binary]), - send_connect_msg(NSocket, <<"test">>, 0), - ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, - receive_response(NSocket)), - - %% qos1 - - %% received the resume messages - <<_, ?SN_PUBLISH, 2#00100000, - TopicIdA:16, MsgIdA0:16, "m1">> = receive_response(NSocket), - %% only one qos1/qos2 inflight - ?assertEqual(udp_receive_timeout, receive_response(NSocket)), - send_puback_msg(NSocket, TopicIdA, MsgIdA0, ?SN_RC_INVALID_TOPIC_ID), - %% recv register - <<_, ?SN_REGISTER, - TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket), - send_regack_msg(NSocket, TopicIdA, RegMsgIdA), - %% received the replay messages - <<_, ?SN_PUBLISH, 2#00100000, - TopicIdA:16, MsgIdA1:16, "m1">> = receive_response(NSocket), - send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED), - - <<_, ?SN_PUBLISH, 2#00100000, - TopicIdA:16, MsgIdA2:16, "m2">> = receive_response(NSocket), - send_puback_msg(NSocket, TopicIdA, MsgIdA2, ?SN_RC_ACCEPTED), - - <<_, ?SN_PUBLISH, 2#00100000, - TopicIdA:16, MsgIdA3:16, "m3">> = receive_response(NSocket), - send_puback_msg(NSocket, TopicIdA, MsgIdA3, ?SN_RC_ACCEPTED), - - %% qos2 - <<_, ?SN_PUBLISH, 2#01000000, - TopicIdB:16, MsgIdB0:16, "m1">> = receive_response(NSocket), - %% only one qos1/qos2 inflight - ?assertEqual(udp_receive_timeout, receive_response(NSocket)), - send_puback_msg(NSocket, TopicIdB, MsgIdB0, ?SN_RC_INVALID_TOPIC_ID), - %% recv register - <<_, ?SN_REGISTER, - TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), - send_regack_msg(NSocket, TopicIdB, RegMsgIdB), - %% received the replay messages - <<_, ?SN_PUBLISH, 2#01000000, - TopicIdB:16, MsgIdB1:16, "m1">> = receive_response(NSocket), - send_pubrec_msg(NSocket, MsgIdB1), - <<_, ?SN_PUBREL, MsgIdB1:16>> = receive_response(NSocket), - send_pubcomp_msg(NSocket, MsgIdB1), - - <<_, ?SN_PUBLISH, 2#01000000, - TopicIdB:16, MsgIdB2:16, "m2">> = receive_response(NSocket), - send_puback_msg(NSocket, TopicIdB, MsgIdB2, ?SN_RC_ACCEPTED), - - <<_, ?SN_PUBLISH, 2#01000000, - TopicIdB:16, MsgIdB3:16, "m3">> = receive_response(NSocket), - send_puback_msg(NSocket, TopicIdB, MsgIdB3, ?SN_RC_ACCEPTED), - - %% no more messages - ?assertEqual(udp_receive_timeout, receive_response(NSocket)), - - send_disconnect_msg(NSocket, undefined), - ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket)), - gen_udp:close(NSocket). - t_will_case01(_) -> QoS = 1, Duration = 1, @@ -1725,6 +1629,324 @@ t_broadcast_test1(_) -> timer:sleep(600), gen_udp:close(Socket). +t_register_subs_resume_on(_) -> + restart_emqx_sn(#{subs_resume => true}), + MsgId = 1, + {ok, Socket} = gen_udp:open(0, [binary]), + send_connect_msg(Socket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(Socket)), + + send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-a">>, MsgId+1), + <<_, ?SN_SUBACK, 2#01000000, + TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2), + <<_, ?SN_SUBACK, 2#01000000, + TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + _ = emqx:publish( + emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)), + _ = emqx:publish( + emqx_message:make(test, ?QOS_1, <<"topic-b">>, <<"test-b">>)), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket), + send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket), + send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED), + + send_disconnect_msg(Socket, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + gen_udp:close(Socket), + + %% offline messages will be queued into the MQTT-SN session + _ = emqx:publish(emqx_message:make(test, ?QOS_0, <<"topic-a">>, <<"m1">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-a">>, <<"m3">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_0, <<"topic-b">>, <<"m1">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-b">>, <<"m2">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)), + + {ok, NSocket} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket)), + + %% receive subs register requests + <<_, ?SN_REGISTER, + TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket), + send_regack_msg(NSocket, TopicIdA, RegMsgIdA), + + <<_, ?SN_REGISTER, + TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), + send_regack_msg(NSocket, TopicIdB, RegMsgIdB), + + %% receive the queued messages + + <<_, ?SN_PUBLISH, 2#00000000, + TopicIdA:16, 0:16, "m1">> = receive_response(NSocket), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA1:16, "m2">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdA:16, MsgIdA2:16, "m3">> = receive_response(NSocket), + send_pubrec_msg(NSocket, MsgIdA2), + <<_, ?SN_PUBREL, MsgIdA2:16>> = receive_response(NSocket), + send_pubcomp_msg(NSocket, MsgIdA2), + + + <<_, ?SN_PUBLISH, 2#00000000, + TopicIdB:16, 0:16, "m1">> = receive_response(NSocket), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdB:16, MsgIdB1:16, "m2">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdB, MsgIdB1, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB2:16, "m3">> = receive_response(NSocket), + send_pubrec_msg(NSocket, MsgIdB2), + <<_, ?SN_PUBREL, MsgIdB2:16>> = receive_response(NSocket), + send_pubcomp_msg(NSocket, MsgIdB2), + + %% no more messages + ?assertEqual(udp_receive_timeout, receive_response(NSocket)), + + gen_udp:close(NSocket), + {ok, NSocket1} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket1, <<"test">>), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket1)), + send_disconnect_msg(NSocket1, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)), + gen_udp:close(NSocket1), + restart_emqx_sn(#{subs_resume => false}). + +t_register_subs_resume_off(_) -> + MsgId = 1, + {ok, Socket} = gen_udp:open(0, [binary]), + send_connect_msg(Socket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(Socket)), + + send_subscribe_msg_normal_topic(Socket, ?QOS_1, <<"topic-a">>, MsgId+1), + <<_, ?SN_SUBACK, 2#00100000, + TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2), + <<_, ?SN_SUBACK, 2#01000000, + TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + _ = emqx:publish( + emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)), + _ = emqx:publish( + emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"test-b">>)), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket), + send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket), + send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED), + + send_disconnect_msg(Socket, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + gen_udp:close(Socket), + + %% offline messages will be queued into the MQTT-SN session + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m1">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m3">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m1">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m2">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)), + + {ok, NSocket} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket)), + + %% qos1 + + %% received the resume messages + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA0:16, "m1">> = receive_response(NSocket), + %% only one qos1/qos2 inflight + ?assertEqual(udp_receive_timeout, receive_response(NSocket)), + send_puback_msg(NSocket, TopicIdA, MsgIdA0, ?SN_RC_INVALID_TOPIC_ID), + %% recv register + <<_, ?SN_REGISTER, + TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket), + send_regack_msg(NSocket, TopicIdA, RegMsgIdA), + %% received the replay messages + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA1:16, "m1">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA2:16, "m2">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdA, MsgIdA2, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA3:16, "m3">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdA, MsgIdA3, ?SN_RC_ACCEPTED), + + %% qos2 + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB0:16, "m1">> = receive_response(NSocket), + %% only one qos1/qos2 inflight + ?assertEqual(udp_receive_timeout, receive_response(NSocket)), + send_puback_msg(NSocket, TopicIdB, MsgIdB0, ?SN_RC_INVALID_TOPIC_ID), + %% recv register + <<_, ?SN_REGISTER, + TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), + send_regack_msg(NSocket, TopicIdB, RegMsgIdB), + %% received the replay messages + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB1:16, "m1">> = receive_response(NSocket), + send_pubrec_msg(NSocket, MsgIdB1), + <<_, ?SN_PUBREL, MsgIdB1:16>> = receive_response(NSocket), + send_pubcomp_msg(NSocket, MsgIdB1), + + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB2:16, "m2">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdB, MsgIdB2, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB3:16, "m3">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdB, MsgIdB3, ?SN_RC_ACCEPTED), + + %% no more messages + ?assertEqual(udp_receive_timeout, receive_response(NSocket)), + + gen_udp:close(NSocket), + {ok, NSocket1} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket1, <<"test">>), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket1)), + send_disconnect_msg(NSocket1, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)), + gen_udp:close(NSocket1). + +t_register_skip_failure_topic_name_and_reach_max_retry_times(_) -> + restart_emqx_sn(#{subs_resume => true}), + MsgId = 1, + {ok, Socket} = gen_udp:open(0, [binary]), + send_connect_msg(Socket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(Socket)), + + send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-a">>, MsgId+1), + <<_, ?SN_SUBACK, 2#01000000, + TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2), + <<_, ?SN_SUBACK, 2#01000000, + TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + send_disconnect_msg(Socket, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + gen_udp:close(Socket), + + {ok, NSocket} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket)), + + %% receive subs register requests + + %% registered failured topic-name will be skipped + <<_, ?SN_REGISTER, + TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket), + send_regack_msg(NSocket, TopicIdA, RegMsgIdA, ?SN_RC_INVALID_TOPIC_ID), + + %% the gateway try to shutdown this client if it reached max-retry-times + %% + %% times-0 + <<_, ?SN_REGISTER, + TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), + %% times-1 + timer:sleep(5000), %% RETYRY_TIMEOUT + <<_, ?SN_REGISTER, + TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), + %% times-2 + timer:sleep(5000), %% RETYRY_TIMEOUT + <<_, ?SN_REGISTER, + TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), + %% just a ping + send_pingreq_msg(NSocket, <<"test">>), + ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(NSocket)), + %% times-3 + timer:sleep(5000), %% RETYRY_TIMEOUT + <<_, ?SN_REGISTER, + TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), + %% shutdown due to reached max retry times + timer:sleep(5000), %% RETYRY_TIMEOUT + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket)), + gen_udp:close(NSocket), + restart_emqx_sn(#{subs_resume => false}). + +t_register_enqueue_delivering_messages(_) -> + restart_emqx_sn(#{subs_resume => true}), + MsgId = 1, + {ok, Socket} = gen_udp:open(0, [binary]), + send_connect_msg(Socket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(Socket)), + + send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-a">>, MsgId+1), + <<_, ?SN_SUBACK, 2#01000000, + TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + send_disconnect_msg(Socket, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + gen_udp:close(Socket), + + emqx_logger:set_log_level(debug), + + {ok, NSocket} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket)), + + %% receive subs register requests + + %% registered failured topic-name will be skipped + <<_, ?SN_REGISTER, + TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket), + + _ = emqx:publish(emqx_message:make(test, ?QOS_0, <<"topic-a">>, <<"m1">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)), + + send_regack_msg(NSocket, TopicIdA, RegMsgIdA, ?SN_RC_ACCEPTED), + + %% receive the queued messages + + <<_, ?SN_PUBLISH, 2#00000000, + TopicIdA:16, 0:16, "m1">> = receive_response(NSocket), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA1:16, "m2">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED), + + %% no more messages + ?assertEqual(udp_receive_timeout, receive_response(NSocket)), + + gen_udp:close(NSocket), + {ok, NSocket1} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket1, <<"test">>), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket1)), + send_disconnect_msg(NSocket1, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)), + gen_udp:close(NSocket1), + restart_emqx_sn(#{subs_resume => false}). + %%-------------------------------------------------------------------- %% Helper funcs %%-------------------------------------------------------------------- @@ -1816,9 +2038,12 @@ send_register_msg(Socket, TopicName, MsgId) -> ok = gen_udp:send(Socket, ?HOST, ?PORT, RegisterPacket). send_regack_msg(Socket, TopicId, MsgId) -> + send_regack_msg(Socket, TopicId, MsgId, ?SN_RC_ACCEPTED). + +send_regack_msg(Socket, TopicId, MsgId, Rc) -> Length = 7, MsgType = ?SN_REGACK, - Packet = <>, + Packet = <>, ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet). send_publish_msg_normal_topic(Socket, QoS, MsgId, TopicId, Data) -> diff --git a/apps/emqx_web_hook/src/emqx_web_hook.appup.src b/apps/emqx_web_hook/src/emqx_web_hook.appup.src index d90663cca..3f0980b3c 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook.appup.src +++ b/apps/emqx_web_hook/src/emqx_web_hook.appup.src @@ -1,11 +1,11 @@ %% -*- mode: erlang -*- {VSN, - [{<<"4\\.3\\.[0-2]$">>, + [{<<"4\\.3\\.[0-2]">>, [{apply,{application,stop,[emqx_web_hook]}}, {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, - {<<"4\\.3\\.[3-7]$">>, + {<<"4\\.3\\.[3-7]">>, [{apply,{application,stop,[emqx_web_hook]}}, {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}, @@ -14,14 +14,15 @@ [{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]}, {"4.3.9", [ %% nothing so far + %% 4.3.9 is taken by release 4.3.12 ]}, {<<".*">>,[]}], - [{<<"4\\.3\\.[0-2]$">>, + [{<<"4\\.3\\.[0-2]">>, [{apply,{application,stop,[emqx_web_hook]}}, {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, - {<<"4\\.3\\.[3-7]$">>, + {<<"4\\.3\\.[3-7]">>, [{apply,{application,stop,[emqx_web_hook]}}, {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}, @@ -30,5 +31,6 @@ [{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]}, {"4.3.9", [ %% nothing so far + %% 4.3.9 is taken by release 4.3.12 ]}, {<<".*">>,[]}]}. diff --git a/deploy/charts/emqx/templates/StatefulSet.yaml b/deploy/charts/emqx/templates/StatefulSet.yaml index 992528bbe..ef4521ee8 100644 --- a/deploy/charts/emqx/templates/StatefulSet.yaml +++ b/deploy/charts/emqx/templates/StatefulSet.yaml @@ -27,7 +27,6 @@ spec: namespace: {{ .Release.Namespace }} labels: app.kubernetes.io/name: {{ include "emqx.name" . }} - helm.sh/chart: {{ include "emqx.chart" . }} app.kubernetes.io/instance: {{ .Release.Name }} app.kubernetes.io/managed-by: {{ .Release.Service }} annotations: diff --git a/lib-ce/emqx_dashboard/etc/emqx_dashboard.conf b/lib-ce/emqx_dashboard/etc/emqx_dashboard.conf index f67b88c17..2d59264a1 100644 --- a/lib-ce/emqx_dashboard/etc/emqx_dashboard.conf +++ b/lib-ce/emqx_dashboard/etc/emqx_dashboard.conf @@ -3,12 +3,17 @@ ##-------------------------------------------------------------------- ## Default user's login name. -## +## For safety, it should be changed as soon as possible. +## Please use the './bin/emqx_ctl admins' CLI to change it. +## Then comment `dashboard.default_user.login/password` from here ## Value: String dashboard.default_user.login = admin ## Default user's password. -## +## The initial default password for 'dashboard.default_user.login'" +## For safety, it should be changed as soon as possible. +## Please use the './bin/emqx_ctl admins' CLI to change it. +## Then comment `dashboard.default_user.login/password` from here ## Value: String dashboard.default_user.password = public @@ -127,4 +132,3 @@ dashboard.listener.http.ipv6_v6only = false ## ## Value: on | off ## dashboard.listener.https.honor_cipher_order = on - diff --git a/lib-ce/emqx_dashboard/src/emqx_dashboard.erl b/lib-ce/emqx_dashboard/src/emqx_dashboard.erl index 0390339d3..9ce60d51d 100644 --- a/lib-ce/emqx_dashboard/src/emqx_dashboard.erl +++ b/lib-ce/emqx_dashboard/src/emqx_dashboard.erl @@ -103,17 +103,17 @@ is_authorized(Req) -> is_authorized("/api/v4/auth", _Req) -> true; is_authorized(_Path, Req) -> - case cowboy_req:parse_header(<<"authorization">>, Req) of - {basic, Username, Password} -> - case emqx_dashboard_admin:check(iolist_to_binary(Username), - iolist_to_binary(Password)) of - ok -> true; - {error, Reason} -> - ?LOG(error, "[Dashboard] Authorization Failure: username=~s, reason=~p", - [Username, Reason]), - false - end; - _ -> false + try + {basic, Username, Password} = cowboy_req:parse_header(<<"authorization">>, Req), + case emqx_dashboard_admin:check(iolist_to_binary(Username), iolist_to_binary(Password)) of + ok -> true; + {error, Reason} -> + ?LOG(error, "[Dashboard] Authorization Failure: username=~s, reason=~p", + [Username, Reason]), + false + end + catch _:_ -> %% bad authorization header will crash. + false end. filter(#{app := emqx_modules}) -> true; diff --git a/lib-ce/emqx_dashboard/src/emqx_dashboard_admin.erl b/lib-ce/emqx_dashboard/src/emqx_dashboard_admin.erl index b4e17f05b..e19ea48bc 100644 --- a/lib-ce/emqx_dashboard/src/emqx_dashboard_admin.erl +++ b/lib-ce/emqx_dashboard/src/emqx_dashboard_admin.erl @@ -77,18 +77,24 @@ start_link() -> -spec(add_user(binary(), binary(), binary()) -> ok | {error, any()}). add_user(Username, Password, Tags) when is_binary(Username), is_binary(Password) -> - Admin = #mqtt_admin{username = Username, password = hash(Password), tags = Tags}, - return(mnesia:transaction(fun add_user_/1, [Admin])). + case emqx_misc:is_sane_id(Username) of + ok -> + Admin = #mqtt_admin{username = Username, password = hash(Password), tags = Tags}, + return(mnesia:transaction(fun add_user_/1, [Admin])); + {error, Reason} -> {error, Reason} + end. force_add_user(Username, Password, Tags) -> - AddFun = fun() -> - mnesia:write(#mqtt_admin{username = Username, - password = Password, - tags = Tags}) - end, - case mnesia:transaction(AddFun) of - {atomic, ok} -> ok; - {aborted, Reason} -> {error, Reason} + case emqx_misc:is_sane_id(Username) of + ok -> + AddFun = fun() -> + mnesia:write(#mqtt_admin{username = Username, password = Password, tags = Tags}) + end, + case mnesia:transaction(AddFun) of + {atomic, ok} -> ok; + {aborted, Reason} -> {error, Reason} + end; + {error, Reason} -> {error, Reason} end. %% @private diff --git a/lib-ce/emqx_dashboard/src/emqx_dashboard_api.erl b/lib-ce/emqx_dashboard/src/emqx_dashboard_api.erl index e1c89efbb..4dbf2517f 100644 --- a/lib-ce/emqx_dashboard/src/emqx_dashboard_api.erl +++ b/lib-ce/emqx_dashboard/src/emqx_dashboard_api.erl @@ -77,9 +77,10 @@ auth(_Bindings, Params) -> Password = proplists:get_value(<<"password">>, Params), return(emqx_dashboard_admin:check(Username, Password)). -change_pwd(#{username := Username}, Params) -> +change_pwd(#{username := Username0}, Params) -> OldPwd = proplists:get_value(<<"old_pwd">>, Params), NewPwd = proplists:get_value(<<"new_pwd">>, Params), + Username = emqx_mgmt_util:urldecode(Username0), return(emqx_dashboard_admin:change_password(Username, OldPwd, NewPwd)). create(_Bindings, Params) -> @@ -96,14 +97,13 @@ list(_Bindings, _Params) -> update(#{name := Username}, Params) -> Tags = proplists:get_value(<<"tags">>, Params), - return(emqx_dashboard_admin:update_user(Username, Tags)). + return(emqx_dashboard_admin:update_user(emqx_mgmt_util:urldecode(Username), Tags)). delete(#{name := <<"admin">>}, _Params) -> return({error, <<"Cannot delete admin">>}); delete(#{name := Username}, _Params) -> - return(emqx_dashboard_admin:remove_user(Username)). + return(emqx_dashboard_admin:remove_user(emqx_mgmt_util:urldecode(Username))). row(#mqtt_admin{username = Username, tags = Tags}) -> #{username => Username, tags => Tags}. - diff --git a/lib-ce/emqx_dashboard/test/emqx_dashboard_SUITE.erl b/lib-ce/emqx_dashboard/test/emqx_dashboard_SUITE.erl index ddfcb540a..013e5707f 100644 --- a/lib-ce/emqx_dashboard/test/emqx_dashboard_SUITE.erl +++ b/lib-ce/emqx_dashboard/test/emqx_dashboard_SUITE.erl @@ -71,9 +71,13 @@ t_overview(_) -> t_admins_add_delete(_) -> ok = emqx_dashboard_admin:add_user(<<"username">>, <<"password">>, <<"tag">>), ok = emqx_dashboard_admin:add_user(<<"username1">>, <<"password1">>, <<"tag1">>), + ok = emqx_dashboard_admin:add_user(<<"1username1">>, <<"password1">>, <<"tag1">>), + {error, _} = emqx_dashboard_admin:add_user(<<"u/sername1">>, <<"password1">>, <<"tag1">>), + {error, _} = emqx_dashboard_admin:add_user(<<"/username1">>, <<"password1">>, <<"tag1">>), Admins = emqx_dashboard_admin:all_users(), - ?assertEqual(3, length(Admins)), + ?assertEqual(4, length(Admins)), ok = emqx_dashboard_admin:remove_user(<<"username1">>), + ok = emqx_dashboard_admin:remove_user(<<"1username1">>), Users = emqx_dashboard_admin:all_users(), ?assertEqual(2, length(Users)), ok = emqx_dashboard_admin:change_password(<<"username">>, <<"password">>, <<"pwd">>), @@ -81,7 +85,8 @@ t_admins_add_delete(_) -> ?assert(request_dashboard(get, api_path("brokers"), auth_header_("username", "pwd"))), ok = emqx_dashboard_admin:remove_user(<<"username">>), - ?assertNotEqual(true, request_dashboard(get, api_path("brokers"), auth_header_("username", "pwd"))). + ?assertNotEqual(true, request_dashboard(get, api_path("brokers"), + auth_header_("username", "pwd"))). t_admins_persist_default_password(_) -> emqx_dashboard_admin:change_password(<<"admin">>, <<"new_password">>), @@ -156,9 +161,9 @@ t_default_password_persists_after_leaving_cluster(_) -> t_rest_api(_Config) -> {ok, Res0} = http_get("users"), - - ?assertEqual([#{<<"username">> => <<"admin">>, - <<"tags">> => <<"administrator">>}], get_http_data(Res0)), + Users = get_http_data(Res0), + ?assert(lists:member(#{<<"username">> => <<"admin">>, <<"tags">> => <<"administrator">>}, + Users)), AssertSuccess = fun({ok, Res}) -> ?assertEqual(#{<<"code">> => 0}, json(Res)) diff --git a/lib-ce/emqx_modules/src/emqx_mod_delayed.erl b/lib-ce/emqx_modules/src/emqx_mod_delayed.erl index ac5be58b2..53280becc 100644 --- a/lib-ce/emqx_modules/src/emqx_mod_delayed.erl +++ b/lib-ce/emqx_modules/src/emqx_mod_delayed.erl @@ -53,11 +53,18 @@ { key , msg }). +-type delayed_message() :: #delayed_message{}. -define(TAB, ?MODULE). -define(SERVER, ?MODULE). -define(MAX_INTERVAL, 4294967). +-type state() :: #{ publish_at := non_neg_integer() + , timer := timer:tref() | undefined + , stats_timer => timer:tref() | undefined + , stats_fun => function() + }. + %%-------------------------------------------------------------------- %% Mnesia bootstrap %%-------------------------------------------------------------------- @@ -124,7 +131,7 @@ on_message_publish(Msg) -> start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). --spec(store(#delayed_message{}) -> ok). +-spec(store(delayed_message()) -> ok). store(DelayedMsg) -> gen_server:call(?SERVER, {store, DelayedMsg}, infinity). @@ -134,7 +141,9 @@ store(DelayedMsg) -> init([]) -> {ok, ensure_stats_event( - ensure_publish_timer(#{timer => undefined, publish_at => 0}))}. + ensure_publish_timer(#{timer => undefined, + publish_at => 0, + stats_timer => undefined}))}. handle_call({store, DelayedMsg = #delayed_message{key = Key}}, _From, State) -> ok = mnesia:dirty_write(?TAB, DelayedMsg), @@ -163,8 +172,9 @@ handle_info(Info, State) -> ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. -terminate(_Reason, #{timer := TRef}) -> - emqx_misc:cancel_timer(TRef). +terminate(_Reason, #{timer := PublishTimer} = State) -> + emqx_misc:cancel_timer(PublishTimer), + emqx_misc:cancel_timer(maps:get(stats_timer, State, undefined)). code_change({down, Vsn}, State, _Extra) when Vsn =:= "4.3.0" -> NState = maps:with([timer, publish_at], State), @@ -179,12 +189,14 @@ code_change(Vsn, State, _Extra) when Vsn =:= "4.3.0" -> %%-------------------------------------------------------------------- %% Ensure the stats +-spec ensure_stats_event(state()) -> state(). ensure_stats_event(State) -> StatsFun = emqx_stats:statsfun('delayed.count', 'delayed.max'), {ok, StatsTimer} = timer:send_interval(timer:seconds(1), stats), State#{stats_fun => StatsFun, stats_timer => StatsTimer}. %% Ensure publish timer +-spec ensure_publish_timer(state()) -> state(). ensure_publish_timer(State) -> ensure_publish_timer(mnesia:dirty_first(?TAB), State). @@ -222,4 +234,3 @@ do_publish(Key = {Ts, _Id}, Now, Acc) when Ts =< Now -> -spec(delayed_count() -> non_neg_integer()). delayed_count() -> mnesia:table_info(?TAB, size). - diff --git a/lib-ce/emqx_modules/src/emqx_mod_subscription.erl b/lib-ce/emqx_modules/src/emqx_mod_subscription.erl index 06178aee7..1b6a2c1c7 100644 --- a/lib-ce/emqx_modules/src/emqx_mod_subscription.erl +++ b/lib-ce/emqx_modules/src/emqx_mod_subscription.erl @@ -20,6 +20,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx/include/logger.hrl"). %% emqx_gen_mod callbacks -export([ load/1 @@ -38,14 +39,33 @@ load(Topics) -> emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Topics]}). on_client_connected(#{clientid := ClientId, username := Username}, _ConnInfo = #{proto_ver := ProtoVer}, Topics) -> - Replace = fun(Topic) -> - rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic)) + + OptFun = case ProtoVer of + ?MQTT_PROTO_V5 -> fun(X) -> X end; + _ -> fun(#{qos := Qos}) -> #{qos => Qos} end end, - TopicFilters = case ProtoVer of - ?MQTT_PROTO_V5 -> [{Replace(Topic), SubOpts} || {Topic, SubOpts} <- Topics]; - _ -> [{Replace(Topic), #{qos => Qos}} || {Topic, #{qos := Qos}} <- Topics] - end, - self() ! {subscribe, TopicFilters}. + + Fold = fun({Topic, SubOpts}, Acc) -> + case rep(Topic, ClientId, Username) of + {error, Reason} -> + ?LOG(warning, "auto subscribe ignored, topic filter:~ts reason:~p~n", + [Topic, Reason]), + Acc; + <<>> -> + ?LOG(warning, "auto subscribe ignored, topic filter:~ts" + " reason: topic can't be empty~n", + [Topic]), + Acc; + NewTopic -> + [{NewTopic, OptFun(SubOpts)} | Acc] + end + end, + + case lists:foldl(Fold, [], Topics) of + [] -> ok; + TopicFilters -> + self() ! {subscribe, TopicFilters} + end. unload(_) -> emqx_hooks:del('client.connected', {?MODULE, on_client_connected}). @@ -56,10 +76,24 @@ description() -> %% Internal functions %%-------------------------------------------------------------------- -rep(<<"%c">>, ClientId, Topic) -> - emqx_topic:feed_var(<<"%c">>, ClientId, Topic); -rep(<<"%u">>, undefined, Topic) -> - Topic; -rep(<<"%u">>, Username, Topic) -> - emqx_topic:feed_var(<<"%u">>, Username, Topic). +rep(Topic, ClientId, Username) -> + Words = emqx_topic:words(Topic), + rep(Words, ClientId, Username, []). +rep([<<"%c">> | T], ClientId, Username, Acc) -> + rep(T, + ClientId, + Username, + [ClientId | Acc]); +rep([<<"%u">> | _], _, undefined, _) -> + {error, username_undefined}; +rep([<<"%u">> | T], ClientId, Username, Acc) -> + rep(T, + ClientId, + Username, + [Username | Acc]); +rep([H | T], ClientId, UserName, Acc) -> + rep(T, ClientId, UserName, [H | Acc]); + +rep([], _, _, Acc) -> + emqx_topic:join(lists:reverse(Acc)). diff --git a/lib-ce/emqx_modules/src/emqx_modules.app.src b/lib-ce/emqx_modules/src/emqx_modules.app.src index 47a3d8888..ccc3a4c28 100644 --- a/lib-ce/emqx_modules/src/emqx_modules.app.src +++ b/lib-ce/emqx_modules/src/emqx_modules.app.src @@ -1,6 +1,6 @@ {application, emqx_modules, [{description, "EMQ X Module Management"}, - {vsn, "4.3.4"}, + {vsn, "4.3.5"}, {modules, []}, {applications, [kernel,stdlib]}, {mod, {emqx_modules_app, []}}, diff --git a/lib-ce/emqx_modules/src/emqx_modules.appup.src b/lib-ce/emqx_modules/src/emqx_modules.appup.src index 1b9eeec84..01b9c6651 100644 --- a/lib-ce/emqx_modules/src/emqx_modules.appup.src +++ b/lib-ce/emqx_modules/src/emqx_modules.appup.src @@ -1,33 +1,39 @@ -%% -*-: erlang -*- +%% -*- mode: erlang -*- +%% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [ - {<<"4\\.3\\.[2-3]">>, [ - {load_module, emqx_mod_presence, brutal_purge, soft_purge, []} - ]}, - {"4.3.1", [ - {load_module, emqx_mod_presence, brutal_purge, soft_purge, []}, - {load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []} - ]}, - {"4.3.0", [ - {update, emqx_mod_delayed, {advanced, []}}, - {load_module, emqx_mod_presence, brutal_purge, soft_purge, []}, - {load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []} - ]}, - {<<".*">>, []} - ], - [ - {<<"4\\.3\\.[2-3]">>, [ - {load_module, emqx_mod_presence, brutal_purge, soft_purge, []} - ]}, - {"4.3.1", [ - {load_module, emqx_mod_presence, brutal_purge, soft_purge, []}, - {load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []} - ]}, - {"4.3.0", [ - {update, emqx_mod_delayed, {advanced, []}}, - {load_module, emqx_mod_presence, brutal_purge, soft_purge, []}, - {load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []} - ]}, - {<<".*">>, []} - ] -}. + [{"4.3.4", + [{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]}, + {load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}]}, + {<<"4\\.3\\.[2-3]">>, + [{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]}, + {load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}, + {load_module,emqx_mod_presence,brutal_purge,soft_purge,[]}]}, + {"4.3.1", + [{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]}, + {load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}, + {load_module,emqx_mod_presence,brutal_purge,soft_purge,[]}, + {load_module,emqx_mod_api_topic_metrics,brutal_purge,soft_purge,[]}]}, + {"4.3.0", + [{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]}, + {update,emqx_mod_delayed,{advanced,[]}}, + {load_module,emqx_mod_presence,brutal_purge,soft_purge,[]}, + {load_module,emqx_mod_api_topic_metrics,brutal_purge,soft_purge,[]}]}, + {<<".*">>,[]}], + [{"4.3.4", + [{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]}, + {load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}]}, + {<<"4\\.3\\.[2-3]">>, + [{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]}, + {load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}, + {load_module,emqx_mod_presence,brutal_purge,soft_purge,[]}]}, + {"4.3.1", + [{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]}, + {load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}, + {load_module,emqx_mod_presence,brutal_purge,soft_purge,[]}, + {load_module,emqx_mod_api_topic_metrics,brutal_purge,soft_purge,[]}]}, + {"4.3.0", + [{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]}, + {update,emqx_mod_delayed,{advanced,[]}}, + {load_module,emqx_mod_presence,brutal_purge,soft_purge,[]}, + {load_module,emqx_mod_api_topic_metrics,brutal_purge,soft_purge,[]}]}, + {<<".*">>,[]}]}. diff --git a/lib-ce/emqx_modules/test/emqx_mod_subscription_SUITE.erl b/lib-ce/emqx_modules/test/emqx_mod_subscription_SUITE.erl index c2905754b..89a2678ef 100644 --- a/lib-ce/emqx_modules/test/emqx_mod_subscription_SUITE.erl +++ b/lib-ce/emqx_modules/test/emqx_mod_subscription_SUITE.erl @@ -60,7 +60,7 @@ t_suboption(_) -> Client_info = fun(Key, Client) -> maps:get(Key, maps:from_list(emqtt:info(Client)), undefined) end, Suboption = #{qos => ?QOS_2, nl => 1, rap => 1, rh => 2}, ?assertEqual(ok, emqx_mod_subscription:load([{<<"connected/%c/%u">>, Suboption}])), - {ok, C1} = emqtt:start_link([{proto_ver, v5}]), + {ok, C1} = emqtt:start_link([{proto_ver, v5}, {username, "admin"}]), {ok, _} = emqtt:connect(C1), timer:sleep(200), [CPid1] = emqx_cm:lookup_channels(Client_info(clientid, C1)), @@ -69,7 +69,7 @@ t_suboption(_) -> ?assertMatch({Sub1, #{qos := 2, nl := 1, rap := 1, rh := 2, subid := _}}, Suboption1), ok = emqtt:disconnect(C1), %% The subscription option is not valid for MQTT V3.1.1 - {ok, C2} = emqtt:start_link([{proto_ver, v4}]), + {ok, C2} = emqtt:start_link([{proto_ver, v4}, {username, "admin"}]), {ok, _} = emqtt:connect(C2), timer:sleep(200), [CPid2] = emqx_cm:lookup_channels(Client_info(clientid, C2)), diff --git a/scripts/update-appup.sh b/scripts/update-appup.sh index 1d18187ee..a57cfbef8 100755 --- a/scripts/update-appup.sh +++ b/scripts/update-appup.sh @@ -38,7 +38,9 @@ PREV_TAG="$(git describe --tag --match "${TAG_PREFIX}*" | grep -oE "${TAG_PREFIX PREV_VERSION="${PREV_TAG#[e|v]}" shift 1 -ESCRIPT_ARGS=() +# bash 3.2 treat empty array as unbound, so we can't use 'ESCRIPT_ARGS=()' here, +# but must add an empty-string element to the array +ESCRIPT_ARGS=( '' ) while [ "$#" -gt 0 ]; do case $1 in -h|--help) @@ -103,12 +105,15 @@ if [ ! -d "${PREV_TAG}" ]; then fi popd +# bash 3.2 does not allow empty array, so we had to add an empty string in the ESCRIPT_ARGS array, +# this in turn makes quoting "${ESCRIPT_ARGS[@]}" problematic, hence disable SC2068 check here +# shellcheck disable=SC2068 ./scripts/update_appup.escript \ --src-dirs "${SRC_DIRS}/**" \ --release-dir "_build/${PROFILE}/lib" \ --prev-release-dir "${PREV_DIR_BASE}/${PREV_TAG}/emqx/lib" \ --skip-build \ - "${ESCRIPT_ARGS[@]}" "$PREV_VERSION" + ${ESCRIPT_ARGS[@]} "$PREV_VERSION" if [ "${IS_CHECK:-}" = 'yes' ]; then diffs="$(git diff --name-only | grep -E '\.appup\.src' || true)" diff --git a/scripts/update_appup.escript b/scripts/update_appup.escript index 9083b62f1..840f63509 100755 --- a/scripts/update_appup.escript +++ b/scripts/update_appup.escript @@ -324,6 +324,8 @@ process_old_action({add_module, Module}) -> [Module]; process_old_action({delete_module, Module}) -> [Module]; +process_old_action({update, Module, _Change}) -> + [Module]; process_old_action(LoadModule) when is_tuple(LoadModule) andalso element(1, LoadModule) =:= load_module -> element(2, LoadModule); diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 95e2f48f2..86ca424b3 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -13,10 +13,12 @@ {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}]}, {"4.3.12", [{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -40,6 +42,7 @@ [{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_sys,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, @@ -63,6 +66,7 @@ {"4.3.10", [{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_sys,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, @@ -87,6 +91,7 @@ {"4.3.9", [{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_sys,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, @@ -115,6 +120,7 @@ {"4.3.8", [{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_sys,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, @@ -410,6 +416,7 @@ {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_os_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, @@ -422,6 +429,7 @@ [{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_sys,brutal_purge,soft_purge,[]}, @@ -442,6 +450,7 @@ {"4.3.11", [{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_sys,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, @@ -465,6 +474,7 @@ {"4.3.10", [{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_sys,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, @@ -488,6 +498,7 @@ {"4.3.9", [{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_sys,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, @@ -515,6 +526,7 @@ {"4.3.8", [{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_sys,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 32f0062ea..f8c269135 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -924,9 +924,23 @@ return_sub_unsub_ack(Packet, Channel) -> -> {reply, Reply :: term(), channel()} | {shutdown, Reason :: term(), Reply :: term(), channel()} | {shutdown, Reason :: term(), Reply :: term(), emqx_types:packet(), channel()}). -handle_call(kick, Channel) -> - Channel1 = ensure_disconnected(kicked, Channel), - disconnect_and_shutdown(kicked, ok, Channel1); +handle_call(kick, Channel = #channel{ + conn_state = ConnState, + will_msg = WillMsg, + conninfo = #{proto_ver := ProtoVer} + }) -> + (WillMsg =/= undefined) andalso publish_will_msg(WillMsg), + Channel1 = case ConnState of + connected -> ensure_disconnected(kicked, Channel); + _ -> Channel + end, + case ProtoVer == ?MQTT_PROTO_V5 andalso ConnState == connected of + true -> + shutdown(kicked, ok, + ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), Channel1); + _ -> + shutdown(kicked, ok, Channel1) + end; handle_call(discard, Channel) -> disconnect_and_shutdown(discarded, ok, Channel); @@ -984,7 +998,7 @@ handle_info({sock_closed, Reason}, Channel = clientinfo = ClientInfo = #{zone := Zone}}) -> emqx_zone:enable_flapping_detect(Zone) andalso emqx_flapping:detect(ClientInfo), - Channel1 = ensure_disconnected(Reason, mabye_publish_will_msg(Channel)), + Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)), case maybe_shutdown(Reason, Channel1) of {ok, Channel2} -> {ok, {event, disconnected}, Channel2}; Shutdown -> Shutdown @@ -1645,9 +1659,9 @@ ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo, %%-------------------------------------------------------------------- %% Maybe Publish will msg -mabye_publish_will_msg(Channel = #channel{will_msg = undefined}) -> +maybe_publish_will_msg(Channel = #channel{will_msg = undefined}) -> Channel; -mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) -> +maybe_publish_will_msg(Channel = #channel{will_msg = WillMsg}) -> case will_delay_interval(WillMsg) of 0 -> ok = publish_will_msg(WillMsg), @@ -1657,10 +1671,10 @@ mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) -> end. will_delay_interval(WillMsg) -> - maps:get('Will-Delay-Interval', emqx_message:get_header(properties, WillMsg), 0). + maps:get('Will-Delay-Interval', + emqx_message:get_header(properties, WillMsg, #{}), 0). publish_will_msg(Msg) -> - %% TODO check if we should discard result here _ = emqx_broker:publish(Msg), ok. diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index eb6a25377..3c866b732 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -52,6 +52,26 @@ , hexstr2bin/1 ]). +-export([ is_sane_id/1 + ]). + +-define(VALID_STR_RE, "^[A-Za-z0-9]+[A-Za-z0-9-_]*$"). + +-spec is_sane_id(list() | binary()) -> ok | {error, Reason::binary()}. +is_sane_id(Str) -> + StrLen = len(Str), + case StrLen > 0 andalso StrLen =< 256 of + true -> + case re:run(Str, ?VALID_STR_RE) of + nomatch -> {error, <<"required: " ?VALID_STR_RE>>}; + _ -> ok + end; + false -> {error, <<"0 < Length =< 256">>} + end. + +len(Bin) when is_binary(Bin) -> byte_size(Bin); +len(Str) when is_list(Str) -> length(Str). + -define(OOM_FACTOR, 1.25). %% @doc Parse v4 or v6 string format address to tuple. @@ -309,4 +329,30 @@ hexchar2int(I) when I >= $a andalso I =< $f -> I - $a + 10. ipv6_probe_test() -> ?assertEqual([{ipv6_probe, true}], ipv6_probe([])). +is_sane_id_test() -> + ?assertMatch({error, _}, is_sane_id("")), + ?assertMatch({error, _}, is_sane_id("_")), + ?assertMatch({error, _}, is_sane_id("_aaa")), + ?assertMatch({error, _}, is_sane_id("lkad/oddl")), + ?assertMatch({error, _}, is_sane_id("lkad*oddl")), + ?assertMatch({error, _}, is_sane_id("script>lkadoddl")), + ?assertMatch({error, _}, is_sane_id("