diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl index eb45ef014..0bdbff30a 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl @@ -55,10 +55,17 @@ -type scheduled_action_type() :: {?schedule_subscribe, emqx_types:subopts()} | ?schedule_unsubscribe. + +-type agent_stream_progress() :: #{ + stream := emqx_ds:stream(), + iterator := emqx_ds:iterator(), + use_finished := boolean() +}. + -type scheduled_action() :: #{ type := scheduled_action_type(), stream_keys_to_wait := [stream_key()], - progresses := [emqx_ds_shared_sub_proto:agent_stream_progress()] + progresses := [agent_stream_progress()] }. -type t() :: #{ @@ -269,7 +276,7 @@ schedule_unsubscribe( }, ?tp(warning, shared_subs_schedule_unsubscribe_new, #{ topic_filter => TopicFilter, - stream_keys => emqx_ds_shared_sub_proto:format_stream_keys(StreamKeys) + stream_keys => format_stream_keys(StreamKeys) }), SharedSubS0#{scheduled_actions := ScheduledActions1} end. @@ -430,7 +437,7 @@ run_scheduled_action( [] -> ?tp(warning, shared_subs_schedule_action_complete, #{ topic_filter => TopicFilter, - progresses => emqx_ds_shared_sub_proto:format_streams(Progresses1), + progresses => format_streams(Progresses1), type => Type }), %% Regular progress won't se unsubscribed streams, so we need to @@ -617,11 +624,36 @@ is_use_finished(_S, #srs{unsubscribed = Unsubscribed}) -> is_stream_fully_acked(S, SRS) -> emqx_persistent_session_ds_stream_scheduler:is_fully_acked(SRS, S). +%%-------------------------------------------------------------------- +%% Formatters +%%-------------------------------------------------------------------- + format_schedule_action(#{ type := Type, progresses := Progresses, stream_keys_to_wait := StreamKeysToWait }) -> #{ type => Type, - progresses => emqx_ds_shared_sub_proto:format_streams(Progresses), - stream_keys_to_wait => emqx_ds_shared_sub_proto:format_stream_keys(StreamKeysToWait) + progresses => format_streams(Progresses), + stream_keys_to_wait => format_stream_keys(StreamKeysToWait) }. + +format_streams(Streams) -> + lists:map( + fun format_stream/1, + Streams + ). + +format_stream(#{stream := Stream, iterator := Iterator} = Value) -> + Value#{stream => format_opaque(Stream), iterator => format_opaque(Iterator)}. + +format_stream_key({SubId, Stream}) -> + {SubId, format_opaque(Stream)}. + +format_stream_keys(StreamKeys) -> + lists:map( + fun format_stream_key/1, + StreamKeys + ). + +format_opaque(Opaque) -> + erlang:phash2(Opaque).