diff --git a/apps/emqx/src/emqx_banned.erl b/apps/emqx/src/emqx_banned.erl index 7d7e4713a..53a71736a 100644 --- a/apps/emqx/src/emqx_banned.erl +++ b/apps/emqx/src/emqx_banned.erl @@ -146,19 +146,36 @@ to_timestamp(Rfc3339) when is_binary(Rfc3339) -> to_timestamp(Rfc3339) -> calendar:rfc3339_to_system_time(Rfc3339, [{unit, second}]). --spec(create(emqx_types:banned() | map()) -> ok). +-spec(create(emqx_types:banned() | map()) -> + {ok, emqx_types:banned()} | {error, {already_exist, emqx_types:banned()}}). create(#{who := Who, by := By, reason := Reason, at := At, until := Until}) -> - mria:dirty_write(?BANNED_TAB, #banned{who = Who, - by = By, - reason = Reason, - at = At, - until = Until}); -create(Banned) when is_record(Banned, banned) -> - mria:dirty_write(?BANNED_TAB, Banned). + Banned = #banned{ + who = Who, + by = By, + reason = Reason, + at = At, + until = Until + }, + create(Banned); + +create(Banned = #banned{who = Who}) -> + case look_up(Who) of + [] -> + mria:dirty_write(?BANNED_TAB, Banned), + {ok, Banned}; + [OldBanned = #banned{until = Until}] -> + case Until < erlang:system_time(second) of + true -> + {error, {already_exist, OldBanned}}; + false -> + mria:dirty_write(?BANNED_TAB, Banned), + {ok, Banned} + end + end. look_up(Who) when is_map(Who) -> look_up(pares_who(Who)); diff --git a/apps/emqx/src/emqx_flapping.erl b/apps/emqx/src/emqx_flapping.erl index 0b4611c4c..600144adc 100644 --- a/apps/emqx/src/emqx_flapping.erl +++ b/apps/emqx/src/emqx_flapping.erl @@ -129,7 +129,8 @@ handle_cast({detected, #flapping{clientid = ClientId, reason = <<"flapping is detected">>, at = Now, until = Now + (Interval div 1000)}, - emqx_banned:create(Banned); + {ok, _} = emqx_banned:create(Banned), + ok; false -> ?SLOG(warning, #{ msg => "client_disconnected", diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 0581937b1..c6ef92b95 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -17,6 +17,8 @@ %% @doc Start/Stop MQTT listeners. -module(emqx_listeners). +-elvis([{elvis_style, dont_repeat_yourself, #{min_complexity => 10000}}]). + -include("emqx_mqtt.hrl"). -include("logger.hrl"). @@ -28,6 +30,7 @@ , is_running/1 , current_conns/2 , max_conns/2 + , id_example/0 ]). -export([ start_listener/1 @@ -48,6 +51,18 @@ -define(CONF_KEY_PATH, [listeners]). -define(TYPES_STRING, ["tcp","ssl","ws","wss","quic"]). +-spec(id_example() -> atom()). +id_example() -> + id_example(list()). + +id_example([]) -> + {ID, _} = hd(list()), + ID; +id_example([{'tcp:default', _} | _]) -> + 'tcp:default'; +id_example([_ | Listeners]) -> + id_example(Listeners). + %% @doc List configured listeners. -spec(list() -> [{ListenerId :: atom(), ListenerConf :: map()}]). list() -> @@ -235,10 +250,10 @@ do_start_listener(quic, ListenerName, #{bind := ListenOn} = Opts) -> , {key, maps:get(keyfile, Opts)} , {alpn, ["mqtt"]} , {conn_acceptors, lists:max([DefAcceptors, maps:get(acceptors, Opts, 0)])} - , {idle_timeout_ms, lists:max([ - emqx_config:get_zone_conf(zone(Opts), [mqtt, idle_timeout]) * 3 - , timer:seconds(maps:get(idle_timeout, Opts))] - )} + , {idle_timeout_ms, + lists:max([ + emqx_config:get_zone_conf(zone(Opts), [mqtt, idle_timeout]) * 3, + timer:seconds(maps:get(idle_timeout, Opts))])} ], ConnectionOpts = #{ conn_callback => emqx_quic_connection , peer_unidi_stream_count => 1 @@ -281,7 +296,8 @@ flatten_listeners(Conf0) -> || {Type, Conf} <- maps:to_list(Conf0)])). do_flatten_listeners(Type, Conf0) -> - [{listener_id(Type, Name), maps:remove(authentication, Conf)} || {Name, Conf} <- maps:to_list(Conf0)]. + [{listener_id(Type, Name), maps:remove(authentication, Conf)} || + {Name, Conf} <- maps:to_list(Conf0)]. esockd_opts(Type, Opts0) -> Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0), diff --git a/apps/emqx/test/emqx_banned_SUITE.erl b/apps/emqx/test/emqx_banned_SUITE.erl index de117ab00..e09d0baae 100644 --- a/apps/emqx/test/emqx_banned_SUITE.erl +++ b/apps/emqx/test/emqx_banned_SUITE.erl @@ -41,16 +41,16 @@ t_add_delete(_) -> at = erlang:system_time(second), until = erlang:system_time(second) + 1000 }, - ok = emqx_banned:create(Banned), + {ok, _} = emqx_banned:create(Banned), ?assertEqual(1, emqx_banned:info(size)), ok = emqx_banned:delete({clientid, <<"TestClient">>}), ?assertEqual(0, emqx_banned:info(size)). t_check(_) -> - ok = emqx_banned:create(#banned{who = {clientid, <<"BannedClient">>}}), - ok = emqx_banned:create(#banned{who = {username, <<"BannedUser">>}}), - ok = emqx_banned:create(#banned{who = {peerhost, {192,168,0,1}}}), + {ok, _} = emqx_banned:create(#banned{who = {clientid, <<"BannedClient">>}}), + {ok, _} = emqx_banned:create(#banned{who = {username, <<"BannedUser">>}}), + {ok, _} = emqx_banned:create(#banned{who = {peerhost, {192,168,0,1}}}), ?assertEqual(3, emqx_banned:info(size)), ClientInfo1 = #{clientid => <<"BannedClient">>, username => <<"user">>, @@ -83,7 +83,7 @@ t_check(_) -> t_unused(_) -> {ok, Banned} = emqx_banned:start_link(), - ok = emqx_banned:create(#banned{who = {clientid, <<"BannedClient">>}, + {ok, _} = emqx_banned:create(#banned{who = {clientid, <<"BannedClient">>}, until = erlang:system_time(second)}), ?assertEqual(ignored, gen_server:call(Banned, unexpected_req)), ?assertEqual(ok, gen_server:cast(Banned, unexpected_msg)), diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 3b6ef495d..af59343e7 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -533,8 +533,9 @@ t_publish_while_client_is_gone(Config) -> {clean_start, false} | Config]), {ok, _} = emqtt:ConnFun(Client2), - [Msg1] = receive_messages(1), - [Msg2] = receive_messages(1), + Msgs = receive_messages(2), + ?assertEqual(length(Msgs), 2), + [Msg2, Msg1] = Msgs, ?assertEqual({ok, iolist_to_binary(Payload1)}, maps:find(payload, Msg1)), ?assertEqual({ok, 2}, maps:find(qos, Msg1)), ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg2)), diff --git a/apps/emqx_authn/src/emqx_authn_api.erl b/apps/emqx_authn/src/emqx_authn_api.erl index 98159f5b7..bef27f99d 100644 --- a/apps/emqx_authn/src/emqx_authn_api.erl +++ b/apps/emqx_authn/src/emqx_authn_api.erl @@ -30,6 +30,9 @@ % Swagger +-define(API_TAGS_GLOBAL, [<<"authentication">>, <<"authentication config(global)">>]). +-define(API_TAGS_SINGLE, [<<"authentication">>, <<"authentication config(single listener)">>]). + -export([ api_spec/0 , paths/0 , schema/1 @@ -126,7 +129,7 @@ schema("/authentication") -> #{ 'operationId' => authenticators, get => #{ - tags => [<<"authentication">>, <<"global">>], + tags => ?API_TAGS_GLOBAL, description => <<"List authenticators for global authentication">>, responses => #{ 200 => emqx_dashboard_swagger:schema_with_example( @@ -135,7 +138,7 @@ schema("/authentication") -> } }, post => #{ - tags => [<<"authentication">>, <<"global">>], + tags => ?API_TAGS_GLOBAL, description => <<"Create authenticator for global authentication">>, 'requestBody' => emqx_dashboard_swagger:schema_with_examples( emqx_authn_schema:authenticator_type(), @@ -154,9 +157,9 @@ schema("/authentication/:id") -> #{ 'operationId' => authenticator, get => #{ - tags => [<<"authentication">>, <<"global">>], + tags => ?API_TAGS_GLOBAL, description => <<"Get authenticator from global authentication chain">>, - parameters => [{id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})}], + parameters => [param_auth_id()], responses => #{ 200 => emqx_dashboard_swagger:schema_with_examples( emqx_authn_schema:authenticator_type(), @@ -165,9 +168,9 @@ schema("/authentication/:id") -> } }, put => #{ - tags => [<<"authentication">>, <<"global">>], + tags => ?API_TAGS_GLOBAL, description => <<"Update authenticator from global authentication chain">>, - parameters => [{id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})}], + parameters => [param_auth_id()], 'requestBody' => emqx_dashboard_swagger:schema_with_examples( emqx_authn_schema:authenticator_type(), authenticator_examples() @@ -182,9 +185,9 @@ schema("/authentication/:id") -> } }, delete => #{ - tags => [<<"authentication">>, <<"global">>], + tags => ?API_TAGS_GLOBAL, description => <<"Delete authenticator from global authentication chain">>, - parameters => [{id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})}], + parameters => [param_auth_id()], responses => #{ 204 => <<"Authenticator deleted">>, 404 => error_codes([?NOT_FOUND], <<"Not Found">>) @@ -196,9 +199,9 @@ schema("/listeners/:listener_id/authentication") -> #{ 'operationId' => listener_authenticators, get => #{ - tags => [<<"authentication">>, <<"listener">>], + tags => ?API_TAGS_SINGLE, description => <<"List authenticators for listener authentication">>, - parameters => [{listener_id, mk(binary(), #{in => path, desc => <<"Listener ID">>})}], + parameters => [param_listener_id()], responses => #{ 200 => emqx_dashboard_swagger:schema_with_example( hoconsc:array(emqx_authn_schema:authenticator_type()), @@ -206,9 +209,9 @@ schema("/listeners/:listener_id/authentication") -> } }, post => #{ - tags => [<<"authentication">>, <<"listener">>], + tags => ?API_TAGS_SINGLE, description => <<"Create authenticator for listener authentication">>, - parameters => [{listener_id, mk(binary(), #{in => path, desc => <<"Listener ID">>})}], + parameters => [param_listener_id()], 'requestBody' => emqx_dashboard_swagger:schema_with_examples( emqx_authn_schema:authenticator_type(), authenticator_examples() @@ -227,12 +230,9 @@ schema("/listeners/:listener_id/authentication/:id") -> #{ 'operationId' => listener_authenticator, get => #{ - tags => [<<"authentication">>, <<"listener">>], + tags => ?API_TAGS_SINGLE, description => <<"Get authenticator from listener authentication chain">>, - parameters => [ - {listener_id, mk(binary(), #{in => path, desc => <<"Listener ID">>})}, - {id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})} - ], + parameters => [param_listener_id(), param_auth_id()], responses => #{ 200 => emqx_dashboard_swagger:schema_with_examples( emqx_authn_schema:authenticator_type(), @@ -241,12 +241,9 @@ schema("/listeners/:listener_id/authentication/:id") -> } }, put => #{ - tags => [<<"authentication">>, <<"listener">>], + tags => ?API_TAGS_SINGLE, description => <<"Update authenticator from listener authentication chain">>, - parameters => [ - {listener_id, mk(binary(), #{in => path, desc => <<"Listener ID">>})}, - {id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})} - ], + parameters => [param_listener_id(), param_auth_id()], 'requestBody' => emqx_dashboard_swagger:schema_with_examples( emqx_authn_schema:authenticator_type(), authenticator_examples()), @@ -260,12 +257,9 @@ schema("/listeners/:listener_id/authentication/:id") -> } }, delete => #{ - tags => [<<"authentication">>, <<"listener">>], + tags => ?API_TAGS_SINGLE, description => <<"Delete authenticator from listener authentication chain">>, - parameters => [ - {listener_id, mk(binary(), #{in => path, desc => <<"Listener ID">>})}, - {id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})} - ], + parameters => [param_listener_id(), param_auth_id()], responses => #{ 204 => <<"Authenticator deleted">>, 404 => error_codes([?NOT_FOUND], <<"Not Found">>) @@ -278,9 +272,9 @@ schema("/authentication/:id/move") -> #{ 'operationId' => authenticator_move, post => #{ - tags => [<<"authentication">>, <<"global">>], + tags => ?API_TAGS_GLOBAL, description => <<"Move authenticator in global authentication chain">>, - parameters => [{id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})}], + parameters => [param_auth_id()], 'requestBody' => emqx_dashboard_swagger:schema_with_examples( ref(request_move), request_move_examples()), @@ -296,12 +290,9 @@ schema("/listeners/:listener_id/authentication/:id/move") -> #{ 'operationId' => listener_authenticator_move, post => #{ - tags => [<<"authentication">>, <<"listener">>], + tags => ?API_TAGS_SINGLE, description => <<"Move authenticator in listener authentication chain">>, - parameters => [ - {listener_id, mk(binary(), #{in => path, desc => <<"Listener ID">>})}, - {id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})} - ], + parameters => [param_listener_id(), param_auth_id()], 'requestBody' => emqx_dashboard_swagger:schema_with_examples( ref(request_move), request_move_examples()), @@ -317,9 +308,9 @@ schema("/authentication/:id/import_users") -> #{ 'operationId' => authenticator_import_users, post => #{ - tags => [<<"authentication">>, <<"global">>], + tags => ?API_TAGS_GLOBAL, description => <<"Import users into authenticator in global authentication chain">>, - parameters => [{id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})}], + parameters => [param_auth_id()], 'requestBody' => emqx_dashboard_swagger:schema_with_examples( ref(request_import_users), request_import_users_examples()), @@ -335,12 +326,9 @@ schema("/listeners/:listener_id/authentication/:id/import_users") -> #{ 'operationId' => listener_authenticator_import_users, post => #{ - tags => [<<"authentication">>, <<"listener">>], + tags => ?API_TAGS_SINGLE, description => <<"Import users into authenticator in listener authentication chain">>, - parameters => [ - {listener_id, mk(binary(), #{in => path, desc => <<"Listener ID">>})}, - {id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})} - ], + parameters => [param_listener_id(), param_auth_id()], 'requestBody' => emqx_dashboard_swagger:schema_with_examples( ref(request_import_users), request_import_users_examples()), @@ -356,9 +344,9 @@ schema("/authentication/:id/users") -> #{ 'operationId' => authenticator_users, post => #{ - tags => [<<"authentication">>, <<"global">>], + tags => ?API_TAGS_GLOBAL, description => <<"Create users for authenticator in global authentication chain">>, - parameters => [{id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})}], + parameters => [param_auth_id()], 'requestBody' => emqx_dashboard_swagger:schema_with_examples( ref(request_user_create), request_user_create_examples()), @@ -371,10 +359,10 @@ schema("/authentication/:id/users") -> } }, get => #{ - tags => [<<"authentication">>, <<"global">>], + tags => ?API_TAGS_GLOBAL, description => <<"List users in authenticator in global authentication chain">>, parameters => [ - {id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})}, + param_auth_id(), {page, mk(integer(), #{in => query, desc => <<"Page Index">>, nullable => true})}, {limit, mk(integer(), #{in => query, desc => <<"Page Limit">>, nullable => true})} ], @@ -392,12 +380,9 @@ schema("/listeners/:listener_id/authentication/:id/users") -> #{ 'operationId' => listener_authenticator_users, post => #{ - tags => [<<"authentication">>, <<"listener">>], + tags => ?API_TAGS_SINGLE, description => <<"Create users for authenticator in global authentication chain">>, - parameters => [ - {listener_id, mk(binary(), #{in => path, desc => <<"Listener ID">>})}, - {id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})} - ], + parameters => [param_auth_id(), param_listener_id()], 'requestBody' => emqx_dashboard_swagger:schema_with_examples( ref(request_user_create), request_user_create_examples()), @@ -410,11 +395,10 @@ schema("/listeners/:listener_id/authentication/:id/users") -> } }, get => #{ - tags => [<<"authentication">>, <<"listener">>], + tags => ?API_TAGS_SINGLE, description => <<"List users in authenticator in listener authentication chain">>, parameters => [ - {listener_id, mk(binary(), #{in => path, desc => <<"Listener ID">>})}, - {id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})}, + param_listener_id(), param_auth_id(), {page, mk(integer(), #{in => query, desc => <<"Page Index">>, nullable => true})}, {limit, mk(integer(), #{in => query, desc => <<"Page Limit">>, nullable => true})} ], @@ -432,12 +416,9 @@ schema("/authentication/:id/users/:user_id") -> #{ 'operationId' => authenticator_user, get => #{ - tags => [<<"authentication">>, <<"global">>], + tags => ?API_TAGS_GLOBAL, description => <<"Get user from authenticator in global authentication chain">>, - parameters => [ - {id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})}, - {user_id, mk(binary(), #{in => path, desc => <<"User ID">>})} - ], + parameters => [param_auth_id(), param_user_id()], responses => #{ 200 => emqx_dashboard_swagger:schema_with_examples( ref(response_user), @@ -446,12 +427,9 @@ schema("/authentication/:id/users/:user_id") -> } }, put => #{ - tags => [<<"authentication">>, <<"global">>], + tags => ?API_TAGS_GLOBAL, description => <<"Update user in authenticator in global authentication chain">>, - parameters => [ - {id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})}, - {user_id, mk(binary(), #{in => path, desc => <<"User ID">>})} - ], + parameters => [param_auth_id(), param_user_id()], 'requestBody' => emqx_dashboard_swagger:schema_with_examples( ref(request_user_update), request_user_update_examples()), @@ -464,12 +442,9 @@ schema("/authentication/:id/users/:user_id") -> } }, delete => #{ - tags => [<<"authentication">>, <<"global">>], + tags => ?API_TAGS_GLOBAL, description => <<"Update user in authenticator in global authentication chain">>, - parameters => [ - {id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})}, - {user_id, mk(binary(), #{in => path, desc => <<"User ID">>})} - ], + parameters => [param_auth_id(), param_user_id()], responses => #{ 204 => <<"User deleted">>, 404 => error_codes([?NOT_FOUND], <<"Not Found">>) @@ -481,13 +456,9 @@ schema("/listeners/:listener_id/authentication/:id/users/:user_id") -> #{ 'operationId' => listener_authenticator_user, get => #{ - tags => [<<"authentication">>, <<"listener">>], + tags => ?API_TAGS_SINGLE, description => <<"Get user from authenticator in listener authentication chain">>, - parameters => [ - {listener_id, mk(binary(), #{in => path, desc => <<"Listener ID">>})}, - {id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})}, - {user_id, mk(binary(), #{in => path, desc => <<"User ID">>})} - ], + parameters => [param_listener_id(), param_auth_id(), param_user_id()], responses => #{ 200 => emqx_dashboard_swagger:schema_with_example( ref(response_user), @@ -496,13 +467,9 @@ schema("/listeners/:listener_id/authentication/:id/users/:user_id") -> } }, put => #{ - tags => [<<"authentication">>, <<"listener">>], + tags => ?API_TAGS_SINGLE, description => <<"Update user in authenticator in listener authentication chain">>, - parameters => [ - {listener_id, mk(binary(), #{in => path, desc => <<"Listener ID">>})}, - {id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})}, - {user_id, mk(binary(), #{in => path, desc => <<"User ID">>})} - ], + parameters => [param_listener_id(), param_auth_id(), param_user_id()], 'requestBody' => emqx_dashboard_swagger:schema_with_example( ref(request_user_update), request_user_update_examples()), @@ -516,13 +483,9 @@ schema("/listeners/:listener_id/authentication/:id/users/:user_id") -> }, delete => #{ - tags => [<<"authentication">>, <<"listener">>], + tags => ?API_TAGS_SINGLE, description => <<"Update user in authenticator in listener authentication chain">>, - parameters => [ - {listener_id, mk(binary(), #{in => path, desc => <<"Listener ID">>})}, - {id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})}, - {user_id, mk(binary(), #{in => path, desc => <<"User ID">>})} - ], + parameters => [param_listener_id(), param_auth_id(), param_user_id()], responses => #{ 204 => <<"User deleted">>, 404 => error_codes([?NOT_FOUND], <<"Not Found">>) @@ -530,6 +493,34 @@ schema("/listeners/:listener_id/authentication/:id/users/:user_id") -> } }. +param_auth_id() -> + { + id, + mk(binary(), #{ + in => path, + desc => <<"Authenticator ID">> + }) + }. + +param_listener_id() -> + { + listener_id, + mk(binary(), #{ + in => path, + desc => <<"Listener ID">>, + example => emqx_listeners:id_example() + }) + }. + +param_user_id() -> + { + user_id, + mk(binary(), #{ + in => path, + desc => <<"User ID">> + }) + }. + authenticators(post, #{body := Config}) -> create_authenticator([authentication], ?GLOBAL, Config); diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index c032093f7..0f3ecd49b 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -18,6 +18,8 @@ -include_lib("stdlib/include/qlc.hrl"). +-elvis([{elvis_style, dont_repeat_yourself, #{min_complexity => 100}}]). + -define(FRESH_SELECT, fresh_select). -export([ paginate/3 @@ -35,23 +37,14 @@ paginate(Tables, Params, {Module, FormatFun}) -> Qh = query_handle(Tables), Count = count(Tables), - Page = b2i(page(Params)), - Limit = b2i(limit(Params)), - Cursor = qlc:cursor(Qh), - case Page > 1 of - true -> - _ = qlc:next_answers(Cursor, (Page - 1) * Limit), - ok; - false -> ok - end, - Rows = qlc:next_answers(Cursor, Limit), - qlc:delete_cursor(Cursor), - #{meta => #{page => Page, limit => Limit, count => Count}, - data => [Module:FormatFun(Row) || Row <- Rows]}. + do_paginate(Qh, Count, Params, {Module, FormatFun}). paginate(Tables, MatchSpec, Params, {Module, FormatFun}) -> Qh = query_handle(Tables, MatchSpec), Count = count(Tables, MatchSpec), + do_paginate(Qh, Count, Params, {Module, FormatFun}). + +do_paginate(Qh, Count, Params, {Module, FormatFun}) -> Page = b2i(page(Params)), Limit = b2i(limit(Params)), Cursor = qlc:cursor(Qh), @@ -64,7 +57,7 @@ paginate(Tables, MatchSpec, Params, {Module, FormatFun}) -> Rows = qlc:next_answers(Cursor, Limit), qlc:delete_cursor(Cursor), #{meta => #{page => Page, limit => Limit, count => Count}, - data => [Module:FormatFun(Row) || Row <- Rows]}. + data => [erlang:apply(Module, FormatFun, [Row]) || Row <- Rows]}. query_handle(Table) when is_atom(Table) -> qlc:q([R || R <- ets:table(Table)]); @@ -95,9 +88,7 @@ count(Table, MatchSpec) when is_atom(Table) -> NMatchSpec = [{MatchPattern, Where, [true]}], ets:select_count(Table, NMatchSpec); count([Table], MatchSpec) when is_atom(Table) -> - [{MatchPattern, Where, _Re}] = MatchSpec, - NMatchSpec = [{MatchPattern, Where, [true]}], - ets:select_count(Table, NMatchSpec); + count(Table, MatchSpec); count(Tables, MatchSpec) -> lists:sum([count(T, MatchSpec) || T <- Tables]). @@ -111,16 +102,23 @@ limit(Params) when is_map(Params) -> limit(Params) -> proplists:get_value(<<"limit">>, Params, emqx_mgmt:max_row_limit()). +init_meta(Params) -> + Limit = b2i(limit(Params)), + Page = b2i(page(Params)), + #{ + page => Page, + limit => Limit, + count => 0 + }. + %%-------------------------------------------------------------------- %% Node Query %%-------------------------------------------------------------------- node_query(Node, Params, Tab, QsSchema, QueryFun) -> {_CodCnt, Qs} = params2qs(Params, QsSchema), - Limit = b2i(limit(Params)), - Page = b2i(page(Params)), - Meta = #{page => Page, limit => Limit, count => 0}, - page_limit_check_query(Meta, {fun do_node_query/5, [Node, Tab, Qs, QueryFun, Meta]}). + page_limit_check_query(init_meta(Params), + {fun do_node_query/5, [Node, Tab, Qs, QueryFun, init_meta(Params)]}). %% @private do_node_query(Node, Tab, Qs, QueryFun, Meta) -> @@ -129,34 +127,15 @@ do_node_query(Node, Tab, Qs, QueryFun, Meta) -> do_node_query( Node, Tab, Qs, QueryFun, Continuation , Meta = #{limit := Limit} , Results) -> - {Len, Rows, NContinuation} = do_query(Node, Tab, Qs, QueryFun, Continuation, Limit), - case judge_page_with_counting(Len, Meta) of - {more, NMeta} -> - case NContinuation of - ?FRESH_SELECT -> - #{meta => NMeta, data => []}; %% page and limit too big - _ -> - do_node_query(Node, Tab, Qs, QueryFun, NContinuation, NMeta, []) - end; - {cutrows, NMeta} -> - {SubStart, NeedNowNum} = rows_sub_params(Len, NMeta), - ThisRows = lists:sublist(Rows, SubStart, NeedNowNum), - NResults = lists:sublist( lists:append(Results, ThisRows) - , SubStart, Limit), - case NContinuation of - ?FRESH_SELECT -> - #{meta => NMeta, data => NResults}; - _ -> - do_node_query(Node, Tab, Qs, QueryFun, NContinuation, NMeta, NResults) - end; - {enough, NMeta} -> - NResults = lists:sublist(lists:append(Results, Rows), 1, Limit), - case NContinuation of - ?FRESH_SELECT -> - #{meta => NMeta, data => NResults}; - _ -> - do_node_query(Node, Tab, Qs, QueryFun, NContinuation, NMeta, NResults) - end + case do_query(Node, Tab, Qs, QueryFun, Continuation, Limit) of + {error, {badrpc, R}} -> + {error, Node, {badrpc, R}}; + {Len, Rows, ?FRESH_SELECT} -> + {NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta), + #{meta => NMeta, data => NResults}; + {Len, Rows, NContinuation} -> + {NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta), + do_node_query(Node, Tab, Qs, QueryFun, NContinuation, NMeta, NResults) end. %%-------------------------------------------------------------------- @@ -165,11 +144,9 @@ do_node_query( Node, Tab, Qs, QueryFun, Continuation cluster_query(Params, Tab, QsSchema, QueryFun) -> {_CodCnt, Qs} = params2qs(Params, QsSchema), - Limit = b2i(limit(Params)), - Page = b2i(page(Params)), Nodes = mria_mnesia:running_nodes(), - Meta = #{page => Page, limit => Limit, count => 0}, - page_limit_check_query(Meta, {fun do_cluster_query/5, [Nodes, Tab, Qs, QueryFun, Meta]}). + page_limit_check_query(init_meta(Params), + {fun do_cluster_query/5, [Nodes, Tab, Qs, QueryFun, init_meta(Params)]}). %% @private do_cluster_query(Nodes, Tab, Qs, QueryFun, Meta) -> @@ -177,37 +154,17 @@ do_cluster_query(Nodes, Tab, Qs, QueryFun, Meta) -> do_cluster_query([], _Tab, _Qs, _QueryFun, _Continuation, Meta, Results) -> #{meta => Meta, data => Results}; -do_cluster_query( [Node | Nodes], Tab, Qs, QueryFun, Continuation - , Meta = #{limit := Limit} - , Results) -> - {Len, Rows, NContinuation} = do_query(Node, Tab, Qs, QueryFun, Continuation, Limit), - case judge_page_with_counting(Len, Meta) of - {more, NMeta} -> - case NContinuation of - ?FRESH_SELECT -> - do_cluster_query(Nodes, Tab, Qs, QueryFun, NContinuation, NMeta, []); %% next node with parts of results - _ -> - do_cluster_query([Node | Nodes], Tab, Qs, QueryFun, NContinuation, NMeta, []) %% continue this node - end; - {cutrows, NMeta} -> - {SubStart, NeedNowNum} = rows_sub_params(Len, NMeta), - ThisRows = lists:sublist(Rows, SubStart, NeedNowNum), - NResults = lists:sublist( lists:append(Results, ThisRows) - , SubStart, Limit), - case NContinuation of - ?FRESH_SELECT -> - do_cluster_query(Nodes, Tab, Qs, QueryFun, NContinuation, NMeta, NResults); %% next node with parts of results - _ -> - do_cluster_query([Node | Nodes], Tab, Qs, QueryFun, NContinuation, NMeta, NResults) %% continue this node - end; - {enough, NMeta} -> - NResults = lists:sublist(lists:append(Results, Rows), 1, Limit), - case NContinuation of - ?FRESH_SELECT -> - do_cluster_query(Nodes, Tab, Qs, QueryFun, NContinuation, NMeta, NResults); %% next node with parts of results - _ -> - do_cluster_query([Node | Nodes], Tab, Qs, QueryFun, NContinuation, NMeta, NResults) %% continue this node - end +do_cluster_query([Node | Tail] = Nodes, Tab, Qs, QueryFun, Continuation, + Meta = #{limit := Limit}, Results) -> + case do_query(Node, Tab, Qs, QueryFun, Continuation, Limit) of + {error, {badrpc, R}} -> + {error, Node, {bar_rpc, R}}; + {Len, Rows, ?FRESH_SELECT} -> + {NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta), + do_cluster_query(Tail, Tab, Qs, QueryFun, ?FRESH_SELECT, NMeta, NResults); + {Len, Rows, NContinuation} -> + {NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta), + do_cluster_query(Nodes, Tab, Qs, QueryFun, NContinuation, NMeta, NResults) end. %%-------------------------------------------------------------------- @@ -216,11 +173,26 @@ do_cluster_query( [Node | Nodes], Tab, Qs, QueryFun, Continuation %% @private do_query(Node, Tab, Qs, {M,F}, Continuation, Limit) when Node =:= node() -> - M:F(Tab, Qs, Continuation, Limit); + erlang:apply(M, F, [Tab, Qs, Continuation, Limit]); do_query(Node, Tab, Qs, QueryFun, Continuation, Limit) -> rpc_call(Node, ?MODULE, do_query, [Node, Tab, Qs, QueryFun, Continuation, Limit], 50000). +sub_query_result(Len, Rows, Limit, Results, Meta) -> + {Flag, NMeta} = judge_page_with_counting(Len, Meta), + NResults = + case Flag of + more -> + []; + cutrows -> + {SubStart, NeedNowNum} = rows_sub_params(Len, NMeta), + ThisRows = lists:sublist(Rows, SubStart, NeedNowNum), + lists:sublist(lists:append(Results, ThisRows), SubStart, Limit); + enough -> + lists:sublist(lists:append(Results, Rows), 1, Limit) + end, + {NMeta, NResults}. + %% @private rpc_call(Node, M, F, A, T) -> case rpc:call(Node, M, F, A, T) of @@ -294,16 +266,20 @@ pick_params_to_qs([{Key, Value} | Params], QsSchema, Acc1, Acc2) -> end, case lists:keytake(OpposeKey, 1, Params) of false -> - pick_params_to_qs(Params, QsSchema, [qs(Key, Value, Type) | Acc1], Acc2); + pick_params_to_qs(Params, QsSchema, + [qs(Key, Value, Type) | Acc1], Acc2); {value, {K2, V2}, NParams} -> - pick_params_to_qs(NParams, QsSchema, [qs(Key, Value, K2, V2, Type) | Acc1], Acc2) + pick_params_to_qs(NParams, QsSchema, + [qs(Key, Value, K2, V2, Type) | Acc1], Acc2) end; _ -> case is_fuzzy_key(Key) of true -> - pick_params_to_qs(Params, QsSchema, Acc1, [qs(Key, Value, Type) | Acc2]); + pick_params_to_qs(Params, QsSchema, Acc1, + [qs(Key, Value, Type) | Acc2]); _ -> - pick_params_to_qs(Params, QsSchema, [qs(Key, Value, Type) | Acc1], Acc2) + pick_params_to_qs(Params, QsSchema, + [qs(Key, Value, Type) | Acc1], Acc2) end end @@ -340,10 +316,9 @@ is_fuzzy_key(<<"match_", _/binary>>) -> is_fuzzy_key(_) -> false. -page_start(Page, Limit) -> - if Page > 1 -> (Page-1) * Limit + 1; - true -> 1 - end. +page_start(1, _) -> 1; +page_start(Page, Limit) -> (Page-1) * Limit + 1. + judge_page_with_counting(Len, Meta = #{page := Page, limit := Limit, count := Count}) -> PageStart = page_start(Page, Limit), @@ -359,11 +334,12 @@ judge_page_with_counting(Len, Meta = #{page := Page, limit := Limit, count := Co rows_sub_params(Len, _Meta = #{page := Page, limit := Limit, count := Count}) -> PageStart = page_start(Page, Limit), - if Count - Len < PageStart -> + case (Count - Len) < PageStart of + true -> NeedNowNum = Count - PageStart + 1, SubStart = Len - NeedNowNum + 1, {SubStart, NeedNowNum}; - true -> + false -> {_SubStart = 1, _NeedNowNum = Len} end. diff --git a/apps/emqx_management/src/emqx_mgmt_api_banned.erl b/apps/emqx_management/src/emqx_mgmt_api_banned.erl index 9c9fbbb31..c9ae1401d 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_banned.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_banned.erl @@ -23,7 +23,11 @@ -behaviour(minirest_api). --export([api_spec/0, paths/0, schema/1, fields/1]). +-export([ api_spec/0 + , paths/0 + , schema/1 + , fields/1]). + -export([format/1]). -export([ banned/2 @@ -62,7 +66,9 @@ schema("/banned") -> description => <<"Create banned">>, 'requestBody' => hoconsc:mk(hoconsc:ref(ban)), responses => #{ - 200 => <<"Create success">> + 200 => [{data, hoconsc:mk(hoconsc:array(hoconsc:ref(ban)), #{})}], + 400 => emqx_dashboard_swagger:error_codes(['ALREADY_EXISTED'], + <<"Banned already existed">>) } } }; @@ -135,8 +141,12 @@ banned(get, #{query_string := Params}) -> Response = emqx_mgmt_api:paginate(?TAB, Params, ?FORMAT_FUN), {200, Response}; banned(post, #{body := Body}) -> - _ = emqx_banned:create(emqx_banned:parse(Body)), - {200}. + case emqx_banned:create(emqx_banned:parse(Body)) of + {ok, Banned} -> + {200, format(Banned)}; + {error, {already_exist, Old}} -> + {400, #{code => 'ALREADY_EXISTED', message => format(Old)}} + end. delete_banned(delete, #{bindings := Params}) -> case emqx_banned:look_up(Params) of diff --git a/apps/emqx_management/src/emqx_mgmt_api_configs.erl b/apps/emqx_management/src/emqx_mgmt_api_configs.erl index 48affe996..963db10b7 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_configs.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_configs.erl @@ -155,8 +155,20 @@ config_reset(post, _Params, Req) -> configs(get, Params, _Req) -> Node = maps:get(node, Params, node()), - Res = rpc:call(Node, ?MODULE, get_full_config, []), - {200, Res}. + case + lists:member(Node, mria_mnesia:running_nodes()) + andalso + rpc:call(Node, ?MODULE, get_full_config, []) + of + false -> + Message = list_to_binary(io_lib:format("Bad node ~p, reason not found", [Node])), + {500, #{code => 'BAD_NODE', message => Message}}; + {error, {badrpc, R}} -> + Message = list_to_binary(io_lib:format("Bad node ~p, reason ~p", [Node, R])), + {500, #{code => 'BAD_NODE', message => Message}}; + Res -> + {200, Res} + end. conf_path_reset(Req) -> <<"/api/v5", ?PREFIX_RESET, Path/binary>> = cowboy_req:path(Req), diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index 2a871d166..f21b1d8de 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -210,7 +210,7 @@ param_path_id() -> #{ name => id, in => path, - schema => #{type => string}, + schema => #{type => string, example => emqx_listeners:id_example()}, required => true }. diff --git a/apps/emqx_management/src/emqx_mgmt_util.erl b/apps/emqx_management/src/emqx_mgmt_util.erl index b276988bd..cce9276ed 100644 --- a/apps/emqx_management/src/emqx_mgmt_util.erl +++ b/apps/emqx_management/src/emqx_mgmt_util.erl @@ -126,7 +126,8 @@ array_schema(Schema, Desc) -> object_array_schema(Properties) when is_map(Properties) -> json_content_schema(#{type => array, items => #{type => object, properties => Properties}}). object_array_schema(Properties, Desc) -> - json_content_schema(#{type => array, items => #{type => object, properties => Properties}}, Desc). + json_content_schema(#{type => array, + items => #{type => object, properties => Properties}}, Desc). page_schema(Ref) when is_atom(Ref) -> page_schema(minirest:ref(atom_to_binary(Ref, utf8))); @@ -201,7 +202,10 @@ batch_operation(Module, Function, ArgsList) -> Failed = batch_operation(Module, Function, ArgsList, []), Len = erlang:length(Failed), Success = erlang:length(ArgsList) - Len, - Fun = fun({Args, Reason}, Detail) -> [#{data => Args, reason => io_lib:format("~p", [Reason])} | Detail] end, + Fun = + fun({Args, Reason}, Detail) -> + [#{data => Args, reason => io_lib:format("~p", [Reason])} | Detail] + end, #{success => Success, failed => Len, detail => lists:foldl(Fun, [], Failed)}. batch_operation(_Module, _Function, [], Failed) -> @@ -218,7 +222,7 @@ properties(Props) -> properties(Props, #{}). properties([], Acc) -> Acc; -properties([Key| Props], Acc) when is_atom(Key) -> +properties([Key | Props], Acc) when is_atom(Key) -> properties(Props, maps:put(Key, #{type => string}, Acc)); properties([{Key, Type} | Props], Acc) -> properties(Props, maps:put(Key, #{type => Type}, Acc)); @@ -266,6 +270,9 @@ generate_response(QueryResult) -> case QueryResult of {error, page_limit_invalid} -> {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}}; + {error, Node, {badrpc, R}} -> + Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])), + {500, #{code => <<"NODE_DOWN">>, message => Message}}; Response -> {200, Response} end. diff --git a/apps/emqx_modules/src/emqx_telemetry_api.erl b/apps/emqx_modules/src/emqx_telemetry_api.erl index 799f192d0..c3e1476bc 100644 --- a/apps/emqx_modules/src/emqx_telemetry_api.erl +++ b/apps/emqx_modules/src/emqx_telemetry_api.erl @@ -63,10 +63,12 @@ status_api() -> responses => #{<<"200">> => object_schema(Props)} }, put => #{ - description => "Enable or disbale telemetry", + description => "Enable or disable telemetry", 'requestBody' => object_schema(Props), responses => #{ - <<"200">> => schema(<<"Enable or disbale telemetry successfully">>), + <<"200">> => + object_schema(properties([{enable, boolean, <<"">>}]), + <<"Enable or disable telemetry successfully">>), <<"400">> => bad_request() } } @@ -77,10 +79,7 @@ data_api() -> Metadata = #{ get => #{ responses => #{ - <<"200">> => object_schema(properties(), <<"Get telemetry data">>) - } - } - }, + <<"200">> => object_schema(properties(), <<"Get telemetry data">>)}}}, {"/telemetry/data", Metadata, data}. %%-------------------------------------------------------------------- @@ -97,10 +96,10 @@ status(put, #{body := Body}) -> true -> <<"Telemetry status is already enabled">>; false -> <<"Telemetry status is already disable">> end, - {400, #{code => "BAD_REQUEST", message => Reason}}; + {400, #{code => 'BAD_REQUEST', message => Reason}}; false -> enable_telemetry(Enable), - {200} + {200, #{<<"enable">> => emqx_telemetry:get_status()}} end. data(get, _Request) ->