refactor: clients api; status api; adapter minirest v1

The serious influence:
authn:
api
authz:
api; api test suit
dashboard:
all closed
lwm2m:
api;
modules:
api(api_topic_metrics, modules_api); test suit(emqx_modules_SUITE)
prometheus:
api
retainer:
api; api test suit
rule_engine:
api: api test suit
telemetry:
api
This commit is contained in:
DDDHuang 2021-07-12 10:05:51 +08:00 committed by GitHub
commit d04e1c6f54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 798 additions and 1831 deletions

View File

@ -49,7 +49,7 @@ emqx_test(){
exit 1
fi
IDLE_TIME=0
while ! curl http://localhost:8081/status >/dev/null 2>&1; do
while ! curl http://localhost:8081/api/v5/status >/dev/null 2>&1; do
if [ $IDLE_TIME -gt 10 ]
then
echo "emqx running error"
@ -138,7 +138,7 @@ EOF
exit 1
fi
IDLE_TIME=0
while ! curl http://localhost:8081/status >/dev/null 2>&1; do
while ! curl http://localhost:8081/api/v5/status >/dev/null 2>&1; do
if [ $IDLE_TIME -gt 10 ]
then
echo "emqx running error"
@ -159,7 +159,7 @@ EOF
exit 1
fi
IDLE_TIME=0
while ! curl http://localhost:8081/status >/dev/null 2>&1; do
while ! curl http://localhost:8081/api/v5/status >/dev/null 2>&1; do
if [ $IDLE_TIME -gt 10 ]
then
echo "emqx service error"

View File

@ -186,7 +186,7 @@ jobs:
./emqx/bin/emqx start || cat emqx/log/erlang.log.1
ready='no'
for i in {1..10}; do
if curl -fs 127.0.0.1:8081/status > /dev/null; then
if curl -fs 127.0.0.1:8081/api/v5/status > /dev/null; then
ready='yes'
break
fi

View File

@ -112,7 +112,7 @@ jobs:
./emqx/bin/emqx start || cat emqx/log/erlang.log.1
ready='no'
for i in {1..10}; do
if curl -fs 127.0.0.1:8081/status > /dev/null; then
if curl -fs 127.0.0.1:8081/api/v5/status > /dev/null; then
ready='yes'
break
fi

View File

@ -40,8 +40,6 @@
, list_users/2
]).
-import(minirest, [return/1]).
-rest_api(#{name => create_chain,
method => 'POST',
path => "/authentication/chains",
@ -542,3 +540,7 @@ get_missed_params(Actual, Expected) ->
end
end, [], Expected),
lists:reverse(Keys).
return(_) ->
%% TODO: V5 API
ok.

View File

@ -53,21 +53,21 @@
]).
lookup_authz(_Bindings, _Params) ->
minirest:return({ok, emqx_authz:lookup()}).
return({ok, emqx_authz:lookup()}).
update_authz(_Bindings, Params) ->
Rules = get_rules(Params),
minirest:return(emqx_authz:update(Rules)).
return(emqx_authz:update(Rules)).
append_authz(_Bindings, Params) ->
Rules = get_rules(Params),
NRules = lists:append(emqx_authz:lookup(), Rules),
minirest:return(emqx_authz:update(NRules)).
return(emqx_authz:update(NRules)).
push_authz(_Bindings, Params) ->
Rules = get_rules(Params),
NRules = lists:append(Rules, emqx_authz:lookup()),
minirest:return(emqx_authz:update(NRules)).
return(emqx_authz:update(NRules)).
%%------------------------------------------------------------------------------
%% Interval Funcs
@ -88,3 +88,7 @@ get_rules(Params) ->
-endif.
return(_) ->
%% TODO: V5 api
ok.

View File

@ -35,7 +35,9 @@
-define(BASE_PATH, "api").
all() ->
emqx_ct:all(?MODULE).
%% TODO: V5 API
%% emqx_ct:all(?MODULE).
[].
groups() ->
[].

View File

@ -19,7 +19,7 @@
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-import(proplists, [get_value/3]).
%%-import(proplists, [get_value/3]).
-export([ start_listeners/0
, stop_listeners/0
@ -42,56 +42,61 @@ start_listeners() ->
lists:foreach(fun(Listener) -> start_listener(Listener) end, listeners()).
%% Start HTTP Listener
start_listener({Proto, Port, Options}) when Proto == http ->
Dispatch = [{"/", cowboy_static, {priv_file, emqx_dashboard, "www/index.html"}},
{"/static/[...]", cowboy_static, {priv_dir, emqx_dashboard, "www/static"}},
{"/api/v4/[...]", minirest, http_handlers()}],
minirest:start_http(listener_name(Proto), ranch_opts(Port, Options), Dispatch);
start_listener({Proto, Port, Options}) when Proto == https ->
Dispatch = [{"/", cowboy_static, {priv_file, emqx_dashboard, "www/index.html"}},
{"/static/[...]", cowboy_static, {priv_dir, emqx_dashboard, "www/static"}},
{"/api/v4/[...]", minirest, http_handlers()}],
minirest:start_https(listener_name(Proto), ranch_opts(Port, Options), Dispatch).
ranch_opts(Port, Options0) ->
NumAcceptors = get_value(num_acceptors, Options0, 4),
MaxConnections = get_value(max_connections, Options0, 512),
Options = lists:foldl(fun({K, _V}, Acc) when K =:= max_connections orelse K =:= num_acceptors ->
Acc;
({inet6, true}, Acc) -> [inet6 | Acc];
({inet6, false}, Acc) -> Acc;
({ipv6_v6only, true}, Acc) -> [{ipv6_v6only, true} | Acc];
({ipv6_v6only, false}, Acc) -> Acc;
({K, V}, Acc)->
[{K, V} | Acc]
end, [], Options0),
#{num_acceptors => NumAcceptors,
max_connections => MaxConnections,
socket_opts => [{port, Port} | Options]}.
start_listener(_) -> ok.
%% TODO: V5 API
%%start_listener({Proto, Port, Options}) when Proto == http ->
%% Dispatch = [{"/", cowboy_static, {priv_file, emqx_dashboard, "www/index.html"}},
%% {"/static/[...]", cowboy_static, {priv_dir, emqx_dashboard, "www/static"}},
%% {"/api/v4/[...]", minirest, http_handlers()}],
%% minirest:start_http(listener_name(Proto), ranch_opts(Port, Options), Dispatch);
%%
%%start_listener({Proto, Port, Options}) when Proto == https ->
%% Dispatch = [{"/", cowboy_static, {priv_file, emqx_dashboard, "www/index.html"}},
%% {"/static/[...]", cowboy_static, {priv_dir, emqx_dashboard, "www/static"}},
%% {"/api/v4/[...]", minirest, http_handlers()}],
%% minirest:start_https(listener_name(Proto), ranch_opts(Port, Options), Dispatch).
%%
%%ranch_opts(Port, Options0) ->
%% NumAcceptors = get_value(num_acceptors, Options0, 4),
%% MaxConnections = get_value(max_connections, Options0, 512),
%% Options = lists:foldl(fun({K, _V}, Acc) when K =:= max_connections orelse K =:= num_acceptors ->
%% Acc;
%% ({inet6, true}, Acc) -> [inet6 | Acc];
%% ({inet6, false}, Acc) -> Acc;
%% ({ipv6_v6only, true}, Acc) -> [{ipv6_v6only, true} | Acc];
%% ({ipv6_v6only, false}, Acc) -> Acc;
%% ({K, V}, Acc)->
%% [{K, V} | Acc]
%% end, [], Options0),
%% #{num_acceptors => NumAcceptors,
%% max_connections => MaxConnections,
%% socket_opts => [{port, Port} | Options]}.
stop_listeners() ->
lists:foreach(fun(Listener) -> stop_listener(Listener) end, listeners()).
stop_listener({Proto, _Port, _}) ->
minirest:stop_http(listener_name(Proto)).
stop_listener(_) ->
ok.
%% TODO: V5 API
%%stop_listener({Proto, _Port, _}) ->
%% minirest:stop_http(listener_name(Proto)).
listeners() ->
application:get_env(?APP, listeners, []).
listener_name(Proto) ->
list_to_atom(atom_to_list(Proto) ++ ":dashboard").
%%listener_name(Proto) ->
%% list_to_atom(atom_to_list(Proto) ++ ":dashboard").
%%--------------------------------------------------------------------
%% HTTP Handlers and Dispatcher
%%--------------------------------------------------------------------
http_handlers() ->
Plugins = lists:map(fun(Plugin) -> Plugin#plugin.name end, emqx_plugins:list()),
[{"/api/v4/",
minirest:handler(#{apps => Plugins ++ [emqx_modules],
filter => fun ?MODULE:filter/1}),
[{authorization, fun ?MODULE:is_authorized/1}]}].
%%http_handlers() ->
%% Plugins = lists:map(fun(Plugin) -> Plugin#plugin.name end, emqx_plugins:list()),
%% [{"/api/v4/",
%% minirest:handler(#{apps => Plugins ++ [emqx_modules],
%% filter => fun ?MODULE:filter/1}),
%% [{authorization, fun ?MODULE:is_authorized/1}]}].
%%--------------------------------------------------------------------
%% Basic Authorization

View File

