Merge pull request #6705 from k32/bpapi-conf

refactor(emqx_conf): Decorate remote procedure calls
This commit is contained in:
k32 2022-01-12 22:32:35 +01:00 committed by GitHub
commit 64d594d1df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 267 additions and 62 deletions

View File

@ -139,10 +139,6 @@ xref: $(REBAR)
dialyzer: $(REBAR)
@$(REBAR) as check dialyzer
.PHONY: ldialyzer
ldialyzer: $(REBAR)
@$(REBAR) as lcheck dialyzer
COMMON_DEPS := $(REBAR) get-dashboard conf-segs
## rel target is to create release package without relup

View File

@ -0,0 +1,109 @@
Backplane API
===
# Motivation
This directory contains modules that help defining and maintaining
EMQX broker-to-broker (backplane) protocols.
Historically, all inter-broker communication was done by the means of
remote procedure calls. This approach allowed for rapid development,
but presented some challenges for rolling cluster upgrade, since
tracking destination of the RPC could not be automated using standard
tools (such as xref and dialyzer).
Starting from EMQX v5.0.0, `emqx_bpapi` sub-application is used to
facilitate backplane API backward- and forward-compatibility. Wild
remote procedure calls are no longer allowed. Instead, every call is
decorated by a very thin wrapper function located in a versioned
"_proto_" module.
Some restrictions are put on the lifecycle of the `_proto_` modules,
and they are additionally tracked in a database created in at the
build time.
# Rolling upgrade
During rolling upgrades different versions of the code is running
side-by-side:
```txt
+--------------+ +---------------+
| | | |
| Node A | ----- rpc:call(foo, foo, []) ------> | Node B |
| | | |
| EMQX 5.1.2 | <---- rpc:call(foo, foo, [1]) ------- | EMQX 5.0.13 |
| | | |
+--------------+ +---------------+
```
The following changes will break the backplane API:
1. removing a target function
2. adding a new method to the protocol
3. reducing the domain of the target function
4. extending the co-domain of the target function
Bullets 1 and 2 are addressed by a static check that verifies
immutability of the proto modules. 3 is checked using dialyzer
specs. 4 is not checked at this moment.
# Backplane API modules
A distributed Erlang application in EMQX is organized like this:
```txt
...
myapp/src/myapp.erl
myapp/src/myapp.app.src
myapp/src/proto/myapp_proto_v1.erl
myapp/src/proto/myapp_proto_v2.erl
```
Notice `proto` directory containing several modules that follow
`<something>_proto_v<number>` pattern.
These modules should follow the following template:
```erlang
-module(emqx_proto_v1).
-behaviour(emqx_bpapi).
%% Note: the below include is mandatory
-include_lib("emqx/include/bpapi.hrl").
-export([ introduced_in/0
, deprecated_since/0 %% Optional
]).
-export([ is_running/1
]).
introduced_in() ->
"5.0.0".
deprecated_since() ->
"5.2.0".
-spec is_running(node()) -> boolean().
is_running(Node) ->
rpc:call(Node, emqx, is_running, []).
```
The following limitations apply to these modules:
1. Once the minor EMQX release stated in `introduced_in()` callback of
a module reaches GA, the module is frozen. No changes are allowed
there, except for adding `deprecated_since()` callback.
2. After the _next_ minor release after the one deprecating the
module reaches GA, the module can be removed.
3. Old versions of the protocol can be dropped in the next major
release.
This way we ensure each minor EMQX release is backward-compatible with
the previous one.
# Protocol version negotiation
TODO

View File

