feat(rule-engine): update the configuration file to hocon

This commit is contained in:
Turtle 2021-06-28 11:30:38 +08:00 committed by turtleDeng
parent 97fa19f244
commit 54aeacee14
10 changed files with 66 additions and 196 deletions

View File

@ -1,42 +1,6 @@
##====================================================================
## Rule Engine for EMQ X R4.0
## Rule Engine for EMQ X R5.0
##====================================================================
rule_engine.ignore_sys_message = on
## Event Messages
##
## If enabled (on), rule engine publishes the event as an MQTT message
## with topic='$events/<event-name>' on the occurrence of an emqx event.
##
## If disabled, rule engine stops publishing the event messages, but
## the event message can still be processed by the rule SQL. e.g. rule SQL:
##
## SELECT * FROM "$events/client_connected"
##
## will still work even if 'rule_engine.events.client_connected' is set to 'off'
##
## EMQ Event to event message mapping:
##
## - client.connected -> $events/client_connected
## - client.disconnected -> $events/client_disconnected
## - session.subscribed -> $events/session_subscribed
## - session.unsubscribed -> $events/session_unsubscribed
## - message.delivered -> $events/message_delivered
## - message.acked -> $events/message_acked
## - message.dropped -> $events/message_dropped
##
## Config Value Format: Toggle, QoS-Level
##
## Toggle: on/off
##
## QoS-Level: qos0/qos1/qos2
#rule_engine.events.client_connected = "on, qos1"
rule_engine.events.client_connected = off
rule_engine.events.client_disconnected = off
rule_engine.events.session_subscribed = off
rule_engine.events.session_unsubscribed = off
rule_engine.events.message_delivered = off
rule_engine.events.message_acked = off
rule_engine.events.message_dropped = off
emqx_rule_engine:{
ignore_sys_message: true
}

View File

@ -1,61 +0,0 @@
%%-*- mode: erlang -*-
%% emqx_rule_engine config mapping
{mapping, "rule_engine.ignore_sys_message", "emqx_rule_engine.ignore_sys_message", [
{default, on},
{datatype, flag}
]}.
{mapping, "rule_engine.events.$name", "emqx_rule_engine.events", [
{default, "off, qos1"},
{datatype, string}
]}.
{translation, "emqx_rule_engine.events", fun(Conf) ->
SupportedHooks =
[ 'client.connected'
, 'client.disconnected'
, 'session.subscribed'
, 'session.unsubscribed'
, 'message.delivered'
, 'message.acked'
, 'message.dropped'
],
HookPoint = fun(Event) ->
case string:split(Event, "_") of
[Prefix, Name] ->
Point = list_to_atom(lists:append([Prefix, ".", Name])),
case lists:member(Point, SupportedHooks) of
true -> Point;
false -> error({unsupported_event, Event})
end;
[_] ->
error({invalid_event, Event})
end
end,
QoS = fun ("qos"++Level = QoSLevel) ->
case list_to_integer(Level) of
QoSL when QoSL =:= 0; QoSL =:= 1; QoSL =:= 2 ->
QoSL;
_ ->
error({invalid_qos_level, QoSLevel})
end;
(QoSLevel) ->
error({invalid_qos, QoSLevel})
end,
lists:foldl(
fun({EE=[_,"events",EvtName], Val}, Acc) ->
case string:split(string:trim(Val), ",", all) of
["on"++_, Snd] ->
[{HookPoint(EvtName), on, QoS(string:trim(Snd))} | Acc];
["on"++_] ->
[{HookPoint(EvtName), on, 1} | Acc];
[_] ->
Acc
end;
({_, _}, Acc) -> Acc
end, [], cuttlefish_variable:filter_by_prefix("rule_engine.events", Conf))
end}.

View File

