From 5c2a559991c38071ef8428434ca7bbd54c675b95 Mon Sep 17 00:00:00 2001 From: k32 <10274441+k32@users.noreply.github.com> Date: Tue, 4 Jan 2022 10:55:16 +0100 Subject: [PATCH] feat(bpapi): Initial commit --- apps/emqx/src/emqx_broker.erl | 2 +- apps/emqx/src/emqx_rpc.erl | 4 +- apps/emqx/src/proto/emqx_broker_proto_v1.erl | 34 +++ apps/emqx_bpapi/README.md | 5 + apps/emqx_bpapi/include/bpapi.hrl | 6 + apps/emqx_bpapi/priv/.gitkeep | 0 apps/emqx_bpapi/src/emqx_bpapi.app.src | 15 ++ apps/emqx_bpapi/src/emqx_bpapi.erl | 54 +++++ .../src/emqx_bpapi_static_checks.erl | 147 +++++++++++++ apps/emqx_bpapi/src/emqx_bpapi_trans.erl | 200 ++++++++++++++++++ 10 files changed, 465 insertions(+), 2 deletions(-) create mode 100644 apps/emqx/src/proto/emqx_broker_proto_v1.erl create mode 100644 apps/emqx_bpapi/README.md create mode 100644 apps/emqx_bpapi/include/bpapi.hrl create mode 100644 apps/emqx_bpapi/priv/.gitkeep create mode 100644 apps/emqx_bpapi/src/emqx_bpapi.app.src create mode 100644 apps/emqx_bpapi/src/emqx_bpapi.erl create mode 100644 apps/emqx_bpapi/src/emqx_bpapi_static_checks.erl create mode 100644 apps/emqx_bpapi/src/emqx_bpapi_trans.erl diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 9dbfb0b43..bdb6acd4f 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -300,7 +300,7 @@ forward(Node, To, Delivery, sync) -> end. -spec(dispatch(emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()). -dispatch(Topic, Delivery) -> +dispatch(Topic, Delivery = #delivery{}) when is_binary(Topic) -> case emqx:is_running() of true -> do_dispatch(Topic, Delivery); diff --git a/apps/emqx/src/emqx_rpc.erl b/apps/emqx/src/emqx_rpc.erl index 0ea2814b0..ad7e9891f 100644 --- a/apps/emqx/src/emqx_rpc.erl +++ b/apps/emqx/src/emqx_rpc.erl @@ -14,9 +14,11 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc wrap gen_rpc? -module(emqx_rpc). +%% Note: please don't forget to add new API functions to +%% `emqx_bpapi_trans:extract_mfa' + -export([ call/4 , call/5 , cast/4 diff --git a/apps/emqx/src/proto/emqx_broker_proto_v1.erl b/apps/emqx/src/proto/emqx_broker_proto_v1.erl new file mode 100644 index 000000000..a40884c38 --- /dev/null +++ b/apps/emqx/src/proto/emqx_broker_proto_v1.erl @@ -0,0 +1,34 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_broker_proto_v1). + +-introduced_in("5.0.0"). + +-export([ forward/3 + , forward_async/3 + ]). + +-include_lib("emqx_bpapi/include/bpapi.hrl"). +-include("emqx.hrl"). + +-spec forward(node(), emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result(). +forward(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) -> + emqx_rpc:call(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]). + +-spec forward_async(node(), emqx_types:topic(), emqx_types:delivery()) -> ok. +forward_async(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) -> + emqx_rpc:cast(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]). diff --git a/apps/emqx_bpapi/README.md b/apps/emqx_bpapi/README.md new file mode 100644 index 000000000..78f529a4c --- /dev/null +++ b/apps/emqx_bpapi/README.md @@ -0,0 +1,5 @@ +emqx_bpapi +===== + +A library that helps maintaining EMQX's backplane API backward and +forward compatibility. diff --git a/apps/emqx_bpapi/include/bpapi.hrl b/apps/emqx_bpapi/include/bpapi.hrl new file mode 100644 index 000000000..b4e52d50f --- /dev/null +++ b/apps/emqx_bpapi/include/bpapi.hrl @@ -0,0 +1,6 @@ +-ifndef(EMQX_BPAPI_HRL). +-define(EMQX_BPAPI_HRL, true). + +-compile({parse_transform, emqx_bpapi_trans}). + +-endif. diff --git a/apps/emqx_bpapi/priv/.gitkeep b/apps/emqx_bpapi/priv/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/apps/emqx_bpapi/src/emqx_bpapi.app.src b/apps/emqx_bpapi/src/emqx_bpapi.app.src new file mode 100644 index 000000000..45a3efc10 --- /dev/null +++ b/apps/emqx_bpapi/src/emqx_bpapi.app.src @@ -0,0 +1,15 @@ +{application, emqx_bpapi, + [{description, "A library for verifying safety of RPC calls"}, + {vsn, "0.1.0"}, + {registered, []}, + {applications, + [kernel, + stdlib, + typerefl %% Just for some metaprogramming utils + ]}, + {env,[]}, + {modules, []}, + + {licenses, ["Apache 2.0"]}, + {links, []} + ]}. diff --git a/apps/emqx_bpapi/src/emqx_bpapi.erl b/apps/emqx_bpapi/src/emqx_bpapi.erl new file mode 100644 index 000000000..284de4dfc --- /dev/null +++ b/apps/emqx_bpapi/src/emqx_bpapi.erl @@ -0,0 +1,54 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_bpapi). + +-export([parse_semver/1, api_and_version/1]). + +-export_type([var_name/0, call/0, rpc/0, bpapi_meta/0]). + +-type semver() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}. + +-type api() :: atom(). +-type api_version() :: non_neg_integer(). +-type var_name() :: atom(). +-type call() :: {module(), atom(), [var_name()]}. +-type rpc() :: {_From :: call(), _To :: call()}. + +-type bpapi_meta() :: + #{ api := api() + , version := api_version() + , calls := [rpc()] + , casts := [rpc()] + }. + +-spec parse_semver(string()) -> {ok, semver()} + | false. +parse_semver(Str) -> + Opts = [{capture, all_but_first, list}], + case re:run(Str, "^([0-9]+)\\.([0-9]+)\\.([0-9]+)$", Opts) of + {match, [A, B, C]} -> {ok, {list_to_integer(A), list_to_integer(B), list_to_integer(C)}}; + nomatch -> error + end. + +-spec api_and_version(module()) -> {atom(), non_neg_integer()}. +api_and_version(Module) -> + Opts = [{capture, all_but_first, list}], + case re:run(atom_to_list(Module), "(.*)_proto_v([0-9]+)$", Opts) of + {match, [API, VsnStr]} -> + {ok, list_to_atom(API), list_to_integer(VsnStr)}; + nomatch -> + error(Module) + end. diff --git a/apps/emqx_bpapi/src/emqx_bpapi_static_checks.erl b/apps/emqx_bpapi/src/emqx_bpapi_static_checks.erl new file mode 100644 index 000000000..de252b2b2 --- /dev/null +++ b/apps/emqx_bpapi/src/emqx_bpapi_static_checks.erl @@ -0,0 +1,147 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bpapi_static_checks). + +-export([run/1, run/0]). + +-include_lib("emqx/include/logger.hrl"). + +%% `emqx_bpapi:call' enriched with dialyzer spec +-type typed_call() :: {emqx_bpapi:call(), _DialyzerSpec}. +-type typed_rpc() :: {typed_call(), typed_call()}. + +-type fulldump() :: #{emqx_bpapi:api() => + #{emqx_bpapi:api_version() => + #{ calls := [typed_rpc()] + , casts := [typed_rpc()] + } + }}. + +%% Applications we wish to ignore in the analysis: +-define(IGNORED_APPS, "gen_rpc, recon, observer_cli, snabbkaffe, ekka, mria"). +%% List of known RPC backend modules: +-define(RPC_MODULES, "gen_rpc, erpc, rpc, emqx_rpc"). +%% List of functions in the RPC backend modules that we can ignore: +-define(IGNORED_RPC_CALLS, "gen_rpc:nodes/0"). + +-define(XREF, myxref). + +run() -> + case {filelib:wildcard("*_plt"), filelib:wildcard("_build/emqx*/lib")} of + {[PLT|_], [RelDir|_]} -> + run(#{plt => PLT, reldir => RelDir}); + _ -> + error("failed to guess run options") + end. + +-spec run(map()) -> boolean(). +run(Opts) -> + put(bpapi_ok, true), + PLT = prepare(Opts), + %% First we run XREF to find all callers of any known RPC backend: + Callers = find_remote_calls(Opts), + {BPAPICalls, NonBPAPICalls} = lists:partition(fun is_bpapi_call/1, Callers), + warn_nonbpapi_rpcs(NonBPAPICalls), + CombinedAPI = collect_bpapis(BPAPICalls, PLT), + dump_api(CombinedAPI), + erase(bpapi_ok). + +prepare(#{reldir := RelDir, plt := PLT}) -> + ?INFO("Starting xref...", []), + xref:start(?XREF), + filelib:wildcard(RelDir ++ "/*/ebin/") =:= [] andalso + error("No applications found in the release directory. Wrong directory?"), + xref:set_default(?XREF, [{warnings, false}]), + xref:add_release(?XREF, RelDir), + %% Now to the dialyzer stuff: + ?INFO("Loading PLT...", []), + dialyzer_plt:from_file(PLT). + +find_remote_calls(_Opts) -> + Query = "XC | (A - [" ?IGNORED_APPS "]:App) + || ([" ?RPC_MODULES "] : Mod - " ?IGNORED_RPC_CALLS ")", + {ok, Calls} = xref:q(?XREF, Query), + ?INFO("Calls to RPC modules ~p", [Calls]), + {Callers, _Callees} = lists:unzip(Calls), + Callers. + +-spec warn_nonbpapi_rpcs([mfa()]) -> ok. +warn_nonbpapi_rpcs([]) -> + ok; +warn_nonbpapi_rpcs(L) -> + setnok(), + lists:foreach(fun({M, F, A}) -> + ?ERROR("~p:~p/~p does a remote call outside of a dedicated " + "backplane API module. " + "It may break during rolling cluster upgrade", + [M, F, A]) + end, + L). + +-spec is_bpapi_call(mfa()) -> boolean(). +is_bpapi_call({Module, _Function, _Arity}) -> + case catch Module:bpapi_meta() of + #{api := _} -> true; + _ -> false + end. + +-spec dump_api(fulldump()) -> ok. +dump_api(Term) -> + Filename = filename:join(code:priv_dir(emqx_bpapi), emqx_app:get_release() ++ ".bpapi"), + file:write_file(Filename, io_lib:format("~0p.", [Term])). + +-spec collect_bpapis([mfa()], _PLT) -> fulldump(). +collect_bpapis(L, PLT) -> + Modules = lists:usort([M || {M, _F, _A} <- L]), + lists:foldl(fun(Mod, Acc) -> + #{ api := API + , version := Vsn + , calls := Calls0 + , casts := Casts0 + } = Mod:bpapi_meta(), + Calls = enrich(PLT, Calls0), + Casts = enrich(PLT, Casts0), + Acc#{API => #{Vsn => #{ calls => Calls + , casts => Casts + }}} + end, + #{}, + Modules + ). + +%% Add information about types from the PLT +-spec enrich(_PLT, [emqx_bpapi:rpc()]) -> [typed_rpc()]. +enrich(PLT, Calls) -> + [case {lookup_type(PLT, From), lookup_type(PLT, To)} of + {{value, TFrom}, {value, TTo}} -> + {{From, TFrom}, {To, TTo}}; + {_, none} -> + setnok(), + ?CRITICAL("Backplane API function ~s calls a missing remote function ~s", + [format_call(From), format_call(To)]), + error(missing_target) + end + || {From, To} <- Calls]. + +lookup_type(PLT, {M, F, A}) -> + dialyzer_plt:lookup(PLT, {M, F, length(A)}). + +format_call({M, F, A}) -> + io_lib:format("~p:~p/~p", [M, F, length(A)]). + +setnok() -> + put(bpapi_ok, false). diff --git a/apps/emqx_bpapi/src/emqx_bpapi_trans.erl b/apps/emqx_bpapi/src/emqx_bpapi_trans.erl new file mode 100644 index 000000000..bdcab83af --- /dev/null +++ b/apps/emqx_bpapi/src/emqx_bpapi_trans.erl @@ -0,0 +1,200 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @hidden This parse transform generates BPAPI metadata function for +%% a module, and helps dialyzer typechecking RPC calls +-module(emqx_bpapi_trans). + +-export([parse_transform/2, format_error/1]). + +%%-define(debug, true). + +-define(META_FUN, bpapi_meta). + +-type semantics() :: call | cast. + +-record(s, + { api :: atom() + , module :: atom() + , version :: non_neg_integer() | undefined + , introduced_in :: emqx_bpapi:semver() | undefined + , deprecated_since :: emqx_bpapi:semver() | undefined + , targets = [] :: [{semantics(), emqx_bpapi:call(), emqx_bpapi:call()}] + , errors = [] :: [string()] + , file + }). + +format_error(invalid_name) -> + "BPAPI module name should follow _proto_v pattern"; +format_error(invalid_introduced_in) -> + "-introduced_in attribute should be present and its value should be a semver string"; +format_error(invalid_deprecated_since) -> + "value of -deprecated_since attribute should be a semver string"; +format_error({invalid_fun, Name, Arity}) -> + io_lib:format("malformed function ~p/~p. " + "BPAPI functions should have exactly one clause " + "and call (emqx_|e)rpc at the top level", + [Name, Arity]). + +parse_transform(Forms, _Options) -> + log("Original:~n~p", [Forms]), + State = #s{file = File} = lists:foldl(fun go/2, #s{}, Forms), + log("parse_trans state: ~p", [State]), + case check(State) of + [] -> + finalize(Forms, State); + Errors -> + {error, [{File, [{Line, ?MODULE, Msg} || {Line, Msg} <- Errors]}], []} + end. + +%% Scan erl_forms: +go({attribute, _, file, {File, _}}, S) -> + S#s{file = File}; +go({attribute, Line, module, Mod}, S) -> + case emqx_bpapi:api_and_version(Mod) of + {ok, API, Vsn} -> S#s{api = API, version = Vsn, module = Mod}; + error -> push_err(Line, invalid_name, S) + end; +go({attribute, _Line, introduced_in, Str}, S) -> + case is_list(Str) andalso emqx_bpapi:parse_semver(Str) of + {ok, Vsn} -> S#s{introduced_in = Vsn}; + false -> S %% Don't report error here, it's done in check/1 + end; +go({attribute, Line, deprecated_since, Str}, S) -> + case is_list(Str) andalso emqx_bpapi:parse_semver(Str) of + {ok, Vsn} -> S#s{deprecated_since = Vsn}; + false -> push_err(Line, invalid_deprecated_since, S) + end; +go({function, Line, Name, Arity, Clauses}, S) -> + analyze_fun(Line, Name, Arity, Clauses, S); +go(_, S) -> + S. + +check(#s{errors = Err0, introduced_in = II}) -> + [{none, invalid_introduced_in} || II =:= undefined] ++ + Err0. + +finalize(Forms, S) -> + {Attrs, Funcs} = lists:splitwith(fun is_attribute/1, Forms), + AST = mk_meta_fun(S), + log("Meta fun:~n~p", [AST]), + Attrs ++ [mk_export()] ++ [AST|Funcs]. + +mk_meta_fun(#s{api = API, version = Vsn, targets = Targets}) -> + Line = 0, + Calls = [{From, To} || {call, From, To} <- Targets], + Casts = [{From, To} || {cast, From, To} <- Targets], + Ret = typerefl_quote:const(Line, #{ api => API + , version => Vsn + , calls => Calls + , casts => Casts + }), + {function, Line, ?META_FUN, _Arity = 0, + [{clause, Line, _Args = [], _Guards = [], + [Ret]}]}. + +mk_export() -> + {attribute, 0, export, [{?META_FUN, 0}]}. + +is_attribute({attribute, _Line, _Attr, _Val}) -> true; +is_attribute(_) -> false. + +%% Extract the target function of the RPC call +analyze_fun(Line, Name, Arity, [{clause, Line, Head, _Guards, Exprs}], S) -> + analyze_exprs(Line, Name, Arity, Head, Exprs, S); +analyze_fun(Line, Name, Arity, _Clauses, S) -> + invalid_fun(Line, Name, Arity, S). + +analyze_exprs(Line, Name, Arity, Head, Exprs, S) -> + log("~p/~p (~p):~n~p", [Name, Arity, Head, Exprs]), + try + [{call, _, CallToBackend, CallArgs}] = Exprs, + OuterArgs = extract_outer_args(Head), + Key = {S#s.module, Name, OuterArgs}, + {Semantics, Target} = extract_target_call(CallToBackend, CallArgs), + push_target({Semantics, Key, Target}, S) + catch + _:Err:Stack -> + log("Failed to process function call:~n~s~nStack: ~p", [Err, Stack]), + invalid_fun(Line, Name, Arity, S) + end. + +-spec extract_outer_args(erl_parse:abstract_form()) -> [atom()]. +extract_outer_args(Abs) -> + lists:map(fun({var, _, Var}) -> + Var; + ({match, _, {var, _, Var}, _}) -> + Var; + ({match, _, _, {var, _, Var}}) -> + Var + end, + Abs). + +-spec extract_target_call(Abs, [Abs]) -> {semantics(), emqx_bpapi:call()} + when Abs :: erl_parse:abstract_form(). +extract_target_call(RPCBackend, OuterArgs) -> + {Semantics, {atom, _, M}, {atom, _, F}, A} = extract_mfa(RPCBackend, OuterArgs), + {Semantics, {M, F, list_to_args(A)}}. + +-define(BACKEND(MOD, FUN), {remote, _, {atom, _, MOD}, {atom, _, FUN}}). +-define(IS_RPC(MOD), (MOD =:= erpc orelse MOD =:= rpc)). + +-spec extract_mfa(Abs, #s{}) -> {call | cast, Abs, Abs, Abs} + when Abs :: erl_parse:abstract_form(). +%% gen_rpc: +extract_mfa(?BACKEND(gen_rpc, _), _) -> + %% gen_rpc has an extremely messy API, thankfully it's fully wrapped + %% by emqx_rpc, so we simply forbid direct calls to it: + error("direct call to gen_rpc"); +%% emqx_rpc: +extract_mfa(?BACKEND(emqx_rpc, CallOrCast), [_Node, M, F, A]) -> + {call_or_cast(CallOrCast), M, F, A}; +extract_mfa(?BACKEND(emqx_rpc, CallOrCast), [_Tag, _Node, M, F, A]) -> + {call_or_cast(CallOrCast), M, F, A}; +%% (e)rpc: +extract_mfa(?BACKEND(RPC, CallOrCast), [_Node, M, F, A]) when ?IS_RPC(RPC) -> + {call_or_cast(CallOrCast), M, F, A}; +extract_mfa(?BACKEND(RPC, CallOrCast), [_Node, M, F, A, _Timeout]) when ?IS_RPC(RPC) -> + {call_or_cast(CallOrCast), M, F, A}; +extract_mfa(_, _) -> + error("unrecognized RPC call"). + +call_or_cast(cast) -> cast; +call_or_cast(multicast) -> cast; +call_or_cast(multicall) -> call; +call_or_cast(call) -> call. + +list_to_args({cons, _, {var, _, A}, T}) -> + [A|list_to_args(T)]; +list_to_args({nil, _}) -> + []. + +invalid_fun(Line, Name, Arity, S) -> + push_err(Line, {invalid_fun, Name, Arity}, S). + +push_err(Line, Err, S = #s{errors = Errs}) -> + S#s{errors = [{Line, Err}|Errs]}. + +push_target(Target, S = #s{targets = Targets}) -> + S#s{targets = [Target|Targets]}. + +-ifdef(debug). +log(Fmt, Args) -> + io:format(user, "!! " ++ Fmt ++ "~n", Args). +-else. +log(_, _) -> + ok. +-endif.