style: format `emqx_modules`

This commit is contained in:
Thales Macedo Garitezi 2022-03-22 09:04:30 -03:00
parent 650683ac19
commit 4d3157743e
No known key found for this signature in database
GPG Key ID: DD279F8152A9B6DD
23 changed files with 1686 additions and 1150 deletions

View File

@ -1,5 +1,5 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{deps, {deps, [{emqx, {path, "../emqx"}}]}.
[ {emqx, {path, "../emqx"}}
]}. {project_plugins, [erlfmt]}.

View File

@ -27,33 +27,36 @@
-boot_mnesia({mnesia, [boot]}). -boot_mnesia({mnesia, [boot]}).
-export([ start_link/0 -export([
, on_message_publish/1 start_link/0,
]). on_message_publish/1
]).
%% gen_server callbacks %% gen_server callbacks
-export([ init/1 -export([
, handle_call/3 init/1,
, handle_cast/2 handle_call/3,
, handle_info/2 handle_cast/2,
, terminate/2 handle_info/2,
, code_change/3 terminate/2,
]). code_change/3
]).
%% gen_server callbacks %% gen_server callbacks
-export([ enable/0 -export([
, disable/0 enable/0,
, set_max_delayed_messages/1 disable/0,
, update_config/1 set_max_delayed_messages/1,
, list/1 update_config/1,
, get_delayed_message/1 list/1,
, get_delayed_message/2 get_delayed_message/1,
, delete_delayed_message/1 get_delayed_message/2,
, delete_delayed_message/2 delete_delayed_message/1,
, post_config_update/5 delete_delayed_message/2,
, cluster_list/1 post_config_update/5,
, cluster_query/4 cluster_list/1,
]). cluster_query/4
]).
-export([format_delayed/1]). -export([format_delayed/1]).
@ -64,13 +67,13 @@
-export_type([with_id_return/0, with_id_return/1]). -export_type([with_id_return/0, with_id_return/1]).
-type state() :: #{
-type state() :: #{ publish_timer := maybe(timer:tref()) publish_timer := maybe(timer:tref()),
, publish_at := non_neg_integer() publish_at := non_neg_integer(),
, stats_timer := maybe(reference()) stats_timer := maybe(reference()),
, stats_fun := maybe(fun((pos_integer()) -> ok)) stats_fun := maybe(fun((pos_integer()) -> ok)),
, max_delayed_messages := non_neg_integer() max_delayed_messages := non_neg_integer()
}. }.
%% sync ms with record change %% sync ms with record change
-define(QUERY_MS(Id), [{{delayed_message, {'_', Id}, '_', '_'}, [], ['$_']}]). -define(QUERY_MS(Id), [{{delayed_message, {'_', Id}, '_', '_'}, [], ['$_']}]).
@ -86,40 +89,42 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
mnesia(boot) -> mnesia(boot) ->
ok = mria:create_table(?TAB, [ ok = mria:create_table(?TAB, [
{type, ordered_set}, {type, ordered_set},
{storage, disc_copies}, {storage, disc_copies},
{local_content, true}, {local_content, true},
{record_name, delayed_message}, {record_name, delayed_message},
{attributes, record_info(fields, delayed_message)}]). {attributes, record_info(fields, delayed_message)}
]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Hooks %% Hooks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
on_message_publish(Msg = #message{ on_message_publish(
id = Id, Msg = #message{
topic = <<"$delayed/", Topic/binary>>, id = Id,
timestamp = Ts topic = <<"$delayed/", Topic/binary>>,
}) -> timestamp = Ts
}
) ->
[Delay, Topic1] = binary:split(Topic, <<"/">>), [Delay, Topic1] = binary:split(Topic, <<"/">>),
{PubAt, Delayed} = case binary_to_integer(Delay) of {PubAt, Delayed} =
Interval when Interval < ?MAX_INTERVAL -> case binary_to_integer(Delay) of
{Interval + erlang:round(Ts / 1000), Interval}; Interval when Interval < ?MAX_INTERVAL ->
Timestamp -> {Interval + erlang:round(Ts / 1000), Interval};
%% Check malicious timestamp? Timestamp ->
case (Timestamp - erlang:round(Ts / 1000)) > ?MAX_INTERVAL of %% Check malicious timestamp?
true -> error(invalid_delayed_timestamp); case (Timestamp - erlang:round(Ts / 1000)) > ?MAX_INTERVAL of
false -> {Timestamp, Timestamp - erlang:round(Ts / 1000)} true -> error(invalid_delayed_timestamp);
end false -> {Timestamp, Timestamp - erlang:round(Ts / 1000)}
end, end
end,
PubMsg = Msg#message{topic = Topic1}, PubMsg = Msg#message{topic = Topic1},
Headers = PubMsg#message.headers, Headers = PubMsg#message.headers,
case store(#delayed_message{key = {PubAt, Id}, delayed = Delayed, msg = PubMsg}) of case store(#delayed_message{key = {PubAt, Id}, delayed = Delayed, msg = PubMsg}) of
ok -> ok; ok -> ok;
{error, Error} -> {error, Error} -> ?SLOG(error, #{msg => "store_delayed_message_fail", error => Error})
?SLOG(error, #{msg => "store_delayed_message_fail", error => Error})
end, end,
{stop, PubMsg#message{headers = Headers#{allow_publish => false}}}; {stop, PubMsg#message{headers = Headers#{allow_publish => false}}};
on_message_publish(Msg) -> on_message_publish(Msg) ->
{ok, Msg}. {ok, Msg}.
@ -127,12 +132,12 @@ on_message_publish(Msg) ->
%% Start delayed publish server %% Start delayed publish server
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec(start_link() -> emqx_types:startlink_ret()). -spec start_link() -> emqx_types:startlink_ret().
start_link() -> start_link() ->
Opts = emqx_conf:get([delayed], #{}), Opts = emqx_conf:get([delayed], #{}),
gen_server:start_link({local, ?SERVER}, ?MODULE, [Opts], []). gen_server:start_link({local, ?SERVER}, ?MODULE, [Opts], []).
-spec(store(delayed_message()) -> ok | {error, atom()}). -spec store(delayed_message()) -> ok | {error, atom()}.
store(DelayedMsg) -> store(DelayedMsg) ->
gen_server:call(?SERVER, {store, DelayedMsg}, infinity). gen_server:call(?SERVER, {store, DelayedMsg}, infinity).
@ -158,13 +163,21 @@ cluster_query(Table, _QsSpec, Continuation, Limit) ->
format_delayed(Delayed) -> format_delayed(Delayed) ->
format_delayed(Delayed, false). format_delayed(Delayed, false).
format_delayed(#delayed_message{key = {ExpectTimeStamp, Id}, delayed = Delayed, format_delayed(
msg = #message{topic = Topic, #delayed_message{
from = From, key = {ExpectTimeStamp, Id},
headers = Headers, delayed = Delayed,
qos = Qos, msg = #message{
timestamp = PublishTimeStamp, topic = Topic,
payload = Payload}}, WithPayload) -> from = From,
headers = Headers,
qos = Qos,
timestamp = PublishTimeStamp,
payload = Payload
}
},
WithPayload
) ->
PublishTime = to_rfc3339(PublishTimeStamp div 1000), PublishTime = to_rfc3339(PublishTimeStamp div 1000),
ExpectTime = to_rfc3339(ExpectTimeStamp), ExpectTime = to_rfc3339(ExpectTimeStamp),
RemainingTime = ExpectTimeStamp - erlang:system_time(second), RemainingTime = ExpectTimeStamp - erlang:system_time(second),
@ -202,7 +215,6 @@ get_delayed_message(Id) ->
get_delayed_message(Node, Id) when Node =:= node() -> get_delayed_message(Node, Id) when Node =:= node() ->
get_delayed_message(Id); get_delayed_message(Id);
get_delayed_message(Node, Id) -> get_delayed_message(Node, Id) ->
emqx_delayed_proto_v1:get_delayed_message(Node, Id). emqx_delayed_proto_v1:get_delayed_message(Node, Id).
@ -249,24 +261,32 @@ init([Opts]) ->
erlang:process_flag(trap_exit, true), erlang:process_flag(trap_exit, true),
emqx_conf:add_handler([delayed], ?MODULE), emqx_conf:add_handler([delayed], ?MODULE),
MaxDelayedMessages = maps:get(max_delayed_messages, Opts, 0), MaxDelayedMessages = maps:get(max_delayed_messages, Opts, 0),
{ok, ensure_stats_event( {ok,
ensure_publish_timer(#{publish_timer => undefined, ensure_stats_event(
publish_at => 0, ensure_publish_timer(#{
stats_timer => undefined, publish_timer => undefined,
stats_fun => undefined, publish_at => 0,
max_delayed_messages => MaxDelayedMessages}))}. stats_timer => undefined,
stats_fun => undefined,
max_delayed_messages => MaxDelayedMessages
})
)}.
handle_call({set_max_delayed_messages, Max}, _From, State) -> handle_call({set_max_delayed_messages, Max}, _From, State) ->
{reply, ok, State#{max_delayed_messages => Max}}; {reply, ok, State#{max_delayed_messages => Max}};
handle_call(
handle_call({store, DelayedMsg = #delayed_message{key = Key}}, {store, DelayedMsg = #delayed_message{key = Key}},
_From, State = #{max_delayed_messages := 0}) -> _From,
State = #{max_delayed_messages := 0}
) ->
ok = mria:dirty_write(?TAB, DelayedMsg), ok = mria:dirty_write(?TAB, DelayedMsg),
emqx_metrics:inc('messages.delayed'), emqx_metrics:inc('messages.delayed'),
{reply, ok, ensure_publish_timer(Key, State)}; {reply, ok, ensure_publish_timer(Key, State)};
handle_call(
handle_call({store, DelayedMsg = #delayed_message{key = Key}}, {store, DelayedMsg = #delayed_message{key = Key}},
_From, State = #{max_delayed_messages := Max}) -> _From,
State = #{max_delayed_messages := Max}
) ->
Size = mnesia:table_info(?TAB, size), Size = mnesia:table_info(?TAB, size),
case Size >= Max of case Size >= Max of
true -> true ->
@ -276,15 +296,12 @@ handle_call({store, DelayedMsg = #delayed_message{key = Key}},
emqx_metrics:inc('messages.delayed'), emqx_metrics:inc('messages.delayed'),
{reply, ok, ensure_publish_timer(Key, State)} {reply, ok, ensure_publish_timer(Key, State)}
end; end;
handle_call(enable, _From, State) -> handle_call(enable, _From, State) ->
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}), emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}),
{reply, ok, State}; {reply, ok, State};
handle_call(disable, _From, State) -> handle_call(disable, _From, State) ->
emqx_hooks:del('message.publish', {?MODULE, on_message_publish}), emqx_hooks:del('message.publish', {?MODULE, on_message_publish}),
{reply, ok, State}; {reply, ok, State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}), ?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}. {reply, ignored, State}.
@ -298,12 +315,10 @@ handle_info({timeout, TRef, do_publish}, State = #{publish_timer := TRef}) ->
DeletedKeys = do_publish(mnesia:dirty_first(?TAB), os:system_time(seconds)), DeletedKeys = do_publish(mnesia:dirty_first(?TAB), os:system_time(seconds)),
lists:foreach(fun(Key) -> mria:dirty_delete(?TAB, Key) end, DeletedKeys), lists:foreach(fun(Key) -> mria:dirty_delete(?TAB, Key) end, DeletedKeys),
{noreply, ensure_publish_timer(State#{publish_timer := undefined, publish_at := 0})}; {noreply, ensure_publish_timer(State#{publish_timer := undefined, publish_at := 0})};
handle_info(stats, State = #{stats_fun := StatsFun}) -> handle_info(stats, State = #{stats_fun := StatsFun}) ->
StatsTimer = erlang:send_after(timer:seconds(1), self(), stats), StatsTimer = erlang:send_after(timer:seconds(1), self(), stats),
StatsFun(delayed_count()), StatsFun(delayed_count()),
{noreply, State#{stats_timer := StatsTimer}, hibernate}; {noreply, State#{stats_timer := StatsTimer}, hibernate};
handle_info(Info, State) -> handle_info(Info, State) ->
?SLOG(error, #{msg => "unexpected_info", info => Info}), ?SLOG(error, #{msg => "unexpected_info", info => Info}),
{noreply, State}. {noreply, State}.
@ -336,8 +351,9 @@ ensure_publish_timer('$end_of_table', State) ->
State#{publish_timer := undefined, publish_at := 0}; State#{publish_timer := undefined, publish_at := 0};
ensure_publish_timer({Ts, _Id}, State = #{publish_timer := undefined}) -> ensure_publish_timer({Ts, _Id}, State = #{publish_timer := undefined}) ->
ensure_publish_timer(Ts, os:system_time(seconds), State); ensure_publish_timer(Ts, os:system_time(seconds), State);
ensure_publish_timer({Ts, _Id}, State = #{publish_timer := TRef, publish_at := PubAt}) ensure_publish_timer({Ts, _Id}, State = #{publish_timer := TRef, publish_at := PubAt}) when
when Ts < PubAt -> Ts < PubAt
->
ok = emqx_misc:cancel_timer(TRef), ok = emqx_misc:cancel_timer(TRef),
ensure_publish_timer(Ts, os:system_time(seconds), State); ensure_publish_timer(Ts, os:system_time(seconds), State);
ensure_publish_timer(_Key, State) -> ensure_publish_timer(_Key, State) ->
@ -359,10 +375,9 @@ do_publish({Ts, _Id}, Now, Acc) when Ts > Now ->
do_publish(Key = {Ts, _Id}, Now, Acc) when Ts =< Now -> do_publish(Key = {Ts, _Id}, Now, Acc) when Ts =< Now ->
case mnesia:dirty_read(?TAB, Key) of case mnesia:dirty_read(?TAB, Key) of
[] -> ok; [] -> ok;
[#delayed_message{msg = Msg}] -> [#delayed_message{msg = Msg}] -> emqx_pool:async_submit(fun emqx:publish/1, [Msg])
emqx_pool:async_submit(fun emqx:publish/1, [Msg])
end, end,
do_publish(mnesia:dirty_next(?TAB, Key), Now, [Key | Acc]). do_publish(mnesia:dirty_next(?TAB, Key), Now, [Key | Acc]).
-spec(delayed_count() -> non_neg_integer()). -spec delayed_count() -> non_neg_integer().
delayed_count() -> mnesia:table_info(?TAB, size). delayed_count() -> mnesia:table_info(?TAB, size).

View File

@ -26,14 +26,17 @@
-define(MAX_PAYLOAD_LENGTH, 2048). -define(MAX_PAYLOAD_LENGTH, 2048).
-define(PAYLOAD_TOO_LARGE, 'PAYLOAD_TOO_LARGE'). -define(PAYLOAD_TOO_LARGE, 'PAYLOAD_TOO_LARGE').
-export([ status/2 -export([
, delayed_messages/2 status/2,
, delayed_message/2 delayed_messages/2,
]). delayed_message/2
]).
-export([ paths/0 -export([
, fields/1 paths/0,
, schema/1]). fields/1,
schema/1
]).
%% for rpc %% for rpc
-export([update_config_/1]). -export([update_config_/1]).
@ -46,15 +49,17 @@
-define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND'). -define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND').
-define(MESSAGE_ID_SCHEMA_ERROR, 'MESSAGE_ID_SCHEMA_ERROR'). -define(MESSAGE_ID_SCHEMA_ERROR, 'MESSAGE_ID_SCHEMA_ERROR').
-define(INVALID_NODE, 'INVALID_NODE'). -define(INVALID_NODE, 'INVALID_NODE').
-define(MAX_PAYLOAD_SIZE, 1048576). %% 1MB = 1024 x 1024 %% 1MB = 1024 x 1024
-define(MAX_PAYLOAD_SIZE, 1048576).
api_spec() -> api_spec() ->
emqx_dashboard_swagger:spec(?MODULE). emqx_dashboard_swagger:spec(?MODULE).
paths() -> paths() ->
[ "/mqtt/delayed" [
, "/mqtt/delayed/messages" "/mqtt/delayed",
, "/mqtt/delayed/messages/:node/:msgid" "/mqtt/delayed/messages",
"/mqtt/delayed/messages/:node/:msgid"
]. ].
schema("/mqtt/delayed") -> schema("/mqtt/delayed") ->
@ -73,47 +78,64 @@ schema("/mqtt/delayed") ->
description => <<"Enable or disable delayed, set max delayed messages">>, description => <<"Enable or disable delayed, set max delayed messages">>,
'requestBody' => ref(emqx_modules_schema, "delayed"), 'requestBody' => ref(emqx_modules_schema, "delayed"),
responses => #{ responses => #{
200 => mk(ref(emqx_modules_schema, "delayed"), 200 => mk(
#{desc => <<"Enable or disable delayed successfully">>}), ref(emqx_modules_schema, "delayed"),
400 => emqx_dashboard_swagger:error_codes( [?BAD_REQUEST] #{desc => <<"Enable or disable delayed successfully">>}
, <<"Max limit illegality">>) ),
400 => emqx_dashboard_swagger:error_codes(
[?BAD_REQUEST],
<<"Max limit illegality">>
)
} }
} }
}; };
schema("/mqtt/delayed/messages/:node/:msgid") -> schema("/mqtt/delayed/messages/:node/:msgid") ->
#{'operationId' => delayed_message, #{
'operationId' => delayed_message,
get => #{ get => #{
tags => ?API_TAG_MQTT, tags => ?API_TAG_MQTT,
description => <<"Get delayed message">>, description => <<"Get delayed message">>,
parameters => [ {node, parameters => [
mk(binary(), {node,
#{in => path, desc => <<"The node where message from">>})} mk(
, {msgid, mk(binary(), #{in => path, desc => <<"Delay message ID">>})} binary(),
], #{in => path, desc => <<"The node where message from">>}
)},
{msgid, mk(binary(), #{in => path, desc => <<"Delay message ID">>})}
],
responses => #{ responses => #{
200 => ref("message_without_payload"), 200 => ref("message_without_payload"),
400 => emqx_dashboard_swagger:error_codes( [?MESSAGE_ID_SCHEMA_ERROR] 400 => emqx_dashboard_swagger:error_codes(
, <<"Bad MsgId format">>), [?MESSAGE_ID_SCHEMA_ERROR],
404 => emqx_dashboard_swagger:error_codes( [?MESSAGE_ID_NOT_FOUND] <<"Bad MsgId format">>
, <<"MsgId not found">>) ),
404 => emqx_dashboard_swagger:error_codes(
[?MESSAGE_ID_NOT_FOUND],
<<"MsgId not found">>
)
} }
}, },
delete => #{ delete => #{
tags => ?API_TAG_MQTT, tags => ?API_TAG_MQTT,
description => <<"Delete delayed message">>, description => <<"Delete delayed message">>,
parameters => [ {node, parameters => [
mk(binary(), {node,
#{in => path, desc => <<"The node where message from">>})} mk(
, {msgid, binary(),
mk(binary(), #{in => path, desc => <<"Delay message ID">>})} #{in => path, desc => <<"The node where message from">>}
], )},
{msgid, mk(binary(), #{in => path, desc => <<"Delay message ID">>})}
],
responses => #{ responses => #{
204 => <<"Delete delayed message success">>, 204 => <<"Delete delayed message success">>,
400 => emqx_dashboard_swagger:error_codes( [?MESSAGE_ID_SCHEMA_ERROR] 400 => emqx_dashboard_swagger:error_codes(
, <<"Bad MsgId format">>), [?MESSAGE_ID_SCHEMA_ERROR],
404 => emqx_dashboard_swagger:error_codes( [?MESSAGE_ID_NOT_FOUND] <<"Bad MsgId format">>
, <<"MsgId not found">>) ),
404 => emqx_dashboard_swagger:error_codes(
[?MESSAGE_ID_NOT_FOUND],
<<"MsgId not found">>
)
} }
} }
}; };
@ -126,44 +148,44 @@ schema("/mqtt/delayed/messages") ->
parameters => [ref(emqx_dashboard_swagger, page), ref(emqx_dashboard_swagger, limit)], parameters => [ref(emqx_dashboard_swagger, page), ref(emqx_dashboard_swagger, limit)],
responses => #{ responses => #{
200 => 200 =>
[ [
{data, mk(hoconsc:array(ref("message")), #{})}, {data, mk(hoconsc:array(ref("message")), #{})},
{meta, [ {meta, [
{page, mk(integer(), #{})}, {page, mk(integer(), #{})},
{limit, mk(integer(), #{})}, {limit, mk(integer(), #{})},
{count, mk(integer(), #{})} {count, mk(integer(), #{})}
]} ]}
] ]
} }
} }
}. }.
fields("message_without_payload") -> fields("message_without_payload") ->
[ [
{msgid, mk(integer(), #{desc => <<"Message Id (MQTT message id hash)">>})}, {msgid, mk(integer(), #{desc => <<"Message Id (MQTT message id hash)">>})},
{node, mk(binary(), #{desc => <<"The node where message from">>})}, {node, mk(binary(), #{desc => <<"The node where message from">>})},
{publish_at, mk(binary(), #{desc => <<"Client publish message time, rfc 3339">>})}, {publish_at, mk(binary(), #{desc => <<"Client publish message time, rfc 3339">>})},
{delayed_interval, mk(integer(), #{desc => <<"Delayed interval, second">>})}, {delayed_interval, mk(integer(), #{desc => <<"Delayed interval, second">>})},
{delayed_remaining, mk(integer(), #{desc => <<"Delayed remaining, second">>})}, {delayed_remaining, mk(integer(), #{desc => <<"Delayed remaining, second">>})},
{expected_at, mk(binary(), #{desc => <<"Expect publish time, rfc 3339">>})}, {expected_at, mk(binary(), #{desc => <<"Expect publish time, rfc 3339">>})},
{topic, mk(binary(), #{desc => <<"Topic">>, example => <<"/sys/#">>})}, {topic, mk(binary(), #{desc => <<"Topic">>, example => <<"/sys/#">>})},
{qos, mk(binary(), #{desc => <<"QoS">>})}, {qos, mk(binary(), #{desc => <<"QoS">>})},
{from_clientid, mk(binary(), #{desc => <<"From ClientId">>})}, {from_clientid, mk(binary(), #{desc => <<"From ClientId">>})},
{from_username, mk(binary(), #{desc => <<"From Username">>})} {from_username, mk(binary(), #{desc => <<"From Username">>})}
]; ];
fields("message") -> fields("message") ->
PayloadDesc = io_lib:format( PayloadDesc = io_lib:format(
"Payload, base64 encode. Payload will be ~p if length large than ~p", "Payload, base64 encode. Payload will be ~p if length large than ~p",
[?PAYLOAD_TOO_LARGE, ?MAX_PAYLOAD_LENGTH]), [?PAYLOAD_TOO_LARGE, ?MAX_PAYLOAD_LENGTH]
),
fields("message_without_payload") ++ fields("message_without_payload") ++
[{payload, mk(binary(), #{desc => iolist_to_binary(PayloadDesc)})}]. [{payload, mk(binary(), #{desc => iolist_to_binary(PayloadDesc)})}].
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% HTTP API %% HTTP API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
status(get, _Params) -> status(get, _Params) ->
{200, get_status()}; {200, get_status()};
status(put, #{body := Body}) -> status(put, #{body := Body}) ->
update_config(Body). update_config(Body).
@ -173,34 +195,37 @@ delayed_messages(get, #{query_string := Qs}) ->
delayed_message(get, #{bindings := #{node := NodeBin, msgid := HexId}}) -> delayed_message(get, #{bindings := #{node := NodeBin, msgid := HexId}}) ->
MaybeNode = make_maybe(NodeBin, invalid_node, fun erlang:binary_to_atom/1), MaybeNode = make_maybe(NodeBin, invalid_node, fun erlang:binary_to_atom/1),
MaybeId = make_maybe(HexId, id_schema_error, fun emqx_guid:from_hexstr/1), MaybeId = make_maybe(HexId, id_schema_error, fun emqx_guid:from_hexstr/1),
with_maybe([MaybeNode, MaybeId], with_maybe(
fun(Node, Id) -> [MaybeNode, MaybeId],
case emqx_delayed:get_delayed_message(Node, Id) of fun(Node, Id) ->
{ok, Message} -> case emqx_delayed:get_delayed_message(Node, Id) of
Payload = maps:get(payload, Message), {ok, Message} ->
case erlang:byte_size(Payload) > ?MAX_PAYLOAD_SIZE of Payload = maps:get(payload, Message),
true -> case erlang:byte_size(Payload) > ?MAX_PAYLOAD_SIZE of
{200, Message}; true ->
_ -> {200, Message};
{200, Message#{payload => base64:encode(Payload)}} _ ->
end; {200, Message#{payload => base64:encode(Payload)}}
{error, not_found} -> end;
{404, generate_http_code_map(not_found, Id)} {error, not_found} ->
end {404, generate_http_code_map(not_found, Id)}
end); end
end
);
delayed_message(delete, #{bindings := #{node := NodeBin, msgid := HexId}}) -> delayed_message(delete, #{bindings := #{node := NodeBin, msgid := HexId}}) ->
MaybeNode = make_maybe(NodeBin, invalid_node, fun erlang:binary_to_atom/1), MaybeNode = make_maybe(NodeBin, invalid_node, fun erlang:binary_to_atom/1),
MaybeId = make_maybe(HexId, id_schema_error, fun emqx_guid:from_hexstr/1), MaybeId = make_maybe(HexId, id_schema_error, fun emqx_guid:from_hexstr/1),
with_maybe([MaybeNode, MaybeId], with_maybe(
fun(Node, Id) -> [MaybeNode, MaybeId],
case emqx_delayed:delete_delayed_message(Node, Id) of fun(Node, Id) ->
ok -> case emqx_delayed:delete_delayed_message(Node, Id) of
{204}; ok ->
{error, not_found} -> {204};
{404, generate_http_code_map(not_found, Id)} {error, not_found} ->
end {404, generate_http_code_map(not_found, Id)}
end). end
end
).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% internal function %% internal function
@ -241,25 +266,36 @@ update_config_(Config) ->
{200, NewDelayed}; {200, NewDelayed};
{error, Reason} -> {error, Reason} ->
Message = list_to_binary( Message = list_to_binary(
io_lib:format("Update config failed ~p", [Reason])), io_lib:format("Update config failed ~p", [Reason])
),
{500, ?INTERNAL_ERROR, Message} {500, ?INTERNAL_ERROR, Message}
end. end.
generate_http_code_map(id_schema_error, Id) -> generate_http_code_map(id_schema_error, Id) ->
#{code => ?MESSAGE_ID_SCHEMA_ERROR, message => #{
iolist_to_binary(io_lib:format("Message ID ~p schema error", [Id]))}; code => ?MESSAGE_ID_SCHEMA_ERROR,
message =>
iolist_to_binary(io_lib:format("Message ID ~p schema error", [Id]))
};
generate_http_code_map(not_found, Id) -> generate_http_code_map(not_found, Id) ->
#{code => ?MESSAGE_ID_NOT_FOUND, message => #{
iolist_to_binary(io_lib:format("Message ID ~p not found", [Id]))}; code => ?MESSAGE_ID_NOT_FOUND,
message =>
iolist_to_binary(io_lib:format("Message ID ~p not found", [Id]))
};
generate_http_code_map(invalid_node, Node) -> generate_http_code_map(invalid_node, Node) ->
#{code => ?INVALID_NODE, message => #{
iolist_to_binary(io_lib:format("The node name ~p is invalid", [Node]))}. code => ?INVALID_NODE,
message =>
iolist_to_binary(io_lib:format("The node name ~p is invalid", [Node]))
}.
make_maybe(X, Error, Fun) -> make_maybe(X, Error, Fun) ->
try Fun(X) of try Fun(X) of
Right -> Right ->
Right Right
catch _:_ -> catch
_:_ ->
{left, X, Error} {left, X, Error}
end. end.

View File

@ -20,22 +20,24 @@
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include("emqx_modules.hrl"). -include("emqx_modules.hrl").
-export([ list/0 -export([
, update/1 list/0,
, enable/0 update/1,
, disable/0 enable/0,
, post_config_update/5 disable/0,
, init_conf_handler/0 post_config_update/5,
]). init_conf_handler/0
]).
-export([ on_client_connected/2 -export([
, on_client_disconnected/3 on_client_connected/2,
, on_client_subscribed/3 on_client_disconnected/3,
, on_client_unsubscribed/3 on_client_subscribed/3,
, on_message_dropped/3 on_client_unsubscribed/3,
, on_message_delivered/2 on_message_dropped/3,
, on_message_acked/2 on_message_delivered/2,
]). on_message_acked/2
]).
-ifdef(TEST). -ifdef(TEST).
-export([reason/1]). -export([reason/1]).
@ -48,9 +50,13 @@ list() ->
emqx_conf:get([event_message], #{}). emqx_conf:get([event_message], #{}).
update(Params) -> update(Params) ->
case emqx_conf:update([event_message], case
Params, emqx_conf:update(
#{rawconf_with_defaults => true, override_to => cluster}) of [event_message],
Params,
#{rawconf_with_defaults => true, override_to => cluster}
)
of
{ok, #{raw_config := NewEventMessage}} -> {ok, #{raw_config := NewEventMessage}} ->
{ok, NewEventMessage}; {ok, NewEventMessage};
{error, Reason} -> {error, Reason} ->
@ -74,46 +80,61 @@ disable() ->
on_client_connected(ClientInfo, ConnInfo) -> on_client_connected(ClientInfo, ConnInfo) ->
Payload0 = common_infos(ClientInfo, ConnInfo), Payload0 = common_infos(ClientInfo, ConnInfo),
Payload = Payload0#{ Payload = Payload0#{
keepalive => maps:get(keepalive, ConnInfo, 0), keepalive => maps:get(keepalive, ConnInfo, 0),
clean_start => maps:get(clean_start, ConnInfo, true), clean_start => maps:get(clean_start, ConnInfo, true),
expiry_interval => maps:get(expiry_interval, ConnInfo, 0) expiry_interval => maps:get(expiry_interval, ConnInfo, 0)
}, },
publish_event_msg(<<"$event/client_connected">>, Payload). publish_event_msg(<<"$event/client_connected">>, Payload).
on_client_disconnected(ClientInfo, on_client_disconnected(
Reason, ConnInfo = #{disconnected_at := DisconnectedAt}) -> ClientInfo,
Reason,
ConnInfo = #{disconnected_at := DisconnectedAt}
) ->
Payload0 = common_infos(ClientInfo, ConnInfo), Payload0 = common_infos(ClientInfo, ConnInfo),
Payload = Payload0#{ Payload = Payload0#{
reason => reason(Reason), reason => reason(Reason),
disconnected_at => DisconnectedAt disconnected_at => DisconnectedAt
}, },
publish_event_msg(<<"$event/client_disconnected">>, Payload). publish_event_msg(<<"$event/client_disconnected">>, Payload).
on_client_subscribed(_ClientInfo = #{clientid := ClientId, on_client_subscribed(
username := Username}, _ClientInfo = #{
Topic, SubOpts) -> clientid := ClientId,
Payload = #{clientid => ClientId, username := Username
username => Username, },
topic => Topic, Topic,
subopts => SubOpts, SubOpts
ts => erlang:system_time(millisecond) ) ->
}, Payload = #{
clientid => ClientId,
username => Username,
topic => Topic,
subopts => SubOpts,
ts => erlang:system_time(millisecond)
},
publish_event_msg(<<"$event/client_subscribed">>, Payload). publish_event_msg(<<"$event/client_subscribed">>, Payload).
on_client_unsubscribed(_ClientInfo = #{clientid := ClientId, on_client_unsubscribed(
username := Username}, _ClientInfo = #{
Topic, _SubOpts) -> clientid := ClientId,
Payload = #{clientid => ClientId, username := Username
username => Username, },
topic => Topic, Topic,
ts => erlang:system_time(millisecond) _SubOpts
}, ) ->
Payload = #{
clientid => ClientId,
username => Username,
topic => Topic,
ts => erlang:system_time(millisecond)
},
publish_event_msg(<<"$event/client_unsubscribed">>, Payload). publish_event_msg(<<"$event/client_unsubscribed">>, Payload).
on_message_dropped(Message = #message{from = ClientId}, _, Reason) -> on_message_dropped(Message = #message{from = ClientId}, _, Reason) ->
case ignore_sys_message(Message) of case ignore_sys_message(Message) of
true -> ok; true ->
ok;
false -> false ->
Payload0 = base_message(Message), Payload0 = base_message(Message),
Payload = Payload0#{ Payload = Payload0#{
@ -126,13 +147,17 @@ on_message_dropped(Message = #message{from = ClientId}, _, Reason) ->
end, end,
{ok, Message}. {ok, Message}.
on_message_delivered(_ClientInfo = #{ on_message_delivered(
peerhost := PeerHost, _ClientInfo = #{
clientid := ReceiverCId, peerhost := PeerHost,
username := ReceiverUsername}, clientid := ReceiverCId,
#message{from = ClientId} = Message) -> username := ReceiverUsername
},
#message{from = ClientId} = Message
) ->
case ignore_sys_message(Message) of case ignore_sys_message(Message) of
true -> ok; true ->
ok;
false -> false ->
Payload0 = base_message(Message), Payload0 = base_message(Message),
Payload = Payload0#{ Payload = Payload0#{
@ -146,13 +171,17 @@ on_message_delivered(_ClientInfo = #{
end, end,
{ok, Message}. {ok, Message}.
on_message_acked(_ClientInfo = #{ on_message_acked(
peerhost := PeerHost, _ClientInfo = #{
clientid := ReceiverCId, peerhost := PeerHost,
username := ReceiverUsername}, clientid := ReceiverCId,
#message{from = ClientId} = Message) -> username := ReceiverUsername
},
#message{from = ClientId} = Message
) ->
case ignore_sys_message(Message) of case ignore_sys_message(Message) of
true -> ok; true ->
ok;
false -> false ->
Payload0 = base_message(Message), Payload0 = base_message(Message),
Payload = Payload0#{ Payload = Payload0#{
@ -170,29 +199,36 @@ on_message_acked(_ClientInfo = #{
%% Helper functions %% Helper functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
common_infos( common_infos(
_ClientInfo = #{clientid := ClientId, _ClientInfo = #{
username := Username, clientid := ClientId,
peerhost := PeerHost, username := Username,
sockport := SockPort peerhost := PeerHost,
}, sockport := SockPort
_ConnInfo = #{proto_name := ProtoName, },
proto_ver := ProtoVer, _ConnInfo = #{
connected_at := ConnectedAt proto_name := ProtoName,
}) -> proto_ver := ProtoVer,
#{clientid => ClientId, connected_at := ConnectedAt
username => Username, }
ipaddress => ntoa(PeerHost), ) ->
sockport => SockPort, #{
proto_name => ProtoName, clientid => ClientId,
proto_ver => ProtoVer, username => Username,
connected_at => ConnectedAt, ipaddress => ntoa(PeerHost),
ts => erlang:system_time(millisecond) sockport => SockPort,
}. proto_name => ProtoName,
proto_ver => ProtoVer,
connected_at => ConnectedAt,
ts => erlang:system_time(millisecond)
}.
make_msg(Topic, Payload) -> make_msg(Topic, Payload) ->
emqx_message:set_flag( emqx_message:set_flag(
sys, emqx_message:make( sys,
?MODULE, 0, Topic, iolist_to_binary(Payload))). emqx_message:make(
?MODULE, 0, Topic, iolist_to_binary(Payload)
)
).
-compile({inline, [reason/1]}). -compile({inline, [reason/1]}).
reason(Reason) when is_atom(Reason) -> Reason; reason(Reason) when is_atom(Reason) -> Reason;
@ -201,26 +237,33 @@ reason({Error, _}) when is_atom(Error) -> Error;
reason(_) -> internal_error. reason(_) -> internal_error.
ntoa(undefined) -> undefined; ntoa(undefined) -> undefined;
ntoa({IpAddr, Port}) -> ntoa({IpAddr, Port}) -> iolist_to_binary([inet:ntoa(IpAddr), ":", integer_to_list(Port)]);
iolist_to_binary([inet:ntoa(IpAddr), ":", integer_to_list(Port)]); ntoa(IpAddr) -> iolist_to_binary(inet:ntoa(IpAddr)).
ntoa(IpAddr) ->
iolist_to_binary(inet:ntoa(IpAddr)).
printable_maps(undefined) -> #{}; printable_maps(undefined) ->
#{};
printable_maps(Headers) -> printable_maps(Headers) ->
maps:fold( maps:fold(
fun (K, V0, AccIn) when K =:= peerhost; K =:= peername; K =:= sockname -> fun
(K, V0, AccIn) when K =:= peerhost; K =:= peername; K =:= sockname ->
AccIn#{K => ntoa(V0)}; AccIn#{K => ntoa(V0)};
('User-Property', V0, AccIn) when is_list(V0) -> ('User-Property', V0, AccIn) when is_list(V0) ->
AccIn#{ AccIn#{
'User-Property' => maps:from_list(V0), 'User-Property' => maps:from_list(V0),
'User-Property-Pairs' => [#{ 'User-Property-Pairs' => [
key => Key, #{
value => Value key => Key,
} || {Key, Value} <- V0] value => Value
}
|| {Key, Value} <- V0
]
}; };
(K, V0, AccIn) -> AccIn#{K => V0} (K, V0, AccIn) ->
end, #{}, Headers). AccIn#{K => V0}
end,
#{},
Headers
).
base_message(Message) -> base_message(Message) ->
#message{ #message{
@ -230,7 +273,8 @@ base_message(Message) ->
topic = Topic, topic = Topic,
headers = Headers, headers = Headers,
payload = Payload, payload = Payload,
timestamp = Timestamp} = Message, timestamp = Timestamp
} = Message,
#{ #{
id => emqx_guid:to_hexstr(Id), id => emqx_guid:to_hexstr(Id),
payload => Payload, payload => Payload,

View File

@ -21,10 +21,11 @@
-import(hoconsc, [mk/2, ref/2]). -import(hoconsc, [mk/2, ref/2]).
-export([ api_spec/0 -export([
, paths/0 api_spec/0,
, schema/1 paths/0,
]). schema/1
]).
-export([event_message/2]). -export([event_message/2]).
@ -35,28 +36,30 @@ paths() ->
["/mqtt/event_message"]. ["/mqtt/event_message"].
schema("/mqtt/event_message") -> schema("/mqtt/event_message") ->
#{ 'operationId' => event_message #{
, get => 'operationId' => event_message,
#{ description => <<"Event Message">> get =>
, tags => ?API_TAG_MQTT #{
, responses => description => <<"Event Message">>,
#{200 => status_schema(<<"Get Event Message config successfully">>)} tags => ?API_TAG_MQTT,
responses =>
#{200 => status_schema(<<"Get Event Message config successfully">>)}
},
put =>
#{
description => <<"Update Event Message">>,
tags => ?API_TAG_MQTT,
'requestBody' => status_schema(<<"Update Event Message config">>),
responses =>
#{200 => status_schema(<<"Update Event Message config successfully">>)}
} }
, put => }.
#{ description => <<"Update Event Message">>
, tags => ?API_TAG_MQTT
, 'requestBody' => status_schema(<<"Update Event Message config">>)
, responses =>
#{200 => status_schema(<<"Update Event Message config successfully">>)}
}
}.
status_schema(Desc) -> status_schema(Desc) ->
mk(ref(?API_SCHEMA_MODULE, "event_message"), #{in => body, desc => Desc}). mk(ref(?API_SCHEMA_MODULE, "event_message"), #{in => body, desc => Desc}).
event_message(get, _Params) -> event_message(get, _Params) ->
{200, emqx_event_message:list()}; {200, emqx_event_message:list()};
event_message(put, #{body := Body}) -> event_message(put, #{body := Body}) ->
case emqx_event_message:update(Body) of case emqx_event_message:update(Body) of
{ok, NewConfig} -> {ok, NewConfig} ->

View File

@ -1,10 +1,10 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_modules, {application, emqx_modules, [
[{description, "EMQX Modules"}, {description, "EMQX Modules"},
{vsn, "5.0.0"}, {vsn, "5.0.0"},
{modules, []}, {modules, []},
{applications, [kernel,stdlib,emqx]}, {applications, [kernel, stdlib, emqx]},
{mod, {emqx_modules_app, []}}, {mod, {emqx_modules_app, []}},
{registered, [emqx_modules_sup]}, {registered, [emqx_modules_sup]},
{env, []} {env, []}
]}. ]}.

View File

@ -18,9 +18,10 @@
-behaviour(application). -behaviour(application).
-export([ start/2 -export([
, stop/1 start/2,
]). stop/1
]).
start(_Type, _Args) -> start(_Type, _Args) ->
{ok, Sup} = emqx_modules_sup:start_link(), {ok, Sup} = emqx_modules_sup:start_link(),

View File

@ -20,20 +20,23 @@
-behaviour(emqx_config_handler). -behaviour(emqx_config_handler).
%% Load/Unload %% Load/Unload
-export([ load/0 -export([
, unload/0 load/0,
]). unload/0
]).
%% topci-metrics %% topci-metrics
-export([ topic_metrics/0 -export([
, add_topic_metrics/1 topic_metrics/0,
, remove_topic_metrics/1 add_topic_metrics/1,
]). remove_topic_metrics/1
]).
%% config handlers %% config handlers
-export([ pre_config_update/3 -export([
, post_config_update/5 pre_config_update/3,
]). post_config_update/5
]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Load/Unload %% Load/Unload
@ -52,22 +55,22 @@ unload() ->
-spec topic_metrics() -> [emqx_types:topic()]. -spec topic_metrics() -> [emqx_types:topic()].
topic_metrics() -> topic_metrics() ->
lists:map( lists:map(
fun(#{topic := Topic}) -> Topic end, fun(#{topic := Topic}) -> Topic end,
emqx:get_config([topic_metrics]) emqx:get_config([topic_metrics])
). ).
-spec add_topic_metrics(emqx_types:topic()) -spec add_topic_metrics(emqx_types:topic()) ->
-> {ok, emqx_types:topic()} {ok, emqx_types:topic()}
| {error, term()}. | {error, term()}.
add_topic_metrics(Topic) -> add_topic_metrics(Topic) ->
case cfg_update(topic_metrics, ?FUNCTION_NAME, Topic) of case cfg_update(topic_metrics, ?FUNCTION_NAME, Topic) of
{ok, _} -> {ok, Topic}; {ok, _} -> {ok, Topic};
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}
end. end.
-spec remove_topic_metrics(emqx_types:topic()) -spec remove_topic_metrics(emqx_types:topic()) ->
-> ok ok
| {error, term()}. | {error, term()}.
remove_topic_metrics(Topic) -> remove_topic_metrics(Topic) ->
case cfg_update(topic_metrics, ?FUNCTION_NAME, Topic) of case cfg_update(topic_metrics, ?FUNCTION_NAME, Topic) of
{ok, _} -> ok; {ok, _} -> ok;
@ -75,10 +78,13 @@ remove_topic_metrics(Topic) ->
end. end.
cfg_update(topic_metrics, Action, Params) -> cfg_update(topic_metrics, Action, Params) ->
res(emqx_conf:update( res(
[topic_metrics], emqx_conf:update(
{Action, Params}, [topic_metrics],
#{override_to => cluster})). {Action, Params},
#{override_to => cluster}
)
).
res({ok, Result}) -> {ok, Result}; res({ok, Result}) -> {ok, Result};
res({error, {pre_config_update, ?MODULE, Reason}}) -> {error, Reason}; res({error, {pre_config_update, ?MODULE, Reason}}) -> {error, Reason};
@ -89,9 +95,11 @@ res({error, Reason}) -> {error, Reason}.
%% Config Handler %% Config Handler
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec pre_config_update(list(atom()), -spec pre_config_update(
emqx_config:update_request(), list(atom()),
emqx_config:raw_config()) -> emqx_config:update_request(),
emqx_config:raw_config()
) ->
{ok, emqx_config:update_request()} | {error, term()}. {ok, emqx_config:update_request()} | {error, term()}.
pre_config_update(_, {add_topic_metrics, Topic0}, RawConf) -> pre_config_update(_, {add_topic_metrics, Topic0}, RawConf) ->
Topic = #{<<"topic">> => Topic0}, Topic = #{<<"topic">> => Topic0},
@ -110,21 +118,33 @@ pre_config_update(_, {remove_topic_metrics, Topic0}, RawConf) ->
{error, not_found} {error, not_found}
end. end.
-spec post_config_update(list(atom()), -spec post_config_update(
emqx_config:update_request(), list(atom()),
emqx_config:config(), emqx_config:update_request(),
emqx_config:config(), emqx_config:app_envs()) emqx_config:config(),
-> ok | {ok, Result::any()} | {error, Reason::term()}. emqx_config:config(),
emqx_config:app_envs()
) ->
ok | {ok, Result :: any()} | {error, Reason :: term()}.
post_config_update(_, {add_topic_metrics, Topic}, post_config_update(
_NewConfig, _OldConfig, _AppEnvs) -> _,
{add_topic_metrics, Topic},
_NewConfig,
_OldConfig,
_AppEnvs
) ->
case emqx_topic_metrics:register(Topic) of case emqx_topic_metrics:register(Topic) of
ok -> ok; ok -> ok;
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}
end; end;
post_config_update(
post_config_update(_, {remove_topic_metrics, Topic}, _,
_NewConfig, _OldConfig, _AppEnvs) -> {remove_topic_metrics, Topic},
_NewConfig,
_OldConfig,
_AppEnvs
) ->
case emqx_topic_metrics:deregister(Topic) of case emqx_topic_metrics:deregister(Topic) of
ok -> ok; ok -> ok;
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}

View File

@ -20,82 +20,121 @@
-behaviour(hocon_schema). -behaviour(hocon_schema).
-export([ namespace/0 -export([
, roots/0 namespace/0,
, fields/1]). roots/0,
fields/1
]).
namespace() -> modules. namespace() -> modules.
roots() -> roots() ->
["delayed", [
"telemetry", "delayed",
"event_message", "telemetry",
array("rewrite"), "event_message",
array("topic_metrics")]. array("rewrite"),
array("topic_metrics")
].
fields("telemetry") -> fields("telemetry") ->
[ {enable, hoconsc:mk(boolean(), #{default => false})} [{enable, hoconsc:mk(boolean(), #{default => false})}];
];
fields("delayed") -> fields("delayed") ->
[ {enable, hoconsc:mk(boolean(), #{default => false})} [
, {max_delayed_messages, sc(integer(), #{})} {enable, hoconsc:mk(boolean(), #{default => false})},
{max_delayed_messages, sc(integer(), #{})}
]; ];
fields("rewrite") -> fields("rewrite") ->
[ { action [
, sc( hoconsc:enum([subscribe, publish, all]) {action,
, #{desc => <<"Action">>, example => publish})} sc(
, { source_topic hoconsc:enum([subscribe, publish, all]),
, sc( binary() #{desc => <<"Action">>, example => publish}
, #{desc => <<"Origin Topic">>, example => "x/#"})} )},
, { dest_topic {source_topic,
, sc( binary() sc(
, #{desc => <<"Destination Topic">>, example => "z/y/$1"})} binary(),
, { re, fun regular_expression/1 } #{desc => <<"Origin Topic">>, example => "x/#"}
)},
{dest_topic,
sc(
binary(),
#{desc => <<"Destination Topic">>, example => "z/y/$1"}
)},
{re, fun regular_expression/1}
]; ];
fields("event_message") -> fields("event_message") ->
Fields = Fields =
[ { client_connected [
, sc( boolean() {client_connected,
, #{desc => <<"Enable/disable client_connected event messages">>, sc(
default => false})} boolean(),
, { client_disconnected #{
, sc(boolean() desc => <<"Enable/disable client_connected event messages">>,
, #{desc => <<"Enable/disable client_disconnected event messages">>, default => false
default => false})} }
, { client_subscribed )},
, sc( boolean() {client_disconnected,
, #{desc => <<"Enable/disable client_subscribed event messages">>, sc(
default => false})} boolean(),
, { client_unsubscribed #{
, sc( boolean() desc => <<"Enable/disable client_disconnected event messages">>,
, #{desc => <<"Enable/disable client_unsubscribed event messages">>, default => false
default => false})} }
, { message_delivered )},
, sc( boolean() {client_subscribed,
, #{desc => <<"Enable/disable message_delivered event messages">>, sc(
default => false})} boolean(),
, { message_acked #{
, sc( boolean() desc => <<"Enable/disable client_subscribed event messages">>,
, #{desc => <<"Enable/disable message_acked event messages">>, default => false
default => false})} }
, { message_dropped )},
, sc( boolean() {client_unsubscribed,
, #{desc => <<"Enable/disable message_dropped event messages">>, sc(
default => false})} boolean(),
#{
desc => <<"Enable/disable client_unsubscribed event messages">>,
default => false
}
)},
{message_delivered,
sc(
boolean(),
#{
desc => <<"Enable/disable message_delivered event messages">>,
default => false
}
)},
{message_acked,
sc(
boolean(),
#{
desc => <<"Enable/disable message_acked event messages">>,
default => false
}
)},
{message_dropped,
sc(
boolean(),
#{
desc => <<"Enable/disable message_dropped event messages">>,
default => false
}
)}
], ],
#{fields => Fields, #{
desc => """ fields => Fields,
Enable/Disable system event messages. desc =>
The messages are published to '$event' prefixed topics. ""
For example, if `client_disconnected` is set to `true`, "\n"
a message is published to `$event/client_connected` topic "Enable/Disable system event messages.\n"
whenever a client is connected. "The messages are published to '$event' prefixed topics.\n"
"""}; "For example, if `client_disconnected` is set to `true`,\n"
"a message is published to `$event/client_connected` topic\n"
"whenever a client is connected.\n"
""
};
fields("topic_metrics") -> fields("topic_metrics") ->
[{topic, sc(binary(), #{})}]. [{topic, sc(binary(), #{})}].

View File

@ -23,12 +23,14 @@
-export([init/1]). -export([init/1]).
%% Helper macro for declaring children of supervisor %% Helper macro for declaring children of supervisor
-define(CHILD(Mod), #{id => Mod, -define(CHILD(Mod), #{
start => {Mod, start_link, []}, id => Mod,
restart => permanent, start => {Mod, start_link, []},
shutdown => 5000, restart => permanent,
type => worker, shutdown => 5000,
modules => [Mod]}). type => worker,
modules => [Mod]
}).
start_link() -> start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
@ -38,8 +40,10 @@ start_link() ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([]) -> init([]) ->
emqx_event_message:init_conf_handler(), emqx_event_message:init_conf_handler(),
{ok, {{one_for_one, 10, 3600}, {ok,
[ ?CHILD(emqx_telemetry) {{one_for_one, 10, 3600}, [
, ?CHILD(emqx_topic_metrics) ?CHILD(emqx_telemetry),
, ?CHILD(emqx_trace) ?CHILD(emqx_topic_metrics),
, ?CHILD(emqx_delayed)]}}. ?CHILD(emqx_trace),
?CHILD(emqx_delayed)
]}}.

