From 033f8619ebd1573ec3ea5fa1f67692185abda5bd Mon Sep 17 00:00:00 2001 From: zhanghongtong Date: Thu, 25 Feb 2021 14:07:42 +0800 Subject: [PATCH] chore(emqx_management): abstract data backup --- lib-ce/emqx_management/src/emqx_mgmt.erl | 408 ------------- .../src/emqx_mgmt_api_data.erl | 75 +-- lib-ce/emqx_management/src/emqx_mgmt_cli.erl | 97 +-- .../src/emqx_mgmt_data_backup.erl | 557 ++++++++++++++++++ 4 files changed, 571 insertions(+), 566 deletions(-) create mode 100644 lib-ce/emqx_management/src/emqx_mgmt_data_backup.erl diff --git a/lib-ce/emqx_management/src/emqx_mgmt.erl b/lib-ce/emqx_management/src/emqx_mgmt.erl index 19474e5a1..e7d896c6a 100644 --- a/lib-ce/emqx_management/src/emqx_mgmt.erl +++ b/lib-ce/emqx_management/src/emqx_mgmt.erl @@ -36,15 +36,6 @@ %% Metrics and Stats -export([ get_metrics/0 , get_metrics/1 - , get_all_topic_metrics/0 - , get_topic_metrics/1 - , get_topic_metrics/2 - , register_topic_metrics/1 - , register_topic_metrics/2 - , unregister_topic_metrics/1 - , unregister_topic_metrics/2 - , unregister_all_topic_metrics/0 - , unregister_all_topic_metrics/1 , get_stats/0 , get_stats/1 ]). @@ -91,14 +82,6 @@ , reload_plugin/2 ]). -%% Modules --export([ list_modules/0 - , list_modules/1 - , load_module/2 - , unload_module/2 - , reload_module/2 - ]). - %% Listeners -export([ list_listeners/0 , list_listeners/1 @@ -118,26 +101,6 @@ , delete_banned/1 ]). -%% Export/Import --export([ export_rules/0 - , export_resources/0 - , export_blacklist/0 - , export_applications/0 - , export_users/0 - , export_auth_mnesia/0 - , export_acl_mnesia/0 - , import_rules/1 - , import_resources/1 - , import_resources_and_rules/3 - , import_blacklist/1 - , import_applications/1 - , import_users/1 - , import_auth_clientid/1 %% BACKW: 4.1.x - , import_auth_username/1 %% BACKW: 4.1.x - , import_auth_mnesia/2 - , import_acl_mnesia/2 - , to_version/1 - ]). -export([ enable_telemetry/0 , disable_telemetry/0 @@ -217,73 +180,6 @@ get_metrics(Node) when Node =:= node() -> get_metrics(Node) -> rpc_call(Node, get_metrics, [Node]). -get_all_topic_metrics() -> - lists:foldl(fun(Topic, Acc) -> - case get_topic_metrics(Topic) of - {error, _Reason} -> - Acc; - Metrics -> - [#{topic => Topic, metrics => Metrics} | Acc] - end - end, [], emqx_mod_topic_metrics:all_registered_topics()). - -get_topic_metrics(Topic) -> - lists:foldl(fun(Node, Acc) -> - case get_topic_metrics(Node, Topic) of - {error, _Reason} -> - Acc; - Metrics -> - case Acc of - [] -> Metrics; - _ -> - lists:foldl(fun({K, V}, Acc0) -> - [{K, V + proplists:get_value(K, Metrics, 0)} | Acc0] - end, [], Acc) - end - end - end, [], ekka_mnesia:running_nodes()). - -get_topic_metrics(Node, Topic) when Node =:= node() -> - emqx_mod_topic_metrics:metrics(Topic); -get_topic_metrics(Node, Topic) -> - rpc_call(Node, get_topic_metrics, [Node, Topic]). - -register_topic_metrics(Topic) -> - Results = [register_topic_metrics(Node, Topic) || Node <- ekka_mnesia:running_nodes()], - case lists:any(fun(Item) -> Item =:= ok end, Results) of - true -> ok; - false -> lists:last(Results) - end. - -register_topic_metrics(Node, Topic) when Node =:= node() -> - emqx_mod_topic_metrics:register(Topic); -register_topic_metrics(Node, Topic) -> - rpc_call(Node, register_topic_metrics, [Node, Topic]). - -unregister_topic_metrics(Topic) -> - Results = [unregister_topic_metrics(Node, Topic) || Node <- ekka_mnesia:running_nodes()], - case lists:any(fun(Item) -> Item =:= ok end, Results) of - true -> ok; - false -> lists:last(Results) - end. - -unregister_topic_metrics(Node, Topic) when Node =:= node() -> - emqx_mod_topic_metrics:unregister(Topic); -unregister_topic_metrics(Node, Topic) -> - rpc_call(Node, unregister_topic_metrics, [Node, Topic]). - -unregister_all_topic_metrics() -> - Results = [unregister_all_topic_metrics(Node) || Node <- ekka_mnesia:running_nodes()], - case lists:any(fun(Item) -> Item =:= ok end, Results) of - true -> ok; - false -> lists:last(Results) - end. - -unregister_all_topic_metrics(Node) when Node =:= node() -> - emqx_mod_topic_metrics:unregister_all(); -unregister_all_topic_metrics(Node) -> - rpc_call(Node, unregister_topic_metrics, [Node]). - get_stats() -> [{Node, get_stats(Node)} || Node <- ekka_mnesia:running_nodes()]. @@ -504,33 +400,6 @@ reload_plugin(Node, Plugin) when Node =:= node() -> reload_plugin(Node, Plugin) -> rpc_call(Node, reload_plugin, [Node, Plugin]). - -%%-------------------------------------------------------------------- -%% Modules -%%-------------------------------------------------------------------- - -list_modules() -> - [{Node, list_modules(Node)} || Node <- ekka_mnesia:running_nodes()]. - -list_modules(Node) when Node =:= node() -> - emqx_modules:list(); -list_modules(Node) -> - rpc_call(Node, list_modules, [Node]). - -load_module(Node, Module) when Node =:= node() -> - emqx_modules:load(Module); -load_module(Node, Module) -> - rpc_call(Node, load_module, [Node, Module]). - -unload_module(Node, Module) when Node =:= node() -> - emqx_modules:unload(Module); -unload_module(Node, Module) -> - rpc_call(Node, unload_module, [Node, Module]). - -reload_module(Node, Module) when Node =:= node() -> - emqx_modules:reload(Module); -reload_module(Node, Module) -> - rpc_call(Node, reload_module, [Node, Module]). %%-------------------------------------------------------------------- %% Listeners %%-------------------------------------------------------------------- @@ -613,282 +482,6 @@ create_banned(Banned) -> delete_banned(Who) -> emqx_banned:delete(Who). -%%-------------------------------------------------------------------- -%% Data Export and Import -%%-------------------------------------------------------------------- - -export_rules() -> - lists:map(fun({_, RuleId, _, RawSQL, _, _, _, _, _, _, Actions, Enabled, Desc}) -> - [{id, RuleId}, - {rawsql, RawSQL}, - {actions, actions_to_prop_list(Actions)}, - {enabled, Enabled}, - {description, Desc}] - end, emqx_rule_registry:get_rules()). - -export_resources() -> - lists:map(fun({_, Id, Type, Config, CreatedAt, Desc}) -> - NCreatedAt = case CreatedAt of - undefined -> null; - _ -> CreatedAt - end, - [{id, Id}, - {type, Type}, - {config, maps:to_list(Config)}, - {created_at, NCreatedAt}, - {description, Desc}] - end, emqx_rule_registry:get_resources()). - -export_blacklist() -> - lists:map(fun(#banned{who = Who, by = By, reason = Reason, at = At, until = Until}) -> - NWho = case Who of - {peerhost, Peerhost} -> {peerhost, inet:ntoa(Peerhost)}; - _ -> Who - end, - [{who, [NWho]}, {by, By}, {reason, Reason}, {at, At}, {until, Until}] - end, ets:tab2list(emqx_banned)). - -export_applications() -> - lists:map(fun({_, AppID, AppSecret, Name, Desc, Status, Expired}) -> - [{id, AppID}, {secret, AppSecret}, {name, Name}, {desc, Desc}, {status, Status}, {expired, Expired}] - end, ets:tab2list(mqtt_app)). - -export_users() -> - lists:map(fun({_, Username, Password, Tags}) -> - [{username, Username}, {password, base64:encode(Password)}, {tags, Tags}] - end, ets:tab2list(mqtt_admin)). - -export_auth_mnesia() -> - case ets:info(emqx_user) of - undefined -> []; - _ -> - lists:map(fun({_, {Type, Login}, Password, CreatedAt}) -> - [{login, Login}, {type, Type}, {password, base64:encode(Password)}, {created_at, CreatedAt}] - end, ets:tab2list(emqx_user)) - end. - -export_acl_mnesia() -> - case ets:info(emqx_acl) of - undefined -> []; - _ -> - lists:map(fun({_, Filter, Action, Access, CreatedAt}) -> - Filter1 = case Filter of - {{Type, TypeValue}, Topic} -> - [{type, Type}, {type_value, TypeValue}, {topic, Topic}]; - {Type, Topic} -> - [{type, Type}, {topic, Topic}] - end, - Filter1 ++ [{action, Action}, {access, Access}, {created_at, CreatedAt}] - end, ets:tab2list(emqx_acl)) - end. - -import_rules(Rules) -> - lists:foreach(fun(Rule) -> - import_rule(Rule) - end, Rules). - -import_resources(Reources) -> - lists:foreach(fun(Resource) -> - import_resource(Resource) - end, Reources). - -import_rule(#{<<"id">> := RuleId, - <<"rawsql">> := RawSQL, - <<"actions">> := Actions, - <<"enabled">> := Enabled, - <<"description">> := Desc}) -> - Rule = #{id => RuleId, - rawsql => RawSQL, - actions => map_to_actions(Actions), - enabled => Enabled, - description => Desc}, - try emqx_rule_engine:create_rule(Rule) - catch throw:{resource_not_initialized, _ResId} -> - emqx_rule_engine:create_rule(Rule#{enabled => false}) - end. - -import_resource(#{<<"id">> := Id, - <<"type">> := Type, - <<"config">> := Config, - <<"created_at">> := CreatedAt, - <<"description">> := Desc}) -> - NCreatedAt = case CreatedAt of - null -> undefined; - _ -> CreatedAt - end, - emqx_rule_engine:create_resource(#{id => Id, - type => any_to_atom(Type), - config => Config, - created_at => NCreatedAt, - description => Desc}). - -import_resources_and_rules(Resources, Rules, FromVersion) - when FromVersion =:= "4.0" orelse FromVersion =:= "4.1" orelse FromVersion =:= "4.2" -> - Configs = lists:foldl(fun(#{<<"id">> := ID, - <<"type">> := <<"web_hook">>, - <<"config">> := #{<<"content_type">> := ContentType, - <<"headers">> := Headers, - <<"method">> := Method, - <<"url">> := URL}} = Resource, Acc) -> - NConfig = #{<<"connect_timeout">> => 5, - <<"request_timeout">> => 5, - <<"cacertfile">> => <<>>, - <<"certfile">> => <<>>, - <<"keyfile">> => <<>>, - <<"pool_size">> => 8, - <<"url">> => URL, - <<"verify">> => true}, - NResource = Resource#{<<"config">> := NConfig}, - import_resource(NResource), - NHeaders = maps:put(<<"content-type">>, ContentType, Headers), - [{ID, #{headers => NHeaders, method => Method}} | Acc]; - (Resource, Acc) -> - import_resource(Resource), - Acc - end, [], Resources), - lists:foreach(fun(#{<<"actions">> := Actions} = Rule) -> - NActions = apply_new_config(Actions, Configs), - import_rule(Rule#{<<"actions">> := NActions}) - end, Rules); -import_resources_and_rules(Resources, Rules, _FromVersion) -> - import_resources(Resources), - import_rules(Rules). - -apply_new_config(Actions, Configs) -> - apply_new_config(Actions, Configs, []). - -apply_new_config([], _Configs, Acc) -> - Acc; -apply_new_config([Action = #{<<"name">> := <<"data_to_webserver">>, - <<"args">> := #{<<"$resource">> := ID, - <<"path">> := Path, - <<"payload_tmpl">> := PayloadTmpl}} | More], Configs, Acc) -> - case proplists:get_value(ID, Configs, undefined) of - undefined -> - apply_new_config(More, Configs, [Action | Acc]); - #{headers := Headers, method := Method} -> - Args = #{<<"$resource">> => ID, - <<"body">> => PayloadTmpl, - <<"headers">> => Headers, - <<"method">> => Method, - <<"path">> => Path}, - apply_new_config(More, Configs, [Action#{<<"args">> := Args} | Acc]) - end. - -import_blacklist(Blacklist) -> - lists:foreach(fun(#{<<"who">> := Who, - <<"by">> := By, - <<"reason">> := Reason, - <<"at">> := At, - <<"until">> := Until}) -> - NWho = case Who of - #{<<"peerhost">> := Peerhost} -> - {ok, NPeerhost} = inet:parse_address(Peerhost), - {peerhost, NPeerhost}; - #{<<"clientid">> := ClientId} -> {clientid, ClientId}; - #{<<"username">> := Username} -> {username, Username} - end, - emqx_banned:create(#banned{who = NWho, by = By, reason = Reason, at = At, until = Until}) - end, Blacklist). - -import_applications(Apps) -> - lists:foreach(fun(#{<<"id">> := AppID, - <<"secret">> := AppSecret, - <<"name">> := Name, - <<"desc">> := Desc, - <<"status">> := Status, - <<"expired">> := Expired}) -> - NExpired = case is_integer(Expired) of - true -> Expired; - false -> undefined - end, - emqx_mgmt_auth:force_add_app(AppID, Name, AppSecret, Desc, Status, NExpired) - end, Apps). - -import_users(Users) -> - lists:foreach(fun(#{<<"username">> := Username, - <<"password">> := Password, - <<"tags">> := Tags}) -> - NPassword = base64:decode(Password), - emqx_dashboard_admin:force_add_user(Username, NPassword, Tags) - end, Users). - -import_auth_clientid(Lists) -> - case ets:info(emqx_user) of - undefined -> ok; - _ -> - [ mnesia:dirty_write({emqx_user, {clientid, Clientid}, base64:decode(Password), erlang:system_time(millisecond)}) - || #{<<"clientid">> := Clientid, <<"password">> := Password} <- Lists ] - end. - -import_auth_username(Lists) -> - case ets:info(emqx_user) of - undefined -> ok; - _ -> - [ mnesia:dirty_write({emqx_user, {username, Username}, base64:decode(Password), erlang:system_time(millisecond)}) - || #{<<"username">> := Username, <<"password">> := Password} <- Lists ] - end. - -import_auth_mnesia(Auths, FromVersion) when FromVersion =:= "4.0" orelse - FromVersion =:= "4.1" -> - case ets:info(emqx_user) of - undefined -> ok; - _ -> - CreatedAt = erlang:system_time(millisecond), - [ begin - mnesia:dirty_write({emqx_user, {username, Login}, base64:decode(Password), CreatedAt}) - end - || #{<<"login">> := Login, - <<"password">> := Password} <- Auths ] - - end; - -import_auth_mnesia(Auths, _) -> - case ets:info(emqx_user) of - undefined -> ok; - _ -> - [ mnesia:dirty_write({emqx_user, {any_to_atom(Type), Login}, base64:decode(Password), CreatedAt}) - || #{<<"login">> := Login, - <<"type">> := Type, - <<"password">> := Password, - <<"created_at">> := CreatedAt } <- Auths ] - end. - -import_acl_mnesia(Acls, FromVersion) when FromVersion =:= "4.0" orelse - FromVersion =:= "4.1" -> - case ets:info(emqx_acl) of - undefined -> ok; - _ -> - CreatedAt = erlang:system_time(millisecond), - [begin - Allow1 = case any_to_atom(Allow) of - true -> allow; - false -> deny - end, - mnesia:dirty_write({emqx_acl, {{username, Login}, Topic}, any_to_atom(Action), Allow1, CreatedAt}) - end || #{<<"login">> := Login, - <<"topic">> := Topic, - <<"allow">> := Allow, - <<"action">> := Action} <- Acls] - end; - -import_acl_mnesia(Acls, _) -> - case ets:info(emqx_acl) of - undefined -> ok; - _ -> - [ begin - Filter = case maps:get(<<"type_value">>, Map, undefined) of - undefined -> - {any_to_atom(maps:get(<<"type">>, Map)), maps:get(<<"topic">>, Map)}; - Value -> - {{any_to_atom(maps:get(<<"type">>, Map)), Value}, maps:get(<<"topic">>, Map)} - end, - mnesia:dirty_write({emqx_acl ,Filter, any_to_atom(Action), any_to_atom(Access), CreatedAt}) - end - || Map = #{<<"action">> := Action, - <<"access">> := Access, - <<"created_at">> := CreatedAt} <- Acls ] - end. any_to_atom(L) when is_list(L) -> list_to_atom(L); any_to_atom(B) when is_binary(B) -> binary_to_atom(B, utf8); @@ -985,4 +578,3 @@ action_to_prop_list({action_instance, ActionInstId, Name, FallbackActions, Args} {name, Name}, {fallbacks, actions_to_prop_list(FallbackActions)}, {args, Args}]. - diff --git a/lib-ce/emqx_management/src/emqx_mgmt_api_data.erl b/lib-ce/emqx_management/src/emqx_mgmt_api_data.erl index 723824d99..d3d22e4d1 100644 --- a/lib-ce/emqx_management/src/emqx_mgmt_api_data.erl +++ b/lib-ce/emqx_management/src/emqx_mgmt_api_data.erl @@ -75,48 +75,8 @@ ]). export(_Bindings, _Params) -> - Rules = emqx_mgmt:export_rules(), - Resources = emqx_mgmt:export_resources(), - Blacklist = emqx_mgmt:export_blacklist(), - Apps = emqx_mgmt:export_applications(), - Users = emqx_mgmt:export_users(), - AuthMnesia = emqx_mgmt:export_auth_mnesia(), - AclMnesia = emqx_mgmt:export_acl_mnesia(), - Seconds = erlang:system_time(second), - {{Y, M, D}, {H, MM, S}} = emqx_mgmt_util:datetime(Seconds), - Filename = io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S]), - NFilename = filename:join([emqx:get_env(data_dir), Filename]), - Version = string:sub_string(emqx_sys:version(), 1, 3), - Data = [{version, erlang:list_to_binary(Version)}, - {date, erlang:list_to_binary(emqx_mgmt_util:strftime(Seconds))}, - {rules, Rules}, - {resources, Resources}, - {blacklist, Blacklist}, - {apps, Apps}, - {users, Users}, - {auth_mnesia, AuthMnesia}, - {acl_mnesia, AclMnesia} - ], - - Bin = emqx_json:encode(Data), - ok = filelib:ensure_dir(NFilename), - case file:write_file(NFilename, Bin) of - ok -> - case file:read_file_info(NFilename) of - {ok, #file_info{size = Size, ctime = {{Y0, M0, D0}, {H0, MM0, S0}}}} -> - CreatedAt = io_lib:format("~p-~p-~p ~p:~p:~p", [Y0, M0, D0, H0, MM0, S0]), - return({ok, [{filename, list_to_binary(Filename)}, - {size, Size}, - {created_at, list_to_binary(CreatedAt)}, - {node, node()} - ]}); - {error, Reason} -> - return({error, Reason}) - end; - {error, Reason} -> - return({error, Reason}) - end. - + return(emqx_mgmt_data_backup:export()). + list_exported(_Bindings, _Params) -> List = [ rpc:call(Node, ?MODULE, get_list_exported, []) || Node <- ekka_mnesia:running_nodes() ], NList = lists:map(fun({_, FileInfo}) -> FileInfo end, lists:keysort(1, lists:append(List))), @@ -158,7 +118,7 @@ import(_Bindings, Params) -> case lists:member(Node, [ erlang:atom_to_binary(N, utf8) || N <- ekka_mnesia:running_nodes() ] ) of - true -> rpc:call(erlang:binary_to_atom(Node, utf8), ?MODULE, do_import, [Filename]); + true -> return(rpc:call(erlang:binary_to_atom(Node, utf8), ?MODULE, do_import, [Filename])); false -> return({error, no_existent_node}) end end, @@ -167,34 +127,7 @@ import(_Bindings, Params) -> do_import(Filename) -> FullFilename = filename:join([emqx:get_env(data_dir), Filename]), - case file:read_file(FullFilename) of - {ok, Json} -> - Data = emqx_json:decode(Json, [return_maps]), - Version = emqx_mgmt:to_version(maps:get(<<"version">>, Data)), - case lists:member(Version, ?VERSIONS) of - true -> - try - emqx_mgmt:import_resources_and_rules(maps:get(<<"resources">>, Data, []), maps:get(<<"rules">>, Data, []), Version), - emqx_mgmt:import_blacklist(maps:get(<<"blacklist">>, Data, [])), - emqx_mgmt:import_applications(maps:get(<<"apps">>, Data, [])), - emqx_mgmt:import_users(maps:get(<<"users">>, Data, [])), - _ = emqx_mgmt:import_auth_clientid(maps:get(<<"auth_clientid">>, Data, [])), - _ = emqx_mgmt:import_auth_username(maps:get(<<"auth_username">>, Data, [])), - _ = emqx_mgmt:import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, []), Version), - _ = emqx_mgmt:import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, []), Version), - logger:debug("The emqx data has been imported successfully"), - ok - catch Class:Reason:Stack -> - logger:error("The emqx data import failed: ~0p", [{Class,Reason,Stack}]), - {error, import_failed} - end; - false -> - logger:error("Unsupported version: ~p", [Version]), - {error, unsupported_version} - end; - {error, Reason} -> - {error, Reason} - end. + emqx_mgmt_data_backup:import(FullFilename). download(#{filename := Filename}, _Params) -> FullFilename = filename:join([emqx:get_env(data_dir), Filename]), diff --git a/lib-ce/emqx_management/src/emqx_mgmt_cli.erl b/lib-ce/emqx_management/src/emqx_mgmt_cli.erl index 8f3a09026..d2712b5e1 100644 --- a/lib-ce/emqx_management/src/emqx_mgmt_cli.erl +++ b/lib-ce/emqx_management/src/emqx_mgmt_cli.erl @@ -41,7 +41,6 @@ , log/1 , mgmt/1 , data/1 - , modules/1 ]). -define(PROC_INFOKEYS, [status, @@ -322,44 +321,6 @@ plugins(_) -> {"plugins reload ", "Reload plugin"} ]). -%%-------------------------------------------------------------------- -%% @doc Modules Command -modules(["list"]) -> - foreach(fun(Module) -> print({module, Module}) end, emqx_modules:list()); - -modules(["load", Name]) -> - case emqx_modules:load(list_to_atom(Name)) of - ok -> - emqx_ctl:print("Module ~s loaded successfully.~n", [Name]); - {error, Reason} -> - emqx_ctl:print("Load module ~s error: ~p.~n", [Name, Reason]) - end; - -modules(["unload", Name]) -> - case emqx_modules:unload(list_to_atom(Name)) of - ok -> - emqx_ctl:print("Module ~s unloaded successfully.~n", [Name]); - {error, Reason} -> - emqx_ctl:print("Unload module ~s error: ~p.~n", [Name, Reason]) - end; - -modules(["reload", "emqx_mod_acl_internal" = Name]) -> - case emqx_modules:reload(list_to_atom(Name)) of - ok -> - emqx_ctl:print("Module ~s reloaded successfully.~n", [Name]); - {error, Reason} -> - emqx_ctl:print("Reload module ~s error: ~p.~n", [Name, Reason]) - end; -modules(["reload", Name]) -> - emqx_ctl:print("Module: ~p does not need to be reloaded.~n", [Name]); - -modules(_) -> - emqx_ctl:usage([{"modules list", "Show loaded modules"}, - {"modules load ", "Load module"}, - {"modules unload ", "Unload module"}, - {"modules reload ", "Reload module"} - ]). - %%-------------------------------------------------------------------- %% @doc vm command @@ -582,59 +543,21 @@ stop_listener(#{listen_on := ListenOn} = Listener, _Input) -> %% @doc data Command data(["export"]) -> - Rules = emqx_mgmt:export_rules(), - Resources = emqx_mgmt:export_resources(), - Blacklist = emqx_mgmt:export_blacklist(), - Apps = emqx_mgmt:export_applications(), - Users = emqx_mgmt:export_users(), - AuthMnesia = emqx_mgmt:export_auth_mnesia(), - AclMnesia = emqx_mgmt:export_acl_mnesia(), - Seconds = erlang:system_time(second), - {{Y, M, D}, {H, MM, S}} = emqx_mgmt_util:datetime(Seconds), - Filename = io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S]), - NFilename = filename:join([emqx:get_env(data_dir), Filename]), - Version = string:sub_string(emqx_sys:version(), 1, 3), - Data = [{version, erlang:list_to_binary(Version)}, - {date, erlang:list_to_binary(emqx_mgmt_util:strftime(Seconds))}, - {rules, Rules}, - {resources, Resources}, - {blacklist, Blacklist}, - {apps, Apps}, - {users, Users}, - {auth_mnesia, AuthMnesia}, - {acl_mnesia, AclMnesia}], - ok = filelib:ensure_dir(NFilename), - case file:write_file(NFilename, emqx_json:encode(Data)) of - ok -> + case emqx_mgmt_data_backup:export() of + {ok, _} -> emqx_ctl:print("The emqx data has been successfully exported to ~s.~n", [NFilename]); {error, Reason} -> emqx_ctl:print("The emqx data export failed due to ~p.~n", [Reason]) - end; + end; data(["import", Filename]) -> - case file:read_file(Filename) of - {ok, Json} -> - Data = emqx_json:decode(Json, [return_maps]), - Version = emqx_mgmt:to_version(maps:get(<<"version">>, Data)), - case lists:member(Version, ?VERSIONS) of - true -> - try - emqx_mgmt:import_resources(maps:get(<<"resources">>, Data, [])), - emqx_mgmt:import_rules(maps:get(<<"rules">>, Data, [])), - emqx_mgmt:import_blacklist(maps:get(<<"blacklist">>, Data, [])), - emqx_mgmt:import_applications(maps:get(<<"apps">>, Data, [])), - emqx_mgmt:import_users(maps:get(<<"users">>, Data, [])), - _ = emqx_mgmt:import_auth_clientid(maps:get(<<"auth_clientid">>, Data, [])), - _ = emqx_mgmt:import_auth_username(maps:get(<<"auth_username">>, Data, [])), - _ = emqx_mgmt:import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, []), Version), - _ = emqx_mgmt:import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, []), Version), - emqx_ctl:print("The emqx data has been imported successfully.~n") - catch Class:Reason:Stack -> - emqx_ctl:print("The emqx data import failed due: ~0p~n", [{Class,Reason,Stack}]) - end; - false -> - emqx_ctl:print("Unsupported version: ~p~n", [Version]) - end; + case emqx_mgmt_data_backup:import(Filename) of + ok -> + emqx_ctl:print("The emqx data has been imported successfully.~n"); + {error, import_failed} -> + emqx_ctl:print("The emqx data import failed due: ~0p~n", [{Class,Reason,Stack}]); + {error, unsupported_version} -> + emqx_ctl:print("Unsupported version: ~p~n", [Version]); {error, Reason} -> emqx_ctl:print("The emqx data import failed: ~0p while reading ~s.~n", [Reason, Filename]) end; diff --git a/lib-ce/emqx_management/src/emqx_mgmt_data_backup.erl b/lib-ce/emqx_management/src/emqx_mgmt_data_backup.erl new file mode 100644 index 000000000..c2e4be94e --- /dev/null +++ b/lib-ce/emqx_management/src/emqx_mgmt_data_backup.erl @@ -0,0 +1,557 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mgmt_data_backup). + +-ifdef(EMQX_ENTERPISE). +-export([ export_modules/0 + , export_schemas/0 + , export_confs/0 + , import_modules/1 + , import_schemas/1 + , import_confs/2 + ]). +-else. +-export([ import_resources_and_rules/3 ]). +-endif. + +-export([ export_rules/0 + , export_resources/0 + , export_blacklist/0 + , export_applications/0 + , export_users/0 + , export_auth_mnesia/0 + , export_acl_mnesia/0 + , import_rules/1 + , import_resources/1 + , import_blacklist/1 + , import_applications/1 + , import_users/1 + , import_auth_clientid/1 %% BACKW: 4.1.x + , import_auth_username/1 %% BACKW: 4.1.x + , import_auth_mnesia/2 + , import_acl_mnesia/2 + , to_version/1 + ]). + +-export([ export/0 + , import/1 + ]). + +%%-------------------------------------------------------------------- +%% Data Export and Import +%%-------------------------------------------------------------------- + +export_rules() -> + lists:map(fun({_, RuleId, _, RawSQL, _, _, _, _, _, _, Actions, Enabled, Desc}) -> + [{id, RuleId}, + {rawsql, RawSQL}, + {actions, actions_to_prop_list(Actions)}, + {enabled, Enabled}, + {description, Desc}] + end, emqx_rule_registry:get_rules()). + +export_resources() -> + lists:map(fun({_, Id, Type, Config, CreatedAt, Desc}) -> + NCreatedAt = case CreatedAt of + undefined -> null; + _ -> CreatedAt + end, + [{id, Id}, + {type, Type}, + {config, maps:to_list(Config)}, + {created_at, NCreatedAt}, + {description, Desc}] + end, emqx_rule_registry:get_resources()). + +export_blacklist() -> + lists:map(fun(#banned{who = Who, by = By, reason = Reason, at = At, until = Until}) -> + NWho = case Who of + {peerhost, Peerhost} -> {peerhost, inet:ntoa(Peerhost)}; + _ -> Who + end, + [{who, [NWho]}, {by, By}, {reason, Reason}, {at, At}, {until, Until}] + end, ets:tab2list(emqx_banned)). + +export_applications() -> + lists:map(fun({_, AppID, AppSecret, Name, Desc, Status, Expired}) -> + [{id, AppID}, {secret, AppSecret}, {name, Name}, {desc, Desc}, {status, Status}, {expired, Expired}] + end, ets:tab2list(mqtt_app)). + +export_users() -> + lists:map(fun({_, Username, Password, Tags}) -> + [{username, Username}, {password, base64:encode(Password)}, {tags, Tags}] + end, ets:tab2list(mqtt_admin)). + +export_auth_mnesia() -> + case ets:info(emqx_user) of + undefined -> []; + _ -> + lists:map(fun({_, {Type, Login}, Password, CreatedAt}) -> + [{login, Login}, {type, Type}, {password, base64:encode(Password)}, {created_at, CreatedAt}] + end, ets:tab2list(emqx_user)) + end. + +export_acl_mnesia() -> + case ets:info(emqx_acl) of + undefined -> []; + _ -> + lists:map(fun({_, Filter, Action, Access, CreatedAt}) -> + Filter1 = case Filter of + {{Type, TypeValue}, Topic} -> + [{type, Type}, {type_value, TypeValue}, {topic, Topic}]; + {Type, Topic} -> + [{type, Type}, {topic, Topic}] + end, + Filter1 ++ [{action, Action}, {access, Access}, {created_at, CreatedAt}] + end, ets:tab2list(emqx_acl)) + end. + +-ifdef(EMQX_ENTERPISE). +export_modules() -> + case ets:info(emqx_modules) of + undefined -> []; + _ -> + lists:map(fun({_, Id, Type, Config, Enabled, CreatedAt, Description}) -> + [{id, Id}, + {type, Type}, + {config, Config}, + {enabled, Enabled}, + {created_at, CreatedAt}, + {description, Description} + ] + end, ets:tab2list(emqx_modules)) + end. + +export_schemas() -> + case ets:info(emqx_schema) of + undefined -> []; + _ -> + [emqx_schema_api:format_schema(Schema) || Schema <- emqx_schema_registry:get_all_schemas()] + end. + +export_confs() -> + case ets:info(emqx_conf_info) of + undefined -> {[], []}; + _ -> + {lists:map(fun({_, Key, Confs}) -> + case Key of + {_Zone, Name} -> + [{zone, list_to_binary(Name)}, + {confs, confs_to_binary(Confs)}]; + {_Listener, Type, Name} -> + [{type, list_to_binary(Type)}, + {name, list_to_binary(Name)}, + {confs, confs_to_binary(Confs)}]; + Name -> + [{name, list_to_binary(Name)}, + {confs, confs_to_binary(Confs)}] + end + end, ets:tab2list(emqx_conf_b)), + lists:map(fun({_, {_Listener, Type, Name}, Status}) -> + [{type, list_to_binary(Type)}, + {name, list_to_binary(Name)}, + {status, Status}] + end, ets:tab2list(emqx_listeners_state))} + end. + +confs_to_binary(Confs) -> + [{list_to_binary(Key), list_to_binary(Val)} || {Key, Val} <-Confs]. + +-endif. + +import_rules(Rules) -> + lists:foreach(fun(Resource) -> + import_resource(Resource) + end, Rules). +import_rule(#{<<"id">> := RuleId, + <<"rawsql">> := RawSQL, + <<"actions">> := Actions, + <<"enabled">> := Enabled, + <<"description">> := Desc}) -> + Rule = #{id => RuleId, + rawsql => RawSQL, + actions => map_to_actions(Actions), + enabled => Enabled, + description => Desc}, + try emqx_rule_engine:create_rule(Rule) + catch throw:{resource_not_initialized, _ResId} -> + emqx_rule_engine:create_rule(Rule#{enabled => false}) + end. + +import_resources(Reources) -> + lists:foreach(fun(Resource) -> + import_resource(Resource) + end, Reources). + +import_resource(#{<<"id">> := Id, + <<"type">> := Type, + <<"config">> := Config, + <<"created_at">> := CreatedAt, + <<"description">> := Desc}) -> + NCreatedAt = case CreatedAt of + null -> undefined; + _ -> CreatedAt + end, + emqx_rule_engine:create_resource(#{id => Id, + type => any_to_atom(Type), + config => Config, + created_at => NCreatedAt, + description => Desc}). + +-ifndef(EMQX_ENTERPRISE). +import_resources_and_rules(Resources, Rules, FromVersion) + when FromVersion =:= "4.0" orelse FromVersion =:= "4.1" orelse FromVersion =:= "4.2" -> + Configs = lists:foldl(fun(#{<<"id">> := ID, + <<"type">> := <<"web_hook">>, + <<"config">> := #{<<"content_type">> := ContentType, + <<"headers">> := Headers, + <<"method">> := Method, + <<"url">> := URL}} = Resource, Acc) -> + NConfig = #{<<"connect_timeout">> => 5, + <<"request_timeout">> => 5, + <<"cacertfile">> => <<>>, + <<"certfile">> => <<>>, + <<"keyfile">> => <<>>, + <<"pool_size">> => 8, + <<"url">> => URL, + <<"verify">> => true}, + NResource = Resource#{<<"config">> := NConfig}, + import_resource(NResource), + NHeaders = maps:put(<<"content-type">>, ContentType, Headers), + [{ID, #{headers => NHeaders, method => Method}} | Acc]; + (Resource, Acc) -> + import_resource(Resource), + Acc + end, [], Resources), + lists:foreach(fun(#{<<"actions">> := Actions} = Rule) -> + NActions = apply_new_config(Actions, Configs), + import_rule(Rule#{<<"actions">> := NActions}) + end, Rules); +import_resources_and_rules(Resources, Rules, _FromVersion) -> + import_resources(Resources), + import_rules(Rules). + +apply_new_config(Actions, Configs) -> + apply_new_config(Actions, Configs, []). + +apply_new_config([], _Configs, Acc) -> + Acc; +apply_new_config([Action = #{<<"name">> := <<"data_to_webserver">>, + <<"args">> := #{<<"$resource">> := ID, + <<"path">> := Path, + <<"payload_tmpl">> := PayloadTmpl}} | More], Configs, Acc) -> + case proplists:get_value(ID, Configs, undefined) of + undefined -> + apply_new_config(More, Configs, [Action | Acc]); + #{headers := Headers, method := Method} -> + Args = #{<<"$resource">> => ID, + <<"body">> => PayloadTmpl, + <<"headers">> => Headers, + <<"method">> => Method, + <<"path">> => Path}, + apply_new_config(More, Configs, [Action#{<<"args">> := Args} | Acc]) + end. + + +-endif. + +import_blacklist(Blacklist) -> + lists:foreach(fun(#{<<"who">> := Who, + <<"by">> := By, + <<"reason">> := Reason, + <<"at">> := At, + <<"until">> := Until}) -> + NWho = case Who of + #{<<"peerhost">> := Peerhost} -> + {ok, NPeerhost} = inet:parse_address(Peerhost), + {peerhost, NPeerhost}; + #{<<"clientid">> := ClientId} -> {clientid, ClientId}; + #{<<"username">> := Username} -> {username, Username} + end, + emqx_banned:create(#banned{who = NWho, by = By, reason = Reason, at = At, until = Until}) + end, Blacklist). + +import_applications(Apps) -> + lists:foreach(fun(#{<<"id">> := AppID, + <<"secret">> := AppSecret, + <<"name">> := Name, + <<"desc">> := Desc, + <<"status">> := Status, + <<"expired">> := Expired}) -> + NExpired = case is_integer(Expired) of + true -> Expired; + false -> undefined + end, + emqx_mgmt_auth:force_add_app(AppID, Name, AppSecret, Desc, Status, NExpired) + end, Apps). + +import_users(Users) -> + lists:foreach(fun(#{<<"username">> := Username, + <<"password">> := Password, + <<"tags">> := Tags}) -> + NPassword = base64:decode(Password), + emqx_dashboard_admin:force_add_user(Username, NPassword, Tags) + end, Users). + +import_auth_clientid(Lists) -> + case ets:info(emqx_user) of + undefined -> ok; + _ -> + lists:foreach(fun(#{<<"clientid">> := Clientid, <<"password">> := Password}) -> + mnesia:dirty_write({emqx_user, {clientid, Clientid}, base64:decode(Password), erlang:system_time(millisecond)}) + end, Lists) + end. + +import_auth_username(Lists) -> + case ets:info(emqx_user) of + undefined -> ok; + _ -> + lists:foreach(fun(#{<<"username">> := Username, <<"password">> := Password}) -> + mnesia:dirty_write({emqx_user, {username, Username}, base64:decode(Password), erlang:system_time(millisecond)}) + end, Lists) + end. + +import_auth_mnesia(Auths, FromVersion) when FromVersion =:= "4.0" orelse + FromVersion =:= "4.1" -> + case ets:info(emqx_user) of + undefined -> ok; + _ -> + CreatedAt = erlang:system_time(millisecond), + lists:foreach(fun(#{<<"login">> := Login, + <<"password">> := Password}) -> + mnesia:dirty_write({emqx_user, {username, Login}, base64:decode(Password), CreatedAt}) + end, Auths) + end; + +import_auth_mnesia(Auths, _) -> + case ets:info(emqx_user) of + undefined -> ok; + _ -> + lists:foreach(fun(#{<<"login">> := Login, + <<"type">> := Type, + <<"password">> := Password, + <<"created_at">> := CreatedAt }) -> + mnesia:dirty_write({emqx_user, {any_to_atom(Type), Login}, base64:decode(Password), CreatedAt}) + end, Auths) + end. + +import_acl_mnesia(Acls, FromVersion) when FromVersion =:= "4.0" orelse + FromVersion =:= "4.1" -> + case ets:info(emqx_acl) of + undefined -> ok; + _ -> + CreatedAt = erlang:system_time(millisecond), + lists:foreach(fun(#{<<"login">> := Login, + <<"topic">> := Topic, + <<"allow">> := Allow, + <<"action">> := Action}) -> + Allow1 = case any_to_atom(Allow) of + true -> allow; + false -> deny + end, + mnesia:dirty_write({emqx_acl, {{username, Login}, Topic}, any_to_atom(Action), Allow1, CreatedAt}) + end, Acls) + end; + +import_acl_mnesia(Acls, _) -> + case ets:info(emqx_acl) of + undefined -> ok; + _ -> + lists:foreach(fun(Map = #{<<"action">> := Action, + <<"access">> := Access, + <<"created_at">> := CreatedAt}) -> + Filter = case maps:get(<<"type_value">>, Map, undefined) of + undefined -> + {any_to_atom(maps:get(<<"type">>, Map)), maps:get(<<"topic">>, Map)}; + Value -> + {{any_to_atom(maps:get(<<"type">>, Map)), Value}, maps:get(<<"topic">>, Map)} + end, + mnesia:dirty_write({emqx_acl ,Filter, any_to_atom(Action), any_to_atom(Access), CreatedAt}) + end, Acls) + end. + +-ifdef(EMQX_ENTERPRISE). +import_modules(Modules) -> + case ets:info(emqx_modules) of + undefined -> []; + _ -> + lists:foreach(fun(#{<<"id">> := Id, + <<"type">> := Type, + <<"config">> := Config, + <<"enabled">> := Enabled, + <<"created_at">> := CreatedAt, + <<"description">> := Description}) -> + emqx_modules:import_module({Id, any_to_atom(Type), Config, Enabled, CreatedAt, Description}) + end, Modules) + end. + + +import_schemas(Schemas) -> + case ets:info(emqx_schema) of + undefined -> ok; + _ -> [emqx_schema_registry:add_schema(emqx_schema_api:make_schema_params(Schema)) || Schema <- Schemas] + end. + +import_confs(Configs, ListenersState) -> + case ets:info(emqx_conf_info) of + undefined -> ok; + _ -> + emqx_conf:import_confs(Configs, ListenersState) + end. + +-endif. + +-ifdef(EMQX_ENTERPRISE). +export() -> + Modules = export_modules(), + Rules = export_rules(), + Resources = export_resources(), + Blacklist = export_blacklist(), + Apps = export_applications(), + Users = export_users(), + AuthMnesia = export_auth_mnesia(), + AclMnesia = export_acl_mnesia(), + Schemas = export_schemas(), + {Configs, State} = export_confs(), + Seconds = erlang:system_time(second), + {{Y, M, D}, {H, MM, S}} = emqx_mgmt_util:datetime(Seconds), + Filename = io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S]), + NFilename = filename:join([emqx:get_env(data_dir), Filename]), + Version = string:sub_string(emqx_sys:version(), 1, 3), + Data = [{version, erlang:list_to_binary(Version)}, + {date, erlang:list_to_binary(emqx_mgmt_util:strftime(Seconds))}, + {modules, Modules}, + {rules, Rules}, + {resources, Resources}, + {blacklist, Blacklist}, + {apps, Apps}, + {users, Users}, + {auth_mnesia, AuthMnesia}, + {acl_mnesia, AclMnesia}, + {schemas, Schemas}, + {configs, Configs}, + {listeners_state, State}], + write_file(NFilename, Data). + +import(Filename) -> + case file:read_file(FullFilename) of + {ok, Json} -> + Data = emqx_json:decode(Json, [return_maps]), + Version = to_version(maps:get(<<"version">>, Data)), + case lists:member(Version, ?VERSIONS) of + true -> + try + import_confs(maps:get(<<"configs">>, Data, []), maps:get(<<"listeners_state">>, Data, [])), + import_resources(maps:get(<<"resources">>, Data, [])), + import_rules(maps:get(<<"rules">>, Data, [])), + import_blacklist(maps:get(<<"blacklist">>, Data, [])), + import_applications(maps:get(<<"apps">>, Data, [])), + import_users(maps:get(<<"users">>, Data, [])), + import_modules(maps:get(<<"modules">>, Data, [])), + import_auth_clientid(maps:get(<<"auth_clientid">>, Data, [])), + import_auth_username(maps:get(<<"auth_username">>, Data, [])), + import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, []), Version), + import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, []), Version), + import_schemas(maps:get(<<"schemas">>, Data, [])), + logger:debug("The emqx data has been imported successfully"), + ok + catch Class:Reason:Stack -> + logger:error("The emqx data import failed: ~0p", [{Class,Reason,Stack}]), + {error, import_failed} + end; + false -> + logger:error("Unsupported version: ~p", [Version]), + {error, unsupported_version} + end; + {error, Reason} -> + {error, Reason} + end. + +-else. +export() -> + Rules = export_rules(), + Resources = export_resources(), + Blacklist = export_blacklist(), + Apps = export_applications(), + Users = export_users(), + AuthMnesia = export_auth_mnesia(), + AclMnesia = export_acl_mnesia(), + Seconds = erlang:system_time(second), + {{Y, M, D}, {H, MM, S}} = emqx_mgmt_util:datetime(Seconds), + Filename = io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S]), + NFilename = filename:join([emqx:get_env(data_dir), Filename]), + Version = string:sub_string(emqx_sys:version(), 1, 3), + Data = [{version, erlang:list_to_binary(Version)}, + {date, erlang:list_to_binary(emqx_mgmt_util:strftime(Seconds))}, + {rules, Rules}, + {resources, Resources}, + {blacklist, Blacklist}, + {apps, Apps}, + {users, Users}, + {auth_mnesia, AuthMnesia}, + {acl_mnesia, AclMnesia}], + write_file(NFilename, Data). + +import(Filename) -> + case file:read_file(FullFilename) of + {ok, Json} -> + Data = emqx_json:decode(Json, [return_maps]), + Version = to_version(maps:get(<<"version">>, Data)), + case lists:member(Version, ?VERSIONS) of + true -> + try + import_resources_and_rules(maps:get(<<"resources">>, Data, []), maps:get(<<"rules">>, Data, []), Version), + import_blacklist(maps:get(<<"blacklist">>, Data, [])), + import_applications(maps:get(<<"apps">>, Data, [])), + import_users(maps:get(<<"users">>, Data, [])), + import_auth_clientid(maps:get(<<"auth_clientid">>, Data, [])), + import_auth_username(maps:get(<<"auth_username">>, Data, [])), + import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, []), Version), + import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, []), Version), + logger:debug("The emqx data has been imported successfully"), + ok + catch Class:Reason:Stack -> + logger:error("The emqx data import failed: ~0p", [{Class,Reason,Stack}]), + {error, import_failed} + end; + false -> + logger:error("Unsupported version: ~p", [Version]), + {error, unsupported_version} + end; + {error, Reason} -> + {error, Reason} + end. +-endif. + +write_file(Filename, Data) -> + ok = filelib:ensure_dir(Filename), + case file:write_file(Filename, emqx_json:encode(Data)) of + ok -> + case file:read_file_info(Filename) of + {ok, #file_info{size = Size, ctime = {{Y, M, D}, {H, MM, S}}}} -> + CreatedAt = io_lib:format("~p-~p-~p ~p:~p:~p", [Y, M, D, H, MM, S]), + {ok, [{filename, list_to_binary(Filename)}, + {size, Size}, + {created_at, list_to_binary(CreatedAt)}, + {node, node()} + ]}; + {error, Reason} -> + {error, Reason} + end; + {error, Reason} -> + {error, Reason} + end.