* fix(dialyzer): remove the dialyzer errors

This commit is contained in:
Shawn 2020-12-17 10:04:49 +08:00 committed by GitHub
parent 0fe86341af
commit 0a44270932
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 414 additions and 475 deletions

View File

@ -2,7 +2,7 @@
-type selected_data() :: map().
-type env_vars() :: map().
-type bindings() :: list(#{atom() => term()}).
-type bindings() :: list({atom(), term()}).
-define(BINDING_KEYS, '__bindings__').

View File

@ -35,6 +35,8 @@
-type(hook() :: atom() | 'any').
-type(topic() :: binary()).
-type(resource_status() :: #{ alive := boolean()
, atom() => binary() | atom() | list(binary()|atom())
}).
@ -65,7 +67,7 @@
-record(rule,
{ id :: rule_id()
, for :: hook()
, for :: list(topic())
, rawsql :: binary()
, is_foreach :: boolean()
, fields :: list()
@ -110,9 +112,11 @@
-record(action_instance_params,
{ id :: action_instance_id()
, params :: #{} %% the params got after initializing the action
%% the params got after initializing the action
, params :: #{}
%% the Func/Bindings got after initializing the action
, apply :: fun((Data::map(), Envs::map()) -> any())
| {M::module(), F::atom(), Args::list()} %% the func got after initializing the action
| #{mod := module(), bindings := #{atom() => term()}}
}).
%% Arithmetic operators

View File

@ -1,285 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-define(COLUMNS(EVENT), [Key || {Key, _ExampleVal} <- ?EG_COLUMNS(EVENT)]).
-define(EG_COLUMNS(EVENT),
case EVENT of
'message.publish' ->
[ {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}
, {<<"clientid">>, <<"c_emqx">>}
, {<<"username">>, <<"u_emqx">>}
, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
, {<<"peerhost">>, <<"192.168.0.10">>}
, {<<"topic">>, <<"t/a">>}
, {<<"qos">>, 1}
, {<<"flags">>, #{}}
, {<<"headers">>, undefined}
, {<<"publish_received_at">>, erlang:system_time(millisecond)}
, {<<"timestamp">>, erlang:system_time(millisecond)}
, {<<"node">>, node()}
];
'message.delivered' ->
[ {<<"event">>, 'message.delivered'}
, {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}
, {<<"from_clientid">>, <<"c_emqx_1">>}
, {<<"from_username">>, <<"u_emqx_1">>}
, {<<"clientid">>, <<"c_emqx_2">>}
, {<<"username">>, <<"u_emqx_2">>}
, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
, {<<"peerhost">>, <<"192.168.0.10">>}
, {<<"topic">>, <<"t/a">>}
, {<<"qos">>, 1}
, {<<"flags">>, #{}}
, {<<"publish_received_at">>, erlang:system_time(millisecond)}
, {<<"timestamp">>, erlang:system_time(millisecond)}
, {<<"node">>, node()}
];
'message.acked' ->
[ {<<"event">>, 'message.acked'}
, {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}
, {<<"from_clientid">>, <<"c_emqx_1">>}
, {<<"from_username">>, <<"u_emqx_1">>}
, {<<"clientid">>, <<"c_emqx_2">>}
, {<<"username">>, <<"u_emqx_2">>}
, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
, {<<"peerhost">>, <<"192.168.0.10">>}
, {<<"topic">>, <<"t/a">>}
, {<<"qos">>, 1}
, {<<"flags">>, #{}}
, {<<"publish_received_at">>, erlang:system_time(millisecond)}
, {<<"timestamp">>, erlang:system_time(millisecond)}
, {<<"node">>, node()}
];
'message.dropped' ->
[ {<<"event">>, 'message.dropped'}
, {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}
, {<<"reason">>, no_subscribers}
, {<<"clientid">>, <<"c_emqx">>}
, {<<"username">>, <<"u_emqx">>}
, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
, {<<"peerhost">>, <<"192.168.0.10">>}
, {<<"topic">>, <<"t/a">>}
, {<<"qos">>, 1}
, {<<"flags">>, #{}}
, {<<"publish_received_at">>, erlang:system_time(millisecond)}
, {<<"timestamp">>, erlang:system_time(millisecond)}
, {<<"node">>, node()}
];
'client.connected' ->
[ {<<"event">>, 'client.connected'}
, {<<"clientid">>, <<"c_emqx">>}
, {<<"username">>, <<"u_emqx">>}
, {<<"mountpoint">>, undefined}
, {<<"peername">>, <<"192.168.0.10:56431">>}
, {<<"sockname">>, <<"0.0.0.0:1883">>}
, {<<"proto_name">>, <<"MQTT">>}
, {<<"proto_ver">>, 5}
, {<<"keepalive">>, 60}
, {<<"clean_start">>, true}
, {<<"expiry_interval">>, 3600}
, {<<"is_bridge">>, false}
, {<<"connected_at">>, erlang:system_time(millisecond)}
, {<<"timestamp">>, erlang:system_time(millisecond)}
, {<<"node">>, node()}
];
'client.disconnected' ->
[ {<<"event">>, 'client.disconnected'}
, {<<"reason">>, normal}
, {<<"clientid">>, <<"c_emqx">>}
, {<<"username">>, <<"u_emqx">>}
, {<<"peername">>, <<"192.168.0.10:56431">>}
, {<<"sockname">>, <<"0.0.0.0:1883">>}
, {<<"disconnected_at">>, erlang:system_time(millisecond)}
, {<<"timestamp">>, erlang:system_time(millisecond)}
, {<<"node">>, node()}
];
'session.subscribed' ->
[ {<<"event">>, 'session.subscribed'}
, {<<"clientid">>, <<"c_emqx">>}
, {<<"username">>, <<"u_emqx">>}
, {<<"peerhost">>, <<"192.168.0.10">>}
, {<<"topic">>, <<"t/a">>}
, {<<"qos">>, 1}
, {<<"timestamp">>, erlang:system_time(millisecond)}
, {<<"node">>, node()}
];
'session.unsubscribed' ->
[ {<<"event">>, 'session.unsubscribed'}
, {<<"clientid">>, <<"c_emqx">>}
, {<<"username">>, <<"u_emqx">>}
, {<<"peerhost">>, <<"192.168.0.10">>}
, {<<"topic">>, <<"t/a">>}
, {<<"qos">>, 1}
, {<<"timestamp">>, erlang:system_time(millisecond)}
, {<<"node">>, node()}
];
RuleType ->
error({unknown_rule_type, RuleType})
end).
-define(TEST_COLUMNS_MESSGE,
[ {<<"clientid">>, <<"c_emqx">>}
, {<<"username">>, <<"u_emqx">>}
, {<<"topic">>, <<"t/a">>}
, {<<"qos">>, 1}
, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
]).
-define(TEST_COLUMNS_MESSGE_DELIVERED_ACKED,
[ {<<"from_clientid">>, <<"c_emqx_1">>}
, {<<"from_username">>, <<"u_emqx_1">>}
, {<<"clientid">>, <<"c_emqx_2">>}
, {<<"username">>, <<"u_emqx_2">>}
, {<<"topic">>, <<"t/a">>}
, {<<"qos">>, 1}
, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
]).
-define(TEST_COLUMNS(EVENT),
case EVENT of
'message.publish' -> ?TEST_COLUMNS_MESSGE;
'message.dropped' -> ?TEST_COLUMNS_MESSGE;
'message.delivered' -> ?TEST_COLUMNS_MESSGE_DELIVERED_ACKED;
'message.acked' -> ?TEST_COLUMNS_MESSGE_DELIVERED_ACKED;
'client.connected' ->
[ {<<"clientid">>, <<"c_emqx">>}
, {<<"username">>, <<"u_emqx">>}
, {<<"peername">>, <<"127.0.0.1:52918">>}
];
'client.disconnected' ->
[ {<<"clientid">>, <<"c_emqx">>}
, {<<"username">>, <<"u_emqx">>}
, {<<"reason">>, <<"normal">>}
];
'session.subscribed' ->
[ {<<"clientid">>, <<"c_emqx">>}
, {<<"username">>, <<"u_emqx">>}
, {<<"topic">>, <<"t/a">>}
, {<<"qos">>, 1}
];
'session.unsubscribed' ->
[ {<<"clientid">>, <<"c_emqx">>}
, {<<"username">>, <<"u_emqx">>}
, {<<"topic">>, <<"t/a">>}
, {<<"qos">>, 1}
];
RuleType ->
error({unknown_rule_type, RuleType})
end).
-define(EVENT_INFO_MESSAGE_PUBLISH,
#{ event => '$events/message_publish',
title => #{en => <<"message publish">>, zh => <<"消息发布"/utf8>>},
description => #{en => <<"message publish">>, zh => <<"消息发布"/utf8>>},
test_columns => ?TEST_COLUMNS('message.publish'),
columns => ?COLUMNS('message.publish'),
sql_example => <<"SELECT payload.msg as msg FROM \"t/#\" WHERE msg = 'hello'">>
}).
-define(EVENT_INFO_MESSAGE_DELIVER,
#{ event => '$events/message_delivered',
title => #{en => <<"message delivered">>, zh => <<"消息投递"/utf8>>},
description => #{en => <<"message delivered">>, zh => <<"消息投递"/utf8>>},
test_columns => ?TEST_COLUMNS('message.delivered'),
columns => ?COLUMNS('message.delivered'),
sql_example => <<"SELECT * FROM \"$events/message_delivered\" WHERE topic =~ 't/#'">>
}).
-define(EVENT_INFO_MESSAGE_ACKED,
#{ event => '$events/message_acked',
title => #{en => <<"message acked">>, zh => <<"消息应答"/utf8>>},
description => #{en => <<"message acked">>, zh => <<"消息应答"/utf8>>},
test_columns => ?TEST_COLUMNS('message.acked'),
columns => ?COLUMNS('message.acked'),
sql_example => <<"SELECT * FROM \"$events/message_acked\" WHERE topic =~ 't/#'">>
}).
-define(EVENT_INFO_MESSAGE_DROPPED,
#{ event => '$events/message_dropped',
title => #{en => <<"message dropped">>, zh => <<"消息丢弃"/utf8>>},
description => #{en => <<"message dropped">>, zh => <<"消息丢弃"/utf8>>},
test_columns => ?TEST_COLUMNS('message.dropped'),
columns => ?COLUMNS('message.dropped'),
sql_example => <<"SELECT * FROM \"$events/message_dropped\" WHERE topic =~ 't/#'">>
}).
-define(EVENT_INFO_CLIENT_CONNECTED,
#{ event => '$events/client_connected',
title => #{en => <<"client connected">>, zh => <<"连接建立"/utf8>>},
description => #{en => <<"client connected">>, zh => <<"连接建立"/utf8>>},
test_columns => ?TEST_COLUMNS('client.connected'),
columns => ?COLUMNS('client.connected'),
sql_example => <<"SELECT * FROM \"$events/client_connected\"">>
}).
-define(EVENT_INFO_CLIENT_DISCONNECTED,
#{ event => '$events/client_disconnected',
title => #{en => <<"client disconnected">>, zh => <<"连接断开"/utf8>>},
description => #{en => <<"client disconnected">>, zh => <<"连接断开"/utf8>>},
test_columns => ?TEST_COLUMNS('client.disconnected'),
columns => ?COLUMNS('client.disconnected'),
sql_example => <<"SELECT * FROM \"$events/client_disconnected\"">>
}).
-define(EVENT_INFO_SESSION_SUBSCRIBED,
#{ event => '$events/session_subscribed',
title => #{en => <<"session subscribed">>, zh => <<"会话订阅完成"/utf8>>},
description => #{en => <<"session subscribed">>, zh => <<"会话订阅完成"/utf8>>},
test_columns => ?TEST_COLUMNS('session.subscribed'),
columns => ?COLUMNS('session.subscribed'),
sql_example => <<"SELECT * FROM \"$events/session_subscribed\" WHERE topic =~ 't/#'">>
}).
-define(EVENT_INFO_SESSION_UNSUBSCRIBED,
#{ event => '$events/session_unsubscribed',
title => #{en => <<"session unsubscribed">>, zh => <<"会话取消订阅完成"/utf8>>},
description => #{en => <<"session unsubscribed">>, zh => <<"会话取消订阅完成"/utf8>>},
test_columns => ?TEST_COLUMNS('session.unsubscribed'),
columns => ?COLUMNS('session.unsubscribed'),
sql_example => <<"SELECT * FROM \"$events/session_unsubscribed\" WHERE topic =~ 't/#'">>
}).
-define(EVENT_INFO,
[ ?EVENT_INFO_MESSAGE_PUBLISH
, ?EVENT_INFO_MESSAGE_DELIVER
, ?EVENT_INFO_MESSAGE_ACKED
, ?EVENT_INFO_MESSAGE_DROPPED
, ?EVENT_INFO_CLIENT_CONNECTED
, ?EVENT_INFO_CLIENT_DISCONNECTED
, ?EVENT_INFO_SESSION_SUBSCRIBED
, ?EVENT_INFO_SESSION_UNSUBSCRIBED
]).
-define(EG_ENVS(EVENT_TOPIC),
case EVENT_TOPIC of
<<"$events/", _/binary>> ->
EventName = emqx_rule_events:event_name(EVENT_TOPIC),
emqx_rule_maps:atom_key_map(maps:from_list(?EG_COLUMNS(EventName)));
_PublishTopic ->
#{id => emqx_guid:to_hexstr(emqx_guid:gen()),
clientid => <<"c_emqx">>,
username => <<"u_emqx">>,
payload => <<"{\"id\": 1, \"name\": \"ha\"}">>,
peerhost => <<"127.0.0.1">>,
topic => <<"t/a">>,
qos => 1,
flags => #{sys => true, event => true},
publish_received_at => emqx_rule_utils:now_ms(),
timestamp => emqx_rule_utils:now_ms(),
node => node()
}
end).

