From cf56050759d96806b1e1cb39a564cdf7f3acbb44 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 17 Apr 2024 16:12:12 +0200 Subject: [PATCH] feat: avoid mixing request with and without the stop_after_render flag Previously a batch of requests that was sent to a connector could contain both requests with the stop_after_rendering flag and requests without this flag. When this happened a warning message was generated and the stop_after_render flags for the batch would be ignored. This commit fixes so that a mixed batch is never created so there is no longer any need for a warning message or ignoring flags. --- apps/emqx_bridge/src/emqx_action_info.erl | 10 +- .../src/emqx_connector_info.erl | 10 +- .../src/emqx_resource_buffer_worker.erl | 110 +++++++---- .../emqx_rule_engine_api_rule_apply_SUITE.erl | 182 ++++++++++++++++-- .../emqx_rule_engine_test_action_info.erl | 101 ++++++++++ .../test/emqx_rule_engine_test_connector.erl | 99 ++++++++++ .../emqx_rule_engine_test_connector_info.erl | 43 +++++ mix.exs | 2 +- rebar.config | 2 +- 9 files changed, 505 insertions(+), 54 deletions(-) create mode 100644 apps/emqx_rule_engine/test/emqx_rule_engine_test_action_info.erl create mode 100644 apps/emqx_rule_engine/test/emqx_rule_engine_test_connector.erl create mode 100644 apps/emqx_rule_engine/test/emqx_rule_engine_test_connector_info.erl diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index 464b2e429..a8aaf9fdd 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -41,6 +41,9 @@ ]). -export([clean_cache/0]). +%% For tests +-export([hard_coded_test_action_info_modules/0]). + -callback bridge_v1_type_name() -> atom() | { @@ -128,8 +131,13 @@ hard_coded_action_info_modules_common() -> emqx_bridge_mqtt_pubsub_action_info ]. +%% This exists so that it can be mocked for test cases +hard_coded_test_action_info_modules() -> []. + hard_coded_action_info_modules() -> - hard_coded_action_info_modules_common() ++ hard_coded_action_info_modules_ee(). + hard_coded_action_info_modules_common() ++ + hard_coded_action_info_modules_ee() ++ + ?MODULE:hard_coded_test_action_info_modules(). %% ==================================================================== %% API diff --git a/apps/emqx_connector/src/emqx_connector_info.erl b/apps/emqx_connector/src/emqx_connector_info.erl index 766f34168..e87c2ad7e 100644 --- a/apps/emqx_connector/src/emqx_connector_info.erl +++ b/apps/emqx_connector/src/emqx_connector_info.erl @@ -31,6 +31,9 @@ -export([clean_cache/0]). +%% For tests +-export([hard_coded_test_connector_info_modules/0]). + %% The type name for the conncector -callback type_name() -> atom(). @@ -117,8 +120,13 @@ hard_coded_connector_info_modules_common() -> emqx_bridge_mqtt_pubsub_connector_info ]. +%% This exists so that it can be mocked for test cases +hard_coded_test_connector_info_modules() -> []. + hard_coded_connector_info_modules() -> - hard_coded_connector_info_modules_common() ++ hard_coded_connector_info_modules_ee(). + hard_coded_connector_info_modules_common() ++ + hard_coded_connector_info_modules_ee() ++ + ?MODULE:hard_coded_test_connector_info_modules(). %% -------------------------------------------------------------------- %% Atom macros to avoid typos diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 6dfcde88c..24eb86d37 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -583,7 +583,11 @@ flush(Data0) -> {keep_state, Data1}; {_, false} -> ?tp(buffer_worker_flush_before_pop, #{}), - {Q1, QAckRef, Batch} = replayq:pop(Q0, #{count_limit => BatchSize}), + PopOpts = #{ + count_limit => BatchSize, + stop_before => {fun stop_before_mixed_stop_after_render/2, initial_state} + }, + {Q1, QAckRef, Batch} = replayq:pop(Q0, PopOpts), Data2 = Data1#{queue := Q1}, ?tp(buffer_worker_flush_before_sieve_expired, #{}), Now = now_(), @@ -619,6 +623,73 @@ flush(Data0) -> end end. +stop_before_mixed_stop_after_render( + ?QUERY( + _, + _, + _, + _, + #{stop_action_after_render := true} = _TraceCtx + ), + initial_state +) -> + stop_action_after_render; +stop_before_mixed_stop_after_render( + ?QUERY( + _, + _, + _, + _, + _TraceCtx + ), + initial_state +) -> + no_stop_action_after_render; +stop_before_mixed_stop_after_render( + ?QUERY( + _, + _, + _, + _, + #{stop_action_after_render := true} = _TraceCtx + ), + no_stop_action_after_render +) -> + true; +stop_before_mixed_stop_after_render( + ?QUERY( + _, + _, + _, + _, + #{stop_action_after_render := true} = _TraceCtx + ), + stop_action_after_render +) -> + stop_action_after_render; +stop_before_mixed_stop_after_render( + ?QUERY( + _, + _, + _, + _, + _TraceCtx + ), + stop_action_after_render +) -> + true; +stop_before_mixed_stop_after_render( + ?QUERY( + _, + _, + _, + _, + _TraceCtx + ), + State +) -> + State. + -spec do_flush(data(), #{ is_batch := boolean(), batch := [queue_query()], @@ -1119,25 +1190,13 @@ set_rule_id_trace_meta_data(Requests) when is_list(Requests) -> %% Get the rule ids from requests RuleIDs = lists:foldl(fun collect_rule_id/2, #{}, Requests), ClientIDs = lists:foldl(fun collect_client_id/2, #{}, Requests), - StopAfterRender = lists:foldl(fun collect_stop_after_render/2, no_info, Requests), StopAfterRenderVal = - case StopAfterRender of - only_true -> - logger:update_process_metadata(#{stop_action_after_render => false}), + case Requests of + %% We know that the batch is not mixed since we prevent this by + %% using a stop_after function in the replayq:pop call + [?QUERY(_, _, _, _, #{stop_action_after_render := true}) | _] -> true; - only_false -> - false; - mixed -> - ?TRACE( - warning, - "ACTION", - "mixed_stop_action_after_render_batch " - "(A batch will be sent to connector where some but " - "not all requests has stop_action_after_render set. " - "The batch will get assigned " - "stop_action_after_render = false)", - #{rule_ids => RuleIDs, client_ids => ClientIDs} - ), + [?QUERY(_, _, _, _, _TraceCTX) | _] -> false end, logger:update_process_metadata(#{ @@ -1158,21 +1217,6 @@ collect_client_id(?QUERY(_, _, _, _, #{clientid := ClientId}), Acc) -> collect_client_id(?QUERY(_, _, _, _, _), Acc) -> Acc. -collect_stop_after_render(?QUERY(_, _, _, _, #{stop_action_after_render := true}), no_info) -> - only_true; -collect_stop_after_render(?QUERY(_, _, _, _, #{stop_action_after_render := true}), only_true) -> - only_true; -collect_stop_after_render(?QUERY(_, _, _, _, #{stop_action_after_render := true}), only_false) -> - mixed; -collect_stop_after_render(?QUERY(_, _, _, _, _), no_info) -> - only_false; -collect_stop_after_render(?QUERY(_, _, _, _, _), only_true) -> - mixed; -collect_stop_after_render(?QUERY(_, _, _, _, _), only_false) -> - only_false; -collect_stop_after_render(?QUERY(_, _, _, _, _), mixed) -> - mixed. - unset_rule_id_trace_meta_data() -> logger:update_process_metadata(#{ rule_ids => #{}, client_ids => #{}, stop_action_after_render => false diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl index 576806464..e73c2c44d 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl @@ -94,9 +94,6 @@ basic_apply_rule_test_helper(Config, TraceType, StopAfterRender) -> %% =================================== %% Create trace for RuleId %% =================================== - Now = erlang:system_time(second) - 10, - Start = Now, - End = Now + 60, TraceName = atom_to_binary(?FUNCTION_NAME), TraceValue = case TraceType of @@ -105,16 +102,7 @@ basic_apply_rule_test_helper(Config, TraceType, StopAfterRender) -> clientid -> ClientId end, - Trace = #{ - name => TraceName, - type => TraceType, - TraceType => TraceValue, - start_at => Start, - end_at => End - }, - emqx_trace_SUITE:reload(), - ok = emqx_trace:clear(), - {ok, _} = emqx_trace:create(Trace), + create_trace(TraceName, TraceType, TraceValue), %% =================================== Context = #{ clientid => ClientId, @@ -125,13 +113,12 @@ basic_apply_rule_test_helper(Config, TraceType, StopAfterRender) -> username => <<"u_emqx">> }, Params = #{ - % body => #{ <<"context">> => Context, <<"stop_action_after_template_rendering">> => StopAfterRender - % } }, emqx_trace:check(), ok = emqx_trace_handler_SUITE:filesync(TraceName, TraceType), + Now = erlang:system_time(second) - 10, {ok, _} = file:read_file(emqx_trace:log_file(TraceName, Now)), ?assertMatch({ok, _}, call_apply_rule_api(RuleId, Params)), ?retry( @@ -173,14 +160,175 @@ basic_apply_rule_test_helper(Config, TraceType, StopAfterRender) -> emqx_trace:delete(TraceName), ok. +create_trace(TraceName, TraceType, TraceValue) -> + Now = erlang:system_time(second) - 10, + Start = Now, + End = Now + 60, + Trace = #{ + name => TraceName, + type => TraceType, + TraceType => TraceValue, + start_at => Start, + end_at => End + }, + emqx_trace_SUITE:reload(), + ok = emqx_trace:clear(), + {ok, _} = emqx_trace:create(Trace). + +t_apply_rule_test_batch_separation_stop_after_render(_Config) -> + MeckOpts = [passthrough, no_link, no_history, non_strict], + catch meck:new(emqx_connector_info, MeckOpts), + meck:expect( + emqx_connector_info, + hard_coded_test_connector_info_modules, + 0, + [emqx_rule_engine_test_connector_info] + ), + emqx_connector_info:clean_cache(), + catch meck:new(emqx_action_info, MeckOpts), + meck:expect( + emqx_action_info, + hard_coded_test_action_info_modules, + 0, + [emqx_rule_engine_test_action_info] + ), + emqx_action_info:clean_cache(), + {ok, _} = emqx_connector:create(rule_engine_test, ?FUNCTION_NAME, #{}), + Name = atom_to_binary(?FUNCTION_NAME), + ActionConf = + #{ + <<"connector">> => Name, + <<"parameters">> => + #{ + <<"values">> => + #{ + <<"send_to_pid">> => emqx_utils:bin_to_hexstr( + term_to_binary(self()), upper + ) + } + }, + <<"resource_opts">> => #{ + <<"batch_size">> => 1000, + <<"batch_time">> => 500 + } + }, + {ok, _} = emqx_bridge_v2:create( + rule_engine_test, + ?FUNCTION_NAME, + ActionConf + ), + SQL = <<"SELECT payload.is_stop_after_render as stop_after_render FROM \"", Name/binary, "\"">>, + {ok, RuleID} = create_rule_with_action( + rule_engine_test, + ?FUNCTION_NAME, + SQL + ), + create_trace(Name, ruleid, RuleID), + emqx_trace:check(), + ok = emqx_trace_handler_SUITE:filesync(Name, ruleid), + Now = erlang:system_time(second) - 10, + %% Stop + ParmsStopAfterRender = apply_rule_parms(true, Name), + ParmsNoStopAfterRender = apply_rule_parms(false, Name), + %% Check that batching is working + Count = 400, + CountMsgFun = + fun + CountMsgFunRec(0 = _CurCount, GotBatchWithAtLeastTwo) -> + GotBatchWithAtLeastTwo; + CountMsgFunRec(CurCount, GotBatchWithAtLeastTwo) -> + receive + List -> + Len = length(List), + CountMsgFunRec(CurCount - Len, GotBatchWithAtLeastTwo orelse (Len > 1)) + end + end, + lists:foreach( + fun(_) -> + {ok, _} = call_apply_rule_api(RuleID, ParmsStopAfterRender) + end, + lists:seq(1, Count) + ), + %% We should get the messages and at least one batch with more than 1 + true = CountMsgFun(Count, false), + %% We should check that we don't get any mixed batch + CheckBatchesFun = + fun + CheckBatchesFunRec(0 = _CurCount) -> + ok; + CheckBatchesFunRec(CurCount) -> + receive + [{_, #{<<"stop_after_render">> := StopValue}} | _] = List -> + [ + ?assertMatch(#{<<"stop_after_render">> := StopValue}, Msg) + || {_, Msg} <- List + ], + Len = length(List), + CheckBatchesFunRec(CurCount - Len) + end + end, + lists:foreach( + fun(_) -> + case rand:normal() < 0 of + true -> + {ok, _} = call_apply_rule_api(RuleID, ParmsStopAfterRender); + false -> + {ok, _} = call_apply_rule_api(RuleID, ParmsNoStopAfterRender) + end + end, + lists:seq(1, Count) + ), + CheckBatchesFun(Count), + %% Just check that the log file is created as expected + ?retry( + _Interval0 = 200, + _NAttempts0 = 20, + begin + Bin = read_rule_trace_file(Name, ruleid, Now), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_success">>])) + end + ), + ok. + +apply_rule_parms(StopAfterRender, Name) -> + Payload = #{<<"is_stop_after_render">> => StopAfterRender}, + Context = #{ + clientid => Name, + event_type => message_publish, + payload => emqx_utils_json:encode(Payload), + qos => 1, + topic => Name, + username => <<"u_emqx">> + }, + #{ + <<"context">> => Context, + <<"stop_action_after_template_rendering">> => StopAfterRender + }. + +create_rule_with_action(ActionType, ActionName, SQL) -> + BridgeId = emqx_bridge_resource:bridge_id(ActionType, ActionName), + Params = #{ + enable => true, + sql => SQL, + actions => [BridgeId] + }, + Path = emqx_mgmt_api_test_util:api_path(["rules"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + ct:pal("rule action params: ~p", [Params]), + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of + {ok, Res0} -> + #{<<"id">> := RuleId} = emqx_utils_json:decode(Res0, [return_maps]), + {ok, RuleId}; + Error -> + Error + end. + %% Helper Functions call_apply_rule_api(RuleId, Params) -> Method = post, Path = emqx_mgmt_api_test_util:api_path(["rules", RuleId, "test"]), - ct:pal("sql test (http):\n ~p", [Params]), Res = request(Method, Path, Params), - ct:pal("sql test (http) result:\n ~p", [Res]), Res. request(Method, Path, Params) -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_test_action_info.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_test_action_info.erl new file mode 100644 index 000000000..91bbcb442 --- /dev/null +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_test_action_info.erl @@ -0,0 +1,101 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_rule_engine_test_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0 +]). + +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +-define(CONNECTOR_TYPE, rule_engine_test). +-define(ACTION_TYPE, ?CONNECTOR_TYPE). + +bridge_v1_type_name() -> ?ACTION_TYPE. + +action_type_name() -> ?ACTION_TYPE. + +connector_type_name() -> ?ACTION_TYPE. + +schema_module() -> emqx_rule_engine_test_action_info. + +%% ------------------------------------------------------------------------------------------------- +%% Hocon Schema Definitions + +namespace() -> "bridge_test_action_info". + +roots() -> []. + +fields(Field) when + Field == "get_connector"; + Field == "put_connector"; + Field == "post_connector" +-> + Fields = + fields(connector_fields) ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts), + emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields); +fields(Field) when + Field == "get_bridge_v2"; + Field == "post_bridge_v2"; + Field == "put_bridge_v2" +-> + emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(rule_engine_test_action)); +fields(action) -> + {?ACTION_TYPE, + hoconsc:mk( + hoconsc:map(name, hoconsc:ref(?MODULE, rule_engine_test_action)), + #{ + desc => <<"Test Action Config">>, + required => false + } + )}; +fields(rule_engine_test_action) -> + emqx_bridge_v2_schema:make_producer_action_schema( + hoconsc:mk( + hoconsc:ref(?MODULE, action_parameters), + #{ + required => true, + desc => undefined + } + ) + ); +fields(action_parameters) -> + [ + {values, + hoconsc:mk( + typerefl:map(), + #{desc => undefined, default => #{}} + )} + ]; +fields("config_connector") -> + emqx_connector_schema:common_fields() ++ + fields(connector_fields) ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); +fields(connector_resource_opts) -> + emqx_connector_schema:resource_opts_fields(); +fields("config") -> + emqx_resource_schema:fields("resource_opts") ++ + fields(connector_fields); +fields(connector_fields) -> + [ + {values, + hoconsc:mk( + typerefl:map(), + #{desc => undefined, default => #{}} + )} + ]. +desc(_) -> + undefined. diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_test_connector.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_test_connector.erl new file mode 100644 index 000000000..5633ed27e --- /dev/null +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_test_connector.erl @@ -0,0 +1,99 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_rule_engine_test_connector). + +-include_lib("emqx_connector/include/emqx_connector.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-behaviour(emqx_resource). + +%% callbacks of behaviour emqx_resource +-export([ + callback_mode/0, + on_start/2, + on_stop/2, + on_query/3, + on_batch_query/3, + on_get_status/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, + on_get_channel_status/3 +]). + +%% =================================================================== +callback_mode() -> always_sync. + +on_start( + _InstId, + _Config +) -> + {ok, #{installed_channels => #{}}}. + +on_stop(_InstId, _State) -> + ok. + +on_add_channel( + _InstId, + #{ + installed_channels := InstalledChannels + } = OldState, + ChannelId, + ChannelConfig +) -> + NewInstalledChannels = maps:put(ChannelId, ChannelConfig, InstalledChannels), + NewState = OldState#{installed_channels => NewInstalledChannels}, + {ok, NewState}. + +on_remove_channel( + _InstId, + OldState, + _ChannelId +) -> + {ok, OldState}. + +on_get_channel_status( + _ResId, + _ChannelId, + _State +) -> + connected. + +on_get_channels(ResId) -> + emqx_bridge_v2:get_channels_for_connector(ResId). + +on_query( + _InstId, + _Query, + _State +) -> + ok. + +on_batch_query( + _InstId, + [{ChannelId, _Req} | _] = Msg, + #{installed_channels := Channels} = _State +) -> + #{parameters := #{values := #{send_to_pid := PidBin}}} = maps:get(ChannelId, Channels), + Pid = binary_to_term(emqx_utils:hexstr_to_bin(PidBin)), + Pid ! Msg, + ok. + +on_get_status(_InstId, _State) -> + connected. diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_test_connector_info.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_test_connector_info.erl new file mode 100644 index 000000000..1c300bff8 --- /dev/null +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_test_connector_info.erl @@ -0,0 +1,43 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_rule_engine_test_connector_info). + +-behaviour(emqx_connector_info). + +-export([ + type_name/0, + bridge_types/0, + resource_callback_module/0, + config_schema/0, + schema_module/0, + api_schema/1 +]). + +type_name() -> + rule_engine_test. + +bridge_types() -> + [rule_engine_test]. + +resource_callback_module() -> + emqx_rule_engine_test_connector. + +config_schema() -> + {rule_engine_test, + hoconsc:mk( + hoconsc:map(name, hoconsc:ref(emqx_rule_engine_test_action_info, "config_connector")), + #{ + desc => <<"Test Connector Config">>, + required => false + } + )}. + +schema_module() -> + emqx_rule_engine_test_action_info. + +api_schema(Method) -> + emqx_connector_schema:api_ref( + ?MODULE, <<"rule_engine_test">>, Method ++ "_connector" + ). diff --git a/mix.exs b/mix.exs index 486484f72..6ce5c4885 100644 --- a/mix.exs +++ b/mix.exs @@ -60,7 +60,7 @@ defmodule EMQXUmbrella.MixProject do {:grpc, github: "emqx/grpc-erl", tag: "0.6.12", override: true}, {:minirest, github: "emqx/minirest", tag: "1.4.0", override: true}, {:ecpool, github: "emqx/ecpool", tag: "0.5.7", override: true}, - {:replayq, github: "emqx/replayq", tag: "0.3.7", override: true}, + {:replayq, github: "emqx/replayq", tag: "0.3.8", override: true}, {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true}, # maybe forbid to fetch quicer {:emqtt, diff --git a/rebar.config b/rebar.config index 8bbe178aa..a0a726fcf 100644 --- a/rebar.config +++ b/rebar.config @@ -88,7 +88,7 @@ {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}}, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.4.0"}}}, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.7"}}}, - {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}}, + {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.8"}}}, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}, {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.10.1"}}}, {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.2.0"}}},