chore(event_message): rename event_topic to event_message

This commit is contained in:
Turtle 2021-07-30 11:02:56 +08:00 committed by turtleDeng
parent ebe31c79d4
commit 485f1293fc
6 changed files with 18 additions and 45 deletions

View File

@ -12,7 +12,7 @@ telemetry: {
}
event_topic: {
event_message: {
topics: [
"$event/client_connected",
"$event/client_disconnected",

View File

@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_event_topic).
-module(emqx_event_message).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
@ -38,7 +38,7 @@
-endif.
enable() ->
Topics = emqx_config:get([event_topic, topics], []),
Topics = emqx_config:get([event_message, topics], []),
lists:foreach(fun(Topic) ->
case Topic of
<<"$event/client_connected">> ->
@ -61,7 +61,7 @@ enable() ->
end, Topics).
disable() ->
Topics = emqx_config:get([event_topic, topics], []),
Topics = emqx_config:get([event_message, topics], []),
lists:foreach(fun(Topic) ->
case Topic of
<<"$event/client_connected">> ->
@ -187,33 +187,6 @@ on_message_acked(_ClientInfo = #{
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------
connected_payload(#{peerhost := PeerHost,
sockport := SockPort,
clientid := ClientId,
username := Username
},
#{clean_start := CleanStart,
proto_name := ProtoName,
proto_ver := ProtoVer,
keepalive := Keepalive,
connected_at := ConnectedAt,
expiry_interval := ExpiryInterval
}) ->
#{clientid => ClientId,
username => Username,
ipaddress => ntoa(PeerHost),
sockport => SockPort,
proto_name => ProtoName,
proto_ver => ProtoVer,
keepalive => Keepalive,
connack => 0, %% Deprecated?
clean_start => CleanStart,
expiry_interval => ExpiryInterval div 1000,
connected_at => ConnectedAt,
ts => erlang:system_time(millisecond)
}.
common_infos(
_ClientInfo = #{clientid := ClientId,
username := Username,

View File

@ -35,7 +35,7 @@ maybe_enable_modules() ->
emqx_config:get([delayed, enable], true) andalso emqx_delayed:enable(),
emqx_config:get([telemetry, enable], true) andalso emqx_telemetry:enable(),
emqx_config:get([recon, enable], true) andalso emqx_recon:enable(),
emqx_event_topic:enable(),
emqx_event_message:enable(),
emqx_rewrite:enable(),
emqx_topic_metrics:enable().
@ -43,6 +43,6 @@ maybe_disable_modules() ->
emqx_config:get([delayed, enable], true) andalso emqx_delayed:disable(),
emqx_config:get([telemetry, enable], true) andalso emqx_telemetry:disable(),
emqx_config:get([recon, enable], true) andalso emqx_recon:disable(),
emqx_event_topic:disable(),
emqx_event_message:disable(),
emqx_rewrite:disable(),
emqx_topic_metrics:disable().

View File

@ -27,7 +27,7 @@ structs() ->
["delayed",
"recon",
"telemetry",
"event_topic",
"event_message",
"rewrite",
"topic_metrics"].
@ -45,7 +45,7 @@ fields("rewrite") ->
[ {rules, hoconsc:array(hoconsc:ref(?MODULE, "rules"))}
];
fields("event_topic") ->
fields("event_message") ->
[ {topics, fun topics/1}
];

View File

@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_event_topic_SUITE).
-module(emqx_event_message_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
@ -28,9 +28,9 @@ init_per_suite(Config) ->
emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:start_apps([emqx_modules]),
meck:new(emqx_schema, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_schema, includes, fun() -> ["event_topic"] end ),
meck:expect(emqx_schema, includes, fun() -> ["event_message"] end ),
meck:expect(emqx_schema, extra_schema_fields, fun(FieldName) -> emqx_modules_schema:fields(FieldName) end),
ok = emqx_config:update([event_topic, topics], [<<"$event/client_connected">>,
ok = emqx_config:update([event_message, topics], [<<"$event/client_connected">>,
<<"$event/client_disconnected">>,
<<"$event/session_subscribed">>,
<<"$event/session_unsubscribed">>,
@ -44,7 +44,7 @@ end_per_suite(_Config) ->
meck:unload(emqx_schema).
t_event_topic(_) ->
ok = emqx_event_topic:enable(),
ok = emqx_event_message:enable(),
{ok, C1} = emqtt:start_link([{clientid, <<"monsys">>}]),
{ok, _} = emqtt:connect(C1),
{ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_connected">>, qos1),
@ -82,13 +82,13 @@ t_event_topic(_) ->
ok = emqtt:disconnect(C2),
ok = recv_disconnected(<<"clientid">>),
ok = emqtt:disconnect(C1),
ok = emqx_event_topic:disable().
ok = emqx_event_message:disable().
t_reason(_) ->
?assertEqual(normal, emqx_event_topic:reason(normal)),
?assertEqual(discarded, emqx_event_topic:reason({shutdown, discarded})),
?assertEqual(tcp_error, emqx_event_topic:reason({tcp_error, einval})),
?assertEqual(internal_error, emqx_event_topic:reason(<<"unknown error">>)).
?assertEqual(normal, emqx_event_message:reason(normal)),
?assertEqual(discarded, emqx_event_message:reason({shutdown, discarded})),
?assertEqual(tcp_error, emqx_event_message:reason({tcp_error, einval})),
?assertEqual(internal_error, emqx_event_message:reason(<<"unknown error">>)).
recv_connected(ClientId) ->
{ok, #{qos := ?QOS_0, topic := Topic, payload := Payload}} = receive_publish(100),

View File

@ -16,7 +16,7 @@
, {"delayed", emqx_modules_schema}
, {"recon", emqx_modules_schema}
, {"telemetry", emqx_modules_schema}
, {"event_topic", emqx_modules_schema}
, {"event_message", emqx_modules_schema}
, {"rewrite", emqx_modules_schema}
, {"topic_metrics", emqx_modules_schema}
].