chore(emqx_management): format code

This commit is contained in:
zhanghongtong 2021-02-25 19:06:40 +08:00 committed by turtleDeng
parent baa9fd8255
commit 28653bb457
5 changed files with 107 additions and 157 deletions

View File

@ -80,7 +80,7 @@ export(_Bindings, _Params) ->
return({ok, File#{filename => filename:basename(Filename)}}); return({ok, File#{filename => filename:basename(Filename)}});
Return -> return(Return) Return -> return(Return)
end. end.
list_exported(_Bindings, _Params) -> list_exported(_Bindings, _Params) ->
List = [ rpc:call(Node, ?MODULE, get_list_exported, []) || Node <- ekka_mnesia:running_nodes() ], List = [ rpc:call(Node, ?MODULE, get_list_exported, []) || Node <- ekka_mnesia:running_nodes() ],
NList = lists:map(fun({_, FileInfo}) -> FileInfo end, lists:keysort(1, lists:append(List))), NList = lists:map(fun({_, FileInfo}) -> FileInfo end, lists:keysort(1, lists:append(List))),
@ -131,7 +131,7 @@ import(_Bindings, Params) ->
do_import(Filename) -> do_import(Filename) ->
FullFilename = filename:join([emqx:get_env(data_dir), Filename]), FullFilename = filename:join([emqx:get_env(data_dir), Filename]),
emqx_mgmt_data_backup:import(FullFilename). emqx_mgmt_data_backup:import(FullFilename).
download(#{filename := Filename}, _Params) -> download(#{filename := Filename}, _Params) ->
FullFilename = filename:join([emqx:get_env(data_dir), Filename]), FullFilename = filename:join([emqx:get_env(data_dir), Filename]),

View File

@ -23,8 +23,6 @@
-define(PRINT_CMD(Cmd, Descr), io:format("~-48s# ~s~n", [Cmd, Descr])). -define(PRINT_CMD(Cmd, Descr), io:format("~-48s# ~s~n", [Cmd, Descr])).
-import(lists, [foreach/2]).
-export([load/0]). -export([load/0]).
-export([ status/1 -export([ status/1
@ -58,7 +56,7 @@
-spec(load() -> ok). -spec(load() -> ok).
load() -> load() ->
Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)], Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)],
foreach(fun(Cmd) -> emqx_ctl:register_command(Cmd, {?MODULE, Cmd}, []) end, Cmds). lists:foreach(fun(Cmd) -> emqx_ctl:register_command(Cmd, {?MODULE, Cmd}, []) end, Cmds).
is_cmd(Fun) -> is_cmd(Fun) ->
not lists:member(Fun, [init, load, module_info]). not lists:member(Fun, [init, load, module_info]).
@ -100,7 +98,7 @@ mgmt(["delete", AppId]) ->
end; end;
mgmt(["list"]) -> mgmt(["list"]) ->
foreach(fun({AppId, AppSecret, Name, Desc, Status, Expired}) -> lists:foreach(fun({AppId, AppSecret, Name, Desc, Status, Expired}) ->
emqx_ctl:print("app_id: ~s, secret: ~s, name: ~s, desc: ~s, status: ~s, expired: ~p~n", emqx_ctl:print("app_id: ~s, secret: ~s, name: ~s, desc: ~s, status: ~s, expired: ~p~n",
[AppId, AppSecret, Name, Desc, Status, Expired]) [AppId, AppSecret, Name, Desc, Status, Expired])
end, emqx_mgmt_auth:list_apps()); end, emqx_mgmt_auth:list_apps());
@ -229,7 +227,7 @@ routes(_) ->
{"routes show <Topic>", "Show a route"}]). {"routes show <Topic>", "Show a route"}]).
subscriptions(["list"]) -> subscriptions(["list"]) ->
foreach(fun(Suboption) -> lists:foreach(fun(Suboption) ->
print({emqx_suboption, Suboption}) print({emqx_suboption, Suboption})
end, ets:tab2list(emqx_suboption)); end, ets:tab2list(emqx_suboption));
@ -279,7 +277,7 @@ if_valid_qos(QoS, Fun) ->
end. end.
plugins(["list"]) -> plugins(["list"]) ->
foreach(fun print/1, emqx_plugins:list()); lists:foreach(fun print/1, emqx_plugins:list());
plugins(["load", Name]) -> plugins(["load", Name]) ->
case emqx_plugins:load(list_to_atom(Name)) of case emqx_plugins:load(list_to_atom(Name)) of
@ -421,7 +419,7 @@ log(_) ->
%% @doc Trace Command %% @doc Trace Command
trace(["list"]) -> trace(["list"]) ->
foreach(fun({{Who, Name}, {Level, LogFile}}) -> lists:foreach(fun({{Who, Name}, {Level, LogFile}}) ->
emqx_ctl:print("Trace(~s=~s, level=~s, destination=~p)~n", [Who, Name, Level, LogFile]) emqx_ctl:print("Trace(~s=~s, level=~s, destination=~p)~n", [Who, Name, Level, LogFile])
end, emqx_tracer:lookup_traces()); end, emqx_tracer:lookup_traces());
@ -470,7 +468,7 @@ trace_off(Who, Name) ->
%% @doc Listeners Command %% @doc Listeners Command
listeners([]) -> listeners([]) ->
foreach(fun({{Protocol, ListenOn}, _Pid}) -> lists:foreach(fun({{Protocol, ListenOn}, _Pid}) ->
Info = [{listen_on, {string, emqx_listeners:format_listen_on(ListenOn)}}, Info = [{listen_on, {string, emqx_listeners:format_listen_on(ListenOn)}},
{acceptors, esockd:get_acceptors({Protocol, ListenOn})}, {acceptors, esockd:get_acceptors({Protocol, ListenOn})},
{max_conns, esockd:get_max_connections({Protocol, ListenOn})}, {max_conns, esockd:get_max_connections({Protocol, ListenOn})},
@ -478,9 +476,9 @@ listeners([]) ->
{shutdown_count, esockd:get_shutdown_count({Protocol, ListenOn})} {shutdown_count, esockd:get_shutdown_count({Protocol, ListenOn})}
], ],
emqx_ctl:print("~s~n", [listener_identifier(Protocol, ListenOn)]), emqx_ctl:print("~s~n", [listener_identifier(Protocol, ListenOn)]),
foreach(fun indent_print/1, Info) lists:foreach(fun indent_print/1, Info)
end, esockd:listeners()), end, esockd:listeners()),
foreach(fun({Protocol, Opts}) -> lists:foreach(fun({Protocol, Opts}) ->
Port = proplists:get_value(port, Opts), Port = proplists:get_value(port, Opts),
Info = [{listen_on, {string, emqx_listeners:format_listen_on(Port)}}, Info = [{listen_on, {string, emqx_listeners:format_listen_on(Port)}},
{acceptors, maps:get(num_acceptors, proplists:get_value(transport_options, Opts, #{}), 0)}, {acceptors, maps:get(num_acceptors, proplists:get_value(transport_options, Opts, #{}), 0)},
@ -488,7 +486,7 @@ listeners([]) ->
{current_conn, proplists:get_value(all_connections, Opts)}, {current_conn, proplists:get_value(all_connections, Opts)},
{shutdown_count, []}], {shutdown_count, []}],
emqx_ctl:print("~s~n", [listener_identifier(Protocol, Port)]), emqx_ctl:print("~s~n", [listener_identifier(Protocol, Port)]),
foreach(fun indent_print/1, Info) lists:foreach(fun indent_print/1, Info)
end, ranch:info()); end, ranch:info());
listeners(["stop", Name = "http" ++ _N | _MaybePort]) -> listeners(["stop", Name = "http" ++ _N | _MaybePort]) ->
@ -548,16 +546,16 @@ data(["export"]) ->
emqx_ctl:print("The emqx data has been successfully exported to ~s.~n", [Filename]); emqx_ctl:print("The emqx data has been successfully exported to ~s.~n", [Filename]);
{error, Reason} -> {error, Reason} ->
emqx_ctl:print("The emqx data export failed due to ~p.~n", [Reason]) emqx_ctl:print("The emqx data export failed due to ~p.~n", [Reason])
end; end;
data(["import", Filename]) -> data(["import", Filename]) ->
case emqx_mgmt_data_backup:import(Filename) of case emqx_mgmt_data_backup:import(Filename) of
ok -> ok ->
emqx_ctl:print("The emqx data has been imported successfully.~n"); emqx_ctl:print("The emqx data has been imported successfully.~n");
{error, import_failed} -> {error, import_failed} ->
emqx_ctl:print("The emqx data import failed. ~n"); emqx_ctl:print("The emqx data import failed.~n");
{error, unsupported_version} -> {error, unsupported_version} ->
emqx_ctl:print("The emqx data import failed: Unsupported version. ~n"); emqx_ctl:print("The emqx data import failed: Unsupported version.~n");
{error, Reason} -> {error, Reason} ->
emqx_ctl:print("The emqx data import failed: ~0p while reading ~s.~n", [Reason, Filename]) emqx_ctl:print("The emqx data import failed: ~0p while reading ~s.~n", [Reason, Filename])
end; end;

View File

@ -51,7 +51,7 @@
, to_version/1 , to_version/1
]). ]).
-export([ export/0 -export([ export/0
, import/1 , import/1
]). ]).
@ -323,7 +323,7 @@ import_auth_clientid(Lists) ->
case ets:info(emqx_user) of case ets:info(emqx_user) of
undefined -> ok; undefined -> ok;
_ -> _ ->
lists:foreach(fun(#{<<"clientid">> := Clientid, <<"password">> := Password}) -> lists:foreach(fun(#{<<"clientid">> := Clientid, <<"password">> := Password}) ->
mnesia:dirty_write({emqx_user, {clientid, Clientid}, base64:decode(Password), erlang:system_time(millisecond)}) mnesia:dirty_write({emqx_user, {clientid, Clientid}, base64:decode(Password), erlang:system_time(millisecond)})
end, Lists) end, Lists)
end. end.
@ -333,7 +333,7 @@ import_auth_username(Lists) ->
undefined -> ok; undefined -> ok;
_ -> _ ->
lists:foreach(fun(#{<<"username">> := Username, <<"password">> := Password}) -> lists:foreach(fun(#{<<"username">> := Username, <<"password">> := Password}) ->
mnesia:dirty_write({emqx_user, {username, Username}, base64:decode(Password), erlang:system_time(millisecond)}) mnesia:dirty_write({emqx_user, {username, Username}, base64:decode(Password), erlang:system_time(millisecond)})
end, Lists) end, Lists)
end. end.
@ -344,8 +344,8 @@ import_auth_mnesia(Auths, FromVersion) when FromVersion =:= "4.0" orelse
_ -> _ ->
CreatedAt = erlang:system_time(millisecond), CreatedAt = erlang:system_time(millisecond),
lists:foreach(fun(#{<<"login">> := Login, lists:foreach(fun(#{<<"login">> := Login,
<<"password">> := Password}) -> <<"password">> := Password}) ->
mnesia:dirty_write({emqx_user, {username, Login}, base64:decode(Password), CreatedAt}) mnesia:dirty_write({emqx_user, {username, Login}, base64:decode(Password), CreatedAt})
end, Auths) end, Auths)
end; end;
@ -356,7 +356,7 @@ import_auth_mnesia(Auths, _) ->
lists:foreach(fun(#{<<"login">> := Login, lists:foreach(fun(#{<<"login">> := Login,
<<"type">> := Type, <<"type">> := Type,
<<"password">> := Password, <<"password">> := Password,
<<"created_at">> := CreatedAt }) -> <<"created_at">> := CreatedAt }) ->
mnesia:dirty_write({emqx_user, {any_to_atom(Type), Login}, base64:decode(Password), CreatedAt}) mnesia:dirty_write({emqx_user, {any_to_atom(Type), Login}, base64:decode(Password), CreatedAt})
end, Auths) end, Auths)
end. end.
@ -375,7 +375,7 @@ import_acl_mnesia(Acls, FromVersion) when FromVersion =:= "4.0" orelse
true -> allow; true -> allow;
false -> deny false -> deny
end, end,
mnesia:dirty_write({emqx_acl, {{username, Login}, Topic}, any_to_atom(Action), Allow1, CreatedAt}) mnesia:dirty_write({emqx_acl, {{username, Login}, Topic}, any_to_atom(Action), Allow1, CreatedAt})
end, Acls) end, Acls)
end; end;
@ -385,14 +385,14 @@ import_acl_mnesia(Acls, _) ->
_ -> _ ->
lists:foreach(fun(Map = #{<<"action">> := Action, lists:foreach(fun(Map = #{<<"action">> := Action,
<<"access">> := Access, <<"access">> := Access,
<<"created_at">> := CreatedAt}) -> <<"created_at">> := CreatedAt}) ->
Filter = case maps:get(<<"type_value">>, Map, undefined) of Filter = case maps:get(<<"type_value">>, Map, undefined) of
undefined -> undefined ->
{any_to_atom(maps:get(<<"type">>, Map)), maps:get(<<"topic">>, Map)}; {any_to_atom(maps:get(<<"type">>, Map)), maps:get(<<"topic">>, Map)};
Value -> Value ->
{{any_to_atom(maps:get(<<"type">>, Map)), Value}, maps:get(<<"topic">>, Map)} {{any_to_atom(maps:get(<<"type">>, Map)), Value}, maps:get(<<"topic">>, Map)}
end, end,
mnesia:dirty_write({emqx_acl ,Filter, any_to_atom(Action), any_to_atom(Access), CreatedAt}) mnesia:dirty_write({emqx_acl ,Filter, any_to_atom(Action), any_to_atom(Access), CreatedAt})
end, Acls) end, Acls)
end. end.
@ -447,135 +447,19 @@ to_version(Version) when is_binary(Version) ->
to_version(Version) when is_list(Version) -> to_version(Version) when is_list(Version) ->
Version. Version.
-ifdef(EMQX_ENTERPRISE).
export() -> export() ->
Modules = export_modules(),
Rules = export_rules(),
Resources = export_resources(),
Blacklist = export_blacklist(),
Apps = export_applications(),
Users = export_users(),
AuthMnesia = export_auth_mnesia(),
AclMnesia = export_acl_mnesia(),
Schemas = export_schemas(),
{Configs, State} = export_confs(),
Seconds = erlang:system_time(second), Seconds = erlang:system_time(second),
Data = do_export_data() ++ [{date, erlang:list_to_binary(emqx_mgmt_util:strftime(Seconds))}],
{{Y, M, D}, {H, MM, S}} = emqx_mgmt_util:datetime(Seconds), {{Y, M, D}, {H, MM, S}} = emqx_mgmt_util:datetime(Seconds),
Filename = io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S]), Filename = io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S]),
NFilename = filename:join([emqx:get_env(data_dir), Filename]), NFilename = filename:join([emqx:get_env(data_dir), Filename]),
Version = string:sub_string(emqx_sys:version(), 1, 3), ok = filelib:ensure_dir(NFilename),
Data = [{version, erlang:list_to_binary(Version)}, case file:write_file(NFilename, emqx_json:encode(Data)) of
{date, erlang:list_to_binary(emqx_mgmt_util:strftime(Seconds))},
{modules, Modules},
{rules, Rules},
{resources, Resources},
{blacklist, Blacklist},
{apps, Apps},
{users, Users},
{auth_mnesia, AuthMnesia},
{acl_mnesia, AclMnesia},
{schemas, Schemas},
{configs, Configs},
{listeners_state, State}],
write_file(NFilename, Data).
import(Filename) ->
case file:read_file(FullFilename) of
{ok, Json} ->
Data = emqx_json:decode(Json, [return_maps]),
Version = to_version(maps:get(<<"version">>, Data)),
case lists:member(Version, ?VERSIONS) of
true ->
try
import_confs(maps:get(<<"configs">>, Data, []), maps:get(<<"listeners_state">>, Data, [])),
import_resources(maps:get(<<"resources">>, Data, [])),
import_rules(maps:get(<<"rules">>, Data, [])),
import_blacklist(maps:get(<<"blacklist">>, Data, [])),
import_applications(maps:get(<<"apps">>, Data, [])),
import_users(maps:get(<<"users">>, Data, [])),
import_modules(maps:get(<<"modules">>, Data, [])),
import_auth_clientid(maps:get(<<"auth_clientid">>, Data, [])),
import_auth_username(maps:get(<<"auth_username">>, Data, [])),
import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, []), Version),
import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, []), Version),
import_schemas(maps:get(<<"schemas">>, Data, [])),
logger:debug("The emqx data has been imported successfully"),
ok
catch Class:Reason:Stack ->
logger:error("The emqx data import failed: ~0p", [{Class,Reason,Stack}]),
{error, import_failed}
end;
false ->
logger:error("Unsupported version: ~p", [Version]),
{error, unsupported_version}
end;
{error, Reason} ->
{error, Reason}
end.
-else.
export() ->
Rules = export_rules(),
Resources = export_resources(),
Blacklist = export_blacklist(),
Apps = export_applications(),
Users = export_users(),
AuthMnesia = export_auth_mnesia(),
AclMnesia = export_acl_mnesia(),
Seconds = erlang:system_time(second),
{{Y, M, D}, {H, MM, S}} = emqx_mgmt_util:datetime(Seconds),
Filename = io_lib:format("emqx-export-~p-~p-~p-~p-~p-~p.json", [Y, M, D, H, MM, S]),
NFilename = filename:join([emqx:get_env(data_dir), Filename]),
Version = string:sub_string(emqx_sys:version(), 1, 3),
Data = [{version, erlang:list_to_binary(Version)},
{date, erlang:list_to_binary(emqx_mgmt_util:strftime(Seconds))},
{rules, Rules},
{resources, Resources},
{blacklist, Blacklist},
{apps, Apps},
{users, Users},
{auth_mnesia, AuthMnesia},
{acl_mnesia, AclMnesia}],
write_file(NFilename, Data).
import(Filename) ->
case file:read_file(Filename) of
{ok, Json} ->
Data = emqx_json:decode(Json, [return_maps]),
Version = to_version(maps:get(<<"version">>, Data)),
case lists:member(Version, ?VERSIONS) of
true ->
try
import_resources_and_rules(maps:get(<<"resources">>, Data, []), maps:get(<<"rules">>, Data, []), Version),
import_blacklist(maps:get(<<"blacklist">>, Data, [])),
import_applications(maps:get(<<"apps">>, Data, [])),
import_users(maps:get(<<"users">>, Data, [])),
import_auth_clientid(maps:get(<<"auth_clientid">>, Data, [])),
import_auth_username(maps:get(<<"auth_username">>, Data, [])),
import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, []), Version),
import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, []), Version),
logger:debug("The emqx data has been imported successfully"),
ok
catch Class:Reason:Stack ->
logger:error("The emqx data import failed: ~0p", [{Class,Reason,Stack}]),
{error, import_failed}
end;
false ->
logger:error("Unsupported version: ~p", [Version]),
{error, unsupported_version}
end;
Error -> Error
end.
-endif.
write_file(Filename, Data) ->
ok = filelib:ensure_dir(Filename),
case file:write_file(Filename, emqx_json:encode(Data)) of
ok -> ok ->
case file:read_file_info(Filename) of case file:read_file_info(NFilename) of
{ok, #file_info{size = Size, ctime = {{Y, M, D}, {H, MM, S}}}} -> {ok, #file_info{size = Size, ctime = {{Y, M, D}, {H, MM, S}}}} ->
CreatedAt = io_lib:format("~p-~p-~p ~p:~p:~p", [Y, M, D, H, MM, S]), CreatedAt = io_lib:format("~p-~p-~p ~p:~p:~p", [Y, M, D, H, MM, S]),
{ok, #{filename => list_to_binary(Filename), {ok, #{filename => list_to_binary(NFilename),
size => Size, size => Size,
created_at => list_to_binary(CreatedAt), created_at => list_to_binary(CreatedAt),
node => node() node => node()
@ -584,3 +468,71 @@ write_file(Filename, Data) ->
end; end;
Error -> Error Error -> Error
end. end.
do_export_data() ->
Version = string:sub_string(emqx_sys:version(), 1, 3),
[{version, erlang:list_to_binary(Version)},
{rules, export_rules()},
{resources, export_resources()},
{blacklist, export_blacklist()},
{apps, export_applications()},
{users, export_users()},
{auth_mnesia, export_auth_mnesia()},
{acl_mnesia, export_acl_mnesia()}
] ++ do_export_extra_data().
-ifdef(EMQX_ENTERPRISE).
do_export_extra_data() ->
{Configs, State} = export_confs(),
[{modules, export_modules()},
{schemas, export_schemas()},
{configs, Configs},
{listeners_state, State}
].
-else.
do_export_extra_data() -> [].
-endif.
import(Filename) ->
case file:read_file(Filename) of
{ok, Json} ->
Data = emqx_json:decode(Json, [return_maps]),
Version = to_version(maps:get(<<"version">>, Data)),
case lists:member(Version, ?VERSIONS) of
true ->
try
do_import_data(Data, Version),
logger:debug("The emqx data has been imported successfully"),
ok
catch Class:Reason:Stack ->
logger:error("The emqx data import failed: ~0p", [{Class,Reason,Stack}]),
{error, import_failed}
end;
false ->
logger:error("Unsupported version: ~p", [Version]),
{error, unsupported_version}
end;
Error -> Error
end.
do_import_data(Data, Version) ->
import_blacklist(maps:get(<<"blacklist">>, Data, [])),
import_applications(maps:get(<<"apps">>, Data, [])),
import_users(maps:get(<<"users">>, Data, [])),
import_auth_clientid(maps:get(<<"auth_clientid">>, Data, [])),
import_auth_username(maps:get(<<"auth_username">>, Data, [])),
import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, []), Version),
import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, []), Version),
do_import_extra_data(Data, Version).
-ifdef(EMQX_ENTERPRISE).
do_import_extra_data(Data, Version) ->
import_confs(maps:get(<<"configs">>, Data, []), maps:get(<<"listeners_state">>, Data, [])),
import_resources(maps:get(<<"resources">>, Data, [])),
import_rules(maps:get(<<"rules">>, Data, [])),
import_modules(maps:get(<<"modules">>, Data, [])),
import_schemas(maps:get(<<"schemas">>, Data, [])).
-else.
do_import_extra_data(Data, Version) ->
import_resources_and_rules(maps:get(<<"resources">>, Data, []), maps:get(<<"rules">>, Data, []), Version).
-endif.

