From c2e1bc039b3a698179713ae97f530bc00295763c Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sat, 20 Feb 2021 17:10:10 +0800 Subject: [PATCH 1/5] feat(rule): add more sql functions #4143 (#4144) --- apps/emqx_rule_engine/include/rule_engine.hrl | 2 + apps/emqx_rule_engine/src/emqx_rule_funcs.erl | 60 ++++++++++++++++++- .../src/emqx_rule_registry.erl | 2 + .../test/emqx_rule_engine_SUITE.erl | 12 +++- .../test/emqx_rule_funcs_SUITE.erl | 34 +++++++++++ 5 files changed, 108 insertions(+), 2 deletions(-) diff --git a/apps/emqx_rule_engine/include/rule_engine.hrl b/apps/emqx_rule_engine/include/rule_engine.hrl index 3b344014d..97263099d 100644 --- a/apps/emqx_rule_engine/include/rule_engine.hrl +++ b/apps/emqx_rule_engine/include/rule_engine.hrl @@ -16,6 +16,8 @@ -define(APP, emqx_rule_engine). +-define(KV_TAB, '@rule_engine_db'). + -type(maybe(T) :: T | undefined). -type(rule_id() :: binary()). diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index 10f00ce72..7354a217f 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -16,6 +16,8 @@ -module(emqx_rule_funcs). +-include("rule_engine.hrl"). + %% IoT Funcs -export([ msgid/0 , qos/0 @@ -91,6 +93,8 @@ , int/1 , float/1 , map/1 + , bin2hexstr/1 + , hexstr2bin/1 ]). %% Data Type Validation Funcs @@ -169,6 +173,8 @@ , base64_decode/1 , json_decode/1 , json_encode/1 + , term_decode/1 + , term_encode/1 ]). %% Date functions @@ -178,6 +184,16 @@ , now_timestamp/1 ]). +%% Proc Dict Func + -export([ proc_dict_get/1 + , proc_dict_put/2 + , proc_dict_del/1 + , kv_store_get/1 + , kv_store_get/2 + , kv_store_put/2 + , kv_store_del/1 + ]). + -export(['$handle_undefined_function'/2]). -compile({no_auto_import, @@ -495,6 +511,14 @@ float(Data) -> map(Data) -> emqx_rule_utils:map(Data). +bin2hexstr(Bin) when is_binary(Bin) -> + IntL = binary_to_list(Bin), + list_to_binary([io_lib:format("~2.16.0B", [Int]) || Int <- IntL]). + +hexstr2bin(Str) when is_binary(Str) -> + list_to_binary([binary_to_integer(W, 16) || <> <= Str]). + + %%------------------------------------------------------------------------------ %% NULL Funcs %%------------------------------------------------------------------------------ @@ -804,7 +828,7 @@ hexstring(<>) -> iolist_to_binary(io_lib:format("~64.16.0b", [X])). %%------------------------------------------------------------------------------ -%% Base64 Funcs +%% Data encode and decode Funcs %%------------------------------------------------------------------------------ base64_encode(Data) when is_binary(Data) -> @@ -819,6 +843,40 @@ json_encode(Data) -> json_decode(Data) -> emqx_json:decode(Data, [return_maps]). +term_encode(Term) -> + erlang:term_to_binary(Term). + +term_decode(Data) when is_binary(Data) -> + erlang:binary_to_term(Data). + +%%------------------------------------------------------------------------------ +%% Dict Funcs +%%------------------------------------------------------------------------------ + +-define(DICT_KEY(KEY), {'@rule_engine', KEY}). +proc_dict_get(Key) -> + erlang:get(?DICT_KEY(Key)). + +proc_dict_put(Key, Val) -> + erlang:put(?DICT_KEY(Key), Val). + +proc_dict_del(Key) -> + erlang:erase(?DICT_KEY(Key)). + +kv_store_put(Key, Val) -> + ets:insert(?KV_TAB, {Key, Val}). + +kv_store_get(Key) -> + kv_store_get(Key, undefined). +kv_store_get(Key, Default) -> + case ets:lookup(?KV_TAB, Key) of + [{_, Val}] -> Val; + _ -> Default + end. + +kv_store_del(Key) -> + ets:delete(?KV_TAB, Key). + %%-------------------------------------------------------------------- %% Date functions %%-------------------------------------------------------------------- diff --git a/apps/emqx_rule_engine/src/emqx_rule_registry.erl b/apps/emqx_rule_engine/src/emqx_rule_registry.erl index fcbcb1e20..0e5c81d4a 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_registry.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_registry.erl @@ -387,6 +387,8 @@ delete_resource_type(Type) -> init([]) -> %% Enable stats timer ok = emqx_stats:update_interval(rule_registery_stats, fun update_stats/0), + ets:new(?KV_TAB, [named_table, set, public, {write_concurrency, true}, + {read_concurrency, true}]), {ok, #{}}. handle_call(Req, _From, State) -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index ea0a39c24..b6e930011 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -69,7 +69,9 @@ groups() -> t_resource_types_cli ]}, {funcs, [], - [t_topic_func]}, + [t_topic_func, + t_kv_store + ]}, {registry, [sequence], [t_add_get_remove_rule, t_add_get_remove_rules, @@ -597,6 +599,14 @@ t_topic_func(_Config) -> %%TODO: ok. +t_kv_store(_) -> + undefined = emqx_rule_funcs:kv_store_get(<<"abc">>), + <<"not_found">> = emqx_rule_funcs:kv_store_get(<<"abc">>, <<"not_found">>), + emqx_rule_funcs:kv_store_put(<<"abc">>, 1), + 1 = emqx_rule_funcs:kv_store_get(<<"abc">>), + emqx_rule_funcs:kv_store_del(<<"abc">>), + undefined = emqx_rule_funcs:kv_store_get(<<"abc">>). + %%------------------------------------------------------------------------------ %% Test cases for rule registry %%------------------------------------------------------------------------------ diff --git a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl index d857ac8fb..9844efd5d 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl @@ -143,6 +143,40 @@ t_bool(_) -> ?assertEqual(false, emqx_rule_funcs:bool(<<"false">>)), ?assertError({invalid_boolean, _}, emqx_rule_funcs:bool(3)). +t_proc_dict_put_get_del(_) -> + ?assertEqual(undefined, emqx_rule_funcs:proc_dict_get(<<"abc">>)), + emqx_rule_funcs:proc_dict_put(<<"abc">>, 1), + ?assertEqual(1, emqx_rule_funcs:proc_dict_get(<<"abc">>)), + emqx_rule_funcs:proc_dict_del(<<"abc">>), + ?assertEqual(undefined, emqx_rule_funcs:proc_dict_get(<<"abc">>)). + +t_term_encode(_) -> + TestData = [<<"abc">>, #{a => 1}, #{<<"3">> => [1,2,4]}], + lists:foreach(fun(Data) -> + ?assertEqual(Data, + emqx_rule_funcs:term_decode( + emqx_rule_funcs:term_encode(Data))) + end, TestData). + +t_hexstr2bin(_) -> + ?assertEqual(<<1,2>>, emqx_rule_funcs:hexstr2bin(<<"0102">>)), + ?assertEqual(<<17,33>>, emqx_rule_funcs:hexstr2bin(<<"1121">>)). + +t_bin2hexstr(_) -> + ?assertEqual(<<"0102">>, emqx_rule_funcs:bin2hexstr(<<1,2>>)), + ?assertEqual(<<"1121">>, emqx_rule_funcs:bin2hexstr(<<17,33>>)). + +t_hex_convert(_) -> + ?PROPTEST(hex_convert). + +hex_convert() -> + ?FORALL(L, list(range(0, 255)), + begin + AbitraryBin = list_to_binary(L), + AbitraryBin == emqx_rule_funcs:hexstr2bin( + emqx_rule_funcs:bin2hexstr(AbitraryBin)) + end). + t_is_null(_) -> ?assertEqual(true, emqx_rule_funcs:is_null(undefined)), ?assertEqual(false, emqx_rule_funcs:is_null(a)), From 1e047e84c24df6a5d41234fdde0a9763f5d66fcb Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sat, 20 Feb 2021 17:10:50 +0800 Subject: [PATCH 2/5] feat(resource): keep restart disconnected resources after emqx bootup (#4125) * feat(resource): keep restart disconnected resources after emqx bootup * feat(resource): improve the restart monitor * fix(test): improve emqx_rule_monitor_SUITE * fix(resource): refresh resource should be only applied on local node * fix(test): improve the test case for restart_resource * fix(resource): rename some functions --- .../emqx_rule_engine/src/emqx_rule_engine.erl | 23 ++-- .../src/emqx_rule_engine_sup.erl | 10 +- .../src/emqx_rule_monitor.erl | 117 ++++++++++++++++++ .../test/emqx_rule_monitor_SUITE.erl | 107 ++++++++++++++++ 4 files changed, 239 insertions(+), 18 deletions(-) create mode 100644 apps/emqx_rule_engine/src/emqx_rule_monitor.erl create mode 100644 apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 3b82c915c..46fb20666 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -65,6 +65,8 @@ , action_instance_params/0 ]). +-define(T_RETRY, 60000). + %%------------------------------------------------------------------------------ %% Load resource/action providers from all available applications %%------------------------------------------------------------------------------ @@ -217,7 +219,7 @@ delete_rule(RuleId) -> catch Error:Reason:ST -> ?LOG(error, "clear_rule ~p failed: ~p", [RuleId, {Error, Reason, ST}]), - refresh_actions(Actions, fun(_) -> true end) + refresh_actions(Actions) end; not_found -> ok @@ -388,16 +390,8 @@ refresh_resource(Type) when is_atom(Type) -> lists:foreach(fun refresh_resource/1, emqx_rule_registry:get_resources_by_type(Type)); -refresh_resource(#resource{id = ResId, config = Config, type = Type}) -> - {ok, #resource_type{on_create = {M, F}}} = emqx_rule_registry:find_resource_type(Type), - try cluster_call(init_resource, [M, F, ResId, Config]) - catch Error:Reason:ST -> - logger:critical( - "Can not re-stablish resource ~p: ~0p. The resource is disconnected." - "Fix the issue and establish it manually.\n" - "Stacktrace: ~0p", - [ResId, {Error, Reason}, ST]) - end. +refresh_resource(#resource{id = ResId}) -> + emqx_rule_monitor:ensure_resource_retrier(ResId, ?T_RETRY). -spec(refresh_rules() -> ok). refresh_rules() -> @@ -414,7 +408,7 @@ refresh_rules() -> refresh_rule(#rule{id = RuleId, actions = Actions}) -> ok = emqx_rule_metrics:create_rule_metrics(RuleId), - refresh_actions(Actions, fun(_) -> true end). + refresh_actions(Actions). -spec(refresh_resource_status() -> ok). refresh_resource_status() -> @@ -529,10 +523,7 @@ cluster_call(Func, Args) -> end. init_resource(Module, OnCreate, ResId, Config) -> - Params = ?RAISE( - begin - Module:OnCreate(ResId, Config) - end, + Params = ?RAISE(Module:OnCreate(ResId, Config), {{Module, OnCreate}, {_EXCLASS_, _EXCPTION_, _ST_}}), ResParams = #resource_params{id = ResId, params = Params, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl index 6f2e0018f..d561340bb 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl @@ -27,7 +27,7 @@ -export([init/1]). start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). + supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> Opts = [public, named_table, set, {read_concurrency, true}], @@ -45,7 +45,13 @@ init([]) -> shutdown => 5000, type => worker, modules => [emqx_rule_metrics]}, - {ok, {{one_for_one, 10, 10}, [Registry, Metrics]}}. + Monitor = #{id => emqx_rule_monitor, + start => {emqx_rule_monitor, start_link, []}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [emqx_rule_monitor]}, + {ok, {{one_for_one, 10, 10}, [Registry, Metrics, Monitor]}}. start_locker() -> Locker = #{id => emqx_rule_locker, diff --git a/apps/emqx_rule_engine/src/emqx_rule_monitor.erl b/apps/emqx_rule_engine/src/emqx_rule_monitor.erl new file mode 100644 index 000000000..b4c1ff2fc --- /dev/null +++ b/apps/emqx_rule_engine/src/emqx_rule_monitor.erl @@ -0,0 +1,117 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_rule_monitor). + +-behavior(gen_server). + +-include("rule_engine.hrl"). +-include_lib("emqx/include/logger.hrl"). +-logger_header("[Rule Monitor]"). + +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-export([ start_link/0 + , stop/0 + , ensure_resource_retrier/2 + , retry_loop/3 + ]). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +stop() -> + gen_server:stop(?MODULE). + +init([]) -> + _ = erlang:process_flag(trap_exit, true), + {ok, #{retryers => #{}}}. + +ensure_resource_retrier(ResId, Interval) -> + gen_server:cast(?MODULE, {create_restart_handler, resource, ResId, Interval}). + +handle_call(_Msg, _From, State) -> + {reply, ok, State}. + +handle_cast({create_restart_handler, Tag, Obj, Interval}, State) -> + Objects = maps:get(Tag, State, #{}), + NewState = case maps:find(Obj, Objects) of + error -> + update_object(Tag, Obj, + create_restart_handler(Tag, Obj, Interval), State); + {ok, _Pid} -> + State + end, + {noreply, NewState}; + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({'EXIT', Pid, Reason}, State = #{retryers := Retryers}) -> + case maps:take(Pid, Retryers) of + {{Tag, Obj}, Retryers2} -> + Objects = maps:get(Tag, State, #{}), + {noreply, State#{Tag => maps:remove(Obj, Objects), + retryers => Retryers2}}; + error -> + ?LOG(error, "got unexpected proc down: ~p ~p", [Pid, Reason]), + {noreply, State} + end; + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +update_object(Tag, Obj, Retryer, State) -> + Objects = maps:get(Tag, State, #{}), + Retryers = maps:get(retryers, State, #{}), + State#{ + Tag => Objects#{Obj => Retryer}, + retryers => Retryers#{Retryer => {Tag, Obj}} + }. + +create_restart_handler(Tag, Obj, Interval) -> + ?LOG(info, "keep restarting ~p ~p, interval: ~p", [Tag, Obj, Interval]), + %% spawn a dedicated process to handle the restarting asynchronously + spawn_link(?MODULE, retry_loop, [Tag, Obj, Interval]). + +retry_loop(resource, ResId, Interval) -> + case emqx_rule_registry:find_resource(ResId) of + {ok, #resource{type = Type, config = Config}} -> + try + {ok, #resource_type{on_create = {M, F}}} = + emqx_rule_registry:find_resource_type(Type), + emqx_rule_engine:init_resource(M, F, ResId, Config) + catch + Err:Reason:ST -> + ?LOG(warning, "init_resource failed: ~p, ~0p", + [{Err, Reason}, ST]), + timer:sleep(Interval), + ?MODULE:retry_loop(resource, ResId, Interval) + end; + not_found -> + ok + end. diff --git a/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl new file mode 100644 index 000000000..fe44d8df7 --- /dev/null +++ b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl @@ -0,0 +1,107 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_rule_monitor_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("emqx_rule_engine/include/rule_engine.hrl"). +-include_lib("emqx/include/emqx.hrl"). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +all() -> + [ {group, resource} + ]. + +suite() -> + [{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}]. + +groups() -> + [{resource, [sequence], + [ t_restart_resource + ]} + ]. + +init_per_suite(Config) -> + ok = ekka_mnesia:start(), + ok = emqx_rule_registry:mnesia(boot), + Config. + +end_per_suite(_Config) -> + ok. + +init_per_testcase(t_restart_resource, Config) -> + Opts = [public, named_table, set, {read_concurrency, true}], + _ = ets:new(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]), + ets:new(t_restart_resource, [named_table, public]), + ets:insert(t_restart_resource, {failed_count, 0}), + ets:insert(t_restart_resource, {succ_count, 0}), + Config; + +init_per_testcase(_, Config) -> + Config. + +end_per_testcase(t_restart_resource, Config) -> + ets:delete(t_restart_resource), + Config; +end_per_testcase(_, Config) -> + Config. + +t_restart_resource(_) -> + {ok, _} = emqx_rule_monitor:start_link(), + ok = emqx_rule_registry:register_resource_types( + [#resource_type{ + name = test_res_1, + provider = ?APP, + params_spec = #{}, + on_create = {?MODULE, on_resource_create}, + on_destroy = {?MODULE, on_resource_destroy}, + on_status = {?MODULE, on_get_resource_status}, + title = #{en => <<"Test Resource">>}, + description = #{en => <<"Test Resource">>}}]), + ok = emqx_rule_engine:load_providers(), + {ok, #resource{id = ResId}} = emqx_rule_engine:create_resource( + #{type => test_res_1, + config => #{}, + description => <<"debug resource">>}), + [{_, 1}] = ets:lookup(t_restart_resource, failed_count), + [{_, 0}] = ets:lookup(t_restart_resource, succ_count), + ct:pal("monitor: ~p", [whereis(emqx_rule_monitor)]), + emqx_rule_monitor:ensure_resource_retrier(ResId, 100), + timer:sleep(1000), + [{_, 5}] = ets:lookup(t_restart_resource, failed_count), + [{_, 1}] = ets:lookup(t_restart_resource, succ_count), + #{retryers := Pids} = sys:get_state(whereis(emqx_rule_monitor)), + ?assertEqual(0, map_size(Pids)), + ok = emqx_rule_engine:unload_providers(), + emqx_rule_registry:remove_resource(ResId), + emqx_rule_monitor:stop(), + ok. + +on_resource_create(Id, _) -> + case ets:lookup(t_restart_resource, failed_count) of + [{_, 5}] -> + ets:insert(t_restart_resource, {succ_count, 1}), + #{}; + [{_, N}] -> + ets:insert(t_restart_resource, {failed_count, N+1}), + error({incorrect_params, Id}) + end. +on_resource_destroy(_Id, _) -> ok. +on_get_resource_status(_Id, _) -> #{}. From 1be62b7cbbf9194588bd2875e5057a039cff7f76 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sat, 20 Feb 2021 17:14:14 +0800 Subject: [PATCH 3/5] feat(lwm2m): always publish update message (#4201) * feat(lwm2m): always publish update message * fix(lwm2m): change the publish_update_when to enum --- apps/emqx_lwm2m/etc/emqx_lwm2m.conf | 9 +++++++++ apps/emqx_lwm2m/priv/emqx_lwm2m.schema | 5 +++++ .../emqx_lwm2m/src/emqx_lwm2m_coap_server.erl | 2 ++ apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl | 20 +++++++++++++------ 4 files changed, 30 insertions(+), 6 deletions(-) diff --git a/apps/emqx_lwm2m/etc/emqx_lwm2m.conf b/apps/emqx_lwm2m/etc/emqx_lwm2m.conf index c9baf6feb..83111c956 100644 --- a/apps/emqx_lwm2m/etc/emqx_lwm2m.conf +++ b/apps/emqx_lwm2m/etc/emqx_lwm2m.conf @@ -41,6 +41,15 @@ lwm2m.topics.register = up/resp # The topic to which the lwm2m client's update message is published lwm2m.topics.update = up/resp +# When publish the update message. +# +# Can be one of: +# - object_list_changed: only if the object list is changed +# - always: always publish the update message +# +# Defaults to object_list_changed +#lwm2m.publish_update_when = object_list_changed + # Dir where the object definition files can be found lwm2m.xml_dir = {{ platform_etc_dir }}/lwm2m_xml diff --git a/apps/emqx_lwm2m/priv/emqx_lwm2m.schema b/apps/emqx_lwm2m/priv/emqx_lwm2m.schema index 1cd89824e..f15269833 100644 --- a/apps/emqx_lwm2m/priv/emqx_lwm2m.schema +++ b/apps/emqx_lwm2m/priv/emqx_lwm2m.schema @@ -112,6 +112,11 @@ end}. {default, "lwm2m/%e/up/resp"} ]}. +{mapping, "lwm2m.publish_update_when", "emqx_lwm2m.publish_update_when", [ + {datatype, {enum, [object_list_changed, always]}}, + {default, object_list_changed} +]}. + {translation, "emqx_lwm2m.topics", fun(Conf) -> Topics = cuttlefish_variable:filter_by_prefix("lwm2m.topics", Conf), Opts = lists:map(fun({[_,_, Key], Value}) -> diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m_coap_server.erl b/apps/emqx_lwm2m/src/emqx_lwm2m_coap_server.erl index 47ea6a2ba..ed5743203 100644 --- a/apps/emqx_lwm2m/src/emqx_lwm2m_coap_server.erl +++ b/apps/emqx_lwm2m/src/emqx_lwm2m_coap_server.erl @@ -101,10 +101,12 @@ get_lwm2m_opts(Envs) -> AutoObserve = proplists:get_value(auto_observe, Envs, []), QmodeTimeWindow = proplists:get_value(qmode_time_window, Envs, []), Topics = proplists:get_value(topics, Envs, []), + PublishUpdateWhen = proplists:get_value(publish_update_when, Envs, object_list_changed), [{lifetime_max, LifetimeMax}, {lifetime_min, LifetimeMin}, {mountpoint, list_to_binary(Mountpoint)}, {port, Sockport}, {auto_observe, AutoObserve}, {qmode_time_window, QmodeTimeWindow}, + {publish_update_when, PublishUpdateWhen}, {topics, Topics}]. diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl b/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl index 1fd3f5c54..c571e7cb0 100644 --- a/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl +++ b/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl @@ -115,15 +115,23 @@ post_init(Lwm2mState = #lwm2m_state{endpoint_name = _EndpointName, _ = send_to_broker(<<"register">>, #{<<"data">> => RegInfo}, Lwm2mState), Lwm2mState#lwm2m_state{mqtt_topic = Topic}. -update_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer, register_info = RegInfo, - coap_pid = CoapPid}) -> +update_reg_info(NewRegInfo, Lwm2mState = #lwm2m_state{ + life_timer = LifeTimer, register_info = RegInfo, + coap_pid = CoapPid}) -> + UpdatedRegInfo = maps:merge(RegInfo, NewRegInfo), - %% - report the registration info update, but only when objectList is updated. - case NewRegInfo of - #{<<"objectList">> := _} -> + case proplists:get_value(publish_update_when, + lwm2m_coap_responder:options(), object_list_changed) of + always -> send_to_broker(<<"update">>, #{<<"data">> => UpdatedRegInfo}, Lwm2mState); - _ -> ok + object_list_changed -> + %% - report the registration info update, but only when objectList is updated. + case NewRegInfo of + #{<<"objectList">> := _} -> + send_to_broker(<<"update">>, #{<<"data">> => UpdatedRegInfo}, Lwm2mState); + _ -> ok + end end, %% - flush cached donwlink commands From 7778cd8623aa55929c0eeafe55b6019dae961251 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 22 Feb 2021 11:16:47 +0800 Subject: [PATCH 4/5] Add hooks only when creating the rules (#4160) * refactor(rules): add hook only when creating rules * fix(rule): update hooks after application restarted * fix(rule): remove the extra guard --- .../emqx_rule_engine/src/emqx_rule_engine.erl | 3 +- .../src/emqx_rule_engine_app.erl | 9 +---- .../emqx_rule_engine/src/emqx_rule_events.erl | 34 ++++++++++------ .../src/emqx_rule_registry.erl | 40 ++++++++++++++++--- .../test/emqx_rule_engine_SUITE.erl | 34 ++++++++++++++++ 5 files changed, 93 insertions(+), 27 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 46fb20666..3621a2442 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -406,8 +406,9 @@ refresh_rules() -> end end, emqx_rule_registry:get_rules()). -refresh_rule(#rule{id = RuleId, actions = Actions}) -> +refresh_rule(#rule{id = RuleId, for = Topics, actions = Actions}) -> ok = emqx_rule_metrics:create_rule_metrics(RuleId), + lists:foreach(fun emqx_rule_events:load/1, Topics), refresh_actions(Actions). -spec(refresh_resource_status() -> ok). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl index b244f8323..ad9e5ba37 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl @@ -24,8 +24,6 @@ -export([stop/1]). --define(APP, emqx_rule_engine). - start(_Type, _Args) -> {ok, Sup} = emqx_rule_engine_sup:start_link(), _ = emqx_rule_engine_sup:start_locker(), @@ -33,13 +31,8 @@ start(_Type, _Args) -> ok = emqx_rule_engine:refresh_resources(), ok = emqx_rule_engine:refresh_rules(), ok = emqx_rule_engine_cli:load(), - ok = emqx_rule_events:load(env()), {ok, Sup}. stop(_State) -> - ok = emqx_rule_events:unload(env()), + ok = emqx_rule_events:unload(), ok = emqx_rule_engine_cli:unload(). - -env() -> - application:get_all_env(?APP) - . diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index c6df377bb..fc3096f6a 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -16,12 +16,14 @@ -module(emqx_rule_events). +-include("rule_engine.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). -logger_header("[RuleEvents]"). -export([ load/1 + , unload/0 , unload/1 , event_name/1 , eventmsg_publish/1 @@ -60,16 +62,22 @@ ]). -endif. -load(Env) -> - lists:foreach( - fun(HookPoint) -> - ok = emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint), [hook_conf(HookPoint, Env)]}) - end, ?SUPPORTED_HOOK). +load(Topic) -> + HookPoint = event_name(Topic), + emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint), + [hook_conf(HookPoint, env())]}). -unload(_Env) -> - [emqx_hooks:del(HookPoint, {?MODULE, hook_fun(HookPoint)}) - || HookPoint <- ?SUPPORTED_HOOK], - ok. +unload() -> + lists:foreach(fun(HookPoint) -> + emqx_hooks:del(HookPoint, {?MODULE, hook_fun(HookPoint)}) + end, ?SUPPORTED_HOOK). + +unload(Topic) -> + HookPoint = event_name(Topic), + emqx_hooks:del(HookPoint, {?MODULE, hook_fun(HookPoint)}). + +env() -> + application:get_all_env(?APP). %%-------------------------------------------------------------------- %% Callbacks @@ -574,17 +582,19 @@ reason(_) -> internal_error. ntoa(undefined) -> undefined; ntoa({IpAddr, Port}) -> - iolist_to_binary([inet:ntoa(IpAddr),":",integer_to_list(Port)]); + iolist_to_binary([inet:ntoa(IpAddr), ":", integer_to_list(Port)]); ntoa(IpAddr) -> iolist_to_binary(inet:ntoa(IpAddr)). event_name(<<"$events/client_connected", _/binary>>) -> 'client.connected'; event_name(<<"$events/client_disconnected", _/binary>>) -> 'client.disconnected'; event_name(<<"$events/session_subscribed", _/binary>>) -> 'session.subscribed'; -event_name(<<"$events/session_unsubscribed", _/binary>>) -> 'session.unsubscribed'; +event_name(<<"$events/session_unsubscribed", _/binary>>) -> + 'session.unsubscribed'; event_name(<<"$events/message_delivered", _/binary>>) -> 'message.delivered'; event_name(<<"$events/message_acked", _/binary>>) -> 'message.acked'; -event_name(<<"$events/message_dropped", _/binary>>) -> 'message.dropped'. +event_name(<<"$events/message_dropped", _/binary>>) -> 'message.dropped'; +event_name(_) -> 'message.publish'. event_topic('client.connected') -> <<"$events/client_connected">>; event_topic('client.disconnected') -> <<"$events/client_disconnected">>; diff --git a/apps/emqx_rule_engine/src/emqx_rule_registry.erl b/apps/emqx_rule_engine/src/emqx_rule_registry.erl index 0e5c81d4a..3e4e4b74c 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_registry.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_registry.erl @@ -26,6 +26,7 @@ %% Rule Management -export([ get_rules/0 , get_rules_for/1 + , get_rules_with_same_event/1 , get_rule/1 , add_rule/1 , add_rules/1 @@ -91,6 +92,8 @@ , {?RES_TAB, 'resources.count', 'resources.max'} ]). +-define(T_CALL, 10000). + %%------------------------------------------------------------------------------ %% Mnesia bootstrap %%------------------------------------------------------------------------------ @@ -170,6 +173,15 @@ get_rules_for(Topic) -> [Rule || Rule = #rule{for = For} <- get_rules(), emqx_rule_utils:can_topic_match_oneof(Topic, For)]. +-spec(get_rules_with_same_event(Topic :: binary()) -> list(emqx_rule_engine:rule())). +get_rules_with_same_event(Topic) -> + EventName = emqx_rule_events:event_name(Topic), + [Rule || Rule = #rule{for = For} <- get_rules(), + lists:any(fun(T) -> is_of_event_name(EventName, T) end, For)]. + +is_of_event_name(EventName, Topic) -> + EventName =:= emqx_rule_events:event_name(Topic). + -spec(get_rule(Id :: rule_id()) -> {ok, emqx_rule_engine:rule()} | not_found). get_rule(Id) -> case mnesia:dirty_read(?RULE_TAB, Id) of @@ -179,22 +191,23 @@ get_rule(Id) -> -spec(add_rule(emqx_rule_engine:rule()) -> ok). add_rule(Rule) when is_record(Rule, rule) -> - trans(fun insert_rule/1, [Rule]). + add_rules([Rule]). -spec(add_rules(list(emqx_rule_engine:rule())) -> ok). add_rules(Rules) -> - trans(fun lists:foreach/2, [fun insert_rule/1, Rules]). + gen_server:call(?REGISTRY, {add_rules, Rules}, ?T_CALL). -spec(remove_rule(emqx_rule_engine:rule() | rule_id()) -> ok). remove_rule(RuleOrId) -> - trans(fun delete_rule/1, [RuleOrId]). + remove_rules([RuleOrId]). -spec(remove_rules(list(emqx_rule_engine:rule()) | list(rule_id())) -> ok). remove_rules(Rules) -> - trans(fun lists:foreach/2, [fun delete_rule/1, Rules]). + gen_server:call(?REGISTRY, {remove_rules, Rules}, ?T_CALL). %% @private -insert_rule(Rule = #rule{}) -> +insert_rule(Rule = #rule{for = Topics}) -> + lists:foreach(fun emqx_rule_events:load/1, Topics), mnesia:write(?RULE_TAB, Rule, write). %% @private @@ -203,7 +216,14 @@ delete_rule(RuleId) when is_binary(RuleId) -> {ok, Rule} -> delete_rule(Rule); not_found -> ok end; -delete_rule(Rule = #rule{}) when is_record(Rule, rule) -> +delete_rule(Rule = #rule{id = Id, for = Topics}) -> + lists:foreach(fun(Topic) -> + case get_rules_with_same_event(Topic) of + [#rule{id = Id}] -> %% we are now deleting the last rule + emqx_rule_events:unload(Topic); + _ -> ok + end + end, Topics), mnesia:delete_object(?RULE_TAB, Rule, write). %%------------------------------------------------------------------------------ @@ -391,6 +411,14 @@ init([]) -> {read_concurrency, true}]), {ok, #{}}. +handle_call({add_rules, Rules}, _From, State) -> + trans(fun lists:foreach/2, [fun insert_rule/1, Rules]), + {reply, ok, State}; + +handle_call({remove_rules, Rules}, _From, State) -> + trans(fun lists:foreach/2, [fun delete_rule/1, Rules]), + {reply, ok, State}; + handle_call(Req, _From, State) -> ?LOG(error, "[RuleRegistry]: unexpected call - ~p", [Req]), {reply, ignored, State}. diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index b6e930011..ff78275c0 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -79,6 +79,7 @@ groups() -> t_update_rule, t_get_rules_for, t_get_rules_for_2, + t_get_rules_with_same_event, t_add_get_remove_action, t_add_get_remove_actions, t_remove_actions_of, @@ -714,6 +715,39 @@ t_get_rules_for_2(_Config) -> ok = emqx_rule_registry:remove_rules([<<"rule-debug-1">>, <<"rule-debug-2">>,<<"rule-debug-3">>, <<"rule-debug-4">>,<<"rule-debug-5">>, <<"rule-debug-6">>]), ok. +t_get_rules_with_same_event(_Config) -> + PubT = <<"simple/1">>, + PubN = length(emqx_rule_registry:get_rules_with_same_event(PubT)), + ?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/client_connected">>)), + ?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/client_disconnected">>)), + ?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/session_subscribed">>)), + ?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/session_unsubscribed">>)), + ?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/message_delivered">>)), + ?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/message_acked">>)), + ?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/message_dropped">>)), + ok = emqx_rule_registry:add_rules( + [make_simple_rule(<<"r1">>, <<"select * from \"simple/#\"">>, [<<"simple/#">>]), + make_simple_rule(<<"r2">>, <<"select * from \"abc/+\"">>, [<<"abc/+">>]), + make_simple_rule(<<"r3">>, <<"select * from \"$events/client_connected\"">>, [<<"$events/client_connected">>]), + make_simple_rule(<<"r4">>, <<"select * from \"$events/client_disconnected\"">>, [<<"$events/client_disconnected">>]), + make_simple_rule(<<"r5">>, <<"select * from \"$events/session_subscribed\"">>, [<<"$events/session_subscribed">>]), + make_simple_rule(<<"r6">>, <<"select * from \"$events/session_unsubscribed\"">>, [<<"$events/session_unsubscribed">>]), + make_simple_rule(<<"r7">>, <<"select * from \"$events/message_delivered\"">>, [<<"$events/message_delivered">>]), + make_simple_rule(<<"r8">>, <<"select * from \"$events/message_acked\"">>, [<<"$events/message_acked">>]), + make_simple_rule(<<"r9">>, <<"select * from \"$events/message_dropped\"">>, [<<"$events/message_dropped">>]), + make_simple_rule(<<"r10">>, <<"select * from \"t/1, $events/session_subscribed, $events/client_connected\"">>, [<<"t/1">>, <<"$events/session_subscribed">>, <<"$events/client_connected">>]) + ]), + ?assertEqual(PubN + 3, length(emqx_rule_registry:get_rules_with_same_event(PubT))), + ?assertEqual(2, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/client_connected">>))), + ?assertEqual(1, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/client_disconnected">>))), + ?assertEqual(2, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/session_subscribed">>))), + ?assertEqual(1, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/session_unsubscribed">>))), + ?assertEqual(1, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/message_delivered">>))), + ?assertEqual(1, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/message_acked">>))), + ?assertEqual(1, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/message_dropped">>))), + ok = emqx_rule_registry:remove_rules([<<"r1">>, <<"r2">>,<<"r3">>, <<"r4">>,<<"r5">>, <<"r6">>, <<"r7">>, <<"r8">>, <<"r9">>, <<"r10">>]), + ok. + t_add_get_remove_action(_Config) -> ActionName0 = 'action-debug-0', Action0 = make_simple_action(ActionName0), From 2c9ea3c29e8bce79601891ab599eea08503d30dc Mon Sep 17 00:00:00 2001 From: JianBo He Date: Sat, 20 Feb 2021 11:39:01 +0800 Subject: [PATCH 5/5] chore(jwt): clearer explanation for verify_claims --- apps/emqx_auth_jwt/etc/emqx_auth_jwt.conf | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/apps/emqx_auth_jwt/etc/emqx_auth_jwt.conf b/apps/emqx_auth_jwt/etc/emqx_auth_jwt.conf index 5a599ca23..748902f9f 100644 --- a/apps/emqx_auth_jwt/etc/emqx_auth_jwt.conf +++ b/apps/emqx_auth_jwt/etc/emqx_auth_jwt.conf @@ -36,10 +36,14 @@ auth.jwt.verify_claims = off ## The checklist of claims to validate ## -## Value: String -## auth.jwt.verify_claims.$name = expected +## Configuration format: auth.jwt.verify_claims.$name = $expected +## - $name: the name of the field in the JWT payload to be verified +## - $expected: the expected value ## -## Variables: -## - %u: username -## - %c: clientid +## The available placeholders for $expected: +## - %u: username +## - %c: clientid +## +## For example, to verify that the username in the JWT payload is the same +## as the client (MQTT protocol) username #auth.jwt.verify_claims.username = %u