Merge pull request #4214 from emqx/resolve-dev/v4.3.0-to-dev/v5.0-conflict

Auto-pull-request-on-2021-02-20
This commit is contained in:
Zaiming Shi 2021-02-22 20:14:46 +01:00 committed by GitHub
commit 4293d15cf9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 480 additions and 59 deletions

View File

@ -36,10 +36,14 @@ auth.jwt.verify_claims.enable = off
## The checklist of claims to validate
##
## Value: String
## auth.jwt.verify_claims.$name = expected
## Configuration format: auth.jwt.verify_claims.$name = $expected
## - $name: the name of the field in the JWT payload to be verified
## - $expected: the expected value
##
## Variables:
## - %u: username
## - %c: clientid
## The available placeholders for $expected:
## - %u: username
## - %c: clientid
##
## For example, to verify that the username in the JWT payload is the same
## as the client (MQTT protocol) username
#auth.jwt.verify_claims.username = "%u"

View File

@ -41,6 +41,15 @@ lwm2m.topics.register = "up/resp"
# The topic to which the lwm2m client's update message is published
lwm2m.topics.update = "up/resp"
# When publish the update message.
#
# Can be one of:
# - object_list_changed: only if the object list is changed
# - always: always publish the update message
#
# Defaults to object_list_changed
#lwm2m.publish_update_when = object_list_changed
# Dir where the object definition files can be found
lwm2m.xml_dir = "{{ platform_etc_dir }}/lwm2m_xml"

View File

@ -112,6 +112,11 @@ end}.
{default, "lwm2m/%e/up/resp"}
]}.
{mapping, "lwm2m.publish_update_when", "emqx_lwm2m.publish_update_when", [
{datatype, {enum, [object_list_changed, always]}},
{default, object_list_changed}
]}.
{translation, "emqx_lwm2m.topics", fun(Conf) ->
Topics = cuttlefish_variable:filter_by_prefix("lwm2m.topics", Conf),
Opts = lists:map(fun({[_,_, Key], Value}) ->

View File

@ -101,10 +101,12 @@ get_lwm2m_opts(Envs) ->
AutoObserve = proplists:get_value(auto_observe, Envs, []),
QmodeTimeWindow = proplists:get_value(qmode_time_window, Envs, []),
Topics = proplists:get_value(topics, Envs, []),
PublishUpdateWhen = proplists:get_value(publish_update_when, Envs, object_list_changed),
[{lifetime_max, LifetimeMax},
{lifetime_min, LifetimeMin},
{mountpoint, list_to_binary(Mountpoint)},
{port, Sockport},
{auto_observe, AutoObserve},
{qmode_time_window, QmodeTimeWindow},
{publish_update_when, PublishUpdateWhen},
{topics, Topics}].

View File

@ -115,15 +115,23 @@ post_init(Lwm2mState = #lwm2m_state{endpoint_name = _EndpointName,
_ = send_to_broker(<<"register">>, #{<<"data">> => RegInfo}, Lwm2mState),
Lwm2mState#lwm2m_state{mqtt_topic = Topic}.
update_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer, register_info = RegInfo,
coap_pid = CoapPid}) ->
update_reg_info(NewRegInfo, Lwm2mState = #lwm2m_state{
life_timer = LifeTimer, register_info = RegInfo,
coap_pid = CoapPid}) ->
UpdatedRegInfo = maps:merge(RegInfo, NewRegInfo),
%% - report the registration info update, but only when objectList is updated.
case NewRegInfo of
#{<<"objectList">> := _} ->
case proplists:get_value(publish_update_when,
lwm2m_coap_responder:options(), object_list_changed) of
always ->
send_to_broker(<<"update">>, #{<<"data">> => UpdatedRegInfo}, Lwm2mState);
_ -> ok
object_list_changed ->
%% - report the registration info update, but only when objectList is updated.
case NewRegInfo of
#{<<"objectList">> := _} ->
send_to_broker(<<"update">>, #{<<"data">> => UpdatedRegInfo}, Lwm2mState);
_ -> ok
end
end,
%% - flush cached donwlink commands

View File

@ -16,6 +16,8 @@
-define(APP, emqx_rule_engine).
-define(KV_TAB, '@rule_engine_db').
-type(maybe(T) :: T | undefined).
-type(rule_id() :: binary()).

View File

@ -65,6 +65,8 @@
, action_instance_params/0
]).
-define(T_RETRY, 60000).
%%------------------------------------------------------------------------------
%% Load resource/action providers from all available applications
%%------------------------------------------------------------------------------
@ -217,7 +219,7 @@ delete_rule(RuleId) ->
catch
Error:Reason:ST ->
?LOG(error, "clear_rule ~p failed: ~p", [RuleId, {Error, Reason, ST}]),
refresh_actions(Actions, fun(_) -> true end)
refresh_actions(Actions)
end;
not_found ->
ok
@ -388,16 +390,8 @@ refresh_resource(Type) when is_atom(Type) ->
lists:foreach(fun refresh_resource/1,
emqx_rule_registry:get_resources_by_type(Type));
refresh_resource(#resource{id = ResId, config = Config, type = Type}) ->
{ok, #resource_type{on_create = {M, F}}} = emqx_rule_registry:find_resource_type(Type),
try cluster_call(init_resource, [M, F, ResId, Config])
catch Error:Reason:ST ->
logger:critical(
"Can not re-stablish resource ~p: ~0p. The resource is disconnected."
"Fix the issue and establish it manually.\n"
"Stacktrace: ~0p",
[ResId, {Error, Reason}, ST])
end.
refresh_resource(#resource{id = ResId}) ->
emqx_rule_monitor:ensure_resource_retrier(ResId, ?T_RETRY).
-spec(refresh_rules() -> ok).
refresh_rules() ->
@ -412,9 +406,10 @@ refresh_rules() ->
end
end, emqx_rule_registry:get_rules()).
refresh_rule(#rule{id = RuleId, actions = Actions}) ->
refresh_rule(#rule{id = RuleId, for = Topics, actions = Actions}) ->
ok = emqx_rule_metrics:create_rule_metrics(RuleId),
refresh_actions(Actions, fun(_) -> true end).
lists:foreach(fun emqx_rule_events:load/1, Topics),
refresh_actions(Actions).
-spec(refresh_resource_status() -> ok).
refresh_resource_status() ->
@ -529,10 +524,7 @@ cluster_call(Func, Args) ->
end.
init_resource(Module, OnCreate, ResId, Config) ->
Params = ?RAISE(
begin
Module:OnCreate(ResId, Config)
end,
Params = ?RAISE(Module:OnCreate(ResId, Config),
{{Module, OnCreate}, {_EXCLASS_, _EXCPTION_, _ST_}}),
ResParams = #resource_params{id = ResId,
params = Params,

View File

@ -24,8 +24,6 @@
-export([stop/1]).
-define(APP, emqx_rule_engine).
start(_Type, _Args) ->
{ok, Sup} = emqx_rule_engine_sup:start_link(),
_ = emqx_rule_engine_sup:start_locker(),
@ -33,13 +31,8 @@ start(_Type, _Args) ->
ok = emqx_rule_engine:refresh_resources(),
ok = emqx_rule_engine:refresh_rules(),
ok = emqx_rule_engine_cli:load(),
ok = emqx_rule_events:load(env()),
{ok, Sup}.
stop(_State) ->
ok = emqx_rule_events:unload(env()),
ok = emqx_rule_events:unload(),
ok = emqx_rule_engine_cli:unload().
env() ->
application:get_all_env(?APP)
.

View File

@ -27,7 +27,7 @@
-export([init/1]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
Opts = [public, named_table, set, {read_concurrency, true}],
@ -45,7 +45,13 @@ init([]) ->
shutdown => 5000,
type => worker,
modules => [emqx_rule_metrics]},
{ok, {{one_for_one, 10, 10}, [Registry, Metrics]}}.
Monitor = #{id => emqx_rule_monitor,
start => {emqx_rule_monitor, start_link, []},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [emqx_rule_monitor]},
{ok, {{one_for_one, 10, 10}, [Registry, Metrics, Monitor]}}.
start_locker() ->
Locker = #{id => emqx_rule_locker,

View File

@ -16,12 +16,14 @@
-module(emqx_rule_events).
-include("rule_engine.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-logger_header("[RuleEvents]").
-export([ load/1
, unload/0
, unload/1
, event_name/1
, eventmsg_publish/1
@ -60,16 +62,22 @@
]).
-endif.
load(Env) ->
lists:foreach(
fun(HookPoint) ->
ok = emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint), [hook_conf(HookPoint, Env)]})
end, ?SUPPORTED_HOOK).
load(Topic) ->
HookPoint = event_name(Topic),
emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint),
[hook_conf(HookPoint, env())]}).
unload(_Env) ->
[emqx_hooks:del(HookPoint, {?MODULE, hook_fun(HookPoint)})
|| HookPoint <- ?SUPPORTED_HOOK],
ok.
unload() ->
lists:foreach(fun(HookPoint) ->
emqx_hooks:del(HookPoint, {?MODULE, hook_fun(HookPoint)})
end, ?SUPPORTED_HOOK).
unload(Topic) ->
HookPoint = event_name(Topic),
emqx_hooks:del(HookPoint, {?MODULE, hook_fun(HookPoint)}).
env() ->
application:get_all_env(?APP).
%%--------------------------------------------------------------------
%% Callbacks
@ -574,17 +582,19 @@ reason(_) -> internal_error.
ntoa(undefined) -> undefined;
ntoa({IpAddr, Port}) ->
iolist_to_binary([inet:ntoa(IpAddr),":",integer_to_list(Port)]);
iolist_to_binary([inet:ntoa(IpAddr), ":", integer_to_list(Port)]);
ntoa(IpAddr) ->
iolist_to_binary(inet:ntoa(IpAddr)).
event_name(<<"$events/client_connected", _/binary>>) -> 'client.connected';
event_name(<<"$events/client_disconnected", _/binary>>) -> 'client.disconnected';
event_name(<<"$events/session_subscribed", _/binary>>) -> 'session.subscribed';
event_name(<<"$events/session_unsubscribed", _/binary>>) -> 'session.unsubscribed';
event_name(<<"$events/session_unsubscribed", _/binary>>) ->
'session.unsubscribed';
event_name(<<"$events/message_delivered", _/binary>>) -> 'message.delivered';
event_name(<<"$events/message_acked", _/binary>>) -> 'message.acked';
event_name(<<"$events/message_dropped", _/binary>>) -> 'message.dropped'.
event_name(<<"$events/message_dropped", _/binary>>) -> 'message.dropped';
event_name(_) -> 'message.publish'.
event_topic('client.connected') -> <<"$events/client_connected">>;
event_topic('client.disconnected') -> <<"$events/client_disconnected">>;

