From 28653bb457c8fb9ce108d04d9660a80890caf5f5 Mon Sep 17 00:00:00 2001 From: zhanghongtong Date: Thu, 25 Feb 2021 19:06:40 +0800 Subject: [PATCH] chore(emqx_management): format code --- .../src/emqx_mgmt_api_data.erl | 4 +- apps/emqx_management/src/emqx_mgmt_cli.erl | 28 ++- .../src/emqx_mgmt_data_backup.erl | 212 +++++++----------- .../test/emqx_mgmt_api_SUITE.erl | 12 +- .../src/emqx_mod_api_topic_metrics.erl | 8 +- 5 files changed, 107 insertions(+), 157 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_data.erl b/apps/emqx_management/src/emqx_mgmt_api_data.erl index 18ba27265..5c19e95af 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_data.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_data.erl @@ -80,7 +80,7 @@ export(_Bindings, _Params) -> return({ok, File#{filename => filename:basename(Filename)}}); Return -> return(Return) end. - + list_exported(_Bindings, _Params) -> List = [ rpc:call(Node, ?MODULE, get_list_exported, []) || Node <- ekka_mnesia:running_nodes() ], NList = lists:map(fun({_, FileInfo}) -> FileInfo end, lists:keysort(1, lists:append(List))), @@ -131,7 +131,7 @@ import(_Bindings, Params) -> do_import(Filename) -> FullFilename = filename:join([emqx:get_env(data_dir), Filename]), - emqx_mgmt_data_backup:import(FullFilename). + emqx_mgmt_data_backup:import(FullFilename). download(#{filename := Filename}, _Params) -> FullFilename = filename:join([emqx:get_env(data_dir), Filename]), diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index 19cf83f99..fc18bc1bf 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -23,8 +23,6 @@ -define(PRINT_CMD(Cmd, Descr), io:format("~-48s# ~s~n", [Cmd, Descr])). --import(lists, [foreach/2]). - -export([load/0]). -export([ status/1 @@ -58,7 +56,7 @@ -spec(load() -> ok). load() -> Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)], - foreach(fun(Cmd) -> emqx_ctl:register_command(Cmd, {?MODULE, Cmd}, []) end, Cmds). + lists:foreach(fun(Cmd) -> emqx_ctl:register_command(Cmd, {?MODULE, Cmd}, []) end, Cmds). is_cmd(Fun) -> not lists:member(Fun, [init, load, module_info]). @@ -100,7 +98,7 @@ mgmt(["delete", AppId]) -> end; mgmt(["list"]) -> - foreach(fun({AppId, AppSecret, Name, Desc, Status, Expired}) -> + lists:foreach(fun({AppId, AppSecret, Name, Desc, Status, Expired}) -> emqx_ctl:print("app_id: ~s, secret: ~s, name: ~s, desc: ~s, status: ~s, expired: ~p~n", [AppId, AppSecret, Name, Desc, Status, Expired]) end, emqx_mgmt_auth:list_apps()); @@ -229,7 +227,7 @@ routes(_) -> {"routes show ", "Show a route"}]). subscriptions(["list"]) -> - foreach(fun(Suboption) -> + lists:foreach(fun(Suboption) -> print({emqx_suboption, Suboption}) end, ets:tab2list(emqx_suboption)); @@ -279,7 +277,7 @@ if_valid_qos(QoS, Fun) -> end. plugins(["list"]) -> - foreach(fun print/1, emqx_plugins:list()); + lists:foreach(fun print/1, emqx_plugins:list()); plugins(["load", Name]) -> case emqx_plugins:load(list_to_atom(Name)) of @@ -421,7 +419,7 @@ log(_) -> %% @doc Trace Command trace(["list"]) -> - foreach(fun({{Who, Name}, {Level, LogFile}}) -> + lists:foreach(fun({{Who, Name}, {Level, LogFile}}) -> emqx_ctl:print("Trace(~s=~s, level=~s, destination=~p)~n", [Who, Name, Level, LogFile]) end, emqx_tracer:lookup_traces()); @@ -470,7 +468,7 @@ trace_off(Who, Name) -> %% @doc Listeners Command listeners([]) -> - foreach(fun({{Protocol, ListenOn}, _Pid}) -> + lists:foreach(fun({{Protocol, ListenOn}, _Pid}) -> Info = [{listen_on, {string, emqx_listeners:format_listen_on(ListenOn)}}, {acceptors, esockd:get_acceptors({Protocol, ListenOn})}, {max_conns, esockd:get_max_connections({Protocol, ListenOn})}, @@ -478,9 +476,9 @@ listeners([]) -> {shutdown_count, esockd:get_shutdown_count({Protocol, ListenOn})} ], emqx_ctl:print("~s~n", [listener_identifier(Protocol, ListenOn)]), - foreach(fun indent_print/1, Info) + lists:foreach(fun indent_print/1, Info) end, esockd:listeners()), - foreach(fun({Protocol, Opts}) -> + lists:foreach(fun({Protocol, Opts}) -> Port = proplists:get_value(port, Opts), Info = [{listen_on, {string, emqx_listeners:format_listen_on(Port)}}, {acceptors, maps:get(num_acceptors, proplists:get_value(transport_options, Opts, #{}), 0)}, @@ -488,7 +486,7 @@ listeners([]) -> {current_conn, proplists:get_value(all_connections, Opts)}, {shutdown_count, []}], emqx_ctl:print("~s~n", [listener_identifier(Protocol, Port)]), - foreach(fun indent_print/1, Info) + lists:foreach(fun indent_print/1, Info) end, ranch:info()); listeners(["stop", Name = "http" ++ _N | _MaybePort]) -> @@ -548,16 +546,16 @@ data(["export"]) -> emqx_ctl:print("The emqx data has been successfully exported to ~s.~n", [Filename]); {error, Reason} -> emqx_ctl:print("The emqx data export failed due to ~p.~n", [Reason]) - end; + end; data(["import", Filename]) -> case emqx_mgmt_data_backup:import(Filename) of ok -> emqx_ctl:print("The emqx data has been imported successfully.~n"); - {error, import_failed} -> - emqx_ctl:print("The emqx data import failed. ~n"); + {error, import_failed} -> + emqx_ctl:print("The emqx data import failed.~n"); {error, unsupported_version} -> - emqx_ctl:print("The emqx data import failed: Unsupported version. ~n"); + emqx_ctl:print("The emqx data import failed: Unsupported version.~n"); {error, Reason} -> emqx_ctl:print("The emqx data import failed: ~0p while reading ~s.~n", [Reason, Filename]) end; diff --git a/apps/emqx_management/src/emqx_mgmt_data_backup.erl b/apps/emqx_management/src/emqx_mgmt_data_backup.erl index 03703be91..ef8987603 100644 --- a/apps/emqx_management/src/emqx_mgmt_data_backup.erl +++ b/apps/emqx_management/src/emqx_mgmt_data_backup.erl @@ -51,7 +51,7 @@ , to_version/1 ]). --export([ export/0 +-export([ export/0 , import/1 ]). @@ -323,7 +323,7 @@ import_auth_clientid(Lists) -> case ets:info(emqx_user) of undefined -> ok; _ -> - lists:foreach(fun(#{<<"clientid">> := Clientid, <<"password">> := Password}) -> + lists:foreach(fun(#{<<"clientid">> := Clientid, <<"password">> := Password}) -> mnesia:dirty_write({emqx_user, {clientid, Clientid}, base64:decode(Password), erlang:system_time(millisecond)}) end, Lists) end. @@ -333,7 +333,7 @@ import_auth_username(Lists) -> undefined -> ok; _ -> lists:foreach(fun(#{<<"username">> := Username, <<"password">> := Password}) -> - mnesia:dirty_write({emqx_user, {username, Username}, base64:decode(Password), erlang:system_time(millisecond)}) + mnesia:dirty_write({emqx_user, {username, Username}, base64:decode(Password), erlang:system_time(millisecond)}) end, Lists) end. @@ -344,8 +344,8 @@ import_auth_mnesia(Auths, FromVersion) when FromVersion =:= "4.0" orelse _ -> CreatedAt = erlang:system_time(millisecond), lists:foreach(fun(#{<<"login">> := Login, - <<"password">> := Password}) -> - mnesia:dirty_write({emqx_user, {username, Login}, base64:decode(Password), CreatedAt}) + <<"password">> := Password}) -> + mnesia:dirty_write({emqx_user, {username, Login}, base64:decode(Password), CreatedAt}) end, Auths) end; @@ -356,7 +356,7 @@ import_auth_mnesia(Auths, _) -> lists:foreach(fun(#{<<"login">> := Login, <<"type">> := Type, <<"password">> := Password, - <<"created_at">> := CreatedAt }) -> + <<"created_at">> := CreatedAt }) -> mnesia:dirty_write({emqx_user, {any_to_atom(Type), Login}, base64:decode(Password), CreatedAt}) end, Auths) end. @@ -375,7 +375,7 @@ import_acl_mnesia(Acls, FromVersion) when FromVersion =:= "4.0" orelse true -> allow; false -> deny end, - mnesia:dirty_write({emqx_acl, {{username, Login}, Topic}, any_to_atom(Action), Allow1, CreatedAt}) + mnesia:dirty_write({emqx_acl, {{username, Login}, Topic}, any_to_atom(Action), Allow1, CreatedAt}) end, Acls) end; @@ -385,14 +385,14 @@ import_acl_mnesia(Acls, _) -> _ -> lists:foreach(fun(Map = #{<<"action">> := Action, <<"access">> := Access, - <<"created_at">> := CreatedAt}) -> + <<"created_at">> := CreatedAt}) -> Filter = case maps:get(<<"type_value">>, Map, undefined) of undefined -> {any_to_atom(maps:get(<<"type">>, Map)), maps:get(<<"topic">>, Map)}; Value -> {{any_to_atom(maps:get(<<"type">>, Map)), Value}, maps:get(<<"topic">>, Map)} end, - mnesia:dirty_write({emqx_acl ,Filter, any_to_atom(Action), any_to_atom(Access), CreatedAt}) + mnesia:dirty_write({emqx_acl ,Filter, any_to_atom(Action), any_to_atom(Access), CreatedAt}) end, Acls) end. @@ -447,135 +447,19 @@ to_version(Version) when is_binary(Version) -> to_version(Version) when is_list(Version) -> Version. --ifdef(EMQX_ENTERPRISE). export() -> - Modules = export_modules(), - Rules = export_rules(), - Resources = export_resources(), - Blacklist = export_blacklist(), - Apps = export_applications(), - Users = export_users(), - AuthMnesia = export_auth_mnesia(), - AclMnesia = export_acl_mnesia(), - Schemas = export_schemas(), - {Configs, State} = export_confs(), Seconds = erlang:system_time(second), + Data = do_export_data() ++ [{date, erlang:list_to_binary(emqx_mgmt_util:strftime(Seconds))}], {{Y, M, D}, {H, MM, S}} = emqx_mgmt_util:datetime(Seconds), Filename = io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S]), NFilename = filename:join([emqx:get_env(data_dir), Filename]), - Version = string:sub_string(emqx_sys:version(), 1, 3), - Data = [{version, erlang:list_to_binary(Version)}, - {date, erlang:list_to_binary(emqx_mgmt_util:strftime(Seconds))}, - {modules, Modules}, - {rules, Rules}, - {resources, Resources}, - {blacklist, Blacklist}, - {apps, Apps}, - {users, Users}, - {auth_mnesia, AuthMnesia}, - {acl_mnesia, AclMnesia}, - {schemas, Schemas}, - {configs, Configs}, - {listeners_state, State}], - write_file(NFilename, Data). - -import(Filename) -> - case file:read_file(FullFilename) of - {ok, Json} -> - Data = emqx_json:decode(Json, [return_maps]), - Version = to_version(maps:get(<<"version">>, Data)), - case lists:member(Version, ?VERSIONS) of - true -> - try - import_confs(maps:get(<<"configs">>, Data, []), maps:get(<<"listeners_state">>, Data, [])), - import_resources(maps:get(<<"resources">>, Data, [])), - import_rules(maps:get(<<"rules">>, Data, [])), - import_blacklist(maps:get(<<"blacklist">>, Data, [])), - import_applications(maps:get(<<"apps">>, Data, [])), - import_users(maps:get(<<"users">>, Data, [])), - import_modules(maps:get(<<"modules">>, Data, [])), - import_auth_clientid(maps:get(<<"auth_clientid">>, Data, [])), - import_auth_username(maps:get(<<"auth_username">>, Data, [])), - import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, []), Version), - import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, []), Version), - import_schemas(maps:get(<<"schemas">>, Data, [])), - logger:debug("The emqx data has been imported successfully"), - ok - catch Class:Reason:Stack -> - logger:error("The emqx data import failed: ~0p", [{Class,Reason,Stack}]), - {error, import_failed} - end; - false -> - logger:error("Unsupported version: ~p", [Version]), - {error, unsupported_version} - end; - {error, Reason} -> - {error, Reason} - end. - --else. -export() -> - Rules = export_rules(), - Resources = export_resources(), - Blacklist = export_blacklist(), - Apps = export_applications(), - Users = export_users(), - AuthMnesia = export_auth_mnesia(), - AclMnesia = export_acl_mnesia(), - Seconds = erlang:system_time(second), - {{Y, M, D}, {H, MM, S}} = emqx_mgmt_util:datetime(Seconds), - Filename = io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S]), - NFilename = filename:join([emqx:get_env(data_dir), Filename]), - Version = string:sub_string(emqx_sys:version(), 1, 3), - Data = [{version, erlang:list_to_binary(Version)}, - {date, erlang:list_to_binary(emqx_mgmt_util:strftime(Seconds))}, - {rules, Rules}, - {resources, Resources}, - {blacklist, Blacklist}, - {apps, Apps}, - {users, Users}, - {auth_mnesia, AuthMnesia}, - {acl_mnesia, AclMnesia}], - write_file(NFilename, Data). - -import(Filename) -> - case file:read_file(Filename) of - {ok, Json} -> - Data = emqx_json:decode(Json, [return_maps]), - Version = to_version(maps:get(<<"version">>, Data)), - case lists:member(Version, ?VERSIONS) of - true -> - try - import_resources_and_rules(maps:get(<<"resources">>, Data, []), maps:get(<<"rules">>, Data, []), Version), - import_blacklist(maps:get(<<"blacklist">>, Data, [])), - import_applications(maps:get(<<"apps">>, Data, [])), - import_users(maps:get(<<"users">>, Data, [])), - import_auth_clientid(maps:get(<<"auth_clientid">>, Data, [])), - import_auth_username(maps:get(<<"auth_username">>, Data, [])), - import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, []), Version), - import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, []), Version), - logger:debug("The emqx data has been imported successfully"), - ok - catch Class:Reason:Stack -> - logger:error("The emqx data import failed: ~0p", [{Class,Reason,Stack}]), - {error, import_failed} - end; - false -> - logger:error("Unsupported version: ~p", [Version]), - {error, unsupported_version} - end; - Error -> Error - end. --endif. - -write_file(Filename, Data) -> - ok = filelib:ensure_dir(Filename), - case file:write_file(Filename, emqx_json:encode(Data)) of + ok = filelib:ensure_dir(NFilename), + case file:write_file(NFilename, emqx_json:encode(Data)) of ok -> - case file:read_file_info(Filename) of + case file:read_file_info(NFilename) of {ok, #file_info{size = Size, ctime = {{Y, M, D}, {H, MM, S}}}} -> CreatedAt = io_lib:format("~p-~p-~p ~p:~p:~p", [Y, M, D, H, MM, S]), - {ok, #{filename => list_to_binary(Filename), + {ok, #{filename => list_to_binary(NFilename), size => Size, created_at => list_to_binary(CreatedAt), node => node() @@ -584,3 +468,71 @@ write_file(Filename, Data) -> end; Error -> Error end. + +do_export_data() -> + Version = string:sub_string(emqx_sys:version(), 1, 3), + [{version, erlang:list_to_binary(Version)}, + {rules, export_rules()}, + {resources, export_resources()}, + {blacklist, export_blacklist()}, + {apps, export_applications()}, + {users, export_users()}, + {auth_mnesia, export_auth_mnesia()}, + {acl_mnesia, export_acl_mnesia()} + ] ++ do_export_extra_data(). + +-ifdef(EMQX_ENTERPRISE). +do_export_extra_data() -> + {Configs, State} = export_confs(), + [{modules, export_modules()}, + {schemas, export_schemas()}, + {configs, Configs}, + {listeners_state, State} + ]. +-else. +do_export_extra_data() -> []. +-endif. + +import(Filename) -> + case file:read_file(Filename) of + {ok, Json} -> + Data = emqx_json:decode(Json, [return_maps]), + Version = to_version(maps:get(<<"version">>, Data)), + case lists:member(Version, ?VERSIONS) of + true -> + try + do_import_data(Data, Version), + logger:debug("The emqx data has been imported successfully"), + ok + catch Class:Reason:Stack -> + logger:error("The emqx data import failed: ~0p", [{Class,Reason,Stack}]), + {error, import_failed} + end; + false -> + logger:error("Unsupported version: ~p", [Version]), + {error, unsupported_version} + end; + Error -> Error + end. + +do_import_data(Data, Version) -> + import_blacklist(maps:get(<<"blacklist">>, Data, [])), + import_applications(maps:get(<<"apps">>, Data, [])), + import_users(maps:get(<<"users">>, Data, [])), + import_auth_clientid(maps:get(<<"auth_clientid">>, Data, [])), + import_auth_username(maps:get(<<"auth_username">>, Data, [])), + import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, []), Version), + import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, []), Version), + do_import_extra_data(Data, Version). + +-ifdef(EMQX_ENTERPRISE). +do_import_extra_data(Data, Version) -> + import_confs(maps:get(<<"configs">>, Data, []), maps:get(<<"listeners_state">>, Data, [])), + import_resources(maps:get(<<"resources">>, Data, [])), + import_rules(maps:get(<<"rules">>, Data, [])), + import_modules(maps:get(<<"modules">>, Data, [])), + import_schemas(maps:get(<<"schemas">>, Data, [])). +-else. +do_import_extra_data(Data, Version) -> + import_resources_and_rules(maps:get(<<"resources">>, Data, []), maps:get(<<"rules">>, Data, []), Version). +-endif. diff --git a/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl index 401da3009..8e98567c4 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl @@ -101,7 +101,7 @@ get(Key, ResponseBody) -> lookup_alarm(Name, [#{<<"name">> := Name} | _More]) -> true; -lookup_alarm(Name, [_Alarm | More]) -> +lookup_alarm(Name, [_Alarm | More]) -> lookup_alarm(Name, More); lookup_alarm(_Name, []) -> false. @@ -119,7 +119,7 @@ alarms(_) -> ?assert(is_existing(alarm1, emqx_alarm:get_alarms(activated))), ?assert(is_existing(alarm2, emqx_alarm:get_alarms(activated))), - + {ok, Return1} = request_api(get, api_path(["alarms/activated"]), auth_header_()), ?assert(lookup_alarm(<<"alarm1">>, maps:get(<<"alarms">>, lists:nth(1, get(<<"data">>, Return1))))), ?assert(lookup_alarm(<<"alarm2">>, maps:get(<<"alarms">>, lists:nth(1, get(<<"data">>, Return1))))), @@ -230,7 +230,7 @@ clients(_) -> {ok, Clients2} = request_api(get, api_path(["nodes", atom_to_list(node()), "clients", binary_to_list(ClientId2)]) , auth_header_()), - ?assertEqual(<<"client2">>, maps:get(<<"clientid">>, lists:nth(1, get(<<"data">>, Clients2)))), + ?assertEqual(<<"client2">>, maps:get(<<"clientid">>, lists:nth(1, get(<<"data">>, Clients2)))), {ok, Clients3} = request_api(get, api_path(["clients", "username", binary_to_list(Username1)]), @@ -245,7 +245,7 @@ clients(_) -> {ok, Clients5} = request_api(get, api_path(["clients"]), "_limit=100&_page=1", auth_header_()), ?assertEqual(2, maps:get(<<"count">>, get(<<"meta">>, Clients5))), - + meck:new(emqx_mgmt, [passthrough, no_history]), meck:expect(emqx_mgmt, kickout_client, 1, fun(_) -> {error, undefined} end), @@ -261,7 +261,7 @@ clients(_) -> ?assertEqual(?ERROR1, get(<<"code">>, MeckRet3)), meck:unload(emqx_mgmt), - + {ok, Ok} = request_api(delete, api_path(["clients", binary_to_list(ClientId1)]), auth_header_()), ?assertEqual(?SUCCESS, get(<<"code">>, Ok)), @@ -436,7 +436,7 @@ pubsub(_) -> <<"topics">> => <<"">>, <<"qos">> => 1, <<"payload">> => <<"hello">>}), - ?assertEqual(?ERROR15, get(<<"code">>, BadTopic2)), + ?assertEqual(?ERROR15, get(<<"code">>, BadTopic2)), {ok, BadTopic3} = request_api(post, api_path(["mqtt/unsubscribe"]), [], auth_header_(), #{<<"clientid">> => ClientId, diff --git a/lib-ce/emqx_modules/src/emqx_mod_api_topic_metrics.erl b/lib-ce/emqx_modules/src/emqx_mod_api_topic_metrics.erl index 20416da7f..1588cf0c8 100644 --- a/lib-ce/emqx_modules/src/emqx_mod_api_topic_metrics.erl +++ b/lib-ce/emqx_modules/src/emqx_mod_api_topic_metrics.erl @@ -57,7 +57,7 @@ list(#{topic := Topic0}, _Params) -> execute_when_enabled(fun() -> Topic = emqx_mgmt_util:urldecode(Topic0), case safe_validate(Topic) of - true -> + true -> case get_topic_metrics(Topic) of {error, Reason} -> return({error, Reason}); Metrics -> return({ok, maps:from_list(Metrics)}) @@ -74,7 +74,7 @@ list(_Bindings, _Params) -> Metrics -> return({ok, Metrics}) end end). - + register(_Bindings, Params) -> execute_when_enabled(fun() -> case proplists:get_value(<<"topic">>, Params) of @@ -82,7 +82,7 @@ register(_Bindings, Params) -> return({error, missing_required_params}); Topic -> case safe_validate(Topic) of - true -> + true -> register_topic_metrics(Topic), return(ok); false -> @@ -101,7 +101,7 @@ unregister(#{topic := Topic0}, _Params) -> execute_when_enabled(fun() -> Topic = emqx_mgmt_util:urldecode(Topic0), case safe_validate(Topic) of - true -> + true -> unregister_topic_metrics(Topic), return(ok); false ->