fix(rules): improve specs and logs (#5821)

Co-authored-by: Zaiming Shi <zmstone@gmail.com>
This commit is contained in:
Shawn 2021-09-28 03:10:48 +08:00 committed by GitHub
parent e2721c144c
commit a9185f964e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 199 additions and 146 deletions

View File

@ -189,7 +189,8 @@ restart_bridge(Type, Name) ->
emqx_resource:restart(resource_id(Type, Name)). emqx_resource:restart(resource_id(Type, Name)).
create_bridge(Type, Name, Conf) -> create_bridge(Type, Name, Conf) ->
logger:info("create ~p bridge ~p use config: ~p", [Type, Name, Conf]), ?SLOG(info, #{msg => "create bridge", type => Type, name => Name,
config => Conf}),
ResId = resource_id(Type, Name), ResId = resource_id(Type, Name),
case emqx_resource:create(ResId, case emqx_resource:create(ResId,
emqx_bridge:resource_type(Type), Conf) of emqx_bridge:resource_type(Type), Conf) of
@ -210,12 +211,13 @@ update_bridge(Type, Name, {_OldConf, Conf}) ->
%% `egress_channels` are changed, then we should not restart the bridge, we only restart/start %% `egress_channels` are changed, then we should not restart the bridge, we only restart/start
%% the channels. %% the channels.
%% %%
logger:info("update ~p bridge ~p use config: ~p", [Type, Name, Conf]), ?SLOG(info, #{msg => "update bridge", type => Type, name => Name,
config => Conf}),
emqx_resource:recreate(resource_id(Type, Name), emqx_resource:recreate(resource_id(Type, Name),
emqx_bridge:resource_type(Type), Conf, []). emqx_bridge:resource_type(Type), Conf, []).
remove_bridge(Type, Name, _Conf) -> remove_bridge(Type, Name, _Conf) ->
logger:info("remove ~p bridge ~p", [Type, Name]), ?SLOG(info, #{msg => "remove bridge", type => Type, name => Name}),
case emqx_resource:remove(resource_id(Type, Name)) of case emqx_resource:remove(resource_id(Type, Name)) of
ok -> ok; ok -> ok;
{error, not_found} -> ok; {error, not_found} -> ok;

View File

@ -130,7 +130,8 @@ on_start(InstId, #{base_url := #{scheme := Scheme,
retry_interval := RetryInterval, retry_interval := RetryInterval,
pool_type := PoolType, pool_type := PoolType,
pool_size := PoolSize} = Config) -> pool_size := PoolSize} = Config) ->
logger:info("starting http connector: ~p, config: ~p", [InstId, Config]), ?SLOG(info, #{msg => "starting http connector",
connector => InstId, config => Config}),
{Transport, TransportOpts} = case Scheme of {Transport, TransportOpts} = case Scheme of
http -> http ->
{tcp, []}; {tcp, []};
@ -166,7 +167,8 @@ on_start(InstId, #{base_url := #{scheme := Scheme,
end. end.
on_stop(InstId, #{pool_name := PoolName}) -> on_stop(InstId, #{pool_name := PoolName}) ->
logger:info("stopping http connector: ~p", [InstId]), ?SLOG(info, #{msg => "stopping http connector",
connector => InstId}),
ehttpc_sup:stop_pool(PoolName). ehttpc_sup:stop_pool(PoolName).
on_query(InstId, {send_message, ChannelId, Msg}, AfterQuery, #{channels := Channels} = State) -> on_query(InstId, {send_message, ChannelId, Msg}, AfterQuery, #{channels := Channels} = State) ->
@ -181,16 +183,20 @@ on_query(InstId, {Method, Request}, AfterQuery, State) ->
on_query(InstId, {undefined, Method, Request, 5000}, AfterQuery, State); on_query(InstId, {undefined, Method, Request, 5000}, AfterQuery, State);
on_query(InstId, {Method, Request, Timeout}, AfterQuery, State) -> on_query(InstId, {Method, Request, Timeout}, AfterQuery, State) ->
on_query(InstId, {undefined, Method, Request, Timeout}, AfterQuery, State); on_query(InstId, {undefined, Method, Request, Timeout}, AfterQuery, State);
on_query(InstId, {KeyOrNum, Method, Request, Timeout}, AfterQuery, #{pool_name := PoolName, on_query(InstId, {KeyOrNum, Method, Request, Timeout}, AfterQuery,
base_path := BasePath} = State) -> #{pool_name := PoolName, base_path := BasePath} = State) ->
logger:debug("http connector ~p received request: ~p, at state: ~p", [InstId, Request, State]), ?SLOG(debug, #{msg => "http connector received request",
request => Request, connector => InstId,
state => State}),
NRequest = update_path(BasePath, Request), NRequest = update_path(BasePath, Request),
case Result = ehttpc:request(case KeyOrNum of case Result = ehttpc:request(case KeyOrNum of
undefined -> PoolName; undefined -> PoolName;
_ -> {PoolName, KeyOrNum} _ -> {PoolName, KeyOrNum}
end, Method, NRequest, Timeout) of end, Method, NRequest, Timeout) of
{error, Reason} -> {error, Reason} ->
logger:debug("http connector ~p do reqeust failed, sql: ~p, reason: ~p", [InstId, NRequest, Reason]), ?SLOG(error, #{msg => "http connector do reqeust failed",
request => NRequest, reason => Reason,
connector => InstId}),
emqx_resource:query_failed(AfterQuery); emqx_resource:query_failed(AfterQuery);
_ -> _ ->
emqx_resource:query_success(AfterQuery) emqx_resource:query_success(AfterQuery)

View File

