From 3ca3470844f7b3bc9ae2af356208db07bca6ff98 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Thu, 10 Mar 2022 22:15:20 +0800 Subject: [PATCH] fix(rule_events): add descs for test columns --- .../test/emqx_connector_api_SUITE.erl | 2 +- .../emqx_rule_engine/src/emqx_rule_events.erl | 71 +++++++++---------- .../src/emqx_rule_sqltester.erl | 18 +---- 3 files changed, 39 insertions(+), 52 deletions(-) diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 7d2b55275..6490e21fe 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -537,7 +537,7 @@ t_ingress_mqtt_bridge_with_rules(_) -> } = jsx:decode(Rule1), %% we also check if the outputs of the rule is triggered ?assertMatch(#{inspect := #{ - event := '$bridges/mqtt', + event := <<"$bridges/mqtt", _/binary>>, id := MsgId, payload := Payload, topic := RemoteTopic, diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index c21412970..c7da1a17f 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -72,12 +72,9 @@ reload() -> ok = emqx_rule_engine:load_hooks_for_rule(Rule) end, emqx_rule_engine:get_rules()). -load(<<"$bridges/", _BridgeId/binary>> = BridgeTopic) -> - emqx_hooks:put(BridgeTopic, {?MODULE, on_bridge_message_received, - [#{bridge_event_name => BridgeTopic}]}); load(Topic) -> HookPoint = event_name(Topic), - emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint), [#{}]}). + emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint), [#{event_topic => Topic}]}). unload() -> lists:foreach(fun(HookPoint) -> @@ -102,7 +99,7 @@ on_message_publish(Message = #message{topic = Topic}, _Env) -> end, {ok, Message}. -on_bridge_message_received(Message, Env = #{bridge_event_name := BridgeTopic}) -> +on_bridge_message_received(Message, Env = #{event_topic := BridgeTopic}) -> apply_event(BridgeTopic, fun() -> with_basic_columns(BridgeTopic, Message) end, Env). on_client_connected(ClientInfo, ConnInfo, Env) -> @@ -451,7 +448,7 @@ event_info_session_unsubscribed() -> ). event_info_bridge_mqtt()-> event_info_common( - <<"$bridges/mqtt">>, + <<"$bridges/mqtt:my_mqtt_bridge">>, {<<"MQTT bridge message">>, <<"MQTT 桥接消息"/utf8>>}, {<<"received a message from MQTT bridge">>, <<"收到来自 MQTT 桥接的消息"/utf8>>}, <<"SELECT * FROM \"$bridges/mqtt:my_mqtt_bridge\" WHERE topic =~ 't/#'">> @@ -467,51 +464,51 @@ event_info_common(Event, {TitleEN, TitleZH}, {DescrEN, DescrZH}, SqlExam) -> }. test_columns('message.dropped') -> - [ {<<"reason">>, <<"no_subscribers">>} + [ {<<"reason">>, [<<"no_subscribers">>, <<"the reason of dropping">>]} ] ++ test_columns('message.publish'); test_columns('message.publish') -> - [ {<<"clientid">>, <<"c_emqx">>} - , {<<"username">>, <<"u_emqx">>} - , {<<"topic">>, <<"t/a">>} - , {<<"qos">>, 1} - , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} + [ {<<"clientid">>, [<<"c_emqx">>, <<"the clientid of the sender">>]} + , {<<"username">>, [<<"u_emqx">>, <<"the username of the sender">>]} + , {<<"topic">>, [<<"t/a">>, <<"the topic of the MQTT message">>]} + , {<<"qos">>, [1, <<"the QoS of the MQTT message">>]} + , {<<"payload">>, [<<"{\"msg\": \"hello\"}">>, <<"the payload of the MQTT message">>]} ]; test_columns('delivery.dropped') -> - [ {<<"reason">>, <<"queue_full">>} + [ {<<"reason">>, [<<"queue_full">>, <<"the reason of dropping">>]} ] ++ test_columns('message.delivered'); test_columns('message.acked') -> test_columns('message.delivered'); test_columns('message.delivered') -> - [ {<<"from_clientid">>, <<"c_emqx_1">>} - , {<<"from_username">>, <<"u_emqx_1">>} - , {<<"clientid">>, <<"c_emqx_2">>} - , {<<"username">>, <<"u_emqx_2">>} - , {<<"topic">>, <<"t/a">>} - , {<<"qos">>, 1} - , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} + [ {<<"from_clientid">>, [<<"c_emqx_1">>, <<"the clientid of the sender">>]} + , {<<"from_username">>, [<<"u_emqx_1">>, <<"the username of the sender">>]} + , {<<"clientid">>, [<<"c_emqx_2">>, <<"the clientid of the receiver">>]} + , {<<"username">>, [<<"u_emqx_2">>, <<"the username of the receiver">>]} + , {<<"topic">>, [<<"t/a">>, <<"the topic of the MQTT message">>]} + , {<<"qos">>, [1, <<"the QoS of the MQTT message">>]} + , {<<"payload">>, [<<"{\"msg\": \"hello\"}">>, <<"the payload of the MQTT message">>]} ]; test_columns('client.connected') -> - [ {<<"clientid">>, <<"c_emqx">>} - , {<<"username">>, <<"u_emqx">>} - , {<<"peername">>, <<"127.0.0.1:52918">>} + [ {<<"clientid">>, [<<"c_emqx">>, <<"the clientid if the client">>]} + , {<<"username">>, [<<"u_emqx">>, <<"the username if the client">>]} + , {<<"peername">>, [<<"127.0.0.1:52918">>, <<"the IP address and port of the client">>]} ]; test_columns('client.disconnected') -> - [ {<<"clientid">>, <<"c_emqx">>} - , {<<"username">>, <<"u_emqx">>} - , {<<"reason">>, <<"normal">>} + [ {<<"clientid">>, [<<"c_emqx">>, <<"the clientid if the client">>]} + , {<<"username">>, [<<"u_emqx">>, <<"the username if the client">>]} + , {<<"reason">>, [<<"normal">>, <<"the reason for shutdown">>]} ]; test_columns('session.unsubscribed') -> test_columns('session.subscribed'); test_columns('session.subscribed') -> - [ {<<"clientid">>, <<"c_emqx">>} - , {<<"username">>, <<"u_emqx">>} - , {<<"topic">>, <<"t/a">>} - , {<<"qos">>, 1} + [ {<<"clientid">>, [<<"c_emqx">>, <<"the clientid if the client">>]} + , {<<"username">>, [<<"u_emqx">>, <<"the username if the client">>]} + , {<<"topic">>, [<<"t/a">>, <<"the topic of the MQTT message">>]} + , {<<"qos">>, [1, <<"the QoS of the MQTT message">>]} ]; -test_columns(<<"$bridges/mqtt">>) -> - [ {<<"topic">>, <<"t/a">>} - , {<<"qos">>, 1} - , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} +test_columns(<<"$bridges/mqtt", _/binary>>) -> + [ {<<"topic">>, [<<"t/a">>, <<"the topic of the MQTT message">>]} + , {<<"qos">>, [1, <<"the QoS of the MQTT message">>]} + , {<<"payload">>, [<<"{\"msg\": \"hello\"}">>, <<"the payload of the MQTT message">>]} ]. columns_with_exam('message.publish') -> @@ -606,8 +603,8 @@ columns_with_exam('session.subscribed') -> columns_with_exam('session.unsubscribed') -> [ columns_example_props(unsub_props) ] ++ columns_message_sub_unsub('session.unsubscribed'); -columns_with_exam(<<"$bridges/mqtt">>) -> - [ {<<"event">>, <<"$bridges/mqtt">>} +columns_with_exam(<<"$bridges/mqtt", _/binary>> = EventName) -> + [ {<<"event">>, EventName} , {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())} , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} , {<<"peerhost">>, <<"192.168.0.10">>} @@ -687,6 +684,8 @@ columns_example_props_specific(unsub_props) -> %% Helper functions %%-------------------------------------------------------------------- +hook_fun(<<"$bridges/", _/binary>>) -> + on_bridge_message_received; hook_fun(Event) -> case string:split(atom_to_list(Event), ".") of [Prefix, Name] -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index 4aa517fef..8b50a9db2 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -69,6 +69,7 @@ get_selected_data(Selected, _Envs, _Args) -> Selected. is_publish_topic(<<"$events/", _/binary>>) -> false; +is_publish_topic(<<"$bridges/", _/binary>>) -> false; is_publish_topic(_Topic) -> true. flatten([]) -> []; @@ -83,21 +84,8 @@ echo_action(Data, Envs) -> fill_default_values(Event, Context) -> maps:merge(envs_examp(Event), Context). -envs_examp(<<"$events/", _/binary>> = EVENT_TOPIC) -> +envs_examp(EVENT_TOPIC) -> EventName = emqx_rule_events:event_name(EVENT_TOPIC), emqx_rule_maps:atom_key_map( maps:from_list( - emqx_rule_events:columns_with_exam(EventName))); -envs_examp(_) -> - #{id => emqx_guid:to_hexstr(emqx_guid:gen()), - clientid => <<"c_emqx">>, - username => <<"u_emqx">>, - payload => <<"{\"id\": 1, \"name\": \"ha\"}">>, - peerhost => <<"127.0.0.1">>, - topic => <<"t/a">>, - qos => 1, - flags => #{sys => true, event => true}, - publish_received_at => emqx_plugin_libs_rule:now_ms(), - timestamp => emqx_plugin_libs_rule:now_ms(), - node => node() - }. + emqx_rule_events:columns_with_exam(EventName))).