@ -1,6 +1,6 @@
{application, emqx_rule_engine,
[{description, "EMQ X Rule Engine"},
{vsn, "4.3.3"}, % strict semver, bump manually!
{vsn, "5.0.0"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_rule_engine_sup, emqx_rule_registry]},
{applications, [kernel,stdlib,rulesql,getopt]},

View File

@ -1,38 +0,0 @@
%% -*-: erlang -*-
{"4.3.3",
[ {"4.3.0",
[ {load_module, emqx_rule_funcs, brutal_purge, soft_purge, []}
, {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
, {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
, {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
]},
{"4.3.1",
[ {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
, {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
, {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
]},
{"4.3.2",
[ {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
, {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
]},
{<<".*">>, []}
],
[
{"4.3.0",
[ {load_module, emqx_rule_funcs, brutal_purge, soft_purge, []}
, {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
, {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
, {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
]},
{"4.3.1",
[ {load_module, emqx_rule_engine, brutal_purge, soft_purge, []}
, {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
, {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
]},
{"4.3.2",
[ {load_module, emqx_rule_registry, brutal_purge, soft_purge, []}
, {apply, {emqx_stats, cancel_update, [rule_registery_stats]}}
]},
{<<".*">>, []}
]
}.

View File

@ -0,0 +1,29 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_rule_engine_schema).
-include_lib("typerefl/include/types.hrl").
-behaviour(hocon_schema).
-export([ structs/0
, fields/1]).
structs() -> ["emqx_rule_engine"].
fields("emqx_rule_engine") ->
[{ignore_sys_message, emqx_schema:t(boolean(), undefined, true)}].

View File

@ -63,9 +63,10 @@
-endif.
load(Topic) ->
IgnoreSys = proplists:get_value(ignore_sys_message, env(), true),
HookPoint = event_name(Topic),
emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint),
[hook_conf(HookPoint, env())]}).
[#{ignore_sys_message => IgnoreSys}]}).
unload() ->
lists:foreach(fun(HookPoint) ->
@ -97,26 +98,26 @@ on_message_publish(Message = #message{topic = Topic}, _Env) ->
{ok, Message}.
on_client_connected(ClientInfo, ConnInfo, Env) ->
may_publish_and_apply('client.connected',
apply_event('client.connected',
fun() -> eventmsg_connected(ClientInfo, ConnInfo) end, Env).
on_client_disconnected(ClientInfo, Reason, ConnInfo, Env) ->
may_publish_and_apply('client.disconnected',
apply_event('client.disconnected',
fun() -> eventmsg_disconnected(ClientInfo, ConnInfo, Reason) end, Env).
on_session_subscribed(ClientInfo, Topic, SubOpts, Env) ->
may_publish_and_apply('session.subscribed',
apply_event('session.subscribed',
fun() -> eventmsg_sub_or_unsub('session.subscribed', ClientInfo, Topic, SubOpts) end, Env).
on_session_unsubscribed(ClientInfo, Topic, SubOpts, Env) ->
may_publish_and_apply('session.unsubscribed',
apply_event('session.unsubscribed',
fun() -> eventmsg_sub_or_unsub('session.unsubscribed', ClientInfo, Topic, SubOpts) end, Env).
on_message_dropped(Message = #message{flags = #{sys := true}},
_, _, #{ignore_sys_message := true}) ->
{ok, Message};
on_message_dropped(Message, _, Reason, Env) ->
may_publish_and_apply('message.dropped',
apply_event('message.dropped',
fun() -> eventmsg_dropped(Message, Reason) end, Env),
{ok, Message}.
@ -124,7 +125,7 @@ on_message_delivered(_ClientInfo, Message = #message{flags = #{sys := true}},
#{ignore_sys_message := true}) ->
{ok, Message};
on_message_delivered(ClientInfo, Message, Env) ->
may_publish_and_apply('message.delivered',
apply_event('message.delivered',
fun() -> eventmsg_delivered(ClientInfo, Message) end, Env),
{ok, Message}.
@ -132,7 +133,7 @@ on_message_acked(_ClientInfo, Message = #message{flags = #{sys := true}},
#{ignore_sys_message := true}) ->
{ok, Message};
on_message_acked(ClientInfo, Message, Env) ->
may_publish_and_apply('message.acked',
apply_event('message.acked',
fun() -> eventmsg_acked(ClientInfo, Message) end, Env),
{ok, Message}.
@ -297,31 +298,15 @@ with_basic_columns(EventName, Data) when is_map(Data) ->
}.
%%--------------------------------------------------------------------
%% Events publishing and rules applying
%% rules applying
%%--------------------------------------------------------------------
may_publish_and_apply(EventName, GenEventMsg, #{enabled := true, qos := QoS}) ->
EventTopic = event_topic(EventName),
EventMsg = GenEventMsg(),
case emqx_json:safe_encode(EventMsg) of
{ok, Payload} ->
_ = emqx_broker:safe_publish(make_msg(QoS, EventTopic, Payload)),
ok;
{error, _Reason} ->
?LOG(error, "Failed to encode event msg for ~p, msg: ~p", [EventName, EventMsg])
end,
emqx_rule_runtime:apply_rules(emqx_rule_registry:get_rules_for(EventTopic), EventMsg);
may_publish_and_apply(EventName, GenEventMsg, _Env) ->
apply_event(EventName, GenEventMsg, _Env) ->
EventTopic = event_topic(EventName),
case emqx_rule_registry:get_rules_for(EventTopic) of
[] -> ok;
Rules -> emqx_rule_runtime:apply_rules(Rules, GenEventMsg())
end.
make_msg(QoS, Topic, Payload) ->
emqx_message:set_flags(#{sys => true, event => true},
emqx_message:make(emqx_events, QoS, Topic, iolist_to_binary(Payload))).
%%--------------------------------------------------------------------
%% Columns
%%--------------------------------------------------------------------
@ -559,14 +544,6 @@ columns_with_exam('session.unsubscribed') ->
%% Helper functions
%%--------------------------------------------------------------------
hook_conf(HookPoint, Env) ->
Events = proplists:get_value(events, Env, []),
IgnoreSys = proplists:get_value(ignore_sys_message, Env, true),
case lists:keyfind(HookPoint, 1, Events) of
{_, on, QoS} -> #{enabled => true, qos => QoS, ignore_sys_message => IgnoreSys};
_ -> #{enabled => false, qos => 1, ignore_sys_message => IgnoreSys}
end.
hook_fun(Event) ->
case string:split(atom_to_list(Event), ".") of
[Prefix, Name] ->

View File

@ -149,11 +149,11 @@ groups() ->
init_per_suite(Config) ->
ok = ekka_mnesia:start(),
ok = emqx_rule_registry:mnesia(boot),
start_apps(),
ok = emqx_ct_helpers:start_apps([emqx_rule_engine], fun set_special_configs/1),
Config.
end_per_suite(_Config) ->
stop_apps(),
emqx_ct_helpers:stop_apps([emqx_rule_engine]),
ok.
on_resource_create(_id, _) -> #{}.
@ -2545,21 +2545,6 @@ init_events_counters() ->
%%------------------------------------------------------------------------------
%% Start Apps
%%------------------------------------------------------------------------------
stop_apps() ->
stopped = mnesia:stop(),
[application:stop(App) || App <- [emqx_rule_engine, emqx]].
start_apps() ->
[start_apps(App, SchemaFile, ConfigFile) ||
{App, SchemaFile, ConfigFile}
<- [{emqx, emqx_schema, deps_path(emqx, "etc/emqx.conf")},
{emqx_rule_engine, local_path("priv/emqx_rule_engine.schema"),
local_path("etc/emqx_rule_engine.conf")}]].
start_apps(App, Schema, ConfigFile) ->
emqx_ct_helpers:start_app(App, Schema, ConfigFile, fun set_special_configs/1).
deps_path(App, RelativePath) ->
Path0 = code:lib_dir(App),
Path = case file:read_link(Path0) of

View File

@ -1,3 +1,19 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_telemetry_schema).
-include_lib("typerefl/include/types.hrl").

View File

@ -2,5 +2,4 @@
{emqx_dashboard, true}.
{emqx_modules, {{enable_plugin_emqx_modules}}}.
{emqx_retainer, {{enable_plugin_emqx_retainer}}}.
{emqx_rule_engine, {{enable_plugin_emqx_rule_engine}}}.
{emqx_bridge_mqtt, {{enable_plugin_emqx_bridge_mqtt}}}.

View File

@ -192,8 +192,7 @@ overlay_vars_rel(RelType) ->
cloud -> "vm.args";
edge -> "vm.args.edge"
end,
[ {enable_plugin_emqx_rule_engine, RelType =:= cloud}
, {enable_plugin_emqx_bridge_mqtt, RelType =:= edge}
[ {enable_plugin_emqx_bridge_mqtt, RelType =:= edge}
, {enable_plugin_emqx_modules, false} %% modules is not a plugin in ce
, {enable_plugin_emqx_retainer, true}
, {vm_args_file, VmArgs}
@ -254,6 +253,7 @@ relx_apps(ReleaseType) ->
, emqx_resource
, emqx_connector
, emqx_data_bridge
, emqx_rule_engine
]
++ [emqx_telemetry || not is_enterprise()]
++ [emqx_modules || not is_enterprise()]
@ -286,7 +286,6 @@ relx_plugin_apps(ReleaseType) ->
, emqx_stomp
, emqx_authentication
, emqx_web_hook
, emqx_rule_engine
, emqx_statsd
]
++ relx_plugin_apps_per_rel(ReleaseType)