diff --git a/apps/emqx_bridge/test/emqx_bridge_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_testlib.erl index 62ba70b33..45bb5e4dc 100644 --- a/apps/emqx_bridge/test/emqx_bridge_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_testlib.erl @@ -417,10 +417,14 @@ t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint) -> ok. t_on_get_status(Config) -> + t_on_get_status(Config, _Opts = #{}). + +t_on_get_status(Config, Opts) -> ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config), ProxyName = ?config(proxy_name, Config), ResourceId = resource_id(Config), + FailureStatus = maps:get(failure_status, Opts, disconnected), ?assertMatch({ok, _}, create_bridge(Config)), %% Since the connection process is async, we give it some time to %% stabilize and avoid flakiness. @@ -431,7 +435,7 @@ t_on_get_status(Config) -> ), emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> ct:sleep(500), - ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)) + ?assertEqual({ok, FailureStatus}, emqx_resource_manager:health_check(ResourceId)) end), %% Check that it recovers itself. ?retry( diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl index 81aa729e4..890a3faed 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl @@ -139,6 +139,15 @@ fields(producer) -> ]; fields(consumer) -> [ + %% Note: The minimum deadline pubsub does is 10 s. + {ack_deadline, + mk( + emqx_schema:timeout_duration_s(), + #{ + default => <<"60s">>, + importance => ?IMPORTANCE_HIDDEN + } + )}, {ack_retry_interval, mk( emqx_schema:timeout_duration_ms(), diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl index f2fa87d38..80283ee73 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl @@ -119,6 +119,7 @@ start( ?tp(gcp_pubsub_starting_ehttpc_pool, #{pool_name => ResourceId}), case ehttpc_sup:start_pool(ResourceId, PoolOpts) of {ok, _} -> + ?tp(gcp_pubsub_ehttpc_pool_started, #{pool_name => ResourceId}), {ok, State}; {error, {already_started, _}} -> ?tp(gcp_pubsub_ehttpc_pool_already_started, #{pool_name => ResourceId}), @@ -166,7 +167,7 @@ query_sync({prepared_request, PreparedRequest = {_Method, _Path, _Body}}, State) {prepared_request, prepared_request()}, {ReplyFun :: function(), Args :: list()}, state() -) -> {ok, pid()}. +) -> {ok, pid()} | {error, no_pool_worker_available}. query_async( {prepared_request, PreparedRequest = {_Method, _Path, _Body}}, ReplyFunAndArgs, @@ -306,7 +307,7 @@ do_send_requests_sync(State, {prepared_request, {Method, Path, Body}}) -> state(), {prepared_request, prepared_request()}, {ReplyFun :: function(), Args :: list()} -) -> {ok, pid()}. +) -> {ok, pid()} | {error, no_pool_worker_available}. do_send_requests_async( State, {prepared_request, {Method, Path, Body}}, ReplyFunAndArgs ) -> @@ -323,21 +324,27 @@ do_send_requests_async( } ), Request = to_ehttpc_request(State, Method, Path, Body), - Worker = ehttpc_pool:pick_worker(PoolName), - ok = ehttpc:request_async( - Worker, - Method, - Request, - RequestTTL, - {fun ?MODULE:reply_delegator/3, [PoolName, ReplyFunAndArgs]} - ), - {ok, Worker}. + %% `ehttpc_pool'/`gproc_pool' might return `false' if there are no workers... + case ehttpc_pool:pick_worker(PoolName) of + false -> + {error, no_pool_worker_available}; + Worker -> + ok = ehttpc:request_async( + Worker, + Method, + Request, + RequestTTL, + {fun ?MODULE:reply_delegator/3, [PoolName, ReplyFunAndArgs]} + ), + {ok, Worker} + end. to_ehttpc_request(State, Method, Path, Body) -> #{jwt_config := JWTConfig} = State, Headers = get_jwt_authorization_header(JWTConfig), case {Method, Body} of {get, <<>>} -> {Path, Headers}; + {delete, <<>>} -> {Path, Headers}; _ -> {Path, Headers, Body} end. diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl index bb9e75a9c..ddceb4a11 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl @@ -24,44 +24,58 @@ ]). -export([get_subscription/1]). --export([reply_delegator/3, pull_async/1, process_pull_response/2, ensure_subscription/1]). +-export([reply_delegator/4, pull_async/1, process_pull_response/2, ensure_subscription/1]). -type subscription_id() :: binary(). -type bridge_name() :: atom() | binary(). -type ack_id() :: binary(). +-type message_id() :: binary(). -type config() :: #{ + ack_deadline := emqx_schema:timeout_duration_s(), ack_retry_interval := emqx_schema:timeout_duration_ms(), client := emqx_bridge_gcp_pubsub_client:state(), ecpool_worker_id => non_neg_integer(), + forget_interval := timer:time(), hookpoint := binary(), instance_id := binary(), mqtt_config => emqx_bridge_gcp_pubsub_impl_consumer:mqtt_config(), project_id := emqx_bridge_gcp_pubsub_client:project_id(), pull_max_messages := non_neg_integer(), + pull_retry_interval := emqx_schema:timeout_duration_ms(), subscription_id => subscription_id(), topic => emqx_bridge_gcp_pubsub_client:topic() }. -type state() :: #{ + ack_deadline := emqx_schema:timeout_duration_s(), ack_retry_interval := emqx_schema:timeout_duration_ms(), ack_timer := undefined | reference(), async_workers := #{pid() => reference()}, client := emqx_bridge_gcp_pubsub_client:state(), ecpool_worker_id := non_neg_integer(), + forget_interval := timer:time(), hookpoint := binary(), instance_id := binary(), - mqtt_config => emqx_bridge_gcp_pubsub_impl_consumer:mqtt_config(), - pending_acks => [ack_id()], + mqtt_config := emqx_bridge_gcp_pubsub_impl_consumer:mqtt_config(), + pending_acks := #{message_id() => ack_id()}, project_id := emqx_bridge_gcp_pubsub_client:project_id(), pull_max_messages := non_neg_integer(), + pull_retry_interval := emqx_schema:timeout_duration_ms(), pull_timer := undefined | reference(), - subscription_id => subscription_id(), - topic => emqx_bridge_gcp_pubsub_client:topic() + %% In order to avoid re-processing the same message twice due to race conditions + %% between acknlowledging a message and receiving a duplicate pulled message, we need + %% to keep the seen message IDs for a while... + seen_message_ids := sets:set(message_id()), + subscription_id := subscription_id(), + topic := emqx_bridge_gcp_pubsub_client:topic() }. -type decoded_message() :: map(). +%% initialization states +-define(ensure_subscription, ensure_subscription). +-define(patch_subscription, patch_subscription). + -define(HEALTH_CHECK_TIMEOUT, 10_000). --define(OPTVAR_SUB_OK(PID), {?MODULE, PID}). --define(PULL_INTERVAL, 5_000). +-define(OPTVAR_SUB_OK(PID), {?MODULE, subscription_ok, PID}). %%------------------------------------------------------------------------------------------------- %% API used by `reply_delegator' @@ -79,19 +93,23 @@ process_pull_response(WorkerPid, RespBody) -> ensure_subscription(WorkerPid) -> gen_server:cast(WorkerPid, ensure_subscription). --spec reply_delegator(pid(), binary(), {ok, map()} | {error, timeout | term()}) -> ok. -reply_delegator(WorkerPid, InstanceId, Result) -> +-spec reply_delegator(pid(), pull_async, binary(), {ok, map()} | {error, timeout | term()}) -> ok. +reply_delegator(WorkerPid, pull_async = _Action, InstanceId, Result) -> + ?tp(gcp_pubsub_consumer_worker_reply_delegator, #{result => Result}), case Result of {error, timeout} -> ?MODULE:pull_async(WorkerPid); {error, Reason} -> - ?SLOG(warning, #{ - msg => "gcp_pubsub_consumer_worker_pull_error", - instance_id => InstanceId, - reason => Reason - }), + ?tp( + warning, + "gcp_pubsub_consumer_worker_pull_error", + #{ + instance_id => InstanceId, + reason => Reason + } + ), case Reason of - #{status_code := 409} -> + #{status_code := 404} -> %% the subscription was not found; deleted?! ?MODULE:ensure_subscription(WorkerPid); _ -> @@ -113,13 +131,13 @@ get_subscription(WorkerPid) -> %% `ecpool' health check %%------------------------------------------------------------------------------------------------- --spec health_check(pid()) -> boolean(). +-spec health_check(pid()) -> subscription_ok | topic_not_found | timeout. health_check(WorkerPid) -> case optvar:read(?OPTVAR_SUB_OK(WorkerPid), ?HEALTH_CHECK_TIMEOUT) of - {ok, _} -> - true; + {ok, Status} -> + Status; timeout -> - false + timeout end. %%------------------------------------------------------------------------------------------------- @@ -129,30 +147,36 @@ health_check(WorkerPid) -> connect(Opts0) -> Opts = maps:from_list(Opts0), #{ + ack_deadline := AckDeadlineSeconds, ack_retry_interval := AckRetryInterval, bridge_name := BridgeName, client := Client, ecpool_worker_id := WorkerId, + forget_interval := ForgetInterval, hookpoint := Hookpoint, instance_id := InstanceId, project_id := ProjectId, pull_max_messages := PullMaxMessages, + pull_retry_interval := PullRetryInterval, topic_mapping := TopicMapping } = Opts, TopicMappingList = lists:keysort(1, maps:to_list(TopicMapping)), Index = 1 + (WorkerId rem map_size(TopicMapping)), {Topic, MQTTConfig} = lists:nth(Index, TopicMappingList), Config = #{ + ack_deadline => AckDeadlineSeconds, ack_retry_interval => AckRetryInterval, %% Note: the `client' value here must be immutable and not changed by the %% bridge during `on_get_status', since we have handed it over to the pull %% workers. client => Client, + forget_interval => ForgetInterval, hookpoint => Hookpoint, instance_id => InstanceId, mqtt_config => MQTTConfig, project_id => ProjectId, pull_max_messages => PullMaxMessages, + pull_retry_interval => PullRetryInterval, topic => Topic, subscription_id => subscription_id(BridgeName, Topic) }, @@ -162,33 +186,55 @@ connect(Opts0) -> %% `gen_server' API %%------------------------------------------------------------------------------------------------- --spec init(config()) -> {ok, state(), {continue, ensure_subscription}}. +-spec init(config()) -> {ok, state(), {continue, ?ensure_subscription}}. init(Config) -> process_flag(trap_exit, true), State = Config#{ ack_timer => undefined, async_workers => #{}, - pending_acks => [], - pull_timer => undefined + pending_acks => #{}, + pull_timer => undefined, + seen_message_ids => sets:new([{version, 2}]) }, - {ok, State, {continue, ensure_subscription}}. + ?tp(gcp_pubsub_consumer_worker_init, #{topic => maps:get(topic, State)}), + {ok, State, {continue, ?ensure_subscription}}. -handle_continue(ensure_subscription, State0) -> +handle_continue(?ensure_subscription, State0) -> case ensure_subscription_exists(State0) of - ok -> + already_exists -> + {noreply, State0, {continue, ?patch_subscription}}; + continue -> #{instance_id := InstanceId} = State0, + ?MODULE:pull_async(self()), + optvar:set(?OPTVAR_SUB_OK(self()), subscription_ok), ?tp( debug, "gcp_pubsub_consumer_worker_subscription_ready", #{instance_id => InstanceId} ), + {noreply, State0}; + retry -> + {noreply, State0, {continue, ?ensure_subscription}}; + not_found -> + %% there's nothing much to do if the topic suddenly doesn't exist anymore. + {stop, {error, topic_not_found}, State0} + end; +handle_continue(?patch_subscription, State0) -> + ?tp(gcp_pubsub_consumer_worker_patch_subscription_enter, #{}), + case patch_subscription(State0) of + ok -> + #{instance_id := InstanceId} = State0, ?MODULE:pull_async(self()), optvar:set(?OPTVAR_SUB_OK(self()), subscription_ok), + ?tp( + debug, + "gcp_pubsub_consumer_worker_subscription_ready", + #{instance_id => InstanceId} + ), {noreply, State0}; error -> - %% FIXME: add delay if topic does not exist?! %% retry - {noreply, State0, {continue, ensure_subscription}} + {noreply, State0, {continue, ?patch_subscription}} end. handle_call(get_subscription, _From, State0) -> @@ -201,21 +247,20 @@ handle_cast(pull_async, State0) -> State = do_pull_async(State0), {noreply, State}; handle_cast({process_pull_response, RespBody}, State0) -> + ?tp(gcp_pubsub_consumer_worker_pull_response_received, #{}), State = do_process_pull_response(State0, RespBody), {noreply, State}; handle_cast(ensure_subscription, State0) -> - {noreply, State0, {continue, ensure_subscription}}; + {noreply, State0, {continue, ?ensure_subscription}}; handle_cast(_Request, State0) -> {noreply, State0}. handle_info({timeout, TRef, ack}, State0 = #{ack_timer := TRef}) -> - State1 = acknowledge(State0), - State = ensure_ack_timer(State1), + State = acknowledge(State0), {noreply, State}; handle_info({timeout, TRef, pull}, State0 = #{pull_timer := TRef}) -> State1 = State0#{pull_timer := undefined}, - State2 = do_pull_async(State1), - State = ensure_pull_timer(State2), + State = do_pull_async(State1), {noreply, State}; handle_info( {'DOWN', _Ref, process, AsyncWorkerPid, _Reason}, State0 = #{async_workers := Workers0} @@ -225,6 +270,13 @@ handle_info( Workers = maps:remove(AsyncWorkerPid, Workers0), State1 = State0#{async_workers := Workers}, State = do_pull_async(State1), + ?tp(gcp_pubsub_consumer_worker_handled_async_worker_down, #{async_worker_pid => AsyncWorkerPid}), + {noreply, State}; +handle_info({forget_message_ids, MsgIds}, State0) -> + State = maps:update_with( + seen_message_ids, fun(Seen) -> sets:subtract(Seen, MsgIds) end, State0 + ), + ?tp(gcp_pubsub_consumer_worker_message_ids_forgotten, #{message_ids => MsgIds}), {noreply, State}; handle_info(Msg, State0) -> #{ @@ -239,8 +291,18 @@ handle_info(Msg, State0) -> }), {noreply, State0}. +terminate({error, topic_not_found} = _Reason, State) -> + #{ + instance_id := InstanceId, + topic := _Topic + } = State, + optvar:unset(?OPTVAR_SUB_OK(self())), + emqx_bridge_gcp_pubsub_impl_consumer:mark_topic_as_nonexistent(InstanceId), + ?tp(gcp_pubsub_consumer_worker_terminate, #{reason => _Reason, topic => _Topic}), + ok; terminate(_Reason, _State) -> optvar:unset(?OPTVAR_SUB_OK(self())), + ?tp(gcp_pubsub_consumer_worker_terminate, #{reason => _Reason, topic => maps:get(topic, _State)}), ok. %%------------------------------------------------------------------------------------------------- @@ -252,21 +314,24 @@ start_link(Config) -> gen_server:start_link(?MODULE, Config, []). -spec ensure_ack_timer(state()) -> state(). -ensure_ack_timer(State = #{pending_acks := []}) -> - State; -ensure_ack_timer(State = #{ack_timer := TRef}) when is_reference(TRef) -> - State; -ensure_ack_timer(State = #{ack_retry_interval := AckRetryInterval}) -> - State#{ack_timer := emqx_utils:start_timer(AckRetryInterval, ack)}. +ensure_ack_timer(State = #{ack_timer := TRef, pending_acks := PendingAcks}) -> + case {map_size(PendingAcks) =:= 0, is_reference(TRef)} of + {false, false} -> + #{ack_retry_interval := AckRetryInterval} = State, + State#{ack_timer := emqx_utils:start_timer(AckRetryInterval, ack)}; + {_, _} -> + State + end. -spec ensure_pull_timer(state()) -> state(). ensure_pull_timer(State = #{pull_timer := TRef}) when is_reference(TRef) -> State; -ensure_pull_timer(State) -> - State#{pull_timer := emqx_utils:start_timer(?PULL_INTERVAL, pull)}. +ensure_pull_timer(State = #{pull_retry_interval := PullRetryInterval}) -> + State#{pull_timer := emqx_utils:start_timer(PullRetryInterval, pull)}. --spec ensure_subscription_exists(state()) -> ok | error. +-spec ensure_subscription_exists(state()) -> continue | retry | not_found | already_exists. ensure_subscription_exists(State) -> + ?tp(gcp_pubsub_consumer_worker_create_subscription_enter, #{}), #{ client := Client, instance_id := InstanceId, @@ -281,60 +346,122 @@ ensure_subscription_exists(State) -> case Res of {error, #{status_code := 409}} -> %% already exists - ?SLOG(debug, #{ - msg => "gcp_pubsub_consumer_worker_subscription_already_exists", - instance_id => InstanceId, - topic => Topic, - subscription_id => SubscriptionId - }), - Method1 = patch, - Path1 = path(State, create), - Body1 = body(State, patch_subscription), - PreparedRequest1 = {prepared_request, {Method1, Path1, Body1}}, - Res1 = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest1, Client), - ?SLOG(debug, #{ - msg => "gcp_pubsub_consumer_worker_subscription_patch", - instance_id => InstanceId, - topic => Topic, - subscription_id => SubscriptionId, - result => Res1 - }), - ok; + ?tp( + debug, + "gcp_pubsub_consumer_worker_subscription_already_exists", + #{ + instance_id => InstanceId, + topic => Topic, + subscription_id => SubscriptionId + } + ), + already_exists; + {error, #{status_code := 404}} -> + %% nonexistent topic + ?tp( + warning, + "gcp_pubsub_consumer_worker_nonexistent_topic", + #{ + instance_id => InstanceId, + topic => Topic + } + ), + not_found; {ok, #{status_code := 200}} -> - ?SLOG(debug, #{ - msg => "gcp_pubsub_consumer_worker_subscription_created", - instance_id => InstanceId, - topic => Topic, - subscription_id => SubscriptionId - }), + ?tp( + debug, + "gcp_pubsub_consumer_worker_subscription_created", + #{ + instance_id => InstanceId, + topic => Topic, + subscription_id => SubscriptionId + } + ), + continue; + {error, Reason} -> + ?tp( + error, + "gcp_pubsub_consumer_worker_subscription_error", + #{ + instance_id => InstanceId, + topic => Topic, + reason => Reason + } + ), + retry + end. + +-spec patch_subscription(state()) -> ok | error. +patch_subscription(State) -> + #{ + client := Client, + instance_id := InstanceId, + subscription_id := SubscriptionId, + topic := Topic + } = State, + Method1 = patch, + Path1 = path(State, create), + Body1 = body(State, patch_subscription), + PreparedRequest1 = {prepared_request, {Method1, Path1, Body1}}, + Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest1, Client), + case Res of + {ok, _} -> + ?tp( + debug, + "gcp_pubsub_consumer_worker_subscription_patched", + #{ + instance_id => InstanceId, + topic => Topic, + subscription_id => SubscriptionId, + result => Res + } + ), ok; {error, Reason} -> - ?SLOG(error, #{ - msg => "gcp_pubsub_consumer_worker_subscription_error", - instance_id => InstanceId, - topic => Topic, - reason => Reason - }), + ?tp( + warning, + "gcp_pubsub_consumer_worker_subscription_patch_error", + #{ + instance_id => InstanceId, + topic => Topic, + subscription_id => SubscriptionId, + reason => Reason + } + ), error end. %% We use async requests so that this process will be more responsive to system messages. -do_pull_async(State) -> - #{ - client := Client, - instance_id := InstanceId - } = State, - Method = post, - Path = path(State, pull), - Body = body(State, pull), - PreparedRequest = {prepared_request, {Method, Path, Body}}, - ReplyFunAndArgs = {fun ?MODULE:reply_delegator/3, [self(), InstanceId]}, - {ok, AsyncWorkerPid} = emqx_bridge_gcp_pubsub_client:query_async( - PreparedRequest, - ReplyFunAndArgs, - Client - ), - ensure_async_worker_monitored(State, AsyncWorkerPid). +-spec do_pull_async(state()) -> state(). +do_pull_async(State0) -> + ?tp_span( + gcp_pubsub_consumer_worker_pull_async, + #{topic => maps:get(topic, State0), subscription_id => maps:get(subscription_id, State0)}, + begin + #{ + client := Client, + instance_id := InstanceId + } = State0, + Method = post, + Path = path(State0, pull), + Body = body(State0, pull), + PreparedRequest = {prepared_request, {Method, Path, Body}}, + ReplyFunAndArgs = {fun ?MODULE:reply_delegator/4, [self(), pull_async, InstanceId]}, + %% `ehttpc_pool'/`gproc_pool' might return `false' if there are no workers... + Res = emqx_bridge_gcp_pubsub_client:query_async( + PreparedRequest, + ReplyFunAndArgs, + Client + ), + case Res of + {ok, AsyncWorkerPid} -> + State1 = ensure_pull_timer(State0), + ensure_async_worker_monitored(State1, AsyncWorkerPid); + {error, no_pool_worker_available} -> + ensure_pull_timer(State0) + end + end + ). -spec ensure_async_worker_monitored(state(), pid()) -> state(). ensure_async_worker_monitored(State = #{async_workers := Workers0}, AsyncWorkerPid) -> @@ -349,22 +476,58 @@ ensure_async_worker_monitored(State = #{async_workers := Workers0}, AsyncWorkerP -spec do_process_pull_response(state(), binary()) -> state(). do_process_pull_response(State0, RespBody) -> + #{ + pending_acks := PendingAcks, + seen_message_ids := SeenMsgIds + } = State0, Messages = decode_response(RespBody), - AckIds = lists:map(fun(Msg) -> handle_message(State0, Msg) end, Messages), - State1 = maps:update_with(pending_acks, fun(AckIds0) -> AckIds0 ++ AckIds end, State0), + ?tp(gcp_pubsub_consumer_worker_decoded_messages, #{messages => Messages}), + {NewPendingAcks, NewSeenMsgIds} = + lists:foldl( + fun( + Msg = #{ + <<"ackId">> := AckId, + <<"message">> := #{<<"messageId">> := MsgId} + }, + {AccAck, AccSeen} + ) -> + case is_map_key(MsgId, PendingAcks) or sets:is_element(MsgId, SeenMsgIds) of + true -> + ?tp(message_redelivered, #{message => Msg}), + %% even though it was redelivered, pubsub might change the ack + %% id... we should ack this latest value. + {AccAck#{MsgId => AckId}, AccSeen}; + false -> + _ = handle_message(State0, Msg), + {AccAck#{MsgId => AckId}, sets:add_element(MsgId, AccSeen)} + end + end, + {PendingAcks, SeenMsgIds}, + Messages + ), + State1 = State0#{pending_acks := NewPendingAcks, seen_message_ids := NewSeenMsgIds}, State2 = acknowledge(State1), pull_async(self()), - ensure_ack_timer(State2). + State2. -spec acknowledge(state()) -> state(). -acknowledge(State0 = #{pending_acks := []}) -> - State0; -acknowledge(State0) -> +acknowledge(State0 = #{pending_acks := PendingAcks}) -> + case map_size(PendingAcks) =:= 0 of + true -> + State0; + false -> + do_acknowledge(State0) + end. + +do_acknowledge(State0) -> + ?tp(gcp_pubsub_consumer_worker_acknowledge_enter, #{}), State1 = State0#{ack_timer := undefined}, #{ client := Client, - pending_acks := AckIds + forget_interval := ForgetInterval, + pending_acks := PendingAcks } = State1, + AckIds = maps:values(PendingAcks), Method = post, Path = path(State1, ack), Body = body(State1, ack, #{ack_ids => AckIds}), @@ -372,16 +535,27 @@ acknowledge(State0) -> Res = emqx_bridge_gcp_pubsub_client:query_sync(PreparedRequest, Client), case Res of {error, Reason} -> - ?SLOG(warning, #{msg => "gcp_pubsub_consumer_worker_ack_error", reason => Reason}), - State1; + ?tp( + warning, + "gcp_pubsub_consumer_worker_ack_error", + #{reason => Reason} + ), + ensure_ack_timer(State1); {ok, #{status_code := 200}} -> - ?tp(gcp_pubsub_consumer_worker_acknowledged, #{ack_ids => AckIds}), - State1#{pending_acks := []}; + ?tp(gcp_pubsub_consumer_worker_acknowledged, #{acks => PendingAcks}), + MsgIds = maps:keys(PendingAcks), + forget_message_ids_after(MsgIds, ForgetInterval), + State1#{pending_acks := #{}}; {ok, Details} -> - ?SLOG(warning, #{msg => "gcp_pubsub_consumer_worker_ack_error", details => Details}), - State1 + ?tp( + warning, + "gcp_pubsub_consumer_worker_ack_error", + #{details => Details} + ), + ensure_ack_timer(State1) end. +-spec do_get_subscription(state()) -> {ok, emqx_utils_json:json_term()} | {error, term()}. do_get_subscription(State) -> #{ client := Client @@ -442,12 +616,11 @@ body(State, pull) -> emqx_utils_json:encode(#{<<"maxMessages">> => PullMaxMessages}); body(State, create) -> #{ - ack_retry_interval := AckRetryInterval, + ack_deadline := AckDeadlineSeconds, project_id := ProjectId, topic := PubSubTopic } = State, TopicResource = <<"projects/", ProjectId/binary, "/topics/", PubSubTopic/binary>>, - AckDeadlineSeconds = 5 + erlang:convert_time_unit(AckRetryInterval, millisecond, second), JSON = #{ <<"topic">> => TopicResource, <<"ackDeadlineSeconds">> => AckDeadlineSeconds @@ -455,14 +628,13 @@ body(State, create) -> emqx_utils_json:encode(JSON); body(State, patch_subscription) -> #{ - ack_retry_interval := AckRetryInterval, + ack_deadline := AckDeadlineSeconds, project_id := ProjectId, topic := PubSubTopic, subscription_id := SubscriptionId } = State, TopicResource = <<"projects/", ProjectId/binary, "/topics/", PubSubTopic/binary>>, SubscriptionResource = subscription_resource(ProjectId, SubscriptionId), - AckDeadlineSeconds = 5 + erlang:convert_time_unit(AckRetryInterval, millisecond, second), JSON = #{ <<"subscription">> => #{ @@ -505,50 +677,52 @@ decode_response(RespBody) -> [] end. --spec handle_message(state(), decoded_message()) -> [ack_id()]. +-spec handle_message(state(), decoded_message()) -> ok. handle_message(State, #{<<"ackId">> := AckId, <<"message">> := InnerMsg} = _Message) -> - ?tp( + ?tp_span( debug, "gcp_pubsub_consumer_worker_handle_message", - #{message_id => maps:get(<<"messageId">>, InnerMsg), message => _Message, ack_id => AckId} - ), - #{ - instance_id := InstanceId, - hookpoint := Hookpoint, - mqtt_config := #{ - payload_template := PayloadTemplate, - qos := MQTTQoS, - mqtt_topic := MQTTTopic - }, - topic := Topic - } = State, - #{ - <<"messageId">> := MessageId, - <<"publishTime">> := PublishTime - } = InnerMsg, - FullMessage0 = #{ - message_id => MessageId, - publish_time => PublishTime, - topic => Topic - }, - FullMessage = - lists:foldl( - fun({FromKey, ToKey}, Acc) -> - add_if_present(FromKey, InnerMsg, ToKey, Acc) - end, - FullMessage0, - [ - {<<"data">>, value}, - {<<"attributes">>, attributes}, - {<<"orderingKey">>, ordering_key} - ] - ), - Payload = render(FullMessage, PayloadTemplate), - MQTTMessage = emqx_message:make(InstanceId, MQTTQoS, MQTTTopic, Payload), - _ = emqx:publish(MQTTMessage), - emqx:run_hook(Hookpoint, [FullMessage]), - emqx_resource_metrics:received_inc(InstanceId), - AckId. + #{message_id => maps:get(<<"messageId">>, InnerMsg), message => _Message, ack_id => AckId}, + begin + #{ + instance_id := InstanceId, + hookpoint := Hookpoint, + mqtt_config := #{ + payload_template := PayloadTemplate, + qos := MQTTQoS, + mqtt_topic := MQTTTopic + }, + topic := Topic + } = State, + #{ + <<"messageId">> := MessageId, + <<"publishTime">> := PublishTime + } = InnerMsg, + FullMessage0 = #{ + message_id => MessageId, + publish_time => PublishTime, + topic => Topic + }, + FullMessage = + lists:foldl( + fun({FromKey, ToKey}, Acc) -> + add_if_present(FromKey, InnerMsg, ToKey, Acc) + end, + FullMessage0, + [ + {<<"data">>, value}, + {<<"attributes">>, attributes}, + {<<"orderingKey">>, ordering_key} + ] + ), + Payload = render(FullMessage, PayloadTemplate), + MQTTMessage = emqx_message:make(InstanceId, MQTTQoS, MQTTTopic, Payload), + _ = emqx:publish(MQTTMessage), + emqx:run_hook(Hookpoint, [FullMessage]), + emqx_resource_metrics:received_inc(InstanceId), + ok + end + ). -spec add_if_present(any(), map(), any(), map()) -> map(). add_if_present(FromKey, Message, ToKey, Map) -> @@ -563,6 +737,11 @@ render(FullMessage, PayloadTemplate) -> Opts = #{return => full_binary}, emqx_placeholder:proc_tmpl(PayloadTemplate, FullMessage, Opts). +forget_message_ids_after(MsgIds0, Timeout) -> + MsgIds = sets:from_list(MsgIds0, [{version, 2}]), + _ = erlang:send_after(Timeout, self(), {forget_message_ids, MsgIds}), + ok. + to_bin(A) when is_atom(A) -> atom_to_binary(A); to_bin(L) when is_list(L) -> iolist_to_binary(L); to_bin(B) when is_binary(B) -> B. diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl index 88b57be1e..e04794bfb 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl @@ -15,6 +15,13 @@ on_get_status/2 ]). +%% health check API +-export([ + mark_topic_as_nonexistent/1, + unset_nonexistent_topic/1, + is_nonexistent_topic/1 +]). + -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). @@ -39,6 +46,12 @@ -export_type([mqtt_config/0]). -define(AUTO_RECONNECT_S, 2). +-define(DEFAULT_FORGET_INTERVAL, timer:seconds(60)). +-define(OPTVAR_TOPIC_NOT_FOUND(INSTANCE_ID), {?MODULE, topic_not_found, INSTANCE_ID}). +-define(TOPIC_MESSAGE, + "GCP PubSub topics are invalid. Please check the logs, check if the " + "topics exist in GCP and if the service account has permissions to use them." +). %%------------------------------------------------------------------------------------------------- %% `emqx_resource' API @@ -61,21 +74,45 @@ on_start(InstanceId, Config) -> -spec on_stop(resource_id(), state()) -> ok | {error, term()}. on_stop(InstanceId, _State) -> + ?tp(gcp_pubsub_consumer_stop_enter, #{}), + unset_nonexistent_topic(InstanceId), ok = stop_consumers(InstanceId), emqx_bridge_gcp_pubsub_client:stop(InstanceId). --spec on_get_status(resource_id(), state()) -> connected | disconnected. -on_get_status(InstanceId, _State) -> - %% Note: do *not* alter the `client' value here. It must be immutable, since - %% we have handed it over to the pull workers. - case - emqx_resource_pool:health_check_workers( - InstanceId, - fun emqx_bridge_gcp_pubsub_consumer_worker:health_check/1 - ) - of - true -> connected; - false -> connecting +-spec on_get_status(resource_id(), state()) -> connected | connecting | {disconnected, state(), _}. +on_get_status(InstanceId, State) -> + %% We need to check this flag separately because the workers might be gone when we + %% check them. + case is_nonexistent_topic(InstanceId) of + true -> + {disconnected, State, {unhealthy_target, ?TOPIC_MESSAGE}}; + false -> + #{client := Client} = State, + check_workers(InstanceId, Client) + end. + +%%------------------------------------------------------------------------------------------------- +%% Health check API (signalled by consumer worker) +%%------------------------------------------------------------------------------------------------- + +-spec mark_topic_as_nonexistent(resource_id()) -> ok. +mark_topic_as_nonexistent(InstanceId) -> + optvar:set(?OPTVAR_TOPIC_NOT_FOUND(InstanceId), true), + ok. + +-spec unset_nonexistent_topic(resource_id()) -> ok. +unset_nonexistent_topic(InstanceId) -> + optvar:unset(?OPTVAR_TOPIC_NOT_FOUND(InstanceId)), + ?tp(gcp_pubsub_consumer_unset_nonexistent_topic, #{}), + ok. + +-spec is_nonexistent_topic(resource_id()) -> boolean(). +is_nonexistent_topic(InstanceId) -> + case optvar:peek(?OPTVAR_TOPIC_NOT_FOUND(InstanceId)) of + {ok, true} -> + true; + _ -> + false end. %%------------------------------------------------------------------------------------------------- @@ -87,6 +124,7 @@ start_consumers(InstanceId, Client, Config) -> bridge_name := BridgeName, consumer := ConsumerConfig0, hookpoint := Hookpoint, + resource_opts := #{request_ttl := RequestTTL}, service_account_json := #{project_id := ProjectId} } = Config, ConsumerConfig1 = maps:update_with(topic_mapping, fun convert_topic_mapping/1, ConsumerConfig0), @@ -97,22 +135,27 @@ start_consumers(InstanceId, Client, Config) -> auto_reconnect => ?AUTO_RECONNECT_S, bridge_name => BridgeName, client => Client, + forget_interval => forget_interval(RequestTTL), hookpoint => Hookpoint, instance_id => InstanceId, pool_size => PoolSize, - project_id => ProjectId + project_id => ProjectId, + pull_retry_interval => RequestTTL }, ConsumerOpts = maps:to_list(ConsumerConfig), - %% FIXME: mark as unhealthy if topics do not exist! case validate_pubsub_topics(TopicMapping, Client) of ok -> ok; - error -> + {error, not_found} -> _ = emqx_bridge_gcp_pubsub_client:stop(InstanceId), throw( - "GCP PubSub topics are invalid. Please check the logs, check if the " - "topic exists in GCP and if the service account has permissions to use them." - ) + {unhealthy_target, ?TOPIC_MESSAGE} + ); + {error, _} -> + %% connection might be down; we'll have to check topic existence during health + %% check, or the workers will kill themselves when they realized there's no + %% topic when upserting their subscription. + ok end, case emqx_resource_pool:start(InstanceId, emqx_bridge_gcp_pubsub_consumer_worker, ConsumerOpts) @@ -170,8 +213,8 @@ do_validate_pubsub_topics(Client, [Topic | Rest]) -> case check_for_topic_existence(Topic, Client) of ok -> do_validate_pubsub_topics(Client, Rest); - {error, _} -> - error + {error, _} = Err -> + Err end; do_validate_pubsub_topics(_Client, []) -> %% we already validate that the mapping is not empty in the config schema. @@ -184,9 +227,38 @@ check_for_topic_existence(Topic, Client) -> ok; {error, #{status_code := 404}} -> {error, not_found}; - {error, Details} -> - ?tp(warning, "gcp_pubsub_consumer_check_topic_error", Details), - {error, Details} + {error, Reason} -> + ?tp(warning, "gcp_pubsub_consumer_check_topic_error", #{reason => Reason}), + {error, Reason} + end. + +-spec get_client_status(emqx_bridge_gcp_pubsub_client:state()) -> connected | connecting. +get_client_status(Client) -> + case emqx_bridge_gcp_pubsub_client:get_status(Client) of + disconnected -> connecting; + connected -> connected + end. + +-spec check_workers(resource_id(), emqx_bridge_gcp_pubsub_client:state()) -> connected | connecting. +check_workers(InstanceId, Client) -> + case + emqx_resource_pool:health_check_workers( + InstanceId, + fun emqx_bridge_gcp_pubsub_consumer_worker:health_check/1, + emqx_resource_pool:health_check_timeout(), + #{return_values => true} + ) + of + {ok, Values} -> + AllOk = lists:all(fun(S) -> S =:= subscription_ok end, Values), + case AllOk of + true -> + get_client_status(Client); + false -> + connecting + end; + {error, _} -> + connecting end. log_when_error(Fun, Log) -> @@ -199,3 +271,6 @@ log_when_error(Fun, Log) -> reason => E }) end. + +forget_interval(infinity) -> ?DEFAULT_FORGET_INTERVAL; +forget_interval(Timeout) -> 2 * Timeout. diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl index d2469160b..d2ca45aba 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl @@ -103,7 +103,7 @@ on_query(ResourceId, {send_message, Selected}, State) -> {send_message, map()}, {ReplyFun :: function(), Args :: list()}, state() -) -> {ok, pid()}. +) -> {ok, pid()} | {error, no_pool_worker_available}. on_query_async(ResourceId, {send_message, Selected}, ReplyFunAndArgs, State) -> Requests = [{send_message, Selected}], ?TRACE( @@ -134,7 +134,7 @@ on_batch_query(ResourceId, Requests, State) -> [{send_message, map()}], {ReplyFun :: function(), Args :: list()}, state() -) -> {ok, pid()}. +) -> {ok, pid()} | {error, no_pool_worker_available}. on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, State) -> ?TRACE( "QUERY_ASYNC", @@ -177,7 +177,7 @@ do_send_requests_sync(State, Requests, InstanceId) -> state(), [{send_message, map()}], {ReplyFun :: function(), Args :: list()} -) -> {ok, pid()}. +) -> {ok, pid()} | {error, no_pool_worker_available}. do_send_requests_async(State, Requests, ReplyFunAndArgs0) -> #{client := Client} = State, Payloads = diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl index 849ed5325..819d14309 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl @@ -10,8 +10,6 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). --include_lib("jose/include/jose_jwt.hrl"). --include_lib("jose/include/jose_jws.hrl"). -define(BRIDGE_TYPE, gcp_pubsub_consumer). -define(BRIDGE_TYPE_BIN, <<"gcp_pubsub_consumer">>). @@ -33,9 +31,9 @@ init_per_suite(Config) -> ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), ProxyName = "gcp_emulator", + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), case emqx_common_test_helpers:is_tcp_server_available(GCPEmulatorHost, GCPEmulatorPort) of true -> - emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), ok = emqx_common_test_helpers:start_apps([emqx_conf]), ok = emqx_connector_test_helpers:start_apps([ emqx_resource, emqx_bridge, emqx_rule_engine @@ -44,7 +42,7 @@ init_per_suite(Config) -> emqx_mgmt_api_test_util:init_suite(), HostPort = GCPEmulatorHost ++ ":" ++ GCPEmulatorPortStr, true = os:putenv("PUBSUB_EMULATOR_HOST", HostPort), - Client = start_control_connector(), + Client = start_control_client(), [ {proxy_name, ProxyName}, {proxy_host, ProxyHost}, @@ -65,7 +63,7 @@ init_per_suite(Config) -> end_per_suite(Config) -> Client = ?config(client, Config), - stop_control_connector(Client), + stop_control_client(Client), emqx_mgmt_api_test_util:end_suite(), ok = emqx_common_test_helpers:stop_apps([emqx_conf]), ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_rule_engine]), @@ -73,6 +71,32 @@ end_per_suite(Config) -> os:unsetenv("PUBSUB_EMULATOR_HOST"), ok. +init_per_testcase(TestCase, Config0) when + TestCase =:= t_multiple_topic_mappings; + TestCase =:= t_topic_deleted_while_consumer_is_running +-> + UniqueNum = integer_to_binary(erlang:unique_integer()), + TopicMapping = [ + #{ + pubsub_topic => <<"pubsub-1-", UniqueNum/binary>>, + mqtt_topic => <<"mqtt/topic/1/", UniqueNum/binary>>, + qos => 2, + payload_template => <<"${.}">> + }, + #{ + pubsub_topic => <<"pubsub-2-", UniqueNum/binary>>, + mqtt_topic => <<"mqtt/topic/2/", UniqueNum/binary>>, + qos => 1, + payload_template => to_payload_template( + #{ + <<"v">> => <<"${.value}">>, + <<"a">> => <<"${.attributes.key}">> + } + ) + } + ], + Config = [{topic_mapping, TopicMapping} | Config0], + common_init_per_testcase(TestCase, Config); init_per_testcase(TestCase, Config) -> common_init_per_testcase(TestCase, Config). @@ -83,7 +107,7 @@ common_init_per_testcase(TestCase, Config0) -> ConsumerTopic = << (atom_to_binary(TestCase))/binary, - (integer_to_binary(erlang:unique_integer()))/binary + (emqx_guid:to_hexstr(emqx_guid:gen()))/binary >>, UniqueNum = integer_to_binary(erlang:unique_integer()), MQTTTopic = proplists:get_value(mqtt_topic, Config0, <<"mqtt/topic/", UniqueNum/binary>>), @@ -111,6 +135,9 @@ common_init_per_testcase(TestCase, Config0) -> ensure_topics(Config), ok = snabbkaffe:start_trace(), [ + {bridge_type, ?BRIDGE_TYPE}, + {bridge_name, Name}, + {bridge_config, ConsumerConfig}, {consumer_name, Name}, {consumer_config_string, ConfigString}, {consumer_config, ConsumerConfig} @@ -145,7 +172,6 @@ consumer_config(TestCase, Config) -> ServiceAccountJSONStr = emqx_utils_json:encode(ServiceAccountJSON), MQTTTopic = proplists:get_value(mqtt_topic, Config, <<"mqtt/topic/", UniqueNum/binary>>), MQTTQoS = proplists:get_value(mqtt_qos, Config, 0), - ConsumerWorkersPerTopic = proplists:get_value(consumer_workers_per_topic, Config, 1), DefaultTopicMapping = [ #{ pubsub_topic => ConsumerTopic, @@ -162,27 +188,27 @@ consumer_config(TestCase, Config) -> " enable = true\n" %% gcp pubsub emulator doesn't do pipelining very well... " pipelining = 1\n" - " connect_timeout = \"15s\"\n" + " connect_timeout = \"5s\"\n" " service_account_json = ~s\n" " consumer {\n" - " ack_retry_interval = \"5s\"\n" + " ack_deadline = \"60s\"\n" + " ack_retry_interval = \"1s\"\n" " pull_max_messages = 10\n" - " consumer_workers_per_topic = ~b\n" + " consumer_workers_per_topic = 1\n" %% topic mapping "~s" " }\n" " max_retries = 2\n" - " pipelining = 100\n" " pool_size = 8\n" " resource_opts {\n" " health_check_interval = \"1s\"\n" - " request_ttl = \"15s\"\n" + %% to fail and retry pulling faster + " request_ttl = \"5s\"\n" " }\n" "}\n", [ Name, ServiceAccountJSONStr, - ConsumerWorkersPerTopic, TopicMappingStr ] ), @@ -193,6 +219,7 @@ parse_and_check(ConfigString, Name) -> TypeBin = ?BRIDGE_TYPE_BIN, hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), #{<<"bridges">> := #{TypeBin := #{Name := Config}}} = RawConf, + ct:pal("config:\n ~p", [Config]), Config. topic_mapping(TopicMapping0) -> @@ -246,7 +273,7 @@ ensure_topic(Config, Topic) -> end, ok. -start_control_connector() -> +start_control_client() -> RawServiceAccount = emqx_bridge_gcp_pubsub_utils:generate_service_account_json(), ServiceAccount = emqx_utils_maps:unsafe_atom_key_map(RawServiceAccount), ConnectorConfig = @@ -261,7 +288,7 @@ start_control_connector() -> {ok, Client} = emqx_bridge_gcp_pubsub_client:start(PoolName, ConnectorConfig), Client. -stop_control_connector(Client) -> +stop_control_client(Client) -> ok = emqx_bridge_gcp_pubsub_client:stop(Client), ok. @@ -291,6 +318,30 @@ pubsub_publish(Config, Topic, Messages0) -> ), ok. +delete_topic(Config, Topic) -> + Client = ?config(client, Config), + ProjectId = ?config(project_id, Config), + Method = delete, + Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary>>, + Body = <<>>, + {ok, _} = emqx_bridge_gcp_pubsub_client:query_sync( + {prepared_request, {Method, Path, Body}}, + Client + ), + ok. + +delete_subscription(Config, SubscriptionId) -> + Client = ?config(client, Config), + ProjectId = ?config(project_id, Config), + Method = delete, + Path = <<"/v1/projects/", ProjectId/binary, "/subscriptions/", SubscriptionId/binary>>, + Body = <<>>, + {ok, _} = emqx_bridge_gcp_pubsub_client:query_sync( + {prepared_request, {Method, Path, Body}}, + Client + ), + ok. + create_bridge(Config) -> create_bridge(Config, _Overrides = #{}). @@ -301,6 +352,11 @@ create_bridge(Config, Overrides) -> BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides), emqx_bridge:create(Type, Name, BridgeConfig). +remove_bridge(Config) -> + Type = ?BRIDGE_TYPE_BIN, + Name = ?config(consumer_name, Config), + emqx_bridge:remove(Type, Name). + create_bridge_api(Config) -> create_bridge_api(Config, _Overrides = #{}). @@ -359,6 +415,11 @@ resource_id(Config) -> Name = ?config(consumer_name, Config), emqx_bridge_resource:resource_id(Type, Name). +bridge_id(Config) -> + Type = ?BRIDGE_TYPE_BIN, + Name = ?config(consumer_name, Config), + emqx_bridge_resource:bridge_id(Type, Name). + receive_published() -> receive_published(#{}). @@ -436,28 +497,261 @@ assert_non_received_metrics(BridgeName) -> ?assert(lists:all(fun(V) -> V == 0 end, maps:values(Gauges)), #{metrics => Metrics}), ok. +to_payload_template(Map) -> + PayloadTemplate0 = emqx_utils_json:encode(Map), + PayloadTemplate1 = io_lib:format("~p", [binary_to_list(PayloadTemplate0)]), + string:strip(lists:flatten(PayloadTemplate1), both, $"). + +wait_acked(Opts) -> + N = maps:get(n, Opts), + Timeout = maps:get(timeout, Opts, 30_000), + %% no need to check return value; we check the property in + %% the check phase. this is just to give it a chance to do + %% so and avoid flakiness. should be fast. + snabbkaffe:block_until( + ?match_n_events(N, #{?snk_kind := gcp_pubsub_consumer_worker_acknowledged}), + Timeout + ), + ok. + +wait_forgotten() -> + wait_forgotten(_Opts = #{}). + +wait_forgotten(Opts0) -> + Timeout = maps:get(timeout, Opts0, 15_000), + %% no need to check return value; we check the property in + %% the check phase. this is just to give it a chance to do + %% so and avoid flakiness. + ?block_until( + #{?snk_kind := gcp_pubsub_consumer_worker_message_ids_forgotten}, + Timeout + ), + ok. + +get_pull_worker_pids(Config) -> + ResourceId = resource_id(Config), + Pids = + [ + PullWorkerPid + || {_WorkerName, PoolWorkerPid} <- ecpool:workers(ResourceId), + {ok, PullWorkerPid} <- [ecpool_worker:client(PoolWorkerPid)] + ], + %% assert + [_ | _] = Pids, + Pids. + +get_async_worker_pids(Config) -> + ResourceId = resource_id(Config), + Pids = + [ + AsyncWorkerPid + || {_WorkerName, AsyncWorkerPid} <- gproc_pool:active_workers(ehttpc:name(ResourceId)) + ], + %% assert + [_ | _] = Pids, + Pids. + +projection_optional_span(Trace) -> + [ + case maps:get(?snk_span, Evt, undefined) of + undefined -> + K; + start -> + {K, start}; + {complete, _} -> + {K, complete} + end + || #{?snk_kind := K} = Evt <- Trace + ]. + +cluster(Config) -> + PrivDataDir = ?config(priv_dir, Config), + Cluster = emqx_common_test_helpers:emqx_cluster( + [core, core], + [ + {apps, [emqx_conf, emqx_rule_engine, emqx_bridge]}, + {listener_ports, []}, + {peer_mod, slave}, + {priv_data_dir, PrivDataDir}, + {load_schema, true}, + {start_autocluster, true}, + {schema_mod, emqx_enterprise_schema}, + {env_handler, fun + (emqx) -> + application:set_env(emqx, boot_modules, [broker, router]), + ok; + (emqx_conf) -> + ok; + (_) -> + ok + end} + ] + ), + ct:pal("cluster: ~p", [Cluster]), + Cluster. + +start_cluster(Cluster) -> + Nodes = lists:map( + fun({Name, Opts}) -> + ct:pal("starting ~p", [Name]), + emqx_common_test_helpers:start_slave(Name, Opts) + end, + Cluster + ), + on_exit(fun() -> + emqx_utils:pmap( + fun(N) -> + ct:pal("stopping ~p", [N]), + emqx_common_test_helpers:stop_slave(N) + end, + Nodes + ) + end), + Nodes. + +wait_for_cluster_rpc(Node) -> + %% need to wait until the config handler is ready after + %% restarting during the cluster join. + ?retry( + _Sleep0 = 100, + _Attempts0 = 50, + true = is_pid(erpc:call(Node, erlang, whereis, [emqx_config_handler])) + ). + +setup_and_start_listeners(Node, NodeOpts) -> + erpc:call( + Node, + fun() -> + lists:foreach( + fun(Type) -> + Port = emqx_common_test_helpers:listener_port(NodeOpts, Type), + ok = emqx_config:put( + [listeners, Type, default, bind], + {{127, 0, 0, 1}, Port} + ), + ok = emqx_config:put_raw( + [listeners, Type, default, bind], + iolist_to_binary([<<"127.0.0.1:">>, integer_to_binary(Port)]) + ), + ok + end, + [tcp, ssl, ws, wss] + ), + ok = emqx_listeners:start(), + ok + end + ). + %%------------------------------------------------------------------------------ %% Trace properties %%------------------------------------------------------------------------------ +prop_pulled_only_once() -> + {"all pulled message ids are unique", fun ?MODULE:prop_pulled_only_once/1}. prop_pulled_only_once(Trace) -> - PulledIds = ?projection( - message_id, ?of_kind("gcp_pubsub_consumer_worker_handle_message", Trace) - ), + PulledIds = + [ + MsgId + || #{messages := Msgs} <- ?of_kind(gcp_pubsub_consumer_worker_decoded_messages, Trace), + #{<<"message">> := #{<<"messageId">> := MsgId}} <- Msgs + ], NumPulled = length(PulledIds), - UniqueNumPulled = sets:size(sets:from_list(PulledIds, [{version, 2}])), - ?assertEqual(UniqueNumPulled, NumPulled), + UniquePulledIds = sets:from_list(PulledIds, [{version, 2}]), + UniqueNumPulled = sets:size(UniquePulledIds), + ?assertEqual(UniqueNumPulled, NumPulled, #{pulled_ids => PulledIds}), ok. +prop_handled_only_once() -> + {"all pulled message are processed only once", fun ?MODULE:prop_handled_only_once/1}. +prop_handled_only_once(Trace) -> + HandledIds = + [ + MsgId + || #{?snk_span := start, message_id := MsgId} <- + ?of_kind("gcp_pubsub_consumer_worker_handle_message", Trace) + ], + UniqueHandledIds = lists:usort(HandledIds), + NumHandled = length(HandledIds), + NumUniqueHandled = length(UniqueHandledIds), + ?assertEqual(NumHandled, NumUniqueHandled, #{handled_ids => HandledIds}), + ok. + +prop_all_pulled_are_acked() -> + {"all pulled msg ids are acked", fun ?MODULE:prop_all_pulled_are_acked/1}. prop_all_pulled_are_acked(Trace) -> - PulledAckIds = ?projection( - ack_id, ?of_kind("gcp_pubsub_consumer_worker_handle_message", Trace) - ), - AckedIds0 = ?projection(ack_ids, ?of_kind(gcp_pubsub_consumer_worker_acknowledged, Trace)), - AckedIds = lists:flatten(AckedIds0), + PulledMsgIds = + [ + MsgId + || #{messages := Msgs} <- ?of_kind(gcp_pubsub_consumer_worker_decoded_messages, Trace), + #{<<"message">> := #{<<"messageId">> := MsgId}} <- Msgs + ], + AckedMsgIds0 = ?projection(acks, ?of_kind(gcp_pubsub_consumer_worker_acknowledged, Trace)), + AckedMsgIds1 = [ + MsgId + || PendingAcks <- AckedMsgIds0, {MsgId, _AckId} <- maps:to_list(PendingAcks) + ], + AckedMsgIds = sets:from_list(AckedMsgIds1, [{version, 2}]), ?assertEqual( - sets:from_list(PulledAckIds, [{version, 2}]), - sets:from_list(AckedIds, [{version, 2}]) + sets:from_list(PulledMsgIds, [{version, 2}]), + AckedMsgIds, + #{ + decoded_msgs => ?of_kind(gcp_pubsub_consumer_worker_decoded_messages, Trace), + acknlowledged => ?of_kind(gcp_pubsub_consumer_worker_acknowledged, Trace) + } + ), + ok. + +prop_client_stopped() -> + {"client is stopped", fun ?MODULE:prop_client_stopped/1}. +prop_client_stopped(Trace) -> + ?assert( + ?strict_causality( + #{?snk_kind := gcp_pubsub_ehttpc_pool_started, pool_name := _P1}, + #{?snk_kind := gcp_pubsub_stop, resource_id := _P2}, + _P1 =:= _P2, + Trace + ) + ), + ok. + +prop_workers_stopped(Topic) -> + {"workers are stopped", fun(Trace) -> ?MODULE:prop_workers_stopped(Trace, Topic) end}. +prop_workers_stopped(Trace0, Topic) -> + %% no assert because they might not start in the first place + Trace = [Event || Event = #{topic := T} <- Trace0, T =:= Topic], + ?strict_causality( + #{?snk_kind := gcp_pubsub_consumer_worker_init, ?snk_meta := #{pid := _P1}}, + #{?snk_kind := gcp_pubsub_consumer_worker_terminate, ?snk_meta := #{pid := _P2}}, + _P1 =:= _P2, + Trace + ), + ok. + +prop_acked_ids_eventually_forgotten() -> + {"all acked message ids are eventually forgotten", + fun ?MODULE:prop_acked_ids_eventually_forgotten/1}. +prop_acked_ids_eventually_forgotten(Trace) -> + AckedMsgIds0 = + [ + MsgId + || #{acks := PendingAcks} <- ?of_kind(gcp_pubsub_consumer_worker_acknowledged, Trace), + {MsgId, _AckId} <- maps:to_list(PendingAcks) + ], + AckedMsgIds = sets:from_list(AckedMsgIds0, [{version, 2}]), + ForgottenMsgIds = sets:union( + ?projection( + message_ids, + ?of_kind(gcp_pubsub_consumer_worker_message_ids_forgotten, Trace) + ) + ), + EmptySet = sets:new([{version, 2}]), + ?assertEqual( + EmptySet, + sets:subtract(AckedMsgIds, ForgottenMsgIds), + #{ + forgotten => ForgottenMsgIds, + acked => AckedMsgIds + } ), ok. @@ -465,6 +759,34 @@ prop_all_pulled_are_acked(Trace) -> %% Testcases %%------------------------------------------------------------------------------ +t_start_stop(Config) -> + ResourceId = resource_id(Config), + [#{pubsub_topic := PubSubTopic} | _] = ?config(topic_mapping, Config), + ?check_trace( + begin + {ok, SRef0} = + snabbkaffe:subscribe( + ?match_event(#{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"}), + 40_000 + ), + ?assertMatch({ok, _}, create_bridge(Config)), + {ok, _} = snabbkaffe:receive_events(SRef0), + ?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)), + + ?assertMatch({ok, _}, remove_bridge(Config)), + ok + end, + [ + prop_client_stopped(), + prop_workers_stopped(PubSubTopic), + fun(Trace) -> + ?assertMatch([_], ?of_kind(gcp_pubsub_consumer_unset_nonexistent_topic, Trace)), + ok + end + ] + ), + ok. + t_consume_ok(Config) -> BridgeName = ?config(consumer_name, Config), TopicMapping = ?config(topic_mapping, Config), @@ -516,10 +838,7 @@ t_consume_ok(Config) -> ] when is_binary(MsgId) andalso is_binary(PubTime), Published0 ), - %% no need to check return value; we check the property in - %% the check phase. this is just to give it a chance to do - %% so and avoid flakiness. should be fast. - ?block_until(#{?snk_kind := gcp_pubsub_consumer_worker_acknowledged}, 1_000), + wait_acked(#{n => 1}), ?retry( _Interval = 200, _NAttempts = 20, @@ -573,7 +892,9 @@ t_consume_ok(Config) -> %% the check phase. this is just to give it a chance to do %% so and avoid flakiness. should be fast. ?block_until( - #{?snk_kind := gcp_pubsub_consumer_worker_acknowledged, ack_ids := [_, _]}, 1_000 + #{?snk_kind := gcp_pubsub_consumer_worker_acknowledged, acks := Acks} when + map_size(Acks) =:= 2, + 5_000 ), ?retry( _Interval = 200, @@ -593,12 +914,19 @@ t_consume_ok(Config) -> %% ?assertEqual(AtomsBefore, AtomsAfter), assert_non_received_metrics(BridgeName), + ?block_until( + #{?snk_kind := gcp_pubsub_consumer_worker_message_ids_forgotten, message_ids := Ids} when + map_size(Ids) =:= 2, + 30_000 + ), ok end, [ - {"all pulled ack ids are acked", fun ?MODULE:prop_all_pulled_are_acked/1}, - {"all pulled message ids are unique", fun ?MODULE:prop_pulled_only_once/1} + prop_all_pulled_are_acked(), + prop_pulled_only_once(), + prop_handled_only_once(), + prop_acked_ids_eventually_forgotten() ] ), ok. @@ -668,22 +996,1048 @@ t_bridge_rule_action_source(Config) -> #{payload => Payload0} end, - [{"all pulled message ids are unique", fun ?MODULE:prop_pulled_only_once/1}] + [ + prop_pulled_only_once(), + prop_handled_only_once() + ] ), ok. -%% TODO TEST: -%% * multi-topic mapping -%% * get status -%% * 2+ pull workers do not duplicate delivered messages -%% * inexistent topic -%% * connection cut then restored -%% * pull worker death -%% * async worker death mid-pull -%% * ensure subscription creation error -%% * cluster subscription -%% * connection down during pull -%% * connection down during ack -%% * topic deleted while consumer is running -%% * subscription deleted while consumer is running -%% * ensure client is terminated when bridge stops +t_on_get_status(Config) -> + emqx_bridge_testlib:t_on_get_status(Config, #{failure_status => connecting}), + ok. + +t_create_via_http_api(_Config) -> + ct:comment("FIXME: implement after API specs are un-hidden in e5.2.0..."), + ok. + +t_multiple_topic_mappings(Config) -> + BridgeName = ?config(consumer_name, Config), + TopicMapping = ?config(topic_mapping, Config), + ResourceId = resource_id(Config), + ?check_trace( + begin + start_and_subscribe_mqtt(Config), + ?assertMatch( + {{ok, _}, {ok, _}}, + ?wait_async_action( + create_bridge(Config), + #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"}, + 40_000 + ) + ), + [ + #{ + pubsub_topic := Topic0, + mqtt_topic := MQTTTopic0, + qos := QoS0 + }, + #{ + pubsub_topic := Topic1, + mqtt_topic := MQTTTopic1, + qos := QoS1 + } + ] = TopicMapping, + Payload = emqx_guid:to_hexstr(emqx_guid:gen()), + Messages = [ + #{ + <<"data">> => Payload, + <<"attributes">> => Attributes = #{<<"key">> => <<"value">>}, + <<"orderingKey">> => <<"some_ordering_key">> + } + ], + pubsub_publish(Config, Topic0, Messages), + pubsub_publish(Config, Topic1, Messages), + {ok, Published0} = receive_published(#{n => 2}), + Published = + lists:sort( + fun(#{topic := TA}, #{topic := TB}) -> + TA =< TB + end, + Published0 + ), + ?assertMatch( + [ + #{ + qos := QoS0, + topic := MQTTTopic0, + payload := + #{ + <<"attributes">> := Attributes, + <<"message_id">> := _, + <<"ordering_key">> := <<"some_ordering_key">>, + <<"publish_time">> := _, + <<"topic">> := _Topic, + <<"value">> := Payload + } + }, + #{ + qos := QoS1, + topic := MQTTTopic1, + payload := #{ + <<"v">> := Payload, + <<"a">> := <<"value">> + } + } + ], + Published + ), + wait_acked(#{n => 2}), + ?retry( + _Interval = 200, + _NAttempts = 20, + ?assertEqual(2, emqx_resource_metrics:received_get(ResourceId)) + ), + + assert_non_received_metrics(BridgeName), + + ok + end, + [ + prop_all_pulled_are_acked(), + prop_pulled_only_once(), + prop_handled_only_once() + ] + ), + ok. + +%% 2+ pull workers do not duplicate delivered messages +t_multiple_pull_workers(Config) -> + ct:timetrap({seconds, 120}), + BridgeName = ?config(consumer_name, Config), + TopicMapping = ?config(topic_mapping, Config), + ResourceId = resource_id(Config), + ?check_trace( + begin + NConsumers = 3, + start_and_subscribe_mqtt(Config), + {ok, SRef0} = + snabbkaffe:subscribe( + ?match_event(#{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"}), + NConsumers, + 40_000 + ), + {ok, _} = create_bridge( + Config, + #{ + <<"consumer">> => #{ + %% reduce flakiness + <<"ack_deadline">> => <<"10m">>, + <<"consumer_workers_per_topic">> => NConsumers + } + } + ), + {ok, _} = snabbkaffe:receive_events(SRef0), + [#{pubsub_topic := Topic}] = TopicMapping, + Payload = emqx_guid:to_hexstr(emqx_guid:gen()), + Messages = [#{<<"data">> => Payload}], + pubsub_publish(Config, Topic, Messages), + {ok, Published} = receive_published(), + ?assertMatch( + [#{payload := #{<<"value">> := Payload}}], + Published + ), + ?retry( + _Interval = 200, + _NAttempts = 20, + ?assertEqual(1, emqx_resource_metrics:received_get(ResourceId)) + ), + + assert_non_received_metrics(BridgeName), + + wait_acked(#{n => 1, timeout => 90_000}), + + ok + end, + [ + prop_all_pulled_are_acked(), + prop_pulled_only_once(), + prop_handled_only_once(), + {"message is processed only once", fun(Trace) -> + ?assertMatch({timeout, _}, receive_published(#{timeout => 5_000})), + ?assertMatch( + [#{?snk_span := start}, #{?snk_span := {complete, _}}], + ?of_kind("gcp_pubsub_consumer_worker_handle_message", Trace) + ), + ok + end} + ] + ), + ok. + +t_nonexistent_topic(Config) -> + BridgeName = ?config(bridge_name, Config), + [Mapping0] = ?config(topic_mapping, Config), + ResourceId = resource_id(Config), + PubSubTopic = <<"nonexistent-", (emqx_guid:to_hexstr(emqx_guid:gen()))/binary>>, + TopicMapping0 = [Mapping0#{pubsub_topic := PubSubTopic}], + TopicMapping = emqx_utils_maps:binary_key_map(TopicMapping0), + ?check_trace( + begin + {ok, _} = + create_bridge( + Config, + #{ + <<"consumer">> => + #{<<"topic_mapping">> => TopicMapping} + } + ), + ?assertMatch( + {ok, disconnected}, + emqx_resource_manager:health_check(ResourceId) + ), + ?assertMatch( + {ok, _Group, #{error := "GCP PubSub topics are invalid" ++ _}}, + emqx_resource_manager:lookup_cached(ResourceId) + ), + %% now create the topic and restart the bridge + ensure_topic(Config, PubSubTopic), + ?assertMatch( + ok, + emqx_bridge_resource:restart(?BRIDGE_TYPE, BridgeName) + ), + ?retry( + _Interval0 = 200, + _NAttempts0 = 20, + ?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + ?assertMatch( + {ok, _Group, #{error := undefined}}, + emqx_resource_manager:lookup_cached(ResourceId) + ), + ok + end, + [ + fun(Trace) -> + %% client is stopped after first failure + ?assertMatch([_], ?of_kind(gcp_pubsub_stop, Trace)), + ok + end + ] + ), + ok. + +t_topic_deleted_while_consumer_is_running(Config) -> + TopicMapping = [#{pubsub_topic := PubSubTopic} | _] = ?config(topic_mapping, Config), + NTopics = length(TopicMapping), + ResourceId = resource_id(Config), + ?check_trace( + begin + {ok, SRef0} = + snabbkaffe:subscribe( + ?match_event(#{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"}), + NTopics, + 40_000 + ), + {ok, _} = create_bridge(Config), + {ok, _} = snabbkaffe:receive_events(SRef0), + + ?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)), + + %% curiously, gcp pubsub doesn't seem to return any errors from the + %% subscription if the topic is deleted while the subscription still exists... + {ok, SRef1} = + snabbkaffe:subscribe( + ?match_event(#{ + ?snk_kind := gcp_pubsub_consumer_worker_pull_async, + ?snk_span := start + }), + 2, + 40_000 + ), + delete_topic(Config, PubSubTopic), + {ok, _} = snabbkaffe:receive_events(SRef1), + + ok + end, + [] + ), + ok. + +t_connection_down_before_starting(Config) -> + ProxyName = ?config(proxy_name, Config), + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + ResourceId = resource_id(Config), + ?check_trace( + begin + emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> + ?assertMatch( + {{ok, _}, {ok, _}}, + ?wait_async_action( + create_bridge(Config), + #{?snk_kind := gcp_pubsub_consumer_worker_init}, + 10_000 + ) + ), + ?assertMatch({ok, connecting}, emqx_resource_manager:health_check(ResourceId)), + ok + end), + ?retry( + _Interval0 = 200, + _NAttempts0 = 20, + ?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + ok + end, + [] + ), + ok. + +t_connection_timeout_before_starting(Config) -> + ProxyName = ?config(proxy_name, Config), + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + ResourceId = resource_id(Config), + ?check_trace( + begin + emqx_common_test_helpers:with_failure( + timeout, ProxyName, ProxyHost, ProxyPort, fun() -> + ?assertMatch( + {{ok, _}, {ok, _}}, + ?wait_async_action( + create_bridge(Config), + #{?snk_kind := gcp_pubsub_consumer_worker_init}, + 10_000 + ) + ), + ?assertMatch({ok, connecting}, emqx_resource_manager:health_check(ResourceId)), + ok + end + ), + ?retry( + _Interval0 = 200, + _NAttempts0 = 20, + ?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + ok + end, + [] + ), + ok. + +t_pull_worker_death(Config) -> + ResourceId = resource_id(Config), + ?check_trace( + begin + ?assertMatch( + {{ok, _}, {ok, _}}, + ?wait_async_action( + create_bridge(Config), + #{?snk_kind := gcp_pubsub_consumer_worker_init}, + 10_000 + ) + ), + + [PullWorkerPid | _] = get_pull_worker_pids(Config), + Ref = monitor(process, PullWorkerPid), + sys:terminate(PullWorkerPid, die), + receive + {'DOWN', Ref, process, PullWorkerPid, _} -> + ok + after 500 -> ct:fail("pull worker didn't die") + end, + ?assertMatch({ok, connecting}, emqx_resource_manager:health_check(ResourceId)), + + %% recovery + ?retry( + _Interval0 = 200, + _NAttempts0 = 20, + ?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + + ok + end, + [] + ), + ok. + +t_async_worker_death_mid_pull(Config) -> + ct:timetrap({seconds, 120}), + [#{pubsub_topic := PubSubTopic}] = ?config(topic_mapping, Config), + Payload = emqx_guid:to_hexstr(emqx_guid:gen()), + ?check_trace( + begin + start_and_subscribe_mqtt(Config), + + ?force_ordering( + #{ + ?snk_kind := gcp_pubsub_consumer_worker_pull_async, + ?snk_span := {complete, _} + }, + #{?snk_kind := kill_async_worker, ?snk_span := start} + ), + ?force_ordering( + #{?snk_kind := kill_async_worker, ?snk_span := {complete, _}}, + #{?snk_kind := gcp_pubsub_consumer_worker_reply_delegator} + ), + spawn_link(fun() -> + ?tp_span( + kill_async_worker, + #{}, + begin + %% produce a message while worker is being killed + Messages = [#{<<"data">> => Payload}], + pubsub_publish(Config, PubSubTopic, Messages), + + AsyncWorkerPids = get_async_worker_pids(Config), + emqx_utils:pmap( + fun(AsyncWorkerPid) -> + Ref = monitor(process, AsyncWorkerPid), + sys:terminate(AsyncWorkerPid, die), + receive + {'DOWN', Ref, process, AsyncWorkerPid, _} -> + ok + after 500 -> ct:fail("async worker didn't die") + end, + ok + end, + AsyncWorkerPids + ), + + ok + end + ) + end), + + ?assertMatch( + {{ok, _}, {ok, _}}, + ?wait_async_action( + create_bridge( + Config, + #{<<"pool_size">> => 1} + ), + #{?snk_kind := gcp_pubsub_consumer_worker_init}, + 10_000 + ) + ), + + {ok, _} = + ?block_until( + #{?snk_kind := gcp_pubsub_consumer_worker_handled_async_worker_down}, + 30_000 + ), + + %% check that we eventually received the message. + %% for some reason, this can take forever in ci... + {ok, Published} = receive_published(#{timeout => 60_000}), + ?assertMatch([#{payload := #{<<"value">> := Payload}}], Published), + + ok + end, + [ + prop_handled_only_once(), + fun(Trace) -> + %% expected order of events; reply delegator called only once + SubTrace = ?of_kind( + [ + gcp_pubsub_consumer_worker_handled_async_worker_down, + gcp_pubsub_consumer_worker_pull_response_received, + gcp_pubsub_consumer_worker_reply_delegator + ], + Trace + ), + ?assertMatch( + [ + #{?snk_kind := gcp_pubsub_consumer_worker_handled_async_worker_down}, + #{?snk_kind := gcp_pubsub_consumer_worker_reply_delegator} + | _ + ], + SubTrace, + #{sub_trace => projection_optional_span(SubTrace)} + ), + ?assertMatch( + #{?snk_kind := gcp_pubsub_consumer_worker_pull_response_received}, + lists:last(SubTrace) + ), + ok + end + ] + ), + ok. + +t_connection_error_while_creating_subscription(Config) -> + ProxyName = ?config(proxy_name, Config), + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + ?check_trace( + begin + emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> + %% check retries + {ok, SRef0} = + snabbkaffe:subscribe( + ?match_event(#{?snk_kind := "gcp_pubsub_consumer_worker_subscription_error"}), + _NEvents0 = 2, + 10_000 + ), + {ok, _} = create_bridge(Config), + {ok, _} = snabbkaffe:receive_events(SRef0), + ok + end), + %% eventually succeeds + {ok, _} = + ?block_until( + #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_created"}, + 10_000 + ), + ok + end, + [] + ), + ok. + +t_subscription_already_exists(Config) -> + BridgeName = ?config(bridge_name, Config), + ?check_trace( + begin + {{ok, _}, {ok, _}} = + ?wait_async_action( + create_bridge(Config), + #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_created"}, + 10_000 + ), + %% now restart the same bridge + {ok, _} = emqx_bridge:disable_enable(disable, ?BRIDGE_TYPE, BridgeName), + + {{ok, _}, {ok, _}} = + ?wait_async_action( + emqx_bridge:disable_enable(enable, ?BRIDGE_TYPE, BridgeName), + #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"}, + 10_000 + ), + + ok + end, + fun(Trace) -> + ?assertMatch( + [ + "gcp_pubsub_consumer_worker_subscription_already_exists", + "gcp_pubsub_consumer_worker_subscription_patched" + ], + ?projection( + ?snk_kind, + ?of_kind( + [ + "gcp_pubsub_consumer_worker_subscription_already_exists", + "gcp_pubsub_consumer_worker_subscription_patched" + ], + Trace + ) + ) + ), + ok + end + ), + ok. + +t_subscription_patch_error(Config) -> + BridgeName = ?config(bridge_name, Config), + ProxyName = ?config(proxy_name, Config), + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + ?check_trace( + begin + {{ok, _}, {ok, _}} = + ?wait_async_action( + create_bridge(Config), + #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_created"}, + 10_000 + ), + %% now restart the same bridge + {ok, _} = emqx_bridge:disable_enable(disable, ?BRIDGE_TYPE, BridgeName), + + ?force_ordering( + #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_already_exists"}, + #{?snk_kind := cut_connection, ?snk_span := start} + ), + ?force_ordering( + #{?snk_kind := cut_connection, ?snk_span := {complete, _}}, + #{?snk_kind := gcp_pubsub_consumer_worker_patch_subscription_enter} + ), + spawn_link(fun() -> + ?tp_span( + cut_connection, + #{}, + emqx_common_test_helpers:enable_failure(down, ProxyName, ProxyHost, ProxyPort) + ) + end), + + {{ok, _}, {ok, _}} = + ?wait_async_action( + emqx_bridge:disable_enable(enable, ?BRIDGE_TYPE, BridgeName), + #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_patch_error"}, + 10_000 + ), + + {{ok, _}, {ok, _}} = + ?wait_async_action( + emqx_common_test_helpers:heal_failure(down, ProxyName, ProxyHost, ProxyPort), + #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"}, + 10_000 + ), + + ok + end, + [] + ), + ok. + +t_topic_deleted_while_creating_subscription(Config) -> + [#{pubsub_topic := PubSubTopic}] = ?config(topic_mapping, Config), + ResourceId = resource_id(Config), + ?check_trace( + begin + ?force_ordering( + #{?snk_kind := gcp_pubsub_consumer_worker_init}, + #{?snk_kind := delete_topic, ?snk_span := start} + ), + ?force_ordering( + #{?snk_kind := delete_topic, ?snk_span := {complete, _}}, + #{?snk_kind := gcp_pubsub_consumer_worker_create_subscription_enter} + ), + spawn_link(fun() -> + ?tp_span( + delete_topic, + #{}, + delete_topic(Config, PubSubTopic) + ) + end), + {{ok, _}, {ok, _}} = + ?wait_async_action( + create_bridge(Config), + #{?snk_kind := gcp_pubsub_consumer_worker_terminate}, + 10_000 + ), + ?assertMatch({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)), + ok + end, + [] + ), + ok. + +t_topic_deleted_while_patching_subscription(Config) -> + BridgeName = ?config(bridge_name, Config), + [#{pubsub_topic := PubSubTopic}] = ?config(topic_mapping, Config), + ResourceId = resource_id(Config), + ?check_trace( + begin + {{ok, _}, {ok, _}} = + ?wait_async_action( + create_bridge(Config), + #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_created"}, + 10_000 + ), + %% now restart the same bridge + {ok, _} = emqx_bridge:disable_enable(disable, ?BRIDGE_TYPE, BridgeName), + + ?force_ordering( + #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_already_exists"}, + #{?snk_kind := delete_topic, ?snk_span := start} + ), + ?force_ordering( + #{?snk_kind := delete_topic, ?snk_span := {complete, _}}, + #{?snk_kind := gcp_pubsub_consumer_worker_patch_subscription_enter} + ), + spawn_link(fun() -> + ?tp_span( + delete_topic, + #{}, + delete_topic(Config, PubSubTopic) + ) + end), + %% as with deleting the topic of an existing subscription, patching after the + %% topic does not exist anymore doesn't return errors either... + {{ok, _}, {ok, _}} = + ?wait_async_action( + emqx_bridge:disable_enable(enable, ?BRIDGE_TYPE, BridgeName), + #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"}, + 10_000 + ), + ?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)), + ok + end, + [] + ), + ok. + +t_subscription_deleted_while_consumer_is_running(Config) -> + ResourceId = resource_id(Config), + ?check_trace( + begin + {{ok, _}, {ok, #{subscription_id := SubscriptionId}}} = + ?wait_async_action( + create_bridge(Config), + #{ + ?snk_kind := gcp_pubsub_consumer_worker_pull_async, + ?snk_span := {complete, _} + }, + 10_000 + ), + {ok, SRef0} = + snabbkaffe:subscribe( + ?match_event( + #{?snk_kind := "gcp_pubsub_consumer_worker_pull_error"} + ), + 30_000 + ), + {ok, SRef1} = + snabbkaffe:subscribe( + ?match_event( + #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"} + ), + 30_000 + ), + delete_subscription(Config, SubscriptionId), + {ok, _} = snabbkaffe:receive_events(SRef0), + {ok, _} = snabbkaffe:receive_events(SRef1), + + ?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)), + ok + end, + fun(Trace0) -> + SubTrace0 = ?of_kind( + [ + "gcp_pubsub_consumer_worker_pull_error", + "gcp_pubsub_consumer_worker_subscription_ready", + gcp_pubsub_consumer_worker_create_subscription_enter + ], + Trace0 + ), + SubTrace = projection_optional_span(SubTrace0), + ?assertMatch( + [ + gcp_pubsub_consumer_worker_create_subscription_enter, + "gcp_pubsub_consumer_worker_subscription_ready", + "gcp_pubsub_consumer_worker_pull_error", + gcp_pubsub_consumer_worker_create_subscription_enter + | _ + ], + SubTrace + ), + ?assertEqual( + "gcp_pubsub_consumer_worker_subscription_ready", + lists:last(SubTrace), + #{sub_trace => SubTrace} + ), + ok + end + ), + ok. + +t_subscription_and_topic_deleted_while_consumer_is_running(Config) -> + ct:timetrap({seconds, 90}), + [#{pubsub_topic := PubSubTopic}] = ?config(topic_mapping, Config), + ResourceId = resource_id(Config), + ?check_trace( + begin + {{ok, _}, {ok, #{subscription_id := SubscriptionId}}} = + ?wait_async_action( + create_bridge(Config), + #{ + ?snk_kind := gcp_pubsub_consumer_worker_pull_async, + ?snk_span := {complete, _} + }, + 10_000 + ), + delete_topic(Config, PubSubTopic), + delete_subscription(Config, SubscriptionId), + {ok, _} = ?block_until(#{?snk_kind := gcp_pubsub_consumer_worker_terminate}, 60_000), + + ?assertMatch({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)), + ok + end, + [] + ), + ok. + +t_connection_down_during_ack(Config) -> + ProxyName = ?config(proxy_name, Config), + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + [#{pubsub_topic := PubSubTopic}] = ?config(topic_mapping, Config), + ?check_trace( + begin + start_and_subscribe_mqtt(Config), + + {{ok, _}, {ok, _}} = + ?wait_async_action( + create_bridge(Config), + #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"}, + 10_000 + ), + + ?force_ordering( + #{ + ?snk_kind := "gcp_pubsub_consumer_worker_handle_message", + ?snk_span := {complete, _} + }, + #{?snk_kind := cut_connection, ?snk_span := start} + ), + ?force_ordering( + #{?snk_kind := cut_connection, ?snk_span := {complete, _}}, + #{?snk_kind := gcp_pubsub_consumer_worker_acknowledge_enter} + ), + spawn_link(fun() -> + ?tp_span( + cut_connection, + #{}, + emqx_common_test_helpers:enable_failure(down, ProxyName, ProxyHost, ProxyPort) + ) + end), + + Payload = emqx_guid:to_hexstr(emqx_guid:gen()), + Messages = [#{<<"data">> => Payload}], + pubsub_publish(Config, PubSubTopic, Messages), + {ok, _} = ?block_until(#{?snk_kind := "gcp_pubsub_consumer_worker_ack_error"}, 10_000), + + {{ok, _}, {ok, _}} = + ?wait_async_action( + emqx_common_test_helpers:heal_failure(down, ProxyName, ProxyHost, ProxyPort), + #{?snk_kind := gcp_pubsub_consumer_worker_acknowledged}, + 30_000 + ), + + {ok, _Published} = receive_published(), + + ok + end, + [ + prop_all_pulled_are_acked(), + prop_pulled_only_once(), + prop_handled_only_once(), + {"message is processed only once", fun(Trace) -> + ?assertMatch({timeout, _}, receive_published(#{timeout => 5_000})), + ?assertMatch( + [#{?snk_span := start}, #{?snk_span := {complete, _}}], + ?of_kind("gcp_pubsub_consumer_worker_handle_message", Trace) + ), + ok + end} + ] + ), + ok. + +t_connection_down_during_ack_redeliver(Config) -> + ct:timetrap({seconds, 120}), + [#{pubsub_topic := PubSubTopic}] = ?config(topic_mapping, Config), + ?check_trace( + begin + start_and_subscribe_mqtt(Config), + + {{ok, _}, {ok, _}} = + ?wait_async_action( + create_bridge( + Config, + #{<<"consumer">> => #{<<"ack_deadline">> => <<"10s">>}} + ), + #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"}, + 10_000 + ), + + emqx_common_test_helpers:with_mock( + emqx_bridge_gcp_pubsub_client, + query_sync, + fun(PreparedRequest = {prepared_request, {_Method, Path, _Body}}, Client) -> + case re:run(Path, <<":acknowledge$">>) of + {match, _} -> + ct:sleep(800), + {error, timeout}; + nomatch -> + meck:passthrough([PreparedRequest, Client]) + end + end, + fun() -> + Payload = emqx_guid:to_hexstr(emqx_guid:gen()), + Messages = [#{<<"data">> => Payload}], + pubsub_publish(Config, PubSubTopic, Messages), + {ok, _} = snabbkaffe:block_until( + ?match_n_events(2, #{?snk_kind := "gcp_pubsub_consumer_worker_ack_error"}), + 20_000 + ), + %% The minimum deadline pubsub does is 10 s. + {ok, _} = ?block_until(#{?snk_kind := message_redelivered}, 30_000), + ok + end + ), + + {ok, Published} = receive_published(), + ct:pal("received: ~p", [Published]), + + wait_forgotten(#{timeout => 60_000}), + + %% should be processed only once + Res = receive_published(#{timeout => 5_000}), + + Res + end, + [ + prop_acked_ids_eventually_forgotten(), + prop_all_pulled_are_acked(), + prop_handled_only_once(), + {"message is processed only once", fun(Res, Trace) -> + ?assertMatch({timeout, _}, Res), + ?assertMatch( + [#{?snk_span := start}, #{?snk_span := {complete, _}}], + ?of_kind("gcp_pubsub_consumer_worker_handle_message", Trace) + ), + ok + end} + ] + ), + ok. + +t_connection_down_during_pull(Config) -> + ct:timetrap({seconds, 90}), + ProxyName = ?config(proxy_name, Config), + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + [#{pubsub_topic := PubSubTopic}] = ?config(topic_mapping, Config), + FailureType = timeout, + ?check_trace( + begin + start_and_subscribe_mqtt(Config), + + {{ok, _}, {ok, _}} = + ?wait_async_action( + create_bridge(Config), + #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"}, + 10_000 + ), + + ?force_ordering( + #{ + ?snk_kind := gcp_pubsub_consumer_worker_pull_async, + ?snk_span := start + }, + #{?snk_kind := cut_connection, ?snk_span := start} + ), + ?force_ordering( + #{?snk_kind := cut_connection, ?snk_span := {complete, _}}, + #{ + ?snk_kind := gcp_pubsub_consumer_worker_pull_async, + ?snk_span := {complete, _} + } + ), + spawn_link(fun() -> + ?tp_span( + cut_connection, + #{}, + begin + Payload = emqx_guid:to_hexstr(emqx_guid:gen()), + Messages = [#{<<"data">> => Payload}], + pubsub_publish(Config, PubSubTopic, Messages), + emqx_common_test_helpers:enable_failure( + FailureType, ProxyName, ProxyHost, ProxyPort + ), + ok + end + ) + end), + + ?block_until("gcp_pubsub_consumer_worker_pull_error", 10_000), + emqx_common_test_helpers:heal_failure(FailureType, ProxyName, ProxyHost, ProxyPort), + + {ok, _Published} = receive_published(), + + Res = receive_published(#{timeout => 5_000}), + + wait_forgotten(#{timeout => 60_000}), + + Res + end, + [ + prop_acked_ids_eventually_forgotten(), + prop_all_pulled_are_acked(), + prop_handled_only_once(), + {"message is processed only once", fun(Res, Trace) -> + ?assertMatch({timeout, _}, Res), + ?assertMatch( + [#{?snk_span := start}, #{?snk_span := {complete, _}}], + ?of_kind("gcp_pubsub_consumer_worker_handle_message", Trace) + ), + ok + end} + ] + ), + ok. + +%% debugging api +t_get_subscription(Config) -> + ?check_trace( + begin + ?assertMatch( + {{ok, _}, {ok, _}}, + ?wait_async_action( + create_bridge(Config), + #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"}, + 10_000 + ) + ), + + [PullWorkerPid | _] = get_pull_worker_pids(Config), + ?retry( + _Interval0 = 200, + _NAttempts0 = 20, + ?assertMatch( + {ok, #{}}, + emqx_bridge_gcp_pubsub_consumer_worker:get_subscription(PullWorkerPid) + ) + ), + + ok + end, + [] + ), + ok. + +t_cluster_subscription(Config) -> + [ + #{ + mqtt_topic := MQTTTopic, + pubsub_topic := PubSubTopic + } + ] = ?config(topic_mapping, Config), + BridgeId = bridge_id(Config), + Cluster = [{_N1, Opts1} | _] = cluster(Config), + ?check_trace( + begin + Nodes = [N1, N2] = start_cluster(Cluster), + NumNodes = length(Nodes), + lists:foreach(fun wait_for_cluster_rpc/1, Nodes), + erpc:call(N2, fun() -> {ok, _} = create_bridge(Config) end), + lists:foreach( + fun(N) -> + ?assertMatch( + {ok, _}, + erpc:call(N, emqx_bridge, lookup, [BridgeId]), + #{node => N} + ) + end, + Nodes + ), + {ok, _} = snabbkaffe:block_until( + ?match_n_events( + NumNodes, + #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"} + ), + 10_000 + ), + + setup_and_start_listeners(N1, Opts1), + TCPPort1 = emqx_common_test_helpers:listener_port(Opts1, tcp), + {ok, C1} = emqtt:start_link([{port, TCPPort1}, {proto_ver, v5}]), + on_exit(fun() -> catch emqtt:stop(C1) end), + {ok, _} = emqtt:connect(C1), + {ok, _, [2]} = emqtt:subscribe(C1, MQTTTopic, 2), + + Payload = emqx_guid:to_hexstr(emqx_guid:gen()), + Messages = [#{<<"data">> => Payload}], + pubsub_publish(Config, PubSubTopic, Messages), + + ?assertMatch({ok, _Published}, receive_published()), + + ok + end, + [prop_handled_only_once()] + ), + ok. diff --git a/apps/emqx_resource/src/emqx_resource_pool.erl b/apps/emqx_resource/src/emqx_resource_pool.erl index 3d464d51e..2efc47d45 100644 --- a/apps/emqx_resource/src/emqx_resource_pool.erl +++ b/apps/emqx_resource/src/emqx_resource_pool.erl @@ -19,6 +19,7 @@ -export([ start/3, stop/1, + health_check_timeout/0, health_check_workers/2, health_check_workers/3, health_check_workers/4 @@ -66,6 +67,9 @@ stop(Name) -> error({stop_pool_failed, Name, Reason}) end. +health_check_timeout() -> + ?HEALTH_CHECK_TIMEOUT. + health_check_workers(PoolName, CheckFunc) -> health_check_workers(PoolName, CheckFunc, ?HEALTH_CHECK_TIMEOUT, _Opts = #{}).