From fefadbcd177b0a3f759c721580255050e36a2b38 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 9 Nov 2021 22:05:09 +0800 Subject: [PATCH 01/25] fix(emqx_stomp): fix hot-upgrade stopping listener failed When the upgrade is executed, all envs of plugins are cleared, which causes the listener of stomp to stop failing. This is only a temporary modification to ensure that the upgrade can be executed successfully. following fixes: https://github.com/emqx/emqx/pull/6105 --- apps/emqx_stomp/src/emqx_stomp.appup.src | 3 ++- apps/emqx_stomp/src/emqx_stomp.erl | 14 ++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/apps/emqx_stomp/src/emqx_stomp.appup.src b/apps/emqx_stomp/src/emqx_stomp.appup.src index bf4603e52..5ce1f8bd3 100644 --- a/apps/emqx_stomp/src/emqx_stomp.appup.src +++ b/apps/emqx_stomp/src/emqx_stomp.appup.src @@ -2,7 +2,8 @@ {VSN, [{"4.3.1",[{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]}, {"4.3.0", - [{restart_application,emqx_stomp}]}, + [{restart_application,emqx_stomp}, + {apply,{emqx_stomp,force_clear_after_app_stoped,[]}}]}, {<<".*">>,[]}], [{"4.3.1",[{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]}, {"4.3.0", diff --git a/apps/emqx_stomp/src/emqx_stomp.erl b/apps/emqx_stomp/src/emqx_stomp.erl index 9eafe3cf7..b8a66009c 100644 --- a/apps/emqx_stomp/src/emqx_stomp.erl +++ b/apps/emqx_stomp/src/emqx_stomp.erl @@ -33,6 +33,8 @@ , stop_listener/3 ]). +-export([force_clear_after_app_stoped/0]). + -export([init/1]). -define(APP, ?MODULE). @@ -52,6 +54,18 @@ start(_StartType, _StartArgs) -> stop(_State) -> stop_listeners(). +force_clear_after_app_stoped() -> + lists:foreach(fun({Name = {ProtoName, _}, _}) -> + case is_stomp_listener(ProtoName) of + true -> esockd:close(Name); + _ -> ok + end + end, esockd:listeners()). + +is_stomp_listener('stomp:tcp') -> true; +is_stomp_listener('stomp:ssl') -> true; +is_stomp_listener(_) -> false. + %%-------------------------------------------------------------------- %% Supervisor callbacks %%-------------------------------------------------------------------- From 133609a0400131f94b47f06bfe55e40d40041099 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 9 Nov 2021 20:51:07 +0800 Subject: [PATCH 02/25] fix(relup): configs for plugins are missing after relup --- bin/install_upgrade.escript | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/bin/install_upgrade.escript b/bin/install_upgrade.escript index 0658bdef2..97548cba8 100755 --- a/bin/install_upgrade.escript +++ b/bin/install_upgrade.escript @@ -248,15 +248,20 @@ parse_version(V) when is_list(V) -> hd(string:tokens(V,"/")). check_and_install(TargetNode, Vsn) -> - {ok, [[CurrAppConf]]} = rpc:call(TargetNode, init, get_argument, [config], ?TIMEOUT), + %% Backup the vm.args. VM args should be unchanged during hot upgrade + %% but we still backup it here {ok, [[CurrVmArgs]]} = rpc:call(TargetNode, init, get_argument, [vm_args], ?TIMEOUT), - case filename:extension(CurrAppConf) of - ".config" -> - {ok, _} = file:copy(CurrAppConf, filename:join(["releases", Vsn, "sys.config"])); - _ -> - {ok, _} = file:copy(CurrAppConf++".config", filename:join(["releases", Vsn, "sys.config"])) - end, {ok, _} = file:copy(CurrVmArgs, filename:join(["releases", Vsn, "vm.args"])), + %% Backup the sys.config, this will be used when we check and install release + %% NOTE: We cannot backup the old sys.config directly, because the + %% configs for plugins are only in app-envs, not in the old sys.config + Configs0 = + [{AppName, rpc:call(TargetNode, application, get_all_env, [AppName], ?TIMEOUT)} + || {AppName, _, _} <- rpc:call(TargetNode, application, which_applications, [], ?TIMEOUT)], + Configs1 = [{AppName, Conf} || {AppName, Conf} <- Configs0, Conf =/= []], + ok = file:write_file(filename:join(["releases", Vsn, "sys.config"]), io_lib:format("~p.", [Configs1])), + + %% check and install release case rpc:call(TargetNode, release_handler, check_install_release, [Vsn], ?TIMEOUT) of {ok, _OtherVsn, _Desc} -> From fa34d8353e521566caeb8f9aea383a6495365d78 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 10 Nov 2021 15:40:46 +0800 Subject: [PATCH 03/25] fix(test): flaky mqtt expiry test case. (#6112) --- test/emqx_mqtt_SUITE.erl | 112 +++++++++++++++++++++++++-------------- 1 file changed, 73 insertions(+), 39 deletions(-) diff --git a/test/emqx_mqtt_SUITE.erl b/test/emqx_mqtt_SUITE.erl index ae7e7382c..3a77134b8 100644 --- a/test/emqx_mqtt_SUITE.erl +++ b/test/emqx_mqtt_SUITE.erl @@ -62,79 +62,104 @@ t_conn_stats(_) -> t_tcp_sock_passive(_) -> with_client(fun(CPid) -> CPid ! {tcp_passive, sock} end, []). -t_message_expiry_interval_1(_) -> - ClientA = message_expiry_interval_init(), - [message_expiry_interval_exipred(ClientA, QoS) || QoS <- [0,1,2]], - emqtt:stop(ClientA). +t_message_expiry_interval(_) -> + {CPublish, CControl} = message_expiry_interval_init(), + [message_expiry_interval_exipred(CPublish, CControl, QoS) || QoS <- [0,1,2]], + emqtt:stop(CPublish), + emqtt:stop(CControl). -t_message_expiry_interval_2(_) -> - ClientA = message_expiry_interval_init(), - [message_expiry_interval_not_exipred(ClientA, QoS) || QoS <- [0,1,2]], - emqtt:stop(ClientA). +t_message_not_expiry_interval(_) -> + {CPublish, CControl} = message_expiry_interval_init(), + [message_expiry_interval_not_exipred(CPublish, CControl, QoS) || QoS <- [0,1,2]], + emqtt:stop(CPublish), + emqtt:stop(CControl). message_expiry_interval_init() -> - {ok, ClientA} = emqtt:start_link([{proto_ver,v5}, - {clientid, <<"client-a">>}, + {ok, CPublish} = emqtt:start_link([{proto_ver,v5}, + {clientid, <<"Client-Publish">>}, {clean_start, false}, {properties, #{'Session-Expiry-Interval' => 360}}]), - {ok, ClientB} = emqtt:start_link([{proto_ver,v5}, - {clientid, <<"client-b">>}, + {ok, CVerify} = emqtt:start_link([{proto_ver,v5}, + {clientid, <<"Client-Verify">>}, {clean_start, false}, {properties, #{'Session-Expiry-Interval' => 360}}]), - {ok, _} = emqtt:connect(ClientA), - {ok, _} = emqtt:connect(ClientB), - %% subscribe and disconnect client-b - emqtt:subscribe(ClientB, <<"t/a">>, 1), - emqtt:stop(ClientB), - ClientA. + {ok, CControl} = emqtt:start_link([{proto_ver,v5}, + {clientid, <<"Client-Control">>}, + {clean_start, false}, + {properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, _} = emqtt:connect(CPublish), + {ok, _} = emqtt:connect(CVerify), + {ok, _} = emqtt:connect(CControl), + %% subscribe and disconnect Client-verify + emqtt:subscribe(CControl, <<"t/a">>, 1), + emqtt:subscribe(CVerify, <<"t/a">>, 1), + emqtt:stop(CVerify), + {CPublish, CControl}. -message_expiry_interval_exipred(ClientA, QoS) -> +message_expiry_interval_exipred(CPublish, CControl, QoS) -> ct:pal("~p ~p", [?FUNCTION_NAME, QoS]), %% publish to t/a and waiting for the message expired - emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, <<"this will be purged in 1s">>, [{qos, QoS}]), - ct:sleep(2000), + emqtt:publish(CPublish, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, + <<"this will be purged in 1s">>, [{qos, QoS}]), + %% CControl make sure publish already store in broker. + receive + {publish,#{client_pid := CControl, topic := <<"t/a">>}} -> + ok + after 1000 -> + ct:fail(should_receive_publish) + end, + ct:sleep(1100), - %% resume the session for client-b - {ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, - {clientid, <<"client-b">>}, + %% resume the session for Client-Verify + {ok, CVerify} = emqtt:start_link([{proto_ver,v5}, + {clientid, <<"Client-Verify">>}, {clean_start, false}, {properties, #{'Session-Expiry-Interval' => 360}}]), - {ok, _} = emqtt:connect(ClientB1), + {ok, _} = emqtt:connect(CVerify), - %% verify client-b could not receive the publish message + %% verify Client-Verify could not receive the publish message receive - {publish,#{client_pid := ClientB1, topic := <<"t/a">>}} -> + {publish,#{client_pid := CVerify, topic := <<"t/a">>}} -> ct:fail(should_have_expired) after 300 -> ok end, - emqtt:stop(ClientB1). + emqtt:stop(CVerify). -message_expiry_interval_not_exipred(ClientA, QoS) -> +message_expiry_interval_not_exipred(CPublish, CControl, QoS) -> ct:pal("~p ~p", [?FUNCTION_NAME, QoS]), %% publish to t/a - emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, <<"this will be purged in 1s">>, [{qos, QoS}]), + emqtt:publish(CPublish, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, + <<"this will be purged in 20s">>, [{qos, QoS}]), - %% wait for 1s and then resume the session for client-b, the message should not expires + %% CControl make sure publish already store in broker. + receive + {publish,#{client_pid := CControl, topic := <<"t/a">>}} -> + ok + after 1000 -> + ct:fail(should_receive_publish) + end, + + %% wait for 1.2s and then resume the session for Client-Verify, the message should not expires %% as Message-Expiry-Interval = 20s - ct:sleep(1000), - {ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, - {clientid, <<"client-b">>}, + ct:sleep(1200), + {ok, CVerify} = emqtt:start_link([{proto_ver,v5}, + {clientid, <<"Client-Verify">>}, {clean_start, false}, {properties, #{'Session-Expiry-Interval' => 360}}]), - {ok, _} = emqtt:connect(ClientB1), + {ok, _} = emqtt:connect(CVerify), - %% verify client-b could receive the publish message and the Message-Expiry-Interval is set + %% verify Client-Verify could receive the publish message and the Message-Expiry-Interval is set receive - {publish,#{client_pid := ClientB1, topic := <<"t/a">>, + {publish,#{client_pid := CVerify, topic := <<"t/a">>, properties := #{'Message-Expiry-Interval' := MsgExpItvl}}} - when MsgExpItvl < 20 -> ok; + when MsgExpItvl =< 20 -> ok; {publish, _} = Msg -> ct:fail({incorrect_publish, Msg}) after 300 -> ct:fail(no_publish_received) end, - emqtt:stop(ClientB1). + emqtt:stop(CVerify). with_client(TestFun, _Options) -> ClientId = <<"t_conn">>, @@ -156,6 +181,15 @@ t_async_set_keepalive('end', _Config) -> ok. t_async_set_keepalive(_) -> + case os:type() of + {unix, darwin} -> + %% Mac OSX don't support the feature + ok; + _ -> + do_async_set_keepalive() + end. + +do_async_set_keepalive() -> ClientID = <<"client-tcp-keepalive">>, {ok, Client} = emqtt:start_link([{host, "localhost"}, {proto_ver,v5}, From 98136ff119fe60420657ff7a56112b03edb545a5 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Wed, 10 Nov 2021 14:41:37 +0100 Subject: [PATCH 04/25] chore(ehttpc): pin 0.1.12 --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index 4994d54ae..77df3dd6d 100644 --- a/rebar.config +++ b/rebar.config @@ -37,7 +37,7 @@ {deps, [ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps - , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.10"}}} + , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.12"}}} , {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.5"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} From 8f07f26744d1ee1fa546dcadcc9d6d7ba13366d8 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 11 Nov 2021 11:28:17 +0800 Subject: [PATCH 05/25] fix: ensure starting listeners before plugins --- src/emqx_app.erl | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/emqx_app.erl b/src/emqx_app.erl index 8d0ff11a8..be6f45e25 100644 --- a/src/emqx_app.erl +++ b/src/emqx_app.erl @@ -42,10 +42,13 @@ start(_Type, _Args) -> ekka:start(), {ok, Sup} = emqx_sup:start_link(), ok = start_autocluster(), + %% We need to make sure that emqx's listeners start before plugins + %% and modules. Since if the emqx-conf module/plugin is enabled, it will + %% try to start or update the listeners with the latest configuration + emqx_boot:is_enabled(listeners) andalso (ok = emqx_listeners:start()), ok = emqx_plugins:init(), _ = emqx_plugins:load(), _ = start_ce_modules(), - emqx_boot:is_enabled(listeners) andalso (ok = emqx_listeners:start()), register(emqx, self()), ok = emqx_alarm_handler:load(), print_vsn(), From 86b8d881656855b58bf012f35be93ab193ccd926 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 11 Nov 2021 11:55:57 +0800 Subject: [PATCH 06/25] chore(emqx): update appup.src --- src/emqx.appup.src | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/emqx.appup.src b/src/emqx.appup.src index b826775d6..d40db1093 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -1,6 +1,8 @@ %% -*- mode: erlang -*- {VSN, - [{"4.3.10",[{load_module,emqx_connection,brutal_purge,soft_purge,[]}]}, + [{"4.3.10", + [{load_module,emqx_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_connection,brutal_purge,soft_purge,[]}]}, {"4.3.9", [{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, @@ -155,7 +157,9 @@ {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{"4.3.10",[{load_module,emqx_connection,brutal_purge,soft_purge,[]}]}, + [{"4.3.10", + [{load_module,emqx_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_connection,brutal_purge,soft_purge,[]}]}, {"4.3.9", [{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, From 74b6b5214a9c44683ae421f783143df50b2a2ef9 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Thu, 11 Nov 2021 10:33:31 +0100 Subject: [PATCH 07/25] test(emqx_cm_SUITE): add a gen_server call sync --- test/emqx_cm_SUITE.erl | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/test/emqx_cm_SUITE.erl b/test/emqx_cm_SUITE.erl index acafeb36f..a46c046ca 100644 --- a/test/emqx_cm_SUITE.erl +++ b/test/emqx_cm_SUITE.erl @@ -183,7 +183,8 @@ t_open_session_race_condition(_) -> exit(Winner, kill), receive {'DOWN', _, process, Winner, _} -> ok end, - ignored = gen_server:call(emqx_cm, ignore, infinity), %% sync + ignored = gen_server:call(?CM, ignore, infinity), %% sync + ok = flush_emqx_pool(), ?assertEqual([], emqx_cm:lookup_channels(ClientId)). t_kick_session_discard_normal(_) -> @@ -260,6 +261,7 @@ test_kick_session(Action, Reason) -> ?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid1, R}, 2_000, R)), ?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid2, R}, 2_000, R)) end, + ignored = gen_server:call(?CM, ignore, infinity), %% sync ok = flush_emqx_pool(), ?assertEqual([], emqx_cm:lookup_channels(ClientId)). @@ -271,10 +273,11 @@ test_kick_session(Action, Reason) -> %% The number of tasks should be large enough to ensure all workers have %% the chance to work on at least one of the tasks. flush_emqx_pool() -> + Ref = make_ref(), Self = self(), L = lists:seq(1, 1000), - lists:foreach(fun(I) -> emqx_pool:async_submit(fun() -> Self ! {done, I} end, []) end, L), - lists:foreach(fun(I) -> receive {done, I} -> ok end end, L). + lists:foreach(fun(I) -> emqx_pool:async_submit(fun() -> Self ! {done, I, Ref} end, []) end, L), + lists:foreach(fun(I) -> receive {done, I, Ref} -> ok end end, L). t_discard_session_race(_) -> ClientId = rand_client_id(), From 21898e1dafd1e5ebf2c93c438be061622a66f894 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Thu, 11 Nov 2021 10:34:29 +0100 Subject: [PATCH 08/25] chore(emqx_dashboard): bump version after v4.3.10 release --- lib-ce/emqx_dashboard/src/emqx_dashboard.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib-ce/emqx_dashboard/src/emqx_dashboard.app.src b/lib-ce/emqx_dashboard/src/emqx_dashboard.app.src index 724a76237..e78f1c3f6 100644 --- a/lib-ce/emqx_dashboard/src/emqx_dashboard.app.src +++ b/lib-ce/emqx_dashboard/src/emqx_dashboard.app.src @@ -1,6 +1,6 @@ {application, emqx_dashboard, [{description, "EMQ X Web Dashboard"}, - {vsn, "4.3.6"}, % strict semver, bump manually! + {vsn, "4.3.7"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_dashboard_sup]}, {applications, [kernel,stdlib,mnesia,minirest]}, From 04a4462f1e685e2616b4beb0d89d5f57735cd619 Mon Sep 17 00:00:00 2001 From: zhouzb Date: Fri, 12 Nov 2021 10:36:27 +0800 Subject: [PATCH 09/25] chore(release): update version to 4.3.10 --- include/emqx_release.hrl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/emqx_release.hrl b/include/emqx_release.hrl index c89dde010..e9f954b3c 100644 --- a/include/emqx_release.hrl +++ b/include/emqx_release.hrl @@ -29,7 +29,7 @@ -ifndef(EMQX_ENTERPRISE). --define(EMQX_RELEASE, {opensource, "4.3.9"}). +-define(EMQX_RELEASE, {opensource, "4.3.10"}). -else. From 4c29c3a5e5a9051aac70584f806dcfe20e87a1ab Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 10 Nov 2021 16:24:42 +0800 Subject: [PATCH 10/25] chore: fill message headers --- apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl | 18 +++++++--- .../emqx_exproto/src/emqx_exproto_channel.erl | 29 ++++++++++++---- apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl | 18 ++++++++-- apps/emqx_stomp/src/emqx_stomp_protocol.erl | 34 +++++++++++++------ 4 files changed, 76 insertions(+), 23 deletions(-) diff --git a/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl b/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl index d465f9ca3..89fb411b2 100644 --- a/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl +++ b/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl @@ -244,15 +244,26 @@ chann_publish(Topic, Payload, State = #state{clientid = ClientId}) -> case emqx_access_control:check_acl(clientinfo(State), publish, Topic) of allow -> _ = emqx_broker:publish( - emqx_message:set_flag(retain, false, - emqx_message:make(ClientId, ?QOS_0, Topic, Payload))), - ok; + packet_to_message(Topic, Payload, State)), ok; deny -> ?LOG(warning, "publish to ~p by clientid ~p failed due to acl check.", [Topic, ClientId]), {error, forbidden} end. +packet_to_message(Topic, Payload, + #state{clientid = ClientId, + username = Username, + peername = {PeerHost, _}}) -> + Message = emqx_message:set_flag( + retain, false, + emqx_message:make(ClientId, ?QOS_0, Topic, Payload) + ), + emqx_message:set_headers( + #{ proto_ver => 1 + , protocol => coap + , username => Username + , peerhost => PeerHost}, Message). %%-------------------------------------------------------------------- %% Deliver @@ -384,4 +395,3 @@ clientinfo(#state{peername = {PeerHost, _}, mountpoint => undefined, ws_cookie => undefined }. - diff --git a/apps/emqx_exproto/src/emqx_exproto_channel.erl b/apps/emqx_exproto/src/emqx_exproto_channel.erl index 67f85f932..d8ceae4bd 100644 --- a/apps/emqx_exproto/src/emqx_exproto_channel.erl +++ b/apps/emqx_exproto/src/emqx_exproto_channel.erl @@ -340,17 +340,14 @@ handle_call({unsubscribe, TopicFilter}, handle_call({publish, Topic, Qos, Payload}, Channel = #channel{ conn_state = connected, - clientinfo = ClientInfo - = #{clientid := From, - mountpoint := Mountpoint}}) -> + clientinfo = ClientInfo}) -> case is_acl_enabled(ClientInfo) andalso emqx_access_control:check_acl(ClientInfo, publish, Topic) of deny -> {reply, {error, ?RESP_PERMISSION_DENY, <<"ACL deny">>}, Channel}; _ -> - Msg = emqx_message:make(From, Qos, Topic, Payload), - NMsg = emqx_mountpoint:mount(Mountpoint, Msg), - _ = emqx:publish(NMsg), + Msg = packet_to_message(Topic, Qos, Payload, Channel), + _ = emqx:publish(Msg), {reply, ok, Channel} end; @@ -419,6 +416,24 @@ is_anonymous(_AuthResult) -> false. clean_anonymous_clients() -> ets:delete(?CHAN_CONN_TAB, ?CHANMOCK(self())). +packet_to_message(Topic, Qos, Payload, + #channel{ + conninfo = #{proto_ver := ProtoVer}, + clientinfo = #{ + protocol := Protocol, + clientid := ClientId, + username := Username, + peerhost := PeerHost, + mountpoint := Mountpoint}}) -> + Msg = emqx_message:make( + ClientId, Qos, + Topic, Payload, #{}, + #{proto_ver => ProtoVer, + protocol => Protocol, + username => Username, + peerhost => PeerHost}), + emqx_mountpoint:mount(Mountpoint, Msg). + %%-------------------------------------------------------------------- %% Sub/UnSub %%-------------------------------------------------------------------- @@ -591,6 +606,8 @@ default_conninfo(ConnInfo) -> ConnInfo#{clean_start => true, clientid => undefined, username => undefined, + proto_name => undefined, + proto_ver => undefined, conn_props => #{}, connected => true, connected_at => erlang:system_time(millisecond), diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl b/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl index 34c72dcca..4a5fafdb0 100644 --- a/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl +++ b/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl @@ -235,8 +235,20 @@ unsubscribe(Topic, Lwm2mState = #lwm2m_state{endpoint_name = _EndpointName}) -> emqx_broker:unsubscribe(Topic), emqx_hooks:run('session.unsubscribed', [clientinfo(Lwm2mState), Topic, Opts]). -publish(Topic, Payload, Qos, EndpointName) -> - emqx_broker:publish(emqx_message:set_flag(retain, false, emqx_message:make(EndpointName, Qos, Topic, Payload))). +publish(Topic, Payload, Qos, + #lwm2m_state{ + version = ProtoVer, + peername = {PeerHost, _}, + endpoint_name = EndpointName}) -> + Message = emqx_message:set_flag( + retain, false, + emqx_message:make(EndpointName, Qos, Topic, Payload) + ), + NMessage = emqx_message:set_headers( + #{proto_ver => ProtoVer, + protocol => lwm2m, + peerhost => PeerHost}, Message), + emqx_broker:publish(NMessage). time_now() -> erlang:system_time(millisecond). @@ -281,7 +293,7 @@ do_send_to_broker(EventType, #{<<"data">> := Data} = Payload, #lwm2m_state{endpo emqx_lwm2m_cm:register_cmd(EndpointName, ReqPath, EventType, {Code, CodeMsg, Content}), NewPayload = maps:put(<<"msgType">>, EventType, Payload), Topic = uplink_topic(EventType, Lwm2mState), - publish(Topic, emqx_json:encode(NewPayload), _Qos = 0, Lwm2mState#lwm2m_state.endpoint_name). + publish(Topic, emqx_json:encode(NewPayload), _Qos = 0, Lwm2mState). %%-------------------------------------------------------------------- %% Auto Observe diff --git a/apps/emqx_stomp/src/emqx_stomp_protocol.erl b/apps/emqx_stomp/src/emqx_stomp_protocol.erl index fc211be10..3dcc6052f 100644 --- a/apps/emqx_stomp/src/emqx_stomp_protocol.erl +++ b/apps/emqx_stomp/src/emqx_stomp_protocol.erl @@ -588,15 +588,29 @@ next_ackid() -> put(ackid, AckId + 1), AckId. -make_mqtt_message(Topic, Headers, Body) -> - Msg = emqx_message:make(stomp, Topic, Body), - Headers1 = lists:foldl(fun(Key, Headers0) -> - proplists:delete(Key, Headers0) - end, Headers, [<<"destination">>, - <<"content-length">>, - <<"content-type">>, - <<"transaction">>, - <<"receipt">>]), +make_mqtt_message(Topic, Headers, Body, + #pstate{ + conninfo = #{proto_ver := ProtoVer}, + clientinfo = #{ + protocol := Protocol, + clientid := ClientId, + username := Username, + peerhost := PeerHost}}) -> + Msg = emqx_message:make( + ClientId, ?QOS_0, + Topic, Body, #{}, + #{proto_ver => ProtoVer, + protocol => Protocol, + username => Username, + peerhost => PeerHost}), + Headers1 = lists:foldl( + fun(Key, Headers0) -> + proplists:delete(Key, Headers0) + end, Headers, [<<"destination">>, + <<"content-length">>, + <<"content-type">>, + <<"transaction">>, + <<"receipt">>]), emqx_message:set_headers(#{stomp_headers => Headers1}, Msg). receipt_id(Headers) -> @@ -611,7 +625,7 @@ handle_recv_send_frame(#stomp_frame{command = <<"SEND">>, headers = Headers, bod allow -> _ = maybe_send_receipt(receipt_id(Headers), State), _ = emqx_broker:publish( - make_mqtt_message(Topic, Headers, iolist_to_binary(Body)) + make_mqtt_message(Topic, Headers, iolist_to_binary(Body), State) ), State; deny -> From 0c5cb1b9ac52a2447c0c515494c4f83337d38f1f Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 10 Nov 2021 16:53:03 +0800 Subject: [PATCH 11/25] chore: update appup.src --- apps/emqx_coap/src/emqx_coap.app.src | 2 +- apps/emqx_coap/src/emqx_coap.appup.src | 9 +++++++++ apps/emqx_exproto/src/emqx_exproto.app.src | 2 +- apps/emqx_exproto/src/emqx_exproto.appup.src | 8 ++++++-- apps/emqx_lwm2m/src/emqx_lwm2m.app.src | 2 +- apps/emqx_lwm2m/src/emqx_lwm2m.appup.src | 6 ++++-- apps/emqx_stomp/src/emqx_stomp.app.src | 2 +- apps/emqx_stomp/src/emqx_stomp.appup.src | 10 ++++++++-- 8 files changed, 31 insertions(+), 10 deletions(-) create mode 100644 apps/emqx_coap/src/emqx_coap.appup.src diff --git a/apps/emqx_coap/src/emqx_coap.app.src b/apps/emqx_coap/src/emqx_coap.app.src index 2b5fcbb6a..dce5ef4f4 100644 --- a/apps/emqx_coap/src/emqx_coap.app.src +++ b/apps/emqx_coap/src/emqx_coap.app.src @@ -1,6 +1,6 @@ {application, emqx_coap, [{description, "EMQ X CoAP Gateway"}, - {vsn, "4.3.0"}, % strict semver, bump manually! + {vsn, "4.3.1"}, % strict semver, bump manually! {modules, []}, {registered, []}, {applications, [kernel,stdlib,gen_coap]}, diff --git a/apps/emqx_coap/src/emqx_coap.appup.src b/apps/emqx_coap/src/emqx_coap.appup.src new file mode 100644 index 000000000..b73e26be6 --- /dev/null +++ b/apps/emqx_coap/src/emqx_coap.appup.src @@ -0,0 +1,9 @@ +%% -*-: erlang -*- +{VSN, + [{"4.3.0",[ + {load_module, emqx_coap_mqtt_adapter, brutal_purge, soft_purge, []}]}, + {<<".*">>, []}], + [{"4.3.0",[ + {load_module, emqx_coap_mqtt_adapter, brutal_purge, soft_purge, []}]}, + {<<".*">>, []}] +}. diff --git a/apps/emqx_exproto/src/emqx_exproto.app.src b/apps/emqx_exproto/src/emqx_exproto.app.src index f7cab4c2e..0ed54e6fd 100644 --- a/apps/emqx_exproto/src/emqx_exproto.app.src +++ b/apps/emqx_exproto/src/emqx_exproto.app.src @@ -1,6 +1,6 @@ {application, emqx_exproto, [{description, "EMQ X Extension for Protocol"}, - {vsn, "4.3.4"}, %% 4.3.3 is used by ee + {vsn, "4.3.5"}, %% 4.3.3 is used by ee {modules, []}, {registered, []}, {mod, {emqx_exproto_app, []}}, diff --git a/apps/emqx_exproto/src/emqx_exproto.appup.src b/apps/emqx_exproto/src/emqx_exproto.appup.src index e0a021af5..7da28e773 100644 --- a/apps/emqx_exproto/src/emqx_exproto.appup.src +++ b/apps/emqx_exproto/src/emqx_exproto.appup.src @@ -1,6 +1,8 @@ %% -*- mode: erlang -*- {VSN, - [{"4.3.3", + [{"4.3.4", + [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, + {"4.3.3", [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {"4.3.2", @@ -12,7 +14,9 @@ {load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{"4.3.3", + [{"4.3.4", + [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, + {"4.3.3", [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {"4.3.2", diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m.app.src b/apps/emqx_lwm2m/src/emqx_lwm2m.app.src index 551cf8d07..b929ad854 100644 --- a/apps/emqx_lwm2m/src/emqx_lwm2m.app.src +++ b/apps/emqx_lwm2m/src/emqx_lwm2m.app.src @@ -1,6 +1,6 @@ {application,emqx_lwm2m, [{description,"EMQ X LwM2M Gateway"}, - {vsn, "4.3.4"}, % strict semver, bump manually! + {vsn, "4.3.5"}, % strict semver, bump manually! {modules,[]}, {registered,[emqx_lwm2m_sup]}, {applications,[kernel,stdlib,lwm2m_coap]}, diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src b/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src index 600cf236b..0cd98db0d 100644 --- a/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src +++ b/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src @@ -1,5 +1,5 @@ %% -*-: erlang -*- -{"4.3.4", +{VSN, [ {<<"4\\.3\\.[0-1]">>, [ {restart_application, emqx_lwm2m} @@ -7,7 +7,9 @@ {"4.3.2", [ {load_module, emqx_lwm2m_message, brutal_purge, soft_purge, []} ]}, - {"4.3.3", []} %% only config change + {<<"4\\.3\\.[3-4]">>, [ + {load_module, emqx_lwm2m_protocol, brutal_purge, soft_purge, []} + ]} ], [ {<<"4\\.3\\.[0-1]">>, [ diff --git a/apps/emqx_stomp/src/emqx_stomp.app.src b/apps/emqx_stomp/src/emqx_stomp.app.src index d2ecae53b..c2f4b57d3 100644 --- a/apps/emqx_stomp/src/emqx_stomp.app.src +++ b/apps/emqx_stomp/src/emqx_stomp.app.src @@ -1,6 +1,6 @@ {application, emqx_stomp, [{description, "EMQ X Stomp Protocol Plugin"}, - {vsn, "4.3.2"}, % strict semver, bump manually! + {vsn, "4.3.3"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_stomp_sup]}, {applications, [kernel,stdlib]}, diff --git a/apps/emqx_stomp/src/emqx_stomp.appup.src b/apps/emqx_stomp/src/emqx_stomp.appup.src index 5ce1f8bd3..a29390bc1 100644 --- a/apps/emqx_stomp/src/emqx_stomp.appup.src +++ b/apps/emqx_stomp/src/emqx_stomp.appup.src @@ -1,11 +1,17 @@ %% -*- mode: erlang -*- {VSN, - [{"4.3.1",[{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]}, + [{"4.3.2",[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}]}, + {"4.3.1",[ + {load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}, + {load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]}, {"4.3.0", [{restart_application,emqx_stomp}, {apply,{emqx_stomp,force_clear_after_app_stoped,[]}}]}, {<<".*">>,[]}], - [{"4.3.1",[{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]}, + [{"4.3.2",[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}]}, + {"4.3.1",[ + {load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}, + {load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]}, {"4.3.0", [{restart_application,emqx_stomp}]}, {<<".*">>,[]}]}. From 439fb3a403b87fe76b3ae8705871a1bb84c0a8cb Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 10 Nov 2021 17:05:27 +0800 Subject: [PATCH 12/25] chore: fix elvis warnings --- apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl | 4 +- .../emqx_exproto/src/emqx_exproto_channel.erl | 2 - apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl | 47 ++++++++++++------- apps/emqx_stomp/src/emqx_stomp_protocol.erl | 34 ++++++++++---- 4 files changed, 57 insertions(+), 30 deletions(-) diff --git a/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl b/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl index 89fb411b2..f7f0b4eda 100644 --- a/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl +++ b/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl @@ -139,7 +139,7 @@ handle_call({subscribe, Topic, CoapPid}, _From, State=#state{sub_topics = TopicL NewTopics = proplists:delete(Topic, TopicList), IsWild = emqx_topic:wildcard(Topic), {reply, chann_subscribe(Topic, State), State#state{sub_topics = - [{Topic, {IsWild, CoapPid}}|NewTopics]}, hibernate}; + [{Topic, {IsWild, CoapPid}} | NewTopics]}, hibernate}; handle_call({unsubscribe, Topic, _CoapPid}, _From, State=#state{sub_topics = TopicList}) -> NewTopics = proplists:delete(Topic, TopicList), @@ -281,7 +281,7 @@ do_deliver({Topic, Payload}, Subscribers) -> deliver_to_coap(_TopicName, _Payload, []) -> ok; -deliver_to_coap(TopicName, Payload, [{TopicFilter, {IsWild, CoapPid}}|T]) -> +deliver_to_coap(TopicName, Payload, [{TopicFilter, {IsWild, CoapPid}} | T]) -> Matched = case IsWild of true -> emqx_topic:match(TopicName, TopicFilter); false -> TopicName =:= TopicFilter diff --git a/apps/emqx_exproto/src/emqx_exproto_channel.erl b/apps/emqx_exproto/src/emqx_exproto_channel.erl index d8ceae4bd..44b8b64d3 100644 --- a/apps/emqx_exproto/src/emqx_exproto_channel.erl +++ b/apps/emqx_exproto/src/emqx_exproto_channel.erl @@ -606,8 +606,6 @@ default_conninfo(ConnInfo) -> ConnInfo#{clean_start => true, clientid => undefined, username => undefined, - proto_name => undefined, - proto_ver => undefined, conn_props => #{}, connected => true, connected_at => erlang:system_time(millisecond), diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl b/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl index 4a5fafdb0..7f40041a5 100644 --- a/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl +++ b/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl @@ -74,7 +74,8 @@ call(Pid, Msg, Timeout) -> Error -> {error, Error} end. -init(CoapPid, EndpointName, Peername = {_Peerhost, _Port}, RegInfo = #{<<"lt">> := LifeTime, <<"lwm2m">> := Ver}) -> +init(CoapPid, EndpointName, Peername = {_Peerhost, _Port}, + RegInfo = #{<<"lt">> := LifeTime, <<"lwm2m">> := Ver}) -> Mountpoint = proplists:get_value(mountpoint, lwm2m_coap_responder:options(), ""), Lwm2mState = #lwm2m_state{peername = Peername, endpoint_name = EndpointName, @@ -103,9 +104,10 @@ init(CoapPid, EndpointName, Peername = {_Peerhost, _Port}, RegInfo = #{<<"lt">> emqx_cm:register_channel(EndpointName, CoapPid, conninfo(Lwm2mState1)) end), emqx_cm:insert_channel_info(EndpointName, info(Lwm2mState1), stats(Lwm2mState1)), - emqx_lwm2m_cm:register_channel(EndpointName, RegInfo, LifeTime, Ver, Peername), + emqx_lwm2m_cm:register_channel(EndpointName, RegInfo, LifeTime, Ver, Peername), - {ok, Lwm2mState1#lwm2m_state{life_timer = emqx_lwm2m_timer:start_timer(LifeTime, {life_timer, expired})}}; + NTimer = emqx_lwm2m_timer:start_timer(LifeTime, {life_timer, expired}), + {ok, Lwm2mState1#lwm2m_state{life_timer = NTimer}}; {error, Error} -> _ = run_hooks('client.connack', [conninfo(Lwm2mState), not_authorized], undefined), {error, Error} @@ -133,7 +135,7 @@ update_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer, regi %% - report the registration info update, but only when objectList is updated. case NewRegInfo of #{<<"objectList">> := _} -> - emqx_lwm2m_cm:update_reg_info(Epn, NewRegInfo), + emqx_lwm2m_cm:update_reg_info(Epn, NewRegInfo), send_to_broker(<<"update">>, #{<<"data">> => UpdatedRegInfo}, Lwm2mState); _ -> ok end @@ -186,7 +188,8 @@ deliver(#message{topic = Topic, payload = Payload}, started_at = StartedAt, endpoint_name = EndpointName}) -> IsCacheMode = is_cache_mode(RegInfo, StartedAt), - ?LOG(debug, "Get MQTT message from broker, IsCacheModeNow?: ~p, Topic: ~p, Payload: ~p", [IsCacheMode, Topic, Payload]), + ?LOG(debug, "Get MQTT message from broker, IsCacheModeNow?: ~p, " + "Topic: ~p, Payload: ~p", [IsCacheMode, Topic, Payload]), AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>), deliver_to_coap(AlternatePath, Payload, CoapPid, IsCacheMode, EndpointName), Lwm2mState. @@ -256,7 +259,8 @@ time_now() -> erlang:system_time(millisecond). %% Deliver downlink message to coap %%-------------------------------------------------------------------- -deliver_to_coap(AlternatePath, JsonData, CoapPid, CacheMode, EndpointName) when is_binary(JsonData)-> +deliver_to_coap(AlternatePath, JsonData, + CoapPid, CacheMode, EndpointName) when is_binary(JsonData)-> try TermData = emqx_json:decode(JsonData, [return_maps]), deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName) @@ -285,7 +289,8 @@ deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName) when send_to_broker(EventType, Payload = #{}, Lwm2mState) -> do_send_to_broker(EventType, Payload, Lwm2mState). -do_send_to_broker(EventType, #{<<"data">> := Data} = Payload, #lwm2m_state{endpoint_name = EndpointName} = Lwm2mState) -> +do_send_to_broker(EventType, #{<<"data">> := Data} = Payload, + #lwm2m_state{endpoint_name = EndpointName} = Lwm2mState) -> ReqPath = maps:get(<<"reqPath">>, Data, undefined), Code = maps:get(<<"code">>, Data, undefined), CodeMsg = maps:get(<<"codeMsg">>, Data, undefined), @@ -327,18 +332,27 @@ auto_observe(AlternatePath, ObjectList, CoapPid, EndpointName) -> observe_object_list(AlternatePath, ObjectList, CoapPid, EndpointName) -> lists:foreach(fun(ObjectPath) -> - [ObjId| LastPath] = emqx_lwm2m_cmd_handler:path_list(ObjectPath), + [ObjId | LastPath] = emqx_lwm2m_cmd_handler:path_list(ObjectPath), case ObjId of <<"19">> -> [ObjInsId | _LastPath1] = LastPath, case ObjInsId of <<"0">> -> - observe_object_slowly(AlternatePath, <<"/19/0/0">>, CoapPid, 100, EndpointName); + observe_object_slowly( + AlternatePath, <<"/19/0/0">>, + CoapPid, 100, EndpointName + ); _ -> - observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100, EndpointName) + observe_object_slowly( + AlternatePath, ObjectPath, + CoapPid, 100, EndpointName + ) end; _ -> - observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100, EndpointName) + observe_object_slowly( + AlternatePath, ObjectPath, + CoapPid, 100, EndpointName + ) end end, ObjectList). @@ -392,11 +406,12 @@ get_cached_downlink_messages() -> is_cache_mode(RegInfo, StartedAt) -> case is_psm(RegInfo) orelse is_qmode(RegInfo) of true -> - QModeTimeWind = proplists:get_value(qmode_time_window, lwm2m_coap_responder:options(), 22), - Now = time_now(), - if (Now - StartedAt) >= QModeTimeWind -> true; - true -> false - end; + QModeTimeWind = proplists:get_value( + qmode_time_window, + lwm2m_coap_responder:options(), + 22 + ), + (time_now() - StartedAt) >= QModeTimeWind; false -> false end. diff --git a/apps/emqx_stomp/src/emqx_stomp_protocol.erl b/apps/emqx_stomp/src/emqx_stomp_protocol.erl index 3dcc6052f..4e371d9b0 100644 --- a/apps/emqx_stomp/src/emqx_stomp_protocol.erl +++ b/apps/emqx_stomp/src/emqx_stomp_protocol.erl @@ -108,6 +108,8 @@ , init/2 ]}). +-elvis([{elvis_style, dont_repeat_yourself, disable}]). + -type(pstate() :: #pstate{}). %% @doc Init protocol @@ -132,8 +134,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port}, AllowAnonymous = get_value(allow_anonymous, Opts, false), DefaultUser = get_value(default_user, Opts), - - #pstate{ + #pstate{ conninfo = NConnInfo, clientinfo = ClientInfo, heartfun = HeartFun, @@ -165,7 +166,7 @@ default_conninfo(ConnInfo) -> info(State) -> maps:from_list(info(?INFO_KEYS, State)). --spec info(list(atom())|atom(), pstate()) -> term(). +-spec info(list(atom()) | atom(), pstate()) -> term(). info(Keys, State) when is_list(Keys) -> [{Key, info(Key, State)} || Key <- Keys]; info(conninfo, #pstate{conninfo = ConnInfo}) -> @@ -288,7 +289,12 @@ received(#stomp_frame{command = <<"CONNECT">>}, State = #pstate{connected = true received(Frame = #stomp_frame{command = <<"SEND">>, headers = Headers}, State) -> case header(<<"transaction">>, Headers) of undefined -> {ok, handle_recv_send_frame(Frame, State)}; - TransactionId -> add_action(TransactionId, {fun ?MODULE:handle_recv_send_frame/2, [Frame]}, receipt_id(Headers), State) + TransactionId -> + add_action(TransactionId, + {fun ?MODULE:handle_recv_send_frame/2, [Frame]}, + receipt_id(Headers), + State + ) end; received(#stomp_frame{command = <<"SUBSCRIBE">>, headers = Headers}, @@ -346,7 +352,11 @@ received(#stomp_frame{command = <<"UNSUBSCRIBE">>, headers = Headers}, received(Frame = #stomp_frame{command = <<"ACK">>, headers = Headers}, State) -> case header(<<"transaction">>, Headers) of undefined -> {ok, handle_recv_ack_frame(Frame, State)}; - TransactionId -> add_action(TransactionId, {fun ?MODULE:handle_recv_ack_frame/2, [Frame]}, receipt_id(Headers), State) + TransactionId -> + add_action(TransactionId, + {fun ?MODULE:handle_recv_ack_frame/2, [Frame]}, + receipt_id(Headers), + State) end; %% NACK @@ -357,7 +367,11 @@ received(Frame = #stomp_frame{command = <<"ACK">>, headers = Headers}, State) -> received(Frame = #stomp_frame{command = <<"NACK">>, headers = Headers}, State) -> case header(<<"transaction">>, Headers) of undefined -> {ok, handle_recv_nack_frame(Frame, State)}; - TransactionId -> add_action(TransactionId, {fun ?MODULE:handle_recv_nack_frame/2, [Frame]}, receipt_id(Headers), State) + TransactionId -> + add_action(TransactionId, + {fun ?MODULE:handle_recv_nack_frame/2, [Frame]}, + receipt_id(Headers), + State) end; %% BEGIN @@ -516,9 +530,9 @@ negotiate_version(Accepts) -> negotiate_version(Ver, []) -> {error, <<"Supported protocol versions < ", Ver/binary>>}; -negotiate_version(Ver, [AcceptVer|_]) when Ver >= AcceptVer -> +negotiate_version(Ver, [AcceptVer | _]) when Ver >= AcceptVer -> {ok, AcceptVer}; -negotiate_version(Ver, [_|T]) -> +negotiate_version(Ver, [_ | T]) -> negotiate_version(Ver, T). check_login(Login, _, AllowAnonymous, _) @@ -537,7 +551,7 @@ check_login(Login, Passcode, _, DefaultUser) -> add_action(Id, Action, ReceiptId, State = #pstate{transaction = Trans}) -> case maps:get(Id, Trans, undefined) of {Ts, Actions} -> - NTrans = Trans#{Id => {Ts, [Action|Actions]}}, + NTrans = Trans#{Id => {Ts, [Action | Actions]}}, {ok, State#pstate{transaction = NTrans}}; _ -> send(error_frame(ReceiptId, ["Transaction ", Id, " not found"]), State) @@ -713,7 +727,7 @@ find_sub_by_id(Id, Subs) -> end, Subs), case maps:to_list(Found) of [] -> undefined; - [Sub|_] -> Sub + [Sub | _] -> Sub end. is_acl_enabled(_) -> From 8e4c2c88c359aa37a3d4f697b475fdf763167739 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 12 Nov 2021 10:45:26 +0800 Subject: [PATCH 13/25] chore: use PROTO_VER marco --- apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl b/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl index f7f0b4eda..a6ea09881 100644 --- a/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl +++ b/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl @@ -58,6 +58,8 @@ -define(SUBOPTS, #{rh => 0, rap => 0, nl => 0, qos => ?QOS_0, is_new => false}). +-define(PROTO_VER, 1). + %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- @@ -260,7 +262,7 @@ packet_to_message(Topic, Payload, emqx_message:make(ClientId, ?QOS_0, Topic, Payload) ), emqx_message:set_headers( - #{ proto_ver => 1 + #{ proto_ver => ?PROTO_VER , protocol => coap , username => Username , peerhost => PeerHost}, Message). @@ -335,7 +337,7 @@ conninfo(#state{peername = Peername, peercert => nossl, %% TODO: dtls conn_mod => ?MODULE, proto_name => <<"CoAP">>, - proto_ver => 1, + proto_ver => ?PROTO_VER, clean_start => true, clientid => ClientId, username => undefined, From d8f37be210ea9b01f6eff448986bbf8cd2c0a07a Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 12 Nov 2021 15:35:08 +0800 Subject: [PATCH 14/25] chore(lwm2m): fix bad appup.src --- apps/emqx_lwm2m/src/emqx_lwm2m.appup.src | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src b/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src index 0cd98db0d..6656eb149 100644 --- a/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src +++ b/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src @@ -7,7 +7,8 @@ {"4.3.2", [ {load_module, emqx_lwm2m_message, brutal_purge, soft_purge, []} ]}, - {<<"4\\.3\\.[3-4]">>, [ + {"4.3.3", []}, %% only config change + {"4.3.4", [ {load_module, emqx_lwm2m_protocol, brutal_purge, soft_purge, []} ]} ], @@ -18,6 +19,9 @@ {"4.3.2", [ {load_module, emqx_lwm2m_message, brutal_purge, soft_purge, []} ]}, - {"4.3.3", []} %% only config change + {"4.3.3", []}, %% only config change + {"4.3.4", [ + {load_module, emqx_lwm2m_protocol, brutal_purge, soft_purge, []} + ]} ] }. From 0971567cff07be06820a1d98c36349ec26e60943 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Sun, 14 Nov 2021 18:23:30 +0100 Subject: [PATCH 15/25] build: ensure git tag matches release version --- pkg-vsn.sh | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/pkg-vsn.sh b/pkg-vsn.sh index 904690ad1..f3bdb1884 100755 --- a/pkg-vsn.sh +++ b/pkg-vsn.sh @@ -15,9 +15,21 @@ fi ## emqx_release.hrl is the single source of truth for release version RELEASE="$(grep -E "define.+EMQX_RELEASE.+${EDITION}" include/emqx_release.hrl | cut -d '"' -f2)" -## git commit hash is added as suffix in case the git tag and release version is not an exact match -if [ -d .git ] && ! git describe --tags --match "[e|v]${RELEASE}" --exact >/dev/null 2>&1; then +git_exact_vsn() { + local tag + tag="$(git describe --tags --match "[e|v]*" --exact 2>/dev/null)" + echo "$tag" | sed 's/^[v|e]//g' +} + +GIT_EXACT_VSN="$(git_exact_vsn)" +if [ "$GIT_EXACT_VSN" != '' ]; then + if [ "$GIT_EXACT_VSN" != "$RELEASE" ]; then + echo "ERROR: Tagged $GIT_EXACT_VSN, but $RELEASE in include/emqx_release.hrl" 1>&2 + exit 1 + fi + SUFFIX='' +else SUFFIX="-$(git rev-parse HEAD | cut -b1-8)" fi -echo "${RELEASE}${SUFFIX:-}" +echo "${RELEASE}${SUFFIX}" From 25f504c90ac27d7685e598aab179c2885b033efe Mon Sep 17 00:00:00 2001 From: zhouzb Date: Fri, 12 Nov 2021 15:59:35 +0800 Subject: [PATCH 16/25] feat(mongo srv): support srv for mongodb authentication --- apps/emqx_auth_mongo/etc/emqx_auth_mongo.conf | 9 +- .../priv/emqx_auth_mongo.schema | 37 +++++--- .../src/emqx_auth_mongo.app.src | 2 +- .../src/emqx_auth_mongo_sup.erl | 93 ++++++++++++++++++- 4 files changed, 121 insertions(+), 20 deletions(-) diff --git a/apps/emqx_auth_mongo/etc/emqx_auth_mongo.conf b/apps/emqx_auth_mongo/etc/emqx_auth_mongo.conf index 2a3d038f0..3d3d0ee5b 100644 --- a/apps/emqx_auth_mongo/etc/emqx_auth_mongo.conf +++ b/apps/emqx_auth_mongo/etc/emqx_auth_mongo.conf @@ -5,7 +5,13 @@ ## MongoDB Topology Type. ## ## Value: single | unknown | sharded | rs -auth.mongo.type = single +auth.mongo.type = + +## Whether to use SRV and TXT records. +## +## Value: true | false +## Default: false +auth.mongo.srv_record = false ## The set name if type is rs. ## @@ -37,7 +43,6 @@ auth.mongo.pool = 8 ## MongoDB AuthSource ## ## Value: String -## Default: mqtt ## auth.mongo.auth_source = admin ## MongoDB database diff --git a/apps/emqx_auth_mongo/priv/emqx_auth_mongo.schema b/apps/emqx_auth_mongo/priv/emqx_auth_mongo.schema index 8a2ff98b3..17a83c37c 100644 --- a/apps/emqx_auth_mongo/priv/emqx_auth_mongo.schema +++ b/apps/emqx_auth_mongo/priv/emqx_auth_mongo.schema @@ -6,8 +6,12 @@ {datatype, {enum, [single, unknown, sharded, rs]}} ]}. +{mapping, "auth.mongo.srv_record", "emqx_auth_mongo.server", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + {mapping, "auth.mongo.rs_set_name", "emqx_auth_mongo.server", [ - {default, "mqtt"}, {datatype, string} ]}. @@ -41,7 +45,6 @@ ]}. {mapping, "auth.mongo.auth_source", "emqx_auth_mongo.server", [ - {default, "mqtt"}, {datatype, string} ]}. @@ -101,9 +104,9 @@ ]}. {translation, "emqx_auth_mongo.server", fun(Conf) -> - H = cuttlefish:conf_get("auth.mongo.server", Conf), - Hosts = string:tokens(H, ","), - Type0 = cuttlefish:conf_get("auth.mongo.type", Conf), + SrvRecord = cuttlefish:conf_get("auth.mongo.srv_record", Conf, false), + Server = cuttlefish:conf_get("auth.mongo.server", Conf), + Type = cuttlefish:conf_get("auth.mongo.type", Conf), Pool = cuttlefish:conf_get("auth.mongo.pool", Conf), %% FIXME: compatible with 4.0-4.2 version format, plan to delete in 5.0 Login = cuttlefish:conf_get("auth.mongo.username", Conf, @@ -111,7 +114,10 @@ ), Passwd = cuttlefish:conf_get("auth.mongo.password", Conf), DB = cuttlefish:conf_get("auth.mongo.database", Conf), - AuthSrc = cuttlefish:conf_get("auth.mongo.auth_source", Conf), + AuthSource = case cuttlefish:conf_get("auth.mongo.auth_source", Conf, undefined) of + undefined -> []; + AuthSource0 -> [{auth_source, list_to_binary(AuthSource0)}] + end, R = cuttlefish:conf_get("auth.mongo.w_mode", Conf), W = cuttlefish:conf_get("auth.mongo.r_mode", Conf), Login0 = case Login =:= [] of @@ -156,8 +162,8 @@ false -> [] end, - WorkerOptions = [{database, list_to_binary(DB)}, {auth_source, list_to_binary(AuthSrc)}] - ++ Login0 ++ Passwd0 ++ W0 ++ R0 ++ Ssl, + WorkerOptions = [{database, list_to_binary(DB)}] + ++ Login0 ++ Passwd0 ++ W0 ++ R0 ++ Ssl ++ AuthSource, Vars = cuttlefish_variable:fuzzy_matches(["auth", "mongo", "topology", "$name"], Conf), Options = lists:map(fun({_, Name}) -> @@ -174,16 +180,17 @@ {list_to_atom(Name2), cuttlefish:conf_get("auth.mongo.topology."++Name, Conf)} end, Vars), - Type = case Type0 =:= rs of - true -> {Type0, list_to_binary(cuttlefish:conf_get("auth.mongo.rs_set_name", Conf))}; - false -> Type0 - end, - [{type, Type}, - {hosts, Hosts}, + ReplicaSet = case cuttlefish:conf_get("auth.mongo.rs_set_name", Conf, undefined) of + undefined -> []; + ReplicaSet0 -> [{rs_set_name, list_to_binary(ReplicaSet0)}] + end, + [{srv_record, SrvRecord}, + {type, Type}, + {server, Server}, {options, Options}, {worker_options, WorkerOptions}, {auto_reconnect, 1}, - {pool_size, Pool}] + {pool_size, Pool}] ++ ReplicaSet end}. %% The mongodb operation timeout is specified by the value of `cursor_timeout` from application config, diff --git a/apps/emqx_auth_mongo/src/emqx_auth_mongo.app.src b/apps/emqx_auth_mongo/src/emqx_auth_mongo.app.src index cc4e72ef3..ab0b4ff56 100644 --- a/apps/emqx_auth_mongo/src/emqx_auth_mongo.app.src +++ b/apps/emqx_auth_mongo/src/emqx_auth_mongo.app.src @@ -1,6 +1,6 @@ {application, emqx_auth_mongo, [{description, "EMQ X Authentication/ACL with MongoDB"}, - {vsn, "4.3.0"}, % strict semver, bump manually! + {vsn, "4.4.0"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_auth_mongo_sup]}, {applications, [kernel,stdlib,mongodb,ecpool]}, diff --git a/apps/emqx_auth_mongo/src/emqx_auth_mongo_sup.erl b/apps/emqx_auth_mongo/src/emqx_auth_mongo_sup.erl index 3f27cb1dd..55263494a 100644 --- a/apps/emqx_auth_mongo/src/emqx_auth_mongo_sup.erl +++ b/apps/emqx_auth_mongo/src/emqx_auth_mongo_sup.erl @@ -28,7 +28,96 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - {ok, PoolEnv} = application:get_env(?APP, server), - PoolSpec = ecpool:pool_spec(?APP, ?APP, ?APP, PoolEnv), + {ok, Opts} = application:get_env(?APP, server), + NOpts = may_parse_srv_and_txt_records(Opts), + PoolSpec = ecpool:pool_spec(?APP, ?APP, ?APP, NOpts), {ok, {{one_for_all, 10, 100}, [PoolSpec]}}. +may_parse_srv_and_txt_records(Opts) when is_list(Opts) -> + maps:to_list(may_parse_srv_and_txt_records(maps:from_list(Opts))); + +may_parse_srv_and_txt_records(#{type := Type, + srv_record := false, + server := Server} = Opts) -> + Hosts = to_hosts(Server), + case Type =:= rs of + true -> + case maps:get(rs_set_name, Opts, undefined) of + undefined -> + error({missing_parameter, rs_set_name}); + ReplicaSet -> + Opts#{type => {rs, ReplicaSet}, + hosts => Hosts} + end; + false -> + Opts#{hosts => Hosts} + end; + +may_parse_srv_and_txt_records(#{type := Type, + srv_record := true, + server := Server, + worker_options := WorkerOptions} = Opts) -> + Hosts = parse_srv_records(Server), + Opts0 = parse_txt_records(Type, Server), + NWorkerOptions = maps:to_list(maps:merge(maps:from_list(WorkerOptions), maps:with([auth_source], Opts0))), + NOpts = Opts#{hosts => Hosts, worker_options => NWorkerOptions}, + case Type =:= rs of + true -> + case maps:get(rs_set_name, Opts0, maps:get(rs_set_name, NOpts, undefined)) of + undefined -> + error({missing_parameter, rs_set_name}); + ReplicaSet -> + NOpts#{type => {Type, ReplicaSet}} + end; + false -> + NOpts + end. + +to_hosts(Server) -> + [string:trim(H) || H <- string:tokens(Server, ",")]. + +parse_srv_records(Server) -> + case inet_res:lookup("_mongodb._tcp." ++ Server, in, srv) of + [] -> + error(service_not_found); + Services -> + [Host ++ ":" ++ integer_to_list(Port) || {_, _, Port, Host} <- Services] + end. + +parse_txt_records(Type, Server) -> + case inet_res:lookup(Server, in, txt) of + [] -> + #{}; + [[QueryString]] -> + case uri_string:dissect_query(QueryString) of + {error, _, _} -> + error({invalid_txt_record, invalid_query_string}); + Options -> + Fields = case Type of + rs -> ["authSource", "replicaSet"]; + _ -> ["authSource"] + end, + take_and_convert(Fields, Options) + end; + _ -> + error({invalid_txt_record, multiple_records}) + end. + +take_and_convert(Fields, Options) -> + take_and_convert(Fields, Options, #{}). + +take_and_convert([], [_ | _], _Acc) -> + error({invalid_txt_record, invalid_option}); +take_and_convert([], [], Acc) -> + Acc; +take_and_convert([Field | More], Options, Acc) -> + case lists:keytake(Field, 1, Options) of + {value, {"authSource", V}, NOptions} -> + take_and_convert(More, NOptions, Acc#{auth_source => list_to_binary(V)}); + {value, {"replicaSet", V}, NOptions} -> + take_and_convert(More, NOptions, Acc#{rs_set_name => list_to_binary(V)}); + {value, _, _} -> + error({invalid_txt_record, invalid_option}); + false -> + take_and_convert(More, Options, Acc) + end. From cb185389577263c17af0f3348b82c4f3fc50464a Mon Sep 17 00:00:00 2001 From: zhouzb Date: Mon, 15 Nov 2021 09:41:03 +0800 Subject: [PATCH 17/25] fix(mong srv): fix wrong configuration --- apps/emqx_auth_mongo/etc/emqx_auth_mongo.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_auth_mongo/etc/emqx_auth_mongo.conf b/apps/emqx_auth_mongo/etc/emqx_auth_mongo.conf index 3d3d0ee5b..8baddae19 100644 --- a/apps/emqx_auth_mongo/etc/emqx_auth_mongo.conf +++ b/apps/emqx_auth_mongo/etc/emqx_auth_mongo.conf @@ -5,7 +5,7 @@ ## MongoDB Topology Type. ## ## Value: single | unknown | sharded | rs -auth.mongo.type = +auth.mongo.type = single ## Whether to use SRV and TXT records. ## From f46084438b1994084137dd07a4322ebf20a1084e Mon Sep 17 00:00:00 2001 From: zhanghongtong Date: Thu, 28 Oct 2021 09:19:30 +0800 Subject: [PATCH 18/25] chore(cluster): add new type for dns auto cluster Signed-off-by: zhanghongtong --- etc/emqx.conf | 5 +++++ priv/emqx.schema | 8 +++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 7b81e40cc..78d383930 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -101,6 +101,11 @@ cluster.autoclean = 5m ## Value: String ## cluster.dns.app = emqx +## Type of dns record. +## +## Value: Value: a | srv +## cluster.dns.type = a + ##-------------------------------------------------------------------- ## Cluster using etcd diff --git a/priv/emqx.schema b/priv/emqx.schema index 602de56b9..f8df59da9 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -96,6 +96,11 @@ {datatype, string} ]}. +{mapping, "cluster.dns.type", "ekka.cluster_discovery", [ + {datatype, {enum, [a, srv]}}, + {default, a} +]}. + %%-------------------------------------------------------------------- %% Cluster using etcd @@ -171,7 +176,8 @@ {loop, cuttlefish:conf_get("cluster.mcast.loop", Conf, true)}]; (dns) -> [{name, cuttlefish:conf_get("cluster.dns.name", Conf)}, - {app, cuttlefish:conf_get("cluster.dns.app", Conf)}]; + {app, cuttlefish:conf_get("cluster.dns.app", Conf)}, + {type, cuttlefish:conf_get("cluster.dns.type", Conf)}]; (etcd) -> SslOpts = fun(Conf) -> Options = cuttlefish_variable:filter_by_prefix("cluster.etcd.ssl", Conf), From 87a2667e35ec6ee62b8918f4d66c04f4f2b2e938 Mon Sep 17 00:00:00 2001 From: lafirest Date: Mon, 15 Nov 2021 10:57:59 +0800 Subject: [PATCH 19/25] fix(emqx_retainer): fix timer message error (#6156) * fix(emqx_retainer): fix timer message error --- apps/emqx_retainer/src/emqx_retainer.app.src | 2 +- apps/emqx_retainer/src/emqx_retainer.erl | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/apps/emqx_retainer/src/emqx_retainer.app.src b/apps/emqx_retainer/src/emqx_retainer.app.src index 4ef423b78..a423fb9b7 100644 --- a/apps/emqx_retainer/src/emqx_retainer.app.src +++ b/apps/emqx_retainer/src/emqx_retainer.app.src @@ -1,6 +1,6 @@ {application, emqx_retainer, [{description, "EMQ X Retainer"}, - {vsn, "4.4.1"}, % strict semver, bump manually! + {vsn, "4.4.0"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_retainer_sup]}, {applications, [kernel,stdlib]}, diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index fcc5652cc..b6de9d9f6 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -156,7 +156,7 @@ start_expire_timer(0, State) -> start_expire_timer(undefined, State) -> State; start_expire_timer(Ms, State) -> - Timer = erlang:send_after(Ms, self(), stats), + Timer = erlang:send_after(Ms, self(), {expire, Ms}), State#state{expiry_timer = Timer}. handle_call(Req, _From, State) -> @@ -168,12 +168,14 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info(stats, State = #state{stats_fun = StatsFun}) -> + StatsTimer = erlang:send_after(timer:seconds(1), self(), stats), StatsFun(retained_count()), - {noreply, State, hibernate}; + {noreply, State#state{stats_timer = StatsTimer}, hibernate}; -handle_info(expire, State) -> +handle_info({expire, Ms} = Expire, State) -> + Timer = erlang:send_after(Ms, self(), Expire), ok = expire_messages(), - {noreply, State, hibernate}; + {noreply, State#state{expiry_timer = Timer}, hibernate}; handle_info(Info, State) -> ?LOG(error, "Unexpected info: ~p", [Info]), From 0357f7ad853db6b83b1c92978885dacd3cbbb2e4 Mon Sep 17 00:00:00 2001 From: lafirest Date: Mon, 15 Nov 2021 11:00:04 +0800 Subject: [PATCH 20/25] improve(emqx_st_statistics): optimize the parameters of on_publish_done (#6151) * fix(emqx_st_statistics): optimize the parameters of on_publish_done --- .../emqx_st_statistics/emqx_st_statistics.erl | 35 +++++++++++++------ src/emqx_session.erl | 9 +++-- 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/apps/emqx_plugin_libs/src/emqx_st_statistics/emqx_st_statistics.erl b/apps/emqx_plugin_libs/src/emqx_st_statistics/emqx_st_statistics.erl index 21cf2692e..f22aec41c 100644 --- a/apps/emqx_plugin_libs/src/emqx_st_statistics/emqx_st_statistics.erl +++ b/apps/emqx_plugin_libs/src/emqx_st_statistics/emqx_st_statistics.erl @@ -24,7 +24,7 @@ -logger_header("[SLOW TOPICS]"). --export([ start_link/1, on_publish_done/5, enable/0 +-export([ start_link/1, on_publish_done/3, enable/0 , disable/0, clear_history/0 ]). @@ -42,7 +42,7 @@ -type state() :: #{ config := proplist:proplist() , period := pos_integer() , last_tick_at := pos_integer() - , counter := counters:counter_ref() + , counter := counters:counters_ref() , enable := boolean() }. @@ -70,6 +70,13 @@ -type slow_log() :: #slow_log{}. -type top_k_map() :: #{emqx_types:topic() => top_k()}. +-type publish_done_env() :: #{ ignore_before_create := boolean() + , threshold := pos_integer() + , counter := counters:counters_ref() + }. + +-type publish_done_args() :: #{session_rebirth_time => pos_integer()}. + -ifdef(TEST). -define(TOPK_ACCESS, public). -else. @@ -90,13 +97,16 @@ start_link(Env) -> gen_server:start_link({local, ?MODULE}, ?MODULE, [Env], []). --spec on_publish_done(message(), - pos_integer(), boolean(), pos_integer(), counters:counters_ref()) -> ok. -on_publish_done(#message{timestamp = Timestamp}, Created, IgnoreBeforeCreate, _, _) +-spec on_publish_done(message(), publish_done_args(), publish_done_env()) -> ok. +on_publish_done(#message{timestamp = Timestamp}, + #{session_rebirth_time := Created}, + #{ignore_before_create := IgnoreBeforeCreate}) when IgnoreBeforeCreate, Timestamp < Created -> ok; -on_publish_done(#message{timestamp = Timestamp} = Msg, _, _, Threshold, Counter) -> +on_publish_done(#message{timestamp = Timestamp} = Msg, + _, + #{threshold := Threshold, counter := Counter}) -> case ?NOW - Timestamp of Elapsed when Elapsed > Threshold -> case get_log_quota(Counter) of @@ -202,7 +212,7 @@ init_topk_tab(_) -> , {read_concurrency, true} ]). --spec get_log_quota(counter:counter_ref()) -> boolean(). +-spec get_log_quota(counters:counters_ref()) -> boolean(). get_log_quota(Counter) -> case counters:get(Counter, ?QUOTA_IDX) of Quota when Quota > 0 -> @@ -212,7 +222,7 @@ get_log_quota(Counter) -> false end. --spec set_log_quota(proplists:proplist(), counter:counter_ref()) -> ok. +-spec set_log_quota(proplists:proplist(), counters:counters_ref()) -> ok. set_log_quota(Cfg, Counter) -> MaxLogNum = get_value(max_log_num, Cfg), counters:put(Counter, ?QUOTA_IDX, MaxLogNum). @@ -328,12 +338,15 @@ publish(TickTime, Cfg, Notices) -> load(IgnoreBeforeCreate, Threshold, Counter) -> _ = emqx:hook('message.publish_done', - fun ?MODULE:on_publish_done/5, - [IgnoreBeforeCreate, Threshold, Counter]), + fun ?MODULE:on_publish_done/3, + [#{ignore_before_create => IgnoreBeforeCreate, + threshold => Threshold, + counter => Counter} + ]), ok. unload() -> - emqx:unhook('message.publish_done', fun ?MODULE:on_publish_done/5). + emqx:unhook('message.publish_done', fun ?MODULE:on_publish_done/3). -spec get_topic(proplists:proplist()) -> binary(). get_topic(Cfg) -> diff --git a/src/emqx_session.erl b/src/emqx_session.erl index d4b671e71..6122982ae 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -320,7 +320,8 @@ is_awaiting_full(#session{awaiting_rel = AwaitingRel, puback(PacketId, Session = #session{inflight = Inflight, created_at = CreatedAt}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {Msg, _Ts}} when is_record(Msg, message) -> - emqx:run_hook('message.publish_done', [Msg, CreatedAt]), + emqx:run_hook('message.publish_done', + [Msg, #{session_rebirth_time => CreatedAt}]), Inflight1 = emqx_inflight:delete(PacketId, Inflight), return_with(Msg, dequeue(Session#session{inflight = Inflight1})); {value, {_Pubrel, _Ts}} -> @@ -346,7 +347,8 @@ pubrec(PacketId, Session = #session{inflight = Inflight, created_at = CreatedAt} case emqx_inflight:lookup(PacketId, Inflight) of {value, {Msg, _Ts}} when is_record(Msg, message) -> %% execute hook here, because message record will be replaced by pubrel - emqx:run_hook('message.publish_done', [Msg, CreatedAt]), + emqx:run_hook('message.publish_done', + [Msg, #{session_rebirth_time => CreatedAt}]), Inflight1 = emqx_inflight:update(PacketId, with_ts(pubrel), Inflight), {ok, Msg, Session#session{inflight = Inflight1}}; {value, {pubrel, _Ts}} -> @@ -443,7 +445,8 @@ deliver([Msg | More], Acc, Session) -> end. deliver_msg(Msg = #message{qos = ?QOS_0}, Session) -> - emqx:run_hook('message.publish_done', [Msg, Session#session.created_at]), + emqx:run_hook('message.publish_done', + [Msg, #{session_rebirth_time => Session#session.created_at}]), {ok, [{undefined, maybe_ack(Msg)}], Session}; deliver_msg(Msg = #message{qos = QoS}, Session = From 6bd5fd218ec421b144f7c75211f46e12ca4b1da2 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Mon, 15 Nov 2021 11:02:45 +0800 Subject: [PATCH 21/25] chore: limit/page to position/bytes (#6161) --- .../src/emqx_trace/emqx_trace_api.erl | 25 ++++++++++++------- .../test/emqx_mod_trace_api_SUITE.erl | 8 +++--- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace_api.erl b/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace_api.erl index a5e5b2906..3af272776 100644 --- a/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace_api.erl +++ b/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace_api.erl @@ -16,6 +16,7 @@ -module(emqx_trace_api). -include_lib("emqx/include/logger.hrl"). +-include_lib("kernel/include/file.hrl"). %% API -export([ list_trace/2 @@ -75,7 +76,7 @@ download_zip_log(#{name := Name}, _Param) -> ZipDir = emqx_trace:zip_dir(), Zips = group_trace_file(ZipDir, TraceLog, TraceFiles), ZipFileName = ZipDir ++ TraceLog, - {ok, ZipFile} = zip:zip(ZipFileName, Zips), + {ok, ZipFile} = zip:zip(ZipFileName, Zips, [{cwd, ZipDir}]), emqx_trace:delete_files_after_send(ZipFileName, Zips), {ok, #{}, {sendfile, 0, filelib:file_size(ZipFile), ZipFile}}; {error, Reason} -> @@ -88,7 +89,7 @@ group_trace_file(ZipDir, TraceLog, TraceFiles) -> {ok, Node, Bin} -> ZipName = ZipDir ++ Node ++ "-" ++ TraceLog, ok = file:write_file(ZipName, Bin), - [ZipName | Acc]; + [Node ++ "-" ++ TraceLog | Acc]; {error, Node, Reason} -> ?LOG(error, "download trace log error:~p", [{Node, TraceLog, Reason}]), Acc @@ -101,20 +102,19 @@ collect_trace_file(TraceLog) -> BadNodes =/= [] andalso ?LOG(error, "download log rpc failed on ~p", [BadNodes]), Files. -%% _page as position and _limit as bytes for front-end reusing components stream_log_file(#{name := Name}, Params) -> Node0 = proplists:get_value(<<"node">>, Params, atom_to_binary(node())), - Position0 = proplists:get_value(<<"_page">>, Params, <<"0">>), - Bytes0 = proplists:get_value(<<"_limit">>, Params, <<"500">>), + Position0 = proplists:get_value(<<"position">>, Params, <<"0">>), + Bytes0 = proplists:get_value(<<"bytes">>, Params, <<"1000">>), Node = binary_to_existing_atom(Node0), Position = binary_to_integer(Position0), Bytes = binary_to_integer(Bytes0), case rpc:call(Node, ?MODULE, read_trace_file, [Name, Position, Bytes]) of {ok, Bin} -> - Meta = #{<<"page">> => Position + byte_size(Bin), <<"limit">> => Bytes}, + Meta = #{<<"position">> => Position + byte_size(Bin), <<"bytes">> => Bytes}, {ok, #{meta => Meta, items => Bin}}; - eof -> - Meta = #{<<"page">> => Position, <<"limit">> => Bytes}, + {eof, Size} -> + Meta = #{<<"position">> => Size, <<"bytes">> => Bytes}, {ok, #{meta => Meta, items => <<"">>}}; {error, Reason} -> logger:log(error, "read_file_failed by ~p", [{Name, Reason, Position, Bytes}]), @@ -134,6 +134,7 @@ read_trace_file(Name, Position, Limit) -> [] -> {error, not_found} end. +-dialyzer({nowarn_function, read_file/3}). read_file(Path, Offset, Bytes) -> {ok, IoDevice} = file:open(Path, [read, raw, binary]), try @@ -141,7 +142,13 @@ read_file(Path, Offset, Bytes) -> 0 -> ok; _ -> file:position(IoDevice, {bof, Offset}) end, - file:read(IoDevice, Bytes) + case file:read(IoDevice, Bytes) of + {ok, Bin} -> {ok, Bin}; + {error, Reason} -> {error, Reason}; + eof -> + #file_info{size = Size} = file:read_file_info(IoDevice), + {eof, Size} + end after file:close(IoDevice) end. diff --git a/lib-ce/emqx_modules/test/emqx_mod_trace_api_SUITE.erl b/lib-ce/emqx_modules/test/emqx_mod_trace_api_SUITE.erl index 609a2d93c..fc786dbd0 100644 --- a/lib-ce/emqx_modules/test/emqx_mod_trace_api_SUITE.erl +++ b/lib-ce/emqx_modules/test/emqx_mod_trace_api_SUITE.erl @@ -124,14 +124,14 @@ t_stream_log(_Config) -> {ok, FileBin} = file:read_file(File), ct:pal("FileBin: ~p ~s", [byte_size(FileBin), FileBin]), Header = auth_header_(), - {ok, Binary} = request_api(get, api_path("trace/test_stream_log/log?_limit=10"), Header), + {ok, Binary} = request_api(get, api_path("trace/test_stream_log/log?bytes=10"), Header), #{<<"code">> := 0, <<"data">> := #{<<"meta">> := Meta, <<"items">> := Bin}} = json(Binary), ?assertEqual(10, byte_size(Bin)), - ?assertEqual(#{<<"page">> => 10, <<"limit">> => 10}, Meta), - Path = api_path("trace/test_stream_log/log?_page=20&_limit=10"), + ?assertEqual(#{<<"position">> => 10, <<"bytes">> => 10}, Meta), + Path = api_path("trace/test_stream_log/log?position=20&bytes=10"), {ok, Binary1} = request_api(get, Path, Header), #{<<"code">> := 0, <<"data">> := #{<<"meta">> := Meta1, <<"items">> := Bin1}} = json(Binary1), - ?assertEqual(#{<<"page">> => 30, <<"limit">> => 10}, Meta1), + ?assertEqual(#{<<"position">> => 30, <<"bytes">> => 10}, Meta1), ?assertEqual(10, byte_size(Bin1)), unload(), ok. From 23e2bd62c59a65d4f38ae661cb3e166c49839692 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Mon, 15 Nov 2021 15:53:49 +0800 Subject: [PATCH 22/25] fix: there should not be multiple layers of directories when download trace zip file (#6165) --- apps/emqx_plugin_libs/src/emqx_trace/emqx_trace_api.erl | 4 ++-- apps/emqx_plugin_libs/test/emqx_trace_SUITE.erl | 5 ++--- lib-ce/emqx_modules/src/emqx_mod_trace_api.erl | 2 +- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace_api.erl b/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace_api.erl index 3af272776..e6c87d69f 100644 --- a/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace_api.erl +++ b/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace_api.erl @@ -75,10 +75,10 @@ download_zip_log(#{name := Name}, _Param) -> TraceFiles = collect_trace_file(TraceLog), ZipDir = emqx_trace:zip_dir(), Zips = group_trace_file(ZipDir, TraceLog, TraceFiles), - ZipFileName = ZipDir ++ TraceLog, + ZipFileName = ZipDir ++ binary_to_list(Name) ++ ".zip", {ok, ZipFile} = zip:zip(ZipFileName, Zips, [{cwd, ZipDir}]), emqx_trace:delete_files_after_send(ZipFileName, Zips), - {ok, #{}, {sendfile, 0, filelib:file_size(ZipFile), ZipFile}}; + {ok, ZipFile}; {error, Reason} -> {error, Reason} end. diff --git a/apps/emqx_plugin_libs/test/emqx_trace_SUITE.erl b/apps/emqx_plugin_libs/test/emqx_trace_SUITE.erl index 1bd28d888..169fd50bc 100644 --- a/apps/emqx_plugin_libs/test/emqx_trace_SUITE.erl +++ b/apps/emqx_plugin_libs/test/emqx_trace_SUITE.erl @@ -336,9 +336,8 @@ t_download_log(_Config) -> {ok, _} = emqtt:connect(Client), [begin _ = emqtt:ping(Client) end ||_ <- lists:seq(1, 5)], ct:sleep(100), - {ok, #{}, {sendfile, 0, ZipFileSize, _ZipFile}} = - emqx_trace_api:download_zip_log(#{name => Name}, []), - ?assert(ZipFileSize > 0), + {ok, ZipFile} = emqx_trace_api:download_zip_log(#{name => Name}, []), + ?assert(filelib:file_size(ZipFile) > 0), ok = emqtt:disconnect(Client), unload(), ok. diff --git a/lib-ce/emqx_modules/src/emqx_mod_trace_api.erl b/lib-ce/emqx_modules/src/emqx_mod_trace_api.erl index a3abbedf7..99fb97796 100644 --- a/lib-ce/emqx_modules/src/emqx_mod_trace_api.erl +++ b/lib-ce/emqx_modules/src/emqx_mod_trace_api.erl @@ -87,7 +87,7 @@ update_trace(Path, Params) -> download_zip_log(Path, Params) -> case emqx_trace_api:download_zip_log(Path, Params) of - {ok, _Header, _File}= Return -> Return; + {ok, File} -> minirest:return_file(File); {error, _Reason} = Err -> return(Err) end. From 61c68ddb35a269821ae30d2daf259583691d7766 Mon Sep 17 00:00:00 2001 From: k32 <10274441+k32@users.noreply.github.com> Date: Mon, 15 Nov 2021 10:27:08 +0100 Subject: [PATCH 23/25] feat(rpc): Bump gen_rpc version --- priv/emqx.schema | 24 ++++++++++++++++++++++++ rebar.config | 2 +- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/priv/emqx.schema b/priv/emqx.schema index 602de56b9..b4f008a6c 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -362,11 +362,35 @@ end}. ]}. %% RPC server port. +{mapping, "rpc.driver", "gen_rpc.driver", +[ {default, tcp} +, {datatype, {enum, [tcp, ssl]}} +]}. + {mapping, "rpc.tcp_server_port", "gen_rpc.tcp_server_port", [ {default, 5369}, {datatype, integer} ]}. +%% RPC SSL server port. +{mapping, "rpc.enable_ssl", "gen_rpc.ssl_server_port", [ + {default, 5369}, + {datatype, integer} +]}. + +%% RPC SSL certificates +{mapping, "rpc.certfile", "gen_rpc.certfile", [ + {datatype, string} +]}. + +{mapping, "rpc.keyfile", "gen_rpc.keyfile", [ + {datatype, string} +]}. + +{mapping, "rpc.cacertfile", "gen_rpc.cacertfile", [ + {datatype, string} +]}. + %% Number of tcp connections when connecting to RPC server {mapping, "rpc.tcp_client_num", "gen_rpc.tcp_client_num", [ {default, 0}, diff --git a/rebar.config b/rebar.config index cc147dec3..52f07a12a 100644 --- a/rebar.config +++ b/rebar.config @@ -44,7 +44,7 @@ , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.0"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.8.2"}}} - , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.6.0"}}} + , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.7.0"}}} , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.3.6"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.7"}}} , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}} From 96d2615cc84137368902b0ea09db21e71c0cb2b5 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Mon, 15 Nov 2021 17:41:22 +0800 Subject: [PATCH 24/25] fix: return error code when trace log not foundd (#6168) --- lib-ce/emqx_modules/src/emqx_mod_trace_api.erl | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lib-ce/emqx_modules/src/emqx_mod_trace_api.erl b/lib-ce/emqx_modules/src/emqx_mod_trace_api.erl index 99fb97796..1daab1520 100644 --- a/lib-ce/emqx_modules/src/emqx_mod_trace_api.erl +++ b/lib-ce/emqx_modules/src/emqx_mod_trace_api.erl @@ -88,8 +88,11 @@ update_trace(Path, Params) -> download_zip_log(Path, Params) -> case emqx_trace_api:download_zip_log(Path, Params) of {ok, File} -> minirest:return_file(File); - {error, _Reason} = Err -> return(Err) + {error, Reason} -> return({error, 'NOT_FOUND', Reason}) end. stream_log_file(Path, Params) -> - return(emqx_trace_api:stream_log_file(Path, Params)). + case emqx_trace_api:stream_log_file(Path, Params) of + {ok, File} -> return({ok, File}); + {error, Reason} -> return({error, 'NOT_FOUND', Reason}) + end. From ca1458d4d7e1040d6fcc7597d5d68b36a316e652 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Mon, 15 Nov 2021 15:34:48 +0100 Subject: [PATCH 25/25] chore(emqx_rule_engine): bump app vsn to 4.4.0 --- apps/emqx_rule_engine/src/emqx_rule_engine.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index 66af0da21..98e5487e2 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -1,6 +1,6 @@ {application, emqx_rule_engine, [{description, "EMQ X Rule Engine"}, - {vsn, "4.3.6"}, % strict semver, bump manually! + {vsn, "4.4.0"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_rule_engine_sup, emqx_rule_registry]}, {applications, [kernel,stdlib,rulesql,getopt]},