chore(emqx_management): abstract data backup

This commit is contained in:
zhanghongtong 2021-02-25 14:07:42 +08:00 committed by turtleDeng
parent 4e9330a37e
commit 033f8619eb
4 changed files with 571 additions and 566 deletions

View File

@ -36,15 +36,6 @@
%% Metrics and Stats %% Metrics and Stats
-export([ get_metrics/0 -export([ get_metrics/0
, get_metrics/1 , get_metrics/1
, get_all_topic_metrics/0
, get_topic_metrics/1
, get_topic_metrics/2
, register_topic_metrics/1
, register_topic_metrics/2
, unregister_topic_metrics/1
, unregister_topic_metrics/2
, unregister_all_topic_metrics/0
, unregister_all_topic_metrics/1
, get_stats/0 , get_stats/0
, get_stats/1 , get_stats/1
]). ]).
@ -91,14 +82,6 @@
, reload_plugin/2 , reload_plugin/2
]). ]).
%% Modules
-export([ list_modules/0
, list_modules/1
, load_module/2
, unload_module/2
, reload_module/2
]).
%% Listeners %% Listeners
-export([ list_listeners/0 -export([ list_listeners/0
, list_listeners/1 , list_listeners/1
@ -118,26 +101,6 @@
, delete_banned/1 , delete_banned/1
]). ]).
%% Export/Import
-export([ export_rules/0
, export_resources/0
, export_blacklist/0
, export_applications/0
, export_users/0
, export_auth_mnesia/0
, export_acl_mnesia/0
, import_rules/1
, import_resources/1
, import_resources_and_rules/3
, import_blacklist/1
, import_applications/1
, import_users/1
, import_auth_clientid/1 %% BACKW: 4.1.x
, import_auth_username/1 %% BACKW: 4.1.x
, import_auth_mnesia/2
, import_acl_mnesia/2
, to_version/1
]).
-export([ enable_telemetry/0 -export([ enable_telemetry/0
, disable_telemetry/0 , disable_telemetry/0
@ -217,73 +180,6 @@ get_metrics(Node) when Node =:= node() ->
get_metrics(Node) -> get_metrics(Node) ->
rpc_call(Node, get_metrics, [Node]). rpc_call(Node, get_metrics, [Node]).
get_all_topic_metrics() ->
lists:foldl(fun(Topic, Acc) ->
case get_topic_metrics(Topic) of
{error, _Reason} ->
Acc;
Metrics ->
[#{topic => Topic, metrics => Metrics} | Acc]
end
end, [], emqx_mod_topic_metrics:all_registered_topics()).
get_topic_metrics(Topic) ->
lists:foldl(fun(Node, Acc) ->
case get_topic_metrics(Node, Topic) of
{error, _Reason} ->
Acc;
Metrics ->
case Acc of
[] -> Metrics;
_ ->
lists:foldl(fun({K, V}, Acc0) ->
[{K, V + proplists:get_value(K, Metrics, 0)} | Acc0]
end, [], Acc)
end
end
end, [], ekka_mnesia:running_nodes()).
get_topic_metrics(Node, Topic) when Node =:= node() ->
emqx_mod_topic_metrics:metrics(Topic);
get_topic_metrics(Node, Topic) ->
rpc_call(Node, get_topic_metrics, [Node, Topic]).
register_topic_metrics(Topic) ->
Results = [register_topic_metrics(Node, Topic) || Node <- ekka_mnesia:running_nodes()],
case lists:any(fun(Item) -> Item =:= ok end, Results) of
true -> ok;
false -> lists:last(Results)
end.
register_topic_metrics(Node, Topic) when Node =:= node() ->
emqx_mod_topic_metrics:register(Topic);
register_topic_metrics(Node, Topic) ->
rpc_call(Node, register_topic_metrics, [Node, Topic]).
unregister_topic_metrics(Topic) ->
Results = [unregister_topic_metrics(Node, Topic) || Node <- ekka_mnesia:running_nodes()],
case lists:any(fun(Item) -> Item =:= ok end, Results) of
true -> ok;
false -> lists:last(Results)
end.
unregister_topic_metrics(Node, Topic) when Node =:= node() ->
emqx_mod_topic_metrics:unregister(Topic);
unregister_topic_metrics(Node, Topic) ->
rpc_call(Node, unregister_topic_metrics, [Node, Topic]).
unregister_all_topic_metrics() ->
Results = [unregister_all_topic_metrics(Node) || Node <- ekka_mnesia:running_nodes()],
case lists:any(fun(Item) -> Item =:= ok end, Results) of
true -> ok;
false -> lists:last(Results)
end.
unregister_all_topic_metrics(Node) when Node =:= node() ->
emqx_mod_topic_metrics:unregister_all();
unregister_all_topic_metrics(Node) ->
rpc_call(Node, unregister_topic_metrics, [Node]).
get_stats() -> get_stats() ->
[{Node, get_stats(Node)} || Node <- ekka_mnesia:running_nodes()]. [{Node, get_stats(Node)} || Node <- ekka_mnesia:running_nodes()].
@ -504,33 +400,6 @@ reload_plugin(Node, Plugin) when Node =:= node() ->
reload_plugin(Node, Plugin) -> reload_plugin(Node, Plugin) ->
rpc_call(Node, reload_plugin, [Node, Plugin]). rpc_call(Node, reload_plugin, [Node, Plugin]).
%%--------------------------------------------------------------------
%% Modules
%%--------------------------------------------------------------------
list_modules() ->
[{Node, list_modules(Node)} || Node <- ekka_mnesia:running_nodes()].
list_modules(Node) when Node =:= node() ->
emqx_modules:list();
list_modules(Node) ->
rpc_call(Node, list_modules, [Node]).
load_module(Node, Module) when Node =:= node() ->
emqx_modules:load(Module);
load_module(Node, Module) ->
rpc_call(Node, load_module, [Node, Module]).
unload_module(Node, Module) when Node =:= node() ->
emqx_modules:unload(Module);
unload_module(Node, Module) ->
rpc_call(Node, unload_module, [Node, Module]).
reload_module(Node, Module) when Node =:= node() ->
emqx_modules:reload(Module);
reload_module(Node, Module) ->
rpc_call(Node, reload_module, [Node, Module]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Listeners %% Listeners
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -613,282 +482,6 @@ create_banned(Banned) ->
delete_banned(Who) -> delete_banned(Who) ->
emqx_banned:delete(Who). emqx_banned:delete(Who).
%%--------------------------------------------------------------------
%% Data Export and Import
%%--------------------------------------------------------------------
export_rules() ->
lists:map(fun({_, RuleId, _, RawSQL, _, _, _, _, _, _, Actions, Enabled, Desc}) ->
[{id, RuleId},
{rawsql, RawSQL},
{actions, actions_to_prop_list(Actions)},
{enabled, Enabled},
{description, Desc}]
end, emqx_rule_registry:get_rules()).
export_resources() ->
lists:map(fun({_, Id, Type, Config, CreatedAt, Desc}) ->
NCreatedAt = case CreatedAt of
undefined -> null;
_ -> CreatedAt
end,
[{id, Id},
{type, Type},
{config, maps:to_list(Config)},
{created_at, NCreatedAt},
{description, Desc}]
end, emqx_rule_registry:get_resources()).
export_blacklist() ->
lists:map(fun(#banned{who = Who, by = By, reason = Reason, at = At, until = Until}) ->
NWho = case Who of
{peerhost, Peerhost} -> {peerhost, inet:ntoa(Peerhost)};
_ -> Who
end,
[{who, [NWho]}, {by, By}, {reason, Reason}, {at, At}, {until, Until}]
end, ets:tab2list(emqx_banned)).
export_applications() ->
lists:map(fun({_, AppID, AppSecret, Name, Desc, Status, Expired}) ->
[{id, AppID}, {secret, AppSecret}, {name, Name}, {desc, Desc}, {status, Status}, {expired, Expired}]
end, ets:tab2list(mqtt_app)).
export_users() ->
lists:map(fun({_, Username, Password, Tags}) ->
[{username, Username}, {password, base64:encode(Password)}, {tags, Tags}]
end, ets:tab2list(mqtt_admin)).
export_auth_mnesia() ->
case ets:info(emqx_user) of
undefined -> [];
_ ->
lists:map(fun({_, {Type, Login}, Password, CreatedAt}) ->
[{login, Login}, {type, Type}, {password, base64:encode(Password)}, {created_at, CreatedAt}]
end, ets:tab2list(emqx_user))
end.
export_acl_mnesia() ->
case ets:info(emqx_acl) of
undefined -> [];
_ ->
lists:map(fun({_, Filter, Action, Access, CreatedAt}) ->
Filter1 = case Filter of
{{Type, TypeValue}, Topic} ->
[{type, Type}, {type_value, TypeValue}, {topic, Topic}];
{Type, Topic} ->
[{type, Type}, {topic, Topic}]
end,
Filter1 ++ [{action, Action}, {access, Access}, {created_at, CreatedAt}]
end, ets:tab2list(emqx_acl))
end.
import_rules(Rules) ->
lists:foreach(fun(Rule) ->
import_rule(Rule)
end, Rules).
import_resources(Reources) ->
lists:foreach(fun(Resource) ->
import_resource(Resource)
end, Reources).
import_rule(#{<<"id">> := RuleId,
<<"rawsql">> := RawSQL,
<<"actions">> := Actions,
<<"enabled">> := Enabled,
<<"description">> := Desc}) ->
Rule = #{id => RuleId,
rawsql => RawSQL,
actions => map_to_actions(Actions),
enabled => Enabled,
description => Desc},
try emqx_rule_engine:create_rule(Rule)
catch throw:{resource_not_initialized, _ResId} ->
emqx_rule_engine:create_rule(Rule#{enabled => false})
end.
import_resource(#{<<"id">> := Id,
<<"type">> := Type,
<<"config">> := Config,
<<"created_at">> := CreatedAt,
<<"description">> := Desc}) ->
NCreatedAt = case CreatedAt of
null -> undefined;
_ -> CreatedAt
end,
emqx_rule_engine:create_resource(#{id => Id,
type => any_to_atom(Type),
config => Config,
created_at => NCreatedAt,
description => Desc}).
import_resources_and_rules(Resources, Rules, FromVersion)
when FromVersion =:= "4.0" orelse FromVersion =:= "4.1" orelse FromVersion =:= "4.2" ->
Configs = lists:foldl(fun(#{<<"id">> := ID,
<<"type">> := <<"web_hook">>,
<<"config">> := #{<<"content_type">> := ContentType,
<<"headers">> := Headers,
<<"method">> := Method,
<<"url">> := URL}} = Resource, Acc) ->
NConfig = #{<<"connect_timeout">> => 5,
<<"request_timeout">> => 5,
<<"cacertfile">> => <<>>,
<<"certfile">> => <<>>,
<<"keyfile">> => <<>>,
<<"pool_size">> => 8,
<<"url">> => URL,
<<"verify">> => true},
NResource = Resource#{<<"config">> := NConfig},
import_resource(NResource),
NHeaders = maps:put(<<"content-type">>, ContentType, Headers),
[{ID, #{headers => NHeaders, method => Method}} | Acc];
(Resource, Acc) ->
import_resource(Resource),
Acc
end, [], Resources),
lists:foreach(fun(#{<<"actions">> := Actions} = Rule) ->
NActions = apply_new_config(Actions, Configs),
import_rule(Rule#{<<"actions">> := NActions})
end, Rules);
import_resources_and_rules(Resources, Rules, _FromVersion) ->
import_resources(Resources),
import_rules(Rules).
apply_new_config(Actions, Configs) ->
apply_new_config(Actions, Configs, []).
apply_new_config([], _Configs, Acc) ->
Acc;
apply_new_config([Action = #{<<"name">> := <<"data_to_webserver">>,
<<"args">> := #{<<"$resource">> := ID,
<<"path">> := Path,
<<"payload_tmpl">> := PayloadTmpl}} | More], Configs, Acc) ->
case proplists:get_value(ID, Configs, undefined) of
undefined ->
apply_new_config(More, Configs, [Action | Acc]);
#{headers := Headers, method := Method} ->
Args = #{<<"$resource">> => ID,
<<"body">> => PayloadTmpl,
<<"headers">> => Headers,
<<"method">> => Method,
<<"path">> => Path},
apply_new_config(More, Configs, [Action#{<<"args">> := Args} | Acc])
end.
import_blacklist(Blacklist) ->
lists:foreach(fun(#{<<"who">> := Who,
<<"by">> := By,
<<"reason">> := Reason,
<<"at">> := At,
<<"until">> := Until}) ->
NWho = case Who of
#{<<"peerhost">> := Peerhost} ->
{ok, NPeerhost} = inet:parse_address(Peerhost),
{peerhost, NPeerhost};
#{<<"clientid">> := ClientId} -> {clientid, ClientId};
#{<<"username">> := Username} -> {username, Username}
end,
emqx_banned:create(#banned{who = NWho, by = By, reason = Reason, at = At, until = Until})
end, Blacklist).
import_applications(Apps) ->
lists:foreach(fun(#{<<"id">> := AppID,
<<"secret">> := AppSecret,
<<"name">> := Name,
<<"desc">> := Desc,
<<"status">> := Status,
<<"expired">> := Expired}) ->
NExpired = case is_integer(Expired) of
true -> Expired;
false -> undefined
end,
emqx_mgmt_auth:force_add_app(AppID, Name, AppSecret, Desc, Status, NExpired)
end, Apps).
import_users(Users) ->
lists:foreach(fun(#{<<"username">> := Username,
<<"password">> := Password,
<<"tags">> := Tags}) ->
NPassword = base64:decode(Password),
emqx_dashboard_admin:force_add_user(Username, NPassword, Tags)
end, Users).
import_auth_clientid(Lists) ->
case ets:info(emqx_user) of
undefined -> ok;
_ ->
[ mnesia:dirty_write({emqx_user, {clientid, Clientid}, base64:decode(Password), erlang:system_time(millisecond)})
|| #{<<"clientid">> := Clientid, <<"password">> := Password} <- Lists ]
end.
import_auth_username(Lists) ->
case ets:info(emqx_user) of
undefined -> ok;
_ ->
[ mnesia:dirty_write({emqx_user, {username, Username}, base64:decode(Password), erlang:system_time(millisecond)})
|| #{<<"username">> := Username, <<"password">> := Password} <- Lists ]
end.
import_auth_mnesia(Auths, FromVersion) when FromVersion =:= "4.0" orelse
FromVersion =:= "4.1" ->
case ets:info(emqx_user) of
undefined -> ok;
_ ->
CreatedAt = erlang:system_time(millisecond),
[ begin
mnesia:dirty_write({emqx_user, {username, Login}, base64:decode(Password), CreatedAt})
end
|| #{<<"login">> := Login,
<<"password">> := Password} <- Auths ]
end;
import_auth_mnesia(Auths, _) ->
case ets:info(emqx_user) of
undefined -> ok;
_ ->
[ mnesia:dirty_write({emqx_user, {any_to_atom(Type), Login}, base64:decode(Password), CreatedAt})
|| #{<<"login">> := Login,
<<"type">> := Type,
<<"password">> := Password,
<<"created_at">> := CreatedAt } <- Auths ]
end.
import_acl_mnesia(Acls, FromVersion) when FromVersion =:= "4.0" orelse
FromVersion =:= "4.1" ->
case ets:info(emqx_acl) of
undefined -> ok;
_ ->
CreatedAt = erlang:system_time(millisecond),
[begin
Allow1 = case any_to_atom(Allow) of
true -> allow;
false -> deny
end,
mnesia:dirty_write({emqx_acl, {{username, Login}, Topic}, any_to_atom(Action), Allow1, CreatedAt})
end || #{<<"login">> := Login,
<<"topic">> := Topic,
<<"allow">> := Allow,
<<"action">> := Action} <- Acls]
end;
import_acl_mnesia(Acls, _) ->
case ets:info(emqx_acl) of
undefined -> ok;
_ ->
[ begin
Filter = case maps:get(<<"type_value">>, Map, undefined) of
undefined ->
{any_to_atom(maps:get(<<"type">>, Map)), maps:get(<<"topic">>, Map)};
Value ->
{{any_to_atom(maps:get(<<"type">>, Map)), Value}, maps:get(<<"topic">>, Map)}
end,
mnesia:dirty_write({emqx_acl ,Filter, any_to_atom(Action), any_to_atom(Access), CreatedAt})
end
|| Map = #{<<"action">> := Action,
<<"access">> := Access,
<<"created_at">> := CreatedAt} <- Acls ]
end.
any_to_atom(L) when is_list(L) -> list_to_atom(L); any_to_atom(L) when is_list(L) -> list_to_atom(L);
any_to_atom(B) when is_binary(B) -> binary_to_atom(B, utf8); any_to_atom(B) when is_binary(B) -> binary_to_atom(B, utf8);
@ -985,4 +578,3 @@ action_to_prop_list({action_instance, ActionInstId, Name, FallbackActions, Args}
{name, Name}, {name, Name},
{fallbacks, actions_to_prop_list(FallbackActions)}, {fallbacks, actions_to_prop_list(FallbackActions)},
{args, Args}]. {args, Args}].

