feat: test case fixes and compatibility layer probe etc

* test case fixes for Bridge V1 suite
* Bug fixes
* local_topic
* Bridge V1 probe compatibility functionality
This commit is contained in:
Kjell Winblad 2023-10-12 18:58:16 +02:00 committed by Zaiming (Stone) Shi
parent 1c62b5bcf3
commit 828bbc57ac
7 changed files with 369 additions and 300 deletions

View File

@ -542,7 +542,8 @@ schema("/bridges_probe") ->
case emqx_dashboard_swagger:filter_check_request_and_translate_body(Request, RequestMeta) of
{ok, #{body := #{<<"type">> := ConnType} = Params}} ->
Params1 = maybe_deobfuscate_bridge_probe(Params),
case emqx_bridge_resource:create_dry_run(ConnType, maps:remove(<<"type">>, Params1)) of
Params2 = maps:remove(<<"type">>, Params1),
case emqx_bridge_resource:create_dry_run(ConnType, Params2) of
ok ->
?NO_CONTENT;
{error, #{kind := validation_error} = Reason0} ->

View File

@ -70,7 +70,8 @@
send_message/4,
start/2,
stop/2,
restart/2
restart/2,
reset_metrics/2
]).
%% Config Update Handler API
@ -188,6 +189,19 @@ send_message(BridgeType, BridgeName, Message, QueryOpts0) ->
Error
end.
do_send_msg_with_enabled_config(
BridgeType, BridgeName, Message, QueryOpts0, Config
) ->
QueryMode = get_query_mode(BridgeType, Config),
QueryOpts = maps:merge(
emqx_bridge:query_opts(Config),
QueryOpts0#{
query_mode => QueryMode
}
),
BridgeV2Id = emqx_bridge_v2:id(BridgeType, BridgeName),
emqx_resource:query(BridgeV2Id, {BridgeV2Id, Message}, QueryOpts).
health_check(BridgeType, BridgeName) ->
case lookup_raw_conf(BridgeType, BridgeName) of
#{
@ -219,10 +233,10 @@ restart(Type, Name) ->
stop(Type, Name),
start(Type, Name).
%% TODO: The following functions just restart the bridge_v2 as a temporary solution.
%% TODO: it is not clear what these operations should do
stop(Type, Name) ->
%% Stop means that we should remove the channel from the connector and reset the metrrics
%% Stop means that we should remove the channel from the connector and reset the metrics
%% The emqx_resource_buffer_worker is not stopped
stop_helper(Type, Name, lookup_raw_conf(Type, Name)).
@ -230,7 +244,7 @@ stop_helper(_Type, _Name, #{enable := false}) ->
ok;
stop_helper(BridgeV2Type, BridgeName, #{connector := ConnectorName}) ->
BridgeV2Id = id(BridgeV2Type, BridgeName, ConnectorName),
ok = emqx_resource:clear_metrics(BridgeV2Id),
ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, BridgeV2Id),
ConnectorId = emqx_connector_resource:resource_id(
bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorName
),
@ -242,59 +256,22 @@ start(Type, Name) ->
start_helper(_Type, _Name, #{enable := false}) ->
ok;
start_helper(BridgeV2Type, BridgeName, #{connector := ConnectorName}) ->
start_helper(BridgeV2Type, BridgeName, #{connector := ConnectorName} = Config) ->
BridgeV2Id = id(BridgeV2Type, BridgeName, ConnectorName),
%% Deinstall from connector
ConnectorId = emqx_connector_resource:resource_id(
bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorName
),
emqx_resource_manager:add_channel(ConnectorId, BridgeV2Id, #{connector => ConnectorName}).
emqx_resource_manager:add_channel(ConnectorId, BridgeV2Id, Config).
% do_send_msg_with_enabled_config(BridgeType, BridgeName, Message, QueryOpts0, Config) ->
% BridgeV2Id = emqx_bridge_v2:id(BridgeType, BridgeName),
% ConnectorResourceId = emqx_bridge_v2:extract_connector_id_from_bridge_v2_id(BridgeV2Id),
% try
% case emqx_resource_manager:maybe_install_bridge_v2(ConnectorResourceId, BridgeV2Id) of
% ok ->
% do_send_msg_after_bridge_v2_installed(
% BridgeType,
% BridgeName,
% BridgeV2Id,
% Message,
% QueryOpts0,
% Config
% );
% InstallError ->
% throw(InstallError)
% end
% catch
% Error:Reason:Stack ->
% Msg = iolist_to_binary(
% io_lib:format(
% "Failed to install bridge_v2 ~p in connector ~p: ~p",
% [BridgeV2Id, ConnectorResourceId, Reason]
% )
% ),
% ?SLOG(error, #{
% msg => Msg,
% error => Error,
% reason => Reason,
% stacktrace => Stack
% })
% end.
reset_metrics(Type, Name) ->
reset_metrics_helper(Type, Name, lookup_raw_conf(Type, Name)).
do_send_msg_with_enabled_config(
BridgeType, BridgeName, Message, QueryOpts0, Config
) ->
QueryMode = get_query_mode(BridgeType, Config),
QueryOpts = maps:merge(
emqx_bridge:query_opts(Config),
QueryOpts0#{
query_mode => QueryMode
}
),
BridgeV2Id = emqx_bridge_v2:id(BridgeType, BridgeName),
emqx_resource:query(BridgeV2Id, {BridgeV2Id, Message}, QueryOpts).
reset_metrics_helper(_Type, _Name, #{enable := false}) ->
ok;
reset_metrics_helper(BridgeV2Type, BridgeName, #{connector := ConnectorName}) ->
BridgeV2Id = id(BridgeV2Type, BridgeName, ConnectorName),
ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, BridgeV2Id).
parse_id(Id) ->
case binary:split(Id, <<":">>, [global]) of
@ -401,7 +378,7 @@ lookup(Id) ->
lookup(Type, Name) ->
case emqx:get_raw_config([?ROOT_KEY, Type, Name], not_found) of
not_found ->
{error, bridge_not_found};
{error, not_found};
#{<<"connector">> := BridgeConnector} = RawConf ->
ConnectorId = emqx_connector_resource:resource_id(
bridge_v2_type_to_connector_type(Type), BridgeConnector
@ -605,19 +582,37 @@ split_and_validate_bridge_v1_config(BridgeType, BridgeName, RawConf) ->
end.
bridge_v1_create_dry_run(BridgeType, RawConfig0) ->
RawConf = maps:without(<<"name">>, RawConfig0),
RawConf = maps:without([<<"name">>], RawConfig0),
TmpName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]),
#{
connector_type := _ConnectorType,
connector_type := ConnectorType,
connector_name := _NewConnectorName,
connector_conf := _NewConnectorRawConf,
bridge_v2_type := _BridgeType,
bridge_v2_name := _BridgeName,
bridge_v2_conf := _NewBridgeV2RawConf
} =
split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf),
% TODO once we have implemented the dry-run for channels we should use it here
ok.
connector_conf := ConnectorRawConf,
bridge_v2_type := BridgeType,
bridge_v2_name := BridgeName,
bridge_v2_conf := BridgeV2RawConf
} = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf),
OnReadyCallback =
fun(ConnectorId) ->
{_, ConnectorName} = emqx_connector_resource:parse_connector_id(ConnectorId),
ChannelTestId = id(BridgeType, BridgeName, ConnectorName),
BridgeV2Conf = emqx_utils_maps:unsafe_atom_key_map(BridgeV2RawConf),
case emqx_resource_manager:add_channel(ConnectorId, ChannelTestId, BridgeV2Conf) of
{error, Reason} ->
{error, Reason};
ok ->
HealthCheckResult = emqx_resource_manager:channel_health_check(
ConnectorId, ChannelTestId
),
case HealthCheckResult of
{error, Reason} ->
{error, Reason};
_ ->
ok
end
end
end,
emqx_connector_resource:create_dry_run(ConnectorType, ConnectorRawConf, OnReadyCallback).
%% NOTE: This function can cause broken references but it is only called from
%% test cases.

