diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index d68f624e4..7f84c665a 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -380,7 +380,8 @@ on_query_async( NRequest, Timeout, {fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs]} - ). + ), + {ok, Worker}. on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) -> case do_get_status(PoolName, Timeout) of diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 585122539..71ed81dda 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -198,7 +198,10 @@ on_query_async( #{name := InstanceId} ) -> ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), - emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplyFun, Args}). + %% this is a cast, currently. + ok = emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplyFun, Args}), + WorkerPid = get_worker_pid(InstanceId), + {ok, WorkerPid}. on_get_status(_InstId, #{name := InstanceId}) -> case emqx_connector_mqtt_worker:status(InstanceId) of @@ -212,6 +215,12 @@ ensure_mqtt_worker_started(InstanceId, BridgeConf) -> {error, Reason} -> {error, Reason} end. +%% mqtt workers, when created and called via bridge callbacks, are +%% registered. +-spec get_worker_pid(atom()) -> pid(). +get_worker_pid(InstanceId) -> + whereis(InstanceId). + make_sub_confs(EmptyMap, _Conf, _) when map_size(EmptyMap) == 0 -> undefined; make_sub_confs(undefined, _Conf, _) -> diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 0cdd02f35..4e0ae9352 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -66,10 +66,11 @@ ?REPLY(FROM, REQUEST, SENT, RESULT) || ?QUERY(FROM, REQUEST, SENT) <- BATCH ]). --define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, WorkerPid), - {Ref, BatchOrQuery, IsRetriable, WorkerPid} +-define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, WorkerMRef), + {Ref, BatchOrQuery, IsRetriable, WorkerMRef} ). -define(RETRY_IDX, 3). +-define(WORKER_MREF_IDX, 4). -type id() :: binary(). -type index() :: pos_integer(). @@ -79,14 +80,15 @@ -type request_from() :: undefined | gen_statem:from(). -type state() :: blocked | running. -type data() :: #{ - id => id(), - index => index(), - inflight_tid => ets:tid(), - batch_size => pos_integer(), - batch_time => timer:time(), - queue => replayq:q(), - resume_interval => timer:time(), - tref => undefined | timer:tref() + id := id(), + index := index(), + inflight_tid := ets:tid(), + async_workers := #{pid() => reference()}, + batch_size := pos_integer(), + batch_time := timer:time(), + queue := replayq:q(), + resume_interval := timer:time(), + tref := undefined | timer:tref() }. callback_mode() -> [state_functions, state_enter]. @@ -172,21 +174,22 @@ init({Id, Index, Opts}) -> Queue = replayq:open(QueueOpts), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Queue)), emqx_resource_metrics:inflight_set(Id, Index, 0), - InfltWinSZ = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT), - InflightTID = inflight_new(InfltWinSZ, Id, Index), - HCItvl = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL), - St = #{ + InflightWinSize = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT), + InflightTID = inflight_new(InflightWinSize, Id, Index), + HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL), + Data = #{ id => Id, index => Index, inflight_tid => InflightTID, + async_workers => #{}, batch_size => BatchSize, batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), queue => Queue, - resume_interval => maps:get(resume_interval, Opts, HCItvl), + resume_interval => maps:get(resume_interval, Opts, HealthCheckInterval), tref => undefined }, ?tp(resource_worker_init, #{id => Id, index => Index}), - {ok, blocked, St, {next_event, cast, resume}}. + {ok, blocked, Data, {next_event, cast, resume}}. running(enter, _, St) -> ?tp(resource_worker_enter_running, #{}), @@ -203,6 +206,11 @@ running(internal, flush, St) -> flush(St); running(info, {flush, _Ref}, _St) -> keep_state_and_data; +running(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when + is_map_key(Pid, AsyncWorkers0) +-> + ?SLOG(info, #{msg => async_worker_died, state => running, reason => Reason}), + handle_async_worker_down(Data0, Pid); running(info, Info, _St) -> ?SLOG(error, #{msg => unexpected_msg, state => running, info => Info}), keep_state_and_data. @@ -224,6 +232,11 @@ blocked(info, ?SEND_REQ(_ReqFrom, {query, _Request, _Opts}) = Request0, Data0) - {keep_state, Data}; blocked(info, {flush, _Ref}, _Data) -> keep_state_and_data; +blocked(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when + is_map_key(Pid, AsyncWorkers0) +-> + ?SLOG(info, #{msg => async_worker_died, state => blocked, reason => Reason}), + handle_async_worker_down(Data0, Pid); blocked(info, Info, _Data) -> ?SLOG(error, #{msg => unexpected_msg, state => blocked, info => Info}), keep_state_and_data. @@ -458,11 +471,15 @@ do_flush( is_recoverable_error_result(Result) orelse is_not_connected_result(Result), ShouldPreserveInInflight = is_not_connected_result(Result), - WorkerPid = undefined, - InflightItem = ?INFLIGHT_ITEM(Ref, Request, IsRetriable, WorkerPid), + %% we set it atomically just below; a limitation of having + %% to use tuples for atomic ets updates + WorkerMRef0 = undefined, + InflightItem = ?INFLIGHT_ITEM(Ref, Request, IsRetriable, WorkerMRef0), ShouldPreserveInInflight andalso inflight_append(InflightTID, InflightItem, Id, Index), IsRetriable andalso mark_inflight_as_retriable(InflightTID, Ref), + {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result), + store_async_worker_reference(InflightTID, Ref, WorkerMRef), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), ?tp( resource_worker_flush_nack, @@ -473,18 +490,20 @@ do_flush( result => Result } ), - {next_state, blocked, Data0}; + {next_state, blocked, Data1}; %% Success; just ack. ack -> ok = replayq:ack(Q1, QAckRef), is_async(Id) orelse ack_inflight(InflightTID, Ref, Id, Index), + {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result), + store_async_worker_reference(InflightTID, Ref, WorkerMRef), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), ?tp(resource_worker_flush_ack, #{batch_or_query => Request}), case queue_count(Q1) > 0 of true -> - {keep_state, Data0, [{next_event, internal, flush}]}; + {keep_state, Data1, [{next_event, internal, flush}]}; false -> - {keep_state, Data0} + {keep_state, Data1} end end; do_flush(Data0, #{ @@ -518,11 +537,15 @@ do_flush(Data0, #{ is_recoverable_error_result(Result) orelse is_not_connected_result(Result), ShouldPreserveInInflight = is_not_connected_result(Result), - WorkerPid = undefined, - InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerPid), + %% we set it atomically just below; a limitation of having + %% to use tuples for atomic ets updates + WorkerMRef0 = undefined, + InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef0), ShouldPreserveInInflight andalso inflight_append(InflightTID, InflightItem, Id, Index), IsRetriable andalso mark_inflight_as_retriable(InflightTID, Ref), + {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result), + store_async_worker_reference(InflightTID, Ref, WorkerMRef), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), ?tp( resource_worker_flush_nack, @@ -533,22 +556,24 @@ do_flush(Data0, #{ result => Result } ), - {next_state, blocked, Data0}; + {next_state, blocked, Data1}; %% Success; just ack. ack -> ok = replayq:ack(Q1, QAckRef), is_async(Id) orelse ack_inflight(InflightTID, Ref, Id, Index), + {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result), + store_async_worker_reference(InflightTID, Ref, WorkerMRef), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), ?tp(resource_worker_flush_ack, #{batch_or_query => Batch}), CurrentCount = queue_count(Q1), case {CurrentCount > 0, CurrentCount >= BatchSize} of {false, _} -> - {keep_state, Data0}; + {keep_state, Data1}; {true, true} -> - {keep_state, Data0, [{next_event, internal, flush}]}; + {keep_state, Data1, [{next_event, internal, flush}]}; {true, false} -> - Data1 = ensure_flush_timer(Data0), - {keep_state, Data1} + Data2 = ensure_flush_timer(Data1), + {keep_state, Data2} end end. @@ -653,6 +678,8 @@ handle_query_result_pure(Id, {async_return, {error, Msg}}, HasBeenSent) -> {ack, PostFn}; handle_query_result_pure(_Id, {async_return, ok}, _HasBeenSent) -> {ack, fun() -> ok end}; +handle_query_result_pure(_Id, {async_return, {ok, Pid}}, _HasBeenSent) when is_pid(Pid) -> + {ack, fun() -> ok end}; handle_query_result_pure(Id, Result, HasBeenSent) -> PostFn = fun() -> assert_ok_result(Result), @@ -661,6 +688,13 @@ handle_query_result_pure(Id, Result, HasBeenSent) -> end, {ack, PostFn}. +handle_async_worker_down(Data0, Pid) -> + #{async_workers := AsyncWorkers0} = Data0, + {WorkerMRef, AsyncWorkers} = maps:take(Pid, AsyncWorkers0), + Data = Data0#{async_workers := AsyncWorkers}, + cancel_inflight_items(Data, WorkerMRef), + {keep_state, Data}. + is_not_connected_result(?RESOURCE_ERROR_M(Error, _)) when Error =:= not_connected; Error =:= blocked -> @@ -723,8 +757,8 @@ apply_query_fun(sync, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt, {async_return, inflight_full}; false -> IsRetriable = false, - WorkerPid = undefined, - InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerPid), + WorkerMRef = undefined, + InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef), ok = inflight_append(InflightTID, InflightItem, Id, Index), Mod:on_query(Id, Request, ResSt) end, @@ -745,8 +779,8 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt ReplyFun = fun ?MODULE:reply_after_query/7, Args = [self(), Id, Index, InflightTID, Ref, Query], IsRetriable = false, - WorkerPid = undefined, - InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerPid), + WorkerMRef = undefined, + InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef), ok = inflight_append(InflightTID, InflightItem, Id, Index), Result = Mod:on_query_async(Id, Request, {ReplyFun, Args}, ResSt), {async_return, Result} @@ -769,8 +803,8 @@ apply_query_fun(sync, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, {async_return, inflight_full}; false -> IsRetriable = false, - WorkerPid = undefined, - InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerPid), + WorkerMRef = undefined, + InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef), ok = inflight_append(InflightTID, InflightItem, Id, Index), Mod:on_batch_query(Id, Requests, ResSt) end, @@ -792,8 +826,8 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch]}, Requests = [Request || ?QUERY(_From, Request, _) <- Batch], IsRetriable = false, - WorkerPid = undefined, - InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerPid), + WorkerMRef = undefined, + InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef), ok = inflight_append(InflightTID, InflightItem, Id, Index), Result = Mod:on_batch_query_async(Id, Requests, ReplyFunAndArgs, ResSt), {async_return, Result} @@ -905,7 +939,7 @@ inflight_new(InfltWinSZ, Id, Index) -> inflight_get_first_retriable(InflightTID) -> MatchSpec = ets:fun2ms( - fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, _WorkerPid)) when + fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, _WorkerMRef)) when IsRetriable =:= true -> {Ref, BatchOrQuery} @@ -944,12 +978,12 @@ inflight_append(undefined, _InflightItem, _Id, _Index) -> ok; inflight_append( InflightTID, - ?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _) | _] = Batch0, IsRetriable, WorkerPid), + ?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _) | _] = Batch0, IsRetriable, WorkerMRef), Id, Index ) -> Batch = mark_as_sent(Batch0), - InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerPid), + InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef), IsNew = ets:insert_new(InflightTID, InflightItem), BatchSize = length(Batch), IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, BatchSize}), @@ -958,12 +992,12 @@ inflight_append( ok; inflight_append( InflightTID, - ?INFLIGHT_ITEM(Ref, ?QUERY(_From, _Req, _HasBeenSent) = Query0, IsRetriable, WorkerPid), + ?INFLIGHT_ITEM(Ref, ?QUERY(_From, _Req, _HasBeenSent) = Query0, IsRetriable, WorkerMRef), Id, Index ) -> Query = mark_as_sent(Query0), - InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerPid), + InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef), IsNew = ets:insert_new(InflightTID, InflightItem), IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, 1}), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), @@ -983,14 +1017,46 @@ mark_inflight_as_retriable(InflightTID, Ref) -> _ = ets:update_element(InflightTID, Ref, {?RETRY_IDX, true}), ok. +%% Track each worker pid only once. +ensure_async_worker_monitored( + Data0 = #{async_workers := AsyncWorkers}, {async_return, {ok, WorkerPid}} = _Result +) when + is_pid(WorkerPid), is_map_key(WorkerPid, AsyncWorkers) +-> + WorkerMRef = maps:get(WorkerPid, AsyncWorkers), + {Data0, WorkerMRef}; +ensure_async_worker_monitored( + Data0 = #{async_workers := AsyncWorkers0}, {async_return, {ok, WorkerPid}} +) when + is_pid(WorkerPid) +-> + WorkerMRef = monitor(process, WorkerPid), + AsyncWorkers = AsyncWorkers0#{WorkerPid => WorkerMRef}, + Data = Data0#{async_workers := AsyncWorkers}, + {Data, WorkerMRef}; +ensure_async_worker_monitored(Data0, _Result) -> + {Data0, undefined}. + +store_async_worker_reference(undefined = _InflightTID, _Ref, _WorkerMRef) -> + ok; +store_async_worker_reference(_InflightTID, _Ref, undefined = _WorkerRef) -> + ok; +store_async_worker_reference(InflightTID, Ref, WorkerMRef) when + is_reference(WorkerMRef) +-> + _ = ets:update_element( + InflightTID, Ref, {?WORKER_MREF_IDX, WorkerMRef} + ), + ok. + ack_inflight(undefined, _Ref, _Id, _Index) -> false; ack_inflight(InflightTID, Ref, Id, Index) -> Count = case ets:take(InflightTID, Ref) of - [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _), _IsRetriable, _WorkerPid)] -> + [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _), _IsRetriable, _WorkerMRef)] -> 1; - [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _) | _] = Batch, _IsRetriable, _WorkerPid)] -> + [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _) | _] = Batch, _IsRetriable, _WorkerMRef)] -> length(Batch); _ -> 0 @@ -1000,6 +1066,38 @@ ack_inflight(InflightTID, Ref, Id, Index) -> emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), IsAcked. +cancel_inflight_items(Data, WorkerMRef) -> + #{inflight_tid := InflightTID} = Data, + MatchSpec = + ets:fun2ms( + fun(?INFLIGHT_ITEM(Ref, _BatchOrQuery, _IsRetriable, WorkerMRef0)) when + WorkerMRef =:= WorkerMRef0 + -> + Ref + end + ), + Refs = ets:select(InflightTID, MatchSpec), + lists:foreach(fun(Ref) -> do_cancel_inflight_item(Data, Ref) end, Refs). + +do_cancel_inflight_item(Data, Ref) -> + #{id := Id, index := Index, inflight_tid := InflightTID} = Data, + {Count, Batch} = + case ets:take(InflightTID, Ref) of + [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _) = Query, _IsRetriable, _WorkerMRef)] -> + {1, [Query]}; + [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _) | _] = Batch0, _IsRetriable, _WorkerMRef)] -> + {length(Batch0), Batch0}; + _ -> + {0, []} + end, + IsAcked = Count > 0, + IsAcked andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), + emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), + Result = {error, interrupted}, + _ = batch_reply_caller(Id, Result, Batch), + ?tp(resource_worker_cancelled_inflight, #{ref => Ref}), + ok. + %%============================================================================== inc_sent_failed(Id, _HasBeenSent = true) -> diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index bbd2ba058..15d4a3b46 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -138,10 +138,10 @@ on_query(_InstId, get_counter, #{pid := Pid}) -> on_query_async(_InstId, {inc_counter, N}, ReplyFun, #{pid := Pid}) -> Pid ! {inc, N, ReplyFun}, - ok; + {ok, Pid}; on_query_async(_InstId, get_counter, ReplyFun, #{pid := Pid}) -> Pid ! {get, ReplyFun}, - ok; + {ok, Pid}; on_query_async(_InstId, block_now, ReplyFun, #{pid := Pid}) -> Pid ! {block_now, ReplyFun}, {ok, Pid}; diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 2d6856acf..4ffb259e8 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1422,6 +1422,86 @@ t_dont_retry_async_inflight_batch(_Config) -> ), ok. +%% check that we monitor async worker pids and abort their inflight +%% requests if they die. +t_async_pool_worker_death(_Config) -> + ResumeInterval = 1_000, + emqx_connector_demo:set_callback_mode(async_if_possible), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + batch_size => 1, + worker_pool_size => 2, + resume_interval => ResumeInterval + } + ), + Tab0 = ets:new(?FUNCTION_NAME, [bag, public]), + Insert0 = fun(Tab, Ref, Result) -> + ct:pal("inserting ~p", [{Ref, Result}]), + ets:insert(Tab, {Ref, Result}) + end, + ReqOpts = fun() -> #{async_reply_fun => {Insert0, [Tab0, make_ref()]}} end, + ?check_trace( + begin + ok = emqx_resource:simple_sync_query(?ID, block), + + NumReqs = 10, + {ok, SRef0} = + snabbkaffe:subscribe( + ?match_event(#{?snk_kind := resource_worker_appended_to_inflight}), + NumReqs, + 1_000 + ), + inc_counter_in_parallel_increasing(NumReqs, 1, ReqOpts), + {ok, _} = snabbkaffe:receive_events(SRef0), + + Inflight0 = emqx_resource_metrics:inflight_get(?ID), + ?assertEqual(NumReqs, Inflight0), + + %% grab one of the worker pids and kill it + {ok, SRef1} = + snabbkaffe:subscribe( + ?match_event(#{?snk_kind := resource_worker_cancelled_inflight}), + NumReqs, + 1_000 + ), + {ok, #{pid := Pid0}} = emqx_resource:simple_sync_query(?ID, get_state), + MRef = monitor(process, Pid0), + ct:pal("will kill ~p", [Pid0]), + exit(Pid0, kill), + receive + {'DOWN', MRef, process, Pid0, killed} -> + ct:pal("~p killed", [Pid0]), + ok + after 200 -> + ct:fail("worker should have died") + end, + + %% inflight requests should have been cancelled + {ok, _} = snabbkaffe:receive_events(SRef1), + Inflight1 = emqx_resource_metrics:inflight_get(?ID), + ?assertEqual(0, Inflight1), + + ?assert( + lists:all( + fun + ({_, {error, interrupted}}) -> true; + (_) -> false + end, + ets:tab2list(Tab0) + ), + #{tab => ets:tab2list(Tab0)} + ), + ok + end, + [] + ), + ok. + %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------ diff --git a/lib-ee/emqx_ee_connector/rebar.config b/lib-ee/emqx_ee_connector/rebar.config index 262641d44..3af1868c7 100644 --- a/lib-ee/emqx_ee_connector/rebar.config +++ b/lib-ee/emqx_ee_connector/rebar.config @@ -1,7 +1,7 @@ {erl_opts, [debug_info]}. {deps, [ {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}}, - {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.6"}}}, + {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.7"}}}, {emqx, {path, "../../apps/emqx"}} ]}. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl index 37d193edf..041bdec08 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl @@ -178,7 +178,7 @@ on_query(BridgeId, {send_message, Selected}, State) -> {send_message, map()}, {ReplyFun :: function(), Args :: list()}, state() -) -> ok. +) -> {ok, pid()}. on_query_async(BridgeId, {send_message, Selected}, ReplyFunAndArgs, State) -> Requests = [{send_message, Selected}], ?TRACE( @@ -210,7 +210,7 @@ on_batch_query(BridgeId, Requests, State) -> [{send_message, map()}], {ReplyFun :: function(), Args :: list()}, state() -) -> ok. +) -> {ok, pid()}. on_batch_query_async(BridgeId, Requests, ReplyFunAndArgs, State) -> ?TRACE( "QUERY_ASYNC", @@ -496,7 +496,7 @@ do_send_requests_sync(State, Requests, ResourceId) -> [{send_message, map()}], {ReplyFun :: function(), Args :: list()}, resource_id() -) -> ok. +) -> {ok, pid()}. do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) -> #{ pool_name := PoolName, @@ -531,7 +531,8 @@ do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) -> Request, RequestTimeout, {fun ?MODULE:reply_delegator/3, [ResourceId, ReplyFunAndArgs]} - ). + ), + {ok, Worker}. -spec reply_delegator( resource_id(), diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index 553e5369f..f966f56c6 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -356,7 +356,7 @@ do_async_query(InstId, Client, Points, ReplyFunAndArgs) -> connector => InstId, points => Points }), - ok = influxdb:write_async(Client, Points, ReplyFunAndArgs). + {ok, _WorkerPid} = influxdb:write_async(Client, Points, ReplyFunAndArgs). %% ------------------------------------------------------------------------------------------------- %% Tags & Fields Config Trans diff --git a/mix.exs b/mix.exs index 9f24cc088..7deab2625 100644 --- a/mix.exs +++ b/mix.exs @@ -131,7 +131,7 @@ defmodule EMQXUmbrella.MixProject do defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do [ {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"}, - {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.6", override: true}, + {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.7", override: true}, {:wolff, github: "kafka4beam/wolff", tag: "1.7.4"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.2", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0-rc1"},