diff --git a/.ci/fvt_tests/relup.lux b/.ci/fvt_tests/relup.lux index 6da46a706..e3d28a2e8 100644 --- a/.ci/fvt_tests/relup.lux +++ b/.ci/fvt_tests/relup.lux @@ -20,6 +20,7 @@ ?> [shell emqx] + !OLD_VSN=$(echo $OLD_VSN | sed -r 's/[v|e]//g') !cd $PACKAGE_PATH !unzip -q -o $PROFILE-$(echo $OLD_VSN | sed -r 's/[v|e]//g')-otp${FROM_OTP_VSN}-ubuntu20.04-amd64.zip ?SH-PROMPT @@ -32,6 +33,7 @@ ?SH-PROMPT [shell emqx2] + !OLD_VSN=$(echo $OLD_VSN | sed -r 's/[v|e]//g') !cd $PACKAGE_PATH !cp -f $ONE_MORE_EMQX_PATH/one_more_$(echo $PROFILE | sed 's/-/_/g').sh . !./one_more_$(echo $PROFILE | sed 's/-/_/g').sh emqx2 @@ -84,6 +86,27 @@ !cp -f ../$PROFILE-$VSN-otp${TO_OTP_VSN}-ubuntu20.04-amd64.zip releases/ + ## upgrade to the new version + !./bin/emqx install $VSN + ?Made release permanent: "$VSN" + ?SH-PROMPT + + !./bin/emqx versions |grep permanent + ?(.*)$VSN + ?SH-PROMPT + + ## downgrade to the old version + !./bin/emqx install $${OLD_VSN} + ?Made release permanent:.* + ?SH-PROMPT + + !./bin/emqx versions |grep permanent | grep -qs "$${OLD_VSN}" + ?SH-PROMPT: + !echo ==$$?== + ?^==0== + ?SH-PROMPT: + + ## again, upgrade to the new version !./bin/emqx install $VSN ?Made release permanent: "$VSN" ?SH-PROMPT @@ -109,6 +132,27 @@ !cp -f ../$PROFILE-$VSN-otp${TO_OTP_VSN}-ubuntu20.04-amd64.zip releases/ + ## upgrade to the new version + !./bin/emqx install $VSN + ?Made release permanent: "$VSN" + ?SH-PROMPT + + !./bin/emqx versions |grep permanent + ?(.*)$VSN + ?SH-PROMPT + + ## downgrade to the old version + !./bin/emqx install $${OLD_VSN} + ?Made release permanent:.* + ?SH-PROMPT + + !./bin/emqx versions |grep permanent | grep -qs "$${OLD_VSN}" + ?SH-PROMPT: + !echo ==$$?== + ?^==0== + ?SH-PROMPT: + + ## again, upgrade to the new version !./bin/emqx install $VSN ?Made release permanent: "$VSN" ?SH-PROMPT @@ -138,17 +182,20 @@ !./bin/emqx_ctl broker metrics | grep "messages.publish" ???SH-PROMPT +## We don't guarantee not to lose a single message! 
+## So even if we received 290~300 messages, we consider it as success [shell bench] !curl --user admin:public --silent --show-error http://localhost:8081/api/v4/rules | jq -M --raw-output ".data[0].metrics[] | select(.node==\"emqx@127.0.0.1\").matched" - ?300 + ?(29[0-9])|(300) ?SH-PROMPT !curl --user admin:public --silent --show-error http://localhost:8081/api/v4/rules | jq -M --raw-output ".data[0].actions[0].metrics[] | select(.node==\"emqx@127.0.0.1\").success" - ?300 + ?(29[0-9])|(300) ?SH-PROMPT + ## The /counter API is provided by .ci/fvt_test/http_server !curl http://127.0.0.1:8080/counter - ???{"data":300,"code":0} + ?\{"data":(29[0-9])|(300),"code":0\} ?SH-PROMPT [shell emqx2] diff --git a/.github/ISSUE_TEMPLATE/bug-report.md b/.github/ISSUE_TEMPLATE/bug-report.md index 0258866dd..d8a0e22cd 100644 --- a/.github/ISSUE_TEMPLATE/bug-report.md +++ b/.github/ISSUE_TEMPLATE/bug-report.md @@ -12,11 +12,11 @@ assignees: tigercl **Environment**: -- EMQ X version (e.g. `emqx_ctl status`): +- EMQX version (e.g. `emqx_ctl status`): - Hardware configuration (e.g. `lscpu`): - OS (e.g. `cat /etc/os-release`): - Kernel (e.g. `uname -a`): -- Erlang/OTP version (in case you build emqx from source code): +- Erlang/OTP version (in case you build emqx from source code): - Others: **What happened and what you expected to happen**: diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index ed3050058..648086ec4 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -12,15 +12,30 @@ File format: ## v4.3.13 +### Important changes + +* For docker image, /opt/emqx/etc has been removed from the VOLUME list, + this made it easier for the users to rebuild image on top with changed configs. + ### Enhancements * CLI `emqx_ctl pem_cache clean` to force purge x509 certificate cache, to force an immediate reload of all certificates after the files are updated on disk. 
+* Refactor the ExProto so that anonymous clients can also be displayed on the dashboard [#6983] +* Force shutdown of processes that cannot answer takeover event [#7026] + +* `topic` parameter in bridge configuration can have `${node}` substitution (just like in `clientid` parameter) ### Bug fixes +* Fix the `{error,eexist}` error when doing a release upgrade again after the last run failed. [#7121] * Fix case where publishing to a non-existent topic alias would crash the connection [#6979] * Fix HTTP-API 500 error on querying the lwm2m client list on the another node [#7009] +* Fix the ExProto connection registry not being released after the client process abnormally exits [#6983] +* Fix Server-KeepAlive wrongly applied on MQTT v3.0/v3.1 [#7085] +* Fix Stomp client can not trigger `$event/client_connection` message [#7096] +* Fix system memory false alarm at boot +* Fix the MQTT-SN message replay when the topic is not registered to the client [#6970] ## v4.3.12 ### Important changes diff --git a/Makefile b/Makefile index 9e749a676..08a7a615f 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,6 @@ export EMQX_DEFAULT_BUILDER = ghcr.io/emqx/emqx-builder/4.4-5:24.1.5-3-alpine3.1 export EMQX_DEFAULT_RUNNER = alpine:3.14 export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh) export PKG_VSN ?= $(shell $(CURDIR)/pkg-vsn.sh) -export EMQX_DESC ?= EMQ X export EMQX_CE_DASHBOARD_VERSION ?= v4.4.0 export DOCKERFILE := deploy/docker/Dockerfile export DOCKERFILE_TESTING := deploy/docker/Dockerfile.testing diff --git a/apps/emqx_auth_http/etc/emqx_auth_http.conf b/apps/emqx_auth_http/etc/emqx_auth_http.conf index 2dd131dec..584448046 100644 --- a/apps/emqx_auth_http/etc/emqx_auth_http.conf +++ b/apps/emqx_auth_http/etc/emqx_auth_http.conf @@ -168,7 +168,7 @@ auth.http.enable_pipelining = true ## If not specified, the server's names returned in server's certificate is validated against ## what's provided `auth.http.auth_req.url` config's host part. 
-## Setting to 'disable' will make EMQ X ignore unmatched server names. +## Setting to 'disable' will make EMQX ignore unmatched server names. ## If set with a host name, the server's names returned in server's certificate is validated ## against this value. ## diff --git a/apps/emqx_auth_mongo/etc/emqx_auth_mongo.conf b/apps/emqx_auth_mongo/etc/emqx_auth_mongo.conf index 8baddae19..46a938d05 100644 --- a/apps/emqx_auth_mongo/etc/emqx_auth_mongo.conf +++ b/apps/emqx_auth_mongo/etc/emqx_auth_mongo.conf @@ -83,7 +83,7 @@ auth.mongo.database = mqtt ## If not specified, the server's names returned in server's certificate is validated against ## what's provided `auth.mongo.server` config's host part. -## Setting to 'disable' will make EMQ X ignore unmatched server names. +## Setting to 'disable' will make EMQX ignore unmatched server names. ## If set with a host name, the server's names returned in server's certificate is validated ## against this value. ## diff --git a/apps/emqx_auth_mysql/etc/emqx_auth_mysql.conf b/apps/emqx_auth_mysql/etc/emqx_auth_mysql.conf index 6014329b3..e38357e07 100644 --- a/apps/emqx_auth_mysql/etc/emqx_auth_mysql.conf +++ b/apps/emqx_auth_mysql/etc/emqx_auth_mysql.conf @@ -123,7 +123,7 @@ auth.mysql.acl_query = select allow, ipaddr, username, clientid, access, topic f ## If not specified, the server's names returned in server's certificate is validated against ## what's provided `auth.mysql.server` config's host part. -## Setting to 'disable' will make EMQ X ignore unmatched server names. +## Setting to 'disable' will make EMQX ignore unmatched server names. ## If set with a host name, the server's names returned in server's certificate is validated ## against this value. 
## diff --git a/apps/emqx_auth_pgsql/etc/emqx_auth_pgsql.conf b/apps/emqx_auth_pgsql/etc/emqx_auth_pgsql.conf index e39d0c78a..b58be120b 100644 --- a/apps/emqx_auth_pgsql/etc/emqx_auth_pgsql.conf +++ b/apps/emqx_auth_pgsql/etc/emqx_auth_pgsql.conf @@ -70,7 +70,7 @@ auth.pgsql.ssl = off ## If not specified, the server's names returned in server's certificate is validated against ## what's provided `auth.pgsql.server` config's host part. -## Setting to 'disable' will make EMQ X ignore unmatched server names. +## Setting to 'disable' will make EMQX ignore unmatched server names. ## If set with a host name, the server's names returned in server's certificate is validated ## against this value. ## diff --git a/apps/emqx_auth_redis/etc/emqx_auth_redis.conf b/apps/emqx_auth_redis/etc/emqx_auth_redis.conf index 5a56c5dce..44aa250f9 100644 --- a/apps/emqx_auth_redis/etc/emqx_auth_redis.conf +++ b/apps/emqx_auth_redis/etc/emqx_auth_redis.conf @@ -123,9 +123,9 @@ auth.redis.acl_cmd = HGETALL mqtt_acl:%u ## If not specified, the server's names returned in server's certificate is validated against ## what's provided `auth.redis.server` config's host part. -## Setting to 'disable' will make EMQ X ignore unmatched server names. +## Setting to 'disable' will make EMQX ignore unmatched server names. ## If set with a host name, the server's names returned in server's certificate is validated ## against this value. 
## ## Value: String | disable -## auth.redis.ssl.server_name_indication = disable \ No newline at end of file +## auth.redis.ssl.server_name_indication = disable diff --git a/apps/emqx_bridge_mqtt/etc/emqx_bridge_mqtt.conf b/apps/emqx_bridge_mqtt/etc/emqx_bridge_mqtt.conf index f92ef042b..1192863c0 100644 --- a/apps/emqx_bridge_mqtt/etc/emqx_bridge_mqtt.conf +++ b/apps/emqx_bridge_mqtt/etc/emqx_bridge_mqtt.conf @@ -1,5 +1,5 @@ ##==================================================================== -## Configuration for EMQ X MQTT Broker Bridge +## Configuration for EMQX MQTT Broker Bridge ##==================================================================== ##-------------------------------------------------------------------- diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src index 52b4e24fc..21fc2d8b1 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src @@ -31,4 +31,4 @@ ]}, {<<".*">>, []} ] -}. +}. \ No newline at end of file diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl index 3bd12564c..ecb425226 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl @@ -37,6 +37,11 @@ , handle_disconnected/2 ]). +%% for testing +-ifdef(TEST). +-export([ replvar/1 ]). +-endif. + -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). @@ -176,13 +181,13 @@ subscribe_remote_topics(ClientPid, Subscriptions) -> end end, Subscriptions). +replvar(Options) -> + replvar([topic, clientid, max_inflight], Options). + %%-------------------------------------------------------------------- %% Internal funcs %%-------------------------------------------------------------------- -replvar(Options) -> - replvar([clientid, max_inflight], Options). 
- replvar([], Options) -> Options; replvar([Key|More], Options) -> @@ -194,8 +199,8 @@ replvar([Key|More], Options) -> end. %% ${node} => node() -feedvar(clientid, ClientId, _) -> - iolist_to_binary(re:replace(ClientId, "\\${node}", atom_to_list(node()))); +feedvar(Key, Value, _) when Key =:= topic; Key =:= clientid -> + iolist_to_binary(re:replace(Value, "\\${node}", atom_to_list(node()))); feedvar(max_inflight, 0, _) -> infinity; diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_tests.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_tests.erl index 830fb1fe0..d37f1680b 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_tests.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_tests.erl @@ -44,4 +44,15 @@ send_and_ack_test() -> ok = emqx_bridge_mqtt:stop(Conn) after meck:unload(emqtt) - end. \ No newline at end of file + end. + +replvar_test() -> + Node = atom_to_list(node()), + Config = #{clientid => <<"Hey ${node}">>, topic => <<"topic ${node}">>, other => <<"other">>}, + + ReplacedConfig = emqx_bridge_mqtt:replvar(Config), + + ExpectedConfig = #{clientid => iolist_to_binary("Hey " ++ Node), + topic => iolist_to_binary("topic " ++ Node), + other => <<"other">>}, + ?assertEqual(ExpectedConfig, ReplacedConfig). 
diff --git a/apps/emqx_exhook/etc/emqx_exhook.conf b/apps/emqx_exhook/etc/emqx_exhook.conf index 23895f902..0608fdadc 100644 --- a/apps/emqx_exhook/etc/emqx_exhook.conf +++ b/apps/emqx_exhook/etc/emqx_exhook.conf @@ -1,5 +1,5 @@ ##==================================================================== -## EMQ X Hooks +## EMQX Hooks ##==================================================================== ## The default value or action will be returned, while the request to diff --git a/apps/emqx_exproto/etc/emqx_exproto.conf b/apps/emqx_exproto/etc/emqx_exproto.conf index 713685734..ae79e1a42 100644 --- a/apps/emqx_exproto/etc/emqx_exproto.conf +++ b/apps/emqx_exproto/etc/emqx_exproto.conf @@ -1,5 +1,5 @@ ##==================================================================== -## EMQ X ExProto +## EMQX ExProto ##==================================================================== exproto.server.http.port = 9100 @@ -65,7 +65,7 @@ exproto.listener.protoname.idle_timeout = 30s ## Example: allow 192.168.0.0/24 exproto.listener.protoname.access.1 = allow all -## Enable the Proxy Protocol V1/2 if the EMQ X cluster is deployed +## Enable the Proxy Protocol V1/2 if the EMQX cluster is deployed ## behind HAProxy or Nginx. ## ## See: https://www.haproxy.com/blog/haproxy/proxy-protocol/ @@ -73,7 +73,7 @@ exproto.listener.protoname.access.1 = allow all ## Value: on | off ## exproto.listener.protoname.proxy_protocol = on -## Sets the timeout for proxy protocol. EMQ X will close the TCP connection +## Sets the timeout for proxy protocol. EMQX will close the TCP connection ## if no proxy protocol packet recevied within the timeout. 
## ## Value: Duration diff --git a/apps/emqx_exproto/src/emqx_exproto.app.src b/apps/emqx_exproto/src/emqx_exproto.app.src index 0ed54e6fd..4ce137fc5 100644 --- a/apps/emqx_exproto/src/emqx_exproto.app.src +++ b/apps/emqx_exproto/src/emqx_exproto.app.src @@ -1,6 +1,6 @@ {application, emqx_exproto, [{description, "EMQ X Extension for Protocol"}, - {vsn, "4.3.5"}, %% 4.3.3 is used by ee + {vsn, "4.3.6"}, %% 4.3.3 is used by ee {modules, []}, {registered, []}, {mod, {emqx_exproto_app, []}}, diff --git a/apps/emqx_exproto/src/emqx_exproto.appup.src b/apps/emqx_exproto/src/emqx_exproto.appup.src index 7da28e773..4b58335b4 100644 --- a/apps/emqx_exproto/src/emqx_exproto.appup.src +++ b/apps/emqx_exproto/src/emqx_exproto.appup.src @@ -1,28 +1,24 @@ %% -*- mode: erlang -*- {VSN, - [{"4.3.4", - [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, - {"4.3.3", + [{<<"4\\.3\\.[4-5]">>, [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, - {"4.3.2", + {<<"4\\.3\\.[2-3]">>, [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, - {<<"4.3.[0-1]">>, + {<<"4\\.3\\.[0-1]">>, [{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{"4.3.4", - [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, - {"4.3.3", + [{<<"4\\.3\\.[4-5]">>, [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, - {"4.3.2", + {<<"4\\.3\\.[2-3]">>, [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, - {<<"4.3.[0-1]">>, + {<<"4\\.3\\.[0-1]">>, 
[{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_exproto/src/emqx_exproto_channel.erl b/apps/emqx_exproto/src/emqx_exproto_channel.erl index 44b8b64d3..527f2e9b7 100644 --- a/apps/emqx_exproto/src/emqx_exproto_channel.erl +++ b/apps/emqx_exproto/src/emqx_exproto_channel.erl @@ -94,9 +94,6 @@ awaiting_rel_max ]). --define(CHANMOCK(P), {exproto_anonymous_client, P}). --define(CHAN_CONN_TAB, emqx_channel_conn). - %%-------------------------------------------------------------------- %% Info, Attrs and Caps %%-------------------------------------------------------------------- @@ -155,15 +152,14 @@ init(ConnInfo = #{socktype := Socktype, Channel = #channel{gcli = #{channel => GRpcChann}, conninfo = NConnInfo, clientinfo = ClientInfo, - conn_state = connecting, + conn_state = accepted, timers = #{} }, case emqx_hooks:run_fold('client.connect', [NConnInfo], #{}) of {error, _Reason} -> throw(nopermission); _ -> - ConnMod = maps:get(conn_mod, NConnInfo), - true = ets:insert(?CHAN_CONN_TAB, {?CHANMOCK(self()), ConnMod}), + ok = register_the_anonymous_client(ClientInfo, NConnInfo), Req = #{conninfo => peercert(Peercert, #{socktype => socktype(Socktype), @@ -172,6 +168,22 @@ init(ConnInfo = #{socktype := Socktype, try_dispatch(on_socket_created, wrap(Req), Channel) end. +register_the_anonymous_client(ClientInfo, ConnInfo) -> + ClientId = maps:get(clientid, ClientInfo), + case emqx_cm:open_session(true, ClientInfo, ConnInfo) of + {ok, _} -> + ?LOG(debug, "Registered an anonymous connection, " + "temporary clientid: ~s", [ClientId]), + emqx_logger:set_metadata_clientid(ClientId), + _ = self() ! {event, accepted}, + ok; + {error, Reason} -> + throw({register_anonymous_error, Reason}) + end. + +unregister_the_anonymous_client(ClientId) -> + emqx_cm:unregister_channel(ClientId). 
+ %% @private peercert(NoSsl, ConnInfo) when NoSsl == nossl; NoSsl == undefined -> @@ -274,15 +286,14 @@ handle_call(close, Channel) -> handle_call({auth, ClientInfo, _Password}, Channel = #channel{conn_state = connected}) -> ?LOG(warning, "Duplicated authorized command, dropped ~p", [ClientInfo]), {reply, {error, ?RESP_PERMISSION_DENY, <<"Duplicated authenticate command">>}, Channel}; -handle_call({auth, ClientInfo0, Password}, +handle_call({auth, RequestedClientInfo, Password}, Channel = #channel{conninfo = ConnInfo, - clientinfo = ClientInfo}) -> - ClientInfo1 = enrich_clientinfo(ClientInfo0, ClientInfo), - NConnInfo = enrich_conninfo(ClientInfo0, ConnInfo), + clientinfo = ClientInfo0}) -> + ClientInfo1 = enrich_clientinfo(RequestedClientInfo, ClientInfo0), + NConnInfo = enrich_conninfo(RequestedClientInfo, ConnInfo), Channel1 = Channel#channel{conninfo = NConnInfo, clientinfo = ClientInfo1}, - #{clientid := ClientId, username := Username} = ClientInfo1, case emqx_access_control:authenticate(ClientInfo1#{password => Password}) of @@ -292,9 +303,10 @@ handle_call({auth, ClientInfo0, Password}, emqx_metrics:inc('client.auth.anonymous'), NClientInfo = maps:merge(ClientInfo1, AuthResult), NChannel = Channel1#channel{clientinfo = NClientInfo}, - clean_anonymous_clients(), case emqx_cm:open_session(true, NClientInfo, NConnInfo) of {ok, _Session} -> + AnonymousClientId = maps:get(clientid, ClientInfo0), + unregister_the_anonymous_client(AnonymousClientId), ?LOG(debug, "Client ~s (Username: '~s') authorized successfully!", [ClientId, Username]), {reply, ok, [{event, connected}], ensure_connected(NChannel)}; @@ -354,6 +366,9 @@ handle_call({publish, Topic, Qos, Payload}, handle_call(kick, Channel) -> {shutdown, kicked, ok, Channel}; +handle_call(discard, Channel) -> + {shutdown, discarded, ok, Channel}; + handle_call(Req, Channel) -> ?LOG(warning, "Unexpected call: ~p", [Req]), {reply, {error, unexpected_call}, Channel}. 
@@ -406,16 +421,12 @@ handle_info(Info, Channel) -> -spec(terminate(any(), channel()) -> channel()). terminate(Reason, Channel) -> - clean_anonymous_clients(), Req = #{reason => stringfy(Reason)}, try_dispatch(on_socket_closed, wrap(Req), Channel). is_anonymous(#{anonymous := true}) -> true; is_anonymous(_AuthResult) -> false. -clean_anonymous_clients() -> - ets:delete(?CHAN_CONN_TAB, ?CHANMOCK(self())). - packet_to_message(Topic, Qos, Payload, #channel{ conninfo = #{proto_ver := ProtoVer}, @@ -608,23 +619,32 @@ default_conninfo(ConnInfo) -> username => undefined, conn_props => #{}, connected => true, + proto_name => <<"exproto">>, + proto_ver => <<"1.0">>, connected_at => erlang:system_time(millisecond), keepalive => 0, receive_maximum => 0, expiry_interval => 0}. -default_clientinfo(#{peername := {PeerHost, _}, +default_clientinfo(#{peername := {PeerHost, PeerPort}, sockname := {_, SockPort}}) -> #{zone => external, - protocol => undefined, + protocol => exproto, peerhost => PeerHost, sockport => SockPort, - clientid => undefined, + clientid => anonymous_clientid(PeerHost, PeerPort), username => undefined, is_bridge => false, is_superuser => false, mountpoint => undefined}. +anonymous_clientid(PeerHost, PeerPort) -> + iolist_to_binary( + ["exproto-anonymous-", + inet:ntoa(PeerHost), "-", integer_to_list(PeerPort), + "-", emqx_rule_id:gen() + ]). + stringfy(Reason) -> unicode:characters_to_binary((io_lib:format("~0p", [Reason]))). 
diff --git a/apps/emqx_exproto/src/emqx_exproto_conn.erl b/apps/emqx_exproto/src/emqx_exproto_conn.erl index 02c0b31d6..f0cca7bec 100644 --- a/apps/emqx_exproto/src/emqx_exproto_conn.erl +++ b/apps/emqx_exproto/src/emqx_exproto_conn.erl @@ -439,7 +439,8 @@ handle_msg({close, Reason}, State) -> ?LOG(debug, "Force to close the socket due to ~p", [Reason]), handle_info({sock_closed, Reason}, close_socket(State)); -handle_msg({event, connected}, State = #state{channel = Channel}) -> +handle_msg({event, Event}, State = #state{channel = Channel}) + when Event == connected; Event == accepted -> ClientId = emqx_exproto_channel:info(clientid, Channel), emqx_cm:insert_channel_info(ClientId, info(State), stats(State)); diff --git a/apps/emqx_lua_hook/etc/emqx_lua_hook.conf b/apps/emqx_lua_hook/etc/emqx_lua_hook.conf index f0256afae..c8d220677 100644 --- a/apps/emqx_lua_hook/etc/emqx_lua_hook.conf +++ b/apps/emqx_lua_hook/etc/emqx_lua_hook.conf @@ -1,4 +1,4 @@ ##-------------------------------------------------------------------- -## EMQ X Lua Hook +## EMQX Lua Hook ##-------------------------------------------------------------------- diff --git a/apps/emqx_management/etc/emqx_management.conf b/apps/emqx_management/etc/emqx_management.conf index aa737add9..0170059d7 100644 --- a/apps/emqx_management/etc/emqx_management.conf +++ b/apps/emqx_management/etc/emqx_management.conf @@ -1,5 +1,5 @@ ##-------------------------------------------------------------------- -## EMQ X Management Plugin +## EMQX Management Plugin ##-------------------------------------------------------------------- ## Max Row Limit diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index de1fbabde..985752f73 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -297,6 +297,7 @@ format_channel_info({_Key, Info, Stats0}) -> SessCreated = maps:get(created_at, Session, 
maps:get(connected_at, ConnInfo)), Connected = case maps:get(conn_state, Info, connected) of connected -> true; + accepted -> true; %% for exproto anonymous clients _ -> false end, NStats = Stats#{max_subscriptions => maps:get(subscriptions_max, Stats, 0), diff --git a/apps/emqx_management/test/etc/emqx_management.conf b/apps/emqx_management/test/etc/emqx_management.conf index cec70cc8e..074e10cd1 100644 --- a/apps/emqx_management/test/etc/emqx_management.conf +++ b/apps/emqx_management/test/etc/emqx_management.conf @@ -1,5 +1,5 @@ ##-------------------------------------------------------------------- -## EMQ X Management Plugin +## EMQX Management Plugin ##-------------------------------------------------------------------- ## Max Row Limit diff --git a/apps/emqx_prometheus/etc/emqx_prometheus.conf b/apps/emqx_prometheus/etc/emqx_prometheus.conf index 7bfa22095..48765f207 100644 --- a/apps/emqx_prometheus/etc/emqx_prometheus.conf +++ b/apps/emqx_prometheus/etc/emqx_prometheus.conf @@ -1,5 +1,5 @@ ##-------------------------------------------------------------------- -## emqx_prometheus for EMQ X +## emqx_prometheus for EMQX ##-------------------------------------------------------------------- ## The Prometheus Push Gateway URL address diff --git a/apps/emqx_retainer/etc/emqx_retainer.conf b/apps/emqx_retainer/etc/emqx_retainer.conf index 8b5c251c6..1b6e1082e 100644 --- a/apps/emqx_retainer/etc/emqx_retainer.conf +++ b/apps/emqx_retainer/etc/emqx_retainer.conf @@ -1,5 +1,5 @@ ##-------------------------------------------------------------------- -## EMQ X Retainer +## EMQX Retainer ##-------------------------------------------------------------------- ## Where to store the retained messages. 
diff --git a/apps/emqx_rule_engine/etc/emqx_rule_engine.conf b/apps/emqx_rule_engine/etc/emqx_rule_engine.conf index 2fe946779..c0cba59bd 100644 --- a/apps/emqx_rule_engine/etc/emqx_rule_engine.conf +++ b/apps/emqx_rule_engine/etc/emqx_rule_engine.conf @@ -1,5 +1,5 @@ ##==================================================================== -## Rule Engine for EMQ X R4.0 +## EMQX Rule Engine ##==================================================================== rule_engine.ignore_sys_message = on diff --git a/apps/emqx_sn/src/emqx_sn.app.src b/apps/emqx_sn/src/emqx_sn.app.src index ef0d24bf9..319137fed 100644 --- a/apps/emqx_sn/src/emqx_sn.app.src +++ b/apps/emqx_sn/src/emqx_sn.app.src @@ -1,6 +1,6 @@ {application, emqx_sn, [{description, "EMQ X MQTT-SN Plugin"}, - {vsn, "4.3.5"}, % strict semver, bump manually! + {vsn, "4.3.6"}, % strict semver, bump manually! {modules, []}, {registered, []}, {applications, [kernel,stdlib,esockd]}, diff --git a/apps/emqx_sn/src/emqx_sn.appup.src b/apps/emqx_sn/src/emqx_sn.appup.src index 749a72956..6a4eb66d1 100644 --- a/apps/emqx_sn/src/emqx_sn.appup.src +++ b/apps/emqx_sn/src/emqx_sn.appup.src @@ -1,29 +1,25 @@ %% -*- mode: erlang -*- {VSN, [ - {"4.3.4",[ + {<<"4\\.3\\.[4-5]">>,[ + {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} ]}, - {"4.3.3",[ - {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, - {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} - ]}, - {"4.3.2", [ + {<<"4.3.[2-3]">>,[ {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} ]}, {<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]} ], [ - {"4.3.4",[ + {<<"4\\.3\\.[4-5]">>,[ + {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} ]}, - {"4.3.3",[ - {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, - 
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} - ]}, - {"4.3.2", [ + {<<"4.3.[2-3]">>,[ {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]} ]}, {<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]} diff --git a/apps/emqx_sn/src/emqx_sn_frame.erl b/apps/emqx_sn/src/emqx_sn_frame.erl index eed32803d..28a20956e 100644 --- a/apps/emqx_sn/src/emqx_sn_frame.erl +++ b/apps/emqx_sn/src/emqx_sn_frame.erl @@ -268,40 +268,81 @@ message_type(16#1d) -> message_type(Type) -> io_lib:format("Unknown Type ~p", [Type]). +format(?SN_CONNECT_MSG(Flags, ProtocolId, Duration, ClientId)) -> + #mqtt_sn_flags{ + will = Will, + clean_start = CleanStart} = Flags, + io_lib:format("SN_CONNECT(W~w, C~w, ProtocolId=~w, Duration=~w, " + "ClientId=~s)", + [bool(Will), bool(CleanStart), + ProtocolId, Duration, ClientId]); +format(?SN_CONNACK_MSG(ReturnCode)) -> + io_lib:format("SN_CONNACK(ReturnCode=~w)", [ReturnCode]); +format(?SN_WILLTOPICREQ_MSG()) -> + "SN_WILLTOPICREQ()"; +format(?SN_WILLTOPIC_MSG(Flags, Topic)) -> + #mqtt_sn_flags{ + qos = QoS, + retain = Retain} = Flags, + io_lib:format("SN_WILLTOPIC(Q~w, R~w, Topic=~s)", + [QoS, bool(Retain), Topic]); +format(?SN_WILLTOPIC_EMPTY_MSG) -> + "SN_WILLTOPIC(_)"; +format(?SN_WILLMSGREQ_MSG()) -> + "SN_WILLMSGREQ()"; +format(?SN_WILLMSG_MSG(Msg)) -> + io_lib:format("SN_WILLMSG_MSG(Msg=~p)", [Msg]); format(?SN_PUBLISH_MSG(Flags, TopicId, MsgId, Data)) -> - io_lib:format("mqtt_sn_message SN_PUBLISH, ~s, TopicId=~w, MsgId=~w, Payload=~w", - [format_flag(Flags), TopicId, MsgId, Data]); -format(?SN_PUBACK_MSG(Flags, MsgId, ReturnCode)) -> - io_lib:format("mqtt_sn_message SN_PUBACK, ~s, MsgId=~w, ReturnCode=~w", - [format_flag(Flags), MsgId, ReturnCode]); + #mqtt_sn_flags{ + dup = Dup, + qos = QoS, + retain = Retain, + topic_id_type = TopicIdType} = Flags, + io_lib:format("SN_PUBLISH(D~w, Q~w, R~w, TopicIdType=~w, 
TopicId=~w, " + "MsgId=~w, Payload=~p)", + [bool(Dup), QoS, bool(Retain), + TopicIdType, TopicId, MsgId, Data]); +format(?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode)) -> + io_lib:format("SN_PUBACK(TopicId=~w, MsgId=~w, ReturnCode=~w)", + [TopicId, MsgId, ReturnCode]); format(?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId)) -> - io_lib:format("mqtt_sn_message SN_PUBCOMP, MsgId=~w", [MsgId]); + io_lib:format("SN_PUBCOMP(MsgId=~w)", [MsgId]); format(?SN_PUBREC_MSG(?SN_PUBREC, MsgId)) -> - io_lib:format("mqtt_sn_message SN_PUBREC, MsgId=~w", [MsgId]); + io_lib:format("SN_PUBREC(MsgId=~w)", [MsgId]); format(?SN_PUBREC_MSG(?SN_PUBREL, MsgId)) -> - io_lib:format("mqtt_sn_message SN_PUBREL, MsgId=~w", [MsgId]); + io_lib:format("SN_PUBREL(MsgId=~w)", [MsgId]); format(?SN_SUBSCRIBE_MSG(Flags, Msgid, Topic)) -> - io_lib:format("mqtt_sn_message SN_SUBSCRIBE, ~s, MsgId=~w, TopicId=~w", - [format_flag(Flags), Msgid, Topic]); + #mqtt_sn_flags{ + dup = Dup, + qos = QoS, + topic_id_type = TopicIdType} = Flags, + io_lib:format("SN_SUBSCRIBE(D~w, Q~w, TopicIdType=~w, MsgId=~w, " + "TopicId=~w)", + [bool(Dup), QoS, TopicIdType, Msgid, Topic]); format(?SN_SUBACK_MSG(Flags, TopicId, MsgId, ReturnCode)) -> - io_lib:format("mqtt_sn_message SN_SUBACK, ~s, MsgId=~w, TopicId=~w, ReturnCode=~w", - [format_flag(Flags), MsgId, TopicId, ReturnCode]); + #mqtt_sn_flags{qos = QoS} = Flags, + io_lib:format("SN_SUBACK(GrantedQoS=~w, MsgId=~w, TopicId=~w, " + "ReturnCode=~w)", + [QoS, MsgId, TopicId, ReturnCode]); format(?SN_UNSUBSCRIBE_MSG(Flags, Msgid, Topic)) -> - io_lib:format("mqtt_sn_message SN_UNSUBSCRIBE, ~s, MsgId=~w, TopicId=~w", - [format_flag(Flags), Msgid, Topic]); + #mqtt_sn_flags{topic_id_type = TopicIdType} = Flags, + io_lib:format("SN_UNSUBSCRIBE(TopicIdType=~s, MsgId=~w, TopicId=~w)", + [TopicIdType, Msgid, Topic]); format(?SN_UNSUBACK_MSG(MsgId)) -> - io_lib:format("mqtt_sn_message SN_UNSUBACK, MsgId=~w", [MsgId]); + io_lib:format("SN_UNSUBACK(MsgId=~w)", [MsgId]); format(?SN_REGISTER_MSG(TopicId, 
MsgId, TopicName)) -> - io_lib:format("mqtt_sn_message SN_REGISTER, TopicId=~w, MsgId=~w, TopicName=~w", + io_lib:format("SN_REGISTER(TopicId=~w, MsgId=~w, TopicName=~s)", [TopicId, MsgId, TopicName]); format(?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)) -> - io_lib:format("mqtt_sn_message SN_REGACK, TopicId=~w, MsgId=~w, ReturnCode=~w", + io_lib:format("SN_REGACK(TopicId=~w, MsgId=~w, ReturnCode=~w)", [TopicId, MsgId, ReturnCode]); +format(?SN_PINGREQ_MSG(ClientId)) -> + io_lib:format("SN_PINGREQ(ClientId=~s)", [ClientId]); +format(?SN_PINGRESP_MSG()) -> + "SN_PINGREQ()"; +format(?SN_DISCONNECT_MSG(Duration)) -> + io_lib:format("SN_DISCONNECT(Duration=~s)", [Duration]); + format(#mqtt_sn_message{type = Type, variable = Var}) -> - io_lib:format("mqtt_sn_message type=~s, Var=~w", [emqx_sn_frame:message_type(Type), Var]). - -format_flag(#mqtt_sn_flags{dup = Dup, qos = QoS, retain = Retain, will = Will, clean_start = CleanStart, topic_id_type = TopicType}) -> - io_lib:format("mqtt_sn_flags{dup=~p, qos=~p, retain=~p, will=~p, clean_session=~p, topic_id_type=~p}", - [Dup, QoS, Retain, Will, CleanStart, TopicType]); -format_flag(_Flag) -> "invalid flag". - + io_lib:format("mqtt_sn_message(type=~s, Var=~w)", + [emqx_sn_frame:message_type(Type), Var]). diff --git a/apps/emqx_sn/src/emqx_sn_gateway.erl b/apps/emqx_sn/src/emqx_sn_gateway.erl index 442aa1db8..265607229 100644 --- a/apps/emqx_sn/src/emqx_sn_gateway.erl +++ b/apps/emqx_sn/src/emqx_sn_gateway.erl @@ -94,6 +94,8 @@ idle_timeout :: integer(), enable_qos3 = false :: boolean(), has_pending_pingresp = false :: boolean(), + %% Store all QoS0 messages that are waiting for a REGACK + %% Note: QoS1/QoS2 messages will be kept in the inflight queue pending_topic_ids = #{} :: pending_msgs() }). 
@@ -490,7 +492,7 @@ handle_event({call, From}, Req, _StateName, State) -> {reply, Reply, NState} -> gen_server:reply(From, Reply), {keep_state, NState}; - {stop, Reason, Reply, NState} -> + {shutdown, Reason, Reply, NState} -> State0 = case NState#state.sockstate of running -> send_message(?SN_DISCONNECT_MSG(undefined), NState); @@ -518,10 +520,9 @@ handle_event(info, {datagram, SockPid, Data}, StateName, end; handle_event(info, {deliver, _Topic, Msg}, asleep, - State = #state{channel = Channel, pending_topic_ids = Pendings}) -> + State = #state{channel = Channel}) -> % section 6.14, Support of sleeping clients - ?LOG(debug, "enqueue downlink message in asleep state, msg: ~0p, pending_topic_ids: ~0p", - [Msg, Pendings]), + ?LOG(debug, "enqueue downlink message in asleep state, msg: ~0p", [Msg]), Session = emqx_session:enqueue(emqx_channel:info(clientinfo, Channel), Msg, emqx_channel:get_session(Channel)), {keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}}; @@ -610,7 +611,7 @@ handle_call(_From, Req, State = #state{channel = Channel}) -> {reply, Reply, NChannel} -> {reply, Reply, State#state{channel = NChannel}}; {shutdown, Reason, Reply, NChannel} -> - stop(Reason, Reply, State#state{channel = NChannel}) + {shutdown, Reason, Reply, State#state{channel = NChannel}} end. 
handle_info({sock_closed, Reason} = Info, State = #state{channel = Channel}) -> @@ -723,11 +724,19 @@ mqtt2sn(?PUBCOMP_PACKET(MsgId), _State) -> mqtt2sn(?UNSUBACK_PACKET(MsgId), _State)-> ?SN_UNSUBACK_MSG(MsgId); -mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), #state{channel = Channel}) -> - NewPacketId = if QoS =:= ?QOS_0 -> 0; +mqtt2sn( + #mqtt_packet{header = #mqtt_packet_header{ + type = ?PUBLISH, + qos = QoS, + %dup = Dup, + retain = Retain}, + variable = #mqtt_packet_publish{ + topic_name = Topic, + packet_id = PacketId}, + payload = Payload}, #state{clientid = ClientId}) -> + NPacketId = if QoS =:= ?QOS_0 -> 0; true -> PacketId end, - ClientId = emqx_channel:info(clientid, Channel), {TopicIdType, TopicContent} = case emqx_sn_registry:lookup_topic_id(ClientId, Topic) of {predef, PredefTopicId} -> {?SN_PREDEFINED_TOPIC, PredefTopicId}; @@ -737,8 +746,12 @@ mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), #state{channel = Channe {?SN_SHORT_TOPIC, Topic} end, - Flags = #mqtt_sn_flags{qos = QoS, topic_id_type = TopicIdType}, - ?SN_PUBLISH_MSG(Flags, TopicContent, NewPacketId, Payload); + Flags = #mqtt_sn_flags{ + %dup = Dup, + qos = QoS, + retain = Retain, + topic_id_type = TopicIdType}, + ?SN_PUBLISH_MSG(Flags, TopicContent, NPacketId, Payload); mqtt2sn(?SUBACK_PACKET(MsgId, ReturnCodes), _State)-> % if success, suback is sent by handle_info({suback, MsgId, [GrantedQoS]}, ...) @@ -766,9 +779,10 @@ send_connack(State) -> send_message(Msg = #mqtt_sn_message{type = Type}, State = #state{sockpid = SockPid, peername = Peername}) -> - ?LOG(debug, "SEND ~s~n", [emqx_sn_frame:format(Msg)]), + ?LOG(info, "SEND ~s~n", [emqx_sn_frame:format(Msg)]), inc_outgoing_stats(Type), Data = emqx_sn_frame:serialize(Msg), + ?LOG(debug, "SEND ~0p", [Data]), ok = emqx_metrics:inc('bytes.sent', iolist_size(Data)), SockPid ! {datagram, Peername, Data}, State. @@ -793,13 +807,6 @@ stop(Reason, State) -> maybe_send_will_msg(Reason, State), {stop, {shutdown, Reason}, State}. 
-stop({shutdown, Reason}, Reply, State) -> - stop(Reason, Reply, State); -stop(Reason, Reply, State) -> - ?LOG(stop_log_level(Reason), "stop due to ~p", [Reason]), - maybe_send_will_msg(Reason, State), - {stop, {shutdown, Reason}, Reply, State}. - maybe_send_will_msg(normal, _State) -> ok; maybe_send_will_msg(_Reason, State) -> @@ -825,6 +832,9 @@ do_connect(ClientId, CleanStart, WillFlag, Duration, State) -> %% At any point in time a client may have only one QoS level 1 or 2 PUBLISH message %% outstanding, i.e. it has to wait for the termination of this PUBLISH message exchange %% before it could start a new level 1 or 2 transaction. + %% + %% FIXME: we should have a retry timer to re-send the inflight + %% qos1/qos2 message OnlyOneInflight = #{'Receive-Maximum' => 1}, ConnPkt = #mqtt_packet_connect{clientid = ClientId, clean_start = CleanStart, @@ -973,11 +983,16 @@ do_puback(TopicId, MsgId, ReturnCode, StateName, case emqx_sn_registry:lookup_topic(ClientId, TopicId) of undefined -> {keep_state, State}; TopicName -> - %%notice that this TopicName maybe normal or predefined, - %% involving the predefined topic name in register to enhance the gateway's robustness even inconsistent with MQTT-SN channels - {keep_state, send_register(TopicName, TopicId, MsgId, State)} + %% Note that this TopicName may be normal or predefined; + %% the predefined topic name is also sent in REGISTER to + %% keep the gateway robust even if it is inconsistent + %% with the MQTT-SN channel + {keep_state, + send_register(TopicName, TopicId, MsgId, State)} end; _ -> + %% XXX: We need to handle other error codes, e.g. + %% 'Rejection: congestion' ?LOG(error, "CAN NOT handle PUBACK ReturnCode=~p", [ReturnCode]), {keep_state, State} end. 
@@ -1050,7 +1065,7 @@ handle_incoming(Packet, _StName, State) -> channel_handle_in(Packet = ?PACKET(Type), #state{channel = Channel}) -> _ = inc_incoming_stats(Type), ok = emqx_metrics:inc_recv(Packet), - ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), + ?LOG(debug, "Transed-RECV ~s", [emqx_packet:format(Packet)]), emqx_channel:handle_in(Packet, Channel). handle_outgoing(Packets, State) when is_list(Packets) -> @@ -1064,7 +1079,9 @@ handle_outgoing(PubPkt = ?PUBLISH_PACKET(_, TopicName, _, _), ClientId = emqx_channel:info(clientid, Channel), TopicId = emqx_sn_registry:lookup_topic_id(ClientId, TopicName), case (TopicId == undefined) andalso (byte_size(TopicName) =/= 2) of - true -> register_and_notify_client(PubPkt, State); + true -> + %% TODO: only one REGISTER inflight if qos=0?? + register_and_notify_client(PubPkt, State); false -> send_message(mqtt2sn(PubPkt, State), State) end; @@ -1077,13 +1094,40 @@ cache_no_reg_publish_message(Pendings, TopicId, PubPkt, State) -> Msgs = maps:get(pending_topic_ids, Pendings, []), Pendings#{TopicId => Msgs ++ [mqtt2sn(PubPkt, State)]}. -replay_no_reg_pending_publishes(TopicId, #state{pending_topic_ids = Pendings} = State0) -> - ?LOG(debug, "replay non-registered publish message for topic-id: ~p, pendings: ~0p", - [TopicId, Pendings]), +replay_no_reg_pending_publishes(TopicId, + State0 = #state{ + pending_topic_ids = Pendings}) -> + ?LOG(debug, "replay non-registered qos0 publish message for " + "topic-id: ~p, pendings: ~0p", [TopicId, Pendings]), State = lists:foldl(fun(Msg, State1) -> send_message(Msg, State1) end, State0, maps:get(TopicId, Pendings, [])), - State#state{pending_topic_ids = maps:remove(TopicId, Pendings)}. 
+ + NState = State#state{pending_topic_ids = maps:remove(TopicId, Pendings)}, + case replay_inflight_messages(TopicId, State#state.channel) of + [] -> ok; + Outgoings -> + ?LOG(debug, "replay non-registered qos1/qos2 publish message " + "for topic-id: ~0p, messages: ~0p", + [TopicId, Outgoings]), + handle_outgoing(Outgoings, NState) + end. + +replay_inflight_messages(TopicId, Channel) -> + Inflight = emqx_session:info(inflight, emqx_channel:get_session(Channel)), + + case emqx_inflight:to_list(Inflight) of + [] -> []; + [{PktId, {Msg, _Ts}}] -> %% Fixed inflight size 1 + ClientId = emqx_channel:info(clientid, Channel), + ReplayTopic = emqx_sn_registry:lookup_topic(ClientId, TopicId), + case ReplayTopic =:= emqx_message:topic(Msg) of + false -> []; + true -> + NMsg = emqx_message:set_flag(dup, true, Msg), + [emqx_message:to_packet(PktId, NMsg)] + end + end. register_and_notify_client(?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload) = PubPkt, State = #state{pending_topic_ids = Pendings, channel = Channel}) -> @@ -1091,10 +1135,17 @@ register_and_notify_client(?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload) = #mqtt_packet{header = #mqtt_packet_header{dup = Dup, retain = Retain}} = PubPkt, ClientId = emqx_channel:info(clientid, Channel), TopicId = emqx_sn_registry:register_topic(ClientId, TopicName), - ?LOG(debug, "Register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, QoS=~p, " - "Retain=~p, MsgId=~p", [TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId]), - NewPendings = cache_no_reg_publish_message(Pendings, TopicId, PubPkt, State), - send_register(TopicName, TopicId, MsgId, State#state{pending_topic_ids = NewPendings}). 
+ ?LOG(debug, "Register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, " + "QoS=~p,Retain=~p, MsgId=~p", + [TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId]), + NPendings = case QoS == ?QOS_0 of + true -> + cache_no_reg_publish_message( + Pendings, TopicId, PubPkt, State); + _ -> Pendings + end, + send_register(TopicName, TopicId, MsgId, + State#state{pending_topic_ids = NPendings}). message_id(undefined) -> rand:uniform(16#FFFF); diff --git a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl index 27215cd4f..e78ad5938 100644 --- a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl @@ -943,6 +943,151 @@ t_publish_qos2_case03(_) -> ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), gen_udp:close(Socket). +t_delivery_qos1_register_invalid_topic_id(_) -> + Dup = 0, + QoS = 1, + Retain = 0, + Will = 0, + CleanSession = 0, + MsgId = 1, + TopicId = ?MAX_PRED_TOPIC_ID + 1, + {ok, Socket} = gen_udp:open(0, [binary]), + send_connect_msg(Socket, <<"test">>), + ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), + + send_subscribe_msg_normal_topic(Socket, QoS, <<"ab">>, MsgId), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, + ?SN_NORMAL_TOPIC:2, TopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket)), + + Payload = <<"test-registration-inconsistent">>, + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"ab">>, Payload)), + + ?assertEqual( + <<(7 + byte_size(Payload)), ?SN_PUBLISH, + Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId:16, MsgId:16, Payload/binary>>, receive_response(Socket)), + %% acked with ?SN_RC_INVALID_TOPIC_ID to + send_puback_msg(Socket, TopicId, MsgId, ?SN_RC_INVALID_TOPIC_ID), + ?assertEqual( + {TopicId, MsgId}, + check_register_msg_on_udp(<<"ab">>, receive_response(Socket))), + send_regack_msg(Socket, TopicId, MsgId + 1), + + %% receive the replay message + 
?assertEqual( + <<(7 + byte_size(Payload)), ?SN_PUBLISH, + Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId:16, (MsgId):16, Payload/binary>>, receive_response(Socket)), + + send_disconnect_msg(Socket, undefined), + ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + gen_udp:close(Socket). + +t_delivery_takeover_and_re_register(_) -> + MsgId = 1, + {ok, Socket} = gen_udp:open(0, [binary]), + send_connect_msg(Socket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(Socket)), + + send_subscribe_msg_normal_topic(Socket, ?QOS_1, <<"topic-a">>, MsgId+1), + <<_, ?SN_SUBACK, 2#00100000, + TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2), + <<_, ?SN_SUBACK, 2#01000000, + TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket), + + _ = emqx:publish( + emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)), + _ = emqx:publish( + emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"test-b">>)), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket), + send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket), + send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED), + + send_disconnect_msg(Socket, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + gen_udp:close(Socket), + + %% offline messages will be queued into the MQTT-SN session + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m1">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m3">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m1">>)), + _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m2">>)), + _ = 
emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)), + + {ok, NSocket} = gen_udp:open(0, [binary]), + send_connect_msg(NSocket, <<"test">>, 0), + ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, + receive_response(NSocket)), + + %% qos1 + + %% received the resume messages + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA0:16, "m1">> = receive_response(NSocket), + %% only one qos1/qos2 inflight + ?assertEqual(udp_receive_timeout, receive_response(NSocket)), + send_puback_msg(NSocket, TopicIdA, MsgIdA0, ?SN_RC_INVALID_TOPIC_ID), + %% recv register + <<_, ?SN_REGISTER, + TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket), + send_regack_msg(NSocket, TopicIdA, RegMsgIdA), + %% received the replay messages + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA1:16, "m1">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA2:16, "m2">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdA, MsgIdA2, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#00100000, + TopicIdA:16, MsgIdA3:16, "m3">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdA, MsgIdA3, ?SN_RC_ACCEPTED), + + %% qos2 + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB0:16, "m1">> = receive_response(NSocket), + %% only one qos1/qos2 inflight + ?assertEqual(udp_receive_timeout, receive_response(NSocket)), + send_puback_msg(NSocket, TopicIdB, MsgIdB0, ?SN_RC_INVALID_TOPIC_ID), + %% recv register + <<_, ?SN_REGISTER, + TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket), + send_regack_msg(NSocket, TopicIdB, RegMsgIdB), + %% received the replay messages + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB1:16, "m1">> = receive_response(NSocket), + send_pubrec_msg(NSocket, MsgIdB1), + <<_, ?SN_PUBREL, MsgIdB1:16>> = receive_response(NSocket), + send_pubcomp_msg(NSocket, MsgIdB1), + + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB2:16, "m2">> 
= receive_response(NSocket), + send_puback_msg(NSocket, TopicIdB, MsgIdB2, ?SN_RC_ACCEPTED), + + <<_, ?SN_PUBLISH, 2#01000000, + TopicIdB:16, MsgIdB3:16, "m3">> = receive_response(NSocket), + send_puback_msg(NSocket, TopicIdB, MsgIdB3, ?SN_RC_ACCEPTED), + + %% no more messages + ?assertEqual(udp_receive_timeout, receive_response(NSocket)), + + send_disconnect_msg(NSocket, undefined), + ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket)), + gen_udp:close(NSocket). + t_will_case01(_) -> QoS = 1, Duration = 1, @@ -1758,13 +1903,16 @@ send_searchgw_msg(Socket) -> ok = gen_udp:send(Socket, ?HOST, ?PORT, <>). send_connect_msg(Socket, ClientId) -> + send_connect_msg(Socket, ClientId, 1). + +send_connect_msg(Socket, ClientId, CleanSession) when CleanSession == 0; + CleanSession == 1 -> Length = 6 + byte_size(ClientId), MsgType = ?SN_CONNECT, Dup = 0, QoS = 0, Retain = 0, Will = 0, - CleanSession = 1, TopicIdType = 0, ProtocolId = 1, Duration = 10, @@ -1880,9 +2028,12 @@ send_publish_msg_short_topic(Socket, QoS, MsgId, TopicName, Data) -> ok = gen_udp:send(Socket, ?HOST, ?PORT, PublishPacket). send_puback_msg(Socket, TopicId, MsgId) -> + send_puback_msg(Socket, TopicId, MsgId, ?SN_RC_ACCEPTED). + +send_puback_msg(Socket, TopicId, MsgId, Rc) -> Length = 7, MsgType = ?SN_PUBACK, - PubAckPacket = <>, + PubAckPacket = <>, ?LOG("send_puback_msg TopicId=~p, MsgId=~p", [TopicId, MsgId]), ok = gen_udp:send(Socket, ?HOST, ?PORT, PubAckPacket). 
diff --git a/apps/emqx_stomp/src/emqx_stomp_protocol.erl b/apps/emqx_stomp/src/emqx_stomp_protocol.erl index 6049b88f3..96e1a3203 100644 --- a/apps/emqx_stomp/src/emqx_stomp_protocol.erl +++ b/apps/emqx_stomp/src/emqx_stomp_protocol.erl @@ -152,7 +152,7 @@ default_conninfo(ConnInfo) -> clean_start => true, clientid => undefined, username => undefined, - conn_props => [], + conn_props => #{}, connected => false, connected_at => undefined, keepalive => undefined, @@ -814,4 +814,3 @@ interval(outgoing_timer, #pstate{heart_beats = HrtBt}) -> emqx_stomp_heartbeat:interval(outgoing, HrtBt); interval(clean_trans_timer, _) -> ?TRANS_TIMEOUT. - diff --git a/apps/emqx_web_hook/etc/emqx_web_hook.conf b/apps/emqx_web_hook/etc/emqx_web_hook.conf index 6bffbc7ef..12934bf8b 100644 --- a/apps/emqx_web_hook/etc/emqx_web_hook.conf +++ b/apps/emqx_web_hook/etc/emqx_web_hook.conf @@ -45,7 +45,7 @@ web.hook.body.encoding_of_payload_field = plain ## If not specified, the server's names returned in server's certificate is validated against ## what's provided `web.hook.url` config's host part. -## Setting to 'disable' will make EMQ X ignore unmatched server names. +## Setting to 'disable' will make EMQX ignore unmatched server names. ## If set with a host name, the server's names returned in server's certificate is validated ## against this value. 
## diff --git a/apps/emqx_web_hook/priv/emqx_web_hook.schema b/apps/emqx_web_hook/priv/emqx_web_hook.schema index a80f460c9..2dfd0b081 100644 --- a/apps/emqx_web_hook/priv/emqx_web_hook.schema +++ b/apps/emqx_web_hook/priv/emqx_web_hook.schema @@ -1,5 +1,5 @@ %%-*- mode: erlang -*- -%% EMQ X R3.0 config mapping +%% EMQX config mapping {mapping, "web.hook.url", "emqx_web_hook.url", [ {datatype, string} diff --git a/apps/emqx_web_hook/src/emqx_web_hook.app.src b/apps/emqx_web_hook/src/emqx_web_hook.app.src index acabcd954..64cd28784 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook.app.src +++ b/apps/emqx_web_hook/src/emqx_web_hook.app.src @@ -1,6 +1,6 @@ {application, emqx_web_hook, [{description, "EMQ X WebHook Plugin"}, - {vsn, "4.3.9"}, % strict semver, bump manually! + {vsn, "4.3.10"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_web_hook_sup]}, {applications, [kernel,stdlib,ehttpc]}, diff --git a/apps/emqx_web_hook/src/emqx_web_hook.appup.src b/apps/emqx_web_hook/src/emqx_web_hook.appup.src index 7b3be1b13..d0eb585f7 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook.appup.src +++ b/apps/emqx_web_hook/src/emqx_web_hook.appup.src @@ -1,6 +1,6 @@ %% -*- mode: erlang -*- {VSN, - [{<<"4\\.3\\.[0-2]">>, + [{<<"4\\.3\\.[0-2]$">>, [{apply,{application,stop,[emqx_web_hook]}}, {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}, @@ -12,8 +12,11 @@ {load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, {"4.3.8", [{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]}, + {"4.3.9", + [ %% nothing so far + ]}, {<<".*">>,[]}], - [{<<"4\\.3\\.[0-2]">>, + [{<<"4\\.3\\.[0-2]$">>, [{apply,{application,stop,[emqx_web_hook]}}, {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}, @@ -25,4 +28,7 @@ {load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, {"4.3.8", 
[{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]}, + {"4.3.9", + [ %% nothing so far + ]}, {<<".*">>,[]}]}. diff --git a/bin/install_upgrade.escript b/bin/install_upgrade.escript index f88af106d..0bd4bdee1 100755 --- a/bin/install_upgrade.escript +++ b/bin/install_upgrade.escript @@ -6,6 +6,8 @@ -define(TIMEOUT, 300000). -define(INFO(Fmt,Args), io:format(Fmt++"~n",Args)). +-mode(compile). + main([Command0, DistInfoStr | CommandArgs]) -> %% convert the distribution info arguments string to an erlang term {ok, Tokens, _} = erl_scan:string(DistInfoStr ++ "."), @@ -210,15 +212,24 @@ find_and_link_release_package(Version, RelName) -> ok = filelib:ensure_dir(filename:join([filename:dirname(ReleaseLink), "dummy"])), %% create the symlink pointing to the full path name of the %% release package we found - case file:make_symlink(filename:absname(Filename), ReleaseLink) of - ok -> - ok; - {error, eperm} -> % windows! - {ok,_} = file:copy(filename:absname(Filename), ReleaseLink) - end, + make_symlink_or_copy(filename:absname(Filename), ReleaseLink), {Filename, ReleaseHandlerPackageLink} end. +make_symlink_or_copy(Filename, ReleaseLink) -> + case file:make_symlink(Filename, ReleaseLink) of + ok -> ok; + {error, eexist} -> + ?INFO("symlink ~p already exists, recreate it", [ReleaseLink]), + ok = file:delete(ReleaseLink), + make_symlink_or_copy(Filename, ReleaseLink); + {error, Reason} when Reason =:= eperm; Reason =:= enotsup -> + {ok, _} = file:copy(Filename, ReleaseLink); + {error, Reason} -> + ?INFO("create symlink ~p failed", [ReleaseLink]), + error({Reason, ReleaseLink}) + end. 
+ unpack_zipballs(RelNameStr, Version) -> {ok, Cwd} = file:get_cwd(), GzFile = filename:absname(filename:join(["releases", RelNameStr ++ "-" ++ Version ++ ".tar.gz"])), diff --git a/deploy/docker/Dockerfile b/deploy/docker/Dockerfile index 6d965c2f1..6f48d3432 100644 --- a/deploy/docker/Dockerfile +++ b/deploy/docker/Dockerfile @@ -49,7 +49,8 @@ RUN chgrp -Rf emqx /opt/emqx && chmod -Rf g+w /opt/emqx \ USER emqx -VOLUME ["/opt/emqx/log", "/opt/emqx/data", "/opt/emqx/etc"] +## NOTE: /opt/emqx/etc is removed from the VOLUME list since 4.3.13 +VOLUME ["/opt/emqx/log", "/opt/emqx/data"] # emqx will occupy these port: # - 1883 port for MQTT diff --git a/etc/emqx.conf b/etc/emqx.conf index 19fdbc656..8b8c0aca4 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1,4 +1,4 @@ -## EMQ X Configuration 4.3 +## EMQX Configuration 4.3 ## NOTE: Do not change format of CONFIG_SECTION_{BGN,END} comments! @@ -208,10 +208,10 @@ node.data_dir = {{ platform_data_dir }} ## the heartbeat pings. ## ## NOTE: When managed by systemd (or other supervision tools like systemd), -## heart will probably only cause EMQ X to stop, but restart or not will +## heart will probably only cause EMQX to stop, but restart or not will ## depend on systemd's restart strategy. ## NOTE: When running in docker, the container will die as soon as the the -## heart process kills EMQ X, but restart or not will depend on container +## heart process kills EMQX, but restart or not will depend on container ## supervision strategy, such as k8s restartPolicy. ## ## Value: on @@ -1123,7 +1123,7 @@ listener.tcp.external.zone = external ## Example: allow 192.168.0.0/24 listener.tcp.external.access.1 = allow all -## Enable the Proxy Protocol V1/2 if the EMQ X cluster is deployed +## Enable the Proxy Protocol V1/2 if the EMQX cluster is deployed ## behind HAProxy or Nginx. 
## ## See: https://www.haproxy.com/blog/haproxy/proxy-protocol/ @@ -1131,7 +1131,7 @@ listener.tcp.external.access.1 = allow all ## Value: on | off ## listener.tcp.external.proxy_protocol = on -## Sets the timeout for proxy protocol. EMQ X will close the TCP connection +## Sets the timeout for proxy protocol. EMQX will close the TCP connection ## if no proxy protocol packet recevied within the timeout. ## ## Value: Duration @@ -1634,13 +1634,13 @@ listener.ws.external.access.1 = allow all ## Default: mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5 ## listener.ws.external.supported_subprotocols = mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5 -## Specify which HTTP header for real source IP if the EMQ X cluster is +## Specify which HTTP header for real source IP if the EMQX cluster is ## deployed behind NGINX or HAProxy. ## ## Default: X-Forwarded-For ## listener.ws.external.proxy_address_header = X-Forwarded-For -## Specify which HTTP header for real source port if the EMQ X cluster is +## Specify which HTTP header for real source port if the EMQX cluster is ## deployed behind NGINX or HAProxy. ## ## Default: X-Forwarded-Port @@ -1903,13 +1903,13 @@ listener.wss.external.access.1 = allow all ## Default: mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5 ## listener.wss.external.supported_subprotocols = mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5 -## Specify which HTTP header for real source IP if the EMQ X cluster is +## Specify which HTTP header for real source IP if the EMQX cluster is ## deployed behind NGINX or HAProxy. ## ## Default: X-Forwarded-For ## listener.wss.external.proxy_address_header = X-Forwarded-For -## Specify which HTTP header for real source port if the EMQ X cluster is +## Specify which HTTP header for real source port if the EMQX cluster is ## deployed behind NGINX or HAProxy. 
## ## Default: X-Forwarded-Port diff --git a/lib-ce/emqx_dashboard/etc/emqx_dashboard.conf b/lib-ce/emqx_dashboard/etc/emqx_dashboard.conf index 933b9885d..f67b88c17 100644 --- a/lib-ce/emqx_dashboard/etc/emqx_dashboard.conf +++ b/lib-ce/emqx_dashboard/etc/emqx_dashboard.conf @@ -1,5 +1,5 @@ ##-------------------------------------------------------------------- -## EMQ X Dashboard +## EMQX Dashboard ##-------------------------------------------------------------------- ## Default user's login name. diff --git a/priv/emqx.schema b/priv/emqx.schema index b983fb864..8dd6c2f2b 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1,5 +1,5 @@ %%-*- mode: erlang -*- -%% EMQ X R4.0 config mapping +%% EMQX Config Mapping %%-------------------------------------------------------------------- %% Cluster diff --git a/scripts/relup-base-packages.sh b/scripts/relup-base-packages.sh index 070f3926a..901460a24 100755 --- a/scripts/relup-base-packages.sh +++ b/scripts/relup-base-packages.sh @@ -45,6 +45,10 @@ esac case "$SYSTEM" in + windows*) + echo "WARNING: skipped downloading relup base for windows because we do not support relup for windows yet." + exit 0 + ;; macos*) SHASUM="shasum -a 256" ;; diff --git a/scripts/update_appup.escript b/scripts/update_appup.escript index 965de1efe..d3d5fd11c 100755 --- a/scripts/update_appup.escript +++ b/scripts/update_appup.escript @@ -32,8 +32,8 @@ Options: --make-command A command used to assemble the release --release-dir Release directory --src-dirs Directories where source code is found. Defaults to '{src,apps,lib-*}/**/' - --binary-rel-url Binary release URL pattern. %TAG% variable is substituted with the release tag. - E.g. \"https://github.com/emqx/emqx/releases/download/v%TAG%/emqx-centos7-%TAG%-amd64.zip\" + --binary-rel-url Binary release URL pattern. + E.g. https://www.emqx.com/en/downloads/broker/v4.4.1/emqx-4.4.1-otp24.1.5-3-el7-amd64.zip ". 
-record(app, @@ -171,9 +171,10 @@ download_prev_release(Tag, #{binary_rel_url := {ok, URL0}, clone_url := Repo}) - BaseDir = "/tmp/emqx-baseline-bin/", Dir = filename:basename(Repo, ".git") ++ [$-|Tag], Filename = filename:join(BaseDir, Dir), - Script = "mkdir -p ${OUTFILE} && - wget -c -O ${OUTFILE}.zip ${URL} && - unzip -n -d ${OUTFILE} ${OUTFILE}.zip", + Script = "echo \"Download: ${OUTFILE}\" && + mkdir -p ${OUTFILE} && + curl -f -L -o ${OUTFILE}.zip ${URL} && + unzip -q -n -d ${OUTFILE} ${OUTFILE}.zip", Env = [{"TAG", Tag}, {"OUTFILE", Filename}, {"URL", URL}], bash(Script, Env), {ok, Filename}. @@ -208,8 +209,8 @@ find_appup_actions(App, {OldUpgrade0, OldDowngrade0} = find_old_appup_actions(App, PrevVersion), OldUpgrade = ensure_all_patch_versions(App, CurrVersion, OldUpgrade0), OldDowngrade = ensure_all_patch_versions(App, CurrVersion, OldDowngrade0), - Upgrade = merge_update_actions(App, diff_app(App, CurrAppIdx, PrevAppIdx), OldUpgrade), - Downgrade = merge_update_actions(App, diff_app(App, PrevAppIdx, CurrAppIdx), OldDowngrade), + Upgrade = merge_update_actions(App, diff_app(up, App, CurrAppIdx, PrevAppIdx), OldUpgrade), + Downgrade = merge_update_actions(App, diff_app(down, App, PrevAppIdx, CurrAppIdx), OldDowngrade), if OldUpgrade =:= Upgrade andalso OldDowngrade =:= Downgrade -> %% The appup file has been already updated: []; @@ -521,7 +522,7 @@ index_app(AppFile) -> , modules = Modules }}. 
-diff_app(App, +diff_app(UpOrDown, App, #app{version = NewVersion, modules = NewModules}, #app{version = OldVersion, modules = OldModules}) -> {New, Changed} = @@ -540,13 +541,15 @@ diff_app(App, ), Deleted = maps:keys(maps:without(maps:keys(NewModules), OldModules)), NChanges = length(New) + length(Changed) + length(Deleted), - if NewVersion =:= OldVersion andalso NChanges > 0 -> + case NewVersion =:= OldVersion of + true when NChanges =:= 0 -> + %% no change + ok; + true -> set_invalid(), log("ERROR: Application '~p' contains changes, but its version is not updated~n", [App]); - NewVersion > OldVersion -> - log("INFO: Application '~p' has been updated: ~p -> ~p~n", [App, OldVersion, NewVersion]), - ok; - true -> + false -> + log("INFO: Application '~p' has been updated: ~p --[~p]--> ~p~n", [App, OldVersion, UpOrDown, NewVersion]), ok end, {New, Changed, Deleted}. @@ -607,13 +610,18 @@ locate(ebin_current, App, Suffix) -> locate(src, App, Suffix) -> AppStr = atom_to_list(App), SrcDirs = getopt(src_dirs), - case filelib:wildcard(SrcDirs ++ AppStr ++ Suffix) of + case find_app(SrcDirs ++ AppStr ++ Suffix) of [File] -> {ok, File}; [] -> undefined end. +find_app(Pattern) -> + %% exclude _build dir inside apps + lists:filter(fun(S) -> string:find(S, "/_build/") =:= nomatch end, + filelib:wildcard(Pattern)). + bash(Script) -> bash(Script, []). 
diff --git a/src/emqx.appup.src b/src/emqx.appup.src index c95a1beaf..b9d4f5a16 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -1,54 +1,61 @@ %% -*- mode: erlang -*- {VSN, [{"4.4.1", - [ {load_module,emqx_connection,brutal_purge,soft_purge,[]} - , {load_module,emqx_banned,brutal_purge,soft_purge,[]} - , {load_module,emqx_ctl,brutal_purge,soft_purge,[]} - , {load_module,emqx_channel,brutal_purge,soft_purge,[]} - ]}, + [{load_module,emqx_os_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, + {load_module,emqx_cm,brutal_purge,soft_purge,[]}, + {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_connection,brutal_purge,soft_purge,[]}, + {load_module,emqx_banned,brutal_purge,soft_purge,[]}, + {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, + {load_module,emqx_channel,brutal_purge,soft_purge,[]}]}, {"4.4.0", - [ {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]} - , {load_module,emqx_banned,brutal_purge,soft_purge,[]} - , {load_module,emqx_ctl,brutal_purge,soft_purge,[]} - , {load_module,emqx_channel,brutal_purge,soft_purge,[]} - , {load_module,emqx_connection,brutal_purge,soft_purge,[]} - , {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]} - , {load_module,emqx_metrics,brutal_purge,soft_purge,[]} - , {apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}} - , {load_module,emqx_access_control,brutal_purge,soft_purge,[]} - , {load_module,emqx_alarm,brutal_purge,soft_purge,[]} - , {load_module,emqx_session,brutal_purge,soft_purge,[]} - , {load_module,emqx_os_mon,brutal_purge,soft_purge,[]} - , {load_module,emqx,brutal_purge,soft_purge,[]} - , {load_module,emqx_app,brutal_purge,soft_purge,[]} - , {load_module,emqx_message,brutal_purge,soft_purge,[]} - , {load_module,emqx_limiter,brutal_purge,soft_purge,[]} - ]}, - {<<".*">>,[]} - ], + [{load_module,emqx_pmon,brutal_purge,soft_purge,[]}, + {load_module,emqx_cm,brutal_purge,soft_purge,[]}, + 
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_banned,brutal_purge,soft_purge,[]}, + {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, + {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_connection,brutal_purge,soft_purge,[]}, + {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, + {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, + {apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}}, + {load_module,emqx_access_control,brutal_purge,soft_purge,[]}, + {load_module,emqx_alarm,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, + {load_module,emqx_os_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx,brutal_purge,soft_purge,[]}, + {load_module,emqx_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_message,brutal_purge,soft_purge,[]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {<<".*">>,[]}], [{"4.4.1", - [ {load_module,emqx_connection,brutal_purge,soft_purge,[]} - , {load_module,emqx_banned,brutal_purge,soft_purge,[]} - , {load_module,emqx_ctl,brutal_purge,soft_purge,[]} - , {load_module,emqx_channel,brutal_purge,soft_purge,[]} - ]}, + [{load_module,emqx_os_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, + {load_module,emqx_cm,brutal_purge,soft_purge,[]}, + {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_connection,brutal_purge,soft_purge,[]}, + {load_module,emqx_banned,brutal_purge,soft_purge,[]}, + {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, + {load_module,emqx_channel,brutal_purge,soft_purge,[]}]}, {"4.4.0", - [ {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]} - , {load_module,emqx_banned,brutal_purge,soft_purge,[]} - , {load_module,emqx_ctl,brutal_purge,soft_purge,[]} - , {load_module,emqx_channel,brutal_purge,soft_purge,[]} - , {load_module,emqx_connection,brutal_purge,soft_purge,[]} - , 
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]} - , {load_module,emqx_metrics,brutal_purge,soft_purge,[]} - , {load_module,emqx_access_control,brutal_purge,soft_purge,[]} - , {load_module,emqx_alarm,brutal_purge,soft_purge,[]} - , {load_module,emqx_session,brutal_purge,soft_purge,[]} - , {load_module,emqx_os_mon,brutal_purge,soft_purge,[]} - , {load_module,emqx,brutal_purge,soft_purge,[]} - , {load_module,emqx_app,brutal_purge,soft_purge,[]} - , {load_module,emqx_message,brutal_purge,soft_purge,[]} - , {load_module,emqx_limiter,brutal_purge,soft_purge,[]} - ]}, - {<<".*">>,[]} - ] -}. + [{load_module,emqx_pmon,brutal_purge,soft_purge,[]}, + {load_module,emqx_cm,brutal_purge,soft_purge,[]}, + {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_vm_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_banned,brutal_purge,soft_purge,[]}, + {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, + {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_connection,brutal_purge,soft_purge,[]}, + {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, + {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_access_control,brutal_purge,soft_purge,[]}, + {load_module,emqx_alarm,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, + {load_module,emqx_os_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx,brutal_purge,soft_purge,[]}, + {load_module,emqx_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_message,brutal_purge,soft_purge,[]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {<<".*">>,[]}]}. 
diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 1c6d4080a..4aa953caa 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -235,18 +235,25 @@ open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) -> open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> Self = self(), ResumeStart = fun(_) -> - case takeover_session(ClientId) of - {ok, ConnMod, ChanPid, Session} -> - ok = emqx_session:resume(ClientInfo, Session), - Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER), - register_channel(ClientId, Self, ConnInfo), - {ok, #{session => Session, - present => true, - pendings => Pendings}}; - {error, not_found} -> + CreateSess = + fun() -> Session = create_session(ClientInfo, ConnInfo), register_channel(ClientId, Self, ConnInfo), {ok, #{session => Session, present => false}} + end, + case takeover_session(ClientId) of + {ok, ConnMod, ChanPid, Session} -> + ok = emqx_session:resume(ClientInfo, Session), + case request_stepdown({takeover, 'end'}, ConnMod, ChanPid) of + {ok, Pendings} -> + register_channel(ClientId, Self, ConnInfo), + {ok, #{session => Session, + present => true, + pendings => Pendings}}; + {error, _} -> + CreateSess() + end; + {error, _Reason} -> CreateSess() end end, emqx_cm_locker:trans(ClientId, ResumeStart). @@ -280,9 +287,12 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() -> undefined -> {error, not_found}; ConnMod when is_atom(ConnMod) -> - %% TODO: if takeover times out, maybe kill the old? - Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER), - {ok, ConnMod, ChanPid, Session} + case request_stepdown({takeover, 'begin'}, ConnMod, ChanPid) of + {ok, Session} -> + {ok, ConnMod, ChanPid, Session}; + {error, Reason} -> + {error, Reason} + end end; takeover_session(ClientId, ChanPid) -> rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER). 
@@ -295,42 +305,63 @@ discard_session(ClientId) when is_binary(ClientId) -> ChanPids -> lists:foreach(fun(Pid) -> discard_session(ClientId, Pid) end, ChanPids) end. -%% @private Kick a local stale session to force it step down. -%% If failed to kick (e.g. timeout) force a kill. +%% @private call a local stale session to execute an Action. +%% If failed to response (e.g. timeout) force a kill. %% Keeping the stale pid around, or returning error or raise an exception %% benefits nobody. --spec kick_or_kill(kick | discard, module(), pid()) -> ok. -kick_or_kill(Action, ConnMod, Pid) -> - try +-spec request_stepdown(Action, module(), pid()) + -> ok + | {ok, emqx_session:session() | list(emqx_type:deliver())} + | {error, term()} + when Action :: kick | discard | {takeover, 'begin'} | {takeover, 'end'}. +request_stepdown(Action, ConnMod, Pid) -> + Timeout = + case Action == kick orelse Action == discard of + true -> ?T_KICK; + _ -> ?T_TAKEOVER + end, + Return = %% this is essentailly a gen_server:call implemented in emqx_connection %% and emqx_ws_connection. 
%% the handle_call is implemented in emqx_channel - ok = apply(ConnMod, call, [Pid, Action, ?T_KICK]) - catch - _ : noproc -> % emqx_ws_connection: call - ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}); - _ : {noproc, _} -> % emqx_connection: gen_server:call - ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}); - _ : {shutdown, _} -> - ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}); - _ : {{shutdown, _}, _} -> - ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}); - _ : {timeout, {gen_server, call, _}} -> - ?tp(warning, "session_kick_timeout", - #{pid => Pid, - action => Action, - stale_channel => stale_channel_info(Pid) - }), - ok = force_kill(Pid); - _ : Error : St -> - ?tp(error, "session_kick_exception", - #{pid => Pid, - action => Action, - reason => Error, - stacktrace => St, - stale_channel => stale_channel_info(Pid) - }), - ok = force_kill(Pid) + try apply(ConnMod, call, [Pid, Action, Timeout]) of + ok -> ok; + Reply -> {ok, Reply} + catch + _ : noproc -> % emqx_ws_connection: call + ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}), + {error, noproc}; + _ : {noproc, _} -> % emqx_connection: gen_server:call + ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}), + {error, noproc}; + _ : Reason = {shutdown, _} -> + ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}), + {error, Reason}; + _ : Reason = {{shutdown, _}, _} -> + ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}), + {error, Reason}; + _ : {timeout, {gen_server, call, _}} -> + ?tp(warning, "session_stepdown_request_timeout", + #{pid => Pid, + action => Action, + stale_channel => stale_channel_info(Pid) + }), + ok = force_kill(Pid), + {error, timeout}; + _ : Error : St -> + ?tp(error, "session_stepdown_request_exception", + #{pid => Pid, + action => Action, + reason => Error, + stacktrace => St, + 
stale_channel => stale_channel_info(Pid) + }), + ok = force_kill(Pid), + {error, Error} + end, + case Action == kick orelse Action == discard of + true -> ok; + _ -> Return end. force_kill(Pid) -> @@ -353,7 +384,7 @@ kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() -> %% already deregistered ok; ConnMod when is_atom(ConnMod) -> - ok = kick_or_kill(Action, ConnMod, ChanPid) + ok = request_stepdown(Action, ConnMod, ChanPid) end; kick_session(Action, ClientId, ChanPid) -> %% call remote node on the old APIs because we do not know if they have upgraded diff --git a/src/emqx_os_mon.erl b/src/emqx_os_mon.erl index 55c324d57..e7565947c 100644 --- a/src/emqx_os_mon.erl +++ b/src/emqx_os_mon.erl @@ -190,7 +190,7 @@ ensure_system_memory_alarm(HW) -> case erlang:whereis(memsup) of undefined -> ok; _Pid -> - {Allocated, Total, _Worst} = memsup:get_memory_data(), + {Total, Allocated, _Worst} = memsup:get_memory_data(), case Total =/= 0 andalso Allocated/Total * 100 > HW of true -> emqx_alarm:activate(high_system_memory_usage, #{high_watermark => HW}); false -> ok diff --git a/src/emqx_pmon.erl b/src/emqx_pmon.erl index 1f9ba7af6..cfa366027 100644 --- a/src/emqx_pmon.erl +++ b/src/emqx_pmon.erl @@ -50,9 +50,11 @@ monitor(Pid, PMon) -> ?MODULE:monitor(Pid, undefined, PMon). -spec(monitor(pid(), term(), pmon()) -> pmon()). 
-monitor(Pid, Val, PMon = ?PMON(Map)) -> +monitor(Pid, Val, ?PMON(Map)) -> case maps:is_key(Pid, Map) of - true -> PMon; + true -> + {Ref, _Val} = maps:get(Pid, Map), + ?PMON(maps:put(Pid, {Ref, Val}, Map)); false -> Ref = erlang:monitor(process, Pid), ?PMON(maps:put(Pid, {Ref, Val}, Map)) diff --git a/src/emqx_sys_mon.erl b/src/emqx_sys_mon.erl index 710895bcd..4f07f0b18 100644 --- a/src/emqx_sys_mon.erl +++ b/src/emqx_sys_mon.erl @@ -119,7 +119,7 @@ handle_info({monitor, Port, long_schedule, Info}, State) when is_port(Port) -> suppress({long_schedule, Port}, fun() -> WarnMsg = io_lib:format("long_schedule warning: port = ~p, info: ~p", [Port, Info]), - ?LOG(warning, "~s~n~p", [WarnMsg, erlang:port_info(Port)]), + ?LOG(warning, "~s~n~p", [WarnMsg, portinfo(Port)]), safe_publish(long_schedule, WarnMsg) end, State); @@ -135,7 +135,7 @@ handle_info({monitor, SusPid, busy_port, Port}, State) -> suppress({busy_port, Port}, fun() -> WarnMsg = io_lib:format("busy_port warning: suspid = ~p, port = ~p", [SusPid, Port]), - ?LOG(warning, "~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]), + ?LOG(warning, "~s~n~p~n~p", [WarnMsg, procinfo(SusPid), portinfo(Port)]), safe_publish(busy_port, WarnMsg) end, State); @@ -143,7 +143,7 @@ handle_info({monitor, SusPid, busy_dist_port, Port}, State) -> suppress({busy_dist_port, Port}, fun() -> WarnMsg = io_lib:format("busy_dist_port warning: suspid = ~p, port = ~p", [SusPid, Port]), - ?LOG(warning, "~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]), + ?LOG(warning, "~s~n~p~n~p", [WarnMsg, procinfo(SusPid), portinfo(Port)]), safe_publish(busy_dist_port, WarnMsg) end, State); @@ -200,3 +200,9 @@ safe_publish(Event, WarnMsg) -> sysmon_msg(Topic, Payload) -> Msg = emqx_message:make(?SYSMON, Topic, Payload), emqx_message:set_flag(sys, Msg). + +portinfo(Port) -> + case is_port(Port) andalso erlang:port_info(Port) of + L when is_list(L) -> L; + _ -> [] + end. 
diff --git a/test/emqx_cm_SUITE.erl b/test/emqx_cm_SUITE.erl index a46c046ca..4fcaaf473 100644 --- a/test/emqx_cm_SUITE.erl +++ b/test/emqx_cm_SUITE.erl @@ -187,57 +187,89 @@ t_open_session_race_condition(_) -> ok = flush_emqx_pool(), ?assertEqual([], emqx_cm:lookup_channels(ClientId)). -t_kick_session_discard_normal(_) -> - test_kick_session(discard, normal). +t_stepdown_sessiondiscard_normal(_) -> + test_stepdown_session(discard, normal). -t_kick_session_discard_shutdown(_) -> - test_kick_session(discard, shutdown). +t_stepdown_sessiondiscard_shutdown(_) -> + test_stepdown_session(discard, shutdown). -t_kick_session_discard_shutdown_with_reason(_) -> - test_kick_session(discard, {shutdown, discard}). +t_stepdown_sessiondiscard_shutdown_with_reason(_) -> + test_stepdown_session(discard, {shutdown, discard}). -t_kick_session_discard_timeout(_) -> - test_kick_session(discard, timeout). +t_stepdown_sessiondiscard_timeout(_) -> + test_stepdown_session(discard, timeout). -t_kick_session_discard_noproc(_) -> - test_kick_session(discard, noproc). +t_stepdown_sessiondiscard_noproc(_) -> + test_stepdown_session(discard, noproc). -t_kick_session_kick_normal(_) -> - test_kick_session(discard, normal). +t_stepdown_sessionkick_normal(_) -> + test_stepdown_session(kick, normal). -t_kick_session_kick_shutdown(_) -> - test_kick_session(discard, shutdown). +t_stepdown_sessionkick_shutdown(_) -> + test_stepdown_session(kick, shutdown). -t_kick_session_kick_shutdown_with_reason(_) -> - test_kick_session(discard, {shutdown, discard}). +t_stepdown_sessionkick_shutdown_with_reason(_) -> + test_stepdown_session(kick, {shutdown, discard}). -t_kick_session_kick_timeout(_) -> - test_kick_session(discard, timeout). +t_stepdown_sessionkick_timeout(_) -> + test_stepdown_session(kick, timeout). -t_kick_session_kick_noproc(_) -> - test_kick_session(discard, noproc). +t_stepdown_sessionkick_noproc(_) -> + test_stepdown_session(discard, noproc). 
-test_kick_session(Action, Reason) -> +t_stepdown_sessiontakeover_begin_normal(_) -> + test_stepdown_session({takeover, 'begin'}, normal). + +t_stepdown_sessiontakeover_begin_shutdown(_) -> + test_stepdown_session({takeover, 'begin'}, shutdown). + +t_stepdown_sessiontakeover_begin_shutdown_with_reason(_) -> + test_stepdown_session({takeover, 'begin'}, {shutdown, discard}). + +t_stepdown_sessiontakeover_begin_timeout(_) -> + test_stepdown_session({takeover, 'begin'}, timeout). + +t_stepdown_sessiontakeover_begin_noproc(_) -> + test_stepdown_session({takeover, 'begin'}, noproc). + +t_stepdown_sessiontakeover_end_normal(_) -> + test_stepdown_session({takeover, 'end'}, normal). + +t_stepdown_sessiontakeover_end_shutdown(_) -> + test_stepdown_session({takeover, 'end'}, shutdown). + +t_stepdown_sessiontakeover_end_shutdown_with_reason(_) -> + test_stepdown_session({takeover, 'end'}, {shutdown, discard}). + +t_stepdown_sessiontakeover_end_timeout(_) -> + test_stepdown_session({takeover, 'end'}, timeout). + +t_stepdown_sessiontakeover_end_noproc(_) -> + test_stepdown_session({takeover, 'end'}, noproc). 
+ +test_stepdown_session(Action, Reason) -> ClientId = rand_client_id(), #{conninfo := ConnInfo} = ?ChanInfo, FakeSessionFun = fun Loop() -> - receive - {'$gen_call', From, A} when A =:= kick orelse - A =:= discard -> - case Reason of - normal -> - gen_server:reply(From, ok); - timeout -> - %% no response to the call - Loop(); - _ -> - exit(Reason) - end; - Msg -> - ct:pal("(~p) fake_session_discarded ~p", [Action, Msg]), - Loop() - end + receive + {'$gen_call', From, A} when A =:= kick orelse + A =:= discard orelse + A =:= {takeover, 'begin'} orelse + A =:= {takeover, 'end'} -> + case Reason of + normal when A =:= kick orelse A =:= discard -> + gen_server:reply(From, ok); + timeout -> + %% no response to the call + Loop(); + _ -> + exit(Reason) + end; + Msg -> + ct:pal("(~p) fake_session_discarded ~p", [Action, Msg]), + Loop() + end end, {Pid1, _} = spawn_monitor(FakeSessionFun), {Pid2, _} = spawn_monitor(FakeSessionFun), @@ -249,10 +281,11 @@ test_kick_session(Action, Reason) -> noproc -> exit(Pid1, kill), exit(Pid2, kill); _ -> ok end, - ok = case Action of - kick -> emqx_cm:kick_session(ClientId); - discard -> emqx_cm:discard_session(ClientId) - end, + _ = case Action of + kick -> emqx_cm:kick_session(ClientId); + discard -> emqx_cm:discard_session(ClientId); + {takeover, _} -> emqx_cm:takeover_session(ClientId) + end, case Reason =:= timeout orelse Reason =:= noproc of true -> ?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid1, R}, 2_000, R)), diff --git a/test/emqx_sys_mon_SUITE.erl b/test/emqx_sys_mon_SUITE.erl index 63e7c376a..e177e9527 100644 --- a/test/emqx_sys_mon_SUITE.erl +++ b/test/emqx_sys_mon_SUITE.erl @@ -33,6 +33,11 @@ {self(), busy_port, concat_str("busy_port warning: suspid = ~p, port = ~p", self(), list_to_port("#Port<0.4>")), list_to_port("#Port<0.4>")}, + %% for the case when the port is missing, for some + %% reason. 
+ {self(), busy_port, + concat_str("busy_port warning: suspid = ~p, port = ~p", + self(), []), []}, {self(), busy_dist_port, concat_str("busy_dist_port warning: suspid = ~p, port = ~p", self(), list_to_port("#Port<0.4>")),list_to_port("#Port<0.4>")}, @@ -122,6 +127,16 @@ t_sys_mon(_Config) -> validate_sys_mon_info(PidOrPort, SysMonName,ValidateInfo, InfoOrPort) end, ?INPUTINFO). +%% Existing port, but closed. +t_sys_mon_dead_port(_Config) -> + process_flag(trap_exit, true), + Port = dead_port(), + {PidOrPort, SysMonName, ValidateInfo, InfoOrPort} = + {self(), busy_port, + concat_str("busy_port warning: suspid = ~p, port = ~p", + self(), Port), Port}, + validate_sys_mon_info(PidOrPort, SysMonName, ValidateInfo, InfoOrPort). + t_sys_mon2(_Config) -> ?SYSMON ! {timeout, ignored, reset}, ?SYSMON ! {ignored}, @@ -155,3 +170,8 @@ some_function(Parent, _Arg2) -> stop -> ok end. + +dead_port() -> + Port = erlang:open_port({spawn, "ls"}, []), + exit(Port, kill), + Port.