View File

@ -162,11 +162,9 @@ t_query_mode(CtConfig) ->
%% Test cases for all combinations of SSL, no SSL and authentication types
%%------------------------------------------------------------------------------
%% OK
t_publish_no_auth(CtConfig) ->
publish_with_and_without_ssl(CtConfig, "none").
%% OK
t_publish_no_auth_key_dispatch(CtConfig) ->
publish_with_and_without_ssl(CtConfig, "none", #{"partition_strategy" => "key_dispatch"}).
@ -186,8 +184,8 @@ t_publish_no_auth_key_dispatch(CtConfig) ->
%% Test cases for REST api
%%------------------------------------------------------------------------------
% t_kafka_bridge_rest_api_plain_text(_CtConfig) ->
% kafka_bridge_rest_api_all_auth_methods(false).
t_kafka_bridge_rest_api_plain_text(_CtConfig) ->
kafka_bridge_rest_api_all_auth_methods(false).
% t_kafka_bridge_rest_api_ssl(_CtConfig) ->
% kafka_bridge_rest_api_all_auth_methods(true).
@ -223,55 +221,62 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) ->
SSLSettings
)
),
kafka_bridge_rest_api_helper(
maps:merge(
#{
<<"bootstrap_hosts">> => SASLHostsString,
<<"authentication">> => BinifyMap(valid_sasl_plain_settings())
},
SSLSettings
)
),
kafka_bridge_rest_api_helper(
maps:merge(
#{
<<"bootstrap_hosts">> => SASLHostsString,
<<"authentication">> => BinifyMap(valid_sasl_scram256_settings())
},
SSLSettings
)
),
kafka_bridge_rest_api_helper(
maps:merge(
#{
<<"bootstrap_hosts">> => SASLHostsString,
<<"authentication">> => BinifyMap(valid_sasl_scram512_settings())
},
SSLSettings
)
),
kafka_bridge_rest_api_helper(
maps:merge(
#{
<<"bootstrap_hosts">> => SASLHostsString,
<<"authentication">> => BinifyMap(valid_sasl_kerberos_settings())
},
SSLSettings
)
),
% kafka_bridge_rest_api_helper(
% maps:merge(
% #{
% <<"bootstrap_hosts">> => SASLHostsString,
% <<"authentication">> => BinifyMap(valid_sasl_plain_settings())
% },
% SSLSettings
% )
% ),
% kafka_bridge_rest_api_helper(
% maps:merge(
% #{
% <<"bootstrap_hosts">> => SASLHostsString,
% <<"authentication">> => BinifyMap(valid_sasl_scram256_settings())
% },
% SSLSettings
% )
% ),
% kafka_bridge_rest_api_helper(
% maps:merge(
% #{
% <<"bootstrap_hosts">> => SASLHostsString,
% <<"authentication">> => BinifyMap(valid_sasl_scram512_settings())
% },
% SSLSettings
% )
% ),
% kafka_bridge_rest_api_helper(
% maps:merge(
% #{
% <<"bootstrap_hosts">> => SASLHostsString,
% <<"authentication">> => BinifyMap(valid_sasl_kerberos_settings())
% },
% SSLSettings
% )
% ),
ok.
%% So that we can check if new atoms are created when they are not supposed to be created
pre_create_atoms() ->
[
'kafka_producer__probe_',
probedryrun
].
kafka_bridge_rest_api_helper(Config) ->
BridgeType = ?BRIDGE_TYPE,
BridgeName = "my_kafka_bridge",
BridgeID = emqx_bridge_resource:bridge_id(
erlang:list_to_binary(BridgeType),
erlang:list_to_binary(BridgeName)
),
ResourceId = emqx_bridge_resource:resource_id(
erlang:list_to_binary(BridgeType),
erlang:list_to_binary(BridgeName)
list_to_binary(BridgeType),
list_to_binary(BridgeName)
),
% ResourceId = emqx_bridge_resource:resource_id(
% erlang:list_to_binary(BridgeType),
% erlang:list_to_binary(BridgeName)
% ),
UrlEscColon = "%3A",
BridgesProbeParts = ["bridges_probe"],
BridgeIdUrlEnc = BridgeType ++ UrlEscColon ++ BridgeName,
@ -331,12 +336,13 @@ kafka_bridge_rest_api_helper(Config) ->
%% Check that the new bridge is in the list of bridges
true = MyKafkaBridgeExists(),
%% Probe should work
{ok, 204, _} = http_post(BridgesProbeParts, CreateBody),
%% no extra atoms should be created when probing
%% See pre_create_atoms() above
AtomsBefore = erlang:system_info(atom_count),
{ok, 204, _} = http_post(BridgesProbeParts, CreateBody),
AtomsAfter = erlang:system_info(atom_count),
?assertEqual(AtomsBefore, AtomsAfter),
{ok, 204, X} = http_post(BridgesProbeParts, CreateBody),
%% Create a rule that uses the bridge
{ok, 201, Rule} = http_post(
["rules"],
@ -348,20 +354,24 @@ kafka_bridge_rest_api_helper(Config) ->
}
),
#{<<"id">> := RuleId} = emqx_utils_json:decode(Rule, [return_maps]),
BridgeV2Id = emqx_bridge_v2:id(
list_to_binary(BridgeType),
list_to_binary(BridgeName)
),
%% counters should be empty before
?assertEqual(0, emqx_resource_metrics:matched_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:success_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:dropped_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:failed_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:inflight_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:queuing_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:dropped_other_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:dropped_queue_full_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:dropped_resource_not_found_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:dropped_resource_stopped_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:retried_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:retried_failed_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:retried_success_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:matched_get(BridgeV2Id)),
?assertEqual(0, emqx_resource_metrics:success_get(BridgeV2Id)),
?assertEqual(0, emqx_resource_metrics:dropped_get(BridgeV2Id)),
?assertEqual(0, emqx_resource_metrics:failed_get(BridgeV2Id)),
?assertEqual(0, emqx_resource_metrics:inflight_get(BridgeV2Id)),
?assertEqual(0, emqx_resource_metrics:queuing_get(BridgeV2Id)),
?assertEqual(0, emqx_resource_metrics:dropped_other_get(BridgeV2Id)),
?assertEqual(0, emqx_resource_metrics:dropped_queue_full_get(BridgeV2Id)),
?assertEqual(0, emqx_resource_metrics:dropped_resource_not_found_get(BridgeV2Id)),
?assertEqual(0, emqx_resource_metrics:dropped_resource_stopped_get(BridgeV2Id)),
?assertEqual(0, emqx_resource_metrics:retried_get(BridgeV2Id)),
?assertEqual(0, emqx_resource_metrics:retried_failed_get(BridgeV2Id)),
?assertEqual(0, emqx_resource_metrics:retried_success_get(BridgeV2Id)),
%% Get offset before sending message
{ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0),
%% Send message to topic and check that it got forwarded to Kafka
@ -369,34 +379,55 @@ kafka_bridge_rest_api_helper(Config) ->
emqx:publish(emqx_message:make(<<"kafka_bridge_topic/1">>, Body)),
%% Give Kafka some time to get message
timer:sleep(100),
%% Check that Kafka got message
% %% Check that Kafka got message
BrodOut = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset),
{ok, {_, [KafkaMsg]}} = BrodOut,
Body = KafkaMsg#kafka_message.value,
%% Check crucial counters and gauges
?assertEqual(1, emqx_resource_metrics:matched_get(ResourceId)),
?assertEqual(1, emqx_resource_metrics:success_get(ResourceId)),
?assertEqual(1, emqx_resource_metrics:matched_get(BridgeV2Id)),
?assertEqual(1, emqx_resource_metrics:success_get(BridgeV2Id)),
?assertEqual(1, emqx_metrics_worker:get(rule_metrics, RuleId, 'actions.success')),
?assertEqual(0, emqx_metrics_worker:get(rule_metrics, RuleId, 'actions.failed')),
?assertEqual(0, emqx_resource_metrics:dropped_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:failed_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:inflight_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:queuing_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:dropped_other_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:dropped_queue_full_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:dropped_resource_not_found_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:dropped_resource_stopped_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:retried_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:retried_failed_get(ResourceId)),
?assertEqual(0, emqx_resource_metrics:retried_success_get(ResourceId)),
%% Perform operations
?assertEqual(0, emqx_resource_metrics:dropped_get(BridgeV2Id)),
?assertEqual(0, emqx_resource_metrics:failed_get(BridgeV2Id)),
?assertEqual(0, emqx_resource_metrics:inflight_get(BridgeV2Id)),
?assertEqual(0, emqx_resource_metrics:queuing_get(BridgeV2Id)),
?assertEqual(0, emqx_resource_metrics:dropped_other_get(BridgeV2Id)),
?assertEqual(0, emqx_resource_metrics:dropped_queue_full_get(BridgeV2Id)),
?assertEqual(0, emqx_resource_metrics:dropped_resource_not_found_get(BridgeV2Id)),
?assertEqual(0, emqx_resource_metrics:dropped_resource_stopped_get(BridgeV2Id)),
?assertEqual(0, emqx_resource_metrics:retried_get(BridgeV2Id)),
?assertEqual(0, emqx_resource_metrics:retried_failed_get(BridgeV2Id)),
?assertEqual(0, emqx_resource_metrics:retried_success_get(BridgeV2Id)),
% %% Perform operations
{ok, 204, _} = http_put(BridgesPartsOpDisable, #{}),
%% Success counter should be reset
?assertEqual(0, emqx_resource_metrics:success_get(BridgeV2Id)),
emqx:publish(emqx_message:make(<<"kafka_bridge_topic/1">>, Body)),
timer:sleep(100),
?assertEqual(0, emqx_resource_metrics:success_get(BridgeV2Id)),
?assertEqual(1, emqx_metrics_worker:get(rule_metrics, RuleId, 'actions.success')),
?assertEqual(1, emqx_metrics_worker:get(rule_metrics, RuleId, 'actions.failed')),
{ok, 204, _} = http_put(BridgesPartsOpDisable, #{}),
{ok, 204, _} = http_put(BridgesPartsOpEnable, #{}),
?assertEqual(0, emqx_resource_metrics:success_get(BridgeV2Id)),
%% Success counter should increase but
emqx:publish(emqx_message:make(<<"kafka_bridge_topic/1">>, Body)),
timer:sleep(100),
?assertEqual(1, emqx_resource_metrics:success_get(BridgeV2Id)),
?assertEqual(2, emqx_metrics_worker:get(rule_metrics, RuleId, 'actions.success')),
{ok, 204, _} = http_put(BridgesPartsOpEnable, #{}),
{ok, 204, _} = http_post(BridgesPartsOpStop, #{}),
%% TODO: This is a bit tricky with the compatibility layer. Currently one
%% can send a message even to a stopped channel. How shall we handle this?
?assertEqual(0, emqx_resource_metrics:success_get(BridgeV2Id)),
{ok, 204, _} = http_post(BridgesPartsOpStop, #{}),
{ok, 204, _} = http_post(BridgesPartsOpRestart, #{}),
%% Success counter should increase
emqx:publish(emqx_message:make(<<"kafka_bridge_topic/1">>, Body)),
timer:sleep(100),
?assertEqual(1, emqx_resource_metrics:success_get(BridgeV2Id)),
?assertEqual(3, emqx_metrics_worker:get(rule_metrics, RuleId, 'actions.success')),
%% Cleanup
{ok, 204, _} = http_delete(BridgesPartsIdDeleteAlsoActions),
false = MyKafkaBridgeExists(),
@ -479,83 +510,79 @@ t_failed_creation_then_fix(Config) ->
delete_all_bridges(),
ok.
% t_custom_timestamp(_Config) ->
% HostsString = kafka_hosts_string_sasl(),
% AuthSettings = valid_sasl_plain_settings(),
% Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]),
% Type = ?BRIDGE_TYPE,
% Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
% ResourceId = emqx_bridge_resource:resource_id(Type, Name),
% KafkaTopic = test_topic_one_partition(),
% MQTTTopic = <<"t/local/kafka">>,
% emqx:subscribe(MQTTTopic),
% Conf0 = config(#{
% "authentication" => AuthSettings,
% "kafka_hosts_string" => HostsString,
% "local_topic" => MQTTTopic,
% "kafka_topic" => KafkaTopic,
% "instance_id" => ResourceId,
% "ssl" => #{}
% }),
% Conf = emqx_utils_maps:deep_put(
% [<<"kafka">>, <<"message">>, <<"timestamp">>],
% Conf0,
% <<"123">>
% ),
% {ok, _} = emqx_bridge:create(Type, erlang:list_to_atom(Name), Conf),
% {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0),
% ct:pal("base offset before testing ~p", [Offset]),
% Time = erlang:unique_integer(),
% BinTime = integer_to_binary(Time),
% Msg = #{
% clientid => BinTime,
% payload => <<"payload">>,
% timestamp => Time
% },
% emqx:publish(emqx_message:make(MQTTTopic, emqx_utils_json:encode(Msg))),
% {ok, {_, [KafkaMsg]}} =
% ?retry(
% _Interval = 500,
% _NAttempts = 20,
% {ok, {_, [_]}} = brod:fetch(kafka_hosts(), KafkaTopic, _Partition = 0, Offset)
% ),
% ?assertMatch(#kafka_message{ts = 123, ts_type = create}, KafkaMsg),
% delete_all_bridges(),
% ok.
t_custom_timestamp(_Config) ->
HostsString = kafka_hosts_string(),
AuthSettings = "none",
Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]),
Type = ?BRIDGE_TYPE,
Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
KafkaTopic = test_topic_one_partition(),
MQTTTopic = <<"t/local/kafka">>,
emqx:subscribe(MQTTTopic),
Conf0 = config(#{
"authentication" => AuthSettings,
"kafka_hosts_string" => HostsString,
"local_topic" => MQTTTopic,
"kafka_topic" => KafkaTopic,
"bridge_name" => Name,
"ssl" => #{}
}),
Conf = emqx_utils_maps:deep_put(
[<<"kafka">>, <<"message">>, <<"timestamp">>],
Conf0,
<<"123">>
),
{ok, _} = emqx_bridge:create(list_to_atom(Type), list_to_atom(Name), Conf),
{ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0),
ct:pal("base offset before testing ~p", [Offset]),
Time = erlang:unique_integer(),
BinTime = integer_to_binary(Time),
Msg = #{
clientid => BinTime,
payload => <<"payload">>,
timestamp => Time
},
emqx:publish(emqx_message:make(MQTTTopic, emqx_utils_json:encode(Msg))),
{ok, {_, [KafkaMsg]}} =
?retry(
_Interval = 500,
_NAttempts = 20,
{ok, {_, [_]}} = brod:fetch(kafka_hosts(), KafkaTopic, _Partition = 0, Offset)
),
?assertMatch(#kafka_message{ts = 123, ts_type = create}, KafkaMsg),
delete_all_bridges(),
ok.
% t_nonexistent_topic(_Config) ->
% HostsString = kafka_hosts_string_sasl(),
% AuthSettings = valid_sasl_plain_settings(),
% Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]),
% Type = ?BRIDGE_TYPE,
% Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
% ResourceId = emqx_bridge_resource:resource_id(Type, Name),
% BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
% KafkaTopic = "undefined-test-topic",
% Conf = config(#{
% "authentication" => AuthSettings,
% "kafka_hosts_string" => HostsString,
% "kafka_topic" => KafkaTopic,
% "instance_id" => ResourceId,
% "producer" => #{
% "kafka" => #{
% "buffer" => #{
% "memory_overload_protection" => false
% }
% }
% },
% "ssl" => #{}
% }),
% {ok, #{config := ValidConfigAtom1}} = emqx_bridge:create(
% Type, erlang:list_to_atom(Name), Conf
% ),
% ValidConfigAtom = ValidConfigAtom1#{bridge_name => Name},
% ?assertThrow(_, ?PRODUCER:on_start(ResourceId, ValidConfigAtom)),
% ok = emqx_bridge_resource:remove(BridgeId),
% delete_all_bridges(),
% ok.
t_nonexistent_topic(_Config) ->
HostsString = kafka_hosts_string(),
AuthSettings = "none",
Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]),
Type = ?BRIDGE_TYPE,
Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
KafkaTopic = "undefined-test-topic",
Conf = config(#{
"authentication" => AuthSettings,
"kafka_hosts_string" => HostsString,
"kafka_topic" => KafkaTopic,
"bridge_name" => Name,
"producer" => #{
"kafka" => #{
"buffer" => #{
"memory_overload_protection" => false
}
}
},
"ssl" => #{}
}),
{ok, #{config := _ValidConfigAtom1}} = emqx_bridge:create(
erlang:list_to_atom(Type), erlang:list_to_atom(Name), Conf
),
% TODO: make sure the user facing APIs for Bridge V1 also get this error
{error, _} = emqx_bridge_v2:health_check(list_to_atom(Type), list_to_atom(Name)),
{ok, _} = emqx_bridge:remove(list_to_atom(Type), list_to_atom(Name)),
delete_all_bridges(),
ok.
%% DONE
t_send_message_with_headers(Config) ->
%% TODO Change this back to SASL plain once we figure out why it is not working
HostsString = kafka_hosts_string(),
@ -793,89 +820,92 @@ t_wrong_headers(_Config) ->
),
ok.
% t_wrong_headers_from_message(Config) ->
% HostsString = kafka_hosts_string_sasl(),
% AuthSettings = valid_sasl_plain_settings(),
% Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]),
% Type = ?BRIDGE_TYPE,
% Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
% ResourceId = emqx_bridge_resource:resource_id(Type, Name),
% BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
% KafkaTopic = test_topic_one_partition(),
% Conf = config_with_headers(#{
% "authentication" => AuthSettings,
% "kafka_hosts_string" => HostsString,
% "kafka_topic" => KafkaTopic,
% "instance_id" => ResourceId,
% "kafka_headers" => <<"${payload}">>,
% "producer" => #{
% "kafka" => #{
% "buffer" => #{
% "memory_overload_protection" => false
% }
% }
% },
% "ssl" => #{}
% }),
% {ok, #{config := ConfigAtom1}} = emqx_bridge:create(
% Type, erlang:list_to_atom(Name), Conf
% ),
% ConfigAtom = ConfigAtom1#{bridge_name => Name},
% {ok, State} = ?PRODUCER:on_start(ResourceId, ConfigAtom),
% Time1 = erlang:unique_integer(),
% Payload1 = <<"wrong_header">>,
% Msg1 = #{
% clientid => integer_to_binary(Time1),
% payload => Payload1,
% timestamp => Time1
% },
% ?assertError(
% {badmatch, {error, {unrecoverable_error, {bad_kafka_headers, Payload1}}}},
% send(Config, ResourceId, Msg1, State)
% ),
% Time2 = erlang:unique_integer(),
% Payload2 = <<"[{\"foo\":\"bar\"}, {\"foo2\":\"bar2\"}]">>,
% Msg2 = #{
% clientid => integer_to_binary(Time2),
% payload => Payload2,
% timestamp => Time2
% },
% ?assertError(
% {badmatch, {error, {unrecoverable_error, {bad_kafka_header, [{<<"foo">>, <<"bar">>}]}}}},
% send(Config, ResourceId, Msg2, State)
% ),
% Time3 = erlang:unique_integer(),
% Payload3 = <<"[{\"key\":\"foo\"}, {\"value\":\"bar\"}]">>,
% Msg3 = #{
% clientid => integer_to_binary(Time3),
% payload => Payload3,
% timestamp => Time3
% },
% ?assertError(
% {badmatch, {error, {unrecoverable_error, {bad_kafka_header, [{<<"key">>, <<"foo">>}]}}}},
% send(Config, ResourceId, Msg3, State)
% ),
% Time4 = erlang:unique_integer(),
% Payload4 = <<"[{\"key\":\"foo\", \"value\":\"bar\"}]">>,
% Msg4 = #{
% clientid => integer_to_binary(Time4),
% payload => Payload4,
% timestamp => Time4
% },
% ?assertError(
% {badmatch,
% {error,
% {unrecoverable_error,
% {bad_kafka_header, [{<<"key">>, <<"foo">>}, {<<"value">>, <<"bar">>}]}}}},
% send(Config, ResourceId, Msg4, State)
% ),
% %% TODO: refactor those into init/end per testcase
% ok = ?PRODUCER:on_stop(ResourceId, State),
% ?assertEqual([], supervisor:which_children(wolff_client_sup)),
% ?assertEqual([], supervisor:which_children(wolff_producers_sup)),
% ok = emqx_bridge_resource:remove(BridgeId),
% delete_all_bridges(),
% ok.
t_wrong_headers_from_message(Config) ->
HostsString = kafka_hosts_string(),
AuthSettings = "none",
Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]),
Type = ?BRIDGE_TYPE,
Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
% ResourceId = emqx_bridge_resource:resource_id(Type, Name),
% BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
KafkaTopic = test_topic_one_partition(),
Conf = config_with_headers(#{
"authentication" => AuthSettings,
"kafka_hosts_string" => HostsString,
"kafka_topic" => KafkaTopic,
"bridge_name" => Name,
"kafka_headers" => <<"${payload}">>,
"producer" => #{
"kafka" => #{
"buffer" => #{
"memory_overload_protection" => false
}
}
},
"ssl" => #{}
}),
{ok, _} = emqx_bridge:create(
list_to_atom(Type), list_to_atom(Name), Conf
),
% ConfigAtom = ConfigAtom1#{bridge_name => Name},
% {ok, State} = ?PRODUCER:on_start(ResourceId, ConfigAtom),
ResourceId = emqx_bridge_resource:resource_id(bin(Type), bin(Name)),
{ok, _Group, #{state := State}} = emqx_resource:get_instance(ResourceId),
Time1 = erlang:unique_integer(),
Payload1 = <<"wrong_header">>,
Msg1 = #{
clientid => integer_to_binary(Time1),
payload => Payload1,
timestamp => Time1
},
BridgeV2Id = emqx_bridge_v2:id(bin(Type), bin(Name)),
?assertError(
{badmatch, {error, {unrecoverable_error, {bad_kafka_headers, Payload1}}}},
send(Config, ResourceId, Msg1, State, BridgeV2Id)
),
Time2 = erlang:unique_integer(),
Payload2 = <<"[{\"foo\":\"bar\"}, {\"foo2\":\"bar2\"}]">>,
Msg2 = #{
clientid => integer_to_binary(Time2),
payload => Payload2,
timestamp => Time2
},
?assertError(
{badmatch, {error, {unrecoverable_error, {bad_kafka_header, [{<<"foo">>, <<"bar">>}]}}}},
send(Config, ResourceId, Msg2, State, BridgeV2Id)
),
Time3 = erlang:unique_integer(),
Payload3 = <<"[{\"key\":\"foo\"}, {\"value\":\"bar\"}]">>,
Msg3 = #{
clientid => integer_to_binary(Time3),
payload => Payload3,
timestamp => Time3
},
?assertError(
{badmatch, {error, {unrecoverable_error, {bad_kafka_header, [{<<"key">>, <<"foo">>}]}}}},
send(Config, ResourceId, Msg3, State, BridgeV2Id)
),
Time4 = erlang:unique_integer(),
Payload4 = <<"[{\"key\":\"foo\", \"value\":\"bar\"}]">>,
Msg4 = #{
clientid => integer_to_binary(Time4),
payload => Payload4,
timestamp => Time4
},
?assertError(
{badmatch,
{error,
{unrecoverable_error,
{bad_kafka_header, [{<<"key">>, <<"foo">>}, {<<"value">>, <<"bar">>}]}}}},
send(Config, ResourceId, Msg4, State, BridgeV2Id)
),
%% TODO: refactor those into init/end per testcase
ok = ?PRODUCER:on_stop(ResourceId, State),
?assertEqual([], supervisor:which_children(wolff_client_sup)),
?assertEqual([], supervisor:which_children(wolff_producers_sup)),
{ok, _} = emqx_bridge:remove(list_to_atom(Type), list_to_atom(Name)),
delete_all_bridges(),
ok.
%%------------------------------------------------------------------------------
%% Helper functions