View File

@ -16,13 +16,13 @@
-module(emqx_observer_cli). -module(emqx_observer_cli).
-export([ enable/0 -export([
, disable/0 enable/0,
]). disable/0
]).
-export([cmd/1]). -export([cmd/1]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% enable/disable %% enable/disable
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -34,20 +34,19 @@ disable() ->
cmd(["status"]) -> cmd(["status"]) ->
observer_cli:start(); observer_cli:start();
cmd(["bin_leak"]) -> cmd(["bin_leak"]) ->
[emqx_ctl:print("~p~n", [Row]) || Row <- recon:bin_leak(100)]; [emqx_ctl:print("~p~n", [Row]) || Row <- recon:bin_leak(100)];
cmd(["load", Mod]) -> cmd(["load", Mod]) ->
Module = list_to_existing_atom(Mod), Module = list_to_existing_atom(Mod),
Nodes = nodes(), Nodes = nodes(),
Res = remote_load(Nodes, Module), Res = remote_load(Nodes, Module),
emqx_ctl:print("Loaded ~p module on ~p on ~n", [Mod, Nodes, Res]); emqx_ctl:print("Loaded ~p module on ~p on ~n", [Mod, Nodes, Res]);
cmd(_) -> cmd(_) ->
emqx_ctl:usage([{"observer status", "observer_cli:start()"}, emqx_ctl:usage([
{"observer bin_leak", "recon:bin_leak(100)"}, {"observer status", "observer_cli:start()"},
{"observer load Mod", "recon:remote_load(Mod) to all nodes"}]). {"observer bin_leak", "recon:bin_leak(100)"},
{"observer load Mod", "recon:remote_load(Mod) to all nodes"}
]).
%% recon:remote_load/1 has a bug, when nodes() returns [], it is %% recon:remote_load/1 has a bug, when nodes() returns [], it is
%% taken by recon as a node name. %% taken by recon as a node name.

View File

@ -21,25 +21,29 @@
-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
-ifdef(TEST). -ifdef(TEST).
-export([ compile/1 -export([
, match_and_rewrite/2 compile/1,
]). match_and_rewrite/2
]).
-endif. -endif.
%% APIs %% APIs
-export([ rewrite_subscribe/4 -export([
, rewrite_unsubscribe/4 rewrite_subscribe/4,
, rewrite_publish/2 rewrite_unsubscribe/4,
]). rewrite_publish/2
]).
-export([ enable/0 -export([
, disable/0 enable/0,
]). disable/0
]).
-export([ list/0 -export([
, update/1 list/0,
, post_config_update/5 update/1,
]). post_config_update/5
]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Load/Unload %% Load/Unload
@ -65,14 +69,16 @@ update(Rules0) ->
post_config_update(_KeyPath, _Config, Rules, _OldConf, _AppEnvs) -> post_config_update(_KeyPath, _Config, Rules, _OldConf, _AppEnvs) ->
register_hook(Rules). register_hook(Rules).
register_hook([]) -> unregister_hook(); register_hook([]) ->
unregister_hook();
register_hook(Rules) -> register_hook(Rules) ->
{PubRules, SubRules, ErrRules} = compile(Rules), {PubRules, SubRules, ErrRules} = compile(Rules),
emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}), emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}),
emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}), emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}),
emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}), emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}),
case ErrRules of case ErrRules of
[] -> ok; [] ->
ok;
_ -> _ ->
?SLOG(error, #{rewrite_rule_re_complie_failed => ErrRules}), ?SLOG(error, #{rewrite_rule_re_complie_failed => ErrRules}),
{error, ErrRules} {error, ErrRules}
@ -96,39 +102,54 @@ rewrite_publish(Message = #message{topic = Topic}, Rules) ->
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
compile(Rules) -> compile(Rules) ->
lists:foldl(fun(Rule, {Publish, Subscribe, Error}) -> lists:foldl(
#{source_topic := Topic, re := Re, dest_topic := Dest, action := Action} = Rule, fun(Rule, {Publish, Subscribe, Error}) ->
case re:compile(Re) of #{source_topic := Topic, re := Re, dest_topic := Dest, action := Action} = Rule,
{ok, MP} -> case re:compile(Re) of
case Action of {ok, MP} ->
publish -> case Action of
{[{Topic, MP, Dest} | Publish], Subscribe, Error}; publish ->
subscribe -> {[{Topic, MP, Dest} | Publish], Subscribe, Error};
{Publish, [{Topic, MP, Dest} | Subscribe], Error}; subscribe ->
all -> {Publish, [{Topic, MP, Dest} | Subscribe], Error};
{[{Topic, MP, Dest} | Publish], [{Topic, MP, Dest} | Subscribe], Error} all ->
end; {[{Topic, MP, Dest} | Publish], [{Topic, MP, Dest} | Subscribe], Error}
{error, ErrSpec} -> end;
{Publish, Subscribe, [{Topic, Re, Dest, ErrSpec}]} {error, ErrSpec} ->
end end, {[], [], []}, Rules). {Publish, Subscribe, [{Topic, Re, Dest, ErrSpec}]}
end
end,
{[], [], []},
Rules
).
match_and_rewrite(Topic, []) -> match_and_rewrite(Topic, []) ->
Topic; Topic;
match_and_rewrite(Topic, [{Filter, MP, Dest} | Rules]) -> match_and_rewrite(Topic, [{Filter, MP, Dest} | Rules]) ->
case emqx_topic:match(Topic, Filter) of case emqx_topic:match(Topic, Filter) of
true -> rewrite(Topic, MP, Dest); true -> rewrite(Topic, MP, Dest);
false -> match_and_rewrite(Topic, Rules) false -> match_and_rewrite(Topic, Rules)
end. end.
rewrite(Topic, MP, Dest) -> rewrite(Topic, MP, Dest) ->
case re:run(Topic, MP, [{capture, all_but_first, list}]) of case re:run(Topic, MP, [{capture, all_but_first, list}]) of
{match, Captured} -> {match, Captured} ->
Vars = lists:zip(["\\$" ++ integer_to_list(I) Vars = lists:zip(
|| I <- lists:seq(1, length(Captured))], Captured), [
iolist_to_binary(lists:foldl( "\\$" ++ integer_to_list(I)
|| I <- lists:seq(1, length(Captured))
],
Captured
),
iolist_to_binary(
lists:foldl(
fun({Var, Val}, Acc) -> fun({Var, Val}, Acc) ->
re:replace(Acc, Var, Val, [global]) re:replace(Acc, Var, Val, [global])
end, Dest, Vars)); end,
nomatch -> Topic Dest,
Vars
)
);
nomatch ->
Topic
end. end.

View File

@ -40,27 +40,36 @@ schema("/mqtt/topic_rewrite") ->
tags => ?API_TAG_MQTT, tags => ?API_TAG_MQTT,
description => <<"List rewrite topic.">>, description => <<"List rewrite topic.">>,
responses => #{ responses => #{
200 => hoconsc:mk(hoconsc:array(hoconsc:ref(emqx_modules_schema, "rewrite")), 200 => hoconsc:mk(
#{desc => <<"List all rewrite rules">>}) hoconsc:array(hoconsc:ref(emqx_modules_schema, "rewrite")),
#{desc => <<"List all rewrite rules">>}
)
} }
}, },
put => #{ put => #{
description => <<"Update rewrite topic">>, description => <<"Update rewrite topic">>,
tags => ?API_TAG_MQTT, tags => ?API_TAG_MQTT,
'requestBody' => hoconsc:mk(hoconsc:array( 'requestBody' => hoconsc:mk(
hoconsc:ref(emqx_modules_schema, "rewrite")),#{}), hoconsc:array(
hoconsc:ref(emqx_modules_schema, "rewrite")
),
#{}
),
responses => #{ responses => #{
200 => hoconsc:mk(hoconsc:array(hoconsc:ref(emqx_modules_schema, "rewrite")), 200 => hoconsc:mk(
#{desc => <<"Update rewrite topic success.">>}), hoconsc:array(hoconsc:ref(emqx_modules_schema, "rewrite")),
413 => emqx_dashboard_swagger:error_codes([?EXCEED_LIMIT], #{desc => <<"Update rewrite topic success.">>}
<<"Rules count exceed max limit">>) ),
413 => emqx_dashboard_swagger:error_codes(
[?EXCEED_LIMIT],
<<"Rules count exceed max limit">>
)
} }
} }
}. }.
topic_rewrite(get, _Params) -> topic_rewrite(get, _Params) ->
{200, emqx_rewrite:list()}; {200, emqx_rewrite:list()};
topic_rewrite(put, #{body := Body}) -> topic_rewrite(put, #{body := Body}) ->
case length(Body) < ?MAX_RULES_LIMIT of case length(Body) < ?MAX_RULES_LIMIT of
true -> true ->
@ -68,6 +77,7 @@ topic_rewrite(put, #{body := Body}) ->
{200, emqx_rewrite:list()}; {200, emqx_rewrite:list()};
_ -> _ ->
Message = iolist_to_binary( Message = iolist_to_binary(
io_lib:format("Max rewrite rules count is ~p", [?MAX_RULES_LIMIT])), io_lib:format("Max rewrite rules count is ~p", [?MAX_RULES_LIMIT])
),
{413, #{code => ?EXCEED_LIMIT, message => Message}} {413, #{code => ?EXCEED_LIMIT, message => Message}}
end. end.

View File

@ -26,28 +26,32 @@
-include("emqx_modules.hrl"). -include("emqx_modules.hrl").
-export([ start_link/0 -export([
, stop/0 start_link/0,
]). stop/0
]).
%% gen_server callbacks %% gen_server callbacks
-export([ init/1 -export([
, handle_call/3 init/1,
, handle_cast/2 handle_call/3,
, handle_continue/2 handle_cast/2,
, handle_info/2 handle_continue/2,
, terminate/2 handle_info/2,
, code_change/3 terminate/2,
]). code_change/3
]).
-export([ enable/0 -export([
, disable/0 enable/0,
]). disable/0
]).
-export([ get_uuid/0 -export([
, get_telemetry/0 get_uuid/0,
, get_status/0 get_telemetry/0,
]). get_status/0
]).
-export([official_version/1]). -export([official_version/1]).
@ -59,9 +63,10 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-endif. -endif.
-import(proplists, [ get_value/2 -import(proplists, [
, get_value/3 get_value/2,
]). get_value/3
]).
-record(telemetry, { -record(telemetry, {
id :: non_neg_integer(), id :: non_neg_integer(),
@ -88,12 +93,16 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
start_link() -> start_link() ->
ok = mria:create_table(?TELEMETRY, ok = mria:create_table(
[{type, set}, ?TELEMETRY,
{storage, disc_copies}, [
{local_content, true}, {type, set},
{record_name, telemetry}, {storage, disc_copies},
{attributes, record_info(fields, telemetry)}]), {local_content, true},
{record_name, telemetry},
{attributes, record_info(fields, telemetry)}
]
),
_ = mria:wait_for_tables([?TELEMETRY]), _ = mria:wait_for_tables([?TELEMETRY]),
Opts = emqx:get_config([telemetry], #{}), Opts = emqx:get_config([telemetry], #{}),
gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
@ -127,18 +136,23 @@ get_telemetry() ->
%% is very small, it should be safe to ignore. %% is very small, it should be safe to ignore.
-dialyzer([{nowarn_function, [init/1]}]). -dialyzer([{nowarn_function, [init/1]}]).
init(_Opts) -> init(_Opts) ->
UUID1 = case mnesia:dirty_read(?TELEMETRY, ?UNIQUE_ID) of UUID1 =
[] -> case mnesia:dirty_read(?TELEMETRY, ?UNIQUE_ID) of
UUID = generate_uuid(), [] ->
mria:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID, UUID = generate_uuid(),
uuid = UUID}), mria:dirty_write(?TELEMETRY, #telemetry{
UUID; id = ?UNIQUE_ID,
[#telemetry{uuid = UUID} | _] -> uuid = UUID
UUID }),
end, UUID;
{ok, #state{url = ?TELEMETRY_URL, [#telemetry{uuid = UUID} | _] ->
report_interval = timer:seconds(?REPORT_INTERVAL), UUID
uuid = UUID1}}. end,
{ok, #state{
url = ?TELEMETRY_URL,
report_interval = timer:seconds(?REPORT_INTERVAL),
uuid = UUID1
}}.
handle_call(enable, _From, State) -> handle_call(enable, _From, State) ->
case ?MODULE:official_version(emqx_app:get_release()) of case ?MODULE:official_version(emqx_app:get_release()) of
@ -148,7 +162,6 @@ handle_call(enable, _From, State) ->
false -> false ->
{reply, {error, not_official_version}, State} {reply, {error, not_official_version}, State}
end; end;
handle_call(disable, _From, State = #state{timer = Timer}) -> handle_call(disable, _From, State = #state{timer = Timer}) ->
case ?MODULE:official_version(emqx_app:get_release()) of case ?MODULE:official_version(emqx_app:get_release()) of
true -> true ->
@ -157,13 +170,10 @@ handle_call(disable, _From, State = #state{timer = Timer}) ->
false -> false ->
{reply, {error, not_official_version}, State} {reply, {error, not_official_version}, State}
end; end;
handle_call(get_uuid, _From, State = #state{uuid = UUID}) -> handle_call(get_uuid, _From, State = #state{uuid = UUID}) ->
{reply, {ok, UUID}, State}; {reply, {ok, UUID}, State};
handle_call(get_telemetry, _From, State) -> handle_call(get_telemetry, _From, State) ->
{reply, {ok, get_telemetry(State)}, State}; {reply, {ok, get_telemetry(State)}, State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}), ?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}. {reply, ignored, State}.
@ -182,7 +192,6 @@ handle_info({timeout, TRef, time_to_report_telemetry_data}, State = #state{timer
false -> ok false -> ok
end, end,
{noreply, ensure_report_timer(State)}; {noreply, ensure_report_timer(State)};
handle_info(Info, State) -> handle_info(Info, State) ->
?SLOG(error, #{msg => "unexpected_info", info => Info}), ?SLOG(error, #{msg => "unexpected_info", info => Info}),
{noreply, State}. {noreply, State}.
@ -206,22 +215,35 @@ ensure_report_timer(State = #state{report_interval = ReportInterval}) ->
os_info() -> os_info() ->
case erlang:system_info(os_type) of case erlang:system_info(os_type) of
{unix,darwin} -> {unix, darwin} ->
[Name | _] = string:tokens(os:cmd("sw_vers -productName"), "\n"), [Name | _] = string:tokens(os:cmd("sw_vers -productName"), "\n"),
[Version | _] = string:tokens(os:cmd("sw_vers -productVersion"), "\n"), [Version | _] = string:tokens(os:cmd("sw_vers -productVersion"), "\n"),
[{os_name, Name}, [
{os_version, Version}]; {os_name, Name},
{os_version, Version}
];
{unix, _} -> {unix, _} ->
case file:read_file("/etc/os-release") of case file:read_file("/etc/os-release") of
{error, _} -> {error, _} ->
[{os_name, "Unknown"}, [
{os_version, "Unknown"}]; {os_name, "Unknown"},
{os_version, "Unknown"}
];
{ok, FileContent} -> {ok, FileContent} ->
OSInfo = parse_os_release(FileContent), OSInfo = parse_os_release(FileContent),
[{os_name, get_value("NAME", OSInfo)}, [
{os_version, get_value("VERSION", OSInfo, {os_name, get_value("NAME", OSInfo)},
get_value("VERSION_ID", OSInfo, {os_version,
get_value("PRETTY_NAME", OSInfo)))}] get_value(
"VERSION",
OSInfo,
get_value(
"VERSION_ID",
OSInfo,
get_value("PRETTY_NAME", OSInfo)
)
)}
]
end; end;
{win32, nt} -> {win32, nt} ->
Ver = os:cmd("ver"), Ver = os:cmd("ver"),
@ -231,11 +253,15 @@ os_info() ->
{match, [Version]} = {match, [Version]} =
re:run(NVer, "([0-9]+[\.])+[0-9]+", [{capture, first, list}]), re:run(NVer, "([0-9]+[\.])+[0-9]+", [{capture, first, list}]),
[Name | _] = string:split(NVer, " [Version "), [Name | _] = string:split(NVer, " [Version "),
[{os_name, Name}, [
{os_version, Version}]; {os_name, Name},
{os_version, Version}
];
nomatch -> nomatch ->
[{os_name, "Unknown"}, [
{os_version, "Unknown"}] {os_name, "Unknown"},
{os_version, "Unknown"}
]
end end
end. end.
@ -247,22 +273,30 @@ uptime() ->
nodes_uuid() -> nodes_uuid() ->
Nodes = lists:delete(node(), mria_mnesia:running_nodes()), Nodes = lists:delete(node(), mria_mnesia:running_nodes()),
lists:foldl(fun(Node, Acc) -> lists:foldl(
case emqx_telemetry_proto_v1:get_uuid(Node) of fun(Node, Acc) ->
{badrpc, _Reason} -> case emqx_telemetry_proto_v1:get_uuid(Node) of
Acc; {badrpc, _Reason} ->
{ok, UUID} -> Acc;
[UUID | Acc] {ok, UUID} ->
end [UUID | Acc]
end, [], Nodes). end
end,
[],
Nodes
).
active_plugins() -> active_plugins() ->
lists:foldl(fun(#plugin{name = Name, active = Active}, Acc) -> lists:foldl(
case Active of fun(#plugin{name = Name, active = Active}, Acc) ->
true -> [Name | Acc]; case Active of
false -> Acc true -> [Name | Acc];
end false -> Acc
end, [], emqx_plugins:list()). end
end,
[],
emqx_plugins:list()
).
num_clients() -> num_clients() ->
emqx_stats:getstat('connections.max'). emqx_stats:getstat('connections.max').
@ -282,25 +316,31 @@ generate_uuid() ->
<<NTimeHigh:16>> = <<16#01:4, TimeHigh:12>>, <<NTimeHigh:16>> = <<16#01:4, TimeHigh:12>>,
<<NClockSeq:16>> = <<1:1, 0:1, ClockSeq:14>>, <<NClockSeq:16>> = <<1:1, 0:1, ClockSeq:14>>,
<<Node:48>> = <<First:7, 1:1, Last:40>>, <<Node:48>> = <<First:7, 1:1, Last:40>>,
list_to_binary(io_lib:format( "~.16B-~.16B-~.16B-~.16B-~.16B" list_to_binary(
, [TimeLow, TimeMid, NTimeHigh, NClockSeq, Node])). io_lib:format(
"~.16B-~.16B-~.16B-~.16B-~.16B",
[TimeLow, TimeMid, NTimeHigh, NClockSeq, Node]
)
).
get_telemetry(#state{uuid = UUID}) -> get_telemetry(#state{uuid = UUID}) ->
OSInfo = os_info(), OSInfo = os_info(),
[{emqx_version, bin(emqx_app:get_release())}, [
{license, [{edition, <<"community">>}]}, {emqx_version, bin(emqx_app:get_release())},
{os_name, bin(get_value(os_name, OSInfo))}, {license, [{edition, <<"community">>}]},
{os_version, bin(get_value(os_version, OSInfo))}, {os_name, bin(get_value(os_name, OSInfo))},
{otp_version, bin(otp_version())}, {os_version, bin(get_value(os_version, OSInfo))},
{up_time, uptime()}, {otp_version, bin(otp_version())},
{uuid, UUID}, {up_time, uptime()},
{nodes_uuid, nodes_uuid()}, {uuid, UUID},
{active_plugins, active_plugins()}, {nodes_uuid, nodes_uuid()},
{num_clients, num_clients()}, {active_plugins, active_plugins()},
{messages_received, messages_received()}, {num_clients, num_clients()},
{messages_sent, messages_sent()}, {messages_received, messages_received()},
{build_info, build_info()}, {messages_sent, messages_sent()},
{vm_specs, vm_specs()}]. {build_info, build_info()},
{vm_specs, vm_specs()}
].
report_telemetry(State = #state{url = URL}) -> report_telemetry(State = #state{url = URL}) ->
Data = get_telemetry(State), Data = get_telemetry(State),
@ -319,17 +359,21 @@ httpc_request(Method, URL, Headers, Body) ->
httpc:request(Method, {URL, Headers, "application/json", Body}, HTTPOptions, Options). httpc:request(Method, {URL, Headers, "application/json", Body}, HTTPOptions, Options).
parse_os_release(FileContent) -> parse_os_release(FileContent) ->
lists:foldl(fun(Line, Acc) -> lists:foldl(
[Var, Value] = string:tokens(Line, "="), fun(Line, Acc) ->
NValue = case Value of [Var, Value] = string:tokens(Line, "="),
_ when is_list(Value) -> NValue =
lists:nth(1, string:tokens(Value, "\"")); case Value of
_ -> _ when is_list(Value) ->
Value lists:nth(1, string:tokens(Value, "\""));
end, _ ->
[{Var, NValue} | Acc] Value
end, end,
[], string:tokens(binary:bin_to_list(FileContent), "\n")). [{Var, NValue} | Acc]
end,
[],
string:tokens(binary:bin_to_list(FileContent), "\n")
).
build_info() -> build_info() ->
case ?MODULE:read_raw_build_info() of case ?MODULE:read_raw_build_info() of
@ -342,14 +386,19 @@ build_info() ->
end. end.
read_raw_build_info() -> read_raw_build_info() ->
Filename = filename:join([code:root_dir(), "releases", Filename = filename:join([
emqx_app:get_release(), "BUILD_INFO"]), code:root_dir(),
"releases",
emqx_app:get_release(),
"BUILD_INFO"
]),
file:read_file(Filename). file:read_file(Filename).
vm_specs() -> vm_specs() ->
SysMemData = memsup:get_system_memory_data(), SysMemData = memsup:get_system_memory_data(),
[ {num_cpus, erlang:system_info(logical_processors)} [
, {total_memory, proplists:get_value(available_memory, SysMemData)} {num_cpus, erlang:system_info(logical_processors)},
{total_memory, proplists:get_value(available_memory, SysMemData)}
]. ].
bin(L) when is_list(L) -> bin(L) when is_list(L) ->

View File

@ -24,17 +24,19 @@
% -export([cli/1]). % -export([cli/1]).
-export([ status/2 -export([
, data/2 status/2,
]). data/2
]).
-export([enable_telemetry/2]). -export([enable_telemetry/2]).
-export([ api_spec/0 -export([
, paths/0 api_spec/0,
, schema/1 paths/0,
, fields/1 schema/1,
]). fields/1
]).
-define(BAD_REQUEST, 'BAD_REQUEST'). -define(BAD_REQUEST, 'BAD_REQUEST').
@ -42,126 +44,166 @@ api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
paths() -> paths() ->
[ "/telemetry/status" [
, "/telemetry/data" "/telemetry/status",
"/telemetry/data"
]. ].
schema("/telemetry/status") -> schema("/telemetry/status") ->
#{ 'operationId' => status, #{
get => 'operationId' => status,
#{ description => <<"Get telemetry status">> get =>
, responses => #{
#{ 200 => status_schema(<<"Get telemetry status">>)} description => <<"Get telemetry status">>,
responses =>
#{200 => status_schema(<<"Get telemetry status">>)}
}, },
put => put =>
#{ description => <<"Enable or disable telemetry">> #{
, 'requestBody' => status_schema(<<"Enable or disable telemetry">>) description => <<"Enable or disable telemetry">>,
, responses => 'requestBody' => status_schema(<<"Enable or disable telemetry">>),
#{ 200 => status_schema(<<"Enable or disable telemetry successfully">>) responses =>
, 400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST], <<"Bad Request">>) #{
} 200 => status_schema(<<"Enable or disable telemetry successfully">>),
400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST], <<"Bad Request">>)
}
} }
}; };
schema("/telemetry/data") -> schema("/telemetry/data") ->
#{ 'operationId' => data, #{
get => 'operationId' => data,
#{ description => <<"Get telemetry data">> get =>
, responses => #{
#{ 200 => mk(ref(?MODULE, telemetry), #{ desc => <<"Get telemetry data">>})}} description => <<"Get telemetry data">>,
}. responses =>
#{200 => mk(ref(?MODULE, telemetry), #{desc => <<"Get telemetry data">>})}
}
}.
status_schema(Desc) -> status_schema(Desc) ->
mk(ref(?MODULE, status), #{in => body, desc => Desc}). mk(ref(?MODULE, status), #{in => body, desc => Desc}).
fields(status) -> fields(status) ->
[ { enable [
, mk( boolean() {enable,
, #{ desc => <<"Telemetry status">> mk(
, default => true boolean(),
, example => false #{
}) desc => <<"Telemetry status">>,
} default => true,
example => false
}
)}
]; ];
fields(telemetry) -> fields(telemetry) ->
[ { emqx_version [
, mk( string() {emqx_version,
, #{ desc => <<"EMQX Version">> mk(
, example => <<"5.0.0-beta.3-32d1547c">> string(),
}) #{
} desc => <<"EMQX Version">>,
, { license example => <<"5.0.0-beta.3-32d1547c">>
, mk( map() }
, #{ desc => <<"EMQX License">> )},
, example => #{edition => <<"community">>} {license,
}) mk(
} map(),
, { os_name #{
, mk( string() desc => <<"EMQX License">>,
, #{ desc => <<"OS Name">> example => #{edition => <<"community">>}
, example => <<"Linux">> }
}) )},
} {os_name,
, { os_version mk(
, mk( string() string(),
, #{ desc => <<"OS Version">> #{
, example => <<"20.04">> desc => <<"OS Name">>,
}) example => <<"Linux">>
} }
, { otp_version )},
, mk( string() {os_version,
, #{ desc => <<"Erlang/OTP Version">> mk(
, example => <<"24">> string(),
}) #{
} desc => <<"OS Version">>,
, { up_time example => <<"20.04">>
, mk( integer() }
, #{ desc => <<"EMQX Runtime">> )},
, example => 20220113 {otp_version,
}) mk(
} string(),
, { uuid #{
, mk( string() desc => <<"Erlang/OTP Version">>,
, #{ desc => <<"EMQX UUID">> example => <<"24">>
, example => <<"AAAAAAAA-BBBB-CCCC-2022-DDDDEEEEFFF">> }
}) )},
} {up_time,
, { nodes_uuid mk(
, mk( array(binary()) integer(),
, #{ desc => <<"EMQX Cluster Nodes UUID">> #{
, example => [ <<"AAAAAAAA-BBBB-CCCC-2022-DDDDEEEEFFF">> desc => <<"EMQX Runtime">>,
, <<"ZZZZZZZZ-CCCC-BBBB-2022-DDDDEEEEFFF">>] example => 20220113
}) }
} )},
, { active_plugins {uuid,
, mk( array(binary()) mk(
, #{ desc => <<"EMQX Active Plugins">> string(),
, example => [<<"Plugin A">>, <<"Plugin B">>] #{
}) desc => <<"EMQX UUID">>,
} example => <<"AAAAAAAA-BBBB-CCCC-2022-DDDDEEEEFFF">>
, { active_modules }
, mk( array(binary()) )},
, #{ desc => <<"EMQX Active Modules">> {nodes_uuid,
, example => [<<"Module A">>, <<"Module B">>] mk(
}) array(binary()),
} #{
, { num_clients desc => <<"EMQX Cluster Nodes UUID">>,
, mk( integer() example => [
, #{ desc => <<"EMQX Current Connections">> <<"AAAAAAAA-BBBB-CCCC-2022-DDDDEEEEFFF">>,
, example => 20220113 <<"ZZZZZZZZ-CCCC-BBBB-2022-DDDDEEEEFFF">>
}) ]
} }
, { messages_received )},
, mk( integer() {active_plugins,
, #{ desc => <<"EMQX Current Received Message">> mk(
, example => 2022 array(binary()),
}) #{
} desc => <<"EMQX Active Plugins">>,
, { messages_sent example => [<<"Plugin A">>, <<"Plugin B">>]
, mk( integer() }
, #{ desc => <<"EMQX Current Sent Message">> )},
, example => 2022 {active_modules,
}) mk(
} array(binary()),
#{
desc => <<"EMQX Active Modules">>,
example => [<<"Module A">>, <<"Module B">>]
}
)},
{num_clients,
mk(
integer(),
#{
desc => <<"EMQX Current Connections">>,
example => 20220113
}
)},
{messages_received,
mk(
integer(),
#{
desc => <<"EMQX Current Received Message">>,
example => 2022
}
)},
{messages_sent,
mk(
integer(),
#{
desc => <<"EMQX Current Sent Message">>,
example => 2022
}
)}
]. ].
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -169,15 +211,15 @@ fields(telemetry) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
status(get, _Params) -> status(get, _Params) ->
{200, get_telemetry_status()}; {200, get_telemetry_status()};
status(put, #{body := Body}) -> status(put, #{body := Body}) ->
Enable = maps:get(<<"enable">>, Body), Enable = maps:get(<<"enable">>, Body),
case Enable =:= emqx_telemetry:get_status() of case Enable =:= emqx_telemetry:get_status() of
true -> true ->
Reason = case Enable of Reason =
true -> <<"Telemetry status is already enabled">>; case Enable of
false -> <<"Telemetry status is already disable">> true -> <<"Telemetry status is already enabled">>;
end, false -> <<"Telemetry status is already disable">>
end,
{400, #{code => 'BAD_REQUEST', message => Reason}}; {400, #{code => 'BAD_REQUEST', message => Reason}};
false -> false ->
enable_telemetry(Enable), enable_telemetry(Enable),
@ -231,9 +273,12 @@ data(get, _Request) ->
%% internal function %% internal function
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
enable_telemetry(Enable) -> enable_telemetry(Enable) ->
lists:foreach(fun(Node) -> lists:foreach(
enable_telemetry(Node, Enable) fun(Node) ->
end, mria_mnesia:running_nodes()). enable_telemetry(Node, Enable)
end,
mria_mnesia:running_nodes()
).
enable_telemetry(Node, true) -> enable_telemetry(Node, true) ->
is_ok(emqx_telemetry_proto_v1:enable_telemetry(Node)); is_ok(emqx_telemetry_proto_v1:enable_telemetry(Node));

View File

@ -22,41 +22,45 @@
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
-export([
-export([ on_message_publish/1 on_message_publish/1,
, on_message_delivered/2 on_message_delivered/2,
, on_message_dropped/3 on_message_dropped/3
]). ]).
%% API functions %% API functions
-export([ start_link/0 -export([
, stop/0 start_link/0,
]). stop/0
]).
-export([ enable/0 -export([
, disable/0 enable/0,
]). disable/0
]).
-export([ max_limit/0]). -export([max_limit/0]).
-export([ metrics/0 -export([
, metrics/1 metrics/0,
, register/1 metrics/1,
, deregister/1 register/1,
, deregister_all/0 deregister/1,
, is_registered/1 deregister_all/0,
, all_registered_topics/0 is_registered/1,
, reset/0 all_registered_topics/0,
, reset/1 reset/0,
]). reset/1
]).
%% gen_server callbacks %% gen_server callbacks
-export([ init/1 -export([
, handle_call/3 init/1,
, handle_info/2 handle_call/3,
, handle_cast/2 handle_info/2,
, terminate/2 handle_cast/2,
]). terminate/2
]).
-ifdef(TEST). -ifdef(TEST).
-compile(export_all). -compile(export_all).
@ -66,17 +70,17 @@
-define(MAX_TOPICS, 512). -define(MAX_TOPICS, 512).
-define(TAB, ?MODULE). -define(TAB, ?MODULE).
-define(TOPIC_METRICS, -define(TOPIC_METRICS, [
['messages.in', 'messages.in',
'messages.out', 'messages.out',
'messages.qos0.in', 'messages.qos0.in',
'messages.qos0.out', 'messages.qos0.out',
'messages.qos1.in', 'messages.qos1.in',
'messages.qos1.out', 'messages.qos1.out',
'messages.qos2.in', 'messages.qos2.in',
'messages.qos2.out', 'messages.qos2.out',
'messages.dropped' 'messages.dropped'
]). ]).
-define(TICKING_INTERVAL, 1). -define(TICKING_INTERVAL, 1).
-define(SPEED_AVERAGE_WINDOW_SIZE, 5). -define(SPEED_AVERAGE_WINDOW_SIZE, 5).
@ -84,15 +88,15 @@
-define(SPEED_LONG_WINDOW_SIZE, 300). -define(SPEED_LONG_WINDOW_SIZE, 300).
-record(speed, { -record(speed, {
last = 0 :: number(), last = 0 :: number(),
last_v = 0 :: number(), last_v = 0 :: number(),
last_medium = 0 :: number(), last_medium = 0 :: number(),
last_long = 0 :: number() last_long = 0 :: number()
}). }).
-record(state, { -record(state, {
speeds :: #{{binary(), atom()} => #speed{}} speeds :: #{{binary(), atom()} => #speed{}}
}). }).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% APIs %% APIs
@ -217,34 +221,33 @@ handle_call({register, Topic}, _From, State = #state{speeds = Speeds}) ->
Error -> Error ->
{reply, Error, State} {reply, Error, State}
end; end;
handle_call({deregister, all}, _From, State) -> handle_call({deregister, all}, _From, State) ->
true = ets:delete_all_objects(?TAB), true = ets:delete_all_objects(?TAB),
{reply, ok, State#state{speeds = #{}}}; {reply, ok, State#state{speeds = #{}}};
handle_call({deregister, Topic}, _From, State = #state{speeds = Speeds}) -> handle_call({deregister, Topic}, _From, State = #state{speeds = Speeds}) ->
case is_registered(Topic) of case is_registered(Topic) of
false -> false ->
{reply, {error, topic_not_found}, State}; {reply, {error, topic_not_found}, State};
true -> true ->
true = ets:delete(?TAB, Topic), true = ets:delete(?TAB, Topic),
NSpeeds = lists:foldl(fun(Metric, Acc) -> NSpeeds = lists:foldl(
maps:remove({Topic, Metric}, Acc) fun(Metric, Acc) ->
end, Speeds, ?TOPIC_METRICS), maps:remove({Topic, Metric}, Acc)
end,
Speeds,
?TOPIC_METRICS
),
{reply, ok, State#state{speeds = NSpeeds}} {reply, ok, State#state{speeds = NSpeeds}}
end; end;
handle_call({reset, all}, _From, State = #state{speeds = Speeds}) -> handle_call({reset, all}, _From, State = #state{speeds = Speeds}) ->
Fun = Fun =
fun(T, NSpeeds) -> fun(T, NSpeeds) ->
reset_topic(T, NSpeeds) reset_topic(T, NSpeeds)
end, end,
{reply, ok, State#state{speeds = lists:foldl(Fun, Speeds, ets:tab2list(?TAB))}}; {reply, ok, State#state{speeds = lists:foldl(Fun, Speeds, ets:tab2list(?TAB))}};
handle_call({reset, Topic}, _From, State = #state{speeds = Speeds}) -> handle_call({reset, Topic}, _From, State = #state{speeds = Speeds}) ->
NSpeeds = reset_topic(Topic, Speeds), NSpeeds = reset_topic(Topic, Speeds),
{reply, ok, State#state{speeds = NSpeeds}}; {reply, ok, State#state{speeds = NSpeeds}};
handle_call({get_rates, Topic, Metric}, _From, State = #state{speeds = Speeds}) -> handle_call({get_rates, Topic, Metric}, _From, State = #state{speeds = Speeds}) ->
case is_registered(Topic) of case is_registered(Topic) of
false -> false ->
@ -253,8 +256,8 @@ handle_call({get_rates, Topic, Metric}, _From, State = #state{speeds = Speeds})
case maps:get({Topic, Metric}, Speeds, undefined) of case maps:get({Topic, Metric}, Speeds, undefined) of
undefined -> undefined ->
{reply, {error, invalid_metric}, State}; {reply, {error, invalid_metric}, State};
#speed{last = Short, last_medium = Medium, last_long = Long} -> #speed{last = Short, last_medium = Medium, last_long = Long} ->
{reply, #{short => Short, medium => Medium, long => Long }, State} {reply, #{short => Short, medium => Medium, long => Long}, State}
end end
end. end.
@ -264,15 +267,16 @@ handle_cast(Msg, State) ->
handle_info(ticking, State = #state{speeds = Speeds}) -> handle_info(ticking, State = #state{speeds = Speeds}) ->
NSpeeds = maps:map( NSpeeds = maps:map(
fun({Topic, Metric}, Speed) -> fun({Topic, Metric}, Speed) ->
case val(Topic, Metric) of case val(Topic, Metric) of
{error, topic_not_found} -> maps:remove({Topic, Metric}, Speeds); {error, topic_not_found} -> maps:remove({Topic, Metric}, Speeds);
Val -> calculate_speed(Val, Speed) Val -> calculate_speed(Val, Speed)
end end
end, Speeds), end,
Speeds
),
erlang:send_after(timer:seconds(?TICKING_INTERVAL), self(), ticking), erlang:send_after(timer:seconds(?TICKING_INTERVAL), self(), ticking),
{noreply, State#state{speeds = NSpeeds}}; {noreply, State#state{speeds = NSpeeds}};
handle_info(Info, State) -> handle_info(Info, State) ->
?SLOG(error, #{msg => "unexpected_info", info => Info}), ?SLOG(error, #{msg => "unexpected_info", info => Info}),
{noreply, State}. {noreply, State}.
@ -309,11 +313,16 @@ do_register(Topic, Speeds) ->
ok = reset_counter(CRef), ok = reset_counter(CRef),
Data = #{ Data = #{
counter_ref => CRef, counter_ref => CRef,
create_time => CreateTime}, create_time => CreateTime
},
true = ets:insert(?TAB, {Topic, Data}), true = ets:insert(?TAB, {Topic, Data}),
NSpeeds = lists:foldl(fun(Metric, Acc) -> NSpeeds = lists:foldl(
maps:put({Topic, Metric}, #speed{}, Acc) fun(Metric, Acc) ->
end, Speeds, ?TOPIC_METRICS), maps:put({Topic, Metric}, #speed{}, Acc)
end,
Speeds,
?TOPIC_METRICS
),
{ok, NSpeeds}; {ok, NSpeeds};
{true, true} -> {true, true} ->
{error, bad_topic}; {error, bad_topic};
@ -329,9 +338,9 @@ format({Topic, Data}) ->
Fun = Fun =
fun(Key, Metrics) -> fun(Key, Metrics) ->
CounterKey = to_count(Key), CounterKey = to_count(Key),
Counter = counters:get(CRef, metric_idx(Key)), Counter = counters:get(CRef, metric_idx(Key)),
RateKey = to_rate(Key), RateKey = to_rate(Key),
Rate = emqx_rule_funcs:float(rate(Topic, Key), 4), Rate = emqx_rule_funcs:float(rate(Topic, Key), 4),
maps:put(RateKey, Rate, maps:put(CounterKey, Counter, Metrics)) maps:put(RateKey, Rate, maps:put(CounterKey, Counter, Metrics))
end, end,
Metrics = lists:foldl(Fun, #{}, ?TOPIC_METRICS), Metrics = lists:foldl(Fun, #{}, ?TOPIC_METRICS),
@ -390,17 +399,16 @@ rate(Topic, Metric) ->
{error, Reason} {error, Reason}
end. end.
metric_idx('messages.in') -> 01; metric_idx('messages.in') -> 01;
metric_idx('messages.out') -> 02; metric_idx('messages.out') -> 02;
metric_idx('messages.qos0.in') -> 03; metric_idx('messages.qos0.in') -> 03;
metric_idx('messages.qos0.out') -> 04; metric_idx('messages.qos0.out') -> 04;
metric_idx('messages.qos1.in') -> 05; metric_idx('messages.qos1.in') -> 05;
metric_idx('messages.qos1.out') -> 06; metric_idx('messages.qos1.out') -> 06;
metric_idx('messages.qos2.in') -> 07; metric_idx('messages.qos2.in') -> 07;
metric_idx('messages.qos2.out') -> 08; metric_idx('messages.qos2.out') -> 08;
metric_idx('messages.dropped') -> 09; metric_idx('messages.dropped') -> 09;
metric_idx(_) -> metric_idx(_) -> {error, invalid_metric}.
{error, invalid_metric}.
to_count('messages.in') -> to_count('messages.in') ->
'messages.in.count'; 'messages.in.count';
@ -456,7 +464,8 @@ counters_size() ->
number_of_registered_topics() -> number_of_registered_topics() ->
proplists:get_value(size, ets:info(?TAB)). proplists:get_value(size, ets:info(?TAB)).
calculate_speed(CurVal, #speed{last = Last, calculate_speed(CurVal, #speed{
last = Last,
last_v = LastVal, last_v = LastVal,
last_medium = LastMedium, last_medium = LastMedium,
last_long = LastLong last_long = LastLong

View File

@ -21,25 +21,33 @@
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include("emqx_modules.hrl"). -include("emqx_modules.hrl").
-import( hoconsc -import(
, [ mk/2 hoconsc,
, ref/1 [
, ref/2 mk/2,
, array/1 ref/1,
, map/2]). ref/2,
array/1,
map/2
]
).
-export([ topic_metrics/2 -export([
, operate_topic_metrics/2 topic_metrics/2,
]). operate_topic_metrics/2
]).
-export([ cluster_accumulation_metrics/0 -export([
, cluster_accumulation_metrics/1]). cluster_accumulation_metrics/0,
cluster_accumulation_metrics/1
]).
-export([ api_spec/0 -export([
, paths/0 api_spec/0,
, schema/1 paths/0,
, fields/1 schema/1,
]). fields/1
]).
-define(EXCEED_LIMIT, 'EXCEED_LIMIT'). -define(EXCEED_LIMIT, 'EXCEED_LIMIT').
-define(BAD_TOPIC, 'BAD_TOPIC'). -define(BAD_TOPIC, 'BAD_TOPIC').
@ -50,182 +58,306 @@ api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
paths() -> paths() ->
[ "/mqtt/topic_metrics" [
, "/mqtt/topic_metrics/:topic" "/mqtt/topic_metrics",
"/mqtt/topic_metrics/:topic"
]. ].
schema("/mqtt/topic_metrics") -> schema("/mqtt/topic_metrics") ->
#{ 'operationId' => topic_metrics #{
, get => 'operationId' => topic_metrics,
#{ description => <<"List topic metrics">> get =>
, tags => ?API_TAG_MQTT #{
, responses => description => <<"List topic metrics">>,
#{200 => tags => ?API_TAG_MQTT,
mk(array(hoconsc:ref(topic_metrics)), #{ desc => <<"List all topic metrics">>})} responses =>
#{
200 =>
mk(array(hoconsc:ref(topic_metrics)), #{
desc => <<"List all topic metrics">>
})
}
},
put =>
#{
description => <<"Reset topic metrics by topic name. Or reset all Topic Metrics">>,
tags => ?API_TAG_MQTT,
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
ref(reset),
reset_examples()
),
responses =>
#{
204 => <<"Reset topic metrics successfully">>,
404 =>
emqx_dashboard_swagger:error_codes(
[?TOPIC_NOT_FOUND], <<"Topic not found">>
)
}
},
post =>
#{
description => <<"Create topic metrics">>,
tags => ?API_TAG_MQTT,
'requestBody' => [topic(body)],
responses =>
#{
204 => <<"Create topic metrics success">>,
409 => emqx_dashboard_swagger:error_codes(
[?EXCEED_LIMIT],
<<"Topic metrics exceeded max limit 512">>
),
400 => emqx_dashboard_swagger:error_codes(
[?BAD_REQUEST, ?BAD_TOPIC],
<<"Topic metrics already existed or bad topic">>
)
}
} }
, put => };
#{ description => <<"Reset topic metrics by topic name. Or reset all Topic Metrics">>
, tags => ?API_TAG_MQTT
, 'requestBody' => emqx_dashboard_swagger:schema_with_examples(
ref(reset),
reset_examples())
, responses =>
#{ 204 => <<"Reset topic metrics successfully">>
, 404 =>
emqx_dashboard_swagger:error_codes(
[?TOPIC_NOT_FOUND], <<"Topic not found">>)
}
}
, post =>
#{ description => <<"Create topic metrics">>
, tags => ?API_TAG_MQTT
, 'requestBody' => [topic(body)]
, responses =>
#{ 204 => <<"Create topic metrics success">>
, 409 => emqx_dashboard_swagger:error_codes([?EXCEED_LIMIT],
<<"Topic metrics exceeded max limit 512">>)
, 400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST, ?BAD_TOPIC],
<<"Topic metrics already existed or bad topic">>)
}
}
};
schema("/mqtt/topic_metrics/:topic") -> schema("/mqtt/topic_metrics/:topic") ->
#{ 'operationId' => operate_topic_metrics #{
, get => 'operationId' => operate_topic_metrics,
#{ description => <<"Get topic metrics">> get =>
, tags => ?API_TAG_MQTT #{
, parameters => [topic(path)] description => <<"Get topic metrics">>,
, responses => tags => ?API_TAG_MQTT,
#{ 200 => mk(ref(topic_metrics), #{ desc => <<"Topic metrics">> }) parameters => [topic(path)],
, 404 => emqx_dashboard_swagger:error_codes([?TOPIC_NOT_FOUND], responses =>
<<"Topic not found">>) #{
} 200 => mk(ref(topic_metrics), #{desc => <<"Topic metrics">>}),
404 => emqx_dashboard_swagger:error_codes(
[?TOPIC_NOT_FOUND],
<<"Topic not found">>
)
}
},
delete =>
#{
description => <<"Remove the topic metrics">>,
tags => ?API_TAG_MQTT,
parameters => [topic(path)],
responses =>
#{
204 => <<"Removed topic metrics successfully">>,
404 => emqx_dashboard_swagger:error_codes(
[?TOPIC_NOT_FOUND],
<<"Topic not found">>
)
}
} }
, delete => }.
#{ description => <<"Remove the topic metrics">>
, tags => ?API_TAG_MQTT
, parameters => [topic(path)]
, responses =>
#{ 204 => <<"Removed topic metrics successfully">>,
404 => emqx_dashboard_swagger:error_codes([?TOPIC_NOT_FOUND],
<<"Topic not found">>)
}
}
}.
fields(reset) -> fields(reset) ->
[ {topic [
, mk( binary() {topic,
, #{ desc => mk(
<<"Topic Name. If this parameter is not present," binary(),
" all created topic metrics will be reset">> #{
, example => <<"testtopic/1">> desc =>
, required => false})} <<
, {action "Topic Name. If this parameter is not present,"
, mk( string() " all created topic metrics will be reset"
, #{ desc => <<"Action Name. Only as a \"reset\"">> >>,
, enum => [reset] example => <<"testtopic/1">>,
, required => true required => false
, example => <<"reset">>})} }
)},
{action,
mk(
string(),
#{
desc => <<"Action Name. Only as a \"reset\"">>,
enum => [reset],
required => true,
example => <<"reset">>
}
)}
]; ];
fields(topic_metrics) -> fields(topic_metrics) ->
[ { topic [
, mk( binary() {topic,
, #{ desc => <<"Topic Name">> mk(
, example => <<"testtopic/1">> binary(),
, required => true})}, #{
{ create_time desc => <<"Topic Name">>,
, mk( emqx_datetime:epoch_second() example => <<"testtopic/1">>,
, #{ desc => <<"Topic Metrics created date time, in rfc3339">> required => true
, required => true }
, example => <<"2022-01-14T21:48:47+08:00">>})}, )},
{ reset_time {create_time,
, mk( emqx_datetime:epoch_second() mk(
, #{ desc => <<"Topic Metrics reset date time, in rfc3339. Nullable if never reset">> emqx_datetime:epoch_second(),
, required => false #{
, example => <<"2022-01-14T21:48:47+08:00">>})}, desc => <<"Topic Metrics created date time, in rfc3339">>,
{ metrics required => true,
, mk( ref(metrics) example => <<"2022-01-14T21:48:47+08:00">>
, #{ desc => <<"Topic Metrics fields">> }
, required => true}) )},
} {reset_time,
mk(
emqx_datetime:epoch_second(),
#{
desc =>
<<"Topic Metrics reset date time, in rfc3339. Nullable if never reset">>,
required => false,
example => <<"2022-01-14T21:48:47+08:00">>
}
)},
{metrics,
mk(
ref(metrics),
#{
desc => <<"Topic Metrics fields">>,
required => true
}
)}
]; ];
fields(metrics) -> fields(metrics) ->
[ {'messages.dropped.count', mk(integer(), [
#{ desc => <<"Message dropped count">> {'messages.dropped.count',
, example => 0 mk(
})} integer(),
, {'messages.dropped.rate', mk(number(), #{
#{ desc => <<"Message dropped rate in 5s">> desc => <<"Message dropped count">>,
, example => 0 example => 0
})} }
, {'messages.in.count', mk(integer(), )},
#{ desc => <<"Message received count">> {'messages.dropped.rate',
, example => 0 mk(
})} number(),
, {'messages.in.rate', mk(number(), #{
#{ desc => <<"Message received rate in 5s">> desc => <<"Message dropped rate in 5s">>,
, example => 0 example => 0
})} }
, {'messages.out.count', mk(integer(), )},
#{ desc => <<"Message sent count">> {'messages.in.count',
, example => 0 mk(
})} integer(),
, {'messages.out.rate', mk(number(), #{
#{ desc => <<"Message sent rate in 5s">> desc => <<"Message received count">>,
, example => 0 example => 0
})} }
, {'messages.qos0.in.count', mk(integer(), )},
#{ desc => <<"Message with QoS 0 received count">> {'messages.in.rate',
, example => 0 mk(
})} number(),
, {'messages.qos0.in.rate', mk(number(), #{
#{ desc => <<"Message with QoS 0 received rate in 5s">> desc => <<"Message received rate in 5s">>,
, example => 0 example => 0
})} }
, {'messages.qos0.out.count', mk(integer(), )},
#{ desc => <<"Message with QoS 0 sent count">> {'messages.out.count',
, example => 0 mk(
})} integer(),
, {'messages.qos0.out.rate', mk(number(), #{
#{ desc => <<"Message with QoS 0 sent rate in 5s">> desc => <<"Message sent count">>,
, example => 0 example => 0
})} }
, {'messages.qos1.in.count', mk(integer(), )},
#{ desc => <<"Message with QoS 1 received count">> {'messages.out.rate',
, example => 0 mk(
})} number(),
, {'messages.qos1.in.rate', mk(number(), #{
#{ desc => <<"Message with QoS 1 received rate in 5s">> desc => <<"Message sent rate in 5s">>,
, example => 0 example => 0
})} }
, {'messages.qos1.out.count', mk(integer(), )},
#{ desc => <<"Message with QoS 1 sent count">> {'messages.qos0.in.count',
, example => 0 mk(
})} integer(),
, {'messages.qos1.out.rate', mk(number(), #{
#{ desc => <<"Message with QoS 1 sent rate in 5s">> desc => <<"Message with QoS 0 received count">>,
, example => 0 example => 0
})} }
, {'messages.qos2.in.count', mk(integer(), )},
#{ desc => <<"Message with QoS 2 sent count">> {'messages.qos0.in.rate',
, example => 0 mk(
})} number(),
, {'messages.qos2.in.rate', mk(number(), #{
#{ desc => <<"Message with QoS 2 received rate in 5s">> desc => <<"Message with QoS 0 received rate in 5s">>,
, example => 0 example => 0
})} }
, {'messages.qos2.out.count', mk(integer(), )},
#{ desc => <<"Message with QoS 2 sent count">> {'messages.qos0.out.count',
, example => 0 mk(
})} integer(),
, {'messages.qos2.out.rate', mk(number(), #{
#{ desc => <<"Message with QoS 2 sent rate in 5s">> desc => <<"Message with QoS 0 sent count">>,
, example => 0 example => 0
})} }
)},
{'messages.qos0.out.rate',
mk(
number(),
#{
desc => <<"Message with QoS 0 sent rate in 5s">>,
example => 0
}
)},
{'messages.qos1.in.count',
mk(
integer(),
#{
desc => <<"Message with QoS 1 received count">>,
example => 0
}
)},
{'messages.qos1.in.rate',
mk(
number(),
#{
desc => <<"Message with QoS 1 received rate in 5s">>,
example => 0
}
)},
{'messages.qos1.out.count',
mk(
integer(),
#{
desc => <<"Message with QoS 1 sent count">>,
example => 0
}
)},
{'messages.qos1.out.rate',
mk(
number(),
#{
desc => <<"Message with QoS 1 sent rate in 5s">>,
example => 0
}
)},
{'messages.qos2.in.count',
mk(
integer(),
#{
desc => <<"Message with QoS 2 sent count">>,
example => 0
}
)},
{'messages.qos2.in.rate',
mk(
number(),
#{
desc => <<"Message with QoS 2 received rate in 5s">>,
example => 0
}
)},
{'messages.qos2.out.count',
mk(
integer(),
#{
desc => <<"Message with QoS 2 sent count">>,
example => 0
}
)},
{'messages.qos2.out.rate',
mk(
number(),
#{
desc => <<"Message with QoS 2 sent rate in 5s">>,
example => 0
}
)}
]. ].
topic(In) -> topic(In) ->
@ -235,30 +367,35 @@ topic(In) ->
path -> path ->
Desc = <<"Notice: Topic string in url path must be encoded">> Desc = <<"Notice: Topic string in url path must be encoded">>
end, end,
{ topic {topic,
, mk( binary(), mk(
#{ desc => Desc binary(),
, required => true #{
, in => In desc => Desc,
, example => <<"testtopic/1">> required => true,
}) in => In,
}. example => <<"testtopic/1">>
}
)}.
reset_examples() -> reset_examples() ->
#{ reset_specific_one_topic_metrics => #{
#{ summary => <<"reset_specific_one_topic_metrics">> reset_specific_one_topic_metrics =>
, value => #{
#{ topic => "testtopic/1" summary => <<"reset_specific_one_topic_metrics">>,
, action => "reset" value =>
} #{
topic => "testtopic/1",
action => "reset"
}
},
reset_all_topic_metrics =>
#{
summary => <<"reset_all_topic_metrics">>,
value =>
#{action => "reset"}
} }
, reset_all_topic_metrics => }.
#{ summary => <<"reset_all_topic_metrics">>
, value =>
#{ action => "reset"
}
}
}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% HTTP Callbacks %% HTTP Callbacks
@ -266,7 +403,6 @@ reset_examples() ->
topic_metrics(get, _) -> topic_metrics(get, _) ->
get_cluster_response([]); get_cluster_response([]);
topic_metrics(put, #{body := #{<<"topic">> := Topic, <<"action">> := <<"reset">>}}) -> topic_metrics(put, #{body := #{<<"topic">> := Topic, <<"action">> := <<"reset">>}}) ->
case reset(Topic) of case reset(Topic) of
ok -> ok ->
@ -277,7 +413,6 @@ topic_metrics(put, #{body := #{<<"topic">> := Topic, <<"action">> := <<"reset">>
topic_metrics(put, #{body := #{<<"action">> := <<"reset">>}}) -> topic_metrics(put, #{body := #{<<"action">> := <<"reset">>}}) ->
reset(), reset(),
get_cluster_response([]); get_cluster_response([]);
topic_metrics(post, #{body := #{<<"topic">> := <<>>}}) -> topic_metrics(post, #{body := #{<<"topic">> := <<>>}}) ->
{400, 'BAD_REQUEST', <<"Topic can not be empty">>}; {400, 'BAD_REQUEST', <<"Topic can not be empty">>};
topic_metrics(post, #{body := #{<<"topic">> := Topic}}) -> topic_metrics(post, #{body := #{<<"topic">> := Topic}}) ->
@ -290,7 +425,6 @@ topic_metrics(post, #{body := #{<<"topic">> := Topic}}) ->
operate_topic_metrics(get, #{bindings := #{topic := Topic}}) -> operate_topic_metrics(get, #{bindings := #{topic := Topic}}) ->
get_cluster_response([Topic]); get_cluster_response([Topic]);
operate_topic_metrics(delete, #{bindings := #{topic := Topic}}) -> operate_topic_metrics(delete, #{bindings := #{topic := Topic}}) ->
case emqx_modules_conf:remove_topic_metrics(Topic) of case emqx_modules_conf:remove_topic_metrics(Topic) of
ok -> {204}; ok -> {204};
@ -314,12 +448,19 @@ cluster_accumulation_metrics(Topic) ->
Nodes = mria_mnesia:running_nodes(), Nodes = mria_mnesia:running_nodes(),
case emqx_topic_metrics_proto_v1:metrics(Nodes, Topic) of case emqx_topic_metrics_proto_v1:metrics(Nodes, Topic) of
{SuccResList, []} -> {SuccResList, []} ->
case lists:filter(fun({error, _}) -> false; (_) -> true case
end, SuccResList) of lists:filter(
fun
({error, _}) -> false;
(_) -> true
end,
SuccResList
)
of
[] -> [] ->
{error, topic_not_found}; {error, topic_not_found};
TopicMetrics -> TopicMetrics ->
NTopicMetrics = [ [T] || T <- TopicMetrics], NTopicMetrics = [[T] || T <- TopicMetrics],
[AccMetrics] = accumulate_nodes_metrics(NTopicMetrics), [AccMetrics] = accumulate_nodes_metrics(NTopicMetrics),
{ok, AccMetrics} {ok, AccMetrics}
end; end;
@ -328,42 +469,72 @@ cluster_accumulation_metrics(Topic) ->
end. end.
accumulate_nodes_metrics(NodesTopicMetrics) -> accumulate_nodes_metrics(NodesTopicMetrics) ->
AccMap = lists:foldl(fun(TopicMetrics, ExAcc) -> AccMap = lists:foldl(
MetricsMap = lists:foldl( fun(TopicMetrics, ExAcc) ->
fun(#{topic := Topic, MetricsMap = lists:foldl(
metrics := Metrics, fun(
create_time := CreateTime}, Acc) -> #{
Acc#{Topic => {Metrics, CreateTime}} topic := Topic,
end, #{}, TopicMetrics), metrics := Metrics,
accumulate_metrics(MetricsMap, ExAcc) create_time := CreateTime
end, #{}, NodesTopicMetrics), },
maps:fold(fun(Topic, {Metrics, CreateTime1}, Acc1) -> Acc
[#{topic => Topic, ) ->
metrics => Metrics, Acc#{Topic => {Metrics, CreateTime}}
create_time => CreateTime1} | Acc1] end,
end, [], AccMap). #{},
TopicMetrics
),
accumulate_metrics(MetricsMap, ExAcc)
end,
#{},
NodesTopicMetrics
),
maps:fold(
fun(Topic, {Metrics, CreateTime1}, Acc1) ->
[
#{
topic => Topic,
metrics => Metrics,
create_time => CreateTime1
}
| Acc1
]
end,
[],
AccMap
).
%% @doc TopicMetricsIn :: #{<<"topic">> := {Metrics, CreateTime}} %% @doc TopicMetricsIn :: #{<<"topic">> := {Metrics, CreateTime}}
accumulate_metrics(TopicMetricsIn, TopicMetricsAcc) -> accumulate_metrics(TopicMetricsIn, TopicMetricsAcc) ->
Topics = maps:keys(TopicMetricsIn), Topics = maps:keys(TopicMetricsIn),
lists:foldl(fun(Topic, Acc) -> lists:foldl(
{Metrics, CreateTime} = maps:get(Topic, TopicMetricsIn), fun(Topic, Acc) ->
NMetrics = do_accumulation_metrics( {Metrics, CreateTime} = maps:get(Topic, TopicMetricsIn),
Metrics, NMetrics = do_accumulation_metrics(
maps:get(Topic, TopicMetricsAcc, undefined) Metrics,
), maps:get(Topic, TopicMetricsAcc, undefined)
maps:put(Topic, {NMetrics, CreateTime}, Acc) ),
end, #{}, Topics). maps:put(Topic, {NMetrics, CreateTime}, Acc)
end,
#{},
Topics
).
%% @doc MetricsIn :: #{'messages.dropped.rate' :: integer(), ...} %% @doc MetricsIn :: #{'messages.dropped.rate' :: integer(), ...}
do_accumulation_metrics(MetricsIn, undefined) -> MetricsIn; do_accumulation_metrics(MetricsIn, undefined) ->
MetricsIn;
do_accumulation_metrics(MetricsIn, {MetricsAcc, _}) -> do_accumulation_metrics(MetricsIn, {MetricsAcc, _}) ->
Keys = maps:keys(MetricsIn), Keys = maps:keys(MetricsIn),
lists:foldl(fun(Key, Acc) -> lists:foldl(
InVal = maps:get(Key, MetricsIn), fun(Key, Acc) ->
NVal = InVal + maps:get(Key, MetricsAcc, 0), InVal = maps:get(Key, MetricsIn),
maps:put(Key, NVal, Acc) NVal = InVal + maps:get(Key, MetricsAcc, 0),
end, #{}, Keys). maps:put(Key, NVal, Acc)
end,
#{},
Keys
).
reset() -> reset() ->
Nodes = mria_mnesia:running_nodes(), Nodes = mria_mnesia:running_nodes(),
@ -374,8 +545,15 @@ reset(Topic) ->
Nodes = mria_mnesia:running_nodes(), Nodes = mria_mnesia:running_nodes(),
case emqx_topic_metrics_proto_v1:reset(Nodes, Topic) of case emqx_topic_metrics_proto_v1:reset(Nodes, Topic) of
{SuccResList, []} -> {SuccResList, []} ->
case lists:filter(fun({error, _}) -> true; (_) -> false case
end, SuccResList) of lists:filter(
fun
({error, _}) -> true;
(_) -> false
end,
SuccResList
)
of
[{error, Reason} | _] -> [{error, Reason} | _] ->
{error, Reason}; {error, Reason};
[] -> [] ->
@ -388,17 +566,22 @@ reset(Topic) ->
reason2httpresp(quota_exceeded) -> reason2httpresp(quota_exceeded) ->
Msg = list_to_binary( Msg = list_to_binary(
io_lib:format("Max topic metrics count is ~p", io_lib:format(
[emqx_topic_metrics:max_limit()])), "Max topic metrics count is ~p",
[emqx_topic_metrics:max_limit()]
)
),
{409, #{code => ?EXCEED_LIMIT, message => Msg}}; {409, #{code => ?EXCEED_LIMIT, message => Msg}};
reason2httpresp(bad_topic) -> reason2httpresp(bad_topic) ->
Msg = <<"Bad Topic, topic cannot have wildcard">>, Msg = <<"Bad Topic, topic cannot have wildcard">>,
{400, #{code => ?BAD_TOPIC, message => Msg}}; {400, #{code => ?BAD_TOPIC, message => Msg}};
reason2httpresp({quota_exceeded, bad_topic}) -> reason2httpresp({quota_exceeded, bad_topic}) ->
Msg = list_to_binary( Msg = list_to_binary(
io_lib:format( io_lib:format(
"Max topic metrics count is ~p, and topic cannot have wildcard", "Max topic metrics count is ~p, and topic cannot have wildcard",
[emqx_topic_metrics:max_limit()])), [emqx_topic_metrics:max_limit()]
)
),
{400, #{code => ?BAD_REQUEST, message => Msg}}; {400, #{code => ?BAD_REQUEST, message => Msg}};
reason2httpresp(already_existed) -> reason2httpresp(already_existed) ->
Msg = <<"Topic already registered">>, Msg = <<"Topic already registered">>,

View File

@ -49,7 +49,7 @@ end_per_suite(_) ->
t_load_case(_) -> t_load_case(_) ->
Hooks = emqx_hooks:lookup('message.publish'), Hooks = emqx_hooks:lookup('message.publish'),
MFA = {emqx_delayed,on_message_publish,[]}, MFA = {emqx_delayed, on_message_publish, []},
?assertEqual(false, lists:keyfind(MFA, 2, Hooks)), ?assertEqual(false, lists:keyfind(MFA, 2, Hooks)),
ok = emqx_delayed:enable(), ok = emqx_delayed:enable(),
Hooks1 = emqx_hooks:lookup('message.publish'), Hooks1 = emqx_hooks:lookup('message.publish'),
@ -59,7 +59,10 @@ t_load_case(_) ->
t_delayed_message(_) -> t_delayed_message(_) ->
ok = emqx_delayed:enable(), ok = emqx_delayed:enable(),
DelayedMsg = emqx_message:make(?MODULE, 1, <<"$delayed/1/publish">>, <<"delayed_m">>), DelayedMsg = emqx_message:make(?MODULE, 1, <<"$delayed/1/publish">>, <<"delayed_m">>),
?assertEqual({stop, DelayedMsg#message{topic = <<"publish">>, headers = #{allow_publish => false}}}, on_message_publish(DelayedMsg)), ?assertEqual(
{stop, DelayedMsg#message{topic = <<"publish">>, headers = #{allow_publish => false}}},
on_message_publish(DelayedMsg)
),
Msg = emqx_message:make(?MODULE, 1, <<"no_delayed_msg">>, <<"no_delayed">>), Msg = emqx_message:make(?MODULE, 1, <<"no_delayed_msg">>, <<"no_delayed">>),
?assertEqual({ok, Msg}, on_message_publish(Msg)), ?assertEqual({ok, Msg}, on_message_publish(Msg)),

View File

@ -22,16 +22,20 @@
-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-define(EVENT_MESSAGE, <<""" -define(EVENT_MESSAGE, <<
event_message: { ""
client_connected: true "\n"
client_disconnected: true "event_message: {\n"
client_subscribed: true " client_connected: true\n"
client_unsubscribed: true " client_disconnected: true\n"
message_delivered: true " client_subscribed: true\n"
message_acked: true " client_unsubscribed: true\n"
message_dropped: true " message_delivered: true\n"
}""">>). " message_acked: true\n"
" message_dropped: true\n"
"}"
""
>>).
all() -> emqx_common_test_helpers:all(?MODULE). all() -> emqx_common_test_helpers:all(?MODULE).
@ -49,8 +53,10 @@ t_event_topic(_) ->
{ok, C1} = emqtt:start_link([{clientid, <<"monsys">>}]), {ok, C1} = emqtt:start_link([{clientid, <<"monsys">>}]),
{ok, _} = emqtt:connect(C1), {ok, _} = emqtt:connect(C1),
{ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_connected">>, qos1), {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_connected">>, qos1),
{ok, C2} = emqtt:start_link([{clientid, <<"clientid">>}, {ok, C2} = emqtt:start_link([
{username, <<"username">>}]), {clientid, <<"clientid">>},
{username, <<"username">>}
]),
{ok, _} = emqtt:connect(C2), {ok, _} = emqtt:connect(C2),
ok = recv_connected(<<"clientid">>), ok = recv_connected(<<"clientid">>),
@ -72,7 +78,7 @@ t_event_topic(_) ->
recv_message_acked(<<"clientid">>), recv_message_acked(<<"clientid">>),
{ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/message_dropped">>, qos1), {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/message_dropped">>, qos1),
ok= emqtt:publish(C2, <<"test_sub1">>, <<"test">>), ok = emqtt:publish(C2, <<"test_sub1">>, <<"test">>),
recv_message_dropped(<<"clientid">>), recv_message_dropped(<<"clientid">>),
{ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_unsubscribed">>, qos1), {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_unsubscribed">>, qos1),
@ -94,15 +100,19 @@ t_reason(_) ->
recv_connected(ClientId) -> recv_connected(ClientId) ->
{ok, #{qos := ?QOS_0, topic := Topic, payload := Payload}} = receive_publish(100), {ok, #{qos := ?QOS_0, topic := Topic, payload := Payload}} = receive_publish(100),
?assertMatch(<<"$event/client_connected">>, Topic), ?assertMatch(<<"$event/client_connected">>, Topic),
?assertMatch(#{<<"clientid">> := ClientId, ?assertMatch(
<<"username">> := <<"username">>, #{
<<"ipaddress">> := <<"127.0.0.1">>, <<"clientid">> := ClientId,
<<"proto_name">> := <<"MQTT">>, <<"username">> := <<"username">>,
<<"proto_ver">> := ?MQTT_PROTO_V4, <<"ipaddress">> := <<"127.0.0.1">>,
<<"clean_start">> := true, <<"proto_name">> := <<"MQTT">>,
<<"expiry_interval">> := 0, <<"proto_ver">> := ?MQTT_PROTO_V4,
<<"keepalive">> := 60 <<"clean_start">> := true,
}, emqx_json:decode(Payload, [return_maps])). <<"expiry_interval">> := 0,
<<"keepalive">> := 60
},
emqx_json:decode(Payload, [return_maps])
).
recv_subscribed(_ClientId) -> recv_subscribed(_ClientId) ->
{ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100), {ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100),
@ -128,9 +138,14 @@ recv_unsubscribed(_ClientId) ->
recv_disconnected(ClientId) -> recv_disconnected(ClientId) ->
{ok, #{qos := ?QOS_0, topic := Topic, payload := Payload}} = receive_publish(100), {ok, #{qos := ?QOS_0, topic := Topic, payload := Payload}} = receive_publish(100),
?assertMatch(<<"$event/client_disconnected">>, Topic), ?assertMatch(<<"$event/client_disconnected">>, Topic),
?assertMatch(#{<<"clientid">> := ClientId, ?assertMatch(
<<"username">> := <<"username">>, #{
<<"reason">> := <<"normal">>}, emqx_json:decode(Payload, [return_maps])). <<"clientid">> := ClientId,
<<"username">> := <<"username">>,
<<"reason">> := <<"normal">>
},
emqx_json:decode(Payload, [return_maps])
).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal functions %% Internal functions
@ -140,6 +155,5 @@ receive_publish(Timeout) ->
receive receive
{publish, Publish} -> {publish, Publish} ->
{ok, Publish} {ok, Publish}
after after Timeout -> {error, timeout}
Timeout -> {error, timeout}
end. end.

View File

@ -48,4 +48,3 @@ t_topic_metrics_list(_) ->
t_topic_metrics_add_remove(_) -> t_topic_metrics_add_remove(_) ->
ok. ok.

View File

@ -22,27 +22,31 @@
-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-define(REWRITE, <<""" -define(REWRITE, <<
rewrite: [ ""
{ "\n"
action : publish "rewrite: [\n"
source_topic : \"x/#\" " {\n"
re : \"^x/y/(.+)$\" " action : publish\n"
dest_topic : \"z/y/$1\" " source_topic : \"x/#\"\n"
}, " re : \"^x/y/(.+)$\"\n"
{ " dest_topic : \"z/y/$1\"\n"
action : subscribe " },\n"
source_topic : \"y/+/z/#\" " {\n"
re : \"^y/(.+)/z/(.+)$\" " action : subscribe\n"
dest_topic : \"y/z/$2\" " source_topic : \"y/+/z/#\"\n"
}, " re : \"^y/(.+)/z/(.+)$\"\n"
{ " dest_topic : \"y/z/$2\"\n"
action : all " },\n"
source_topic : \"all/+/x/#\" " {\n"
re : \"^all/(.+)/x/(.+)$\" " action : all\n"
dest_topic : \"all/x/$2\" " source_topic : \"all/+/x/#\"\n"
} " re : \"^all/(.+)/x/(.+)$\"\n"
]""">>). " dest_topic : \"all/x/$2\"\n"
" }\n"
"]"
""
>>).
all() -> emqx_common_test_helpers:all(?MODULE). all() -> emqx_common_test_helpers:all(?MODULE).
@ -62,11 +66,14 @@ t_subscribe_rewrite(_Config) ->
timer:sleep(150), timer:sleep(150),
Subscriptions = emqx_broker:subscriptions(<<"rewrite_client">>), Subscriptions = emqx_broker:subscriptions(<<"rewrite_client">>),
?assertEqual(SubDestTopics, [Topic || {Topic, _SubOpts} <- Subscriptions]), ?assertEqual(SubDestTopics, [Topic || {Topic, _SubOpts} <- Subscriptions]),
RecvTopics = [begin RecvTopics = [
ok = emqtt:publish(Conn, Topic, <<"payload">>), begin
{ok, #{topic := RecvTopic}} = receive_publish(100), ok = emqtt:publish(Conn, Topic, <<"payload">>),
RecvTopic {ok, #{topic := RecvTopic}} = receive_publish(100),
end || Topic <- SubDestTopics], RecvTopic
end
|| Topic <- SubDestTopics
],
?assertEqual(SubDestTopics, RecvTopics), ?assertEqual(SubDestTopics, RecvTopics),
{ok, _, _} = emqtt:unsubscribe(Conn, SubOrigTopics), {ok, _, _} = emqtt:unsubscribe(Conn, SubOrigTopics),
timer:sleep(100), timer:sleep(100),
@ -79,11 +86,14 @@ t_publish_rewrite(_Config) ->
PubOrigTopics = [<<"x/y/2">>, <<"x/1/2">>], PubOrigTopics = [<<"x/y/2">>, <<"x/1/2">>],
PubDestTopics = [<<"z/y/2">>, <<"x/1/2">>], PubDestTopics = [<<"z/y/2">>, <<"x/1/2">>],
{ok, _Props2, _} = emqtt:subscribe(Conn, [{Topic, ?QOS_1} || Topic <- PubDestTopics]), {ok, _Props2, _} = emqtt:subscribe(Conn, [{Topic, ?QOS_1} || Topic <- PubDestTopics]),
RecvTopics = [begin RecvTopics = [
ok = emqtt:publish(Conn, Topic, <<"payload">>), begin
{ok, #{topic := RecvTopic}} = receive_publish(100), ok = emqtt:publish(Conn, Topic, <<"payload">>),
RecvTopic {ok, #{topic := RecvTopic}} = receive_publish(100),
end || Topic <- PubOrigTopics], RecvTopic
end
|| Topic <- PubOrigTopics
],
?assertEqual(PubDestTopics, RecvTopics), ?assertEqual(PubDestTopics, RecvTopics),
{ok, _, _} = emqtt:unsubscribe(Conn, PubDestTopics), {ok, _, _} = emqtt:unsubscribe(Conn, PubDestTopics),
terminate(Conn). terminate(Conn).
@ -96,17 +106,19 @@ t_rewrite_rule(_Config) ->
?assertEqual(<<"y/def">>, emqx_rewrite:match_and_rewrite(<<"y/def">>, SubRules)). ?assertEqual(<<"y/def">>, emqx_rewrite:match_and_rewrite(<<"y/def">>, SubRules)).
t_rewrite_re_error(_Config) -> t_rewrite_re_error(_Config) ->
Rules = [#{ Rules = [
action => subscribe, #{
source_topic => "y/+/z/#", action => subscribe,
re => "{^y/(.+)/z/(.+)$*", source_topic => "y/+/z/#",
dest_topic => "\"y/z/$2" re => "{^y/(.+)/z/(.+)$*",
}], dest_topic => "\"y/z/$2"
}
],
Error = { Error = {
"y/+/z/#", "y/+/z/#",
"{^y/(.+)/z/(.+)$*", "{^y/(.+)/z/(.+)$*",
"\"y/z/$2", "\"y/z/$2",
{"nothing to repeat",16} {"nothing to repeat", 16}
}, },
?assertEqual({[], [], [Error]}, emqx_rewrite:compile(Rules)), ?assertEqual({[], [], [Error]}, emqx_rewrite:compile(Rules)),
ok. ok.
@ -114,30 +126,39 @@ t_rewrite_re_error(_Config) ->
t_list(_Config) -> t_list(_Config) ->
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?REWRITE), ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?REWRITE),
Expect = [ Expect = [
#{<<"action">> => <<"publish">>, #{
<<"action">> => <<"publish">>,
<<"dest_topic">> => <<"z/y/$1">>, <<"dest_topic">> => <<"z/y/$1">>,
<<"re">> => <<"^x/y/(.+)$">>, <<"re">> => <<"^x/y/(.+)$">>,
<<"source_topic">> => <<"x/#">>}, <<"source_topic">> => <<"x/#">>
#{<<"action">> => <<"subscribe">>, },
#{
<<"action">> => <<"subscribe">>,
<<"dest_topic">> => <<"y/z/$2">>, <<"dest_topic">> => <<"y/z/$2">>,
<<"re">> => <<"^y/(.+)/z/(.+)$">>, <<"re">> => <<"^y/(.+)/z/(.+)$">>,
<<"source_topic">> => <<"y/+/z/#">>}, <<"source_topic">> => <<"y/+/z/#">>
#{<<"action">> => <<"all">>, },
#{
<<"action">> => <<"all">>,
<<"dest_topic">> => <<"all/x/$2">>, <<"dest_topic">> => <<"all/x/$2">>,
<<"re">> => <<"^all/(.+)/x/(.+)$">>, <<"re">> => <<"^all/(.+)/x/(.+)$">>,
<<"source_topic">> => <<"all/+/x/#">>}], <<"source_topic">> => <<"all/+/x/#">>
}
],
?assertEqual(Expect, emqx_rewrite:list()), ?assertEqual(Expect, emqx_rewrite:list()),
ok. ok.
t_update(_Config) -> t_update(_Config) ->
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?REWRITE), ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?REWRITE),
Init = emqx_rewrite:list(), Init = emqx_rewrite:list(),
Rules = [#{ Rules = [
<<"source_topic">> => <<"test/#">>, #{
<<"re">> => <<"test/*">>, <<"source_topic">> => <<"test/#">>,
<<"dest_topic">> => <<"test1/$2">>, <<"re">> => <<"test/*">>,
<<"action">> => <<"publish">> <<"dest_topic">> => <<"test1/$2">>,
}], <<"action">> => <<"publish">>
}
],
ok = emqx_rewrite:update(Rules), ok = emqx_rewrite:update(Rules),
?assertEqual(Rules, emqx_rewrite:list()), ?assertEqual(Rules, emqx_rewrite:list()),
ok = emqx_rewrite:update(Init), ok = emqx_rewrite:update(Init),
@ -161,26 +182,25 @@ t_update_disable(_Config) ->
t_update_re_failed(_Config) -> t_update_re_failed(_Config) ->
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?REWRITE), ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?REWRITE),
Re = <<"*^test/*">>, Re = <<"*^test/*">>,
Rules = [#{ Rules = [
<<"source_topic">> => <<"test/#">>, #{
<<"re">> => Re, <<"source_topic">> => <<"test/#">>,
<<"dest_topic">> => <<"test1/$2">>, <<"re">> => Re,
<<"action">> => <<"publish">> <<"dest_topic">> => <<"test1/$2">>,
}], <<"action">> => <<"publish">>
?assertError({badmatch,
{error,
{_,
[
{validation_error,
#{
reason := {Re, {"nothing to repeat", 0}},
value := Re
}
}
]
}
} }
}, emqx_rewrite:update(Rules)), ],
?assertError(
{badmatch,
{error,
{_, [
{validation_error, #{
reason := {Re, {"nothing to repeat", 0}},
value := Re
}}
]}}},
emqx_rewrite:update(Rules)
),
ok. ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -190,8 +210,7 @@ t_update_re_failed(_Config) ->
receive_publish(Timeout) -> receive_publish(Timeout) ->
receive receive
{publish, Publish} -> {ok, Publish} {publish, Publish} -> {ok, Publish}
after after Timeout -> {error, timeout}
Timeout -> {error, timeout}
end. end.
init() -> init() ->

