Merge pull request #12711 from thalesmg/data-validation-m-20240311

feat: implement message validation
Commit a689ae72e3 by Thales Macedo Garitezi, 2024-03-19 11:12:28 -03:00 (committed by GitHub)
35 changed files with 2748 additions and 81 deletions

View File

@ -25,6 +25,7 @@
-define(HP_AUTHN, 970).
-define(HP_AUTHZ, 960).
-define(HP_SYS_MSGS, 950).
-define(HP_MSG_VALIDATION, 945).
-define(HP_TOPIC_METRICS, 940).
-define(HP_RETAINER, 930).
-define(HP_AUTO_SUB, 920).

View File

@ -235,6 +235,12 @@ publish(Msg) when is_record(Msg, message) ->
_ = emqx_trace:publish(Msg),
emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'),
case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of
#message{headers = #{should_disconnect := true}, topic = Topic} ->
?TRACE("MQTT", "msg_publish_not_allowed_disconnect", #{
message => emqx_message:to_log_map(Msg),
topic => Topic
}),
disconnect;
#message{headers = #{allow_publish := false}, topic = Topic} ->
?TRACE("MQTT", "msg_publish_not_allowed", #{
message => emqx_message:to_log_map(Msg),

View File

@ -702,14 +702,21 @@ packet_to_message(Packet, #channel{
do_publish(_PacketId, Msg = #message{qos = ?QOS_0}, Channel) ->
Result = emqx_broker:publish(Msg),
-NChannel = ensure_quota(Result, Channel),
-{ok, NChannel};
+case Result of
+disconnect ->
+handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel);
+_ ->
+NChannel = ensure_quota(Result, Channel),
+{ok, NChannel}
+end;
do_publish(PacketId, Msg = #message{qos = ?QOS_1}, Channel) ->
PubRes = emqx_broker:publish(Msg),
RC = puback_reason_code(PacketId, Msg, PubRes),
case RC of
undefined ->
{ok, Channel};
+disconnect ->
+handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel);
_Value ->
do_finish_publish(PacketId, PubRes, RC, Channel)
end;
@ -719,6 +726,8 @@ do_publish(
Channel = #channel{clientinfo = ClientInfo, session = Session}
) ->
case emqx_session:publish(ClientInfo, PacketId, Msg, Session) of
+{ok, disconnect, _NSession} ->
+handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel);
{ok, PubRes, NSession} ->
RC = pubrec_reason_code(PubRes),
NChannel0 = Channel#channel{session = NSession},
@ -763,7 +772,9 @@ pubrec_reason_code([_ | _]) -> ?RC_SUCCESS.
puback_reason_code(PacketId, Msg, [] = PubRes) ->
emqx_hooks:run_fold('message.puback', [PacketId, Msg, PubRes], ?RC_NO_MATCHING_SUBSCRIBERS);
puback_reason_code(PacketId, Msg, [_ | _] = PubRes) ->
-emqx_hooks:run_fold('message.puback', [PacketId, Msg, PubRes], ?RC_SUCCESS).
+emqx_hooks:run_fold('message.puback', [PacketId, Msg, PubRes], ?RC_SUCCESS);
+puback_reason_code(_PacketId, _Msg, disconnect) ->
+disconnect.
-compile({inline, [after_message_acked/3]}).
after_message_acked(ClientInfo, Msg, PubAckProps) ->

View File

@ -258,11 +258,13 @@
-type deliver() :: {deliver, topic(), message()}.
-type delivery() :: #delivery{}.
-type deliver_result() :: ok | {ok, non_neg_integer()} | {error, term()}.
--type publish_result() :: [
-{node(), topic(), deliver_result()}
-| {share, topic(), deliver_result()}
-| persisted
-].
+-type publish_result() ::
+[
+{node(), topic(), deliver_result()}
+| {share, topic(), deliver_result()}
+| persisted
+]
+| disconnect.
-type route() :: #route{}.
-type route_entry() :: {topic(), node()} | {topic, group()}.
-type command() :: #command{}.

View File

@ -380,6 +380,10 @@ default_appspec(emqx_dashboard, _SuiteOpts) ->
true = emqx_dashboard_listener:is_ready(infinity)
end
};
default_appspec(emqx_schema_registry, _SuiteOpts) ->
#{schema_mod => emqx_schema_registry_schema, config => #{}};
default_appspec(emqx_message_validation, _SuiteOpts) ->
#{schema_mod => emqx_message_validation_schema, config => #{}};
default_appspec(_, _) ->
#{}.

View File

@ -88,6 +88,7 @@
[
emqx_license,
emqx_enterprise,
emqx_message_validation,
emqx_bridge_kafka,
emqx_bridge_pulsar,
emqx_bridge_gcp_pubsub,

View File

@ -0,0 +1,94 @@
Business Source License 1.1
Licensor: Hangzhou EMQ Technologies Co., Ltd.
Licensed Work: EMQX Enterprise Edition
The Licensed Work is (c) 2023
Hangzhou EMQ Technologies Co., Ltd.
Additional Use Grant: Students and educators are granted right to copy,
modify, and create derivative work for research
or education.
Change Date: 2028-01-26
Change License: Apache License, Version 2.0
For information about alternative licensing arrangements for the Software,
please contact Licensor: https://www.emqx.com/en/contact
Notice
The Business Source License (this document, or the “License”) is not an Open
Source license. However, the Licensed Work will eventually be made available
under an Open Source License, as stated in this License.
License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
“Business Source License” is a trademark of MariaDB Corporation Ab.
-----------------------------------------------------------------------------
Business Source License 1.1
Terms
The Licensor hereby grants you the right to copy, modify, create derivative
works, redistribute, and make non-production use of the Licensed Work. The
Licensor may make an Additional Use Grant, above, permitting limited
production use.
Effective on the Change Date, or the fourth anniversary of the first publicly
available distribution of a specific version of the Licensed Work under this
License, whichever comes first, the Licensor hereby grants you rights under
the terms of the Change License, and the rights granted in the paragraph
above terminate.
If your use of the Licensed Work does not comply with the requirements
currently in effect as described in this License, you must purchase a
commercial license from the Licensor, its affiliated entities, or authorized
resellers, or you must refrain from using the Licensed Work.
All copies of the original and modified Licensed Work, and derivative works
of the Licensed Work, are subject to this License. This License applies
separately for each version of the Licensed Work and the Change Date may vary
for each version of the Licensed Work released by Licensor.
You must conspicuously display this License on each original or modified copy
of the Licensed Work. If you receive the Licensed Work in original or
modified form from a third party, the terms and conditions set forth in this
License apply to your use of that work.
Any use of the Licensed Work in violation of this License will automatically
terminate your rights under this License for the current and all other
versions of the Licensed Work.
This License does not grant you any right in any trademark or logo of
Licensor or its affiliates (provided that you may use a trademark or logo of
Licensor as expressly required by this License).
TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
TITLE.
MariaDB hereby grants you permission to use this License's text to license
your works, and to refer to it using the trademark “Business Source License”,
as long as you comply with the Covenants of Licensor below.
Covenants of Licensor
In consideration of the right to use this License's text and the “Business
Source License” name and trademark, Licensor covenants to MariaDB, and to all
other recipients of the licensed work to be provided by Licensor:
1. To specify as the Change License the GPL Version 2.0 or any later version,
or a license that is compatible with GPL Version 2.0 or a later version,
where “compatible” means that software provided under the Change License can
be included in a program with software provided under GPL Version 2.0 or a
later version. Licensor may specify additional Change Licenses without
limitation.
2. To either: (a) specify an additional grant of rights to use that does not
impose any additional restriction on the right granted in this License, as
the Additional Use Grant; or (b) insert the text “None”.
3. To specify a Change Date.
4. Not to modify this License in any other way.

View File

@ -0,0 +1,29 @@
# EMQX Message Validation
This application validates the payloads of incoming (or internally triggered) published
messages and takes an action upon validation failure: either drop the message without
further processing, or additionally disconnect the offending client.
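As a rough sketch (not normative; field names follow the schema and test fixtures added in this PR), a validation couples a set of topic filters with a strategy, a failure action, and a list of checks:

```erlang
%% Hypothetical validation entry, written as the Erlang map the test suite in
%% this PR builds; over the HTTP API the equivalent JSON body is POSTed.
%% The name and the SQL condition below are illustrative only.
Validation = #{
    <<"name">> => <<"my_validation">>,
    <<"enable">> => true,
    <<"topics">> => <<"t/+">>,
    <<"strategy">> => <<"all_pass">>,
    <<"failure_action">> => <<"drop">>,
    <<"log_failure">> => #{<<"level">> => <<"warning">>},
    <<"checks">> => [
        #{<<"type">> => <<"sql">>,
          <<"sql">> => <<"select payload.value as x where x > 15">>}
    ]
}.
```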
# Documentation
Refer to [Message
Validation](https://docs.emqx.com/en/enterprise/latest/data-integration/message-validation.html)
for more information about the semantics and checks available.
# HTTP APIs
APIs are provided for validation management: creating, updating, looking up,
deleting, and listing validations.
Refer to [API Docs -
Message Validation](https://docs.emqx.com/en/enterprise/latest/admin/api-docs.html#tag/Message-Validation)
for more detailed information.
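As an illustration (paths taken from the handler module added in this PR), the endpoints are `/message_validations` (GET/POST/PUT), `/message_validations/reorder` (POST), and `/message_validations/validation/:name` (GET/DELETE). A reorder request, sketched below in Erlang map notation, carries only the full list of validation names in the desired order:

```erlang
%% Sketch of the reorder request body (the HTTP API accepts the equivalent
%% JSON). The names are hypothetical; every existing validation must appear
%% exactly once, otherwise the API replies 400 with `not_found', `duplicated'
%% and `not_reordered' details.
ReorderBody = #{<<"order">> => [<<"bar">>, <<"baz">>, <<"foo">>]}.
```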
# Contributing
Please see our [contributing.md](../../CONTRIBUTING.md).
# License
EMQ Business Source License 1.1, refer to [LICENSE](BSL.txt).

View File

@ -0,0 +1,16 @@
%% -*- mode: erlang -*-
{erl_opts, [
warn_unused_vars,
warn_shadow_vars,
warn_unused_import,
warn_obsolete_guard,
warnings_as_errors,
debug_info
]}.
{deps, [
{emqx, {path, "../emqx"}},
{emqx_utils, {path, "../emqx_utils"}},
{emqx_rule_engine, {path, "../emqx_rule_engine"}},
{emqx_schema_registry, {path, "../emqx_schema_registry"}}
]}.

View File

@ -0,0 +1,14 @@
{application, emqx_message_validation, [
{description, "EMQX Message Validation"},
{vsn, "0.1.0"},
{registered, [emqx_message_validation_sup, emqx_message_validation_registry]},
{mod, {emqx_message_validation_app, []}},
{applications, [
kernel,
stdlib
]},
{env, []},
{modules, []},
{links, []}
]}.

View File

@ -0,0 +1,456 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_message_validation).
-include_lib("emqx_utils/include/emqx_message.hrl").
-include_lib("emqx/include/emqx_hooks.hrl").
-include_lib("emqx/include/logger.hrl").
%% API
-export([
add_handler/0,
remove_handler/0,
load/0,
unload/0,
list/0,
reorder/1,
lookup/1,
insert/1,
update/1,
delete/1
]).
%% `emqx_hooks' API
-export([
register_hooks/0,
unregister_hooks/0,
on_message_publish/1
]).
%% `emqx_config_handler' API
-export([pre_config_update/3, post_config_update/5]).
%% Internal exports
-export([parse_sql_check/1]).
%% Internal functions; exported for tests
-export([
evaluate_sql_check/3
]).
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
-define(TRACE_TAG, "MESSAGE_VALIDATION").
-define(CONF_ROOT, message_validation).
-define(VALIDATIONS_CONF_PATH, [?CONF_ROOT, validations]).
-type validation_name() :: binary().
-type validation() :: _TODO.
%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
-spec add_handler() -> ok.
add_handler() ->
ok = emqx_config_handler:add_handler(?VALIDATIONS_CONF_PATH, ?MODULE),
ok.
-spec remove_handler() -> ok.
remove_handler() ->
ok = emqx_config_handler:remove_handler(?VALIDATIONS_CONF_PATH),
ok.
load() ->
lists:foreach(fun insert/1, emqx:get_config(?VALIDATIONS_CONF_PATH, [])).
unload() ->
lists:foreach(fun delete/1, emqx:get_config(?VALIDATIONS_CONF_PATH, [])).
-spec list() -> [validation()].
list() ->
emqx:get_config(?VALIDATIONS_CONF_PATH, []).
-spec reorder([validation_name()]) ->
{ok, _} | {error, _}.
reorder(Order) ->
emqx:update_config(
?VALIDATIONS_CONF_PATH,
{reorder, Order},
#{override_to => cluster}
).
-spec lookup(validation_name()) -> {ok, validation()} | {error, not_found}.
lookup(Name) ->
Validations = emqx:get_config(?VALIDATIONS_CONF_PATH, []),
do_lookup(Name, Validations).
-spec insert(validation()) ->
{ok, _} | {error, _}.
insert(Validation) ->
emqx:update_config(
?VALIDATIONS_CONF_PATH,
{append, Validation},
#{override_to => cluster}
).
-spec update(validation()) ->
{ok, _} | {error, _}.
update(Validation) ->
emqx:update_config(
?VALIDATIONS_CONF_PATH,
{update, Validation},
#{override_to => cluster}
).
-spec delete(validation_name()) ->
{ok, _} | {error, _}.
delete(Name) ->
emqx:update_config(
?VALIDATIONS_CONF_PATH,
{delete, Name},
#{override_to => cluster}
).
%%------------------------------------------------------------------------------
%% Hooks
%%------------------------------------------------------------------------------
-spec register_hooks() -> ok.
register_hooks() ->
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_MSG_VALIDATION).
-spec unregister_hooks() -> ok.
unregister_hooks() ->
emqx_hooks:del('message.publish', {?MODULE, on_message_publish}).
-spec on_message_publish(emqx_types:message()) ->
{ok, emqx_types:message()} | {stop, emqx_types:message()}.
on_message_publish(Message = #message{topic = Topic, headers = Headers}) ->
case emqx_message_validation_registry:matching_validations(Topic) of
[] ->
ok;
Validations ->
case run_validations(Validations, Message) of
ok ->
{ok, Message};
drop ->
{stop, Message#message{headers = Headers#{allow_publish => false}}};
disconnect ->
{stop, Message#message{
headers = Headers#{
allow_publish => false,
should_disconnect => true
}
}}
end
end.
%%------------------------------------------------------------------------------
%% `emqx_config_handler' API
%%------------------------------------------------------------------------------
pre_config_update(?VALIDATIONS_CONF_PATH, {append, Validation}, OldValidations) ->
Validations = OldValidations ++ [Validation],
{ok, Validations};
pre_config_update(?VALIDATIONS_CONF_PATH, {update, Validation}, OldValidations) ->
replace(OldValidations, Validation);
pre_config_update(?VALIDATIONS_CONF_PATH, {delete, Validation}, OldValidations) ->
delete(OldValidations, Validation);
pre_config_update(?VALIDATIONS_CONF_PATH, {reorder, Order}, OldValidations) ->
reorder(OldValidations, Order).
post_config_update(?VALIDATIONS_CONF_PATH, {append, #{<<"name">> := Name}}, New, _Old, _AppEnvs) ->
{Pos, Validation} = fetch_with_index(New, Name),
ok = emqx_message_validation_registry:insert(Pos, Validation),
ok;
post_config_update(?VALIDATIONS_CONF_PATH, {update, #{<<"name">> := Name}}, New, Old, _AppEnvs) ->
{_Pos, OldValidation} = fetch_with_index(Old, Name),
{Pos, NewValidation} = fetch_with_index(New, Name),
ok = emqx_message_validation_registry:update(OldValidation, Pos, NewValidation),
ok;
post_config_update(?VALIDATIONS_CONF_PATH, {delete, Name}, _New, Old, _AppEnvs) ->
{_Pos, Validation} = fetch_with_index(Old, Name),
ok = emqx_message_validation_registry:delete(Validation),
ok;
post_config_update(?VALIDATIONS_CONF_PATH, {reorder, _Order}, New, _Old, _AppEnvs) ->
ok = emqx_message_validation_registry:reindex_positions(New),
ok.
%%------------------------------------------------------------------------------
%% Internal exports
%%------------------------------------------------------------------------------
parse_sql_check(SQL) ->
case emqx_rule_sqlparser:parse(SQL, #{with_from => false}) of
{ok, Select} ->
case emqx_rule_sqlparser:select_is_foreach(Select) of
true ->
{error, foreach_not_allowed};
false ->
Check = #{
type => sql,
fields => emqx_rule_sqlparser:select_fields(Select),
conditions => emqx_rule_sqlparser:select_where(Select)
},
{ok, Check}
end;
Error = {error, _} ->
Error
end.
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
evaluate_sql_check(Check, Validation, Message) ->
#{
fields := Fields,
conditions := Conditions
} = Check,
#{
name := Name,
log_failure := #{level := FailureLogLevel}
} = Validation,
{Data, _} = emqx_rule_events:eventmsg_publish(Message),
try emqx_rule_runtime:evaluate_select(Fields, Data, Conditions) of
{ok, _} ->
true;
false ->
false
catch
throw:Reason ->
?TRACE(
FailureLogLevel,
?TRACE_TAG,
"validation_sql_check_throw",
#{
validation => Name,
reason => Reason
}
),
false;
Class:Error:Stacktrace ->
?TRACE(
FailureLogLevel,
?TRACE_TAG,
"validation_sql_check_failure",
#{
validation => Name,
kind => Class,
reason => Error,
stacktrace => Stacktrace
}
),
false
end.
evaluate_schema_check(Check, Validation, #message{payload = Data}) ->
#{schema := SerdeName} = Check,
#{
name := Name,
log_failure := #{level := FailureLogLevel}
} = Validation,
ExtraArgs =
case Check of
#{type := protobuf, message_name := MessageName} ->
[MessageName];
_ ->
[]
end,
try
emqx_schema_registry_serde:schema_check(SerdeName, Data, ExtraArgs)
catch
error:{serde_not_found, _} ->
?TRACE(
FailureLogLevel,
?TRACE_TAG,
"validation_schema_check_schema_not_found",
#{
validation => Name,
schema_name => SerdeName
}
),
false;
Class:Error:Stacktrace ->
?TRACE(
FailureLogLevel,
?TRACE_TAG,
"validation_schema_check_failure",
#{
validation => Name,
schema_name => SerdeName,
kind => Class,
reason => Error,
stacktrace => Stacktrace
}
),
false
end.
replace(OldValidations, Validation = #{<<"name">> := Name}) ->
{Found, RevNewValidations} =
lists:foldl(
fun
(#{<<"name">> := NameIn}, {_FoundIn, Acc}) when NameIn =:= Name ->
{true, [Validation | Acc]};
(Val, {FoundIn, Acc}) ->
{FoundIn, [Val | Acc]}
end,
{false, []},
OldValidations
),
case Found of
true ->
{ok, lists:reverse(RevNewValidations)};
false ->
{error, not_found}
end.
delete(OldValidations, Name) ->
{Found, RevNewValidations} =
lists:foldl(
fun
(#{<<"name">> := NameIn}, {_FoundIn, Acc}) when NameIn =:= Name ->
{true, Acc};
(Val, {FoundIn, Acc}) ->
{FoundIn, [Val | Acc]}
end,
{false, []},
OldValidations
),
case Found of
true ->
{ok, lists:reverse(RevNewValidations)};
false ->
{error, not_found}
end.
reorder(Validations, Order) ->
Context = #{
not_found => sets:new([{version, 2}]),
duplicated => sets:new([{version, 2}]),
res => [],
seen => sets:new([{version, 2}])
},
reorder(Validations, Order, Context).
reorder(NotReordered, _Order = [], #{not_found := NotFound0, duplicated := Duplicated0, res := Res}) ->
NotFound = sets:to_list(NotFound0),
Duplicated = sets:to_list(Duplicated0),
case {NotReordered, NotFound, Duplicated} of
{[], [], []} ->
{ok, lists:reverse(Res)};
{_, _, _} ->
Error = #{
not_found => NotFound,
duplicated => Duplicated,
not_reordered => [N || #{<<"name">> := N} <- NotReordered]
},
{error, Error}
end;
reorder(RemainingValidations, [Name | Rest], Context0 = #{seen := Seen0}) ->
case sets:is_element(Name, Seen0) of
true ->
Context = maps:update_with(
duplicated, fun(S) -> sets:add_element(Name, S) end, Context0
),
reorder(RemainingValidations, Rest, Context);
false ->
case safe_take(Name, RemainingValidations) of
error ->
Context = maps:update_with(
not_found, fun(S) -> sets:add_element(Name, S) end, Context0
),
reorder(RemainingValidations, Rest, Context);
{ok, {Validation, Front, Rear}} ->
Context1 = maps:update_with(
seen, fun(S) -> sets:add_element(Name, S) end, Context0
),
Context = maps:update_with(res, fun(Vs) -> [Validation | Vs] end, Context1),
reorder(Front ++ Rear, Rest, Context)
end
end.
fetch_with_index([{Pos, #{name := Name} = Validation} | _Rest], Name) ->
{Pos, Validation};
fetch_with_index([{_, _} | Rest], Name) ->
fetch_with_index(Rest, Name);
fetch_with_index(Validations, Name) ->
fetch_with_index(lists:enumerate(Validations), Name).
safe_take(Name, Validations) ->
case lists:splitwith(fun(#{<<"name">> := N}) -> N =/= Name end, Validations) of
{_Front, []} ->
error;
{Front, [Found | Rear]} ->
{ok, {Found, Front, Rear}}
end.
do_lookup(_Name, _Validations = []) ->
{error, not_found};
do_lookup(Name, [#{name := Name} = Validation | _Rest]) ->
{ok, Validation};
do_lookup(Name, [_ | Rest]) ->
do_lookup(Name, Rest).
run_validations(Validations, Message) ->
try
emqx_rule_runtime:clear_rule_payload(),
Fun = fun(Validation, Acc) ->
#{
name := Name,
log_failure := #{level := FailureLogLevel}
} = Validation,
case run_validation(Validation, Message) of
ok ->
{cont, Acc};
FailureAction ->
?TRACE(
FailureLogLevel,
?TRACE_TAG,
"validation_failed",
#{validation => Name, action => FailureAction}
),
{halt, FailureAction}
end
end,
emqx_utils:foldl_while(Fun, _Passed = ok, Validations)
after
emqx_rule_runtime:clear_rule_payload()
end.
run_validation(#{strategy := all_pass} = Validation, Message) ->
#{
checks := Checks,
failure_action := FailureAction
} = Validation,
Fun = fun(Check, Acc) ->
case run_check(Check, Validation, Message) of
true -> {cont, Acc};
false -> {halt, FailureAction}
end
end,
emqx_utils:foldl_while(Fun, _Passed = ok, Checks);
run_validation(#{strategy := any_pass} = Validation, Message) ->
#{
checks := Checks,
failure_action := FailureAction
} = Validation,
case lists:any(fun(C) -> run_check(C, Validation, Message) end, Checks) of
true ->
ok;
false ->
FailureAction
end.
run_check(#{type := sql} = Check, Validation, Message) ->
evaluate_sql_check(Check, Validation, Message);
run_check(Check, Validation, Message) ->
evaluate_schema_check(Check, Validation, Message).

View File

@ -0,0 +1,32 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_message_validation_app).
-behaviour(application).
%% `application' API
-export([start/2, stop/1]).
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% `application' API
%%------------------------------------------------------------------------------
-spec start(application:start_type(), term()) -> {ok, pid()}.
start(_Type, _Args) ->
{ok, Sup} = emqx_message_validation_sup:start_link(),
ok = emqx_message_validation:add_handler(),
ok = emqx_message_validation:register_hooks(),
ok = emqx_message_validation:load(),
{ok, Sup}.
-spec stop(term()) -> ok.
stop(_State) ->
ok = emqx_message_validation:unload(),
ok = emqx_message_validation:unregister_hooks(),
ok = emqx_message_validation:remove_handler(),
ok.

View File

@ -0,0 +1,347 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_message_validation_http_api).
-behaviour(minirest_api).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_utils/include/emqx_utils_api.hrl").
%% `minirest' and `minirest_trails' API
-export([
namespace/0,
api_spec/0,
fields/1,
paths/0,
schema/1
]).
%% `minirest' handlers
-export([
'/message_validations'/2,
'/message_validations/reorder'/2,
'/message_validations/validation/:name'/2
]).
%%-------------------------------------------------------------------------------------------------
%% Type definitions
%%-------------------------------------------------------------------------------------------------
-define(TAGS, [<<"Message Validation">>]).
%%-------------------------------------------------------------------------------------------------
%% `minirest' and `minirest_trails' API
%%-------------------------------------------------------------------------------------------------
namespace() -> "message_validation_http_api".
api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
paths() ->
[
"/message_validations",
"/message_validations/reorder",
"/message_validations/validation/:name"
].
schema("/message_validations") ->
#{
'operationId' => '/message_validations',
get => #{
tags => ?TAGS,
summary => <<"List validations">>,
description => ?DESC("list_validations"),
responses =>
#{
200 =>
emqx_dashboard_swagger:schema_with_examples(
array(
emqx_message_validation_schema:api_schema(list)
),
#{
sample =>
#{value => example_return_list()}
}
)
}
},
post => #{
tags => ?TAGS,
summary => <<"Append a new validation">>,
description => ?DESC("append_validation"),
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
emqx_message_validation_schema:api_schema(post),
example_input_create()
),
responses =>
#{
201 =>
emqx_dashboard_swagger:schema_with_examples(
emqx_message_validation_schema:api_schema(post),
example_return_create()
),
400 => error_schema('ALREADY_EXISTS', "Validation already exists")
}
},
put => #{
tags => ?TAGS,
summary => <<"Update a validation">>,
description => ?DESC("update_validation"),
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
emqx_message_validation_schema:api_schema(put),
example_input_update()
),
responses =>
#{
200 =>
emqx_dashboard_swagger:schema_with_examples(
emqx_message_validation_schema:api_schema(put),
example_return_update()
),
404 => error_schema('NOT_FOUND', "Validation not found"),
400 => error_schema('BAD_REQUEST', "Bad params")
}
}
};
schema("/message_validations/reorder") ->
#{
'operationId' => '/message_validations/reorder',
post => #{
tags => ?TAGS,
summary => <<"Reorder all validations">>,
description => ?DESC("reorder_validations"),
'requestBody' =>
emqx_dashboard_swagger:schema_with_examples(
ref(reorder),
example_input_reorder()
),
responses =>
#{
204 => <<"No Content">>,
400 => error_schema(
'BAD_REQUEST',
<<"Bad request">>,
[
{not_found, mk(array(binary()), #{desc => "Validations not found"})},
{not_reordered,
mk(array(binary()), #{desc => "Validations not referenced in input"})},
{duplicated,
mk(array(binary()), #{desc => "Duplicated validations in input"})}
]
)
}
}
};
schema("/message_validations/validation/:name") ->
#{
'operationId' => '/message_validations/validation/:name',
get => #{
tags => ?TAGS,
summary => <<"Lookup a validation">>,
description => ?DESC("lookup_validation"),
parameters => [param_path_name()],
responses =>
#{
200 =>
emqx_dashboard_swagger:schema_with_examples(
array(
emqx_message_validation_schema:api_schema(lookup)
),
#{
sample =>
#{value => example_return_lookup()}
}
),
404 => error_schema('NOT_FOUND', "Validation not found")
}
},
delete => #{
tags => ?TAGS,
summary => <<"Delete a validation">>,
description => ?DESC("delete_validation"),
parameters => [param_path_name()],
responses =>
#{
204 => <<"Validation deleted">>,
404 => error_schema('NOT_FOUND', "Validation not found")
}
}
}.
param_path_name() ->
{name,
mk(
binary(),
#{
in => path,
required => true,
example => <<"my_validation">>,
desc => ?DESC("param_path_name")
}
)}.
fields(front) ->
[{position, mk(front, #{default => front, required => true, in => body})}];
fields(rear) ->
[{position, mk(rear, #{default => rear, required => true, in => body})}];
fields('after') ->
[
{position, mk('after', #{default => 'after', required => true, in => body})},
{validation, mk(binary(), #{required => true, in => body})}
];
fields(before) ->
[
{position, mk(before, #{default => before, required => true, in => body})},
{validation, mk(binary(), #{required => true, in => body})}
];
fields(reorder) ->
[
{order, mk(array(binary()), #{required => true, in => body})}
].
%%-------------------------------------------------------------------------------------------------
%% `minirest' handlers
%%-------------------------------------------------------------------------------------------------
'/message_validations'(get, _Params) ->
?OK(emqx_message_validation:list());
'/message_validations'(post, #{body := Params = #{<<"name">> := Name}}) ->
with_validation(
Name,
return(?BAD_REQUEST('ALREADY_EXISTS', <<"Validation already exists">>)),
fun() ->
case emqx_message_validation:insert(Params) of
{ok, _} ->
{ok, Res} = emqx_message_validation:lookup(Name),
{201, Res};
{error, Error} ->
?BAD_REQUEST(Error)
end
end
);
'/message_validations'(put, #{body := Params = #{<<"name">> := Name}}) ->
with_validation(
Name,
fun() ->
case emqx_message_validation:update(Params) of
{ok, _} ->
{ok, Res} = emqx_message_validation:lookup(Name),
{200, Res};
{error, Error} ->
?BAD_REQUEST(Error)
end
end,
not_found()
).
'/message_validations/validation/:name'(get, #{bindings := #{name := Name}}) ->
with_validation(
Name,
fun(Validation) -> ?OK(Validation) end,
not_found()
);
'/message_validations/validation/:name'(delete, #{bindings := #{name := Name}}) ->
with_validation(
Name,
fun() ->
case emqx_message_validation:delete(Name) of
{ok, _} ->
?NO_CONTENT;
{error, Error} ->
?BAD_REQUEST(Error)
end
end,
not_found()
).
'/message_validations/reorder'(post, #{body := #{<<"order">> := Order}}) ->
do_reorder(Order).
%%-------------------------------------------------------------------------------------------------
%% Internal fns
%%-------------------------------------------------------------------------------------------------
ref(Struct) -> hoconsc:ref(?MODULE, Struct).
mk(Type, Opts) -> hoconsc:mk(Type, Opts).
array(Type) -> hoconsc:array(Type).
example_input_create() ->
%% TODO
#{}.
example_input_update() ->
%% TODO
#{}.
example_input_reorder() ->
%% TODO
#{}.
example_return_list() ->
%% TODO
[].
example_return_create() ->
%% TODO
#{}.
example_return_update() ->
%% TODO
#{}.
example_return_lookup() ->
%% TODO
#{}.
error_schema(Code, Message) ->
error_schema(Code, Message, _ExtraFields = []).
error_schema(Code, Message, ExtraFields) when is_atom(Code) ->
error_schema([Code], Message, ExtraFields);
error_schema(Codes, Message, ExtraFields) when is_list(Message) ->
error_schema(Codes, list_to_binary(Message), ExtraFields);
error_schema(Codes, Message, ExtraFields) when is_list(Codes) andalso is_binary(Message) ->
ExtraFields ++ emqx_dashboard_swagger:error_codes(Codes, Message).
do_reorder(Order) ->
case emqx_message_validation:reorder(Order) of
{ok, _} ->
?NO_CONTENT;
{error,
{pre_config_update, _HandlerMod, #{
not_found := NotFound,
duplicated := Duplicated,
not_reordered := NotReordered
}}} ->
Msg0 = ?ERROR_MSG('BAD_REQUEST', <<"Bad request">>),
Msg = Msg0#{
not_found => NotFound,
duplicated => Duplicated,
not_reordered => NotReordered
},
{400, Msg};
{error, Error} ->
?BAD_REQUEST(Error)
end.
with_validation(Name, FoundFn, NotFoundFn) ->
case emqx_message_validation:lookup(Name) of
{ok, Validation} ->
{arity, Arity} = erlang:fun_info(FoundFn, arity),
case Arity of
1 -> FoundFn(Validation);
0 -> FoundFn()
end;
{error, not_found} ->
NotFoundFn()
end.
return(Response) ->
fun() -> Response end.
not_found() ->
return(?NOT_FOUND(<<"Validation not found">>)).

View File

@ -0,0 +1,225 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_message_validation_registry).
-behaviour(gen_server).
%% API
-export([
lookup/1,
insert/2,
update/3,
delete/1,
reindex_positions/1,
matching_validations/1,
start_link/0,
metrics_worker_spec/0
]).
%% `gen_server' API
-export([
init/1,
handle_call/3,
handle_cast/2
]).
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
-define(VALIDATION_TOPIC_INDEX, emqx_message_validation_index).
-define(VALIDATION_TAB, emqx_message_validation_tab).
-define(METRIC_NAME, message_validation).
-define(METRICS, [
'matched',
'passed',
'failed'
]).
-define(RATE_METRICS, ['matched']).
-type validation_name() :: binary().
-type validation() :: _TODO.
-type position_index() :: pos_integer().
-record(reindex_positions, {validations :: [validation()]}).
-record(insert, {pos :: position_index(), validation :: validation()}).
-record(update, {old :: validation(), pos :: position_index(), new :: validation()}).
-record(delete, {validation :: validation()}).
%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
-spec start_link() -> gen_server:start_ret().
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec lookup(validation_name()) ->
{ok, validation()} | {error, not_found}.
lookup(Name) ->
case emqx_utils_ets:lookup_value(?VALIDATION_TAB, Name, undefined) of
undefined ->
{error, not_found};
Validation ->
{ok, Validation}
end.
-spec reindex_positions([validation()]) -> ok.
reindex_positions(Validations) ->
gen_server:call(?MODULE, #reindex_positions{validations = Validations}, infinity).
-spec insert(position_index(), validation()) -> ok.
insert(Pos, Validation) ->
gen_server:call(?MODULE, #insert{pos = Pos, validation = Validation}, infinity).
-spec update(validation(), position_index(), validation()) -> ok.
update(Old, Pos, New) ->
gen_server:call(?MODULE, #update{old = Old, pos = Pos, new = New}, infinity).
-spec delete(validation()) -> ok.
delete(Validation) ->
gen_server:call(?MODULE, #delete{validation = Validation}, infinity).
%% @doc Returns the list of matching validations, sorted by their configuration order.
-spec matching_validations(emqx_types:topic()) -> [validation()].
matching_validations(Topic) ->
Validations0 = [
{Pos, Validation}
|| M <- emqx_topic_index:matches(Topic, ?VALIDATION_TOPIC_INDEX, [unique]),
[Pos] <- [emqx_topic_index:get_record(M, ?VALIDATION_TOPIC_INDEX)],
{ok, Validation} <- [
lookup(emqx_topic_index:get_id(M))
]
],
Validations1 = lists:sort(fun({Pos1, _V1}, {Pos2, _V2}) -> Pos1 =< Pos2 end, Validations0),
lists:map(fun({_Pos, V}) -> V end, Validations1).
-spec metrics_worker_spec() -> supervisor:child_spec().
metrics_worker_spec() ->
emqx_metrics_worker:child_spec(message_validation_metrics, ?METRIC_NAME).
%%------------------------------------------------------------------------------
%% `gen_server' API
%%------------------------------------------------------------------------------
init(_) ->
create_tables(),
State = #{},
{ok, State}.
handle_call(#reindex_positions{validations = Validations}, _From, State) ->
do_reindex_positions(Validations),
{reply, ok, State};
handle_call(#insert{pos = Pos, validation = Validation}, _From, State) ->
do_insert(Pos, Validation),
{reply, ok, State};
handle_call(#update{old = OldValidation, pos = Pos, new = NewValidation}, _From, State) ->
ok = do_update(OldValidation, Pos, NewValidation),
{reply, ok, State};
handle_call(#delete{validation = Validation}, _From, State) ->
do_delete(Validation),
{reply, ok, State};
handle_call(_Call, _From, State) ->
{reply, ignored, State}.
handle_cast(_Cast, State) ->
{noreply, State}.
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------
create_tables() ->
_ = emqx_utils_ets:new(?VALIDATION_TOPIC_INDEX, [public, ordered_set, {read_concurrency, true}]),
_ = emqx_utils_ets:new(?VALIDATION_TAB, [public, ordered_set, {read_concurrency, true}]),
ok.
do_reindex_positions(Validations) ->
lists:foreach(
fun({Pos, Validation}) ->
#{
name := Name,
topics := Topics
} = Validation,
do_insert_into_tab(Name, Validation, Pos),
update_topic_index(Name, Pos, Topics)
end,
lists:enumerate(Validations)
).
do_insert(Pos, Validation) ->
#{
name := Name,
topics := Topics
} = Validation,
maybe_create_metrics(Name),
do_insert_into_tab(Name, Validation, Pos),
update_topic_index(Name, Pos, Topics),
ok.
do_update(OldValidation, Pos, NewValidation) ->
#{topics := OldTopics} = OldValidation,
#{
name := Name,
topics := NewTopics
} = NewValidation,
maybe_create_metrics(Name),
do_insert_into_tab(Name, NewValidation, Pos),
delete_topic_index(Name, OldTopics),
update_topic_index(Name, Pos, NewTopics),
ok.
do_delete(Validation) ->
#{
name := Name,
topics := Topics
} = Validation,
ets:delete(?VALIDATION_TAB, Name),
delete_topic_index(Name, Topics),
drop_metrics(Name),
ok.
do_insert_into_tab(Name, Validation0, Pos) ->
Validation = transform_validation(Validation0#{pos => Pos}),
ets:insert(?VALIDATION_TAB, {Name, Validation}),
ok.
maybe_create_metrics(Name) ->
case emqx_metrics_worker:has_metrics(?METRIC_NAME, Name) of
true ->
ok = emqx_metrics_worker:reset_metrics(?METRIC_NAME, Name);
false ->
ok = emqx_metrics_worker:create_metrics(?METRIC_NAME, Name, ?METRICS, ?RATE_METRICS)
end.
drop_metrics(Name) ->
ok = emqx_metrics_worker:clear_metrics(?METRIC_NAME, Name).
update_topic_index(Name, Pos, Topics) ->
lists:foreach(
fun(Topic) ->
true = emqx_topic_index:insert(Topic, Name, Pos, ?VALIDATION_TOPIC_INDEX)
end,
Topics
).
delete_topic_index(Name, Topics) ->
lists:foreach(
fun(Topic) ->
true = emqx_topic_index:delete(Topic, Name, ?VALIDATION_TOPIC_INDEX)
end,
Topics
).
transform_validation(Validation = #{checks := Checks}) ->
Validation#{checks := lists:map(fun transform_check/1, Checks)}.
transform_check(#{type := sql, sql := SQL}) ->
{ok, Check} = emqx_message_validation:parse_sql_check(SQL),
Check;
transform_check(Check) ->
Check.

View File

@ -0,0 +1,234 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_message_validation_schema).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
%% `hocon_schema' API
-export([
namespace/0,
roots/0,
fields/1
]).
%% `minirest_trails' API
-export([
api_schema/1
]).
-export([validate_name/1]).
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% `hocon_schema' API
%%------------------------------------------------------------------------------
namespace() -> message_validation.
roots() ->
[{message_validation, mk(ref(message_validation), #{importance => ?IMPORTANCE_HIDDEN})}].
fields(message_validation) ->
[
{validations,
mk(
hoconsc:array(ref(validation)),
#{
default => [],
desc => ?DESC("validations"),
validator => fun validate_unique_names/1
}
)}
];
fields(validation) ->
[
{tags, emqx_schema:tags_schema()},
{description, emqx_schema:description_schema()},
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
{name,
mk(
binary(),
#{
required => true,
validator => fun validate_name/1,
desc => ?DESC("name")
}
)},
{topics,
mk(
hoconsc:union([binary(), hoconsc:array(binary())]),
#{
desc => ?DESC("topics"),
converter => fun ensure_array/2,
required => true
}
)},
{strategy,
mk(
hoconsc:enum([any_pass, all_pass]),
#{desc => ?DESC("strategy"), required => true}
)},
{failure_action,
mk(
hoconsc:enum([drop, disconnect]),
#{desc => ?DESC("failure_action"), required => true}
)},
{log_failure,
mk(
ref(log_failure),
#{desc => ?DESC("log_failure_at"), default => #{}}
)},
{checks,
mk(
hoconsc:array(
hoconsc:union(fun checks_union_member_selector/1)
),
#{
required => true,
desc => ?DESC("checks"),
validator => fun
([]) ->
{error, "at least one check must be defined"};
(_) ->
ok
end
}
)}
];
fields(log_failure) ->
[
{level,
mk(
hoconsc:enum([error, warning, notice, info, debug]),
#{desc => ?DESC("log_failure_at"), default => info}
)}
];
fields(check_sql) ->
[
{type, mk(sql, #{default => sql, desc => ?DESC("check_sql_type")})},
{sql,
mk(binary(), #{
required => true,
desc => ?DESC("check_sql_type"),
validator => fun validate_sql/1
})}
];
fields(check_json) ->
[
{type, mk(json, #{default => json, desc => ?DESC("check_json_type")})},
{schema, mk(binary(), #{required => true, desc => ?DESC("check_json_type")})}
];
fields(check_protobuf) ->
[
{type, mk(protobuf, #{default => protobuf, desc => ?DESC("check_protobuf_type")})},
{schema, mk(binary(), #{required => true, desc => ?DESC("check_protobuf_schema")})},
{message_name,
mk(binary(), #{required => true, desc => ?DESC("check_protobuf_message_name")})}
];
fields(check_avro) ->
[
{type, mk(avro, #{default => avro, desc => ?DESC("check_avro_type")})},
{schema, mk(binary(), #{required => true, desc => ?DESC("check_avro_schema")})}
].
checks_union_member_selector(all_union_members) ->
checks_refs();
checks_union_member_selector({value, V}) ->
checks_refs(V).
checks_refs() ->
[ref(CheckType) || CheckType <- check_types()].
check_types() ->
[
check_sql,
check_json,
check_avro,
check_protobuf
].
checks_refs(#{<<"type">> := TypeAtom} = Value) when is_atom(TypeAtom) ->
checks_refs(Value#{<<"type">> := atom_to_binary(TypeAtom)});
checks_refs(#{<<"type">> := <<"sql">>}) ->
[ref(check_sql)];
checks_refs(#{<<"type">> := <<"json">>}) ->
[ref(check_json)];
checks_refs(#{<<"type">> := <<"avro">>}) ->
[ref(check_avro)];
checks_refs(#{<<"type">> := <<"protobuf">>}) ->
[ref(check_protobuf)];
checks_refs(_Value) ->
Expected = lists:join(
" | ",
[
Name
|| T <- check_types(),
"check_" ++ Name <- [atom_to_list(T)]
]
),
throw(#{
field_name => type,
expected => iolist_to_binary(Expected)
}).
%%------------------------------------------------------------------------------
%% `minirest_trails' API
%%------------------------------------------------------------------------------
api_schema(list) ->
hoconsc:array(ref(validation));
api_schema(lookup) ->
ref(validation);
api_schema(post) ->
ref(validation);
api_schema(put) ->
ref(validation).
%%------------------------------------------------------------------------------
%% Internal exports
%%------------------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------
mk(Type, Meta) -> hoconsc:mk(Type, Meta).
ref(Name) -> hoconsc:ref(?MODULE, Name).
ensure_array(undefined, _) -> undefined;
ensure_array(L, _) when is_list(L) -> L;
ensure_array(B, _) -> [B].
validate_name(Name) ->
%% see `MAP_KEY_RE' in hocon_tconf
RE = <<"^[A-Za-z0-9]+[A-Za-z0-9-_]*$">>,
case re:run(Name, RE, [{capture, none}]) of
match ->
ok;
nomatch ->
{error, <<"must conform to regex: ", RE/binary>>}
end.
validate_sql(SQL) ->
case emqx_message_validation:parse_sql_check(SQL) of
{ok, _Parsed} ->
ok;
Error = {error, _} ->
Error
end.
validate_unique_names(Validations0) ->
Validations = emqx_utils_maps:binary_key_map(Validations0),
do_validate_unique_names(Validations, #{}).
do_validate_unique_names(_Validations = [], _Acc) ->
ok;
do_validate_unique_names([#{<<"name">> := Name} | _Rest], Acc) when is_map_key(Name, Acc) ->
{error, <<"duplicated name: ", Name/binary>>};
do_validate_unique_names([#{<<"name">> := Name} | Rest], Acc) ->
do_validate_unique_names(Rest, Acc#{Name => true}).

View File

@ -0,0 +1,47 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_message_validation_sup).
-behaviour(supervisor).
%% API
-export([start_link/0]).
%% `supervisor' API
-export([init/1]).
%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%%------------------------------------------------------------------------------
%% `supervisor' API
%%------------------------------------------------------------------------------
init([]) ->
Registry = worker_spec(emqx_message_validation_registry),
Metrics = emqx_message_validation_registry:metrics_worker_spec(),
SupFlags = #{
strategy => one_for_one,
intensity => 10,
period => 10
},
ChildSpecs = [Metrics, Registry],
{ok, {SupFlags, ChildSpecs}}.
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------
worker_spec(Mod) ->
#{
id => Mod,
start => {Mod, start_link, []},
restart => permanent,
shutdown => 5_000,
type => worker
}.

View File

@ -0,0 +1,691 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_message_validation_http_api_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/asserts.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-import(emqx_common_test_helpers, [on_exit/1]).
%%------------------------------------------------------------------------------
%% CT boilerplate
%%------------------------------------------------------------------------------
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_common_test_helpers:clear_screen(),
Apps = emqx_cth_suite:start(
lists:flatten(
[
emqx,
emqx_conf,
emqx_rule_engine,
emqx_message_validation,
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard(),
emqx_schema_registry
]
),
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
{ok, _} = emqx_common_test_http:create_default_app(),
[{apps, Apps} | Config].
end_per_suite(Config) ->
Apps = ?config(apps, Config),
ok = emqx_cth_suite:stop(Apps),
ok.
init_per_testcase(_TestCase, Config) ->
Config.
end_per_testcase(_TestCase, _Config) ->
clear_all_validations(),
emqx_common_test_helpers:call_janitor(),
ok.
%%------------------------------------------------------------------------------
%% Helper fns
%%------------------------------------------------------------------------------
-define(assertIndexOrder(EXPECTED, TOPIC), assert_index_order(EXPECTED, TOPIC, #{line => ?LINE})).
clear_all_validations() ->
lists:foreach(
fun(#{name := Name}) ->
{ok, _} = emqx_message_validation:delete(Name)
end,
emqx_message_validation:list()
).
maybe_json_decode(X) ->
case emqx_utils_json:safe_decode(X, [return_maps]) of
{ok, Decoded} -> Decoded;
{error, _} -> X
end.
request(Method, Path, Params) ->
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
Opts = #{return_all => true},
case emqx_mgmt_api_test_util:request_api(Method, Path, "", AuthHeader, Params, Opts) of
{ok, {Status, Headers, Body0}} ->
Body = maybe_json_decode(Body0),
{ok, {Status, Headers, Body}};
{error, {Status, Headers, Body0}} ->
Body =
case emqx_utils_json:safe_decode(Body0, [return_maps]) of
{ok, Decoded0 = #{<<"message">> := Msg0}} ->
Msg = maybe_json_decode(Msg0),
Decoded0#{<<"message">> := Msg};
{ok, Decoded0} ->
Decoded0;
{error, _} ->
Body0
end,
{error, {Status, Headers, Body}};
Error ->
Error
end.
validation(Name, Checks) ->
validation(Name, Checks, _Overrides = #{}).
validation(Name, Checks, Overrides) ->
Default = #{
<<"tags">> => [<<"some">>, <<"tags">>],
<<"description">> => <<"my validation">>,
<<"enable">> => true,
<<"name">> => Name,
<<"topics">> => <<"t/+">>,
<<"strategy">> => <<"all_pass">>,
<<"failure_action">> => <<"drop">>,
<<"log_failure">> => #{<<"level">> => <<"warning">>},
<<"checks">> => Checks
},
emqx_utils_maps:deep_merge(Default, Overrides).
sql_check() ->
sql_check(<<"select * where true">>).
sql_check(SQL) ->
#{
<<"type">> => <<"sql">>,
<<"sql">> => SQL
}.
schema_check(Type, SerdeName) ->
schema_check(Type, SerdeName, _Overrides = #{}).
schema_check(Type, SerdeName, Overrides) ->
emqx_utils_maps:deep_merge(
#{
<<"type">> => emqx_utils_conv:bin(Type),
<<"schema">> => SerdeName
},
Overrides
).
api_root() -> "message_validations".
simplify_result(Res) ->
case Res of
{error, {{_, Status, _}, _, Body}} ->
{Status, Body};
{ok, {{_, Status, _}, _, Body}} ->
{Status, Body}
end.
list() ->
Path = emqx_mgmt_api_test_util:api_path([api_root()]),
Res = request(get, Path, _Params = []),
ct:pal("list result:\n ~p", [Res]),
simplify_result(Res).
lookup(Name) ->
Path = emqx_mgmt_api_test_util:api_path([api_root(), "validation", Name]),
Res = request(get, Path, _Params = []),
ct:pal("lookup ~s result:\n ~p", [Name, Res]),
simplify_result(Res).
insert(Params) ->
Path = emqx_mgmt_api_test_util:api_path([api_root()]),
Res = request(post, Path, Params),
ct:pal("insert result:\n ~p", [Res]),
simplify_result(Res).
update(Params) ->
Path = emqx_mgmt_api_test_util:api_path([api_root()]),
Res = request(put, Path, Params),
ct:pal("update result:\n ~p", [Res]),
simplify_result(Res).
delete(Name) ->
Path = emqx_mgmt_api_test_util:api_path([api_root(), "validation", Name]),
Res = request(delete, Path, _Params = []),
ct:pal("delete result:\n ~p", [Res]),
simplify_result(Res).
reorder(Order) ->
Path = emqx_mgmt_api_test_util:api_path([api_root(), "reorder"]),
Params = #{<<"order">> => Order},
Res = request(post, Path, Params),
ct:pal("reorder result:\n ~p", [Res]),
simplify_result(Res).
connect(ClientId) ->
connect(ClientId, _IsPersistent = false).
connect(ClientId, IsPersistent) ->
Properties = emqx_utils_maps:put_if(#{}, 'Session-Expiry-Interval', 30, IsPersistent),
{ok, Client} = emqtt:start_link([
{clean_start, true},
{clientid, ClientId},
{properties, Properties},
{proto_ver, v5}
]),
{ok, _} = emqtt:connect(Client),
on_exit(fun() -> catch emqtt:stop(Client) end),
Client.
publish(Client, Topic, Payload) ->
publish(Client, Topic, Payload, _QoS = 0).
publish(Client, Topic, {raw, Payload}, QoS) ->
case emqtt:publish(Client, Topic, Payload, QoS) of
ok -> ok;
{ok, _} -> ok;
Err -> Err
end;
publish(Client, Topic, Payload, QoS) ->
case emqtt:publish(Client, Topic, emqx_utils_json:encode(Payload), QoS) of
ok -> ok;
{ok, _} -> ok;
Err -> Err
end.
json_valid_payloads() ->
[
#{i => 10, s => <<"s">>},
#{i => 10}
].
json_invalid_payloads() ->
[
#{i => <<"wrong type">>},
#{x => <<"unknown property">>}
].
json_create_serde(SerdeName) ->
Source = #{
type => object,
properties => #{
i => #{type => integer},
s => #{type => string}
},
required => [<<"i">>],
additionalProperties => false
},
Schema = #{type => json, source => emqx_utils_json:encode(Source)},
ok = emqx_schema_registry:add_schema(SerdeName, Schema),
on_exit(fun() -> ok = emqx_schema_registry:delete_schema(SerdeName) end),
ok.
avro_valid_payloads(SerdeName) ->
lists:map(
fun(Payload) -> emqx_schema_registry_serde:encode(SerdeName, Payload) end,
[
#{i => 10, s => <<"s">>},
#{i => 10}
]
).
avro_invalid_payloads() ->
[
emqx_utils_json:encode(#{i => 10, s => <<"s">>}),
<<"">>
].
avro_create_serde(SerdeName) ->
Source = #{
type => record,
name => <<"test">>,
namespace => <<"emqx.com">>,
fields => [
#{name => <<"i">>, type => <<"int">>},
#{name => <<"s">>, type => [<<"null">>, <<"string">>], default => <<"null">>}
]
},
Schema = #{type => avro, source => emqx_utils_json:encode(Source)},
ok = emqx_schema_registry:add_schema(SerdeName, Schema),
on_exit(fun() -> ok = emqx_schema_registry:delete_schema(SerdeName) end),
ok.
protobuf_valid_payloads(SerdeName, MessageName) ->
lists:map(
fun(Payload) -> emqx_schema_registry_serde:encode(SerdeName, Payload, [MessageName]) end,
[
#{<<"name">> => <<"some name">>, <<"id">> => 10, <<"email">> => <<"emqx@emqx.io">>},
#{<<"name">> => <<"some name">>, <<"id">> => 10}
]
).
protobuf_invalid_payloads() ->
[
emqx_utils_json:encode(#{name => <<"a">>, id => 10, email => <<"email">>}),
<<"not protobuf">>
].
protobuf_create_serde(SerdeName) ->
Source =
<<
"message Person {\n"
" required string name = 1;\n"
" required int32 id = 2;\n"
" optional string email = 3;\n"
" }\n"
"message UnionValue {\n"
" oneof u {\n"
" int32 a = 1;\n"
" string b = 2;\n"
" }\n"
"}"
>>,
Schema = #{type => protobuf, source => Source},
ok = emqx_schema_registry:add_schema(SerdeName, Schema),
on_exit(fun() -> ok = emqx_schema_registry:delete_schema(SerdeName) end),
ok.
%% Checks that the internal order in the registry/index matches expectation.
assert_index_order(ExpectedOrder, Topic, Comment) ->
?assertEqual(
ExpectedOrder,
[
N
|| #{name := N} <- emqx_message_validation_registry:matching_validations(Topic)
],
Comment
).
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
%% Smoke test where we have a single check and `all_pass' strategy.
t_smoke_test(_Config) ->
Name1 = <<"foo">>,
Check1 = sql_check(<<"select payload.value as x where x > 15">>),
Validation1 = validation(Name1, [Check1]),
{201, _} = insert(Validation1),
lists:foreach(
fun({QoS, IsPersistent}) ->
ct:pal("qos = ~b, is persistent = ~p", [QoS, IsPersistent]),
C = connect(<<"c1">>, IsPersistent),
{ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
{200, _} = update(Validation1),
ok = publish(C, <<"t/1">>, #{value => 20}, QoS),
?assertReceive({publish, _}),
ok = publish(C, <<"t/1">>, #{value => 10}, QoS),
?assertNotReceive({publish, _}),
ok = publish(C, <<"t/1/a">>, #{value => 10}, QoS),
?assertReceive({publish, _}),
%% test `disconnect' failure action
Validation2 = validation(Name1, [Check1], #{<<"failure_action">> => <<"disconnect">>}),
{200, _} = update(Validation2),
unlink(C),
PubRes = publish(C, <<"t/1">>, #{value => 0}, QoS),
case QoS =:= 0 of
true ->
?assertMatch(ok, PubRes);
false ->
?assertMatch(
{error, {disconnected, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, _}},
PubRes
)
end,
?assertNotReceive({publish, _}),
?assertReceive({disconnected, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, _}),
ok
end,
[
{QoS, IsPersistent}
|| IsPersistent <- [false, true],
QoS <- [0, 1, 2]
]
),
ok.
t_crud(_Config) ->
?assertMatch({200, []}, list()),
Name1 = <<"foo">>,
Validation1 = validation(Name1, [sql_check()]),
?assertMatch({201, #{<<"name">> := Name1}}, insert(Validation1)),
?assertMatch({200, #{<<"name">> := Name1}}, lookup(Name1)),
?assertMatch({200, [#{<<"name">> := Name1}]}, list()),
%% Duplicated name
?assertMatch({400, #{<<"code">> := <<"ALREADY_EXISTS">>}}, insert(Validation1)),
Name2 = <<"bar">>,
Validation2 = validation(Name2, [sql_check()]),
%% Not found
?assertMatch({404, _}, update(Validation2)),
?assertMatch({201, _}, insert(Validation2)),
?assertMatch(
{200, [#{<<"name">> := Name1}, #{<<"name">> := Name2}]},
list()
),
?assertMatch({200, #{<<"name">> := Name2}}, lookup(Name2)),
Validation1b = validation(Name1, [
sql_check(<<"select * where true">>),
sql_check(<<"select * where false">>)
]),
?assertMatch({200, _}, update(Validation1b)),
?assertMatch({200, #{<<"checks">> := [_, _]}}, lookup(Name1)),
%% order is unchanged
?assertMatch(
{200, [#{<<"name">> := Name1}, #{<<"name">> := Name2}]},
list()
),
?assertMatch({204, _}, delete(Name1)),
?assertMatch({404, _}, lookup(Name1)),
?assertMatch({200, [#{<<"name">> := Name2}]}, list()),
?assertMatch({404, _}, update(Validation1)),
ok.
%% test the "reorder" API
t_reorder(_Config) ->
%% no validations to reorder
?assertMatch({204, _}, reorder([])),
%% unknown validation
?assertMatch(
{400, #{<<"not_found">> := [<<"nonexistent">>]}},
reorder([<<"nonexistent">>])
),
Topic = <<"t">>,
Name1 = <<"foo">>,
Validation1 = validation(Name1, [sql_check()], #{<<"topics">> => Topic}),
{201, _} = insert(Validation1),
%% unknown validation
?assertMatch(
{400, #{
%% Note: minirest currently encodes empty lists as a "[]" string...
<<"duplicated">> := "[]",
<<"not_found">> := [<<"nonexistent">>],
<<"not_reordered">> := [Name1]
}},
reorder([<<"nonexistent">>])
),
%% repeated validations
?assertMatch(
{400, #{
<<"not_found">> := "[]",
<<"duplicated">> := [Name1],
<<"not_reordered">> := "[]"
}},
reorder([Name1, Name1])
),
%% mixed known, unknown and repeated validations
?assertMatch(
{400, #{
<<"not_found">> := [<<"nonexistent">>],
<<"duplicated">> := [Name1],
%% Note: minirest currently encodes empty lists as a "[]" string...
<<"not_reordered">> := "[]"
}},
reorder([Name1, <<"nonexistent">>, <<"nonexistent">>, Name1])
),
?assertMatch({204, _}, reorder([Name1])),
?assertMatch({200, [#{<<"name">> := Name1}]}, list()),
?assertIndexOrder([Name1], Topic),
Name2 = <<"bar">>,
Validation2 = validation(Name2, [sql_check()], #{<<"topics">> => Topic}),
{201, _} = insert(Validation2),
Name3 = <<"baz">>,
Validation3 = validation(Name3, [sql_check()], #{<<"topics">> => Topic}),
{201, _} = insert(Validation3),
?assertMatch(
{200, [#{<<"name">> := Name1}, #{<<"name">> := Name2}, #{<<"name">> := Name3}]},
list()
),
?assertIndexOrder([Name1, Name2, Name3], Topic),
%% Doesn't mention all validations
?assertMatch(
{400, #{
%% Note: minirest currently encodes empty lists as a "[]" string...
<<"not_found">> := "[]",
<<"not_reordered">> := [_, _]
}},
reorder([Name1])
),
?assertMatch(
{200, [#{<<"name">> := Name1}, #{<<"name">> := Name2}, #{<<"name">> := Name3}]},
list()
),
?assertIndexOrder([Name1, Name2, Name3], Topic),
?assertMatch({204, _}, reorder([Name3, Name2, Name1])),
?assertMatch(
{200, [#{<<"name">> := Name3}, #{<<"name">> := Name2}, #{<<"name">> := Name1}]},
list()
),
?assertIndexOrder([Name3, Name2, Name1], Topic),
?assertMatch({204, _}, reorder([Name1, Name3, Name2])),
?assertMatch(
{200, [#{<<"name">> := Name1}, #{<<"name">> := Name3}, #{<<"name">> := Name2}]},
list()
),
?assertIndexOrder([Name1, Name3, Name2], Topic),
ok.
%% Check the `all_pass' strategy
t_all_pass(_Config) ->
Name1 = <<"foo">>,
Check1 = sql_check(<<"select payload.x as x where x > 5">>),
Check2 = sql_check(<<"select payload.x as x where x > 10">>),
Validation1 = validation(Name1, [Check1, Check2], #{<<"strategy">> => <<"all_pass">>}),
{201, _} = insert(Validation1),
C = connect(<<"c1">>),
{ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
ok = publish(C, <<"t/1">>, #{x => 0}),
?assertNotReceive({publish, _}),
ok = publish(C, <<"t/1">>, #{x => 7}),
?assertNotReceive({publish, _}),
ok = publish(C, <<"t/1">>, #{x => 11}),
?assertReceive({publish, _}),
ok.
%% Check the `any_pass' strategy
t_any_pass(_Config) ->
Name1 = <<"foo">>,
Check1 = sql_check(<<"select payload.x as x where x > 5">>),
Check2 = sql_check(<<"select payload.x as x where x > 10">>),
Validation1 = validation(Name1, [Check1, Check2], #{<<"strategy">> => <<"any_pass">>}),
{201, _} = insert(Validation1),
C = connect(<<"c1">>),
{ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
ok = publish(C, <<"t/1">>, #{x => 11}),
?assertReceive({publish, _}),
ok = publish(C, <<"t/1">>, #{x => 7}),
?assertReceive({publish, _}),
ok = publish(C, <<"t/1">>, #{x => 0}),
?assertNotReceive({publish, _}),
ok.
%% Checks that multiple validations are run in order.
t_multiple_validations(_Config) ->
Name1 = <<"foo">>,
Check1 = sql_check(<<"select payload.x as x, payload.y as y where x > 10 or y > 0">>),
Validation1 = validation(Name1, [Check1], #{<<"failure_action">> => <<"drop">>}),
{201, _} = insert(Validation1),
Name2 = <<"bar">>,
Check2 = sql_check(<<"select payload.x as x where x > 5">>),
Validation2 = validation(Name2, [Check2], #{<<"failure_action">> => <<"disconnect">>}),
{201, _} = insert(Validation2),
C = connect(<<"c1">>),
{ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
ok = publish(C, <<"t/1">>, #{x => 11, y => 1}),
?assertReceive({publish, _}),
ok = publish(C, <<"t/1">>, #{x => 7, y => 0}),
?assertNotReceive({publish, _}),
?assertNotReceive({disconnected, _, _}),
unlink(C),
ok = publish(C, <<"t/1">>, #{x => 0, y => 1}),
?assertNotReceive({publish, _}),
?assertReceive({disconnected, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, _}),
ok.
t_schema_check_non_existent_serde(_Config) ->
SerdeName = <<"idontexist">>,
Name1 = <<"foo">>,
Check1 = schema_check(json, SerdeName),
Validation1 = validation(Name1, [Check1]),
{201, _} = insert(Validation1),
C = connect(<<"c1">>),
{ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
ok = publish(C, <<"t/1">>, #{i => 10, s => <<"s">>}),
?assertNotReceive({publish, _}),
ok.
t_schema_check_json(_Config) ->
SerdeName = <<"myserde">>,
json_create_serde(SerdeName),
Name1 = <<"foo">>,
Check1 = schema_check(json, SerdeName),
Validation1 = validation(Name1, [Check1]),
{201, _} = insert(Validation1),
C = connect(<<"c1">>),
{ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
lists:foreach(
fun(Payload) ->
ok = publish(C, <<"t/1">>, Payload),
?assertReceive({publish, _})
end,
json_valid_payloads()
),
lists:foreach(
fun(Payload) ->
ok = publish(C, <<"t/1">>, Payload),
?assertNotReceive({publish, _})
end,
json_invalid_payloads()
),
ok.
t_schema_check_avro(_Config) ->
SerdeName = <<"myserde">>,
avro_create_serde(SerdeName),
Name1 = <<"foo">>,
Check1 = schema_check(avro, SerdeName),
Validation1 = validation(Name1, [Check1]),
{201, _} = insert(Validation1),
C = connect(<<"c1">>),
{ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
lists:foreach(
fun(Payload) ->
ok = publish(C, <<"t/1">>, {raw, Payload}),
?assertReceive({publish, _})
end,
avro_valid_payloads(SerdeName)
),
lists:foreach(
fun(Payload) ->
ok = publish(C, <<"t/1">>, {raw, Payload}),
?assertNotReceive({publish, _})
end,
avro_invalid_payloads()
),
ok.
t_schema_check_protobuf(_Config) ->
SerdeName = <<"myserde">>,
MessageName = <<"Person">>,
protobuf_create_serde(SerdeName),
Name1 = <<"foo">>,
Check1 = schema_check(protobuf, SerdeName, #{<<"message_name">> => MessageName}),
Validation1 = validation(Name1, [Check1]),
{201, _} = insert(Validation1),
C = connect(<<"c1">>),
{ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
lists:foreach(
fun(Payload) ->
ok = publish(C, <<"t/1">>, {raw, Payload}),
?assertReceive({publish, _})
end,
protobuf_valid_payloads(SerdeName, MessageName)
),
lists:foreach(
fun(Payload) ->
ok = publish(C, <<"t/1">>, {raw, Payload}),
?assertNotReceive({publish, _})
end,
protobuf_invalid_payloads()
),
%% Bad config: unknown message name
Check2 = schema_check(protobuf, SerdeName, #{<<"message_name">> => <<"idontexist">>}),
Validation2 = validation(Name1, [Check2]),
{200, _} = update(Validation2),
lists:foreach(
fun(Payload) ->
ok = publish(C, <<"t/1">>, {raw, Payload}),
?assertNotReceive({publish, _})
end,
protobuf_valid_payloads(SerdeName, MessageName)
),
ok.

View File

@ -0,0 +1,219 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_message_validation_tests).
-include_lib("eunit/include/eunit.hrl").
-define(VALIDATIONS_PATH, "message_validation.validations").
%%------------------------------------------------------------------------------
%% Helper fns
%%------------------------------------------------------------------------------
parse_and_check(InnerConfigs) ->
RootBin = <<"message_validation">>,
InnerBin = <<"validations">>,
RawConf = #{RootBin => #{InnerBin => InnerConfigs}},
#{RootBin := #{InnerBin := Checked}} = hocon_tconf:check_plain(
emqx_message_validation_schema,
RawConf,
#{
required => false,
atom_key => false,
make_serializable => false
}
),
Checked.
validation(Name, Checks) ->
validation(Name, Checks, _Overrides = #{}).
validation(Name, Checks, Overrides) ->
Default = #{
<<"tags">> => [<<"some">>, <<"tags">>],
<<"description">> => <<"my validation">>,
<<"enable">> => true,
<<"name">> => Name,
<<"topics">> => <<"t/+">>,
<<"strategy">> => <<"all_pass">>,
<<"failure_action">> => <<"drop">>,
<<"log_failure">> => #{<<"level">> => <<"warning">>},
<<"checks">> => Checks
},
emqx_utils_maps:deep_merge(Default, Overrides).
sql_check() ->
sql_check(<<"select * where true">>).
sql_check(SQL) ->
#{
<<"type">> => <<"sql">>,
<<"sql">> => SQL
}.
eval_sql(Message, SQL) ->
{ok, Check} = emqx_message_validation:parse_sql_check(SQL),
Validation = #{log_failure => #{level => warning}, name => <<"validation">>},
emqx_message_validation:evaluate_sql_check(Check, Validation, Message).
message() ->
message(_Opts = #{}).
message(Opts) ->
Defaults = #{
id => emqx_guid:gen(),
qos => 0,
from => emqx_guid:to_hexstr(emqx_guid:gen()),
flags => #{retain => false},
headers => #{
proto_ver => v5,
properties => #{'User-Property' => [{<<"a">>, <<"b">>}]}
},
topic => <<"t/t">>,
payload => emqx_utils_json:encode(#{value => 10}),
timestamp => 1710272561615,
extra => []
},
emqx_message:from_map(emqx_utils_maps:deep_merge(Defaults, Opts)).
%%------------------------------------------------------------------------------
%% Test cases
%%------------------------------------------------------------------------------
schema_test_() ->
[
{"topics is always a list 1",
?_assertMatch(
[#{<<"topics">> := [<<"t/1">>]}],
parse_and_check([
validation(
<<"foo">>,
[sql_check()],
#{<<"topics">> => <<"t/1">>}
)
])
)},
{"topics is always a list 2",
?_assertMatch(
[#{<<"topics">> := [<<"t/1">>]}],
parse_and_check([
validation(
<<"foo">>,
[sql_check()],
#{<<"topics">> => [<<"t/1">>]}
)
])
)},
{"foreach expression is not allowed",
?_assertThrow(
{_Schema, [
#{
reason := foreach_not_allowed,
kind := validation_error
}
]},
parse_and_check([
validation(
<<"foo">>,
[sql_check(<<"foreach foo as f where true">>)]
)
])
)},
{"from clause is not allowed",
?_assertThrow(
{_Schema, [
#{
reason := non_empty_from_clause,
kind := validation_error
}
]},
parse_and_check([
validation(
<<"foo">>,
[sql_check(<<"select * from t">>)]
)
])
)},
{"names are unique",
?_assertThrow(
{_Schema, [
#{
reason := <<"duplicated name:", _/binary>>,
path := ?VALIDATIONS_PATH,
kind := validation_error
}
]},
parse_and_check([
validation(<<"foo">>, [sql_check()]),
validation(<<"foo">>, [sql_check()])
])
)},
{"checks must be non-empty",
?_assertThrow(
{_Schema, [
#{
reason := "at least one check must be defined",
kind := validation_error
}
]},
parse_and_check([
validation(
<<"foo">>,
[]
)
])
)},
{"bogus check type",
?_assertThrow(
{_Schema, [
#{
expected := <<"sql", _/binary>>,
kind := validation_error,
field_name := type
}
]},
parse_and_check([validation(<<"foo">>, [#{<<"type">> => <<"foo">>}])])
)}
].
invalid_names_test_() ->
[
{InvalidName,
?_assertThrow(
{_Schema, [
#{
reason := <<"must conform to regex:", _/binary>>,
kind := validation_error,
path := "message_validation.validations.1.name"
}
]},
parse_and_check([validation(InvalidName, [sql_check()])])
)}
|| InvalidName <- [
<<"">>,
<<"_name">>,
<<"name$">>,
<<"name!">>,
<<"some name">>,
<<"nãme"/utf8>>,
<<"test_哈哈"/utf8>>
]
].
check_test_() ->
[
{"denied by payload 1",
?_assertNot(eval_sql(message(), <<"select * where payload.value > 15">>))},
{"denied by payload 2",
?_assertNot(eval_sql(message(), <<"select payload.value as x where x > 15">>))},
{"allowed by payload 1",
?_assert(eval_sql(message(), <<"select * where payload.value > 5">>))},
{"allowed by payload 2",
?_assert(eval_sql(message(), <<"select payload.value as x where x > 5">>))},
{"always passes 1", ?_assert(eval_sql(message(), <<"select * where true">>))},
{"always passes 2", ?_assert(eval_sql(message(), <<"select * where 1 = 1">>))},
{"never passes 1", ?_assertNot(eval_sql(message(), <<"select * where false">>))},
{"never passes 2", ?_assertNot(eval_sql(message(), <<"select * where 1 = 2">>))},
{"never passes 3", ?_assertNot(eval_sql(message(), <<"select * where true and false">>))}
].

View File

@ -51,12 +51,6 @@
get_rules_ordered_by_ts/0
]).
%% exported for cluster_call
-export([
do_delete_rule/1,
do_insert_rule/1
]).
-export([
load_hooks_for_rule/1,
unload_hooks_for_rule/1,

View File

@ -41,4 +41,5 @@ stop(_State) ->
RulePath = [RuleEngine | _] = ?KEY_PATH,
emqx_conf:remove_handler(RulePath ++ ['?']),
emqx_conf:remove_handler([RuleEngine]),
ok = emqx_rule_events:unload().
ok = emqx_rule_events:unload(),
ok.

View File

@ -38,7 +38,8 @@ namespace() -> rule_engine.
tags() ->
[<<"Rule Engine">>].
roots() -> [{"rule_engine", ?HOCON(?R_REF("rule_engine"), #{importance => ?IMPORTANCE_HIDDEN})}].
roots() ->
[{"rule_engine", ?HOCON(?R_REF("rule_engine"), #{importance => ?IMPORTANCE_HIDDEN})}].
fields("rule_engine") ->
rule_engine_settings() ++

View File

@ -28,13 +28,25 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
Registry = #{
id => emqx_rule_engine,
start => {emqx_rule_engine, start_link, []},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [emqx_rule_engine]
RuleEngineRegistry = worker_spec(emqx_rule_engine),
RuleEngineMetrics = emqx_metrics_worker:child_spec(rule_metrics),
SupFlags = #{
strategy => one_for_one,
intensity => 10,
period => 10
},
Metrics = emqx_metrics_worker:child_spec(rule_metrics),
{ok, {{one_for_one, 10, 10}, [Registry, Metrics]}}.
Children = [
RuleEngineRegistry,
RuleEngineMetrics
],
{ok, {SupFlags, Children}}.
worker_spec(Mod) ->
#{
id => Mod,
start => {Mod, start_link, []},
restart => permanent,
shutdown => 5_000,
type => worker,
modules => [Mod]
}.

View File

@ -26,6 +26,9 @@
inc_action_metrics/2
]).
%% Internal exports used by message validation
-export([evaluate_select/3, clear_rule_payload/0]).
-import(
emqx_rule_maps,
[
@ -129,27 +132,16 @@ do_apply_rule(
Columns,
Envs
) ->
{Selected, Collection} = ?RAISE(
select_and_collect(Fields, Columns),
{select_and_collect_error, {EXCLASS, EXCPTION, ST}}
),
ColumnsAndSelected = maps:merge(Columns, Selected),
case
?RAISE(
match_conditions(Conditions, ColumnsAndSelected),
{match_conditions_error, {EXCLASS, EXCPTION, ST}}
)
of
true ->
Collection2 = filter_collection(ColumnsAndSelected, InCase, DoEach, Collection),
case Collection2 of
case evaluate_foreach(Fields, Columns, Conditions, InCase, DoEach) of
{ok, ColumnsAndSelected, FinalCollection} ->
case FinalCollection of
[] ->
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result');
_ ->
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed')
end,
NewEnvs = maps:merge(ColumnsAndSelected, Envs),
{ok, [handle_action_list(RuleId, Actions, Coll, NewEnvs) || Coll <- Collection2]};
{ok, [handle_action_list(RuleId, Actions, Coll, NewEnvs) || Coll <- FinalCollection]};
false ->
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'),
{error, nomatch}
@ -165,6 +157,16 @@ do_apply_rule(
Columns,
Envs
) ->
case evaluate_select(Fields, Columns, Conditions) of
{ok, Selected} ->
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed'),
{ok, handle_action_list(RuleId, Actions, Selected, maps:merge(Columns, Envs))};
false ->
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'),
{error, nomatch}
end.
evaluate_select(Fields, Columns, Conditions) ->
Selected = ?RAISE(
select_and_transform(Fields, Columns),
{select_and_transform_error, {EXCLASS, EXCPTION, ST}}
@ -176,11 +178,28 @@ do_apply_rule(
)
of
true ->
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed'),
{ok, handle_action_list(RuleId, Actions, Selected, maps:merge(Columns, Envs))};
{ok, Selected};
false ->
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'),
{error, nomatch}
false
end.
evaluate_foreach(Fields, Columns, Conditions, InCase, DoEach) ->
{Selected, Collection} = ?RAISE(
select_and_collect(Fields, Columns),
{select_and_collect_error, {EXCLASS, EXCPTION, ST}}
),
ColumnsAndSelected = maps:merge(Columns, Selected),
case
?RAISE(
match_conditions(Conditions, ColumnsAndSelected),
{match_conditions_error, {EXCLASS, EXCPTION, ST}}
)
of
true ->
FinalCollection = filter_collection(ColumnsAndSelected, InCase, DoEach, Collection),
{ok, ColumnsAndSelected, FinalCollection};
false ->
false
end.
clear_rule_payload() ->
@ -281,6 +300,10 @@ match_conditions({'fun', {_, Name}, Args}, Data) ->
apply_func(Name, [eval(Arg, Data) || Arg <- Args], Data);
match_conditions({Op, L, R}, Data) when ?is_comp(Op) ->
compare(Op, eval(L, Data), eval(R, Data));
match_conditions({const, true}, _Data) ->
true;
match_conditions({const, false}, _Data) ->
false;
match_conditions({}, _Data) ->
true.

View File

@ -18,7 +18,7 @@
-include("rule_engine.hrl").
-export([parse/1]).
-export([parse/1, parse/2]).
-export([
select_fields/1,
@ -62,37 +62,29 @@
-type field() :: const() | variable() | {as, field(), alias()} | sql_func().
-type parse_opts() :: #{
%% Whether `from' clause should be mandatory.
%% Default: `true'.
with_from => boolean()
}.
-export_type([select/0]).
%% Parse one select statement.
-spec parse(string() | binary()) -> {ok, select()} | {error, term()}.
parse(Sql) ->
try
case rulesql:parsetree(Sql) of
{ok, {select, Clauses}} ->
{ok, #select{
is_foreach = false,
fields = get_value(fields, Clauses),
doeach = [],
incase = {},
from = get_value(from, Clauses),
where = get_value(where, Clauses)
}};
{ok, {foreach, Clauses}} ->
{ok, #select{
is_foreach = true,
fields = get_value(fields, Clauses),
doeach = get_value(do, Clauses, []),
incase = get_value(incase, Clauses, {}),
from = get_value(from, Clauses),
where = get_value(where, Clauses)
}};
Error ->
{error, Error}
end
catch
_Error:Reason:StackTrace ->
{error, {Reason, StackTrace}}
parse(Sql, _Opts = #{}).
-spec parse(string() | binary(), parse_opts()) -> {ok, select()} | {error, term()}.
parse(Sql, Opts) ->
WithFrom = maps:get(with_from, Opts, true),
case do_parse(Sql) of
{ok, Parsed} when WithFrom ->
ensure_non_empty_from(Parsed);
{ok, Parsed} ->
ensure_empty_from(Parsed);
Error = {error, _} ->
Error
end.
-spec select_fields(select()) -> list(field()).
@ -118,3 +110,45 @@ select_from(#select{from = From}) ->
-spec select_where(select()) -> tuple().
select_where(#select{where = Where}) ->
Where.
-spec do_parse(string() | binary()) -> {ok, select()} | {error, term()}.
do_parse(Sql) ->
try
case rulesql:parsetree(Sql) of
{ok, {select, Clauses}} ->
Parsed = #select{
is_foreach = false,
fields = get_value(fields, Clauses),
doeach = [],
incase = {},
from = get_value(from, Clauses),
where = get_value(where, Clauses)
},
{ok, Parsed};
{ok, {foreach, Clauses}} ->
Parsed = #select{
is_foreach = true,
fields = get_value(fields, Clauses),
doeach = get_value(do, Clauses, []),
incase = get_value(incase, Clauses, {}),
from = get_value(from, Clauses),
where = get_value(where, Clauses)
},
{ok, Parsed};
Error ->
{error, Error}
end
catch
_Error:Reason:StackTrace ->
{error, {Reason, StackTrace}}
end.
ensure_non_empty_from(#select{from = []}) ->
{error, empty_from_clause};
ensure_non_empty_from(Parsed) ->
{ok, Parsed}.
ensure_empty_from(#select{from = [_ | _]}) ->
{error, non_empty_from_clause};
ensure_empty_from(Parsed) ->
{ok, Parsed}.
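For illustration only (not part of this diff): a minimal sketch of how the new `parse/2` option might be exercised, assuming this module is `emqx_rule_sqlparser` and that the rulesql 0.2.0 parser accepts statements without a FROM clause, as the validation checks in this PR require. The function name below is hypothetical.
%% Hypothetical helper, only to illustrate the `with_from' option.
parse_examples() ->
    %% Rule-engine SQL: a FROM clause is mandatory by default (with_from => true).
    {ok, _} = emqx_rule_sqlparser:parse(<<"select * from \"t/#\" where true">>),
    %% Validation SQL: the FROM clause must be omitted, so pass with_from => false.
    {ok, _} = emqx_rule_sqlparser:parse(
        <<"select payload.x as x where x > 0">>, #{with_from => false}
    ),
    %% A FROM clause in validation SQL is rejected.
    {error, non_empty_from_clause} = emqx_rule_sqlparser:parse(
        <<"select * from \"t/#\"">>, #{with_from => false}
    ),
    ok.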

View File

@ -27,12 +27,19 @@
%%-define(PROPTEST(F), ?assert(proper:quickcheck(F(), [{on_output, fun ct:print/2}]))).
init_per_suite(Config) ->
application:load(emqx_conf),
ConfigConf = <<"rule_engine {jq_function_default_timeout=10s}">>,
ok = emqx_common_test_helpers:load_config(emqx_rule_engine_schema, ConfigConf),
Config.
Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
{emqx_rule_engine, "rule_engine {jq_function_default_timeout=10s}"}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{apps, Apps} | Config].
end_per_suite(_Config) ->
end_per_suite(Config) ->
Apps = ?config(apps, Config),
emqx_cth_suite:stop(Apps),
ok.
eventmsg_publish(Msg) ->

View File

@ -13,6 +13,7 @@
-export([
make_serde/3,
handle_rule_function/2,
schema_check/3,
destroy/1
]).

View File

@ -66,7 +66,8 @@
tcp_keepalive_opts/4,
format/1,
call_first_defined/1,
ntoa/1
ntoa/1,
foldl_while/3
]).
-export([
@ -176,6 +177,17 @@ pipeline([Fun | More], Input, State) ->
{error, Reason, NState} -> {error, Reason, NState}
end.
-spec foldl_while(fun((X, Acc) -> {cont | halt, Acc}), Acc, [X]) -> Acc.
foldl_while(_Fun, Acc, []) ->
Acc;
foldl_while(Fun, Acc, [X | Xs]) ->
case Fun(X, Acc) of
{cont, NewAcc} ->
foldl_while(Fun, NewAcc, Xs);
{halt, NewAcc} ->
NewAcc
end.
-compile({inline, [apply_fun/3]}).
apply_fun(Fun, Input, State) ->
case erlang:fun_info(Fun, arity) of

View File

@ -158,7 +158,9 @@ deep_convert(Val, _, _Args) ->
unsafe_atom_key_map(Map) ->
convert_keys_to_atom(Map, fun(K) -> binary_to_atom(K, utf8) end).
-spec binary_key_map(map()) -> map().
-spec binary_key_map
(map()) -> map();
(list()) -> list().
binary_key_map(Map) ->
deep_convert(
Map,

View File

@ -28,3 +28,37 @@ is_redacted_test_() ->
?_assert(emqx_utils:is_redacted(password, fun() -> <<"******">> end)),
?_assert(emqx_utils:is_redacted(password, emqx_secret:wrap(<<"******">>)))
].
foldl_while_test_() ->
[
?_assertEqual(
[3, 2, 1],
emqx_utils:foldl_while(fun(X, Acc) -> {cont, [X | Acc]} end, [], [1, 2, 3])
),
?_assertEqual(
[1],
emqx_utils:foldl_while(
fun
(X, Acc) when X == 2 ->
{halt, Acc};
(X, Acc) ->
{cont, [X | Acc]}
end,
[],
[1, 2, 3]
)
),
?_assertEqual(
finished,
emqx_utils:foldl_while(
fun
(X, _Acc) when X == 3 ->
{halt, finished};
(X, Acc) ->
{cont, [X | Acc]}
end,
[],
[1, 2, 3]
)
)
].

View File

@ -0,0 +1,3 @@
Implemented the message validation feature.
With this feature, once validations are configured for certain topic filters, the configured checks are run against messages published to matching topics; if a message is not accepted by a validation, it is dropped and the publishing client may be disconnected, depending on the configuration.
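As an illustration only (not part of the changelog): a validation resembling the ones exercised by the test suites above can be declared with a payload such as the following, shown as the raw Erlang map the HTTP API test helper posts; the field names follow the schema added in this PR, while the validation name and values are hypothetical.
%% Hypothetical validation: on topics matching t/+, require payload.x > 5,
%% and drop any message that fails the check.
#{
    <<"name">> => <<"ensure_x_gt_5">>,
    <<"enable">> => true,
    <<"topics">> => <<"t/+">>,
    <<"strategy">> => <<"all_pass">>,
    <<"failure_action">> => <<"drop">>,
    <<"log_failure">> => #{<<"level">> => <<"warning">>},
    <<"checks">> => [
        #{<<"type">> => <<"sql">>, <<"sql">> => <<"select payload.x as x where x > 5">>}
    ]
}.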

View File

@ -65,7 +65,7 @@ defmodule EMQXUmbrella.MixProject do
# maybe forbid to fetch quicer
{:emqtt,
github: "emqx/emqtt", tag: "1.10.1", override: true, system_env: maybe_no_quic_env()},
{:rulesql, github: "emqx/rulesql", tag: "0.1.8"},
{:rulesql, github: "emqx/rulesql", tag: "0.2.0"},
{:observer_cli, "1.7.1"},
{:system_monitor, github: "ieQu1/system_monitor", tag: "3.0.3"},
{:telemetry, "1.1.0"},
@ -184,6 +184,7 @@ defmodule EMQXUmbrella.MixProject do
:emqx_s3,
:emqx_bridge_s3,
:emqx_schema_registry,
:emqx_message_validation,
:emqx_enterprise,
:emqx_bridge_kinesis,
:emqx_bridge_azure_event_hub,

View File

@ -91,7 +91,7 @@
{replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}},
{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.10.1"}}},
{rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.8"}}},
{rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.2.0"}}},
% NOTE: depends on recon 2.5.x
{observer_cli, "1.7.1"},
{system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}},
@ -129,7 +129,7 @@
emqx_exproto_pb
]}.
{eunit_opts, [verbose]}.
{eunit_opts, [verbose, {print_depth, 100}]}.
{project_plugins, [
{erlfmt, "1.3.0"},

View File

@ -115,6 +115,7 @@ is_community_umbrella_app("apps/emqx_gateway_gbt32960") -> false;
is_community_umbrella_app("apps/emqx_gateway_ocpp") -> false;
is_community_umbrella_app("apps/emqx_gateway_jt808") -> false;
is_community_umbrella_app("apps/emqx_bridge_syskeeper") -> false;
is_community_umbrella_app("apps/emqx_message_validation") -> false;
is_community_umbrella_app(_) -> true.
%% BUILD_WITHOUT_JQ

View File

@ -0,0 +1,24 @@
emqx_message_validation_http_api {
list_validations.desc:
"""List validations"""
lookup_validation.desc:
"""Lookup a validation"""
update_validation.desc:
"""Update a validation"""
delete_validation.desc:
"""Delete a validation"""
append_validation.desc:
"""Append a new validation to the list of validations"""
reorder_validations.desc:
"""Reorder of all validations"""
param_path_name.desc:
"""Validation name"""
}

View File

@ -0,0 +1,88 @@
emqx_message_validation_schema {
check_avro_type.desc:
"""Avro schema check"""
check_avro_type.label:
"""Avro schema check"""
check_avro_schema.desc:
"""Schema name to use during check."""
check_avro_schema.label:
"""Schema name"""
check_json_type.desc:
"""JSON schema check"""
check_json_type.label:
"""JSON schema check"""
check_json_schema.desc:
"""Schema name to use during check."""
check_json_schema.label:
"""Schema name"""
check_protobuf_type.desc:
"""Protobuf schema check"""
check_protobuf_type.label:
"""Protobuf schema check"""
check_protobuf_schema.desc:
"""Schema name to use during check."""
check_protobuf_schema.label:
"""Schema name"""
check_protobuf_message_name.desc:
"""Message name to use during check."""
check_protobuf_message_name.label:
"""Message name"""
check_sql_type.desc:
"""Use rule-engine's SQL to validate the message. SQL here is the same as in rule-engine,
just with the different that the `FROM` clause must be omitted.
A SQL statement which yields any value is considered successfully validated, otherwise failed.
For example <code>SELECT payload.foo + payload.bar as sum WHERE sum > 0</code>
validates that the sum of field `foo` and `bar` is a positive value."""
check_sql_type.label:
"""SQL schema check"""
check_sql_schema.desc:
"""Schema name to use during check."""
check_sql_schema.label:
"""Schema name"""
topics.desc:
"""A single topic filter or list of topic filters that this validation should validate."""
topics.label:
"""Topic filter(s)"""
name.desc:
"""The name for this validation. Must be unique among all validations. It must be a combination of alphanumeric characters and underscores, and cannot start with neither number nor an underscore."""
name.desc:
"""Name"""
strategy.desc:
"""How the validation should consider the checks to be successful.
<code>all_pass</code>: All checks will be evaluated and must pass.
<code>any_pass</code>: Any passing check will suffice. Stops at the first success."""
strategy.desc:
"""Strategy"""
failure_action.desc:
"""How to proceed if the validation fails.
<code>drop</code>: The offending message is simply dropped without further processing.
<code>disconnect</code>: The message is not published, and the publishing client is disconnected."""
failure_action.label:
"""Failure action"""
log_failure_at.desc:
"""Log level at which failures will be logged."""
log_failure_at.label:
"""Failure log level"""
checks.desc:
"""Checks that will be performed during validation. They are evaluated in the same order as defined."""
checks.label:
"""Checks"""
}
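For reference only (not part of the i18n file): the SQL-check semantics described in `check_sql_type.desc` can be exercised with the `eval_sql/2` and `message/1` helpers from the eunit module earlier in this PR; a sketch, assuming those helpers are in scope.
%% A statement that yields a value passes the check; one that yields nothing fails.
sum_check_example() ->
    SQL = <<"select payload.foo + payload.bar as sum where sum > 0">>,
    Passing = message(#{payload => emqx_utils_json:encode(#{foo => 1, bar => 2})}),
    Failing = message(#{payload => emqx_utils_json:encode(#{foo => -1, bar => 0})}),
    true = eval_sql(Passing, SQL),
    false = eval_sql(Failing, SQL),
    ok.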