From c42c1e698a88fddc6377b549664a3dc1edfa5dea Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 3 Sep 2021 14:56:00 +0800 Subject: [PATCH] chore(gw): add From param for _channel:handle_call/3 --- .../src/bhvrs/emqx_gateway_channel.erl | 8 +++- .../src/bhvrs/emqx_gateway_conn.erl | 14 +++++-- .../src/coap/emqx_coap_channel.erl | 4 +- .../src/exproto/emqx_exproto_channel.erl | 31 +++++++-------- .../src/lwm2m/emqx_lwm2m_channel.erl | 4 +- .../src/mqttsn/emqx_sn_channel.erl | 33 ++++++++-------- .../src/stomp/emqx_stomp_channel.erl | 38 +++++++++---------- 7 files changed, 72 insertions(+), 60 deletions(-) diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_channel.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_channel.erl index 06efe4fd0..c4fd114e4 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_channel.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_channel.erl @@ -44,6 +44,8 @@ -type conn_state() :: idle | connecting | connected | disconnected | atom(). +-type gen_server_from() :: {pid(), Tag :: term()}. + -type reply() :: {outgoing, emqx_gateway_frame:packet()} | {outgoing, [emqx_gateway_frame:packet()]} | {event, conn_state() | updated} @@ -71,11 +73,13 @@ | {shutdown, Reason :: any(), channel()}. %% @doc Handle the custom gen_server:call/2 for its connection process --callback handle_call(Req :: any(), channel()) +-callback handle_call(Req :: any(), From :: gen_server_from(), channel()) -> {reply, Reply :: any(), channel()} %% Reply to caller and trigger an event(s) | {reply, Reply :: any(), - EventOrEvents:: tuple() | list(tuple()), channel()} + EventOrEvents :: tuple() | list(tuple()), channel()} + | {noreply, channel()} + | {noreply, EventOrEvents :: tuple() | list(tuple()), channel()} | {shutdown, Reason :: any(), Reply :: any(), channel()} %% Shutdown the process, reply to caller and write a packet to client | {shutdown, Reason :: any(), Reply :: any(), diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl index 528f8dfd8..51bcbd358 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl @@ -394,6 +394,10 @@ append_msg(Q, Msg) -> handle_msg({'$gen_call', From, Req}, State) -> case handle_call(From, Req, State) of + {noreply, NState} -> + {ok, NState}; + {noreply, Msgs, NState} -> + {ok, next_msgs(Msgs), NState}; {reply, Reply, NState} -> gen_server:reply(From, Reply), {ok, NState}; @@ -545,10 +549,14 @@ handle_call(_From, info, State) -> handle_call(_From, stats, State) -> {reply, stats(State), State}; -handle_call(_From, Req, State = #state{ +handle_call(From, Req, State = #state{ chann_mod = ChannMod, channel = Channel}) -> - case ChannMod:handle_call(Req, Channel) of + case ChannMod:handle_call(Req, From, Channel) of + {noreply, NChannel} -> + {noreply, State#state{channel = NChannel}}; + {noreply, Msgs, NChannel} -> + {noreply, Msgs, State#state{channel = NChannel}}; {reply, Reply, NChannel} -> {reply, Reply, State#state{channel = NChannel}}; {reply, Reply, Msgs, NChannel} -> @@ -559,8 +567,6 @@ handle_call(_From, Req, State = #state{ NState = State#state{channel = NChannel}, ok = handle_outgoing(Packet, NState), shutdown(Reason, Reply, NState) - - end. %%-------------------------------------------------------------------- diff --git a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl index 24f06549b..6e554b9ef 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl @@ -34,7 +34,7 @@ , terminate/2 ]). --export([ handle_call/2 +-export([ handle_call/3 , handle_cast/2 , handle_info/2 ]). @@ -165,7 +165,7 @@ handle_timeout(_, _, Channel) -> %%-------------------------------------------------------------------- %% Handle call %%-------------------------------------------------------------------- -handle_call(Req, Channel) -> +handle_call(Req, _From, Channel) -> ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, Channel}. diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl index ace9a7be5..3de231958 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl @@ -31,7 +31,7 @@ , handle_in/2 , handle_deliver/2 , handle_timeout/3 - , handle_call/2 + , handle_call/3 , handle_cast/2 , handle_info/2 , terminate/2 @@ -243,23 +243,24 @@ handle_timeout(_TRef, Msg, Channel) -> ?WARN("Unexpected timeout: ~p", [Msg]), {ok, Channel}. --spec handle_call(any(), channel()) +-spec handle_call(Req :: any(), From :: any(), channel()) -> {reply, Reply :: term(), channel()} | {reply, Reply :: term(), replies(), channel()} | {shutdown, Reason :: term(), Reply :: term(), channel()}. -handle_call({send, Data}, Channel) -> +handle_call({send, Data}, _From, Channel) -> {reply, ok, [{outgoing, Data}], Channel}; -handle_call(close, Channel = #channel{conn_state = connected}) -> +handle_call(close, _From, Channel = #channel{conn_state = connected}) -> {reply, ok, [{event, disconnected}, {close, normal}], Channel}; -handle_call(close, Channel) -> +handle_call(close, _From, Channel) -> {reply, ok, [{close, normal}], Channel}; -handle_call({auth, ClientInfo, _Password}, Channel = #channel{conn_state = connected}) -> +handle_call({auth, ClientInfo, _Password}, _From, + Channel = #channel{conn_state = connected}) -> ?LOG(warning, "Duplicated authorized command, dropped ~p", [ClientInfo]), {reply, {error, ?RESP_PERMISSION_DENY, <<"Duplicated authenticate command">>}, Channel}; -handle_call({auth, ClientInfo0, Password}, +handle_call({auth, ClientInfo0, Password}, _From, Channel = #channel{ ctx = Ctx, conninfo = ConnInfo, @@ -300,7 +301,7 @@ handle_call({auth, ClientInfo0, Password}, {reply, {error, ?RESP_PERMISSION_DENY, Reason}, Channel} end; -handle_call({start_timer, keepalive, Interval}, +handle_call({start_timer, keepalive, Interval}, _From, Channel = #channel{ conninfo = ConnInfo, clientinfo = ClientInfo @@ -310,7 +311,7 @@ handle_call({start_timer, keepalive, Interval}, NChannel = Channel#channel{conninfo = NConnInfo, clientinfo = NClientInfo}, {reply, ok, ensure_keepalive(NChannel)}; -handle_call({subscribe_from_client, TopicFilter, Qos}, +handle_call({subscribe_from_client, TopicFilter, Qos}, _From, Channel = #channel{ ctx = Ctx, conn_state = connected, @@ -323,20 +324,20 @@ handle_call({subscribe_from_client, TopicFilter, Qos}, {reply, ok, NChannel} end; -handle_call({subscribe, Topic, SubOpts}, Channel) -> +handle_call({subscribe, Topic, SubOpts}, _From, Channel) -> {ok, NChannel} = do_subscribe([{Topic, SubOpts}], Channel), {reply, ok, NChannel}; -handle_call({unsubscribe_from_client, TopicFilter}, +handle_call({unsubscribe_from_client, TopicFilter}, _From, Channel = #channel{conn_state = connected}) -> {ok, NChannel} = do_unsubscribe([{TopicFilter, #{}}], Channel), {reply, ok, NChannel}; -handle_call({unsubscribe, Topic}, Channel) -> +handle_call({unsubscribe, Topic}, _From, Channel) -> {ok, NChannel} = do_unsubscribe([Topic], Channel), {reply, ok, NChannel}; -handle_call({publish, Topic, Qos, Payload}, +handle_call({publish, Topic, Qos, Payload}, _From, Channel = #channel{ ctx = Ctx, conn_state = connected, @@ -353,10 +354,10 @@ handle_call({publish, Topic, Qos, Payload}, {reply, ok, Channel} end; -handle_call(kick, Channel) -> +handle_call(kick, _From, Channel) -> {shutdown, kicked, ok, Channel}; -handle_call(Req, Channel) -> +handle_call(Req, _From, Channel) -> ?LOG(warning, "Unexpected call: ~p", [Req]), {reply, {error, unexpected_call}, Channel}. diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl index 80078407b..b67032313 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl @@ -35,7 +35,7 @@ , terminate/2 ]). --export([ handle_call/2 +-export([ handle_call/3 , handle_cast/2 , handle_info/2 ]). @@ -152,7 +152,7 @@ handle_timeout(_, _, Channel) -> %%-------------------------------------------------------------------- %% Handle call %%-------------------------------------------------------------------- -handle_call(Req, Channel) -> +handle_call(Req, _From, Channel) -> ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, Channel}. diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl index 2834c27f6..d9576c253 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl @@ -38,7 +38,7 @@ , set_conn_state/2 ]). --export([ handle_call/2 +-export([ handle_call/3 , handle_cast/2 , handle_info/2 ]). @@ -1113,12 +1113,13 @@ message_to_packet(MsgId, Message, %% Handle call %%-------------------------------------------------------------------- --spec handle_call(Req :: term(), channel()) - -> {reply, Reply :: term(), channel()} - | {shutdown, Reason :: term(), Reply :: term(), channel()} - | {shutdown, Reason :: term(), Reply :: term(), - emqx_types:packet(), channel()}. -handle_call({subscribe, Topic, SubOpts}, Channel) -> +-spec handle_call(Req :: term(), From :: term(), channel()) + -> {reply, Reply :: term(), channel()} + | {reply, Reply :: term(), replies(), channel()} + | {shutdown, Reason :: term(), Reply :: term(), channel()} + | {shutdown, Reason :: term(), Reply :: term(), + emqx_types:packet(), channel()}. +handle_call({subscribe, Topic, SubOpts}, _From, Channel) -> %% XXX: Only support short_topic_name SubProps = maps:get(sub_props, SubOpts, #{}), case maps:get(subtype, SubProps, short_topic_name) of @@ -1141,26 +1142,26 @@ handle_call({subscribe, Topic, SubOpts}, Channel) -> reply({error, only_support_short_name_topic}, Channel) end; -handle_call({unsubscribe, Topic}, Channel) -> +handle_call({unsubscribe, Topic}, _From, Channel) -> TopicFilters = [emqx_topic:parse(Topic)], {ok, _, NChannel} = do_unsubscribe(TopicFilters, Channel), reply(ok, NChannel); -handle_call(subscriptions, Channel = #channel{session = Session}) -> +handle_call(subscriptions, _From, Channel = #channel{session = Session}) -> reply(maps:to_list(emqx_session:info(subscriptions, Session)), Channel); -handle_call(kick, Channel) -> +handle_call(kick, _From, Channel) -> NChannel = ensure_disconnected(kicked, Channel), shutdown_and_reply(kicked, ok, NChannel); -handle_call(discard, Channel) -> +handle_call(discard, _From, Channel) -> shutdown_and_reply(discarded, ok, Channel); %% XXX: No Session Takeover -%handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) -> +%handle_call({takeover, 'begin'}, _From, Channel = #channel{session = Session}) -> % reply(Session, Channel#channel{takeover = true}); % -%handle_call({takeover, 'end'}, Channel = #channel{session = Session, +%handle_call({takeover, 'end'}, _From, Channel = #channel{session = Session, % pendings = Pendings}) -> % ok = emqx_session:takeover(Session), % %% TODO: Should not drain deliver here (side effect) @@ -1168,16 +1169,16 @@ handle_call(discard, Channel) -> % AllPendings = lists:append(Delivers, Pendings), % shutdown_and_reply(takeovered, AllPendings, Channel); -%handle_call(list_authz_cache, Channel) -> +%handle_call(list_authz_cache, _From, Channel) -> % {reply, emqx_authz_cache:list_authz_cache(), Channel}; %% XXX: No Quota Now -% handle_call({quota, Policy}, Channel) -> +% handle_call({quota, Policy}, _From, Channel) -> % Zone = info(zone, Channel), % Quota = emqx_limiter:init(Zone, Policy), % reply(ok, Channel#channel{quota = Quota}); -handle_call(Req, Channel) -> +handle_call(Req, _From, Channel) -> ?LOG(error, "Unexpected call: ~p", [Req]), reply(ignored, Channel). diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl index 9bd2dac1b..673535ebd 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl @@ -39,7 +39,7 @@ , set_conn_state/2 ]). --export([ handle_call/2 +-export([ handle_call/3 , handle_cast/2 , handle_info/2 ]). @@ -586,10 +586,10 @@ do_subscribe([{ParsedTopic, SubOpts0}|More], %%-------------------------------------------------------------------- -spec(handle_out(atom(), term(), channel()) - -> {ok, channel()} - | {ok, replies(), channel()} - | {shutdown, Reason :: term(), channel()} - | {shutdown, Reason :: term(), replies(), channel()}). + -> {ok, channel()} + | {ok, replies(), channel()} + | {shutdown, Reason :: term(), channel()} + | {shutdown, Reason :: term(), replies(), channel()}). handle_out(connerr, {Headers, ReceiptId, ErrMsg}, Channel) -> Frame = error_frame(Headers, ReceiptId, ErrMsg), @@ -620,11 +620,12 @@ handle_out(receipt, ReceiptId, Channel) -> %% Handle call %%-------------------------------------------------------------------- --spec(handle_call(Req :: term(), channel()) - -> {reply, Reply :: term(), channel()} - | {shutdown, Reason :: term(), Reply :: term(), channel()} - | {shutdown, Reason :: term(), Reply :: term(), stomp_frame(), channel()}). -handle_call({subscribe, Topic, SubOpts}, +-spec(handle_call(Req :: term(), From :: term(), channel()) + -> {reply, Reply :: term(), channel()} + | {reply, Reply :: term(), replies(), channel()} + | {shutdown, Reason :: term(), Reply :: term(), channel()} + | {shutdown, Reason :: term(), Reply :: term(), stomp_frame(), channel()}). +handle_call({subscribe, Topic, SubOpts}, _From, Channel = #channel{ subscriptions = Subs }) -> @@ -653,7 +654,7 @@ handle_call({subscribe, Topic, SubOpts}, end end; -handle_call({unsubscribe, Topic}, +handle_call({unsubscribe, Topic}, _From, Channel = #channel{ ctx = Ctx, clientinfo = ClientInfo = #{mountpoint := Mountpoint}, @@ -670,27 +671,27 @@ handle_call({unsubscribe, Topic}, ); %% Reply :: [{emqx_types:topic(), emqx_types:subopts()}] -handle_call(subscriptions, Channel = #channel{subscriptions = Subs}) -> +handle_call(subscriptions, _From, Channel = #channel{subscriptions = Subs}) -> Reply = lists:map( fun({_SubId, Topic, _Ack, SubOpts}) -> {Topic, SubOpts} end, Subs), reply(Reply, Channel); -handle_call(kick, Channel) -> +handle_call(kick, _From, Channel) -> NChannel = ensure_disconnected(kicked, Channel), Frame = error_frame(undefined, <<"Kicked out">>), shutdown_and_reply(kicked, ok, Frame, NChannel); -handle_call(discard, Channel) -> +handle_call(discard, _From, Channel) -> Frame = error_frame(undefined, <<"Discarded">>), shutdown_and_reply(discarded, ok, Frame, Channel); %% XXX: No Session Takeover -%handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) -> +%handle_call({takeover, 'begin'}, _From, Channel = #channel{session = Session}) -> % reply(Session, Channel#channel{takeover = true}); % -%handle_call({takeover, 'end'}, Channel = #channel{session = Session, +%handle_call({takeover, 'end'}, _From, Channel = #channel{session = Session, % pendings = Pendings}) -> % ok = emqx_session:takeover(Session), % %% TODO: Should not drain deliver here (side effect) @@ -698,7 +699,7 @@ handle_call(discard, Channel) -> % AllPendings = lists:append(Delivers, Pendings), % shutdown_and_reply(takeovered, AllPendings, Channel); -handle_call(list_authz_cache, Channel) -> +handle_call(list_authz_cache, _From, Channel) -> %% This won't work {reply, emqx_authz_cache:list_authz_cache(), Channel}; @@ -708,11 +709,10 @@ handle_call(list_authz_cache, Channel) -> % Quota = emqx_limiter:init(Zone, Policy), % reply(ok, Channel#channel{quota = Quota}); -handle_call(Req, Channel) -> +handle_call(Req, _From, Channel) -> ?LOG(error, "Unexpected call: ~p", [Req]), reply(ignored, Channel). - %%-------------------------------------------------------------------- %% Handle cast %%--------------------------------------------------------------------