feat(bpapi): Version negotiation
This commit is contained in:
parent
2522a36b0c
commit
bdc6d18589
2
Makefile
2
Makefile
|
@ -57,7 +57,7 @@ ct: $(REBAR) conf-segs
|
||||||
|
|
||||||
.PHONY: static_checks
|
.PHONY: static_checks
|
||||||
static_checks:
|
static_checks:
|
||||||
@$(REBAR) as check do xref, dialyzer, ct --suite apps/emqx/test/emqx_bpapi_suite --readable false
|
@$(REBAR) as check do xref, dialyzer, ct --suite apps/emqx/test/emqx_static_checks --readable false
|
||||||
|
|
||||||
APPS=$(shell $(CURDIR)/scripts/find-apps.sh)
|
APPS=$(shell $(CURDIR)/scripts/find-apps.sh)
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,17 @@
|
||||||
|
{emqx,1}.
|
||||||
|
{emqx_bridge,1}.
|
||||||
|
{emqx_broker,1}.
|
||||||
|
{emqx_cm,1}.
|
||||||
|
{emqx_conf,1}.
|
||||||
|
{emqx_dashboard,1}.
|
||||||
|
{emqx_exhook,1}.
|
||||||
|
{emqx_gateway_cm,1}.
|
||||||
|
{emqx_management,1}.
|
||||||
|
{emqx_mgmt_trace,1}.
|
||||||
|
{emqx_persistent_session,1}.
|
||||||
|
{emqx_plugin_libs,1}.
|
||||||
|
{emqx_prometheus,1}.
|
||||||
|
{emqx_resource,1}.
|
||||||
|
{emqx_statsd,1}.
|
||||||
|
{emqx_telemetry,1}.
|
||||||
|
{emqx_topic_metrics,1}.
|
|
@ -91,11 +91,11 @@ is_running(Node) ->
|
||||||
rpc:call(Node, emqx, is_running, []).
|
rpc:call(Node, emqx, is_running, []).
|
||||||
```
|
```
|
||||||
|
|
||||||
The following limitations apply to these modules:
|
## Backplane module life cycle
|
||||||
|
|
||||||
1. Once the minor EMQX release stated in `introduced_in()` callback of
|
1. Once the minor EMQX release stated in `introduced_in()` callback of
|
||||||
a module reaches GA, the module is frozen. No changes are allowed
|
a module reaches GA, the module is frozen. Only very specific
|
||||||
there, except for adding `deprecated_since()` callback.
|
changes are allowed in these modules, see next chapter.
|
||||||
2. If the backplane API was deprecated in a release `maj.min.0`, then
|
2. If the backplane API was deprecated in a release `maj.min.0`, then
|
||||||
it can be removed in release `maj.min+1.0`.
|
it can be removed in release `maj.min+1.0`.
|
||||||
3. Old versions of the protocols can be dropped in the next major
|
3. Old versions of the protocols can be dropped in the next major
|
||||||
|
@ -104,6 +104,41 @@ The following limitations apply to these modules:
|
||||||
This way we ensure each minor EMQX release is backward-compatible with
|
This way we ensure each minor EMQX release is backward-compatible with
|
||||||
the previous one.
|
the previous one.
|
||||||
|
|
||||||
|
## Changes to BPAPI modules after GA
|
||||||
|
|
||||||
|
Once the backplane API module is frozen, only certain types of changes
|
||||||
|
can be made there.
|
||||||
|
|
||||||
|
- Adding or removing functions is _forbidden_
|
||||||
|
- Changing the RPC target function is _forbidden_
|
||||||
|
- Renaming the function parameters should be safe in theory, but
|
||||||
|
currently the static check will complain when it happens
|
||||||
|
- Renaming the types of the function parameters and the return type is
|
||||||
|
_allowed_
|
||||||
|
- Changing the structure of the function parameters' types is
|
||||||
|
_forbidden_
|
||||||
|
|
||||||
|
To clarify the last statement: BPAPI static checks only verify the
|
||||||
|
structure of the type, so the following definitions are considered
|
||||||
|
equivalent, and replacing one with another is perfectly fine:
|
||||||
|
|
||||||
|
```erlang
|
||||||
|
-type foo() :: inet:ip6_address().
|
||||||
|
|
||||||
|
-type foo() :: {0..65535, 0..65535, 0..65535, 0..65535, 0..65535, 0..65535, 0..65535, 0..65535}.
|
||||||
|
```
|
||||||
|
|
||||||
# Protocol version negotiation
|
# Protocol version negotiation
|
||||||
|
|
||||||
TODO
|
`emqx_bpapi` module provides APIs that business applications can use
|
||||||
|
to negotiate protocol version:
|
||||||
|
|
||||||
|
`emqx_bpapi:supported_version(Node, ProtocolId)` returns maximum
|
||||||
|
protocol version supported by the remote node
|
||||||
|
`Node`. `emqx_bpapi:supported_version(ProtocolId)` returns maximum
|
||||||
|
protocol version that is supported by all nodes in the cluster. It can
|
||||||
|
be useful when the protocol involves multicalls or multicasts.
|
||||||
|
|
||||||
|
The business logic can assume that the supported protocol version is
|
||||||
|
not going to change on the remote node, while it is running. So it is
|
||||||
|
free to cache it for the duration of the session.
|
||||||
|
|
|
@ -15,8 +15,15 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-module(emqx_bpapi).
|
-module(emqx_bpapi).
|
||||||
|
|
||||||
|
%% API:
|
||||||
|
-export([start/0, announce/1, supported_version/1, supported_version/2,
|
||||||
|
versions_file/1]).
|
||||||
|
|
||||||
-export_type([api/0, api_version/0, var_name/0, call/0, rpc/0, bpapi_meta/0]).
|
-export_type([api/0, api_version/0, var_name/0, call/0, rpc/0, bpapi_meta/0]).
|
||||||
|
|
||||||
|
-include("emqx.hrl").
|
||||||
|
-include_lib("stdlib/include/ms_transform.hrl").
|
||||||
|
|
||||||
-type api() :: atom().
|
-type api() :: atom().
|
||||||
-type api_version() :: non_neg_integer().
|
-type api_version() :: non_neg_integer().
|
||||||
-type var_name() :: atom().
|
-type var_name() :: atom().
|
||||||
|
@ -30,6 +37,8 @@
|
||||||
, casts := [rpc()]
|
, casts := [rpc()]
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
-include("emqx_bpapi.hrl").
|
||||||
|
|
||||||
-callback introduced_in() -> string().
|
-callback introduced_in() -> string().
|
||||||
|
|
||||||
-callback deprecated_since() -> string().
|
-callback deprecated_since() -> string().
|
||||||
|
@ -37,3 +46,65 @@
|
||||||
-callback bpapi_meta() -> bpapi_meta().
|
-callback bpapi_meta() -> bpapi_meta().
|
||||||
|
|
||||||
-optional_callbacks([deprecated_since/0]).
|
-optional_callbacks([deprecated_since/0]).
|
||||||
|
|
||||||
|
-spec start() -> ok.
|
||||||
|
start() ->
|
||||||
|
ok = mria:create_table(?TAB, [ {type, set}
|
||||||
|
, {storage, ram_copies}
|
||||||
|
, {attributes, record_info(fields, ?TAB)}
|
||||||
|
, {rlog_shard, ?COMMON_SHARD}
|
||||||
|
]),
|
||||||
|
ok = mria:wait_for_tables([?TAB]),
|
||||||
|
announce(emqx).
|
||||||
|
|
||||||
|
%% @doc Get maximum version of the backplane API supported by the node
|
||||||
|
-spec supported_version(node(), api()) -> api_version().
|
||||||
|
supported_version(Node, API) ->
|
||||||
|
ets:lookup_element(?TAB, {Node, API}, #?TAB.version).
|
||||||
|
|
||||||
|
%% @doc Get maximum version of the backplane API supported by the
|
||||||
|
%% entire cluster
|
||||||
|
-spec supported_version(api()) -> api_version().
|
||||||
|
supported_version(API) ->
|
||||||
|
ets:lookup_element(?TAB, {?multicall, API}, #?TAB.version).
|
||||||
|
|
||||||
|
-spec announce(atom()) -> ok.
|
||||||
|
announce(App) ->
|
||||||
|
{ok, Data} = file:consult(?MODULE:versions_file(App)),
|
||||||
|
{atomic, ok} = mria:transaction(?COMMON_SHARD, fun announce_fun/1, [Data]),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
-spec versions_file(atom()) -> file:filename_all().
|
||||||
|
versions_file(App) ->
|
||||||
|
filename:join(code:priv_dir(App), "bpapi.versions").
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Internal functions
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec announce_fun([{api(), api_version()}]) -> ok.
|
||||||
|
announce_fun(Data) ->
|
||||||
|
%% Delete old records, if present:
|
||||||
|
MS = ets:fun2ms(fun(#?TAB{key = {node(), API}}) ->
|
||||||
|
{node(), API}
|
||||||
|
end),
|
||||||
|
OldKeys = mnesia:select(?TAB, MS, write),
|
||||||
|
_ = [mnesia:delete({?TAB, Key})
|
||||||
|
|| Key <- OldKeys],
|
||||||
|
%% Insert new records:
|
||||||
|
_ = [mnesia:write(#?TAB{key = {node(), API}, version = Version})
|
||||||
|
|| {API, Version} <- Data],
|
||||||
|
%% Update maximum supported version:
|
||||||
|
[update_minimum(API) || {API, _} <- Data],
|
||||||
|
ok.
|
||||||
|
|
||||||
|
-spec update_minimum(api()) -> ok.
|
||||||
|
update_minimum(API) ->
|
||||||
|
MS = ets:fun2ms(fun(#?TAB{ key = {N, A}
|
||||||
|
, version = Value
|
||||||
|
}) when N =/= ?multicall,
|
||||||
|
A =:= API ->
|
||||||
|
Value
|
||||||
|
end),
|
||||||
|
MinVersion = lists:min(mnesia:select(?TAB, MS)),
|
||||||
|
mnesia:write(#?TAB{key = {?multicall, API}, version = MinVersion}).
|
||||||
|
|
|
@ -0,0 +1,28 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
-ifndef(EMQX_BPAPI_HRL).
|
||||||
|
-define(EMQX_BPAPI_HRL, true).
|
||||||
|
|
||||||
|
-define(TAB, bpapi).
|
||||||
|
|
||||||
|
-define(multicall, multicall).
|
||||||
|
|
||||||
|
-record(?TAB,
|
||||||
|
{ key :: {node() | ?multicall, emqx_bpapi:api()}
|
||||||
|
, version :: emqx_bpapi:api_version()
|
||||||
|
}).
|
||||||
|
|
||||||
|
-endif.
|
|
@ -43,6 +43,7 @@ start(_Type, _Args) ->
|
||||||
ok = maybe_load_config(),
|
ok = maybe_load_config(),
|
||||||
ok = emqx_persistent_session:init_db_backend(),
|
ok = emqx_persistent_session:init_db_backend(),
|
||||||
ok = maybe_start_quicer(),
|
ok = maybe_start_quicer(),
|
||||||
|
ok = emqx_bpapi:start(),
|
||||||
wait_boot_shards(),
|
wait_boot_shards(),
|
||||||
{ok, Sup} = emqx_sup:start_link(),
|
{ok, Sup} = emqx_sup:start_link(),
|
||||||
ok = maybe_start_listeners(),
|
ok = maybe_start_listeners(),
|
||||||
|
|
|
@ -0,0 +1,62 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_bpapi_SUITE).
|
||||||
|
|
||||||
|
-compile(export_all).
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
-include_lib("stdlib/include/assert.hrl").
|
||||||
|
-include_lib("emqx/src/bpapi/emqx_bpapi.hrl").
|
||||||
|
|
||||||
|
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
|
init_per_suite(Config) ->
|
||||||
|
emqx_common_test_helpers:start_apps([emqx]),
|
||||||
|
[mnesia:dirty_write(Rec) || Rec <- fake_records()],
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_suite(_Config) ->
|
||||||
|
meck:unload(),
|
||||||
|
[mnesia:dirty_delete({?TAB, Key}) || #?TAB{key = Key} <- fake_records()],
|
||||||
|
emqx_bpapi:announce(emqx),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_max_supported_version(_Config) ->
|
||||||
|
?assertMatch(3, emqx_bpapi:supported_version('i-dont-exist2@localhost', api2)),
|
||||||
|
?assertMatch(2, emqx_bpapi:supported_version(api2)).
|
||||||
|
|
||||||
|
t_announce(Config) ->
|
||||||
|
meck:new(emqx_bpapi, [passthrough, no_history]),
|
||||||
|
Filename = filename:join(?config(data_dir, Config), "test.versions"),
|
||||||
|
meck:expect(emqx_bpapi, versions_file, fun(_) -> Filename end),
|
||||||
|
?assertMatch(ok, emqx_bpapi:announce(emqx)),
|
||||||
|
timer:sleep(100),
|
||||||
|
?assertMatch(4, emqx_bpapi:supported_version(node(), api2)),
|
||||||
|
?assertMatch(2, emqx_bpapi:supported_version(node(), api1)),
|
||||||
|
?assertMatch(2, emqx_bpapi:supported_version(api2)),
|
||||||
|
?assertMatch(2, emqx_bpapi:supported_version(api1)).
|
||||||
|
|
||||||
|
fake_records() ->
|
||||||
|
[ #?TAB{key = {'i-dont-exist@localhost', api1}, version = 2}
|
||||||
|
, #?TAB{key = {'i-dont-exist2@localhost', api1}, version = 2}
|
||||||
|
, #?TAB{key = {?multicall, api1}, version = 2}
|
||||||
|
|
||||||
|
, #?TAB{key = {'i-dont-exist@localhost', api2}, version = 2}
|
||||||
|
, #?TAB{key = {'i-dont-exist2@localhost', api2}, version = 3}
|
||||||
|
, #?TAB{key = {?multicall, api2}, version = 2}
|
||||||
|
].
|
|
@ -0,0 +1,2 @@
|
||||||
|
{api1, 2}.
|
||||||
|
{api2, 4}.
|
|
@ -16,7 +16,7 @@
|
||||||
|
|
||||||
-module(emqx_bpapi_static_checks).
|
-module(emqx_bpapi_static_checks).
|
||||||
|
|
||||||
-export([run/0, dump/1, dump/0, check_compat/1]).
|
-export([run/0, dump/1, dump/0, check_compat/1, versions_file/0]).
|
||||||
|
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
|
@ -206,6 +206,7 @@ dump(Opts) ->
|
||||||
DialyzerDump = collect_signatures(PLT, APIDump),
|
DialyzerDump = collect_signatures(PLT, APIDump),
|
||||||
[Release|_] = string:split(emqx_app:get_release(), "-"),
|
[Release|_] = string:split(emqx_app:get_release(), "-"),
|
||||||
dump_api(#{api => APIDump, signatures => DialyzerDump, release => Release}),
|
dump_api(#{api => APIDump, signatures => DialyzerDump, release => Release}),
|
||||||
|
dump_versions(APIDump),
|
||||||
xref:stop(?XREF),
|
xref:stop(?XREF),
|
||||||
erase(bpapi_ok).
|
erase(bpapi_ok).
|
||||||
|
|
||||||
|
@ -254,6 +255,18 @@ dump_api(Term = #{api := _, signatures := _, release := Release}) ->
|
||||||
ok = filelib:ensure_dir(Filename),
|
ok = filelib:ensure_dir(Filename),
|
||||||
file:write_file(Filename, io_lib:format("~0p.", [Term])).
|
file:write_file(Filename, io_lib:format("~0p.", [Term])).
|
||||||
|
|
||||||
|
-spec dump_versions(api_dump()) -> ok.
|
||||||
|
dump_versions(APIs) ->
|
||||||
|
Filename = versions_file(),
|
||||||
|
?NOTICE("Dumping API versions to ~p", [Filename]),
|
||||||
|
ok = filelib:ensure_dir(Filename),
|
||||||
|
{ok, FD} = file:open(Filename, [write]),
|
||||||
|
lists:foreach(fun(API) ->
|
||||||
|
ok = io:format(FD, "~p.~n", [API])
|
||||||
|
end,
|
||||||
|
lists:sort(maps:keys(APIs))),
|
||||||
|
file:close(FD).
|
||||||
|
|
||||||
-spec collect_bpapis([mfa()]) -> api_dump().
|
-spec collect_bpapis([mfa()]) -> api_dump().
|
||||||
collect_bpapis(L) ->
|
collect_bpapis(L) ->
|
||||||
Modules = lists:usort([M || {M, _F, _A} <- L]),
|
Modules = lists:usort([M || {M, _F, _A} <- L]),
|
||||||
|
@ -311,7 +324,10 @@ setnok() ->
|
||||||
put(bpapi_ok, false).
|
put(bpapi_ok, false).
|
||||||
|
|
||||||
dumps_dir() ->
|
dumps_dir() ->
|
||||||
filename:join(project_root_dir(), "apps/emqx/test/emqx_bpapi_suite_data").
|
filename:join(project_root_dir(), "apps/emqx/test/emqx_static_checks_data").
|
||||||
|
|
||||||
project_root_dir() ->
|
project_root_dir() ->
|
||||||
string:trim(os:cmd("git rev-parse --show-toplevel")).
|
string:trim(os:cmd("git rev-parse --show-toplevel")).
|
||||||
|
|
||||||
|
versions_file() ->
|
||||||
|
filename:join(project_root_dir(), "apps/emqx/priv/bpapi.versions").
|
||||||
|
|
|
@ -164,6 +164,7 @@ set_special_confs(_) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
|
emqx_common_test_helpers:ensure_mnesia_stopped(),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
end_per_group(gc_tests, Config) ->
|
end_per_group(gc_tests, Config) ->
|
||||||
|
@ -1130,4 +1131,3 @@ split([H], L1, L2) ->
|
||||||
{[H|L1], L2};
|
{[H|L1], L2};
|
||||||
split([H1, H2|Left], L1, L2) ->
|
split([H1, H2|Left], L1, L2) ->
|
||||||
split(Left, [H1|L1], [H2|L2]).
|
split(Left, [H1|L1], [H2|L2]).
|
||||||
|
|
||||||
|
|
|
@ -14,7 +14,7 @@
|
||||||
%% limitations under the License.
|
%% limitations under the License.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-module(emqx_bpapi_suite).
|
-module(emqx_static_checks).
|
||||||
|
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
|
@ -33,4 +33,13 @@ end_per_suite(_Config) ->
|
||||||
"https://github.com/emqx/emqx/blob/master/apps/emqx/src/bpapi/README.md", []).
|
"https://github.com/emqx/emqx/blob/master/apps/emqx/src/bpapi/README.md", []).
|
||||||
|
|
||||||
t_run_check(_) ->
|
t_run_check(_) ->
|
||||||
?assertMatch(true, emqx_bpapi_static_checks:run()).
|
{ok, OldData} = file:consult(emqx_bpapi_static_checks:versions_file()),
|
||||||
|
?assert(emqx_bpapi_static_checks:run()),
|
||||||
|
{ok, NewData} = file:consult(emqx_bpapi_static_checks:versions_file()),
|
||||||
|
OldData =:= NewData orelse
|
||||||
|
begin
|
||||||
|
?CRITICAL("BPAPI versions were changed, but not committed to the repo.\n"
|
||||||
|
"Run 'make && make static_checks' and then add the changed "
|
||||||
|
"'bpapi.versions' files to the commit.", []),
|
||||||
|
error(version_mismatch)
|
||||||
|
end.
|
Loading…
Reference in New Issue