diff --git a/apps/emqx/src/bpapi/README.md b/apps/emqx/src/bpapi/README.md index 00b5c0c41..a52ca03a9 100644 --- a/apps/emqx/src/bpapi/README.md +++ b/apps/emqx/src/bpapi/README.md @@ -96,9 +96,9 @@ The following limitations apply to these modules: 1. Once the minor EMQX release stated in `introduced_in()` callback of a module reaches GA, the module is frozen. No changes are allowed there, except for adding `deprecated_since()` callback. -2. After the _next_ minor release after the one deprecating the - module reaches GA, the module can be removed. -3. Old versions of the protocol can be dropped in the next major +2. If the backplane API was deprecated in a release `maj.min.0`, then + it can be removed in release `maj.min+1.0`. +3. Old versions of the protocols can be dropped in the next major release. This way we ensure each minor EMQX release is backward-compatible with diff --git a/apps/emqx/src/emqx_rpc.erl b/apps/emqx/src/emqx_rpc.erl index 91e796725..4a5b195c6 100644 --- a/apps/emqx/src/emqx_rpc.erl +++ b/apps/emqx/src/emqx_rpc.erl @@ -30,9 +30,10 @@ -export_type([ badrpc/0 , call_result/0 , cast_result/0 + , multicall_result/1 , multicall_result/0 , erpc/1 - , erpc_multicast/1 + , erpc_multicall/1 ]). -compile({inline, @@ -48,7 +49,9 @@ -type cast_result() :: true. --type multicall_result() :: {_Results :: [term()], _BadNodes :: [node()]}. +-type multicall_result(Result) :: {[Result], _BadNodes :: [node()]}. + +-type multicall_result() :: multicall_result(term()). -type erpc(Ret) :: {ok, Ret} | {throw, _Err} @@ -56,7 +59,7 @@ | {error, {exception, _Reason, _Stack :: list()}} | {error, {erpc, _Reason}}. --type erpc_multicast(Ret) :: [erpc(Ret)]. +-type erpc_multicall(Ret) :: [erpc(Ret)]. -spec call(node(), module(), atom(), list()) -> call_result(). call(Node, Mod, Fun, Args) -> diff --git a/apps/emqx/src/emqx_trace/emqx_trace.erl b/apps/emqx/src/emqx_trace/emqx_trace.erl index 9a1c8701a..e55367530 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace.erl @@ -204,7 +204,7 @@ get_trace_filename(Name) -> end end, transaction(Tran). --spec trace_file(File :: list()) -> +-spec trace_file(File :: file:filename_all()) -> {ok, Node :: list(), Binary :: binary()} | {error, Node :: list(), Reason :: term()}. trace_file(File) -> diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index f1335b502..d78cbf5f2 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -29,8 +29,7 @@ -export(['/bridges'/2, '/bridges/:id'/2, '/bridges/:id/operation/:operation'/2]). --export([ list_local_bridges/1 - , lookup_from_local_node/2 +-export([ lookup_from_local_node/2 ]). -define(TYPES, [mqtt, http]). @@ -288,12 +287,8 @@ schema("/bridges/:id/operation/:operation") -> end end; '/bridges'(get, _Params) -> - {200, zip_bridges([list_local_bridges(Node) || Node <- mria_mnesia:running_nodes()])}. - -list_local_bridges(Node) when Node =:= node() -> - [format_resp(Data) || Data <- emqx_bridge:list()]; -list_local_bridges(Node) -> - rpc_call(Node, list_local_bridges, [Node]). + {200, zip_bridges([[format_resp(Data) || Data <- emqx_bridge_proto_v1:list_bridges(Node)] + || Node <- mria_mnesia:running_nodes()])}. '/bridges/:id'(get, #{bindings := #{id := Id}}) -> ?TRY_PARSE_ID(Id, lookup_from_all_nodes(BridgeType, BridgeName, 200)); @@ -321,7 +316,8 @@ list_local_bridges(Node) -> end). lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) -> - case rpc_multicall(lookup_from_local_node, [BridgeType, BridgeName]) of + Nodes = mria_mnesia:running_nodes(), + case is_ok(emqx_bridge_proto_v1:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of {ok, [{ok, _} | _] = Results} -> {SuccCode, format_bridge_info([R || {ok, R} <- Results])}; {ok, [{error, not_found} | _]} -> @@ -433,9 +429,8 @@ format_metrics(#{ } }) -> ?METRICS(Match, Succ, Failed + Ex, Rate, Rate5m, RateMax). -rpc_multicall(Func, Args) -> - Nodes = mria_mnesia:running_nodes(), - ResL = erpc:multicall(Nodes, ?MODULE, Func, Args, 15000), + +is_ok(ResL) -> case lists:filter(fun({ok, _}) -> false; (_) -> true end, ResL) of [] -> {ok, [Res || {ok, Res} <- ResL]}; ErrL -> {error, ErrL} @@ -446,17 +441,6 @@ filter_out_request_body(Conf) -> <<"node_metrics">>, <<"metrics">>, <<"node">>], maps:without(ExtraConfs, Conf). -rpc_call(Node, Fun, Args) -> - rpc_call(Node, ?MODULE, Fun, Args). - -rpc_call(Node, Mod, Fun, Args) when Node =:= node() -> - apply(Mod, Fun, Args); -rpc_call(Node, Mod, Fun, Args) -> - case rpc:call(Node, Mod, Fun, Args) of - {badrpc, Reason} -> {error, Reason}; - Res -> Res - end. - error_msg(Code, Msg) when is_binary(Msg) -> #{code => Code, message => Msg}; error_msg(Code, Msg) -> diff --git a/apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl new file mode 100644 index 000000000..71ea1d2dc --- /dev/null +++ b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl @@ -0,0 +1,43 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ introduced_in/0 + + , list_bridges/1 + , lookup_from_all_nodes/3 + ]). + +-include_lib("emqx/include/bpapi.hrl"). + +-define(TIMEOUT, 15000). + +introduced_in() -> + "5.0.0". + +-spec list_bridges(node()) -> list() | emqx_rpc:badrpc(). +list_bridges(Node) -> + rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT). + +-type key() :: atom() | binary() | [byte()]. + +-spec lookup_from_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +lookup_from_all_nodes(Nodes, BridgeType, BridgeName) -> + erpc:multicall(Nodes, emqx_bridge_api, lookup_from_local_node, [BridgeType, BridgeName], ?TIMEOUT). diff --git a/apps/emqx_conf/src/proto/emqx_conf_proto_v1.erl b/apps/emqx_conf/src/proto/emqx_conf_proto_v1.erl index e935ed5c6..c2b898d29 100644 --- a/apps/emqx_conf/src/proto/emqx_conf_proto_v1.erl +++ b/apps/emqx_conf/src/proto/emqx_conf_proto_v1.erl @@ -69,7 +69,8 @@ update(KeyPath, UpdateReq, Opts) -> update(Node, KeyPath, UpdateReq, Opts) -> rpc:call(Node, emqx, update_config, [KeyPath, UpdateReq, Opts], 5000). --spec remove_config(update_config_key_path(), emqx_config:update_opts()) -> _. +-spec remove_config(update_config_key_path(), emqx_config:update_opts()) -> + emqx_cluster_rpc:multicall_result(). remove_config(KeyPath, Opts) -> emqx_cluster_rpc:multicall(emqx, remove_config, [KeyPath, Opts]). diff --git a/apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl b/apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl index 30e910f2b..ee239e5d5 100644 --- a/apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl @@ -1928,6 +1928,7 @@ case100_clients_api(Config) -> %% kickout {204, _} = request(delete, "/gateway/lwm2m/clients/" ++ binary_to_list(ClientId)), + timer:sleep(100), {200, #{data := []}} = request(get, "/gateway/lwm2m/clients"). case100_subscription_api(Config) -> diff --git a/apps/emqx_management/src/emqx_mgmt_api_trace.erl b/apps/emqx_management/src/emqx_mgmt_api_trace.erl index 324f0f227..17fcafe24 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_trace.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_trace.erl @@ -38,7 +38,7 @@ -export([validate_name/1]). %% for rpc --export([read_trace_file/3 +-export([ read_trace_file/3 , get_trace_size/0 ]). @@ -241,7 +241,7 @@ trace(get, _Params) -> List = lists:sort(fun(#{start_at := A}, #{start_at := B}) -> A > B end, emqx_trace:format(List0)), Nodes = mria_mnesia:running_nodes(), - TraceSize = cluster_call(?MODULE, get_trace_size, [], 30000), + TraceSize = wrap_rpc(emqx_mgmt_trace_proto_v1:get_trace_size(Nodes)), AllFileSize = lists:foldl(fun(F, Acc) -> maps:merge(Acc, F) end, #{}, TraceSize), Now = erlang:system_time(second), Traces = @@ -333,13 +333,12 @@ group_trace_file(ZipDir, TraceLog, TraceFiles) -> end, [], TraceFiles). collect_trace_file(TraceLog) -> - cluster_call(emqx_trace, trace_file, [TraceLog], 60000). - -cluster_call(Mod, Fun, Args, Timeout) -> Nodes = mria_mnesia:running_nodes(), - {GoodRes, BadNodes} = rpc:multicall(Nodes, Mod, Fun, Args, Timeout), + wrap_rpc(emqx_mgmt_trace_proto_v1:trace_file(Nodes, TraceLog)). + +wrap_rpc({GoodRes, BadNodes}) -> BadNodes =/= [] andalso - ?SLOG(error, #{msg => "rpc_call_failed", bad_nodes => BadNodes, mfa => {Mod, Fun, Args}}), + ?SLOG(error, #{msg => "rpc_call_failed", bad_nodes => BadNodes}), GoodRes. stream_log_file(get, #{bindings := #{name := Name}, query_string := Query}) -> @@ -348,7 +347,7 @@ stream_log_file(get, #{bindings := #{name := Name}, query_string := Query}) -> Bytes = maps:get(<<"bytes">>, Query, 1000), case to_node(Node0) of {ok, Node} -> - case rpc:call(Node, ?MODULE, read_trace_file, [Name, Position, Bytes]) of + case emqx_mgmt_trace_proto_v1:read_trace_file(Node, Name, Position, Bytes) of {ok, Bin} -> Meta = #{<<"position">> => Position + byte_size(Bin), <<"bytes">> => Bytes}, {200, #{meta => Meta, items => Bin}}; @@ -368,6 +367,7 @@ stream_log_file(get, #{bindings := #{name := Name}, query_string := Query}) -> {error, not_found} -> {400, #{code => 'NODE_ERROR', message => <<"Node not found">>}} end. +-spec get_trace_size() -> #{{node(), file:name_all()} => non_neg_integer()}. get_trace_size() -> TraceDir = emqx_trace:trace_dir(), Node = node(), @@ -381,6 +381,12 @@ get_trace_size() -> end. %% this is an rpc call for stream_log_file/2 +-spec read_trace_file( binary() + , non_neg_integer() + , non_neg_integer() + ) -> {ok, binary()} + | {error, _} + | {eof, non_neg_integer()}. read_trace_file(Name, Position, Limit) -> case emqx_trace:get_trace_filename(Name) of {error, _} = Error -> Error; diff --git a/apps/emqx_management/src/proto/emqx_mgmt_trace_proto_v1.erl b/apps/emqx_management/src/proto/emqx_mgmt_trace_proto_v1.erl new file mode 100644 index 000000000..6579dcd02 --- /dev/null +++ b/apps/emqx_management/src/proto/emqx_mgmt_trace_proto_v1.erl @@ -0,0 +1,51 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mgmt_trace_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ introduced_in/0 + + , trace_file/2 + , get_trace_size/1 + , read_trace_file/4 + ]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.0.0". + +-spec get_trace_size([node()]) -> + emqx_rpc:multicall_result(#{{node(), file:name_all()} => non_neg_integer()}). +get_trace_size(Nodes) -> + rpc:multicall(Nodes, emqx_mgmt_api_trace, get_trace_size, [], 30000). + +-spec trace_file([node()], file:name_all()) -> + emqx_rpc:multicall_result( + {ok, Node :: list(), Binary :: binary()} | + {error, Node :: list(), Reason :: term()}). +trace_file(Nodes, File) -> + rpc:multicall(Nodes, emqx_trace, trace_file, [File], 60000). + +-spec read_trace_file(node(), binary(), non_neg_integer(), non_neg_integer()) -> + {ok, binary()} + | {error, _} + | {eof, non_neg_integer()} + | {badrpc, _}. +read_trace_file(Node, Name, Position, Limit) -> + rpc:call(Node, emqx_mgmt_api_trace, read_trace_file, [Name, Position, Limit]). diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl index 811a5131c..eef945a81 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl @@ -55,7 +55,7 @@ -define(SAMPLING, 1). -endif. --export_type([metrics/0]). +-export_type([metrics/0, handler_name/0, metric_id/0]). -type rate() :: #{ current => float(), diff --git a/apps/emqx_plugin_libs/src/proto/emqx_plugin_libs_proto_v1.erl b/apps/emqx_plugin_libs/src/proto/emqx_plugin_libs_proto_v1.erl new file mode 100644 index 000000000..9a7e8e6a5 --- /dev/null +++ b/apps/emqx_plugin_libs/src/proto/emqx_plugin_libs_proto_v1.erl @@ -0,0 +1,36 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_plugin_libs_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ introduced_in/0 + + , get_metrics/3 + ]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.0.0". + +-spec get_metrics( node() + , emqx_plugin_libs_metrics:handler_name() + , emqx_plugin_libs_metrics:metric_id() + ) -> emqx_plugin_libs_metrics:metrics() | {badrpc, _}. +get_metrics(Node, HandlerName, MetricId) -> + rpc:call(Node, emqx_plugin_libs_metrics, get_metrics, [HandlerName, MetricId]). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 6f1da4e88..58975c310 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -322,7 +322,7 @@ get_rule_metrics(Id) -> , node => Node } end, - [Format(Node, rpc:call(Node, emqx_plugin_libs_metrics, get_metrics, [rule_metrics, Id])) + [Format(Node, emqx_plugin_libs_proto_v1:get_metrics(Node, rule_metrics, Id)) || Node <- mria_mnesia:running_nodes()]. aggregate_metrics(AllMetrics) -> @@ -350,4 +350,3 @@ filter_out_request_body(Conf) -> ExtraConfs = [<<"id">>, <<"status">>, <<"node_status">>, <<"node_metrics">>, <<"metrics">>, <<"node">>], maps:without(ExtraConfs, Conf). -