@ -18,6 +18,7 @@
-include("emqx_connector.hrl"). -include("emqx_connector.hrl").
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl"). -include_lib("emqx_resource/include/emqx_resource_behaviour.hrl").
-include_lib("emqx/include/logger.hrl").
-export([roots/0, fields/1]). -export([roots/0, fields/1]).
@ -53,7 +54,8 @@ on_start(InstId, #{servers := Servers0,
pool_size := PoolSize, pool_size := PoolSize,
auto_reconnect := AutoReconn, auto_reconnect := AutoReconn,
ssl := SSL} = Config) -> ssl := SSL} = Config) ->
logger:info("starting ldap connector: ~p, config: ~p", [InstId, Config]), ?SLOG(info, #{msg => "starting ldap connector",
connector => InstId, config => Config}),
Servers = [begin proplists:get_value(host, S) end || S <- Servers0], Servers = [begin proplists:get_value(host, S) end || S <- Servers0],
SslOpts = case maps:get(enable, SSL) of SslOpts = case maps:get(enable, SSL) of
true -> true ->
@ -75,14 +77,20 @@ on_start(InstId, #{servers := Servers0,
{ok, #{poolname => PoolName}}. {ok, #{poolname => PoolName}}.
on_stop(InstId, #{poolname := PoolName}) -> on_stop(InstId, #{poolname := PoolName}) ->
logger:info("stopping ldap connector: ~p", [InstId]), ?SLOG(info, #{msg => "stopping ldap connector",
connector => InstId}),
emqx_plugin_libs_pool:stop_pool(PoolName). emqx_plugin_libs_pool:stop_pool(PoolName).
on_query(InstId, {search, Base, Filter, Attributes}, AfterQuery, #{poolname := PoolName} = State) -> on_query(InstId, {search, Base, Filter, Attributes}, AfterQuery, #{poolname := PoolName} = State) ->
logger:debug("ldap connector ~p received request: ~p, at state: ~p", [InstId, {Base, Filter, Attributes}, State]), Request = {Base, Filter, Attributes},
?SLOG(debug, #{msg => "ldap connector received request",
request => Request, connector => InstId,
state => State}),
case Result = ecpool:pick_and_do(PoolName, {?MODULE, search, [Base, Filter, Attributes]}, no_handover) of case Result = ecpool:pick_and_do(PoolName, {?MODULE, search, [Base, Filter, Attributes]}, no_handover) of
{error, Reason} -> {error, Reason} ->
logger:debug("ldap connector ~p do request failed, request: ~p, reason: ~p", [InstId, {Base, Filter, Attributes}, Reason]), ?SLOG(error, #{msg => "ldap connector do request failed",
request => Request, connector => InstId,
reason => Reason}),
emqx_resource:query_failed(AfterQuery); emqx_resource:query_failed(AfterQuery);
_ -> _ ->
emqx_resource:query_success(AfterQuery) emqx_resource:query_success(AfterQuery)

View File

@ -18,6 +18,7 @@
-include("emqx_connector.hrl"). -include("emqx_connector.hrl").
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl"). -include_lib("emqx_resource/include/emqx_resource_behaviour.hrl").
-include_lib("emqx/include/logger.hrl").
-type server() :: emqx_schema:ip_port(). -type server() :: emqx_schema:ip_port().
-reflect_type([server/0]). -reflect_type([server/0]).
@ -93,7 +94,8 @@ on_jsonify(Config) ->
%% =================================================================== %% ===================================================================
on_start(InstId, Config = #{server := Server, on_start(InstId, Config = #{server := Server,
mongo_type := single}) -> mongo_type := single}) ->
logger:info("starting mongodb connector: ~p, config: ~p", [InstId, Config]), ?SLOG(info, #{msg => "starting mongodb single connector",
connector => InstId, config => Config}),
Opts = [{type, single}, Opts = [{type, single},
{hosts, [emqx_connector_schema_lib:ip_port_to_string(Server)]} {hosts, [emqx_connector_schema_lib:ip_port_to_string(Server)]}
], ],
@ -102,7 +104,8 @@ on_start(InstId, Config = #{server := Server,
on_start(InstId, Config = #{servers := Servers, on_start(InstId, Config = #{servers := Servers,
mongo_type := rs, mongo_type := rs,
replica_set_name := RsName}) -> replica_set_name := RsName}) ->
logger:info("starting mongodb connector: ~p, config: ~p", [InstId, Config]), ?SLOG(info, #{msg => "starting mongodb rs connector",
connector => InstId, config => Config}),
Opts = [{type, {rs, RsName}}, Opts = [{type, {rs, RsName}},
{hosts, [emqx_connector_schema_lib:ip_port_to_string(S) {hosts, [emqx_connector_schema_lib:ip_port_to_string(S)
|| S <- Servers]} || S <- Servers]}
@ -111,7 +114,8 @@ on_start(InstId, Config = #{servers := Servers,
on_start(InstId, Config = #{servers := Servers, on_start(InstId, Config = #{servers := Servers,
mongo_type := sharded}) -> mongo_type := sharded}) ->
logger:info("starting mongodb connector: ~p, config: ~p", [InstId, Config]), ?SLOG(info, #{msg => "starting mongodb sharded connector",
connector => InstId, config => Config}),
Opts = [{type, sharded}, Opts = [{type, sharded},
{hosts, [emqx_connector_schema_lib:ip_port_to_string(S) {hosts, [emqx_connector_schema_lib:ip_port_to_string(S)
|| S <- Servers]} || S <- Servers]}
@ -119,14 +123,20 @@ on_start(InstId, Config = #{servers := Servers,
do_start(InstId, Opts, Config). do_start(InstId, Opts, Config).
on_stop(InstId, #{poolname := PoolName}) -> on_stop(InstId, #{poolname := PoolName}) ->
logger:info("stopping mongodb connector: ~p", [InstId]), ?SLOG(info, #{msg => "stopping mongodb connector",
connector => InstId}),
emqx_plugin_libs_pool:stop_pool(PoolName). emqx_plugin_libs_pool:stop_pool(PoolName).
on_query(InstId, {Action, Collection, Selector, Docs}, AfterQuery, #{poolname := PoolName} = State) -> on_query(InstId, {Action, Collection, Selector, Docs}, AfterQuery, #{poolname := PoolName} = State) ->
logger:debug("mongodb connector ~p received request: ~p, at state: ~p", [InstId, {Action, Collection, Selector, Docs}, State]), Request = {Action, Collection, Selector, Docs},
?SLOG(debug, #{msg => "mongodb connector received request",
request => Request, connector => InstId,
state => State}),
case ecpool:pick_and_do(PoolName, {?MODULE, mongo_query, [Action, Collection, Selector, Docs]}, no_handover) of case ecpool:pick_and_do(PoolName, {?MODULE, mongo_query, [Action, Collection, Selector, Docs]}, no_handover) of
{error, Reason} -> {error, Reason} ->
logger:debug("mongodb connector ~p do sql query failed, request: ~p, reason: ~p", [InstId, {Action, Collection, Selector, Docs}, Reason]), ?SLOG(error, #{msg => "mongodb connector do query failed",
request => Request, reason => Reason,
connector => InstId}),
emqx_resource:query_failed(AfterQuery), emqx_resource:query_failed(AfterQuery),
{error, Reason}; {error, Reason};
{ok, Cursor} when is_pid(Cursor) -> {ok, Cursor} when is_pid(Cursor) ->

View File

@ -17,6 +17,7 @@
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl"). -include_lib("emqx_resource/include/emqx_resource_behaviour.hrl").
-include_lib("emqx/include/logger.hrl").
-behaviour(supervisor). -behaviour(supervisor).
@ -88,13 +89,14 @@ drop_bridge(Name) ->
%% =================================================================== %% ===================================================================
%% When use this bridge as a data source, ?MODULE:on_message_received/2 will be called %% When use this bridge as a data source, ?MODULE:on_message_received/2 will be called
%% if the bridge received msgs from the remote broker. %% if the bridge received msgs from the remote broker.
on_message_received(Msg, ChannelName) -> on_message_received(Msg, ChannId) ->
Name = atom_to_binary(ChannelName, utf8), Name = atom_to_binary(ChannId, utf8),
emqx:run_hook(<<"$bridges/", Name/binary>>, [Msg]). emqx:run_hook(<<"$bridges/", Name/binary>>, [Msg]).
%% =================================================================== %% ===================================================================
on_start(InstId, Conf) -> on_start(InstId, Conf) ->
logger:info("starting mqtt connector: ~p, ~p", [InstId, Conf]), ?SLOG(info, #{msg => "starting mqtt connector",
connector => InstId, config => Conf}),
"bridge:" ++ NamePrefix = binary_to_list(InstId), "bridge:" ++ NamePrefix = binary_to_list(InstId),
BasicConf = basic_config(Conf), BasicConf = basic_config(Conf),
InitRes = {ok, #{name_prefix => NamePrefix, baisc_conf => BasicConf, channels => []}}, InitRes = {ok, #{name_prefix => NamePrefix, baisc_conf => BasicConf, channels => []}},
@ -111,7 +113,8 @@ on_start(InstId, Conf) ->
end, InitRes, InOutConfigs). end, InitRes, InOutConfigs).
on_stop(InstId, #{channels := NameList}) -> on_stop(InstId, #{channels := NameList}) ->
logger:info("stopping mqtt connector: ~p", [InstId]), ?SLOG(info, #{msg => "stopping mqtt connector",
connector => InstId}),
lists:foreach(fun(Name) -> lists:foreach(fun(Name) ->
remove_channel(Name) remove_channel(Name)
end, NameList). end, NameList).
@ -122,7 +125,8 @@ on_query(_InstId, {create_channel, Conf}, _AfterQuery, #{name_prefix := Prefix,
baisc_conf := BasicConf}) -> baisc_conf := BasicConf}) ->
create_channel(Conf, Prefix, BasicConf); create_channel(Conf, Prefix, BasicConf);
on_query(_InstId, {send_message, ChannelId, Msg}, _AfterQuery, _State) -> on_query(_InstId, {send_message, ChannelId, Msg}, _AfterQuery, _State) ->
logger:debug("send msg to remote node on channel: ~p, msg: ~p", [ChannelId, Msg]), ?SLOG(debug, #{msg => "send msg to remote node", message => Msg,
channel_id => ChannelId}),
emqx_connector_mqtt_worker:send_to_remote(ChannelId, Msg). emqx_connector_mqtt_worker:send_to_remote(ChannelId, Msg).
on_health_check(_InstId, #{channels := NameList} = State) -> on_health_check(_InstId, #{channels := NameList} = State) ->
@ -135,35 +139,43 @@ on_health_check(_InstId, #{channels := NameList} = State) ->
create_channel({{ingress_channels, Id}, #{subscribe_remote_topic := RemoteT} = Conf}, create_channel({{ingress_channels, Id}, #{subscribe_remote_topic := RemoteT} = Conf},
NamePrefix, BasicConf) -> NamePrefix, BasicConf) ->
LocalT = maps:get(local_topic, Conf, undefined), LocalT = maps:get(local_topic, Conf, undefined),
Name = ingress_channel_name(NamePrefix, Id), ChannId = ingress_channel_id(NamePrefix, Id),
logger:info("creating ingress channel ~p, remote ~s -> local ~s", [Name, RemoteT, LocalT]), ?SLOG(info, #{msg => "creating ingress channel",
remote_topic => RemoteT,
local_topic => LocalT,
channel_id => ChannId}),
do_create_channel(BasicConf#{ do_create_channel(BasicConf#{
name => Name, name => ChannId,
clientid => clientid(Name), clientid => clientid(ChannId),
subscriptions => Conf#{ subscriptions => Conf#{
local_topic => LocalT, local_topic => LocalT,
on_message_received => {fun ?MODULE:on_message_received/2, [Name]} on_message_received => {fun ?MODULE:on_message_received/2, [ChannId]}
}, },
forwards => undefined}); forwards => undefined});
create_channel({{egress_channels, Id}, #{remote_topic := RemoteT} = Conf}, create_channel({{egress_channels, Id}, #{remote_topic := RemoteT} = Conf},
NamePrefix, BasicConf) -> NamePrefix, BasicConf) ->
LocalT = maps:get(subscribe_local_topic, Conf, undefined), LocalT = maps:get(subscribe_local_topic, Conf, undefined),
Name = egress_channel_name(NamePrefix, Id), ChannId = egress_channel_id(NamePrefix, Id),
logger:info("creating egress channel ~p, local ~s -> remote ~s", [Name, LocalT, RemoteT]), ?SLOG(info, #{msg => "creating egress channel",
remote_topic => RemoteT,
local_topic => LocalT,
channel_id => ChannId}),
do_create_channel(BasicConf#{ do_create_channel(BasicConf#{
name => Name, name => ChannId,
clientid => clientid(Name), clientid => clientid(ChannId),
subscriptions => undefined, subscriptions => undefined,
forwards => Conf#{subscribe_local_topic => LocalT}}). forwards => Conf#{subscribe_local_topic => LocalT}}).
remove_channel(ChannelName) -> remove_channel(ChannId) ->
logger:info("removing channel ~p", [ChannelName]), ?SLOG(info, #{msg => "removing channel",
case ?MODULE:drop_bridge(ChannelName) of channel_id => ChannId}),
case ?MODULE:drop_bridge(ChannId) of
ok -> ok; ok -> ok;
{error, not_found} -> ok; {error, not_found} -> ok;
{error, Reason} -> {error, Reason} ->
logger:error("stop channel ~p failed, error: ~p", [ChannelName, Reason]) ?SLOG(error, #{msg => "stop channel failed",
channel_id => ChannId, reason => Reason})
end. end.
do_create_channel(#{name := Name} = Conf) -> do_create_channel(#{name := Name} = Conf) ->
@ -216,9 +228,9 @@ basic_config(#{
taged_map_list(Tag, Map) -> taged_map_list(Tag, Map) ->
[{{Tag, K}, V} || {K, V} <- maps:to_list(Map)]. [{{Tag, K}, V} || {K, V} <- maps:to_list(Map)].
ingress_channel_name(Prefix, Id) -> ingress_channel_id(Prefix, Id) ->
channel_name("ingress_channels", Prefix, Id). channel_name("ingress_channels", Prefix, Id).
egress_channel_name(Prefix, Id) -> egress_channel_id(Prefix, Id) ->
channel_name("egress_channels", Prefix, Id). channel_name("egress_channels", Prefix, Id).
channel_name(Type, Prefix, Id) -> channel_name(Type, Prefix, Id) ->

View File

@ -17,6 +17,7 @@
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl"). -include_lib("emqx_resource/include/emqx_resource_behaviour.hrl").
-include_lib("emqx/include/logger.hrl").
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ on_start/2 -export([ on_start/2
@ -54,7 +55,8 @@ on_start(InstId, #{server := {Host, Port},
auto_reconnect := AutoReconn, auto_reconnect := AutoReconn,
pool_size := PoolSize, pool_size := PoolSize,
ssl := SSL } = Config) -> ssl := SSL } = Config) ->
logger:info("starting mysql connector: ~p, config: ~p", [InstId, Config]), ?SLOG(info, #{msg => "starting mysql connector",
connector => InstId, config => Config}),
SslOpts = case maps:get(enable, SSL) of SslOpts = case maps:get(enable, SSL) of
true -> true ->
[{ssl, [{server_name_indication, disable} | [{ssl, [{server_name_indication, disable} |
@ -73,16 +75,19 @@ on_start(InstId, #{server := {Host, Port},
{ok, #{poolname => PoolName}}. {ok, #{poolname => PoolName}}.
on_stop(InstId, #{poolname := PoolName}) -> on_stop(InstId, #{poolname := PoolName}) ->
logger:info("stopping mysql connector: ~p", [InstId]), ?SLOG(info, #{msg => "stopping mysql connector",
connector => InstId}),
emqx_plugin_libs_pool:stop_pool(PoolName). emqx_plugin_libs_pool:stop_pool(PoolName).
on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := PoolName} = State) -> on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := PoolName} = State) ->
on_query(InstId, {sql, SQL, []}, AfterQuery, #{poolname := PoolName} = State); on_query(InstId, {sql, SQL, []}, AfterQuery, #{poolname := PoolName} = State);
on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := PoolName} = State) -> on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := PoolName} = State) ->
logger:debug("mysql connector ~p received sql query: ~p, at state: ~p", [InstId, SQL, State]), ?SLOG(debug, #{msg => "mysql connector received sql query",
connector => InstId, sql => SQL, state => State}),
case Result = ecpool:pick_and_do(PoolName, {mysql, query, [SQL, Params]}, no_handover) of case Result = ecpool:pick_and_do(PoolName, {mysql, query, [SQL, Params]}, no_handover) of
{error, Reason} -> {error, Reason} ->
logger:debug("mysql connector ~p do sql query failed, sql: ~p, reason: ~p", [InstId, SQL, Reason]), ?SLOG(error, #{msg => "mysql connector do sql query failed",
connector => InstId, sql => SQL, reason => Reason}),
emqx_resource:query_failed(AfterQuery); emqx_resource:query_failed(AfterQuery);
_ -> _ ->
emqx_resource:query_success(AfterQuery) emqx_resource:query_success(AfterQuery)