@ -20,6 +20,9 @@
-include_lib("emqx/include/logger.hrl").
%% Using an undocumented API here :(
-include_lib("dialyzer/src/dialyzer.hrl").
-type api_dump() :: #{{emqx_bpapi:api(), emqx_bpapi:api_version()} =>
#{ calls := [emqx_bpapi:rpc()]
, casts := [emqx_bpapi:rpc()]
@ -45,6 +48,8 @@
-define(IGNORED_MODULES, "emqx_rpc").
%% List of known RPC backend modules:
-define(RPC_MODULES, "gen_rpc, erpc, rpc, emqx_rpc").
%% List of known functions also known to do RPC:
-define(RPC_FUNCTIONS, "emqx_cluster_rpc:multicall/3, emqx_cluster_rpc:multicall/5").
%% List of functions in the RPC backend modules that we can ignore:
-define(IGNORED_RPC_CALLS, "gen_rpc:nodes/0").
@ -128,21 +133,24 @@ typecheck_apis( #{release := CallerRelease, api := CallerAPIs, signatures := Cal
setnok(),
[?ERROR("Incompatible RPC call: "
"type of the parameter ~p of RPC call ~s on release ~p "
"is not a subtype of the target function ~s on release ~p",
"is not a subtype of the target function ~s on release ~p.~n"
"Caller type: ~s~nCallee type: ~s~n",
[Var, format_call(From), CallerRelease,
format_call(To), CalleeRelease])
|| Var <- TypeErrors]
format_call(To), CalleeRelease,
erl_types:t_to_string(CallerType),
erl_types:t_to_string(CalleeType)])
|| {Var, CallerType, CalleeType} <- TypeErrors]
end
end,
AllCalls).
-spec typecheck_rpc(param_types(), param_types()) -> [emqx_bpapi:var_name()].
-spec typecheck_rpc(param_types(), param_types()) -> [{emqx_bpapi:var_name(), _Type, _Type}].
typecheck_rpc(Caller, Callee) ->
maps:fold(fun(Var, CalleeType, Acc) ->
#{Var := CallerType} = Caller,
case erl_types:t_is_subtype(CallerType, CalleeType) of
true -> Acc;
false -> [Var|Acc]
false -> [{Var, CallerType, CalleeType}|Acc]
end
end,
[],
@ -182,7 +190,7 @@ dump(Opts) ->
warn_nonbpapi_rpcs(NonBPAPICalls),
APIDump = collect_bpapis(BPAPICalls),
DialyzerDump = collect_signatures(PLT, APIDump),
Release = emqx_app:get_release(),
[Release|_] = string:split(emqx_app:get_release(), "-"),
dump_api(#{api => APIDump, signatures => DialyzerDump, release => Release}),
xref:stop(?XREF),
erase(bpapi_ok).
@ -200,7 +208,7 @@ prepare(#{reldir := RelDir, plt := PLT}) ->
find_remote_calls(_Opts) ->
Query = "XC | (A - [" ?IGNORED_APPS "]:App - [" ?IGNORED_MODULES "] : Mod)
|| ([" ?RPC_MODULES "] : Mod - " ?IGNORED_RPC_CALLS ")",
|| (([" ?RPC_MODULES "] : Mod + [" ?RPC_FUNCTIONS "]) - " ?IGNORED_RPC_CALLS ")",
{ok, Calls} = xref:q(?XREF, Query),
?INFO("Calls to RPC modules ~p", [Calls]),
{Callers, _Callees} = lists:unzip(Calls),
@ -263,9 +271,11 @@ collect_signatures(PLT, APIs) ->
enrich({From0, To0}, {Acc0, PLT}) ->
From = call_to_mfa(From0),
To = call_to_mfa(To0),
case {dialyzer_plt:lookup(PLT, From), dialyzer_plt:lookup(PLT, To)} of
{{value, TFrom}, {value, TTo}} ->
Acc = Acc0#{ From => TFrom
case {dialyzer_plt:lookup_contract(PLT, From), dialyzer_plt:lookup(PLT, To)} of
{{value, #contract{args = FromArgs}}, {value, TTo}} ->
%% TODO: Check return type
FromRet = erl_types:t_any(),
Acc = Acc0#{ From => {FromRet, FromArgs}
, To => TTo
},
{Acc, PLT};

View File

@ -150,10 +150,19 @@ extract_mfa(?BACKEND(emqx_rpc, CallOrCast), [_Node, M, F, A]) ->
extract_mfa(?BACKEND(emqx_rpc, CallOrCast), [_Tag, _Node, M, F, A]) ->
{call_or_cast(CallOrCast), M, F, A};
%% (e)rpc:
extract_mfa(?BACKEND(rpc, multicall), [M, F, A]) ->
{call_or_cast(multicall), M, F, A};
extract_mfa(?BACKEND(rpc, multicall), [M, F, A, {integer, _, _Timeout}]) ->
{call_or_cast(multicall), M, F, A};
extract_mfa(?BACKEND(RPC, CallOrCast), [_Node, M, F, A]) when ?IS_RPC(RPC) ->
{call_or_cast(CallOrCast), M, F, A};
extract_mfa(?BACKEND(RPC, CallOrCast), [_Node, M, F, A, _Timeout]) when ?IS_RPC(RPC) ->
{call_or_cast(CallOrCast), M, F, A};
%% emqx_cluster_rpc:
extract_mfa(?BACKEND(emqx_cluster_rpc, multicall), [M, F, A]) ->
{call, M, F, A};
extract_mfa(?BACKEND(emqx_cluster_rpc, multicall), [M, F, A, _RequiredNum, _Timeout]) ->
{call, M, F, A};
extract_mfa(_, _) ->
error("unrecognized RPC call").

View File

@ -31,7 +31,7 @@
]).
-export_type([config_key/0, config_key_path/0]).
-type config_key() :: atom() | binary() | string().
-type config_key() :: atom() | binary() | [byte()].
-type config_key_path() :: [config_key()].
-type convert_fun() :: fun((...) -> {K1::any(), V1::any()} | drop).

View File

@ -18,6 +18,10 @@
%% API
-export([start_link/0, mnesia/1]).
%% Note: multicall functions are statically checked by
%% `emqx_bapi_trans' and `emqx_bpapi_static_checks' modules. Don't
%% forget to update it when adding or removing them here:
-export([multicall/3, multicall/5, query/1, reset/0, status/0,
skip_failed_commit/1, fast_forward_to_commit/2]).
-export([get_node_tnx_id/1, latest_tnx_id/0]).
@ -25,6 +29,8 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
handle_continue/2, code_change/3]).
-export_type([txn_id/0, succeed_num/0, multicall_return/0]).
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
@ -38,6 +44,14 @@
-define(CATCH_UP, catch_up).
-define(TIMEOUT, timer:minutes(1)).
-type txn_id() :: pos_integer().
-type succeed_num() :: pos_integer() | all.
-type multicall_return() :: {ok, txn_id(), _Result}
| {error, term()}
| {retry, txn_id(), _Result, node()}.
%%%===================================================================
%%% API
%%%===================================================================
@ -64,27 +78,11 @@ start_link(Node, Name, RetryMs) ->
%% @doc return {ok, TnxId, MFARes} the first MFA result when all MFA run ok.
%% return {error, MFARes} when the first MFA result is no ok or {ok, term()}.
%% return {retry, TnxId, MFARes, Nodes} when some Nodes failed and some Node ok.
-spec multicall(Module, Function, Args) ->
{ok, TnxId, term()} | {error, Reason} | {retry, TnxId, MFARes, node()} when
Module :: module(),
Function :: atom(),
Args :: [term()],
MFARes :: term(),
TnxId :: pos_integer(),
Reason :: string().
-spec multicall(module(), atom(), list()) -> multicall_return().
multicall(M, F, A) ->
multicall(M, F, A, all, timer:minutes(2)).
-spec multicall(Module, Function, Args, SucceedNum, Timeout) ->
{ok, TnxId, MFARes} | {error, Reason} | {retry, TnxId, MFARes, node()} when
Module :: module(),
Function :: atom(),
Args :: [term()],
SucceedNum :: pos_integer() | all,
TnxId :: pos_integer(),
MFARes :: term(),
Timeout :: timeout(),
Reason :: string().
-spec multicall(module(), atom(), list(), succeed_num(), timeout()) -> multicall_return().
multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNum >= 1 ->
MFA = {initiate, {M, F, A}},
Begin = erlang:monotonic_time(),