View File

@ -34,6 +34,7 @@
create/3,
create/4,
create_dry_run/2,
create_dry_run/3,
recreate/2,
recreate/3,
remove/1,
@ -240,11 +241,15 @@ recreate(Type, Name, Conf, Opts) ->
parse_opts(Conf, Opts)
).
create_dry_run(Type, Conf0) ->
create_dry_run(Type, Conf) ->
create_dry_run(Type, Conf, fun(_) -> ok end).
create_dry_run(Type, Conf0, Callback) ->
%% Already typechecked, no need to catch errors
TypeBin = bin(Type),
TypeAtom = safe_atom(Type),
TmpName = iolist_to_binary([?TEST_ID_PREFIX, TypeBin, ":", emqx_utils:gen_id(8)]),
%% We use a fixed name here to avoid createing an atom
TmpName = iolist_to_binary([?TEST_ID_PREFIX, TypeBin, ":", <<"probedryrun">>]),
TmpPath = emqx_utils:safe_filename(TmpName),
Conf1 = maps:without([<<"name">>], Conf0),
RawConf = #{<<"connectors">> => #{TypeBin => #{<<"temp_name">> => Conf1}}},
@ -261,7 +266,7 @@ create_dry_run(Type, Conf0) ->
{ok, ConfNew} ->
ParseConf = parse_confs(bin(Type), TmpName, ConfNew),
emqx_resource:create_dry_run_local(
TmpName, connector_to_resource_type(Type), ParseConf
TmpName, connector_to_resource_type(Type), ParseConf, Callback
)
end
catch

