fix: republish MQTT msgs with properties crash

This commit is contained in:
Shawn 2023-02-08 18:44:46 +08:00
parent efc409bdd6
commit 04cae4ef60
3 changed files with 57 additions and 14 deletions

View File

@ -853,9 +853,25 @@ printable_maps(Headers) ->
value => Value
} || {Key, Value} <- V0]
};
(K, V, AccIn) when is_map(V) ->
AccIn#{K => printable_maps(V)};
(K, V, AccIn) when is_list(V) ->
AccIn#{K => printable_list(V)};
(_K, V, AccIn) when is_tuple(V) ->
%% internal header
%% internal header, remove it
AccIn;
(K, V, AccIn) ->
AccIn#{K => V}
end, #{}, Headers).
printable_list(L) ->
lists:filtermap(fun printable_element/1, L).
printable_element(E) when is_map(E) ->
{true, printable_maps(E)};
printable_element(E) when is_tuple(E) ->
false;
printable_element(E) when is_list(E) ->
{true, printable_list(E)};
printable_element(E) ->
{true, E}.

View File

@ -36,18 +36,44 @@ t_mod_hook_fun(_) ->
]].
t_printable_maps(_) ->
Headers = #{peerhost => {127,0,0,1},
peername => {{127,0,0,1}, 9980},
sockname => {{127,0,0,1}, 1883},
redispatch_to => {<<"group">>, <<"sub/topic/+">>},
shared_dispatch_ack => {self(), ref}
},
TestMap = #{
peerhost => {127,0,0,1},
peername => {{127,0,0,1}, 9980},
sockname => {{127,0,0,1}, 1883},
redispatch_to => {<<"group">>, <<"sub/topic/+">>},
shared_dispatch_ack => {self(), ref},
string => <<"abc">>,
atom => abc,
integer => 1,
float => 1.0,
simple_list => [1, 1.0, a, "abc", <<"abc">>, {a,b}]
},
Headers = TestMap#{
map => TestMap,
map_list => [
TestMap#{
map => TestMap
}
]
},
Converted = emqx_rule_events:printable_maps(Headers),
?assertMatch(
#{peerhost := <<"127.0.0.1">>,
peername := <<"127.0.0.1:9980">>,
sockname := <<"127.0.0.1:1883">>
}, Converted),
?assertNot(maps:is_key(redispatch_to, Converted)),
?assertNot(maps:is_key(shared_dispatch_ack, Converted)),
Verify = fun(Result) ->
?assertMatch(
#{peerhost := <<"127.0.0.1">>,
peername := <<"127.0.0.1:9980">>,
sockname := <<"127.0.0.1:1883">>,
string := <<"abc">>,
atom := abc,
integer := 1,
float := 1.0,
simple_list := [1, 1.0, a, "abc", <<"abc">>] %% {a,b} is removed
}, Result),
?assertNot(maps:is_key(redispatch_to, Result)),
?assertNot(maps:is_key(shared_dispatch_ack, Result)),
%% make sure the result is jsonable
_ = emqx_json:encode(Result)
end,
Verify(maps:get(map, Converted)),
Verify(maps:get(map, lists:nth(1, maps:get(map_list, Converted)))),
Verify(Converted),
ok.

View File

@ -90,6 +90,7 @@ end_per_testcase(_, Config) ->
common_init_per_testcase() ->
AlarmOpts = [{actions, [log, publish]}, {size_limit, 1000}, {validity_period, 86400}],
_ = emqx_alarm:mnesia(boot),
{ok, _} = emqx_alarm:start_link(AlarmOpts),
{ok, _} = emqx_rule_monitor:start_link().