fix(connector): update the connector config failed
This commit is contained in:
parent
a44e18e869
commit
cfaad15364
|
@ -129,12 +129,11 @@ on_start(InstId, Conf) ->
|
||||||
},
|
},
|
||||||
case ?MODULE:create_bridge(BridgeConf) of
|
case ?MODULE:create_bridge(BridgeConf) of
|
||||||
{ok, _Pid} ->
|
{ok, _Pid} ->
|
||||||
case emqx_connector_mqtt_worker:ensure_started(InstanceId) of
|
ensure_mqtt_worker_started(InstanceId);
|
||||||
ok -> {ok, #{name => InstanceId}};
|
|
||||||
{error, Reason} -> {error, Reason}
|
|
||||||
end;
|
|
||||||
{error, {already_started, _Pid}} ->
|
{error, {already_started, _Pid}} ->
|
||||||
{ok, #{name => InstanceId}};
|
ok = ?MODULE:drop_bridge(InstanceId),
|
||||||
|
{ok, _} = ?MODULE:create_bridge(BridgeConf),
|
||||||
|
ensure_mqtt_worker_started(InstanceId);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
@ -162,6 +161,12 @@ on_health_check(_InstId, #{name := InstanceId} = State) ->
|
||||||
_ -> {error, {connector_down, InstanceId}, State}
|
_ -> {error, {connector_down, InstanceId}, State}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
ensure_mqtt_worker_started(InstanceId) ->
|
||||||
|
case emqx_connector_mqtt_worker:ensure_started(InstanceId) of
|
||||||
|
ok -> {ok, #{name => InstanceId}};
|
||||||
|
{error, Reason} -> {error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
make_sub_confs(EmptyMap) when map_size(EmptyMap) == 0 ->
|
make_sub_confs(EmptyMap) when map_size(EmptyMap) == 0 ->
|
||||||
undefined;
|
undefined;
|
||||||
make_sub_confs(undefined) ->
|
make_sub_confs(undefined) ->
|
||||||
|
|
|
@ -422,9 +422,12 @@ t_mqtt_conn_update2(_) ->
|
||||||
, <<"connector">> := ?CONNECTR_ID
|
, <<"connector">> := ?CONNECTR_ID
|
||||||
}, jsx:decode(Bridge)),
|
}, jsx:decode(Bridge)),
|
||||||
%% we fix the 'server' parameter to a normal one, it should work
|
%% we fix the 'server' parameter to a normal one, it should work
|
||||||
{ok, 200, Bridge2} = request(put, uri(["connectors", ?CONNECTR_ID]),
|
{ok, 200, _} = request(put, uri(["connectors", ?CONNECTR_ID]),
|
||||||
?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)),
|
?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)),
|
||||||
?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge2)),
|
{ok, 200, BridgeStr} = request(get, uri(["bridges", ?BRIDGE_ID_EGRESS]), []),
|
||||||
|
?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS
|
||||||
|
, <<"status">> := <<"connected">>
|
||||||
|
}, jsx:decode(BridgeStr)),
|
||||||
%% delete the bridge
|
%% delete the bridge
|
||||||
{ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_EGRESS]), []),
|
{ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_EGRESS]), []),
|
||||||
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
||||||
|
|
Loading…
Reference in New Issue