diff --git a/apps/emqx/src/emqx_hookpoints.erl b/apps/emqx/src/emqx_hookpoints.erl index 70cf4b41a..1f7b6481b 100644 --- a/apps/emqx/src/emqx_hookpoints.erl +++ b/apps/emqx/src/emqx_hookpoints.erl @@ -59,6 +59,7 @@ 'message.publish', 'message.puback', 'message.dropped', + 'message.validation_failed', 'message.delivered', 'message.acked', 'delivery.dropped', @@ -182,6 +183,9 @@ when -callback 'message.dropped'(emqx_types:message(), #{node => node()}, _Reason :: atom()) -> callback_result(). +-callback 'message.validation_failed'(emqx_types:message(), #{node => node()}, _Ctx :: map()) -> + callback_result(). + -callback 'message.delivered'(emqx_types:clientinfo(), Msg) -> fold_callback_result(Msg) when Msg :: emqx_types:message(). diff --git a/apps/emqx_message_validation/src/emqx_message_validation.erl b/apps/emqx_message_validation/src/emqx_message_validation.erl index 74de48eac..f0fdb6dee 100644 --- a/apps/emqx_message_validation/src/emqx_message_validation.erl +++ b/apps/emqx_message_validation/src/emqx_message_validation.erl @@ -388,12 +388,14 @@ run_validations(Validations, Message) -> validation => Name, action => ignore }), + run_message_validation_failed_hook(Message, Validation), {cont, Acc}; FailureAction -> trace_failure(Validation, "validation_failed", #{ validation => Name, action => FailureAction }), + run_message_validation_failed_hook(Message, Validation), {halt, FailureAction} end end, @@ -436,12 +438,17 @@ trace_failure(#{log_failure := #{level := none}} = Validation, _Msg, _Meta) -> name := _Name, failure_action := _Action } = Validation, - ?tp(message_validation_failure, #{log_level => none, name => _Name, action => _Action}), + ?tp(message_validation_failed, #{log_level => none, name => _Name, action => _Action}), ok; trace_failure(#{log_failure := #{level := Level}} = Validation, Msg, Meta) -> #{ name := _Name, failure_action := _Action } = Validation, - ?tp(message_validation_failure, #{log_level => Level, name => _Name, action => _Action}), + ?tp(message_validation_failed, #{log_level => Level, name => _Name, action => _Action}), ?TRACE(Level, ?TRACE_TAG, Msg, Meta). + +run_message_validation_failed_hook(Message, Validation) -> + #{name := Name} = Validation, + ValidationContext = #{name => Name}, + emqx_hooks:run('message.validation_failed', [Message, ValidationContext]). diff --git a/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl b/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl index 9e22900fb..b43c8ce35 100644 --- a/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl +++ b/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl @@ -14,6 +14,8 @@ -import(emqx_common_test_helpers, [on_exit/1]). +-define(RECORDED_EVENTS_TAB, recorded_actions). + %%------------------------------------------------------------------------------ %% CT boilerplate %%------------------------------------------------------------------------------ @@ -328,6 +330,39 @@ assert_index_order(ExpectedOrder, Topic, Comment) -> Comment ). +create_failure_tracing_rule() -> + Params = #{ + enable => true, + sql => <<"select * from \"$events/message_validation_failed\" ">>, + actions => [make_trace_fn_action()] + }, + Path = emqx_mgmt_api_test_util:api_path(["rules"]), + Res = request(post, Path, Params), + ct:pal("create failure tracing rule result:\n ~p", [Res]), + case Res of + {ok, {{_, 201, _}, _, #{<<"id">> := RuleId}}} -> + on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + simplify_result(Res); + _ -> + simplify_result(Res) + end. + +make_trace_fn_action() -> + persistent_term:put({?MODULE, test_pid}, self()), + Fn = <<(atom_to_binary(?MODULE))/binary, ":trace_rule">>, + emqx_utils_ets:new(?RECORDED_EVENTS_TAB, [named_table, public, ordered_set]), + #{function => Fn, args => #{}}. + +trace_rule(Data, Envs, _Args) -> + Now = erlang:monotonic_time(), + ets:insert(?RECORDED_EVENTS_TAB, {Now, #{data => Data, envs => Envs}}), + TestPid = persistent_term:get({?MODULE, test_pid}), + TestPid ! {action, #{data => Data, envs => Envs}}, + ok. + +get_traced_failures_from_rule_engine() -> + ets:tab2list(?RECORDED_EVENTS_TAB). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -579,16 +614,16 @@ t_log_failure_none(_Config) -> ok end, fun(Trace) -> - ?assertMatch([#{log_level := none}], ?of_kind(message_validation_failure, Trace)), + ?assertMatch([#{log_level := none}], ?of_kind(message_validation_failed, Trace)), ok end ), ok. t_action_ignore(_Config) -> + Name1 = <<"foo">>, ?check_trace( begin - Name1 = <<"foo">>, AlwaysFailCheck = sql_check(<<"select * where false">>), Validation1 = validation( Name1, @@ -598,18 +633,25 @@ t_action_ignore(_Config) -> {201, _} = insert(Validation1), + {201, _} = create_failure_tracing_rule(), + C = connect(<<"c1">>), {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>), ok = publish(C, <<"t/1">>, #{}), ?assertReceive({publish, _}), + ok end, fun(Trace) -> - ?assertMatch([#{action := ignore}], ?of_kind(message_validation_failure, Trace)), + ?assertMatch([#{action := ignore}], ?of_kind(message_validation_failed, Trace)), ok end ), + ?assertMatch( + [{_, #{data := #{validation := Name1, event := 'message.validation_failed'}}}], + get_traced_failures_from_rule_engine() + ), ok. t_enable_disable_via_api_endpoint(_Config) -> @@ -717,6 +759,8 @@ t_any_pass(_Config) -> %% Checks that multiple validations are run in order. t_multiple_validations(_Config) -> + {201, _} = create_failure_tracing_rule(), + Name1 = <<"foo">>, Check1 = sql_check(<<"select payload.x as x, payload.y as y where x > 10 or y > 0">>), Validation1 = validation(Name1, [Check1], #{<<"failure_action">> => <<"drop">>}), @@ -732,14 +776,24 @@ t_multiple_validations(_Config) -> ok = publish(C, <<"t/1">>, #{x => 11, y => 1}), ?assertReceive({publish, _}), + %% Barred by `Name1' ok = publish(C, <<"t/1">>, #{x => 7, y => 0}), ?assertNotReceive({publish, _}), ?assertNotReceive({disconnected, _, _}), + %% Barred by `Name2' unlink(C), ok = publish(C, <<"t/1">>, #{x => 0, y => 1}), ?assertNotReceive({publish, _}), ?assertReceive({disconnected, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, _}), + ?assertMatch( + [ + {_, #{data := #{validation := Name1, event := 'message.validation_failed'}}}, + {_, #{data := #{validation := Name2, event := 'message.validation_failed'}}} + ], + get_traced_failures_from_rule_engine() + ), + ok. t_schema_check_non_existent_serde(_Config) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index df5b2af3c..90bede778 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -44,6 +44,7 @@ on_session_unsubscribed/4, on_message_publish/2, on_message_dropped/4, + on_message_validation_failed/3, on_message_delivered/3, on_message_acked/3, on_delivery_dropped/4, @@ -78,6 +79,7 @@ event_names() -> 'message.delivered', 'message.acked', 'message.dropped', + 'message.validation_failed', 'delivery.dropped' ]. @@ -93,6 +95,7 @@ event_topics_enum() -> '$events/message_delivered', '$events/message_acked', '$events/message_dropped', + '$events/message_validation_failed', '$events/delivery_dropped' % '$events/message_publish' % not possible to use in SELECT FROM ]. @@ -221,6 +224,19 @@ on_message_dropped(Message, _, Reason, Conf) -> end, {ok, Message}. +on_message_validation_failed(Message, ValidationContext, Conf) -> + case ignore_sys_message(Message) of + true -> + ok; + false -> + apply_event( + 'message.validation_failed', + fun() -> eventmsg_validation_failed(Message, ValidationContext) end, + Conf + ) + end, + {ok, Message}. + on_message_delivered(ClientInfo, Message, Conf) -> case ignore_sys_message(Message) of true -> @@ -477,6 +493,38 @@ eventmsg_dropped( #{headers => Headers} ). +eventmsg_validation_failed( + Message = #message{ + id = Id, + from = ClientId, + qos = QoS, + flags = Flags, + topic = Topic, + headers = Headers, + payload = Payload, + timestamp = Timestamp + }, + ValidationContext +) -> + #{name := ValidationName} = ValidationContext, + with_basic_columns( + 'message.validation_failed', + #{ + id => emqx_guid:to_hexstr(Id), + validation => ValidationName, + clientid => ClientId, + username => emqx_message:get_header(username, Message, undefined), + payload => Payload, + peerhost => ntoa(emqx_message:get_header(peerhost, Message, undefined)), + topic => Topic, + qos => QoS, + flags => Flags, + pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})), + publish_received_at => Timestamp + }, + #{headers => Headers} + ). + eventmsg_delivered( _ClientInfo = #{ peerhost := PeerHost, @@ -627,6 +675,7 @@ event_info() -> event_info_message_deliver(), event_info_message_acked(), event_info_message_dropped(), + event_info_message_validation_failed(), event_info_client_connected(), event_info_client_disconnected(), event_info_client_connack(), @@ -666,6 +715,13 @@ event_info_message_dropped() -> <<"消息在转发的过程中被丢弃,一般是由于没有订阅者"/utf8>>}, <<"SELECT * FROM \"$events/message_dropped\" WHERE topic =~ 't/#'">> ). +event_info_message_validation_failed() -> + event_info_common( + 'message.validation_failed', + {<<"message validation failed">>, <<"TODO"/utf8>>}, + {<<"messages that do not pass configured validations">>, <<"TODO"/utf8>>}, + <<"SELECT * FROM \"$events/message_validation_failed\" WHERE topic =~ 't/#'">> + ). event_info_delivery_dropped() -> event_info_common( 'delivery.dropped', @@ -737,6 +793,9 @@ event_info_common(Event, {TitleEN, TitleZH}, {DescrEN, DescrZH}, SqlExam) -> test_columns('message.dropped') -> [{<<"reason">>, [<<"no_subscribers">>, <<"the reason of dropping">>]}] ++ test_columns('message.publish'); +test_columns('message.validation_failed') -> + [{<<"validation">>, <<"myvalidation">>}] ++ + test_columns('message.publish'); test_columns('message.publish') -> [ {<<"clientid">>, [<<"c_emqx">>, <<"the clientid of the sender">>]}, @@ -840,6 +899,23 @@ columns_with_exam('message.dropped') -> {<<"timestamp">>, erlang:system_time(millisecond)}, {<<"node">>, node()} ]; +columns_with_exam('message.validation_failed') -> + [ + {<<"event">>, 'message.validation_failed'}, + {<<"validation">>, <<"my_validation">>}, + {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}, + {<<"clientid">>, <<"c_emqx">>}, + {<<"username">>, <<"u_emqx">>}, + {<<"payload">>, <<"{\"msg\": \"hello\"}">>}, + {<<"peerhost">>, <<"192.168.0.10">>}, + {<<"topic">>, <<"t/a">>}, + {<<"qos">>, 1}, + {<<"flags">>, #{}}, + {<<"publish_received_at">>, erlang:system_time(millisecond)}, + columns_example_props(pub_props), + {<<"timestamp">>, erlang:system_time(millisecond)}, + {<<"node">>, node()} + ]; columns_with_exam('delivery.dropped') -> [ {<<"event">>, 'delivery.dropped'}, @@ -1030,6 +1106,7 @@ hook_fun('session.unsubscribed') -> fun ?MODULE:on_session_unsubscribed/4; hook_fun('message.delivered') -> fun ?MODULE:on_message_delivered/3; hook_fun('message.acked') -> fun ?MODULE:on_message_acked/3; hook_fun('message.dropped') -> fun ?MODULE:on_message_dropped/4; +hook_fun('message.validation_failed') -> fun ?MODULE:on_message_validation_failed/3; hook_fun('delivery.dropped') -> fun ?MODULE:on_delivery_dropped/4; hook_fun('message.publish') -> fun ?MODULE:on_message_publish/2; hook_fun(Event) -> error({invalid_event, Event}). @@ -1054,6 +1131,7 @@ event_name(<<"$events/session_unsubscribed">>) -> 'session.unsubscribed'; event_name(<<"$events/message_delivered">>) -> 'message.delivered'; event_name(<<"$events/message_acked">>) -> 'message.acked'; event_name(<<"$events/message_dropped">>) -> 'message.dropped'; +event_name(<<"$events/message_validation_failed">>) -> 'message.validation_failed'; event_name(<<"$events/delivery_dropped">>) -> 'delivery.dropped'; event_name(_) -> 'message.publish'. @@ -1067,6 +1145,7 @@ event_topic('session.unsubscribed') -> <<"$events/session_unsubscribed">>; event_topic('message.delivered') -> <<"$events/message_delivered">>; event_topic('message.acked') -> <<"$events/message_acked">>; event_topic('message.dropped') -> <<"$events/message_dropped">>; +event_topic('message.validation_failed') -> <<"$events/message_validation_failed">>; event_topic('delivery.dropped') -> <<"$events/delivery_dropped">>; event_topic('message.publish') -> <<"$events/message_publish">>.