feat(queue): reduce logging levels
This commit is contained in:
parent
e17becb84d
commit
9ad65c6ac1
|
@ -241,14 +241,14 @@ schedule_subscribe(
|
||||||
ScheduledActions1 = ScheduledActions0#{
|
ScheduledActions1 = ScheduledActions0#{
|
||||||
ShareTopicFilter => ScheduledAction#{type => {?schedule_subscribe, SubOpts}}
|
ShareTopicFilter => ScheduledAction#{type => {?schedule_subscribe, SubOpts}}
|
||||||
},
|
},
|
||||||
?tp(warning, shared_subs_schedule_subscribe_override, #{
|
?tp(debug, shared_subs_schedule_subscribe_override, #{
|
||||||
share_topic_filter => ShareTopicFilter,
|
share_topic_filter => ShareTopicFilter,
|
||||||
new_type => {?schedule_subscribe, SubOpts},
|
new_type => {?schedule_subscribe, SubOpts},
|
||||||
old_action => format_schedule_action(ScheduledAction)
|
old_action => format_schedule_action(ScheduledAction)
|
||||||
}),
|
}),
|
||||||
SharedSubS0#{scheduled_actions := ScheduledActions1};
|
SharedSubS0#{scheduled_actions := ScheduledActions1};
|
||||||
_ ->
|
_ ->
|
||||||
?tp(warning, shared_subs_schedule_subscribe_new, #{
|
?tp(debug, shared_subs_schedule_subscribe_new, #{
|
||||||
share_topic_filter => ShareTopicFilter, subopts => SubOpts
|
share_topic_filter => ShareTopicFilter, subopts => SubOpts
|
||||||
}),
|
}),
|
||||||
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_subscribe(
|
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_subscribe(
|
||||||
|
@ -299,7 +299,7 @@ schedule_unsubscribe(
|
||||||
ScheduledActions1 = ScheduledActions0#{
|
ScheduledActions1 = ScheduledActions0#{
|
||||||
ShareTopicFilter => ScheduledAction1
|
ShareTopicFilter => ScheduledAction1
|
||||||
},
|
},
|
||||||
?tp(warning, shared_subs_schedule_unsubscribe_override, #{
|
?tp(debug, shared_subs_schedule_unsubscribe_override, #{
|
||||||
share_topic_filter => ShareTopicFilter,
|
share_topic_filter => ShareTopicFilter,
|
||||||
new_type => ?schedule_unsubscribe,
|
new_type => ?schedule_unsubscribe,
|
||||||
old_action => format_schedule_action(ScheduledAction0)
|
old_action => format_schedule_action(ScheduledAction0)
|
||||||
|
@ -314,7 +314,7 @@ schedule_unsubscribe(
|
||||||
progresses => []
|
progresses => []
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
?tp(warning, shared_subs_schedule_unsubscribe_new, #{
|
?tp(debug, shared_subs_schedule_unsubscribe_new, #{
|
||||||
share_topic_filter => ShareTopicFilter,
|
share_topic_filter => ShareTopicFilter,
|
||||||
stream_keys => format_stream_keys(StreamKeys)
|
stream_keys => format_stream_keys(StreamKeys)
|
||||||
}),
|
}),
|
||||||
|
@ -339,7 +339,7 @@ renew_streams(S0, #{agent := Agent0, scheduled_actions := ScheduledActions} = Sh
|
||||||
Agent0
|
Agent0
|
||||||
),
|
),
|
||||||
StreamLeaseEvents =/= [] andalso
|
StreamLeaseEvents =/= [] andalso
|
||||||
?tp(warning, shared_subs_new_stream_lease_events, #{
|
?tp(debug, shared_subs_new_stream_lease_events, #{
|
||||||
stream_lease_events => format_lease_events(StreamLeaseEvents)
|
stream_lease_events => format_lease_events(StreamLeaseEvents)
|
||||||
}),
|
}),
|
||||||
S1 = lists:foldl(
|
S1 = lists:foldl(
|
||||||
|
@ -506,7 +506,7 @@ run_scheduled_action(
|
||||||
Progresses1 = stream_progresses(S, StreamKeysToWait0 -- StreamKeysToWait1) ++ Progresses0,
|
Progresses1 = stream_progresses(S, StreamKeysToWait0 -- StreamKeysToWait1) ++ Progresses0,
|
||||||
case StreamKeysToWait1 of
|
case StreamKeysToWait1 of
|
||||||
[] ->
|
[] ->
|
||||||
?tp(warning, shared_subs_schedule_action_complete, #{
|
?tp(debug, shared_subs_schedule_action_complete, #{
|
||||||
share_topic_filter => ShareTopicFilter,
|
share_topic_filter => ShareTopicFilter,
|
||||||
progresses => format_stream_progresses(Progresses1),
|
progresses => format_stream_progresses(Progresses1),
|
||||||
type => Type
|
type => Type
|
||||||
|
@ -530,7 +530,7 @@ run_scheduled_action(
|
||||||
end;
|
end;
|
||||||
_ ->
|
_ ->
|
||||||
Action1 = Action#{stream_keys_to_wait => StreamKeysToWait1, progresses => Progresses1},
|
Action1 = Action#{stream_keys_to_wait => StreamKeysToWait1, progresses => Progresses1},
|
||||||
?tp(warning, shared_subs_schedule_action_continue, #{
|
?tp(debug, shared_subs_schedule_action_continue, #{
|
||||||
share_topic_filter => ShareTopicFilter,
|
share_topic_filter => ShareTopicFilter,
|
||||||
new_action => format_schedule_action(Action1)
|
new_action => format_schedule_action(Action1)
|
||||||
}),
|
}),
|
||||||
|
|
|
@ -100,7 +100,7 @@ open(TopicSubscriptions, Opts) ->
|
||||||
State0 = init_state(Opts),
|
State0 = init_state(Opts),
|
||||||
State1 = lists:foldl(
|
State1 = lists:foldl(
|
||||||
fun({ShareTopicFilter, #{}}, State) ->
|
fun({ShareTopicFilter, #{}}, State) ->
|
||||||
?tp(warning, ds_agent_open_subscription, #{
|
?tp(debug, ds_agent_open_subscription, #{
|
||||||
topic_filter => ShareTopicFilter
|
topic_filter => ShareTopicFilter
|
||||||
}),
|
}),
|
||||||
add_shared_subscription(State, ShareTopicFilter)
|
add_shared_subscription(State, ShareTopicFilter)
|
||||||
|
@ -120,7 +120,7 @@ can_subscribe(_State, _ShareTopicFilter, _SubOpts) ->
|
||||||
|
|
||||||
-spec on_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> t().
|
-spec on_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> t().
|
||||||
on_subscribe(State0, ShareTopicFilter, _SubOpts) ->
|
on_subscribe(State0, ShareTopicFilter, _SubOpts) ->
|
||||||
?tp(warning, ds_agent_on_subscribe, #{
|
?tp(debug, ds_agent_on_subscribe, #{
|
||||||
share_topic_filter => ShareTopicFilter
|
share_topic_filter => ShareTopicFilter
|
||||||
}),
|
}),
|
||||||
add_shared_subscription(State0, ShareTopicFilter).
|
add_shared_subscription(State0, ShareTopicFilter).
|
||||||
|
@ -163,7 +163,7 @@ on_disconnect(#{groups := Groups0} = State, StreamProgresses) ->
|
||||||
|
|
||||||
-spec on_info(t(), term()) -> t().
|
-spec on_info(t(), term()) -> t().
|
||||||
on_info(State, ?leader_lease_streams_match(GroupId, Leader, StreamProgresses, Version)) ->
|
on_info(State, ?leader_lease_streams_match(GroupId, Leader, StreamProgresses, Version)) ->
|
||||||
?SLOG(info, #{
|
?SLOG(debug, #{
|
||||||
msg => leader_lease_streams,
|
msg => leader_lease_streams,
|
||||||
group_id => GroupId,
|
group_id => GroupId,
|
||||||
streams => StreamProgresses,
|
streams => StreamProgresses,
|
||||||
|
@ -176,7 +176,7 @@ on_info(State, ?leader_lease_streams_match(GroupId, Leader, StreamProgresses, Ve
|
||||||
)
|
)
|
||||||
end);
|
end);
|
||||||
on_info(State, ?leader_renew_stream_lease_match(GroupId, Version)) ->
|
on_info(State, ?leader_renew_stream_lease_match(GroupId, Version)) ->
|
||||||
?SLOG(info, #{
|
?SLOG(debug, #{
|
||||||
msg => leader_renew_stream_lease,
|
msg => leader_renew_stream_lease,
|
||||||
group_id => GroupId,
|
group_id => GroupId,
|
||||||
version => Version
|
version => Version
|
||||||
|
@ -185,7 +185,7 @@ on_info(State, ?leader_renew_stream_lease_match(GroupId, Version)) ->
|
||||||
emqx_ds_shared_sub_group_sm:handle_leader_renew_stream_lease(GSM, Version)
|
emqx_ds_shared_sub_group_sm:handle_leader_renew_stream_lease(GSM, Version)
|
||||||
end);
|
end);
|
||||||
on_info(State, ?leader_renew_stream_lease_match(GroupId, VersionOld, VersionNew)) ->
|
on_info(State, ?leader_renew_stream_lease_match(GroupId, VersionOld, VersionNew)) ->
|
||||||
?SLOG(info, #{
|
?SLOG(debug, #{
|
||||||
msg => leader_renew_stream_lease,
|
msg => leader_renew_stream_lease,
|
||||||
group_id => GroupId,
|
group_id => GroupId,
|
||||||
version_old => VersionOld,
|
version_old => VersionOld,
|
||||||
|
@ -195,7 +195,7 @@ on_info(State, ?leader_renew_stream_lease_match(GroupId, VersionOld, VersionNew)
|
||||||
emqx_ds_shared_sub_group_sm:handle_leader_renew_stream_lease(GSM, VersionOld, VersionNew)
|
emqx_ds_shared_sub_group_sm:handle_leader_renew_stream_lease(GSM, VersionOld, VersionNew)
|
||||||
end);
|
end);
|
||||||
on_info(State, ?leader_update_streams_match(GroupId, VersionOld, VersionNew, StreamsNew)) ->
|
on_info(State, ?leader_update_streams_match(GroupId, VersionOld, VersionNew, StreamsNew)) ->
|
||||||
?SLOG(info, #{
|
?SLOG(debug, #{
|
||||||
msg => leader_update_streams,
|
msg => leader_update_streams,
|
||||||
group_id => GroupId,
|
group_id => GroupId,
|
||||||
version_old => VersionOld,
|
version_old => VersionOld,
|
||||||
|
@ -208,7 +208,7 @@ on_info(State, ?leader_update_streams_match(GroupId, VersionOld, VersionNew, Str
|
||||||
)
|
)
|
||||||
end);
|
end);
|
||||||
on_info(State, ?leader_invalidate_match(GroupId)) ->
|
on_info(State, ?leader_invalidate_match(GroupId)) ->
|
||||||
?SLOG(info, #{
|
?SLOG(debug, #{
|
||||||
msg => leader_invalidate,
|
msg => leader_invalidate,
|
||||||
group_id => GroupId
|
group_id => GroupId
|
||||||
}),
|
}),
|
||||||
|
@ -245,7 +245,7 @@ delete_shared_subscription(State, ShareTopicFilter, GroupProgress) ->
|
||||||
add_shared_subscription(
|
add_shared_subscription(
|
||||||
#{session_id := SessionId, groups := Groups0} = State0, ShareTopicFilter
|
#{session_id := SessionId, groups := Groups0} = State0, ShareTopicFilter
|
||||||
) ->
|
) ->
|
||||||
?SLOG(info, #{
|
?SLOG(debug, #{
|
||||||
msg => agent_add_shared_subscription,
|
msg => agent_add_shared_subscription,
|
||||||
share_topic_filter => ShareTopicFilter
|
share_topic_filter => ShareTopicFilter
|
||||||
}),
|
}),
|
||||||
|
|
|
@ -120,7 +120,7 @@ new(#{
|
||||||
send_after := SendAfter
|
send_after := SendAfter
|
||||||
}) ->
|
}) ->
|
||||||
?SLOG(
|
?SLOG(
|
||||||
info,
|
debug,
|
||||||
#{
|
#{
|
||||||
msg => group_sm_new,
|
msg => group_sm_new,
|
||||||
agent => Agent,
|
agent => Agent,
|
||||||
|
@ -133,7 +133,7 @@ new(#{
|
||||||
agent => Agent,
|
agent => Agent,
|
||||||
send_after => SendAfter
|
send_after => SendAfter
|
||||||
},
|
},
|
||||||
?tp(warning, group_sm_new, #{
|
?tp(debug, group_sm_new, #{
|
||||||
agent => Agent,
|
agent => Agent,
|
||||||
share_topic_filter => ShareTopicFilter
|
share_topic_filter => ShareTopicFilter
|
||||||
}),
|
}),
|
||||||
|
@ -176,7 +176,7 @@ handle_disconnect(
|
||||||
%% Connecting state
|
%% Connecting state
|
||||||
|
|
||||||
handle_connecting(#{agent := Agent, share_topic_filter := ShareTopicFilter} = GSM) ->
|
handle_connecting(#{agent := Agent, share_topic_filter := ShareTopicFilter} = GSM) ->
|
||||||
?tp(warning, group_sm_enter_connecting, #{
|
?tp(debug, group_sm_enter_connecting, #{
|
||||||
agent => Agent,
|
agent => Agent,
|
||||||
share_topic_filter => ShareTopicFilter
|
share_topic_filter => ShareTopicFilter
|
||||||
}),
|
}),
|
||||||
|
@ -264,7 +264,7 @@ handle_leader_update_streams(
|
||||||
VersionNew,
|
VersionNew,
|
||||||
StreamProgresses
|
StreamProgresses
|
||||||
) ->
|
) ->
|
||||||
?tp(warning, shared_sub_group_sm_leader_update_streams, #{
|
?tp(debug, shared_sub_group_sm_leader_update_streams, #{
|
||||||
id => Id,
|
id => Id,
|
||||||
version_old => VersionOld,
|
version_old => VersionOld,
|
||||||
version_new => VersionNew,
|
version_new => VersionNew,
|
||||||
|
@ -305,7 +305,7 @@ handle_leader_update_streams(
|
||||||
maps:keys(Streams1)
|
maps:keys(Streams1)
|
||||||
),
|
),
|
||||||
StreamLeaseEvents = AddEvents ++ RevokeEvents,
|
StreamLeaseEvents = AddEvents ++ RevokeEvents,
|
||||||
?tp(warning, shared_sub_group_sm_leader_update_streams, #{
|
?tp(debug, shared_sub_group_sm_leader_update_streams, #{
|
||||||
id => Id,
|
id => Id,
|
||||||
stream_lease_events => emqx_persistent_session_ds_shared_subs:format_lease_events(
|
stream_lease_events => emqx_persistent_session_ds_shared_subs:format_lease_events(
|
||||||
StreamLeaseEvents
|
StreamLeaseEvents
|
||||||
|
@ -435,24 +435,11 @@ handle_leader_invalidate(#{agent := Agent, share_topic_filter := ShareTopicFilte
|
||||||
%% Internal API
|
%% Internal API
|
||||||
%%-----------------------------------------------------------------------
|
%%-----------------------------------------------------------------------
|
||||||
|
|
||||||
handle_state_timeout(
|
handle_state_timeout(#{state := ?connecting} = GSM, find_leader_timeout, _Message) ->
|
||||||
#{state := ?connecting, share_topic_filter := ShareTopicFilter} = GSM,
|
|
||||||
find_leader_timeout,
|
|
||||||
_Message
|
|
||||||
) ->
|
|
||||||
?tp(debug, find_leader_timeout, #{share_topic_filter => ShareTopicFilter}),
|
|
||||||
handle_find_leader_timeout(GSM);
|
handle_find_leader_timeout(GSM);
|
||||||
handle_state_timeout(
|
handle_state_timeout(#{state := ?replaying} = GSM, renew_lease_timeout, _Message) ->
|
||||||
#{state := ?replaying} = GSM,
|
|
||||||
renew_lease_timeout,
|
|
||||||
_Message
|
|
||||||
) ->
|
|
||||||
handle_renew_lease_timeout(GSM);
|
handle_renew_lease_timeout(GSM);
|
||||||
handle_state_timeout(
|
handle_state_timeout(GSM, update_stream_state_timeout, _Message) ->
|
||||||
GSM,
|
|
||||||
update_stream_state_timeout,
|
|
||||||
_Message
|
|
||||||
) ->
|
|
||||||
?tp(debug, update_stream_state_timeout, #{}),
|
?tp(debug, update_stream_state_timeout, #{}),
|
||||||
handle_stream_progress(GSM, []).
|
handle_stream_progress(GSM, []).
|
||||||
|
|
||||||
|
|
|
@ -164,7 +164,7 @@ handle_event({call, From}, #register{register_fun = Fun}, ?leader_waiting_regist
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% repalying state
|
%% repalying state
|
||||||
handle_event(enter, _OldState, ?leader_active, #{topic := Topic} = _Data) ->
|
handle_event(enter, _OldState, ?leader_active, #{topic := Topic} = _Data) ->
|
||||||
?tp(warning, shared_sub_leader_enter_actve, #{topic => Topic}),
|
?tp(debug, shared_sub_leader_enter_actve, #{topic => Topic}),
|
||||||
{keep_state_and_data, [
|
{keep_state_and_data, [
|
||||||
{{timeout, #renew_streams{}}, 0, #renew_streams{}},
|
{{timeout, #renew_streams{}}, 0, #renew_streams{}},
|
||||||
{{timeout, #renew_leases{}}, ?dq_config(leader_renew_lease_interval_ms), #renew_leases{}},
|
{{timeout, #renew_leases{}}, ?dq_config(leader_renew_lease_interval_ms), #renew_leases{}},
|
||||||
|
@ -174,7 +174,7 @@ handle_event(enter, _OldState, ?leader_active, #{topic := Topic} = _Data) ->
|
||||||
%% timers
|
%% timers
|
||||||
%% renew_streams timer
|
%% renew_streams timer
|
||||||
handle_event({timeout, #renew_streams{}}, #renew_streams{}, ?leader_active, Data0) ->
|
handle_event({timeout, #renew_streams{}}, #renew_streams{}, ?leader_active, Data0) ->
|
||||||
% ?tp(warning, shared_sub_leader_timeout, #{timeout => renew_streams}),
|
?tp(debug, shared_sub_leader_timeout, #{timeout => renew_streams}),
|
||||||
Data1 = renew_streams(Data0),
|
Data1 = renew_streams(Data0),
|
||||||
{keep_state, Data1,
|
{keep_state, Data1,
|
||||||
{
|
{
|
||||||
|
@ -184,7 +184,7 @@ handle_event({timeout, #renew_streams{}}, #renew_streams{}, ?leader_active, Data
|
||||||
}};
|
}};
|
||||||
%% renew_leases timer
|
%% renew_leases timer
|
||||||
handle_event({timeout, #renew_leases{}}, #renew_leases{}, ?leader_active, Data0) ->
|
handle_event({timeout, #renew_leases{}}, #renew_leases{}, ?leader_active, Data0) ->
|
||||||
% ?tp(warning, shared_sub_leader_timeout, #{timeout => renew_leases}),
|
?tp(debug, shared_sub_leader_timeout, #{timeout => renew_leases}),
|
||||||
Data1 = renew_leases(Data0),
|
Data1 = renew_leases(Data0),
|
||||||
{keep_state, Data1,
|
{keep_state, Data1,
|
||||||
{{timeout, #renew_leases{}}, ?dq_config(leader_renew_lease_interval_ms), #renew_leases{}}};
|
{{timeout, #renew_leases{}}, ?dq_config(leader_renew_lease_interval_ms), #renew_leases{}}};
|
||||||
|
@ -279,7 +279,7 @@ renew_streams(
|
||||||
Data2 = Data1#{stream_states => NewStreamStates, rank_progress => RankProgress1},
|
Data2 = Data1#{stream_states => NewStreamStates, rank_progress => RankProgress1},
|
||||||
Data3 = revoke_streams(Data2),
|
Data3 = revoke_streams(Data2),
|
||||||
Data4 = assign_streams(Data3),
|
Data4 = assign_streams(Data3),
|
||||||
?SLOG(info, #{
|
?SLOG(debug, #{
|
||||||
msg => leader_renew_streams,
|
msg => leader_renew_streams,
|
||||||
topic_filter => TopicFilter,
|
topic_filter => TopicFilter,
|
||||||
new_streams => length(NewStreamsWRanks)
|
new_streams => length(NewStreamsWRanks)
|
||||||
|
@ -368,7 +368,7 @@ revoke_excess_streams_from_agent(Data0, Agent, DesiredCount) ->
|
||||||
false ->
|
false ->
|
||||||
AgentState0;
|
AgentState0;
|
||||||
true ->
|
true ->
|
||||||
?tp(warning, shared_sub_leader_revoke_streams, #{
|
?tp(debug, shared_sub_leader_revoke_streams, #{
|
||||||
agent => Agent,
|
agent => Agent,
|
||||||
agent_stream_count => length(Streams0),
|
agent_stream_count => length(Streams0),
|
||||||
revoke_count => RevokeCount,
|
revoke_count => RevokeCount,
|
||||||
|
@ -421,7 +421,7 @@ assign_lacking_streams(Data0, Agent, DesiredCount) ->
|
||||||
false ->
|
false ->
|
||||||
Data0;
|
Data0;
|
||||||
true ->
|
true ->
|
||||||
?tp(warning, shared_sub_leader_assign_streams, #{
|
?tp(debug, shared_sub_leader_assign_streams, #{
|
||||||
agent => Agent,
|
agent => Agent,
|
||||||
agent_stream_count => length(Streams0),
|
agent_stream_count => length(Streams0),
|
||||||
assign_count => AssignCount,
|
assign_count => AssignCount,
|
||||||
|
@ -449,7 +449,7 @@ select_streams_for_assign(Data0, _Agent, AssignCount) ->
|
||||||
%% renew_leases - send lease confirmations to agents
|
%% renew_leases - send lease confirmations to agents
|
||||||
|
|
||||||
renew_leases(#{agents := AgentStates} = Data) ->
|
renew_leases(#{agents := AgentStates} = Data) ->
|
||||||
?tp(warning, shared_sub_leader_renew_leases, #{agents => maps:keys(AgentStates)}),
|
?tp(debug, shared_sub_leader_renew_leases, #{agents => maps:keys(AgentStates)}),
|
||||||
ok = lists:foreach(
|
ok = lists:foreach(
|
||||||
fun({Agent, AgentState}) ->
|
fun({Agent, AgentState}) ->
|
||||||
renew_lease(Data, Agent, AgentState)
|
renew_lease(Data, Agent, AgentState)
|
||||||
|
@ -492,7 +492,7 @@ drop_timeout_agents(#{agents := Agents} = Data) ->
|
||||||
(is_integer(NoReplayingDeadline) andalso NoReplayingDeadline < Now)
|
(is_integer(NoReplayingDeadline) andalso NoReplayingDeadline < Now)
|
||||||
of
|
of
|
||||||
true ->
|
true ->
|
||||||
?SLOG(info, #{
|
?SLOG(debug, #{
|
||||||
msg => leader_agent_timeout,
|
msg => leader_agent_timeout,
|
||||||
now => Now,
|
now => Now,
|
||||||
update_deadline => UpdateDeadline,
|
update_deadline => UpdateDeadline,
|
||||||
|
@ -516,14 +516,14 @@ connect_agent(
|
||||||
Agent,
|
Agent,
|
||||||
AgentMetadata
|
AgentMetadata
|
||||||
) ->
|
) ->
|
||||||
?SLOG(info, #{
|
?SLOG(debug, #{
|
||||||
msg => leader_agent_connected,
|
msg => leader_agent_connected,
|
||||||
agent => Agent,
|
agent => Agent,
|
||||||
group_id => GroupId
|
group_id => GroupId
|
||||||
}),
|
}),
|
||||||
case Agents of
|
case Agents of
|
||||||
#{Agent := AgentState} ->
|
#{Agent := AgentState} ->
|
||||||
?tp(warning, shared_sub_leader_agent_already_connected, #{
|
?tp(debug, shared_sub_leader_agent_already_connected, #{
|
||||||
agent => Agent
|
agent => Agent
|
||||||
}),
|
}),
|
||||||
reconnect_agent(Data, Agent, AgentMetadata, AgentState);
|
reconnect_agent(Data, Agent, AgentMetadata, AgentState);
|
||||||
|
@ -546,7 +546,7 @@ reconnect_agent(
|
||||||
AgentMetadata,
|
AgentMetadata,
|
||||||
#{streams := OldStreams, revoked_streams := OldRevokedStreams} = _OldAgentState
|
#{streams := OldStreams, revoked_streams := OldRevokedStreams} = _OldAgentState
|
||||||
) ->
|
) ->
|
||||||
?tp(warning, shared_sub_leader_agent_reconnect, #{
|
?tp(debug, shared_sub_leader_agent_reconnect, #{
|
||||||
agent => Agent,
|
agent => Agent,
|
||||||
agent_metadata => AgentMetadata,
|
agent_metadata => AgentMetadata,
|
||||||
inherited_streams => OldStreams
|
inherited_streams => OldStreams
|
||||||
|
@ -767,7 +767,7 @@ update_agent_stream_states(Data0, Agent, AgentStreamProgresses, VersionOld, Vers
|
||||||
disconnect_agent(Data0, Agent, AgentStreamProgresses, Version) ->
|
disconnect_agent(Data0, Agent, AgentStreamProgresses, Version) ->
|
||||||
case get_agent_state(Data0, Agent) of
|
case get_agent_state(Data0, Agent) of
|
||||||
#{version := Version} ->
|
#{version := Version} ->
|
||||||
?tp(warning, shared_sub_leader_disconnect_agent, #{
|
?tp(debug, shared_sub_leader_disconnect_agent, #{
|
||||||
agent => Agent,
|
agent => Agent,
|
||||||
version => Version
|
version => Version
|
||||||
}),
|
}),
|
||||||
|
@ -794,7 +794,7 @@ agent_transition_to_waiting_updating(
|
||||||
Streams,
|
Streams,
|
||||||
RevokedStreams
|
RevokedStreams
|
||||||
) ->
|
) ->
|
||||||
?tp(warning, shared_sub_leader_agent_state_transition, #{
|
?tp(debug, shared_sub_leader_agent_state_transition, #{
|
||||||
agent => Agent,
|
agent => Agent,
|
||||||
old_state => OldState,
|
old_state => OldState,
|
||||||
new_state => ?waiting_updating
|
new_state => ?waiting_updating
|
||||||
|
@ -818,7 +818,7 @@ agent_transition_to_waiting_updating(
|
||||||
agent_transition_to_waiting_replaying(
|
agent_transition_to_waiting_replaying(
|
||||||
#{group_id := GroupId} = _Data, Agent, #{state := OldState, version := Version} = AgentState0
|
#{group_id := GroupId} = _Data, Agent, #{state := OldState, version := Version} = AgentState0
|
||||||
) ->
|
) ->
|
||||||
?tp(warning, shared_sub_leader_agent_state_transition, #{
|
?tp(debug, shared_sub_leader_agent_state_transition, #{
|
||||||
agent => Agent,
|
agent => Agent,
|
||||||
old_state => OldState,
|
old_state => OldState,
|
||||||
new_state => ?waiting_replaying
|
new_state => ?waiting_replaying
|
||||||
|
@ -833,7 +833,7 @@ agent_transition_to_waiting_replaying(
|
||||||
agent_transition_to_initial_waiting_replaying(
|
agent_transition_to_initial_waiting_replaying(
|
||||||
#{group_id := GroupId} = Data, Agent, AgentMetadata, InitialStreams
|
#{group_id := GroupId} = Data, Agent, AgentMetadata, InitialStreams
|
||||||
) ->
|
) ->
|
||||||
?tp(warning, shared_sub_leader_agent_state_transition, #{
|
?tp(debug, shared_sub_leader_agent_state_transition, #{
|
||||||
agent => Agent,
|
agent => Agent,
|
||||||
old_state => none,
|
old_state => none,
|
||||||
new_state => ?waiting_replaying
|
new_state => ?waiting_replaying
|
||||||
|
@ -856,7 +856,7 @@ agent_transition_to_initial_waiting_replaying(
|
||||||
renew_no_replaying_deadline(AgentState).
|
renew_no_replaying_deadline(AgentState).
|
||||||
|
|
||||||
agent_transition_to_replaying(Agent, #{state := ?waiting_replaying} = AgentState) ->
|
agent_transition_to_replaying(Agent, #{state := ?waiting_replaying} = AgentState) ->
|
||||||
?tp(warning, shared_sub_leader_agent_state_transition, #{
|
?tp(debug, shared_sub_leader_agent_state_transition, #{
|
||||||
agent => Agent,
|
agent => Agent,
|
||||||
old_state => ?waiting_replaying,
|
old_state => ?waiting_replaying,
|
||||||
new_state => ?replaying
|
new_state => ?replaying
|
||||||
|
@ -868,7 +868,7 @@ agent_transition_to_replaying(Agent, #{state := ?waiting_replaying} = AgentState
|
||||||
}.
|
}.
|
||||||
|
|
||||||
agent_transition_to_updating(Agent, #{state := ?waiting_updating} = AgentState0) ->
|
agent_transition_to_updating(Agent, #{state := ?waiting_updating} = AgentState0) ->
|
||||||
?tp(warning, shared_sub_leader_agent_state_transition, #{
|
?tp(debug, shared_sub_leader_agent_state_transition, #{
|
||||||
agent => Agent,
|
agent => Agent,
|
||||||
old_state => ?waiting_updating,
|
old_state => ?waiting_updating,
|
||||||
new_state => ?updating
|
new_state => ?updating
|
||||||
|
@ -995,7 +995,7 @@ drop_agent(#{agents := Agents} = Data0, Agent) ->
|
||||||
#{streams := Streams, revoked_streams := RevokedStreams} = AgentState,
|
#{streams := Streams, revoked_streams := RevokedStreams} = AgentState,
|
||||||
AllStreams = Streams ++ RevokedStreams,
|
AllStreams = Streams ++ RevokedStreams,
|
||||||
Data1 = unassign_streams(Data0, AllStreams),
|
Data1 = unassign_streams(Data0, AllStreams),
|
||||||
?tp(warning, shared_sub_leader_drop_agent, #{agent => Agent}),
|
?tp(debug, shared_sub_leader_drop_agent, #{agent => Agent}),
|
||||||
Data1#{agents => maps:remove(Agent, Agents)}.
|
Data1#{agents => maps:remove(Agent, Agents)}.
|
||||||
|
|
||||||
invalidate_agent(#{group_id := GroupId}, Agent) ->
|
invalidate_agent(#{group_id := GroupId}, Agent) ->
|
||||||
|
|
|
@ -55,7 +55,7 @@ set_replayed({{RankX, RankY}, Stream}, State) ->
|
||||||
State#{RankX => #{min_y => MinY, ys => Ys2}};
|
State#{RankX => #{min_y => MinY, ys => Ys2}};
|
||||||
_ ->
|
_ ->
|
||||||
?SLOG(
|
?SLOG(
|
||||||
warning,
|
debug,
|
||||||
#{
|
#{
|
||||||
msg => leader_rank_progress_double_or_invalid_update,
|
msg => leader_rank_progress_double_or_invalid_update,
|
||||||
rank_x => RankX,
|
rank_x => RankX,
|
||||||
|
|
|
@ -113,7 +113,7 @@ do_lookup_leader(Agent, AgentMetadata, ShareTopicFilter, State) ->
|
||||||
Pid ->
|
Pid ->
|
||||||
Pid
|
Pid
|
||||||
end,
|
end,
|
||||||
?SLOG(info, #{
|
?SLOG(debug, #{
|
||||||
msg => lookup_leader,
|
msg => lookup_leader,
|
||||||
agent => Agent,
|
agent => Agent,
|
||||||
share_topic_filter => ShareTopicFilter,
|
share_topic_filter => ShareTopicFilter,
|
||||||
|
|
|
@ -417,7 +417,7 @@ t_lease_reconnect(_Config) ->
|
||||||
|
|
||||||
?assertWaitEvent(
|
?assertWaitEvent(
|
||||||
{ok, _, _} = emqtt:subscribe(ConnShared, <<"$share/gr2/topic2/#">>, 1),
|
{ok, _, _} = emqtt:subscribe(ConnShared, <<"$share/gr2/topic2/#">>, 1),
|
||||||
#{?snk_kind := find_leader_timeout},
|
#{?snk_kind := group_sm_find_leader_timeout},
|
||||||
5_000
|
5_000
|
||||||
),
|
),
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue