refactor(mqttsn): takeover can resume the registrations of session
This commit is contained in:
parent
3c6afee690
commit
ac6693c8cc
|
@ -389,7 +389,7 @@ open_session(
|
|||
end,
|
||||
case takeover_session(GwName, ClientId) of
|
||||
{ok, ConnMod, ChanPid, Session} ->
|
||||
ok = emqx_session:resume(ClientInfo, Session),
|
||||
ok = SessionMod:resume(ClientInfo, Session),
|
||||
case request_stepdown({takeover, 'end'}, ConnMod, ChanPid) of
|
||||
{ok, Pendings} ->
|
||||
register_channel(
|
||||
|
|
|
@ -51,8 +51,6 @@
|
|||
-record(channel, {
|
||||
%% Context
|
||||
ctx :: emqx_gateway_ctx:context(),
|
||||
%% Registry
|
||||
registry :: emqx_mqttsn_registry:registry(),
|
||||
%% Gateway Id
|
||||
gateway_id :: integer(),
|
||||
%% Enable negative_qos
|
||||
|
@ -62,7 +60,7 @@
|
|||
%% MQTT-SN Client Info
|
||||
clientinfo :: emqx_types:clientinfo(),
|
||||
%% Session
|
||||
session :: emqx_session:session() | undefined,
|
||||
session :: emqx_mqttsn_session:session() | undefined,
|
||||
%% Keepalive
|
||||
keepalive :: emqx_keepalive:keepalive() | undefined,
|
||||
%% Will Msg
|
||||
|
@ -147,7 +145,6 @@ init(
|
|||
) ->
|
||||
Peercert = maps:get(peercert, ConnInfo, undefined),
|
||||
Mountpoint = maps:get(mountpoint, Option, undefined),
|
||||
Registry = maps:get(registry, Option),
|
||||
GwId = maps:get(gateway_id, Option),
|
||||
EnableNegQoS = maps:get(enable_qos3, Option, true),
|
||||
ListenerId =
|
||||
|
@ -180,7 +177,6 @@ init(
|
|||
),
|
||||
#channel{
|
||||
ctx = Ctx,
|
||||
registry = Registry,
|
||||
gateway_id = GwId,
|
||||
enable_negative_qos = EnableNegQoS,
|
||||
conninfo = ConnInfo,
|
||||
|
@ -217,7 +213,7 @@ info(conn_state, #channel{conn_state = ConnState}) ->
|
|||
info(clientinfo, #channel{clientinfo = ClientInfo}) ->
|
||||
ClientInfo;
|
||||
info(session, #channel{session = Session}) ->
|
||||
emqx_utils:maybe_apply(fun emqx_session:info/1, Session);
|
||||
emqx_utils:maybe_apply(fun emqx_mqttsn_session:info/1, Session);
|
||||
info(will_msg, #channel{will_msg = WillMsg}) ->
|
||||
WillMsg;
|
||||
info(clientid, #channel{clientinfo = #{clientid := ClientId}}) ->
|
||||
|
@ -229,7 +225,7 @@ info(ctx, #channel{ctx = Ctx}) ->
|
|||
stats(#channel{session = undefined}) ->
|
||||
[];
|
||||
stats(#channel{session = Session}) ->
|
||||
emqx_session:stats(Session).
|
||||
emqx_mqttsn_session:stats(Session).
|
||||
|
||||
set_conn_state(ConnState, Channel) ->
|
||||
Channel#channel{conn_state = ConnState}.
|
||||
|
@ -388,19 +384,15 @@ process_connect(
|
|||
clientinfo = ClientInfo
|
||||
}
|
||||
) ->
|
||||
SessFun = fun(ClientInfoT, _) ->
|
||||
Conf = emqx_cm:get_session_confs(
|
||||
ClientInfoT, #{receive_maximum => 1, expiry_interval => 0}
|
||||
),
|
||||
emqx_session:init(Conf)
|
||||
end,
|
||||
SessFun = fun(ClientInfoT, _) -> emqx_mqttsn_session:init(ClientInfoT) end,
|
||||
case
|
||||
emqx_gateway_ctx:open_session(
|
||||
Ctx,
|
||||
CleanStart,
|
||||
ClientInfo,
|
||||
ConnInfo,
|
||||
SessFun
|
||||
SessFun,
|
||||
_SessMod = emqx_mqttsn_session
|
||||
)
|
||||
of
|
||||
{ok, #{
|
||||
|
@ -470,7 +462,7 @@ handle_in(
|
|||
MsgId,
|
||||
Data
|
||||
),
|
||||
Channel = #channel{conn_state = idle, registry = Registry}
|
||||
Channel = #channel{conn_state = idle}
|
||||
) ->
|
||||
case check_negative_qos_enable(Publish, Channel) of
|
||||
ok ->
|
||||
|
@ -479,6 +471,7 @@ handle_in(
|
|||
?SN_SHORT_TOPIC ->
|
||||
TopicId;
|
||||
?SN_PREDEFINED_TOPIC ->
|
||||
Registry = emqx_mqttsn_registry:init(),
|
||||
emqx_mqttsn_registry:lookup_topic(TopicId, Registry);
|
||||
_ ->
|
||||
undefined
|
||||
|
@ -627,8 +620,9 @@ handle_in(
|
|||
end;
|
||||
handle_in(
|
||||
?SN_REGISTER_MSG(_TopicId, MsgId, TopicName),
|
||||
Channel = #channel{registry = Registry}
|
||||
Channel = #channel{session = Session}
|
||||
) ->
|
||||
Registry = emqx_mqttsn_session:registry(Session),
|
||||
case emqx_mqttsn_registry:reg(TopicName, Registry) of
|
||||
{ok, TopicId, NRegistry} ->
|
||||
?SLOG(debug, #{
|
||||
|
@ -637,7 +631,8 @@ handle_in(
|
|||
topic_id => TopicId
|
||||
}),
|
||||
AckPacket = ?SN_REGACK_MSG(TopicId, MsgId, ?SN_RC_ACCEPTED),
|
||||
{ok, {outgoing, AckPacket}, Channel#channel{registry = NRegistry}};
|
||||
NSession = emqx_mqttsn_session:set_registry(NRegistry, Session),
|
||||
{ok, {outgoing, AckPacket}, Channel#channel{session = NSession}};
|
||||
{error, too_large} ->
|
||||
?SLOG(error, #{
|
||||
msg => "register_topic_failed",
|
||||
|
@ -749,14 +744,14 @@ handle_in(
|
|||
?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode),
|
||||
Channel = #channel{
|
||||
ctx = Ctx,
|
||||
registry = Registry,
|
||||
session = Session,
|
||||
clientinfo = ClientInfo
|
||||
}
|
||||
) ->
|
||||
Registry = emqx_mqttsn_session:registry(Session),
|
||||
case ReturnCode of
|
||||
?SN_RC_ACCEPTED ->
|
||||
case emqx_session:puback(ClientInfo, MsgId, Session) of
|
||||
case emqx_mqttsn_session:puback(ClientInfo, MsgId, Session) of
|
||||
{ok, Msg, NSession} ->
|
||||
ok = after_message_acked(ClientInfo, Msg, Channel),
|
||||
{Replies, NChannel} = goto_asleep_if_buffered_msgs_sent(
|
||||
|
@ -813,7 +808,7 @@ handle_in(
|
|||
clientinfo = ClientInfo
|
||||
}
|
||||
) ->
|
||||
case emqx_session:pubrec(ClientInfo, MsgId, Session) of
|
||||
case emqx_mqttsn_session:pubrec(ClientInfo, MsgId, Session) of
|
||||
{ok, Msg, NSession} ->
|
||||
ok = after_message_acked(ClientInfo, Msg, Channel),
|
||||
NChannel = Channel#channel{session = NSession},
|
||||
|
@ -839,7 +834,7 @@ handle_in(
|
|||
?SN_PUBREC_MSG(?SN_PUBREL, MsgId),
|
||||
Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo}
|
||||
) ->
|
||||
case emqx_session:pubrel(ClientInfo, MsgId, Session) of
|
||||
case emqx_mqttsn_session:pubrel(ClientInfo, MsgId, Session) of
|
||||
{ok, NSession} ->
|
||||
NChannel = Channel#channel{session = NSession},
|
||||
handle_out(pubcomp, MsgId, NChannel);
|
||||
|
@ -856,7 +851,7 @@ handle_in(
|
|||
?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId),
|
||||
Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo}
|
||||
) ->
|
||||
case emqx_session:pubcomp(ClientInfo, MsgId, Session) of
|
||||
case emqx_mqttsn_session:pubcomp(ClientInfo, MsgId, Session) of
|
||||
{ok, NSession} ->
|
||||
{Replies, NChannel} = goto_asleep_if_buffered_msgs_sent(
|
||||
Channel#channel{session = NSession}
|
||||
|
@ -1093,8 +1088,9 @@ convert_topic_id_to_name({{name, TopicName}, Flags, Data}, Channel) ->
|
|||
{ok, {TopicName, Flags, Data}, Channel};
|
||||
convert_topic_id_to_name(
|
||||
{{id, TopicId}, Flags, Data},
|
||||
Channel = #channel{registry = Registry}
|
||||
Channel = #channel{session = Session}
|
||||
) ->
|
||||
Registry = emqx_mqttsn_session:registry(Session),
|
||||
case emqx_mqttsn_registry:lookup_topic(TopicId, Registry) of
|
||||
undefined ->
|
||||
{error, ?SN_RC_INVALID_TOPIC_ID};
|
||||
|
@ -1164,7 +1160,7 @@ do_publish(
|
|||
Msg = #message{qos = ?QOS_2},
|
||||
Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo}
|
||||
) ->
|
||||
case emqx_session:publish(ClientInfo, MsgId, Msg, Session) of
|
||||
case emqx_mqttsn_session:publish(ClientInfo, MsgId, Msg, Session) of
|
||||
{ok, _PubRes, NSession} ->
|
||||
NChannel1 = ensure_timer(
|
||||
await_timer,
|
||||
|
@ -1197,8 +1193,9 @@ preproc_subs_type(
|
|||
TopicName,
|
||||
QoS
|
||||
),
|
||||
Channel = #channel{registry = Registry}
|
||||
Channel = #channel{session = Session}
|
||||
) ->
|
||||
Registry = emqx_mqttsn_session:registry(Session),
|
||||
%% If the gateway is able accept the subscription,
|
||||
%% it assigns a topic id to the received topic name
|
||||
%% and returns it within a SUBACK message
|
||||
|
@ -1214,7 +1211,8 @@ preproc_subs_type(
|
|||
%% topic name to be sent to the client, see also Section 6.10.
|
||||
{ok, {?SN_INVALID_TOPIC_ID, TopicName, QoS}, Channel};
|
||||
{ok, TopicId, NRegistry} ->
|
||||
{ok, {TopicId, TopicName, QoS}, Channel#channel{registry = NRegistry}}
|
||||
NSession = emqx_mqttsn_session:set_registry(NRegistry, Session),
|
||||
{ok, {TopicId, TopicName, QoS}, Channel#channel{session = NSession}}
|
||||
end;
|
||||
preproc_subs_type(
|
||||
?SN_SUBSCRIBE_MSG_TYPE(
|
||||
|
@ -1222,8 +1220,9 @@ preproc_subs_type(
|
|||
TopicId,
|
||||
QoS
|
||||
),
|
||||
Channel = #channel{registry = Registry}
|
||||
Channel = #channel{session = Session}
|
||||
) ->
|
||||
Registry = emqx_mqttsn_session:registry(Session),
|
||||
case emqx_mqttsn_registry:lookup_topic(TopicId, Registry) of
|
||||
undefined ->
|
||||
{error, ?SN_RC_INVALID_TOPIC_ID};
|
||||
|
@ -1301,7 +1300,7 @@ do_subscribe(
|
|||
) ->
|
||||
NTopicName = emqx_mountpoint:mount(Mountpoint, TopicName),
|
||||
NSubOpts = maps:merge(emqx_gateway_utils:default_subopts(), SubOpts),
|
||||
case emqx_session:subscribe(ClientInfo, NTopicName, NSubOpts, Session) of
|
||||
case emqx_mqttsn_session:subscribe(ClientInfo, NTopicName, NSubOpts, Session) of
|
||||
{ok, NSession} ->
|
||||
{ok, {TopicId, NTopicName, NSubOpts}, Channel#channel{session = NSession}};
|
||||
{error, ?RC_QUOTA_EXCEEDED} ->
|
||||
|
@ -1329,8 +1328,9 @@ preproc_unsub_type(
|
|||
?SN_PREDEFINED_TOPIC,
|
||||
TopicId
|
||||
),
|
||||
Channel = #channel{registry = Registry}
|
||||
Channel = #channel{session = Session}
|
||||
) ->
|
||||
Registry = emqx_mqttsn_session:registry(Session),
|
||||
case emqx_mqttsn_registry:lookup_topic(TopicId, Registry) of
|
||||
undefined ->
|
||||
{error, not_found};
|
||||
|
@ -1391,7 +1391,7 @@ do_unsubscribe(
|
|||
SubOpts
|
||||
),
|
||||
case
|
||||
emqx_session:unsubscribe(
|
||||
emqx_mqttsn_session:unsubscribe(
|
||||
ClientInfo,
|
||||
NTopicName,
|
||||
NSubOpts,
|
||||
|
@ -1436,9 +1436,9 @@ awake(
|
|||
clientid => ClientId,
|
||||
previous_state => ConnState
|
||||
}),
|
||||
{ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session),
|
||||
{ok, Publishes, Session1} = emqx_mqttsn_session:replay(ClientInfo, Session),
|
||||
{NPublishes, NSession} =
|
||||
case emqx_session:deliver(ClientInfo, [], Session1) of
|
||||
case emqx_mqttsn_session:deliver(ClientInfo, [], Session1) of
|
||||
{ok, Session2} ->
|
||||
{Publishes, Session2};
|
||||
{ok, More, Session2} ->
|
||||
|
@ -1466,8 +1466,8 @@ goto_asleep_if_buffered_msgs_sent(
|
|||
}
|
||||
) ->
|
||||
case
|
||||
emqx_mqueue:is_empty(emqx_session:info(mqueue, Session)) andalso
|
||||
emqx_inflight:is_empty(emqx_session:info(inflight, Session))
|
||||
emqx_mqueue:is_empty(emqx_mqttsn_session:info(mqueue, Session)) andalso
|
||||
emqx_inflight:is_empty(emqx_mqttsn_session:info(inflight, Session))
|
||||
of
|
||||
true ->
|
||||
?SLOG(info, #{
|
||||
|
@ -1560,7 +1560,7 @@ handle_out(
|
|||
register_inflight = undefined
|
||||
}
|
||||
) ->
|
||||
{MsgId, NSession} = emqx_session:obtain_next_pkt_id(Session),
|
||||
{MsgId, NSession} = emqx_mqttsn_session:obtain_next_pkt_id(Session),
|
||||
Outgoing = {outgoing, ?SN_REGISTER_MSG(TopicId, MsgId, TopicName)},
|
||||
NChannel = Channel#channel{
|
||||
session = NSession,
|
||||
|
@ -1636,7 +1636,7 @@ maybe_resume_session(
|
|||
resuming = true
|
||||
}
|
||||
) ->
|
||||
Subs = emqx_session:info(subscriptions, Session),
|
||||
Subs = emqx_mqttsn_session:info(subscriptions, Session),
|
||||
case subs_resume() andalso map_size(Subs) =/= 0 of
|
||||
true ->
|
||||
TopicNames = lists:filter(fun(T) -> not emqx_topic:wildcard(T) end, maps:keys(Subs)),
|
||||
|
@ -1661,9 +1661,9 @@ resume_or_replay_messages(
|
|||
false ->
|
||||
{[], Channel}
|
||||
end,
|
||||
{ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session),
|
||||
{ok, Publishes, Session1} = emqx_mqttsn_session:replay(ClientInfo, Session),
|
||||
{NPublishes, NSession} =
|
||||
case emqx_session:deliver(ClientInfo, NPendings, Session1) of
|
||||
case emqx_mqttsn_session:deliver(ClientInfo, NPendings, Session1) of
|
||||
{ok, Session2} ->
|
||||
{Publishes, Session2};
|
||||
{ok, More, Session2} ->
|
||||
|
@ -1734,7 +1734,7 @@ outgoing_deliver_and_register({Packets, Channel}) ->
|
|||
message_to_packet(
|
||||
MsgId,
|
||||
Message,
|
||||
#channel{registry = Registry}
|
||||
#channel{session = Session}
|
||||
) ->
|
||||
QoS = emqx_message:qos(Message),
|
||||
Topic = emqx_message:topic(Message),
|
||||
|
@ -1744,6 +1744,7 @@ message_to_packet(
|
|||
?QOS_0 -> 0;
|
||||
_ -> MsgId
|
||||
end,
|
||||
Registry = emqx_mqttsn_session:registry(Session),
|
||||
case emqx_mqttsn_registry:lookup_topic_id(Topic, Registry) of
|
||||
{predef, PredefTopicId} ->
|
||||
Flags = #mqtt_sn_flags{qos = QoS, topic_id_type = ?SN_PREDEFINED_TOPIC},
|
||||
|
@ -1779,7 +1780,7 @@ handle_call({unsubscribe, Topic}, _From, Channel) ->
|
|||
{ok, _, NChannel} = do_unsubscribe(TopicFilters, Channel),
|
||||
reply_and_update(ok, NChannel);
|
||||
handle_call(subscriptions, _From, Channel = #channel{session = Session}) ->
|
||||
reply({ok, maps:to_list(emqx_session:info(subscriptions, Session))}, Channel);
|
||||
reply({ok, maps:to_list(emqx_mqttsn_session:info(subscriptions, Session))}, Channel);
|
||||
handle_call(kick, _From, Channel) ->
|
||||
NChannel = ensure_disconnected(kicked, Channel),
|
||||
shutdown_and_reply(kicked, ok, NChannel);
|
||||
|
@ -1800,7 +1801,7 @@ handle_call(
|
|||
pendings = Pendings
|
||||
}
|
||||
) ->
|
||||
ok = emqx_session:takeover(Session),
|
||||
ok = emqx_mqttsn_session:takeover(Session),
|
||||
%% TODO: Should not drain deliver here (side effect)
|
||||
Delivers = emqx_utils:drain_deliver(),
|
||||
AllPendings = lists:append(Delivers, Pendings),
|
||||
|
@ -1877,7 +1878,8 @@ handle_info(clean_authz_cache, Channel) ->
|
|||
{ok, Channel};
|
||||
handle_info({subscribe, _}, Channel) ->
|
||||
{ok, Channel};
|
||||
handle_info({register, TopicName}, Channel = #channel{registry = Registry}) ->
|
||||
handle_info({register, TopicName}, Channel = #channel{session = Session}) ->
|
||||
Registry = emqx_mqttsn_session:registry(Session),
|
||||
case emqx_mqttsn_registry:reg(TopicName, Registry) of
|
||||
{error, Reason} ->
|
||||
?SLOG(error, #{
|
||||
|
@ -1887,7 +1889,8 @@ handle_info({register, TopicName}, Channel = #channel{registry = Registry}) ->
|
|||
}),
|
||||
{ok, Channel};
|
||||
{ok, TopicId, NRegistry} ->
|
||||
handle_out(register, {TopicId, TopicName}, Channel#channel{registry = NRegistry})
|
||||
NSession = emqx_mqttsn_session:set_registry(NRegistry, Session),
|
||||
handle_out(register, {TopicId, TopicName}, Channel#channel{session = NSession})
|
||||
end;
|
||||
handle_info(Info, Channel) ->
|
||||
?SLOG(error, #{
|
||||
|
@ -1954,7 +1957,7 @@ handle_deliver(
|
|||
ConnState =:= disconnected;
|
||||
ConnState =:= asleep
|
||||
->
|
||||
NSession = emqx_session:enqueue(
|
||||
NSession = emqx_mqttsn_session:enqueue(
|
||||
ClientInfo,
|
||||
ignore_local(maybe_nack(Delivers), ClientId, Session, Ctx),
|
||||
Session
|
||||
|
@ -1990,7 +1993,7 @@ handle_deliver(
|
|||
}
|
||||
) ->
|
||||
case
|
||||
emqx_session:deliver(
|
||||
emqx_mqttsn_session:deliver(
|
||||
ClientInfo,
|
||||
ignore_local(Delivers, ClientId, Session, Ctx),
|
||||
Session
|
||||
|
@ -2008,7 +2011,7 @@ handle_deliver(
|
|||
end.
|
||||
|
||||
ignore_local(Delivers, Subscriber, Session, Ctx) ->
|
||||
Subs = emqx_session:info(subscriptions, Session),
|
||||
Subs = emqx_mqttsn_session:info(subscriptions, Session),
|
||||
lists:filter(
|
||||
fun({deliver, Topic, #message{from = Publisher}}) ->
|
||||
case maps:find(Topic, Subs) of
|
||||
|
@ -2083,7 +2086,7 @@ handle_timeout(
|
|||
retry_delivery,
|
||||
Channel = #channel{session = Session, clientinfo = ClientInfo}
|
||||
) ->
|
||||
case emqx_session:retry(ClientInfo, Session) of
|
||||
case emqx_mqttsn_session:retry(ClientInfo, Session) of
|
||||
{ok, NSession} ->
|
||||
{ok, clean_timer(retry_timer, Channel#channel{session = NSession})};
|
||||
{ok, Publishes, Timeout, NSession} ->
|
||||
|
@ -2108,7 +2111,7 @@ handle_timeout(
|
|||
expire_awaiting_rel,
|
||||
Channel = #channel{session = Session, clientinfo = ClientInfo}
|
||||
) ->
|
||||
case emqx_session:expire(ClientInfo, awaiting_rel, Session) of
|
||||
case emqx_mqttsn_session:expire(ClientInfo, awaiting_rel, Session) of
|
||||
{ok, NSession} ->
|
||||
{ok, clean_timer(await_timer, Channel#channel{session = NSession})};
|
||||
{ok, Timeout, NSession} ->
|
||||
|
@ -2252,9 +2255,9 @@ clean_timer(Name, Channel = #channel{timers = Timers}) ->
|
|||
interval(alive_timer, #channel{keepalive = KeepAlive}) ->
|
||||
emqx_keepalive:info(interval, KeepAlive);
|
||||
interval(retry_timer, #channel{session = Session}) ->
|
||||
emqx_session:info(retry_interval, Session);
|
||||
emqx_mqttsn_session:info(retry_interval, Session);
|
||||
interval(await_timer, #channel{session = Session}) ->
|
||||
emqx_session:info(await_rel_timeout, Session).
|
||||
emqx_mqttsn_session:info(await_rel_timeout, Session).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Helper functions
|
||||
|
|
|
@ -90,29 +90,17 @@ init() ->
|
|||
| {error, term()}.
|
||||
reg(
|
||||
TopicName,
|
||||
Registry = #{
|
||||
next_topic_id := TopicId0,
|
||||
id_to_name := IdMap,
|
||||
name_to_id := NameMap
|
||||
}
|
||||
Registry
|
||||
) when is_binary(TopicName) ->
|
||||
case emqx_topic:wildcard(TopicName) of
|
||||
false ->
|
||||
case maps:find(TopicName, NameMap) of
|
||||
{ok, TopicId} ->
|
||||
case lookup_topic_id(TopicName, Registry) of
|
||||
{predef, TopicId} when is_integer(TopicId) ->
|
||||
{ok, TopicId, Registry};
|
||||
error ->
|
||||
case next_topic_id(TopicId0) of
|
||||
{error, too_large} ->
|
||||
{error, too_large};
|
||||
NextTopicId ->
|
||||
NRegistry = Registry#{
|
||||
next_topic_id := NextTopicId,
|
||||
id_to_name := maps:put(NextTopicId, TopicName, IdMap),
|
||||
name_to_id := maps:put(TopicName, NextTopicId, NameMap)
|
||||
},
|
||||
{ok, NextTopicId, NRegistry}
|
||||
end
|
||||
TopicId when is_integer(TopicId) ->
|
||||
{ok, TopicId, Registry};
|
||||
undefined ->
|
||||
do_reg(TopicName, Registry)
|
||||
end;
|
||||
%% TopicId: in case of “accepted” the value that will be used as topic
|
||||
%% id by the gateway when sending PUBLISH messages to the client (not
|
||||
|
@ -122,6 +110,26 @@ reg(
|
|||
{error, wildcard_topic}
|
||||
end.
|
||||
|
||||
do_reg(
|
||||
TopicName,
|
||||
Registry = #{
|
||||
next_topic_id := TopicId0,
|
||||
id_to_name := IdMap,
|
||||
name_to_id := NameMap
|
||||
}
|
||||
) ->
|
||||
case next_topic_id(TopicId0) of
|
||||
{error, too_large} ->
|
||||
{error, too_large};
|
||||
NextTopicId ->
|
||||
NRegistry = Registry#{
|
||||
next_topic_id := NextTopicId,
|
||||
id_to_name := maps:put(NextTopicId, TopicName, IdMap),
|
||||
name_to_id := maps:put(TopicName, NextTopicId, NameMap)
|
||||
},
|
||||
{ok, NextTopicId, NRegistry}
|
||||
end.
|
||||
|
||||
next_topic_id(Id) when is_integer(Id) andalso (Id < 16#FFFF) ->
|
||||
Id + 1;
|
||||
next_topic_id(Id) when is_integer(Id) ->
|
||||
|
|
|
@ -0,0 +1,144 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_mqttsn_session).
|
||||
|
||||
-export([registry/1, set_registry/2]).
|
||||
|
||||
-export([
|
||||
init/1,
|
||||
info/1,
|
||||
info/2,
|
||||
stats/1,
|
||||
resume/2
|
||||
]).
|
||||
|
||||
-export([
|
||||
publish/4,
|
||||
subscribe/4,
|
||||
unsubscribe/4,
|
||||
puback/3,
|
||||
pubrec/3,
|
||||
pubrel/3,
|
||||
pubcomp/3
|
||||
]).
|
||||
|
||||
-export([
|
||||
replay/2,
|
||||
deliver/3,
|
||||
obtain_next_pkt_id/1,
|
||||
takeover/1,
|
||||
enqueue/3,
|
||||
retry/2,
|
||||
expire/3
|
||||
]).
|
||||
|
||||
-type session() :: #{
|
||||
registry := emqx_mqttsn_registry:registry(),
|
||||
session := emqx_session:session()
|
||||
}.
|
||||
|
||||
-export_type([session/0]).
|
||||
|
||||
init(ClientInfo) ->
|
||||
Conf = emqx_cm:get_session_confs(
|
||||
ClientInfo, #{receive_maximum => 1, expiry_interval => 0}
|
||||
),
|
||||
#{
|
||||
registry => emqx_mqttsn_registry:init(),
|
||||
session => emqx_session:init(Conf)
|
||||
}.
|
||||
|
||||
registry(#{registry := Registry}) ->
|
||||
Registry.
|
||||
|
||||
set_registry(Registry, Session) ->
|
||||
Session#{registry := Registry}.
|
||||
|
||||
info(#{session := Session}) ->
|
||||
emqx_session:info(Session).
|
||||
|
||||
info(Key, #{session := Session}) ->
|
||||
emqx_session:info(Key, Session).
|
||||
|
||||
stats(#{session := Session}) ->
|
||||
emqx_session:stats(Session).
|
||||
|
||||
puback(ClientInfo, MsgId, Session) ->
|
||||
with_sess(?FUNCTION_NAME, [ClientInfo, MsgId], Session).
|
||||
|
||||
pubrec(ClientInfo, MsgId, Session) ->
|
||||
with_sess(?FUNCTION_NAME, [ClientInfo, MsgId], Session).
|
||||
|
||||
pubrel(ClientInfo, MsgId, Session) ->
|
||||
with_sess(?FUNCTION_NAME, [ClientInfo, MsgId], Session).
|
||||
|
||||
pubcomp(ClientInfo, MsgId, Session) ->
|
||||
with_sess(?FUNCTION_NAME, [ClientInfo, MsgId], Session).
|
||||
|
||||
publish(ClientInfo, MsgId, Msg, Session) ->
|
||||
with_sess(?FUNCTION_NAME, [ClientInfo, MsgId, Msg], Session).
|
||||
|
||||
subscribe(ClientInfo, Topic, SubOpts, Session) ->
|
||||
with_sess(?FUNCTION_NAME, [ClientInfo, Topic, SubOpts], Session).
|
||||
|
||||
unsubscribe(ClientInfo, Topic, SubOpts, Session) ->
|
||||
with_sess(?FUNCTION_NAME, [ClientInfo, Topic, SubOpts], Session).
|
||||
|
||||
replay(ClientInfo, Session) ->
|
||||
with_sess(?FUNCTION_NAME, [ClientInfo], Session).
|
||||
|
||||
deliver(ClientInfo, Delivers, Session1) ->
|
||||
with_sess(?FUNCTION_NAME, [ClientInfo, Delivers], Session1).
|
||||
|
||||
obtain_next_pkt_id(Session = #{session := Sess}) ->
|
||||
{Id, Sess1} = emqx_session:obtain_next_pkt_id(Sess),
|
||||
{Id, Session#{session := Sess1}}.
|
||||
|
||||
takeover(_Session = #{session := Sess}) ->
|
||||
emqx_session:takeover(Sess).
|
||||
|
||||
enqueue(ClientInfo, Delivers, Session = #{session := Sess}) ->
|
||||
Sess1 = emqx_session:enqueue(ClientInfo, Delivers, Sess),
|
||||
Session#{session := Sess1}.
|
||||
|
||||
retry(ClientInfo, Session) ->
|
||||
with_sess(?FUNCTION_NAME, [ClientInfo], Session).
|
||||
|
||||
expire(ClientInfo, awaiting_rel, Session) ->
|
||||
with_sess(?FUNCTION_NAME, [ClientInfo, awaiting_rel], Session).
|
||||
|
||||
resume(ClientInfo, #{session := Sess}) ->
|
||||
emqx_session:resume(ClientInfo, Sess).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% internal funcs
|
||||
|
||||
with_sess(Fun, Args, Session = #{session := Sess}) ->
|
||||
case apply(emqx_session, Fun, Args ++ [Sess]) of
|
||||
%% for subscribe
|
||||
{error, Reason} ->
|
||||
{error, Reason};
|
||||
%% for pubrel
|
||||
{ok, Sess1} ->
|
||||
{ok, Session#{session := Sess1}};
|
||||
%% for publish and puback
|
||||
{ok, Result, Sess1} ->
|
||||
{ok, Result, Session#{session := Sess1}};
|
||||
%% for puback
|
||||
{ok, Msgs, Replies, Sess1} ->
|
||||
{ok, Msgs, Replies, Session#{session := Sess1}}
|
||||
end.
|
Loading…
Reference in New Issue