diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl index 31627db81..16092f262 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl @@ -33,6 +33,9 @@ desc/1 ]). +%% Allocatable resources +-define(hstreamdb_client, hstreamdb_client). + -define(DEFAULT_GRPC_TIMEOUT, timer:seconds(30)). -define(DEFAULT_GRPC_TIMEOUT_RAW, <<"30s">>). @@ -43,17 +46,22 @@ callback_mode() -> always_sync. on_start(InstId, Config) -> start_client(InstId, Config). -on_stop(InstId, #{client := Client, producer := Producer}) -> - StopClientRes = hstreamdb:stop_client(Client), - StopProducerRes = hstreamdb:stop_producer(Producer), - ?SLOG(info, #{ - msg => "stop hstreamdb connector", - connector => InstId, - client => Client, - producer => Producer, - stop_client => StopClientRes, - stop_producer => StopProducerRes - }). +on_stop(InstId, _State) -> + case emqx_resource:get_allocated_resources(InstId) of + #{client := Client, producer := Producer} -> + StopClientRes = hstreamdb:stop_client(Client), + StopProducerRes = hstreamdb:stop_producer(Producer), + ?SLOG(info, #{ + msg => "stop hstreamdb connector", + connector => InstId, + client => Client, + producer => Producer, + stop_client => StopClientRes, + stop_producer => StopProducerRes + }); + _ -> + ok + end. -define(FAILED_TO_APPLY_HRECORD_TEMPLATE, {error, {unrecoverable_error, failed_to_apply_hrecord_template}} @@ -237,6 +245,9 @@ start_producer( ), record_template => record_template(Options) }, + ok = emqx_resource:allocate_resource(InstId, ?hstreamdb_client, #{ + client => Client, producer => Producer + }), {ok, State}; {error, {already_started, Pid}} -> ?SLOG(info, #{