diff --git a/apps/emqx_modules/rebar.config b/apps/emqx_modules/rebar.config index 3f84bb8df..9f17b7657 100644 --- a/apps/emqx_modules/rebar.config +++ b/apps/emqx_modules/rebar.config @@ -1,5 +1,5 @@ %% -*- mode: erlang -*- -{deps, - [ {emqx, {path, "../emqx"}} - ]}. +{deps, [{emqx, {path, "../emqx"}}]}. + +{project_plugins, [erlfmt]}. diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index 4fee7a000..be7ca3ba3 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -27,33 +27,36 @@ -boot_mnesia({mnesia, [boot]}). --export([ start_link/0 - , on_message_publish/1 - ]). +-export([ + start_link/0, + on_message_publish/1 +]). %% gen_server callbacks --export([ init/1 - , handle_call/3 - , handle_cast/2 - , handle_info/2 - , terminate/2 - , code_change/3 - ]). +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). %% gen_server callbacks --export([ enable/0 - , disable/0 - , set_max_delayed_messages/1 - , update_config/1 - , list/1 - , get_delayed_message/1 - , get_delayed_message/2 - , delete_delayed_message/1 - , delete_delayed_message/2 - , post_config_update/5 - , cluster_list/1 - , cluster_query/4 - ]). +-export([ + enable/0, + disable/0, + set_max_delayed_messages/1, + update_config/1, + list/1, + get_delayed_message/1, + get_delayed_message/2, + delete_delayed_message/1, + delete_delayed_message/2, + post_config_update/5, + cluster_list/1, + cluster_query/4 +]). -export([format_delayed/1]). @@ -64,13 +67,13 @@ -export_type([with_id_return/0, with_id_return/1]). - --type state() :: #{ publish_timer := maybe(timer:tref()) - , publish_at := non_neg_integer() - , stats_timer := maybe(reference()) - , stats_fun := maybe(fun((pos_integer()) -> ok)) - , max_delayed_messages := non_neg_integer() - }. +-type state() :: #{ + publish_timer := maybe(timer:tref()), + publish_at := non_neg_integer(), + stats_timer := maybe(reference()), + stats_fun := maybe(fun((pos_integer()) -> ok)), + max_delayed_messages := non_neg_integer() +}. %% sync ms with record change -define(QUERY_MS(Id), [{{delayed_message, {'_', Id}, '_', '_'}, [], ['$_']}]). @@ -86,40 +89,42 @@ %%-------------------------------------------------------------------- mnesia(boot) -> ok = mria:create_table(?TAB, [ - {type, ordered_set}, - {storage, disc_copies}, - {local_content, true}, - {record_name, delayed_message}, - {attributes, record_info(fields, delayed_message)}]). + {type, ordered_set}, + {storage, disc_copies}, + {local_content, true}, + {record_name, delayed_message}, + {attributes, record_info(fields, delayed_message)} + ]). %%-------------------------------------------------------------------- %% Hooks %%-------------------------------------------------------------------- -on_message_publish(Msg = #message{ - id = Id, - topic = <<"$delayed/", Topic/binary>>, - timestamp = Ts - }) -> +on_message_publish( + Msg = #message{ + id = Id, + topic = <<"$delayed/", Topic/binary>>, + timestamp = Ts + } +) -> [Delay, Topic1] = binary:split(Topic, <<"/">>), - {PubAt, Delayed} = case binary_to_integer(Delay) of - Interval when Interval < ?MAX_INTERVAL -> - {Interval + erlang:round(Ts / 1000), Interval}; - Timestamp -> - %% Check malicious timestamp? - case (Timestamp - erlang:round(Ts / 1000)) > ?MAX_INTERVAL of - true -> error(invalid_delayed_timestamp); - false -> {Timestamp, Timestamp - erlang:round(Ts / 1000)} - end - end, + {PubAt, Delayed} = + case binary_to_integer(Delay) of + Interval when Interval < ?MAX_INTERVAL -> + {Interval + erlang:round(Ts / 1000), Interval}; + Timestamp -> + %% Check malicious timestamp? + case (Timestamp - erlang:round(Ts / 1000)) > ?MAX_INTERVAL of + true -> error(invalid_delayed_timestamp); + false -> {Timestamp, Timestamp - erlang:round(Ts / 1000)} + end + end, PubMsg = Msg#message{topic = Topic1}, Headers = PubMsg#message.headers, case store(#delayed_message{key = {PubAt, Id}, delayed = Delayed, msg = PubMsg}) of ok -> ok; - {error, Error} -> - ?SLOG(error, #{msg => "store_delayed_message_fail", error => Error}) + {error, Error} -> ?SLOG(error, #{msg => "store_delayed_message_fail", error => Error}) end, {stop, PubMsg#message{headers = Headers#{allow_publish => false}}}; - on_message_publish(Msg) -> {ok, Msg}. @@ -127,12 +132,12 @@ on_message_publish(Msg) -> %% Start delayed publish server %%-------------------------------------------------------------------- --spec(start_link() -> emqx_types:startlink_ret()). +-spec start_link() -> emqx_types:startlink_ret(). start_link() -> Opts = emqx_conf:get([delayed], #{}), gen_server:start_link({local, ?SERVER}, ?MODULE, [Opts], []). --spec(store(delayed_message()) -> ok | {error, atom()}). +-spec store(delayed_message()) -> ok | {error, atom()}. store(DelayedMsg) -> gen_server:call(?SERVER, {store, DelayedMsg}, infinity). @@ -158,13 +163,21 @@ cluster_query(Table, _QsSpec, Continuation, Limit) -> format_delayed(Delayed) -> format_delayed(Delayed, false). -format_delayed(#delayed_message{key = {ExpectTimeStamp, Id}, delayed = Delayed, - msg = #message{topic = Topic, - from = From, - headers = Headers, - qos = Qos, - timestamp = PublishTimeStamp, - payload = Payload}}, WithPayload) -> +format_delayed( + #delayed_message{ + key = {ExpectTimeStamp, Id}, + delayed = Delayed, + msg = #message{ + topic = Topic, + from = From, + headers = Headers, + qos = Qos, + timestamp = PublishTimeStamp, + payload = Payload + } + }, + WithPayload +) -> PublishTime = to_rfc3339(PublishTimeStamp div 1000), ExpectTime = to_rfc3339(ExpectTimeStamp), RemainingTime = ExpectTimeStamp - erlang:system_time(second), @@ -202,7 +215,6 @@ get_delayed_message(Id) -> get_delayed_message(Node, Id) when Node =:= node() -> get_delayed_message(Id); - get_delayed_message(Node, Id) -> emqx_delayed_proto_v1:get_delayed_message(Node, Id). @@ -249,24 +261,32 @@ init([Opts]) -> erlang:process_flag(trap_exit, true), emqx_conf:add_handler([delayed], ?MODULE), MaxDelayedMessages = maps:get(max_delayed_messages, Opts, 0), - {ok, ensure_stats_event( - ensure_publish_timer(#{publish_timer => undefined, - publish_at => 0, - stats_timer => undefined, - stats_fun => undefined, - max_delayed_messages => MaxDelayedMessages}))}. + {ok, + ensure_stats_event( + ensure_publish_timer(#{ + publish_timer => undefined, + publish_at => 0, + stats_timer => undefined, + stats_fun => undefined, + max_delayed_messages => MaxDelayedMessages + }) + )}. handle_call({set_max_delayed_messages, Max}, _From, State) -> {reply, ok, State#{max_delayed_messages => Max}}; - -handle_call({store, DelayedMsg = #delayed_message{key = Key}}, - _From, State = #{max_delayed_messages := 0}) -> +handle_call( + {store, DelayedMsg = #delayed_message{key = Key}}, + _From, + State = #{max_delayed_messages := 0} +) -> ok = mria:dirty_write(?TAB, DelayedMsg), emqx_metrics:inc('messages.delayed'), {reply, ok, ensure_publish_timer(Key, State)}; - -handle_call({store, DelayedMsg = #delayed_message{key = Key}}, - _From, State = #{max_delayed_messages := Max}) -> +handle_call( + {store, DelayedMsg = #delayed_message{key = Key}}, + _From, + State = #{max_delayed_messages := Max} +) -> Size = mnesia:table_info(?TAB, size), case Size >= Max of true -> @@ -276,15 +296,12 @@ handle_call({store, DelayedMsg = #delayed_message{key = Key}}, emqx_metrics:inc('messages.delayed'), {reply, ok, ensure_publish_timer(Key, State)} end; - handle_call(enable, _From, State) -> emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}), {reply, ok, State}; - handle_call(disable, _From, State) -> emqx_hooks:del('message.publish', {?MODULE, on_message_publish}), {reply, ok, State}; - handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. @@ -298,12 +315,10 @@ handle_info({timeout, TRef, do_publish}, State = #{publish_timer := TRef}) -> DeletedKeys = do_publish(mnesia:dirty_first(?TAB), os:system_time(seconds)), lists:foreach(fun(Key) -> mria:dirty_delete(?TAB, Key) end, DeletedKeys), {noreply, ensure_publish_timer(State#{publish_timer := undefined, publish_at := 0})}; - handle_info(stats, State = #{stats_fun := StatsFun}) -> StatsTimer = erlang:send_after(timer:seconds(1), self(), stats), StatsFun(delayed_count()), {noreply, State#{stats_timer := StatsTimer}, hibernate}; - handle_info(Info, State) -> ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. @@ -336,8 +351,9 @@ ensure_publish_timer('$end_of_table', State) -> State#{publish_timer := undefined, publish_at := 0}; ensure_publish_timer({Ts, _Id}, State = #{publish_timer := undefined}) -> ensure_publish_timer(Ts, os:system_time(seconds), State); -ensure_publish_timer({Ts, _Id}, State = #{publish_timer := TRef, publish_at := PubAt}) - when Ts < PubAt -> +ensure_publish_timer({Ts, _Id}, State = #{publish_timer := TRef, publish_at := PubAt}) when + Ts < PubAt +-> ok = emqx_misc:cancel_timer(TRef), ensure_publish_timer(Ts, os:system_time(seconds), State); ensure_publish_timer(_Key, State) -> @@ -359,10 +375,9 @@ do_publish({Ts, _Id}, Now, Acc) when Ts > Now -> do_publish(Key = {Ts, _Id}, Now, Acc) when Ts =< Now -> case mnesia:dirty_read(?TAB, Key) of [] -> ok; - [#delayed_message{msg = Msg}] -> - emqx_pool:async_submit(fun emqx:publish/1, [Msg]) + [#delayed_message{msg = Msg}] -> emqx_pool:async_submit(fun emqx:publish/1, [Msg]) end, do_publish(mnesia:dirty_next(?TAB, Key), Now, [Key | Acc]). --spec(delayed_count() -> non_neg_integer()). +-spec delayed_count() -> non_neg_integer(). delayed_count() -> mnesia:table_info(?TAB, size). diff --git a/apps/emqx_modules/src/emqx_delayed_api.erl b/apps/emqx_modules/src/emqx_delayed_api.erl index 061bdc472..57028c462 100644 --- a/apps/emqx_modules/src/emqx_delayed_api.erl +++ b/apps/emqx_modules/src/emqx_delayed_api.erl @@ -26,14 +26,17 @@ -define(MAX_PAYLOAD_LENGTH, 2048). -define(PAYLOAD_TOO_LARGE, 'PAYLOAD_TOO_LARGE'). --export([ status/2 - , delayed_messages/2 - , delayed_message/2 - ]). +-export([ + status/2, + delayed_messages/2, + delayed_message/2 +]). --export([ paths/0 - , fields/1 - , schema/1]). +-export([ + paths/0, + fields/1, + schema/1 +]). %% for rpc -export([update_config_/1]). @@ -46,15 +49,17 @@ -define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND'). -define(MESSAGE_ID_SCHEMA_ERROR, 'MESSAGE_ID_SCHEMA_ERROR'). -define(INVALID_NODE, 'INVALID_NODE'). --define(MAX_PAYLOAD_SIZE, 1048576). %% 1MB = 1024 x 1024 +%% 1MB = 1024 x 1024 +-define(MAX_PAYLOAD_SIZE, 1048576). api_spec() -> emqx_dashboard_swagger:spec(?MODULE). paths() -> - [ "/mqtt/delayed" - , "/mqtt/delayed/messages" - , "/mqtt/delayed/messages/:node/:msgid" + [ + "/mqtt/delayed", + "/mqtt/delayed/messages", + "/mqtt/delayed/messages/:node/:msgid" ]. schema("/mqtt/delayed") -> @@ -73,47 +78,64 @@ schema("/mqtt/delayed") -> description => <<"Enable or disable delayed, set max delayed messages">>, 'requestBody' => ref(emqx_modules_schema, "delayed"), responses => #{ - 200 => mk(ref(emqx_modules_schema, "delayed"), - #{desc => <<"Enable or disable delayed successfully">>}), - 400 => emqx_dashboard_swagger:error_codes( [?BAD_REQUEST] - , <<"Max limit illegality">>) + 200 => mk( + ref(emqx_modules_schema, "delayed"), + #{desc => <<"Enable or disable delayed successfully">>} + ), + 400 => emqx_dashboard_swagger:error_codes( + [?BAD_REQUEST], + <<"Max limit illegality">> + ) } } }; - schema("/mqtt/delayed/messages/:node/:msgid") -> - #{'operationId' => delayed_message, + #{ + 'operationId' => delayed_message, get => #{ tags => ?API_TAG_MQTT, description => <<"Get delayed message">>, - parameters => [ {node, - mk(binary(), - #{in => path, desc => <<"The node where message from">>})} - , {msgid, mk(binary(), #{in => path, desc => <<"Delay message ID">>})} - ], + parameters => [ + {node, + mk( + binary(), + #{in => path, desc => <<"The node where message from">>} + )}, + {msgid, mk(binary(), #{in => path, desc => <<"Delay message ID">>})} + ], responses => #{ 200 => ref("message_without_payload"), - 400 => emqx_dashboard_swagger:error_codes( [?MESSAGE_ID_SCHEMA_ERROR] - , <<"Bad MsgId format">>), - 404 => emqx_dashboard_swagger:error_codes( [?MESSAGE_ID_NOT_FOUND] - , <<"MsgId not found">>) + 400 => emqx_dashboard_swagger:error_codes( + [?MESSAGE_ID_SCHEMA_ERROR], + <<"Bad MsgId format">> + ), + 404 => emqx_dashboard_swagger:error_codes( + [?MESSAGE_ID_NOT_FOUND], + <<"MsgId not found">> + ) } }, delete => #{ tags => ?API_TAG_MQTT, description => <<"Delete delayed message">>, - parameters => [ {node, - mk(binary(), - #{in => path, desc => <<"The node where message from">>})} - , {msgid, - mk(binary(), #{in => path, desc => <<"Delay message ID">>})} - ], + parameters => [ + {node, + mk( + binary(), + #{in => path, desc => <<"The node where message from">>} + )}, + {msgid, mk(binary(), #{in => path, desc => <<"Delay message ID">>})} + ], responses => #{ 204 => <<"Delete delayed message success">>, - 400 => emqx_dashboard_swagger:error_codes( [?MESSAGE_ID_SCHEMA_ERROR] - , <<"Bad MsgId format">>), - 404 => emqx_dashboard_swagger:error_codes( [?MESSAGE_ID_NOT_FOUND] - , <<"MsgId not found">>) + 400 => emqx_dashboard_swagger:error_codes( + [?MESSAGE_ID_SCHEMA_ERROR], + <<"Bad MsgId format">> + ), + 404 => emqx_dashboard_swagger:error_codes( + [?MESSAGE_ID_NOT_FOUND], + <<"MsgId not found">> + ) } } }; @@ -126,44 +148,44 @@ schema("/mqtt/delayed/messages") -> parameters => [ref(emqx_dashboard_swagger, page), ref(emqx_dashboard_swagger, limit)], responses => #{ 200 => - [ - {data, mk(hoconsc:array(ref("message")), #{})}, - {meta, [ - {page, mk(integer(), #{})}, - {limit, mk(integer(), #{})}, - {count, mk(integer(), #{})} - ]} - ] + [ + {data, mk(hoconsc:array(ref("message")), #{})}, + {meta, [ + {page, mk(integer(), #{})}, + {limit, mk(integer(), #{})}, + {count, mk(integer(), #{})} + ]} + ] } } }. fields("message_without_payload") -> [ - {msgid, mk(integer(), #{desc => <<"Message Id (MQTT message id hash)">>})}, - {node, mk(binary(), #{desc => <<"The node where message from">>})}, - {publish_at, mk(binary(), #{desc => <<"Client publish message time, rfc 3339">>})}, - {delayed_interval, mk(integer(), #{desc => <<"Delayed interval, second">>})}, - {delayed_remaining, mk(integer(), #{desc => <<"Delayed remaining, second">>})}, - {expected_at, mk(binary(), #{desc => <<"Expect publish time, rfc 3339">>})}, - {topic, mk(binary(), #{desc => <<"Topic">>, example => <<"/sys/#">>})}, - {qos, mk(binary(), #{desc => <<"QoS">>})}, - {from_clientid, mk(binary(), #{desc => <<"From ClientId">>})}, - {from_username, mk(binary(), #{desc => <<"From Username">>})} + {msgid, mk(integer(), #{desc => <<"Message Id (MQTT message id hash)">>})}, + {node, mk(binary(), #{desc => <<"The node where message from">>})}, + {publish_at, mk(binary(), #{desc => <<"Client publish message time, rfc 3339">>})}, + {delayed_interval, mk(integer(), #{desc => <<"Delayed interval, second">>})}, + {delayed_remaining, mk(integer(), #{desc => <<"Delayed remaining, second">>})}, + {expected_at, mk(binary(), #{desc => <<"Expect publish time, rfc 3339">>})}, + {topic, mk(binary(), #{desc => <<"Topic">>, example => <<"/sys/#">>})}, + {qos, mk(binary(), #{desc => <<"QoS">>})}, + {from_clientid, mk(binary(), #{desc => <<"From ClientId">>})}, + {from_username, mk(binary(), #{desc => <<"From Username">>})} ]; fields("message") -> PayloadDesc = io_lib:format( - "Payload, base64 encode. Payload will be ~p if length large than ~p", - [?PAYLOAD_TOO_LARGE, ?MAX_PAYLOAD_LENGTH]), + "Payload, base64 encode. Payload will be ~p if length large than ~p", + [?PAYLOAD_TOO_LARGE, ?MAX_PAYLOAD_LENGTH] + ), fields("message_without_payload") ++ - [{payload, mk(binary(), #{desc => iolist_to_binary(PayloadDesc)})}]. + [{payload, mk(binary(), #{desc => iolist_to_binary(PayloadDesc)})}]. %%-------------------------------------------------------------------- %% HTTP API %%-------------------------------------------------------------------- status(get, _Params) -> {200, get_status()}; - status(put, #{body := Body}) -> update_config(Body). @@ -173,34 +195,37 @@ delayed_messages(get, #{query_string := Qs}) -> delayed_message(get, #{bindings := #{node := NodeBin, msgid := HexId}}) -> MaybeNode = make_maybe(NodeBin, invalid_node, fun erlang:binary_to_atom/1), MaybeId = make_maybe(HexId, id_schema_error, fun emqx_guid:from_hexstr/1), - with_maybe([MaybeNode, MaybeId], - fun(Node, Id) -> - case emqx_delayed:get_delayed_message(Node, Id) of - {ok, Message} -> - Payload = maps:get(payload, Message), - case erlang:byte_size(Payload) > ?MAX_PAYLOAD_SIZE of - true -> - {200, Message}; - _ -> - {200, Message#{payload => base64:encode(Payload)}} - end; - {error, not_found} -> - {404, generate_http_code_map(not_found, Id)} - end - end); - + with_maybe( + [MaybeNode, MaybeId], + fun(Node, Id) -> + case emqx_delayed:get_delayed_message(Node, Id) of + {ok, Message} -> + Payload = maps:get(payload, Message), + case erlang:byte_size(Payload) > ?MAX_PAYLOAD_SIZE of + true -> + {200, Message}; + _ -> + {200, Message#{payload => base64:encode(Payload)}} + end; + {error, not_found} -> + {404, generate_http_code_map(not_found, Id)} + end + end + ); delayed_message(delete, #{bindings := #{node := NodeBin, msgid := HexId}}) -> MaybeNode = make_maybe(NodeBin, invalid_node, fun erlang:binary_to_atom/1), MaybeId = make_maybe(HexId, id_schema_error, fun emqx_guid:from_hexstr/1), - with_maybe([MaybeNode, MaybeId], - fun(Node, Id) -> - case emqx_delayed:delete_delayed_message(Node, Id) of - ok -> - {204}; - {error, not_found} -> - {404, generate_http_code_map(not_found, Id)} - end - end). + with_maybe( + [MaybeNode, MaybeId], + fun(Node, Id) -> + case emqx_delayed:delete_delayed_message(Node, Id) of + ok -> + {204}; + {error, not_found} -> + {404, generate_http_code_map(not_found, Id)} + end + end + ). %%-------------------------------------------------------------------- %% internal function @@ -241,25 +266,36 @@ update_config_(Config) -> {200, NewDelayed}; {error, Reason} -> Message = list_to_binary( - io_lib:format("Update config failed ~p", [Reason])), + io_lib:format("Update config failed ~p", [Reason]) + ), {500, ?INTERNAL_ERROR, Message} end. generate_http_code_map(id_schema_error, Id) -> - #{code => ?MESSAGE_ID_SCHEMA_ERROR, message => - iolist_to_binary(io_lib:format("Message ID ~p schema error", [Id]))}; + #{ + code => ?MESSAGE_ID_SCHEMA_ERROR, + message => + iolist_to_binary(io_lib:format("Message ID ~p schema error", [Id])) + }; generate_http_code_map(not_found, Id) -> - #{code => ?MESSAGE_ID_NOT_FOUND, message => - iolist_to_binary(io_lib:format("Message ID ~p not found", [Id]))}; + #{ + code => ?MESSAGE_ID_NOT_FOUND, + message => + iolist_to_binary(io_lib:format("Message ID ~p not found", [Id])) + }; generate_http_code_map(invalid_node, Node) -> - #{code => ?INVALID_NODE, message => - iolist_to_binary(io_lib:format("The node name ~p is invalid", [Node]))}. + #{ + code => ?INVALID_NODE, + message => + iolist_to_binary(io_lib:format("The node name ~p is invalid", [Node])) + }. make_maybe(X, Error, Fun) -> try Fun(X) of Right -> Right - catch _:_ -> + catch + _:_ -> {left, X, Error} end. diff --git a/apps/emqx_modules/src/emqx_event_message.erl b/apps/emqx_modules/src/emqx_event_message.erl index b71f107fd..c462e7e25 100644 --- a/apps/emqx_modules/src/emqx_event_message.erl +++ b/apps/emqx_modules/src/emqx_event_message.erl @@ -20,22 +20,24 @@ -include_lib("emqx/include/logger.hrl"). -include("emqx_modules.hrl"). --export([ list/0 - , update/1 - , enable/0 - , disable/0 - , post_config_update/5 - , init_conf_handler/0 - ]). +-export([ + list/0, + update/1, + enable/0, + disable/0, + post_config_update/5, + init_conf_handler/0 +]). --export([ on_client_connected/2 - , on_client_disconnected/3 - , on_client_subscribed/3 - , on_client_unsubscribed/3 - , on_message_dropped/3 - , on_message_delivered/2 - , on_message_acked/2 - ]). +-export([ + on_client_connected/2, + on_client_disconnected/3, + on_client_subscribed/3, + on_client_unsubscribed/3, + on_message_dropped/3, + on_message_delivered/2, + on_message_acked/2 +]). -ifdef(TEST). -export([reason/1]). @@ -48,9 +50,13 @@ list() -> emqx_conf:get([event_message], #{}). update(Params) -> - case emqx_conf:update([event_message], - Params, - #{rawconf_with_defaults => true, override_to => cluster}) of + case + emqx_conf:update( + [event_message], + Params, + #{rawconf_with_defaults => true, override_to => cluster} + ) + of {ok, #{raw_config := NewEventMessage}} -> {ok, NewEventMessage}; {error, Reason} -> @@ -74,46 +80,61 @@ disable() -> on_client_connected(ClientInfo, ConnInfo) -> Payload0 = common_infos(ClientInfo, ConnInfo), Payload = Payload0#{ - keepalive => maps:get(keepalive, ConnInfo, 0), - clean_start => maps:get(clean_start, ConnInfo, true), - expiry_interval => maps:get(expiry_interval, ConnInfo, 0) - }, + keepalive => maps:get(keepalive, ConnInfo, 0), + clean_start => maps:get(clean_start, ConnInfo, true), + expiry_interval => maps:get(expiry_interval, ConnInfo, 0) + }, publish_event_msg(<<"$event/client_connected">>, Payload). -on_client_disconnected(ClientInfo, - Reason, ConnInfo = #{disconnected_at := DisconnectedAt}) -> - +on_client_disconnected( + ClientInfo, + Reason, + ConnInfo = #{disconnected_at := DisconnectedAt} +) -> Payload0 = common_infos(ClientInfo, ConnInfo), Payload = Payload0#{ - reason => reason(Reason), - disconnected_at => DisconnectedAt - }, + reason => reason(Reason), + disconnected_at => DisconnectedAt + }, publish_event_msg(<<"$event/client_disconnected">>, Payload). -on_client_subscribed(_ClientInfo = #{clientid := ClientId, - username := Username}, - Topic, SubOpts) -> - Payload = #{clientid => ClientId, - username => Username, - topic => Topic, - subopts => SubOpts, - ts => erlang:system_time(millisecond) - }, +on_client_subscribed( + _ClientInfo = #{ + clientid := ClientId, + username := Username + }, + Topic, + SubOpts +) -> + Payload = #{ + clientid => ClientId, + username => Username, + topic => Topic, + subopts => SubOpts, + ts => erlang:system_time(millisecond) + }, publish_event_msg(<<"$event/client_subscribed">>, Payload). -on_client_unsubscribed(_ClientInfo = #{clientid := ClientId, - username := Username}, - Topic, _SubOpts) -> - Payload = #{clientid => ClientId, - username => Username, - topic => Topic, - ts => erlang:system_time(millisecond) - }, +on_client_unsubscribed( + _ClientInfo = #{ + clientid := ClientId, + username := Username + }, + Topic, + _SubOpts +) -> + Payload = #{ + clientid => ClientId, + username => Username, + topic => Topic, + ts => erlang:system_time(millisecond) + }, publish_event_msg(<<"$event/client_unsubscribed">>, Payload). on_message_dropped(Message = #message{from = ClientId}, _, Reason) -> case ignore_sys_message(Message) of - true -> ok; + true -> + ok; false -> Payload0 = base_message(Message), Payload = Payload0#{ @@ -126,13 +147,17 @@ on_message_dropped(Message = #message{from = ClientId}, _, Reason) -> end, {ok, Message}. -on_message_delivered(_ClientInfo = #{ - peerhost := PeerHost, - clientid := ReceiverCId, - username := ReceiverUsername}, - #message{from = ClientId} = Message) -> +on_message_delivered( + _ClientInfo = #{ + peerhost := PeerHost, + clientid := ReceiverCId, + username := ReceiverUsername + }, + #message{from = ClientId} = Message +) -> case ignore_sys_message(Message) of - true -> ok; + true -> + ok; false -> Payload0 = base_message(Message), Payload = Payload0#{ @@ -146,13 +171,17 @@ on_message_delivered(_ClientInfo = #{ end, {ok, Message}. -on_message_acked(_ClientInfo = #{ - peerhost := PeerHost, - clientid := ReceiverCId, - username := ReceiverUsername}, - #message{from = ClientId} = Message) -> +on_message_acked( + _ClientInfo = #{ + peerhost := PeerHost, + clientid := ReceiverCId, + username := ReceiverUsername + }, + #message{from = ClientId} = Message +) -> case ignore_sys_message(Message) of - true -> ok; + true -> + ok; false -> Payload0 = base_message(Message), Payload = Payload0#{ @@ -170,29 +199,36 @@ on_message_acked(_ClientInfo = #{ %% Helper functions %%-------------------------------------------------------------------- common_infos( - _ClientInfo = #{clientid := ClientId, - username := Username, - peerhost := PeerHost, - sockport := SockPort - }, - _ConnInfo = #{proto_name := ProtoName, - proto_ver := ProtoVer, - connected_at := ConnectedAt - }) -> - #{clientid => ClientId, - username => Username, - ipaddress => ntoa(PeerHost), - sockport => SockPort, - proto_name => ProtoName, - proto_ver => ProtoVer, - connected_at => ConnectedAt, - ts => erlang:system_time(millisecond) - }. + _ClientInfo = #{ + clientid := ClientId, + username := Username, + peerhost := PeerHost, + sockport := SockPort + }, + _ConnInfo = #{ + proto_name := ProtoName, + proto_ver := ProtoVer, + connected_at := ConnectedAt + } +) -> + #{ + clientid => ClientId, + username => Username, + ipaddress => ntoa(PeerHost), + sockport => SockPort, + proto_name => ProtoName, + proto_ver => ProtoVer, + connected_at => ConnectedAt, + ts => erlang:system_time(millisecond) + }. make_msg(Topic, Payload) -> emqx_message:set_flag( - sys, emqx_message:make( - ?MODULE, 0, Topic, iolist_to_binary(Payload))). + sys, + emqx_message:make( + ?MODULE, 0, Topic, iolist_to_binary(Payload) + ) + ). -compile({inline, [reason/1]}). reason(Reason) when is_atom(Reason) -> Reason; @@ -201,26 +237,33 @@ reason({Error, _}) when is_atom(Error) -> Error; reason(_) -> internal_error. ntoa(undefined) -> undefined; -ntoa({IpAddr, Port}) -> - iolist_to_binary([inet:ntoa(IpAddr), ":", integer_to_list(Port)]); -ntoa(IpAddr) -> - iolist_to_binary(inet:ntoa(IpAddr)). +ntoa({IpAddr, Port}) -> iolist_to_binary([inet:ntoa(IpAddr), ":", integer_to_list(Port)]); +ntoa(IpAddr) -> iolist_to_binary(inet:ntoa(IpAddr)). -printable_maps(undefined) -> #{}; +printable_maps(undefined) -> + #{}; printable_maps(Headers) -> maps:fold( - fun (K, V0, AccIn) when K =:= peerhost; K =:= peername; K =:= sockname -> + fun + (K, V0, AccIn) when K =:= peerhost; K =:= peername; K =:= sockname -> AccIn#{K => ntoa(V0)}; ('User-Property', V0, AccIn) when is_list(V0) -> AccIn#{ 'User-Property' => maps:from_list(V0), - 'User-Property-Pairs' => [#{ - key => Key, - value => Value - } || {Key, Value} <- V0] + 'User-Property-Pairs' => [ + #{ + key => Key, + value => Value + } + || {Key, Value} <- V0 + ] }; - (K, V0, AccIn) -> AccIn#{K => V0} - end, #{}, Headers). + (K, V0, AccIn) -> + AccIn#{K => V0} + end, + #{}, + Headers + ). base_message(Message) -> #message{ @@ -230,7 +273,8 @@ base_message(Message) -> topic = Topic, headers = Headers, payload = Payload, - timestamp = Timestamp} = Message, + timestamp = Timestamp + } = Message, #{ id => emqx_guid:to_hexstr(Id), payload => Payload, diff --git a/apps/emqx_modules/src/emqx_event_message_api.erl b/apps/emqx_modules/src/emqx_event_message_api.erl index e25edff97..c6c71b804 100644 --- a/apps/emqx_modules/src/emqx_event_message_api.erl +++ b/apps/emqx_modules/src/emqx_event_message_api.erl @@ -21,10 +21,11 @@ -import(hoconsc, [mk/2, ref/2]). --export([ api_spec/0 - , paths/0 - , schema/1 - ]). +-export([ + api_spec/0, + paths/0, + schema/1 +]). -export([event_message/2]). @@ -35,28 +36,30 @@ paths() -> ["/mqtt/event_message"]. schema("/mqtt/event_message") -> - #{ 'operationId' => event_message - , get => - #{ description => <<"Event Message">> - , tags => ?API_TAG_MQTT - , responses => - #{200 => status_schema(<<"Get Event Message config successfully">>)} + #{ + 'operationId' => event_message, + get => + #{ + description => <<"Event Message">>, + tags => ?API_TAG_MQTT, + responses => + #{200 => status_schema(<<"Get Event Message config successfully">>)} + }, + put => + #{ + description => <<"Update Event Message">>, + tags => ?API_TAG_MQTT, + 'requestBody' => status_schema(<<"Update Event Message config">>), + responses => + #{200 => status_schema(<<"Update Event Message config successfully">>)} } - , put => - #{ description => <<"Update Event Message">> - , tags => ?API_TAG_MQTT - , 'requestBody' => status_schema(<<"Update Event Message config">>) - , responses => - #{200 => status_schema(<<"Update Event Message config successfully">>)} - } - }. + }. status_schema(Desc) -> mk(ref(?API_SCHEMA_MODULE, "event_message"), #{in => body, desc => Desc}). event_message(get, _Params) -> {200, emqx_event_message:list()}; - event_message(put, #{body := Body}) -> case emqx_event_message:update(Body) of {ok, NewConfig} -> diff --git a/apps/emqx_modules/src/emqx_modules.app.src b/apps/emqx_modules/src/emqx_modules.app.src index da85e79e7..9575cca28 100644 --- a/apps/emqx_modules/src/emqx_modules.app.src +++ b/apps/emqx_modules/src/emqx_modules.app.src @@ -1,10 +1,10 @@ %% -*- mode: erlang -*- -{application, emqx_modules, - [{description, "EMQX Modules"}, - {vsn, "5.0.0"}, - {modules, []}, - {applications, [kernel,stdlib,emqx]}, - {mod, {emqx_modules_app, []}}, - {registered, [emqx_modules_sup]}, - {env, []} - ]}. +{application, emqx_modules, [ + {description, "EMQX Modules"}, + {vsn, "5.0.0"}, + {modules, []}, + {applications, [kernel, stdlib, emqx]}, + {mod, {emqx_modules_app, []}}, + {registered, [emqx_modules_sup]}, + {env, []} +]}. diff --git a/apps/emqx_modules/src/emqx_modules_app.erl b/apps/emqx_modules/src/emqx_modules_app.erl index 472480643..bf8efc8f6 100644 --- a/apps/emqx_modules/src/emqx_modules_app.erl +++ b/apps/emqx_modules/src/emqx_modules_app.erl @@ -18,9 +18,10 @@ -behaviour(application). --export([ start/2 - , stop/1 - ]). +-export([ + start/2, + stop/1 +]). start(_Type, _Args) -> {ok, Sup} = emqx_modules_sup:start_link(), diff --git a/apps/emqx_modules/src/emqx_modules_conf.erl b/apps/emqx_modules/src/emqx_modules_conf.erl index 5c9f51763..cc033daa1 100644 --- a/apps/emqx_modules/src/emqx_modules_conf.erl +++ b/apps/emqx_modules/src/emqx_modules_conf.erl @@ -20,20 +20,23 @@ -behaviour(emqx_config_handler). %% Load/Unload --export([ load/0 - , unload/0 - ]). +-export([ + load/0, + unload/0 +]). %% topci-metrics --export([ topic_metrics/0 - , add_topic_metrics/1 - , remove_topic_metrics/1 - ]). +-export([ + topic_metrics/0, + add_topic_metrics/1, + remove_topic_metrics/1 +]). %% config handlers --export([ pre_config_update/3 - , post_config_update/5 - ]). +-export([ + pre_config_update/3, + post_config_update/5 +]). %%-------------------------------------------------------------------- %% Load/Unload @@ -52,22 +55,22 @@ unload() -> -spec topic_metrics() -> [emqx_types:topic()]. topic_metrics() -> lists:map( - fun(#{topic := Topic}) -> Topic end, - emqx:get_config([topic_metrics]) - ). + fun(#{topic := Topic}) -> Topic end, + emqx:get_config([topic_metrics]) + ). --spec add_topic_metrics(emqx_types:topic()) - -> {ok, emqx_types:topic()} - | {error, term()}. +-spec add_topic_metrics(emqx_types:topic()) -> + {ok, emqx_types:topic()} + | {error, term()}. add_topic_metrics(Topic) -> case cfg_update(topic_metrics, ?FUNCTION_NAME, Topic) of {ok, _} -> {ok, Topic}; {error, Reason} -> {error, Reason} end. --spec remove_topic_metrics(emqx_types:topic()) - -> ok - | {error, term()}. +-spec remove_topic_metrics(emqx_types:topic()) -> + ok + | {error, term()}. remove_topic_metrics(Topic) -> case cfg_update(topic_metrics, ?FUNCTION_NAME, Topic) of {ok, _} -> ok; @@ -75,10 +78,13 @@ remove_topic_metrics(Topic) -> end. cfg_update(topic_metrics, Action, Params) -> - res(emqx_conf:update( - [topic_metrics], - {Action, Params}, - #{override_to => cluster})). + res( + emqx_conf:update( + [topic_metrics], + {Action, Params}, + #{override_to => cluster} + ) + ). res({ok, Result}) -> {ok, Result}; res({error, {pre_config_update, ?MODULE, Reason}}) -> {error, Reason}; @@ -89,9 +95,11 @@ res({error, Reason}) -> {error, Reason}. %% Config Handler %%-------------------------------------------------------------------- --spec pre_config_update(list(atom()), - emqx_config:update_request(), - emqx_config:raw_config()) -> +-spec pre_config_update( + list(atom()), + emqx_config:update_request(), + emqx_config:raw_config() +) -> {ok, emqx_config:update_request()} | {error, term()}. pre_config_update(_, {add_topic_metrics, Topic0}, RawConf) -> Topic = #{<<"topic">> => Topic0}, @@ -110,21 +118,33 @@ pre_config_update(_, {remove_topic_metrics, Topic0}, RawConf) -> {error, not_found} end. --spec post_config_update(list(atom()), - emqx_config:update_request(), - emqx_config:config(), - emqx_config:config(), emqx_config:app_envs()) - -> ok | {ok, Result::any()} | {error, Reason::term()}. +-spec post_config_update( + list(atom()), + emqx_config:update_request(), + emqx_config:config(), + emqx_config:config(), + emqx_config:app_envs() +) -> + ok | {ok, Result :: any()} | {error, Reason :: term()}. -post_config_update(_, {add_topic_metrics, Topic}, - _NewConfig, _OldConfig, _AppEnvs) -> +post_config_update( + _, + {add_topic_metrics, Topic}, + _NewConfig, + _OldConfig, + _AppEnvs +) -> case emqx_topic_metrics:register(Topic) of ok -> ok; {error, Reason} -> {error, Reason} end; - -post_config_update(_, {remove_topic_metrics, Topic}, - _NewConfig, _OldConfig, _AppEnvs) -> +post_config_update( + _, + {remove_topic_metrics, Topic}, + _NewConfig, + _OldConfig, + _AppEnvs +) -> case emqx_topic_metrics:deregister(Topic) of ok -> ok; {error, Reason} -> {error, Reason} diff --git a/apps/emqx_modules/src/emqx_modules_schema.erl b/apps/emqx_modules/src/emqx_modules_schema.erl index 3edded03f..58272a54a 100644 --- a/apps/emqx_modules/src/emqx_modules_schema.erl +++ b/apps/emqx_modules/src/emqx_modules_schema.erl @@ -20,82 +20,121 @@ -behaviour(hocon_schema). --export([ namespace/0 - , roots/0 - , fields/1]). +-export([ + namespace/0, + roots/0, + fields/1 +]). namespace() -> modules. roots() -> - ["delayed", - "telemetry", - "event_message", - array("rewrite"), - array("topic_metrics")]. + [ + "delayed", + "telemetry", + "event_message", + array("rewrite"), + array("topic_metrics") + ]. fields("telemetry") -> - [ {enable, hoconsc:mk(boolean(), #{default => false})} - ]; - + [{enable, hoconsc:mk(boolean(), #{default => false})}]; fields("delayed") -> - [ {enable, hoconsc:mk(boolean(), #{default => false})} - , {max_delayed_messages, sc(integer(), #{})} + [ + {enable, hoconsc:mk(boolean(), #{default => false})}, + {max_delayed_messages, sc(integer(), #{})} ]; - fields("rewrite") -> - [ { action - , sc( hoconsc:enum([subscribe, publish, all]) - , #{desc => <<"Action">>, example => publish})} - , { source_topic - , sc( binary() - , #{desc => <<"Origin Topic">>, example => "x/#"})} - , { dest_topic - , sc( binary() - , #{desc => <<"Destination Topic">>, example => "z/y/$1"})} - , { re, fun regular_expression/1 } + [ + {action, + sc( + hoconsc:enum([subscribe, publish, all]), + #{desc => <<"Action">>, example => publish} + )}, + {source_topic, + sc( + binary(), + #{desc => <<"Origin Topic">>, example => "x/#"} + )}, + {dest_topic, + sc( + binary(), + #{desc => <<"Destination Topic">>, example => "z/y/$1"} + )}, + {re, fun regular_expression/1} ]; - - fields("event_message") -> Fields = - [ { client_connected - , sc( boolean() - , #{desc => <<"Enable/disable client_connected event messages">>, - default => false})} - , { client_disconnected - , sc(boolean() - , #{desc => <<"Enable/disable client_disconnected event messages">>, - default => false})} - , { client_subscribed - , sc( boolean() - , #{desc => <<"Enable/disable client_subscribed event messages">>, - default => false})} - , { client_unsubscribed - , sc( boolean() - , #{desc => <<"Enable/disable client_unsubscribed event messages">>, - default => false})} - , { message_delivered - , sc( boolean() - , #{desc => <<"Enable/disable message_delivered event messages">>, - default => false})} - , { message_acked - , sc( boolean() - , #{desc => <<"Enable/disable message_acked event messages">>, - default => false})} - , { message_dropped - , sc( boolean() - , #{desc => <<"Enable/disable message_dropped event messages">>, - default => false})} + [ + {client_connected, + sc( + boolean(), + #{ + desc => <<"Enable/disable client_connected event messages">>, + default => false + } + )}, + {client_disconnected, + sc( + boolean(), + #{ + desc => <<"Enable/disable client_disconnected event messages">>, + default => false + } + )}, + {client_subscribed, + sc( + boolean(), + #{ + desc => <<"Enable/disable client_subscribed event messages">>, + default => false + } + )}, + {client_unsubscribed, + sc( + boolean(), + #{ + desc => <<"Enable/disable client_unsubscribed event messages">>, + default => false + } + )}, + {message_delivered, + sc( + boolean(), + #{ + desc => <<"Enable/disable message_delivered event messages">>, + default => false + } + )}, + {message_acked, + sc( + boolean(), + #{ + desc => <<"Enable/disable message_acked event messages">>, + default => false + } + )}, + {message_dropped, + sc( + boolean(), + #{ + desc => <<"Enable/disable message_dropped event messages">>, + default => false + } + )} ], - #{fields => Fields, - desc => """ -Enable/Disable system event messages. -The messages are published to '$event' prefixed topics. -For example, if `client_disconnected` is set to `true`, -a message is published to `$event/client_connected` topic -whenever a client is connected. -"""}; - + #{ + fields => Fields, + desc => + "" + "\n" + "Enable/Disable system event messages.\n" + "The messages are published to '$event' prefixed topics.\n" + "For example, if `client_disconnected` is set to `true`,\n" + "a message is published to `$event/client_connected` topic\n" + "whenever a client is connected.\n" + "" + }; fields("topic_metrics") -> [{topic, sc(binary(), #{})}]. diff --git a/apps/emqx_modules/src/emqx_modules_sup.erl b/apps/emqx_modules/src/emqx_modules_sup.erl index b92058c6d..a0d71bcbc 100644 --- a/apps/emqx_modules/src/emqx_modules_sup.erl +++ b/apps/emqx_modules/src/emqx_modules_sup.erl @@ -23,12 +23,14 @@ -export([init/1]). %% Helper macro for declaring children of supervisor --define(CHILD(Mod), #{id => Mod, - start => {Mod, start_link, []}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [Mod]}). +-define(CHILD(Mod), #{ + id => Mod, + start => {Mod, start_link, []}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [Mod] +}). start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). @@ -38,8 +40,10 @@ start_link() -> %%-------------------------------------------------------------------- init([]) -> emqx_event_message:init_conf_handler(), - {ok, {{one_for_one, 10, 3600}, - [ ?CHILD(emqx_telemetry) - , ?CHILD(emqx_topic_metrics) - , ?CHILD(emqx_trace) - , ?CHILD(emqx_delayed)]}}. + {ok, + {{one_for_one, 10, 3600}, [ + ?CHILD(emqx_telemetry), + ?CHILD(emqx_topic_metrics), + ?CHILD(emqx_trace), + ?CHILD(emqx_delayed) + ]}}. diff --git a/apps/emqx_modules/src/emqx_observer_cli.erl b/apps/emqx_modules/src/emqx_observer_cli.erl index 16095b5d7..d5a2a0bfa 100644 --- a/apps/emqx_modules/src/emqx_observer_cli.erl +++ b/apps/emqx_modules/src/emqx_observer_cli.erl @@ -16,13 +16,13 @@ -module(emqx_observer_cli). --export([ enable/0 - , disable/0 - ]). +-export([ + enable/0, + disable/0 +]). -export([cmd/1]). - %%-------------------------------------------------------------------- %% enable/disable %%-------------------------------------------------------------------- @@ -34,20 +34,19 @@ disable() -> cmd(["status"]) -> observer_cli:start(); - cmd(["bin_leak"]) -> [emqx_ctl:print("~p~n", [Row]) || Row <- recon:bin_leak(100)]; - cmd(["load", Mod]) -> Module = list_to_existing_atom(Mod), Nodes = nodes(), Res = remote_load(Nodes, Module), emqx_ctl:print("Loaded ~p module on ~p on ~n", [Mod, Nodes, Res]); - cmd(_) -> - emqx_ctl:usage([{"observer status", "observer_cli:start()"}, - {"observer bin_leak", "recon:bin_leak(100)"}, - {"observer load Mod", "recon:remote_load(Mod) to all nodes"}]). + emqx_ctl:usage([ + {"observer status", "observer_cli:start()"}, + {"observer bin_leak", "recon:bin_leak(100)"}, + {"observer load Mod", "recon:remote_load(Mod) to all nodes"} + ]). %% recon:remote_load/1 has a bug, when nodes() returns [], it is %% taken by recon as a node name. diff --git a/apps/emqx_modules/src/emqx_rewrite.erl b/apps/emqx_modules/src/emqx_rewrite.erl index 5f8dd1ad0..efc392d3d 100644 --- a/apps/emqx_modules/src/emqx_rewrite.erl +++ b/apps/emqx_modules/src/emqx_rewrite.erl @@ -21,25 +21,29 @@ -include_lib("emqx/include/emqx_mqtt.hrl"). -ifdef(TEST). --export([ compile/1 - , match_and_rewrite/2 - ]). +-export([ + compile/1, + match_and_rewrite/2 +]). -endif. %% APIs --export([ rewrite_subscribe/4 - , rewrite_unsubscribe/4 - , rewrite_publish/2 - ]). +-export([ + rewrite_subscribe/4, + rewrite_unsubscribe/4, + rewrite_publish/2 +]). --export([ enable/0 - , disable/0 - ]). +-export([ + enable/0, + disable/0 +]). --export([ list/0 - , update/1 - , post_config_update/5 - ]). +-export([ + list/0, + update/1, + post_config_update/5 +]). %%-------------------------------------------------------------------- %% Load/Unload @@ -65,14 +69,16 @@ update(Rules0) -> post_config_update(_KeyPath, _Config, Rules, _OldConf, _AppEnvs) -> register_hook(Rules). -register_hook([]) -> unregister_hook(); +register_hook([]) -> + unregister_hook(); register_hook(Rules) -> {PubRules, SubRules, ErrRules} = compile(Rules), emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}), emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}), emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}), case ErrRules of - [] -> ok; + [] -> + ok; _ -> ?SLOG(error, #{rewrite_rule_re_complie_failed => ErrRules}), {error, ErrRules} @@ -96,39 +102,54 @@ rewrite_publish(Message = #message{topic = Topic}, Rules) -> %% Internal functions %%-------------------------------------------------------------------- compile(Rules) -> - lists:foldl(fun(Rule, {Publish, Subscribe, Error}) -> - #{source_topic := Topic, re := Re, dest_topic := Dest, action := Action} = Rule, - case re:compile(Re) of - {ok, MP} -> - case Action of - publish -> - {[{Topic, MP, Dest} | Publish], Subscribe, Error}; - subscribe -> - {Publish, [{Topic, MP, Dest} | Subscribe], Error}; - all -> - {[{Topic, MP, Dest} | Publish], [{Topic, MP, Dest} | Subscribe], Error} - end; - {error, ErrSpec} -> - {Publish, Subscribe, [{Topic, Re, Dest, ErrSpec}]} - end end, {[], [], []}, Rules). + lists:foldl( + fun(Rule, {Publish, Subscribe, Error}) -> + #{source_topic := Topic, re := Re, dest_topic := Dest, action := Action} = Rule, + case re:compile(Re) of + {ok, MP} -> + case Action of + publish -> + {[{Topic, MP, Dest} | Publish], Subscribe, Error}; + subscribe -> + {Publish, [{Topic, MP, Dest} | Subscribe], Error}; + all -> + {[{Topic, MP, Dest} | Publish], [{Topic, MP, Dest} | Subscribe], Error} + end; + {error, ErrSpec} -> + {Publish, Subscribe, [{Topic, Re, Dest, ErrSpec}]} + end + end, + {[], [], []}, + Rules + ). match_and_rewrite(Topic, []) -> Topic; - match_and_rewrite(Topic, [{Filter, MP, Dest} | Rules]) -> case emqx_topic:match(Topic, Filter) of - true -> rewrite(Topic, MP, Dest); + true -> rewrite(Topic, MP, Dest); false -> match_and_rewrite(Topic, Rules) end. rewrite(Topic, MP, Dest) -> case re:run(Topic, MP, [{capture, all_but_first, list}]) of {match, Captured} -> - Vars = lists:zip(["\\$" ++ integer_to_list(I) - || I <- lists:seq(1, length(Captured))], Captured), - iolist_to_binary(lists:foldl( + Vars = lists:zip( + [ + "\\$" ++ integer_to_list(I) + || I <- lists:seq(1, length(Captured)) + ], + Captured + ), + iolist_to_binary( + lists:foldl( fun({Var, Val}, Acc) -> re:replace(Acc, Var, Val, [global]) - end, Dest, Vars)); - nomatch -> Topic + end, + Dest, + Vars + ) + ); + nomatch -> + Topic end. diff --git a/apps/emqx_modules/src/emqx_rewrite_api.erl b/apps/emqx_modules/src/emqx_rewrite_api.erl index d985d3fe5..264a36a06 100644 --- a/apps/emqx_modules/src/emqx_rewrite_api.erl +++ b/apps/emqx_modules/src/emqx_rewrite_api.erl @@ -40,27 +40,36 @@ schema("/mqtt/topic_rewrite") -> tags => ?API_TAG_MQTT, description => <<"List rewrite topic.">>, responses => #{ - 200 => hoconsc:mk(hoconsc:array(hoconsc:ref(emqx_modules_schema, "rewrite")), - #{desc => <<"List all rewrite rules">>}) + 200 => hoconsc:mk( + hoconsc:array(hoconsc:ref(emqx_modules_schema, "rewrite")), + #{desc => <<"List all rewrite rules">>} + ) } }, put => #{ description => <<"Update rewrite topic">>, tags => ?API_TAG_MQTT, - 'requestBody' => hoconsc:mk(hoconsc:array( - hoconsc:ref(emqx_modules_schema, "rewrite")),#{}), + 'requestBody' => hoconsc:mk( + hoconsc:array( + hoconsc:ref(emqx_modules_schema, "rewrite") + ), + #{} + ), responses => #{ - 200 => hoconsc:mk(hoconsc:array(hoconsc:ref(emqx_modules_schema, "rewrite")), - #{desc => <<"Update rewrite topic success.">>}), - 413 => emqx_dashboard_swagger:error_codes([?EXCEED_LIMIT], - <<"Rules count exceed max limit">>) + 200 => hoconsc:mk( + hoconsc:array(hoconsc:ref(emqx_modules_schema, "rewrite")), + #{desc => <<"Update rewrite topic success.">>} + ), + 413 => emqx_dashboard_swagger:error_codes( + [?EXCEED_LIMIT], + <<"Rules count exceed max limit">> + ) } } }. topic_rewrite(get, _Params) -> {200, emqx_rewrite:list()}; - topic_rewrite(put, #{body := Body}) -> case length(Body) < ?MAX_RULES_LIMIT of true -> @@ -68,6 +77,7 @@ topic_rewrite(put, #{body := Body}) -> {200, emqx_rewrite:list()}; _ -> Message = iolist_to_binary( - io_lib:format("Max rewrite rules count is ~p", [?MAX_RULES_LIMIT])), + io_lib:format("Max rewrite rules count is ~p", [?MAX_RULES_LIMIT]) + ), {413, #{code => ?EXCEED_LIMIT, message => Message}} end. diff --git a/apps/emqx_modules/src/emqx_telemetry.erl b/apps/emqx_modules/src/emqx_telemetry.erl index 6fdccc3d6..5a935cbd2 100644 --- a/apps/emqx_modules/src/emqx_telemetry.erl +++ b/apps/emqx_modules/src/emqx_telemetry.erl @@ -26,28 +26,32 @@ -include("emqx_modules.hrl"). --export([ start_link/0 - , stop/0 - ]). +-export([ + start_link/0, + stop/0 +]). %% gen_server callbacks --export([ init/1 - , handle_call/3 - , handle_cast/2 - , handle_continue/2 - , handle_info/2 - , terminate/2 - , code_change/3 - ]). +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_continue/2, + handle_info/2, + terminate/2, + code_change/3 +]). --export([ enable/0 - , disable/0 - ]). +-export([ + enable/0, + disable/0 +]). --export([ get_uuid/0 - , get_telemetry/0 - , get_status/0 - ]). +-export([ + get_uuid/0, + get_telemetry/0, + get_status/0 +]). -export([official_version/1]). @@ -59,9 +63,10 @@ -compile(nowarn_export_all). -endif. --import(proplists, [ get_value/2 - , get_value/3 - ]). +-import(proplists, [ + get_value/2, + get_value/3 +]). -record(telemetry, { id :: non_neg_integer(), @@ -88,12 +93,16 @@ %%-------------------------------------------------------------------- start_link() -> - ok = mria:create_table(?TELEMETRY, - [{type, set}, - {storage, disc_copies}, - {local_content, true}, - {record_name, telemetry}, - {attributes, record_info(fields, telemetry)}]), + ok = mria:create_table( + ?TELEMETRY, + [ + {type, set}, + {storage, disc_copies}, + {local_content, true}, + {record_name, telemetry}, + {attributes, record_info(fields, telemetry)} + ] + ), _ = mria:wait_for_tables([?TELEMETRY]), Opts = emqx:get_config([telemetry], #{}), gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). @@ -127,18 +136,23 @@ get_telemetry() -> %% is very small, it should be safe to ignore. -dialyzer([{nowarn_function, [init/1]}]). init(_Opts) -> - UUID1 = case mnesia:dirty_read(?TELEMETRY, ?UNIQUE_ID) of - [] -> - UUID = generate_uuid(), - mria:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID, - uuid = UUID}), - UUID; - [#telemetry{uuid = UUID} | _] -> - UUID - end, - {ok, #state{url = ?TELEMETRY_URL, - report_interval = timer:seconds(?REPORT_INTERVAL), - uuid = UUID1}}. + UUID1 = + case mnesia:dirty_read(?TELEMETRY, ?UNIQUE_ID) of + [] -> + UUID = generate_uuid(), + mria:dirty_write(?TELEMETRY, #telemetry{ + id = ?UNIQUE_ID, + uuid = UUID + }), + UUID; + [#telemetry{uuid = UUID} | _] -> + UUID + end, + {ok, #state{ + url = ?TELEMETRY_URL, + report_interval = timer:seconds(?REPORT_INTERVAL), + uuid = UUID1 + }}. handle_call(enable, _From, State) -> case ?MODULE:official_version(emqx_app:get_release()) of @@ -148,7 +162,6 @@ handle_call(enable, _From, State) -> false -> {reply, {error, not_official_version}, State} end; - handle_call(disable, _From, State = #state{timer = Timer}) -> case ?MODULE:official_version(emqx_app:get_release()) of true -> @@ -157,13 +170,10 @@ handle_call(disable, _From, State = #state{timer = Timer}) -> false -> {reply, {error, not_official_version}, State} end; - handle_call(get_uuid, _From, State = #state{uuid = UUID}) -> {reply, {ok, UUID}, State}; - handle_call(get_telemetry, _From, State) -> {reply, {ok, get_telemetry(State)}, State}; - handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. @@ -182,7 +192,6 @@ handle_info({timeout, TRef, time_to_report_telemetry_data}, State = #state{timer false -> ok end, {noreply, ensure_report_timer(State)}; - handle_info(Info, State) -> ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. @@ -206,22 +215,35 @@ ensure_report_timer(State = #state{report_interval = ReportInterval}) -> os_info() -> case erlang:system_info(os_type) of - {unix,darwin} -> + {unix, darwin} -> [Name | _] = string:tokens(os:cmd("sw_vers -productName"), "\n"), [Version | _] = string:tokens(os:cmd("sw_vers -productVersion"), "\n"), - [{os_name, Name}, - {os_version, Version}]; + [ + {os_name, Name}, + {os_version, Version} + ]; {unix, _} -> case file:read_file("/etc/os-release") of {error, _} -> - [{os_name, "Unknown"}, - {os_version, "Unknown"}]; + [ + {os_name, "Unknown"}, + {os_version, "Unknown"} + ]; {ok, FileContent} -> OSInfo = parse_os_release(FileContent), - [{os_name, get_value("NAME", OSInfo)}, - {os_version, get_value("VERSION", OSInfo, - get_value("VERSION_ID", OSInfo, - get_value("PRETTY_NAME", OSInfo)))}] + [ + {os_name, get_value("NAME", OSInfo)}, + {os_version, + get_value( + "VERSION", + OSInfo, + get_value( + "VERSION_ID", + OSInfo, + get_value("PRETTY_NAME", OSInfo) + ) + )} + ] end; {win32, nt} -> Ver = os:cmd("ver"), @@ -231,11 +253,15 @@ os_info() -> {match, [Version]} = re:run(NVer, "([0-9]+[\.])+[0-9]+", [{capture, first, list}]), [Name | _] = string:split(NVer, " [Version "), - [{os_name, Name}, - {os_version, Version}]; + [ + {os_name, Name}, + {os_version, Version} + ]; nomatch -> - [{os_name, "Unknown"}, - {os_version, "Unknown"}] + [ + {os_name, "Unknown"}, + {os_version, "Unknown"} + ] end end. @@ -247,22 +273,30 @@ uptime() -> nodes_uuid() -> Nodes = lists:delete(node(), mria_mnesia:running_nodes()), - lists:foldl(fun(Node, Acc) -> - case emqx_telemetry_proto_v1:get_uuid(Node) of - {badrpc, _Reason} -> - Acc; - {ok, UUID} -> - [UUID | Acc] - end - end, [], Nodes). + lists:foldl( + fun(Node, Acc) -> + case emqx_telemetry_proto_v1:get_uuid(Node) of + {badrpc, _Reason} -> + Acc; + {ok, UUID} -> + [UUID | Acc] + end + end, + [], + Nodes + ). active_plugins() -> - lists:foldl(fun(#plugin{name = Name, active = Active}, Acc) -> - case Active of - true -> [Name | Acc]; - false -> Acc - end - end, [], emqx_plugins:list()). + lists:foldl( + fun(#plugin{name = Name, active = Active}, Acc) -> + case Active of + true -> [Name | Acc]; + false -> Acc + end + end, + [], + emqx_plugins:list() + ). num_clients() -> emqx_stats:getstat('connections.max'). @@ -282,25 +316,31 @@ generate_uuid() -> <> = <<16#01:4, TimeHigh:12>>, <> = <<1:1, 0:1, ClockSeq:14>>, <> = <>, - list_to_binary(io_lib:format( "~.16B-~.16B-~.16B-~.16B-~.16B" - , [TimeLow, TimeMid, NTimeHigh, NClockSeq, Node])). + list_to_binary( + io_lib:format( + "~.16B-~.16B-~.16B-~.16B-~.16B", + [TimeLow, TimeMid, NTimeHigh, NClockSeq, Node] + ) + ). get_telemetry(#state{uuid = UUID}) -> OSInfo = os_info(), - [{emqx_version, bin(emqx_app:get_release())}, - {license, [{edition, <<"community">>}]}, - {os_name, bin(get_value(os_name, OSInfo))}, - {os_version, bin(get_value(os_version, OSInfo))}, - {otp_version, bin(otp_version())}, - {up_time, uptime()}, - {uuid, UUID}, - {nodes_uuid, nodes_uuid()}, - {active_plugins, active_plugins()}, - {num_clients, num_clients()}, - {messages_received, messages_received()}, - {messages_sent, messages_sent()}, - {build_info, build_info()}, - {vm_specs, vm_specs()}]. + [ + {emqx_version, bin(emqx_app:get_release())}, + {license, [{edition, <<"community">>}]}, + {os_name, bin(get_value(os_name, OSInfo))}, + {os_version, bin(get_value(os_version, OSInfo))}, + {otp_version, bin(otp_version())}, + {up_time, uptime()}, + {uuid, UUID}, + {nodes_uuid, nodes_uuid()}, + {active_plugins, active_plugins()}, + {num_clients, num_clients()}, + {messages_received, messages_received()}, + {messages_sent, messages_sent()}, + {build_info, build_info()}, + {vm_specs, vm_specs()} + ]. report_telemetry(State = #state{url = URL}) -> Data = get_telemetry(State), @@ -319,17 +359,21 @@ httpc_request(Method, URL, Headers, Body) -> httpc:request(Method, {URL, Headers, "application/json", Body}, HTTPOptions, Options). parse_os_release(FileContent) -> - lists:foldl(fun(Line, Acc) -> - [Var, Value] = string:tokens(Line, "="), - NValue = case Value of - _ when is_list(Value) -> - lists:nth(1, string:tokens(Value, "\"")); - _ -> - Value - end, - [{Var, NValue} | Acc] + lists:foldl( + fun(Line, Acc) -> + [Var, Value] = string:tokens(Line, "="), + NValue = + case Value of + _ when is_list(Value) -> + lists:nth(1, string:tokens(Value, "\"")); + _ -> + Value end, - [], string:tokens(binary:bin_to_list(FileContent), "\n")). + [{Var, NValue} | Acc] + end, + [], + string:tokens(binary:bin_to_list(FileContent), "\n") + ). build_info() -> case ?MODULE:read_raw_build_info() of @@ -342,14 +386,19 @@ build_info() -> end. read_raw_build_info() -> - Filename = filename:join([code:root_dir(), "releases", - emqx_app:get_release(), "BUILD_INFO"]), + Filename = filename:join([ + code:root_dir(), + "releases", + emqx_app:get_release(), + "BUILD_INFO" + ]), file:read_file(Filename). vm_specs() -> SysMemData = memsup:get_system_memory_data(), - [ {num_cpus, erlang:system_info(logical_processors)} - , {total_memory, proplists:get_value(available_memory, SysMemData)} + [ + {num_cpus, erlang:system_info(logical_processors)}, + {total_memory, proplists:get_value(available_memory, SysMemData)} ]. bin(L) when is_list(L) -> diff --git a/apps/emqx_modules/src/emqx_telemetry_api.erl b/apps/emqx_modules/src/emqx_telemetry_api.erl index 0cbffadea..85b25c9a9 100644 --- a/apps/emqx_modules/src/emqx_telemetry_api.erl +++ b/apps/emqx_modules/src/emqx_telemetry_api.erl @@ -24,17 +24,19 @@ % -export([cli/1]). --export([ status/2 - , data/2 - ]). +-export([ + status/2, + data/2 +]). -export([enable_telemetry/2]). --export([ api_spec/0 - , paths/0 - , schema/1 - , fields/1 - ]). +-export([ + api_spec/0, + paths/0, + schema/1, + fields/1 +]). -define(BAD_REQUEST, 'BAD_REQUEST'). @@ -42,126 +44,166 @@ api_spec() -> emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). paths() -> - [ "/telemetry/status" - , "/telemetry/data" + [ + "/telemetry/status", + "/telemetry/data" ]. schema("/telemetry/status") -> - #{ 'operationId' => status, - get => - #{ description => <<"Get telemetry status">> - , responses => - #{ 200 => status_schema(<<"Get telemetry status">>)} + #{ + 'operationId' => status, + get => + #{ + description => <<"Get telemetry status">>, + responses => + #{200 => status_schema(<<"Get telemetry status">>)} }, - put => - #{ description => <<"Enable or disable telemetry">> - , 'requestBody' => status_schema(<<"Enable or disable telemetry">>) - , responses => - #{ 200 => status_schema(<<"Enable or disable telemetry successfully">>) - , 400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST], <<"Bad Request">>) - } + put => + #{ + description => <<"Enable or disable telemetry">>, + 'requestBody' => status_schema(<<"Enable or disable telemetry">>), + responses => + #{ + 200 => status_schema(<<"Enable or disable telemetry successfully">>), + 400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST], <<"Bad Request">>) + } } - }; + }; schema("/telemetry/data") -> - #{ 'operationId' => data, - get => - #{ description => <<"Get telemetry data">> - , responses => - #{ 200 => mk(ref(?MODULE, telemetry), #{ desc => <<"Get telemetry data">>})}} - }. + #{ + 'operationId' => data, + get => + #{ + description => <<"Get telemetry data">>, + responses => + #{200 => mk(ref(?MODULE, telemetry), #{desc => <<"Get telemetry data">>})} + } + }. status_schema(Desc) -> mk(ref(?MODULE, status), #{in => body, desc => Desc}). fields(status) -> - [ { enable - , mk( boolean() - , #{ desc => <<"Telemetry status">> - , default => true - , example => false - }) - } + [ + {enable, + mk( + boolean(), + #{ + desc => <<"Telemetry status">>, + default => true, + example => false + } + )} ]; fields(telemetry) -> - [ { emqx_version - , mk( string() - , #{ desc => <<"EMQX Version">> - , example => <<"5.0.0-beta.3-32d1547c">> - }) - } - , { license - , mk( map() - , #{ desc => <<"EMQX License">> - , example => #{edition => <<"community">>} - }) - } - , { os_name - , mk( string() - , #{ desc => <<"OS Name">> - , example => <<"Linux">> - }) - } - , { os_version - , mk( string() - , #{ desc => <<"OS Version">> - , example => <<"20.04">> - }) - } - , { otp_version - , mk( string() - , #{ desc => <<"Erlang/OTP Version">> - , example => <<"24">> - }) - } - , { up_time - , mk( integer() - , #{ desc => <<"EMQX Runtime">> - , example => 20220113 - }) - } - , { uuid - , mk( string() - , #{ desc => <<"EMQX UUID">> - , example => <<"AAAAAAAA-BBBB-CCCC-2022-DDDDEEEEFFF">> - }) - } - , { nodes_uuid - , mk( array(binary()) - , #{ desc => <<"EMQX Cluster Nodes UUID">> - , example => [ <<"AAAAAAAA-BBBB-CCCC-2022-DDDDEEEEFFF">> - , <<"ZZZZZZZZ-CCCC-BBBB-2022-DDDDEEEEFFF">>] - }) - } - , { active_plugins - , mk( array(binary()) - , #{ desc => <<"EMQX Active Plugins">> - , example => [<<"Plugin A">>, <<"Plugin B">>] - }) - } - , { active_modules - , mk( array(binary()) - , #{ desc => <<"EMQX Active Modules">> - , example => [<<"Module A">>, <<"Module B">>] - }) - } - , { num_clients - , mk( integer() - , #{ desc => <<"EMQX Current Connections">> - , example => 20220113 - }) - } - , { messages_received - , mk( integer() - , #{ desc => <<"EMQX Current Received Message">> - , example => 2022 - }) - } - , { messages_sent - , mk( integer() - , #{ desc => <<"EMQX Current Sent Message">> - , example => 2022 - }) - } + [ + {emqx_version, + mk( + string(), + #{ + desc => <<"EMQX Version">>, + example => <<"5.0.0-beta.3-32d1547c">> + } + )}, + {license, + mk( + map(), + #{ + desc => <<"EMQX License">>, + example => #{edition => <<"community">>} + } + )}, + {os_name, + mk( + string(), + #{ + desc => <<"OS Name">>, + example => <<"Linux">> + } + )}, + {os_version, + mk( + string(), + #{ + desc => <<"OS Version">>, + example => <<"20.04">> + } + )}, + {otp_version, + mk( + string(), + #{ + desc => <<"Erlang/OTP Version">>, + example => <<"24">> + } + )}, + {up_time, + mk( + integer(), + #{ + desc => <<"EMQX Runtime">>, + example => 20220113 + } + )}, + {uuid, + mk( + string(), + #{ + desc => <<"EMQX UUID">>, + example => <<"AAAAAAAA-BBBB-CCCC-2022-DDDDEEEEFFF">> + } + )}, + {nodes_uuid, + mk( + array(binary()), + #{ + desc => <<"EMQX Cluster Nodes UUID">>, + example => [ + <<"AAAAAAAA-BBBB-CCCC-2022-DDDDEEEEFFF">>, + <<"ZZZZZZZZ-CCCC-BBBB-2022-DDDDEEEEFFF">> + ] + } + )}, + {active_plugins, + mk( + array(binary()), + #{ + desc => <<"EMQX Active Plugins">>, + example => [<<"Plugin A">>, <<"Plugin B">>] + } + )}, + {active_modules, + mk( + array(binary()), + #{ + desc => <<"EMQX Active Modules">>, + example => [<<"Module A">>, <<"Module B">>] + } + )}, + {num_clients, + mk( + integer(), + #{ + desc => <<"EMQX Current Connections">>, + example => 20220113 + } + )}, + {messages_received, + mk( + integer(), + #{ + desc => <<"EMQX Current Received Message">>, + example => 2022 + } + )}, + {messages_sent, + mk( + integer(), + #{ + desc => <<"EMQX Current Sent Message">>, + example => 2022 + } + )} ]. %%-------------------------------------------------------------------- @@ -169,15 +211,15 @@ fields(telemetry) -> %%-------------------------------------------------------------------- status(get, _Params) -> {200, get_telemetry_status()}; - status(put, #{body := Body}) -> Enable = maps:get(<<"enable">>, Body), case Enable =:= emqx_telemetry:get_status() of true -> - Reason = case Enable of - true -> <<"Telemetry status is already enabled">>; - false -> <<"Telemetry status is already disable">> - end, + Reason = + case Enable of + true -> <<"Telemetry status is already enabled">>; + false -> <<"Telemetry status is already disable">> + end, {400, #{code => 'BAD_REQUEST', message => Reason}}; false -> enable_telemetry(Enable), @@ -231,9 +273,12 @@ data(get, _Request) -> %% internal function %%-------------------------------------------------------------------- enable_telemetry(Enable) -> - lists:foreach(fun(Node) -> - enable_telemetry(Node, Enable) - end, mria_mnesia:running_nodes()). + lists:foreach( + fun(Node) -> + enable_telemetry(Node, Enable) + end, + mria_mnesia:running_nodes() + ). enable_telemetry(Node, true) -> is_ok(emqx_telemetry_proto_v1:enable_telemetry(Node)); diff --git a/apps/emqx_modules/src/emqx_topic_metrics.erl b/apps/emqx_modules/src/emqx_topic_metrics.erl index d6236eb68..5aa04db07 100644 --- a/apps/emqx_modules/src/emqx_topic_metrics.erl +++ b/apps/emqx_modules/src/emqx_topic_metrics.erl @@ -22,41 +22,45 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). - --export([ on_message_publish/1 - , on_message_delivered/2 - , on_message_dropped/3 - ]). +-export([ + on_message_publish/1, + on_message_delivered/2, + on_message_dropped/3 +]). %% API functions --export([ start_link/0 - , stop/0 - ]). +-export([ + start_link/0, + stop/0 +]). --export([ enable/0 - , disable/0 - ]). +-export([ + enable/0, + disable/0 +]). --export([ max_limit/0]). +-export([max_limit/0]). --export([ metrics/0 - , metrics/1 - , register/1 - , deregister/1 - , deregister_all/0 - , is_registered/1 - , all_registered_topics/0 - , reset/0 - , reset/1 - ]). +-export([ + metrics/0, + metrics/1, + register/1, + deregister/1, + deregister_all/0, + is_registered/1, + all_registered_topics/0, + reset/0, + reset/1 +]). %% gen_server callbacks --export([ init/1 - , handle_call/3 - , handle_info/2 - , handle_cast/2 - , terminate/2 - ]). +-export([ + init/1, + handle_call/3, + handle_info/2, + handle_cast/2, + terminate/2 +]). -ifdef(TEST). -compile(export_all). @@ -66,17 +70,17 @@ -define(MAX_TOPICS, 512). -define(TAB, ?MODULE). --define(TOPIC_METRICS, - ['messages.in', - 'messages.out', - 'messages.qos0.in', - 'messages.qos0.out', - 'messages.qos1.in', - 'messages.qos1.out', - 'messages.qos2.in', - 'messages.qos2.out', - 'messages.dropped' - ]). +-define(TOPIC_METRICS, [ + 'messages.in', + 'messages.out', + 'messages.qos0.in', + 'messages.qos0.out', + 'messages.qos1.in', + 'messages.qos1.out', + 'messages.qos2.in', + 'messages.qos2.out', + 'messages.dropped' +]). -define(TICKING_INTERVAL, 1). -define(SPEED_AVERAGE_WINDOW_SIZE, 5). @@ -84,15 +88,15 @@ -define(SPEED_LONG_WINDOW_SIZE, 300). -record(speed, { - last = 0 :: number(), - last_v = 0 :: number(), - last_medium = 0 :: number(), - last_long = 0 :: number() - }). + last = 0 :: number(), + last_v = 0 :: number(), + last_medium = 0 :: number(), + last_long = 0 :: number() +}). -record(state, { - speeds :: #{{binary(), atom()} => #speed{}} - }). + speeds :: #{{binary(), atom()} => #speed{}} +}). %%------------------------------------------------------------------------------ %% APIs @@ -217,34 +221,33 @@ handle_call({register, Topic}, _From, State = #state{speeds = Speeds}) -> Error -> {reply, Error, State} end; - handle_call({deregister, all}, _From, State) -> true = ets:delete_all_objects(?TAB), {reply, ok, State#state{speeds = #{}}}; - handle_call({deregister, Topic}, _From, State = #state{speeds = Speeds}) -> case is_registered(Topic) of false -> {reply, {error, topic_not_found}, State}; true -> true = ets:delete(?TAB, Topic), - NSpeeds = lists:foldl(fun(Metric, Acc) -> - maps:remove({Topic, Metric}, Acc) - end, Speeds, ?TOPIC_METRICS), + NSpeeds = lists:foldl( + fun(Metric, Acc) -> + maps:remove({Topic, Metric}, Acc) + end, + Speeds, + ?TOPIC_METRICS + ), {reply, ok, State#state{speeds = NSpeeds}} end; - handle_call({reset, all}, _From, State = #state{speeds = Speeds}) -> Fun = fun(T, NSpeeds) -> reset_topic(T, NSpeeds) end, {reply, ok, State#state{speeds = lists:foldl(Fun, Speeds, ets:tab2list(?TAB))}}; - handle_call({reset, Topic}, _From, State = #state{speeds = Speeds}) -> NSpeeds = reset_topic(Topic, Speeds), {reply, ok, State#state{speeds = NSpeeds}}; - handle_call({get_rates, Topic, Metric}, _From, State = #state{speeds = Speeds}) -> case is_registered(Topic) of false -> @@ -253,8 +256,8 @@ handle_call({get_rates, Topic, Metric}, _From, State = #state{speeds = Speeds}) case maps:get({Topic, Metric}, Speeds, undefined) of undefined -> {reply, {error, invalid_metric}, State}; - #speed{last = Short, last_medium = Medium, last_long = Long} -> - {reply, #{short => Short, medium => Medium, long => Long }, State} + #speed{last = Short, last_medium = Medium, last_long = Long} -> + {reply, #{short => Short, medium => Medium, long => Long}, State} end end. @@ -264,15 +267,16 @@ handle_cast(Msg, State) -> handle_info(ticking, State = #state{speeds = Speeds}) -> NSpeeds = maps:map( - fun({Topic, Metric}, Speed) -> - case val(Topic, Metric) of - {error, topic_not_found} -> maps:remove({Topic, Metric}, Speeds); - Val -> calculate_speed(Val, Speed) - end - end, Speeds), + fun({Topic, Metric}, Speed) -> + case val(Topic, Metric) of + {error, topic_not_found} -> maps:remove({Topic, Metric}, Speeds); + Val -> calculate_speed(Val, Speed) + end + end, + Speeds + ), erlang:send_after(timer:seconds(?TICKING_INTERVAL), self(), ticking), {noreply, State#state{speeds = NSpeeds}}; - handle_info(Info, State) -> ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. @@ -309,11 +313,16 @@ do_register(Topic, Speeds) -> ok = reset_counter(CRef), Data = #{ counter_ref => CRef, - create_time => CreateTime}, + create_time => CreateTime + }, true = ets:insert(?TAB, {Topic, Data}), - NSpeeds = lists:foldl(fun(Metric, Acc) -> - maps:put({Topic, Metric}, #speed{}, Acc) - end, Speeds, ?TOPIC_METRICS), + NSpeeds = lists:foldl( + fun(Metric, Acc) -> + maps:put({Topic, Metric}, #speed{}, Acc) + end, + Speeds, + ?TOPIC_METRICS + ), {ok, NSpeeds}; {true, true} -> {error, bad_topic}; @@ -329,9 +338,9 @@ format({Topic, Data}) -> Fun = fun(Key, Metrics) -> CounterKey = to_count(Key), - Counter = counters:get(CRef, metric_idx(Key)), - RateKey = to_rate(Key), - Rate = emqx_rule_funcs:float(rate(Topic, Key), 4), + Counter = counters:get(CRef, metric_idx(Key)), + RateKey = to_rate(Key), + Rate = emqx_rule_funcs:float(rate(Topic, Key), 4), maps:put(RateKey, Rate, maps:put(CounterKey, Counter, Metrics)) end, Metrics = lists:foldl(Fun, #{}, ?TOPIC_METRICS), @@ -390,17 +399,16 @@ rate(Topic, Metric) -> {error, Reason} end. -metric_idx('messages.in') -> 01; -metric_idx('messages.out') -> 02; -metric_idx('messages.qos0.in') -> 03; +metric_idx('messages.in') -> 01; +metric_idx('messages.out') -> 02; +metric_idx('messages.qos0.in') -> 03; metric_idx('messages.qos0.out') -> 04; -metric_idx('messages.qos1.in') -> 05; +metric_idx('messages.qos1.in') -> 05; metric_idx('messages.qos1.out') -> 06; -metric_idx('messages.qos2.in') -> 07; +metric_idx('messages.qos2.in') -> 07; metric_idx('messages.qos2.out') -> 08; -metric_idx('messages.dropped') -> 09; -metric_idx(_) -> - {error, invalid_metric}. +metric_idx('messages.dropped') -> 09; +metric_idx(_) -> {error, invalid_metric}. to_count('messages.in') -> 'messages.in.count'; @@ -456,7 +464,8 @@ counters_size() -> number_of_registered_topics() -> proplists:get_value(size, ets:info(?TAB)). -calculate_speed(CurVal, #speed{last = Last, +calculate_speed(CurVal, #speed{ + last = Last, last_v = LastVal, last_medium = LastMedium, last_long = LastLong diff --git a/apps/emqx_modules/src/emqx_topic_metrics_api.erl b/apps/emqx_modules/src/emqx_topic_metrics_api.erl index 9e8223d04..e98be4b42 100644 --- a/apps/emqx_modules/src/emqx_topic_metrics_api.erl +++ b/apps/emqx_modules/src/emqx_topic_metrics_api.erl @@ -21,25 +21,33 @@ -include_lib("typerefl/include/types.hrl"). -include("emqx_modules.hrl"). --import( hoconsc - , [ mk/2 - , ref/1 - , ref/2 - , array/1 - , map/2]). +-import( + hoconsc, + [ + mk/2, + ref/1, + ref/2, + array/1, + map/2 + ] +). --export([ topic_metrics/2 - , operate_topic_metrics/2 - ]). +-export([ + topic_metrics/2, + operate_topic_metrics/2 +]). --export([ cluster_accumulation_metrics/0 - , cluster_accumulation_metrics/1]). +-export([ + cluster_accumulation_metrics/0, + cluster_accumulation_metrics/1 +]). --export([ api_spec/0 - , paths/0 - , schema/1 - , fields/1 - ]). +-export([ + api_spec/0, + paths/0, + schema/1, + fields/1 +]). -define(EXCEED_LIMIT, 'EXCEED_LIMIT'). -define(BAD_TOPIC, 'BAD_TOPIC'). @@ -50,182 +58,306 @@ api_spec() -> emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). paths() -> - [ "/mqtt/topic_metrics" - , "/mqtt/topic_metrics/:topic" + [ + "/mqtt/topic_metrics", + "/mqtt/topic_metrics/:topic" ]. - schema("/mqtt/topic_metrics") -> - #{ 'operationId' => topic_metrics - , get => - #{ description => <<"List topic metrics">> - , tags => ?API_TAG_MQTT - , responses => - #{200 => - mk(array(hoconsc:ref(topic_metrics)), #{ desc => <<"List all topic metrics">>})} + #{ + 'operationId' => topic_metrics, + get => + #{ + description => <<"List topic metrics">>, + tags => ?API_TAG_MQTT, + responses => + #{ + 200 => + mk(array(hoconsc:ref(topic_metrics)), #{ + desc => <<"List all topic metrics">> + }) + } + }, + put => + #{ + description => <<"Reset topic metrics by topic name. Or reset all Topic Metrics">>, + tags => ?API_TAG_MQTT, + 'requestBody' => emqx_dashboard_swagger:schema_with_examples( + ref(reset), + reset_examples() + ), + responses => + #{ + 204 => <<"Reset topic metrics successfully">>, + 404 => + emqx_dashboard_swagger:error_codes( + [?TOPIC_NOT_FOUND], <<"Topic not found">> + ) + } + }, + post => + #{ + description => <<"Create topic metrics">>, + tags => ?API_TAG_MQTT, + 'requestBody' => [topic(body)], + responses => + #{ + 204 => <<"Create topic metrics success">>, + 409 => emqx_dashboard_swagger:error_codes( + [?EXCEED_LIMIT], + <<"Topic metrics exceeded max limit 512">> + ), + 400 => emqx_dashboard_swagger:error_codes( + [?BAD_REQUEST, ?BAD_TOPIC], + <<"Topic metrics already existed or bad topic">> + ) + } } - , put => - #{ description => <<"Reset topic metrics by topic name. Or reset all Topic Metrics">> - , tags => ?API_TAG_MQTT - , 'requestBody' => emqx_dashboard_swagger:schema_with_examples( - ref(reset), - reset_examples()) - , responses => - #{ 204 => <<"Reset topic metrics successfully">> - , 404 => - emqx_dashboard_swagger:error_codes( - [?TOPIC_NOT_FOUND], <<"Topic not found">>) - } - } - , post => - #{ description => <<"Create topic metrics">> - , tags => ?API_TAG_MQTT - , 'requestBody' => [topic(body)] - , responses => - #{ 204 => <<"Create topic metrics success">> - , 409 => emqx_dashboard_swagger:error_codes([?EXCEED_LIMIT], - <<"Topic metrics exceeded max limit 512">>) - , 400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST, ?BAD_TOPIC], - <<"Topic metrics already existed or bad topic">>) - } - } - }; + }; schema("/mqtt/topic_metrics/:topic") -> - #{ 'operationId' => operate_topic_metrics - , get => - #{ description => <<"Get topic metrics">> - , tags => ?API_TAG_MQTT - , parameters => [topic(path)] - , responses => - #{ 200 => mk(ref(topic_metrics), #{ desc => <<"Topic metrics">> }) - , 404 => emqx_dashboard_swagger:error_codes([?TOPIC_NOT_FOUND], - <<"Topic not found">>) - } + #{ + 'operationId' => operate_topic_metrics, + get => + #{ + description => <<"Get topic metrics">>, + tags => ?API_TAG_MQTT, + parameters => [topic(path)], + responses => + #{ + 200 => mk(ref(topic_metrics), #{desc => <<"Topic metrics">>}), + 404 => emqx_dashboard_swagger:error_codes( + [?TOPIC_NOT_FOUND], + <<"Topic not found">> + ) + } + }, + delete => + #{ + description => <<"Remove the topic metrics">>, + tags => ?API_TAG_MQTT, + parameters => [topic(path)], + responses => + #{ + 204 => <<"Removed topic metrics successfully">>, + 404 => emqx_dashboard_swagger:error_codes( + [?TOPIC_NOT_FOUND], + <<"Topic not found">> + ) + } } - , delete => - #{ description => <<"Remove the topic metrics">> - , tags => ?API_TAG_MQTT - , parameters => [topic(path)] - , responses => - #{ 204 => <<"Removed topic metrics successfully">>, - 404 => emqx_dashboard_swagger:error_codes([?TOPIC_NOT_FOUND], - <<"Topic not found">>) - } - } - }. + }. fields(reset) -> - [ {topic - , mk( binary() - , #{ desc => - <<"Topic Name. If this parameter is not present," - " all created topic metrics will be reset">> - , example => <<"testtopic/1">> - , required => false})} - , {action - , mk( string() - , #{ desc => <<"Action Name. Only as a \"reset\"">> - , enum => [reset] - , required => true - , example => <<"reset">>})} + [ + {topic, + mk( + binary(), + #{ + desc => + << + "Topic Name. If this parameter is not present," + " all created topic metrics will be reset" + >>, + example => <<"testtopic/1">>, + required => false + } + )}, + {action, + mk( + string(), + #{ + desc => <<"Action Name. Only as a \"reset\"">>, + enum => [reset], + required => true, + example => <<"reset">> + } + )} ]; - fields(topic_metrics) -> - [ { topic - , mk( binary() - , #{ desc => <<"Topic Name">> - , example => <<"testtopic/1">> - , required => true})}, - { create_time - , mk( emqx_datetime:epoch_second() - , #{ desc => <<"Topic Metrics created date time, in rfc3339">> - , required => true - , example => <<"2022-01-14T21:48:47+08:00">>})}, - { reset_time - , mk( emqx_datetime:epoch_second() - , #{ desc => <<"Topic Metrics reset date time, in rfc3339. Nullable if never reset">> - , required => false - , example => <<"2022-01-14T21:48:47+08:00">>})}, - { metrics - , mk( ref(metrics) - , #{ desc => <<"Topic Metrics fields">> - , required => true}) - } + [ + {topic, + mk( + binary(), + #{ + desc => <<"Topic Name">>, + example => <<"testtopic/1">>, + required => true + } + )}, + {create_time, + mk( + emqx_datetime:epoch_second(), + #{ + desc => <<"Topic Metrics created date time, in rfc3339">>, + required => true, + example => <<"2022-01-14T21:48:47+08:00">> + } + )}, + {reset_time, + mk( + emqx_datetime:epoch_second(), + #{ + desc => + <<"Topic Metrics reset date time, in rfc3339. Nullable if never reset">>, + required => false, + example => <<"2022-01-14T21:48:47+08:00">> + } + )}, + {metrics, + mk( + ref(metrics), + #{ + desc => <<"Topic Metrics fields">>, + required => true + } + )} ]; - fields(metrics) -> - [ {'messages.dropped.count', mk(integer(), - #{ desc => <<"Message dropped count">> - , example => 0 - })} - , {'messages.dropped.rate', mk(number(), - #{ desc => <<"Message dropped rate in 5s">> - , example => 0 - })} - , {'messages.in.count', mk(integer(), - #{ desc => <<"Message received count">> - , example => 0 - })} - , {'messages.in.rate', mk(number(), - #{ desc => <<"Message received rate in 5s">> - , example => 0 - })} - , {'messages.out.count', mk(integer(), - #{ desc => <<"Message sent count">> - , example => 0 - })} - , {'messages.out.rate', mk(number(), - #{ desc => <<"Message sent rate in 5s">> - , example => 0 - })} - , {'messages.qos0.in.count', mk(integer(), - #{ desc => <<"Message with QoS 0 received count">> - , example => 0 - })} - , {'messages.qos0.in.rate', mk(number(), - #{ desc => <<"Message with QoS 0 received rate in 5s">> - , example => 0 - })} - , {'messages.qos0.out.count', mk(integer(), - #{ desc => <<"Message with QoS 0 sent count">> - , example => 0 - })} - , {'messages.qos0.out.rate', mk(number(), - #{ desc => <<"Message with QoS 0 sent rate in 5s">> - , example => 0 - })} - , {'messages.qos1.in.count', mk(integer(), - #{ desc => <<"Message with QoS 1 received count">> - , example => 0 - })} - , {'messages.qos1.in.rate', mk(number(), - #{ desc => <<"Message with QoS 1 received rate in 5s">> - , example => 0 - })} - , {'messages.qos1.out.count', mk(integer(), - #{ desc => <<"Message with QoS 1 sent count">> - , example => 0 - })} - , {'messages.qos1.out.rate', mk(number(), - #{ desc => <<"Message with QoS 1 sent rate in 5s">> - , example => 0 - })} - , {'messages.qos2.in.count', mk(integer(), - #{ desc => <<"Message with QoS 2 sent count">> - , example => 0 - })} - , {'messages.qos2.in.rate', mk(number(), - #{ desc => <<"Message with QoS 2 received rate in 5s">> - , example => 0 - })} - , {'messages.qos2.out.count', mk(integer(), - #{ desc => <<"Message with QoS 2 sent count">> - , example => 0 - })} - , {'messages.qos2.out.rate', mk(number(), - #{ desc => <<"Message with QoS 2 sent rate in 5s">> - , example => 0 - })} + [ + {'messages.dropped.count', + mk( + integer(), + #{ + desc => <<"Message dropped count">>, + example => 0 + } + )}, + {'messages.dropped.rate', + mk( + number(), + #{ + desc => <<"Message dropped rate in 5s">>, + example => 0 + } + )}, + {'messages.in.count', + mk( + integer(), + #{ + desc => <<"Message received count">>, + example => 0 + } + )}, + {'messages.in.rate', + mk( + number(), + #{ + desc => <<"Message received rate in 5s">>, + example => 0 + } + )}, + {'messages.out.count', + mk( + integer(), + #{ + desc => <<"Message sent count">>, + example => 0 + } + )}, + {'messages.out.rate', + mk( + number(), + #{ + desc => <<"Message sent rate in 5s">>, + example => 0 + } + )}, + {'messages.qos0.in.count', + mk( + integer(), + #{ + desc => <<"Message with QoS 0 received count">>, + example => 0 + } + )}, + {'messages.qos0.in.rate', + mk( + number(), + #{ + desc => <<"Message with QoS 0 received rate in 5s">>, + example => 0 + } + )}, + {'messages.qos0.out.count', + mk( + integer(), + #{ + desc => <<"Message with QoS 0 sent count">>, + example => 0 + } + )}, + {'messages.qos0.out.rate', + mk( + number(), + #{ + desc => <<"Message with QoS 0 sent rate in 5s">>, + example => 0 + } + )}, + {'messages.qos1.in.count', + mk( + integer(), + #{ + desc => <<"Message with QoS 1 received count">>, + example => 0 + } + )}, + {'messages.qos1.in.rate', + mk( + number(), + #{ + desc => <<"Message with QoS 1 received rate in 5s">>, + example => 0 + } + )}, + {'messages.qos1.out.count', + mk( + integer(), + #{ + desc => <<"Message with QoS 1 sent count">>, + example => 0 + } + )}, + {'messages.qos1.out.rate', + mk( + number(), + #{ + desc => <<"Message with QoS 1 sent rate in 5s">>, + example => 0 + } + )}, + {'messages.qos2.in.count', + mk( + integer(), + #{ + desc => <<"Message with QoS 2 sent count">>, + example => 0 + } + )}, + {'messages.qos2.in.rate', + mk( + number(), + #{ + desc => <<"Message with QoS 2 received rate in 5s">>, + example => 0 + } + )}, + {'messages.qos2.out.count', + mk( + integer(), + #{ + desc => <<"Message with QoS 2 sent count">>, + example => 0 + } + )}, + {'messages.qos2.out.rate', + mk( + number(), + #{ + desc => <<"Message with QoS 2 sent rate in 5s">>, + example => 0 + } + )} ]. topic(In) -> @@ -235,30 +367,35 @@ topic(In) -> path -> Desc = <<"Notice: Topic string in url path must be encoded">> end, - { topic - , mk( binary(), - #{ desc => Desc - , required => true - , in => In - , example => <<"testtopic/1">> - }) - }. + {topic, + mk( + binary(), + #{ + desc => Desc, + required => true, + in => In, + example => <<"testtopic/1">> + } + )}. reset_examples() -> - #{ reset_specific_one_topic_metrics => - #{ summary => <<"reset_specific_one_topic_metrics">> - , value => - #{ topic => "testtopic/1" - , action => "reset" - } + #{ + reset_specific_one_topic_metrics => + #{ + summary => <<"reset_specific_one_topic_metrics">>, + value => + #{ + topic => "testtopic/1", + action => "reset" + } + }, + reset_all_topic_metrics => + #{ + summary => <<"reset_all_topic_metrics">>, + value => + #{action => "reset"} } - , reset_all_topic_metrics => - #{ summary => <<"reset_all_topic_metrics">> - , value => - #{ action => "reset" - } - } - }. + }. %%-------------------------------------------------------------------- %% HTTP Callbacks @@ -266,7 +403,6 @@ reset_examples() -> topic_metrics(get, _) -> get_cluster_response([]); - topic_metrics(put, #{body := #{<<"topic">> := Topic, <<"action">> := <<"reset">>}}) -> case reset(Topic) of ok -> @@ -277,7 +413,6 @@ topic_metrics(put, #{body := #{<<"topic">> := Topic, <<"action">> := <<"reset">> topic_metrics(put, #{body := #{<<"action">> := <<"reset">>}}) -> reset(), get_cluster_response([]); - topic_metrics(post, #{body := #{<<"topic">> := <<>>}}) -> {400, 'BAD_REQUEST', <<"Topic can not be empty">>}; topic_metrics(post, #{body := #{<<"topic">> := Topic}}) -> @@ -290,7 +425,6 @@ topic_metrics(post, #{body := #{<<"topic">> := Topic}}) -> operate_topic_metrics(get, #{bindings := #{topic := Topic}}) -> get_cluster_response([Topic]); - operate_topic_metrics(delete, #{bindings := #{topic := Topic}}) -> case emqx_modules_conf:remove_topic_metrics(Topic) of ok -> {204}; @@ -314,12 +448,19 @@ cluster_accumulation_metrics(Topic) -> Nodes = mria_mnesia:running_nodes(), case emqx_topic_metrics_proto_v1:metrics(Nodes, Topic) of {SuccResList, []} -> - case lists:filter(fun({error, _}) -> false; (_) -> true - end, SuccResList) of + case + lists:filter( + fun + ({error, _}) -> false; + (_) -> true + end, + SuccResList + ) + of [] -> {error, topic_not_found}; TopicMetrics -> - NTopicMetrics = [ [T] || T <- TopicMetrics], + NTopicMetrics = [[T] || T <- TopicMetrics], [AccMetrics] = accumulate_nodes_metrics(NTopicMetrics), {ok, AccMetrics} end; @@ -328,42 +469,72 @@ cluster_accumulation_metrics(Topic) -> end. accumulate_nodes_metrics(NodesTopicMetrics) -> - AccMap = lists:foldl(fun(TopicMetrics, ExAcc) -> - MetricsMap = lists:foldl( - fun(#{topic := Topic, - metrics := Metrics, - create_time := CreateTime}, Acc) -> - Acc#{Topic => {Metrics, CreateTime}} - end, #{}, TopicMetrics), - accumulate_metrics(MetricsMap, ExAcc) - end, #{}, NodesTopicMetrics), - maps:fold(fun(Topic, {Metrics, CreateTime1}, Acc1) -> - [#{topic => Topic, - metrics => Metrics, - create_time => CreateTime1} | Acc1] - end, [], AccMap). + AccMap = lists:foldl( + fun(TopicMetrics, ExAcc) -> + MetricsMap = lists:foldl( + fun( + #{ + topic := Topic, + metrics := Metrics, + create_time := CreateTime + }, + Acc + ) -> + Acc#{Topic => {Metrics, CreateTime}} + end, + #{}, + TopicMetrics + ), + accumulate_metrics(MetricsMap, ExAcc) + end, + #{}, + NodesTopicMetrics + ), + maps:fold( + fun(Topic, {Metrics, CreateTime1}, Acc1) -> + [ + #{ + topic => Topic, + metrics => Metrics, + create_time => CreateTime1 + } + | Acc1 + ] + end, + [], + AccMap + ). %% @doc TopicMetricsIn :: #{<<"topic">> := {Metrics, CreateTime}} accumulate_metrics(TopicMetricsIn, TopicMetricsAcc) -> Topics = maps:keys(TopicMetricsIn), - lists:foldl(fun(Topic, Acc) -> - {Metrics, CreateTime} = maps:get(Topic, TopicMetricsIn), - NMetrics = do_accumulation_metrics( - Metrics, - maps:get(Topic, TopicMetricsAcc, undefined) - ), - maps:put(Topic, {NMetrics, CreateTime}, Acc) - end, #{}, Topics). + lists:foldl( + fun(Topic, Acc) -> + {Metrics, CreateTime} = maps:get(Topic, TopicMetricsIn), + NMetrics = do_accumulation_metrics( + Metrics, + maps:get(Topic, TopicMetricsAcc, undefined) + ), + maps:put(Topic, {NMetrics, CreateTime}, Acc) + end, + #{}, + Topics + ). %% @doc MetricsIn :: #{'messages.dropped.rate' :: integer(), ...} -do_accumulation_metrics(MetricsIn, undefined) -> MetricsIn; +do_accumulation_metrics(MetricsIn, undefined) -> + MetricsIn; do_accumulation_metrics(MetricsIn, {MetricsAcc, _}) -> Keys = maps:keys(MetricsIn), - lists:foldl(fun(Key, Acc) -> - InVal = maps:get(Key, MetricsIn), - NVal = InVal + maps:get(Key, MetricsAcc, 0), - maps:put(Key, NVal, Acc) - end, #{}, Keys). + lists:foldl( + fun(Key, Acc) -> + InVal = maps:get(Key, MetricsIn), + NVal = InVal + maps:get(Key, MetricsAcc, 0), + maps:put(Key, NVal, Acc) + end, + #{}, + Keys + ). reset() -> Nodes = mria_mnesia:running_nodes(), @@ -374,8 +545,15 @@ reset(Topic) -> Nodes = mria_mnesia:running_nodes(), case emqx_topic_metrics_proto_v1:reset(Nodes, Topic) of {SuccResList, []} -> - case lists:filter(fun({error, _}) -> true; (_) -> false - end, SuccResList) of + case + lists:filter( + fun + ({error, _}) -> true; + (_) -> false + end, + SuccResList + ) + of [{error, Reason} | _] -> {error, Reason}; [] -> @@ -388,17 +566,22 @@ reset(Topic) -> reason2httpresp(quota_exceeded) -> Msg = list_to_binary( - io_lib:format("Max topic metrics count is ~p", - [emqx_topic_metrics:max_limit()])), + io_lib:format( + "Max topic metrics count is ~p", + [emqx_topic_metrics:max_limit()] + ) + ), {409, #{code => ?EXCEED_LIMIT, message => Msg}}; reason2httpresp(bad_topic) -> Msg = <<"Bad Topic, topic cannot have wildcard">>, {400, #{code => ?BAD_TOPIC, message => Msg}}; reason2httpresp({quota_exceeded, bad_topic}) -> Msg = list_to_binary( - io_lib:format( - "Max topic metrics count is ~p, and topic cannot have wildcard", - [emqx_topic_metrics:max_limit()])), + io_lib:format( + "Max topic metrics count is ~p, and topic cannot have wildcard", + [emqx_topic_metrics:max_limit()] + ) + ), {400, #{code => ?BAD_REQUEST, message => Msg}}; reason2httpresp(already_existed) -> Msg = <<"Topic already registered">>, diff --git a/apps/emqx_modules/test/emqx_delayed_SUITE.erl b/apps/emqx_modules/test/emqx_delayed_SUITE.erl index 551363d73..5bde45c4b 100644 --- a/apps/emqx_modules/test/emqx_delayed_SUITE.erl +++ b/apps/emqx_modules/test/emqx_delayed_SUITE.erl @@ -49,7 +49,7 @@ end_per_suite(_) -> t_load_case(_) -> Hooks = emqx_hooks:lookup('message.publish'), - MFA = {emqx_delayed,on_message_publish,[]}, + MFA = {emqx_delayed, on_message_publish, []}, ?assertEqual(false, lists:keyfind(MFA, 2, Hooks)), ok = emqx_delayed:enable(), Hooks1 = emqx_hooks:lookup('message.publish'), @@ -59,7 +59,10 @@ t_load_case(_) -> t_delayed_message(_) -> ok = emqx_delayed:enable(), DelayedMsg = emqx_message:make(?MODULE, 1, <<"$delayed/1/publish">>, <<"delayed_m">>), - ?assertEqual({stop, DelayedMsg#message{topic = <<"publish">>, headers = #{allow_publish => false}}}, on_message_publish(DelayedMsg)), + ?assertEqual( + {stop, DelayedMsg#message{topic = <<"publish">>, headers = #{allow_publish => false}}}, + on_message_publish(DelayedMsg) + ), Msg = emqx_message:make(?MODULE, 1, <<"no_delayed_msg">>, <<"no_delayed">>), ?assertEqual({ok, Msg}, on_message_publish(Msg)), diff --git a/apps/emqx_modules/test/emqx_event_message_SUITE.erl b/apps/emqx_modules/test/emqx_event_message_SUITE.erl index 889ecc628..c74db36f6 100644 --- a/apps/emqx_modules/test/emqx_event_message_SUITE.erl +++ b/apps/emqx_modules/test/emqx_event_message_SUITE.erl @@ -22,16 +22,20 @@ -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("eunit/include/eunit.hrl"). --define(EVENT_MESSAGE, <<""" -event_message: { - client_connected: true - client_disconnected: true - client_subscribed: true - client_unsubscribed: true - message_delivered: true - message_acked: true - message_dropped: true -}""">>). +-define(EVENT_MESSAGE, << + "" + "\n" + "event_message: {\n" + " client_connected: true\n" + " client_disconnected: true\n" + " client_subscribed: true\n" + " client_unsubscribed: true\n" + " message_delivered: true\n" + " message_acked: true\n" + " message_dropped: true\n" + "}" + "" +>>). all() -> emqx_common_test_helpers:all(?MODULE). @@ -49,8 +53,10 @@ t_event_topic(_) -> {ok, C1} = emqtt:start_link([{clientid, <<"monsys">>}]), {ok, _} = emqtt:connect(C1), {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_connected">>, qos1), - {ok, C2} = emqtt:start_link([{clientid, <<"clientid">>}, - {username, <<"username">>}]), + {ok, C2} = emqtt:start_link([ + {clientid, <<"clientid">>}, + {username, <<"username">>} + ]), {ok, _} = emqtt:connect(C2), ok = recv_connected(<<"clientid">>), @@ -72,7 +78,7 @@ t_event_topic(_) -> recv_message_acked(<<"clientid">>), {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/message_dropped">>, qos1), - ok= emqtt:publish(C2, <<"test_sub1">>, <<"test">>), + ok = emqtt:publish(C2, <<"test_sub1">>, <<"test">>), recv_message_dropped(<<"clientid">>), {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_unsubscribed">>, qos1), @@ -94,15 +100,19 @@ t_reason(_) -> recv_connected(ClientId) -> {ok, #{qos := ?QOS_0, topic := Topic, payload := Payload}} = receive_publish(100), ?assertMatch(<<"$event/client_connected">>, Topic), - ?assertMatch(#{<<"clientid">> := ClientId, - <<"username">> := <<"username">>, - <<"ipaddress">> := <<"127.0.0.1">>, - <<"proto_name">> := <<"MQTT">>, - <<"proto_ver">> := ?MQTT_PROTO_V4, - <<"clean_start">> := true, - <<"expiry_interval">> := 0, - <<"keepalive">> := 60 - }, emqx_json:decode(Payload, [return_maps])). + ?assertMatch( + #{ + <<"clientid">> := ClientId, + <<"username">> := <<"username">>, + <<"ipaddress">> := <<"127.0.0.1">>, + <<"proto_name">> := <<"MQTT">>, + <<"proto_ver">> := ?MQTT_PROTO_V4, + <<"clean_start">> := true, + <<"expiry_interval">> := 0, + <<"keepalive">> := 60 + }, + emqx_json:decode(Payload, [return_maps]) + ). recv_subscribed(_ClientId) -> {ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100), @@ -128,9 +138,14 @@ recv_unsubscribed(_ClientId) -> recv_disconnected(ClientId) -> {ok, #{qos := ?QOS_0, topic := Topic, payload := Payload}} = receive_publish(100), ?assertMatch(<<"$event/client_disconnected">>, Topic), - ?assertMatch(#{<<"clientid">> := ClientId, - <<"username">> := <<"username">>, - <<"reason">> := <<"normal">>}, emqx_json:decode(Payload, [return_maps])). + ?assertMatch( + #{ + <<"clientid">> := ClientId, + <<"username">> := <<"username">>, + <<"reason">> := <<"normal">> + }, + emqx_json:decode(Payload, [return_maps]) + ). %%-------------------------------------------------------------------- %% Internal functions @@ -140,6 +155,5 @@ receive_publish(Timeout) -> receive {publish, Publish} -> {ok, Publish} - after - Timeout -> {error, timeout} + after Timeout -> {error, timeout} end. diff --git a/apps/emqx_modules/test/emqx_modules_conf_SUITE.erl b/apps/emqx_modules/test/emqx_modules_conf_SUITE.erl index c79153cba..72258c413 100644 --- a/apps/emqx_modules/test/emqx_modules_conf_SUITE.erl +++ b/apps/emqx_modules/test/emqx_modules_conf_SUITE.erl @@ -48,4 +48,3 @@ t_topic_metrics_list(_) -> t_topic_metrics_add_remove(_) -> ok. - diff --git a/apps/emqx_modules/test/emqx_rewrite_SUITE.erl b/apps/emqx_modules/test/emqx_rewrite_SUITE.erl index cdc4b4521..430e422f4 100644 --- a/apps/emqx_modules/test/emqx_rewrite_SUITE.erl +++ b/apps/emqx_modules/test/emqx_rewrite_SUITE.erl @@ -22,27 +22,31 @@ -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("eunit/include/eunit.hrl"). --define(REWRITE, <<""" -rewrite: [ - { - action : publish - source_topic : \"x/#\" - re : \"^x/y/(.+)$\" - dest_topic : \"z/y/$1\" - }, - { - action : subscribe - source_topic : \"y/+/z/#\" - re : \"^y/(.+)/z/(.+)$\" - dest_topic : \"y/z/$2\" - }, - { - action : all - source_topic : \"all/+/x/#\" - re : \"^all/(.+)/x/(.+)$\" - dest_topic : \"all/x/$2\" - } -]""">>). +-define(REWRITE, << + "" + "\n" + "rewrite: [\n" + " {\n" + " action : publish\n" + " source_topic : \"x/#\"\n" + " re : \"^x/y/(.+)$\"\n" + " dest_topic : \"z/y/$1\"\n" + " },\n" + " {\n" + " action : subscribe\n" + " source_topic : \"y/+/z/#\"\n" + " re : \"^y/(.+)/z/(.+)$\"\n" + " dest_topic : \"y/z/$2\"\n" + " },\n" + " {\n" + " action : all\n" + " source_topic : \"all/+/x/#\"\n" + " re : \"^all/(.+)/x/(.+)$\"\n" + " dest_topic : \"all/x/$2\"\n" + " }\n" + "]" + "" +>>). all() -> emqx_common_test_helpers:all(?MODULE). @@ -62,11 +66,14 @@ t_subscribe_rewrite(_Config) -> timer:sleep(150), Subscriptions = emqx_broker:subscriptions(<<"rewrite_client">>), ?assertEqual(SubDestTopics, [Topic || {Topic, _SubOpts} <- Subscriptions]), - RecvTopics = [begin - ok = emqtt:publish(Conn, Topic, <<"payload">>), - {ok, #{topic := RecvTopic}} = receive_publish(100), - RecvTopic - end || Topic <- SubDestTopics], + RecvTopics = [ + begin + ok = emqtt:publish(Conn, Topic, <<"payload">>), + {ok, #{topic := RecvTopic}} = receive_publish(100), + RecvTopic + end + || Topic <- SubDestTopics + ], ?assertEqual(SubDestTopics, RecvTopics), {ok, _, _} = emqtt:unsubscribe(Conn, SubOrigTopics), timer:sleep(100), @@ -79,11 +86,14 @@ t_publish_rewrite(_Config) -> PubOrigTopics = [<<"x/y/2">>, <<"x/1/2">>], PubDestTopics = [<<"z/y/2">>, <<"x/1/2">>], {ok, _Props2, _} = emqtt:subscribe(Conn, [{Topic, ?QOS_1} || Topic <- PubDestTopics]), - RecvTopics = [begin - ok = emqtt:publish(Conn, Topic, <<"payload">>), - {ok, #{topic := RecvTopic}} = receive_publish(100), - RecvTopic - end || Topic <- PubOrigTopics], + RecvTopics = [ + begin + ok = emqtt:publish(Conn, Topic, <<"payload">>), + {ok, #{topic := RecvTopic}} = receive_publish(100), + RecvTopic + end + || Topic <- PubOrigTopics + ], ?assertEqual(PubDestTopics, RecvTopics), {ok, _, _} = emqtt:unsubscribe(Conn, PubDestTopics), terminate(Conn). @@ -96,17 +106,19 @@ t_rewrite_rule(_Config) -> ?assertEqual(<<"y/def">>, emqx_rewrite:match_and_rewrite(<<"y/def">>, SubRules)). t_rewrite_re_error(_Config) -> - Rules = [#{ - action => subscribe, - source_topic => "y/+/z/#", - re => "{^y/(.+)/z/(.+)$*", - dest_topic => "\"y/z/$2" - }], + Rules = [ + #{ + action => subscribe, + source_topic => "y/+/z/#", + re => "{^y/(.+)/z/(.+)$*", + dest_topic => "\"y/z/$2" + } + ], Error = { "y/+/z/#", "{^y/(.+)/z/(.+)$*", "\"y/z/$2", - {"nothing to repeat",16} + {"nothing to repeat", 16} }, ?assertEqual({[], [], [Error]}, emqx_rewrite:compile(Rules)), ok. @@ -114,30 +126,39 @@ t_rewrite_re_error(_Config) -> t_list(_Config) -> ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?REWRITE), Expect = [ - #{<<"action">> => <<"publish">>, + #{ + <<"action">> => <<"publish">>, <<"dest_topic">> => <<"z/y/$1">>, <<"re">> => <<"^x/y/(.+)$">>, - <<"source_topic">> => <<"x/#">>}, - #{<<"action">> => <<"subscribe">>, + <<"source_topic">> => <<"x/#">> + }, + #{ + <<"action">> => <<"subscribe">>, <<"dest_topic">> => <<"y/z/$2">>, <<"re">> => <<"^y/(.+)/z/(.+)$">>, - <<"source_topic">> => <<"y/+/z/#">>}, - #{<<"action">> => <<"all">>, + <<"source_topic">> => <<"y/+/z/#">> + }, + #{ + <<"action">> => <<"all">>, <<"dest_topic">> => <<"all/x/$2">>, <<"re">> => <<"^all/(.+)/x/(.+)$">>, - <<"source_topic">> => <<"all/+/x/#">>}], + <<"source_topic">> => <<"all/+/x/#">> + } + ], ?assertEqual(Expect, emqx_rewrite:list()), ok. t_update(_Config) -> ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?REWRITE), Init = emqx_rewrite:list(), - Rules = [#{ - <<"source_topic">> => <<"test/#">>, - <<"re">> => <<"test/*">>, - <<"dest_topic">> => <<"test1/$2">>, - <<"action">> => <<"publish">> - }], + Rules = [ + #{ + <<"source_topic">> => <<"test/#">>, + <<"re">> => <<"test/*">>, + <<"dest_topic">> => <<"test1/$2">>, + <<"action">> => <<"publish">> + } + ], ok = emqx_rewrite:update(Rules), ?assertEqual(Rules, emqx_rewrite:list()), ok = emqx_rewrite:update(Init), @@ -161,26 +182,25 @@ t_update_disable(_Config) -> t_update_re_failed(_Config) -> ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?REWRITE), Re = <<"*^test/*">>, - Rules = [#{ - <<"source_topic">> => <<"test/#">>, - <<"re">> => Re, - <<"dest_topic">> => <<"test1/$2">>, - <<"action">> => <<"publish">> - }], - ?assertError({badmatch, - {error, - {_, - [ - {validation_error, - #{ - reason := {Re, {"nothing to repeat", 0}}, - value := Re - } - } - ] - } + Rules = [ + #{ + <<"source_topic">> => <<"test/#">>, + <<"re">> => Re, + <<"dest_topic">> => <<"test1/$2">>, + <<"action">> => <<"publish">> } - }, emqx_rewrite:update(Rules)), + ], + ?assertError( + {badmatch, + {error, + {_, [ + {validation_error, #{ + reason := {Re, {"nothing to repeat", 0}}, + value := Re + }} + ]}}}, + emqx_rewrite:update(Rules) + ), ok. %%-------------------------------------------------------------------- @@ -190,8 +210,7 @@ t_update_re_failed(_Config) -> receive_publish(Timeout) -> receive {publish, Publish} -> {ok, Publish} - after - Timeout -> {error, timeout} + after Timeout -> {error, timeout} end. init() -> diff --git a/apps/emqx_modules/test/emqx_telemetry_SUITE.erl b/apps/emqx_modules/test/emqx_telemetry_SUITE.erl index efb258853..d4903b9b1 100644 --- a/apps/emqx_modules/test/emqx_telemetry_SUITE.erl +++ b/apps/emqx_modules/test/emqx_telemetry_SUITE.erl @@ -40,32 +40,38 @@ init_per_testcase(t_get_telemetry, Config) -> TestPID = self(), ok = meck:new(httpc, [non_strict, passthrough, no_history, no_link]), ok = meck:expect(httpc, request, fun(Method, URL, Headers, Body) -> - TestPID ! {request, Method, URL, Headers, Body} - end), + TestPID ! {request, Method, URL, Headers, Body} + end), ok = meck:new(emqx_telemetry, [non_strict, passthrough, no_history, no_link]), - ok = meck:expect(emqx_telemetry, read_raw_build_info, - fun() -> - {ok, Path} = file:read_link(filename:join([DataDir, "BUILD_INFO"])), - {ok, Template} = file:read_file(Path), - Vars0 = [ {build_info_arch, "arch"} - , {build_info_wordsize, "64"} - , {build_info_os, "os"} - , {build_info_erlang, "erlang"} - , {build_info_elixir, "elixir"} - , {build_info_relform, "relform"} - ], - Vars = [{atom_to_list(K), iolist_to_binary(V)} - || {K, V} <- Vars0], - Rendered = bbmustache:render(Template, Vars), - {ok, Rendered} - end), + ok = meck:expect( + emqx_telemetry, + read_raw_build_info, + fun() -> + {ok, Path} = file:read_link(filename:join([DataDir, "BUILD_INFO"])), + {ok, Template} = file:read_file(Path), + Vars0 = [ + {build_info_arch, "arch"}, + {build_info_wordsize, "64"}, + {build_info_os, "os"}, + {build_info_erlang, "erlang"}, + {build_info_elixir, "elixir"}, + {build_info_relform, "relform"} + ], + Vars = [ + {atom_to_list(K), iolist_to_binary(V)} + || {K, V} <- Vars0 + ], + Rendered = bbmustache:render(Template, Vars), + {ok, Rendered} + end + ), Config; init_per_testcase(_Testcase, Config) -> TestPID = self(), ok = meck:new(httpc, [non_strict, passthrough, no_history, no_link]), ok = meck:expect(httpc, request, fun(Method, URL, Headers, Body) -> - TestPID ! {request, Method, URL, Headers, Body} - end), + TestPID ! {request, Method, URL, Headers, Body} + end), Config. end_per_testcase(t_get_telemetry, _Config) -> @@ -113,14 +119,16 @@ t_get_telemetry(_Config) -> ?assertEqual(0, get_value(num_clients, TelemetryData)), BuildInfo = get_value(build_info, TelemetryData), ?assertMatch( - #{ <<"arch">> := <<_/binary>> - , <<"elixir">> := <<_/binary>> - , <<"erlang">> := <<_/binary>> - , <<"os">> := <<_/binary>> - , <<"relform">> := <<_/binary>> - , <<"wordsize">> := Wordsize + #{ + <<"arch">> := <<_/binary>>, + <<"elixir">> := <<_/binary>>, + <<"erlang">> := <<_/binary>>, + <<"os">> := <<_/binary>>, + <<"relform">> := <<_/binary>>, + <<"wordsize">> := Wordsize } when is_integer(Wordsize), - BuildInfo), + BuildInfo + ), VMSpecs = get_value(vm_specs, TelemetryData), ?assert(is_integer(get_value(num_cpus, VMSpecs))), ?assert(0 =< get_value(num_cpus, VMSpecs)), diff --git a/apps/emqx_modules/test/emqx_topic_metrics_SUITE.erl b/apps/emqx_modules/test/emqx_topic_metrics_SUITE.erl index cae3d4917..4d8a340d2 100644 --- a/apps/emqx_modules/test/emqx_topic_metrics_SUITE.erl +++ b/apps/emqx_modules/test/emqx_topic_metrics_SUITE.erl @@ -19,9 +19,12 @@ -compile(export_all). -compile(nowarn_export_all). - --define(TOPIC, <<""" -topic_metrics: []""">>). +-define(TOPIC, << + "" + "\n" + "topic_metrics: []" + "" +>>). -include_lib("eunit/include/eunit.hrl"). @@ -47,7 +50,12 @@ t_nonexistent_topic_metrics(_) -> ?assertEqual({error, invalid_metric}, emqx_topic_metrics:val(<<"a/b/c">>, 'invalid.metrics')), ?assertEqual({error, invalid_metric}, emqx_topic_metrics:inc(<<"a/b/c">>, 'invalid.metrics')), ?assertEqual({error, invalid_metric}, emqx_topic_metrics:rate(<<"a/b/c">>, 'invalid.metrics')), - % ?assertEqual({error, invalid_metric}, emqx_topic_metrics:rates(<<"a/b/c">>, 'invalid.metrics')), + + %% ?assertEqual( + %% {error, invalid_metric}, + %% emqx_topic_metrics:rates(<<"a/b/c">>, 'invalid.metrics') + %% ), + emqx_topic_metrics:deregister(<<"a/b/c">>), emqx_topic_metrics:disable(). @@ -64,7 +72,12 @@ t_topic_metrics(_) -> ?assertEqual(ok, emqx_topic_metrics:inc(<<"a/b/c">>, 'messages.in')), ?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.in')), ?assert(emqx_topic_metrics:rate(<<"a/b/c">>, 'messages.in') =:= 0), - % ?assert(emqx_topic_metrics:rates(<<"a/b/c">>, 'messages.in') =:= #{long => 0,medium => 0,short => 0}), + + %% ?assert( + %% emqx_topic_metrics:rates(<<"a/b/c">>, 'messages.in') =:= + %% #{long => 0, medium => 0, short => 0} + %% ), + emqx_topic_metrics:deregister(<<"a/b/c">>), emqx_topic_metrics:disable(). @@ -78,9 +91,11 @@ t_hook(_) -> ?assertEqual(0, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.out')), ?assertEqual(0, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.dropped')), - {ok, C} = emqtt:start_link([{host, "localhost"}, - {clientid, "myclient"}, - {username, "myuser"}]), + {ok, C} = emqtt:start_link([ + {host, "localhost"}, + {clientid, "myclient"}, + {username, "myuser"} + ]), {ok, _} = emqtt:connect(C), emqtt:publish(C, <<"a/b/c">>, <<"Hello world">>, 0), ct:sleep(100),