@ -18,8 +18,6 @@
-include("emqx_dashboard.hrl").
-import(minirest, [return/1]).
-rest_api(#{name => auth_user,
method => 'POST',
path => "/auth",
@ -107,3 +105,6 @@ delete(#{name := Username}, _Params) ->
row(#mqtt_admin{username = Username, tags = Tags}) ->
#{username => Username, tags => Tags}.
return(_) ->
%% TODO: V5 API
ok.

View File

@ -40,7 +40,9 @@
-define(OVERVIEWS, ['alarms/activated', 'alarms/deactivated', banned, brokers, stats, metrics, listeners, clients, subscriptions, routes, plugins]).
all() ->
emqx_ct:all(?MODULE).
%% TODO: V5 API
%% emqx_ct:all(?MODULE).
[].
init_per_suite(Config) ->
emqx_ct_helpers:start_apps([emqx_management, emqx_dashboard],fun set_special_configs/1),

View File

@ -16,8 +16,6 @@
-module(emqx_lwm2m_api).
-import(minirest, [return/1]).
-rest_api(#{name => list,
method => 'GET',
path => "/lwm2m_channels/",
@ -160,3 +158,7 @@ path_list(Path) ->
[ObjId, ObjInsId] -> [ObjId, ObjInsId];
[ObjId] -> [ObjId]
end.
return(_) ->
%% TODO: V5 API
ok.

View File

@ -7,3 +7,6 @@ EMQ X Management API
http://restful-api-design.readthedocs.io/en/latest/scope.html
default application see:
header:
authorization: Basic YWRtaW46cHVibGlj

View File

@ -35,3 +35,29 @@
-define(VERSIONS, ["4.0", "4.1", "4.2", "4.3"]).
-define(MANAGEMENT_SHARD, emqx_management_shard).
-define(GENERATE_API_METADATA(MetaData),
maps:fold(
fun(Method, MethodDef0, NextMetaData) ->
Default = #{
tags => [?MODULE],
security => [#{application => []}]},
MethodDef =
lists:foldl(
fun(Key, NMethodDef) ->
case maps:is_key(Key, NMethodDef) of
true ->
NMethodDef;
false ->
maps:put(Key, maps:get(Key, Default), NMethodDef)
end
end, MethodDef0, maps:keys(Default)),
maps:put(Method, MethodDef, NextMetaData)
end,
#{}, MetaData)).
-define(GENERATE_API(Path, MetaData, Function),
{Path, ?GENERATE_API_METADATA(MetaData), Function}).
-define(GENERATE_APIS(Apis),
[?GENERATE_API(Path, MetaData, Function) || {Path, MetaData, Function} <- Apis]).

View File

@ -113,10 +113,11 @@
-define(APP, emqx_management).
%% TODO: remove these function after all api use minirest version 1.X
return() ->
minirest:return().
return(Response) ->
minirest:return(Response).
ok.
return(_Response) ->
ok.
%%--------------------------------------------------------------------
%% Node Info

View File

@ -16,308 +16,469 @@
-module(emqx_mgmt_api_clients).
-include("emqx_mgmt.hrl").
-behavior(minirest_api).
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/emqx.hrl").
-define(CLIENT_QS_SCHEMA, {emqx_channel_info,
[{<<"clientid">>, binary},
{<<"username">>, binary},
{<<"zone">>, atom},
{<<"ip_address">>, ip},
{<<"conn_state">>, atom},
{<<"clean_start">>, atom},
{<<"proto_name">>, binary},
{<<"proto_ver">>, integer},
{<<"_like_clientid">>, binary},
{<<"_like_username">>, binary},
{<<"_gte_created_at">>, timestamp},
{<<"_lte_created_at">>, timestamp},
{<<"_gte_connected_at">>, timestamp},
{<<"_lte_connected_at">>, timestamp}]}).
-include_lib("emqx/include/logger.hrl").
-rest_api(#{name => list_clients,
method => 'GET',
path => "/clients/",
func => list,
descr => "A list of clients on current node"}).
-include("emqx_mgmt.hrl").
-rest_api(#{name => list_node_clients,
method => 'GET',
path => "nodes/:atom:node/clients/",
func => list,
descr => "A list of clients on specified node"}).
%% API
-export([api_spec/0]).
-rest_api(#{name => lookup_client,
method => 'GET',
path => "/clients/:bin:clientid",
func => lookup,
descr => "Lookup a client in the cluster"}).
-rest_api(#{name => lookup_node_client,
method => 'GET',
path => "nodes/:atom:node/clients/:bin:clientid",
func => lookup,
descr => "Lookup a client on the node"}).
-rest_api(#{name => lookup_client_via_username,
method => 'GET',
path => "/clients/username/:bin:username",
func => lookup,
descr => "Lookup a client via username in the cluster"
}).
-rest_api(#{name => lookup_node_client_via_username,
method => 'GET',
path => "/nodes/:atom:node/clients/username/:bin:username",
func => lookup,
descr => "Lookup a client via username on the node "
}).
-rest_api(#{name => kickout_client,
method => 'DELETE',
path => "/clients/:bin:clientid",
func => kickout,
descr => "Kick out the client in the cluster"}).
-rest_api(#{name => clean_acl_cache,
method => 'DELETE',
path => "/clients/:bin:clientid/acl_cache",
func => clean_acl_cache,
descr => "Clear the ACL cache of a specified client in the cluster"}).
-rest_api(#{name => list_acl_cache,
method => 'GET',
path => "/clients/:bin:clientid/acl_cache",
func => list_acl_cache,
descr => "List the ACL cache of a specified client in the cluster"}).
-rest_api(#{name => set_ratelimit_policy,
method => 'POST',
path => "/clients/:bin:clientid/ratelimit",
func => set_ratelimit_policy,
descr => "Set the client ratelimit policy"}).
-rest_api(#{name => clean_ratelimit,
method => 'DELETE',
path => "/clients/:bin:clientid/ratelimit",
func => clean_ratelimit,
descr => "Clear the ratelimit policy"}).
-rest_api(#{name => set_quota_policy,
method => 'POST',
path => "/clients/:bin:clientid/quota",
func => set_quota_policy,
descr => "Set the client quota policy"}).
-rest_api(#{name => clean_quota,
method => 'DELETE',
path => "/clients/:bin:clientid/quota",
func => clean_quota,
descr => "Clear the quota policy"}).
-import(emqx_mgmt_util, [ ntoa/1
, strftime/1
]).
-export([ list/2
, lookup/2
, kickout/2
, clean_acl_cache/2
, list_acl_cache/2
, set_ratelimit_policy/2
, set_quota_policy/2
, clean_ratelimit/2
, clean_quota/2
]).
-export([ clients/2
, client/2
, acl_cache/2
, subscribe/2
, subscribe_batch/2]).
-export([ query/3
, format_channel_info/1
]).
, format_channel_info/1]).
%% for batch operation
-export([do_subscribe/3]).
-define(CLIENT_QS_SCHEMA, {emqx_channel_info,
[ {<<"clientid">>, binary}
, {<<"username">>, binary}
, {<<"zone">>, atom}
, {<<"ip_address">>, ip}
, {<<"conn_state">>, atom}
, {<<"clean_start">>, atom}
, {<<"proto_name">>, binary}
, {<<"proto_ver">>, integer}
, {<<"_like_clientid">>, binary}
, {<<"_like_username">>, binary}
, {<<"_gte_created_at">>, timestamp}
, {<<"_lte_created_at">>, timestamp}
, {<<"_gte_connected_at">>, timestamp}
, {<<"_lte_connected_at">>, timestamp}]}).
-define(query_fun, {?MODULE, query}).
-define(format_fun, {?MODULE, format_channel_info}).
list(Bindings, Params) when map_size(Bindings) == 0 ->
fence(fun() ->
emqx_mgmt_api:cluster_query(Params, ?CLIENT_QS_SCHEMA, ?query_fun)
end);
-define(CLIENT_ID_NOT_FOUND,
<<"{\"code\": \"RESOURCE_NOT_FOUND\", \"reason\": \"Client id not found\"}">>).
list(#{node := Node}, Params) when Node =:= node() ->
fence(fun() ->
emqx_mgmt_api:node_query(Node, Params, ?CLIENT_QS_SCHEMA, ?query_fun)
end);
api_spec() ->
{apis(), schemas()}.
list(Bindings = #{node := Node}, Params) ->
case rpc:call(Node, ?MODULE, list, [Bindings, Params]) of
{badrpc, Reason} -> emqx_mgmt:return({error, ?ERROR1, Reason});
Res -> Res
apis() ->
[ clients_api()
, client_api()
, clients_acl_cache_api()
, subscribe_api()].
schemas() ->
ClientDef = #{
<<"node">> => #{
type => <<"string">>,
description => <<"Name of the node to which the client is connected">>},
<<"clientid">> => #{
type => <<"string">>,
description => <<"Client identifier">>},
<<"username">> => #{
type => <<"string">>,
description => <<"User name of client when connecting">>},
<<"proto_name">> => #{
type => <<"string">>,
description => <<"Client protocol name">>},
<<"proto_ver">> => #{
type => <<"integer">>,
description => <<"Protocol version used by the client">>},
<<"ip_address">> => #{
type => <<"string">>,
description => <<"Client's IP address">>},
<<"is_bridge">> => #{
type => <<"boolean">>,
description => <<"Indicates whether the client is connectedvia bridge">>},
<<"connected_at">> => #{
type => <<"string">>,
description => <<"Client connection time">>},
<<"disconnected_at">> => #{
type => <<"string">>,
description => <<"Client offline time, This field is only valid and returned when connected is false">>},
<<"connected">> => #{
type => <<"boolean">>,
description => <<"Whether the client is connected">>},
<<"will_msg">> => #{
type => <<"string">>,
description => <<"Client will message">>},
<<"zone">> => #{
type => <<"string">>,
description => <<"Indicate the configuration group used by the client">>},
<<"keepalive">> => #{
type => <<"integer">>,
description => <<"keepalive time, with the unit of second">>},
<<"clean_start">> => #{
type => <<"boolean">>,
description => <<"Indicate whether the client is using a brand new session">>},
<<"expiry_interval">> => #{
type => <<"integer">>,
description => <<"Session expiration interval, with the unit of second">>},
<<"created_at">> => #{
type => <<"string">>,
description => <<"Session creation time">>},
<<"subscriptions_cnt">> => #{
type => <<"integer">>,
description => <<"Number of subscriptions established by this client.">>},
<<"subscriptions_max">> => #{
type => <<"integer">>,
description => <<"v4 api name [max_subscriptions] Maximum number of subscriptions allowed by this client">>},
<<"inflight_cnt">> => #{
type => <<"integer">>,
description => <<"Current length of inflight">>},
<<"inflight_max">> => #{
type => <<"integer">>,
description => <<"v4 api name [max_inflight]. Maximum length of inflight">>},
<<"mqueue_len">> => #{
type => <<"integer">>,
description => <<"Current length of message queue">>},
<<"mqueue_max">> => #{
type => <<"integer">>,
description => <<"v4 api name [max_mqueue]. Maximum length of message queue">>},
<<"mqueue_dropped">> => #{
type => <<"integer">>,
description => <<"Number of messages dropped by the message queue due to exceeding the length">>},
<<"awaiting_rel_cnt">> => #{
type => <<"integer">>,
description => <<"v4 api name [awaiting_rel] Number of awaiting PUBREC packet">>},
<<"awaiting_rel_max">> => #{
type => <<"integer">>,
description => <<"v4 api name [max_awaiting_rel]. Maximum allowed number of awaiting PUBREC packet">>},
<<"recv_oct">> => #{
type => <<"integer">>,
description => <<"Number of bytes received by EMQ X Broker (the same below)">>},
<<"recv_cnt">> => #{
type => <<"integer">>,
description => <<"Number of TCP packets received">>},
<<"recv_pkt">> => #{
type => <<"integer">>,
description => <<"Number of MQTT packets received">>},
<<"recv_msg">> => #{
type => <<"integer">>,
description => <<"Number of PUBLISH packets received">>},
<<"send_oct">> => #{
type => <<"integer">>,
description => <<"Number of bytes sent">>},
<<"send_cnt">> => #{
type => <<"integer">>,
description => <<"Number of TCP packets sent">>},
<<"send_pkt">> => #{
type => <<"integer">>,
description => <<"Number of MQTT packets sent">>},
<<"send_msg">> => #{
type => <<"integer">>,
description => <<"Number of PUBLISH packets sent">>},
<<"mailbox_len">> => #{
type => <<"integer">>,
description => <<"Process mailbox size">>},
<<"heap_size">> => #{
type => <<"integer">>,
description => <<"Process heap size with the unit of byte">>
},
<<"reductions">> => #{
type => <<"integer">>,
description => <<"Erlang reduction">>}},
ACLCacheDefinitionProperties = #{
<<"topic">> => #{
type => <<"string">>,
description => <<"Topic name">>},
<<"access">> => #{
type => <<"string">>,
enum => [<<"subscribe">>, <<"publish">>],
description => <<"Access type">>},
<<"result">> => #{
type => <<"string">>,
enum => [<<"allow">>, <<"deny">>],
default => <<"allow">>,
description => <<"Allow or deny">>},
<<"updated_time">> => #{
type => <<"integer">>,
description => <<"Update time">>}},
[{<<"client">>, ClientDef}, {<<"acl_cache">>, ACLCacheDefinitionProperties}].
clients_api() ->
Metadata = #{
get => #{
description => "List clients",
responses => #{
<<"200">> => #{
description => <<"List clients 200 OK">>,
schema => #{
type => array,
items => minirest:ref(<<"client">>)}}}}},
{"/clients", Metadata, clients}.
client_api() ->
Metadata = #{
get => #{
description => "Get clients info by client ID",
parameters => [#{
name => clientid,
in => path,
type => string,
required => true,
default => 123456}],
responses => #{
<<"404">> => emqx_mgmt_util:not_found_schema(<<"Client id not found">>),
<<"200">> => #{
description => <<"Get clients 200 OK">>,
schema => minirest:ref(<<"client">>)}}},
delete => #{
description => "Kick out client by client ID",
parameters => [#{
name => clientid,
in => path,
type => string,
required => true,
default => 123456}],
responses => #{
<<"404">> => emqx_mgmt_util:not_found_schema(<<"Client id not found">>),
<<"200">> => #{description => <<"Kick out clients OK">>}}}},
{"/clients/:clientid", Metadata, client}.
clients_acl_cache_api() ->
Metadata = #{
get => #{
description => "Get client acl cache",
parameters => [#{
name => clientid,
in => path,
type => string,
required => true,
default => 123456}],
responses => #{
<<"404">> => emqx_mgmt_util:not_found_schema(<<"Client id not found">>),
<<"200">> => #{
description => <<"List 200 OK">>,
schema => minirest:ref(<<"acl_cache">>)}}},
delete => #{
description => "Clean client acl cache",
parameters => [#{
name => clientid,
in => path,
type => string,
required => true,
default => 123456}],
responses => #{
<<"404">> => emqx_mgmt_util:not_found_schema(<<"client id not found">>),
<<"200">> => #{
description => <<"Clean acl cache 200 OK">>}}}},
{"/clients/:clientid/acl_cache", Metadata, acl_cache}.
subscribe_api() ->
Path = "/clients/:clientid/subscribe",
Metadata = #{
post => #{
description => "subscribe",
parameters => [
#{
name => clientid,
in => path,
type => string,
required => true,
default => 123456
},
#{
name => topics,
in => body,
schema => #{
type => object,
properties => #{
<<"topic">> => #{
type => <<"string">>,
example => <<"topic_1">>,
description => <<"Topic">>},
<<"qos">> => #{
type => <<"integer">>,
enum => [0, 1, 2],
example => 0,
description => <<"QOS">>}}}
}
],
responses => #{
<<"404">> => emqx_mgmt_util:not_found_schema(<<"Client id not found">>),
<<"200">> => #{description => <<"publish ok">>}}}},
{Path, Metadata, subscribe}.
%%%==============================================================================================
%% parameters trans
clients(get, _Request) ->
list(#{}).
client(get, Request) ->
ClientID = cowboy_req:binding(clientid, Request),
lookup(#{clientid => ClientID});
client(delete, Request) ->
ClientID = cowboy_req:binding(clientid, Request),
kickout(#{clientid => ClientID}).
acl_cache(get, Request) ->
ClientID = cowboy_req:binding(clientid, Request),
get_acl_cache(#{clientid => ClientID});
acl_cache(delete, Request) ->
ClientID = cowboy_req:binding(clientid, Request),
clean_acl_cache(#{clientid => ClientID}).
subscribe(post, Request) ->
ClientID = cowboy_req:binding(clientid, Request),
{ok, Body, _} = cowboy_req:read_body(Request),
TopicInfo = emqx_json:decode(Body, [return_maps]),
Topic = maps:get(<<"topic">>, TopicInfo),
Qos = maps:get(<<"qos">>, TopicInfo, 0),
subscribe(#{clientid => ClientID, topic => Topic, qos => Qos}).
%% TODO: batch
subscribe_batch(post, Request) ->
ClientID = cowboy_req:binding(clientid, Request),
{ok, Body, _} = cowboy_req:read_body(Request),
TopicInfos = emqx_json:decode(Body, [return_maps]),
Topics =
[begin
Topic = maps:get(<<"topic">>, TopicInfo),
Qos = maps:get(<<"qos">>, TopicInfo, 0),
#{topic => Topic, qos => Qos}
end || TopicInfo <- TopicInfos],
subscribe_batch(#{clientid => ClientID, topics => Topics}).
%%%==============================================================================================
%% api apply
list(Params) ->
Data = emqx_mgmt_api:cluster_query(maps:to_list(Params), ?CLIENT_QS_SCHEMA, ?query_fun),
Body = emqx_json:encode(Data),
{200, Body}.
lookup(#{clientid := ClientID}) ->
case emqx_mgmt:lookup_client({clientid, ClientID}, ?format_fun) of
[] ->
{404, ?CLIENT_ID_NOT_FOUND};
ClientInfo ->
Response = emqx_json:encode(hd(ClientInfo)),
{ok, Response}
end.
%% @private
fence(Func) ->
try
emqx_mgmt:return({ok, Func()})
catch
throw : {bad_value_type, {_Key, Type, Value}} ->
Reason = iolist_to_binary(
io_lib:format("Can't convert ~p to ~p type",
[Value, Type])
),
emqx_mgmt:return({error, ?ERROR8, Reason})
kickout(#{clientid := ClientID}) ->
emqx_mgmt:kickout_client(ClientID),
{200}.
get_acl_cache(#{clientid := ClientID})->
case emqx_mgmt:list_acl_cache(ClientID) of
{error, not_found} ->
{404, ?CLIENT_ID_NOT_FOUND};
{error, Reason} ->
{500, #{code => <<"UNKNOW_ERROR">>, reason => io_lib:format("~p", [Reason])}};
Caches ->
Response = emqx_json:encode([format_acl_cache(Cache) || Cache <- Caches]),
{200, Response}
end.
lookup(#{node := Node, clientid := ClientId}, _Params) ->
emqx_mgmt:return({ok, emqx_mgmt:lookup_client(Node, {clientid, emqx_mgmt_util:urldecode(ClientId)}, ?format_fun)});
lookup(#{clientid := ClientId}, _Params) ->
emqx_mgmt:return({ok, emqx_mgmt:lookup_client({clientid, emqx_mgmt_util:urldecode(ClientId)}, ?format_fun)});
lookup(#{node := Node, username := Username}, _Params) ->
emqx_mgmt:return({ok, emqx_mgmt:lookup_client(Node, {username, emqx_mgmt_util:urldecode(Username)}, ?format_fun)});
lookup(#{username := Username}, _Params) ->
emqx_mgmt:return({ok, emqx_mgmt:lookup_client({username, emqx_mgmt_util:urldecode(Username)}, ?format_fun)}).
kickout(#{clientid := ClientId}, _Params) ->
case emqx_mgmt:kickout_client(emqx_mgmt_util:urldecode(ClientId)) of
ok -> emqx_mgmt:return();
{error, not_found} -> emqx_mgmt:return({error, ?ERROR12, not_found});
{error, Reason} -> emqx_mgmt:return({error, ?ERROR1, Reason})
clean_acl_cache(#{clientid := ClientID}) ->
case emqx_mgmt:clean_acl_cache(ClientID) of
ok ->
{200};
{error, not_found} ->
{404, ?CLIENT_ID_NOT_FOUND};
{error, Reason} ->
{500, #{code => <<"UNKNOW_ERROR">>, reason => io_lib:format("~p", [Reason])}}
end.
clean_acl_cache(#{clientid := ClientId}, _Params) ->
case emqx_mgmt:clean_acl_cache(emqx_mgmt_util:urldecode(ClientId)) of
ok -> emqx_mgmt:return();
{error, not_found} -> emqx_mgmt:return({error, ?ERROR12, not_found});
{error, Reason} -> emqx_mgmt:return({error, ?ERROR1, Reason})
subscribe(#{clientid := ClientID, topic := Topic, qos := Qos}) ->
case do_subscribe(ClientID, Topic, Qos) of
{error, channel_not_found} ->
{404, ?CLIENT_ID_NOT_FOUND};
{error, Reason} ->
Body = emqx_json:encode(#{code => <<"UNKNOW_ERROR">>, reason => io_lib:format("~p", [Reason])}),
{200, Body};
ok ->
{200}
end.
list_acl_cache(#{clientid := ClientId}, _Params) ->
case emqx_mgmt:list_acl_cache(emqx_mgmt_util:urldecode(ClientId)) of
{error, not_found} -> emqx_mgmt:return({error, ?ERROR12, not_found});
{error, Reason} -> emqx_mgmt:return({error, ?ERROR1, Reason});
Caches -> emqx_mgmt:return({ok, [format_acl_cache(Cache) || Cache <- Caches]})
end.
subscribe_batch(#{clientid := ClientID, topics := Topics}) ->
ArgList = [[ClientID, Topic, Qos]|| #{topic := Topic, qos := Qos} <- Topics],
emqx_mgmt_util:batch_operation(?MODULE, do_subscribe, ArgList).
set_ratelimit_policy(#{clientid := ClientId}, Params) ->
P = [{conn_bytes_in, proplists:get_value(<<"conn_bytes_in">>, Params)},
{conn_messages_in, proplists:get_value(<<"conn_messages_in">>, Params)}],
case [{K, parse_ratelimit_str(V)} || {K, V} <- P, V =/= undefined] of
[] -> emqx_mgmt:return();
Policy ->
case emqx_mgmt:set_ratelimit_policy(emqx_mgmt_util:urldecode(ClientId), Policy) of
ok -> emqx_mgmt:return();
{error, not_found} -> emqx_mgmt:return({error, ?ERROR12, not_found});
{error, Reason} -> emqx_mgmt:return({error, ?ERROR1, Reason})
end
end.
%%%==============================================================================================
%% internal function
format_channel_info({_, ClientInfo, ClientStats}) ->
Fun =
fun
(_Key, Value, Current) when is_map(Value) ->
maps:merge(Current, Value);
(Key, Value, Current) ->
maps:put(Key, Value, Current)
end,
StatsMap = maps:without([memory, next_pkt_id, total_heap_size],
maps:from_list(ClientStats)),
ClientInfoMap0 = maps:fold(Fun, #{}, ClientInfo),
IpAddress = peer_to_binary(maps:get(peername, ClientInfoMap0)),
Connected = maps:get(conn_state, ClientInfoMap0) =:= connected,
ClientInfoMap1 = maps:merge(StatsMap, ClientInfoMap0),
ClientInfoMap2 = maps:put(node, node(), ClientInfoMap1),
ClientInfoMap3 = maps:put(ip_address, IpAddress, ClientInfoMap2),
ClientInfoMap = maps:put(connected, Connected, ClientInfoMap3),
RemoveList = [
auth_result
, peername
, sockname
, peerhost
, conn_state
, send_pend
, conn_props
, peercert
, sockstate
, receive_maximum
, protocol
, is_superuser
, sockport
, anonymous
, mountpoint
, socktype
, active_n
, await_rel_timeout
, conn_mod
, sockname
, retry_interval
, upgrade_qos
],
maps:without(RemoveList, ClientInfoMap).
clean_ratelimit(#{clientid := ClientId}, _Params) ->
case emqx_mgmt:set_ratelimit_policy(emqx_mgmt_util:urldecode(ClientId), []) of
ok -> emqx_mgmt:return();
{error, not_found} -> emqx_mgmt:return({error, ?ERROR12, not_found});
{error, Reason} -> emqx_mgmt:return({error, ?ERROR1, Reason})
end.
set_quota_policy(#{clientid := ClientId}, Params) ->
P = [{conn_messages_routing, proplists:get_value(<<"conn_messages_routing">>, Params)}],
case [{K, parse_ratelimit_str(V)} || {K, V} <- P, V =/= undefined] of
[] -> emqx_mgmt:return();
Policy ->
case emqx_mgmt:set_quota_policy(emqx_mgmt_util:urldecode(ClientId), Policy) of
ok -> emqx_mgmt:return();
{error, not_found} -> emqx_mgmt:return({error, ?ERROR12, not_found});
{error, Reason} -> emqx_mgmt:return({error, ?ERROR1, Reason})
end
end.
clean_quota(#{clientid := ClientId}, _Params) ->
case emqx_mgmt:set_quota_policy(emqx_mgmt_util:urldecode(ClientId), []) of
ok -> emqx_mgmt:return();
{error, not_found} -> emqx_mgmt:return({error, ?ERROR12, not_found});
{error, Reason} -> emqx_mgmt:return({error, ?ERROR1, Reason})
end.
%% @private
%% S = 100,1s
%% | 100KB, 1m
parse_ratelimit_str(S) when is_binary(S) ->
parse_ratelimit_str(binary_to_list(S));
parse_ratelimit_str(S) ->
[L, D] = string:tokens(S, ", "),
Limit = case cuttlefish_bytesize:parse(L) of
Sz when is_integer(Sz) -> Sz;
{error, Reason1} -> error(Reason1)
end,
Duration = case cuttlefish_duration:parse(D, s) of
Secs when is_integer(Secs) -> Secs;
{error, Reason} -> error(Reason)
end,
{Limit, Duration}.
%%--------------------------------------------------------------------
%% Format
format_channel_info({_Key, Info, Stats0}) ->
Stats = maps:from_list(Stats0),
ClientInfo = maps:get(clientinfo, Info, #{}),
ConnInfo = maps:get(conninfo, Info, #{}),
Session = case maps:get(session, Info, #{}) of
undefined -> #{};
_Sess -> _Sess
end,
SessCreated = maps:get(created_at, Session, maps:get(connected_at, ConnInfo)),
Connected = case maps:get(conn_state, Info, connected) of
connected -> true;
_ -> false
end,
NStats = Stats#{max_subscriptions => maps:get(subscriptions_max, Stats, 0),
max_inflight => maps:get(inflight_max, Stats, 0),
max_awaiting_rel => maps:get(awaiting_rel_max, Stats, 0),
max_mqueue => maps:get(mqueue_max, Stats, 0),
inflight => maps:get(inflight_cnt, Stats, 0),
awaiting_rel => maps:get(awaiting_rel_cnt, Stats, 0)},
format(
lists:foldl(fun(Items, Acc) ->
maps:merge(Items, Acc)
end, #{connected => Connected},
[maps:with([ subscriptions_cnt, max_subscriptions,
inflight, max_inflight, awaiting_rel,
max_awaiting_rel, mqueue_len, mqueue_dropped,
max_mqueue, heap_size, reductions, mailbox_len,
recv_cnt, recv_msg, recv_oct, recv_pkt, send_cnt,
send_msg, send_oct, send_pkt], NStats),
maps:with([clientid, username, mountpoint, is_bridge, zone], ClientInfo),
maps:with([clean_start, keepalive, expiry_interval, proto_name,
proto_ver, peername, connected_at, disconnected_at], ConnInfo),
#{created_at => SessCreated}])).
format(Data) when is_map(Data)->
{IpAddr, Port} = maps:get(peername, Data),
ConnectedAt = maps:get(connected_at, Data),
CreatedAt = maps:get(created_at, Data),
Data1 = maps:without([peername], Data),
maps:merge(Data1#{node => node(),
ip_address => iolist_to_binary(ntoa(IpAddr)),
port => Port,
connected_at => iolist_to_binary(strftime(ConnectedAt div 1000)),
created_at => iolist_to_binary(strftime(CreatedAt div 1000))},
case maps:get(disconnected_at, Data, undefined) of
undefined -> #{};
DisconnectedAt -> #{disconnected_at => iolist_to_binary(strftime(DisconnectedAt div 1000))}
end).
peer_to_binary({Addr, Port}) ->
AddrBinary = list_to_binary(inet:ntoa(Addr)),
PortBinary = integer_to_binary(Port),
<<AddrBinary/binary, ":", PortBinary/binary>>;
peer_to_binary(Addr) ->
list_to_binary(inet:ntoa(Addr)).
format_acl_cache({{PubSub, Topic}, {AclResult, Timestamp}}) ->
#{access => PubSub,
topic => Topic,
result => AclResult,
updated_time => Timestamp}.
#{
access => PubSub,
topic => Topic,
result => AclResult,
updated_time => Timestamp
}.
%%--------------------------------------------------------------------
do_subscribe(ClientID, Topic0, Qos) ->
{Topic, Opts} = emqx_topic:parse(Topic0),
TopicTable = [{Topic, Opts#{qos => Qos}}],
emqx_mgmt:subscribe(ClientID, TopicTable),
case emqx_mgmt:subscribe(ClientID, TopicTable) of
{error, Reason} ->
{error, Reason};
{subscribe, Subscriptions} ->
case proplists:is_defined(Topic, Subscriptions) of
true ->
ok;
false ->
{error, unknow_error}
end
end.
%%%==============================================================================================
%% Query Functions
%%--------------------------------------------------------------------
query({Qs, []}, Start, Limit) ->
Ms = qs2ms(Qs),
@ -328,37 +489,8 @@ query({Qs, Fuzzy}, Start, Limit) ->
MatchFun = match_fun(Ms, Fuzzy),
emqx_mgmt_api:traverse_table(emqx_channel_info, MatchFun, Start, Limit, fun format_channel_info/1).
%%--------------------------------------------------------------------
%% Match funcs
match_fun(Ms, Fuzzy) ->
MsC = ets:match_spec_compile(Ms),
REFuzzy = lists:map(fun({K, like, S}) ->
{ok, RE} = re:compile(S),
{K, like, RE}
end, Fuzzy),
fun(Rows) ->
case ets:match_spec_run(Rows, MsC) of
[] -> [];
Ls ->
lists:filter(fun(E) ->
run_fuzzy_match(E, REFuzzy)
end, Ls)
end
end.
run_fuzzy_match(_, []) ->
true;
run_fuzzy_match(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, _, RE}|Fuzzy]) ->
Val = case maps:get(Key, ClientInfo, "") of
undefined -> "";
V -> V
end,
re:run(Val, RE, [{capture, none}]) == match andalso run_fuzzy_match(E, Fuzzy).
%%--------------------------------------------------------------------
%%%==============================================================================================
%% QueryString to Match Spec
-spec qs2ms(list()) -> ets:match_spec().
qs2ms(Qs) ->
{MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}),
@ -380,7 +512,7 @@ put_conds({_, Op, V}, Holder, Conds) ->
[{Op, Holder, V} | Conds];
put_conds({_, Op1, V1, Op2, V2}, Holder, Conds) ->
[{Op2, Holder, V2},
{Op1, Holder, V1} | Conds].
{Op1, Holder, V1} | Conds].
ms(clientid, X) ->
#{clientinfo => #{clientid => X}};
@ -403,51 +535,29 @@ ms(connected_at, X) ->
ms(created_at, X) ->
#{session => #{created_at => X}}.
%%--------------------------------------------------------------------
%% EUnits
%%--------------------------------------------------------------------
%%%==============================================================================================
%% Match funcs
match_fun(Ms, Fuzzy) ->
MsC = ets:match_spec_compile(Ms),
REFuzzy = lists:map(fun({K, like, S}) ->
{ok, RE} = re:compile(S),
{K, like, RE}
end, Fuzzy),
fun(Rows) ->
case ets:match_spec_run(Rows, MsC) of
[] -> [];
Ls ->
lists:filter(fun(E) ->
run_fuzzy_match(E, REFuzzy)
end, Ls)
end
end.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
params2qs_test() ->
QsSchema = element(2, ?CLIENT_QS_SCHEMA),
Params = [{<<"clientid">>, <<"abc">>},
{<<"username">>, <<"def">>},
{<<"zone">>, <<"external">>},
{<<"ip_address">>, <<"127.0.0.1">>},
{<<"conn_state">>, <<"connected">>},
{<<"clean_start">>, true},
{<<"proto_name">>, <<"MQTT">>},
{<<"proto_ver">>, 4},
{<<"_gte_created_at">>, 1},
{<<"_lte_created_at">>, 5},
{<<"_gte_connected_at">>, 1},
{<<"_lte_connected_at">>, 5},
{<<"_like_clientid">>, <<"a">>},
{<<"_like_username">>, <<"e">>}
],
ExpectedMtchHead =
#{clientinfo => #{clientid => <<"abc">>,
username => <<"def">>,
zone => external,
peerhost => {127,0,0,1}
},
conn_state => connected,
conninfo => #{clean_start => true,
proto_name => <<"MQTT">>,
proto_ver => 4,
connected_at => '$3'},
session => #{created_at => '$2'}},
ExpectedCondi = [{'>=','$2', 1},
{'=<','$2', 5},
{'>=','$3', 1},
{'=<','$3', 5}],
{10, {Qs1, []}} = emqx_mgmt_api:params2qs(Params, QsSchema),
[{{'$1', MtchHead, _}, Condi, _}] = qs2ms(Qs1),
?assertEqual(ExpectedMtchHead, MtchHead),
?assertEqual(ExpectedCondi, Condi),
[{{'$1', #{}, '_'}, [], ['$_']}] = qs2ms([]).
-endif.
run_fuzzy_match(_, []) ->
true;
run_fuzzy_match(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, _, RE}|Fuzzy]) ->
Val = case maps:get(Key, ClientInfo, "") of
undefined -> "";
V -> V
end,
re:run(Val, RE, [{capture, none}]) == match andalso run_fuzzy_match(E, Fuzzy).

View File

@ -0,0 +1,47 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_api_status).
%% API
-behavior(minirest_api).
-export([api_spec/0]).
-export([running_status/2]).
api_spec() ->
{[status_api()], []}.
status_api() ->
Path = "/status",
Metadata = #{
get => #{
security => [],
responses => #{
<<"200">> => #{description => <<"running">>}}}},
{Path, Metadata, running_status}.
running_status(get, _Request) ->
{InternalStatus, _ProvidedStatus} = init:get_status(),
AppStatus =
case lists:keysearch(emqx, 1, application:which_applications()) of
false -> not_running;
{value, _Val} -> running
end,
Status = io_lib:format("Node ~s is ~s~nemqx is ~s", [node(), InternalStatus, AppStatus]),
Body = list_to_binary(Status),
{200, #{<<"content-type">> => <<"text/plain">>}, Body}.

View File

@ -485,7 +485,7 @@ listeners([]) ->
listeners(["stop", Name = "http" ++ _N | _MaybePort]) ->
%% _MaybePort is to be backward compatible, to stop http listener, there is no need for the port number
case minirest:stop_http(list_to_atom(Name)) of
case minirest:stop(list_to_atom(Name)) of
ok ->
emqx_ctl:print("Stop ~s listener successfully.~n", [Name]);
{error, Error} ->

View File

@ -13,27 +13,21 @@
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_http).
-export([ start_listeners/0
, handle_request/2
, stop_listeners/0
, start_listener/1
, stop_listener/1
]).
, stop_listener/1]).
-export([init/2]).
%% Authorization
-export([authorize_appid/1]).
-include_lib("emqx/include/emqx.hrl").
-define(APP, emqx_management).
-define(EXCEPT_PLUGIN, [emqx_dashboard]).
-ifdef(TEST).
-define(EXCEPT, []).
-else.
-define(EXCEPT, [add_app, del_app, list_apps, lookup_app, update_app]).
-endif.
-define(BASE_PATH, "/api/v5").
%%--------------------------------------------------------------------
%% Start/Stop Listeners
@ -45,37 +39,54 @@ start_listeners() ->
stop_listeners() ->
lists:foreach(fun stop_listener/1, listeners()).
start_listener({Proto, Port, Options}) when Proto == http ->
Dispatch = [{"/status", emqx_mgmt_http, []},
{"/api/v4/[...]", minirest, http_handlers()}],
minirest:start_http(listener_name(Proto), ranch_opts(Port, Options), Dispatch);
start_listener({Proto, Port, Options}) ->
{ok, _} = application:ensure_all_started(minirest),
Authorization = {?MODULE, authorize_appid},
RanchOptions = ranch_opts(Port, Options),
GlobalSpec = #{
swagger => "2.0",
info => #{title => "EMQ X API", version => "5.0.0"},
basePath => ?BASE_PATH,
securityDefinitions => #{
application => #{
type => apiKey,
name => "authorization",
in => header}}},
Minirest = #{
protocol => Proto,
base_path => ?BASE_PATH,
apps => apps(),
authorization => Authorization,
security => [#{application => []}],
swagger_global_spec => GlobalSpec},
MinirestOptions = maps:merge(Minirest, RanchOptions),
minirest:start(listener_name(Proto), MinirestOptions).
start_listener({Proto, Port, Options}) when Proto == https ->
Dispatch = [{"/status", emqx_mgmt_http, []},
{"/api/v4/[...]", minirest, http_handlers()}],
minirest:start_https(listener_name(Proto), ranch_opts(Port, Options), Dispatch).
apps() ->
Apps = [App || {App, _, _} <- application:loaded_applications(),
case re:run(atom_to_list(App), "^emqx") of
{match,[{0,4}]} -> true;
_ -> false
end],
Plugins = lists:map(fun(Plugin) -> Plugin#plugin.name end, emqx_plugins:list()),
Apps ++ Plugins.
ranch_opts(Port, Options0) ->
NumAcceptors = proplists:get_value(num_acceptors, Options0, 4),
MaxConnections = proplists:get_value(max_connections, Options0, 512),
Options = lists:foldl(fun({K, _V}, Acc) when K =:= max_connections orelse K =:= num_acceptors ->
Acc;
({inet6, true}, Acc) -> [inet6 | Acc];
({inet6, false}, Acc) -> Acc;
({ipv6_v6only, true}, Acc) -> [{ipv6_v6only, true} | Acc];
({ipv6_v6only, false}, Acc) -> Acc;
({K, V}, Acc)->
[{K, V} | Acc]
end, [], Options0),
Res = #{num_acceptors => NumAcceptors,
max_connections => MaxConnections,
socket_opts => [{port, Port} | Options]},
Res.
Options = lists:foldl(
fun
({K, _V}, Acc) when K =:= max_connections orelse K =:= num_acceptors -> Acc;
({inet6, true}, Acc) -> [inet6 | Acc];
({inet6, false}, Acc) -> Acc;
({ipv6_v6only, true}, Acc) -> [{ipv6_v6only, true} | Acc];
({ipv6_v6only, false}, Acc) -> Acc;
({K, V}, Acc)->
[{K, V} | Acc]
end, [], Options0),
maps:from_list([{port, Port} | Options]).
stop_listener({Proto, Port, _}) ->
io:format("Stop http:management listener on ~s successfully.~n",[format(Port)]),
minirest:stop_http(listener_name(Proto)).
minirest:stop(listener_name(Proto)).
listeners() ->
[{Protocol, Port, maps:to_list(maps:without([protocol, port], Map))}
@ -85,45 +96,15 @@ listeners() ->
listener_name(Proto) ->
list_to_atom(atom_to_list(Proto) ++ ":management").
http_handlers() ->
Apps = [ App || {App, _, _} <- application:loaded_applications(),
case re:run(atom_to_list(App), "^emqx") of
{match,[{0,4}]} -> true;
_ -> false
end],
Plugins = lists:map(fun(Plugin) -> Plugin#plugin.name end, emqx_plugins:list()),
[{"/api/v4", minirest:handler(#{apps => Plugins ++ Apps -- ?EXCEPT_PLUGIN,
except => ?EXCEPT,
filter => fun(_) -> true end}),
[{authorization, fun authorize_appid/1}]}].
%%--------------------------------------------------------------------
%% Handle 'status' request
%%--------------------------------------------------------------------
init(Req, Opts) ->
Req1 = handle_request(cowboy_req:path(Req), Req),
{ok, Req1, Opts}.
handle_request(Path, Req) ->
handle_request(cowboy_req:method(Req), Path, Req).
handle_request(<<"GET">>, <<"/status">>, Req) ->
{InternalStatus, _ProvidedStatus} = init:get_status(),
AppStatus = case lists:keysearch(emqx, 1, application:which_applications()) of
false -> not_running;
{value, _Val} -> running
end,
Status = io_lib:format("Node ~s is ~s~nemqx is ~s",
[node(), InternalStatus, AppStatus]),
cowboy_req:reply(200, #{<<"content-type">> => <<"text/plain">>}, Status, Req);
handle_request(_Method, _Path, Req) ->
cowboy_req:reply(400, #{<<"content-type">> => <<"text/plain">>}, <<"Not found.">>, Req).
authorize_appid(Req) ->
case cowboy_req:parse_header(<<"authorization">>, Req) of
{basic, AppId, AppSecret} -> emqx_mgmt_auth:is_authorized(AppId, AppSecret);
_ -> false
{basic, AppId, AppSecret} ->
case emqx_mgmt_auth:is_authorized(AppId, AppSecret) of
true -> ok;
false -> {401}
end;
_ ->
{401}
end.
format(Port) when is_integer(Port) ->

View File

@ -21,6 +21,8 @@
, kmg/1
, ntoa/1
, merge_maps/2
, not_found_schema/1
, batch_operation/3
]).
-export([urldecode/1]).
@ -77,3 +79,34 @@ merge_maps(Default, New) ->
urldecode(S) ->
emqx_http_lib:uri_decode(S).
not_found_schema(Description) ->
not_found_schema(Description, ["RESOURCE_NOT_FOUND"]).
not_found_schema(Description, Enum) ->
#{
description => Description,
schema => #{
type => object,
properties => #{
code => #{
type => string,
enum => Enum},
reason => #{
type => string}}}
}.
batch_operation(Module, Function, ArgsList) ->
Failed = batch_operation(Module, Function, ArgsList, []),
Len = erlang:length(Failed),
Success = erlang:length(ArgsList) - Len,
Fun = fun({Args, Reason}, Detail) -> [#{data => Args, reason => io_lib:format("~p", [Reason])} | Detail] end,
#{success => Success, failed => Len, detail => lists:foldl(Fun, [], Failed)}.
batch_operation(_Module, _Function, [], Failed) ->
lists:reverse(Failed);
batch_operation(Module, Function, [Args | ArgsList], Failed) ->
case erlang:apply(Module, Function, Args) of
ok ->
batch_operation(Module, Function, ArgsList, Failed);
{error ,Reason} ->
batch_operation(Module, Function, ArgsList, [{Args, Reason} | Failed])
end.

View File

@ -1,340 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(LOG_LEVELS, ["debug", "error", "info"]).
-define(LOG_HANDLER_ID, [file, default]).
all() ->
emqx_ct:all(?MODULE).
init_per_suite(Config) ->
ekka_mnesia:start(),
emqx_mgmt_auth:mnesia(boot),
emqx_ct_helpers:start_apps([emqx_retainer, emqx_management], fun set_special_configs/1),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([emqx_management, emqx_retainer]).
set_special_configs(emqx_management) ->
emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}]}),
ok;
set_special_configs(_App) ->
ok.
t_app(_Config) ->
{ok, AppSecret} = emqx_mgmt_auth:add_app(<<"app_id">>, <<"app_name">>),
?assert(emqx_mgmt_auth:is_authorized(<<"app_id">>, AppSecret)),
?assertEqual(AppSecret, emqx_mgmt_auth:get_appsecret(<<"app_id">>)),
?assertEqual({<<"app_id">>, AppSecret,
<<"app_name">>, <<"Application user">>,
true, undefined},
lists:keyfind(<<"app_id">>, 1, emqx_mgmt_auth:list_apps())),
emqx_mgmt_auth:del_app(<<"app_id">>),
application:set_env(emqx_management, application, []),
%% Specify the application secret
{ok, AppSecret2} = emqx_mgmt_auth:add_app(
<<"app_id">>, <<"app_name">>, <<"secret">>,
<<"app_desc">>, true, undefined),
?assert(emqx_mgmt_auth:is_authorized(<<"app_id">>, AppSecret2)),
?assertEqual(AppSecret2, emqx_mgmt_auth:get_appsecret(<<"app_id">>)),
?assertEqual({<<"app_id">>, AppSecret2, <<"app_name">>, <<"app_desc">>, true, undefined},
lists:keyfind(<<"app_id">>, 1, emqx_mgmt_auth:list_apps())),
emqx_mgmt_auth:del_app(<<"app_id">>),
ok.
t_log_cmd(_) ->
mock_print(),
lists:foreach(fun(Level) ->
emqx_mgmt_cli:log(["primary-level", Level]),
?assertEqual(Level ++ "\n", emqx_mgmt_cli:log(["primary-level"]))
end, ?LOG_LEVELS),
lists:foreach(fun(Level) ->
emqx_mgmt_cli:log(["set-level", Level]),
?assertEqual(Level ++ "\n", emqx_mgmt_cli:log(["primary-level"]))
end, ?LOG_LEVELS),
[lists:foreach(fun(Level) ->
?assertEqual(Level ++ "\n", emqx_mgmt_cli:log(["handlers", "set-level",
atom_to_list(Id), Level]))
end, ?LOG_LEVELS)
|| #{id := Id} <- emqx_logger:get_log_handlers()],
meck:unload().
t_mgmt_cmd(_) ->
% ct:pal("start testing the mgmt command"),
mock_print(),
?assertMatch({match, _}, re:run(emqx_mgmt_cli:mgmt(
["lookup", "emqx_appid"]), "Not Found.")),
?assertMatch({match, _}, re:run(emqx_mgmt_cli:mgmt(
["insert", "emqx_appid", "emqx_name"]), "AppSecret:")),
?assertMatch({match, _}, re:run(emqx_mgmt_cli:mgmt(
["insert", "emqx_appid", "emqx_name"]), "Error:")),
?assertMatch({match, _}, re:run(emqx_mgmt_cli:mgmt(
["lookup", "emqx_appid"]), "app_id:")),
?assertMatch({match, _}, re:run(emqx_mgmt_cli:mgmt(
["update", "emqx_appid", "ts"]), "update successfully")),
?assertMatch({match, _}, re:run(emqx_mgmt_cli:mgmt(
["delete", "emqx_appid"]), "ok")),
ok = emqx_mgmt_cli:mgmt(["list"]),
meck:unload().
t_status_cmd(_) ->
% ct:pal("start testing status command"),
mock_print(),
%% init internal status seem to be always 'starting' when running ct tests
?assertMatch({match, _}, re:run(emqx_mgmt_cli:status([]), "Node\s.*@.*\sis\sstart(ed|ing)")),
meck:unload().
t_broker_cmd(_) ->
% ct:pal("start testing the broker command"),
mock_print(),
?assertMatch({match, _}, re:run(emqx_mgmt_cli:broker([]), "sysdescr")),
?assertMatch({match, _}, re:run(emqx_mgmt_cli:broker(["stats"]), "subscriptions.shared")),
?assertMatch({match, _}, re:run(emqx_mgmt_cli:broker(["metrics"]), "bytes.sent")),
?assertMatch({match, _}, re:run(emqx_mgmt_cli:broker([undefined]), "broker")),
meck:unload().
t_clients_cmd(_) ->
% ct:pal("start testing the client command"),
mock_print(),
process_flag(trap_exit, true),
{ok, T} = emqtt:start_link([{clientid, <<"client12">>},
{username, <<"testuser1">>},
{password, <<"pass1">>}
]),
{ok, _} = emqtt:connect(T),
timer:sleep(300),
emqx_mgmt_cli:clients(["list"]),
?assertMatch({match, _}, re:run(emqx_mgmt_cli:clients(["show", "client12"]), "client12")),
?assertEqual((emqx_mgmt_cli:clients(["kick", "client12"])), "ok~n"),
timer:sleep(500),
?assertMatch({match, _}, re:run(emqx_mgmt_cli:clients(["show", "client12"]), "Not Found")),
receive
{'EXIT', T, _} ->
ok
% ct:pal("Connection closed: ~p~n", [Reason])
after
500 ->
erlang:error("Client is not kick")
end,
WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()),
{ok, _} = rfc6455_client:open(WS),
Packet = raw_send_serialize(?CONNECT_PACKET(#mqtt_packet_connect{
clientid = <<"client13">>})),
ok = rfc6455_client:send_binary(WS, Packet),
Connack = ?CONNACK_PACKET(?CONNACK_ACCEPT),
{binary, Bin} = rfc6455_client:recv(WS),
{ok, Connack, <<>>, _} = raw_recv_pase(Bin),
timer:sleep(300),
?assertMatch({match, _}, re:run(emqx_mgmt_cli:clients(["show", "client13"]), "client13")),
meck:unload().
% emqx_mgmt_cli:clients(["kick", "client13"]),
% timer:sleep(500),
% ?assertMatch({match, _}, re:run(emqx_mgmt_cli:clients(["show", "client13"]), "Not Found")).
raw_recv_pase(Packet) ->
emqx_frame:parse(Packet).
raw_send_serialize(Packet) ->
emqx_frame:serialize(Packet).
t_vm_cmd(_) ->
% ct:pal("start testing the vm command"),
mock_print(),
[[?assertMatch({match, _}, re:run(Result, Name))
|| Result <- emqx_mgmt_cli:vm([Name])]
|| Name <- ["load", "memory", "process", "io", "ports"]],
[?assertMatch({match, _}, re:run(Result, "load"))
|| Result <- emqx_mgmt_cli:vm(["load"])],
[?assertMatch({match, _}, re:run(Result, "memory"))
|| Result <- emqx_mgmt_cli:vm(["memory"])],
[?assertMatch({match, _}, re:run(Result, "process"))
|| Result <- emqx_mgmt_cli:vm(["process"])],
[?assertMatch({match, _}, re:run(Result, "io"))
|| Result <- emqx_mgmt_cli:vm(["io"])],
[?assertMatch({match, _}, re:run(Result, "ports"))
|| Result <- emqx_mgmt_cli:vm(["ports"])],
unmock_print().
t_trace_cmd(_) ->
% ct:pal("start testing the trace command"),
mock_print(),
logger:set_primary_config(level, debug),
{ok, T} = emqtt:start_link([{clientid, <<"client">>},
{username, <<"testuser">>},
{password, <<"pass">>}
]),
emqtt:connect(T),
emqtt:subscribe(T, <<"a/b/c">>),
Trace1 = emqx_mgmt_cli:trace(["start", "client", "client",
"log/clientid_trace.log"]),
?assertMatch({match, _}, re:run(Trace1, "successfully")),
Trace2 = emqx_mgmt_cli:trace(["stop", "client", "client"]),
?assertMatch({match, _}, re:run(Trace2, "successfully")),
Trace3 = emqx_mgmt_cli:trace(["start", "client", "client",
"log/clientid_trace.log",
"error"]),
?assertMatch({match, _}, re:run(Trace3, "successfully")),
Trace4 = emqx_mgmt_cli:trace(["stop", "client", "client"]),
?assertMatch({match, _}, re:run(Trace4, "successfully")),
Trace5 = emqx_mgmt_cli:trace(["start", "topic", "a/b/c",
"log/clientid_trace.log"]),
?assertMatch({match, _}, re:run(Trace5, "successfully")),
Trace6 = emqx_mgmt_cli:trace(["stop", "topic", "a/b/c"]),
?assertMatch({match, _}, re:run(Trace6, "successfully")),
Trace7 = emqx_mgmt_cli:trace(["start", "topic", "a/b/c",
"log/clientid_trace.log", "error"]),
?assertMatch({match, _}, re:run(Trace7, "successfully")),
logger:set_primary_config(level, error),
unmock_print().
t_router_cmd(_) ->
% ct:pal("start testing the router command"),
mock_print(),
{ok, T} = emqtt:start_link([{clientid, <<"client1">>},
{username, <<"testuser1">>},
{password, <<"pass1">>}
]),
emqtt:connect(T),
emqtt:subscribe(T, <<"a/b/c">>),
{ok, T1} = emqtt:start_link([{clientid, <<"client2">>},
{username, <<"testuser2">>},
{password, <<"pass2">>}
]),
emqtt:connect(T1),
emqtt:subscribe(T1, <<"a/b/c/d">>),
?assertMatch({match, _}, re:run(emqx_mgmt_cli:routes(["list"]), "a/b/c | a/b/c")),
?assertMatch({match, _}, re:run(emqx_mgmt_cli:routes(["show", "a/b/c"]), "a/b/c")),
unmock_print().
t_subscriptions_cmd(_) ->
% ct:pal("Start testing the subscriptions command"),
mock_print(),
{ok, T3} = emqtt:start_link([{clientid, <<"client">>},
{username, <<"testuser">>},
{password, <<"pass">>}
]),
{ok, _} = emqtt:connect(T3),
{ok, _, _} = emqtt:subscribe(T3, <<"b/b/c">>),
timer:sleep(300),
[?assertMatch({match, _} , re:run(Result, "b/b/c"))
|| Result <- emqx_mgmt_cli:subscriptions(["show", <<"client">>])],
?assertEqual(emqx_mgmt_cli:subscriptions(["add", "client", "b/b/c", "0"]), "ok~n"),
?assertEqual(emqx_mgmt_cli:subscriptions(["del", "client", "b/b/c"]), "ok~n"),
unmock_print().
t_listeners_cmd_old(_) ->
ok = emqx_listeners:ensure_all_started(),
mock_print(),
?assertEqual(emqx_mgmt_cli:listeners([]), ok),
?assertEqual(
"Stop mqtt:wss:external listener on 0.0.0.0:8084 successfully.\n",
emqx_mgmt_cli:listeners(["stop", "wss", "8084"])
),
unmock_print().
t_listeners_cmd_new(_) ->
ok = emqx_listeners:ensure_all_started(),
mock_print(),
?assertEqual(emqx_mgmt_cli:listeners([]), ok),
?assertEqual(
"Stop mqtt:wss:external listener on 0.0.0.0:8084 successfully.\n",
emqx_mgmt_cli:listeners(["stop", "mqtt:wss:external"])
),
?assertEqual(
emqx_mgmt_cli:listeners(["restart", "mqtt:tcp:external"]),
"Restarted mqtt:tcp:external listener successfully.\n"
),
?assertEqual(
emqx_mgmt_cli:listeners(["restart", "mqtt:ssl:external"]),
"Restarted mqtt:ssl:external listener successfully.\n"
),
?assertEqual(
emqx_mgmt_cli:listeners(["restart", "bad:listener:identifier"]),
"Failed to restart bad:listener:identifier listener: {no_such_listener,\"bad:listener:identifier\"}\n"
),
unmock_print().
t_plugins_cmd(_) ->
mock_print(),
meck:new(emqx_plugins, [non_strict, passthrough]),
meck:expect(emqx_plugins, load, fun(_) -> ok end),
meck:expect(emqx_plugins, unload, fun(_) -> ok end),
meck:expect(emqx_plugins, reload, fun(_) -> ok end),
?assertEqual(emqx_mgmt_cli:plugins(["list"]), ok),
?assertEqual(
emqx_mgmt_cli:plugins(["unload", "emqx_retainer"]),
"Plugin emqx_retainer unloaded successfully.\n"
),
?assertEqual(
emqx_mgmt_cli:plugins(["load", "emqx_retainer"]),
"Plugin emqx_retainer loaded successfully.\n"
),
?assertEqual(
emqx_mgmt_cli:plugins(["unload", "emqx_management"]),
"Plugin emqx_management can not be unloaded.~n"
),
unmock_print().
t_cli(_) ->
mock_print(),
?assertMatch({match, _}, re:run(emqx_mgmt_cli:status([""]), "status")),
[?assertMatch({match, _}, re:run(Value, "broker"))
|| Value <- emqx_mgmt_cli:broker([""])],
[?assertMatch({match, _}, re:run(Value, "cluster"))
|| Value <- emqx_mgmt_cli:cluster([""])],
[?assertMatch({match, _}, re:run(Value, "clients"))
|| Value <- emqx_mgmt_cli:clients([""])],
[?assertMatch({match, _}, re:run(Value, "routes"))
|| Value <- emqx_mgmt_cli:routes([""])],
[?assertMatch({match, _}, re:run(Value, "subscriptions"))
|| Value <- emqx_mgmt_cli:subscriptions([""])],
[?assertMatch({match, _}, re:run(Value, "plugins"))
|| Value <- emqx_mgmt_cli:plugins([""])],
[?assertMatch({match, _}, re:run(Value, "listeners"))
|| Value <- emqx_mgmt_cli:listeners([""])],
[?assertMatch({match, _}, re:run(Value, "vm"))
|| Value <- emqx_mgmt_cli:vm([""])],
[?assertMatch({match, _}, re:run(Value, "mnesia"))
|| Value <- emqx_mgmt_cli:mnesia([""])],
[?assertMatch({match, _}, re:run(Value, "trace"))
|| Value <- emqx_mgmt_cli:trace([""])],
[?assertMatch({match, _}, re:run(Value, "mgmt"))
|| Value <- emqx_mgmt_cli:mgmt([""])],
unmock_print().
mock_print() ->
catch meck:unload(emqx_ctl),
meck:new(emqx_ctl, [non_strict, passthrough]),
meck:expect(emqx_ctl, print, fun(Arg) -> emqx_ctl:format(Arg) end),
meck:expect(emqx_ctl, print, fun(Msg, Arg) -> emqx_ctl:format(Msg, Arg) end),
meck:expect(emqx_ctl, usage, fun(Usages) -> emqx_ctl:format_usage(Usages) end),
meck:expect(emqx_ctl, usage, fun(Cmd, Descr) -> emqx_ctl:format_usage(Cmd, Descr) end).
unmock_print() ->
meck:unload(emqx_ctl).

View File

@ -1,592 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_api_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx_management/include/emqx_mgmt.hrl").
-define(CONTENT_TYPE, "application/x-www-form-urlencoded").
-define(HOST, "http://127.0.0.1:8081/").
-define(API_VERSION, "v4").
-define(BASE_PATH, "api").
all() ->
emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:start_apps([emqx_management], fun set_special_configs/1),
Config.
end_per_suite(Config) ->
emqx_ct_helpers:stop_apps([emqx_management]),
Config.
init_per_testcase(_, Config) ->
Config.
end_per_testcase(_, Config) ->
Config.
set_special_configs(emqx_management) ->
emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}],
applications =>[#{id => "admin", secret => "public"}]}),
ok;
set_special_configs(_App) ->
ok.
get(Key, ResponseBody) ->
maps:get(Key, jiffy:decode(list_to_binary(ResponseBody), [return_maps])).
lookup_alarm(Name, [#{<<"name">> := Name} | _More]) ->
true;
lookup_alarm(Name, [_Alarm | More]) ->
lookup_alarm(Name, More);
lookup_alarm(_Name, []) ->
false.
is_existing(Name, [#{name := Name} | _More]) ->
true;
is_existing(Name, [_Alarm | More]) ->
is_existing(Name, More);
is_existing(_Name, []) ->
false.
t_alarms(_) ->
emqx_alarm:activate(alarm1),
emqx_alarm:activate(alarm2),
?assert(is_existing(alarm1, emqx_alarm:get_alarms(activated))),
?assert(is_existing(alarm2, emqx_alarm:get_alarms(activated))),
{ok, Return1} = request_api(get, api_path(["alarms/activated"]), auth_header_()),
?assert(lookup_alarm(<<"alarm1">>, maps:get(<<"alarms">>, lists:nth(1, get(<<"data">>, Return1))))),
?assert(lookup_alarm(<<"alarm2">>, maps:get(<<"alarms">>, lists:nth(1, get(<<"data">>, Return1))))),
emqx_alarm:deactivate(alarm1),
{ok, Return2} = request_api(get, api_path(["alarms"]), auth_header_()),
?assert(lookup_alarm(<<"alarm1">>, maps:get(<<"alarms">>, lists:nth(1, get(<<"data">>, Return2))))),
?assert(lookup_alarm(<<"alarm2">>, maps:get(<<"alarms">>, lists:nth(1, get(<<"data">>, Return2))))),
{ok, Return3} = request_api(get, api_path(["alarms/deactivated"]), auth_header_()),
?assert(lookup_alarm(<<"alarm1">>, maps:get(<<"alarms">>, lists:nth(1, get(<<"data">>, Return3))))),
?assertNot(lookup_alarm(<<"alarm2">>, maps:get(<<"alarms">>, lists:nth(1, get(<<"data">>, Return3))))),
emqx_alarm:deactivate(alarm2),
{ok, Return4} = request_api(get, api_path(["alarms/deactivated"]), auth_header_()),
?assert(lookup_alarm(<<"alarm1">>, maps:get(<<"alarms">>, lists:nth(1, get(<<"data">>, Return4))))),
?assert(lookup_alarm(<<"alarm2">>, maps:get(<<"alarms">>, lists:nth(1, get(<<"data">>, Return4))))),
{ok, _} = request_api(delete, api_path(["alarms/deactivated"]), auth_header_()),
{ok, Return5} = request_api(get, api_path(["alarms/deactivated"]), auth_header_()),
?assertNot(lookup_alarm(<<"alarm1">>, maps:get(<<"alarms">>, lists:nth(1, get(<<"data">>, Return5))))),
?assertNot(lookup_alarm(<<"alarm2">>, maps:get(<<"alarms">>, lists:nth(1, get(<<"data">>, Return5))))).
t_apps(_) ->
AppId = <<"123456">>,
meck:new(emqx_mgmt_auth, [passthrough, no_history]),
meck:expect(emqx_mgmt_auth, add_app, 6, fun(_, _, _, _, _, _) -> {error, undefined} end),
{ok, Error1} = request_api(post, api_path(["apps"]), [],
auth_header_(), #{<<"app_id">> => AppId,
<<"name">> => <<"test">>,
<<"status">> => true}),
?assertMatch(<<"undefined">>, get(<<"message">>, Error1)),
meck:expect(emqx_mgmt_auth, del_app, 1, fun(_) -> {error, undefined} end),
{ok, Error2} = request_api(delete, api_path(["apps", binary_to_list(AppId)]), auth_header_()),
?assertMatch(<<"undefined">>, get(<<"message">>, Error2)),
meck:unload(emqx_mgmt_auth),
{ok, NoApp} = request_api(get, api_path(["apps", binary_to_list(AppId)]), auth_header_()),
?assertEqual(0, maps:size(get(<<"data">>, NoApp))),
{ok, NotFound} = request_api(put, api_path(["apps", binary_to_list(AppId)]), [],
auth_header_(), #{<<"name">> => <<"test 2">>,
<<"status">> => true}),
?assertEqual(<<"not_found">>, get(<<"message">>, NotFound)),
{ok, _} = request_api(post, api_path(["apps"]), [],
auth_header_(), #{<<"app_id">> => AppId,
<<"name">> => <<"test">>,
<<"status">> => true}),
{ok, _} = request_api(get, api_path(["apps"]), auth_header_()),
{ok, _} = request_api(get, api_path(["apps", binary_to_list(AppId)]), auth_header_()),
{ok, _} = request_api(put, api_path(["apps", binary_to_list(AppId)]), [],
auth_header_(), #{<<"name">> => <<"test 2">>,
<<"status">> => true}),
{ok, AppInfo} = request_api(get, api_path(["apps", binary_to_list(AppId)]), auth_header_()),
?assertEqual(<<"test 2">>, maps:get(<<"name">>, get(<<"data">>, AppInfo))),
{ok, _} = request_api(delete, api_path(["apps", binary_to_list(AppId)]), auth_header_()),
{ok, Result} = request_api(get, api_path(["apps"]), auth_header_()),
[App] = get(<<"data">>, Result),
?assertEqual(<<"admin">>, maps:get(<<"app_id">>, App)).
t_banned(_) ->
Who = <<"myclient">>,
{ok, _} = request_api(post, api_path(["banned"]), [],
auth_header_(), #{<<"who">> => Who,
<<"as">> => <<"clientid">>,
<<"reason">> => <<"test">>,
<<"by">> => <<"dashboard">>,
<<"at">> => erlang:system_time(second),
<<"until">> => erlang:system_time(second) + 10}),
{ok, Result} = request_api(get, api_path(["banned"]), auth_header_()),
[Banned] = get(<<"data">>, Result),
?assertEqual(Who, maps:get(<<"who">>, Banned)),
{ok, _} = request_api(delete, api_path(["banned", "clientid", binary_to_list(Who)]), auth_header_()),
{ok, Result2} = request_api(get, api_path(["banned"]), auth_header_()),
?assertEqual([], get(<<"data">>, Result2)).
t_brokers(_) ->
{ok, _} = request_api(get, api_path(["brokers"]), auth_header_()),
{ok, _} = request_api(get, api_path(["brokers", atom_to_list(node())]), auth_header_()),
meck:new(emqx_mgmt, [passthrough, no_history]),
meck:expect(emqx_mgmt, lookup_broker, 1, fun(_) -> {error, undefined} end),
{ok, Error} = request_api(get, api_path(["brokers", atom_to_list(node())]), auth_header_()),
?assertEqual(<<"undefined">>, get(<<"message">>, Error)),
meck:unload(emqx_mgmt).
t_clients(_) ->
process_flag(trap_exit, true),
Username1 = <<"user1">>,
Username2 = <<"user2">>,
ClientId1 = <<"client1">>,
ClientId2 = <<"client2">>,
{ok, C1} = emqtt:start_link(#{username => Username1, clientid => ClientId1}),
{ok, _} = emqtt:connect(C1),
{ok, C2} = emqtt:start_link(#{username => Username2, clientid => ClientId2}),
{ok, _} = emqtt:connect(C2),
timer:sleep(300),
{ok, Clients1} = request_api(get, api_path(["clients", binary_to_list(ClientId1)])
, auth_header_()),
?assertEqual(<<"client1">>, maps:get(<<"clientid">>, lists:nth(1, get(<<"data">>, Clients1)))),
{ok, Clients2} = request_api(get, api_path(["nodes", atom_to_list(node()),
"clients", binary_to_list(ClientId2)])
, auth_header_()),
?assertEqual(<<"client2">>, maps:get(<<"clientid">>, lists:nth(1, get(<<"data">>, Clients2)))),
{ok, Clients3} = request_api(get, api_path(["clients",
"username", binary_to_list(Username1)]),
auth_header_()),
?assertEqual(<<"client1">>, maps:get(<<"clientid">>, lists:nth(1, get(<<"data">>, Clients3)))),
{ok, Clients4} = request_api(get, api_path(["nodes", atom_to_list(node()),
"clients",
"username", binary_to_list(Username2)])
, auth_header_()),
?assertEqual(<<"client2">>, maps:get(<<"clientid">>, lists:nth(1, get(<<"data">>, Clients4)))),
{ok, Clients5} = request_api(get, api_path(["clients"]), "_limit=100&_page=1", auth_header_()),
?assertEqual(2, maps:get(<<"count">>, get(<<"meta">>, Clients5))),
meck:new(emqx_mgmt, [passthrough, no_history]),
meck:expect(emqx_mgmt, kickout_client, 1, fun(_) -> {error, undefined} end),
{ok, MeckRet1} = request_api(delete, api_path(["clients", binary_to_list(ClientId1)]), auth_header_()),
?assertEqual(?ERROR1, get(<<"code">>, MeckRet1)),
meck:expect(emqx_mgmt, clean_acl_cache, 1, fun(_) -> {error, undefined} end),
{ok, MeckRet2} = request_api(delete, api_path(["clients", binary_to_list(ClientId1), "acl_cache"]), auth_header_()),
?assertEqual(?ERROR1, get(<<"code">>, MeckRet2)),
meck:expect(emqx_mgmt, list_acl_cache, 1, fun(_) -> {error, undefined} end),
{ok, MeckRet3} = request_api(get, api_path(["clients", binary_to_list(ClientId2), "acl_cache"]), auth_header_()),
?assertEqual(?ERROR1, get(<<"code">>, MeckRet3)),
meck:unload(emqx_mgmt),
{ok, Ok} = request_api(delete, api_path(["clients", binary_to_list(ClientId1)]), auth_header_()),
?assertEqual(?SUCCESS, get(<<"code">>, Ok)),
timer:sleep(300),
{ok, NotFound0} = request_api(delete, api_path(["clients", binary_to_list(ClientId1)]), auth_header_()),
?assertEqual(?ERROR12, get(<<"code">>, NotFound0)),
{ok, Clients6} = request_api(get, api_path(["clients"]), "_limit=100&_page=1", auth_header_()),
?assertEqual(1, maps:get(<<"count">>, get(<<"meta">>, Clients6))),
{ok, NotFound1} = request_api(get, api_path(["clients", binary_to_list(ClientId1), "acl_cache"]), auth_header_()),
?assertEqual(?ERROR12, get(<<"code">>, NotFound1)),
{ok, NotFound2} = request_api(delete, api_path(["clients", binary_to_list(ClientId1), "acl_cache"]), auth_header_()),
?assertEqual(?ERROR12, get(<<"code">>, NotFound2)),
{ok, EmptyAclCache} = request_api(get, api_path(["clients", binary_to_list(ClientId2), "acl_cache"]), auth_header_()),
?assertEqual(0, length(get(<<"data">>, EmptyAclCache))),
{ok, Ok1} = request_api(delete, api_path(["clients", binary_to_list(ClientId2), "acl_cache"]), auth_header_()),
?assertEqual(?SUCCESS, get(<<"code">>, Ok1)).
receive_exit(0) ->
ok;
receive_exit(Count) ->
receive
{'EXIT', Client, {shutdown, tcp_closed}} ->
ct:log("receive exit signal, Client: ~p", [Client]),
receive_exit(Count - 1);
{'EXIT', Client, _Reason} ->
ct:log("receive exit signal, Client: ~p", [Client]),
receive_exit(Count - 1)
after 1000 ->
ct:log("timeout")
end.
t_listeners(_) ->
{ok, _} = request_api(get, api_path(["listeners"]), auth_header_()),
{ok, _} = request_api(get, api_path(["nodes", atom_to_list(node()), "listeners"]), auth_header_()),
meck:new(emqx_mgmt, [passthrough, no_history]),
meck:expect(emqx_mgmt, list_listeners, 0, fun() -> [{node(), {error, undefined}}] end),
{ok, Return} = request_api(get, api_path(["listeners"]), auth_header_()),
[Error] = get(<<"data">>, Return),
?assertEqual(<<"undefined">>,
maps:get(<<"error">>, maps:get(<<"listeners">>, Error))),
meck:unload(emqx_mgmt).
t_metrics(_) ->
{ok, _} = request_api(get, api_path(["metrics"]), auth_header_()),
{ok, _} = request_api(get, api_path(["nodes", atom_to_list(node()), "metrics"]), auth_header_()),
meck:new(emqx_mgmt, [passthrough, no_history]),
meck:expect(emqx_mgmt, get_metrics, 1, fun(_) -> {error, undefined} end),
{ok, "{\"message\":\"undefined\"}"} = request_api(get, api_path(["nodes", atom_to_list(node()), "metrics"]), auth_header_()),
meck:unload(emqx_mgmt).
t_nodes(_) ->
{ok, _} = request_api(get, api_path(["nodes"]), auth_header_()),
{ok, _} = request_api(get, api_path(["nodes", atom_to_list(node())]), auth_header_()),
meck:new(emqx_mgmt, [passthrough, no_history]),
meck:expect(emqx_mgmt, list_nodes, 0, fun() -> [{node(), {error, undefined}}] end),
{ok, Return} = request_api(get, api_path(["nodes"]), auth_header_()),
[Error] = get(<<"data">>, Return),
?assertEqual(<<"undefined">>, maps:get(<<"error">>, Error)),
meck:unload(emqx_mgmt).
% t_plugins(_) ->
% application:ensure_all_started(emqx_retainer),
% {ok, Plugins1} = request_api(get, api_path(["plugins"]), auth_header_()),
% [Plugins11] = filter(get(<<"data">>, Plugins1), <<"node">>, atom_to_binary(node(), utf8)),
% [Plugin1] = filter(maps:get(<<"plugins">>, Plugins11), <<"name">>, <<"emqx_retainer">>),
% ?assertEqual(<<"emqx_retainer">>, maps:get(<<"name">>, Plugin1)),
% ?assertEqual(true, maps:get(<<"active">>, Plugin1)),
%
% {ok, _} = request_api(put,
% api_path(["plugins",
% atom_to_list(emqx_retainer),
% "unload"]),
% auth_header_()),
% {ok, Error1} = request_api(put,
% api_path(["plugins",
% atom_to_list(emqx_retainer),
% "unload"]),
% auth_header_()),
% ?assertEqual(<<"not_started">>, get(<<"message">>, Error1)),
% {ok, Plugins2} = request_api(get,
% api_path(["nodes", atom_to_list(node()), "plugins"]),
% auth_header_()),
% [Plugin2] = filter(get(<<"data">>, Plugins2), <<"name">>, <<"emqx_retainer">>),
% ?assertEqual(<<"emqx_retainer">>, maps:get(<<"name">>, Plugin2)),
% ?assertEqual(false, maps:get(<<"active">>, Plugin2)),
%
% {ok, _} = request_api(put,
% api_path(["nodes",
% atom_to_list(node()),
% "plugins",
% atom_to_list(emqx_retainer),
% "load"]),
% auth_header_()),
% {ok, Plugins3} = request_api(get,
% api_path(["nodes", atom_to_list(node()), "plugins"]),
% auth_header_()),
% [Plugin3] = filter(get(<<"data">>, Plugins3), <<"name">>, <<"emqx_retainer">>),
% ?assertEqual(<<"emqx_retainer">>, maps:get(<<"name">>, Plugin3)),
% ?assertEqual(true, maps:get(<<"active">>, Plugin3)),
%
% {ok, _} = request_api(put,
% api_path(["nodes",
% atom_to_list(node()),
% "plugins",
% atom_to_list(emqx_retainer),
% "unload"]),
% auth_header_()),
% {ok, Error2} = request_api(put,
% api_path(["nodes",
% atom_to_list(node()),
% "plugins",
% atom_to_list(emqx_retainer),
% "unload"]),
% auth_header_()),
% ?assertEqual(<<"not_started">>, get(<<"message">>, Error2)),
% application:stop(emqx_retainer).
t_acl_cache(_) ->
ClientId = <<"client1">>,
Topic = <<"mytopic">>,
{ok, C1} = emqtt:start_link(#{clientid => ClientId}),
{ok, _} = emqtt:connect(C1),
{ok, _, _} = emqtt:subscribe(C1, Topic, 2),
%% get acl cache, should not be empty
{ok, Result} = request_api(get, api_path(["clients", binary_to_list(ClientId), "acl_cache"]), [], auth_header_()),
#{<<"code">> := 0, <<"data">> := Caches} = jiffy:decode(list_to_binary(Result), [return_maps]),
?assert(length(Caches) > 0),
?assertMatch(#{<<"access">> := <<"subscribe">>,
<<"topic">> := Topic,
<<"result">> := <<"allow">>,
<<"updated_time">> := _}, hd(Caches)),
%% clear acl cache
{ok, Result2} = request_api(delete, api_path(["clients", binary_to_list(ClientId), "acl_cache"]), [], auth_header_()),
?assertMatch(#{<<"code">> := 0}, jiffy:decode(list_to_binary(Result2), [return_maps])),
%% get acl cache again, after the acl cache is cleared
{ok, Result3} = request_api(get, api_path(["clients", binary_to_list(ClientId), "acl_cache"]), [], auth_header_()),
#{<<"code">> := 0, <<"data">> := Caches3} = jiffy:decode(list_to_binary(Result3), [return_maps]),
?assertEqual(0, length(Caches3)),
ok = emqtt:disconnect(C1).
t_pubsub(_) ->
Qos1Received = emqx_metrics:val('messages.qos1.received'),
Qos2Received = emqx_metrics:val('messages.qos2.received'),
Received = emqx_metrics:val('messages.received'),
ClientId = <<"client1">>,
Options = #{clientid => ClientId,
proto_ver => 5},
Topic = <<"mytopic">>,
{ok, C1} = emqtt:start_link(Options),
{ok, _} = emqtt:connect(C1),
meck:new(emqx_mgmt, [passthrough, no_history]),
meck:expect(emqx_mgmt, subscribe, 2, fun(_, _) -> {error, undefined} end),
{ok, NotFound1} = request_api(post, api_path(["mqtt/subscribe"]), [], auth_header_(),
#{<<"clientid">> => ClientId,
<<"topic">> => Topic,
<<"qos">> => 2}),
?assertEqual(?ERROR12, get(<<"code">>, NotFound1)),
meck:unload(emqx_mgmt),
{ok, BadTopic1} = request_api(post, api_path(["mqtt/subscribe"]), [], auth_header_(),
#{<<"clientid">> => ClientId,
<<"topics">> => <<"">>,
<<"qos">> => 2}),
?assertEqual(?ERROR15, get(<<"code">>, BadTopic1)),
{ok, BadTopic2} = request_api(post, api_path(["mqtt/publish"]), [], auth_header_(),
#{<<"clientid">> => ClientId,
<<"topics">> => <<"">>,
<<"qos">> => 1,
<<"payload">> => <<"hello">>}),
?assertEqual(?ERROR15, get(<<"code">>, BadTopic2)),
{ok, BadTopic3} = request_api(post, api_path(["mqtt/unsubscribe"]), [], auth_header_(),
#{<<"clientid">> => ClientId,
<<"topic">> => <<"">>}),
?assertEqual(?ERROR15, get(<<"code">>, BadTopic3)),
meck:new(emqx_mgmt, [passthrough, no_history]),
meck:expect(emqx_mgmt, unsubscribe, 2, fun(_, _) -> {error, undefined} end),
{ok, NotFound2} = request_api(post, api_path(["mqtt/unsubscribe"]), [], auth_header_(),
#{<<"clientid">> => ClientId,
<<"topic">> => Topic}),
?assertEqual(?ERROR12, get(<<"code">>, NotFound2)),
meck:unload(emqx_mgmt),
{ok, Code} = request_api(post, api_path(["mqtt/subscribe"]), [], auth_header_(),
#{<<"clientid">> => ClientId,
<<"topic">> => Topic,
<<"qos">> => 2}),
?assertEqual(?SUCCESS, get(<<"code">>, Code)),
{ok, Code} = request_api(post, api_path(["mqtt/publish"]), [], auth_header_(),
#{<<"clientid">> => ClientId,
<<"topic">> => <<"mytopic">>,
<<"qos">> => 1,
<<"payload">> => <<"hello">>}),
?assert(receive
{publish, #{payload := <<"hello">>}} ->
true
after 100 ->
false
end),
%% json payload
{ok, Code} = request_api(post, api_path(["mqtt/publish"]), [], auth_header_(),
#{<<"clientid">> => ClientId,
<<"topic">> => <<"mytopic">>,
<<"qos">> => 1,
<<"payload">> => #{body => "hello world"}}),
Payload = emqx_json:encode(#{body => "hello world"}),
?assert(receive
{publish, #{payload := Payload}} ->
true
after 100 ->
false
end),
{ok, Code} = request_api(post, api_path(["mqtt/unsubscribe"]), [], auth_header_(),
#{<<"clientid">> => ClientId,
<<"topic">> => Topic}),
%% tests subscribe_batch
Topic_list = [<<"mytopic1">>, <<"mytopic2">>],
[ {ok, _, [2]} = emqtt:subscribe(C1, Topics, 2) || Topics <- Topic_list],
Body1 = [ #{<<"clientid">> => ClientId, <<"topic">> => Topics, <<"qos">> => 2} || Topics <- Topic_list],
{ok, Data1} = request_api(post, api_path(["mqtt/subscribe_batch"]), [], auth_header_(), Body1),
loop(maps:get(<<"data">>, jiffy:decode(list_to_binary(Data1), [return_maps]))),
%% tests publish_batch
Body2 = [ #{<<"clientid">> => ClientId, <<"topic">> => Topics, <<"qos">> => 2, <<"retain">> => <<"false">>, <<"payload">> => #{body => "hello world"}} || Topics <- Topic_list ],
{ok, Data2} = request_api(post, api_path(["mqtt/publish_batch"]), [], auth_header_(), Body2),
loop(maps:get(<<"data">>, jiffy:decode(list_to_binary(Data2), [return_maps]))),
[ ?assert(receive
{publish, #{topic := Topics}} ->
true
after 100 ->
false
end) || Topics <- Topic_list ],
%% tests unsubscribe_batch
Body3 = [#{<<"clientid">> => ClientId, <<"topic">> => Topics} || Topics <- Topic_list],
{ok, Data3} = request_api(post, api_path(["mqtt/unsubscribe_batch"]), [], auth_header_(), Body3),
loop(maps:get(<<"data">>, jiffy:decode(list_to_binary(Data3), [return_maps]))),
ok = emqtt:disconnect(C1),
?assertEqual(2, emqx_metrics:val('messages.qos1.received') - Qos1Received),
?assertEqual(2, emqx_metrics:val('messages.qos2.received') - Qos2Received),
?assertEqual(4, emqx_metrics:val('messages.received') - Received).
loop([]) -> [];
loop(Data) ->
[H | T] = Data,
ct:pal("H: ~p~n", [H]),
?assertEqual(0, maps:get(<<"code">>, H)),
loop(T).
t_routes_and_subscriptions(_) ->
ClientId = <<"myclient">>,
Topic = <<"mytopic">>,
{ok, NonRoute} = request_api(get, api_path(["routes"]), auth_header_()),
?assertEqual([], get(<<"data">>, NonRoute)),
{ok, NonSubscription} = request_api(get, api_path(["subscriptions"]), auth_header_()),
?assertEqual([], get(<<"data">>, NonSubscription)),
{ok, NonSubscription1} = request_api(get, api_path(["nodes", atom_to_list(node()), "subscriptions"]), auth_header_()),
?assertEqual([], get(<<"data">>, NonSubscription1)),
{ok, NonSubscription2} = request_api(get,
api_path(["subscriptions", binary_to_list(ClientId)]),
auth_header_()),
?assertEqual([], get(<<"data">>, NonSubscription2)),
{ok, NonSubscription3} = request_api(get, api_path(["nodes",
atom_to_list(node()),
"subscriptions",
binary_to_list(ClientId)])
, auth_header_()),
?assertEqual([], get(<<"data">>, NonSubscription3)),
{ok, C1} = emqtt:start_link(#{clean_start => true,
clientid => ClientId,
proto_ver => ?MQTT_PROTO_V5}),
{ok, _} = emqtt:connect(C1),
{ok, _, [2]} = emqtt:subscribe(C1, Topic, qos2),
{ok, Result} = request_api(get, api_path(["routes"]), auth_header_()),
[Route] = get(<<"data">>, Result),
?assertEqual(Topic, maps:get(<<"topic">>, Route)),
{ok, Result2} = request_api(get, api_path(["routes", binary_to_list(Topic)]), auth_header_()),
[Route] = get(<<"data">>, Result2),
{ok, Result3} = request_api(get, api_path(["subscriptions"]), auth_header_()),
[Subscription] = get(<<"data">>, Result3),
?assertEqual(Topic, maps:get(<<"topic">>, Subscription)),
?assertEqual(ClientId, maps:get(<<"clientid">>, Subscription)),
{ok, Result3} = request_api(get, api_path(["nodes", atom_to_list(node()), "subscriptions"]), auth_header_()),
{ok, Result4} = request_api(get, api_path(["subscriptions", binary_to_list(ClientId)]), auth_header_()),
[Subscription] = get(<<"data">>, Result4),
{ok, Result4} = request_api(get, api_path(["nodes", atom_to_list(node()), "subscriptions", binary_to_list(ClientId)])
, auth_header_()),
ok = emqtt:disconnect(C1).
t_stats(_) ->
{ok, _} = request_api(get, api_path(["stats"]), auth_header_()),
{ok, _} = request_api(get, api_path(["nodes", atom_to_list(node()), "stats"]), auth_header_()),
meck:new(emqx_mgmt, [passthrough, no_history]),
meck:expect(emqx_mgmt, get_stats, 1, fun(_) -> {error, undefined} end),
{ok, Return} = request_api(get, api_path(["nodes", atom_to_list(node()), "stats"]), auth_header_()),
?assertEqual(<<"undefined">>, get(<<"message">>, Return)),
meck:unload(emqx_mgmt).
request_api(Method, Url, Auth) ->
request_api(Method, Url, [], Auth, []).
request_api(Method, Url, QueryParams, Auth) ->
request_api(Method, Url, QueryParams, Auth, []).
request_api(Method, Url, QueryParams, Auth, []) ->
NewUrl = case QueryParams of
"" -> Url;
_ -> Url ++ "?" ++ QueryParams
end,
do_request_api(Method, {NewUrl, [Auth]});
request_api(Method, Url, QueryParams, Auth, Body) ->
NewUrl = case QueryParams of
"" -> Url;
_ -> Url ++ "?" ++ QueryParams
end,
do_request_api(Method, {NewUrl, [Auth], "application/json", emqx_json:encode(Body)}).
do_request_api(Method, Request)->
ct:pal("Method: ~p, Request: ~p", [Method, Request]),
case httpc:request(Method, Request, [], []) of
{error, socket_closed_remotely} ->
{error, socket_closed_remotely};
{ok, {{"HTTP/1.1", Code, _}, _, Return} }
when Code =:= 200 orelse Code =:= 201 ->
{ok, Return};
{ok, {Reason, _, _}} ->
{error, Reason}
end.
auth_header_() ->
AppId = <<"admin">>,
AppSecret = <<"public">>,
auth_header_(binary_to_list(AppId), binary_to_list(AppSecret)).
auth_header_(User, Pass) ->
Encoded = base64:encode_to_string(lists:append([User,":",Pass])),
{"Authorization","Basic " ++ Encoded}.
api_path(Parts)->
?HOST ++ filename:join([?BASE_PATH, ?API_VERSION] ++ Parts).
filter(List, Key, Value) ->
lists:filter(fun(Item) ->
maps:get(Key, Item) == Value
end, List).

View File

@ -1,43 +0,0 @@
emqx_management:{
applications: [
{
id: "admin",
secret: "public"
}
]
max_row_limit: 10000
listeners: [
{
num_acceptors: 4
max_connections: 512
protocol: http
port: 8080
backlog: 512
send_timeout: 15s
send_timeout_close: on
inet6: false
ipv6_v6only: false
}
## ,
## {
## protocol: https
## port: 8081
## acceptors: 2
## backlog: 512
## send_timeout: 15s
## send_timeout_close: on
## inet6: false
## ipv6_v6only: false
## certfile = "etc/certs/cert.pem"
## keyfile = "etc/certs/key.pem"
## cacertfile = "etc/certs/cacert.pem"
## verify = verify_peer
## tls_versions = "tlsv1.3,tlsv1.2,tlsv1.1,tlsv1"
## ciphers = "TLS_AES_256_GCM_SHA384,TLS_AES_128_GCM_SHA256,TLS_CHACHA20_POLY1305_SHA256,TLS_AES_128_CCM_SHA256,TLS_AES_128_CCM_8_SHA256,ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA"
## fail_if_no_peer_cert = true
## inet6 = false
## ipv6_v6only = false
## }
]
}

View File

@ -1,24 +0,0 @@
##--------------------------------------------------------------------
## Reloader Plugin
##--------------------------------------------------------------------
## Interval of hot code reloading.
##
## Value: Duration
## - h: hour
## - m: minute
## - s: second
##
## Examples:
## - 2h: 2 hours
## - 30m: 30 minutes
## - 20s: 20 seconds
##
## Defaut: 60s
reloader.interval = 60s
## Logfile of reloader.
##
## Value: File
reloader.logfile = reloader.log

View File

@ -1,252 +0,0 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
%% License for the specific language governing rights and limitations
%% under the License.
%%
%% The Original Code is RabbitMQ Management Console.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2012-2016 Pivotal Software, Inc. All rights reserved.
%%
-module(rfc6455_client).
-export([new/2, open/1, recv/1, send/2, send_binary/2, close/1, close/2]).
-record(state, {host, port, addr, path, ppid, socket, data, phase}).
%% --------------------------------------------------------------------------
new(WsUrl, PPid) ->
crypto:start(),
"ws://" ++ Rest = WsUrl,
[Addr, Path] = split("/", Rest, 1),
[Host, MaybePort] = split(":", Addr, 1, empty),
Port = case MaybePort of
empty -> 80;
V -> {I, ""} = string:to_integer(V), I
end,
State = #state{host = Host,
port = Port,
addr = Addr,
path = "/" ++ Path,
ppid = PPid},
spawn(fun() ->
start_conn(State)
end).
open(WS) ->
receive
{rfc6455, open, WS, Opts} ->
{ok, Opts};
{rfc6455, close, WS, R} ->
{close, R}
end.
recv(WS) ->
receive
{rfc6455, recv, WS, Payload} ->
{ok, Payload};
{rfc6455, recv_binary, WS, Payload} ->
{binary, Payload};
{rfc6455, close, WS, R} ->
{close, R}
end.
send(WS, IoData) ->
WS ! {send, IoData},
ok.
send_binary(WS, IoData) ->
WS ! {send_binary, IoData},
ok.
close(WS) ->
close(WS, {1000, ""}).
close(WS, WsReason) ->
WS ! {close, WsReason},
receive
{rfc6455, close, WS, R} ->
{close, R}
end.
%% --------------------------------------------------------------------------
start_conn(State) ->
{ok, Socket} = gen_tcp:connect(State#state.host, State#state.port,
[binary,
{packet, 0}]),
Key = base64:encode_to_string(crypto:strong_rand_bytes(16)),
gen_tcp:send(Socket,
"GET " ++ State#state.path ++ " HTTP/1.1\r\n" ++
"Host: " ++ State#state.addr ++ "\r\n" ++
"Upgrade: websocket\r\n" ++
"Connection: Upgrade\r\n" ++
"Sec-WebSocket-Key: " ++ Key ++ "\r\n" ++
"Origin: null\r\n" ++
"Sec-WebSocket-Protocol: mqtt\r\n" ++
"Sec-WebSocket-Version: 13\r\n\r\n"),
loop(State#state{socket = Socket,
data = <<>>,
phase = opening}).
do_recv(State = #state{phase = opening, ppid = PPid, data = Data}) ->
case split("\r\n\r\n", binary_to_list(Data), 1, empty) of
[_Http, empty] -> State;
[Http, Data1] ->
%% TODO: don't ignore http response data, verify key
PPid ! {rfc6455, open, self(), [{http_response, Http}]},
State#state{phase = open,
data = Data1}
end;
do_recv(State = #state{phase = Phase, data = Data, socket = Socket, ppid = PPid})
when Phase =:= open orelse Phase =:= closing ->
R = case Data of
<<F:1, _:3, O:4, 0:1, L:7, Payload:L/binary, Rest/binary>>
when L < 126 ->
{F, O, Payload, Rest};
<<F:1, _:3, O:4, 0:1, 126:7, L2:16, Payload:L2/binary, Rest/binary>> ->
{F, O, Payload, Rest};
<<F:1, _:3, O:4, 0:1, 127:7, L2:64, Payload:L2/binary, Rest/binary>> ->
{F, O, Payload, Rest};
<<_:1, _:3, _:4, 1:1, _/binary>> ->
%% According o rfc6455 5.1 the server must not mask any frames.
die(Socket, PPid, {1006, "Protocol error"}, normal);
_ ->
moredata
end,
case R of
moredata ->
State;
_ -> do_recv2(State, R)
end.
do_recv2(State = #state{phase = Phase, socket = Socket, ppid = PPid}, R) ->
case R of
{1, 1, Payload, Rest} ->
PPid ! {rfc6455, recv, self(), Payload},
State#state{data = Rest};
{1, 2, Payload, Rest} ->
PPid ! {rfc6455, recv_binary, self(), Payload},
State#state{data = Rest};
{1, 8, Payload, _Rest} ->
WsReason = case Payload of
<<WC:16, WR/binary>> -> {WC, WR};
<<>> -> {1005, "No status received"}
end,
case Phase of
open -> %% echo
do_close(State, WsReason),
gen_tcp:close(Socket);
closing ->
ok
end,
die(Socket, PPid, WsReason, normal);
{_, _, _, _Rest2} ->
io:format("Unknown frame type~n"),
die(Socket, PPid, {1006, "Unknown frame type"}, normal)
end.
encode_frame(F, O, Payload) ->
Mask = crypto:strong_rand_bytes(4),
MaskedPayload = apply_mask(Mask, iolist_to_binary(Payload)),
L = byte_size(MaskedPayload),
IoData = case L of
_ when L < 126 ->
[<<F:1, 0:3, O:4, 1:1, L:7>>, Mask, MaskedPayload];
_ when L < 65536 ->
[<<F:1, 0:3, O:4, 1:1, 126:7, L:16>>, Mask, MaskedPayload];
_ ->
[<<F:1, 0:3, O:4, 1:1, 127:7, L:64>>, Mask, MaskedPayload]
end,
iolist_to_binary(IoData).
do_send(State = #state{socket = Socket}, Payload) ->
gen_tcp:send(Socket, encode_frame(1, 1, Payload)),
State.
do_send_binary(State = #state{socket = Socket}, Payload) ->
gen_tcp:send(Socket, encode_frame(1, 2, Payload)),
State.
do_close(State = #state{socket = Socket}, {Code, Reason}) ->
Payload = iolist_to_binary([<<Code:16>>, Reason]),
gen_tcp:send(Socket, encode_frame(1, 8, Payload)),
State#state{phase = closing}.
loop(State = #state{socket = Socket, ppid = PPid, data = Data,
phase = Phase}) ->
receive
{tcp, Socket, Bin} ->
State1 = State#state{data = iolist_to_binary([Data, Bin])},
loop(do_recv(State1));
{send, Payload} when Phase == open ->
loop(do_send(State, Payload));
{send_binary, Payload} when Phase == open ->
loop(do_send_binary(State, Payload));
{tcp_closed, Socket} ->
die(Socket, PPid, {1006, "Connection closed abnormally"}, normal);
{close, WsReason} when Phase == open ->
loop(do_close(State, WsReason))
end.
die(Socket, PPid, WsReason, Reason) ->
gen_tcp:shutdown(Socket, read_write),
PPid ! {rfc6455, close, self(), WsReason},
exit(Reason).
%% --------------------------------------------------------------------------
split(SubStr, Str, Limit) ->
split(SubStr, Str, Limit, "").
split(SubStr, Str, Limit, Default) ->
Acc = split(SubStr, Str, Limit, [], Default),
lists:reverse(Acc).
split(_SubStr, Str, 0, Acc, _Default) -> [Str | Acc];
split(SubStr, Str, Limit, Acc, Default) ->
{L, R} = case string:str(Str, SubStr) of
0 -> {Str, Default};
I -> {string:substr(Str, 1, I-1),
string:substr(Str, I+length(SubStr))}
end,
split(SubStr, R, Limit-1, [L | Acc], Default).
apply_mask(Mask, Data) when is_number(Mask) ->
apply_mask(<<Mask:32>>, Data);
apply_mask(<<0:32>>, Data) ->
Data;
apply_mask(Mask, Data) ->
iolist_to_binary(lists:reverse(apply_mask2(Mask, Data, []))).
apply_mask2(M = <<Mask:32>>, <<Data:32, Rest/binary>>, Acc) ->
T = Data bxor Mask,
apply_mask2(M, Rest, [<<T:32>> | Acc]);
apply_mask2(<<Mask:24, _:8>>, <<Data:24>>, Acc) ->
T = Data bxor Mask,
[<<T:24>> | Acc];
apply_mask2(<<Mask:16, _:16>>, <<Data:16>>, Acc) ->
T = Data bxor Mask,
[<<T:16>> | Acc];
apply_mask2(<<Mask:8, _:24>>, <<Data:8>>, Acc) ->
T = Data bxor Mask,
[<<T:8>> | Acc];
apply_mask2(_, <<>>, Acc) ->
Acc.

View File

@ -1,19 +0,0 @@
%% @author:
%% @description:
-module(test_utils).
%% ====================================================================
%% API functions
%% ====================================================================
-include_lib("eunit/include/eunit.hrl").
-include_lib("emqx_rule_engine/include/rule_engine.hrl").
-compile([export_all, nowarn_export_all]).
%% ====================================================================
%% Internal functions
%% ====================================================================
resource_is_alive(Id) ->
{ok, #resource_params{status = #{is_alive := Alive}} = Params} = emqx_rule_registry:find_resource_params(Id),
ct:pal("Id: ~p, Alive: ~p, Resource ===> :~p~n", [Id, Alive, Params]),
?assertEqual(true, Alive),
Alive.

View File

@ -16,8 +16,6 @@
-module(emqx_mod_api_topic_metrics).
-import(minirest, [return/1]).
-rest_api(#{name => list_all_topic_metrics,
method => 'GET',
path => "/topic-metrics",
@ -203,3 +201,7 @@ rpc_call(Node, Fun, Args) ->
{badrpc, Reason} -> {error, Reason};
Res -> Res
end.
return(_) ->
%% TODO: V5 API
ok.

View File

@ -16,8 +16,6 @@
-module(emqx_modules_api).
-import(minirest, [return/1]).
-rest_api(#{name => list_all_modules,
method => 'GET',
path => "/modules/",
@ -167,3 +165,7 @@ name(emqx_mod_presence) -> presence;
name(emqx_mod_recon) -> recon;
name(emqx_mod_rewrite) -> rewrite;
name(emqx_mod_topic_metrics) -> topic_metrics.
return(_) ->
%% TODO: V5 API
ok.

View File

@ -58,59 +58,60 @@ t_list(_) ->
?assertMatch([_ | _ ], emqx_modules:list()),
emqx_modules:unload(presence).
t_modules_api(_) ->
emqx_modules:load(presence, #{qos => 1}),
timer:sleep(50),
{ok, Modules1} = request_api(get, api_path(["modules"]), auth_header_()),
[Modules11] = filter(get(<<"data">>, Modules1), <<"node">>, atom_to_binary(node(), utf8)),
[Module1] = filter(maps:get(<<"modules">>, Modules11), <<"name">>, <<"presence">>),
?assertEqual(<<"presence">>, maps:get(<<"name">>, Module1)),
{ok, _} = request_api(put,
api_path(["modules",
atom_to_list(presence),
"unload"]),
auth_header_()),
{ok, Error1} = request_api(put,
api_path(["modules",
atom_to_list(presence),
"unload"]),
auth_header_()),
?assertEqual(<<"not_started">>, get(<<"message">>, Error1)),
{ok, Modules2} = request_api(get,
api_path(["nodes", atom_to_list(node()), "modules"]),
auth_header_()),
[Module2] = filter(get(<<"data">>, Modules2), <<"name">>, <<"presence">>),
?assertEqual(<<"presence">>, maps:get(<<"name">>, Module2)),
{ok, _} = request_api(put,
api_path(["nodes",
atom_to_list(node()),
"modules",
atom_to_list(presence),
"load"]),
auth_header_()),
{ok, Modules3} = request_api(get,
api_path(["nodes", atom_to_list(node()), "modules"]),
auth_header_()),
[Module3] = filter(get(<<"data">>, Modules3), <<"name">>, <<"presence">>),
?assertEqual(<<"presence">>, maps:get(<<"name">>, Module3)),
{ok, _} = request_api(put,
api_path(["nodes",
atom_to_list(node()),
"modules",
atom_to_list(presence),
"unload"]),
auth_header_()),
{ok, Error2} = request_api(put,
api_path(["nodes",
atom_to_list(node()),
"modules",
atom_to_list(presence),
"unload"]),
auth_header_()),
?assertEqual(<<"not_started">>, get(<<"message">>, Error2)),
emqx_modules:unload(presence).
%% TODO: V5 API
%%t_modules_api(_) ->
%% emqx_modules:load(presence, #{qos => 1}),
%% timer:sleep(50),
%% {ok, Modules1} = request_api(get, api_path(["modules"]), auth_header_()),
%% [Modules11] = filter(get(<<"data">>, Modules1), <<"node">>, atom_to_binary(node(), utf8)),
%% [Module1] = filter(maps:get(<<"modules">>, Modules11), <<"name">>, <<"presence">>),
%% ?assertEqual(<<"presence">>, maps:get(<<"name">>, Module1)),
%% {ok, _} = request_api(put,
%% api_path(["modules",
%% atom_to_list(presence),
%% "unload"]),
%% auth_header_()),
%% {ok, Error1} = request_api(put,
%% api_path(["modules",
%% atom_to_list(presence),
%% "unload"]),
%% auth_header_()),
%% ?assertEqual(<<"not_started">>, get(<<"message">>, Error1)),
%% {ok, Modules2} = request_api(get,
%% api_path(["nodes", atom_to_list(node()), "modules"]),
%% auth_header_()),
%% [Module2] = filter(get(<<"data">>, Modules2), <<"name">>, <<"presence">>),
%% ?assertEqual(<<"presence">>, maps:get(<<"name">>, Module2)),
%%
%% {ok, _} = request_api(put,
%% api_path(["nodes",
%% atom_to_list(node()),
%% "modules",
%% atom_to_list(presence),
%% "load"]),
%% auth_header_()),
%% {ok, Modules3} = request_api(get,
%% api_path(["nodes", atom_to_list(node()), "modules"]),
%% auth_header_()),
%% [Module3] = filter(get(<<"data">>, Modules3), <<"name">>, <<"presence">>),
%% ?assertEqual(<<"presence">>, maps:get(<<"name">>, Module3)),
%%
%% {ok, _} = request_api(put,
%% api_path(["nodes",
%% atom_to_list(node()),
%% "modules",
%% atom_to_list(presence),
%% "unload"]),
%% auth_header_()),
%% {ok, Error2} = request_api(put,
%% api_path(["nodes",
%% atom_to_list(node()),
%% "modules",
%% atom_to_list(presence),
%% "unload"]),
%% auth_header_()),
%% ?assertEqual(<<"not_started">>, get(<<"message">>, Error2)),
%% emqx_modules:unload(presence).
t_modules_cmd(_) ->

View File

@ -25,8 +25,6 @@
-include_lib("prometheus/include/prometheus.hrl").
-include_lib("prometheus/include/prometheus_model.hrl").
-import(minirest, [return/1]).
-rest_api(#{name => stats,
method => 'GET',
path => "/emqx_prometheus",
@ -610,3 +608,7 @@ emqx_cluster_data() ->
#{running_nodes := Running, stopped_nodes := Stopped} = ekka_mnesia:cluster_info(),
[{nodes_running, length(Running)},
{nodes_stopped, length(Stopped)}].
%% TODO: V5 API
return(_) ->
ok.

View File

@ -36,7 +36,7 @@
lookup_config(_Bindings, _Params) ->
Config = emqx_config:get([emqx_retainer]),
minirest:return({ok, Config}).
return({ok, Config}).
update_config(_Bindings, Params) ->
try
@ -47,9 +47,9 @@ update_config(_Bindings, Params) ->
#{emqx_retainer := Conf} = hocon_schema:richmap_to_map(RichConf),
Action = proplists:get_value(<<"action">>, Params, undefined),
do_update_config(Action, Conf),
minirest:return()
return()
catch _:_:Reason ->
minirest:return({error, Reason})
return({error, Reason})
end.
%%------------------------------------------------------------------------------
@ -59,3 +59,9 @@ do_update_config(undefined, Config) ->
emqx_retainer:update_config(Config);
do_update_config(<<"test">>, _) ->
ok.
%% TODO: V5 API
return() ->
ok.
return(_) ->
ok.

View File

@ -35,7 +35,9 @@
-define(BASE_PATH, "api").
all() ->
emqx_ct:all(?MODULE).
%% TODO: V5 API
%% emqx_ct:all(?MODULE).
[].
groups() ->
[].

View File

@ -21,8 +21,6 @@
-logger_header("[RuleEngineAPI]").
-import(minirest, [return/1]).
-rest_api(#{name => create_rule,
method => 'POST',
path => "/rules/",
@ -552,3 +550,6 @@ get_rule_metrics(Id) ->
get_action_metrics(Id) ->
[maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_action_metrics, [Id]))
|| Node <- ekka_mnesia:running_nodes()].
%% TODO: V5 API
return(_) -> ok.

View File

@ -54,14 +54,15 @@ groups() ->
[t_inspect_action
,t_republish_action
]},
{api, [],
[t_crud_rule_api,
t_list_actions_api,
t_show_action_api,
t_crud_resources_api,
t_list_resource_types_api,
t_show_resource_type_api
]},
%% TODO: V5 API
%% {api, [],
%% [t_crud_rule_api,
%% t_list_actions_api,
%% t_show_action_api,
%% t_crud_resources_api,
%% t_list_resource_types_api,
%% t_show_resource_type_api
%% ]},
{cli, [],
[t_rules_cli,
t_actions_cli,

View File

@ -44,8 +44,6 @@
, get_telemetry_data/0
]).
-import(minirest, [return/1]).
%%--------------------------------------------------------------------
%% CLI
%%--------------------------------------------------------------------
@ -129,3 +127,6 @@ rpc_call(Node, Module, Fun, Args) ->
{badrpc, Reason} -> {error, Reason};
Result -> Result
end.
%% TODO: V5 API
return(_) -> ok.

View File

@ -162,7 +162,7 @@ spec:
{{ end }}
readinessProbe:
httpGet:
path: /status
path: /api/v5/status
port: {{ .Values.emqxConfig.EMQX_MANAGEMENT__LISTENER__HTTP | default 8081 }}
initialDelaySeconds: 5
periodSeconds: 5

View File

@ -51,7 +51,7 @@
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.2"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v4.0.1"}}} % TODO: delete when all apps moved to hocon
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.6"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.1.1"}}}
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}}
, {replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.2"}}}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}}