fix(stomp): support pub/sub operations

This commit is contained in:
JianBo He 2021-11-02 13:24:21 +08:00
parent d2924e82ab
commit 7734d6969c
2 changed files with 88 additions and 46 deletions

View File

@ -349,8 +349,7 @@ handle_info({deliver, _Topic, Msg}, State = #state{pstate = PState}) ->
end});
handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]),
noreply(State).
with_proto(handle_info, [Info], State).
terminate(Reason, #state{transport = Transport,
socket = Sock,
@ -374,6 +373,8 @@ code_change(_OldVsn, State, _Extra) ->
with_proto(Fun, Args, State = #state{pstate = PState}) ->
case erlang:apply(emqx_stomp_protocol, Fun, Args ++ [PState]) of
ok ->
noreply(State);
{ok, NPState} ->
noreply(State#state{pstate = NPState});
{F, Reason, NPState} when F == stop;

View File

@ -40,6 +40,9 @@
, timeout/3
]).
-export([ handle_info/2
]).
%% for trans callback
-export([ handle_recv_send_frame/2
, handle_recv_ack_frame/2
@ -60,7 +63,7 @@
%% Transaction
transaction :: #{binary() => list()},
%% Subscriptions
subscriptions = [],
subscriptions = #{},
%% Send function
sendfun :: function(),
%% Heartbeat function
@ -109,7 +112,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port},
sockport => SockPort,
clientid => undefined,
username => undefined,
mountpoint => undefined,
mountpoint => undefined, %% XXX: not supported now
is_bridge => false,
is_superuser => false
},
@ -183,10 +186,7 @@ info(will_msg, _) ->
undefined.
session_info(#pstate{conninfo = ConnInfo, subscriptions = Subs}) ->
NSubs = lists:foldl(fun({_Id, Topic, _Ack}, Acc) ->
Acc#{Topic => ?DEFAULT_SUBOPTS}
end, #{}, Subs),
#{subscriptions => NSubs,
#{subscriptions => Subs,
upgrade_qos => false,
retry_interval => 0,
await_rel_timeout => 0,
@ -195,7 +195,7 @@ session_info(#pstate{conninfo = ConnInfo, subscriptions = Subs}) ->
-spec stats(pstate()) -> emqx_types:stats().
stats(#pstate{subscriptions = Subs}) ->
[{subscriptions_cnt, length(Subs)},
[{subscriptions_cnt, maps:size(Subs)},
{subscriptions_max, 0},
{inflight_cnt, 0},
{inflight_max, 0},
@ -284,29 +284,23 @@ received(#stomp_frame{command = <<"SUBSCRIBE">>, headers = Headers},
Topic = header(<<"destination">>, Headers),
Ack = header(<<"ack">>, Headers, <<"auto">>),
case lists:keyfind(Id, 1, Subs) of
{Id, Topic, Ack} ->
case find_sub_by_id(Id, Subs) of
{Topic, #{sub_props := #{id := Id, ack := Ack}}} ->
?LOG(info, "Subscription has established: ~s", [Topic]),
maybe_send_receipt(receipt_id(Headers), State);
false ->
undefined ->
case check_acl(subscribe, Topic, State) of
allow ->
ClientInfo = State#pstate.clientinfo,
ClientId = maps:get(clientid, ClientInfo),
%% XXX: We don't parse the request topic name or filter
%% which meaning stomp does not support shared
%% subscription
[{TopicFilter, SubOpts}] = parse_topic_filters(
[{Topic, ?DEFAULT_SUBOPTS}
]),
NSubOpts = SubOpts#{sub_props => #{id => Id, ack => Ack}},
_ = run_hooks('client.subscribe',
[ClientInfo, _SubProps = #{}],
[{Topic, _TopicOpts = #{}}]),
emqx_broker:subscribe(Topic, ClientId),
SubOpts = ?DEFAULT_SUBOPTS#{is_new => true},
_ = run_hooks('session.subscribed',
[ClientInfo, Topic, SubOpts]),
NState = put_subs({Id, Topic, Ack}, State),
[{TopicFilter, NSubOpts}]),
NState = do_subscribe(TopicFilter, NSubOpts, State),
maybe_send_receipt(receipt_id(Headers), NState)
end
end;
@ -314,22 +308,17 @@ received(#stomp_frame{command = <<"SUBSCRIBE">>, headers = Headers},
received(#stomp_frame{command = <<"UNSUBSCRIBE">>, headers = Headers},
State = #pstate{subscriptions = Subs, clientinfo = ClientInfo}) ->
Id = header(<<"id">>, Headers),
{ok, State1} = case lists:keyfind(Id, 1, Subs) of
{Id, Topic, _Ack} ->
{ok, NState} = case find_sub_by_id(Id, Subs) of
{Topic, #{sub_props := #{id := Id}}} ->
_ = run_hooks('client.unsubscribe',
[ClientInfo, #{}],
[{Topic, #{}}]),
ok = emqx_broker:unsubscribe(Topic),
_ = run_hooks('session.unsubscribe',
[ClientInfo, Topic, ?DEFAULT_SUBOPTS]),
{ok, remove_subs(Id, State)};
false ->
State1 = do_unsubscribe(Topic, ?DEFAULT_SUBOPTS, State),
{ok, State1};
undefined ->
{ok, State}
end,
maybe_send_receipt(receipt_id(Headers), State1);
maybe_send_receipt(receipt_id(Headers), NState);
%% ACK
%% id:12345
@ -407,8 +396,8 @@ received(#stomp_frame{command = <<"DISCONNECT">>, headers = Headers}, State) ->
send(Msg = #message{topic = Topic, headers = Headers, payload = Payload},
State = #pstate{subscriptions = Subs}) ->
case lists:keyfind(Topic, 2, Subs) of
{Id, Topic, Ack} ->
case find_sub_by_topic(Topic, Subs) of
{Topic, #{sub_props := #{id := Id, ack := Ack}}} ->
Headers0 = [{<<"subscription">>, Id},
{<<"message-id">>, next_msgid()},
{<<"destination">>, Topic},
@ -423,7 +412,7 @@ send(Msg = #message{topic = Topic, headers = Headers, payload = Payload},
headers = Headers1 ++ maps:get(stomp_headers, Headers, []),
body = Payload},
send(Frame, State);
false ->
undefined ->
?LOG(error, "Stomp dropped: ~p", [Msg]),
{error, dropped, State}
end;
@ -466,6 +455,27 @@ timeout(_TRef, clean_trans, State = #pstate{transaction = Trans}) ->
NTrans = maps:filter(fun(_, {Ts, _}) -> Ts + ?TRANS_TIMEOUT < Now end, Trans),
{ok, ensure_clean_trans_timer(State#pstate{transaction = NTrans})}.
-spec(handle_info(Info :: term(), pstate())
-> ok | {ok, pstate()} | {shutdown, Reason :: term(), pstate()}).
handle_info({subscribe, TopicFilters}, State) ->
NState = lists:foldl(
fun({TopicFilter, SubOpts}, StateAcc) ->
do_subscribe(TopicFilter, SubOpts#{is_new => true}, StateAcc)
end, State, parse_topic_filters(TopicFilters)),
{ok, NState};
handle_info({unsubscribe, TopicFilters}, State) ->
NState = lists:foldl(fun({TopicFilter, SubOpts}, StateAcc) ->
do_unsubscribe(TopicFilter, SubOpts, StateAcc)
end, State, parse_topic_filters(TopicFilters)),
{ok, NState};
handle_info(Info, State) ->
?LOG(warning, "Unexpected info ~p", [Info]),
{ok, State}.
negotiate_version(undefined) ->
{ok, <<"1.0">>};
negotiate_version(Accepts) ->
@ -612,7 +622,10 @@ start_heartbeart_timer(Heartbeats, State) ->
State#pstate{heart_beats = emqx_stomp_heartbeat:init(Heartbeats)}).
%%--------------------------------------------------------------------
%% ...
%% pub & sub helpers
parse_topic_filters(TopicFilters) ->
lists:map(fun emqx_topic:parse/1, TopicFilters).
check_acl(PubSub, Topic, State = #pstate{clientinfo = ClientInfo}) ->
case is_acl_enabled(State) andalso
@ -621,19 +634,47 @@ check_acl(PubSub, Topic, State = #pstate{clientinfo = ClientInfo}) ->
Res -> Res
end.
put_subs({Id, Topic, Ack}, State = #pstate{subscriptions = Subs}) ->
State#pstate{subscriptions = lists:keystore(Id, 1, Subs, {Id, Topic, Ack})}.
do_subscribe(TopicFilter, SubOpts,
State = #pstate{clientinfo = ClientInfo, subscriptions = Subs}) ->
ClientId = maps:get(clientid, ClientInfo),
_ = emqx_broker:subscribe(TopicFilter, ClientId),
NSubOpts = SubOpts#{is_new => true},
_ = run_hooks('session.subscribed',
[ClientInfo, TopicFilter, NSubOpts]),
remove_subs(Id, State = #pstate{subscriptions = Subs}) ->
State#pstate{subscriptions = lists:keydelete(Id, 1, Subs)}.
State#pstate{subscriptions = maps:put(TopicFilter, SubOpts, Subs)}.
%%--------------------------------------------------------------------
%% ...
do_unsubscribe(TopicFilter, SubOpts,
State = #pstate{clientinfo = ClientInfo, subscriptions = Subs}) ->
ok = emqx_broker:unsubscribe(TopicFilter),
_ = run_hooks('session.unsubscribe',
[ClientInfo, TopicFilter, SubOpts]),
State#pstate{subscriptions = maps:remove(TopicFilter, Subs)}.
find_sub_by_topic(Topic, Subs) ->
case maps:get(Topic, Subs, undefined) of
undefined -> undefined;
SubOpts -> {Topic, SubOpts}
end.
find_sub_by_id(Id, Subs) ->
Found = maps:filter(fun(_, SubOpts) ->
%% FIXME: datatype??
maps:get(id, maps:get(sub_props, SubOpts, #{}), -1) == Id
end, Subs),
case maps:to_list(Found) of
[] -> undefined;
[Sub|_] -> Sub
end.
is_acl_enabled(_) ->
%% TODO: configs from somewhere
true.
%%--------------------------------------------------------------------
%% helpers
default_user(#pstate{default_user = DefaultUser}) ->
DefaultUser.
allow_anonymous(#pstate{allow_anonymous = AllowAnonymous}) ->