View File

@ -17,6 +17,7 @@
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl"). -include_lib("emqx_resource/include/emqx_resource_behaviour.hrl").
-include_lib("emqx/include/logger.hrl").
-export([roots/0, fields/1]). -export([roots/0, fields/1]).
@ -54,7 +55,8 @@ on_start(InstId, #{server := {Host, Port},
auto_reconnect := AutoReconn, auto_reconnect := AutoReconn,
pool_size := PoolSize, pool_size := PoolSize,
ssl := SSL } = Config) -> ssl := SSL } = Config) ->
logger:info("starting postgresql connector: ~p, config: ~p", [InstId, Config]), ?SLOG(info, #{msg => "starting postgresql connector",
connector => InstId, config => Config}),
SslOpts = case maps:get(enable, SSL) of SslOpts = case maps:get(enable, SSL) of
true -> true ->
[{ssl, [{server_name_indication, disable} | [{ssl, [{server_name_indication, disable} |
@ -73,16 +75,20 @@ on_start(InstId, #{server := {Host, Port},
{ok, #{poolname => PoolName}}. {ok, #{poolname => PoolName}}.
on_stop(InstId, #{poolname := PoolName}) -> on_stop(InstId, #{poolname := PoolName}) ->
logger:info("stopping postgresql connector: ~p", [InstId]), ?SLOG(info, #{msg => "stopping postgresql connector",
connector => InstId}),
emqx_plugin_libs_pool:stop_pool(PoolName). emqx_plugin_libs_pool:stop_pool(PoolName).
on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := PoolName} = State) -> on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := PoolName} = State) ->
on_query(InstId, {sql, SQL, []}, AfterQuery, #{poolname := PoolName} = State); on_query(InstId, {sql, SQL, []}, AfterQuery, #{poolname := PoolName} = State);
on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := PoolName} = State) -> on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := PoolName} = State) ->
logger:debug("postgresql connector ~p received sql query: ~p, at state: ~p", [InstId, SQL, State]), ?SLOG(debug, #{msg => "postgresql connector received sql query",
connector => InstId, sql => SQL, state => State}),
case Result = ecpool:pick_and_do(PoolName, {?MODULE, query, [SQL, Params]}, no_handover) of case Result = ecpool:pick_and_do(PoolName, {?MODULE, query, [SQL, Params]}, no_handover) of
{error, Reason} -> {error, Reason} ->
logger:debug("postgresql connector ~p do sql query failed, sql: ~p, reason: ~p", [InstId, SQL, Reason]), ?SLOG(error, #{
msg => "postgresql connector do sql query failed",
connector => InstId, sql => SQL, reason => Reason}),
emqx_resource:query_failed(AfterQuery); emqx_resource:query_failed(AfterQuery);
_ -> _ ->
emqx_resource:query_success(AfterQuery) emqx_resource:query_success(AfterQuery)

