refactor(minirest-callback): refactor minirest callback function

This commit is contained in:
Turtle 2021-08-24 15:25:02 +08:00 committed by turtleDeng
parent 0111306224
commit e0c05242a7
27 changed files with 198 additions and 298 deletions

View File

@ -21,13 +21,13 @@
-include("emqx_authn.hrl").
-export([ api_spec/0
, authentication/2
, authenticators/2
, authenticators2/2
, move/2
, import_users/2
, users/2
, users2/2
, authentication/3
, authenticators/3
, authenticators2/3
, move/3
, import_users/3
, users/3
, users2/3
]).
-define(EXAMPLE_1, #{name => <<"example 1">>,
@ -35,7 +35,7 @@
server_type => <<"built-in-database">>,
user_id_type => <<"username">>,
password_hash_algorithm => #{
name => <<"sha256">>
name => <<"sha256">>
}}).
-define(EXAMPLE_2, #{name => <<"example 2">>,
@ -332,7 +332,7 @@ authenticators_api2() ->
oneOf => [ minirest:ref(<<"password_based">>)
, minirest:ref(<<"jwt">>)
, minirest:ref(<<"scram">>)
]
]
},
examples => #{
example1 => #{
@ -633,7 +633,7 @@ users2_api() ->
type => string
},
superuser => #{
type => boolean
type => boolean
}
}
}
@ -746,7 +746,7 @@ definitions() ->
oneOf => [ minirest:ref(<<"password_based">>)
, minirest:ref(<<"jwt">>)
, minirest:ref(<<"scram">>)
]
]
},
ReturnedAuthenticatorDef = #{
@ -763,7 +763,7 @@ definitions() ->
oneOf => [ minirest:ref(<<"password_based">>)
, minirest:ref(<<"jwt">>)
, minirest:ref(<<"scram">>)
]
]
}
]
},
@ -792,7 +792,7 @@ definitions() ->
, minirest:ref(<<"password_based_mongodb">>)
, minirest:ref(<<"password_based_redis">>)
, minirest:ref(<<"password_based_http_server">>)
]
]
}
]
},
@ -840,7 +840,7 @@ definitions() ->
ssl => minirest:ref(<<"ssl">>)
}
},
SCRAMDef = #{
type => object,
required => [name, mechanism, server_type],
@ -1205,7 +1205,7 @@ definitions() ->
type => boolean,
default => true
}
}
}
},
PasswordHashAlgorithmDef = #{
@ -1229,7 +1229,7 @@ definitions() ->
properties => #{
enable => #{
type => boolean,
default => false
default => false
},
certfile => #{
type => string
@ -1289,7 +1289,7 @@ definitions() ->
, #{<<"error">> => ErrorDef}
].
authentication(post, Request) ->
authentication(post, _Params, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
case emqx_json:decode(Body, [return_maps]) of
#{<<"enable">> := Enable} ->
@ -1298,11 +1298,11 @@ authentication(post, Request) ->
_ ->
serialize_error({missing_parameter, enable})
end;
authentication(get, _Request) ->
authentication(get, _Params, _Request) ->
Enabled = emqx_authn:is_enabled(),
{200, #{enabled => Enabled}}.
authenticators(post, Request) ->
authenticators(post, _Params, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
Config = emqx_json:decode(Body, [return_maps]),
case emqx_authn:update_config([authentication, authenticators], {create_authenticator, Config}) of
@ -1313,7 +1313,7 @@ authenticators(post, Request) ->
{error, {_, _, Reason}} ->
serialize_error(Reason)
end;
authenticators(get, _Request) ->
authenticators(get, _Params, _Request) ->
RawConfig = get_raw_config([authentication, authenticators]),
{ok, Authenticators} = emqx_authn:list_authenticators(?CHAIN),
NAuthenticators = lists:zipwith(fun(#{<<"name">> := Name} = Config, #{id := ID, name := Name}) ->
@ -1321,7 +1321,7 @@ authenticators(get, _Request) ->
end, RawConfig, Authenticators),
{200, NAuthenticators}.
authenticators2(get, Request) ->
authenticators2(get, _Params, Request) ->
AuthenticatorID = cowboy_req:binding(id, Request),
case emqx_authn:lookup_authenticator(?CHAIN, AuthenticatorID) of
{ok, #{id := ID, name := Name}} ->
@ -1331,7 +1331,7 @@ authenticators2(get, Request) ->
{error, Reason} ->
serialize_error(Reason)
end;
authenticators2(put, Request) ->
authenticators2(put, _Params, Request) ->
AuthenticatorID = cowboy_req:binding(id, Request),
{ok, Body, _} = cowboy_req:read_body(Request),
Config = emqx_json:decode(Body, [return_maps]),
@ -1344,7 +1344,7 @@ authenticators2(put, Request) ->
{error, {_, _, Reason}} ->
serialize_error(Reason)
end;
authenticators2(delete, Request) ->
authenticators2(delete, _Params, Request) ->
AuthenticatorID = cowboy_req:binding(id, Request),
case emqx_authn:update_config([authentication, authenticators], {delete_authenticator, AuthenticatorID}) of
{ok, _} ->
@ -1353,7 +1353,7 @@ authenticators2(delete, Request) ->
serialize_error(Reason)
end.
move(post, Request) ->
move(post, _Params, Request) ->
AuthenticatorID = cowboy_req:binding(id, Request),
{ok, Body, _} = cowboy_req:read_body(Request),
case emqx_json:decode(Body, [return_maps]) of
@ -1366,7 +1366,7 @@ move(post, Request) ->
serialize_error({missing_parameter, position})
end.
import_users(post, Request) ->
import_users(post, _Params, Request) ->
AuthenticatorID = cowboy_req:binding(id, Request),
{ok, Body, _} = cowboy_req:read_body(Request),
case emqx_json:decode(Body, [return_maps]) of
@ -1379,7 +1379,7 @@ import_users(post, Request) ->
serialize_error({missing_parameter, filename})
end.
users(post, Request) ->
users(post, _Params, Request) ->
AuthenticatorID = cowboy_req:binding(id, Request),
{ok, Body, _} = cowboy_req:read_body(Request),
case emqx_json:decode(Body, [return_maps]) of
@ -1399,7 +1399,7 @@ users(post, Request) ->
_ ->
serialize_error({missing_parameter, user_id})
end;
users(get, Request) ->
users(get, _Params, Request) ->
AuthenticatorID = cowboy_req:binding(id, Request),
case emqx_authn:list_users(?CHAIN, AuthenticatorID) of
{ok, Users} ->
@ -1408,7 +1408,7 @@ users(get, Request) ->
serialize_error(Reason)
end.
users2(patch, Request) ->
users2(patch, _Params, Request) ->
AuthenticatorID = cowboy_req:binding(id, Request),
UserID = cowboy_req:binding(user_id, Request),
{ok, Body, _} = cowboy_req:read_body(Request),
@ -1425,7 +1425,7 @@ users2(patch, Request) ->
serialize_error(Reason)
end
end;
users2(get, Request) ->
users2(get, _Params, Request) ->
AuthenticatorID = cowboy_req:binding(id, Request),
UserID = cowboy_req:binding(user_id, Request),
case emqx_authn:lookup_user(?CHAIN, AuthenticatorID, UserID) of
@ -1434,7 +1434,7 @@ users2(get, Request) ->
{error, Reason} ->
serialize_error(Reason)
end;
users2(delete, Request) ->
users2(delete, _Params, Request) ->
AuthenticatorID = cowboy_req:binding(id, Request),
UserID = cowboy_req:binding(user_id, Request),
case emqx_authn:delete_user(?CHAIN, AuthenticatorID, UserID) of
@ -1467,4 +1467,4 @@ serialize_error({invalid_parameter, Name}) ->
)}};
serialize_error(Reason) ->
{400, #{code => <<"BAD_REQUEST">>,
message => list_to_binary(io_lib:format("Todo: ~p", [Reason]))}}.
message => list_to_binary(io_lib:format("Todo: ~p", [Reason]))}}.

