feat: convert `gcp_pubsub_consumer` to connector/source
Fixes https://emqx.atlassian.net/browse/EMQX-11471
This commit is contained in:
parent
11e4a295ed
commit
146d89bd89
|
@ -94,6 +94,7 @@
|
||||||
id/2,
|
id/2,
|
||||||
id/3,
|
id/3,
|
||||||
source_id/3,
|
source_id/3,
|
||||||
|
source_hookpoint/1,
|
||||||
bridge_v1_is_valid/2,
|
bridge_v1_is_valid/2,
|
||||||
bridge_v1_is_valid/3,
|
bridge_v1_is_valid/3,
|
||||||
extract_connector_id_from_bridge_v2_id/1
|
extract_connector_id_from_bridge_v2_id/1
|
||||||
|
|
|
@ -9,6 +9,7 @@
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
-include_lib("emqx_resource/include/emqx_resource.hrl").
|
||||||
|
|
||||||
-import(emqx_common_test_helpers, [on_exit/1]).
|
-import(emqx_common_test_helpers, [on_exit/1]).
|
||||||
|
|
||||||
|
@ -120,13 +121,29 @@ delete_all_connectors() ->
|
||||||
|
|
||||||
%% test helpers
|
%% test helpers
|
||||||
parse_and_check(Type, Name, InnerConfigMap0) ->
|
parse_and_check(Type, Name, InnerConfigMap0) ->
|
||||||
|
parse_and_check(action, Type, Name, InnerConfigMap0).
|
||||||
|
|
||||||
|
parse_and_check(Kind, Type, Name, InnerConfigMap0) ->
|
||||||
|
RootBin =
|
||||||
|
case Kind of
|
||||||
|
action -> <<"actions">>;
|
||||||
|
source -> <<"sources">>
|
||||||
|
end,
|
||||||
TypeBin = emqx_utils_conv:bin(Type),
|
TypeBin = emqx_utils_conv:bin(Type),
|
||||||
RawConf = #{<<"actions">> => #{TypeBin => #{Name => InnerConfigMap0}}},
|
RawConf = #{RootBin => #{TypeBin => #{Name => InnerConfigMap0}}},
|
||||||
#{<<"actions">> := #{TypeBin := #{Name := InnerConfigMap}}} = hocon_tconf:check_plain(
|
#{RootBin := #{TypeBin := #{Name := InnerConfigMap}}} = hocon_tconf:check_plain(
|
||||||
emqx_bridge_v2_schema, RawConf, #{required => false, atom_key => false}
|
emqx_bridge_v2_schema, RawConf, #{required => false, atom_key => false}
|
||||||
),
|
),
|
||||||
InnerConfigMap.
|
InnerConfigMap.
|
||||||
|
|
||||||
|
parse_and_check_connector(Type, Name, InnerConfigMap0) ->
|
||||||
|
TypeBin = emqx_utils_conv:bin(Type),
|
||||||
|
RawConf = #{<<"connectors">> => #{TypeBin => #{Name => InnerConfigMap0}}},
|
||||||
|
#{<<"connectors">> := #{TypeBin := #{Name := InnerConfigMap}}} = hocon_tconf:check_plain(
|
||||||
|
emqx_connector_schema, RawConf, #{required => false, atom_key => false}
|
||||||
|
),
|
||||||
|
InnerConfigMap.
|
||||||
|
|
||||||
bridge_id(Config) ->
|
bridge_id(Config) ->
|
||||||
BridgeType = ?config(bridge_type, Config),
|
BridgeType = ?config(bridge_type, Config),
|
||||||
BridgeName = ?config(bridge_name, Config),
|
BridgeName = ?config(bridge_name, Config),
|
||||||
|
@ -134,10 +151,30 @@ bridge_id(Config) ->
|
||||||
ConnectorId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
|
ConnectorId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
|
||||||
<<"action:", BridgeId/binary, ":", ConnectorId/binary>>.
|
<<"action:", BridgeId/binary, ":", ConnectorId/binary>>.
|
||||||
|
|
||||||
|
source_hookpoint(Config) ->
|
||||||
|
#{kind := source, type := Type, name := Name} = get_common_values(Config),
|
||||||
|
BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
|
||||||
|
emqx_bridge_v2:source_hookpoint(BridgeId).
|
||||||
|
|
||||||
|
add_source_hookpoint(Config) ->
|
||||||
|
Hookpoint = source_hookpoint(Config),
|
||||||
|
ok = emqx_hooks:add(Hookpoint, {?MODULE, source_hookpoint_callback, [self()]}, 1000),
|
||||||
|
on_exit(fun() -> emqx_hooks:del(Hookpoint, {?MODULE, source_hookpoint_callback}) end),
|
||||||
|
ok.
|
||||||
|
|
||||||
resource_id(Config) ->
|
resource_id(Config) ->
|
||||||
BridgeType = ?config(bridge_type, Config),
|
#{
|
||||||
BridgeName = ?config(bridge_name, Config),
|
kind := Kind,
|
||||||
emqx_bridge_resource:resource_id(BridgeType, BridgeName).
|
type := Type,
|
||||||
|
name := Name,
|
||||||
|
connector_name := ConnectorName
|
||||||
|
} = get_common_values(Config),
|
||||||
|
case Kind of
|
||||||
|
source ->
|
||||||
|
emqx_bridge_v2:source_id(Type, Name, ConnectorName);
|
||||||
|
action ->
|
||||||
|
emqx_bridge_resource:resource_id(Type, Name)
|
||||||
|
end.
|
||||||
|
|
||||||
create_bridge(Config) ->
|
create_bridge(Config) ->
|
||||||
create_bridge(Config, _Overrides = #{}).
|
create_bridge(Config, _Overrides = #{}).
|
||||||
|
@ -506,6 +543,54 @@ bridges_api_spec_schemas() ->
|
||||||
actions_api_spec_schemas() ->
|
actions_api_spec_schemas() ->
|
||||||
api_spec_schemas("actions").
|
api_spec_schemas("actions").
|
||||||
|
|
||||||
|
get_value(Key, Config) ->
|
||||||
|
case proplists:get_value(Key, Config, undefined) of
|
||||||
|
undefined ->
|
||||||
|
error({missing_required_config, Key, Config});
|
||||||
|
Value ->
|
||||||
|
Value
|
||||||
|
end.
|
||||||
|
|
||||||
|
get_common_values(Config) ->
|
||||||
|
Kind = proplists:get_value(bridge_kind, Config, action),
|
||||||
|
case Kind of
|
||||||
|
action ->
|
||||||
|
#{
|
||||||
|
conf_root_key => actions,
|
||||||
|
kind => Kind,
|
||||||
|
type => get_ct_config_with_fallback(Config, [action_type, bridge_type]),
|
||||||
|
name => get_ct_config_with_fallback(Config, [action_name, bridge_name]),
|
||||||
|
connector_type => get_value(connector_type, Config),
|
||||||
|
connector_name => get_value(connector_name, Config)
|
||||||
|
};
|
||||||
|
source ->
|
||||||
|
#{
|
||||||
|
conf_root_key => sources,
|
||||||
|
kind => Kind,
|
||||||
|
type => get_value(source_type, Config),
|
||||||
|
name => get_value(source_name, Config),
|
||||||
|
connector_type => get_value(connector_type, Config),
|
||||||
|
connector_name => get_value(connector_name, Config)
|
||||||
|
}
|
||||||
|
end.
|
||||||
|
|
||||||
|
connector_resource_id(Config) ->
|
||||||
|
#{connector_type := Type, connector_name := Name} = get_common_values(Config),
|
||||||
|
emqx_connector_resource:resource_id(Type, Name).
|
||||||
|
|
||||||
|
health_check_channel(Config) ->
|
||||||
|
ConnectorResId = connector_resource_id(Config),
|
||||||
|
ChannelResId = resource_id(Config),
|
||||||
|
emqx_resource_manager:channel_health_check(ConnectorResId, ChannelResId).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Internal export
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
source_hookpoint_callback(Message, TestPid) ->
|
||||||
|
TestPid ! {consumed_message, Message},
|
||||||
|
ok.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Testcases
|
%% Testcases
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -574,6 +659,55 @@ t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) ->
|
||||||
end,
|
end,
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
%% - `ProduceFn': produces a message in the remote system that shall be consumed.
|
||||||
|
%% - `Tracepoint': marks the end of consumed message processing.
|
||||||
|
t_consume(Config, Opts) ->
|
||||||
|
#{
|
||||||
|
consumer_ready_tracepoint := ConsumerReadyTPFn,
|
||||||
|
produce_fn := ProduceFn,
|
||||||
|
check_fn := CheckFn,
|
||||||
|
produce_tracepoint := TracePointFn
|
||||||
|
} = Opts,
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
?assertMatch(
|
||||||
|
{{ok, _}, {ok, _}},
|
||||||
|
snabbkaffe:wait_async_action(
|
||||||
|
fun() -> create_bridge_api(Config) end,
|
||||||
|
ConsumerReadyTPFn,
|
||||||
|
15_000
|
||||||
|
)
|
||||||
|
),
|
||||||
|
ok = add_source_hookpoint(Config),
|
||||||
|
ResourceId = resource_id(Config),
|
||||||
|
?retry(
|
||||||
|
_Sleep = 200,
|
||||||
|
_Attempts = 20,
|
||||||
|
?assertMatch(
|
||||||
|
#{status := ?status_connected},
|
||||||
|
health_check_channel(Config)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{_, {ok, _}},
|
||||||
|
snabbkaffe:wait_async_action(
|
||||||
|
ProduceFn,
|
||||||
|
TracePointFn,
|
||||||
|
15_000
|
||||||
|
)
|
||||||
|
),
|
||||||
|
receive
|
||||||
|
{consumed_message, Message} ->
|
||||||
|
CheckFn(Message)
|
||||||
|
after 5_000 ->
|
||||||
|
error({timeout, process_info(self(), messages)})
|
||||||
|
end,
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
[]
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
t_create_via_http(Config) ->
|
t_create_via_http(Config) ->
|
||||||
?check_trace(
|
?check_trace(
|
||||||
begin
|
begin
|
||||||
|
@ -608,13 +742,15 @@ t_start_stop(Config, StopTracePoint) ->
|
||||||
|
|
||||||
?check_trace(
|
?check_trace(
|
||||||
begin
|
begin
|
||||||
ProbeRes0 = probe_bridge_api(
|
?assertMatch(
|
||||||
Kind,
|
{ok, {{_, 204, _}, _Headers, _Body}},
|
||||||
Type,
|
probe_bridge_api(
|
||||||
Name,
|
Kind,
|
||||||
BridgeConfig
|
Type,
|
||||||
|
Name,
|
||||||
|
BridgeConfig
|
||||||
|
)
|
||||||
),
|
),
|
||||||
?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0),
|
|
||||||
%% Check that the bridge probe API doesn't leak atoms.
|
%% Check that the bridge probe API doesn't leak atoms.
|
||||||
AtomsBefore = erlang:system_info(atom_count),
|
AtomsBefore = erlang:system_info(atom_count),
|
||||||
%% Probe again; shouldn't have created more atoms.
|
%% Probe again; shouldn't have created more atoms.
|
||||||
|
|
|
@ -34,7 +34,8 @@ bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
|
||||||
ParamsKeys = source_action_parameters_field_keys(),
|
ParamsKeys = source_action_parameters_field_keys(),
|
||||||
Config1 = maps:with(CommonSourceKeys, BridgeV1Config),
|
Config1 = maps:with(CommonSourceKeys, BridgeV1Config),
|
||||||
ConsumerCfg = maps:get(<<"consumer">>, BridgeV1Config, #{}),
|
ConsumerCfg = maps:get(<<"consumer">>, BridgeV1Config, #{}),
|
||||||
Params = maps:with(ParamsKeys, ConsumerCfg),
|
Params0 = maps:with(ParamsKeys, ConsumerCfg),
|
||||||
|
Params = maybe_set_pubsub_topic(Params0),
|
||||||
{source, gcp_pubsub_consumer,
|
{source, gcp_pubsub_consumer,
|
||||||
emqx_utils_maps:update_if_present(
|
emqx_utils_maps:update_if_present(
|
||||||
<<"resource_opts">>,
|
<<"resource_opts">>,
|
||||||
|
@ -59,12 +60,20 @@ connector_action_config_to_bridge_v1_config(ConnectorConfig, SourceConfig) ->
|
||||||
fun(RO) -> maps:with(bridge_v1_resource_opts_fields(), RO) end,
|
fun(RO) -> maps:with(bridge_v1_resource_opts_fields(), RO) end,
|
||||||
BridgeV1Config2
|
BridgeV1Config2
|
||||||
),
|
),
|
||||||
emqx_utils_maps:rename(<<"parameters">>, <<"consumer">>, BridgeV1Config3).
|
BridgeV1Config4 = emqx_utils_maps:deep_remove([<<"parameters">>, <<"topic">>], BridgeV1Config3),
|
||||||
|
emqx_utils_maps:rename(<<"parameters">>, <<"consumer">>, BridgeV1Config4).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------------------
|
||||||
%% Internal helper fns
|
%% Internal helper fns
|
||||||
%%------------------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% The new schema has a single pubsub topic, so we take it from topic mapping when
|
||||||
|
%% converting from v1.
|
||||||
|
maybe_set_pubsub_topic(#{<<"topic_mapping">> := [#{<<"pubsub_topic">> := Topic} | _]} = Params) ->
|
||||||
|
Params#{<<"topic">> => Topic};
|
||||||
|
maybe_set_pubsub_topic(Params) ->
|
||||||
|
Params.
|
||||||
|
|
||||||
resource_opts_fields() ->
|
resource_opts_fields() ->
|
||||||
[
|
[
|
||||||
to_bin(K)
|
to_bin(K)
|
||||||
|
|
|
@ -61,8 +61,35 @@ fields(consumer_source) ->
|
||||||
#{resource_opts_ref => ref(?MODULE, source_resource_opts)}
|
#{resource_opts_ref => ref(?MODULE, source_resource_opts)}
|
||||||
);
|
);
|
||||||
fields(source_parameters) ->
|
fields(source_parameters) ->
|
||||||
%% FIXME: check
|
Fields0 = emqx_bridge_gcp_pubsub:fields(consumer),
|
||||||
emqx_bridge_gcp_pubsub:fields(consumer);
|
Fields = lists:map(
|
||||||
|
fun
|
||||||
|
({topic_mapping = Name, Sc}) ->
|
||||||
|
%% to please dialyzer...
|
||||||
|
Override = #{
|
||||||
|
type => hocon_schema:field_schema(Sc, type),
|
||||||
|
required => false,
|
||||||
|
default => [],
|
||||||
|
validator => fun(_) -> ok end,
|
||||||
|
importance => ?IMPORTANCE_HIDDEN
|
||||||
|
},
|
||||||
|
{Name, hocon_schema:override(Sc, Override)};
|
||||||
|
(FieldSchema) ->
|
||||||
|
FieldSchema
|
||||||
|
end,
|
||||||
|
Fields0
|
||||||
|
),
|
||||||
|
[
|
||||||
|
{topic,
|
||||||
|
mk(
|
||||||
|
binary(),
|
||||||
|
#{
|
||||||
|
required => true,
|
||||||
|
desc => ?DESC(emqx_bridge_gcp_pubsub, "pubsub_topic")
|
||||||
|
}
|
||||||
|
)}
|
||||||
|
| Fields
|
||||||
|
];
|
||||||
fields(source_resource_opts) ->
|
fields(source_resource_opts) ->
|
||||||
Fields = [
|
Fields = [
|
||||||
health_check_interval,
|
health_check_interval,
|
||||||
|
@ -174,23 +201,17 @@ source_example(get) ->
|
||||||
source_example(put) ->
|
source_example(put) ->
|
||||||
#{
|
#{
|
||||||
enable => true,
|
enable => true,
|
||||||
connector => <<"my_connector_name">>,
|
description => <<"my source">>,
|
||||||
description => <<"My action">>,
|
connector => <<"my_connector">>,
|
||||||
local_topic => <<"local/topic">>,
|
|
||||||
resource_opts =>
|
|
||||||
#{batch_size => 5},
|
|
||||||
parameters =>
|
parameters =>
|
||||||
#{
|
#{
|
||||||
pubsub_topic => <<"mytopic">>,
|
topic => <<"my-topic">>,
|
||||||
ordering_key_template => <<"${payload.ok}">>,
|
pull_max_messages => 100
|
||||||
payload_template => <<"${payload}">>,
|
},
|
||||||
attributes_template =>
|
resource_opts =>
|
||||||
[
|
#{
|
||||||
#{
|
request_ttl => <<"45s">>,
|
||||||
key => <<"${payload.attrs.k}">>,
|
health_check_interval => <<"30s">>
|
||||||
value => <<"${payload.attrs.v}">>
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
@ -217,14 +238,18 @@ connector_example(post) ->
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
connector_example(put) ->
|
connector_example(put) ->
|
||||||
%% FIXME: revisit
|
|
||||||
#{
|
#{
|
||||||
enable => true,
|
enable => true,
|
||||||
connect_timeout => <<"10s">>,
|
description => <<"my connector">>,
|
||||||
|
connect_timeout => <<"15s">>,
|
||||||
pool_size => 8,
|
pool_size => 8,
|
||||||
pipelining => 100,
|
resource_opts =>
|
||||||
|
#{
|
||||||
|
start_after_created => true,
|
||||||
|
health_check_interval => <<"30s">>,
|
||||||
|
start_timeout => <<"5s">>
|
||||||
|
},
|
||||||
max_retries => 2,
|
max_retries => 2,
|
||||||
resource_opts => #{request_ttl => <<"60s">>},
|
|
||||||
service_account_json =>
|
service_account_json =>
|
||||||
#{
|
#{
|
||||||
auth_provider_x509_cert_url =>
|
auth_provider_x509_cert_url =>
|
||||||
|
@ -249,5 +274,6 @@ connector_example(put) ->
|
||||||
token_uri =>
|
token_uri =>
|
||||||
<<"https://oauth2.googleapis.com/token">>,
|
<<"https://oauth2.googleapis.com/token">>,
|
||||||
type => <<"service_account">>
|
type => <<"service_account">>
|
||||||
}
|
},
|
||||||
|
pipelining => 100
|
||||||
}.
|
}.
|
||||||
|
|
|
@ -59,7 +59,7 @@
|
||||||
hookpoints := [binary()],
|
hookpoints := [binary()],
|
||||||
connector_resource_id := binary(),
|
connector_resource_id := binary(),
|
||||||
source_resource_id := binary(),
|
source_resource_id := binary(),
|
||||||
mqtt_config := emqx_bridge_gcp_pubsub_impl_consumer:mqtt_config(),
|
mqtt_config := #{} | emqx_bridge_gcp_pubsub_impl_consumer:mqtt_config(),
|
||||||
pending_acks := #{message_id() => ack_id()},
|
pending_acks := #{message_id() => ack_id()},
|
||||||
project_id := emqx_bridge_gcp_pubsub_client:project_id(),
|
project_id := emqx_bridge_gcp_pubsub_client:project_id(),
|
||||||
pull_max_messages := non_neg_integer(),
|
pull_max_messages := non_neg_integer(),
|
||||||
|
@ -146,7 +146,7 @@ health_check(WorkerPid) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%-------------------------------------------------------------------------------------------------
|
%%-------------------------------------------------------------------------------------------------
|
||||||
%% `emqx_resource' API
|
%% `ecpool' API
|
||||||
%%-------------------------------------------------------------------------------------------------
|
%%-------------------------------------------------------------------------------------------------
|
||||||
|
|
||||||
connect(Opts0) ->
|
connect(Opts0) ->
|
||||||
|
@ -741,11 +741,7 @@ handle_message(State, #{<<"ackId">> := AckId, <<"message">> := InnerMsg} = _Mess
|
||||||
#{
|
#{
|
||||||
source_resource_id := SourceResId,
|
source_resource_id := SourceResId,
|
||||||
hookpoints := Hookpoints,
|
hookpoints := Hookpoints,
|
||||||
mqtt_config := #{
|
mqtt_config := MQTTConfig,
|
||||||
payload_template := PayloadTemplate,
|
|
||||||
qos := MQTTQoS,
|
|
||||||
mqtt_topic := MQTTTopic
|
|
||||||
},
|
|
||||||
topic := Topic
|
topic := Topic
|
||||||
} = State,
|
} = State,
|
||||||
#{
|
#{
|
||||||
|
@ -769,10 +765,7 @@ handle_message(State, #{<<"ackId">> := AckId, <<"message">> := InnerMsg} = _Mess
|
||||||
{<<"orderingKey">>, ordering_key}
|
{<<"orderingKey">>, ordering_key}
|
||||||
]
|
]
|
||||||
),
|
),
|
||||||
%% TODO: this should be optional
|
legacy_maybe_publish_mqtt_message(MQTTConfig, SourceResId, FullMessage),
|
||||||
Payload = render(FullMessage, PayloadTemplate),
|
|
||||||
MQTTMessage = emqx_message:make(SourceResId, MQTTQoS, MQTTTopic, Payload),
|
|
||||||
_ = emqx:publish(MQTTMessage),
|
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun(Hookpoint) -> emqx_hooks:run(Hookpoint, [FullMessage]) end,
|
fun(Hookpoint) -> emqx_hooks:run(Hookpoint, [FullMessage]) end,
|
||||||
Hookpoints
|
Hookpoints
|
||||||
|
@ -782,6 +775,22 @@ handle_message(State, #{<<"ackId">> := AckId, <<"message">> := InnerMsg} = _Mess
|
||||||
end
|
end
|
||||||
).
|
).
|
||||||
|
|
||||||
|
legacy_maybe_publish_mqtt_message(
|
||||||
|
_MQTTConfig = #{
|
||||||
|
payload_template := PayloadTemplate,
|
||||||
|
qos := MQTTQoS,
|
||||||
|
mqtt_topic := MQTTTopic
|
||||||
|
},
|
||||||
|
SourceResId,
|
||||||
|
FullMessage
|
||||||
|
) ->
|
||||||
|
Payload = render(FullMessage, PayloadTemplate),
|
||||||
|
MQTTMessage = emqx_message:make(SourceResId, MQTTQoS, MQTTTopic, Payload),
|
||||||
|
_ = emqx:publish(MQTTMessage),
|
||||||
|
ok;
|
||||||
|
legacy_maybe_publish_mqtt_message(_MQTTConfig, _SourceResId, _FullMessage) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
-spec add_if_present(any(), map(), any(), map()) -> map().
|
-spec add_if_present(any(), map(), any(), map()) -> map().
|
||||||
add_if_present(FromKey, Message, ToKey, Map) ->
|
add_if_present(FromKey, Message, ToKey, Map) ->
|
||||||
case maps:get(FromKey, Message, undefined) of
|
case maps:get(FromKey, Message, undefined) of
|
||||||
|
|
|
@ -62,7 +62,8 @@
|
||||||
consumer_workers_per_topic := pos_integer(),
|
consumer_workers_per_topic := pos_integer(),
|
||||||
topic_mapping := [topic_mapping(), ...]
|
topic_mapping := [topic_mapping(), ...]
|
||||||
},
|
},
|
||||||
resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()}
|
resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()},
|
||||||
|
topic := binary()
|
||||||
}.
|
}.
|
||||||
-type source_state() :: #{}.
|
-type source_state() :: #{}.
|
||||||
|
|
||||||
|
@ -151,7 +152,13 @@ on_add_channel(ConnectorResId, ConnectorState0, SourceResId, SourceConfig) ->
|
||||||
{ok, connector_state()}.
|
{ok, connector_state()}.
|
||||||
on_remove_channel(_ConnectorResId, ConnectorState0, SourceResId) ->
|
on_remove_channel(_ConnectorResId, ConnectorState0, SourceResId) ->
|
||||||
#{installed_sources := InstalledSources0} = ConnectorState0,
|
#{installed_sources := InstalledSources0} = ConnectorState0,
|
||||||
InstalledSources = maps:remove(SourceResId, InstalledSources0),
|
case maps:take(SourceResId, InstalledSources0) of
|
||||||
|
{SourceState, InstalledSources} ->
|
||||||
|
stop_consumers1(SourceState),
|
||||||
|
ok;
|
||||||
|
error ->
|
||||||
|
InstalledSources = InstalledSources0
|
||||||
|
end,
|
||||||
ConnectorState = ConnectorState0#{installed_sources := InstalledSources},
|
ConnectorState = ConnectorState0#{installed_sources := InstalledSources},
|
||||||
{ok, ConnectorState}.
|
{ok, ConnectorState}.
|
||||||
|
|
||||||
|
@ -162,7 +169,11 @@ on_get_channels(ConnectorResId) ->
|
||||||
|
|
||||||
-spec on_get_channel_status(connector_resource_id(), source_resource_id(), connector_state()) ->
|
-spec on_get_channel_status(connector_resource_id(), source_resource_id(), connector_state()) ->
|
||||||
health_check_status().
|
health_check_status().
|
||||||
on_get_channel_status(_ConnectorResId, SourceResId, ConnectorState) ->
|
on_get_channel_status(
|
||||||
|
_ConnectorResId,
|
||||||
|
SourceResId,
|
||||||
|
ConnectorState = #{installed_sources := InstalledSources}
|
||||||
|
) when is_map_key(SourceResId, InstalledSources) ->
|
||||||
%% We need to check this flag separately because the workers might be gone when we
|
%% We need to check this flag separately because the workers might be gone when we
|
||||||
%% check them.
|
%% check them.
|
||||||
case check_if_unhealthy(SourceResId) of
|
case check_if_unhealthy(SourceResId) of
|
||||||
|
@ -174,8 +185,11 @@ on_get_channel_status(_ConnectorResId, SourceResId, ConnectorState) ->
|
||||||
{?status_disconnected, {unhealthy_target, ?PERMISSION_MESSAGE}};
|
{?status_disconnected, {unhealthy_target, ?PERMISSION_MESSAGE}};
|
||||||
ok ->
|
ok ->
|
||||||
#{client := Client} = ConnectorState,
|
#{client := Client} = ConnectorState,
|
||||||
check_workers(SourceResId, Client)
|
#{SourceResId := #{pool_name := PoolName}} = InstalledSources,
|
||||||
end.
|
check_workers(PoolName, Client)
|
||||||
|
end;
|
||||||
|
on_get_channel_status(_ConnectorResId, _SourceResId, _ConnectorState) ->
|
||||||
|
?status_disconnected.
|
||||||
|
|
||||||
%%-------------------------------------------------------------------------------------------------
|
%%-------------------------------------------------------------------------------------------------
|
||||||
%% Health check API (signalled by consumer worker)
|
%% Health check API (signalled by consumer worker)
|
||||||
|
@ -228,7 +242,7 @@ start_consumers(ConnectorResId, SourceResId, Client, ProjectId, SourceConfig) ->
|
||||||
hookpoints := Hookpoints,
|
hookpoints := Hookpoints,
|
||||||
resource_opts := #{request_ttl := RequestTTL}
|
resource_opts := #{request_ttl := RequestTTL}
|
||||||
} = SourceConfig,
|
} = SourceConfig,
|
||||||
ConsumerConfig1 = maps:update_with(topic_mapping, fun convert_topic_mapping/1, ConsumerConfig0),
|
ConsumerConfig1 = ensure_topic_mapping(ConsumerConfig0),
|
||||||
TopicMapping = maps:get(topic_mapping, ConsumerConfig1),
|
TopicMapping = maps:get(topic_mapping, ConsumerConfig1),
|
||||||
ConsumerWorkersPerTopic = maps:get(consumer_workers_per_topic, ConsumerConfig1),
|
ConsumerWorkersPerTopic = maps:get(consumer_workers_per_topic, ConsumerConfig1),
|
||||||
PoolSize = map_size(TopicMapping) * ConsumerWorkersPerTopic,
|
PoolSize = map_size(TopicMapping) * ConsumerWorkersPerTopic,
|
||||||
|
@ -272,10 +286,7 @@ start_consumers(ConnectorResId, SourceResId, Client, ProjectId, SourceConfig) ->
|
||||||
emqx_resource_pool:start(SourceResId, emqx_bridge_gcp_pubsub_consumer_worker, ConsumerOpts)
|
emqx_resource_pool:start(SourceResId, emqx_bridge_gcp_pubsub_consumer_worker, ConsumerOpts)
|
||||||
of
|
of
|
||||||
ok ->
|
ok ->
|
||||||
State = #{
|
State = #{pool_name => SourceResId},
|
||||||
client => Client,
|
|
||||||
pool_name => SourceResId
|
|
||||||
},
|
|
||||||
{ok, State};
|
{ok, State};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
|
@ -284,20 +295,34 @@ start_consumers(ConnectorResId, SourceResId, Client, ProjectId, SourceConfig) ->
|
||||||
stop_consumers(ConnectorState) ->
|
stop_consumers(ConnectorState) ->
|
||||||
#{installed_sources := InstalledSources} = ConnectorState,
|
#{installed_sources := InstalledSources} = ConnectorState,
|
||||||
maps:foreach(
|
maps:foreach(
|
||||||
fun(SourceResId, _SourceState) ->
|
fun(_SourceResId, SourceState) ->
|
||||||
_ = log_when_error(
|
stop_consumers1(SourceState)
|
||||||
fun() ->
|
|
||||||
ok = emqx_resource_pool:stop(SourceResId)
|
|
||||||
end,
|
|
||||||
#{
|
|
||||||
msg => "failed_to_stop_pull_worker_pool",
|
|
||||||
instance_id => SourceResId
|
|
||||||
}
|
|
||||||
)
|
|
||||||
end,
|
end,
|
||||||
InstalledSources
|
InstalledSources
|
||||||
).
|
).
|
||||||
|
|
||||||
|
stop_consumers1(SourceState) ->
|
||||||
|
#{pool_name := PoolName} = SourceState,
|
||||||
|
_ = log_when_error(
|
||||||
|
fun() ->
|
||||||
|
ok = emqx_resource_pool:stop(PoolName)
|
||||||
|
end,
|
||||||
|
#{
|
||||||
|
msg => "failed_to_stop_pull_worker_pool",
|
||||||
|
pool_name => PoolName
|
||||||
|
}
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%% This is to ensure backwards compatibility with the deprectated topic mapping.
|
||||||
|
ensure_topic_mapping(ConsumerConfig0 = #{topic_mapping := [_ | _]}) ->
|
||||||
|
%% There is an existing topic mapping: legacy config. We use it and ignore the single
|
||||||
|
%% pubsub topic so that the bridge keeps working as before.
|
||||||
|
maps:update_with(topic_mapping, fun convert_topic_mapping/1, ConsumerConfig0);
|
||||||
|
ensure_topic_mapping(ConsumerConfig0 = #{topic := PubsubTopic}) ->
|
||||||
|
%% No topic mapping: generate one without MQTT templates.
|
||||||
|
maps:put(topic_mapping, #{PubsubTopic => #{}}, ConsumerConfig0).
|
||||||
|
|
||||||
convert_topic_mapping(TopicMappingList) ->
|
convert_topic_mapping(TopicMappingList) ->
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun(Fields, Acc) ->
|
fun(Fields, Acc) ->
|
||||||
|
|
|
@ -1436,23 +1436,37 @@ t_connection_down_before_starting(Config) ->
|
||||||
ProxyPort = ?config(proxy_port, Config),
|
ProxyPort = ?config(proxy_port, Config),
|
||||||
?check_trace(
|
?check_trace(
|
||||||
begin
|
begin
|
||||||
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
|
?force_ordering(
|
||||||
?assertMatch(
|
#{?snk_kind := gcp_pubsub_consumer_worker_about_to_spawn},
|
||||||
{{ok, _}, {ok, _}},
|
#{?snk_kind := will_cut_connection}
|
||||||
?wait_async_action(
|
),
|
||||||
create_bridge(Config),
|
?force_ordering(
|
||||||
#{?snk_kind := gcp_pubsub_consumer_worker_init},
|
#{?snk_kind := connection_down},
|
||||||
10_000
|
#{?snk_kind := gcp_pubsub_consumer_worker_create_subscription_enter}
|
||||||
)
|
),
|
||||||
),
|
spawn_link(fun() ->
|
||||||
?assertMatch({ok, disconnected}, health_check(Config)),
|
?tp(notice, will_cut_connection, #{}),
|
||||||
ok
|
emqx_common_test_helpers:enable_failure(down, ProxyName, ProxyHost, ProxyPort),
|
||||||
|
?tp(notice, connection_down, #{})
|
||||||
end),
|
end),
|
||||||
|
%% check retries
|
||||||
|
{ok, SRef0} =
|
||||||
|
snabbkaffe:subscribe(
|
||||||
|
?match_event(#{?snk_kind := "gcp_pubsub_consumer_worker_subscription_error"}),
|
||||||
|
_NEvents0 = 2,
|
||||||
|
10_000
|
||||||
|
),
|
||||||
|
{ok, _} = create_bridge(Config),
|
||||||
|
{ok, _} = snabbkaffe:receive_events(SRef0),
|
||||||
|
?assertMatch({ok, connecting}, health_check(Config)),
|
||||||
|
|
||||||
|
emqx_common_test_helpers:heal_failure(down, ProxyName, ProxyHost, ProxyPort),
|
||||||
?retry(
|
?retry(
|
||||||
_Interval0 = 200,
|
_Interval0 = 200,
|
||||||
_NAttempts0 = 20,
|
_NAttempts0 = 20,
|
||||||
?assertMatch({ok, connected}, health_check(Config))
|
?assertMatch({ok, connected}, health_check(Config))
|
||||||
),
|
),
|
||||||
|
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
[]
|
[]
|
||||||
|
|
|
@ -0,0 +1,167 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_bridge_v2_gcp_pubsub_consumer_SUITE).
|
||||||
|
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
-compile(export_all).
|
||||||
|
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
|
-define(CONNECTOR_TYPE_BIN, <<"gcp_pubsub_consumer">>).
|
||||||
|
-define(SOURCE_TYPE_BIN, <<"gcp_pubsub_consumer">>).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% CT boilerplate
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
all() ->
|
||||||
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
|
init_per_suite(Config) ->
|
||||||
|
emqx_bridge_gcp_pubsub_consumer_SUITE:init_per_suite(Config).
|
||||||
|
|
||||||
|
end_per_suite(Config) ->
|
||||||
|
emqx_bridge_gcp_pubsub_consumer_SUITE:end_per_suite(Config).
|
||||||
|
|
||||||
|
init_per_testcase(TestCase, Config) ->
|
||||||
|
common_init_per_testcase(TestCase, Config).
|
||||||
|
|
||||||
|
common_init_per_testcase(TestCase, Config0) ->
|
||||||
|
ct:timetrap(timer:seconds(60)),
|
||||||
|
ServiceAccountJSON =
|
||||||
|
#{<<"project_id">> := ProjectId} =
|
||||||
|
emqx_bridge_gcp_pubsub_utils:generate_service_account_json(),
|
||||||
|
Name = atom_to_binary(TestCase),
|
||||||
|
ConnectorConfig = connector_config(Name, ServiceAccountJSON),
|
||||||
|
PubsubTopic = Name,
|
||||||
|
SourceConfig = source_config(#{
|
||||||
|
connector => Name,
|
||||||
|
parameters => #{topic => PubsubTopic}
|
||||||
|
}),
|
||||||
|
Config = [
|
||||||
|
{bridge_kind, source},
|
||||||
|
{source_type, ?SOURCE_TYPE_BIN},
|
||||||
|
{source_name, Name},
|
||||||
|
{source_config, SourceConfig},
|
||||||
|
{connector_name, Name},
|
||||||
|
{connector_type, ?CONNECTOR_TYPE_BIN},
|
||||||
|
{connector_config, ConnectorConfig},
|
||||||
|
{service_account_json, ServiceAccountJSON},
|
||||||
|
{project_id, ProjectId},
|
||||||
|
{pubsub_topic, PubsubTopic}
|
||||||
|
| Config0
|
||||||
|
],
|
||||||
|
ok = emqx_bridge_gcp_pubsub_consumer_SUITE:ensure_topic(Config, PubsubTopic),
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_testcase(_Testcase, Config) ->
|
||||||
|
ProxyHost = ?config(proxy_host, Config),
|
||||||
|
ProxyPort = ?config(proxy_port, Config),
|
||||||
|
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||||
|
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
|
||||||
|
emqx_common_test_helpers:call_janitor(60_000),
|
||||||
|
ok = snabbkaffe:stop(),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Helper fns
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
connector_config(Name, ServiceAccountJSON) ->
|
||||||
|
InnerConfigMap0 =
|
||||||
|
#{
|
||||||
|
<<"enable">> => true,
|
||||||
|
<<"tags">> => [<<"bridge">>],
|
||||||
|
<<"description">> => <<"my cool bridge">>,
|
||||||
|
<<"connect_timeout">> => <<"5s">>,
|
||||||
|
<<"pool_size">> => 8,
|
||||||
|
<<"pipelining">> => <<"100">>,
|
||||||
|
<<"max_retries">> => <<"2">>,
|
||||||
|
<<"service_account_json">> => ServiceAccountJSON,
|
||||||
|
<<"resource_opts">> =>
|
||||||
|
#{
|
||||||
|
<<"health_check_interval">> => <<"1s">>,
|
||||||
|
<<"start_after_created">> => true,
|
||||||
|
<<"start_timeout">> => <<"5s">>
|
||||||
|
}
|
||||||
|
},
|
||||||
|
emqx_bridge_v2_testlib:parse_and_check_connector(?SOURCE_TYPE_BIN, Name, InnerConfigMap0).
|
||||||
|
|
||||||
|
source_config(Overrides0) ->
|
||||||
|
Overrides = emqx_utils_maps:binary_key_map(Overrides0),
|
||||||
|
CommonConfig =
|
||||||
|
#{
|
||||||
|
<<"enable">> => true,
|
||||||
|
<<"connector">> => <<"please override">>,
|
||||||
|
<<"parameters">> =>
|
||||||
|
#{
|
||||||
|
<<"topic">> => <<"my-topic">>
|
||||||
|
},
|
||||||
|
<<"resource_opts">> => #{
|
||||||
|
<<"health_check_interval">> => <<"1s">>,
|
||||||
|
<<"request_ttl">> => <<"1s">>,
|
||||||
|
<<"resume_interval">> => <<"1s">>
|
||||||
|
}
|
||||||
|
},
|
||||||
|
maps:merge(CommonConfig, Overrides).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Testcases
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
t_start_stop(Config) ->
|
||||||
|
ok = emqx_bridge_v2_testlib:t_start_stop(Config, gcp_pubsub_stop),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_consume(Config) ->
|
||||||
|
Topic = ?config(pubsub_topic, Config),
|
||||||
|
Payload = #{<<"key">> => <<"value">>},
|
||||||
|
Attributes = #{<<"hkey">> => <<"hval">>},
|
||||||
|
ProduceFn = fun() ->
|
||||||
|
emqx_bridge_gcp_pubsub_consumer_SUITE:pubsub_publish(
|
||||||
|
Config,
|
||||||
|
Topic,
|
||||||
|
[
|
||||||
|
#{
|
||||||
|
<<"data">> => Payload,
|
||||||
|
<<"orderingKey">> => <<"ok">>,
|
||||||
|
<<"attributes">> => Attributes
|
||||||
|
}
|
||||||
|
]
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
Encoded = emqx_utils_json:encode(Payload),
|
||||||
|
CheckFn = fun(Message) ->
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
attributes := Attributes,
|
||||||
|
message_id := _,
|
||||||
|
ordering_key := <<"ok">>,
|
||||||
|
publish_time := _,
|
||||||
|
topic := Topic,
|
||||||
|
value := Encoded
|
||||||
|
},
|
||||||
|
Message
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
ok = emqx_bridge_v2_testlib:t_consume(
|
||||||
|
Config,
|
||||||
|
#{
|
||||||
|
consumer_ready_tracepoint => ?match_event(
|
||||||
|
#{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"}
|
||||||
|
),
|
||||||
|
produce_fn => ProduceFn,
|
||||||
|
check_fn => CheckFn,
|
||||||
|
produce_tracepoint => ?match_event(
|
||||||
|
#{
|
||||||
|
?snk_kind := "gcp_pubsub_consumer_worker_handle_message",
|
||||||
|
?snk_span := {complete, _}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
}
|
||||||
|
),
|
||||||
|
ok.
|
Loading…
Reference in New Issue