feat: don't record dry_run log

This commit is contained in:
zhongwencool 2024-07-11 14:16:18 +08:00
parent a2bed1efb8
commit cba3f532f8
6 changed files with 76 additions and 69 deletions

View File

@ -628,16 +628,6 @@ consumer_group_id(BridgeName0) ->
BridgeName = to_bin(BridgeName0), BridgeName = to_bin(BridgeName0),
<<"emqx-kafka-consumer-", BridgeName/binary>>. <<"emqx-kafka-consumer-", BridgeName/binary>>.
-spec is_dry_run(connector_resource_id()) -> boolean().
is_dry_run(ConnectorResId) ->
TestIdStart = string:find(ConnectorResId, ?TEST_ID_PREFIX),
case TestIdStart of
nomatch ->
false;
_ ->
string:equal(TestIdStart, ConnectorResId)
end.
-spec check_client_connectivity(pid()) -> -spec check_client_connectivity(pid()) ->
?status_connected ?status_connected
| ?status_disconnected | ?status_disconnected
@ -673,7 +663,7 @@ maybe_clean_error(Reason) ->
-spec make_client_id(connector_resource_id(), binary(), atom() | binary()) -> atom(). -spec make_client_id(connector_resource_id(), binary(), atom() | binary()) -> atom().
make_client_id(ConnectorResId, BridgeType, BridgeName) -> make_client_id(ConnectorResId, BridgeType, BridgeName) ->
case is_dry_run(ConnectorResId) of case emqx_resource:is_dry_run(ConnectorResId) of
false -> false ->
ClientID0 = emqx_bridge_kafka_impl:make_client_id(BridgeType, BridgeName), ClientID0 = emqx_bridge_kafka_impl:make_client_id(BridgeType, BridgeName),
binary_to_atom(ClientID0); binary_to_atom(ClientID0);

View File

