diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index c4f66dfff..35ffbc90b 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -628,16 +628,6 @@ consumer_group_id(BridgeName0) -> BridgeName = to_bin(BridgeName0), <<"emqx-kafka-consumer-", BridgeName/binary>>. --spec is_dry_run(connector_resource_id()) -> boolean(). -is_dry_run(ConnectorResId) -> - TestIdStart = string:find(ConnectorResId, ?TEST_ID_PREFIX), - case TestIdStart of - nomatch -> - false; - _ -> - string:equal(TestIdStart, ConnectorResId) - end. - -spec check_client_connectivity(pid()) -> ?status_connected | ?status_disconnected @@ -673,7 +663,7 @@ maybe_clean_error(Reason) -> -spec make_client_id(connector_resource_id(), binary(), atom() | binary()) -> atom(). make_client_id(ConnectorResId, BridgeType, BridgeName) -> - case is_dry_run(ConnectorResId) of + case emqx_resource:is_dry_run(ConnectorResId) of false -> ClientID0 = emqx_bridge_kafka_impl:make_client_id(BridgeType, BridgeName), binary_to_atom(ClientID0); diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index b819925ac..8b6326545 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -137,14 +137,7 @@ create_producers_for_bridge_v2( KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none), MaxPartitions = maps:get(partitions_limit, KafkaConfig, all_partitions), #{name := BridgeName} = emqx_bridge_v2:parse_id(BridgeV2Id), - TestIdStart = string:find(BridgeV2Id, ?TEST_ID_PREFIX), - IsDryRun = - case TestIdStart of - nomatch -> - false; - _ -> - string:equal(TestIdStart, InstId) - end, + IsDryRun = emqx_resource:is_dry_run(BridgeV2Id), ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions), WolffProducerConfig = producers_config( BridgeType, BridgeName, KafkaConfig, IsDryRun, BridgeV2Id diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src index a8eeba483..dcb86a3ca 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_pulsar, [ {description, "EMQX Pulsar Bridge"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl index 9d269493d..4157deec2 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl @@ -255,7 +255,7 @@ format_servers(Servers0) -> -spec make_client_id(resource_id()) -> pulsar_client_id(). make_client_id(InstanceId) -> - case is_dry_run(InstanceId) of + case emqx_resource:is_dry_run(InstanceId) of true -> pulsar_producer_probe; false -> @@ -269,14 +269,6 @@ make_client_id(InstanceId) -> binary_to_atom(ClientIdBin) end. --spec is_dry_run(resource_id()) -> boolean(). -is_dry_run(InstanceId) -> - TestIdStart = string:find(InstanceId, ?TEST_ID_PREFIX), - case TestIdStart of - nomatch -> false; - _ -> string:equal(TestIdStart, InstanceId) - end. - conn_opts(#{authentication := none}) -> #{}; conn_opts(#{authentication := #{username := Username, password := Password}}) -> diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 15c0a0293..b6f01fde5 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -140,6 +140,8 @@ validate_name/1 ]). +-export([is_dry_run/1]). + -export_type([ query_mode/0, resource_id/0, @@ -769,6 +771,13 @@ validate_name(Name) -> _ = validate_name(Name, #{atom_name => false}), ok. +-spec is_dry_run(resource_id()) -> boolean(). +is_dry_run(ResId) -> + case string:find(ResId, ?TEST_ID_PREFIX) of + nomatch -> false; + TestIdStart -> string:equal(TestIdStart, ResId) + end. + validate_name(<<>>, _Opts) -> invalid_data("Name cannot be empty string"); validate_name(Name, _Opts) when size(Name) >= 255 -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 50a25620c..95b1271f4 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -752,7 +752,8 @@ handle_remove_event(From, ClearMetrics, Data) -> start_resource(Data, From) -> %% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache - case emqx_resource:call_start(Data#data.id, Data#data.mod, Data#data.config) of + ResId = Data#data.id, + case emqx_resource:call_start(ResId, Data#data.mod, Data#data.config) of {ok, ResourceState} -> UpdatedData1 = Data#data{status = ?status_connecting, state = ResourceState}, %% Perform an initial health_check immediately before transitioning into a connected state @@ -760,12 +761,13 @@ start_resource(Data, From) -> Actions = maybe_reply([{state_timeout, 0, health_check}], From, ok), {next_state, ?state_connecting, update_state(UpdatedData2, Data), Actions}; {error, Reason} = Err -> - ?SLOG(warning, #{ + IsDryRun = emqx_resource:is_dry_run(ResId), + ?SLOG(log_level(IsDryRun), #{ msg => "start_resource_failed", - id => Data#data.id, + id => ResId, reason => Reason }), - _ = maybe_alarm(?status_disconnected, Data#data.id, Err, Data#data.error), + _ = maybe_alarm(?status_disconnected, IsDryRun, ResId, Err, Data#data.error), %% Add channels and raise alarms NewData1 = channels_health_check(?status_disconnected, add_channels(Data)), %% Keep track of the error reason why the connection did not work @@ -796,9 +798,10 @@ add_channels(Data) -> add_channels_in_list([], Data) -> Data; add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) -> + Id = Data#data.id, case emqx_resource:call_add_channel( - Data#data.id, Data#data.mod, Data#data.state, ChannelID, ChannelConfig + Id, Data#data.mod, Data#data.state, ChannelID, ChannelConfig ) of {ok, NewState} -> @@ -816,9 +819,10 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) -> }, add_channels_in_list(Rest, NewData); {error, Reason} = Error -> - ?SLOG(warning, #{ - msg => add_channel_failed, - id => Data#data.id, + IsDryRun = emqx_resource:is_dry_run(Id), + ?SLOG(log_level(IsDryRun), #{ + msg => "add_channel_failed", + id => Id, channel_id => ChannelID, reason => Reason }), @@ -832,7 +836,7 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) -> added_channels = NewAddedChannelsMap }, %% Raise an alarm since the channel could not be added - _ = maybe_alarm(?status_disconnected, ChannelID, Error, no_prev_error), + _ = maybe_alarm(?status_disconnected, IsDryRun, ChannelID, Error, no_prev_error), add_channels_in_list(Rest, NewData) end. @@ -856,7 +860,8 @@ stop_resource(#data{id = ResId} = Data) -> false -> ok end, - _ = maybe_clear_alarm(ResId), + IsDryRun = emqx_resource:is_dry_run(ResId), + _ = maybe_clear_alarm(IsDryRun, ResId), ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId), NewData#data{status = ?rm_status_stopped}. @@ -868,15 +873,17 @@ remove_channels_in_list([], Data, _KeepInChannelMap) -> Data; remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) -> AddedChannelsMap = Data#data.added_channels, + Id = Data#data.id, + IsDryRun = emqx_resource:is_dry_run(Id), NewAddedChannelsMap = case KeepInChannelMap of true -> AddedChannelsMap; false -> - _ = maybe_clear_alarm(ChannelID), + _ = maybe_clear_alarm(IsDryRun, ChannelID), maps:remove(ChannelID, AddedChannelsMap) end, - case safe_call_remove_channel(Data#data.id, Data#data.mod, Data#data.state, ChannelID) of + case safe_call_remove_channel(Id, Data#data.mod, Data#data.state, ChannelID) of {ok, NewState} -> NewData = Data#data{ state = NewState, @@ -884,9 +891,9 @@ remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) -> }, remove_channels_in_list(Rest, NewData, KeepInChannelMap); {error, Reason} -> - ?SLOG(warning, #{ - msg => remove_channel_failed, - id => Data#data.id, + ?SLOG(log_level(IsDryRun), #{ + msg => "remove_channel_failed", + id => Id, channel_id => ChannelID, reason => Reason }), @@ -968,8 +975,8 @@ handle_not_connected_add_channel(From, ChannelId, ChannelConfig, State, Data) -> handle_remove_channel(From, ChannelId, Data) -> Channels = Data#data.added_channels, - %% Deactivate alarm - _ = maybe_clear_alarm(ChannelId), + IsDryRun = emqx_resource:is_dry_run(Data#data.id), + _ = maybe_clear_alarm(IsDryRun, ChannelId), case channel_status_is_channel_added( maps:get(ChannelId, Channels, channel_status_not_added(undefined)) @@ -990,13 +997,13 @@ handle_remove_channel(From, ChannelId, Data) -> end. handle_remove_channel_exists(From, ChannelId, Data) -> + #data{id = Id, added_channels = AddedChannelsMap} = Data, case emqx_resource:call_remove_channel( - Data#data.id, Data#data.mod, Data#data.state, ChannelId + Id, Data#data.mod, Data#data.state, ChannelId ) of {ok, NewState} -> - AddedChannelsMap = Data#data.added_channels, NewAddedChannelsMap = maps:remove(ChannelId, AddedChannelsMap), UpdatedData = Data#data{ state = NewState, @@ -1004,10 +1011,10 @@ handle_remove_channel_exists(From, ChannelId, Data) -> }, {keep_state, update_state(UpdatedData, Data), [{reply, From, ok}]}; {error, Reason} = Error -> - %% Log the error as a warning - ?SLOG(warning, #{ - msg => remove_channel_failed, - id => Data#data.id, + IsDryRun = emqx_resource:is_dry_run(Id), + ?SLOG(log_level(IsDryRun), #{ + msg => "remove_channel_failed", + id => Id, channel_id => ChannelId, reason => Reason }), @@ -1021,7 +1028,8 @@ handle_not_connected_and_not_connecting_remove_channel(From, ChannelId, Data) -> Channels = Data#data.added_channels, NewChannels = maps:remove(ChannelId, Channels), NewData = Data#data{added_channels = NewChannels}, - _ = maybe_clear_alarm(ChannelId), + IsDryRun = emqx_resource:is_dry_run(Data#data.id), + _ = maybe_clear_alarm(IsDryRun, ChannelId), {keep_state, update_state(NewData, Data), [{reply, From, ok}]}. handle_manual_resource_health_check(From, Data0 = #data{hc_workers = #{resource := HCWorkers}}) when @@ -1090,7 +1098,8 @@ continue_with_health_check(#data{} = Data0, CurrentState, HCRes) -> error = PrevError } = Data0, {NewStatus, NewState, Err} = parse_health_check_result(HCRes, Data0), - _ = maybe_alarm(NewStatus, ResId, Err, PrevError), + IsDryRun = emqx_resource:is_dry_run(ResId), + _ = maybe_alarm(NewStatus, IsDryRun, ResId, Err, PrevError), ok = maybe_resume_resource_workers(ResId, NewStatus), Data1 = Data0#data{ state = NewState, status = NewStatus, error = Err @@ -1114,7 +1123,8 @@ continue_resource_health_check_connected(NewStatus, Data0) -> Actions = Replies ++ resource_health_check_actions(Data), {keep_state, Data, Actions}; _ -> - ?SLOG(warning, #{ + IsDryRun = emqx_resource:is_dry_run(Data0#data.id), + ?SLOG(log_level(IsDryRun), #{ msg => "health_check_failed", id => Data0#data.id, status => NewStatus @@ -1214,7 +1224,7 @@ channels_health_check(?status_connected = _ConnectorStatus, Data0) -> channels_health_check(?status_connecting = _ConnectorStatus, Data0) -> %% Whenever the resource is connecting: %% 1. Change the status of all added channels to connecting - %% 2. Raise alarms (TODO: if it is a probe we should not raise alarms) + %% 2. Raise alarms Channels = Data0#data.added_channels, ChannelsToChangeStatusFor = [ {ChannelId, Config} @@ -1240,9 +1250,10 @@ channels_health_check(?status_connecting = _ConnectorStatus, Data0) -> || {ChannelId, NewStatus} <- maps:to_list(NewChannels) ], %% Raise alarms for all channels + IsDryRun = emqx_resource:is_dry_run(Data0#data.id), lists:foreach( fun({ChannelId, Status, PrevStatus}) -> - maybe_alarm(?status_connecting, ChannelId, Status, PrevStatus) + maybe_alarm(?status_connecting, IsDryRun, ChannelId, Status, PrevStatus) end, ChannelsWithNewAndPrevErrorStatuses ), @@ -1275,9 +1286,10 @@ channels_health_check(ConnectorStatus, Data0) -> || {ChannelId, #{config := Config} = OldStatus} <- maps:to_list(Data1#data.added_channels) ], %% Raise alarms + IsDryRun = emqx_resource:is_dry_run(Data1#data.id), _ = lists:foreach( fun({ChannelId, OldStatus, NewStatus}) -> - _ = maybe_alarm(NewStatus, ChannelId, NewStatus, OldStatus) + _ = maybe_alarm(NewStatus, IsDryRun, ChannelId, NewStatus, OldStatus) end, ChannelsWithNewAndOldStatuses ), @@ -1386,13 +1398,14 @@ continue_channel_health_check_connected_no_update_during_check(ChannelId, OldSta NewStatus = maps:get(ChannelId, Data1#data.added_channels), ChannelsToRemove = [ChannelId || not channel_status_is_channel_added(NewStatus)], Data = remove_channels_in_list(ChannelsToRemove, Data1, true), + IsDryRun = emqx_resource:is_dry_run(Data1#data.id), %% Raise/clear alarms case NewStatus of #{status := ?status_connected} -> - _ = maybe_clear_alarm(ChannelId), + _ = maybe_clear_alarm(IsDryRun, ChannelId), ok; _ -> - _ = maybe_alarm(NewStatus, ChannelId, NewStatus, OldStatus), + _ = maybe_alarm(NewStatus, IsDryRun, ChannelId, NewStatus, OldStatus), ok end, Data. @@ -1556,15 +1569,21 @@ remove_runtime_data(#data{} = Data0) -> health_check_interval(Opts) -> maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL). --spec maybe_alarm(resource_status(), resource_id(), _Error :: term(), _PrevError :: term()) -> ok. -maybe_alarm(?status_connected, _ResId, _Error, _PrevError) -> +-spec maybe_alarm( + resource_status(), + boolean(), + resource_id(), + _Error :: term(), + _PrevError :: term() +) -> ok. +maybe_alarm(?status_connected, _IsDryRun, _ResId, _Error, _PrevError) -> ok; -maybe_alarm(_Status, <>, _Error, _PrevError) -> +maybe_alarm(_Status, true, _ResId, _Error, _PrevError) -> ok; %% Assume that alarm is already active -maybe_alarm(_Status, _ResId, Error, Error) -> +maybe_alarm(_Status, _IsDryRun, _ResId, Error, Error) -> ok; -maybe_alarm(_Status, ResId, Error, _PrevError) -> +maybe_alarm(_Status, false, ResId, Error, _PrevError) -> HrError = case Error of {error, undefined} -> @@ -1596,10 +1615,10 @@ maybe_resume_resource_workers(ResId, ?status_connected) -> maybe_resume_resource_workers(_, _) -> ok. --spec maybe_clear_alarm(resource_id()) -> ok | {error, not_found}. -maybe_clear_alarm(<>) -> +-spec maybe_clear_alarm(boolean(), resource_id()) -> ok | {error, not_found}. +maybe_clear_alarm(true, _ResId) -> ok; -maybe_clear_alarm(ResId) -> +maybe_clear_alarm(false, ResId) -> emqx_alarm:safe_deactivate(ResId). parse_health_check_result(Status, Data) when ?IS_STATUS(Status) -> @@ -1767,10 +1786,14 @@ add_or_update_channel_status(Data, ChannelId, ChannelConfig, State) -> ChannelStatus = channel_status({error, resource_not_operational}, ChannelConfig), NewChannels = maps:put(ChannelId, ChannelStatus, Channels), ResStatus = state_to_status(State), - maybe_alarm(ResStatus, ChannelId, ChannelStatus, no_prev), + IsDryRun = emqx_resource:is_dry_run(ChannelId), + maybe_alarm(ResStatus, IsDryRun, ChannelId, ChannelStatus, no_prev), Data#data{added_channels = NewChannels}. state_to_status(?state_stopped) -> ?rm_status_stopped; state_to_status(?state_connected) -> ?status_connected; state_to_status(?state_connecting) -> ?status_connecting; state_to_status(?state_disconnected) -> ?status_disconnected. + +log_level(true) -> info; +log_level(false) -> warning.