diff --git a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl index e9adbbdf6..3d67523e9 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl @@ -172,44 +172,58 @@ fields("node_metrics") -> [{"node", sc(binary(), #{desc => ?DESC("node_node"), example => "emqx@127.0.0.1"})}] ++ fields("metrics"); fields("ctx_pub") -> + Event = 'message.publish', [ - {"event_type", event_type_sc(message_publish)}, + {"event_type", event_type_sc(Event)}, + {"event", event_sc(Event)}, {"id", sc(binary(), #{desc => ?DESC("event_id")})} | msg_event_common_fields() ]; fields("ctx_sub") -> + Event = 'session.subscribed', [ - {"event_type", event_type_sc(session_subscribed)} + {"event_type", event_type_sc(Event)}, + {"event", event_sc(Event)} | msg_event_common_fields() ]; fields("ctx_unsub") -> + Event = 'session.unsubscribed', [ - {"event_type", event_type_sc(session_unsubscribed)} - | proplists:delete("event_type", fields("ctx_sub")) + {"event_type", event_type_sc(Event)}, + {"event", event_sc(Event)} + | without(["event_type", "event_topic", "event"], fields("ctx_sub")) ]; fields("ctx_delivered") -> + Event = 'message.delivered', [ - {"event_type", event_type_sc(message_delivered)}, + {"event_type", event_type_sc(Event)}, + {"event", event_sc(Event)}, {"id", sc(binary(), #{desc => ?DESC("event_id")})}, {"from_clientid", sc(binary(), #{desc => ?DESC("event_from_clientid")})}, {"from_username", sc(binary(), #{desc => ?DESC("event_from_username")})} | msg_event_common_fields() ]; fields("ctx_acked") -> + Event = 'message.acked', [ - {"event_type", event_type_sc(message_acked)} - | proplists:delete("event_type", fields("ctx_delivered")) + {"event_type", event_type_sc(Event)}, + {"event", event_sc(Event)} + | without(["event_type", "event_topic", "event"], fields("ctx_delivered")) ]; fields("ctx_dropped") -> + Event = 'message.dropped', [ - {"event_type", event_type_sc(message_dropped)}, + {"event_type", event_type_sc(Event)}, + {"event", event_sc(Event)}, {"id", sc(binary(), #{desc => ?DESC("event_id")})}, {"reason", sc(binary(), #{desc => ?DESC("event_ctx_dropped")})} | msg_event_common_fields() ]; fields("ctx_connected") -> + Event = 'client.connected', [ - {"event_type", event_type_sc(client_connected)}, + {"event_type", event_type_sc(Event)}, + {"event", event_sc(Event)}, {"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})}, {"username", sc(binary(), #{desc => ?DESC("event_username")})}, {"mountpoint", sc(binary(), #{desc => ?DESC("event_mountpoint")})}, @@ -227,8 +241,10 @@ fields("ctx_connected") -> })} ]; fields("ctx_disconnected") -> + Event = 'client.disconnected', [ - {"event_type", event_type_sc(client_disconnected)}, + {"event_type", event_type_sc(Event)}, + {"event", event_sc(Event)}, {"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})}, {"username", sc(binary(), #{desc => ?DESC("event_username")})}, {"reason", sc(binary(), #{desc => ?DESC("event_ctx_disconnected_reason")})}, @@ -240,8 +256,10 @@ fields("ctx_disconnected") -> })} ]; fields("ctx_connack") -> + Event = 'client.connack', [ - {"event_type", event_type_sc(client_connack)}, + {"event_type", event_type_sc(Event)}, + {"event", event_sc(Event)}, {"reason_code", sc(binary(), #{desc => ?DESC("event_ctx_connack_reason_code")})}, {"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})}, {"clean_start", sc(boolean(), #{desc => ?DESC("event_clean_start"), default => true})}, @@ -258,8 +276,10 @@ fields("ctx_connack") -> })} ]; fields("ctx_check_authz_complete") -> + Event = 'client.check_authz_complete', [ - {"event_type", event_type_sc(client_check_authz_complete)}, + {"event_type", event_type_sc(Event)}, + {"event", event_sc(Event)}, {"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})}, {"username", sc(binary(), #{desc => ?DESC("event_username")})}, {"peerhost", sc(binary(), #{desc => ?DESC("event_peerhost")})}, @@ -269,8 +289,11 @@ fields("ctx_check_authz_complete") -> {"result", sc(binary(), #{desc => ?DESC("event_result")})} ]; fields("ctx_bridge_mqtt") -> + Event = '$bridges/mqtt:*', + EventBin = atom_to_binary(Event), [ - {"event_type", event_type_sc('$bridges/mqtt:*')}, + {"event_type", event_type_sc(Event)}, + {"event", event_sc(EventBin)}, {"id", sc(binary(), #{desc => ?DESC("event_id")})}, {"payload", sc(binary(), #{desc => ?DESC("event_payload")})}, {"topic", sc(binary(), #{desc => ?DESC("event_topic")})}, @@ -281,8 +304,10 @@ fields("ctx_bridge_mqtt") -> qos() ]; fields("ctx_delivery_dropped") -> + Event = 'delivery.dropped', [ - {"event_type", event_type_sc(delivery_dropped)}, + {"event_type", event_type_sc(Event)}, + {"event", event_sc(Event)}, {"id", sc(binary(), #{desc => ?DESC("event_id")})}, {"reason", sc(binary(), #{desc => ?DESC("event_ctx_dropped")})}, {"from_clientid", sc(binary(), #{desc => ?DESC("event_from_clientid")})}, @@ -309,7 +334,21 @@ sc(Type, Meta) -> hoconsc:mk(Type, Meta). ref(Field) -> hoconsc:ref(?MODULE, Field). event_type_sc(Event) -> - sc(Event, #{desc => ?DESC("event_event_type"), required => true}). + EventType = event_to_event_type(Event), + sc(EventType, #{desc => ?DESC("event_event_type"), required => true}). + +-spec event_to_event_type(atom()) -> atom(). +event_to_event_type(Event) -> + binary_to_atom(binary:replace(atom_to_binary(Event), <<".">>, <<"_">>)). + +event_sc(Event) when is_binary(Event) -> + %% only exception is `$bridges/...'. + sc(binary(), #{default => Event, importance => ?IMPORTANCE_HIDDEN}); +event_sc(Event) -> + sc(Event, #{default => Event, importance => ?IMPORTANCE_HIDDEN}). + +without(FieldNames, Fields) -> + lists:foldl(fun proplists:delete/2, Fields, FieldNames). publish_received_at_sc() -> sc(integer(), #{desc => ?DESC("event_publish_received_at")}). diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index 867fffcc1..1db533efc 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -27,7 +27,7 @@ test(#{sql := Sql, context := Context}) -> case emqx_rule_sqlparser:parse(Sql) of {ok, Select} -> - InTopic = maps:get(topic, Context, <<>>), + InTopic = get_in_topic(Context), EventTopics = emqx_rule_sqlparser:select_from(Select), case lists:all(fun is_publish_topic/1, EventTopics) of true -> @@ -37,8 +37,13 @@ test(#{sql := Sql, context := Context}) -> false -> {error, nomatch} end; false -> - %% the rule is for both publish and events, test it directly - test_rule(Sql, Select, Context, EventTopics) + case lists:member(InTopic, EventTopics) of + true -> + %% the rule is for both publish and events, test it directly + test_rule(Sql, Select, Context, EventTopics); + false -> + {error, nomatch} + end end; {error, Reason} -> ?SLOG(debug, #{ @@ -92,15 +97,12 @@ flatten([D | L]) when is_list(D) -> [D0 || {ok, D0} <- D] ++ flatten(L). fill_default_values(Event, Context) -> - maps:merge(envs_examp(Event), Context). + maps:merge(envs_examp(Event, Context), Context). -envs_examp(EventTopic) -> - EventName = emqx_rule_events:event_name(EventTopic), - emqx_rule_maps:atom_key_map( - maps:from_list( - emqx_rule_events:columns_with_exam(EventName) - ) - ). +envs_examp(EventTopic, Context) -> + EventName = maps:get(event, Context, emqx_rule_events:event_name(EventTopic)), + Env = maps:from_list(emqx_rule_events:columns_with_exam(EventName)), + emqx_rule_maps:atom_key_map(Env). is_test_runtime_env_atom() -> 'emqx_rule_sqltester:is_test_runtime_env'. @@ -118,3 +120,26 @@ is_test_runtime_env() -> true -> true; _ -> false end. + +%% Most events have the original `topic' input, but their own topic (i.e.: `$events/...') +%% is different from `topic'. +get_in_topic(Context) -> + case maps:find(event_topic, Context) of + {ok, EventTopic} -> + EventTopic; + error -> + case maps:find(event, Context) of + {ok, Event} -> + maybe_infer_in_topic(Context, Event); + error -> + maps:get(topic, Context, <<>>) + end + end. + +maybe_infer_in_topic(Context, 'message.publish') -> + %% This is special because the common use in the frontend is to select this event, but + %% test the input `topic' field against MQTT topic filters in the `FROM' clause rather + %% than the corresponding `$events/message_publish'. + maps:get(topic, Context, <<>>); +maybe_infer_in_topic(_Context, Event) -> + emqx_rule_events:event_topic(Event). diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index f3df46b80..9dde51e74 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -1990,7 +1990,10 @@ t_sqlparse_event_1(_Config) -> emqx_rule_sqltester:test( #{ sql => Sql, - context => #{topic => <<"t/tt">>} + context => #{ + topic => <<"t/tt">>, + event => 'session.subscribed' + } } ) ). @@ -2004,7 +2007,10 @@ t_sqlparse_event_2(_Config) -> emqx_rule_sqltester:test( #{ sql => Sql, - context => #{clientid => <<"abc">>} + context => #{ + clientid => <<"abc">>, + event => 'client.connected' + } } ) ). diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_2_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_2_SUITE.erl new file mode 100644 index 000000000..ea4c882fa --- /dev/null +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_2_SUITE.erl @@ -0,0 +1,385 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_rule_engine_api_2_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + Apps = emqx_cth_suite:start( + app_specs(), + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + emqx_common_test_http:create_default_app(), + [{apps, Apps} | Config]. + +end_per_suite(Config) -> + Apps = ?config(apps, Config), + ok = emqx_cth_suite:stop(Apps), + ok. + +app_specs() -> + [ + emqx_conf, + emqx_rule_engine, + emqx_management, + {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} + ]. + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +maybe_json_decode(X) -> + case emqx_utils_json:safe_decode(X, [return_maps]) of + {ok, Decoded} -> Decoded; + {error, _} -> X + end. + +request(Method, Path, Params) -> + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Opts = #{return_all => true}, + case emqx_mgmt_api_test_util:request_api(Method, Path, "", AuthHeader, Params, Opts) of + {ok, {Status, Headers, Body0}} -> + Body = maybe_json_decode(Body0), + {ok, {Status, Headers, Body}}; + {error, {Status, Headers, Body0}} -> + Body = + case emqx_utils_json:safe_decode(Body0, [return_maps]) of + {ok, Decoded0 = #{<<"message">> := Msg0}} -> + Msg = maybe_json_decode(Msg0), + Decoded0#{<<"message">> := Msg}; + {ok, Decoded0} -> + Decoded0; + {error, _} -> + Body0 + end, + {error, {Status, Headers, Body}}; + Error -> + Error + end. + +sql_test_api(Params) -> + Method = post, + Path = emqx_mgmt_api_test_util:api_path(["rule_test"]), + ct:pal("sql test (http):\n ~p", [Params]), + Res = request(Method, Path, Params), + ct:pal("sql test (http) result:\n ~p", [Res]), + Res. + +%%------------------------------------------------------------------------------ +%% Test cases +%%------------------------------------------------------------------------------ + +t_rule_test_smoke(_Config) -> + %% Example inputs recorded from frontend on 2023-12-04 + Publish = [ + #{ + expected => #{code => 200}, + input => + #{ + <<"context">> => + #{ + <<"clientid">> => <<"c_emqx">>, + <<"event_type">> => <<"message_publish">>, + <<"payload">> => <<"{\"msg\": \"hello\"}">>, + <<"qos">> => 1, + <<"topic">> => <<"t/a">>, + <<"username">> => <<"u_emqx">> + }, + <<"sql">> => <<"SELECT\n *\nFROM\n \"t/#\"">> + } + }, + #{ + expected => #{code => 412}, + hint => <<"wrong topic">>, + input => + #{ + <<"context">> => + #{ + <<"clientid">> => <<"c_emqx">>, + <<"event_type">> => <<"message_publish">>, + <<"payload">> => <<"{\"msg\": \"hello\"}">>, + <<"qos">> => 1, + <<"topic">> => <<"a">>, + <<"username">> => <<"u_emqx">> + }, + <<"sql">> => <<"SELECT\n *\nFROM\n \"t/#\"">> + } + }, + #{ + expected => #{code => 412}, + hint => << + "Currently, the frontend doesn't try to match against " + "$events/message_published, but it may start sending " + "the event topic in the future." + >>, + input => + #{ + <<"context">> => + #{ + <<"clientid">> => <<"c_emqx">>, + <<"event_type">> => <<"message_publish">>, + <<"payload">> => <<"{\"msg\": \"hello\"}">>, + <<"qos">> => 1, + <<"topic">> => <<"t/a">>, + <<"username">> => <<"u_emqx">> + }, + <<"sql">> => <<"SELECT\n *\nFROM\n \"$events/message_published\"">> + } + } + ], + %% Default input SQL doesn't match any event topic + DefaultNoMatch = [ + #{ + expected => #{code => 412}, + input => + #{ + <<"context">> => + #{ + <<"clientid">> => <<"c_emqx_2">>, + <<"event_type">> => <<"message_delivered">>, + <<"from_clientid">> => <<"c_emqx_1">>, + <<"from_username">> => <<"u_emqx_1">>, + <<"payload">> => <<"{\"msg\": \"hello\"}">>, + <<"qos">> => 1, + <<"topic">> => <<"t/a">>, + <<"username">> => <<"u_emqx_2">> + }, + <<"sql">> => <<"SELECT\n *\nFROM\n \"t/#\"">> + } + }, + #{ + expected => #{code => 412}, + input => + #{ + <<"context">> => + #{ + <<"clientid">> => <<"c_emqx_2">>, + <<"event_type">> => <<"message_acked">>, + <<"from_clientid">> => <<"c_emqx_1">>, + <<"from_username">> => <<"u_emqx_1">>, + <<"payload">> => <<"{\"msg\": \"hello\"}">>, + <<"qos">> => 1, + <<"topic">> => <<"t/a">>, + <<"username">> => <<"u_emqx_2">> + }, + <<"sql">> => <<"SELECT\n *\nFROM\n \"t/#\"">> + } + }, + #{ + expected => #{code => 412}, + input => + #{ + <<"context">> => + #{ + <<"clientid">> => <<"c_emqx">>, + <<"event_type">> => <<"message_dropped">>, + <<"payload">> => <<"{\"msg\": \"hello\"}">>, + <<"qos">> => 1, + <<"reason">> => <<"no_subscribers">>, + <<"topic">> => <<"t/a">>, + <<"username">> => <<"u_emqx">> + }, + <<"sql">> => <<"SELECT\n *\nFROM\n \"t/#\"">> + } + }, + #{ + expected => #{code => 412}, + input => + #{ + <<"context">> => + #{ + <<"clientid">> => <<"c_emqx">>, + <<"event_type">> => <<"client_connected">>, + <<"peername">> => <<"127.0.0.1:52918">>, + <<"username">> => <<"u_emqx">> + }, + <<"sql">> => <<"SELECT\n *\nFROM\n \"t/#\"">> + } + }, + #{ + expected => #{code => 412}, + input => + #{ + <<"context">> => + #{ + <<"clientid">> => <<"c_emqx">>, + <<"event_type">> => <<"client_disconnected">>, + <<"reason">> => <<"normal">>, + <<"username">> => <<"u_emqx">> + }, + <<"sql">> => <<"SELECT\n *\nFROM\n \"t/#\"">> + } + }, + #{ + expected => #{code => 412}, + input => + #{ + <<"context">> => + #{ + <<"clientid">> => <<"c_emqx">>, + <<"event_type">> => <<"client_connack">>, + <<"reason_code">> => <<"sucess">>, + <<"username">> => <<"u_emqx">> + }, + <<"sql">> => <<"SELECT\n *\nFROM\n \"t/#\"">> + } + }, + #{ + expected => #{code => 412}, + input => + #{ + <<"context">> => + #{ + <<"action">> => <<"publish">>, + <<"clientid">> => <<"c_emqx">>, + <<"event_type">> => <<"client_check_authz_complete">>, + <<"result">> => <<"allow">>, + <<"topic">> => <<"t/1">>, + <<"username">> => <<"u_emqx">> + }, + <<"sql">> => <<"SELECT\n *\nFROM\n \"t/#\"">> + } + }, + #{ + expected => #{code => 412}, + input => + #{ + <<"context">> => + #{ + <<"clientid">> => <<"c_emqx">>, + <<"event_type">> => <<"session_subscribed">>, + <<"qos">> => 1, + <<"topic">> => <<"t/a">>, + <<"username">> => <<"u_emqx">> + }, + <<"sql">> => <<"SELECT\n *\nFROM\n \"t/#\"">> + } + }, + #{ + expected => #{code => 412}, + input => + #{ + <<"context">> => + #{ + <<"clientid">> => <<"c_emqx">>, + <<"event_type">> => <<"session_unsubscribed">>, + <<"qos">> => 1, + <<"topic">> => <<"t/a">>, + <<"username">> => <<"u_emqx">> + }, + <<"sql">> => <<"SELECT\n *\nFROM\n \"t/#\"">> + } + }, + #{ + expected => #{code => 412}, + input => + #{ + <<"context">> => + #{ + <<"clientid">> => <<"c_emqx_2">>, + <<"event_type">> => <<"delivery_dropped">>, + <<"from_clientid">> => <<"c_emqx_1">>, + <<"from_username">> => <<"u_emqx_1">>, + <<"payload">> => <<"{\"msg\": \"hello\"}">>, + <<"qos">> => 1, + <<"reason">> => <<"queue_full">>, + <<"topic">> => <<"t/a">>, + <<"username">> => <<"u_emqx_2">> + }, + <<"sql">> => <<"SELECT\n *\nFROM\n \"t/#\"">> + } + } + ], + MultipleFrom = [ + #{ + expected => #{code => 200}, + input => + #{ + <<"context">> => + #{ + <<"clientid">> => <<"c_emqx">>, + <<"event_type">> => <<"session_unsubscribed">>, + <<"qos">> => 1, + <<"topic">> => <<"t/a">>, + <<"username">> => <<"u_emqx">> + }, + <<"sql">> => + <<"SELECT\n *\nFROM\n \"t/#\", \"$events/session_unsubscribed\" ">> + } + }, + #{ + expected => #{code => 200}, + input => + #{ + <<"context">> => + #{ + <<"clientid">> => <<"c_emqx">>, + <<"event_type">> => <<"session_unsubscribed">>, + <<"qos">> => 1, + <<"topic">> => <<"t/a">>, + <<"username">> => <<"u_emqx">> + }, + <<"sql">> => + <<"SELECT\n *\nFROM\n \"$events/message_dropped\", \"$events/session_unsubscribed\" ">> + } + }, + #{ + expected => #{code => 412}, + input => + #{ + <<"context">> => + #{ + <<"clientid">> => <<"c_emqx">>, + <<"event_type">> => <<"session_unsubscribed">>, + <<"qos">> => 1, + <<"topic">> => <<"t/a">>, + <<"username">> => <<"u_emqx">> + }, + <<"sql">> => + <<"SELECT\n *\nFROM\n \"$events/message_dropped\", \"$events/client_connected\" ">> + } + } + ], + Cases = Publish ++ DefaultNoMatch ++ MultipleFrom, + FailedCases = lists:filtermap(fun do_t_rule_test_smoke/1, Cases), + ?assertEqual([], FailedCases), + ok. + +do_t_rule_test_smoke(#{input := Input, expected := #{code := ExpectedCode}} = Case) -> + {_ErrOrOk, {{_, Code, _}, _, Body}} = sql_test_api(Input), + case Code =:= ExpectedCode of + true -> + false; + false -> + {true, #{ + expected => ExpectedCode, + hint => maps:get(hint, Case, <<>>), + got => Code, + resp_body => Body + }} + end. diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_test_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_test_SUITE.erl index 0c772958e..801303d3f 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_test_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_test_SUITE.erl @@ -216,7 +216,7 @@ t_ctx_delivery_dropped(_) -> t_mongo_date_function_should_return_string_in_test_env(_) -> SQL = - <<"SELECT mongo_date() as mongo_date FROM \"t/1\"">>, + <<"SELECT mongo_date() as mongo_date FROM \"$events/client_check_authz_complete\"">>, Context = #{ action => <<"publish">>,