@ -137,14 +137,7 @@ create_producers_for_bridge_v2(
KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none), KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none),
MaxPartitions = maps:get(partitions_limit, KafkaConfig, all_partitions), MaxPartitions = maps:get(partitions_limit, KafkaConfig, all_partitions),
#{name := BridgeName} = emqx_bridge_v2:parse_id(BridgeV2Id), #{name := BridgeName} = emqx_bridge_v2:parse_id(BridgeV2Id),
TestIdStart = string:find(BridgeV2Id, ?TEST_ID_PREFIX), IsDryRun = emqx_resource:is_dry_run(BridgeV2Id),
IsDryRun =
case TestIdStart of
nomatch ->
false;
_ ->
string:equal(TestIdStart, InstId)
end,
ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions), ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions),
WolffProducerConfig = producers_config( WolffProducerConfig = producers_config(
BridgeType, BridgeName, KafkaConfig, IsDryRun, BridgeV2Id BridgeType, BridgeName, KafkaConfig, IsDryRun, BridgeV2Id

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_pulsar, [ {application, emqx_bridge_pulsar, [
{description, "EMQX Pulsar Bridge"}, {description, "EMQX Pulsar Bridge"},
{vsn, "0.2.2"}, {vsn, "0.2.3"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -255,7 +255,7 @@ format_servers(Servers0) ->
-spec make_client_id(resource_id()) -> pulsar_client_id(). -spec make_client_id(resource_id()) -> pulsar_client_id().
make_client_id(InstanceId) -> make_client_id(InstanceId) ->
case is_dry_run(InstanceId) of case emqx_resource:is_dry_run(InstanceId) of
true -> true ->
pulsar_producer_probe; pulsar_producer_probe;
false -> false ->
@ -269,14 +269,6 @@ make_client_id(InstanceId) ->
binary_to_atom(ClientIdBin) binary_to_atom(ClientIdBin)
end. end.
-spec is_dry_run(resource_id()) -> boolean().
is_dry_run(InstanceId) ->
TestIdStart = string:find(InstanceId, ?TEST_ID_PREFIX),
case TestIdStart of
nomatch -> false;
_ -> string:equal(TestIdStart, InstanceId)
end.
conn_opts(#{authentication := none}) -> conn_opts(#{authentication := none}) ->
#{}; #{};
conn_opts(#{authentication := #{username := Username, password := Password}}) -> conn_opts(#{authentication := #{username := Username, password := Password}}) ->

View File

@ -140,6 +140,8 @@
validate_name/1 validate_name/1
]). ]).
-export([is_dry_run/1]).
-export_type([ -export_type([
query_mode/0, query_mode/0,
resource_id/0, resource_id/0,
@ -769,6 +771,13 @@ validate_name(Name) ->
_ = validate_name(Name, #{atom_name => false}), _ = validate_name(Name, #{atom_name => false}),
ok. ok.
%% Tells whether a resource id denotes a dry-run (probe/test) resource,
%% i.e. whether the id begins with ?TEST_ID_PREFIX.
%%
%% string:find/2 yields `nomatch' when the prefix does not occur at all;
%% otherwise it yields the remainder of ResId starting at the first
%% occurrence of the prefix. That remainder equals the whole ResId only
%% when the occurrence sits at the very beginning, so a prefix appearing
%% later in the id results in `false'.
-spec is_dry_run(resource_id()) -> boolean().
is_dry_run(ResId) ->
case string:find(ResId, ?TEST_ID_PREFIX) of
nomatch -> false;
TestIdStart -> string:equal(TestIdStart, ResId)
end.
validate_name(<<>>, _Opts) -> validate_name(<<>>, _Opts) ->
invalid_data("Name cannot be empty string"); invalid_data("Name cannot be empty string");
validate_name(Name, _Opts) when size(Name) >= 255 -> validate_name(Name, _Opts) when size(Name) >= 255 ->

View File

@ -752,7 +752,8 @@ handle_remove_event(From, ClearMetrics, Data) ->
start_resource(Data, From) -> start_resource(Data, From) ->
%% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache %% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache
case emqx_resource:call_start(Data#data.id, Data#data.mod, Data#data.config) of ResId = Data#data.id,
case emqx_resource:call_start(ResId, Data#data.mod, Data#data.config) of
{ok, ResourceState} -> {ok, ResourceState} ->
UpdatedData1 = Data#data{status = ?status_connecting, state = ResourceState}, UpdatedData1 = Data#data{status = ?status_connecting, state = ResourceState},
%% Perform an initial health_check immediately before transitioning into a connected state %% Perform an initial health_check immediately before transitioning into a connected state
@ -760,12 +761,13 @@ start_resource(Data, From) ->
Actions = maybe_reply([{state_timeout, 0, health_check}], From, ok), Actions = maybe_reply([{state_timeout, 0, health_check}], From, ok),
{next_state, ?state_connecting, update_state(UpdatedData2, Data), Actions}; {next_state, ?state_connecting, update_state(UpdatedData2, Data), Actions};
{error, Reason} = Err -> {error, Reason} = Err ->
?SLOG(warning, #{ IsDryRun = emqx_resource:is_dry_run(ResId),
?SLOG(log_level(IsDryRun), #{
msg => "start_resource_failed", msg => "start_resource_failed",
id => Data#data.id, id => ResId,
reason => Reason reason => Reason
}), }),
_ = maybe_alarm(?status_disconnected, Data#data.id, Err, Data#data.error), _ = maybe_alarm(?status_disconnected, IsDryRun, ResId, Err, Data#data.error),
%% Add channels and raise alarms %% Add channels and raise alarms
NewData1 = channels_health_check(?status_disconnected, add_channels(Data)), NewData1 = channels_health_check(?status_disconnected, add_channels(Data)),
%% Keep track of the error reason why the connection did not work %% Keep track of the error reason why the connection did not work
@ -796,9 +798,10 @@ add_channels(Data) ->
add_channels_in_list([], Data) -> add_channels_in_list([], Data) ->
Data; Data;
add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) -> add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) ->
Id = Data#data.id,
case case
emqx_resource:call_add_channel( emqx_resource:call_add_channel(
Data#data.id, Data#data.mod, Data#data.state, ChannelID, ChannelConfig Id, Data#data.mod, Data#data.state, ChannelID, ChannelConfig
) )
of of
{ok, NewState} -> {ok, NewState} ->
@ -816,9 +819,10 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) ->
}, },
add_channels_in_list(Rest, NewData); add_channels_in_list(Rest, NewData);
{error, Reason} = Error -> {error, Reason} = Error ->
?SLOG(warning, #{ IsDryRun = emqx_resource:is_dry_run(Id),
msg => add_channel_failed, ?SLOG(log_level(IsDryRun), #{
id => Data#data.id, msg => "add_channel_failed",
id => Id,
channel_id => ChannelID, channel_id => ChannelID,
reason => Reason reason => Reason
}), }),
@ -832,7 +836,7 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) ->
added_channels = NewAddedChannelsMap added_channels = NewAddedChannelsMap
}, },
%% Raise an alarm since the channel could not be added %% Raise an alarm since the channel could not be added
_ = maybe_alarm(?status_disconnected, ChannelID, Error, no_prev_error), _ = maybe_alarm(?status_disconnected, IsDryRun, ChannelID, Error, no_prev_error),
add_channels_in_list(Rest, NewData) add_channels_in_list(Rest, NewData)
end. end.
@ -856,7 +860,8 @@ stop_resource(#data{id = ResId} = Data) ->
false -> false ->
ok ok
end, end,
_ = maybe_clear_alarm(ResId), IsDryRun = emqx_resource:is_dry_run(ResId),
_ = maybe_clear_alarm(IsDryRun, ResId),
ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId), ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId),
NewData#data{status = ?rm_status_stopped}. NewData#data{status = ?rm_status_stopped}.
@ -868,15 +873,17 @@ remove_channels_in_list([], Data, _KeepInChannelMap) ->
Data; Data;
remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) -> remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) ->
AddedChannelsMap = Data#data.added_channels, AddedChannelsMap = Data#data.added_channels,
Id = Data#data.id,
IsDryRun = emqx_resource:is_dry_run(Id),
NewAddedChannelsMap = NewAddedChannelsMap =
case KeepInChannelMap of case KeepInChannelMap of
true -> true ->
AddedChannelsMap; AddedChannelsMap;
false -> false ->
_ = maybe_clear_alarm(ChannelID), _ = maybe_clear_alarm(IsDryRun, ChannelID),
maps:remove(ChannelID, AddedChannelsMap) maps:remove(ChannelID, AddedChannelsMap)
end, end,
case safe_call_remove_channel(Data#data.id, Data#data.mod, Data#data.state, ChannelID) of case safe_call_remove_channel(Id, Data#data.mod, Data#data.state, ChannelID) of
{ok, NewState} -> {ok, NewState} ->
NewData = Data#data{ NewData = Data#data{
state = NewState, state = NewState,
@ -884,9 +891,9 @@ remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) ->
}, },
remove_channels_in_list(Rest, NewData, KeepInChannelMap); remove_channels_in_list(Rest, NewData, KeepInChannelMap);
{error, Reason} -> {error, Reason} ->
?SLOG(warning, #{ ?SLOG(log_level(IsDryRun), #{
msg => remove_channel_failed, msg => "remove_channel_failed",
id => Data#data.id, id => Id,
channel_id => ChannelID, channel_id => ChannelID,
reason => Reason reason => Reason
}), }),
@ -968,8 +975,8 @@ handle_not_connected_add_channel(From, ChannelId, ChannelConfig, State, Data) ->
handle_remove_channel(From, ChannelId, Data) -> handle_remove_channel(From, ChannelId, Data) ->
Channels = Data#data.added_channels, Channels = Data#data.added_channels,
%% Deactivate alarm IsDryRun = emqx_resource:is_dry_run(Data#data.id),
_ = maybe_clear_alarm(ChannelId), _ = maybe_clear_alarm(IsDryRun, ChannelId),
case case
channel_status_is_channel_added( channel_status_is_channel_added(
maps:get(ChannelId, Channels, channel_status_not_added(undefined)) maps:get(ChannelId, Channels, channel_status_not_added(undefined))
@ -990,13 +997,13 @@ handle_remove_channel(From, ChannelId, Data) ->
end. end.
handle_remove_channel_exists(From, ChannelId, Data) -> handle_remove_channel_exists(From, ChannelId, Data) ->
#data{id = Id, added_channels = AddedChannelsMap} = Data,
case case
emqx_resource:call_remove_channel( emqx_resource:call_remove_channel(
Data#data.id, Data#data.mod, Data#data.state, ChannelId Id, Data#data.mod, Data#data.state, ChannelId
) )
of of
{ok, NewState} -> {ok, NewState} ->
AddedChannelsMap = Data#data.added_channels,
NewAddedChannelsMap = maps:remove(ChannelId, AddedChannelsMap), NewAddedChannelsMap = maps:remove(ChannelId, AddedChannelsMap),
UpdatedData = Data#data{ UpdatedData = Data#data{
state = NewState, state = NewState,
@ -1004,10 +1011,10 @@ handle_remove_channel_exists(From, ChannelId, Data) ->
}, },
{keep_state, update_state(UpdatedData, Data), [{reply, From, ok}]}; {keep_state, update_state(UpdatedData, Data), [{reply, From, ok}]};
{error, Reason} = Error -> {error, Reason} = Error ->
%% Log the error as a warning IsDryRun = emqx_resource:is_dry_run(Id),
?SLOG(warning, #{ ?SLOG(log_level(IsDryRun), #{
msg => remove_channel_failed, msg => "remove_channel_failed",
id => Data#data.id, id => Id,
channel_id => ChannelId, channel_id => ChannelId,
reason => Reason reason => Reason
}), }),
@ -1021,7 +1028,8 @@ handle_not_connected_and_not_connecting_remove_channel(From, ChannelId, Data) ->
Channels = Data#data.added_channels, Channels = Data#data.added_channels,
NewChannels = maps:remove(ChannelId, Channels), NewChannels = maps:remove(ChannelId, Channels),
NewData = Data#data{added_channels = NewChannels}, NewData = Data#data{added_channels = NewChannels},
_ = maybe_clear_alarm(ChannelId), IsDryRun = emqx_resource:is_dry_run(Data#data.id),
_ = maybe_clear_alarm(IsDryRun, ChannelId),
{keep_state, update_state(NewData, Data), [{reply, From, ok}]}. {keep_state, update_state(NewData, Data), [{reply, From, ok}]}.
handle_manual_resource_health_check(From, Data0 = #data{hc_workers = #{resource := HCWorkers}}) when handle_manual_resource_health_check(From, Data0 = #data{hc_workers = #{resource := HCWorkers}}) when
@ -1090,7 +1098,8 @@ continue_with_health_check(#data{} = Data0, CurrentState, HCRes) ->
error = PrevError error = PrevError
} = Data0, } = Data0,
{NewStatus, NewState, Err} = parse_health_check_result(HCRes, Data0), {NewStatus, NewState, Err} = parse_health_check_result(HCRes, Data0),
_ = maybe_alarm(NewStatus, ResId, Err, PrevError), IsDryRun = emqx_resource:is_dry_run(ResId),
_ = maybe_alarm(NewStatus, IsDryRun, ResId, Err, PrevError),
ok = maybe_resume_resource_workers(ResId, NewStatus), ok = maybe_resume_resource_workers(ResId, NewStatus),
Data1 = Data0#data{ Data1 = Data0#data{
state = NewState, status = NewStatus, error = Err state = NewState, status = NewStatus, error = Err
@ -1114,7 +1123,8 @@ continue_resource_health_check_connected(NewStatus, Data0) ->
Actions = Replies ++ resource_health_check_actions(Data), Actions = Replies ++ resource_health_check_actions(Data),
{keep_state, Data, Actions}; {keep_state, Data, Actions};
_ -> _ ->
?SLOG(warning, #{ IsDryRun = emqx_resource:is_dry_run(Data0#data.id),
?SLOG(log_level(IsDryRun), #{
msg => "health_check_failed", msg => "health_check_failed",
id => Data0#data.id, id => Data0#data.id,
status => NewStatus status => NewStatus
@ -1214,7 +1224,7 @@ channels_health_check(?status_connected = _ConnectorStatus, Data0) ->
channels_health_check(?status_connecting = _ConnectorStatus, Data0) -> channels_health_check(?status_connecting = _ConnectorStatus, Data0) ->
%% Whenever the resource is connecting: %% Whenever the resource is connecting:
%% 1. Change the status of all added channels to connecting %% 1. Change the status of all added channels to connecting
%% 2. Raise alarms (TODO: if it is a probe we should not raise alarms) %% 2. Raise alarms
Channels = Data0#data.added_channels, Channels = Data0#data.added_channels,
ChannelsToChangeStatusFor = [ ChannelsToChangeStatusFor = [
{ChannelId, Config} {ChannelId, Config}
@ -1240,9 +1250,10 @@ channels_health_check(?status_connecting = _ConnectorStatus, Data0) ->
|| {ChannelId, NewStatus} <- maps:to_list(NewChannels) || {ChannelId, NewStatus} <- maps:to_list(NewChannels)
], ],
%% Raise alarms for all channels %% Raise alarms for all channels
IsDryRun = emqx_resource:is_dry_run(Data0#data.id),
lists:foreach( lists:foreach(
fun({ChannelId, Status, PrevStatus}) -> fun({ChannelId, Status, PrevStatus}) ->
maybe_alarm(?status_connecting, ChannelId, Status, PrevStatus) maybe_alarm(?status_connecting, IsDryRun, ChannelId, Status, PrevStatus)
end, end,
ChannelsWithNewAndPrevErrorStatuses ChannelsWithNewAndPrevErrorStatuses
), ),
@ -1275,9 +1286,10 @@ channels_health_check(ConnectorStatus, Data0) ->
|| {ChannelId, #{config := Config} = OldStatus} <- maps:to_list(Data1#data.added_channels) || {ChannelId, #{config := Config} = OldStatus} <- maps:to_list(Data1#data.added_channels)
], ],
%% Raise alarms %% Raise alarms
IsDryRun = emqx_resource:is_dry_run(Data1#data.id),
_ = lists:foreach( _ = lists:foreach(
fun({ChannelId, OldStatus, NewStatus}) -> fun({ChannelId, OldStatus, NewStatus}) ->
_ = maybe_alarm(NewStatus, ChannelId, NewStatus, OldStatus) _ = maybe_alarm(NewStatus, IsDryRun, ChannelId, NewStatus, OldStatus)
end, end,
ChannelsWithNewAndOldStatuses ChannelsWithNewAndOldStatuses
), ),
@ -1386,13 +1398,14 @@ continue_channel_health_check_connected_no_update_during_check(ChannelId, OldSta
NewStatus = maps:get(ChannelId, Data1#data.added_channels), NewStatus = maps:get(ChannelId, Data1#data.added_channels),
ChannelsToRemove = [ChannelId || not channel_status_is_channel_added(NewStatus)], ChannelsToRemove = [ChannelId || not channel_status_is_channel_added(NewStatus)],
Data = remove_channels_in_list(ChannelsToRemove, Data1, true), Data = remove_channels_in_list(ChannelsToRemove, Data1, true),
IsDryRun = emqx_resource:is_dry_run(Data1#data.id),
%% Raise/clear alarms %% Raise/clear alarms
case NewStatus of case NewStatus of
#{status := ?status_connected} -> #{status := ?status_connected} ->
_ = maybe_clear_alarm(ChannelId), _ = maybe_clear_alarm(IsDryRun, ChannelId),
ok; ok;
_ -> _ ->
_ = maybe_alarm(NewStatus, ChannelId, NewStatus, OldStatus), _ = maybe_alarm(NewStatus, IsDryRun, ChannelId, NewStatus, OldStatus),
ok ok
end, end,
Data. Data.
@ -1556,15 +1569,21 @@ remove_runtime_data(#data{} = Data0) ->
health_check_interval(Opts) -> health_check_interval(Opts) ->
maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL). maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL).
-spec maybe_alarm(resource_status(), resource_id(), _Error :: term(), _PrevError :: term()) -> ok. -spec maybe_alarm(
maybe_alarm(?status_connected, _ResId, _Error, _PrevError) -> resource_status(),
boolean(),
resource_id(),
_Error :: term(),
_PrevError :: term()
) -> ok.
maybe_alarm(?status_connected, _IsDryRun, _ResId, _Error, _PrevError) ->
ok; ok;
maybe_alarm(_Status, <<?TEST_ID_PREFIX, _/binary>>, _Error, _PrevError) -> maybe_alarm(_Status, true, _ResId, _Error, _PrevError) ->
ok; ok;
%% Assume that alarm is already active %% Assume that alarm is already active
maybe_alarm(_Status, _ResId, Error, Error) -> maybe_alarm(_Status, _IsDryRun, _ResId, Error, Error) ->
ok; ok;
maybe_alarm(_Status, ResId, Error, _PrevError) -> maybe_alarm(_Status, false, ResId, Error, _PrevError) ->
HrError = HrError =
case Error of case Error of
{error, undefined} -> {error, undefined} ->
@ -1596,10 +1615,10 @@ maybe_resume_resource_workers(ResId, ?status_connected) ->
maybe_resume_resource_workers(_, _) -> maybe_resume_resource_workers(_, _) ->
ok. ok.
-spec maybe_clear_alarm(resource_id()) -> ok | {error, not_found}. -spec maybe_clear_alarm(boolean(), resource_id()) -> ok | {error, not_found}.
maybe_clear_alarm(<<?TEST_ID_PREFIX, _/binary>>) -> maybe_clear_alarm(true, _ResId) ->
ok; ok;
maybe_clear_alarm(ResId) -> maybe_clear_alarm(false, ResId) ->
emqx_alarm:safe_deactivate(ResId). emqx_alarm:safe_deactivate(ResId).
parse_health_check_result(Status, Data) when ?IS_STATUS(Status) -> parse_health_check_result(Status, Data) when ?IS_STATUS(Status) ->
@ -1767,10 +1786,14 @@ add_or_update_channel_status(Data, ChannelId, ChannelConfig, State) ->
ChannelStatus = channel_status({error, resource_not_operational}, ChannelConfig), ChannelStatus = channel_status({error, resource_not_operational}, ChannelConfig),
NewChannels = maps:put(ChannelId, ChannelStatus, Channels), NewChannels = maps:put(ChannelId, ChannelStatus, Channels),
ResStatus = state_to_status(State), ResStatus = state_to_status(State),
maybe_alarm(ResStatus, ChannelId, ChannelStatus, no_prev), IsDryRun = emqx_resource:is_dry_run(ChannelId),
maybe_alarm(ResStatus, IsDryRun, ChannelId, ChannelStatus, no_prev),
Data#data{added_channels = NewChannels}. Data#data{added_channels = NewChannels}.
state_to_status(?state_stopped) -> ?rm_status_stopped; state_to_status(?state_stopped) -> ?rm_status_stopped;
state_to_status(?state_connected) -> ?status_connected; state_to_status(?state_connected) -> ?status_connected;
state_to_status(?state_connecting) -> ?status_connecting; state_to_status(?state_connecting) -> ?status_connecting;
state_to_status(?state_disconnected) -> ?status_disconnected. state_to_status(?state_disconnected) -> ?status_disconnected.
%% Picks the log level passed to ?SLOG for resource/channel failures:
%% `info' when the argument (the dry-run flag) is true, `warning' otherwise.
%% Any argument other than `true' or `false' fails with function_clause.
log_level(true) -> info;
log_level(false) -> warning.