View File

@ -75,48 +75,8 @@
]). ]).
export(_Bindings, _Params) -> export(_Bindings, _Params) ->
Rules = emqx_mgmt:export_rules(), return(emqx_mgmt_data_backup:export()).
Resources = emqx_mgmt:export_resources(),
Blacklist = emqx_mgmt:export_blacklist(),
Apps = emqx_mgmt:export_applications(),
Users = emqx_mgmt:export_users(),
AuthMnesia = emqx_mgmt:export_auth_mnesia(),
AclMnesia = emqx_mgmt:export_acl_mnesia(),
Seconds = erlang:system_time(second),
{{Y, M, D}, {H, MM, S}} = emqx_mgmt_util:datetime(Seconds),
Filename = io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S]),
NFilename = filename:join([emqx:get_env(data_dir), Filename]),
Version = string:sub_string(emqx_sys:version(), 1, 3),
Data = [{version, erlang:list_to_binary(Version)},
{date, erlang:list_to_binary(emqx_mgmt_util:strftime(Seconds))},
{rules, Rules},
{resources, Resources},
{blacklist, Blacklist},
{apps, Apps},
{users, Users},
{auth_mnesia, AuthMnesia},
{acl_mnesia, AclMnesia}
],
Bin = emqx_json:encode(Data),
ok = filelib:ensure_dir(NFilename),
case file:write_file(NFilename, Bin) of
ok ->
case file:read_file_info(NFilename) of
{ok, #file_info{size = Size, ctime = {{Y0, M0, D0}, {H0, MM0, S0}}}} ->
CreatedAt = io_lib:format("~p-~p-~p ~p:~p:~p", [Y0, M0, D0, H0, MM0, S0]),
return({ok, [{filename, list_to_binary(Filename)},
{size, Size},
{created_at, list_to_binary(CreatedAt)},
{node, node()}
]});
{error, Reason} ->
return({error, Reason})
end;
{error, Reason} ->
return({error, Reason})
end.
list_exported(_Bindings, _Params) -> list_exported(_Bindings, _Params) ->
List = [ rpc:call(Node, ?MODULE, get_list_exported, []) || Node <- ekka_mnesia:running_nodes() ], List = [ rpc:call(Node, ?MODULE, get_list_exported, []) || Node <- ekka_mnesia:running_nodes() ],
NList = lists:map(fun({_, FileInfo}) -> FileInfo end, lists:keysort(1, lists:append(List))), NList = lists:map(fun({_, FileInfo}) -> FileInfo end, lists:keysort(1, lists:append(List))),
@ -158,7 +118,7 @@ import(_Bindings, Params) ->
case lists:member(Node, case lists:member(Node,
[ erlang:atom_to_binary(N, utf8) || N <- ekka_mnesia:running_nodes() ] [ erlang:atom_to_binary(N, utf8) || N <- ekka_mnesia:running_nodes() ]
) of ) of
true -> rpc:call(erlang:binary_to_atom(Node, utf8), ?MODULE, do_import, [Filename]); true -> return(rpc:call(erlang:binary_to_atom(Node, utf8), ?MODULE, do_import, [Filename]));
false -> return({error, no_existent_node}) false -> return({error, no_existent_node})
end end
end, end,
@ -167,34 +127,7 @@ import(_Bindings, Params) ->
do_import(Filename) -> do_import(Filename) ->
FullFilename = filename:join([emqx:get_env(data_dir), Filename]), FullFilename = filename:join([emqx:get_env(data_dir), Filename]),
case file:read_file(FullFilename) of emqx_mgmt_data_backup:import(FullFilename).
{ok, Json} ->
Data = emqx_json:decode(Json, [return_maps]),
Version = emqx_mgmt:to_version(maps:get(<<"version">>, Data)),
case lists:member(Version, ?VERSIONS) of
true ->
try
emqx_mgmt:import_resources_and_rules(maps:get(<<"resources">>, Data, []), maps:get(<<"rules">>, Data, []), Version),
emqx_mgmt:import_blacklist(maps:get(<<"blacklist">>, Data, [])),
emqx_mgmt:import_applications(maps:get(<<"apps">>, Data, [])),
emqx_mgmt:import_users(maps:get(<<"users">>, Data, [])),
_ = emqx_mgmt:import_auth_clientid(maps:get(<<"auth_clientid">>, Data, [])),
_ = emqx_mgmt:import_auth_username(maps:get(<<"auth_username">>, Data, [])),
_ = emqx_mgmt:import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, []), Version),
_ = emqx_mgmt:import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, []), Version),
logger:debug("The emqx data has been imported successfully"),
ok
catch Class:Reason:Stack ->
logger:error("The emqx data import failed: ~0p", [{Class,Reason,Stack}]),
{error, import_failed}
end;
false ->
logger:error("Unsupported version: ~p", [Version]),
{error, unsupported_version}
end;
{error, Reason} ->
{error, Reason}
end.
download(#{filename := Filename}, _Params) -> download(#{filename := Filename}, _Params) ->
FullFilename = filename:join([emqx:get_env(data_dir), Filename]), FullFilename = filename:join([emqx:get_env(data_dir), Filename]),

View File

@ -41,7 +41,6 @@
, log/1 , log/1
, mgmt/1 , mgmt/1
, data/1 , data/1
, modules/1
]). ]).
-define(PROC_INFOKEYS, [status, -define(PROC_INFOKEYS, [status,
@ -322,44 +321,6 @@ plugins(_) ->
{"plugins reload <Plugin>", "Reload plugin"} {"plugins reload <Plugin>", "Reload plugin"}
]). ]).
%%--------------------------------------------------------------------
%% @doc Modules Command
modules(["list"]) ->
foreach(fun(Module) -> print({module, Module}) end, emqx_modules:list());
modules(["load", Name]) ->
case emqx_modules:load(list_to_atom(Name)) of
ok ->
emqx_ctl:print("Module ~s loaded successfully.~n", [Name]);
{error, Reason} ->
emqx_ctl:print("Load module ~s error: ~p.~n", [Name, Reason])
end;
modules(["unload", Name]) ->
case emqx_modules:unload(list_to_atom(Name)) of
ok ->
emqx_ctl:print("Module ~s unloaded successfully.~n", [Name]);
{error, Reason} ->
emqx_ctl:print("Unload module ~s error: ~p.~n", [Name, Reason])
end;
modules(["reload", "emqx_mod_acl_internal" = Name]) ->
case emqx_modules:reload(list_to_atom(Name)) of
ok ->
emqx_ctl:print("Module ~s reloaded successfully.~n", [Name]);
{error, Reason} ->
emqx_ctl:print("Reload module ~s error: ~p.~n", [Name, Reason])
end;
modules(["reload", Name]) ->
emqx_ctl:print("Module: ~p does not need to be reloaded.~n", [Name]);
modules(_) ->
emqx_ctl:usage([{"modules list", "Show loaded modules"},
{"modules load <Module>", "Load module"},
{"modules unload <Module>", "Unload module"},
{"modules reload <Module>", "Reload module"}
]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc vm command %% @doc vm command
@ -582,59 +543,21 @@ stop_listener(#{listen_on := ListenOn} = Listener, _Input) ->
%% @doc data Command %% @doc data Command
data(["export"]) -> data(["export"]) ->
Rules = emqx_mgmt:export_rules(), case emqx_mgmt_data_backup:export() of
Resources = emqx_mgmt:export_resources(), {ok, _} ->
Blacklist = emqx_mgmt:export_blacklist(),
Apps = emqx_mgmt:export_applications(),
Users = emqx_mgmt:export_users(),
AuthMnesia = emqx_mgmt:export_auth_mnesia(),
AclMnesia = emqx_mgmt:export_acl_mnesia(),
Seconds = erlang:system_time(second),
{{Y, M, D}, {H, MM, S}} = emqx_mgmt_util:datetime(Seconds),
Filename = io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S]),
NFilename = filename:join([emqx:get_env(data_dir), Filename]),
Version = string:sub_string(emqx_sys:version(), 1, 3),
Data = [{version, erlang:list_to_binary(Version)},
{date, erlang:list_to_binary(emqx_mgmt_util:strftime(Seconds))},
{rules, Rules},
{resources, Resources},
{blacklist, Blacklist},
{apps, Apps},
{users, Users},
{auth_mnesia, AuthMnesia},
{acl_mnesia, AclMnesia}],
ok = filelib:ensure_dir(NFilename),
case file:write_file(NFilename, emqx_json:encode(Data)) of
ok ->
emqx_ctl:print("The emqx data has been successfully exported to ~s.~n", [NFilename]); emqx_ctl:print("The emqx data has been successfully exported to ~s.~n", [NFilename]);
{error, Reason} -> {error, Reason} ->
emqx_ctl:print("The emqx data export failed due to ~p.~n", [Reason]) emqx_ctl:print("The emqx data export failed due to ~p.~n", [Reason])
end; end;
data(["import", Filename]) -> data(["import", Filename]) ->
case file:read_file(Filename) of case emqx_mgmt_data_backup:import(Filename) of
{ok, Json} -> ok ->
Data = emqx_json:decode(Json, [return_maps]), emqx_ctl:print("The emqx data has been imported successfully.~n");
Version = emqx_mgmt:to_version(maps:get(<<"version">>, Data)), {error, import_failed} ->
case lists:member(Version, ?VERSIONS) of emqx_ctl:print("The emqx data import failed due: ~0p~n", [{Class,Reason,Stack}]);
true -> {error, unsupported_version} ->
try emqx_ctl:print("Unsupported version: ~p~n", [Version]);
emqx_mgmt:import_resources(maps:get(<<"resources">>, Data, [])),
emqx_mgmt:import_rules(maps:get(<<"rules">>, Data, [])),
emqx_mgmt:import_blacklist(maps:get(<<"blacklist">>, Data, [])),
emqx_mgmt:import_applications(maps:get(<<"apps">>, Data, [])),
emqx_mgmt:import_users(maps:get(<<"users">>, Data, [])),
_ = emqx_mgmt:import_auth_clientid(maps:get(<<"auth_clientid">>, Data, [])),
_ = emqx_mgmt:import_auth_username(maps:get(<<"auth_username">>, Data, [])),
_ = emqx_mgmt:import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, []), Version),
_ = emqx_mgmt:import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, []), Version),
emqx_ctl:print("The emqx data has been imported successfully.~n")
catch Class:Reason:Stack ->
emqx_ctl:print("The emqx data import failed due: ~0p~n", [{Class,Reason,Stack}])
end;
false ->
emqx_ctl:print("Unsupported version: ~p~n", [Version])
end;
{error, Reason} -> {error, Reason} ->
emqx_ctl:print("The emqx data import failed: ~0p while reading ~s.~n", [Reason, Filename]) emqx_ctl:print("The emqx data import failed: ~0p while reading ~s.~n", [Reason, Filename])
end; end;

