Merge branch 'main-v4.3' into dashboard_password_reset_after_leaving_cluster

This commit is contained in:
gsychev 2022-03-23 13:57:59 +00:00 committed by GitHub
commit a9255032cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
42 changed files with 1024 additions and 309 deletions

View File

@ -19,7 +19,7 @@ jobs:
id: dload_jmeter id: dload_jmeter
timeout-minutes: 1 timeout-minutes: 1
env: env:
JMETER_VERSION: 5.3 JMETER_VERSION: 5.4.3
run: | run: |
wget --no-verbose --no-check-certificate -O /tmp/apache-jmeter.tgz https://downloads.apache.org/jmeter/binaries/apache-jmeter-$JMETER_VERSION.tgz wget --no-verbose --no-check-certificate -O /tmp/apache-jmeter.tgz https://downloads.apache.org/jmeter/binaries/apache-jmeter-$JMETER_VERSION.tgz
- uses: actions/upload-artifact@v2 - uses: actions/upload-artifact@v2
@ -117,7 +117,7 @@ jobs:
- name: install jmeter - name: install jmeter
timeout-minutes: 10 timeout-minutes: 10
env: env:
JMETER_VERSION: 5.3 JMETER_VERSION: 5.4.3
run: | run: |
cd /tmp && tar -xvf apache-jmeter.tgz cd /tmp && tar -xvf apache-jmeter.tgz
echo "jmeter.save.saveservice.output_format=xml" >> /tmp/apache-jmeter-$JMETER_VERSION/user.properties echo "jmeter.save.saveservice.output_format=xml" >> /tmp/apache-jmeter-$JMETER_VERSION/user.properties
@ -208,7 +208,7 @@ jobs:
- name: install jmeter - name: install jmeter
timeout-minutes: 10 timeout-minutes: 10
env: env:
JMETER_VERSION: 5.3 JMETER_VERSION: 5.4.3
run: | run: |
cd /tmp && tar -xvf apache-jmeter.tgz cd /tmp && tar -xvf apache-jmeter.tgz
echo "jmeter.save.saveservice.output_format=xml" >> /tmp/apache-jmeter-$JMETER_VERSION/user.properties echo "jmeter.save.saveservice.output_format=xml" >> /tmp/apache-jmeter-$JMETER_VERSION/user.properties
@ -310,7 +310,7 @@ jobs:
- name: install jmeter - name: install jmeter
timeout-minutes: 10 timeout-minutes: 10
env: env:
JMETER_VERSION: 5.3 JMETER_VERSION: 5.4.3
run: | run: |
cd /tmp && tar -xvf apache-jmeter.tgz cd /tmp && tar -xvf apache-jmeter.tgz
echo "jmeter.save.saveservice.output_format=xml" >> /tmp/apache-jmeter-$JMETER_VERSION/user.properties echo "jmeter.save.saveservice.output_format=xml" >> /tmp/apache-jmeter-$JMETER_VERSION/user.properties
@ -406,7 +406,7 @@ jobs:
- name: install jmeter - name: install jmeter
timeout-minutes: 10 timeout-minutes: 10
env: env:
JMETER_VERSION: 5.3 JMETER_VERSION: 5.4.3
run: | run: |
cd /tmp && tar -xvf apache-jmeter.tgz cd /tmp && tar -xvf apache-jmeter.tgz
echo "jmeter.save.saveservice.output_format=xml" >> /tmp/apache-jmeter-$JMETER_VERSION/user.properties echo "jmeter.save.saveservice.output_format=xml" >> /tmp/apache-jmeter-$JMETER_VERSION/user.properties

View File

