feat(gw-stomp): call unsubscribe hook

This commit is contained in:
JianBo He 2021-07-30 11:29:19 +08:00
parent f87bef9ffb
commit ec72513cf8
2 changed files with 29 additions and 12 deletions

View File

@ -38,6 +38,7 @@ Gateway v0.2: "Integration & Friendly Management"
- The Concept Review - The Concept Review
Gateway v0.3: "Fault tolerance and high availability" Gateway v0.3: "Fault tolerance and high availability"
- A common session modoule for message delivery policy
- The restart mechanism for gateway-instance - The restart mechanism for gateway-instance
- Consistency of cluster state - Consistency of cluster state
- Configuration hot update - Configuration hot update

View File

@ -59,6 +59,8 @@
conninfo :: emqx_types:conninfo(), conninfo :: emqx_types:conninfo(),
%% Stomp Client Info %% Stomp Client Info
clientinfo :: emqx_types:clientinfo(), clientinfo :: emqx_types:clientinfo(),
%% Session
session :: undefined | map(),
%% ClientInfo override specs %% ClientInfo override specs
clientinfo_override :: map(), clientinfo_override :: map(),
%% Connection Channel %% Connection Channel
@ -303,12 +305,12 @@ process_connect(Channel = #channel{
ConnInfo, ConnInfo,
SessFun SessFun
) of ) of
{ok, _Sess} -> %% The stomp protocol doesn't have session {ok, #{session := Session}} ->
#{proto_ver := Version} = ConnInfo, #{proto_ver := Version} = ConnInfo,
#{heartbeat := Heartbeat} = ClientInfo, #{heartbeat := Heartbeat} = ClientInfo,
Headers = [{<<"version">>, Version}, Headers = [{<<"version">>, Version},
{<<"heart-beat">>, reverse_heartbeats(Heartbeat)}], {<<"heart-beat">>, reverse_heartbeats(Heartbeat)}],
handle_out(connected, Headers, Channel); handle_out(connected, Headers, Channel#channel{session = Session});
{error, Reason} -> {error, Reason} ->
?LOG(error, "Failed to open session du to ~p", [Reason]), ?LOG(error, "Failed to open session du to ~p", [Reason]),
Headers = [{<<"version">>, <<"1.0,1.1,1.2">>}, Headers = [{<<"version">>, <<"1.0,1.1,1.2">>},
@ -406,15 +408,26 @@ handle_in(?PACKET(?CMD_SUBSCRIBE, Headers),
end; end;
handle_in(?PACKET(?CMD_UNSUBSCRIBE, Headers), handle_in(?PACKET(?CMD_UNSUBSCRIBE, Headers),
Channel = #channel{subscriptions = Subs}) -> Channel = #channel{
ctx = Ctx,
clientinfo = ClientInfo
= #{mountpoint := Mountpoint},
subscriptions = Subs}) ->
SubId = header(<<"id">>, Headers), SubId = header(<<"id">>, Headers),
{ok, NChannel} = case lists:keyfind(SubId, 1, Subs) of {ok, NChannel} =
{SubId, Topic, _Ack} -> case lists:keyfind(SubId, 1, Subs) of
ok = emqx_broker:unsubscribe(Topic), {SubId, MountedTopic, _Ack} ->
{ok, Channel#channel{subscriptions = lists:keydelete(SubId, 1, Subs)}}; Topic = emqx_mountpoint:unmount(Mountpoint, MountedTopic),
false -> %% XXX: eval the return topics?
{ok, Channel} _ = run_hooks(Ctx, 'client.unsubscribe',
end, [ClientInfo, #{}], [{Topic, #{}}]),
ok = emqx_broker:unsubscribe(MountedTopic),
_ = run_hooks(Ctx, 'session.unsubscribe',
[ClientInfo, MountedTopic, #{}]),
{ok, Channel#channel{subscriptions = lists:keydelete(SubId, 1, Subs)}};
false ->
{ok, Channel}
end,
handle_out(receipt, receipt_id(Headers), NChannel); handle_out(receipt, receipt_id(Headers), NChannel);
%% XXX: How to ack a frame ??? %% XXX: How to ack a frame ???
@ -827,8 +840,11 @@ handle_timeout(_TRef, clean_trans, Channel = #channel{transaction = Trans}) ->
%% Terminate %% Terminate
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
terminate(_Reason, _Channel) -> terminate(Reason, #channel{
ok. ctx = Ctx,
session = Session,
clientinfo = ClientInfo}) ->
run_hooks(Ctx, 'session.terminated', [ClientInfo, Reason, Session]).
reply(Reply, Channel) -> reply(Reply, Channel) ->
{reply, Reply, Channel}. {reply, Reply, Channel}.