View File

@ -18,6 +18,7 @@
-include("emqx_connector.hrl"). -include("emqx_connector.hrl").
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl"). -include_lib("emqx_resource/include/emqx_resource_behaviour.hrl").
-include_lib("emqx/include/logger.hrl").
-type server() :: tuple(). -type server() :: tuple().
@ -85,7 +86,8 @@ on_start(InstId, #{redis_type := Type,
pool_size := PoolSize, pool_size := PoolSize,
auto_reconnect := AutoReconn, auto_reconnect := AutoReconn,
ssl := SSL } = Config) -> ssl := SSL } = Config) ->
logger:info("starting redis connector: ~p, config: ~p", [InstId, Config]), ?SLOG(info, #{msg => "starting redis connector",
connector => InstId, config => Config}),
Servers = case Type of Servers = case Type of
single -> [{servers, [maps:get(server, Config)]}]; single -> [{servers, [maps:get(server, Config)]}];
_ ->[{servers, maps:get(servers, Config)}] _ ->[{servers, maps:get(servers, Config)}]
@ -116,18 +118,21 @@ on_start(InstId, #{redis_type := Type,
{ok, #{poolname => PoolName, type => Type}}. {ok, #{poolname => PoolName, type => Type}}.
on_stop(InstId, #{poolname := PoolName}) -> on_stop(InstId, #{poolname := PoolName}) ->
logger:info("stopping redis connector: ~p", [InstId]), ?SLOG(info, #{msg => "stopping redis connector",
connector => InstId}),
emqx_plugin_libs_pool:stop_pool(PoolName). emqx_plugin_libs_pool:stop_pool(PoolName).
on_query(InstId, {cmd, Command}, AfterCommand, #{poolname := PoolName, type := Type} = State) -> on_query(InstId, {cmd, Command}, AfterCommand, #{poolname := PoolName, type := Type} = State) ->
logger:debug("redis connector ~p received cmd query: ~p, at state: ~p", [InstId, Command, State]), ?SLOG(debug, #{msg => "redis connector received cmd query",
connector => InstId, sql => Command, state => State}),
Result = case Type of Result = case Type of
cluster -> eredis_cluster:q(PoolName, Command); cluster -> eredis_cluster:q(PoolName, Command);
_ -> ecpool:pick_and_do(PoolName, {?MODULE, cmd, [Type, Command]}, no_handover) _ -> ecpool:pick_and_do(PoolName, {?MODULE, cmd, [Type, Command]}, no_handover)
end, end,
case Result of case Result of
{error, Reason} -> {error, Reason} ->
logger:debug("redis connector ~p do cmd query failed, cmd: ~p, reason: ~p", [InstId, Command, Reason]), ?SLOG(error, #{msg => "redis connector do cmd query failed",
connector => InstId, sql => Command, reason => Reason}),
emqx_resource:query_failed(AfterCommand); emqx_resource:query_failed(AfterCommand);
_ -> _ ->
emqx_resource:query_success(AfterCommand) emqx_resource:query_success(AfterCommand)

View File

@ -155,14 +155,18 @@ handle_puback(#{packet_id := PktId, reason_code := RC}, Parent)
RC =:= ?RC_NO_MATCHING_SUBSCRIBERS -> RC =:= ?RC_NO_MATCHING_SUBSCRIBERS ->
Parent ! {batch_ack, PktId}, ok; Parent ! {batch_ack, PktId}, ok;
handle_puback(#{packet_id := PktId, reason_code := RC}, _Parent) -> handle_puback(#{packet_id := PktId, reason_code := RC}, _Parent) ->
?LOG(warning, "publish ~p to remote node falied, reason_code: ~p", [PktId, RC]). ?SLOG(warning, #{msg => "publish to remote node falied",
packet_id => PktId, reason_code => RC}).
handle_publish(Msg, undefined) -> handle_publish(Msg, undefined) ->
?LOG(error, "cannot publish to local broker as 'bridge.mqtt.<name>.in' not configured, msg: ~p", [Msg]); ?SLOG(error, #{msg => "cannot publish to local broker as"
" ingress_channles' is not configured",
message => Msg});
handle_publish(Msg, #{on_message_received := {OnMsgRcvdFunc, Args}} = Vars) -> handle_publish(Msg, #{on_message_received := {OnMsgRcvdFunc, Args}} = Vars) ->
?LOG(debug, "publish to local broker, msg: ~p, vars: ~p", [Msg, Vars]), ?SLOG(debug, #{msg => "publish to local broker",
message => Msg, vars => Vars}),
emqx_metrics:inc('bridge.mqtt.message_received_from_remote', 1), emqx_metrics:inc('bridge.mqtt.message_received_from_remote', 1),
_ = erlang:apply(OnMsgRcvdFunc, [Msg] ++ Args), _ = erlang:apply(OnMsgRcvdFunc, [Msg | Args]),
case maps:get(local_topic, Vars, undefined) of case maps:get(local_topic, Vars, undefined) of
undefined -> ok; undefined -> ok;
_Topic -> _Topic ->

View File

@ -63,6 +63,7 @@
-behaviour(gen_statem). -behaviour(gen_statem).
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/logger.hrl").
%% APIs %% APIs
-export([ start_link/1 -export([ start_link/1
@ -189,7 +190,8 @@ callback_mode() -> [state_functions].
%% @doc Config should be a map(). %% @doc Config should be a map().
init(#{name := Name} = ConnectOpts) -> init(#{name := Name} = ConnectOpts) ->
?LOG(debug, "starting bridge worker for ~p", [Name]), ?SLOG(debug, #{msg => "starting bridge worker",
name => Name}),
erlang:process_flag(trap_exit, true), erlang:process_flag(trap_exit, true),
Queue = open_replayq(Name, maps:get(replayq, ConnectOpts, #{})), Queue = open_replayq(Name, maps:get(replayq, ConnectOpts, #{})),
State = init_state(ConnectOpts), State = init_state(ConnectOpts),
@ -335,8 +337,9 @@ common(_StateName, cast, {send_to_remote, Msg}, #{replayq := Q} = State) ->
NewQ = replayq:append(Q, [Msg]), NewQ = replayq:append(Q, [Msg]),
{keep_state, State#{replayq => NewQ}, {next_event, internal, maybe_send}}; {keep_state, State#{replayq => NewQ}, {next_event, internal, maybe_send}};
common(StateName, Type, Content, #{name := Name} = State) -> common(StateName, Type, Content, #{name := Name} = State) ->
?LOG(notice, "Bridge ~p discarded ~p type event at state ~p:~p", ?SLOG(notice, #{msg => "Bridge discarded event",
[Name, Type, StateName, Content]), name => Name, type => Type, state_name => StateName,
content => Content}),
{keep_state, State}. {keep_state, State}.
do_connect(#{connect_opts := ConnectOpts = #{forwards := Forwards}, do_connect(#{connect_opts := ConnectOpts = #{forwards := Forwards},
@ -352,8 +355,8 @@ do_connect(#{connect_opts := ConnectOpts = #{forwards := Forwards},
{ok, State#{connection => Conn}}; {ok, State#{connection => Conn}};
{error, Reason} -> {error, Reason} ->
ConnectOpts1 = obfuscate(ConnectOpts), ConnectOpts1 = obfuscate(ConnectOpts),
?LOG(error, "Failed to connect \n" ?SLOG(error, #{msg => "Failed to connect",
"config=~p\nreason:~p", [ConnectOpts1, Reason]), config => ConnectOpts1, reason => Reason}),
{error, Reason, State} {error, Reason, State}
end. end.
@ -399,7 +402,9 @@ pop_and_send_loop(#{replayq := Q} = State, N) ->
%% Assert non-empty batch because we have a is_empty check earlier. %% Assert non-empty batch because we have a is_empty check earlier.
do_send(#{connect_opts := #{forwards := undefined}}, _QAckRef, Batch) -> do_send(#{connect_opts := #{forwards := undefined}}, _QAckRef, Batch) ->
?LOG(error, "cannot forward messages to remote broker as 'bridge.mqtt.<name>.in' not configured, msg: ~p", [Batch]); ?SLOG(error, #{msg => "cannot forward messages to remote broker"
" as egress_channel is not configured",
messages => Batch});
do_send(#{inflight := Inflight, do_send(#{inflight := Inflight,
connection := Connection, connection := Connection,
mountpoint := Mountpoint, mountpoint := Mountpoint,
@ -409,14 +414,16 @@ do_send(#{inflight := Inflight,
emqx_metrics:inc('bridge.mqtt.message_sent_to_remote'), emqx_metrics:inc('bridge.mqtt.message_sent_to_remote'),
emqx_connector_mqtt_msg:to_remote_msg(Message, Vars) emqx_connector_mqtt_msg:to_remote_msg(Message, Vars)
end, end,
?LOG(debug, "publish to remote broker, msg: ~p, vars: ~p", [Batch, Vars]), ?SLOG(debug, #{msg => "publish to remote broker",
message => Batch, vars => Vars}),
case emqx_connector_mqtt_mod:send(Connection, [ExportMsg(M) || M <- Batch]) of case emqx_connector_mqtt_mod:send(Connection, [ExportMsg(M) || M <- Batch]) of
{ok, Refs} -> {ok, Refs} ->
{ok, State#{inflight := Inflight ++ [#{q_ack_ref => QAckRef, {ok, State#{inflight := Inflight ++ [#{q_ack_ref => QAckRef,
send_ack_ref => map_set(Refs), send_ack_ref => map_set(Refs),
batch => Batch}]}}; batch => Batch}]}};
{error, Reason} -> {error, Reason} ->
?LOG(info, "mqtt_bridge_produce_failed ~p", [Reason]), ?SLOG(info, #{msg => "mqtt_bridge_produce_failed",
reason => Reason}),
{error, State} {error, State}
end. end.
@ -436,7 +443,8 @@ handle_batch_ack(#{inflight := Inflight0, replayq := Q} = State, Ref) ->
State#{inflight := Inflight}. State#{inflight := Inflight}.
do_ack([], Ref) -> do_ack([], Ref) ->
?LOG(debug, "stale_batch_ack_reference ~p", [Ref]), ?SLOG(debug, #{msg => "stale_batch_ack_reference",
ref => Ref}),
[]; [];
do_ack([#{send_ack_ref := Refs} = First | Rest], Ref) -> do_ack([#{send_ack_ref := Refs} = First | Rest], Ref) ->
case maps:is_key(Ref, Refs) of case maps:is_key(Ref, Refs) of

View File

@ -18,19 +18,17 @@
-define(KV_TAB, '@rule_engine_db'). -define(KV_TAB, '@rule_engine_db').
-type(maybe(T) :: T | undefined). -type maybe(T) :: T | undefined.
-type(rule_id() :: binary()). -type rule_id() :: binary().
-type(rule_name() :: binary()). -type rule_name() :: binary().
-type(descr() :: #{en := binary(), zh => binary()}). -type mf() :: {Module::atom(), Fun::atom()}.
-type(mf() :: {Module::atom(), Fun::atom()}). -type hook() :: atom() | 'any'.
-type(hook() :: atom() | 'any'). -type topic() :: binary().
-type bridge_channel_id() :: binary().
-type(topic() :: binary()).
-type(bridge_channel_id() :: binary()).
-type selected_data() :: map(). -type selected_data() :: map().
-type envs() :: map(). -type envs() :: map().
-type output_type() :: bridge | builtin | func. -type output_type() :: bridge | builtin | func.
@ -43,20 +41,18 @@
}. }.
-type output_fun() :: fun((selected_data(), envs(), output_fun_args()) -> any()). -type output_fun() :: fun((selected_data(), envs(), output_fun_args()) -> any()).
-type(rule_info() :: -type rule_info() ::
#{ from := list(topic()) #{ from := list(topic())
, outputs := [output()] , outputs := [output()]
, sql := binary() , sql := binary()
, is_foreach := boolean() , is_foreach := boolean()
, fields := list() , fields := list()
, doeach := term() , doeach := term()
, incase := list() , incase := term()
, conditions := tuple() , conditions := tuple()
, enabled := boolean() , enabled := boolean()
, description := binary() , description => binary()
}). }.
-define(descr, #{en => <<>>, zh => <<>>}).
-record(rule, -record(rule,
{ id :: rule_id() { id :: rule_id()

View File

@ -3,6 +3,7 @@
-behaviour(hocon_schema). -behaviour(hocon_schema).
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/logger.hrl").
-export([ check_params/2 -export([ check_params/2
]). ]).
@ -19,7 +20,10 @@ check_params(Params, Tag) ->
#{Tag := Checked} -> {ok, Checked} #{Tag := Checked} -> {ok, Checked}
catch catch
Error:Reason:ST -> Error:Reason:ST ->
logger:error("check rule params failed: ~p", [{Error, Reason, ST}]), ?SLOG(error, #{msg => "check_rule_params_failed",
exception => Error,
reason => Reason,
stacktrace => ST}),
{error, {Reason, ST}} {error, {Reason, ST}}
end. end.
@ -27,8 +31,8 @@ check_params(Params, Tag) ->
%% Hocon Schema Definitions %% Hocon Schema Definitions
roots() -> roots() ->
[ {"rule_creation", sc(ref("rule_creation"), #{})} [ {"rule_creation", sc(ref("rule_creation"), #{desc => "Schema for creating rules"})}
, {"rule_test", sc(ref("rule_test"), #{})} , {"rule_test", sc(ref("rule_test"), #{desc => "Schema for testing rules"})}
]. ].
fields("rule_creation") -> fields("rule_creation") ->

View File

@ -26,7 +26,7 @@
-export_type([rule/0]). -export_type([rule/0]).
-type(rule() :: #rule{}). -type rule() :: #rule{}.
-define(T_RETRY, 60000). -define(T_RETRY, 60000).
@ -63,11 +63,6 @@ delete_rule(RuleId) ->
%% Internal Functions %% Internal Functions
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% The pattern {'ok', Select} can never match the type {'error',{_,[{_,_,_,_}]}}.
%% probably due to stack depth, or inlines.
-dialyzer({nowarn_function, [do_create_rule/1, parse_outputs/1, do_parse_outputs/1,
pre_process_repub_args/1, preproc_vars/1]}).
do_create_rule(Params = #{id := RuleId, sql := Sql, outputs := Outputs}) -> do_create_rule(Params = #{id := RuleId, sql := Sql, outputs := Outputs}) ->
case emqx_rule_sqlparser:parse(Sql) of case emqx_rule_sqlparser:parse(Sql) of
{ok, Select} -> {ok, Select} ->

View File

@ -237,10 +237,6 @@ param_path_id() ->
%% Rules API %% Rules API
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% The pattern {'ok', Rule} can never match the type {'error',{_,'invalid_string' | binary() | [tuple()] | {_,[any()]} | {_,'sql_lex',{_,_}}}}
%% probably due to stack depth, or inlines.
-dialyzer({nowarn_function, [crud_rules/2, crud_rules_by_id/2]}).
list_events(#{}, _Params) -> list_events(#{}, _Params) ->
{200, emqx_rule_events:event_info()}. {200, emqx_rule_events:event_info()}.
@ -252,17 +248,14 @@ crud_rules(post, #{body := Params}) ->
?CHECK_PARAMS(Params, rule_creation, case emqx_rule_engine:create_rule(CheckedParams) of ?CHECK_PARAMS(Params, rule_creation, case emqx_rule_engine:create_rule(CheckedParams) of
{ok, Rule} -> {201, format_rule_resp(Rule)}; {ok, Rule} -> {201, format_rule_resp(Rule)};
{error, Reason} -> {error, Reason} ->
?LOG(error, "create rule failed: ~0p", [Reason]), ?SLOG(error, #{msg => "create_rule_failed", reason => Reason}),
{400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}} {400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}}
end). end).
rule_test(post, #{body := Params}) -> rule_test(post, #{body := Params}) ->
?CHECK_PARAMS(Params, rule_test, case emqx_rule_sqltester:test(CheckedParams) of ?CHECK_PARAMS(Params, rule_test, case emqx_rule_sqltester:test(CheckedParams) of
{ok, Result} -> {200, Result}; {ok, Result} -> {200, Result};
{error, nomatch} -> {412, #{code => 'NOT_MATCH', message => <<"SQL Not Match">>}}; {error, nomatch} -> {412, #{code => 'NOT_MATCH', message => <<"SQL Not Match">>}}
{error, Reason} ->
?LOG(error, "rule test failed: ~0p", [Reason]),
{400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}}
end). end).
crud_rules_by_id(get, #{bindings := #{id := Id}}) -> crud_rules_by_id(get, #{bindings := #{id := Id}}) ->
@ -280,7 +273,9 @@ crud_rules_by_id(put, #{bindings := #{id := Id}, body := Params0}) ->
{error, not_found} -> {error, not_found} ->
{404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}}; {404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}};
{error, Reason} -> {error, Reason} ->
?LOG(error, "update rule failed: ~0p", [Reason]), ?SLOG(error, #{msg => "update_rule_failed",
id => Id,
reason => Reason}),
{400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}} {400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}}
end); end);

View File

@ -31,7 +31,7 @@ console(Selected, #{metadata := #{rule_id := RuleId}} = Envs, _Args) ->
republish(_Selected, #{topic := Topic, headers := #{republish_by := RuleId}, republish(_Selected, #{topic := Topic, headers := #{republish_by := RuleId},
metadata := #{rule_id := RuleId}}, _Args) -> metadata := #{rule_id := RuleId}}, _Args) ->
?LOG(error, "[republish] recursively republish detected, msg topic: ~p", [Topic]); ?SLOG(error, #{msg => "recursive_republish_detected", topic => Topic});
%% republish a PUBLISH message %% republish a PUBLISH message
republish(Selected, #{flags := Flags, metadata := #{rule_id := RuleId}}, republish(Selected, #{flags := Flags, metadata := #{rule_id := RuleId}},
@ -44,7 +44,7 @@ republish(Selected, #{flags := Flags, metadata := #{rule_id := RuleId}},
Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected), Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected),
QoS = replace_simple_var(QoSTks, Selected), QoS = replace_simple_var(QoSTks, Selected),
Retain = replace_simple_var(RetainTks, Selected), Retain = replace_simple_var(RetainTks, Selected),
?LOG(debug, "[republish] to: ~p, payload: ~p", [Topic, Payload]), ?SLOG(debug, #{msg => "republish", topic => Topic, payload => Payload}),
safe_publish(RuleId, Topic, QoS, Flags#{retain => Retain}, Payload); safe_publish(RuleId, Topic, QoS, Flags#{retain => Retain}, Payload);
%% in case this is a "$events/" event %% in case this is a "$events/" event
@ -58,7 +58,7 @@ republish(Selected, #{metadata := #{rule_id := RuleId}},
Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected), Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected),
QoS = replace_simple_var(QoSTks, Selected), QoS = replace_simple_var(QoSTks, Selected),
Retain = replace_simple_var(RetainTks, Selected), Retain = replace_simple_var(RetainTks, Selected),
?LOG(debug, "[republish] to: ~p, payload: ~p", [Topic, Payload]), ?SLOG(debug, #{msg => "republish", topic => Topic, payload => Payload}),
safe_publish(RuleId, Topic, QoS, #{retain => Retain}, Payload). safe_publish(RuleId, Topic, QoS, #{retain => Retain}, Payload).
safe_publish(RuleId, Topic, QoS, Flags, Payload) -> safe_publish(RuleId, Topic, QoS, Flags, Payload) ->

View File

@ -207,15 +207,15 @@ handle_call({remove_rules, Rules}, _From, State) ->
{reply, ok, State}; {reply, ok, State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?LOG(error, "[RuleRegistry]: unexpected call - ~p", [Req]), ?SLOG(error, #{msg => "unexpected_call", request => Req}),
{reply, ignored, State}. {reply, ignored, State}.
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?LOG(error, "[RuleRegistry]: unexpected cast ~p", [Msg]), ?SLOG(error, #{msg => "unexpected_cast", request => Msg}),
{noreply, State}. {noreply, State}.
handle_info(Info, State) -> handle_info(Info, State) ->
?LOG(error, "[RuleRegistry]: unexpected info ~p", [Info]), ?SLOG(error, #{msg => "unexpected_info", request => Info}),
{noreply, State}. {noreply, State}.
terminate(_Reason, _State) -> terminate(_Reason, _State) ->

View File

@ -33,9 +33,9 @@
-compile({no_auto_import,[alias/1]}). -compile({no_auto_import,[alias/1]}).
-type(input() :: map()). -type input() :: map().
-type(alias() :: atom()). -type alias() :: atom().
-type(collection() :: {alias(), [term()]}). -type collection() :: {alias(), [term()]}.
-define(ephemeral_alias(TYPE, NAME), -define(ephemeral_alias(TYPE, NAME),
iolist_to_binary(io_lib:format("_v_~s_~p_~p", [TYPE, NAME, erlang:system_time()]))). iolist_to_binary(io_lib:format("_v_~s_~p_~p", [TYPE, NAME, erlang:system_time()]))).
@ -55,20 +55,24 @@ apply_rules([Rule = #rule{id = RuleID}|More], Input) ->
catch catch
%% ignore the errors if select or match failed %% ignore the errors if select or match failed
_:{select_and_transform_error, Error} -> _:{select_and_transform_error, Error} ->
?LOG(warning, "SELECT clause exception for ~s failed: ~p", ?SLOG(warning, #{msg => "SELECT_clause_exception",
[RuleID, Error]); rule_id => RuleID, reason => Error});
_:{match_conditions_error, Error} -> _:{match_conditions_error, Error} ->
?LOG(warning, "WHERE clause exception for ~s failed: ~p", ?SLOG(warning, #{msg => "WHERE_clause_exception",
[RuleID, Error]); rule_id => RuleID, reason => Error});
_:{select_and_collect_error, Error} -> _:{select_and_collect_error, Error} ->
?LOG(warning, "FOREACH clause exception for ~s failed: ~p", ?SLOG(warning, #{msg => "FOREACH_clause_exception",
[RuleID, Error]); rule_id => RuleID, reason => Error});
_:{match_incase_error, Error} -> _:{match_incase_error, Error} ->
?LOG(warning, "INCASE clause exception for ~s failed: ~p", ?SLOG(warning, #{msg => "INCASE_clause_exception",
[RuleID, Error]); rule_id => RuleID, reason => Error});
_:Error:StkTrace -> Class:Error:StkTrace ->
?LOG(error, "Apply rule ~s failed: ~p. Stacktrace:~n~p", ?SLOG(error, #{msg => "apply_rule_failed",
[RuleID, Error, StkTrace]) rule_id => RuleID,
exception => Class,
reason => Error,
stacktrace => StkTrace
})
end, end,
apply_rules(More, Input). apply_rules(More, Input).
@ -166,7 +170,6 @@ select_and_collect([Field|More], Input, {Output, LastKV}) ->
{nested_put(Key, Val, Output), LastKV}). {nested_put(Key, Val, Output), LastKV}).
%% Filter each item got from FOREACH %% Filter each item got from FOREACH
-dialyzer({nowarn_function, filter_collection/4}).
filter_collection(Input, InCase, DoEach, {CollKey, CollVal}) -> filter_collection(Input, InCase, DoEach, {CollKey, CollVal}) ->
lists:filtermap( lists:filtermap(
fun(Item) -> fun(Item) ->
@ -235,11 +238,16 @@ handle_output(OutId, Selected, Envs) ->
do_handle_output(OutId, Selected, Envs) do_handle_output(OutId, Selected, Envs)
catch catch
Err:Reason:ST -> Err:Reason:ST ->
?LOG(warning, "Output to ~p failed, ~p", [OutId, {Err, Reason, ST}]) ?SLOG(error, #{msg => "output_failed",
output => OutId,
exception => Err,
reason => Reason,
stacktrace => ST
})
end. end.
do_handle_output(#{type := bridge, target := ChannelId}, Selected, _Envs) -> do_handle_output(#{type := bridge, target := ChannelId}, Selected, _Envs) ->
?LOG(debug, "output to bridge: ~p", [ChannelId]), ?SLOG(debug, #{msg => "output to bridge", channel_id => ChannelId}),
emqx_bridge:send_message(ChannelId, Selected); emqx_bridge:send_message(ChannelId, Selected);
do_handle_output(#{type := func, target := Func} = Out, Selected, Envs) -> do_handle_output(#{type := func, target := Func} = Out, Selected, Envs) ->
erlang:apply(Func, [Selected, Envs, maps:get(args, Out, #{})]); erlang:apply(Func, [Selected, Envs, maps:get(args, Out, #{})]);

View File

@ -36,22 +36,18 @@
-opaque(select() :: #select{}). -opaque(select() :: #select{}).
-type(const() :: {const, number()|binary()}). -type const() :: {const, number()|binary()}.
-type(variable() :: binary() | list(binary())). -type variable() :: binary() | list(binary()).
-type(alias() :: binary() | list(binary())). -type alias() :: binary() | list(binary()).
-type(field() :: const() | variable() -type field() :: const() | variable()
| {as, field(), alias()} | {as, field(), alias()}
| {'fun', atom(), list(field())}). | {'fun', atom(), list(field())}.
-export_type([select/0]). -export_type([select/0]).
%% Dialyzer gives up on the generated code.
%% probably due to stack depth, or inlines.
-dialyzer({nowarn_function, [parse/1]}).
%% Parse one select statement. %% Parse one select statement.
-spec(parse(string() | binary()) -> {ok, select()} | {error, term()}). -spec(parse(string() | binary()) -> {ok, select()} | {error, term()}).
parse(Sql) -> parse(Sql) ->

View File

@ -22,16 +22,7 @@
, get_selected_data/3 , get_selected_data/3
]). ]).
%% Dialyzer gives up on the generated code. -spec test(#{sql := binary(), context := map()}) -> {ok, map() | list()} | {error, nomatch}.
%% probably due to stack depth, or inlines.
-dialyzer({nowarn_function, [test/1,
test_rule/4,
flatten/1,
fill_default_values/2,
envs_examp/1
]}).
-spec(test(#{}) -> {ok, map() | list()} | {error, term()}).
test(#{sql := Sql, context := Context}) -> test(#{sql := Sql, context := Context}) ->
{ok, Select} = emqx_rule_sqlparser:parse(Sql), {ok, Select} = emqx_rule_sqlparser:parse(Sql),
InTopic = maps:get(topic, Context, <<>>), InTopic = maps:get(topic, Context, <<>>),
@ -63,7 +54,8 @@ test_rule(Sql, Select, Context, EventTopics) ->
doeach => emqx_rule_sqlparser:select_doeach(Select), doeach => emqx_rule_sqlparser:select_doeach(Select),
incase => emqx_rule_sqlparser:select_incase(Select), incase => emqx_rule_sqlparser:select_incase(Select),
conditions => emqx_rule_sqlparser:select_where(Select) conditions => emqx_rule_sqlparser:select_where(Select)
} },
created_at = erlang:system_time(millisecond)
}, },
FullContext = fill_default_values(hd(EventTopics), emqx_rule_maps:atom_key_map(Context)), FullContext = fill_default_values(hd(EventTopics), emqx_rule_maps:atom_key_map(Context)),
try try
@ -86,8 +78,9 @@ flatten([D1]) -> D1;
flatten([D1 | L]) when is_list(D1) -> flatten([D1 | L]) when is_list(D1) ->
D1 ++ flatten(L). D1 ++ flatten(L).
echo_action(Data, _Envs) -> echo_action(Data, Envs) ->
?LOG(info, "Testing Rule SQL OK"), Data. ?SLOG(debug, #{msg => "testing_rule_sql_ok", data => Data, envs => Envs}),
Data.
fill_default_values(Event, Context) -> fill_default_values(Event, Context) ->
maps:merge(envs_examp(Event), Context). maps:merge(envs_examp(Event), Context).

View File

@ -56,7 +56,7 @@
, {replayq, "0.3.3"} , {replayq, "0.3.3"}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
, {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.4.3"}}} , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.4.3"}}}
, {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.2"}}} , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.4"}}}
, {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x , {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x
, {getopt, "1.0.2"} , {getopt, "1.0.2"}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}}