View File

@ -40,32 +40,38 @@ init_per_testcase(t_get_telemetry, Config) ->
TestPID = self(), TestPID = self(),
ok = meck:new(httpc, [non_strict, passthrough, no_history, no_link]), ok = meck:new(httpc, [non_strict, passthrough, no_history, no_link]),
ok = meck:expect(httpc, request, fun(Method, URL, Headers, Body) -> ok = meck:expect(httpc, request, fun(Method, URL, Headers, Body) ->
TestPID ! {request, Method, URL, Headers, Body} TestPID ! {request, Method, URL, Headers, Body}
end), end),
ok = meck:new(emqx_telemetry, [non_strict, passthrough, no_history, no_link]), ok = meck:new(emqx_telemetry, [non_strict, passthrough, no_history, no_link]),
ok = meck:expect(emqx_telemetry, read_raw_build_info, ok = meck:expect(
fun() -> emqx_telemetry,
{ok, Path} = file:read_link(filename:join([DataDir, "BUILD_INFO"])), read_raw_build_info,
{ok, Template} = file:read_file(Path), fun() ->
Vars0 = [ {build_info_arch, "arch"} {ok, Path} = file:read_link(filename:join([DataDir, "BUILD_INFO"])),
, {build_info_wordsize, "64"} {ok, Template} = file:read_file(Path),
, {build_info_os, "os"} Vars0 = [
, {build_info_erlang, "erlang"} {build_info_arch, "arch"},
, {build_info_elixir, "elixir"} {build_info_wordsize, "64"},
, {build_info_relform, "relform"} {build_info_os, "os"},
], {build_info_erlang, "erlang"},
Vars = [{atom_to_list(K), iolist_to_binary(V)} {build_info_elixir, "elixir"},
|| {K, V} <- Vars0], {build_info_relform, "relform"}
Rendered = bbmustache:render(Template, Vars), ],
{ok, Rendered} Vars = [
end), {atom_to_list(K), iolist_to_binary(V)}
|| {K, V} <- Vars0
],
Rendered = bbmustache:render(Template, Vars),
{ok, Rendered}
end
),
Config; Config;
init_per_testcase(_Testcase, Config) -> init_per_testcase(_Testcase, Config) ->
TestPID = self(), TestPID = self(),
ok = meck:new(httpc, [non_strict, passthrough, no_history, no_link]), ok = meck:new(httpc, [non_strict, passthrough, no_history, no_link]),
ok = meck:expect(httpc, request, fun(Method, URL, Headers, Body) -> ok = meck:expect(httpc, request, fun(Method, URL, Headers, Body) ->
TestPID ! {request, Method, URL, Headers, Body} TestPID ! {request, Method, URL, Headers, Body}
end), end),
Config. Config.
end_per_testcase(t_get_telemetry, _Config) -> end_per_testcase(t_get_telemetry, _Config) ->
@ -113,14 +119,16 @@ t_get_telemetry(_Config) ->
?assertEqual(0, get_value(num_clients, TelemetryData)), ?assertEqual(0, get_value(num_clients, TelemetryData)),
BuildInfo = get_value(build_info, TelemetryData), BuildInfo = get_value(build_info, TelemetryData),
?assertMatch( ?assertMatch(
#{ <<"arch">> := <<_/binary>> #{
, <<"elixir">> := <<_/binary>> <<"arch">> := <<_/binary>>,
, <<"erlang">> := <<_/binary>> <<"elixir">> := <<_/binary>>,
, <<"os">> := <<_/binary>> <<"erlang">> := <<_/binary>>,
, <<"relform">> := <<_/binary>> <<"os">> := <<_/binary>>,
, <<"wordsize">> := Wordsize <<"relform">> := <<_/binary>>,
<<"wordsize">> := Wordsize
} when is_integer(Wordsize), } when is_integer(Wordsize),
BuildInfo), BuildInfo
),
VMSpecs = get_value(vm_specs, TelemetryData), VMSpecs = get_value(vm_specs, TelemetryData),
?assert(is_integer(get_value(num_cpus, VMSpecs))), ?assert(is_integer(get_value(num_cpus, VMSpecs))),
?assert(0 =< get_value(num_cpus, VMSpecs)), ?assert(0 =< get_value(num_cpus, VMSpecs)),