@ -22,12 +22,18 @@ File format:
* CLI `emqx_ctl pem_cache clean` to force purge x509 certificate cache, * CLI `emqx_ctl pem_cache clean` to force purge x509 certificate cache,
to force an immediate reload of all certificates after the files are updated on disk. to force an immediate reload of all certificates after the files are updated on disk.
* Refactor the ExProto so that anonymous clients can also be displayed on the dashboard [#6983] * Refactor the ExProto so that anonymous clients can also be displayed on the dashboard [#6983]
* Force shutdown of processe that cannot answer takeover event [#7026] * Force shutdown of processes that cannot answer takeover event [#7026]
* `topic` parameter in bridge configuration can have `${node}` substitution (just like in `clientid` parameter) * `topic` parameter in bridge configuration can have `${node}` substitution (just like in `clientid` parameter)
* Add UTF-8 string validity check in `strict_mode` for MQTT packet. * Add UTF-8 string validity check in `strict_mode` for MQTT packet.
When set to true, invalid UTF-8 strings will cause the client to be disconnected. i.e. client ID, topic name. [#7261] When set to true, invalid UTF-8 strings will cause the client to be disconnected. i.e. client ID, topic name. [#7261]
* Changed systemd service restart delay from 10 seconds to 60 seconds. * Changed systemd service restart delay from 10 seconds to 60 seconds.
* MQTT-SN gateway supports initiative to synchronize registered topics after session resumed. [#7300]
* Add load control app for future development. * Add load control app for future development.
* Change the precision of float to 17 digits after the decimal point when formatting a
float using payload templates of rule actions. The old precision is 10 digits before
this change. [#7336]
* Return the cached resource status when querying a resource using HTTP APIs.
This is to avoid blocking the HTTP request if the resource is unavailable. [#7374]
### Bug fixes ### Bug fixes
@ -43,6 +49,10 @@ File format:
* Fix false alert level log “cannot_find_plugins” caused by duplicate plugin names in `loaded_plugins` files. * Fix false alert level log “cannot_find_plugins” caused by duplicate plugin names in `loaded_plugins` files.
* Prompt user how to change the dashboard's initial default password when emqx start. * Prompt user how to change the dashboard's initial default password when emqx start.
* Fix errno=13 'Permission denied' Cannot create FIFO boot error in Amazon Linux 2022 (el8 package) * Fix errno=13 'Permission denied' Cannot create FIFO boot error in Amazon Linux 2022 (el8 package)
* Fix user or appid created, name only allow `^[A-Za-z]+[A-Za-z0-9-_]*$`
* Fix subscribe http api crash by bad_qos `/mqtt/subscribe`,`/mqtt/subscribe_batch`.
* Send DISCONNECT packet with reason code 0x98 if connection has been kicked [#7309]
* Auto subscribe to an empty topic will be simply ignored now
## v4.3.12 ## v4.3.12
### Important changes ### Important changes

View File

@ -1,6 +1,5 @@
%% -*-: erlang -*- %% -*-: erlang -*-
{VSN,
{"4.3.4",
[ [
{"4.3.3", [ {"4.3.3", [
{load_module, emqx_bridge_mqtt, brutal_purge, soft_purge, []} {load_module, emqx_bridge_mqtt, brutal_purge, soft_purge, []}

View File

@ -6,15 +6,19 @@
[{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]}, [{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_message,brutal_purge,soft_purge,[]}, {load_module,emqx_lwm2m_message,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[3-5]">>, {<<"4\\.3\\.[3-4]">>,
[{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]}, [{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]}], {load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]},
{"4.3.5",
[{load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]}],
[{<<"4\\.3\\.[0-1]">>, [{<<"4\\.3\\.[0-1]">>,
[{restart_application,emqx_lwm2m}]}, [{restart_application,emqx_lwm2m}]},
{"4.3.2", {"4.3.2",
[{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]}, [{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_message,brutal_purge,soft_purge,[]}, {load_module,emqx_lwm2m_message,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[3-5]">>, {<<"4\\.3\\.[3-4]">>,
[{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]}, [{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]}]}. {load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]},
{"4.3.5",
[{load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]}]}.

View File

@ -31,5 +31,6 @@
-define(ERROR13, 113). %% User already exist -define(ERROR13, 113). %% User already exist
-define(ERROR14, 114). %% OldPassword error -define(ERROR14, 114). %% OldPassword error
-define(ERROR15, 115). %% bad topic -define(ERROR15, 115). %% bad topic
-define(ERROR16, 116). %% bad QoS
-define(VERSIONS, ["4.0", "4.1", "4.2", "4.3"]). -define(VERSIONS, ["4.0", "4.1", "4.2", "4.3"]).

View File

@ -1,6 +1,6 @@
{application, emqx_management, {application, emqx_management,
[{description, "EMQ X Management API and CLI"}, [{description, "EMQ X Management API and CLI"},
{vsn, "4.3.12"}, % strict semver, bump manually! {vsn, "4.3.11"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, [emqx_management_sup]}, {registered, [emqx_management_sup]},
{applications, [kernel,stdlib,minirest]}, {applications, [kernel,stdlib,minirest]},

View File

@ -68,7 +68,7 @@ add_app(_Bindings, Params) ->
end. end.
del_app(#{appid := AppId}, _Params) -> del_app(#{appid := AppId}, _Params) ->
case emqx_mgmt_auth:del_app(AppId) of case emqx_mgmt_auth:del_app(emqx_mgmt_util:urldecode(AppId)) of
ok -> minirest:return(); ok -> minirest:return();
{error, Reason} -> minirest:return({error, Reason}) {error, Reason} -> minirest:return({error, Reason})
end. end.
@ -77,7 +77,7 @@ list_apps(_Bindings, _Params) ->
minirest:return({ok, [format(Apps)|| Apps <- emqx_mgmt_auth:list_apps()]}). minirest:return({ok, [format(Apps)|| Apps <- emqx_mgmt_auth:list_apps()]}).
lookup_app(#{appid := AppId}, _Params) -> lookup_app(#{appid := AppId}, _Params) ->
case emqx_mgmt_auth:lookup_app(AppId) of case emqx_mgmt_auth:lookup_app(emqx_mgmt_util:urldecode(AppId)) of
{AppId, AppSecret, Name, Desc, Status, Expired} -> {AppId, AppSecret, Name, Desc, Status, Expired} ->
minirest:return({ok, #{app_id => AppId, minirest:return({ok, #{app_id => AppId,
secret => AppSecret, secret => AppSecret,
@ -94,7 +94,7 @@ update_app(#{appid := AppId}, Params) ->
Desc = proplists:get_value(<<"desc">>, Params), Desc = proplists:get_value(<<"desc">>, Params),
Status = proplists:get_value(<<"status">>, Params), Status = proplists:get_value(<<"status">>, Params),
Expired = proplists:get_value(<<"expired">>, Params), Expired = proplists:get_value(<<"expired">>, Params),
case emqx_mgmt_auth:update_app(AppId, Name, Desc, Status, Expired) of case emqx_mgmt_auth:update_app(emqx_mgmt_util:urldecode(AppId), Name, Desc, Status, Expired) of
ok -> minirest:return(); ok -> minirest:return();
{error, Reason} -> minirest:return({error, Reason}) {error, Reason} -> minirest:return({error, Reason})
end. end.

View File

@ -151,6 +151,8 @@ do_subscribe(ClientId, _Topics, _QoS) when not is_binary(ClientId) ->
{ok, ?ERROR8, <<"bad clientid: must be string">>}; {ok, ?ERROR8, <<"bad clientid: must be string">>};
do_subscribe(_ClientId, [], _QoS) -> do_subscribe(_ClientId, [], _QoS) ->
{ok, ?ERROR15, bad_topic}; {ok, ?ERROR15, bad_topic};
do_subscribe(_ClientId, _Topic, QoS) when QoS =/= 0 andalso QoS =/= 1 andalso QoS =/= 2 ->
{ok, ?ERROR16, bad_qos};
do_subscribe(ClientId, Topics, QoS) -> do_subscribe(ClientId, Topics, QoS) ->
TopicTable = parse_topic_filters(Topics, QoS), TopicTable = parse_topic_filters(Topics, QoS),
case emqx_mgmt:subscribe(ClientId, TopicTable) of case emqx_mgmt:subscribe(ClientId, TopicTable) of

View File

@ -16,6 +16,8 @@
-module(emqx_mgmt_auth). -module(emqx_mgmt_auth).
-include_lib("emqx/include/logger.hrl").
%% Mnesia Bootstrap %% Mnesia Bootstrap
-export([mnesia/1]). -export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}). -boot_mnesia({mnesia, [boot]}).
@ -89,36 +91,45 @@ add_app(AppId, Name, Desc, Status, Expired) when is_binary(AppId) ->
-> {ok, appsecret()} -> {ok, appsecret()}
| {error, term()}). | {error, term()}).
add_app(AppId, Name, Secret, Desc, Status, Expired) when is_binary(AppId) -> add_app(AppId, Name, Secret, Desc, Status, Expired) when is_binary(AppId) ->
Secret1 = generate_appsecret_if_need(Secret), case emqx_misc:is_sane_id(AppId) of
App = #mqtt_app{id = AppId, ok ->
secret = Secret1, Secret1 = generate_appsecret_if_need(Secret),
name = Name, App = #mqtt_app{id = AppId,
desc = Desc, secret = Secret1,
status = Status, name = Name,
expired = Expired}, desc = Desc,
AddFun = fun() -> status = Status,
case mnesia:wread({mqtt_app, AppId}) of expired = Expired},
[] -> mnesia:write(App); AddFun = fun() ->
_ -> mnesia:abort(alread_existed) case mnesia:wread({mqtt_app, AppId}) of
end [] -> mnesia:write(App);
end, _ -> mnesia:abort(already_existed)
case mnesia:transaction(AddFun) of end
{atomic, ok} -> {ok, Secret1}; end,
{aborted, Reason} -> {error, Reason} case mnesia:transaction(AddFun) of
{atomic, ok} -> {ok, Secret1};
{aborted, Reason} -> {error, Reason}
end;
{error, Reason} -> {error, Reason}
end. end.
force_add_app(AppId, Name, Secret, Desc, Status, Expired) -> force_add_app(AppId, Name, Secret, Desc, Status, Expired) ->
AddFun = fun() -> case emqx_misc:is_sane_id(AppId) of
mnesia:write(#mqtt_app{id = AppId, ok ->
secret = Secret, AddFun = fun() ->
name = Name, mnesia:write(#mqtt_app{
desc = Desc, id = AppId,
status = Status, secret = Secret,
expired = Expired}) name = Name,
end, desc = Desc,
case mnesia:transaction(AddFun) of status = Status,
{atomic, ok} -> ok; expired = Expired})
{aborted, Reason} -> {error, Reason} end,
case mnesia:transaction(AddFun) of
{atomic, ok} -> ok;
{aborted, Reason} -> {error, Reason}
end;
{error, Reason} -> {error, Reason}
end. end.
-spec(generate_appsecret_if_need(binary() | undefined) -> binary()). -spec(generate_appsecret_if_need(binary() | undefined) -> binary()).
@ -207,4 +218,3 @@ is_authorized(AppId, AppSecret) ->
is_expired(undefined) -> true; is_expired(undefined) -> true;
is_expired(Expired) -> Expired >= erlang:system_time(second). is_expired(Expired) -> Expired >= erlang:system_time(second).

View File

@ -118,9 +118,10 @@ handle_request(_Method, _Path, Req) ->
cowboy_req:reply(400, #{<<"content-type">> => <<"text/plain">>}, <<"Not found.">>, Req). cowboy_req:reply(400, #{<<"content-type">> => <<"text/plain">>}, <<"Not found.">>, Req).
authorize_appid(Req) -> authorize_appid(Req) ->
case cowboy_req:parse_header(<<"authorization">>, Req) of try
{basic, AppId, AppSecret} -> emqx_mgmt_auth:is_authorized(AppId, AppSecret); {basic, AppId, AppSecret} = cowboy_req:parse_header(<<"authorization">>, Req),
_ -> false emqx_mgmt_auth:is_authorized(AppId, AppSecret)
catch _:_ -> false
end. end.
-ifdef(EMQX_ENTERPRISE). -ifdef(EMQX_ENTERPRISE).

View File

@ -27,4 +27,3 @@ start_link() ->
init([]) -> init([]) ->
{ok, {{one_for_one, 1, 5}, []}}. {ok, {{one_for_one, 1, 5}, []}}.

View File

@ -1,29 +1,43 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN, {VSN,
[{"4.3.7",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, [{"4.3.7",
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{"4.3.6", {"4.3.6",
[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{update,emqx_rule_metrics,{advanced,["4.3.6"]}}, {update,emqx_rule_metrics,{advanced,["4.3.6"]}},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.5", {"4.3.5",
[{update,emqx_rule_metrics,{advanced,["4.3.5"]}}, [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{update,emqx_rule_metrics,{advanced,["4.3.5"]}},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.4", {"4.3.4",
[{update,emqx_rule_metrics,{advanced,["4.3.4"]}}, [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{update,emqx_rule_metrics,{advanced,["4.3.4"]}},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.3", {"4.3.3",
[{update,emqx_rule_metrics,{advanced,["4.3.3"]}}, [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{update,emqx_rule_metrics,{advanced,["4.3.3"]}},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
@ -31,7 +45,9 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.2", {"4.3.2",
[{update,emqx_rule_metrics,{advanced,["4.3.2"]}}, [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{update,emqx_rule_metrics,{advanced,["4.3.2"]}},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{apply,{emqx_stats,cancel_update,[rule_registery_stats]}}, {apply,{emqx_stats,cancel_update,[rule_registery_stats]}},
@ -40,7 +56,9 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.1", {"4.3.1",
[{update,emqx_rule_metrics,{advanced,["4.3.1"]}}, [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{update,emqx_rule_metrics,{advanced,["4.3.1"]}},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
@ -49,7 +67,8 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.0", {"4.3.0",
[{update,emqx_rule_metrics,{advanced,["4.3.0"]}}, [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{update,emqx_rule_metrics,{advanced,["4.3.0"]}},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
@ -59,30 +78,43 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}], {<<".*">>,[]}],
[{"4.3.7",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, [{"4.3.7",
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{"4.3.6", {"4.3.6",
[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{update,emqx_rule_metrics,{advanced,["4.3.6"]}}, {update,emqx_rule_metrics,{advanced,["4.3.6"]}},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.5", {"4.3.5",
[{update,emqx_rule_metrics,{advanced,["4.3.5"]}}, [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{update,emqx_rule_metrics,{advanced,["4.3.5"]}},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.4", {"4.3.4",
[{update,emqx_rule_metrics,{advanced,["4.3.4"]}}, [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{update,emqx_rule_metrics,{advanced,["4.3.4"]}},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.3", {"4.3.3",
[{update,emqx_rule_metrics,{advanced,["4.3.3"]}}, [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{update,emqx_rule_metrics,{advanced,["4.3.3"]}},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
@ -90,7 +122,9 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.2", {"4.3.2",
[{update,emqx_rule_metrics,{advanced,["4.3.2"]}}, [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{update,emqx_rule_metrics,{advanced,["4.3.2"]}},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{apply,{emqx_stats,cancel_update,[rule_registery_stats]}}, {apply,{emqx_stats,cancel_update,[rule_registery_stats]}},
@ -99,7 +133,9 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.1", {"4.3.1",
[{update,emqx_rule_metrics,{advanced,["4.3.1"]}}, [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{update,emqx_rule_metrics,{advanced,["4.3.1"]}},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
@ -108,7 +144,8 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.0", {"4.3.0",
[{update,emqx_rule_metrics,{advanced,["4.3.0"]}}, [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{update,emqx_rule_metrics,{advanced,["4.3.0"]}},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},

View File

@ -334,14 +334,11 @@ test_resource(#{type := Type, config := Config0}) ->
-spec(get_resource_status(resource_id()) -> {ok, resource_status()} | {error, Reason :: term()}). -spec(get_resource_status(resource_id()) -> {ok, resource_status()} | {error, Reason :: term()}).
get_resource_status(ResId) -> get_resource_status(ResId) ->
case emqx_rule_registry:find_resource(ResId) of case emqx_rule_registry:find_resource_params(ResId) of
{ok, #resource{type = ResType}} -> {ok, #resource_params{status = Status}} ->
{ok, #resource_type{on_status = {Mod, OnStatus}}}
= emqx_rule_registry:find_resource_type(ResType),
Status = fetch_resource_status(Mod, OnStatus, ResId),
{ok, Status}; {ok, Status};
not_found -> not_found ->
{error, {resource_not_found, ResId}} {error, resource_not_initialized}
end. end.
-spec(get_resource_params(resource_id()) -> {ok, map()} | {error, Reason :: term()}). -spec(get_resource_params(resource_id()) -> {ok, map()} | {error, Reason :: term()}).

View File

@ -295,13 +295,8 @@ do_create_resource(Create, ParsedParams) ->
list_resources(#{}, _Params) -> list_resources(#{}, _Params) ->
Data0 = lists:foldr(fun maybe_record_to_map/2, [], emqx_rule_registry:get_resources()), Data0 = lists:foldr(fun maybe_record_to_map/2, [], emqx_rule_registry:get_resources()),
Data = lists:map(fun(Res = #{id := Id}) -> Data = lists:map(fun(Res = #{id := ResId}) ->
Status = lists:all(fun(Node) -> Status = get_aggregated_status(ResId),
case rpc:call(Node, emqx_rule_registry, find_resource_params, [Id]) of
{ok, #resource_params{status = #{is_alive := true}}} -> true;
_ -> false
end
end, ekka_mnesia:running_nodes()),
maps:put(status, Status, Res) maps:put(status, Status, Res)
end, Data0), end, Data0),
return({ok, Data}). return({ok, Data}).
@ -309,12 +304,23 @@ list_resources(#{}, _Params) ->
list_resources_by_type(#{type := Type}, _Params) -> list_resources_by_type(#{type := Type}, _Params) ->
return_all(emqx_rule_registry:get_resources_by_type(Type)). return_all(emqx_rule_registry:get_resources_by_type(Type)).
get_aggregated_status(ResId) ->
lists:all(fun(Node) ->
case rpc:call(Node, emqx_rule_engine, get_resource_status, [ResId]) of
{ok, #{is_alive := true}} -> true;
_ -> false
end
end, ekka_mnesia:running_nodes()).
show_resource(#{id := Id}, _Params) -> show_resource(#{id := Id}, _Params) ->
case emqx_rule_registry:find_resource(Id) of case emqx_rule_registry:find_resource(Id) of
{ok, R} -> {ok, R} ->
Status = Status =
[begin [begin
{ok, St} = rpc:call(Node, emqx_rule_engine, get_resource_status, [Id]), St = case rpc:call(Node, emqx_rule_engine, get_resource_status, [Id]) of
{ok, St0} -> St0;
{error, _} -> #{is_alive => false}
end,
maps:put(node, Node, St) maps:put(node, Node, St)
end || Node <- ekka_mnesia:running_nodes()], end || Node <- ekka_mnesia:running_nodes()],
return({ok, maps:put(status, Status, record_to_map(R))}); return({ok, maps:put(status, Status, record_to_map(R))});
@ -326,8 +332,8 @@ get_resource_status(#{id := Id}, _Params) ->
case emqx_rule_engine:get_resource_status(Id) of case emqx_rule_engine:get_resource_status(Id) of
{ok, Status} -> {ok, Status} ->
return({ok, Status}); return({ok, Status});
{error, {resource_not_found, ResId}} -> {error, resource_not_initialized} ->
return({error, 400, ?ERR_NO_RESOURCE(ResId)}) return({error, 400, ?ERR_NO_RESOURCE(Id)})
end. end.
start_resource(#{id := Id}, _Params) -> start_resource(#{id := Id}, _Params) ->

View File

@ -169,6 +169,16 @@
, sha256/1 , sha256/1
]). ]).
%% gzip Funcs
-export([ gzip/1
, gunzip/1
]).
%% zip Funcs
-export([ zip/1
, unzip/1
]).
%% Data encode and decode %% Data encode and decode
-export([ base64_encode/1 -export([ base64_encode/1
, base64_decode/1 , base64_decode/1
@ -784,6 +794,26 @@ sha256(S) when is_binary(S) ->
hash(Type, Data) -> hash(Type, Data) ->
emqx_misc:bin2hexstr_a_f(crypto:hash(Type, Data)). emqx_misc:bin2hexstr_a_f(crypto:hash(Type, Data)).
%%------------------------------------------------------------------------------
%% gzip Funcs
%%------------------------------------------------------------------------------
gzip(S) when is_binary(S) ->
zlib:gzip(S).
gunzip(S) when is_binary(S) ->
zlib:gunzip(S).
%%------------------------------------------------------------------------------
%% zip Funcs
%%------------------------------------------------------------------------------
zip(S) when is_binary(S) ->
zlib:zip(S).
unzip(S) when is_binary(S) ->
zlib:unzip(S).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Data encode and decode Funcs %% Data encode and decode Funcs
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------

View File

@ -60,8 +60,8 @@
]}). ]}).
-define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})"). -define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})").
-define(EX_WITHE_CHARS, "\\s"). %% Space and CRLF -define(EX_WITHE_CHARS, "\\s"). %% Space and CRLF
-define(FLOAT_PRECISION, 17).
-type(uri_string() :: iodata()). -type(uri_string() :: iodata()).
@ -336,12 +336,12 @@ bool(Bool) -> error({invalid_boolean, Bool}).
number_to_binary(Int) when is_integer(Int) -> number_to_binary(Int) when is_integer(Int) ->
integer_to_binary(Int); integer_to_binary(Int);
number_to_binary(Float) when is_float(Float) -> number_to_binary(Float) when is_float(Float) ->
float_to_binary(Float, [{decimals, 10}, compact]). float_to_binary(Float, [{decimals, ?FLOAT_PRECISION}, compact]).
number_to_list(Int) when is_integer(Int) -> number_to_list(Int) when is_integer(Int) ->
integer_to_list(Int); integer_to_list(Int);
number_to_list(Float) when is_float(Float) -> number_to_list(Float) when is_float(Float) ->
float_to_list(Float, [{decimals, 10}, compact]). float_to_list(Float, [{decimals, ?FLOAT_PRECISION}, compact]).
parse_nested(Attr) -> parse_nested(Attr) ->
case string:split(Attr, <<".">>, all) of case string:split(Attr, <<".">>, all) of

View File

@ -609,6 +609,29 @@ prop_hash_fun() ->
(64 == byte_size(apply_func(sha256, [S]))) (64 == byte_size(apply_func(sha256, [S])))
end). end).
%%------------------------------------------------------------------------------
%% Test cases for gzip funcs
%%------------------------------------------------------------------------------
t_gzip_funcs(_) ->
?PROPTEST(prop_gzip_fun).
prop_gzip_fun() ->
?FORALL(S, binary(),
S == apply_func(gunzip, [apply_func(gzip, [S])])).
%%------------------------------------------------------------------------------
%% Test cases for zip funcs
%%------------------------------------------------------------------------------
t_zip_funcs(_) ->
?PROPTEST(prop_zip_fun).
prop_zip_fun() ->
?FORALL(S, binary(),
S == apply_func(unzip, [apply_func(zip, [S])])).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Test cases for base64 %% Test cases for base64
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -822,4 +845,3 @@ all() ->
suite() -> suite() ->
[{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}]. [{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}].

View File

@ -51,3 +51,10 @@ mqtt.sn.username = mqtt_sn_user
## ##
## Value: String ## Value: String
mqtt.sn.password = abc mqtt.sn.password = abc
## Whether to initiate all subscribed topic registration messages to the
## client after the Session has been taken over by a new channel.
##
## Value: Boolean
## Default: false
#mqtt.sn.subs_resume = false

View File

@ -56,6 +56,11 @@ end}.
{datatype, string} {datatype, string}
]}. ]}.
{mapping, "mqtt.sn.subs_resume", "emqx_sn.subs_resume", [
{default, false},
{datatype, {enum, [true, false]}}
]}.
{translation, "emqx_sn.username", fun(Conf) -> {translation, "emqx_sn.username", fun(Conf) ->
Username = cuttlefish:conf_get("mqtt.sn.username", Conf), Username = cuttlefish:conf_get("mqtt.sn.username", Conf),
list_to_binary(Username) list_to_binary(Username)

View File

@ -1,26 +1,52 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{VSN, {VSN,
[ [
{<<"4\\.3\\.[4-5]">>,[ {"4.3.5",[
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} {load_module,emqx_sn_app,brutal_purge,soft_purge,[]},
{update,emqx_sn_gateway,{advanced,["4.3.5"]}}
]}, ]},
{<<"4.3.[2-3]">>,[ {"4.3.4",[
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_app,brutal_purge,soft_purge,[]},
{update,emqx_sn_gateway,{advanced,["4.3.4"]}}
]},
{"4.3.3",[
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} {load_module,emqx_sn_app,brutal_purge,soft_purge,[]},
{update,emqx_sn_gateway,{advanced,["4.3.3"]}}
]},
{"4.3.2",[
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_app,brutal_purge,soft_purge,[]},
{update,emqx_sn_gateway,{advanced,["4.3.2"]}}
]}, ]},
{<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]} {<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]}
], ],
[ [
{<<"4\\.3\\.[4-5]">>,[ {"4.3.5",[
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} {load_module,emqx_sn_app,brutal_purge,soft_purge,[]},
{update,emqx_sn_gateway,{advanced,["4.3.5"]}}
]}, ]},
{<<"4.3.[2-3]">>,[ {"4.3.4",[
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_app,brutal_purge,soft_purge,[]},
{update,emqx_sn_gateway,{advanced,["4.3.4"]}}
]},
{"4.3.3",[
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} {load_module,emqx_sn_app,brutal_purge,soft_purge,[]},
{update,emqx_sn_gateway,{advanced,["4.3.3"]}}
]},
{"4.3.2",[
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_app,brutal_purge,soft_purge,[]},
{update,emqx_sn_gateway,{advanced,["4.3.2"]}}
]}, ]},
{<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]} {<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]}
]}. ]}.

View File

@ -122,12 +122,14 @@ listeners_confs() ->
EnableQos3 = application:get_env(emqx_sn, enable_qos3, false), EnableQos3 = application:get_env(emqx_sn, enable_qos3, false),
EnableStats = application:get_env(emqx_sn, enable_stats, false), EnableStats = application:get_env(emqx_sn, enable_stats, false),
IdleTimeout = application:get_env(emqx_sn, idle_timeout, 30000), IdleTimeout = application:get_env(emqx_sn, idle_timeout, 30000),
SubsResume = application:get_env(emqx_sn, subs_resume, false),
[{udp, ListenOn, [{gateway_id, GwId}, [{udp, ListenOn, [{gateway_id, GwId},
{username, Username}, {username, Username},
{password, Password}, {password, Password},
{enable_qos3, EnableQos3}, {enable_qos3, EnableQos3},
{enable_stats, EnableStats}, {enable_stats, EnableStats},
{idle_timeout, IdleTimeout}, {idle_timeout, IdleTimeout},
{subs_resume, SubsResume},
{max_connections, 1024000}, {max_connections, 1024000},
{max_conn_rate, 1000}, {max_conn_rate, 1000},
{udp_options, []}]}]. {udp_options, []}]}].

View File

@ -339,9 +339,9 @@ format(?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)) ->
format(?SN_PINGREQ_MSG(ClientId)) -> format(?SN_PINGREQ_MSG(ClientId)) ->
io_lib:format("SN_PINGREQ(ClientId=~s)", [ClientId]); io_lib:format("SN_PINGREQ(ClientId=~s)", [ClientId]);
format(?SN_PINGRESP_MSG()) -> format(?SN_PINGRESP_MSG()) ->
"SN_PINGREQ()"; "SN_PINGRESP()";
format(?SN_DISCONNECT_MSG(Duration)) -> format(?SN_DISCONNECT_MSG(Duration)) ->
io_lib:format("SN_DISCONNECT(Duration=~s)", [Duration]); io_lib:format("SN_DISCONNECT(Duration=~w)", [Duration]);
format(#mqtt_sn_message{type = Type, variable = Var}) -> format(#mqtt_sn_message{type = Type, variable = Var}) ->
io_lib:format("mqtt_sn_message(type=~s, Var=~w)", io_lib:format("mqtt_sn_message(type=~s, Var=~w)",

View File

@ -48,6 +48,7 @@
, wait_for_will_topic/3 , wait_for_will_topic/3
, wait_for_will_msg/3 , wait_for_will_msg/3
, connected/3 , connected/3
, registering/3
, asleep/3 , asleep/3
, awake/3 , awake/3
]). ]).
@ -96,7 +97,10 @@
has_pending_pingresp = false :: boolean(), has_pending_pingresp = false :: boolean(),
%% Store all qos0 messages for waiting REGACK %% Store all qos0 messages for waiting REGACK
%% Note: QoS1/QoS2 messages will kept inflight queue %% Note: QoS1/QoS2 messages will kept inflight queue
pending_topic_ids = #{} :: pending_msgs() pending_topic_ids = #{} :: pending_msgs(),
subs_resume = false,
waiting_sync_topics = [],
previous_outgoings_and_state = undefined
}). }).
-define(INFO_KEYS, [socktype, peername, sockname, sockstate]). %, active_n]). -define(INFO_KEYS, [socktype, peername, sockname, sockstate]). %, active_n]).
@ -126,6 +130,9 @@
Reason =:= asleep_timeout; Reason =:= asleep_timeout;
Reason =:= keepalive_timeout). Reason =:= keepalive_timeout).
-define(RETRY_TIMEOUT, 5000).
-define(MAX_RETRY_TIMES, 3).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Exported APIs %% Exported APIs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -152,6 +159,7 @@ init([{_, SockPid, Sock}, Peername, Options]) ->
Password = proplists:get_value(password, Options, undefined), Password = proplists:get_value(password, Options, undefined),
EnableQos3 = proplists:get_value(enable_qos3, Options, false), EnableQos3 = proplists:get_value(enable_qos3, Options, false),
IdleTimeout = proplists:get_value(idle_timeout, Options, 30000), IdleTimeout = proplists:get_value(idle_timeout, Options, 30000),
SubsResume = proplists:get_value(subs_resume, Options, false),
EnableStats = proplists:get_value(enable_stats, Options, false), EnableStats = proplists:get_value(enable_stats, Options, false),
case inet:sockname(Sock) of case inet:sockname(Sock) of
{ok, Sockname} -> {ok, Sockname} ->
@ -168,7 +176,8 @@ init([{_, SockPid, Sock}, Peername, Options]) ->
asleep_timer = emqx_sn_asleep_timer:init(), asleep_timer = emqx_sn_asleep_timer:init(),
enable_stats = EnableStats, enable_stats = EnableStats,
enable_qos3 = EnableQos3, enable_qos3 = EnableQos3,
idle_timeout = IdleTimeout idle_timeout = IdleTimeout,
subs_resume = SubsResume
}, },
emqx_logger:set_metadata_peername(esockd:format(Peername)), emqx_logger:set_metadata_peername(esockd:format(Peername)),
{ok, idle, State, [IdleTimeout]}; {ok, idle, State, [IdleTimeout]};
@ -379,6 +388,13 @@ connected(cast, {outgoing, Packet}, State) ->
connected(cast, {connack, ConnAck}, State) -> connected(cast, {connack, ConnAck}, State) ->
{keep_state, handle_outgoing(ConnAck, State)}; {keep_state, handle_outgoing(ConnAck, State)};
connected(cast, {register, TopicNames, BlockedOutgoins}, State) ->
NState = State#state{
waiting_sync_topics = TopicNames,
previous_outgoings_and_state = {BlockedOutgoins, ?FUNCTION_NAME}
},
{next_state, registering, NState, [next_event(shooting)]};
connected(cast, {shutdown, Reason, Packet}, State) -> connected(cast, {shutdown, Reason, Packet}, State) ->
stop(Reason, handle_outgoing(Packet, State)); stop(Reason, handle_outgoing(Packet, State));
@ -392,6 +408,80 @@ connected(cast, {close, Reason}, State) ->
connected(EventType, EventContent, State) -> connected(EventType, EventContent, State) ->
handle_event(EventType, EventContent, connected, State). handle_event(EventType, EventContent, connected, State).
registering(cast, shooting,
State = #state{
channel = Channel,
waiting_sync_topics = [],
previous_outgoings_and_state = {Outgoings, StateName}}) ->
Session = emqx_channel:get_session(Channel),
ClientInfo = emqx_channel:info(clientinfo, Channel),
{Outgoings2, NChannel} =
case emqx_session:dequeue(ClientInfo, Session) of
{ok, NSession} ->
{[], emqx_channel:set_session(NSession, Channel)};
{ok, Pubs, NSession} ->
emqx_channel:do_deliver(
Pubs,
emqx_channel:set_session(NSession, Channel)
)
end,
NState = State#state{
channel = NChannel,
previous_outgoings_and_state = undefined},
{next_state, StateName, NState, outgoing_events(Outgoings ++ Outgoings2)};
registering(cast, shooting,
State = #state{
clientid = ClientId,
waiting_sync_topics = [TopicName | Remainings]}) ->
TopicId = emqx_sn_registry:lookup_topic_id(ClientId, TopicName),
NState = send_register(
TopicName,
TopicId,
16#FFFF, %% FIXME: msgid ?
State#state{waiting_sync_topics = [{TopicId, TopicName, 0} | Remainings]}
),
{keep_state, NState, {{timeout, wait_regack}, ?RETRY_TIMEOUT, nocontent}};
registering(cast, {incoming, ?SN_REGACK_MSG(TopicId, _MsgId, ?SN_RC_ACCEPTED)},
State = #state{waiting_sync_topics = [{TopicId, TopicName, _} | Remainings]}) ->
?LOG(debug, "Register topic name ~s with id ~w successfully!", [TopicName, TopicId]),
{keep_state, State#state{waiting_sync_topics = Remainings}, [next_event(shooting)]};
registering(cast, {incoming, ?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)},
State = #state{waiting_sync_topics = [{TopicId, TopicName, _} | Remainings]}) ->
?LOG(error, "client does not accept register TopicName=~s, TopicId=~p, MsgId=~p, ReturnCode=~p",
[TopicName, TopicId, MsgId, ReturnCode]),
{keep_state, State#state{waiting_sync_topics = Remainings}, [next_event(shooting)]};
registering(cast, {incoming, Packet},
State = #state{previous_outgoings_and_state = {_, StateName}})
when is_record(Packet, mqtt_sn_message) ->
apply(?MODULE, StateName, [cast, {incoming, Packet}, State]);
registering({timeout, wait_regack}, _,
State = #state{waiting_sync_topics = [{TopicId, TopicName, Times} | Remainings]})
when Times < ?MAX_RETRY_TIMES ->
?LOG(warning, "Waiting REGACK timeout for TopicName=~s, TopicId=~w, try it again(~w)",
[TopicName, TopicId, Times+1]),
NState = send_register(
TopicName,
TopicId,
16#FFFF, %% FIXME: msgid?
State#state{waiting_sync_topics = [{TopicId, TopicName, Times + 1} | Remainings]}
),
{keep_state, NState, {{timeout, wait_regack}, ?RETRY_TIMEOUT, nocontent}};
registering({timeout, wait_regack}, _,
State = #state{waiting_sync_topics = [{TopicId, TopicName, ?MAX_RETRY_TIMES} | _]}) ->
?LOG(error, "Retry register TopicName=~s, TopicId=~w reached the max retry times",
[TopicId, TopicName]),
NState = send_message(?SN_DISCONNECT_MSG(undefined), State),
stop(reached_max_retry_times, NState);
registering(EventType, EventContent, State) ->
handle_event(EventType, EventContent, ?FUNCTION_NAME, State).
asleep(cast, {incoming, ?SN_DISCONNECT_MSG(Duration)}, State) -> asleep(cast, {incoming, ?SN_DISCONNECT_MSG(Duration)}, State) ->
State0 = send_message(?SN_DISCONNECT_MSG(undefined), State), State0 = send_message(?SN_DISCONNECT_MSG(undefined), State),
case Duration of case Duration of
@ -519,10 +609,13 @@ handle_event(info, {datagram, SockPid, Data}, StateName,
stop(frame_error, State) stop(frame_error, State)
end; end;
handle_event(info, {deliver, _Topic, Msg}, asleep, handle_event(info, {deliver, _Topic, Msg}, StateName,
State = #state{channel = Channel}) -> State = #state{channel = Channel})
when StateName == alseep;
StateName == registering ->
% section 6.14, Support of sleeping clients % section 6.14, Support of sleeping clients
?LOG(debug, "enqueue downlink message in asleep state, msg: ~0p", [Msg]), ?LOG(debug, "enqueue downlink message in ~s state, msg: ~0p",
[StateName, Msg]),
Session = emqx_session:enqueue(emqx_channel:info(clientinfo, Channel), Session = emqx_session:enqueue(emqx_channel:info(clientinfo, Channel),
Msg, emqx_channel:get_session(Channel)), Msg, emqx_channel:get_session(Channel)),
{keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}}; {keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}};
@ -593,8 +686,31 @@ terminate(Reason, _StateName, #state{channel = Channel}) ->
emqx_channel:terminate(Reason, Channel), emqx_channel:terminate(Reason, Channel),
ok. ok.
code_change(_Vsn, StateName, State, _Extra) -> %% in the emqx_sn:v4.3.6, we have added two new fields in the state last:
{ok, StateName, State}. %% - waiting_sync_topics
%% - previous_outgoings_and_state
code_change({down, _Vsn}, StateName, State, [ToVsn]) ->
case re:run(ToVsn, "4\\.3\\.[2-5]") of
{match, _} ->
NState0 = lists:droplast(
lists:droplast(
lists:droplast(tuple_to_list(State)))),
NState = list_to_tuple(NState0),
{ok, StateName, NState};
_ ->
{ok, StateName, State}
end;
code_change(_Vsn, StateName, State, [FromVsn]) ->
case re:run(FromVsn, "4\\.3\\.[2-5]") of
{match, _} ->
NState = list_to_tuple(
tuple_to_list(State) ++ [false, [], undefined]
),
{ok, StateName, NState};
_ ->
{ok, StateName, State}
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle Call/Info %% Handle Call/Info
@ -643,6 +759,9 @@ outgoing_event(Packet) when is_record(Packet, mqtt_packet);
outgoing_event(Action) -> outgoing_event(Action) ->
next_event(Action). next_event(Action).
next_event(Content) ->
{next_event, cast, Content}.
close_socket(State = #state{sockstate = closed}) -> State; close_socket(State = #state{sockstate = closed}) -> State;
close_socket(State = #state{socket = _Socket}) -> close_socket(State = #state{socket = _Socket}) ->
%ok = gen_udp:close(Socket), %ok = gen_udp:close(Socket),
@ -1058,6 +1177,38 @@ handle_incoming(#mqtt_packet{variable = #mqtt_packet_puback{}} = Packet, awake,
Result = channel_handle_in(Packet, State), Result = channel_handle_in(Packet, State),
handle_return(Result, State, [try_goto_asleep]); handle_return(Result, State, [try_goto_asleep]);
handle_incoming(
#mqtt_packet{
variable = #mqtt_packet_connect{
clean_start = false}
} = Packet,
_,
State = #state{subs_resume = SubsResume}) ->
Result = channel_handle_in(Packet, State),
case {SubsResume, Result} of
{true, {ok, Replies, NChannel}} ->
case maps:get(
subscriptions,
emqx_channel:info(session, NChannel)
) of
Subs when map_size(Subs) == 0 ->
handle_return(Result, State);
Subs ->
TopicNames = lists:filter(
fun(T) -> not emqx_topic:wildcard(T)
end, maps:keys(Subs)),
{ConnackEvents, Outgoings} = split_connack_replies(
Replies),
Events = outgoing_events(
ConnackEvents ++
[{register, TopicNames, Outgoings}]
),
{keep_state, State#state{channel = NChannel}, Events}
end;
_ ->
handle_return(Result, State)
end;
handle_incoming(Packet, _StName, State) -> handle_incoming(Packet, _StName, State) ->
Result = channel_handle_in(Packet, State), Result = channel_handle_in(Packet, State),
handle_return(Result, State). handle_return(Result, State).
@ -1167,9 +1318,6 @@ inc_outgoing_stats(Type) ->
false -> ok false -> ok
end. end.
next_event(Content) ->
{next_event, cast, Content}.
inc_counter(Key, Inc) -> inc_counter(Key, Inc) ->
_ = emqx_pd:inc_counter(Key, Inc), _ = emqx_pd:inc_counter(Key, Inc),
ok. ok.
@ -1183,3 +1331,8 @@ maybe_send_puback(?QOS_0, _TopicId, _MsgId, _ReasonCode, State) ->
State; State;
maybe_send_puback(_QoS, TopicId, MsgId, ReasonCode, State) -> maybe_send_puback(_QoS, TopicId, MsgId, ReasonCode, State) ->
send_message(?SN_PUBACK_MSG(TopicId, MsgId, ReasonCode), State). send_message(?SN_PUBACK_MSG(TopicId, MsgId, ReasonCode), State).
%% Replies = [{event, connected}, {connack, ConnAck}, {outgoing, Pkts}]
split_connack_replies([A = {event, connected},
B = {connack, _ConnAck} | Outgoings]) ->
{[A, B], Outgoings}.

View File

@ -79,6 +79,12 @@ set_special_confs(emqx_sn) ->
set_special_confs(_App) -> set_special_confs(_App) ->
ok. ok.
restart_emqx_sn(#{subs_resume := Bool}) ->
application:set_env(emqx_sn, subs_resume, Bool),
_ = application:stop(emqx_sn),
_ = application:ensure_all_started(emqx_sn),
ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Test cases %% Test cases
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -862,108 +868,6 @@ t_delivery_qos1_register_invalid_topic_id(_) ->
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket). gen_udp:close(Socket).
t_delivery_takeover_and_re_register(_) ->
MsgId = 1,
{ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, ?QOS_1, <<"topic-a">>, MsgId+1),
<<_, ?SN_SUBACK, 2#00100000,
TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2),
<<_, ?SN_SUBACK, 2#01000000,
TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
_ = emqx:publish(
emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)),
_ = emqx:publish(
emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"test-b">>)),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket),
send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket),
send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED),
send_disconnect_msg(Socket, undefined),
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket),
%% offline messages will be queued into the MQTT-SN session
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m1">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m3">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m1">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m2">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)),
{ok, NSocket} = gen_udp:open(0, [binary]),
send_connect_msg(NSocket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(NSocket)),
%% qos1
%% received the resume messages
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA0:16, "m1">> = receive_response(NSocket),
%% only one qos1/qos2 inflight
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
send_puback_msg(NSocket, TopicIdA, MsgIdA0, ?SN_RC_INVALID_TOPIC_ID),
%% recv register
<<_, ?SN_REGISTER,
TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket),
send_regack_msg(NSocket, TopicIdA, RegMsgIdA),
%% received the replay messages
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA1:16, "m1">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA2:16, "m2">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdA, MsgIdA2, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA3:16, "m3">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdA, MsgIdA3, ?SN_RC_ACCEPTED),
%% qos2
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB0:16, "m1">> = receive_response(NSocket),
%% only one qos1/qos2 inflight
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
send_puback_msg(NSocket, TopicIdB, MsgIdB0, ?SN_RC_INVALID_TOPIC_ID),
%% recv register
<<_, ?SN_REGISTER,
TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket),
send_regack_msg(NSocket, TopicIdB, RegMsgIdB),
%% received the replay messages
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB1:16, "m1">> = receive_response(NSocket),
send_pubrec_msg(NSocket, MsgIdB1),
<<_, ?SN_PUBREL, MsgIdB1:16>> = receive_response(NSocket),
send_pubcomp_msg(NSocket, MsgIdB1),
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB2:16, "m2">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdB, MsgIdB2, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB3:16, "m3">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdB, MsgIdB3, ?SN_RC_ACCEPTED),
%% no more messages
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
send_disconnect_msg(NSocket, undefined),
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket)),
gen_udp:close(NSocket).
t_will_case01(_) -> t_will_case01(_) ->
QoS = 1, QoS = 1,
Duration = 1, Duration = 1,
@ -1725,6 +1629,324 @@ t_broadcast_test1(_) ->
timer:sleep(600), timer:sleep(600),
gen_udp:close(Socket). gen_udp:close(Socket).
t_register_subs_resume_on(_) ->
restart_emqx_sn(#{subs_resume => true}),
MsgId = 1,
{ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-a">>, MsgId+1),
<<_, ?SN_SUBACK, 2#01000000,
TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2),
<<_, ?SN_SUBACK, 2#01000000,
TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
_ = emqx:publish(
emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)),
_ = emqx:publish(
emqx_message:make(test, ?QOS_1, <<"topic-b">>, <<"test-b">>)),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket),
send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket),
send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED),
send_disconnect_msg(Socket, undefined),
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket),
%% offline messages will be queued into the MQTT-SN session
_ = emqx:publish(emqx_message:make(test, ?QOS_0, <<"topic-a">>, <<"m1">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-a">>, <<"m3">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_0, <<"topic-b">>, <<"m1">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-b">>, <<"m2">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)),
{ok, NSocket} = gen_udp:open(0, [binary]),
send_connect_msg(NSocket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(NSocket)),
%% receive subs register requests
<<_, ?SN_REGISTER,
TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket),
send_regack_msg(NSocket, TopicIdA, RegMsgIdA),
<<_, ?SN_REGISTER,
TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket),
send_regack_msg(NSocket, TopicIdB, RegMsgIdB),
%% receive the queued messages
<<_, ?SN_PUBLISH, 2#00000000,
TopicIdA:16, 0:16, "m1">> = receive_response(NSocket),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA1:16, "m2">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdA:16, MsgIdA2:16, "m3">> = receive_response(NSocket),
send_pubrec_msg(NSocket, MsgIdA2),
<<_, ?SN_PUBREL, MsgIdA2:16>> = receive_response(NSocket),
send_pubcomp_msg(NSocket, MsgIdA2),
<<_, ?SN_PUBLISH, 2#00000000,
TopicIdB:16, 0:16, "m1">> = receive_response(NSocket),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdB:16, MsgIdB1:16, "m2">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdB, MsgIdB1, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB2:16, "m3">> = receive_response(NSocket),
send_pubrec_msg(NSocket, MsgIdB2),
<<_, ?SN_PUBREL, MsgIdB2:16>> = receive_response(NSocket),
send_pubcomp_msg(NSocket, MsgIdB2),
%% no more messages
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
gen_udp:close(NSocket),
{ok, NSocket1} = gen_udp:open(0, [binary]),
send_connect_msg(NSocket1, <<"test">>),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(NSocket1)),
send_disconnect_msg(NSocket1, undefined),
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)),
gen_udp:close(NSocket1),
restart_emqx_sn(#{subs_resume => false}).
t_register_subs_resume_off(_) ->
MsgId = 1,
{ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, ?QOS_1, <<"topic-a">>, MsgId+1),
<<_, ?SN_SUBACK, 2#00100000,
TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2),
<<_, ?SN_SUBACK, 2#01000000,
TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
_ = emqx:publish(
emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)),
_ = emqx:publish(
emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"test-b">>)),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket),
send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket),
send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED),
send_disconnect_msg(Socket, undefined),
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket),
%% offline messages will be queued into the MQTT-SN session
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m1">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m3">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m1">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m2">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)),
{ok, NSocket} = gen_udp:open(0, [binary]),
send_connect_msg(NSocket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(NSocket)),
%% qos1
%% received the resume messages
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA0:16, "m1">> = receive_response(NSocket),
%% only one qos1/qos2 inflight
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
send_puback_msg(NSocket, TopicIdA, MsgIdA0, ?SN_RC_INVALID_TOPIC_ID),
%% recv register
<<_, ?SN_REGISTER,
TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket),
send_regack_msg(NSocket, TopicIdA, RegMsgIdA),
%% received the replay messages
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA1:16, "m1">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA2:16, "m2">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdA, MsgIdA2, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA3:16, "m3">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdA, MsgIdA3, ?SN_RC_ACCEPTED),
%% qos2
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB0:16, "m1">> = receive_response(NSocket),
%% only one qos1/qos2 inflight
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
send_puback_msg(NSocket, TopicIdB, MsgIdB0, ?SN_RC_INVALID_TOPIC_ID),
%% recv register
<<_, ?SN_REGISTER,
TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket),
send_regack_msg(NSocket, TopicIdB, RegMsgIdB),
%% received the replay messages
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB1:16, "m1">> = receive_response(NSocket),
send_pubrec_msg(NSocket, MsgIdB1),
<<_, ?SN_PUBREL, MsgIdB1:16>> = receive_response(NSocket),
send_pubcomp_msg(NSocket, MsgIdB1),
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB2:16, "m2">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdB, MsgIdB2, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB3:16, "m3">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdB, MsgIdB3, ?SN_RC_ACCEPTED),
%% no more messages
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
gen_udp:close(NSocket),
{ok, NSocket1} = gen_udp:open(0, [binary]),
send_connect_msg(NSocket1, <<"test">>),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(NSocket1)),
send_disconnect_msg(NSocket1, undefined),
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)),
gen_udp:close(NSocket1).
t_register_skip_failure_topic_name_and_reach_max_retry_times(_) ->
restart_emqx_sn(#{subs_resume => true}),
MsgId = 1,
{ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-a">>, MsgId+1),
<<_, ?SN_SUBACK, 2#01000000,
TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2),
<<_, ?SN_SUBACK, 2#01000000,
TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
send_disconnect_msg(Socket, undefined),
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket),
{ok, NSocket} = gen_udp:open(0, [binary]),
send_connect_msg(NSocket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(NSocket)),
%% receive subs register requests
%% registered failured topic-name will be skipped
<<_, ?SN_REGISTER,
TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket),
send_regack_msg(NSocket, TopicIdA, RegMsgIdA, ?SN_RC_INVALID_TOPIC_ID),
%% the gateway try to shutdown this client if it reached max-retry-times
%%
%% times-0
<<_, ?SN_REGISTER,
TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket),
%% times-1
timer:sleep(5000), %% RETYRY_TIMEOUT
<<_, ?SN_REGISTER,
TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket),
%% times-2
timer:sleep(5000), %% RETYRY_TIMEOUT
<<_, ?SN_REGISTER,
TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket),
%% just a ping
send_pingreq_msg(NSocket, <<"test">>),
?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(NSocket)),
%% times-3
timer:sleep(5000), %% RETYRY_TIMEOUT
<<_, ?SN_REGISTER,
TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket),
%% shutdown due to reached max retry times
timer:sleep(5000), %% RETYRY_TIMEOUT
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket)),
gen_udp:close(NSocket),
restart_emqx_sn(#{subs_resume => false}).
t_register_enqueue_delivering_messages(_) ->
restart_emqx_sn(#{subs_resume => true}),
MsgId = 1,
{ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-a">>, MsgId+1),
<<_, ?SN_SUBACK, 2#01000000,
TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
send_disconnect_msg(Socket, undefined),
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket),
emqx_logger:set_log_level(debug),
{ok, NSocket} = gen_udp:open(0, [binary]),
send_connect_msg(NSocket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(NSocket)),
%% receive subs register requests
%% registered failured topic-name will be skipped
<<_, ?SN_REGISTER,
TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket),
_ = emqx:publish(emqx_message:make(test, ?QOS_0, <<"topic-a">>, <<"m1">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)),
send_regack_msg(NSocket, TopicIdA, RegMsgIdA, ?SN_RC_ACCEPTED),
%% receive the queued messages
<<_, ?SN_PUBLISH, 2#00000000,
TopicIdA:16, 0:16, "m1">> = receive_response(NSocket),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA1:16, "m2">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED),
%% no more messages
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
gen_udp:close(NSocket),
{ok, NSocket1} = gen_udp:open(0, [binary]),
send_connect_msg(NSocket1, <<"test">>),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(NSocket1)),
send_disconnect_msg(NSocket1, undefined),
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)),
gen_udp:close(NSocket1),
restart_emqx_sn(#{subs_resume => false}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helper funcs %% Helper funcs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -1816,9 +2038,12 @@ send_register_msg(Socket, TopicName, MsgId) ->
ok = gen_udp:send(Socket, ?HOST, ?PORT, RegisterPacket). ok = gen_udp:send(Socket, ?HOST, ?PORT, RegisterPacket).
send_regack_msg(Socket, TopicId, MsgId) -> send_regack_msg(Socket, TopicId, MsgId) ->
send_regack_msg(Socket, TopicId, MsgId, ?SN_RC_ACCEPTED).
send_regack_msg(Socket, TopicId, MsgId, Rc) ->
Length = 7, Length = 7,
MsgType = ?SN_REGACK, MsgType = ?SN_REGACK,
Packet = <<Length:8, MsgType:8, TopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>, Packet = <<Length:8, MsgType:8, TopicId:16, MsgId:16, Rc>>,
ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet). ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet).
send_publish_msg_normal_topic(Socket, QoS, MsgId, TopicId, Data) -> send_publish_msg_normal_topic(Socket, QoS, MsgId, TopicId, Data) ->

