emqx/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl

95 lines
3.5 KiB
Erlang

%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_rule_sqltester).
-include("rule_engine.hrl").
-export([ test/1
]).
%% @doc Dry-run a rule SQL statement against a sample context.
%%
%% The argument is a map carrying the SQL text under <<"rawsql">> and the test
%% context under <<"ctx">>. When every topic in the FROM clause is a plain
%% publish topic, the <<"topic">> of the context (default <<>>) has to match one
%% of them; otherwise {error, nomatch} is returned and the rule is not run.
%% As soon as the FROM clause contains an "$events/..." topic the rule is run
%% directly, without the topic check.
%%
%% Crashes with badmatch when emqx_rule_sqlparser:parse_select/1 returns
%% anything other than {ok, Select}.
%%
%% NOTE: the argument type is map(), not #{}: in a type specification #{}
%% denotes the empty map, which can never match the function head below.
-spec(test(map()) -> {ok, map() | list()} | {error, term()}).
test(#{<<"rawsql">> := Sql, <<"ctx">> := Context}) ->
    {ok, Select} = emqx_rule_sqlparser:parse_select(Sql),
    InTopic = maps:get(<<"topic">>, Context, <<>>),
    EventTopics = emqx_rule_sqlparser:select_from(Select),
    case lists:all(fun is_publish_topic/1, EventTopics) of
        true ->
            %% publish-only rule: the test topic must match one of the
            %% topic filters in the FROM clause
            case emqx_rule_utils:can_topic_match_oneof(InTopic, EventTopics) of
                true -> test_rule(Sql, Select, Context, EventTopics);
                false -> {error, nomatch}
            end;
        false ->
            %% the rule is for both publish and events, test it directly
            test_rule(Sql, Select, Context, EventTopics)
    end.
%% Run the parsed SQL once as a throw-away rule and return its output.
%%
%% A temporary rule and a temporary action instance are set up under freshly
%% generated ids, the rule is applied to Context (completed with the example
%% values of the first event topic), and everything that was registered for
%% the run is torn down again regardless of how the run ends.
%%
%% Returns {ok, Output}, where Output is the rule result passed through
%% flatten/1, or {error, Reason} as returned by emqx_rule_runtime:apply_rule/2.
test_rule(Sql, Select, Context, EventTopics) ->
    RuleId = iolist_to_binary(["test_rule", emqx_rule_id:gen()]),
    ActInstId = iolist_to_binary(["test_action", emqx_rule_id:gen()]),
    %% Build the rule and the context before anything is registered, so that a
    %% failure while preparing them leaves nothing behind.
    Rule = #rule{
        id = RuleId,
        rawsql = Sql,
        for = EventTopics,
        is_foreach = emqx_rule_sqlparser:select_is_foreach(Select),
        fields = emqx_rule_sqlparser:select_fields(Select),
        doeach = emqx_rule_sqlparser:select_doeach(Select),
        incase = emqx_rule_sqlparser:select_incase(Select),
        conditions = emqx_rule_sqlparser:select_where(Select),
        actions = [#action_instance{
                      id = ActInstId,
                      name = test_rule_sql}]
    },
    FullContext = fill_default_values(hd(EventTopics),
                                      emqx_rule_maps:atom_key_map(Context)),
    ok = emqx_rule_metrics:create_rule_metrics(RuleId),
    ok = emqx_rule_metrics:create_metrics(ActInstId),
    try
        ok = emqx_rule_registry:add_action_instance_params(
               #action_instance_params{id = ActInstId,
                                       params = #{},
                                       apply = sql_test_action()}),
        emqx_rule_runtime:apply_rule(Rule, FullContext)
    of
        {ok, Data} -> {ok, flatten(Data)};
        {error, Reason} -> {error, Reason}
    after
        %% Runs on success and on every exception, so the temporary metrics
        %% are no longer leaked when the rule run crashes.
        cleanup_test_rule(RuleId, ActInstId)
    end.

%% Drop the metrics and the action instance params created for a test run.
%% The params are removed even if clearing the metrics raises.
cleanup_test_rule(RuleId, ActInstId) ->
    try
        emqx_rule_metrics:clear_rule_metrics(RuleId),
        emqx_rule_metrics:clear_metrics(ActInstId)
    after
        ok = emqx_rule_registry:remove_action_instance_params(ActInstId)
    end.
%% Tell whether Topic is an ordinary publish topic, i.e. anything that does
%% not start with the "$events/" prefix.
is_publish_topic(Topic) ->
    case Topic of
        <<"$events/", _/binary>> -> false;
        _ -> true
    end.
%% Collapse a list of result chunks by one level:
%%   - an empty list stays empty;
%%   - a single-element list yields that element itself, whatever it is;
%%   - otherwise the leading element must be a list and is concatenated with
%%     the collapsed remainder.
flatten([]) ->
    [];
flatten([Single]) ->
    Single;
flatten([Chunk | Remaining]) when is_list(Chunk) ->
    Chunk ++ flatten(Remaining).
%% Build the callback installed for the temporary test action: it logs that
%% the SQL test succeeded and hands its first argument back unchanged.
sql_test_action() ->
    fun(Output, _Envs) ->
        ?LOG(info, "Testing Rule SQL OK"),
        Output
    end.
%% Complete Context with the example values of Event. Keys already present in
%% Context win over the example values (second argument of maps:merge/2).
fill_default_values(Event, Context) ->
    Defaults = envs_examp(Event),
    maps:merge(Defaults, Context).
%% Look up the event name for the given event topic and return its columns
%% with example values as a map, passed through emqx_rule_maps:atom_key_map/1.
envs_examp(EventTopic) ->
    EventName = emqx_rule_events:event_name(EventTopic),
    Columns = emqx_rule_events:columns_with_exam(EventName),
    emqx_rule_maps:atom_key_map(maps:from_list(Columns)).