From a9185f964e1b3c14cb867b2ff88851a52bcb2ad4 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 28 Sep 2021 03:10:48 +0800 Subject: [PATCH] fix(rules): improve specs and logs (#5821) Co-authored-by: Zaiming Shi --- apps/emqx_bridge/src/emqx_bridge.erl | 8 +-- .../src/emqx_connector_http.erl | 18 ++++--- .../src/emqx_connector_ldap.erl | 16 ++++-- .../src/emqx_connector_mongo.erl | 22 +++++--- .../src/emqx_connector_mqtt.erl | 52 ++++++++++++------- .../src/emqx_connector_mysql.erl | 13 +++-- .../src/emqx_connector_pgsql.erl | 14 +++-- .../src/emqx_connector_redis.erl | 13 +++-- .../src/mqtt/emqx_connector_mqtt_mod.erl | 12 +++-- .../src/mqtt/emqx_connector_mqtt_worker.erl | 26 ++++++---- apps/emqx_rule_engine/include/rule_engine.hrl | 26 ++++------ .../src/emqx_rule_api_schema.erl | 10 ++-- .../emqx_rule_engine/src/emqx_rule_engine.erl | 7 +-- .../src/emqx_rule_engine_api.erl | 15 ++---- .../src/emqx_rule_outputs.erl | 8 +-- .../src/emqx_rule_registry.erl | 6 +-- .../src/emqx_rule_runtime.erl | 42 +++++++++------ .../src/emqx_rule_sqlparser.erl | 14 ++--- .../src/emqx_rule_sqltester.erl | 21 +++----- rebar.config | 2 +- 20 files changed, 199 insertions(+), 146 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 402c4f597..351e6aeca 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -189,7 +189,8 @@ restart_bridge(Type, Name) -> emqx_resource:restart(resource_id(Type, Name)). create_bridge(Type, Name, Conf) -> - logger:info("create ~p bridge ~p use config: ~p", [Type, Name, Conf]), + ?SLOG(info, #{msg => "create bridge", type => Type, name => Name, + config => Conf}), ResId = resource_id(Type, Name), case emqx_resource:create(ResId, emqx_bridge:resource_type(Type), Conf) of @@ -210,12 +211,13 @@ update_bridge(Type, Name, {_OldConf, Conf}) -> %% `egress_channels` are changed, then we should not restart the bridge, we only restart/start %% the channels. %% - logger:info("update ~p bridge ~p use config: ~p", [Type, Name, Conf]), + ?SLOG(info, #{msg => "update bridge", type => Type, name => Name, + config => Conf}), emqx_resource:recreate(resource_id(Type, Name), emqx_bridge:resource_type(Type), Conf, []). remove_bridge(Type, Name, _Conf) -> - logger:info("remove ~p bridge ~p", [Type, Name]), + ?SLOG(info, #{msg => "remove bridge", type => Type, name => Name}), case emqx_resource:remove(resource_id(Type, Name)) of ok -> ok; {error, not_found} -> ok; diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 2f4aa2af4..aff6e7255 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -130,7 +130,8 @@ on_start(InstId, #{base_url := #{scheme := Scheme, retry_interval := RetryInterval, pool_type := PoolType, pool_size := PoolSize} = Config) -> - logger:info("starting http connector: ~p, config: ~p", [InstId, Config]), + ?SLOG(info, #{msg => "starting http connector", + connector => InstId, config => Config}), {Transport, TransportOpts} = case Scheme of http -> {tcp, []}; @@ -166,7 +167,8 @@ on_start(InstId, #{base_url := #{scheme := Scheme, end. on_stop(InstId, #{pool_name := PoolName}) -> - logger:info("stopping http connector: ~p", [InstId]), + ?SLOG(info, #{msg => "stopping http connector", + connector => InstId}), ehttpc_sup:stop_pool(PoolName). on_query(InstId, {send_message, ChannelId, Msg}, AfterQuery, #{channels := Channels} = State) -> @@ -181,16 +183,20 @@ on_query(InstId, {Method, Request}, AfterQuery, State) -> on_query(InstId, {undefined, Method, Request, 5000}, AfterQuery, State); on_query(InstId, {Method, Request, Timeout}, AfterQuery, State) -> on_query(InstId, {undefined, Method, Request, Timeout}, AfterQuery, State); -on_query(InstId, {KeyOrNum, Method, Request, Timeout}, AfterQuery, #{pool_name := PoolName, - base_path := BasePath} = State) -> - logger:debug("http connector ~p received request: ~p, at state: ~p", [InstId, Request, State]), +on_query(InstId, {KeyOrNum, Method, Request, Timeout}, AfterQuery, + #{pool_name := PoolName, base_path := BasePath} = State) -> + ?SLOG(debug, #{msg => "http connector received request", + request => Request, connector => InstId, + state => State}), NRequest = update_path(BasePath, Request), case Result = ehttpc:request(case KeyOrNum of undefined -> PoolName; _ -> {PoolName, KeyOrNum} end, Method, NRequest, Timeout) of {error, Reason} -> - logger:debug("http connector ~p do reqeust failed, sql: ~p, reason: ~p", [InstId, NRequest, Reason]), + ?SLOG(error, #{msg => "http connector do reqeust failed", + request => NRequest, reason => Reason, + connector => InstId}), emqx_resource:query_failed(AfterQuery); _ -> emqx_resource:query_success(AfterQuery) diff --git a/apps/emqx_connector/src/emqx_connector_ldap.erl b/apps/emqx_connector/src/emqx_connector_ldap.erl index fadf7f56f..85e42b0f3 100644 --- a/apps/emqx_connector/src/emqx_connector_ldap.erl +++ b/apps/emqx_connector/src/emqx_connector_ldap.erl @@ -18,6 +18,7 @@ -include("emqx_connector.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("emqx_resource/include/emqx_resource_behaviour.hrl"). +-include_lib("emqx/include/logger.hrl"). -export([roots/0, fields/1]). @@ -53,7 +54,8 @@ on_start(InstId, #{servers := Servers0, pool_size := PoolSize, auto_reconnect := AutoReconn, ssl := SSL} = Config) -> - logger:info("starting ldap connector: ~p, config: ~p", [InstId, Config]), + ?SLOG(info, #{msg => "starting ldap connector", + connector => InstId, config => Config}), Servers = [begin proplists:get_value(host, S) end || S <- Servers0], SslOpts = case maps:get(enable, SSL) of true -> @@ -75,14 +77,20 @@ on_start(InstId, #{servers := Servers0, {ok, #{poolname => PoolName}}. on_stop(InstId, #{poolname := PoolName}) -> - logger:info("stopping ldap connector: ~p", [InstId]), + ?SLOG(info, #{msg => "stopping ldap connector", + connector => InstId}), emqx_plugin_libs_pool:stop_pool(PoolName). on_query(InstId, {search, Base, Filter, Attributes}, AfterQuery, #{poolname := PoolName} = State) -> - logger:debug("ldap connector ~p received request: ~p, at state: ~p", [InstId, {Base, Filter, Attributes}, State]), + Request = {Base, Filter, Attributes}, + ?SLOG(debug, #{msg => "ldap connector received request", + request => Request, connector => InstId, + state => State}), case Result = ecpool:pick_and_do(PoolName, {?MODULE, search, [Base, Filter, Attributes]}, no_handover) of {error, Reason} -> - logger:debug("ldap connector ~p do request failed, request: ~p, reason: ~p", [InstId, {Base, Filter, Attributes}, Reason]), + ?SLOG(error, #{msg => "ldap connector do request failed", + request => Request, connector => InstId, + reason => Reason}), emqx_resource:query_failed(AfterQuery); _ -> emqx_resource:query_success(AfterQuery) diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index 906b57fb3..0cb40adbb 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -18,6 +18,7 @@ -include("emqx_connector.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("emqx_resource/include/emqx_resource_behaviour.hrl"). +-include_lib("emqx/include/logger.hrl"). -type server() :: emqx_schema:ip_port(). -reflect_type([server/0]). @@ -93,7 +94,8 @@ on_jsonify(Config) -> %% =================================================================== on_start(InstId, Config = #{server := Server, mongo_type := single}) -> - logger:info("starting mongodb connector: ~p, config: ~p", [InstId, Config]), + ?SLOG(info, #{msg => "starting mongodb single connector", + connector => InstId, config => Config}), Opts = [{type, single}, {hosts, [emqx_connector_schema_lib:ip_port_to_string(Server)]} ], @@ -102,7 +104,8 @@ on_start(InstId, Config = #{server := Server, on_start(InstId, Config = #{servers := Servers, mongo_type := rs, replica_set_name := RsName}) -> - logger:info("starting mongodb connector: ~p, config: ~p", [InstId, Config]), + ?SLOG(info, #{msg => "starting mongodb rs connector", + connector => InstId, config => Config}), Opts = [{type, {rs, RsName}}, {hosts, [emqx_connector_schema_lib:ip_port_to_string(S) || S <- Servers]} @@ -111,7 +114,8 @@ on_start(InstId, Config = #{servers := Servers, on_start(InstId, Config = #{servers := Servers, mongo_type := sharded}) -> - logger:info("starting mongodb connector: ~p, config: ~p", [InstId, Config]), + ?SLOG(info, #{msg => "starting mongodb sharded connector", + connector => InstId, config => Config}), Opts = [{type, sharded}, {hosts, [emqx_connector_schema_lib:ip_port_to_string(S) || S <- Servers]} @@ -119,14 +123,20 @@ on_start(InstId, Config = #{servers := Servers, do_start(InstId, Opts, Config). on_stop(InstId, #{poolname := PoolName}) -> - logger:info("stopping mongodb connector: ~p", [InstId]), + ?SLOG(info, #{msg => "stopping mongodb connector", + connector => InstId}), emqx_plugin_libs_pool:stop_pool(PoolName). on_query(InstId, {Action, Collection, Selector, Docs}, AfterQuery, #{poolname := PoolName} = State) -> - logger:debug("mongodb connector ~p received request: ~p, at state: ~p", [InstId, {Action, Collection, Selector, Docs}, State]), + Request = {Action, Collection, Selector, Docs}, + ?SLOG(debug, #{msg => "mongodb connector received request", + request => Request, connector => InstId, + state => State}), case ecpool:pick_and_do(PoolName, {?MODULE, mongo_query, [Action, Collection, Selector, Docs]}, no_handover) of {error, Reason} -> - logger:debug("mongodb connector ~p do sql query failed, request: ~p, reason: ~p", [InstId, {Action, Collection, Selector, Docs}, Reason]), + ?SLOG(error, #{msg => "mongodb connector do query failed", + request => Request, reason => Reason, + connector => InstId}), emqx_resource:query_failed(AfterQuery), {error, Reason}; {ok, Cursor} when is_pid(Cursor) -> diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 424933ae4..a4527984a 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -17,6 +17,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("emqx_resource/include/emqx_resource_behaviour.hrl"). +-include_lib("emqx/include/logger.hrl"). -behaviour(supervisor). @@ -88,13 +89,14 @@ drop_bridge(Name) -> %% =================================================================== %% When use this bridge as a data source, ?MODULE:on_message_received/2 will be called %% if the bridge received msgs from the remote broker. -on_message_received(Msg, ChannelName) -> - Name = atom_to_binary(ChannelName, utf8), +on_message_received(Msg, ChannId) -> + Name = atom_to_binary(ChannId, utf8), emqx:run_hook(<<"$bridges/", Name/binary>>, [Msg]). %% =================================================================== on_start(InstId, Conf) -> - logger:info("starting mqtt connector: ~p, ~p", [InstId, Conf]), + ?SLOG(info, #{msg => "starting mqtt connector", + connector => InstId, config => Conf}), "bridge:" ++ NamePrefix = binary_to_list(InstId), BasicConf = basic_config(Conf), InitRes = {ok, #{name_prefix => NamePrefix, baisc_conf => BasicConf, channels => []}}, @@ -111,7 +113,8 @@ on_start(InstId, Conf) -> end, InitRes, InOutConfigs). on_stop(InstId, #{channels := NameList}) -> - logger:info("stopping mqtt connector: ~p", [InstId]), + ?SLOG(info, #{msg => "stopping mqtt connector", + connector => InstId}), lists:foreach(fun(Name) -> remove_channel(Name) end, NameList). @@ -122,7 +125,8 @@ on_query(_InstId, {create_channel, Conf}, _AfterQuery, #{name_prefix := Prefix, baisc_conf := BasicConf}) -> create_channel(Conf, Prefix, BasicConf); on_query(_InstId, {send_message, ChannelId, Msg}, _AfterQuery, _State) -> - logger:debug("send msg to remote node on channel: ~p, msg: ~p", [ChannelId, Msg]), + ?SLOG(debug, #{msg => "send msg to remote node", message => Msg, + channel_id => ChannelId}), emqx_connector_mqtt_worker:send_to_remote(ChannelId, Msg). on_health_check(_InstId, #{channels := NameList} = State) -> @@ -135,35 +139,43 @@ on_health_check(_InstId, #{channels := NameList} = State) -> create_channel({{ingress_channels, Id}, #{subscribe_remote_topic := RemoteT} = Conf}, NamePrefix, BasicConf) -> LocalT = maps:get(local_topic, Conf, undefined), - Name = ingress_channel_name(NamePrefix, Id), - logger:info("creating ingress channel ~p, remote ~s -> local ~s", [Name, RemoteT, LocalT]), + ChannId = ingress_channel_id(NamePrefix, Id), + ?SLOG(info, #{msg => "creating ingress channel", + remote_topic => RemoteT, + local_topic => LocalT, + channel_id => ChannId}), do_create_channel(BasicConf#{ - name => Name, - clientid => clientid(Name), + name => ChannId, + clientid => clientid(ChannId), subscriptions => Conf#{ local_topic => LocalT, - on_message_received => {fun ?MODULE:on_message_received/2, [Name]} + on_message_received => {fun ?MODULE:on_message_received/2, [ChannId]} }, forwards => undefined}); create_channel({{egress_channels, Id}, #{remote_topic := RemoteT} = Conf}, NamePrefix, BasicConf) -> LocalT = maps:get(subscribe_local_topic, Conf, undefined), - Name = egress_channel_name(NamePrefix, Id), - logger:info("creating egress channel ~p, local ~s -> remote ~s", [Name, LocalT, RemoteT]), + ChannId = egress_channel_id(NamePrefix, Id), + ?SLOG(info, #{msg => "creating egress channel", + remote_topic => RemoteT, + local_topic => LocalT, + channel_id => ChannId}), do_create_channel(BasicConf#{ - name => Name, - clientid => clientid(Name), + name => ChannId, + clientid => clientid(ChannId), subscriptions => undefined, forwards => Conf#{subscribe_local_topic => LocalT}}). -remove_channel(ChannelName) -> - logger:info("removing channel ~p", [ChannelName]), - case ?MODULE:drop_bridge(ChannelName) of +remove_channel(ChannId) -> + ?SLOG(info, #{msg => "removing channel", + channel_id => ChannId}), + case ?MODULE:drop_bridge(ChannId) of ok -> ok; {error, not_found} -> ok; {error, Reason} -> - logger:error("stop channel ~p failed, error: ~p", [ChannelName, Reason]) + ?SLOG(error, #{msg => "stop channel failed", + channel_id => ChannId, reason => Reason}) end. do_create_channel(#{name := Name} = Conf) -> @@ -216,9 +228,9 @@ basic_config(#{ taged_map_list(Tag, Map) -> [{{Tag, K}, V} || {K, V} <- maps:to_list(Map)]. -ingress_channel_name(Prefix, Id) -> +ingress_channel_id(Prefix, Id) -> channel_name("ingress_channels", Prefix, Id). -egress_channel_name(Prefix, Id) -> +egress_channel_id(Prefix, Id) -> channel_name("egress_channels", Prefix, Id). channel_name(Type, Prefix, Id) -> diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index 9dc194c55..8b87af65f 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -17,6 +17,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("emqx_resource/include/emqx_resource_behaviour.hrl"). +-include_lib("emqx/include/logger.hrl"). %% callbacks of behaviour emqx_resource -export([ on_start/2 @@ -54,7 +55,8 @@ on_start(InstId, #{server := {Host, Port}, auto_reconnect := AutoReconn, pool_size := PoolSize, ssl := SSL } = Config) -> - logger:info("starting mysql connector: ~p, config: ~p", [InstId, Config]), + ?SLOG(info, #{msg => "starting mysql connector", + connector => InstId, config => Config}), SslOpts = case maps:get(enable, SSL) of true -> [{ssl, [{server_name_indication, disable} | @@ -73,16 +75,19 @@ on_start(InstId, #{server := {Host, Port}, {ok, #{poolname => PoolName}}. on_stop(InstId, #{poolname := PoolName}) -> - logger:info("stopping mysql connector: ~p", [InstId]), + ?SLOG(info, #{msg => "stopping mysql connector", + connector => InstId}), emqx_plugin_libs_pool:stop_pool(PoolName). on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := PoolName} = State) -> on_query(InstId, {sql, SQL, []}, AfterQuery, #{poolname := PoolName} = State); on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := PoolName} = State) -> - logger:debug("mysql connector ~p received sql query: ~p, at state: ~p", [InstId, SQL, State]), + ?SLOG(debug, #{msg => "mysql connector received sql query", + connector => InstId, sql => SQL, state => State}), case Result = ecpool:pick_and_do(PoolName, {mysql, query, [SQL, Params]}, no_handover) of {error, Reason} -> - logger:debug("mysql connector ~p do sql query failed, sql: ~p, reason: ~p", [InstId, SQL, Reason]), + ?SLOG(error, #{msg => "mysql connector do sql query failed", + connector => InstId, sql => SQL, reason => Reason}), emqx_resource:query_failed(AfterQuery); _ -> emqx_resource:query_success(AfterQuery) diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index 8472c661e..0034737e8 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -17,6 +17,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("emqx_resource/include/emqx_resource_behaviour.hrl"). +-include_lib("emqx/include/logger.hrl"). -export([roots/0, fields/1]). @@ -54,7 +55,8 @@ on_start(InstId, #{server := {Host, Port}, auto_reconnect := AutoReconn, pool_size := PoolSize, ssl := SSL } = Config) -> - logger:info("starting postgresql connector: ~p, config: ~p", [InstId, Config]), + ?SLOG(info, #{msg => "starting postgresql connector", + connector => InstId, config => Config}), SslOpts = case maps:get(enable, SSL) of true -> [{ssl, [{server_name_indication, disable} | @@ -73,16 +75,20 @@ on_start(InstId, #{server := {Host, Port}, {ok, #{poolname => PoolName}}. on_stop(InstId, #{poolname := PoolName}) -> - logger:info("stopping postgresql connector: ~p", [InstId]), + ?SLOG(info, #{msg => "stopping postgresql connector", + connector => InstId}), emqx_plugin_libs_pool:stop_pool(PoolName). on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := PoolName} = State) -> on_query(InstId, {sql, SQL, []}, AfterQuery, #{poolname := PoolName} = State); on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := PoolName} = State) -> - logger:debug("postgresql connector ~p received sql query: ~p, at state: ~p", [InstId, SQL, State]), + ?SLOG(debug, #{msg => "postgresql connector received sql query", + connector => InstId, sql => SQL, state => State}), case Result = ecpool:pick_and_do(PoolName, {?MODULE, query, [SQL, Params]}, no_handover) of {error, Reason} -> - logger:debug("postgresql connector ~p do sql query failed, sql: ~p, reason: ~p", [InstId, SQL, Reason]), + ?SLOG(error, #{ + msg => "postgresql connector do sql query failed", + connector => InstId, sql => SQL, reason => Reason}), emqx_resource:query_failed(AfterQuery); _ -> emqx_resource:query_success(AfterQuery) diff --git a/apps/emqx_connector/src/emqx_connector_redis.erl b/apps/emqx_connector/src/emqx_connector_redis.erl index 44b036f39..aed06e724 100644 --- a/apps/emqx_connector/src/emqx_connector_redis.erl +++ b/apps/emqx_connector/src/emqx_connector_redis.erl @@ -18,6 +18,7 @@ -include("emqx_connector.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("emqx_resource/include/emqx_resource_behaviour.hrl"). +-include_lib("emqx/include/logger.hrl"). -type server() :: tuple(). @@ -85,7 +86,8 @@ on_start(InstId, #{redis_type := Type, pool_size := PoolSize, auto_reconnect := AutoReconn, ssl := SSL } = Config) -> - logger:info("starting redis connector: ~p, config: ~p", [InstId, Config]), + ?SLOG(info, #{msg => "starting redis connector", + connector => InstId, config => Config}), Servers = case Type of single -> [{servers, [maps:get(server, Config)]}]; _ ->[{servers, maps:get(servers, Config)}] @@ -116,18 +118,21 @@ on_start(InstId, #{redis_type := Type, {ok, #{poolname => PoolName, type => Type}}. on_stop(InstId, #{poolname := PoolName}) -> - logger:info("stopping redis connector: ~p", [InstId]), + ?SLOG(info, #{msg => "stopping redis connector", + connector => InstId}), emqx_plugin_libs_pool:stop_pool(PoolName). on_query(InstId, {cmd, Command}, AfterCommand, #{poolname := PoolName, type := Type} = State) -> - logger:debug("redis connector ~p received cmd query: ~p, at state: ~p", [InstId, Command, State]), + ?SLOG(debug, #{msg => "redis connector received cmd query", + connector => InstId, sql => Command, state => State}), Result = case Type of cluster -> eredis_cluster:q(PoolName, Command); _ -> ecpool:pick_and_do(PoolName, {?MODULE, cmd, [Type, Command]}, no_handover) end, case Result of {error, Reason} -> - logger:debug("redis connector ~p do cmd query failed, cmd: ~p, reason: ~p", [InstId, Command, Reason]), + ?SLOG(error, #{msg => "redis connector do cmd query failed", + connector => InstId, sql => Command, reason => Reason}), emqx_resource:query_failed(AfterCommand); _ -> emqx_resource:query_success(AfterCommand) diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index 560500d3d..853221eec 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -155,14 +155,18 @@ handle_puback(#{packet_id := PktId, reason_code := RC}, Parent) RC =:= ?RC_NO_MATCHING_SUBSCRIBERS -> Parent ! {batch_ack, PktId}, ok; handle_puback(#{packet_id := PktId, reason_code := RC}, _Parent) -> - ?LOG(warning, "publish ~p to remote node falied, reason_code: ~p", [PktId, RC]). + ?SLOG(warning, #{msg => "publish to remote node falied", + packet_id => PktId, reason_code => RC}). handle_publish(Msg, undefined) -> - ?LOG(error, "cannot publish to local broker as 'bridge.mqtt..in' not configured, msg: ~p", [Msg]); + ?SLOG(error, #{msg => "cannot publish to local broker as" + " ingress_channles' is not configured", + message => Msg}); handle_publish(Msg, #{on_message_received := {OnMsgRcvdFunc, Args}} = Vars) -> - ?LOG(debug, "publish to local broker, msg: ~p, vars: ~p", [Msg, Vars]), + ?SLOG(debug, #{msg => "publish to local broker", + message => Msg, vars => Vars}), emqx_metrics:inc('bridge.mqtt.message_received_from_remote', 1), - _ = erlang:apply(OnMsgRcvdFunc, [Msg] ++ Args), + _ = erlang:apply(OnMsgRcvdFunc, [Msg | Args]), case maps:get(local_topic, Vars, undefined) of undefined -> ok; _Topic -> diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index c98efd322..990d15ef5 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -63,6 +63,7 @@ -behaviour(gen_statem). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx/include/logger.hrl"). %% APIs -export([ start_link/1 @@ -189,7 +190,8 @@ callback_mode() -> [state_functions]. %% @doc Config should be a map(). init(#{name := Name} = ConnectOpts) -> - ?LOG(debug, "starting bridge worker for ~p", [Name]), + ?SLOG(debug, #{msg => "starting bridge worker", + name => Name}), erlang:process_flag(trap_exit, true), Queue = open_replayq(Name, maps:get(replayq, ConnectOpts, #{})), State = init_state(ConnectOpts), @@ -335,8 +337,9 @@ common(_StateName, cast, {send_to_remote, Msg}, #{replayq := Q} = State) -> NewQ = replayq:append(Q, [Msg]), {keep_state, State#{replayq => NewQ}, {next_event, internal, maybe_send}}; common(StateName, Type, Content, #{name := Name} = State) -> - ?LOG(notice, "Bridge ~p discarded ~p type event at state ~p:~p", - [Name, Type, StateName, Content]), + ?SLOG(notice, #{msg => "Bridge discarded event", + name => Name, type => Type, state_name => StateName, + content => Content}), {keep_state, State}. do_connect(#{connect_opts := ConnectOpts = #{forwards := Forwards}, @@ -352,8 +355,8 @@ do_connect(#{connect_opts := ConnectOpts = #{forwards := Forwards}, {ok, State#{connection => Conn}}; {error, Reason} -> ConnectOpts1 = obfuscate(ConnectOpts), - ?LOG(error, "Failed to connect \n" - "config=~p\nreason:~p", [ConnectOpts1, Reason]), + ?SLOG(error, #{msg => "Failed to connect", + config => ConnectOpts1, reason => Reason}), {error, Reason, State} end. @@ -399,7 +402,9 @@ pop_and_send_loop(#{replayq := Q} = State, N) -> %% Assert non-empty batch because we have a is_empty check earlier. do_send(#{connect_opts := #{forwards := undefined}}, _QAckRef, Batch) -> - ?LOG(error, "cannot forward messages to remote broker as 'bridge.mqtt..in' not configured, msg: ~p", [Batch]); + ?SLOG(error, #{msg => "cannot forward messages to remote broker" + " as egress_channel is not configured", + messages => Batch}); do_send(#{inflight := Inflight, connection := Connection, mountpoint := Mountpoint, @@ -409,14 +414,16 @@ do_send(#{inflight := Inflight, emqx_metrics:inc('bridge.mqtt.message_sent_to_remote'), emqx_connector_mqtt_msg:to_remote_msg(Message, Vars) end, - ?LOG(debug, "publish to remote broker, msg: ~p, vars: ~p", [Batch, Vars]), + ?SLOG(debug, #{msg => "publish to remote broker", + message => Batch, vars => Vars}), case emqx_connector_mqtt_mod:send(Connection, [ExportMsg(M) || M <- Batch]) of {ok, Refs} -> {ok, State#{inflight := Inflight ++ [#{q_ack_ref => QAckRef, send_ack_ref => map_set(Refs), batch => Batch}]}}; {error, Reason} -> - ?LOG(info, "mqtt_bridge_produce_failed ~p", [Reason]), + ?SLOG(info, #{msg => "mqtt_bridge_produce_failed", + reason => Reason}), {error, State} end. @@ -436,7 +443,8 @@ handle_batch_ack(#{inflight := Inflight0, replayq := Q} = State, Ref) -> State#{inflight := Inflight}. do_ack([], Ref) -> - ?LOG(debug, "stale_batch_ack_reference ~p", [Ref]), + ?SLOG(debug, #{msg => "stale_batch_ack_reference", + ref => Ref}), []; do_ack([#{send_ack_ref := Refs} = First | Rest], Ref) -> case maps:is_key(Ref, Refs) of diff --git a/apps/emqx_rule_engine/include/rule_engine.hrl b/apps/emqx_rule_engine/include/rule_engine.hrl index b46d9149c..2908051fe 100644 --- a/apps/emqx_rule_engine/include/rule_engine.hrl +++ b/apps/emqx_rule_engine/include/rule_engine.hrl @@ -18,19 +18,17 @@ -define(KV_TAB, '@rule_engine_db'). --type(maybe(T) :: T | undefined). +-type maybe(T) :: T | undefined. --type(rule_id() :: binary()). --type(rule_name() :: binary()). +-type rule_id() :: binary(). +-type rule_name() :: binary(). --type(descr() :: #{en := binary(), zh => binary()}). +-type mf() :: {Module::atom(), Fun::atom()}. --type(mf() :: {Module::atom(), Fun::atom()}). +-type hook() :: atom() | 'any'. --type(hook() :: atom() | 'any'). - --type(topic() :: binary()). --type(bridge_channel_id() :: binary()). +-type topic() :: binary(). +-type bridge_channel_id() :: binary(). -type selected_data() :: map(). -type envs() :: map(). -type output_type() :: bridge | builtin | func. @@ -43,20 +41,18 @@ }. -type output_fun() :: fun((selected_data(), envs(), output_fun_args()) -> any()). --type(rule_info() :: +-type rule_info() :: #{ from := list(topic()) , outputs := [output()] , sql := binary() , is_foreach := boolean() , fields := list() , doeach := term() - , incase := list() + , incase := term() , conditions := tuple() , enabled := boolean() - , description := binary() - }). - --define(descr, #{en => <<>>, zh => <<>>}). + , description => binary() + }. -record(rule, { id :: rule_id() diff --git a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl index 9a78f27d4..c96e82ecb 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl @@ -3,6 +3,7 @@ -behaviour(hocon_schema). -include_lib("typerefl/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). -export([ check_params/2 ]). @@ -19,7 +20,10 @@ check_params(Params, Tag) -> #{Tag := Checked} -> {ok, Checked} catch Error:Reason:ST -> - logger:error("check rule params failed: ~p", [{Error, Reason, ST}]), + ?SLOG(error, #{msg => "check_rule_params_failed", + exception => Error, + reason => Reason, + stacktrace => ST}), {error, {Reason, ST}} end. @@ -27,8 +31,8 @@ check_params(Params, Tag) -> %% Hocon Schema Definitions roots() -> - [ {"rule_creation", sc(ref("rule_creation"), #{})} - , {"rule_test", sc(ref("rule_test"), #{})} + [ {"rule_creation", sc(ref("rule_creation"), #{desc => "Schema for creating rules"})} + , {"rule_test", sc(ref("rule_test"), #{desc => "Schema for testing rules"})} ]. fields("rule_creation") -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 1e27b68ce..04d35931a 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -26,7 +26,7 @@ -export_type([rule/0]). --type(rule() :: #rule{}). +-type rule() :: #rule{}. -define(T_RETRY, 60000). @@ -63,11 +63,6 @@ delete_rule(RuleId) -> %% Internal Functions %%------------------------------------------------------------------------------ -%% The pattern {'ok', Select} can never match the type {'error',{_,[{_,_,_,_}]}}. -%% probably due to stack depth, or inlines. --dialyzer({nowarn_function, [do_create_rule/1, parse_outputs/1, do_parse_outputs/1, - pre_process_repub_args/1, preproc_vars/1]}). - do_create_rule(Params = #{id := RuleId, sql := Sql, outputs := Outputs}) -> case emqx_rule_sqlparser:parse(Sql) of {ok, Select} -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index c9112b7c3..b097c7169 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -237,10 +237,6 @@ param_path_id() -> %% Rules API %%------------------------------------------------------------------------------ -%% The pattern {'ok', Rule} can never match the type {'error',{_,'invalid_string' | binary() | [tuple()] | {_,[any()]} | {_,'sql_lex',{_,_}}}} -%% probably due to stack depth, or inlines. --dialyzer({nowarn_function, [crud_rules/2, crud_rules_by_id/2]}). - list_events(#{}, _Params) -> {200, emqx_rule_events:event_info()}. @@ -252,17 +248,14 @@ crud_rules(post, #{body := Params}) -> ?CHECK_PARAMS(Params, rule_creation, case emqx_rule_engine:create_rule(CheckedParams) of {ok, Rule} -> {201, format_rule_resp(Rule)}; {error, Reason} -> - ?LOG(error, "create rule failed: ~0p", [Reason]), + ?SLOG(error, #{msg => "create_rule_failed", reason => Reason}), {400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}} end). rule_test(post, #{body := Params}) -> ?CHECK_PARAMS(Params, rule_test, case emqx_rule_sqltester:test(CheckedParams) of {ok, Result} -> {200, Result}; - {error, nomatch} -> {412, #{code => 'NOT_MATCH', message => <<"SQL Not Match">>}}; - {error, Reason} -> - ?LOG(error, "rule test failed: ~0p", [Reason]), - {400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}} + {error, nomatch} -> {412, #{code => 'NOT_MATCH', message => <<"SQL Not Match">>}} end). crud_rules_by_id(get, #{bindings := #{id := Id}}) -> @@ -280,7 +273,9 @@ crud_rules_by_id(put, #{bindings := #{id := Id}, body := Params0}) -> {error, not_found} -> {404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}}; {error, Reason} -> - ?LOG(error, "update rule failed: ~0p", [Reason]), + ?SLOG(error, #{msg => "update_rule_failed", + id => Id, + reason => Reason}), {400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}} end); diff --git a/apps/emqx_rule_engine/src/emqx_rule_outputs.erl b/apps/emqx_rule_engine/src/emqx_rule_outputs.erl index e322aba9b..cd59d3fa5 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_outputs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_outputs.erl @@ -31,7 +31,7 @@ console(Selected, #{metadata := #{rule_id := RuleId}} = Envs, _Args) -> republish(_Selected, #{topic := Topic, headers := #{republish_by := RuleId}, metadata := #{rule_id := RuleId}}, _Args) -> - ?LOG(error, "[republish] recursively republish detected, msg topic: ~p", [Topic]); + ?SLOG(error, #{msg => "recursive_republish_detected", topic => Topic}); %% republish a PUBLISH message republish(Selected, #{flags := Flags, metadata := #{rule_id := RuleId}}, @@ -44,7 +44,7 @@ republish(Selected, #{flags := Flags, metadata := #{rule_id := RuleId}}, Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected), QoS = replace_simple_var(QoSTks, Selected), Retain = replace_simple_var(RetainTks, Selected), - ?LOG(debug, "[republish] to: ~p, payload: ~p", [Topic, Payload]), + ?SLOG(debug, #{msg => "republish", topic => Topic, payload => Payload}), safe_publish(RuleId, Topic, QoS, Flags#{retain => Retain}, Payload); %% in case this is a "$events/" event @@ -58,7 +58,7 @@ republish(Selected, #{metadata := #{rule_id := RuleId}}, Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected), QoS = replace_simple_var(QoSTks, Selected), Retain = replace_simple_var(RetainTks, Selected), - ?LOG(debug, "[republish] to: ~p, payload: ~p", [Topic, Payload]), + ?SLOG(debug, #{msg => "republish", topic => Topic, payload => Payload}), safe_publish(RuleId, Topic, QoS, #{retain => Retain}, Payload). safe_publish(RuleId, Topic, QoS, Flags, Payload) -> @@ -79,4 +79,4 @@ replace_simple_var(Tokens, Data) when is_list(Tokens) -> [Var] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}), Var; replace_simple_var(Val, _Data) -> - Val. \ No newline at end of file + Val. diff --git a/apps/emqx_rule_engine/src/emqx_rule_registry.erl b/apps/emqx_rule_engine/src/emqx_rule_registry.erl index 8261149a7..370a72933 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_registry.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_registry.erl @@ -207,15 +207,15 @@ handle_call({remove_rules, Rules}, _From, State) -> {reply, ok, State}; handle_call(Req, _From, State) -> - ?LOG(error, "[RuleRegistry]: unexpected call - ~p", [Req]), + ?SLOG(error, #{msg => "unexpected_call", request => Req}), {reply, ignored, State}. handle_cast(Msg, State) -> - ?LOG(error, "[RuleRegistry]: unexpected cast ~p", [Msg]), + ?SLOG(error, #{msg => "unexpected_cast", request => Msg}), {noreply, State}. handle_info(Info, State) -> - ?LOG(error, "[RuleRegistry]: unexpected info ~p", [Info]), + ?SLOG(error, #{msg => "unexpected_info", request => Info}), {noreply, State}. terminate(_Reason, _State) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 836b545a2..aafc6cddc 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -33,9 +33,9 @@ -compile({no_auto_import,[alias/1]}). --type(input() :: map()). --type(alias() :: atom()). --type(collection() :: {alias(), [term()]}). +-type input() :: map(). +-type alias() :: atom(). +-type collection() :: {alias(), [term()]}. -define(ephemeral_alias(TYPE, NAME), iolist_to_binary(io_lib:format("_v_~s_~p_~p", [TYPE, NAME, erlang:system_time()]))). @@ -55,20 +55,24 @@ apply_rules([Rule = #rule{id = RuleID}|More], Input) -> catch %% ignore the errors if select or match failed _:{select_and_transform_error, Error} -> - ?LOG(warning, "SELECT clause exception for ~s failed: ~p", - [RuleID, Error]); + ?SLOG(warning, #{msg => "SELECT_clause_exception", + rule_id => RuleID, reason => Error}); _:{match_conditions_error, Error} -> - ?LOG(warning, "WHERE clause exception for ~s failed: ~p", - [RuleID, Error]); + ?SLOG(warning, #{msg => "WHERE_clause_exception", + rule_id => RuleID, reason => Error}); _:{select_and_collect_error, Error} -> - ?LOG(warning, "FOREACH clause exception for ~s failed: ~p", - [RuleID, Error]); + ?SLOG(warning, #{msg => "FOREACH_clause_exception", + rule_id => RuleID, reason => Error}); _:{match_incase_error, Error} -> - ?LOG(warning, "INCASE clause exception for ~s failed: ~p", - [RuleID, Error]); - _:Error:StkTrace -> - ?LOG(error, "Apply rule ~s failed: ~p. Stacktrace:~n~p", - [RuleID, Error, StkTrace]) + ?SLOG(warning, #{msg => "INCASE_clause_exception", + rule_id => RuleID, reason => Error}); + Class:Error:StkTrace -> + ?SLOG(error, #{msg => "apply_rule_failed", + rule_id => RuleID, + exception => Class, + reason => Error, + stacktrace => StkTrace + }) end, apply_rules(More, Input). @@ -166,7 +170,6 @@ select_and_collect([Field|More], Input, {Output, LastKV}) -> {nested_put(Key, Val, Output), LastKV}). %% Filter each item got from FOREACH --dialyzer({nowarn_function, filter_collection/4}). filter_collection(Input, InCase, DoEach, {CollKey, CollVal}) -> lists:filtermap( fun(Item) -> @@ -235,11 +238,16 @@ handle_output(OutId, Selected, Envs) -> do_handle_output(OutId, Selected, Envs) catch Err:Reason:ST -> - ?LOG(warning, "Output to ~p failed, ~p", [OutId, {Err, Reason, ST}]) + ?SLOG(error, #{msg => "output_failed", + output => OutId, + exception => Err, + reason => Reason, + stacktrace => ST + }) end. do_handle_output(#{type := bridge, target := ChannelId}, Selected, _Envs) -> - ?LOG(debug, "output to bridge: ~p", [ChannelId]), + ?SLOG(debug, #{msg => "output to bridge", channel_id => ChannelId}), emqx_bridge:send_message(ChannelId, Selected); do_handle_output(#{type := func, target := Func} = Out, Selected, Envs) -> erlang:apply(Func, [Selected, Envs, maps:get(args, Out, #{})]); diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqlparser.erl b/apps/emqx_rule_engine/src/emqx_rule_sqlparser.erl index b7833234b..02c5a02e9 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqlparser.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqlparser.erl @@ -36,22 +36,18 @@ -opaque(select() :: #select{}). --type(const() :: {const, number()|binary()}). +-type const() :: {const, number()|binary()}. --type(variable() :: binary() | list(binary())). +-type variable() :: binary() | list(binary()). --type(alias() :: binary() | list(binary())). +-type alias() :: binary() | list(binary()). --type(field() :: const() | variable() +-type field() :: const() | variable() | {as, field(), alias()} - | {'fun', atom(), list(field())}). + | {'fun', atom(), list(field())}. -export_type([select/0]). -%% Dialyzer gives up on the generated code. -%% probably due to stack depth, or inlines. --dialyzer({nowarn_function, [parse/1]}). - %% Parse one select statement. -spec(parse(string() | binary()) -> {ok, select()} | {error, term()}). parse(Sql) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index 620361c0c..ec263b35a 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -22,16 +22,7 @@ , get_selected_data/3 ]). -%% Dialyzer gives up on the generated code. -%% probably due to stack depth, or inlines. --dialyzer({nowarn_function, [test/1, - test_rule/4, - flatten/1, - fill_default_values/2, - envs_examp/1 - ]}). - --spec(test(#{}) -> {ok, map() | list()} | {error, term()}). +-spec test(#{sql := binary(), context := map()}) -> {ok, map() | list()} | {error, nomatch}. test(#{sql := Sql, context := Context}) -> {ok, Select} = emqx_rule_sqlparser:parse(Sql), InTopic = maps:get(topic, Context, <<>>), @@ -63,7 +54,8 @@ test_rule(Sql, Select, Context, EventTopics) -> doeach => emqx_rule_sqlparser:select_doeach(Select), incase => emqx_rule_sqlparser:select_incase(Select), conditions => emqx_rule_sqlparser:select_where(Select) - } + }, + created_at = erlang:system_time(millisecond) }, FullContext = fill_default_values(hd(EventTopics), emqx_rule_maps:atom_key_map(Context)), try @@ -76,7 +68,7 @@ test_rule(Sql, Select, Context, EventTopics) -> end. get_selected_data(Selected, _Envs, _Args) -> - Selected. + Selected. is_publish_topic(<<"$events/", _/binary>>) -> false; is_publish_topic(_Topic) -> true. @@ -86,8 +78,9 @@ flatten([D1]) -> D1; flatten([D1 | L]) when is_list(D1) -> D1 ++ flatten(L). -echo_action(Data, _Envs) -> - ?LOG(info, "Testing Rule SQL OK"), Data. +echo_action(Data, Envs) -> + ?SLOG(debug, #{msg => "testing_rule_sql_ok", data => Data, envs => Envs}), + Data. fill_default_values(Event, Context) -> maps:merge(envs_examp(Event), Context). diff --git a/rebar.config b/rebar.config index c438bbe35..ca8dd3e22 100644 --- a/rebar.config +++ b/rebar.config @@ -56,7 +56,7 @@ , {replayq, "0.3.3"} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.4.3"}}} - , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.2"}}} + , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.4"}}} , {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x , {getopt, "1.0.2"} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}}