View File

@ -1,11 +1,11 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{VSN, {VSN,
[{<<"4\\.3\\.[0-2]$">>, [{<<"4\\.3\\.[0-2]">>,
[{apply,{application,stop,[emqx_web_hook]}}, [{apply,{application,stop,[emqx_web_hook]}},
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, {load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[3-7]$">>, {<<"4\\.3\\.[3-7]">>,
[{apply,{application,stop,[emqx_web_hook]}}, [{apply,{application,stop,[emqx_web_hook]}},
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
@ -14,14 +14,15 @@
[{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]}, [{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
{"4.3.9", {"4.3.9",
[ %% nothing so far [ %% nothing so far
%% 4.3.9 is taken by release 4.3.12
]}, ]},
{<<".*">>,[]}], {<<".*">>,[]}],
[{<<"4\\.3\\.[0-2]$">>, [{<<"4\\.3\\.[0-2]">>,
[{apply,{application,stop,[emqx_web_hook]}}, [{apply,{application,stop,[emqx_web_hook]}},
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, {load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[3-7]$">>, {<<"4\\.3\\.[3-7]">>,
[{apply,{application,stop,[emqx_web_hook]}}, [{apply,{application,stop,[emqx_web_hook]}},
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
@ -30,5 +31,6 @@
[{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]}, [{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
{"4.3.9", {"4.3.9",
[ %% nothing so far [ %% nothing so far
%% 4.3.9 is taken by release 4.3.12
]}, ]},
{<<".*">>,[]}]}. {<<".*">>,[]}]}.

View File

@ -27,7 +27,6 @@ spec:
namespace: {{ .Release.Namespace }} namespace: {{ .Release.Namespace }}
labels: labels:
app.kubernetes.io/name: {{ include "emqx.name" . }} app.kubernetes.io/name: {{ include "emqx.name" . }}
helm.sh/chart: {{ include "emqx.chart" . }}
app.kubernetes.io/instance: {{ .Release.Name }} app.kubernetes.io/instance: {{ .Release.Name }}
app.kubernetes.io/managed-by: {{ .Release.Service }} app.kubernetes.io/managed-by: {{ .Release.Service }}
annotations: annotations:

View File

@ -3,12 +3,17 @@
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## Default user's login name. ## Default user's login name.
## ## For safety, it should be changed as soon as possible.
## Please use the './bin/emqx_ctl admins' CLI to change it.
## Then comment `dashboard.default_user.login/password` from here
## Value: String ## Value: String
dashboard.default_user.login = admin dashboard.default_user.login = admin
## Default user's password. ## Default user's password.
## ## The initial default password for 'dashboard.default_user.login'"
## For safety, it should be changed as soon as possible.
## Please use the './bin/emqx_ctl admins' CLI to change it.
## Then comment `dashboard.default_user.login/password` from here
## Value: String ## Value: String
dashboard.default_user.password = public dashboard.default_user.password = public
@ -127,4 +132,3 @@ dashboard.listener.http.ipv6_v6only = false
## ##
## Value: on | off ## Value: on | off
## dashboard.listener.https.honor_cipher_order = on ## dashboard.listener.https.honor_cipher_order = on

View File

@ -103,17 +103,17 @@ is_authorized(Req) ->
is_authorized("/api/v4/auth", _Req) -> is_authorized("/api/v4/auth", _Req) ->
true; true;
is_authorized(_Path, Req) -> is_authorized(_Path, Req) ->
case cowboy_req:parse_header(<<"authorization">>, Req) of try
{basic, Username, Password} -> {basic, Username, Password} = cowboy_req:parse_header(<<"authorization">>, Req),
case emqx_dashboard_admin:check(iolist_to_binary(Username), case emqx_dashboard_admin:check(iolist_to_binary(Username), iolist_to_binary(Password)) of
iolist_to_binary(Password)) of ok -> true;
ok -> true; {error, Reason} ->
{error, Reason} -> ?LOG(error, "[Dashboard] Authorization Failure: username=~s, reason=~p",
?LOG(error, "[Dashboard] Authorization Failure: username=~s, reason=~p", [Username, Reason]),
[Username, Reason]), false
false end
end; catch _:_ -> %% bad authorization header will crash.
_ -> false false
end. end.
filter(#{app := emqx_modules}) -> true; filter(#{app := emqx_modules}) -> true;

View File

@ -77,18 +77,24 @@ start_link() ->
-spec(add_user(binary(), binary(), binary()) -> ok | {error, any()}). -spec(add_user(binary(), binary(), binary()) -> ok | {error, any()}).
add_user(Username, Password, Tags) when is_binary(Username), is_binary(Password) -> add_user(Username, Password, Tags) when is_binary(Username), is_binary(Password) ->
Admin = #mqtt_admin{username = Username, password = hash(Password), tags = Tags}, case emqx_misc:is_sane_id(Username) of
return(mnesia:transaction(fun add_user_/1, [Admin])). ok ->
Admin = #mqtt_admin{username = Username, password = hash(Password), tags = Tags},
return(mnesia:transaction(fun add_user_/1, [Admin]));
{error, Reason} -> {error, Reason}
end.
force_add_user(Username, Password, Tags) -> force_add_user(Username, Password, Tags) ->
AddFun = fun() -> case emqx_misc:is_sane_id(Username) of
mnesia:write(#mqtt_admin{username = Username, ok ->
password = Password, AddFun = fun() ->
tags = Tags}) mnesia:write(#mqtt_admin{username = Username, password = Password, tags = Tags})
end, end,
case mnesia:transaction(AddFun) of case mnesia:transaction(AddFun) of
{atomic, ok} -> ok; {atomic, ok} -> ok;
{aborted, Reason} -> {error, Reason} {aborted, Reason} -> {error, Reason}
end;
{error, Reason} -> {error, Reason}
end. end.
%% @private %% @private

View File

@ -77,9 +77,10 @@ auth(_Bindings, Params) ->
Password = proplists:get_value(<<"password">>, Params), Password = proplists:get_value(<<"password">>, Params),
return(emqx_dashboard_admin:check(Username, Password)). return(emqx_dashboard_admin:check(Username, Password)).
change_pwd(#{username := Username}, Params) -> change_pwd(#{username := Username0}, Params) ->
OldPwd = proplists:get_value(<<"old_pwd">>, Params), OldPwd = proplists:get_value(<<"old_pwd">>, Params),
NewPwd = proplists:get_value(<<"new_pwd">>, Params), NewPwd = proplists:get_value(<<"new_pwd">>, Params),
Username = emqx_mgmt_util:urldecode(Username0),
return(emqx_dashboard_admin:change_password(Username, OldPwd, NewPwd)). return(emqx_dashboard_admin:change_password(Username, OldPwd, NewPwd)).
create(_Bindings, Params) -> create(_Bindings, Params) ->
@ -96,14 +97,13 @@ list(_Bindings, _Params) ->
update(#{name := Username}, Params) -> update(#{name := Username}, Params) ->
Tags = proplists:get_value(<<"tags">>, Params), Tags = proplists:get_value(<<"tags">>, Params),
return(emqx_dashboard_admin:update_user(Username, Tags)). return(emqx_dashboard_admin:update_user(emqx_mgmt_util:urldecode(Username), Tags)).
delete(#{name := <<"admin">>}, _Params) -> delete(#{name := <<"admin">>}, _Params) ->
return({error, <<"Cannot delete admin">>}); return({error, <<"Cannot delete admin">>});
delete(#{name := Username}, _Params) -> delete(#{name := Username}, _Params) ->
return(emqx_dashboard_admin:remove_user(Username)). return(emqx_dashboard_admin:remove_user(emqx_mgmt_util:urldecode(Username))).
row(#mqtt_admin{username = Username, tags = Tags}) -> row(#mqtt_admin{username = Username, tags = Tags}) ->
#{username => Username, tags => Tags}. #{username => Username, tags => Tags}.

View File

@ -71,9 +71,13 @@ t_overview(_) ->
t_admins_add_delete(_) -> t_admins_add_delete(_) ->
ok = emqx_dashboard_admin:add_user(<<"username">>, <<"password">>, <<"tag">>), ok = emqx_dashboard_admin:add_user(<<"username">>, <<"password">>, <<"tag">>),
ok = emqx_dashboard_admin:add_user(<<"username1">>, <<"password1">>, <<"tag1">>), ok = emqx_dashboard_admin:add_user(<<"username1">>, <<"password1">>, <<"tag1">>),
ok = emqx_dashboard_admin:add_user(<<"1username1">>, <<"password1">>, <<"tag1">>),
{error, _} = emqx_dashboard_admin:add_user(<<"u/sername1">>, <<"password1">>, <<"tag1">>),
{error, _} = emqx_dashboard_admin:add_user(<<"/username1">>, <<"password1">>, <<"tag1">>),
Admins = emqx_dashboard_admin:all_users(), Admins = emqx_dashboard_admin:all_users(),
?assertEqual(3, length(Admins)), ?assertEqual(4, length(Admins)),
ok = emqx_dashboard_admin:remove_user(<<"username1">>), ok = emqx_dashboard_admin:remove_user(<<"username1">>),
ok = emqx_dashboard_admin:remove_user(<<"1username1">>),
Users = emqx_dashboard_admin:all_users(), Users = emqx_dashboard_admin:all_users(),
?assertEqual(2, length(Users)), ?assertEqual(2, length(Users)),
ok = emqx_dashboard_admin:change_password(<<"username">>, <<"password">>, <<"pwd">>), ok = emqx_dashboard_admin:change_password(<<"username">>, <<"password">>, <<"pwd">>),
@ -81,7 +85,8 @@ t_admins_add_delete(_) ->
?assert(request_dashboard(get, api_path("brokers"), auth_header_("username", "pwd"))), ?assert(request_dashboard(get, api_path("brokers"), auth_header_("username", "pwd"))),
ok = emqx_dashboard_admin:remove_user(<<"username">>), ok = emqx_dashboard_admin:remove_user(<<"username">>),
?assertNotEqual(true, request_dashboard(get, api_path("brokers"), auth_header_("username", "pwd"))). ?assertNotEqual(true, request_dashboard(get, api_path("brokers"),
auth_header_("username", "pwd"))).
t_admins_persist_default_password(_) -> t_admins_persist_default_password(_) ->
emqx_dashboard_admin:change_password(<<"admin">>, <<"new_password">>), emqx_dashboard_admin:change_password(<<"admin">>, <<"new_password">>),
@ -156,9 +161,9 @@ t_default_password_persists_after_leaving_cluster(_) ->
t_rest_api(_Config) -> t_rest_api(_Config) ->
{ok, Res0} = http_get("users"), {ok, Res0} = http_get("users"),
Users = get_http_data(Res0),
?assertEqual([#{<<"username">> => <<"admin">>, ?assert(lists:member(#{<<"username">> => <<"admin">>, <<"tags">> => <<"administrator">>},
<<"tags">> => <<"administrator">>}], get_http_data(Res0)), Users)),
AssertSuccess = fun({ok, Res}) -> AssertSuccess = fun({ok, Res}) ->
?assertEqual(#{<<"code">> => 0}, json(Res)) ?assertEqual(#{<<"code">> => 0}, json(Res))

View File

@ -53,11 +53,18 @@
{ key { key
, msg , msg
}). }).
-type delayed_message() :: #delayed_message{}.
-define(TAB, ?MODULE). -define(TAB, ?MODULE).
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
-define(MAX_INTERVAL, 4294967). -define(MAX_INTERVAL, 4294967).
-type state() :: #{ publish_at := non_neg_integer()
, timer := timer:tref() | undefined
, stats_timer => timer:tref() | undefined
, stats_fun => function()
}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Mnesia bootstrap %% Mnesia bootstrap
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -124,7 +131,7 @@ on_message_publish(Msg) ->
start_link() -> start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-spec(store(#delayed_message{}) -> ok). -spec(store(delayed_message()) -> ok).
store(DelayedMsg) -> store(DelayedMsg) ->
gen_server:call(?SERVER, {store, DelayedMsg}, infinity). gen_server:call(?SERVER, {store, DelayedMsg}, infinity).
@ -134,7 +141,9 @@ store(DelayedMsg) ->
init([]) -> init([]) ->
{ok, ensure_stats_event( {ok, ensure_stats_event(
ensure_publish_timer(#{timer => undefined, publish_at => 0}))}. ensure_publish_timer(#{timer => undefined,
publish_at => 0,
stats_timer => undefined}))}.
handle_call({store, DelayedMsg = #delayed_message{key = Key}}, _From, State) -> handle_call({store, DelayedMsg = #delayed_message{key = Key}}, _From, State) ->
ok = mnesia:dirty_write(?TAB, DelayedMsg), ok = mnesia:dirty_write(?TAB, DelayedMsg),
@ -163,8 +172,9 @@ handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]), ?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}. {noreply, State}.
terminate(_Reason, #{timer := TRef}) -> terminate(_Reason, #{timer := PublishTimer} = State) ->
emqx_misc:cancel_timer(TRef). emqx_misc:cancel_timer(PublishTimer),
emqx_misc:cancel_timer(maps:get(stats_timer, State, undefined)).
code_change({down, Vsn}, State, _Extra) when Vsn =:= "4.3.0" -> code_change({down, Vsn}, State, _Extra) when Vsn =:= "4.3.0" ->
NState = maps:with([timer, publish_at], State), NState = maps:with([timer, publish_at], State),
@ -179,12 +189,14 @@ code_change(Vsn, State, _Extra) when Vsn =:= "4.3.0" ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Ensure the stats %% Ensure the stats
-spec ensure_stats_event(state()) -> state().
ensure_stats_event(State) -> ensure_stats_event(State) ->
StatsFun = emqx_stats:statsfun('delayed.count', 'delayed.max'), StatsFun = emqx_stats:statsfun('delayed.count', 'delayed.max'),
{ok, StatsTimer} = timer:send_interval(timer:seconds(1), stats), {ok, StatsTimer} = timer:send_interval(timer:seconds(1), stats),
State#{stats_fun => StatsFun, stats_timer => StatsTimer}. State#{stats_fun => StatsFun, stats_timer => StatsTimer}.
%% Ensure publish timer %% Ensure publish timer
-spec ensure_publish_timer(state()) -> state().
ensure_publish_timer(State) -> ensure_publish_timer(State) ->
ensure_publish_timer(mnesia:dirty_first(?TAB), State). ensure_publish_timer(mnesia:dirty_first(?TAB), State).
@ -222,4 +234,3 @@ do_publish(Key = {Ts, _Id}, Now, Acc) when Ts =< Now ->
-spec(delayed_count() -> non_neg_integer()). -spec(delayed_count() -> non_neg_integer()).
delayed_count() -> mnesia:table_info(?TAB, size). delayed_count() -> mnesia:table_info(?TAB, size).

View File

@ -20,6 +20,7 @@
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl").
%% emqx_gen_mod callbacks %% emqx_gen_mod callbacks
-export([ load/1 -export([ load/1
@ -38,14 +39,33 @@ load(Topics) ->
emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Topics]}). emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Topics]}).
on_client_connected(#{clientid := ClientId, username := Username}, _ConnInfo = #{proto_ver := ProtoVer}, Topics) -> on_client_connected(#{clientid := ClientId, username := Username}, _ConnInfo = #{proto_ver := ProtoVer}, Topics) ->
Replace = fun(Topic) ->
rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic)) OptFun = case ProtoVer of
?MQTT_PROTO_V5 -> fun(X) -> X end;
_ -> fun(#{qos := Qos}) -> #{qos => Qos} end
end, end,
TopicFilters = case ProtoVer of
?MQTT_PROTO_V5 -> [{Replace(Topic), SubOpts} || {Topic, SubOpts} <- Topics]; Fold = fun({Topic, SubOpts}, Acc) ->
_ -> [{Replace(Topic), #{qos => Qos}} || {Topic, #{qos := Qos}} <- Topics] case rep(Topic, ClientId, Username) of
end, {error, Reason} ->
self() ! {subscribe, TopicFilters}. ?LOG(warning, "auto subscribe ignored, topic filter:~ts reason:~p~n",
[Topic, Reason]),
Acc;
<<>> ->
?LOG(warning, "auto subscribe ignored, topic filter:~ts"
" reason: topic can't be empty~n",
[Topic]),
Acc;
NewTopic ->
[{NewTopic, OptFun(SubOpts)} | Acc]
end
end,
case lists:foldl(Fold, [], Topics) of
[] -> ok;
TopicFilters ->
self() ! {subscribe, TopicFilters}
end.
unload(_) -> unload(_) ->
emqx_hooks:del('client.connected', {?MODULE, on_client_connected}). emqx_hooks:del('client.connected', {?MODULE, on_client_connected}).
@ -56,10 +76,24 @@ description() ->
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
rep(<<"%c">>, ClientId, Topic) -> rep(Topic, ClientId, Username) ->
emqx_topic:feed_var(<<"%c">>, ClientId, Topic); Words = emqx_topic:words(Topic),
rep(<<"%u">>, undefined, Topic) -> rep(Words, ClientId, Username, []).
Topic;
rep(<<"%u">>, Username, Topic) ->
emqx_topic:feed_var(<<"%u">>, Username, Topic).
rep([<<"%c">> | T], ClientId, Username, Acc) ->
rep(T,
ClientId,
Username,
[ClientId | Acc]);
rep([<<"%u">> | _], _, undefined, _) ->
{error, username_undefined};
rep([<<"%u">> | T], ClientId, Username, Acc) ->
rep(T,
ClientId,
Username,
[Username | Acc]);
rep([H | T], ClientId, UserName, Acc) ->
rep(T, ClientId, UserName, [H | Acc]);
rep([], _, _, Acc) ->
emqx_topic:join(lists:reverse(Acc)).

View File

@ -1,6 +1,6 @@
{application, emqx_modules, {application, emqx_modules,
[{description, "EMQ X Module Management"}, [{description, "EMQ X Module Management"},
{vsn, "4.3.4"}, {vsn, "4.3.5"},
{modules, []}, {modules, []},
{applications, [kernel,stdlib]}, {applications, [kernel,stdlib]},
{mod, {emqx_modules_app, []}}, {mod, {emqx_modules_app, []}},

View File

@ -1,33 +1,39 @@
%% -*-: erlang -*- %% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN, {VSN,
[ [{"4.3.4",
{<<"4\\.3\\.[2-3]">>, [ [{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]},
{load_module, emqx_mod_presence, brutal_purge, soft_purge, []} {load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}]},
]}, {<<"4\\.3\\.[2-3]">>,
{"4.3.1", [ [{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]},
{load_module, emqx_mod_presence, brutal_purge, soft_purge, []}, {load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]},
{load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []} {load_module,emqx_mod_presence,brutal_purge,soft_purge,[]}]},
]}, {"4.3.1",
{"4.3.0", [ [{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]},
{update, emqx_mod_delayed, {advanced, []}}, {load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]},
{load_module, emqx_mod_presence, brutal_purge, soft_purge, []}, {load_module,emqx_mod_presence,brutal_purge,soft_purge,[]},
{load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []} {load_module,emqx_mod_api_topic_metrics,brutal_purge,soft_purge,[]}]},
]}, {"4.3.0",
{<<".*">>, []} [{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]},
], {update,emqx_mod_delayed,{advanced,[]}},
[ {load_module,emqx_mod_presence,brutal_purge,soft_purge,[]},
{<<"4\\.3\\.[2-3]">>, [ {load_module,emqx_mod_api_topic_metrics,brutal_purge,soft_purge,[]}]},
{load_module, emqx_mod_presence, brutal_purge, soft_purge, []} {<<".*">>,[]}],
]}, [{"4.3.4",
{"4.3.1", [ [{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]},
{load_module, emqx_mod_presence, brutal_purge, soft_purge, []}, {load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]}]},
{load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []} {<<"4\\.3\\.[2-3]">>,
]}, [{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]},
{"4.3.0", [ {load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]},
{update, emqx_mod_delayed, {advanced, []}}, {load_module,emqx_mod_presence,brutal_purge,soft_purge,[]}]},
{load_module, emqx_mod_presence, brutal_purge, soft_purge, []}, {"4.3.1",
{load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []} [{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]},
]}, {load_module,emqx_mod_delayed,brutal_purge,soft_purge,[]},
{<<".*">>, []} {load_module,emqx_mod_presence,brutal_purge,soft_purge,[]},
] {load_module,emqx_mod_api_topic_metrics,brutal_purge,soft_purge,[]}]},
}. {"4.3.0",
[{load_module,emqx_mod_subscription,brutal_purge,soft_purge,[]},
{update,emqx_mod_delayed,{advanced,[]}},
{load_module,emqx_mod_presence,brutal_purge,soft_purge,[]},
{load_module,emqx_mod_api_topic_metrics,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}]}.

View File

@ -60,7 +60,7 @@ t_suboption(_) ->
Client_info = fun(Key, Client) -> maps:get(Key, maps:from_list(emqtt:info(Client)), undefined) end, Client_info = fun(Key, Client) -> maps:get(Key, maps:from_list(emqtt:info(Client)), undefined) end,
Suboption = #{qos => ?QOS_2, nl => 1, rap => 1, rh => 2}, Suboption = #{qos => ?QOS_2, nl => 1, rap => 1, rh => 2},
?assertEqual(ok, emqx_mod_subscription:load([{<<"connected/%c/%u">>, Suboption}])), ?assertEqual(ok, emqx_mod_subscription:load([{<<"connected/%c/%u">>, Suboption}])),
{ok, C1} = emqtt:start_link([{proto_ver, v5}]), {ok, C1} = emqtt:start_link([{proto_ver, v5}, {username, "admin"}]),
{ok, _} = emqtt:connect(C1), {ok, _} = emqtt:connect(C1),
timer:sleep(200), timer:sleep(200),
[CPid1] = emqx_cm:lookup_channels(Client_info(clientid, C1)), [CPid1] = emqx_cm:lookup_channels(Client_info(clientid, C1)),
@ -69,7 +69,7 @@ t_suboption(_) ->
?assertMatch({Sub1, #{qos := 2, nl := 1, rap := 1, rh := 2, subid := _}}, Suboption1), ?assertMatch({Sub1, #{qos := 2, nl := 1, rap := 1, rh := 2, subid := _}}, Suboption1),
ok = emqtt:disconnect(C1), ok = emqtt:disconnect(C1),
%% The subscription option is not valid for MQTT V3.1.1 %% The subscription option is not valid for MQTT V3.1.1
{ok, C2} = emqtt:start_link([{proto_ver, v4}]), {ok, C2} = emqtt:start_link([{proto_ver, v4}, {username, "admin"}]),
{ok, _} = emqtt:connect(C2), {ok, _} = emqtt:connect(C2),
timer:sleep(200), timer:sleep(200),
[CPid2] = emqx_cm:lookup_channels(Client_info(clientid, C2)), [CPid2] = emqx_cm:lookup_channels(Client_info(clientid, C2)),

View File

@ -38,7 +38,9 @@ PREV_TAG="$(git describe --tag --match "${TAG_PREFIX}*" | grep -oE "${TAG_PREFIX
PREV_VERSION="${PREV_TAG#[e|v]}" PREV_VERSION="${PREV_TAG#[e|v]}"
shift 1 shift 1
ESCRIPT_ARGS=() # bash 3.2 treat empty array as unbound, so we can't use 'ESCRIPT_ARGS=()' here,
# but must add an empty-string element to the array
ESCRIPT_ARGS=( '' )
while [ "$#" -gt 0 ]; do while [ "$#" -gt 0 ]; do
case $1 in case $1 in
-h|--help) -h|--help)
@ -103,12 +105,15 @@ if [ ! -d "${PREV_TAG}" ]; then
fi fi
popd popd
# bash 3.2 does not allow empty array, so we had to add an empty string in the ESCRIPT_ARGS array,
# this in turn makes quoting "${ESCRIPT_ARGS[@]}" problematic, hence disable SC2068 check here
# shellcheck disable=SC2068
./scripts/update_appup.escript \ ./scripts/update_appup.escript \
--src-dirs "${SRC_DIRS}/**" \ --src-dirs "${SRC_DIRS}/**" \
--release-dir "_build/${PROFILE}/lib" \ --release-dir "_build/${PROFILE}/lib" \
--prev-release-dir "${PREV_DIR_BASE}/${PREV_TAG}/emqx/lib" \ --prev-release-dir "${PREV_DIR_BASE}/${PREV_TAG}/emqx/lib" \
--skip-build \ --skip-build \
"${ESCRIPT_ARGS[@]}" "$PREV_VERSION" ${ESCRIPT_ARGS[@]} "$PREV_VERSION"
if [ "${IS_CHECK:-}" = 'yes' ]; then if [ "${IS_CHECK:-}" = 'yes' ]; then
diffs="$(git diff --name-only | grep -E '\.appup\.src' || true)" diffs="$(git diff --name-only | grep -E '\.appup\.src' || true)"

View File

@ -324,6 +324,8 @@ process_old_action({add_module, Module}) ->
[Module]; [Module];
process_old_action({delete_module, Module}) -> process_old_action({delete_module, Module}) ->
[Module]; [Module];
process_old_action({update, Module, _Change}) ->
[Module];
process_old_action(LoadModule) when is_tuple(LoadModule) andalso process_old_action(LoadModule) when is_tuple(LoadModule) andalso
element(1, LoadModule) =:= load_module -> element(1, LoadModule) =:= load_module ->
element(2, LoadModule); element(2, LoadModule);

View File

@ -13,10 +13,12 @@
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}]},
{"4.3.12", {"4.3.12",
[{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, [{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]},
@ -40,6 +42,7 @@
[{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, [{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_sys,brutal_purge,soft_purge,[]}, {load_module,emqx_sys,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
@ -63,6 +66,7 @@
{"4.3.10", {"4.3.10",
[{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, [{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_sys,brutal_purge,soft_purge,[]}, {load_module,emqx_sys,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]},
@ -87,6 +91,7 @@
{"4.3.9", {"4.3.9",
[{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, [{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_sys,brutal_purge,soft_purge,[]}, {load_module,emqx_sys,brutal_purge,soft_purge,[]},
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
@ -115,6 +120,7 @@
{"4.3.8", {"4.3.8",
[{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, [{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_sys,brutal_purge,soft_purge,[]}, {load_module,emqx_sys,brutal_purge,soft_purge,[]},
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
@ -410,6 +416,7 @@
{load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_os_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_os_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]},
@ -422,6 +429,7 @@
[{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, [{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_sys,brutal_purge,soft_purge,[]}, {load_module,emqx_sys,brutal_purge,soft_purge,[]},
@ -442,6 +450,7 @@
{"4.3.11", {"4.3.11",
[{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, [{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_sys,brutal_purge,soft_purge,[]}, {load_module,emqx_sys,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]},
@ -465,6 +474,7 @@
{"4.3.10", {"4.3.10",
[{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, [{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_sys,brutal_purge,soft_purge,[]}, {load_module,emqx_sys,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]},
@ -488,6 +498,7 @@
{"4.3.9", {"4.3.9",
[{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, [{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_sys,brutal_purge,soft_purge,[]}, {load_module,emqx_sys,brutal_purge,soft_purge,[]},
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
@ -515,6 +526,7 @@
{"4.3.8", {"4.3.8",
[{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, [{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_sys,brutal_purge,soft_purge,[]}, {load_module,emqx_sys,brutal_purge,soft_purge,[]},
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},

View File

@ -924,9 +924,23 @@ return_sub_unsub_ack(Packet, Channel) ->
-> {reply, Reply :: term(), channel()} -> {reply, Reply :: term(), channel()}
| {shutdown, Reason :: term(), Reply :: term(), channel()} | {shutdown, Reason :: term(), Reply :: term(), channel()}
| {shutdown, Reason :: term(), Reply :: term(), emqx_types:packet(), channel()}). | {shutdown, Reason :: term(), Reply :: term(), emqx_types:packet(), channel()}).
handle_call(kick, Channel) -> handle_call(kick, Channel = #channel{
Channel1 = ensure_disconnected(kicked, Channel), conn_state = ConnState,
disconnect_and_shutdown(kicked, ok, Channel1); will_msg = WillMsg,
conninfo = #{proto_ver := ProtoVer}
}) ->
(WillMsg =/= undefined) andalso publish_will_msg(WillMsg),
Channel1 = case ConnState of
connected -> ensure_disconnected(kicked, Channel);
_ -> Channel
end,
case ProtoVer == ?MQTT_PROTO_V5 andalso ConnState == connected of
true ->
shutdown(kicked, ok,
?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), Channel1);
_ ->
shutdown(kicked, ok, Channel1)
end;
handle_call(discard, Channel) -> handle_call(discard, Channel) ->
disconnect_and_shutdown(discarded, ok, Channel); disconnect_and_shutdown(discarded, ok, Channel);
@ -984,7 +998,7 @@ handle_info({sock_closed, Reason}, Channel =
clientinfo = ClientInfo = #{zone := Zone}}) -> clientinfo = ClientInfo = #{zone := Zone}}) ->
emqx_zone:enable_flapping_detect(Zone) emqx_zone:enable_flapping_detect(Zone)
andalso emqx_flapping:detect(ClientInfo), andalso emqx_flapping:detect(ClientInfo),
Channel1 = ensure_disconnected(Reason, mabye_publish_will_msg(Channel)), Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)),
case maybe_shutdown(Reason, Channel1) of case maybe_shutdown(Reason, Channel1) of
{ok, Channel2} -> {ok, {event, disconnected}, Channel2}; {ok, Channel2} -> {ok, {event, disconnected}, Channel2};
Shutdown -> Shutdown Shutdown -> Shutdown
@ -1645,9 +1659,9 @@ ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo,
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Maybe Publish will msg %% Maybe Publish will msg
mabye_publish_will_msg(Channel = #channel{will_msg = undefined}) -> maybe_publish_will_msg(Channel = #channel{will_msg = undefined}) ->
Channel; Channel;
mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) -> maybe_publish_will_msg(Channel = #channel{will_msg = WillMsg}) ->
case will_delay_interval(WillMsg) of case will_delay_interval(WillMsg) of
0 -> 0 ->
ok = publish_will_msg(WillMsg), ok = publish_will_msg(WillMsg),
@ -1657,10 +1671,10 @@ mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) ->
end. end.
will_delay_interval(WillMsg) -> will_delay_interval(WillMsg) ->
maps:get('Will-Delay-Interval', emqx_message:get_header(properties, WillMsg), 0). maps:get('Will-Delay-Interval',
emqx_message:get_header(properties, WillMsg, #{}), 0).
publish_will_msg(Msg) -> publish_will_msg(Msg) ->
%% TODO check if we should discard result here
_ = emqx_broker:publish(Msg), _ = emqx_broker:publish(Msg),
ok. ok.

View File

@ -52,6 +52,26 @@
, hexstr2bin/1 , hexstr2bin/1
]). ]).
-export([ is_sane_id/1
]).
-define(VALID_STR_RE, "^[A-Za-z0-9]+[A-Za-z0-9-_]*$").
-spec is_sane_id(list() | binary()) -> ok | {error, Reason::binary()}.
is_sane_id(Str) ->
StrLen = len(Str),
case StrLen > 0 andalso StrLen =< 256 of
true ->
case re:run(Str, ?VALID_STR_RE) of
nomatch -> {error, <<"required: " ?VALID_STR_RE>>};
_ -> ok
end;
false -> {error, <<"0 < Length =< 256">>}
end.
len(Bin) when is_binary(Bin) -> byte_size(Bin);
len(Str) when is_list(Str) -> length(Str).
-define(OOM_FACTOR, 1.25). -define(OOM_FACTOR, 1.25).
%% @doc Parse v4 or v6 string format address to tuple. %% @doc Parse v4 or v6 string format address to tuple.
@ -309,4 +329,30 @@ hexchar2int(I) when I >= $a andalso I =< $f -> I - $a + 10.
ipv6_probe_test() -> ipv6_probe_test() ->
?assertEqual([{ipv6_probe, true}], ipv6_probe([])). ?assertEqual([{ipv6_probe, true}], ipv6_probe([])).
is_sane_id_test() ->
?assertMatch({error, _}, is_sane_id("")),
?assertMatch({error, _}, is_sane_id("_")),
?assertMatch({error, _}, is_sane_id("_aaa")),
?assertMatch({error, _}, is_sane_id("lkad/oddl")),
?assertMatch({error, _}, is_sane_id("lkad*oddl")),
?assertMatch({error, _}, is_sane_id("script>lkadoddl")),
?assertMatch({error, _}, is_sane_id("<script>lkadoddl")),
?assertMatch(ok, is_sane_id(<<"Abckdf_lkdfd_1222">>)),
?assertMatch(ok, is_sane_id("Abckdf_lkdfd_1222")),
?assertMatch(ok, is_sane_id("abckdf_lkdfd_1222")),
?assertMatch(ok, is_sane_id("abckdflkdfd1222")),
?assertMatch(ok, is_sane_id("abckdflkdf")),
?assertMatch(ok, is_sane_id("a1122222")),
?assertMatch(ok, is_sane_id("1223333434")),
?assertMatch(ok, is_sane_id("1lkdfaldk")),
Ok = lists:flatten(lists:duplicate(256, "a")),
Bad = Ok ++ "a",
?assertMatch(ok, is_sane_id(Ok)),
?assertMatch(ok, is_sane_id(list_to_binary(Ok))),
?assertMatch({error, _}, is_sane_id(Bad)),
?assertMatch({error, _}, is_sane_id(list_to_binary(Bad))),
ok.
-endif. -endif.

View File

@ -577,7 +577,32 @@ t_handle_out_unexpected(_) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
t_handle_call_kick(_) -> t_handle_call_kick(_) ->
{shutdown, kicked, ok, _Chan} = emqx_channel:handle_call(kick, channel()). Channelv5 = channel(),
Channelv4 = v4(Channelv5),
{shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, Channelv4),
{shutdown, kicked, ok,
?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION),
_} = emqx_channel:handle_call(kick, Channelv5),
DisconnectedChannelv5 = channel(#{conn_state => disconnected}),
DisconnectedChannelv4 = v4(DisconnectedChannelv5),
{shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, DisconnectedChannelv5),
{shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, DisconnectedChannelv4).
t_handle_kicked_publish_will_msg(_) ->
Self = self(),
ok = meck:expect(emqx_broker, publish, fun(M) -> Self ! {pub, M} end),
Msg = emqx_message:make(test, <<"will_topic">>, <<"will_payload">>),
{shutdown, kicked, ok,
?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION),
_} = emqx_channel:handle_call(kick, channel(#{will_msg => Msg})),
receive
{pub, Msg} -> ok
after 200 -> exit(will_message_not_published)
end.
t_handle_call_discard(_) -> t_handle_call_discard(_) ->
Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER), Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER),
@ -858,3 +883,11 @@ session(InitFields) when is_map(InitFields) ->
quota() -> quota() ->
emqx_limiter:init(zone, [{conn_messages_routing, {5, 1}}, emqx_limiter:init(zone, [{conn_messages_routing, {5, 1}},
{overall_messages_routing, {10, 1}}]). {overall_messages_routing, {10, 1}}]).
v4(Channel) ->
ConnInfo = emqx_channel:info(conninfo, Channel),
emqx_channel:set_field(
conninfo,
maps:put(proto_ver, ?MQTT_PROTO_V4, ConnInfo),
Channel
).