diff --git a/apps/emqx_stomp/src/emqx_stomp_protocol.erl b/apps/emqx_stomp/src/emqx_stomp_protocol.erl index eb628ce47..0dbfbb13b 100644 --- a/apps/emqx_stomp/src/emqx_stomp_protocol.erl +++ b/apps/emqx_stomp/src/emqx_stomp_protocol.erl @@ -75,6 +75,8 @@ default_user :: maybe(list()) }). +-define(DEFAULT_SUB_ACK, <<"auto">>). + -define(TIMER_TABLE, #{ incoming_timer => incoming, outgoing_timer => outgoing, @@ -287,12 +289,17 @@ received(#stomp_frame{command = <<"SUBSCRIBE">>, headers = Headers}, State = #pstate{subscriptions = Subs}) -> Id = header(<<"id">>, Headers), Topic = header(<<"destination">>, Headers), - Ack = header(<<"ack">>, Headers, <<"auto">>), + Ack = header(<<"ack">>, Headers, ?DEFAULT_SUB_ACK), case find_sub_by_id(Id, Subs) of - {Topic, #{sub_props := #{id := Id, ack := Ack}}} -> + {Topic, #{sub_props := #{id := Id}}} -> ?LOG(info, "Subscription has established: ~s", [Topic]), maybe_send_receipt(receipt_id(Headers), State); + {InuseTopic, #{sub_props := #{id := InuseId}}} -> + ?LOG(info, "Subscription id ~p inused by topic: ~s, " + "request topic: ~s", [InuseId, InuseTopic, Topic]), + send(error_frame(receipt_id(Headers), + ["Request sub-id ", Id, " inused "]), State); undefined -> case check_acl(subscribe, Topic, State) of allow -> @@ -466,8 +473,9 @@ timeout(_TRef, clean_trans, State = #pstate{transaction = Trans}) -> handle_info({subscribe, TopicFilters}, State) -> NState = lists:foldl( - fun({TopicFilter, SubOpts}, StateAcc) -> - do_subscribe(TopicFilter, SubOpts#{is_new => true}, StateAcc) + fun({TopicFilter, SubOpts}, StateAcc = #pstate{subscriptions = Subs}) -> + NSubOpts = enrich_sub_opts(SubOpts, Subs), + do_subscribe(TopicFilter, NSubOpts, StateAcc) end, State, parse_topic_filters(TopicFilters)), {ok, NState}; @@ -646,7 +654,7 @@ do_subscribe(TopicFilter, SubOpts, NSubOpts = SubOpts#{is_new => true}, _ = run_hooks('session.subscribed', [ClientInfo, TopicFilter, NSubOpts]), - + send_event_to_self(updated), State#pstate{subscriptions = maps:put(TopicFilter, SubOpts, Subs)}. do_unsubscribe(TopicFilter, SubOpts, @@ -654,7 +662,7 @@ do_unsubscribe(TopicFilter, SubOpts, ok = emqx_broker:unsubscribe(TopicFilter), _ = run_hooks('session.unsubscribe', [ClientInfo, TopicFilter, SubOpts]), - + send_event_to_self(updated), State#pstate{subscriptions = maps:remove(TopicFilter, Subs)}. find_sub_by_topic(Topic, Subs) -> @@ -677,6 +685,21 @@ is_acl_enabled(_) -> %% TODO: configs from somewhere true. +%% automaticly fill the next sub-id and ack if sub-id is absent +enrich_sub_opts(SubOpts0, Subs) -> + SubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts0), + SubProps = maps:get(sub_props, SubOpts, #{}), + SubOpts#{sub_props => + maps:merge(#{id => next_sub_id(Subs), + ack => ?DEFAULT_SUB_ACK}, SubProps)}. + +next_sub_id(Subs) -> + Ids = maps:fold(fun(_, SubOpts, Acc) -> + [binary_to_integer( + maps:get(id, maps:get(sub_props, SubOpts, #{}), <<"0">>)) | Acc] + end, [], Subs), + integer_to_binary(lists:max(Ids) + 1). + %%-------------------------------------------------------------------- %% helpers @@ -691,8 +714,7 @@ ensure_connected(State = #pstate{conninfo = ConnInfo, connected => true, connected_at => erlang:system_time(millisecond) }, - %% send connected event - self() ! {event, connected}, + send_event_to_self(connected), ok = run_hooks('client.connected', [ClientInfo, NConnInfo]), State#pstate{conninfo = NConnInfo, connected = true @@ -703,6 +725,9 @@ ensure_disconnected(Reason, State = #pstate{conninfo = ConnInfo, clientinfo = Cl ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]), State#pstate{conninfo = NConnInfo, connected = false}. +send_event_to_self(Name) -> + self() ! {event, Name}, ok. + run_hooks(Name, Args) -> emqx_hooks:run(Name, Args).