From ec72513cf8f92c896b50d19c57eb9539aa708502 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 30 Jul 2021 11:29:19 +0800 Subject: [PATCH] feat(gw-stomp): call unsubscribe hook --- apps/emqx_gateway/README.md | 1 + .../src/stomp/emqx_stomp_channel.erl | 40 +++++++++++++------ 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/apps/emqx_gateway/README.md b/apps/emqx_gateway/README.md index f8b825eeb..4d39190c8 100644 --- a/apps/emqx_gateway/README.md +++ b/apps/emqx_gateway/README.md @@ -38,6 +38,7 @@ Gateway v0.2: "Integration & Friendly Management" - The Concept Review Gateway v0.3: "Fault tolerance and high availability" + - A common session modoule for message delivery policy - The restart mechanism for gateway-instance - Consistency of cluster state - Configuration hot update diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl index 6c1d678e1..ef6e21e66 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl @@ -59,6 +59,8 @@ conninfo :: emqx_types:conninfo(), %% Stomp Client Info clientinfo :: emqx_types:clientinfo(), + %% Session + session :: undefined | map(), %% ClientInfo override specs clientinfo_override :: map(), %% Connection Channel @@ -303,12 +305,12 @@ process_connect(Channel = #channel{ ConnInfo, SessFun ) of - {ok, _Sess} -> %% The stomp protocol doesn't have session + {ok, #{session := Session}} -> #{proto_ver := Version} = ConnInfo, #{heartbeat := Heartbeat} = ClientInfo, Headers = [{<<"version">>, Version}, {<<"heart-beat">>, reverse_heartbeats(Heartbeat)}], - handle_out(connected, Headers, Channel); + handle_out(connected, Headers, Channel#channel{session = Session}); {error, Reason} -> ?LOG(error, "Failed to open session du to ~p", [Reason]), Headers = [{<<"version">>, <<"1.0,1.1,1.2">>}, @@ -406,15 +408,26 @@ handle_in(?PACKET(?CMD_SUBSCRIBE, Headers), end; handle_in(?PACKET(?CMD_UNSUBSCRIBE, Headers), - Channel = #channel{subscriptions = Subs}) -> + Channel = #channel{ + ctx = Ctx, + clientinfo = ClientInfo + = #{mountpoint := Mountpoint}, + subscriptions = Subs}) -> SubId = header(<<"id">>, Headers), - {ok, NChannel} = case lists:keyfind(SubId, 1, Subs) of - {SubId, Topic, _Ack} -> - ok = emqx_broker:unsubscribe(Topic), - {ok, Channel#channel{subscriptions = lists:keydelete(SubId, 1, Subs)}}; - false -> - {ok, Channel} - end, + {ok, NChannel} = + case lists:keyfind(SubId, 1, Subs) of + {SubId, MountedTopic, _Ack} -> + Topic = emqx_mountpoint:unmount(Mountpoint, MountedTopic), + %% XXX: eval the return topics? + _ = run_hooks(Ctx, 'client.unsubscribe', + [ClientInfo, #{}], [{Topic, #{}}]), + ok = emqx_broker:unsubscribe(MountedTopic), + _ = run_hooks(Ctx, 'session.unsubscribe', + [ClientInfo, MountedTopic, #{}]), + {ok, Channel#channel{subscriptions = lists:keydelete(SubId, 1, Subs)}}; + false -> + {ok, Channel} + end, handle_out(receipt, receipt_id(Headers), NChannel); %% XXX: How to ack a frame ??? @@ -827,8 +840,11 @@ handle_timeout(_TRef, clean_trans, Channel = #channel{transaction = Trans}) -> %% Terminate %%-------------------------------------------------------------------- -terminate(_Reason, _Channel) -> - ok. +terminate(Reason, #channel{ + ctx = Ctx, + session = Session, + clientinfo = ClientInfo}) -> + run_hooks(Ctx, 'session.terminated', [ClientInfo, Reason, Session]). reply(Reply, Channel) -> {reply, Reply, Channel}.