Merge pull request #7149 from zmstone/merge-4.3-to-4.4
Merge 4.3 to 4.4
This commit is contained in:
commit
e7f39d4cfe
|
@ -20,6 +20,7 @@
|
||||||
?>
|
?>
|
||||||
|
|
||||||
[shell emqx]
|
[shell emqx]
|
||||||
|
!OLD_VSN=$(echo $OLD_VSN | sed -r 's/[v|e]//g')
|
||||||
!cd $PACKAGE_PATH
|
!cd $PACKAGE_PATH
|
||||||
!unzip -q -o $PROFILE-$(echo $OLD_VSN | sed -r 's/[v|e]//g')-otp${FROM_OTP_VSN}-ubuntu20.04-amd64.zip
|
!unzip -q -o $PROFILE-$(echo $OLD_VSN | sed -r 's/[v|e]//g')-otp${FROM_OTP_VSN}-ubuntu20.04-amd64.zip
|
||||||
?SH-PROMPT
|
?SH-PROMPT
|
||||||
|
@ -32,6 +33,7 @@
|
||||||
?SH-PROMPT
|
?SH-PROMPT
|
||||||
|
|
||||||
[shell emqx2]
|
[shell emqx2]
|
||||||
|
!OLD_VSN=$(echo $OLD_VSN | sed -r 's/[v|e]//g')
|
||||||
!cd $PACKAGE_PATH
|
!cd $PACKAGE_PATH
|
||||||
!cp -f $ONE_MORE_EMQX_PATH/one_more_$(echo $PROFILE | sed 's/-/_/g').sh .
|
!cp -f $ONE_MORE_EMQX_PATH/one_more_$(echo $PROFILE | sed 's/-/_/g').sh .
|
||||||
!./one_more_$(echo $PROFILE | sed 's/-/_/g').sh emqx2
|
!./one_more_$(echo $PROFILE | sed 's/-/_/g').sh emqx2
|
||||||
|
@ -84,6 +86,27 @@
|
||||||
|
|
||||||
!cp -f ../$PROFILE-$VSN-otp${TO_OTP_VSN}-ubuntu20.04-amd64.zip releases/
|
!cp -f ../$PROFILE-$VSN-otp${TO_OTP_VSN}-ubuntu20.04-amd64.zip releases/
|
||||||
|
|
||||||
|
## upgrade to the new version
|
||||||
|
!./bin/emqx install $VSN
|
||||||
|
?Made release permanent: "$VSN"
|
||||||
|
?SH-PROMPT
|
||||||
|
|
||||||
|
!./bin/emqx versions |grep permanent
|
||||||
|
?(.*)$VSN
|
||||||
|
?SH-PROMPT
|
||||||
|
|
||||||
|
## downgrade to the old version
|
||||||
|
!./bin/emqx install $${OLD_VSN}
|
||||||
|
?Made release permanent:.*
|
||||||
|
?SH-PROMPT
|
||||||
|
|
||||||
|
!./bin/emqx versions |grep permanent | grep -qs "$${OLD_VSN}"
|
||||||
|
?SH-PROMPT:
|
||||||
|
!echo ==$$?==
|
||||||
|
?^==0==
|
||||||
|
?SH-PROMPT:
|
||||||
|
|
||||||
|
## again, upgrade to the new version
|
||||||
!./bin/emqx install $VSN
|
!./bin/emqx install $VSN
|
||||||
?Made release permanent: "$VSN"
|
?Made release permanent: "$VSN"
|
||||||
?SH-PROMPT
|
?SH-PROMPT
|
||||||
|
@ -109,6 +132,27 @@
|
||||||
|
|
||||||
!cp -f ../$PROFILE-$VSN-otp${TO_OTP_VSN}-ubuntu20.04-amd64.zip releases/
|
!cp -f ../$PROFILE-$VSN-otp${TO_OTP_VSN}-ubuntu20.04-amd64.zip releases/
|
||||||
|
|
||||||
|
## upgrade to the new version
|
||||||
|
!./bin/emqx install $VSN
|
||||||
|
?Made release permanent: "$VSN"
|
||||||
|
?SH-PROMPT
|
||||||
|
|
||||||
|
!./bin/emqx versions |grep permanent
|
||||||
|
?(.*)$VSN
|
||||||
|
?SH-PROMPT
|
||||||
|
|
||||||
|
## downgrade to the old version
|
||||||
|
!./bin/emqx install $${OLD_VSN}
|
||||||
|
?Made release permanent:.*
|
||||||
|
?SH-PROMPT
|
||||||
|
|
||||||
|
!./bin/emqx versions |grep permanent | grep -qs "$${OLD_VSN}"
|
||||||
|
?SH-PROMPT:
|
||||||
|
!echo ==$$?==
|
||||||
|
?^==0==
|
||||||
|
?SH-PROMPT:
|
||||||
|
|
||||||
|
## again, upgrade to the new version
|
||||||
!./bin/emqx install $VSN
|
!./bin/emqx install $VSN
|
||||||
?Made release permanent: "$VSN"
|
?Made release permanent: "$VSN"
|
||||||
?SH-PROMPT
|
?SH-PROMPT
|
||||||
|
@ -138,17 +182,20 @@
|
||||||
!./bin/emqx_ctl broker metrics | grep "messages.publish"
|
!./bin/emqx_ctl broker metrics | grep "messages.publish"
|
||||||
???SH-PROMPT
|
???SH-PROMPT
|
||||||
|
|
||||||
|
## We don't guarantee not to lose a single message!
|
||||||
|
## So even if we received 290~300 messages, we consider it as success
|
||||||
[shell bench]
|
[shell bench]
|
||||||
!curl --user admin:public --silent --show-error http://localhost:8081/api/v4/rules | jq -M --raw-output ".data[0].metrics[] | select(.node==\"emqx@127.0.0.1\").matched"
|
!curl --user admin:public --silent --show-error http://localhost:8081/api/v4/rules | jq -M --raw-output ".data[0].metrics[] | select(.node==\"emqx@127.0.0.1\").matched"
|
||||||
?300
|
?(29[0-9])|(300)
|
||||||
?SH-PROMPT
|
?SH-PROMPT
|
||||||
|
|
||||||
!curl --user admin:public --silent --show-error http://localhost:8081/api/v4/rules | jq -M --raw-output ".data[0].actions[0].metrics[] | select(.node==\"emqx@127.0.0.1\").success"
|
!curl --user admin:public --silent --show-error http://localhost:8081/api/v4/rules | jq -M --raw-output ".data[0].actions[0].metrics[] | select(.node==\"emqx@127.0.0.1\").success"
|
||||||
?300
|
?(29[0-9])|(300)
|
||||||
?SH-PROMPT
|
?SH-PROMPT
|
||||||
|
|
||||||
|
## The /counter API is provided by .ci/fvt_test/http_server
|
||||||
!curl http://127.0.0.1:8080/counter
|
!curl http://127.0.0.1:8080/counter
|
||||||
???{"data":300,"code":0}
|
?\{"data":(29[0-9])|(300),"code":0\}
|
||||||
?SH-PROMPT
|
?SH-PROMPT
|
||||||
|
|
||||||
[shell emqx2]
|
[shell emqx2]
|
||||||
|
|
|
@ -12,15 +12,30 @@ File format:
|
||||||
|
|
||||||
## v4.3.13
|
## v4.3.13
|
||||||
|
|
||||||
|
### Important changes
|
||||||
|
|
||||||
|
* For docker image, /opt/emqx/etc has been removed from the VOLUME list,
|
||||||
|
this made it easier for the users to rebuild image on top with changed configs.
|
||||||
|
|
||||||
### Enhancements
|
### Enhancements
|
||||||
|
|
||||||
* CLI `emqx_ctl pem_cache clean` to force purge x509 certificate cache,
|
* CLI `emqx_ctl pem_cache clean` to force purge x509 certificate cache,
|
||||||
to force an immediate reload of all certificates after the files are updated on disk.
|
to force an immediate reload of all certificates after the files are updated on disk.
|
||||||
|
* Refactor the ExProto so that anonymous clients can also be displayed on the dashboard [#6983]
|
||||||
|
* Force shutdown of processe that cannot answer takeover event [#7026]
|
||||||
|
|
||||||
|
* `topic` parameter in bridge configuration can have `${node}` substitution (just like in `clientid` parameter)
|
||||||
|
|
||||||
### Bug fixes
|
### Bug fixes
|
||||||
|
|
||||||
|
* Fix the `{error,eexist}` error when do release upgrade again if last run failed. [#7121]
|
||||||
* Fix case where publishing to a non-existent topic alias would crash the connection [#6979]
|
* Fix case where publishing to a non-existent topic alias would crash the connection [#6979]
|
||||||
* Fix HTTP-API 500 error on querying the lwm2m client list on the another node [#7009]
|
* Fix HTTP-API 500 error on querying the lwm2m client list on the another node [#7009]
|
||||||
|
* Fix the ExProto connection registry is not released after the client process abnormally exits [#6983]
|
||||||
|
* Fix Server-KeepAlive wrongly applied on MQTT v3.0/v3.1 [#7085]
|
||||||
|
* Fix Stomp client can not trigger `$event/client_connection` message [#7096]
|
||||||
|
* Fix system memory false alarm at boot
|
||||||
|
* Fix the MQTT-SN message replay when the topic is not registered to the client [#6970]
|
||||||
|
|
||||||
## v4.3.12
|
## v4.3.12
|
||||||
### Important changes
|
### Important changes
|
||||||
|
|
1
Makefile
1
Makefile
|
@ -7,7 +7,6 @@ export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/4.4-5:24.1.5-3-alpine3.1
|
||||||
export EMQX_DEFAULT_RUNNER = alpine:3.14
|
export EMQX_DEFAULT_RUNNER = alpine:3.14
|
||||||
export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh)
|
export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh)
|
||||||
export PKG_VSN ?= $(shell $(CURDIR)/pkg-vsn.sh)
|
export PKG_VSN ?= $(shell $(CURDIR)/pkg-vsn.sh)
|
||||||
export EMQX_DESC ?= EMQ X
|
|
||||||
export EMQX_CE_DASHBOARD_VERSION ?= v4.4.0
|
export EMQX_CE_DASHBOARD_VERSION ?= v4.4.0
|
||||||
export DOCKERFILE := deploy/docker/Dockerfile
|
export DOCKERFILE := deploy/docker/Dockerfile
|
||||||
export DOCKERFILE_TESTING := deploy/docker/Dockerfile.testing
|
export DOCKERFILE_TESTING := deploy/docker/Dockerfile.testing
|
||||||
|
|
|
@ -37,6 +37,11 @@
|
||||||
, handle_disconnected/2
|
, handle_disconnected/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
%% for testing
|
||||||
|
-ifdef(TEST).
|
||||||
|
-export([ replvar/1 ]).
|
||||||
|
-endif.
|
||||||
|
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||||
|
|
||||||
|
@ -176,13 +181,13 @@ subscribe_remote_topics(ClientPid, Subscriptions) ->
|
||||||
end
|
end
|
||||||
end, Subscriptions).
|
end, Subscriptions).
|
||||||
|
|
||||||
|
replvar(Options) ->
|
||||||
|
replvar([topic, clientid, max_inflight], Options).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal funcs
|
%% Internal funcs
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
replvar(Options) ->
|
|
||||||
replvar([clientid, max_inflight], Options).
|
|
||||||
|
|
||||||
replvar([], Options) ->
|
replvar([], Options) ->
|
||||||
Options;
|
Options;
|
||||||
replvar([Key|More], Options) ->
|
replvar([Key|More], Options) ->
|
||||||
|
@ -194,8 +199,8 @@ replvar([Key|More], Options) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% ${node} => node()
|
%% ${node} => node()
|
||||||
feedvar(clientid, ClientId, _) ->
|
feedvar(Key, Value, _) when Key =:= topic; Key =:= clientid ->
|
||||||
iolist_to_binary(re:replace(ClientId, "\\${node}", atom_to_list(node())));
|
iolist_to_binary(re:replace(Value, "\\${node}", atom_to_list(node())));
|
||||||
|
|
||||||
feedvar(max_inflight, 0, _) ->
|
feedvar(max_inflight, 0, _) ->
|
||||||
infinity;
|
infinity;
|
||||||
|
|
|
@ -45,3 +45,14 @@ send_and_ack_test() ->
|
||||||
after
|
after
|
||||||
meck:unload(emqtt)
|
meck:unload(emqtt)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
replvar_test() ->
|
||||||
|
Node = atom_to_list(node()),
|
||||||
|
Config = #{clientid => <<"Hey ${node}">>, topic => <<"topic ${node}">>, other => <<"other">>},
|
||||||
|
|
||||||
|
ReplacedConfig = emqx_bridge_mqtt:replvar(Config),
|
||||||
|
|
||||||
|
ExpectedConfig = #{clientid => iolist_to_binary("Hey " ++ Node),
|
||||||
|
topic => iolist_to_binary("topic " ++ Node),
|
||||||
|
other => <<"other">>},
|
||||||
|
?assertEqual(ExpectedConfig, ReplacedConfig).
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_exproto,
|
{application, emqx_exproto,
|
||||||
[{description, "EMQ X Extension for Protocol"},
|
[{description, "EMQ X Extension for Protocol"},
|
||||||
{vsn, "4.3.5"}, %% 4.3.3 is used by ee
|
{vsn, "4.3.6"}, %% 4.3.3 is used by ee
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{mod, {emqx_exproto_app, []}},
|
{mod, {emqx_exproto_app, []}},
|
||||||
|
|
|
@ -1,28 +1,24 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{VSN,
|
{VSN,
|
||||||
[{"4.3.4",
|
[{<<"4\\.3\\.[4-5]">>,
|
||||||
[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
|
||||||
{"4.3.3",
|
|
||||||
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.2",
|
{<<"4\\.3\\.[2-3]">>,
|
||||||
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
||||||
{<<"4.3.[0-1]">>,
|
{<<"4\\.3\\.[0-1]">>,
|
||||||
[{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]},
|
{load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
||||||
{<<".*">>,[]}],
|
{<<".*">>,[]}],
|
||||||
[{"4.3.4",
|
[{<<"4\\.3\\.[4-5]">>,
|
||||||
[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
|
||||||
{"4.3.3",
|
|
||||||
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.2",
|
{<<"4\\.3\\.[2-3]">>,
|
||||||
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
||||||
{<<"4.3.[0-1]">>,
|
{<<"4\\.3\\.[0-1]">>,
|
||||||
[{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]},
|
{load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
||||||
|
|
|
@ -94,9 +94,6 @@
|
||||||
awaiting_rel_max
|
awaiting_rel_max
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(CHANMOCK(P), {exproto_anonymous_client, P}).
|
|
||||||
-define(CHAN_CONN_TAB, emqx_channel_conn).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Info, Attrs and Caps
|
%% Info, Attrs and Caps
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -155,15 +152,14 @@ init(ConnInfo = #{socktype := Socktype,
|
||||||
Channel = #channel{gcli = #{channel => GRpcChann},
|
Channel = #channel{gcli = #{channel => GRpcChann},
|
||||||
conninfo = NConnInfo,
|
conninfo = NConnInfo,
|
||||||
clientinfo = ClientInfo,
|
clientinfo = ClientInfo,
|
||||||
conn_state = connecting,
|
conn_state = accepted,
|
||||||
timers = #{}
|
timers = #{}
|
||||||
},
|
},
|
||||||
case emqx_hooks:run_fold('client.connect', [NConnInfo], #{}) of
|
case emqx_hooks:run_fold('client.connect', [NConnInfo], #{}) of
|
||||||
{error, _Reason} ->
|
{error, _Reason} ->
|
||||||
throw(nopermission);
|
throw(nopermission);
|
||||||
_ ->
|
_ ->
|
||||||
ConnMod = maps:get(conn_mod, NConnInfo),
|
ok = register_the_anonymous_client(ClientInfo, NConnInfo),
|
||||||
true = ets:insert(?CHAN_CONN_TAB, {?CHANMOCK(self()), ConnMod}),
|
|
||||||
Req = #{conninfo =>
|
Req = #{conninfo =>
|
||||||
peercert(Peercert,
|
peercert(Peercert,
|
||||||
#{socktype => socktype(Socktype),
|
#{socktype => socktype(Socktype),
|
||||||
|
@ -172,6 +168,22 @@ init(ConnInfo = #{socktype := Socktype,
|
||||||
try_dispatch(on_socket_created, wrap(Req), Channel)
|
try_dispatch(on_socket_created, wrap(Req), Channel)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
register_the_anonymous_client(ClientInfo, ConnInfo) ->
|
||||||
|
ClientId = maps:get(clientid, ClientInfo),
|
||||||
|
case emqx_cm:open_session(true, ClientInfo, ConnInfo) of
|
||||||
|
{ok, _} ->
|
||||||
|
?LOG(debug, "Registered an anonymous connection, "
|
||||||
|
"temporary clientid: ~s", [ClientId]),
|
||||||
|
emqx_logger:set_metadata_clientid(ClientId),
|
||||||
|
_ = self() ! {event, accepted},
|
||||||
|
ok;
|
||||||
|
{error, Reason} ->
|
||||||
|
throw({register_anonymous_error, Reason})
|
||||||
|
end.
|
||||||
|
|
||||||
|
unregister_the_anonymous_client(ClientId) ->
|
||||||
|
emqx_cm:unregister_channel(ClientId).
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
peercert(NoSsl, ConnInfo) when NoSsl == nossl;
|
peercert(NoSsl, ConnInfo) when NoSsl == nossl;
|
||||||
NoSsl == undefined ->
|
NoSsl == undefined ->
|
||||||
|
@ -274,15 +286,14 @@ handle_call(close, Channel) ->
|
||||||
handle_call({auth, ClientInfo, _Password}, Channel = #channel{conn_state = connected}) ->
|
handle_call({auth, ClientInfo, _Password}, Channel = #channel{conn_state = connected}) ->
|
||||||
?LOG(warning, "Duplicated authorized command, dropped ~p", [ClientInfo]),
|
?LOG(warning, "Duplicated authorized command, dropped ~p", [ClientInfo]),
|
||||||
{reply, {error, ?RESP_PERMISSION_DENY, <<"Duplicated authenticate command">>}, Channel};
|
{reply, {error, ?RESP_PERMISSION_DENY, <<"Duplicated authenticate command">>}, Channel};
|
||||||
handle_call({auth, ClientInfo0, Password},
|
handle_call({auth, RequestedClientInfo, Password},
|
||||||
Channel = #channel{conninfo = ConnInfo,
|
Channel = #channel{conninfo = ConnInfo,
|
||||||
clientinfo = ClientInfo}) ->
|
clientinfo = ClientInfo0}) ->
|
||||||
ClientInfo1 = enrich_clientinfo(ClientInfo0, ClientInfo),
|
ClientInfo1 = enrich_clientinfo(RequestedClientInfo, ClientInfo0),
|
||||||
NConnInfo = enrich_conninfo(ClientInfo0, ConnInfo),
|
NConnInfo = enrich_conninfo(RequestedClientInfo, ConnInfo),
|
||||||
|
|
||||||
Channel1 = Channel#channel{conninfo = NConnInfo,
|
Channel1 = Channel#channel{conninfo = NConnInfo,
|
||||||
clientinfo = ClientInfo1},
|
clientinfo = ClientInfo1},
|
||||||
|
|
||||||
#{clientid := ClientId, username := Username} = ClientInfo1,
|
#{clientid := ClientId, username := Username} = ClientInfo1,
|
||||||
|
|
||||||
case emqx_access_control:authenticate(ClientInfo1#{password => Password}) of
|
case emqx_access_control:authenticate(ClientInfo1#{password => Password}) of
|
||||||
|
@ -292,9 +303,10 @@ handle_call({auth, ClientInfo0, Password},
|
||||||
emqx_metrics:inc('client.auth.anonymous'),
|
emqx_metrics:inc('client.auth.anonymous'),
|
||||||
NClientInfo = maps:merge(ClientInfo1, AuthResult),
|
NClientInfo = maps:merge(ClientInfo1, AuthResult),
|
||||||
NChannel = Channel1#channel{clientinfo = NClientInfo},
|
NChannel = Channel1#channel{clientinfo = NClientInfo},
|
||||||
clean_anonymous_clients(),
|
|
||||||
case emqx_cm:open_session(true, NClientInfo, NConnInfo) of
|
case emqx_cm:open_session(true, NClientInfo, NConnInfo) of
|
||||||
{ok, _Session} ->
|
{ok, _Session} ->
|
||||||
|
AnonymousClientId = maps:get(clientid, ClientInfo0),
|
||||||
|
unregister_the_anonymous_client(AnonymousClientId),
|
||||||
?LOG(debug, "Client ~s (Username: '~s') authorized successfully!",
|
?LOG(debug, "Client ~s (Username: '~s') authorized successfully!",
|
||||||
[ClientId, Username]),
|
[ClientId, Username]),
|
||||||
{reply, ok, [{event, connected}], ensure_connected(NChannel)};
|
{reply, ok, [{event, connected}], ensure_connected(NChannel)};
|
||||||
|
@ -354,6 +366,9 @@ handle_call({publish, Topic, Qos, Payload},
|
||||||
handle_call(kick, Channel) ->
|
handle_call(kick, Channel) ->
|
||||||
{shutdown, kicked, ok, Channel};
|
{shutdown, kicked, ok, Channel};
|
||||||
|
|
||||||
|
handle_call(discard, Channel) ->
|
||||||
|
{shutdown, discarded, ok, Channel};
|
||||||
|
|
||||||
handle_call(Req, Channel) ->
|
handle_call(Req, Channel) ->
|
||||||
?LOG(warning, "Unexpected call: ~p", [Req]),
|
?LOG(warning, "Unexpected call: ~p", [Req]),
|
||||||
{reply, {error, unexpected_call}, Channel}.
|
{reply, {error, unexpected_call}, Channel}.
|
||||||
|
@ -406,16 +421,12 @@ handle_info(Info, Channel) ->
|
||||||
|
|
||||||
-spec(terminate(any(), channel()) -> channel()).
|
-spec(terminate(any(), channel()) -> channel()).
|
||||||
terminate(Reason, Channel) ->
|
terminate(Reason, Channel) ->
|
||||||
clean_anonymous_clients(),
|
|
||||||
Req = #{reason => stringfy(Reason)},
|
Req = #{reason => stringfy(Reason)},
|
||||||
try_dispatch(on_socket_closed, wrap(Req), Channel).
|
try_dispatch(on_socket_closed, wrap(Req), Channel).
|
||||||
|
|
||||||
is_anonymous(#{anonymous := true}) -> true;
|
is_anonymous(#{anonymous := true}) -> true;
|
||||||
is_anonymous(_AuthResult) -> false.
|
is_anonymous(_AuthResult) -> false.
|
||||||
|
|
||||||
clean_anonymous_clients() ->
|
|
||||||
ets:delete(?CHAN_CONN_TAB, ?CHANMOCK(self())).
|
|
||||||
|
|
||||||
packet_to_message(Topic, Qos, Payload,
|
packet_to_message(Topic, Qos, Payload,
|
||||||
#channel{
|
#channel{
|
||||||
conninfo = #{proto_ver := ProtoVer},
|
conninfo = #{proto_ver := ProtoVer},
|
||||||
|
@ -608,23 +619,32 @@ default_conninfo(ConnInfo) ->
|
||||||
username => undefined,
|
username => undefined,
|
||||||
conn_props => #{},
|
conn_props => #{},
|
||||||
connected => true,
|
connected => true,
|
||||||
|
proto_name => <<"exproto">>,
|
||||||
|
proto_ver => <<"1.0">>,
|
||||||
connected_at => erlang:system_time(millisecond),
|
connected_at => erlang:system_time(millisecond),
|
||||||
keepalive => 0,
|
keepalive => 0,
|
||||||
receive_maximum => 0,
|
receive_maximum => 0,
|
||||||
expiry_interval => 0}.
|
expiry_interval => 0}.
|
||||||
|
|
||||||
default_clientinfo(#{peername := {PeerHost, _},
|
default_clientinfo(#{peername := {PeerHost, PeerPort},
|
||||||
sockname := {_, SockPort}}) ->
|
sockname := {_, SockPort}}) ->
|
||||||
#{zone => external,
|
#{zone => external,
|
||||||
protocol => undefined,
|
protocol => exproto,
|
||||||
peerhost => PeerHost,
|
peerhost => PeerHost,
|
||||||
sockport => SockPort,
|
sockport => SockPort,
|
||||||
clientid => undefined,
|
clientid => anonymous_clientid(PeerHost, PeerPort),
|
||||||
username => undefined,
|
username => undefined,
|
||||||
is_bridge => false,
|
is_bridge => false,
|
||||||
is_superuser => false,
|
is_superuser => false,
|
||||||
mountpoint => undefined}.
|
mountpoint => undefined}.
|
||||||
|
|
||||||
|
anonymous_clientid(PeerHost, PeerPort) ->
|
||||||
|
iolist_to_binary(
|
||||||
|
["exproto-anonymous-",
|
||||||
|
inet:ntoa(PeerHost), "-", integer_to_list(PeerPort),
|
||||||
|
"-", emqx_rule_id:gen()
|
||||||
|
]).
|
||||||
|
|
||||||
stringfy(Reason) ->
|
stringfy(Reason) ->
|
||||||
unicode:characters_to_binary((io_lib:format("~0p", [Reason]))).
|
unicode:characters_to_binary((io_lib:format("~0p", [Reason]))).
|
||||||
|
|
||||||
|
|
|
@ -439,7 +439,8 @@ handle_msg({close, Reason}, State) ->
|
||||||
?LOG(debug, "Force to close the socket due to ~p", [Reason]),
|
?LOG(debug, "Force to close the socket due to ~p", [Reason]),
|
||||||
handle_info({sock_closed, Reason}, close_socket(State));
|
handle_info({sock_closed, Reason}, close_socket(State));
|
||||||
|
|
||||||
handle_msg({event, connected}, State = #state{channel = Channel}) ->
|
handle_msg({event, Event}, State = #state{channel = Channel})
|
||||||
|
when Event == connected; Event == accepted ->
|
||||||
ClientId = emqx_exproto_channel:info(clientid, Channel),
|
ClientId = emqx_exproto_channel:info(clientid, Channel),
|
||||||
emqx_cm:insert_channel_info(ClientId, info(State), stats(State));
|
emqx_cm:insert_channel_info(ClientId, info(State), stats(State));
|
||||||
|
|
||||||
|
|
|
@ -297,6 +297,7 @@ format_channel_info({_Key, Info, Stats0}) ->
|
||||||
SessCreated = maps:get(created_at, Session, maps:get(connected_at, ConnInfo)),
|
SessCreated = maps:get(created_at, Session, maps:get(connected_at, ConnInfo)),
|
||||||
Connected = case maps:get(conn_state, Info, connected) of
|
Connected = case maps:get(conn_state, Info, connected) of
|
||||||
connected -> true;
|
connected -> true;
|
||||||
|
accepted -> true; %% for exproto anonymous clients
|
||||||
_ -> false
|
_ -> false
|
||||||
end,
|
end,
|
||||||
NStats = Stats#{max_subscriptions => maps:get(subscriptions_max, Stats, 0),
|
NStats = Stats#{max_subscriptions => maps:get(subscriptions_max, Stats, 0),
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
##====================================================================
|
##====================================================================
|
||||||
## Rule Engine for EMQ X R4.0
|
## EMQX Rule Engine
|
||||||
##====================================================================
|
##====================================================================
|
||||||
|
|
||||||
rule_engine.ignore_sys_message = on
|
rule_engine.ignore_sys_message = on
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_sn,
|
{application, emqx_sn,
|
||||||
[{description, "EMQ X MQTT-SN Plugin"},
|
[{description, "EMQ X MQTT-SN Plugin"},
|
||||||
{vsn, "4.3.5"}, % strict semver, bump manually!
|
{vsn, "4.3.6"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [kernel,stdlib,esockd]},
|
{applications, [kernel,stdlib,esockd]},
|
||||||
|
|
|
@ -1,29 +1,25 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{VSN,
|
{VSN,
|
||||||
[
|
[
|
||||||
{"4.3.4",[
|
{<<"4\\.3\\.[4-5]">>,[
|
||||||
|
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
||||||
]},
|
]},
|
||||||
{"4.3.3",[
|
{<<"4.3.[2-3]">>,[
|
||||||
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
|
|
||||||
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
|
||||||
]},
|
|
||||||
{"4.3.2", [
|
|
||||||
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
|
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
||||||
]},
|
]},
|
||||||
{<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]}
|
{<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]}
|
||||||
],
|
],
|
||||||
[
|
[
|
||||||
{"4.3.4",[
|
{<<"4\\.3\\.[4-5]">>,[
|
||||||
|
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
||||||
]},
|
]},
|
||||||
{"4.3.3",[
|
{<<"4.3.[2-3]">>,[
|
||||||
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
|
|
||||||
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
|
||||||
]},
|
|
||||||
{"4.3.2", [
|
|
||||||
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
|
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
||||||
]},
|
]},
|
||||||
{<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]}
|
{<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]}
|
||||||
|
|
|
@ -268,40 +268,81 @@ message_type(16#1d) ->
|
||||||
message_type(Type) ->
|
message_type(Type) ->
|
||||||
io_lib:format("Unknown Type ~p", [Type]).
|
io_lib:format("Unknown Type ~p", [Type]).
|
||||||
|
|
||||||
|
format(?SN_CONNECT_MSG(Flags, ProtocolId, Duration, ClientId)) ->
|
||||||
|
#mqtt_sn_flags{
|
||||||
|
will = Will,
|
||||||
|
clean_start = CleanStart} = Flags,
|
||||||
|
io_lib:format("SN_CONNECT(W~w, C~w, ProtocolId=~w, Duration=~w, "
|
||||||
|
"ClientId=~s)",
|
||||||
|
[bool(Will), bool(CleanStart),
|
||||||
|
ProtocolId, Duration, ClientId]);
|
||||||
|
format(?SN_CONNACK_MSG(ReturnCode)) ->
|
||||||
|
io_lib:format("SN_CONNACK(ReturnCode=~w)", [ReturnCode]);
|
||||||
|
format(?SN_WILLTOPICREQ_MSG()) ->
|
||||||
|
"SN_WILLTOPICREQ()";
|
||||||
|
format(?SN_WILLTOPIC_MSG(Flags, Topic)) ->
|
||||||
|
#mqtt_sn_flags{
|
||||||
|
qos = QoS,
|
||||||
|
retain = Retain} = Flags,
|
||||||
|
io_lib:format("SN_WILLTOPIC(Q~w, R~w, Topic=~s)",
|
||||||
|
[QoS, bool(Retain), Topic]);
|
||||||
|
format(?SN_WILLTOPIC_EMPTY_MSG) ->
|
||||||
|
"SN_WILLTOPIC(_)";
|
||||||
|
format(?SN_WILLMSGREQ_MSG()) ->
|
||||||
|
"SN_WILLMSGREQ()";
|
||||||
|
format(?SN_WILLMSG_MSG(Msg)) ->
|
||||||
|
io_lib:format("SN_WILLMSG_MSG(Msg=~p)", [Msg]);
|
||||||
format(?SN_PUBLISH_MSG(Flags, TopicId, MsgId, Data)) ->
|
format(?SN_PUBLISH_MSG(Flags, TopicId, MsgId, Data)) ->
|
||||||
io_lib:format("mqtt_sn_message SN_PUBLISH, ~s, TopicId=~w, MsgId=~w, Payload=~w",
|
#mqtt_sn_flags{
|
||||||
[format_flag(Flags), TopicId, MsgId, Data]);
|
dup = Dup,
|
||||||
format(?SN_PUBACK_MSG(Flags, MsgId, ReturnCode)) ->
|
qos = QoS,
|
||||||
io_lib:format("mqtt_sn_message SN_PUBACK, ~s, MsgId=~w, ReturnCode=~w",
|
retain = Retain,
|
||||||
[format_flag(Flags), MsgId, ReturnCode]);
|
topic_id_type = TopicIdType} = Flags,
|
||||||
|
io_lib:format("SN_PUBLISH(D~w, Q~w, R~w, TopicIdType=~w, TopicId=~w, "
|
||||||
|
"MsgId=~w, Payload=~p)",
|
||||||
|
[bool(Dup), QoS, bool(Retain),
|
||||||
|
TopicIdType, TopicId, MsgId, Data]);
|
||||||
|
format(?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode)) ->
|
||||||
|
io_lib:format("SN_PUBACK(TopicId=~w, MsgId=~w, ReturnCode=~w)",
|
||||||
|
[TopicId, MsgId, ReturnCode]);
|
||||||
format(?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId)) ->
|
format(?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId)) ->
|
||||||
io_lib:format("mqtt_sn_message SN_PUBCOMP, MsgId=~w", [MsgId]);
|
io_lib:format("SN_PUBCOMP(MsgId=~w)", [MsgId]);
|
||||||
format(?SN_PUBREC_MSG(?SN_PUBREC, MsgId)) ->
|
format(?SN_PUBREC_MSG(?SN_PUBREC, MsgId)) ->
|
||||||
io_lib:format("mqtt_sn_message SN_PUBREC, MsgId=~w", [MsgId]);
|
io_lib:format("SN_PUBREC(MsgId=~w)", [MsgId]);
|
||||||
format(?SN_PUBREC_MSG(?SN_PUBREL, MsgId)) ->
|
format(?SN_PUBREC_MSG(?SN_PUBREL, MsgId)) ->
|
||||||
io_lib:format("mqtt_sn_message SN_PUBREL, MsgId=~w", [MsgId]);
|
io_lib:format("SN_PUBREL(MsgId=~w)", [MsgId]);
|
||||||
format(?SN_SUBSCRIBE_MSG(Flags, Msgid, Topic)) ->
|
format(?SN_SUBSCRIBE_MSG(Flags, Msgid, Topic)) ->
|
||||||
io_lib:format("mqtt_sn_message SN_SUBSCRIBE, ~s, MsgId=~w, TopicId=~w",
|
#mqtt_sn_flags{
|
||||||
[format_flag(Flags), Msgid, Topic]);
|
dup = Dup,
|
||||||
|
qos = QoS,
|
||||||
|
topic_id_type = TopicIdType} = Flags,
|
||||||
|
io_lib:format("SN_SUBSCRIBE(D~w, Q~w, TopicIdType=~w, MsgId=~w, "
|
||||||
|
"TopicId=~w)",
|
||||||
|
[bool(Dup), QoS, TopicIdType, Msgid, Topic]);
|
||||||
format(?SN_SUBACK_MSG(Flags, TopicId, MsgId, ReturnCode)) ->
|
format(?SN_SUBACK_MSG(Flags, TopicId, MsgId, ReturnCode)) ->
|
||||||
io_lib:format("mqtt_sn_message SN_SUBACK, ~s, MsgId=~w, TopicId=~w, ReturnCode=~w",
|
#mqtt_sn_flags{qos = QoS} = Flags,
|
||||||
[format_flag(Flags), MsgId, TopicId, ReturnCode]);
|
io_lib:format("SN_SUBACK(GrantedQoS=~w, MsgId=~w, TopicId=~w, "
|
||||||
|
"ReturnCode=~w)",
|
||||||
|
[QoS, MsgId, TopicId, ReturnCode]);
|
||||||
format(?SN_UNSUBSCRIBE_MSG(Flags, Msgid, Topic)) ->
|
format(?SN_UNSUBSCRIBE_MSG(Flags, Msgid, Topic)) ->
|
||||||
io_lib:format("mqtt_sn_message SN_UNSUBSCRIBE, ~s, MsgId=~w, TopicId=~w",
|
#mqtt_sn_flags{topic_id_type = TopicIdType} = Flags,
|
||||||
[format_flag(Flags), Msgid, Topic]);
|
io_lib:format("SN_UNSUBSCRIBE(TopicIdType=~s, MsgId=~w, TopicId=~w)",
|
||||||
|
[TopicIdType, Msgid, Topic]);
|
||||||
format(?SN_UNSUBACK_MSG(MsgId)) ->
|
format(?SN_UNSUBACK_MSG(MsgId)) ->
|
||||||
io_lib:format("mqtt_sn_message SN_UNSUBACK, MsgId=~w", [MsgId]);
|
io_lib:format("SN_UNSUBACK(MsgId=~w)", [MsgId]);
|
||||||
format(?SN_REGISTER_MSG(TopicId, MsgId, TopicName)) ->
|
format(?SN_REGISTER_MSG(TopicId, MsgId, TopicName)) ->
|
||||||
io_lib:format("mqtt_sn_message SN_REGISTER, TopicId=~w, MsgId=~w, TopicName=~w",
|
io_lib:format("SN_REGISTER(TopicId=~w, MsgId=~w, TopicName=~s)",
|
||||||
[TopicId, MsgId, TopicName]);
|
[TopicId, MsgId, TopicName]);
|
||||||
format(?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)) ->
|
format(?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)) ->
|
||||||
io_lib:format("mqtt_sn_message SN_REGACK, TopicId=~w, MsgId=~w, ReturnCode=~w",
|
io_lib:format("SN_REGACK(TopicId=~w, MsgId=~w, ReturnCode=~w)",
|
||||||
[TopicId, MsgId, ReturnCode]);
|
[TopicId, MsgId, ReturnCode]);
|
||||||
|
format(?SN_PINGREQ_MSG(ClientId)) ->
|
||||||
|
io_lib:format("SN_PINGREQ(ClientId=~s)", [ClientId]);
|
||||||
|
format(?SN_PINGRESP_MSG()) ->
|
||||||
|
"SN_PINGREQ()";
|
||||||
|
format(?SN_DISCONNECT_MSG(Duration)) ->
|
||||||
|
io_lib:format("SN_DISCONNECT(Duration=~s)", [Duration]);
|
||||||
|
|
||||||
format(#mqtt_sn_message{type = Type, variable = Var}) ->
|
format(#mqtt_sn_message{type = Type, variable = Var}) ->
|
||||||
io_lib:format("mqtt_sn_message type=~s, Var=~w", [emqx_sn_frame:message_type(Type), Var]).
|
io_lib:format("mqtt_sn_message(type=~s, Var=~w)",
|
||||||
|
[emqx_sn_frame:message_type(Type), Var]).
|
||||||
format_flag(#mqtt_sn_flags{dup = Dup, qos = QoS, retain = Retain, will = Will, clean_start = CleanStart, topic_id_type = TopicType}) ->
|
|
||||||
io_lib:format("mqtt_sn_flags{dup=~p, qos=~p, retain=~p, will=~p, clean_session=~p, topic_id_type=~p}",
|
|
||||||
[Dup, QoS, Retain, Will, CleanStart, TopicType]);
|
|
||||||
format_flag(_Flag) -> "invalid flag".
|
|
||||||
|
|
||||||
|
|
|
@ -94,6 +94,8 @@
|
||||||
idle_timeout :: integer(),
|
idle_timeout :: integer(),
|
||||||
enable_qos3 = false :: boolean(),
|
enable_qos3 = false :: boolean(),
|
||||||
has_pending_pingresp = false :: boolean(),
|
has_pending_pingresp = false :: boolean(),
|
||||||
|
%% Store all qos0 messages for waiting REGACK
|
||||||
|
%% Note: QoS1/QoS2 messages will kept inflight queue
|
||||||
pending_topic_ids = #{} :: pending_msgs()
|
pending_topic_ids = #{} :: pending_msgs()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
@ -490,7 +492,7 @@ handle_event({call, From}, Req, _StateName, State) ->
|
||||||
{reply, Reply, NState} ->
|
{reply, Reply, NState} ->
|
||||||
gen_server:reply(From, Reply),
|
gen_server:reply(From, Reply),
|
||||||
{keep_state, NState};
|
{keep_state, NState};
|
||||||
{stop, Reason, Reply, NState} ->
|
{shutdown, Reason, Reply, NState} ->
|
||||||
State0 = case NState#state.sockstate of
|
State0 = case NState#state.sockstate of
|
||||||
running ->
|
running ->
|
||||||
send_message(?SN_DISCONNECT_MSG(undefined), NState);
|
send_message(?SN_DISCONNECT_MSG(undefined), NState);
|
||||||
|
@ -518,10 +520,9 @@ handle_event(info, {datagram, SockPid, Data}, StateName,
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_event(info, {deliver, _Topic, Msg}, asleep,
|
handle_event(info, {deliver, _Topic, Msg}, asleep,
|
||||||
State = #state{channel = Channel, pending_topic_ids = Pendings}) ->
|
State = #state{channel = Channel}) ->
|
||||||
% section 6.14, Support of sleeping clients
|
% section 6.14, Support of sleeping clients
|
||||||
?LOG(debug, "enqueue downlink message in asleep state, msg: ~0p, pending_topic_ids: ~0p",
|
?LOG(debug, "enqueue downlink message in asleep state, msg: ~0p", [Msg]),
|
||||||
[Msg, Pendings]),
|
|
||||||
Session = emqx_session:enqueue(emqx_channel:info(clientinfo, Channel),
|
Session = emqx_session:enqueue(emqx_channel:info(clientinfo, Channel),
|
||||||
Msg, emqx_channel:get_session(Channel)),
|
Msg, emqx_channel:get_session(Channel)),
|
||||||
{keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}};
|
{keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}};
|
||||||
|
@ -610,7 +611,7 @@ handle_call(_From, Req, State = #state{channel = Channel}) ->
|
||||||
{reply, Reply, NChannel} ->
|
{reply, Reply, NChannel} ->
|
||||||
{reply, Reply, State#state{channel = NChannel}};
|
{reply, Reply, State#state{channel = NChannel}};
|
||||||
{shutdown, Reason, Reply, NChannel} ->
|
{shutdown, Reason, Reply, NChannel} ->
|
||||||
stop(Reason, Reply, State#state{channel = NChannel})
|
{shutdown, Reason, Reply, State#state{channel = NChannel}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
handle_info({sock_closed, Reason} = Info, State = #state{channel = Channel}) ->
|
handle_info({sock_closed, Reason} = Info, State = #state{channel = Channel}) ->
|
||||||
|
@ -723,11 +724,19 @@ mqtt2sn(?PUBCOMP_PACKET(MsgId), _State) ->
|
||||||
mqtt2sn(?UNSUBACK_PACKET(MsgId), _State)->
|
mqtt2sn(?UNSUBACK_PACKET(MsgId), _State)->
|
||||||
?SN_UNSUBACK_MSG(MsgId);
|
?SN_UNSUBACK_MSG(MsgId);
|
||||||
|
|
||||||
mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), #state{channel = Channel}) ->
|
mqtt2sn(
|
||||||
NewPacketId = if QoS =:= ?QOS_0 -> 0;
|
#mqtt_packet{header = #mqtt_packet_header{
|
||||||
|
type = ?PUBLISH,
|
||||||
|
qos = QoS,
|
||||||
|
%dup = Dup,
|
||||||
|
retain = Retain},
|
||||||
|
variable = #mqtt_packet_publish{
|
||||||
|
topic_name = Topic,
|
||||||
|
packet_id = PacketId},
|
||||||
|
payload = Payload}, #state{clientid = ClientId}) ->
|
||||||
|
NPacketId = if QoS =:= ?QOS_0 -> 0;
|
||||||
true -> PacketId
|
true -> PacketId
|
||||||
end,
|
end,
|
||||||
ClientId = emqx_channel:info(clientid, Channel),
|
|
||||||
{TopicIdType, TopicContent} = case emqx_sn_registry:lookup_topic_id(ClientId, Topic) of
|
{TopicIdType, TopicContent} = case emqx_sn_registry:lookup_topic_id(ClientId, Topic) of
|
||||||
{predef, PredefTopicId} ->
|
{predef, PredefTopicId} ->
|
||||||
{?SN_PREDEFINED_TOPIC, PredefTopicId};
|
{?SN_PREDEFINED_TOPIC, PredefTopicId};
|
||||||
|
@ -737,8 +746,12 @@ mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), #state{channel = Channe
|
||||||
{?SN_SHORT_TOPIC, Topic}
|
{?SN_SHORT_TOPIC, Topic}
|
||||||
end,
|
end,
|
||||||
|
|
||||||
Flags = #mqtt_sn_flags{qos = QoS, topic_id_type = TopicIdType},
|
Flags = #mqtt_sn_flags{
|
||||||
?SN_PUBLISH_MSG(Flags, TopicContent, NewPacketId, Payload);
|
%dup = Dup,
|
||||||
|
qos = QoS,
|
||||||
|
retain = Retain,
|
||||||
|
topic_id_type = TopicIdType},
|
||||||
|
?SN_PUBLISH_MSG(Flags, TopicContent, NPacketId, Payload);
|
||||||
|
|
||||||
mqtt2sn(?SUBACK_PACKET(MsgId, ReturnCodes), _State)->
|
mqtt2sn(?SUBACK_PACKET(MsgId, ReturnCodes), _State)->
|
||||||
% if success, suback is sent by handle_info({suback, MsgId, [GrantedQoS]}, ...)
|
% if success, suback is sent by handle_info({suback, MsgId, [GrantedQoS]}, ...)
|
||||||
|
@ -766,9 +779,10 @@ send_connack(State) ->
|
||||||
|
|
||||||
send_message(Msg = #mqtt_sn_message{type = Type},
|
send_message(Msg = #mqtt_sn_message{type = Type},
|
||||||
State = #state{sockpid = SockPid, peername = Peername}) ->
|
State = #state{sockpid = SockPid, peername = Peername}) ->
|
||||||
?LOG(debug, "SEND ~s~n", [emqx_sn_frame:format(Msg)]),
|
?LOG(info, "SEND ~s~n", [emqx_sn_frame:format(Msg)]),
|
||||||
inc_outgoing_stats(Type),
|
inc_outgoing_stats(Type),
|
||||||
Data = emqx_sn_frame:serialize(Msg),
|
Data = emqx_sn_frame:serialize(Msg),
|
||||||
|
?LOG(debug, "SEND ~0p", [Data]),
|
||||||
ok = emqx_metrics:inc('bytes.sent', iolist_size(Data)),
|
ok = emqx_metrics:inc('bytes.sent', iolist_size(Data)),
|
||||||
SockPid ! {datagram, Peername, Data},
|
SockPid ! {datagram, Peername, Data},
|
||||||
State.
|
State.
|
||||||
|
@ -793,13 +807,6 @@ stop(Reason, State) ->
|
||||||
maybe_send_will_msg(Reason, State),
|
maybe_send_will_msg(Reason, State),
|
||||||
{stop, {shutdown, Reason}, State}.
|
{stop, {shutdown, Reason}, State}.
|
||||||
|
|
||||||
stop({shutdown, Reason}, Reply, State) ->
|
|
||||||
stop(Reason, Reply, State);
|
|
||||||
stop(Reason, Reply, State) ->
|
|
||||||
?LOG(stop_log_level(Reason), "stop due to ~p", [Reason]),
|
|
||||||
maybe_send_will_msg(Reason, State),
|
|
||||||
{stop, {shutdown, Reason}, Reply, State}.
|
|
||||||
|
|
||||||
maybe_send_will_msg(normal, _State) ->
|
maybe_send_will_msg(normal, _State) ->
|
||||||
ok;
|
ok;
|
||||||
maybe_send_will_msg(_Reason, State) ->
|
maybe_send_will_msg(_Reason, State) ->
|
||||||
|
@ -825,6 +832,9 @@ do_connect(ClientId, CleanStart, WillFlag, Duration, State) ->
|
||||||
%% At any point in time a client may have only one QoS level 1 or 2 PUBLISH message
|
%% At any point in time a client may have only one QoS level 1 or 2 PUBLISH message
|
||||||
%% outstanding, i.e. it has to wait for the termination of this PUBLISH message exchange
|
%% outstanding, i.e. it has to wait for the termination of this PUBLISH message exchange
|
||||||
%% before it could start a new level 1 or 2 transaction.
|
%% before it could start a new level 1 or 2 transaction.
|
||||||
|
%%
|
||||||
|
%% FIXME: But we should have a re-try timer to re-send the inflight
|
||||||
|
%% qos1/qos2 message
|
||||||
OnlyOneInflight = #{'Receive-Maximum' => 1},
|
OnlyOneInflight = #{'Receive-Maximum' => 1},
|
||||||
ConnPkt = #mqtt_packet_connect{clientid = ClientId,
|
ConnPkt = #mqtt_packet_connect{clientid = ClientId,
|
||||||
clean_start = CleanStart,
|
clean_start = CleanStart,
|
||||||
|
@ -974,10 +984,15 @@ do_puback(TopicId, MsgId, ReturnCode, StateName,
|
||||||
undefined -> {keep_state, State};
|
undefined -> {keep_state, State};
|
||||||
TopicName ->
|
TopicName ->
|
||||||
%% notice that this TopicName maybe normal or predefined,
|
%% notice that this TopicName maybe normal or predefined,
|
||||||
%% involving the predefined topic name in register to enhance the gateway's robustness even inconsistent with MQTT-SN channels
|
%% involving the predefined topic name in register to
|
||||||
{keep_state, send_register(TopicName, TopicId, MsgId, State)}
|
%% enhance the gateway's robustness even inconsistent
|
||||||
|
%% with MQTT-SN channel
|
||||||
|
{keep_state,
|
||||||
|
send_register(TopicName, TopicId, MsgId, State)}
|
||||||
end;
|
end;
|
||||||
_ ->
|
_ ->
|
||||||
|
%% XXX: We need to handle others error code
|
||||||
|
%% 'Rejection: congestion'
|
||||||
?LOG(error, "CAN NOT handle PUBACK ReturnCode=~p", [ReturnCode]),
|
?LOG(error, "CAN NOT handle PUBACK ReturnCode=~p", [ReturnCode]),
|
||||||
{keep_state, State}
|
{keep_state, State}
|
||||||
end.
|
end.
|
||||||
|
@ -1050,7 +1065,7 @@ handle_incoming(Packet, _StName, State) ->
|
||||||
channel_handle_in(Packet = ?PACKET(Type), #state{channel = Channel}) ->
|
channel_handle_in(Packet = ?PACKET(Type), #state{channel = Channel}) ->
|
||||||
_ = inc_incoming_stats(Type),
|
_ = inc_incoming_stats(Type),
|
||||||
ok = emqx_metrics:inc_recv(Packet),
|
ok = emqx_metrics:inc_recv(Packet),
|
||||||
?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
|
?LOG(debug, "Transed-RECV ~s", [emqx_packet:format(Packet)]),
|
||||||
emqx_channel:handle_in(Packet, Channel).
|
emqx_channel:handle_in(Packet, Channel).
|
||||||
|
|
||||||
handle_outgoing(Packets, State) when is_list(Packets) ->
|
handle_outgoing(Packets, State) when is_list(Packets) ->
|
||||||
|
@ -1064,7 +1079,9 @@ handle_outgoing(PubPkt = ?PUBLISH_PACKET(_, TopicName, _, _),
|
||||||
ClientId = emqx_channel:info(clientid, Channel),
|
ClientId = emqx_channel:info(clientid, Channel),
|
||||||
TopicId = emqx_sn_registry:lookup_topic_id(ClientId, TopicName),
|
TopicId = emqx_sn_registry:lookup_topic_id(ClientId, TopicName),
|
||||||
case (TopicId == undefined) andalso (byte_size(TopicName) =/= 2) of
|
case (TopicId == undefined) andalso (byte_size(TopicName) =/= 2) of
|
||||||
true -> register_and_notify_client(PubPkt, State);
|
true ->
|
||||||
|
%% TODO: only one REGISTER inflight if qos=0??
|
||||||
|
register_and_notify_client(PubPkt, State);
|
||||||
false -> send_message(mqtt2sn(PubPkt, State), State)
|
false -> send_message(mqtt2sn(PubPkt, State), State)
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
@ -1077,13 +1094,40 @@ cache_no_reg_publish_message(Pendings, TopicId, PubPkt, State) ->
|
||||||
Msgs = maps:get(pending_topic_ids, Pendings, []),
|
Msgs = maps:get(pending_topic_ids, Pendings, []),
|
||||||
Pendings#{TopicId => Msgs ++ [mqtt2sn(PubPkt, State)]}.
|
Pendings#{TopicId => Msgs ++ [mqtt2sn(PubPkt, State)]}.
|
||||||
|
|
||||||
replay_no_reg_pending_publishes(TopicId, #state{pending_topic_ids = Pendings} = State0) ->
|
replay_no_reg_pending_publishes(TopicId,
|
||||||
?LOG(debug, "replay non-registered publish message for topic-id: ~p, pendings: ~0p",
|
State0 = #state{
|
||||||
[TopicId, Pendings]),
|
pending_topic_ids = Pendings}) ->
|
||||||
|
?LOG(debug, "replay non-registered qos0 publish message for "
|
||||||
|
"topic-id: ~p, pendings: ~0p", [TopicId, Pendings]),
|
||||||
State = lists:foldl(fun(Msg, State1) ->
|
State = lists:foldl(fun(Msg, State1) ->
|
||||||
send_message(Msg, State1)
|
send_message(Msg, State1)
|
||||||
end, State0, maps:get(TopicId, Pendings, [])),
|
end, State0, maps:get(TopicId, Pendings, [])),
|
||||||
State#state{pending_topic_ids = maps:remove(TopicId, Pendings)}.
|
|
||||||
|
NState = State#state{pending_topic_ids = maps:remove(TopicId, Pendings)},
|
||||||
|
case replay_inflight_messages(TopicId, State#state.channel) of
|
||||||
|
[] -> ok;
|
||||||
|
Outgoings ->
|
||||||
|
?LOG(debug, "replay non-registered qos1/qos2 publish message "
|
||||||
|
"for topic-id: ~0p, messages: ~0p",
|
||||||
|
[TopicId, Outgoings]),
|
||||||
|
handle_outgoing(Outgoings, NState)
|
||||||
|
end.
|
||||||
|
|
||||||
|
replay_inflight_messages(TopicId, Channel) ->
|
||||||
|
Inflight = emqx_session:info(inflight, emqx_channel:get_session(Channel)),
|
||||||
|
|
||||||
|
case emqx_inflight:to_list(Inflight) of
|
||||||
|
[] -> [];
|
||||||
|
[{PktId, {Msg, _Ts}}] -> %% Fixed inflight size 1
|
||||||
|
ClientId = emqx_channel:info(clientid, Channel),
|
||||||
|
ReplayTopic = emqx_sn_registry:lookup_topic(ClientId, TopicId),
|
||||||
|
case ReplayTopic =:= emqx_message:topic(Msg) of
|
||||||
|
false -> [];
|
||||||
|
true ->
|
||||||
|
NMsg = emqx_message:set_flag(dup, true, Msg),
|
||||||
|
[emqx_message:to_packet(PktId, NMsg)]
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
register_and_notify_client(?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload) = PubPkt,
|
register_and_notify_client(?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload) = PubPkt,
|
||||||
State = #state{pending_topic_ids = Pendings, channel = Channel}) ->
|
State = #state{pending_topic_ids = Pendings, channel = Channel}) ->
|
||||||
|
@ -1091,10 +1135,17 @@ register_and_notify_client(?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload) =
|
||||||
#mqtt_packet{header = #mqtt_packet_header{dup = Dup, retain = Retain}} = PubPkt,
|
#mqtt_packet{header = #mqtt_packet_header{dup = Dup, retain = Retain}} = PubPkt,
|
||||||
ClientId = emqx_channel:info(clientid, Channel),
|
ClientId = emqx_channel:info(clientid, Channel),
|
||||||
TopicId = emqx_sn_registry:register_topic(ClientId, TopicName),
|
TopicId = emqx_sn_registry:register_topic(ClientId, TopicName),
|
||||||
?LOG(debug, "Register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, QoS=~p, "
|
?LOG(debug, "Register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, "
|
||||||
"Retain=~p, MsgId=~p", [TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId]),
|
"QoS=~p,Retain=~p, MsgId=~p",
|
||||||
NewPendings = cache_no_reg_publish_message(Pendings, TopicId, PubPkt, State),
|
[TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId]),
|
||||||
send_register(TopicName, TopicId, MsgId, State#state{pending_topic_ids = NewPendings}).
|
NPendings = case QoS == ?QOS_0 of
|
||||||
|
true ->
|
||||||
|
cache_no_reg_publish_message(
|
||||||
|
Pendings, TopicId, PubPkt, State);
|
||||||
|
_ -> Pendings
|
||||||
|
end,
|
||||||
|
send_register(TopicName, TopicId, MsgId,
|
||||||
|
State#state{pending_topic_ids = NPendings}).
|
||||||
|
|
||||||
message_id(undefined) ->
|
message_id(undefined) ->
|
||||||
rand:uniform(16#FFFF);
|
rand:uniform(16#FFFF);
|
||||||
|
|
|
@ -943,6 +943,151 @@ t_publish_qos2_case03(_) ->
|
||||||
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
||||||
gen_udp:close(Socket).
|
gen_udp:close(Socket).
|
||||||
|
|
||||||
|
t_delivery_qos1_register_invalid_topic_id(_) ->
|
||||||
|
Dup = 0,
|
||||||
|
QoS = 1,
|
||||||
|
Retain = 0,
|
||||||
|
Will = 0,
|
||||||
|
CleanSession = 0,
|
||||||
|
MsgId = 1,
|
||||||
|
TopicId = ?MAX_PRED_TOPIC_ID + 1,
|
||||||
|
{ok, Socket} = gen_udp:open(0, [binary]),
|
||||||
|
send_connect_msg(Socket, <<"test">>),
|
||||||
|
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||||
|
|
||||||
|
send_subscribe_msg_normal_topic(Socket, QoS, <<"ab">>, MsgId),
|
||||||
|
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1,
|
||||||
|
?SN_NORMAL_TOPIC:2, TopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
|
||||||
|
receive_response(Socket)),
|
||||||
|
|
||||||
|
Payload = <<"test-registration-inconsistent">>,
|
||||||
|
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"ab">>, Payload)),
|
||||||
|
|
||||||
|
?assertEqual(
|
||||||
|
<<(7 + byte_size(Payload)), ?SN_PUBLISH,
|
||||||
|
Dup:1, QoS:2, Retain:1,
|
||||||
|
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||||
|
TopicId:16, MsgId:16, Payload/binary>>, receive_response(Socket)),
|
||||||
|
%% acked with ?SN_RC_INVALID_TOPIC_ID to
|
||||||
|
send_puback_msg(Socket, TopicId, MsgId, ?SN_RC_INVALID_TOPIC_ID),
|
||||||
|
?assertEqual(
|
||||||
|
{TopicId, MsgId},
|
||||||
|
check_register_msg_on_udp(<<"ab">>, receive_response(Socket))),
|
||||||
|
send_regack_msg(Socket, TopicId, MsgId + 1),
|
||||||
|
|
||||||
|
%% receive the replay message
|
||||||
|
?assertEqual(
|
||||||
|
<<(7 + byte_size(Payload)), ?SN_PUBLISH,
|
||||||
|
Dup:1, QoS:2, Retain:1,
|
||||||
|
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||||
|
TopicId:16, (MsgId):16, Payload/binary>>, receive_response(Socket)),
|
||||||
|
|
||||||
|
send_disconnect_msg(Socket, undefined),
|
||||||
|
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
||||||
|
gen_udp:close(Socket).
|
||||||
|
|
||||||
|
t_delivery_takeover_and_re_register(_) ->
|
||||||
|
MsgId = 1,
|
||||||
|
{ok, Socket} = gen_udp:open(0, [binary]),
|
||||||
|
send_connect_msg(Socket, <<"test">>, 0),
|
||||||
|
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
|
||||||
|
receive_response(Socket)),
|
||||||
|
|
||||||
|
send_subscribe_msg_normal_topic(Socket, ?QOS_1, <<"topic-a">>, MsgId+1),
|
||||||
|
<<_, ?SN_SUBACK, 2#00100000,
|
||||||
|
TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
|
||||||
|
|
||||||
|
send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2),
|
||||||
|
<<_, ?SN_SUBACK, 2#01000000,
|
||||||
|
TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
|
||||||
|
|
||||||
|
_ = emqx:publish(
|
||||||
|
emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)),
|
||||||
|
_ = emqx:publish(
|
||||||
|
emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"test-b">>)),
|
||||||
|
|
||||||
|
<<_, ?SN_PUBLISH, 2#00100000,
|
||||||
|
TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket),
|
||||||
|
send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED),
|
||||||
|
|
||||||
|
<<_, ?SN_PUBLISH, 2#01000000,
|
||||||
|
TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket),
|
||||||
|
send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED),
|
||||||
|
|
||||||
|
send_disconnect_msg(Socket, undefined),
|
||||||
|
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
||||||
|
gen_udp:close(Socket),
|
||||||
|
|
||||||
|
%% offline messages will be queued into the MQTT-SN session
|
||||||
|
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m1">>)),
|
||||||
|
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)),
|
||||||
|
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m3">>)),
|
||||||
|
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m1">>)),
|
||||||
|
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m2">>)),
|
||||||
|
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)),
|
||||||
|
|
||||||
|
{ok, NSocket} = gen_udp:open(0, [binary]),
|
||||||
|
send_connect_msg(NSocket, <<"test">>, 0),
|
||||||
|
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
|
||||||
|
receive_response(NSocket)),
|
||||||
|
|
||||||
|
%% qos1
|
||||||
|
|
||||||
|
%% received the resume messages
|
||||||
|
<<_, ?SN_PUBLISH, 2#00100000,
|
||||||
|
TopicIdA:16, MsgIdA0:16, "m1">> = receive_response(NSocket),
|
||||||
|
%% only one qos1/qos2 inflight
|
||||||
|
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
|
||||||
|
send_puback_msg(NSocket, TopicIdA, MsgIdA0, ?SN_RC_INVALID_TOPIC_ID),
|
||||||
|
%% recv register
|
||||||
|
<<_, ?SN_REGISTER,
|
||||||
|
TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket),
|
||||||
|
send_regack_msg(NSocket, TopicIdA, RegMsgIdA),
|
||||||
|
%% received the replay messages
|
||||||
|
<<_, ?SN_PUBLISH, 2#00100000,
|
||||||
|
TopicIdA:16, MsgIdA1:16, "m1">> = receive_response(NSocket),
|
||||||
|
send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED),
|
||||||
|
|
||||||
|
<<_, ?SN_PUBLISH, 2#00100000,
|
||||||
|
TopicIdA:16, MsgIdA2:16, "m2">> = receive_response(NSocket),
|
||||||
|
send_puback_msg(NSocket, TopicIdA, MsgIdA2, ?SN_RC_ACCEPTED),
|
||||||
|
|
||||||
|
<<_, ?SN_PUBLISH, 2#00100000,
|
||||||
|
TopicIdA:16, MsgIdA3:16, "m3">> = receive_response(NSocket),
|
||||||
|
send_puback_msg(NSocket, TopicIdA, MsgIdA3, ?SN_RC_ACCEPTED),
|
||||||
|
|
||||||
|
%% qos2
|
||||||
|
<<_, ?SN_PUBLISH, 2#01000000,
|
||||||
|
TopicIdB:16, MsgIdB0:16, "m1">> = receive_response(NSocket),
|
||||||
|
%% only one qos1/qos2 inflight
|
||||||
|
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
|
||||||
|
send_puback_msg(NSocket, TopicIdB, MsgIdB0, ?SN_RC_INVALID_TOPIC_ID),
|
||||||
|
%% recv register
|
||||||
|
<<_, ?SN_REGISTER,
|
||||||
|
TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket),
|
||||||
|
send_regack_msg(NSocket, TopicIdB, RegMsgIdB),
|
||||||
|
%% received the replay messages
|
||||||
|
<<_, ?SN_PUBLISH, 2#01000000,
|
||||||
|
TopicIdB:16, MsgIdB1:16, "m1">> = receive_response(NSocket),
|
||||||
|
send_pubrec_msg(NSocket, MsgIdB1),
|
||||||
|
<<_, ?SN_PUBREL, MsgIdB1:16>> = receive_response(NSocket),
|
||||||
|
send_pubcomp_msg(NSocket, MsgIdB1),
|
||||||
|
|
||||||
|
<<_, ?SN_PUBLISH, 2#01000000,
|
||||||
|
TopicIdB:16, MsgIdB2:16, "m2">> = receive_response(NSocket),
|
||||||
|
send_puback_msg(NSocket, TopicIdB, MsgIdB2, ?SN_RC_ACCEPTED),
|
||||||
|
|
||||||
|
<<_, ?SN_PUBLISH, 2#01000000,
|
||||||
|
TopicIdB:16, MsgIdB3:16, "m3">> = receive_response(NSocket),
|
||||||
|
send_puback_msg(NSocket, TopicIdB, MsgIdB3, ?SN_RC_ACCEPTED),
|
||||||
|
|
||||||
|
%% no more messages
|
||||||
|
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
|
||||||
|
|
||||||
|
send_disconnect_msg(NSocket, undefined),
|
||||||
|
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket)),
|
||||||
|
gen_udp:close(NSocket).
|
||||||
|
|
||||||
t_will_case01(_) ->
|
t_will_case01(_) ->
|
||||||
QoS = 1,
|
QoS = 1,
|
||||||
Duration = 1,
|
Duration = 1,
|
||||||
|
@ -1758,13 +1903,16 @@ send_searchgw_msg(Socket) ->
|
||||||
ok = gen_udp:send(Socket, ?HOST, ?PORT, <<Length:8, MsgType:8, Radius:8>>).
|
ok = gen_udp:send(Socket, ?HOST, ?PORT, <<Length:8, MsgType:8, Radius:8>>).
|
||||||
|
|
||||||
send_connect_msg(Socket, ClientId) ->
|
send_connect_msg(Socket, ClientId) ->
|
||||||
|
send_connect_msg(Socket, ClientId, 1).
|
||||||
|
|
||||||
|
send_connect_msg(Socket, ClientId, CleanSession) when CleanSession == 0;
|
||||||
|
CleanSession == 1 ->
|
||||||
Length = 6 + byte_size(ClientId),
|
Length = 6 + byte_size(ClientId),
|
||||||
MsgType = ?SN_CONNECT,
|
MsgType = ?SN_CONNECT,
|
||||||
Dup = 0,
|
Dup = 0,
|
||||||
QoS = 0,
|
QoS = 0,
|
||||||
Retain = 0,
|
Retain = 0,
|
||||||
Will = 0,
|
Will = 0,
|
||||||
CleanSession = 1,
|
|
||||||
TopicIdType = 0,
|
TopicIdType = 0,
|
||||||
ProtocolId = 1,
|
ProtocolId = 1,
|
||||||
Duration = 10,
|
Duration = 10,
|
||||||
|
@ -1880,9 +2028,12 @@ send_publish_msg_short_topic(Socket, QoS, MsgId, TopicName, Data) ->
|
||||||
ok = gen_udp:send(Socket, ?HOST, ?PORT, PublishPacket).
|
ok = gen_udp:send(Socket, ?HOST, ?PORT, PublishPacket).
|
||||||
|
|
||||||
send_puback_msg(Socket, TopicId, MsgId) ->
|
send_puback_msg(Socket, TopicId, MsgId) ->
|
||||||
|
send_puback_msg(Socket, TopicId, MsgId, ?SN_RC_ACCEPTED).
|
||||||
|
|
||||||
|
send_puback_msg(Socket, TopicId, MsgId, Rc) ->
|
||||||
Length = 7,
|
Length = 7,
|
||||||
MsgType = ?SN_PUBACK,
|
MsgType = ?SN_PUBACK,
|
||||||
PubAckPacket = <<Length:8, MsgType:8, TopicId:16, MsgId:16, ?SN_RC_ACCEPTED:8>>,
|
PubAckPacket = <<Length:8, MsgType:8, TopicId:16, MsgId:16, Rc:8>>,
|
||||||
?LOG("send_puback_msg TopicId=~p, MsgId=~p", [TopicId, MsgId]),
|
?LOG("send_puback_msg TopicId=~p, MsgId=~p", [TopicId, MsgId]),
|
||||||
ok = gen_udp:send(Socket, ?HOST, ?PORT, PubAckPacket).
|
ok = gen_udp:send(Socket, ?HOST, ?PORT, PubAckPacket).
|
||||||
|
|
||||||
|
|
|
@ -152,7 +152,7 @@ default_conninfo(ConnInfo) ->
|
||||||
clean_start => true,
|
clean_start => true,
|
||||||
clientid => undefined,
|
clientid => undefined,
|
||||||
username => undefined,
|
username => undefined,
|
||||||
conn_props => [],
|
conn_props => #{},
|
||||||
connected => false,
|
connected => false,
|
||||||
connected_at => undefined,
|
connected_at => undefined,
|
||||||
keepalive => undefined,
|
keepalive => undefined,
|
||||||
|
@ -814,4 +814,3 @@ interval(outgoing_timer, #pstate{heart_beats = HrtBt}) ->
|
||||||
emqx_stomp_heartbeat:interval(outgoing, HrtBt);
|
emqx_stomp_heartbeat:interval(outgoing, HrtBt);
|
||||||
interval(clean_trans_timer, _) ->
|
interval(clean_trans_timer, _) ->
|
||||||
?TRANS_TIMEOUT.
|
?TRANS_TIMEOUT.
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
%%-*- mode: erlang -*-
|
%%-*- mode: erlang -*-
|
||||||
%% EMQ X R3.0 config mapping
|
%% EMQX config mapping
|
||||||
|
|
||||||
{mapping, "web.hook.url", "emqx_web_hook.url", [
|
{mapping, "web.hook.url", "emqx_web_hook.url", [
|
||||||
{datatype, string}
|
{datatype, string}
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_web_hook,
|
{application, emqx_web_hook,
|
||||||
[{description, "EMQ X WebHook Plugin"},
|
[{description, "EMQ X WebHook Plugin"},
|
||||||
{vsn, "4.3.9"}, % strict semver, bump manually!
|
{vsn, "4.3.10"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, [emqx_web_hook_sup]},
|
{registered, [emqx_web_hook_sup]},
|
||||||
{applications, [kernel,stdlib,ehttpc]},
|
{applications, [kernel,stdlib,ehttpc]},
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{VSN,
|
{VSN,
|
||||||
[{<<"4\\.3\\.[0-2]">>,
|
[{<<"4\\.3\\.[0-2]$">>,
|
||||||
[{apply,{application,stop,[emqx_web_hook]}},
|
[{apply,{application,stop,[emqx_web_hook]}},
|
||||||
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
|
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
|
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
|
||||||
|
@ -12,8 +12,11 @@
|
||||||
{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.8",
|
{"4.3.8",
|
||||||
[{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
|
[{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
|
||||||
|
{"4.3.9",
|
||||||
|
[ %% nothing so far
|
||||||
|
]},
|
||||||
{<<".*">>,[]}],
|
{<<".*">>,[]}],
|
||||||
[{<<"4\\.3\\.[0-2]">>,
|
[{<<"4\\.3\\.[0-2]$">>,
|
||||||
[{apply,{application,stop,[emqx_web_hook]}},
|
[{apply,{application,stop,[emqx_web_hook]}},
|
||||||
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
|
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
|
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
|
||||||
|
@ -25,4 +28,7 @@
|
||||||
{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.8",
|
{"4.3.8",
|
||||||
[{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
|
[{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
|
||||||
|
{"4.3.9",
|
||||||
|
[ %% nothing so far
|
||||||
|
]},
|
||||||
{<<".*">>,[]}]}.
|
{<<".*">>,[]}]}.
|
||||||
|
|
|
@ -6,6 +6,8 @@
|
||||||
-define(TIMEOUT, 300000).
|
-define(TIMEOUT, 300000).
|
||||||
-define(INFO(Fmt,Args), io:format(Fmt++"~n",Args)).
|
-define(INFO(Fmt,Args), io:format(Fmt++"~n",Args)).
|
||||||
|
|
||||||
|
-mode(compile).
|
||||||
|
|
||||||
main([Command0, DistInfoStr | CommandArgs]) ->
|
main([Command0, DistInfoStr | CommandArgs]) ->
|
||||||
%% convert the distribution info arguments string to an erlang term
|
%% convert the distribution info arguments string to an erlang term
|
||||||
{ok, Tokens, _} = erl_scan:string(DistInfoStr ++ "."),
|
{ok, Tokens, _} = erl_scan:string(DistInfoStr ++ "."),
|
||||||
|
@ -210,15 +212,24 @@ find_and_link_release_package(Version, RelName) ->
|
||||||
ok = filelib:ensure_dir(filename:join([filename:dirname(ReleaseLink), "dummy"])),
|
ok = filelib:ensure_dir(filename:join([filename:dirname(ReleaseLink), "dummy"])),
|
||||||
%% create the symlink pointing to the full path name of the
|
%% create the symlink pointing to the full path name of the
|
||||||
%% release package we found
|
%% release package we found
|
||||||
case file:make_symlink(filename:absname(Filename), ReleaseLink) of
|
make_symlink_or_copy(filename:absname(Filename), ReleaseLink),
|
||||||
ok ->
|
|
||||||
ok;
|
|
||||||
{error, eperm} -> % windows!
|
|
||||||
{ok,_} = file:copy(filename:absname(Filename), ReleaseLink)
|
|
||||||
end,
|
|
||||||
{Filename, ReleaseHandlerPackageLink}
|
{Filename, ReleaseHandlerPackageLink}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
make_symlink_or_copy(Filename, ReleaseLink) ->
|
||||||
|
case file:make_symlink(Filename, ReleaseLink) of
|
||||||
|
ok -> ok;
|
||||||
|
{error, eexist} ->
|
||||||
|
?INFO("symlink ~p already exists, recreate it", [ReleaseLink]),
|
||||||
|
ok = file:delete(ReleaseLink),
|
||||||
|
make_symlink_or_copy(Filename, ReleaseLink);
|
||||||
|
{error, Reason} when Reason =:= eperm; Reason =:= enotsup ->
|
||||||
|
{ok, _} = file:copy(Filename, ReleaseLink);
|
||||||
|
{error, Reason} ->
|
||||||
|
?INFO("create symlink ~p failed", [ReleaseLink]),
|
||||||
|
error({Reason, ReleaseLink})
|
||||||
|
end.
|
||||||
|
|
||||||
unpack_zipballs(RelNameStr, Version) ->
|
unpack_zipballs(RelNameStr, Version) ->
|
||||||
{ok, Cwd} = file:get_cwd(),
|
{ok, Cwd} = file:get_cwd(),
|
||||||
GzFile = filename:absname(filename:join(["releases", RelNameStr ++ "-" ++ Version ++ ".tar.gz"])),
|
GzFile = filename:absname(filename:join(["releases", RelNameStr ++ "-" ++ Version ++ ".tar.gz"])),
|
||||||
|
|
|
@ -49,7 +49,8 @@ RUN chgrp -Rf emqx /opt/emqx && chmod -Rf g+w /opt/emqx \
|
||||||
|
|
||||||
USER emqx
|
USER emqx
|
||||||
|
|
||||||
VOLUME ["/opt/emqx/log", "/opt/emqx/data", "/opt/emqx/etc"]
|
## NOTE: /opt/emqx/etc is removed from the VOLUME list since 4.3.13
|
||||||
|
VOLUME ["/opt/emqx/log", "/opt/emqx/data"]
|
||||||
|
|
||||||
# emqx will occupy these port:
|
# emqx will occupy these port:
|
||||||
# - 1883 port for MQTT
|
# - 1883 port for MQTT
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
%%-*- mode: erlang -*-
|
%%-*- mode: erlang -*-
|
||||||
%% EMQ X R4.0 config mapping
|
%% EMQX Config Mapping
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Cluster
|
%% Cluster
|
||||||
|
|
|
@ -32,8 +32,8 @@ Options:
|
||||||
--make-command A command used to assemble the release
|
--make-command A command used to assemble the release
|
||||||
--release-dir Release directory
|
--release-dir Release directory
|
||||||
--src-dirs Directories where source code is found. Defaults to '{src,apps,lib-*}/**/'
|
--src-dirs Directories where source code is found. Defaults to '{src,apps,lib-*}/**/'
|
||||||
--binary-rel-url Binary release URL pattern. %TAG% variable is substituted with the release tag.
|
--binary-rel-url Binary release URL pattern.
|
||||||
E.g. \"https://github.com/emqx/emqx/releases/download/v%TAG%/emqx-centos7-%TAG%-amd64.zip\"
|
E.g. https://www.emqx.com/en/downloads/broker/v4.4.1/emqx-4.4.1-otp24.1.5-3-el7-amd64.zip
|
||||||
".
|
".
|
||||||
|
|
||||||
-record(app,
|
-record(app,
|
||||||
|
@ -171,9 +171,10 @@ download_prev_release(Tag, #{binary_rel_url := {ok, URL0}, clone_url := Repo}) -
|
||||||
BaseDir = "/tmp/emqx-baseline-bin/",
|
BaseDir = "/tmp/emqx-baseline-bin/",
|
||||||
Dir = filename:basename(Repo, ".git") ++ [$-|Tag],
|
Dir = filename:basename(Repo, ".git") ++ [$-|Tag],
|
||||||
Filename = filename:join(BaseDir, Dir),
|
Filename = filename:join(BaseDir, Dir),
|
||||||
Script = "mkdir -p ${OUTFILE} &&
|
Script = "echo \"Download: ${OUTFILE}\" &&
|
||||||
wget -c -O ${OUTFILE}.zip ${URL} &&
|
mkdir -p ${OUTFILE} &&
|
||||||
unzip -n -d ${OUTFILE} ${OUTFILE}.zip",
|
curl -f -L -o ${OUTFILE}.zip ${URL} &&
|
||||||
|
unzip -q -n -d ${OUTFILE} ${OUTFILE}.zip",
|
||||||
Env = [{"TAG", Tag}, {"OUTFILE", Filename}, {"URL", URL}],
|
Env = [{"TAG", Tag}, {"OUTFILE", Filename}, {"URL", URL}],
|
||||||
bash(Script, Env),
|
bash(Script, Env),
|
||||||
{ok, Filename}.
|
{ok, Filename}.
|
||||||
|
@ -208,8 +209,8 @@ find_appup_actions(App,
|
||||||
{OldUpgrade0, OldDowngrade0} = find_old_appup_actions(App, PrevVersion),
|
{OldUpgrade0, OldDowngrade0} = find_old_appup_actions(App, PrevVersion),
|
||||||
OldUpgrade = ensure_all_patch_versions(App, CurrVersion, OldUpgrade0),
|
OldUpgrade = ensure_all_patch_versions(App, CurrVersion, OldUpgrade0),
|
||||||
OldDowngrade = ensure_all_patch_versions(App, CurrVersion, OldDowngrade0),
|
OldDowngrade = ensure_all_patch_versions(App, CurrVersion, OldDowngrade0),
|
||||||
Upgrade = merge_update_actions(App, diff_app(App, CurrAppIdx, PrevAppIdx), OldUpgrade),
|
Upgrade = merge_update_actions(App, diff_app(up, App, CurrAppIdx, PrevAppIdx), OldUpgrade),
|
||||||
Downgrade = merge_update_actions(App, diff_app(App, PrevAppIdx, CurrAppIdx), OldDowngrade),
|
Downgrade = merge_update_actions(App, diff_app(down, App, PrevAppIdx, CurrAppIdx), OldDowngrade),
|
||||||
if OldUpgrade =:= Upgrade andalso OldDowngrade =:= Downgrade ->
|
if OldUpgrade =:= Upgrade andalso OldDowngrade =:= Downgrade ->
|
||||||
%% The appup file has been already updated:
|
%% The appup file has been already updated:
|
||||||
[];
|
[];
|
||||||
|
@ -521,7 +522,7 @@ index_app(AppFile) ->
|
||||||
, modules = Modules
|
, modules = Modules
|
||||||
}}.
|
}}.
|
||||||
|
|
||||||
diff_app(App,
|
diff_app(UpOrDown, App,
|
||||||
#app{version = NewVersion, modules = NewModules},
|
#app{version = NewVersion, modules = NewModules},
|
||||||
#app{version = OldVersion, modules = OldModules}) ->
|
#app{version = OldVersion, modules = OldModules}) ->
|
||||||
{New, Changed} =
|
{New, Changed} =
|
||||||
|
@ -540,13 +541,15 @@ diff_app(App,
|
||||||
),
|
),
|
||||||
Deleted = maps:keys(maps:without(maps:keys(NewModules), OldModules)),
|
Deleted = maps:keys(maps:without(maps:keys(NewModules), OldModules)),
|
||||||
NChanges = length(New) + length(Changed) + length(Deleted),
|
NChanges = length(New) + length(Changed) + length(Deleted),
|
||||||
if NewVersion =:= OldVersion andalso NChanges > 0 ->
|
case NewVersion =:= OldVersion of
|
||||||
set_invalid(),
|
true when NChanges =:= 0 ->
|
||||||
log("ERROR: Application '~p' contains changes, but its version is not updated~n", [App]);
|
%% no change
|
||||||
NewVersion > OldVersion ->
|
|
||||||
log("INFO: Application '~p' has been updated: ~p -> ~p~n", [App, OldVersion, NewVersion]),
|
|
||||||
ok;
|
ok;
|
||||||
true ->
|
true ->
|
||||||
|
set_invalid(),
|
||||||
|
log("ERROR: Application '~p' contains changes, but its version is not updated~n", [App]);
|
||||||
|
false ->
|
||||||
|
log("INFO: Application '~p' has been updated: ~p --[~p]--> ~p~n", [App, OldVersion, UpOrDown, NewVersion]),
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
{New, Changed, Deleted}.
|
{New, Changed, Deleted}.
|
||||||
|
@ -607,13 +610,18 @@ locate(ebin_current, App, Suffix) ->
|
||||||
locate(src, App, Suffix) ->
|
locate(src, App, Suffix) ->
|
||||||
AppStr = atom_to_list(App),
|
AppStr = atom_to_list(App),
|
||||||
SrcDirs = getopt(src_dirs),
|
SrcDirs = getopt(src_dirs),
|
||||||
case filelib:wildcard(SrcDirs ++ AppStr ++ Suffix) of
|
case find_app(SrcDirs ++ AppStr ++ Suffix) of
|
||||||
[File] ->
|
[File] ->
|
||||||
{ok, File};
|
{ok, File};
|
||||||
[] ->
|
[] ->
|
||||||
undefined
|
undefined
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
find_app(Pattern) ->
|
||||||
|
%% exclude _build dir inside apps
|
||||||
|
lists:filter(fun(S) -> string:find(S, "/_build/") =:= nomatch end,
|
||||||
|
filelib:wildcard(Pattern)).
|
||||||
|
|
||||||
bash(Script) ->
|
bash(Script) ->
|
||||||
bash(Script, []).
|
bash(Script, []).
|
||||||
|
|
||||||
|
|
|
@ -1,54 +1,61 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{VSN,
|
{VSN,
|
||||||
[{"4.4.1",
|
[{"4.4.1",
|
||||||
[ {load_module,emqx_connection,brutal_purge,soft_purge,[]}
|
[{load_module,emqx_os_mon,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx_banned,brutal_purge,soft_purge,[]}
|
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx_channel,brutal_purge,soft_purge,[]}
|
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
|
||||||
]},
|
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.4.0",
|
{"4.4.0",
|
||||||
[ {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}
|
[{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx_banned,brutal_purge,soft_purge,[]}
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}
|
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx_channel,brutal_purge,soft_purge,[]}
|
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx_connection,brutal_purge,soft_purge,[]}
|
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}
|
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
, {apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}}
|
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx_access_control,brutal_purge,soft_purge,[]}
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx_alarm,brutal_purge,soft_purge,[]}
|
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx_session,brutal_purge,soft_purge,[]}
|
{apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}},
|
||||||
, {load_module,emqx_os_mon,brutal_purge,soft_purge,[]}
|
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx,brutal_purge,soft_purge,[]}
|
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx_app,brutal_purge,soft_purge,[]}
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx_message,brutal_purge,soft_purge,[]}
|
{load_module,emqx_os_mon,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}
|
{load_module,emqx,brutal_purge,soft_purge,[]},
|
||||||
]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
||||||
{<<".*">>,[]}
|
{load_module,emqx_message,brutal_purge,soft_purge,[]},
|
||||||
],
|
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
|
||||||
|
{<<".*">>,[]}],
|
||||||
[{"4.4.1",
|
[{"4.4.1",
|
||||||
[ {load_module,emqx_connection,brutal_purge,soft_purge,[]}
|
[{load_module,emqx_os_mon,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx_banned,brutal_purge,soft_purge,[]}
|
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx_channel,brutal_purge,soft_purge,[]}
|
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
|
||||||
]},
|
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.4.0",
|
{"4.4.0",
|
||||||
[ {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}
|
[{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx_banned,brutal_purge,soft_purge,[]}
|
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}
|
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx_channel,brutal_purge,soft_purge,[]}
|
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx_connection,brutal_purge,soft_purge,[]}
|
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}
|
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx_access_control,brutal_purge,soft_purge,[]}
|
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx_alarm,brutal_purge,soft_purge,[]}
|
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx_session,brutal_purge,soft_purge,[]}
|
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx_os_mon,brutal_purge,soft_purge,[]}
|
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx,brutal_purge,soft_purge,[]}
|
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx_app,brutal_purge,soft_purge,[]}
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx_message,brutal_purge,soft_purge,[]}
|
{load_module,emqx_os_mon,brutal_purge,soft_purge,[]},
|
||||||
, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}
|
{load_module,emqx,brutal_purge,soft_purge,[]},
|
||||||
]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
||||||
{<<".*">>,[]}
|
{load_module,emqx_message,brutal_purge,soft_purge,[]},
|
||||||
]
|
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
|
||||||
}.
|
{<<".*">>,[]}]}.
|
||||||
|
|
|
@ -235,18 +235,25 @@ open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
|
||||||
open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
|
open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
|
||||||
Self = self(),
|
Self = self(),
|
||||||
ResumeStart = fun(_) ->
|
ResumeStart = fun(_) ->
|
||||||
|
CreateSess =
|
||||||
|
fun() ->
|
||||||
|
Session = create_session(ClientInfo, ConnInfo),
|
||||||
|
register_channel(ClientId, Self, ConnInfo),
|
||||||
|
{ok, #{session => Session, present => false}}
|
||||||
|
end,
|
||||||
case takeover_session(ClientId) of
|
case takeover_session(ClientId) of
|
||||||
{ok, ConnMod, ChanPid, Session} ->
|
{ok, ConnMod, ChanPid, Session} ->
|
||||||
ok = emqx_session:resume(ClientInfo, Session),
|
ok = emqx_session:resume(ClientInfo, Session),
|
||||||
Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER),
|
case request_stepdown({takeover, 'end'}, ConnMod, ChanPid) of
|
||||||
|
{ok, Pendings} ->
|
||||||
register_channel(ClientId, Self, ConnInfo),
|
register_channel(ClientId, Self, ConnInfo),
|
||||||
{ok, #{session => Session,
|
{ok, #{session => Session,
|
||||||
present => true,
|
present => true,
|
||||||
pendings => Pendings}};
|
pendings => Pendings}};
|
||||||
{error, not_found} ->
|
{error, _} ->
|
||||||
Session = create_session(ClientInfo, ConnInfo),
|
CreateSess()
|
||||||
register_channel(ClientId, Self, ConnInfo),
|
end;
|
||||||
{ok, #{session => Session, present => false}}
|
{error, _Reason} -> CreateSess()
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
emqx_cm_locker:trans(ClientId, ResumeStart).
|
emqx_cm_locker:trans(ClientId, ResumeStart).
|
||||||
|
@ -280,9 +287,12 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
|
||||||
undefined ->
|
undefined ->
|
||||||
{error, not_found};
|
{error, not_found};
|
||||||
ConnMod when is_atom(ConnMod) ->
|
ConnMod when is_atom(ConnMod) ->
|
||||||
%% TODO: if takeover times out, maybe kill the old?
|
case request_stepdown({takeover, 'begin'}, ConnMod, ChanPid) of
|
||||||
Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER),
|
{ok, Session} ->
|
||||||
{ok, ConnMod, ChanPid, Session}
|
{ok, ConnMod, ChanPid, Session};
|
||||||
|
{error, Reason} ->
|
||||||
|
{error, Reason}
|
||||||
|
end
|
||||||
end;
|
end;
|
||||||
takeover_session(ClientId, ChanPid) ->
|
takeover_session(ClientId, ChanPid) ->
|
||||||
rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER).
|
rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER).
|
||||||
|
@ -295,42 +305,63 @@ discard_session(ClientId) when is_binary(ClientId) ->
|
||||||
ChanPids -> lists:foreach(fun(Pid) -> discard_session(ClientId, Pid) end, ChanPids)
|
ChanPids -> lists:foreach(fun(Pid) -> discard_session(ClientId, Pid) end, ChanPids)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @private Kick a local stale session to force it step down.
|
%% @private call a local stale session to execute an Action.
|
||||||
%% If failed to kick (e.g. timeout) force a kill.
|
%% If failed to response (e.g. timeout) force a kill.
|
||||||
%% Keeping the stale pid around, or returning error or raise an exception
|
%% Keeping the stale pid around, or returning error or raise an exception
|
||||||
%% benefits nobody.
|
%% benefits nobody.
|
||||||
-spec kick_or_kill(kick | discard, module(), pid()) -> ok.
|
-spec request_stepdown(Action, module(), pid())
|
||||||
kick_or_kill(Action, ConnMod, Pid) ->
|
-> ok
|
||||||
try
|
| {ok, emqx_session:session() | list(emqx_type:deliver())}
|
||||||
|
| {error, term()}
|
||||||
|
when Action :: kick | discard | {takeover, 'begin'} | {takeover, 'end'}.
|
||||||
|
request_stepdown(Action, ConnMod, Pid) ->
|
||||||
|
Timeout =
|
||||||
|
case Action == kick orelse Action == discard of
|
||||||
|
true -> ?T_KICK;
|
||||||
|
_ -> ?T_TAKEOVER
|
||||||
|
end,
|
||||||
|
Return =
|
||||||
%% this is essentailly a gen_server:call implemented in emqx_connection
|
%% this is essentailly a gen_server:call implemented in emqx_connection
|
||||||
%% and emqx_ws_connection.
|
%% and emqx_ws_connection.
|
||||||
%% the handle_call is implemented in emqx_channel
|
%% the handle_call is implemented in emqx_channel
|
||||||
ok = apply(ConnMod, call, [Pid, Action, ?T_KICK])
|
try apply(ConnMod, call, [Pid, Action, Timeout]) of
|
||||||
|
ok -> ok;
|
||||||
|
Reply -> {ok, Reply}
|
||||||
catch
|
catch
|
||||||
_ : noproc -> % emqx_ws_connection: call
|
_ : noproc -> % emqx_ws_connection: call
|
||||||
ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action});
|
ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}),
|
||||||
|
{error, noproc};
|
||||||
_ : {noproc, _} -> % emqx_connection: gen_server:call
|
_ : {noproc, _} -> % emqx_connection: gen_server:call
|
||||||
ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action});
|
ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}),
|
||||||
_ : {shutdown, _} ->
|
{error, noproc};
|
||||||
ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action});
|
_ : Reason = {shutdown, _} ->
|
||||||
_ : {{shutdown, _}, _} ->
|
ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}),
|
||||||
ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action});
|
{error, Reason};
|
||||||
|
_ : Reason = {{shutdown, _}, _} ->
|
||||||
|
ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}),
|
||||||
|
{error, Reason};
|
||||||
_ : {timeout, {gen_server, call, _}} ->
|
_ : {timeout, {gen_server, call, _}} ->
|
||||||
?tp(warning, "session_kick_timeout",
|
?tp(warning, "session_stepdown_request_timeout",
|
||||||
#{pid => Pid,
|
#{pid => Pid,
|
||||||
action => Action,
|
action => Action,
|
||||||
stale_channel => stale_channel_info(Pid)
|
stale_channel => stale_channel_info(Pid)
|
||||||
}),
|
}),
|
||||||
ok = force_kill(Pid);
|
ok = force_kill(Pid),
|
||||||
|
{error, timeout};
|
||||||
_ : Error : St ->
|
_ : Error : St ->
|
||||||
?tp(error, "session_kick_exception",
|
?tp(error, "session_stepdown_request_exception",
|
||||||
#{pid => Pid,
|
#{pid => Pid,
|
||||||
action => Action,
|
action => Action,
|
||||||
reason => Error,
|
reason => Error,
|
||||||
stacktrace => St,
|
stacktrace => St,
|
||||||
stale_channel => stale_channel_info(Pid)
|
stale_channel => stale_channel_info(Pid)
|
||||||
}),
|
}),
|
||||||
ok = force_kill(Pid)
|
ok = force_kill(Pid),
|
||||||
|
{error, Error}
|
||||||
|
end,
|
||||||
|
case Action == kick orelse Action == discard of
|
||||||
|
true -> ok;
|
||||||
|
_ -> Return
|
||||||
end.
|
end.
|
||||||
|
|
||||||
force_kill(Pid) ->
|
force_kill(Pid) ->
|
||||||
|
@ -353,7 +384,7 @@ kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() ->
|
||||||
%% already deregistered
|
%% already deregistered
|
||||||
ok;
|
ok;
|
||||||
ConnMod when is_atom(ConnMod) ->
|
ConnMod when is_atom(ConnMod) ->
|
||||||
ok = kick_or_kill(Action, ConnMod, ChanPid)
|
ok = request_stepdown(Action, ConnMod, ChanPid)
|
||||||
end;
|
end;
|
||||||
kick_session(Action, ClientId, ChanPid) ->
|
kick_session(Action, ClientId, ChanPid) ->
|
||||||
%% call remote node on the old APIs because we do not know if they have upgraded
|
%% call remote node on the old APIs because we do not know if they have upgraded
|
||||||
|
|
|
@ -190,7 +190,7 @@ ensure_system_memory_alarm(HW) ->
|
||||||
case erlang:whereis(memsup) of
|
case erlang:whereis(memsup) of
|
||||||
undefined -> ok;
|
undefined -> ok;
|
||||||
_Pid ->
|
_Pid ->
|
||||||
{Allocated, Total, _Worst} = memsup:get_memory_data(),
|
{Total, Allocated, _Worst} = memsup:get_memory_data(),
|
||||||
case Total =/= 0 andalso Allocated/Total * 100 > HW of
|
case Total =/= 0 andalso Allocated/Total * 100 > HW of
|
||||||
true -> emqx_alarm:activate(high_system_memory_usage, #{high_watermark => HW});
|
true -> emqx_alarm:activate(high_system_memory_usage, #{high_watermark => HW});
|
||||||
false -> ok
|
false -> ok
|
||||||
|
|
|
@ -50,9 +50,11 @@ monitor(Pid, PMon) ->
|
||||||
?MODULE:monitor(Pid, undefined, PMon).
|
?MODULE:monitor(Pid, undefined, PMon).
|
||||||
|
|
||||||
-spec(monitor(pid(), term(), pmon()) -> pmon()).
|
-spec(monitor(pid(), term(), pmon()) -> pmon()).
|
||||||
monitor(Pid, Val, PMon = ?PMON(Map)) ->
|
monitor(Pid, Val, ?PMON(Map)) ->
|
||||||
case maps:is_key(Pid, Map) of
|
case maps:is_key(Pid, Map) of
|
||||||
true -> PMon;
|
true ->
|
||||||
|
{Ref, _Val} = maps:get(Pid, Map),
|
||||||
|
?PMON(maps:put(Pid, {Ref, Val}, Map));
|
||||||
false ->
|
false ->
|
||||||
Ref = erlang:monitor(process, Pid),
|
Ref = erlang:monitor(process, Pid),
|
||||||
?PMON(maps:put(Pid, {Ref, Val}, Map))
|
?PMON(maps:put(Pid, {Ref, Val}, Map))
|
||||||
|
|
|
@ -119,7 +119,7 @@ handle_info({monitor, Port, long_schedule, Info}, State) when is_port(Port) ->
|
||||||
suppress({long_schedule, Port},
|
suppress({long_schedule, Port},
|
||||||
fun() ->
|
fun() ->
|
||||||
WarnMsg = io_lib:format("long_schedule warning: port = ~p, info: ~p", [Port, Info]),
|
WarnMsg = io_lib:format("long_schedule warning: port = ~p, info: ~p", [Port, Info]),
|
||||||
?LOG(warning, "~s~n~p", [WarnMsg, erlang:port_info(Port)]),
|
?LOG(warning, "~s~n~p", [WarnMsg, portinfo(Port)]),
|
||||||
safe_publish(long_schedule, WarnMsg)
|
safe_publish(long_schedule, WarnMsg)
|
||||||
end, State);
|
end, State);
|
||||||
|
|
||||||
|
@ -135,7 +135,7 @@ handle_info({monitor, SusPid, busy_port, Port}, State) ->
|
||||||
suppress({busy_port, Port},
|
suppress({busy_port, Port},
|
||||||
fun() ->
|
fun() ->
|
||||||
WarnMsg = io_lib:format("busy_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
|
WarnMsg = io_lib:format("busy_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
|
||||||
?LOG(warning, "~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]),
|
?LOG(warning, "~s~n~p~n~p", [WarnMsg, procinfo(SusPid), portinfo(Port)]),
|
||||||
safe_publish(busy_port, WarnMsg)
|
safe_publish(busy_port, WarnMsg)
|
||||||
end, State);
|
end, State);
|
||||||
|
|
||||||
|
@ -143,7 +143,7 @@ handle_info({monitor, SusPid, busy_dist_port, Port}, State) ->
|
||||||
suppress({busy_dist_port, Port},
|
suppress({busy_dist_port, Port},
|
||||||
fun() ->
|
fun() ->
|
||||||
WarnMsg = io_lib:format("busy_dist_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
|
WarnMsg = io_lib:format("busy_dist_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
|
||||||
?LOG(warning, "~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]),
|
?LOG(warning, "~s~n~p~n~p", [WarnMsg, procinfo(SusPid), portinfo(Port)]),
|
||||||
safe_publish(busy_dist_port, WarnMsg)
|
safe_publish(busy_dist_port, WarnMsg)
|
||||||
end, State);
|
end, State);
|
||||||
|
|
||||||
|
@ -200,3 +200,9 @@ safe_publish(Event, WarnMsg) ->
|
||||||
sysmon_msg(Topic, Payload) ->
|
sysmon_msg(Topic, Payload) ->
|
||||||
Msg = emqx_message:make(?SYSMON, Topic, Payload),
|
Msg = emqx_message:make(?SYSMON, Topic, Payload),
|
||||||
emqx_message:set_flag(sys, Msg).
|
emqx_message:set_flag(sys, Msg).
|
||||||
|
|
||||||
|
portinfo(Port) ->
|
||||||
|
case is_port(Port) andalso erlang:port_info(Port) of
|
||||||
|
L when is_list(L) -> L;
|
||||||
|
_ -> []
|
||||||
|
end.
|
||||||
|
|
|
@ -187,46 +187,78 @@ t_open_session_race_condition(_) ->
|
||||||
ok = flush_emqx_pool(),
|
ok = flush_emqx_pool(),
|
||||||
?assertEqual([], emqx_cm:lookup_channels(ClientId)).
|
?assertEqual([], emqx_cm:lookup_channels(ClientId)).
|
||||||
|
|
||||||
t_kick_session_discard_normal(_) ->
|
t_stepdown_sessiondiscard_normal(_) ->
|
||||||
test_kick_session(discard, normal).
|
test_stepdown_session(discard, normal).
|
||||||
|
|
||||||
t_kick_session_discard_shutdown(_) ->
|
t_stepdown_sessiondiscard_shutdown(_) ->
|
||||||
test_kick_session(discard, shutdown).
|
test_stepdown_session(discard, shutdown).
|
||||||
|
|
||||||
t_kick_session_discard_shutdown_with_reason(_) ->
|
t_stepdown_sessiondiscard_shutdown_with_reason(_) ->
|
||||||
test_kick_session(discard, {shutdown, discard}).
|
test_stepdown_session(discard, {shutdown, discard}).
|
||||||
|
|
||||||
t_kick_session_discard_timeout(_) ->
|
t_stepdown_sessiondiscard_timeout(_) ->
|
||||||
test_kick_session(discard, timeout).
|
test_stepdown_session(discard, timeout).
|
||||||
|
|
||||||
t_kick_session_discard_noproc(_) ->
|
t_stepdown_sessiondiscard_noproc(_) ->
|
||||||
test_kick_session(discard, noproc).
|
test_stepdown_session(discard, noproc).
|
||||||
|
|
||||||
t_kick_session_kick_normal(_) ->
|
t_stepdown_sessionkick_normal(_) ->
|
||||||
test_kick_session(discard, normal).
|
test_stepdown_session(kick, normal).
|
||||||
|
|
||||||
t_kick_session_kick_shutdown(_) ->
|
t_stepdown_sessionkick_shutdown(_) ->
|
||||||
test_kick_session(discard, shutdown).
|
test_stepdown_session(kick, shutdown).
|
||||||
|
|
||||||
t_kick_session_kick_shutdown_with_reason(_) ->
|
t_stepdown_sessionkick_shutdown_with_reason(_) ->
|
||||||
test_kick_session(discard, {shutdown, discard}).
|
test_stepdown_session(kick, {shutdown, discard}).
|
||||||
|
|
||||||
t_kick_session_kick_timeout(_) ->
|
t_stepdown_sessionkick_timeout(_) ->
|
||||||
test_kick_session(discard, timeout).
|
test_stepdown_session(kick, timeout).
|
||||||
|
|
||||||
t_kick_session_kick_noproc(_) ->
|
t_stepdown_sessionkick_noproc(_) ->
|
||||||
test_kick_session(discard, noproc).
|
test_stepdown_session(discard, noproc).
|
||||||
|
|
||||||
test_kick_session(Action, Reason) ->
|
t_stepdown_sessiontakeover_begin_normal(_) ->
|
||||||
|
test_stepdown_session({takeover, 'begin'}, normal).
|
||||||
|
|
||||||
|
t_stepdown_sessiontakeover_begin_shutdown(_) ->
|
||||||
|
test_stepdown_session({takeover, 'begin'}, shutdown).
|
||||||
|
|
||||||
|
t_stepdown_sessiontakeover_begin_shutdown_with_reason(_) ->
|
||||||
|
test_stepdown_session({takeover, 'begin'}, {shutdown, discard}).
|
||||||
|
|
||||||
|
t_stepdown_sessiontakeover_begin_timeout(_) ->
|
||||||
|
test_stepdown_session({takeover, 'begin'}, timeout).
|
||||||
|
|
||||||
|
t_stepdown_sessiontakeover_begin_noproc(_) ->
|
||||||
|
test_stepdown_session({takeover, 'begin'}, noproc).
|
||||||
|
|
||||||
|
t_stepdown_sessiontakeover_end_normal(_) ->
|
||||||
|
test_stepdown_session({takeover, 'end'}, normal).
|
||||||
|
|
||||||
|
t_stepdown_sessiontakeover_end_shutdown(_) ->
|
||||||
|
test_stepdown_session({takeover, 'end'}, shutdown).
|
||||||
|
|
||||||
|
t_stepdown_sessiontakeover_end_shutdown_with_reason(_) ->
|
||||||
|
test_stepdown_session({takeover, 'end'}, {shutdown, discard}).
|
||||||
|
|
||||||
|
t_stepdown_sessiontakeover_end_timeout(_) ->
|
||||||
|
test_stepdown_session({takeover, 'end'}, timeout).
|
||||||
|
|
||||||
|
t_stepdown_sessiontakeover_end_noproc(_) ->
|
||||||
|
test_stepdown_session({takeover, 'end'}, noproc).
|
||||||
|
|
||||||
|
test_stepdown_session(Action, Reason) ->
|
||||||
ClientId = rand_client_id(),
|
ClientId = rand_client_id(),
|
||||||
#{conninfo := ConnInfo} = ?ChanInfo,
|
#{conninfo := ConnInfo} = ?ChanInfo,
|
||||||
FakeSessionFun =
|
FakeSessionFun =
|
||||||
fun Loop() ->
|
fun Loop() ->
|
||||||
receive
|
receive
|
||||||
{'$gen_call', From, A} when A =:= kick orelse
|
{'$gen_call', From, A} when A =:= kick orelse
|
||||||
A =:= discard ->
|
A =:= discard orelse
|
||||||
|
A =:= {takeover, 'begin'} orelse
|
||||||
|
A =:= {takeover, 'end'} ->
|
||||||
case Reason of
|
case Reason of
|
||||||
normal ->
|
normal when A =:= kick orelse A =:= discard ->
|
||||||
gen_server:reply(From, ok);
|
gen_server:reply(From, ok);
|
||||||
timeout ->
|
timeout ->
|
||||||
%% no response to the call
|
%% no response to the call
|
||||||
|
@ -249,9 +281,10 @@ test_kick_session(Action, Reason) ->
|
||||||
noproc -> exit(Pid1, kill), exit(Pid2, kill);
|
noproc -> exit(Pid1, kill), exit(Pid2, kill);
|
||||||
_ -> ok
|
_ -> ok
|
||||||
end,
|
end,
|
||||||
ok = case Action of
|
_ = case Action of
|
||||||
kick -> emqx_cm:kick_session(ClientId);
|
kick -> emqx_cm:kick_session(ClientId);
|
||||||
discard -> emqx_cm:discard_session(ClientId)
|
discard -> emqx_cm:discard_session(ClientId);
|
||||||
|
{takeover, _} -> emqx_cm:takeover_session(ClientId)
|
||||||
end,
|
end,
|
||||||
case Reason =:= timeout orelse Reason =:= noproc of
|
case Reason =:= timeout orelse Reason =:= noproc of
|
||||||
true ->
|
true ->
|
||||||
|
|
|
@ -33,6 +33,11 @@
|
||||||
{self(), busy_port,
|
{self(), busy_port,
|
||||||
concat_str("busy_port warning: suspid = ~p, port = ~p",
|
concat_str("busy_port warning: suspid = ~p, port = ~p",
|
||||||
self(), list_to_port("#Port<0.4>")), list_to_port("#Port<0.4>")},
|
self(), list_to_port("#Port<0.4>")), list_to_port("#Port<0.4>")},
|
||||||
|
%% for the case when the port is missing, for some
|
||||||
|
%% reason.
|
||||||
|
{self(), busy_port,
|
||||||
|
concat_str("busy_port warning: suspid = ~p, port = ~p",
|
||||||
|
self(), []), []},
|
||||||
{self(), busy_dist_port,
|
{self(), busy_dist_port,
|
||||||
concat_str("busy_dist_port warning: suspid = ~p, port = ~p",
|
concat_str("busy_dist_port warning: suspid = ~p, port = ~p",
|
||||||
self(), list_to_port("#Port<0.4>")),list_to_port("#Port<0.4>")},
|
self(), list_to_port("#Port<0.4>")),list_to_port("#Port<0.4>")},
|
||||||
|
@ -122,6 +127,16 @@ t_sys_mon(_Config) ->
|
||||||
validate_sys_mon_info(PidOrPort, SysMonName,ValidateInfo, InfoOrPort)
|
validate_sys_mon_info(PidOrPort, SysMonName,ValidateInfo, InfoOrPort)
|
||||||
end, ?INPUTINFO).
|
end, ?INPUTINFO).
|
||||||
|
|
||||||
|
%% Existing port, but closed.
|
||||||
|
t_sys_mon_dead_port(_Config) ->
|
||||||
|
process_flag(trap_exit, true),
|
||||||
|
Port = dead_port(),
|
||||||
|
{PidOrPort, SysMonName, ValidateInfo, InfoOrPort} =
|
||||||
|
{self(), busy_port,
|
||||||
|
concat_str("busy_port warning: suspid = ~p, port = ~p",
|
||||||
|
self(), Port), Port},
|
||||||
|
validate_sys_mon_info(PidOrPort, SysMonName, ValidateInfo, InfoOrPort).
|
||||||
|
|
||||||
t_sys_mon2(_Config) ->
|
t_sys_mon2(_Config) ->
|
||||||
?SYSMON ! {timeout, ignored, reset},
|
?SYSMON ! {timeout, ignored, reset},
|
||||||
?SYSMON ! {ignored},
|
?SYSMON ! {ignored},
|
||||||
|
@ -155,3 +170,8 @@ some_function(Parent, _Arg2) ->
|
||||||
stop ->
|
stop ->
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
dead_port() ->
|
||||||
|
Port = erlang:open_port({spawn, "ls"}, []),
|
||||||
|
exit(Port, kill),
|
||||||
|
Port.
|
||||||
|
|
Loading…
Reference in New Issue