fixt: generate api (#6124)
* fix: generate api * fix: banned suite * fix: generate api bad rpc * fix: bad message suite * fix: create banned with check existed
This commit is contained in:
parent
17547970c6
commit
7c48bcabed
|
@ -146,19 +146,36 @@ to_timestamp(Rfc3339) when is_binary(Rfc3339) ->
|
||||||
to_timestamp(Rfc3339) ->
|
to_timestamp(Rfc3339) ->
|
||||||
calendar:rfc3339_to_system_time(Rfc3339, [{unit, second}]).
|
calendar:rfc3339_to_system_time(Rfc3339, [{unit, second}]).
|
||||||
|
|
||||||
-spec(create(emqx_types:banned() | map()) -> ok).
|
-spec(create(emqx_types:banned() | map()) ->
|
||||||
|
{ok, emqx_types:banned()} | {error, {already_exist, emqx_types:banned()}}).
|
||||||
create(#{who := Who,
|
create(#{who := Who,
|
||||||
by := By,
|
by := By,
|
||||||
reason := Reason,
|
reason := Reason,
|
||||||
at := At,
|
at := At,
|
||||||
until := Until}) ->
|
until := Until}) ->
|
||||||
mria:dirty_write(?BANNED_TAB, #banned{who = Who,
|
Banned = #banned{
|
||||||
|
who = Who,
|
||||||
by = By,
|
by = By,
|
||||||
reason = Reason,
|
reason = Reason,
|
||||||
at = At,
|
at = At,
|
||||||
until = Until});
|
until = Until
|
||||||
create(Banned) when is_record(Banned, banned) ->
|
},
|
||||||
mria:dirty_write(?BANNED_TAB, Banned).
|
create(Banned);
|
||||||
|
|
||||||
|
create(Banned = #banned{who = Who}) ->
|
||||||
|
case look_up(Who) of
|
||||||
|
[] ->
|
||||||
|
mria:dirty_write(?BANNED_TAB, Banned),
|
||||||
|
{ok, Banned};
|
||||||
|
[OldBanned = #banned{until = Until}] ->
|
||||||
|
case Until < erlang:system_time(second) of
|
||||||
|
true ->
|
||||||
|
{error, {already_exist, OldBanned}};
|
||||||
|
false ->
|
||||||
|
mria:dirty_write(?BANNED_TAB, Banned),
|
||||||
|
{ok, Banned}
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
look_up(Who) when is_map(Who) ->
|
look_up(Who) when is_map(Who) ->
|
||||||
look_up(pares_who(Who));
|
look_up(pares_who(Who));
|
||||||
|
|
|
@ -129,7 +129,8 @@ handle_cast({detected, #flapping{clientid = ClientId,
|
||||||
reason = <<"flapping is detected">>,
|
reason = <<"flapping is detected">>,
|
||||||
at = Now,
|
at = Now,
|
||||||
until = Now + (Interval div 1000)},
|
until = Now + (Interval div 1000)},
|
||||||
emqx_banned:create(Banned);
|
{ok, _} = emqx_banned:create(Banned),
|
||||||
|
ok;
|
||||||
false ->
|
false ->
|
||||||
?SLOG(warning, #{
|
?SLOG(warning, #{
|
||||||
msg => "client_disconnected",
|
msg => "client_disconnected",
|
||||||
|
|
|
@ -17,6 +17,8 @@
|
||||||
%% @doc Start/Stop MQTT listeners.
|
%% @doc Start/Stop MQTT listeners.
|
||||||
-module(emqx_listeners).
|
-module(emqx_listeners).
|
||||||
|
|
||||||
|
-elvis([{elvis_style, dont_repeat_yourself, #{min_complexity => 10000}}]).
|
||||||
|
|
||||||
-include("emqx_mqtt.hrl").
|
-include("emqx_mqtt.hrl").
|
||||||
-include("logger.hrl").
|
-include("logger.hrl").
|
||||||
|
|
||||||
|
@ -28,6 +30,7 @@
|
||||||
, is_running/1
|
, is_running/1
|
||||||
, current_conns/2
|
, current_conns/2
|
||||||
, max_conns/2
|
, max_conns/2
|
||||||
|
, id_example/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ start_listener/1
|
-export([ start_listener/1
|
||||||
|
@ -48,6 +51,18 @@
|
||||||
-define(CONF_KEY_PATH, [listeners]).
|
-define(CONF_KEY_PATH, [listeners]).
|
||||||
-define(TYPES_STRING, ["tcp","ssl","ws","wss","quic"]).
|
-define(TYPES_STRING, ["tcp","ssl","ws","wss","quic"]).
|
||||||
|
|
||||||
|
-spec(id_example() -> atom()).
|
||||||
|
id_example() ->
|
||||||
|
id_example(list()).
|
||||||
|
|
||||||
|
id_example([]) ->
|
||||||
|
{ID, _} = hd(list()),
|
||||||
|
ID;
|
||||||
|
id_example([{'tcp:default', _} | _]) ->
|
||||||
|
'tcp:default';
|
||||||
|
id_example([_ | Listeners]) ->
|
||||||
|
id_example(Listeners).
|
||||||
|
|
||||||
%% @doc List configured listeners.
|
%% @doc List configured listeners.
|
||||||
-spec(list() -> [{ListenerId :: atom(), ListenerConf :: map()}]).
|
-spec(list() -> [{ListenerId :: atom(), ListenerConf :: map()}]).
|
||||||
list() ->
|
list() ->
|
||||||
|
@ -235,10 +250,10 @@ do_start_listener(quic, ListenerName, #{bind := ListenOn} = Opts) ->
|
||||||
, {key, maps:get(keyfile, Opts)}
|
, {key, maps:get(keyfile, Opts)}
|
||||||
, {alpn, ["mqtt"]}
|
, {alpn, ["mqtt"]}
|
||||||
, {conn_acceptors, lists:max([DefAcceptors, maps:get(acceptors, Opts, 0)])}
|
, {conn_acceptors, lists:max([DefAcceptors, maps:get(acceptors, Opts, 0)])}
|
||||||
, {idle_timeout_ms, lists:max([
|
, {idle_timeout_ms,
|
||||||
emqx_config:get_zone_conf(zone(Opts), [mqtt, idle_timeout]) * 3
|
lists:max([
|
||||||
, timer:seconds(maps:get(idle_timeout, Opts))]
|
emqx_config:get_zone_conf(zone(Opts), [mqtt, idle_timeout]) * 3,
|
||||||
)}
|
timer:seconds(maps:get(idle_timeout, Opts))])}
|
||||||
],
|
],
|
||||||
ConnectionOpts = #{ conn_callback => emqx_quic_connection
|
ConnectionOpts = #{ conn_callback => emqx_quic_connection
|
||||||
, peer_unidi_stream_count => 1
|
, peer_unidi_stream_count => 1
|
||||||
|
@ -281,7 +296,8 @@ flatten_listeners(Conf0) ->
|
||||||
|| {Type, Conf} <- maps:to_list(Conf0)])).
|
|| {Type, Conf} <- maps:to_list(Conf0)])).
|
||||||
|
|
||||||
do_flatten_listeners(Type, Conf0) ->
|
do_flatten_listeners(Type, Conf0) ->
|
||||||
[{listener_id(Type, Name), maps:remove(authentication, Conf)} || {Name, Conf} <- maps:to_list(Conf0)].
|
[{listener_id(Type, Name), maps:remove(authentication, Conf)} ||
|
||||||
|
{Name, Conf} <- maps:to_list(Conf0)].
|
||||||
|
|
||||||
esockd_opts(Type, Opts0) ->
|
esockd_opts(Type, Opts0) ->
|
||||||
Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
|
Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
|
||||||
|
|
|
@ -41,16 +41,16 @@ t_add_delete(_) ->
|
||||||
at = erlang:system_time(second),
|
at = erlang:system_time(second),
|
||||||
until = erlang:system_time(second) + 1000
|
until = erlang:system_time(second) + 1000
|
||||||
},
|
},
|
||||||
ok = emqx_banned:create(Banned),
|
{ok, _} = emqx_banned:create(Banned),
|
||||||
?assertEqual(1, emqx_banned:info(size)),
|
?assertEqual(1, emqx_banned:info(size)),
|
||||||
|
|
||||||
ok = emqx_banned:delete({clientid, <<"TestClient">>}),
|
ok = emqx_banned:delete({clientid, <<"TestClient">>}),
|
||||||
?assertEqual(0, emqx_banned:info(size)).
|
?assertEqual(0, emqx_banned:info(size)).
|
||||||
|
|
||||||
t_check(_) ->
|
t_check(_) ->
|
||||||
ok = emqx_banned:create(#banned{who = {clientid, <<"BannedClient">>}}),
|
{ok, _} = emqx_banned:create(#banned{who = {clientid, <<"BannedClient">>}}),
|
||||||
ok = emqx_banned:create(#banned{who = {username, <<"BannedUser">>}}),
|
{ok, _} = emqx_banned:create(#banned{who = {username, <<"BannedUser">>}}),
|
||||||
ok = emqx_banned:create(#banned{who = {peerhost, {192,168,0,1}}}),
|
{ok, _} = emqx_banned:create(#banned{who = {peerhost, {192,168,0,1}}}),
|
||||||
?assertEqual(3, emqx_banned:info(size)),
|
?assertEqual(3, emqx_banned:info(size)),
|
||||||
ClientInfo1 = #{clientid => <<"BannedClient">>,
|
ClientInfo1 = #{clientid => <<"BannedClient">>,
|
||||||
username => <<"user">>,
|
username => <<"user">>,
|
||||||
|
@ -83,7 +83,7 @@ t_check(_) ->
|
||||||
|
|
||||||
t_unused(_) ->
|
t_unused(_) ->
|
||||||
{ok, Banned} = emqx_banned:start_link(),
|
{ok, Banned} = emqx_banned:start_link(),
|
||||||
ok = emqx_banned:create(#banned{who = {clientid, <<"BannedClient">>},
|
{ok, _} = emqx_banned:create(#banned{who = {clientid, <<"BannedClient">>},
|
||||||
until = erlang:system_time(second)}),
|
until = erlang:system_time(second)}),
|
||||||
?assertEqual(ignored, gen_server:call(Banned, unexpected_req)),
|
?assertEqual(ignored, gen_server:call(Banned, unexpected_req)),
|
||||||
?assertEqual(ok, gen_server:cast(Banned, unexpected_msg)),
|
?assertEqual(ok, gen_server:cast(Banned, unexpected_msg)),
|
||||||
|
|
|
@ -533,8 +533,9 @@ t_publish_while_client_is_gone(Config) ->
|
||||||
{clean_start, false}
|
{clean_start, false}
|
||||||
| Config]),
|
| Config]),
|
||||||
{ok, _} = emqtt:ConnFun(Client2),
|
{ok, _} = emqtt:ConnFun(Client2),
|
||||||
[Msg1] = receive_messages(1),
|
Msgs = receive_messages(2),
|
||||||
[Msg2] = receive_messages(1),
|
?assertEqual(length(Msgs), 2),
|
||||||
|
[Msg2, Msg1] = Msgs,
|
||||||
?assertEqual({ok, iolist_to_binary(Payload1)}, maps:find(payload, Msg1)),
|
?assertEqual({ok, iolist_to_binary(Payload1)}, maps:find(payload, Msg1)),
|
||||||
?assertEqual({ok, 2}, maps:find(qos, Msg1)),
|
?assertEqual({ok, 2}, maps:find(qos, Msg1)),
|
||||||
?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg2)),
|
?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg2)),
|
||||||
|
|
|
@ -30,6 +30,9 @@
|
||||||
|
|
||||||
% Swagger
|
% Swagger
|
||||||
|
|
||||||
|
-define(API_TAGS_GLOBAL, [<<"authentication">>, <<"authentication config(global)">>]).
|
||||||
|
-define(API_TAGS_SINGLE, [<<"authentication">>, <<"authentication config(single listener)">>]).
|
||||||
|
|
||||||
-export([ api_spec/0
|
-export([ api_spec/0
|
||||||
, paths/0
|
, paths/0
|
||||||
, schema/1
|
, schema/1
|
||||||
|
@ -126,7 +129,7 @@ schema("/authentication") ->
|
||||||
#{
|
#{
|
||||||
'operationId' => authenticators,
|
'operationId' => authenticators,
|
||||||
get => #{
|
get => #{
|
||||||
tags => [<<"authentication">>, <<"global">>],
|
tags => ?API_TAGS_GLOBAL,
|
||||||
description => <<"List authenticators for global authentication">>,
|
description => <<"List authenticators for global authentication">>,
|
||||||
responses => #{
|
responses => #{
|
||||||
200 => emqx_dashboard_swagger:schema_with_example(
|
200 => emqx_dashboard_swagger:schema_with_example(
|
||||||
|
@ -135,7 +138,7 @@ schema("/authentication") ->
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
post => #{
|
post => #{
|
||||||
tags => [<<"authentication">>, <<"global">>],
|
tags => ?API_TAGS_GLOBAL,
|
||||||
description => <<"Create authenticator for global authentication">>,
|
description => <<"Create authenticator for global authentication">>,
|
||||||
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
|
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
|
||||||
emqx_authn_schema:authenticator_type(),
|
emqx_authn_schema:authenticator_type(),
|
||||||
|
@ -154,9 +157,9 @@ schema("/authentication/:id") ->
|
||||||
#{
|
#{
|
||||||
'operationId' => authenticator,
|
'operationId' => authenticator,
|
||||||
get => #{
|
get => #{
|
||||||
tags => [<<"authentication">>, <<"global">>],
|
tags => ?API_TAGS_GLOBAL,
|
||||||
description => <<"Get authenticator from global authentication chain">>,
|
description => <<"Get authenticator from global authentication chain">>,
|
||||||
parameters => [{id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})}],
|
parameters => [param_auth_id()],
|
||||||
responses => #{
|
responses => #{
|
||||||
200 => emqx_dashboard_swagger:schema_with_examples(
|
200 => emqx_dashboard_swagger:schema_with_examples(
|
||||||
emqx_authn_schema:authenticator_type(),
|
emqx_authn_schema:authenticator_type(),
|
||||||
|
@ -165,9 +168,9 @@ schema("/authentication/:id") ->
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
put => #{
|
put => #{
|
||||||
tags => [<<"authentication">>, <<"global">>],
|
tags => ?API_TAGS_GLOBAL,
|
||||||
description => <<"Update authenticator from global authentication chain">>,
|
description => <<"Update authenticator from global authentication chain">>,
|
||||||
parameters => [{id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})}],
|
parameters => [param_auth_id()],
|
||||||
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
|
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
|
||||||
emqx_authn_schema:authenticator_type(),
|
emqx_authn_schema:authenticator_type(),
|
||||||
authenticator_examples()
|
authenticator_examples()
|
||||||
|
@ -182,9 +185,9 @@ schema("/authentication/:id") ->
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
delete => #{
|
delete => #{
|
||||||
tags => [<<"authentication">>, <<"global">>],
|
tags => ?API_TAGS_GLOBAL,
|
||||||
description => <<"Delete authenticator from global authentication chain">>,
|
description => <<"Delete authenticator from global authentication chain">>,
|
||||||
parameters => [{id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})}],
|
parameters => [param_auth_id()],
|
||||||
responses => #{
|
responses => #{
|
||||||
204 => <<"Authenticator deleted">>,
|
204 => <<"Authenticator deleted">>,
|
||||||
404 => error_codes([?NOT_FOUND], <<"Not Found">>)
|
404 => error_codes([?NOT_FOUND], <<"Not Found">>)
|
||||||
|
@ -196,9 +199,9 @@ schema("/listeners/:listener_id/authentication") ->
|
||||||
#{
|
#{
|
||||||
'operationId' => listener_authenticators,
|
'operationId' => listener_authenticators,
|
||||||
get => #{
|
get => #{
|
||||||
tags => [<<"authentication">>, <<"listener">>],
|
tags => ?API_TAGS_SINGLE,
|
||||||
description => <<"List authenticators for listener authentication">>,
|
description => <<"List authenticators for listener authentication">>,
|
||||||
parameters => [{listener_id, mk(binary(), #{in => path, desc => <<"Listener ID">>})}],
|
parameters => [param_listener_id()],
|
||||||
responses => #{
|
responses => #{
|
||||||
200 => emqx_dashboard_swagger:schema_with_example(
|
200 => emqx_dashboard_swagger:schema_with_example(
|
||||||
hoconsc:array(emqx_authn_schema:authenticator_type()),
|
hoconsc:array(emqx_authn_schema:authenticator_type()),
|
||||||
|
@ -206,9 +209,9 @@ schema("/listeners/:listener_id/authentication") ->
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
post => #{
|
post => #{
|
||||||
tags => [<<"authentication">>, <<"listener">>],
|
tags => ?API_TAGS_SINGLE,
|
||||||
description => <<"Create authenticator for listener authentication">>,
|
description => <<"Create authenticator for listener authentication">>,
|
||||||
parameters => [{listener_id, mk(binary(), #{in => path, desc => <<"Listener ID">>})}],
|
parameters => [param_listener_id()],
|
||||||
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
|
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
|
||||||
emqx_authn_schema:authenticator_type(),
|
emqx_authn_schema:authenticator_type(),
|
||||||
authenticator_examples()
|
authenticator_examples()
|
||||||
|
@ -227,12 +230,9 @@ schema("/listeners/:listener_id/authentication/:id") ->
|
||||||
#{
|
#{
|
||||||
'operationId' => listener_authenticator,
|
'operationId' => listener_authenticator,
|
||||||
get => #{
|
get => #{
|
||||||
tags => [<<"authentication">>, <<"listener">>],
|
tags => ?API_TAGS_SINGLE,
|
||||||
description => <<"Get authenticator from listener authentication chain">>,
|
description => <<"Get authenticator from listener authentication chain">>,
|
||||||
parameters => [
|
parameters => [param_listener_id(), param_auth_id()],
|
||||||
{listener_id, mk(binary(), #{in => path, desc => <<"Listener ID">>})},
|
|
||||||
{id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})}
|
|
||||||
],
|
|
||||||
responses => #{
|
responses => #{
|
||||||
200 => emqx_dashboard_swagger:schema_with_examples(
|
200 => emqx_dashboard_swagger:schema_with_examples(
|
||||||
emqx_authn_schema:authenticator_type(),
|
emqx_authn_schema:authenticator_type(),
|
||||||
|
@ -241,12 +241,9 @@ schema("/listeners/:listener_id/authentication/:id") ->
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
put => #{
|
put => #{
|
||||||
tags => [<<"authentication">>, <<"listener">>],
|
tags => ?API_TAGS_SINGLE,
|
||||||
description => <<"Update authenticator from listener authentication chain">>,
|
description => <<"Update authenticator from listener authentication chain">>,
|
||||||
parameters => [
|
parameters => [param_listener_id(), param_auth_id()],
|
||||||
{listener_id, mk(binary(), #{in => path, desc => <<"Listener ID">>})},
|
|
||||||
{id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})}
|
|
||||||
],
|
|
||||||
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
|
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
|
||||||
emqx_authn_schema:authenticator_type(),
|
emqx_authn_schema:authenticator_type(),
|
||||||
authenticator_examples()),
|
authenticator_examples()),
|
||||||
|
@ -260,12 +257,9 @@ schema("/listeners/:listener_id/authentication/:id") ->
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
delete => #{
|
delete => #{
|
||||||
tags => [<<"authentication">>, <<"listener">>],
|
tags => ?API_TAGS_SINGLE,
|
||||||
description => <<"Delete authenticator from listener authentication chain">>,
|
description => <<"Delete authenticator from listener authentication chain">>,
|
||||||
parameters => [
|
parameters => [param_listener_id(), param_auth_id()],
|
||||||
{listener_id, mk(binary(), #{in => path, desc => <<"Listener ID">>})},
|
|
||||||
{id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})}
|
|
||||||
],
|
|
||||||
responses => #{
|
responses => #{
|
||||||
204 => <<"Authenticator deleted">>,
|
204 => <<"Authenticator deleted">>,
|
||||||
404 => error_codes([?NOT_FOUND], <<"Not Found">>)
|
404 => error_codes([?NOT_FOUND], <<"Not Found">>)
|
||||||
|
@ -278,9 +272,9 @@ schema("/authentication/:id/move") ->
|
||||||
#{
|
#{
|
||||||
'operationId' => authenticator_move,
|
'operationId' => authenticator_move,
|
||||||
post => #{
|
post => #{
|
||||||
tags => [<<"authentication">>, <<"global">>],
|
tags => ?API_TAGS_GLOBAL,
|
||||||
description => <<"Move authenticator in global authentication chain">>,
|
description => <<"Move authenticator in global authentication chain">>,
|
||||||
parameters => [{id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})}],
|
parameters => [param_auth_id()],
|
||||||
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
|
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
|
||||||
ref(request_move),
|
ref(request_move),
|
||||||
request_move_examples()),
|
request_move_examples()),
|
||||||
|
@ -296,12 +290,9 @@ schema("/listeners/:listener_id/authentication/:id/move") ->
|
||||||
#{
|
#{
|
||||||
'operationId' => listener_authenticator_move,
|
'operationId' => listener_authenticator_move,
|
||||||
post => #{
|
post => #{
|
||||||
tags => [<<"authentication">>, <<"listener">>],
|
tags => ?API_TAGS_SINGLE,
|
||||||
description => <<"Move authenticator in listener authentication chain">>,
|
description => <<"Move authenticator in listener authentication chain">>,
|
||||||
parameters => [
|
parameters => [param_listener_id(), param_auth_id()],
|
||||||
{listener_id, mk(binary(), #{in => path, desc => <<"Listener ID">>})},
|
|
||||||
{id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})}
|
|
||||||
],
|
|
||||||
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
|
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
|
||||||
ref(request_move),
|
ref(request_move),
|
||||||
request_move_examples()),
|
request_move_examples()),
|
||||||
|
@ -317,9 +308,9 @@ schema("/authentication/:id/import_users") ->
|
||||||
#{
|
#{
|
||||||
'operationId' => authenticator_import_users,
|
'operationId' => authenticator_import_users,
|
||||||
post => #{
|
post => #{
|
||||||
tags => [<<"authentication">>, <<"global">>],
|
tags => ?API_TAGS_GLOBAL,
|
||||||
description => <<"Import users into authenticator in global authentication chain">>,
|
description => <<"Import users into authenticator in global authentication chain">>,
|
||||||
parameters => [{id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})}],
|
parameters => [param_auth_id()],
|
||||||
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
|
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
|
||||||
ref(request_import_users),
|
ref(request_import_users),
|
||||||
request_import_users_examples()),
|
request_import_users_examples()),
|
||||||
|
@ -335,12 +326,9 @@ schema("/listeners/:listener_id/authentication/:id/import_users") ->
|
||||||
#{
|
#{
|
||||||
'operationId' => listener_authenticator_import_users,
|
'operationId' => listener_authenticator_import_users,
|
||||||
post => #{
|
post => #{
|
||||||
tags => [<<"authentication">>, <<"listener">>],
|
tags => ?API_TAGS_SINGLE,
|
||||||
description => <<"Import users into authenticator in listener authentication chain">>,
|
description => <<"Import users into authenticator in listener authentication chain">>,
|
||||||
parameters => [
|
parameters => [param_listener_id(), param_auth_id()],
|
||||||
{listener_id, mk(binary(), #{in => path, desc => <<"Listener ID">>})},
|
|
||||||
{id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})}
|
|
||||||
],
|
|
||||||
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
|
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
|
||||||
ref(request_import_users),
|
ref(request_import_users),
|
||||||
request_import_users_examples()),
|
request_import_users_examples()),
|
||||||
|
@ -356,9 +344,9 @@ schema("/authentication/:id/users") ->
|
||||||
#{
|
#{
|
||||||
'operationId' => authenticator_users,
|
'operationId' => authenticator_users,
|
||||||
post => #{
|
post => #{
|
||||||
tags => [<<"authentication">>, <<"global">>],
|
tags => ?API_TAGS_GLOBAL,
|
||||||
description => <<"Create users for authenticator in global authentication chain">>,
|
description => <<"Create users for authenticator in global authentication chain">>,
|
||||||
parameters => [{id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})}],
|
parameters => [param_auth_id()],
|
||||||
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
|
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
|
||||||
ref(request_user_create),
|
ref(request_user_create),
|
||||||
request_user_create_examples()),
|
request_user_create_examples()),
|
||||||
|
@ -371,10 +359,10 @@ schema("/authentication/:id/users") ->
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
get => #{
|
get => #{
|
||||||
tags => [<<"authentication">>, <<"global">>],
|
tags => ?API_TAGS_GLOBAL,
|
||||||
description => <<"List users in authenticator in global authentication chain">>,
|
description => <<"List users in authenticator in global authentication chain">>,
|
||||||
parameters => [
|
parameters => [
|
||||||
{id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})},
|
param_auth_id(),
|
||||||
{page, mk(integer(), #{in => query, desc => <<"Page Index">>, nullable => true})},
|
{page, mk(integer(), #{in => query, desc => <<"Page Index">>, nullable => true})},
|
||||||
{limit, mk(integer(), #{in => query, desc => <<"Page Limit">>, nullable => true})}
|
{limit, mk(integer(), #{in => query, desc => <<"Page Limit">>, nullable => true})}
|
||||||
],
|
],
|
||||||
|
@ -392,12 +380,9 @@ schema("/listeners/:listener_id/authentication/:id/users") ->
|
||||||
#{
|
#{
|
||||||
'operationId' => listener_authenticator_users,
|
'operationId' => listener_authenticator_users,
|
||||||
post => #{
|
post => #{
|
||||||
tags => [<<"authentication">>, <<"listener">>],
|
tags => ?API_TAGS_SINGLE,
|
||||||
description => <<"Create users for authenticator in global authentication chain">>,
|
description => <<"Create users for authenticator in global authentication chain">>,
|
||||||
parameters => [
|
parameters => [param_auth_id(), param_listener_id()],
|
||||||
{listener_id, mk(binary(), #{in => path, desc => <<"Listener ID">>})},
|
|
||||||
{id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})}
|
|
||||||
],
|
|
||||||
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
|
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
|
||||||
ref(request_user_create),
|
ref(request_user_create),
|
||||||
request_user_create_examples()),
|
request_user_create_examples()),
|
||||||
|
@ -410,11 +395,10 @@ schema("/listeners/:listener_id/authentication/:id/users") ->
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
get => #{
|
get => #{
|
||||||
tags => [<<"authentication">>, <<"listener">>],
|
tags => ?API_TAGS_SINGLE,
|
||||||
description => <<"List users in authenticator in listener authentication chain">>,
|
description => <<"List users in authenticator in listener authentication chain">>,
|
||||||
parameters => [
|
parameters => [
|
||||||
{listener_id, mk(binary(), #{in => path, desc => <<"Listener ID">>})},
|
param_listener_id(), param_auth_id(),
|
||||||
{id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})},
|
|
||||||
{page, mk(integer(), #{in => query, desc => <<"Page Index">>, nullable => true})},
|
{page, mk(integer(), #{in => query, desc => <<"Page Index">>, nullable => true})},
|
||||||
{limit, mk(integer(), #{in => query, desc => <<"Page Limit">>, nullable => true})}
|
{limit, mk(integer(), #{in => query, desc => <<"Page Limit">>, nullable => true})}
|
||||||
],
|
],
|
||||||
|
@ -432,12 +416,9 @@ schema("/authentication/:id/users/:user_id") ->
|
||||||
#{
|
#{
|
||||||
'operationId' => authenticator_user,
|
'operationId' => authenticator_user,
|
||||||
get => #{
|
get => #{
|
||||||
tags => [<<"authentication">>, <<"global">>],
|
tags => ?API_TAGS_GLOBAL,
|
||||||
description => <<"Get user from authenticator in global authentication chain">>,
|
description => <<"Get user from authenticator in global authentication chain">>,
|
||||||
parameters => [
|
parameters => [param_auth_id(), param_user_id()],
|
||||||
{id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})},
|
|
||||||
{user_id, mk(binary(), #{in => path, desc => <<"User ID">>})}
|
|
||||||
],
|
|
||||||
responses => #{
|
responses => #{
|
||||||
200 => emqx_dashboard_swagger:schema_with_examples(
|
200 => emqx_dashboard_swagger:schema_with_examples(
|
||||||
ref(response_user),
|
ref(response_user),
|
||||||
|
@ -446,12 +427,9 @@ schema("/authentication/:id/users/:user_id") ->
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
put => #{
|
put => #{
|
||||||
tags => [<<"authentication">>, <<"global">>],
|
tags => ?API_TAGS_GLOBAL,
|
||||||
description => <<"Update user in authenticator in global authentication chain">>,
|
description => <<"Update user in authenticator in global authentication chain">>,
|
||||||
parameters => [
|
parameters => [param_auth_id(), param_user_id()],
|
||||||
{id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})},
|
|
||||||
{user_id, mk(binary(), #{in => path, desc => <<"User ID">>})}
|
|
||||||
],
|
|
||||||
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
|
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
|
||||||
ref(request_user_update),
|
ref(request_user_update),
|
||||||
request_user_update_examples()),
|
request_user_update_examples()),
|
||||||
|
@ -464,12 +442,9 @@ schema("/authentication/:id/users/:user_id") ->
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
delete => #{
|
delete => #{
|
||||||
tags => [<<"authentication">>, <<"global">>],
|
tags => ?API_TAGS_GLOBAL,
|
||||||
description => <<"Update user in authenticator in global authentication chain">>,
|
description => <<"Update user in authenticator in global authentication chain">>,
|
||||||
parameters => [
|
parameters => [param_auth_id(), param_user_id()],
|
||||||
{id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})},
|
|
||||||
{user_id, mk(binary(), #{in => path, desc => <<"User ID">>})}
|
|
||||||
],
|
|
||||||
responses => #{
|
responses => #{
|
||||||
204 => <<"User deleted">>,
|
204 => <<"User deleted">>,
|
||||||
404 => error_codes([?NOT_FOUND], <<"Not Found">>)
|
404 => error_codes([?NOT_FOUND], <<"Not Found">>)
|
||||||
|
@ -481,13 +456,9 @@ schema("/listeners/:listener_id/authentication/:id/users/:user_id") ->
|
||||||
#{
|
#{
|
||||||
'operationId' => listener_authenticator_user,
|
'operationId' => listener_authenticator_user,
|
||||||
get => #{
|
get => #{
|
||||||
tags => [<<"authentication">>, <<"listener">>],
|
tags => ?API_TAGS_SINGLE,
|
||||||
description => <<"Get user from authenticator in listener authentication chain">>,
|
description => <<"Get user from authenticator in listener authentication chain">>,
|
||||||
parameters => [
|
parameters => [param_listener_id(), param_auth_id(), param_user_id()],
|
||||||
{listener_id, mk(binary(), #{in => path, desc => <<"Listener ID">>})},
|
|
||||||
{id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})},
|
|
||||||
{user_id, mk(binary(), #{in => path, desc => <<"User ID">>})}
|
|
||||||
],
|
|
||||||
responses => #{
|
responses => #{
|
||||||
200 => emqx_dashboard_swagger:schema_with_example(
|
200 => emqx_dashboard_swagger:schema_with_example(
|
||||||
ref(response_user),
|
ref(response_user),
|
||||||
|
@ -496,13 +467,9 @@ schema("/listeners/:listener_id/authentication/:id/users/:user_id") ->
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
put => #{
|
put => #{
|
||||||
tags => [<<"authentication">>, <<"listener">>],
|
tags => ?API_TAGS_SINGLE,
|
||||||
description => <<"Update user in authenticator in listener authentication chain">>,
|
description => <<"Update user in authenticator in listener authentication chain">>,
|
||||||
parameters => [
|
parameters => [param_listener_id(), param_auth_id(), param_user_id()],
|
||||||
{listener_id, mk(binary(), #{in => path, desc => <<"Listener ID">>})},
|
|
||||||
{id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})},
|
|
||||||
{user_id, mk(binary(), #{in => path, desc => <<"User ID">>})}
|
|
||||||
],
|
|
||||||
'requestBody' => emqx_dashboard_swagger:schema_with_example(
|
'requestBody' => emqx_dashboard_swagger:schema_with_example(
|
||||||
ref(request_user_update),
|
ref(request_user_update),
|
||||||
request_user_update_examples()),
|
request_user_update_examples()),
|
||||||
|
@ -516,13 +483,9 @@ schema("/listeners/:listener_id/authentication/:id/users/:user_id") ->
|
||||||
|
|
||||||
},
|
},
|
||||||
delete => #{
|
delete => #{
|
||||||
tags => [<<"authentication">>, <<"listener">>],
|
tags => ?API_TAGS_SINGLE,
|
||||||
description => <<"Update user in authenticator in listener authentication chain">>,
|
description => <<"Update user in authenticator in listener authentication chain">>,
|
||||||
parameters => [
|
parameters => [param_listener_id(), param_auth_id(), param_user_id()],
|
||||||
{listener_id, mk(binary(), #{in => path, desc => <<"Listener ID">>})},
|
|
||||||
{id, mk(binary(), #{in => path, desc => <<"Authenticator ID">>})},
|
|
||||||
{user_id, mk(binary(), #{in => path, desc => <<"User ID">>})}
|
|
||||||
],
|
|
||||||
responses => #{
|
responses => #{
|
||||||
204 => <<"User deleted">>,
|
204 => <<"User deleted">>,
|
||||||
404 => error_codes([?NOT_FOUND], <<"Not Found">>)
|
404 => error_codes([?NOT_FOUND], <<"Not Found">>)
|
||||||
|
@ -530,6 +493,34 @@ schema("/listeners/:listener_id/authentication/:id/users/:user_id") ->
|
||||||
}
|
}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
param_auth_id() ->
|
||||||
|
{
|
||||||
|
id,
|
||||||
|
mk(binary(), #{
|
||||||
|
in => path,
|
||||||
|
desc => <<"Authenticator ID">>
|
||||||
|
})
|
||||||
|
}.
|
||||||
|
|
||||||
|
param_listener_id() ->
|
||||||
|
{
|
||||||
|
listener_id,
|
||||||
|
mk(binary(), #{
|
||||||
|
in => path,
|
||||||
|
desc => <<"Listener ID">>,
|
||||||
|
example => emqx_listeners:id_example()
|
||||||
|
})
|
||||||
|
}.
|
||||||
|
|
||||||
|
param_user_id() ->
|
||||||
|
{
|
||||||
|
user_id,
|
||||||
|
mk(binary(), #{
|
||||||
|
in => path,
|
||||||
|
desc => <<"User ID">>
|
||||||
|
})
|
||||||
|
}.
|
||||||
|
|
||||||
authenticators(post, #{body := Config}) ->
|
authenticators(post, #{body := Config}) ->
|
||||||
create_authenticator([authentication], ?GLOBAL, Config);
|
create_authenticator([authentication], ?GLOBAL, Config);
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
|
|
||||||
-include_lib("stdlib/include/qlc.hrl").
|
-include_lib("stdlib/include/qlc.hrl").
|
||||||
|
|
||||||
|
-elvis([{elvis_style, dont_repeat_yourself, #{min_complexity => 100}}]).
|
||||||
|
|
||||||
-define(FRESH_SELECT, fresh_select).
|
-define(FRESH_SELECT, fresh_select).
|
||||||
|
|
||||||
-export([ paginate/3
|
-export([ paginate/3
|
||||||
|
@ -35,23 +37,14 @@
|
||||||
paginate(Tables, Params, {Module, FormatFun}) ->
|
paginate(Tables, Params, {Module, FormatFun}) ->
|
||||||
Qh = query_handle(Tables),
|
Qh = query_handle(Tables),
|
||||||
Count = count(Tables),
|
Count = count(Tables),
|
||||||
Page = b2i(page(Params)),
|
do_paginate(Qh, Count, Params, {Module, FormatFun}).
|
||||||
Limit = b2i(limit(Params)),
|
|
||||||
Cursor = qlc:cursor(Qh),
|
|
||||||
case Page > 1 of
|
|
||||||
true ->
|
|
||||||
_ = qlc:next_answers(Cursor, (Page - 1) * Limit),
|
|
||||||
ok;
|
|
||||||
false -> ok
|
|
||||||
end,
|
|
||||||
Rows = qlc:next_answers(Cursor, Limit),
|
|
||||||
qlc:delete_cursor(Cursor),
|
|
||||||
#{meta => #{page => Page, limit => Limit, count => Count},
|
|
||||||
data => [Module:FormatFun(Row) || Row <- Rows]}.
|
|
||||||
|
|
||||||
paginate(Tables, MatchSpec, Params, {Module, FormatFun}) ->
|
paginate(Tables, MatchSpec, Params, {Module, FormatFun}) ->
|
||||||
Qh = query_handle(Tables, MatchSpec),
|
Qh = query_handle(Tables, MatchSpec),
|
||||||
Count = count(Tables, MatchSpec),
|
Count = count(Tables, MatchSpec),
|
||||||
|
do_paginate(Qh, Count, Params, {Module, FormatFun}).
|
||||||
|
|
||||||
|
do_paginate(Qh, Count, Params, {Module, FormatFun}) ->
|
||||||
Page = b2i(page(Params)),
|
Page = b2i(page(Params)),
|
||||||
Limit = b2i(limit(Params)),
|
Limit = b2i(limit(Params)),
|
||||||
Cursor = qlc:cursor(Qh),
|
Cursor = qlc:cursor(Qh),
|
||||||
|
@ -64,7 +57,7 @@ paginate(Tables, MatchSpec, Params, {Module, FormatFun}) ->
|
||||||
Rows = qlc:next_answers(Cursor, Limit),
|
Rows = qlc:next_answers(Cursor, Limit),
|
||||||
qlc:delete_cursor(Cursor),
|
qlc:delete_cursor(Cursor),
|
||||||
#{meta => #{page => Page, limit => Limit, count => Count},
|
#{meta => #{page => Page, limit => Limit, count => Count},
|
||||||
data => [Module:FormatFun(Row) || Row <- Rows]}.
|
data => [erlang:apply(Module, FormatFun, [Row]) || Row <- Rows]}.
|
||||||
|
|
||||||
query_handle(Table) when is_atom(Table) ->
|
query_handle(Table) when is_atom(Table) ->
|
||||||
qlc:q([R || R <- ets:table(Table)]);
|
qlc:q([R || R <- ets:table(Table)]);
|
||||||
|
@ -95,9 +88,7 @@ count(Table, MatchSpec) when is_atom(Table) ->
|
||||||
NMatchSpec = [{MatchPattern, Where, [true]}],
|
NMatchSpec = [{MatchPattern, Where, [true]}],
|
||||||
ets:select_count(Table, NMatchSpec);
|
ets:select_count(Table, NMatchSpec);
|
||||||
count([Table], MatchSpec) when is_atom(Table) ->
|
count([Table], MatchSpec) when is_atom(Table) ->
|
||||||
[{MatchPattern, Where, _Re}] = MatchSpec,
|
count(Table, MatchSpec);
|
||||||
NMatchSpec = [{MatchPattern, Where, [true]}],
|
|
||||||
ets:select_count(Table, NMatchSpec);
|
|
||||||
count(Tables, MatchSpec) ->
|
count(Tables, MatchSpec) ->
|
||||||
lists:sum([count(T, MatchSpec) || T <- Tables]).
|
lists:sum([count(T, MatchSpec) || T <- Tables]).
|
||||||
|
|
||||||
|
@ -111,16 +102,23 @@ limit(Params) when is_map(Params) ->
|
||||||
limit(Params) ->
|
limit(Params) ->
|
||||||
proplists:get_value(<<"limit">>, Params, emqx_mgmt:max_row_limit()).
|
proplists:get_value(<<"limit">>, Params, emqx_mgmt:max_row_limit()).
|
||||||
|
|
||||||
|
init_meta(Params) ->
|
||||||
|
Limit = b2i(limit(Params)),
|
||||||
|
Page = b2i(page(Params)),
|
||||||
|
#{
|
||||||
|
page => Page,
|
||||||
|
limit => Limit,
|
||||||
|
count => 0
|
||||||
|
}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Node Query
|
%% Node Query
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
node_query(Node, Params, Tab, QsSchema, QueryFun) ->
|
node_query(Node, Params, Tab, QsSchema, QueryFun) ->
|
||||||
{_CodCnt, Qs} = params2qs(Params, QsSchema),
|
{_CodCnt, Qs} = params2qs(Params, QsSchema),
|
||||||
Limit = b2i(limit(Params)),
|
page_limit_check_query(init_meta(Params),
|
||||||
Page = b2i(page(Params)),
|
{fun do_node_query/5, [Node, Tab, Qs, QueryFun, init_meta(Params)]}).
|
||||||
Meta = #{page => Page, limit => Limit, count => 0},
|
|
||||||
page_limit_check_query(Meta, {fun do_node_query/5, [Node, Tab, Qs, QueryFun, Meta]}).
|
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
do_node_query(Node, Tab, Qs, QueryFun, Meta) ->
|
do_node_query(Node, Tab, Qs, QueryFun, Meta) ->
|
||||||
|
@ -129,34 +127,15 @@ do_node_query(Node, Tab, Qs, QueryFun, Meta) ->
|
||||||
do_node_query( Node, Tab, Qs, QueryFun, Continuation
|
do_node_query( Node, Tab, Qs, QueryFun, Continuation
|
||||||
, Meta = #{limit := Limit}
|
, Meta = #{limit := Limit}
|
||||||
, Results) ->
|
, Results) ->
|
||||||
{Len, Rows, NContinuation} = do_query(Node, Tab, Qs, QueryFun, Continuation, Limit),
|
case do_query(Node, Tab, Qs, QueryFun, Continuation, Limit) of
|
||||||
case judge_page_with_counting(Len, Meta) of
|
{error, {badrpc, R}} ->
|
||||||
{more, NMeta} ->
|
{error, Node, {badrpc, R}};
|
||||||
case NContinuation of
|
{Len, Rows, ?FRESH_SELECT} ->
|
||||||
?FRESH_SELECT ->
|
{NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta),
|
||||||
#{meta => NMeta, data => []}; %% page and limit too big
|
|
||||||
_ ->
|
|
||||||
do_node_query(Node, Tab, Qs, QueryFun, NContinuation, NMeta, [])
|
|
||||||
end;
|
|
||||||
{cutrows, NMeta} ->
|
|
||||||
{SubStart, NeedNowNum} = rows_sub_params(Len, NMeta),
|
|
||||||
ThisRows = lists:sublist(Rows, SubStart, NeedNowNum),
|
|
||||||
NResults = lists:sublist( lists:append(Results, ThisRows)
|
|
||||||
, SubStart, Limit),
|
|
||||||
case NContinuation of
|
|
||||||
?FRESH_SELECT ->
|
|
||||||
#{meta => NMeta, data => NResults};
|
#{meta => NMeta, data => NResults};
|
||||||
_ ->
|
{Len, Rows, NContinuation} ->
|
||||||
|
{NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta),
|
||||||
do_node_query(Node, Tab, Qs, QueryFun, NContinuation, NMeta, NResults)
|
do_node_query(Node, Tab, Qs, QueryFun, NContinuation, NMeta, NResults)
|
||||||
end;
|
|
||||||
{enough, NMeta} ->
|
|
||||||
NResults = lists:sublist(lists:append(Results, Rows), 1, Limit),
|
|
||||||
case NContinuation of
|
|
||||||
?FRESH_SELECT ->
|
|
||||||
#{meta => NMeta, data => NResults};
|
|
||||||
_ ->
|
|
||||||
do_node_query(Node, Tab, Qs, QueryFun, NContinuation, NMeta, NResults)
|
|
||||||
end
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -165,11 +144,9 @@ do_node_query( Node, Tab, Qs, QueryFun, Continuation
|
||||||
|
|
||||||
cluster_query(Params, Tab, QsSchema, QueryFun) ->
|
cluster_query(Params, Tab, QsSchema, QueryFun) ->
|
||||||
{_CodCnt, Qs} = params2qs(Params, QsSchema),
|
{_CodCnt, Qs} = params2qs(Params, QsSchema),
|
||||||
Limit = b2i(limit(Params)),
|
|
||||||
Page = b2i(page(Params)),
|
|
||||||
Nodes = mria_mnesia:running_nodes(),
|
Nodes = mria_mnesia:running_nodes(),
|
||||||
Meta = #{page => Page, limit => Limit, count => 0},
|
page_limit_check_query(init_meta(Params),
|
||||||
page_limit_check_query(Meta, {fun do_cluster_query/5, [Nodes, Tab, Qs, QueryFun, Meta]}).
|
{fun do_cluster_query/5, [Nodes, Tab, Qs, QueryFun, init_meta(Params)]}).
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
do_cluster_query(Nodes, Tab, Qs, QueryFun, Meta) ->
|
do_cluster_query(Nodes, Tab, Qs, QueryFun, Meta) ->
|
||||||
|
@ -177,37 +154,17 @@ do_cluster_query(Nodes, Tab, Qs, QueryFun, Meta) ->
|
||||||
|
|
||||||
do_cluster_query([], _Tab, _Qs, _QueryFun, _Continuation, Meta, Results) ->
|
do_cluster_query([], _Tab, _Qs, _QueryFun, _Continuation, Meta, Results) ->
|
||||||
#{meta => Meta, data => Results};
|
#{meta => Meta, data => Results};
|
||||||
do_cluster_query( [Node | Nodes], Tab, Qs, QueryFun, Continuation
|
do_cluster_query([Node | Tail] = Nodes, Tab, Qs, QueryFun, Continuation,
|
||||||
, Meta = #{limit := Limit}
|
Meta = #{limit := Limit}, Results) ->
|
||||||
, Results) ->
|
case do_query(Node, Tab, Qs, QueryFun, Continuation, Limit) of
|
||||||
{Len, Rows, NContinuation} = do_query(Node, Tab, Qs, QueryFun, Continuation, Limit),
|
{error, {badrpc, R}} ->
|
||||||
case judge_page_with_counting(Len, Meta) of
|
{error, Node, {bar_rpc, R}};
|
||||||
{more, NMeta} ->
|
{Len, Rows, ?FRESH_SELECT} ->
|
||||||
case NContinuation of
|
{NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta),
|
||||||
?FRESH_SELECT ->
|
do_cluster_query(Tail, Tab, Qs, QueryFun, ?FRESH_SELECT, NMeta, NResults);
|
||||||
do_cluster_query(Nodes, Tab, Qs, QueryFun, NContinuation, NMeta, []); %% next node with parts of results
|
{Len, Rows, NContinuation} ->
|
||||||
_ ->
|
{NMeta, NResults} = sub_query_result(Len, Rows, Limit, Results, Meta),
|
||||||
do_cluster_query([Node | Nodes], Tab, Qs, QueryFun, NContinuation, NMeta, []) %% continue this node
|
do_cluster_query(Nodes, Tab, Qs, QueryFun, NContinuation, NMeta, NResults)
|
||||||
end;
|
|
||||||
{cutrows, NMeta} ->
|
|
||||||
{SubStart, NeedNowNum} = rows_sub_params(Len, NMeta),
|
|
||||||
ThisRows = lists:sublist(Rows, SubStart, NeedNowNum),
|
|
||||||
NResults = lists:sublist( lists:append(Results, ThisRows)
|
|
||||||
, SubStart, Limit),
|
|
||||||
case NContinuation of
|
|
||||||
?FRESH_SELECT ->
|
|
||||||
do_cluster_query(Nodes, Tab, Qs, QueryFun, NContinuation, NMeta, NResults); %% next node with parts of results
|
|
||||||
_ ->
|
|
||||||
do_cluster_query([Node | Nodes], Tab, Qs, QueryFun, NContinuation, NMeta, NResults) %% continue this node
|
|
||||||
end;
|
|
||||||
{enough, NMeta} ->
|
|
||||||
NResults = lists:sublist(lists:append(Results, Rows), 1, Limit),
|
|
||||||
case NContinuation of
|
|
||||||
?FRESH_SELECT ->
|
|
||||||
do_cluster_query(Nodes, Tab, Qs, QueryFun, NContinuation, NMeta, NResults); %% next node with parts of results
|
|
||||||
_ ->
|
|
||||||
do_cluster_query([Node | Nodes], Tab, Qs, QueryFun, NContinuation, NMeta, NResults) %% continue this node
|
|
||||||
end
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -216,11 +173,26 @@ do_cluster_query( [Node | Nodes], Tab, Qs, QueryFun, Continuation
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
do_query(Node, Tab, Qs, {M,F}, Continuation, Limit) when Node =:= node() ->
|
do_query(Node, Tab, Qs, {M,F}, Continuation, Limit) when Node =:= node() ->
|
||||||
M:F(Tab, Qs, Continuation, Limit);
|
erlang:apply(M, F, [Tab, Qs, Continuation, Limit]);
|
||||||
do_query(Node, Tab, Qs, QueryFun, Continuation, Limit) ->
|
do_query(Node, Tab, Qs, QueryFun, Continuation, Limit) ->
|
||||||
rpc_call(Node, ?MODULE, do_query,
|
rpc_call(Node, ?MODULE, do_query,
|
||||||
[Node, Tab, Qs, QueryFun, Continuation, Limit], 50000).
|
[Node, Tab, Qs, QueryFun, Continuation, Limit], 50000).
|
||||||
|
|
||||||
|
sub_query_result(Len, Rows, Limit, Results, Meta) ->
|
||||||
|
{Flag, NMeta} = judge_page_with_counting(Len, Meta),
|
||||||
|
NResults =
|
||||||
|
case Flag of
|
||||||
|
more ->
|
||||||
|
[];
|
||||||
|
cutrows ->
|
||||||
|
{SubStart, NeedNowNum} = rows_sub_params(Len, NMeta),
|
||||||
|
ThisRows = lists:sublist(Rows, SubStart, NeedNowNum),
|
||||||
|
lists:sublist(lists:append(Results, ThisRows), SubStart, Limit);
|
||||||
|
enough ->
|
||||||
|
lists:sublist(lists:append(Results, Rows), 1, Limit)
|
||||||
|
end,
|
||||||
|
{NMeta, NResults}.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
rpc_call(Node, M, F, A, T) ->
|
rpc_call(Node, M, F, A, T) ->
|
||||||
case rpc:call(Node, M, F, A, T) of
|
case rpc:call(Node, M, F, A, T) of
|
||||||
|
@ -294,16 +266,20 @@ pick_params_to_qs([{Key, Value} | Params], QsSchema, Acc1, Acc2) ->
|
||||||
end,
|
end,
|
||||||
case lists:keytake(OpposeKey, 1, Params) of
|
case lists:keytake(OpposeKey, 1, Params) of
|
||||||
false ->
|
false ->
|
||||||
pick_params_to_qs(Params, QsSchema, [qs(Key, Value, Type) | Acc1], Acc2);
|
pick_params_to_qs(Params, QsSchema,
|
||||||
|
[qs(Key, Value, Type) | Acc1], Acc2);
|
||||||
{value, {K2, V2}, NParams} ->
|
{value, {K2, V2}, NParams} ->
|
||||||
pick_params_to_qs(NParams, QsSchema, [qs(Key, Value, K2, V2, Type) | Acc1], Acc2)
|
pick_params_to_qs(NParams, QsSchema,
|
||||||
|
[qs(Key, Value, K2, V2, Type) | Acc1], Acc2)
|
||||||
end;
|
end;
|
||||||
_ ->
|
_ ->
|
||||||
case is_fuzzy_key(Key) of
|
case is_fuzzy_key(Key) of
|
||||||
true ->
|
true ->
|
||||||
pick_params_to_qs(Params, QsSchema, Acc1, [qs(Key, Value, Type) | Acc2]);
|
pick_params_to_qs(Params, QsSchema, Acc1,
|
||||||
|
[qs(Key, Value, Type) | Acc2]);
|
||||||
_ ->
|
_ ->
|
||||||
pick_params_to_qs(Params, QsSchema, [qs(Key, Value, Type) | Acc1], Acc2)
|
pick_params_to_qs(Params, QsSchema,
|
||||||
|
[qs(Key, Value, Type) | Acc1], Acc2)
|
||||||
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -340,10 +316,9 @@ is_fuzzy_key(<<"match_", _/binary>>) ->
|
||||||
is_fuzzy_key(_) ->
|
is_fuzzy_key(_) ->
|
||||||
false.
|
false.
|
||||||
|
|
||||||
page_start(Page, Limit) ->
|
page_start(1, _) -> 1;
|
||||||
if Page > 1 -> (Page-1) * Limit + 1;
|
page_start(Page, Limit) -> (Page-1) * Limit + 1.
|
||||||
true -> 1
|
|
||||||
end.
|
|
||||||
|
|
||||||
judge_page_with_counting(Len, Meta = #{page := Page, limit := Limit, count := Count}) ->
|
judge_page_with_counting(Len, Meta = #{page := Page, limit := Limit, count := Count}) ->
|
||||||
PageStart = page_start(Page, Limit),
|
PageStart = page_start(Page, Limit),
|
||||||
|
@ -359,11 +334,12 @@ judge_page_with_counting(Len, Meta = #{page := Page, limit := Limit, count := Co
|
||||||
|
|
||||||
rows_sub_params(Len, _Meta = #{page := Page, limit := Limit, count := Count}) ->
|
rows_sub_params(Len, _Meta = #{page := Page, limit := Limit, count := Count}) ->
|
||||||
PageStart = page_start(Page, Limit),
|
PageStart = page_start(Page, Limit),
|
||||||
if Count - Len < PageStart ->
|
case (Count - Len) < PageStart of
|
||||||
|
true ->
|
||||||
NeedNowNum = Count - PageStart + 1,
|
NeedNowNum = Count - PageStart + 1,
|
||||||
SubStart = Len - NeedNowNum + 1,
|
SubStart = Len - NeedNowNum + 1,
|
||||||
{SubStart, NeedNowNum};
|
{SubStart, NeedNowNum};
|
||||||
true ->
|
false ->
|
||||||
{_SubStart = 1, _NeedNowNum = Len}
|
{_SubStart = 1, _NeedNowNum = Len}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,11 @@
|
||||||
|
|
||||||
-behaviour(minirest_api).
|
-behaviour(minirest_api).
|
||||||
|
|
||||||
-export([api_spec/0, paths/0, schema/1, fields/1]).
|
-export([ api_spec/0
|
||||||
|
, paths/0
|
||||||
|
, schema/1
|
||||||
|
, fields/1]).
|
||||||
|
|
||||||
-export([format/1]).
|
-export([format/1]).
|
||||||
|
|
||||||
-export([ banned/2
|
-export([ banned/2
|
||||||
|
@ -62,7 +66,9 @@ schema("/banned") ->
|
||||||
description => <<"Create banned">>,
|
description => <<"Create banned">>,
|
||||||
'requestBody' => hoconsc:mk(hoconsc:ref(ban)),
|
'requestBody' => hoconsc:mk(hoconsc:ref(ban)),
|
||||||
responses => #{
|
responses => #{
|
||||||
200 => <<"Create success">>
|
200 => [{data, hoconsc:mk(hoconsc:array(hoconsc:ref(ban)), #{})}],
|
||||||
|
400 => emqx_dashboard_swagger:error_codes(['ALREADY_EXISTED'],
|
||||||
|
<<"Banned already existed">>)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -135,8 +141,12 @@ banned(get, #{query_string := Params}) ->
|
||||||
Response = emqx_mgmt_api:paginate(?TAB, Params, ?FORMAT_FUN),
|
Response = emqx_mgmt_api:paginate(?TAB, Params, ?FORMAT_FUN),
|
||||||
{200, Response};
|
{200, Response};
|
||||||
banned(post, #{body := Body}) ->
|
banned(post, #{body := Body}) ->
|
||||||
_ = emqx_banned:create(emqx_banned:parse(Body)),
|
case emqx_banned:create(emqx_banned:parse(Body)) of
|
||||||
{200}.
|
{ok, Banned} ->
|
||||||
|
{200, format(Banned)};
|
||||||
|
{error, {already_exist, Old}} ->
|
||||||
|
{400, #{code => 'ALREADY_EXISTED', message => format(Old)}}
|
||||||
|
end.
|
||||||
|
|
||||||
delete_banned(delete, #{bindings := Params}) ->
|
delete_banned(delete, #{bindings := Params}) ->
|
||||||
case emqx_banned:look_up(Params) of
|
case emqx_banned:look_up(Params) of
|
||||||
|
|
|
@ -155,8 +155,20 @@ config_reset(post, _Params, Req) ->
|
||||||
|
|
||||||
configs(get, Params, _Req) ->
|
configs(get, Params, _Req) ->
|
||||||
Node = maps:get(node, Params, node()),
|
Node = maps:get(node, Params, node()),
|
||||||
Res = rpc:call(Node, ?MODULE, get_full_config, []),
|
case
|
||||||
{200, Res}.
|
lists:member(Node, mria_mnesia:running_nodes())
|
||||||
|
andalso
|
||||||
|
rpc:call(Node, ?MODULE, get_full_config, [])
|
||||||
|
of
|
||||||
|
false ->
|
||||||
|
Message = list_to_binary(io_lib:format("Bad node ~p, reason not found", [Node])),
|
||||||
|
{500, #{code => 'BAD_NODE', message => Message}};
|
||||||
|
{error, {badrpc, R}} ->
|
||||||
|
Message = list_to_binary(io_lib:format("Bad node ~p, reason ~p", [Node, R])),
|
||||||
|
{500, #{code => 'BAD_NODE', message => Message}};
|
||||||
|
Res ->
|
||||||
|
{200, Res}
|
||||||
|
end.
|
||||||
|
|
||||||
conf_path_reset(Req) ->
|
conf_path_reset(Req) ->
|
||||||
<<"/api/v5", ?PREFIX_RESET, Path/binary>> = cowboy_req:path(Req),
|
<<"/api/v5", ?PREFIX_RESET, Path/binary>> = cowboy_req:path(Req),
|
||||||
|
|
|
@ -210,7 +210,7 @@ param_path_id() ->
|
||||||
#{
|
#{
|
||||||
name => id,
|
name => id,
|
||||||
in => path,
|
in => path,
|
||||||
schema => #{type => string},
|
schema => #{type => string, example => emqx_listeners:id_example()},
|
||||||
required => true
|
required => true
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
|
|
@ -126,7 +126,8 @@ array_schema(Schema, Desc) ->
|
||||||
object_array_schema(Properties) when is_map(Properties) ->
|
object_array_schema(Properties) when is_map(Properties) ->
|
||||||
json_content_schema(#{type => array, items => #{type => object, properties => Properties}}).
|
json_content_schema(#{type => array, items => #{type => object, properties => Properties}}).
|
||||||
object_array_schema(Properties, Desc) ->
|
object_array_schema(Properties, Desc) ->
|
||||||
json_content_schema(#{type => array, items => #{type => object, properties => Properties}}, Desc).
|
json_content_schema(#{type => array,
|
||||||
|
items => #{type => object, properties => Properties}}, Desc).
|
||||||
|
|
||||||
page_schema(Ref) when is_atom(Ref) ->
|
page_schema(Ref) when is_atom(Ref) ->
|
||||||
page_schema(minirest:ref(atom_to_binary(Ref, utf8)));
|
page_schema(minirest:ref(atom_to_binary(Ref, utf8)));
|
||||||
|
@ -201,7 +202,10 @@ batch_operation(Module, Function, ArgsList) ->
|
||||||
Failed = batch_operation(Module, Function, ArgsList, []),
|
Failed = batch_operation(Module, Function, ArgsList, []),
|
||||||
Len = erlang:length(Failed),
|
Len = erlang:length(Failed),
|
||||||
Success = erlang:length(ArgsList) - Len,
|
Success = erlang:length(ArgsList) - Len,
|
||||||
Fun = fun({Args, Reason}, Detail) -> [#{data => Args, reason => io_lib:format("~p", [Reason])} | Detail] end,
|
Fun =
|
||||||
|
fun({Args, Reason}, Detail) ->
|
||||||
|
[#{data => Args, reason => io_lib:format("~p", [Reason])} | Detail]
|
||||||
|
end,
|
||||||
#{success => Success, failed => Len, detail => lists:foldl(Fun, [], Failed)}.
|
#{success => Success, failed => Len, detail => lists:foldl(Fun, [], Failed)}.
|
||||||
|
|
||||||
batch_operation(_Module, _Function, [], Failed) ->
|
batch_operation(_Module, _Function, [], Failed) ->
|
||||||
|
@ -266,6 +270,9 @@ generate_response(QueryResult) ->
|
||||||
case QueryResult of
|
case QueryResult of
|
||||||
{error, page_limit_invalid} ->
|
{error, page_limit_invalid} ->
|
||||||
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
|
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
|
||||||
|
{error, Node, {badrpc, R}} ->
|
||||||
|
Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])),
|
||||||
|
{500, #{code => <<"NODE_DOWN">>, message => Message}};
|
||||||
Response ->
|
Response ->
|
||||||
{200, Response}
|
{200, Response}
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -63,10 +63,12 @@ status_api() ->
|
||||||
responses => #{<<"200">> => object_schema(Props)}
|
responses => #{<<"200">> => object_schema(Props)}
|
||||||
},
|
},
|
||||||
put => #{
|
put => #{
|
||||||
description => "Enable or disbale telemetry",
|
description => "Enable or disable telemetry",
|
||||||
'requestBody' => object_schema(Props),
|
'requestBody' => object_schema(Props),
|
||||||
responses => #{
|
responses => #{
|
||||||
<<"200">> => schema(<<"Enable or disbale telemetry successfully">>),
|
<<"200">> =>
|
||||||
|
object_schema(properties([{enable, boolean, <<"">>}]),
|
||||||
|
<<"Enable or disable telemetry successfully">>),
|
||||||
<<"400">> => bad_request()
|
<<"400">> => bad_request()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -77,10 +79,7 @@ data_api() ->
|
||||||
Metadata = #{
|
Metadata = #{
|
||||||
get => #{
|
get => #{
|
||||||
responses => #{
|
responses => #{
|
||||||
<<"200">> => object_schema(properties(), <<"Get telemetry data">>)
|
<<"200">> => object_schema(properties(), <<"Get telemetry data">>)}}},
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
{"/telemetry/data", Metadata, data}.
|
{"/telemetry/data", Metadata, data}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -97,10 +96,10 @@ status(put, #{body := Body}) ->
|
||||||
true -> <<"Telemetry status is already enabled">>;
|
true -> <<"Telemetry status is already enabled">>;
|
||||||
false -> <<"Telemetry status is already disable">>
|
false -> <<"Telemetry status is already disable">>
|
||||||
end,
|
end,
|
||||||
{400, #{code => "BAD_REQUEST", message => Reason}};
|
{400, #{code => 'BAD_REQUEST', message => Reason}};
|
||||||
false ->
|
false ->
|
||||||
enable_telemetry(Enable),
|
enable_telemetry(Enable),
|
||||||
{200}
|
{200, #{<<"enable">> => emqx_telemetry:get_status()}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
data(get, _Request) ->
|
data(get, _Request) ->
|
||||||
|
|
Loading…
Reference in New Issue