diff --git a/.github/workflows/run_api_tests.yaml b/.github/workflows/run_api_tests.yaml index e8ab99f57..c5a8b94cb 100644 --- a/.github/workflows/run_api_tests.yaml +++ b/.github/workflows/run_api_tests.yaml @@ -86,7 +86,7 @@ jobs: - uses: actions/checkout@v2 with: repository: emqx/emqx-fvt - ref: 1.0.3-dev2 + ref: 1.0.4-dev1 path: . - uses: actions/setup-java@v1 with: diff --git a/README.md b/README.md index 90e7f8259..1c1aaf90c 100644 --- a/README.md +++ b/README.md @@ -116,7 +116,9 @@ Visiting [EMQ X FAQ](https://docs.emqx.io/en/broker/latest/faq/faq.html) to get ### Questions -[GitHub Discussions](https://github.com/emqx/emqx/discussions) is where you can ask questions, and share ideas. +- [GitHub Discussions](https://github.com/emqx/emqx/discussions) is where you can ask questions, and share ideas. +- [Slack](https://slack-invite.emqx.io/) is where you can ask and discuss questions or contact our teams directly. +- [Discord](https://discord.gg/xYGf3fQnES) is where you can get help and upcoming events related to IoT technologies. ### Proposals diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 6c889e000..3c46c6b54 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -80,9 +80,15 @@ , mark_channel_connected/1 , mark_channel_disconnected/1 , get_connected_client_count/0 + + , do_kick_session/3 + , do_get_chan_stats/2 + , do_get_chan_info/2 + , do_get_chann_conn_mod/2 ]). -export_type([ channel_info/0 + , chan_pid/0 ]). -type(chan_pid() :: pid()). @@ -92,6 +98,8 @@ , _Stats :: emqx_types:stats() }). +-include("emqx_cm.hrl"). + %% Tables for channel management. -define(CHAN_TAB, emqx_channel). -define(CHAN_CONN_TAB, emqx_channel_conn). @@ -111,10 +119,6 @@ %% Server name -define(CM, ?MODULE). --define(T_KICK, 5_000). --define(T_GET_INFO, 5_000). --define(T_TAKEOVER, 15_000). - %% linting overrides -elvis([ {elvis_style, invalid_dynamic_call, #{ignore => [emqx_cm]}} , {elvis_style, god_modules, #{ignore => [emqx_cm]}} @@ -181,16 +185,19 @@ connection_closed(ClientId, ChanPid) -> get_chan_info(ClientId) -> with_channel(ClientId, fun(ChanPid) -> get_chan_info(ClientId, ChanPid) end). --spec(get_chan_info(emqx_types:clientid(), chan_pid()) +-spec(do_get_chan_info(emqx_types:clientid(), chan_pid()) -> maybe(emqx_types:infos())). -get_chan_info(ClientId, ChanPid) when node(ChanPid) == node() -> +do_get_chan_info(ClientId, ChanPid) -> Chan = {ClientId, ChanPid}, try ets:lookup_element(?CHAN_INFO_TAB, Chan, 2) catch error:badarg -> undefined - end; + end. + +-spec(get_chan_info(emqx_types:clientid(), chan_pid()) + -> maybe(emqx_types:infos())). get_chan_info(ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chan_info, [ClientId, ChanPid], ?T_GET_INFO). + wrap_rpc(emqx_cm_proto_v1:get_chan_info(ClientId, ChanPid)). %% @doc Update infos of the channel. -spec(set_chan_info(emqx_types:clientid(), emqx_types:attrs()) -> boolean()). @@ -206,16 +213,19 @@ set_chan_info(ClientId, Info) when is_binary(ClientId) -> get_chan_stats(ClientId) -> with_channel(ClientId, fun(ChanPid) -> get_chan_stats(ClientId, ChanPid) end). --spec(get_chan_stats(emqx_types:clientid(), chan_pid()) +-spec(do_get_chan_stats(emqx_types:clientid(), chan_pid()) -> maybe(emqx_types:stats())). -get_chan_stats(ClientId, ChanPid) when node(ChanPid) == node() -> +do_get_chan_stats(ClientId, ChanPid) -> Chan = {ClientId, ChanPid}, try ets:lookup_element(?CHAN_INFO_TAB, Chan, 3) catch error:badarg -> undefined - end; + end. + +-spec(get_chan_stats(emqx_types:clientid(), chan_pid()) + -> maybe(emqx_types:stats())). get_chan_stats(ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid], ?T_GET_INFO). + wrap_rpc(emqx_cm_proto_v1:get_chan_stats(ClientId, ChanPid)). %% @doc Set channel's stats. -spec(set_chan_stats(emqx_types:clientid(), emqx_types:stats()) -> boolean()). @@ -368,7 +378,7 @@ do_takeover_session(ClientId, ChanPid) when node(ChanPid) == node() -> {living, ConnMod, ChanPid, Session} end; do_takeover_session(ClientId, ChanPid) -> - rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER). + wrap_rpc(emqx_cm_proto_v1:takeover_session(ClientId, ChanPid)). %% @doc Discard all the sessions identified by the ClientId. -spec(discard_session(emqx_types:clientid()) -> ok). @@ -422,24 +432,20 @@ discard_session(ClientId, ChanPid) -> kick_session(ClientId, ChanPid) -> kick_session(kick, ClientId, ChanPid). -%% @private This function is shared for session 'kick' and 'discard' (as the first arg Action). -kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() -> +-spec do_kick_session(kick | discard, emqx_types:clientid(), chan_pid()) -> ok. +do_kick_session(Action, ClientId, ChanPid) -> case get_chann_conn_mod(ClientId, ChanPid) of undefined -> %% already deregistered ok; ConnMod when is_atom(ConnMod) -> ok = kick_or_kill(Action, ConnMod, ChanPid) - end; + end. + +%% @private This function is shared for session 'kick' and 'discard' (as the first arg Action). kick_session(Action, ClientId, ChanPid) -> - %% call remote node on the old APIs because we do not know if they have upgraded - %% to have kick_session/3 - Function = case Action of - discard -> discard_session; - kick -> kick_session - end, try - rpc_call(node(ChanPid), Function, [ClientId, ChanPid], ?T_KICK) + wrap_rpc(emqx_cm_proto_v1:kick_session(Action, ClientId, ChanPid)) catch Error : Reason -> %% This should mostly be RPC failures. @@ -525,8 +531,8 @@ lookup_client({clientid, ClientId}) -> , Rec <- ets:lookup(emqx_channel_info, Key)]. %% @private -rpc_call(Node, Fun, Args, Timeout) -> - case rpc:call(Node, ?MODULE, Fun, Args, 2 * Timeout) of +wrap_rpc(Result) -> + case Result of {badrpc, Reason} -> %% since emqx app 4.3.10, the 'kick' and 'discard' calls handler %% should catch all exceptions and always return 'ok'. @@ -599,14 +605,17 @@ update_stats({Tab, Stat, MaxStat}) -> Size -> emqx_stats:setstat(Stat, MaxStat, Size) end. -get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() -> +-spec do_get_chann_conn_mod(emqx_types:clientid(), chan_pid()) -> + module() | undefined. +do_get_chann_conn_mod(ClientId, ChanPid) -> Chan = {ClientId, ChanPid}, try [ConnMod] = ets:lookup_element(?CHAN_CONN_TAB, Chan, 2), ConnMod catch error:badarg -> undefined - end; + end. + get_chann_conn_mod(ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO). + wrap_rpc(emqx_cm_proto_v1:get_chann_conn_mod(ClientId, ChanPid)). mark_channel_connected(ChanPid) -> ?tp(emqx_cm_connected_client_count_inc, #{}), diff --git a/apps/emqx/src/emqx_cm.hrl b/apps/emqx/src/emqx_cm.hrl new file mode 100644 index 000000000..ec23e21e9 --- /dev/null +++ b/apps/emqx/src/emqx_cm.hrl @@ -0,0 +1,23 @@ +%%------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-ifndef(EMQX_CM_HRL). +-define(EMQX_CM_HRL, true). + +-define(T_KICK, 5_000). +-define(T_GET_INFO, 5_000). +-define(T_TAKEOVER, 15_000). + +-endif. diff --git a/apps/emqx/src/emqx_rpc.erl b/apps/emqx/src/emqx_rpc.erl index 4a5b195c6..c542b460b 100644 --- a/apps/emqx/src/emqx_rpc.erl +++ b/apps/emqx/src/emqx_rpc.erl @@ -25,6 +25,8 @@ , cast/5 , multicall/4 , multicall/5 + + , unwrap_erpc/1 ]). -export_type([ badrpc/0 @@ -106,3 +108,15 @@ filter_result(Delivery) -> max_client_num() -> emqx:get_config([rpc, tcp_client_num], ?DefaultClientNum). + +-spec unwrap_erpc(emqx_rpc:erpc(A)) -> A | {error, _Err}. +unwrap_erpc({ok, A}) -> + A; +unwrap_erpc({throw, A}) -> + {error, A}; +unwrap_erpc({error, {exception, Err, _Stack}}) -> + {error, Err}; +unwrap_erpc({error, {exit, Err}}) -> + {error, Err}; +unwrap_erpc({error, {erpc, Err}}) -> + {error, Err}. diff --git a/apps/emqx/src/emqx_trace/emqx_trace.erl b/apps/emqx/src/emqx_trace/emqx_trace.erl index e55367530..eaa5bd67b 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace.erl @@ -145,7 +145,7 @@ list(Enable) -> ets:match_object(?TRACE, #?TRACE{enable = Enable, _ = '_'}). -spec create([{Key :: binary(), Value :: binary()}] | #{atom() => binary()}) -> - ok | {error, {duplicate_condition, iodata()} | {already_existed, iodata()} | iodata()}. +{ok, #?TRACE{}} | {error, {duplicate_condition, iodata()} | {already_existed, iodata()} | iodata()}. create(Trace) -> case mnesia:table_info(?TRACE, size) < ?MAX_SIZE of true -> @@ -291,7 +291,9 @@ insert_new_trace(Trace) -> #?TRACE{start_at = StartAt, type = Type, filter = Filter} = Trace, Match = #?TRACE{_ = '_', start_at = StartAt, type = Type, filter = Filter}, case mnesia:match_object(?TRACE, Match, read) of - [] -> mnesia:write(?TRACE, Trace, write); + [] -> + ok = mnesia:write(?TRACE, Trace, write), + {ok, Trace}; [#?TRACE{name = Name}] -> mnesia:abort({duplicate_condition, Name}) end; [#?TRACE{name = Name}] -> mnesia:abort({already_existed, Name}) diff --git a/apps/emqx/src/proto/emqx_broker_proto_v1.erl b/apps/emqx/src/proto/emqx_broker_proto_v1.erl index d55fef88f..8bf777935 100644 --- a/apps/emqx/src/proto/emqx_broker_proto_v1.erl +++ b/apps/emqx/src/proto/emqx_broker_proto_v1.erl @@ -25,9 +25,6 @@ , list_client_subscriptions/2 , list_subscriptions_via_topic/2 - , lookup_client/2 - , kickout_client/2 - , start_listener/2 , stop_listener/2 , restart_listener/2 @@ -48,15 +45,6 @@ forward(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) -> forward_async(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) -> emqx_rpc:cast(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]). --spec kickout_client(node(), emqx_types:clientid()) -> ok | {badrpc, _}. -kickout_client(Node, ClientId) -> - rpc:call(Node, emqx_cm, kick_session, [ClientId]). - --spec lookup_client(node(), {clientid, emqx_types:clientid()} | {username, emqx_types:username()}) -> - [emqx_cm:channel_info()] | {badrpc, _}. -lookup_client(Node, Key) -> - rpc:call(Node, emqx_cm, lookup_client, [Key]). - -spec list_client_subscriptions(node(), emqx_types:clientid()) -> [{emqx_types:topic(), emqx_types:subopts()}] | emqx_rpc:badrpc(). diff --git a/apps/emqx/src/proto/emqx_cm_proto_v1.erl b/apps/emqx/src/proto/emqx_cm_proto_v1.erl new file mode 100644 index 000000000..e8f0115cb --- /dev/null +++ b/apps/emqx/src/proto/emqx_cm_proto_v1.erl @@ -0,0 +1,71 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_cm_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ introduced_in/0 + + , lookup_client/2 + , kickout_client/2 + + , get_chan_stats/2 + , get_chan_info/2 + , get_chann_conn_mod/2 + + , takeover_session/2 + , kick_session/3 + ]). + +-include("bpapi.hrl"). +-include("emqx_cm.hrl"). + +introduced_in() -> + "5.0.0". + +-spec kickout_client(node(), emqx_types:clientid()) -> ok | {badrpc, _}. +kickout_client(Node, ClientId) -> + rpc:call(Node, emqx_cm, kick_session, [ClientId]). + +-spec lookup_client(node(), {clientid, emqx_types:clientid()} | {username, emqx_types:username()}) -> + [emqx_cm:channel_info()] | {badrpc, _}. +lookup_client(Node, Key) -> + rpc:call(Node, emqx_cm, lookup_client, [Key]). + +-spec get_chan_stats(emqx_types:clientid(), emqx_cm:chan_pid()) -> emqx_types:stats() | {badrpc, _}. +get_chan_stats(ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_cm, do_get_chan_stats, [ClientId, ChanPid], ?T_GET_INFO * 2). + +-spec get_chan_info(emqx_types:clientid(), emqx_cm:chan_pid()) -> emqx_types:infos() | {badrpc, _}. +get_chan_info(ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_cm, do_get_chan_info, [ClientId, ChanPid], ?T_GET_INFO * 2). + +-spec get_chann_conn_mod(emqx_types:clientid(), emqx_cm:chan_pid()) -> module() | undefined | {badrpc, _}. +get_chann_conn_mod(ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_cm, do_get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO * 2). + +-spec takeover_session(emqx_types:clientid(), emqx_cm:chan_pid()) -> + none + | {expired | persistent, emqx_session:session()} + | {living, _ConnMod :: atom(), emqx_cm:chan_pid(), emqx_session:session()} + | {badrpc, _}. +takeover_session(ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_cm, takeover_session, [ClientId, ChanPid], ?T_TAKEOVER * 2). + +-spec kick_session(kick | discard, emqx_types:clientid(), emqx_cm:chan_pid()) -> ok | {badrpc, _}. +kick_session(Action, ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_cm, do_kick_session, [Action, ClientId, ChanPid], ?T_KICK * 2). diff --git a/apps/emqx/test/emqx_bpapi_static_checks.erl b/apps/emqx/test/emqx_bpapi_static_checks.erl index 4d742c600..57f6d95ea 100644 --- a/apps/emqx/test/emqx_bpapi_static_checks.erl +++ b/apps/emqx/test/emqx_bpapi_static_checks.erl @@ -49,9 +49,18 @@ %% List of known RPC backend modules: -define(RPC_MODULES, "gen_rpc, erpc, rpc, emqx_rpc"). %% List of known functions also known to do RPC: --define(RPC_FUNCTIONS, "emqx_cluster_rpc:multicall/3, emqx_cluster_rpc:multicall/5"). +-define(RPC_FUNCTIONS, "emqx_cluster_rpc:multicall/3, emqx_cluster_rpc:multicall/5, " + "emqx_plugin_libs_rule:cluster_call/3"). %% List of functions in the RPC backend modules that we can ignore: --define(IGNORED_RPC_CALLS, "gen_rpc:nodes/0"). +-define(IGNORED_RPC_CALLS, "gen_rpc:nodes/0, emqx_rpc:unwrap_erpc/1, rpc:pmap/3"). % TODO: handle pmap +%% List of business-layer functions that are exempt from the checks: +-define(EXEMPTIONS, + "emqx_mgmt_api:do_query/6," % Reason: legacy code. A fun and a QC query are + % passed in the args, it's futile to try to statically + % check it + "emqx_plugin_libs_rule:cluster_call/3" % Reason: some sort of external plugin API that we + % don't want to break? + ). -define(XREF, myxref). @@ -61,15 +70,20 @@ -spec run() -> boolean(). run() -> - dump(), %% TODO: check return value - Dumps = filelib:wildcard(dumps_dir() ++ "/*.bpapi"), - case Dumps of - [] -> - ?ERROR("No BPAPI dumps are found in ~s, abort", [dumps_dir()]), - false; - _ -> - ?NOTICE("Running API compatibility checks for ~p", [Dumps]), - check_compat(Dumps) + case dump() of + true -> + Dumps = filelib:wildcard(dumps_dir() ++ "/*.bpapi"), + case Dumps of + [] -> + ?ERROR("No BPAPI dumps are found in ~s, abort", [dumps_dir()]), + false; + _ -> + ?NOTICE("Running API compatibility checks for ~p", [Dumps]), + check_compat(Dumps) + end; + false -> + ?CRITICAL("Backplane API violations found on the current branch."), + false end. -spec check_compat([file:filename()]) -> boolean(). @@ -207,8 +221,8 @@ prepare(#{reldir := RelDir, plt := PLT}) -> dialyzer_plt:from_file(PLT). find_remote_calls(_Opts) -> - Query = "XC | (A - [" ?IGNORED_APPS "]:App - [" ?IGNORED_MODULES "] : Mod) - || (([" ?RPC_MODULES "] : Mod + [" ?RPC_FUNCTIONS "]) - " ?IGNORED_RPC_CALLS ")", + Query = "XC | (A - [" ?IGNORED_APPS "]:App - [" ?IGNORED_MODULES "]:Mod - [" ?EXEMPTIONS "]) + || (([" ?RPC_MODULES "] : Mod + [" ?RPC_FUNCTIONS "]) - [" ?IGNORED_RPC_CALLS "])", {ok, Calls} = xref:q(?XREF, Query), ?INFO("Calls to RPC modules ~p", [Calls]), {Callers, _Callees} = lists:unzip(Calls), diff --git a/apps/emqx/test/emqx_bpapi_suite.erl b/apps/emqx/test/emqx_bpapi_suite.erl index 5d0a313f8..ab86e5211 100644 --- a/apps/emqx/test/emqx_bpapi_suite.erl +++ b/apps/emqx/test/emqx_bpapi_suite.erl @@ -19,6 +19,7 @@ -compile(export_all). -compile(nowarn_export_all). +-include_lib("emqx/include/logger.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("stdlib/include/assert.hrl"). @@ -28,7 +29,8 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> - ok. + ?NOTICE("If this test suite failed, and you are unsure why, read this:~n" + "https://github.com/emqx/emqx/blob/master/apps/emqx/src/bpapi/README.md", []). t_run_check(_) -> ?assertMatch(true, emqx_bpapi_static_checks:run()). diff --git a/apps/emqx/test/emqx_cm_SUITE.erl b/apps/emqx/test/emqx_cm_SUITE.erl index 4ff8d501e..f28fd6bd8 100644 --- a/apps/emqx/test/emqx_cm_SUITE.erl +++ b/apps/emqx/test/emqx_cm_SUITE.erl @@ -283,6 +283,7 @@ flush_emqx_pool() -> t_discard_session_race(_) -> ClientId = rand_client_id(), ?check_trace( + #{timetrap => 60000}, begin #{conninfo := ConnInfo0} = ?ChanInfo, ConnInfo = ConnInfo0#{conn_mod := emqx_ws_connection}, @@ -290,12 +291,9 @@ t_discard_session_race(_) -> ok = emqx_cm:register_channel(ClientId, Pid, ConnInfo), Pid ! stop, receive {'DOWN', Ref, process, Pid, normal} -> ok end, - ok = emqx_cm:discard_session(ClientId), - {ok, _} = ?block_until(#{?snk_kind := "session_already_gone", pid := Pid}, 1000) + ?assertMatch(ok, emqx_cm:discard_session(ClientId)) end, - fun(_, _) -> - true - end). + []). t_takeover_session(_) -> #{conninfo := ConnInfo} = ?ChanInfo, diff --git a/apps/emqx/test/emqx_trace_SUITE.erl b/apps/emqx/test/emqx_trace_SUITE.erl index a1cad9d27..b5d761cc1 100644 --- a/apps/emqx/test/emqx_trace_SUITE.erl +++ b/apps/emqx/test/emqx_trace_SUITE.erl @@ -62,7 +62,7 @@ t_base_create_delete(_Config) -> end_at => End }, AnotherTrace = Trace#{name => <<"anotherTrace">>}, - ok = emqx_trace:create(Trace), + {ok, _} = emqx_trace:create(Trace), ?assertEqual({error, {already_existed, Name}}, emqx_trace:create(Trace)), ?assertEqual({error, {duplicate_condition, Name}}, emqx_trace:create(AnotherTrace)), [TraceRec] = emqx_trace:list(), @@ -95,13 +95,13 @@ t_create_size_max(_Config) -> Name = list_to_binary("name" ++ integer_to_list(Seq)), Trace = [{name, Name}, {type, topic}, {topic, list_to_binary("/x/y/" ++ integer_to_list(Seq))}], - ok = emqx_trace:create(Trace) + {ok, _} = emqx_trace:create(Trace) end, lists:seq(1, 30)), Trace31 = [{<<"name">>, <<"name31">>}, {<<"type">>, topic}, {<<"topic">>, <<"/x/y/31">>}], {error, _} = emqx_trace:create(Trace31), ok = emqx_trace:delete(<<"name30">>), - ok = emqx_trace:create(Trace31), + {ok, _} = emqx_trace:create(Trace31), ?assertEqual(30, erlang:length(emqx_trace:list())), ok. @@ -145,7 +145,7 @@ t_create_failed(_Config) -> t_create_default(_Config) -> {error, "name required"} = emqx_trace:create([]), - ok = emqx_trace:create([{<<"name">>, <<"test-name">>}, + {ok, _} = emqx_trace:create([{<<"name">>, <<"test-name">>}, {<<"type">>, clientid}, {<<"clientid">>, <<"good">>}]), [#emqx_trace{name = <<"test-name">>}] = emqx_trace:list(), ok = emqx_trace:clear(), @@ -166,7 +166,7 @@ t_create_default(_Config) -> {<<"end_at">>, to_rfc3339(Now + 3)} ], {error, "failed by start_at >= end_at"} = emqx_trace:create(Trace2), - ok = emqx_trace:create([{<<"name">>, <<"test-name">>}, + {ok, _} = emqx_trace:create([{<<"name">>, <<"test-name">>}, {<<"type">>, topic}, {<<"topic">>, <<"/x/y/z">>}]), [#emqx_trace{start_at = Start, end_at = End}] = emqx_trace:list(), ?assertEqual(10 * 60, End - Start), @@ -182,7 +182,7 @@ t_create_with_extra_fields(_Config) -> {<<"clientid">>, <<"dev001">>}, {<<"ip_address">>, <<"127.0.0.1">>} ], - ok = emqx_trace:create(Trace), + {ok, _} = emqx_trace:create(Trace), ?assertMatch([#emqx_trace{name = <<"test-name">>, filter = <<"/x/y/z">>, type = topic}], emqx_trace:list()), ok. @@ -191,7 +191,7 @@ t_update_enable(_Config) -> Name = <<"test-name">>, Now = erlang:system_time(second), End = list_to_binary(calendar:system_time_to_rfc3339(Now + 2)), - ok = emqx_trace:create([{<<"name">>, Name}, {<<"type">>, topic}, + {ok, _} = emqx_trace:create([{<<"name">>, Name}, {<<"type">>, topic}, {<<"topic">>, <<"/x/y/z">>}, {<<"end_at">>, End}]), [#emqx_trace{enable = Enable}] = emqx_trace:list(), ?assertEqual(Enable, true), @@ -219,8 +219,8 @@ t_load_state(_Config) -> Finished = [{<<"name">>, <<"Finished">>}, {<<"type">>, topic}, {<<"topic">>, <<"/x/y/3">>}, {<<"start_at">>, to_rfc3339(Now - 5)}, {<<"end_at">>, to_rfc3339(Now)}], - ok = emqx_trace:create(Running), - ok = emqx_trace:create(Waiting), + {ok, _} = emqx_trace:create(Running), + {ok, _} = emqx_trace:create(Waiting), {error, "end_at time has already passed"} = emqx_trace:create(Finished), Traces = emqx_trace:format(emqx_trace:list()), ?assertEqual(2, erlang:length(Traces)), @@ -241,7 +241,7 @@ t_client_event(_Config) -> Now = erlang:system_time(second), Start = to_rfc3339(Now), Name = <<"test_client_id_event">>, - ok = emqx_trace:create([{<<"name">>, Name}, + {ok, _} = emqx_trace:create([{<<"name">>, Name}, {<<"type">>, clientid}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]), ok = emqx_trace_handler_SUITE:filesync(Name, clientid), {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]), @@ -250,7 +250,7 @@ t_client_event(_Config) -> ok = emqtt:publish(Client, <<"/test">>, #{}, <<"1">>, [{qos, 0}]), ok = emqtt:publish(Client, <<"/test">>, #{}, <<"2">>, [{qos, 0}]), ok = emqx_trace_handler_SUITE:filesync(Name, clientid), - ok = emqx_trace:create([{<<"name">>, <<"test_topic">>}, + {ok, _} = emqx_trace:create([{<<"name">>, <<"test_topic">>}, {<<"type">>, topic}, {<<"topic">>, <<"/test">>}, {<<"start_at">>, Start}]), ok = emqx_trace_handler_SUITE:filesync(<<"test_topic">>, topic), {ok, Bin} = file:read_file(emqx_trace:log_file(Name, Now)), @@ -279,7 +279,7 @@ t_get_log_filename(_Config) -> {<<"start_at">>, list_to_binary(Start)}, {<<"end_at">>, list_to_binary(End)} ], - ok = emqx_trace:create(Trace), + {ok, _} = emqx_trace:create(Trace), ?assertEqual({error, not_found}, emqx_trace:get_trace_filename(<<"test">>)), ?assertEqual(ok, element(1, emqx_trace:get_trace_filename(Name))), ct:sleep(3000), diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 40957bd3c..e7e922397 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -29,7 +29,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, handle_continue/2, code_change/3]). --export_type([txn_id/0, succeed_num/0, multicall_return/0]). +-export_type([txn_id/0, succeed_num/0, multicall_return/1, multicall_return/0]). -ifdef(TEST). -compile(export_all). @@ -48,9 +48,11 @@ -type succeed_num() :: pos_integer() | all. --type multicall_return() :: {ok, txn_id(), _Result} - | {error, term()} - | {retry, txn_id(), _Result, node()}. +-type multicall_return(Result) :: {ok, txn_id(), Result} + | {error, term()} + | {retry, txn_id(), Result, node()}. + +-type multicall_return() :: multicall_return(_). %%%=================================================================== %%% API diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index 1fbdb9eca..802bde288 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -1,3 +1,19 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + -module(emqx_dashboard_swagger). -include_lib("typerefl/include/types.hrl"). @@ -313,6 +329,7 @@ responses(Responses, Module) -> response(Status, Bin, {Acc, RefsAcc, Module}) when is_binary(Bin) -> {Acc#{integer_to_binary(Status) => #{description => Bin}}, RefsAcc, Module}; %% Support swagger raw object(file download). +%% TODO: multi type response(i.e. Support both 'application/json' and 'plain/text') response(Status, #{content := _} = Content, {Acc, RefsAcc, Module}) -> {Acc#{integer_to_binary(Status) => Content}, RefsAcc, Module}; response(Status, ?REF(StructName), {Acc, RefsAcc, Module}) -> diff --git a/apps/emqx_exhook/src/emqx_exhook_api.erl b/apps/emqx_exhook/src/emqx_exhook_api.erl index d0680e715..4df1b5573 100644 --- a/apps/emqx_exhook/src/emqx_exhook_api.erl +++ b/apps/emqx_exhook/src/emqx_exhook_api.erl @@ -32,9 +32,6 @@ -define(BAD_REQUEST, 'BAD_REQUEST'). -define(BAD_RPC, 'BAD_RPC'). --type rpc_result() :: {error, any()} - | any(). - -dialyzer([{nowarn_function, [ fill_cluster_server_info/5 , nodes_server_info/5 , fill_server_hooks_info/4 @@ -285,7 +282,7 @@ get_nodes_server_info(Name) -> %% GET /exhooks %%-------------------------------------------------------------------- nodes_all_server_info(ConfL) -> - AllInfos = call_cluster(emqx_exhook_mgr, all_servers_info, []), + AllInfos = call_cluster(fun(Nodes) -> emqx_exhook_proto_v1:all_servers_info(Nodes) end), Default = emqx_exhook_metrics:new_metrics_info(), node_all_server_info(ConfL, AllInfos, Default, []). @@ -324,7 +321,7 @@ fill_cluster_server_info([], StatusL, MetricsL, ServerName, _) -> %% GET /exhooks/{name} %%-------------------------------------------------------------------- nodes_server_info(Name) -> - InfoL = call_cluster(emqx_exhook_mgr, server_info, [Name]), + InfoL = call_cluster(fun(Nodes) -> emqx_exhook_proto_v1:server_info(Nodes, Name) end), Default = emqx_exhook_metrics:new_metrics_info(), nodes_server_info(InfoL, Name, Default, [], []). @@ -359,7 +356,7 @@ get_nodes_server_hooks_info(Name) -> case emqx_exhook_mgr:hooks(Name) of [] -> []; Hooks -> - AllInfos = call_cluster(emqx_exhook_mgr, server_hooks_metrics, [Name]), + AllInfos = call_cluster(fun(Nodes) -> emqx_exhook_proto_v1:server_hooks_metrics(Nodes, Name) end), Default = emqx_exhook_metrics:new_metrics_info(), get_nodes_server_hooks_info(Hooks, AllInfos, Default, []) end. @@ -387,16 +384,10 @@ fill_server_hooks_info([], _Name, _Default, MetricsL) -> %%-------------------------------------------------------------------- %% cluster call %%-------------------------------------------------------------------- -call_cluster(Module, Fun, Args) -> + +-spec call_cluster(fun(([node()]) -> emqx_rpc:erpc_multicall(A))) -> + [{node(), A | {error, _Err}}]. +call_cluster(Fun) -> Nodes = mria_mnesia:running_nodes(), - [{Node, rpc_call(Node, Module, Fun, Args)} || Node <- Nodes]. - --spec rpc_call(node(), atom(), atom(), list()) -> rpc_result(). -rpc_call(Node, Module, Fun, Args) when Node =:= node() -> - erlang:apply(Module, Fun, Args); - -rpc_call(Node, Module, Fun, Args) -> - case rpc:call(Node, Module, Fun, Args) of - {badrpc, Reason} -> {error, Reason}; - Res -> Res - end. + Ret = Fun(Nodes), + lists:zip(Nodes, lists:map(fun emqx_rpc:unwrap_erpc/1, Ret)). diff --git a/apps/emqx_exhook/src/proto/emqx_exhook_proto_v1.erl b/apps/emqx_exhook/src/proto/emqx_exhook_proto_v1.erl new file mode 100644 index 000000000..a61acf1e7 --- /dev/null +++ b/apps/emqx_exhook/src/proto/emqx_exhook_proto_v1.erl @@ -0,0 +1,46 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_exhook_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ introduced_in/0 + + , all_servers_info/1 + , server_info/2 + , server_hooks_metrics/2 + ]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.0.0". + +-spec all_servers_info([node()]) -> + emqx_rpc:erpc_multicall(map()). +all_servers_info(Nodes) -> + erpc:multicall(Nodes, emqx_exhook_mgr, all_servers_info, []). + +-spec server_info([node()], emqx_exhook_mgr:server_name()) -> + emqx_rpc:erpc_multicall(map()). +server_info(Nodes, Name) -> + erpc:multicall(Nodes, emqx_exhook_mgr, server_info, [Name]). + +-spec server_hooks_metrics([node()], emqx_exhook_mgr:server_name()) -> + emqx_rpc:erpc_multicall(emqx_exhook_metrics:hooks_metrics()). +server_hooks_metrics(Nodes, Name) -> + erpc:multicall(Nodes, emqx_exhook_mgr, server_hooks_metrics, [Name]). diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl index 86dbb69af..eddd25853 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl @@ -304,10 +304,11 @@ run_fuzzy_filter(E = {_, #{clientinfo := ClientInfo}, _}, %% format funcs format_channel_info({_, Infos, Stats} = R) -> + Node = maps:get(node, Infos, node()), ClientInfo = maps:get(clientinfo, Infos, #{}), ConnInfo = maps:get(conninfo, Infos, #{}), SessInfo = maps:get(session, Infos, #{}), - FetchX = [ {node, ClientInfo, node()} + FetchX = [ {node, ClientInfo, Node} , {clientid, ClientInfo} , {username, ClientInfo} , {proto_name, ConnInfo} diff --git a/apps/emqx_gateway/src/emqx_gateway_cm.erl b/apps/emqx_gateway/src/emqx_gateway_cm.erl index 647d3a6a7..d977b0933 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm.erl @@ -36,6 +36,7 @@ , register_channel/4 , unregister_channel/2 , insert_channel_info/4 + , lookup_by_clientid/2 , set_chan_info/3 , set_chan_info/4 , get_chan_info/2 @@ -63,6 +64,20 @@ , code_change/3 ]). +%% RPC targets +-export([ do_lookup_by_clientid/2 + , do_get_chan_info/3 + , do_set_chan_info/4 + , do_get_chan_stats/3 + , do_set_chan_stats/4 + , do_discard_session/3 + , do_kick_session/3 + , do_get_chann_conn_mod/3 + ]). + +-export_type([ gateway_name/0 + ]). + -record(state, { gwname :: gateway_name(), %% Gateway Name locker :: pid(), %% ClientId Locker for CM @@ -146,16 +161,38 @@ get_chan_info(GwName, ClientId) -> get_chan_info(GwName, ClientId, ChanPid) end). --spec get_chan_info(gateway_name(), emqx_types:clientid(), pid()) +-spec do_lookup_by_clientid(gateway_name(), emqx_types:clientid()) -> + [pid()]. +do_lookup_by_clientid(GwName, ClientId) -> + ChanTab = emqx_gateway_cm:tabname(chan, GwName), + [Pid || {_, Pid} <- ets:lookup(ChanTab, ClientId)]. + +-spec do_get_chan_info(gateway_name(), emqx_types:clientid(), pid()) -> emqx_types:infos() | undefined. -get_chan_info(GwName, ClientId, ChanPid) when node(ChanPid) == node() -> +do_get_chan_info(GwName, ClientId, ChanPid) -> Chan = {ClientId, ChanPid}, - try ets:lookup_element(tabname(info, GwName), Chan, 2) + try + Info = ets:lookup_element(tabname(info, GwName), Chan, 2), + Info#{node => node()} catch error:badarg -> undefined - end; + end. + +-spec get_chan_info(gateway_name(), emqx_types:clientid(), pid()) + -> emqx_types:infos() | undefined. get_chan_info(GwName, ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chan_info, [GwName, ClientId, ChanPid]). + wrap_rpc(emqx_gateway_cm_proto_v1:get_chan_info(GwName, ClientId, ChanPid)). + +-spec lookup_by_clientid(gateway_name(), emqx_types:clientid()) -> + [pid()]. +lookup_by_clientid(GwName, ClientId) -> + Nodes = mria_mnesia:running_nodes(), + case emqx_gateway_cm_proto_v1:lookup_by_clientid(Nodes, GwName, ClientId) of + {Pids, []} -> + lists:append(Pids); + {_, _BadNodes} -> + error(badrpc) + end. %% @doc Update infos of the channel. -spec set_chan_info(gateway_name(), @@ -164,18 +201,23 @@ get_chan_info(GwName, ClientId, ChanPid) -> set_chan_info(GwName, ClientId, Infos) -> set_chan_info(GwName, ClientId, self(), Infos). --spec set_chan_info(gateway_name(), - emqx_types:clientid(), - pid(), - emqx_types:infos()) -> boolean(). -set_chan_info(GwName, ClientId, ChanPid, Infos) when node(ChanPid) == node() -> +-spec do_set_chan_info(gateway_name(), + emqx_types:clientid(), + pid(), + emqx_types:infos()) -> boolean(). +do_set_chan_info(GwName, ClientId, ChanPid, Infos) -> Chan = {ClientId, ChanPid}, try ets:update_element(tabname(info, GwName), Chan, {2, Infos}) catch error:badarg -> false - end; + end. + +-spec set_chan_info(gateway_name(), + emqx_types:clientid(), + pid(), + emqx_types:infos()) -> boolean(). set_chan_info(GwName, ClientId, ChanPid, Infos) -> - rpc_call(node(ChanPid), set_chan_info, [GwName, ClientId, ChanPid, Infos]). + wrap_rpc(emqx_gateway_cm_proto_v1:set_chan_info(GwName, ClientId, ChanPid, Infos)). %% @doc Get channel's stats. -spec get_chan_stats(gateway_name(), emqx_types:clientid()) @@ -186,16 +228,19 @@ get_chan_stats(GwName, ClientId) -> get_chan_stats(GwName, ClientId, ChanPid) end). --spec get_chan_stats(gateway_name(), emqx_types:clientid(), pid()) +-spec do_get_chan_stats(gateway_name(), emqx_types:clientid(), pid()) -> emqx_types:stats() | undefined. -get_chan_stats(GwName, ClientId, ChanPid) when node(ChanPid) == node() -> +do_get_chan_stats(GwName, ClientId, ChanPid) -> Chan = {ClientId, ChanPid}, try ets:lookup_element(tabname(info, GwName), Chan, 3) catch error:badarg -> undefined - end; + end. + +-spec get_chan_stats(gateway_name(), emqx_types:clientid(), pid()) + -> emqx_types:stats() | undefined. get_chan_stats(GwName, ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chan_stats, [GwName, ClientId, ChanPid]). + wrap_rpc(emqx_gateway_cm_proto_v1:get_chan_stats(GwName, ClientId, ChanPid)). -spec set_chan_stats(gateway_name(), emqx_types:clientid(), @@ -203,19 +248,23 @@ get_chan_stats(GwName, ClientId, ChanPid) -> set_chan_stats(GwName, ClientId, Stats) -> set_chan_stats(GwName, ClientId, self(), Stats). +-spec do_set_chan_stats(gateway_name(), + emqx_types:clientid(), + pid(), + emqx_types:stats()) -> boolean(). +do_set_chan_stats(GwName, ClientId, ChanPid, Stats) -> + Chan = {ClientId, ChanPid}, + try ets:update_element(tabname(info, GwName), Chan, {3, Stats}) + catch + error:badarg -> false + end. + -spec set_chan_stats(gateway_name(), emqx_types:clientid(), pid(), emqx_types:stats()) -> boolean(). -set_chan_stats(GwName, ClientId, ChanPid, Stats) - when node(ChanPid) == node() -> - Chan = {ClientId, self()}, - try ets:update_element(tabname(info, GwName), Chan, {3, Stats}) - catch - error:badarg -> false - end; set_chan_stats(GwName, ClientId, ChanPid, Stats) -> - rpc_call(node(ChanPid), set_chan_stats, [GwName, ClientId, ChanPid, Stats]). + wrap_rpc(emqx_gateway_cm_proto_v1:set_chan_stats(GwName, ClientId, ChanPid, Stats)). -spec connection_closed(gateway_name(), emqx_types:clientid()) -> true. connection_closed(GwName, ClientId) -> @@ -297,11 +346,11 @@ create_session(GwName, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) -> discard_session(GwName, ClientId) when is_binary(ClientId) -> case lookup_channels(GwName, ClientId) of [] -> ok; - ChanPids -> lists:foreach(fun(Pid) -> do_discard_session(GwName, ClientId, Pid) end, ChanPids) + ChanPids -> lists:foreach(fun(Pid) -> safe_discard_session(GwName, ClientId, Pid) end, ChanPids) end. %% @private -do_discard_session(GwName, ClientId, Pid) -> +safe_discard_session(GwName, ClientId, Pid) -> try discard_session(GwName, ClientId, Pid) catch @@ -315,17 +364,20 @@ do_discard_session(GwName, ClientId, Pid) -> ok end. -%% @private -discard_session(GwName, ClientId, ChanPid) when node(ChanPid) == node() -> +-spec do_discard_session(gateway_name(), emqx_types:clientid(), pid()) -> + _. +do_discard_session(GwName, ClientId, ChanPid) -> case get_chann_conn_mod(GwName, ClientId, ChanPid) of undefined -> ok; ConnMod when is_atom(ConnMod) -> ConnMod:call(ChanPid, discard, ?T_TAKEOVER) - end; + end. %% @private +-spec discard_session(gateway_name(), emqx_types:clientid(), pid()) -> + _. discard_session(GwName, ClientId, ChanPid) -> - rpc_call(node(ChanPid), discard_session, [GwName, ClientId, ChanPid]). + wrap_rpc(emqx_gateway_cm_proto_v1:discard_session(GwName, ClientId, ChanPid)). -spec kick_session(gateway_name(), emqx_types:clientid()) -> {error, any()} @@ -346,16 +398,20 @@ kick_session(GwName, ClientId) -> kick_session(GwName, ClientId, ChanPid) end. -kick_session(GwName, ClientId, ChanPid) when node(ChanPid) == node() -> +-spec do_kick_session(gateway_name(), emqx_types:clientid(), pid()) -> + _. +do_kick_session(GwName, ClientId, ChanPid) -> case get_chan_info(GwName, ClientId, ChanPid) of #{conninfo := #{conn_mod := ConnMod}} -> ConnMod:call(ChanPid, kick, ?T_TAKEOVER); undefined -> {error, not_found} - end; + end. +-spec kick_session(gateway_name(), emqx_types:clientid(), pid()) -> + _. kick_session(GwName, ClientId, ChanPid) -> - rpc_call(node(ChanPid), kick_session, [GwName, ClientId, ChanPid]). + wrap_rpc(emqx_gateway_cm_proto_v1:kick_session(GwName, ClientId, ChanPid)). with_channel(GwName, ClientId, Fun) -> case lookup_channels(GwName, ClientId) of @@ -369,14 +425,17 @@ with_channel(GwName, ClientId, Fun) -> lookup_channels(GwName, ClientId) -> emqx_gateway_cm_registry:lookup_channels(GwName, ClientId). -get_chann_conn_mod(GwName, ClientId, ChanPid) when node(ChanPid) == node() -> +-spec do_get_chann_conn_mod(gateway_name(), emqx_types:clientid(), pid()) -> atom(). +do_get_chann_conn_mod(GwName, ClientId, ChanPid) -> Chan = {ClientId, ChanPid}, try [ConnMod] = ets:lookup_element(tabname(conn, GwName), Chan, 2), ConnMod catch error:badarg -> undefined - end; + end. + +-spec get_chann_conn_mod(gateway_name(), emqx_types:clientid(), pid()) -> atom(). get_chann_conn_mod(GwName, ClientId, ChanPid) -> - rpc_call(node(ChanPid), get_chann_conn_mod, [GwName, ClientId, ChanPid]). + wrap_rpc(emqx_gateway_cm_proto_v1:get_chann_conn_mod(GwName, ClientId, ChanPid)). %% Locker @@ -398,8 +457,8 @@ locker_unlock(Locker, ClientId) -> ekka_locker:release(Locker, ClientId, quorum). %% @private -rpc_call(Node, Fun, Args) -> - case rpc:call(Node, ?MODULE, Fun, Args) of +wrap_rpc(Ret) -> + case Ret of {badrpc, Reason} -> error(Reason); Res -> Res end. diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index 641f29932..bb4bfa7f9 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -47,9 +47,7 @@ %% Mgmt APIs - clients -export([ lookup_client/3 - , lookup_client/4 , kickout_client/2 - , kickout_client/3 , list_client_subscriptions/2 , client_subscribe/4 , client_unsubscribe/3 @@ -231,41 +229,28 @@ confexp({error, Reason}) -> error(Reason). %%-------------------------------------------------------------------- -spec lookup_client(gateway_name(), - emqx_types:clientid(), {atom(), atom()}) -> list(). -lookup_client(GwName, ClientId, FormatFun) -> - lists:append([lookup_client(Node, GwName, {clientid, ClientId}, FormatFun) - || Node <- mria_mnesia:running_nodes()]). - -lookup_client(Node, GwName, {clientid, ClientId}, {M,F}) when Node =:= node() -> - ChanTab = emqx_gateway_cm:tabname(chan, GwName), - InfoTab = emqx_gateway_cm:tabname(info, GwName), - - lists:append(lists:map( - fun(Key) -> - lists:map(fun M:F/1, ets:lookup(InfoTab, Key)) - end, ets:lookup(ChanTab, ClientId))); - -lookup_client(Node, GwName, {clientid, ClientId}, FormatFun) -> - rpc_call(Node, lookup_client, - [Node, GwName, {clientid, ClientId}, FormatFun]). + emqx_types:clientid(), {module(), atom()}) -> list(). +lookup_client(GwName, ClientId, {M, F}) -> + [begin + Info = emqx_gateway_cm:get_chan_info(GwName, ClientId, Pid), + Stats = emqx_gateway_cm:get_chan_stats(GwName, ClientId, Pid), + M:F({{ClientId, Pid}, Info, Stats}) + end + || Pid <- emqx_gateway_cm:lookup_by_clientid(GwName, ClientId)]. -spec kickout_client(gateway_name(), emqx_types:clientid()) -> {error, any()} | ok. kickout_client(GwName, ClientId) -> - Results = [kickout_client(Node, GwName, ClientId) - || Node <- mria_mnesia:running_nodes()], - case lists:any(fun(Item) -> Item =:= ok end, Results) of - true -> ok; - false -> lists:last(Results) + Results = [emqx_gateway_cm:kick_session(GwName, ClientId, Pid) + || Pid <- emqx_gateway_cm:lookup_by_clientid(GwName, ClientId)], + IsOk = lists:any(fun(Item) -> Item =:= ok end, Results), + case {IsOk, Results} of + {true , _ } -> ok; + {_ , []} -> {error, not_found}; + {false, _ } -> lists:last(Results) end. -kickout_client(Node, GwName, ClientId) when Node =:= node() -> - emqx_gateway_cm:kick_session(GwName, ClientId); - -kickout_client(Node, GwName, ClientId) -> - rpc_call(Node, kickout_client, [Node, GwName, ClientId]). - -spec list_client_subscriptions(gateway_name(), emqx_types:clientid()) -> {error, any()} | {ok, list()}. @@ -459,9 +444,3 @@ to_list(B) when is_binary(B) -> %%-------------------------------------------------------------------- %% Internal funcs - -rpc_call(Node, Fun, Args) -> - case rpc:call(Node, ?MODULE, Fun, Args) of - {badrpc, Reason} -> {error, Reason}; - Res -> Res - end. diff --git a/apps/emqx_gateway/src/proto/emqx_gateway_cm_proto_v1.erl b/apps/emqx_gateway/src/proto/emqx_gateway_cm_proto_v1.erl new file mode 100644 index 000000000..6aa5521c5 --- /dev/null +++ b/apps/emqx_gateway/src/proto/emqx_gateway_cm_proto_v1.erl @@ -0,0 +1,77 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_gateway_cm_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ introduced_in/0 + + , get_chan_info/3 + , set_chan_info/4 + , get_chan_stats/3 + , set_chan_stats/4 + , discard_session/3 + , kick_session/3 + , get_chann_conn_mod/3 + , lookup_by_clientid/3 + ]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.0.0". + +-spec lookup_by_clientid([node()], emqx_gateway_cm:gateway_name(), emqx_types:clientid()) -> + emqx_rpc:multicall_return([pid()]). +lookup_by_clientid(Nodes, GwName, ClientId) -> + rpc:multicall(Nodes, emqx_gateway_cm, do_lookup_by_clientid, [GwName, ClientId]). + +-spec get_chan_info(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid()) + -> emqx_types:infos() | undefined | {badrpc, _}. +get_chan_info(GwName, ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_gateway_cm, do_get_chan_info, [GwName, ClientId, ChanPid]). + +-spec set_chan_info(emqx_gateway_cm:gateway_name(), + emqx_types:clientid(), + pid(), + emqx_types:infos()) -> boolean() | {badrpc, _}. +set_chan_info(GwName, ClientId, ChanPid, Infos) -> + rpc:call(node(ChanPid), emqx_gateway_cm, do_set_chan_info, [GwName, ClientId, ChanPid, Infos]). + +-spec get_chan_stats(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid()) + -> emqx_types:stats() | undefined | {badrpc, _}. +get_chan_stats(GwName, ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_gateway_cm, do_get_chan_stats, [GwName, ClientId, ChanPid]). + +-spec set_chan_stats(emqx_gateway_cm:gateway_name(), + emqx_types:clientid(), + pid(), + emqx_types:stats()) -> boolean() | {badrpc, _}. +set_chan_stats(GwName, ClientId, ChanPid, Stats) -> + rpc:call(node(ChanPid), emqx_gateway_cm, do_set_chan_stats, [GwName, ClientId, ChanPid, Stats]). + +-spec discard_session(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid()) -> _. +discard_session(GwName, ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_gateway_cm, do_discard_session, [GwName, ClientId, ChanPid]). + +-spec kick_session(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid()) -> _. +kick_session(GwName, ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_gateway_cm, do_kick_session, [GwName, ClientId, ChanPid]). + +-spec get_chann_conn_mod(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid()) -> atom() | {badrpc, _}. +get_chann_conn_mod(GwName, ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_gateway_cm, do_get_chann_conn_mod, [GwName, ClientId, ChanPid]). diff --git a/apps/emqx_gateway/test/emqx_gateway_cm_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_cm_SUITE.erl index 82d97a166..558e90bd3 100644 --- a/apps/emqx_gateway/test/emqx_gateway_cm_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_gateway_cm_SUITE.erl @@ -125,7 +125,7 @@ t_get_set_chan_info_stats(_) -> #{clientinfo => clientinfo(), conninfo => conninfo()}, []), %% Info: get/set - NInfo = #{newinfo => true}, + NInfo = #{newinfo => true, node => node()}, emqx_gateway_cm:set_chan_info(?GWNAME, ?CLIENTID, NInfo), ?assertEqual( NInfo, @@ -200,6 +200,7 @@ t_kick_session(_) -> 100 -> ?assert(false, "waiting kick msg timeout") end, + ?assertMatch({error, not_found}, emqx_gateway_http:kickout_client(?GWNAME, <<"i-dont-exist">>)), meck:unload(emqx_gateway_cm_registry). t_unexpected_handle(Conf) -> diff --git a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl index ab333cf87..fded10e03 100644 --- a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl @@ -1799,6 +1799,7 @@ t_clients_api(_) -> %% kickout {204, _} = request(delete, "/gateway/mqttsn/clients/client_id_test1"), + timer:sleep(100), {200, #{data := []}} = request(get, "/gateway/mqttsn/clients"), send_disconnect_msg(Socket, undefined), diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 2bf92a3c7..8148e4487 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -235,7 +235,6 @@ nodes_info_count(PropList) -> %%-------------------------------------------------------------------- lookup_client({clientid, ClientId}, FormatFun) -> - lists:append([lookup_client(Node, {clientid, ClientId}, FormatFun) || Node <- mria_mnesia:running_nodes()]); @@ -244,9 +243,13 @@ lookup_client({username, Username}, FormatFun) -> || Node <- mria_mnesia:running_nodes()]). lookup_client(Node, Key, {M, F}) -> - case wrap_rpc(emqx_broker_proto_v1:lookup_client(Node, Key)) of + case wrap_rpc(emqx_cm_proto_v1:lookup_client(Node, Key)) of {error, Err} -> {error, Err}; - L -> lists:map(fun M:F/1, L) + L -> lists:map(fun({Chan, Info0, Stats}) -> + Info = Info0#{node => Node}, + M:F({Chan, Info, Stats}) + end, + L) end. kickout_client({ClientID, FormatFun}) -> @@ -259,7 +262,7 @@ kickout_client({ClientID, FormatFun}) -> end. kickout_client(Node, ClientId) -> - wrap_rpc(emqx_broker_proto_v1:kickout_client(Node, ClientId)). + wrap_rpc(emqx_cm_proto_v1:kickout_client(Node, ClientId)). list_authz_cache(ClientId) -> call_client(ClientId, list_authz_cache). diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index 6136cab3a..3ca1266eb 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -197,12 +197,15 @@ do_cluster_query([Node | Tail] = Nodes, Tab, Qs, QueryFun, Continuation, %% Do Query (or rpc query) %%-------------------------------------------------------------------- -%% @private +%% @private This function is exempt from BPAPI do_query(Node, Tab, Qs, {M,F}, Continuation, Limit) when Node =:= node() -> erlang:apply(M, F, [Tab, Qs, Continuation, Limit]); do_query(Node, Tab, Qs, QueryFun, Continuation, Limit) -> - rpc_call(Node, ?MODULE, do_query, - [Node, Tab, Qs, QueryFun, Continuation, Limit], 50000). + case rpc:call(Node, ?MODULE, do_query, + [Node, Tab, Qs, QueryFun, Continuation, Limit], 50000) of + {badrpc, _} = R -> {error, R}; + Ret -> Ret + end. sub_query_result(Len, Rows, Limit, Results, Meta) -> {Flag, NMeta} = judge_page_with_counting(Len, Meta), @@ -219,13 +222,6 @@ sub_query_result(Len, Rows, Limit, Results, Meta) -> end, {NMeta, NResults}. -%% @private -rpc_call(Node, M, F, A, T) -> - case rpc:call(Node, M, F, A, T) of - {badrpc, _} = R -> {error, R}; - Res -> Res - end. - %%-------------------------------------------------------------------- %% Table Select %%-------------------------------------------------------------------- diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 298d9da9a..9cc5d3016 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -473,7 +473,7 @@ keepalive_api() -> ], responses => #{ <<"404">> => emqx_mgmt_util:error_schema(<<"Client id not found">>), - <<"400">> => emqx_mgmt_util:error_schema(<<"">>, 'PARAMS_ERROR'), + <<"400">> => emqx_mgmt_util:error_schema(<<"">>, ['PARAMS_ERROR']), <<"200">> => emqx_mgmt_util:schema(<<"ok">>)}}}, {"/clients/:clientid/keepalive", Metadata, set_keepalive}. %%%============================================================================================== @@ -732,13 +732,17 @@ run_fuzzy_filter(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, like, SubStr} | %% format funcs format_channel_info({_, ClientInfo, ClientStats}) -> + Node = case ClientInfo of + #{node := N} -> N; + _ -> node() + end, StatsMap = maps:without([memory, next_pkt_id, total_heap_size], maps:from_list(ClientStats)), ClientInfoMap0 = maps:fold(fun take_maps_from_inner/3, #{}, ClientInfo), {IpAddress, Port} = peername_dispart(maps:get(peername, ClientInfoMap0)), Connected = maps:get(conn_state, ClientInfoMap0) =:= connected, ClientInfoMap1 = maps:merge(StatsMap, ClientInfoMap0), - ClientInfoMap2 = maps:put(node, node(), ClientInfoMap1), + ClientInfoMap2 = maps:put(node, Node, ClientInfoMap1), ClientInfoMap3 = maps:put(ip_address, IpAddress, ClientInfoMap2), ClientInfoMap4 = maps:put(port, Port, ClientInfoMap3), ClientInfoMap = maps:put(connected, Connected, ClientInfoMap4), @@ -801,4 +805,3 @@ format_authz_cache({{PubSub, Topic}, {AuthzResult, Timestamp}}) -> result => AuthzResult, updated_time => Timestamp }. - diff --git a/apps/emqx_management/src/emqx_mgmt_api_configs.erl b/apps/emqx_management/src/emqx_mgmt_api_configs.erl index efab75791..599073704 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_configs.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_configs.erl @@ -158,12 +158,12 @@ configs(get, Params, _Req) -> case lists:member(Node, mria_mnesia:running_nodes()) andalso - rpc:call(Node, ?MODULE, get_full_config, []) + emqx_management_proto_v1:get_full_config(Node) of false -> Message = list_to_binary(io_lib:format("Bad node ~p, reason not found", [Node])), {500, #{code => 'BAD_NODE', message => Message}}; - {error, {badrpc, R}} -> + {badrpc, R} -> Message = list_to_binary(io_lib:format("Bad node ~p, reason ~p", [Node, R])), {500, #{code => 'BAD_NODE', message => Message}}; Res -> diff --git a/apps/emqx_management/src/emqx_mgmt_api_trace.erl b/apps/emqx_management/src/emqx_mgmt_api_trace.erl index 17fcafe24..d32e9f7cd 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_trace.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_trace.erl @@ -261,7 +261,7 @@ trace(get, _Params) -> end; trace(post, #{body := Param}) -> case emqx_trace:create(Param) of - ok -> {200}; + {ok, Trace0} -> {200, format_trace(Trace0)}; {error, {already_existed, Name}} -> {400, #{ code => 'ALREADY_EXISTED', @@ -280,11 +280,27 @@ trace(post, #{body := Param}) -> end; trace(delete, _Param) -> ok = emqx_trace:clear(), - {200}. + {204}. + +format_trace(Trace0) -> + [ + #{start_at := Start, end_at := End, + enable := Enable, type := Type, filter := Filter} = Trace1 + ] = emqx_trace:format([Trace0]), + Now = erlang:system_time(second), + LogSize = lists:foldl(fun(Node, Acc) -> Acc#{Node => 0} end, #{}, + mria_mnesia:running_nodes()), + Trace2 = maps:without([enable, filter], Trace1), + Trace2#{log_size => LogSize + , Type => iolist_to_binary(Filter) + , start_at => list_to_binary(calendar:system_time_to_rfc3339(Start)) + , end_at => list_to_binary(calendar:system_time_to_rfc3339(End)) + , status => status(Enable, Start, End, Now) + }. delete_trace(delete, #{bindings := #{name := Name}}) -> case emqx_trace:delete(Name) of - ok -> {200}; + ok -> {204}; {error, not_found} -> ?NOT_FOUND(Name) end. diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index cd5474261..34aff22fb 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -490,7 +490,7 @@ trace_cluster_on(Name, Type, Filter, DurationS0) -> , end_at => list_to_binary(calendar:system_time_to_rfc3339(Now + DurationS)) }, case emqx_trace:create(Trace) of - ok -> + {ok, _} -> emqx_ctl:print("cluster_trace ~p ~s ~s successfully~n", [Type, Filter, Name]); {error, Error} -> emqx_ctl:print("[error] cluster_trace ~s ~s=~s ~p~n", diff --git a/apps/emqx_management/src/proto/emqx_management_proto_v1.erl b/apps/emqx_management/src/proto/emqx_management_proto_v1.erl index d3d8ff989..306145ade 100644 --- a/apps/emqx_management/src/proto/emqx_management_proto_v1.erl +++ b/apps/emqx_management/src/proto/emqx_management_proto_v1.erl @@ -32,6 +32,8 @@ , unsubscribe/3 , call_client/3 + + , get_full_config/1 ]). -include_lib("emqx/include/bpapi.hrl"). @@ -77,3 +79,7 @@ unsubscribe(Node, ClientId, Topic) -> -spec call_client(node(), emqx_types:clientid(), term()) -> term(). call_client(Node, ClientId, Req) -> rpc:call(Node, emqx_mgmt, do_call_client, [ClientId, Req]). + +-spec get_full_config(node()) -> map() | list() | {badrpc, _}. +get_full_config(Node) -> + rpc:call(Node, emqx_mgmt_api_configs, get_full_config, []). diff --git a/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl index 8e6c278ae..c73d1d046 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl @@ -69,7 +69,7 @@ t_http_test(_Config) -> ], {ok, Create} = request_api(post, api_path("trace"), Header, Trace), - ?assertEqual(<<>>, Create), + ?assertMatch(#{<<"name">> := Name}, json(Create)), {ok, List} = request_api(get, api_path("trace"), Header), [Data] = json(List), @@ -107,7 +107,7 @@ t_http_test(_Config) -> %% clear {ok, Create1} = request_api(post, api_path("trace"), Header, Trace), - ?assertEqual(<<>>, Create1), + ?assertMatch(#{<<"name">> := Name}, json(Create1)), {ok, Clear} = request_api(delete, api_path("trace"), Header), ?assertEqual(<<>>, Clear), @@ -141,7 +141,7 @@ create_trace(Name, ClientId, Start) -> ?check_trace( #{timetrap => 900}, begin - ok = emqx_trace:create([{<<"name">>, Name}, + {ok, _} = emqx_trace:create([{<<"name">>, Name}, {<<"type">>, clientid}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]), ?block_until(#{?snk_kind := update_trace_done}) end, @@ -206,7 +206,7 @@ do_request_api(Method, Request) -> {error, {shutdown, server_closed}} -> {error, server_closed}; {ok, {{"HTTP/1.1", Code, _}, _Headers, Return}} - when Code =:= 200 orelse Code =:= 201 -> + when Code =:= 200 orelse Code =:= 201 orelse Code =:= 204 -> {ok, Return}; {ok, {Reason, _Header, Body}} -> {error, Reason, Body} diff --git a/apps/emqx_modules/include/emqx_modules.hrl b/apps/emqx_modules/include/emqx_modules.hrl index 334173015..8d505969c 100644 --- a/apps/emqx_modules/include/emqx_modules.hrl +++ b/apps/emqx_modules/include/emqx_modules.hrl @@ -1,5 +1,24 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + %% The destination URL for the telemetry data report -define(TELEMETRY_URL, "https://telemetry.emqx.io/api/telemetry"). %% Interval for reporting telemetry data, Default: 7d -define(REPORT_INTERVAR, 604800). + +-define(API_TAG_MQTT, [<<"mqtt">>]). +-define(API_SCHEMA_MODULE, emqx_modules_schema). diff --git a/apps/emqx_modules/src/emqx_delayed_api.erl b/apps/emqx_modules/src/emqx_delayed_api.erl index 56d782580..9a878f7f2 100644 --- a/apps/emqx_modules/src/emqx_delayed_api.erl +++ b/apps/emqx_modules/src/emqx_delayed_api.erl @@ -19,6 +19,7 @@ -behaviour(minirest_api). -include_lib("typerefl/include/types.hrl"). +-include("emqx_modules.hrl"). -import(hoconsc, [mk/2, ref/1, ref/2]). @@ -62,7 +63,7 @@ schema("/mqtt/delayed") -> #{ 'operationId' => status, get => #{ - tags => [<<"mqtt">>], + tags => ?API_TAG_MQTT, description => <<"Get delayed status">>, summary => <<"Get delayed status">>, responses => #{ @@ -70,7 +71,7 @@ schema("/mqtt/delayed") -> } }, put => #{ - tags => [<<"mqtt">>], + tags => ?API_TAG_MQTT, description => <<"Enable or disable delayed, set max delayed messages">>, 'requestBody' => ref(emqx_modules_schema, "delayed"), responses => #{ @@ -85,7 +86,7 @@ schema("/mqtt/delayed") -> schema("/mqtt/delayed/messages/:msgid") -> #{'operationId' => delayed_message, get => #{ - tags => [<<"mqtt">>], + tags => ?API_TAG_MQTT, description => <<"Get delayed message">>, parameters => [{msgid, mk(binary(), #{in => path, desc => <<"delay message ID">>})}], responses => #{ @@ -97,7 +98,7 @@ schema("/mqtt/delayed/messages/:msgid") -> } }, delete => #{ - tags => [<<"mqtt">>], + tags => ?API_TAG_MQTT, description => <<"Delete delayed message">>, parameters => [{msgid, mk(binary(), #{in => path, desc => <<"delay message ID">>})}], responses => #{ @@ -113,7 +114,7 @@ schema("/mqtt/delayed/messages") -> #{ 'operationId' => delayed_messages, get => #{ - tags => [<<"mqtt">>], + tags => ?API_TAG_MQTT, description => <<"List delayed messages">>, parameters => [ref(emqx_dashboard_swagger, page), ref(emqx_dashboard_swagger, limit)], responses => #{ diff --git a/apps/emqx_modules/src/emqx_event_message_api.erl b/apps/emqx_modules/src/emqx_event_message_api.erl index 55e12f591..65c33c0d7 100644 --- a/apps/emqx_modules/src/emqx_event_message_api.erl +++ b/apps/emqx_modules/src/emqx_event_message_api.erl @@ -17,37 +17,41 @@ -behaviour(minirest_api). --export([api_spec/0]). +-import(hoconsc, [mk/2, ref/2]). +-include("emqx_modules.hrl"). + +-export([ api_spec/0 + , paths/0 + , schema/1 + ]). -export([event_message/2]). --import(emqx_mgmt_util, [ schema/1 - ]). - api_spec() -> - {[event_message_api()], []}. + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). -conf_schema() -> - emqx_mgmt_api_configs:gen_schema(emqx:get_config([event_message])). +paths() -> + ["/mqtt/event_message"]. -event_message_api() -> - Path = "/mqtt/event_message", - Metadata = #{ - get => #{ - description => <<"Event Message">>, - responses => #{ - <<"200">> => schema(conf_schema()) +schema("/mqtt/event_message") -> + #{ 'operationId' => event_message + , get => + #{ description => <<"Event Message">> + , tags => ?API_TAG_MQTT + , responses => + #{200 => status_schema(<<"Get Event Message config successfully">>)} } - }, - put => #{ - description => <<"Update Event Message">>, - 'requestBody' => schema(conf_schema()), - responses => #{ - <<"200">> => schema(conf_schema()) + , put => + #{ description => <<"Update Event Message">> + , tags => ?API_TAG_MQTT + , 'requestBody' => status_schema(<<"Update Event Message config">>) + , responses => + #{200 => status_schema(<<"Update Event Message config successfully">>)} } - } - }, - {Path, Metadata, event_message}. + }. + +status_schema(Desc) -> + mk(ref(?API_SCHEMA_MODULE, "event_message"), #{in => body, desc => Desc}). event_message(get, _Params) -> {200, emqx_event_message:list()}; diff --git a/apps/emqx_modules/src/emqx_modules_schema.erl b/apps/emqx_modules/src/emqx_modules_schema.erl index 0c5363e16..a7c6c2952 100644 --- a/apps/emqx_modules/src/emqx_modules_schema.erl +++ b/apps/emqx_modules/src/emqx_modules_schema.erl @@ -43,21 +43,41 @@ fields("delayed") -> ]; fields("rewrite") -> - [ {action, sc(hoconsc:enum([subscribe, publish, all]), #{desc => "Action", example => publish})} - , {source_topic, sc(binary(), #{desc => "Origin Topic", example => "x/#"})} - , {dest_topic, sc(binary(), #{desc => "Destination Topic", example => "z/y/$1"})} - , {re, fun regular_expression/1 } + [ { action + , sc( hoconsc:enum([subscribe, publish, all]) + , #{desc => <<"Action">>, example => publish})} + , { source_topic + , sc( binary() + , #{desc => <<"Origin Topic">>, example => "x/#"})} + , { dest_topic + , sc( binary() + , #{desc => <<"Destination Topic">>, example => "z/y/$1"})} + , { re, fun regular_expression/1 } ]; fields("event_message") -> - [ {"$event/client_connected", sc(boolean(), #{default => false})} - , {"$event/client_disconnected", sc(boolean(), #{default => false})} - , {"$event/client_subscribed", sc(boolean(), #{default => false})} - , {"$event/client_unsubscribed", sc(boolean(), #{default => false})} - , {"$event/message_delivered", sc(boolean(), #{default => false})} - , {"$event/message_acked", sc(boolean(), #{default => false})} - , {"$event/message_dropped", sc(boolean(), #{default => false})} + [ { '$event/client_connected' + , sc( boolean() + , #{desc => <<"Client connected to EMQ X event">>, default => false})} + , { '$event/client_disconnected' + , sc(boolean() + , #{desc => <<"Client disconnected to EMQ X event">>, default => false})} + , { '$event/client_subscribed' + , sc( boolean() + , #{desc => <<"Client subscribe topic event">>, default => false})} + , { '$event/client_unsubscribed' + , sc( boolean() + , #{desc => <<"Client unsubscribe topic event">>, default => false})} + , { '$event/message_delivered' + , sc( boolean() + , #{desc => <<"Message delivered event">>, default => false})} + , { '$event/message_acked' + , sc( boolean() + , #{desc => <<"Message acked event">>, default => false})} + , { '$event/message_dropped' + , sc( boolean() + , #{desc => <<"Message dropped event">>, default => false})} ]; fields("topic_metrics") -> diff --git a/apps/emqx_modules/src/emqx_rewrite_api.erl b/apps/emqx_modules/src/emqx_rewrite_api.erl index 534864c74..6590b2cac 100644 --- a/apps/emqx_modules/src/emqx_rewrite_api.erl +++ b/apps/emqx_modules/src/emqx_rewrite_api.erl @@ -17,6 +17,7 @@ -behaviour(minirest_api). -include_lib("typerefl/include/types.hrl"). +-include("emqx_modules.hrl"). -export([api_spec/0, paths/0, schema/1]). @@ -42,7 +43,7 @@ schema("/mqtt/topic_rewrite") -> #{ operationId => topic_rewrite, get => #{ - tags => [mqtt], + tags => ?API_TAG_MQTT, description => <<"List rewrite topic.">>, responses => #{ 200 => hoconsc:mk(hoconsc:array(hoconsc:ref(emqx_modules_schema, "rewrite")), @@ -51,6 +52,7 @@ schema("/mqtt/topic_rewrite") -> }, put => #{ description => <<"Update rewrite topic">>, + tags => ?API_TAG_MQTT, requestBody => hoconsc:mk(hoconsc:array(hoconsc:ref(emqx_modules_schema, "rewrite")),#{}), responses => #{ 200 => hoconsc:mk(hoconsc:array(hoconsc:ref(emqx_modules_schema, "rewrite")), diff --git a/apps/emqx_modules/src/emqx_telemetry_api.erl b/apps/emqx_modules/src/emqx_telemetry_api.erl index db4e4620c..d879b083f 100644 --- a/apps/emqx_modules/src/emqx_telemetry_api.erl +++ b/apps/emqx_modules/src/emqx_telemetry_api.erl @@ -71,13 +71,13 @@ schema("/telemetry/data") -> }. status_schema(Desc) -> - mk(ref(?MODULE, status), #{desc => Desc}). + mk(ref(?MODULE, status), #{in => body, desc => Desc}). fields(status) -> [ { enable , mk( boolean() , #{ desc => <<"Telemetry status">> - , default => false + , default => true , example => false }) } diff --git a/apps/emqx_modules/src/emqx_topic_metrics_api.erl b/apps/emqx_modules/src/emqx_topic_metrics_api.erl index 1e82f7a94..9fe583ca3 100644 --- a/apps/emqx_modules/src/emqx_topic_metrics_api.erl +++ b/apps/emqx_modules/src/emqx_topic_metrics_api.erl @@ -18,163 +18,258 @@ -behaviour(minirest_api). --import(emqx_mgmt_util, [ properties/1 - , schema/1 - , object_schema/1 - , object_schema/2 - , object_array_schema/2 - , error_schema/2 - ]). +-include_lib("typerefl/include/types.hrl"). +-include("emqx_modules.hrl"). --export([api_spec/0]). +-import( hoconsc + , [ mk/2 + , ref/1 + , ref/2 + , array/1 + , map/2]). -export([ topic_metrics/2 , operate_topic_metrics/2 ]). +-export([ cluster_accumulation_metrics/0 + , cluster_accumulation_metrics/1]). + +-export([ api_spec/0 + , paths/0 + , schema/1 + , fields/1 + ]). + -define(ERROR_TOPIC, 'ERROR_TOPIC'). - -define(EXCEED_LIMIT, 'EXCEED_LIMIT'). - -define(BAD_TOPIC, 'BAD_TOPIC'). - +-define(BAD_RPC, 'BAD_RPC'). -define(BAD_REQUEST, 'BAD_REQUEST'). api_spec() -> - {[ - topic_metrics_api(), - operation_topic_metrics_api() - ],[]}. + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). -properties() -> - properties([ - {topic, string}, - {create_time, string, <<"Date time, rfc3339">>}, - {reset_time, string, <<"Nullable. Date time, rfc3339.">>}, - {metrics, object, [{'messages.dropped.count', integer}, - {'messages.dropped.rate', number}, - {'messages.in.count', integer}, - {'messages.in.rate', number}, - {'messages.out.count', integer}, - {'messages.out.rate', number}, - {'messages.qos0.in.count', integer}, - {'messages.qos0.in.rate', number}, - {'messages.qos0.out.count', integer}, - {'messages.qos0.out.rate', number}, - {'messages.qos1.in.count', integer}, - {'messages.qos1.in.rate', number}, - {'messages.qos1.out.count', integer}, - {'messages.qos1.out.rate', number}, - {'messages.qos2.in.count', integer}, - {'messages.qos2.in.rate', number}, - {'messages.qos2.out.count', integer}, - {'messages.qos2.out.rate', number}]} - ]). +paths() -> + [ "/mqtt/topic_metrics" + , "/mqtt/topic_metrics/:topic" + ]. -topic_metrics_api() -> - MetaData = #{ - %% Get all nodes metrics and accumulate all of these - get => #{ - description => <<"List topic metrics">>, - responses => #{ - <<"200">> => object_array_schema(properties(), <<"List topic metrics">>) - } - }, - put => #{ - description => <<"Reset topic metrics by topic name, or all">>, - 'requestBody' => object_schema(properties([ - {topic, string, <<"no topic will reset all">>}, - {action, string, <<"Action, default reset">>, [reset]} - ])), - responses => #{ - <<"200">> => schema(<<"Reset topic metrics success">>), - <<"404">> => error_schema(<<"Topic not found">>, [?ERROR_TOPIC]) - } - }, - post => #{ - description => <<"Create topic metrics">>, - 'requestBody' => object_schema(properties([{topic, string}])), - responses => #{ - <<"200">> => schema(<<"Create topic metrics success">>), - <<"409">> => error_schema(<<"Topic metrics max limit">>, [?EXCEED_LIMIT]), - <<"400">> => error_schema( <<"Topic metrics already exist or bad topic">> - , [?BAD_REQUEST, ?BAD_TOPIC]) - } - } - }, - {"/mqtt/topic_metrics", MetaData, topic_metrics}. -operation_topic_metrics_api() -> - MetaData = #{ - get => #{ - description => <<"Get topic metrics">>, - parameters => [topic_param()], - responses => #{ - <<"200">> => object_schema(properties(), <<"Topic metrics">>), - <<"404">> => error_schema(<<"Topic not found">>, [?ERROR_TOPIC]) - }}, - delete => #{ - description => <<"Deregister topic metrics">>, - parameters => [topic_param()], - responses => #{ - <<"204">> => schema(<<"Deregister topic metrics">>), - <<"404">> => error_schema(<<"Topic not found">>, [?ERROR_TOPIC]) +schema("/mqtt/topic_metrics") -> + #{ 'operationId' => topic_metrics + , get => + #{ description => <<"List topic metrics">> + , tags => ?API_TAG_MQTT + , responses => + #{200 => mk(array(hoconsc:ref(topic_metrics)), #{ desc => <<"List all topic metrics">>})} } - } - }, - {"/mqtt/topic_metrics/:topic", MetaData, operate_topic_metrics}. + , put => + #{ description => <<"Reset topic metrics by topic name. Or reset all Topic Metrics">> + , tags => ?API_TAG_MQTT + , 'requestBody' => emqx_dashboard_swagger:schema_with_examples( + ref(reset), + reset_examples()) + , responses => + #{ 204 => <<"Reset topic metrics successfully">> + , 404 => emqx_dashboard_swagger:error_codes([?ERROR_TOPIC], <<"Topic not found">>) + } + } + , post => + #{ description => <<"Create topic metrics">> + , tags => ?API_TAG_MQTT + , 'requestBody' => [topic(body)] + , responses => + #{ 204 => <<"Create topic metrics success">> + , 409 => emqx_dashboard_swagger:error_codes([?EXCEED_LIMIT], <<"Topic metrics exceeded max limit 512">>) + , 400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST, ?BAD_TOPIC], <<"Topic metrics already existed or bad topic">>) + } + } + }; +schema("/mqtt/topic_metrics/:topic") -> + #{ 'operationId' => operate_topic_metrics + , get => + #{ description => <<"Get topic metrics">> + , tags => ?API_TAG_MQTT + , parameters => [topic(path)] + , responses => + #{ 200 => mk(ref(topic_metrics), #{ desc => <<"Topic metrics">> }) + , 404 => emqx_dashboard_swagger:error_codes([?ERROR_TOPIC], <<"Topic not found">>) + } + } + , delete => + #{ description => <<"Remove the topic metrics">> + , tags => ?API_TAG_MQTT + , parameters => [topic(path)] + , responses => + #{ 204 => <<"Removed topic metrics successfully">>, + 404 => emqx_dashboard_swagger:error_codes([?ERROR_TOPIC], <<"Topic not found">>) + } + } + }. -topic_param() -> - #{ - name => topic, - in => path, - required => true, - description => <<"Notice: Topic string url must encode">>, - schema => #{type => string} +fields(reset) -> + [ {topic + , mk( binary() + , #{ desc => <<"Topic Name. If this paramter is not present, all created topic metrics will be reseted">> + , example => <<"testtopic/1">> + , nullable => true})} + , {action + , mk( string() + , #{ desc => <<"Action Name. Only as a \"reset\"">> + , enum => [reset] + , nullable => false + , example => <<"reset">>})} + ]; + +fields(topic_metrics) -> + [ { topic + , mk( binary() + , #{ desc => <<"Topic Name">> + , example => <<"testtopic/1">> + , nullable => false})}, + { create_time + , mk( emqx_schema:rfc3339_system_time() + , #{ desc => <<"Topic Metrics created date time, in rfc3339">> + , nullable => false + , example => <<"2022-01-14T21:48:47+08:00">>})}, + { reset_time + , mk( emqx_schema:rfc3339_system_time() + , #{ desc => <<"Topic Metrics reset date time, in rfc3339. Nullable if never reseted">> + , nullable => true + , example => <<"2022-01-14T21:48:47+08:00">>})}, + { metrics + , mk( ref(metrics) + , #{ desc => <<"Topic Metrics fields">> + , nullable => false}) + } + ]; + +fields(metrics) -> + [ { 'messages.dropped.count' + , mk( integer(), #{ desc => <<"Message dropped count">> + , example => 0})}, + { 'messages.dropped.rate' + , mk( number(), #{ desc => <<"Message dropped rate in 5s">> + , example => 0})}, + { 'messages.in.count' + , mk( integer(), #{ desc => <<"Message received count">> + , example => 0})}, + { 'messages.in.rate' + , mk( number(), #{ desc => <<"Message received rate in 5s">> + , example => 0})}, + { 'messages.out.count' + , mk( integer(), #{ desc => <<"Message sent count">> + , example => 0})}, + { 'messages.out.rate' + , mk( number(), #{ desc => <<"Message sent rate in 5s">> + , example => 0})}, + { 'messages.qos0.in.count' + , mk( integer(), #{ desc => <<"Message with QoS 0 received count">> + , example => 0})}, + { 'messages.qos0.in.rate' + , mk( number(), #{ desc => <<"Message with QoS 0 received rate in 5s">> + , example => 0})}, + { 'messages.qos0.out.count' + , mk( integer(), #{ desc => <<"Message with QoS 0 sent count">> + , example => 0})}, + { 'messages.qos0.out.rate' + , mk( number(), #{ desc => <<"Message with QoS 0 sent rate in 5s">> + , example => 0})}, + { 'messages.qos1.in.count' + , mk( integer(), #{ desc => <<"Message with QoS 1 received count">> + , example => 0})}, + { 'messages.qos1.in.rate' + , mk( number(), #{ desc => <<"Message with QoS 1 received rate in 5s">> + , example => 0})}, + { 'messages.qos1.out.count' + , mk( integer(), #{ desc => <<"Message with QoS 1 sent count">> + , example => 0})}, + { 'messages.qos1.out.rate' + , mk( number(), #{ desc => <<"Message with QoS 1 sent rate in 5s">> + , example => 0})}, + { 'messages.qos2.in.count' + , mk( integer(), #{ desc => <<"Message with QoS 2 sent count">> + , example => 0})}, + { 'messages.qos2.in.rate' + , mk( number(), #{ desc => <<"Message with QoS 2 received rate in 5s">> + , example => 0})}, + { 'messages.qos2.out.count' + , mk( integer(), #{ desc => <<"Message with QoS 2 sent count">> + , example => 0})}, + { 'messages.qos2.out.rate' + , mk( number(), #{ desc => <<"Message with QoS 2 sent rate in 5s">> + , example => 0})} + ]. + +topic(In) -> + case In of + body -> + Desc = <<"Raw topic string">>, + Example = "testtopic/1"; + path -> + Desc = <<"Notice: Topic string in url path must be encoded">>, + Example = "testtopic%2F1" + end, + { topic + , mk( binary(), + #{ desc => Desc + , required => true + , in => In + , example => Example + }) }. +reset_examples() -> + #{ reset_specific_one_topic_metrics => + #{ summary => <<"reset_specific_one_topic_metrics">> + , value => + #{ topic => "testtopic/1" + , action => "reset" + } + } + , reset_all_topic_metrics => + #{ summary => <<"reset_all_topic_metrics">> + , value => + #{ action => "reset" + } + } + }. + %%-------------------------------------------------------------------- %% HTTP Callbacks %%-------------------------------------------------------------------- topic_metrics(get, _) -> - case cluster_accumulation_metrics() of - {error, Reason} -> - {500, Reason}; - {ok, Metrics} -> - {200, Metrics} - end; + get_cluster_response([]); topic_metrics(put, #{body := #{<<"topic">> := Topic, <<"action">> := <<"reset">>}}) -> case reset(Topic) of - ok -> {200}; - {error, Reason} -> reason2httpresp(Reason) + ok -> + get_cluster_response([Topic]); + {error, Reason} -> + reason2httpresp(Reason) end; topic_metrics(put, #{body := #{<<"action">> := <<"reset">>}}) -> reset(), - {200}; + get_cluster_response([]); topic_metrics(post, #{body := #{<<"topic">> := <<>>}}) -> {400, 'BAD_REQUEST', <<"Topic can not be empty">>}; topic_metrics(post, #{body := #{<<"topic">> := Topic}}) -> case emqx_modules_conf:add_topic_metrics(Topic) of {ok, Topic} -> - {200}; + get_cluster_response([Topic]); {error, Reason} -> reason2httpresp(Reason) end. operate_topic_metrics(get, #{bindings := #{topic := Topic0}}) -> - case cluster_accumulation_metrics(emqx_http_lib:uri_decode(Topic0)) of - {ok, Metrics} -> - {200, Metrics}; - {error, Reason} -> - reason2httpresp(Reason) - end; + get_cluster_response([emqx_http_lib:uri_decode(Topic0)]); operate_topic_metrics(delete, #{bindings := #{topic := Topic0}}) -> case emqx_modules_conf:remove_topic_metrics(emqx_http_lib:uri_decode(Topic0)) of - ok -> {200}; + ok -> {204}; {error, Reason} -> reason2httpresp(Reason) end. @@ -197,7 +292,8 @@ cluster_accumulation_metrics(Topic) -> {SuccResList, []} -> case lists:filter(fun({error, _}) -> false; (_) -> true end, SuccResList) of - [] -> {error, topic_not_found}; + [] -> + {error, topic_not_found}; TopicMetrics -> NTopicMetrics = [ [T] || T <- TopicMetrics], [AccMetrics] = accumulate_nodes_metrics(NTopicMetrics), @@ -277,8 +373,8 @@ reason2httpresp(bad_topic) -> reason2httpresp({quota_exceeded, bad_topic}) -> Msg = list_to_binary( io_lib:format( - "Max topic metrics count is ~p, and topic cannot have wildcard", - [emqx_topic_metrics:max_limit()])), + "Max topic metrics count is ~p, and topic cannot have wildcard", + [emqx_topic_metrics:max_limit()])), {400, #{code => ?BAD_REQUEST, message => Msg}}; reason2httpresp(already_existed) -> Msg = <<"Topic already registered">>, @@ -289,3 +385,13 @@ reason2httpresp(topic_not_found) -> reason2httpresp(not_found) -> Msg = <<"Topic not found">>, {404, #{code => ?ERROR_TOPIC, message => Msg}}. + +get_cluster_response(Args) -> + case erlang:apply(?MODULE, cluster_accumulation_metrics, Args) of + {error, {badrpc, RPCReason}} -> + {500, RPCReason}; + {error, Reason} when is_atom(Reason) -> + reason2httpresp(Reason); + {ok, Metrics} -> + {200, Metrics} + end. diff --git a/apps/emqx_prometheus/rebar.config b/apps/emqx_prometheus/rebar.config index bd611d4eb..eccd78543 100644 --- a/apps/emqx_prometheus/rebar.config +++ b/apps/emqx_prometheus/rebar.config @@ -5,7 +5,7 @@ %% FIXME: tag this as v3.1.3 {prometheus, {git, "https://github.com/emqx/prometheus.erl", {ref, "9994c76adca40d91a2545102230ccce2423fd8a7"}}}, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.22.2"}}}, - {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.9"}}} + {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.10"}}} ]}. {edoc_opts, [{preprocess, true}]}. diff --git a/apps/emqx_prometheus/src/emqx_prometheus_api.erl b/apps/emqx_prometheus/src/emqx_prometheus_api.erl index c129b1064..4b19c9665 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_api.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_api.erl @@ -20,48 +20,56 @@ -include("emqx_prometheus.hrl"). --import(emqx_mgmt_util, [ schema/1]). +-import(hoconsc, [ref/2]). --export([api_spec/0]). +-export([ api_spec/0 + , paths/0 + , schema/1 + ]). -export([ prometheus/2 , stats/2 ]). +-define(API_TAG_PROMETHEUS, [<<"premetheus">>]). +-define(SCHEMA_MODULE, emqx_prometheus_schema). + + api_spec() -> - {[prometheus_api(), prometheus_data_api()], []}. + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). -conf_schema() -> - emqx_mgmt_api_configs:gen_schema(emqx:get_raw_config([prometheus])). +paths() -> + [ "/prometheus" + , "/prometheus/stats" + ]. -prometheus_api() -> - Metadata = #{ - get => #{ - description => <<"Get Prometheus info">>, - responses => #{<<"200">> => schema(conf_schema())} - }, - put => #{ - description => <<"Update Prometheus">>, - 'requestBody' => schema(conf_schema()), - responses => #{<<"200">> => schema(conf_schema())} - } - }, - {"/prometheus", Metadata, prometheus}. - -prometheus_data_api() -> - Metadata = #{ - get => #{ - description => <<"Get Prometheus Data">>, - responses => #{<<"200">> => - #{content => - #{ - 'application/json' => #{schema => #{type => object}}, - 'text/plain' => #{schema => #{type => string}} - }} +schema("/prometheus") -> + #{ 'operationId' => prometheus + , get => + #{ description => <<"Get Prometheus config info">> + , tags => ?API_TAG_PROMETHEUS + , responses => + #{200 => prometheus_config_schema()} } - } - }, - {"/prometheus/stats", Metadata, stats}. + , put => + #{ description => <<"Update Prometheus config">> + , 'requestBody' => prometheus_config_schema() + , responses => + #{200 => prometheus_config_schema()} + } + }; +schema("/prometheus/stats") -> + #{ 'operationId' => stats + , get => + #{ description => <<"Get Prometheus Data">> + , responses => + #{200 => prometheus_data_schema()} + } + }. + +%%-------------------------------------------------------------------- +%% API Handler funcs +%%-------------------------------------------------------------------- prometheus(get, _Params) -> {200, emqx:get_raw_config([<<"prometheus">>], #{})}; @@ -83,6 +91,35 @@ stats(get, #{headers := Headers}) -> end, Data = emqx_prometheus:collect(Type), case Type of - <<"json">> -> {200, Data}; - <<"prometheus">> -> {200, #{<<"content-type">> => <<"text/plain">>}, Data} + <<"json">> -> + {200, Data}; + <<"prometheus">> -> + {200, #{<<"content-type">> => <<"text/plain">>}, Data} end. + +%%-------------------------------------------------------------------- +%% Internal funcs +%%-------------------------------------------------------------------- + +prometheus_config_schema() -> + emqx_dashboard_swagger:schema_with_example( + ref(?SCHEMA_MODULE, "prometheus"), + prometheus_config_example()). + +prometheus_config_example() -> + #{ enable => true + , interval => "15s" + , push_gateway_server => <<"http://127.0.0.1:9091">> + }. + +prometheus_data_schema() -> + #{ description => <<"Get Prometheus Data">> + , content => + #{ 'application/json' => + #{ schema => #{type => object} + , description => <<"Prometheus Data in json">>} + , 'text/plain' => + #{ schema => #{type => string} + , description => <<"Prometheus Data in text/plain">>} + } + }. diff --git a/apps/emqx_prometheus/src/emqx_prometheus_schema.erl b/apps/emqx_prometheus/src/emqx_prometheus_schema.erl index ba05237cc..9d7455bdf 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_schema.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_schema.erl @@ -28,9 +28,9 @@ namespace() -> "prometheus". roots() -> ["prometheus"]. fields("prometheus") -> - [ {push_gateway_server, sc(string(), #{})} - , {interval, sc(emqx_schema:duration_ms(), #{default => "15s"})} - , {enable, sc(boolean(), #{default => false})} + [ {push_gateway_server, sc(string(), #{default => "http://127.0.0.1:9091", nullabel => false})} + , {interval, sc(emqx_schema:duration_ms(), #{default => "15s", nullabel => false})} + , {enable, sc(boolean(), #{default => false, nullabel => false})} ]. sc(Type, Meta) -> hoconsc:mk(Type, Meta). diff --git a/apps/emqx_resource/rebar.config b/apps/emqx_resource/rebar.config index 194aa0cd4..d5d608a71 100644 --- a/apps/emqx_resource/rebar.config +++ b/apps/emqx_resource/rebar.config @@ -15,4 +15,5 @@ ]}. {deps, [ {jsx, {git, "https://github.com/talentdeficit/jsx", {tag, "v3.1.0"}}} + , {emqx, {path, "../emqx"}} ]}. diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 750b7cde5..1f14e527d 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -147,7 +147,7 @@ create(InstId, ResourceType, Config) -> -spec create(instance_id(), resource_type(), resource_config(), create_opts()) -> {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. create(InstId, ResourceType, Config, Opts) -> - cluster_call(create_local, [InstId, ResourceType, Config, Opts]). + wrap_rpc(emqx_resource_proto_v1:create(InstId, ResourceType, Config, Opts)). -spec create_local(instance_id(), resource_type(), resource_config()) -> {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. @@ -162,7 +162,7 @@ create_local(InstId, ResourceType, Config, Opts) -> -spec create_dry_run(resource_type(), resource_config()) -> ok | {error, Reason :: term()}. create_dry_run(ResourceType, Config) -> - cluster_call(create_dry_run_local, [ResourceType, Config]). + wrap_rpc(emqx_resource_proto_v1:create_dry_run(ResourceType, Config)). -spec create_dry_run_local(resource_type(), resource_config()) -> ok | {error, Reason :: term()}. @@ -172,7 +172,7 @@ create_dry_run_local(ResourceType, Config) -> -spec recreate(instance_id(), resource_type(), resource_config(), create_opts()) -> {ok, resource_data()} | {error, Reason :: term()}. recreate(InstId, ResourceType, Config, Opts) -> - cluster_call(recreate_local, [InstId, ResourceType, Config, Opts]). + wrap_rpc(emqx_resource_proto_v1:recreate(InstId, ResourceType, Config, Opts)). -spec recreate_local(instance_id(), resource_type(), resource_config(), create_opts()) -> {ok, resource_data()} | {error, Reason :: term()}. @@ -181,7 +181,7 @@ recreate_local(InstId, ResourceType, Config, Opts) -> -spec remove(instance_id()) -> ok | {error, Reason :: term()}. remove(InstId) -> - cluster_call(remove_local, [InstId]). + wrap_rpc(emqx_resource_proto_v1:remove(InstId)). -spec remove_local(instance_id()) -> ok | {error, Reason :: term()}. remove_local(InstId) -> @@ -366,8 +366,8 @@ call_instance(InstId, Query) -> safe_apply(Func, Args) -> ?SAFE_CALL(erlang:apply(Func, Args)). -cluster_call(Func, Args) -> - case emqx_cluster_rpc:multicall(?MODULE, Func, Args) of +wrap_rpc(Ret) -> + case Ret of {ok, _TxnId, Result} -> Result; Failed -> Failed end. diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index a5d7da1a1..6308a5d60 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -192,7 +192,11 @@ do_create_dry_run(ResourceType, Config) -> case emqx_resource:call_start(InstId, ResourceType, Config) of {ok, ResourceState} -> case emqx_resource:call_health_check(InstId, ResourceType, ResourceState) of - {ok, _} -> ok; + {ok, _} -> + case emqx_resource:call_stop(InstId, ResourceType, ResourceState) of + {error, _} = Error -> Error; + _ -> ok + end; {error, Reason, _} -> {error, Reason} end; {error, Reason} -> diff --git a/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl b/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl new file mode 100644 index 000000000..f32f7840a --- /dev/null +++ b/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl @@ -0,0 +1,62 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_resource_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ introduced_in/0 + + , create/4 + , create_dry_run/2 + , recreate/4 + , remove/1 + ]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.0.0". + +-spec create( emqx_resource:instance_id() + , emqx_resource:resource_type() + , emqx_resource:resource_config() + , emqx_resource:create_opts() + ) -> + emqx_cluster_rpc:multicall_return(emqx_resource:resource_data()). +create(InstId, ResourceType, Config, Opts) -> + emqx_cluster_rpc:multicall(emqx_resource, create_local, [InstId, ResourceType, Config, Opts]). + +-spec create_dry_run( emqx_resource:resource_type() + , emqx_resource:resource_config() + ) -> + emqx_cluster_rpc:multicall_return(emqx_resource:resource_data()). +create_dry_run(ResourceType, Config) -> + emqx_cluster_rpc:multicall(emqx_resource, create_dry_run_local, [ResourceType, Config]). + +-spec recreate( emqx_resource:instance_id() + , emqx_resource:resource_type() + , emqx_resource:resource_config() + , emqx_resource:create_opts() + ) -> + emqx_cluster_rpc:multicall_return(emqx_resource:resource_data()). +recreate(InstId, ResourceType, Config, Opts) -> + emqx_cluster_rpc:multicall(emqx_resource, recreate_local, [InstId, ResourceType, Config, Opts]). + +-spec remove(emqx_resource:instance_id()) -> + emqx_cluster_rpc:multicall_return(ok). +remove(InstId) -> + emqx_cluster_rpc:multicall(emqx_resource, remove_local, [InstId]). diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 80c32b327..6559f769c 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -18,7 +18,6 @@ -compile(nowarn_export_all). -compile(export_all). --include("emqx_authn.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). @@ -61,12 +60,12 @@ t_create_remove(_) -> {error, _} = emqx_resource:check_and_create_local( ?ID, ?TEST_RESOURCE, - #{unknown => <<"test_resource">>}), + #{unknown => test_resource}), {ok, _} = emqx_resource:create_local( ?ID, ?TEST_RESOURCE, - #{name => <<"test_resource">>}), + #{name => test_resource}), #{pid := Pid} = emqx_resource:query(?ID, get_state), @@ -81,7 +80,7 @@ t_query(_) -> {ok, _} = emqx_resource:create_local( ?ID, ?TEST_RESOURCE, - #{name => <<"test_resource">>}), + #{name => test_resource}), Pid = self(), Success = fun() -> Pid ! success end, @@ -112,13 +111,19 @@ t_healthy(_) -> ok = emqx_resource:health_check(?ID), - [#{status := started}] = emqx_resource:list_instances_verbose(), + ?assertMatch( + [#{status := started}], + emqx_resource:list_instances_verbose()), erlang:exit(Pid, shutdown), - {error, dead} = emqx_resource:health_check(?ID), + ?assertEqual( + {error, dead}, + emqx_resource:health_check(?ID)), - [#{status := stopped}] = emqx_resource:list_instances_verbose(), + ?assertMatch( + [#{status := stopped}], + emqx_resource:list_instances_verbose()), ok = emqx_resource:remove_local(?ID). @@ -126,12 +131,12 @@ t_stop_start(_) -> {error, _} = emqx_resource:check_and_create_local( ?ID, ?TEST_RESOURCE, - #{unknown => <<"test_resource">>}), + #{unknown => test_resource}), {ok, _} = emqx_resource:create_local( ?ID, ?TEST_RESOURCE, - #{name => <<"test_resource">>}), + #{name => test_resource}), #{pid := Pid0} = emqx_resource:query(?ID, get_state), @@ -161,10 +166,23 @@ t_list_filter(_) -> #{name => grouped_a}), [Id1] = emqx_resource:list_group_instances(<<"default">>), - {ok, #{config := #{name := a}}} = emqx_resource:get_instance(Id1), + ?assertMatch( + {ok, #{config := #{name := a}}}, + emqx_resource:get_instance(Id1)), [Id2] = emqx_resource:list_group_instances(<<"group">>), - {ok, #{config := #{name := grouped_a}}} = emqx_resource:get_instance(Id2). + ?assertMatch( + {ok, #{config := #{name := grouped_a}}}, + emqx_resource:get_instance(Id2)). + +t_create_dry_run_local(_) -> + ?assertEqual( + ok, + emqx_resource:create_dry_run_local( + ?TEST_RESOURCE, + #{name => test_resource, register => true})), + + ?assertEqual(undefined, whereis(test_resource)). %%------------------------------------------------------------------------------ %% Helpers diff --git a/apps/emqx_resource/test/emqx_test_resource.erl b/apps/emqx_resource/test/emqx_test_resource.erl index 85090a1c1..1d91d9c22 100644 --- a/apps/emqx_resource/test/emqx_test_resource.erl +++ b/apps/emqx_resource/test/emqx_test_resource.erl @@ -31,16 +31,23 @@ %% callbacks for emqx_resource config schema -export([roots/0]). -roots() -> [{"name", fun name/1}]. +roots() -> [{name, fun name/1}, + {register, fun register/1}]. -name(type) -> binary(); +name(type) -> atom(); name(nullable) -> false; name(_) -> undefined. -on_start(InstId, #{name := Name}) -> +register(type) -> boolean(); +register(nullable) -> false; +register(default) -> false; +register(_) -> undefined. + +on_start(InstId, #{name := Name} = Opts) -> + Register = maps:get(register, Opts, false), {ok, #{name => Name, id => InstId, - pid => spawn_dummy_process()}}. + pid => spawn_dummy_process(Name, Register)}}. on_stop(_InstId, #{pid := Pid}) -> erlang:exit(Pid, shutdown), @@ -59,9 +66,13 @@ on_health_check(_InstId, State = #{pid := Pid}) -> on_config_merge(OldConfig, NewConfig, _Params) -> maps:merge(OldConfig, NewConfig). -spawn_dummy_process() -> +spawn_dummy_process(Name, Register) -> spawn( fun() -> + true = case Register of + true -> register(Name, self()); + _ -> true + end, Ref = make_ref(), receive Ref -> ok diff --git a/apps/emqx_statsd/src/emqx_statsd_api.erl b/apps/emqx_statsd/src/emqx_statsd_api.erl index 6fa657dcb..3b0142bd0 100644 --- a/apps/emqx_statsd/src/emqx_statsd_api.erl +++ b/apps/emqx_statsd/src/emqx_statsd_api.erl @@ -20,36 +20,61 @@ -include("emqx_statsd.hrl"). --import(emqx_mgmt_util, [ schema/1 - , bad_request/0]). +-include_lib("typerefl/include/types.hrl"). --export([api_spec/0]). +-import(hoconsc, [mk/2, ref/2]). --export([ statsd/2 +-export([statsd/2]). + +-export([ api_spec/0 + , paths/0 + , schema/1 ]). +-define(API_TAG_STATSD, [<<"statsd">>]). +-define(SCHEMA_MODULE, emqx_statsd_schema). + +-define(INTERNAL_ERROR, 'INTERNAL_ERROR'). + + api_spec() -> - {statsd_api(), []}. + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). -conf_schema() -> - emqx_mgmt_api_configs:gen_schema(emqx:get_raw_config([statsd])). +paths() -> + ["/statsd"]. -statsd_api() -> - Metadata = #{ - get => #{ - description => <<"Get statsd info">>, - responses => #{<<"200">> => schema(conf_schema())} - }, - put => #{ - description => <<"Update Statsd">>, - 'requestBody' => schema(conf_schema()), - responses => #{ - <<"200">> => schema(conf_schema()), - <<"400">> => bad_request() +schema("/statsd") -> + #{ 'operationId' => statsd + , get => + #{ description => <<"Get statsd config">> + , tags => ?API_TAG_STATSD + , responses => + #{200 => statsd_config_schema()} } - } - }, - [{"/statsd", Metadata, statsd}]. + , put => + #{ description => <<"Set statsd config">> + , tags => ?API_TAG_STATSD + , 'requestBody' => statsd_config_schema() + , responses => + #{200 => statsd_config_schema()} + } + }. + +%%-------------------------------------------------------------------- +%% Helper funcs +%%-------------------------------------------------------------------- + +statsd_config_schema() -> + emqx_dashboard_swagger:schema_with_example( + ref(?SCHEMA_MODULE, "statsd"), + statsd_example()). + +statsd_example() -> + #{ enable => true + , flush_time_interval => "32s" + , sample_time_interval => "32s" + , server => "127.0.0.1:8125" + }. statsd(get, _Params) -> {200, emqx:get_raw_config([<<"statsd">>], #{})}; @@ -60,5 +85,5 @@ statsd(put, #{body := Body}) -> {200, NewConfig}; {error, Reason} -> Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])), - {500, 'INTERNAL_ERROR', Message} + {500, ?INTERNAL_ERROR, Message} end. diff --git a/apps/emqx_statsd/src/emqx_statsd_schema.erl b/apps/emqx_statsd/src/emqx_statsd_schema.erl index 72b245f4a..6190f9e66 100644 --- a/apps/emqx_statsd/src/emqx_statsd_schema.erl +++ b/apps/emqx_statsd/src/emqx_statsd_schema.erl @@ -1,3 +1,19 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + -module(emqx_statsd_schema). -include_lib("typerefl/include/types.hrl"). @@ -17,7 +33,7 @@ namespace() -> "statsd". roots() -> ["statsd"]. fields("statsd") -> - [ {enable, hoconsc:mk(boolean(), #{default => false})} + [ {enable, hoconsc:mk(boolean(), #{default => false, nullable => false})} , {server, fun server/1} , {sample_time_interval, fun duration_ms/1} , {flush_time_interval, fun duration_ms/1} diff --git a/mix.exs b/mix.exs index d03b270f1..5dc97264f 100644 --- a/mix.exs +++ b/mix.exs @@ -55,7 +55,7 @@ defmodule EMQXUmbrella.MixProject do {:mria, github: "emqx/mria", tag: "0.1.5", override: true}, {:ekka, github: "emqx/ekka", tag: "0.11.2", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.0", override: true}, - {:minirest, github: "emqx/minirest", tag: "1.2.9", override: true}, + {:minirest, github: "emqx/minirest", tag: "1.2.10", override: true}, {:ecpool, github: "emqx/ecpool", tag: "0.5.2"}, {:replayq, "0.3.3", override: true}, {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true}, diff --git a/rebar.config b/rebar.config index ce88dce5f..3ed5cf391 100644 --- a/rebar.config +++ b/rebar.config @@ -56,7 +56,7 @@ , {mria, {git, "https://github.com/emqx/mria", {tag, "0.1.5"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.2"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.0"}}} - , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.9"}}} + , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.10"}}} , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}} , {replayq, "0.3.3"} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}