Merge branch 'main-v4.4' into inject_relup_instructions_v4.4

This commit is contained in:
Shawn 2022-02-28 19:13:55 +08:00
commit bcd56d3db2
50 changed files with 771 additions and 309 deletions

View File

@ -20,6 +20,7 @@
?> ?>
[shell emqx] [shell emqx]
!OLD_VSN=$(echo $OLD_VSN | sed -r 's/[v|e]//g')
!cd $PACKAGE_PATH !cd $PACKAGE_PATH
!unzip -q -o $PROFILE-$(echo $OLD_VSN | sed -r 's/[v|e]//g')-otp${FROM_OTP_VSN}-ubuntu20.04-amd64.zip !unzip -q -o $PROFILE-$(echo $OLD_VSN | sed -r 's/[v|e]//g')-otp${FROM_OTP_VSN}-ubuntu20.04-amd64.zip
?SH-PROMPT ?SH-PROMPT
@ -32,6 +33,7 @@
?SH-PROMPT ?SH-PROMPT
[shell emqx2] [shell emqx2]
!OLD_VSN=$(echo $OLD_VSN | sed -r 's/[v|e]//g')
!cd $PACKAGE_PATH !cd $PACKAGE_PATH
!cp -f $ONE_MORE_EMQX_PATH/one_more_$(echo $PROFILE | sed 's/-/_/g').sh . !cp -f $ONE_MORE_EMQX_PATH/one_more_$(echo $PROFILE | sed 's/-/_/g').sh .
!./one_more_$(echo $PROFILE | sed 's/-/_/g').sh emqx2 !./one_more_$(echo $PROFILE | sed 's/-/_/g').sh emqx2
@ -84,6 +86,27 @@
!cp -f ../$PROFILE-$VSN-otp${TO_OTP_VSN}-ubuntu20.04-amd64.zip releases/ !cp -f ../$PROFILE-$VSN-otp${TO_OTP_VSN}-ubuntu20.04-amd64.zip releases/
## upgrade to the new version
!./bin/emqx install $VSN
?Made release permanent: "$VSN"
?SH-PROMPT
!./bin/emqx versions |grep permanent
?(.*)$VSN
?SH-PROMPT
## downgrade to the old version
!./bin/emqx install $${OLD_VSN}
?Made release permanent:.*
?SH-PROMPT
!./bin/emqx versions |grep permanent | grep -qs "$${OLD_VSN}"
?SH-PROMPT:
!echo ==$$?==
?^==0==
?SH-PROMPT:
## again, upgrade to the new version
!./bin/emqx install $VSN !./bin/emqx install $VSN
?Made release permanent: "$VSN" ?Made release permanent: "$VSN"
?SH-PROMPT ?SH-PROMPT
@ -109,6 +132,27 @@
!cp -f ../$PROFILE-$VSN-otp${TO_OTP_VSN}-ubuntu20.04-amd64.zip releases/ !cp -f ../$PROFILE-$VSN-otp${TO_OTP_VSN}-ubuntu20.04-amd64.zip releases/
## upgrade to the new version
!./bin/emqx install $VSN
?Made release permanent: "$VSN"
?SH-PROMPT
!./bin/emqx versions |grep permanent
?(.*)$VSN
?SH-PROMPT
## downgrade to the old version
!./bin/emqx install $${OLD_VSN}
?Made release permanent:.*
?SH-PROMPT
!./bin/emqx versions |grep permanent | grep -qs "$${OLD_VSN}"
?SH-PROMPT:
!echo ==$$?==
?^==0==
?SH-PROMPT:
## again, upgrade to the new version
!./bin/emqx install $VSN !./bin/emqx install $VSN
?Made release permanent: "$VSN" ?Made release permanent: "$VSN"
?SH-PROMPT ?SH-PROMPT
@ -138,17 +182,20 @@
!./bin/emqx_ctl broker metrics | grep "messages.publish" !./bin/emqx_ctl broker metrics | grep "messages.publish"
???SH-PROMPT ???SH-PROMPT
## We don't guarantee not to lose a single message!
## So even if we received 290~300 messages, we consider it as success
[shell bench] [shell bench]
!curl --user admin:public --silent --show-error http://localhost:8081/api/v4/rules | jq -M --raw-output ".data[0].metrics[] | select(.node==\"emqx@127.0.0.1\").matched" !curl --user admin:public --silent --show-error http://localhost:8081/api/v4/rules | jq -M --raw-output ".data[0].metrics[] | select(.node==\"emqx@127.0.0.1\").matched"
?300 ?(29[0-9])|(300)
?SH-PROMPT ?SH-PROMPT
!curl --user admin:public --silent --show-error http://localhost:8081/api/v4/rules | jq -M --raw-output ".data[0].actions[0].metrics[] | select(.node==\"emqx@127.0.0.1\").success" !curl --user admin:public --silent --show-error http://localhost:8081/api/v4/rules | jq -M --raw-output ".data[0].actions[0].metrics[] | select(.node==\"emqx@127.0.0.1\").success"
?300 ?(29[0-9])|(300)
?SH-PROMPT ?SH-PROMPT
## The /counter API is provided by .ci/fvt_test/http_server
!curl http://127.0.0.1:8080/counter !curl http://127.0.0.1:8080/counter
???{"data":300,"code":0} ?\{"data":(29[0-9])|(300),"code":0\}
?SH-PROMPT ?SH-PROMPT
[shell emqx2] [shell emqx2]

View File

@ -12,11 +12,11 @@ assignees: tigercl
**Environment**: **Environment**:
- EMQ X version (e.g. `emqx_ctl status`): - EMQX version (e.g. `emqx_ctl status`):
- Hardware configuration (e.g. `lscpu`): - Hardware configuration (e.g. `lscpu`):
- OS (e.g. `cat /etc/os-release`): - OS (e.g. `cat /etc/os-release`):
- Kernel (e.g. `uname -a`): - Kernel (e.g. `uname -a`):
- Erlang/OTP version (in case you build emqx from source code): - Erlang/OTP version (in case you build emqx from source code):
- Others: - Others:
**What happened and what you expected to happen**: **What happened and what you expected to happen**:

View File

@ -12,15 +12,30 @@ File format:
## v4.3.13 ## v4.3.13
### Important changes
* For docker image, /opt/emqx/etc has been removed from the VOLUME list,
this made it easier for the users to rebuild image on top with changed configs.
### Enhancements ### Enhancements
* CLI `emqx_ctl pem_cache clean` to force purge x509 certificate cache, * CLI `emqx_ctl pem_cache clean` to force purge x509 certificate cache,
to force an immediate reload of all certificates after the files are updated on disk. to force an immediate reload of all certificates after the files are updated on disk.
* Refactor the ExProto so that anonymous clients can also be displayed on the dashboard [#6983]
* Force shutdown of processe that cannot answer takeover event [#7026]
* `topic` parameter in bridge configuration can have `${node}` substitution (just like in `clientid` parameter)
### Bug fixes ### Bug fixes
* Fix the `{error,eexist}` error when do release upgrade again if last run failed. [#7121]
* Fix case where publishing to a non-existent topic alias would crash the connection [#6979] * Fix case where publishing to a non-existent topic alias would crash the connection [#6979]
* Fix HTTP-API 500 error on querying the lwm2m client list on the another node [#7009] * Fix HTTP-API 500 error on querying the lwm2m client list on the another node [#7009]
* Fix the ExProto connection registry is not released after the client process abnormally exits [#6983]
* Fix Server-KeepAlive wrongly applied on MQTT v3.0/v3.1 [#7085]
* Fix Stomp client can not trigger `$event/client_connection` message [#7096]
* Fix system memory false alarm at boot
* Fix the MQTT-SN message replay when the topic is not registered to the client [#6970]
## v4.3.12 ## v4.3.12
### Important changes ### Important changes

View File

@ -7,7 +7,6 @@ export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/4.4-5:24.1.5-3-alpine3.1
export EMQX_DEFAULT_RUNNER = alpine:3.14 export EMQX_DEFAULT_RUNNER = alpine:3.14
export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh) export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh)
export PKG_VSN ?= $(shell $(CURDIR)/pkg-vsn.sh) export PKG_VSN ?= $(shell $(CURDIR)/pkg-vsn.sh)
export EMQX_DESC ?= EMQ X
export EMQX_CE_DASHBOARD_VERSION ?= v4.4.0 export EMQX_CE_DASHBOARD_VERSION ?= v4.4.0
export DOCKERFILE := deploy/docker/Dockerfile export DOCKERFILE := deploy/docker/Dockerfile
export DOCKERFILE_TESTING := deploy/docker/Dockerfile.testing export DOCKERFILE_TESTING := deploy/docker/Dockerfile.testing

View File

@ -168,7 +168,7 @@ auth.http.enable_pipelining = true
## If not specified, the server's names returned in server's certificate is validated against ## If not specified, the server's names returned in server's certificate is validated against
## what's provided `auth.http.auth_req.url` config's host part. ## what's provided `auth.http.auth_req.url` config's host part.
## Setting to 'disable' will make EMQ X ignore unmatched server names. ## Setting to 'disable' will make EMQX ignore unmatched server names.
## If set with a host name, the server's names returned in server's certificate is validated ## If set with a host name, the server's names returned in server's certificate is validated
## against this value. ## against this value.
## ##

View File

@ -83,7 +83,7 @@ auth.mongo.database = mqtt
## If not specified, the server's names returned in server's certificate is validated against ## If not specified, the server's names returned in server's certificate is validated against
## what's provided `auth.mongo.server` config's host part. ## what's provided `auth.mongo.server` config's host part.
## Setting to 'disable' will make EMQ X ignore unmatched server names. ## Setting to 'disable' will make EMQX ignore unmatched server names.
## If set with a host name, the server's names returned in server's certificate is validated ## If set with a host name, the server's names returned in server's certificate is validated
## against this value. ## against this value.
## ##

View File

@ -123,7 +123,7 @@ auth.mysql.acl_query = select allow, ipaddr, username, clientid, access, topic f
## If not specified, the server's names returned in server's certificate is validated against ## If not specified, the server's names returned in server's certificate is validated against
## what's provided `auth.mysql.server` config's host part. ## what's provided `auth.mysql.server` config's host part.
## Setting to 'disable' will make EMQ X ignore unmatched server names. ## Setting to 'disable' will make EMQX ignore unmatched server names.
## If set with a host name, the server's names returned in server's certificate is validated ## If set with a host name, the server's names returned in server's certificate is validated
## against this value. ## against this value.
## ##

View File

@ -70,7 +70,7 @@ auth.pgsql.ssl = off
## If not specified, the server's names returned in server's certificate is validated against ## If not specified, the server's names returned in server's certificate is validated against
## what's provided `auth.pgsql.server` config's host part. ## what's provided `auth.pgsql.server` config's host part.
## Setting to 'disable' will make EMQ X ignore unmatched server names. ## Setting to 'disable' will make EMQX ignore unmatched server names.
## If set with a host name, the server's names returned in server's certificate is validated ## If set with a host name, the server's names returned in server's certificate is validated
## against this value. ## against this value.
## ##

View File

@ -123,9 +123,9 @@ auth.redis.acl_cmd = HGETALL mqtt_acl:%u
## If not specified, the server's names returned in server's certificate is validated against ## If not specified, the server's names returned in server's certificate is validated against
## what's provided `auth.redis.server` config's host part. ## what's provided `auth.redis.server` config's host part.
## Setting to 'disable' will make EMQ X ignore unmatched server names. ## Setting to 'disable' will make EMQX ignore unmatched server names.
## If set with a host name, the server's names returned in server's certificate is validated ## If set with a host name, the server's names returned in server's certificate is validated
## against this value. ## against this value.
## ##
## Value: String | disable ## Value: String | disable
## auth.redis.ssl.server_name_indication = disable ## auth.redis.ssl.server_name_indication = disable

View File

@ -1,5 +1,5 @@
##==================================================================== ##====================================================================
## Configuration for EMQ X MQTT Broker Bridge ## Configuration for EMQX MQTT Broker Bridge
##==================================================================== ##====================================================================
##-------------------------------------------------------------------- ##--------------------------------------------------------------------

View File

@ -31,4 +31,4 @@
]}, ]},
{<<".*">>, []} {<<".*">>, []}
] ]
}. }.

View File

