Merge branch 'master' into emqx_config

This commit is contained in:
Shawn 2021-07-15 13:18:57 +08:00
commit 0cc01c2bee
56 changed files with 743 additions and 1045 deletions

View File

@ -5,6 +5,7 @@
## TODO:
emqx_gateway: {
stomp.1: {
frame: {
max_headers: 10
@ -27,4 +28,47 @@ emqx_gateway: {
active_n: 100
}
}
mqttsn.1: {
## The MQTT-SN Gateway ID in ADVERTISE message.
gateway_id: 1
## Enable broadcast this gateway to WLAN
broadcast: true
## To control whether write statistics data into ETS table
## for dashbord to read.
enable_stats: true
## To control whether accept and process the received
## publish message with qos=-1.
enable_qos3: true
## Idle timeout for a MQTT-SN channel
idle_timeout: 30s
## The pre-defined topic name corresponding to the pre-defined topic
## id of N.
## Note that the pre-defined topic id of 0 is reserved.
predefined: [
{ id: 1
topic: "/predefined/topic/name/hello"
},
{ id: 2
topic: "/predefined/topic/name/nice"
}
]
### ClientInfo override
clientinfo_override: {
username: "mqtt_sn_user"
password: "abc"
}
listener.udp.1: {
bind: 1884
max_connections: 10240000
max_conn_rate: 1000
}
}
}

View File

@ -31,6 +31,7 @@
)
-> {error, reason()}
| {ok, [GwInstaPid :: pid()], GwInstaState :: state()}
%% TODO: v0.2 The child spec is better for restarting child process
| {ok, [Childspec :: supervisor:child_spec()], GwInstaState :: state()}.
%% @doc

View File

@ -45,7 +45,7 @@ load_default_gateway_applications() ->
gateway_type_searching() ->
%% FIXME: Hardcoded apps
[emqx_stomp_impl].
[emqx_stomp_impl, emqx_sn_impl].
load(Mod) ->
try
@ -65,7 +65,7 @@ create_gateway_by_default([]) ->
create_gateway_by_default([{Type, Name, Confs}|More]) ->
case emqx_gateway_registry:lookup(Type) of
undefined ->
?LOG(error, "Skip to start ~p#~p: not_registred_type",
?LOG(error, "Skip to start ~s#~s: not_registred_type",
[Type, Name]);
_ ->
case emqx_gateway:create(Type,
@ -73,9 +73,9 @@ create_gateway_by_default([{Type, Name, Confs}|More]) ->
<<>>,
Confs) of
{ok, _} ->
?LOG(debug, "Start ~p#~p successfully!", [Type, Name]);
?LOG(debug, "Start ~s#~s successfully!", [Type, Name]);
{error, Reason} ->
?LOG(error, "Start ~p#~p failed: ~0p",
?LOG(error, "Start ~s#~s failed: ~0p",
[Type, Name, Reason])
end
end,

View File

@ -54,7 +54,6 @@
start_link(Insta, Ctx, GwDscrptr) ->
gen_server:start_link(
{local, ?MODULE},
?MODULE,
[Insta, Ctx, GwDscrptr],
[]

View File

@ -49,7 +49,7 @@
%%--------------------------------------------------------------------
start_link(Type) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [Type], []).
gen_server:start_link(?MODULE, [Type], []).
-spec inc(gateway_type(), atom()) -> ok.
inc(Type, Name) ->

View File

@ -32,7 +32,9 @@
structs() -> ["emqx_gateway"].
fields("emqx_gateway") ->
[{stomp, t(ref(stomp))}];
[{stomp, t(ref(stomp))},
{mqttsn, t(ref(mqttsn))}
];
fields(stomp) ->
[{"$id", t(ref(stomp_structs))}];
@ -41,7 +43,7 @@ fields(stomp_structs) ->
[ {frame, t(ref(stomp_frame))}
, {clientinfo_override, t(ref(clientinfo_override))}
, {authenticator, t(union([allow_anonymous]))}
, {listener, t(ref(listener))}
, {listener, t(ref(tcp_listener_group))}
];
fields(stomp_frame) ->
@ -50,13 +52,38 @@ fields(stomp_frame) ->
, {max_body_length, t(integer(), undefined, 8192)}
];
fields(mqttsn) ->
[{"$id", t(ref(mqttsn_structs))}];
fields(mqttsn_structs) ->
[ {gateway_id, t(integer())}
, {broadcast, t(boolean())}
, {enable_stats, t(boolean())}
, {enable_qos3, t(boolean())}
, {idle_timeout, t(duration())}
, {predefined, hoconsc:array(ref(mqttsn_predefined))}
, {clientinfo_override, t(ref(clientinfo_override))}
, {listener, t(ref(udp_listener_group))}
];
fields(mqttsn_predefined) ->
%% FIXME: How to check the $id is a integer ???
[ {id, t(integer())}
, {topic, t(string())}
];
fields(clientinfo_override) ->
[ {username, t(string())}
, {password, t(string())}
, {clientid, t(string())}
];
fields(listener) ->
fields(udp_listener_group) ->
[ {udp, t(ref(udp_listener))}
, {dtls, t(ref(dtls_listener))}
];
fields(tcp_listener_group) ->
[ {tcp, t(ref(tcp_listener))}
, {ssl, t(ref(ssl_listener))}
];
@ -67,7 +94,14 @@ fields(tcp_listener) ->
fields(ssl_listener) ->
[ {"$name", t(ref(ssl_listener_settings))}];
fields(udp_listener) ->
[ {"$name", t(ref(udp_listener_settings))}];
fields(dtls_listener) ->
[ {"$name", t(ref(dtls_listener_settings))}];
fields(listener_settings) ->
% FIXME:
%[ {"bind", t(union(ip_port(), integer()))}
[ {bind, t(integer())}
, {acceptors, t(integer(), undefined, 8)}
@ -104,6 +138,19 @@ fields(ssl_listener_settings) ->
, depth => 10
, reuse_sessions => true}) ++ fields(listener_settings);
fields(udp_listener_settings) ->
[
%% some special confs for udp listener
] ++ fields(listener_settings);
fields(dtls_listener_settings) ->
[
%% some special confs for dtls listener
] ++
ssl(undefined, #{handshake_timeout => "15s"
, depth => 10
, reuse_sessions => true}) ++ fields(listener_settings);
fields(access) ->
[ {"$id", #{type => string(),
nullable => true}}];

View File

