diff --git a/apps/emqx_management/src/emqx_management.app.src b/apps/emqx_management/src/emqx_management.app.src index 405b4c244..fe68fef44 100644 --- a/apps/emqx_management/src/emqx_management.app.src +++ b/apps/emqx_management/src/emqx_management.app.src @@ -1,6 +1,6 @@ {application, emqx_management, [{description, "EMQ X Management API and CLI"}, - {vsn, "4.3.8"}, % strict semver, bump manually! + {vsn, "4.4.0"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_management_sup]}, {applications, [kernel,stdlib,minirest]}, diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index 95f5121cd..97cb182e6 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -21,7 +21,9 @@ -include("emqx_mgmt.hrl"). --define(PRINT_CMD(Cmd, Descr), io:format("~-48s# ~s~n", [Cmd, Descr])). +-elvis([{elvis_style, invalid_dynamic_call, #{ ignore => [emqx_mgmt_cli]}}]). + +-define(PRINT_CMD(Cmd, Desc), io:format("~-48s# ~s~n", [Cmd, Desc])). -export([load/0]). @@ -74,11 +76,8 @@ mgmt(["insert", AppId, Name]) -> mgmt(["lookup", AppId]) -> case emqx_mgmt_auth:lookup_app(list_to_binary(AppId)) of - {AppId1, AppSecret, Name, Desc, Status, Expired} -> - emqx_ctl:print("app_id: ~s~nsecret: ~s~nname: ~s~ndesc: ~s~nstatus: ~s~nexpired: ~p~n", - [AppId1, AppSecret, Name, Desc, Status, Expired]); - undefined -> - emqx_ctl:print("Not Found.~n") + undefined -> emqx_ctl:print("Not Found.~n"); + App -> print_app_info(App) end; mgmt(["update", AppId, Status]) -> @@ -99,10 +98,7 @@ mgmt(["delete", AppId]) -> end; mgmt(["list"]) -> - lists:foreach(fun({AppId, AppSecret, Name, Desc, Status, Expired}) -> - emqx_ctl:print("app_id: ~s, secret: ~s, name: ~s, desc: ~s, status: ~s, expired: ~p~n", - [AppId, AppSecret, Name, Desc, Status, Expired]) - end, emqx_mgmt_auth:list_apps()); + lists:foreach(fun print_app_info/1, emqx_mgmt_auth:list_apps()); mgmt(_) -> emqx_ctl:usage([{"mgmt list", "List Applications"}, @@ -128,10 +124,12 @@ broker([]) -> [emqx_ctl:print("~-10s: ~s~n", [Fun, emqx_sys:Fun()]) || Fun <- Funs]; broker(["stats"]) -> - [emqx_ctl:print("~-30s: ~w~n", [Stat, Val]) || {Stat, Val} <- lists:sort(emqx_stats:getstats())]; + [emqx_ctl:print("~-30s: ~w~n", [Stat, Val]) || + {Stat, Val} <- lists:sort(emqx_stats:getstats())]; broker(["metrics"]) -> - [emqx_ctl:print("~-30s: ~w~n", [Metric, Val]) || {Metric, Val} <- lists:sort(emqx_metrics:all())]; + [emqx_ctl:print("~-30s: ~w~n", [Metric, Val]) || + {Metric, Val} <- lists:sort(emqx_metrics:all())]; broker(_) -> emqx_ctl:usage([{"broker", "Show broker version, uptime and description"}, @@ -256,10 +254,12 @@ subscriptions(["del", ClientId, Topic]) -> end; subscriptions(_) -> - emqx_ctl:usage([{"subscriptions list", "List all subscriptions"}, - {"subscriptions show ", "Show subscriptions of a client"}, - {"subscriptions add ", "Add a static subscription manually"}, - {"subscriptions del ", "Delete a static subscription manually"}]). + emqx_ctl:usage([{"subscriptions list", "List all subscriptions"}, + {"subscriptions show ", "Show subscriptions of a client"}, + {"subscriptions add ", + "Add a static subscription manually"}, + {"subscriptions del ", + "Delete a static subscription manually"}]). if_valid_qos(QoS, Fun) -> try list_to_integer(QoS) of @@ -328,14 +328,20 @@ vm(["memory"]) -> [emqx_ctl:print("memory/~-17s: ~w~n", [Cat, Val]) || {Cat, Val} <- erlang:memory()]; vm(["process"]) -> - [emqx_ctl:print("process/~-16s: ~w~n", [Name, erlang:system_info(Key)]) || {Name, Key} <- [{limit, process_limit}, {count, process_count}]]; + [emqx_ctl:print("process/~-16s: ~w~n", + [Name, erlang:system_info(Key)]) || + {Name, Key} <- [{limit, process_limit}, {count, process_count}]]; vm(["io"]) -> IoInfo = lists:usort(lists:flatten(erlang:system_info(check_io))), - [emqx_ctl:print("io/~-21s: ~w~n", [Key, proplists:get_value(Key, IoInfo)]) || Key <- [max_fds, active_fds]]; + [emqx_ctl:print("io/~-21s: ~w~n", + [Key, proplists:get_value(Key, IoInfo)]) || + Key <- [max_fds, active_fds]]; vm(["ports"]) -> - [emqx_ctl:print("ports/~-16s: ~w~n", [Name, erlang:system_info(Key)]) || {Name, Key} <- [{count, port_count}, {limit, port_limit}]]; + [emqx_ctl:print("ports/~-16s: ~w~n", + [Name, erlang:system_info(Key)]) || + {Name, Key} <- [{count, port_count}, {limit, port_limit}]]; vm(_) -> emqx_ctl:usage([{"vm all", "Show info of Erlang VM"}, @@ -372,8 +378,9 @@ log(["primary-level", Level]) -> emqx_ctl:print("~s~n", [emqx_logger:get_primary_log_level()]); log(["handlers", "list"]) -> - _ = [emqx_ctl:print("LogHandler(id=~s, level=~s, destination=~s, status=~s)~n", [Id, Level, Dst, Status]) - || #{id := Id, level := Level, dst := Dst, status := Status} <- emqx_logger:get_log_handlers()], + _ = [emqx_ctl:print("LogHandler(id=~s, level=~s, destination=~s, status=~s)~n", + [Id, Level, Dst, Status]) || #{id := Id, level := Level, dst := Dst, status := Status} + <- emqx_logger:get_log_handlers()], ok; log(["handlers", "start", HandlerId]) -> @@ -406,15 +413,18 @@ log(_) -> {"log handlers list", "Show log handlers"}, {"log handlers start ", "Start a log handler"}, {"log handlers stop ", "Stop a log handler"}, - {"log handlers set-level ", "Set log level of a log handler"}]). + {"log handlers set-level ", + "Set log level of a log handler"}]). %%-------------------------------------------------------------------- %% @doc Trace Command trace(["list"]) -> - lists:foreach(fun({{Who, Name}, {Level, LogFile}}) -> - emqx_ctl:print("Trace(~s=~s, level=~s, destination=~p)~n", [Who, Name, Level, LogFile]) - end, emqx_tracer:lookup_traces()); + lists:foreach(fun(Trace) -> + #{type := Type, level := Level, dst := Dst} = Trace, + Who = maps:get(Type, Trace), + emqx_ctl:print("Trace(~s=~s, level=~s, destination=~p)~n", [Type, Who, Level, Dst]) + end, emqx_tracer:lookup_traces()); trace(["stop", "client", ClientId]) -> trace_off(clientid, ClientId); @@ -441,8 +451,9 @@ trace(_) -> {"trace start topic [] ", "Traces for a topic"}, {"trace stop topic ", "Stop tracing for a topic"}]). +-dialyzer({nowarn_function, [trace_on/4, trace_off/2]}). trace_on(Who, Name, Level, LogFile) -> - case emqx_tracer:start_trace({Who, iolist_to_binary(Name)}, Level, LogFile) of + case emqx_tracer:start_trace(Who, Name, Level, LogFile) of ok -> emqx_ctl:print("trace ~s ~s successfully~n", [Who, Name]); {error, Error} -> @@ -450,7 +461,7 @@ trace_on(Who, Name, Level, LogFile) -> end. trace_off(Who, Name) -> - case emqx_tracer:stop_trace({Who, iolist_to_binary(Name)}) of + case emqx_tracer:stop_trace(Who, Name) of ok -> emqx_ctl:print("stop tracing ~s ~s successfully~n", [Who, Name]); {error, Error} -> @@ -472,18 +483,20 @@ listeners([]) -> lists:foreach(fun indent_print/1, Info) end, esockd:listeners()), lists:foreach(fun({Protocol, Opts}) -> - Port = proplists:get_value(port, Opts), - Info = [{listen_on, {string, emqx_listeners:format_listen_on(Port)}}, - {acceptors, maps:get(num_acceptors, proplists:get_value(transport_options, Opts, #{}), 0)}, - {max_conns, proplists:get_value(max_connections, Opts)}, - {current_conn, proplists:get_value(all_connections, Opts)}, - {shutdown_count, []}], - emqx_ctl:print("~s~n", [listener_identifier(Protocol, Port)]), - lists:foreach(fun indent_print/1, Info) - end, ranch:info()); + Port = proplists:get_value(port, Opts), + Acceptors = maps:get(num_acceptors, proplists:get_value(transport_options, Opts, #{}), 0), + Info = [{listen_on, {string, emqx_listeners:format_listen_on(Port)}}, + {acceptors, Acceptors}, + {max_conns, proplists:get_value(max_connections, Opts)}, + {current_conn, proplists:get_value(all_connections, Opts)}, + {shutdown_count, []}], + emqx_ctl:print("~s~n", [listener_identifier(Protocol, Port)]), + lists:foreach(fun indent_print/1, Info) + end, ranch:info()); listeners(["stop", Name = "http" ++ _N | _MaybePort]) -> - %% _MaybePort is to be backward compatible, to stop http listener, there is no need for the port number + %% _MaybePort is to be backward compatible, to stop http listener, + %% there is no need for the port number case minirest:stop_http(list_to_atom(Name)) of ok -> emqx_ctl:print("Stop ~s listener successfully.~n", [Name]); @@ -564,7 +577,8 @@ data(["import", Filename, "--env", Env]) -> {error, unsupported_version} -> emqx_ctl:print("The emqx data import failed: Unsupported version.~n"); {error, Reason} -> - emqx_ctl:print("The emqx data import failed: ~0p while reading ~s.~n", [Reason, Filename]) + emqx_ctl:print("The emqx data import failed: ~0p while reading ~s.~n", + [Reason, Filename]) end; data(_) -> @@ -657,19 +671,23 @@ print({client, {ClientId, ChanPid}}) -> maps:with([created_at], Session)]), InfoKeys = [clientid, username, peername, clean_start, keepalive, expiry_interval, - subscriptions_cnt, inflight_cnt, awaiting_rel_cnt, send_msg, mqueue_len, mqueue_dropped, - connected, created_at, connected_at] ++ case maps:is_key(disconnected_at, Info) of - true -> [disconnected_at]; - false -> [] - end, + subscriptions_cnt, inflight_cnt, awaiting_rel_cnt, + send_msg, mqueue_len, mqueue_dropped, + connected, created_at, connected_at] ++ + case maps:is_key(disconnected_at, Info) of + true -> [disconnected_at]; + false -> [] + end, emqx_ctl:print("Client(~s, username=~s, peername=~s, " - "clean_start=~s, keepalive=~w, session_expiry_interval=~w, " - "subscriptions=~w, inflight=~w, awaiting_rel=~w, delivered_msgs=~w, enqueued_msgs=~w, dropped_msgs=~w, " - "connected=~s, created_at=~w, connected_at=~w" ++ case maps:is_key(disconnected_at, Info) of - true -> ", disconnected_at=~w)~n"; - false -> ")~n" - end, - [format(K, maps:get(K, Info)) || K <- InfoKeys]); + "clean_start=~s, keepalive=~w, session_expiry_interval=~w, " + "subscriptions=~w, inflight=~w, awaiting_rel=~w, " + "delivered_msgs=~w, enqueued_msgs=~w, dropped_msgs=~w, " + "connected=~s, created_at=~w, connected_at=~w" ++ + case maps:is_key(disconnected_at, Info) of + true -> ", disconnected_at=~w)~n"; + false -> ")~n" + end, + [format(K, maps:get(K, Info)) || K <- InfoKeys]); print({emqx_route, #route{topic = Topic, dest = {_, Node}}}) -> emqx_ctl:print("~s -> ~s~n", [Topic, Node]); @@ -721,3 +739,7 @@ restart_http_listener(Scheme, AppName) -> http_mod_name(emqx_management) -> emqx_mgmt_http; http_mod_name(Name) -> Name. + +print_app_info({AppId, AppSecret, Name, Desc, Status, Expired}) -> + emqx_ctl:print("app_id: ~s, secret: ~s, name: ~s, desc: ~s, status: ~s, expired: ~p~n", + [AppId, AppSecret, Name, Desc, Status, Expired]). diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index a96ee7a62..698cbf605 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -17,6 +17,9 @@ -module(emqx_rule_funcs). -include("rule_engine.hrl"). +-elvis([{elvis_style, god_modules, disable}]). +-elvis([{elvis_style, function_naming_convention, disable}]). +-elvis([{elvis_style, macro_names, disable}]). %% IoT Funcs -export([ msgid/0 @@ -438,7 +441,8 @@ subbits(Bits, Len) when is_integer(Len), is_bitstring(Bits) -> subbits(Bits, Start, Len) when is_integer(Start), is_integer(Len), is_bitstring(Bits) -> get_subbits(Bits, Start, Len, <<"integer">>, <<"unsigned">>, <<"big">>). -subbits(Bits, Start, Len, Type, Signedness, Endianness) when is_integer(Start), is_integer(Len), is_bitstring(Bits) -> +subbits(Bits, Start, Len, Type, Signedness, Endianness) + when is_integer(Start), is_integer(Len), is_bitstring(Bits) -> get_subbits(Bits, Start, Len, Type, Signedness, Endianness). get_subbits(Bits, Start, Len, Type, Signedness, Endianness) -> @@ -520,7 +524,7 @@ map(Data) -> emqx_rule_utils:map(Data). bin2hexstr(Bin) when is_binary(Bin) -> - emqx_misc:bin2hexstr_A_F(Bin). + emqx_misc:bin2hexstr_a_f_upper(Bin). hexstr2bin(Str) when is_binary(Str) -> emqx_misc:hexstr2bin(Str). @@ -608,7 +612,8 @@ tokens(S, Separators) -> [list_to_binary(R) || R <- string:lexemes(binary_to_list(S), binary_to_list(Separators))]. tokens(S, Separators, <<"nocrlf">>) -> - [list_to_binary(R) || R <- string:lexemes(binary_to_list(S), binary_to_list(Separators) ++ [$\r,$\n,[$\r,$\n]])]. + [list_to_binary(R) || R <- string:lexemes(binary_to_list(S), + binary_to_list(Separators) ++ [$\r,$\n,[$\r,$\n]])]. concat(S1, S2) when is_binary(S1), is_binary(S2) -> unicode:characters_to_binary([S1, S2], unicode). @@ -646,7 +651,8 @@ replace(SrcStr, P, RepStr) when is_binary(SrcStr), is_binary(P), is_binary(RepSt replace(SrcStr, P, RepStr, <<"all">>) when is_binary(SrcStr), is_binary(P), is_binary(RepStr) -> iolist_to_binary(string:replace(SrcStr, P, RepStr, all)); -replace(SrcStr, P, RepStr, <<"trailing">>) when is_binary(SrcStr), is_binary(P), is_binary(RepStr) -> +replace(SrcStr, P, RepStr, <<"trailing">>) + when is_binary(SrcStr), is_binary(P), is_binary(RepStr) -> iolist_to_binary(string:replace(SrcStr, P, RepStr, trailing)); replace(SrcStr, P, RepStr, <<"leading">>) when is_binary(SrcStr), is_binary(P), is_binary(RepStr) -> @@ -662,7 +668,7 @@ regex_replace(SrcStr, RE, RepStr) -> re:replace(SrcStr, RE, RepStr, [global, {return,binary}]). ascii(Char) when is_binary(Char) -> - [FirstC| _] = binary_to_list(Char), + [FirstC | _] = binary_to_list(Char), FirstC. find(S, P) when is_binary(S), is_binary(P) -> @@ -782,7 +788,7 @@ sha256(S) when is_binary(S) -> hash(sha256, S). hash(Type, Data) -> - emqx_misc:bin2hexstr_a_f(crypto:hash(Type, Data)). + emqx_misc:bin2hexstr_a_f_lower(crypto:hash(Type, Data)). %%------------------------------------------------------------------------------ %% Data encode and decode Funcs @@ -875,23 +881,23 @@ time_unit(<<"nanosecond">>) -> nanosecond. %% the function handling to the worker module. %% @end -ifdef(EMQX_ENTERPRISE). -'$handle_undefined_function'(schema_decode, [SchemaId, Data|MoreArgs]) -> +'$handle_undefined_function'(schema_decode, [SchemaId, Data | MoreArgs]) -> emqx_schema_parser:decode(SchemaId, Data, MoreArgs); '$handle_undefined_function'(schema_decode, Args) -> error({args_count_error, {schema_decode, Args}}); -'$handle_undefined_function'(schema_encode, [SchemaId, Term|MoreArgs]) -> +'$handle_undefined_function'(schema_encode, [SchemaId, Term | MoreArgs]) -> emqx_schema_parser:encode(SchemaId, Term, MoreArgs); '$handle_undefined_function'(schema_encode, Args) -> error({args_count_error, {schema_encode, Args}}); -'$handle_undefined_function'(sprintf, [Format|Args]) -> +'$handle_undefined_function'(sprintf, [Format | Args]) -> erlang:apply(fun sprintf_s/2, [Format, Args]); '$handle_undefined_function'(Fun, Args) -> error({sql_function_not_supported, function_literal(Fun, Args)}). -else. -'$handle_undefined_function'(sprintf, [Format|Args]) -> +'$handle_undefined_function'(sprintf, [Format | Args]) -> erlang:apply(fun sprintf_s/2, [Format, Args]); '$handle_undefined_function'(Fun, Args) -> diff --git a/deploy/charts/emqx/values.yaml b/deploy/charts/emqx/values.yaml index c30e237b7..62cf779ea 100644 --- a/deploy/charts/emqx/values.yaml +++ b/deploy/charts/emqx/values.yaml @@ -94,6 +94,8 @@ emqxLoadedPlugins: > emqxLoadedModules: > {emqx_mod_acl_internal, true}. {emqx_mod_presence, true}. + {emqx_mod_trace, false}. + {emqx_mod_st_statistics, false}. {emqx_mod_delayed, false}. {emqx_mod_rewrite, false}. {emqx_mod_subscription, false}. diff --git a/include/emqx_mqtt.hrl b/include/emqx_mqtt.hrl index 5dd9a317c..71c2f25f3 100644 --- a/include/emqx_mqtt.hrl +++ b/include/emqx_mqtt.hrl @@ -542,4 +542,22 @@ -define(SHARE(Group, Topic), emqx_topic:join([<>, Group, Topic])). -define(IS_SHARE(Topic), case Topic of <> -> true; _ -> false end). +-define(TYPE_NAMES, { + 'CONNECT' + , 'CONNACK' + , 'PUBLISH' + , 'PUBACK' + , 'PUBREC' + , 'PUBREL' + , 'PUBCOMP' + , 'SUBSCRIBE' + , 'SUBACK' + , 'UNSUBSCRIBE' + , 'UNSUBACK' + , 'PINGREQ' + , 'PINGRESP' + , 'DISCONNECT' + , 'AUTH' + }). + -endif. diff --git a/lib-ce/emqx_modules/src/emqx_mod_sup.erl b/lib-ce/emqx_modules/src/emqx_mod_sup.erl index 60907f580..109564f65 100644 --- a/lib-ce/emqx_modules/src/emqx_mod_sup.erl +++ b/lib-ce/emqx_modules/src/emqx_mod_sup.erl @@ -77,5 +77,5 @@ init([]) -> assert_started({ok, _Pid}) -> ok; assert_started({ok, _Pid, _Info}) -> ok; -assert_started({error, {already_tarted, _Pid}}) -> ok; +assert_started({error, {already_started, _Pid}}) -> ok; assert_started({error, Reason}) -> erlang:error(Reason). diff --git a/lib-ce/emqx_modules/src/emqx_mod_trace.erl b/lib-ce/emqx_modules/src/emqx_mod_trace.erl new file mode 100644 index 000000000..737ee7e97 --- /dev/null +++ b/lib-ce/emqx_modules/src/emqx_mod_trace.erl @@ -0,0 +1,508 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mod_trace). + +-behaviour(gen_server). +-behaviour(emqx_gen_mod). + + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("kernel/include/file.hrl"). + +-logger_header("[Trace]"). + +%% Mnesia bootstrap +-export([mnesia/1]). + +-boot_mnesia({mnesia, [boot]}). +-copy_mnesia({mnesia, [copy]}). + +-export([ load/1 + , unload/1 + , description/0 + ]). + +-export([ start_link/0 + , list/0 + , list/1 + , get_trace_filename/1 + , create/1 + , delete/1 + , clear/0 + , update/2 + ]). + +-export([ format/1 + , zip_dir/0 + , trace_dir/0 + , trace_file/1 + , delete_files_after_send/2 + ]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(TRACE, ?MODULE). +-define(PACKETS, tuple_to_list(?TYPE_NAMES)). +-define(MAX_SIZE, 30). + +-ifdef(TEST). +-export([log_file/2]). +-endif. + +-record(?TRACE, + { name :: binary() | undefined | '_' + , type :: clientid | topic | undefined | '_' + , topic :: emqx_types:topic() | undefined | '_' + , clientid :: emqx_types:clientid() | undefined | '_' + , packets = [] :: list() | '_' + , enable = true :: boolean() | '_' + , start_at :: integer() | undefined | binary() | '_' + , end_at :: integer() | undefined | binary() | '_' + , log_size = #{} :: map() | '_' + }). + +mnesia(boot) -> + ok = ekka_mnesia:create_table(?TRACE, [ + {type, set}, + {disc_copies, [node()]}, + {record_name, ?TRACE}, + {attributes, record_info(fields, ?TRACE)}]); +mnesia(copy) -> + ok = ekka_mnesia:copy_table(?TRACE, disc_copies). + +description() -> + "EMQ X Trace Module". + +-spec load(any()) -> ok. +load(_Env) -> + emqx_mod_sup:start_child(?MODULE, worker). + +-spec unload(any()) -> ok. +unload(_Env) -> + _ = emqx_mod_sup:stop_child(?MODULE), + stop_all_trace_handler(). + +-spec(start_link() -> emqx_types:startlink_ret()). +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +-spec list() -> [tuple()]. +list() -> + ets:match_object(?TRACE, #?TRACE{_ = '_'}). + +-spec list(boolean()) -> [tuple()]. +list(Enable) -> + ets:match_object(?TRACE, #?TRACE{enable = Enable, _ = '_'}). + +-spec create([{Key :: binary(), Value :: binary()}]) -> + ok | {error, {duplicate_condition, iodata()} | {already_existed, iodata()} | iodata()}. +create(Trace) -> + case mnesia:table_info(?TRACE, size) < ?MAX_SIZE of + true -> + case to_trace(Trace) of + {ok, TraceRec} -> create_new_trace(TraceRec); + {error, Reason} -> {error, Reason} + end; + false -> + {error, """The number of traces created has reached the maximum, +please delete the useless ones first"""} + end. + +-spec delete(Name :: binary()) -> ok | {error, not_found}. +delete(Name) -> + Tran = fun() -> + case mnesia:read(?TRACE, Name) of + [_] -> mnesia:delete(?TRACE, Name, write); + [] -> mnesia:abort(not_found) + end + end, + transaction(Tran). + +-spec clear() -> ok | {error, Reason :: term()}. +clear() -> + case mnesia:clear_table(?TRACE) of + {atomic, ok} -> ok; + {aborted, Reason} -> {error, Reason} + end. + +-spec update(Name :: binary(), Enable :: boolean()) -> + ok | {error, not_found | finished}. +update(Name, Enable) -> + Tran = fun() -> + case mnesia:read(?TRACE, Name) of + [] -> mnesia:abort(not_found); + [#?TRACE{enable = Enable}] -> ok; + [Rec] -> + case erlang:system_time(second) >= Rec#?TRACE.end_at of + false -> mnesia:write(?TRACE, Rec#?TRACE{enable = Enable}, write); + true -> mnesia:abort(finished) + end + end + end, + transaction(Tran). + +-spec get_trace_filename(Name :: binary()) -> + {ok, FileName :: string()} | {error, not_found}. +get_trace_filename(Name) -> + Tran = fun() -> + case mnesia:read(?TRACE, Name, read) of + [] -> mnesia:abort(not_found); + [#?TRACE{start_at = Start}] -> {ok, filename(Name, Start)} + end end, + transaction(Tran). + +-spec trace_file(File :: list()) -> + {ok, Node :: list(), Binary :: binary()} | + {error, Node :: list(), Reason :: term()}. +trace_file(File) -> + FileName = filename:join(trace_dir(), File), + Node = atom_to_list(node()), + case file:read_file(FileName) of + {ok, Bin} -> {ok, Node, Bin}; + {error, Reason} -> {error, Node, Reason} + end. + +delete_files_after_send(TraceLog, Zips) -> + gen_server:cast(?MODULE, {delete_tag, self(), [TraceLog | Zips]}). + +-spec format(list(#?TRACE{})) -> list(map()). +format(Traces) -> + Fields = record_info(fields, ?TRACE), + lists:map(fun(Trace0 = #?TRACE{start_at = StartAt, end_at = EndAt}) -> + Trace = Trace0#?TRACE{ + start_at = list_to_binary(calendar:system_time_to_rfc3339(StartAt)), + end_at = list_to_binary(calendar:system_time_to_rfc3339(EndAt)) + }, + [_ | Values] = tuple_to_list(Trace), + maps:from_list(lists:zip(Fields, Values)) + end, Traces). + +init([]) -> + erlang:process_flag(trap_exit, true), + OriginLogLevel = emqx_logger:get_primary_log_level(), + ok = filelib:ensure_dir(trace_dir()), + ok = filelib:ensure_dir(zip_dir()), + {ok, _} = mnesia:subscribe({table, ?TRACE, simple}), + Traces = get_enable_trace(), + ok = update_log_primary_level(Traces, OriginLogLevel), + TRef = update_trace(Traces), + {ok, #{timer => TRef, monitors => #{}, primary_log_level => OriginLogLevel}}. + +handle_call(Req, _From, State) -> + ?LOG(error, "Unexpected call: ~p", [Req]), + {reply, ok, State}. + +handle_cast({delete_tag, Pid, Files}, State = #{monitors := Monitors}) -> + erlang:monitor(process, Pid), + {noreply, State#{monitors => Monitors#{Pid => Files}}}; +handle_cast(Msg, State) -> + ?LOG(error, "Unexpected cast: ~p", [Msg]), + {noreply, State}. + +handle_info({'DOWN', _Ref, process, Pid, _Reason}, State = #{monitors := Monitors}) -> + case maps:take(Pid, Monitors) of + error -> {noreply, State}; + {Files, NewMonitors} -> + lists:foreach(fun(F) -> file:delete(F) end, Files), + {noreply, State#{monitors => NewMonitors}} + end; +handle_info({timeout, TRef, update_trace}, + #{timer := TRef, primary_log_level := OriginLogLevel} = State) -> + Traces = get_enable_trace(), + ok = update_log_primary_level(Traces, OriginLogLevel), + NextTRef = update_trace(Traces), + {noreply, State#{timer => NextTRef}}; + +handle_info({mnesia_table_event, _Events}, State = #{timer := TRef}) -> + emqx_misc:cancel_timer(TRef), + handle_info({timeout, TRef, update_trace}, State); + +handle_info(Info, State) -> + ?LOG(error, "Unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, #{timer := TRef, primary_log_level := OriginLogLevel}) -> + ok = set_log_primary_level(OriginLogLevel), + _ = mnesia:unsubscribe({table, ?TRACE, simple}), + emqx_misc:cancel_timer(TRef), + stop_all_trace_handler(), + _ = file:del_dir_r(zip_dir()), + ok. + +code_change(_, State, _Extra) -> + {ok, State}. + +create_new_trace(Trace) -> + Tran = fun() -> + case mnesia:read(?TRACE, Trace#?TRACE.name) of + [] -> + #?TRACE{start_at = StartAt, topic = Topic, + clientid = ClientId, packets = Packets} = Trace, + Match = #?TRACE{_ = '_', start_at = StartAt, topic = Topic, + clientid = ClientId, packets = Packets}, + case mnesia:match_object(?TRACE, Match, read) of + [] -> mnesia:write(?TRACE, Trace, write); + [#?TRACE{name = Name}] -> mnesia:abort({duplicate_condition, Name}) + end; + [#?TRACE{name = Name}] -> mnesia:abort({already_existed, Name}) + end + end, + transaction(Tran). + +update_trace(Traces) -> + Now = erlang:system_time(second), + {_Waiting, Running, Finished} = classify_by_time(Traces, Now), + disable_finished(Finished), + Started = already_running(), + {NeedRunning, AllStarted} = start_trace(Running, Started), + NeedStop = AllStarted -- NeedRunning, + ok = stop_trace(NeedStop, Started), + clean_stale_trace_files(), + NextTime = find_closest_time(Traces, Now), + emqx_misc:start_timer(NextTime, update_trace). + +stop_all_trace_handler() -> + lists:foreach(fun(#{type := Type, name := Name} = Trace) -> + _ = emqx_tracer:stop_trace(Type, maps:get(Type, Trace), Name) + end + , already_running()). + +already_running() -> + emqx_tracer:lookup_traces(). + +get_enable_trace() -> + {atomic, Traces} = + mnesia:transaction(fun() -> + mnesia:match_object(?TRACE, #?TRACE{enable = true, _ = '_'}, read) + end), + Traces. + +find_closest_time(Traces, Now) -> + Sec = + lists:foldl( + fun(#?TRACE{start_at = Start, end_at = End}, Closest) + when Start >= Now andalso Now < End -> %% running + min(End - Now, Closest); + (#?TRACE{start_at = Start}, Closest) when Start < Now -> %% waiting + min(Now - Start, Closest); + (_, Closest) -> Closest %% finished + end, 60 * 15, Traces), + timer:seconds(Sec). + +disable_finished([]) -> ok; +disable_finished(Traces) -> + NameWithLogSize = + lists:map(fun(#?TRACE{name = Name, start_at = StartAt}) -> + FileSize = filelib:file_size(log_file(Name, StartAt)), + {Name, FileSize} end, Traces), + transaction(fun() -> + lists:map(fun({Name, LogSize}) -> + case mnesia:read(?TRACE, Name, write) of + [] -> ok; + [Trace = #?TRACE{log_size = Logs}] -> + mnesia:write(?TRACE, Trace#?TRACE{enable = false, + log_size = Logs#{node() => LogSize}}, write) + end end, NameWithLogSize) + end). + +start_trace(Traces, Started0) -> + Started = lists:map(fun(#{name := Name}) -> Name end, Started0), + lists:foldl(fun(#?TRACE{name = Name} = Trace, {Running, StartedAcc}) -> + case lists:member(Name, StartedAcc) of + true -> {[Name | Running], StartedAcc}; + false -> + case start_trace(Trace) of + ok -> {[Name | Running], [Name | StartedAcc]}; + Error -> + ?LOG(error, "(~p)start trace failed by:~p", [Name, Error]), + {[Name | Running], StartedAcc} + end + end + end, {[], Started}, Traces). + +start_trace(Trace) -> + #?TRACE{name = Name + , type = Type + , clientid = ClientId + , topic = Topic + , packets = Packets + , start_at = Start + } = Trace, + Who0 = #{name => Name, labels => Packets}, + Who = + case Type of + topic -> Who0#{type => topic, topic => Topic}; + clientid -> Who0#{type => clientid, clientid => ClientId} + end, + case emqx_tracer:start_trace(Who, debug, log_file(Name, Start)) of + ok -> ok; + {error, {already_exist, _}} -> ok; + {error, Reason} -> {error, Reason} + end. + +stop_trace(Finished, Started) -> + lists:foreach(fun(#{name := Name, type := Type} = Trace) -> + case lists:member(Name, Finished) of + true -> emqx_tracer:stop_trace(Type, maps:get(Type, Trace), Name); + false -> ok + end + end, Started). + +clean_stale_trace_files() -> + TraceDir = trace_dir(), + case file:list_dir(TraceDir) of + {ok, AllFiles} when AllFiles =/= ["zip"] -> + FileFun = fun(#?TRACE{name = Name, start_at = StartAt}) -> filename(Name, StartAt) end, + KeepFiles = lists:map(FileFun, list()), + case AllFiles -- ["zip" | KeepFiles] of + [] -> ok; + DeleteFiles -> + DelFun = fun(F) -> file:delete(filename:join(TraceDir, F)) end, + lists:foreach(DelFun, DeleteFiles) + end; + _ -> ok + end. + +classify_by_time(Traces, Now) -> + classify_by_time(Traces, Now, [], [], []). + +classify_by_time([], _Now, Wait, Run, Finish) -> {Wait, Run, Finish}; +classify_by_time([Trace = #?TRACE{start_at = Start} | Traces], + Now, Wait, Run, Finish) when Start > Now -> + classify_by_time(Traces, Now, [Trace | Wait], Run, Finish); +classify_by_time([Trace = #?TRACE{end_at = End} | Traces], + Now, Wait, Run, Finish) when End =< Now -> + classify_by_time(Traces, Now, Wait, Run, [Trace | Finish]); +classify_by_time([Trace | Traces], Now, Wait, Run, Finish) -> + classify_by_time(Traces, Now, Wait, [Trace | Run], Finish). + +to_trace(TraceList) -> + case to_trace(TraceList, #?TRACE{}) of + {error, Reason} -> {error, Reason}; + {ok, #?TRACE{name = undefined}} -> + {error, "name required"}; + {ok, #?TRACE{type = undefined}} -> + {error, "type required"}; + {ok, #?TRACE{topic = undefined, clientid = undefined}} -> + {error, "topic/clientid cannot be both empty"}; + {ok, Trace} -> + case fill_default(Trace) of + #?TRACE{start_at = Start, end_at = End} when End =< Start -> + {error, "failed by start_at >= end_at"}; + Trace1 -> {ok, Trace1} + end + end. + +fill_default(Trace = #?TRACE{start_at = undefined}) -> + fill_default(Trace#?TRACE{start_at = erlang:system_time(second)}); +fill_default(Trace = #?TRACE{end_at = undefined, start_at = StartAt}) -> + fill_default(Trace#?TRACE{end_at = StartAt + 10 * 60}); +fill_default(Trace) -> Trace. + +to_trace([], Rec) -> {ok, Rec}; +to_trace([{<<"name">>, Name} | Trace], Rec) -> + case binary:match(Name, [<<"/">>], []) of + nomatch -> to_trace(Trace, Rec#?TRACE{name = Name}); + _ -> {error, "name cannot contain /"} + end; +to_trace([{<<"type">>, Type} | Trace], Rec) -> + case lists:member(Type, [<<"clientid">>, <<"topic">>]) of + true -> to_trace(Trace, Rec#?TRACE{type = binary_to_existing_atom(Type)}); + false -> {error, "incorrect type: only support clientid/topic"} + end; +to_trace([{<<"topic">>, Topic} | Trace], Rec) -> + case validate_topic(Topic) of + ok -> to_trace(Trace, Rec#?TRACE{topic = Topic}); + {error, Reason} -> {error, Reason} + end; +to_trace([{<<"start_at">>, StartAt} | Trace], Rec) -> + case to_system_second(StartAt) of + {ok, Sec} -> to_trace(Trace, Rec#?TRACE{start_at = Sec}); + {error, Reason} -> {error, Reason} + end; +to_trace([{<<"end_at">>, EndAt} | Trace], Rec) -> + Now = erlang:system_time(second), + case to_system_second(EndAt) of + {ok, Sec} when Sec > Now -> + to_trace(Trace, Rec#?TRACE{end_at = Sec}); + {ok, _Sec} -> + {error, "end_at time has already passed"}; + {error, Reason} -> + {error, Reason} + end; +to_trace([{<<"clientid">>, ClientId} | Trace], Rec) -> + to_trace(Trace, Rec#?TRACE{clientid = ClientId}); +to_trace([{<<"packets">>, PacketList} | Trace], Rec) -> + case to_packets(PacketList) of + {ok, Packets} -> to_trace(Trace, Rec#?TRACE{packets = Packets}); + {error, Reason} -> {error, io_lib:format("unsupport packets: ~p", [Reason])} + end; +to_trace([Unknown | _Trace], _Rec) -> {error, io_lib:format("unknown field: ~p", [Unknown])}. + +validate_topic(TopicName) -> + try emqx_topic:validate(name, TopicName) of + true -> ok + catch + error:Error -> + {error, io_lib:format("~s invalid by ~p", [TopicName, Error])} + end. + +to_system_second(At) -> + try + Sec = calendar:rfc3339_to_system_time(binary_to_list(At), [{unit, second}]), + {ok, Sec} + catch error: {badmatch, _} -> + {error, ["The rfc3339 specification not satisfied: ", At]} + end. + +to_packets(Packets) when is_list(Packets) -> + AtomTypes = lists:map(fun(Type) -> binary_to_existing_atom(Type) end, Packets), + case lists:filter(fun(T) -> not lists:member(T, ?PACKETS) end, AtomTypes) of + [] -> {ok, AtomTypes}; + InvalidE -> {error, InvalidE} + end; +to_packets(Packets) -> {error, Packets}. + +zip_dir() -> + trace_dir() ++ "zip/". + +trace_dir() -> + filename:join(emqx:get_env(data_dir), "trace") ++ "/". + +log_file(Name, Start) -> + filename:join(trace_dir(), filename(Name, Start)). + +filename(Name, Start) -> + [Time, _] = string:split(calendar:system_time_to_rfc3339(Start), "T", leading), + lists:flatten(["trace_", binary_to_list(Name), "_", Time, ".log"]). + +transaction(Tran) -> + case mnesia:transaction(Tran) of + {atomic, Res} -> Res; + {aborted, Reason} -> {error, Reason} + end. + +update_log_primary_level([], OriginLevel) -> set_log_primary_level(OriginLevel); +update_log_primary_level(_, _) -> set_log_primary_level(debug). + +set_log_primary_level(NewLevel) -> + case NewLevel =/= emqx_logger:get_primary_log_level() of + true -> emqx_logger:set_primary_log_level(NewLevel); + false -> ok + end. diff --git a/lib-ce/emqx_modules/src/emqx_mod_trace_api.erl b/lib-ce/emqx_modules/src/emqx_mod_trace_api.erl new file mode 100644 index 000000000..6d2813b96 --- /dev/null +++ b/lib-ce/emqx_modules/src/emqx_mod_trace_api.erl @@ -0,0 +1,191 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mod_trace_api). +-include_lib("emqx/include/logger.hrl"). + +%% API +-export([ list_trace/2 + , create_trace/2 + , update_trace/2 + , delete_trace/2 + , clear_traces/2 + , download_zip_log/2 + , stream_log_file/2 +]). +-export([read_trace_file/3]). + +-define(TO_BIN(_B_), iolist_to_binary(_B_)). +-define(RETURN_NOT_FOUND(N), return({error, 'NOT_FOUND', ?TO_BIN([N, "NOT FOUND"])})). + +-import(minirest, [return/1]). + +-rest_api(#{name => list_trace, + method => 'GET', + path => "/trace/", + func => list_trace, + descr => "list all traces"}). + +-rest_api(#{name => create_trace, + method => 'POST', + path => "/trace/", + func => create_trace, + descr => "create trace"}). + +-rest_api(#{name => delete_trace, + method => 'DELETE', + path => "/trace/:bin:name", + func => delete_trace, + descr => "delete trace"}). + +-rest_api(#{name => clear_trace, + method => 'DELETE', + path => "/trace/", + func => clear_traces, + descr => "clear all traces"}). + +-rest_api(#{name => update_trace, + method => 'PUT', + path => "/trace/:bin:name/:atom:operation", + func => update_trace, + descr => "diable/enable trace"}). + +-rest_api(#{name => download_zip_log, + method => 'GET', + path => "/trace/:bin:name/download", + func => download_zip_log, + descr => "download trace's log"}). + +-rest_api(#{name => stream_log_file, + method => 'GET', + path => "/trace/:bin:name/log", + func => stream_log_file, + descr => "download trace's log"}). + +list_trace(_, Params) -> + List = + case Params of + [{<<"enable">>, Enable}] -> emqx_mod_trace:list(Enable); + _ -> emqx_mod_trace:list() + end, + return({ok, emqx_mod_trace:format(List)}). + +create_trace(_, Param) -> + case emqx_mod_trace:create(Param) of + ok -> return(ok); + {error, {already_existed, Name}} -> + return({error, 'ALREADY_EXISTED', ?TO_BIN([Name, "Already Exists"])}); + {error, {duplicate_condition, Name}} -> + return({error, 'DUPLICATE_CONDITION', ?TO_BIN([Name, "Duplication Condition"])}); + {error, Reason} -> + return({error, 'INCORRECT_PARAMS', ?TO_BIN(Reason)}) + end. + +delete_trace(#{name := Name}, _Param) -> + case emqx_mod_trace:delete(Name) of + ok -> return(ok); + {error, not_found} -> ?RETURN_NOT_FOUND(Name) + end. + +clear_traces(_, _) -> + return(emqx_mod_trace:clear()). + +update_trace(#{name := Name, operation := Operation}, _Param) -> + Enable = case Operation of disable -> false; enable -> true end, + case emqx_mod_trace:update(Name, Enable) of + ok -> return({ok, #{enable => Enable, name => Name}}); + {error, not_found} -> ?RETURN_NOT_FOUND(Name) + end. + +%% if HTTP request headers include accept-encoding: gzip and file size > 300 bytes. +%% cowboy_compress_h will auto encode gzip format. +download_zip_log(#{name := Name}, _Param) -> + case emqx_mod_trace:get_trace_filename(Name) of + {ok, TraceLog} -> + TraceFiles = collect_trace_file(TraceLog), + ZipDir = emqx_mod_trace:zip_dir(), + Zips = group_trace_file(ZipDir, TraceLog, TraceFiles), + ZipFileName = ZipDir ++ TraceLog, + {ok, ZipFile} = zip:zip(ZipFileName, Zips), + emqx_mod_trace:delete_files_after_send(ZipFileName, Zips), + {ok, #{}, {sendfile, 0, filelib:file_size(ZipFile), ZipFile}}; + {error, Reason} -> + return({error, Reason}) + end. + +group_trace_file(ZipDir, TraceLog, TraceFiles) -> + lists:foldl(fun(Res, Acc) -> + case Res of + {ok, Node, Bin} -> + ZipName = ZipDir ++ Node ++ "-" ++ TraceLog, + ok = file:write_file(ZipName, Bin), + [ZipName | Acc]; + {error, Node, Reason} -> + ?LOG(error, "download trace log error:~p", [{Node, TraceLog, Reason}]), + Acc + end + end, [], TraceFiles). + +collect_trace_file(TraceLog) -> + Nodes = ekka_mnesia:running_nodes(), + {Files, BadNodes} = rpc:multicall(Nodes, emqx_mod_trace, trace_file, [TraceLog], 60000), + BadNodes =/= [] andalso ?LOG(error, "download log rpc failed on ~p", [BadNodes]), + Files. + +%% _page as position and _limit as bytes for front-end reusing components +stream_log_file(#{name := Name}, Params) -> + Node0 = proplists:get_value(<<"node">>, Params, atom_to_binary(node())), + Position0 = proplists:get_value(<<"_page">>, Params, <<"0">>), + Bytes0 = proplists:get_value(<<"_limit">>, Params, <<"500">>), + Node = binary_to_existing_atom(Node0), + Position = binary_to_integer(Position0), + Bytes = binary_to_integer(Bytes0), + case rpc:call(Node, ?MODULE, read_trace_file, [Name, Position, Bytes]) of + {ok, Bin} -> + Meta = #{<<"page">> => Position + byte_size(Bin), <<"limit">> => Bytes}, + return({ok, #{meta => Meta, items => Bin}}); + eof -> + Meta = #{<<"page">> => Position, <<"limit">> => Bytes}, + return({ok, #{meta => Meta, items => <<"">>}}); + {error, Reason} -> + logger:log(error, "read_file_failed by ~p", [{Name, Reason, Position, Bytes}]), + return({error, Reason}) + end. + +%% this is an rpc call for stream_log_file/2 +read_trace_file(Name, Position, Limit) -> + TraceDir = emqx_mod_trace:trace_dir(), + {ok, AllFiles} = file:list_dir(TraceDir), + TracePrefix = "trace_" ++ binary_to_list(Name) ++ "_", + Filter = fun(FileName) -> nomatch =/= string:prefix(FileName, TracePrefix) end, + case lists:filter(Filter, AllFiles) of + [TraceFile] -> + TracePath = filename:join([TraceDir, TraceFile]), + read_file(TracePath, Position, Limit); + [] -> {error, not_found} + end. + +read_file(Path, Offset, Bytes) -> + {ok, IoDevice} = file:open(Path, [read, raw, binary]), + try + _ = case Offset of + 0 -> ok; + _ -> file:position(IoDevice, {bof, Offset}) + end, + file:read(IoDevice, Bytes) + after + file:close(IoDevice) + end. diff --git a/lib-ce/emqx_modules/test/emqx_mod_rewrite_SUITE.erl b/lib-ce/emqx_modules/test/emqx_mod_rewrite_SUITE.erl index 997eff1c2..e6a9f6c16 100644 --- a/lib-ce/emqx_modules/test/emqx_mod_rewrite_SUITE.erl +++ b/lib-ce/emqx_modules/test/emqx_mod_rewrite_SUITE.erl @@ -62,7 +62,7 @@ t_mod_rewrite(_Config) -> timer:sleep(100), ?assertEqual([], emqx_broker:subscriptions(<<"rewrite_client">>)), %% Pub Rules - {ok, _Props, _} = emqtt:subscribe(C, [{Topic, ?QOS_1} || Topic <- PubDestTopics]), + {ok, _Props1, _} = emqtt:subscribe(C, [{Topic, ?QOS_1} || Topic <- PubDestTopics]), RecvTopics2 = [begin ok = emqtt:publish(C, Topic, <<"payload">>), {ok, #{topic := RecvTopic}} = receive_publish(100), diff --git a/lib-ce/emqx_modules/test/emqx_mod_sup_SUITE.erl b/lib-ce/emqx_modules/test/emqx_mod_sup_SUITE.erl index 59d0ffde2..7c666ea9a 100644 --- a/lib-ce/emqx_modules/test/emqx_mod_sup_SUITE.erl +++ b/lib-ce/emqx_modules/test/emqx_mod_sup_SUITE.erl @@ -41,9 +41,8 @@ t_start_child(_) -> modules => [Mod]}, ok = emqx_mod_sup:start_child(Mod, worker), - ?assertError({already_started, _}, emqx_mod_sup:start_child(Spec)), + ?assertEqual(ok, emqx_mod_sup:start_child(Spec)), ok = emqx_mod_sup:stop_child(Mod), {error, not_found} = emqx_mod_sup:stop_child(Mod), ok. - diff --git a/lib-ce/emqx_modules/test/emqx_mod_trace_SUITE.erl b/lib-ce/emqx_modules/test/emqx_mod_trace_SUITE.erl new file mode 100644 index 000000000..c643c7908 --- /dev/null +++ b/lib-ce/emqx_modules/test/emqx_mod_trace_SUITE.erl @@ -0,0 +1,478 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mod_trace_SUITE). + +%% API +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("emqx/include/emqx.hrl"). + +-define(HOST, "http://127.0.0.1:18083/"). +-define(API_VERSION, "v4"). +-define(BASE_PATH, "api"). + +-record(emqx_mod_trace, { + name, + type, + topic, + clientid, + packets = [], + enable = true, + start_at, + end_at, + log_size = #{} + }). + +-define(PACKETS, ['CONNECT', 'CONNACK', 'PUBLISH', 'PUBACK', 'PUBREC', 'PUBREL' + , 'PUBCOMP', 'SUBSCRIBE', 'SUBACK', 'UNSUBSCRIBE', 'UNSUBACK' + , 'PINGREQ', 'PINGRESP', 'DISCONNECT', 'AUTH']). + +%%-------------------------------------------------------------------- +%% Setups +%%-------------------------------------------------------------------- + +all() -> + emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + emqx_ct_helpers:start_apps([emqx_modules, emqx_dashboard]), + Config. + +end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([emqx_modules, emqx_dashboard]). + +t_base_create_delete(_Config) -> + ok = emqx_mod_trace:clear(), + Now = erlang:system_time(second), + Start = to_rfc3339(Now), + End = to_rfc3339(Now + 30 * 60), + Name = <<"name1">>, + ClientId = <<"test-device">>, + Packets = [atom_to_binary(E) || E <- ?PACKETS], + Trace = [ + {<<"name">>, Name}, + {<<"type">>, <<"clientid">>}, + {<<"clientid">>, ClientId}, + {<<"packets">>, Packets}, + {<<"start_at">>, Start}, + {<<"end_at">>, End} + ], + AnotherTrace = lists:keyreplace(<<"name">>, 1, Trace, {<<"name">>, <<"AnotherName">>}), + ok = emqx_mod_trace:create(Trace), + ?assertEqual({error, {already_existed, Name}}, emqx_mod_trace:create(Trace)), + ?assertEqual({error, {duplicate_condition, Name}}, emqx_mod_trace:create(AnotherTrace)), + [TraceRec] = emqx_mod_trace:list(), + Expect = #emqx_mod_trace{ + name = Name, + type = clientid, + topic = undefined, + clientid = ClientId, + packets = ?PACKETS, + start_at = Now, + end_at = Now + 30 * 60 + }, + ?assertEqual(Expect, TraceRec), + ExpectFormat = [ + #{ + clientid => <<"test-device">>, + enable => true, + type => clientid, + packets => ?PACKETS, + name => <<"name1">>, + start_at => Start, + end_at => End, + log_size => #{}, + topic => undefined + } + ], + ?assertEqual(ExpectFormat, emqx_mod_trace:format([TraceRec])), + ?assertEqual(ok, emqx_mod_trace:delete(Name)), + ?assertEqual({error, not_found}, emqx_mod_trace:delete(Name)), + ?assertEqual([], emqx_mod_trace:list()), + ok. + +t_create_size_max(_Config) -> + emqx_mod_trace:clear(), + lists:map(fun(Seq) -> + Name = list_to_binary("name" ++ integer_to_list(Seq)), + Trace = [{<<"name">>, Name}, {<<"type">>, <<"topic">>}, + {<<"packets">>, [<<"PUBLISH">>]}, + {<<"topic">>, list_to_binary("/x/y/" ++ integer_to_list(Seq))}], + ok = emqx_mod_trace:create(Trace) + end, lists:seq(1, 30)), + Trace31 = [{<<"name">>, <<"name31">>}, {<<"type">>, <<"topic">>}, + {<<"packets">>, [<<"PUBLISH">>]}, {<<"topic">>, <<"/x/y/31">>}], + {error, _} = emqx_mod_trace:create(Trace31), + ok = emqx_mod_trace:delete(<<"name30">>), + ok = emqx_mod_trace:create(Trace31), + ?assertEqual(30, erlang:length(emqx_mod_trace:list())), + ok. + +t_create_failed(_Config) -> + ok = emqx_mod_trace:clear(), + UnknownField = [{<<"unknown">>, 12}], + {error, Reason1} = emqx_mod_trace:create(UnknownField), + ?assertEqual(<<"unknown field: {<<\"unknown\">>,12}">>, iolist_to_binary(Reason1)), + + InvalidTopic = [{<<"topic">>, "#/#//"}], + {error, Reason2} = emqx_mod_trace:create(InvalidTopic), + ?assertEqual(<<"#/#// invalid by function_clause">>, iolist_to_binary(Reason2)), + + InvalidStart = [{<<"start_at">>, <<"2021-12-3:12">>}], + {error, Reason3} = emqx_mod_trace:create(InvalidStart), + ?assertEqual(<<"The rfc3339 specification not satisfied: 2021-12-3:12">>, + iolist_to_binary(Reason3)), + + InvalidEnd = [{<<"end_at">>, <<"2021-12-3:12">>}], + {error, Reason4} = emqx_mod_trace:create(InvalidEnd), + ?assertEqual(<<"The rfc3339 specification not satisfied: 2021-12-3:12">>, + iolist_to_binary(Reason4)), + + InvalidPackets = [{<<"packets">>, [<<"publish">>]}], + {error, Reason5} = emqx_mod_trace:create(InvalidPackets), + ?assertEqual(<<"unsupport packets: [publish]">>, iolist_to_binary(Reason5)), + + InvalidPackets2 = [{<<"packets">>, <<"publish">>}], + {error, Reason6} = emqx_mod_trace:create(InvalidPackets2), + ?assertEqual(<<"unsupport packets: <<\"publish\">>">>, iolist_to_binary(Reason6)), + + {error, Reason7} = emqx_mod_trace:create([{<<"name">>, <<"test">>}, {<<"type">>, <<"clientid">>}]), + ?assertEqual(<<"topic/clientid cannot be both empty">>, iolist_to_binary(Reason7)), + + InvalidPackets4 = [{<<"name">>, <<"/test">>}, {<<"clientid">>, <<"t">>}, {<<"type">>, <<"clientid">>}], + {error, Reason9} = emqx_mod_trace:create(InvalidPackets4), + ?assertEqual(<<"name cannot contain /">>, iolist_to_binary(Reason9)), + + ?assertEqual({error, "type required"}, emqx_mod_trace:create([{<<"name">>, <<"test-name">>}, + {<<"packets">>, []}, {<<"clientid">>, <<"good">>}])), + + ?assertEqual({error, "incorrect type: only support clientid/topic"}, + emqx_mod_trace:create([{<<"name">>, <<"test-name">>}, + {<<"packets">>, []}, {<<"clientid">>, <<"good">>}, {<<"type">>, <<"typeerror">> }])), + ok. + +t_create_default(_Config) -> + ok = emqx_mod_trace:clear(), + {error, "name required"} = emqx_mod_trace:create([]), + ok = emqx_mod_trace:create([{<<"name">>, <<"test-name">>}, + {<<"type">>, <<"clientid">>}, {<<"packets">>, []}, {<<"clientid">>, <<"good">>}]), + [#emqx_mod_trace{packets = Packets}] = emqx_mod_trace:list(), + ?assertEqual([], Packets), + ok = emqx_mod_trace:clear(), + Trace = [ + {<<"name">>, <<"test-name">>}, + {<<"packets">>, [<<"PUBLISH">>]}, + {<<"type">>, <<"topic">>}, + {<<"topic">>, <<"/x/y/z">>}, + {<<"start_at">>, <<"2021-10-28T10:54:47+08:00">>}, + {<<"end_at">>, <<"2021-10-27T10:54:47+08:00">>} + ], + {error, "end_at time has already passed"} = emqx_mod_trace:create(Trace), + Now = erlang:system_time(second), + Trace2 = [ + {<<"name">>, <<"test-name">>}, + {<<"type">>, <<"topic">>}, + {<<"packets">>, [<<"PUBLISH">>]}, + {<<"topic">>, <<"/x/y/z">>}, + {<<"start_at">>, to_rfc3339(Now + 10)}, + {<<"end_at">>, to_rfc3339(Now + 3)} + ], + {error, "failed by start_at >= end_at"} = emqx_mod_trace:create(Trace2), + ok = emqx_mod_trace:create([{<<"name">>, <<"test-name">>}, + {<<"type">>, <<"topic">>}, + {<<"packets">>, [<<"PUBLISH">>]}, {<<"topic">>, <<"/x/y/z">>}]), + [#emqx_mod_trace{start_at = Start, end_at = End}] = emqx_mod_trace:list(), + ?assertEqual(10 * 60, End - Start), + ?assertEqual(true, Start - erlang:system_time(second) < 5), + ok. + +t_update_enable(_Config) -> + ok = emqx_mod_trace:clear(), + Name = <<"test-name">>, + Now = erlang:system_time(second), + End = list_to_binary(calendar:system_time_to_rfc3339(Now + 2)), + ok = emqx_mod_trace:create([{<<"name">>, Name}, {<<"packets">>, [<<"PUBLISH">>]}, + {<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/z">>}, {<<"end_at">>, End}]), + [#emqx_mod_trace{enable = Enable}] = emqx_mod_trace:list(), + ?assertEqual(Enable, true), + ok = emqx_mod_trace:update(Name, false), + [#emqx_mod_trace{enable = false}] = emqx_mod_trace:list(), + ok = emqx_mod_trace:update(Name, false), + [#emqx_mod_trace{enable = false}] = emqx_mod_trace:list(), + ok = emqx_mod_trace:update(Name, true), + [#emqx_mod_trace{enable = true}] = emqx_mod_trace:list(), + ok = emqx_mod_trace:update(Name, false), + [#emqx_mod_trace{enable = false}] = emqx_mod_trace:list(), + ?assertEqual({error, not_found}, emqx_mod_trace:update(<<"Name not found">>, true)), + ct:sleep(2100), + ?assertEqual({error, finished}, emqx_mod_trace:update(Name, true)), + ok. + +t_load_state(_Config) -> + emqx_mod_trace:clear(), + ok = emqx_mod_trace:load(test), + Now = erlang:system_time(second), + Running = [{<<"name">>, <<"Running">>}, {<<"type">>, <<"topic">>}, + {<<"topic">>, <<"/x/y/1">>}, {<<"start_at">>, to_rfc3339(Now - 1)}, {<<"end_at">>, to_rfc3339(Now + 2)}], + Waiting = [{<<"name">>, <<"Waiting">>}, {<<"type">>, <<"topic">>}, + {<<"topic">>, <<"/x/y/2">>}, {<<"start_at">>, to_rfc3339(Now + 3)}, {<<"end_at">>, to_rfc3339(Now + 8)}], + Finished = [{<<"name">>, <<"Finished">>}, {<<"type">>, <<"topic">>}, + {<<"topic">>, <<"/x/y/3">>}, {<<"start_at">>, to_rfc3339(Now - 5)}, {<<"end_at">>, to_rfc3339(Now)}], + ok = emqx_mod_trace:create(Running), + ok = emqx_mod_trace:create(Waiting), + {error, "end_at time has already passed"} = emqx_mod_trace:create(Finished), + Traces = emqx_mod_trace:format(emqx_mod_trace:list()), + ?assertEqual(2, erlang:length(Traces)), + Enables = lists:map(fun(#{name := Name, enable := Enable}) -> {Name, Enable} end, Traces), + ExpectEnables = [{<<"Running">>, true}, {<<"Waiting">>, true}], + ?assertEqual(ExpectEnables, lists:sort(Enables)), + ct:sleep(3500), + Traces2 = emqx_mod_trace:format(emqx_mod_trace:list()), + ?assertEqual(2, erlang:length(Traces2)), + Enables2 = lists:map(fun(#{name := Name, enable := Enable}) -> {Name, Enable} end, Traces2), + ExpectEnables2 = [{<<"Running">>, false}, {<<"Waiting">>, true}], + ?assertEqual(ExpectEnables2, lists:sort(Enables2)), + ok = emqx_mod_trace:unload(test), + ok. + +t_client_event(_Config) -> + application:set_env(emqx, allow_anonymous, true), + emqx_mod_trace:clear(), + ClientId = <<"client-test">>, + ok = emqx_mod_trace:load(test), + Now = erlang:system_time(second), + Start = to_rfc3339(Now), + Name = <<"test_client_id_event">>, + ok = emqx_mod_trace:create([{<<"name">>, Name}, + {<<"type">>, <<"clientid">>}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]), + ct:sleep(200), + {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]), + {ok, _} = emqtt:connect(Client), + emqtt:ping(Client), + ok = emqtt:publish(Client, <<"/test">>, #{}, <<"1">>, [{qos, 0}]), + ok = emqtt:publish(Client, <<"/test">>, #{}, <<"2">>, [{qos, 0}]), + ct:sleep(200), + ok = emqx_mod_trace:create([{<<"name">>, <<"test_topic">>}, + {<<"type">>, <<"topic">>}, {<<"topic">>, <<"/test">>}, {<<"start_at">>, Start}]), + ct:sleep(200), + {ok, Bin} = file:read_file(emqx_mod_trace:log_file(Name, Now)), + ok = emqtt:publish(Client, <<"/test">>, #{}, <<"3">>, [{qos, 0}]), + ok = emqtt:publish(Client, <<"/test">>, #{}, <<"4">>, [{qos, 0}]), + ok = emqtt:disconnect(Client), + ct:sleep(200), + {ok, Bin2} = file:read_file(emqx_mod_trace:log_file(Name, Now)), + {ok, Bin3} = file:read_file(emqx_mod_trace:log_file(<<"test_topic">>, Now)), + ct:pal("Bin ~p Bin2 ~p Bin3 ~p", [byte_size(Bin), byte_size(Bin2), byte_size(Bin3)]), + ?assert(erlang:byte_size(Bin) > 0), + ?assert(erlang:byte_size(Bin) < erlang:byte_size(Bin2)), + ?assert(erlang:byte_size(Bin3) > 0), + ok = emqx_mod_trace:unload(test), + ok. + +t_get_log_filename(_Config) -> + ok = emqx_mod_trace:clear(), + ok = emqx_mod_trace:load(test), + Now = erlang:system_time(second), + Start = calendar:system_time_to_rfc3339(Now), + End = calendar:system_time_to_rfc3339(Now + 2), + Name = <<"name1">>, + ClientId = <<"test-device">>, + Packets = [atom_to_binary(E) || E <- ?PACKETS], + Trace = [ + {<<"name">>, Name}, + {<<"type">>, <<"clientid">>}, + {<<"clientid">>, ClientId}, + {<<"packets">>, Packets}, + {<<"start_at">>, list_to_binary(Start)}, + {<<"end_at">>, list_to_binary(End)} + ], + ok = emqx_mod_trace:create(Trace), + ?assertEqual({error, not_found}, emqx_mod_trace:get_trace_filename(<<"test">>)), + ?assertEqual(ok, element(1, emqx_mod_trace:get_trace_filename(Name))), + ct:sleep(3000), + ?assertEqual(ok, element(1, emqx_mod_trace:get_trace_filename(Name))), + ok = emqx_mod_trace:unload(test), + ok. + +t_trace_file(_Config) -> + FileName = "test.log", + Content = <<"test \n test">>, + TraceDir = emqx_mod_trace:trace_dir(), + File = filename:join(TraceDir, FileName), + ok = file:write_file(File, Content), + {ok, Node, Bin} = emqx_mod_trace:trace_file(FileName), + ?assertEqual(Node, atom_to_list(node())), + ?assertEqual(Content, Bin), + ok = file:delete(File), + ok. + +t_http_test(_Config) -> + emqx_mod_trace:clear(), + emqx_mod_trace:load(test), + Header = auth_header_(), + %% list + {ok, Empty} = request_api(get, api_path("trace"), Header), + ?assertEqual(#{<<"code">> => 0, <<"data">> => []}, json(Empty)), + %% create + ErrorTrace = #{}, + {ok, Error} = request_api(post, api_path("trace"), Header, ErrorTrace), + ?assertEqual(#{<<"message">> => <<"unknown field: {}">>, <<"code">> => <<"INCORRECT_PARAMS">>}, json(Error)), + + Name = <<"test-name">>, + Trace = [ + {<<"name">>, Name}, + {<<"type">>, <<"topic">>}, + {<<"packets">>, [<<"PUBLISH">>]}, + {<<"topic">>, <<"/x/y/z">>} + ], + + {ok, Create} = request_api(post, api_path("trace"), Header, Trace), + ?assertEqual(#{<<"code">> => 0}, json(Create)), + + {ok, List} = request_api(get, api_path("trace"), Header), + #{<<"code">> := 0, <<"data">> := [Data]} = json(List), + ?assertEqual(Name, maps:get(<<"name">>, Data)), + + %% update + {ok, Update} = request_api(put, api_path("trace/test-name/disable"), Header, #{}), + ?assertEqual(#{<<"code">> => 0, + <<"data">> => #{<<"enable">> => false, + <<"name">> => <<"test-name">>}}, json(Update)), + + {ok, List1} = request_api(get, api_path("trace"), Header), + #{<<"code">> := 0, <<"data">> := [Data1]} = json(List1), + ?assertEqual(false, maps:get(<<"enable">>, Data1)), + + %% delete + {ok, Delete} = request_api(delete, api_path("trace/test-name"), Header), + ?assertEqual(#{<<"code">> => 0}, json(Delete)), + + {ok, DeleteNotFound} = request_api(delete, api_path("trace/test-name"), Header), + ?assertEqual(#{<<"code">> => <<"NOT_FOUND">>, + <<"message">> => <<"test-nameNOT FOUND">>}, json(DeleteNotFound)), + + {ok, List2} = request_api(get, api_path("trace"), Header), + ?assertEqual(#{<<"code">> => 0, <<"data">> => []}, json(List2)), + + %% clear + {ok, Create1} = request_api(post, api_path("trace"), Header, Trace), + ?assertEqual(#{<<"code">> => 0}, json(Create1)), + + {ok, Clear} = request_api(delete, api_path("trace"), Header), + ?assertEqual(#{<<"code">> => 0}, json(Clear)), + + emqx_mod_trace:unload(test), + ok. + +t_download_log(_Config) -> + emqx_mod_trace:clear(), + emqx_mod_trace:load(test), + ClientId = <<"client-test">>, + Now = erlang:system_time(second), + Start = to_rfc3339(Now), + Name = <<"test_client_id">>, + ok = emqx_mod_trace:create([{<<"name">>, Name}, + {<<"type">>, <<"clientid">>}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]), + {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]), + {ok, _} = emqtt:connect(Client), + [begin _ = emqtt:ping(Client) end ||_ <- lists:seq(1, 5)], + ct:sleep(100), + {ok, #{}, {sendfile, 0, ZipFileSize, _ZipFile}} = + emqx_mod_trace_api:download_zip_log(#{name => Name}, []), + ?assert(ZipFileSize > 0), + %% download zip file failed by server_closed occasionally? + %Header = auth_header_(), + %{ok, ZipBin} = request_api(get, api_path("trace/test_client_id/download"), Header), + %{ok, ZipHandler} = zip:zip_open(ZipBin), + %{ok, [ZipName]} = zip:zip_get(ZipHandler), + %?assertNotEqual(nomatch, string:find(ZipName, "test@127.0.0.1")), + %{ok, _} = file:read_file(emqx_mod_trace:log_file(<<"test_client_id">>, Now)), + ok = emqtt:disconnect(Client), + emqx_mod_trace:unload(test), + ok. + +t_stream_log(_Config) -> + application:set_env(emqx, allow_anonymous, true), + emqx_mod_trace:clear(), + emqx_mod_trace:load(test), + ClientId = <<"client-stream">>, + Now = erlang:system_time(second), + Name = <<"test_stream_log">>, + Start = to_rfc3339(Now - 10), + ok = emqx_mod_trace:create([{<<"name">>, Name}, + {<<"type">>, <<"clientid">>}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]), + ct:sleep(200), + {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]), + {ok, _} = emqtt:connect(Client), + [begin _ = emqtt:ping(Client) end ||_ <- lists:seq(1, 5)], + emqtt:publish(Client, <<"/good">>, #{}, <<"ghood1">>, [{qos, 0}]), + emqtt:publish(Client, <<"/good">>, #{}, <<"ghood2">>, [{qos, 0}]), + ok = emqtt:disconnect(Client), + ct:sleep(200), + File = emqx_mod_trace:log_file(Name, Now), + ct:pal("FileName: ~p", [File]), + {ok, FileBin} = file:read_file(File), + ct:pal("FileBin: ~p ~s", [byte_size(FileBin), FileBin]), + Header = auth_header_(), + {ok, Binary} = request_api(get, api_path("trace/test_stream_log/log?_limit=10"), Header), + #{<<"code">> := 0, <<"data">> := #{<<"meta">> := Meta, <<"items">> := Bin}} = json(Binary), + ?assertEqual(10, byte_size(Bin)), + ?assertEqual(#{<<"page">> => 10, <<"limit">> => 10}, Meta), + {ok, Binary1} = request_api(get, api_path("trace/test_stream_log/log?_page=20&_limit=10"), Header), + #{<<"code">> := 0, <<"data">> := #{<<"meta">> := Meta1, <<"items">> := Bin1}} = json(Binary1), + ?assertEqual(#{<<"page">> => 30, <<"limit">> => 10}, Meta1), + ?assertEqual(10, byte_size(Bin1)), + emqx_mod_trace:unload(test), + ok. + +to_rfc3339(Second) -> + list_to_binary(calendar:system_time_to_rfc3339(Second)). + +auth_header_() -> + auth_header_("admin", "public"). + +auth_header_(User, Pass) -> + Encoded = base64:encode_to_string(lists:append([User, ":", Pass])), + {"Authorization", "Basic " ++ Encoded}. + +request_api(Method, Url, Auth) -> do_request_api(Method, {Url, [Auth]}). + +request_api(Method, Url, Auth, Body) -> + Request = {Url, [Auth], "application/json", emqx_json:encode(Body)}, + do_request_api(Method, Request). + +do_request_api(Method, Request) -> + ct:pal("Method: ~p, Request: ~p", [Method, Request]), + case httpc:request(Method, Request, [], [{body_format, binary}]) of + {error, socket_closed_remotely} -> + {error, socket_closed_remotely}; + {error,{shutdown, server_closed}} -> + {error, server_closed}; + {ok, {{"HTTP/1.1", Code, _}, _Headers, Return} } + when Code =:= 200 orelse Code =:= 201 -> + {ok, Return}; + {ok, {Reason, _, _}} -> + {error, Reason} + end. + +api_path(Path) -> + ?HOST ++ filename:join([?BASE_PATH, ?API_VERSION, Path]). + +json(Data) -> + {ok, Jsx} = emqx_json:safe_decode(Data, [return_maps]), Jsx. diff --git a/priv/emqx.schema b/priv/emqx.schema index febd4cb9e..69f3acc41 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -2259,6 +2259,7 @@ end}. [{emqx_mod_rewrite, Rewrites()}], [{emqx_mod_topic_metrics, []}], [{emqx_mod_delayed, []}], + [{emqx_mod_trace, []}], [{emqx_mod_st_statistics, SlowTopic()}], [{emqx_mod_acl_internal, [{acl_file, cuttlefish:conf_get("acl_file", Conf1)}]}] ]) diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index d3ad128bb..14617e2e1 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -82,7 +82,7 @@ -define(SUBSCRIPTION, emqx_subscription). %% Guards --define(is_subid(Id), (is_binary(Id) orelse is_atom(Id))). +-define(IS_SUBID(Id), (is_binary(Id) orelse is_atom(Id))). -spec(start_link(atom(), pos_integer()) -> startlink_ret()). start_link(Pool, Id) -> @@ -118,14 +118,15 @@ subscribe(Topic) when is_binary(Topic) -> subscribe(Topic, undefined). -spec(subscribe(emqx_topic:topic(), emqx_types:subid() | emqx_types:subopts()) -> ok). -subscribe(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) -> +subscribe(Topic, SubId) when is_binary(Topic), ?IS_SUBID(SubId) -> subscribe(Topic, SubId, ?DEFAULT_SUBOPTS); subscribe(Topic, SubOpts) when is_binary(Topic), is_map(SubOpts) -> subscribe(Topic, undefined, SubOpts). -spec(subscribe(emqx_topic:topic(), emqx_types:subid(), emqx_types:subopts()) -> ok). -subscribe(Topic, SubId, SubOpts0) when is_binary(Topic), ?is_subid(SubId), is_map(SubOpts0) -> +subscribe(Topic, SubId, SubOpts0) when is_binary(Topic), ?IS_SUBID(SubId), is_map(SubOpts0) -> SubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts0), + _ = emqx_tracer:trace_subscribe(Topic, SubId, SubOpts), case ets:member(?SUBOPTION, {SubPid = self(), Topic}) of false -> %% New ok = emqx_broker_helper:register_sub(SubPid, SubId), @@ -171,6 +172,7 @@ unsubscribe(Topic) when is_binary(Topic) -> SubPid = self(), case ets:lookup(?SUBOPTION, {SubPid, Topic}) of [{_, SubOpts}] -> + emqx_tracer:trace_unsubscribe(Topic, SubOpts), _ = emqx_broker_helper:reclaim_seq(Topic), do_unsubscribe(Topic, SubPid, SubOpts); [] -> ok @@ -183,13 +185,7 @@ do_unsubscribe(Topic, SubPid, SubOpts) -> do_unsubscribe(Group, Topic, SubPid, SubOpts). do_unsubscribe(undefined, Topic, SubPid, SubOpts) -> - case maps:get(shard, SubOpts, 0) of - 0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), - cast(pick(Topic), {unsubscribed, Topic}); - I -> true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), - cast(pick({Topic, I}), {unsubscribed, Topic, I}) - end; - + clean_subscribe(SubOpts, Topic, SubPid); do_unsubscribe(Group, Topic, SubPid, _SubOpts) -> emqx_shared_sub:unsubscribe(Group, Topic, SubPid). @@ -199,7 +195,7 @@ do_unsubscribe(Group, Topic, SubPid, _SubOpts) -> -spec(publish(emqx_types:message()) -> emqx_types:publish_result()). publish(Msg) when is_record(Msg, message) -> - _ = emqx_tracer:trace(publish, Msg), + _ = emqx_tracer:trace_publish(Msg), emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'), case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of #message{headers = #{allow_publish := false}} -> @@ -231,8 +227,7 @@ delivery(Msg) -> #delivery{sender = self(), message = Msg}. -spec(route([emqx_types:route_entry()], emqx_types:delivery()) -> emqx_types:publish_result()). route([], #delivery{message = Msg}) -> - ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]), - ok = inc_dropped_cnt(Msg), + drop_message(Msg), []; route(Routes, Delivery) -> @@ -240,6 +235,10 @@ route(Routes, Delivery) -> [do_route(Route, Delivery) | Acc] end, [], Routes). +drop_message(Msg) -> + ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]), + ok = inc_dropped_cnt(Msg). + do_route({To, Node}, Delivery) when Node =:= node() -> {Node, To, dispatch(To, Delivery)}; do_route({To, Node}, Delivery) when is_atom(Node) -> @@ -261,7 +260,7 @@ aggre(Routes) -> end, [], Routes). %% @doc Forward message to another node. --spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RpcMode::sync|async) +-spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RpcMode::sync | async) -> emqx_types:deliver_result()). forward(Node, To, Delivery, async) -> case emqx_rpc:cast(To, Node, ?BROKER, dispatch, [To, Delivery]) of @@ -288,8 +287,7 @@ dispatch(Topic, #delivery{message = Msg}) -> end, 0, subscribers(Topic)), case DispN of 0 -> - ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]), - ok = inc_dropped_cnt(Msg), + drop_message(Msg), {error, no_subscribers}; _ -> {ok, DispN} @@ -336,17 +334,20 @@ subscriber_down(SubPid) -> SubOpts when is_map(SubOpts) -> _ = emqx_broker_helper:reclaim_seq(Topic), true = ets:delete(?SUBOPTION, {SubPid, Topic}), - case maps:get(shard, SubOpts, 0) of - 0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), - ok = cast(pick(Topic), {unsubscribed, Topic}); - I -> true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), - ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) - end; + clean_subscribe(SubOpts, Topic, SubPid); undefined -> ok end end, lookup_value(?SUBSCRIPTION, SubPid, [])), ets:delete(?SUBSCRIPTION, SubPid). +clean_subscribe(SubOpts, Topic, SubPid) -> + case maps:get(shard, SubOpts, 0) of + 0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), + ok = cast(pick(Topic), {unsubscribed, Topic}); + I -> true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), + ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) + end. + %%-------------------------------------------------------------------- %% Management APIs %%-------------------------------------------------------------------- @@ -366,14 +367,14 @@ subscriptions(SubId) -> -spec(subscribed(pid() | emqx_types:subid(), emqx_topic:topic()) -> boolean()). subscribed(SubPid, Topic) when is_pid(SubPid) -> ets:member(?SUBOPTION, {SubPid, Topic}); -subscribed(SubId, Topic) when ?is_subid(SubId) -> +subscribed(SubId, Topic) when ?IS_SUBID(SubId) -> SubPid = emqx_broker_helper:lookup_subpid(SubId), ets:member(?SUBOPTION, {SubPid, Topic}). -spec(get_subopts(pid(), emqx_topic:topic()) -> maybe(emqx_types:subopts())). get_subopts(SubPid, Topic) when is_pid(SubPid), is_binary(Topic) -> lookup_value(?SUBOPTION, {SubPid, Topic}); -get_subopts(SubId, Topic) when ?is_subid(SubId) -> +get_subopts(SubId, Topic) when ?IS_SUBID(SubId) -> case emqx_broker_helper:lookup_subpid(SubId) of SubPid when is_pid(SubPid) -> get_subopts(SubPid, Topic); @@ -498,4 +499,3 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- - diff --git a/src/emqx_guid.erl b/src/emqx_guid.erl index 3b66f6e92..4963c48d4 100644 --- a/src/emqx_guid.erl +++ b/src/emqx_guid.erl @@ -39,6 +39,8 @@ , from_base62/1 ]). +-elvis([{elvis_style, dont_repeat_yourself, disable}]). + -define(TAG_VERSION, 131). -define(PID_EXT, 103). -define(NEW_PID_EXT, 88). @@ -137,7 +139,7 @@ npid() -> NPid. to_hexstr(I) when byte_size(I) =:= 16 -> - emqx_misc:bin2hexstr_A_F(I). + emqx_misc:bin2hexstr_a_f_upper(I). from_hexstr(S) when byte_size(S) =:= 32 -> emqx_misc:hexstr2bin(S). diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index eb6a25377..2b77d0455 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -21,6 +21,8 @@ -include("types.hrl"). -include("logger.hrl"). +-elvis([{elvis_style, god_modules, disable}]). + -export([ merge_opts/2 , maybe_apply/2 , compose/1 @@ -47,8 +49,8 @@ , ipv6_probe/1 ]). --export([ bin2hexstr_A_F/1 - , bin2hexstr_a_f/1 +-export([ bin2hexstr_a_f_upper/1 + , bin2hexstr_a_f_lower/1 , hexstr2bin/1 ]). @@ -98,9 +100,9 @@ maybe_apply(Fun, Arg) when is_function(Fun) -> -spec(compose(list(F)) -> G when F :: fun((any()) -> any()), G :: fun((any()) -> any())). -compose([F|More]) -> compose(F, More). +compose([F | More]) -> compose(F, More). --spec(compose(F, G|[Gs]) -> C +-spec(compose(F, G | [Gs]) -> C when F :: fun((X1) -> X2), G :: fun((X2) -> X3), Gs :: [fun((Xn) -> Xn1)], @@ -108,19 +110,19 @@ compose([F|More]) -> compose(F, More). X3 :: any(), Xn :: any(), Xn1 :: any(), Xm :: any()). compose(F, G) when is_function(G) -> fun(X) -> G(F(X)) end; compose(F, [G]) -> compose(F, G); -compose(F, [G|More]) -> compose(compose(F, G), More). +compose(F, [G | More]) -> compose(compose(F, G), More). %% @doc RunFold run_fold([], Acc, _State) -> Acc; -run_fold([Fun|More], Acc, State) -> +run_fold([Fun | More], Acc, State) -> run_fold(More, Fun(Acc, State), State). %% @doc Pipeline pipeline([], Input, State) -> {ok, Input, State}; -pipeline([Fun|More], Input, State) -> +pipeline([Fun | More], Input, State) -> case apply_fun(Fun, Input, State) of ok -> pipeline(More, Input, State); {ok, NState} -> @@ -169,7 +171,7 @@ drain_deliver(0, Acc) -> drain_deliver(N, Acc) -> receive Deliver = {deliver, _Topic, _Msg} -> - drain_deliver(N-1, [Deliver|Acc]) + drain_deliver(N-1, [Deliver | Acc]) after 0 -> lists:reverse(Acc) end. @@ -184,7 +186,7 @@ drain_down(0, Acc) -> drain_down(Cnt, Acc) -> receive {'DOWN', _MRef, process, Pid, _Reason} -> - drain_down(Cnt-1, [Pid|Acc]) + drain_down(Cnt-1, [Pid | Acc]) after 0 -> lists:reverse(Acc) end. @@ -210,7 +212,7 @@ check_oom(Pid, #{message_queue_len := MaxQLen, end. do_check_oom([]) -> ok; -do_check_oom([{Val, Max, Reason}|Rest]) -> +do_check_oom([{Val, Max, Reason} | Rest]) -> case is_integer(Max) andalso (0 < Max) andalso (Max < Val) of true -> {shutdown, Reason}; false -> do_check_oom(Rest) @@ -257,8 +259,8 @@ proc_stats(Pid) -> reductions, memory]) of undefined -> []; - [{message_queue_len, Len}|ProcStats] -> - [{mailbox_len, Len}|ProcStats] + [{message_queue_len, Len} | ProcStats] -> + [{mailbox_len, Len} | ProcStats] end. rand_seed() -> @@ -278,17 +280,17 @@ index_of(E, L) -> index_of(_E, _I, []) -> error(badarg); -index_of(E, I, [E|_]) -> +index_of(E, I, [E | _]) -> I; -index_of(E, I, [_|L]) -> +index_of(E, I, [_ | L]) -> index_of(E, I+1, L). --spec(bin2hexstr_A_F(binary()) -> binary()). -bin2hexstr_A_F(B) when is_binary(B) -> +-spec(bin2hexstr_a_f_upper(binary()) -> binary()). +bin2hexstr_a_f_upper(B) when is_binary(B) -> << <<(int2hexchar(H, upper)), (int2hexchar(L, upper))>> || <> <= B>>. --spec(bin2hexstr_a_f(binary()) -> binary()). -bin2hexstr_a_f(B) when is_binary(B) -> +-spec(bin2hexstr_a_f_lower(binary()) -> binary()). +bin2hexstr_a_f_lower(B) when is_binary(B) -> << <<(int2hexchar(H, lower)), (int2hexchar(L, lower))>> || <> <= B>>. int2hexchar(I, _) when I >= 0 andalso I < 10 -> I + $0; diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index a4d440ba1..2a1137f57 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -46,24 +46,6 @@ -export([format/1]). --define(TYPE_NAMES, - { 'CONNECT' - , 'CONNACK' - , 'PUBLISH' - , 'PUBACK' - , 'PUBREC' - , 'PUBREL' - , 'PUBCOMP' - , 'SUBSCRIBE' - , 'SUBACK' - , 'UNSUBSCRIBE' - , 'UNSUBACK' - , 'PINGREQ' - , 'PINGRESP' - , 'DISCONNECT' - , 'AUTH' - }). - -type(connect() :: #mqtt_packet_connect{}). -type(publish() :: #mqtt_packet_publish{}). -type(subscribe() :: #mqtt_packet_subscribe{}). @@ -107,14 +89,14 @@ retain(#mqtt_packet{header = #mqtt_packet_header{retain = Retain}}) -> %%-------------------------------------------------------------------- %% @doc Protocol name of the CONNECT Packet. --spec(proto_name(emqx_types:packet()|connect()) -> binary()). +-spec(proto_name(emqx_types:packet() | connect()) -> binary()). proto_name(?CONNECT_PACKET(ConnPkt)) -> proto_name(ConnPkt); proto_name(#mqtt_packet_connect{proto_name = Name}) -> Name. %% @doc Protocol version of the CONNECT Packet. --spec(proto_ver(emqx_types:packet()|connect()) -> emqx_types:version()). +-spec(proto_ver(emqx_types:packet() | connect()) -> emqx_types:version()). proto_ver(?CONNECT_PACKET(ConnPkt)) -> proto_ver(ConnPkt); proto_ver(#mqtt_packet_connect{proto_ver = Ver}) -> @@ -249,7 +231,7 @@ set_props(Props, #mqtt_packet_auth{} = Pkt) -> %%-------------------------------------------------------------------- %% @doc Check PubSub Packet. --spec(check(emqx_types:packet()|publish()|subscribe()|unsubscribe()) +-spec(check(emqx_types:packet() | publish() | subscribe() | unsubscribe()) -> ok | {error, emqx_types:reason_code()}). check(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH}, variable = PubPkt}) when not is_tuple(PubPkt) -> @@ -318,7 +300,7 @@ check_pub_props(#{'Response-Topic' := ResponseTopic}) -> check_pub_props(_Props) -> ok. %% @doc Check CONNECT Packet. --spec(check(emqx_types:packet()|connect(), Opts :: map()) +-spec(check(emqx_types:packet() | connect(), Opts :: map()) -> ok | {error, emqx_types:reason_code()}). check(?CONNECT_PACKET(ConnPkt), Opts) -> check(ConnPkt, Opts); @@ -357,11 +339,13 @@ check_conn_props(#mqtt_packet_connect{properties = undefined}, _Opts) -> ok; check_conn_props(#mqtt_packet_connect{properties = #{'Receive-Maximum' := 0}}, _Opts) -> {error, ?RC_PROTOCOL_ERROR}; -check_conn_props(#mqtt_packet_connect{properties = #{'Request-Response-Information' := ReqRespInfo}}, _Opts) - when ReqRespInfo =/= 0, ReqRespInfo =/= 1 -> +check_conn_props(#mqtt_packet_connect{properties = + #{'Request-Response-Information' := ReqRespInfo}}, _Opts) + when ReqRespInfo =/= 0, ReqRespInfo =/= 1 -> {error, ?RC_PROTOCOL_ERROR}; -check_conn_props(#mqtt_packet_connect{properties = #{'Request-Problem-Information' := ReqProInfo}}, _Opts) - when ReqProInfo =/= 0, ReqProInfo =/= 1 -> +check_conn_props(#mqtt_packet_connect{properties = + #{'Request-Problem-Information' := ReqProInfo}}, _Opts) + when ReqProInfo =/= 0, ReqProInfo =/= 1 -> {error, ?RC_PROTOCOL_ERROR}; check_conn_props(_ConnPkt, _Opts) -> ok. @@ -382,7 +366,7 @@ check_will_msg(#mqtt_packet_connect{will_topic = WillTopic}, _Opts) -> run_checks([], _Packet, _Options) -> ok; -run_checks([Check|More], Packet, Options) -> +run_checks([Check | More], Packet, Options) -> case Check(Packet, Options) of ok -> run_checks(More, Packet, Options); Error = {error, _Reason} -> Error @@ -419,7 +403,8 @@ to_message(#mqtt_packet{ Msg#message{flags = #{dup => Dup, retain => Retain}, headers = Headers#{properties => Props}}. --spec(will_msg(#mqtt_packet_connect{}) -> emqx_types:message()). +-type(connectPacket() :: #mqtt_packet_connect{}). +-spec(will_msg(connectPacket()) -> emqx_types:message()). will_msg(#mqtt_packet_connect{will_flag = false}) -> undefined; will_msg(#mqtt_packet_connect{clientid = ClientId, @@ -468,13 +453,16 @@ format_variable(#mqtt_packet_connect{ will_payload = WillPayload, username = Username, password = Password}) -> - Format = "ClientId=~s, ProtoName=~s, ProtoVsn=~p, CleanStart=~s, KeepAlive=~p, Username=~s, Password=~s", - Args = [ClientId, ProtoName, ProtoVer, CleanStart, KeepAlive, Username, format_password(Password)], - {Format1, Args1} = if - WillFlag -> {Format ++ ", Will(Q~p, R~p, Topic=~s, Payload=~0p)", - Args ++ [WillQoS, i(WillRetain), WillTopic, WillPayload]}; - true -> {Format, Args} - end, + Format = "ClientId=~s, ProtoName=~s, ProtoVsn=~p, CleanStart=~s," + " KeepAlive=~p, Username=~s, Password=~s", + Args = [ClientId, ProtoName, ProtoVer, CleanStart, + KeepAlive, Username, format_password(Password)], + {Format1, Args1} = + case WillFlag of + true -> {Format ++ ", Will(Q~p, R~p, Topic=~s, Payload=~0p)", + Args ++ [WillQoS, i(WillRetain), WillTopic, WillPayload]}; + false -> {Format, Args} + end, io_lib:format(Format1, Args1); format_variable(#mqtt_packet_disconnect @@ -520,4 +508,3 @@ format_password(_Password) -> '******'. i(true) -> 1; i(false) -> 0; i(I) when is_integer(I) -> I. - diff --git a/src/emqx_tracer.erl b/src/emqx_tracer.erl index 995712f6c..e563c8eab 100644 --- a/src/emqx_tracer.erl +++ b/src/emqx_tracer.erl @@ -22,13 +22,29 @@ -logger_header("[Tracer]"). %% APIs --export([ trace/2 +-export([ trace_publish/1 + , trace_subscribe/3 + , trace_unsubscribe/2 , start_trace/3 + , start_trace/4 , lookup_traces/0 - , stop_trace/1 + , stop_trace/3 + , stop_trace/2 ]). --type(trace_who() :: {clientid | topic, binary()}). +-ifdef(TEST). +-export([is_match/3]). +-endif. + +-type(label() :: 'CONNECT' | 'CONNACK' | 'PUBLISH' | 'PUBACK' | 'PUBREC' | + 'PUBREL' | 'PUBCOMP' | 'SUBSCRIBE' | 'SUBACK' | 'UNSUBSCRIBE' | + 'UNSUBACK' | 'PINGREQ' | 'PINGRESP' | 'DISCONNECT' | 'AUTH'). + +-type(tracer() :: #{name := binary(), + type := clientid | topic, + clientid => emqx_types:clientid(), + topic => emqx_types:topic(), + labels := [label()]}). -define(TRACER, ?MODULE). -define(FORMAT, {logger_formatter, @@ -44,10 +60,16 @@ msg, "\n"], single_line => false }}). --define(TOPIC_TRACE_ID(T), "trace_topic_"++T). --define(CLIENT_TRACE_ID(C), "trace_clientid_"++C). --define(TOPIC_TRACE(T), {topic, T}). --define(CLIENT_TRACE(C), {clientid, C}). +-define(TOPIC_COMBINATOR, <<"_trace_topic_">>). +-define(CLIENTID_COMBINATOR, <<"_trace_clientid_">>). +-define(TOPIC_TRACE_ID(T, N), + binary_to_atom(<<(N)/binary, ?TOPIC_COMBINATOR/binary, (T)/binary>>)). +-define(CLIENT_TRACE_ID(C, N), + binary_to_atom(<<(N)/binary, ?CLIENTID_COMBINATOR/binary, (C)/binary>>)). +-define(TOPIC_TRACE(T, N, M), {topic, T, N, M}). +-define(CLIENT_TRACE(C, N, M), {clientid, C, N, M}). +-define(TOPIC_TRACE(T, N), {topic, T, N}). +-define(CLIENT_TRACE(C, N), {clientid, C, N}). -define(IS_LOG_LEVEL(L), L =:= emergency orelse @@ -59,28 +81,49 @@ L =:= info orelse L =:= debug). --dialyzer({nowarn_function, [install_trace_handler/3]}). - %%------------------------------------------------------------------------------ %% APIs %%------------------------------------------------------------------------------ -trace(publish, #message{topic = <<"$SYS/", _/binary>>}) -> +trace_publish(#message{topic = <<"$SYS/", _/binary>>}) -> %% Do not trace '$SYS' publish ignore; -trace(publish, #message{from = From, topic = Topic, payload = Payload}) +trace_publish(#message{from = From, topic = Topic, payload = Payload}) when is_binary(From); is_atom(From) -> emqx_logger:info(#{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY} }, "PUBLISH to ~s: ~0p", [Topic, Payload]). +trace_subscribe(<<"$SYS/", _/binary>>, _SubId, _SubOpts) -> ignore; +trace_subscribe(Topic, SubId, SubOpts) -> + emqx_logger:info(#{topic => Topic, + mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}}, + "~ts SUBSCRIBE ~ts: Options: ~0p", [SubId, Topic, SubOpts]). + +trace_unsubscribe(<<"$SYS/", _/binary>>, _SubOpts) -> ignore; +trace_unsubscribe(Topic, SubOpts) -> + emqx_logger:info(#{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}}, + "~ts UNSUBSCRIBE ~ts: Options: ~0p", + [maps:get(subid, SubOpts, ""), Topic, SubOpts]). + +-spec(start_trace(clientid | topic, emqx_types:clientid() | emqx_types:topic(), + logger:level() | all, string()) -> ok | {error, term()}). +start_trace(clientid, ClientId0, Level, LogFile) -> + ClientId = ensure_bin(ClientId0), + Who = #{type => clientid, clientid => ClientId, name => ClientId, labels => []}, + start_trace(Who, Level, LogFile); +start_trace(topic, Topic0, Level, LogFile) -> + Topic = ensure_bin(Topic0), + Who = #{type => topic, topic => Topic, name => Topic, labels => []}, + start_trace(Who, Level, LogFile). + %% @doc Start to trace clientid or topic. --spec(start_trace(trace_who(), logger:level() | all, string()) -> ok | {error, term()}). +-spec(start_trace(tracer(), logger:level() | all, string()) -> ok | {error, term()}). start_trace(Who, all, LogFile) -> start_trace(Who, debug, LogFile); start_trace(Who, Level, LogFile) -> case ?IS_LOG_LEVEL(Level) of true -> - #{level := PrimaryLevel} = logger:get_primary_config(), + PrimaryLevel = emqx_logger:get_primary_log_level(), try logger:compare_levels(Level, PrimaryLevel) of lt -> {error, @@ -96,13 +139,28 @@ start_trace(Who, Level, LogFile) -> false -> {error, {invalid_log_level, Level}} end. +-spec(stop_trace(clientid | topic, emqx_types:clientid() | emqx_types:topic()) -> + ok | {error, term()}). +stop_trace(Type, ClientIdOrTopic) -> + stop_trace(Type, ClientIdOrTopic, ClientIdOrTopic). + %% @doc Stop tracing clientid or topic. --spec(stop_trace(trace_who()) -> ok | {error, term()}). -stop_trace(Who) -> +-spec(stop_trace(clientid | topic, emqx_types:clientid() | emqx_types:topic(), binary()) -> + ok | {error, term()}). +stop_trace(clientid, ClientId, Name) -> + Who = #{type => clientid, clientid => ensure_bin(ClientId), name => ensure_bin(Name)}, + uninstall_trance_handler(Who); +stop_trace(topic, Topic, Name) -> + Who = #{type => topic, topic => ensure_bin(Topic), name => ensure_bin(Name)}, uninstall_trance_handler(Who). %% @doc Lookup all traces --spec(lookup_traces() -> [{Who :: trace_who(), LogFile :: string()}]). +-spec(lookup_traces() -> [#{ type => topic | clientid, + name => binary(), + topic => emqx_types:topic(), + level => logger:level(), + dst => file:filename() | console | unknown + }]). lookup_traces() -> lists:foldl(fun filter_traces/2, [], emqx_logger:get_log_handlers(started)). @@ -116,7 +174,8 @@ install_trace_handler(Who, Level, LogFile) -> {fun filter_by_meta_key/2, Who}}]}) of ok -> - ?LOG(info, "Start trace for ~p", [Who]); + ?LOG(info, "Start trace for ~p", [Who]), + ok; {error, Reason} -> ?LOG(error, "Start trace for ~p failed, error: ~p", [Who, Reason]), {error, Reason} @@ -125,44 +184,78 @@ install_trace_handler(Who, Level, LogFile) -> uninstall_trance_handler(Who) -> case logger:remove_handler(handler_id(Who)) of ok -> - ?LOG(info, "Stop trace for ~p", [Who]); + ?LOG(info, "Stop trace for ~p", [Who]), + ok; {error, Reason} -> ?LOG(error, "Stop trace for ~p failed, error: ~p", [Who, Reason]), {error, Reason} end. filter_traces(#{id := Id, level := Level, dst := Dst}, Acc) -> - case atom_to_list(Id) of - ?TOPIC_TRACE_ID(T)-> - [{?TOPIC_TRACE(T), {Level, Dst}} | Acc]; - ?CLIENT_TRACE_ID(C) -> - [{?CLIENT_TRACE(C), {Level, Dst}} | Acc]; - _ -> Acc + IdStr = atom_to_binary(Id), + case binary:split(IdStr, [?TOPIC_COMBINATOR]) of + [Name, Topic] -> + [#{ type => topic, + name => Name, + topic => Topic, + level => Level, + dst => Dst} | Acc]; + _ -> + case binary:split(IdStr, [?CLIENTID_COMBINATOR]) of + [Name, ClientId] -> + [#{ type => clientid, + name => Name, + clientid => ClientId, + level => Level, + dst => Dst} | Acc]; + _ -> Acc + end end. -handler_id(?TOPIC_TRACE(Topic)) -> - list_to_atom(?TOPIC_TRACE_ID(handler_name(Topic))); -handler_id(?CLIENT_TRACE(ClientId)) -> - list_to_atom(?CLIENT_TRACE_ID(handler_name(ClientId))). +%% Plan to support topic_and_client type, so we have type field. +handler_id(#{type := topic, topic := Topic, name := Name}) -> + ?TOPIC_TRACE_ID(format(Topic), format(Name)); +handler_id(#{type := clientid, clientid := ClientId, name := Name}) -> + ?CLIENT_TRACE_ID(format(ClientId), format(Name)). -filter_by_meta_key(#{meta := Meta} = Log, {Key, Value}) -> - case is_meta_match(Key, Value, Meta) of +filter_by_meta_key(#{meta := Meta, level := Level} = Log, Context) -> + case is_match(Context, Meta, Level) of true -> Log; false -> ignore end. -is_meta_match(clientid, ClientId, #{clientid := ClientIdStr}) -> - ClientId =:= iolist_to_binary(ClientIdStr); -is_meta_match(topic, TopicFilter, #{topic := TopicMeta}) -> - emqx_topic:match(TopicMeta, TopicFilter); -is_meta_match(_, _, _) -> +%% When the log level is higher than debug and clientid/topic is match, +%% it will be logged without judging the content inside the labels. +%% When the log level is debug, in addition to the matched clientid/topic, +%% you also need to determine whether the label is in the labels +is_match(#{type := clientid, clientid := ExpectId, labels := Labels}, + #{clientid := RealId} = Meta, + Level) -> + is_match(ExpectId =:= iolist_to_binary(RealId), Level, Meta, Labels); +is_match(#{type := topic, topic := TopicFilter, labels := Labels}, + #{topic := Topic} = Meta, Level) -> + is_match(emqx_topic:match(Topic, TopicFilter), Level, Meta, Labels); +is_match(_, _, _) -> false. -handler_name(Bin) -> - case byte_size(Bin) of - Size when Size =< 200 -> binary_to_list(Bin); - _ -> hashstr(Bin) +is_match(true, debug, Meta, Labels) -> is_match_labels(Meta, Labels); +is_match(Boolean, _, _Meta, _Labels) -> Boolean. + +is_match_labels(#{trace_label := 'ALL'}, _Context) -> true; +is_match_labels(_, []) -> true; +is_match_labels(#{trace_label := Packet}, Context) -> + lists:member(Packet, Context); +is_match_labels(_, _) -> false. + +format(List)when is_list(List) -> + format(list_to_binary(List)); +format(Atom)when is_atom(Atom) -> + format(atom_to_list(Atom)); +format(Bin0)when is_binary(Bin0) -> + case byte_size(Bin0) of + Size when Size =< 200 -> Bin0; + _ -> emqx_misc:bin2hexstr_a_f_upper(Bin0) end. -hashstr(Bin) -> - binary_to_list(emqx_misc:bin2hexstr_A_F(Bin)). +ensure_bin(List) when is_list(List) -> iolist_to_binary(List); +ensure_bin(Bin) when is_binary(Bin) -> Bin. diff --git a/test/emqx_tracer_SUITE.erl b/test/emqx_tracer_SUITE.erl index f5af65440..5f7105774 100644 --- a/test/emqx_tracer_SUITE.erl +++ b/test/emqx_tracer_SUITE.erl @@ -22,8 +22,13 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-define(CLIENT, [{host, "localhost"}, + {clientid, <<"client">>}, + {username, <<"testuser">>}, + {password, <<"pass">>} + ]). -all() -> [t_trace_clientid, t_trace_topic]. +all() -> [t_trace_clientid, t_trace_topic, t_is_match]. init_per_suite(Config) -> emqx_ct_helpers:boot_modules(all), @@ -34,33 +39,36 @@ end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([]). t_trace_clientid(_Config) -> - {ok, T} = emqtt:start_link([{host, "localhost"}, - {clientid, <<"client">>}, - {username, <<"testuser">>}, - {password, <<"pass">>} - ]), + {ok, T} = emqtt:start_link(?CLIENT), emqtt:connect(T), %% Start tracing emqx_logger:set_log_level(error), - {error, _} = emqx_tracer:start_trace({clientid, <<"client">>}, debug, "tmp/client.log"), + {error, _} = emqx_tracer:start_trace(clientid, <<"client">>, debug, "tmp/client.log"), emqx_logger:set_log_level(debug), - ok = emqx_tracer:start_trace({clientid, <<"client">>}, debug, "tmp/client.log"), - ok = emqx_tracer:start_trace({clientid, <<"client2">>}, all, "tmp/client2.log"), - ok = emqx_tracer:start_trace({clientid, <<"client3">>}, all, "tmp/client3.log"), - {error, {invalid_log_level, bad_level}} = emqx_tracer:start_trace({clientid, <<"client4">>}, bad_level, "tmp/client4.log"), - {error, {handler_not_added, {file_error,".",eisdir}}} = emqx_tracer:start_trace({clientid, <<"client5">>}, debug, "."), + %% add list clientid + ok = emqx_tracer:start_trace(clientid, "client", debug, "tmp/client.log"), + ok = emqx_tracer:start_trace(clientid, <<"client2">>, all, "tmp/client2.log"), + ok = emqx_tracer:start_trace(clientid, <<"client3">>, all, "tmp/client3.log"), + {error, {invalid_log_level, bad_level}} = + emqx_tracer:start_trace(clientid, <<"client4">>, bad_level, "tmp/client4.log"), + {error, {handler_not_added, {file_error, ".", eisdir}}} = + emqx_tracer:start_trace(clientid, <<"client5">>, debug, "."), ct:sleep(100), %% Verify the tracing file exits ?assert(filelib:is_regular("tmp/client.log")), ?assert(filelib:is_regular("tmp/client2.log")), + ?assert(filelib:is_regular("tmp/client3.log")), %% Get current traces - ?assertEqual([{{clientid,"client"},{debug,"tmp/client.log"}}, - {{clientid,"client2"},{debug,"tmp/client2.log"}}, - {{clientid,"client3"},{debug,"tmp/client3.log"}} - ], emqx_tracer:lookup_traces()), + ?assertEqual([#{type => clientid, clientid => <<"client">>, + name => <<"client">>, level => debug, dst => "tmp/client.log"}, + #{type => clientid, clientid => <<"client2">>, + name => <<"client2">>, level => debug, dst => "tmp/client2.log"}, + #{type => clientid, clientid => <<"client3">>, + name => <<"client3">>, level => debug, dst => "tmp/client3.log"} + ], emqx_tracer:lookup_traces()), %% set the overall log level to debug emqx_logger:set_log_level(debug), @@ -74,47 +82,90 @@ t_trace_clientid(_Config) -> ?assert(filelib:file_size("tmp/client2.log") == 0), %% Stop tracing - ok = emqx_tracer:stop_trace({clientid, <<"client">>}), - ok = emqx_tracer:stop_trace({clientid, <<"client2">>}), - ok = emqx_tracer:stop_trace({clientid, <<"client3">>}), + ok = emqx_tracer:stop_trace(clientid, <<"client">>), + ok = emqx_tracer:stop_trace(clientid, <<"client2">>), + ok = emqx_tracer:stop_trace(clientid, <<"client3">>), emqtt:disconnect(T), emqx_logger:set_log_level(warning). t_trace_topic(_Config) -> - {ok, T} = emqtt:start_link([{host, "localhost"}, - {clientid, <<"client">>}, - {username, <<"testuser">>}, - {password, <<"pass">>} - ]), + {ok, T} = emqtt:start_link(?CLIENT), emqtt:connect(T), %% Start tracing emqx_logger:set_log_level(debug), - ok = emqx_tracer:start_trace({topic, <<"x/#">>}, all, "tmp/topic_trace.log"), - ok = emqx_tracer:start_trace({topic, <<"y/#">>}, all, "tmp/topic_trace.log"), + ok = emqx_tracer:start_trace(topic, <<"x/#">>, all, "tmp/topic_trace_x.log"), + ok = emqx_tracer:start_trace(topic, <<"y/#">>, all, "tmp/topic_trace_y.log"), ct:sleep(100), %% Verify the tracing file exits - ?assert(filelib:is_regular("tmp/topic_trace.log")), + ?assert(filelib:is_regular("tmp/topic_trace_x.log")), + ?assert(filelib:is_regular("tmp/topic_trace_y.log")), %% Get current traces - ?assertEqual([{{topic,"x/#"},{debug,"tmp/topic_trace.log"}}, - {{topic,"y/#"},{debug,"tmp/topic_trace.log"}}], emqx_tracer:lookup_traces()), + ?assertEqual([#{type => topic, topic => <<"x/#">>, name => <<"x/#">>, + level => debug, dst => "tmp/topic_trace_x.log"}, + #{type => topic, topic => <<"y/#">>, name => <<"y/#">>, + level => debug, dst => "tmp/topic_trace_y.log"}], + emqx_tracer:lookup_traces()), %% set the overall log level to debug emqx_logger:set_log_level(debug), %% Client with clientid = "client" publishes a "hi" message to "x/y/z". - emqtt:publish(T, <<"x/y/z">>, <<"hi">>), + emqtt:publish(T, <<"x/y/z">>, <<"hi1">>), + emqtt:publish(T, <<"x/y/z">>, <<"hi2">>), ct:sleep(200), - ?assert(filelib:file_size("tmp/topic_trace.log") > 0), + ?assert(filelib:file_size("tmp/topic_trace_x.log") > 0), + ?assert(filelib:file_size("tmp/topic_trace_y.log") =:= 0), %% Stop tracing - ok = emqx_tracer:stop_trace({topic, <<"x/#">>}), - ok = emqx_tracer:stop_trace({topic, <<"y/#">>}), - {error, _Reason} = emqx_tracer:stop_trace({topic, <<"z/#">>}), + ok = emqx_tracer:stop_trace(topic, <<"x/#">>), + ok = emqx_tracer:stop_trace(topic, <<"y/#">>), + {error, _Reason} = emqx_tracer:stop_trace(topic, <<"z/#">>), emqtt:disconnect(T), emqx_logger:set_log_level(warning). + +t_is_match(_Config) -> + ClientId = <<"test">>, + ?assert(emqx_tracer:is_match(#{clientid => ClientId, type => clientid, labels => []}, + #{clientid => ClientId}, warning)), + ?assert(emqx_tracer:is_match(#{clientid => ClientId, type => clientid, labels => []}, + #{clientid => ClientId}, debug)), + ?assertNot(emqx_tracer:is_match(#{clientid => ClientId, type => clientid, + labels => ['PUBLISH']}, #{clientid => ClientId}, debug)), + ?assert(emqx_tracer:is_match(#{clientid => ClientId, type => clientid, labels => []}, + #{clientid => ClientId, trace_label => 'PUBLISH'}, debug)), + ?assert(emqx_tracer:is_match(#{clientid => ClientId, type => clientid, labels => ['PUBLISH']}, + #{clientid => ClientId, trace_label => 'PUBLISH'}, debug)), + ?assertNot(emqx_tracer:is_match(#{clientid => ClientId, type => clientid, labels => ['SUBACK']}, + #{clientid => ClientId, trace_label => 'PUBLISH'}, debug)), + ?assert(emqx_tracer:is_match(#{clientid => ClientId, type => clientid, labels => ['SUBACK']}, + #{clientid => ClientId, trace_label => 'ALL'}, debug)), + ?assertNot(emqx_tracer:is_match(#{clientid => ClientId, type => clientid, labels => []}, + #{clientid => <<"Bad">>}, warning)), + ?assertNot(emqx_tracer:is_match(#{clientid => ClientId, type => clientid, labels => []}, + #{clientid => <<"Bad">>, trace_label => 'PUBLISH'}, debug)), + + Topic = <<"/test/#">>, + ?assert(emqx_tracer:is_match(#{type => topic, topic => Topic, labels => []}, + #{topic => <<"/test/1">>}, warning)), + ?assert(emqx_tracer:is_match(#{type => topic, topic => Topic, labels => []}, + #{topic => <<"/test/1/2">>}, debug)), + ?assertNot(emqx_tracer:is_match(#{type => topic, topic => Topic, labels => ['SUBSCRIBE']}, + #{topic => <<"/test/1/2">>}, debug)), + ?assert(emqx_tracer:is_match(#{type => topic, topic => Topic, labels => []}, + #{topic => <<"/test/3">>, trace_label => 'PUBLISH'}, debug)), + ?assert(emqx_tracer:is_match(#{type => topic, topic => Topic, labels => ['PUBLISH']}, + #{topic => <<"/test/398/">>, trace_label => 'PUBLISH'}, debug)), + ?assertNot(emqx_tracer:is_match(#{type => topic, topic => Topic, labels => ['SUBACK']}, + #{topic => <<"/test/1/xy/y">>, trace_label => 'PUBLISH'}, debug)), + + ?assertNot(emqx_tracer:is_match(#{type => topic, topic => Topic, labels => ['PUBLISH']}, + #{topic => <<"/t1est/398/">>, trace_label => 'PUBLISH'}, debug)), + ?assertNot(emqx_tracer:is_match(#{type => topic, topic => Topic, labels => []}, + #{topic => <<"/t1est/1/xy/y">>, trace_label => 'PUBLISH'}, debug)), + ok.