View File

@ -40,9 +40,9 @@
topics => [<<"#">>]}).
-export([ api_spec/0
, rules/2
, rule/2
, move_rule/2
, rules/3
, rule/3
, move_rule/3
]).
api_spec() ->
@ -418,7 +418,7 @@ move_rule_api() ->
},
{"/authorization/:id/move", Metadata, move_rule}.
rules(get, Request) ->
rules(get, _Params, Request) ->
Rules = lists:foldl(fun (#{type := _Type, enable := true, annotations := #{id := Id} = Annotations} = Rule, AccIn) ->
NRule = case emqx_resource:health_check(Id) of
ok ->
@ -445,7 +445,7 @@ rules(get, Request) ->
end;
false -> {200, #{rules => Rules}}
end;
rules(post, Request) ->
rules(post, _Params, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
RawConfig = jsx:decode(Body, [return_maps]),
case emqx_authz:update(head, [RawConfig]) of
@ -454,7 +454,7 @@ rules(post, Request) ->
{400, #{code => <<"BAD_REQUEST">>,
messgae => atom_to_binary(Reason)}}
end;
rules(put, Request) ->
rules(put, _Params, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
RawConfig = jsx:decode(Body, [return_maps]),
case emqx_authz:update(replace, RawConfig) of
@ -464,7 +464,7 @@ rules(put, Request) ->
messgae => atom_to_binary(Reason)}}
end.
rule(get, Request) ->
rule(get, _Params, Request) ->
Id = cowboy_req:binding(id, Request),
case emqx_authz:lookup(Id) of
{error, Reason} -> {404, #{messgae => atom_to_binary(Reason)}};
@ -481,7 +481,7 @@ rule(get, Request) ->
end
end;
rule(put, Request) ->
rule(put, _Params, Request) ->
RuleId = cowboy_req:binding(id, Request),
{ok, Body, _} = cowboy_req:read_body(Request),
RawConfig = jsx:decode(Body, [return_maps]),
@ -494,7 +494,7 @@ rule(put, Request) ->
{400, #{code => <<"BAD_REQUEST">>,
messgae => atom_to_binary(Reason)}}
end;
rule(delete, Request) ->
rule(delete, _Params, Request) ->
RuleId = cowboy_req:binding(id, Request),
case emqx_authz:update({replace_once, RuleId}, #{}) of
{ok, _} -> {204};
@ -502,7 +502,7 @@ rule(delete, Request) ->
{400, #{code => <<"BAD_REQUEST">>,
messgae => atom_to_binary(Reason)}}
end.
move_rule(post, Request) ->
move_rule(post, _Params, Request) ->
RuleId = cowboy_req:binding(id, Request),
{ok, Body, _} = cowboy_req:read_body(Request),
#{<<"position">> := Position} = jsx:decode(Body, [return_maps]),

View File

@ -31,9 +31,7 @@ definitions() ->
},
principal => minirest:ref(<<"principal">>)
}
}
}
}
, minirest:ref(<<"rules">>)

View File

