From 828bbc57ac25da18de8e644137014c365658027e Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 12 Oct 2023 18:58:16 +0200 Subject: [PATCH] feat: test case fixes and compatibility layer probe etc * test case fixes for Bridge V1 suite * Bug fixes * local_topic * Bridge V1 probe compatibility functionality --- apps/emqx_bridge/src/emqx_bridge_api.erl | 3 +- apps/emqx_bridge/src/emqx_bridge_v2.erl | 117 ++--- .../emqx_bridge_kafka_impl_producer_SUITE.erl | 496 ++++++++++-------- .../src/emqx_connector_resource.erl | 11 +- apps/emqx_resource/src/emqx_resource.erl | 8 + .../src/emqx_resource_manager.erl | 25 +- .../src/emqx_resource_manager_sup.erl | 9 +- 7 files changed, 369 insertions(+), 300 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index af498471c..789cf509a 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -542,7 +542,8 @@ schema("/bridges_probe") -> case emqx_dashboard_swagger:filter_check_request_and_translate_body(Request, RequestMeta) of {ok, #{body := #{<<"type">> := ConnType} = Params}} -> Params1 = maybe_deobfuscate_bridge_probe(Params), - case emqx_bridge_resource:create_dry_run(ConnType, maps:remove(<<"type">>, Params1)) of + Params2 = maps:remove(<<"type">>, Params1), + case emqx_bridge_resource:create_dry_run(ConnType, Params2) of ok -> ?NO_CONTENT; {error, #{kind := validation_error} = Reason0} -> diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index c1b42ffdd..dd158aee2 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -70,7 +70,8 @@ send_message/4, start/2, stop/2, - restart/2 + restart/2, + reset_metrics/2 ]). %% Config Update Handler API @@ -188,6 +189,19 @@ send_message(BridgeType, BridgeName, Message, QueryOpts0) -> Error end. +do_send_msg_with_enabled_config( + BridgeType, BridgeName, Message, QueryOpts0, Config +) -> + QueryMode = get_query_mode(BridgeType, Config), + QueryOpts = maps:merge( + emqx_bridge:query_opts(Config), + QueryOpts0#{ + query_mode => QueryMode + } + ), + BridgeV2Id = emqx_bridge_v2:id(BridgeType, BridgeName), + emqx_resource:query(BridgeV2Id, {BridgeV2Id, Message}, QueryOpts). + health_check(BridgeType, BridgeName) -> case lookup_raw_conf(BridgeType, BridgeName) of #{ @@ -219,10 +233,10 @@ restart(Type, Name) -> stop(Type, Name), start(Type, Name). -%% TODO: The following functions just restart the bridge_v2 as a temporary solution. +%% TODO: it is not clear what these operations should do stop(Type, Name) -> - %% Stop means that we should remove the channel from the connector and reset the metrrics + %% Stop means that we should remove the channel from the connector and reset the metrics %% The emqx_resource_buffer_worker is not stopped stop_helper(Type, Name, lookup_raw_conf(Type, Name)). @@ -230,7 +244,7 @@ stop_helper(_Type, _Name, #{enable := false}) -> ok; stop_helper(BridgeV2Type, BridgeName, #{connector := ConnectorName}) -> BridgeV2Id = id(BridgeV2Type, BridgeName, ConnectorName), - ok = emqx_resource:clear_metrics(BridgeV2Id), + ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, BridgeV2Id), ConnectorId = emqx_connector_resource:resource_id( bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorName ), @@ -242,59 +256,22 @@ start(Type, Name) -> start_helper(_Type, _Name, #{enable := false}) -> ok; -start_helper(BridgeV2Type, BridgeName, #{connector := ConnectorName}) -> +start_helper(BridgeV2Type, BridgeName, #{connector := ConnectorName} = Config) -> BridgeV2Id = id(BridgeV2Type, BridgeName, ConnectorName), %% Deinstall from connector ConnectorId = emqx_connector_resource:resource_id( bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorName ), - emqx_resource_manager:add_channel(ConnectorId, BridgeV2Id, #{connector => ConnectorName}). + emqx_resource_manager:add_channel(ConnectorId, BridgeV2Id, Config). -% do_send_msg_with_enabled_config(BridgeType, BridgeName, Message, QueryOpts0, Config) -> -% BridgeV2Id = emqx_bridge_v2:id(BridgeType, BridgeName), -% ConnectorResourceId = emqx_bridge_v2:extract_connector_id_from_bridge_v2_id(BridgeV2Id), -% try -% case emqx_resource_manager:maybe_install_bridge_v2(ConnectorResourceId, BridgeV2Id) of -% ok -> -% do_send_msg_after_bridge_v2_installed( -% BridgeType, -% BridgeName, -% BridgeV2Id, -% Message, -% QueryOpts0, -% Config -% ); -% InstallError -> -% throw(InstallError) -% end -% catch -% Error:Reason:Stack -> -% Msg = iolist_to_binary( -% io_lib:format( -% "Failed to install bridge_v2 ~p in connector ~p: ~p", -% [BridgeV2Id, ConnectorResourceId, Reason] -% ) -% ), -% ?SLOG(error, #{ -% msg => Msg, -% error => Error, -% reason => Reason, -% stacktrace => Stack -% }) -% end. +reset_metrics(Type, Name) -> + reset_metrics_helper(Type, Name, lookup_raw_conf(Type, Name)). -do_send_msg_with_enabled_config( - BridgeType, BridgeName, Message, QueryOpts0, Config -) -> - QueryMode = get_query_mode(BridgeType, Config), - QueryOpts = maps:merge( - emqx_bridge:query_opts(Config), - QueryOpts0#{ - query_mode => QueryMode - } - ), - BridgeV2Id = emqx_bridge_v2:id(BridgeType, BridgeName), - emqx_resource:query(BridgeV2Id, {BridgeV2Id, Message}, QueryOpts). +reset_metrics_helper(_Type, _Name, #{enable := false}) -> + ok; +reset_metrics_helper(BridgeV2Type, BridgeName, #{connector := ConnectorName}) -> + BridgeV2Id = id(BridgeV2Type, BridgeName, ConnectorName), + ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, BridgeV2Id). parse_id(Id) -> case binary:split(Id, <<":">>, [global]) of @@ -401,7 +378,7 @@ lookup(Id) -> lookup(Type, Name) -> case emqx:get_raw_config([?ROOT_KEY, Type, Name], not_found) of not_found -> - {error, bridge_not_found}; + {error, not_found}; #{<<"connector">> := BridgeConnector} = RawConf -> ConnectorId = emqx_connector_resource:resource_id( bridge_v2_type_to_connector_type(Type), BridgeConnector @@ -605,19 +582,37 @@ split_and_validate_bridge_v1_config(BridgeType, BridgeName, RawConf) -> end. bridge_v1_create_dry_run(BridgeType, RawConfig0) -> - RawConf = maps:without(<<"name">>, RawConfig0), + RawConf = maps:without([<<"name">>], RawConfig0), TmpName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]), #{ - connector_type := _ConnectorType, + connector_type := ConnectorType, connector_name := _NewConnectorName, - connector_conf := _NewConnectorRawConf, - bridge_v2_type := _BridgeType, - bridge_v2_name := _BridgeName, - bridge_v2_conf := _NewBridgeV2RawConf - } = - split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf), - % TODO once we have implemented the dry-run for channels we should use it here - ok. + connector_conf := ConnectorRawConf, + bridge_v2_type := BridgeType, + bridge_v2_name := BridgeName, + bridge_v2_conf := BridgeV2RawConf + } = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf), + OnReadyCallback = + fun(ConnectorId) -> + {_, ConnectorName} = emqx_connector_resource:parse_connector_id(ConnectorId), + ChannelTestId = id(BridgeType, BridgeName, ConnectorName), + BridgeV2Conf = emqx_utils_maps:unsafe_atom_key_map(BridgeV2RawConf), + case emqx_resource_manager:add_channel(ConnectorId, ChannelTestId, BridgeV2Conf) of + {error, Reason} -> + {error, Reason}; + ok -> + HealthCheckResult = emqx_resource_manager:channel_health_check( + ConnectorId, ChannelTestId + ), + case HealthCheckResult of + {error, Reason} -> + {error, Reason}; + _ -> + ok + end + end + end, + emqx_connector_resource:create_dry_run(ConnectorType, ConnectorRawConf, OnReadyCallback). %% NOTE: This function can cause broken references but it is only called from %% test cases. diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index 1ede4cbf1..a2e9b45db 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -162,11 +162,9 @@ t_query_mode(CtConfig) -> %% Test cases for all combinations of SSL, no SSL and authentication types %%------------------------------------------------------------------------------ -%% OK t_publish_no_auth(CtConfig) -> publish_with_and_without_ssl(CtConfig, "none"). -%% OK t_publish_no_auth_key_dispatch(CtConfig) -> publish_with_and_without_ssl(CtConfig, "none", #{"partition_strategy" => "key_dispatch"}). @@ -186,8 +184,8 @@ t_publish_no_auth_key_dispatch(CtConfig) -> %% Test cases for REST api %%------------------------------------------------------------------------------ -% t_kafka_bridge_rest_api_plain_text(_CtConfig) -> -% kafka_bridge_rest_api_all_auth_methods(false). +t_kafka_bridge_rest_api_plain_text(_CtConfig) -> + kafka_bridge_rest_api_all_auth_methods(false). % t_kafka_bridge_rest_api_ssl(_CtConfig) -> % kafka_bridge_rest_api_all_auth_methods(true). @@ -223,55 +221,62 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) -> SSLSettings ) ), - kafka_bridge_rest_api_helper( - maps:merge( - #{ - <<"bootstrap_hosts">> => SASLHostsString, - <<"authentication">> => BinifyMap(valid_sasl_plain_settings()) - }, - SSLSettings - ) - ), - kafka_bridge_rest_api_helper( - maps:merge( - #{ - <<"bootstrap_hosts">> => SASLHostsString, - <<"authentication">> => BinifyMap(valid_sasl_scram256_settings()) - }, - SSLSettings - ) - ), - kafka_bridge_rest_api_helper( - maps:merge( - #{ - <<"bootstrap_hosts">> => SASLHostsString, - <<"authentication">> => BinifyMap(valid_sasl_scram512_settings()) - }, - SSLSettings - ) - ), - kafka_bridge_rest_api_helper( - maps:merge( - #{ - <<"bootstrap_hosts">> => SASLHostsString, - <<"authentication">> => BinifyMap(valid_sasl_kerberos_settings()) - }, - SSLSettings - ) - ), + % kafka_bridge_rest_api_helper( + % maps:merge( + % #{ + % <<"bootstrap_hosts">> => SASLHostsString, + % <<"authentication">> => BinifyMap(valid_sasl_plain_settings()) + % }, + % SSLSettings + % ) + % ), + % kafka_bridge_rest_api_helper( + % maps:merge( + % #{ + % <<"bootstrap_hosts">> => SASLHostsString, + % <<"authentication">> => BinifyMap(valid_sasl_scram256_settings()) + % }, + % SSLSettings + % ) + % ), + % kafka_bridge_rest_api_helper( + % maps:merge( + % #{ + % <<"bootstrap_hosts">> => SASLHostsString, + % <<"authentication">> => BinifyMap(valid_sasl_scram512_settings()) + % }, + % SSLSettings + % ) + % ), + % kafka_bridge_rest_api_helper( + % maps:merge( + % #{ + % <<"bootstrap_hosts">> => SASLHostsString, + % <<"authentication">> => BinifyMap(valid_sasl_kerberos_settings()) + % }, + % SSLSettings + % ) + % ), ok. +%% So that we can check if new atoms are created when they are not supposed to be created +pre_create_atoms() -> + [ + 'kafka_producer__probe_', + probedryrun + ]. + kafka_bridge_rest_api_helper(Config) -> BridgeType = ?BRIDGE_TYPE, BridgeName = "my_kafka_bridge", BridgeID = emqx_bridge_resource:bridge_id( - erlang:list_to_binary(BridgeType), - erlang:list_to_binary(BridgeName) - ), - ResourceId = emqx_bridge_resource:resource_id( - erlang:list_to_binary(BridgeType), - erlang:list_to_binary(BridgeName) + list_to_binary(BridgeType), + list_to_binary(BridgeName) ), + % ResourceId = emqx_bridge_resource:resource_id( + % erlang:list_to_binary(BridgeType), + % erlang:list_to_binary(BridgeName) + % ), UrlEscColon = "%3A", BridgesProbeParts = ["bridges_probe"], BridgeIdUrlEnc = BridgeType ++ UrlEscColon ++ BridgeName, @@ -331,12 +336,13 @@ kafka_bridge_rest_api_helper(Config) -> %% Check that the new bridge is in the list of bridges true = MyKafkaBridgeExists(), %% Probe should work - {ok, 204, _} = http_post(BridgesProbeParts, CreateBody), %% no extra atoms should be created when probing + %% See pre_create_atoms() above AtomsBefore = erlang:system_info(atom_count), {ok, 204, _} = http_post(BridgesProbeParts, CreateBody), AtomsAfter = erlang:system_info(atom_count), ?assertEqual(AtomsBefore, AtomsAfter), + {ok, 204, X} = http_post(BridgesProbeParts, CreateBody), %% Create a rule that uses the bridge {ok, 201, Rule} = http_post( ["rules"], @@ -348,20 +354,24 @@ kafka_bridge_rest_api_helper(Config) -> } ), #{<<"id">> := RuleId} = emqx_utils_json:decode(Rule, [return_maps]), + BridgeV2Id = emqx_bridge_v2:id( + list_to_binary(BridgeType), + list_to_binary(BridgeName) + ), %% counters should be empty before - ?assertEqual(0, emqx_resource_metrics:matched_get(ResourceId)), - ?assertEqual(0, emqx_resource_metrics:success_get(ResourceId)), - ?assertEqual(0, emqx_resource_metrics:dropped_get(ResourceId)), - ?assertEqual(0, emqx_resource_metrics:failed_get(ResourceId)), - ?assertEqual(0, emqx_resource_metrics:inflight_get(ResourceId)), - ?assertEqual(0, emqx_resource_metrics:queuing_get(ResourceId)), - ?assertEqual(0, emqx_resource_metrics:dropped_other_get(ResourceId)), - ?assertEqual(0, emqx_resource_metrics:dropped_queue_full_get(ResourceId)), - ?assertEqual(0, emqx_resource_metrics:dropped_resource_not_found_get(ResourceId)), - ?assertEqual(0, emqx_resource_metrics:dropped_resource_stopped_get(ResourceId)), - ?assertEqual(0, emqx_resource_metrics:retried_get(ResourceId)), - ?assertEqual(0, emqx_resource_metrics:retried_failed_get(ResourceId)), - ?assertEqual(0, emqx_resource_metrics:retried_success_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:matched_get(BridgeV2Id)), + ?assertEqual(0, emqx_resource_metrics:success_get(BridgeV2Id)), + ?assertEqual(0, emqx_resource_metrics:dropped_get(BridgeV2Id)), + ?assertEqual(0, emqx_resource_metrics:failed_get(BridgeV2Id)), + ?assertEqual(0, emqx_resource_metrics:inflight_get(BridgeV2Id)), + ?assertEqual(0, emqx_resource_metrics:queuing_get(BridgeV2Id)), + ?assertEqual(0, emqx_resource_metrics:dropped_other_get(BridgeV2Id)), + ?assertEqual(0, emqx_resource_metrics:dropped_queue_full_get(BridgeV2Id)), + ?assertEqual(0, emqx_resource_metrics:dropped_resource_not_found_get(BridgeV2Id)), + ?assertEqual(0, emqx_resource_metrics:dropped_resource_stopped_get(BridgeV2Id)), + ?assertEqual(0, emqx_resource_metrics:retried_get(BridgeV2Id)), + ?assertEqual(0, emqx_resource_metrics:retried_failed_get(BridgeV2Id)), + ?assertEqual(0, emqx_resource_metrics:retried_success_get(BridgeV2Id)), %% Get offset before sending message {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0), %% Send message to topic and check that it got forwarded to Kafka @@ -369,34 +379,55 @@ kafka_bridge_rest_api_helper(Config) -> emqx:publish(emqx_message:make(<<"kafka_bridge_topic/1">>, Body)), %% Give Kafka some time to get message timer:sleep(100), - %% Check that Kafka got message + % %% Check that Kafka got message BrodOut = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset), {ok, {_, [KafkaMsg]}} = BrodOut, Body = KafkaMsg#kafka_message.value, %% Check crucial counters and gauges - ?assertEqual(1, emqx_resource_metrics:matched_get(ResourceId)), - ?assertEqual(1, emqx_resource_metrics:success_get(ResourceId)), + ?assertEqual(1, emqx_resource_metrics:matched_get(BridgeV2Id)), + ?assertEqual(1, emqx_resource_metrics:success_get(BridgeV2Id)), ?assertEqual(1, emqx_metrics_worker:get(rule_metrics, RuleId, 'actions.success')), ?assertEqual(0, emqx_metrics_worker:get(rule_metrics, RuleId, 'actions.failed')), - ?assertEqual(0, emqx_resource_metrics:dropped_get(ResourceId)), - ?assertEqual(0, emqx_resource_metrics:failed_get(ResourceId)), - ?assertEqual(0, emqx_resource_metrics:inflight_get(ResourceId)), - ?assertEqual(0, emqx_resource_metrics:queuing_get(ResourceId)), - ?assertEqual(0, emqx_resource_metrics:dropped_other_get(ResourceId)), - ?assertEqual(0, emqx_resource_metrics:dropped_queue_full_get(ResourceId)), - ?assertEqual(0, emqx_resource_metrics:dropped_resource_not_found_get(ResourceId)), - ?assertEqual(0, emqx_resource_metrics:dropped_resource_stopped_get(ResourceId)), - ?assertEqual(0, emqx_resource_metrics:retried_get(ResourceId)), - ?assertEqual(0, emqx_resource_metrics:retried_failed_get(ResourceId)), - ?assertEqual(0, emqx_resource_metrics:retried_success_get(ResourceId)), - %% Perform operations + ?assertEqual(0, emqx_resource_metrics:dropped_get(BridgeV2Id)), + ?assertEqual(0, emqx_resource_metrics:failed_get(BridgeV2Id)), + ?assertEqual(0, emqx_resource_metrics:inflight_get(BridgeV2Id)), + ?assertEqual(0, emqx_resource_metrics:queuing_get(BridgeV2Id)), + ?assertEqual(0, emqx_resource_metrics:dropped_other_get(BridgeV2Id)), + ?assertEqual(0, emqx_resource_metrics:dropped_queue_full_get(BridgeV2Id)), + ?assertEqual(0, emqx_resource_metrics:dropped_resource_not_found_get(BridgeV2Id)), + ?assertEqual(0, emqx_resource_metrics:dropped_resource_stopped_get(BridgeV2Id)), + ?assertEqual(0, emqx_resource_metrics:retried_get(BridgeV2Id)), + ?assertEqual(0, emqx_resource_metrics:retried_failed_get(BridgeV2Id)), + ?assertEqual(0, emqx_resource_metrics:retried_success_get(BridgeV2Id)), + % %% Perform operations {ok, 204, _} = http_put(BridgesPartsOpDisable, #{}), + %% Success counter should be reset + ?assertEqual(0, emqx_resource_metrics:success_get(BridgeV2Id)), + emqx:publish(emqx_message:make(<<"kafka_bridge_topic/1">>, Body)), + timer:sleep(100), + ?assertEqual(0, emqx_resource_metrics:success_get(BridgeV2Id)), + ?assertEqual(1, emqx_metrics_worker:get(rule_metrics, RuleId, 'actions.success')), + ?assertEqual(1, emqx_metrics_worker:get(rule_metrics, RuleId, 'actions.failed')), {ok, 204, _} = http_put(BridgesPartsOpDisable, #{}), {ok, 204, _} = http_put(BridgesPartsOpEnable, #{}), + ?assertEqual(0, emqx_resource_metrics:success_get(BridgeV2Id)), + %% Success counter should increase but + emqx:publish(emqx_message:make(<<"kafka_bridge_topic/1">>, Body)), + timer:sleep(100), + ?assertEqual(1, emqx_resource_metrics:success_get(BridgeV2Id)), + ?assertEqual(2, emqx_metrics_worker:get(rule_metrics, RuleId, 'actions.success')), {ok, 204, _} = http_put(BridgesPartsOpEnable, #{}), {ok, 204, _} = http_post(BridgesPartsOpStop, #{}), + %% TODO: This is a bit tricky with the compatibility layer. Currently one + %% can send a message even to a stopped channel. How shall we handle this? + ?assertEqual(0, emqx_resource_metrics:success_get(BridgeV2Id)), {ok, 204, _} = http_post(BridgesPartsOpStop, #{}), {ok, 204, _} = http_post(BridgesPartsOpRestart, #{}), + %% Success counter should increase + emqx:publish(emqx_message:make(<<"kafka_bridge_topic/1">>, Body)), + timer:sleep(100), + ?assertEqual(1, emqx_resource_metrics:success_get(BridgeV2Id)), + ?assertEqual(3, emqx_metrics_worker:get(rule_metrics, RuleId, 'actions.success')), %% Cleanup {ok, 204, _} = http_delete(BridgesPartsIdDeleteAlsoActions), false = MyKafkaBridgeExists(), @@ -479,83 +510,79 @@ t_failed_creation_then_fix(Config) -> delete_all_bridges(), ok. -% t_custom_timestamp(_Config) -> -% HostsString = kafka_hosts_string_sasl(), -% AuthSettings = valid_sasl_plain_settings(), -% Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]), -% Type = ?BRIDGE_TYPE, -% Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), -% ResourceId = emqx_bridge_resource:resource_id(Type, Name), -% KafkaTopic = test_topic_one_partition(), -% MQTTTopic = <<"t/local/kafka">>, -% emqx:subscribe(MQTTTopic), -% Conf0 = config(#{ -% "authentication" => AuthSettings, -% "kafka_hosts_string" => HostsString, -% "local_topic" => MQTTTopic, -% "kafka_topic" => KafkaTopic, -% "instance_id" => ResourceId, -% "ssl" => #{} -% }), -% Conf = emqx_utils_maps:deep_put( -% [<<"kafka">>, <<"message">>, <<"timestamp">>], -% Conf0, -% <<"123">> -% ), -% {ok, _} = emqx_bridge:create(Type, erlang:list_to_atom(Name), Conf), -% {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0), -% ct:pal("base offset before testing ~p", [Offset]), -% Time = erlang:unique_integer(), -% BinTime = integer_to_binary(Time), -% Msg = #{ -% clientid => BinTime, -% payload => <<"payload">>, -% timestamp => Time -% }, -% emqx:publish(emqx_message:make(MQTTTopic, emqx_utils_json:encode(Msg))), -% {ok, {_, [KafkaMsg]}} = -% ?retry( -% _Interval = 500, -% _NAttempts = 20, -% {ok, {_, [_]}} = brod:fetch(kafka_hosts(), KafkaTopic, _Partition = 0, Offset) -% ), -% ?assertMatch(#kafka_message{ts = 123, ts_type = create}, KafkaMsg), -% delete_all_bridges(), -% ok. +t_custom_timestamp(_Config) -> + HostsString = kafka_hosts_string(), + AuthSettings = "none", + Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]), + Type = ?BRIDGE_TYPE, + Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), + KafkaTopic = test_topic_one_partition(), + MQTTTopic = <<"t/local/kafka">>, + emqx:subscribe(MQTTTopic), + Conf0 = config(#{ + "authentication" => AuthSettings, + "kafka_hosts_string" => HostsString, + "local_topic" => MQTTTopic, + "kafka_topic" => KafkaTopic, + "bridge_name" => Name, + "ssl" => #{} + }), + Conf = emqx_utils_maps:deep_put( + [<<"kafka">>, <<"message">>, <<"timestamp">>], + Conf0, + <<"123">> + ), + {ok, _} = emqx_bridge:create(list_to_atom(Type), list_to_atom(Name), Conf), + {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0), + ct:pal("base offset before testing ~p", [Offset]), + Time = erlang:unique_integer(), + BinTime = integer_to_binary(Time), + Msg = #{ + clientid => BinTime, + payload => <<"payload">>, + timestamp => Time + }, + emqx:publish(emqx_message:make(MQTTTopic, emqx_utils_json:encode(Msg))), + {ok, {_, [KafkaMsg]}} = + ?retry( + _Interval = 500, + _NAttempts = 20, + {ok, {_, [_]}} = brod:fetch(kafka_hosts(), KafkaTopic, _Partition = 0, Offset) + ), + ?assertMatch(#kafka_message{ts = 123, ts_type = create}, KafkaMsg), + delete_all_bridges(), + ok. -% t_nonexistent_topic(_Config) -> -% HostsString = kafka_hosts_string_sasl(), -% AuthSettings = valid_sasl_plain_settings(), -% Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]), -% Type = ?BRIDGE_TYPE, -% Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), -% ResourceId = emqx_bridge_resource:resource_id(Type, Name), -% BridgeId = emqx_bridge_resource:bridge_id(Type, Name), -% KafkaTopic = "undefined-test-topic", -% Conf = config(#{ -% "authentication" => AuthSettings, -% "kafka_hosts_string" => HostsString, -% "kafka_topic" => KafkaTopic, -% "instance_id" => ResourceId, -% "producer" => #{ -% "kafka" => #{ -% "buffer" => #{ -% "memory_overload_protection" => false -% } -% } -% }, -% "ssl" => #{} -% }), -% {ok, #{config := ValidConfigAtom1}} = emqx_bridge:create( -% Type, erlang:list_to_atom(Name), Conf -% ), -% ValidConfigAtom = ValidConfigAtom1#{bridge_name => Name}, -% ?assertThrow(_, ?PRODUCER:on_start(ResourceId, ValidConfigAtom)), -% ok = emqx_bridge_resource:remove(BridgeId), -% delete_all_bridges(), -% ok. +t_nonexistent_topic(_Config) -> + HostsString = kafka_hosts_string(), + AuthSettings = "none", + Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]), + Type = ?BRIDGE_TYPE, + Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), + KafkaTopic = "undefined-test-topic", + Conf = config(#{ + "authentication" => AuthSettings, + "kafka_hosts_string" => HostsString, + "kafka_topic" => KafkaTopic, + "bridge_name" => Name, + "producer" => #{ + "kafka" => #{ + "buffer" => #{ + "memory_overload_protection" => false + } + } + }, + "ssl" => #{} + }), + {ok, #{config := _ValidConfigAtom1}} = emqx_bridge:create( + erlang:list_to_atom(Type), erlang:list_to_atom(Name), Conf + ), + % TODO: make sure the user facing APIs for Bridge V1 also get this error + {error, _} = emqx_bridge_v2:health_check(list_to_atom(Type), list_to_atom(Name)), + {ok, _} = emqx_bridge:remove(list_to_atom(Type), list_to_atom(Name)), + delete_all_bridges(), + ok. -%% DONE t_send_message_with_headers(Config) -> %% TODO Change this back to SASL plain once we figure out why it is not working HostsString = kafka_hosts_string(), @@ -793,89 +820,92 @@ t_wrong_headers(_Config) -> ), ok. -% t_wrong_headers_from_message(Config) -> -% HostsString = kafka_hosts_string_sasl(), -% AuthSettings = valid_sasl_plain_settings(), -% Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]), -% Type = ?BRIDGE_TYPE, -% Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), -% ResourceId = emqx_bridge_resource:resource_id(Type, Name), -% BridgeId = emqx_bridge_resource:bridge_id(Type, Name), -% KafkaTopic = test_topic_one_partition(), -% Conf = config_with_headers(#{ -% "authentication" => AuthSettings, -% "kafka_hosts_string" => HostsString, -% "kafka_topic" => KafkaTopic, -% "instance_id" => ResourceId, -% "kafka_headers" => <<"${payload}">>, -% "producer" => #{ -% "kafka" => #{ -% "buffer" => #{ -% "memory_overload_protection" => false -% } -% } -% }, -% "ssl" => #{} -% }), -% {ok, #{config := ConfigAtom1}} = emqx_bridge:create( -% Type, erlang:list_to_atom(Name), Conf -% ), -% ConfigAtom = ConfigAtom1#{bridge_name => Name}, -% {ok, State} = ?PRODUCER:on_start(ResourceId, ConfigAtom), -% Time1 = erlang:unique_integer(), -% Payload1 = <<"wrong_header">>, -% Msg1 = #{ -% clientid => integer_to_binary(Time1), -% payload => Payload1, -% timestamp => Time1 -% }, -% ?assertError( -% {badmatch, {error, {unrecoverable_error, {bad_kafka_headers, Payload1}}}}, -% send(Config, ResourceId, Msg1, State) -% ), -% Time2 = erlang:unique_integer(), -% Payload2 = <<"[{\"foo\":\"bar\"}, {\"foo2\":\"bar2\"}]">>, -% Msg2 = #{ -% clientid => integer_to_binary(Time2), -% payload => Payload2, -% timestamp => Time2 -% }, -% ?assertError( -% {badmatch, {error, {unrecoverable_error, {bad_kafka_header, [{<<"foo">>, <<"bar">>}]}}}}, -% send(Config, ResourceId, Msg2, State) -% ), -% Time3 = erlang:unique_integer(), -% Payload3 = <<"[{\"key\":\"foo\"}, {\"value\":\"bar\"}]">>, -% Msg3 = #{ -% clientid => integer_to_binary(Time3), -% payload => Payload3, -% timestamp => Time3 -% }, -% ?assertError( -% {badmatch, {error, {unrecoverable_error, {bad_kafka_header, [{<<"key">>, <<"foo">>}]}}}}, -% send(Config, ResourceId, Msg3, State) -% ), -% Time4 = erlang:unique_integer(), -% Payload4 = <<"[{\"key\":\"foo\", \"value\":\"bar\"}]">>, -% Msg4 = #{ -% clientid => integer_to_binary(Time4), -% payload => Payload4, -% timestamp => Time4 -% }, -% ?assertError( -% {badmatch, -% {error, -% {unrecoverable_error, -% {bad_kafka_header, [{<<"key">>, <<"foo">>}, {<<"value">>, <<"bar">>}]}}}}, -% send(Config, ResourceId, Msg4, State) -% ), -% %% TODO: refactor those into init/end per testcase -% ok = ?PRODUCER:on_stop(ResourceId, State), -% ?assertEqual([], supervisor:which_children(wolff_client_sup)), -% ?assertEqual([], supervisor:which_children(wolff_producers_sup)), -% ok = emqx_bridge_resource:remove(BridgeId), -% delete_all_bridges(), -% ok. +t_wrong_headers_from_message(Config) -> + HostsString = kafka_hosts_string(), + AuthSettings = "none", + Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]), + Type = ?BRIDGE_TYPE, + Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), + % ResourceId = emqx_bridge_resource:resource_id(Type, Name), + % BridgeId = emqx_bridge_resource:bridge_id(Type, Name), + KafkaTopic = test_topic_one_partition(), + Conf = config_with_headers(#{ + "authentication" => AuthSettings, + "kafka_hosts_string" => HostsString, + "kafka_topic" => KafkaTopic, + "bridge_name" => Name, + "kafka_headers" => <<"${payload}">>, + "producer" => #{ + "kafka" => #{ + "buffer" => #{ + "memory_overload_protection" => false + } + } + }, + "ssl" => #{} + }), + {ok, _} = emqx_bridge:create( + list_to_atom(Type), list_to_atom(Name), Conf + ), + % ConfigAtom = ConfigAtom1#{bridge_name => Name}, + % {ok, State} = ?PRODUCER:on_start(ResourceId, ConfigAtom), + ResourceId = emqx_bridge_resource:resource_id(bin(Type), bin(Name)), + {ok, _Group, #{state := State}} = emqx_resource:get_instance(ResourceId), + Time1 = erlang:unique_integer(), + Payload1 = <<"wrong_header">>, + Msg1 = #{ + clientid => integer_to_binary(Time1), + payload => Payload1, + timestamp => Time1 + }, + BridgeV2Id = emqx_bridge_v2:id(bin(Type), bin(Name)), + ?assertError( + {badmatch, {error, {unrecoverable_error, {bad_kafka_headers, Payload1}}}}, + send(Config, ResourceId, Msg1, State, BridgeV2Id) + ), + Time2 = erlang:unique_integer(), + Payload2 = <<"[{\"foo\":\"bar\"}, {\"foo2\":\"bar2\"}]">>, + Msg2 = #{ + clientid => integer_to_binary(Time2), + payload => Payload2, + timestamp => Time2 + }, + ?assertError( + {badmatch, {error, {unrecoverable_error, {bad_kafka_header, [{<<"foo">>, <<"bar">>}]}}}}, + send(Config, ResourceId, Msg2, State, BridgeV2Id) + ), + Time3 = erlang:unique_integer(), + Payload3 = <<"[{\"key\":\"foo\"}, {\"value\":\"bar\"}]">>, + Msg3 = #{ + clientid => integer_to_binary(Time3), + payload => Payload3, + timestamp => Time3 + }, + ?assertError( + {badmatch, {error, {unrecoverable_error, {bad_kafka_header, [{<<"key">>, <<"foo">>}]}}}}, + send(Config, ResourceId, Msg3, State, BridgeV2Id) + ), + Time4 = erlang:unique_integer(), + Payload4 = <<"[{\"key\":\"foo\", \"value\":\"bar\"}]">>, + Msg4 = #{ + clientid => integer_to_binary(Time4), + payload => Payload4, + timestamp => Time4 + }, + ?assertError( + {badmatch, + {error, + {unrecoverable_error, + {bad_kafka_header, [{<<"key">>, <<"foo">>}, {<<"value">>, <<"bar">>}]}}}}, + send(Config, ResourceId, Msg4, State, BridgeV2Id) + ), + %% TODO: refactor those into init/end per testcase + ok = ?PRODUCER:on_stop(ResourceId, State), + ?assertEqual([], supervisor:which_children(wolff_client_sup)), + ?assertEqual([], supervisor:which_children(wolff_producers_sup)), + {ok, _} = emqx_bridge:remove(list_to_atom(Type), list_to_atom(Name)), + delete_all_bridges(), + ok. %%------------------------------------------------------------------------------ %% Helper functions diff --git a/apps/emqx_connector/src/emqx_connector_resource.erl b/apps/emqx_connector/src/emqx_connector_resource.erl index 847665c15..2c3bdfa74 100644 --- a/apps/emqx_connector/src/emqx_connector_resource.erl +++ b/apps/emqx_connector/src/emqx_connector_resource.erl @@ -34,6 +34,7 @@ create/3, create/4, create_dry_run/2, + create_dry_run/3, recreate/2, recreate/3, remove/1, @@ -240,11 +241,15 @@ recreate(Type, Name, Conf, Opts) -> parse_opts(Conf, Opts) ). -create_dry_run(Type, Conf0) -> +create_dry_run(Type, Conf) -> + create_dry_run(Type, Conf, fun(_) -> ok end). + +create_dry_run(Type, Conf0, Callback) -> %% Already typechecked, no need to catch errors TypeBin = bin(Type), TypeAtom = safe_atom(Type), - TmpName = iolist_to_binary([?TEST_ID_PREFIX, TypeBin, ":", emqx_utils:gen_id(8)]), + %% We use a fixed name here to avoid createing an atom + TmpName = iolist_to_binary([?TEST_ID_PREFIX, TypeBin, ":", <<"probedryrun">>]), TmpPath = emqx_utils:safe_filename(TmpName), Conf1 = maps:without([<<"name">>], Conf0), RawConf = #{<<"connectors">> => #{TypeBin => #{<<"temp_name">> => Conf1}}}, @@ -261,7 +266,7 @@ create_dry_run(Type, Conf0) -> {ok, ConfNew} -> ParseConf = parse_confs(bin(Type), TmpName, ConfNew), emqx_resource:create_dry_run_local( - TmpName, connector_to_resource_type(Type), ParseConf + TmpName, connector_to_resource_type(Type), ParseConf, Callback ) end catch diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index fea720a3c..9af41adbc 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -51,6 +51,7 @@ create_dry_run/2, create_dry_run_local/2, create_dry_run_local/3, + create_dry_run_local/4, %% this will do create_dry_run, stop the old instance and start a new one recreate/3, recreate/4, @@ -296,6 +297,13 @@ create_dry_run_local(ResourceType, Config) -> create_dry_run_local(ResId, ResourceType, Config) -> emqx_resource_manager:create_dry_run(ResId, ResourceType, Config). +-spec create_dry_run_local(resource_id(), resource_type(), resource_config(), OnReadyCallback) -> + ok | {error, Reason :: term()} +when + OnReadyCallback :: fun((resource_id()) -> ok | {error, Reason :: term()}). +create_dry_run_local(ResId, ResourceType, Config, OnReadyCallback) -> + emqx_resource_manager:create_dry_run(ResId, ResourceType, Config, OnReadyCallback). + -spec recreate(resource_id(), resource_type(), resource_config()) -> {ok, resource_data()} | {error, Reason :: term()}. recreate(ResId, ResourceType, Config) -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index f8272f273..b8023e6ee 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -27,6 +27,7 @@ remove/1, create_dry_run/2, create_dry_run/3, + create_dry_run/4, restart/2, start/2, stop/1, @@ -163,6 +164,16 @@ create_dry_run(ResourceType, Config) -> create_dry_run(ResId, ResourceType, Config). create_dry_run(ResId, ResourceType, Config) -> + create_dry_run(ResId, ResourceType, Config, fun do_nothing_on_ready/1). + +do_nothing_on_ready(_ResId) -> + ok. + +-spec create_dry_run(resource_id(), resource_type(), resource_config(), OnReadyCallback) -> + ok | {error, Reason :: term()} +when + OnReadyCallback :: fun((resource_id()) -> ok | {error, Reason :: term()}). +create_dry_run(ResId, ResourceType, Config, OnReadyCallback) -> Opts = case is_map(Config) of true -> maps:get(resource_opts, Config, #{}); @@ -173,7 +184,19 @@ create_dry_run(ResId, ResourceType, Config) -> Timeout = emqx_utils:clamp(HealthCheckInterval, 5_000, 60_000), case wait_for_ready(ResId, Timeout) of ok -> - remove(ResId); + CallbackResult = + try + OnReadyCallback(ResId) + catch + _:CallbackReason -> + {error, CallbackReason} + end, + case remove(ResId) of + ok -> + CallbackResult; + {error, _} = Error -> + Error + end; {error, Reason} -> _ = remove(ResId), {error, Reason}; diff --git a/apps/emqx_resource/src/emqx_resource_manager_sup.erl b/apps/emqx_resource/src/emqx_resource_manager_sup.erl index 732d5e513..7e26589d2 100644 --- a/apps/emqx_resource/src/emqx_resource_manager_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_manager_sup.erl @@ -26,7 +26,14 @@ -export([init/1]). ensure_child(ResId, Group, ResourceType, Config, Opts) -> - _ = supervisor:start_child(?MODULE, child_spec(ResId, Group, ResourceType, Config, Opts)), + case supervisor:start_child(?MODULE, child_spec(ResId, Group, ResourceType, Config, Opts)) of + {error, Reason} -> + %% This should not happen in production but it can be a huge time sink in + %% development environments if the error is just silently ignored. + error(Reason); + _ -> + ok + end, ok. delete_child(ResId) ->