From 04cae4ef6051dea283297bb46771d812a3c0b774 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 8 Feb 2023 18:44:46 +0800 Subject: [PATCH] fix: republish MQTT msgs with properties crash --- .../emqx_rule_engine/src/emqx_rule_events.erl | 18 ++++++- .../test/emqx_rule_events_SUITE.erl | 52 ++++++++++++++----- .../test/emqx_rule_monitor_SUITE.erl | 1 + 3 files changed, 57 insertions(+), 14 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index f64e389d0..5406845d8 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -853,9 +853,25 @@ printable_maps(Headers) -> value => Value } || {Key, Value} <- V0] }; + (K, V, AccIn) when is_map(V) -> + AccIn#{K => printable_maps(V)}; + (K, V, AccIn) when is_list(V) -> + AccIn#{K => printable_list(V)}; (_K, V, AccIn) when is_tuple(V) -> - %% internal header + %% internal header, remove it AccIn; (K, V, AccIn) -> AccIn#{K => V} end, #{}, Headers). + +printable_list(L) -> + lists:filtermap(fun printable_element/1, L). + +printable_element(E) when is_map(E) -> + {true, printable_maps(E)}; +printable_element(E) when is_tuple(E) -> + false; +printable_element(E) when is_list(E) -> + {true, printable_list(E)}; +printable_element(E) -> + {true, E}. diff --git a/apps/emqx_rule_engine/test/emqx_rule_events_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_events_SUITE.erl index 14376c91d..ec9fccab2 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_events_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_events_SUITE.erl @@ -36,18 +36,44 @@ t_mod_hook_fun(_) -> ]]. t_printable_maps(_) -> - Headers = #{peerhost => {127,0,0,1}, - peername => {{127,0,0,1}, 9980}, - sockname => {{127,0,0,1}, 1883}, - redispatch_to => {<<"group">>, <<"sub/topic/+">>}, - shared_dispatch_ack => {self(), ref} - }, + TestMap = #{ + peerhost => {127,0,0,1}, + peername => {{127,0,0,1}, 9980}, + sockname => {{127,0,0,1}, 1883}, + redispatch_to => {<<"group">>, <<"sub/topic/+">>}, + shared_dispatch_ack => {self(), ref}, + string => <<"abc">>, + atom => abc, + integer => 1, + float => 1.0, + simple_list => [1, 1.0, a, "abc", <<"abc">>, {a,b}] + }, + Headers = TestMap#{ + map => TestMap, + map_list => [ + TestMap#{ + map => TestMap + } + ] + }, Converted = emqx_rule_events:printable_maps(Headers), - ?assertMatch( - #{peerhost := <<"127.0.0.1">>, - peername := <<"127.0.0.1:9980">>, - sockname := <<"127.0.0.1:1883">> - }, Converted), - ?assertNot(maps:is_key(redispatch_to, Converted)), - ?assertNot(maps:is_key(shared_dispatch_ack, Converted)), + Verify = fun(Result) -> + ?assertMatch( + #{peerhost := <<"127.0.0.1">>, + peername := <<"127.0.0.1:9980">>, + sockname := <<"127.0.0.1:1883">>, + string := <<"abc">>, + atom := abc, + integer := 1, + float := 1.0, + simple_list := [1, 1.0, a, "abc", <<"abc">>] %% {a,b} is removed + }, Result), + ?assertNot(maps:is_key(redispatch_to, Result)), + ?assertNot(maps:is_key(shared_dispatch_ack, Result)), + %% make sure the result is jsonable + _ = emqx_json:encode(Result) + end, + Verify(maps:get(map, Converted)), + Verify(maps:get(map, lists:nth(1, maps:get(map_list, Converted)))), + Verify(Converted), ok. diff --git a/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl index 1c478fb63..646f2644f 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl @@ -90,6 +90,7 @@ end_per_testcase(_, Config) -> common_init_per_testcase() -> AlarmOpts = [{actions, [log, publish]}, {size_limit, 1000}, {validity_period, 86400}], + _ = emqx_alarm:mnesia(boot), {ok, _} = emqx_alarm:start_link(AlarmOpts), {ok, _} = emqx_rule_monitor:start_link().