From f84a9966712ed832f6397a52d53019f7cc4ed4e5 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 12 Mar 2024 17:02:26 -0300 Subject: [PATCH] feat: implement message validation Fixes https://emqx.atlassian.net/browse/EMQX-11980 --- apps/emqx/include/emqx_hooks.hrl | 1 + apps/emqx/src/emqx_broker.erl | 6 + apps/emqx/src/emqx_channel.erl | 17 +- apps/emqx/src/emqx_types.erl | 12 +- apps/emqx/test/emqx_cth_suite.erl | 4 + apps/emqx_machine/priv/reboot_lists.eterm | 1 + apps/emqx_message_validation/BSL.txt | 94 +++ apps/emqx_message_validation/README.md | 29 + apps/emqx_message_validation/rebar.config | 16 + .../src/emqx_message_validation.app.src | 14 + .../src/emqx_message_validation.erl | 380 ++++++++++ .../src/emqx_message_validation_app.erl | 32 + .../src/emqx_message_validation_http_api.erl | 385 ++++++++++ .../src/emqx_message_validation_registry.erl | 225 ++++++ .../src/emqx_message_validation_schema.erl | 206 ++++++ .../src/emqx_message_validation_sup.erl | 47 ++ ...emqx_message_validation_http_api_SUITE.erl | 690 ++++++++++++++++++ .../test/emqx_message_validation_tests.erl | 194 +++++ .../emqx_rule_engine/src/emqx_rule_engine.erl | 6 - .../src/emqx_rule_engine_app.erl | 3 +- .../src/emqx_rule_engine_schema.erl | 3 +- .../src/emqx_rule_engine_sup.erl | 30 +- .../src/emqx_rule_runtime.erl | 61 +- .../src/emqx_rule_sqlparser.erl | 88 ++- .../test/emqx_rule_funcs_SUITE.erl | 17 +- apps/emqx_utils/src/emqx_utils.erl | 14 +- apps/emqx_utils/src/emqx_utils_maps.erl | 4 +- apps/emqx_utils/test/emqx_utils_tests.erl | 34 + changes/ce/feat-12711.en.md | 3 + mix.exs | 3 +- rebar.config | 2 +- rebar.config.erl | 1 + .../emqx_message_validation_http_api.hocon | 24 + rel/i18n/emqx_message_validation_schema.hocon | 88 +++ 34 files changed, 2654 insertions(+), 80 deletions(-) create mode 100644 apps/emqx_message_validation/BSL.txt create mode 100644 apps/emqx_message_validation/README.md create mode 100644 apps/emqx_message_validation/rebar.config create mode 100644 apps/emqx_message_validation/src/emqx_message_validation.app.src create mode 100644 apps/emqx_message_validation/src/emqx_message_validation.erl create mode 100644 apps/emqx_message_validation/src/emqx_message_validation_app.erl create mode 100644 apps/emqx_message_validation/src/emqx_message_validation_http_api.erl create mode 100644 apps/emqx_message_validation/src/emqx_message_validation_registry.erl create mode 100644 apps/emqx_message_validation/src/emqx_message_validation_schema.erl create mode 100644 apps/emqx_message_validation/src/emqx_message_validation_sup.erl create mode 100644 apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl create mode 100644 apps/emqx_message_validation/test/emqx_message_validation_tests.erl create mode 100644 changes/ce/feat-12711.en.md create mode 100644 rel/i18n/emqx_message_validation_http_api.hocon create mode 100644 rel/i18n/emqx_message_validation_schema.hocon diff --git a/apps/emqx/include/emqx_hooks.hrl b/apps/emqx/include/emqx_hooks.hrl index 3bb2baad2..2e632d545 100644 --- a/apps/emqx/include/emqx_hooks.hrl +++ b/apps/emqx/include/emqx_hooks.hrl @@ -25,6 +25,7 @@ -define(HP_AUTHN, 970). -define(HP_AUTHZ, 960). -define(HP_SYS_MSGS, 950). +-define(HP_MSG_VALIDATION, 945). -define(HP_TOPIC_METRICS, 940). -define(HP_RETAINER, 930). -define(HP_AUTO_SUB, 920). diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index b20c3a15f..6f6580517 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -235,6 +235,12 @@ publish(Msg) when is_record(Msg, message) -> _ = emqx_trace:publish(Msg), emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'), case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of + #message{headers = #{should_disconnect := true}, topic = Topic} -> + ?TRACE("MQTT", "msg_publish_not_allowed_disconnect", #{ + message => emqx_message:to_log_map(Msg), + topic => Topic + }), + disconnect; #message{headers = #{allow_publish := false}, topic = Topic} -> ?TRACE("MQTT", "msg_publish_not_allowed", #{ message => emqx_message:to_log_map(Msg), diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 51b66f4f9..9e783a054 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -702,14 +702,21 @@ packet_to_message(Packet, #channel{ do_publish(_PacketId, Msg = #message{qos = ?QOS_0}, Channel) -> Result = emqx_broker:publish(Msg), - NChannel = ensure_quota(Result, Channel), - {ok, NChannel}; + case Result of + disconnect -> + handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel); + _ -> + NChannel = ensure_quota(Result, Channel), + {ok, NChannel} + end; do_publish(PacketId, Msg = #message{qos = ?QOS_1}, Channel) -> PubRes = emqx_broker:publish(Msg), RC = puback_reason_code(PacketId, Msg, PubRes), case RC of undefined -> {ok, Channel}; + disconnect -> + handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel); _Value -> do_finish_publish(PacketId, PubRes, RC, Channel) end; @@ -719,6 +726,8 @@ do_publish( Channel = #channel{clientinfo = ClientInfo, session = Session} ) -> case emqx_session:publish(ClientInfo, PacketId, Msg, Session) of + {ok, disconnect, _NSession} -> + handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel); {ok, PubRes, NSession} -> RC = pubrec_reason_code(PubRes), NChannel0 = Channel#channel{session = NSession}, @@ -763,7 +772,9 @@ pubrec_reason_code([_ | _]) -> ?RC_SUCCESS. puback_reason_code(PacketId, Msg, [] = PubRes) -> emqx_hooks:run_fold('message.puback', [PacketId, Msg, PubRes], ?RC_NO_MATCHING_SUBSCRIBERS); puback_reason_code(PacketId, Msg, [_ | _] = PubRes) -> - emqx_hooks:run_fold('message.puback', [PacketId, Msg, PubRes], ?RC_SUCCESS). + emqx_hooks:run_fold('message.puback', [PacketId, Msg, PubRes], ?RC_SUCCESS); +puback_reason_code(_PacketId, _Msg, disconnect) -> + disconnect. -compile({inline, [after_message_acked/3]}). after_message_acked(ClientInfo, Msg, PubAckProps) -> diff --git a/apps/emqx/src/emqx_types.erl b/apps/emqx/src/emqx_types.erl index c53188753..99a71e20b 100644 --- a/apps/emqx/src/emqx_types.erl +++ b/apps/emqx/src/emqx_types.erl @@ -258,11 +258,13 @@ -type deliver() :: {deliver, topic(), message()}. -type delivery() :: #delivery{}. -type deliver_result() :: ok | {ok, non_neg_integer()} | {error, term()}. --type publish_result() :: [ - {node(), topic(), deliver_result()} - | {share, topic(), deliver_result()} - | persisted -]. +-type publish_result() :: + [ + {node(), topic(), deliver_result()} + | {share, topic(), deliver_result()} + | persisted + ] + | disconnect. -type route() :: #route{}. -type route_entry() :: {topic(), node()} | {topic, group()}. -type command() :: #command{}. diff --git a/apps/emqx/test/emqx_cth_suite.erl b/apps/emqx/test/emqx_cth_suite.erl index eae12145f..6a3eea5d9 100644 --- a/apps/emqx/test/emqx_cth_suite.erl +++ b/apps/emqx/test/emqx_cth_suite.erl @@ -380,6 +380,10 @@ default_appspec(emqx_dashboard, _SuiteOpts) -> true = emqx_dashboard_listener:is_ready(infinity) end }; +default_appspec(emqx_schema_registry, _SuiteOpts) -> + #{schema_mod => emqx_schema_registry_schema, config => #{}}; +default_appspec(emqx_message_validation, _SuiteOpts) -> + #{schema_mod => emqx_message_validation_schema, config => #{}}; default_appspec(_, _) -> #{}. diff --git a/apps/emqx_machine/priv/reboot_lists.eterm b/apps/emqx_machine/priv/reboot_lists.eterm index a6559bcab..ef42c0fc9 100644 --- a/apps/emqx_machine/priv/reboot_lists.eterm +++ b/apps/emqx_machine/priv/reboot_lists.eterm @@ -88,6 +88,7 @@ [ emqx_license, emqx_enterprise, + emqx_message_validation, emqx_bridge_kafka, emqx_bridge_pulsar, emqx_bridge_gcp_pubsub, diff --git a/apps/emqx_message_validation/BSL.txt b/apps/emqx_message_validation/BSL.txt new file mode 100644 index 000000000..f0cd31c6f --- /dev/null +++ b/apps/emqx_message_validation/BSL.txt @@ -0,0 +1,94 @@ +Business Source License 1.1 + +Licensor: Hangzhou EMQ Technologies Co., Ltd. +Licensed Work: EMQX Enterprise Edition + The Licensed Work is (c) 2023 + Hangzhou EMQ Technologies Co., Ltd. +Additional Use Grant: Students and educators are granted right to copy, + modify, and create derivative work for research + or education. +Change Date: 2028-01-26 +Change License: Apache License, Version 2.0 + +For information about alternative licensing arrangements for the Software, +please contact Licensor: https://www.emqx.com/en/contact + +Notice + +The Business Source License (this document, or the “License”) is not an Open +Source license. However, the Licensed Work will eventually be made available +under an Open Source License, as stated in this License. + +License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved. +“Business Source License” is a trademark of MariaDB Corporation Ab. + +----------------------------------------------------------------------------- + +Business Source License 1.1 + +Terms + +The Licensor hereby grants you the right to copy, modify, create derivative +works, redistribute, and make non-production use of the Licensed Work. The +Licensor may make an Additional Use Grant, above, permitting limited +production use. + +Effective on the Change Date, or the fourth anniversary of the first publicly +available distribution of a specific version of the Licensed Work under this +License, whichever comes first, the Licensor hereby grants you rights under +the terms of the Change License, and the rights granted in the paragraph +above terminate. + +If your use of the Licensed Work does not comply with the requirements +currently in effect as described in this License, you must purchase a +commercial license from the Licensor, its affiliated entities, or authorized +resellers, or you must refrain from using the Licensed Work. + +All copies of the original and modified Licensed Work, and derivative works +of the Licensed Work, are subject to this License. This License applies +separately for each version of the Licensed Work and the Change Date may vary +for each version of the Licensed Work released by Licensor. + +You must conspicuously display this License on each original or modified copy +of the Licensed Work. If you receive the Licensed Work in original or +modified form from a third party, the terms and conditions set forth in this +License apply to your use of that work. + +Any use of the Licensed Work in violation of this License will automatically +terminate your rights under this License for the current and all other +versions of the Licensed Work. + +This License does not grant you any right in any trademark or logo of +Licensor or its affiliates (provided that you may use a trademark or logo of +Licensor as expressly required by this License). + +TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON +AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS, +EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND +TITLE. + +MariaDB hereby grants you permission to use this License’s text to license +your works, and to refer to it using the trademark “Business Source License”, +as long as you comply with the Covenants of Licensor below. + +Covenants of Licensor + +In consideration of the right to use this License’s text and the “Business +Source License” name and trademark, Licensor covenants to MariaDB, and to all +other recipients of the licensed work to be provided by Licensor: + +1. To specify as the Change License the GPL Version 2.0 or any later version, + or a license that is compatible with GPL Version 2.0 or a later version, + where “compatible” means that software provided under the Change License can + be included in a program with software provided under GPL Version 2.0 or a + later version. Licensor may specify additional Change Licenses without + limitation. + +2. To either: (a) specify an additional grant of rights to use that does not + impose any additional restriction on the right granted in this License, as + the Additional Use Grant; or (b) insert the text “None”. + +3. To specify a Change Date. + +4. Not to modify this License in any other way. diff --git a/apps/emqx_message_validation/README.md b/apps/emqx_message_validation/README.md new file mode 100644 index 000000000..c32e74147 --- /dev/null +++ b/apps/emqx_message_validation/README.md @@ -0,0 +1,29 @@ +# EMQX Message Validation + +This application encapsulates the functionality to validate incoming or internally +triggered published payloads and take an action upon failure, which can be to just drop +the message without further processing, or to disconnect the offending client as well. + +# Documentation + +Refer to [Message +Validation](https://docs.emqx.com/en/enterprise/latest/data-integration/message-validation.html) +for more information about the semantics and checks available. + +# HTTP APIs + +APIs are provided for validation management, which includes creating, +updating, looking up, deleting, listing validations. + +Refer to [API Docs - +Bridges](https://docs.emqx.com/en/enterprise/latest/admin/api-docs.html#tag/Message-Validation) +for more detailed information. + + +# Contributing + +Please see our [contributing.md](../../CONTRIBUTING.md). + +# License + +EMQ Business Source License 1.1, refer to [LICENSE](BSL.txt). diff --git a/apps/emqx_message_validation/rebar.config b/apps/emqx_message_validation/rebar.config new file mode 100644 index 000000000..170115ef6 --- /dev/null +++ b/apps/emqx_message_validation/rebar.config @@ -0,0 +1,16 @@ +%% -*- mode: erlang -*- + +{erl_opts, [ + warn_unused_vars, + warn_shadow_vars, + warn_unused_import, + warn_obsolete_guard, + warnings_as_errors, + debug_info +]}. +{deps, [ + {emqx, {path, "../emqx"}}, + {emqx_utils, {path, "../emqx_utils"}}, + {emqx_rule_engine, {path, "../emqx_rule_engine"}}, + {emqx_schema_registry, {path, "../emqx_schema_registry"}} +]}. diff --git a/apps/emqx_message_validation/src/emqx_message_validation.app.src b/apps/emqx_message_validation/src/emqx_message_validation.app.src new file mode 100644 index 000000000..077f839a9 --- /dev/null +++ b/apps/emqx_message_validation/src/emqx_message_validation.app.src @@ -0,0 +1,14 @@ +{application, emqx_message_validation, [ + {description, "EMQX Message Validation"}, + {vsn, "0.1.0"}, + {registered, [emqx_message_validation_sup, emqx_message_validation_registry]}, + {mod, {emqx_message_validation_app, []}}, + {applications, [ + kernel, + stdlib + ]}, + {env, []}, + {modules, []}, + + {links, []} +]}. diff --git a/apps/emqx_message_validation/src/emqx_message_validation.erl b/apps/emqx_message_validation/src/emqx_message_validation.erl new file mode 100644 index 000000000..3513a4210 --- /dev/null +++ b/apps/emqx_message_validation/src/emqx_message_validation.erl @@ -0,0 +1,380 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_message_validation). + +-include_lib("emqx_utils/include/emqx_message.hrl"). +-include_lib("emqx/include/emqx_hooks.hrl"). +-include_lib("emqx/include/logger.hrl"). + +%% API +-export([ + add_handler/0, + remove_handler/0, + + load/0, + unload/0, + + list/0, + move/2, + lookup/1, + insert/1, + update/1, + delete/1 +]). + +%% `emqx_hooks' API +-export([ + register_hooks/0, + unregister_hooks/0, + + on_message_publish/1 +]). + +%% `emqx_config_handler' API +-export([pre_config_update/3, post_config_update/5]). + +%% Internal exports +-export([parse_sql_check/1]). + +%% Internal functions; exported for tests +-export([ + evaluate_sql_check/2 +]). + +%%------------------------------------------------------------------------------ +%% Type declarations +%%------------------------------------------------------------------------------ + +-define(TRACE_TAG, "MESSAGE_VALIDATION"). +-define(CONF_ROOT, message_validation). +-define(VALIDATIONS_CONF_PATH, [?CONF_ROOT, validations]). + +-type validation_name() :: binary(). +-type validation() :: _TODO. +-type position() :: front | rear | {'after', validation_name()} | {before, validation_name()}. + +%%------------------------------------------------------------------------------ +%% API +%%------------------------------------------------------------------------------ + +-spec add_handler() -> ok. +add_handler() -> + ok = emqx_config_handler:add_handler(?VALIDATIONS_CONF_PATH, ?MODULE), + ok. + +-spec remove_handler() -> ok. +remove_handler() -> + ok = emqx_config_handler:remove_handler(?VALIDATIONS_CONF_PATH), + ok. + +load() -> + lists:foreach(fun insert/1, emqx:get_config(?VALIDATIONS_CONF_PATH, [])). + +unload() -> + lists:foreach(fun delete/1, emqx:get_config(?VALIDATIONS_CONF_PATH, [])). + +-spec list() -> [validation()]. +list() -> + emqx:get_config(?VALIDATIONS_CONF_PATH, []). + +-spec move(validation_name(), position()) -> + {ok, _} | {error, _}. +move(Name, Position) -> + emqx:update_config( + ?VALIDATIONS_CONF_PATH, + {move, Name, Position}, + #{override_to => cluster} + ). + +-spec lookup(validation_name()) -> {ok, validation()} | {error, not_found}. +lookup(Name) -> + Validations = emqx:get_config(?VALIDATIONS_CONF_PATH, []), + do_lookup(Name, Validations). + +-spec insert(validation()) -> + {ok, _} | {error, _}. +insert(Validation) -> + emqx:update_config( + ?VALIDATIONS_CONF_PATH, + {append, Validation}, + #{override_to => cluster} + ). + +-spec update(validation()) -> + {ok, _} | {error, _}. +update(Validation) -> + emqx:update_config( + ?VALIDATIONS_CONF_PATH, + {update, Validation}, + #{override_to => cluster} + ). + +-spec delete(validation_name()) -> + {ok, _} | {error, _}. +delete(Name) -> + emqx:update_config( + ?VALIDATIONS_CONF_PATH, + {delete, Name}, + #{override_to => cluster} + ). + +%%------------------------------------------------------------------------------ +%% Hooks +%%------------------------------------------------------------------------------ + +-spec register_hooks() -> ok. +register_hooks() -> + emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_MSG_VALIDATION). + +-spec unregister_hooks() -> ok. +unregister_hooks() -> + emqx_hooks:del('message.publish', {?MODULE, on_message_publish}). + +-spec on_message_publish(emqx_types:message()) -> + {ok, emqx_types:message()} | {stop, emqx_types:message()}. +on_message_publish(Message = #message{topic = Topic, headers = Headers}) -> + case emqx_message_validation_registry:matching_validations(Topic) of + [] -> + ok; + Validations -> + case run_validations(Validations, Message) of + ok -> + {ok, Message}; + drop -> + {stop, Message#message{headers = Headers#{allow_publish => false}}}; + disconnect -> + {stop, Message#message{ + headers = Headers#{ + allow_publish => false, + should_disconnect => true + } + }} + end + end. + +%%------------------------------------------------------------------------------ +%% `emqx_config_handler' API +%%------------------------------------------------------------------------------ + +pre_config_update(?VALIDATIONS_CONF_PATH, {append, Validation}, OldValidations) -> + Validations = OldValidations ++ [Validation], + {ok, Validations}; +pre_config_update(?VALIDATIONS_CONF_PATH, {update, Validation}, OldValidations) -> + replace(OldValidations, Validation); +pre_config_update(?VALIDATIONS_CONF_PATH, {delete, Validation}, OldValidations) -> + delete(OldValidations, Validation); +pre_config_update(?VALIDATIONS_CONF_PATH, {move, Name, Position}, OldValidations) -> + move(OldValidations, Name, Position). + +post_config_update(?VALIDATIONS_CONF_PATH, {append, #{<<"name">> := Name}}, New, _Old, _AppEnvs) -> + {Pos, Validation} = fetch_with_index(New, Name), + ok = emqx_message_validation_registry:insert(Pos, Validation), + ok; +post_config_update(?VALIDATIONS_CONF_PATH, {update, #{<<"name">> := Name}}, New, Old, _AppEnvs) -> + {_Pos, OldValidation} = fetch_with_index(Old, Name), + {Pos, NewValidation} = fetch_with_index(New, Name), + ok = emqx_message_validation_registry:update(OldValidation, Pos, NewValidation), + ok; +post_config_update(?VALIDATIONS_CONF_PATH, {delete, Name}, _New, Old, _AppEnvs) -> + {_Pos, Validation} = fetch_with_index(Old, Name), + ok = emqx_message_validation_registry:delete(Validation), + ok; +post_config_update(?VALIDATIONS_CONF_PATH, {move, _Name, _Position}, New, _Old, _AppEnvs) -> + ok = emqx_message_validation_registry:reindex_positions(New), + ok. + +%%------------------------------------------------------------------------------ +%% Internal exports +%%------------------------------------------------------------------------------ + +parse_sql_check(SQL) -> + case emqx_rule_sqlparser:parse(SQL, #{with_from => false}) of + {ok, Select} -> + case emqx_rule_sqlparser:select_is_foreach(Select) of + true -> + {error, foreach_not_allowed}; + false -> + Check = #{ + type => sql, + fields => emqx_rule_sqlparser:select_fields(Select), + conditions => emqx_rule_sqlparser:select_where(Select) + }, + {ok, Check} + end; + Error = {error, _} -> + Error + end. + +%%------------------------------------------------------------------------------ +%% Internal functions +%%------------------------------------------------------------------------------ + +evaluate_sql_check(Check, Message) -> + #{ + fields := Fields, + conditions := Conditions + } = Check, + {Data, _} = emqx_rule_events:eventmsg_publish(Message), + try emqx_rule_runtime:evaluate_select(Fields, Data, Conditions) of + {ok, _} -> + true; + false -> + false + catch + throw:_Reason -> + %% TODO: log? + false; + _Class:_Error:_Stacktrace -> + %% TODO: log? + false + end. + +evaluate_schema_check(Check, #message{payload = Data}) -> + #{schema := SerdeName} = Check, + ExtraArgs = + case Check of + #{type := protobuf, message_name := MessageName} -> + [MessageName]; + _ -> + [] + end, + try + emqx_schema_registry_serde:handle_rule_function(schema_check, [SerdeName, Data | ExtraArgs]) + catch + error:{serde_not_found, _} -> + false; + _Class:_Error:_Stacktrace -> + %% TODO: log? + false + end. + +replace(OldValidations, Validation = #{<<"name">> := Name}) -> + {Found, RevNewValidations} = + lists:foldl( + fun + (#{<<"name">> := NameIn}, {_FoundIn, Acc}) when NameIn =:= Name -> + {true, [Validation | Acc]}; + (Val, {FoundIn, Acc}) -> + {FoundIn, [Val | Acc]} + end, + {false, []}, + OldValidations + ), + case Found of + true -> + {ok, lists:reverse(RevNewValidations)}; + false -> + {error, not_found} + end. + +delete(OldValidations, Name) -> + {Found, RevNewValidations} = + lists:foldl( + fun + (#{<<"name">> := NameIn}, {_FoundIn, Acc}) when NameIn =:= Name -> + {true, Acc}; + (Val, {FoundIn, Acc}) -> + {FoundIn, [Val | Acc]} + end, + {false, []}, + OldValidations + ), + case Found of + true -> + {ok, lists:reverse(RevNewValidations)}; + false -> + {error, not_found} + end. + +move(OldValidations, Name, front) -> + {Validation, Front, Rear} = take(Name, OldValidations), + {ok, [Validation | Front ++ Rear]}; +move(OldValidations, Name, rear) -> + {Validation, Front, Rear} = take(Name, OldValidations), + {ok, Front ++ Rear ++ [Validation]}; +move(OldValidations, Name, {'after', OtherName}) -> + {Validation, Front1, Rear1} = take(Name, OldValidations), + {OtherValidation, Front2, Rear2} = take(OtherName, Front1 ++ Rear1), + {ok, Front2 ++ [OtherValidation, Validation] ++ Rear2}; +move(OldValidations, Name, {before, OtherName}) -> + {Validation, Front1, Rear1} = take(Name, OldValidations), + {OtherValidation, Front2, Rear2} = take(OtherName, Front1 ++ Rear1), + {ok, Front2 ++ [Validation, OtherValidation] ++ Rear2}. + +fetch_with_index([{Pos, #{name := Name} = Validation} | _Rest], Name) -> + {Pos, Validation}; +fetch_with_index([{_, _} | Rest], Name) -> + fetch_with_index(Rest, Name); +fetch_with_index(Validations, Name) -> + fetch_with_index(lists:enumerate(Validations), Name). + +take(Name, Validations) -> + case lists:splitwith(fun(#{<<"name">> := N}) -> N =/= Name end, Validations) of + {_Front, []} -> + throw({validation_not_found, Name}); + {Front, [Found | Rear]} -> + {Found, Front, Rear} + end. + +do_lookup(_Name, _Validations = []) -> + {error, not_found}; +do_lookup(Name, [#{name := Name} = Validation | _Rest]) -> + {ok, Validation}; +do_lookup(Name, [_ | Rest]) -> + do_lookup(Name, Rest). + +run_validations(Validations, Message) -> + try + emqx_rule_runtime:clear_rule_payload(), + Fun = fun(Validation, Acc) -> + #{ + name := Name, + log_failure_at := FailureLogLevel + } = Validation, + case run_validation(Validation, Message) of + ok -> + {cont, Acc}; + FailureAction -> + ?TRACE( + FailureLogLevel, + ?TRACE_TAG, + "validation_failed", + #{validation => Name, action => FailureAction} + ), + {halt, FailureAction} + end + end, + emqx_utils:foldl_while(Fun, _Passed = ok, Validations) + after + emqx_rule_runtime:clear_rule_payload() + end. + +run_validation(#{strategy := all_pass} = Validation, Message) -> + #{ + checks := Checks, + failure_action := FailureAction + } = Validation, + Fun = fun(Check, Acc) -> + case run_check(Check, Message) of + true -> {cont, Acc}; + false -> {halt, FailureAction} + end + end, + emqx_utils:foldl_while(Fun, _Passed = ok, Checks); +run_validation(#{strategy := any_pass} = Validation, Message) -> + #{ + checks := Checks, + failure_action := FailureAction + } = Validation, + case lists:any(fun(C) -> run_check(C, Message) end, Checks) of + true -> + ok; + false -> + FailureAction + end. + +run_check(#{type := sql} = Check, Message) -> + evaluate_sql_check(Check, Message); +run_check(Check, Message) -> + evaluate_schema_check(Check, Message). diff --git a/apps/emqx_message_validation/src/emqx_message_validation_app.erl b/apps/emqx_message_validation/src/emqx_message_validation_app.erl new file mode 100644 index 000000000..4e566eedd --- /dev/null +++ b/apps/emqx_message_validation/src/emqx_message_validation_app.erl @@ -0,0 +1,32 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_message_validation_app). + +-behaviour(application). + +%% `application' API +-export([start/2, stop/1]). + +%%------------------------------------------------------------------------------ +%% Type declarations +%%------------------------------------------------------------------------------ + +%%------------------------------------------------------------------------------ +%% `application' API +%%------------------------------------------------------------------------------ + +-spec start(application:start_type(), term()) -> {ok, pid()}. +start(_Type, _Args) -> + {ok, Sup} = emqx_message_validation_sup:start_link(), + ok = emqx_message_validation:add_handler(), + ok = emqx_message_validation:register_hooks(), + ok = emqx_message_validation:load(), + {ok, Sup}. + +-spec stop(term()) -> ok. +stop(_State) -> + ok = emqx_message_validation:unload(), + ok = emqx_message_validation:unregister_hooks(), + ok = emqx_message_validation:remove_handler(), + ok. diff --git a/apps/emqx_message_validation/src/emqx_message_validation_http_api.erl b/apps/emqx_message_validation/src/emqx_message_validation_http_api.erl new file mode 100644 index 000000000..5cbb911df --- /dev/null +++ b/apps/emqx_message_validation/src/emqx_message_validation_http_api.erl @@ -0,0 +1,385 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_message_validation_http_api). + +-behaviour(minirest_api). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_utils/include/emqx_utils_api.hrl"). + +%% `minirest' and `minirest_trails' API +-export([ + namespace/0, + api_spec/0, + fields/1, + paths/0, + schema/1 +]). + +%% `minirest' handlers +-export([ + '/message_validations'/2, + '/message_validations/:name'/2, + '/message_validations/:name/move'/2 +]). + +%%------------------------------------------------------------------------------------------------- +%% Type definitions +%%------------------------------------------------------------------------------------------------- + +-define(TAGS, [<<"Message Validation">>]). + +%%------------------------------------------------------------------------------------------------- +%% `minirest' and `minirest_trails' API +%%------------------------------------------------------------------------------------------------- + +namespace() -> "message_validation_http_api". + +api_spec() -> + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). + +paths() -> + [ + "/message_validations", + "/message_validations/:name", + "/message_validations/:name/move" + ]. + +schema("/message_validations") -> + #{ + 'operationId' => '/message_validations', + get => #{ + tags => ?TAGS, + summary => <<"List validations">>, + description => ?DESC("list_validations"), + responses => + #{ + 200 => + emqx_dashboard_swagger:schema_with_examples( + hoconsc:array( + emqx_message_validation_schema:api_schema(list) + ), + #{ + sample => + #{value => example_return_list()} + } + ) + } + }, + post => #{ + tags => ?TAGS, + summary => <<"Append a new validation">>, + description => ?DESC("append_validation"), + 'requestBody' => emqx_dashboard_swagger:schema_with_examples( + emqx_message_validation_schema:api_schema(post), + example_input_create() + ), + responses => + #{ + 201 => + emqx_dashboard_swagger:schema_with_examples( + emqx_message_validation_schema:api_schema(post), + example_return_create() + ), + 400 => error_schema('ALREADY_EXISTS', "Validation already exists") + } + }, + put => #{ + tags => ?TAGS, + summary => <<"Update a validation">>, + description => ?DESC("update_validation"), + 'requestBody' => emqx_dashboard_swagger:schema_with_examples( + emqx_message_validation_schema:api_schema(put), + example_input_update() + ), + responses => + #{ + 200 => + emqx_dashboard_swagger:schema_with_examples( + emqx_message_validation_schema:api_schema(put), + example_return_update() + ), + 404 => error_schema('NOT_FOUND', "Validation not found"), + 400 => error_schema('BAD_REQUEST', "Bad params") + } + } + }; +schema("/message_validations/:name") -> + #{ + 'operationId' => '/message_validations/:name', + get => #{ + tags => ?TAGS, + summary => <<"Lookup a validation">>, + description => ?DESC("lookup_validation"), + parameters => [param_path_name()], + responses => + #{ + 200 => + emqx_dashboard_swagger:schema_with_examples( + hoconsc:array( + emqx_message_validation_schema:api_schema(lookup) + ), + #{ + sample => + #{value => example_return_lookup()} + } + ), + 404 => error_schema('NOT_FOUND', "Validation not found") + } + }, + delete => #{ + tags => ?TAGS, + summary => <<"Delete a validation">>, + description => ?DESC("delete_validation"), + parameters => [param_path_name()], + responses => + #{ + 204 => <<"Validation deleted">>, + 404 => error_schema('NOT_FOUND', "Validation not found") + } + } + }; +schema("/message_validations/:name/move") -> + #{ + 'operationId' => '/message_validations/:name/move', + post => #{ + tags => ?TAGS, + summary => <<"Change the order of a validation">>, + description => ?DESC("move_validation"), + parameters => [param_path_name()], + 'requestBody' => + emqx_dashboard_swagger:schema_with_examples( + hoconsc:union(fun position_union_member_selector/1), + example_position() + ), + responses => + #{ + 204 => <<"No Content">>, + 400 => error_schema('BAD_REQUEST', <<"Bad request">>), + 404 => error_schema('NOT_FOUND', "Validation not found") + } + } + }. + +param_path_name() -> + {name, + mk( + binary(), + #{ + in => path, + required => true, + example => <<"my_validation">>, + desc => ?DESC("param_path_name") + } + )}. + +fields(front) -> + [{position, mk(front, #{default => front, required => true, in => body})}]; +fields(rear) -> + [{position, mk(rear, #{default => rear, required => true, in => body})}]; +fields('after') -> + [ + {position, mk('after', #{default => 'after', required => true, in => body})}, + {validation, mk(binary(), #{required => true, in => body})} + ]; +fields(before) -> + [ + {position, mk(before, #{default => before, required => true, in => body})}, + {validation, mk(binary(), #{required => true, in => body})} + ]. + +%%------------------------------------------------------------------------------------------------- +%% `minirest' handlers +%%------------------------------------------------------------------------------------------------- + +'/message_validations'(get, _Params) -> + ?OK(emqx_message_validation:list()); +'/message_validations'(post, #{body := Params = #{<<"name">> := Name}}) -> + with_validation( + Name, + return(?BAD_REQUEST('ALREADY_EXISTS', <<"Validation already exists">>)), + fun() -> + case emqx_message_validation:insert(Params) of + {ok, _} -> + {ok, Res} = emqx_message_validation:lookup(Name), + {201, Res}; + {error, Error} -> + ?BAD_REQUEST(Error) + end + end + ); +'/message_validations'(put, #{body := Params = #{<<"name">> := Name}}) -> + with_validation( + Name, + fun() -> + case emqx_message_validation:update(Params) of + {ok, _} -> + {ok, Res} = emqx_message_validation:lookup(Name), + {200, Res}; + {error, Error} -> + ?BAD_REQUEST(Error) + end + end, + not_found() + ). + +'/message_validations/:name'(get, #{bindings := #{name := Name}}) -> + with_validation( + Name, + fun(Validation) -> ?OK(Validation) end, + not_found() + ); +'/message_validations/:name'(delete, #{bindings := #{name := Name}}) -> + with_validation( + Name, + fun() -> + case emqx_message_validation:delete(Name) of + {ok, _} -> + ?NO_CONTENT; + {error, Error} -> + ?BAD_REQUEST(Error) + end + end, + not_found() + ). + +'/message_validations/:name/move'(post, #{bindings := #{name := Name}, body := Body}) -> + with_validation( + Name, + fun() -> + do_move(Name, parse_position(Body)) + end, + not_found(Name) + ). + +%%------------------------------------------------------------------------------------------------- +%% Internal fns +%%------------------------------------------------------------------------------------------------- + +ref(Struct) -> hoconsc:ref(?MODULE, Struct). +mk(Type, Opts) -> hoconsc:mk(Type, Opts). + +example_input_create() -> + %% TODO + #{}. + +example_input_update() -> + %% TODO + #{}. + +example_return_list() -> + %% TODO + []. + +example_return_create() -> + %% TODO + #{}. + +example_return_update() -> + %% TODO + #{}. + +example_return_lookup() -> + %% TODO + #{}. + +example_position() -> + %% TODO + #{}. + +error_schema(Code, Message) when is_atom(Code) -> + error_schema([Code], Message); +error_schema(Codes, Message) when is_list(Message) -> + error_schema(Codes, list_to_binary(Message)); +error_schema(Codes, Message) when is_list(Codes) andalso is_binary(Message) -> + emqx_dashboard_swagger:error_codes(Codes, Message). + +position_union_member_selector(all_union_members) -> + position_refs(); +position_union_member_selector({value, V}) -> + position_refs(V). + +position_refs() -> + []. + +position_types() -> + [ + front, + rear, + 'after', + before + ]. + +position_refs(#{<<"position">> := <<"front">>}) -> + [ref(front)]; +position_refs(#{<<"position">> := <<"rear">>}) -> + [ref(rear)]; +position_refs(#{<<"position">> := <<"after">>}) -> + [ref('after')]; +position_refs(#{<<"position">> := <<"before">>}) -> + [ref(before)]; +position_refs(_) -> + Expected = lists:join(" | ", [atom_to_list(T) || T <- position_types()]), + throw(#{ + field_name => position, + expected => iolist_to_binary(Expected) + }). + +%% Schema is already checked, so we don't need to do further validation. +parse_position(#{<<"position">> := <<"front">>}) -> + front; +parse_position(#{<<"position">> := <<"rear">>}) -> + rear; +parse_position(#{<<"position">> := <<"after">>, <<"validation">> := OtherValidationName}) -> + {'after', OtherValidationName}; +parse_position(#{<<"position">> := <<"before">>, <<"validation">> := OtherValidationName}) -> + {before, OtherValidationName}. + +do_move(ValidationName, {_, OtherValidationName} = Position) -> + with_validation( + OtherValidationName, + fun() -> + case emqx_message_validation:move(ValidationName, Position) of + {ok, _} -> + ?NO_CONTENT; + {error, Error} -> + ?BAD_REQUEST(Error) + end + end, + bad_request_not_found(OtherValidationName) + ); +do_move(ValidationName, Position) -> + case emqx_message_validation:move(ValidationName, Position) of + {ok, _} -> + ?NO_CONTENT; + {error, Error} -> + ?BAD_REQUEST(Error) + end. + +with_validation(Name, FoundFn, NotFoundFn) -> + case emqx_message_validation:lookup(Name) of + {ok, Validation} -> + {arity, Arity} = erlang:fun_info(FoundFn, arity), + case Arity of + 1 -> FoundFn(Validation); + 0 -> FoundFn() + end; + {error, not_found} -> + NotFoundFn() + end. + +return(Response) -> + fun() -> Response end. + +not_found() -> + return(?NOT_FOUND(<<"Validation not found">>)). + +not_found(Name) -> + return(?NOT_FOUND(<<"Validation not found: ", Name/binary>>)). + +%% After we found the base validation, but not the other one being referenced in a move. +bad_request_not_found(Name) -> + return(?BAD_REQUEST(<<"Validation not found: ", Name/binary>>)). diff --git a/apps/emqx_message_validation/src/emqx_message_validation_registry.erl b/apps/emqx_message_validation/src/emqx_message_validation_registry.erl new file mode 100644 index 000000000..125429565 --- /dev/null +++ b/apps/emqx_message_validation/src/emqx_message_validation_registry.erl @@ -0,0 +1,225 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_message_validation_registry). + +-behaviour(gen_server). + +%% API +-export([ + lookup/1, + insert/2, + update/3, + delete/1, + reindex_positions/1, + + matching_validations/1, + + start_link/0, + metrics_worker_spec/0 +]). + +%% `gen_server' API +-export([ + init/1, + handle_call/3, + handle_cast/2 +]). + +%%------------------------------------------------------------------------------ +%% Type declarations +%%------------------------------------------------------------------------------ + +-define(VALIDATION_TOPIC_INDEX, emqx_message_validation_index). +-define(VALIDATION_TAB, emqx_message_validation_tab). + +-define(METRIC_NAME, message_validation). +-define(METRICS, [ + 'matched', + 'passed', + 'failed' +]). +-define(RATE_METRICS, ['matched']). + +-type validation_name() :: binary(). +-type validation() :: _TODO. +-type position_index() :: pos_integer(). + +-record(reindex_positions, {validations :: [validation()]}). +-record(insert, {pos :: position_index(), validation :: validation()}). +-record(update, {old :: validation(), pos :: position_index(), new :: validation()}). +-record(delete, {validation :: validation()}). + +%%------------------------------------------------------------------------------ +%% API +%%------------------------------------------------------------------------------ + +-spec start_link() -> gen_server:start_ret(). +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +-spec lookup(validation_name()) -> + {ok, validation()} | {error, not_found}. +lookup(Name) -> + case emqx_utils_ets:lookup_value(?VALIDATION_TAB, Name, undefined) of + undefined -> + {error, not_found}; + Validation -> + {ok, Validation} + end. + +-spec reindex_positions([validation()]) -> ok. +reindex_positions(Validations) -> + gen_server:call(?MODULE, #reindex_positions{validations = Validations}, infinity). + +-spec insert(position_index(), validation()) -> ok. +insert(Pos, Validation) -> + gen_server:call(?MODULE, #insert{pos = Pos, validation = Validation}, infinity). + +-spec update(validation(), position_index(), validation()) -> ok. +update(Old, Pos, New) -> + gen_server:call(?MODULE, #update{old = Old, pos = Pos, new = New}, infinity). + +-spec delete(validation()) -> ok. +delete(Validation) -> + gen_server:call(?MODULE, #delete{validation = Validation}, infinity). + +%% @doc Returns a list of matching validation names, sorted by their configuration order. +-spec matching_validations(emqx_types:topic()) -> [validation()]. +matching_validations(Topic) -> + Validations0 = [ + {Pos, Validation} + || M <- emqx_topic_index:matches(Topic, ?VALIDATION_TOPIC_INDEX, [unique]), + [Pos] <- [emqx_topic_index:get_record(M, ?VALIDATION_TOPIC_INDEX)], + {ok, Validation} <- [ + lookup(emqx_topic_index:get_id(M)) + ] + ], + Validations1 = lists:sort(fun({Pos1, _V1}, {Pos2, _V2}) -> Pos1 =< Pos2 end, Validations0), + lists:map(fun({_Pos, V}) -> V end, Validations1). + +-spec metrics_worker_spec() -> supervisor:child_spec(). +metrics_worker_spec() -> + emqx_metrics_worker:child_spec(message_validation_metrics, ?METRIC_NAME). + +%%------------------------------------------------------------------------------ +%% `gen_server' API +%%------------------------------------------------------------------------------ + +init(_) -> + create_tables(), + State = #{}, + {ok, State}. + +handle_call(#reindex_positions{validations = Validations}, _From, State) -> + do_reindex_positions(Validations), + {reply, ok, State}; +handle_call(#insert{pos = Pos, validation = Validation}, _From, State) -> + do_insert(Pos, Validation), + {reply, ok, State}; +handle_call(#update{old = OldValidation, pos = Pos, new = NewValidation}, _From, State) -> + ok = do_update(OldValidation, Pos, NewValidation), + {reply, ok, State}; +handle_call(#delete{validation = Validation}, _From, State) -> + do_delete(Validation), + {reply, ok, State}; +handle_call(_Call, _From, State) -> + {reply, ignored, State}. + +handle_cast(_Cast, State) -> + {noreply, State}. + +%%------------------------------------------------------------------------------ +%% Internal fns +%%------------------------------------------------------------------------------ + +create_tables() -> + _ = emqx_utils_ets:new(?VALIDATION_TOPIC_INDEX, [public, ordered_set, {read_concurrency, true}]), + _ = emqx_utils_ets:new(?VALIDATION_TAB, [public, ordered_set, {read_concurrency, true}]), + ok. + +do_reindex_positions(Validations) -> + lists:foreach( + fun({Pos, Validation}) -> + #{ + name := Name, + topics := Topics + } = Validation, + do_insert_into_tab(Name, Validation, Pos), + update_topic_index(Name, Pos, Topics) + end, + lists:enumerate(Validations) + ). + +do_insert(Pos, Validation) -> + #{ + name := Name, + topics := Topics + } = Validation, + maybe_create_metrics(Name), + do_insert_into_tab(Name, Validation, Pos), + update_topic_index(Name, Pos, Topics), + ok. + +do_update(OldValidation, Pos, NewValidation) -> + #{topics := OldTopics} = OldValidation, + #{ + name := Name, + topics := NewTopics + } = NewValidation, + maybe_create_metrics(Name), + do_insert_into_tab(Name, NewValidation, Pos), + delete_topic_index(Name, OldTopics), + update_topic_index(Name, Pos, NewTopics), + ok. + +do_delete(Validation) -> + #{ + name := Name, + topics := Topics + } = Validation, + ets:delete(?VALIDATION_TAB, Name), + delete_topic_index(Name, Topics), + drop_metrics(Name), + ok. + +do_insert_into_tab(Name, Validation0, Pos) -> + Validation = transform_validation(Validation0#{pos => Pos}), + ets:insert(?VALIDATION_TAB, {Name, Validation}), + ok. + +maybe_create_metrics(Name) -> + case emqx_metrics_worker:has_metrics(?METRIC_NAME, Name) of + true -> + ok = emqx_metrics_worker:reset_metrics(?METRIC_NAME, Name); + false -> + ok = emqx_metrics_worker:create_metrics(?METRIC_NAME, Name, ?METRICS, ?RATE_METRICS) + end. + +drop_metrics(Name) -> + ok = emqx_metrics_worker:clear_metrics(?METRIC_NAME, Name). + +update_topic_index(Name, Pos, Topics) -> + lists:foreach( + fun(Topic) -> + true = emqx_topic_index:insert(Topic, Name, Pos, ?VALIDATION_TOPIC_INDEX) + end, + Topics + ). + +delete_topic_index(Name, Topics) -> + lists:foreach( + fun(Topic) -> + true = emqx_topic_index:delete(Topic, Name, ?VALIDATION_TOPIC_INDEX) + end, + Topics + ). + +transform_validation(Validation = #{checks := Checks}) -> + Validation#{checks := lists:map(fun transform_check/1, Checks)}. + +transform_check(#{type := sql, sql := SQL}) -> + {ok, Check} = emqx_message_validation:parse_sql_check(SQL), + Check; +transform_check(Check) -> + Check. diff --git a/apps/emqx_message_validation/src/emqx_message_validation_schema.erl b/apps/emqx_message_validation/src/emqx_message_validation_schema.erl new file mode 100644 index 000000000..998e11302 --- /dev/null +++ b/apps/emqx_message_validation/src/emqx_message_validation_schema.erl @@ -0,0 +1,206 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_message_validation_schema). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +%% `hocon_schema' API +-export([ + namespace/0, + roots/0, + fields/1 +]). + +%% `minirest_trails' API +-export([ + api_schema/1 +]). + +%%------------------------------------------------------------------------------ +%% Type declarations +%%------------------------------------------------------------------------------ + +%%------------------------------------------------------------------------------ +%% `hocon_schema' API +%%------------------------------------------------------------------------------ + +namespace() -> message_validation. + +roots() -> + [{message_validation, mk(ref(message_validation), #{importance => ?IMPORTANCE_HIDDEN})}]. + +fields(message_validation) -> + [ + {validations, + mk( + hoconsc:array(ref(validation)), + #{ + default => [], + desc => ?DESC("validations"), + validator => fun validate_unique_names/1 + } + )} + ]; +fields(validation) -> + [ + {tags, emqx_schema:tags_schema()}, + {description, emqx_schema:description_schema()}, + {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {name, mk(binary(), #{required => true, desc => ?DESC("name")})}, + {topics, + mk( + hoconsc:union([binary(), hoconsc:array(binary())]), + #{ + desc => ?DESC("topics"), + converter => fun ensure_array/2, + required => true + } + )}, + {strategy, + mk( + hoconsc:enum([any_pass, all_pass]), + #{desc => ?DESC("strategy"), required => true} + )}, + {failure_action, + mk( + hoconsc:enum([drop, disconnect]), + #{desc => ?DESC("failure_action"), required => true} + )}, + {log_failure_at, + mk( + hoconsc:enum([error, warning, notice, info, debug]), + #{desc => ?DESC("log_failure_at"), default => info} + )}, + {checks, + mk( + hoconsc:array( + hoconsc:union(fun checks_union_member_selector/1) + ), + #{ + required => true, + desc => ?DESC("checks"), + validator => fun + ([]) -> + {error, "at least one check must be defined"}; + (_) -> + ok + end + } + )} + ]; +fields(check_sql) -> + [ + {type, mk(sql, #{default => sql, desc => ?DESC("check_sql_type")})}, + {sql, + mk(binary(), #{ + required => true, + desc => ?DESC("check_sql_type"), + validator => fun validate_sql/1 + })} + ]; +fields(check_json) -> + [ + {type, mk(json, #{default => json, desc => ?DESC("check_json_type")})}, + {schema, mk(binary(), #{required => true, desc => ?DESC("check_json_type")})} + ]; +fields(check_protobuf) -> + [ + {type, mk(protobuf, #{default => protobuf, desc => ?DESC("check_protobuf_type")})}, + {schema, mk(binary(), #{required => true, desc => ?DESC("check_protobuf_schema")})}, + {message_name, + mk(binary(), #{required => true, desc => ?DESC("check_protobuf_message_name")})} + ]; +fields(check_avro) -> + [ + {type, mk(avro, #{default => avro, desc => ?DESC("check_avro_type")})}, + {schema, mk(binary(), #{required => true, desc => ?DESC("check_avro_schema")})} + ]. + +checks_union_member_selector(all_union_members) -> + checks_refs(); +checks_union_member_selector({value, V}) -> + checks_refs(V). + +checks_refs() -> + [ref(CheckType) || CheckType <- check_types()]. + +check_types() -> + [ + check_sql, + check_json, + check_avro, + check_protobuf + ]. + +checks_refs(#{<<"type">> := TypeAtom} = Value) when is_atom(TypeAtom) -> + checks_refs(Value#{<<"type">> := atom_to_binary(TypeAtom)}); +checks_refs(#{<<"type">> := <<"sql">>}) -> + [ref(check_sql)]; +checks_refs(#{<<"type">> := <<"json">>}) -> + [ref(check_json)]; +checks_refs(#{<<"type">> := <<"avro">>}) -> + [ref(check_avro)]; +checks_refs(#{<<"type">> := <<"protobuf">>}) -> + [ref(check_protobuf)]; +checks_refs(_Value) -> + Expected = lists:join( + " | ", + [ + Name + || T <- check_types(), + "check_" ++ Name <- [atom_to_list(T)] + ] + ), + throw(#{ + field_name => type, + expected => iolist_to_binary(Expected) + }). + +%%------------------------------------------------------------------------------ +%% `minirest_trails' API +%%------------------------------------------------------------------------------ + +api_schema(list) -> + hoconsc:array(ref(validation)); +api_schema(lookup) -> + ref(validation); +api_schema(post) -> + ref(validation); +api_schema(put) -> + ref(validation). + +%%------------------------------------------------------------------------------ +%% Internal exports +%%------------------------------------------------------------------------------ + +%%------------------------------------------------------------------------------ +%% Internal fns +%%------------------------------------------------------------------------------ + +mk(Type, Meta) -> hoconsc:mk(Type, Meta). +ref(Name) -> hoconsc:ref(?MODULE, Name). + +ensure_array(undefined, _) -> undefined; +ensure_array(L, _) when is_list(L) -> L; +ensure_array(B, _) -> [B]. + +validate_sql(SQL) -> + case emqx_message_validation:parse_sql_check(SQL) of + {ok, _Parsed} -> + ok; + Error = {error, _} -> + Error + end. + +validate_unique_names(Validations0) -> + Validations = emqx_utils_maps:binary_key_map(Validations0), + do_validate_unique_names(Validations, #{}). + +do_validate_unique_names(_Validations = [], _Acc) -> + ok; +do_validate_unique_names([#{<<"name">> := Name} | _Rest], Acc) when is_map_key(Name, Acc) -> + {error, <<"duplicated name: ", Name/binary>>}; +do_validate_unique_names([#{<<"name">> := Name} | Rest], Acc) -> + do_validate_unique_names(Rest, Acc#{Name => true}). diff --git a/apps/emqx_message_validation/src/emqx_message_validation_sup.erl b/apps/emqx_message_validation/src/emqx_message_validation_sup.erl new file mode 100644 index 000000000..2e8d6b8c6 --- /dev/null +++ b/apps/emqx_message_validation/src/emqx_message_validation_sup.erl @@ -0,0 +1,47 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_message_validation_sup). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% `supervisor' API +-export([init/1]). + +%%------------------------------------------------------------------------------ +%% API +%%------------------------------------------------------------------------------ + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +%%------------------------------------------------------------------------------ +%% `supervisor' API +%%------------------------------------------------------------------------------ + +init([]) -> + Registry = worker_spec(emqx_message_validation_registry), + Metrics = emqx_message_validation_registry:metrics_worker_spec(), + SupFlags = #{ + strategy => one_for_one, + intensity => 10, + period => 10 + }, + ChildSpecs = [Metrics, Registry], + {ok, {SupFlags, ChildSpecs}}. + +%%------------------------------------------------------------------------------ +%% Internal fns +%%------------------------------------------------------------------------------ + +worker_spec(Mod) -> + #{ + id => Mod, + start => {Mod, start_link, []}, + restart => permanent, + shutdown => 5_000, + type => worker + }. diff --git a/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl b/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl new file mode 100644 index 000000000..9c1289e88 --- /dev/null +++ b/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl @@ -0,0 +1,690 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_message_validation_http_api_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx/include/asserts.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). + +-import(emqx_common_test_helpers, [on_exit/1]). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + emqx_common_test_helpers:clear_screen(), + Apps = emqx_cth_suite:start( + lists:flatten( + [ + emqx, + emqx_conf, + emqx_rule_engine, + emqx_message_validation, + emqx_management, + emqx_mgmt_api_test_util:emqx_dashboard(), + emqx_schema_registry + ] + ), + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + {ok, _} = emqx_common_test_http:create_default_app(), + [{apps, Apps} | Config]. + +end_per_suite(Config) -> + Apps = ?config(apps, Config), + ok = emqx_cth_suite:stop(Apps), + ok. + +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(_TestCase, _Config) -> + clear_all_validations(), + emqx_common_test_helpers:call_janitor(), + ok. + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +-define(assertIndexOrder(EXPECTED, TOPIC), assert_index_order(EXPECTED, TOPIC, #{line => ?LINE})). + +clear_all_validations() -> + lists:foreach( + fun(#{name := Name}) -> + {ok, _} = emqx_message_validation:delete(Name) + end, + emqx_message_validation:list() + ). + +maybe_json_decode(X) -> + case emqx_utils_json:safe_decode(X, [return_maps]) of + {ok, Decoded} -> Decoded; + {error, _} -> X + end. + +request(Method, Path, Params) -> + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Opts = #{return_all => true}, + case emqx_mgmt_api_test_util:request_api(Method, Path, "", AuthHeader, Params, Opts) of + {ok, {Status, Headers, Body0}} -> + Body = maybe_json_decode(Body0), + {ok, {Status, Headers, Body}}; + {error, {Status, Headers, Body0}} -> + Body = + case emqx_utils_json:safe_decode(Body0, [return_maps]) of + {ok, Decoded0 = #{<<"message">> := Msg0}} -> + Msg = maybe_json_decode(Msg0), + Decoded0#{<<"message">> := Msg}; + {ok, Decoded0} -> + Decoded0; + {error, _} -> + Body0 + end, + {error, {Status, Headers, Body}}; + Error -> + Error + end. + +validation(Name, Checks) -> + validation(Name, Checks, _Overrides = #{}). + +validation(Name, Checks, Overrides) -> + Default = #{ + <<"tags">> => [<<"some">>, <<"tags">>], + <<"description">> => <<"my validation">>, + <<"enable">> => true, + <<"name">> => Name, + <<"topics">> => <<"t/+">>, + <<"strategy">> => <<"all_pass">>, + <<"failure_action">> => <<"drop">>, + <<"log_failure_at">> => <<"warning">>, + <<"checks">> => Checks + }, + emqx_utils_maps:deep_merge(Default, Overrides). + +sql_check() -> + sql_check(<<"select * where true">>). + +sql_check(SQL) -> + #{ + <<"type">> => <<"sql">>, + <<"sql">> => SQL + }. + +schema_check(Type, SerdeName) -> + schema_check(Type, SerdeName, _Overrides = #{}). + +schema_check(Type, SerdeName, Overrides) -> + emqx_utils_maps:deep_merge( + #{ + <<"type">> => emqx_utils_conv:bin(Type), + <<"schema">> => SerdeName + }, + Overrides + ). + +api_root() -> "message_validations". + +simplify_result(Res) -> + case Res of + {error, {{_, Status, _}, _, Body}} -> + {Status, Body}; + {ok, {{_, Status, _}, _, Body}} -> + {Status, Body} + end. + +list() -> + Path = emqx_mgmt_api_test_util:api_path([api_root()]), + Res = request(get, Path, _Params = []), + ct:pal("list result:\n ~p", [Res]), + simplify_result(Res). + +lookup(Name) -> + Path = emqx_mgmt_api_test_util:api_path([api_root(), Name]), + Res = request(get, Path, _Params = []), + ct:pal("lookup ~s result:\n ~p", [Name, Res]), + simplify_result(Res). + +insert(Params) -> + Path = emqx_mgmt_api_test_util:api_path([api_root()]), + Res = request(post, Path, Params), + ct:pal("insert result:\n ~p", [Res]), + simplify_result(Res). + +update(Params) -> + Path = emqx_mgmt_api_test_util:api_path([api_root()]), + Res = request(put, Path, Params), + ct:pal("update result:\n ~p", [Res]), + simplify_result(Res). + +delete(Name) -> + Path = emqx_mgmt_api_test_util:api_path([api_root(), Name]), + Res = request(delete, Path, _Params = []), + ct:pal("delete result:\n ~p", [Res]), + simplify_result(Res). + +move(Name, Pos) -> + Path = emqx_mgmt_api_test_util:api_path([api_root(), Name, "move"]), + Res = request(post, Path, Pos), + ct:pal("move result:\n ~p", [Res]), + simplify_result(Res). + +connect(ClientId) -> + connect(ClientId, _IsPersistent = false). + +connect(ClientId, IsPersistent) -> + Properties = emqx_utils_maps:put_if(#{}, 'Session-Expiry-Interval', 30, IsPersistent), + {ok, Client} = emqtt:start_link([ + {clean_start, true}, + {clientid, ClientId}, + {properties, Properties}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(Client), + on_exit(fun() -> catch emqtt:stop(Client) end), + Client. + +publish(Client, Topic, Payload) -> + publish(Client, Topic, Payload, _QoS = 0). + +publish(Client, Topic, {raw, Payload}, QoS) -> + case emqtt:publish(Client, Topic, Payload, QoS) of + ok -> ok; + {ok, _} -> ok; + Err -> Err + end; +publish(Client, Topic, Payload, QoS) -> + case emqtt:publish(Client, Topic, emqx_utils_json:encode(Payload), QoS) of + ok -> ok; + {ok, _} -> ok; + Err -> Err + end. + +json_valid_payloads() -> + [ + #{i => 10, s => <<"s">>}, + #{i => 10} + ]. + +json_invalid_payloads() -> + [ + #{i => <<"wrong type">>}, + #{x => <<"unknown property">>} + ]. + +json_create_serde(SerdeName) -> + Source = #{ + type => object, + properties => #{ + i => #{type => integer}, + s => #{type => string} + }, + required => [<<"i">>], + additionalProperties => false + }, + Schema = #{type => json, source => emqx_utils_json:encode(Source)}, + ok = emqx_schema_registry:add_schema(SerdeName, Schema), + on_exit(fun() -> ok = emqx_schema_registry:delete_schema(SerdeName) end), + ok. + +avro_valid_payloads(SerdeName) -> + lists:map( + fun(Payload) -> emqx_schema_registry_serde:encode(SerdeName, Payload) end, + [ + #{i => 10, s => <<"s">>}, + #{i => 10} + ] + ). + +avro_invalid_payloads() -> + [ + emqx_utils_json:encode(#{i => 10, s => <<"s">>}), + <<"">> + ]. + +avro_create_serde(SerdeName) -> + Source = #{ + type => record, + name => <<"test">>, + namespace => <<"emqx.com">>, + fields => [ + #{name => <<"i">>, type => <<"int">>}, + #{name => <<"s">>, type => [<<"null">>, <<"string">>], default => <<"null">>} + ] + }, + Schema = #{type => avro, source => emqx_utils_json:encode(Source)}, + ok = emqx_schema_registry:add_schema(SerdeName, Schema), + on_exit(fun() -> ok = emqx_schema_registry:delete_schema(SerdeName) end), + ok. + +protobuf_valid_payloads(SerdeName, MessageName) -> + lists:map( + fun(Payload) -> emqx_schema_registry_serde:encode(SerdeName, Payload, [MessageName]) end, + [ + #{<<"name">> => <<"some name">>, <<"id">> => 10, <<"email">> => <<"emqx@emqx.io">>}, + #{<<"name">> => <<"some name">>, <<"id">> => 10} + ] + ). + +protobuf_invalid_payloads() -> + [ + emqx_utils_json:encode(#{name => <<"a">>, id => 10, email => <<"email">>}), + <<"not protobuf">> + ]. + +protobuf_create_serde(SerdeName) -> + Source = + << + "message Person {\n" + " required string name = 1;\n" + " required int32 id = 2;\n" + " optional string email = 3;\n" + " }\n" + "message UnionValue {\n" + " oneof u {\n" + " int32 a = 1;\n" + " string b = 2;\n" + " }\n" + "}" + >>, + Schema = #{type => protobuf, source => Source}, + ok = emqx_schema_registry:add_schema(SerdeName, Schema), + on_exit(fun() -> ok = emqx_schema_registry:delete_schema(SerdeName) end), + ok. + +%% Checks that the internal order in the registry/index matches expectation. +assert_index_order(ExpectedOrder, Topic, Comment) -> + ?assertEqual( + ExpectedOrder, + [ + N + || #{name := N} <- emqx_message_validation_registry:matching_validations(Topic) + ], + Comment + ). + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +%% Smoke test where we have a single check and `all_pass' strategy. +t_smoke_test(_Config) -> + Name1 = <<"foo">>, + Check1 = sql_check(<<"select payload.value as x where x > 15">>), + Validation1 = validation(Name1, [Check1]), + {201, _} = insert(Validation1), + + lists:foreach( + fun({QoS, IsPersistent}) -> + ct:pal("qos = ~b, is persistent = ~p", [QoS, IsPersistent]), + C = connect(<<"c1">>, IsPersistent), + {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>), + + {200, _} = update(Validation1), + + ok = publish(C, <<"t/1">>, #{value => 20}, QoS), + ?assertReceive({publish, _}), + ok = publish(C, <<"t/1">>, #{value => 10}, QoS), + ?assertNotReceive({publish, _}), + ok = publish(C, <<"t/1/a">>, #{value => 10}, QoS), + ?assertReceive({publish, _}), + + %% test `disconnect' failure action + Validation2 = validation(Name1, [Check1], #{<<"failure_action">> => <<"disconnect">>}), + {200, _} = update(Validation2), + + unlink(C), + PubRes = publish(C, <<"t/1">>, #{value => 0}, QoS), + case QoS =:= 0 of + true -> + ?assertMatch(ok, PubRes); + false -> + ?assertMatch( + {error, {disconnected, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, _}}, + PubRes + ) + end, + ?assertNotReceive({publish, _}), + ?assertReceive({disconnected, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, _}), + + ok + end, + [ + {QoS, IsPersistent} + || IsPersistent <- [false, true], + QoS <- [0, 1, 2] + ] + ), + + ok. + +t_crud(_Config) -> + ?assertMatch({200, []}, list()), + + Name1 = <<"foo">>, + Validation1 = validation(Name1, [sql_check()]), + + ?assertMatch({201, #{<<"name">> := Name1}}, insert(Validation1)), + ?assertMatch({200, #{<<"name">> := Name1}}, lookup(Name1)), + ?assertMatch({200, [#{<<"name">> := Name1}]}, list()), + %% Duplicated name + ?assertMatch({400, #{<<"code">> := <<"ALREADY_EXISTS">>}}, insert(Validation1)), + + Name2 = <<"bar">>, + Validation2 = validation(Name2, [sql_check()]), + %% Not found + ?assertMatch({404, _}, update(Validation2)), + ?assertMatch({201, _}, insert(Validation2)), + ?assertMatch( + {200, [#{<<"name">> := Name1}, #{<<"name">> := Name2}]}, + list() + ), + ?assertMatch({200, #{<<"name">> := Name2}}, lookup(Name2)), + Validation1b = validation(Name1, [ + sql_check(<<"select * where true">>), + sql_check(<<"select * where false">>) + ]), + ?assertMatch({200, _}, update(Validation1b)), + ?assertMatch({200, #{<<"checks">> := [_, _]}}, lookup(Name1)), + %% order is unchanged + ?assertMatch( + {200, [#{<<"name">> := Name1}, #{<<"name">> := Name2}]}, + list() + ), + + ?assertMatch({204, _}, delete(Name1)), + ?assertMatch({404, _}, lookup(Name1)), + ?assertMatch({200, [#{<<"name">> := Name2}]}, list()), + ?assertMatch({404, _}, update(Validation1)), + + ok. + +t_move(_Config) -> + lists:foreach( + fun(Pos) -> + ?assertMatch({404, _}, move(<<"nonexistent_validation">>, Pos)) + end, + [ + #{<<"position">> => <<"front">>}, + #{<<"position">> => <<"rear">>}, + #{<<"position">> => <<"after">>, <<"validation">> => <<"also_non_existent">>}, + #{<<"position">> => <<"before">>, <<"validation">> => <<"also_non_existent">>} + ] + ), + + Topic = <<"t">>, + + Name1 = <<"foo">>, + Validation1 = validation(Name1, [sql_check()], #{<<"topics">> => Topic}), + {201, _} = insert(Validation1), + + %% bogus positions + lists:foreach( + fun(Pos) -> + ?assertMatch( + {400, #{<<"message">> := #{<<"kind">> := <<"validation_error">>}}}, + move(Name1, Pos) + ) + end, + [ + #{<<"position">> => <<"foo">>}, + #{<<"position">> => <<"bar">>, <<"validation">> => Name1} + ] + ), + + lists:foreach( + fun(Pos) -> + ?assertMatch({204, _}, move(Name1, Pos)), + ?assertMatch({200, [#{<<"name">> := Name1}]}, list()) + end, + [ + #{<<"position">> => <<"front">>}, + #{<<"position">> => <<"rear">>} + ] + ), + + lists:foreach( + fun(Pos) -> + ?assertMatch({400, _}, move(Name1, Pos)) + end, + [ + #{<<"position">> => <<"after">>, <<"validation">> => <<"nonexistent">>}, + #{<<"position">> => <<"before">>, <<"validation">> => <<"nonexistent">>} + ] + ), + + Name2 = <<"bar">>, + Validation2 = validation(Name2, [sql_check()], #{<<"topics">> => Topic}), + {201, _} = insert(Validation2), + ?assertMatch({200, [#{<<"name">> := Name1}, #{<<"name">> := Name2}]}, list()), + ?assertIndexOrder([Name1, Name2], Topic), + + ?assertMatch({204, _}, move(Name1, #{<<"position">> => <<"rear">>})), + ?assertMatch({200, [#{<<"name">> := Name2}, #{<<"name">> := Name1}]}, list()), + ?assertIndexOrder([Name2, Name1], Topic), + + ?assertMatch({204, _}, move(Name1, #{<<"position">> => <<"front">>})), + ?assertMatch({200, [#{<<"name">> := Name1}, #{<<"name">> := Name2}]}, list()), + ?assertIndexOrder([Name1, Name2], Topic), + + Name3 = <<"baz">>, + Validation3 = validation(Name3, [sql_check()], #{<<"topics">> => Topic}), + {201, _} = insert(Validation3), + ?assertMatch( + {200, [#{<<"name">> := Name1}, #{<<"name">> := Name2}, #{<<"name">> := Name3}]}, + list() + ), + ?assertIndexOrder([Name1, Name2, Name3], Topic), + + ?assertMatch( + {204, _}, move(Name3, #{<<"position">> => <<"before">>, <<"validation">> => Name2}) + ), + ?assertMatch( + {200, [#{<<"name">> := Name1}, #{<<"name">> := Name3}, #{<<"name">> := Name2}]}, + list() + ), + ?assertIndexOrder([Name1, Name3, Name2], Topic), + + ?assertMatch( + {204, _}, move(Name1, #{<<"position">> => <<"after">>, <<"validation">> => Name2}) + ), + ?assertMatch( + {200, [#{<<"name">> := Name3}, #{<<"name">> := Name2}, #{<<"name">> := Name1}]}, + list() + ), + ?assertIndexOrder([Name3, Name2, Name1], Topic), + + ok. + +%% Check the `all_pass' strategy +t_all_pass(_Config) -> + Name1 = <<"foo">>, + Check1 = sql_check(<<"select payload.x as x where x > 5">>), + Check2 = sql_check(<<"select payload.x as x where x > 10">>), + Validation1 = validation(Name1, [Check1, Check2], #{<<"strategy">> => <<"all_pass">>}), + {201, _} = insert(Validation1), + + C = connect(<<"c1">>), + {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>), + ok = publish(C, <<"t/1">>, #{x => 0}), + ?assertNotReceive({publish, _}), + ok = publish(C, <<"t/1">>, #{x => 7}), + ?assertNotReceive({publish, _}), + ok = publish(C, <<"t/1">>, #{x => 11}), + ?assertReceive({publish, _}), + + ok. + +%% Check the `any_pass' strategy +t_any_pass(_Config) -> + Name1 = <<"foo">>, + Check1 = sql_check(<<"select payload.x as x where x > 5">>), + Check2 = sql_check(<<"select payload.x as x where x > 10">>), + Validation1 = validation(Name1, [Check1, Check2], #{<<"strategy">> => <<"any_pass">>}), + {201, _} = insert(Validation1), + + C = connect(<<"c1">>), + {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>), + + ok = publish(C, <<"t/1">>, #{x => 11}), + ?assertReceive({publish, _}), + ok = publish(C, <<"t/1">>, #{x => 7}), + ?assertReceive({publish, _}), + ok = publish(C, <<"t/1">>, #{x => 0}), + ?assertNotReceive({publish, _}), + + ok. + +%% Checks that multiple validations are run in order. +t_multiple_validations(_Config) -> + Name1 = <<"foo">>, + Check1 = sql_check(<<"select payload.x as x, payload.y as y where x > 10 or y > 0">>), + Validation1 = validation(Name1, [Check1], #{<<"failure_action">> => <<"drop">>}), + {201, _} = insert(Validation1), + + Name2 = <<"bar">>, + Check2 = sql_check(<<"select payload.x as x where x > 5">>), + Validation2 = validation(Name2, [Check2], #{<<"failure_action">> => <<"disconnect">>}), + {201, _} = insert(Validation2), + + C = connect(<<"c1">>), + {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>), + + ok = publish(C, <<"t/1">>, #{x => 11, y => 1}), + ?assertReceive({publish, _}), + ok = publish(C, <<"t/1">>, #{x => 7, y => 0}), + ?assertNotReceive({publish, _}), + ?assertNotReceive({disconnected, _, _}), + unlink(C), + ok = publish(C, <<"t/1">>, #{x => 0, y => 1}), + ?assertNotReceive({publish, _}), + ?assertReceive({disconnected, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, _}), + + ok. + +t_schema_check_non_existent_serde(_Config) -> + SerdeName = <<"idontexist">>, + Name1 = <<"foo">>, + Check1 = schema_check(json, SerdeName), + Validation1 = validation(Name1, [Check1]), + {201, _} = insert(Validation1), + + C = connect(<<"c1">>), + {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>), + + ok = publish(C, <<"t/1">>, #{i => 10, s => <<"s">>}), + ?assertNotReceive({publish, _}), + + ok. + +t_schema_check_json(_Config) -> + SerdeName = <<"myserde">>, + json_create_serde(SerdeName), + + Name1 = <<"foo">>, + Check1 = schema_check(json, SerdeName), + Validation1 = validation(Name1, [Check1]), + {201, _} = insert(Validation1), + + C = connect(<<"c1">>), + {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>), + + lists:foreach( + fun(Payload) -> + ok = publish(C, <<"t/1">>, Payload), + ?assertReceive({publish, _}) + end, + json_valid_payloads() + ), + lists:foreach( + fun(Payload) -> + ok = publish(C, <<"t/1">>, Payload), + ?assertNotReceive({publish, _}) + end, + json_invalid_payloads() + ), + + ok. + +t_schema_check_avro(_Config) -> + SerdeName = <<"myserde">>, + avro_create_serde(SerdeName), + + Name1 = <<"foo">>, + Check1 = schema_check(avro, SerdeName), + Validation1 = validation(Name1, [Check1]), + {201, _} = insert(Validation1), + + C = connect(<<"c1">>), + {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>), + + lists:foreach( + fun(Payload) -> + ok = publish(C, <<"t/1">>, {raw, Payload}), + ?assertReceive({publish, _}) + end, + avro_valid_payloads(SerdeName) + ), + lists:foreach( + fun(Payload) -> + ok = publish(C, <<"t/1">>, {raw, Payload}), + ?assertNotReceive({publish, _}) + end, + avro_invalid_payloads() + ), + + ok. + +t_schema_check_protobuf(_Config) -> + SerdeName = <<"myserde">>, + MessageName = <<"Person">>, + protobuf_create_serde(SerdeName), + + Name1 = <<"foo">>, + Check1 = schema_check(protobuf, SerdeName, #{<<"message_name">> => MessageName}), + Validation1 = validation(Name1, [Check1]), + {201, _} = insert(Validation1), + + C = connect(<<"c1">>), + {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>), + + lists:foreach( + fun(Payload) -> + ok = publish(C, <<"t/1">>, {raw, Payload}), + ?assertReceive({publish, _}) + end, + protobuf_valid_payloads(SerdeName, MessageName) + ), + lists:foreach( + fun(Payload) -> + ok = publish(C, <<"t/1">>, {raw, Payload}), + ?assertNotReceive({publish, _}) + end, + protobuf_invalid_payloads() + ), + + %% Bad config: unknown message name + Check2 = schema_check(protobuf, SerdeName, #{<<"message_name">> => <<"idontexist">>}), + Validation2 = validation(Name1, [Check2]), + {200, _} = update(Validation2), + + lists:foreach( + fun(Payload) -> + ok = publish(C, <<"t/1">>, {raw, Payload}), + ?assertNotReceive({publish, _}) + end, + protobuf_valid_payloads(SerdeName, MessageName) + ), + + ok. diff --git a/apps/emqx_message_validation/test/emqx_message_validation_tests.erl b/apps/emqx_message_validation/test/emqx_message_validation_tests.erl new file mode 100644 index 000000000..118eaebbc --- /dev/null +++ b/apps/emqx_message_validation/test/emqx_message_validation_tests.erl @@ -0,0 +1,194 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_message_validation_tests). + +-include_lib("eunit/include/eunit.hrl"). + +-define(VALIDATIONS_PATH, "message_validation.validations"). + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +parse_and_check(InnerConfigs) -> + RootBin = <<"message_validation">>, + InnerBin = <<"validations">>, + RawConf = #{RootBin => #{InnerBin => InnerConfigs}}, + #{RootBin := #{InnerBin := Checked}} = hocon_tconf:check_plain( + emqx_message_validation_schema, + RawConf, + #{ + required => false, + atom_key => false, + make_serializable => false + } + ), + Checked. + +validation(Name, Checks) -> + validation(Name, Checks, _Overrides = #{}). + +validation(Name, Checks, Overrides) -> + Default = #{ + <<"tags">> => [<<"some">>, <<"tags">>], + <<"description">> => <<"my validation">>, + <<"enable">> => true, + <<"name">> => Name, + <<"topics">> => <<"t/+">>, + <<"strategy">> => <<"all_pass">>, + <<"failure_action">> => <<"drop">>, + <<"log_failure_at">> => <<"warning">>, + <<"checks">> => Checks + }, + emqx_utils_maps:deep_merge(Default, Overrides). + +sql_check() -> + sql_check(<<"select * where true">>). + +sql_check(SQL) -> + #{ + <<"type">> => <<"sql">>, + <<"sql">> => SQL + }. + +eval_sql(Message, SQL) -> + {ok, Check} = emqx_message_validation:parse_sql_check(SQL), + emqx_message_validation:evaluate_sql_check(Check, Message). + +message() -> + message(_Opts = #{}). + +message(Opts) -> + Defaults = #{ + id => emqx_guid:gen(), + qos => 0, + from => emqx_guid:to_hexstr(emqx_guid:gen()), + flags => #{retain => false}, + headers => #{ + proto_ver => v5, + properties => #{'User-Property' => [{<<"a">>, <<"b">>}]} + }, + topic => <<"t/t">>, + payload => emqx_utils_json:encode(#{value => 10}), + timestamp => 1710272561615, + extra => [] + }, + emqx_message:from_map(emqx_utils_maps:deep_merge(Defaults, Opts)). + +%%------------------------------------------------------------------------------ +%% Test cases +%%------------------------------------------------------------------------------ + +schema_test_() -> + [ + {"topics is always a list 1", + ?_assertMatch( + [#{<<"topics">> := [<<"t/1">>]}], + parse_and_check([ + validation( + <<"foo">>, + [sql_check()], + #{<<"topics">> => <<"t/1">>} + ) + ]) + )}, + {"topics is always a list 2", + ?_assertMatch( + [#{<<"topics">> := [<<"t/1">>]}], + parse_and_check([ + validation( + <<"foo">>, + [sql_check()], + #{<<"topics">> => [<<"t/1">>]} + ) + ]) + )}, + {"foreach expression is not allowed", + ?_assertThrow( + {_Schema, [ + #{ + reason := foreach_not_allowed, + kind := validation_error + } + ]}, + parse_and_check([ + validation( + <<"foo">>, + [sql_check(<<"foreach foo as f where true">>)] + ) + ]) + )}, + {"from clause is not allowed", + ?_assertThrow( + {_Schema, [ + #{ + reason := non_empty_from_clause, + kind := validation_error + } + ]}, + parse_and_check([ + validation( + <<"foo">>, + [sql_check(<<"select * from t">>)] + ) + ]) + )}, + {"names are unique", + ?_assertThrow( + {_Schema, [ + #{ + reason := <<"duplicated name:", _/binary>>, + path := ?VALIDATIONS_PATH, + kind := validation_error + } + ]}, + parse_and_check([ + validation(<<"foo">>, [sql_check()]), + validation(<<"foo">>, [sql_check()]) + ]) + )}, + {"checks must be non-empty", + ?_assertThrow( + {_Schema, [ + #{ + reason := "at least one check must be defined", + kind := validation_error + } + ]}, + parse_and_check([ + validation( + <<"foo">>, + [] + ) + ]) + )}, + {"bogus check type", + ?_assertThrow( + {_Schema, [ + #{ + expected := <<"sql", _/binary>>, + kind := validation_error, + field_name := type + } + ]}, + parse_and_check([validation(<<"foo">>, [#{<<"type">> => <<"foo">>}])]) + )} + ]. + +check_test_() -> + [ + {"denied by payload 1", + ?_assertNot(eval_sql(message(), <<"select * where payload.value > 15">>))}, + {"denied by payload 2", + ?_assertNot(eval_sql(message(), <<"select payload.value as x where x > 15">>))}, + {"allowed by payload 1", + ?_assert(eval_sql(message(), <<"select * where payload.value > 5">>))}, + {"allowed by payload 2", + ?_assert(eval_sql(message(), <<"select payload.value as x where x > 5">>))}, + {"always passes 1", ?_assert(eval_sql(message(), <<"select * where true">>))}, + {"always passes 2", ?_assert(eval_sql(message(), <<"select * where 1 = 1">>))}, + {"never passes 1", ?_assertNot(eval_sql(message(), <<"select * where false">>))}, + {"never passes 2", ?_assertNot(eval_sql(message(), <<"select * where 1 = 2">>))}, + {"never passes 3", ?_assertNot(eval_sql(message(), <<"select * where true and false">>))} + ]. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 1cc67840b..373dd2622 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -51,12 +51,6 @@ get_rules_ordered_by_ts/0 ]). -%% exported for cluster_call --export([ - do_delete_rule/1, - do_insert_rule/1 -]). - -export([ load_hooks_for_rule/1, unload_hooks_for_rule/1, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl index 6bc60b70d..899a85002 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl @@ -41,4 +41,5 @@ stop(_State) -> RulePath = [RuleEngine | _] = ?KEY_PATH, emqx_conf:remove_handler(RulePath ++ ['?']), emqx_conf:remove_handler([RuleEngine]), - ok = emqx_rule_events:unload(). + ok = emqx_rule_events:unload(), + ok. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl index 38c05ff3b..6a46e153b 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl @@ -38,7 +38,8 @@ namespace() -> rule_engine. tags() -> [<<"Rule Engine">>]. -roots() -> [{"rule_engine", ?HOCON(?R_REF("rule_engine"), #{importance => ?IMPORTANCE_HIDDEN})}]. +roots() -> + [{"rule_engine", ?HOCON(?R_REF("rule_engine"), #{importance => ?IMPORTANCE_HIDDEN})}]. fields("rule_engine") -> rule_engine_settings() ++ diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl index 3e5e10c3a..e322d3ecd 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl @@ -28,13 +28,25 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - Registry = #{ - id => emqx_rule_engine, - start => {emqx_rule_engine, start_link, []}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [emqx_rule_engine] + RuleEngineRegistry = worker_spec(emqx_rule_engine), + RuleEngineMetrics = emqx_metrics_worker:child_spec(rule_metrics), + SupFlags = #{ + strategy => one_for_one, + intensity => 10, + period => 10 }, - Metrics = emqx_metrics_worker:child_spec(rule_metrics), - {ok, {{one_for_one, 10, 10}, [Registry, Metrics]}}. + Children = [ + RuleEngineRegistry, + RuleEngineMetrics + ], + {ok, {SupFlags, Children}}. + +worker_spec(Mod) -> + #{ + id => Mod, + start => {Mod, start_link, []}, + restart => permanent, + shutdown => 5_000, + type => worker, + modules => [Mod] + }. diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index fa845bc05..f51908772 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -26,6 +26,9 @@ inc_action_metrics/2 ]). +%% Internal exports used by message validation +-export([evaluate_select/3, clear_rule_payload/0]). + -import( emqx_rule_maps, [ @@ -129,27 +132,16 @@ do_apply_rule( Columns, Envs ) -> - {Selected, Collection} = ?RAISE( - select_and_collect(Fields, Columns), - {select_and_collect_error, {EXCLASS, EXCPTION, ST}} - ), - ColumnsAndSelected = maps:merge(Columns, Selected), - case - ?RAISE( - match_conditions(Conditions, ColumnsAndSelected), - {match_conditions_error, {EXCLASS, EXCPTION, ST}} - ) - of - true -> - Collection2 = filter_collection(ColumnsAndSelected, InCase, DoEach, Collection), - case Collection2 of + case evaluate_foreach(Fields, Columns, Conditions, InCase, DoEach) of + {ok, ColumnsAndSelected, FinalCollection} -> + case FinalCollection of [] -> ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'); _ -> ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed') end, NewEnvs = maps:merge(ColumnsAndSelected, Envs), - {ok, [handle_action_list(RuleId, Actions, Coll, NewEnvs) || Coll <- Collection2]}; + {ok, [handle_action_list(RuleId, Actions, Coll, NewEnvs) || Coll <- FinalCollection]}; false -> ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'), {error, nomatch} @@ -165,6 +157,16 @@ do_apply_rule( Columns, Envs ) -> + case evaluate_select(Fields, Columns, Conditions) of + {ok, Selected} -> + ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed'), + {ok, handle_action_list(RuleId, Actions, Selected, maps:merge(Columns, Envs))}; + false -> + ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'), + {error, nomatch} + end. + +evaluate_select(Fields, Columns, Conditions) -> Selected = ?RAISE( select_and_transform(Fields, Columns), {select_and_transform_error, {EXCLASS, EXCPTION, ST}} @@ -176,11 +178,28 @@ do_apply_rule( ) of true -> - ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed'), - {ok, handle_action_list(RuleId, Actions, Selected, maps:merge(Columns, Envs))}; + {ok, Selected}; false -> - ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'), - {error, nomatch} + false + end. + +evaluate_foreach(Fields, Columns, Conditions, InCase, DoEach) -> + {Selected, Collection} = ?RAISE( + select_and_collect(Fields, Columns), + {select_and_collect_error, {EXCLASS, EXCPTION, ST}} + ), + ColumnsAndSelected = maps:merge(Columns, Selected), + case + ?RAISE( + match_conditions(Conditions, ColumnsAndSelected), + {match_conditions_error, {EXCLASS, EXCPTION, ST}} + ) + of + true -> + FinalCollection = filter_collection(ColumnsAndSelected, InCase, DoEach, Collection), + {ok, ColumnsAndSelected, FinalCollection}; + false -> + false end. clear_rule_payload() -> @@ -281,6 +300,10 @@ match_conditions({'fun', {_, Name}, Args}, Data) -> apply_func(Name, [eval(Arg, Data) || Arg <- Args], Data); match_conditions({Op, L, R}, Data) when ?is_comp(Op) -> compare(Op, eval(L, Data), eval(R, Data)); +match_conditions({const, true}, _Data) -> + true; +match_conditions({const, false}, _Data) -> + false; match_conditions({}, _Data) -> true. diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqlparser.erl b/apps/emqx_rule_engine/src/emqx_rule_sqlparser.erl index f305d386b..8fbc80704 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqlparser.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqlparser.erl @@ -18,7 +18,7 @@ -include("rule_engine.hrl"). --export([parse/1]). +-export([parse/1, parse/2]). -export([ select_fields/1, @@ -62,37 +62,29 @@ -type field() :: const() | variable() | {as, field(), alias()} | sql_func(). +-type parse_opts() :: #{ + %% Whether `from' clause should be mandatory. + %% Default: `true'. + with_from => boolean() +}. + -export_type([select/0]). %% Parse one select statement. -spec parse(string() | binary()) -> {ok, select()} | {error, term()}. parse(Sql) -> - try - case rulesql:parsetree(Sql) of - {ok, {select, Clauses}} -> - {ok, #select{ - is_foreach = false, - fields = get_value(fields, Clauses), - doeach = [], - incase = {}, - from = get_value(from, Clauses), - where = get_value(where, Clauses) - }}; - {ok, {foreach, Clauses}} -> - {ok, #select{ - is_foreach = true, - fields = get_value(fields, Clauses), - doeach = get_value(do, Clauses, []), - incase = get_value(incase, Clauses, {}), - from = get_value(from, Clauses), - where = get_value(where, Clauses) - }}; - Error -> - {error, Error} - end - catch - _Error:Reason:StackTrace -> - {error, {Reason, StackTrace}} + parse(Sql, _Opts = #{}). + +-spec parse(string() | binary(), parse_opts()) -> {ok, select()} | {error, term()}. +parse(Sql, Opts) -> + WithFrom = maps:get(with_from, Opts, true), + case do_parse(Sql) of + {ok, Parsed} when WithFrom -> + ensure_non_empty_from(Parsed); + {ok, Parsed} -> + ensure_empty_from(Parsed); + Error = {error, _} -> + Error end. -spec select_fields(select()) -> list(field()). @@ -118,3 +110,45 @@ select_from(#select{from = From}) -> -spec select_where(select()) -> tuple(). select_where(#select{where = Where}) -> Where. + +-spec do_parse(string() | binary()) -> {ok, select()} | {error, term()}. +do_parse(Sql) -> + try + case rulesql:parsetree(Sql) of + {ok, {select, Clauses}} -> + Parsed = #select{ + is_foreach = false, + fields = get_value(fields, Clauses), + doeach = [], + incase = {}, + from = get_value(from, Clauses), + where = get_value(where, Clauses) + }, + {ok, Parsed}; + {ok, {foreach, Clauses}} -> + Parsed = #select{ + is_foreach = true, + fields = get_value(fields, Clauses), + doeach = get_value(do, Clauses, []), + incase = get_value(incase, Clauses, {}), + from = get_value(from, Clauses), + where = get_value(where, Clauses) + }, + {ok, Parsed}; + Error -> + {error, Error} + end + catch + _Error:Reason:StackTrace -> + {error, {Reason, StackTrace}} + end. + +ensure_non_empty_from(#select{from = []}) -> + {error, empty_from_clause}; +ensure_non_empty_from(Parsed) -> + {ok, Parsed}. + +ensure_empty_from(#select{from = [_ | _]}) -> + {error, non_empty_from_clause}; +ensure_empty_from(Parsed) -> + {ok, Parsed}. diff --git a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl index b31889ab4..d7fdad2cd 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl @@ -27,12 +27,19 @@ %%-define(PROPTEST(F), ?assert(proper:quickcheck(F(), [{on_output, fun ct:print/2}]))). init_per_suite(Config) -> - application:load(emqx_conf), - ConfigConf = <<"rule_engine {jq_function_default_timeout=10s}">>, - ok = emqx_common_test_helpers:load_config(emqx_rule_engine_schema, ConfigConf), - Config. + Apps = emqx_cth_suite:start( + [ + emqx, + emqx_conf, + {emqx_rule_engine, "rule_engine {jq_function_default_timeout=10s}"} + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + [{apps, Apps} | Config]. -end_per_suite(_Config) -> +end_per_suite(Config) -> + Apps = ?config(apps, Config), + emqx_cth_suite:stop(Apps), ok. eventmsg_publish(Msg) -> diff --git a/apps/emqx_utils/src/emqx_utils.erl b/apps/emqx_utils/src/emqx_utils.erl index 0be489696..fb7424b4a 100644 --- a/apps/emqx_utils/src/emqx_utils.erl +++ b/apps/emqx_utils/src/emqx_utils.erl @@ -66,7 +66,8 @@ tcp_keepalive_opts/4, format/1, call_first_defined/1, - ntoa/1 + ntoa/1, + foldl_while/3 ]). -export([ @@ -176,6 +177,17 @@ pipeline([Fun | More], Input, State) -> {error, Reason, NState} -> {error, Reason, NState} end. +-spec foldl_while(fun((X, Acc) -> {cont | halt, Acc}), Acc, [X]) -> Acc. +foldl_while(_Fun, Acc, []) -> + Acc; +foldl_while(Fun, Acc, [X | Xs]) -> + case Fun(X, Acc) of + {cont, NewAcc} -> + foldl_while(Fun, NewAcc, Xs); + {halt, NewAcc} -> + NewAcc + end. + -compile({inline, [apply_fun/3]}). apply_fun(Fun, Input, State) -> case erlang:fun_info(Fun, arity) of diff --git a/apps/emqx_utils/src/emqx_utils_maps.erl b/apps/emqx_utils/src/emqx_utils_maps.erl index 07e36607d..96051a722 100644 --- a/apps/emqx_utils/src/emqx_utils_maps.erl +++ b/apps/emqx_utils/src/emqx_utils_maps.erl @@ -158,7 +158,9 @@ deep_convert(Val, _, _Args) -> unsafe_atom_key_map(Map) -> convert_keys_to_atom(Map, fun(K) -> binary_to_atom(K, utf8) end). --spec binary_key_map(map()) -> map(). +-spec binary_key_map + (map()) -> map(); + (list()) -> list(). binary_key_map(Map) -> deep_convert( Map, diff --git a/apps/emqx_utils/test/emqx_utils_tests.erl b/apps/emqx_utils/test/emqx_utils_tests.erl index 26c4cbfca..fa4a29622 100644 --- a/apps/emqx_utils/test/emqx_utils_tests.erl +++ b/apps/emqx_utils/test/emqx_utils_tests.erl @@ -28,3 +28,37 @@ is_redacted_test_() -> ?_assert(emqx_utils:is_redacted(password, fun() -> <<"******">> end)), ?_assert(emqx_utils:is_redacted(password, emqx_secret:wrap(<<"******">>))) ]. + +foldl_while_test_() -> + [ + ?_assertEqual( + [3, 2, 1], + emqx_utils:foldl_while(fun(X, Acc) -> {cont, [X | Acc]} end, [], [1, 2, 3]) + ), + ?_assertEqual( + [1], + emqx_utils:foldl_while( + fun + (X, Acc) when X == 2 -> + {halt, Acc}; + (X, Acc) -> + {cont, [X | Acc]} + end, + [], + [1, 2, 3] + ) + ), + ?_assertEqual( + finished, + emqx_utils:foldl_while( + fun + (X, _Acc) when X == 3 -> + {halt, finished}; + (X, Acc) -> + {cont, [X | Acc]} + end, + [], + [1, 2, 3] + ) + ) + ]. diff --git a/changes/ce/feat-12711.en.md b/changes/ce/feat-12711.en.md new file mode 100644 index 000000000..66349a1b9 --- /dev/null +++ b/changes/ce/feat-12711.en.md @@ -0,0 +1,3 @@ +Implemented message validation feature. + +With this feature, once validations are configured for certain topic filters, the configured checks are run against published messages and, if they are not accepted by a validation, the message is dropped and the client may be disconnected, depending on the configuration. diff --git a/mix.exs b/mix.exs index ecf8ed168..1e6e932db 100644 --- a/mix.exs +++ b/mix.exs @@ -65,7 +65,7 @@ defmodule EMQXUmbrella.MixProject do # maybe forbid to fetch quicer {:emqtt, github: "emqx/emqtt", tag: "1.10.1", override: true, system_env: maybe_no_quic_env()}, - {:rulesql, github: "emqx/rulesql", tag: "0.1.8"}, + {:rulesql, github: "emqx/rulesql", tag: "0.2.0"}, {:observer_cli, "1.7.1"}, {:system_monitor, github: "ieQu1/system_monitor", tag: "3.0.3"}, {:telemetry, "1.1.0"}, @@ -184,6 +184,7 @@ defmodule EMQXUmbrella.MixProject do :emqx_s3, :emqx_bridge_s3, :emqx_schema_registry, + :emqx_message_validation, :emqx_enterprise, :emqx_bridge_kinesis, :emqx_bridge_azure_event_hub, diff --git a/rebar.config b/rebar.config index 262dff039..3f58749bf 100644 --- a/rebar.config +++ b/rebar.config @@ -91,7 +91,7 @@ {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}}, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}, {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.10.1"}}}, - {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.8"}}}, + {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.2.0"}}}, % NOTE: depends on recon 2.5.x {observer_cli, "1.7.1"}, {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}}, diff --git a/rebar.config.erl b/rebar.config.erl index 1ca2dee09..a81d162a9 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -115,6 +115,7 @@ is_community_umbrella_app("apps/emqx_gateway_gbt32960") -> false; is_community_umbrella_app("apps/emqx_gateway_ocpp") -> false; is_community_umbrella_app("apps/emqx_gateway_jt808") -> false; is_community_umbrella_app("apps/emqx_bridge_syskeeper") -> false; +is_community_umbrella_app("apps/emqx_message_validation") -> false; is_community_umbrella_app(_) -> true. %% BUILD_WITHOUT_JQ diff --git a/rel/i18n/emqx_message_validation_http_api.hocon b/rel/i18n/emqx_message_validation_http_api.hocon new file mode 100644 index 000000000..d169e2f76 --- /dev/null +++ b/rel/i18n/emqx_message_validation_http_api.hocon @@ -0,0 +1,24 @@ +emqx_message_validation_http_api { + + list_validations.desc: + """List validations""" + + lookup_validation.desc: + """Lookup a validation""" + + update_validation.desc: + """Update a validation""" + + delete_validation.desc: + """Delete a validation""" + + append_validation.desc: + """Append a new validation to the list of validations""" + + move_validation.desc: + """Change the order of a validation in the list of validations""" + + param_path_name.desc: + """Validation name""" + +} diff --git a/rel/i18n/emqx_message_validation_schema.hocon b/rel/i18n/emqx_message_validation_schema.hocon new file mode 100644 index 000000000..bf1524cc1 --- /dev/null +++ b/rel/i18n/emqx_message_validation_schema.hocon @@ -0,0 +1,88 @@ +emqx_message_validation_schema { + + check_avro_type.desc: + """Avro schema check""" + check_avro_type.label: + """Avro schema check""" + + check_avro_schema.desc: + """Schema name to use during check.""" + check_avro_schema.label: + """Schema name""" + + check_json_type.desc: + """JSON schema check""" + check_json_type.label: + """JSON schema check""" + + check_json_schema.desc: + """Schema name to use during check.""" + check_json_schema.label: + """Schema name""" + + check_protobuf_type.desc: + """Protobuf schema check""" + check_protobuf_type.label: + """Protobuf schema check""" + + check_protobuf_schema.desc: + """Schema name to use during check.""" + check_protobuf_schema.label: + """Schema name""" + + check_protobuf_message_name.desc: + """Message name to use during check.""" + check_protobuf_message_name.label: + """Message name""" + + check_sql_type.desc: + """Use rule-engine's SQL to validate the message. SQL here is the same as in rule-engine, + just with the different that the `FROM` clause must be omitted. + A SQL statement which yields any value is considered successfully validated, otherwise failed. + For example SELECT payload.foo + payload.bar as sum WHERE sum > 0 + validates that the sum of field `foo` and `bar` is a positive value.""" + check_sql_type.label: + """SQL schema check""" + + check_sql_schema.desc: + """Schema name to use during check.""" + check_sql_schema.label: + """Schema name""" + + topics.desc: + """A single topic filter or list of topic filters that this validation should validate.""" + topics.label: + """Topic filter(s)""" + + name.desc: + """The name for this validation. Must be unique among all validations.""" + name.desc: + """Name""" + + strategy.desc: + """How the validation should consider the checks to be successful. + + all_pass: All checks will be evaluated and must pass. + any_pass: Any passing check will suffice. Stops at the first success.""" + strategy.desc: + """Strategy""" + + failure_action.desc: + """How to proceed if the validation fails. + + drop: The offending message is simply dropped without further processing. + disconnect: The message is not published, and the publishing client is disconnected.""" + failure_action.label: + """Failure action""" + + log_failure_at.desc: + """Log level at which failures will be logged.""" + log_failure_at.label: + """Failure log level""" + + checks.desc: + """Checks that will be performed during validation. They are evaluated in the same order as defined.""" + checks.label: + """Checks""" + +}