@ -37,6 +37,11 @@
, handle_disconnected/2 , handle_disconnected/2
]). ]).
%% for testing
-ifdef(TEST).
-export([ replvar/1 ]).
-endif.
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
@ -176,13 +181,13 @@ subscribe_remote_topics(ClientPid, Subscriptions) ->
end end
end, Subscriptions). end, Subscriptions).
replvar(Options) ->
replvar([topic, clientid, max_inflight], Options).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal funcs %% Internal funcs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
replvar(Options) ->
replvar([clientid, max_inflight], Options).
replvar([], Options) -> replvar([], Options) ->
Options; Options;
replvar([Key|More], Options) -> replvar([Key|More], Options) ->
@ -194,8 +199,8 @@ replvar([Key|More], Options) ->
end. end.
%% ${node} => node() %% ${node} => node()
feedvar(clientid, ClientId, _) -> feedvar(Key, Value, _) when Key =:= topic; Key =:= clientid ->
iolist_to_binary(re:replace(ClientId, "\\${node}", atom_to_list(node()))); iolist_to_binary(re:replace(Value, "\\${node}", atom_to_list(node())));
feedvar(max_inflight, 0, _) -> feedvar(max_inflight, 0, _) ->
infinity; infinity;

View File

@ -44,4 +44,15 @@ send_and_ack_test() ->
ok = emqx_bridge_mqtt:stop(Conn) ok = emqx_bridge_mqtt:stop(Conn)
after after
meck:unload(emqtt) meck:unload(emqtt)
end. end.
replvar_test() ->
Node = atom_to_list(node()),
Config = #{clientid => <<"Hey ${node}">>, topic => <<"topic ${node}">>, other => <<"other">>},
ReplacedConfig = emqx_bridge_mqtt:replvar(Config),
ExpectedConfig = #{clientid => iolist_to_binary("Hey " ++ Node),
topic => iolist_to_binary("topic " ++ Node),
other => <<"other">>},
?assertEqual(ExpectedConfig, ReplacedConfig).

View File

@ -1,5 +1,5 @@
##==================================================================== ##====================================================================
## EMQ X Hooks ## EMQX Hooks
##==================================================================== ##====================================================================
## The default value or action will be returned, while the request to ## The default value or action will be returned, while the request to

View File

@ -1,5 +1,5 @@
##==================================================================== ##====================================================================
## EMQ X ExProto ## EMQX ExProto
##==================================================================== ##====================================================================
exproto.server.http.port = 9100 exproto.server.http.port = 9100
@ -65,7 +65,7 @@ exproto.listener.protoname.idle_timeout = 30s
## Example: allow 192.168.0.0/24 ## Example: allow 192.168.0.0/24
exproto.listener.protoname.access.1 = allow all exproto.listener.protoname.access.1 = allow all
## Enable the Proxy Protocol V1/2 if the EMQ X cluster is deployed ## Enable the Proxy Protocol V1/2 if the EMQX cluster is deployed
## behind HAProxy or Nginx. ## behind HAProxy or Nginx.
## ##
## See: https://www.haproxy.com/blog/haproxy/proxy-protocol/ ## See: https://www.haproxy.com/blog/haproxy/proxy-protocol/
@ -73,7 +73,7 @@ exproto.listener.protoname.access.1 = allow all
## Value: on | off ## Value: on | off
## exproto.listener.protoname.proxy_protocol = on ## exproto.listener.protoname.proxy_protocol = on
## Sets the timeout for proxy protocol. EMQ X will close the TCP connection ## Sets the timeout for proxy protocol. EMQX will close the TCP connection
## if no proxy protocol packet recevied within the timeout. ## if no proxy protocol packet recevied within the timeout.
## ##
## Value: Duration ## Value: Duration

View File

