diff --git a/Makefile b/Makefile index 3a8e639d8..33052d6f9 100644 --- a/Makefile +++ b/Makefile @@ -139,10 +139,6 @@ xref: $(REBAR) dialyzer: $(REBAR) @$(REBAR) as check dialyzer -.PHONY: ldialyzer -ldialyzer: $(REBAR) - @$(REBAR) as lcheck dialyzer - COMMON_DEPS := $(REBAR) get-dashboard conf-segs ## rel target is to create release package without relup diff --git a/apps/emqx/src/bpapi/README.md b/apps/emqx/src/bpapi/README.md new file mode 100644 index 000000000..00b5c0c41 --- /dev/null +++ b/apps/emqx/src/bpapi/README.md @@ -0,0 +1,109 @@ +Backplane API +=== + +# Motivation + +This directory contains modules that help defining and maintaining +EMQX broker-to-broker (backplane) protocols. + +Historically, all inter-broker communication was done by the means of +remote procedure calls. This approach allowed for rapid development, +but presented some challenges for rolling cluster upgrade, since +tracking destination of the RPC could not be automated using standard +tools (such as xref and dialyzer). + +Starting from EMQX v5.0.0, `emqx_bpapi` sub-application is used to +facilitate backplane API backward- and forward-compatibility. Wild +remote procedure calls are no longer allowed. Instead, every call is +decorated by a very thin wrapper function located in a versioned +"_proto_" module. + +Some restrictions are put on the lifecycle of the `_proto_` modules, +and they are additionally tracked in a database created in at the +build time. + +# Rolling upgrade + +During rolling upgrades different versions of the code is running +side-by-side: + +```txt ++--------------+ +---------------+ +| | | | +| Node A | ----- rpc:call(foo, foo, []) ------> | Node B | +| | | | +| EMQX 5.1.2 | <---- rpc:call(foo, foo, [1]) ------- | EMQX 5.0.13 | +| | | | ++--------------+ +---------------+ +``` + +The following changes will break the backplane API: + +1. removing a target function +2. adding a new method to the protocol +3. reducing the domain of the target function +4. extending the co-domain of the target function + +Bullets 1 and 2 are addressed by a static check that verifies +immutability of the proto modules. 3 is checked using dialyzer +specs. 4 is not checked at this moment. + +# Backplane API modules + +A distributed Erlang application in EMQX is organized like this: + +```txt +... +myapp/src/myapp.erl +myapp/src/myapp.app.src +myapp/src/proto/myapp_proto_v1.erl +myapp/src/proto/myapp_proto_v2.erl +``` + +Notice `proto` directory containing several modules that follow +`_proto_v` pattern. + +These modules should follow the following template: + +```erlang +-module(emqx_proto_v1). + +-behaviour(emqx_bpapi). + +%% Note: the below include is mandatory +-include_lib("emqx/include/bpapi.hrl"). + +-export([ introduced_in/0 + , deprecated_since/0 %% Optional + ]). + +-export([ is_running/1 + ]). + +introduced_in() -> + "5.0.0". + +deprecated_since() -> + "5.2.0". + +-spec is_running(node()) -> boolean(). +is_running(Node) -> + rpc:call(Node, emqx, is_running, []). +``` + +The following limitations apply to these modules: + +1. Once the minor EMQX release stated in `introduced_in()` callback of + a module reaches GA, the module is frozen. No changes are allowed + there, except for adding `deprecated_since()` callback. +2. After the _next_ minor release after the one deprecating the + module reaches GA, the module can be removed. +3. Old versions of the protocol can be dropped in the next major + release. + +This way we ensure each minor EMQX release is backward-compatible with +the previous one. + +# Protocol version negotiation + +TODO diff --git a/apps/emqx/src/bpapi/emqx_bpapi_static_checks.erl b/apps/emqx/src/bpapi/emqx_bpapi_static_checks.erl index 91cbbc13b..4d742c600 100644 --- a/apps/emqx/src/bpapi/emqx_bpapi_static_checks.erl +++ b/apps/emqx/src/bpapi/emqx_bpapi_static_checks.erl @@ -20,6 +20,9 @@ -include_lib("emqx/include/logger.hrl"). +%% Using an undocumented API here :( +-include_lib("dialyzer/src/dialyzer.hrl"). + -type api_dump() :: #{{emqx_bpapi:api(), emqx_bpapi:api_version()} => #{ calls := [emqx_bpapi:rpc()] , casts := [emqx_bpapi:rpc()] @@ -45,6 +48,8 @@ -define(IGNORED_MODULES, "emqx_rpc"). %% List of known RPC backend modules: -define(RPC_MODULES, "gen_rpc, erpc, rpc, emqx_rpc"). +%% List of known functions also known to do RPC: +-define(RPC_FUNCTIONS, "emqx_cluster_rpc:multicall/3, emqx_cluster_rpc:multicall/5"). %% List of functions in the RPC backend modules that we can ignore: -define(IGNORED_RPC_CALLS, "gen_rpc:nodes/0"). @@ -128,21 +133,24 @@ typecheck_apis( #{release := CallerRelease, api := CallerAPIs, signatures := Cal setnok(), [?ERROR("Incompatible RPC call: " "type of the parameter ~p of RPC call ~s on release ~p " - "is not a subtype of the target function ~s on release ~p", + "is not a subtype of the target function ~s on release ~p.~n" + "Caller type: ~s~nCallee type: ~s~n", [Var, format_call(From), CallerRelease, - format_call(To), CalleeRelease]) - || Var <- TypeErrors] + format_call(To), CalleeRelease, + erl_types:t_to_string(CallerType), + erl_types:t_to_string(CalleeType)]) + || {Var, CallerType, CalleeType} <- TypeErrors] end end, AllCalls). --spec typecheck_rpc(param_types(), param_types()) -> [emqx_bpapi:var_name()]. +-spec typecheck_rpc(param_types(), param_types()) -> [{emqx_bpapi:var_name(), _Type, _Type}]. typecheck_rpc(Caller, Callee) -> maps:fold(fun(Var, CalleeType, Acc) -> #{Var := CallerType} = Caller, case erl_types:t_is_subtype(CallerType, CalleeType) of true -> Acc; - false -> [Var|Acc] + false -> [{Var, CallerType, CalleeType}|Acc] end end, [], @@ -182,7 +190,7 @@ dump(Opts) -> warn_nonbpapi_rpcs(NonBPAPICalls), APIDump = collect_bpapis(BPAPICalls), DialyzerDump = collect_signatures(PLT, APIDump), - Release = emqx_app:get_release(), + [Release|_] = string:split(emqx_app:get_release(), "-"), dump_api(#{api => APIDump, signatures => DialyzerDump, release => Release}), xref:stop(?XREF), erase(bpapi_ok). @@ -200,7 +208,7 @@ prepare(#{reldir := RelDir, plt := PLT}) -> find_remote_calls(_Opts) -> Query = "XC | (A - [" ?IGNORED_APPS "]:App - [" ?IGNORED_MODULES "] : Mod) - || ([" ?RPC_MODULES "] : Mod - " ?IGNORED_RPC_CALLS ")", + || (([" ?RPC_MODULES "] : Mod + [" ?RPC_FUNCTIONS "]) - " ?IGNORED_RPC_CALLS ")", {ok, Calls} = xref:q(?XREF, Query), ?INFO("Calls to RPC modules ~p", [Calls]), {Callers, _Callees} = lists:unzip(Calls), @@ -263,9 +271,11 @@ collect_signatures(PLT, APIs) -> enrich({From0, To0}, {Acc0, PLT}) -> From = call_to_mfa(From0), To = call_to_mfa(To0), - case {dialyzer_plt:lookup(PLT, From), dialyzer_plt:lookup(PLT, To)} of - {{value, TFrom}, {value, TTo}} -> - Acc = Acc0#{ From => TFrom + case {dialyzer_plt:lookup_contract(PLT, From), dialyzer_plt:lookup(PLT, To)} of + {{value, #contract{args = FromArgs}}, {value, TTo}} -> + %% TODO: Check return type + FromRet = erl_types:t_any(), + Acc = Acc0#{ From => {FromRet, FromArgs} , To => TTo }, {Acc, PLT}; diff --git a/apps/emqx/src/bpapi/emqx_bpapi_trans.erl b/apps/emqx/src/bpapi/emqx_bpapi_trans.erl index f39215069..aec4aa320 100644 --- a/apps/emqx/src/bpapi/emqx_bpapi_trans.erl +++ b/apps/emqx/src/bpapi/emqx_bpapi_trans.erl @@ -150,10 +150,19 @@ extract_mfa(?BACKEND(emqx_rpc, CallOrCast), [_Node, M, F, A]) -> extract_mfa(?BACKEND(emqx_rpc, CallOrCast), [_Tag, _Node, M, F, A]) -> {call_or_cast(CallOrCast), M, F, A}; %% (e)rpc: +extract_mfa(?BACKEND(rpc, multicall), [M, F, A]) -> + {call_or_cast(multicall), M, F, A}; +extract_mfa(?BACKEND(rpc, multicall), [M, F, A, {integer, _, _Timeout}]) -> + {call_or_cast(multicall), M, F, A}; extract_mfa(?BACKEND(RPC, CallOrCast), [_Node, M, F, A]) when ?IS_RPC(RPC) -> {call_or_cast(CallOrCast), M, F, A}; extract_mfa(?BACKEND(RPC, CallOrCast), [_Node, M, F, A, _Timeout]) when ?IS_RPC(RPC) -> {call_or_cast(CallOrCast), M, F, A}; +%% emqx_cluster_rpc: +extract_mfa(?BACKEND(emqx_cluster_rpc, multicall), [M, F, A]) -> + {call, M, F, A}; +extract_mfa(?BACKEND(emqx_cluster_rpc, multicall), [M, F, A, _RequiredNum, _Timeout]) -> + {call, M, F, A}; extract_mfa(_, _) -> error("unrecognized RPC call"). diff --git a/apps/emqx/src/emqx_map_lib.erl b/apps/emqx/src/emqx_map_lib.erl index cdc167729..8a5ecefad 100644 --- a/apps/emqx/src/emqx_map_lib.erl +++ b/apps/emqx/src/emqx_map_lib.erl @@ -31,7 +31,7 @@ ]). -export_type([config_key/0, config_key_path/0]). --type config_key() :: atom() | binary() | string(). +-type config_key() :: atom() | binary() | [byte()]. -type config_key_path() :: [config_key()]. -type convert_fun() :: fun((...) -> {K1::any(), V1::any()} | drop). diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 87758680f..40957bd3c 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -18,12 +18,18 @@ %% API -export([start_link/0, mnesia/1]). + +%% Note: multicall functions are statically checked by +%% `emqx_bapi_trans' and `emqx_bpapi_static_checks' modules. Don't +%% forget to update it when adding or removing them here: -export([multicall/3, multicall/5, query/1, reset/0, status/0, skip_failed_commit/1, fast_forward_to_commit/2]). -export([get_node_tnx_id/1, latest_tnx_id/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - handle_continue/2, code_change/3]). + handle_continue/2, code_change/3]). + +-export_type([txn_id/0, succeed_num/0, multicall_return/0]). -ifdef(TEST). -compile(export_all). @@ -38,6 +44,14 @@ -define(CATCH_UP, catch_up). -define(TIMEOUT, timer:minutes(1)). +-type txn_id() :: pos_integer(). + +-type succeed_num() :: pos_integer() | all. + +-type multicall_return() :: {ok, txn_id(), _Result} + | {error, term()} + | {retry, txn_id(), _Result, node()}. + %%%=================================================================== %%% API %%%=================================================================== @@ -64,27 +78,11 @@ start_link(Node, Name, RetryMs) -> %% @doc return {ok, TnxId, MFARes} the first MFA result when all MFA run ok. %% return {error, MFARes} when the first MFA result is no ok or {ok, term()}. %% return {retry, TnxId, MFARes, Nodes} when some Nodes failed and some Node ok. --spec multicall(Module, Function, Args) -> - {ok, TnxId, term()} | {error, Reason} | {retry, TnxId, MFARes, node()} when - Module :: module(), - Function :: atom(), - Args :: [term()], - MFARes :: term(), - TnxId :: pos_integer(), - Reason :: string(). +-spec multicall(module(), atom(), list()) -> multicall_return(). multicall(M, F, A) -> multicall(M, F, A, all, timer:minutes(2)). --spec multicall(Module, Function, Args, SucceedNum, Timeout) -> - {ok, TnxId, MFARes} | {error, Reason} | {retry, TnxId, MFARes, node()} when - Module :: module(), - Function :: atom(), - Args :: [term()], - SucceedNum :: pos_integer() | all, - TnxId :: pos_integer(), - MFARes :: term(), - Timeout :: timeout(), - Reason :: string(). +-spec multicall(module(), atom(), list(), succeed_num(), timeout()) -> multicall_return(). multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNum >= 1 -> MFA = {initiate, {M, F, A}}, Begin = erlang:monotonic_time(), diff --git a/apps/emqx_conf/src/emqx_conf.erl b/apps/emqx_conf/src/emqx_conf.erl index 16d3b4a5d..3100833f0 100644 --- a/apps/emqx_conf/src/emqx_conf.erl +++ b/apps/emqx_conf/src/emqx_conf.erl @@ -55,22 +55,22 @@ get_raw(KeyPath, Default) -> %% @doc Returns all values in the cluster. -spec get_all(emqx_map_lib:config_key_path()) -> #{node() => term()}. get_all(KeyPath) -> - {ResL, []} = rpc:multicall(?MODULE, get_node_and_config, [KeyPath], 5000), + {ResL, []} = emqx_conf_proto_v1:get_all(KeyPath), maps:from_list(ResL). %% @doc Returns the specified node's KeyPath, or exception if not found -spec get_by_node(node(), emqx_map_lib:config_key_path()) -> term(). -get_by_node(Node, KeyPath)when Node =:= node() -> +get_by_node(Node, KeyPath) when Node =:= node() -> emqx:get_config(KeyPath); get_by_node(Node, KeyPath) -> - rpc:call(Node, ?MODULE, get_by_node, [Node, KeyPath]). + emqx_conf_proto_v1:get_config(Node, KeyPath). %% @doc Returns the specified node's KeyPath, or the default value if not found -spec get_by_node(node(), emqx_map_lib:config_key_path(), term()) -> term(). -get_by_node(Node, KeyPath, Default)when Node =:= node() -> +get_by_node(Node, KeyPath, Default) when Node =:= node() -> emqx:get_config(KeyPath, Default); get_by_node(Node, KeyPath, Default) -> - rpc:call(Node, ?MODULE, get_by_node, [Node, KeyPath, Default]). + emqx_conf_proto_v1:get_config(Node, KeyPath, Default). %% @doc Returns the specified node's KeyPath, or config_not_found if key path not found -spec get_node_and_config(emqx_map_lib:config_key_path()) -> term(). @@ -81,25 +81,23 @@ get_node_and_config(KeyPath) -> -spec update(emqx_map_lib:config_key_path(), emqx_config:update_request(), emqx_config:update_opts()) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. -update(KeyPath, UpdateReq, Opts0) -> - Args = [KeyPath, UpdateReq, Opts0], - multicall(emqx, update_config, Args). +update(KeyPath, UpdateReq, Opts) -> + check_cluster_rpc_result(emqx_conf_proto_v1:update(KeyPath, UpdateReq, Opts)). %% @doc Update the specified node's key path in local-override.conf. -spec update(node(), emqx_map_lib:config_key_path(), emqx_config:update_request(), emqx_config:update_opts()) -> - {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. -update(Node, KeyPath, UpdateReq, Opts0)when Node =:= node() -> + {ok, emqx_config:update_result()} | {error, emqx_config:update_error()} | emqx_rpc:badrpc(). +update(Node, KeyPath, UpdateReq, Opts0) when Node =:= node() -> emqx:update_config(KeyPath, UpdateReq, Opts0#{override_to => local}); -update(Node, KeyPath, UpdateReq, Opts0) -> - rpc:call(Node, ?MODULE, update, [Node, KeyPath, UpdateReq, Opts0], 5000). +update(Node, KeyPath, UpdateReq, Opts) -> + emqx_conf_proto_v1:update(Node, KeyPath, UpdateReq, Opts). %% @doc remove all value of key path in cluster-override.conf or local-override.conf. -spec remove(emqx_map_lib:config_key_path(), emqx_config:update_opts()) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. -remove(KeyPath, Opts0) -> - Args = [KeyPath, Opts0], - multicall(emqx, remove_config, Args). +remove(KeyPath, Opts) -> + check_cluster_rpc_result(emqx_conf_proto_v1:remove_config(KeyPath, Opts)). %% @doc remove the specified node's key path in local-override.conf. -spec remove(node(), emqx_map_lib:config_key_path(), emqx_config:update_opts()) -> @@ -107,14 +105,13 @@ remove(KeyPath, Opts0) -> remove(Node, KeyPath, Opts) when Node =:= node() -> emqx:remove_config(KeyPath, Opts#{override_to => local}); remove(Node, KeyPath, Opts) -> - rpc:call(Node, ?MODULE, remove, [KeyPath, Opts]). + emqx_conf_proto_v1:remove_config(Node, KeyPath, Opts). %% @doc reset all value of key path in cluster-override.conf or local-override.conf. -spec reset(emqx_map_lib:config_key_path(), emqx_config:update_opts()) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. -reset(KeyPath, Opts0) -> - Args = [KeyPath, Opts0], - multicall(emqx, reset_config, Args). +reset(KeyPath, Opts) -> + check_cluster_rpc_result(emqx_conf_proto_v1:reset(KeyPath, Opts)). %% @doc reset the specified node's key path in local-override.conf. -spec reset(node(), emqx_map_lib:config_key_path(), emqx_config:update_opts()) -> @@ -122,7 +119,7 @@ reset(KeyPath, Opts0) -> reset(Node, KeyPath, Opts) when Node =:= node() -> emqx:reset_config(KeyPath, Opts#{override_to => local}); reset(Node, KeyPath, Opts) -> - rpc:call(Node, ?MODULE, reset, [KeyPath, Opts]). + emqx_conf_proto_v1:reset(Node, KeyPath, Opts). -spec gen_doc(file:name_all()) -> ok. gen_doc(File) -> @@ -138,14 +135,14 @@ gen_doc(File) -> %% Internal functions %%-------------------------------------------------------------------- -multicall(M, F, Args) -> - case emqx_cluster_rpc:multicall(M, F, Args) of +check_cluster_rpc_result(Result) -> + case Result of {ok, _TnxId, Res} -> Res; {retry, TnxId, Res, Nodes} -> %% The init MFA return ok, but other nodes failed. %% We return ok and alert an alarm. ?SLOG(error, #{msg => "failed_to_update_config_in_cluster", nodes => Nodes, - tnx_id => TnxId, mfa => {M, F, Args}}), + tnx_id => TnxId}), Res; {error, Error} -> %% all MFA return not ok or {ok, term()}. Error diff --git a/apps/emqx_conf/src/proto/emqx_conf_proto_v1.erl b/apps/emqx_conf/src/proto/emqx_conf_proto_v1.erl new file mode 100644 index 000000000..1aa6973c3 --- /dev/null +++ b/apps/emqx_conf/src/proto/emqx_conf_proto_v1.erl @@ -0,0 +1,91 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_conf_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ introduced_in/0 + + , get_config/2 + , get_config/3 + , get_all/1 + + , update/3 + , update/4 + , remove_config/2 + , remove_config/3 + + , reset/2 + , reset/3 + ]). + +-include_lib("emqx/include/bpapi.hrl"). + +-type update_config_key_path() :: [emqx_map_lib:config_key(), ...]. + +introduced_in() -> + "5.0.0". + +-spec get_config(node(), emqx_map_lib:config_key_path()) -> + term() | emqx_rpc:badrpc(). +get_config(Node, KeyPath) -> + rpc:call(Node, emqx, get_config, [KeyPath]). + +-spec get_config(node(), emqx_map_lib:config_key_path(), _Default) -> + term() | emqx_rpc:badrpc(). +get_config(Node, KeyPath, Default) -> + rpc:call(Node, emqx, get_config, [KeyPath, Default]). + +-spec get_all(emqx_map_lib:config_key_path()) -> emqx_rpc:multicall_result(). +get_all(KeyPath) -> + rpc:multicall(emqx_conf, get_node_and_config, [KeyPath], 5000). + +-spec update(update_config_key_path(), emqx_config:update_request(), + emqx_config:update_opts()) -> emqx_cluster_rpc:multicall_return(). +update(KeyPath, UpdateReq, Opts) -> + emqx_cluster_rpc:multicall(emqx, update_config, [KeyPath, UpdateReq, Opts]). + +-spec update(node(), update_config_key_path(), emqx_config:update_request(), + emqx_config:update_opts()) -> + {ok, emqx_config:update_result()} + | {error, emqx_config:update_error()} + | emqx_rpc:badrpc(). +update(Node, KeyPath, UpdateReq, Opts) -> + rpc:call(Node, emqx, update_config, [KeyPath, UpdateReq, Opts], 5000). + +-spec remove_config(update_config_key_path(), emqx_config:update_opts()) -> _. +remove_config(KeyPath, Opts) -> + emqx_cluster_rpc:multicall(emqx, remove_config, [KeyPath, Opts]). + +-spec remove_config(node(), update_config_key_path(), emqx_config:update_opts()) -> + {ok, emqx_config:update_result()} + | {error, emqx_config:update_error()} + | emqx_rpc:badrpc(). +remove_config(Node, KeyPath, Opts) -> + rpc:call(Node, emqx, remove_config, [KeyPath, Opts], 5000). + +-spec reset(update_config_key_path(), emqx_config:update_opts()) -> + emqx_cluster_rpc:multicall_return(). +reset(KeyPath, Opts) -> + emqx_cluster_rpc:multicall(emqx, reset_config, [KeyPath, Opts]). + +-spec reset(node(), update_config_key_path(), emqx_config:update_opts()) -> + {ok, emqx_config:update_result()} + | {error, emqx_config:update_error()} + | emqx_rpc:badrpc(). +reset(Node, KeyPath, Opts) -> + rpc:call(Node, emqx, reset_config, [KeyPath, Opts]). diff --git a/rebar.config.erl b/rebar.config.erl index ee472119d..95e16835b 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -163,11 +163,6 @@ profiles() -> [ {erl_opts, common_compile_opts()} , {project_app_dirs, project_app_dirs(ce)} ]} - , {lcheck, - [ {erl_opts, common_compile_opts()} - , {project_app_dirs, project_app_dirs(ce)} - , {dialyzer, [{warnings, [unmatched_returns, error_handling]}]} - ]} , {test, [ {deps, test_deps()} , {erl_opts, common_compile_opts() ++ erl_opts_i(ce) }