From cfaad153648e61ce2e3898a4fda598bc35bcde7d Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sat, 18 Dec 2021 20:19:34 +0800 Subject: [PATCH] fix(connector): update the connector config failed --- apps/emqx_connector/src/emqx_connector_mqtt.erl | 15 ++++++++++----- .../test/emqx_connector_api_SUITE.erl | 7 +++++-- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 079f17716..f8d17ce32 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -129,12 +129,11 @@ on_start(InstId, Conf) -> }, case ?MODULE:create_bridge(BridgeConf) of {ok, _Pid} -> - case emqx_connector_mqtt_worker:ensure_started(InstanceId) of - ok -> {ok, #{name => InstanceId}}; - {error, Reason} -> {error, Reason} - end; + ensure_mqtt_worker_started(InstanceId); {error, {already_started, _Pid}} -> - {ok, #{name => InstanceId}}; + ok = ?MODULE:drop_bridge(InstanceId), + {ok, _} = ?MODULE:create_bridge(BridgeConf), + ensure_mqtt_worker_started(InstanceId); {error, Reason} -> {error, Reason} end. @@ -162,6 +161,12 @@ on_health_check(_InstId, #{name := InstanceId} = State) -> _ -> {error, {connector_down, InstanceId}, State} end. +ensure_mqtt_worker_started(InstanceId) -> + case emqx_connector_mqtt_worker:ensure_started(InstanceId) of + ok -> {ok, #{name => InstanceId}}; + {error, Reason} -> {error, Reason} + end. + make_sub_confs(EmptyMap) when map_size(EmptyMap) == 0 -> undefined; make_sub_confs(undefined) -> diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 11f9460b4..307852546 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -422,9 +422,12 @@ t_mqtt_conn_update2(_) -> , <<"connector">> := ?CONNECTR_ID }, jsx:decode(Bridge)), %% we fix the 'server' parameter to a normal one, it should work - {ok, 200, Bridge2} = request(put, uri(["connectors", ?CONNECTR_ID]), + {ok, 200, _} = request(put, uri(["connectors", ?CONNECTR_ID]), ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)), - ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge2)), + {ok, 200, BridgeStr} = request(get, uri(["bridges", ?BRIDGE_ID_EGRESS]), []), + ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS + , <<"status">> := <<"connected">> + }, jsx:decode(BridgeStr)), %% delete the bridge {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_EGRESS]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),