feat: create new `message.validation_failed` hookpoint and rule engine event
This commit is contained in:
parent
62030e8942
commit
00aa7b5621
|
@ -59,6 +59,7 @@
|
|||
'message.publish',
|
||||
'message.puback',
|
||||
'message.dropped',
|
||||
'message.validation_failed',
|
||||
'message.delivered',
|
||||
'message.acked',
|
||||
'delivery.dropped',
|
||||
|
@ -182,6 +183,9 @@ when
|
|||
-callback 'message.dropped'(emqx_types:message(), #{node => node()}, _Reason :: atom()) ->
|
||||
callback_result().
|
||||
|
||||
-callback 'message.validation_failed'(emqx_types:message(), #{node => node()}, _Ctx :: map()) ->
|
||||
callback_result().
|
||||
|
||||
-callback 'message.delivered'(emqx_types:clientinfo(), Msg) -> fold_callback_result(Msg) when
|
||||
Msg :: emqx_types:message().
|
||||
|
||||
|
|
|
@ -388,12 +388,14 @@ run_validations(Validations, Message) ->
|
|||
validation => Name,
|
||||
action => ignore
|
||||
}),
|
||||
run_message_validation_failed_hook(Message, Validation),
|
||||
{cont, Acc};
|
||||
FailureAction ->
|
||||
trace_failure(Validation, "validation_failed", #{
|
||||
validation => Name,
|
||||
action => FailureAction
|
||||
}),
|
||||
run_message_validation_failed_hook(Message, Validation),
|
||||
{halt, FailureAction}
|
||||
end
|
||||
end,
|
||||
|
@ -436,12 +438,17 @@ trace_failure(#{log_failure := #{level := none}} = Validation, _Msg, _Meta) ->
|
|||
name := _Name,
|
||||
failure_action := _Action
|
||||
} = Validation,
|
||||
?tp(message_validation_failure, #{log_level => none, name => _Name, action => _Action}),
|
||||
?tp(message_validation_failed, #{log_level => none, name => _Name, action => _Action}),
|
||||
ok;
|
||||
trace_failure(#{log_failure := #{level := Level}} = Validation, Msg, Meta) ->
|
||||
#{
|
||||
name := _Name,
|
||||
failure_action := _Action
|
||||
} = Validation,
|
||||
?tp(message_validation_failure, #{log_level => Level, name => _Name, action => _Action}),
|
||||
?tp(message_validation_failed, #{log_level => Level, name => _Name, action => _Action}),
|
||||
?TRACE(Level, ?TRACE_TAG, Msg, Meta).
|
||||
|
||||
run_message_validation_failed_hook(Message, Validation) ->
|
||||
#{name := Name} = Validation,
|
||||
ValidationContext = #{name => Name},
|
||||
emqx_hooks:run('message.validation_failed', [Message, ValidationContext]).
|
||||
|
|
|
@ -14,6 +14,8 @@
|
|||
|
||||
-import(emqx_common_test_helpers, [on_exit/1]).
|
||||
|
||||
-define(RECORDED_EVENTS_TAB, recorded_actions).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% CT boilerplate
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -328,6 +330,39 @@ assert_index_order(ExpectedOrder, Topic, Comment) ->
|
|||
Comment
|
||||
).
|
||||
|
||||
create_failure_tracing_rule() ->
|
||||
Params = #{
|
||||
enable => true,
|
||||
sql => <<"select * from \"$events/message_validation_failed\" ">>,
|
||||
actions => [make_trace_fn_action()]
|
||||
},
|
||||
Path = emqx_mgmt_api_test_util:api_path(["rules"]),
|
||||
Res = request(post, Path, Params),
|
||||
ct:pal("create failure tracing rule result:\n ~p", [Res]),
|
||||
case Res of
|
||||
{ok, {{_, 201, _}, _, #{<<"id">> := RuleId}}} ->
|
||||
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
|
||||
simplify_result(Res);
|
||||
_ ->
|
||||
simplify_result(Res)
|
||||
end.
|
||||
|
||||
make_trace_fn_action() ->
|
||||
persistent_term:put({?MODULE, test_pid}, self()),
|
||||
Fn = <<(atom_to_binary(?MODULE))/binary, ":trace_rule">>,
|
||||
emqx_utils_ets:new(?RECORDED_EVENTS_TAB, [named_table, public, ordered_set]),
|
||||
#{function => Fn, args => #{}}.
|
||||
|
||||
trace_rule(Data, Envs, _Args) ->
|
||||
Now = erlang:monotonic_time(),
|
||||
ets:insert(?RECORDED_EVENTS_TAB, {Now, #{data => Data, envs => Envs}}),
|
||||
TestPid = persistent_term:get({?MODULE, test_pid}),
|
||||
TestPid ! {action, #{data => Data, envs => Envs}},
|
||||
ok.
|
||||
|
||||
get_traced_failures_from_rule_engine() ->
|
||||
ets:tab2list(?RECORDED_EVENTS_TAB).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Testcases
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -579,16 +614,16 @@ t_log_failure_none(_Config) ->
|
|||
ok
|
||||
end,
|
||||
fun(Trace) ->
|
||||
?assertMatch([#{log_level := none}], ?of_kind(message_validation_failure, Trace)),
|
||||
?assertMatch([#{log_level := none}], ?of_kind(message_validation_failed, Trace)),
|
||||
ok
|
||||
end
|
||||
),
|
||||
ok.
|
||||
|
||||
t_action_ignore(_Config) ->
|
||||
Name1 = <<"foo">>,
|
||||
?check_trace(
|
||||
begin
|
||||
Name1 = <<"foo">>,
|
||||
AlwaysFailCheck = sql_check(<<"select * where false">>),
|
||||
Validation1 = validation(
|
||||
Name1,
|
||||
|
@ -598,18 +633,25 @@ t_action_ignore(_Config) ->
|
|||
|
||||
{201, _} = insert(Validation1),
|
||||
|
||||
{201, _} = create_failure_tracing_rule(),
|
||||
|
||||
C = connect(<<"c1">>),
|
||||
{ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
|
||||
|
||||
ok = publish(C, <<"t/1">>, #{}),
|
||||
?assertReceive({publish, _}),
|
||||
|
||||
ok
|
||||
end,
|
||||
fun(Trace) ->
|
||||
?assertMatch([#{action := ignore}], ?of_kind(message_validation_failure, Trace)),
|
||||
?assertMatch([#{action := ignore}], ?of_kind(message_validation_failed, Trace)),
|
||||
ok
|
||||
end
|
||||
),
|
||||
?assertMatch(
|
||||
[{_, #{data := #{validation := Name1, event := 'message.validation_failed'}}}],
|
||||
get_traced_failures_from_rule_engine()
|
||||
),
|
||||
ok.
|
||||
|
||||
t_enable_disable_via_api_endpoint(_Config) ->
|
||||
|
@ -717,6 +759,8 @@ t_any_pass(_Config) ->
|
|||
|
||||
%% Checks that multiple validations are run in order.
|
||||
t_multiple_validations(_Config) ->
|
||||
{201, _} = create_failure_tracing_rule(),
|
||||
|
||||
Name1 = <<"foo">>,
|
||||
Check1 = sql_check(<<"select payload.x as x, payload.y as y where x > 10 or y > 0">>),
|
||||
Validation1 = validation(Name1, [Check1], #{<<"failure_action">> => <<"drop">>}),
|
||||
|
@ -732,14 +776,24 @@ t_multiple_validations(_Config) ->
|
|||
|
||||
ok = publish(C, <<"t/1">>, #{x => 11, y => 1}),
|
||||
?assertReceive({publish, _}),
|
||||
%% Barred by `Name1'
|
||||
ok = publish(C, <<"t/1">>, #{x => 7, y => 0}),
|
||||
?assertNotReceive({publish, _}),
|
||||
?assertNotReceive({disconnected, _, _}),
|
||||
%% Barred by `Name2'
|
||||
unlink(C),
|
||||
ok = publish(C, <<"t/1">>, #{x => 0, y => 1}),
|
||||
?assertNotReceive({publish, _}),
|
||||
?assertReceive({disconnected, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, _}),
|
||||
|
||||
?assertMatch(
|
||||
[
|
||||
{_, #{data := #{validation := Name1, event := 'message.validation_failed'}}},
|
||||
{_, #{data := #{validation := Name2, event := 'message.validation_failed'}}}
|
||||
],
|
||||
get_traced_failures_from_rule_engine()
|
||||
),
|
||||
|
||||
ok.
|
||||
|
||||
t_schema_check_non_existent_serde(_Config) ->
|
||||
|
|
|
@ -44,6 +44,7 @@
|
|||
on_session_unsubscribed/4,
|
||||
on_message_publish/2,
|
||||
on_message_dropped/4,
|
||||
on_message_validation_failed/3,
|
||||
on_message_delivered/3,
|
||||
on_message_acked/3,
|
||||
on_delivery_dropped/4,
|
||||
|
@ -78,6 +79,7 @@ event_names() ->
|
|||
'message.delivered',
|
||||
'message.acked',
|
||||
'message.dropped',
|
||||
'message.validation_failed',
|
||||
'delivery.dropped'
|
||||
].
|
||||
|
||||
|
@ -93,6 +95,7 @@ event_topics_enum() ->
|
|||
'$events/message_delivered',
|
||||
'$events/message_acked',
|
||||
'$events/message_dropped',
|
||||
'$events/message_validation_failed',
|
||||
'$events/delivery_dropped'
|
||||
% '$events/message_publish' % not possible to use in SELECT FROM
|
||||
].
|
||||
|
@ -221,6 +224,19 @@ on_message_dropped(Message, _, Reason, Conf) ->
|
|||
end,
|
||||
{ok, Message}.
|
||||
|
||||
on_message_validation_failed(Message, ValidationContext, Conf) ->
|
||||
case ignore_sys_message(Message) of
|
||||
true ->
|
||||
ok;
|
||||
false ->
|
||||
apply_event(
|
||||
'message.validation_failed',
|
||||
fun() -> eventmsg_validation_failed(Message, ValidationContext) end,
|
||||
Conf
|
||||
)
|
||||
end,
|
||||
{ok, Message}.
|
||||
|
||||
on_message_delivered(ClientInfo, Message, Conf) ->
|
||||
case ignore_sys_message(Message) of
|
||||
true ->
|
||||
|
@ -477,6 +493,38 @@ eventmsg_dropped(
|
|||
#{headers => Headers}
|
||||
).
|
||||
|
||||
eventmsg_validation_failed(
|
||||
Message = #message{
|
||||
id = Id,
|
||||
from = ClientId,
|
||||
qos = QoS,
|
||||
flags = Flags,
|
||||
topic = Topic,
|
||||
headers = Headers,
|
||||
payload = Payload,
|
||||
timestamp = Timestamp
|
||||
},
|
||||
ValidationContext
|
||||
) ->
|
||||
#{name := ValidationName} = ValidationContext,
|
||||
with_basic_columns(
|
||||
'message.validation_failed',
|
||||
#{
|
||||
id => emqx_guid:to_hexstr(Id),
|
||||
validation => ValidationName,
|
||||
clientid => ClientId,
|
||||
username => emqx_message:get_header(username, Message, undefined),
|
||||
payload => Payload,
|
||||
peerhost => ntoa(emqx_message:get_header(peerhost, Message, undefined)),
|
||||
topic => Topic,
|
||||
qos => QoS,
|
||||
flags => Flags,
|
||||
pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
|
||||
publish_received_at => Timestamp
|
||||
},
|
||||
#{headers => Headers}
|
||||
).
|
||||
|
||||
eventmsg_delivered(
|
||||
_ClientInfo = #{
|
||||
peerhost := PeerHost,
|
||||
|
@ -627,6 +675,7 @@ event_info() ->
|
|||
event_info_message_deliver(),
|
||||
event_info_message_acked(),
|
||||
event_info_message_dropped(),
|
||||
event_info_message_validation_failed(),
|
||||
event_info_client_connected(),
|
||||
event_info_client_disconnected(),
|
||||
event_info_client_connack(),
|
||||
|
@ -666,6 +715,13 @@ event_info_message_dropped() ->
|
|||
<<"消息在转发的过程中被丢弃,一般是由于没有订阅者"/utf8>>},
|
||||
<<"SELECT * FROM \"$events/message_dropped\" WHERE topic =~ 't/#'">>
|
||||
).
|
||||
event_info_message_validation_failed() ->
|
||||
event_info_common(
|
||||
'message.validation_failed',
|
||||
{<<"message validation failed">>, <<"TODO"/utf8>>},
|
||||
{<<"messages that do not pass configured validations">>, <<"TODO"/utf8>>},
|
||||
<<"SELECT * FROM \"$events/message_validation_failed\" WHERE topic =~ 't/#'">>
|
||||
).
|
||||
event_info_delivery_dropped() ->
|
||||
event_info_common(
|
||||
'delivery.dropped',
|
||||
|
@ -737,6 +793,9 @@ event_info_common(Event, {TitleEN, TitleZH}, {DescrEN, DescrZH}, SqlExam) ->
|
|||
test_columns('message.dropped') ->
|
||||
[{<<"reason">>, [<<"no_subscribers">>, <<"the reason of dropping">>]}] ++
|
||||
test_columns('message.publish');
|
||||
test_columns('message.validation_failed') ->
|
||||
[{<<"validation">>, <<"myvalidation">>}] ++
|
||||
test_columns('message.publish');
|
||||
test_columns('message.publish') ->
|
||||
[
|
||||
{<<"clientid">>, [<<"c_emqx">>, <<"the clientid of the sender">>]},
|
||||
|
@ -840,6 +899,23 @@ columns_with_exam('message.dropped') ->
|
|||
{<<"timestamp">>, erlang:system_time(millisecond)},
|
||||
{<<"node">>, node()}
|
||||
];
|
||||
columns_with_exam('message.validation_failed') ->
|
||||
[
|
||||
{<<"event">>, 'message.validation_failed'},
|
||||
{<<"validation">>, <<"my_validation">>},
|
||||
{<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())},
|
||||
{<<"clientid">>, <<"c_emqx">>},
|
||||
{<<"username">>, <<"u_emqx">>},
|
||||
{<<"payload">>, <<"{\"msg\": \"hello\"}">>},
|
||||
{<<"peerhost">>, <<"192.168.0.10">>},
|
||||
{<<"topic">>, <<"t/a">>},
|
||||
{<<"qos">>, 1},
|
||||
{<<"flags">>, #{}},
|
||||
{<<"publish_received_at">>, erlang:system_time(millisecond)},
|
||||
columns_example_props(pub_props),
|
||||
{<<"timestamp">>, erlang:system_time(millisecond)},
|
||||
{<<"node">>, node()}
|
||||
];
|
||||
columns_with_exam('delivery.dropped') ->
|
||||
[
|
||||
{<<"event">>, 'delivery.dropped'},
|
||||
|
@ -1030,6 +1106,7 @@ hook_fun('session.unsubscribed') -> fun ?MODULE:on_session_unsubscribed/4;
|
|||
hook_fun('message.delivered') -> fun ?MODULE:on_message_delivered/3;
|
||||
hook_fun('message.acked') -> fun ?MODULE:on_message_acked/3;
|
||||
hook_fun('message.dropped') -> fun ?MODULE:on_message_dropped/4;
|
||||
hook_fun('message.validation_failed') -> fun ?MODULE:on_message_validation_failed/3;
|
||||
hook_fun('delivery.dropped') -> fun ?MODULE:on_delivery_dropped/4;
|
||||
hook_fun('message.publish') -> fun ?MODULE:on_message_publish/2;
|
||||
hook_fun(Event) -> error({invalid_event, Event}).
|
||||
|
@ -1054,6 +1131,7 @@ event_name(<<"$events/session_unsubscribed">>) -> 'session.unsubscribed';
|
|||
event_name(<<"$events/message_delivered">>) -> 'message.delivered';
|
||||
event_name(<<"$events/message_acked">>) -> 'message.acked';
|
||||
event_name(<<"$events/message_dropped">>) -> 'message.dropped';
|
||||
event_name(<<"$events/message_validation_failed">>) -> 'message.validation_failed';
|
||||
event_name(<<"$events/delivery_dropped">>) -> 'delivery.dropped';
|
||||
event_name(_) -> 'message.publish'.
|
||||
|
||||
|
@ -1067,6 +1145,7 @@ event_topic('session.unsubscribed') -> <<"$events/session_unsubscribed">>;
|
|||
event_topic('message.delivered') -> <<"$events/message_delivered">>;
|
||||
event_topic('message.acked') -> <<"$events/message_acked">>;
|
||||
event_topic('message.dropped') -> <<"$events/message_dropped">>;
|
||||
event_topic('message.validation_failed') -> <<"$events/message_validation_failed">>;
|
||||
event_topic('delivery.dropped') -> <<"$events/delivery_dropped">>;
|
||||
event_topic('message.publish') -> <<"$events/message_publish">>.
|
||||
|
||||
|
|
Loading…
Reference in New Issue