Merge pull request #12826 from emqx/import-source-bridges
fix: source bridges missing after restore the backup files
This commit is contained in:
commit
e11c4a9c83
|
@ -16,9 +16,14 @@
|
|||
|
||||
-module(emqx_config_backup).
|
||||
|
||||
-type ok_result() :: #{
|
||||
root_key => emqx_utils_maps:config_key(),
|
||||
changed => [emqx_utils_maps:config_key_path()]
|
||||
}.
|
||||
|
||||
-type error_result() :: #{root_key => emqx_utils_maps:config_key(), reason => term()}.
|
||||
|
||||
-callback import_config(RawConf :: map()) ->
|
||||
{ok, #{
|
||||
root_key => emqx_utils_maps:config_key(),
|
||||
changed => [emqx_utils_maps:config_key_path()]
|
||||
}}
|
||||
| {error, #{root_key => emqx_utils_maps:config_key(), reason => term()}}.
|
||||
{ok, ok_result()}
|
||||
| {error, error_result()}
|
||||
| {results, {[ok_result()], [error_result()]}}.
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{application, emqx_bridge, [
|
||||
{description, "EMQX bridges"},
|
||||
{vsn, "0.1.34"},
|
||||
{vsn, "0.1.35"},
|
||||
{registered, [emqx_bridge_sup]},
|
||||
{mod, {emqx_bridge_app, []}},
|
||||
{applications, [
|
||||
|
|
|
@ -1030,7 +1030,26 @@ bridge_v2_type_to_connector_type(Type) ->
|
|||
|
||||
import_config(RawConf) ->
|
||||
%% actions structure
|
||||
emqx_bridge:import_config(RawConf, <<"actions">>, ?ROOT_KEY_ACTIONS, config_key_path()).
|
||||
ActionRes = emqx_bridge:import_config(
|
||||
RawConf, <<"actions">>, ?ROOT_KEY_ACTIONS, config_key_path()
|
||||
),
|
||||
SourceRes = emqx_bridge:import_config(
|
||||
RawConf, <<"sources">>, ?ROOT_KEY_SOURCES, config_key_path_sources()
|
||||
),
|
||||
group_import_results([ActionRes, SourceRes]).
|
||||
|
||||
group_import_results(Results0) ->
|
||||
Results = lists:foldr(
|
||||
fun
|
||||
({ok, OkRes}, {OkAcc, ErrAcc}) ->
|
||||
{[OkRes | OkAcc], ErrAcc};
|
||||
({error, ErrRes}, {OkAcc, ErrAcc}) ->
|
||||
{OkAcc, [ErrRes | ErrAcc]}
|
||||
end,
|
||||
{[], []},
|
||||
Results0
|
||||
),
|
||||
{results, Results}.
|
||||
|
||||
%%====================================================================
|
||||
%% Config Update Handler API
|
||||
|
|
|
@ -773,23 +773,42 @@ validate_cluster_hocon(RawConf) ->
|
|||
do_import_conf(RawConf, Opts) ->
|
||||
GenConfErrs = filter_errors(maps:from_list(import_generic_conf(RawConf))),
|
||||
maybe_print_conf_errors(GenConfErrs, Opts),
|
||||
Errors =
|
||||
lists:foldl(
|
||||
fun(Module, ErrorsAcc) ->
|
||||
case Module:import_config(RawConf) of
|
||||
{ok, #{changed := Changed}} ->
|
||||
maybe_print_changed(Changed, Opts),
|
||||
ErrorsAcc;
|
||||
{error, #{root_key := RootKey, reason := Reason}} ->
|
||||
ErrorsAcc#{[RootKey] => Reason}
|
||||
end
|
||||
end,
|
||||
GenConfErrs,
|
||||
sort_importer_modules(find_behaviours(emqx_config_backup))
|
||||
),
|
||||
Modules = sort_importer_modules(find_behaviours(emqx_config_backup)),
|
||||
Errors = lists:foldl(print_ok_results_collect_errors(RawConf, Opts), GenConfErrs, Modules),
|
||||
maybe_print_conf_errors(Errors, Opts),
|
||||
Errors.
|
||||
|
||||
print_ok_results_collect_errors(RawConf, Opts) ->
|
||||
fun(Module, Errors) ->
|
||||
case Module:import_config(RawConf) of
|
||||
{results, {OkResults, ErrResults}} ->
|
||||
print_ok_results(OkResults, Opts),
|
||||
collect_errors(ErrResults, Errors);
|
||||
{ok, OkResult} ->
|
||||
print_ok_results([OkResult], Opts),
|
||||
Errors;
|
||||
{error, ErrResult} ->
|
||||
collect_errors([ErrResult], Errors)
|
||||
end
|
||||
end.
|
||||
|
||||
print_ok_results(Results, Opts) ->
|
||||
lists:foreach(
|
||||
fun(#{changed := Changed}) ->
|
||||
maybe_print_changed(Changed, Opts)
|
||||
end,
|
||||
Results
|
||||
).
|
||||
|
||||
collect_errors(Results, Errors) ->
|
||||
lists:foldr(
|
||||
fun(#{root_key := RootKey, reason := Reason}, Acc) ->
|
||||
Acc#{[RootKey] => Reason}
|
||||
end,
|
||||
Errors,
|
||||
Results
|
||||
).
|
||||
|
||||
sort_importer_modules(Modules) ->
|
||||
lists:sort(
|
||||
fun(M1, M2) -> order(M1, ?IMPORT_ORDER) =< order(M2, ?IMPORT_ORDER) end,
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include_lib("emqx_utils/include/emqx_message.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
|
@ -86,6 +87,33 @@ t_empty_export_import(_Config) ->
|
|||
?assertEqual(Exp, emqx_mgmt_data_backup:import(FileName)),
|
||||
?assertEqual(ExpRawConf, emqx:get_raw_config([])).
|
||||
|
||||
t_cluster_hocon_import_mqtt_subscribers_retainer_messages(Config) ->
|
||||
case emqx_release:edition() of
|
||||
ce ->
|
||||
ok;
|
||||
ee ->
|
||||
FNameEmqx44 = "emqx-export-4.4.24-retainer-mqttsub.tar.gz",
|
||||
BackupFile = filename:join(?config(data_dir, Config), FNameEmqx44),
|
||||
Exp = {ok, #{db_errors => #{}, config_errors => #{}}},
|
||||
?assertEqual(Exp, emqx_mgmt_data_backup:import(BackupFile)),
|
||||
RawConfAfterImport = emqx:get_raw_config([]),
|
||||
%% verify that MQTT sources are imported
|
||||
?assertMatch(
|
||||
#{<<"sources">> := #{<<"mqtt">> := Sources}} when map_size(Sources) > 0,
|
||||
RawConfAfterImport
|
||||
),
|
||||
%% verify that retainer messages are imported
|
||||
?assertMatch(
|
||||
{ok, [#message{payload = <<"test-payload">>}]},
|
||||
emqx_retainer:read_message(<<"test-retained-message/1">>)
|
||||
),
|
||||
%% Export and import again
|
||||
{ok, #{filename := FileName}} = emqx_mgmt_data_backup:export(),
|
||||
?assertEqual(Exp, emqx_mgmt_data_backup:import(FileName)),
|
||||
?assertEqual(RawConfAfterImport, emqx:get_raw_config([]))
|
||||
end,
|
||||
ok.
|
||||
|
||||
t_cluster_hocon_export_import(Config) ->
|
||||
RawConfBeforeImport = emqx:get_raw_config([]),
|
||||
BootstrapFile = filename:join(?config(data_dir, Config), ?BOOTSTRAP_BACKUP),
|
||||
|
|
Binary file not shown.
|
@ -2,7 +2,7 @@
|
|||
{application, emqx_retainer, [
|
||||
{description, "EMQX Retainer"},
|
||||
% strict semver, bump manually!
|
||||
{vsn, "5.0.21"},
|
||||
{vsn, "5.0.22"},
|
||||
{modules, []},
|
||||
{registered, [emqx_retainer_sup]},
|
||||
{applications, [kernel, stdlib, emqx, emqx_ctl]},
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
-module(emqx_retainer_mnesia).
|
||||
|
||||
-behaviour(emqx_retainer).
|
||||
-behaviour(emqx_db_backup).
|
||||
|
||||
-include("emqx_retainer.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
|
@ -54,6 +55,8 @@
|
|||
-export([populate_index_meta/0]).
|
||||
-export([reindex/3]).
|
||||
|
||||
-export([backup_tables/0]).
|
||||
|
||||
-record(retained_message, {topic, msg, expiry_time}).
|
||||
-record(retained_index, {key, expiry_time}).
|
||||
-record(retained_index_meta, {key, read_indices, write_indices, reindexing, extra}).
|
||||
|
@ -73,6 +76,12 @@
|
|||
topics() ->
|
||||
[emqx_topic:join(I) || I <- mnesia:dirty_all_keys(?TAB_MESSAGE)].
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Data backup
|
||||
%%--------------------------------------------------------------------
|
||||
backup_tables() ->
|
||||
[?TAB_MESSAGE].
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% emqx_retainer callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
Fixed an issue that prevented importing source data integrations and retained messages.
|
||||
|
||||
Before the fix:
|
||||
|
||||
- source data integrations are ignored from the backup file
|
||||
- importing the `mnesia` table for retained messages are not supported
|
|
@ -101,6 +101,10 @@ matrix() {
|
|||
entries+=("$(format_app_entry "$app" 1 emqx "$runner")")
|
||||
entries+=("$(format_app_entry "$app" 1 emqx-enterprise "$runner")")
|
||||
;;
|
||||
apps/emqx_management)
|
||||
entries+=("$(format_app_entry "$app" 1 emqx "$runner")")
|
||||
entries+=("$(format_app_entry "$app" 1 emqx-enterprise "$runner")")
|
||||
;;
|
||||
apps/*)
|
||||
if [[ -f "${app}/BSL.txt" ]]; then
|
||||
profile='emqx-enterprise'
|
||||
|
|
Loading…
Reference in New Issue