From fefadbcd177b0a3f759c721580255050e36a2b38 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 9 Nov 2021 22:05:09 +0800 Subject: [PATCH 01/16] fix(emqx_stomp): fix hot-upgrade stopping listener failed When the upgrade is executed, all envs of plugins are cleared, which causes the listener of stomp to stop failing. This is only a temporary modification to ensure that the upgrade can be executed successfully. following fixes: https://github.com/emqx/emqx/pull/6105 --- apps/emqx_stomp/src/emqx_stomp.appup.src | 3 ++- apps/emqx_stomp/src/emqx_stomp.erl | 14 ++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/apps/emqx_stomp/src/emqx_stomp.appup.src b/apps/emqx_stomp/src/emqx_stomp.appup.src index bf4603e52..5ce1f8bd3 100644 --- a/apps/emqx_stomp/src/emqx_stomp.appup.src +++ b/apps/emqx_stomp/src/emqx_stomp.appup.src @@ -2,7 +2,8 @@ {VSN, [{"4.3.1",[{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]}, {"4.3.0", - [{restart_application,emqx_stomp}]}, + [{restart_application,emqx_stomp}, + {apply,{emqx_stomp,force_clear_after_app_stoped,[]}}]}, {<<".*">>,[]}], [{"4.3.1",[{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]}, {"4.3.0", diff --git a/apps/emqx_stomp/src/emqx_stomp.erl b/apps/emqx_stomp/src/emqx_stomp.erl index 9eafe3cf7..b8a66009c 100644 --- a/apps/emqx_stomp/src/emqx_stomp.erl +++ b/apps/emqx_stomp/src/emqx_stomp.erl @@ -33,6 +33,8 @@ , stop_listener/3 ]). +-export([force_clear_after_app_stoped/0]). + -export([init/1]). -define(APP, ?MODULE). @@ -52,6 +54,18 @@ start(_StartType, _StartArgs) -> stop(_State) -> stop_listeners(). +force_clear_after_app_stoped() -> + lists:foreach(fun({Name = {ProtoName, _}, _}) -> + case is_stomp_listener(ProtoName) of + true -> esockd:close(Name); + _ -> ok + end + end, esockd:listeners()). + +is_stomp_listener('stomp:tcp') -> true; +is_stomp_listener('stomp:ssl') -> true; +is_stomp_listener(_) -> false. + %%-------------------------------------------------------------------- %% Supervisor callbacks %%-------------------------------------------------------------------- From 133609a0400131f94b47f06bfe55e40d40041099 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 9 Nov 2021 20:51:07 +0800 Subject: [PATCH 02/16] fix(relup): configs for plugins are missing after relup --- bin/install_upgrade.escript | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/bin/install_upgrade.escript b/bin/install_upgrade.escript index 0658bdef2..97548cba8 100755 --- a/bin/install_upgrade.escript +++ b/bin/install_upgrade.escript @@ -248,15 +248,20 @@ parse_version(V) when is_list(V) -> hd(string:tokens(V,"/")). check_and_install(TargetNode, Vsn) -> - {ok, [[CurrAppConf]]} = rpc:call(TargetNode, init, get_argument, [config], ?TIMEOUT), + %% Backup the vm.args. VM args should be unchanged during hot upgrade + %% but we still backup it here {ok, [[CurrVmArgs]]} = rpc:call(TargetNode, init, get_argument, [vm_args], ?TIMEOUT), - case filename:extension(CurrAppConf) of - ".config" -> - {ok, _} = file:copy(CurrAppConf, filename:join(["releases", Vsn, "sys.config"])); - _ -> - {ok, _} = file:copy(CurrAppConf++".config", filename:join(["releases", Vsn, "sys.config"])) - end, {ok, _} = file:copy(CurrVmArgs, filename:join(["releases", Vsn, "vm.args"])), + %% Backup the sys.config, this will be used when we check and install release + %% NOTE: We cannot backup the old sys.config directly, because the + %% configs for plugins are only in app-envs, not in the old sys.config + Configs0 = + [{AppName, rpc:call(TargetNode, application, get_all_env, [AppName], ?TIMEOUT)} + || {AppName, _, _} <- rpc:call(TargetNode, application, which_applications, [], ?TIMEOUT)], + Configs1 = [{AppName, Conf} || {AppName, Conf} <- Configs0, Conf =/= []], + ok = file:write_file(filename:join(["releases", Vsn, "sys.config"]), io_lib:format("~p.", [Configs1])), + + %% check and install release case rpc:call(TargetNode, release_handler, check_install_release, [Vsn], ?TIMEOUT) of {ok, _OtherVsn, _Desc} -> From fa34d8353e521566caeb8f9aea383a6495365d78 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 10 Nov 2021 15:40:46 +0800 Subject: [PATCH 03/16] fix(test): flaky mqtt expiry test case. (#6112) --- test/emqx_mqtt_SUITE.erl | 112 +++++++++++++++++++++++++-------------- 1 file changed, 73 insertions(+), 39 deletions(-) diff --git a/test/emqx_mqtt_SUITE.erl b/test/emqx_mqtt_SUITE.erl index ae7e7382c..3a77134b8 100644 --- a/test/emqx_mqtt_SUITE.erl +++ b/test/emqx_mqtt_SUITE.erl @@ -62,79 +62,104 @@ t_conn_stats(_) -> t_tcp_sock_passive(_) -> with_client(fun(CPid) -> CPid ! {tcp_passive, sock} end, []). -t_message_expiry_interval_1(_) -> - ClientA = message_expiry_interval_init(), - [message_expiry_interval_exipred(ClientA, QoS) || QoS <- [0,1,2]], - emqtt:stop(ClientA). +t_message_expiry_interval(_) -> + {CPublish, CControl} = message_expiry_interval_init(), + [message_expiry_interval_exipred(CPublish, CControl, QoS) || QoS <- [0,1,2]], + emqtt:stop(CPublish), + emqtt:stop(CControl). -t_message_expiry_interval_2(_) -> - ClientA = message_expiry_interval_init(), - [message_expiry_interval_not_exipred(ClientA, QoS) || QoS <- [0,1,2]], - emqtt:stop(ClientA). +t_message_not_expiry_interval(_) -> + {CPublish, CControl} = message_expiry_interval_init(), + [message_expiry_interval_not_exipred(CPublish, CControl, QoS) || QoS <- [0,1,2]], + emqtt:stop(CPublish), + emqtt:stop(CControl). message_expiry_interval_init() -> - {ok, ClientA} = emqtt:start_link([{proto_ver,v5}, - {clientid, <<"client-a">>}, + {ok, CPublish} = emqtt:start_link([{proto_ver,v5}, + {clientid, <<"Client-Publish">>}, {clean_start, false}, {properties, #{'Session-Expiry-Interval' => 360}}]), - {ok, ClientB} = emqtt:start_link([{proto_ver,v5}, - {clientid, <<"client-b">>}, + {ok, CVerify} = emqtt:start_link([{proto_ver,v5}, + {clientid, <<"Client-Verify">>}, {clean_start, false}, {properties, #{'Session-Expiry-Interval' => 360}}]), - {ok, _} = emqtt:connect(ClientA), - {ok, _} = emqtt:connect(ClientB), - %% subscribe and disconnect client-b - emqtt:subscribe(ClientB, <<"t/a">>, 1), - emqtt:stop(ClientB), - ClientA. + {ok, CControl} = emqtt:start_link([{proto_ver,v5}, + {clientid, <<"Client-Control">>}, + {clean_start, false}, + {properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, _} = emqtt:connect(CPublish), + {ok, _} = emqtt:connect(CVerify), + {ok, _} = emqtt:connect(CControl), + %% subscribe and disconnect Client-verify + emqtt:subscribe(CControl, <<"t/a">>, 1), + emqtt:subscribe(CVerify, <<"t/a">>, 1), + emqtt:stop(CVerify), + {CPublish, CControl}. -message_expiry_interval_exipred(ClientA, QoS) -> +message_expiry_interval_exipred(CPublish, CControl, QoS) -> ct:pal("~p ~p", [?FUNCTION_NAME, QoS]), %% publish to t/a and waiting for the message expired - emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, <<"this will be purged in 1s">>, [{qos, QoS}]), - ct:sleep(2000), + emqtt:publish(CPublish, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, + <<"this will be purged in 1s">>, [{qos, QoS}]), + %% CControl make sure publish already store in broker. + receive + {publish,#{client_pid := CControl, topic := <<"t/a">>}} -> + ok + after 1000 -> + ct:fail(should_receive_publish) + end, + ct:sleep(1100), - %% resume the session for client-b - {ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, - {clientid, <<"client-b">>}, + %% resume the session for Client-Verify + {ok, CVerify} = emqtt:start_link([{proto_ver,v5}, + {clientid, <<"Client-Verify">>}, {clean_start, false}, {properties, #{'Session-Expiry-Interval' => 360}}]), - {ok, _} = emqtt:connect(ClientB1), + {ok, _} = emqtt:connect(CVerify), - %% verify client-b could not receive the publish message + %% verify Client-Verify could not receive the publish message receive - {publish,#{client_pid := ClientB1, topic := <<"t/a">>}} -> + {publish,#{client_pid := CVerify, topic := <<"t/a">>}} -> ct:fail(should_have_expired) after 300 -> ok end, - emqtt:stop(ClientB1). + emqtt:stop(CVerify). -message_expiry_interval_not_exipred(ClientA, QoS) -> +message_expiry_interval_not_exipred(CPublish, CControl, QoS) -> ct:pal("~p ~p", [?FUNCTION_NAME, QoS]), %% publish to t/a - emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, <<"this will be purged in 1s">>, [{qos, QoS}]), + emqtt:publish(CPublish, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, + <<"this will be purged in 20s">>, [{qos, QoS}]), - %% wait for 1s and then resume the session for client-b, the message should not expires + %% CControl make sure publish already store in broker. + receive + {publish,#{client_pid := CControl, topic := <<"t/a">>}} -> + ok + after 1000 -> + ct:fail(should_receive_publish) + end, + + %% wait for 1.2s and then resume the session for Client-Verify, the message should not expires %% as Message-Expiry-Interval = 20s - ct:sleep(1000), - {ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, - {clientid, <<"client-b">>}, + ct:sleep(1200), + {ok, CVerify} = emqtt:start_link([{proto_ver,v5}, + {clientid, <<"Client-Verify">>}, {clean_start, false}, {properties, #{'Session-Expiry-Interval' => 360}}]), - {ok, _} = emqtt:connect(ClientB1), + {ok, _} = emqtt:connect(CVerify), - %% verify client-b could receive the publish message and the Message-Expiry-Interval is set + %% verify Client-Verify could receive the publish message and the Message-Expiry-Interval is set receive - {publish,#{client_pid := ClientB1, topic := <<"t/a">>, + {publish,#{client_pid := CVerify, topic := <<"t/a">>, properties := #{'Message-Expiry-Interval' := MsgExpItvl}}} - when MsgExpItvl < 20 -> ok; + when MsgExpItvl =< 20 -> ok; {publish, _} = Msg -> ct:fail({incorrect_publish, Msg}) after 300 -> ct:fail(no_publish_received) end, - emqtt:stop(ClientB1). + emqtt:stop(CVerify). with_client(TestFun, _Options) -> ClientId = <<"t_conn">>, @@ -156,6 +181,15 @@ t_async_set_keepalive('end', _Config) -> ok. t_async_set_keepalive(_) -> + case os:type() of + {unix, darwin} -> + %% Mac OSX don't support the feature + ok; + _ -> + do_async_set_keepalive() + end. + +do_async_set_keepalive() -> ClientID = <<"client-tcp-keepalive">>, {ok, Client} = emqtt:start_link([{host, "localhost"}, {proto_ver,v5}, From 98136ff119fe60420657ff7a56112b03edb545a5 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Wed, 10 Nov 2021 14:41:37 +0100 Subject: [PATCH 04/16] chore(ehttpc): pin 0.1.12 --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index 4994d54ae..77df3dd6d 100644 --- a/rebar.config +++ b/rebar.config @@ -37,7 +37,7 @@ {deps, [ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps - , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.10"}}} + , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.12"}}} , {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.5"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} From 8f07f26744d1ee1fa546dcadcc9d6d7ba13366d8 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 11 Nov 2021 11:28:17 +0800 Subject: [PATCH 05/16] fix: ensure starting listeners before plugins --- src/emqx_app.erl | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/emqx_app.erl b/src/emqx_app.erl index 8d0ff11a8..be6f45e25 100644 --- a/src/emqx_app.erl +++ b/src/emqx_app.erl @@ -42,10 +42,13 @@ start(_Type, _Args) -> ekka:start(), {ok, Sup} = emqx_sup:start_link(), ok = start_autocluster(), + %% We need to make sure that emqx's listeners start before plugins + %% and modules. Since if the emqx-conf module/plugin is enabled, it will + %% try to start or update the listeners with the latest configuration + emqx_boot:is_enabled(listeners) andalso (ok = emqx_listeners:start()), ok = emqx_plugins:init(), _ = emqx_plugins:load(), _ = start_ce_modules(), - emqx_boot:is_enabled(listeners) andalso (ok = emqx_listeners:start()), register(emqx, self()), ok = emqx_alarm_handler:load(), print_vsn(), From 86b8d881656855b58bf012f35be93ab193ccd926 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 11 Nov 2021 11:55:57 +0800 Subject: [PATCH 06/16] chore(emqx): update appup.src --- src/emqx.appup.src | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/emqx.appup.src b/src/emqx.appup.src index b826775d6..d40db1093 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -1,6 +1,8 @@ %% -*- mode: erlang -*- {VSN, - [{"4.3.10",[{load_module,emqx_connection,brutal_purge,soft_purge,[]}]}, + [{"4.3.10", + [{load_module,emqx_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_connection,brutal_purge,soft_purge,[]}]}, {"4.3.9", [{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, @@ -155,7 +157,9 @@ {load_module,emqx_rpc,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{"4.3.10",[{load_module,emqx_connection,brutal_purge,soft_purge,[]}]}, + [{"4.3.10", + [{load_module,emqx_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_connection,brutal_purge,soft_purge,[]}]}, {"4.3.9", [{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, From 74b6b5214a9c44683ae421f783143df50b2a2ef9 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Thu, 11 Nov 2021 10:33:31 +0100 Subject: [PATCH 07/16] test(emqx_cm_SUITE): add a gen_server call sync --- test/emqx_cm_SUITE.erl | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/test/emqx_cm_SUITE.erl b/test/emqx_cm_SUITE.erl index acafeb36f..a46c046ca 100644 --- a/test/emqx_cm_SUITE.erl +++ b/test/emqx_cm_SUITE.erl @@ -183,7 +183,8 @@ t_open_session_race_condition(_) -> exit(Winner, kill), receive {'DOWN', _, process, Winner, _} -> ok end, - ignored = gen_server:call(emqx_cm, ignore, infinity), %% sync + ignored = gen_server:call(?CM, ignore, infinity), %% sync + ok = flush_emqx_pool(), ?assertEqual([], emqx_cm:lookup_channels(ClientId)). t_kick_session_discard_normal(_) -> @@ -260,6 +261,7 @@ test_kick_session(Action, Reason) -> ?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid1, R}, 2_000, R)), ?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid2, R}, 2_000, R)) end, + ignored = gen_server:call(?CM, ignore, infinity), %% sync ok = flush_emqx_pool(), ?assertEqual([], emqx_cm:lookup_channels(ClientId)). @@ -271,10 +273,11 @@ test_kick_session(Action, Reason) -> %% The number of tasks should be large enough to ensure all workers have %% the chance to work on at least one of the tasks. flush_emqx_pool() -> + Ref = make_ref(), Self = self(), L = lists:seq(1, 1000), - lists:foreach(fun(I) -> emqx_pool:async_submit(fun() -> Self ! {done, I} end, []) end, L), - lists:foreach(fun(I) -> receive {done, I} -> ok end end, L). + lists:foreach(fun(I) -> emqx_pool:async_submit(fun() -> Self ! {done, I, Ref} end, []) end, L), + lists:foreach(fun(I) -> receive {done, I, Ref} -> ok end end, L). t_discard_session_race(_) -> ClientId = rand_client_id(), From 21898e1dafd1e5ebf2c93c438be061622a66f894 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Thu, 11 Nov 2021 10:34:29 +0100 Subject: [PATCH 08/16] chore(emqx_dashboard): bump version after v4.3.10 release --- lib-ce/emqx_dashboard/src/emqx_dashboard.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib-ce/emqx_dashboard/src/emqx_dashboard.app.src b/lib-ce/emqx_dashboard/src/emqx_dashboard.app.src index 724a76237..e78f1c3f6 100644 --- a/lib-ce/emqx_dashboard/src/emqx_dashboard.app.src +++ b/lib-ce/emqx_dashboard/src/emqx_dashboard.app.src @@ -1,6 +1,6 @@ {application, emqx_dashboard, [{description, "EMQ X Web Dashboard"}, - {vsn, "4.3.6"}, % strict semver, bump manually! + {vsn, "4.3.7"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_dashboard_sup]}, {applications, [kernel,stdlib,mnesia,minirest]}, From 04a4462f1e685e2616b4beb0d89d5f57735cd619 Mon Sep 17 00:00:00 2001 From: zhouzb Date: Fri, 12 Nov 2021 10:36:27 +0800 Subject: [PATCH 09/16] chore(release): update version to 4.3.10 --- include/emqx_release.hrl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/emqx_release.hrl b/include/emqx_release.hrl index c89dde010..e9f954b3c 100644 --- a/include/emqx_release.hrl +++ b/include/emqx_release.hrl @@ -29,7 +29,7 @@ -ifndef(EMQX_ENTERPRISE). --define(EMQX_RELEASE, {opensource, "4.3.9"}). +-define(EMQX_RELEASE, {opensource, "4.3.10"}). -else. From 4c29c3a5e5a9051aac70584f806dcfe20e87a1ab Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 10 Nov 2021 16:24:42 +0800 Subject: [PATCH 10/16] chore: fill message headers --- apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl | 18 +++++++--- .../emqx_exproto/src/emqx_exproto_channel.erl | 29 ++++++++++++---- apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl | 18 ++++++++-- apps/emqx_stomp/src/emqx_stomp_protocol.erl | 34 +++++++++++++------ 4 files changed, 76 insertions(+), 23 deletions(-) diff --git a/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl b/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl index d465f9ca3..89fb411b2 100644 --- a/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl +++ b/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl @@ -244,15 +244,26 @@ chann_publish(Topic, Payload, State = #state{clientid = ClientId}) -> case emqx_access_control:check_acl(clientinfo(State), publish, Topic) of allow -> _ = emqx_broker:publish( - emqx_message:set_flag(retain, false, - emqx_message:make(ClientId, ?QOS_0, Topic, Payload))), - ok; + packet_to_message(Topic, Payload, State)), ok; deny -> ?LOG(warning, "publish to ~p by clientid ~p failed due to acl check.", [Topic, ClientId]), {error, forbidden} end. +packet_to_message(Topic, Payload, + #state{clientid = ClientId, + username = Username, + peername = {PeerHost, _}}) -> + Message = emqx_message:set_flag( + retain, false, + emqx_message:make(ClientId, ?QOS_0, Topic, Payload) + ), + emqx_message:set_headers( + #{ proto_ver => 1 + , protocol => coap + , username => Username + , peerhost => PeerHost}, Message). %%-------------------------------------------------------------------- %% Deliver @@ -384,4 +395,3 @@ clientinfo(#state{peername = {PeerHost, _}, mountpoint => undefined, ws_cookie => undefined }. - diff --git a/apps/emqx_exproto/src/emqx_exproto_channel.erl b/apps/emqx_exproto/src/emqx_exproto_channel.erl index 67f85f932..d8ceae4bd 100644 --- a/apps/emqx_exproto/src/emqx_exproto_channel.erl +++ b/apps/emqx_exproto/src/emqx_exproto_channel.erl @@ -340,17 +340,14 @@ handle_call({unsubscribe, TopicFilter}, handle_call({publish, Topic, Qos, Payload}, Channel = #channel{ conn_state = connected, - clientinfo = ClientInfo - = #{clientid := From, - mountpoint := Mountpoint}}) -> + clientinfo = ClientInfo}) -> case is_acl_enabled(ClientInfo) andalso emqx_access_control:check_acl(ClientInfo, publish, Topic) of deny -> {reply, {error, ?RESP_PERMISSION_DENY, <<"ACL deny">>}, Channel}; _ -> - Msg = emqx_message:make(From, Qos, Topic, Payload), - NMsg = emqx_mountpoint:mount(Mountpoint, Msg), - _ = emqx:publish(NMsg), + Msg = packet_to_message(Topic, Qos, Payload, Channel), + _ = emqx:publish(Msg), {reply, ok, Channel} end; @@ -419,6 +416,24 @@ is_anonymous(_AuthResult) -> false. clean_anonymous_clients() -> ets:delete(?CHAN_CONN_TAB, ?CHANMOCK(self())). +packet_to_message(Topic, Qos, Payload, + #channel{ + conninfo = #{proto_ver := ProtoVer}, + clientinfo = #{ + protocol := Protocol, + clientid := ClientId, + username := Username, + peerhost := PeerHost, + mountpoint := Mountpoint}}) -> + Msg = emqx_message:make( + ClientId, Qos, + Topic, Payload, #{}, + #{proto_ver => ProtoVer, + protocol => Protocol, + username => Username, + peerhost => PeerHost}), + emqx_mountpoint:mount(Mountpoint, Msg). + %%-------------------------------------------------------------------- %% Sub/UnSub %%-------------------------------------------------------------------- @@ -591,6 +606,8 @@ default_conninfo(ConnInfo) -> ConnInfo#{clean_start => true, clientid => undefined, username => undefined, + proto_name => undefined, + proto_ver => undefined, conn_props => #{}, connected => true, connected_at => erlang:system_time(millisecond), diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl b/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl index 34c72dcca..4a5fafdb0 100644 --- a/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl +++ b/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl @@ -235,8 +235,20 @@ unsubscribe(Topic, Lwm2mState = #lwm2m_state{endpoint_name = _EndpointName}) -> emqx_broker:unsubscribe(Topic), emqx_hooks:run('session.unsubscribed', [clientinfo(Lwm2mState), Topic, Opts]). -publish(Topic, Payload, Qos, EndpointName) -> - emqx_broker:publish(emqx_message:set_flag(retain, false, emqx_message:make(EndpointName, Qos, Topic, Payload))). +publish(Topic, Payload, Qos, + #lwm2m_state{ + version = ProtoVer, + peername = {PeerHost, _}, + endpoint_name = EndpointName}) -> + Message = emqx_message:set_flag( + retain, false, + emqx_message:make(EndpointName, Qos, Topic, Payload) + ), + NMessage = emqx_message:set_headers( + #{proto_ver => ProtoVer, + protocol => lwm2m, + peerhost => PeerHost}, Message), + emqx_broker:publish(NMessage). time_now() -> erlang:system_time(millisecond). @@ -281,7 +293,7 @@ do_send_to_broker(EventType, #{<<"data">> := Data} = Payload, #lwm2m_state{endpo emqx_lwm2m_cm:register_cmd(EndpointName, ReqPath, EventType, {Code, CodeMsg, Content}), NewPayload = maps:put(<<"msgType">>, EventType, Payload), Topic = uplink_topic(EventType, Lwm2mState), - publish(Topic, emqx_json:encode(NewPayload), _Qos = 0, Lwm2mState#lwm2m_state.endpoint_name). + publish(Topic, emqx_json:encode(NewPayload), _Qos = 0, Lwm2mState). %%-------------------------------------------------------------------- %% Auto Observe diff --git a/apps/emqx_stomp/src/emqx_stomp_protocol.erl b/apps/emqx_stomp/src/emqx_stomp_protocol.erl index fc211be10..3dcc6052f 100644 --- a/apps/emqx_stomp/src/emqx_stomp_protocol.erl +++ b/apps/emqx_stomp/src/emqx_stomp_protocol.erl @@ -588,15 +588,29 @@ next_ackid() -> put(ackid, AckId + 1), AckId. -make_mqtt_message(Topic, Headers, Body) -> - Msg = emqx_message:make(stomp, Topic, Body), - Headers1 = lists:foldl(fun(Key, Headers0) -> - proplists:delete(Key, Headers0) - end, Headers, [<<"destination">>, - <<"content-length">>, - <<"content-type">>, - <<"transaction">>, - <<"receipt">>]), +make_mqtt_message(Topic, Headers, Body, + #pstate{ + conninfo = #{proto_ver := ProtoVer}, + clientinfo = #{ + protocol := Protocol, + clientid := ClientId, + username := Username, + peerhost := PeerHost}}) -> + Msg = emqx_message:make( + ClientId, ?QOS_0, + Topic, Body, #{}, + #{proto_ver => ProtoVer, + protocol => Protocol, + username => Username, + peerhost => PeerHost}), + Headers1 = lists:foldl( + fun(Key, Headers0) -> + proplists:delete(Key, Headers0) + end, Headers, [<<"destination">>, + <<"content-length">>, + <<"content-type">>, + <<"transaction">>, + <<"receipt">>]), emqx_message:set_headers(#{stomp_headers => Headers1}, Msg). receipt_id(Headers) -> @@ -611,7 +625,7 @@ handle_recv_send_frame(#stomp_frame{command = <<"SEND">>, headers = Headers, bod allow -> _ = maybe_send_receipt(receipt_id(Headers), State), _ = emqx_broker:publish( - make_mqtt_message(Topic, Headers, iolist_to_binary(Body)) + make_mqtt_message(Topic, Headers, iolist_to_binary(Body), State) ), State; deny -> From 0c5cb1b9ac52a2447c0c515494c4f83337d38f1f Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 10 Nov 2021 16:53:03 +0800 Subject: [PATCH 11/16] chore: update appup.src --- apps/emqx_coap/src/emqx_coap.app.src | 2 +- apps/emqx_coap/src/emqx_coap.appup.src | 9 +++++++++ apps/emqx_exproto/src/emqx_exproto.app.src | 2 +- apps/emqx_exproto/src/emqx_exproto.appup.src | 8 ++++++-- apps/emqx_lwm2m/src/emqx_lwm2m.app.src | 2 +- apps/emqx_lwm2m/src/emqx_lwm2m.appup.src | 6 ++++-- apps/emqx_stomp/src/emqx_stomp.app.src | 2 +- apps/emqx_stomp/src/emqx_stomp.appup.src | 10 ++++++++-- 8 files changed, 31 insertions(+), 10 deletions(-) create mode 100644 apps/emqx_coap/src/emqx_coap.appup.src diff --git a/apps/emqx_coap/src/emqx_coap.app.src b/apps/emqx_coap/src/emqx_coap.app.src index 2b5fcbb6a..dce5ef4f4 100644 --- a/apps/emqx_coap/src/emqx_coap.app.src +++ b/apps/emqx_coap/src/emqx_coap.app.src @@ -1,6 +1,6 @@ {application, emqx_coap, [{description, "EMQ X CoAP Gateway"}, - {vsn, "4.3.0"}, % strict semver, bump manually! + {vsn, "4.3.1"}, % strict semver, bump manually! {modules, []}, {registered, []}, {applications, [kernel,stdlib,gen_coap]}, diff --git a/apps/emqx_coap/src/emqx_coap.appup.src b/apps/emqx_coap/src/emqx_coap.appup.src new file mode 100644 index 000000000..b73e26be6 --- /dev/null +++ b/apps/emqx_coap/src/emqx_coap.appup.src @@ -0,0 +1,9 @@ +%% -*-: erlang -*- +{VSN, + [{"4.3.0",[ + {load_module, emqx_coap_mqtt_adapter, brutal_purge, soft_purge, []}]}, + {<<".*">>, []}], + [{"4.3.0",[ + {load_module, emqx_coap_mqtt_adapter, brutal_purge, soft_purge, []}]}, + {<<".*">>, []}] +}. diff --git a/apps/emqx_exproto/src/emqx_exproto.app.src b/apps/emqx_exproto/src/emqx_exproto.app.src index f7cab4c2e..0ed54e6fd 100644 --- a/apps/emqx_exproto/src/emqx_exproto.app.src +++ b/apps/emqx_exproto/src/emqx_exproto.app.src @@ -1,6 +1,6 @@ {application, emqx_exproto, [{description, "EMQ X Extension for Protocol"}, - {vsn, "4.3.4"}, %% 4.3.3 is used by ee + {vsn, "4.3.5"}, %% 4.3.3 is used by ee {modules, []}, {registered, []}, {mod, {emqx_exproto_app, []}}, diff --git a/apps/emqx_exproto/src/emqx_exproto.appup.src b/apps/emqx_exproto/src/emqx_exproto.appup.src index e0a021af5..7da28e773 100644 --- a/apps/emqx_exproto/src/emqx_exproto.appup.src +++ b/apps/emqx_exproto/src/emqx_exproto.appup.src @@ -1,6 +1,8 @@ %% -*- mode: erlang -*- {VSN, - [{"4.3.3", + [{"4.3.4", + [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, + {"4.3.3", [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {"4.3.2", @@ -12,7 +14,9 @@ {load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{"4.3.3", + [{"4.3.4", + [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, + {"4.3.3", [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {"4.3.2", diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m.app.src b/apps/emqx_lwm2m/src/emqx_lwm2m.app.src index 551cf8d07..b929ad854 100644 --- a/apps/emqx_lwm2m/src/emqx_lwm2m.app.src +++ b/apps/emqx_lwm2m/src/emqx_lwm2m.app.src @@ -1,6 +1,6 @@ {application,emqx_lwm2m, [{description,"EMQ X LwM2M Gateway"}, - {vsn, "4.3.4"}, % strict semver, bump manually! + {vsn, "4.3.5"}, % strict semver, bump manually! {modules,[]}, {registered,[emqx_lwm2m_sup]}, {applications,[kernel,stdlib,lwm2m_coap]}, diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src b/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src index 600cf236b..0cd98db0d 100644 --- a/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src +++ b/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src @@ -1,5 +1,5 @@ %% -*-: erlang -*- -{"4.3.4", +{VSN, [ {<<"4\\.3\\.[0-1]">>, [ {restart_application, emqx_lwm2m} @@ -7,7 +7,9 @@ {"4.3.2", [ {load_module, emqx_lwm2m_message, brutal_purge, soft_purge, []} ]}, - {"4.3.3", []} %% only config change + {<<"4\\.3\\.[3-4]">>, [ + {load_module, emqx_lwm2m_protocol, brutal_purge, soft_purge, []} + ]} ], [ {<<"4\\.3\\.[0-1]">>, [ diff --git a/apps/emqx_stomp/src/emqx_stomp.app.src b/apps/emqx_stomp/src/emqx_stomp.app.src index d2ecae53b..c2f4b57d3 100644 --- a/apps/emqx_stomp/src/emqx_stomp.app.src +++ b/apps/emqx_stomp/src/emqx_stomp.app.src @@ -1,6 +1,6 @@ {application, emqx_stomp, [{description, "EMQ X Stomp Protocol Plugin"}, - {vsn, "4.3.2"}, % strict semver, bump manually! + {vsn, "4.3.3"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_stomp_sup]}, {applications, [kernel,stdlib]}, diff --git a/apps/emqx_stomp/src/emqx_stomp.appup.src b/apps/emqx_stomp/src/emqx_stomp.appup.src index 5ce1f8bd3..a29390bc1 100644 --- a/apps/emqx_stomp/src/emqx_stomp.appup.src +++ b/apps/emqx_stomp/src/emqx_stomp.appup.src @@ -1,11 +1,17 @@ %% -*- mode: erlang -*- {VSN, - [{"4.3.1",[{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]}, + [{"4.3.2",[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}]}, + {"4.3.1",[ + {load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}, + {load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]}, {"4.3.0", [{restart_application,emqx_stomp}, {apply,{emqx_stomp,force_clear_after_app_stoped,[]}}]}, {<<".*">>,[]}], - [{"4.3.1",[{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]}, + [{"4.3.2",[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}]}, + {"4.3.1",[ + {load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}, + {load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]}, {"4.3.0", [{restart_application,emqx_stomp}]}, {<<".*">>,[]}]}. From 439fb3a403b87fe76b3ae8705871a1bb84c0a8cb Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 10 Nov 2021 17:05:27 +0800 Subject: [PATCH 12/16] chore: fix elvis warnings --- apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl | 4 +- .../emqx_exproto/src/emqx_exproto_channel.erl | 2 - apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl | 47 ++++++++++++------- apps/emqx_stomp/src/emqx_stomp_protocol.erl | 34 ++++++++++---- 4 files changed, 57 insertions(+), 30 deletions(-) diff --git a/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl b/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl index 89fb411b2..f7f0b4eda 100644 --- a/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl +++ b/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl @@ -139,7 +139,7 @@ handle_call({subscribe, Topic, CoapPid}, _From, State=#state{sub_topics = TopicL NewTopics = proplists:delete(Topic, TopicList), IsWild = emqx_topic:wildcard(Topic), {reply, chann_subscribe(Topic, State), State#state{sub_topics = - [{Topic, {IsWild, CoapPid}}|NewTopics]}, hibernate}; + [{Topic, {IsWild, CoapPid}} | NewTopics]}, hibernate}; handle_call({unsubscribe, Topic, _CoapPid}, _From, State=#state{sub_topics = TopicList}) -> NewTopics = proplists:delete(Topic, TopicList), @@ -281,7 +281,7 @@ do_deliver({Topic, Payload}, Subscribers) -> deliver_to_coap(_TopicName, _Payload, []) -> ok; -deliver_to_coap(TopicName, Payload, [{TopicFilter, {IsWild, CoapPid}}|T]) -> +deliver_to_coap(TopicName, Payload, [{TopicFilter, {IsWild, CoapPid}} | T]) -> Matched = case IsWild of true -> emqx_topic:match(TopicName, TopicFilter); false -> TopicName =:= TopicFilter diff --git a/apps/emqx_exproto/src/emqx_exproto_channel.erl b/apps/emqx_exproto/src/emqx_exproto_channel.erl index d8ceae4bd..44b8b64d3 100644 --- a/apps/emqx_exproto/src/emqx_exproto_channel.erl +++ b/apps/emqx_exproto/src/emqx_exproto_channel.erl @@ -606,8 +606,6 @@ default_conninfo(ConnInfo) -> ConnInfo#{clean_start => true, clientid => undefined, username => undefined, - proto_name => undefined, - proto_ver => undefined, conn_props => #{}, connected => true, connected_at => erlang:system_time(millisecond), diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl b/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl index 4a5fafdb0..7f40041a5 100644 --- a/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl +++ b/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl @@ -74,7 +74,8 @@ call(Pid, Msg, Timeout) -> Error -> {error, Error} end. -init(CoapPid, EndpointName, Peername = {_Peerhost, _Port}, RegInfo = #{<<"lt">> := LifeTime, <<"lwm2m">> := Ver}) -> +init(CoapPid, EndpointName, Peername = {_Peerhost, _Port}, + RegInfo = #{<<"lt">> := LifeTime, <<"lwm2m">> := Ver}) -> Mountpoint = proplists:get_value(mountpoint, lwm2m_coap_responder:options(), ""), Lwm2mState = #lwm2m_state{peername = Peername, endpoint_name = EndpointName, @@ -103,9 +104,10 @@ init(CoapPid, EndpointName, Peername = {_Peerhost, _Port}, RegInfo = #{<<"lt">> emqx_cm:register_channel(EndpointName, CoapPid, conninfo(Lwm2mState1)) end), emqx_cm:insert_channel_info(EndpointName, info(Lwm2mState1), stats(Lwm2mState1)), - emqx_lwm2m_cm:register_channel(EndpointName, RegInfo, LifeTime, Ver, Peername), + emqx_lwm2m_cm:register_channel(EndpointName, RegInfo, LifeTime, Ver, Peername), - {ok, Lwm2mState1#lwm2m_state{life_timer = emqx_lwm2m_timer:start_timer(LifeTime, {life_timer, expired})}}; + NTimer = emqx_lwm2m_timer:start_timer(LifeTime, {life_timer, expired}), + {ok, Lwm2mState1#lwm2m_state{life_timer = NTimer}}; {error, Error} -> _ = run_hooks('client.connack', [conninfo(Lwm2mState), not_authorized], undefined), {error, Error} @@ -133,7 +135,7 @@ update_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer, regi %% - report the registration info update, but only when objectList is updated. case NewRegInfo of #{<<"objectList">> := _} -> - emqx_lwm2m_cm:update_reg_info(Epn, NewRegInfo), + emqx_lwm2m_cm:update_reg_info(Epn, NewRegInfo), send_to_broker(<<"update">>, #{<<"data">> => UpdatedRegInfo}, Lwm2mState); _ -> ok end @@ -186,7 +188,8 @@ deliver(#message{topic = Topic, payload = Payload}, started_at = StartedAt, endpoint_name = EndpointName}) -> IsCacheMode = is_cache_mode(RegInfo, StartedAt), - ?LOG(debug, "Get MQTT message from broker, IsCacheModeNow?: ~p, Topic: ~p, Payload: ~p", [IsCacheMode, Topic, Payload]), + ?LOG(debug, "Get MQTT message from broker, IsCacheModeNow?: ~p, " + "Topic: ~p, Payload: ~p", [IsCacheMode, Topic, Payload]), AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>), deliver_to_coap(AlternatePath, Payload, CoapPid, IsCacheMode, EndpointName), Lwm2mState. @@ -256,7 +259,8 @@ time_now() -> erlang:system_time(millisecond). %% Deliver downlink message to coap %%-------------------------------------------------------------------- -deliver_to_coap(AlternatePath, JsonData, CoapPid, CacheMode, EndpointName) when is_binary(JsonData)-> +deliver_to_coap(AlternatePath, JsonData, + CoapPid, CacheMode, EndpointName) when is_binary(JsonData)-> try TermData = emqx_json:decode(JsonData, [return_maps]), deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName) @@ -285,7 +289,8 @@ deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName) when send_to_broker(EventType, Payload = #{}, Lwm2mState) -> do_send_to_broker(EventType, Payload, Lwm2mState). -do_send_to_broker(EventType, #{<<"data">> := Data} = Payload, #lwm2m_state{endpoint_name = EndpointName} = Lwm2mState) -> +do_send_to_broker(EventType, #{<<"data">> := Data} = Payload, + #lwm2m_state{endpoint_name = EndpointName} = Lwm2mState) -> ReqPath = maps:get(<<"reqPath">>, Data, undefined), Code = maps:get(<<"code">>, Data, undefined), CodeMsg = maps:get(<<"codeMsg">>, Data, undefined), @@ -327,18 +332,27 @@ auto_observe(AlternatePath, ObjectList, CoapPid, EndpointName) -> observe_object_list(AlternatePath, ObjectList, CoapPid, EndpointName) -> lists:foreach(fun(ObjectPath) -> - [ObjId| LastPath] = emqx_lwm2m_cmd_handler:path_list(ObjectPath), + [ObjId | LastPath] = emqx_lwm2m_cmd_handler:path_list(ObjectPath), case ObjId of <<"19">> -> [ObjInsId | _LastPath1] = LastPath, case ObjInsId of <<"0">> -> - observe_object_slowly(AlternatePath, <<"/19/0/0">>, CoapPid, 100, EndpointName); + observe_object_slowly( + AlternatePath, <<"/19/0/0">>, + CoapPid, 100, EndpointName + ); _ -> - observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100, EndpointName) + observe_object_slowly( + AlternatePath, ObjectPath, + CoapPid, 100, EndpointName + ) end; _ -> - observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100, EndpointName) + observe_object_slowly( + AlternatePath, ObjectPath, + CoapPid, 100, EndpointName + ) end end, ObjectList). @@ -392,11 +406,12 @@ get_cached_downlink_messages() -> is_cache_mode(RegInfo, StartedAt) -> case is_psm(RegInfo) orelse is_qmode(RegInfo) of true -> - QModeTimeWind = proplists:get_value(qmode_time_window, lwm2m_coap_responder:options(), 22), - Now = time_now(), - if (Now - StartedAt) >= QModeTimeWind -> true; - true -> false - end; + QModeTimeWind = proplists:get_value( + qmode_time_window, + lwm2m_coap_responder:options(), + 22 + ), + (time_now() - StartedAt) >= QModeTimeWind; false -> false end. diff --git a/apps/emqx_stomp/src/emqx_stomp_protocol.erl b/apps/emqx_stomp/src/emqx_stomp_protocol.erl index 3dcc6052f..4e371d9b0 100644 --- a/apps/emqx_stomp/src/emqx_stomp_protocol.erl +++ b/apps/emqx_stomp/src/emqx_stomp_protocol.erl @@ -108,6 +108,8 @@ , init/2 ]}). +-elvis([{elvis_style, dont_repeat_yourself, disable}]). + -type(pstate() :: #pstate{}). %% @doc Init protocol @@ -132,8 +134,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port}, AllowAnonymous = get_value(allow_anonymous, Opts, false), DefaultUser = get_value(default_user, Opts), - - #pstate{ + #pstate{ conninfo = NConnInfo, clientinfo = ClientInfo, heartfun = HeartFun, @@ -165,7 +166,7 @@ default_conninfo(ConnInfo) -> info(State) -> maps:from_list(info(?INFO_KEYS, State)). --spec info(list(atom())|atom(), pstate()) -> term(). +-spec info(list(atom()) | atom(), pstate()) -> term(). info(Keys, State) when is_list(Keys) -> [{Key, info(Key, State)} || Key <- Keys]; info(conninfo, #pstate{conninfo = ConnInfo}) -> @@ -288,7 +289,12 @@ received(#stomp_frame{command = <<"CONNECT">>}, State = #pstate{connected = true received(Frame = #stomp_frame{command = <<"SEND">>, headers = Headers}, State) -> case header(<<"transaction">>, Headers) of undefined -> {ok, handle_recv_send_frame(Frame, State)}; - TransactionId -> add_action(TransactionId, {fun ?MODULE:handle_recv_send_frame/2, [Frame]}, receipt_id(Headers), State) + TransactionId -> + add_action(TransactionId, + {fun ?MODULE:handle_recv_send_frame/2, [Frame]}, + receipt_id(Headers), + State + ) end; received(#stomp_frame{command = <<"SUBSCRIBE">>, headers = Headers}, @@ -346,7 +352,11 @@ received(#stomp_frame{command = <<"UNSUBSCRIBE">>, headers = Headers}, received(Frame = #stomp_frame{command = <<"ACK">>, headers = Headers}, State) -> case header(<<"transaction">>, Headers) of undefined -> {ok, handle_recv_ack_frame(Frame, State)}; - TransactionId -> add_action(TransactionId, {fun ?MODULE:handle_recv_ack_frame/2, [Frame]}, receipt_id(Headers), State) + TransactionId -> + add_action(TransactionId, + {fun ?MODULE:handle_recv_ack_frame/2, [Frame]}, + receipt_id(Headers), + State) end; %% NACK @@ -357,7 +367,11 @@ received(Frame = #stomp_frame{command = <<"ACK">>, headers = Headers}, State) -> received(Frame = #stomp_frame{command = <<"NACK">>, headers = Headers}, State) -> case header(<<"transaction">>, Headers) of undefined -> {ok, handle_recv_nack_frame(Frame, State)}; - TransactionId -> add_action(TransactionId, {fun ?MODULE:handle_recv_nack_frame/2, [Frame]}, receipt_id(Headers), State) + TransactionId -> + add_action(TransactionId, + {fun ?MODULE:handle_recv_nack_frame/2, [Frame]}, + receipt_id(Headers), + State) end; %% BEGIN @@ -516,9 +530,9 @@ negotiate_version(Accepts) -> negotiate_version(Ver, []) -> {error, <<"Supported protocol versions < ", Ver/binary>>}; -negotiate_version(Ver, [AcceptVer|_]) when Ver >= AcceptVer -> +negotiate_version(Ver, [AcceptVer | _]) when Ver >= AcceptVer -> {ok, AcceptVer}; -negotiate_version(Ver, [_|T]) -> +negotiate_version(Ver, [_ | T]) -> negotiate_version(Ver, T). check_login(Login, _, AllowAnonymous, _) @@ -537,7 +551,7 @@ check_login(Login, Passcode, _, DefaultUser) -> add_action(Id, Action, ReceiptId, State = #pstate{transaction = Trans}) -> case maps:get(Id, Trans, undefined) of {Ts, Actions} -> - NTrans = Trans#{Id => {Ts, [Action|Actions]}}, + NTrans = Trans#{Id => {Ts, [Action | Actions]}}, {ok, State#pstate{transaction = NTrans}}; _ -> send(error_frame(ReceiptId, ["Transaction ", Id, " not found"]), State) @@ -713,7 +727,7 @@ find_sub_by_id(Id, Subs) -> end, Subs), case maps:to_list(Found) of [] -> undefined; - [Sub|_] -> Sub + [Sub | _] -> Sub end. is_acl_enabled(_) -> From 8e4c2c88c359aa37a3d4f697b475fdf763167739 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 12 Nov 2021 10:45:26 +0800 Subject: [PATCH 13/16] chore: use PROTO_VER marco --- apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl b/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl index f7f0b4eda..a6ea09881 100644 --- a/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl +++ b/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl @@ -58,6 +58,8 @@ -define(SUBOPTS, #{rh => 0, rap => 0, nl => 0, qos => ?QOS_0, is_new => false}). +-define(PROTO_VER, 1). + %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- @@ -260,7 +262,7 @@ packet_to_message(Topic, Payload, emqx_message:make(ClientId, ?QOS_0, Topic, Payload) ), emqx_message:set_headers( - #{ proto_ver => 1 + #{ proto_ver => ?PROTO_VER , protocol => coap , username => Username , peerhost => PeerHost}, Message). @@ -335,7 +337,7 @@ conninfo(#state{peername = Peername, peercert => nossl, %% TODO: dtls conn_mod => ?MODULE, proto_name => <<"CoAP">>, - proto_ver => 1, + proto_ver => ?PROTO_VER, clean_start => true, clientid => ClientId, username => undefined, From d8f37be210ea9b01f6eff448986bbf8cd2c0a07a Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 12 Nov 2021 15:35:08 +0800 Subject: [PATCH 14/16] chore(lwm2m): fix bad appup.src --- apps/emqx_lwm2m/src/emqx_lwm2m.appup.src | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src b/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src index 0cd98db0d..6656eb149 100644 --- a/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src +++ b/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src @@ -7,7 +7,8 @@ {"4.3.2", [ {load_module, emqx_lwm2m_message, brutal_purge, soft_purge, []} ]}, - {<<"4\\.3\\.[3-4]">>, [ + {"4.3.3", []}, %% only config change + {"4.3.4", [ {load_module, emqx_lwm2m_protocol, brutal_purge, soft_purge, []} ]} ], @@ -18,6 +19,9 @@ {"4.3.2", [ {load_module, emqx_lwm2m_message, brutal_purge, soft_purge, []} ]}, - {"4.3.3", []} %% only config change + {"4.3.3", []}, %% only config change + {"4.3.4", [ + {load_module, emqx_lwm2m_protocol, brutal_purge, soft_purge, []} + ]} ] }. From 0971567cff07be06820a1d98c36349ec26e60943 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Sun, 14 Nov 2021 18:23:30 +0100 Subject: [PATCH 15/16] build: ensure git tag matches release version --- pkg-vsn.sh | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/pkg-vsn.sh b/pkg-vsn.sh index 904690ad1..f3bdb1884 100755 --- a/pkg-vsn.sh +++ b/pkg-vsn.sh @@ -15,9 +15,21 @@ fi ## emqx_release.hrl is the single source of truth for release version RELEASE="$(grep -E "define.+EMQX_RELEASE.+${EDITION}" include/emqx_release.hrl | cut -d '"' -f2)" -## git commit hash is added as suffix in case the git tag and release version is not an exact match -if [ -d .git ] && ! git describe --tags --match "[e|v]${RELEASE}" --exact >/dev/null 2>&1; then +git_exact_vsn() { + local tag + tag="$(git describe --tags --match "[e|v]*" --exact 2>/dev/null)" + echo "$tag" | sed 's/^[v|e]//g' +} + +GIT_EXACT_VSN="$(git_exact_vsn)" +if [ "$GIT_EXACT_VSN" != '' ]; then + if [ "$GIT_EXACT_VSN" != "$RELEASE" ]; then + echo "ERROR: Tagged $GIT_EXACT_VSN, but $RELEASE in include/emqx_release.hrl" 1>&2 + exit 1 + fi + SUFFIX='' +else SUFFIX="-$(git rev-parse HEAD | cut -b1-8)" fi -echo "${RELEASE}${SUFFIX:-}" +echo "${RELEASE}${SUFFIX}" From ca1458d4d7e1040d6fcc7597d5d68b36a316e652 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Mon, 15 Nov 2021 15:34:48 +0100 Subject: [PATCH 16/16] chore(emqx_rule_engine): bump app vsn to 4.4.0 --- apps/emqx_rule_engine/src/emqx_rule_engine.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index 66af0da21..98e5487e2 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -1,6 +1,6 @@ {application, emqx_rule_engine, [{description, "EMQ X Rule Engine"}, - {vsn, "4.3.6"}, % strict semver, bump manually! + {vsn, "4.4.0"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_rule_engine_sup, emqx_rule_registry]}, {applications, [kernel,stdlib,rulesql,getopt]},