View File

@ -101,7 +101,7 @@ get(Key, ResponseBody) ->
lookup_alarm(Name, [#{<<"name">> := Name} | _More]) -> lookup_alarm(Name, [#{<<"name">> := Name} | _More]) ->
true; true;
lookup_alarm(Name, [_Alarm | More]) -> lookup_alarm(Name, [_Alarm | More]) ->
lookup_alarm(Name, More); lookup_alarm(Name, More);
lookup_alarm(_Name, []) -> lookup_alarm(_Name, []) ->
false. false.
@ -119,7 +119,7 @@ alarms(_) ->
?assert(is_existing(alarm1, emqx_alarm:get_alarms(activated))), ?assert(is_existing(alarm1, emqx_alarm:get_alarms(activated))),
?assert(is_existing(alarm2, emqx_alarm:get_alarms(activated))), ?assert(is_existing(alarm2, emqx_alarm:get_alarms(activated))),
{ok, Return1} = request_api(get, api_path(["alarms/activated"]), auth_header_()), {ok, Return1} = request_api(get, api_path(["alarms/activated"]), auth_header_()),
?assert(lookup_alarm(<<"alarm1">>, maps:get(<<"alarms">>, lists:nth(1, get(<<"data">>, Return1))))), ?assert(lookup_alarm(<<"alarm1">>, maps:get(<<"alarms">>, lists:nth(1, get(<<"data">>, Return1))))),
?assert(lookup_alarm(<<"alarm2">>, maps:get(<<"alarms">>, lists:nth(1, get(<<"data">>, Return1))))), ?assert(lookup_alarm(<<"alarm2">>, maps:get(<<"alarms">>, lists:nth(1, get(<<"data">>, Return1))))),
@ -230,7 +230,7 @@ clients(_) ->
{ok, Clients2} = request_api(get, api_path(["nodes", atom_to_list(node()), {ok, Clients2} = request_api(get, api_path(["nodes", atom_to_list(node()),
"clients", binary_to_list(ClientId2)]) "clients", binary_to_list(ClientId2)])
, auth_header_()), , auth_header_()),
?assertEqual(<<"client2">>, maps:get(<<"clientid">>, lists:nth(1, get(<<"data">>, Clients2)))), ?assertEqual(<<"client2">>, maps:get(<<"clientid">>, lists:nth(1, get(<<"data">>, Clients2)))),
{ok, Clients3} = request_api(get, api_path(["clients", {ok, Clients3} = request_api(get, api_path(["clients",
"username", binary_to_list(Username1)]), "username", binary_to_list(Username1)]),
@ -245,7 +245,7 @@ clients(_) ->
{ok, Clients5} = request_api(get, api_path(["clients"]), "_limit=100&_page=1", auth_header_()), {ok, Clients5} = request_api(get, api_path(["clients"]), "_limit=100&_page=1", auth_header_()),
?assertEqual(2, maps:get(<<"count">>, get(<<"meta">>, Clients5))), ?assertEqual(2, maps:get(<<"count">>, get(<<"meta">>, Clients5))),
meck:new(emqx_mgmt, [passthrough, no_history]), meck:new(emqx_mgmt, [passthrough, no_history]),
meck:expect(emqx_mgmt, kickout_client, 1, fun(_) -> {error, undefined} end), meck:expect(emqx_mgmt, kickout_client, 1, fun(_) -> {error, undefined} end),
@ -261,7 +261,7 @@ clients(_) ->
?assertEqual(?ERROR1, get(<<"code">>, MeckRet3)), ?assertEqual(?ERROR1, get(<<"code">>, MeckRet3)),
meck:unload(emqx_mgmt), meck:unload(emqx_mgmt),
{ok, Ok} = request_api(delete, api_path(["clients", binary_to_list(ClientId1)]), auth_header_()), {ok, Ok} = request_api(delete, api_path(["clients", binary_to_list(ClientId1)]), auth_header_()),
?assertEqual(?SUCCESS, get(<<"code">>, Ok)), ?assertEqual(?SUCCESS, get(<<"code">>, Ok)),
@ -436,7 +436,7 @@ pubsub(_) ->
<<"topics">> => <<"">>, <<"topics">> => <<"">>,
<<"qos">> => 1, <<"qos">> => 1,
<<"payload">> => <<"hello">>}), <<"payload">> => <<"hello">>}),
?assertEqual(?ERROR15, get(<<"code">>, BadTopic2)), ?assertEqual(?ERROR15, get(<<"code">>, BadTopic2)),
{ok, BadTopic3} = request_api(post, api_path(["mqtt/unsubscribe"]), [], auth_header_(), {ok, BadTopic3} = request_api(post, api_path(["mqtt/unsubscribe"]), [], auth_header_(),
#{<<"clientid">> => ClientId, #{<<"clientid">> => ClientId,

View File

@ -57,7 +57,7 @@ list(#{topic := Topic0}, _Params) ->
execute_when_enabled(fun() -> execute_when_enabled(fun() ->
Topic = emqx_mgmt_util:urldecode(Topic0), Topic = emqx_mgmt_util:urldecode(Topic0),
case safe_validate(Topic) of case safe_validate(Topic) of
true -> true ->
case get_topic_metrics(Topic) of case get_topic_metrics(Topic) of
{error, Reason} -> return({error, Reason}); {error, Reason} -> return({error, Reason});
Metrics -> return({ok, maps:from_list(Metrics)}) Metrics -> return({ok, maps:from_list(Metrics)})
@ -74,7 +74,7 @@ list(_Bindings, _Params) ->
Metrics -> return({ok, Metrics}) Metrics -> return({ok, Metrics})
end end
end). end).
register(_Bindings, Params) -> register(_Bindings, Params) ->
execute_when_enabled(fun() -> execute_when_enabled(fun() ->
case proplists:get_value(<<"topic">>, Params) of case proplists:get_value(<<"topic">>, Params) of
@ -82,7 +82,7 @@ register(_Bindings, Params) ->
return({error, missing_required_params}); return({error, missing_required_params});
Topic -> Topic ->
case safe_validate(Topic) of case safe_validate(Topic) of
true -> true ->
register_topic_metrics(Topic), register_topic_metrics(Topic),
return(ok); return(ok);
false -> false ->
@ -101,7 +101,7 @@ unregister(#{topic := Topic0}, _Params) ->
execute_when_enabled(fun() -> execute_when_enabled(fun() ->
Topic = emqx_mgmt_util:urldecode(Topic0), Topic = emqx_mgmt_util:urldecode(Topic0),
case safe_validate(Topic) of case safe_validate(Topic) of
true -> true ->
unregister_topic_metrics(Topic), unregister_topic_metrics(Topic),
return(ok); return(ok);
false -> false ->