diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index a3dc147d8..64862c7c9 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -94,6 +94,7 @@ id/2, id/3, source_id/3, + source_hookpoint/1, bridge_v1_is_valid/2, bridge_v1_is_valid/3, extract_connector_id_from_bridge_v2_id/1 diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl index 6e731cb80..6580259d4 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl @@ -9,6 +9,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). -import(emqx_common_test_helpers, [on_exit/1]). @@ -120,13 +121,29 @@ delete_all_connectors() -> %% test helpers parse_and_check(Type, Name, InnerConfigMap0) -> + parse_and_check(action, Type, Name, InnerConfigMap0). + +parse_and_check(Kind, Type, Name, InnerConfigMap0) -> + RootBin = + case Kind of + action -> <<"actions">>; + source -> <<"sources">> + end, TypeBin = emqx_utils_conv:bin(Type), - RawConf = #{<<"actions">> => #{TypeBin => #{Name => InnerConfigMap0}}}, - #{<<"actions">> := #{TypeBin := #{Name := InnerConfigMap}}} = hocon_tconf:check_plain( + RawConf = #{RootBin => #{TypeBin => #{Name => InnerConfigMap0}}}, + #{RootBin := #{TypeBin := #{Name := InnerConfigMap}}} = hocon_tconf:check_plain( emqx_bridge_v2_schema, RawConf, #{required => false, atom_key => false} ), InnerConfigMap. +parse_and_check_connector(Type, Name, InnerConfigMap0) -> + TypeBin = emqx_utils_conv:bin(Type), + RawConf = #{<<"connectors">> => #{TypeBin => #{Name => InnerConfigMap0}}}, + #{<<"connectors">> := #{TypeBin := #{Name := InnerConfigMap}}} = hocon_tconf:check_plain( + emqx_connector_schema, RawConf, #{required => false, atom_key => false} + ), + InnerConfigMap. + bridge_id(Config) -> BridgeType = ?config(bridge_type, Config), BridgeName = ?config(bridge_name, Config), @@ -134,10 +151,30 @@ bridge_id(Config) -> ConnectorId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), <<"action:", BridgeId/binary, ":", ConnectorId/binary>>. +source_hookpoint(Config) -> + #{kind := source, type := Type, name := Name} = get_common_values(Config), + BridgeId = emqx_bridge_resource:bridge_id(Type, Name), + emqx_bridge_v2:source_hookpoint(BridgeId). + +add_source_hookpoint(Config) -> + Hookpoint = source_hookpoint(Config), + ok = emqx_hooks:add(Hookpoint, {?MODULE, source_hookpoint_callback, [self()]}, 1000), + on_exit(fun() -> emqx_hooks:del(Hookpoint, {?MODULE, source_hookpoint_callback}) end), + ok. + resource_id(Config) -> - BridgeType = ?config(bridge_type, Config), - BridgeName = ?config(bridge_name, Config), - emqx_bridge_resource:resource_id(BridgeType, BridgeName). + #{ + kind := Kind, + type := Type, + name := Name, + connector_name := ConnectorName + } = get_common_values(Config), + case Kind of + source -> + emqx_bridge_v2:source_id(Type, Name, ConnectorName); + action -> + emqx_bridge_resource:resource_id(Type, Name) + end. create_bridge(Config) -> create_bridge(Config, _Overrides = #{}). @@ -506,6 +543,54 @@ bridges_api_spec_schemas() -> actions_api_spec_schemas() -> api_spec_schemas("actions"). +get_value(Key, Config) -> + case proplists:get_value(Key, Config, undefined) of + undefined -> + error({missing_required_config, Key, Config}); + Value -> + Value + end. + +get_common_values(Config) -> + Kind = proplists:get_value(bridge_kind, Config, action), + case Kind of + action -> + #{ + conf_root_key => actions, + kind => Kind, + type => get_ct_config_with_fallback(Config, [action_type, bridge_type]), + name => get_ct_config_with_fallback(Config, [action_name, bridge_name]), + connector_type => get_value(connector_type, Config), + connector_name => get_value(connector_name, Config) + }; + source -> + #{ + conf_root_key => sources, + kind => Kind, + type => get_value(source_type, Config), + name => get_value(source_name, Config), + connector_type => get_value(connector_type, Config), + connector_name => get_value(connector_name, Config) + } + end. + +connector_resource_id(Config) -> + #{connector_type := Type, connector_name := Name} = get_common_values(Config), + emqx_connector_resource:resource_id(Type, Name). + +health_check_channel(Config) -> + ConnectorResId = connector_resource_id(Config), + ChannelResId = resource_id(Config), + emqx_resource_manager:channel_health_check(ConnectorResId, ChannelResId). + +%%------------------------------------------------------------------------------ +%% Internal export +%%------------------------------------------------------------------------------ + +source_hookpoint_callback(Message, TestPid) -> + TestPid ! {consumed_message, Message}, + ok. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -574,6 +659,55 @@ t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) -> end, ok. +%% - `ProduceFn': produces a message in the remote system that shall be consumed. +%% - `Tracepoint': marks the end of consumed message processing. +t_consume(Config, Opts) -> + #{ + consumer_ready_tracepoint := ConsumerReadyTPFn, + produce_fn := ProduceFn, + check_fn := CheckFn, + produce_tracepoint := TracePointFn + } = Opts, + ?check_trace( + begin + ?assertMatch( + {{ok, _}, {ok, _}}, + snabbkaffe:wait_async_action( + fun() -> create_bridge_api(Config) end, + ConsumerReadyTPFn, + 15_000 + ) + ), + ok = add_source_hookpoint(Config), + ResourceId = resource_id(Config), + ?retry( + _Sleep = 200, + _Attempts = 20, + ?assertMatch( + #{status := ?status_connected}, + health_check_channel(Config) + ) + ), + ?assertMatch( + {_, {ok, _}}, + snabbkaffe:wait_async_action( + ProduceFn, + TracePointFn, + 15_000 + ) + ), + receive + {consumed_message, Message} -> + CheckFn(Message) + after 5_000 -> + error({timeout, process_info(self(), messages)}) + end, + ok + end, + [] + ), + ok. + t_create_via_http(Config) -> ?check_trace( begin @@ -608,13 +742,15 @@ t_start_stop(Config, StopTracePoint) -> ?check_trace( begin - ProbeRes0 = probe_bridge_api( - Kind, - Type, - Name, - BridgeConfig + ?assertMatch( + {ok, {{_, 204, _}, _Headers, _Body}}, + probe_bridge_api( + Kind, + Type, + Name, + BridgeConfig + ) ), - ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0), %% Check that the bridge probe API doesn't leak atoms. AtomsBefore = erlang:system_info(atom_count), %% Probe again; shouldn't have created more atoms. diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_action_info.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_action_info.erl index 16129b997..0e602e610 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_action_info.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_action_info.erl @@ -34,7 +34,8 @@ bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) -> ParamsKeys = source_action_parameters_field_keys(), Config1 = maps:with(CommonSourceKeys, BridgeV1Config), ConsumerCfg = maps:get(<<"consumer">>, BridgeV1Config, #{}), - Params = maps:with(ParamsKeys, ConsumerCfg), + Params0 = maps:with(ParamsKeys, ConsumerCfg), + Params = maybe_set_pubsub_topic(Params0), {source, gcp_pubsub_consumer, emqx_utils_maps:update_if_present( <<"resource_opts">>, @@ -59,12 +60,20 @@ connector_action_config_to_bridge_v1_config(ConnectorConfig, SourceConfig) -> fun(RO) -> maps:with(bridge_v1_resource_opts_fields(), RO) end, BridgeV1Config2 ), - emqx_utils_maps:rename(<<"parameters">>, <<"consumer">>, BridgeV1Config3). + BridgeV1Config4 = emqx_utils_maps:deep_remove([<<"parameters">>, <<"topic">>], BridgeV1Config3), + emqx_utils_maps:rename(<<"parameters">>, <<"consumer">>, BridgeV1Config4). %%------------------------------------------------------------------------------------------ %% Internal helper fns %%------------------------------------------------------------------------------------------ +%% The new schema has a single pubsub topic, so we take it from topic mapping when +%% converting from v1. +maybe_set_pubsub_topic(#{<<"topic_mapping">> := [#{<<"pubsub_topic">> := Topic} | _]} = Params) -> + Params#{<<"topic">> => Topic}; +maybe_set_pubsub_topic(Params) -> + Params. + resource_opts_fields() -> [ to_bin(K) diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_schema.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_schema.erl index 0be13d64a..00c527e65 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_schema.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_schema.erl @@ -61,8 +61,35 @@ fields(consumer_source) -> #{resource_opts_ref => ref(?MODULE, source_resource_opts)} ); fields(source_parameters) -> - %% FIXME: check - emqx_bridge_gcp_pubsub:fields(consumer); + Fields0 = emqx_bridge_gcp_pubsub:fields(consumer), + Fields = lists:map( + fun + ({topic_mapping = Name, Sc}) -> + %% to please dialyzer... + Override = #{ + type => hocon_schema:field_schema(Sc, type), + required => false, + default => [], + validator => fun(_) -> ok end, + importance => ?IMPORTANCE_HIDDEN + }, + {Name, hocon_schema:override(Sc, Override)}; + (FieldSchema) -> + FieldSchema + end, + Fields0 + ), + [ + {topic, + mk( + binary(), + #{ + required => true, + desc => ?DESC(emqx_bridge_gcp_pubsub, "pubsub_topic") + } + )} + | Fields + ]; fields(source_resource_opts) -> Fields = [ health_check_interval, @@ -174,23 +201,17 @@ source_example(get) -> source_example(put) -> #{ enable => true, - connector => <<"my_connector_name">>, - description => <<"My action">>, - local_topic => <<"local/topic">>, - resource_opts => - #{batch_size => 5}, + description => <<"my source">>, + connector => <<"my_connector">>, parameters => #{ - pubsub_topic => <<"mytopic">>, - ordering_key_template => <<"${payload.ok}">>, - payload_template => <<"${payload}">>, - attributes_template => - [ - #{ - key => <<"${payload.attrs.k}">>, - value => <<"${payload.attrs.v}">> - } - ] + topic => <<"my-topic">>, + pull_max_messages => 100 + }, + resource_opts => + #{ + request_ttl => <<"45s">>, + health_check_interval => <<"30s">> } }. @@ -217,14 +238,18 @@ connector_example(post) -> } ); connector_example(put) -> - %% FIXME: revisit #{ enable => true, - connect_timeout => <<"10s">>, + description => <<"my connector">>, + connect_timeout => <<"15s">>, pool_size => 8, - pipelining => 100, + resource_opts => + #{ + start_after_created => true, + health_check_interval => <<"30s">>, + start_timeout => <<"5s">> + }, max_retries => 2, - resource_opts => #{request_ttl => <<"60s">>}, service_account_json => #{ auth_provider_x509_cert_url => @@ -249,5 +274,6 @@ connector_example(put) -> token_uri => <<"https://oauth2.googleapis.com/token">>, type => <<"service_account">> - } + }, + pipelining => 100 }. diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl index 39e1c19d5..5161839f2 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl @@ -59,7 +59,7 @@ hookpoints := [binary()], connector_resource_id := binary(), source_resource_id := binary(), - mqtt_config := emqx_bridge_gcp_pubsub_impl_consumer:mqtt_config(), + mqtt_config := #{} | emqx_bridge_gcp_pubsub_impl_consumer:mqtt_config(), pending_acks := #{message_id() => ack_id()}, project_id := emqx_bridge_gcp_pubsub_client:project_id(), pull_max_messages := non_neg_integer(), @@ -146,7 +146,7 @@ health_check(WorkerPid) -> end. %%------------------------------------------------------------------------------------------------- -%% `emqx_resource' API +%% `ecpool' API %%------------------------------------------------------------------------------------------------- connect(Opts0) -> @@ -741,11 +741,7 @@ handle_message(State, #{<<"ackId">> := AckId, <<"message">> := InnerMsg} = _Mess #{ source_resource_id := SourceResId, hookpoints := Hookpoints, - mqtt_config := #{ - payload_template := PayloadTemplate, - qos := MQTTQoS, - mqtt_topic := MQTTTopic - }, + mqtt_config := MQTTConfig, topic := Topic } = State, #{ @@ -769,10 +765,7 @@ handle_message(State, #{<<"ackId">> := AckId, <<"message">> := InnerMsg} = _Mess {<<"orderingKey">>, ordering_key} ] ), - %% TODO: this should be optional - Payload = render(FullMessage, PayloadTemplate), - MQTTMessage = emqx_message:make(SourceResId, MQTTQoS, MQTTTopic, Payload), - _ = emqx:publish(MQTTMessage), + legacy_maybe_publish_mqtt_message(MQTTConfig, SourceResId, FullMessage), lists:foreach( fun(Hookpoint) -> emqx_hooks:run(Hookpoint, [FullMessage]) end, Hookpoints @@ -782,6 +775,22 @@ handle_message(State, #{<<"ackId">> := AckId, <<"message">> := InnerMsg} = _Mess end ). +legacy_maybe_publish_mqtt_message( + _MQTTConfig = #{ + payload_template := PayloadTemplate, + qos := MQTTQoS, + mqtt_topic := MQTTTopic + }, + SourceResId, + FullMessage +) -> + Payload = render(FullMessage, PayloadTemplate), + MQTTMessage = emqx_message:make(SourceResId, MQTTQoS, MQTTTopic, Payload), + _ = emqx:publish(MQTTMessage), + ok; +legacy_maybe_publish_mqtt_message(_MQTTConfig, _SourceResId, _FullMessage) -> + ok. + -spec add_if_present(any(), map(), any(), map()) -> map(). add_if_present(FromKey, Message, ToKey, Map) -> case maps:get(FromKey, Message, undefined) of diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl index 9da56c677..e0a261acc 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl @@ -62,7 +62,8 @@ consumer_workers_per_topic := pos_integer(), topic_mapping := [topic_mapping(), ...] }, - resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()} + resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()}, + topic := binary() }. -type source_state() :: #{}. @@ -151,7 +152,13 @@ on_add_channel(ConnectorResId, ConnectorState0, SourceResId, SourceConfig) -> {ok, connector_state()}. on_remove_channel(_ConnectorResId, ConnectorState0, SourceResId) -> #{installed_sources := InstalledSources0} = ConnectorState0, - InstalledSources = maps:remove(SourceResId, InstalledSources0), + case maps:take(SourceResId, InstalledSources0) of + {SourceState, InstalledSources} -> + stop_consumers1(SourceState), + ok; + error -> + InstalledSources = InstalledSources0 + end, ConnectorState = ConnectorState0#{installed_sources := InstalledSources}, {ok, ConnectorState}. @@ -162,7 +169,11 @@ on_get_channels(ConnectorResId) -> -spec on_get_channel_status(connector_resource_id(), source_resource_id(), connector_state()) -> health_check_status(). -on_get_channel_status(_ConnectorResId, SourceResId, ConnectorState) -> +on_get_channel_status( + _ConnectorResId, + SourceResId, + ConnectorState = #{installed_sources := InstalledSources} +) when is_map_key(SourceResId, InstalledSources) -> %% We need to check this flag separately because the workers might be gone when we %% check them. case check_if_unhealthy(SourceResId) of @@ -174,8 +185,11 @@ on_get_channel_status(_ConnectorResId, SourceResId, ConnectorState) -> {?status_disconnected, {unhealthy_target, ?PERMISSION_MESSAGE}}; ok -> #{client := Client} = ConnectorState, - check_workers(SourceResId, Client) - end. + #{SourceResId := #{pool_name := PoolName}} = InstalledSources, + check_workers(PoolName, Client) + end; +on_get_channel_status(_ConnectorResId, _SourceResId, _ConnectorState) -> + ?status_disconnected. %%------------------------------------------------------------------------------------------------- %% Health check API (signalled by consumer worker) @@ -228,7 +242,7 @@ start_consumers(ConnectorResId, SourceResId, Client, ProjectId, SourceConfig) -> hookpoints := Hookpoints, resource_opts := #{request_ttl := RequestTTL} } = SourceConfig, - ConsumerConfig1 = maps:update_with(topic_mapping, fun convert_topic_mapping/1, ConsumerConfig0), + ConsumerConfig1 = ensure_topic_mapping(ConsumerConfig0), TopicMapping = maps:get(topic_mapping, ConsumerConfig1), ConsumerWorkersPerTopic = maps:get(consumer_workers_per_topic, ConsumerConfig1), PoolSize = map_size(TopicMapping) * ConsumerWorkersPerTopic, @@ -272,10 +286,7 @@ start_consumers(ConnectorResId, SourceResId, Client, ProjectId, SourceConfig) -> emqx_resource_pool:start(SourceResId, emqx_bridge_gcp_pubsub_consumer_worker, ConsumerOpts) of ok -> - State = #{ - client => Client, - pool_name => SourceResId - }, + State = #{pool_name => SourceResId}, {ok, State}; {error, Reason} -> {error, Reason} @@ -284,20 +295,34 @@ start_consumers(ConnectorResId, SourceResId, Client, ProjectId, SourceConfig) -> stop_consumers(ConnectorState) -> #{installed_sources := InstalledSources} = ConnectorState, maps:foreach( - fun(SourceResId, _SourceState) -> - _ = log_when_error( - fun() -> - ok = emqx_resource_pool:stop(SourceResId) - end, - #{ - msg => "failed_to_stop_pull_worker_pool", - instance_id => SourceResId - } - ) + fun(_SourceResId, SourceState) -> + stop_consumers1(SourceState) end, InstalledSources ). +stop_consumers1(SourceState) -> + #{pool_name := PoolName} = SourceState, + _ = log_when_error( + fun() -> + ok = emqx_resource_pool:stop(PoolName) + end, + #{ + msg => "failed_to_stop_pull_worker_pool", + pool_name => PoolName + } + ), + ok. + +%% This is to ensure backwards compatibility with the deprectated topic mapping. +ensure_topic_mapping(ConsumerConfig0 = #{topic_mapping := [_ | _]}) -> + %% There is an existing topic mapping: legacy config. We use it and ignore the single + %% pubsub topic so that the bridge keeps working as before. + maps:update_with(topic_mapping, fun convert_topic_mapping/1, ConsumerConfig0); +ensure_topic_mapping(ConsumerConfig0 = #{topic := PubsubTopic}) -> + %% No topic mapping: generate one without MQTT templates. + maps:put(topic_mapping, #{PubsubTopic => #{}}, ConsumerConfig0). + convert_topic_mapping(TopicMappingList) -> lists:foldl( fun(Fields, Acc) -> diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl index 78167d439..39056bd31 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl @@ -1436,23 +1436,37 @@ t_connection_down_before_starting(Config) -> ProxyPort = ?config(proxy_port, Config), ?check_trace( begin - emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> - ?assertMatch( - {{ok, _}, {ok, _}}, - ?wait_async_action( - create_bridge(Config), - #{?snk_kind := gcp_pubsub_consumer_worker_init}, - 10_000 - ) - ), - ?assertMatch({ok, disconnected}, health_check(Config)), - ok + ?force_ordering( + #{?snk_kind := gcp_pubsub_consumer_worker_about_to_spawn}, + #{?snk_kind := will_cut_connection} + ), + ?force_ordering( + #{?snk_kind := connection_down}, + #{?snk_kind := gcp_pubsub_consumer_worker_create_subscription_enter} + ), + spawn_link(fun() -> + ?tp(notice, will_cut_connection, #{}), + emqx_common_test_helpers:enable_failure(down, ProxyName, ProxyHost, ProxyPort), + ?tp(notice, connection_down, #{}) end), + %% check retries + {ok, SRef0} = + snabbkaffe:subscribe( + ?match_event(#{?snk_kind := "gcp_pubsub_consumer_worker_subscription_error"}), + _NEvents0 = 2, + 10_000 + ), + {ok, _} = create_bridge(Config), + {ok, _} = snabbkaffe:receive_events(SRef0), + ?assertMatch({ok, connecting}, health_check(Config)), + + emqx_common_test_helpers:heal_failure(down, ProxyName, ProxyHost, ProxyPort), ?retry( _Interval0 = 200, _NAttempts0 = 20, ?assertMatch({ok, connected}, health_check(Config)) ), + ok end, [] diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_v2_gcp_pubsub_consumer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_v2_gcp_pubsub_consumer_SUITE.erl new file mode 100644 index 000000000..8a4362bf3 --- /dev/null +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_v2_gcp_pubsub_consumer_SUITE.erl @@ -0,0 +1,167 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_v2_gcp_pubsub_consumer_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-define(CONNECTOR_TYPE_BIN, <<"gcp_pubsub_consumer">>). +-define(SOURCE_TYPE_BIN, <<"gcp_pubsub_consumer">>). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + emqx_bridge_gcp_pubsub_consumer_SUITE:init_per_suite(Config). + +end_per_suite(Config) -> + emqx_bridge_gcp_pubsub_consumer_SUITE:end_per_suite(Config). + +init_per_testcase(TestCase, Config) -> + common_init_per_testcase(TestCase, Config). + +common_init_per_testcase(TestCase, Config0) -> + ct:timetrap(timer:seconds(60)), + ServiceAccountJSON = + #{<<"project_id">> := ProjectId} = + emqx_bridge_gcp_pubsub_utils:generate_service_account_json(), + Name = atom_to_binary(TestCase), + ConnectorConfig = connector_config(Name, ServiceAccountJSON), + PubsubTopic = Name, + SourceConfig = source_config(#{ + connector => Name, + parameters => #{topic => PubsubTopic} + }), + Config = [ + {bridge_kind, source}, + {source_type, ?SOURCE_TYPE_BIN}, + {source_name, Name}, + {source_config, SourceConfig}, + {connector_name, Name}, + {connector_type, ?CONNECTOR_TYPE_BIN}, + {connector_config, ConnectorConfig}, + {service_account_json, ServiceAccountJSON}, + {project_id, ProjectId}, + {pubsub_topic, PubsubTopic} + | Config0 + ], + ok = emqx_bridge_gcp_pubsub_consumer_SUITE:ensure_topic(Config, PubsubTopic), + Config. + +end_per_testcase(_Testcase, Config) -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), + emqx_common_test_helpers:call_janitor(60_000), + ok = snabbkaffe:stop(), + ok. + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +connector_config(Name, ServiceAccountJSON) -> + InnerConfigMap0 = + #{ + <<"enable">> => true, + <<"tags">> => [<<"bridge">>], + <<"description">> => <<"my cool bridge">>, + <<"connect_timeout">> => <<"5s">>, + <<"pool_size">> => 8, + <<"pipelining">> => <<"100">>, + <<"max_retries">> => <<"2">>, + <<"service_account_json">> => ServiceAccountJSON, + <<"resource_opts">> => + #{ + <<"health_check_interval">> => <<"1s">>, + <<"start_after_created">> => true, + <<"start_timeout">> => <<"5s">> + } + }, + emqx_bridge_v2_testlib:parse_and_check_connector(?SOURCE_TYPE_BIN, Name, InnerConfigMap0). + +source_config(Overrides0) -> + Overrides = emqx_utils_maps:binary_key_map(Overrides0), + CommonConfig = + #{ + <<"enable">> => true, + <<"connector">> => <<"please override">>, + <<"parameters">> => + #{ + <<"topic">> => <<"my-topic">> + }, + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"1s">>, + <<"request_ttl">> => <<"1s">>, + <<"resume_interval">> => <<"1s">> + } + }, + maps:merge(CommonConfig, Overrides). + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_start_stop(Config) -> + ok = emqx_bridge_v2_testlib:t_start_stop(Config, gcp_pubsub_stop), + ok. + +t_consume(Config) -> + Topic = ?config(pubsub_topic, Config), + Payload = #{<<"key">> => <<"value">>}, + Attributes = #{<<"hkey">> => <<"hval">>}, + ProduceFn = fun() -> + emqx_bridge_gcp_pubsub_consumer_SUITE:pubsub_publish( + Config, + Topic, + [ + #{ + <<"data">> => Payload, + <<"orderingKey">> => <<"ok">>, + <<"attributes">> => Attributes + } + ] + ) + end, + Encoded = emqx_utils_json:encode(Payload), + CheckFn = fun(Message) -> + ?assertMatch( + #{ + attributes := Attributes, + message_id := _, + ordering_key := <<"ok">>, + publish_time := _, + topic := Topic, + value := Encoded + }, + Message + ) + end, + ok = emqx_bridge_v2_testlib:t_consume( + Config, + #{ + consumer_ready_tracepoint => ?match_event( + #{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"} + ), + produce_fn => ProduceFn, + check_fn => CheckFn, + produce_tracepoint => ?match_event( + #{ + ?snk_kind := "gcp_pubsub_consumer_worker_handle_message", + ?snk_span := {complete, _} + } + ) + } + ), + ok.