@ -158,9 +158,7 @@ change_pwd_api() ->
},
{"/users/:username/change_pwd", Metadata, change_pwd}.
login(post, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
Params = emqx_json:decode(Body, [return_maps]),
login(post, #{body := Params}) ->
Username = maps:get(<<"username">>, Params),
Password = maps:get(<<"password">>, Params),
case emqx_dashboard_admin:sign_token(Username, Password) of
@ -171,9 +169,7 @@ login(post, Request) ->
{401, #{code => Code, message => <<"Auth filed">>}}
end.
logout(_, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
Params = emqx_json:decode(Body, [return_maps]),
logout(_, #{body := Params}) ->
Username = maps:get(<<"username">>, Params),
emqx_dashboard_admin:destroy_token_by_username(Username),
{200}.
@ -181,9 +177,7 @@ logout(_, Request) ->
users(get, _Request) ->
{200, [row(User) || User <- emqx_dashboard_admin:all_users()]};
users(post, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
Params = emqx_json:decode(Body, [return_maps]),
users(post, #{body := Params}) ->
Tag = maps:get(<<"tag">>, Params),
Username = maps:get(<<"username">>, Params),
Password = maps:get(<<"password">>, Params),
@ -199,10 +193,7 @@ users(post, Request) ->
end
end.
user(put, Request) ->
Username = cowboy_req:binding(username, Request),
{ok, Body, _} = cowboy_req:read_body(Request),
Params = emqx_json:decode(Body, [return_maps]),
user(put, #{bindings := #{username := Username}, body := Params}) ->
Tag = maps:get(<<"tag">>, Params),
case emqx_dashboard_admin:update_user(Username, Tag) of
ok -> {200};
@ -210,8 +201,7 @@ user(put, Request) ->
{400, #{code => <<"UPDATE_FAIL">>, message => Reason}}
end;
user(delete, Request) ->
Username = cowboy_req:binding(username, Request),
user(delete, #{bindings := #{username := Username}}) ->
case Username == <<"admin">> of
true -> {400, #{code => <<"CONNOT_DELETE_ADMIN">>,
message => <<"Cannot delete admin">>}};
@ -220,10 +210,7 @@ user(delete, Request) ->
{200}
end.
change_pwd(put, Request) ->
Username = cowboy_req:binding(username, Request),
{ok, Body, _} = cowboy_req:read_body(Request),
Params = emqx_json:decode(Body, [return_maps]),
change_pwd(put, #{bindings := #{username := Username}, body := Params}) ->
OldPwd = maps:get(<<"old_pwd">>, Params),
NewPwd = maps:get(<<"new_pwd">>, Params),
case emqx_dashboard_admin:change_password(Username, OldPwd, NewPwd) of

View File

@ -145,24 +145,20 @@ counter_schema() ->
type => integer}}}}.
%%%==============================================================================================
%% parameters trans
monitor(get, Request) ->
Aggregate = proplists:get_value(<<"aggregate">>, cowboy_req:parse_qs(Request), <<"false">>),
monitor(get, #{query_string := Qs}) ->
Aggregate = maps:get(<<"aggregate">>, Qs, <<"false">>),
{200, list_collect(Aggregate)}.
monitor_nodes(get, Request) ->
Node = cowboy_req:binding(node, Request),
monitor_nodes(get, #{bindings := #{node := Node}}) ->
lookup([{<<"node">>, Node}]).
monitor_nodes_counters(get, Request) ->
Node = cowboy_req:binding(node, Request),
Counter = cowboy_req:binding(counter, Request),
monitor_nodes_counters(get, #{bindings := #{node := Node, counter := Counter}}) ->
lookup([{<<"node">>, Node}, {<<"counter">>, Counter}]).
counters(get, Request) ->
Counter = cowboy_req:binding(counter, Request),
counters(get, #{bindings := #{counter := Counter}}) ->
lookup([{<<"counter">>, Counter}]).
current_counters(get, _) ->
current_counters(get, _Params) ->
Data = [get_collect(Node) || Node <- ekka_mnesia:running_nodes()],
Nodes = length(ekka_mnesia:running_nodes()),
{Received, Sent, Sub, Conn} = format_current_metrics(Data),

View File

@ -33,8 +33,8 @@
paginate(Tables, Params, RowFun) ->
Qh = query_handle(Tables),
Count = count(Tables),
Page = page(Params),
Limit = limit(Params),
Page = b2i(page(Params)),
Limit = b2i(limit(Params)),
Cursor = qlc:cursor(Qh),
case Page > 1 of
true ->
@ -64,14 +64,15 @@ count(Tables) ->
count(Table, Nodes) ->
lists:sum([rpc_call(Node, ets, info, [Table, size], 5000) || Node <- Nodes]).
page(Params) when is_map(Params) ->
maps:get(<<"page">>, Params, 1);
page(Params) ->
binary_to_integer(proplists:get_value(<<"page">>, Params, <<"1">>)).
proplists:get_value(<<"page">>, Params, <<"1">>).
limit(Params) when is_map(Params) ->
maps:get(<<"limit">>, Params, emqx_mgmt:max_row_limit());
limit(Params) ->
case proplists:get_value(<<"limit">>, Params) of
undefined -> emqx_mgmt:max_row_limit();
Size -> binary_to_integer(Size)
end.
proplists:get_value(<<"limit">>, Params, emqx_mgmt:max_row_limit()).
%%--------------------------------------------------------------------
%% Node Query
@ -79,8 +80,8 @@ limit(Params) ->
node_query(Node, Params, {Tab, QsSchema}, QueryFun) ->
{CodCnt, Qs} = params2qs(Params, QsSchema),
Limit = limit(Params),
Page = page(Params),
Limit = b2i(limit(Params)),
Page = b2i(page(Params)),
Start = if Page > 1 -> (Page-1) * Limit;
true -> 0
end,
@ -111,8 +112,8 @@ rpc_call(Node, M, F, A, T) ->
cluster_query(Params, {Tab, QsSchema}, QueryFun) ->
{CodCnt, Qs} = params2qs(Params, QsSchema),
Limit = limit(Params),
Page = page(Params),
Limit = b2i(limit(Params)),
Page = b2i(page(Params)),
Start = if Page > 1 -> (Page-1) * Limit;
true -> 0
end,
@ -199,6 +200,8 @@ select_n_by_one({Rows0, Cons}, Start, Limit, Acc) ->
select_n_by_one(ets:select(Cons), 0, NLimit, [Got|Acc])
end.
params2qs(Params, QsSchema) when is_map(Params) ->
params2qs(maps:to_list(Params), QsSchema);
params2qs(Params, QsSchema) ->
{Qs, Fuzzy} = pick_params_to_qs(Params, QsSchema, [], []),
{length(Qs) + length(Fuzzy), {Qs, Fuzzy}}.
@ -342,3 +345,9 @@ params2qs_test() ->
{0, {[], []}} = params2qs([{not_a_predefined_params, val}], Schema).
-endif.
b2i(Bin) when is_binary(Bin) ->
binary_to_integer(Bin);
b2i(Any) ->
Any.

View File

@ -68,28 +68,18 @@ alarms_api() ->
%%%==============================================================================================
%% parameters trans
alarms(get, Request) ->
Params = cowboy_req:parse_qs(Request),
list(Params);
alarms(delete, _Request) ->
delete().
%%%==============================================================================================
%% api apply
list(Params) ->
alarms(get, #{query_string := Qs}) ->
{Table, Function} =
case proplists:get_value(<<"activated">>, Params, <<"true">>) of
case maps:get(<<"activated">>, Qs, <<"true">>) of
<<"true">> ->
{?ACTIVATED_ALARM, query_activated};
<<"false">> ->
{?DEACTIVATED_ALARM, query_deactivated}
end,
Params1 = proplists:delete(<<"activated">>, Params),
Response = emqx_mgmt_api:cluster_query(Params1, {Table, []}, {?MODULE, Function}),
{200, Response}.
Response = emqx_mgmt_api:cluster_query(Qs, {Table, []}, {?MODULE, Function}),
{200, Response};
delete() ->
alarms(delete, _Params) ->
_ = emqx_mgmt:delete_all_deactivated_alarms(),
{200}.

View File

@ -120,12 +120,10 @@ app_api() ->
%%%==============================================================================================
%% parameters trans
apps(get, _Request) ->
apps(get, _Params) ->
list(#{});
apps(post, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
Data = emqx_json:decode(Body, [return_maps]),
apps(post, #{body := Data}) ->
Parameters = #{
app_id => maps:get(<<"app_id">>, Data),
name => maps:get(<<"name">>, Data),
@ -136,18 +134,13 @@ apps(post, Request) ->
},
create(Parameters).
app(get, Request) ->
AppID = cowboy_req:binding(app_id, Request),
app(get, #{bindings := #{app_id := AppID}}) ->
lookup(#{app_id => AppID});
app(delete, Request) ->
AppID = cowboy_req:binding(app_id, Request),
app(delete, #{bindings := #{app_id := AppID}}) ->
delete(#{app_id => AppID});
app(put, Request) ->
AppID = cowboy_req:binding(app_id, Request),
{ok, Body, _} = cowboy_req:read_body(Request),
Data = emqx_json:decode(Body, [return_maps]),
app(put, #{bindings := #{app_id := AppID}, body := Data}) ->
Parameters = #{
app_id => AppID,
name => maps:get(<<"name">>, Data),

View File

@ -454,46 +454,32 @@ subscribe_api() ->
%%%==============================================================================================
%% parameters trans
clients(get, Request) ->
Params = cowboy_req:parse_qs(Request),
list(Params).
clients(get, #{query_string := Qs}) ->
list(Qs).
client(get, Request) ->
ClientID = cowboy_req:binding(clientid, Request),
lookup(#{clientid => ClientID});
client(get, #{bindings := Bindings}) ->
lookup(Bindings);
client(delete, Request) ->
ClientID = cowboy_req:binding(clientid, Request),
kickout(#{clientid => ClientID}).
client(delete, #{bindings := Bindings}) ->
kickout(Bindings).
authz_cache(get, Request) ->
ClientID = cowboy_req:binding(clientid, Request),
get_authz_cache(#{clientid => ClientID});
authz_cache(get, #{bindings := Bindings}) ->
get_authz_cache(Bindings);
authz_cache(delete, Request) ->
ClientID = cowboy_req:binding(clientid, Request),
clean_authz_cache(#{clientid => ClientID}).
authz_cache(delete, #{bindings := Bindings}) ->
clean_authz_cache(Bindings).
subscribe(post, Request) ->
ClientID = cowboy_req:binding(clientid, Request),
{ok, Body, _} = cowboy_req:read_body(Request),
TopicInfo = emqx_json:decode(Body, [return_maps]),
subscribe(post, #{bindings := #{clientid := ClientID}, body := TopicInfo}) ->
Topic = maps:get(<<"topic">>, TopicInfo),
Qos = maps:get(<<"qos">>, TopicInfo, 0),
subscribe(#{clientid => ClientID, topic => Topic, qos => Qos}).
unsubscribe(post, Request) ->
ClientID = cowboy_req:binding(clientid, Request),
{ok, Body, _} = cowboy_req:read_body(Request),
TopicInfo = emqx_json:decode(Body, [return_maps]),
unsubscribe(post, #{bindings := #{clientid := ClientID}, body := TopicInfo}) ->
Topic = maps:get(<<"topic">>, TopicInfo),
unsubscribe(#{clientid => ClientID, topic => Topic}).
%% TODO: batch
subscribe_batch(post, Request) ->
ClientID = cowboy_req:binding(clientid, Request),
{ok, Body, _} = cowboy_req:read_body(Request),
TopicInfos = emqx_json:decode(Body, [return_maps]),
subscribe_batch(post, #{bindings := #{clientid := ClientID}, body := TopicInfos}) ->
Topics =
[begin
Topic = maps:get(<<"topic">>, TopicInfo),
@ -502,8 +488,7 @@ subscribe_batch(post, Request) ->
end || TopicInfo <- TopicInfos],
subscribe_batch(#{clientid => ClientID, topics => Topics}).
subscriptions(get, Request) ->
ClientID = cowboy_req:binding(clientid, Request),
subscriptions(get, #{bindings := #{clientid := ClientID}}) ->
{Node, Subs0} = emqx_mgmt:list_client_subscriptions(ClientID),
Subs = lists:map(fun({Topic, SubOpts}) ->
#{node => Node, clientid => ClientID, topic => Topic, qos => maps:get(qos, SubOpts)}
@ -514,13 +499,13 @@ subscriptions(get, Request) ->
%% api apply
list(Params) ->
case proplists:get_value(<<"node">>, Params, undefined) of
case maps:get(<<"node">>, Params, undefined) of
undefined ->
Response = emqx_mgmt_api:cluster_query(Params, ?CLIENT_QS_SCHEMA, ?query_fun),
{200, Response};
Node1 ->
Node = binary_to_atom(Node1, utf8),
Response = emqx_mgmt_api:node_query(Node, proplists:delete(<<"node">>, Params), ?CLIENT_QS_SCHEMA, ?query_fun),
Response = emqx_mgmt_api:node_query(Node, Params, ?CLIENT_QS_SCHEMA, ?query_fun),
{200, Response}
end.

View File

@ -25,8 +25,8 @@
-export([api_spec/0]).
-export([ config/2
, config_reset/2
-export([ config/3
, config_reset/3
]).
-export([get_conf_schema/2, gen_schema/1]).
@ -100,7 +100,7 @@ config_reset_api() ->
%%%==============================================================================================
%% parameters trans
config(get, Req) ->
config(get, _Params, Req) ->
Path = conf_path(Req),
case emqx_map_lib:deep_find(Path, get_full_config()) of
{ok, Conf} ->
@ -109,13 +109,13 @@ config(get, Req) ->
{404, #{code => 'NOT_FOUND', message => <<"Config cannot found">>}}
end;
config(put, Req) ->
config(put, _Params, Req) ->
Path = conf_path(Req),
{ok, #{raw_config := RawConf}} = emqx:update_config(Path, http_body(Req),
#{rawconf_with_defaults => true}),
{200, emqx_map_lib:jsonable_map(RawConf)}.
config_reset(post, Req) ->
config_reset(post, _Params, Req) ->
%% reset the config specified by the query string param 'conf_path'
Path = conf_path_reset(Req) ++ conf_path_from_querystr(Req),
case emqx:reset_config(Path, #{}) of

View File

@ -22,10 +22,7 @@
-export([ listeners/2
, listener/2
, node_listener/2
, node_listeners/2
, manage_listeners/2
, manage_nodes_listeners/2]).
, manage_listeners/2]).
-import(emqx_mgmt_util, [ schema/1
, schema/2
@ -121,7 +118,7 @@ manage_nodes_listeners_api() ->
error_schema(<<"Listener id not found">>, ['BAD_REQUEST']),
<<"200">> =>
schema(<<"Operation success">>)}}},
{"/node/:node/listeners/:id/:operation", Metadata, manage_nodes_listeners}.
{"/node/:node/listeners/:id/:operation", Metadata, manage_listeners}.
nodes_listeners_api() ->
Metadata = #{
@ -134,7 +131,7 @@ nodes_listeners_api() ->
['BAD_NODE_NAME', 'BAD_LISTENER_ID']),
<<"200">> =>
schema(properties(), <<"Get listener info ok">>)}}},
{"/nodes/:node/listeners/:id", Metadata, node_listener}.
{"/nodes/:node/listeners/:id", Metadata, listener}.
nodes_listener_api() ->
Metadata = #{
@ -144,7 +141,7 @@ nodes_listener_api() ->
responses => #{
<<"404">> => error_schema(<<"Listener id not found">>),
<<"200">> => object_schema(properties(), <<"Get listener info ok">>)}}},
{"/nodes/:node/listeners", Metadata, node_listeners}.
{"/nodes/:node/listeners", Metadata, listeners}.
%%%==============================================================================================
%% parameters
param_path_node() ->
@ -182,29 +179,11 @@ param_path_operation()->
listeners(get, _Request) ->
list().
listener(get, Request) ->
ID = b2a(cowboy_req:binding(id, Request)),
get_listeners(#{id => ID}).
listener(get, #{bindings := Bindings}) ->
get_listeners(Bindings).
node_listeners(get, Request) ->
Node = b2a(cowboy_req:binding(node, Request)),
get_listeners(#{node => Node}).
node_listener(get, Request) ->
Node = b2a(cowboy_req:binding(node, Request)),
ID = b2a(cowboy_req:binding(id, Request)),
get_listeners(#{node => Node, id => ID}).
manage_listeners(_, Request) ->
ID = b2a(cowboy_req:binding(id, Request)),
Operation = b2a(cowboy_req:binding(operation, Request)),
manage(Operation, #{id => ID}).
manage_nodes_listeners(_, Request) ->
Node = b2a(cowboy_req:binding(node, Request)),
ID = b2a(cowboy_req:binding(id, Request)),
Operation = b2a(cowboy_req:binding(operation, Request)),
manage(Operation, #{id => ID, node => Node}).
manage_listeners(_, #{bindings := Bindings}) ->
manage(Bindings).
%%%==============================================================================================
@ -215,37 +194,39 @@ list() ->
get_listeners(Param) ->
case list_listener(Param) of
{error, not_found} ->
ID = maps:get(id, Param),
ID = b2a(maps:get(id, Param)),
Reason = iolist_to_binary(io_lib:format("Error listener id ~p", [ID])),
{404, #{code => 'BAD_LISTENER_ID', message => Reason}};
{error, nodedown} ->
Node = maps:get(node, Param),
Node = b2a(maps:get(node, Param)),
Reason = iolist_to_binary(io_lib:format("Node ~p rpc failed", [Node])),
Response = #{code => 'BAD_NODE_NAME', message => Reason},
{404, Response};
[] ->
ID = maps:get(id, Param),
ID = b2a(maps:get(id, Param)),
Reason = iolist_to_binary(io_lib:format("Error listener id ~p", [ID])),
{404, #{code => 'BAD_LISTENER_ID', message => Reason}};
Data ->
{200, Data}
end.
manage(Operation0, Param) ->
OperationMap = #{start => start_listener, stop => stop_listener, restart => restart_listener},
Operation = maps:get(Operation0, OperationMap),
manage(Param) ->
OperationMap = #{start => start_listener,
stop => stop_listener,
restart => restart_listener},
Operation = maps:get(b2a(maps:get(operation, Param)), OperationMap),
case list_listener(Param) of
{error, not_found} ->
ID = maps:get(id, Param),
ID = b2a(maps:get(id, Param)),
Reason = iolist_to_binary(io_lib:format("Error listener id ~p", [ID])),
{404, #{code => 'BAD_LISTENER_ID', message => Reason}};
{error, nodedown} ->
Node = maps:get(node, Param),
Node = b2a(maps:get(node, Param)),
Reason = iolist_to_binary(io_lib:format("Node ~p rpc failed", [Node])),
Response = #{code => 'BAD_NODE_NAME', message => Reason},
{404, Response};
[] ->
ID = maps:get(id, Param),
ID = b2a(maps:get(id, Param)),
Reason = iolist_to_binary(io_lib:format("Error listener id ~p", [ID])),
{404, #{code => 'RESOURCE_NOT_FOUND', message => Reason}};
ListenersOrSingleListener ->
@ -318,4 +299,5 @@ trans_running(Conf) ->
end.
b2a(B) when is_binary(B) -> binary_to_atom(B, utf8).
b2a(B) when is_binary(B) -> binary_to_atom(B, utf8);
b2a(Any) -> Any.

View File

@ -312,9 +312,8 @@ metrics_api() ->
%%%==============================================================================================
%% api apply
list(get, Request) ->
Params = cowboy_req:parse_qs(Request),
case proplists:get_value(<<"aggregate">>, Params, undefined) of
list(get, #{query_string := Qs}) ->
case maps:get(<<"aggregate">>, Qs, undefined) of
<<"true">> ->
{200, emqx_mgmt:get_metrics()};
_ ->

View File

@ -115,20 +115,17 @@ node_stats_api() ->
%%%==============================================================================================
%% parameters trans
nodes(get, _Request) ->
nodes(get, _Params) ->
list(#{}).
node(get, Request) ->
Params = node_name_path_parameter(Request),
get_node(Params).
node(get, #{bingings := #{node_name := NodeName}}) ->
get_node(binary_to_atom(NodeName, utf8)).
node_metrics(get, Request) ->
Params = node_name_path_parameter(Request),
get_metrics(Params).
node_metrics(get, #{bingings := #{node_name := NodeName}}) ->
get_metrics(binary_to_atom(NodeName, utf8)).
node_stats(get, Request) ->
Params = node_name_path_parameter(Request),
get_stats(Params).
node_stats(get, #{bingings := #{node_name := NodeName}}) ->
get_stats(binary_to_atom(NodeName, utf8)).
%%%==============================================================================================
%% api apply
@ -136,7 +133,7 @@ list(#{}) ->
NodesInfo = [format(Node, NodeInfo) || {Node, NodeInfo} <- emqx_mgmt:list_nodes()],
{200, NodesInfo}.
get_node(#{node := Node}) ->
get_node(Node) ->
case emqx_mgmt:lookup_node(Node) of
#{node_status := 'ERROR'} ->
{400, #{code => 'SOURCE_ERROR', message => <<"rpc_failed">>}};
@ -144,7 +141,7 @@ get_node(#{node := Node}) ->
{200, format(Node, NodeInfo)}
end.
get_metrics(#{node := Node}) ->
get_metrics(Node) ->
case emqx_mgmt:get_metrics(Node) of
{error, _} ->
{400, #{code => 'SOURCE_ERROR', message => <<"rpc_failed">>}};
@ -152,7 +149,7 @@ get_metrics(#{node := Node}) ->
{200, Metrics}
end.
get_stats(#{node := Node}) ->
get_stats(Node) ->
case emqx_mgmt:get_stats(Node) of
{error, _} ->
{400, #{code => 'SOURCE_ERROR', message => <<"rpc_failed">>}};
@ -162,10 +159,6 @@ get_stats(#{node := Node}) ->
%%============================================================================================================
%% internal function
node_name_path_parameter(Request) ->
NodeName = cowboy_req:binding(node_name, Request),
Node = binary_to_atom(NodeName, utf8),
#{node => Node}.
format(_Node, Info = #{memory_total := Total, memory_used := Used}) ->
{ok, SysPathBinary} = file:get_cwd(),

View File

@ -62,15 +62,13 @@ properties() ->
{retain, boolean, <<"Retain message flag, nullable, default false">>}
]).
publish(post, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
Message = message(emqx_json:decode(Body, [return_maps])),
publish(post, #{body := Body}) ->
Message = message(Body),
_ = emqx_mgmt:publish(Message),
{200, format_message(Message)}.
publish_batch(post, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
Messages = messages(emqx_json:decode(Body, [return_maps])),
publish_batch(post, #{body := Body}) ->
Messages = messages(Body),
_ = [emqx_mgmt:publish(Message) || Message <- Messages],
{200, format_message(Messages)}.

View File

@ -82,13 +82,11 @@ route_api() ->
%%%==============================================================================================
%% parameters trans
routes(get, Request) ->
Params = cowboy_req:parse_qs(Request),
list(Params).
routes(get, #{query_string := Qs}) ->
list(Qs).
route(get, Request) ->
Topic = cowboy_req:binding(topic, Request),
lookup(#{topic => Topic}).
route(get, #{bindings := Bindings}) ->
lookup(Bindings).
%%%==============================================================================================
%% api apply

View File

@ -130,9 +130,8 @@ stats_api() ->
%%%==============================================================================================
%% api apply
list(get, Request) ->
Params = cowboy_req:parse_qs(Request),
case proplists:get_value(<<"aggregate">>, Params, undefined) of
list(get, #{query_string := Qs}) ->
case maps:get(<<"aggregate">>, Qs, undefined) of
<<"true">> ->
{200, emqx_mgmt:get_stats()};
_ ->

View File

@ -36,7 +36,7 @@ status_api() ->
},
{Path, Metadata, running_status}.
running_status(get, _Request) ->
running_status(get, _Params) ->
{InternalStatus, _ProvidedStatus} = init:get_status(),
AppStatus =
case lists:keysearch(emqx, 1, application:which_applications()) of

View File

@ -106,12 +106,11 @@ parameters() ->
} | page_params()
].
subscriptions(get, Request) ->
Params = cowboy_req:parse_qs(Request),
subscriptions(get, #{query_string := Params}) ->
list(Params).
list(Params) ->
case proplists:get_value(<<"node">>, Params, undefined) of
case maps:get(<<"node">>, Params, undefined) of
undefined ->
{200, emqx_mgmt_api:cluster_query(Params, ?SUBS_QS_SCHEMA, ?query_fun)};
Node ->

View File

@ -129,20 +129,16 @@ delayed_message_api() ->
%%--------------------------------------------------------------------
%% HTTP API
%%--------------------------------------------------------------------
status(get, _Request) ->
status(get, _Params) ->
{200, get_status()};
status(put, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
Config = emqx_json:decode(Body, [return_maps]),
update_config(Config).
status(put, #{body := Body}) ->
update_config(Body).
delayed_messages(get, Request) ->
Qs = cowboy_req:parse_qs(Request),
delayed_messages(get, #{query_string := Qs}) ->
{200, emqx_delayed:list(Qs)}.
delayed_message(get, Request) ->
Id = cowboy_req:binding(id, Request),
delayed_message(get, #{bindings := #{id := Id}}) ->
case emqx_delayed:get_delayed_message(Id) of
{ok, Message} ->
Payload = maps:get(payload, Message),
@ -156,8 +152,7 @@ delayed_message(get, Request) ->
Message = iolist_to_binary(io_lib:format("Message ID ~p not found", [Id])),
{404, #{code => ?MESSAGE_ID_NOT_FOUND, message => Message}}
end;
delayed_message(delete, Request) ->
Id = cowboy_req:binding(id, Request),
delayed_message(delete, #{bindings := #{id := Id}}) ->
_ = emqx_delayed:delete_delayed_message(Id),
{200}.

View File

@ -50,11 +50,9 @@ event_message_api() ->
},
{Path, Metadata, event_message}.
event_message(get, _Request) ->
event_message(get, _Params) ->
{200, emqx_event_message:list()};
event_message(post, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
Params = emqx_json:decode(Body, [return_maps]),
_ = emqx_event_message:update(Params),
event_message(post, #{body := Body}) ->
_ = emqx_event_message:update(Body),
{200, emqx_event_message:list()}.

View File

@ -60,15 +60,13 @@ rewrite_api() ->
},
{Path, Metadata, topic_rewrite}.
topic_rewrite(get, _Request) ->
topic_rewrite(get, _Params) ->
{200, emqx_rewrite:list()};
topic_rewrite(post, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
Params = emqx_json:decode(Body, [return_maps]),
case length(Params) < ?MAX_RULES_LIMIT of
topic_rewrite(post, #{body := Body}) ->
case length(Body) < ?MAX_RULES_LIMIT of
true ->
ok = emqx_rewrite:update(Params),
ok = emqx_rewrite:update(Body),
{200, emqx_rewrite:list()};
_ ->
Message = iolist_to_binary(io_lib:format("Max rewrite rules count is ~p", [?MAX_RULES_LIMIT])),

View File

@ -86,13 +86,11 @@ data_api() ->
%%--------------------------------------------------------------------
%% HTTP API
%%--------------------------------------------------------------------
status(get, _Request) ->
status(get, _Params) ->
{200, get_telemetry_status()};
status(put, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
Params = emqx_json:decode(Body, [return_maps]),
Enable = maps:get(<<"enable">>, Params),
status(put, #{body := Body}) ->
Enable = maps:get(<<"enable">>, Body),
case Enable =:= emqx_telemetry:get_status() of
true ->
Reason = case Enable of

View File

@ -151,9 +151,6 @@ topic_param() ->
schema => #{type => string}
}.
topic_param(Request) ->
cowboy_req:binding(topic, Request).
%%--------------------------------------------------------------------
%% api callback
list_topic(get, _) ->
@ -162,8 +159,7 @@ list_topic(get, _) ->
list_topic_metrics(get, _) ->
list_metrics().
operate_topic_metrics(Method, Request) ->
Topic = topic_param(Request),
operate_topic_metrics(Method, #{bindings := #{topic := Topic}}) ->
case Method of
get ->
get_metrics(Topic);
@ -176,8 +172,7 @@ operate_topic_metrics(Method, Request) ->
reset_all_topic_metrics(put, _) ->
reset().
reset_topic_metrics(put, Request) ->
Topic = topic_param(Request),
reset_topic_metrics(put, #{bindings := #{topic := Topic}}) ->
reset(Topic).
%%--------------------------------------------------------------------

View File

@ -77,14 +77,12 @@ prometheus_api() ->
% },
% {"/prometheus/stats", Metadata, stats}.
prometheus(get, _Request) ->
prometheus(get, _Params) ->
{200, emqx:get_raw_config([<<"prometheus">>], #{})};
prometheus(put, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
Params = emqx_json:decode(Body, [return_maps]),
{ok, Config} = emqx:update_config([prometheus], Params),
case maps:get(<<"enable">>, Params) of
prometheus(put, #{body := Body}) ->
{ok, Config} = emqx:update_config([prometheus], Body),
case maps:get(<<"enable">>, Body) of
true ->
_ = emqx_prometheus_sup:stop_child(?APP),
emqx_prometheus_sup:start_child(?APP, maps:get(config, Config));

View File

@ -122,23 +122,19 @@ config_api() ->
},
{"/mqtt/retainer", MetaData, config}.
lookup_retained_warp(Type, Req) ->
check_backend(Type, Req, fun lookup_retained/2).
lookup_retained_warp(Type, Params) ->
check_backend(Type, Params, fun lookup_retained/2).
with_topic_warp(Type, Req) ->
check_backend(Type, Req, fun with_topic/2).
with_topic_warp(Type, Params) ->
check_backend(Type, Params, fun with_topic/2).
config(get, _) ->
Config = emqx:get_config([mqtt_retainer]),
Body = emqx_json:encode(Config),
{200, Body};
{200, emqx:get_raw_config([emqx_retainer])};
config(put, Req) ->
config(put, #{body := Body}) ->
try
{ok, Body, _} = cowboy_req:read_body(Req),
Cfg = emqx_json:decode(Body),
emqx_retainer:update_config(Cfg),
{200, #{<<"content-type">> => <<"text/plain">>}, <<"Update configs successfully">>}
ok = emqx_retainer:update_config(Body),
{200, emqx:get_raw_config([emqx_retainer])}
catch _:Reason:_ ->
{400,
#{code => 'UPDATE_FAILED',
@ -148,27 +144,25 @@ config(put, Req) ->
%%------------------------------------------------------------------------------
%% Interval Funcs
%%------------------------------------------------------------------------------
lookup_retained(get, Req) ->
lookup(undefined, Req, fun format_message/1).
lookup_retained(get, Params) ->
lookup(undefined, Params, fun format_message/1).
with_topic(get, Req) ->
Topic = cowboy_req:binding(topic, Req),
lookup(Topic, Req, fun format_detail_message/1);
with_topic(get, #{bindings := Bindings} = Params) ->
Topic = maps:get(topic, Bindings),
lookup(Topic, Params, fun format_detail_message/1);
with_topic(delete, Req) ->
Topic = cowboy_req:binding(topic, Req),
with_topic(delete, #{bindings := Bindings}) ->
Topic = maps:get(topic, Bindings),
emqx_retainer_mnesia:delete_message(undefined, Topic),
{200}.
-spec lookup(undefined | binary(),
cowboy_req:req(),
map(),
fun((#message{}) -> map())) ->
{200, map()}.
lookup(Topic, Req, Formatter) ->
#{page := Page,
limit := Limit} = cowboy_req:match_qs([{page, int, 1},
{limit, int, emqx_mgmt:max_row_limit()}],
Req),
lookup(Topic, #{query_string := Qs}, Formatter) ->
Page = maps:get(page, Qs, 1),
Limit = maps:get(page, Qs, emqx_mgmt:max_row_limit()),
{ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, Page, Limit),
{200, format_message(Msgs, Formatter)}.
@ -197,10 +191,10 @@ to_bin_string(Data) when is_binary(Data) ->
to_bin_string(Data) ->
list_to_binary(io_lib:format("~p", [Data])).
check_backend(Type, Req, Cont) ->
check_backend(Type, Params, Cont) ->
case emqx:get_config([emqx_retainer, config, type]) of
built_in_database ->
Cont(Type, Req);
Cont(Type, Params);
_ ->
{405,
#{<<"content-type">> => <<"text/plain">>},

View File

@ -51,14 +51,12 @@ statsd_api() ->
},
[{"/statsd", Metadata, statsd}].
statsd(get, _Request) ->
statsd(get, _Params) ->
{200, emqx:get_raw_config([<<"statsd">>], #{})};
statsd(put, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
Params = emqx_json:decode(Body, [return_maps]),
{ok, Config} = emqx:update_config([statsd], Params),
case maps:get(<<"enable">>, Params) of
statsd(put, #{body := Body}) ->
{ok, Config} = emqx:update_config([statsd], Body),
case maps:get(<<"enable">>, Body) of
true ->
_ = emqx_statsd_sup:stop_child(?APP),
emqx_statsd_sup:start_child(?APP, maps:get(config, Config));

View File

@ -51,7 +51,7 @@
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.2"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.8"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.1.7"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.0"}}}
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}}
, {replayq, "0.3.3"}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}