View File

@ -55,22 +55,22 @@ get_raw(KeyPath, Default) ->
%% @doc Returns all values in the cluster.
-spec get_all(emqx_map_lib:config_key_path()) -> #{node() => term()}.
get_all(KeyPath) ->
{ResL, []} = rpc:multicall(?MODULE, get_node_and_config, [KeyPath], 5000),
{ResL, []} = emqx_conf_proto_v1:get_all(KeyPath),
maps:from_list(ResL).
%% @doc Returns the specified node's KeyPath, or exception if not found
-spec get_by_node(node(), emqx_map_lib:config_key_path()) -> term().
get_by_node(Node, KeyPath)when Node =:= node() ->
get_by_node(Node, KeyPath) when Node =:= node() ->
emqx:get_config(KeyPath);
get_by_node(Node, KeyPath) ->
rpc:call(Node, ?MODULE, get_by_node, [Node, KeyPath]).
emqx_conf_proto_v1:get_config(Node, KeyPath).
%% @doc Returns the specified node's KeyPath, or the default value if not found
-spec get_by_node(node(), emqx_map_lib:config_key_path(), term()) -> term().
get_by_node(Node, KeyPath, Default)when Node =:= node() ->
get_by_node(Node, KeyPath, Default) when Node =:= node() ->
emqx:get_config(KeyPath, Default);
get_by_node(Node, KeyPath, Default) ->
rpc:call(Node, ?MODULE, get_by_node, [Node, KeyPath, Default]).
emqx_conf_proto_v1:get_config(Node, KeyPath, Default).
%% @doc Returns the specified node's KeyPath, or config_not_found if key path not found
-spec get_node_and_config(emqx_map_lib:config_key_path()) -> term().
@ -81,25 +81,23 @@ get_node_and_config(KeyPath) ->
-spec update(emqx_map_lib:config_key_path(), emqx_config:update_request(),
emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
update(KeyPath, UpdateReq, Opts0) ->
Args = [KeyPath, UpdateReq, Opts0],
multicall(emqx, update_config, Args).
update(KeyPath, UpdateReq, Opts) ->
check_cluster_rpc_result(emqx_conf_proto_v1:update(KeyPath, UpdateReq, Opts)).
%% @doc Update the specified node's key path in local-override.conf.
-spec update(node(), emqx_map_lib:config_key_path(), emqx_config:update_request(),
emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
update(Node, KeyPath, UpdateReq, Opts0)when Node =:= node() ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()} | emqx_rpc:badrpc().
update(Node, KeyPath, UpdateReq, Opts0) when Node =:= node() ->
emqx:update_config(KeyPath, UpdateReq, Opts0#{override_to => local});
update(Node, KeyPath, UpdateReq, Opts0) ->
rpc:call(Node, ?MODULE, update, [Node, KeyPath, UpdateReq, Opts0], 5000).
update(Node, KeyPath, UpdateReq, Opts) ->
emqx_conf_proto_v1:update(Node, KeyPath, UpdateReq, Opts).
%% @doc remove all value of key path in cluster-override.conf or local-override.conf.
-spec remove(emqx_map_lib:config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
remove(KeyPath, Opts0) ->
Args = [KeyPath, Opts0],
multicall(emqx, remove_config, Args).
remove(KeyPath, Opts) ->
check_cluster_rpc_result(emqx_conf_proto_v1:remove_config(KeyPath, Opts)).
%% @doc remove the specified node's key path in local-override.conf.
-spec remove(node(), emqx_map_lib:config_key_path(), emqx_config:update_opts()) ->
@ -107,14 +105,13 @@ remove(KeyPath, Opts0) ->
remove(Node, KeyPath, Opts) when Node =:= node() ->
emqx:remove_config(KeyPath, Opts#{override_to => local});
remove(Node, KeyPath, Opts) ->
rpc:call(Node, ?MODULE, remove, [KeyPath, Opts]).
emqx_conf_proto_v1:remove_config(Node, KeyPath, Opts).
%% @doc reset all value of key path in cluster-override.conf or local-override.conf.
-spec reset(emqx_map_lib:config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
reset(KeyPath, Opts0) ->
Args = [KeyPath, Opts0],
multicall(emqx, reset_config, Args).
reset(KeyPath, Opts) ->
check_cluster_rpc_result(emqx_conf_proto_v1:reset(KeyPath, Opts)).
%% @doc reset the specified node's key path in local-override.conf.
-spec reset(node(), emqx_map_lib:config_key_path(), emqx_config:update_opts()) ->
@ -122,7 +119,7 @@ reset(KeyPath, Opts0) ->
reset(Node, KeyPath, Opts) when Node =:= node() ->
emqx:reset_config(KeyPath, Opts#{override_to => local});
reset(Node, KeyPath, Opts) ->
rpc:call(Node, ?MODULE, reset, [KeyPath, Opts]).
emqx_conf_proto_v1:reset(Node, KeyPath, Opts).
-spec gen_doc(file:name_all()) -> ok.
gen_doc(File) ->
@ -138,14 +135,14 @@ gen_doc(File) ->
%% Internal functions
%%--------------------------------------------------------------------
multicall(M, F, Args) ->
case emqx_cluster_rpc:multicall(M, F, Args) of
check_cluster_rpc_result(Result) ->
case Result of
{ok, _TnxId, Res} -> Res;
{retry, TnxId, Res, Nodes} ->
%% The init MFA return ok, but other nodes failed.
%% We return ok and alert an alarm.
?SLOG(error, #{msg => "failed_to_update_config_in_cluster", nodes => Nodes,
tnx_id => TnxId, mfa => {M, F, Args}}),
tnx_id => TnxId}),
Res;
{error, Error} -> %% all MFA return not ok or {ok, term()}.
Error

View File

@ -0,0 +1,91 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_conf_proto_v1).
-behaviour(emqx_bpapi).
-export([ introduced_in/0
, get_config/2
, get_config/3
, get_all/1
, update/3
, update/4
, remove_config/2
, remove_config/3
, reset/2
, reset/3
]).
-include_lib("emqx/include/bpapi.hrl").
-type update_config_key_path() :: [emqx_map_lib:config_key(), ...].
introduced_in() ->
"5.0.0".
-spec get_config(node(), emqx_map_lib:config_key_path()) ->
term() | emqx_rpc:badrpc().
get_config(Node, KeyPath) ->
rpc:call(Node, emqx, get_config, [KeyPath]).
-spec get_config(node(), emqx_map_lib:config_key_path(), _Default) ->
term() | emqx_rpc:badrpc().
get_config(Node, KeyPath, Default) ->
rpc:call(Node, emqx, get_config, [KeyPath, Default]).
-spec get_all(emqx_map_lib:config_key_path()) -> emqx_rpc:multicall_result().
get_all(KeyPath) ->
rpc:multicall(emqx_conf, get_node_and_config, [KeyPath], 5000).
-spec update(update_config_key_path(), emqx_config:update_request(),
emqx_config:update_opts()) -> emqx_cluster_rpc:multicall_return().
update(KeyPath, UpdateReq, Opts) ->
emqx_cluster_rpc:multicall(emqx, update_config, [KeyPath, UpdateReq, Opts]).
-spec update(node(), update_config_key_path(), emqx_config:update_request(),
emqx_config:update_opts()) ->
{ok, emqx_config:update_result()}
| {error, emqx_config:update_error()}
| emqx_rpc:badrpc().
update(Node, KeyPath, UpdateReq, Opts) ->
rpc:call(Node, emqx, update_config, [KeyPath, UpdateReq, Opts], 5000).
-spec remove_config(update_config_key_path(), emqx_config:update_opts()) -> _.
remove_config(KeyPath, Opts) ->
emqx_cluster_rpc:multicall(emqx, remove_config, [KeyPath, Opts]).
-spec remove_config(node(), update_config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()}
| {error, emqx_config:update_error()}
| emqx_rpc:badrpc().
remove_config(Node, KeyPath, Opts) ->
rpc:call(Node, emqx, remove_config, [KeyPath, Opts], 5000).
-spec reset(update_config_key_path(), emqx_config:update_opts()) ->
emqx_cluster_rpc:multicall_return().
reset(KeyPath, Opts) ->
emqx_cluster_rpc:multicall(emqx, reset_config, [KeyPath, Opts]).
-spec reset(node(), update_config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()}
| {error, emqx_config:update_error()}
| emqx_rpc:badrpc().
reset(Node, KeyPath, Opts) ->
rpc:call(Node, emqx, reset_config, [KeyPath, Opts]).

View File

@ -163,11 +163,6 @@ profiles() ->
[ {erl_opts, common_compile_opts()}
, {project_app_dirs, project_app_dirs(ce)}
]}
, {lcheck,
[ {erl_opts, common_compile_opts()}
, {project_app_dirs, project_app_dirs(ce)}
, {dialyzer, [{warnings, [unmatched_returns, error_handling]}]}
]}
, {test,
[ {deps, test_deps()}
, {erl_opts, common_compile_opts() ++ erl_opts_i(ce) }