From fc5baf8fd40c39782ee12b96e8efd0a670aea6e8 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 7 Jul 2021 18:29:47 +0800 Subject: [PATCH] refactor(gw-sn): support mutil-registry process --- .../src/mqttsn/emqx_sn_gateway.erl | 68 ++++--- apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl | 6 +- .../src/mqttsn/emqx_sn_registry.erl | 167 +++++++++++------- 3 files changed, 142 insertions(+), 99 deletions(-) diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_gateway.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_gateway.erl index 27eb16498..28d461b9b 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_gateway.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_gateway.erl @@ -76,6 +76,7 @@ }). -record(state, {gwid :: integer(), + registry :: emqx_sn_registry:registry(), socket :: port(), sockpid :: pid(), sockstate :: emqx_types:sockstate(), @@ -145,16 +146,18 @@ kick(GwPid) -> %%-------------------------------------------------------------------- init([{_, SockPid, Sock}, Peername, Options]) -> - GwId = proplists:get_value(gateway_id, Options), - Username = proplists:get_value(username, Options, undefined), - Password = proplists:get_value(password, Options, undefined), - EnableQos3 = proplists:get_value(enable_qos3, Options, false), - IdleTimeout = proplists:get_value(idle_timeout, Options, 30000), - EnableStats = proplists:get_value(enable_stats, Options, false), + GwId = maps:get(gateway_id, Options), + Registry = maps:get(registry, Options), + Username = maps:get(username, Options, undefined), + Password = maps:get(password, Options, undefined), + EnableQos3 = maps:get(enable_qos3, Options, false), + IdleTimeout = maps:get(idle_timeout, Options, 30000), + EnableStats = maps:get(enable_stats, Options, false), case inet:sockname(Sock) of {ok, Sockname} -> Channel = emqx_channel:init(?CONN_INFO(Sockname, Peername), ?DEFAULT_CHAN_OPTIONS), State = #state{gwid = GwId, + registry = Registry, username = Username, password = Password, socket = Sock, @@ -202,10 +205,16 @@ idle(cast, {incoming, ?SN_PUBLISH_MSG(_Flag, _TopicId, _MsgId, _Data)}, State = idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1, topic_id_type = TopicIdType }, TopicId, _MsgId, Data)}, - State = #state{clientid = ClientId}) -> + State = #state{registry = Registry, clientid = ClientId}) -> TopicName = case (TopicIdType =:= ?SN_SHORT_TOPIC) of - false -> emqx_sn_registry:lookup_topic(ClientId, TopicId); - true -> <> + false -> + emqx_sn_registry:lookup_topic( + Registry, + ClientId, + TopicId + ); + true -> + <> end, _ = case TopicName =/= undefined of true -> @@ -290,9 +299,9 @@ wait_for_will_msg(EventType, EventContent, State) -> handle_event(EventType, EventContent, wait_for_will_msg, State). connected(cast, {incoming, ?SN_REGISTER_MSG(_TopicId, MsgId, TopicName)}, - State = #state{clientid = ClientId}) -> + State = #state{registry = Registry, clientid = ClientId}) -> State0 = - case emqx_sn_registry:register_topic(ClientId, TopicName) of + case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of TopicId when is_integer(TopicId) -> ?LOG(debug, "register ClientId=~p, TopicName=~p, TopicId=~p", [ClientId, TopicName, TopicId]), send_message(?SN_REGACK_MSG(TopicId, MsgId, ?SN_RC_ACCEPTED), State); @@ -579,13 +588,13 @@ handle_event(EventType, EventContent, StateName, State) -> [StateName, {EventType, EventContent}]), {keep_state, State}. -terminate(Reason, _StateName, #state{channel = Channel}) -> +terminate(Reason, _StateName, #state{registry = Registry, channel = Channel}) -> ClientId = emqx_channel:info(clientid, Channel), case Reason of {shutdown, takeovered} -> ok; _ -> - emqx_sn_registry:unregister_topic(ClientId) + emqx_sn_registry:unregister_topic(Registry, ClientId) end, emqx_channel:terminate(Reason, Channel), ok. @@ -721,12 +730,13 @@ mqtt2sn(?PUBCOMP_PACKET(MsgId), _State) -> mqtt2sn(?UNSUBACK_PACKET(MsgId), _State)-> ?SN_UNSUBACK_MSG(MsgId); -mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), #state{channel = Channel}) -> +mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), + #state{registry = Registry, channel = Channel}) -> NewPacketId = if QoS =:= ?QOS_0 -> 0; true -> PacketId end, ClientId = emqx_channel:info(clientid, Channel), - {TopicIdType, TopicContent} = case emqx_sn_registry:lookup_topic_id(ClientId, Topic) of + {TopicIdType, TopicContent} = case emqx_sn_registry:lookup_topic_id(Registry, ClientId, Topic) of {predef, PredefTopicId} -> {?SN_PREDEFINED_TOPIC, PredefTopicId}; TopicId when is_integer(TopicId) -> @@ -848,9 +858,9 @@ do_connect(ClientId, CleanStart, WillFlag, Duration, State) -> end. handle_subscribe(?SN_NORMAL_TOPIC, TopicName, QoS, MsgId, - State=#state{channel = Channel}) -> + State=#state{registry = Registry, channel = Channel}) -> ClientId = emqx_channel:info(clientid, Channel), - case emqx_sn_registry:register_topic(ClientId, TopicName) of + case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of {error, too_large} -> State0 = send_message(?SN_SUBACK_MSG(#mqtt_sn_flags{qos = QoS}, ?SN_INVALID_TOPIC_ID, @@ -864,9 +874,9 @@ handle_subscribe(?SN_NORMAL_TOPIC, TopicName, QoS, MsgId, end; handle_subscribe(?SN_PREDEFINED_TOPIC, TopicId, QoS, MsgId, - State = #state{channel = Channel}) -> + State = #state{registry = Registry, channel = Channel}) -> ClientId = emqx_channel:info(clientid, Channel), - case emqx_sn_registry:lookup_topic(ClientId, TopicId) of + case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of undefined -> State0 = send_message(?SN_SUBACK_MSG(#mqtt_sn_flags{qos = QoS}, TopicId, @@ -895,9 +905,9 @@ handle_unsubscribe(?SN_NORMAL_TOPIC, TopicId, MsgId, State) -> proto_unsubscribe(TopicId, MsgId, State); handle_unsubscribe(?SN_PREDEFINED_TOPIC, TopicId, MsgId, - State = #state{channel = Channel}) -> + State = #state{registry = Registry, channel = Channel}) -> ClientId = emqx_channel:info(clientid, Channel), - case emqx_sn_registry:lookup_topic(ClientId, TopicId) of + case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of undefined -> {keep_state, send_message(?SN_UNSUBACK_MSG(MsgId), State)}; PredefinedTopic -> @@ -920,11 +930,11 @@ do_publish(?SN_NORMAL_TOPIC, TopicName, Data, Flags, MsgId, State) -> <> = TopicName, do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId, State); do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId, - State=#state{channel = Channel}) -> + State=#state{registry = Registry, channel = Channel}) -> #mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags, NewQoS = get_corrected_qos(QoS), ClientId = emqx_channel:info(clientid, Channel), - case emqx_sn_registry:lookup_topic(ClientId, TopicId) of + case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of undefined -> {keep_state, maybe_send_puback(NewQoS, TopicId, MsgId, ?SN_RC_INVALID_TOPIC_ID, State)}; @@ -963,13 +973,13 @@ do_publish_will(#state{will_msg = WillMsg, clientid = ClientId}) -> ok. do_puback(TopicId, MsgId, ReturnCode, StateName, - State=#state{channel = Channel}) -> + State=#state{registry = Registry, channel = Channel}) -> case ReturnCode of ?SN_RC_ACCEPTED -> handle_incoming(?PUBACK_PACKET(MsgId), StateName, State); ?SN_RC_INVALID_TOPIC_ID -> ClientId = emqx_channel:info(clientid, Channel), - case emqx_sn_registry:lookup_topic(ClientId, TopicId) of + case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of undefined -> {keep_state, State}; TopicName -> %%notice that this TopicName maybe normal or predefined, @@ -1061,10 +1071,10 @@ handle_outgoing(Packets, State) when is_list(Packets) -> end, State, Packets); handle_outgoing(PubPkt = ?PUBLISH_PACKET(_, TopicName, _, _), - State = #state{channel = Channel}) -> + State = #state{registry = Registry, channel = Channel}) -> ?LOG(debug, "Handle outgoing publish: ~0p", [PubPkt]), ClientId = emqx_channel:info(clientid, Channel), - TopicId = emqx_sn_registry:lookup_topic_id(ClientId, TopicName), + TopicId = emqx_sn_registry:lookup_topic_id(Registry, ClientId, TopicName), case (TopicId == undefined) andalso (byte_size(TopicName) =/= 2) of true -> register_and_notify_client(PubPkt, State); false -> send_message(mqtt2sn(PubPkt, State), State) @@ -1089,11 +1099,11 @@ replay_no_reg_pending_publishes(TopicId, #state{pending_topic_ids = Pendings} = State#state{pending_topic_ids = maps:remove(TopicId, Pendings)}. register_and_notify_client(?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload) = PubPkt, - State = #state{pending_topic_ids = Pendings, channel = Channel}) -> + State = #state{registry = Registry, pending_topic_ids = Pendings, channel = Channel}) -> MsgId = message_id(PacketId), #mqtt_packet{header = #mqtt_packet_header{dup = Dup, retain = Retain}} = PubPkt, ClientId = emqx_channel:info(clientid, Channel), - TopicId = emqx_sn_registry:register_topic(ClientId, TopicName), + TopicId = emqx_sn_registry:register_topic(Registry, ClientId, TopicName), ?LOG(debug, "Register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, QoS=~p, " "Retain=~p, MsgId=~p", [TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId]), NewPendings = cache_no_reg_publish_message(Pendings, TopicId, PubPkt, State), diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl index c3b679381..3085afe89 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl @@ -72,11 +72,11 @@ on_insta_create(_Insta = #{ id := InstaId, end, PredefTopics = maps:get(predefined, RawConf), - {ok, RegistrySvr} = emqx_sn_registry:start_link(PredefTopics), + {ok, RegistrySvr} = emqx_sn_registry:start_link(InstaId, PredefTopics), NRawConf = maps:without( - [gateway_id, broadcast, predefined], - RawConf#{registry => RegistrySvr} + [broadcast, predefined], + RawConf#{registry => emqx_sn_registry:lookup_name(RegistrySvr)} ), Listeners = emqx_gateway_utils:normalize_rawconf(NRawConf), diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl index f2f87d93b..30583c443 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl @@ -14,6 +14,9 @@ %% limitations under the License. %%-------------------------------------------------------------------- +%% @doc The MQTT-SN Topic Registry +%% +%% XXX: -module(emqx_sn_registry). -behaviour(gen_server). @@ -23,16 +26,15 @@ -define(LOG(Level, Format, Args), emqx_logger:Level("MQTT-SN(registry): " ++ Format, Args)). --export([ start_link/1 - , stop/0 +-export([ start_link/2 ]). --export([ register_topic/2 - , unregister_topic/1 +-export([ register_topic/3 + , unregister_topic/2 ]). --export([ lookup_topic/2 - , lookup_topic_id/2 +-export([ lookup_topic/3 + , lookup_topic_id/3 ]). %% gen_server callbacks @@ -44,51 +46,54 @@ , code_change/3 ]). +-export([lookup_name/1]). + -define(SN_SHARD, emqx_sn_shard). --define(TAB, ?MODULE). - --record(state, {max_predef_topic_id = 0}). +-record(state, {tabname, max_predef_topic_id = 0}). -record(emqx_sn_registry, {key, value}). %% Mnesia bootstrap --export([mnesia/1]). +%-export([mnesia/1]). --boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). +%-boot_mnesia({mnesia, [boot]}). +%-copy_mnesia({mnesia, [copy]}). --rlog_shard({?SN_SHARD, ?TAB}). +%-rlog_shard({?SN_SHARD, ?TAB}). -%% @doc Create or replicate tables. --spec(mnesia(boot | copy) -> ok). -mnesia(boot) -> - %% Optimize storage - StoreProps = [{ets, [{read_concurrency, true}]}], - ok = ekka_mnesia:create_table(?MODULE, [ - {attributes, record_info(fields, emqx_sn_registry)}, - {ram_copies, [node()]}, - {storage_properties, StoreProps}]); +%%% @doc Create or replicate tables. +%-spec(mnesia(boot | copy) -> ok). +%mnesia(boot) -> +% %% Optimize storage +% StoreProps = [{ets, [{read_concurrency, true}]}], +% ok = ekka_mnesia:create_table(?MODULE, [ +% {attributes, record_info(fields, emqx_sn_registry)}, +% {ram_copies, [node()]}, +% {storage_properties, StoreProps}]); +% +%mnesia(copy) -> +% ok = ekka_mnesia:copy_table(?MODULE, ram_copies). -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?MODULE, ram_copies). +-type registry() :: {Tab :: atom(), + RegistryPid :: pid()}. %%----------------------------------------------------------------------------- --spec(start_link(list()) -> {ok, pid()} | ignore | {error, Reason :: term()}). -start_link(PredefTopics) -> - ekka_rlog:wait_for_shards([?SN_SHARD], infinity), - gen_server:start_link({local, ?MODULE}, ?MODULE, [PredefTopics], []). +-spec start_link(atom(), list()) + -> ignore + | {ok, pid()} + | {error, Reason :: term()}. +start_link(InstaId, PredefTopics) -> + gen_server:start_link(?MODULE, [InstaId, PredefTopics], []). --spec(stop() -> ok). -stop() -> - gen_server:stop(?MODULE, normal, infinity). - --spec(register_topic(binary(), binary()) -> integer() | {error, term()}). -register_topic(ClientId, TopicName) when is_binary(TopicName) -> +-spec register_topic(registry(), emqx_types:clientid(), emqx_types:topic()) + -> integer() + | {error, term()}. +register_topic({_, Pid}, ClientId, TopicName) when is_binary(TopicName) -> case emqx_topic:wildcard(TopicName) of false -> - gen_server:call(?MODULE, {register, ClientId, TopicName}); + gen_server:call(Pid, {register, ClientId, TopicName}); %% TopicId: in case of “accepted” the value that will be used as topic %% id by the gateway when sending PUBLISH messages to the client (not %% relevant in case of subscriptions to a short topic name or to a topic @@ -96,22 +101,24 @@ register_topic(ClientId, TopicName) when is_binary(TopicName) -> true -> {error, wildcard_topic} end. --spec(lookup_topic(binary(), pos_integer()) -> undefined | binary()). -lookup_topic(ClientId, TopicId) when is_integer(TopicId) -> - case lookup_element(?TAB, {predef, TopicId}, 3) of +-spec lookup_topic(registry(), emqx_types:clientid(), pos_integer()) + -> undefined + | binary(). +lookup_topic({Tab, _}, ClientId, TopicId) when is_integer(TopicId) -> + case lookup_element(Tab, {predef, TopicId}, 3) of undefined -> - lookup_element(?TAB, {ClientId, TopicId}, 3); + lookup_element(Tab, {ClientId, TopicId}, 3); Topic -> Topic end. --spec(lookup_topic_id(binary(), binary()) - -> undefined - | pos_integer() - | {predef, integer()}). -lookup_topic_id(ClientId, TopicName) when is_binary(TopicName) -> - case lookup_element(?TAB, {predef, TopicName}, 3) of +-spec lookup_topic_id(registry(), emqx_types:clientid(), emqx_types:topic()) + -> undefined + | pos_integer() + | {predef, integer()}. +lookup_topic_id({Tab, _}, ClientId, TopicName) when is_binary(TopicName) -> + case lookup_element(Tab, {predef, TopicName}, 3) of undefined -> - lookup_element(?TAB, {ClientId, TopicName}, 3); + lookup_element(Tab, {ClientId, TopicName}, 3); TopicId -> {predef, TopicId} end. @@ -120,46 +127,69 @@ lookup_topic_id(ClientId, TopicName) when is_binary(TopicName) -> lookup_element(Tab, Key, Pos) -> try ets:lookup_element(Tab, Key, Pos) catch error:badarg -> undefined end. --spec(unregister_topic(binary()) -> ok). -unregister_topic(ClientId) -> - gen_server:call(?MODULE, {unregister, ClientId}). +-spec unregister_topic(registry(), emqx_types:clientid()) -> ok. +unregister_topic({_, Pid}, ClientId) -> + gen_server:call(Pid, {unregister, ClientId}). + +lookup_name(Pid) -> + gen_server:call(Pid, name). %%----------------------------------------------------------------------------- -init([PredefTopics]) -> +name(InstaId) -> + list_to_atom(lists:concat([emqx_sn_, InstaId, '_registry'])). + +init([InstaId, PredefTopics]) -> %% {predef, TopicId} -> TopicName %% {predef, TopicName} -> TopicId %% {ClientId, TopicId} -> TopicName %% {ClientId, TopicName} -> TopicId + Tab = name(InstaId), + ok = ekka_mnesia:create_table(Tab, [ + {ram_copies, [node()]}, + {record_name, emqx_sn_registry}, + {attributes, record_info(fields, emqx_sn_registry)}, + {storage_properties, [{ets, [{read_concurrency, true}]}]} + ]), + ok = ekka_mnesia:copy_table(Tab, ram_copies), + % FIXME: + %ok = ekka_rlog:wait_for_shards([?CM_SHARD], infinity), MaxPredefId = lists:foldl( fun(#{id := TopicId, topic := TopicName}, AccId) -> - ekka_mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicId}, - value = TopicName}), - ekka_mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicName}, - value = TopicId}), + ekka_mnesia:dirty_write(Tab, #emqx_sn_registry{ + key = {predef, TopicId}, + value = TopicName} + ), + ekka_mnesia:dirty_write(Tab, #emqx_sn_registry{ + key = {predef, TopicName}, + value = TopicId} + ), if TopicId > AccId -> TopicId; true -> AccId end end, 0, PredefTopics), - {ok, #state{max_predef_topic_id = MaxPredefId}}. + {ok, #state{tabname = Tab, max_predef_topic_id = MaxPredefId}}. handle_call({register, ClientId, TopicName}, _From, - State = #state{max_predef_topic_id = PredefId}) -> - case lookup_topic_id(ClientId, TopicName) of + State = #state{tabname = Tab, max_predef_topic_id = PredefId}) -> + case lookup_topic_id({Tab, self()}, ClientId, TopicName) of {predef, PredefTopicId} when is_integer(PredefTopicId) -> {reply, PredefTopicId, State}; TopicId when is_integer(TopicId) -> {reply, TopicId, State}; undefined -> - case next_topic_id(?TAB, PredefId, ClientId) of + case next_topic_id(Tab, PredefId, ClientId) of TopicId when TopicId >= 16#FFFF -> {reply, {error, too_large}, State}; TopicId -> Fun = fun() -> - mnesia:write(#emqx_sn_registry{key = {ClientId, next_topic_id}, - value = TopicId + 1}), - mnesia:write(#emqx_sn_registry{key = {ClientId, TopicName}, - value = TopicId}), - mnesia:write(#emqx_sn_registry{key = {ClientId, TopicId}, - value = TopicName}) + mnesia:write(Tab, #emqx_sn_registry{ + key = {ClientId, next_topic_id}, + value = TopicId + 1}, write), + mnesia:write(Tab, #emqx_sn_registry{ + key = {ClientId, TopicName}, + value = TopicId}, write), + mnesia:write(Tab, #emqx_sn_registry{ + key = {ClientId, TopicId}, + value = TopicName}, write) end, case ekka_mnesia:transaction(?SN_SHARD, Fun) of {atomic, ok} -> @@ -170,11 +200,14 @@ handle_call({register, ClientId, TopicName}, _From, end end; -handle_call({unregister, ClientId}, _From, State) -> - Registry = mnesia:dirty_match_object({?TAB, {ClientId, '_'}, '_'}), - lists:foreach(fun(R) -> ekka_mnesia:dirty_delete_object(?TAB, R) end, Registry), +handle_call({unregister, ClientId}, _From, State = #state{tabname = Tab}) -> + Registry = mnesia:dirty_match_object({Tab, {ClientId, '_'}, '_'}), + lists:foreach(fun(R) -> ekka_mnesia:dirty_delete_object(Tab, R) end, Registry), {reply, ok, State}; +handle_call(name, _From, State = #state{tabname = Tab}) -> + {reply, {Tab, self()}, State}; + handle_call(Req, _From, State) -> ?LOG(error, "Unexpected request: ~p", [Req]), {reply, ignored, State}.