Merge pull request #8369 from zhongwencool/sync-authz-certs
fix: sync data's authz and certs
This commit is contained in:
commit
670f83e415
|
@ -5,6 +5,7 @@
|
|||
{emqx_broker,1}.
|
||||
{emqx_cm,1}.
|
||||
{emqx_conf,1}.
|
||||
{emqx_conf,2}.
|
||||
{emqx_dashboard,1}.
|
||||
{emqx_delayed,1}.
|
||||
{emqx_exhook,1}.
|
||||
|
|
|
@ -1,10 +1,21 @@
|
|||
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||
%% -*- mode: erlang -*-
|
||||
{VSN,
|
||||
[{"0.1.0",
|
||||
[{load_module,emqx_conf_schema,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_conf,brutal_purge,soft_purge,[]}]},
|
||||
{<<".*">>,[]}],
|
||||
[{"0.1.0",
|
||||
[{load_module,emqx_conf_schema,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_conf,brutal_purge,soft_purge,[]}]},
|
||||
{<<".*">>,[]}]}.
|
||||
[ {"0.1.0",
|
||||
[
|
||||
{add_module, emqx_conf_proto_v2},
|
||||
{load_module,emqx_conf_schema,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_conf,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_conf_app,brutal_purge,soft_purge,[]}
|
||||
]},
|
||||
{<<".*">>, []}
|
||||
],
|
||||
[ {"0.1.0",
|
||||
[
|
||||
{delete_module, emqx_conf_proto_v2},
|
||||
{load_module,emqx_conf_schema,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_conf,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_conf_app,brutal_purge,soft_purge,[]}
|
||||
]},
|
||||
{<<".*">>, []}
|
||||
]
|
||||
}.
|
||||
|
|
|
@ -62,7 +62,7 @@ get_raw(KeyPath) ->
|
|||
%% @doc Returns all values in the cluster.
|
||||
-spec get_all(emqx_map_lib:config_key_path()) -> #{node() => term()}.
|
||||
get_all(KeyPath) ->
|
||||
{ResL, []} = emqx_conf_proto_v1:get_all(KeyPath),
|
||||
{ResL, []} = emqx_conf_proto_v2:get_all(KeyPath),
|
||||
maps:from_list(ResL).
|
||||
|
||||
%% @doc Returns the specified node's KeyPath, or exception if not found
|
||||
|
@ -70,14 +70,14 @@ get_all(KeyPath) ->
|
|||
get_by_node(Node, KeyPath) when Node =:= node() ->
|
||||
emqx:get_config(KeyPath);
|
||||
get_by_node(Node, KeyPath) ->
|
||||
emqx_conf_proto_v1:get_config(Node, KeyPath).
|
||||
emqx_conf_proto_v2:get_config(Node, KeyPath).
|
||||
|
||||
%% @doc Returns the specified node's KeyPath, or the default value if not found
|
||||
-spec get_by_node(node(), emqx_map_lib:config_key_path(), term()) -> term().
|
||||
get_by_node(Node, KeyPath, Default) when Node =:= node() ->
|
||||
emqx:get_config(KeyPath, Default);
|
||||
get_by_node(Node, KeyPath, Default) ->
|
||||
emqx_conf_proto_v1:get_config(Node, KeyPath, Default).
|
||||
emqx_conf_proto_v2:get_config(Node, KeyPath, Default).
|
||||
|
||||
%% @doc Returns the specified node's KeyPath, or config_not_found if key path not found
|
||||
-spec get_node_and_config(emqx_map_lib:config_key_path()) -> term().
|
||||
|
@ -92,7 +92,7 @@ get_node_and_config(KeyPath) ->
|
|||
) ->
|
||||
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
|
||||
update(KeyPath, UpdateReq, Opts) ->
|
||||
emqx_conf_proto_v1:update(KeyPath, UpdateReq, Opts).
|
||||
emqx_conf_proto_v2:update(KeyPath, UpdateReq, Opts).
|
||||
|
||||
%% @doc Update the specified node's key path in local-override.conf.
|
||||
-spec update(
|
||||
|
@ -105,13 +105,13 @@ update(KeyPath, UpdateReq, Opts) ->
|
|||
update(Node, KeyPath, UpdateReq, Opts0) when Node =:= node() ->
|
||||
emqx:update_config(KeyPath, UpdateReq, Opts0#{override_to => local});
|
||||
update(Node, KeyPath, UpdateReq, Opts) ->
|
||||
emqx_conf_proto_v1:update(Node, KeyPath, UpdateReq, Opts).
|
||||
emqx_conf_proto_v2:update(Node, KeyPath, UpdateReq, Opts).
|
||||
|
||||
%% @doc remove all value of key path in cluster-override.conf or local-override.conf.
|
||||
-spec remove(emqx_map_lib:config_key_path(), emqx_config:update_opts()) ->
|
||||
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
|
||||
remove(KeyPath, Opts) ->
|
||||
emqx_conf_proto_v1:remove_config(KeyPath, Opts).
|
||||
emqx_conf_proto_v2:remove_config(KeyPath, Opts).
|
||||
|
||||
%% @doc remove the specified node's key path in local-override.conf.
|
||||
-spec remove(node(), emqx_map_lib:config_key_path(), emqx_config:update_opts()) ->
|
||||
|
@ -119,13 +119,13 @@ remove(KeyPath, Opts) ->
|
|||
remove(Node, KeyPath, Opts) when Node =:= node() ->
|
||||
emqx:remove_config(KeyPath, Opts#{override_to => local});
|
||||
remove(Node, KeyPath, Opts) ->
|
||||
emqx_conf_proto_v1:remove_config(Node, KeyPath, Opts).
|
||||
emqx_conf_proto_v2:remove_config(Node, KeyPath, Opts).
|
||||
|
||||
%% @doc reset all value of key path in cluster-override.conf or local-override.conf.
|
||||
-spec reset(emqx_map_lib:config_key_path(), emqx_config:update_opts()) ->
|
||||
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
|
||||
reset(KeyPath, Opts) ->
|
||||
emqx_conf_proto_v1:reset(KeyPath, Opts).
|
||||
emqx_conf_proto_v2:reset(KeyPath, Opts).
|
||||
|
||||
%% @doc reset the specified node's key path in local-override.conf.
|
||||
-spec reset(node(), emqx_map_lib:config_key_path(), emqx_config:update_opts()) ->
|
||||
|
@ -133,7 +133,7 @@ reset(KeyPath, Opts) ->
|
|||
reset(Node, KeyPath, Opts) when Node =:= node() ->
|
||||
emqx:reset_config(KeyPath, Opts#{override_to => local});
|
||||
reset(Node, KeyPath, Opts) ->
|
||||
emqx_conf_proto_v1:reset(Node, KeyPath, Opts).
|
||||
emqx_conf_proto_v2:reset(Node, KeyPath, Opts).
|
||||
|
||||
%% @doc Called from build script.
|
||||
-spec dump_schema(file:name_all()) -> ok.
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
|
||||
-export([start/2, stop/1]).
|
||||
-export([get_override_config_file/0]).
|
||||
-export([sync_data_from_node/0]).
|
||||
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
-include("emqx_conf.hrl").
|
||||
|
@ -56,6 +57,16 @@ get_override_config_file() ->
|
|||
end
|
||||
end.
|
||||
|
||||
sync_data_from_node() ->
|
||||
Dir = emqx:data_dir(),
|
||||
TargetDirs = lists:filter(fun(Type) -> filelib:is_dir(filename:join(Dir, Type)) end, [
|
||||
"authz", "certs"
|
||||
]),
|
||||
{ok, Zip} = zip:zip(atom_to_list(node()) ++ "_data.zip", TargetDirs, [{cwd, Dir}]),
|
||||
Res = {ok, _Bin} = file:read_file(Zip),
|
||||
_ = file:delete(Zip),
|
||||
Res.
|
||||
|
||||
%% ------------------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%% ------------------------------------------------------------------------------
|
||||
|
@ -86,7 +97,7 @@ copy_override_conf_from_core_node() ->
|
|||
?SLOG(debug, #{msg => "skip_copy_overide_conf_from_core_node"}),
|
||||
{ok, ?DEFAULT_INIT_TXN_ID};
|
||||
Nodes ->
|
||||
{Results, Failed} = emqx_conf_proto_v1:get_override_config_file(Nodes),
|
||||
{Results, Failed} = emqx_conf_proto_v2:get_override_config_file(Nodes),
|
||||
{Ready, NotReady0} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results),
|
||||
NotReady = lists:filter(fun(Res) -> element(1, Res) =:= error end, NotReady0),
|
||||
case (Failed =/= [] orelse NotReady =/= []) andalso Ready =/= [] of
|
||||
|
@ -150,6 +161,7 @@ copy_override_conf_from_core_node() ->
|
|||
RawOverrideConf,
|
||||
#{override_to => cluster}
|
||||
),
|
||||
ok = sync_data_from_node(Node),
|
||||
{ok, TnxId}
|
||||
end
|
||||
end.
|
||||
|
@ -173,3 +185,14 @@ conf_sort({ok, #{tnx_id := Id, wall_clock := W1}}, {ok, #{tnx_id := Id, wall_clo
|
|||
W1 > W2;
|
||||
conf_sort({ok, _}, {ok, _}) ->
|
||||
false.
|
||||
|
||||
sync_data_from_node(Node) ->
|
||||
case emqx_conf_proto_v2:sync_data_from_node(Node) of
|
||||
{ok, DataBin} ->
|
||||
{ok, Files} = zip:unzip(DataBin, [{cwd, emqx:data_dir()}]),
|
||||
?SLOG(debug, #{node => Node, msg => "sync_data_from_node_ok", files => Files}),
|
||||
ok;
|
||||
Error ->
|
||||
?SLOG(emergency, #{node => Node, msg => "sync_data_from_node_failed", reason => Error}),
|
||||
error(Error)
|
||||
end.
|
||||
|
|
|
@ -0,0 +1,109 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_conf_proto_v2).
|
||||
|
||||
-behaviour(emqx_bpapi).
|
||||
|
||||
-export([
|
||||
introduced_in/0,
|
||||
sync_data_from_node/1,
|
||||
get_config/2,
|
||||
get_config/3,
|
||||
get_all/1,
|
||||
|
||||
update/3,
|
||||
update/4,
|
||||
remove_config/2,
|
||||
remove_config/3,
|
||||
|
||||
reset/2,
|
||||
reset/3,
|
||||
|
||||
get_override_config_file/1
|
||||
]).
|
||||
|
||||
-include_lib("emqx/include/bpapi.hrl").
|
||||
|
||||
introduced_in() ->
|
||||
"5.0.1".
|
||||
|
||||
-spec sync_data_from_node(node()) -> {ok, binary()} | emqx_rpc:badrpc().
|
||||
sync_data_from_node(Node) ->
|
||||
rpc:call(Node, emqx_conf_app, sync_data_from_node, [], 20000).
|
||||
-type update_config_key_path() :: [emqx_map_lib:config_key(), ...].
|
||||
|
||||
-spec get_config(node(), emqx_map_lib:config_key_path()) ->
|
||||
term() | emqx_rpc:badrpc().
|
||||
get_config(Node, KeyPath) ->
|
||||
rpc:call(Node, emqx, get_config, [KeyPath]).
|
||||
|
||||
-spec get_config(node(), emqx_map_lib:config_key_path(), _Default) ->
|
||||
term() | emqx_rpc:badrpc().
|
||||
get_config(Node, KeyPath, Default) ->
|
||||
rpc:call(Node, emqx, get_config, [KeyPath, Default]).
|
||||
|
||||
-spec get_all(emqx_map_lib:config_key_path()) -> emqx_rpc:multicall_result().
|
||||
get_all(KeyPath) ->
|
||||
rpc:multicall(emqx_conf, get_node_and_config, [KeyPath], 5000).
|
||||
|
||||
-spec update(
|
||||
update_config_key_path(),
|
||||
emqx_config:update_request(),
|
||||
emqx_config:update_opts()
|
||||
) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
|
||||
update(KeyPath, UpdateReq, Opts) ->
|
||||
emqx_cluster_rpc:multicall(emqx, update_config, [KeyPath, UpdateReq, Opts]).
|
||||
|
||||
-spec update(
|
||||
node(),
|
||||
update_config_key_path(),
|
||||
emqx_config:update_request(),
|
||||
emqx_config:update_opts()
|
||||
) ->
|
||||
{ok, emqx_config:update_result()}
|
||||
| {error, emqx_config:update_error()}
|
||||
| emqx_rpc:badrpc().
|
||||
update(Node, KeyPath, UpdateReq, Opts) ->
|
||||
rpc:call(Node, emqx, update_config, [KeyPath, UpdateReq, Opts], 5000).
|
||||
|
||||
-spec remove_config(update_config_key_path(), emqx_config:update_opts()) ->
|
||||
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
|
||||
remove_config(KeyPath, Opts) ->
|
||||
emqx_cluster_rpc:multicall(emqx, remove_config, [KeyPath, Opts]).
|
||||
|
||||
-spec remove_config(node(), update_config_key_path(), emqx_config:update_opts()) ->
|
||||
{ok, emqx_config:update_result()}
|
||||
| {error, emqx_config:update_error()}
|
||||
| emqx_rpc:badrpc().
|
||||
remove_config(Node, KeyPath, Opts) ->
|
||||
rpc:call(Node, emqx, remove_config, [KeyPath, Opts], 5000).
|
||||
|
||||
-spec reset(update_config_key_path(), emqx_config:update_opts()) ->
|
||||
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
|
||||
reset(KeyPath, Opts) ->
|
||||
emqx_cluster_rpc:multicall(emqx, reset_config, [KeyPath, Opts]).
|
||||
|
||||
-spec reset(node(), update_config_key_path(), emqx_config:update_opts()) ->
|
||||
{ok, emqx_config:update_result()}
|
||||
| {error, emqx_config:update_error()}
|
||||
| emqx_rpc:badrpc().
|
||||
reset(Node, KeyPath, Opts) ->
|
||||
rpc:call(Node, emqx, reset_config, [KeyPath, Opts]).
|
||||
|
||||
-spec get_override_config_file([node()]) -> emqx_rpc:multicall_result().
|
||||
get_override_config_file(Nodes) ->
|
||||
rpc:multicall(Nodes, emqx_conf_app, get_override_config_file, [], 20000).
|
Loading…
Reference in New Issue