diff --git a/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace.erl b/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace.erl index ad7112707..f074055f9 100644 --- a/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx_plugin_libs/src/emqx_trace/emqx_trace.erl @@ -368,14 +368,12 @@ classify_by_time([Trace | Traces], Now, Wait, Run, Finish) -> classify_by_time(Traces, Now, Wait, [Trace | Run], Finish). to_trace(TraceParam) -> - case to_trace(ensure_proplists(TraceParam), #?TRACE{}) of + case to_trace(ensure_map(TraceParam), #?TRACE{}) of {error, Reason} -> {error, Reason}; {ok, #?TRACE{name = undefined}} -> {error, "name required"}; {ok, #?TRACE{type = undefined}} -> {error, "type=[topic,clientid,ip_address] required"}; - {ok, #?TRACE{filter = undefined}} -> - {error, "topic/clientid/ip_address filter required"}; {ok, TraceRec0 = #?TRACE{}} -> case fill_default(TraceRec0) of #?TRACE{start_at = Start, end_at = End} when End =< Start -> @@ -385,13 +383,13 @@ to_trace(TraceParam) -> end end. -ensure_proplists(#{} = Trace) -> maps:to_list(Trace); -ensure_proplists(Trace) when is_list(Trace) -> +ensure_map(#{} = Trace) -> Trace; +ensure_map(Trace) when is_list(Trace) -> lists:foldl( - fun({K, V}, Acc) when is_binary(K) -> [{binary_to_existing_atom(K), V} | Acc]; - ({K, V}, Acc) when is_atom(K) -> [{K, V} | Acc]; + fun({K, V}, Acc) when is_binary(K) -> Acc#{binary_to_existing_atom(K) => V}; + ({K, V}, Acc) when is_atom(K) -> Acc#{K => V}; (_, Acc) -> Acc - end, [], Trace). + end, #{}, Trace). fill_default(Trace = #?TRACE{start_at = undefined}) -> fill_default(Trace#?TRACE{start_at = erlang:system_time(second)}); @@ -401,45 +399,45 @@ fill_default(Trace) -> Trace. -define(NAME_RE, "^[A-Za-z]+[A-Za-z0-9-_]*$"). -to_trace([], Rec) -> {ok, Rec}; -to_trace([{name, Name} | Trace], Rec) -> +to_trace(#{name := Name} = Trace, Rec) -> case re:run(Name, ?NAME_RE) of nomatch -> {error, "Name should be " ?NAME_RE}; - _ -> to_trace(Trace, Rec#?TRACE{name = Name}) + _ -> to_trace(maps:remove(name, Trace), Rec#?TRACE{name = Name}) end; -to_trace([{type, Type} | Trace], Rec) -> - case lists:member(Type, [<<"clientid">>, <<"topic">>, <<"ip_address">>]) of - true -> to_trace(Trace, Rec#?TRACE{type = binary_to_existing_atom(Type)}); - false -> {error, "incorrect type: only support clientid/topic/ip_address"} +to_trace(#{type := <<"clientid">>, clientid := Filter} = Trace, Rec) -> + Trace0 = maps:without([type, clientid], Trace), + to_trace(Trace0, Rec#?TRACE{type = clientid, filter = Filter}); +to_trace(#{type := <<"topic">>, topic := Filter} = Trace, Rec) -> + case validate_topic(Filter) of + ok -> + Trace0 = maps:without([type, topic], Trace), + to_trace(Trace0, Rec#?TRACE{type = topic, filter = Filter}); + Error -> Error end; -to_trace([{topic, Topic} | Trace], Rec) -> - case validate_topic(Topic) of - ok -> to_trace(Trace, Rec#?TRACE{filter = Topic}); - {error, Reason} -> {error, Reason} +to_trace(#{type := <<"ip_address">>, ip_address := Filter} = Trace, Rec) -> + case validate_ip_address(Filter) of + ok -> + Trace0 = maps:without([type, ip_address], Trace), + to_trace(Trace0, Rec#?TRACE{type = ip_address, filter = Filter}); + Error -> Error end; -to_trace([{clientid, ClientId} | Trace], Rec) -> - to_trace(Trace, Rec#?TRACE{filter = ClientId}); -to_trace([{ip_address, IP} | Trace], Rec) -> - case inet:parse_address(binary_to_list(IP)) of - {ok, _} -> to_trace(Trace, Rec#?TRACE{filter = binary_to_list(IP)}); - {error, Reason} -> {error, lists:flatten(io_lib:format("ip address: ~p", [Reason]))} - end; -to_trace([{start_at, StartAt} | Trace], Rec) -> +to_trace(#{type := Type}, _Rec) -> {error, io_lib:format("required ~s field", [Type])}; +to_trace(#{start_at := StartAt} = Trace, Rec) -> case to_system_second(StartAt) of - {ok, Sec} -> to_trace(Trace, Rec#?TRACE{start_at = Sec}); + {ok, Sec} -> to_trace(maps:remove(start_at, Trace), Rec#?TRACE{start_at = Sec}); {error, Reason} -> {error, Reason} end; -to_trace([{end_at, EndAt} | Trace], Rec) -> +to_trace(#{end_at := EndAt} = Trace, Rec) -> Now = erlang:system_time(second), case to_system_second(EndAt) of {ok, Sec} when Sec > Now -> - to_trace(Trace, Rec#?TRACE{end_at = Sec}); + to_trace(maps:remove(end_at, Trace), Rec#?TRACE{end_at = Sec}); {ok, _Sec} -> {error, "end_at time has already passed"}; {error, Reason} -> {error, Reason} end; -to_trace([Unknown | _Trace], _Rec) -> {error, io_lib:format("unknown field: ~p", [Unknown])}. +to_trace(_, Rec) -> {ok, Rec}. validate_topic(TopicName) -> try emqx_topic:validate(filter, TopicName) of @@ -448,6 +446,11 @@ validate_topic(TopicName) -> error:Error -> {error, io_lib:format("topic: ~s invalid by ~p", [TopicName, Error])} end. +validate_ip_address(IP) -> + case inet:parse_address(binary_to_list(IP)) of + {ok, _} -> ok; + {error, Reason} -> {error, lists:flatten(io_lib:format("ip address: ~p", [Reason]))} + end. to_system_second(At) -> try diff --git a/apps/emqx_plugin_libs/test/emqx_trace_SUITE.erl b/apps/emqx_plugin_libs/test/emqx_trace_SUITE.erl index e20fb6957..56b81424e 100644 --- a/apps/emqx_plugin_libs/test/emqx_trace_SUITE.erl +++ b/apps/emqx_plugin_libs/test/emqx_trace_SUITE.erl @@ -106,26 +106,29 @@ t_create_size_max(_Config) -> ok. t_create_failed(_Config) -> - UnknownField = [{<<"unknown">>, 12}], + Name = {<<"name">>, <<"test">>}, + UnknownField = [Name, {<<"unknown">>, 12}], {error, Reason1} = emqx_trace:create(UnknownField), - ?assertEqual(<<"unknown field: {unknown,12}">>, iolist_to_binary(Reason1)), + ?assertEqual(<<"type=[topic,clientid,ip_address] required">>, iolist_to_binary(Reason1)), - InvalidTopic = [{<<"topic">>, "#/#//"}], + InvalidTopic = [Name, {<<"topic">>, "#/#//"}, {<<"type">>, <<"topic">>}], {error, Reason2} = emqx_trace:create(InvalidTopic), ?assertEqual(<<"topic: #/#// invalid by function_clause">>, iolist_to_binary(Reason2)), - InvalidStart = [{<<"start_at">>, <<"2021-12-3:12">>}], + InvalidStart = [Name, {<<"type">>, <<"topic">>}, {<<"topic">>, <<"/sys/">>}, + {<<"start_at">>, <<"2021-12-3:12">>}], {error, Reason3} = emqx_trace:create(InvalidStart), ?assertEqual(<<"The rfc3339 specification not satisfied: 2021-12-3:12">>, iolist_to_binary(Reason3)), - InvalidEnd = [{<<"end_at">>, <<"2021-12-3:12">>}], + InvalidEnd = [Name, {<<"type">>, <<"topic">>}, {<<"topic">>, <<"/sys/">>}, + {<<"end_at">>, <<"2021-12-3:12">>}], {error, Reason4} = emqx_trace:create(InvalidEnd), ?assertEqual(<<"The rfc3339 specification not satisfied: 2021-12-3:12">>, iolist_to_binary(Reason4)), - {error, Reason7} = emqx_trace:create([{<<"name">>, <<"test">>}, {<<"type">>, <<"clientid">>}]), - ?assertEqual(<<"topic/clientid/ip_address filter required">>, iolist_to_binary(Reason7)), + {error, Reason7} = emqx_trace:create([Name, {<<"type">>, <<"clientid">>}]), + ?assertEqual(<<"required clientid field">>, iolist_to_binary(Reason7)), InvalidPackets4 = [{<<"name">>, <<"/test">>}, {<<"clientid">>, <<"t">>}, {<<"type">>, <<"clientid">>}], @@ -135,12 +138,9 @@ t_create_failed(_Config) -> ?assertEqual({error, "type=[topic,clientid,ip_address] required"}, emqx_trace:create([{<<"name">>, <<"test-name">>}, {<<"clientid">>, <<"good">>}])), - ?assertEqual({error, "incorrect type: only support clientid/topic/ip_address"}, - emqx_trace:create([{<<"name">>, <<"test-name">>}, - {<<"clientid">>, <<"good">>}, {<<"type">>, <<"typeerror">> }])), - ?assertEqual({error, "ip address: einval"}, - emqx_trace:create([{<<"ip_address">>, <<"test-name">>}])), + emqx_trace:create([Name, {<<"type">>, <<"ip_address">>}, + {<<"ip_address">>, <<"test-name">>}])), ok. t_create_default(_Config) -> @@ -173,6 +173,20 @@ t_create_default(_Config) -> ?assertEqual(true, Start - erlang:system_time(second) < 5), ok. +t_create_with_extra_fields(_Config) -> + ok = emqx_trace:clear(), + Trace = [ + {<<"name">>, <<"test-name">>}, + {<<"type">>, <<"topic">>}, + {<<"topic">>, <<"/x/y/z">>}, + {<<"clientid">>, <<"dev001">>}, + {<<"ip_address">>, <<"127.0.0.1">>} + ], + ok = emqx_trace:create(Trace), + ?assertMatch([#emqx_trace{name = <<"test-name">>, filter = <<"/x/y/z">>, type = topic}], + emqx_trace:list()), + ok. + t_update_enable(_Config) -> Name = <<"test-name">>, Now = erlang:system_time(second),