@ -1,6 +1,6 @@
{application, emqx_exproto, {application, emqx_exproto,
[{description, "EMQ X Extension for Protocol"}, [{description, "EMQ X Extension for Protocol"},
{vsn, "4.3.5"}, %% 4.3.3 is used by ee {vsn, "4.3.6"}, %% 4.3.3 is used by ee
{modules, []}, {modules, []},
{registered, []}, {registered, []},
{mod, {emqx_exproto_app, []}}, {mod, {emqx_exproto_app, []}},

View File

@ -1,28 +1,24 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{VSN, {VSN,
[{"4.3.4", [{<<"4\\.3\\.[4-5]">>,
[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{"4.3.3",
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{"4.3.2", {<<"4\\.3\\.[2-3]">>,
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{<<"4.3.[0-1]">>, {<<"4\\.3\\.[0-1]">>,
[{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]}, [{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}], {<<".*">>,[]}],
[{"4.3.4", [{<<"4\\.3\\.[4-5]">>,
[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{"4.3.3",
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{"4.3.2", {<<"4\\.3\\.[2-3]">>,
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{<<"4.3.[0-1]">>, {<<"4\\.3\\.[0-1]">>,
[{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]}, [{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},

View File

@ -94,9 +94,6 @@
awaiting_rel_max awaiting_rel_max
]). ]).
-define(CHANMOCK(P), {exproto_anonymous_client, P}).
-define(CHAN_CONN_TAB, emqx_channel_conn).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Info, Attrs and Caps %% Info, Attrs and Caps
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -155,15 +152,14 @@ init(ConnInfo = #{socktype := Socktype,
Channel = #channel{gcli = #{channel => GRpcChann}, Channel = #channel{gcli = #{channel => GRpcChann},
conninfo = NConnInfo, conninfo = NConnInfo,
clientinfo = ClientInfo, clientinfo = ClientInfo,
conn_state = connecting, conn_state = accepted,
timers = #{} timers = #{}
}, },
case emqx_hooks:run_fold('client.connect', [NConnInfo], #{}) of case emqx_hooks:run_fold('client.connect', [NConnInfo], #{}) of
{error, _Reason} -> {error, _Reason} ->
throw(nopermission); throw(nopermission);
_ -> _ ->
ConnMod = maps:get(conn_mod, NConnInfo), ok = register_the_anonymous_client(ClientInfo, NConnInfo),
true = ets:insert(?CHAN_CONN_TAB, {?CHANMOCK(self()), ConnMod}),
Req = #{conninfo => Req = #{conninfo =>
peercert(Peercert, peercert(Peercert,
#{socktype => socktype(Socktype), #{socktype => socktype(Socktype),
@ -172,6 +168,22 @@ init(ConnInfo = #{socktype := Socktype,
try_dispatch(on_socket_created, wrap(Req), Channel) try_dispatch(on_socket_created, wrap(Req), Channel)
end. end.
register_the_anonymous_client(ClientInfo, ConnInfo) ->
ClientId = maps:get(clientid, ClientInfo),
case emqx_cm:open_session(true, ClientInfo, ConnInfo) of
{ok, _} ->
?LOG(debug, "Registered an anonymous connection, "
"temporary clientid: ~s", [ClientId]),
emqx_logger:set_metadata_clientid(ClientId),
_ = self() ! {event, accepted},
ok;
{error, Reason} ->
throw({register_anonymous_error, Reason})
end.
unregister_the_anonymous_client(ClientId) ->
emqx_cm:unregister_channel(ClientId).
%% @private %% @private
peercert(NoSsl, ConnInfo) when NoSsl == nossl; peercert(NoSsl, ConnInfo) when NoSsl == nossl;
NoSsl == undefined -> NoSsl == undefined ->
@ -274,15 +286,14 @@ handle_call(close, Channel) ->
handle_call({auth, ClientInfo, _Password}, Channel = #channel{conn_state = connected}) -> handle_call({auth, ClientInfo, _Password}, Channel = #channel{conn_state = connected}) ->
?LOG(warning, "Duplicated authorized command, dropped ~p", [ClientInfo]), ?LOG(warning, "Duplicated authorized command, dropped ~p", [ClientInfo]),
{reply, {error, ?RESP_PERMISSION_DENY, <<"Duplicated authenticate command">>}, Channel}; {reply, {error, ?RESP_PERMISSION_DENY, <<"Duplicated authenticate command">>}, Channel};
handle_call({auth, ClientInfo0, Password}, handle_call({auth, RequestedClientInfo, Password},
Channel = #channel{conninfo = ConnInfo, Channel = #channel{conninfo = ConnInfo,
clientinfo = ClientInfo}) -> clientinfo = ClientInfo0}) ->
ClientInfo1 = enrich_clientinfo(ClientInfo0, ClientInfo), ClientInfo1 = enrich_clientinfo(RequestedClientInfo, ClientInfo0),
NConnInfo = enrich_conninfo(ClientInfo0, ConnInfo), NConnInfo = enrich_conninfo(RequestedClientInfo, ConnInfo),
Channel1 = Channel#channel{conninfo = NConnInfo, Channel1 = Channel#channel{conninfo = NConnInfo,
clientinfo = ClientInfo1}, clientinfo = ClientInfo1},
#{clientid := ClientId, username := Username} = ClientInfo1, #{clientid := ClientId, username := Username} = ClientInfo1,
case emqx_access_control:authenticate(ClientInfo1#{password => Password}) of case emqx_access_control:authenticate(ClientInfo1#{password => Password}) of
@ -292,9 +303,10 @@ handle_call({auth, ClientInfo0, Password},
emqx_metrics:inc('client.auth.anonymous'), emqx_metrics:inc('client.auth.anonymous'),
NClientInfo = maps:merge(ClientInfo1, AuthResult), NClientInfo = maps:merge(ClientInfo1, AuthResult),
NChannel = Channel1#channel{clientinfo = NClientInfo}, NChannel = Channel1#channel{clientinfo = NClientInfo},
clean_anonymous_clients(),
case emqx_cm:open_session(true, NClientInfo, NConnInfo) of case emqx_cm:open_session(true, NClientInfo, NConnInfo) of
{ok, _Session} -> {ok, _Session} ->
AnonymousClientId = maps:get(clientid, ClientInfo0),
unregister_the_anonymous_client(AnonymousClientId),
?LOG(debug, "Client ~s (Username: '~s') authorized successfully!", ?LOG(debug, "Client ~s (Username: '~s') authorized successfully!",
[ClientId, Username]), [ClientId, Username]),
{reply, ok, [{event, connected}], ensure_connected(NChannel)}; {reply, ok, [{event, connected}], ensure_connected(NChannel)};
@ -354,6 +366,9 @@ handle_call({publish, Topic, Qos, Payload},
handle_call(kick, Channel) -> handle_call(kick, Channel) ->
{shutdown, kicked, ok, Channel}; {shutdown, kicked, ok, Channel};
handle_call(discard, Channel) ->
{shutdown, discarded, ok, Channel};
handle_call(Req, Channel) -> handle_call(Req, Channel) ->
?LOG(warning, "Unexpected call: ~p", [Req]), ?LOG(warning, "Unexpected call: ~p", [Req]),
{reply, {error, unexpected_call}, Channel}. {reply, {error, unexpected_call}, Channel}.
@ -406,16 +421,12 @@ handle_info(Info, Channel) ->
-spec(terminate(any(), channel()) -> channel()). -spec(terminate(any(), channel()) -> channel()).
terminate(Reason, Channel) -> terminate(Reason, Channel) ->
clean_anonymous_clients(),
Req = #{reason => stringfy(Reason)}, Req = #{reason => stringfy(Reason)},
try_dispatch(on_socket_closed, wrap(Req), Channel). try_dispatch(on_socket_closed, wrap(Req), Channel).
is_anonymous(#{anonymous := true}) -> true; is_anonymous(#{anonymous := true}) -> true;
is_anonymous(_AuthResult) -> false. is_anonymous(_AuthResult) -> false.
clean_anonymous_clients() ->
ets:delete(?CHAN_CONN_TAB, ?CHANMOCK(self())).
packet_to_message(Topic, Qos, Payload, packet_to_message(Topic, Qos, Payload,
#channel{ #channel{
conninfo = #{proto_ver := ProtoVer}, conninfo = #{proto_ver := ProtoVer},
@ -608,23 +619,32 @@ default_conninfo(ConnInfo) ->
username => undefined, username => undefined,
conn_props => #{}, conn_props => #{},
connected => true, connected => true,
proto_name => <<"exproto">>,
proto_ver => <<"1.0">>,
connected_at => erlang:system_time(millisecond), connected_at => erlang:system_time(millisecond),
keepalive => 0, keepalive => 0,
receive_maximum => 0, receive_maximum => 0,
expiry_interval => 0}. expiry_interval => 0}.
default_clientinfo(#{peername := {PeerHost, _}, default_clientinfo(#{peername := {PeerHost, PeerPort},
sockname := {_, SockPort}}) -> sockname := {_, SockPort}}) ->
#{zone => external, #{zone => external,
protocol => undefined, protocol => exproto,
peerhost => PeerHost, peerhost => PeerHost,
sockport => SockPort, sockport => SockPort,
clientid => undefined, clientid => anonymous_clientid(PeerHost, PeerPort),
username => undefined, username => undefined,
is_bridge => false, is_bridge => false,
is_superuser => false, is_superuser => false,
mountpoint => undefined}. mountpoint => undefined}.
anonymous_clientid(PeerHost, PeerPort) ->
iolist_to_binary(
["exproto-anonymous-",
inet:ntoa(PeerHost), "-", integer_to_list(PeerPort),
"-", emqx_rule_id:gen()
]).
stringfy(Reason) -> stringfy(Reason) ->
unicode:characters_to_binary((io_lib:format("~0p", [Reason]))). unicode:characters_to_binary((io_lib:format("~0p", [Reason]))).

View File

@ -439,7 +439,8 @@ handle_msg({close, Reason}, State) ->
?LOG(debug, "Force to close the socket due to ~p", [Reason]), ?LOG(debug, "Force to close the socket due to ~p", [Reason]),
handle_info({sock_closed, Reason}, close_socket(State)); handle_info({sock_closed, Reason}, close_socket(State));
handle_msg({event, connected}, State = #state{channel = Channel}) -> handle_msg({event, Event}, State = #state{channel = Channel})
when Event == connected; Event == accepted ->
ClientId = emqx_exproto_channel:info(clientid, Channel), ClientId = emqx_exproto_channel:info(clientid, Channel),
emqx_cm:insert_channel_info(ClientId, info(State), stats(State)); emqx_cm:insert_channel_info(ClientId, info(State), stats(State));

View File

@ -1,4 +1,4 @@
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## EMQ X Lua Hook ## EMQX Lua Hook
##-------------------------------------------------------------------- ##--------------------------------------------------------------------

View File

@ -1,5 +1,5 @@
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## EMQ X Management Plugin ## EMQX Management Plugin
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## Max Row Limit ## Max Row Limit

View File

@ -297,6 +297,7 @@ format_channel_info({_Key, Info, Stats0}) ->
SessCreated = maps:get(created_at, Session, maps:get(connected_at, ConnInfo)), SessCreated = maps:get(created_at, Session, maps:get(connected_at, ConnInfo)),
Connected = case maps:get(conn_state, Info, connected) of Connected = case maps:get(conn_state, Info, connected) of
connected -> true; connected -> true;
accepted -> true; %% for exproto anonymous clients
_ -> false _ -> false
end, end,
NStats = Stats#{max_subscriptions => maps:get(subscriptions_max, Stats, 0), NStats = Stats#{max_subscriptions => maps:get(subscriptions_max, Stats, 0),

View File

@ -1,5 +1,5 @@
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## EMQ X Management Plugin ## EMQX Management Plugin
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## Max Row Limit ## Max Row Limit

View File

@ -1,5 +1,5 @@
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## emqx_prometheus for EMQ X ## emqx_prometheus for EMQX
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## The Prometheus Push Gateway URL address ## The Prometheus Push Gateway URL address

View File

@ -1,5 +1,5 @@
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## EMQ X Retainer ## EMQX Retainer
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## Where to store the retained messages. ## Where to store the retained messages.

View File

@ -1,5 +1,5 @@
##==================================================================== ##====================================================================
## Rule Engine for EMQ X R4.0 ## EMQX Rule Engine
##==================================================================== ##====================================================================
rule_engine.ignore_sys_message = on rule_engine.ignore_sys_message = on

View File

@ -1,6 +1,6 @@
{application, emqx_sn, {application, emqx_sn,
[{description, "EMQ X MQTT-SN Plugin"}, [{description, "EMQ X MQTT-SN Plugin"},
{vsn, "4.3.5"}, % strict semver, bump manually! {vsn, "4.3.6"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, []}, {registered, []},
{applications, [kernel,stdlib,esockd]}, {applications, [kernel,stdlib,esockd]},

View File

@ -1,29 +1,25 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{VSN, {VSN,
[ [
{"4.3.4",[ {<<"4\\.3\\.[4-5]">>,[
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
]}, ]},
{"4.3.3",[ {<<"4.3.[2-3]">>,[
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
]},
{"4.3.2", [
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
]}, ]},
{<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]} {<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]}
], ],
[ [
{"4.3.4",[ {<<"4\\.3\\.[4-5]">>,[
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
]}, ]},
{"4.3.3",[ {<<"4.3.[2-3]">>,[
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
]},
{"4.3.2", [
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
]}, ]},
{<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]} {<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]}

View File

@ -268,40 +268,81 @@ message_type(16#1d) ->
message_type(Type) -> message_type(Type) ->
io_lib:format("Unknown Type ~p", [Type]). io_lib:format("Unknown Type ~p", [Type]).
format(?SN_CONNECT_MSG(Flags, ProtocolId, Duration, ClientId)) ->
#mqtt_sn_flags{
will = Will,
clean_start = CleanStart} = Flags,
io_lib:format("SN_CONNECT(W~w, C~w, ProtocolId=~w, Duration=~w, "
"ClientId=~s)",
[bool(Will), bool(CleanStart),
ProtocolId, Duration, ClientId]);
format(?SN_CONNACK_MSG(ReturnCode)) ->
io_lib:format("SN_CONNACK(ReturnCode=~w)", [ReturnCode]);
format(?SN_WILLTOPICREQ_MSG()) ->
"SN_WILLTOPICREQ()";
format(?SN_WILLTOPIC_MSG(Flags, Topic)) ->
#mqtt_sn_flags{
qos = QoS,
retain = Retain} = Flags,
io_lib:format("SN_WILLTOPIC(Q~w, R~w, Topic=~s)",
[QoS, bool(Retain), Topic]);
format(?SN_WILLTOPIC_EMPTY_MSG) ->
"SN_WILLTOPIC(_)";
format(?SN_WILLMSGREQ_MSG()) ->
"SN_WILLMSGREQ()";
format(?SN_WILLMSG_MSG(Msg)) ->
io_lib:format("SN_WILLMSG_MSG(Msg=~p)", [Msg]);
format(?SN_PUBLISH_MSG(Flags, TopicId, MsgId, Data)) -> format(?SN_PUBLISH_MSG(Flags, TopicId, MsgId, Data)) ->
io_lib:format("mqtt_sn_message SN_PUBLISH, ~s, TopicId=~w, MsgId=~w, Payload=~w", #mqtt_sn_flags{
[format_flag(Flags), TopicId, MsgId, Data]); dup = Dup,
format(?SN_PUBACK_MSG(Flags, MsgId, ReturnCode)) -> qos = QoS,
io_lib:format("mqtt_sn_message SN_PUBACK, ~s, MsgId=~w, ReturnCode=~w", retain = Retain,
[format_flag(Flags), MsgId, ReturnCode]); topic_id_type = TopicIdType} = Flags,
io_lib:format("SN_PUBLISH(D~w, Q~w, R~w, TopicIdType=~w, TopicId=~w, "
"MsgId=~w, Payload=~p)",
[bool(Dup), QoS, bool(Retain),
TopicIdType, TopicId, MsgId, Data]);
format(?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode)) ->
io_lib:format("SN_PUBACK(TopicId=~w, MsgId=~w, ReturnCode=~w)",
[TopicId, MsgId, ReturnCode]);
format(?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId)) -> format(?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId)) ->
io_lib:format("mqtt_sn_message SN_PUBCOMP, MsgId=~w", [MsgId]); io_lib:format("SN_PUBCOMP(MsgId=~w)", [MsgId]);
format(?SN_PUBREC_MSG(?SN_PUBREC, MsgId)) -> format(?SN_PUBREC_MSG(?SN_PUBREC, MsgId)) ->
io_lib:format("mqtt_sn_message SN_PUBREC, MsgId=~w", [MsgId]); io_lib:format("SN_PUBREC(MsgId=~w)", [MsgId]);
format(?SN_PUBREC_MSG(?SN_PUBREL, MsgId)) -> format(?SN_PUBREC_MSG(?SN_PUBREL, MsgId)) ->
io_lib:format("mqtt_sn_message SN_PUBREL, MsgId=~w", [MsgId]); io_lib:format("SN_PUBREL(MsgId=~w)", [MsgId]);
format(?SN_SUBSCRIBE_MSG(Flags, Msgid, Topic)) -> format(?SN_SUBSCRIBE_MSG(Flags, Msgid, Topic)) ->
io_lib:format("mqtt_sn_message SN_SUBSCRIBE, ~s, MsgId=~w, TopicId=~w", #mqtt_sn_flags{
[format_flag(Flags), Msgid, Topic]); dup = Dup,
qos = QoS,
topic_id_type = TopicIdType} = Flags,
io_lib:format("SN_SUBSCRIBE(D~w, Q~w, TopicIdType=~w, MsgId=~w, "
"TopicId=~w)",
[bool(Dup), QoS, TopicIdType, Msgid, Topic]);
format(?SN_SUBACK_MSG(Flags, TopicId, MsgId, ReturnCode)) -> format(?SN_SUBACK_MSG(Flags, TopicId, MsgId, ReturnCode)) ->
io_lib:format("mqtt_sn_message SN_SUBACK, ~s, MsgId=~w, TopicId=~w, ReturnCode=~w", #mqtt_sn_flags{qos = QoS} = Flags,
[format_flag(Flags), MsgId, TopicId, ReturnCode]); io_lib:format("SN_SUBACK(GrantedQoS=~w, MsgId=~w, TopicId=~w, "
"ReturnCode=~w)",
[QoS, MsgId, TopicId, ReturnCode]);
format(?SN_UNSUBSCRIBE_MSG(Flags, Msgid, Topic)) -> format(?SN_UNSUBSCRIBE_MSG(Flags, Msgid, Topic)) ->
io_lib:format("mqtt_sn_message SN_UNSUBSCRIBE, ~s, MsgId=~w, TopicId=~w", #mqtt_sn_flags{topic_id_type = TopicIdType} = Flags,
[format_flag(Flags), Msgid, Topic]); io_lib:format("SN_UNSUBSCRIBE(TopicIdType=~s, MsgId=~w, TopicId=~w)",
[TopicIdType, Msgid, Topic]);
format(?SN_UNSUBACK_MSG(MsgId)) -> format(?SN_UNSUBACK_MSG(MsgId)) ->
io_lib:format("mqtt_sn_message SN_UNSUBACK, MsgId=~w", [MsgId]); io_lib:format("SN_UNSUBACK(MsgId=~w)", [MsgId]);
format(?SN_REGISTER_MSG(TopicId, MsgId, TopicName)) -> format(?SN_REGISTER_MSG(TopicId, MsgId, TopicName)) ->
io_lib:format("mqtt_sn_message SN_REGISTER, TopicId=~w, MsgId=~w, TopicName=~w", io_lib:format("SN_REGISTER(TopicId=~w, MsgId=~w, TopicName=~s)",
[TopicId, MsgId, TopicName]); [TopicId, MsgId, TopicName]);
format(?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)) -> format(?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)) ->
io_lib:format("mqtt_sn_message SN_REGACK, TopicId=~w, MsgId=~w, ReturnCode=~w", io_lib:format("SN_REGACK(TopicId=~w, MsgId=~w, ReturnCode=~w)",
[TopicId, MsgId, ReturnCode]); [TopicId, MsgId, ReturnCode]);
format(?SN_PINGREQ_MSG(ClientId)) ->
io_lib:format("SN_PINGREQ(ClientId=~s)", [ClientId]);
format(?SN_PINGRESP_MSG()) ->
"SN_PINGREQ()";
format(?SN_DISCONNECT_MSG(Duration)) ->
io_lib:format("SN_DISCONNECT(Duration=~s)", [Duration]);
format(#mqtt_sn_message{type = Type, variable = Var}) -> format(#mqtt_sn_message{type = Type, variable = Var}) ->
io_lib:format("mqtt_sn_message type=~s, Var=~w", [emqx_sn_frame:message_type(Type), Var]). io_lib:format("mqtt_sn_message(type=~s, Var=~w)",
[emqx_sn_frame:message_type(Type), Var]).
format_flag(#mqtt_sn_flags{dup = Dup, qos = QoS, retain = Retain, will = Will, clean_start = CleanStart, topic_id_type = TopicType}) ->
io_lib:format("mqtt_sn_flags{dup=~p, qos=~p, retain=~p, will=~p, clean_session=~p, topic_id_type=~p}",
[Dup, QoS, Retain, Will, CleanStart, TopicType]);
format_flag(_Flag) -> "invalid flag".

View File

@ -94,6 +94,8 @@
idle_timeout :: integer(), idle_timeout :: integer(),
enable_qos3 = false :: boolean(), enable_qos3 = false :: boolean(),
has_pending_pingresp = false :: boolean(), has_pending_pingresp = false :: boolean(),
%% Store all qos0 messages for waiting REGACK
%% Note: QoS1/QoS2 messages will kept inflight queue
pending_topic_ids = #{} :: pending_msgs() pending_topic_ids = #{} :: pending_msgs()
}). }).
@ -490,7 +492,7 @@ handle_event({call, From}, Req, _StateName, State) ->
{reply, Reply, NState} -> {reply, Reply, NState} ->
gen_server:reply(From, Reply), gen_server:reply(From, Reply),
{keep_state, NState}; {keep_state, NState};
{stop, Reason, Reply, NState} -> {shutdown, Reason, Reply, NState} ->
State0 = case NState#state.sockstate of State0 = case NState#state.sockstate of
running -> running ->
send_message(?SN_DISCONNECT_MSG(undefined), NState); send_message(?SN_DISCONNECT_MSG(undefined), NState);
@ -518,10 +520,9 @@ handle_event(info, {datagram, SockPid, Data}, StateName,
end; end;
handle_event(info, {deliver, _Topic, Msg}, asleep, handle_event(info, {deliver, _Topic, Msg}, asleep,
State = #state{channel = Channel, pending_topic_ids = Pendings}) -> State = #state{channel = Channel}) ->
% section 6.14, Support of sleeping clients % section 6.14, Support of sleeping clients
?LOG(debug, "enqueue downlink message in asleep state, msg: ~0p, pending_topic_ids: ~0p", ?LOG(debug, "enqueue downlink message in asleep state, msg: ~0p", [Msg]),
[Msg, Pendings]),
Session = emqx_session:enqueue(emqx_channel:info(clientinfo, Channel), Session = emqx_session:enqueue(emqx_channel:info(clientinfo, Channel),
Msg, emqx_channel:get_session(Channel)), Msg, emqx_channel:get_session(Channel)),
{keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}}; {keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}};
@ -610,7 +611,7 @@ handle_call(_From, Req, State = #state{channel = Channel}) ->
{reply, Reply, NChannel} -> {reply, Reply, NChannel} ->
{reply, Reply, State#state{channel = NChannel}}; {reply, Reply, State#state{channel = NChannel}};
{shutdown, Reason, Reply, NChannel} -> {shutdown, Reason, Reply, NChannel} ->
stop(Reason, Reply, State#state{channel = NChannel}) {shutdown, Reason, Reply, State#state{channel = NChannel}}
end. end.
handle_info({sock_closed, Reason} = Info, State = #state{channel = Channel}) -> handle_info({sock_closed, Reason} = Info, State = #state{channel = Channel}) ->
@ -723,11 +724,19 @@ mqtt2sn(?PUBCOMP_PACKET(MsgId), _State) ->
mqtt2sn(?UNSUBACK_PACKET(MsgId), _State)-> mqtt2sn(?UNSUBACK_PACKET(MsgId), _State)->
?SN_UNSUBACK_MSG(MsgId); ?SN_UNSUBACK_MSG(MsgId);
mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), #state{channel = Channel}) -> mqtt2sn(
NewPacketId = if QoS =:= ?QOS_0 -> 0; #mqtt_packet{header = #mqtt_packet_header{
type = ?PUBLISH,
qos = QoS,
%dup = Dup,
retain = Retain},
variable = #mqtt_packet_publish{
topic_name = Topic,
packet_id = PacketId},
payload = Payload}, #state{clientid = ClientId}) ->
NPacketId = if QoS =:= ?QOS_0 -> 0;
true -> PacketId true -> PacketId
end, end,
ClientId = emqx_channel:info(clientid, Channel),
{TopicIdType, TopicContent} = case emqx_sn_registry:lookup_topic_id(ClientId, Topic) of {TopicIdType, TopicContent} = case emqx_sn_registry:lookup_topic_id(ClientId, Topic) of
{predef, PredefTopicId} -> {predef, PredefTopicId} ->
{?SN_PREDEFINED_TOPIC, PredefTopicId}; {?SN_PREDEFINED_TOPIC, PredefTopicId};
@ -737,8 +746,12 @@ mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), #state{channel = Channe
{?SN_SHORT_TOPIC, Topic} {?SN_SHORT_TOPIC, Topic}
end, end,
Flags = #mqtt_sn_flags{qos = QoS, topic_id_type = TopicIdType}, Flags = #mqtt_sn_flags{
?SN_PUBLISH_MSG(Flags, TopicContent, NewPacketId, Payload); %dup = Dup,
qos = QoS,
retain = Retain,
topic_id_type = TopicIdType},
?SN_PUBLISH_MSG(Flags, TopicContent, NPacketId, Payload);
mqtt2sn(?SUBACK_PACKET(MsgId, ReturnCodes), _State)-> mqtt2sn(?SUBACK_PACKET(MsgId, ReturnCodes), _State)->
% if success, suback is sent by handle_info({suback, MsgId, [GrantedQoS]}, ...) % if success, suback is sent by handle_info({suback, MsgId, [GrantedQoS]}, ...)
@ -766,9 +779,10 @@ send_connack(State) ->
send_message(Msg = #mqtt_sn_message{type = Type}, send_message(Msg = #mqtt_sn_message{type = Type},
State = #state{sockpid = SockPid, peername = Peername}) -> State = #state{sockpid = SockPid, peername = Peername}) ->
?LOG(debug, "SEND ~s~n", [emqx_sn_frame:format(Msg)]), ?LOG(info, "SEND ~s~n", [emqx_sn_frame:format(Msg)]),
inc_outgoing_stats(Type), inc_outgoing_stats(Type),
Data = emqx_sn_frame:serialize(Msg), Data = emqx_sn_frame:serialize(Msg),
?LOG(debug, "SEND ~0p", [Data]),
ok = emqx_metrics:inc('bytes.sent', iolist_size(Data)), ok = emqx_metrics:inc('bytes.sent', iolist_size(Data)),
SockPid ! {datagram, Peername, Data}, SockPid ! {datagram, Peername, Data},
State. State.
@ -793,13 +807,6 @@ stop(Reason, State) ->
maybe_send_will_msg(Reason, State), maybe_send_will_msg(Reason, State),
{stop, {shutdown, Reason}, State}. {stop, {shutdown, Reason}, State}.
stop({shutdown, Reason}, Reply, State) ->
stop(Reason, Reply, State);
stop(Reason, Reply, State) ->
?LOG(stop_log_level(Reason), "stop due to ~p", [Reason]),
maybe_send_will_msg(Reason, State),
{stop, {shutdown, Reason}, Reply, State}.
maybe_send_will_msg(normal, _State) -> maybe_send_will_msg(normal, _State) ->
ok; ok;
maybe_send_will_msg(_Reason, State) -> maybe_send_will_msg(_Reason, State) ->
@ -825,6 +832,9 @@ do_connect(ClientId, CleanStart, WillFlag, Duration, State) ->
%% At any point in time a client may have only one QoS level 1 or 2 PUBLISH message %% At any point in time a client may have only one QoS level 1 or 2 PUBLISH message
%% outstanding, i.e. it has to wait for the termination of this PUBLISH message exchange %% outstanding, i.e. it has to wait for the termination of this PUBLISH message exchange
%% before it could start a new level 1 or 2 transaction. %% before it could start a new level 1 or 2 transaction.
%%
%% FIXME: But we should have a re-try timer to re-send the inflight
%% qos1/qos2 message
OnlyOneInflight = #{'Receive-Maximum' => 1}, OnlyOneInflight = #{'Receive-Maximum' => 1},
ConnPkt = #mqtt_packet_connect{clientid = ClientId, ConnPkt = #mqtt_packet_connect{clientid = ClientId,
clean_start = CleanStart, clean_start = CleanStart,
@ -973,11 +983,16 @@ do_puback(TopicId, MsgId, ReturnCode, StateName,
case emqx_sn_registry:lookup_topic(ClientId, TopicId) of case emqx_sn_registry:lookup_topic(ClientId, TopicId) of
undefined -> {keep_state, State}; undefined -> {keep_state, State};
TopicName -> TopicName ->
%%notice that this TopicName maybe normal or predefined, %% notice that this TopicName maybe normal or predefined,
%% involving the predefined topic name in register to enhance the gateway's robustness even inconsistent with MQTT-SN channels %% involving the predefined topic name in register to
{keep_state, send_register(TopicName, TopicId, MsgId, State)} %% enhance the gateway's robustness even inconsistent
%% with MQTT-SN channel
{keep_state,
send_register(TopicName, TopicId, MsgId, State)}
end; end;
_ -> _ ->
%% XXX: We need to handle others error code
%% 'Rejection: congestion'
?LOG(error, "CAN NOT handle PUBACK ReturnCode=~p", [ReturnCode]), ?LOG(error, "CAN NOT handle PUBACK ReturnCode=~p", [ReturnCode]),
{keep_state, State} {keep_state, State}
end. end.
@ -1050,7 +1065,7 @@ handle_incoming(Packet, _StName, State) ->
channel_handle_in(Packet = ?PACKET(Type), #state{channel = Channel}) -> channel_handle_in(Packet = ?PACKET(Type), #state{channel = Channel}) ->
_ = inc_incoming_stats(Type), _ = inc_incoming_stats(Type),
ok = emqx_metrics:inc_recv(Packet), ok = emqx_metrics:inc_recv(Packet),
?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), ?LOG(debug, "Transed-RECV ~s", [emqx_packet:format(Packet)]),
emqx_channel:handle_in(Packet, Channel). emqx_channel:handle_in(Packet, Channel).
handle_outgoing(Packets, State) when is_list(Packets) -> handle_outgoing(Packets, State) when is_list(Packets) ->
@ -1064,7 +1079,9 @@ handle_outgoing(PubPkt = ?PUBLISH_PACKET(_, TopicName, _, _),
ClientId = emqx_channel:info(clientid, Channel), ClientId = emqx_channel:info(clientid, Channel),
TopicId = emqx_sn_registry:lookup_topic_id(ClientId, TopicName), TopicId = emqx_sn_registry:lookup_topic_id(ClientId, TopicName),
case (TopicId == undefined) andalso (byte_size(TopicName) =/= 2) of case (TopicId == undefined) andalso (byte_size(TopicName) =/= 2) of
true -> register_and_notify_client(PubPkt, State); true ->
%% TODO: only one REGISTER inflight if qos=0??
register_and_notify_client(PubPkt, State);
false -> send_message(mqtt2sn(PubPkt, State), State) false -> send_message(mqtt2sn(PubPkt, State), State)
end; end;
@ -1077,13 +1094,40 @@ cache_no_reg_publish_message(Pendings, TopicId, PubPkt, State) ->
Msgs = maps:get(pending_topic_ids, Pendings, []), Msgs = maps:get(pending_topic_ids, Pendings, []),
Pendings#{TopicId => Msgs ++ [mqtt2sn(PubPkt, State)]}. Pendings#{TopicId => Msgs ++ [mqtt2sn(PubPkt, State)]}.
replay_no_reg_pending_publishes(TopicId, #state{pending_topic_ids = Pendings} = State0) -> replay_no_reg_pending_publishes(TopicId,
?LOG(debug, "replay non-registered publish message for topic-id: ~p, pendings: ~0p", State0 = #state{
[TopicId, Pendings]), pending_topic_ids = Pendings}) ->
?LOG(debug, "replay non-registered qos0 publish message for "
"topic-id: ~p, pendings: ~0p", [TopicId, Pendings]),
State = lists:foldl(fun(Msg, State1) -> State = lists:foldl(fun(Msg, State1) ->
send_message(Msg, State1) send_message(Msg, State1)
end, State0, maps:get(TopicId, Pendings, [])), end, State0, maps:get(TopicId, Pendings, [])),
State#state{pending_topic_ids = maps:remove(TopicId, Pendings)}.
NState = State#state{pending_topic_ids = maps:remove(TopicId, Pendings)},
case replay_inflight_messages(TopicId, State#state.channel) of
[] -> ok;
Outgoings ->
?LOG(debug, "replay non-registered qos1/qos2 publish message "
"for topic-id: ~0p, messages: ~0p",
[TopicId, Outgoings]),
handle_outgoing(Outgoings, NState)
end.
replay_inflight_messages(TopicId, Channel) ->
Inflight = emqx_session:info(inflight, emqx_channel:get_session(Channel)),
case emqx_inflight:to_list(Inflight) of
[] -> [];
[{PktId, {Msg, _Ts}}] -> %% Fixed inflight size 1
ClientId = emqx_channel:info(clientid, Channel),
ReplayTopic = emqx_sn_registry:lookup_topic(ClientId, TopicId),
case ReplayTopic =:= emqx_message:topic(Msg) of
false -> [];
true ->
NMsg = emqx_message:set_flag(dup, true, Msg),
[emqx_message:to_packet(PktId, NMsg)]
end
end.
register_and_notify_client(?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload) = PubPkt, register_and_notify_client(?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload) = PubPkt,
State = #state{pending_topic_ids = Pendings, channel = Channel}) -> State = #state{pending_topic_ids = Pendings, channel = Channel}) ->
@ -1091,10 +1135,17 @@ register_and_notify_client(?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload) =
#mqtt_packet{header = #mqtt_packet_header{dup = Dup, retain = Retain}} = PubPkt, #mqtt_packet{header = #mqtt_packet_header{dup = Dup, retain = Retain}} = PubPkt,
ClientId = emqx_channel:info(clientid, Channel), ClientId = emqx_channel:info(clientid, Channel),
TopicId = emqx_sn_registry:register_topic(ClientId, TopicName), TopicId = emqx_sn_registry:register_topic(ClientId, TopicName),
?LOG(debug, "Register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, QoS=~p, " ?LOG(debug, "Register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, "
"Retain=~p, MsgId=~p", [TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId]), "QoS=~p,Retain=~p, MsgId=~p",
NewPendings = cache_no_reg_publish_message(Pendings, TopicId, PubPkt, State), [TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId]),
send_register(TopicName, TopicId, MsgId, State#state{pending_topic_ids = NewPendings}). NPendings = case QoS == ?QOS_0 of
true ->
cache_no_reg_publish_message(
Pendings, TopicId, PubPkt, State);
_ -> Pendings
end,
send_register(TopicName, TopicId, MsgId,
State#state{pending_topic_ids = NPendings}).
message_id(undefined) -> message_id(undefined) ->
rand:uniform(16#FFFF); rand:uniform(16#FFFF);

View File

@ -943,6 +943,151 @@ t_publish_qos2_case03(_) ->
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket). gen_udp:close(Socket).
t_delivery_qos1_register_invalid_topic_id(_) ->
Dup = 0,
QoS = 1,
Retain = 0,
Will = 0,
CleanSession = 0,
MsgId = 1,
TopicId = ?MAX_PRED_TOPIC_ID + 1,
{ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, QoS, <<"ab">>, MsgId),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1,
?SN_NORMAL_TOPIC:2, TopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
receive_response(Socket)),
Payload = <<"test-registration-inconsistent">>,
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"ab">>, Payload)),
?assertEqual(
<<(7 + byte_size(Payload)), ?SN_PUBLISH,
Dup:1, QoS:2, Retain:1,
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
TopicId:16, MsgId:16, Payload/binary>>, receive_response(Socket)),
%% acked with ?SN_RC_INVALID_TOPIC_ID to
send_puback_msg(Socket, TopicId, MsgId, ?SN_RC_INVALID_TOPIC_ID),
?assertEqual(
{TopicId, MsgId},
check_register_msg_on_udp(<<"ab">>, receive_response(Socket))),
send_regack_msg(Socket, TopicId, MsgId + 1),
%% receive the replay message
?assertEqual(
<<(7 + byte_size(Payload)), ?SN_PUBLISH,
Dup:1, QoS:2, Retain:1,
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
TopicId:16, (MsgId):16, Payload/binary>>, receive_response(Socket)),
send_disconnect_msg(Socket, undefined),
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket).
t_delivery_takeover_and_re_register(_) ->
MsgId = 1,
{ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, ?QOS_1, <<"topic-a">>, MsgId+1),
<<_, ?SN_SUBACK, 2#00100000,
TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2),
<<_, ?SN_SUBACK, 2#01000000,
TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
_ = emqx:publish(
emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)),
_ = emqx:publish(
emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"test-b">>)),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket),
send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket),
send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED),
send_disconnect_msg(Socket, undefined),
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket),
%% offline messages will be queued into the MQTT-SN session
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m1">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m3">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m1">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m2">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)),
{ok, NSocket} = gen_udp:open(0, [binary]),
send_connect_msg(NSocket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(NSocket)),
%% qos1
%% received the resume messages
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA0:16, "m1">> = receive_response(NSocket),
%% only one qos1/qos2 inflight
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
send_puback_msg(NSocket, TopicIdA, MsgIdA0, ?SN_RC_INVALID_TOPIC_ID),
%% recv register
<<_, ?SN_REGISTER,
TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket),
send_regack_msg(NSocket, TopicIdA, RegMsgIdA),
%% received the replay messages
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA1:16, "m1">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA2:16, "m2">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdA, MsgIdA2, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA3:16, "m3">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdA, MsgIdA3, ?SN_RC_ACCEPTED),
%% qos2
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB0:16, "m1">> = receive_response(NSocket),
%% only one qos1/qos2 inflight
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
send_puback_msg(NSocket, TopicIdB, MsgIdB0, ?SN_RC_INVALID_TOPIC_ID),
%% recv register
<<_, ?SN_REGISTER,
TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket),
send_regack_msg(NSocket, TopicIdB, RegMsgIdB),
%% received the replay messages
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB1:16, "m1">> = receive_response(NSocket),
send_pubrec_msg(NSocket, MsgIdB1),
<<_, ?SN_PUBREL, MsgIdB1:16>> = receive_response(NSocket),
send_pubcomp_msg(NSocket, MsgIdB1),
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB2:16, "m2">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdB, MsgIdB2, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB3:16, "m3">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdB, MsgIdB3, ?SN_RC_ACCEPTED),
%% no more messages
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
send_disconnect_msg(NSocket, undefined),
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket)),
gen_udp:close(NSocket).
t_will_case01(_) -> t_will_case01(_) ->
QoS = 1, QoS = 1,
Duration = 1, Duration = 1,
@ -1758,13 +1903,16 @@ send_searchgw_msg(Socket) ->
ok = gen_udp:send(Socket, ?HOST, ?PORT, <<Length:8, MsgType:8, Radius:8>>). ok = gen_udp:send(Socket, ?HOST, ?PORT, <<Length:8, MsgType:8, Radius:8>>).
send_connect_msg(Socket, ClientId) -> send_connect_msg(Socket, ClientId) ->
send_connect_msg(Socket, ClientId, 1).
send_connect_msg(Socket, ClientId, CleanSession) when CleanSession == 0;
CleanSession == 1 ->
Length = 6 + byte_size(ClientId), Length = 6 + byte_size(ClientId),
MsgType = ?SN_CONNECT, MsgType = ?SN_CONNECT,
Dup = 0, Dup = 0,
QoS = 0, QoS = 0,
Retain = 0, Retain = 0,
Will = 0, Will = 0,
CleanSession = 1,
TopicIdType = 0, TopicIdType = 0,
ProtocolId = 1, ProtocolId = 1,
Duration = 10, Duration = 10,
@ -1880,9 +2028,12 @@ send_publish_msg_short_topic(Socket, QoS, MsgId, TopicName, Data) ->
ok = gen_udp:send(Socket, ?HOST, ?PORT, PublishPacket). ok = gen_udp:send(Socket, ?HOST, ?PORT, PublishPacket).
send_puback_msg(Socket, TopicId, MsgId) -> send_puback_msg(Socket, TopicId, MsgId) ->
send_puback_msg(Socket, TopicId, MsgId, ?SN_RC_ACCEPTED).
send_puback_msg(Socket, TopicId, MsgId, Rc) ->
Length = 7, Length = 7,
MsgType = ?SN_PUBACK, MsgType = ?SN_PUBACK,
PubAckPacket = <<Length:8, MsgType:8, TopicId:16, MsgId:16, ?SN_RC_ACCEPTED:8>>, PubAckPacket = <<Length:8, MsgType:8, TopicId:16, MsgId:16, Rc:8>>,
?LOG("send_puback_msg TopicId=~p, MsgId=~p", [TopicId, MsgId]), ?LOG("send_puback_msg TopicId=~p, MsgId=~p", [TopicId, MsgId]),
ok = gen_udp:send(Socket, ?HOST, ?PORT, PubAckPacket). ok = gen_udp:send(Socket, ?HOST, ?PORT, PubAckPacket).

View File

@ -152,7 +152,7 @@ default_conninfo(ConnInfo) ->
clean_start => true, clean_start => true,
clientid => undefined, clientid => undefined,
username => undefined, username => undefined,
conn_props => [], conn_props => #{},
connected => false, connected => false,
connected_at => undefined, connected_at => undefined,
keepalive => undefined, keepalive => undefined,
@ -814,4 +814,3 @@ interval(outgoing_timer, #pstate{heart_beats = HrtBt}) ->
emqx_stomp_heartbeat:interval(outgoing, HrtBt); emqx_stomp_heartbeat:interval(outgoing, HrtBt);
interval(clean_trans_timer, _) -> interval(clean_trans_timer, _) ->
?TRANS_TIMEOUT. ?TRANS_TIMEOUT.

View File

@ -45,7 +45,7 @@ web.hook.body.encoding_of_payload_field = plain
## If not specified, the server's names returned in server's certificate is validated against ## If not specified, the server's names returned in server's certificate is validated against
## what's provided `web.hook.url` config's host part. ## what's provided `web.hook.url` config's host part.
## Setting to 'disable' will make EMQ X ignore unmatched server names. ## Setting to 'disable' will make EMQX ignore unmatched server names.
## If set with a host name, the server's names returned in server's certificate is validated ## If set with a host name, the server's names returned in server's certificate is validated
## against this value. ## against this value.
## ##

View File

@ -1,5 +1,5 @@
%%-*- mode: erlang -*- %%-*- mode: erlang -*-
%% EMQ X R3.0 config mapping %% EMQX config mapping
{mapping, "web.hook.url", "emqx_web_hook.url", [ {mapping, "web.hook.url", "emqx_web_hook.url", [
{datatype, string} {datatype, string}

View File

@ -1,6 +1,6 @@
{application, emqx_web_hook, {application, emqx_web_hook,
[{description, "EMQ X WebHook Plugin"}, [{description, "EMQ X WebHook Plugin"},
{vsn, "4.3.9"}, % strict semver, bump manually! {vsn, "4.3.10"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, [emqx_web_hook_sup]}, {registered, [emqx_web_hook_sup]},
{applications, [kernel,stdlib,ehttpc]}, {applications, [kernel,stdlib,ehttpc]},

View File

@ -1,6 +1,6 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{VSN, {VSN,
[{<<"4\\.3\\.[0-2]">>, [{<<"4\\.3\\.[0-2]$">>,
[{apply,{application,stop,[emqx_web_hook]}}, [{apply,{application,stop,[emqx_web_hook]}},
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
@ -12,8 +12,11 @@
{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, {load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
{"4.3.8", {"4.3.8",
[{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]}, [{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
{"4.3.9",
[ %% nothing so far
]},
{<<".*">>,[]}], {<<".*">>,[]}],
[{<<"4\\.3\\.[0-2]">>, [{<<"4\\.3\\.[0-2]$">>,
[{apply,{application,stop,[emqx_web_hook]}}, [{apply,{application,stop,[emqx_web_hook]}},
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
@ -25,4 +28,7 @@
{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, {load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
{"4.3.8", {"4.3.8",
[{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]}, [{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
{"4.3.9",
[ %% nothing so far
]},
{<<".*">>,[]}]}. {<<".*">>,[]}]}.

View File

@ -6,6 +6,8 @@
-define(TIMEOUT, 300000). -define(TIMEOUT, 300000).
-define(INFO(Fmt,Args), io:format(Fmt++"~n",Args)). -define(INFO(Fmt,Args), io:format(Fmt++"~n",Args)).
-mode(compile).
main([Command0, DistInfoStr | CommandArgs]) -> main([Command0, DistInfoStr | CommandArgs]) ->
%% convert the distribution info arguments string to an erlang term %% convert the distribution info arguments string to an erlang term
{ok, Tokens, _} = erl_scan:string(DistInfoStr ++ "."), {ok, Tokens, _} = erl_scan:string(DistInfoStr ++ "."),
@ -210,15 +212,24 @@ find_and_link_release_package(Version, RelName) ->
ok = filelib:ensure_dir(filename:join([filename:dirname(ReleaseLink), "dummy"])), ok = filelib:ensure_dir(filename:join([filename:dirname(ReleaseLink), "dummy"])),
%% create the symlink pointing to the full path name of the %% create the symlink pointing to the full path name of the
%% release package we found %% release package we found
case file:make_symlink(filename:absname(Filename), ReleaseLink) of make_symlink_or_copy(filename:absname(Filename), ReleaseLink),
ok ->
ok;
{error, eperm} -> % windows!
{ok,_} = file:copy(filename:absname(Filename), ReleaseLink)
end,
{Filename, ReleaseHandlerPackageLink} {Filename, ReleaseHandlerPackageLink}
end. end.
make_symlink_or_copy(Filename, ReleaseLink) ->
case file:make_symlink(Filename, ReleaseLink) of
ok -> ok;
{error, eexist} ->
?INFO("symlink ~p already exists, recreate it", [ReleaseLink]),
ok = file:delete(ReleaseLink),
make_symlink_or_copy(Filename, ReleaseLink);
{error, Reason} when Reason =:= eperm; Reason =:= enotsup ->
{ok, _} = file:copy(Filename, ReleaseLink);
{error, Reason} ->
?INFO("create symlink ~p failed", [ReleaseLink]),
error({Reason, ReleaseLink})
end.
unpack_zipballs(RelNameStr, Version) -> unpack_zipballs(RelNameStr, Version) ->
{ok, Cwd} = file:get_cwd(), {ok, Cwd} = file:get_cwd(),
GzFile = filename:absname(filename:join(["releases", RelNameStr ++ "-" ++ Version ++ ".tar.gz"])), GzFile = filename:absname(filename:join(["releases", RelNameStr ++ "-" ++ Version ++ ".tar.gz"])),

View File

@ -49,7 +49,8 @@ RUN chgrp -Rf emqx /opt/emqx && chmod -Rf g+w /opt/emqx \
USER emqx USER emqx
VOLUME ["/opt/emqx/log", "/opt/emqx/data", "/opt/emqx/etc"] ## NOTE: /opt/emqx/etc is removed from the VOLUME list since 4.3.13
VOLUME ["/opt/emqx/log", "/opt/emqx/data"]
# emqx will occupy these port: # emqx will occupy these port:
# - 1883 port for MQTT # - 1883 port for MQTT

View File

@ -1,4 +1,4 @@
## EMQ X Configuration 4.3 ## EMQX Configuration 4.3
## NOTE: Do not change format of CONFIG_SECTION_{BGN,END} comments! ## NOTE: Do not change format of CONFIG_SECTION_{BGN,END} comments!
@ -208,10 +208,10 @@ node.data_dir = {{ platform_data_dir }}
## the heartbeat pings. ## the heartbeat pings.
## ##
## NOTE: When managed by systemd (or other supervision tools like systemd), ## NOTE: When managed by systemd (or other supervision tools like systemd),
## heart will probably only cause EMQ X to stop, but restart or not will ## heart will probably only cause EMQX to stop, but restart or not will
## depend on systemd's restart strategy. ## depend on systemd's restart strategy.
## NOTE: When running in docker, the container will die as soon as the the ## NOTE: When running in docker, the container will die as soon as the the
## heart process kills EMQ X, but restart or not will depend on container ## heart process kills EMQX, but restart or not will depend on container
## supervision strategy, such as k8s restartPolicy. ## supervision strategy, such as k8s restartPolicy.
## ##
## Value: on ## Value: on
@ -1123,7 +1123,7 @@ listener.tcp.external.zone = external
## Example: allow 192.168.0.0/24 ## Example: allow 192.168.0.0/24
listener.tcp.external.access.1 = allow all listener.tcp.external.access.1 = allow all
## Enable the Proxy Protocol V1/2 if the EMQ X cluster is deployed ## Enable the Proxy Protocol V1/2 if the EMQX cluster is deployed
## behind HAProxy or Nginx. ## behind HAProxy or Nginx.
## ##
## See: https://www.haproxy.com/blog/haproxy/proxy-protocol/ ## See: https://www.haproxy.com/blog/haproxy/proxy-protocol/
@ -1131,7 +1131,7 @@ listener.tcp.external.access.1 = allow all
## Value: on | off ## Value: on | off
## listener.tcp.external.proxy_protocol = on ## listener.tcp.external.proxy_protocol = on
## Sets the timeout for proxy protocol. EMQ X will close the TCP connection ## Sets the timeout for proxy protocol. EMQX will close the TCP connection
## if no proxy protocol packet recevied within the timeout. ## if no proxy protocol packet recevied within the timeout.
## ##
## Value: Duration ## Value: Duration
@ -1634,13 +1634,13 @@ listener.ws.external.access.1 = allow all
## Default: mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5 ## Default: mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5
## listener.ws.external.supported_subprotocols = mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5 ## listener.ws.external.supported_subprotocols = mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5
## Specify which HTTP header for real source IP if the EMQ X cluster is ## Specify which HTTP header for real source IP if the EMQX cluster is
## deployed behind NGINX or HAProxy. ## deployed behind NGINX or HAProxy.
## ##
## Default: X-Forwarded-For ## Default: X-Forwarded-For
## listener.ws.external.proxy_address_header = X-Forwarded-For ## listener.ws.external.proxy_address_header = X-Forwarded-For
## Specify which HTTP header for real source port if the EMQ X cluster is ## Specify which HTTP header for real source port if the EMQX cluster is
## deployed behind NGINX or HAProxy. ## deployed behind NGINX or HAProxy.
## ##
## Default: X-Forwarded-Port ## Default: X-Forwarded-Port
@ -1903,13 +1903,13 @@ listener.wss.external.access.1 = allow all
## Default: mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5 ## Default: mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5
## listener.wss.external.supported_subprotocols = mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5 ## listener.wss.external.supported_subprotocols = mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5
## Specify which HTTP header for real source IP if the EMQ X cluster is ## Specify which HTTP header for real source IP if the EMQX cluster is
## deployed behind NGINX or HAProxy. ## deployed behind NGINX or HAProxy.
## ##
## Default: X-Forwarded-For ## Default: X-Forwarded-For
## listener.wss.external.proxy_address_header = X-Forwarded-For ## listener.wss.external.proxy_address_header = X-Forwarded-For
## Specify which HTTP header for real source port if the EMQ X cluster is ## Specify which HTTP header for real source port if the EMQX cluster is
## deployed behind NGINX or HAProxy. ## deployed behind NGINX or HAProxy.
## ##
## Default: X-Forwarded-Port ## Default: X-Forwarded-Port

View File

@ -1,5 +1,5 @@
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## EMQ X Dashboard ## EMQX Dashboard
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## Default user's login name. ## Default user's login name.

View File

@ -1,5 +1,5 @@
%%-*- mode: erlang -*- %%-*- mode: erlang -*-
%% EMQ X R4.0 config mapping %% EMQX Config Mapping
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Cluster %% Cluster

View File

@ -45,6 +45,10 @@ esac
case "$SYSTEM" in case "$SYSTEM" in
windows*)
echo "WARNING: skipped downloading relup base for windows because we do not support relup for windows yet."
exit 0
;;
macos*) macos*)
SHASUM="shasum -a 256" SHASUM="shasum -a 256"
;; ;;

View File

@ -32,8 +32,8 @@ Options:
--make-command A command used to assemble the release --make-command A command used to assemble the release
--release-dir Release directory --release-dir Release directory
--src-dirs Directories where source code is found. Defaults to '{src,apps,lib-*}/**/' --src-dirs Directories where source code is found. Defaults to '{src,apps,lib-*}/**/'
--binary-rel-url Binary release URL pattern. %TAG% variable is substituted with the release tag. --binary-rel-url Binary release URL pattern.
E.g. \"https://github.com/emqx/emqx/releases/download/v%TAG%/emqx-centos7-%TAG%-amd64.zip\" E.g. https://www.emqx.com/en/downloads/broker/v4.4.1/emqx-4.4.1-otp24.1.5-3-el7-amd64.zip
". ".
-record(app, -record(app,
@ -171,9 +171,10 @@ download_prev_release(Tag, #{binary_rel_url := {ok, URL0}, clone_url := Repo}) -
BaseDir = "/tmp/emqx-baseline-bin/", BaseDir = "/tmp/emqx-baseline-bin/",
Dir = filename:basename(Repo, ".git") ++ [$-|Tag], Dir = filename:basename(Repo, ".git") ++ [$-|Tag],
Filename = filename:join(BaseDir, Dir), Filename = filename:join(BaseDir, Dir),
Script = "mkdir -p ${OUTFILE} && Script = "echo \"Download: ${OUTFILE}\" &&
wget -c -O ${OUTFILE}.zip ${URL} && mkdir -p ${OUTFILE} &&
unzip -n -d ${OUTFILE} ${OUTFILE}.zip", curl -f -L -o ${OUTFILE}.zip ${URL} &&
unzip -q -n -d ${OUTFILE} ${OUTFILE}.zip",
Env = [{"TAG", Tag}, {"OUTFILE", Filename}, {"URL", URL}], Env = [{"TAG", Tag}, {"OUTFILE", Filename}, {"URL", URL}],
bash(Script, Env), bash(Script, Env),
{ok, Filename}. {ok, Filename}.
@ -208,8 +209,8 @@ find_appup_actions(App,
{OldUpgrade0, OldDowngrade0} = find_old_appup_actions(App, PrevVersion), {OldUpgrade0, OldDowngrade0} = find_old_appup_actions(App, PrevVersion),
OldUpgrade = ensure_all_patch_versions(App, CurrVersion, OldUpgrade0), OldUpgrade = ensure_all_patch_versions(App, CurrVersion, OldUpgrade0),
OldDowngrade = ensure_all_patch_versions(App, CurrVersion, OldDowngrade0), OldDowngrade = ensure_all_patch_versions(App, CurrVersion, OldDowngrade0),
Upgrade = merge_update_actions(App, diff_app(App, CurrAppIdx, PrevAppIdx), OldUpgrade), Upgrade = merge_update_actions(App, diff_app(up, App, CurrAppIdx, PrevAppIdx), OldUpgrade),
Downgrade = merge_update_actions(App, diff_app(App, PrevAppIdx, CurrAppIdx), OldDowngrade), Downgrade = merge_update_actions(App, diff_app(down, App, PrevAppIdx, CurrAppIdx), OldDowngrade),
if OldUpgrade =:= Upgrade andalso OldDowngrade =:= Downgrade -> if OldUpgrade =:= Upgrade andalso OldDowngrade =:= Downgrade ->
%% The appup file has been already updated: %% The appup file has been already updated:
[]; [];
@ -521,7 +522,7 @@ index_app(AppFile) ->
, modules = Modules , modules = Modules
}}. }}.
diff_app(App, diff_app(UpOrDown, App,
#app{version = NewVersion, modules = NewModules}, #app{version = NewVersion, modules = NewModules},
#app{version = OldVersion, modules = OldModules}) -> #app{version = OldVersion, modules = OldModules}) ->
{New, Changed} = {New, Changed} =
@ -540,13 +541,15 @@ diff_app(App,
), ),
Deleted = maps:keys(maps:without(maps:keys(NewModules), OldModules)), Deleted = maps:keys(maps:without(maps:keys(NewModules), OldModules)),
NChanges = length(New) + length(Changed) + length(Deleted), NChanges = length(New) + length(Changed) + length(Deleted),
if NewVersion =:= OldVersion andalso NChanges > 0 -> case NewVersion =:= OldVersion of
true when NChanges =:= 0 ->
%% no change
ok;
true ->
set_invalid(), set_invalid(),
log("ERROR: Application '~p' contains changes, but its version is not updated~n", [App]); log("ERROR: Application '~p' contains changes, but its version is not updated~n", [App]);
NewVersion > OldVersion -> false ->
log("INFO: Application '~p' has been updated: ~p -> ~p~n", [App, OldVersion, NewVersion]), log("INFO: Application '~p' has been updated: ~p --[~p]--> ~p~n", [App, OldVersion, UpOrDown, NewVersion]),
ok;
true ->
ok ok
end, end,
{New, Changed, Deleted}. {New, Changed, Deleted}.
@ -607,13 +610,18 @@ locate(ebin_current, App, Suffix) ->
locate(src, App, Suffix) -> locate(src, App, Suffix) ->
AppStr = atom_to_list(App), AppStr = atom_to_list(App),
SrcDirs = getopt(src_dirs), SrcDirs = getopt(src_dirs),
case filelib:wildcard(SrcDirs ++ AppStr ++ Suffix) of case find_app(SrcDirs ++ AppStr ++ Suffix) of
[File] -> [File] ->
{ok, File}; {ok, File};
[] -> [] ->
undefined undefined
end. end.
find_app(Pattern) ->
%% exclude _build dir inside apps
lists:filter(fun(S) -> string:find(S, "/_build/") =:= nomatch end,
filelib:wildcard(Pattern)).
bash(Script) -> bash(Script) ->
bash(Script, []). bash(Script, []).

View File

@ -1,54 +1,61 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{VSN, {VSN,
[{"4.4.1", [{"4.4.1",
[ {load_module,emqx_connection,brutal_purge,soft_purge,[]} [{load_module,emqx_os_mon,brutal_purge,soft_purge,[]},
, {load_module,emqx_banned,brutal_purge,soft_purge,[]} {load_module,emqx_pmon,brutal_purge,soft_purge,[]},
, {load_module,emqx_ctl,brutal_purge,soft_purge,[]} {load_module,emqx_cm,brutal_purge,soft_purge,[]},
, {load_module,emqx_channel,brutal_purge,soft_purge,[]} {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]}]},
{"4.4.0", {"4.4.0",
[ {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]} [{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
, {load_module,emqx_banned,brutal_purge,soft_purge,[]} {load_module,emqx_cm,brutal_purge,soft_purge,[]},
, {load_module,emqx_ctl,brutal_purge,soft_purge,[]} {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
, {load_module,emqx_channel,brutal_purge,soft_purge,[]} {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
, {load_module,emqx_connection,brutal_purge,soft_purge,[]} {load_module,emqx_banned,brutal_purge,soft_purge,[]},
, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]} {load_module,emqx_ctl,brutal_purge,soft_purge,[]},
, {load_module,emqx_metrics,brutal_purge,soft_purge,[]} {load_module,emqx_channel,brutal_purge,soft_purge,[]},
, {apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}} {load_module,emqx_connection,brutal_purge,soft_purge,[]},
, {load_module,emqx_access_control,brutal_purge,soft_purge,[]} {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
, {load_module,emqx_alarm,brutal_purge,soft_purge,[]} {load_module,emqx_metrics,brutal_purge,soft_purge,[]},
, {load_module,emqx_session,brutal_purge,soft_purge,[]} {apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}},
, {load_module,emqx_os_mon,brutal_purge,soft_purge,[]} {load_module,emqx_access_control,brutal_purge,soft_purge,[]},
, {load_module,emqx,brutal_purge,soft_purge,[]} {load_module,emqx_alarm,brutal_purge,soft_purge,[]},
, {load_module,emqx_app,brutal_purge,soft_purge,[]} {load_module,emqx_session,brutal_purge,soft_purge,[]},
, {load_module,emqx_message,brutal_purge,soft_purge,[]} {load_module,emqx_os_mon,brutal_purge,soft_purge,[]},
, {load_module,emqx_limiter,brutal_purge,soft_purge,[]} {load_module,emqx,brutal_purge,soft_purge,[]},
]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{<<".*">>,[]} {load_module,emqx_message,brutal_purge,soft_purge,[]},
], {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.4.1", [{"4.4.1",
[ {load_module,emqx_connection,brutal_purge,soft_purge,[]} [{load_module,emqx_os_mon,brutal_purge,soft_purge,[]},
, {load_module,emqx_banned,brutal_purge,soft_purge,[]} {load_module,emqx_pmon,brutal_purge,soft_purge,[]},
, {load_module,emqx_ctl,brutal_purge,soft_purge,[]} {load_module,emqx_cm,brutal_purge,soft_purge,[]},
, {load_module,emqx_channel,brutal_purge,soft_purge,[]} {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]}]},
{"4.4.0", {"4.4.0",
[ {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]} [{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
, {load_module,emqx_banned,brutal_purge,soft_purge,[]} {load_module,emqx_cm,brutal_purge,soft_purge,[]},
, {load_module,emqx_ctl,brutal_purge,soft_purge,[]} {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
, {load_module,emqx_channel,brutal_purge,soft_purge,[]} {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
, {load_module,emqx_connection,brutal_purge,soft_purge,[]} {load_module,emqx_banned,brutal_purge,soft_purge,[]},
, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]} {load_module,emqx_ctl,brutal_purge,soft_purge,[]},
, {load_module,emqx_metrics,brutal_purge,soft_purge,[]} {load_module,emqx_channel,brutal_purge,soft_purge,[]},
, {load_module,emqx_access_control,brutal_purge,soft_purge,[]} {load_module,emqx_connection,brutal_purge,soft_purge,[]},
, {load_module,emqx_alarm,brutal_purge,soft_purge,[]} {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
, {load_module,emqx_session,brutal_purge,soft_purge,[]} {load_module,emqx_metrics,brutal_purge,soft_purge,[]},
, {load_module,emqx_os_mon,brutal_purge,soft_purge,[]} {load_module,emqx_access_control,brutal_purge,soft_purge,[]},
, {load_module,emqx,brutal_purge,soft_purge,[]} {load_module,emqx_alarm,brutal_purge,soft_purge,[]},
, {load_module,emqx_app,brutal_purge,soft_purge,[]} {load_module,emqx_session,brutal_purge,soft_purge,[]},
, {load_module,emqx_message,brutal_purge,soft_purge,[]} {load_module,emqx_os_mon,brutal_purge,soft_purge,[]},
, {load_module,emqx_limiter,brutal_purge,soft_purge,[]} {load_module,emqx,brutal_purge,soft_purge,[]},
]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{<<".*">>,[]} {load_module,emqx_message,brutal_purge,soft_purge,[]},
] {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
}. {<<".*">>,[]}]}.

View File

@ -235,18 +235,25 @@ open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
Self = self(), Self = self(),
ResumeStart = fun(_) -> ResumeStart = fun(_) ->
case takeover_session(ClientId) of CreateSess =
{ok, ConnMod, ChanPid, Session} -> fun() ->
ok = emqx_session:resume(ClientInfo, Session),
Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER),
register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session,
present => true,
pendings => Pendings}};
{error, not_found} ->
Session = create_session(ClientInfo, ConnInfo), Session = create_session(ClientInfo, ConnInfo),
register_channel(ClientId, Self, ConnInfo), register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session, present => false}} {ok, #{session => Session, present => false}}
end,
case takeover_session(ClientId) of
{ok, ConnMod, ChanPid, Session} ->
ok = emqx_session:resume(ClientInfo, Session),
case request_stepdown({takeover, 'end'}, ConnMod, ChanPid) of
{ok, Pendings} ->
register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session,
present => true,
pendings => Pendings}};
{error, _} ->
CreateSess()
end;
{error, _Reason} -> CreateSess()
end end
end, end,
emqx_cm_locker:trans(ClientId, ResumeStart). emqx_cm_locker:trans(ClientId, ResumeStart).
@ -280,9 +287,12 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
undefined -> undefined ->
{error, not_found}; {error, not_found};
ConnMod when is_atom(ConnMod) -> ConnMod when is_atom(ConnMod) ->
%% TODO: if takeover times out, maybe kill the old? case request_stepdown({takeover, 'begin'}, ConnMod, ChanPid) of
Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER), {ok, Session} ->
{ok, ConnMod, ChanPid, Session} {ok, ConnMod, ChanPid, Session};
{error, Reason} ->
{error, Reason}
end
end; end;
takeover_session(ClientId, ChanPid) -> takeover_session(ClientId, ChanPid) ->
rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER). rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER).
@ -295,42 +305,63 @@ discard_session(ClientId) when is_binary(ClientId) ->
ChanPids -> lists:foreach(fun(Pid) -> discard_session(ClientId, Pid) end, ChanPids) ChanPids -> lists:foreach(fun(Pid) -> discard_session(ClientId, Pid) end, ChanPids)
end. end.
%% @private Kick a local stale session to force it step down. %% @private call a local stale session to execute an Action.
%% If failed to kick (e.g. timeout) force a kill. %% If failed to response (e.g. timeout) force a kill.
%% Keeping the stale pid around, or returning error or raise an exception %% Keeping the stale pid around, or returning error or raise an exception
%% benefits nobody. %% benefits nobody.
-spec kick_or_kill(kick | discard, module(), pid()) -> ok. -spec request_stepdown(Action, module(), pid())
kick_or_kill(Action, ConnMod, Pid) -> -> ok
try | {ok, emqx_session:session() | list(emqx_type:deliver())}
| {error, term()}
when Action :: kick | discard | {takeover, 'begin'} | {takeover, 'end'}.
request_stepdown(Action, ConnMod, Pid) ->
Timeout =
case Action == kick orelse Action == discard of
true -> ?T_KICK;
_ -> ?T_TAKEOVER
end,
Return =
%% this is essentailly a gen_server:call implemented in emqx_connection %% this is essentailly a gen_server:call implemented in emqx_connection
%% and emqx_ws_connection. %% and emqx_ws_connection.
%% the handle_call is implemented in emqx_channel %% the handle_call is implemented in emqx_channel
ok = apply(ConnMod, call, [Pid, Action, ?T_KICK]) try apply(ConnMod, call, [Pid, Action, Timeout]) of
catch ok -> ok;
_ : noproc -> % emqx_ws_connection: call Reply -> {ok, Reply}
ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}); catch
_ : {noproc, _} -> % emqx_connection: gen_server:call _ : noproc -> % emqx_ws_connection: call
ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}); ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}),
_ : {shutdown, _} -> {error, noproc};
ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}); _ : {noproc, _} -> % emqx_connection: gen_server:call
_ : {{shutdown, _}, _} -> ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}),
ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}); {error, noproc};
_ : {timeout, {gen_server, call, _}} -> _ : Reason = {shutdown, _} ->
?tp(warning, "session_kick_timeout", ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}),
#{pid => Pid, {error, Reason};
action => Action, _ : Reason = {{shutdown, _}, _} ->
stale_channel => stale_channel_info(Pid) ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}),
}), {error, Reason};
ok = force_kill(Pid); _ : {timeout, {gen_server, call, _}} ->
_ : Error : St -> ?tp(warning, "session_stepdown_request_timeout",
?tp(error, "session_kick_exception", #{pid => Pid,
#{pid => Pid, action => Action,
action => Action, stale_channel => stale_channel_info(Pid)
reason => Error, }),
stacktrace => St, ok = force_kill(Pid),
stale_channel => stale_channel_info(Pid) {error, timeout};
}), _ : Error : St ->
ok = force_kill(Pid) ?tp(error, "session_stepdown_request_exception",
#{pid => Pid,
action => Action,
reason => Error,
stacktrace => St,
stale_channel => stale_channel_info(Pid)
}),
ok = force_kill(Pid),
{error, Error}
end,
case Action == kick orelse Action == discard of
true -> ok;
_ -> Return
end. end.
force_kill(Pid) -> force_kill(Pid) ->
@ -353,7 +384,7 @@ kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() ->
%% already deregistered %% already deregistered
ok; ok;
ConnMod when is_atom(ConnMod) -> ConnMod when is_atom(ConnMod) ->
ok = kick_or_kill(Action, ConnMod, ChanPid) ok = request_stepdown(Action, ConnMod, ChanPid)
end; end;
kick_session(Action, ClientId, ChanPid) -> kick_session(Action, ClientId, ChanPid) ->
%% call remote node on the old APIs because we do not know if they have upgraded %% call remote node on the old APIs because we do not know if they have upgraded

View File

@ -190,7 +190,7 @@ ensure_system_memory_alarm(HW) ->
case erlang:whereis(memsup) of case erlang:whereis(memsup) of
undefined -> ok; undefined -> ok;
_Pid -> _Pid ->
{Allocated, Total, _Worst} = memsup:get_memory_data(), {Total, Allocated, _Worst} = memsup:get_memory_data(),
case Total =/= 0 andalso Allocated/Total * 100 > HW of case Total =/= 0 andalso Allocated/Total * 100 > HW of
true -> emqx_alarm:activate(high_system_memory_usage, #{high_watermark => HW}); true -> emqx_alarm:activate(high_system_memory_usage, #{high_watermark => HW});
false -> ok false -> ok

View File

@ -50,9 +50,11 @@ monitor(Pid, PMon) ->
?MODULE:monitor(Pid, undefined, PMon). ?MODULE:monitor(Pid, undefined, PMon).
-spec(monitor(pid(), term(), pmon()) -> pmon()). -spec(monitor(pid(), term(), pmon()) -> pmon()).
monitor(Pid, Val, PMon = ?PMON(Map)) -> monitor(Pid, Val, ?PMON(Map)) ->
case maps:is_key(Pid, Map) of case maps:is_key(Pid, Map) of
true -> PMon; true ->
{Ref, _Val} = maps:get(Pid, Map),
?PMON(maps:put(Pid, {Ref, Val}, Map));
false -> false ->
Ref = erlang:monitor(process, Pid), Ref = erlang:monitor(process, Pid),
?PMON(maps:put(Pid, {Ref, Val}, Map)) ?PMON(maps:put(Pid, {Ref, Val}, Map))

View File

@ -119,7 +119,7 @@ handle_info({monitor, Port, long_schedule, Info}, State) when is_port(Port) ->
suppress({long_schedule, Port}, suppress({long_schedule, Port},
fun() -> fun() ->
WarnMsg = io_lib:format("long_schedule warning: port = ~p, info: ~p", [Port, Info]), WarnMsg = io_lib:format("long_schedule warning: port = ~p, info: ~p", [Port, Info]),
?LOG(warning, "~s~n~p", [WarnMsg, erlang:port_info(Port)]), ?LOG(warning, "~s~n~p", [WarnMsg, portinfo(Port)]),
safe_publish(long_schedule, WarnMsg) safe_publish(long_schedule, WarnMsg)
end, State); end, State);
@ -135,7 +135,7 @@ handle_info({monitor, SusPid, busy_port, Port}, State) ->
suppress({busy_port, Port}, suppress({busy_port, Port},
fun() -> fun() ->
WarnMsg = io_lib:format("busy_port warning: suspid = ~p, port = ~p", [SusPid, Port]), WarnMsg = io_lib:format("busy_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
?LOG(warning, "~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]), ?LOG(warning, "~s~n~p~n~p", [WarnMsg, procinfo(SusPid), portinfo(Port)]),
safe_publish(busy_port, WarnMsg) safe_publish(busy_port, WarnMsg)
end, State); end, State);
@ -143,7 +143,7 @@ handle_info({monitor, SusPid, busy_dist_port, Port}, State) ->
suppress({busy_dist_port, Port}, suppress({busy_dist_port, Port},
fun() -> fun() ->
WarnMsg = io_lib:format("busy_dist_port warning: suspid = ~p, port = ~p", [SusPid, Port]), WarnMsg = io_lib:format("busy_dist_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
?LOG(warning, "~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]), ?LOG(warning, "~s~n~p~n~p", [WarnMsg, procinfo(SusPid), portinfo(Port)]),
safe_publish(busy_dist_port, WarnMsg) safe_publish(busy_dist_port, WarnMsg)
end, State); end, State);
@ -200,3 +200,9 @@ safe_publish(Event, WarnMsg) ->
sysmon_msg(Topic, Payload) -> sysmon_msg(Topic, Payload) ->
Msg = emqx_message:make(?SYSMON, Topic, Payload), Msg = emqx_message:make(?SYSMON, Topic, Payload),
emqx_message:set_flag(sys, Msg). emqx_message:set_flag(sys, Msg).
portinfo(Port) ->
case is_port(Port) andalso erlang:port_info(Port) of
L when is_list(L) -> L;
_ -> []
end.

View File

@ -187,57 +187,89 @@ t_open_session_race_condition(_) ->
ok = flush_emqx_pool(), ok = flush_emqx_pool(),
?assertEqual([], emqx_cm:lookup_channels(ClientId)). ?assertEqual([], emqx_cm:lookup_channels(ClientId)).
t_kick_session_discard_normal(_) -> t_stepdown_sessiondiscard_normal(_) ->
test_kick_session(discard, normal). test_stepdown_session(discard, normal).
t_kick_session_discard_shutdown(_) -> t_stepdown_sessiondiscard_shutdown(_) ->
test_kick_session(discard, shutdown). test_stepdown_session(discard, shutdown).
t_kick_session_discard_shutdown_with_reason(_) -> t_stepdown_sessiondiscard_shutdown_with_reason(_) ->
test_kick_session(discard, {shutdown, discard}). test_stepdown_session(discard, {shutdown, discard}).
t_kick_session_discard_timeout(_) -> t_stepdown_sessiondiscard_timeout(_) ->
test_kick_session(discard, timeout). test_stepdown_session(discard, timeout).
t_kick_session_discard_noproc(_) -> t_stepdown_sessiondiscard_noproc(_) ->
test_kick_session(discard, noproc). test_stepdown_session(discard, noproc).
t_kick_session_kick_normal(_) -> t_stepdown_sessionkick_normal(_) ->
test_kick_session(discard, normal). test_stepdown_session(kick, normal).
t_kick_session_kick_shutdown(_) -> t_stepdown_sessionkick_shutdown(_) ->
test_kick_session(discard, shutdown). test_stepdown_session(kick, shutdown).
t_kick_session_kick_shutdown_with_reason(_) -> t_stepdown_sessionkick_shutdown_with_reason(_) ->
test_kick_session(discard, {shutdown, discard}). test_stepdown_session(kick, {shutdown, discard}).
t_kick_session_kick_timeout(_) -> t_stepdown_sessionkick_timeout(_) ->
test_kick_session(discard, timeout). test_stepdown_session(kick, timeout).
t_kick_session_kick_noproc(_) -> t_stepdown_sessionkick_noproc(_) ->
test_kick_session(discard, noproc). test_stepdown_session(discard, noproc).
test_kick_session(Action, Reason) -> t_stepdown_sessiontakeover_begin_normal(_) ->
test_stepdown_session({takeover, 'begin'}, normal).
t_stepdown_sessiontakeover_begin_shutdown(_) ->
test_stepdown_session({takeover, 'begin'}, shutdown).
t_stepdown_sessiontakeover_begin_shutdown_with_reason(_) ->
test_stepdown_session({takeover, 'begin'}, {shutdown, discard}).
t_stepdown_sessiontakeover_begin_timeout(_) ->
test_stepdown_session({takeover, 'begin'}, timeout).
t_stepdown_sessiontakeover_begin_noproc(_) ->
test_stepdown_session({takeover, 'begin'}, noproc).
t_stepdown_sessiontakeover_end_normal(_) ->
test_stepdown_session({takeover, 'end'}, normal).
t_stepdown_sessiontakeover_end_shutdown(_) ->
test_stepdown_session({takeover, 'end'}, shutdown).
t_stepdown_sessiontakeover_end_shutdown_with_reason(_) ->
test_stepdown_session({takeover, 'end'}, {shutdown, discard}).
t_stepdown_sessiontakeover_end_timeout(_) ->
test_stepdown_session({takeover, 'end'}, timeout).
t_stepdown_sessiontakeover_end_noproc(_) ->
test_stepdown_session({takeover, 'end'}, noproc).
test_stepdown_session(Action, Reason) ->
ClientId = rand_client_id(), ClientId = rand_client_id(),
#{conninfo := ConnInfo} = ?ChanInfo, #{conninfo := ConnInfo} = ?ChanInfo,
FakeSessionFun = FakeSessionFun =
fun Loop() -> fun Loop() ->
receive receive
{'$gen_call', From, A} when A =:= kick orelse {'$gen_call', From, A} when A =:= kick orelse
A =:= discard -> A =:= discard orelse
case Reason of A =:= {takeover, 'begin'} orelse
normal -> A =:= {takeover, 'end'} ->
gen_server:reply(From, ok); case Reason of
timeout -> normal when A =:= kick orelse A =:= discard ->
%% no response to the call gen_server:reply(From, ok);
Loop(); timeout ->
_ -> %% no response to the call
exit(Reason) Loop();
end; _ ->
Msg -> exit(Reason)
ct:pal("(~p) fake_session_discarded ~p", [Action, Msg]), end;
Loop() Msg ->
end ct:pal("(~p) fake_session_discarded ~p", [Action, Msg]),
Loop()
end
end, end,
{Pid1, _} = spawn_monitor(FakeSessionFun), {Pid1, _} = spawn_monitor(FakeSessionFun),
{Pid2, _} = spawn_monitor(FakeSessionFun), {Pid2, _} = spawn_monitor(FakeSessionFun),
@ -249,10 +281,11 @@ test_kick_session(Action, Reason) ->
noproc -> exit(Pid1, kill), exit(Pid2, kill); noproc -> exit(Pid1, kill), exit(Pid2, kill);
_ -> ok _ -> ok
end, end,
ok = case Action of _ = case Action of
kick -> emqx_cm:kick_session(ClientId); kick -> emqx_cm:kick_session(ClientId);
discard -> emqx_cm:discard_session(ClientId) discard -> emqx_cm:discard_session(ClientId);
end, {takeover, _} -> emqx_cm:takeover_session(ClientId)
end,
case Reason =:= timeout orelse Reason =:= noproc of case Reason =:= timeout orelse Reason =:= noproc of
true -> true ->
?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid1, R}, 2_000, R)), ?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid1, R}, 2_000, R)),

View File

@ -33,6 +33,11 @@
{self(), busy_port, {self(), busy_port,
concat_str("busy_port warning: suspid = ~p, port = ~p", concat_str("busy_port warning: suspid = ~p, port = ~p",
self(), list_to_port("#Port<0.4>")), list_to_port("#Port<0.4>")}, self(), list_to_port("#Port<0.4>")), list_to_port("#Port<0.4>")},
%% for the case when the port is missing, for some
%% reason.
{self(), busy_port,
concat_str("busy_port warning: suspid = ~p, port = ~p",
self(), []), []},
{self(), busy_dist_port, {self(), busy_dist_port,
concat_str("busy_dist_port warning: suspid = ~p, port = ~p", concat_str("busy_dist_port warning: suspid = ~p, port = ~p",
self(), list_to_port("#Port<0.4>")),list_to_port("#Port<0.4>")}, self(), list_to_port("#Port<0.4>")),list_to_port("#Port<0.4>")},
@ -122,6 +127,16 @@ t_sys_mon(_Config) ->
validate_sys_mon_info(PidOrPort, SysMonName,ValidateInfo, InfoOrPort) validate_sys_mon_info(PidOrPort, SysMonName,ValidateInfo, InfoOrPort)
end, ?INPUTINFO). end, ?INPUTINFO).
%% Existing port, but closed.
t_sys_mon_dead_port(_Config) ->
process_flag(trap_exit, true),
Port = dead_port(),
{PidOrPort, SysMonName, ValidateInfo, InfoOrPort} =
{self(), busy_port,
concat_str("busy_port warning: suspid = ~p, port = ~p",
self(), Port), Port},
validate_sys_mon_info(PidOrPort, SysMonName, ValidateInfo, InfoOrPort).
t_sys_mon2(_Config) -> t_sys_mon2(_Config) ->
?SYSMON ! {timeout, ignored, reset}, ?SYSMON ! {timeout, ignored, reset},
?SYSMON ! {ignored}, ?SYSMON ! {ignored},
@ -155,3 +170,8 @@ some_function(Parent, _Arg2) ->
stop -> stop ->
ok ok
end. end.
dead_port() ->
Port = erlang:open_port({spawn, "ls"}, []),
exit(Port, kill),
Port.