diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl index 10d41e91e..76f9e0a54 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl @@ -241,14 +241,14 @@ schedule_subscribe( ScheduledActions1 = ScheduledActions0#{ ShareTopicFilter => ScheduledAction#{type => {?schedule_subscribe, SubOpts}} }, - ?tp(warning, shared_subs_schedule_subscribe_override, #{ + ?tp(debug, shared_subs_schedule_subscribe_override, #{ share_topic_filter => ShareTopicFilter, new_type => {?schedule_subscribe, SubOpts}, old_action => format_schedule_action(ScheduledAction) }), SharedSubS0#{scheduled_actions := ScheduledActions1}; _ -> - ?tp(warning, shared_subs_schedule_subscribe_new, #{ + ?tp(debug, shared_subs_schedule_subscribe_new, #{ share_topic_filter => ShareTopicFilter, subopts => SubOpts }), Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_subscribe( @@ -299,7 +299,7 @@ schedule_unsubscribe( ScheduledActions1 = ScheduledActions0#{ ShareTopicFilter => ScheduledAction1 }, - ?tp(warning, shared_subs_schedule_unsubscribe_override, #{ + ?tp(debug, shared_subs_schedule_unsubscribe_override, #{ share_topic_filter => ShareTopicFilter, new_type => ?schedule_unsubscribe, old_action => format_schedule_action(ScheduledAction0) @@ -314,7 +314,7 @@ schedule_unsubscribe( progresses => [] } }, - ?tp(warning, shared_subs_schedule_unsubscribe_new, #{ + ?tp(debug, shared_subs_schedule_unsubscribe_new, #{ share_topic_filter => ShareTopicFilter, stream_keys => format_stream_keys(StreamKeys) }), @@ -339,7 +339,7 @@ renew_streams(S0, #{agent := Agent0, scheduled_actions := ScheduledActions} = Sh Agent0 ), StreamLeaseEvents =/= [] andalso - ?tp(warning, shared_subs_new_stream_lease_events, #{ + ?tp(debug, shared_subs_new_stream_lease_events, #{ stream_lease_events => format_lease_events(StreamLeaseEvents) }), S1 = lists:foldl( @@ -506,7 +506,7 @@ run_scheduled_action( Progresses1 = stream_progresses(S, StreamKeysToWait0 -- StreamKeysToWait1) ++ Progresses0, case StreamKeysToWait1 of [] -> - ?tp(warning, shared_subs_schedule_action_complete, #{ + ?tp(debug, shared_subs_schedule_action_complete, #{ share_topic_filter => ShareTopicFilter, progresses => format_stream_progresses(Progresses1), type => Type @@ -530,7 +530,7 @@ run_scheduled_action( end; _ -> Action1 = Action#{stream_keys_to_wait => StreamKeysToWait1, progresses => Progresses1}, - ?tp(warning, shared_subs_schedule_action_continue, #{ + ?tp(debug, shared_subs_schedule_action_continue, #{ share_topic_filter => ShareTopicFilter, new_action => format_schedule_action(Action1) }), diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl index a90f1286d..1504ea697 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl @@ -100,7 +100,7 @@ open(TopicSubscriptions, Opts) -> State0 = init_state(Opts), State1 = lists:foldl( fun({ShareTopicFilter, #{}}, State) -> - ?tp(warning, ds_agent_open_subscription, #{ + ?tp(debug, ds_agent_open_subscription, #{ topic_filter => ShareTopicFilter }), add_shared_subscription(State, ShareTopicFilter) @@ -120,7 +120,7 @@ can_subscribe(_State, _ShareTopicFilter, _SubOpts) -> -spec on_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> t(). on_subscribe(State0, ShareTopicFilter, _SubOpts) -> - ?tp(warning, ds_agent_on_subscribe, #{ + ?tp(debug, ds_agent_on_subscribe, #{ share_topic_filter => ShareTopicFilter }), add_shared_subscription(State0, ShareTopicFilter). @@ -163,7 +163,7 @@ on_disconnect(#{groups := Groups0} = State, StreamProgresses) -> -spec on_info(t(), term()) -> t(). on_info(State, ?leader_lease_streams_match(GroupId, Leader, StreamProgresses, Version)) -> - ?SLOG(info, #{ + ?SLOG(debug, #{ msg => leader_lease_streams, group_id => GroupId, streams => StreamProgresses, @@ -176,7 +176,7 @@ on_info(State, ?leader_lease_streams_match(GroupId, Leader, StreamProgresses, Ve ) end); on_info(State, ?leader_renew_stream_lease_match(GroupId, Version)) -> - ?SLOG(info, #{ + ?SLOG(debug, #{ msg => leader_renew_stream_lease, group_id => GroupId, version => Version @@ -185,7 +185,7 @@ on_info(State, ?leader_renew_stream_lease_match(GroupId, Version)) -> emqx_ds_shared_sub_group_sm:handle_leader_renew_stream_lease(GSM, Version) end); on_info(State, ?leader_renew_stream_lease_match(GroupId, VersionOld, VersionNew)) -> - ?SLOG(info, #{ + ?SLOG(debug, #{ msg => leader_renew_stream_lease, group_id => GroupId, version_old => VersionOld, @@ -195,7 +195,7 @@ on_info(State, ?leader_renew_stream_lease_match(GroupId, VersionOld, VersionNew) emqx_ds_shared_sub_group_sm:handle_leader_renew_stream_lease(GSM, VersionOld, VersionNew) end); on_info(State, ?leader_update_streams_match(GroupId, VersionOld, VersionNew, StreamsNew)) -> - ?SLOG(info, #{ + ?SLOG(debug, #{ msg => leader_update_streams, group_id => GroupId, version_old => VersionOld, @@ -208,7 +208,7 @@ on_info(State, ?leader_update_streams_match(GroupId, VersionOld, VersionNew, Str ) end); on_info(State, ?leader_invalidate_match(GroupId)) -> - ?SLOG(info, #{ + ?SLOG(debug, #{ msg => leader_invalidate, group_id => GroupId }), @@ -245,7 +245,7 @@ delete_shared_subscription(State, ShareTopicFilter, GroupProgress) -> add_shared_subscription( #{session_id := SessionId, groups := Groups0} = State0, ShareTopicFilter ) -> - ?SLOG(info, #{ + ?SLOG(debug, #{ msg => agent_add_shared_subscription, share_topic_filter => ShareTopicFilter }), diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl index 4cae98f65..ec55cf359 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl @@ -120,7 +120,7 @@ new(#{ send_after := SendAfter }) -> ?SLOG( - info, + debug, #{ msg => group_sm_new, agent => Agent, @@ -133,7 +133,7 @@ new(#{ agent => Agent, send_after => SendAfter }, - ?tp(warning, group_sm_new, #{ + ?tp(debug, group_sm_new, #{ agent => Agent, share_topic_filter => ShareTopicFilter }), @@ -176,7 +176,7 @@ handle_disconnect( %% Connecting state handle_connecting(#{agent := Agent, share_topic_filter := ShareTopicFilter} = GSM) -> - ?tp(warning, group_sm_enter_connecting, #{ + ?tp(debug, group_sm_enter_connecting, #{ agent => Agent, share_topic_filter => ShareTopicFilter }), @@ -264,7 +264,7 @@ handle_leader_update_streams( VersionNew, StreamProgresses ) -> - ?tp(warning, shared_sub_group_sm_leader_update_streams, #{ + ?tp(debug, shared_sub_group_sm_leader_update_streams, #{ id => Id, version_old => VersionOld, version_new => VersionNew, @@ -305,7 +305,7 @@ handle_leader_update_streams( maps:keys(Streams1) ), StreamLeaseEvents = AddEvents ++ RevokeEvents, - ?tp(warning, shared_sub_group_sm_leader_update_streams, #{ + ?tp(debug, shared_sub_group_sm_leader_update_streams, #{ id => Id, stream_lease_events => emqx_persistent_session_ds_shared_subs:format_lease_events( StreamLeaseEvents @@ -435,24 +435,11 @@ handle_leader_invalidate(#{agent := Agent, share_topic_filter := ShareTopicFilte %% Internal API %%----------------------------------------------------------------------- -handle_state_timeout( - #{state := ?connecting, share_topic_filter := ShareTopicFilter} = GSM, - find_leader_timeout, - _Message -) -> - ?tp(debug, find_leader_timeout, #{share_topic_filter => ShareTopicFilter}), +handle_state_timeout(#{state := ?connecting} = GSM, find_leader_timeout, _Message) -> handle_find_leader_timeout(GSM); -handle_state_timeout( - #{state := ?replaying} = GSM, - renew_lease_timeout, - _Message -) -> +handle_state_timeout(#{state := ?replaying} = GSM, renew_lease_timeout, _Message) -> handle_renew_lease_timeout(GSM); -handle_state_timeout( - GSM, - update_stream_state_timeout, - _Message -) -> +handle_state_timeout(GSM, update_stream_state_timeout, _Message) -> ?tp(debug, update_stream_state_timeout, #{}), handle_stream_progress(GSM, []). diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl index 912253205..877e59c2a 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl @@ -164,7 +164,7 @@ handle_event({call, From}, #register{register_fun = Fun}, ?leader_waiting_regist %%-------------------------------------------------------------------- %% repalying state handle_event(enter, _OldState, ?leader_active, #{topic := Topic} = _Data) -> - ?tp(warning, shared_sub_leader_enter_actve, #{topic => Topic}), + ?tp(debug, shared_sub_leader_enter_actve, #{topic => Topic}), {keep_state_and_data, [ {{timeout, #renew_streams{}}, 0, #renew_streams{}}, {{timeout, #renew_leases{}}, ?dq_config(leader_renew_lease_interval_ms), #renew_leases{}}, @@ -174,7 +174,7 @@ handle_event(enter, _OldState, ?leader_active, #{topic := Topic} = _Data) -> %% timers %% renew_streams timer handle_event({timeout, #renew_streams{}}, #renew_streams{}, ?leader_active, Data0) -> - % ?tp(warning, shared_sub_leader_timeout, #{timeout => renew_streams}), + ?tp(debug, shared_sub_leader_timeout, #{timeout => renew_streams}), Data1 = renew_streams(Data0), {keep_state, Data1, { @@ -184,7 +184,7 @@ handle_event({timeout, #renew_streams{}}, #renew_streams{}, ?leader_active, Data }}; %% renew_leases timer handle_event({timeout, #renew_leases{}}, #renew_leases{}, ?leader_active, Data0) -> - % ?tp(warning, shared_sub_leader_timeout, #{timeout => renew_leases}), + ?tp(debug, shared_sub_leader_timeout, #{timeout => renew_leases}), Data1 = renew_leases(Data0), {keep_state, Data1, {{timeout, #renew_leases{}}, ?dq_config(leader_renew_lease_interval_ms), #renew_leases{}}}; @@ -279,7 +279,7 @@ renew_streams( Data2 = Data1#{stream_states => NewStreamStates, rank_progress => RankProgress1}, Data3 = revoke_streams(Data2), Data4 = assign_streams(Data3), - ?SLOG(info, #{ + ?SLOG(debug, #{ msg => leader_renew_streams, topic_filter => TopicFilter, new_streams => length(NewStreamsWRanks) @@ -368,7 +368,7 @@ revoke_excess_streams_from_agent(Data0, Agent, DesiredCount) -> false -> AgentState0; true -> - ?tp(warning, shared_sub_leader_revoke_streams, #{ + ?tp(debug, shared_sub_leader_revoke_streams, #{ agent => Agent, agent_stream_count => length(Streams0), revoke_count => RevokeCount, @@ -421,7 +421,7 @@ assign_lacking_streams(Data0, Agent, DesiredCount) -> false -> Data0; true -> - ?tp(warning, shared_sub_leader_assign_streams, #{ + ?tp(debug, shared_sub_leader_assign_streams, #{ agent => Agent, agent_stream_count => length(Streams0), assign_count => AssignCount, @@ -449,7 +449,7 @@ select_streams_for_assign(Data0, _Agent, AssignCount) -> %% renew_leases - send lease confirmations to agents renew_leases(#{agents := AgentStates} = Data) -> - ?tp(warning, shared_sub_leader_renew_leases, #{agents => maps:keys(AgentStates)}), + ?tp(debug, shared_sub_leader_renew_leases, #{agents => maps:keys(AgentStates)}), ok = lists:foreach( fun({Agent, AgentState}) -> renew_lease(Data, Agent, AgentState) @@ -492,7 +492,7 @@ drop_timeout_agents(#{agents := Agents} = Data) -> (is_integer(NoReplayingDeadline) andalso NoReplayingDeadline < Now) of true -> - ?SLOG(info, #{ + ?SLOG(debug, #{ msg => leader_agent_timeout, now => Now, update_deadline => UpdateDeadline, @@ -516,14 +516,14 @@ connect_agent( Agent, AgentMetadata ) -> - ?SLOG(info, #{ + ?SLOG(debug, #{ msg => leader_agent_connected, agent => Agent, group_id => GroupId }), case Agents of #{Agent := AgentState} -> - ?tp(warning, shared_sub_leader_agent_already_connected, #{ + ?tp(debug, shared_sub_leader_agent_already_connected, #{ agent => Agent }), reconnect_agent(Data, Agent, AgentMetadata, AgentState); @@ -546,7 +546,7 @@ reconnect_agent( AgentMetadata, #{streams := OldStreams, revoked_streams := OldRevokedStreams} = _OldAgentState ) -> - ?tp(warning, shared_sub_leader_agent_reconnect, #{ + ?tp(debug, shared_sub_leader_agent_reconnect, #{ agent => Agent, agent_metadata => AgentMetadata, inherited_streams => OldStreams @@ -767,7 +767,7 @@ update_agent_stream_states(Data0, Agent, AgentStreamProgresses, VersionOld, Vers disconnect_agent(Data0, Agent, AgentStreamProgresses, Version) -> case get_agent_state(Data0, Agent) of #{version := Version} -> - ?tp(warning, shared_sub_leader_disconnect_agent, #{ + ?tp(debug, shared_sub_leader_disconnect_agent, #{ agent => Agent, version => Version }), @@ -794,7 +794,7 @@ agent_transition_to_waiting_updating( Streams, RevokedStreams ) -> - ?tp(warning, shared_sub_leader_agent_state_transition, #{ + ?tp(debug, shared_sub_leader_agent_state_transition, #{ agent => Agent, old_state => OldState, new_state => ?waiting_updating @@ -818,7 +818,7 @@ agent_transition_to_waiting_updating( agent_transition_to_waiting_replaying( #{group_id := GroupId} = _Data, Agent, #{state := OldState, version := Version} = AgentState0 ) -> - ?tp(warning, shared_sub_leader_agent_state_transition, #{ + ?tp(debug, shared_sub_leader_agent_state_transition, #{ agent => Agent, old_state => OldState, new_state => ?waiting_replaying @@ -833,7 +833,7 @@ agent_transition_to_waiting_replaying( agent_transition_to_initial_waiting_replaying( #{group_id := GroupId} = Data, Agent, AgentMetadata, InitialStreams ) -> - ?tp(warning, shared_sub_leader_agent_state_transition, #{ + ?tp(debug, shared_sub_leader_agent_state_transition, #{ agent => Agent, old_state => none, new_state => ?waiting_replaying @@ -856,7 +856,7 @@ agent_transition_to_initial_waiting_replaying( renew_no_replaying_deadline(AgentState). agent_transition_to_replaying(Agent, #{state := ?waiting_replaying} = AgentState) -> - ?tp(warning, shared_sub_leader_agent_state_transition, #{ + ?tp(debug, shared_sub_leader_agent_state_transition, #{ agent => Agent, old_state => ?waiting_replaying, new_state => ?replaying @@ -868,7 +868,7 @@ agent_transition_to_replaying(Agent, #{state := ?waiting_replaying} = AgentState }. agent_transition_to_updating(Agent, #{state := ?waiting_updating} = AgentState0) -> - ?tp(warning, shared_sub_leader_agent_state_transition, #{ + ?tp(debug, shared_sub_leader_agent_state_transition, #{ agent => Agent, old_state => ?waiting_updating, new_state => ?updating @@ -995,7 +995,7 @@ drop_agent(#{agents := Agents} = Data0, Agent) -> #{streams := Streams, revoked_streams := RevokedStreams} = AgentState, AllStreams = Streams ++ RevokedStreams, Data1 = unassign_streams(Data0, AllStreams), - ?tp(warning, shared_sub_leader_drop_agent, #{agent => Agent}), + ?tp(debug, shared_sub_leader_drop_agent, #{agent => Agent}), Data1#{agents => maps:remove(Agent, Agents)}. invalidate_agent(#{group_id := GroupId}, Agent) -> diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_rank_progress.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_rank_progress.erl index fa611463d..a07c15e4d 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_rank_progress.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_rank_progress.erl @@ -55,7 +55,7 @@ set_replayed({{RankX, RankY}, Stream}, State) -> State#{RankX => #{min_y => MinY, ys => Ys2}}; _ -> ?SLOG( - warning, + debug, #{ msg => leader_rank_progress_double_or_invalid_update, rank_x => RankX, diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_registry.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_registry.erl index eae212458..16b672e19 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_registry.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_registry.erl @@ -113,7 +113,7 @@ do_lookup_leader(Agent, AgentMetadata, ShareTopicFilter, State) -> Pid -> Pid end, - ?SLOG(info, #{ + ?SLOG(debug, #{ msg => lookup_leader, agent => Agent, share_topic_filter => ShareTopicFilter, diff --git a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl index 4f99a8455..a38045023 100644 --- a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl +++ b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl @@ -417,7 +417,7 @@ t_lease_reconnect(_Config) -> ?assertWaitEvent( {ok, _, _} = emqtt:subscribe(ConnShared, <<"$share/gr2/topic2/#">>, 1), - #{?snk_kind := find_leader_timeout}, + #{?snk_kind := group_sm_find_leader_timeout}, 5_000 ),