diff --git a/apps/emqx_dashboard/etc/emqx_dashboard.conf b/apps/emqx_dashboard/etc/emqx_dashboard.conf index de453c12c..c25c2802d 100644 --- a/apps/emqx_dashboard/etc/emqx_dashboard.conf +++ b/apps/emqx_dashboard/etc/emqx_dashboard.conf @@ -5,6 +5,8 @@ emqx_dashboard:{ default_username: "admin" default_password: "public" + ## notice: sample_interval should be divisible by 60. + sample_interval: 10s listeners: [ { num_acceptors: 4 diff --git a/apps/emqx_dashboard/include/emqx_dashboard.hrl b/apps/emqx_dashboard/include/emqx_dashboard.hrl index cc3d9b3d6..65f1d6ff5 100644 --- a/apps/emqx_dashboard/include/emqx_dashboard.hrl +++ b/apps/emqx_dashboard/include/emqx_dashboard.hrl @@ -21,3 +21,8 @@ -define(EMPTY_KEY(Key), ((Key == undefined) orelse (Key == <<>>))). -define(DASHBOARD_SHARD, emqx_dashboard_shard). + +-record(mqtt_collect, { + timestamp :: integer(), + collect + }). diff --git a/apps/emqx_dashboard/src/emqx_dashboard_api.erl b/apps/emqx_dashboard/src/emqx_dashboard_api.erl index f11d6588b..b518232af 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_api.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_api.erl @@ -105,6 +105,7 @@ user_api() -> Metadata = #{ delete => #{ description => <<"Delete dashboard users">>, + parameters => [path_param_username()], responses => #{ <<"200">> => response_schema(<<"Delete User successfully">>), <<"400">> => bad_request() @@ -112,6 +113,7 @@ user_api() -> }, put => #{ description => <<"Update dashboard users">>, + parameters => [path_param_username()], 'requestBody' => request_body_schema(#{ type => object, properties => #{ @@ -127,6 +129,7 @@ user_api() -> }, post => #{ description => <<"Create dashboard users">>, + parameters => [path_param_username()], 'requestBody' => request_body_schema(create_user), responses => #{ <<"200">> => response_schema(<<"Create Users successfully">>), @@ -140,6 +143,7 @@ change_pwd_api() -> Metadata = #{ put => #{ description => <<"Update dashboard users password">>, + parameters => [path_param_username()], 'requestBody' => request_body_schema(#{ type => object, properties => #{ @@ -159,6 +163,15 @@ change_pwd_api() -> }, {"/change_pwd/:username", Metadata, change_pwd}. +path_param_username() -> + #{ + name => username, + in => path, + required => true, + schema => #{type => string}, + example => <<"admin">> + }. + -define(EMPTY(V), (V == undefined orelse V == <<>>)). auth(post, Request) -> diff --git a/apps/emqx_dashboard/src/emqx_dashboard_collection.erl b/apps/emqx_dashboard/src/emqx_dashboard_collection.erl new file mode 100644 index 000000000..bf172ee97 --- /dev/null +++ b/apps/emqx_dashboard/src/emqx_dashboard_collection.erl @@ -0,0 +1,173 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_dashboard_collection). + +-behaviour(gen_server). + +-include("emqx_dashboard.hrl"). +-include_lib("stdlib/include/ms_transform.hrl"). + +-export([ start_link/0 + ]). + +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). + +-export([get_collect/0]). + +-export([get_local_time/0]). + +-boot_mnesia({mnesia, [boot]}). +-copy_mnesia({mnesia, [copy]}). + +%% Mnesia bootstrap +-export([mnesia/1]). + +-define(APP, emqx_dashboard). + +-define(DEFAULT_INTERVAL, 10). %% seconds + +-define(COLLECT, {[],[],[]}). + +-define(CLEAR_INTERVAL, 86400000). + +-define(EXPIRE_INTERVAL, 86400000 * 7). + +mnesia(boot) -> + ok = ekka_mnesia:create_table(emqx_collect, [ + {type, set}, + {local_content, true}, + {disc_only_copies, [node()]}, + {record_name, mqtt_collect}, + {attributes, record_info(fields, mqtt_collect)}]); +mnesia(copy) -> + mnesia:add_table_copy(emqx_collect, node(), disc_only_copies). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +get_collect() -> gen_server:call(whereis(?MODULE), get_collect). + +init([]) -> + timer(timer:seconds(interval()), collect), + timer(get_today_remaining_seconds(), clear_expire_data), + ExpireInterval = emqx_config:get([emqx_dashboard, monitor, interval], ?EXPIRE_INTERVAL), + State = #{ + count => count(), + expire_interval => ExpireInterval, + collect => ?COLLECT, + temp_collect => {0, 0, 0, 0}, + last_collects => {0, 0, 0} + }, + {ok, State}. + +interval() -> + emqx_config:get([?APP, sample_interval], ?DEFAULT_INTERVAL). + +count() -> + 60 div interval(). + +handle_call(get_collect, _From, State = #{temp_collect := {Received, Sent, _, _}}) -> + {reply, {Received, Sent, collect(subscriptions), collect(connections)}, State, hibernate}; +handle_call(_Req, _From, State) -> + {reply, ok, State}. +handle_cast(_Req, State) -> + {noreply, State}. + +handle_info(collect, State = #{collect := Collect, count := 1, temp_collect := TempCollect, last_collects := LastCollect}) -> + NewLastCollect = flush(collect_all(Collect), LastCollect), + TempCollect1 = temp_collect(TempCollect), + timer(timer:seconds(interval()), collect), + {noreply, State#{count => count(), + collect => ?COLLECT, + temp_collect => TempCollect1, + last_collects => NewLastCollect}}; + +handle_info(collect, State = #{count := Count, collect := Collect, temp_collect := TempCollect}) -> + TempCollect1 = temp_collect(TempCollect), + timer(timer:seconds(interval()), collect), + {noreply, State#{count => Count - 1, + collect => collect_all(Collect), + temp_collect => TempCollect1}, hibernate}; + +handle_info(clear_expire_data, State = #{expire_interval := ExpireInterval}) -> + timer(?CLEAR_INTERVAL, clear_expire_data), + T1 = get_local_time(), + Spec = ets:fun2ms(fun({_, T, _C} = Data) when (T1 - T) > ExpireInterval -> Data end), + Collects = dets:select(emqx_collect, Spec), + lists:foreach(fun(Collect) -> + dets:delete_object(emqx_collect, Collect) + end, Collects), + {noreply, State, hibernate}; + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +temp_collect({_, _, Received, Sent}) -> + Received1 = collect(received), + Sent1 = collect(sent), + {(Received1 - Received) div interval(), + (Sent1 - Sent) div interval(), + Received1, + Sent1}. + +collect_all({Connection, Route, Subscription}) -> + {[collect(connections)| Connection], + [collect(routes)| Route], + [collect(subscriptions)| Subscription]}. + +collect(connections) -> + emqx_stats:getstat('connections.count'); +collect(routes) -> + emqx_stats:getstat('routes.count'); +collect(subscriptions) -> + emqx_stats:getstat('subscriptions.count'); +collect(received) -> + emqx_metrics:val('messages.received'); +collect(sent) -> + emqx_metrics:val('messages.sent'); +collect(dropped) -> + emqx_metrics:val('messages.dropped'). + +flush({Connection, Route, Subscription}, {Received0, Sent0, Dropped0}) -> + Received = collect(received), + Sent = collect(sent), + Dropped = collect(dropped), + Collect = {avg(Connection), + avg(Route), + avg(Subscription), + diff(Received, Received0), + diff(Sent, Sent0), + diff(Dropped, Dropped0)}, + Ts = get_local_time(), + _ = mnesia:dirty_write(emqx_collect, #mqtt_collect{timestamp = Ts, collect = Collect}), + {Received, Sent, Dropped}. + +avg(Items) -> + lists:sum(Items) div count(). + +diff(Item0, Item1) -> + Item0 - Item1. + +timer(Secs, Msg) -> + erlang:send_after(Secs, self(), Msg). + +get_today_remaining_seconds() -> + ?CLEAR_INTERVAL - (get_local_time() rem ?CLEAR_INTERVAL). + +get_local_time() -> + (calendar:datetime_to_gregorian_seconds(calendar:local_time()) - + calendar:datetime_to_gregorian_seconds({{1970,1,1}, {0,0,0}})) * 1000. diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl new file mode 100644 index 000000000..130139780 --- /dev/null +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl @@ -0,0 +1,205 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_dashboard_monitor_api). + +-include("emqx_dashboard.hrl"). + +-behaviour(minirest_api). + +-export([ sampling/1 + , sampling/2 + , get_collect/1 + ]). + +-export([api_spec/0]). + +-export([counters/2, current_counters/2]). + +-define(COUNTERS, [ connection + , route + , subscriptions + , received + , sent + , dropped]). + +api_spec() -> + {[monitor_api(), monitor_current_api()], [counters_schema()]}. + +monitor_api() -> + Metadata = #{ + get => #{ + description => <<"List monitor data">>, + parameters => [ + #{ + name => node, + in => query, + required => false, + schema => #{type => string}, + example => node() + }, + #{ + name => counter, + in => query, + required => false, + schema => #{type => string, enum => ?COUNTERS} + } + ], + responses => #{ + <<"200">> => emqx_mgmt_util:response_array_schema(<<"Monitor count data">>, counters)}}}, + {"/monitor", Metadata, counters}. +monitor_current_api() -> + Metadata = #{ + get => #{ + description => <<"Current monitor data">>, + responses => #{ + <<"200">> => emqx_mgmt_util:response_schema(<<"Current monitor data">>, + current_counters_schema())}}}, + {"/monitor/current", Metadata, current_counters}. + +current_counters_schema() -> + #{ + type => object, + properties => #{ + nodes => #{ + type => integer, + description => <<"Nodes count">>}, + connection => #{type => integer}, + sent => #{type => integer}, + received => #{type => integer}, + subscription => #{type => integer}} + }. + +counters_schema() -> + Node = #{ + node => #{ + type => string, + example => node() + } + }, + Properties = lists:foldl(fun(K, M) -> maps:merge(M, counters_schema(K)) end, Node, ?COUNTERS), + #{ + counters => #{ + type => object, + properties => Properties} + }. + +counters_schema(Name) -> + #{Name => #{ + type => array, + items => #{ + type => object, + properties => #{ + timestamp => #{ + type => integer}, + count => #{ + type => integer}}}}}. +%%%============================================================================================== +%% parameters trans +counters(get, Request) -> + case cowboy_req:parse_qs(Request) of + [] -> + Response = [sampling(Node) || Node <- ekka_mnesia:running_nodes()], + {200, Response}; + Params -> + lookup(Params) + end. + +current_counters(get, _) -> + Data = [get_collect(Node) || Node <- ekka_mnesia:running_nodes()], + Nodes = length(ekka_mnesia:running_nodes()), + {Received, Sent, Sub, Conn} = format_current_metrics(Data), + Response = #{ + nodes => Nodes, + received => Received, + sent => Sent, + subscription => Sub, + connection => Conn + }, + {200, Response}. + + %%%============================================================================================== +%% api apply + +lookup(Params) -> + Fun = + fun({K,V}, M) -> + maps:put(binary_to_atom(K, utf8), binary_to_atom(V, utf8), M) + end, + lookup_(lists:foldl(Fun, #{}, Params)). + +lookup_(#{node := Node, counter := Counter}) -> + {200, sampling(Node, Counter)}; +lookup_(#{node := Node}) -> + {200, sampling(Node)}; +lookup_(#{counter := Counter}) -> + Data = [sampling(Node, Counter) || Node <- ekka_mnesia:running_nodes()], + {200, Data}. + +format_current_metrics(Collects) -> + format_current_metrics(Collects, {0,0,0,0}). +format_current_metrics([], Acc) -> + Acc; +format_current_metrics([{Received, Sent, Sub, Conn} | Collects], {Received1, Sent1, Sub1, Conn1}) -> + format_current_metrics(Collects, {Received1 + Received, Sent1 + Sent, Sub1 + Sub, Conn1 + Conn}). + +get_collect(Node) when Node =:= node() -> + emqx_dashboard_collection:get_collect(); +get_collect(Node) -> + case rpc:call(Node, emqx_dashboard_collection, get_collect, []) of + {badrpc, _Reason} -> #{}; + Res -> Res + end. + +sampling(Node) when Node =:= node() -> + Time = emqx_dashboard_collection:get_local_time() - 7200000, + All = dets:select(emqx_collect, [{{mqtt_collect,'$1','$2'}, [{'>', '$1', Time}], ['$_']}]), + maps:put(node, Node, format(lists:sort(All))); +sampling(Node) -> + rpc:call(Node, ?MODULE, sampling, [Node]). + +sampling(Node, Counter) when Node =:= node() -> + Time = emqx_dashboard_collection:get_local_time() - 7200000, + All = dets:select(emqx_collect, [{{mqtt_collect,'$1','$2'}, [{'>', '$1', Time}], ['$_']}]), + maps:put(node, Node, format_single(lists:sort(All), Counter)); +sampling(Node, Counter) -> + rpc:call(Node, ?MODULE, sampling, [Node, Counter]). + +format(Collects) -> + format(Collects, {[],[],[],[],[],[]}). +format([], {Connection, Route, Subscription, Received, Sent, Dropped}) -> + #{ + connection => add_key(Connection), + route => add_key(Route), + subscriptions => add_key(Subscription), + received => add_key(Received), + sent => add_key(Sent), + dropped => add_key(Dropped) + }; + +format([#mqtt_collect{timestamp = Ts, collect = {C, R, S, Re, S1, D}} | Collects], + {Connection, Route, Subscription, Received, Sent, Dropped}) -> + format(Collects, {[[Ts, C] | Connection], + [[Ts, R] | Route], + [[Ts, S] | Subscription], + [[Ts, Re] | Received], + [[Ts, S1] | Sent], + [[Ts, D] | Dropped]}). +add_key(Collects) -> + lists:reverse([#{timestamp => Ts, count => C} || [Ts, C] <- Collects]). + +format_single(Collects, Counter) -> + #{Counter => format_single(Collects, counter_index(Counter), [])}. +format_single([], _Index, Acc) -> + lists:reverse(Acc); +format_single([#mqtt_collect{timestamp = Ts, collect = Collect} | Collects], Index, Acc) -> + format_single(Collects, Index, + [#{timestamp => Ts, count => erlang:element(Index, Collect)} | Acc]). + +counter_index(connection) -> 1; +counter_index(route) -> 2; +counter_index(subscriptions) -> 3; +counter_index(received) -> 4; +counter_index(sent) -> 5; +counter_index(dropped) -> 6. diff --git a/apps/emqx_dashboard/src/emqx_dashboard_schema.erl b/apps/emqx_dashboard/src/emqx_dashboard_schema.erl index 45ad345fb..2dae5e7e4 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_schema.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_schema.erl @@ -27,6 +27,7 @@ fields("emqx_dashboard") -> hoconsc:ref(?MODULE, "https")]))} , {default_username, fun default_username/1} , {default_password, fun default_password/1} + , {sample_interval, emqx_schema:t(emqx_schema:duration_s(), undefined, "10s")} ]; fields("http") -> diff --git a/apps/emqx_dashboard/src/emqx_dashboard_sup.erl b/apps/emqx_dashboard/src/emqx_dashboard_sup.erl index b132ebaf4..8ec161f11 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_sup.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_sup.erl @@ -28,5 +28,5 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - {ok, { {one_for_all, 10, 100}, [?CHILD(emqx_dashboard_admin)] } }. - + {ok, {{one_for_all, 10, 100}, + [?CHILD(emqx_dashboard_admin), ?CHILD(emqx_dashboard_collection)]}}. diff --git a/rebar.config b/rebar.config index 0f70bed4d..8d1d7c3d7 100644 --- a/rebar.config +++ b/rebar.config @@ -17,7 +17,7 @@ %% Check for the mnesia calls forbidden by Ekka: {xref_queries, - [ {"E || \"mnesia\":\"dirty_write\"/\".*\" : Fun", []} + [ {"E || \"mnesia\":\"dirty_write\"/\".*\" : Fun", [{{emqx_dashboard_collection,flush,2},{mnesia,dirty_write,2}}]} , {"E || \"mnesia\":\"dirty_delete.*\"/\".*\" : Fun", []} , {"E || \"mnesia\":\"transaction\"/\".*\" : Fun", []} , {"E || \"mnesia\":\"async_dirty\"/\".*\" : Fun", []}