95 lines
3.5 KiB
Erlang
95 lines
3.5 KiB
Erlang
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
|
|
-module(emqx_rule_sqltester).
|
|
|
|
-include("rule_engine.hrl").
|
|
|
|
-export([ test/1
|
|
]).
|
|
|
|
%% Test a rule SQL statement against a user supplied context.
%%
%% The argument is a map that must contain the binary keys <<"rawsql">>
%% (the SQL text) and <<"ctx">> (a map of context values). The topic to
%% test with is read from the <<"topic">> key of the context and defaults
%% to the empty binary.
%%
%% Returns whatever test_rule/4 returns, or {error, nomatch} when every
%% FROM topic is a publish topic and the context topic matches none of
%% them. A failure to parse the SQL is not turned into an error tuple: the
%% {ok, Select} match below crashes instead.
-spec(test(map()) -> {ok, map() | list()} | {error, term()}).
test(#{<<"rawsql">> := Sql, <<"ctx">> := Context}) ->
    {ok, Select} = emqx_rule_sqlparser:parse_select(Sql),
    InTopic = maps:get(<<"topic">>, Context, <<>>),
    EventTopics = emqx_rule_sqlparser:select_from(Select),
    case lists:all(fun is_publish_topic/1, EventTopics) of
        true ->
            %% test if the topic matches the topic filters in the rule
            case emqx_rule_utils:can_topic_match_oneof(InTopic, EventTopics) of
                true -> test_rule(Sql, Select, Context, EventTopics);
                false -> {error, nomatch}
            end;
        false ->
            %% the rule is for both publish and events, test it directly
            test_rule(Sql, Select, Context, EventTopics)
    end.
|
|
|
|
%% Run the parsed SQL once as a temporary rule and return the result.
%%
%% A throw-away rule and a single action instance (named test_rule_sql,
%% whose apply fun is sql_test_action/0) are built from the parsed Select.
%% The context is converted with emqx_rule_maps:atom_key_map/1 and merged
%% over the example values of the first event topic.
%%
%% Returns {ok, flatten(Data)} when emqx_rule_runtime:apply_rule/2 returns
%% {ok, Data}, and passes {error, Reason} through unchanged.
%%
%% All temporary state (rule metrics, action metrics and the action
%% instance params) is released in the `after' section, so it is cleaned
%% up even when applying the rule raises an exception.
test_rule(Sql, Select, Context, EventTopics) ->
    RuleId = iolist_to_binary(["test_rule", emqx_rule_id:gen()]),
    ActInstId = iolist_to_binary(["test_action", emqx_rule_id:gen()]),
    %% Build the rule and the context before any metrics are created, so a
    %% crash here (e.g. hd/1 on an empty topic list) leaves nothing behind.
    Rule = #rule{
        id = RuleId,
        rawsql = Sql,
        for = EventTopics,
        is_foreach = emqx_rule_sqlparser:select_is_foreach(Select),
        fields = emqx_rule_sqlparser:select_fields(Select),
        doeach = emqx_rule_sqlparser:select_doeach(Select),
        incase = emqx_rule_sqlparser:select_incase(Select),
        conditions = emqx_rule_sqlparser:select_where(Select),
        actions = [#action_instance{
                      id = ActInstId,
                      name = test_rule_sql}]
    },
    FullContext = fill_default_values(hd(EventTopics), emqx_rule_maps:atom_key_map(Context)),
    ok = emqx_rule_metrics:create_rule_metrics(RuleId),
    ok = emqx_rule_metrics:create_metrics(ActInstId),
    try
        ok = emqx_rule_registry:add_action_instance_params(
               #action_instance_params{id = ActInstId,
                                       params = #{},
                                       apply = sql_test_action()}),
        emqx_rule_runtime:apply_rule(Rule, FullContext)
    of
        {ok, Data} -> {ok, flatten(Data)};
        {error, Reason} -> {error, Reason}
    after
        %% Cleanup lives here rather than in the protected body so that it
        %% also runs on the exception path.
        emqx_rule_metrics:clear_rule_metrics(RuleId),
        emqx_rule_metrics:clear_metrics(ActInstId),
        ok = emqx_rule_registry:remove_action_instance_params(ActInstId)
    end.
|
|
|
|
%% Tell whether a FROM topic is an ordinary publish topic.
%% Anything that does not start with the <<"$events/">> prefix counts as
%% a publish topic.
is_publish_topic(Topic) ->
    case Topic of
        <<"$events/", _/binary>> -> false;
        _ -> true
    end.
|
|
|
|
%% Collapse one level of nesting in the result of applying a rule.
%%   - an empty list stays empty;
%%   - a single-element list yields that element itself (not wrapped, so
%%     the result is not necessarily a list);
%%   - otherwise the head must be a list and is prepended to the
%%     flattened tail.
flatten([]) -> [];
flatten([Only]) -> Only;
flatten([Head | Rest]) when is_list(Head) ->
    lists:append(Head, flatten(Rest)).
|
|
|
|
%% Build the apply fun used by the temporary test action: it logs that the
%% SQL test succeeded and hands its first argument back unchanged.
sql_test_action() ->
    fun(Selected, _ActionEnvs) ->
        ?LOG(info, "Testing Rule SQL OK"),
        Selected
    end.
|
|
|
|
%% Complete Context with the example values of Event.
%% Keys already present in Context win over the example values, because
%% Context is the second argument to maps:merge/2.
fill_default_values(Event, Context) ->
    Defaults = envs_examp(Event),
    maps:merge(Defaults, Context).
|
|
|
|
%% Return the example columns of the event behind EventTopic as a map
%% passed through emqx_rule_maps:atom_key_map/1.
envs_examp(EventTopic) ->
    Name = emqx_rule_events:event_name(EventTopic),
    Columns = emqx_rule_events:columns_with_exam(Name),
    emqx_rule_maps:atom_key_map(maps:from_list(Columns)).
|