Merge pull request #12595 from thalesmg/kafka-consumer-source-m-20240223
feat: migrate kafka consumer bridge to source + connector
This commit is contained in:
commit
d56fb22208
|
@ -92,6 +92,7 @@ hard_coded_action_info_modules_ee() ->
|
||||||
emqx_bridge_gcp_pubsub_consumer_action_info,
|
emqx_bridge_gcp_pubsub_consumer_action_info,
|
||||||
emqx_bridge_gcp_pubsub_producer_action_info,
|
emqx_bridge_gcp_pubsub_producer_action_info,
|
||||||
emqx_bridge_kafka_action_info,
|
emqx_bridge_kafka_action_info,
|
||||||
|
emqx_bridge_kafka_consumer_action_info,
|
||||||
emqx_bridge_kinesis_action_info,
|
emqx_bridge_kinesis_action_info,
|
||||||
emqx_bridge_hstreamdb_action_info,
|
emqx_bridge_hstreamdb_action_info,
|
||||||
emqx_bridge_matrix_action_info,
|
emqx_bridge_matrix_action_info,
|
||||||
|
|
|
@ -765,19 +765,26 @@ create_dry_run(ConfRootKey, Type, Conf0) ->
|
||||||
{error, Reason1}
|
{error, Reason1}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
create_dry_run_helper(ConfRootKey, BridgeType, ConnectorRawConf, BridgeV2RawConf) ->
|
create_dry_run_helper(ConfRootKey, BridgeV2Type, ConnectorRawConf, BridgeV2RawConf) ->
|
||||||
BridgeName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]),
|
BridgeName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]),
|
||||||
ConnectorType = connector_type(BridgeType),
|
ConnectorType = connector_type(BridgeV2Type),
|
||||||
OnReadyCallback =
|
OnReadyCallback =
|
||||||
fun(ConnectorId) ->
|
fun(ConnectorId) ->
|
||||||
{_, ConnectorName} = emqx_connector_resource:parse_connector_id(ConnectorId),
|
{_, ConnectorName} = emqx_connector_resource:parse_connector_id(ConnectorId),
|
||||||
ChannelTestId = id(BridgeType, BridgeName, ConnectorName),
|
ChannelTestId = id(BridgeV2Type, BridgeName, ConnectorName),
|
||||||
Conf = emqx_utils_maps:unsafe_atom_key_map(BridgeV2RawConf),
|
BridgeV2Conf0 = fill_defaults(
|
||||||
|
BridgeV2Type,
|
||||||
|
BridgeV2RawConf,
|
||||||
|
bin(ConfRootKey),
|
||||||
|
emqx_bridge_v2_schema,
|
||||||
|
#{make_serializable => false}
|
||||||
|
),
|
||||||
|
BridgeV2Conf = emqx_utils_maps:unsafe_atom_key_map(BridgeV2Conf0),
|
||||||
AugmentedConf = augment_channel_config(
|
AugmentedConf = augment_channel_config(
|
||||||
ConfRootKey,
|
ConfRootKey,
|
||||||
BridgeType,
|
BridgeV2Type,
|
||||||
BridgeName,
|
BridgeName,
|
||||||
Conf
|
BridgeV2Conf
|
||||||
),
|
),
|
||||||
case emqx_resource_manager:add_channel(ConnectorId, ChannelTestId, AugmentedConf) of
|
case emqx_resource_manager:add_channel(ConnectorId, ChannelTestId, AugmentedConf) of
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
@ -1204,8 +1211,11 @@ perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], E
|
||||||
perform_bridge_changes(Tasks, Errors).
|
perform_bridge_changes(Tasks, Errors).
|
||||||
|
|
||||||
fill_defaults(Type, RawConf, TopLevelConf, SchemaModule) ->
|
fill_defaults(Type, RawConf, TopLevelConf, SchemaModule) ->
|
||||||
|
fill_defaults(Type, RawConf, TopLevelConf, SchemaModule, _Opts = #{}).
|
||||||
|
|
||||||
|
fill_defaults(Type, RawConf, TopLevelConf, SchemaModule, Opts) ->
|
||||||
PackedConf = pack_bridge_conf(Type, RawConf, TopLevelConf),
|
PackedConf = pack_bridge_conf(Type, RawConf, TopLevelConf),
|
||||||
FullConf = emqx_config:fill_defaults(SchemaModule, PackedConf, #{}),
|
FullConf = emqx_config:fill_defaults(SchemaModule, PackedConf, Opts),
|
||||||
unpack_bridge_conf(Type, FullConf, TopLevelConf).
|
unpack_bridge_conf(Type, FullConf, TopLevelConf).
|
||||||
|
|
||||||
pack_bridge_conf(Type, RawConf, TopLevelConf) ->
|
pack_bridge_conf(Type, RawConf, TopLevelConf) ->
|
||||||
|
|
|
@ -775,7 +775,7 @@ handle_update(ConfRootKey, Id, Conf0) ->
|
||||||
Id,
|
Id,
|
||||||
case emqx_bridge_v2:lookup(ConfRootKey, BridgeType, BridgeName) of
|
case emqx_bridge_v2:lookup(ConfRootKey, BridgeType, BridgeName) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
RawConf = emqx:get_raw_config([bridges, BridgeType, BridgeName], #{}),
|
RawConf = emqx:get_raw_config([ConfRootKey, BridgeType, BridgeName], #{}),
|
||||||
Conf = emqx_utils:deobfuscate(Conf1, RawConf),
|
Conf = emqx_utils:deobfuscate(Conf1, RawConf),
|
||||||
update_bridge(ConfRootKey, BridgeType, BridgeName, Conf);
|
update_bridge(ConfRootKey, BridgeType, BridgeName, Conf);
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
|
|
|
@ -89,6 +89,7 @@ end_per_testcase(_Testcase, Config) ->
|
||||||
%% in CI, apparently this needs more time since the
|
%% in CI, apparently this needs more time since the
|
||||||
%% machines struggle with all the containers running...
|
%% machines struggle with all the containers running...
|
||||||
emqx_common_test_helpers:call_janitor(60_000),
|
emqx_common_test_helpers:call_janitor(60_000),
|
||||||
|
delete_all_bridges_and_connectors(),
|
||||||
ok = snabbkaffe:stop(),
|
ok = snabbkaffe:stop(),
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
@ -132,7 +133,13 @@ parse_and_check(Kind, Type, Name, InnerConfigMap0) ->
|
||||||
TypeBin = emqx_utils_conv:bin(Type),
|
TypeBin = emqx_utils_conv:bin(Type),
|
||||||
RawConf = #{RootBin => #{TypeBin => #{Name => InnerConfigMap0}}},
|
RawConf = #{RootBin => #{TypeBin => #{Name => InnerConfigMap0}}},
|
||||||
#{RootBin := #{TypeBin := #{Name := InnerConfigMap}}} = hocon_tconf:check_plain(
|
#{RootBin := #{TypeBin := #{Name := InnerConfigMap}}} = hocon_tconf:check_plain(
|
||||||
emqx_bridge_v2_schema, RawConf, #{required => false, atom_key => false}
|
emqx_bridge_v2_schema,
|
||||||
|
RawConf,
|
||||||
|
#{
|
||||||
|
required => false,
|
||||||
|
atom_key => false,
|
||||||
|
make_serializable => true
|
||||||
|
}
|
||||||
),
|
),
|
||||||
InnerConfigMap.
|
InnerConfigMap.
|
||||||
|
|
||||||
|
@ -140,7 +147,13 @@ parse_and_check_connector(Type, Name, InnerConfigMap0) ->
|
||||||
TypeBin = emqx_utils_conv:bin(Type),
|
TypeBin = emqx_utils_conv:bin(Type),
|
||||||
RawConf = #{<<"connectors">> => #{TypeBin => #{Name => InnerConfigMap0}}},
|
RawConf = #{<<"connectors">> => #{TypeBin => #{Name => InnerConfigMap0}}},
|
||||||
#{<<"connectors">> := #{TypeBin := #{Name := InnerConfigMap}}} = hocon_tconf:check_plain(
|
#{<<"connectors">> := #{TypeBin := #{Name := InnerConfigMap}}} = hocon_tconf:check_plain(
|
||||||
emqx_connector_schema, RawConf, #{required => false, atom_key => false}
|
emqx_connector_schema,
|
||||||
|
RawConf,
|
||||||
|
#{
|
||||||
|
required => false,
|
||||||
|
atom_key => false,
|
||||||
|
make_serializable => true
|
||||||
|
}
|
||||||
),
|
),
|
||||||
InnerConfigMap.
|
InnerConfigMap.
|
||||||
|
|
||||||
|
@ -282,20 +295,23 @@ list_bridges_api() ->
|
||||||
ct:pal("list bridges result: ~p", [Res]),
|
ct:pal("list bridges result: ~p", [Res]),
|
||||||
Res.
|
Res.
|
||||||
|
|
||||||
|
get_source_api(BridgeType, BridgeName) ->
|
||||||
|
get_bridge_api(source, BridgeType, BridgeName).
|
||||||
|
|
||||||
get_bridge_api(BridgeType, BridgeName) ->
|
get_bridge_api(BridgeType, BridgeName) ->
|
||||||
|
get_bridge_api(action, BridgeType, BridgeName).
|
||||||
|
|
||||||
|
get_bridge_api(BridgeKind, BridgeType, BridgeName) ->
|
||||||
BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
|
BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
|
||||||
Params = [],
|
Params = [],
|
||||||
Path = emqx_mgmt_api_test_util:api_path(["actions", BridgeId]),
|
Root =
|
||||||
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
case BridgeKind of
|
||||||
Opts = #{return_all => true},
|
source -> "sources";
|
||||||
ct:pal("get bridge ~p (via http)", [{BridgeType, BridgeName}]),
|
action -> "actions"
|
||||||
Res =
|
|
||||||
case emqx_mgmt_api_test_util:request_api(get, Path, "", AuthHeader, Params, Opts) of
|
|
||||||
{ok, {Status, Headers, Body0}} ->
|
|
||||||
{ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}};
|
|
||||||
Error ->
|
|
||||||
Error
|
|
||||||
end,
|
end,
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path([Root, BridgeId]),
|
||||||
|
ct:pal("get bridge ~p (via http)", [{BridgeKind, BridgeType, BridgeName}]),
|
||||||
|
Res = request(get, Path, Params),
|
||||||
ct:pal("get bridge ~p result: ~p", [{BridgeType, BridgeName}, Res]),
|
ct:pal("get bridge ~p result: ~p", [{BridgeType, BridgeName}, Res]),
|
||||||
Res.
|
Res.
|
||||||
|
|
||||||
|
@ -672,7 +688,8 @@ t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) ->
|
||||||
end,
|
end,
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%% - `ProduceFn': produces a message in the remote system that shall be consumed.
|
%% - `ProduceFn': produces a message in the remote system that shall be consumed. May be
|
||||||
|
%% a `{function(), integer()}' tuple.
|
||||||
%% - `Tracepoint': marks the end of consumed message processing.
|
%% - `Tracepoint': marks the end of consumed message processing.
|
||||||
t_consume(Config, Opts) ->
|
t_consume(Config, Opts) ->
|
||||||
#{
|
#{
|
||||||
|
@ -683,14 +700,17 @@ t_consume(Config, Opts) ->
|
||||||
} = Opts,
|
} = Opts,
|
||||||
?check_trace(
|
?check_trace(
|
||||||
begin
|
begin
|
||||||
?assertMatch(
|
ConsumerReadyTimeout = maps:get(consumer_ready_timeout, Opts, 15_000),
|
||||||
{{ok, _}, {ok, _}},
|
case ConsumerReadyTPFn of
|
||||||
snabbkaffe:wait_async_action(
|
{Predicate, NEvents} when is_function(Predicate) ->
|
||||||
fun() -> create_bridge_api(Config) end,
|
{ok, SRef0} = snabbkaffe:subscribe(Predicate, NEvents, ConsumerReadyTimeout);
|
||||||
ConsumerReadyTPFn,
|
Predicate when is_function(Predicate) ->
|
||||||
15_000
|
{ok, SRef0} = snabbkaffe:subscribe(
|
||||||
)
|
Predicate, _NEvents = 1, ConsumerReadyTimeout
|
||||||
),
|
)
|
||||||
|
end,
|
||||||
|
?assertMatch({ok, _}, create_bridge_api(Config)),
|
||||||
|
?assertMatch({ok, _}, snabbkaffe:receive_events(SRef0)),
|
||||||
ok = add_source_hookpoint(Config),
|
ok = add_source_hookpoint(Config),
|
||||||
?retry(
|
?retry(
|
||||||
_Sleep = 200,
|
_Sleep = 200,
|
||||||
|
|
|
@ -12,7 +12,12 @@
|
||||||
brod,
|
brod,
|
||||||
brod_gssapi
|
brod_gssapi
|
||||||
]},
|
]},
|
||||||
{env, [{emqx_action_info_modules, [emqx_bridge_kafka_action_info]}]},
|
{env, [
|
||||||
|
{emqx_action_info_modules, [
|
||||||
|
emqx_bridge_kafka_action_info,
|
||||||
|
emqx_bridge_kafka_consumer_action_info
|
||||||
|
]}
|
||||||
|
]},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
|
|
||||||
{links, []}
|
{links, []}
|
||||||
|
|
|
@ -0,0 +1,105 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_bridge_kafka_consumer_action_info).
|
||||||
|
|
||||||
|
-behaviour(emqx_action_info).
|
||||||
|
|
||||||
|
-export([
|
||||||
|
is_source/0,
|
||||||
|
is_action/0,
|
||||||
|
bridge_v1_type_name/0,
|
||||||
|
action_type_name/0,
|
||||||
|
connector_type_name/0,
|
||||||
|
schema_module/0,
|
||||||
|
connector_action_config_to_bridge_v1_config/2,
|
||||||
|
bridge_v1_config_to_action_config/2
|
||||||
|
]).
|
||||||
|
|
||||||
|
is_source() -> true.
|
||||||
|
|
||||||
|
is_action() -> false.
|
||||||
|
|
||||||
|
bridge_v1_type_name() -> kafka_consumer.
|
||||||
|
|
||||||
|
action_type_name() -> kafka_consumer.
|
||||||
|
|
||||||
|
connector_type_name() -> kafka_consumer.
|
||||||
|
|
||||||
|
schema_module() -> emqx_bridge_kafka_consumer_schema.
|
||||||
|
|
||||||
|
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
|
||||||
|
V1Config1 = maps:remove(<<"connector">>, ActionConfig),
|
||||||
|
V1Config2 = emqx_utils_maps:deep_merge(ConnectorConfig, V1Config1),
|
||||||
|
V1Config3 = maybe_fabricate_topic_mapping(V1Config2),
|
||||||
|
{Params1, V1Config4} = maps:take(<<"parameters">>, V1Config3),
|
||||||
|
TopLevelCfgKeys = [to_bin(K) || {K, _} <- emqx_bridge_kafka:fields(consumer_opts), K =/= kafka],
|
||||||
|
TopLevelCfg = maps:with(TopLevelCfgKeys, Params1),
|
||||||
|
%% `topic' is v2-only
|
||||||
|
Params = maps:without([<<"topic">> | TopLevelCfgKeys], Params1),
|
||||||
|
V1Config5 = emqx_utils_maps:deep_merge(V1Config4, TopLevelCfg),
|
||||||
|
V1Config = emqx_utils_maps:update_if_present(
|
||||||
|
<<"resource_opts">>,
|
||||||
|
%% Slightly different from default source resource opts...
|
||||||
|
fun(RO) -> maps:with(v1_fields(connector_resource_opts), RO) end,
|
||||||
|
V1Config5
|
||||||
|
),
|
||||||
|
maps:put(<<"kafka">>, Params, V1Config).
|
||||||
|
|
||||||
|
bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) ->
|
||||||
|
Config0 = emqx_action_info:transform_bridge_v1_config_to_action_config(
|
||||||
|
BridgeV1Conf, ConnectorName, schema_module(), source_parameters
|
||||||
|
),
|
||||||
|
TopicMapping = maps:get(<<"topic_mapping">>, BridgeV1Conf, []),
|
||||||
|
Params0 = maps:get(<<"kafka">>, BridgeV1Conf, #{}),
|
||||||
|
Params1 = maps:with(source_parameters_field_keys(), Params0),
|
||||||
|
Params2 = emqx_utils_maps:put_if(
|
||||||
|
Params1, <<"topic_mapping">>, TopicMapping, TopicMapping =/= []
|
||||||
|
),
|
||||||
|
Params = maybe_set_kafka_topic(Params2),
|
||||||
|
{source, action_type_name(), maps:put(<<"parameters">>, Params, Config0)}.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------------------
|
||||||
|
%% Internal helper functions
|
||||||
|
%%------------------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% The new schema has a single kafka topic, so we take it from topic mapping when
|
||||||
|
%% converting from v1.
|
||||||
|
maybe_set_kafka_topic(#{<<"topic_mapping">> := [#{<<"kafka_topic">> := Topic} | _]} = Params) ->
|
||||||
|
Params#{<<"topic">> => Topic};
|
||||||
|
maybe_set_kafka_topic(Params) ->
|
||||||
|
Params.
|
||||||
|
|
||||||
|
%% The old schema requires `topic_mapping', which is now hidden.
|
||||||
|
maybe_fabricate_topic_mapping(#{<<"parameters">> := Params0} = BridgeV1Config0) ->
|
||||||
|
#{<<"topic">> := Topic} = Params0,
|
||||||
|
case maps:get(<<"topic_mapping">>, Params0, undefined) of
|
||||||
|
[_ | _] ->
|
||||||
|
BridgeV1Config0;
|
||||||
|
_ ->
|
||||||
|
%% Have to fabricate an MQTT topic, unfortunately... QoS and payload already
|
||||||
|
%% have defaults.
|
||||||
|
FakeTopicMapping = #{
|
||||||
|
<<"kafka_topic">> => Topic,
|
||||||
|
<<"mqtt_topic">> => <<>>
|
||||||
|
},
|
||||||
|
Params = Params0#{<<"topic_mapping">> => [FakeTopicMapping]},
|
||||||
|
BridgeV1Config0#{<<"parameters">> := Params}
|
||||||
|
end.
|
||||||
|
|
||||||
|
v1_fields(StructName) ->
|
||||||
|
[
|
||||||
|
to_bin(K)
|
||||||
|
|| {K, _} <- emqx_bridge_kafka:fields(StructName)
|
||||||
|
].
|
||||||
|
|
||||||
|
source_parameters_field_keys() ->
|
||||||
|
[
|
||||||
|
to_bin(K)
|
||||||
|
|| {K, _} <- emqx_bridge_kafka_consumer_schema:fields(source_parameters)
|
||||||
|
].
|
||||||
|
|
||||||
|
to_bin(B) when is_binary(B) -> B;
|
||||||
|
to_bin(L) when is_list(L) -> list_to_binary(L);
|
||||||
|
to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8).
|
|
@ -0,0 +1,233 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
-module(emqx_bridge_kafka_consumer_schema).
|
||||||
|
|
||||||
|
-include_lib("typerefl/include/types.hrl").
|
||||||
|
-include_lib("hocon/include/hoconsc.hrl").
|
||||||
|
|
||||||
|
-import(hoconsc, [mk/2, enum/1, ref/2]).
|
||||||
|
|
||||||
|
-export([
|
||||||
|
source_examples/1,
|
||||||
|
connector_examples/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export([
|
||||||
|
namespace/0,
|
||||||
|
roots/0,
|
||||||
|
fields/1,
|
||||||
|
desc/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
-define(CONNECTOR_TYPE, kafka_consumer).
|
||||||
|
-define(SOURCE_TYPE, kafka_consumer).
|
||||||
|
|
||||||
|
%%-------------------------------------------------------------------------------------------------
|
||||||
|
%% `hocon_schema' API
|
||||||
|
%%-------------------------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
namespace() -> "kafka_consumer".
|
||||||
|
|
||||||
|
roots() -> [].
|
||||||
|
|
||||||
|
%%=========================================
|
||||||
|
%% Source fields
|
||||||
|
%%=========================================
|
||||||
|
fields(source) ->
|
||||||
|
{kafka_consumer,
|
||||||
|
mk(
|
||||||
|
hoconsc:map(name, ref(?MODULE, consumer_source)),
|
||||||
|
#{
|
||||||
|
desc => <<"Kafka Consumer Source Config">>,
|
||||||
|
required => false
|
||||||
|
}
|
||||||
|
)};
|
||||||
|
fields(consumer_source) ->
|
||||||
|
emqx_bridge_v2_schema:make_consumer_action_schema(
|
||||||
|
mk(
|
||||||
|
ref(?MODULE, source_parameters),
|
||||||
|
#{
|
||||||
|
required => true,
|
||||||
|
desc => ?DESC(consumer_source)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
);
|
||||||
|
fields(source_parameters) ->
|
||||||
|
Fields0 = emqx_bridge_kafka:fields(consumer_kafka_opts),
|
||||||
|
Fields1 = emqx_bridge_kafka:fields(consumer_opts),
|
||||||
|
Fields2 = proplists:delete(kafka, Fields1),
|
||||||
|
Fields = lists:map(
|
||||||
|
fun
|
||||||
|
({topic_mapping = Name, Sc}) ->
|
||||||
|
%% to please dialyzer...
|
||||||
|
Override = #{
|
||||||
|
type => hocon_schema:field_schema(Sc, type),
|
||||||
|
required => false,
|
||||||
|
default => [],
|
||||||
|
validator => fun(_) -> ok end,
|
||||||
|
importance => ?IMPORTANCE_HIDDEN
|
||||||
|
},
|
||||||
|
{Name, hocon_schema:override(Sc, Override)};
|
||||||
|
(FieldSchema) ->
|
||||||
|
FieldSchema
|
||||||
|
end,
|
||||||
|
Fields0 ++ Fields2
|
||||||
|
),
|
||||||
|
[
|
||||||
|
{topic,
|
||||||
|
mk(
|
||||||
|
binary(),
|
||||||
|
#{
|
||||||
|
required => true,
|
||||||
|
desc => ?DESC(emqx_bridge_kafka, consumer_kafka_topic)
|
||||||
|
}
|
||||||
|
)}
|
||||||
|
| Fields
|
||||||
|
];
|
||||||
|
%%=========================================
|
||||||
|
%% HTTP API fields: source
|
||||||
|
%%=========================================
|
||||||
|
fields(Field) when
|
||||||
|
Field == "get_source";
|
||||||
|
Field == "post_source";
|
||||||
|
Field == "put_source"
|
||||||
|
->
|
||||||
|
emqx_bridge_v2_schema:api_fields(Field, ?SOURCE_TYPE, fields(consumer_source));
|
||||||
|
%%=========================================
|
||||||
|
%% Connector fields
|
||||||
|
%%=========================================
|
||||||
|
fields("config_connector") ->
|
||||||
|
emqx_connector_schema:common_fields() ++
|
||||||
|
emqx_bridge_kafka:kafka_connector_config_fields();
|
||||||
|
%%=========================================
|
||||||
|
%% HTTP API fields: connector
|
||||||
|
%%=========================================
|
||||||
|
fields(Field) when
|
||||||
|
Field == "get_connector";
|
||||||
|
Field == "put_connector";
|
||||||
|
Field == "post_connector"
|
||||||
|
->
|
||||||
|
emqx_connector_schema:api_fields(
|
||||||
|
Field,
|
||||||
|
?CONNECTOR_TYPE,
|
||||||
|
emqx_bridge_kafka:kafka_connector_config_fields()
|
||||||
|
).
|
||||||
|
|
||||||
|
desc("config_connector") ->
|
||||||
|
?DESC("config_connector");
|
||||||
|
desc(source_parameters) ->
|
||||||
|
?DESC(source_parameters);
|
||||||
|
desc(consumer_source) ->
|
||||||
|
?DESC(consumer_source);
|
||||||
|
desc(connector_resource_opts) ->
|
||||||
|
?DESC(emqx_resource_schema, "resource_opts");
|
||||||
|
desc(source_resource_opts) ->
|
||||||
|
?DESC(emqx_resource_schema, "resource_opts");
|
||||||
|
desc(Field) when
|
||||||
|
Field =:= "get_connector";
|
||||||
|
Field =:= "put_connector";
|
||||||
|
Field =:= "post_connector"
|
||||||
|
->
|
||||||
|
"Configuration for Kafka Consumer Connector.";
|
||||||
|
desc(Field) when
|
||||||
|
Field =:= "get_source";
|
||||||
|
Field =:= "put_source";
|
||||||
|
Field =:= "post_source"
|
||||||
|
->
|
||||||
|
"Configuration for Kafka Consumer Source.";
|
||||||
|
desc(Name) ->
|
||||||
|
throw({missing_desc, ?MODULE, Name}).
|
||||||
|
|
||||||
|
%%-------------------------------------------------------------------------------------------------
|
||||||
|
%% `emqx_bridge_v2_schema' "unofficial" API
|
||||||
|
%%-------------------------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
source_examples(Method) ->
|
||||||
|
[
|
||||||
|
#{
|
||||||
|
<<"kafka_consumer">> => #{
|
||||||
|
summary => <<"Kafka Consumer Source">>,
|
||||||
|
value => source_example(Method)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
].
|
||||||
|
|
||||||
|
connector_examples(Method) ->
|
||||||
|
[
|
||||||
|
#{
|
||||||
|
<<"kafka_consumer">> => #{
|
||||||
|
summary => <<"Kafka Consumer Connector">>,
|
||||||
|
value => connector_example(Method)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
].
|
||||||
|
|
||||||
|
%%-------------------------------------------------------------------------------------------------
|
||||||
|
%% Helper fns
|
||||||
|
%%-------------------------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
source_example(post) ->
|
||||||
|
maps:merge(
|
||||||
|
source_example(put),
|
||||||
|
#{
|
||||||
|
type => <<"kafka_consumer">>,
|
||||||
|
name => <<"my_source">>
|
||||||
|
}
|
||||||
|
);
|
||||||
|
source_example(get) ->
|
||||||
|
maps:merge(
|
||||||
|
source_example(put),
|
||||||
|
#{
|
||||||
|
status => <<"connected">>,
|
||||||
|
node_status => [
|
||||||
|
#{
|
||||||
|
node => <<"emqx@localhost">>,
|
||||||
|
status => <<"connected">>
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
);
|
||||||
|
source_example(put) ->
|
||||||
|
#{
|
||||||
|
parameters =>
|
||||||
|
#{
|
||||||
|
topic => <<"mytopic">>
|
||||||
|
},
|
||||||
|
resource_opts =>
|
||||||
|
#{
|
||||||
|
health_check_interval => <<"30s">>
|
||||||
|
}
|
||||||
|
}.
|
||||||
|
|
||||||
|
connector_example(get) ->
|
||||||
|
maps:merge(
|
||||||
|
connector_example(post),
|
||||||
|
#{
|
||||||
|
status => <<"connected">>,
|
||||||
|
node_status => [
|
||||||
|
#{
|
||||||
|
node => <<"emqx@localhost">>,
|
||||||
|
status => <<"connected">>
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
);
|
||||||
|
connector_example(post) ->
|
||||||
|
maps:merge(
|
||||||
|
connector_example(put),
|
||||||
|
#{
|
||||||
|
type => <<"kafka_consumer">>,
|
||||||
|
name => <<"my_connector">>
|
||||||
|
}
|
||||||
|
);
|
||||||
|
connector_example(put) ->
|
||||||
|
#{
|
||||||
|
bootstrap_hosts => <<"kafka.emqx.net:9092">>,
|
||||||
|
resource_opts =>
|
||||||
|
#{
|
||||||
|
start_after_created => true,
|
||||||
|
health_check_interval => <<"30s">>,
|
||||||
|
start_timeout => <<"5s">>
|
||||||
|
}
|
||||||
|
}.
|
|
@ -11,7 +11,12 @@
|
||||||
query_mode/1,
|
query_mode/1,
|
||||||
on_start/2,
|
on_start/2,
|
||||||
on_stop/2,
|
on_stop/2,
|
||||||
on_get_status/2
|
on_get_status/2,
|
||||||
|
|
||||||
|
on_add_channel/4,
|
||||||
|
on_remove_channel/3,
|
||||||
|
on_get_channels/1,
|
||||||
|
on_get_channel_status/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% `brod_group_consumer' API
|
%% `brod_group_consumer' API
|
||||||
|
@ -30,45 +35,57 @@
|
||||||
-include_lib("brod/include/brod.hrl").
|
-include_lib("brod/include/brod.hrl").
|
||||||
-include_lib("emqx_resource/include/emqx_resource.hrl").
|
-include_lib("emqx_resource/include/emqx_resource.hrl").
|
||||||
|
|
||||||
-type config() :: #{
|
-type connector_config() :: #{
|
||||||
authentication := term(),
|
authentication := term(),
|
||||||
bootstrap_hosts := binary(),
|
bootstrap_hosts := binary(),
|
||||||
bridge_name := atom(),
|
connector_name := atom() | binary(),
|
||||||
kafka := #{
|
connector_type := atom() | binary(),
|
||||||
max_batch_bytes := emqx_schema:bytesize(),
|
socket_opts := _,
|
||||||
max_rejoin_attempts := non_neg_integer(),
|
|
||||||
offset_commit_interval_seconds := pos_integer(),
|
|
||||||
offset_reset_policy := offset_reset_policy(),
|
|
||||||
topic := binary()
|
|
||||||
},
|
|
||||||
topic_mapping := nonempty_list(
|
|
||||||
#{
|
|
||||||
kafka_topic := kafka_topic(),
|
|
||||||
mqtt_topic := emqx_types:topic(),
|
|
||||||
qos := emqx_types:qos(),
|
|
||||||
payload_template := string()
|
|
||||||
}
|
|
||||||
),
|
|
||||||
ssl := _,
|
ssl := _,
|
||||||
any() => term()
|
any() => term()
|
||||||
}.
|
}.
|
||||||
|
-type source_config() :: #{
|
||||||
|
bridge_name := atom(),
|
||||||
|
hookpoints := [binary()],
|
||||||
|
parameters := source_parameters()
|
||||||
|
}.
|
||||||
|
-type source_parameters() :: #{
|
||||||
|
key_encoding_mode := encoding_mode(),
|
||||||
|
max_batch_bytes := emqx_schema:bytesize(),
|
||||||
|
max_rejoin_attempts := non_neg_integer(),
|
||||||
|
offset_commit_interval_seconds := pos_integer(),
|
||||||
|
offset_reset_policy := offset_reset_policy(),
|
||||||
|
topic := kafka_topic(),
|
||||||
|
value_encoding_mode := encoding_mode(),
|
||||||
|
topic_mapping => [one_topic_mapping()]
|
||||||
|
}.
|
||||||
|
-type one_topic_mapping() :: #{
|
||||||
|
kafka_topic => kafka_topic(),
|
||||||
|
mqtt_topic => emqx_types:topic(),
|
||||||
|
qos => emqx_types:qos(),
|
||||||
|
payload_template => string()
|
||||||
|
}.
|
||||||
-type subscriber_id() :: emqx_bridge_kafka_consumer_sup:child_id().
|
-type subscriber_id() :: emqx_bridge_kafka_consumer_sup:child_id().
|
||||||
-type kafka_topic() :: brod:topic().
|
-type kafka_topic() :: brod:topic().
|
||||||
-type kafka_message() :: #kafka_message{}.
|
-type kafka_message() :: #kafka_message{}.
|
||||||
-type state() :: #{
|
-type connector_state() :: #{
|
||||||
kafka_topics := nonempty_list(kafka_topic()),
|
kafka_client_id := brod:client_id(),
|
||||||
|
installed_sources := #{source_resource_id() => source_state()}
|
||||||
|
}.
|
||||||
|
-type source_state() :: #{
|
||||||
subscriber_id := subscriber_id(),
|
subscriber_id := subscriber_id(),
|
||||||
kafka_client_id := brod:client_id()
|
kafka_client_id := brod:client_id(),
|
||||||
|
kafka_topics := [kafka_topic()]
|
||||||
}.
|
}.
|
||||||
-type offset_reset_policy() :: latest | earliest.
|
-type offset_reset_policy() :: latest | earliest.
|
||||||
-type encoding_mode() :: none | base64.
|
-type encoding_mode() :: none | base64.
|
||||||
-type consumer_init_data() :: #{
|
-type consumer_init_data() :: #{
|
||||||
hookpoint := binary(),
|
hookpoints := [binary()],
|
||||||
key_encoding_mode := encoding_mode(),
|
key_encoding_mode := encoding_mode(),
|
||||||
resource_id := resource_id(),
|
resource_id := source_resource_id(),
|
||||||
topic_mapping := #{
|
topic_mapping := #{
|
||||||
kafka_topic() := #{
|
kafka_topic() := #{
|
||||||
payload_template := emqx_placeholder:tmpl_token(),
|
payload_template => emqx_placeholder:tmpl_token(),
|
||||||
mqtt_topic_template => emqx_placeholder:tmpl_token(),
|
mqtt_topic_template => emqx_placeholder:tmpl_token(),
|
||||||
qos => emqx_types:qos()
|
qos => emqx_types:qos()
|
||||||
}
|
}
|
||||||
|
@ -76,13 +93,13 @@
|
||||||
value_encoding_mode := encoding_mode()
|
value_encoding_mode := encoding_mode()
|
||||||
}.
|
}.
|
||||||
-type consumer_state() :: #{
|
-type consumer_state() :: #{
|
||||||
hookpoint := binary(),
|
hookpoints := [binary()],
|
||||||
kafka_topic := binary(),
|
kafka_topic := kafka_topic(),
|
||||||
key_encoding_mode := encoding_mode(),
|
key_encoding_mode := encoding_mode(),
|
||||||
resource_id := resource_id(),
|
resource_id := source_resource_id(),
|
||||||
topic_mapping := #{
|
topic_mapping := #{
|
||||||
kafka_topic() := #{
|
kafka_topic() := #{
|
||||||
payload_template := emqx_placeholder:tmpl_token(),
|
payload_template => emqx_placeholder:tmpl_token(),
|
||||||
mqtt_topic_template => emqx_placeholder:tmpl_token(),
|
mqtt_topic_template => emqx_placeholder:tmpl_token(),
|
||||||
qos => emqx_types:qos()
|
qos => emqx_types:qos()
|
||||||
}
|
}
|
||||||
|
@ -90,7 +107,7 @@
|
||||||
value_encoding_mode := encoding_mode()
|
value_encoding_mode := encoding_mode()
|
||||||
}.
|
}.
|
||||||
-type subscriber_init_info() :: #{
|
-type subscriber_init_info() :: #{
|
||||||
topic => brod:topic(),
|
topic := brod:topic(),
|
||||||
parition => brod:partition(),
|
parition => brod:partition(),
|
||||||
group_id => brod:group_id(),
|
group_id => brod:group_id(),
|
||||||
commit_fun => brod_group_subscriber_v2:commit_fun()
|
commit_fun => brod_group_subscriber_v2:commit_fun()
|
||||||
|
@ -116,27 +133,19 @@ callback_mode() ->
|
||||||
query_mode(_Config) ->
|
query_mode(_Config) ->
|
||||||
no_queries.
|
no_queries.
|
||||||
|
|
||||||
-spec on_start(resource_id(), config()) -> {ok, state()}.
|
-spec on_start(connector_resource_id(), connector_config()) -> {ok, connector_state()}.
|
||||||
on_start(ResourceId, Config) ->
|
on_start(ConnectorResId, Config) ->
|
||||||
#{
|
#{
|
||||||
authentication := Auth,
|
authentication := Auth,
|
||||||
bootstrap_hosts := BootstrapHosts0,
|
bootstrap_hosts := BootstrapHosts0,
|
||||||
bridge_type := BridgeType,
|
connector_type := ConnectorType,
|
||||||
bridge_name := BridgeName,
|
connector_name := ConnectorName,
|
||||||
hookpoint := _,
|
|
||||||
kafka := #{
|
|
||||||
max_batch_bytes := _,
|
|
||||||
max_rejoin_attempts := _,
|
|
||||||
offset_commit_interval_seconds := _,
|
|
||||||
offset_reset_policy := _
|
|
||||||
},
|
|
||||||
socket_opts := SocketOpts0,
|
socket_opts := SocketOpts0,
|
||||||
ssl := SSL,
|
ssl := SSL
|
||||||
topic_mapping := _
|
|
||||||
} = Config,
|
} = Config,
|
||||||
BootstrapHosts = emqx_bridge_kafka_impl:hosts(BootstrapHosts0),
|
BootstrapHosts = emqx_bridge_kafka_impl:hosts(BootstrapHosts0),
|
||||||
%% Note: this is distinct per node.
|
%% Note: this is distinct per node.
|
||||||
ClientID = make_client_id(ResourceId, BridgeType, BridgeName),
|
ClientID = make_client_id(ConnectorResId, ConnectorType, ConnectorName),
|
||||||
ClientOpts0 =
|
ClientOpts0 =
|
||||||
case Auth of
|
case Auth of
|
||||||
none -> [];
|
none -> [];
|
||||||
|
@ -145,67 +154,146 @@ on_start(ResourceId, Config) ->
|
||||||
ClientOpts = add_ssl_opts(ClientOpts0, SSL),
|
ClientOpts = add_ssl_opts(ClientOpts0, SSL),
|
||||||
SocketOpts = emqx_bridge_kafka_impl:socket_opts(SocketOpts0),
|
SocketOpts = emqx_bridge_kafka_impl:socket_opts(SocketOpts0),
|
||||||
ClientOpts1 = [{extra_sock_opts, SocketOpts} | ClientOpts],
|
ClientOpts1 = [{extra_sock_opts, SocketOpts} | ClientOpts],
|
||||||
ok = emqx_resource:allocate_resource(ResourceId, ?kafka_client_id, ClientID),
|
ok = emqx_resource:allocate_resource(ConnectorResId, ?kafka_client_id, ClientID),
|
||||||
case brod:start_client(BootstrapHosts, ClientID, ClientOpts1) of
|
case brod:start_client(BootstrapHosts, ClientID, ClientOpts1) of
|
||||||
ok ->
|
ok ->
|
||||||
?tp(
|
?tp(
|
||||||
kafka_consumer_client_started,
|
kafka_consumer_client_started,
|
||||||
#{client_id => ClientID, resource_id => ResourceId}
|
#{client_id => ClientID, resource_id => ConnectorResId}
|
||||||
),
|
),
|
||||||
?SLOG(info, #{
|
?SLOG(info, #{
|
||||||
msg => "kafka_consumer_client_started",
|
msg => "kafka_consumer_client_started",
|
||||||
resource_id => ResourceId,
|
resource_id => ConnectorResId,
|
||||||
kafka_hosts => BootstrapHosts
|
kafka_hosts => BootstrapHosts
|
||||||
});
|
});
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{
|
||||||
msg => "failed_to_start_kafka_consumer_client",
|
msg => "failed_to_start_kafka_consumer_client",
|
||||||
resource_id => ResourceId,
|
resource_id => ConnectorResId,
|
||||||
kafka_hosts => BootstrapHosts,
|
kafka_hosts => BootstrapHosts,
|
||||||
reason => emqx_utils:redact(Reason)
|
reason => emqx_utils:redact(Reason)
|
||||||
}),
|
}),
|
||||||
throw(?CLIENT_DOWN_MESSAGE)
|
throw(?CLIENT_DOWN_MESSAGE)
|
||||||
end,
|
end,
|
||||||
start_consumer(Config, ResourceId, ClientID).
|
{ok, #{
|
||||||
|
kafka_client_id => ClientID,
|
||||||
|
installed_sources => #{}
|
||||||
|
}}.
|
||||||
|
|
||||||
-spec on_stop(resource_id(), state()) -> ok.
|
-spec on_stop(connector_resource_id(), connector_state()) -> ok.
|
||||||
on_stop(ResourceId, _State = undefined) ->
|
on_stop(ConnectorResId, _State = undefined) ->
|
||||||
case emqx_resource:get_allocated_resources(ResourceId) of
|
SubscribersStopped =
|
||||||
#{?kafka_client_id := ClientID, ?kafka_subscriber_id := SubscriberId} ->
|
maps:fold(
|
||||||
stop_subscriber(SubscriberId),
|
fun
|
||||||
stop_client(ClientID),
|
(?kafka_client_id, ClientID, Acc) ->
|
||||||
|
stop_client(ClientID),
|
||||||
|
Acc;
|
||||||
|
({?kafka_subscriber_id, _SourceResId}, SubscriberId, Acc) ->
|
||||||
|
stop_subscriber(SubscriberId),
|
||||||
|
Acc + 1
|
||||||
|
end,
|
||||||
|
0,
|
||||||
|
emqx_resource:get_allocated_resources(ConnectorResId)
|
||||||
|
),
|
||||||
|
case SubscribersStopped > 0 of
|
||||||
|
true ->
|
||||||
?tp(kafka_consumer_subcriber_and_client_stopped, #{}),
|
?tp(kafka_consumer_subcriber_and_client_stopped, #{}),
|
||||||
ok;
|
ok;
|
||||||
#{?kafka_client_id := ClientID} ->
|
false ->
|
||||||
stop_client(ClientID),
|
|
||||||
?tp(kafka_consumer_just_client_stopped, #{}),
|
?tp(kafka_consumer_just_client_stopped, #{}),
|
||||||
ok;
|
|
||||||
_ ->
|
|
||||||
ok
|
ok
|
||||||
end;
|
end;
|
||||||
on_stop(_ResourceId, State) ->
|
on_stop(ConnectorResId, State) ->
|
||||||
#{
|
#{
|
||||||
subscriber_id := SubscriberId,
|
installed_sources := InstalledSources,
|
||||||
kafka_client_id := ClientID
|
kafka_client_id := ClientID
|
||||||
} = State,
|
} = State,
|
||||||
stop_subscriber(SubscriberId),
|
maps:foreach(
|
||||||
|
fun(_SourceResId, #{subscriber_id := SubscriberId}) ->
|
||||||
|
stop_subscriber(SubscriberId)
|
||||||
|
end,
|
||||||
|
InstalledSources
|
||||||
|
),
|
||||||
stop_client(ClientID),
|
stop_client(ClientID),
|
||||||
|
?tp(kafka_consumer_subcriber_and_client_stopped, #{instance_id => ConnectorResId}),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
-spec on_get_status(resource_id(), state()) -> connected | disconnected.
|
-spec on_get_status(connector_resource_id(), connector_state()) ->
|
||||||
on_get_status(_ResourceID, State) ->
|
?status_connected | ?status_disconnected.
|
||||||
|
on_get_status(_ConnectorResId, _State = #{kafka_client_id := ClientID}) ->
|
||||||
|
case brod_sup:find_client(ClientID) of
|
||||||
|
[_Pid] -> ?status_connected;
|
||||||
|
_ -> ?status_disconnected
|
||||||
|
end;
|
||||||
|
on_get_status(_ConnectorResId, _State) ->
|
||||||
|
?status_disconnected.
|
||||||
|
|
||||||
|
-spec on_add_channel(
|
||||||
|
connector_resource_id(),
|
||||||
|
connector_state(),
|
||||||
|
source_resource_id(),
|
||||||
|
source_config()
|
||||||
|
) ->
|
||||||
|
{ok, connector_state()}.
|
||||||
|
on_add_channel(ConnectorResId, ConnectorState0, SourceResId, SourceConfig) ->
|
||||||
#{
|
#{
|
||||||
subscriber_id := SubscriberId,
|
|
||||||
kafka_client_id := ClientID,
|
kafka_client_id := ClientID,
|
||||||
kafka_topics := KafkaTopics
|
installed_sources := InstalledSources0
|
||||||
} = State,
|
} = ConnectorState0,
|
||||||
case do_get_status(ClientID, KafkaTopics, SubscriberId) of
|
case start_consumer(SourceConfig, ConnectorResId, SourceResId, ClientID) of
|
||||||
{disconnected, Message} ->
|
{ok, SourceState} ->
|
||||||
{disconnected, State, Message};
|
InstalledSources = InstalledSources0#{SourceResId => SourceState},
|
||||||
Res ->
|
ConnectorState = ConnectorState0#{installed_sources := InstalledSources},
|
||||||
Res
|
{ok, ConnectorState};
|
||||||
|
Error = {error, _} ->
|
||||||
|
Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec on_remove_channel(
|
||||||
|
connector_resource_id(),
|
||||||
|
connector_state(),
|
||||||
|
source_resource_id()
|
||||||
|
) ->
|
||||||
|
{ok, connector_state()}.
|
||||||
|
on_remove_channel(ConnectorResId, ConnectorState0, SourceResId) ->
|
||||||
|
#{installed_sources := InstalledSources0} = ConnectorState0,
|
||||||
|
case maps:take(SourceResId, InstalledSources0) of
|
||||||
|
{SourceState, InstalledSources} ->
|
||||||
|
#{subscriber_id := SubscriberId} = SourceState,
|
||||||
|
stop_subscriber(SubscriberId),
|
||||||
|
deallocate_subscriber_id(ConnectorResId, SourceResId),
|
||||||
|
ok;
|
||||||
|
error ->
|
||||||
|
InstalledSources = InstalledSources0
|
||||||
|
end,
|
||||||
|
ConnectorState = ConnectorState0#{installed_sources := InstalledSources},
|
||||||
|
{ok, ConnectorState}.
|
||||||
|
|
||||||
|
-spec on_get_channels(connector_resource_id()) ->
|
||||||
|
[{action_resource_id(), source_config()}].
|
||||||
|
on_get_channels(ConnectorResId) ->
|
||||||
|
emqx_bridge_v2:get_channels_for_connector(ConnectorResId).
|
||||||
|
|
||||||
|
-spec on_get_channel_status(
|
||||||
|
connector_resource_id(),
|
||||||
|
source_resource_id(),
|
||||||
|
connector_state()
|
||||||
|
) ->
|
||||||
|
?status_connected | ?status_disconnected.
|
||||||
|
on_get_channel_status(
|
||||||
|
_ConnectorResId,
|
||||||
|
SourceResId,
|
||||||
|
ConnectorState = #{installed_sources := InstalledSources}
|
||||||
|
) when is_map_key(SourceResId, InstalledSources) ->
|
||||||
|
#{kafka_client_id := ClientID} = ConnectorState,
|
||||||
|
#{
|
||||||
|
kafka_topics := KafkaTopics,
|
||||||
|
subscriber_id := SubscriberId
|
||||||
|
} = maps:get(SourceResId, InstalledSources),
|
||||||
|
do_get_status(ClientID, KafkaTopics, SubscriberId);
|
||||||
|
on_get_channel_status(_ConnectorResId, _SourceResId, _ConnectorState) ->
|
||||||
|
?status_disconnected.
|
||||||
|
|
||||||
%%-------------------------------------------------------------------------------------
|
%%-------------------------------------------------------------------------------------
|
||||||
%% `brod_group_subscriber' API
|
%% `brod_group_subscriber' API
|
||||||
%%-------------------------------------------------------------------------------------
|
%%-------------------------------------------------------------------------------------
|
||||||
|
@ -227,18 +315,13 @@ handle_message(Message, State) ->
|
||||||
|
|
||||||
do_handle_message(Message, State) ->
|
do_handle_message(Message, State) ->
|
||||||
#{
|
#{
|
||||||
hookpoint := Hookpoint,
|
hookpoints := Hookpoints,
|
||||||
kafka_topic := KafkaTopic,
|
kafka_topic := KafkaTopic,
|
||||||
key_encoding_mode := KeyEncodingMode,
|
key_encoding_mode := KeyEncodingMode,
|
||||||
resource_id := ResourceId,
|
resource_id := SourceResId,
|
||||||
topic_mapping := TopicMapping,
|
topic_mapping := TopicMapping,
|
||||||
value_encoding_mode := ValueEncodingMode
|
value_encoding_mode := ValueEncodingMode
|
||||||
} = State,
|
} = State,
|
||||||
#{
|
|
||||||
mqtt_topic_template := MQTTTopicTemplate,
|
|
||||||
qos := MQTTQoS,
|
|
||||||
payload_template := PayloadTemplate
|
|
||||||
} = maps:get(KafkaTopic, TopicMapping),
|
|
||||||
FullMessage = #{
|
FullMessage = #{
|
||||||
headers => maps:from_list(Message#kafka_message.headers),
|
headers => maps:from_list(Message#kafka_message.headers),
|
||||||
key => encode(Message#kafka_message.key, KeyEncodingMode),
|
key => encode(Message#kafka_message.key, KeyEncodingMode),
|
||||||
|
@ -248,16 +331,31 @@ do_handle_message(Message, State) ->
|
||||||
ts_type => Message#kafka_message.ts_type,
|
ts_type => Message#kafka_message.ts_type,
|
||||||
value => encode(Message#kafka_message.value, ValueEncodingMode)
|
value => encode(Message#kafka_message.value, ValueEncodingMode)
|
||||||
},
|
},
|
||||||
Payload = render(FullMessage, PayloadTemplate),
|
LegacyMQTTConfig = maps:get(KafkaTopic, TopicMapping, #{}),
|
||||||
MQTTTopic = render(FullMessage, MQTTTopicTemplate),
|
legacy_maybe_publish_mqtt_message(LegacyMQTTConfig, SourceResId, FullMessage),
|
||||||
MQTTMessage = emqx_message:make(ResourceId, MQTTQoS, MQTTTopic, Payload),
|
lists:foreach(fun(Hookpoint) -> emqx_hooks:run(Hookpoint, [FullMessage]) end, Hookpoints),
|
||||||
_ = emqx_broker:safe_publish(MQTTMessage),
|
emqx_resource_metrics:received_inc(SourceResId),
|
||||||
emqx_hooks:run(Hookpoint, [FullMessage]),
|
|
||||||
emqx_resource_metrics:received_inc(ResourceId),
|
|
||||||
%% note: just `ack' does not commit the offset to the
|
%% note: just `ack' does not commit the offset to the
|
||||||
%% kafka consumer group.
|
%% kafka consumer group.
|
||||||
{ok, commit, State}.
|
{ok, commit, State}.
|
||||||
|
|
||||||
|
legacy_maybe_publish_mqtt_message(
|
||||||
|
_MQTTConfig = #{
|
||||||
|
payload_template := PayloadTemplate,
|
||||||
|
qos := MQTTQoS,
|
||||||
|
mqtt_topic_template := MQTTTopicTemplate
|
||||||
|
},
|
||||||
|
SourceResId,
|
||||||
|
FullMessage
|
||||||
|
) when MQTTTopicTemplate =/= <<>> ->
|
||||||
|
Payload = render(FullMessage, PayloadTemplate),
|
||||||
|
MQTTTopic = render(FullMessage, MQTTTopicTemplate),
|
||||||
|
MQTTMessage = emqx_message:make(SourceResId, MQTTQoS, MQTTTopic, Payload),
|
||||||
|
_ = emqx_broker:safe_publish(MQTTMessage),
|
||||||
|
ok;
|
||||||
|
legacy_maybe_publish_mqtt_message(_MQTTConfig, _SourceResId, _FullMessage) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
%%-------------------------------------------------------------------------------------
|
%%-------------------------------------------------------------------------------------
|
||||||
%% Helper fns
|
%% Helper fns
|
||||||
%%-------------------------------------------------------------------------------------
|
%%-------------------------------------------------------------------------------------
|
||||||
|
@ -292,28 +390,34 @@ ensure_consumer_supervisor_started() ->
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec start_consumer(config(), resource_id(), brod:client_id()) -> {ok, state()}.
|
-spec start_consumer(
|
||||||
start_consumer(Config, ResourceId, ClientID) ->
|
source_config(),
|
||||||
|
connector_resource_id(),
|
||||||
|
source_resource_id(),
|
||||||
|
brod:client_id()
|
||||||
|
) ->
|
||||||
|
{ok, source_state()} | {error, term()}.
|
||||||
|
start_consumer(Config, ConnectorResId, SourceResId, ClientID) ->
|
||||||
#{
|
#{
|
||||||
bootstrap_hosts := BootstrapHosts0,
|
|
||||||
bridge_name := BridgeName,
|
bridge_name := BridgeName,
|
||||||
hookpoint := Hookpoint,
|
hookpoints := Hookpoints,
|
||||||
kafka := #{
|
parameters := #{
|
||||||
|
key_encoding_mode := KeyEncodingMode,
|
||||||
max_batch_bytes := MaxBatchBytes,
|
max_batch_bytes := MaxBatchBytes,
|
||||||
max_rejoin_attempts := MaxRejoinAttempts,
|
max_rejoin_attempts := MaxRejoinAttempts,
|
||||||
offset_commit_interval_seconds := OffsetCommitInterval,
|
offset_commit_interval_seconds := OffsetCommitInterval,
|
||||||
offset_reset_policy := OffsetResetPolicy0
|
offset_reset_policy := OffsetResetPolicy0,
|
||||||
},
|
topic := _Topic,
|
||||||
key_encoding_mode := KeyEncodingMode,
|
value_encoding_mode := ValueEncodingMode
|
||||||
topic_mapping := TopicMapping0,
|
} = Params0
|
||||||
value_encoding_mode := ValueEncodingMode
|
|
||||||
} = Config,
|
} = Config,
|
||||||
ok = ensure_consumer_supervisor_started(),
|
ok = ensure_consumer_supervisor_started(),
|
||||||
TopicMapping = convert_topic_mapping(TopicMapping0),
|
?tp(kafka_consumer_sup_started, #{}),
|
||||||
|
TopicMapping = ensure_topic_mapping(Params0),
|
||||||
InitialState = #{
|
InitialState = #{
|
||||||
key_encoding_mode => KeyEncodingMode,
|
key_encoding_mode => KeyEncodingMode,
|
||||||
hookpoint => Hookpoint,
|
hookpoints => Hookpoints,
|
||||||
resource_id => ResourceId,
|
resource_id => SourceResId,
|
||||||
topic_mapping => TopicMapping,
|
topic_mapping => TopicMapping,
|
||||||
value_encoding_mode => ValueEncodingMode
|
value_encoding_mode => ValueEncodingMode
|
||||||
},
|
},
|
||||||
|
@ -355,30 +459,38 @@ start_consumer(Config, ResourceId, ClientID) ->
|
||||||
%% automatically, so we should not spawn duplicate workers.
|
%% automatically, so we should not spawn duplicate workers.
|
||||||
SubscriberId = make_subscriber_id(BridgeName),
|
SubscriberId = make_subscriber_id(BridgeName),
|
||||||
?tp(kafka_consumer_about_to_start_subscriber, #{}),
|
?tp(kafka_consumer_about_to_start_subscriber, #{}),
|
||||||
ok = emqx_resource:allocate_resource(ResourceId, ?kafka_subscriber_id, SubscriberId),
|
ok = allocate_subscriber_id(ConnectorResId, SourceResId, SubscriberId),
|
||||||
?tp(kafka_consumer_subscriber_allocated, #{}),
|
?tp(kafka_consumer_subscriber_allocated, #{}),
|
||||||
case emqx_bridge_kafka_consumer_sup:start_child(SubscriberId, GroupSubscriberConfig) of
|
case emqx_bridge_kafka_consumer_sup:start_child(SubscriberId, GroupSubscriberConfig) of
|
||||||
{ok, _ConsumerPid} ->
|
{ok, _ConsumerPid} ->
|
||||||
?tp(
|
?tp(
|
||||||
kafka_consumer_subscriber_started,
|
kafka_consumer_subscriber_started,
|
||||||
#{resource_id => ResourceId, subscriber_id => SubscriberId}
|
#{resource_id => SourceResId, subscriber_id => SubscriberId}
|
||||||
),
|
),
|
||||||
{ok, #{
|
{ok, #{
|
||||||
subscriber_id => SubscriberId,
|
subscriber_id => SubscriberId,
|
||||||
kafka_client_id => ClientID,
|
kafka_client_id => ClientID,
|
||||||
kafka_topics => KafkaTopics
|
kafka_topics => KafkaTopics
|
||||||
}};
|
}};
|
||||||
{error, Reason2} ->
|
{error, Reason} ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{
|
||||||
msg => "failed_to_start_kafka_consumer",
|
msg => "failed_to_start_kafka_consumer",
|
||||||
resource_id => ResourceId,
|
resource_id => SourceResId,
|
||||||
kafka_hosts => emqx_bridge_kafka_impl:hosts(BootstrapHosts0),
|
reason => emqx_utils:redact(Reason)
|
||||||
reason => emqx_utils:redact(Reason2)
|
|
||||||
}),
|
}),
|
||||||
stop_client(ClientID),
|
{error, Reason}
|
||||||
throw(failed_to_start_kafka_consumer)
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% This is to ensure backwards compatibility with the deprectated topic mapping.
|
||||||
|
-spec ensure_topic_mapping(source_parameters()) -> #{kafka_topic() := map()}.
|
||||||
|
ensure_topic_mapping(#{topic_mapping := [_ | _] = TM}) ->
|
||||||
|
%% There is an existing topic mapping: legacy config. We use it and ignore the single
|
||||||
|
%% pubsub topic so that the bridge keeps working as before.
|
||||||
|
convert_topic_mapping(TM);
|
||||||
|
ensure_topic_mapping(#{topic := KafkaTopic}) ->
|
||||||
|
%% No topic mapping: generate one without MQTT templates.
|
||||||
|
#{KafkaTopic => #{}}.
|
||||||
|
|
||||||
-spec stop_subscriber(emqx_bridge_kafka_consumer_sup:child_id()) -> ok.
|
-spec stop_subscriber(emqx_bridge_kafka_consumer_sup:child_id()) -> ok.
|
||||||
stop_subscriber(SubscriberId) ->
|
stop_subscriber(SubscriberId) ->
|
||||||
_ = log_when_error(
|
_ = log_when_error(
|
||||||
|
@ -415,36 +527,38 @@ do_get_status(ClientID, [KafkaTopic | RestTopics], SubscriberId) ->
|
||||||
case brod:get_partitions_count(ClientID, KafkaTopic) of
|
case brod:get_partitions_count(ClientID, KafkaTopic) of
|
||||||
{ok, NPartitions} ->
|
{ok, NPartitions} ->
|
||||||
case do_get_topic_status(ClientID, KafkaTopic, SubscriberId, NPartitions) of
|
case do_get_topic_status(ClientID, KafkaTopic, SubscriberId, NPartitions) of
|
||||||
connected -> do_get_status(ClientID, RestTopics, SubscriberId);
|
?status_connected ->
|
||||||
disconnected -> disconnected
|
do_get_status(ClientID, RestTopics, SubscriberId);
|
||||||
|
?status_disconnected ->
|
||||||
|
?status_disconnected
|
||||||
end;
|
end;
|
||||||
{error, {client_down, Context}} ->
|
{error, {client_down, Context}} ->
|
||||||
case infer_client_error(Context) of
|
case infer_client_error(Context) of
|
||||||
auth_error ->
|
auth_error ->
|
||||||
Message = "Authentication error. " ++ ?CLIENT_DOWN_MESSAGE,
|
Message = "Authentication error. " ++ ?CLIENT_DOWN_MESSAGE,
|
||||||
{disconnected, Message};
|
{?status_disconnected, Message};
|
||||||
{auth_error, Message0} ->
|
{auth_error, Message0} ->
|
||||||
Message = binary_to_list(Message0) ++ "; " ++ ?CLIENT_DOWN_MESSAGE,
|
Message = binary_to_list(Message0) ++ "; " ++ ?CLIENT_DOWN_MESSAGE,
|
||||||
{disconnected, Message};
|
{?status_disconnected, Message};
|
||||||
connection_refused ->
|
connection_refused ->
|
||||||
Message = "Connection refused. " ++ ?CLIENT_DOWN_MESSAGE,
|
Message = "Connection refused. " ++ ?CLIENT_DOWN_MESSAGE,
|
||||||
{disconnected, Message};
|
{?status_disconnected, Message};
|
||||||
_ ->
|
_ ->
|
||||||
{disconnected, ?CLIENT_DOWN_MESSAGE}
|
{?status_disconnected, ?CLIENT_DOWN_MESSAGE}
|
||||||
end;
|
end;
|
||||||
{error, leader_not_available} ->
|
{error, leader_not_available} ->
|
||||||
Message =
|
Message =
|
||||||
"Leader connection not available. Please check the Kafka topic used,"
|
"Leader connection not available. Please check the Kafka topic used,"
|
||||||
" the connection parameters and Kafka cluster health",
|
" the connection parameters and Kafka cluster health",
|
||||||
{disconnected, Message};
|
{?status_disconnected, Message};
|
||||||
_ ->
|
_ ->
|
||||||
disconnected
|
?status_disconnected
|
||||||
end;
|
end;
|
||||||
do_get_status(_ClientID, _KafkaTopics = [], _SubscriberId) ->
|
do_get_status(_ClientID, _KafkaTopics = [], _SubscriberId) ->
|
||||||
connected.
|
?status_connected.
|
||||||
|
|
||||||
-spec do_get_topic_status(brod:client_id(), binary(), subscriber_id(), pos_integer()) ->
|
-spec do_get_topic_status(brod:client_id(), binary(), subscriber_id(), pos_integer()) ->
|
||||||
connected | disconnected.
|
?status_connected | ?status_disconnected.
|
||||||
do_get_topic_status(ClientID, KafkaTopic, SubscriberId, NPartitions) ->
|
do_get_topic_status(ClientID, KafkaTopic, SubscriberId, NPartitions) ->
|
||||||
Results =
|
Results =
|
||||||
lists:map(
|
lists:map(
|
||||||
|
@ -467,9 +581,9 @@ do_get_topic_status(ClientID, KafkaTopic, SubscriberId, NPartitions) ->
|
||||||
WorkersAlive = are_subscriber_workers_alive(SubscriberId),
|
WorkersAlive = are_subscriber_workers_alive(SubscriberId),
|
||||||
case AllLeadersOk andalso WorkersAlive of
|
case AllLeadersOk andalso WorkersAlive of
|
||||||
true ->
|
true ->
|
||||||
connected;
|
?status_connected;
|
||||||
false ->
|
false ->
|
||||||
disconnected
|
?status_disconnected
|
||||||
end.
|
end.
|
||||||
|
|
||||||
are_subscriber_workers_alive(SubscriberId) ->
|
are_subscriber_workers_alive(SubscriberId) ->
|
||||||
|
@ -507,19 +621,19 @@ consumer_group_id(BridgeName0) ->
|
||||||
BridgeName = to_bin(BridgeName0),
|
BridgeName = to_bin(BridgeName0),
|
||||||
<<"emqx-kafka-consumer-", BridgeName/binary>>.
|
<<"emqx-kafka-consumer-", BridgeName/binary>>.
|
||||||
|
|
||||||
-spec is_dry_run(resource_id()) -> boolean().
|
-spec is_dry_run(connector_resource_id()) -> boolean().
|
||||||
is_dry_run(ResourceId) ->
|
is_dry_run(ConnectorResId) ->
|
||||||
TestIdStart = string:find(ResourceId, ?TEST_ID_PREFIX),
|
TestIdStart = string:find(ConnectorResId, ?TEST_ID_PREFIX),
|
||||||
case TestIdStart of
|
case TestIdStart of
|
||||||
nomatch ->
|
nomatch ->
|
||||||
false;
|
false;
|
||||||
_ ->
|
_ ->
|
||||||
string:equal(TestIdStart, ResourceId)
|
string:equal(TestIdStart, ConnectorResId)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec make_client_id(resource_id(), binary(), atom() | binary()) -> atom().
|
-spec make_client_id(connector_resource_id(), binary(), atom() | binary()) -> atom().
|
||||||
make_client_id(ResourceId, BridgeType, BridgeName) ->
|
make_client_id(ConnectorResId, BridgeType, BridgeName) ->
|
||||||
case is_dry_run(ResourceId) of
|
case is_dry_run(ConnectorResId) of
|
||||||
false ->
|
false ->
|
||||||
ClientID0 = emqx_bridge_kafka_impl:make_client_id(BridgeType, BridgeName),
|
ClientID0 = emqx_bridge_kafka_impl:make_client_id(BridgeType, BridgeName),
|
||||||
binary_to_atom(ClientID0);
|
binary_to_atom(ClientID0);
|
||||||
|
@ -583,3 +697,16 @@ infer_client_error(Error) ->
|
||||||
_ ->
|
_ ->
|
||||||
undefined
|
undefined
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
allocate_subscriber_id(ConnectorResId, SourceResId, SubscriberId) ->
|
||||||
|
ok = emqx_resource:allocate_resource(
|
||||||
|
ConnectorResId,
|
||||||
|
{?kafka_subscriber_id, SourceResId},
|
||||||
|
SubscriberId
|
||||||
|
).
|
||||||
|
|
||||||
|
deallocate_subscriber_id(ConnectorResId, SourceResId) ->
|
||||||
|
ok = emqx_resource:deallocate_resource(
|
||||||
|
ConnectorResId,
|
||||||
|
{?kafka_subscriber_id, SourceResId}
|
||||||
|
).
|
||||||
|
|
|
@ -15,6 +15,8 @@
|
||||||
-import(emqx_common_test_helpers, [on_exit/1]).
|
-import(emqx_common_test_helpers, [on_exit/1]).
|
||||||
|
|
||||||
-define(BRIDGE_TYPE_BIN, <<"kafka_consumer">>).
|
-define(BRIDGE_TYPE_BIN, <<"kafka_consumer">>).
|
||||||
|
-define(CONNECTOR_TYPE_BIN, <<"kafka_consumer">>).
|
||||||
|
-define(SOURCE_TYPE_BIN, <<"kafka_consumer">>).
|
||||||
-define(APPS, [emqx_bridge, emqx_resource, emqx_rule_engine, emqx_bridge_kafka]).
|
-define(APPS, [emqx_bridge, emqx_resource, emqx_rule_engine, emqx_bridge_kafka]).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -78,13 +80,29 @@ testcases(once) ->
|
||||||
].
|
].
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
[{bridge_type, <<"kafka_consumer">>} | Config].
|
emqx_common_test_helpers:clear_screen(),
|
||||||
|
Apps = emqx_cth_suite:start(
|
||||||
|
[
|
||||||
|
emqx,
|
||||||
|
emqx_conf,
|
||||||
|
emqx_bridge_kafka,
|
||||||
|
emqx_bridge,
|
||||||
|
emqx_rule_engine,
|
||||||
|
emqx_management,
|
||||||
|
emqx_mgmt_api_test_util:emqx_dashboard()
|
||||||
|
],
|
||||||
|
#{work_dir => emqx_cth_suite:work_dir(Config)}
|
||||||
|
),
|
||||||
|
{ok, _Api} = emqx_common_test_http:create_default_app(),
|
||||||
|
[
|
||||||
|
{apps, Apps},
|
||||||
|
{bridge_type, <<"kafka_consumer">>}
|
||||||
|
| Config
|
||||||
|
].
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(Config) ->
|
||||||
emqx_mgmt_api_test_util:end_suite(),
|
Apps = ?config(apps, Config),
|
||||||
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
|
emqx_cth_suite:stop(Apps),
|
||||||
ok = emqx_connector_test_helpers:stop_apps(lists:reverse(?APPS)),
|
|
||||||
_ = application:stop(emqx_connector),
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
init_per_group(plain = Type, Config) ->
|
init_per_group(plain = Type, Config) ->
|
||||||
|
@ -242,11 +260,6 @@ common_init_per_group() ->
|
||||||
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
|
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
|
||||||
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
|
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
|
||||||
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||||
application:load(emqx_bridge),
|
|
||||||
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
|
|
||||||
ok = emqx_connector_test_helpers:start_apps(?APPS),
|
|
||||||
{ok, _} = application:ensure_all_started(emqx_connector),
|
|
||||||
emqx_mgmt_api_test_util:init_suite(),
|
|
||||||
UniqueNum = integer_to_binary(erlang:unique_integer()),
|
UniqueNum = integer_to_binary(erlang:unique_integer()),
|
||||||
MQTTTopic = <<"mqtt/topic/", UniqueNum/binary>>,
|
MQTTTopic = <<"mqtt/topic/", UniqueNum/binary>>,
|
||||||
[
|
[
|
||||||
|
@ -262,7 +275,7 @@ common_end_per_group(Config) ->
|
||||||
ProxyHost = ?config(proxy_host, Config),
|
ProxyHost = ?config(proxy_host, Config),
|
||||||
ProxyPort = ?config(proxy_port, Config),
|
ProxyPort = ?config(proxy_port, Config),
|
||||||
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||||
delete_all_bridges(),
|
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
end_per_group(Group, Config) when
|
end_per_group(Group, Config) when
|
||||||
|
@ -327,7 +340,7 @@ init_per_testcase(TestCase, Config) ->
|
||||||
|
|
||||||
common_init_per_testcase(TestCase, Config0) ->
|
common_init_per_testcase(TestCase, Config0) ->
|
||||||
ct:timetrap(timer:seconds(60)),
|
ct:timetrap(timer:seconds(60)),
|
||||||
delete_all_bridges(),
|
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
|
||||||
emqx_config:delete_override_conf_files(),
|
emqx_config:delete_override_conf_files(),
|
||||||
KafkaTopic0 =
|
KafkaTopic0 =
|
||||||
<<
|
<<
|
||||||
|
@ -363,7 +376,12 @@ common_init_per_testcase(TestCase, Config0) ->
|
||||||
{kafka_name, Name},
|
{kafka_name, Name},
|
||||||
{kafka_config_string, ConfigString},
|
{kafka_config_string, ConfigString},
|
||||||
{kafka_config, KafkaConfig},
|
{kafka_config, KafkaConfig},
|
||||||
{kafka_producers, ProducersConfigs}
|
{kafka_producers, ProducersConfigs},
|
||||||
|
{bridge_kind, source},
|
||||||
|
{connector_name, Name},
|
||||||
|
{connector_type, ?CONNECTOR_TYPE_BIN},
|
||||||
|
{source_type, ?SOURCE_TYPE_BIN},
|
||||||
|
{source_name, Name}
|
||||||
| Config
|
| Config
|
||||||
].
|
].
|
||||||
|
|
||||||
|
@ -372,7 +390,7 @@ end_per_testcase(_Testcase, Config) ->
|
||||||
ProxyPort = ?config(proxy_port, Config),
|
ProxyPort = ?config(proxy_port, Config),
|
||||||
ProducersConfigs = ?config(kafka_producers, Config),
|
ProducersConfigs = ?config(kafka_producers, Config),
|
||||||
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||||
delete_all_bridges(),
|
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
|
||||||
#{clientid := KafkaProducerClientId, producers := ProducersMapping} =
|
#{clientid := KafkaProducerClientId, producers := ProducersMapping} =
|
||||||
ProducersConfigs,
|
ProducersConfigs,
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
|
@ -681,19 +699,6 @@ create_bridge_wait_for_balance(Config) ->
|
||||||
kill_group_subscriber_spy()
|
kill_group_subscriber_spy()
|
||||||
end.
|
end.
|
||||||
|
|
||||||
delete_bridge(Config) ->
|
|
||||||
Type = ?BRIDGE_TYPE_BIN,
|
|
||||||
Name = ?config(kafka_name, Config),
|
|
||||||
emqx_bridge:remove(Type, Name).
|
|
||||||
|
|
||||||
delete_all_bridges() ->
|
|
||||||
lists:foreach(
|
|
||||||
fun(#{name := Name, type := Type}) ->
|
|
||||||
emqx_bridge:remove(Type, Name)
|
|
||||||
end,
|
|
||||||
emqx_bridge:list()
|
|
||||||
).
|
|
||||||
|
|
||||||
create_bridge_api(Config) ->
|
create_bridge_api(Config) ->
|
||||||
create_bridge_api(Config, _Overrides = #{}).
|
create_bridge_api(Config, _Overrides = #{}).
|
||||||
|
|
||||||
|
@ -752,9 +757,8 @@ send_message(Config, Payload) ->
|
||||||
emqx_bridge:send_message(BridgeId, Payload).
|
emqx_bridge:send_message(BridgeId, Payload).
|
||||||
|
|
||||||
resource_id(Config) ->
|
resource_id(Config) ->
|
||||||
Type = ?BRIDGE_TYPE_BIN,
|
|
||||||
Name = ?config(kafka_name, Config),
|
Name = ?config(kafka_name, Config),
|
||||||
emqx_bridge_resource:resource_id(Type, Name).
|
emqx_bridge_v2:source_id(?SOURCE_TYPE_BIN, Name, Name).
|
||||||
|
|
||||||
instance_id(Config) ->
|
instance_id(Config) ->
|
||||||
ResourceId = resource_id(Config),
|
ResourceId = resource_id(Config),
|
||||||
|
@ -1084,6 +1088,12 @@ cluster(Config) ->
|
||||||
ct:pal("cluster: ~p", [Cluster]),
|
ct:pal("cluster: ~p", [Cluster]),
|
||||||
Cluster.
|
Cluster.
|
||||||
|
|
||||||
|
start_peer(Name, Opts) ->
|
||||||
|
Node = emqx_common_test_helpers:start_peer(Name, Opts),
|
||||||
|
% Make it possible to call `ct:pal` and friends (if running under rebar3)
|
||||||
|
_ = emqx_cth_cluster:share_load_module(Node, cthr),
|
||||||
|
Node.
|
||||||
|
|
||||||
start_async_publisher(Config, KafkaTopic) ->
|
start_async_publisher(Config, KafkaTopic) ->
|
||||||
TId = ets:new(kafka_payloads, [public, ordered_set]),
|
TId = ets:new(kafka_payloads, [public, ordered_set]),
|
||||||
Loop = fun Go() ->
|
Loop = fun Go() ->
|
||||||
|
@ -1129,6 +1139,15 @@ kill_resource_managers() ->
|
||||||
supervisor:which_children(emqx_resource_manager_sup)
|
supervisor:which_children(emqx_resource_manager_sup)
|
||||||
).
|
).
|
||||||
|
|
||||||
|
health_check(Config) ->
|
||||||
|
health_check(node(), Config).
|
||||||
|
|
||||||
|
health_check(Node, Config) ->
|
||||||
|
erpc:call(Node, fun() ->
|
||||||
|
#{status := Status} = emqx_bridge_v2_testlib:health_check_channel(Config),
|
||||||
|
{ok, Status}
|
||||||
|
end).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Testcases
|
%% Testcases
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -1344,19 +1363,13 @@ t_on_get_status(Config) ->
|
||||||
ProxyPort = ?config(proxy_port, Config),
|
ProxyPort = ?config(proxy_port, Config),
|
||||||
ProxyHost = ?config(proxy_host, Config),
|
ProxyHost = ?config(proxy_host, Config),
|
||||||
ProxyName = ?config(proxy_name, Config),
|
ProxyName = ?config(proxy_name, Config),
|
||||||
KafkaName = ?config(kafka_name, Config),
|
|
||||||
ResourceId = emqx_bridge_resource:resource_id(kafka_consumer, KafkaName),
|
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, _},
|
{ok, _},
|
||||||
create_bridge(Config)
|
create_bridge(Config)
|
||||||
),
|
),
|
||||||
%% Since the connection process is async, we give it some time to
|
?retry(100, 20, ?assertEqual({ok, connected}, health_check(Config))),
|
||||||
%% stabilize and avoid flakiness.
|
|
||||||
ct:sleep(1_200),
|
|
||||||
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)),
|
|
||||||
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
|
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
|
||||||
ct:sleep(500),
|
?retry(100, 20, ?assertEqual({ok, disconnected}, health_check(Config)))
|
||||||
?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId))
|
|
||||||
end),
|
end),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
@ -1390,14 +1403,16 @@ t_failed_creation_then_fixed(Config) ->
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{{ok, _}, {ok, _}},
|
{{ok, _}, {ok, _}},
|
||||||
?wait_async_action(
|
?wait_async_action(
|
||||||
update_bridge_api(Config),
|
update_bridge_api(Config, #{
|
||||||
|
<<"resource_opts">> =>
|
||||||
|
#{<<"health_check_interval">> => <<"1s">>}
|
||||||
|
}),
|
||||||
#{?snk_kind := kafka_consumer_subscriber_started},
|
#{?snk_kind := kafka_consumer_subscriber_started},
|
||||||
60_000
|
60_000
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
wait_until_subscribers_are_ready(NPartitions, 120_000),
|
wait_until_subscribers_are_ready(NPartitions, 120_000),
|
||||||
ResourceId = resource_id(Config),
|
?assertEqual({ok, connected}, health_check(Config)),
|
||||||
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)),
|
|
||||||
ping_until_healthy(Config, _Period = 1_500, _Timeout = 24_000),
|
ping_until_healthy(Config, _Period = 1_500, _Timeout = 24_000),
|
||||||
|
|
||||||
{ok, C} = emqtt:start_link(),
|
{ok, C} = emqtt:start_link(),
|
||||||
|
@ -1459,7 +1474,6 @@ t_receive_after_recovery(Config) ->
|
||||||
KafkaName = ?config(kafka_name, Config),
|
KafkaName = ?config(kafka_name, Config),
|
||||||
KafkaNameA = binary_to_atom(KafkaName),
|
KafkaNameA = binary_to_atom(KafkaName),
|
||||||
KafkaClientId = consumer_clientid(Config),
|
KafkaClientId = consumer_clientid(Config),
|
||||||
ResourceId = resource_id(Config),
|
|
||||||
?check_trace(
|
?check_trace(
|
||||||
begin
|
begin
|
||||||
{ok, _} = create_bridge(
|
{ok, _} = create_bridge(
|
||||||
|
@ -1467,7 +1481,7 @@ t_receive_after_recovery(Config) ->
|
||||||
#{<<"kafka">> => #{<<"offset_reset_policy">> => <<"earliest">>}}
|
#{<<"kafka">> => #{<<"offset_reset_policy">> => <<"earliest">>}}
|
||||||
),
|
),
|
||||||
ping_until_healthy(Config, _Period = 1_500, _Timeout0 = 24_000),
|
ping_until_healthy(Config, _Period = 1_500, _Timeout0 = 24_000),
|
||||||
{ok, connected} = emqx_resource_manager:health_check(ResourceId),
|
{ok, connected} = health_check(Config),
|
||||||
%% 0) ensure each partition commits its offset so it can
|
%% 0) ensure each partition commits its offset so it can
|
||||||
%% recover later.
|
%% recover later.
|
||||||
Messages0 = [
|
Messages0 = [
|
||||||
|
@ -1718,14 +1732,13 @@ t_cluster_group(Config) ->
|
||||||
NPartitions = ?config(num_partitions, Config),
|
NPartitions = ?config(num_partitions, Config),
|
||||||
KafkaTopic = ?config(kafka_topic, Config),
|
KafkaTopic = ?config(kafka_topic, Config),
|
||||||
KafkaName = ?config(kafka_name, Config),
|
KafkaName = ?config(kafka_name, Config),
|
||||||
ResourceId = resource_id(Config),
|
|
||||||
BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, KafkaName),
|
BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, KafkaName),
|
||||||
Cluster = cluster(Config),
|
Cluster = cluster(Config),
|
||||||
?check_trace(
|
?check_trace(
|
||||||
begin
|
begin
|
||||||
Nodes =
|
Nodes =
|
||||||
[_N1, N2 | _] = [
|
[_N1, N2 | _] = [
|
||||||
emqx_common_test_helpers:start_peer(Name, Opts)
|
start_peer(Name, Opts)
|
||||||
|| {Name, Opts} <- Cluster
|
|| {Name, Opts} <- Cluster
|
||||||
],
|
],
|
||||||
on_exit(fun() ->
|
on_exit(fun() ->
|
||||||
|
@ -1765,7 +1778,7 @@ t_cluster_group(Config) ->
|
||||||
fun(N) ->
|
fun(N) ->
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
{ok, connected},
|
{ok, connected},
|
||||||
erpc:call(N, emqx_resource_manager, health_check, [ResourceId]),
|
health_check(N, Config),
|
||||||
#{node => N}
|
#{node => N}
|
||||||
)
|
)
|
||||||
end,
|
end,
|
||||||
|
@ -1801,14 +1814,13 @@ t_node_joins_existing_cluster(Config) ->
|
||||||
NPartitions = ?config(num_partitions, Config),
|
NPartitions = ?config(num_partitions, Config),
|
||||||
KafkaTopic = ?config(kafka_topic, Config),
|
KafkaTopic = ?config(kafka_topic, Config),
|
||||||
KafkaName = ?config(kafka_name, Config),
|
KafkaName = ?config(kafka_name, Config),
|
||||||
ResourceId = resource_id(Config),
|
|
||||||
BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, KafkaName),
|
BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, KafkaName),
|
||||||
Cluster = cluster(Config),
|
Cluster = cluster(Config),
|
||||||
?check_trace(
|
?check_trace(
|
||||||
begin
|
begin
|
||||||
[{Name1, Opts1}, {Name2, Opts2} | _] = Cluster,
|
[{Name1, Opts1}, {Name2, Opts2} | _] = Cluster,
|
||||||
ct:pal("starting ~p", [Name1]),
|
ct:pal("starting ~p", [Name1]),
|
||||||
N1 = emqx_common_test_helpers:start_peer(Name1, Opts1),
|
N1 = start_peer(Name1, Opts1),
|
||||||
on_exit(fun() ->
|
on_exit(fun() ->
|
||||||
ct:pal("stopping ~p", [N1]),
|
ct:pal("stopping ~p", [N1]),
|
||||||
ok = emqx_common_test_helpers:stop_peer(N1)
|
ok = emqx_common_test_helpers:stop_peer(N1)
|
||||||
|
@ -1834,7 +1846,7 @@ t_node_joins_existing_cluster(Config) ->
|
||||||
{ok, _} = wait_until_group_is_balanced(KafkaTopic, NPartitions, [N1], 30_000),
|
{ok, _} = wait_until_group_is_balanced(KafkaTopic, NPartitions, [N1], 30_000),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
{ok, connected},
|
{ok, connected},
|
||||||
erpc:call(N1, emqx_resource_manager, health_check, [ResourceId])
|
health_check(N1, Config)
|
||||||
),
|
),
|
||||||
|
|
||||||
%% Now, we start the second node and have it join the cluster.
|
%% Now, we start the second node and have it join the cluster.
|
||||||
|
@ -1851,7 +1863,7 @@ t_node_joins_existing_cluster(Config) ->
|
||||||
30_000
|
30_000
|
||||||
),
|
),
|
||||||
ct:pal("starting ~p", [Name2]),
|
ct:pal("starting ~p", [Name2]),
|
||||||
N2 = emqx_common_test_helpers:start_peer(Name2, Opts2),
|
N2 = start_peer(Name2, Opts2),
|
||||||
on_exit(fun() ->
|
on_exit(fun() ->
|
||||||
ct:pal("stopping ~p", [N2]),
|
ct:pal("stopping ~p", [N2]),
|
||||||
ok = emqx_common_test_helpers:stop_peer(N2)
|
ok = emqx_common_test_helpers:stop_peer(N2)
|
||||||
|
@ -1944,7 +1956,7 @@ t_cluster_node_down(Config) ->
|
||||||
lists:map(
|
lists:map(
|
||||||
fun({Name, Opts}) ->
|
fun({Name, Opts}) ->
|
||||||
ct:pal("starting ~p", [Name]),
|
ct:pal("starting ~p", [Name]),
|
||||||
emqx_common_test_helpers:start_peer(Name, Opts)
|
start_peer(Name, Opts)
|
||||||
end,
|
end,
|
||||||
Cluster
|
Cluster
|
||||||
),
|
),
|
||||||
|
@ -2078,7 +2090,9 @@ t_begin_offset_earliest(Config) ->
|
||||||
),
|
),
|
||||||
#{payloads => Payloads}
|
#{payloads => Payloads}
|
||||||
),
|
),
|
||||||
?assertEqual(NumMessages, emqx_resource_metrics:received_get(ResourceId)),
|
?retry(
|
||||||
|
100, 20, ?assertEqual(NumMessages, emqx_resource_metrics:received_get(ResourceId))
|
||||||
|
),
|
||||||
ok
|
ok
|
||||||
end
|
end
|
||||||
),
|
),
|
||||||
|
@ -2130,7 +2144,6 @@ t_resource_manager_crash_after_subscriber_started(Config) ->
|
||||||
_ ->
|
_ ->
|
||||||
ct:fail("unexpected result: ~p", [Res])
|
ct:fail("unexpected result: ~p", [Res])
|
||||||
end,
|
end,
|
||||||
?assertMatch(ok, delete_bridge(Config)),
|
|
||||||
?retry(
|
?retry(
|
||||||
_Sleep = 50,
|
_Sleep = 50,
|
||||||
_Attempts = 50,
|
_Attempts = 50,
|
||||||
|
@ -2143,10 +2156,11 @@ t_resource_manager_crash_after_subscriber_started(Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_resource_manager_crash_before_subscriber_started(Config) ->
|
t_resource_manager_crash_before_subscriber_started(Config) ->
|
||||||
|
Name = ?config(kafka_name, Config),
|
||||||
?check_trace(
|
?check_trace(
|
||||||
begin
|
begin
|
||||||
?force_ordering(
|
?force_ordering(
|
||||||
#{?snk_kind := kafka_consumer_client_started},
|
#{?snk_kind := kafka_consumer_sup_started},
|
||||||
#{?snk_kind := will_kill_resource_manager}
|
#{?snk_kind := will_kill_resource_manager}
|
||||||
),
|
),
|
||||||
?force_ordering(
|
?force_ordering(
|
||||||
|
@ -2183,11 +2197,15 @@ t_resource_manager_crash_before_subscriber_started(Config) ->
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
%% the new manager may have had time to startup
|
%% the new manager may have had time to startup
|
||||||
%% before the resource status cache is read...
|
%% before the resource status cache is read...
|
||||||
|
{ok, {{_, 204, _}, _, _}} =
|
||||||
|
emqx_bridge_testlib:delete_bridge_http_api_v1(#{
|
||||||
|
name => Name,
|
||||||
|
type => ?BRIDGE_TYPE_BIN
|
||||||
|
}),
|
||||||
ok;
|
ok;
|
||||||
_ ->
|
_ ->
|
||||||
ct:fail("unexpected result: ~p", [Res])
|
ct:fail("unexpected result: ~p", [Res])
|
||||||
end,
|
end,
|
||||||
?assertMatch(ok, delete_bridge(Config)),
|
|
||||||
?retry(
|
?retry(
|
||||||
_Sleep = 50,
|
_Sleep = 50,
|
||||||
_Attempts = 50,
|
_Attempts = 50,
|
||||||
|
|
|
@ -0,0 +1,341 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_bridge_v2_kafka_consumer_SUITE).
|
||||||
|
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
-compile(export_all).
|
||||||
|
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
|
-import(emqx_common_test_helpers, [on_exit/1]).
|
||||||
|
|
||||||
|
-define(CONNECTOR_TYPE_BIN, <<"kafka_consumer">>).
|
||||||
|
-define(SOURCE_TYPE_BIN, <<"kafka_consumer">>).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% CT boilerplate
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
all() ->
|
||||||
|
All0 = emqx_common_test_helpers:all(?MODULE),
|
||||||
|
All = All0 -- matrix_cases(),
|
||||||
|
Groups = lists:map(fun({G, _, _}) -> {group, G} end, groups()),
|
||||||
|
Groups ++ All.
|
||||||
|
|
||||||
|
groups() ->
|
||||||
|
emqx_common_test_helpers:matrix_to_groups(?MODULE, matrix_cases()).
|
||||||
|
|
||||||
|
matrix_cases() ->
|
||||||
|
[
|
||||||
|
t_start_stop
|
||||||
|
].
|
||||||
|
|
||||||
|
init_per_suite(Config) ->
|
||||||
|
emqx_bridge_kafka_impl_consumer_SUITE:init_per_suite(Config).
|
||||||
|
|
||||||
|
end_per_suite(Config) ->
|
||||||
|
emqx_bridge_kafka_impl_consumer_SUITE:end_per_suite(Config).
|
||||||
|
|
||||||
|
init_per_testcase(TestCase, Config) ->
|
||||||
|
common_init_per_testcase(TestCase, Config).
|
||||||
|
|
||||||
|
common_init_per_testcase(TestCase, Config0) ->
|
||||||
|
ct:timetrap({seconds, 60}),
|
||||||
|
UniqueNum = integer_to_binary(erlang:unique_integer()),
|
||||||
|
Name = <<(atom_to_binary(TestCase))/binary, UniqueNum/binary>>,
|
||||||
|
ConnectorConfig = connector_config(Name, Config0),
|
||||||
|
Topic = Name,
|
||||||
|
SourceConfig = source_config(#{
|
||||||
|
connector => Name,
|
||||||
|
parameters => #{topic => Topic}
|
||||||
|
}),
|
||||||
|
Config1 = ensure_topic_and_producers(ConnectorConfig, SourceConfig, TestCase, Config0),
|
||||||
|
ct:comment(get_matrix_params(Config1)),
|
||||||
|
[
|
||||||
|
{kafka_topic, Topic},
|
||||||
|
{bridge_kind, source},
|
||||||
|
{source_type, ?SOURCE_TYPE_BIN},
|
||||||
|
{source_name, Name},
|
||||||
|
{source_config, SourceConfig},
|
||||||
|
{connector_name, Name},
|
||||||
|
{connector_type, ?CONNECTOR_TYPE_BIN},
|
||||||
|
{connector_config, ConnectorConfig},
|
||||||
|
{proxy_host, "toxiproxy"},
|
||||||
|
{proxy_port, 8474}
|
||||||
|
| Config1
|
||||||
|
].
|
||||||
|
|
||||||
|
end_per_testcase(TestCase, Config) ->
|
||||||
|
emqx_bridge_v2_testlib:end_per_testcase(TestCase, Config),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
auth_config(Config) ->
|
||||||
|
AuthType0 = maps:get(auth, get_matrix_params(Config)),
|
||||||
|
AuthType =
|
||||||
|
case AuthType0 of
|
||||||
|
none -> none;
|
||||||
|
sasl_auth_plain -> plain;
|
||||||
|
sasl_auth_scram256 -> scram_sha_256;
|
||||||
|
sasl_auth_scram512 -> scram_sha_512;
|
||||||
|
sasl_auth_kerberos -> kerberos
|
||||||
|
end,
|
||||||
|
{ok, #{<<"authentication">> := Auth}} =
|
||||||
|
hocon:binary(emqx_bridge_kafka_impl_consumer_SUITE:authentication(AuthType)),
|
||||||
|
Auth.
|
||||||
|
|
||||||
|
get_matrix_params(Config) ->
|
||||||
|
case group_path(Config) of
|
||||||
|
undefined ->
|
||||||
|
#{
|
||||||
|
host => <<"toxiproxy.emqx.net">>,
|
||||||
|
port => 9292,
|
||||||
|
tls => plain,
|
||||||
|
auth => none,
|
||||||
|
proxy_name => "kafka_plain"
|
||||||
|
};
|
||||||
|
[TLS, Auth | _] ->
|
||||||
|
#{
|
||||||
|
host => <<"toxiproxy.emqx.net">>,
|
||||||
|
port => toxiproxy_kafka_port(#{tls => TLS, auth => Auth}),
|
||||||
|
tls => TLS,
|
||||||
|
auth => Auth,
|
||||||
|
proxy_name => toxiproxy_proxy_name(#{tls => TLS, auth => Auth})
|
||||||
|
}
|
||||||
|
end.
|
||||||
|
|
||||||
|
toxiproxy_kafka_port(#{tls := plain, auth := none}) -> 9292;
|
||||||
|
toxiproxy_kafka_port(#{tls := tls, auth := none}) -> 9294;
|
||||||
|
toxiproxy_kafka_port(#{tls := tls, auth := sasl_auth_kerberos}) -> 9095;
|
||||||
|
toxiproxy_kafka_port(#{tls := plain, auth := sasl_auth_kerberos}) -> 9093;
|
||||||
|
toxiproxy_kafka_port(#{tls := plain, auth := _}) -> 9293;
|
||||||
|
toxiproxy_kafka_port(#{tls := tls, auth := _}) -> 9295.
|
||||||
|
|
||||||
|
toxiproxy_proxy_name(#{tls := plain, auth := none}) -> "kafka_plain";
|
||||||
|
toxiproxy_proxy_name(#{tls := tls, auth := none}) -> "kafka_ssl";
|
||||||
|
toxiproxy_proxy_name(#{tls := plain, auth := _}) -> "kafka_sasl_plain";
|
||||||
|
toxiproxy_proxy_name(#{tls := tls, auth := _}) -> "kafka_sasl_ssl".
|
||||||
|
|
||||||
|
toxiproxy_host(#{auth := sasl_auth_kerberos}) -> <<"kafka-1.emqx.net">>;
|
||||||
|
toxiproxy_host(_) -> <<"toxiproxy.emqx.net">>.
|
||||||
|
|
||||||
|
group_path(Config) ->
|
||||||
|
case emqx_common_test_helpers:group_path(Config) of
|
||||||
|
[] ->
|
||||||
|
undefined;
|
||||||
|
Path ->
|
||||||
|
Path
|
||||||
|
end.
|
||||||
|
|
||||||
|
merge(Maps) ->
|
||||||
|
lists:foldl(fun(M, Acc) -> emqx_utils_maps:deep_merge(Acc, M) end, #{}, Maps).
|
||||||
|
|
||||||
|
ensure_topic_and_producers(ConnectorConfig, SourceConfig, TestCase, TCConfig) ->
|
||||||
|
#{tls := TLS, auth := Auth} = get_matrix_params(TCConfig),
|
||||||
|
Topic = emqx_utils_maps:deep_get([<<"parameters">>, <<"topic">>], SourceConfig),
|
||||||
|
[{Host, Port}] = emqx_bridge_kafka_impl:hosts(maps:get(<<"bootstrap_hosts">>, ConnectorConfig)),
|
||||||
|
CreateConfig = maps:to_list(#{
|
||||||
|
topic_mapping => [#{kafka_topic => Topic}],
|
||||||
|
kafka_host => Host,
|
||||||
|
kafka_port => Port,
|
||||||
|
direct_kafka_host => Host,
|
||||||
|
direct_kafka_port => Port,
|
||||||
|
use_tls => TLS =:= tls,
|
||||||
|
use_sasl => Auth =/= none,
|
||||||
|
num_partitions => 1
|
||||||
|
}),
|
||||||
|
ok = emqx_bridge_kafka_impl_consumer_SUITE:ensure_topics(CreateConfig),
|
||||||
|
ProducerConfigs = emqx_bridge_kafka_impl_consumer_SUITE:start_producers(TestCase, CreateConfig),
|
||||||
|
[{kafka_producers, ProducerConfigs} | TCConfig].
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Helper fns
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
connector_config(Name, Config) ->
|
||||||
|
connector_config1(
|
||||||
|
Name,
|
||||||
|
connector_overrides(Config)
|
||||||
|
).
|
||||||
|
|
||||||
|
connector_config1(Name, Overrides0 = #{}) ->
|
||||||
|
Overrides = emqx_utils_maps:binary_key_map(Overrides0),
|
||||||
|
InnerConfigMap0 =
|
||||||
|
#{
|
||||||
|
<<"enable">> => true,
|
||||||
|
<<"tags">> => [<<"bridge">>],
|
||||||
|
<<"description">> => <<"my cool bridge">>,
|
||||||
|
|
||||||
|
<<"authentication">> => <<"please override">>,
|
||||||
|
<<"bootstrap_hosts">> => <<"please override">>,
|
||||||
|
<<"connect_timeout">> => <<"5s">>,
|
||||||
|
<<"metadata_request_timeout">> => <<"5s">>,
|
||||||
|
<<"min_metadata_refresh_interval">> => <<"3s">>,
|
||||||
|
|
||||||
|
<<"resource_opts">> =>
|
||||||
|
#{
|
||||||
|
<<"health_check_interval">> => <<"2s">>,
|
||||||
|
<<"start_after_created">> => true,
|
||||||
|
<<"start_timeout">> => <<"5s">>
|
||||||
|
}
|
||||||
|
},
|
||||||
|
InnerConfigMap = emqx_utils_maps:deep_merge(InnerConfigMap0, Overrides),
|
||||||
|
emqx_bridge_v2_testlib:parse_and_check_connector(?SOURCE_TYPE_BIN, Name, InnerConfigMap).
|
||||||
|
|
||||||
|
connector_overrides(TCConfig) ->
|
||||||
|
MatrixParams = #{tls := TLS} = get_matrix_params(TCConfig),
|
||||||
|
Host = toxiproxy_host(MatrixParams),
|
||||||
|
Port = toxiproxy_kafka_port(MatrixParams),
|
||||||
|
BootstrapHosts = <<Host/binary, ":", (integer_to_binary(Port))/binary>>,
|
||||||
|
AuthConfig = auth_config(TCConfig),
|
||||||
|
#{
|
||||||
|
<<"bootstrap_hosts">> => BootstrapHosts,
|
||||||
|
<<"authentication">> => AuthConfig,
|
||||||
|
<<"ssl">> => #{<<"enable">> => TLS =:= tls}
|
||||||
|
}.
|
||||||
|
|
||||||
|
source_config(Overrides0) ->
|
||||||
|
Overrides = emqx_utils_maps:binary_key_map(Overrides0),
|
||||||
|
CommonConfig =
|
||||||
|
#{
|
||||||
|
<<"enable">> => true,
|
||||||
|
<<"connector">> => <<"please override">>,
|
||||||
|
<<"parameters">> =>
|
||||||
|
#{
|
||||||
|
<<"key_encoding_mode">> => <<"none">>,
|
||||||
|
<<"max_batch_bytes">> => <<"896KB">>,
|
||||||
|
<<"max_rejoin_attempts">> => <<"5">>,
|
||||||
|
<<"offset_reset_policy">> => <<"latest">>,
|
||||||
|
<<"topic">> => <<"please override">>,
|
||||||
|
<<"value_encoding_mode">> => <<"none">>
|
||||||
|
},
|
||||||
|
<<"resource_opts">> => #{
|
||||||
|
<<"health_check_interval">> => <<"2s">>,
|
||||||
|
<<"resume_interval">> => <<"2s">>
|
||||||
|
}
|
||||||
|
},
|
||||||
|
maps:merge(CommonConfig, Overrides).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Testcases
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
t_start_stop(matrix) ->
|
||||||
|
[
|
||||||
|
[plain, none],
|
||||||
|
[plain, sasl_auth_plain],
|
||||||
|
[plain, sasl_auth_scram256],
|
||||||
|
[plain, sasl_auth_scram512],
|
||||||
|
[plain, sasl_auth_kerberos],
|
||||||
|
[tls, none],
|
||||||
|
[tls, sasl_auth_plain]
|
||||||
|
];
|
||||||
|
t_start_stop(Config) ->
|
||||||
|
ok = emqx_bridge_v2_testlib:t_start_stop(Config, kafka_consumer_subcriber_and_client_stopped),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_create_via_http(Config) ->
|
||||||
|
ok = emqx_bridge_v2_testlib:t_create_via_http(Config),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_consume(Config) ->
|
||||||
|
Topic = ?config(kafka_topic, Config),
|
||||||
|
NumPartitions = 1,
|
||||||
|
Key = <<"mykey">>,
|
||||||
|
Payload = #{<<"key">> => <<"value">>},
|
||||||
|
Encoded = emqx_utils_json:encode(Payload),
|
||||||
|
Headers = [{<<"hkey">>, <<"hvalue">>}],
|
||||||
|
HeadersMap = maps:from_list(Headers),
|
||||||
|
ProduceFn = fun() ->
|
||||||
|
emqx_bridge_kafka_impl_consumer_SUITE:publish(
|
||||||
|
Config,
|
||||||
|
Topic,
|
||||||
|
[
|
||||||
|
#{
|
||||||
|
key => Key,
|
||||||
|
value => Encoded,
|
||||||
|
headers => Headers
|
||||||
|
}
|
||||||
|
]
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
CheckFn = fun(Message) ->
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
headers := HeadersMap,
|
||||||
|
key := Key,
|
||||||
|
offset := _,
|
||||||
|
topic := Topic,
|
||||||
|
ts := _,
|
||||||
|
ts_type := _,
|
||||||
|
value := Encoded
|
||||||
|
},
|
||||||
|
Message
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
ok = emqx_bridge_v2_testlib:t_consume(
|
||||||
|
Config,
|
||||||
|
#{
|
||||||
|
consumer_ready_tracepoint => ?match_n_events(
|
||||||
|
NumPartitions,
|
||||||
|
#{?snk_kind := kafka_consumer_subscriber_init}
|
||||||
|
),
|
||||||
|
produce_fn => ProduceFn,
|
||||||
|
check_fn => CheckFn,
|
||||||
|
produce_tracepoint => ?match_event(
|
||||||
|
#{
|
||||||
|
?snk_kind := kafka_consumer_handle_message,
|
||||||
|
?snk_span := {complete, _}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
}
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_update_topic(Config) ->
|
||||||
|
%% Tests that, if a bridge originally has the legacy field `topic_mapping' filled in
|
||||||
|
%% and later is updated using v2 APIs, then the legacy field is cleared and the new
|
||||||
|
%% `topic' field is used.
|
||||||
|
ConnectorConfig = ?config(connector_config, Config),
|
||||||
|
SourceConfig = ?config(source_config, Config),
|
||||||
|
Name = ?config(source_name, Config),
|
||||||
|
V1Config0 = emqx_action_info:connector_action_config_to_bridge_v1_config(
|
||||||
|
?SOURCE_TYPE_BIN,
|
||||||
|
ConnectorConfig,
|
||||||
|
SourceConfig
|
||||||
|
),
|
||||||
|
V1Config = emqx_utils_maps:deep_put(
|
||||||
|
[<<"kafka">>, <<"topic_mapping">>],
|
||||||
|
V1Config0,
|
||||||
|
[
|
||||||
|
#{
|
||||||
|
<<"kafka_topic">> => <<"old_topic">>,
|
||||||
|
<<"mqtt_topic">> => <<"">>,
|
||||||
|
<<"qos">> => 2,
|
||||||
|
<<"payload_template">> => <<"template">>
|
||||||
|
}
|
||||||
|
]
|
||||||
|
),
|
||||||
|
%% Note: using v1 API
|
||||||
|
{ok, {{_, 201, _}, _, _}} = emqx_bridge_testlib:create_bridge_api(
|
||||||
|
?SOURCE_TYPE_BIN,
|
||||||
|
Name,
|
||||||
|
V1Config
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{ok, {{_, 200, _}, _, #{<<"parameters">> := #{<<"topic">> := <<"old_topic">>}}}},
|
||||||
|
emqx_bridge_v2_testlib:get_source_api(?SOURCE_TYPE_BIN, Name)
|
||||||
|
),
|
||||||
|
%% Note: we don't add `topic_mapping' again here to the parameters.
|
||||||
|
{ok, {{_, 200, _}, _, _}} = emqx_bridge_v2_testlib:update_bridge_api(
|
||||||
|
Config,
|
||||||
|
#{<<"parameters">> => #{<<"topic">> => <<"new_topic">>}}
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{ok, {{_, 200, _}, _, #{<<"parameters">> := #{<<"topic">> := <<"new_topic">>}}}},
|
||||||
|
emqx_bridge_v2_testlib:get_source_api(?SOURCE_TYPE_BIN, Name)
|
||||||
|
),
|
||||||
|
ok.
|
|
@ -159,7 +159,7 @@ update(ConnectorId, {OldConf, Conf}) ->
|
||||||
update(Type, Name, {OldConf, Conf}) ->
|
update(Type, Name, {OldConf, Conf}) ->
|
||||||
update(Type, Name, {OldConf, Conf}, #{}).
|
update(Type, Name, {OldConf, Conf}, #{}).
|
||||||
|
|
||||||
update(Type, Name, {OldConf, Conf}, Opts) ->
|
update(Type, Name, {OldConf, Conf0}, Opts) ->
|
||||||
%% TODO: sometimes its not necessary to restart the connector connection.
|
%% TODO: sometimes its not necessary to restart the connector connection.
|
||||||
%%
|
%%
|
||||||
%% - if the connection related configs like `servers` is updated, we should restart/start
|
%% - if the connection related configs like `servers` is updated, we should restart/start
|
||||||
|
@ -168,6 +168,7 @@ update(Type, Name, {OldConf, Conf}, Opts) ->
|
||||||
%% the `method` or `headers` of a WebHook is changed, then the connector can be updated
|
%% the `method` or `headers` of a WebHook is changed, then the connector can be updated
|
||||||
%% without restarting the connector.
|
%% without restarting the connector.
|
||||||
%%
|
%%
|
||||||
|
Conf = Conf0#{connector_type => bin(Type), connector_name => bin(Name)},
|
||||||
case emqx_utils_maps:if_only_to_toggle_enable(OldConf, Conf) of
|
case emqx_utils_maps:if_only_to_toggle_enable(OldConf, Conf) of
|
||||||
false ->
|
false ->
|
||||||
?SLOG(info, #{
|
?SLOG(info, #{
|
||||||
|
|
|
@ -34,6 +34,8 @@ resource_type(gcp_pubsub_producer) ->
|
||||||
emqx_bridge_gcp_pubsub_impl_producer;
|
emqx_bridge_gcp_pubsub_impl_producer;
|
||||||
resource_type(hstreamdb) ->
|
resource_type(hstreamdb) ->
|
||||||
emqx_bridge_hstreamdb_connector;
|
emqx_bridge_hstreamdb_connector;
|
||||||
|
resource_type(kafka_consumer) ->
|
||||||
|
emqx_bridge_kafka_impl_consumer;
|
||||||
resource_type(kafka_producer) ->
|
resource_type(kafka_producer) ->
|
||||||
emqx_bridge_kafka_impl_producer;
|
emqx_bridge_kafka_impl_producer;
|
||||||
resource_type(kinesis) ->
|
resource_type(kinesis) ->
|
||||||
|
@ -160,11 +162,19 @@ connector_structs() ->
|
||||||
required => false
|
required => false
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
|
{kafka_consumer,
|
||||||
|
mk(
|
||||||
|
hoconsc:map(name, ref(emqx_bridge_kafka_consumer_schema, "config_connector")),
|
||||||
|
#{
|
||||||
|
desc => <<"Kafka Consumer Connector Config">>,
|
||||||
|
required => false
|
||||||
|
}
|
||||||
|
)},
|
||||||
{kafka_producer,
|
{kafka_producer,
|
||||||
mk(
|
mk(
|
||||||
hoconsc:map(name, ref(emqx_bridge_kafka, "config_connector")),
|
hoconsc:map(name, ref(emqx_bridge_kafka, "config_connector")),
|
||||||
#{
|
#{
|
||||||
desc => <<"Kafka Connector Config">>,
|
desc => <<"Kafka Producer Connector Config">>,
|
||||||
required => false
|
required => false
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
|
@ -356,6 +366,7 @@ schema_modules() ->
|
||||||
emqx_bridge_gcp_pubsub_producer_schema,
|
emqx_bridge_gcp_pubsub_producer_schema,
|
||||||
emqx_bridge_hstreamdb,
|
emqx_bridge_hstreamdb,
|
||||||
emqx_bridge_kafka,
|
emqx_bridge_kafka,
|
||||||
|
emqx_bridge_kafka_consumer_schema,
|
||||||
emqx_bridge_kinesis,
|
emqx_bridge_kinesis,
|
||||||
emqx_bridge_matrix,
|
emqx_bridge_matrix,
|
||||||
emqx_bridge_mongodb,
|
emqx_bridge_mongodb,
|
||||||
|
@ -405,6 +416,7 @@ api_schemas(Method) ->
|
||||||
),
|
),
|
||||||
api_ref(emqx_bridge_hstreamdb, <<"hstreamdb">>, Method ++ "_connector"),
|
api_ref(emqx_bridge_hstreamdb, <<"hstreamdb">>, Method ++ "_connector"),
|
||||||
api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"),
|
api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"),
|
||||||
|
api_ref(emqx_bridge_kafka_consumer_schema, <<"kafka_consumer">>, Method ++ "_connector"),
|
||||||
api_ref(emqx_bridge_kinesis, <<"kinesis">>, Method ++ "_connector"),
|
api_ref(emqx_bridge_kinesis, <<"kinesis">>, Method ++ "_connector"),
|
||||||
api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"),
|
api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"),
|
||||||
api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"),
|
api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"),
|
||||||
|
|
|
@ -134,6 +134,8 @@ connector_type_to_bridge_types(gcp_pubsub_producer) ->
|
||||||
[gcp_pubsub, gcp_pubsub_producer];
|
[gcp_pubsub, gcp_pubsub_producer];
|
||||||
connector_type_to_bridge_types(hstreamdb) ->
|
connector_type_to_bridge_types(hstreamdb) ->
|
||||||
[hstreamdb];
|
[hstreamdb];
|
||||||
|
connector_type_to_bridge_types(kafka_consumer) ->
|
||||||
|
[kafka_consumer];
|
||||||
connector_type_to_bridge_types(kafka_producer) ->
|
connector_type_to_bridge_types(kafka_producer) ->
|
||||||
[kafka, kafka_producer];
|
[kafka, kafka_producer];
|
||||||
connector_type_to_bridge_types(kinesis) ->
|
connector_type_to_bridge_types(kinesis) ->
|
||||||
|
@ -207,7 +209,11 @@ bridge_configs_to_transform(
|
||||||
emqx_utils_maps:deep_get(
|
emqx_utils_maps:deep_get(
|
||||||
[<<"actions">>, to_bin(BridgeType), to_bin(BridgeName)],
|
[<<"actions">>, to_bin(BridgeType), to_bin(BridgeName)],
|
||||||
RawConfig,
|
RawConfig,
|
||||||
undefined
|
emqx_utils_maps:deep_get(
|
||||||
|
[<<"sources">>, to_bin(BridgeType), to_bin(BridgeName)],
|
||||||
|
RawConfig,
|
||||||
|
undefined
|
||||||
|
)
|
||||||
),
|
),
|
||||||
[
|
[
|
||||||
{BridgeType, BridgeName, BridgeConf, ConnectorFields, PreviousRawConfig}
|
{BridgeType, BridgeName, BridgeConf, ConnectorFields, PreviousRawConfig}
|
||||||
|
|
|
@ -136,20 +136,30 @@ t_connector_lifecycle(_Config) ->
|
||||||
?assert(meck:validate(?CONNECTOR)),
|
?assert(meck:validate(?CONNECTOR)),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
[
|
[
|
||||||
{_, {?CONNECTOR, callback_mode, []}, _},
|
|
||||||
{_, {?CONNECTOR, on_start, [_, _]}, {ok, connector_state}},
|
{_, {?CONNECTOR, on_start, [_, _]}, {ok, connector_state}},
|
||||||
{_, {?CONNECTOR, on_get_status, [_, connector_state]}, connected},
|
{_, {?CONNECTOR, on_get_status, [_, connector_state]}, connected},
|
||||||
{_, {?CONNECTOR, on_stop, [_, connector_state]}, ok},
|
{_, {?CONNECTOR, on_stop, [_, connector_state]}, ok},
|
||||||
{_, {?CONNECTOR, on_stop, [_, connector_state]}, ok},
|
|
||||||
{_, {?CONNECTOR, on_start, [_, _]}, {ok, connector_state}},
|
{_, {?CONNECTOR, on_start, [_, _]}, {ok, connector_state}},
|
||||||
{_, {?CONNECTOR, on_get_status, [_, connector_state]}, connected},
|
{_, {?CONNECTOR, on_get_status, [_, connector_state]}, connected},
|
||||||
{_, {?CONNECTOR, on_stop, [_, connector_state]}, ok},
|
{_, {?CONNECTOR, on_stop, [_, connector_state]}, ok},
|
||||||
{_, {?CONNECTOR, callback_mode, []}, _},
|
|
||||||
{_, {?CONNECTOR, on_start, [_, _]}, {ok, connector_state}},
|
{_, {?CONNECTOR, on_start, [_, _]}, {ok, connector_state}},
|
||||||
{_, {?CONNECTOR, on_get_status, [_, connector_state]}, connected},
|
{_, {?CONNECTOR, on_get_status, [_, connector_state]}, connected},
|
||||||
{_, {?CONNECTOR, on_stop, [_, connector_state]}, ok}
|
{_, {?CONNECTOR, on_stop, [_, connector_state]}, ok}
|
||||||
],
|
],
|
||||||
meck:history(?CONNECTOR)
|
lists:filter(
|
||||||
|
fun({_, {?CONNECTOR, Fun, _Args}, _}) ->
|
||||||
|
lists:member(
|
||||||
|
Fun, [
|
||||||
|
on_start,
|
||||||
|
on_stop,
|
||||||
|
on_get_channels,
|
||||||
|
on_get_status,
|
||||||
|
on_add_channel
|
||||||
|
]
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
meck:history(?CONNECTOR)
|
||||||
|
)
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
The Kafka Consumer bridge has been split into connector and source components. Old Kafka Consumer bridges will be upgraded automatically.
|
|
@ -0,0 +1,18 @@
|
||||||
|
emqx_bridge_kafka_consumer_schema {
|
||||||
|
|
||||||
|
source_parameters.desc:
|
||||||
|
"""Source specific configs."""
|
||||||
|
source_parameters.label:
|
||||||
|
"""Source Specific Configs"""
|
||||||
|
|
||||||
|
consumer_source.desc:
|
||||||
|
"""Source configs."""
|
||||||
|
consumer_source.label:
|
||||||
|
"""Source"""
|
||||||
|
|
||||||
|
config_connector.desc:
|
||||||
|
"""Configuration for a Kafka Consumer Client."""
|
||||||
|
config_connector.label:
|
||||||
|
"""Kafka Consumer Client Configuration"""
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue