Merge pull request #13300 from SergeTupchiy/EMQX-12195-cluster-link-conf-backup

cluster link conf backup
This commit is contained in:
SergeTupchiy 2024-06-21 11:42:57 +03:00 committed by GitHub
commit fb266fbf8c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 88 additions and 26 deletions

View File

@ -519,7 +519,7 @@ cancel_heartbeat(St = #st{heartbeat_timer = TRef}) ->
%% Responsible for transferring local routing table snapshot to the target
%% cluster. Does so either during the initial startup or when MQTT connection
%% is re-established with a clean session. Once bootstrapping is done, it
%% opens the syncer.
%% activates the syncer.
run_bootstrap(St = #st{target = TargetCluster, actor = ?PS_ACTOR, link_conf = #{topics := Topics}}) ->
case mria_config:whoami() of

View File

@ -55,26 +55,27 @@
-define(META_FILENAME, "META.hocon").
-define(CLUSTER_HOCON_FILENAME, "cluster.hocon").
-define(CONF_KEYS, [
<<"delayed">>,
<<"rewrite">>,
<<"retainer">>,
<<"mqtt">>,
<<"alarm">>,
<<"sysmon">>,
<<"sys_topics">>,
<<"limiter">>,
<<"log">>,
<<"persistent_session_store">>,
<<"durable_sessions">>,
<<"prometheus">>,
<<"crl_cache">>,
<<"conn_congestion">>,
<<"force_shutdown">>,
<<"flapping_detect">>,
<<"broker">>,
<<"force_gc">>,
<<"zones">>,
<<"slow_subs">>
[<<"delayed">>],
[<<"rewrite">>],
[<<"retainer">>],
[<<"mqtt">>],
[<<"alarm">>],
[<<"sysmon">>],
[<<"sys_topics">>],
[<<"limiter">>],
[<<"log">>],
[<<"persistent_session_store">>],
[<<"durable_sessions">>],
[<<"prometheus">>],
[<<"crl_cache">>],
[<<"conn_congestion">>],
[<<"force_shutdown">>],
[<<"flapping_detect">>],
[<<"broker">>],
[<<"force_gc">>],
[<<"zones">>],
[<<"slow_subs">>],
[<<"cluster">>, <<"links">>]
]).
%% emqx_bridge_v2 depends on emqx_connector, so connectors need to be imported first
@ -854,10 +855,10 @@ order(Elem, [_ | T], Order) ->
import_generic_conf(Data) ->
lists:map(
fun(Key) ->
case maps:get(Key, Data, undefined) of
undefined -> {[Key], ok};
Conf -> {[Key], emqx_conf:update([Key], Conf, #{override_to => cluster})}
fun(KeyPath) ->
case emqx_utils_maps:deep_get(KeyPath, Data, undefined) of
undefined -> {[KeyPath], ok};
Conf -> {[KeyPath], emqx_conf:update(KeyPath, Conf, #{override_to => cluster})}
end
end,
?CONF_KEYS

View File

@ -28,6 +28,29 @@
-define(ROLE_API_SUPERUSER, <<"administrator">>).
-define(BOOTSTRAP_BACKUP, "emqx-export-test-bootstrap-ce.tar.gz").
-define(CACERT, <<
"-----BEGIN CERTIFICATE-----\n"
"MIIDUTCCAjmgAwIBAgIJAPPYCjTmxdt/MA0GCSqGSIb3DQEBCwUAMD8xCzAJBgNV\n"
"BAYTAkNOMREwDwYDVQQIDAhoYW5nemhvdTEMMAoGA1UECgwDRU1RMQ8wDQYDVQQD\n"
"DAZSb290Q0EwHhcNMjAwNTA4MDgwNjUyWhcNMzAwNTA2MDgwNjUyWjA/MQswCQYD\n"
"VQQGEwJDTjERMA8GA1UECAwIaGFuZ3pob3UxDDAKBgNVBAoMA0VNUTEPMA0GA1UE\n"
"AwwGUm9vdENBMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAzcgVLex1\n"
"EZ9ON64EX8v+wcSjzOZpiEOsAOuSXOEN3wb8FKUxCdsGrsJYB7a5VM/Jot25Mod2\n"
"juS3OBMg6r85k2TWjdxUoUs+HiUB/pP/ARaaW6VntpAEokpij/przWMPgJnBF3Ur\n"
"MjtbLayH9hGmpQrI5c2vmHQ2reRZnSFbY+2b8SXZ+3lZZgz9+BaQYWdQWfaUWEHZ\n"
"uDaNiViVO0OT8DRjCuiDp3yYDj3iLWbTA/gDL6Tf5XuHuEwcOQUrd+h0hyIphO8D\n"
"tsrsHZ14j4AWYLk1CPA6pq1HIUvEl2rANx2lVUNv+nt64K/Mr3RnVQd9s8bK+TXQ\n"
"KGHd2Lv/PALYuwIDAQABo1AwTjAdBgNVHQ4EFgQUGBmW+iDzxctWAWxmhgdlE8Pj\n"
"EbQwHwYDVR0jBBgwFoAUGBmW+iDzxctWAWxmhgdlE8PjEbQwDAYDVR0TBAUwAwEB\n"
"/zANBgkqhkiG9w0BAQsFAAOCAQEAGbhRUjpIred4cFAFJ7bbYD9hKu/yzWPWkMRa\n"
"ErlCKHmuYsYk+5d16JQhJaFy6MGXfLgo3KV2itl0d+OWNH0U9ULXcglTxy6+njo5\n"
"CFqdUBPwN1jxhzo9yteDMKF4+AHIxbvCAJa17qcwUKR5MKNvv09C6pvQDJLzid7y\n"
"E2dkgSuggik3oa0427KvctFf8uhOV94RvEDyqvT5+pgNYZ2Yfga9pD/jjpoHEUlo\n"
"88IGU8/wJCx3Ds2yc8+oBg/ynxG8f/HmCC1ET6EHHoe2jlo8FpU/SgGtghS1YL30\n"
"IWxNsPrUP+XsZpBJy/mvOhE5QXo6Y35zDqqj8tI7AGmAWu22jg==\n"
"-----END CERTIFICATE-----"
>>).
all() ->
emqx_common_test_helpers:all(?MODULE).
@ -360,6 +383,36 @@ t_bad_config(Config) ->
Res = emqx_mgmt_data_backup:import(BadConfigFileName),
?assertMatch({error, #{kind := validation_error}}, Res).
t_cluster_links(_Config) ->
case emqx_release:edition() of
ce ->
%% Only available in EMQX Enterprise
ok;
ee ->
Link = #{
<<"name">> => <<"emqxcl_backup_test">>,
<<"server">> => <<"emqx.emqxcl_backup_test.host:41883">>,
<<"topics">> => [<<"#">>],
<<"ssl">> => #{<<"enable">> => true, <<"cacertfile">> => ?CACERT}
},
{ok, [RawLink]} = emqx_cluster_link_config:update([Link]),
{ok, #{filename := FileName}} = emqx_mgmt_data_backup:export(),
{ok, []} = emqx_cluster_link_config:update([]),
#{<<"ssl">> := #{<<"cacertfile">> := CertPath}} = RawLink,
_ = file:delete(CertPath),
?assertEqual(
{ok, #{db_errors => #{}, config_errors => #{}}},
emqx_mgmt_data_backup:import(FileName)
),
[
#{
<<"name">> := <<"emqxcl_backup_test">>,
<<"ssl">> := #{<<"cacertfile">> := CertPath1}
}
] = emqx:get_raw_config([cluster, links]),
?assertEqual({ok, ?CACERT}, file:read_file(CertPath1))
end.
t_import_on_cluster(Config) ->
%% Randomly chosen config key to verify import result additionally
?assertEqual([], emqx:get_config([authentication])),
@ -483,7 +536,7 @@ t_read_files(_Config) ->
setup(TC, Config) ->
WorkDir = filename:join(emqx_cth_suite:work_dir(TC, Config), local),
Started = emqx_cth_suite:start(apps_to_start(), #{work_dir => WorkDir}),
Started = emqx_cth_suite:start(apps_to_start(TC), #{work_dir => WorkDir}),
[{suite_apps, Started} | Config].
cleanup(Config) ->
@ -530,6 +583,14 @@ create_test_tab(Attributes) ->
]),
ok = mria:wait_for_tables([data_backup_test]).
apps_to_start(t_cluster_links) ->
case emqx_release:edition() of
ee -> apps_to_start() ++ [emqx_cluster_link];
ce -> []
end;
apps_to_start(_TC) ->
apps_to_start().
apps_to_start() ->
[
{emqx, #{override_env => [{boot_modules, [broker]}]}},