diff --git a/apps/emqx/include/emqx_hooks.hrl b/apps/emqx/include/emqx_hooks.hrl
index 3bb2baad2..2e632d545 100644
--- a/apps/emqx/include/emqx_hooks.hrl
+++ b/apps/emqx/include/emqx_hooks.hrl
@@ -25,6 +25,7 @@
-define(HP_AUTHN, 970).
-define(HP_AUTHZ, 960).
-define(HP_SYS_MSGS, 950).
+-define(HP_MSG_VALIDATION, 945).
-define(HP_TOPIC_METRICS, 940).
-define(HP_RETAINER, 930).
-define(HP_AUTO_SUB, 920).
diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl
index b20c3a15f..6f6580517 100644
--- a/apps/emqx/src/emqx_broker.erl
+++ b/apps/emqx/src/emqx_broker.erl
@@ -235,6 +235,12 @@ publish(Msg) when is_record(Msg, message) ->
_ = emqx_trace:publish(Msg),
emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'),
case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of
+ #message{headers = #{should_disconnect := true}, topic = Topic} ->
+ ?TRACE("MQTT", "msg_publish_not_allowed_disconnect", #{
+ message => emqx_message:to_log_map(Msg),
+ topic => Topic
+ }),
+ disconnect;
#message{headers = #{allow_publish := false}, topic = Topic} ->
?TRACE("MQTT", "msg_publish_not_allowed", #{
message => emqx_message:to_log_map(Msg),
diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl
index 51b66f4f9..9e783a054 100644
--- a/apps/emqx/src/emqx_channel.erl
+++ b/apps/emqx/src/emqx_channel.erl
@@ -702,14 +702,21 @@ packet_to_message(Packet, #channel{
do_publish(_PacketId, Msg = #message{qos = ?QOS_0}, Channel) ->
Result = emqx_broker:publish(Msg),
- NChannel = ensure_quota(Result, Channel),
- {ok, NChannel};
+ case Result of
+ disconnect ->
+ handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel);
+ _ ->
+ NChannel = ensure_quota(Result, Channel),
+ {ok, NChannel}
+ end;
do_publish(PacketId, Msg = #message{qos = ?QOS_1}, Channel) ->
PubRes = emqx_broker:publish(Msg),
RC = puback_reason_code(PacketId, Msg, PubRes),
case RC of
undefined ->
{ok, Channel};
+ disconnect ->
+ handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel);
_Value ->
do_finish_publish(PacketId, PubRes, RC, Channel)
end;
@@ -719,6 +726,8 @@ do_publish(
Channel = #channel{clientinfo = ClientInfo, session = Session}
) ->
case emqx_session:publish(ClientInfo, PacketId, Msg, Session) of
+ {ok, disconnect, _NSession} ->
+ handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel);
{ok, PubRes, NSession} ->
RC = pubrec_reason_code(PubRes),
NChannel0 = Channel#channel{session = NSession},
@@ -763,7 +772,9 @@ pubrec_reason_code([_ | _]) -> ?RC_SUCCESS.
puback_reason_code(PacketId, Msg, [] = PubRes) ->
emqx_hooks:run_fold('message.puback', [PacketId, Msg, PubRes], ?RC_NO_MATCHING_SUBSCRIBERS);
puback_reason_code(PacketId, Msg, [_ | _] = PubRes) ->
- emqx_hooks:run_fold('message.puback', [PacketId, Msg, PubRes], ?RC_SUCCESS).
+ emqx_hooks:run_fold('message.puback', [PacketId, Msg, PubRes], ?RC_SUCCESS);
+puback_reason_code(_PacketId, _Msg, disconnect) ->
+ disconnect.
-compile({inline, [after_message_acked/3]}).
after_message_acked(ClientInfo, Msg, PubAckProps) ->
diff --git a/apps/emqx/src/emqx_types.erl b/apps/emqx/src/emqx_types.erl
index c53188753..99a71e20b 100644
--- a/apps/emqx/src/emqx_types.erl
+++ b/apps/emqx/src/emqx_types.erl
@@ -258,11 +258,13 @@
-type deliver() :: {deliver, topic(), message()}.
-type delivery() :: #delivery{}.
-type deliver_result() :: ok | {ok, non_neg_integer()} | {error, term()}.
--type publish_result() :: [
- {node(), topic(), deliver_result()}
- | {share, topic(), deliver_result()}
- | persisted
-].
+-type publish_result() ::
+ [
+ {node(), topic(), deliver_result()}
+ | {share, topic(), deliver_result()}
+ | persisted
+ ]
+ | disconnect.
-type route() :: #route{}.
-type route_entry() :: {topic(), node()} | {topic, group()}.
-type command() :: #command{}.
diff --git a/apps/emqx/test/emqx_cth_suite.erl b/apps/emqx/test/emqx_cth_suite.erl
index eae12145f..6a3eea5d9 100644
--- a/apps/emqx/test/emqx_cth_suite.erl
+++ b/apps/emqx/test/emqx_cth_suite.erl
@@ -380,6 +380,10 @@ default_appspec(emqx_dashboard, _SuiteOpts) ->
true = emqx_dashboard_listener:is_ready(infinity)
end
};
+default_appspec(emqx_schema_registry, _SuiteOpts) ->
+ #{schema_mod => emqx_schema_registry_schema, config => #{}};
+default_appspec(emqx_message_validation, _SuiteOpts) ->
+ #{schema_mod => emqx_message_validation_schema, config => #{}};
default_appspec(_, _) ->
#{}.
diff --git a/apps/emqx_machine/priv/reboot_lists.eterm b/apps/emqx_machine/priv/reboot_lists.eterm
index a6559bcab..ef42c0fc9 100644
--- a/apps/emqx_machine/priv/reboot_lists.eterm
+++ b/apps/emqx_machine/priv/reboot_lists.eterm
@@ -88,6 +88,7 @@
[
emqx_license,
emqx_enterprise,
+ emqx_message_validation,
emqx_bridge_kafka,
emqx_bridge_pulsar,
emqx_bridge_gcp_pubsub,
diff --git a/apps/emqx_message_validation/BSL.txt b/apps/emqx_message_validation/BSL.txt
new file mode 100644
index 000000000..f0cd31c6f
--- /dev/null
+++ b/apps/emqx_message_validation/BSL.txt
@@ -0,0 +1,94 @@
+Business Source License 1.1
+
+Licensor: Hangzhou EMQ Technologies Co., Ltd.
+Licensed Work: EMQX Enterprise Edition
+ The Licensed Work is (c) 2023
+ Hangzhou EMQ Technologies Co., Ltd.
+Additional Use Grant: Students and educators are granted right to copy,
+ modify, and create derivative work for research
+ or education.
+Change Date: 2028-01-26
+Change License: Apache License, Version 2.0
+
+For information about alternative licensing arrangements for the Software,
+please contact Licensor: https://www.emqx.com/en/contact
+
+Notice
+
+The Business Source License (this document, or the “License”) is not an Open
+Source license. However, the Licensed Work will eventually be made available
+under an Open Source License, as stated in this License.
+
+License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
+“Business Source License” is a trademark of MariaDB Corporation Ab.
+
+-----------------------------------------------------------------------------
+
+Business Source License 1.1
+
+Terms
+
+The Licensor hereby grants you the right to copy, modify, create derivative
+works, redistribute, and make non-production use of the Licensed Work. The
+Licensor may make an Additional Use Grant, above, permitting limited
+production use.
+
+Effective on the Change Date, or the fourth anniversary of the first publicly
+available distribution of a specific version of the Licensed Work under this
+License, whichever comes first, the Licensor hereby grants you rights under
+the terms of the Change License, and the rights granted in the paragraph
+above terminate.
+
+If your use of the Licensed Work does not comply with the requirements
+currently in effect as described in this License, you must purchase a
+commercial license from the Licensor, its affiliated entities, or authorized
+resellers, or you must refrain from using the Licensed Work.
+
+All copies of the original and modified Licensed Work, and derivative works
+of the Licensed Work, are subject to this License. This License applies
+separately for each version of the Licensed Work and the Change Date may vary
+for each version of the Licensed Work released by Licensor.
+
+You must conspicuously display this License on each original or modified copy
+of the Licensed Work. If you receive the Licensed Work in original or
+modified form from a third party, the terms and conditions set forth in this
+License apply to your use of that work.
+
+Any use of the Licensed Work in violation of this License will automatically
+terminate your rights under this License for the current and all other
+versions of the Licensed Work.
+
+This License does not grant you any right in any trademark or logo of
+Licensor or its affiliates (provided that you may use a trademark or logo of
+Licensor as expressly required by this License).
+
+TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
+AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
+EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
+TITLE.
+
+MariaDB hereby grants you permission to use this License’s text to license
+your works, and to refer to it using the trademark “Business Source License”,
+as long as you comply with the Covenants of Licensor below.
+
+Covenants of Licensor
+
+In consideration of the right to use this License’s text and the “Business
+Source License” name and trademark, Licensor covenants to MariaDB, and to all
+other recipients of the licensed work to be provided by Licensor:
+
+1. To specify as the Change License the GPL Version 2.0 or any later version,
+ or a license that is compatible with GPL Version 2.0 or a later version,
+ where “compatible” means that software provided under the Change License can
+ be included in a program with software provided under GPL Version 2.0 or a
+ later version. Licensor may specify additional Change Licenses without
+ limitation.
+
+2. To either: (a) specify an additional grant of rights to use that does not
+ impose any additional restriction on the right granted in this License, as
+ the Additional Use Grant; or (b) insert the text “None”.
+
+3. To specify a Change Date.
+
+4. Not to modify this License in any other way.
diff --git a/apps/emqx_message_validation/README.md b/apps/emqx_message_validation/README.md
new file mode 100644
index 000000000..c32e74147
--- /dev/null
+++ b/apps/emqx_message_validation/README.md
@@ -0,0 +1,29 @@
+# EMQX Message Validation
+
+This application encapsulates the functionality to validate incoming or internally
+triggered published payloads and take an action upon failure, which can be to just drop
+the message without further processing, or to disconnect the offending client as well.
+
+# Documentation
+
+Refer to [Message
+Validation](https://docs.emqx.com/en/enterprise/latest/data-integration/message-validation.html)
+for more information about the semantics and checks available.
+
+# HTTP APIs
+
+APIs are provided for validation management, which includes creating,
+updating, looking up, deleting, listing validations.
+
+Refer to [API Docs -
+Bridges](https://docs.emqx.com/en/enterprise/latest/admin/api-docs.html#tag/Message-Validation)
+for more detailed information.
+
+
+# Contributing
+
+Please see our [contributing.md](../../CONTRIBUTING.md).
+
+# License
+
+EMQ Business Source License 1.1, refer to [LICENSE](BSL.txt).
diff --git a/apps/emqx_message_validation/rebar.config b/apps/emqx_message_validation/rebar.config
new file mode 100644
index 000000000..170115ef6
--- /dev/null
+++ b/apps/emqx_message_validation/rebar.config
@@ -0,0 +1,16 @@
+%% -*- mode: erlang -*-
+
+{erl_opts, [
+ warn_unused_vars,
+ warn_shadow_vars,
+ warn_unused_import,
+ warn_obsolete_guard,
+ warnings_as_errors,
+ debug_info
+]}.
+{deps, [
+ {emqx, {path, "../emqx"}},
+ {emqx_utils, {path, "../emqx_utils"}},
+ {emqx_rule_engine, {path, "../emqx_rule_engine"}},
+ {emqx_schema_registry, {path, "../emqx_schema_registry"}}
+]}.
diff --git a/apps/emqx_message_validation/src/emqx_message_validation.app.src b/apps/emqx_message_validation/src/emqx_message_validation.app.src
new file mode 100644
index 000000000..077f839a9
--- /dev/null
+++ b/apps/emqx_message_validation/src/emqx_message_validation.app.src
@@ -0,0 +1,14 @@
+{application, emqx_message_validation, [
+ {description, "EMQX Message Validation"},
+ {vsn, "0.1.0"},
+ {registered, [emqx_message_validation_sup, emqx_message_validation_registry]},
+ {mod, {emqx_message_validation_app, []}},
+ {applications, [
+ kernel,
+ stdlib
+ ]},
+ {env, []},
+ {modules, []},
+
+ {links, []}
+]}.
diff --git a/apps/emqx_message_validation/src/emqx_message_validation.erl b/apps/emqx_message_validation/src/emqx_message_validation.erl
new file mode 100644
index 000000000..3513a4210
--- /dev/null
+++ b/apps/emqx_message_validation/src/emqx_message_validation.erl
@@ -0,0 +1,380 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_message_validation).
+
+-include_lib("emqx_utils/include/emqx_message.hrl").
+-include_lib("emqx/include/emqx_hooks.hrl").
+-include_lib("emqx/include/logger.hrl").
+
+%% API
+-export([
+ add_handler/0,
+ remove_handler/0,
+
+ load/0,
+ unload/0,
+
+ list/0,
+ move/2,
+ lookup/1,
+ insert/1,
+ update/1,
+ delete/1
+]).
+
+%% `emqx_hooks' API
+-export([
+ register_hooks/0,
+ unregister_hooks/0,
+
+ on_message_publish/1
+]).
+
+%% `emqx_config_handler' API
+-export([pre_config_update/3, post_config_update/5]).
+
+%% Internal exports
+-export([parse_sql_check/1]).
+
+%% Internal functions; exported for tests
+-export([
+ evaluate_sql_check/2
+]).
+
+%%------------------------------------------------------------------------------
+%% Type declarations
+%%------------------------------------------------------------------------------
+
+-define(TRACE_TAG, "MESSAGE_VALIDATION").
+-define(CONF_ROOT, message_validation).
+-define(VALIDATIONS_CONF_PATH, [?CONF_ROOT, validations]).
+
+-type validation_name() :: binary().
+-type validation() :: _TODO.
+-type position() :: front | rear | {'after', validation_name()} | {before, validation_name()}.
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+-spec add_handler() -> ok.
+add_handler() ->
+ ok = emqx_config_handler:add_handler(?VALIDATIONS_CONF_PATH, ?MODULE),
+ ok.
+
+-spec remove_handler() -> ok.
+remove_handler() ->
+ ok = emqx_config_handler:remove_handler(?VALIDATIONS_CONF_PATH),
+ ok.
+
+load() ->
+ lists:foreach(fun insert/1, emqx:get_config(?VALIDATIONS_CONF_PATH, [])).
+
+unload() ->
+ lists:foreach(fun delete/1, emqx:get_config(?VALIDATIONS_CONF_PATH, [])).
+
+-spec list() -> [validation()].
+list() ->
+ emqx:get_config(?VALIDATIONS_CONF_PATH, []).
+
+-spec move(validation_name(), position()) ->
+ {ok, _} | {error, _}.
+move(Name, Position) ->
+ emqx:update_config(
+ ?VALIDATIONS_CONF_PATH,
+ {move, Name, Position},
+ #{override_to => cluster}
+ ).
+
+-spec lookup(validation_name()) -> {ok, validation()} | {error, not_found}.
+lookup(Name) ->
+ Validations = emqx:get_config(?VALIDATIONS_CONF_PATH, []),
+ do_lookup(Name, Validations).
+
+-spec insert(validation()) ->
+ {ok, _} | {error, _}.
+insert(Validation) ->
+ emqx:update_config(
+ ?VALIDATIONS_CONF_PATH,
+ {append, Validation},
+ #{override_to => cluster}
+ ).
+
+-spec update(validation()) ->
+ {ok, _} | {error, _}.
+update(Validation) ->
+ emqx:update_config(
+ ?VALIDATIONS_CONF_PATH,
+ {update, Validation},
+ #{override_to => cluster}
+ ).
+
+-spec delete(validation_name()) ->
+ {ok, _} | {error, _}.
+delete(Name) ->
+ emqx:update_config(
+ ?VALIDATIONS_CONF_PATH,
+ {delete, Name},
+ #{override_to => cluster}
+ ).
+
+%%------------------------------------------------------------------------------
+%% Hooks
+%%------------------------------------------------------------------------------
+
+-spec register_hooks() -> ok.
+register_hooks() ->
+ emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_MSG_VALIDATION).
+
+-spec unregister_hooks() -> ok.
+unregister_hooks() ->
+ emqx_hooks:del('message.publish', {?MODULE, on_message_publish}).
+
+-spec on_message_publish(emqx_types:message()) ->
+ {ok, emqx_types:message()} | {stop, emqx_types:message()}.
+on_message_publish(Message = #message{topic = Topic, headers = Headers}) ->
+ case emqx_message_validation_registry:matching_validations(Topic) of
+ [] ->
+ ok;
+ Validations ->
+ case run_validations(Validations, Message) of
+ ok ->
+ {ok, Message};
+ drop ->
+ {stop, Message#message{headers = Headers#{allow_publish => false}}};
+ disconnect ->
+ {stop, Message#message{
+ headers = Headers#{
+ allow_publish => false,
+ should_disconnect => true
+ }
+ }}
+ end
+ end.
+
+%%------------------------------------------------------------------------------
+%% `emqx_config_handler' API
+%%------------------------------------------------------------------------------
+
+pre_config_update(?VALIDATIONS_CONF_PATH, {append, Validation}, OldValidations) ->
+ Validations = OldValidations ++ [Validation],
+ {ok, Validations};
+pre_config_update(?VALIDATIONS_CONF_PATH, {update, Validation}, OldValidations) ->
+ replace(OldValidations, Validation);
+pre_config_update(?VALIDATIONS_CONF_PATH, {delete, Validation}, OldValidations) ->
+ delete(OldValidations, Validation);
+pre_config_update(?VALIDATIONS_CONF_PATH, {move, Name, Position}, OldValidations) ->
+ move(OldValidations, Name, Position).
+
+post_config_update(?VALIDATIONS_CONF_PATH, {append, #{<<"name">> := Name}}, New, _Old, _AppEnvs) ->
+ {Pos, Validation} = fetch_with_index(New, Name),
+ ok = emqx_message_validation_registry:insert(Pos, Validation),
+ ok;
+post_config_update(?VALIDATIONS_CONF_PATH, {update, #{<<"name">> := Name}}, New, Old, _AppEnvs) ->
+ {_Pos, OldValidation} = fetch_with_index(Old, Name),
+ {Pos, NewValidation} = fetch_with_index(New, Name),
+ ok = emqx_message_validation_registry:update(OldValidation, Pos, NewValidation),
+ ok;
+post_config_update(?VALIDATIONS_CONF_PATH, {delete, Name}, _New, Old, _AppEnvs) ->
+ {_Pos, Validation} = fetch_with_index(Old, Name),
+ ok = emqx_message_validation_registry:delete(Validation),
+ ok;
+post_config_update(?VALIDATIONS_CONF_PATH, {move, _Name, _Position}, New, _Old, _AppEnvs) ->
+ ok = emqx_message_validation_registry:reindex_positions(New),
+ ok.
+
+%%------------------------------------------------------------------------------
+%% Internal exports
+%%------------------------------------------------------------------------------
+
+parse_sql_check(SQL) ->
+ case emqx_rule_sqlparser:parse(SQL, #{with_from => false}) of
+ {ok, Select} ->
+ case emqx_rule_sqlparser:select_is_foreach(Select) of
+ true ->
+ {error, foreach_not_allowed};
+ false ->
+ Check = #{
+ type => sql,
+ fields => emqx_rule_sqlparser:select_fields(Select),
+ conditions => emqx_rule_sqlparser:select_where(Select)
+ },
+ {ok, Check}
+ end;
+ Error = {error, _} ->
+ Error
+ end.
+
+%%------------------------------------------------------------------------------
+%% Internal functions
+%%------------------------------------------------------------------------------
+
+evaluate_sql_check(Check, Message) ->
+ #{
+ fields := Fields,
+ conditions := Conditions
+ } = Check,
+ {Data, _} = emqx_rule_events:eventmsg_publish(Message),
+ try emqx_rule_runtime:evaluate_select(Fields, Data, Conditions) of
+ {ok, _} ->
+ true;
+ false ->
+ false
+ catch
+ throw:_Reason ->
+ %% TODO: log?
+ false;
+ _Class:_Error:_Stacktrace ->
+ %% TODO: log?
+ false
+ end.
+
+evaluate_schema_check(Check, #message{payload = Data}) ->
+ #{schema := SerdeName} = Check,
+ ExtraArgs =
+ case Check of
+ #{type := protobuf, message_name := MessageName} ->
+ [MessageName];
+ _ ->
+ []
+ end,
+ try
+ emqx_schema_registry_serde:handle_rule_function(schema_check, [SerdeName, Data | ExtraArgs])
+ catch
+ error:{serde_not_found, _} ->
+ false;
+ _Class:_Error:_Stacktrace ->
+ %% TODO: log?
+ false
+ end.
+
+replace(OldValidations, Validation = #{<<"name">> := Name}) ->
+ {Found, RevNewValidations} =
+ lists:foldl(
+ fun
+ (#{<<"name">> := NameIn}, {_FoundIn, Acc}) when NameIn =:= Name ->
+ {true, [Validation | Acc]};
+ (Val, {FoundIn, Acc}) ->
+ {FoundIn, [Val | Acc]}
+ end,
+ {false, []},
+ OldValidations
+ ),
+ case Found of
+ true ->
+ {ok, lists:reverse(RevNewValidations)};
+ false ->
+ {error, not_found}
+ end.
+
+delete(OldValidations, Name) ->
+ {Found, RevNewValidations} =
+ lists:foldl(
+ fun
+ (#{<<"name">> := NameIn}, {_FoundIn, Acc}) when NameIn =:= Name ->
+ {true, Acc};
+ (Val, {FoundIn, Acc}) ->
+ {FoundIn, [Val | Acc]}
+ end,
+ {false, []},
+ OldValidations
+ ),
+ case Found of
+ true ->
+ {ok, lists:reverse(RevNewValidations)};
+ false ->
+ {error, not_found}
+ end.
+
+move(OldValidations, Name, front) ->
+ {Validation, Front, Rear} = take(Name, OldValidations),
+ {ok, [Validation | Front ++ Rear]};
+move(OldValidations, Name, rear) ->
+ {Validation, Front, Rear} = take(Name, OldValidations),
+ {ok, Front ++ Rear ++ [Validation]};
+move(OldValidations, Name, {'after', OtherName}) ->
+ {Validation, Front1, Rear1} = take(Name, OldValidations),
+ {OtherValidation, Front2, Rear2} = take(OtherName, Front1 ++ Rear1),
+ {ok, Front2 ++ [OtherValidation, Validation] ++ Rear2};
+move(OldValidations, Name, {before, OtherName}) ->
+ {Validation, Front1, Rear1} = take(Name, OldValidations),
+ {OtherValidation, Front2, Rear2} = take(OtherName, Front1 ++ Rear1),
+ {ok, Front2 ++ [Validation, OtherValidation] ++ Rear2}.
+
+fetch_with_index([{Pos, #{name := Name} = Validation} | _Rest], Name) ->
+ {Pos, Validation};
+fetch_with_index([{_, _} | Rest], Name) ->
+ fetch_with_index(Rest, Name);
+fetch_with_index(Validations, Name) ->
+ fetch_with_index(lists:enumerate(Validations), Name).
+
+take(Name, Validations) ->
+ case lists:splitwith(fun(#{<<"name">> := N}) -> N =/= Name end, Validations) of
+ {_Front, []} ->
+ throw({validation_not_found, Name});
+ {Front, [Found | Rear]} ->
+ {Found, Front, Rear}
+ end.
+
+do_lookup(_Name, _Validations = []) ->
+ {error, not_found};
+do_lookup(Name, [#{name := Name} = Validation | _Rest]) ->
+ {ok, Validation};
+do_lookup(Name, [_ | Rest]) ->
+ do_lookup(Name, Rest).
+
+run_validations(Validations, Message) ->
+ try
+ emqx_rule_runtime:clear_rule_payload(),
+ Fun = fun(Validation, Acc) ->
+ #{
+ name := Name,
+ log_failure_at := FailureLogLevel
+ } = Validation,
+ case run_validation(Validation, Message) of
+ ok ->
+ {cont, Acc};
+ FailureAction ->
+ ?TRACE(
+ FailureLogLevel,
+ ?TRACE_TAG,
+ "validation_failed",
+ #{validation => Name, action => FailureAction}
+ ),
+ {halt, FailureAction}
+ end
+ end,
+ emqx_utils:foldl_while(Fun, _Passed = ok, Validations)
+ after
+ emqx_rule_runtime:clear_rule_payload()
+ end.
+
+run_validation(#{strategy := all_pass} = Validation, Message) ->
+ #{
+ checks := Checks,
+ failure_action := FailureAction
+ } = Validation,
+ Fun = fun(Check, Acc) ->
+ case run_check(Check, Message) of
+ true -> {cont, Acc};
+ false -> {halt, FailureAction}
+ end
+ end,
+ emqx_utils:foldl_while(Fun, _Passed = ok, Checks);
+run_validation(#{strategy := any_pass} = Validation, Message) ->
+ #{
+ checks := Checks,
+ failure_action := FailureAction
+ } = Validation,
+ case lists:any(fun(C) -> run_check(C, Message) end, Checks) of
+ true ->
+ ok;
+ false ->
+ FailureAction
+ end.
+
+run_check(#{type := sql} = Check, Message) ->
+ evaluate_sql_check(Check, Message);
+run_check(Check, Message) ->
+ evaluate_schema_check(Check, Message).
diff --git a/apps/emqx_message_validation/src/emqx_message_validation_app.erl b/apps/emqx_message_validation/src/emqx_message_validation_app.erl
new file mode 100644
index 000000000..4e566eedd
--- /dev/null
+++ b/apps/emqx_message_validation/src/emqx_message_validation_app.erl
@@ -0,0 +1,32 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_message_validation_app).
+
+-behaviour(application).
+
+%% `application' API
+-export([start/2, stop/1]).
+
+%%------------------------------------------------------------------------------
+%% Type declarations
+%%------------------------------------------------------------------------------
+
+%%------------------------------------------------------------------------------
+%% `application' API
+%%------------------------------------------------------------------------------
+
+-spec start(application:start_type(), term()) -> {ok, pid()}.
+start(_Type, _Args) ->
+ {ok, Sup} = emqx_message_validation_sup:start_link(),
+ ok = emqx_message_validation:add_handler(),
+ ok = emqx_message_validation:register_hooks(),
+ ok = emqx_message_validation:load(),
+ {ok, Sup}.
+
+-spec stop(term()) -> ok.
+stop(_State) ->
+ ok = emqx_message_validation:unload(),
+ ok = emqx_message_validation:unregister_hooks(),
+ ok = emqx_message_validation:remove_handler(),
+ ok.
diff --git a/apps/emqx_message_validation/src/emqx_message_validation_http_api.erl b/apps/emqx_message_validation/src/emqx_message_validation_http_api.erl
new file mode 100644
index 000000000..5cbb911df
--- /dev/null
+++ b/apps/emqx_message_validation/src/emqx_message_validation_http_api.erl
@@ -0,0 +1,385 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_message_validation_http_api).
+
+-behaviour(minirest_api).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include_lib("emqx_utils/include/emqx_utils_api.hrl").
+
+%% `minirest' and `minirest_trails' API
+-export([
+ namespace/0,
+ api_spec/0,
+ fields/1,
+ paths/0,
+ schema/1
+]).
+
+%% `minirest' handlers
+-export([
+ '/message_validations'/2,
+ '/message_validations/:name'/2,
+ '/message_validations/:name/move'/2
+]).
+
+%%-------------------------------------------------------------------------------------------------
+%% Type definitions
+%%-------------------------------------------------------------------------------------------------
+
+-define(TAGS, [<<"Message Validation">>]).
+
+%%-------------------------------------------------------------------------------------------------
+%% `minirest' and `minirest_trails' API
+%%-------------------------------------------------------------------------------------------------
+
+namespace() -> "message_validation_http_api".
+
+api_spec() ->
+ emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
+
+paths() ->
+ [
+ "/message_validations",
+ "/message_validations/:name",
+ "/message_validations/:name/move"
+ ].
+
+schema("/message_validations") ->
+ #{
+ 'operationId' => '/message_validations',
+ get => #{
+ tags => ?TAGS,
+ summary => <<"List validations">>,
+ description => ?DESC("list_validations"),
+ responses =>
+ #{
+ 200 =>
+ emqx_dashboard_swagger:schema_with_examples(
+ hoconsc:array(
+ emqx_message_validation_schema:api_schema(list)
+ ),
+ #{
+ sample =>
+ #{value => example_return_list()}
+ }
+ )
+ }
+ },
+ post => #{
+ tags => ?TAGS,
+ summary => <<"Append a new validation">>,
+ description => ?DESC("append_validation"),
+ 'requestBody' => emqx_dashboard_swagger:schema_with_examples(
+ emqx_message_validation_schema:api_schema(post),
+ example_input_create()
+ ),
+ responses =>
+ #{
+ 201 =>
+ emqx_dashboard_swagger:schema_with_examples(
+ emqx_message_validation_schema:api_schema(post),
+ example_return_create()
+ ),
+ 400 => error_schema('ALREADY_EXISTS', "Validation already exists")
+ }
+ },
+ put => #{
+ tags => ?TAGS,
+ summary => <<"Update a validation">>,
+ description => ?DESC("update_validation"),
+ 'requestBody' => emqx_dashboard_swagger:schema_with_examples(
+ emqx_message_validation_schema:api_schema(put),
+ example_input_update()
+ ),
+ responses =>
+ #{
+ 200 =>
+ emqx_dashboard_swagger:schema_with_examples(
+ emqx_message_validation_schema:api_schema(put),
+ example_return_update()
+ ),
+ 404 => error_schema('NOT_FOUND', "Validation not found"),
+ 400 => error_schema('BAD_REQUEST', "Bad params")
+ }
+ }
+ };
+schema("/message_validations/:name") ->
+ #{
+ 'operationId' => '/message_validations/:name',
+ get => #{
+ tags => ?TAGS,
+ summary => <<"Lookup a validation">>,
+ description => ?DESC("lookup_validation"),
+ parameters => [param_path_name()],
+ responses =>
+ #{
+ 200 =>
+ emqx_dashboard_swagger:schema_with_examples(
+ hoconsc:array(
+ emqx_message_validation_schema:api_schema(lookup)
+ ),
+ #{
+ sample =>
+ #{value => example_return_lookup()}
+ }
+ ),
+ 404 => error_schema('NOT_FOUND', "Validation not found")
+ }
+ },
+ delete => #{
+ tags => ?TAGS,
+ summary => <<"Delete a validation">>,
+ description => ?DESC("delete_validation"),
+ parameters => [param_path_name()],
+ responses =>
+ #{
+ 204 => <<"Validation deleted">>,
+ 404 => error_schema('NOT_FOUND', "Validation not found")
+ }
+ }
+ };
+schema("/message_validations/:name/move") ->
+ #{
+ 'operationId' => '/message_validations/:name/move',
+ post => #{
+ tags => ?TAGS,
+ summary => <<"Change the order of a validation">>,
+ description => ?DESC("move_validation"),
+ parameters => [param_path_name()],
+ 'requestBody' =>
+ emqx_dashboard_swagger:schema_with_examples(
+ hoconsc:union(fun position_union_member_selector/1),
+ example_position()
+ ),
+ responses =>
+ #{
+ 204 => <<"No Content">>,
+ 400 => error_schema('BAD_REQUEST', <<"Bad request">>),
+ 404 => error_schema('NOT_FOUND', "Validation not found")
+ }
+ }
+ }.
+
+param_path_name() ->
+ {name,
+ mk(
+ binary(),
+ #{
+ in => path,
+ required => true,
+ example => <<"my_validation">>,
+ desc => ?DESC("param_path_name")
+ }
+ )}.
+
+fields(front) ->
+ [{position, mk(front, #{default => front, required => true, in => body})}];
+fields(rear) ->
+ [{position, mk(rear, #{default => rear, required => true, in => body})}];
+fields('after') ->
+ [
+ {position, mk('after', #{default => 'after', required => true, in => body})},
+ {validation, mk(binary(), #{required => true, in => body})}
+ ];
+fields(before) ->
+ [
+ {position, mk(before, #{default => before, required => true, in => body})},
+ {validation, mk(binary(), #{required => true, in => body})}
+ ].
+
+%%-------------------------------------------------------------------------------------------------
+%% `minirest' handlers
+%%-------------------------------------------------------------------------------------------------
+
+'/message_validations'(get, _Params) ->
+ ?OK(emqx_message_validation:list());
+'/message_validations'(post, #{body := Params = #{<<"name">> := Name}}) ->
+ with_validation(
+ Name,
+ return(?BAD_REQUEST('ALREADY_EXISTS', <<"Validation already exists">>)),
+ fun() ->
+ case emqx_message_validation:insert(Params) of
+ {ok, _} ->
+ {ok, Res} = emqx_message_validation:lookup(Name),
+ {201, Res};
+ {error, Error} ->
+ ?BAD_REQUEST(Error)
+ end
+ end
+ );
+'/message_validations'(put, #{body := Params = #{<<"name">> := Name}}) ->
+ with_validation(
+ Name,
+ fun() ->
+ case emqx_message_validation:update(Params) of
+ {ok, _} ->
+ {ok, Res} = emqx_message_validation:lookup(Name),
+ {200, Res};
+ {error, Error} ->
+ ?BAD_REQUEST(Error)
+ end
+ end,
+ not_found()
+ ).
+
+'/message_validations/:name'(get, #{bindings := #{name := Name}}) ->
+ with_validation(
+ Name,
+ fun(Validation) -> ?OK(Validation) end,
+ not_found()
+ );
+'/message_validations/:name'(delete, #{bindings := #{name := Name}}) ->
+ with_validation(
+ Name,
+ fun() ->
+ case emqx_message_validation:delete(Name) of
+ {ok, _} ->
+ ?NO_CONTENT;
+ {error, Error} ->
+ ?BAD_REQUEST(Error)
+ end
+ end,
+ not_found()
+ ).
+
+'/message_validations/:name/move'(post, #{bindings := #{name := Name}, body := Body}) ->
+ with_validation(
+ Name,
+ fun() ->
+ do_move(Name, parse_position(Body))
+ end,
+ not_found(Name)
+ ).
+
+%%-------------------------------------------------------------------------------------------------
+%% Internal fns
+%%-------------------------------------------------------------------------------------------------
+
+ref(Struct) -> hoconsc:ref(?MODULE, Struct).
+mk(Type, Opts) -> hoconsc:mk(Type, Opts).
+
+example_input_create() ->
+ %% TODO
+ #{}.
+
+example_input_update() ->
+ %% TODO
+ #{}.
+
+example_return_list() ->
+ %% TODO
+ [].
+
+example_return_create() ->
+ %% TODO
+ #{}.
+
+example_return_update() ->
+ %% TODO
+ #{}.
+
+example_return_lookup() ->
+ %% TODO
+ #{}.
+
+example_position() ->
+ %% TODO
+ #{}.
+
+error_schema(Code, Message) when is_atom(Code) ->
+ error_schema([Code], Message);
+error_schema(Codes, Message) when is_list(Message) ->
+ error_schema(Codes, list_to_binary(Message));
+error_schema(Codes, Message) when is_list(Codes) andalso is_binary(Message) ->
+ emqx_dashboard_swagger:error_codes(Codes, Message).
+
+position_union_member_selector(all_union_members) ->
+ position_refs();
+position_union_member_selector({value, V}) ->
+ position_refs(V).
+
+position_refs() ->
+ [].
+
+position_types() ->
+ [
+ front,
+ rear,
+ 'after',
+ before
+ ].
+
+position_refs(#{<<"position">> := <<"front">>}) ->
+ [ref(front)];
+position_refs(#{<<"position">> := <<"rear">>}) ->
+ [ref(rear)];
+position_refs(#{<<"position">> := <<"after">>}) ->
+ [ref('after')];
+position_refs(#{<<"position">> := <<"before">>}) ->
+ [ref(before)];
+position_refs(_) ->
+ Expected = lists:join(" | ", [atom_to_list(T) || T <- position_types()]),
+ throw(#{
+ field_name => position,
+ expected => iolist_to_binary(Expected)
+ }).
+
+%% Schema is already checked, so we don't need to do further validation.
+parse_position(#{<<"position">> := <<"front">>}) ->
+ front;
+parse_position(#{<<"position">> := <<"rear">>}) ->
+ rear;
+parse_position(#{<<"position">> := <<"after">>, <<"validation">> := OtherValidationName}) ->
+ {'after', OtherValidationName};
+parse_position(#{<<"position">> := <<"before">>, <<"validation">> := OtherValidationName}) ->
+ {before, OtherValidationName}.
+
+do_move(ValidationName, {_, OtherValidationName} = Position) ->
+ with_validation(
+ OtherValidationName,
+ fun() ->
+ case emqx_message_validation:move(ValidationName, Position) of
+ {ok, _} ->
+ ?NO_CONTENT;
+ {error, Error} ->
+ ?BAD_REQUEST(Error)
+ end
+ end,
+ bad_request_not_found(OtherValidationName)
+ );
+do_move(ValidationName, Position) ->
+ case emqx_message_validation:move(ValidationName, Position) of
+ {ok, _} ->
+ ?NO_CONTENT;
+ {error, Error} ->
+ ?BAD_REQUEST(Error)
+ end.
+
+with_validation(Name, FoundFn, NotFoundFn) ->
+ case emqx_message_validation:lookup(Name) of
+ {ok, Validation} ->
+ {arity, Arity} = erlang:fun_info(FoundFn, arity),
+ case Arity of
+ 1 -> FoundFn(Validation);
+ 0 -> FoundFn()
+ end;
+ {error, not_found} ->
+ NotFoundFn()
+ end.
+
+return(Response) ->
+ fun() -> Response end.
+
+not_found() ->
+ return(?NOT_FOUND(<<"Validation not found">>)).
+
+not_found(Name) ->
+ return(?NOT_FOUND(<<"Validation not found: ", Name/binary>>)).
+
+%% After we found the base validation, but not the other one being referenced in a move.
+bad_request_not_found(Name) ->
+ return(?BAD_REQUEST(<<"Validation not found: ", Name/binary>>)).
diff --git a/apps/emqx_message_validation/src/emqx_message_validation_registry.erl b/apps/emqx_message_validation/src/emqx_message_validation_registry.erl
new file mode 100644
index 000000000..125429565
--- /dev/null
+++ b/apps/emqx_message_validation/src/emqx_message_validation_registry.erl
@@ -0,0 +1,225 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_message_validation_registry).
+
+-behaviour(gen_server).
+
+%% API
+-export([
+ lookup/1,
+ insert/2,
+ update/3,
+ delete/1,
+ reindex_positions/1,
+
+ matching_validations/1,
+
+ start_link/0,
+ metrics_worker_spec/0
+]).
+
+%% `gen_server' API
+-export([
+ init/1,
+ handle_call/3,
+ handle_cast/2
+]).
+
+%%------------------------------------------------------------------------------
+%% Type declarations
+%%------------------------------------------------------------------------------
+
+-define(VALIDATION_TOPIC_INDEX, emqx_message_validation_index).
+-define(VALIDATION_TAB, emqx_message_validation_tab).
+
+-define(METRIC_NAME, message_validation).
+-define(METRICS, [
+ 'matched',
+ 'passed',
+ 'failed'
+]).
+-define(RATE_METRICS, ['matched']).
+
+-type validation_name() :: binary().
+-type validation() :: _TODO.
+-type position_index() :: pos_integer().
+
+-record(reindex_positions, {validations :: [validation()]}).
+-record(insert, {pos :: position_index(), validation :: validation()}).
+-record(update, {old :: validation(), pos :: position_index(), new :: validation()}).
+-record(delete, {validation :: validation()}).
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+-spec start_link() -> gen_server:start_ret().
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+-spec lookup(validation_name()) ->
+ {ok, validation()} | {error, not_found}.
+lookup(Name) ->
+ case emqx_utils_ets:lookup_value(?VALIDATION_TAB, Name, undefined) of
+ undefined ->
+ {error, not_found};
+ Validation ->
+ {ok, Validation}
+ end.
+
+-spec reindex_positions([validation()]) -> ok.
+reindex_positions(Validations) ->
+ gen_server:call(?MODULE, #reindex_positions{validations = Validations}, infinity).
+
+-spec insert(position_index(), validation()) -> ok.
+insert(Pos, Validation) ->
+ gen_server:call(?MODULE, #insert{pos = Pos, validation = Validation}, infinity).
+
+-spec update(validation(), position_index(), validation()) -> ok.
+update(Old, Pos, New) ->
+ gen_server:call(?MODULE, #update{old = Old, pos = Pos, new = New}, infinity).
+
+-spec delete(validation()) -> ok.
+delete(Validation) ->
+ gen_server:call(?MODULE, #delete{validation = Validation}, infinity).
+
+%% @doc Returns a list of matching validation names, sorted by their configuration order.
+-spec matching_validations(emqx_types:topic()) -> [validation()].
+matching_validations(Topic) ->
+ Validations0 = [
+ {Pos, Validation}
+ || M <- emqx_topic_index:matches(Topic, ?VALIDATION_TOPIC_INDEX, [unique]),
+ [Pos] <- [emqx_topic_index:get_record(M, ?VALIDATION_TOPIC_INDEX)],
+ {ok, Validation} <- [
+ lookup(emqx_topic_index:get_id(M))
+ ]
+ ],
+ Validations1 = lists:sort(fun({Pos1, _V1}, {Pos2, _V2}) -> Pos1 =< Pos2 end, Validations0),
+ lists:map(fun({_Pos, V}) -> V end, Validations1).
+
+-spec metrics_worker_spec() -> supervisor:child_spec().
+metrics_worker_spec() ->
+ emqx_metrics_worker:child_spec(message_validation_metrics, ?METRIC_NAME).
+
+%%------------------------------------------------------------------------------
+%% `gen_server' API
+%%------------------------------------------------------------------------------
+
+init(_) ->
+ create_tables(),
+ State = #{},
+ {ok, State}.
+
+handle_call(#reindex_positions{validations = Validations}, _From, State) ->
+ do_reindex_positions(Validations),
+ {reply, ok, State};
+handle_call(#insert{pos = Pos, validation = Validation}, _From, State) ->
+ do_insert(Pos, Validation),
+ {reply, ok, State};
+handle_call(#update{old = OldValidation, pos = Pos, new = NewValidation}, _From, State) ->
+ ok = do_update(OldValidation, Pos, NewValidation),
+ {reply, ok, State};
+handle_call(#delete{validation = Validation}, _From, State) ->
+ do_delete(Validation),
+ {reply, ok, State};
+handle_call(_Call, _From, State) ->
+ {reply, ignored, State}.
+
+handle_cast(_Cast, State) ->
+ {noreply, State}.
+
+%%------------------------------------------------------------------------------
+%% Internal fns
+%%------------------------------------------------------------------------------
+
+create_tables() ->
+ _ = emqx_utils_ets:new(?VALIDATION_TOPIC_INDEX, [public, ordered_set, {read_concurrency, true}]),
+ _ = emqx_utils_ets:new(?VALIDATION_TAB, [public, ordered_set, {read_concurrency, true}]),
+ ok.
+
+do_reindex_positions(Validations) ->
+ lists:foreach(
+ fun({Pos, Validation}) ->
+ #{
+ name := Name,
+ topics := Topics
+ } = Validation,
+ do_insert_into_tab(Name, Validation, Pos),
+ update_topic_index(Name, Pos, Topics)
+ end,
+ lists:enumerate(Validations)
+ ).
+
+do_insert(Pos, Validation) ->
+ #{
+ name := Name,
+ topics := Topics
+ } = Validation,
+ maybe_create_metrics(Name),
+ do_insert_into_tab(Name, Validation, Pos),
+ update_topic_index(Name, Pos, Topics),
+ ok.
+
+do_update(OldValidation, Pos, NewValidation) ->
+ #{topics := OldTopics} = OldValidation,
+ #{
+ name := Name,
+ topics := NewTopics
+ } = NewValidation,
+ maybe_create_metrics(Name),
+ do_insert_into_tab(Name, NewValidation, Pos),
+ delete_topic_index(Name, OldTopics),
+ update_topic_index(Name, Pos, NewTopics),
+ ok.
+
+do_delete(Validation) ->
+ #{
+ name := Name,
+ topics := Topics
+ } = Validation,
+ ets:delete(?VALIDATION_TAB, Name),
+ delete_topic_index(Name, Topics),
+ drop_metrics(Name),
+ ok.
+
+do_insert_into_tab(Name, Validation0, Pos) ->
+ Validation = transform_validation(Validation0#{pos => Pos}),
+ ets:insert(?VALIDATION_TAB, {Name, Validation}),
+ ok.
+
+maybe_create_metrics(Name) ->
+ case emqx_metrics_worker:has_metrics(?METRIC_NAME, Name) of
+ true ->
+ ok = emqx_metrics_worker:reset_metrics(?METRIC_NAME, Name);
+ false ->
+ ok = emqx_metrics_worker:create_metrics(?METRIC_NAME, Name, ?METRICS, ?RATE_METRICS)
+ end.
+
+drop_metrics(Name) ->
+ ok = emqx_metrics_worker:clear_metrics(?METRIC_NAME, Name).
+
+update_topic_index(Name, Pos, Topics) ->
+ lists:foreach(
+ fun(Topic) ->
+ true = emqx_topic_index:insert(Topic, Name, Pos, ?VALIDATION_TOPIC_INDEX)
+ end,
+ Topics
+ ).
+
+delete_topic_index(Name, Topics) ->
+ lists:foreach(
+ fun(Topic) ->
+ true = emqx_topic_index:delete(Topic, Name, ?VALIDATION_TOPIC_INDEX)
+ end,
+ Topics
+ ).
+
+transform_validation(Validation = #{checks := Checks}) ->
+ Validation#{checks := lists:map(fun transform_check/1, Checks)}.
+
+transform_check(#{type := sql, sql := SQL}) ->
+ {ok, Check} = emqx_message_validation:parse_sql_check(SQL),
+ Check;
+transform_check(Check) ->
+ Check.
diff --git a/apps/emqx_message_validation/src/emqx_message_validation_schema.erl b/apps/emqx_message_validation/src/emqx_message_validation_schema.erl
new file mode 100644
index 000000000..998e11302
--- /dev/null
+++ b/apps/emqx_message_validation/src/emqx_message_validation_schema.erl
@@ -0,0 +1,206 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_message_validation_schema).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+
+%% `hocon_schema' API
+-export([
+ namespace/0,
+ roots/0,
+ fields/1
+]).
+
+%% `minirest_trails' API
+-export([
+ api_schema/1
+]).
+
+%%------------------------------------------------------------------------------
+%% Type declarations
+%%------------------------------------------------------------------------------
+
+%%------------------------------------------------------------------------------
+%% `hocon_schema' API
+%%------------------------------------------------------------------------------
+
+namespace() -> message_validation.
+
+roots() ->
+ [{message_validation, mk(ref(message_validation), #{importance => ?IMPORTANCE_HIDDEN})}].
+
+fields(message_validation) ->
+ [
+ {validations,
+ mk(
+ hoconsc:array(ref(validation)),
+ #{
+ default => [],
+ desc => ?DESC("validations"),
+ validator => fun validate_unique_names/1
+ }
+ )}
+ ];
+fields(validation) ->
+ [
+ {tags, emqx_schema:tags_schema()},
+ {description, emqx_schema:description_schema()},
+ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
+ {name, mk(binary(), #{required => true, desc => ?DESC("name")})},
+ {topics,
+ mk(
+ hoconsc:union([binary(), hoconsc:array(binary())]),
+ #{
+ desc => ?DESC("topics"),
+ converter => fun ensure_array/2,
+ required => true
+ }
+ )},
+ {strategy,
+ mk(
+ hoconsc:enum([any_pass, all_pass]),
+ #{desc => ?DESC("strategy"), required => true}
+ )},
+ {failure_action,
+ mk(
+ hoconsc:enum([drop, disconnect]),
+ #{desc => ?DESC("failure_action"), required => true}
+ )},
+ {log_failure_at,
+ mk(
+ hoconsc:enum([error, warning, notice, info, debug]),
+ #{desc => ?DESC("log_failure_at"), default => info}
+ )},
+ {checks,
+ mk(
+ hoconsc:array(
+ hoconsc:union(fun checks_union_member_selector/1)
+ ),
+ #{
+ required => true,
+ desc => ?DESC("checks"),
+ validator => fun
+ ([]) ->
+ {error, "at least one check must be defined"};
+ (_) ->
+ ok
+ end
+ }
+ )}
+ ];
+fields(check_sql) ->
+ [
+ {type, mk(sql, #{default => sql, desc => ?DESC("check_sql_type")})},
+ {sql,
+ mk(binary(), #{
+ required => true,
+ desc => ?DESC("check_sql_type"),
+ validator => fun validate_sql/1
+ })}
+ ];
+fields(check_json) ->
+ [
+ {type, mk(json, #{default => json, desc => ?DESC("check_json_type")})},
+ {schema, mk(binary(), #{required => true, desc => ?DESC("check_json_type")})}
+ ];
+fields(check_protobuf) ->
+ [
+ {type, mk(protobuf, #{default => protobuf, desc => ?DESC("check_protobuf_type")})},
+ {schema, mk(binary(), #{required => true, desc => ?DESC("check_protobuf_schema")})},
+ {message_name,
+ mk(binary(), #{required => true, desc => ?DESC("check_protobuf_message_name")})}
+ ];
+fields(check_avro) ->
+ [
+ {type, mk(avro, #{default => avro, desc => ?DESC("check_avro_type")})},
+ {schema, mk(binary(), #{required => true, desc => ?DESC("check_avro_schema")})}
+ ].
+
+checks_union_member_selector(all_union_members) ->
+ checks_refs();
+checks_union_member_selector({value, V}) ->
+ checks_refs(V).
+
+checks_refs() ->
+ [ref(CheckType) || CheckType <- check_types()].
+
+check_types() ->
+ [
+ check_sql,
+ check_json,
+ check_avro,
+ check_protobuf
+ ].
+
+checks_refs(#{<<"type">> := TypeAtom} = Value) when is_atom(TypeAtom) ->
+ checks_refs(Value#{<<"type">> := atom_to_binary(TypeAtom)});
+checks_refs(#{<<"type">> := <<"sql">>}) ->
+ [ref(check_sql)];
+checks_refs(#{<<"type">> := <<"json">>}) ->
+ [ref(check_json)];
+checks_refs(#{<<"type">> := <<"avro">>}) ->
+ [ref(check_avro)];
+checks_refs(#{<<"type">> := <<"protobuf">>}) ->
+ [ref(check_protobuf)];
+checks_refs(_Value) ->
+ Expected = lists:join(
+ " | ",
+ [
+ Name
+ || T <- check_types(),
+ "check_" ++ Name <- [atom_to_list(T)]
+ ]
+ ),
+ throw(#{
+ field_name => type,
+ expected => iolist_to_binary(Expected)
+ }).
+
+%%------------------------------------------------------------------------------
+%% `minirest_trails' API
+%%------------------------------------------------------------------------------
+
+api_schema(list) ->
+ hoconsc:array(ref(validation));
+api_schema(lookup) ->
+ ref(validation);
+api_schema(post) ->
+ ref(validation);
+api_schema(put) ->
+ ref(validation).
+
+%%------------------------------------------------------------------------------
+%% Internal exports
+%%------------------------------------------------------------------------------
+
+%%------------------------------------------------------------------------------
+%% Internal fns
+%%------------------------------------------------------------------------------
+
+mk(Type, Meta) -> hoconsc:mk(Type, Meta).
+ref(Name) -> hoconsc:ref(?MODULE, Name).
+
+ensure_array(undefined, _) -> undefined;
+ensure_array(L, _) when is_list(L) -> L;
+ensure_array(B, _) -> [B].
+
+validate_sql(SQL) ->
+ case emqx_message_validation:parse_sql_check(SQL) of
+ {ok, _Parsed} ->
+ ok;
+ Error = {error, _} ->
+ Error
+ end.
+
+validate_unique_names(Validations0) ->
+ Validations = emqx_utils_maps:binary_key_map(Validations0),
+ do_validate_unique_names(Validations, #{}).
+
+do_validate_unique_names(_Validations = [], _Acc) ->
+ ok;
+do_validate_unique_names([#{<<"name">> := Name} | _Rest], Acc) when is_map_key(Name, Acc) ->
+ {error, <<"duplicated name: ", Name/binary>>};
+do_validate_unique_names([#{<<"name">> := Name} | Rest], Acc) ->
+ do_validate_unique_names(Rest, Acc#{Name => true}).
diff --git a/apps/emqx_message_validation/src/emqx_message_validation_sup.erl b/apps/emqx_message_validation/src/emqx_message_validation_sup.erl
new file mode 100644
index 000000000..2e8d6b8c6
--- /dev/null
+++ b/apps/emqx_message_validation/src/emqx_message_validation_sup.erl
@@ -0,0 +1,47 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_message_validation_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% `supervisor' API
+-export([init/1]).
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%%------------------------------------------------------------------------------
+%% `supervisor' API
+%%------------------------------------------------------------------------------
+
+init([]) ->
+ Registry = worker_spec(emqx_message_validation_registry),
+ Metrics = emqx_message_validation_registry:metrics_worker_spec(),
+ SupFlags = #{
+ strategy => one_for_one,
+ intensity => 10,
+ period => 10
+ },
+ ChildSpecs = [Metrics, Registry],
+ {ok, {SupFlags, ChildSpecs}}.
+
+%%------------------------------------------------------------------------------
+%% Internal fns
+%%------------------------------------------------------------------------------
+
+worker_spec(Mod) ->
+ #{
+ id => Mod,
+ start => {Mod, start_link, []},
+ restart => permanent,
+ shutdown => 5_000,
+ type => worker
+ }.
diff --git a/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl b/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl
new file mode 100644
index 000000000..9c1289e88
--- /dev/null
+++ b/apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl
@@ -0,0 +1,690 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_message_validation_http_api_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("emqx/include/asserts.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
+
+-import(emqx_common_test_helpers, [on_exit/1]).
+
+%%------------------------------------------------------------------------------
+%% CT boilerplate
+%%------------------------------------------------------------------------------
+
+all() ->
+ emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+ emqx_common_test_helpers:clear_screen(),
+ Apps = emqx_cth_suite:start(
+ lists:flatten(
+ [
+ emqx,
+ emqx_conf,
+ emqx_rule_engine,
+ emqx_message_validation,
+ emqx_management,
+ emqx_mgmt_api_test_util:emqx_dashboard(),
+ emqx_schema_registry
+ ]
+ ),
+ #{work_dir => emqx_cth_suite:work_dir(Config)}
+ ),
+ {ok, _} = emqx_common_test_http:create_default_app(),
+ [{apps, Apps} | Config].
+
+end_per_suite(Config) ->
+ Apps = ?config(apps, Config),
+ ok = emqx_cth_suite:stop(Apps),
+ ok.
+
+init_per_testcase(_TestCase, Config) ->
+ Config.
+
+end_per_testcase(_TestCase, _Config) ->
+ clear_all_validations(),
+ emqx_common_test_helpers:call_janitor(),
+ ok.
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+-define(assertIndexOrder(EXPECTED, TOPIC), assert_index_order(EXPECTED, TOPIC, #{line => ?LINE})).
+
+clear_all_validations() ->
+ lists:foreach(
+ fun(#{name := Name}) ->
+ {ok, _} = emqx_message_validation:delete(Name)
+ end,
+ emqx_message_validation:list()
+ ).
+
+maybe_json_decode(X) ->
+ case emqx_utils_json:safe_decode(X, [return_maps]) of
+ {ok, Decoded} -> Decoded;
+ {error, _} -> X
+ end.
+
+request(Method, Path, Params) ->
+ AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+ Opts = #{return_all => true},
+ case emqx_mgmt_api_test_util:request_api(Method, Path, "", AuthHeader, Params, Opts) of
+ {ok, {Status, Headers, Body0}} ->
+ Body = maybe_json_decode(Body0),
+ {ok, {Status, Headers, Body}};
+ {error, {Status, Headers, Body0}} ->
+ Body =
+ case emqx_utils_json:safe_decode(Body0, [return_maps]) of
+ {ok, Decoded0 = #{<<"message">> := Msg0}} ->
+ Msg = maybe_json_decode(Msg0),
+ Decoded0#{<<"message">> := Msg};
+ {ok, Decoded0} ->
+ Decoded0;
+ {error, _} ->
+ Body0
+ end,
+ {error, {Status, Headers, Body}};
+ Error ->
+ Error
+ end.
+
+validation(Name, Checks) ->
+ validation(Name, Checks, _Overrides = #{}).
+
+validation(Name, Checks, Overrides) ->
+ Default = #{
+ <<"tags">> => [<<"some">>, <<"tags">>],
+ <<"description">> => <<"my validation">>,
+ <<"enable">> => true,
+ <<"name">> => Name,
+ <<"topics">> => <<"t/+">>,
+ <<"strategy">> => <<"all_pass">>,
+ <<"failure_action">> => <<"drop">>,
+ <<"log_failure_at">> => <<"warning">>,
+ <<"checks">> => Checks
+ },
+ emqx_utils_maps:deep_merge(Default, Overrides).
+
+sql_check() ->
+ sql_check(<<"select * where true">>).
+
+sql_check(SQL) ->
+ #{
+ <<"type">> => <<"sql">>,
+ <<"sql">> => SQL
+ }.
+
+schema_check(Type, SerdeName) ->
+ schema_check(Type, SerdeName, _Overrides = #{}).
+
+schema_check(Type, SerdeName, Overrides) ->
+ emqx_utils_maps:deep_merge(
+ #{
+ <<"type">> => emqx_utils_conv:bin(Type),
+ <<"schema">> => SerdeName
+ },
+ Overrides
+ ).
+
+api_root() -> "message_validations".
+
+simplify_result(Res) ->
+ case Res of
+ {error, {{_, Status, _}, _, Body}} ->
+ {Status, Body};
+ {ok, {{_, Status, _}, _, Body}} ->
+ {Status, Body}
+ end.
+
+list() ->
+ Path = emqx_mgmt_api_test_util:api_path([api_root()]),
+ Res = request(get, Path, _Params = []),
+ ct:pal("list result:\n ~p", [Res]),
+ simplify_result(Res).
+
+lookup(Name) ->
+ Path = emqx_mgmt_api_test_util:api_path([api_root(), Name]),
+ Res = request(get, Path, _Params = []),
+ ct:pal("lookup ~s result:\n ~p", [Name, Res]),
+ simplify_result(Res).
+
+insert(Params) ->
+ Path = emqx_mgmt_api_test_util:api_path([api_root()]),
+ Res = request(post, Path, Params),
+ ct:pal("insert result:\n ~p", [Res]),
+ simplify_result(Res).
+
+update(Params) ->
+ Path = emqx_mgmt_api_test_util:api_path([api_root()]),
+ Res = request(put, Path, Params),
+ ct:pal("update result:\n ~p", [Res]),
+ simplify_result(Res).
+
+delete(Name) ->
+ Path = emqx_mgmt_api_test_util:api_path([api_root(), Name]),
+ Res = request(delete, Path, _Params = []),
+ ct:pal("delete result:\n ~p", [Res]),
+ simplify_result(Res).
+
+move(Name, Pos) ->
+ Path = emqx_mgmt_api_test_util:api_path([api_root(), Name, "move"]),
+ Res = request(post, Path, Pos),
+ ct:pal("move result:\n ~p", [Res]),
+ simplify_result(Res).
+
+connect(ClientId) ->
+ connect(ClientId, _IsPersistent = false).
+
+connect(ClientId, IsPersistent) ->
+ Properties = emqx_utils_maps:put_if(#{}, 'Session-Expiry-Interval', 30, IsPersistent),
+ {ok, Client} = emqtt:start_link([
+ {clean_start, true},
+ {clientid, ClientId},
+ {properties, Properties},
+ {proto_ver, v5}
+ ]),
+ {ok, _} = emqtt:connect(Client),
+ on_exit(fun() -> catch emqtt:stop(Client) end),
+ Client.
+
+publish(Client, Topic, Payload) ->
+ publish(Client, Topic, Payload, _QoS = 0).
+
+publish(Client, Topic, {raw, Payload}, QoS) ->
+ case emqtt:publish(Client, Topic, Payload, QoS) of
+ ok -> ok;
+ {ok, _} -> ok;
+ Err -> Err
+ end;
+publish(Client, Topic, Payload, QoS) ->
+ case emqtt:publish(Client, Topic, emqx_utils_json:encode(Payload), QoS) of
+ ok -> ok;
+ {ok, _} -> ok;
+ Err -> Err
+ end.
+
+json_valid_payloads() ->
+ [
+ #{i => 10, s => <<"s">>},
+ #{i => 10}
+ ].
+
+json_invalid_payloads() ->
+ [
+ #{i => <<"wrong type">>},
+ #{x => <<"unknown property">>}
+ ].
+
+json_create_serde(SerdeName) ->
+ Source = #{
+ type => object,
+ properties => #{
+ i => #{type => integer},
+ s => #{type => string}
+ },
+ required => [<<"i">>],
+ additionalProperties => false
+ },
+ Schema = #{type => json, source => emqx_utils_json:encode(Source)},
+ ok = emqx_schema_registry:add_schema(SerdeName, Schema),
+ on_exit(fun() -> ok = emqx_schema_registry:delete_schema(SerdeName) end),
+ ok.
+
+avro_valid_payloads(SerdeName) ->
+ lists:map(
+ fun(Payload) -> emqx_schema_registry_serde:encode(SerdeName, Payload) end,
+ [
+ #{i => 10, s => <<"s">>},
+ #{i => 10}
+ ]
+ ).
+
+avro_invalid_payloads() ->
+ [
+ emqx_utils_json:encode(#{i => 10, s => <<"s">>}),
+ <<"">>
+ ].
+
+avro_create_serde(SerdeName) ->
+ Source = #{
+ type => record,
+ name => <<"test">>,
+ namespace => <<"emqx.com">>,
+ fields => [
+ #{name => <<"i">>, type => <<"int">>},
+ #{name => <<"s">>, type => [<<"null">>, <<"string">>], default => <<"null">>}
+ ]
+ },
+ Schema = #{type => avro, source => emqx_utils_json:encode(Source)},
+ ok = emqx_schema_registry:add_schema(SerdeName, Schema),
+ on_exit(fun() -> ok = emqx_schema_registry:delete_schema(SerdeName) end),
+ ok.
+
+protobuf_valid_payloads(SerdeName, MessageName) ->
+ lists:map(
+ fun(Payload) -> emqx_schema_registry_serde:encode(SerdeName, Payload, [MessageName]) end,
+ [
+ #{<<"name">> => <<"some name">>, <<"id">> => 10, <<"email">> => <<"emqx@emqx.io">>},
+ #{<<"name">> => <<"some name">>, <<"id">> => 10}
+ ]
+ ).
+
+protobuf_invalid_payloads() ->
+ [
+ emqx_utils_json:encode(#{name => <<"a">>, id => 10, email => <<"email">>}),
+ <<"not protobuf">>
+ ].
+
+protobuf_create_serde(SerdeName) ->
+ Source =
+ <<
+ "message Person {\n"
+ " required string name = 1;\n"
+ " required int32 id = 2;\n"
+ " optional string email = 3;\n"
+ " }\n"
+ "message UnionValue {\n"
+ " oneof u {\n"
+ " int32 a = 1;\n"
+ " string b = 2;\n"
+ " }\n"
+ "}"
+ >>,
+ Schema = #{type => protobuf, source => Source},
+ ok = emqx_schema_registry:add_schema(SerdeName, Schema),
+ on_exit(fun() -> ok = emqx_schema_registry:delete_schema(SerdeName) end),
+ ok.
+
+%% Checks that the internal order in the registry/index matches expectation.
+assert_index_order(ExpectedOrder, Topic, Comment) ->
+ ?assertEqual(
+ ExpectedOrder,
+ [
+ N
+ || #{name := N} <- emqx_message_validation_registry:matching_validations(Topic)
+ ],
+ Comment
+ ).
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
+%% Smoke test where we have a single check and `all_pass' strategy.
+t_smoke_test(_Config) ->
+ Name1 = <<"foo">>,
+ Check1 = sql_check(<<"select payload.value as x where x > 15">>),
+ Validation1 = validation(Name1, [Check1]),
+ {201, _} = insert(Validation1),
+
+ lists:foreach(
+ fun({QoS, IsPersistent}) ->
+ ct:pal("qos = ~b, is persistent = ~p", [QoS, IsPersistent]),
+ C = connect(<<"c1">>, IsPersistent),
+ {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
+
+ {200, _} = update(Validation1),
+
+ ok = publish(C, <<"t/1">>, #{value => 20}, QoS),
+ ?assertReceive({publish, _}),
+ ok = publish(C, <<"t/1">>, #{value => 10}, QoS),
+ ?assertNotReceive({publish, _}),
+ ok = publish(C, <<"t/1/a">>, #{value => 10}, QoS),
+ ?assertReceive({publish, _}),
+
+ %% test `disconnect' failure action
+ Validation2 = validation(Name1, [Check1], #{<<"failure_action">> => <<"disconnect">>}),
+ {200, _} = update(Validation2),
+
+ unlink(C),
+ PubRes = publish(C, <<"t/1">>, #{value => 0}, QoS),
+ case QoS =:= 0 of
+ true ->
+ ?assertMatch(ok, PubRes);
+ false ->
+ ?assertMatch(
+ {error, {disconnected, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, _}},
+ PubRes
+ )
+ end,
+ ?assertNotReceive({publish, _}),
+ ?assertReceive({disconnected, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, _}),
+
+ ok
+ end,
+ [
+ {QoS, IsPersistent}
+ || IsPersistent <- [false, true],
+ QoS <- [0, 1, 2]
+ ]
+ ),
+
+ ok.
+
+t_crud(_Config) ->
+ ?assertMatch({200, []}, list()),
+
+ Name1 = <<"foo">>,
+ Validation1 = validation(Name1, [sql_check()]),
+
+ ?assertMatch({201, #{<<"name">> := Name1}}, insert(Validation1)),
+ ?assertMatch({200, #{<<"name">> := Name1}}, lookup(Name1)),
+ ?assertMatch({200, [#{<<"name">> := Name1}]}, list()),
+ %% Duplicated name
+ ?assertMatch({400, #{<<"code">> := <<"ALREADY_EXISTS">>}}, insert(Validation1)),
+
+ Name2 = <<"bar">>,
+ Validation2 = validation(Name2, [sql_check()]),
+ %% Not found
+ ?assertMatch({404, _}, update(Validation2)),
+ ?assertMatch({201, _}, insert(Validation2)),
+ ?assertMatch(
+ {200, [#{<<"name">> := Name1}, #{<<"name">> := Name2}]},
+ list()
+ ),
+ ?assertMatch({200, #{<<"name">> := Name2}}, lookup(Name2)),
+ Validation1b = validation(Name1, [
+ sql_check(<<"select * where true">>),
+ sql_check(<<"select * where false">>)
+ ]),
+ ?assertMatch({200, _}, update(Validation1b)),
+ ?assertMatch({200, #{<<"checks">> := [_, _]}}, lookup(Name1)),
+ %% order is unchanged
+ ?assertMatch(
+ {200, [#{<<"name">> := Name1}, #{<<"name">> := Name2}]},
+ list()
+ ),
+
+ ?assertMatch({204, _}, delete(Name1)),
+ ?assertMatch({404, _}, lookup(Name1)),
+ ?assertMatch({200, [#{<<"name">> := Name2}]}, list()),
+ ?assertMatch({404, _}, update(Validation1)),
+
+ ok.
+
+t_move(_Config) ->
+ lists:foreach(
+ fun(Pos) ->
+ ?assertMatch({404, _}, move(<<"nonexistent_validation">>, Pos))
+ end,
+ [
+ #{<<"position">> => <<"front">>},
+ #{<<"position">> => <<"rear">>},
+ #{<<"position">> => <<"after">>, <<"validation">> => <<"also_non_existent">>},
+ #{<<"position">> => <<"before">>, <<"validation">> => <<"also_non_existent">>}
+ ]
+ ),
+
+ Topic = <<"t">>,
+
+ Name1 = <<"foo">>,
+ Validation1 = validation(Name1, [sql_check()], #{<<"topics">> => Topic}),
+ {201, _} = insert(Validation1),
+
+ %% bogus positions
+ lists:foreach(
+ fun(Pos) ->
+ ?assertMatch(
+ {400, #{<<"message">> := #{<<"kind">> := <<"validation_error">>}}},
+ move(Name1, Pos)
+ )
+ end,
+ [
+ #{<<"position">> => <<"foo">>},
+ #{<<"position">> => <<"bar">>, <<"validation">> => Name1}
+ ]
+ ),
+
+ lists:foreach(
+ fun(Pos) ->
+ ?assertMatch({204, _}, move(Name1, Pos)),
+ ?assertMatch({200, [#{<<"name">> := Name1}]}, list())
+ end,
+ [
+ #{<<"position">> => <<"front">>},
+ #{<<"position">> => <<"rear">>}
+ ]
+ ),
+
+ lists:foreach(
+ fun(Pos) ->
+ ?assertMatch({400, _}, move(Name1, Pos))
+ end,
+ [
+ #{<<"position">> => <<"after">>, <<"validation">> => <<"nonexistent">>},
+ #{<<"position">> => <<"before">>, <<"validation">> => <<"nonexistent">>}
+ ]
+ ),
+
+ Name2 = <<"bar">>,
+ Validation2 = validation(Name2, [sql_check()], #{<<"topics">> => Topic}),
+ {201, _} = insert(Validation2),
+ ?assertMatch({200, [#{<<"name">> := Name1}, #{<<"name">> := Name2}]}, list()),
+ ?assertIndexOrder([Name1, Name2], Topic),
+
+ ?assertMatch({204, _}, move(Name1, #{<<"position">> => <<"rear">>})),
+ ?assertMatch({200, [#{<<"name">> := Name2}, #{<<"name">> := Name1}]}, list()),
+ ?assertIndexOrder([Name2, Name1], Topic),
+
+ ?assertMatch({204, _}, move(Name1, #{<<"position">> => <<"front">>})),
+ ?assertMatch({200, [#{<<"name">> := Name1}, #{<<"name">> := Name2}]}, list()),
+ ?assertIndexOrder([Name1, Name2], Topic),
+
+ Name3 = <<"baz">>,
+ Validation3 = validation(Name3, [sql_check()], #{<<"topics">> => Topic}),
+ {201, _} = insert(Validation3),
+ ?assertMatch(
+ {200, [#{<<"name">> := Name1}, #{<<"name">> := Name2}, #{<<"name">> := Name3}]},
+ list()
+ ),
+ ?assertIndexOrder([Name1, Name2, Name3], Topic),
+
+ ?assertMatch(
+ {204, _}, move(Name3, #{<<"position">> => <<"before">>, <<"validation">> => Name2})
+ ),
+ ?assertMatch(
+ {200, [#{<<"name">> := Name1}, #{<<"name">> := Name3}, #{<<"name">> := Name2}]},
+ list()
+ ),
+ ?assertIndexOrder([Name1, Name3, Name2], Topic),
+
+ ?assertMatch(
+ {204, _}, move(Name1, #{<<"position">> => <<"after">>, <<"validation">> => Name2})
+ ),
+ ?assertMatch(
+ {200, [#{<<"name">> := Name3}, #{<<"name">> := Name2}, #{<<"name">> := Name1}]},
+ list()
+ ),
+ ?assertIndexOrder([Name3, Name2, Name1], Topic),
+
+ ok.
+
+%% Check the `all_pass' strategy
+t_all_pass(_Config) ->
+ Name1 = <<"foo">>,
+ Check1 = sql_check(<<"select payload.x as x where x > 5">>),
+ Check2 = sql_check(<<"select payload.x as x where x > 10">>),
+ Validation1 = validation(Name1, [Check1, Check2], #{<<"strategy">> => <<"all_pass">>}),
+ {201, _} = insert(Validation1),
+
+ C = connect(<<"c1">>),
+ {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
+ ok = publish(C, <<"t/1">>, #{x => 0}),
+ ?assertNotReceive({publish, _}),
+ ok = publish(C, <<"t/1">>, #{x => 7}),
+ ?assertNotReceive({publish, _}),
+ ok = publish(C, <<"t/1">>, #{x => 11}),
+ ?assertReceive({publish, _}),
+
+ ok.
+
+%% Check the `any_pass' strategy
+t_any_pass(_Config) ->
+ Name1 = <<"foo">>,
+ Check1 = sql_check(<<"select payload.x as x where x > 5">>),
+ Check2 = sql_check(<<"select payload.x as x where x > 10">>),
+ Validation1 = validation(Name1, [Check1, Check2], #{<<"strategy">> => <<"any_pass">>}),
+ {201, _} = insert(Validation1),
+
+ C = connect(<<"c1">>),
+ {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
+
+ ok = publish(C, <<"t/1">>, #{x => 11}),
+ ?assertReceive({publish, _}),
+ ok = publish(C, <<"t/1">>, #{x => 7}),
+ ?assertReceive({publish, _}),
+ ok = publish(C, <<"t/1">>, #{x => 0}),
+ ?assertNotReceive({publish, _}),
+
+ ok.
+
+%% Checks that multiple validations are run in order.
+t_multiple_validations(_Config) ->
+ Name1 = <<"foo">>,
+ Check1 = sql_check(<<"select payload.x as x, payload.y as y where x > 10 or y > 0">>),
+ Validation1 = validation(Name1, [Check1], #{<<"failure_action">> => <<"drop">>}),
+ {201, _} = insert(Validation1),
+
+ Name2 = <<"bar">>,
+ Check2 = sql_check(<<"select payload.x as x where x > 5">>),
+ Validation2 = validation(Name2, [Check2], #{<<"failure_action">> => <<"disconnect">>}),
+ {201, _} = insert(Validation2),
+
+ C = connect(<<"c1">>),
+ {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
+
+ ok = publish(C, <<"t/1">>, #{x => 11, y => 1}),
+ ?assertReceive({publish, _}),
+ ok = publish(C, <<"t/1">>, #{x => 7, y => 0}),
+ ?assertNotReceive({publish, _}),
+ ?assertNotReceive({disconnected, _, _}),
+ unlink(C),
+ ok = publish(C, <<"t/1">>, #{x => 0, y => 1}),
+ ?assertNotReceive({publish, _}),
+ ?assertReceive({disconnected, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, _}),
+
+ ok.
+
+t_schema_check_non_existent_serde(_Config) ->
+ SerdeName = <<"idontexist">>,
+ Name1 = <<"foo">>,
+ Check1 = schema_check(json, SerdeName),
+ Validation1 = validation(Name1, [Check1]),
+ {201, _} = insert(Validation1),
+
+ C = connect(<<"c1">>),
+ {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
+
+ ok = publish(C, <<"t/1">>, #{i => 10, s => <<"s">>}),
+ ?assertNotReceive({publish, _}),
+
+ ok.
+
+t_schema_check_json(_Config) ->
+ SerdeName = <<"myserde">>,
+ json_create_serde(SerdeName),
+
+ Name1 = <<"foo">>,
+ Check1 = schema_check(json, SerdeName),
+ Validation1 = validation(Name1, [Check1]),
+ {201, _} = insert(Validation1),
+
+ C = connect(<<"c1">>),
+ {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
+
+ lists:foreach(
+ fun(Payload) ->
+ ok = publish(C, <<"t/1">>, Payload),
+ ?assertReceive({publish, _})
+ end,
+ json_valid_payloads()
+ ),
+ lists:foreach(
+ fun(Payload) ->
+ ok = publish(C, <<"t/1">>, Payload),
+ ?assertNotReceive({publish, _})
+ end,
+ json_invalid_payloads()
+ ),
+
+ ok.
+
+t_schema_check_avro(_Config) ->
+ SerdeName = <<"myserde">>,
+ avro_create_serde(SerdeName),
+
+ Name1 = <<"foo">>,
+ Check1 = schema_check(avro, SerdeName),
+ Validation1 = validation(Name1, [Check1]),
+ {201, _} = insert(Validation1),
+
+ C = connect(<<"c1">>),
+ {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
+
+ lists:foreach(
+ fun(Payload) ->
+ ok = publish(C, <<"t/1">>, {raw, Payload}),
+ ?assertReceive({publish, _})
+ end,
+ avro_valid_payloads(SerdeName)
+ ),
+ lists:foreach(
+ fun(Payload) ->
+ ok = publish(C, <<"t/1">>, {raw, Payload}),
+ ?assertNotReceive({publish, _})
+ end,
+ avro_invalid_payloads()
+ ),
+
+ ok.
+
+t_schema_check_protobuf(_Config) ->
+ SerdeName = <<"myserde">>,
+ MessageName = <<"Person">>,
+ protobuf_create_serde(SerdeName),
+
+ Name1 = <<"foo">>,
+ Check1 = schema_check(protobuf, SerdeName, #{<<"message_name">> => MessageName}),
+ Validation1 = validation(Name1, [Check1]),
+ {201, _} = insert(Validation1),
+
+ C = connect(<<"c1">>),
+ {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
+
+ lists:foreach(
+ fun(Payload) ->
+ ok = publish(C, <<"t/1">>, {raw, Payload}),
+ ?assertReceive({publish, _})
+ end,
+ protobuf_valid_payloads(SerdeName, MessageName)
+ ),
+ lists:foreach(
+ fun(Payload) ->
+ ok = publish(C, <<"t/1">>, {raw, Payload}),
+ ?assertNotReceive({publish, _})
+ end,
+ protobuf_invalid_payloads()
+ ),
+
+ %% Bad config: unknown message name
+ Check2 = schema_check(protobuf, SerdeName, #{<<"message_name">> => <<"idontexist">>}),
+ Validation2 = validation(Name1, [Check2]),
+ {200, _} = update(Validation2),
+
+ lists:foreach(
+ fun(Payload) ->
+ ok = publish(C, <<"t/1">>, {raw, Payload}),
+ ?assertNotReceive({publish, _})
+ end,
+ protobuf_valid_payloads(SerdeName, MessageName)
+ ),
+
+ ok.
diff --git a/apps/emqx_message_validation/test/emqx_message_validation_tests.erl b/apps/emqx_message_validation/test/emqx_message_validation_tests.erl
new file mode 100644
index 000000000..118eaebbc
--- /dev/null
+++ b/apps/emqx_message_validation/test/emqx_message_validation_tests.erl
@@ -0,0 +1,194 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_message_validation_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-define(VALIDATIONS_PATH, "message_validation.validations").
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+parse_and_check(InnerConfigs) ->
+ RootBin = <<"message_validation">>,
+ InnerBin = <<"validations">>,
+ RawConf = #{RootBin => #{InnerBin => InnerConfigs}},
+ #{RootBin := #{InnerBin := Checked}} = hocon_tconf:check_plain(
+ emqx_message_validation_schema,
+ RawConf,
+ #{
+ required => false,
+ atom_key => false,
+ make_serializable => false
+ }
+ ),
+ Checked.
+
+validation(Name, Checks) ->
+ validation(Name, Checks, _Overrides = #{}).
+
+validation(Name, Checks, Overrides) ->
+ Default = #{
+ <<"tags">> => [<<"some">>, <<"tags">>],
+ <<"description">> => <<"my validation">>,
+ <<"enable">> => true,
+ <<"name">> => Name,
+ <<"topics">> => <<"t/+">>,
+ <<"strategy">> => <<"all_pass">>,
+ <<"failure_action">> => <<"drop">>,
+ <<"log_failure_at">> => <<"warning">>,
+ <<"checks">> => Checks
+ },
+ emqx_utils_maps:deep_merge(Default, Overrides).
+
+sql_check() ->
+ sql_check(<<"select * where true">>).
+
+sql_check(SQL) ->
+ #{
+ <<"type">> => <<"sql">>,
+ <<"sql">> => SQL
+ }.
+
+eval_sql(Message, SQL) ->
+ {ok, Check} = emqx_message_validation:parse_sql_check(SQL),
+ emqx_message_validation:evaluate_sql_check(Check, Message).
+
+message() ->
+ message(_Opts = #{}).
+
+message(Opts) ->
+ Defaults = #{
+ id => emqx_guid:gen(),
+ qos => 0,
+ from => emqx_guid:to_hexstr(emqx_guid:gen()),
+ flags => #{retain => false},
+ headers => #{
+ proto_ver => v5,
+ properties => #{'User-Property' => [{<<"a">>, <<"b">>}]}
+ },
+ topic => <<"t/t">>,
+ payload => emqx_utils_json:encode(#{value => 10}),
+ timestamp => 1710272561615,
+ extra => []
+ },
+ emqx_message:from_map(emqx_utils_maps:deep_merge(Defaults, Opts)).
+
+%%------------------------------------------------------------------------------
+%% Test cases
+%%------------------------------------------------------------------------------
+
+schema_test_() ->
+ [
+ {"topics is always a list 1",
+ ?_assertMatch(
+ [#{<<"topics">> := [<<"t/1">>]}],
+ parse_and_check([
+ validation(
+ <<"foo">>,
+ [sql_check()],
+ #{<<"topics">> => <<"t/1">>}
+ )
+ ])
+ )},
+ {"topics is always a list 2",
+ ?_assertMatch(
+ [#{<<"topics">> := [<<"t/1">>]}],
+ parse_and_check([
+ validation(
+ <<"foo">>,
+ [sql_check()],
+ #{<<"topics">> => [<<"t/1">>]}
+ )
+ ])
+ )},
+ {"foreach expression is not allowed",
+ ?_assertThrow(
+ {_Schema, [
+ #{
+ reason := foreach_not_allowed,
+ kind := validation_error
+ }
+ ]},
+ parse_and_check([
+ validation(
+ <<"foo">>,
+ [sql_check(<<"foreach foo as f where true">>)]
+ )
+ ])
+ )},
+ {"from clause is not allowed",
+ ?_assertThrow(
+ {_Schema, [
+ #{
+ reason := non_empty_from_clause,
+ kind := validation_error
+ }
+ ]},
+ parse_and_check([
+ validation(
+ <<"foo">>,
+ [sql_check(<<"select * from t">>)]
+ )
+ ])
+ )},
+ {"names are unique",
+ ?_assertThrow(
+ {_Schema, [
+ #{
+ reason := <<"duplicated name:", _/binary>>,
+ path := ?VALIDATIONS_PATH,
+ kind := validation_error
+ }
+ ]},
+ parse_and_check([
+ validation(<<"foo">>, [sql_check()]),
+ validation(<<"foo">>, [sql_check()])
+ ])
+ )},
+ {"checks must be non-empty",
+ ?_assertThrow(
+ {_Schema, [
+ #{
+ reason := "at least one check must be defined",
+ kind := validation_error
+ }
+ ]},
+ parse_and_check([
+ validation(
+ <<"foo">>,
+ []
+ )
+ ])
+ )},
+ {"bogus check type",
+ ?_assertThrow(
+ {_Schema, [
+ #{
+ expected := <<"sql", _/binary>>,
+ kind := validation_error,
+ field_name := type
+ }
+ ]},
+ parse_and_check([validation(<<"foo">>, [#{<<"type">> => <<"foo">>}])])
+ )}
+ ].
+
+check_test_() ->
+ [
+ {"denied by payload 1",
+ ?_assertNot(eval_sql(message(), <<"select * where payload.value > 15">>))},
+ {"denied by payload 2",
+ ?_assertNot(eval_sql(message(), <<"select payload.value as x where x > 15">>))},
+ {"allowed by payload 1",
+ ?_assert(eval_sql(message(), <<"select * where payload.value > 5">>))},
+ {"allowed by payload 2",
+ ?_assert(eval_sql(message(), <<"select payload.value as x where x > 5">>))},
+ {"always passes 1", ?_assert(eval_sql(message(), <<"select * where true">>))},
+ {"always passes 2", ?_assert(eval_sql(message(), <<"select * where 1 = 1">>))},
+ {"never passes 1", ?_assertNot(eval_sql(message(), <<"select * where false">>))},
+ {"never passes 2", ?_assertNot(eval_sql(message(), <<"select * where 1 = 2">>))},
+ {"never passes 3", ?_assertNot(eval_sql(message(), <<"select * where true and false">>))}
+ ].
diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl
index 1cc67840b..373dd2622 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl
@@ -51,12 +51,6 @@
get_rules_ordered_by_ts/0
]).
-%% exported for cluster_call
--export([
- do_delete_rule/1,
- do_insert_rule/1
-]).
-
-export([
load_hooks_for_rule/1,
unload_hooks_for_rule/1,
diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl
index 6bc60b70d..899a85002 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl
@@ -41,4 +41,5 @@ stop(_State) ->
RulePath = [RuleEngine | _] = ?KEY_PATH,
emqx_conf:remove_handler(RulePath ++ ['?']),
emqx_conf:remove_handler([RuleEngine]),
- ok = emqx_rule_events:unload().
+ ok = emqx_rule_events:unload(),
+ ok.
diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl
index 38c05ff3b..6a46e153b 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl
@@ -38,7 +38,8 @@ namespace() -> rule_engine.
tags() ->
[<<"Rule Engine">>].
-roots() -> [{"rule_engine", ?HOCON(?R_REF("rule_engine"), #{importance => ?IMPORTANCE_HIDDEN})}].
+roots() ->
+ [{"rule_engine", ?HOCON(?R_REF("rule_engine"), #{importance => ?IMPORTANCE_HIDDEN})}].
fields("rule_engine") ->
rule_engine_settings() ++
diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl
index 3e5e10c3a..e322d3ecd 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl
@@ -28,13 +28,25 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
- Registry = #{
- id => emqx_rule_engine,
- start => {emqx_rule_engine, start_link, []},
- restart => permanent,
- shutdown => 5000,
- type => worker,
- modules => [emqx_rule_engine]
+ RuleEngineRegistry = worker_spec(emqx_rule_engine),
+ RuleEngineMetrics = emqx_metrics_worker:child_spec(rule_metrics),
+ SupFlags = #{
+ strategy => one_for_one,
+ intensity => 10,
+ period => 10
},
- Metrics = emqx_metrics_worker:child_spec(rule_metrics),
- {ok, {{one_for_one, 10, 10}, [Registry, Metrics]}}.
+ Children = [
+ RuleEngineRegistry,
+ RuleEngineMetrics
+ ],
+ {ok, {SupFlags, Children}}.
+
+worker_spec(Mod) ->
+ #{
+ id => Mod,
+ start => {Mod, start_link, []},
+ restart => permanent,
+ shutdown => 5_000,
+ type => worker,
+ modules => [Mod]
+ }.
diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl
index fa845bc05..f51908772 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl
@@ -26,6 +26,9 @@
inc_action_metrics/2
]).
+%% Internal exports used by message validation
+-export([evaluate_select/3, clear_rule_payload/0]).
+
-import(
emqx_rule_maps,
[
@@ -129,27 +132,16 @@ do_apply_rule(
Columns,
Envs
) ->
- {Selected, Collection} = ?RAISE(
- select_and_collect(Fields, Columns),
- {select_and_collect_error, {EXCLASS, EXCPTION, ST}}
- ),
- ColumnsAndSelected = maps:merge(Columns, Selected),
- case
- ?RAISE(
- match_conditions(Conditions, ColumnsAndSelected),
- {match_conditions_error, {EXCLASS, EXCPTION, ST}}
- )
- of
- true ->
- Collection2 = filter_collection(ColumnsAndSelected, InCase, DoEach, Collection),
- case Collection2 of
+ case evaluate_foreach(Fields, Columns, Conditions, InCase, DoEach) of
+ {ok, ColumnsAndSelected, FinalCollection} ->
+ case FinalCollection of
[] ->
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result');
_ ->
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed')
end,
NewEnvs = maps:merge(ColumnsAndSelected, Envs),
- {ok, [handle_action_list(RuleId, Actions, Coll, NewEnvs) || Coll <- Collection2]};
+ {ok, [handle_action_list(RuleId, Actions, Coll, NewEnvs) || Coll <- FinalCollection]};
false ->
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'),
{error, nomatch}
@@ -165,6 +157,16 @@ do_apply_rule(
Columns,
Envs
) ->
+ case evaluate_select(Fields, Columns, Conditions) of
+ {ok, Selected} ->
+ ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed'),
+ {ok, handle_action_list(RuleId, Actions, Selected, maps:merge(Columns, Envs))};
+ false ->
+ ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'),
+ {error, nomatch}
+ end.
+
+evaluate_select(Fields, Columns, Conditions) ->
Selected = ?RAISE(
select_and_transform(Fields, Columns),
{select_and_transform_error, {EXCLASS, EXCPTION, ST}}
@@ -176,11 +178,28 @@ do_apply_rule(
)
of
true ->
- ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed'),
- {ok, handle_action_list(RuleId, Actions, Selected, maps:merge(Columns, Envs))};
+ {ok, Selected};
false ->
- ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'),
- {error, nomatch}
+ false
+ end.
+
+evaluate_foreach(Fields, Columns, Conditions, InCase, DoEach) ->
+ {Selected, Collection} = ?RAISE(
+ select_and_collect(Fields, Columns),
+ {select_and_collect_error, {EXCLASS, EXCPTION, ST}}
+ ),
+ ColumnsAndSelected = maps:merge(Columns, Selected),
+ case
+ ?RAISE(
+ match_conditions(Conditions, ColumnsAndSelected),
+ {match_conditions_error, {EXCLASS, EXCPTION, ST}}
+ )
+ of
+ true ->
+ FinalCollection = filter_collection(ColumnsAndSelected, InCase, DoEach, Collection),
+ {ok, ColumnsAndSelected, FinalCollection};
+ false ->
+ false
end.
clear_rule_payload() ->
@@ -281,6 +300,10 @@ match_conditions({'fun', {_, Name}, Args}, Data) ->
apply_func(Name, [eval(Arg, Data) || Arg <- Args], Data);
match_conditions({Op, L, R}, Data) when ?is_comp(Op) ->
compare(Op, eval(L, Data), eval(R, Data));
+match_conditions({const, true}, _Data) ->
+ true;
+match_conditions({const, false}, _Data) ->
+ false;
match_conditions({}, _Data) ->
true.
diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqlparser.erl b/apps/emqx_rule_engine/src/emqx_rule_sqlparser.erl
index f305d386b..8fbc80704 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_sqlparser.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_sqlparser.erl
@@ -18,7 +18,7 @@
-include("rule_engine.hrl").
--export([parse/1]).
+-export([parse/1, parse/2]).
-export([
select_fields/1,
@@ -62,37 +62,29 @@
-type field() :: const() | variable() | {as, field(), alias()} | sql_func().
+-type parse_opts() :: #{
+ %% Whether `from' clause should be mandatory.
+ %% Default: `true'.
+ with_from => boolean()
+}.
+
-export_type([select/0]).
%% Parse one select statement.
-spec parse(string() | binary()) -> {ok, select()} | {error, term()}.
parse(Sql) ->
- try
- case rulesql:parsetree(Sql) of
- {ok, {select, Clauses}} ->
- {ok, #select{
- is_foreach = false,
- fields = get_value(fields, Clauses),
- doeach = [],
- incase = {},
- from = get_value(from, Clauses),
- where = get_value(where, Clauses)
- }};
- {ok, {foreach, Clauses}} ->
- {ok, #select{
- is_foreach = true,
- fields = get_value(fields, Clauses),
- doeach = get_value(do, Clauses, []),
- incase = get_value(incase, Clauses, {}),
- from = get_value(from, Clauses),
- where = get_value(where, Clauses)
- }};
- Error ->
- {error, Error}
- end
- catch
- _Error:Reason:StackTrace ->
- {error, {Reason, StackTrace}}
+ parse(Sql, _Opts = #{}).
+
+-spec parse(string() | binary(), parse_opts()) -> {ok, select()} | {error, term()}.
+parse(Sql, Opts) ->
+ WithFrom = maps:get(with_from, Opts, true),
+ case do_parse(Sql) of
+ {ok, Parsed} when WithFrom ->
+ ensure_non_empty_from(Parsed);
+ {ok, Parsed} ->
+ ensure_empty_from(Parsed);
+ Error = {error, _} ->
+ Error
end.
-spec select_fields(select()) -> list(field()).
@@ -118,3 +110,45 @@ select_from(#select{from = From}) ->
-spec select_where(select()) -> tuple().
select_where(#select{where = Where}) ->
Where.
+
+-spec do_parse(string() | binary()) -> {ok, select()} | {error, term()}.
+do_parse(Sql) ->
+ try
+ case rulesql:parsetree(Sql) of
+ {ok, {select, Clauses}} ->
+ Parsed = #select{
+ is_foreach = false,
+ fields = get_value(fields, Clauses),
+ doeach = [],
+ incase = {},
+ from = get_value(from, Clauses),
+ where = get_value(where, Clauses)
+ },
+ {ok, Parsed};
+ {ok, {foreach, Clauses}} ->
+ Parsed = #select{
+ is_foreach = true,
+ fields = get_value(fields, Clauses),
+ doeach = get_value(do, Clauses, []),
+ incase = get_value(incase, Clauses, {}),
+ from = get_value(from, Clauses),
+ where = get_value(where, Clauses)
+ },
+ {ok, Parsed};
+ Error ->
+ {error, Error}
+ end
+ catch
+ _Error:Reason:StackTrace ->
+ {error, {Reason, StackTrace}}
+ end.
+
+ensure_non_empty_from(#select{from = []}) ->
+ {error, empty_from_clause};
+ensure_non_empty_from(Parsed) ->
+ {ok, Parsed}.
+
+ensure_empty_from(#select{from = [_ | _]}) ->
+ {error, non_empty_from_clause};
+ensure_empty_from(Parsed) ->
+ {ok, Parsed}.
diff --git a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl
index b31889ab4..d7fdad2cd 100644
--- a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl
+++ b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl
@@ -27,12 +27,19 @@
%%-define(PROPTEST(F), ?assert(proper:quickcheck(F(), [{on_output, fun ct:print/2}]))).
init_per_suite(Config) ->
- application:load(emqx_conf),
- ConfigConf = <<"rule_engine {jq_function_default_timeout=10s}">>,
- ok = emqx_common_test_helpers:load_config(emqx_rule_engine_schema, ConfigConf),
- Config.
+ Apps = emqx_cth_suite:start(
+ [
+ emqx,
+ emqx_conf,
+ {emqx_rule_engine, "rule_engine {jq_function_default_timeout=10s}"}
+ ],
+ #{work_dir => emqx_cth_suite:work_dir(Config)}
+ ),
+ [{apps, Apps} | Config].
-end_per_suite(_Config) ->
+end_per_suite(Config) ->
+ Apps = ?config(apps, Config),
+ emqx_cth_suite:stop(Apps),
ok.
eventmsg_publish(Msg) ->
diff --git a/apps/emqx_utils/src/emqx_utils.erl b/apps/emqx_utils/src/emqx_utils.erl
index 0be489696..fb7424b4a 100644
--- a/apps/emqx_utils/src/emqx_utils.erl
+++ b/apps/emqx_utils/src/emqx_utils.erl
@@ -66,7 +66,8 @@
tcp_keepalive_opts/4,
format/1,
call_first_defined/1,
- ntoa/1
+ ntoa/1,
+ foldl_while/3
]).
-export([
@@ -176,6 +177,17 @@ pipeline([Fun | More], Input, State) ->
{error, Reason, NState} -> {error, Reason, NState}
end.
+-spec foldl_while(fun((X, Acc) -> {cont | halt, Acc}), Acc, [X]) -> Acc.
+foldl_while(_Fun, Acc, []) ->
+ Acc;
+foldl_while(Fun, Acc, [X | Xs]) ->
+ case Fun(X, Acc) of
+ {cont, NewAcc} ->
+ foldl_while(Fun, NewAcc, Xs);
+ {halt, NewAcc} ->
+ NewAcc
+ end.
+
-compile({inline, [apply_fun/3]}).
apply_fun(Fun, Input, State) ->
case erlang:fun_info(Fun, arity) of
diff --git a/apps/emqx_utils/src/emqx_utils_maps.erl b/apps/emqx_utils/src/emqx_utils_maps.erl
index 07e36607d..96051a722 100644
--- a/apps/emqx_utils/src/emqx_utils_maps.erl
+++ b/apps/emqx_utils/src/emqx_utils_maps.erl
@@ -158,7 +158,9 @@ deep_convert(Val, _, _Args) ->
unsafe_atom_key_map(Map) ->
convert_keys_to_atom(Map, fun(K) -> binary_to_atom(K, utf8) end).
--spec binary_key_map(map()) -> map().
+-spec binary_key_map
+ (map()) -> map();
+ (list()) -> list().
binary_key_map(Map) ->
deep_convert(
Map,
diff --git a/apps/emqx_utils/test/emqx_utils_tests.erl b/apps/emqx_utils/test/emqx_utils_tests.erl
index 26c4cbfca..fa4a29622 100644
--- a/apps/emqx_utils/test/emqx_utils_tests.erl
+++ b/apps/emqx_utils/test/emqx_utils_tests.erl
@@ -28,3 +28,37 @@ is_redacted_test_() ->
?_assert(emqx_utils:is_redacted(password, fun() -> <<"******">> end)),
?_assert(emqx_utils:is_redacted(password, emqx_secret:wrap(<<"******">>)))
].
+
+foldl_while_test_() ->
+ [
+ ?_assertEqual(
+ [3, 2, 1],
+ emqx_utils:foldl_while(fun(X, Acc) -> {cont, [X | Acc]} end, [], [1, 2, 3])
+ ),
+ ?_assertEqual(
+ [1],
+ emqx_utils:foldl_while(
+ fun
+ (X, Acc) when X == 2 ->
+ {halt, Acc};
+ (X, Acc) ->
+ {cont, [X | Acc]}
+ end,
+ [],
+ [1, 2, 3]
+ )
+ ),
+ ?_assertEqual(
+ finished,
+ emqx_utils:foldl_while(
+ fun
+ (X, _Acc) when X == 3 ->
+ {halt, finished};
+ (X, Acc) ->
+ {cont, [X | Acc]}
+ end,
+ [],
+ [1, 2, 3]
+ )
+ )
+ ].
diff --git a/changes/ce/feat-12711.en.md b/changes/ce/feat-12711.en.md
new file mode 100644
index 000000000..66349a1b9
--- /dev/null
+++ b/changes/ce/feat-12711.en.md
@@ -0,0 +1,3 @@
+Implemented message validation feature.
+
+With this feature, once validations are configured for certain topic filters, the configured checks are run against published messages and, if they are not accepted by a validation, the message is dropped and the client may be disconnected, depending on the configuration.
diff --git a/mix.exs b/mix.exs
index ecf8ed168..1e6e932db 100644
--- a/mix.exs
+++ b/mix.exs
@@ -65,7 +65,7 @@ defmodule EMQXUmbrella.MixProject do
# maybe forbid to fetch quicer
{:emqtt,
github: "emqx/emqtt", tag: "1.10.1", override: true, system_env: maybe_no_quic_env()},
- {:rulesql, github: "emqx/rulesql", tag: "0.1.8"},
+ {:rulesql, github: "emqx/rulesql", tag: "0.2.0"},
{:observer_cli, "1.7.1"},
{:system_monitor, github: "ieQu1/system_monitor", tag: "3.0.3"},
{:telemetry, "1.1.0"},
@@ -184,6 +184,7 @@ defmodule EMQXUmbrella.MixProject do
:emqx_s3,
:emqx_bridge_s3,
:emqx_schema_registry,
+ :emqx_message_validation,
:emqx_enterprise,
:emqx_bridge_kinesis,
:emqx_bridge_azure_event_hub,
diff --git a/rebar.config b/rebar.config
index 262dff039..3f58749bf 100644
--- a/rebar.config
+++ b/rebar.config
@@ -91,7 +91,7 @@
{replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}},
{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.10.1"}}},
- {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.8"}}},
+ {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.2.0"}}},
% NOTE: depends on recon 2.5.x
{observer_cli, "1.7.1"},
{system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}},
diff --git a/rebar.config.erl b/rebar.config.erl
index 1ca2dee09..a81d162a9 100644
--- a/rebar.config.erl
+++ b/rebar.config.erl
@@ -115,6 +115,7 @@ is_community_umbrella_app("apps/emqx_gateway_gbt32960") -> false;
is_community_umbrella_app("apps/emqx_gateway_ocpp") -> false;
is_community_umbrella_app("apps/emqx_gateway_jt808") -> false;
is_community_umbrella_app("apps/emqx_bridge_syskeeper") -> false;
+is_community_umbrella_app("apps/emqx_message_validation") -> false;
is_community_umbrella_app(_) -> true.
%% BUILD_WITHOUT_JQ
diff --git a/rel/i18n/emqx_message_validation_http_api.hocon b/rel/i18n/emqx_message_validation_http_api.hocon
new file mode 100644
index 000000000..d169e2f76
--- /dev/null
+++ b/rel/i18n/emqx_message_validation_http_api.hocon
@@ -0,0 +1,24 @@
+emqx_message_validation_http_api {
+
+ list_validations.desc:
+ """List validations"""
+
+ lookup_validation.desc:
+ """Lookup a validation"""
+
+ update_validation.desc:
+ """Update a validation"""
+
+ delete_validation.desc:
+ """Delete a validation"""
+
+ append_validation.desc:
+ """Append a new validation to the list of validations"""
+
+ move_validation.desc:
+ """Change the order of a validation in the list of validations"""
+
+ param_path_name.desc:
+ """Validation name"""
+
+}
diff --git a/rel/i18n/emqx_message_validation_schema.hocon b/rel/i18n/emqx_message_validation_schema.hocon
new file mode 100644
index 000000000..bf1524cc1
--- /dev/null
+++ b/rel/i18n/emqx_message_validation_schema.hocon
@@ -0,0 +1,88 @@
+emqx_message_validation_schema {
+
+ check_avro_type.desc:
+ """Avro schema check"""
+ check_avro_type.label:
+ """Avro schema check"""
+
+ check_avro_schema.desc:
+ """Schema name to use during check."""
+ check_avro_schema.label:
+ """Schema name"""
+
+ check_json_type.desc:
+ """JSON schema check"""
+ check_json_type.label:
+ """JSON schema check"""
+
+ check_json_schema.desc:
+ """Schema name to use during check."""
+ check_json_schema.label:
+ """Schema name"""
+
+ check_protobuf_type.desc:
+ """Protobuf schema check"""
+ check_protobuf_type.label:
+ """Protobuf schema check"""
+
+ check_protobuf_schema.desc:
+ """Schema name to use during check."""
+ check_protobuf_schema.label:
+ """Schema name"""
+
+ check_protobuf_message_name.desc:
+ """Message name to use during check."""
+ check_protobuf_message_name.label:
+ """Message name"""
+
+ check_sql_type.desc:
+ """Use rule-engine's SQL to validate the message. SQL here is the same as in rule-engine,
+ just with the different that the `FROM` clause must be omitted.
+ A SQL statement which yields any value is considered successfully validated, otherwise failed.
+ For example SELECT payload.foo + payload.bar as sum WHERE sum > 0
+ validates that the sum of field `foo` and `bar` is a positive value."""
+ check_sql_type.label:
+ """SQL schema check"""
+
+ check_sql_schema.desc:
+ """Schema name to use during check."""
+ check_sql_schema.label:
+ """Schema name"""
+
+ topics.desc:
+ """A single topic filter or list of topic filters that this validation should validate."""
+ topics.label:
+ """Topic filter(s)"""
+
+ name.desc:
+ """The name for this validation. Must be unique among all validations."""
+ name.desc:
+ """Name"""
+
+ strategy.desc:
+ """How the validation should consider the checks to be successful.
+
+ all_pass
: All checks will be evaluated and must pass.
+ any_pass
: Any passing check will suffice. Stops at the first success."""
+ strategy.desc:
+ """Strategy"""
+
+ failure_action.desc:
+ """How to proceed if the validation fails.
+
+ drop
: The offending message is simply dropped without further processing.
+ disconnect
: The message is not published, and the publishing client is disconnected."""
+ failure_action.label:
+ """Failure action"""
+
+ log_failure_at.desc:
+ """Log level at which failures will be logged."""
+ log_failure_at.label:
+ """Failure log level"""
+
+ checks.desc:
+ """Checks that will be performed during validation. They are evaluated in the same order as defined."""
+ checks.label:
+ """Checks"""
+
+}