99 lines
3.5 KiB
Erlang
99 lines
3.5 KiB
Erlang
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
|
|
|
|
-module(emqx_rule_sqltester).
|
|
|
|
-include("rule_engine.hrl").
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
|
-export([
|
|
test/1,
|
|
echo_action/2,
|
|
get_selected_data/3
|
|
]).
|
|
|
|
%% Dry-run a rule SQL statement against a sample context.
%%
%% The SQL is parsed first; a parse error is handed back unchanged as
%% `{error, Reason}'. If every topic in the FROM clause is a plain publish
%% topic (none starts with `$events/' or `$bridges/'), the `topic' found in
%% the context (`<<>>' when the key is absent) has to match at least one of
%% them, otherwise `{error, nomatch}' is returned and the rule is not run.
%% In all remaining cases the result of test_rule/4 is returned.
-spec test(#{sql := binary(), context := map()}) -> {ok, map() | list()} | {error, term()}.
test(#{sql := Sql, context := Context}) ->
    case emqx_rule_sqlparser:parse(Sql) of
        {error, _Reason} = ParseError ->
            ParseError;
        {ok, Select} ->
            InTopic = maps:get(topic, Context, <<>>),
            EventTopics = emqx_rule_sqlparser:select_from(Select),
            PublishOnly = lists:all(fun is_publish_topic/1, EventTopics),
            case is_topic_accepted(PublishOnly, InTopic, EventTopics) of
                true -> test_rule(Sql, Select, Context, EventTopics);
                false -> {error, nomatch}
            end
    end.

%% Decide whether the rule should be run for the given input topic.
%% A rule that listens on events (or on both publishes and events) is always
%% tested directly; a publish-only rule is tested only when the input topic
%% matches one of its topic filters.
is_topic_accepted(false, _InTopic, _EventTopics) ->
    true;
is_topic_accepted(true, InTopic, EventTopics) ->
    emqx_plugin_libs_rule:can_topic_match_oneof(InTopic, EventTopics).
|
|
|
|
%% Run the parsed SQL as a throw-away rule and return what it selects.
%%
%% A temporary rule id prefixed with "sql_tester:" is generated and metrics
%% are registered for it. The rule carries a single action,
%% get_selected_data/3 of this module. The context handed to the runtime is
%% the caller's context (passed through emqx_rule_maps:atom_key_map/1) laid
%% over the example values for the first topic in `EventTopics'.
%%
%% Returns `{ok, Data}' with the action results passed through flatten/1, or
%% the `{error, Reason}' reported by emqx_rule_runtime:apply_rule/3.
%%
%% Everything that happens after the metrics are registered runs inside the
%% `try' body, so the `after' section clears the metrics even when assembling
%% the rule or the context raises (e.g. hd/1 on an empty topic list), not only
%% when applying the rule does.
test_rule(Sql, Select, Context, EventTopics) ->
    RuleId = iolist_to_binary(["sql_tester:", emqx_misc:gen_id(16)]),
    ok = emqx_rule_engine:maybe_add_metrics_for_rule(RuleId),
    try
        Rule = #{
            id => RuleId,
            sql => Sql,
            from => EventTopics,
            actions => [#{mod => ?MODULE, func => get_selected_data, args => #{}}],
            enable => true,
            is_foreach => emqx_rule_sqlparser:select_is_foreach(Select),
            fields => emqx_rule_sqlparser:select_fields(Select),
            doeach => emqx_rule_sqlparser:select_doeach(Select),
            incase => emqx_rule_sqlparser:select_incase(Select),
            conditions => emqx_rule_sqlparser:select_where(Select),
            created_at => erlang:system_time(millisecond)
        },
        FullContext = fill_default_values(
            hd(EventTopics),
            emqx_rule_maps:atom_key_map(Context)
        ),
        emqx_rule_runtime:apply_rule(Rule, FullContext, #{})
    of
        {ok, Data} -> {ok, flatten(Data)};
        {error, Reason} -> {error, Reason}
    after
        ok = emqx_rule_engine:clear_metrics_for_rule(RuleId)
    end.
|
|
|
|
%% Action callback installed on the temporary rule built by test_rule/4.
%% It hands the selected data back untouched; the remaining two arguments
%% are ignored.
get_selected_data(Selected, _Envs, _Args) -> Selected.
|
|
|
|
%% Tell whether a FROM-clause topic refers to ordinary published messages.
%% Anything starting with "$events/" or "$bridges/" is not a publish topic;
%% every other term is treated as one.
is_publish_topic(Topic) ->
    case Topic of
        <<"$events/", _/binary>> -> false;
        <<"$bridges/", _/binary>> -> false;
        _Other -> true
    end.
|
|
|
|
%% Collapse the list of per-action results into a single value.
%%   * no results            -> the empty list
%%   * exactly one result    -> that result itself, whatever its type
%%   * several results       -> each leading result must be a list; it is
%%                              prepended to the collapsed remainder
%% Any other shape fails with a function_clause error.
flatten([]) -> [];
flatten([Single]) -> Single;
flatten([Items | Remaining]) when is_list(Items) -> Items ++ flatten(Remaining).
|
|
|
|
%% Exported action that traces its input and echoes it back.
%% A trace entry tagged "RULE" with the message "testing_rule_sql_ok" is
%% emitted carrying both arguments, then `Data' is returned unchanged.
%% NOTE(review): ?TRACE is not defined in this file; presumably it comes from
%% one of the included headers - confirm.
echo_action(Data, Envs) ->
    ?TRACE("RULE", "testing_rule_sql_ok", #{data => Data, envs => Envs}),
    Data.
|
|
|
|
%% Complete a test context with example values for the given event topic.
%% Keys present in `Context' win over the example values.
fill_default_values(Event, Context) ->
    Defaults = envs_examp(Event),
    maps:merge(Defaults, Context).
|
|
|
|
%% Build the example context for an event topic.
%% The topic is resolved to an event name, the column/example pairs for that
%% event are turned into a map, and the map is passed through
%% emqx_rule_maps:atom_key_map/1.
envs_examp(EventTopic) ->
    EventName = emqx_rule_events:event_name(EventTopic),
    ColumnExamples = emqx_rule_events:columns_with_exam(EventName),
    ExampleMap = maps:from_list(ColumnExamples),
    emqx_rule_maps:atom_key_map(ExampleMap).
|