View File

@ -19,9 +19,12 @@
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
-define(TOPIC, <<
-define(TOPIC, <<""" ""
topic_metrics: []""">>). "\n"
"topic_metrics: []"
""
>>).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
@ -47,7 +50,12 @@ t_nonexistent_topic_metrics(_) ->
?assertEqual({error, invalid_metric}, emqx_topic_metrics:val(<<"a/b/c">>, 'invalid.metrics')), ?assertEqual({error, invalid_metric}, emqx_topic_metrics:val(<<"a/b/c">>, 'invalid.metrics')),
?assertEqual({error, invalid_metric}, emqx_topic_metrics:inc(<<"a/b/c">>, 'invalid.metrics')), ?assertEqual({error, invalid_metric}, emqx_topic_metrics:inc(<<"a/b/c">>, 'invalid.metrics')),
?assertEqual({error, invalid_metric}, emqx_topic_metrics:rate(<<"a/b/c">>, 'invalid.metrics')), ?assertEqual({error, invalid_metric}, emqx_topic_metrics:rate(<<"a/b/c">>, 'invalid.metrics')),
% ?assertEqual({error, invalid_metric}, emqx_topic_metrics:rates(<<"a/b/c">>, 'invalid.metrics')),
%% ?assertEqual(
%% {error, invalid_metric},
%% emqx_topic_metrics:rates(<<"a/b/c">>, 'invalid.metrics')
%% ),
emqx_topic_metrics:deregister(<<"a/b/c">>), emqx_topic_metrics:deregister(<<"a/b/c">>),
emqx_topic_metrics:disable(). emqx_topic_metrics:disable().
@ -64,7 +72,12 @@ t_topic_metrics(_) ->
?assertEqual(ok, emqx_topic_metrics:inc(<<"a/b/c">>, 'messages.in')), ?assertEqual(ok, emqx_topic_metrics:inc(<<"a/b/c">>, 'messages.in')),
?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.in')), ?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.in')),
?assert(emqx_topic_metrics:rate(<<"a/b/c">>, 'messages.in') =:= 0), ?assert(emqx_topic_metrics:rate(<<"a/b/c">>, 'messages.in') =:= 0),
% ?assert(emqx_topic_metrics:rates(<<"a/b/c">>, 'messages.in') =:= #{long => 0,medium => 0,short => 0}),
%% ?assert(
%% emqx_topic_metrics:rates(<<"a/b/c">>, 'messages.in') =:=
%% #{long => 0, medium => 0, short => 0}
%% ),
emqx_topic_metrics:deregister(<<"a/b/c">>), emqx_topic_metrics:deregister(<<"a/b/c">>),
emqx_topic_metrics:disable(). emqx_topic_metrics:disable().
@ -78,9 +91,11 @@ t_hook(_) ->
?assertEqual(0, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.out')), ?assertEqual(0, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.out')),
?assertEqual(0, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.dropped')), ?assertEqual(0, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.dropped')),
{ok, C} = emqtt:start_link([{host, "localhost"}, {ok, C} = emqtt:start_link([
{clientid, "myclient"}, {host, "localhost"},
{username, "myuser"}]), {clientid, "myclient"},
{username, "myuser"}
]),
{ok, _} = emqtt:connect(C), {ok, _} = emqtt:connect(C),
emqtt:publish(C, <<"a/b/c">>, <<"Hello world">>, 0), emqtt:publish(C, <<"a/b/c">>, <<"Hello world">>, 0),
ct:sleep(100), ct:sleep(100),