From cca3421308b141427db5d4725d7c79e1a4a7bb8e Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 24 Nov 2022 19:40:40 +0100 Subject: [PATCH] refactor: use static function references --- .../src/emqx_rule_api_schema.erl | 122 +++++++----------- .../src/emqx_rule_engine.app.src | 2 +- .../emqx_rule_engine/src/emqx_rule_events.erl | 78 +++++++---- .../test/emqx_rule_events_SUITE.erl | 46 +++++-- 4 files changed, 136 insertions(+), 112 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl index 5c1ef9cd0..edfce5fa7 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl @@ -65,10 +65,7 @@ fields("rule_info") -> ] ++ fields("rule_creation"); %% TODO: we can delete this API if the Dashboard not depends on it fields("rule_events") -> - ETopics = [ - binary_to_atom(emqx_rule_events:event_topic(E)) - || E <- emqx_rule_events:event_names() - ], + ETopics = emqx_rule_events:event_topics_enum(), [ {"event", sc(hoconsc:enum(ETopics), #{desc => ?DESC("rs_event"), required => true})}, {"title", sc(binary(), #{desc => ?DESC("rs_title"), example => "some title"})}, @@ -150,77 +147,43 @@ fields("node_metrics") -> fields("metrics"); fields("ctx_pub") -> [ - {"event_type", sc(message_publish, #{desc => ?DESC("event_event_type"), required => true})}, - {"id", sc(binary(), #{desc => ?DESC("event_id")})}, - {"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})}, - {"username", sc(binary(), #{desc => ?DESC("event_username")})}, - {"payload", sc(binary(), #{desc => ?DESC("event_payload")})}, - {"peerhost", sc(binary(), #{desc => ?DESC("event_peerhost")})}, - {"topic", sc(binary(), #{desc => ?DESC("event_topic")})}, - {"publish_received_at", - sc(integer(), #{ - desc => ?DESC("event_publish_received_at") - })} - ] ++ [qos()]; + {"event_type", event_type_sc(message_publish)}, + {"id", sc(binary(), #{desc => ?DESC("event_id")})} + | msg_event_common_fields() + ]; fields("ctx_sub") -> [ - {"event_type", - sc(session_subscribed, #{desc => ?DESC("event_event_type"), required => true})}, - {"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})}, - {"username", sc(binary(), #{desc => ?DESC("event_username")})}, - {"payload", sc(binary(), #{desc => ?DESC("event_payload")})}, - {"peerhost", sc(binary(), #{desc => ?DESC("event_peerhost")})}, - {"topic", sc(binary(), #{desc => ?DESC("event_topic")})}, - {"publish_received_at", - sc(integer(), #{ - desc => ?DESC("event_publish_received_at") - })} - ] ++ [qos()]; + {"event_type", event_type_sc(session_subscribed)} + | msg_event_common_fields() + ]; fields("ctx_unsub") -> [ - {"event_type", - sc(session_unsubscribed, #{desc => ?DESC("event_event_type"), required => true})} - ] ++ - proplists:delete("event_type", fields("ctx_sub")); + {"event_type", event_type_sc(session_unsubscribed)} + | proplists:delete("event_type", fields("ctx_sub")) + ]; fields("ctx_delivered") -> [ - {"event_type", - sc(message_delivered, #{desc => ?DESC("event_event_type"), required => true})}, + {"event_type", event_type_sc(message_delivered)}, {"id", sc(binary(), #{desc => ?DESC("event_id")})}, {"from_clientid", sc(binary(), #{desc => ?DESC("event_from_clientid")})}, - {"from_username", sc(binary(), #{desc => ?DESC("event_from_username")})}, - {"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})}, - {"username", sc(binary(), #{desc => ?DESC("event_username")})}, - {"payload", sc(binary(), #{desc => ?DESC("event_payload")})}, - {"peerhost", sc(binary(), #{desc => ?DESC("event_peerhost")})}, - {"topic", sc(binary(), #{desc => ?DESC("event_topic")})}, - {"publish_received_at", - sc(integer(), #{ - desc => ?DESC("event_publish_received_at") - })} - ] ++ [qos()]; + {"from_username", sc(binary(), #{desc => ?DESC("event_from_username")})} + | msg_event_common_fields() + ]; fields("ctx_acked") -> - [{"event_type", sc(message_acked, #{desc => ?DESC("event_event_type"), required => true})}] ++ - proplists:delete("event_type", fields("ctx_delivered")); + [ + {"event_type", event_type_sc(message_acked)} + | proplists:delete("event_type", fields("ctx_delivered")) + ]; fields("ctx_dropped") -> [ - {"event_type", sc(message_dropped, #{desc => ?DESC("event_event_type"), required => true})}, + {"event_type", event_type_sc(message_dropped)}, {"id", sc(binary(), #{desc => ?DESC("event_id")})}, - {"reason", sc(binary(), #{desc => ?DESC("event_ctx_dropped")})}, - {"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})}, - {"username", sc(binary(), #{desc => ?DESC("event_username")})}, - {"payload", sc(binary(), #{desc => ?DESC("event_payload")})}, - {"peerhost", sc(binary(), #{desc => ?DESC("event_peerhost")})}, - {"topic", sc(binary(), #{desc => ?DESC("event_topic")})}, - {"publish_received_at", - sc(integer(), #{ - desc => ?DESC("event_publish_received_at") - })} - ] ++ [qos()]; + {"reason", sc(binary(), #{desc => ?DESC("event_ctx_dropped")})} + | msg_event_common_fields() + ]; fields("ctx_connected") -> [ - {"event_type", - sc(client_connected, #{desc => ?DESC("event_event_type"), required => true})}, + {"event_type", event_type_sc(client_connected)}, {"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})}, {"username", sc(binary(), #{desc => ?DESC("event_username")})}, {"mountpoint", sc(binary(), #{desc => ?DESC("event_mountpoint")})}, @@ -239,8 +202,7 @@ fields("ctx_connected") -> ]; fields("ctx_disconnected") -> [ - {"event_type", - sc(client_disconnected, #{desc => ?DESC("event_event_type"), required => true})}, + {"event_type", event_type_sc(client_disconnected)}, {"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})}, {"username", sc(binary(), #{desc => ?DESC("event_username")})}, {"reason", sc(binary(), #{desc => ?DESC("event_ctx_disconnected_reason")})}, @@ -253,7 +215,7 @@ fields("ctx_disconnected") -> ]; fields("ctx_connack") -> [ - {"event_type", sc(client_connack, #{desc => ?DESC("event_event_type"), required => true})}, + {"event_type", event_type_sc(client_connack)}, {"reason_code", sc(binary(), #{desc => ?DESC("event_ctx_connack_reason_code")})}, {"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})}, {"clean_start", sc(boolean(), #{desc => ?DESC("event_clean_start"), default => true})}, @@ -271,8 +233,7 @@ fields("ctx_connack") -> ]; fields("ctx_check_authz_complete") -> [ - {"event_type", - sc(client_check_authz_complete, #{desc => ?DESC("event_event_type"), required => true})}, + {"event_type", event_type_sc(client_check_authz_complete)}, {"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})}, {"username", sc(binary(), #{desc => ?DESC("event_username")})}, {"peerhost", sc(binary(), #{desc => ?DESC("event_peerhost")})}, @@ -283,19 +244,16 @@ fields("ctx_check_authz_complete") -> ]; fields("ctx_bridge_mqtt") -> [ - {"event_type", - sc('$bridges/mqtt:*', #{desc => ?DESC("event_event_type"), required => true})}, + {"event_type", event_type_sc('$bridges/mqtt:*')}, {"id", sc(binary(), #{desc => ?DESC("event_id")})}, {"payload", sc(binary(), #{desc => ?DESC("event_payload")})}, {"topic", sc(binary(), #{desc => ?DESC("event_topic")})}, {"server", sc(binary(), #{desc => ?DESC("event_server")})}, {"dup", sc(binary(), #{desc => ?DESC("event_dup")})}, {"retain", sc(binary(), #{desc => ?DESC("event_retain")})}, - {"message_received_at", - sc(integer(), #{ - desc => ?DESC("event_publish_received_at") - })} - ] ++ [qos()]. + {"message_received_at", publish_received_at_sc()}, + qos() + ]. qos() -> {"qos", sc(emqx_schema:qos(), #{desc => ?DESC("event_qos")})}. @@ -312,4 +270,22 @@ rule_id() -> )}. sc(Type, Meta) -> hoconsc:mk(Type, Meta). + ref(Field) -> hoconsc:ref(?MODULE, Field). + +event_type_sc(Event) -> + sc(Event, #{desc => ?DESC("event_event_type"), required => true}). + +publish_received_at_sc() -> + sc(integer(), #{desc => ?DESC("event_publish_received_at")}). + +msg_event_common_fields() -> + [ + {"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})}, + {"username", sc(binary(), #{desc => ?DESC("event_username")})}, + {"payload", sc(binary(), #{desc => ?DESC("event_payload")})}, + {"peerhost", sc(binary(), #{desc => ?DESC("event_peerhost")})}, + {"topic", sc(binary(), #{desc => ?DESC("event_topic")})}, + {"publish_received_at", publish_received_at_sc()}, + qos() + ]. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index 6419e4184..8608cd67a 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -2,7 +2,7 @@ {application, emqx_rule_engine, [ {description, "EMQX Rule Engine"}, % strict semver, bump manually! - {vsn, "5.0.4"}, + {vsn, "5.0.5"}, {modules, []}, {registered, [emqx_rule_engine_sup, emqx_rule_engine]}, {applications, [kernel, stdlib, rulesql, getopt]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index b80b9777a..b8e227a90 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -28,6 +28,7 @@ unload/1, event_names/0, event_name/1, + event_topics_enum/0, event_topic/1, eventmsg_publish/1 ]). @@ -78,6 +79,22 @@ event_names() -> 'delivery.dropped' ]. +%% for documentation purposes +event_topics_enum() -> + [ + '$events/client_connected', + '$events/client_disconnected', + '$events/client_connack', + '$events/client_check_authz_complete', + '$events/session_subscribed', + '$events/session_unsubscribed', + '$events/message_delivered', + '$events/message_acked', + '$events/message_dropped', + '$events/delivery_dropped' + % '$events/message_publish' % not possible to use in SELECT FROM + ]. + reload() -> lists:foreach( fun(Rule) -> @@ -88,21 +105,22 @@ reload() -> load(Topic) -> HookPoint = event_name(Topic), + HookFun = hook_fun_name(HookPoint), emqx_hooks:put( - HookPoint, {?MODULE, hook_fun(HookPoint), [#{event_topic => Topic}]}, ?HP_RULE_ENGINE + HookPoint, {?MODULE, HookFun, [#{event_topic => Topic}]}, ?HP_RULE_ENGINE ). unload() -> lists:foreach( fun(HookPoint) -> - emqx_hooks:del(HookPoint, {?MODULE, hook_fun(HookPoint)}) + emqx_hooks:del(HookPoint, {?MODULE, hook_fun_name(HookPoint)}) end, event_names() ). unload(Topic) -> HookPoint = event_name(Topic), - emqx_hooks:del(HookPoint, {?MODULE, hook_fun(HookPoint)}). + emqx_hooks:del(HookPoint, {?MODULE, hook_fun_name(HookPoint)}). %%-------------------------------------------------------------------- %% Callbacks @@ -987,15 +1005,25 @@ columns_example_props_specific(unsub_props) -> %% Helper functions %%-------------------------------------------------------------------- -hook_fun(<<"$bridges/", _/binary>>) -> - on_bridge_message_received; -hook_fun(Event) -> - case string:split(atom_to_list(Event), ".") of - [Prefix, Name] -> - list_to_atom(lists:append(["on_", Prefix, "_", Name])); - [_] -> - error(invalid_event, Event) - end. +hook_fun_name(HookPoint) -> + HookFun = hook_fun(HookPoint), + {name, HookFunName} = erlang:fun_info(HookFun, name), + HookFunName. + +%% return static function references to help static code checks +hook_fun(<<"$bridges/", _/binary>>) -> fun ?MODULE:on_bridge_message_received/2; +hook_fun('client.connected') -> fun ?MODULE:on_client_connected/3; +hook_fun('client.disconnected') -> fun ?MODULE:on_client_disconnected/4; +hook_fun('client.connack') -> fun ?MODULE:on_client_connack/4; +hook_fun('client.check_authz_complete') -> fun ?MODULE:on_client_check_authz_complete/6; +hook_fun('session.subscribed') -> fun ?MODULE:on_session_subscribed/4; +hook_fun('session.unsubscribed') -> fun ?MODULE:on_session_unsubscribed/4; +hook_fun('message.delivered') -> fun ?MODULE:on_message_delivered/3; +hook_fun('message.acked') -> fun ?MODULE:on_message_acked/3; +hook_fun('message.dropped') -> fun ?MODULE:on_message_dropped/4; +hook_fun('delivery.dropped') -> fun ?MODULE:on_delivery_dropped/4; +hook_fun('message.publish') -> fun ?MODULE:on_message_publish/2; +hook_fun(Event) -> error({invalid_event, Event}). reason(Reason) when is_atom(Reason) -> Reason; reason({shutdown, Reason}) when is_atom(Reason) -> Reason; @@ -1006,19 +1034,20 @@ ntoa(undefined) -> undefined; ntoa({IpAddr, Port}) -> iolist_to_binary([inet:ntoa(IpAddr), ":", integer_to_list(Port)]); ntoa(IpAddr) -> iolist_to_binary(inet:ntoa(IpAddr)). -event_name(<<"$events/client_connected", _/binary>>) -> 'client.connected'; -event_name(<<"$events/client_disconnected", _/binary>>) -> 'client.disconnected'; -event_name(<<"$events/client_connack", _/binary>>) -> 'client.connack'; -event_name(<<"$events/client_check_authz_complete", _/binary>>) -> 'client.check_authz_complete'; -event_name(<<"$events/session_subscribed", _/binary>>) -> 'session.subscribed'; -event_name(<<"$events/session_unsubscribed", _/binary>>) -> 'session.unsubscribed'; -event_name(<<"$events/message_delivered", _/binary>>) -> 'message.delivered'; -event_name(<<"$events/message_acked", _/binary>>) -> 'message.acked'; -event_name(<<"$events/message_dropped", _/binary>>) -> 'message.dropped'; -event_name(<<"$events/delivery_dropped", _/binary>>) -> 'delivery.dropped'; -event_name(<<"$bridges/", _/binary>> = Topic) -> Topic; +event_name(<<"$bridges/", _/binary>> = Bridge) -> Bridge; +event_name(<<"$events/client_connected">>) -> 'client.connected'; +event_name(<<"$events/client_disconnected">>) -> 'client.disconnected'; +event_name(<<"$events/client_connack">>) -> 'client.connack'; +event_name(<<"$events/client_check_authz_complete">>) -> 'client.check_authz_complete'; +event_name(<<"$events/session_subscribed">>) -> 'session.subscribed'; +event_name(<<"$events/session_unsubscribed">>) -> 'session.unsubscribed'; +event_name(<<"$events/message_delivered">>) -> 'message.delivered'; +event_name(<<"$events/message_acked">>) -> 'message.acked'; +event_name(<<"$events/message_dropped">>) -> 'message.dropped'; +event_name(<<"$events/delivery_dropped">>) -> 'delivery.dropped'; event_name(_) -> 'message.publish'. +event_topic(<<"$bridges/", _/binary>> = Bridge) -> Bridge; event_topic('client.connected') -> <<"$events/client_connected">>; event_topic('client.disconnected') -> <<"$events/client_disconnected">>; event_topic('client.connack') -> <<"$events/client_connack">>; @@ -1029,8 +1058,7 @@ event_topic('message.delivered') -> <<"$events/message_delivered">>; event_topic('message.acked') -> <<"$events/message_acked">>; event_topic('message.dropped') -> <<"$events/message_dropped">>; event_topic('delivery.dropped') -> <<"$events/delivery_dropped">>; -event_topic('message.publish') -> <<"$events/message_publish">>; -event_topic(<<"$bridges/", _/binary>> = Topic) -> Topic. +event_topic('message.publish') -> <<"$events/message_publish">>. printable_maps(undefined) -> #{}; diff --git a/apps/emqx_rule_engine/test/emqx_rule_events_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_events_SUITE.erl index c9774b93d..4a1c9d6f5 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_events_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_events_SUITE.erl @@ -8,19 +8,18 @@ all() -> emqx_common_test_helpers:all(?MODULE). t_mod_hook_fun(_) -> - Funcs = emqx_rule_events:module_info(exports), - [ - ?assert(lists:keymember(emqx_rule_events:hook_fun(Event), 1, Funcs)) - || Event <- [ - 'client.connected', - 'client.disconnected', - 'session.subscribed', - 'session.unsubscribed', - 'message.acked', - 'message.dropped', - 'message.delivered' - ] - ]. + Events = emqx_rule_events:event_names(), + lists:foreach( + fun(E) -> + ?assert(is_function(emqx_rule_events:hook_fun(E))) + end, + Events + ), + ?assertEqual( + fun emqx_rule_events:on_bridge_message_received/2, + emqx_rule_events:hook_fun(<<"$bridges/foo">>) + ), + ?assertError({invalid_event, foo}, emqx_rule_events:hook_fun(foo)). t_printable_maps(_) -> Headers = #{ @@ -42,3 +41,24 @@ t_printable_maps(_) -> ?assertNot(maps:is_key(redispatch_to, Converted)), ?assertNot(maps:is_key(shared_dispatch_ack, Converted)), ok. + +t_event_name_topic_conversion(_) -> + Events = emqx_rule_events:event_names() -- ['message.publish'], + Topics = [atom_to_binary(A) || A <- emqx_rule_events:event_topics_enum()], + Zip = lists:zip(Events, Topics), + lists:foreach( + fun({Event, Topic}) -> + ?assertEqual(Event, emqx_rule_events:event_name(Topic)), + ?assertEqual(Topic, emqx_rule_events:event_topic(Event)) + end, + Zip + ). + +t_special_events_name_topic_conversion(_) -> + Bridge = <<"$bridges/foo:bar">>, + AdHoc = <<"foo/bar">>, + NonExisting = <<"$events/message_publish">>, + ?assertEqual(Bridge, emqx_rule_events:event_name(Bridge)), + ?assertEqual('message.publish', emqx_rule_events:event_name(AdHoc)), + ?assertEqual('message.publish', emqx_rule_events:event_name(NonExisting)), + ?assertEqual(NonExisting, emqx_rule_events:event_topic('message.publish')).