From 9ba454a63d3cc75686112251f6f553d2d323f67c Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 31 Dec 2021 12:00:43 +0800 Subject: [PATCH 01/25] fix(bridge): filter the topic of received msgs got from remote MQTT broker --- .../src/mqtt/emqx_connector_mqtt_mod.erl | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index d7abcda84..30a1ccb30 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -173,15 +173,27 @@ handle_publish(Msg, Vars) -> _ = erlang:apply(Mod, Func, [Msg | Args]); _ -> ok end, - case maps:get(local_topic, Vars, undefined) of - undefined -> ok; - _Topic -> - emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars)) - end. + maybe_publish_to_local_broker(Msg, Vars). handle_disconnected(Reason, Parent) -> Parent ! {disconnected, self(), Reason}. +maybe_publish_to_local_broker(#{topic := Topic} = Msg, #{remote_topic := SubTopic} = Vars) -> + case maps:get(local_topic, Vars, undefined) of + undefined -> + %% local topic is not set, discard it + ok; + _ -> + case emqx_topic:match(Topic, SubTopic) of + true -> + _ = emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars)), + ok; + false -> + ?SLOG(warning, #{msg => "discard_message_as_topic_not_matched", + message => Msg, subscribed => SubTopic, got_topic => Topic}) + end + end. + make_hdlr(Parent, Vars) -> #{puback => {fun ?MODULE:handle_puback/2, [Parent]}, publish => {fun ?MODULE:handle_publish/2, [Vars]}, From f19ccdfcde868eda1b17375e13cd93a463c159ca Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Thu, 30 Dec 2021 16:42:50 +0800 Subject: [PATCH 02/25] fix(auto_subscribe): update config in cluster --- .../src/emqx_auto_subscribe.erl | 16 ++++++++++++++-- .../src/emqx_auto_subscribe_api.erl | 8 ++++++-- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl index 558e5005c..564772fe4 100644 --- a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl +++ b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl @@ -16,6 +16,8 @@ -module(emqx_auto_subscribe). +-include_lib("emqx/include/logger.hrl"). + -define(HOOK_POINT, 'client.connected'). -define(MAX_AUTO_SUBSCRIBE, 20). @@ -80,8 +82,18 @@ format(Rule = #{topic := Topic}) when is_map(Rule) -> }. update_(Topics) when length(Topics) =< ?MAX_AUTO_SUBSCRIBE -> - {ok, _} = emqx:update_config([auto_subscribe, topics], Topics), - update_hook(); + case emqx_conf:update([auto_subscribe, topics], + Topics, + #{rawconf_with_defaults => true, override_to => cluster}) of + {ok, #{config := NewTopics}} -> + ok = update_hook(), + {ok, NewTopics}; + {error, Reason} -> + ?LOG(error, "Auto Subscribe update config failed: ~0p", [Reason]), + {error, Reason} + end; + % {ok, _} = emqx:update_config([auto_subscribe, topics], Topics), + % update_hook(); update_(_Topics) -> {error, quota_exceeded}. diff --git a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_api.erl b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_api.erl index d1207544a..cb5372d5d 100644 --- a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_api.erl +++ b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_api.erl @@ -22,6 +22,7 @@ -export([auto_subscribe/2]). +-define(INTERNAL_ERROR, 'INTERNAL_ERROR'). -define(EXCEED_LIMIT, 'EXCEED_LIMIT'). -define(BAD_REQUEST, 'BAD_REQUEST'). @@ -90,6 +91,9 @@ auto_subscribe(put, #{body := Params}) -> Message = list_to_binary(io_lib:format("Max auto subscribe topic count is ~p", [emqx_auto_subscribe:max_limit()])), {409, #{code => ?EXCEED_LIMIT, message => Message}}; - ok -> - {200, emqx_auto_subscribe:list()} + {error, Reason} -> + Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])), + {500, #{code => ?INTERNAL_ERROR, message => Message}}; + {ok, NewTopics} -> + {200, NewTopics} end. From 23cf74d82976f79d74a3c7abd9d83b8c48b68af3 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Thu, 30 Dec 2021 16:43:09 +0800 Subject: [PATCH 03/25] fix(delayed): update config in cluster --- apps/emqx_modules/src/emqx_delayed.erl | 2 +- apps/emqx_modules/src/emqx_delayed_api.erl | 73 +++++++++++----------- 2 files changed, 37 insertions(+), 38 deletions(-) diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index 36569d52e..73a21d3e6 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -187,7 +187,7 @@ delete_delayed_message(Id0) -> mria:dirty_delete(?TAB, {Timestamp, Id}) end. update_config(Config) -> - {ok, _} = emqx:update_config([delayed], Config). + emqx_conf:update([delayed], Config, #{rawconf_with_defaults => true, override_to => cluster}). %%-------------------------------------------------------------------- %% gen_server callback diff --git a/apps/emqx_modules/src/emqx_delayed_api.erl b/apps/emqx_modules/src/emqx_delayed_api.erl index 9199d7b2c..e582d9189 100644 --- a/apps/emqx_modules/src/emqx_delayed_api.erl +++ b/apps/emqx_modules/src/emqx_delayed_api.erl @@ -25,12 +25,14 @@ -define(MAX_PAYLOAD_LENGTH, 2048). -define(PAYLOAD_TOO_LARGE, 'PAYLOAD_TOO_LARGE'). --export([status/2 - , delayed_messages/2 - , delayed_message/2 -]). +-export([ status/2 + , delayed_messages/2 + , delayed_message/2 + ]). --export([paths/0, fields/1, schema/1]). +-export([ paths/0 + , fields/1 + , schema/1]). %% for rpc -export([update_config_/1]). @@ -40,6 +42,7 @@ -define(ALREADY_ENABLED, 'ALREADY_ENABLED'). -define(ALREADY_DISABLED, 'ALREADY_DISABLED'). +-define(INTERNAL_ERROR, 'INTERNAL_ERROR'). -define(BAD_REQUEST, 'BAD_REQUEST'). -define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND'). @@ -49,7 +52,11 @@ api_spec() -> emqx_dashboard_swagger:spec(?MODULE). -paths() -> ["/mqtt/delayed", "/mqtt/delayed/messages", "/mqtt/delayed/messages/:msgid"]. +paths() -> + [ "/mqtt/delayed" + , "/mqtt/delayed/messages" + , "/mqtt/delayed/messages/:msgid" + ]. schema("/mqtt/delayed") -> #{ @@ -189,8 +196,7 @@ get_status() -> update_config(Config) -> case generate_config(Config) of {ok, Config} -> - update_config_(Config), - {200, get_status()}; + update_config_(Config); {error, {Code, Message}} -> {400, #{code => Code, message => Message}} end. @@ -215,29 +221,28 @@ generate_max_delayed_messages(Config) -> {ok, Config}. update_config_(Config) -> - lists:foreach(fun(Node) -> - update_config_(Node, Config) - end, mria_mnesia:running_nodes()). - -update_config_(Node, Config) when Node =:= node() -> - _ = emqx_delayed:update_config(Config), - case maps:get(<<"enable">>, Config, undefined) of - undefined -> - ignore; - true -> - emqx_delayed:enable(); - false -> - emqx_delayed:disable() - end, - case maps:get(<<"max_delayed_messages">>, Config, undefined) of - undefined -> - ignore; - Max -> - ok = emqx_delayed:set_max_delayed_messages(Max) - end; - -update_config_(Node, Config) -> - rpc_call(Node, ?MODULE, ?FUNCTION_NAME, [Node, Config]). + case emqx_delayed:update_config(Config) of + {ok, #{config := NewDelayed}} -> + case maps:get(<<"enable">>, Config, undefined) of + undefined -> + ignore; + true -> + emqx_delayed:enable(); + false -> + emqx_delayed:disable() + end, + case maps:get(<<"max_delayed_messages">>, Config, undefined) of + undefined -> + ignore; + Max -> + ok = emqx_delayed:set_max_delayed_messages(Max) + end, + {200, NewDelayed}; + {error, Reason} -> + Message = list_to_binary( + io_lib:format("Update delayed message config failed ~p", [Reason])), + {500, ?INTERNAL_ERROR, Message} + end. generate_http_code_map(id_schema_error, Id) -> #{code => ?MESSAGE_ID_SCHEMA_ERROR, message => @@ -245,9 +250,3 @@ generate_http_code_map(id_schema_error, Id) -> generate_http_code_map(not_found, Id) -> #{code => ?MESSAGE_ID_NOT_FOUND, message => iolist_to_binary(io_lib:format("Message ID ~p not found", [Id]))}. - -rpc_call(Node, Module, Fun, Args) -> - case rpc:call(Node, Module, Fun, Args) of - {badrpc, Reason} -> {error, Reason}; - Result -> Result - end. From 6c52fb4806e92091186b79ce76fd09ad594e5f6d Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Thu, 30 Dec 2021 16:50:19 +0800 Subject: [PATCH 04/25] fix: code format --- apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl | 5 ----- 1 file changed, 5 deletions(-) diff --git a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl index 564772fe4..0183726da 100644 --- a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl +++ b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl @@ -16,8 +16,6 @@ -module(emqx_auto_subscribe). --include_lib("emqx/include/logger.hrl"). - -define(HOOK_POINT, 'client.connected'). -define(MAX_AUTO_SUBSCRIBE, 20). @@ -89,11 +87,8 @@ update_(Topics) when length(Topics) =< ?MAX_AUTO_SUBSCRIBE -> ok = update_hook(), {ok, NewTopics}; {error, Reason} -> - ?LOG(error, "Auto Subscribe update config failed: ~0p", [Reason]), {error, Reason} end; - % {ok, _} = emqx:update_config([auto_subscribe, topics], Topics), - % update_hook(); update_(_Topics) -> {error, quota_exceeded}. From f0330d9334737f06d23bb3b338e8af8248ddaf29 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Thu, 30 Dec 2021 17:36:36 +0800 Subject: [PATCH 05/25] fix(event_message): update config in cluster --- apps/emqx_modules/src/emqx_delayed_api.erl | 2 +- apps/emqx_modules/src/emqx_event_message.erl | 11 +++++++++-- apps/emqx_modules/src/emqx_event_message_api.erl | 9 +++++++-- 3 files changed, 17 insertions(+), 5 deletions(-) diff --git a/apps/emqx_modules/src/emqx_delayed_api.erl b/apps/emqx_modules/src/emqx_delayed_api.erl index e582d9189..66d4a0fdc 100644 --- a/apps/emqx_modules/src/emqx_delayed_api.erl +++ b/apps/emqx_modules/src/emqx_delayed_api.erl @@ -240,7 +240,7 @@ update_config_(Config) -> {200, NewDelayed}; {error, Reason} -> Message = list_to_binary( - io_lib:format("Update delayed message config failed ~p", [Reason])), + io_lib:format("Update config failed ~p", [Reason])), {500, ?INTERNAL_ERROR, Message} end. diff --git a/apps/emqx_modules/src/emqx_event_message.erl b/apps/emqx_modules/src/emqx_event_message.erl index ccdb75ccb..3918e4dfc 100644 --- a/apps/emqx_modules/src/emqx_event_message.erl +++ b/apps/emqx_modules/src/emqx_event_message.erl @@ -44,8 +44,15 @@ list() -> update(Params) -> disable(), - {ok, _} = emqx:update_config([event_message], Params), - enable(). + case emqx_conf:update([event_message], + Params, + #{rawconf_with_defaults => true, override_to => cluster}) of + {ok, #{config := NewEventMessage}} -> + enable(), + {ok, NewEventMessage}; + {error, Reason} -> + {error, Reason} + end. enable() -> lists:foreach(fun({_Topic, false}) -> ok; diff --git a/apps/emqx_modules/src/emqx_event_message_api.erl b/apps/emqx_modules/src/emqx_event_message_api.erl index 80e5825d1..e27311e15 100644 --- a/apps/emqx_modules/src/emqx_event_message_api.erl +++ b/apps/emqx_modules/src/emqx_event_message_api.erl @@ -53,5 +53,10 @@ event_message(get, _Params) -> {200, emqx_event_message:list()}; event_message(put, #{body := Body}) -> - _ = emqx_event_message:update(Body), - {200, emqx_event_message:list()}. + case emqx_event_message:update(Body) of + {ok, NewConfig} -> + {200, NewConfig}; + {error, Reason} -> + Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])), + {500, 'INTERNAL_ERROR', Message} + end. From a630044688d53ff30264ba788bf05e5b967842d7 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Thu, 30 Dec 2021 18:35:20 +0800 Subject: [PATCH 06/25] fix: update result by row_config --- apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl | 2 +- apps/emqx_modules/src/emqx_delayed_api.erl | 2 +- apps/emqx_modules/src/emqx_event_message.erl | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl index 0183726da..81b0d70a4 100644 --- a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl +++ b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl @@ -83,7 +83,7 @@ update_(Topics) when length(Topics) =< ?MAX_AUTO_SUBSCRIBE -> case emqx_conf:update([auto_subscribe, topics], Topics, #{rawconf_with_defaults => true, override_to => cluster}) of - {ok, #{config := NewTopics}} -> + {ok, #{raw_config := NewTopics}} -> ok = update_hook(), {ok, NewTopics}; {error, Reason} -> diff --git a/apps/emqx_modules/src/emqx_delayed_api.erl b/apps/emqx_modules/src/emqx_delayed_api.erl index 66d4a0fdc..5caad8aa1 100644 --- a/apps/emqx_modules/src/emqx_delayed_api.erl +++ b/apps/emqx_modules/src/emqx_delayed_api.erl @@ -222,7 +222,7 @@ generate_max_delayed_messages(Config) -> update_config_(Config) -> case emqx_delayed:update_config(Config) of - {ok, #{config := NewDelayed}} -> + {ok, #{raw_config := NewDelayed}} -> case maps:get(<<"enable">>, Config, undefined) of undefined -> ignore; diff --git a/apps/emqx_modules/src/emqx_event_message.erl b/apps/emqx_modules/src/emqx_event_message.erl index 3918e4dfc..3af57a38d 100644 --- a/apps/emqx_modules/src/emqx_event_message.erl +++ b/apps/emqx_modules/src/emqx_event_message.erl @@ -47,7 +47,7 @@ update(Params) -> case emqx_conf:update([event_message], Params, #{rawconf_with_defaults => true, override_to => cluster}) of - {ok, #{config := NewEventMessage}} -> + {ok, #{raw_config := NewEventMessage}} -> enable(), {ok, NewEventMessage}; {error, Reason} -> From 173ae4653803860a8aa1ebbf9943ca62c2057997 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Fri, 31 Dec 2021 10:04:06 +0800 Subject: [PATCH 07/25] fix(auto_subscribe): bad test suite --- .../emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl b/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl index 0e5022533..ab8f9a408 100644 --- a/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl +++ b/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl @@ -85,7 +85,7 @@ init_per_suite(Config) -> } ] }">>), - emqx_common_test_helpers:start_apps([emqx_dashboard, ?APP], fun set_special_configs/1), + emqx_common_test_helpers:start_apps([emqx_dashboard, emqx_conf, ?APP], fun set_special_configs/1), Config. set_special_configs(emqx_dashboard) -> @@ -113,15 +113,17 @@ topic_config(T) -> end_per_suite(_) -> application:unload(emqx_management), + application:unload(emqx_conf), application:unload(?APP), meck:unload(emqx_resource), meck:unload(emqx_schema), emqx_common_test_helpers:stop_apps([emqx_dashboard, ?APP]). t_auto_subscribe(_) -> + emqx_auto_subscribe:update([#{<<"topic">> => Topic} || Topic <- ?TOPICS]), {ok, Client} = emqtt:start_link(#{username => ?CLIENT_USERNAME, clientid => ?CLIENT_ID}), {ok, _} = emqtt:connect(Client), - timer:sleep(100), + timer:sleep(200), ?assertEqual(check_subs(length(?TOPICS)), ok), emqtt:disconnect(Client), ok. @@ -148,6 +150,7 @@ t_update(_) -> check_subs(Count) -> Subs = ets:tab2list(emqx_suboption), + ct:pal("---> ~p ~p ~n", [Subs, Count]), ?assert(length(Subs) >= Count), check_subs((Subs), ?ENSURE_TOPICS). From c7693246febb4a5b6988547981deb8bacd8de23a Mon Sep 17 00:00:00 2001 From: EMQ-YangM Date: Tue, 28 Dec 2021 21:38:10 -0800 Subject: [PATCH 08/25] feat(emqx_resource): add health_ckeck process, it will periodically perform health checks, and print error logs and generate alarms when the checks fail. --- .../src/emqx_resource_health_check.erl | 43 +++++++++++++++++++ .../src/emqx_resource_health_check_sup.erl | 40 +++++++++++++++++ .../src/emqx_resource_instance.erl | 6 ++- apps/emqx_resource/src/emqx_resource_sup.erl | 7 ++- 4 files changed, 94 insertions(+), 2 deletions(-) create mode 100644 apps/emqx_resource/src/emqx_resource_health_check.erl create mode 100644 apps/emqx_resource/src/emqx_resource_health_check_sup.erl diff --git a/apps/emqx_resource/src/emqx_resource_health_check.erl b/apps/emqx_resource/src/emqx_resource_health_check.erl new file mode 100644 index 000000000..50b236daa --- /dev/null +++ b/apps/emqx_resource/src/emqx_resource_health_check.erl @@ -0,0 +1,43 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_resource_health_check). + +-export([child_spec/2]). + +-export([start_link/2]). + +-export([health_check/2]). + +child_spec(Name, Sleep) -> + #{id => {health_check, Name}, + start => {?MODULE, start_link, [Name, Sleep]}, + restart => transient, + shutdown => 5000, type => worker, modules => [?MODULE]}. + +start_link(Name, Sleep) -> + Pid = proc_lib:spawn_link(?MODULE, health_check, [Name, Sleep]), + {ok, Pid}. + +health_check(Name, SleepTime) -> + timer:sleep(SleepTime), + case emqx_resource:health_check(Name) of + ok -> + emqx_alarm:deactivate(Name); + {error, _} -> + emqx_alarm:activate(Name, #{name => Name}, + <>) + end, + health_check(Name, SleepTime). \ No newline at end of file diff --git a/apps/emqx_resource/src/emqx_resource_health_check_sup.erl b/apps/emqx_resource/src/emqx_resource_health_check_sup.erl new file mode 100644 index 000000000..571cd6338 --- /dev/null +++ b/apps/emqx_resource/src/emqx_resource_health_check_sup.erl @@ -0,0 +1,40 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_resource_health_check_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1, + create_health_check_process/2, + delete_health_check_process/1]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + SupFlags = #{strategy => one_for_one, intensity => 10, period => 10}, + {ok, {SupFlags, []}}. + +create_health_check_process(Name, Sleep) -> + supervisor:start_child(emqx_resource_health_check_sup, + emqx_resource_health_check:child_spec(Name, Sleep)). + +delete_health_check_process(Name) -> + _ = supervisor:terminate_child(emqx_resource_health_check_sup, {health_check, Name}), + _ = supervisor:delete_child(emqx_resource_health_check_sup, {health_check, Name}), + ok. \ No newline at end of file diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index 745b8b684..af9c999cf 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -173,6 +173,8 @@ do_create(InstId, ResourceType, Config, Opts) -> %% this is the first time we do health check, this will update the %% status and then do ets:insert/2 _ = do_health_check(Res0#{state => ResourceState}), + HealthCheckInterval = maps:get(health_check_interval, Opts, 15000), + emqx_resource_health_check_sup:create_health_check_process(InstId, HealthCheckInterval), {ok, force_lookup(InstId)}; {error, Reason} when ForceCreate == true -> logger:error("start ~ts resource ~ts failed: ~p, " @@ -216,7 +218,9 @@ do_remove(Mod, InstId, ResourceState, ClearMetrics) -> case ClearMetrics of true -> ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId); false -> ok - end. + end, + _ = emqx_resource_health_check_sup:delete_health_check_process(InstId), + ok. do_restart(InstId) -> case lookup(InstId) of diff --git a/apps/emqx_resource/src/emqx_resource_sup.erl b/apps/emqx_resource/src/emqx_resource_sup.erl index 534777b69..b5655e301 100644 --- a/apps/emqx_resource/src/emqx_resource_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_sup.erl @@ -45,7 +45,12 @@ init([]) -> restart => transient, shutdown => 5000, type => worker, modules => [Mod]} end || Idx <- lists:seq(1, ?POOL_SIZE)], - {ok, {SupFlags, [Metrics | ResourceInsts]}}. + HealthCheck = + #{id => emqx_resource_health_check_sup, + start => {emqx_resource_health_check_sup, start_link, []}, + restart => transient, + shutdown => 5000, type => supervisor, modules => [emqx_resource_health_check_sup]}, + {ok, {SupFlags, [HealthCheck, Metrics | ResourceInsts]}}. %% internal functions ensure_pool(Pool, Type, Opts) -> From d18a2ab57c1dac3981114e931fcc67cefebeb06c Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Fri, 31 Dec 2021 12:07:53 +0800 Subject: [PATCH 09/25] fix(delayed): base64 encode twice --- apps/emqx_modules/src/emqx_delayed.erl | 2 +- apps/emqx_retainer/src/emqx_retainer_api.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index 36569d52e..69e9ebb5b 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -155,7 +155,7 @@ format_delayed(#delayed_message{key = {ExpectTimeStamp, Id}, delayed = Delayed, }, case WithPayload of true -> - Result#{payload => base64:encode(Payload)}; + Result#{payload => Payload}; _ -> Result end. diff --git a/apps/emqx_retainer/src/emqx_retainer_api.erl b/apps/emqx_retainer/src/emqx_retainer_api.erl index b60ac5627..7739d60dc 100644 --- a/apps/emqx_retainer/src/emqx_retainer_api.erl +++ b/apps/emqx_retainer/src/emqx_retainer_api.erl @@ -84,7 +84,7 @@ with_topic_api() -> parameters => parameters(), responses => #{ <<"200">> => object_schema(message_props(), <<"List retained messages">>), - <<"404">> => error_schema(<<"Reatined Not Exists">>, ['NOT_FOUND']), + <<"404">> => error_schema(<<"Retained Not Exists">>, ['NOT_FOUND']), <<"405">> => schema(<<"NotAllowed">>) } }, From 6cde540fd1b1d4b4579b742ed971bf1dd503a62d Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Fri, 31 Dec 2021 14:17:50 +0800 Subject: [PATCH 10/25] fix(test): close app at end_per_suite --- apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl b/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl index ab8f9a408..92eb9a9ab 100644 --- a/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl +++ b/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl @@ -117,7 +117,7 @@ end_per_suite(_) -> application:unload(?APP), meck:unload(emqx_resource), meck:unload(emqx_schema), - emqx_common_test_helpers:stop_apps([emqx_dashboard, ?APP]). + emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_conf, ?APP]). t_auto_subscribe(_) -> emqx_auto_subscribe:update([#{<<"topic">> => Topic} || Topic <- ?TOPICS]), From e299d8d138ab8477638fd14317344ed117272bf4 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 31 Dec 2021 15:47:03 +0800 Subject: [PATCH 11/25] fix(rule): rules not triggered after the ingress mqtt bridge received some msg --- apps/emqx_bridge/src/emqx_bridge.erl | 23 +++++++++++++------ .../src/emqx_rule_runtime.erl | 11 +++++---- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index d46ce217e..1814a11fe 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -90,7 +90,14 @@ on_message_publish(Message = #message{topic = Topic, flags = Flags}) -> send_message(BridgeId, Message) -> {BridgeType, BridgeName} = parse_bridge_id(BridgeId), ResId = emqx_bridge:resource_id(BridgeType, BridgeName), - emqx_resource:query(ResId, {send_message, Message}). + case emqx:get_config([bridges, BridgeType, BridgeName], not_found) of + not_found -> + throw({bridge_not_found, BridgeId}); + #{enable := true} -> + emqx_resource:query(ResId, {send_message, Message}); + #{enable := false} -> + throw({bridge_stopped, BridgeId}) + end. config_key_path() -> [bridges]. @@ -279,6 +286,8 @@ get_matched_bridges(Topic) -> end, Acc0, Conf) end, [], Bridges). +get_matched_bridge_id(#{enable := false}, _Topic, _BType, _BName, Acc) -> + Acc; get_matched_bridge_id(#{local_topic := Filter}, Topic, BType, BName, Acc) -> case emqx_topic:match(Topic, Filter) of true -> [bridge_id(BType, BName) | Acc]; @@ -309,21 +318,21 @@ parse_confs(Type, Name, #{connector := ConnId, direction := Direction} = Conf) {Type, ConnName} -> ConnectorConfs = emqx:get_config([connectors, Type, ConnName]), make_resource_confs(Direction, ConnectorConfs, - maps:without([connector, direction], Conf), Name); + maps:without([connector, direction], Conf), Type, Name); {_ConnType, _ConnName} -> error({cannot_use_connector_with_different_type, ConnId}) end; -parse_confs(_Type, Name, #{connector := ConnectorConfs, direction := Direction} = Conf) +parse_confs(Type, Name, #{connector := ConnectorConfs, direction := Direction} = Conf) when is_map(ConnectorConfs) -> make_resource_confs(Direction, ConnectorConfs, - maps:without([connector, direction], Conf), Name). + maps:without([connector, direction], Conf), Type, Name). -make_resource_confs(ingress, ConnectorConfs, BridgeConf, Name) -> - BName = bin(Name), +make_resource_confs(ingress, ConnectorConfs, BridgeConf, Type, Name) -> + BName = bridge_id(Type, Name), ConnectorConfs#{ ingress => BridgeConf#{hookpoint => <<"$bridges/", BName/binary>>} }; -make_resource_confs(egress, ConnectorConfs, BridgeConf, _Name) -> +make_resource_confs(egress, ConnectorConfs, BridgeConf, _Type, _Name) -> ConnectorConfs#{ egress => BridgeConf }. diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 60a7cbaad..428a1ef8c 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -101,7 +101,7 @@ do_apply_rule(#{ true -> ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId), Collection2 = filter_collection(Input, InCase, DoEach, Collection), - {ok, [handle_output_list(Outputs, Coll, Input) || Coll <- Collection2]}; + {ok, [handle_output_list(RuleId, Outputs, Coll, Input) || Coll <- Collection2]}; false -> {error, nomatch} end; @@ -118,7 +118,7 @@ do_apply_rule(#{id := RuleId, {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of true -> ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId), - {ok, handle_output_list(Outputs, Selected, Input)}; + {ok, handle_output_list(RuleId, Outputs, Selected, Input)}; false -> {error, nomatch} end. @@ -231,14 +231,15 @@ number(Bin) -> catch error:badarg -> binary_to_float(Bin) end. -handle_output_list(Outputs, Selected, Envs) -> - [handle_output(Out, Selected, Envs) || Out <- Outputs]. +handle_output_list(RuleId, Outputs, Selected, Envs) -> + [handle_output(RuleId, Out, Selected, Envs) || Out <- Outputs]. -handle_output(OutId, Selected, Envs) -> +handle_output(RuleId, OutId, Selected, Envs) -> try do_handle_output(OutId, Selected, Envs) catch Err:Reason:ST -> + ok = emqx_plugin_libs_metrics:inc_failed(rule_metrics, RuleId), ?SLOG(error, #{msg => "output_failed", output => OutId, exception => Err, From 657ecef67bec35b769bb59ec2087caaadd474d6a Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 31 Dec 2021 20:57:34 +0800 Subject: [PATCH 12/25] fix(resource): don't crash on resource stopped --- apps/emqx_bridge/src/emqx_bridge.erl | 5 ++++- apps/emqx_resource/include/emqx_resource.hrl | 4 +++- apps/emqx_resource/src/emqx_resource.erl | 16 +++++++++++----- .../emqx_resource/src/emqx_resource_instance.erl | 14 +++++++++++--- apps/emqx_resource/test/emqx_resource_SUITE.erl | 2 +- apps/emqx_rule_engine/src/emqx_rule_runtime.erl | 3 ++- 6 files changed, 32 insertions(+), 12 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 1814a11fe..3ae4e5820 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -94,7 +94,10 @@ send_message(BridgeId, Message) -> not_found -> throw({bridge_not_found, BridgeId}); #{enable := true} -> - emqx_resource:query(ResId, {send_message, Message}); + case emqx_resource:query(ResId, {send_message, Message}) of + {error, {emqx_resource, Reason}} -> throw({bridge_not_ready, Reason}); + Result -> Result + end; #{enable := false} -> throw({bridge_stopped, BridgeId}) end. diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 08c230401..ed1de18cf 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -25,7 +25,7 @@ mod := module(), config := resource_config(), state := resource_state(), - status := started | stopped, + status := started | stopped | starting, metrics := emqx_plugin_libs_metrics:metrics() }. -type resource_group() :: binary(). @@ -41,3 +41,5 @@ %% the `after_query_fun()` is mainly for callbacks that increment counters or do some fallback %% actions upon query failure -type after_query_fun() :: {fun((...) -> ok), Args :: [term()]}. + +-define(TEST_ID_PREFIX, "_test_:"). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 37c4caa2e..5ec5fd92a 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -82,7 +82,6 @@ ]). -define(HOCON_CHECK_OPTS, #{atom_key => true, nullable => true}). - -define(DEFAULT_RESOURCE_GROUP, <<"default">>). -optional_callbacks([ on_query/4 @@ -170,7 +169,7 @@ create_dry_run(ResourceType, Config) -> -spec create_dry_run_local(resource_type(), resource_config()) -> ok | {error, Reason :: term()}. create_dry_run_local(ResourceType, Config) -> - InstId = iolist_to_binary(emqx_misc:gen_id(16)), + InstId = emqx_resource_instance:make_test_id(), call_instance(InstId, {create_dry_run, InstId, ResourceType, Config}). -spec recreate(instance_id(), resource_type(), resource_config(), term()) -> @@ -201,14 +200,18 @@ query(InstId, Request) -> -spec query(instance_id(), Request :: term(), after_query()) -> Result :: term(). query(InstId, Request, AfterQuery) -> case get_instance(InstId) of + {ok, #{status := starting}} -> + query_error(starting, <<"cannot serve query when the resource " + "instance is still starting">>); {ok, #{status := stopped}} -> - error({resource_stopped, InstId}); + query_error(stopped, <<"cannot serve query when the resource " + "instance is stopped">>); {ok, #{mod := Mod, state := ResourceState, status := started}} -> %% the resource state is readonly to Module:on_query/4 %% and the `after_query()` functions should be thread safe Mod:on_query(InstId, Request, AfterQuery, ResourceState); - {error, Reason} -> - error({get_instance, {InstId, Reason}}) + {error, not_found} -> + query_error(not_found, <<"the resource id not exists">>) end. -spec restart(instance_id()) -> ok | {error, Reason :: term()}. @@ -368,3 +371,6 @@ cluster_call(Func, Args) -> {ok, _TxnId, Result} -> Result; Failed -> Failed end. + +query_error(Reason, Msg) -> + {error, {?MODULE, #{reason => Reason, msg => Msg}}}. diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index 745b8b684..c413691d9 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -26,6 +26,7 @@ -export([ lookup/1 , get_metrics/1 , list_all/0 + , make_test_id/0 ]). -export([ hash_call/2 @@ -61,7 +62,7 @@ hash_call(InstId, Request) -> hash_call(InstId, Request, Timeout) -> gen_server:call(pick(InstId), Request, Timeout). --spec lookup(instance_id()) -> {ok, resource_data()} | {error, Reason :: term()}. +-spec lookup(instance_id()) -> {ok, resource_data()} | {error, not_found}. lookup(InstId) -> case ets:lookup(emqx_resource_instance, InstId) of [] -> {error, not_found}; @@ -69,6 +70,10 @@ lookup(InstId) -> {ok, Data#{id => InstId, metrics => get_metrics(InstId)}} end. +make_test_id() -> + RandId = iolist_to_binary(emqx_misc:gen_id(16)), + <>. + get_metrics(InstId) -> emqx_plugin_libs_metrics:get_metrics(resource_metrics, InstId). @@ -146,7 +151,7 @@ do_recreate(InstId, ResourceType, NewConfig, Params) -> {ok, #{mod := ResourceType, state := ResourceState, config := OldConfig}} -> Config = emqx_resource:call_config_merge(ResourceType, OldConfig, NewConfig, Params), - TestInstId = iolist_to_binary(emqx_misc:gen_id(16)), + TestInstId = make_test_id(), case do_create_dry_run(TestInstId, ResourceType, Config) of ok -> do_remove(ResourceType, InstId, ResourceState, false), @@ -166,7 +171,9 @@ do_create(InstId, ResourceType, Config, Opts) -> {ok, _} -> {ok, already_created}; _ -> Res0 = #{id => InstId, mod => ResourceType, config => Config, - status => stopped, state => undefined}, + status => starting, state => undefined}, + %% The `emqx_resource:call_start/3` need the instance exist beforehand + ets:insert(emqx_resource_instance, {InstId, Res0}), case emqx_resource:call_start(InstId, ResourceType, Config) of {ok, ResourceState} -> ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId), @@ -181,6 +188,7 @@ do_create(InstId, ResourceType, Config, Opts) -> ets:insert(emqx_resource_instance, {InstId, Res0}), {ok, Res0}; {error, Reason} when ForceCreate == false -> + ets:delete(emqx_resource_instance, InstId), {error, Reason} end end. diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 6b2e5903e..0b65b5a78 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -142,7 +142,7 @@ t_stop_start(_) -> ?assertNot(is_process_alive(Pid0)), - ?assertException(error, {resource_stopped, ?ID}, emqx_resource:query(?ID, get_state)), + ?assertMatch({emqx_resource, #{reason := stopped}}, emqx_resource:query(?ID, get_state)), ok = emqx_resource:restart(?ID), diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 428a1ef8c..049829c59 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -240,7 +240,8 @@ handle_output(RuleId, OutId, Selected, Envs) -> catch Err:Reason:ST -> ok = emqx_plugin_libs_metrics:inc_failed(rule_metrics, RuleId), - ?SLOG(error, #{msg => "output_failed", + Level = case Err of throw -> debug; _ -> error end, + ?SLOG(Level, #{msg => "output_failed", output => OutId, exception => Err, reason => Reason, From f65eca4c4705e56e611ef7b8cbc189a41e4b24eb Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 31 Dec 2021 21:08:07 +0800 Subject: [PATCH 13/25] fix(authn): update testcase for resource not running --- apps/emqx_authn/test/emqx_authn_http_SUITE.erl | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/apps/emqx_authn/test/emqx_authn_http_SUITE.erl b/apps/emqx_authn/test/emqx_authn_http_SUITE.erl index 2c0716e8b..146e9ef32 100644 --- a/apps/emqx_authn/test/emqx_authn_http_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_http_SUITE.erl @@ -153,9 +153,8 @@ t_destroy(_Config) -> ?GLOBAL), % Authenticator should not be usable anymore - ?assertException( - error, - _, + ?assertMatch( + ignore, emqx_authn_http:authenticate( Credentials, State)). From 658f819aabff26a08effb2b5607be36608efad5b Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 31 Dec 2021 21:28:32 +0800 Subject: [PATCH 14/25] fix(bridges): keep multiple bridges from affecting each other on crash --- apps/emqx_bridge/src/emqx_bridge.erl | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 3ae4e5820..3518190c7 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -80,26 +80,35 @@ unload_hook() -> on_message_publish(Message = #message{topic = Topic, flags = Flags}) -> case maps:get(sys, Flags, false) of false -> - lists:foreach(fun (Id) -> - send_message(Id, emqx_rule_events:eventmsg_publish(Message)) - end, get_matched_bridges(Topic)); + Msg = emqx_rule_events:eventmsg_publish(Message), + send_to_egress_matched_bridges(Topic, Msg); true -> ok end, {ok, Message}. +send_to_egress_matched_bridges(Topic, Msg) -> + lists:foreach(fun (Id) -> + try send_message(Id, Msg) of + ok -> ok; + Error -> ?SLOG(error, #{msg => "send_message_to_bridge_failed", + bridge => Id, error => Error}) + catch Err:Reason:ST -> + ?SLOG(error, #{msg => "send_message_to_bridge_crash", + bridge => Id, error => Err, reason => Reason, + stacktrace => ST}) + end + end, get_matched_bridges(Topic)). + send_message(BridgeId, Message) -> {BridgeType, BridgeName} = parse_bridge_id(BridgeId), ResId = emqx_bridge:resource_id(BridgeType, BridgeName), case emqx:get_config([bridges, BridgeType, BridgeName], not_found) of not_found -> - throw({bridge_not_found, BridgeId}); + {error, {bridge_not_found, BridgeId}}; #{enable := true} -> - case emqx_resource:query(ResId, {send_message, Message}) of - {error, {emqx_resource, Reason}} -> throw({bridge_not_ready, Reason}); - Result -> Result - end; + emqx_resource:query(ResId, {send_message, Message}); #{enable := false} -> - throw({bridge_stopped, BridgeId}) + {error, {bridge_stopped, BridgeId}} end. config_key_path() -> From b74a9bfda1d72871fa73a0f3ca94f175e8d665d7 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 31 Dec 2021 22:20:22 +0800 Subject: [PATCH 15/25] fix(swagger): duplicate keys in swagger doc --- apps/emqx_bridge/src/emqx_bridge_http_schema.erl | 4 ---- apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl | 8 -------- apps/emqx_connector/src/emqx_connector_mqtt.erl | 4 ---- 3 files changed, 16 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl index 494911d21..152289cf1 100644 --- a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl @@ -68,7 +68,6 @@ How long will the HTTP request timeout. fields("post") -> [ type_field() - , name_field() ] ++ fields("bridge"); fields("put") -> @@ -103,8 +102,5 @@ id_field() -> type_field() -> {type, mk(http, #{desc => "The Bridge Type"})}. -name_field() -> - {name, mk(binary(), #{desc => "The Bridge Name"})}. - method() -> enum([post, put, get, delete]). diff --git a/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl b/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl index 3de011b4c..96c9a1d38 100644 --- a/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl @@ -24,11 +24,9 @@ fields("egress") -> fields("post_ingress") -> [ type_field() - , name_field() ] ++ proplists:delete(enable, fields("ingress")); fields("post_egress") -> [ type_field() - , name_field() ] ++ proplists:delete(enable, fields("egress")); fields("put_ingress") -> @@ -49,9 +47,3 @@ id_field() -> type_field() -> {type, mk(mqtt, #{desc => "The Bridge Type"})}. - -name_field() -> - {name, mk(binary(), - #{ desc => "The Bridge Name" - , example => "some_bridge_name" - })}. diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index c216e905c..beeff6d3e 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -68,10 +68,6 @@ fields("put") -> fields("post") -> [ {type, mk(mqtt, #{desc => "The Connector Type"})} - , {name, mk(binary(), - #{ desc => "The Connector Name" - , example => <<"my_mqtt_connector">> - })} ] ++ fields("put"). %% =================================================================== From efec4564f0a1bbdcdd6d724cbb257069f47e2b9c Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 31 Dec 2021 22:25:45 +0800 Subject: [PATCH 16/25] fix(resource): update test cases on resource not_found --- apps/emqx_authn/test/emqx_authn_mongo_SUITE.erl | 5 ++--- apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl | 5 ++--- apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl | 5 ++--- apps/emqx_authn/test/emqx_authn_redis_SUITE.erl | 5 ++--- apps/emqx_resource/test/emqx_resource_SUITE.erl | 7 +++---- 5 files changed, 11 insertions(+), 16 deletions(-) diff --git a/apps/emqx_authn/test/emqx_authn_mongo_SUITE.erl b/apps/emqx_authn/test/emqx_authn_mongo_SUITE.erl index edd91be55..855a2226d 100644 --- a/apps/emqx_authn/test/emqx_authn_mongo_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_mongo_SUITE.erl @@ -146,9 +146,8 @@ t_destroy(_Config) -> ?GLOBAL), % Authenticator should not be usable anymore - ?assertException( - error, - _, + ?assertMatch( + ignore, emqx_authn_mongodb:authenticate( #{username => <<"plain">>, password => <<"plain">> diff --git a/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl b/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl index 95eecdead..ffe98be65 100644 --- a/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl @@ -159,9 +159,8 @@ t_destroy(_Config) -> ?GLOBAL), % Authenticator should not be usable anymore - ?assertException( - error, - _, + ?assertMatch( + ignore, emqx_authn_mysql:authenticate( #{username => <<"plain">>, password => <<"plain">> diff --git a/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl b/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl index e33f5c100..56044faf4 100644 --- a/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl @@ -159,9 +159,8 @@ t_destroy(_Config) -> ?GLOBAL), % Authenticator should not be usable anymore - ?assertException( - error, - _, + ?assertMatch( + ignore, emqx_authn_pgsql:authenticate( #{username => <<"plain">>, password => <<"plain">> diff --git a/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl b/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl index de556a7bd..eeb674f86 100644 --- a/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl @@ -164,9 +164,8 @@ t_destroy(_Config) -> ?GLOBAL), % Authenticator should not be usable anymore - ?assertException( - error, - _, + ?assertMatch( + ignore, emqx_authn_redis:authenticate( #{username => <<"plain">>, password => <<"plain">> diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 0b65b5a78..4e9c35efc 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -96,9 +96,7 @@ t_query(_) -> ?assert(false) end, - ?assertException( - error, - {get_instance, _Reason}, + ?assertMatch({error, {emqx_resource, #{reason := not_found}}}, emqx_resource:query(<<"unknown">>, get_state)), ok = emqx_resource:remove_local(?ID). @@ -142,7 +140,8 @@ t_stop_start(_) -> ?assertNot(is_process_alive(Pid0)), - ?assertMatch({emqx_resource, #{reason := stopped}}, emqx_resource:query(?ID, get_state)), + ?assertMatch({error, {emqx_resource, #{reason := stopped}}}, + emqx_resource:query(?ID, get_state)), ok = emqx_resource:restart(?ID), From 9a7452e1c5abe778a89711d582a1bdd8162d3f0a Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sat, 1 Jan 2022 03:07:31 +0800 Subject: [PATCH 17/25] fix(connector): add testcase for binding ingress mqtt bridge to rules --- apps/emqx_bridge/src/emqx_bridge.erl | 4 +- .../src/mqtt/emqx_connector_mqtt_mod.erl | 63 ++++++++++---- .../src/mqtt/emqx_connector_mqtt_msg.erl | 30 +------ .../test/emqx_connector_api_SUITE.erl | 82 +++++++++++++++++-- 4 files changed, 125 insertions(+), 54 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 3518190c7..0495f00e4 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -81,12 +81,12 @@ on_message_publish(Message = #message{topic = Topic, flags = Flags}) -> case maps:get(sys, Flags, false) of false -> Msg = emqx_rule_events:eventmsg_publish(Message), - send_to_egress_matched_bridges(Topic, Msg); + send_to_matched_egress_bridges(Topic, Msg); true -> ok end, {ok, Message}. -send_to_egress_matched_bridges(Topic, Msg) -> +send_to_matched_egress_bridges(Topic, Msg) -> lists:foreach(fun (Id) -> try send_message(Id, Msg) of ok -> ok; diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index 30a1ccb30..7d5021f82 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -165,7 +165,8 @@ handle_publish(Msg, undefined) -> ?SLOG(error, #{msg => "cannot_publish_to_local_broker_as" "_'ingress'_is_not_configured", message => Msg}); -handle_publish(Msg, Vars) -> +handle_publish(Msg0, Vars) -> + Msg = format_msg_received(Msg0), ?SLOG(debug, #{msg => "publish_to_local_broker", message => Msg, vars => Vars}), case Vars of @@ -173,27 +174,11 @@ handle_publish(Msg, Vars) -> _ = erlang:apply(Mod, Func, [Msg | Args]); _ -> ok end, - maybe_publish_to_local_broker(Msg, Vars). + maybe_publish_to_local_broker(Msg0, Vars). handle_disconnected(Reason, Parent) -> Parent ! {disconnected, self(), Reason}. -maybe_publish_to_local_broker(#{topic := Topic} = Msg, #{remote_topic := SubTopic} = Vars) -> - case maps:get(local_topic, Vars, undefined) of - undefined -> - %% local topic is not set, discard it - ok; - _ -> - case emqx_topic:match(Topic, SubTopic) of - true -> - _ = emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars)), - ok; - false -> - ?SLOG(warning, #{msg => "discard_message_as_topic_not_matched", - message => Msg, subscribed => SubTopic, got_topic => Topic}) - end - end. - make_hdlr(Parent, Vars) -> #{puback => {fun ?MODULE:handle_puback/2, [Parent]}, publish => {fun ?MODULE:handle_publish/2, [Vars]}, @@ -209,3 +194,45 @@ sub_remote_topics(ClientPid, #{remote_topic := FromTopic, remote_qos := QoS}) -> process_config(Config) -> maps:without([conn_type, address, receive_mountpoint, subscriptions, name], Config). + +maybe_publish_to_local_broker(#{topic := Topic} = Msg, #{remote_topic := SubTopic} = Vars) -> + case maps:get(local_topic, Vars, undefined) of + undefined -> + ok; %% local topic is not set, discard it + _ -> + case emqx_topic:match(Topic, SubTopic) of + true -> + _ = emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars)), + ok; + false -> + ?SLOG(warning, #{msg => "discard_message_as_topic_not_matched", + message => Msg, subscribed => SubTopic, got_topic => Topic}) + end + end. + +format_msg_received(#{dup := Dup, payload := Payload, properties := Props, + qos := QoS, retain := Retain, topic := Topic}) -> + #{event => '$bridges/mqtt', + id => emqx_guid:to_hexstr(emqx_guid:gen()), + payload => Payload, + topic => Topic, + qos => QoS, + dup => Dup, + retain => Retain, + pub_props => printable_maps(Props), + timestamp => erlang:system_time(millisecond) + }. + +printable_maps(undefined) -> #{}; +printable_maps(Headers) -> + maps:fold( + fun ('User-Property', V0, AccIn) when is_list(V0) -> + AccIn#{ + 'User-Property' => maps:from_list(V0), + 'User-Property-Pairs' => [#{ + key => Key, + value => Value + } || {Key, Value} <- V0] + }; + (K, V0, AccIn) -> AccIn#{K => V0} + end, #{}, Headers). diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl index a0dd9eec1..35bcf3de1 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl @@ -78,10 +78,9 @@ to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) -> Msg#message{topic = topic(Mountpoint, Topic)}. %% published from remote node over a MQTT connection -to_broker_msg(#{dup := Dup, properties := Props} = MapMsg0, +to_broker_msg(#{dup := Dup, properties := Props} = MapMsg, #{local_topic := TopicToken, payload := PayloadToken, local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) -> - MapMsg = format_msg_received(MapMsg0), Topic = replace_vars_in_str(TopicToken, MapMsg), Payload = process_payload(PayloadToken, MapMsg), QoS = replace_simple_var(QoSToken, MapMsg), @@ -90,33 +89,6 @@ to_broker_msg(#{dup := Dup, properties := Props} = MapMsg0, emqx_message:set_flags(#{dup => Dup, retain => Retain}, emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload))). -format_msg_received(#{dup := Dup, payload := Payload, properties := Props, - qos := QoS, retain := Retain, topic := Topic}) -> - #{event => '$bridges/mqtt', - id => emqx_guid:to_hexstr(emqx_guid:gen()), - payload => Payload, - topic => Topic, - qos => QoS, - flags => #{dup => Dup, retain => Retain}, - pub_props => printable_maps(Props), - timestamp => erlang:system_time(millisecond), - node => node() - }. - -printable_maps(undefined) -> #{}; -printable_maps(Headers) -> - maps:fold( - fun ('User-Property', V0, AccIn) when is_list(V0) -> - AccIn#{ - 'User-Property' => maps:from_list(V0), - 'User-Property-Pairs' => [#{ - key => Key, - value => Value - } || {Key, Value} <- V0] - }; - (K, V0, AccIn) -> AccIn#{K => V0} - end, #{}, Headers). - process_payload([], Msg) -> emqx_json:encode(Msg); process_payload(Tks, Msg) -> diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 1a96a3596..96d793640 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -22,7 +22,10 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). --define(CONF_DEFAULT, <<"connectors: {}">>). +%% output functions +-export([ inspect/3 + ]). + -define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>). -define(CONNECTR_TYPE, <<"mqtt">>). -define(CONNECTR_NAME, <<"test_connector">>). @@ -67,6 +70,9 @@ <<"failed">> := FAILED, <<"rate">> := SPEED, <<"rate_last5m">> := SPEED5M, <<"rate_max">> := SPEEDMAX}). +inspect(Selected, _Envs, _Args) -> + persistent_term:put(?MODULE, #{inspect => Selected}). + all() -> emqx_common_test_helpers:all(?MODULE). @@ -89,13 +95,15 @@ init_per_suite(Config) -> %% some testcases (may from other app) already get emqx_connector started _ = application:stop(emqx_resource), _ = application:stop(emqx_connector), - ok = emqx_common_test_helpers:start_apps([emqx_connector, emqx_bridge, emqx_dashboard]), - ok = emqx_config:init_load(emqx_connector_schema, ?CONF_DEFAULT), + ok = emqx_common_test_helpers:start_apps([emqx_rule_engine, emqx_connector, + emqx_bridge, emqx_dashboard]), + ok = emqx_config:init_load(emqx_connector_schema, <<"connectors: {}">>), + ok = emqx_config:init_load(emqx_rule_engine_schema, <<"rule_engine {rules {}}">>), ok = emqx_config:init_load(emqx_bridge_schema, ?BRIDGE_CONF_DEFAULT), Config. end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([emqx_connector, emqx_bridge, emqx_dashboard]), + emqx_common_test_helpers:stop_apps([emqx_rule_engine, emqx_connector, emqx_bridge, emqx_dashboard]), ok. init_per_testcase(_, Config) -> @@ -223,7 +231,6 @@ t_mqtt_conn_bridge_ingress(_) -> %% PUBLISH a message to the 'remote' broker, as we have only one broker, %% the remote broker is also the local one. emqx:publish(emqx_message:make(RemoteTopic, Payload)), - %% we should receive a message on the local broker, with specified topic ?assert( receive @@ -435,6 +442,71 @@ t_mqtt_conn_testing(_) -> <<"name">> => ?BRIDGE_NAME_EGRESS }). +t_ingress_mqtt_bridge_with_rules(_) -> + {ok, 201, Connector} = request(post, uri(["connectors"]), + ?MQTT_CONNECOTR(<<"user1">>)#{ <<"type">> => ?CONNECTR_TYPE + , <<"name">> => ?CONNECTR_NAME + }), + #{ <<"id">> := ConnctorID } = jsx:decode(Connector), + + {ok, 201, Bridge} = request(post, uri(["bridges"]), + ?MQTT_BRIDGE_INGRESS(ConnctorID)#{ + <<"type">> => ?CONNECTR_TYPE, + <<"name">> => ?BRIDGE_NAME_INGRESS + }), + #{ <<"id">> := BridgeIDIngress } = jsx:decode(Bridge), + + {ok, 201, Rule} = request(post, uri(["rules"]), + #{<<"name">> => <<"A rule get messages from a source mqtt bridge">>, + <<"enable">> => true, + <<"outputs">> => [#{<<"function">> => "emqx_connector_api_SUITE:inspect"}], + <<"sql">> => <<"SELECT * from \"$bridges/", BridgeIDIngress/binary, "\"">> + }), + #{<<"id">> := RuleId} = jsx:decode(Rule), + + %% we now test if the bridge works as expected + + RemoteTopic = <<"remote_topic/1">>, + LocalTopic = <<"local_topic/", RemoteTopic/binary>>, + Payload = <<"hello">>, + emqx:subscribe(LocalTopic), + %% PUBLISH a message to the 'remote' broker, as we have only one broker, + %% the remote broker is also the local one. + emqx:publish(emqx_message:make(RemoteTopic, Payload)), + %% we should receive a message on the local broker, with specified topic + ?assert( + receive + {deliver, LocalTopic, #message{payload = Payload}} -> + ct:pal("local broker got message: ~p on topic ~p", [Payload, LocalTopic]), + true; + Msg -> + ct:pal("Msg: ~p", [Msg]), + false + after 100 -> + false + end), + %% and also the rule should be matched, with matched + 1: + {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []), + #{ <<"id">> := RuleId + , <<"metrics">> := #{<<"matched">> := 1} + } = jsx:decode(Rule1), + %% we also check if the outputs of the rule is triggered + ?assertMatch(#{inspect := #{ + event := '$bridges/mqtt', + id := MsgId, + payload := Payload, + topic := RemoteTopic, + qos := 0, + dup := false, + retain := false, + pub_props := #{}, + timestamp := _ + }} when is_binary(MsgId), persistent_term:get(?MODULE)), + + {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []), + {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []). + %%-------------------------------------------------------------------- %% HTTP Request %%-------------------------------------------------------------------- From 925d46fe86433aef4973424e8403239ddd00673e Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sat, 1 Jan 2022 04:12:20 +0800 Subject: [PATCH 18/25] fix(connector): add testcase for binding egress mqtt bridge to rules --- apps/emqx_bridge/src/emqx_bridge.erl | 15 +++ .../test/emqx_connector_api_SUITE.erl | 91 +++++++++++++++++++ 2 files changed, 106 insertions(+) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 0495f00e4..2e610b2b9 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -39,11 +39,14 @@ , lookup/3 , list/0 , list_bridges_by_connector/1 + , create/2 , create/3 , recreate/2 , recreate/3 , create_dry_run/2 + , remove/1 , remove/3 + , update/2 , update/3 , start/2 , stop/2 @@ -207,6 +210,10 @@ stop(Type, Name) -> restart(Type, Name) -> emqx_resource:restart(resource_id(Type, Name)). +create(BridgeId, Conf) -> + {BridgeType, BridgeName} = parse_bridge_id(BridgeId), + create(BridgeType, BridgeName, Conf). + create(Type, Name, Conf) -> ?SLOG(info, #{msg => "create bridge", type => Type, name => Name, config => Conf}), @@ -217,6 +224,10 @@ create(Type, Name, Conf) -> {error, Reason} -> {error, Reason} end. +update(BridgeId, {OldConf, Conf}) -> + {BridgeType, BridgeName} = parse_bridge_id(BridgeId), + update(BridgeType, BridgeName, {OldConf, Conf}). + update(Type, Name, {OldConf, Conf}) -> %% TODO: sometimes its not necessary to restart the bridge connection. %% @@ -263,6 +274,10 @@ create_dry_run(Type, Conf) -> Error end. +remove(BridgeId) -> + {BridgeType, BridgeName} = parse_bridge_id(BridgeId), + remove(BridgeType, BridgeName, #{}). + remove(Type, Name, _Conf) -> ?SLOG(info, #{msg => "remove_bridge", type => Type, name => Name}), case emqx_resource:remove_local(resource_id(Type, Name)) of diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 96d793640..936982e75 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -110,8 +110,20 @@ init_per_testcase(_, Config) -> {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), Config. end_per_testcase(_, _Config) -> + clear_resources(), ok. +clear_resources() -> + lists:foreach(fun(#{id := Id}) -> + ok = emqx_rule_engine:delete_rule(Id) + end, emqx_rule_engine:get_rules()), + lists:foreach(fun(#{id := Id}) -> + ok = emqx_bridge:remove(Id) + end, emqx_bridge:list()), + lists:foreach(fun(#{<<"id">> := Id}) -> + ok = emqx_connector:delete(Id) + end, emqx_connector:list()). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -507,6 +519,85 @@ t_ingress_mqtt_bridge_with_rules(_) -> {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []), {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []). +t_egress_mqtt_bridge_with_rules(_) -> + {ok, 201, Connector} = request(post, uri(["connectors"]), + ?MQTT_CONNECOTR(<<"user1">>)#{ <<"type">> => ?CONNECTR_TYPE + , <<"name">> => ?CONNECTR_NAME + }), + #{ <<"id">> := ConnctorID } = jsx:decode(Connector), + + {ok, 201, Bridge} = request(post, uri(["bridges"]), + ?MQTT_BRIDGE_EGRESS(ConnctorID)#{ + <<"type">> => ?CONNECTR_TYPE, + <<"name">> => ?BRIDGE_NAME_EGRESS + }), + #{ <<"id">> := BridgeIDEgress } = jsx:decode(Bridge), + + {ok, 201, Rule} = request(post, uri(["rules"]), + #{<<"name">> => <<"A rule send messages to a sink mqtt bridge">>, + <<"enable">> => true, + <<"outputs">> => [BridgeIDEgress], + <<"sql">> => <<"SELECT * from \"t/1\"">> + }), + #{<<"id">> := RuleId} = jsx:decode(Rule), + + %% we now test if the bridge works as expected + LocalTopic = <<"local_topic/1">>, + RemoteTopic = <<"remote_topic/", LocalTopic/binary>>, + Payload = <<"hello">>, + emqx:subscribe(RemoteTopic), + %% PUBLISH a message to the 'local' broker, as we have only one broker, + %% the remote broker is also the local one. + emqx:publish(emqx_message:make(LocalTopic, Payload)), + %% we should receive a message on the "remote" broker, with specified topic + ?assert( + receive + {deliver, RemoteTopic, #message{payload = Payload}} -> + ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]), + true; + Msg -> + ct:pal("Msg: ~p", [Msg]), + false + after 100 -> + false + end), + emqx:unsubscribe(RemoteTopic), + + %% PUBLISH a message to the rule. + Payload2 = <<"hi">>, + RuleTopic = <<"t/1">>, + RemoteTopic2 = <<"remote_topic/", RuleTopic/binary>>, + emqx:subscribe(RemoteTopic2), + emqx:publish(emqx_message:make(RuleTopic, Payload2)), + {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []), + #{ <<"id">> := RuleId + , <<"metrics">> := #{<<"matched">> := 1} + } = jsx:decode(Rule1), + %% we should receive a message on the "remote" broker, with specified topic + ?assert( + receive + {deliver, RemoteTopic2, #message{payload = Payload2}} -> + ct:pal("local broker got message: ~p on topic ~p", [Payload2, RemoteTopic2]), + true; + Msg -> + ct:pal("Msg: ~p", [Msg]), + false + after 100 -> + false + end), + + %% verify the metrics of the bridge + {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), + ?assertMatch(#{ <<"id">> := BridgeIDEgress + , <<"metrics">> := ?metrics(2, 2, 0, _, _, _) + , <<"node_metrics">> := + [#{<<"node">> := _, <<"metrics">> := ?metrics(2, 2, 0, _, _, _)}] + }, jsx:decode(BridgeStr)), + + {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), + {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []). + %%-------------------------------------------------------------------- %% HTTP Request %%-------------------------------------------------------------------- From 59e2614574b6cff5837a6524b4c07f98930d8edb Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sat, 1 Jan 2022 04:23:51 +0800 Subject: [PATCH 19/25] fix(dialyzer): unmatched results in emqx_statsd_api --- apps/emqx_statsd/src/emqx_statsd_api.erl | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/apps/emqx_statsd/src/emqx_statsd_api.erl b/apps/emqx_statsd/src/emqx_statsd_api.erl index 2278fa492..d545003b0 100644 --- a/apps/emqx_statsd/src/emqx_statsd_api.erl +++ b/apps/emqx_statsd/src/emqx_statsd_api.erl @@ -59,12 +59,10 @@ statsd(put, #{body := Body}) -> Body, #{rawconf_with_defaults => true, override_to => cluster}) of {ok, #{raw_config := NewConfig, config := Config}} -> + _ = emqx_statsd_sup:stop_child(?APP), case maps:get(<<"enable">>, Body) of - true -> - _ = emqx_statsd_sup:stop_child(?APP), - emqx_statsd_sup:start_child(?APP, maps:get(config, Config)); - false -> - _ = emqx_statsd_sup:stop_child(?APP) + true -> emqx_statsd_sup:start_child(?APP, maps:get(config, Config)); + false -> ok end, {200, NewConfig}; {error, Reason} -> From 808646c2a173a3982bd7c73cd07c7b155b09c8dc Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sat, 1 Jan 2022 15:47:27 +0800 Subject: [PATCH 20/25] fix(bridge): prohibit deleting connectors that are in use --- apps/emqx_bridge/src/emqx_bridge.erl | 2 +- apps/emqx_connector/src/emqx_connector.erl | 38 ++++++++------- .../emqx_connector/src/emqx_connector_api.erl | 4 ++ .../test/emqx_connector_api_SUITE.erl | 47 ++++++++++++------- 4 files changed, 56 insertions(+), 35 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 2e610b2b9..6e014f2ec 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -259,7 +259,7 @@ update(Type, Name, {OldConf, Conf}) -> end. recreate(Type, Name) -> - recreate(Type, Name, emqx:get_raw_config([bridges, Type, Name])). + recreate(Type, Name, emqx:get_config([bridges, Type, Name])). recreate(Type, Name, Conf) -> emqx_resource:recreate_local(resource_id(Type, Name), diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl index 940e958e3..db1caefbb 100644 --- a/apps/emqx_connector/src/emqx_connector.erl +++ b/apps/emqx_connector/src/emqx_connector.erl @@ -37,31 +37,26 @@ config_key_path() -> [connectors]. +-dialyzer([{nowarn_function, [post_config_update/5]}, error_handling]). post_config_update([connectors, Type, Name], '$remove', _, _OldConf, _AppEnvs) -> ConnId = connector_id(Type, Name), - LinkedBridgeIds = lists:foldl(fun - (#{id := BId, raw_config := #{<<"connector">> := ConnId0}}, Acc) - when ConnId0 == ConnId -> - [BId | Acc]; - (_, Acc) -> Acc - end, [], emqx_bridge:list()), - case LinkedBridgeIds of - [] -> ok; - _ -> {error, {dependency_bridges_exist, LinkedBridgeIds}} + try foreach_linked_bridges(ConnId, fun(#{id := BId}) -> + throw({dependency_bridges_exist, BId}) + end) + catch throw:Error -> {error, Error} end; -post_config_update([connectors, Type, Name], _Req, NewConf, _OldConf, _AppEnvs) -> +post_config_update([connectors, Type, Name], _Req, NewConf, OldConf, _AppEnvs) -> ConnId = connector_id(Type, Name), - lists:foreach(fun - (#{id := BId, raw_config := #{<<"connector">> := ConnId0}}) when ConnId0 == ConnId -> + foreach_linked_bridges(ConnId, + fun(#{id := BId}) -> {BType, BName} = emqx_bridge:parse_bridge_id(BId), BridgeConf = emqx:get_config([bridges, BType, BName]), - case emqx_bridge:recreate(BType, BName, BridgeConf#{connector => NewConf}) of - {ok, _} -> ok; + case emqx_bridge:update(BType, BName, {BridgeConf#{connector => OldConf}, + BridgeConf#{connector => NewConf}}) of + ok -> ok; {error, Reason} -> error({update_bridge_error, Reason}) - end; - (_) -> - ok - end, emqx_bridge:list()). + end + end). connector_id(Type0, Name0) -> Type = bin(Type0), @@ -112,3 +107,10 @@ delete(Type, Name) -> bin(Bin) when is_binary(Bin) -> Bin; bin(Str) when is_list(Str) -> list_to_binary(Str); bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). + +foreach_linked_bridges(ConnId, Do) -> + lists:foreach(fun + (#{raw_config := #{<<"connector">> := ConnId0}} = Bridge) when ConnId0 == ConnId -> + Do(Bridge); + (_) -> ok + end, emqx_bridge:list()). diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl index 4989cf17e..72938649c 100644 --- a/apps/emqx_connector/src/emqx_connector_api.erl +++ b/apps/emqx_connector/src/emqx_connector_api.erl @@ -253,6 +253,10 @@ schema("/connectors/:id") -> {ok, _} -> case emqx_connector:delete(ConnType, ConnName) of {ok, _} -> {204}; + {error, {post_config_update, _, {dependency_bridges_exist, BridgeID}}} -> + {403, error_msg('DEPENDENCY_EXISTS', + <<"Cannot remove the connector as it's in use by a bridge: ", + BridgeID/binary>>)}; {error, Error} -> {400, error_msg('BAD_ARG', Error)} end; {error, not_found} -> diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 936982e75..4caf700a9 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -108,6 +108,9 @@ end_per_suite(_Config) -> init_per_testcase(_, Config) -> {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), + %% assert we there's no connectors and no bridges at first + {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), Config. end_per_testcase(_, _Config) -> clear_resources(), @@ -200,10 +203,6 @@ t_mqtt_crud_apis(_) -> ok. t_mqtt_conn_bridge_ingress(_) -> - %% assert we there's no connectors and no bridges at first - {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), - %% then we add a mqtt connector, using POST User1 = <<"user1">>, {ok, 201, Connector} = request(post, uri(["connectors"]), @@ -272,10 +271,6 @@ t_mqtt_conn_bridge_ingress(_) -> ok. t_mqtt_conn_bridge_egress(_) -> - %% assert we there's no connectors and no bridges at first - {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), - %% then we add a mqtt connector, using POST User1 = <<"user1">>, {ok, 201, Connector} = request(post, uri(["connectors"]), @@ -350,10 +345,6 @@ t_mqtt_conn_bridge_egress(_) -> %% - update a connector should also update all of the the bridges %% - cannot delete a connector that is used by at least one bridge t_mqtt_conn_update(_) -> - %% assert we there's no connectors and no bridges at first - {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), - %% then we add a mqtt connector, using POST {ok, 201, Connector} = request(post, uri(["connectors"]), ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>) @@ -396,10 +387,6 @@ t_mqtt_conn_update(_) -> {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []). t_mqtt_conn_update2(_) -> - %% assert we there's no connectors and no bridges at first - {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), - %% then we add a mqtt connector, using POST %% but this connector is point to a unreachable server "2603" {ok, 201, Connector} = request(post, uri(["connectors"]), @@ -440,6 +427,34 @@ t_mqtt_conn_update2(_) -> {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []), {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []). +t_mqtt_conn_update3(_) -> + %% we add a mqtt connector, using POST + {ok, 201, Connector} = request(post, uri(["connectors"]), + ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>) + #{ <<"type">> => ?CONNECTR_TYPE + , <<"name">> => ?CONNECTR_NAME + }), + #{ <<"id">> := ConnctorID } = jsx:decode(Connector), + + %% ... and a MQTT bridge, using POST + %% we bind this bridge to the connector created just now + {ok, 201, Bridge} = request(post, uri(["bridges"]), + ?MQTT_BRIDGE_EGRESS(ConnctorID)#{ + <<"type">> => ?CONNECTR_TYPE, + <<"name">> => ?BRIDGE_NAME_EGRESS + }), + #{ <<"id">> := BridgeIDEgress + , <<"status">> := <<"connected">> + , <<"connector">> := ConnctorID + } = jsx:decode(Bridge), + + %% delete the connector should fail because it is in use by a bridge + {ok, 403, _} = request(delete, uri(["connectors", ConnctorID]), []), + %% delete the bridge + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), + %% the connector now can be deleted without problems + {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []). + t_mqtt_conn_testing(_) -> %% APIs for testing the connectivity %% then we add a mqtt connector, using POST From 2277b75b2f01d754c6bcb0402072d988a7854618 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sun, 2 Jan 2022 16:41:50 +0800 Subject: [PATCH 21/25] refactor(resource): improve the process starting/stopping resource instances --- apps/emqx_resource/src/emqx_resource.erl | 46 +++--- .../src/emqx_resource_health_check.erl | 24 +-- .../src/emqx_resource_health_check_sup.erl | 37 +++-- .../src/emqx_resource_instance.erl | 153 ++++++++---------- apps/emqx_resource/src/emqx_resource_sup.erl | 2 +- 5 files changed, 127 insertions(+), 135 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 5ec5fd92a..12ae912e8 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -58,6 +58,7 @@ %% Calls to the callback module with current resource state %% They also save the state after the call finished (except query/2,3). -export([ restart/1 %% restart the instance. + , restart/2 , health_check/1 %% verify if the resource is working normally , stop/1 %% stop the instance , query/2 %% query the instance @@ -68,7 +69,6 @@ -export([ call_start/3 %% start the instance , call_health_check/3 %% verify if the resource is working normally , call_stop/3 %% stop the instance - , call_config_merge/4 %% merge the config when updating , call_jsonify/2 ]). @@ -86,12 +86,9 @@ -optional_callbacks([ on_query/4 , on_health_check/2 - , on_config_merge/3 , on_jsonify/1 ]). --callback on_config_merge(resource_config(), resource_config(), term()) -> resource_config(). - -callback on_jsonify(resource_config()) -> jsx:json_term(). %% when calling emqx_resource:start/1 @@ -169,18 +166,17 @@ create_dry_run(ResourceType, Config) -> -spec create_dry_run_local(resource_type(), resource_config()) -> ok | {error, Reason :: term()}. create_dry_run_local(ResourceType, Config) -> - InstId = emqx_resource_instance:make_test_id(), - call_instance(InstId, {create_dry_run, InstId, ResourceType, Config}). + call_instance(<>, {create_dry_run, ResourceType, Config}). --spec recreate(instance_id(), resource_type(), resource_config(), term()) -> +-spec recreate(instance_id(), resource_type(), resource_config(), create_opts()) -> {ok, resource_data()} | {error, Reason :: term()}. -recreate(InstId, ResourceType, Config, Params) -> - cluster_call(recreate_local, [InstId, ResourceType, Config, Params]). +recreate(InstId, ResourceType, Config, Opts) -> + cluster_call(recreate_local, [InstId, ResourceType, Config, Opts]). --spec recreate_local(instance_id(), resource_type(), resource_config(), term()) -> +-spec recreate_local(instance_id(), resource_type(), resource_config(), create_opts()) -> {ok, resource_data()} | {error, Reason :: term()}. -recreate_local(InstId, ResourceType, Config, Params) -> - call_instance(InstId, {recreate, InstId, ResourceType, Config, Params}). +recreate_local(InstId, ResourceType, Config, Opts) -> + call_instance(InstId, {recreate, InstId, ResourceType, Config, Opts}). -spec remove(instance_id()) -> ok | {error, Reason :: term()}. remove(InstId) -> @@ -216,7 +212,11 @@ query(InstId, Request, AfterQuery) -> -spec restart(instance_id()) -> ok | {error, Reason :: term()}. restart(InstId) -> - call_instance(InstId, {restart, InstId}). + restart(InstId, #{}). + +-spec restart(instance_id(), create_opts()) -> ok | {error, Reason :: term()}. +restart(InstId, Opts) -> + call_instance(InstId, {restart, InstId, Opts}). -spec stop(instance_id()) -> ok | {error, Reason :: term()}. stop(InstId) -> @@ -276,14 +276,6 @@ call_health_check(InstId, Mod, ResourceState) -> call_stop(InstId, Mod, ResourceState) -> ?SAFE_CALL(Mod:on_stop(InstId, ResourceState)). --spec call_config_merge(module(), resource_config(), resource_config(), term()) -> - resource_config(). -call_config_merge(Mod, OldConfig, NewConfig, Params) -> - case erlang:function_exported(Mod, on_config_merge, 3) of - true -> ?SAFE_CALL(Mod:on_config_merge(OldConfig, NewConfig, Params)); - false -> NewConfig - end. - -spec call_jsonify(module(), resource_config()) -> jsx:json_term(). call_jsonify(Mod, Config) -> case erlang:function_exported(Mod, on_jsonify, 1) of @@ -330,17 +322,17 @@ check_and_create_local(InstId, ResourceType, RawConfig, Opts) -> check_and_do(ResourceType, RawConfig, fun(InstConf) -> create_local(InstId, ResourceType, InstConf, Opts) end). --spec check_and_recreate(instance_id(), resource_type(), raw_resource_config(), term()) -> +-spec check_and_recreate(instance_id(), resource_type(), raw_resource_config(), create_opts()) -> {ok, resource_data()} | {error, term()}. -check_and_recreate(InstId, ResourceType, RawConfig, Params) -> +check_and_recreate(InstId, ResourceType, RawConfig, Opts) -> check_and_do(ResourceType, RawConfig, - fun(InstConf) -> recreate(InstId, ResourceType, InstConf, Params) end). + fun(InstConf) -> recreate(InstId, ResourceType, InstConf, Opts) end). --spec check_and_recreate_local(instance_id(), resource_type(), raw_resource_config(), term()) -> +-spec check_and_recreate_local(instance_id(), resource_type(), raw_resource_config(), create_opts()) -> {ok, resource_data()} | {error, term()}. -check_and_recreate_local(InstId, ResourceType, RawConfig, Params) -> +check_and_recreate_local(InstId, ResourceType, RawConfig, Opts) -> check_and_do(ResourceType, RawConfig, - fun(InstConf) -> recreate_local(InstId, ResourceType, InstConf, Params) end). + fun(InstConf) -> recreate_local(InstId, ResourceType, InstConf, Opts) end). check_and_do(ResourceType, RawConfig, Do) when is_function(Do) -> case check_config(ResourceType, RawConfig) of diff --git a/apps/emqx_resource/src/emqx_resource_health_check.erl b/apps/emqx_resource/src/emqx_resource_health_check.erl index 50b236daa..dc0795fb3 100644 --- a/apps/emqx_resource/src/emqx_resource_health_check.erl +++ b/apps/emqx_resource/src/emqx_resource_health_check.erl @@ -15,29 +15,21 @@ %%-------------------------------------------------------------------- -module(emqx_resource_health_check). --export([child_spec/2]). - -export([start_link/2]). -export([health_check/2]). -child_spec(Name, Sleep) -> - #{id => {health_check, Name}, - start => {?MODULE, start_link, [Name, Sleep]}, - restart => transient, - shutdown => 5000, type => worker, modules => [?MODULE]}. - -start_link(Name, Sleep) -> +start_link(Name, Sleep) -> Pid = proc_lib:spawn_link(?MODULE, health_check, [Name, Sleep]), {ok, Pid}. -health_check(Name, SleepTime) -> +health_check(Name, SleepTime) -> timer:sleep(SleepTime), - case emqx_resource:health_check(Name) of - ok -> + case emqx_resource:health_check(Name) of + ok -> emqx_alarm:deactivate(Name); - {error, _} -> - emqx_alarm:activate(Name, #{name => Name}, - <>) + {error, _} -> + emqx_alarm:activate(Name, #{name => Name}, + <>) end, - health_check(Name, SleepTime). \ No newline at end of file + health_check(Name, SleepTime). diff --git a/apps/emqx_resource/src/emqx_resource_health_check_sup.erl b/apps/emqx_resource/src/emqx_resource_health_check_sup.erl index 571cd6338..6a2b07e94 100644 --- a/apps/emqx_resource/src/emqx_resource_health_check_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_health_check_sup.erl @@ -20,8 +20,17 @@ -export([start_link/0]). -export([init/1, - create_health_check_process/2, - delete_health_check_process/1]). + create_checker/2, + delete_checker/1]). + +-define(HEALTH_CHECK_MOD, emqx_resource_health_check). +-define(ID(NAME), {resource_health_check, NAME}). + +child_spec(Name, Sleep) -> + #{id => ?ID(Name), + start => {?HEALTH_CHECK_MOD, start_link, [Name, Sleep]}, + restart => transient, + shutdown => 5000, type => worker, modules => [?HEALTH_CHECK_MOD]}. start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). @@ -30,11 +39,21 @@ init([]) -> SupFlags = #{strategy => one_for_one, intensity => 10, period => 10}, {ok, {SupFlags, []}}. -create_health_check_process(Name, Sleep) -> - supervisor:start_child(emqx_resource_health_check_sup, - emqx_resource_health_check:child_spec(Name, Sleep)). +create_checker(Name, Sleep) -> + case supervisor:start_child(?MODULE, child_spec(Name, Sleep)) of + {ok, _} -> ok; + {error, already_present} -> ok; + {error, {already_started, _}} -> ok; + Error -> Error + end. -delete_health_check_process(Name) -> - _ = supervisor:terminate_child(emqx_resource_health_check_sup, {health_check, Name}), - _ = supervisor:delete_child(emqx_resource_health_check_sup, {health_check, Name}), - ok. \ No newline at end of file +delete_checker(Name) -> + case supervisor:terminate_child(?MODULE, {health_check, Name}) of + ok -> + case supervisor:delete_child(?MODULE, {health_check, Name}) of + {error, not_found} -> ok; + Error -> Error + end; + {error, not_found} -> ok; + Error -> Error + end. diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index b2f2d0d27..2701b70d6 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -103,17 +103,17 @@ init({Pool, Id}) -> handle_call({create, InstId, ResourceType, Config, Opts}, _From, State) -> {reply, do_create(InstId, ResourceType, Config, Opts), State}; -handle_call({create_dry_run, InstId, ResourceType, Config}, _From, State) -> - {reply, do_create_dry_run(InstId, ResourceType, Config), State}; +handle_call({create_dry_run, ResourceType, Config}, _From, State) -> + {reply, do_create_dry_run(ResourceType, Config), State}; -handle_call({recreate, InstId, ResourceType, Config, Params}, _From, State) -> - {reply, do_recreate(InstId, ResourceType, Config, Params), State}; +handle_call({recreate, InstId, ResourceType, Config, Opts}, _From, State) -> + {reply, do_recreate(InstId, ResourceType, Config, Opts), State}; handle_call({remove, InstId}, _From, State) -> {reply, do_remove(InstId), State}; -handle_call({restart, InstId}, _From, State) -> - {reply, do_restart(InstId), State}; +handle_call({restart, InstId, Opts}, _From, State) -> + {reply, do_restart(InstId, Opts), State}; handle_call({stop, InstId}, _From, State) -> {reply, do_stop(InstId), State}; @@ -142,20 +142,17 @@ code_change(_OldVsn, State, _Extra) -> %% suppress the race condition check, as these functions are protected in gproc workers -dialyzer({nowarn_function, [do_recreate/4, do_create/4, - do_restart/1, + do_restart/2, do_stop/1, do_health_check/1]}). -do_recreate(InstId, ResourceType, NewConfig, Params) -> +do_recreate(InstId, ResourceType, NewConfig, Opts) -> case lookup(InstId) of - {ok, #{mod := ResourceType, state := ResourceState, config := OldConfig}} -> - Config = emqx_resource:call_config_merge(ResourceType, OldConfig, - NewConfig, Params), - TestInstId = make_test_id(), - case do_create_dry_run(TestInstId, ResourceType, Config) of + {ok, #{mod := ResourceType} = Data} -> + case do_create_dry_run(ResourceType, NewConfig) of ok -> - do_remove(ResourceType, InstId, ResourceState, false), - do_create(InstId, ResourceType, Config, #{force_create => true}); + do_remove(Data, false), + do_create(InstId, ResourceType, NewConfig, Opts#{force_create => true}); Error -> Error end; @@ -166,100 +163,86 @@ do_recreate(InstId, ResourceType, NewConfig, Params) -> end. do_create(InstId, ResourceType, Config, Opts) -> - ForceCreate = maps:get(force_create, Opts, false), case lookup(InstId) of {ok, _} -> {ok, already_created}; - _ -> - Res0 = #{id => InstId, mod => ResourceType, config => Config, - status => starting, state => undefined}, - %% The `emqx_resource:call_start/3` need the instance exist beforehand - ets:insert(emqx_resource_instance, {InstId, Res0}), - case emqx_resource:call_start(InstId, ResourceType, Config) of - {ok, ResourceState} -> - ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId), - %% this is the first time we do health check, this will update the - %% status and then do ets:insert/2 - _ = do_health_check(Res0#{state => ResourceState}), - HealthCheckInterval = maps:get(health_check_interval, Opts, 15000), - emqx_resource_health_check_sup:create_health_check_process(InstId, HealthCheckInterval), + {error, not_found} -> + case do_start(InstId, ResourceType, Config, Opts) of + ok -> + ok = emqx_resource_health_check_sup:create_checker(InstId, + maps:get(health_check_interval, Opts, 15000)), + ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId), {ok, force_lookup(InstId)}; - {error, Reason} when ForceCreate == true -> - logger:error("start ~ts resource ~ts failed: ~p, " - "force_create it as a stopped resource", - [ResourceType, InstId, Reason]), - ets:insert(emqx_resource_instance, {InstId, Res0}), - {ok, Res0}; - {error, Reason} when ForceCreate == false -> - ets:delete(emqx_resource_instance, InstId), - {error, Reason} + Error -> Error end end. -do_create_dry_run(InstId, ResourceType, Config) -> - case emqx_resource:call_start(InstId, ResourceType, Config) of - {ok, ResourceState0} -> - Return = case emqx_resource:call_health_check(InstId, ResourceType, ResourceState0) of - {ok, ResourceState1} -> ok; - {error, Reason, ResourceState1} -> - {error, Reason} - end, - _ = emqx_resource:call_stop(InstId, ResourceType, ResourceState1), +do_create_dry_run(ResourceType, Config) -> + InstId = make_test_id(), + Opts = #{force_create => false}, + case do_create(InstId, ResourceType, Config, Opts) of + {ok, Data} -> + Return = do_health_check(Data), + _ = do_remove(Data), Return; {error, Reason} -> {error, Reason} end. -do_remove(InstId) -> - case lookup(InstId) of - {ok, #{mod := Mod, state := ResourceState}} -> - do_remove(Mod, InstId, ResourceState); - Error -> - Error - end. +do_remove(Instance) -> + do_remove(Instance, true). -do_remove(Mod, InstId, ResourceState) -> - do_remove(Mod, InstId, ResourceState, true). - -do_remove(Mod, InstId, ResourceState, ClearMetrics) -> - _ = emqx_resource:call_stop(InstId, Mod, ResourceState), +do_remove(InstId, ClearMetrics) when is_binary(InstId) -> + do_with_instance_data(InstId, fun do_remove/2, [ClearMetrics]); +do_remove(#{id := InstId} = Data, ClearMetrics) -> + _ = do_stop(Data), ets:delete(emqx_resource_instance, InstId), case ClearMetrics of true -> ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId); false -> ok end, - _ = emqx_resource_health_check_sup:delete_health_check_process(InstId), + _ = emqx_resource_health_check_sup:delete_checker(InstId), ok. -do_restart(InstId) -> +do_restart(InstId, Opts) -> case lookup(InstId) of - {ok, #{mod := Mod, state := ResourceState, config := Config} = Data} -> - _ = case ResourceState of - undefined -> ok; - _ -> emqx_resource:call_stop(InstId, Mod, ResourceState) - end, - case emqx_resource:call_start(InstId, Mod, Config) of - {ok, NewResourceState} -> - ets:insert(emqx_resource_instance, - {InstId, Data#{state => NewResourceState, status => started}}), - ok; - {error, Reason} -> - ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}), - {error, Reason} - end; + {ok, #{mod := ResourceType, config := Config} = Data} -> + ok = do_stop(Data), + do_start(InstId, ResourceType, Config, Opts); Error -> Error end. -do_stop(InstId) -> - case lookup(InstId) of - {ok, #{mod := Mod, state := ResourceState} = Data} -> - _ = emqx_resource:call_stop(InstId, Mod, ResourceState), - ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}), +do_start(InstId, ResourceType, Config, Opts) when is_binary(InstId) -> + ForceCreate = maps:get(force_create, Opts, false), + Res0 = #{id => InstId, mod => ResourceType, config => Config, + status => starting, state => undefined}, + %% The `emqx_resource:call_start/3` need the instance exist beforehand + ets:insert(emqx_resource_instance, {InstId, Res0}), + case emqx_resource:call_start(InstId, ResourceType, Config) of + {ok, ResourceState} -> + %% this is the first time we do health check, this will update the + %% status and then do ets:insert/2 + _ = do_health_check(Res0#{state => ResourceState}), ok; - Error -> - Error + {error, Reason} when ForceCreate == true -> + logger:warning("start ~ts resource ~ts failed: ~p, force_create it", + [ResourceType, InstId, Reason]), + ets:insert(emqx_resource_instance, {InstId, Res0}), + ok; + {error, Reason} when ForceCreate == false -> + ets:delete(emqx_resource_instance, InstId), + {error, Reason} end. +do_stop(InstId) when is_binary(InstId) -> + do_with_instance_data(InstId, fun do_stop/1, []); +do_stop(#{state := undefined}) -> + ok; +do_stop(#{id := InstId, mod := Mod, state := ResourceState} = Data) -> + _ = emqx_resource:call_stop(InstId, Mod, ResourceState), + ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}), + ok. + do_health_check(InstId) when is_binary(InstId) -> case lookup(InstId) of {ok, Data} -> do_health_check(Data); @@ -284,6 +267,12 @@ do_health_check(#{id := InstId, mod := Mod, state := ResourceState0} = Data) -> %% internal functions %%------------------------------------------------------------------------------ +do_with_instance_data(InstId, Do, Args) -> + case lookup(InstId) of + {ok, Data} -> erlang:apply(Do, [Data | Args]); + Error -> Error + end. + proc_name(Mod, Id) -> list_to_atom(lists:concat([Mod, "_", Id])). diff --git a/apps/emqx_resource/src/emqx_resource_sup.erl b/apps/emqx_resource/src/emqx_resource_sup.erl index b5655e301..99d601ec4 100644 --- a/apps/emqx_resource/src/emqx_resource_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_sup.erl @@ -49,7 +49,7 @@ init([]) -> #{id => emqx_resource_health_check_sup, start => {emqx_resource_health_check_sup, start_link, []}, restart => transient, - shutdown => 5000, type => supervisor, modules => [emqx_resource_health_check_sup]}, + shutdown => infinity, type => supervisor, modules => [emqx_resource_health_check_sup]}, {ok, {SupFlags, [HealthCheck, Metrics | ResourceInsts]}}. %% internal functions From e1ab331a30b2ef32289b9de255ebb8438dc03a3c Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sun, 2 Jan 2022 20:11:25 +0800 Subject: [PATCH 22/25] refactor(resource): support async create mode --- apps/emqx_authn/src/emqx_authn_api.erl | 2 +- apps/emqx_bridge/src/emqx_bridge.erl | 5 +- apps/emqx_resource/include/emqx_resource.hrl | 2 +- .../src/emqx_resource_instance.erl | 64 +++++++++++-------- 4 files changed, 42 insertions(+), 31 deletions(-) diff --git a/apps/emqx_authn/src/emqx_authn_api.erl b/apps/emqx_authn/src/emqx_authn_api.erl index 64ce3e6a4..a7e073581 100644 --- a/apps/emqx_authn/src/emqx_authn_api.erl +++ b/apps/emqx_authn/src/emqx_authn_api.erl @@ -979,7 +979,7 @@ authenticator_examples() -> mechanism => <<"password-based">>, backend => <<"http">>, method => <<"post">>, - url => <<"http://127.0.0.2:8080">>, + url => <<"http://127.0.0.1:18083">>, headers => #{ <<"content-type">> => <<"application/json">> }, diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 6e014f2ec..b7a74c9a4 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -218,7 +218,7 @@ create(Type, Name, Conf) -> ?SLOG(info, #{msg => "create bridge", type => Type, name => Name, config => Conf}), case emqx_resource:create_local(resource_id(Type, Name), emqx_bridge:resource_type(Type), - parse_confs(Type, Name, Conf), #{force_create => true}) of + parse_confs(Type, Name, Conf), #{async_create => true}) of {ok, already_created} -> maybe_disable_bridge(Type, Name, Conf); {ok, _} -> maybe_disable_bridge(Type, Name, Conf); {error, Reason} -> {error, Reason} @@ -263,7 +263,8 @@ recreate(Type, Name) -> recreate(Type, Name, Conf) -> emqx_resource:recreate_local(resource_id(Type, Name), - emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf), []). + emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf), + #{async_create => true}). create_dry_run(Type, Conf) -> Conf0 = Conf#{<<"ingress">> => #{<<"remote_topic">> => <<"t">>}}, diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index ed1de18cf..363e40a5f 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -33,7 +33,7 @@ %% The emqx_resource:create/4 will return OK event if the Mod:on_start/2 fails, %% the 'status' of the resource will be 'stopped' in this case. %% Defaults to 'false' - force_create => boolean() + async_create => boolean() }. -type after_query() :: {[OnSuccess :: after_query_fun()], [OnFailed :: after_query_fun()]} | undefined. diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index 2701b70d6..4ac72ca4d 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -148,14 +148,19 @@ code_change(_OldVsn, State, _Extra) -> do_recreate(InstId, ResourceType, NewConfig, Opts) -> case lookup(InstId) of - {ok, #{mod := ResourceType} = Data} -> + {ok, #{mod := ResourceType, status := started} = Data} -> + %% If this resource is in use (status='started'), we should make sure + %% the new config is OK before removing the old one. case do_create_dry_run(ResourceType, NewConfig) of ok -> do_remove(Data, false), - do_create(InstId, ResourceType, NewConfig, Opts#{force_create => true}); + do_create(InstId, ResourceType, NewConfig, Opts); Error -> Error end; + {ok, #{mod := ResourceType, status := _} = Data} -> + do_remove(Data, false), + do_create(InstId, ResourceType, NewConfig, Opts); {ok, #{mod := Mod}} when Mod =/= ResourceType -> {error, updating_to_incorrect_resource_type}; {error, not_found} -> @@ -164,21 +169,21 @@ do_recreate(InstId, ResourceType, NewConfig, Opts) -> do_create(InstId, ResourceType, Config, Opts) -> case lookup(InstId) of - {ok, _} -> {ok, already_created}; + {ok, _} -> + {ok, already_created}; {error, not_found} -> case do_start(InstId, ResourceType, Config, Opts) of ok -> - ok = emqx_resource_health_check_sup:create_checker(InstId, - maps:get(health_check_interval, Opts, 15000)), ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId), {ok, force_lookup(InstId)}; - Error -> Error + Error -> + Error end end. do_create_dry_run(ResourceType, Config) -> InstId = make_test_id(), - Opts = #{force_create => false}, + Opts = #{async_create => false}, case do_create(InstId, ResourceType, Config, Opts) of {ok, Data} -> Return = do_health_check(Data), @@ -200,7 +205,6 @@ do_remove(#{id := InstId} = Data, ClearMetrics) -> true -> ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId); false -> ok end, - _ = emqx_resource_health_check_sup:delete_checker(InstId), ok. do_restart(InstId, Opts) -> @@ -213,24 +217,32 @@ do_restart(InstId, Opts) -> end. do_start(InstId, ResourceType, Config, Opts) when is_binary(InstId) -> - ForceCreate = maps:get(force_create, Opts, false), - Res0 = #{id => InstId, mod => ResourceType, config => Config, - status => starting, state => undefined}, + InitData = #{id => InstId, mod => ResourceType, config => Config, + status => starting, state => undefined}, %% The `emqx_resource:call_start/3` need the instance exist beforehand - ets:insert(emqx_resource_instance, {InstId, Res0}), + ets:insert(emqx_resource_instance, {InstId, InitData}), + case maps:get(async_create, Opts, false) of + false -> + start_and_check(InstId, ResourceType, Config, Opts, InitData); + true -> + spawn(fun() -> + start_and_check(InstId, ResourceType, Config, Opts, InitData) + end), + ok + end. + +start_and_check(InstId, ResourceType, Config, Opts, Data) -> case emqx_resource:call_start(InstId, ResourceType, Config) of {ok, ResourceState} -> - %% this is the first time we do health check, this will update the - %% status and then do ets:insert/2 - _ = do_health_check(Res0#{state => ResourceState}), - ok; - {error, Reason} when ForceCreate == true -> - logger:warning("start ~ts resource ~ts failed: ~p, force_create it", - [ResourceType, InstId, Reason]), - ets:insert(emqx_resource_instance, {InstId, Res0}), - ok; - {error, Reason} when ForceCreate == false -> - ets:delete(emqx_resource_instance, InstId), + Data2 = Data#{state => ResourceState}, + ets:insert(emqx_resource_instance, {InstId, Data2}), + case maps:get(async_create, Opts, false) of + false -> do_health_check(Data2); + true -> emqx_resource_health_check_sup:create_checker(InstId, + maps:get(health_check_interval, Opts, 15000)) + end; + {error, Reason} -> + ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}), {error, Reason} end. @@ -240,14 +252,12 @@ do_stop(#{state := undefined}) -> ok; do_stop(#{id := InstId, mod := Mod, state := ResourceState} = Data) -> _ = emqx_resource:call_stop(InstId, Mod, ResourceState), + ok = emqx_resource_health_check_sup:delete_checker(InstId), ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}), ok. do_health_check(InstId) when is_binary(InstId) -> - case lookup(InstId) of - {ok, Data} -> do_health_check(Data); - Error -> Error - end; + do_with_instance_data(InstId, fun do_health_check/1, []); do_health_check(#{state := undefined}) -> {error, resource_not_initialized}; do_health_check(#{id := InstId, mod := Mod, state := ResourceState0} = Data) -> From a64b29ff76f9278d053b1264cf6bc48fd91407b5 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sun, 2 Jan 2022 20:32:24 +0800 Subject: [PATCH 23/25] fix(resource): re-create the helth checker if already exists --- .../src/emqx_resource_health_check.erl | 35 ++++++++++++++++++- .../src/emqx_resource_health_check_sup.erl | 32 +---------------- .../src/emqx_resource_instance.erl | 4 +-- 3 files changed, 37 insertions(+), 34 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_health_check.erl b/apps/emqx_resource/src/emqx_resource_health_check.erl index dc0795fb3..dfda683e1 100644 --- a/apps/emqx_resource/src/emqx_resource_health_check.erl +++ b/apps/emqx_resource/src/emqx_resource_health_check.erl @@ -15,14 +15,47 @@ %%-------------------------------------------------------------------- -module(emqx_resource_health_check). --export([start_link/2]). +-export([ start_link/2 + , create_checker/2 + , delete_checker/1 + ]). -export([health_check/2]). +-define(SUP, emqx_resource_health_check_sup). +-define(ID(NAME), {resource_health_check, NAME}). + +child_spec(Name, Sleep) -> + #{id => ?ID(Name), + start => {?MODULE, start_link, [Name, Sleep]}, + restart => transient, + shutdown => 5000, type => worker, modules => [?MODULE]}. + start_link(Name, Sleep) -> Pid = proc_lib:spawn_link(?MODULE, health_check, [Name, Sleep]), {ok, Pid}. +create_checker(Name, Sleep) -> + case supervisor:start_child(?SUP, child_spec(Name, Sleep)) of + {ok, _} -> ok; + {error, already_present} -> ok; + {error, {already_started, _}} -> + ok = delete_checker(Name), + create_checker(Name, Sleep); + Error -> Error + end. + +delete_checker(Name) -> + case supervisor:terminate_child(?SUP, {health_check, Name}) of + ok -> + case supervisor:delete_child(?SUP, {health_check, Name}) of + {error, not_found} -> ok; + Error -> Error + end; + {error, not_found} -> ok; + Error -> Error + end. + health_check(Name, SleepTime) -> timer:sleep(SleepTime), case emqx_resource:health_check(Name) of diff --git a/apps/emqx_resource/src/emqx_resource_health_check_sup.erl b/apps/emqx_resource/src/emqx_resource_health_check_sup.erl index 6a2b07e94..e17186114 100644 --- a/apps/emqx_resource/src/emqx_resource_health_check_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_health_check_sup.erl @@ -19,18 +19,7 @@ -export([start_link/0]). --export([init/1, - create_checker/2, - delete_checker/1]). - --define(HEALTH_CHECK_MOD, emqx_resource_health_check). --define(ID(NAME), {resource_health_check, NAME}). - -child_spec(Name, Sleep) -> - #{id => ?ID(Name), - start => {?HEALTH_CHECK_MOD, start_link, [Name, Sleep]}, - restart => transient, - shutdown => 5000, type => worker, modules => [?HEALTH_CHECK_MOD]}. +-export([init/1]). start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). @@ -38,22 +27,3 @@ start_link() -> init([]) -> SupFlags = #{strategy => one_for_one, intensity => 10, period => 10}, {ok, {SupFlags, []}}. - -create_checker(Name, Sleep) -> - case supervisor:start_child(?MODULE, child_spec(Name, Sleep)) of - {ok, _} -> ok; - {error, already_present} -> ok; - {error, {already_started, _}} -> ok; - Error -> Error - end. - -delete_checker(Name) -> - case supervisor:terminate_child(?MODULE, {health_check, Name}) of - ok -> - case supervisor:delete_child(?MODULE, {health_check, Name}) of - {error, not_found} -> ok; - Error -> Error - end; - {error, not_found} -> ok; - Error -> Error - end. diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index 4ac72ca4d..a847e85f5 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -238,7 +238,7 @@ start_and_check(InstId, ResourceType, Config, Opts, Data) -> ets:insert(emqx_resource_instance, {InstId, Data2}), case maps:get(async_create, Opts, false) of false -> do_health_check(Data2); - true -> emqx_resource_health_check_sup:create_checker(InstId, + true -> emqx_resource_health_check:create_checker(InstId, maps:get(health_check_interval, Opts, 15000)) end; {error, Reason} -> @@ -252,7 +252,7 @@ do_stop(#{state := undefined}) -> ok; do_stop(#{id := InstId, mod := Mod, state := ResourceState} = Data) -> _ = emqx_resource:call_stop(InstId, Mod, ResourceState), - ok = emqx_resource_health_check_sup:delete_checker(InstId), + ok = emqx_resource_health_check:delete_checker(InstId), ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}), ok. From 11736dc1d7b8b211c2256ad11e3fad3d5b12c07a Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sun, 2 Jan 2022 22:45:32 +0800 Subject: [PATCH 24/25] fix(bridge): check health immediately after updated --- apps/emqx_bridge/src/emqx_bridge.erl | 2 +- .../src/emqx_resource_health_check.erl | 18 ++++++------- .../src/emqx_resource_instance.erl | 27 ++++++++++--------- 3 files changed, 24 insertions(+), 23 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index b7a74c9a4..786b99c44 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -247,7 +247,7 @@ update(Type, Name, {OldConf, Conf}) -> ?SLOG(warning, #{ msg => "updating_a_non-exist_bridge_need_create_a_new_one" , type => Type, name => Name, config => Conf}), create(Type, Name, Conf); - {error, Reason} -> {update_bridge_failed, Reason} + {error, Reason} -> {error, {update_bridge_failed, Reason}} end; true -> %% we don't need to recreate the bridge if this config change is only to diff --git a/apps/emqx_resource/src/emqx_resource_health_check.erl b/apps/emqx_resource/src/emqx_resource_health_check.erl index dfda683e1..032ff6999 100644 --- a/apps/emqx_resource/src/emqx_resource_health_check.erl +++ b/apps/emqx_resource/src/emqx_resource_health_check.erl @@ -36,28 +36,25 @@ start_link(Name, Sleep) -> {ok, Pid}. create_checker(Name, Sleep) -> + create_checker(Name, Sleep, false). + +create_checker(Name, Sleep, Retry) -> case supervisor:start_child(?SUP, child_spec(Name, Sleep)) of {ok, _} -> ok; {error, already_present} -> ok; - {error, {already_started, _}} -> + {error, {already_started, _}} when Retry == false -> ok = delete_checker(Name), - create_checker(Name, Sleep); + create_checker(Name, Sleep, true); Error -> Error end. delete_checker(Name) -> - case supervisor:terminate_child(?SUP, {health_check, Name}) of - ok -> - case supervisor:delete_child(?SUP, {health_check, Name}) of - {error, not_found} -> ok; - Error -> Error - end; - {error, not_found} -> ok; + case supervisor:terminate_child(?SUP, ?ID(Name)) of + ok -> supervisor:delete_child(?SUP, ?ID(Name)); Error -> Error end. health_check(Name, SleepTime) -> - timer:sleep(SleepTime), case emqx_resource:health_check(Name) of ok -> emqx_alarm:deactivate(Name); @@ -65,4 +62,5 @@ health_check(Name, SleepTime) -> emqx_alarm:activate(Name, #{name => Name}, <>) end, + timer:sleep(SleepTime), health_check(Name, SleepTime). diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index a847e85f5..86318e355 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -140,11 +140,14 @@ code_change(_OldVsn, State, _Extra) -> %%------------------------------------------------------------------------------ %% suppress the race condition check, as these functions are protected in gproc workers --dialyzer({nowarn_function, [do_recreate/4, - do_create/4, - do_restart/2, - do_stop/1, - do_health_check/1]}). +-dialyzer({nowarn_function, [ do_recreate/4 + , do_create/4 + , do_restart/2 + , do_start/4 + , do_stop/1 + , do_health_check/1 + , start_and_check/5 + ]}). do_recreate(InstId, ResourceType, NewConfig, Opts) -> case lookup(InstId) of @@ -183,12 +186,12 @@ do_create(InstId, ResourceType, Config, Opts) -> do_create_dry_run(ResourceType, Config) -> InstId = make_test_id(), - Opts = #{async_create => false}, - case do_create(InstId, ResourceType, Config, Opts) of - {ok, Data} -> - Return = do_health_check(Data), - _ = do_remove(Data), - Return; + case emqx_resource:call_start(InstId, ResourceType, Config) of + {ok, ResourceState} -> + case emqx_resource:call_health_check(InstId, ResourceType, ResourceState) of + {ok, _} -> ok; + {error, Reason, _} -> {error, Reason} + end; {error, Reason} -> {error, Reason} end. @@ -252,7 +255,7 @@ do_stop(#{state := undefined}) -> ok; do_stop(#{id := InstId, mod := Mod, state := ResourceState} = Data) -> _ = emqx_resource:call_stop(InstId, Mod, ResourceState), - ok = emqx_resource_health_check:delete_checker(InstId), + _ = emqx_resource_health_check:delete_checker(InstId), ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}), ok. From e95445728c1570ec05783fd39281ab5746574f2d Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sun, 2 Jan 2022 23:46:58 +0800 Subject: [PATCH 25/25] fix(test): wait until the bridge ready --- apps/emqx_bridge/src/emqx_bridge.erl | 5 ++++ .../test/emqx_bridge_api_SUITE.erl | 13 +++++++++++ .../test/emqx_connector_api_SUITE.erl | 23 ++++++++++++++++++- 3 files changed, 40 insertions(+), 1 deletion(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 786b99c44..f27603bf9 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -35,6 +35,7 @@ ]). -export([ load/0 + , lookup/1 , lookup/2 , lookup/3 , list/0 @@ -191,6 +192,10 @@ list_bridges_by_connector(ConnectorId) -> [B || B = #{raw_config := #{<<"connector">> := Id}} <- list(), ConnectorId =:= Id]. +lookup(Id) -> + {Type, Name} = parse_bridge_id(Id), + lookup(Type, Name). + lookup(Type, Name) -> RawConf = emqx:get_raw_config([bridges, Type, Name], #{}), lookup(Type, Name, RawConf). diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 807ad32f6..16da7395f 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -160,6 +160,7 @@ t_http_crud_apis(_) -> } = jsx:decode(Bridge), %% send an message to emqx and the message should be forwarded to the HTTP server + wait_for_resource_ready(BridgeID, 5), Body = <<"my msg">>, emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)), ?assert( @@ -212,6 +213,7 @@ t_http_crud_apis(_) -> }, jsx:decode(Bridge3Str)), %% send an message to emqx again, check the path has been changed + wait_for_resource_ready(BridgeID, 5), emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)), ?assert( receive @@ -320,3 +322,14 @@ auth_header_() -> operation_path(Oper, BridgeID) -> uri(["bridges", BridgeID, "operation", Oper]). + +wait_for_resource_ready(InstId, 0) -> + ct:pal("--- bridge ~p: ~p", [InstId, emqx_bridge:lookup(InstId)]), + ct:fail(wait_resource_timeout); +wait_for_resource_ready(InstId, Retry) -> + case emqx_bridge:lookup(InstId) of + {ok, #{resource_data := #{status := started}}} -> ok; + _ -> + timer:sleep(100), + wait_for_resource_ready(InstId, Retry-1) + end. diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 4caf700a9..12a3a8e23 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -241,6 +241,7 @@ t_mqtt_conn_bridge_ingress(_) -> emqx:subscribe(LocalTopic), %% PUBLISH a message to the 'remote' broker, as we have only one broker, %% the remote broker is also the local one. + wait_for_resource_ready(BridgeIDIngress, 5), emqx:publish(emqx_message:make(RemoteTopic, Payload)), %% we should receive a message on the local broker, with specified topic ?assert( @@ -309,6 +310,7 @@ t_mqtt_conn_bridge_egress(_) -> emqx:subscribe(RemoteTopic), %% PUBLISH a message to the 'local' broker, as we have only one broker, %% the remote broker is also the local one. + wait_for_resource_ready(BridgeIDEgress, 5), emqx:publish(emqx_message:make(LocalTopic, Payload)), %% we should receive a message on the "remote" broker, with specified topic @@ -370,6 +372,7 @@ t_mqtt_conn_update(_) -> , <<"status">> := <<"connected">> , <<"connector">> := ConnctorID } = jsx:decode(Bridge), + wait_for_resource_ready(BridgeIDEgress, 2), %% then we try to update 'server' of the connector, to an unavailable IP address %% the update should fail because of 'unreachable' or 'connrefused' @@ -412,6 +415,11 @@ t_mqtt_conn_update2(_) -> , <<"status">> := <<"disconnected">> , <<"connector">> := ConnctorID } = jsx:decode(Bridge), + %% We try to fix the 'server' parameter, to another unavailable server.. + %% The update should success: we don't check the connectivity of the new config + %% if the resource is now disconnected. + {ok, 200, _} = request(put, uri(["connectors", ConnctorID]), + ?MQTT_CONNECOTR2(<<"127.0.0.1:2604">>)), %% we fix the 'server' parameter to a normal one, it should work {ok, 200, _} = request(put, uri(["connectors", ConnctorID]), ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)), @@ -444,9 +452,9 @@ t_mqtt_conn_update3(_) -> <<"name">> => ?BRIDGE_NAME_EGRESS }), #{ <<"id">> := BridgeIDEgress - , <<"status">> := <<"connected">> , <<"connector">> := ConnctorID } = jsx:decode(Bridge), + wait_for_resource_ready(BridgeIDEgress, 2), %% delete the connector should fail because it is in use by a bridge {ok, 403, _} = request(delete, uri(["connectors", ConnctorID]), []), @@ -499,6 +507,7 @@ t_ingress_mqtt_bridge_with_rules(_) -> emqx:subscribe(LocalTopic), %% PUBLISH a message to the 'remote' broker, as we have only one broker, %% the remote broker is also the local one. + wait_for_resource_ready(BridgeIDIngress, 5), emqx:publish(emqx_message:make(RemoteTopic, Payload)), %% we should receive a message on the local broker, with specified topic ?assert( @@ -563,6 +572,7 @@ t_egress_mqtt_bridge_with_rules(_) -> emqx:subscribe(RemoteTopic), %% PUBLISH a message to the 'local' broker, as we have only one broker, %% the remote broker is also the local one. + wait_for_resource_ready(BridgeIDEgress, 5), emqx:publish(emqx_message:make(LocalTopic, Payload)), %% we should receive a message on the "remote" broker, with specified topic ?assert( @@ -583,6 +593,7 @@ t_egress_mqtt_bridge_with_rules(_) -> RuleTopic = <<"t/1">>, RemoteTopic2 = <<"remote_topic/", RuleTopic/binary>>, emqx:subscribe(RemoteTopic2), + wait_for_resource_ready(BridgeIDEgress, 5), emqx:publish(emqx_message:make(RuleTopic, Payload2)), {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []), #{ <<"id">> := RuleId @@ -646,3 +657,13 @@ auth_header_() -> {ok, Token} = emqx_dashboard_admin:sign_token(Username, Password), {"Authorization", "Bearer " ++ binary_to_list(Token)}. +wait_for_resource_ready(InstId, 0) -> + ct:pal("--- bridge ~p: ~p", [InstId, emqx_bridge:lookup(InstId)]), + ct:fail(wait_resource_timeout); +wait_for_resource_ready(InstId, Retry) -> + case emqx_bridge:lookup(InstId) of + {ok, #{resource_data := #{status := started}}} -> ok; + _ -> + timer:sleep(100), + wait_for_resource_ready(InstId, Retry-1) + end.