View File

@ -16,6 +16,8 @@
-module(emqx_rule_funcs).
-include("rule_engine.hrl").
%% IoT Funcs
-export([ msgid/0
, qos/0
@ -91,6 +93,8 @@
, int/1
, float/1
, map/1
, bin2hexstr/1
, hexstr2bin/1
]).
%% Data Type Validation Funcs
@ -169,6 +173,8 @@
, base64_decode/1
, json_decode/1
, json_encode/1
, term_decode/1
, term_encode/1
]).
%% Date functions
@ -178,6 +184,16 @@
, now_timestamp/1
]).
%% Proc Dict Func
-export([ proc_dict_get/1
, proc_dict_put/2
, proc_dict_del/1
, kv_store_get/1
, kv_store_get/2
, kv_store_put/2
, kv_store_del/1
]).
-export(['$handle_undefined_function'/2]).
-compile({no_auto_import,
@ -495,6 +511,14 @@ float(Data) ->
map(Data) ->
emqx_rule_utils:map(Data).
bin2hexstr(Bin) when is_binary(Bin) ->
IntL = binary_to_list(Bin),
list_to_binary([io_lib:format("~2.16.0B", [Int]) || Int <- IntL]).
hexstr2bin(Str) when is_binary(Str) ->
list_to_binary([binary_to_integer(W, 16) || <<W:2/binary>> <= Str]).
%%------------------------------------------------------------------------------
%% NULL Funcs
%%------------------------------------------------------------------------------
@ -804,7 +828,7 @@ hexstring(<<X:256/big-unsigned-integer>>) ->
iolist_to_binary(io_lib:format("~64.16.0b", [X])).
%%------------------------------------------------------------------------------
%% Base64 Funcs
%% Data encode and decode Funcs
%%------------------------------------------------------------------------------
base64_encode(Data) when is_binary(Data) ->
@ -819,6 +843,40 @@ json_encode(Data) ->
json_decode(Data) ->
emqx_json:decode(Data, [return_maps]).
term_encode(Term) ->
erlang:term_to_binary(Term).
term_decode(Data) when is_binary(Data) ->
erlang:binary_to_term(Data).
%%------------------------------------------------------------------------------
%% Dict Funcs
%%------------------------------------------------------------------------------
-define(DICT_KEY(KEY), {'@rule_engine', KEY}).
proc_dict_get(Key) ->
erlang:get(?DICT_KEY(Key)).
proc_dict_put(Key, Val) ->
erlang:put(?DICT_KEY(Key), Val).
proc_dict_del(Key) ->
erlang:erase(?DICT_KEY(Key)).
kv_store_put(Key, Val) ->
ets:insert(?KV_TAB, {Key, Val}).
kv_store_get(Key) ->
kv_store_get(Key, undefined).
kv_store_get(Key, Default) ->
case ets:lookup(?KV_TAB, Key) of
[{_, Val}] -> Val;
_ -> Default
end.
kv_store_del(Key) ->
ets:delete(?KV_TAB, Key).
%%--------------------------------------------------------------------
%% Date functions
%%--------------------------------------------------------------------

View File

@ -0,0 +1,117 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_rule_monitor).
-behavior(gen_server).
-include("rule_engine.hrl").
-include_lib("emqx/include/logger.hrl").
-logger_header("[Rule Monitor]").
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-export([ start_link/0
, stop/0
, ensure_resource_retrier/2
, retry_loop/3
]).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
stop() ->
gen_server:stop(?MODULE).
init([]) ->
_ = erlang:process_flag(trap_exit, true),
{ok, #{retryers => #{}}}.
ensure_resource_retrier(ResId, Interval) ->
gen_server:cast(?MODULE, {create_restart_handler, resource, ResId, Interval}).
handle_call(_Msg, _From, State) ->
{reply, ok, State}.
handle_cast({create_restart_handler, Tag, Obj, Interval}, State) ->
Objects = maps:get(Tag, State, #{}),
NewState = case maps:find(Obj, Objects) of
error ->
update_object(Tag, Obj,
create_restart_handler(Tag, Obj, Interval), State);
{ok, _Pid} ->
State
end,
{noreply, NewState};
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({'EXIT', Pid, Reason}, State = #{retryers := Retryers}) ->
case maps:take(Pid, Retryers) of
{{Tag, Obj}, Retryers2} ->
Objects = maps:get(Tag, State, #{}),
{noreply, State#{Tag => maps:remove(Obj, Objects),
retryers => Retryers2}};
error ->
?LOG(error, "got unexpected proc down: ~p ~p", [Pid, Reason]),
{noreply, State}
end;
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
update_object(Tag, Obj, Retryer, State) ->
Objects = maps:get(Tag, State, #{}),
Retryers = maps:get(retryers, State, #{}),
State#{
Tag => Objects#{Obj => Retryer},
retryers => Retryers#{Retryer => {Tag, Obj}}
}.
create_restart_handler(Tag, Obj, Interval) ->
?LOG(info, "keep restarting ~p ~p, interval: ~p", [Tag, Obj, Interval]),
%% spawn a dedicated process to handle the restarting asynchronously
spawn_link(?MODULE, retry_loop, [Tag, Obj, Interval]).
retry_loop(resource, ResId, Interval) ->
case emqx_rule_registry:find_resource(ResId) of
{ok, #resource{type = Type, config = Config}} ->
try
{ok, #resource_type{on_create = {M, F}}} =
emqx_rule_registry:find_resource_type(Type),
emqx_rule_engine:init_resource(M, F, ResId, Config)
catch
Err:Reason:ST ->
?LOG(warning, "init_resource failed: ~p, ~0p",
[{Err, Reason}, ST]),
timer:sleep(Interval),
?MODULE:retry_loop(resource, ResId, Interval)
end;
not_found ->
ok
end.

View File

@ -26,6 +26,7 @@
%% Rule Management
-export([ get_rules/0
, get_rules_for/1
, get_rules_with_same_event/1
, get_rule/1
, add_rule/1
, add_rules/1
@ -91,6 +92,8 @@
, {?RES_TAB, 'resources.count', 'resources.max'}
]).
-define(T_CALL, 10000).
%%------------------------------------------------------------------------------
%% Mnesia bootstrap
%%------------------------------------------------------------------------------
@ -170,6 +173,15 @@ get_rules_for(Topic) ->
[Rule || Rule = #rule{for = For} <- get_rules(),
emqx_rule_utils:can_topic_match_oneof(Topic, For)].
-spec(get_rules_with_same_event(Topic :: binary()) -> list(emqx_rule_engine:rule())).
get_rules_with_same_event(Topic) ->
EventName = emqx_rule_events:event_name(Topic),
[Rule || Rule = #rule{for = For} <- get_rules(),
lists:any(fun(T) -> is_of_event_name(EventName, T) end, For)].
is_of_event_name(EventName, Topic) ->
EventName =:= emqx_rule_events:event_name(Topic).
-spec(get_rule(Id :: rule_id()) -> {ok, emqx_rule_engine:rule()} | not_found).
get_rule(Id) ->
case mnesia:dirty_read(?RULE_TAB, Id) of
@ -179,22 +191,23 @@ get_rule(Id) ->
-spec(add_rule(emqx_rule_engine:rule()) -> ok).
add_rule(Rule) when is_record(Rule, rule) ->
trans(fun insert_rule/1, [Rule]).
add_rules([Rule]).
-spec(add_rules(list(emqx_rule_engine:rule())) -> ok).
add_rules(Rules) ->
trans(fun lists:foreach/2, [fun insert_rule/1, Rules]).
gen_server:call(?REGISTRY, {add_rules, Rules}, ?T_CALL).
-spec(remove_rule(emqx_rule_engine:rule() | rule_id()) -> ok).
remove_rule(RuleOrId) ->
trans(fun delete_rule/1, [RuleOrId]).
remove_rules([RuleOrId]).
-spec(remove_rules(list(emqx_rule_engine:rule()) | list(rule_id())) -> ok).
remove_rules(Rules) ->
trans(fun lists:foreach/2, [fun delete_rule/1, Rules]).
gen_server:call(?REGISTRY, {remove_rules, Rules}, ?T_CALL).
%% @private
insert_rule(Rule = #rule{}) ->
insert_rule(Rule = #rule{for = Topics}) ->
lists:foreach(fun emqx_rule_events:load/1, Topics),
mnesia:write(?RULE_TAB, Rule, write).
%% @private
@ -203,7 +216,14 @@ delete_rule(RuleId) when is_binary(RuleId) ->
{ok, Rule} -> delete_rule(Rule);
not_found -> ok
end;
delete_rule(Rule = #rule{}) when is_record(Rule, rule) ->
delete_rule(Rule = #rule{id = Id, for = Topics}) ->
lists:foreach(fun(Topic) ->
case get_rules_with_same_event(Topic) of
[#rule{id = Id}] -> %% we are now deleting the last rule
emqx_rule_events:unload(Topic);
_ -> ok
end
end, Topics),
mnesia:delete_object(?RULE_TAB, Rule, write).
%%------------------------------------------------------------------------------
@ -387,8 +407,18 @@ delete_resource_type(Type) ->
init([]) ->
%% Enable stats timer
ok = emqx_stats:update_interval(rule_registery_stats, fun update_stats/0),
ets:new(?KV_TAB, [named_table, set, public, {write_concurrency, true},
{read_concurrency, true}]),
{ok, #{}}.
handle_call({add_rules, Rules}, _From, State) ->
trans(fun lists:foreach/2, [fun insert_rule/1, Rules]),
{reply, ok, State};
handle_call({remove_rules, Rules}, _From, State) ->
trans(fun lists:foreach/2, [fun delete_rule/1, Rules]),
{reply, ok, State};
handle_call(Req, _From, State) ->
?LOG(error, "[RuleRegistry]: unexpected call - ~p", [Req]),
{reply, ignored, State}.

View File

@ -69,7 +69,9 @@ groups() ->
t_resource_types_cli
]},
{funcs, [],
[t_topic_func]},
[t_topic_func,
t_kv_store
]},
{registry, [sequence],
[t_add_get_remove_rule,
t_add_get_remove_rules,
@ -77,6 +79,7 @@ groups() ->
t_update_rule,
t_get_rules_for,
t_get_rules_for_2,
t_get_rules_with_same_event,
t_add_get_remove_action,
t_add_get_remove_actions,
t_remove_actions_of,
@ -597,6 +600,14 @@ t_topic_func(_Config) ->
%%TODO:
ok.
t_kv_store(_) ->
undefined = emqx_rule_funcs:kv_store_get(<<"abc">>),
<<"not_found">> = emqx_rule_funcs:kv_store_get(<<"abc">>, <<"not_found">>),
emqx_rule_funcs:kv_store_put(<<"abc">>, 1),
1 = emqx_rule_funcs:kv_store_get(<<"abc">>),
emqx_rule_funcs:kv_store_del(<<"abc">>),
undefined = emqx_rule_funcs:kv_store_get(<<"abc">>).
%%------------------------------------------------------------------------------
%% Test cases for rule registry
%%------------------------------------------------------------------------------
@ -704,6 +715,39 @@ t_get_rules_for_2(_Config) ->
ok = emqx_rule_registry:remove_rules([<<"rule-debug-1">>, <<"rule-debug-2">>,<<"rule-debug-3">>, <<"rule-debug-4">>,<<"rule-debug-5">>, <<"rule-debug-6">>]),
ok.
t_get_rules_with_same_event(_Config) ->
PubT = <<"simple/1">>,
PubN = length(emqx_rule_registry:get_rules_with_same_event(PubT)),
?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/client_connected">>)),
?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/client_disconnected">>)),
?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/session_subscribed">>)),
?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/session_unsubscribed">>)),
?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/message_delivered">>)),
?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/message_acked">>)),
?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/message_dropped">>)),
ok = emqx_rule_registry:add_rules(
[make_simple_rule(<<"r1">>, <<"select * from \"simple/#\"">>, [<<"simple/#">>]),
make_simple_rule(<<"r2">>, <<"select * from \"abc/+\"">>, [<<"abc/+">>]),
make_simple_rule(<<"r3">>, <<"select * from \"$events/client_connected\"">>, [<<"$events/client_connected">>]),
make_simple_rule(<<"r4">>, <<"select * from \"$events/client_disconnected\"">>, [<<"$events/client_disconnected">>]),
make_simple_rule(<<"r5">>, <<"select * from \"$events/session_subscribed\"">>, [<<"$events/session_subscribed">>]),
make_simple_rule(<<"r6">>, <<"select * from \"$events/session_unsubscribed\"">>, [<<"$events/session_unsubscribed">>]),
make_simple_rule(<<"r7">>, <<"select * from \"$events/message_delivered\"">>, [<<"$events/message_delivered">>]),
make_simple_rule(<<"r8">>, <<"select * from \"$events/message_acked\"">>, [<<"$events/message_acked">>]),
make_simple_rule(<<"r9">>, <<"select * from \"$events/message_dropped\"">>, [<<"$events/message_dropped">>]),
make_simple_rule(<<"r10">>, <<"select * from \"t/1, $events/session_subscribed, $events/client_connected\"">>, [<<"t/1">>, <<"$events/session_subscribed">>, <<"$events/client_connected">>])
]),
?assertEqual(PubN + 3, length(emqx_rule_registry:get_rules_with_same_event(PubT))),
?assertEqual(2, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/client_connected">>))),
?assertEqual(1, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/client_disconnected">>))),
?assertEqual(2, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/session_subscribed">>))),
?assertEqual(1, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/session_unsubscribed">>))),
?assertEqual(1, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/message_delivered">>))),
?assertEqual(1, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/message_acked">>))),
?assertEqual(1, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/message_dropped">>))),
ok = emqx_rule_registry:remove_rules([<<"r1">>, <<"r2">>,<<"r3">>, <<"r4">>,<<"r5">>, <<"r6">>, <<"r7">>, <<"r8">>, <<"r9">>, <<"r10">>]),
ok.
t_add_get_remove_action(_Config) ->
ActionName0 = 'action-debug-0',
Action0 = make_simple_action(ActionName0),

View File

@ -143,6 +143,40 @@ t_bool(_) ->
?assertEqual(false, emqx_rule_funcs:bool(<<"false">>)),
?assertError({invalid_boolean, _}, emqx_rule_funcs:bool(3)).
t_proc_dict_put_get_del(_) ->
?assertEqual(undefined, emqx_rule_funcs:proc_dict_get(<<"abc">>)),
emqx_rule_funcs:proc_dict_put(<<"abc">>, 1),
?assertEqual(1, emqx_rule_funcs:proc_dict_get(<<"abc">>)),
emqx_rule_funcs:proc_dict_del(<<"abc">>),
?assertEqual(undefined, emqx_rule_funcs:proc_dict_get(<<"abc">>)).
t_term_encode(_) ->
TestData = [<<"abc">>, #{a => 1}, #{<<"3">> => [1,2,4]}],
lists:foreach(fun(Data) ->
?assertEqual(Data,
emqx_rule_funcs:term_decode(
emqx_rule_funcs:term_encode(Data)))
end, TestData).
t_hexstr2bin(_) ->
?assertEqual(<<1,2>>, emqx_rule_funcs:hexstr2bin(<<"0102">>)),
?assertEqual(<<17,33>>, emqx_rule_funcs:hexstr2bin(<<"1121">>)).
t_bin2hexstr(_) ->
?assertEqual(<<"0102">>, emqx_rule_funcs:bin2hexstr(<<1,2>>)),
?assertEqual(<<"1121">>, emqx_rule_funcs:bin2hexstr(<<17,33>>)).
t_hex_convert(_) ->
?PROPTEST(hex_convert).
hex_convert() ->
?FORALL(L, list(range(0, 255)),
begin
AbitraryBin = list_to_binary(L),
AbitraryBin == emqx_rule_funcs:hexstr2bin(
emqx_rule_funcs:bin2hexstr(AbitraryBin))
end).
t_is_null(_) ->
?assertEqual(true, emqx_rule_funcs:is_null(undefined)),
?assertEqual(false, emqx_rule_funcs:is_null(a)),

View File

@ -0,0 +1,107 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_rule_monitor_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx_rule_engine/include/rule_engine.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
all() ->
[ {group, resource}
].
suite() ->
[{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}].
groups() ->
[{resource, [sequence],
[ t_restart_resource
]}
].
init_per_suite(Config) ->
ok = ekka_mnesia:start(),
ok = emqx_rule_registry:mnesia(boot),
Config.
end_per_suite(_Config) ->
ok.
init_per_testcase(t_restart_resource, Config) ->
Opts = [public, named_table, set, {read_concurrency, true}],
_ = ets:new(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]),
ets:new(t_restart_resource, [named_table, public]),
ets:insert(t_restart_resource, {failed_count, 0}),
ets:insert(t_restart_resource, {succ_count, 0}),
Config;
init_per_testcase(_, Config) ->
Config.
end_per_testcase(t_restart_resource, Config) ->
ets:delete(t_restart_resource),
Config;
end_per_testcase(_, Config) ->
Config.
t_restart_resource(_) ->
{ok, _} = emqx_rule_monitor:start_link(),
ok = emqx_rule_registry:register_resource_types(
[#resource_type{
name = test_res_1,
provider = ?APP,
params_spec = #{},
on_create = {?MODULE, on_resource_create},
on_destroy = {?MODULE, on_resource_destroy},
on_status = {?MODULE, on_get_resource_status},
title = #{en => <<"Test Resource">>},
description = #{en => <<"Test Resource">>}}]),
ok = emqx_rule_engine:load_providers(),
{ok, #resource{id = ResId}} = emqx_rule_engine:create_resource(
#{type => test_res_1,
config => #{},
description => <<"debug resource">>}),
[{_, 1}] = ets:lookup(t_restart_resource, failed_count),
[{_, 0}] = ets:lookup(t_restart_resource, succ_count),
ct:pal("monitor: ~p", [whereis(emqx_rule_monitor)]),
emqx_rule_monitor:ensure_resource_retrier(ResId, 100),
timer:sleep(1000),
[{_, 5}] = ets:lookup(t_restart_resource, failed_count),
[{_, 1}] = ets:lookup(t_restart_resource, succ_count),
#{retryers := Pids} = sys:get_state(whereis(emqx_rule_monitor)),
?assertEqual(0, map_size(Pids)),
ok = emqx_rule_engine:unload_providers(),
emqx_rule_registry:remove_resource(ResId),
emqx_rule_monitor:stop(),
ok.
on_resource_create(Id, _) ->
case ets:lookup(t_restart_resource, failed_count) of
[{_, 5}] ->
ets:insert(t_restart_resource, {succ_count, 1}),
#{};
[{_, N}] ->
ets:insert(t_restart_resource, {failed_count, N+1}),
error({incorrect_params, Id})
end.
on_resource_destroy(_Id, _) -> ok.
on_get_resource_status(_Id, _) -> #{}.