View File

@ -51,6 +51,7 @@
create_dry_run/2,
create_dry_run_local/2,
create_dry_run_local/3,
create_dry_run_local/4,
%% this will do create_dry_run, stop the old instance and start a new one
recreate/3,
recreate/4,
@ -296,6 +297,13 @@ create_dry_run_local(ResourceType, Config) ->
create_dry_run_local(ResId, ResourceType, Config) ->
emqx_resource_manager:create_dry_run(ResId, ResourceType, Config).
-spec create_dry_run_local(resource_id(), resource_type(), resource_config(), OnReadyCallback) ->
ok | {error, Reason :: term()}
when
OnReadyCallback :: fun((resource_id()) -> ok | {error, Reason :: term()}).
create_dry_run_local(ResId, ResourceType, Config, OnReadyCallback) ->
emqx_resource_manager:create_dry_run(ResId, ResourceType, Config, OnReadyCallback).
-spec recreate(resource_id(), resource_type(), resource_config()) ->
{ok, resource_data()} | {error, Reason :: term()}.
recreate(ResId, ResourceType, Config) ->

View File

@ -27,6 +27,7 @@
remove/1,
create_dry_run/2,
create_dry_run/3,
create_dry_run/4,
restart/2,
start/2,
stop/1,
@ -163,6 +164,16 @@ create_dry_run(ResourceType, Config) ->
create_dry_run(ResId, ResourceType, Config).
create_dry_run(ResId, ResourceType, Config) ->
create_dry_run(ResId, ResourceType, Config, fun do_nothing_on_ready/1).
do_nothing_on_ready(_ResId) ->
ok.
-spec create_dry_run(resource_id(), resource_type(), resource_config(), OnReadyCallback) ->
ok | {error, Reason :: term()}
when
OnReadyCallback :: fun((resource_id()) -> ok | {error, Reason :: term()}).
create_dry_run(ResId, ResourceType, Config, OnReadyCallback) ->
Opts =
case is_map(Config) of
true -> maps:get(resource_opts, Config, #{});
@ -173,7 +184,19 @@ create_dry_run(ResId, ResourceType, Config) ->
Timeout = emqx_utils:clamp(HealthCheckInterval, 5_000, 60_000),
case wait_for_ready(ResId, Timeout) of
ok ->
remove(ResId);
CallbackResult =
try
OnReadyCallback(ResId)
catch
_:CallbackReason ->
{error, CallbackReason}
end,
case remove(ResId) of
ok ->
CallbackResult;
{error, _} = Error ->
Error
end;
{error, Reason} ->
_ = remove(ResId),
{error, Reason};

View File

@ -26,7 +26,14 @@
-export([init/1]).
ensure_child(ResId, Group, ResourceType, Config, Opts) ->
_ = supervisor:start_child(?MODULE, child_spec(ResId, Group, ResourceType, Config, Opts)),
case supervisor:start_child(?MODULE, child_spec(ResId, Group, ResourceType, Config, Opts)) of
{error, Reason} ->
%% This should not happen in production but it can be a huge time sink in
%% development environments if the error is just silently ignored.
error(Reason);
_ ->
ok
end,
ok.
delete_child(ResId) ->