View File

@ -0,0 +1,557 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_data_backup).
-ifdef(EMQX_ENTERPISE).
-export([ export_modules/0
, export_schemas/0
, export_confs/0
, import_modules/1
, import_schemas/1
, import_confs/2
]).
-else.
-export([ import_resources_and_rules/3 ]).
-endif.
-export([ export_rules/0
, export_resources/0
, export_blacklist/0
, export_applications/0
, export_users/0
, export_auth_mnesia/0
, export_acl_mnesia/0
, import_rules/1
, import_resources/1
, import_blacklist/1
, import_applications/1
, import_users/1
, import_auth_clientid/1 %% BACKW: 4.1.x
, import_auth_username/1 %% BACKW: 4.1.x
, import_auth_mnesia/2
, import_acl_mnesia/2
, to_version/1
]).
-export([ export/0
, import/1
]).
%%--------------------------------------------------------------------
%% Data Export and Import
%%--------------------------------------------------------------------
export_rules() ->
lists:map(fun({_, RuleId, _, RawSQL, _, _, _, _, _, _, Actions, Enabled, Desc}) ->
[{id, RuleId},
{rawsql, RawSQL},
{actions, actions_to_prop_list(Actions)},
{enabled, Enabled},
{description, Desc}]
end, emqx_rule_registry:get_rules()).
export_resources() ->
lists:map(fun({_, Id, Type, Config, CreatedAt, Desc}) ->
NCreatedAt = case CreatedAt of
undefined -> null;
_ -> CreatedAt
end,
[{id, Id},
{type, Type},
{config, maps:to_list(Config)},
{created_at, NCreatedAt},
{description, Desc}]
end, emqx_rule_registry:get_resources()).
export_blacklist() ->
lists:map(fun(#banned{who = Who, by = By, reason = Reason, at = At, until = Until}) ->
NWho = case Who of
{peerhost, Peerhost} -> {peerhost, inet:ntoa(Peerhost)};
_ -> Who
end,
[{who, [NWho]}, {by, By}, {reason, Reason}, {at, At}, {until, Until}]
end, ets:tab2list(emqx_banned)).
export_applications() ->
lists:map(fun({_, AppID, AppSecret, Name, Desc, Status, Expired}) ->
[{id, AppID}, {secret, AppSecret}, {name, Name}, {desc, Desc}, {status, Status}, {expired, Expired}]
end, ets:tab2list(mqtt_app)).
export_users() ->
lists:map(fun({_, Username, Password, Tags}) ->
[{username, Username}, {password, base64:encode(Password)}, {tags, Tags}]
end, ets:tab2list(mqtt_admin)).
export_auth_mnesia() ->
case ets:info(emqx_user) of
undefined -> [];
_ ->
lists:map(fun({_, {Type, Login}, Password, CreatedAt}) ->
[{login, Login}, {type, Type}, {password, base64:encode(Password)}, {created_at, CreatedAt}]
end, ets:tab2list(emqx_user))
end.
export_acl_mnesia() ->
case ets:info(emqx_acl) of
undefined -> [];
_ ->
lists:map(fun({_, Filter, Action, Access, CreatedAt}) ->
Filter1 = case Filter of
{{Type, TypeValue}, Topic} ->
[{type, Type}, {type_value, TypeValue}, {topic, Topic}];
{Type, Topic} ->
[{type, Type}, {topic, Topic}]
end,
Filter1 ++ [{action, Action}, {access, Access}, {created_at, CreatedAt}]
end, ets:tab2list(emqx_acl))
end.
-ifdef(EMQX_ENTERPISE).
export_modules() ->
case ets:info(emqx_modules) of
undefined -> [];
_ ->
lists:map(fun({_, Id, Type, Config, Enabled, CreatedAt, Description}) ->
[{id, Id},
{type, Type},
{config, Config},
{enabled, Enabled},
{created_at, CreatedAt},
{description, Description}
]
end, ets:tab2list(emqx_modules))
end.
export_schemas() ->
case ets:info(emqx_schema) of
undefined -> [];
_ ->
[emqx_schema_api:format_schema(Schema) || Schema <- emqx_schema_registry:get_all_schemas()]
end.
export_confs() ->
case ets:info(emqx_conf_info) of
undefined -> {[], []};
_ ->
{lists:map(fun({_, Key, Confs}) ->
case Key of
{_Zone, Name} ->
[{zone, list_to_binary(Name)},
{confs, confs_to_binary(Confs)}];
{_Listener, Type, Name} ->
[{type, list_to_binary(Type)},
{name, list_to_binary(Name)},
{confs, confs_to_binary(Confs)}];
Name ->
[{name, list_to_binary(Name)},
{confs, confs_to_binary(Confs)}]
end
end, ets:tab2list(emqx_conf_b)),
lists:map(fun({_, {_Listener, Type, Name}, Status}) ->
[{type, list_to_binary(Type)},
{name, list_to_binary(Name)},
{status, Status}]
end, ets:tab2list(emqx_listeners_state))}
end.
confs_to_binary(Confs) ->
[{list_to_binary(Key), list_to_binary(Val)} || {Key, Val} <-Confs].
-endif.
import_rules(Rules) ->
lists:foreach(fun(Resource) ->
import_resource(Resource)
end, Rules).
import_rule(#{<<"id">> := RuleId,
<<"rawsql">> := RawSQL,
<<"actions">> := Actions,
<<"enabled">> := Enabled,
<<"description">> := Desc}) ->
Rule = #{id => RuleId,
rawsql => RawSQL,
actions => map_to_actions(Actions),
enabled => Enabled,
description => Desc},
try emqx_rule_engine:create_rule(Rule)
catch throw:{resource_not_initialized, _ResId} ->
emqx_rule_engine:create_rule(Rule#{enabled => false})
end.
import_resources(Reources) ->
lists:foreach(fun(Resource) ->
import_resource(Resource)
end, Reources).
import_resource(#{<<"id">> := Id,
<<"type">> := Type,
<<"config">> := Config,
<<"created_at">> := CreatedAt,
<<"description">> := Desc}) ->
NCreatedAt = case CreatedAt of
null -> undefined;
_ -> CreatedAt
end,
emqx_rule_engine:create_resource(#{id => Id,
type => any_to_atom(Type),
config => Config,
created_at => NCreatedAt,
description => Desc}).
-ifndef(EMQX_ENTERPRISE).
import_resources_and_rules(Resources, Rules, FromVersion)
when FromVersion =:= "4.0" orelse FromVersion =:= "4.1" orelse FromVersion =:= "4.2" ->
Configs = lists:foldl(fun(#{<<"id">> := ID,
<<"type">> := <<"web_hook">>,
<<"config">> := #{<<"content_type">> := ContentType,
<<"headers">> := Headers,
<<"method">> := Method,
<<"url">> := URL}} = Resource, Acc) ->
NConfig = #{<<"connect_timeout">> => 5,
<<"request_timeout">> => 5,
<<"cacertfile">> => <<>>,
<<"certfile">> => <<>>,
<<"keyfile">> => <<>>,
<<"pool_size">> => 8,
<<"url">> => URL,
<<"verify">> => true},
NResource = Resource#{<<"config">> := NConfig},
import_resource(NResource),
NHeaders = maps:put(<<"content-type">>, ContentType, Headers),
[{ID, #{headers => NHeaders, method => Method}} | Acc];
(Resource, Acc) ->
import_resource(Resource),
Acc
end, [], Resources),
lists:foreach(fun(#{<<"actions">> := Actions} = Rule) ->
NActions = apply_new_config(Actions, Configs),
import_rule(Rule#{<<"actions">> := NActions})
end, Rules);
import_resources_and_rules(Resources, Rules, _FromVersion) ->
import_resources(Resources),
import_rules(Rules).
apply_new_config(Actions, Configs) ->
apply_new_config(Actions, Configs, []).
apply_new_config([], _Configs, Acc) ->
Acc;
apply_new_config([Action = #{<<"name">> := <<"data_to_webserver">>,
<<"args">> := #{<<"$resource">> := ID,
<<"path">> := Path,
<<"payload_tmpl">> := PayloadTmpl}} | More], Configs, Acc) ->
case proplists:get_value(ID, Configs, undefined) of
undefined ->
apply_new_config(More, Configs, [Action | Acc]);
#{headers := Headers, method := Method} ->
Args = #{<<"$resource">> => ID,
<<"body">> => PayloadTmpl,
<<"headers">> => Headers,
<<"method">> => Method,
<<"path">> => Path},
apply_new_config(More, Configs, [Action#{<<"args">> := Args} | Acc])
end.
-endif.
import_blacklist(Blacklist) ->
lists:foreach(fun(#{<<"who">> := Who,
<<"by">> := By,
<<"reason">> := Reason,
<<"at">> := At,
<<"until">> := Until}) ->
NWho = case Who of
#{<<"peerhost">> := Peerhost} ->
{ok, NPeerhost} = inet:parse_address(Peerhost),
{peerhost, NPeerhost};
#{<<"clientid">> := ClientId} -> {clientid, ClientId};
#{<<"username">> := Username} -> {username, Username}
end,
emqx_banned:create(#banned{who = NWho, by = By, reason = Reason, at = At, until = Until})
end, Blacklist).
import_applications(Apps) ->
lists:foreach(fun(#{<<"id">> := AppID,
<<"secret">> := AppSecret,
<<"name">> := Name,
<<"desc">> := Desc,
<<"status">> := Status,
<<"expired">> := Expired}) ->
NExpired = case is_integer(Expired) of
true -> Expired;
false -> undefined
end,
emqx_mgmt_auth:force_add_app(AppID, Name, AppSecret, Desc, Status, NExpired)
end, Apps).
import_users(Users) ->
lists:foreach(fun(#{<<"username">> := Username,
<<"password">> := Password,
<<"tags">> := Tags}) ->
NPassword = base64:decode(Password),
emqx_dashboard_admin:force_add_user(Username, NPassword, Tags)
end, Users).
import_auth_clientid(Lists) ->
case ets:info(emqx_user) of
undefined -> ok;
_ ->
lists:foreach(fun(#{<<"clientid">> := Clientid, <<"password">> := Password}) ->
mnesia:dirty_write({emqx_user, {clientid, Clientid}, base64:decode(Password), erlang:system_time(millisecond)})
end, Lists)
end.
import_auth_username(Lists) ->
case ets:info(emqx_user) of
undefined -> ok;
_ ->
lists:foreach(fun(#{<<"username">> := Username, <<"password">> := Password}) ->
mnesia:dirty_write({emqx_user, {username, Username}, base64:decode(Password), erlang:system_time(millisecond)})
end, Lists)
end.
import_auth_mnesia(Auths, FromVersion) when FromVersion =:= "4.0" orelse
FromVersion =:= "4.1" ->
case ets:info(emqx_user) of
undefined -> ok;
_ ->
CreatedAt = erlang:system_time(millisecond),
lists:foreach(fun(#{<<"login">> := Login,
<<"password">> := Password}) ->
mnesia:dirty_write({emqx_user, {username, Login}, base64:decode(Password), CreatedAt})
end, Auths)
end;
import_auth_mnesia(Auths, _) ->
case ets:info(emqx_user) of
undefined -> ok;
_ ->
lists:foreach(fun(#{<<"login">> := Login,
<<"type">> := Type,
<<"password">> := Password,
<<"created_at">> := CreatedAt }) ->
mnesia:dirty_write({emqx_user, {any_to_atom(Type), Login}, base64:decode(Password), CreatedAt})
end, Auths)
end.
import_acl_mnesia(Acls, FromVersion) when FromVersion =:= "4.0" orelse
FromVersion =:= "4.1" ->
case ets:info(emqx_acl) of
undefined -> ok;
_ ->
CreatedAt = erlang:system_time(millisecond),
lists:foreach(fun(#{<<"login">> := Login,
<<"topic">> := Topic,
<<"allow">> := Allow,
<<"action">> := Action}) ->
Allow1 = case any_to_atom(Allow) of
true -> allow;
false -> deny
end,
mnesia:dirty_write({emqx_acl, {{username, Login}, Topic}, any_to_atom(Action), Allow1, CreatedAt})
end, Acls)
end;
import_acl_mnesia(Acls, _) ->
case ets:info(emqx_acl) of
undefined -> ok;
_ ->
lists:foreach(fun(Map = #{<<"action">> := Action,
<<"access">> := Access,
<<"created_at">> := CreatedAt}) ->
Filter = case maps:get(<<"type_value">>, Map, undefined) of
undefined ->
{any_to_atom(maps:get(<<"type">>, Map)), maps:get(<<"topic">>, Map)};
Value ->
{{any_to_atom(maps:get(<<"type">>, Map)), Value}, maps:get(<<"topic">>, Map)}
end,
mnesia:dirty_write({emqx_acl ,Filter, any_to_atom(Action), any_to_atom(Access), CreatedAt})
end, Acls)
end.
-ifdef(EMQX_ENTERPRISE).
import_modules(Modules) ->
case ets:info(emqx_modules) of
undefined -> [];
_ ->
lists:foreach(fun(#{<<"id">> := Id,
<<"type">> := Type,
<<"config">> := Config,
<<"enabled">> := Enabled,
<<"created_at">> := CreatedAt,
<<"description">> := Description}) ->
emqx_modules:import_module({Id, any_to_atom(Type), Config, Enabled, CreatedAt, Description})
end, Modules)
end.
import_schemas(Schemas) ->
case ets:info(emqx_schema) of
undefined -> ok;
_ -> [emqx_schema_registry:add_schema(emqx_schema_api:make_schema_params(Schema)) || Schema <- Schemas]
end.
import_confs(Configs, ListenersState) ->
case ets:info(emqx_conf_info) of
undefined -> ok;
_ ->
emqx_conf:import_confs(Configs, ListenersState)
end.
-endif.
-ifdef(EMQX_ENTERPRISE).
export() ->
Modules = export_modules(),
Rules = export_rules(),
Resources = export_resources(),
Blacklist = export_blacklist(),
Apps = export_applications(),
Users = export_users(),
AuthMnesia = export_auth_mnesia(),
AclMnesia = export_acl_mnesia(),
Schemas = export_schemas(),
{Configs, State} = export_confs(),
Seconds = erlang:system_time(second),
{{Y, M, D}, {H, MM, S}} = emqx_mgmt_util:datetime(Seconds),
Filename = io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S]),
NFilename = filename:join([emqx:get_env(data_dir), Filename]),
Version = string:sub_string(emqx_sys:version(), 1, 3),
Data = [{version, erlang:list_to_binary(Version)},
{date, erlang:list_to_binary(emqx_mgmt_util:strftime(Seconds))},
{modules, Modules},
{rules, Rules},
{resources, Resources},
{blacklist, Blacklist},
{apps, Apps},
{users, Users},
{auth_mnesia, AuthMnesia},
{acl_mnesia, AclMnesia},
{schemas, Schemas},
{configs, Configs},
{listeners_state, State}],
write_file(NFilename, Data).
import(Filename) ->
case file:read_file(FullFilename) of
{ok, Json} ->
Data = emqx_json:decode(Json, [return_maps]),
Version = to_version(maps:get(<<"version">>, Data)),
case lists:member(Version, ?VERSIONS) of
true ->
try
import_confs(maps:get(<<"configs">>, Data, []), maps:get(<<"listeners_state">>, Data, [])),
import_resources(maps:get(<<"resources">>, Data, [])),
import_rules(maps:get(<<"rules">>, Data, [])),
import_blacklist(maps:get(<<"blacklist">>, Data, [])),
import_applications(maps:get(<<"apps">>, Data, [])),
import_users(maps:get(<<"users">>, Data, [])),
import_modules(maps:get(<<"modules">>, Data, [])),
import_auth_clientid(maps:get(<<"auth_clientid">>, Data, [])),
import_auth_username(maps:get(<<"auth_username">>, Data, [])),
import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, []), Version),
import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, []), Version),
import_schemas(maps:get(<<"schemas">>, Data, [])),
logger:debug("The emqx data has been imported successfully"),
ok
catch Class:Reason:Stack ->
logger:error("The emqx data import failed: ~0p", [{Class,Reason,Stack}]),
{error, import_failed}
end;
false ->
logger:error("Unsupported version: ~p", [Version]),
{error, unsupported_version}
end;
{error, Reason} ->
{error, Reason}
end.
-else.
export() ->
Rules = export_rules(),
Resources = export_resources(),
Blacklist = export_blacklist(),
Apps = export_applications(),
Users = export_users(),
AuthMnesia = export_auth_mnesia(),
AclMnesia = export_acl_mnesia(),
Seconds = erlang:system_time(second),
{{Y, M, D}, {H, MM, S}} = emqx_mgmt_util:datetime(Seconds),
Filename = io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S]),
NFilename = filename:join([emqx:get_env(data_dir), Filename]),
Version = string:sub_string(emqx_sys:version(), 1, 3),
Data = [{version, erlang:list_to_binary(Version)},
{date, erlang:list_to_binary(emqx_mgmt_util:strftime(Seconds))},
{rules, Rules},
{resources, Resources},
{blacklist, Blacklist},
{apps, Apps},
{users, Users},
{auth_mnesia, AuthMnesia},
{acl_mnesia, AclMnesia}],
write_file(NFilename, Data).
import(Filename) ->
case file:read_file(FullFilename) of
{ok, Json} ->
Data = emqx_json:decode(Json, [return_maps]),
Version = to_version(maps:get(<<"version">>, Data)),
case lists:member(Version, ?VERSIONS) of
true ->
try
import_resources_and_rules(maps:get(<<"resources">>, Data, []), maps:get(<<"rules">>, Data, []), Version),
import_blacklist(maps:get(<<"blacklist">>, Data, [])),
import_applications(maps:get(<<"apps">>, Data, [])),
import_users(maps:get(<<"users">>, Data, [])),
import_auth_clientid(maps:get(<<"auth_clientid">>, Data, [])),
import_auth_username(maps:get(<<"auth_username">>, Data, [])),
import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, []), Version),
import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, []), Version),
logger:debug("The emqx data has been imported successfully"),
ok
catch Class:Reason:Stack ->
logger:error("The emqx data import failed: ~0p", [{Class,Reason,Stack}]),
{error, import_failed}
end;
false ->
logger:error("Unsupported version: ~p", [Version]),
{error, unsupported_version}
end;
{error, Reason} ->
{error, Reason}
end.
-endif.
write_file(Filename, Data) ->
ok = filelib:ensure_dir(Filename),
case file:write_file(Filename, emqx_json:encode(Data)) of
ok ->
case file:read_file_info(Filename) of
{ok, #file_info{size = Size, ctime = {{Y, M, D}, {H, MM, S}}}} ->
CreatedAt = io_lib:format("~p-~p-~p ~p:~p:~p", [Y, M, D, H, MM, S]),
{ok, [{filename, list_to_binary(Filename)},
{size, Size},
{created_at, list_to_binary(CreatedAt)},
{node, node()}
]};
{error, Reason} ->
{error, Reason}
end;
{error, Reason} ->
{error, Reason}
end.