fix(stomp): enrich sub-opts if sub-id/ack absent

This commit is contained in:
JianBo He 2021-11-02 21:36:03 +08:00
parent 14515e680e
commit 0a7f04caa3
1 changed files with 33 additions and 8 deletions

View File

@ -75,6 +75,8 @@
default_user :: maybe(list())
}).
-define(DEFAULT_SUB_ACK, <<"auto">>).
-define(TIMER_TABLE, #{
incoming_timer => incoming,
outgoing_timer => outgoing,
@ -287,12 +289,17 @@ received(#stomp_frame{command = <<"SUBSCRIBE">>, headers = Headers},
State = #pstate{subscriptions = Subs}) ->
Id = header(<<"id">>, Headers),
Topic = header(<<"destination">>, Headers),
Ack = header(<<"ack">>, Headers, <<"auto">>),
Ack = header(<<"ack">>, Headers, ?DEFAULT_SUB_ACK),
case find_sub_by_id(Id, Subs) of
{Topic, #{sub_props := #{id := Id, ack := Ack}}} ->
{Topic, #{sub_props := #{id := Id}}} ->
?LOG(info, "Subscription has established: ~s", [Topic]),
maybe_send_receipt(receipt_id(Headers), State);
{InuseTopic, #{sub_props := #{id := InuseId}}} ->
?LOG(info, "Subscription id ~p inused by topic: ~s, "
"request topic: ~s", [InuseId, InuseTopic, Topic]),
send(error_frame(receipt_id(Headers),
["Request sub-id ", Id, " inused "]), State);
undefined ->
case check_acl(subscribe, Topic, State) of
allow ->
@ -466,8 +473,9 @@ timeout(_TRef, clean_trans, State = #pstate{transaction = Trans}) ->
handle_info({subscribe, TopicFilters}, State) ->
NState = lists:foldl(
fun({TopicFilter, SubOpts}, StateAcc) ->
do_subscribe(TopicFilter, SubOpts#{is_new => true}, StateAcc)
fun({TopicFilter, SubOpts}, StateAcc = #pstate{subscriptions = Subs}) ->
NSubOpts = enrich_sub_opts(SubOpts, Subs),
do_subscribe(TopicFilter, NSubOpts, StateAcc)
end, State, parse_topic_filters(TopicFilters)),
{ok, NState};
@ -646,7 +654,7 @@ do_subscribe(TopicFilter, SubOpts,
NSubOpts = SubOpts#{is_new => true},
_ = run_hooks('session.subscribed',
[ClientInfo, TopicFilter, NSubOpts]),
send_event_to_self(updated),
State#pstate{subscriptions = maps:put(TopicFilter, SubOpts, Subs)}.
do_unsubscribe(TopicFilter, SubOpts,
@ -654,7 +662,7 @@ do_unsubscribe(TopicFilter, SubOpts,
ok = emqx_broker:unsubscribe(TopicFilter),
_ = run_hooks('session.unsubscribe',
[ClientInfo, TopicFilter, SubOpts]),
send_event_to_self(updated),
State#pstate{subscriptions = maps:remove(TopicFilter, Subs)}.
find_sub_by_topic(Topic, Subs) ->
@ -677,6 +685,21 @@ is_acl_enabled(_) ->
%% TODO: configs from somewhere
true.
%% automaticly fill the next sub-id and ack if sub-id is absent
enrich_sub_opts(SubOpts0, Subs) ->
SubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts0),
SubProps = maps:get(sub_props, SubOpts, #{}),
SubOpts#{sub_props =>
maps:merge(#{id => next_sub_id(Subs),
ack => ?DEFAULT_SUB_ACK}, SubProps)}.
next_sub_id(Subs) ->
Ids = maps:fold(fun(_, SubOpts, Acc) ->
[binary_to_integer(
maps:get(id, maps:get(sub_props, SubOpts, #{}), <<"0">>)) | Acc]
end, [], Subs),
integer_to_binary(lists:max(Ids) + 1).
%%--------------------------------------------------------------------
%% helpers
@ -691,8 +714,7 @@ ensure_connected(State = #pstate{conninfo = ConnInfo,
connected => true,
connected_at => erlang:system_time(millisecond)
},
%% send connected event
self() ! {event, connected},
send_event_to_self(connected),
ok = run_hooks('client.connected', [ClientInfo, NConnInfo]),
State#pstate{conninfo = NConnInfo,
connected = true
@ -703,6 +725,9 @@ ensure_disconnected(Reason, State = #pstate{conninfo = ConnInfo, clientinfo = Cl
ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]),
State#pstate{conninfo = NConnInfo, connected = false}.
send_event_to_self(Name) ->
self() ! {event, Name}, ok.
run_hooks(Name, Args) ->
emqx_hooks:run(Name, Args).