Merge pull request #9422 from emqx/1124-refactor-avoid-creating-atom-at-run

refactor: use static function references
This commit is contained in:
Zaiming (Stone) Shi 2022-12-06 14:24:05 +01:00 committed by GitHub
commit 380b75d4a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 136 additions and 112 deletions

View File

@ -65,10 +65,7 @@ fields("rule_info") ->
] ++ fields("rule_creation");
%% TODO: we can delete this API if the Dashboard not depends on it
fields("rule_events") ->
ETopics = [
binary_to_atom(emqx_rule_events:event_topic(E))
|| E <- emqx_rule_events:event_names()
],
ETopics = emqx_rule_events:event_topics_enum(),
[
{"event", sc(hoconsc:enum(ETopics), #{desc => ?DESC("rs_event"), required => true})},
{"title", sc(binary(), #{desc => ?DESC("rs_title"), example => "some title"})},
@ -150,77 +147,43 @@ fields("node_metrics") ->
fields("metrics");
fields("ctx_pub") ->
[
{"event_type", sc(message_publish, #{desc => ?DESC("event_event_type"), required => true})},
{"id", sc(binary(), #{desc => ?DESC("event_id")})},
{"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
{"username", sc(binary(), #{desc => ?DESC("event_username")})},
{"payload", sc(binary(), #{desc => ?DESC("event_payload")})},
{"peerhost", sc(binary(), #{desc => ?DESC("event_peerhost")})},
{"topic", sc(binary(), #{desc => ?DESC("event_topic")})},
{"publish_received_at",
sc(integer(), #{
desc => ?DESC("event_publish_received_at")
})}
] ++ [qos()];
{"event_type", event_type_sc(message_publish)},
{"id", sc(binary(), #{desc => ?DESC("event_id")})}
| msg_event_common_fields()
];
fields("ctx_sub") ->
[
{"event_type",
sc(session_subscribed, #{desc => ?DESC("event_event_type"), required => true})},
{"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
{"username", sc(binary(), #{desc => ?DESC("event_username")})},
{"payload", sc(binary(), #{desc => ?DESC("event_payload")})},
{"peerhost", sc(binary(), #{desc => ?DESC("event_peerhost")})},
{"topic", sc(binary(), #{desc => ?DESC("event_topic")})},
{"publish_received_at",
sc(integer(), #{
desc => ?DESC("event_publish_received_at")
})}
] ++ [qos()];
{"event_type", event_type_sc(session_subscribed)}
| msg_event_common_fields()
];
fields("ctx_unsub") ->
[
{"event_type",
sc(session_unsubscribed, #{desc => ?DESC("event_event_type"), required => true})}
] ++
proplists:delete("event_type", fields("ctx_sub"));
{"event_type", event_type_sc(session_unsubscribed)}
| proplists:delete("event_type", fields("ctx_sub"))
];
fields("ctx_delivered") ->
[
{"event_type",
sc(message_delivered, #{desc => ?DESC("event_event_type"), required => true})},
{"event_type", event_type_sc(message_delivered)},
{"id", sc(binary(), #{desc => ?DESC("event_id")})},
{"from_clientid", sc(binary(), #{desc => ?DESC("event_from_clientid")})},
{"from_username", sc(binary(), #{desc => ?DESC("event_from_username")})},
{"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
{"username", sc(binary(), #{desc => ?DESC("event_username")})},
{"payload", sc(binary(), #{desc => ?DESC("event_payload")})},
{"peerhost", sc(binary(), #{desc => ?DESC("event_peerhost")})},
{"topic", sc(binary(), #{desc => ?DESC("event_topic")})},
{"publish_received_at",
sc(integer(), #{
desc => ?DESC("event_publish_received_at")
})}
] ++ [qos()];
{"from_username", sc(binary(), #{desc => ?DESC("event_from_username")})}
| msg_event_common_fields()
];
fields("ctx_acked") ->
[{"event_type", sc(message_acked, #{desc => ?DESC("event_event_type"), required => true})}] ++
proplists:delete("event_type", fields("ctx_delivered"));
[
{"event_type", event_type_sc(message_acked)}
| proplists:delete("event_type", fields("ctx_delivered"))
];
fields("ctx_dropped") ->
[
{"event_type", sc(message_dropped, #{desc => ?DESC("event_event_type"), required => true})},
{"event_type", event_type_sc(message_dropped)},
{"id", sc(binary(), #{desc => ?DESC("event_id")})},
{"reason", sc(binary(), #{desc => ?DESC("event_ctx_dropped")})},
{"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
{"username", sc(binary(), #{desc => ?DESC("event_username")})},
{"payload", sc(binary(), #{desc => ?DESC("event_payload")})},
{"peerhost", sc(binary(), #{desc => ?DESC("event_peerhost")})},
{"topic", sc(binary(), #{desc => ?DESC("event_topic")})},
{"publish_received_at",
sc(integer(), #{
desc => ?DESC("event_publish_received_at")
})}
] ++ [qos()];
{"reason", sc(binary(), #{desc => ?DESC("event_ctx_dropped")})}
| msg_event_common_fields()
];
fields("ctx_connected") ->
[
{"event_type",
sc(client_connected, #{desc => ?DESC("event_event_type"), required => true})},
{"event_type", event_type_sc(client_connected)},
{"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
{"username", sc(binary(), #{desc => ?DESC("event_username")})},
{"mountpoint", sc(binary(), #{desc => ?DESC("event_mountpoint")})},
@ -239,8 +202,7 @@ fields("ctx_connected") ->
];
fields("ctx_disconnected") ->
[
{"event_type",
sc(client_disconnected, #{desc => ?DESC("event_event_type"), required => true})},
{"event_type", event_type_sc(client_disconnected)},
{"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
{"username", sc(binary(), #{desc => ?DESC("event_username")})},
{"reason", sc(binary(), #{desc => ?DESC("event_ctx_disconnected_reason")})},
@ -253,7 +215,7 @@ fields("ctx_disconnected") ->
];
fields("ctx_connack") ->
[
{"event_type", sc(client_connack, #{desc => ?DESC("event_event_type"), required => true})},
{"event_type", event_type_sc(client_connack)},
{"reason_code", sc(binary(), #{desc => ?DESC("event_ctx_connack_reason_code")})},
{"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
{"clean_start", sc(boolean(), #{desc => ?DESC("event_clean_start"), default => true})},
@ -271,8 +233,7 @@ fields("ctx_connack") ->
];
fields("ctx_check_authz_complete") ->
[
{"event_type",
sc(client_check_authz_complete, #{desc => ?DESC("event_event_type"), required => true})},
{"event_type", event_type_sc(client_check_authz_complete)},
{"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
{"username", sc(binary(), #{desc => ?DESC("event_username")})},
{"peerhost", sc(binary(), #{desc => ?DESC("event_peerhost")})},
@ -283,19 +244,16 @@ fields("ctx_check_authz_complete") ->
];
fields("ctx_bridge_mqtt") ->
[
{"event_type",
sc('$bridges/mqtt:*', #{desc => ?DESC("event_event_type"), required => true})},
{"event_type", event_type_sc('$bridges/mqtt:*')},
{"id", sc(binary(), #{desc => ?DESC("event_id")})},
{"payload", sc(binary(), #{desc => ?DESC("event_payload")})},
{"topic", sc(binary(), #{desc => ?DESC("event_topic")})},
{"server", sc(binary(), #{desc => ?DESC("event_server")})},
{"dup", sc(binary(), #{desc => ?DESC("event_dup")})},
{"retain", sc(binary(), #{desc => ?DESC("event_retain")})},
{"message_received_at",
sc(integer(), #{
desc => ?DESC("event_publish_received_at")
})}
] ++ [qos()].
{"message_received_at", publish_received_at_sc()},
qos()
].
qos() ->
{"qos", sc(emqx_schema:qos(), #{desc => ?DESC("event_qos")})}.
@ -312,4 +270,22 @@ rule_id() ->
)}.
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
ref(Field) -> hoconsc:ref(?MODULE, Field).
event_type_sc(Event) ->
sc(Event, #{desc => ?DESC("event_event_type"), required => true}).
publish_received_at_sc() ->
sc(integer(), #{desc => ?DESC("event_publish_received_at")}).
msg_event_common_fields() ->
[
{"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
{"username", sc(binary(), #{desc => ?DESC("event_username")})},
{"payload", sc(binary(), #{desc => ?DESC("event_payload")})},
{"peerhost", sc(binary(), #{desc => ?DESC("event_peerhost")})},
{"topic", sc(binary(), #{desc => ?DESC("event_topic")})},
{"publish_received_at", publish_received_at_sc()},
qos()
].

View File

@ -2,7 +2,7 @@
{application, emqx_rule_engine, [
{description, "EMQX Rule Engine"},
% strict semver, bump manually!
{vsn, "5.0.4"},
{vsn, "5.0.5"},
{modules, []},
{registered, [emqx_rule_engine_sup, emqx_rule_engine]},
{applications, [kernel, stdlib, rulesql, getopt]},

View File

@ -28,6 +28,7 @@
unload/1,
event_names/0,
event_name/1,
event_topics_enum/0,
event_topic/1,
eventmsg_publish/1
]).
@ -78,6 +79,22 @@ event_names() ->
'delivery.dropped'
].
%% for documentation purposes
event_topics_enum() ->
[
'$events/client_connected',
'$events/client_disconnected',
'$events/client_connack',
'$events/client_check_authz_complete',
'$events/session_subscribed',
'$events/session_unsubscribed',
'$events/message_delivered',
'$events/message_acked',
'$events/message_dropped',
'$events/delivery_dropped'
% '$events/message_publish' % not possible to use in SELECT FROM
].
reload() ->
lists:foreach(
fun(Rule) ->
@ -88,21 +105,22 @@ reload() ->
load(Topic) ->
HookPoint = event_name(Topic),
HookFun = hook_fun_name(HookPoint),
emqx_hooks:put(
HookPoint, {?MODULE, hook_fun(HookPoint), [#{event_topic => Topic}]}, ?HP_RULE_ENGINE
HookPoint, {?MODULE, HookFun, [#{event_topic => Topic}]}, ?HP_RULE_ENGINE
).
unload() ->
lists:foreach(
fun(HookPoint) ->
emqx_hooks:del(HookPoint, {?MODULE, hook_fun(HookPoint)})
emqx_hooks:del(HookPoint, {?MODULE, hook_fun_name(HookPoint)})
end,
event_names()
).
unload(Topic) ->
HookPoint = event_name(Topic),
emqx_hooks:del(HookPoint, {?MODULE, hook_fun(HookPoint)}).
emqx_hooks:del(HookPoint, {?MODULE, hook_fun_name(HookPoint)}).
%%--------------------------------------------------------------------
%% Callbacks
@ -987,15 +1005,25 @@ columns_example_props_specific(unsub_props) ->
%% Helper functions
%%--------------------------------------------------------------------
hook_fun(<<"$bridges/", _/binary>>) ->
on_bridge_message_received;
hook_fun(Event) ->
case string:split(atom_to_list(Event), ".") of
[Prefix, Name] ->
list_to_atom(lists:append(["on_", Prefix, "_", Name]));
[_] ->
error(invalid_event, Event)
end.
hook_fun_name(HookPoint) ->
HookFun = hook_fun(HookPoint),
{name, HookFunName} = erlang:fun_info(HookFun, name),
HookFunName.
%% return static function references to help static code checks
hook_fun(<<"$bridges/", _/binary>>) -> fun ?MODULE:on_bridge_message_received/2;
hook_fun('client.connected') -> fun ?MODULE:on_client_connected/3;
hook_fun('client.disconnected') -> fun ?MODULE:on_client_disconnected/4;
hook_fun('client.connack') -> fun ?MODULE:on_client_connack/4;
hook_fun('client.check_authz_complete') -> fun ?MODULE:on_client_check_authz_complete/6;
hook_fun('session.subscribed') -> fun ?MODULE:on_session_subscribed/4;
hook_fun('session.unsubscribed') -> fun ?MODULE:on_session_unsubscribed/4;
hook_fun('message.delivered') -> fun ?MODULE:on_message_delivered/3;
hook_fun('message.acked') -> fun ?MODULE:on_message_acked/3;
hook_fun('message.dropped') -> fun ?MODULE:on_message_dropped/4;
hook_fun('delivery.dropped') -> fun ?MODULE:on_delivery_dropped/4;
hook_fun('message.publish') -> fun ?MODULE:on_message_publish/2;
hook_fun(Event) -> error({invalid_event, Event}).
reason(Reason) when is_atom(Reason) -> Reason;
reason({shutdown, Reason}) when is_atom(Reason) -> Reason;
@ -1006,19 +1034,20 @@ ntoa(undefined) -> undefined;
ntoa({IpAddr, Port}) -> iolist_to_binary([inet:ntoa(IpAddr), ":", integer_to_list(Port)]);
ntoa(IpAddr) -> iolist_to_binary(inet:ntoa(IpAddr)).
event_name(<<"$events/client_connected", _/binary>>) -> 'client.connected';
event_name(<<"$events/client_disconnected", _/binary>>) -> 'client.disconnected';
event_name(<<"$events/client_connack", _/binary>>) -> 'client.connack';
event_name(<<"$events/client_check_authz_complete", _/binary>>) -> 'client.check_authz_complete';
event_name(<<"$events/session_subscribed", _/binary>>) -> 'session.subscribed';
event_name(<<"$events/session_unsubscribed", _/binary>>) -> 'session.unsubscribed';
event_name(<<"$events/message_delivered", _/binary>>) -> 'message.delivered';
event_name(<<"$events/message_acked", _/binary>>) -> 'message.acked';
event_name(<<"$events/message_dropped", _/binary>>) -> 'message.dropped';
event_name(<<"$events/delivery_dropped", _/binary>>) -> 'delivery.dropped';
event_name(<<"$bridges/", _/binary>> = Topic) -> Topic;
event_name(<<"$bridges/", _/binary>> = Bridge) -> Bridge;
event_name(<<"$events/client_connected">>) -> 'client.connected';
event_name(<<"$events/client_disconnected">>) -> 'client.disconnected';
event_name(<<"$events/client_connack">>) -> 'client.connack';
event_name(<<"$events/client_check_authz_complete">>) -> 'client.check_authz_complete';
event_name(<<"$events/session_subscribed">>) -> 'session.subscribed';
event_name(<<"$events/session_unsubscribed">>) -> 'session.unsubscribed';
event_name(<<"$events/message_delivered">>) -> 'message.delivered';
event_name(<<"$events/message_acked">>) -> 'message.acked';
event_name(<<"$events/message_dropped">>) -> 'message.dropped';
event_name(<<"$events/delivery_dropped">>) -> 'delivery.dropped';
event_name(_) -> 'message.publish'.
event_topic(<<"$bridges/", _/binary>> = Bridge) -> Bridge;
event_topic('client.connected') -> <<"$events/client_connected">>;
event_topic('client.disconnected') -> <<"$events/client_disconnected">>;
event_topic('client.connack') -> <<"$events/client_connack">>;
@ -1029,8 +1058,7 @@ event_topic('message.delivered') -> <<"$events/message_delivered">>;
event_topic('message.acked') -> <<"$events/message_acked">>;
event_topic('message.dropped') -> <<"$events/message_dropped">>;
event_topic('delivery.dropped') -> <<"$events/delivery_dropped">>;
event_topic('message.publish') -> <<"$events/message_publish">>;
event_topic(<<"$bridges/", _/binary>> = Topic) -> Topic.
event_topic('message.publish') -> <<"$events/message_publish">>.
printable_maps(undefined) ->
#{};

View File

@ -8,19 +8,18 @@
all() -> emqx_common_test_helpers:all(?MODULE).
t_mod_hook_fun(_) ->
Funcs = emqx_rule_events:module_info(exports),
[
?assert(lists:keymember(emqx_rule_events:hook_fun(Event), 1, Funcs))
|| Event <- [
'client.connected',
'client.disconnected',
'session.subscribed',
'session.unsubscribed',
'message.acked',
'message.dropped',
'message.delivered'
]
].
Events = emqx_rule_events:event_names(),
lists:foreach(
fun(E) ->
?assert(is_function(emqx_rule_events:hook_fun(E)))
end,
Events
),
?assertEqual(
fun emqx_rule_events:on_bridge_message_received/2,
emqx_rule_events:hook_fun(<<"$bridges/foo">>)
),
?assertError({invalid_event, foo}, emqx_rule_events:hook_fun(foo)).
t_printable_maps(_) ->
Headers = #{
@ -42,3 +41,24 @@ t_printable_maps(_) ->
?assertNot(maps:is_key(redispatch_to, Converted)),
?assertNot(maps:is_key(shared_dispatch_ack, Converted)),
ok.
t_event_name_topic_conversion(_) ->
Events = emqx_rule_events:event_names() -- ['message.publish'],
Topics = [atom_to_binary(A) || A <- emqx_rule_events:event_topics_enum()],
Zip = lists:zip(Events, Topics),
lists:foreach(
fun({Event, Topic}) ->
?assertEqual(Event, emqx_rule_events:event_name(Topic)),
?assertEqual(Topic, emqx_rule_events:event_topic(Event))
end,
Zip
).
t_special_events_name_topic_conversion(_) ->
Bridge = <<"$bridges/foo:bar">>,
AdHoc = <<"foo/bar">>,
NonExisting = <<"$events/message_publish">>,
?assertEqual(Bridge, emqx_rule_events:event_name(Bridge)),
?assertEqual('message.publish', emqx_rule_events:event_name(AdHoc)),
?assertEqual('message.publish', emqx_rule_events:event_name(NonExisting)),
?assertEqual(NonExisting, emqx_rule_events:event_topic('message.publish')).