@ -25,6 +25,7 @@
]).
-export([ apply/2
, format_listenon/1
]).
-export([ normalize_rawconf/1
@ -89,6 +90,13 @@ apply(F, A2) when is_function(F),
is_list(A2) ->
erlang:apply(F, A2).
format_listenon(Port) when is_integer(Port) ->
io_lib:format("0.0.0.0:~w", [Port]);
format_listenon({Addr, Port}) when is_list(Addr) ->
io_lib:format("~s:~w", [Addr, Port]);
format_listenon({Addr, Port}) when is_tuple(Addr) ->
io_lib:format("~s:~w", [inet:ntoa(Addr), Port]).
-type listener() :: #{}.
-type rawconf() ::

View File

@ -1,10 +1,9 @@
emqx-sn
=======
# MQTT-SN Gateway
EMQ X MQTT-SN Gateway.
Configure Plugin
----------------
## Configure Plugin
File: etc/emqx_sn.conf
@ -72,8 +71,7 @@ mqtt.sn.password = abc
- mqtt.sn.password
* This parameter is optional. Pair with username above.
Load Plugin
-----------
## Load Plugin
```
./bin/emqx_ctl plugins load emqx_sn
@ -95,23 +93,18 @@ Load Plugin
- https://github.com/njh/mqtt-sn-tools
- https://github.com/arobenko/mqtt-sn
sleeping device
-----------
### sleeping device
PINGREQ must have a ClientId which is identical to the one in CONNECT message. Without ClientId, emqx-sn will ignore such PINGREQ.
pre-defined topics
-----------
### pre-defined topics
The mapping of a pre-defined topic id and topic name should be known inadvance by both client's application and gateway. We define this mapping info in emqx_sn.conf file, and which shall be kept equivalent in all client's side.
License
-------
## License
Apache License Version 2.0
Author
------
EMQ X-Men Team.
## Author
EMQ X Team.

View File

@ -18,7 +18,7 @@
-behaviour(gen_server).
-include("emqx_sn.hrl").
-include("src/mqttsn/include/emqx_sn.hrl").
-export([ start_link/2
, stop/0

View File

@ -17,7 +17,7 @@
-module(emqx_sn_frame).
-include("emqx_sn.hrl").
-include("src/mqttsn/include/emqx_sn.hrl").
-export([ parse/1
, serialize/1

View File

@ -18,7 +18,7 @@
-behaviour(gen_statem).
-include("emqx_sn.hrl").
-include("src/mqttsn/include/emqx_sn.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl").
@ -76,6 +76,7 @@
}).
-record(state, {gwid :: integer(),
registry :: emqx_sn_registry:registry(),
socket :: port(),
sockpid :: pid(),
sockstate :: emqx_types:sockstate(),
@ -145,16 +146,18 @@ kick(GwPid) ->
%%--------------------------------------------------------------------
init([{_, SockPid, Sock}, Peername, Options]) ->
GwId = proplists:get_value(gateway_id, Options),
Username = proplists:get_value(username, Options, undefined),
Password = proplists:get_value(password, Options, undefined),
EnableQos3 = proplists:get_value(enable_qos3, Options, false),
IdleTimeout = proplists:get_value(idle_timeout, Options, 30000),
EnableStats = proplists:get_value(enable_stats, Options, false),
GwId = maps:get(gateway_id, Options),
Registry = maps:get(registry, Options),
Username = maps:get(username, Options, undefined),
Password = maps:get(password, Options, undefined),
EnableQos3 = maps:get(enable_qos3, Options, false),
IdleTimeout = maps:get(idle_timeout, Options, 30000),
EnableStats = maps:get(enable_stats, Options, false),
case inet:sockname(Sock) of
{ok, Sockname} ->
Channel = emqx_channel:init(?CONN_INFO(Sockname, Peername), ?DEFAULT_CHAN_OPTIONS),
State = #state{gwid = GwId,
registry = Registry,
username = Username,
password = Password,
socket = Sock,
@ -202,10 +205,16 @@ idle(cast, {incoming, ?SN_PUBLISH_MSG(_Flag, _TopicId, _MsgId, _Data)}, State =
idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1,
topic_id_type = TopicIdType
}, TopicId, _MsgId, Data)},
State = #state{clientid = ClientId}) ->
State = #state{registry = Registry, clientid = ClientId}) ->
TopicName = case (TopicIdType =:= ?SN_SHORT_TOPIC) of
false -> emqx_sn_registry:lookup_topic(ClientId, TopicId);
true -> <<TopicId:16>>
false ->
emqx_sn_registry:lookup_topic(
Registry,
ClientId,
TopicId
);
true ->
<<TopicId:16>>
end,
_ = case TopicName =/= undefined of
true ->
@ -290,9 +299,9 @@ wait_for_will_msg(EventType, EventContent, State) ->
handle_event(EventType, EventContent, wait_for_will_msg, State).
connected(cast, {incoming, ?SN_REGISTER_MSG(_TopicId, MsgId, TopicName)},
State = #state{clientid = ClientId}) ->
State = #state{registry = Registry, clientid = ClientId}) ->
State0 =
case emqx_sn_registry:register_topic(ClientId, TopicName) of
case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of
TopicId when is_integer(TopicId) ->
?LOG(debug, "register ClientId=~p, TopicName=~p, TopicId=~p", [ClientId, TopicName, TopicId]),
send_message(?SN_REGACK_MSG(TopicId, MsgId, ?SN_RC_ACCEPTED), State);
@ -579,13 +588,13 @@ handle_event(EventType, EventContent, StateName, State) ->
[StateName, {EventType, EventContent}]),
{keep_state, State}.
terminate(Reason, _StateName, #state{channel = Channel}) ->
terminate(Reason, _StateName, #state{registry = Registry, channel = Channel}) ->
ClientId = emqx_channel:info(clientid, Channel),
case Reason of
{shutdown, takeovered} ->
ok;
_ ->
emqx_sn_registry:unregister_topic(ClientId)
emqx_sn_registry:unregister_topic(Registry, ClientId)
end,
emqx_channel:terminate(Reason, Channel),
ok.
@ -721,12 +730,13 @@ mqtt2sn(?PUBCOMP_PACKET(MsgId), _State) ->
mqtt2sn(?UNSUBACK_PACKET(MsgId), _State)->
?SN_UNSUBACK_MSG(MsgId);
mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), #state{channel = Channel}) ->
mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload),
#state{registry = Registry, channel = Channel}) ->
NewPacketId = if QoS =:= ?QOS_0 -> 0;
true -> PacketId
end,
ClientId = emqx_channel:info(clientid, Channel),
{TopicIdType, TopicContent} = case emqx_sn_registry:lookup_topic_id(ClientId, Topic) of
{TopicIdType, TopicContent} = case emqx_sn_registry:lookup_topic_id(Registry, ClientId, Topic) of
{predef, PredefTopicId} ->
{?SN_PREDEFINED_TOPIC, PredefTopicId};
TopicId when is_integer(TopicId) ->
@ -848,9 +858,9 @@ do_connect(ClientId, CleanStart, WillFlag, Duration, State) ->
end.
handle_subscribe(?SN_NORMAL_TOPIC, TopicName, QoS, MsgId,
State=#state{channel = Channel}) ->
State=#state{registry = Registry, channel = Channel}) ->
ClientId = emqx_channel:info(clientid, Channel),
case emqx_sn_registry:register_topic(ClientId, TopicName) of
case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of
{error, too_large} ->
State0 = send_message(?SN_SUBACK_MSG(#mqtt_sn_flags{qos = QoS},
?SN_INVALID_TOPIC_ID,
@ -864,9 +874,9 @@ handle_subscribe(?SN_NORMAL_TOPIC, TopicName, QoS, MsgId,
end;
handle_subscribe(?SN_PREDEFINED_TOPIC, TopicId, QoS, MsgId,
State = #state{channel = Channel}) ->
State = #state{registry = Registry, channel = Channel}) ->
ClientId = emqx_channel:info(clientid, Channel),
case emqx_sn_registry:lookup_topic(ClientId, TopicId) of
case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of
undefined ->
State0 = send_message(?SN_SUBACK_MSG(#mqtt_sn_flags{qos = QoS},
TopicId,
@ -895,9 +905,9 @@ handle_unsubscribe(?SN_NORMAL_TOPIC, TopicId, MsgId, State) ->
proto_unsubscribe(TopicId, MsgId, State);
handle_unsubscribe(?SN_PREDEFINED_TOPIC, TopicId, MsgId,
State = #state{channel = Channel}) ->
State = #state{registry = Registry, channel = Channel}) ->
ClientId = emqx_channel:info(clientid, Channel),
case emqx_sn_registry:lookup_topic(ClientId, TopicId) of
case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of
undefined ->
{keep_state, send_message(?SN_UNSUBACK_MSG(MsgId), State)};
PredefinedTopic ->
@ -915,15 +925,16 @@ handle_unsubscribe(_, _TopicId, MsgId, State) ->
{keep_state, send_message(?SN_UNSUBACK_MSG(MsgId), State)}.
do_publish(?SN_NORMAL_TOPIC, TopicName, Data, Flags, MsgId, State) ->
%% XXX: Handle normal topic id as predefined topic id, to be compatible with paho mqtt-sn library
%% XXX: Handle normal topic id as predefined topic id, to be
%% compatible with paho mqtt-sn library
<<TopicId:16>> = TopicName,
do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId, State);
do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId,
State=#state{channel = Channel}) ->
State=#state{registry = Registry, channel = Channel}) ->
#mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags,
NewQoS = get_corrected_qos(QoS),
ClientId = emqx_channel:info(clientid, Channel),
case emqx_sn_registry:lookup_topic(ClientId, TopicId) of
case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of
undefined ->
{keep_state, maybe_send_puback(NewQoS, TopicId, MsgId, ?SN_RC_INVALID_TOPIC_ID,
State)};
@ -962,18 +973,21 @@ do_publish_will(#state{will_msg = WillMsg, clientid = ClientId}) ->
ok.
do_puback(TopicId, MsgId, ReturnCode, StateName,
State=#state{channel = Channel}) ->
State=#state{registry = Registry, channel = Channel}) ->
case ReturnCode of
?SN_RC_ACCEPTED ->
handle_incoming(?PUBACK_PACKET(MsgId), StateName, State);
?SN_RC_INVALID_TOPIC_ID ->
ClientId = emqx_channel:info(clientid, Channel),
case emqx_sn_registry:lookup_topic(ClientId, TopicId) of
case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of
undefined -> {keep_state, State};
TopicName ->
%%notice that this TopicName maybe normal or predefined,
%% involving the predefined topic name in register to enhance the gateway's robustness even inconsistent with MQTT-SN channels
{keep_state, send_register(TopicName, TopicId, MsgId, State)}
%% involving the predefined topic name in register to
%% enhance the gateway's robustness even inconsistent
%% with MQTT-SN channels
{keep_state, send_register(TopicName, TopicId,
MsgId, State)}
end;
_ ->
?LOG(error, "CAN NOT handle PUBACK ReturnCode=~p", [ReturnCode]),
@ -1057,10 +1071,10 @@ handle_outgoing(Packets, State) when is_list(Packets) ->
end, State, Packets);
handle_outgoing(PubPkt = ?PUBLISH_PACKET(_, TopicName, _, _),
State = #state{channel = Channel}) ->
State = #state{registry = Registry, channel = Channel}) ->
?LOG(debug, "Handle outgoing publish: ~0p", [PubPkt]),
ClientId = emqx_channel:info(clientid, Channel),
TopicId = emqx_sn_registry:lookup_topic_id(ClientId, TopicName),
TopicId = emqx_sn_registry:lookup_topic_id(Registry, ClientId, TopicName),
case (TopicId == undefined) andalso (byte_size(TopicName) =/= 2) of
true -> register_and_notify_client(PubPkt, State);
false -> send_message(mqtt2sn(PubPkt, State), State)
@ -1070,8 +1084,9 @@ handle_outgoing(Packet, State) ->
send_message(mqtt2sn(Packet, State), State).
cache_no_reg_publish_message(Pendings, TopicId, PubPkt, State) ->
?LOG(debug, "cache non-registered publish message for topic-id: ~p, msg: ~0p, pendings: ~0p",
[TopicId, PubPkt, Pendings]),
?LOG(debug, "cache non-registered publish message "
"for topic-id: ~p, msg: ~0p, pendings: ~0p",
[TopicId, PubPkt, Pendings]),
Msgs = maps:get(pending_topic_ids, Pendings, []),
Pendings#{TopicId => Msgs ++ [mqtt2sn(PubPkt, State)]}.
@ -1084,11 +1099,11 @@ replay_no_reg_pending_publishes(TopicId, #state{pending_topic_ids = Pendings} =
State#state{pending_topic_ids = maps:remove(TopicId, Pendings)}.
register_and_notify_client(?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload) = PubPkt,
State = #state{pending_topic_ids = Pendings, channel = Channel}) ->
State = #state{registry = Registry, pending_topic_ids = Pendings, channel = Channel}) ->
MsgId = message_id(PacketId),
#mqtt_packet{header = #mqtt_packet_header{dup = Dup, retain = Retain}} = PubPkt,
ClientId = emqx_channel:info(clientid, Channel),
TopicId = emqx_sn_registry:register_topic(ClientId, TopicName),
TopicId = emqx_sn_registry:register_topic(Registry, ClientId, TopicName),
?LOG(debug, "Register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, QoS=~p, "
"Retain=~p, MsgId=~p", [TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId]),
NewPendings = cache_no_reg_publish_message(Pendings, TopicId, PubPkt, State),

View File

@ -0,0 +1,161 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc The MQTT-SN Gateway Implement interface
-module(emqx_sn_impl).
-behavior(emqx_gateway_impl).
%% APIs
-export([ load/0
, unload/0
]).
-export([]).
-export([ init/1
, on_insta_create/3
, on_insta_update/4
, on_insta_destroy/3
]).
-define(UDP_SOCKOPTS, []).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
load() ->
RegistryOptions = [ {cbkmod, ?MODULE}
],
YourOptions = [params1, params2],
emqx_gateway_registry:load(mqttsn, RegistryOptions, YourOptions).
unload() ->
emqx_gateway_registry:unload(mqttsn).
init(_) ->
GwState = #{},
{ok, GwState}.
%%--------------------------------------------------------------------
%% emqx_gateway_registry callbacks
%%--------------------------------------------------------------------
on_insta_create(_Insta = #{ id := InstaId,
rawconf := RawConf
}, Ctx, _GwState) ->
%% We Also need to start `emqx_sn_broadcast` &
%% `emqx_sn_registry` process
SnGwId = maps:get(gateway_id, RawConf),
case maps:get(broadcast, RawConf) of
false ->
ok;
true ->
%% FIXME:
Port = 1884,
_ = emqx_sn_broadcast:start_link(SnGwId, Port), ok
end,
PredefTopics = maps:get(predefined, RawConf),
{ok, RegistrySvr} = emqx_sn_registry:start_link(InstaId, PredefTopics),
NRawConf = maps:without(
[broadcast, predefined],
RawConf#{registry => emqx_sn_registry:lookup_name(RegistrySvr)}
),
Listeners = emqx_gateway_utils:normalize_rawconf(NRawConf),
ListenerPids = lists:map(fun(Lis) ->
start_listener(InstaId, Ctx, Lis)
end, Listeners),
{ok, ListenerPids, _InstaState = #{ctx => Ctx}}.
on_insta_update(NewInsta, OldInsta, GwInstaState = #{ctx := Ctx}, GwState) ->
InstaId = maps:get(id, NewInsta),
try
%% XXX: 1. How hot-upgrade the changes ???
%% XXX: 2. Check the New confs first before destroy old instance ???
on_insta_destroy(OldInsta, GwInstaState, GwState),
on_insta_create(NewInsta, Ctx, GwState)
catch
Class : Reason : Stk ->
logger:error("Failed to update stomp instance ~s; "
"reason: {~0p, ~0p} stacktrace: ~0p",
[InstaId, Class, Reason, Stk]),
{error, {Class, Reason}}
end.
on_insta_destroy(_Insta = #{ id := InstaId,
rawconf := RawConf
}, _GwInstaState, _GwState) ->
Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
lists:foreach(fun(Lis) ->
stop_listener(InstaId, Lis)
end, Listeners).
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
start_listener(InstaId, Ctx, {Type, ListenOn, SocketOpts, Cfg}) ->
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) of
{ok, Pid} ->
io:format("Start mqttsn ~s:~s listener on ~s successfully.~n",
[InstaId, Type, ListenOnStr]),
Pid;
{error, Reason} ->
io:format(standard_error,
"Failed to start mqttsn ~s:~s listener on ~s: ~0p~n",
[InstaId, Type, ListenOnStr, Reason]),
throw({badconf, Reason})
end.
start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
Name = name(InstaId, Type),
esockd:open_udp(Name, ListenOn, merge_default(SocketOpts),
{emqx_sn_gateway, start_link, [Cfg#{ctx => Ctx}]}).
name(InstaId, Type) ->
list_to_atom(lists:concat([InstaId, ":", Type])).
merge_default(Options) ->
case lists:keytake(udp_options, 1, Options) of
{value, {udp_options, TcpOpts}, Options1} ->
[{udp_options, emqx_misc:merge_opts(?UDP_SOCKOPTS, TcpOpts)} | Options1];
false ->
[{udp_options, ?UDP_SOCKOPTS} | Options]
end.
stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(InstaId, Type, ListenOn, SocketOpts, Cfg),
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case StopRet of
ok -> io:format("Stop mqttsn ~s:~s listener on ~s successfully.~n",
[InstaId, Type, ListenOnStr]);
{error, Reason} ->
io:format(standard_error,
"Failed to stop mqttsn ~s:~s listener on ~s: ~0p~n",
[InstaId, Type, ListenOnStr, Reason]
)
end,
StopRet.
stop_listener(InstaId, Type, ListenOn, _SocketOpts, _Cfg) ->
Name = name(InstaId, Type),
esockd:close(Name, ListenOn).

View File

@ -0,0 +1,240 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc The MQTT-SN Topic Registry
%%
%% XXX:
-module(emqx_sn_registry).
-behaviour(gen_server).
-include("src/mqttsn/include/emqx_sn.hrl").
-define(LOG(Level, Format, Args),
emqx_logger:Level("MQTT-SN(registry): " ++ Format, Args)).
-export([ start_link/2
]).
-export([ register_topic/3
, unregister_topic/2
]).
-export([ lookup_topic/3
, lookup_topic_id/3
]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-export([lookup_name/1]).
-define(SN_SHARD, emqx_sn_shard).
-record(state, {tabname, max_predef_topic_id = 0}).
-record(emqx_sn_registry, {key, value}).
%% Mnesia bootstrap
%-export([mnesia/1]).
%-boot_mnesia({mnesia, [boot]}).
%-copy_mnesia({mnesia, [copy]}).
%-rlog_shard({?SN_SHARD, ?TAB}).
%%% @doc Create or replicate tables.
%-spec(mnesia(boot | copy) -> ok).
%mnesia(boot) ->
% %% Optimize storage
% StoreProps = [{ets, [{read_concurrency, true}]}],
% ok = ekka_mnesia:create_table(?MODULE, [
% {attributes, record_info(fields, emqx_sn_registry)},
% {ram_copies, [node()]},
% {storage_properties, StoreProps}]);
%
%mnesia(copy) ->
% ok = ekka_mnesia:copy_table(?MODULE, ram_copies).
-type registry() :: {Tab :: atom(),
RegistryPid :: pid()}.
%%-----------------------------------------------------------------------------
-spec start_link(atom(), list())
-> ignore
| {ok, pid()}
| {error, Reason :: term()}.
start_link(InstaId, PredefTopics) ->
gen_server:start_link(?MODULE, [InstaId, PredefTopics], []).
-spec register_topic(registry(), emqx_types:clientid(), emqx_types:topic())
-> integer()
| {error, term()}.
register_topic({_, Pid}, ClientId, TopicName) when is_binary(TopicName) ->
case emqx_topic:wildcard(TopicName) of
false ->
gen_server:call(Pid, {register, ClientId, TopicName});
%% TopicId: in case of accepted the value that will be used as topic
%% id by the gateway when sending PUBLISH messages to the client (not
%% relevant in case of subscriptions to a short topic name or to a topic
%% name which contains wildcard characters)
true -> {error, wildcard_topic}
end.
-spec lookup_topic(registry(), emqx_types:clientid(), pos_integer())
-> undefined
| binary().
lookup_topic({Tab, _}, ClientId, TopicId) when is_integer(TopicId) ->
case lookup_element(Tab, {predef, TopicId}, 3) of
undefined ->
lookup_element(Tab, {ClientId, TopicId}, 3);
Topic -> Topic
end.
-spec lookup_topic_id(registry(), emqx_types:clientid(), emqx_types:topic())
-> undefined
| pos_integer()
| {predef, integer()}.
lookup_topic_id({Tab, _}, ClientId, TopicName) when is_binary(TopicName) ->
case lookup_element(Tab, {predef, TopicName}, 3) of
undefined ->
lookup_element(Tab, {ClientId, TopicName}, 3);
TopicId ->
{predef, TopicId}
end.
%% @private
lookup_element(Tab, Key, Pos) ->
try ets:lookup_element(Tab, Key, Pos) catch error:badarg -> undefined end.
-spec unregister_topic(registry(), emqx_types:clientid()) -> ok.
unregister_topic({_, Pid}, ClientId) ->
gen_server:call(Pid, {unregister, ClientId}).
lookup_name(Pid) ->
gen_server:call(Pid, name).
%%-----------------------------------------------------------------------------
name(InstaId) ->
list_to_atom(lists:concat([emqx_sn_, InstaId, '_registry'])).
init([InstaId, PredefTopics]) ->
%% {predef, TopicId} -> TopicName
%% {predef, TopicName} -> TopicId
%% {ClientId, TopicId} -> TopicName
%% {ClientId, TopicName} -> TopicId
Tab = name(InstaId),
ok = ekka_mnesia:create_table(Tab, [
{ram_copies, [node()]},
{record_name, emqx_sn_registry},
{attributes, record_info(fields, emqx_sn_registry)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}
]),
ok = ekka_mnesia:copy_table(Tab, ram_copies),
% FIXME:
%ok = ekka_rlog:wait_for_shards([?CM_SHARD], infinity),
MaxPredefId = lists:foldl(
fun(#{id := TopicId, topic := TopicName}, AccId) ->
ekka_mnesia:dirty_write(Tab, #emqx_sn_registry{
key = {predef, TopicId},
value = TopicName}
),
ekka_mnesia:dirty_write(Tab, #emqx_sn_registry{
key = {predef, TopicName},
value = TopicId}
),
if TopicId > AccId -> TopicId; true -> AccId end
end, 0, PredefTopics),
{ok, #state{tabname = Tab, max_predef_topic_id = MaxPredefId}}.
handle_call({register, ClientId, TopicName}, _From,
State = #state{tabname = Tab, max_predef_topic_id = PredefId}) ->
case lookup_topic_id({Tab, self()}, ClientId, TopicName) of
{predef, PredefTopicId} when is_integer(PredefTopicId) ->
{reply, PredefTopicId, State};
TopicId when is_integer(TopicId) ->
{reply, TopicId, State};
undefined ->
case next_topic_id(Tab, PredefId, ClientId) of
TopicId when TopicId >= 16#FFFF ->
{reply, {error, too_large}, State};
TopicId ->
Fun = fun() ->
mnesia:write(Tab, #emqx_sn_registry{
key = {ClientId, next_topic_id},
value = TopicId + 1}, write),
mnesia:write(Tab, #emqx_sn_registry{
key = {ClientId, TopicName},
value = TopicId}, write),
mnesia:write(Tab, #emqx_sn_registry{
key = {ClientId, TopicId},
value = TopicName}, write)
end,
case ekka_mnesia:transaction(?SN_SHARD, Fun) of
{atomic, ok} ->
{reply, TopicId, State};
{aborted, Error} ->
{reply, {error, Error}, State}
end
end
end;
handle_call({unregister, ClientId}, _From, State = #state{tabname = Tab}) ->
Registry = mnesia:dirty_match_object(
Tab,
{emqx_sn_registry, {ClientId, '_'}, '_'}
),
lists:foreach(fun(R) ->
ekka_mnesia:dirty_delete_object(Tab, R)
end, Registry),
{reply, ok, State};
handle_call(name, _From, State = #state{tabname = Tab}) ->
{reply, {Tab, self()}, State};
handle_call(Req, _From, State) ->
?LOG(error, "Unexpected request: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?LOG(error, "Unexpected msg: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%-----------------------------------------------------------------------------
next_topic_id(Tab, PredefId, ClientId) ->
case mnesia:dirty_read(Tab, {ClientId, next_topic_id}) of
[#emqx_sn_registry{value = Id}] -> Id;
[] -> PredefId + 1
end.

View File

@ -33,11 +33,9 @@ init([{_Ip, Port}, GwId, PredefTopics]) ->
type => worker,
modules => [emqx_sn_broadcast]},
Registry = #{id => emqx_sn_registry,
start => {emqx_sn_registry, start_link, [PredefTopics]},
restart => permanent,
shutdown => brutal_kill,
type => worker,
modules => [emqx_sn_registry]},
start => {emqx_sn_registry, start_link, [PredefTopics]},
restart => permanent,
shutdown => brutal_kill,
type => worker,
modules => [emqx_sn_registry]},
{ok, {{one_for_one, 10, 3600}, [Broadcast, Registry]}}.

View File

@ -71,13 +71,12 @@ on_insta_create(_Insta = #{ id := InstaId,
%% FIXME: Assign ctx to InstaState
{ok, ListenerPids, _InstaState = #{ctx => Ctx}}.
%% @private
on_insta_update(NewInsta, OldInstace, GwInstaState = #{ctx := Ctx}, GwState) ->
on_insta_update(NewInsta, OldInsta, GwInstaState = #{ctx := Ctx}, GwState) ->
InstaId = maps:get(id, NewInsta),
try
%% XXX: 1. How hot-upgrade the changes ???
%% XXX: 2. Check the New confs first before destroy old instance ???
on_insta_destroy(OldInstace, GwInstaState, GwState),
on_insta_destroy(OldInsta, GwInstaState, GwState),
on_insta_create(NewInsta, Ctx, GwState)
catch
Class : Reason : Stk ->
@ -100,15 +99,16 @@ on_insta_destroy(_Insta = #{ id := InstaId,
%%--------------------------------------------------------------------
start_listener(InstaId, Ctx, {Type, ListenOn, SocketOpts, Cfg}) ->
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) of
{ok, Pid} ->
io:format("Start stomp ~s:~s listener on ~s successfully.~n",
[InstaId, Type, format(ListenOn)]),
[InstaId, Type, ListenOnStr]),
Pid;
{error, Reason} ->
io:format(standard_error,
"Failed to start stomp ~s:~s listener on ~s: ~0p~n",
[InstaId, Type, format(ListenOn), Reason]),
[InstaId, Type, ListenOnStr, Reason]),
throw({badconf, Reason})
end.
@ -128,22 +128,16 @@ merge_default(Options) ->
[{tcp_options, ?TCP_OPTS} | Options]
end.
format(Port) when is_integer(Port) ->
io_lib:format("0.0.0.0:~w", [Port]);
format({Addr, Port}) when is_list(Addr) ->
io_lib:format("~s:~w", [Addr, Port]);
format({Addr, Port}) when is_tuple(Addr) ->
io_lib:format("~s:~w", [inet:ntoa(Addr), Port]).
stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(InstaId, Type, ListenOn, SocketOpts, Cfg),
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case StopRet of
ok -> io:format("Stop stomp ~s:~s listener on ~s successfully.~n",
[InstaId, Type, format(ListenOn)]);
[InstaId, Type, ListenOnStr]);
{error, Reason} ->
io:format(standard_error,
"Failed to stop stomp ~s:~s listener on ~s: ~0p~n",
[InstaId, Type, format(ListenOn), Reason]
[InstaId, Type, ListenOnStr, Reason]
)
end,
StopRet.

View File

@ -19,7 +19,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx_sn/include/emqx_sn.hrl").
-include_lib("emqx_gateway/src/mqttsn/include/emqx_sn.hrl").
-include_lib("eunit/include/eunit.hrl").
-import(emqx_sn_frame, [ parse/1

View File

@ -19,7 +19,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx_sn/include/emqx_sn.hrl").
-include_lib("emqx_gateway/src/mqttsn/include/emqx_sn.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
@ -59,23 +59,41 @@ all() ->
init_per_suite(Config) ->
logger:set_module_level(emqx_sn_gateway, debug),
emqx_ct_helpers:start_apps([emqx_sn], fun set_special_confs/1),
emqx_ct_helpers:start_apps([emqx_gateway], fun set_special_confs/1),
Config.
end_per_suite(_) ->
emqx_ct_helpers:stop_apps([emqx_sn]).
emqx_ct_helpers:stop_apps([emqx_gateway]).
set_special_confs(emqx) ->
application:set_env(emqx, plugins_loaded_file,
emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins"));
set_special_confs(emqx_sn) ->
application:set_env(emqx_sn, enable_qos3, ?ENABLE_QOS3),
application:set_env(emqx_sn, enable_stats, true),
application:set_env(emqx_sn, username, <<"user1">>),
application:set_env(emqx_sn, password, <<"pw123">>),
application:set_env(emqx_sn, predefined,
[{?PREDEF_TOPIC_ID1, ?PREDEF_TOPIC_NAME1},
{?PREDEF_TOPIC_ID2, ?PREDEF_TOPIC_NAME2}]);
set_special_confs(emqx_gateway) ->
emqx_config:put(
[emqx_gateway],
#{ mqttsn =>
#{'1' =>
#{broadcast => true,
clientinfo_override =>
#{password => "pw123",
username => "user1"
},
enable_qos3 => true,
enable_stats => true,
gateway_id => 1,
idle_timeout => 30000,
listener =>
#{udp =>
#{'1' =>
#{acceptors => 8,active_n => 100,backlog => 1024,bind => 1884,
high_watermark => 1048576,max_conn_rate => 1000,
max_connections => 10240000,send_timeout => 15000,
send_timeout_close => true}}},
predefined =>
[#{id => ?PREDEF_TOPIC_ID1, topic => ?PREDEF_TOPIC_NAME1},
#{id => ?PREDEF_TOPIC_ID2, topic => ?PREDEF_TOPIC_NAME2}]}}
});
set_special_confs(_App) ->
ok.
@ -87,7 +105,7 @@ set_special_confs(_App) ->
%% Connect
t_connect(_) ->
SockName = {'mqttsn:udp', {{0,0,0,0}, 1884}},
SockName = {'mqttsn#1:udp', 1884},
?assertEqual(true, lists:keymember(SockName, 1, esockd:listeners())),
{ok, Socket} = gen_udp:open(0, [binary]),

View File

@ -0,0 +1,127 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_sn_registry_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-define(REGISTRY, emqx_sn_registry).
-define(MAX_PREDEF_ID, 2).
-define(PREDEF_TOPICS, [#{id => 1, topic => <<"/predefined/topic/name/hello">>},
#{id => 2, topic => <<"/predefined/topic/name/nice">>}]).
-define(INSTA_ID, 'mqttsn#1').
%%--------------------------------------------------------------------
%% Setups
%%--------------------------------------------------------------------
all() ->
emqx_ct:all(?MODULE).
init_per_suite(Config) ->
application:ensure_all_started(ekka),
ekka_mnesia:start(),
Config.
end_per_suite(_Config) ->
application:stop(ekka),
ok.
init_per_testcase(_TestCase, Config) ->
{ok, Pid} = ?REGISTRY:start_link(?INSTA_ID, ?PREDEF_TOPICS),
{Tab, Pid} = ?REGISTRY:lookup_name(Pid),
[{reg, {Tab, Pid}} | Config].
end_per_testcase(_TestCase, Config) ->
{Tab, _Pid} = proplists:get_value(reg, Config),
ekka_mnesia:clear_table(Tab),
Config.
%%--------------------------------------------------------------------
%% Test cases
%%--------------------------------------------------------------------
t_register(Config) ->
Reg = proplists:get_value(reg, Config),
?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"Topic1">>)),
?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"Topic2">>)),
?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID+1)),
?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID+2)),
?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic1">>)),
?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic2">>)),
emqx_sn_registry:unregister_topic(Reg, <<"ClientId">>),
?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID+1)),
?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID+2)),
?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic1">>)),
?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic2">>)).
t_register_case2(Config) ->
Reg = proplists:get_value(reg, Config),
?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"Topic1">>)),
?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"Topic2">>)),
?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"Topic1">>)),
?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID+1)),
?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID+2)),
?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic1">>)),
?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic2">>)),
?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic3">>)),
?REGISTRY:unregister_topic(Reg, <<"ClientId">>),
?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID+1)),
?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID+2)),
?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic1">>)),
?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic2">>)).
t_reach_maximum(Config) ->
Reg = proplists:get_value(reg, Config),
register_a_lot(?MAX_PREDEF_ID+1, 16#ffff, Reg),
?assertEqual({error, too_large}, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"TopicABC">>)),
Topic1 = iolist_to_binary(io_lib:format("Topic~p", [?MAX_PREDEF_ID+1])),
Topic2 = iolist_to_binary(io_lib:format("Topic~p", [?MAX_PREDEF_ID+2])),
?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, Topic1)),
?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, Topic2)),
?REGISTRY:unregister_topic(Reg, <<"ClientId">>),
?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID+1)),
?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID+2)),
?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, Topic1)),
?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, Topic2)).
t_register_case4(Config) ->
Reg = proplists:get_value(reg, Config),
?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"TopicA">>)),
?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"TopicB">>)),
?assertEqual(?MAX_PREDEF_ID+3, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"TopicC">>)),
?REGISTRY:unregister_topic(Reg, <<"ClientId">>),
?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"TopicD">>)).
t_deny_wildcard_topic(Config) ->
Reg = proplists:get_value(reg, Config),
?assertEqual({error, wildcard_topic}, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"/TopicA/#">>)),
?assertEqual({error, wildcard_topic}, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"/+/TopicB">>)).
%%--------------------------------------------------------------------
%% Helper funcs
%%--------------------------------------------------------------------
register_a_lot(Max, Max, _Reg) ->
ok;
register_a_lot(N, Max, Reg) when N < Max ->
Topic = iolist_to_binary(["Topic", integer_to_list(N)]),
?assertEqual(N, ?REGISTRY:register_topic(Reg, <<"ClientId">>, Topic)),
register_a_lot(N+1, Max, Reg).

View File

@ -16,7 +16,7 @@
-module(emqx_sn_proper_types).
-include_lib("emqx_sn/include/emqx_sn.hrl").
-include_lib("emqx_gateway/src/mqttsn/include/emqx_sn.hrl").
-include_lib("proper/include/proper.hrl").
-compile({no_auto_import, [register/1]}).

View File

@ -16,7 +16,7 @@
-module(prop_emqx_sn_frame).
-include_lib("emqx_sn/include/emqx_sn.hrl").
-include_lib("src/mqttsn/include/emqx_sn.hrl").
-include_lib("proper/include/proper.hrl").
-compile({no_auto_import, [register/1]}).

View File

@ -1,40 +0,0 @@
.eunit
deps
*.o
*.beam
*.plt
erl_crash.dump
ebin
rel/example_project
.concrete/DEV_MODE
.rebar
_rel/
emqx_sn.d
logs/
.erlang.mk/
data/
.idea/
*.iml
*.d
_build/
.rebar3
rebar3.crashdump
.DS_Store
bbmustache/
etc/gen.emqx.conf
cuttlefish
rebar.lock
xrefr
intergration_test/emqx-rel/
intergration_test/paho.mqtt-sn.embedded-c/
intergration_test/client/*.exe
intergration_test/client/*.txt
.DS_Store
cover/
ct.coverdata
eunit.coverdata
test/ct.cover.spec
erlang.mk
etc/emqx_sn.conf.rendered
.rebar3/
*.swp

View File

@ -1,126 +0,0 @@
-module(simple_example).
-include("emqx_sn.hrl").
-define(HOST, {127,0,0,1}).
-define(PORT, 1884).
-export([start/0]).
start() ->
io:format("start to connect ~p:~p~n", [?HOST, ?PORT]),
%% create udp socket
{ok, Socket} = gen_udp:open(0, [binary]),
%% connect to emqx_sn broker
Packet = gen_connect_packet(<<"client1">>),
ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet),
io:format("send connect packet=~p~n", [Packet]),
%% receive message
wait_response(),
%% register topic_id
RegisterPacket = gen_register_packet(<<"TopicA">>, 0),
ok = gen_udp:send(Socket, ?HOST, ?PORT, RegisterPacket),
io:format("send register packet=~p~n", [RegisterPacket]),
TopicId = wait_response(),
%% subscribe
SubscribePacket = gen_subscribe_packet(TopicId),
ok = gen_udp:send(Socket, ?HOST, ?PORT, SubscribePacket),
io:format("send subscribe packet=~p~n", [SubscribePacket]),
wait_response(),
%% publish
PublishPacket = gen_publish_packet(TopicId, <<"Payload...">>),
ok = gen_udp:send(Socket, ?HOST, ?PORT, PublishPacket),
io:format("send publish packet=~p~n", [PublishPacket]),
wait_response(),
% wait for subscribed message from broker
wait_response(),
%% disconnect from emqx_sn broker
DisConnectPacket = gen_disconnect_packet(),
ok = gen_udp:send(Socket, ?HOST, ?PORT, DisConnectPacket),
io:format("send disconnect packet=~p~n", [DisConnectPacket]).
gen_connect_packet(ClientId) ->
Length = 6+byte_size(ClientId),
MsgType = ?SN_CONNECT,
Dup = 0,
QoS = 0,
Retain = 0,
Will = 0,
CleanSession = 1,
TopicIdType = 0,
Flag = <<Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2>>,
ProtocolId = 1,
Duration = 10,
<<Length:8, MsgType:8, Flag/binary, ProtocolId:8, Duration:16, ClientId/binary>>.
gen_subscribe_packet(TopicId) ->
Length = 7,
MsgType = ?SN_SUBSCRIBE,
Dup = 0,
Retain = 0,
Will = 0,
QoS = 1,
CleanSession = 0,
TopicIdType = 1,
Flag = <<Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2>>,
MsgId = 1,
<<Length:8, MsgType:8, Flag/binary, MsgId:16, TopicId:16>>.
gen_register_packet(Topic, TopicId) ->
Length = 6+byte_size(Topic),
MsgType = ?SN_REGISTER,
MsgId = 1,
<<Length:8, MsgType:8, TopicId:16, MsgId:16, Topic/binary>>.
gen_publish_packet(TopicId, Payload) ->
Length = 7+byte_size(Payload),
MsgType = ?SN_PUBLISH,
Dup = 0,
QoS = 1,
Retain = 0,
Will = 0,
CleanSession = 0,
MsgId = 1,
TopicIdType = 1,
Flag = <<Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2>>,
<<Length:8, MsgType:8, Flag/binary, TopicId:16, MsgId:16, Payload/binary>>.
gen_disconnect_packet()->
Length = 2,
MsgType = ?SN_DISCONNECT,
<<Length:8, MsgType:8>>.
wait_response() ->
receive
{udp, _Socket, _, _, Bin} ->
case Bin of
<<_Len:8, ?SN_PUBLISH, _Flag:8, TopicId:16, MsgId:16, Data/binary>> ->
io:format("recv publish TopicId: ~p, MsgId: ~p, Data: ~p~n", [TopicId, MsgId, Data]);
<<_Len:8, ?SN_CONNACK, 0:8>> ->
io:format("recv connect ack~n");
<<_Len:8, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>> ->
io:format("recv regack TopicId=~p, MsgId=~p~n", [TopicId, MsgId]),
TopicId;
<<_Len:8, ?SN_SUBACK, Flags:8, TopicId:16, MsgId:16, 0:8>> ->
io:format("recv suback Flags=~p TopicId=~p, MsgId=~p~n", [Flags, TopicId, MsgId]);
<<_Len:8, ?SN_PUBACK, TopicId:16, MsgId:16, 0:8>> ->
io:format("recv puback TopicId=~p, MsgId=~p~n", [TopicId, MsgId]);
_ ->
io:format("ignore bin=~p~n", [Bin])
end;
Any ->
io:format("recv something else from udp socket ~p~n", [Any])
after
2000 ->
io:format("Error: receive timeout!~n"),
wait_response()
end.

View File

@ -1,120 +0,0 @@
-module(simple_example2).
-include("emqx_sn.hrl").
-define(HOST, "localhost").
-define(PORT, 1884).
-export([start/0]).
start() ->
io:format("start to connect ~p:~p~n", [?HOST, ?PORT]),
%% create udp socket
{ok, Socket} = gen_udp:open(0, [binary]),
%% connect to emqx_sn broker
Packet = gen_connect_packet(<<"client1">>),
ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet),
io:format("send connect packet=~p~n", [Packet]),
%% receive message
wait_response(),
%% subscribe, SHORT TOPIC NAME
SubscribePacket = gen_subscribe_packet(<<"T1">>),
ok = gen_udp:send(Socket, ?HOST, ?PORT, SubscribePacket),
io:format("send subscribe packet=~p~n", [SubscribePacket]),
wait_response(),
%% publish, SHORT TOPIC NAME
PublishPacket = gen_publish_packet(<<"T1">>, <<"Payload...">>),
ok = gen_udp:send(Socket, ?HOST, ?PORT, PublishPacket),
io:format("send publish packet=~p~n", [PublishPacket]),
wait_response(),
% wait for subscribed message from broker
wait_response(),
%% disconnect from emqx_sn broker
DisConnectPacket = gen_disconnect_packet(),
ok = gen_udp:send(Socket, ?HOST, ?PORT, DisConnectPacket),
io:format("send disconnect packet=~p~n", [DisConnectPacket]).
gen_connect_packet(ClientId) ->
Length = 6+byte_size(ClientId),
MsgType = ?SN_CONNECT,
Dup = 0,
QoS = 0,
Retain = 0,
Will = 0,
CleanSession = 1,
TopicIdType = 0,
Flag = <<Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2>>,
ProtocolId = 1,
Duration = 10,
<<Length:8, MsgType:8, Flag/binary, ProtocolId:8, Duration:16, ClientId/binary>>.
gen_subscribe_packet(ShortTopic) ->
Length = 7,
MsgType = ?SN_SUBSCRIBE,
Dup = 0,
Retain = 0,
Will = 0,
QoS = 1,
CleanSession = 0,
TopicIdType = 2, % SHORT TOPIC NAME
Flag = <<Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2>>,
MsgId = 1,
<<Length:8, MsgType:8, Flag/binary, MsgId:16, ShortTopic/binary>>.
gen_register_packet(Topic, TopicId) ->
Length = 6+byte_size(Topic),
MsgType = ?SN_REGISTER,
MsgId = 1,
<<Length:8, MsgType:8, TopicId:16, MsgId:16, Topic/binary>>.
gen_publish_packet(ShortTopic, Payload) ->
Length = 7+byte_size(Payload),
MsgType = ?SN_PUBLISH,
Dup = 0,
QoS = 1,
Retain = 0,
Will = 0,
CleanSession = 0,
MsgId = 1,
TopicIdType = 2, % SHORT TOPIC NAME
Flag = <<Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2>>,
<<Length:8, MsgType:8, Flag/binary, ShortTopic/binary, MsgId:16, Payload/binary>>.
gen_disconnect_packet()->
Length = 2,
MsgType = ?SN_DISCONNECT,
<<Length:8, MsgType:8>>.
wait_response() ->
receive
{udp, _Socket, _, _, Bin} ->
case Bin of
<<_Len:8, ?SN_PUBLISH, _Flag:8, TopicId:16, MsgId:16, Data/binary>> ->
io:format("recv publish TopicId: ~p, MsgId: ~p, Data: ~p~n", [TopicId, MsgId, Data]);
<<_Len:8, ?SN_CONNACK, 0:8>> ->
io:format("recv connect ack~n");
<<_Len:8, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>> ->
io:format("recv regack TopicId=~p, MsgId=~p~n", [TopicId, MsgId]),
TopicId;
<<_Len:8, ?SN_SUBACK, Flags:8, TopicId:16, MsgId:16, 0:8>> ->
io:format("recv suback Flags=~p TopicId=~p, MsgId=~p~n", [Flags, TopicId, MsgId]);
<<_Len:8, ?SN_PUBACK, TopicId:16, MsgId:16, 0:8>> ->
io:format("recv puback TopicId=~p, MsgId=~p~n", [TopicId, MsgId]);
_ ->
io:format("ignore bin=~p~n", [Bin])
end;
Any ->
io:format("recv something else from udp socket ~p~n", [Any])
after
2000 ->
io:format("Error: receive timeout!~n"),
wait_response()
end.

View File

@ -1,120 +0,0 @@
-module(simple_example3).
-include("emqx_sn.hrl").
-define(HOST, "localhost").
-define(PORT, 1884).
-export([start/0]).
start() ->
io:format("start to connect ~p:~p~n", [?HOST, ?PORT]),
%% create udp socket
{ok, Socket} = gen_udp:open(0, [binary]),
%% connect to emqx_sn broker
Packet = gen_connect_packet(<<"client1">>),
ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet),
io:format("send connect packet=~p~n", [Packet]),
%% receive message
wait_response(),
%% subscribe normal topic name
SubscribePacket = gen_subscribe_packet(<<"T3">>),
ok = gen_udp:send(Socket, ?HOST, ?PORT, SubscribePacket),
io:format("send subscribe packet=~p~n", [SubscribePacket]),
wait_response(),
%% publish SHORT TOPIC NAME
PublishPacket = gen_publish_packet(<<"T3">>, <<"Payload...">>),
ok = gen_udp:send(Socket, ?HOST, ?PORT, PublishPacket),
io:format("send publish packet=~p~n", [PublishPacket]),
wait_response(),
% wait for subscribed message from broker
wait_response(),
%% disconnect from emqx_sn broker
DisConnectPacket = gen_disconnect_packet(),
ok = gen_udp:send(Socket, ?HOST, ?PORT, DisConnectPacket),
io:format("send disconnect packet=~p~n", [DisConnectPacket]).
gen_connect_packet(ClientId) ->
Length = 6+byte_size(ClientId),
MsgType = ?SN_CONNECT,
Dup = 0,
QoS = 0,
Retain = 0,
Will = 0,
CleanSession = 1,
TopicIdType = 0,
Flag = <<Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2>>,
ProtocolId = 1,
Duration = 10,
<<Length:8, MsgType:8, Flag/binary, ProtocolId:8, Duration:16, ClientId/binary>>.
gen_subscribe_packet(ShortTopic) ->
Length = 7,
MsgType = ?SN_SUBSCRIBE,
Dup = 0,
Retain = 0,
Will = 0,
QoS = 1,
CleanSession = 0,
TopicIdType = 0, % normal topic name
Flag = <<Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2>>,
MsgId = 1,
<<Length:8, MsgType:8, Flag/binary, MsgId:16, ShortTopic/binary>>.
gen_register_packet(Topic, TopicId) ->
Length = 6+byte_size(Topic),
MsgType = ?SN_REGISTER,
MsgId = 1,
<<Length:8, MsgType:8, TopicId:16, MsgId:16, Topic/binary>>.
gen_publish_packet(ShortTopic, Payload) ->
Length = 7+byte_size(Payload),
MsgType = ?SN_PUBLISH,
Dup = 0,
QoS = 1,
Retain = 0,
Will = 0,
CleanSession = 0,
MsgId = 1,
TopicIdType = 2, % SHORT TOPIC NAME
Flag = <<Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2>>,
<<Length:8, MsgType:8, Flag/binary, ShortTopic/binary, MsgId:16, Payload/binary>>.
gen_disconnect_packet()->
Length = 2,
MsgType = ?SN_DISCONNECT,
<<Length:8, MsgType:8>>.
wait_response() ->
receive
{udp, _Socket, _, _, Bin} ->
case Bin of
<<_Len:8, ?SN_PUBLISH, _Flag:8, TopicId:16, MsgId:16, Data/binary>> ->
io:format("recv publish TopicId: ~p, MsgId: ~p, Data: ~p~n", [TopicId, MsgId, Data]);
<<_Len:8, ?SN_CONNACK, 0:8>> ->
io:format("recv connect ack~n");
<<_Len:8, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>> ->
io:format("recv regack TopicId=~p, MsgId=~p~n", [TopicId, MsgId]),
TopicId;
<<_Len:8, ?SN_SUBACK, Flags:8, TopicId:16, MsgId:16, 0:8>> ->
io:format("recv suback Flags=~p TopicId=~p, MsgId=~p~n", [Flags, TopicId, MsgId]);
<<_Len:8, ?SN_PUBACK, TopicId:16, MsgId:16, 0:8>> ->
io:format("recv puback TopicId=~p, MsgId=~p~n", [TopicId, MsgId]);
_ ->
io:format("ignore bin=~p~n", [Bin])
end;
Any ->
io:format("recv something else from udp socket ~p~n", [Any])
after
2000 ->
io:format("Error: receive timeout!~n"),
wait_response()
end.

View File

@ -1,151 +0,0 @@
-module(simple_example4).
-include("emqx_sn.hrl").
-define(HOST, {127,0,0,1}).
-define(PORT, 1884).
-export([start/0]).
start(LoopTimes) ->
io:format("start to connect ~p:~p~n", [?HOST, ?PORT]),
%% create udp socket
{ok, Socket} = gen_udp:open(0, [binary]),
%% connect to emqx_sn broker
Packet = gen_connect_packet(<<"client1">>),
ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet),
io:format("send connect packet=~p~n", [Packet]),
%% receive message
wait_response(),
%% register topic_id
RegisterPacket = gen_register_packet(<<"TopicA">>, 0),
ok = gen_udp:send(Socket, ?HOST, ?PORT, RegisterPacket),
io:format("send register packet=~p~n", [RegisterPacket]),
TopicId = wait_response(),
%% subscribe
SubscribePacket = gen_subscribe_packet(TopicId),
ok = gen_udp:send(Socket, ?HOST, ?PORT, SubscribePacket),
io:format("send subscribe packet=~p~n", [SubscribePacket]),
wait_response(),
%% loop publish
[begin
timer:sleep(1000),
io:format("~n-------------------- publish ~p start --------------------~n", [N]),
PublishPacket = gen_publish_packet(TopicId, <<"Payload...">>),
ok = gen_udp:send(Socket, ?HOST, ?PORT, PublishPacket),
io:format("send publish packet=~p~n", [PublishPacket]),
% wait for publish ack
wait_response(),
% wait for subscribed message from broker
wait_response(),
PingReqPacket = gen_pingreq_packet(),
ok = gen_udp:send(Socket, ?HOST, ?PORT, PingReqPacket),
% wait for pingresp
wait_response(),
io:format("--------------------- publish ~p end ---------------------~n", [N])
end || N <- lists:seq(1, LoopTimes)],
%% disconnect from emqx_sn broker
DisConnectPacket = gen_disconnect_packet(),
ok = gen_udp:send(Socket, ?HOST, ?PORT, DisConnectPacket),
io:format("send disconnect packet=~p~n", [DisConnectPacket]).
gen_connect_packet(ClientId) ->
Length = 6+byte_size(ClientId),
MsgType = ?SN_CONNECT,
Dup = 0,
QoS = 0,
Retain = 0,
Will = 0,
CleanSession = 1,
TopicIdType = 0,
Flag = <<Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2>>,
ProtocolId = 1,
Duration = 10,
<<Length:8, MsgType:8, Flag/binary, ProtocolId:8, Duration:16, ClientId/binary>>.
gen_subscribe_packet(TopicId) ->
Length = 7,
MsgType = ?SN_SUBSCRIBE,
Dup = 0,
Retain = 0,
Will = 0,
QoS = 1,
CleanSession = 0,
TopicIdType = 1,
Flag = <<Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2>>,
MsgId = 1,
<<Length:8, MsgType:8, Flag/binary, MsgId:16, TopicId:16>>.
gen_register_packet(Topic, TopicId) ->
Length = 6+byte_size(Topic),
MsgType = ?SN_REGISTER,
MsgId = 1,
<<Length:8, MsgType:8, TopicId:16, MsgId:16, Topic/binary>>.
gen_publish_packet(TopicId, Payload) ->
Length = 7+byte_size(Payload),
MsgType = ?SN_PUBLISH,
Dup = 0,
QoS = 1,
Retain = 0,
Will = 0,
CleanSession = 0,
MsgId = 1,
TopicIdType = 1,
Flag = <<Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2>>,
<<Length:8, MsgType:8, Flag/binary, TopicId:16, MsgId:16, Payload/binary>>.
gen_puback_packet(TopicId, MsgId) ->
Length = 7,
MsgType = ?SN_PUBACK,
<<Length:8, MsgType:8, TopicId:16, MsgId:16, 0:8>>.
gen_pingreq_packet() ->
Length = 2,
MsgType = ?SN_PINGREQ,
<<Length:8, MsgType:8>>.
gen_disconnect_packet()->
Length = 2,
MsgType = ?SN_DISCONNECT,
<<Length:8, MsgType:8>>.
wait_response() ->
receive
{udp, Socket, _, _, Bin} ->
case Bin of
<<_Len:8, ?SN_PUBLISH, _Flag:8, TopicId:16, MsgId:16, Data/binary>> ->
io:format("recv publish TopicId: ~p, MsgId: ~p, Data: ~p~n", [TopicId, MsgId, Data]),
ok = gen_udp:send(Socket, ?HOST, ?PORT, gen_puback_packet(TopicId, MsgId));
<<_Len:8, ?SN_CONNACK, 0:8>> ->
io:format("recv connect ack~n");
<<_Len:8, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>> ->
io:format("recv regack TopicId=~p, MsgId=~p~n", [TopicId, MsgId]),
TopicId;
<<_Len:8, ?SN_SUBACK, Flags:8, TopicId:16, MsgId:16, 0:8>> ->
io:format("recv suback Flags=~p TopicId=~p, MsgId=~p~n", [Flags, TopicId, MsgId]);
<<_Len:8, ?SN_PUBACK, TopicId:16, MsgId:16, 0:8>> ->
io:format("recv puback TopicId=~p, MsgId=~p~n", [TopicId, MsgId]);
<<_Len:8, ?SN_PINGRESP>> ->
io:format("recv pingresp~n");
_ ->
io:format("ignore bin=~p~n", [Bin])
end;
Any ->
io:format("recv something else from udp socket ~p~n", [Any])
after
2000 ->
io:format("Error: receive timeout!~n"),
wait_response()
end.

View File

@ -1,26 +0,0 @@
{deps, []}.
{plugins, [rebar3_proper]}.
{deps,
[{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.4"}}}
]}.
{edoc_opts, [{preprocess, true}]}.
{erl_opts, [warn_unused_vars,
warn_shadow_vars,
warn_unused_import,
warn_obsolete_guard,
debug_info,
{parse_transform}]}.
{dialyzer, [{warnings, [unmatched_returns, error_handling, race_conditions]}
]}.
{xref_checks, [undefined_function_calls, undefined_functions,
locals_not_used, deprecated_function_calls,
warnings_as_errors, deprecated_functions]}.
{cover_enabled, true}.
{cover_opts, [verbose]}.
{cover_export_enabled, true}.
{plugins, [coveralls]}.

View File

@ -1,14 +0,0 @@
{application, emqx_sn,
[{description, "EMQ X MQTT-SN Plugin"},
{vsn, "4.4.0"}, % strict semver, bump manually!
{modules, []},
{registered, []},
{applications, [kernel,stdlib,esockd]},
{mod, {emqx_sn_app,[]}},
{env, []},
{licenses, ["Apache-2.0"]},
{maintainers, ["EMQ X Team <contact@emqx.io>"]},
{links, [{"Homepage", "https://emqx.io/"},
{"Github", "https://github.com/emqx/emqx-sn"}
]}
]}.

View File

@ -1,19 +0,0 @@
%% -*-: erlang -*-
{VSN,
[
{"4.3.2", [
{load_module, emqx_sn_gateway, brutal_purge, soft_purge, []}
]},
{<<"4.3.[0-1]">>, [
{restart_application, emqx_sn}
]}
],
[
{"4.3.2", [
{load_module, emqx_sn_gateway, brutal_purge, soft_purge, []}
]},
{<<"4.3.[0-1]">>, [
{restart_application, emqx_sn}
]}
]
}.

View File

@ -1,202 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_sn_registry).
-behaviour(gen_server).
-include("emqx_sn.hrl").
-define(LOG(Level, Format, Args),
emqx_logger:Level("MQTT-SN(registry): " ++ Format, Args)).
-export([ start_link/1
, stop/0
]).
-export([ register_topic/2
, unregister_topic/1
]).
-export([ lookup_topic/2
, lookup_topic_id/2
]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-define(SN_SHARD, emqx_sn_shard).
-define(TAB, ?MODULE).
-record(state, {max_predef_topic_id = 0}).
-record(emqx_sn_registry, {key, value}).
%% Mnesia bootstrap
-export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
-rlog_shard({?SN_SHARD, ?TAB}).
%% @doc Create or replicate tables.
-spec(mnesia(boot | copy) -> ok).
mnesia(boot) ->
%% Optimize storage
StoreProps = [{ets, [{read_concurrency, true}]}],
ok = ekka_mnesia:create_table(?MODULE, [
{attributes, record_info(fields, emqx_sn_registry)},
{ram_copies, [node()]},
{storage_properties, StoreProps}]);
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?MODULE, ram_copies).
%%-----------------------------------------------------------------------------
-spec(start_link(list()) -> {ok, pid()} | ignore | {error, Reason :: term()}).
start_link(PredefTopics) ->
ekka_rlog:wait_for_shards([?SN_SHARD], infinity),
gen_server:start_link({local, ?MODULE}, ?MODULE, [PredefTopics], []).
-spec(stop() -> ok).
stop() ->
gen_server:stop(?MODULE, normal, infinity).
-spec(register_topic(binary(), binary()) -> integer() | {error, term()}).
register_topic(ClientId, TopicName) when is_binary(TopicName) ->
case emqx_topic:wildcard(TopicName) of
false ->
gen_server:call(?MODULE, {register, ClientId, TopicName});
%% TopicId: in case of accepted the value that will be used as topic
%% id by the gateway when sending PUBLISH messages to the client (not
%% relevant in case of subscriptions to a short topic name or to a topic
%% name which contains wildcard characters)
true -> {error, wildcard_topic}
end.
-spec(lookup_topic(binary(), pos_integer()) -> undefined | binary()).
lookup_topic(ClientId, TopicId) when is_integer(TopicId) ->
case lookup_element(?TAB, {predef, TopicId}, 3) of
undefined ->
lookup_element(?TAB, {ClientId, TopicId}, 3);
Topic -> Topic
end.
-spec(lookup_topic_id(binary(), binary())
-> undefined
| pos_integer()
| {predef, integer()}).
lookup_topic_id(ClientId, TopicName) when is_binary(TopicName) ->
case lookup_element(?TAB, {predef, TopicName}, 3) of
undefined ->
lookup_element(?TAB, {ClientId, TopicName}, 3);
TopicId ->
{predef, TopicId}
end.
%% @private
lookup_element(Tab, Key, Pos) ->
try ets:lookup_element(Tab, Key, Pos) catch error:badarg -> undefined end.
-spec(unregister_topic(binary()) -> ok).
unregister_topic(ClientId) ->
gen_server:call(?MODULE, {unregister, ClientId}).
%%-----------------------------------------------------------------------------
init([PredefTopics]) ->
%% {predef, TopicId} -> TopicName
%% {predef, TopicName} -> TopicId
%% {ClientId, TopicId} -> TopicName
%% {ClientId, TopicName} -> TopicId
MaxPredefId = lists:foldl(
fun({TopicId, TopicName}, AccId) ->
ekka_mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicId},
value = TopicName}),
ekka_mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicName},
value = TopicId}),
if TopicId > AccId -> TopicId; true -> AccId end
end, 0, PredefTopics),
{ok, #state{max_predef_topic_id = MaxPredefId}}.
handle_call({register, ClientId, TopicName}, _From,
State = #state{max_predef_topic_id = PredefId}) ->
case lookup_topic_id(ClientId, TopicName) of
{predef, PredefTopicId} when is_integer(PredefTopicId) ->
{reply, PredefTopicId, State};
TopicId when is_integer(TopicId) ->
{reply, TopicId, State};
undefined ->
case next_topic_id(?TAB, PredefId, ClientId) of
TopicId when TopicId >= 16#FFFF ->
{reply, {error, too_large}, State};
TopicId ->
Fun = fun() ->
mnesia:write(#emqx_sn_registry{key = {ClientId, next_topic_id},
value = TopicId + 1}),
mnesia:write(#emqx_sn_registry{key = {ClientId, TopicName},
value = TopicId}),
mnesia:write(#emqx_sn_registry{key = {ClientId, TopicId},
value = TopicName})
end,
case ekka_mnesia:transaction(?SN_SHARD, Fun) of
{atomic, ok} ->
{reply, TopicId, State};
{aborted, Error} ->
{reply, {error, Error}, State}
end
end
end;
handle_call({unregister, ClientId}, _From, State) ->
Registry = mnesia:dirty_match_object({?TAB, {ClientId, '_'}, '_'}),
lists:foreach(fun(R) -> ekka_mnesia:dirty_delete_object(?TAB, R) end, Registry),
{reply, ok, State};
handle_call(Req, _From, State) ->
?LOG(error, "Unexpected request: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?LOG(error, "Unexpected msg: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%-----------------------------------------------------------------------------
next_topic_id(Tab, PredefId, ClientId) ->
case mnesia:dirty_read(Tab, {ClientId, next_topic_id}) of
[#emqx_sn_registry{value = Id}] -> Id;
[] -> PredefId + 1
end.

View File

@ -1,121 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_sn_registry_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-define(REGISTRY, emqx_sn_registry).
-define(MAX_PREDEF_ID, 2).
-define(PREDEF_TOPICS, [{1, <<"/predefined/topic/name/hello">>},
{2, <<"/predefined/topic/name/nice">>}]).
%%--------------------------------------------------------------------
%% Setups
%%--------------------------------------------------------------------
all() ->
emqx_ct:all(?MODULE).
init_per_suite(Config) ->
_ = application:set_env(emqx_sn, predefined, ?PREDEF_TOPICS),
Config.
end_per_suite(_Config) ->
ok.
init_per_testcase(_TestCase, Config) ->
application:set_env(ekka, strict_mode, true),
ekka_mnesia:start(),
emqx_sn_registry:mnesia(boot),
ekka_mnesia:clear_table(emqx_sn_registry),
PredefTopics = application:get_env(emqx_sn, predefined, []),
{ok, _Pid} = ?REGISTRY:start_link(PredefTopics),
Config.
end_per_testcase(_TestCase, Config) ->
?REGISTRY:stop(),
Config.
%%--------------------------------------------------------------------
%% Test cases
%%--------------------------------------------------------------------
t_register(_Config) ->
?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(<<"ClientId">>, <<"Topic1">>)),
?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(<<"ClientId">>, <<"Topic2">>)),
?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+1)),
?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+2)),
?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic1">>)),
?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic2">>)),
emqx_sn_registry:unregister_topic(<<"ClientId">>),
?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+1)),
?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+2)),
?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic1">>)),
?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic2">>)).
t_register_case2(_Config) ->
?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(<<"ClientId">>, <<"Topic1">>)),
?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(<<"ClientId">>, <<"Topic2">>)),
?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(<<"ClientId">>, <<"Topic1">>)),
?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+1)),
?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+2)),
?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic1">>)),
?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic2">>)),
?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic3">>)),
?REGISTRY:unregister_topic(<<"ClientId">>),
?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+1)),
?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+2)),
?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic1">>)),
?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic2">>)).
t_reach_maximum(_Config) ->
register_a_lot(?MAX_PREDEF_ID+1, 16#ffff),
?assertEqual({error, too_large}, ?REGISTRY:register_topic(<<"ClientId">>, <<"TopicABC">>)),
Topic1 = iolist_to_binary(io_lib:format("Topic~p", [?MAX_PREDEF_ID+1])),
Topic2 = iolist_to_binary(io_lib:format("Topic~p", [?MAX_PREDEF_ID+2])),
?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(<<"ClientId">>, Topic1)),
?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(<<"ClientId">>, Topic2)),
?REGISTRY:unregister_topic(<<"ClientId">>),
?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+1)),
?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+2)),
?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, Topic1)),
?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, Topic2)).
t_register_case4(_Config) ->
?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(<<"ClientId">>, <<"TopicA">>)),
?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(<<"ClientId">>, <<"TopicB">>)),
?assertEqual(?MAX_PREDEF_ID+3, ?REGISTRY:register_topic(<<"ClientId">>, <<"TopicC">>)),
?REGISTRY:unregister_topic(<<"ClientId">>),
?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(<<"ClientId">>, <<"TopicD">>)).
t_deny_wildcard_topic(_Config) ->
?assertEqual({error, wildcard_topic}, ?REGISTRY:register_topic(<<"ClientId">>, <<"/TopicA/#">>)),
?assertEqual({error, wildcard_topic}, ?REGISTRY:register_topic(<<"ClientId">>, <<"/+/TopicB">>)).
%%--------------------------------------------------------------------
%% Helper funcs
%%--------------------------------------------------------------------
register_a_lot(Max, Max) ->
ok;
register_a_lot(N, Max) when N < Max ->
Topic = iolist_to_binary(["Topic", integer_to_list(N)]),
?assertEqual(N, ?REGISTRY:register_topic(<<"ClientId">>, Topic)),
register_a_lot(N+1, Max).

View File

@ -1,8 +0,0 @@
%% vars here are for test only, not intended for release
{platform_bin_dir, "bin"}.
{platform_data_dir, "data"}.
{platform_etc_dir, "etc"}.
{platform_lib_dir, "lib"}.
{platform_log_dir, "log"}.
{platform_plugins_dir, "data/plugins"}.