View File

@ -106,7 +106,6 @@
, on_action_do_nothing/2
]).
-spec(on_resource_create(binary(), map()) -> map()).
on_resource_create(_Name, Conf) ->
Conf.
@ -114,8 +113,7 @@ on_resource_create(_Name, Conf) ->
%%------------------------------------------------------------------------------
%% Action 'inspect'
%%------------------------------------------------------------------------------
-spec on_action_create_inspect(action_instance_id(), Params :: map())
-> NewParams :: map().
-spec on_action_create_inspect(action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}.
on_action_create_inspect(Id, Params) ->
Params.
@ -131,8 +129,7 @@ on_action_inspect(Selected, Envs) ->
%%------------------------------------------------------------------------------
%% Action 'republish'
%%------------------------------------------------------------------------------
-spec on_action_create_republish(action_instance_id(), Params :: map())
-> NewParams :: map().
-spec on_action_create_republish(action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}.
on_action_create_republish(Id, Params = #{
<<"target_topic">> := TargetTopic,
<<"target_qos">> := TargetQoS,
@ -203,8 +200,7 @@ increase_and_publish(ActId, Msg) ->
emqx_rule_metrics:inc_actions_success(ActId),
emqx_metrics:inc_msg(Msg).
-spec on_action_create_do_nothing(action_instance_id(), Params :: map())
-> NewParams :: map().
-spec on_action_create_do_nothing(action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}.
on_action_create_do_nothing(ActId, Params) when is_binary(ActId) ->
Params.

View File

@ -72,8 +72,9 @@
%% Load all providers .
-spec(load_providers() -> ok).
load_providers() ->
[load_provider(App) || App <- ignore_lib_apps(application:loaded_applications())],
ok.
lists:foreach(fun(App) ->
load_provider(App)
end, ignore_lib_apps(application:loaded_applications())).
-spec(load_provider(App :: atom()) -> ok).
load_provider(App) when is_atom(App) ->
@ -86,8 +87,9 @@ load_provider(App) when is_atom(App) ->
%% Load all providers .
-spec(unload_providers() -> ok).
unload_providers() ->
[unload_provider(App) || App <- ignore_lib_apps(application:loaded_applications())],
ok.
lists:foreach(fun(App) ->
unload_provider(App)
end, ignore_lib_apps(application:loaded_applications())).
%% @doc Unload a provider.
-spec(unload_provider(App :: atom()) -> ok).
@ -147,8 +149,7 @@ find_attrs(App, Def) ->
module_attributes(Module) ->
try Module:module_info(attributes)
catch
error:undef -> [];
error:Reason -> error(Reason)
error:undef -> []
end.
%%------------------------------------------------------------------------------
@ -156,37 +157,52 @@ module_attributes(Module) ->
%%------------------------------------------------------------------------------
-dialyzer([{nowarn_function, [create_rule/1, rule_id/0]}]).
-spec(create_rule(#{}) -> {ok, rule()} | no_return()).
create_rule(Params = #{rawsql := Sql, actions := Actions}) ->
-spec create_rule(map()) -> {ok, rule()} | {error, term()}.
create_rule(Params = #{rawsql := Sql, actions := ActArgs}) ->
case emqx_rule_sqlparser:parse_select(Sql) of
{ok, Select} ->
RuleId = maps:get(id, Params, rule_id()),
Enabled = maps:get(enabled, Params, true),
Rule = #rule{id = RuleId,
rawsql = Sql,
for = emqx_rule_sqlparser:select_from(Select),
is_foreach = emqx_rule_sqlparser:select_is_foreach(Select),
fields = emqx_rule_sqlparser:select_fields(Select),
doeach = emqx_rule_sqlparser:select_doeach(Select),
incase = emqx_rule_sqlparser:select_incase(Select),
conditions = emqx_rule_sqlparser:select_where(Select),
on_action_failed = maps:get(on_action_failed, Params, continue),
actions = prepare_actions(Actions, Enabled),
enabled = Enabled,
description = maps:get(description, Params, "")},
ok = emqx_rule_registry:add_rule(Rule),
ok = emqx_rule_metrics:create_rule_metrics(RuleId),
{ok, Rule};
Error -> error(Error)
try prepare_actions(ActArgs, Enabled) of
Actions ->
Rule = #rule{
id = RuleId,
rawsql = Sql,
for = emqx_rule_sqlparser:select_from(Select),
is_foreach = emqx_rule_sqlparser:select_is_foreach(Select),
fields = emqx_rule_sqlparser:select_fields(Select),
doeach = emqx_rule_sqlparser:select_doeach(Select),
incase = emqx_rule_sqlparser:select_incase(Select),
conditions = emqx_rule_sqlparser:select_where(Select),
on_action_failed = maps:get(on_action_failed, Params, continue),
actions = Actions,
enabled = Enabled,
description = maps:get(description, Params, "")
},
ok = emqx_rule_registry:add_rule(Rule),
ok = emqx_rule_metrics:create_rule_metrics(RuleId),
{ok, Rule}
catch
throw:{action_not_found, ActionName} ->
{error, {action_not_found, ActionName}};
throw:Reason ->
{error, Reason}
end;
Reason -> {error, Reason}
end.
-spec(update_rule(#{id := binary(), _=>_}) -> {ok, rule()} | {error, {not_found, rule_id()}}).
update_rule(Params = #{id := RuleId}) ->
case emqx_rule_registry:get_rule(RuleId) of
{ok, Rule0} ->
Rule = may_update_rule_params(Rule0, Params),
ok = emqx_rule_registry:add_rule(Rule),
{ok, Rule};
try may_update_rule_params(Rule0, Params) of
Rule ->
ok = emqx_rule_registry:add_rule(Rule),
{ok, Rule}
catch
throw:Reason ->
{error, Reason}
end;
not_found ->
{error, {not_found, RuleId}}
end.
@ -234,9 +250,12 @@ start_resource(ResId) ->
{ok, #resource{type = ResType, config = Config}} ->
{ok, #resource_type{on_create = {Mod, Create}}}
= emqx_rule_registry:find_resource_type(ResType),
_ = init_resource(Mod, Create, ResId, Config),
refresh_actions_of_a_resource(ResId),
ok;
try
init_resource(Mod, Create, ResId, Config),
refresh_actions_of_a_resource(ResId)
catch
throw:Reason -> {error, Reason}
end;
not_found ->
{error, {resource_not_found, ResId}}
end.
@ -247,9 +266,12 @@ test_resource(#{type := Type, config := Config}) ->
{ok, #resource_type{on_create = {ModC,Create}, on_destroy = {ModD,Destroy}, params_spec = ParamSpec}} ->
ok = emqx_rule_validator:validate_params(Config, ParamSpec),
ResId = resource_id(),
cluster_call(init_resource, [ModC, Create, ResId, Config]),
cluster_call(clear_resource, [ModD, Destroy, ResId]),
ok;
try
cluster_call(init_resource, [ModC, Create, ResId, Config]),
cluster_call(clear_resource, [ModD, Destroy, ResId])
catch
throw:Reason -> {error, Reason}
end;
not_found ->
{error, {resource_type_not_found, Type}}
end.
@ -299,36 +321,37 @@ ensure_resource_deleted(ResId) ->
-spec(refresh_resources() -> ok).
refresh_resources() ->
[try refresh_resource(Res)
catch _:Error:ST ->
logger:critical(
"Can not re-stablish resource ~p: ~0p. The resource is disconnected."
"Fix the issue and establish it manually.\n"
"Stacktrace: ~0p",
[ResId, Error, ST])
end || Res = #resource{id = ResId}
<- emqx_rule_registry:get_resources()],
ok.
lists:foreach(fun(#resource{id = ResId} = Res) ->
try refresh_resource(Res)
catch Error:Reason:ST ->
logger:critical(
"Can not re-stablish resource ~p: ~0p. The resource is disconnected."
"Fix the issue and establish it manually.\n"
"Stacktrace: ~0p",
[ResId, {Error,Reason}, ST])
end
end, emqx_rule_registry:get_resources()).
refresh_resource(Type) when is_atom(Type) ->
[refresh_resource(Resource)
|| Resource <- emqx_rule_registry:get_resources_by_type(Type)];
lists:foreach(fun(Resource) ->
refresh_resource(Resource)
end, emqx_rule_registry:get_resources_by_type(Type));
refresh_resource(#resource{id = ResId, config = Config, type = Type}) ->
{ok, #resource_type{on_create = {M, F}}} = emqx_rule_registry:find_resource_type(Type),
cluster_call(init_resource, [M, F, ResId, Config]).
-spec(refresh_rules() -> ok).
refresh_rules() ->
[try refresh_rule(Rule)
catch _:Error:ST ->
logger:critical(
"Can not re-build rule ~p: ~0p. The rule is disabled."
"Fix the issue and enable it manually.\n"
"Stacktrace: ~0p",
[RuleId, Error, ST])
end || Rule = #rule{id = RuleId}
<- emqx_rule_registry:get_rules()],
ok.
lists:foreach(fun(#rule{id = RuleId} = Rule) ->
try refresh_rule(Rule)
catch Error:Reason:ST ->
logger:critical(
"Can not re-build rule ~p: ~0p. The rule is disabled."
"Fix the issue and enable it manually.\n"
"Stacktrace: ~0p",
[RuleId, {Error,Reason}, ST])
end
end, emqx_rule_registry:get_rules()).
refresh_rule(#rule{id = RuleId, actions = Actions}) ->
ok = emqx_rule_metrics:create_rule_metrics(RuleId),
@ -389,7 +412,7 @@ may_update_rule_params(Rule, Params = #{rawsql := SQL}) ->
conditions = emqx_rule_sqlparser:select_where(Select)
},
maps:remove(rawsql, Params));
Error -> error(Error)
Reason -> throw(Reason)
end;
may_update_rule_params(Rule, Params = #{enabled := Enabled}) ->
Enabled andalso refresh_rule(Rule),
@ -443,7 +466,7 @@ cluster_call(Func, Args) ->
end;
{ResL, BadNodes} ->
?LOG(error, "cluster_call bad nodes found: ~p, ResL: ~p", [BadNodes, ResL]),
throw({func_fail(Func), {nodes_not_exist, BadNodes}})
throw({func_fail(Func), {failed_on_nodes, BadNodes}})
end.
init_resource(Module, OnCreate, ResId, Config) ->

View File

@ -17,13 +17,11 @@
-module(emqx_rule_engine_api).
-include("rule_engine.hrl").
-include("rule_events.hrl").
-include_lib("emqx/include/logger.hrl").
-import(minirest, [return/1]).
-logger_header("[RuleEngineAPI]").
%% A lot of case clause no_match:es from rule_events.hrl
-dialyzer(no_match).
-import(minirest, [return/1]).
-rest_api(#{name => create_rule,
method => 'POST',
@ -173,16 +171,19 @@
-define(ERR_NO_RULE(ID), list_to_binary(io_lib:format("Rule ~s Not Found", [(ID)]))).
-define(ERR_NO_ACTION(NAME), list_to_binary(io_lib:format("Action ~s Not Found", [(NAME)]))).
-define(ERR_NO_RESOURCE(RESID), list_to_binary(io_lib:format("Resource ~s Not Found", [(RESID)]))).
-define(ERR_NO_HOOK(HOOK), list_to_binary(io_lib:format("Event ~s Not Found", [(HOOK)]))).
-define(ERR_NO_RESOURCE_TYPE(TYPE), list_to_binary(io_lib:format("Resource Type ~s Not Found", [(TYPE)]))).
-define(ERR_UNKNOWN_COLUMN(COLUMN), list_to_binary(io_lib:format("Unknown Column: ~s", [(COLUMN)]))).
-define(ERR_START_RESOURCE(RESID), list_to_binary(io_lib:format("Start Resource ~s Failed", [(RESID)]))).
-define(ERR_BADARGS(REASON),
begin
R0 = list_to_binary(io_lib:format("~0p", [REASON])),
<<"Bad Arguments: ", R0/binary>>
end).
-dialyzer({nowarn_function, [create_rule/2,
test_rule_sql/1,
do_create_rule/1,
update_rule/2
]}).
%%------------------------------------------------------------------------------
%% Rules API
%%------------------------------------------------------------------------------
@ -192,60 +193,31 @@ create_rule(_Bindings, Params) ->
Params).
test_rule_sql(Params) ->
try emqx_rule_sqltester:test(emqx_json:decode(emqx_json:encode(Params), [return_maps])) of
case emqx_rule_sqltester:test(emqx_json:decode(emqx_json:encode(Params), [return_maps])) of
{ok, Result} -> return({ok, Result});
{error, nomatch} -> return({error, 404, <<"SQL Not Match">>})
catch
throw:{invalid_hook, Hook} ->
return({error, 400, ?ERR_NO_HOOK(Hook)});
throw:Reason ->
return({error, 400, ?ERR_BADARGS(Reason)});
_:{parse_error,{unknown_column, Column}, _} ->
return({error, 400, ?ERR_UNKNOWN_COLUMN(Column)});
_Error:Reason:StackT ->
?LOG(error, "[RuleEngineAPI] ~p failed: ~0p", [?FUNCTION_NAME, {Reason, StackT}]),
{error, nomatch} -> return({error, 404, <<"SQL Not Match">>});
{error, Reason} ->
?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
return({error, 400, ?ERR_BADARGS(Reason)})
end.
do_create_rule(Params) ->
try emqx_rule_engine:create_rule(parse_rule_params(Params)) of
{ok, Rule} ->
return({ok, record_to_map(Rule)});
case emqx_rule_engine:create_rule(parse_rule_params(Params)) of
{ok, Rule} -> return({ok, record_to_map(Rule)});
{error, {action_not_found, ActionName}} ->
return({error, 400, ?ERR_NO_ACTION(ActionName)})
catch
throw:{resource_not_found, ResId} ->
return({error, 400, ?ERR_NO_RESOURCE(ResId)});
throw:{invalid_hook, Hook} ->
return({error, 400, ?ERR_NO_HOOK(Hook)});
throw:Reason ->
return({error, 400, ?ERR_BADARGS(Reason)});
_:{parse_error,{unknown_column, Column}} ->
return({error, 400, ?ERR_UNKNOWN_COLUMN(Column)});
_Error:Reason:StackT ->
?LOG(error, "[RuleEngineAPI] ~p failed: ~0p", [?FUNCTION_NAME, {Reason, StackT}]),
return({error, 400, ?ERR_NO_ACTION(ActionName)});
{error, Reason} ->
?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
return({error, 400, ?ERR_BADARGS(Reason)})
end.
update_rule(#{id := Id}, Params) ->
try emqx_rule_engine:update_rule(parse_rule_params(Params, #{id => Id})) of
{ok, Rule} ->
return({ok, record_to_map(Rule)});
case emqx_rule_engine:update_rule(parse_rule_params(Params, #{id => Id})) of
{ok, Rule} -> return({ok, record_to_map(Rule)});
{error, {not_found, RuleId}} ->
return({error, 400, ?ERR_NO_RULE(RuleId)});
{error, {action_not_found, ActionName}} ->
return({error, 400, ?ERR_NO_ACTION(ActionName)})
catch
throw:{resource_not_found, ResId} ->
return({error, 400, ?ERR_NO_RESOURCE(ResId)});
throw:{invalid_hook, Hook} ->
return({error, 400, ?ERR_NO_HOOK(Hook)});
throw:Reason ->
return({error, 400, ?ERR_BADARGS(Reason)});
_:{parse_error,{unknown_column, Column}} ->
return({error, 400, ?ERR_UNKNOWN_COLUMN(Column)});
_Error:Reason:StackT ->
?LOG(error, "[RuleEngineAPI] ~p failed: ~0p", [?FUNCTION_NAME, {Reason, StackT}]),
{error, Reason} ->
?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
return({error, 400, ?ERR_BADARGS(Reason)})
end.
@ -264,7 +236,9 @@ delete_rule(#{id := Id}, _Params) ->
%%------------------------------------------------------------------------------
list_actions(#{}, _Params) ->
return_all(emqx_rule_registry:get_actions()).
return_all(
sort_by_title(action,
emqx_rule_registry:get_actions())).
show_action(#{name := Name}, _Params) ->
reply_with(fun emqx_rule_registry:find_action/1, Name).
@ -278,24 +252,15 @@ create_resource(#{}, Params) ->
Params).
do_create_resource(Create, Params) ->
try emqx_rule_engine:Create(parse_resource_params(Params)) of
case emqx_rule_engine:Create(parse_resource_params(Params)) of
ok ->
return(ok);
{ok, Resource} ->
return({ok, record_to_map(Resource)});
{error, {resource_type_not_found, Type}} ->
return({error, 400, ?ERR_NO_RESOURCE_TYPE(Type)})
catch
throw:{resource_type_not_found, Type} ->
return({error, 400, ?ERR_NO_RESOURCE_TYPE(Type)});
throw:{init_resource_failure, Reason} ->
%% only test_resource would throw exceptions, create_resource won't
?LOG(error, "[RuleEngineAPI] test_resource_failure: ~p", [Reason]),
return({error, 500, <<"Test Creating Resource Failed">>});
throw:Reason ->
return({error, 400, ?ERR_BADARGS(Reason)});
_Error:Reason:StackT ->
?LOG(error, "[RuleEngineAPI] ~p failed: ~0p", [?FUNCTION_NAME, {Reason, StackT}]),
{error, Reason} ->
?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
return({error, 400, ?ERR_BADARGS(Reason)})
end.
@ -337,35 +302,19 @@ get_resource_status(#{id := Id}, _Params) ->
end.
start_resource(#{id := Id}, _Params) ->
try emqx_rule_engine:start_resource(Id) of
case emqx_rule_engine:start_resource(Id) of
ok ->
return(ok);
{error, {resource_not_found, ResId}} ->
return({error, 400, ?ERR_NO_RESOURCE(ResId)})
catch
throw:{{init_resource_failure, _}, Reason} ->
?LOG(error, "[RuleEngineAPI] init_resource_failure: ~p", [Reason]),
return({error, 400, ?ERR_START_RESOURCE(Id)});
throw:Reason ->
return({error, 400, ?ERR_BADARGS(Reason)});
_Error:Reason:StackT ->
?LOG(error, "[RuleEngineAPI] ~p failed: ~0p", [?FUNCTION_NAME, {Reason, StackT}]),
return({error, 400, ?ERR_NO_RESOURCE(ResId)});
{error, Reason} ->
?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
return({error, 400, ?ERR_BADARGS(Reason)})
end.
delete_resource(#{id := Id}, _Params) ->
try
ok = emqx_rule_engine:ensure_resource_deleted(Id),
return(ok)
catch
_Error:{throw,Reason} ->
return({error, 400, ?ERR_BADARGS(Reason)});
throw:Reason ->
return({error, 400, ?ERR_BADARGS(Reason)});
_Error:Reason:StackT ->
?LOG(error, "[RuleEngineAPI] ~p failed: ~0p", [?FUNCTION_NAME, {Reason, StackT}]),
return({error, 400, ?ERR_BADARGS(Reason)})
end.
ok = emqx_rule_engine:ensure_resource_deleted(Id),
return(ok).
%%------------------------------------------------------------------------------
%% Resource Types API
@ -385,7 +334,7 @@ show_resource_type(#{name := Name}, _Params) ->
%%------------------------------------------------------------------------------
list_events(#{}, _Params) ->
return({ok, ?EVENT_INFO}).
return({ok, emqx_rule_events:event_info()}).
%%------------------------------------------------------------------------------
%% Internal functions

View File

@ -91,7 +91,7 @@ unload() ->
%%-----------------------------------------------------------------------------
%% 'rules' command
%%-----------------------------------------------------------------------------
-dialyzer([{nowarn_function, [rules/1]}]).
rules(["list"]) ->
print_all(emqx_rule_registry:get_rules());

View File

@ -37,6 +37,11 @@
, on_message_acked/3
]).
-export([ event_info/0
, columns/1
, columns_with_exam/1
]).
-define(SUPPORTED_HOOK,
[ 'client.connected'
, 'client.disconnected'
@ -309,6 +314,239 @@ make_msg(QoS, Topic, Payload) ->
emqx_message:set_flags(#{sys => true, event => true},
emqx_message:make(emqx_events, QoS, Topic, iolist_to_binary(Payload))).
%%--------------------------------------------------------------------
%% Columns
%%--------------------------------------------------------------------
columns(Event) ->
[Key || {Key, _ExampleVal} <- columns_with_exam(Event)].
event_info() ->
[ event_info_message_publish()
, event_info_message_deliver()
, event_info_message_acked()
, event_info_message_dropped()
, event_info_client_connected()
, event_info_client_disconnected()
, event_info_session_subscribed()
, event_info_session_unsubscribed()
].
event_info_message_publish() ->
event_info_common(
'message.publish',
{<<"message publish">>, <<"消息发布"/utf8>>},
{<<"message publish">>, <<"消息发布"/utf8>>},
<<"SELECT payload.msg as msg FROM \"t/#\" WHERE msg = 'hello'">>
).
event_info_message_deliver() ->
event_info_common(
'message.delivered',
{<<"message delivered">>, <<"消息已投递"/utf8>>},
{<<"message delivered">>, <<"消息已投递"/utf8>>},
<<"SELECT * FROM \"$events/message_delivered\" WHERE topic =~ 't/#'">>
).
event_info_message_acked() ->
event_info_common(
'message.acked',
{<<"message acked">>, <<"消息应答"/utf8>>},
{<<"message acked">>, <<"消息应答"/utf8>>},
<<"SELECT * FROM \"$events/message_acked\" WHERE topic =~ 't/#'">>
).
event_info_message_dropped() ->
event_info_common(
'message.dropped',
{<<"message dropped">>, <<"消息丢弃"/utf8>>},
{<<"message dropped">>, <<"消息丢弃"/utf8>>},
<<"SELECT * FROM \"$events/message_dropped\" WHERE topic =~ 't/#'">>
).
event_info_client_connected() ->
event_info_common(
'client.connected',
{<<"client connected">>, <<"连接建立"/utf8>>},
{<<"client connected">>, <<"连接建立"/utf8>>},
<<"SELECT * FROM \"$events/client_connected\"">>
).
event_info_client_disconnected() ->
event_info_common(
'client.disconnected',
{<<"client disconnected">>, <<"连接断开"/utf8>>},
{<<"client disconnected">>, <<"连接断开"/utf8>>},
<<"SELECT * FROM \"$events/client_disconnected\" WHERE topic =~ 't/#'">>
).
event_info_session_subscribed() ->
event_info_common(
'session.subscribed',
{<<"session subscribed">>, <<"会话订阅完成"/utf8>>},
{<<"session subscribed">>, <<"会话订阅完成"/utf8>>},
<<"SELECT * FROM \"$events/session_subscribed\" WHERE topic =~ 't/#'">>
).
event_info_session_unsubscribed() ->
event_info_common(
'session.unsubscribed',
{<<"session unsubscribed">>, <<"会话取消订阅完成"/utf8>>},
{<<"session unsubscribed">>, <<"会话取消订阅完成"/utf8>>},
<<"SELECT * FROM \"$events/session_unsubscribed\" WHERE topic =~ 't/#'">>
).
event_info_common(Event, {TitleEN, TitleZH}, {DescrEN, DescrZH}, SqlExam) ->
#{event => event_topic(Event),
title => #{en => TitleEN, zh => TitleZH},
description => #{en => DescrEN, zh => DescrZH},
test_columns => test_columns(Event),
columns => columns(Event),
sql_example => SqlExam
}.
test_columns('message.dropped') ->
test_columns('message.publish');
test_columns('message.publish') ->
[ {<<"clientid">>, <<"c_emqx">>}
, {<<"username">>, <<"u_emqx">>}
, {<<"topic">>, <<"t/a">>}
, {<<"qos">>, 1}
, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
];
test_columns('message.acked') ->
test_columns('message.delivered');
test_columns('message.delivered') ->
[ {<<"from_clientid">>, <<"c_emqx_1">>}
, {<<"from_username">>, <<"u_emqx_1">>}
, {<<"clientid">>, <<"c_emqx_2">>}
, {<<"username">>, <<"u_emqx_2">>}
, {<<"topic">>, <<"t/a">>}
, {<<"qos">>, 1}
, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
];
test_columns('client.connected') ->
[ {<<"clientid">>, <<"c_emqx">>}
, {<<"username">>, <<"u_emqx">>}
, {<<"peername">>, <<"127.0.0.1:52918">>}
];
test_columns('client.disconnected') ->
[ {<<"clientid">>, <<"c_emqx">>}
, {<<"username">>, <<"u_emqx">>}
, {<<"reason">>, <<"normal">>}
];
test_columns('session.unsubscribed') ->
test_columns('session.subscribed');
test_columns('session.subscribed') ->
[ {<<"clientid">>, <<"c_emqx">>}
, {<<"username">>, <<"u_emqx">>}
, {<<"topic">>, <<"t/a">>}
, {<<"qos">>, 1}
].
columns_with_exam('message.publish') ->
[ {<<"event">>, 'message.publish'}
, {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}
, {<<"clientid">>, <<"c_emqx">>}
, {<<"username">>, <<"u_emqx">>}
, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
, {<<"peerhost">>, <<"192.168.0.10">>}
, {<<"topic">>, <<"t/a">>}
, {<<"qos">>, 1}
, {<<"flags">>, #{}}
, {<<"headers">>, undefined}
, {<<"publish_received_at">>, erlang:system_time(millisecond)}
, {<<"timestamp">>, erlang:system_time(millisecond)}
, {<<"node">>, node()}
];
columns_with_exam('message.delivered') ->
[ {<<"event">>, 'message.delivered'}
, {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}
, {<<"from_clientid">>, <<"c_emqx_1">>}
, {<<"from_username">>, <<"u_emqx_1">>}
, {<<"clientid">>, <<"c_emqx_2">>}
, {<<"username">>, <<"u_emqx_2">>}
, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
, {<<"peerhost">>, <<"192.168.0.10">>}
, {<<"topic">>, <<"t/a">>}
, {<<"qos">>, 1}
, {<<"flags">>, #{}}
, {<<"publish_received_at">>, erlang:system_time(millisecond)}
, {<<"timestamp">>, erlang:system_time(millisecond)}
, {<<"node">>, node()}
];
columns_with_exam('message.acked') ->
[ {<<"event">>, 'message.acked'}
, {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}
, {<<"from_clientid">>, <<"c_emqx_1">>}
, {<<"from_username">>, <<"u_emqx_1">>}
, {<<"clientid">>, <<"c_emqx_2">>}
, {<<"username">>, <<"u_emqx_2">>}
, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
, {<<"peerhost">>, <<"192.168.0.10">>}
, {<<"topic">>, <<"t/a">>}
, {<<"qos">>, 1}
, {<<"flags">>, #{}}
, {<<"publish_received_at">>, erlang:system_time(millisecond)}
, {<<"timestamp">>, erlang:system_time(millisecond)}
, {<<"node">>, node()}
];
columns_with_exam('message.dropped') ->
[ {<<"event">>, 'message.dropped'}
, {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}
, {<<"reason">>, no_subscribers}
, {<<"clientid">>, <<"c_emqx">>}
, {<<"username">>, <<"u_emqx">>}
, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
, {<<"peerhost">>, <<"192.168.0.10">>}
, {<<"topic">>, <<"t/a">>}
, {<<"qos">>, 1}
, {<<"flags">>, #{}}
, {<<"publish_received_at">>, erlang:system_time(millisecond)}
, {<<"timestamp">>, erlang:system_time(millisecond)}
, {<<"node">>, node()}
];
columns_with_exam('client.connected') ->
[ {<<"event">>, 'client.connected'}
, {<<"clientid">>, <<"c_emqx">>}
, {<<"username">>, <<"u_emqx">>}
, {<<"mountpoint">>, undefined}
, {<<"peername">>, <<"192.168.0.10:56431">>}
, {<<"sockname">>, <<"0.0.0.0:1883">>}
, {<<"proto_name">>, <<"MQTT">>}
, {<<"proto_ver">>, 5}
, {<<"keepalive">>, 60}
, {<<"clean_start">>, true}
, {<<"expiry_interval">>, 3600}
, {<<"is_bridge">>, false}
, {<<"connected_at">>, erlang:system_time(millisecond)}
, {<<"timestamp">>, erlang:system_time(millisecond)}
, {<<"node">>, node()}
];
columns_with_exam('client.disconnected') ->
[ {<<"event">>, 'client.disconnected'}
, {<<"reason">>, normal}
, {<<"clientid">>, <<"c_emqx">>}
, {<<"username">>, <<"u_emqx">>}
, {<<"peername">>, <<"192.168.0.10:56431">>}
, {<<"sockname">>, <<"0.0.0.0:1883">>}
, {<<"disconnected_at">>, erlang:system_time(millisecond)}
, {<<"timestamp">>, erlang:system_time(millisecond)}
, {<<"node">>, node()}
];
columns_with_exam('session.subscribed') ->
[ {<<"event">>, 'session.subscribed'}
, {<<"clientid">>, <<"c_emqx">>}
, {<<"username">>, <<"u_emqx">>}
, {<<"peerhost">>, <<"192.168.0.10">>}
, {<<"topic">>, <<"t/a">>}
, {<<"qos">>, 1}
, {<<"timestamp">>, erlang:system_time(millisecond)}
, {<<"node">>, node()}
];
columns_with_exam('session.unsubscribed') ->
[ {<<"event">>, 'session.unsubscribed'}
, {<<"clientid">>, <<"c_emqx">>}
, {<<"username">>, <<"u_emqx">>}
, {<<"peerhost">>, <<"192.168.0.10">>}
, {<<"topic">>, <<"t/a">>}
, {<<"qos">>, 1}
, {<<"timestamp">>, erlang:system_time(millisecond)}
, {<<"node">>, node()}
].
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------
@ -354,7 +592,8 @@ event_topic('session.subscribed') -> <<"$events/session_subscribed">>;
event_topic('session.unsubscribed') -> <<"$events/session_unsubscribed">>;
event_topic('message.delivered') -> <<"$events/message_delivered">>;
event_topic('message.acked') -> <<"$events/message_acked">>;
event_topic('message.dropped') -> <<"$events/message_dropped">>.
event_topic('message.dropped') -> <<"$events/message_dropped">>;
event_topic('message.publish') -> <<"$events/message_publish">>.
printable_maps(undefined) -> #{};
printable_maps(Headers) ->

View File

@ -106,11 +106,11 @@
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
-spec(create_rule_metrics(rule_id()) -> Ref :: counters:counters_ref()).
-spec(create_rule_metrics(rule_id()) -> ok).
create_rule_metrics(Id) ->
gen_server:call(?MODULE, {create_rule_metrics, Id}).
-spec(create_metrics(rule_id()) -> Ref :: counters:counters_ref()).
-spec(create_metrics(rule_id()) -> ok).
create_metrics(Id) ->
gen_server:call(?MODULE, {create_metrics, Id}).

View File

@ -19,7 +19,6 @@
-behaviour(gen_server).
-include("rule_engine.hrl").
-include("rule_events.hrl").
-include_lib("emqx/include/logger.hrl").
-export([start_link/0]).
@ -166,14 +165,10 @@ start_link() ->
get_rules() ->
get_all_records(?RULE_TAB).
%% TODO: emqx_rule_utils:can_topic_match_oneof(Topic::any(), For::atom())
%% will never return since it differs in the 2nd argument from the success
%% typing arguments: (any(), [binary() | ['' | '#' | '+' | binary()]])
-dialyzer([{nowarn_function, get_rules_for/1}]).
-spec(get_rules_for(Topic :: binary()) -> list(emqx_rule_engine:rule())).
get_rules_for(Topic) ->
[Rule || Rule = #rule{for = For} <- get_rules(),
Topic =:= For orelse emqx_rule_utils:can_topic_match_oneof(Topic, For)].
emqx_rule_utils:can_topic_match_oneof(Topic, For)].
-spec(get_rule(Id :: rule_id()) -> {ok, emqx_rule_engine:rule()} | not_found).
get_rule(Id) ->
@ -333,10 +328,14 @@ remove_resource_params(ResId) ->
%% @private
delete_resource(ResId) ->
%% TODO, change to foreache:s
_ = [[ResId =:= ResId1 andalso throw({dependency_exists, {rule, Id}})
|| #action_instance{args = #{<<"$resource">> := ResId1}} <- Actions]
|| #rule{id = Id, actions = Actions} <- get_rules()],
lists:foreach(fun(#rule{id = Id, actions = Actions}) ->
lists:foreach(
fun (#action_instance{args = #{<<"$resource">> := ResId1}})
when ResId =:= ResId1 ->
throw({dependency_exists, {rule, Id}});
(_) -> ok
end, Actions)
end, get_rules()),
mnesia:delete(?RES_TAB, ResId, write).
%% @private

View File

@ -71,9 +71,6 @@ apply_rules([Rule = #rule{id = RuleID}|More], Input) ->
apply_rules(More, Input).
apply_rule_discard_result(Rule, Input) ->
%% TODO check if below two clauses are ok to discard:
%% {'error','nomatch'}
%% {'ok',[any()]}
_ = apply_rule(Rule, Input),
ok.

View File

@ -17,7 +17,6 @@
-module(emqx_rule_sqlparser).
-include("rule_engine.hrl").
-include("rule_events.hrl").
-export([parse_select/1]).

View File

@ -15,7 +15,6 @@
-module(emqx_rule_sqltester).
-include("rule_engine.hrl").
-include("rule_events.hrl").
-include_lib("emqx/include/logger.hrl").
-export([ test/1
@ -27,10 +26,11 @@
test_rule/4,
flatten/1,
sql_test_action/0,
fill_default_values/2
fill_default_values/2,
envs_examp/1
]}).
-spec(test(#{}) -> {ok, Result::map()} | no_return()).
-spec(test(#{}) -> {ok, map() | list()} | {error, term()}).
test(#{<<"rawsql">> := Sql, <<"ctx">> := Context}) ->
{ok, Select} = emqx_rule_sqlparser:parse_select(Sql),
InTopic = maps:get(<<"topic">>, Context, <<>>),
@ -96,4 +96,23 @@ sql_test_action() ->
end.
fill_default_values(Event, Context) ->
maps:merge(?EG_ENVS(Event), Context).
maps:merge(envs_examp(Event), Context).
envs_examp(<<"$events/", _/binary>> = EVENT_TOPIC) ->
EventName = emqx_rule_events:event_name(EVENT_TOPIC),
emqx_rule_maps:atom_key_map(
maps:from_list(
emqx_rule_events:columns_with_exam(EventName)));
envs_examp(_) ->
#{id => emqx_guid:to_hexstr(emqx_guid:gen()),
clientid => <<"c_emqx">>,
username => <<"u_emqx">>,
payload => <<"{\"id\": 1, \"name\": \"ha\"}">>,
peerhost => <<"127.0.0.1">>,
topic => <<"t/a">>,
qos => 1,
flags => #{sys => true, event => true},
publish_received_at => emqx_rule_utils:now_ms(),
timestamp => emqx_rule_utils:now_ms(),
node => node()
}.

View File

@ -67,9 +67,7 @@
-type(tmpl_cmd() :: list(tmpl_token())).
-type(prepare_statement() :: binary()).
-type(prepare_params() :: fun((binary()) -> list())).
-type(prepare_statement_key() :: binary()).
%% preprocess template string with place holders
-spec(preproc_tmpl(binary()) -> tmpl_token()).
@ -94,6 +92,7 @@ put_head(Type, Term, List) ->
proc_tmpl(Tokens, Data) ->
proc_tmpl(Tokens, Data, #{return => full_binary}).
-spec(proc_tmpl(tmpl_token(), map(), map()) -> binary() | list()).
proc_tmpl(Tokens, Data, Opts = #{return := full_binary}) ->
Trans = maps:get(var_trans, Opts, fun bin/1),
list_to_binary(
@ -115,19 +114,21 @@ preproc_cmd(Str) ->
SubStrList = re:split(Str, ?EX_WITHE_CHARS, [{return,binary},trim]),
[preproc_tmpl(SubStr) || SubStr <- SubStrList].
-spec(proc_cmd([tmpl_token()], map()) -> binary() | list()).
proc_cmd(Tokens, Data) ->
proc_cmd(Tokens, Data, #{return => full_binary}).
-spec(proc_cmd([tmpl_token()], map(), map()) -> list()).
proc_cmd(Tokens, Data, Opts) ->
[proc_tmpl(Tks, Data, Opts) || Tks <- Tokens].
%% preprocess SQL with place holders
-spec(preproc_sql(Sql::binary()) -> {prepare_statement(), prepare_params()}).
-dialyzer({nowarn_function,preproc_sql/1}).
-spec(preproc_sql(Sql::binary()) -> {prepare_statement_key(), tmpl_token()}).
preproc_sql(Sql) ->
preproc_sql(Sql, '?').
-spec(preproc_sql(Sql::binary(), ReplaceWith :: '?' | '$n') -> {prepare_statement(), prepare_params()}).
-dialyzer({nowarn_function,preproc_sql/2}).
-spec(preproc_sql(Sql::binary(), ReplaceWith :: '?' | '$n')
-> {prepare_statement_key(), tmpl_token()}).
preproc_sql(Sql, ReplaceWith) ->
case re:run(Sql, ?EX_PLACE_HOLDER, [{capture, all_but_first, binary}, global]) of
{match, PlaceHolders} ->
@ -330,5 +331,6 @@ now_ms() ->
erlang:system_time(millisecond).
can_topic_match_oneof(Topic, Filters) ->
MatchedFilters = [Fltr || Fltr <- Filters, emqx_topic:match(Topic, Fltr)],
length(MatchedFilters) > 0.
lists:any(fun(Fltr) ->
emqx_topic:match(Topic, Fltr)
end, Filters).

View File

@ -2378,7 +2378,7 @@ start_apps() ->
[start_apps(App, SchemaFile, ConfigFile) ||
{App, SchemaFile, ConfigFile}
<- [{emqx, deps_path(emqx, "priv/emqx.schema"),
deps_path(emqx, "etc/gen.emqx.conf")},
deps_path(emqx, "etc/emqx.conf")},
{emqx_rule_engine, local_path("priv/emqx_rule_engine.schema"),
local_path("etc/emqx_rule_engine.conf")}]].
@ -2424,6 +2424,7 @@ set_special_configs(_App) ->
ok.
print_mock() ->
catch meck:unload(emqx_ctl),
meck:new(emqx_ctl, [non_strict, passthrough]),
meck:expect(emqx_ctl, print, fun(Arg) -> emqx_ctl:format(Arg) end),
meck:expect(emqx_ctl, print, fun(Msg, Arg) -> emqx_ctl:format(Msg, Arg) end),

View File

@ -121,12 +121,8 @@
zh => <<"将数据转发给 Web 服务"/utf8>>}
}).
-type(action_fun() :: fun((Data :: map(), Envs :: map()) -> Result :: any())).
-type(url() :: binary()).
-export_type([action_fun/0]).
-export([ on_resource_create/2
, on_get_resource_status/2
, on_resource_destroy/2
@ -166,7 +162,7 @@ on_resource_destroy(_ResId, _Params) ->
ok.
%% An action that forwards publish messages to a remote web server.
-spec(on_action_create_data_to_webserver(Id::binary(), #{url() := string()}) -> action_fun()).
-spec(on_action_create_data_to_webserver(Id::binary(), #{url() := string()}) -> {bindings(), NewParams :: map()}).
on_action_create_data_to_webserver(Id, Params) ->
#{url := Url, headers := Headers, method := Method, content_type := ContentType, payload_tmpl := PayloadTmpl, path := Path}
= parse_action_params(Params),