test(rule events): add test cases for `schema.validation_failed` and `message.transformation_failed` events

This commit is contained in:
Thales Macedo Garitezi 2024-07-22 15:58:16 -03:00
parent 7ca5205f3f
commit 99e6613713
1 changed files with 104 additions and 4 deletions

View File

@ -160,13 +160,55 @@ groups() ->
init_per_suite(Config) -> init_per_suite(Config) ->
Apps = emqx_cth_suite:start( Apps = emqx_cth_suite:start(
[ lists:flatten([
emqx, emqx,
emqx_conf, emqx_conf,
emqx_rule_engine, emqx_rule_engine,
emqx_auth, emqx_auth,
emqx_bridge emqx_bridge,
], [
{emqx_schema_validation, #{
config => #{
<<"schema_validation">> => #{
<<"validations">> => [
#{
<<"name">> => <<"v1">>,
<<"topics">> => [<<"sv/fail">>],
<<"strategy">> => <<"all_pass">>,
<<"failure_action">> => <<"drop">>,
<<"checks">> => [
#{
<<"type">> => <<"sql">>,
<<"sql">> => <<"select 1 where false">>
}
]
}
]
}
}
}}
|| is_ee()
],
[
{emqx_message_transformation, #{
config => #{
<<"message_transformation">> => #{
<<"transformations">> => [
#{
<<"name">> => <<"t1">>,
<<"topics">> => <<"mt/fail">>,
<<"failure_action">> => <<"drop">>,
<<"payload_decoder">> => #{<<"type">> => <<"json">>},
<<"payload_encoder">> => #{<<"type">> => <<"json">>},
<<"operations">> => []
}
]
}
}
}}
|| is_ee()
]
]),
#{work_dir => emqx_cth_suite:work_dir(Config)} #{work_dir => emqx_cth_suite:work_dir(Config)}
), ),
[{apps, Apps} | Config]. [{apps, Apps} | Config].
@ -250,6 +292,8 @@ init_per_testcase(t_events, Config) ->
"\"$events/message_delivered\", " "\"$events/message_delivered\", "
"\"$events/message_dropped\", " "\"$events/message_dropped\", "
"\"$events/delivery_dropped\", " "\"$events/delivery_dropped\", "
"\"$events/schema_validation_failed\", "
"\"$events/message_transformation_failed\", "
"\"t1\"", "\"t1\"",
{ok, Rule} = emqx_rule_engine:create_rule( {ok, Rule} = emqx_rule_engine:create_rule(
#{ #{
@ -834,6 +878,13 @@ t_events(_Config) ->
session_subscribed(Client2), session_subscribed(Client2),
ct:pal("====== verify t1"), ct:pal("====== verify t1"),
message_publish(Client), message_publish(Client),
is_ee() andalso
begin
ct:pal("====== verify $events/schema_validation_failed"),
schema_validation_failed(Client),
ct:pal("====== verify $events/message_transformation_failed"),
message_transformation_failed(Client)
end,
ct:pal("====== verify $events/delivery_dropped"), ct:pal("====== verify $events/delivery_dropped"),
delivery_dropped(Client), delivery_dropped(Client),
ct:pal("====== verify $events/message_delivered"), ct:pal("====== verify $events/message_delivered"),
@ -1151,6 +1202,16 @@ message_dropped(Client) ->
message_acked(_Client) -> message_acked(_Client) ->
verify_event('message.acked'), verify_event('message.acked'),
ok. ok.
schema_validation_failed(Client) ->
{ok, _} = emqtt:publish(Client, <<"sv/fail">>, <<"">>, [{qos, 1}]),
ct:sleep(100),
verify_event('schema.validation_failed'),
ok.
message_transformation_failed(Client) ->
{ok, _} = emqtt:publish(Client, <<"mt/fail">>, <<"will fail to { parse">>, [{qos, 1}]),
ct:sleep(100),
verify_event('message.transformation_failed'),
ok.
t_match_atom_and_binary(_Config) -> t_match_atom_and_binary(_Config) ->
SQL = SQL =
@ -3834,6 +3895,9 @@ t_trace_rule_id(_Config) ->
%% Internal helpers %% Internal helpers
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
is_ee() ->
emqx_release:edition() == ee.
republish_action(Topic) -> republish_action(Topic) ->
republish_action(Topic, <<"${payload}">>). republish_action(Topic, <<"${payload}">>).
@ -4251,7 +4315,43 @@ verify_event_fields('client.check_authn_complete', Fields) ->
?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])), ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])), ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
?assert(erlang:is_boolean(IsAnonymous)), ?assert(erlang:is_boolean(IsAnonymous)),
?assert(erlang:is_boolean(IsSuperuser)). ?assert(erlang:is_boolean(IsSuperuser));
verify_event_fields('schema.validation_failed', Fields) ->
#{
validation := ValidationName,
clientid := ClientId,
username := Username,
payload := _Payload,
peername := PeerName,
qos := _QoS,
topic := _Topic,
flags := _Flags,
pub_props := _PubProps,
publish_received_at := _PublishReceivedAt
} = Fields,
?assertEqual(<<"v1">>, ValidationName),
verify_peername(PeerName),
?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
ok;
verify_event_fields('message.transformation_failed', Fields) ->
#{
transformation := TransformationName,
clientid := ClientId,
username := Username,
payload := _Payload,
peername := PeerName,
qos := _QoS,
topic := _Topic,
flags := _Flags,
pub_props := _PubProps,
publish_received_at := _PublishReceivedAt
} = Fields,
?assertEqual(<<"t1">>, TransformationName),
verify_peername(PeerName),
?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
ok.
verify_peername(PeerName) -> verify_peername(PeerName) ->
case string:split(PeerName, ":") of case string:split(PeerName, ":") of