feat(queue): do not use ee app from emqx app

This commit is contained in:
Ilya Averyanov 2024-07-04 21:33:22 +03:00
parent 649cf88042
commit b74189570d
1 changed files with 37 additions and 5 deletions

View File

@ -55,10 +55,17 @@
-type scheduled_action_type() ::
{?schedule_subscribe, emqx_types:subopts()} | ?schedule_unsubscribe.
-type agent_stream_progress() :: #{
stream := emqx_ds:stream(),
iterator := emqx_ds:iterator(),
use_finished := boolean()
}.
-type scheduled_action() :: #{
type := scheduled_action_type(),
stream_keys_to_wait := [stream_key()],
progresses := [emqx_ds_shared_sub_proto:agent_stream_progress()]
progresses := [agent_stream_progress()]
}.
-type t() :: #{
@ -269,7 +276,7 @@ schedule_unsubscribe(
},
?tp(warning, shared_subs_schedule_unsubscribe_new, #{
topic_filter => TopicFilter,
stream_keys => emqx_ds_shared_sub_proto:format_stream_keys(StreamKeys)
stream_keys => format_stream_keys(StreamKeys)
}),
SharedSubS0#{scheduled_actions := ScheduledActions1}
end.
@ -430,7 +437,7 @@ run_scheduled_action(
[] ->
?tp(warning, shared_subs_schedule_action_complete, #{
topic_filter => TopicFilter,
progresses => emqx_ds_shared_sub_proto:format_streams(Progresses1),
progresses => format_streams(Progresses1),
type => Type
}),
%% Regular progress won't se unsubscribed streams, so we need to
@ -617,11 +624,36 @@ is_use_finished(_S, #srs{unsubscribed = Unsubscribed}) ->
is_stream_fully_acked(S, SRS) ->
emqx_persistent_session_ds_stream_scheduler:is_fully_acked(SRS, S).
%%--------------------------------------------------------------------
%% Formatters
%%--------------------------------------------------------------------
format_schedule_action(#{
type := Type, progresses := Progresses, stream_keys_to_wait := StreamKeysToWait
}) ->
#{
type => Type,
progresses => emqx_ds_shared_sub_proto:format_streams(Progresses),
stream_keys_to_wait => emqx_ds_shared_sub_proto:format_stream_keys(StreamKeysToWait)
progresses => format_streams(Progresses),
stream_keys_to_wait => format_stream_keys(StreamKeysToWait)
}.
format_streams(Streams) ->
lists:map(
fun format_stream/1,
Streams
).
format_stream(#{stream := Stream, iterator := Iterator} = Value) ->
Value#{stream => format_opaque(Stream), iterator => format_opaque(Iterator)}.
format_stream_key({SubId, Stream}) ->
{SubId, format_opaque(Stream)}.
format_stream_keys(StreamKeys) ->
lists:map(
fun format_stream_key/1,
StreamKeys
).
format_opaque(Opaque) ->
erlang:phash2(Opaque).