From cad4eff28328d2bac9bac010b2f9d7c5331fba0f Mon Sep 17 00:00:00 2001 From: k32 <10274441+k32@users.noreply.github.com> Date: Tue, 11 Jan 2022 11:24:56 +0100 Subject: [PATCH] refactor(persistent_session): Decorate API calls --- apps/emqx/src/emqx_persistent_session.erl | 4 +- apps/emqx/src/emqx_rpc.erl | 10 +++++ .../emqx_persistent_session_proto_v1.erl | 40 +++++++++++++++++++ 3 files changed, 52 insertions(+), 2 deletions(-) create mode 100644 apps/emqx/src/proto/emqx_persistent_session_proto_v1.erl diff --git a/apps/emqx/src/emqx_persistent_session.erl b/apps/emqx/src/emqx_persistent_session.erl index e55fef4f6..e625b8fbd 100644 --- a/apps/emqx/src/emqx_persistent_session.erl +++ b/apps/emqx/src/emqx_persistent_session.erl @@ -309,11 +309,11 @@ resume(ClientInfo = #{clientid := ClientID}, ConnInfo, Session) -> {Session2, Pendings4 ++ WriterPendings}. resume_begin(Nodes, SessionID) -> - Res = erpc:multicall(Nodes, emqx_session_router, resume_begin, [self(), SessionID]), + Res = emqx_persistent_session_proto_v1:resume_begin(Nodes, self(), SessionID), [{Node, Marker} || {{ok, {ok, Marker}}, Node} <- lists:zip(Res, Nodes)]. resume_end(Nodes, SessionID) -> - Res = erpc:multicall(Nodes, emqx_session_router, resume_end, [self(), SessionID]), + Res = emqx_persistent_session_proto_v1:resume_end(Nodes, self(), SessionID), ?tp(ps_erpc_multical_result, #{ res => Res, sid => SessionID }), %% TODO: Should handle the errors [ {deliver, STopic, M} diff --git a/apps/emqx/src/emqx_rpc.erl b/apps/emqx/src/emqx_rpc.erl index 1c1a921f6..91e796725 100644 --- a/apps/emqx/src/emqx_rpc.erl +++ b/apps/emqx/src/emqx_rpc.erl @@ -31,6 +31,8 @@ , call_result/0 , cast_result/0 , multicall_result/0 + , erpc/1 + , erpc_multicast/1 ]). -compile({inline, @@ -48,6 +50,14 @@ -type multicall_result() :: {_Results :: [term()], _BadNodes :: [node()]}. +-type erpc(Ret) :: {ok, Ret} + | {throw, _Err} + | {exit, {exception | signal, _Reason}} + | {error, {exception, _Reason, _Stack :: list()}} + | {error, {erpc, _Reason}}. + +-type erpc_multicast(Ret) :: [erpc(Ret)]. + -spec call(node(), module(), atom(), list()) -> call_result(). call(Node, Mod, Fun, Args) -> filter_result(gen_rpc:call(rpc_node(Node), Mod, Fun, Args)). diff --git a/apps/emqx/src/proto/emqx_persistent_session_proto_v1.erl b/apps/emqx/src/proto/emqx_persistent_session_proto_v1.erl new file mode 100644 index 000000000..b1baf4334 --- /dev/null +++ b/apps/emqx/src/proto/emqx_persistent_session_proto_v1.erl @@ -0,0 +1,40 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_persistent_session_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ introduced_in/0 + , resume_begin/3 + , resume_end/3 + ]). + +-include("bpapi.hrl"). +-include("emqx.hrl"). + +introduced_in() -> + "5.0.0". + +-spec resume_begin([node()], pid(), binary()) -> + emqx_rpc:erpc_multicall([{node(), emqx_guid:guid()}]). +resume_begin(Nodes, Pid, SessionID) when is_pid(Pid), is_binary(SessionID) -> + erpc:multicall(Nodes, emqx_session_router, resume_begin, [Pid, SessionID]). + +-spec resume_end([node()], pid(), binary()) -> + emqx_rpc:erpc_multicall({'ok', [emqx_types:message()]} | {'error', term()}). +resume_end(Nodes, Pid, SessionID) when is_pid(Pid), is_binary(SessionID) -> + erpc:multicall(Nodes, emqx_session_router, resume_end, [Pid, SessionID]).