From 5fdf7fd24c0fc9053b7ad99debbb197c9887b447 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 19 Jan 2023 12:39:42 +0100 Subject: [PATCH] fix(kafka): use async callback to bump success counters some telemetry events from wolff are discarded: * dropped: this is double counted in wolff, we now only subscribe to the dropped_queue_full event * retried_failed: it has different meanings in wolff, in wolff, it means it's the 2nd (or onward) produce attempt in EMQX, it means it's eventually failed after some retries * retried_success since we are going to handle the success counters in callbac this having this reported from wolff will only make things harder to understand * failed wolff never fails (unelss drop which is a different counter) --- apps/emqx_bridge/src/emqx_bridge.erl | 16 ++- apps/emqx_resource/include/emqx_resource.hrl | 2 + apps/emqx_resource/src/emqx_resource.erl | 5 +- .../src/emqx_resource_buffer_worker.erl | 70 ++++++++----- .../src/kafka/emqx_bridge_impl_kafka.erl | 8 +- .../kafka/emqx_bridge_impl_kafka_producer.erl | 98 ++++++++----------- .../emqx_bridge_impl_kafka_producer_SUITE.erl | 84 +++++++++++----- scripts/ct/run.sh | 2 +- 8 files changed, 172 insertions(+), 113 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 5b3fe796b..fb199522d 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -171,14 +171,22 @@ send_message(BridgeId, Message) -> not_found -> {error, {bridge_not_found, BridgeId}}; #{enable := true} = Config -> - Timeout = emqx_map_lib:deep_get( - [resource_opts, request_timeout], Config, timer:seconds(15) - ), - emqx_resource:query(ResId, {send_message, Message}, #{timeout => Timeout}); + QueryOpts = query_opts(Config), + emqx_resource:query(ResId, {send_message, Message}, QueryOpts); #{enable := false} -> {error, {bridge_stopped, BridgeId}} end. +query_opts(Config) -> + case emqx_map_lib:deep_get([resource_opts, request_timeout], Config, false) of + Timeout when is_integer(Timeout) -> + %% request_timeout is configured + #{timeout => Timeout}; + _ -> + %% emqx_resource has a default value (15s) + #{} + end. + config_key_path() -> [bridges]. diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 03be8fae8..7464eb4f8 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -89,6 +89,8 @@ -define(DEFAULT_QUEUE_SIZE, 100 * 1024 * 1024). -define(DEFAULT_QUEUE_SIZE_RAW, <<"100MB">>). +-define(DEFAULT_REQUEST_TIMEOUT, timer:seconds(15)). + %% count -define(DEFAULT_BATCH_SIZE, 1). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index bb27b6acd..ad7f30b47 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -255,7 +255,7 @@ reset_metrics(ResId) -> query(ResId, Request) -> query(ResId, Request, #{}). --spec query(resource_id(), Request :: term(), emqx_resource_buffer_worker:query_opts()) -> +-spec query(resource_id(), Request :: term(), query_opts()) -> Result :: term(). query(ResId, Request, Opts) -> case emqx_resource_manager:ets_lookup(ResId) of @@ -263,7 +263,8 @@ query(ResId, Request, Opts) -> IsBufferSupported = is_buffer_supported(Module), case {IsBufferSupported, QM} of {true, _} -> - emqx_resource_buffer_worker:simple_sync_query(ResId, Request); + %% only Kafka so far + emqx_resource_buffer_worker:simple_async_query(ResId, Request); {false, sync} -> emqx_resource_buffer_worker:sync_query(ResId, Request, Opts); {false, async} -> diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 634b0e954..11d3753f0 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -38,7 +38,8 @@ ]). -export([ - simple_sync_query/2 + simple_sync_query/2, + simple_async_query/2 ]). -export([ @@ -61,6 +62,7 @@ -define(COLLECT_REQ_LIMIT, 1000). -define(SEND_REQ(FROM, REQUEST), {'$send_req', FROM, REQUEST}). -define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT), {query, FROM, REQUEST, SENT, EXPIRE_AT}). +-define(SIMPLE_QUERY(REQUEST), ?QUERY(undefined, REQUEST, false, infinity)). -define(REPLY(FROM, REQUEST, SENT, RESULT), {reply, FROM, REQUEST, SENT, RESULT}). -define(EXPAND(RESULT, BATCH), [ ?REPLY(FROM, REQUEST, SENT, RESULT) @@ -116,8 +118,8 @@ async_query(Id, Request, Opts0) -> emqx_resource_metrics:matched_inc(Id), pick_cast(Id, PickKey, {query, Request, Opts}). -%% simple query the resource without batching and queuing messages. --spec simple_sync_query(id(), request()) -> Result :: term(). +%% simple query the resource without batching and queuing. +-spec simple_sync_query(id(), request()) -> term(). simple_sync_query(Id, Request) -> %% Note: since calling this function implies in bypassing the %% buffer workers, and each buffer worker index is used when @@ -126,18 +128,27 @@ simple_sync_query(Id, Request) -> %% would mess up the metrics anyway. `undefined' is ignored by %% `emqx_resource_metrics:*_shift/3'. Index = undefined, - QueryOpts0 = #{simple_query => true, timeout => infinity}, - QueryOpts = #{expire_at := ExpireAt} = ensure_expire_at(QueryOpts0), + QueryOpts = simple_query_opts(), emqx_resource_metrics:matched_inc(Id), Ref = make_message_ref(), - HasBeenSent = false, - From = self(), - Result = call_query( - sync, Id, Index, Ref, ?QUERY(From, Request, HasBeenSent, ExpireAt), QueryOpts - ), - _ = handle_query_result(Id, Result, HasBeenSent), + Result = call_query(sync, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts), + _ = handle_query_result(Id, Result, _HasBeenSent = false), Result. +%% simple async-query the resource without batching and queuing. +-spec simple_async_query(id(), request()) -> term(). +simple_async_query(Id, Request) -> + Index = undefined, + QueryOpts = simple_query_opts(), + emqx_resource_metrics:matched_inc(Id), + Ref = make_message_ref(), + Result = call_query(async, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts), + _ = handle_query_result(Id, Result, _HasBeenSent = false), + Result. + +simple_query_opts() -> + ensure_expire_at(#{simple_query => true, timeout => infinity}). + -spec block(pid()) -> ok. block(ServerRef) -> gen_statem:cast(ServerRef, block). @@ -848,9 +859,9 @@ call_query(QM0, Id, Index, Ref, Query, QueryOpts) -> case emqx_resource_manager:ets_lookup(Id) of {ok, _Group, #{mod := Mod, state := ResSt, status := connected} = Data} -> QM = - case QM0 of - configured -> maps:get(query_mode, Data); - _ -> QM0 + case QM0 =:= configured of + true -> maps:get(query_mode, Data); + false -> QM0 end, CBM = maps:get(callback_mode, Data), CallMode = call_mode(QM, CBM), @@ -991,11 +1002,7 @@ do_reply_after_query( ref => Ref, result => Result }), - IsFullBefore = is_inflight_full(InflightTID), - IsAcked = ack_inflight(InflightTID, Ref, Id, Index), - IsAcked andalso PostFn(), - IsFullBefore andalso ?MODULE:flush_worker(Pid), - ok + do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts) end. batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Result) -> @@ -1049,13 +1056,23 @@ do_batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, R ref => Ref, result => Result }), - IsFullBefore = is_inflight_full(InflightTID), - IsAcked = ack_inflight(InflightTID, Ref, Id, Index), - IsAcked andalso PostFn(), - IsFullBefore andalso ?MODULE:flush_worker(Pid), - ok + do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts) end. +do_ack(InflightTID, Ref, Id, Index, PostFn, WorkerPid, QueryOpts) -> + IsFullBefore = is_inflight_full(InflightTID), + IsKnownRef = ack_inflight(InflightTID, Ref, Id, Index), + case maps:get(simple_query, QueryOpts, false) of + true -> + PostFn(); + false when IsKnownRef -> + PostFn(); + false -> + ok + end, + IsFullBefore andalso ?MODULE:flush_worker(WorkerPid), + ok. + %%============================================================================== %% operations for queue queue_item_marshaller(Bin) when is_binary(Bin) -> @@ -1113,7 +1130,7 @@ inflight_new(InfltWinSZ, Id, Index) -> inflight_append(TableId, {?SIZE_REF, 0}, Id, Index), inflight_append(TableId, {?INITIAL_TIME_REF, erlang:system_time()}, Id, Index), inflight_append( - TableId, {?INITIAL_MONOTONIC_TIME_REF, erlang:monotonic_time(nanosecond)}, Id, Index + TableId, {?INITIAL_MONOTONIC_TIME_REF, make_message_ref()}, Id, Index ), TableId. @@ -1426,8 +1443,7 @@ now_() -> ensure_timeout_query_opts(#{timeout := _} = Opts, _SyncOrAsync) -> Opts; ensure_timeout_query_opts(#{} = Opts0, sync) -> - TimeoutMS = timer:seconds(15), - Opts0#{timeout => TimeoutMS}; + Opts0#{timeout => ?DEFAULT_REQUEST_TIMEOUT}; ensure_timeout_query_opts(#{} = Opts0, async) -> Opts0#{timeout => infinity}. diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka.erl index 747cd187d..49ca9fb86 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka.erl @@ -12,6 +12,7 @@ on_start/2, on_stop/2, on_query/3, + on_query_async/4, on_get_status/2, is_buffer_supported/0 ]). @@ -26,8 +27,11 @@ on_start(InstId, Config) -> on_stop(InstId, State) -> emqx_bridge_impl_kafka_producer:on_stop(InstId, State). -on_query(InstId, Msg, State) -> - emqx_bridge_impl_kafka_producer:on_query(InstId, Msg, State). +on_query(InstId, Req, State) -> + emqx_bridge_impl_kafka_producer:on_query(InstId, Req, State). + +on_query_async(InstId, Req, ReplyFn, State) -> + emqx_bridge_impl_kafka_producer:on_query_async(InstId, Req, ReplyFn, State). on_get_status(InstId, State) -> emqx_bridge_impl_kafka_producer:on_get_status(InstId, State). diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index 25741b6cd..18e27b775 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -11,6 +11,7 @@ on_start/2, on_stop/2, on_query/3, + on_query_async/4, on_get_status/2 ]). @@ -140,19 +141,48 @@ on_stop(_InstanceID, #{client_id := ClientID, producers := Producers, resource_i } ). +on_query( + _InstId, + {send_message, Message}, + #{message_template := Template, producers := Producers} +) -> + KafkaMessage = render_message(Template, Message), + %% TODO: this function is not used so far, + %% timeout should be configurable + %% or the on_query/3 should be on_query/4 instead. + try + {_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], 5000), + ok + catch + error:{producer_down, _} = Reason -> + {error, Reason}; + error:timeout -> + {error, timeout} + end. + %% @doc The callback API for rule-engine (or bridge without rules) %% The input argument `Message' is an enriched format (as a map()) %% of the original #message{} record. %% The enrichment is done by rule-engine or by the data bridge framework. %% E.g. the output of rule-engine process chain %% or the direct mapping from an MQTT message. -on_query(_InstId, {send_message, Message}, #{message_template := Template, producers := Producers}) -> +on_query_async( + _InstId, + {send_message, Message}, + AsyncReplyFn, + #{message_template := Template, producers := Producers} +) -> KafkaMessage = render_message(Template, Message), + %% * Must be a batch because wolff:send and wolff:send_sync are batch APIs + %% * Must be a single element batch because wolff books calls, but not batch sizes + %% for counters and gauges. + Batch = [KafkaMessage], %% The retuned information is discarded here. %% If the producer process is down when sending, this function would %% raise an error exception which is to be caught by the caller of this callback - {_Partition, _Pid} = wolff:send(Producers, [KafkaMessage], {fun ?MODULE:on_kafka_ack/3, [#{}]}), - {async_return, ok}. + {_Partition, Pid} = wolff:send(Producers, Batch, {fun ?MODULE:on_kafka_ack/3, [AsyncReplyFn]}), + %% this Pid is so far never used because Kafka producer is by-passing the buffer worker + {ok, Pid}. compile_message_template(T) -> KeyTemplate = maps:get(key, T, <<"${.clientid}">>), @@ -194,9 +224,14 @@ render_timestamp(Template, Message) -> erlang:system_time(millisecond) end. -on_kafka_ack(_Partition, _Offset, _Extra) -> - %% Do nothing so far. - %% Maybe need to bump some counters? +%% Wolff producer never gives up retrying +%% so there can only be 'ok' results. +on_kafka_ack(_Partition, Offset, {ReplyFn, Args}) when is_integer(Offset) -> + %% the ReplyFn is emqx_resource_worker:reply_after_query/8 + apply(ReplyFn, Args ++ [ok]); +on_kafka_ack(_Partition, buffer_overflow_discarded, _Callback) -> + %% wolff should bump the dropped_queue_full counter + %% do not apply the callback (which is basically to bump success or fail counter) ok. on_get_status(_InstId, _State) -> @@ -345,27 +380,13 @@ get_required(Field, Config, Throw) -> %% we *must* match the bridge id in the event metadata with that in %% the handler config; otherwise, multiple kafka producer bridges will %% install multiple handlers to the same wolff events, multiplying the -handle_telemetry_event( - [wolff, dropped], - #{counter_inc := Val}, - #{bridge_id := ID}, - #{bridge_id := ID} -) when is_integer(Val) -> - emqx_resource_metrics:dropped_inc(ID, Val); handle_telemetry_event( [wolff, dropped_queue_full], #{counter_inc := Val}, #{bridge_id := ID}, #{bridge_id := ID} ) when is_integer(Val) -> - %% When wolff emits a `dropped_queue_full' event due to replayq - %% overflow, it also emits a `dropped' event (at the time of - %% writing, wolff is 1.7.4). Since we already bump `dropped' when - %% `dropped.queue_full' occurs, we have to correct it here. This - %% correction will have to be dropped if wolff stops also emitting - %% `dropped'. - emqx_resource_metrics:dropped_queue_full_inc(ID, Val), - emqx_resource_metrics:dropped_inc(ID, -Val); + emqx_resource_metrics:dropped_queue_full_inc(ID, Val); handle_telemetry_event( [wolff, queuing], #{gauge_set := Val}, @@ -380,13 +401,6 @@ handle_telemetry_event( #{bridge_id := ID} ) when is_integer(Val) -> emqx_resource_metrics:retried_inc(ID, Val); -handle_telemetry_event( - [wolff, failed], - #{counter_inc := Val}, - #{bridge_id := ID}, - #{bridge_id := ID} -) when is_integer(Val) -> - emqx_resource_metrics:failed_inc(ID, Val); handle_telemetry_event( [wolff, inflight], #{gauge_set := Val}, @@ -394,27 +408,6 @@ handle_telemetry_event( #{bridge_id := ID} ) when is_integer(Val) -> emqx_resource_metrics:inflight_set(ID, PartitionID, Val); -handle_telemetry_event( - [wolff, retried_failed], - #{counter_inc := Val}, - #{bridge_id := ID}, - #{bridge_id := ID} -) when is_integer(Val) -> - emqx_resource_metrics:retried_failed_inc(ID, Val); -handle_telemetry_event( - [wolff, retried_success], - #{counter_inc := Val}, - #{bridge_id := ID}, - #{bridge_id := ID} -) when is_integer(Val) -> - emqx_resource_metrics:retried_success_inc(ID, Val); -handle_telemetry_event( - [wolff, success], - #{counter_inc := Val}, - #{bridge_id := ID}, - #{bridge_id := ID} -) when is_integer(Val) -> - emqx_resource_metrics:success_inc(ID, Val); handle_telemetry_event(_EventId, _Metrics, _MetaData, _HandlerConfig) -> %% Event that we do not handle ok. @@ -437,15 +430,10 @@ maybe_install_wolff_telemetry_handlers(ResourceID) -> %% unique handler id telemetry_handler_id(ResourceID), [ - [wolff, dropped], [wolff, dropped_queue_full], [wolff, queuing], [wolff, retried], - [wolff, failed], - [wolff, inflight], - [wolff, retried_failed], - [wolff, retried_success], - [wolff, success] + [wolff, inflight] ], fun ?MODULE:handle_telemetry_event/4, %% we *must* keep track of the same id that is handed down to diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index 14567dd39..10a6d8d9a 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -46,7 +46,14 @@ %%------------------------------------------------------------------------------ all() -> - emqx_common_test_helpers:all(?MODULE). + [ + {group, on_query}, + {group, on_query_async} + ]. + +groups() -> + All = emqx_common_test_helpers:all(?MODULE), + [{on_query, All}, {on_query_async, All}]. wait_until_kafka_is_up() -> wait_until_kafka_is_up(0). @@ -89,6 +96,12 @@ end_per_suite(_Config) -> _ = application:stop(emqx_connector), ok. +init_per_group(GroupName, Config) -> + [{query_api, GroupName} | Config]. + +end_per_group(_, _) -> + ok. + set_special_configs(emqx_management) -> Listeners = #{http => #{port => 8081}}, Config = #{ @@ -106,23 +119,23 @@ set_special_configs(_) -> %% Test cases for all combinations of SSL, no SSL and authentication types %%------------------------------------------------------------------------------ -t_publish_no_auth(_CtConfig) -> - publish_with_and_without_ssl("none"). +t_publish_no_auth(CtConfig) -> + publish_with_and_without_ssl(CtConfig, "none"). -t_publish_no_auth_key_dispatch(_CtConfig) -> - publish_with_and_without_ssl("none", #{"partition_strategy" => "key_dispatch"}). +t_publish_no_auth_key_dispatch(CtConfig) -> + publish_with_and_without_ssl(CtConfig, "none", #{"partition_strategy" => "key_dispatch"}). -t_publish_sasl_plain(_CtConfig) -> - publish_with_and_without_ssl(valid_sasl_plain_settings()). +t_publish_sasl_plain(CtConfig) -> + publish_with_and_without_ssl(CtConfig, valid_sasl_plain_settings()). -t_publish_sasl_scram256(_CtConfig) -> - publish_with_and_without_ssl(valid_sasl_scram256_settings()). +t_publish_sasl_scram256(CtConfig) -> + publish_with_and_without_ssl(CtConfig, valid_sasl_scram256_settings()). -t_publish_sasl_scram512(_CtConfig) -> - publish_with_and_without_ssl(valid_sasl_scram512_settings()). +t_publish_sasl_scram512(CtConfig) -> + publish_with_and_without_ssl(CtConfig, valid_sasl_scram512_settings()). -t_publish_sasl_kerberos(_CtConfig) -> - publish_with_and_without_ssl(valid_sasl_kerberos_settings()). +t_publish_sasl_kerberos(CtConfig) -> + publish_with_and_without_ssl(CtConfig, valid_sasl_kerberos_settings()). %%------------------------------------------------------------------------------ %% Test cases for REST api @@ -350,7 +363,7 @@ kafka_bridge_rest_api_helper(Config) -> %% exists and it will. This is specially bad if the %% original crash was due to misconfiguration and we are %% trying to fix it... -t_failed_creation_then_fix(_Config) -> +t_failed_creation_then_fix(Config) -> HostsString = kafka_hosts_string_sasl(), ValidAuthSettings = valid_sasl_plain_settings(), WrongAuthSettings = ValidAuthSettings#{"password" := "wrong"}, @@ -394,7 +407,7 @@ t_failed_creation_then_fix(_Config) -> }, {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0), ct:pal("base offset before testing ~p", [Offset]), - ?assertEqual({async_return, ok}, ?PRODUCER:on_query(ResourceId, {send_message, Msg}, State)), + ok = send(Config, ResourceId, Msg, State), {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset), ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg), %% TODO: refactor those into init/end per testcase @@ -406,11 +419,37 @@ t_failed_creation_then_fix(_Config) -> %% Helper functions %%------------------------------------------------------------------------------ -publish_with_and_without_ssl(AuthSettings) -> - publish_with_and_without_ssl(AuthSettings, #{}). +send(Config, ResourceId, Msg, State) when is_list(Config) -> + Ref = make_ref(), + ok = do_send(Ref, Config, ResourceId, Msg, State), + receive + {ack, Ref} -> + ok + after 10000 -> + error(timeout) + end. -publish_with_and_without_ssl(AuthSettings, Config) -> +do_send(Ref, Config, ResourceId, Msg, State) when is_list(Config) -> + Caller = self(), + F = fun(ok) -> + Caller ! {ack, Ref}, + ok + end, + case proplists:get_value(query_api, Config) of + on_query -> + ok = ?PRODUCER:on_query(ResourceId, {send_message, Msg}, State), + F(ok); + on_query_async -> + {ok, _} = ?PRODUCER:on_query_async(ResourceId, {send_message, Msg}, {F, []}, State), + ok + end. + +publish_with_and_without_ssl(CtConfig, AuthSettings) -> + publish_with_and_without_ssl(CtConfig, AuthSettings, #{}). + +publish_with_and_without_ssl(CtConfig, AuthSettings, Config) -> publish_helper( + CtConfig, #{ auth_settings => AuthSettings, ssl_settings => #{} @@ -418,6 +457,7 @@ publish_with_and_without_ssl(AuthSettings, Config) -> Config ), publish_helper( + CtConfig, #{ auth_settings => AuthSettings, ssl_settings => valid_ssl_settings() @@ -426,10 +466,11 @@ publish_with_and_without_ssl(AuthSettings, Config) -> ), ok. -publish_helper(AuthSettings) -> - publish_helper(AuthSettings, #{}). +publish_helper(CtConfig, AuthSettings) -> + publish_helper(CtConfig, AuthSettings, #{}). publish_helper( + CtConfig, #{ auth_settings := AuthSettings, ssl_settings := SSLSettings @@ -477,8 +518,7 @@ publish_helper( ct:pal("base offset before testing ~p", [Offset]), StartRes = ?PRODUCER:on_start(InstId, Conf), {ok, State} = StartRes, - OnQueryRes = ?PRODUCER:on_query(InstId, {send_message, Msg}, State), - {async_return, ok} = OnQueryRes, + ok = send(CtConfig, InstId, Msg, State), {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset), ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg), ok = ?PRODUCER:on_stop(InstId, State), diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index 07d45efe1..69427c7c3 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -195,7 +195,7 @@ fi echo "Fixing file owners and permissions for $UID_GID" # rebar and hex cache directory need to be writable by $UID -docker exec -i $TTY -u root:root "$ERLANG_CONTAINER" bash -c "mkdir -p /.cache && chown $UID_GID /.cache && chown -R $UID_GID /emqx" +docker exec -i $TTY -u root:root "$ERLANG_CONTAINER" bash -c "mkdir -p /.cache && chown $UID_GID /.cache && chown -R $UID_GID /emqx/.git /emqx/.ci /emqx/_build/default/lib" # need to initialize .erlang.cookie manually here because / is not writable by $UID docker exec -i $TTY -u root:root "$ERLANG_CONTAINER" bash -c "openssl rand -base64 16 > /.erlang.cookie && chown $UID_GID /.erlang.cookie && chmod 0400 /.erlang.cookie"