diff --git a/apps/emqx/include/bpapi.hrl b/apps/emqx/include/bpapi.hrl new file mode 100644 index 000000000..b4e52d50f --- /dev/null +++ b/apps/emqx/include/bpapi.hrl @@ -0,0 +1,6 @@ +-ifndef(EMQX_BPAPI_HRL). +-define(EMQX_BPAPI_HRL, true). + +-compile({parse_transform, emqx_bpapi_trans}). + +-endif. diff --git a/apps/emqx/priv/.gitkeep b/apps/emqx/priv/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index f573ddc87..4ec68d585 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -16,7 +16,7 @@ , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.2"}}} - , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} + , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.0"}}} , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.22.2"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}} diff --git a/apps/emqx/src/bpapi/emqx_bpapi.erl b/apps/emqx/src/bpapi/emqx_bpapi.erl new file mode 100644 index 000000000..d99f0e53b --- /dev/null +++ b/apps/emqx/src/bpapi/emqx_bpapi.erl @@ -0,0 +1,39 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_bpapi). + +-export_type([api/0, api_version/0, var_name/0, call/0, rpc/0, bpapi_meta/0]). + +-type api() :: atom(). +-type api_version() :: non_neg_integer(). +-type var_name() :: atom(). +-type call() :: {module(), atom(), [var_name()]}. +-type rpc() :: {_From :: call(), _To :: call()}. + +-type bpapi_meta() :: + #{ api := api() + , version := api_version() + , calls := [rpc()] + , casts := [rpc()] + }. + +-callback introduced_in() -> string(). + +-callback deprecated_since() -> string(). + +-callback bpapi_meta() -> bpapi_meta(). + +-optional_callbacks([deprecated_since/0]). diff --git a/apps/emqx/src/bpapi/emqx_bpapi_static_checks.erl b/apps/emqx/src/bpapi/emqx_bpapi_static_checks.erl new file mode 100644 index 000000000..5023e9b9f --- /dev/null +++ b/apps/emqx/src/bpapi/emqx_bpapi_static_checks.erl @@ -0,0 +1,263 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bpapi_static_checks). + +-export([dump/1, dump/0, check_compat/1]). + +-include_lib("emqx/include/logger.hrl"). + +-type api_dump() :: #{{emqx_bpapi:api(), emqx_bpapi:api_version()} => + #{ calls := [emqx_bpapi:rpc()] + , casts := [emqx_bpapi:rpc()] + }}. + +-type dialyzer_spec() :: {_Type, [_Type]}. + +-type dialyzer_dump() :: #{mfa() => dialyzer_spec()}. + +-type fulldump() :: #{ api => api_dump() + , signatures => dialyzer_dump() + , release => string() + }. + +-type param_types() :: #{emqx_bpapi:var_name() => _Type}. + +%% Applications we wish to ignore in the analysis: +-define(IGNORED_APPS, "gen_rpc, recon, observer_cli, snabbkaffe, ekka, mria"). +%% List of known RPC backend modules: +-define(RPC_MODULES, "gen_rpc, erpc, rpc, emqx_rpc"). +%% List of functions in the RPC backend modules that we can ignore: +-define(IGNORED_RPC_CALLS, "gen_rpc:nodes/0"). + +-define(XREF, myxref). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Functions related to BPAPI compatibility checking +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-spec check_compat([file:filename()]) -> boolean(). +check_compat(DumpFilenames) -> + put(bpapi_ok, true), + Dumps = lists:map(fun(FN) -> + {ok, [Dump]} = file:consult(FN), + Dump + end, + DumpFilenames), + [check_compat(I, J) || I <- Dumps, J <- Dumps], + erase(bpapi_ok). + +%% Note: sets nok flag +-spec check_compat(fulldump(), fulldump()) -> ok. +check_compat(Dump1, Dump2) -> + check_api_immutability(Dump1, Dump2), + typecheck_apis(Dump1, Dump2). + +%% It's not allowed to change BPAPI modules. Check that no changes +%% have been made. (sets nok flag) +-spec check_api_immutability(fulldump(), fulldump()) -> ok. +check_api_immutability(#{release := Rel1, api := APIs1}, #{release := Rel2, api := APIs2}) + when Rel2 >= Rel1 -> + %% TODO: Handle API deprecation + _ = maps:map( + fun(Key = {API, Version}, Val) -> + case maps:get(Key, APIs2, undefined) of + Val -> + ok; + undefined -> + setnok(), + ?ERROR("API ~p v~p was removed in release ~p without being deprecated.", + [API, Version, Rel2]); + _Val -> + setnok(), + ?ERROR("API ~p v~p was changed between ~p and ~p. Backplane API should be immutable.", + [API, Version, Rel1, Rel2]) + end + end, + APIs1), + ok; +check_api_immutability(_, _) -> + ok. + +%% Note: sets nok flag +-spec typecheck_apis(fulldump(), fulldump()) -> ok. +typecheck_apis( #{release := CallerRelease, api := CallerAPIs, signatures := CallerSigs} + , #{release := CalleeRelease, signatures := CalleeSigs} + ) -> + AllCalls = lists:flatten([[Calls, Casts] + || #{calls := Calls, casts := Casts} <- maps:values(CallerAPIs)]), + lists:foreach(fun({From, To}) -> + Caller = get_param_types(CallerSigs, From), + Callee = get_param_types(CalleeSigs, To), + %% TODO: check return types + case typecheck_rpc(Caller, Callee) of + [] -> + ok; + TypeErrors -> + setnok(), + [?ERROR("Incompatible RPC call: " + "type of the parameter ~p of RPC call ~s on release ~p " + "is not a subtype of the target function ~s on release ~p", + [Var, format_call(From), CallerRelease, + format_call(To), CalleeRelease]) + || Var <- TypeErrors] + end + end, + AllCalls). + +-spec typecheck_rpc(param_types(), param_types()) -> [emqx_bpapi:var_name()]. +typecheck_rpc(Caller, Callee) -> + maps:fold(fun(Var, CalleeType, Acc) -> + #{Var := CallerType} = Caller, + case erl_types:t_is_subtype(CallerType, CalleeType) of + true -> Acc; + false -> [Var|Acc] + end + end, + [], + Callee). + +-spec get_param_types(dialyzer_dump(), emqx_bpapi:call()) -> param_types(). +get_param_types(Signatures, {M, F, A}) -> + Arity = length(A), + #{{M, F, Arity} := {_RetType, AttrTypes}} = Signatures, + Arity = length(AttrTypes), % assert + maps:from_list(lists:zip(A, AttrTypes)). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Functions related to BPAPI dumping +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +dump() -> + case {filelib:wildcard("*_plt"), filelib:wildcard("_build/emqx*/lib")} of + {[PLT|_], [RelDir|_]} -> + dump(#{plt => PLT, reldir => RelDir}); + _ -> + error("failed to guess run options") + end. + +%% Collect the local BPAPI modules to a dump file +-spec dump(map()) -> boolean(). +dump(Opts) -> + put(bpapi_ok, true), + PLT = prepare(Opts), + %% First we run XREF to find all callers of any known RPC backend: + Callers = find_remote_calls(Opts), + {BPAPICalls, NonBPAPICalls} = lists:partition(fun is_bpapi_call/1, Callers), + warn_nonbpapi_rpcs(NonBPAPICalls), + APIDump = collect_bpapis(BPAPICalls), + DialyzerDump = collect_signatures(PLT, APIDump), + Release = emqx_app:get_release(), + dump_api(#{api => APIDump, signatures => DialyzerDump, release => Release}), + erase(bpapi_ok). + +prepare(#{reldir := RelDir, plt := PLT}) -> + ?INFO("Starting xref...", []), + xref:start(?XREF), + filelib:wildcard(RelDir ++ "/*/ebin/") =:= [] andalso + error("No applications found in the release directory. Wrong directory?"), + xref:set_default(?XREF, [{warnings, false}]), + xref:add_release(?XREF, RelDir), + %% Now to the dialyzer stuff: + ?INFO("Loading PLT...", []), + dialyzer_plt:from_file(PLT). + +find_remote_calls(_Opts) -> + Query = "XC | (A - [" ?IGNORED_APPS "]:App) + || ([" ?RPC_MODULES "] : Mod - " ?IGNORED_RPC_CALLS ")", + {ok, Calls} = xref:q(?XREF, Query), + ?INFO("Calls to RPC modules ~p", [Calls]), + {Callers, _Callees} = lists:unzip(Calls), + Callers. + +-spec warn_nonbpapi_rpcs([mfa()]) -> ok. +warn_nonbpapi_rpcs([]) -> + ok; +warn_nonbpapi_rpcs(L) -> + setnok(), + lists:foreach(fun({M, F, A}) -> + ?ERROR("~p:~p/~p does a remote call outside of a dedicated " + "backplane API module. " + "It may break during rolling cluster upgrade", + [M, F, A]) + end, + L). + +-spec is_bpapi_call(mfa()) -> boolean(). +is_bpapi_call({Module, _Function, _Arity}) -> + case catch Module:bpapi_meta() of + #{api := _} -> true; + _ -> false + end. + +-spec dump_api(fulldump()) -> ok. +dump_api(Term = #{api := _, signatures := _, release := Release}) -> + Filename = filename:join(code:priv_dir(emqx), Release ++ ".bpapi"), + file:write_file(Filename, io_lib:format("~0p.", [Term])). + +-spec collect_bpapis([mfa()]) -> api_dump(). +collect_bpapis(L) -> + Modules = lists:usort([M || {M, _F, _A} <- L]), + lists:foldl(fun(Mod, Acc) -> + #{ api := API + , version := Vsn + , calls := Calls + , casts := Casts + } = Mod:bpapi_meta(), + Acc#{{API, Vsn} => #{ calls => Calls + , casts => Casts + }} + end, + #{}, + Modules). + +-spec collect_signatures(_PLT, api_dump()) -> dialyzer_dump(). +collect_signatures(PLT, APIs) -> + maps:fold(fun(_APIAndVersion, #{calls := Calls, casts := Casts}, Acc0) -> + Acc1 = lists:foldl(fun enrich/2, {Acc0, PLT}, Calls), + {Acc, PLT} = lists:foldl(fun enrich/2, Acc1, Casts), + Acc + end, + #{}, + APIs). + +%% Add information about the call types from the PLT +-spec enrich(emqx_bpapi:rpc(), {dialyzer_dump(), _PLT}) -> {dialyzer_dump(), _PLT}. +enrich({From0, To0}, {Acc0, PLT}) -> + From = call_to_mfa(From0), + To = call_to_mfa(To0), + case {dialyzer_plt:lookup(PLT, From), dialyzer_plt:lookup(PLT, To)} of + {{value, TFrom}, {value, TTo}} -> + Acc = Acc0#{ From => TFrom + , To => TTo + }, + {Acc, PLT}; + {{value, _}, none} -> + setnok(), + ?CRITICAL("Backplane API function ~s calls a missing remote function ~s", + [format_call(From0), format_call(To0)]), + error(missing_target) + end. + +-spec call_to_mfa(emqx_bpapi:call()) -> mfa(). +call_to_mfa({M, F, A}) -> + {M, F, length(A)}. + +format_call({M, F, A}) -> + io_lib:format("~p:~p/~p", [M, F, length(A)]). + +setnok() -> + put(bpapi_ok, false). diff --git a/apps/emqx/src/bpapi/emqx_bpapi_trans.erl b/apps/emqx/src/bpapi/emqx_bpapi_trans.erl new file mode 100644 index 000000000..f39215069 --- /dev/null +++ b/apps/emqx/src/bpapi/emqx_bpapi_trans.erl @@ -0,0 +1,195 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @hidden This parse transform generates BPAPI metadata function for +%% a module, and helps dialyzer typechecking RPC calls +-module(emqx_bpapi_trans). + +-export([parse_transform/2, format_error/1]). + +%%-define(debug, true). + +-define(META_FUN, bpapi_meta). + +-type semantics() :: call | cast. + +-record(s, + { api :: emqx_bpapi:api() + , module :: module() + , version :: emqx_bpapi:api_version() | undefined + , targets = [] :: [{semantics(), emqx_bpapi:call(), emqx_bpapi:call()}] + , errors = [] :: list() + , file + }). + +format_error(invalid_name) -> + "BPAPI module name should follow _proto_v pattern"; +format_error({invalid_fun, Name, Arity}) -> + io_lib:format("malformed function ~p/~p. " + "BPAPI functions should have exactly one clause " + "and call (emqx_|e)rpc at the top level", + [Name, Arity]). + +parse_transform(Forms, _Options) -> + log("Original:~n~p", [Forms]), + State = #s{file = File} = lists:foldl(fun go/2, #s{}, Forms), + log("parse_trans state: ~p", [State]), + case check(State) of + [] -> + finalize(Forms, State); + Errors -> + {error, [{File, [{Line, ?MODULE, Msg} || {Line, Msg} <- Errors]}], []} + end. + +%% Scan erl_forms: +go({attribute, _, file, {File, _}}, S) -> + S#s{file = File}; +go({attribute, Line, module, Mod}, S) -> + case api_and_version(Mod) of + {ok, API, Vsn} -> S#s{api = API, version = Vsn, module = Mod}; + error -> push_err(Line, invalid_name, S) + end; +go({function, _Line, introduced_in, 0, _}, S) -> + S; +go({function, _Line, deprecated_since, 0, _}, S) -> + S; +go({function, Line, Name, Arity, Clauses}, S) -> + analyze_fun(Line, Name, Arity, Clauses, S); +go(_, S) -> + S. + +check(#s{errors = Err}) -> + %% Post-processing checks can be placed here + Err. + +finalize(Forms, S) -> + {Attrs, Funcs} = lists:splitwith(fun is_attribute/1, Forms), + AST = mk_meta_fun(S), + log("Meta fun:~n~p", [AST]), + Attrs ++ [mk_export()] ++ [AST|Funcs]. + +mk_meta_fun(#s{api = API, version = Vsn, targets = Targets}) -> + Line = 0, + Calls = [{From, To} || {call, From, To} <- Targets], + Casts = [{From, To} || {cast, From, To} <- Targets], + Ret = typerefl_quote:const(Line, #{ api => API + , version => Vsn + , calls => Calls + , casts => Casts + }), + {function, Line, ?META_FUN, _Arity = 0, + [{clause, Line, _Args = [], _Guards = [], + [Ret]}]}. + +mk_export() -> + {attribute, 0, export, [{?META_FUN, 0}]}. + +is_attribute({attribute, _Line, _Attr, _Val}) -> true; +is_attribute(_) -> false. + +%% Extract the target function of the RPC call +analyze_fun(Line, Name, Arity, [{clause, Line, Head, _Guards, Exprs}], S) -> + analyze_exprs(Line, Name, Arity, Head, Exprs, S); +analyze_fun(Line, Name, Arity, _Clauses, S) -> + invalid_fun(Line, Name, Arity, S). + +analyze_exprs(Line, Name, Arity, Head, Exprs, S) -> + log("~p/~p (~p):~n~p", [Name, Arity, Head, Exprs]), + try + [{call, _, CallToBackend, CallArgs}] = Exprs, + OuterArgs = extract_outer_args(Head), + Key = {S#s.module, Name, OuterArgs}, + {Semantics, Target} = extract_target_call(CallToBackend, CallArgs), + push_target({Semantics, Key, Target}, S) + catch + _:Err:Stack -> + log("Failed to process function call:~n~s~nStack: ~p", [Err, Stack]), + invalid_fun(Line, Name, Arity, S) + end. + +-spec extract_outer_args([erl_parse:abstract_form()]) -> [atom()]. +extract_outer_args(Abs) -> + lists:map(fun({var, _, Var}) -> + Var; + ({match, _, {var, _, Var}, _}) -> + Var; + ({match, _, _, {var, _, Var}}) -> + Var + end, + Abs). + +-spec extract_target_call(_AST, [_AST]) -> {semantics(), emqx_bpapi:call()}. +extract_target_call(RPCBackend, OuterArgs) -> + {Semantics, {atom, _, M}, {atom, _, F}, A} = extract_mfa(RPCBackend, OuterArgs), + {Semantics, {M, F, list_to_args(A)}}. + +-define(BACKEND(MOD, FUN), {remote, _, {atom, _, MOD}, {atom, _, FUN}}). +-define(IS_RPC(MOD), (MOD =:= erpc orelse MOD =:= rpc)). + +%% gen_rpc: +extract_mfa(?BACKEND(gen_rpc, _), _) -> + %% gen_rpc has an extremely messy API, thankfully it's fully wrapped + %% by emqx_rpc, so we simply forbid direct calls to it: + error("direct call to gen_rpc"); +%% emqx_rpc: +extract_mfa(?BACKEND(emqx_rpc, CallOrCast), [_Node, M, F, A]) -> + {call_or_cast(CallOrCast), M, F, A}; +extract_mfa(?BACKEND(emqx_rpc, CallOrCast), [_Tag, _Node, M, F, A]) -> + {call_or_cast(CallOrCast), M, F, A}; +%% (e)rpc: +extract_mfa(?BACKEND(RPC, CallOrCast), [_Node, M, F, A]) when ?IS_RPC(RPC) -> + {call_or_cast(CallOrCast), M, F, A}; +extract_mfa(?BACKEND(RPC, CallOrCast), [_Node, M, F, A, _Timeout]) when ?IS_RPC(RPC) -> + {call_or_cast(CallOrCast), M, F, A}; +extract_mfa(_, _) -> + error("unrecognized RPC call"). + +call_or_cast(cast) -> cast; +call_or_cast(multicast) -> cast; +call_or_cast(multicall) -> call; +call_or_cast(call) -> call. + +list_to_args({cons, _, {var, _, A}, T}) -> + [A|list_to_args(T)]; +list_to_args({nil, _}) -> + []. + +invalid_fun(Line, Name, Arity, S) -> + push_err(Line, {invalid_fun, Name, Arity}, S). + +push_err(Line, Err, S = #s{errors = Errs}) -> + S#s{errors = [{Line, Err}|Errs]}. + +push_target(Target, S = #s{targets = Targets}) -> + S#s{targets = [Target|Targets]}. + +-spec api_and_version(module()) -> {ok, emqx_bpapi:api(), emqx_bpapi:version()} | error. +api_and_version(Module) -> + Opts = [{capture, all_but_first, list}], + case re:run(atom_to_list(Module), "(.*)_proto_v([0-9]+)$", Opts) of + {match, [API, VsnStr]} -> + {ok, list_to_atom(API), list_to_integer(VsnStr)}; + nomatch -> + error + end. + +-ifdef(debug). +log(Fmt, Args) -> + io:format(user, "!! " ++ Fmt ++ "~n", Args). +-else. +log(_, _) -> + ok. +-endif. diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 9dbfb0b43..bdb6acd4f 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -300,7 +300,7 @@ forward(Node, To, Delivery, sync) -> end. -spec(dispatch(emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()). -dispatch(Topic, Delivery) -> +dispatch(Topic, Delivery = #delivery{}) when is_binary(Topic) -> case emqx:is_running() of true -> do_dispatch(Topic, Delivery); diff --git a/apps/emqx/src/emqx_rpc.erl b/apps/emqx/src/emqx_rpc.erl index 527123745..ad7e9891f 100644 --- a/apps/emqx/src/emqx_rpc.erl +++ b/apps/emqx/src/emqx_rpc.erl @@ -14,9 +14,11 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc wrap gen_rpc? -module(emqx_rpc). +%% Note: please don't forget to add new API functions to +%% `emqx_bpapi_trans:extract_mfa' + -export([ call/4 , call/5 , cast/4 @@ -30,27 +32,25 @@ , rpc_nodes/1 ]}). --define(RPC, gen_rpc). - -define(DefaultClientNum, 1). call(Node, Mod, Fun, Args) -> - filter_result(?RPC:call(rpc_node(Node), Mod, Fun, Args)). + filter_result(gen_rpc:call(rpc_node(Node), Mod, Fun, Args)). call(Key, Node, Mod, Fun, Args) -> - filter_result(?RPC:call(rpc_node({Key, Node}), Mod, Fun, Args)). + filter_result(gen_rpc:call(rpc_node({Key, Node}), Mod, Fun, Args)). multicall(Nodes, Mod, Fun, Args) -> - filter_result(?RPC:multicall(rpc_nodes(Nodes), Mod, Fun, Args)). + filter_result(gen_rpc:multicall(rpc_nodes(Nodes), Mod, Fun, Args)). multicall(Key, Nodes, Mod, Fun, Args) -> - filter_result(?RPC:multicall(rpc_nodes([{Key, Node} || Node <- Nodes]), Mod, Fun, Args)). + filter_result(gen_rpc:multicall(rpc_nodes([{Key, Node} || Node <- Nodes]), Mod, Fun, Args)). cast(Node, Mod, Fun, Args) -> - filter_result(?RPC:cast(rpc_node(Node), Mod, Fun, Args)). + filter_result(gen_rpc:cast(rpc_node(Node), Mod, Fun, Args)). cast(Key, Node, Mod, Fun, Args) -> - filter_result(?RPC:cast(rpc_node({Key, Node}), Mod, Fun, Args)). + filter_result(gen_rpc:cast(rpc_node({Key, Node}), Mod, Fun, Args)). rpc_node(Node) when is_atom(Node) -> {Node, rand:uniform(max_client_num())}; diff --git a/apps/emqx/src/proto/emqx_broker_proto_v1.erl b/apps/emqx/src/proto/emqx_broker_proto_v1.erl new file mode 100644 index 000000000..d5725b8ac --- /dev/null +++ b/apps/emqx/src/proto/emqx_broker_proto_v1.erl @@ -0,0 +1,38 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_broker_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ introduced_in/0 + , forward/3 + , forward_async/3 + ]). + +-include("bpapi.hrl"). +-include("emqx.hrl"). + +introduced_in() -> + "5.0.0". + +-spec forward(node(), emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result(). +forward(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) -> + emqx_rpc:call(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]). + +-spec forward_async(node(), emqx_types:topic(), emqx_types:delivery()) -> ok. +forward_async(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) -> + emqx_rpc:cast(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]). diff --git a/mix.exs b/mix.exs index b4b9ef701..ca66f0c6d 100644 --- a/mix.exs +++ b/mix.exs @@ -54,7 +54,7 @@ defmodule EMQXUmbrella.MixProject do {:esockd, github: "emqx/esockd", tag: "5.9.0", override: true}, {:mria, github: "emqx/mria", tag: "0.1.5", override: true}, {:ekka, github: "emqx/ekka", tag: "0.11.2", override: true}, - {:gen_rpc, github: "emqx/gen_rpc", tag: "2.5.1", override: true}, + {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.0", override: true}, {:minirest, github: "emqx/minirest", tag: "1.2.9", override: true}, {:ecpool, github: "emqx/ecpool", tag: "0.5.2"}, {:replayq, "0.3.3", override: true}, diff --git a/rebar.config b/rebar.config index 17d4a0ce3..49e37e4af 100644 --- a/rebar.config +++ b/rebar.config @@ -55,7 +55,7 @@ , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}} , {mria, {git, "https://github.com/emqx/mria", {tag, "0.1.5"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.2"}}} - , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} + , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.0"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.9"}}} , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}} , {replayq, "0.3.3"}