refactor(gw): move mqtt-sn to gateway

This commit is contained in:
JianBo He 2021-07-06 19:14:35 +08:00
parent fe779925c5
commit d2430e70a8
54 changed files with 305 additions and 680 deletions

View File

@ -27,4 +27,47 @@ emqx_gateway: {
active_n: 100
}
}
mqttsn.1: {
## The MQTT-SN Gateway ID in ADVERTISE message.
gateway_id: 1
## Enable broadcast this gateway to WLAN
broadcast: true
## To control whether write statistics data into ETS table
## for dashbord to read.
enable_stats: true
## To control whether accept and process the received
## publish message with qos=-1.
enable_qos3: true
## Idle timeout for a MQTT-SN channel
idle_timeout: 30s
## The pre-defined topic name corresponding to the pre-defined topic
## id of N.
## Note that the pre-defined topic id of 0 is reserved.
predefined: [
{ id: 1
topic: "/predefined/topic/name/hello"
},
{ id: 2
topic: "/predefined/topic/name/nice"
}
]
### ClientInfo override
clientinfo_override: {
username: "mqtt_sn_user"
password: "abc"
}
listener.udp.1: {
bind: 1884
max_connections: 10240000
max_conn_rate: 1000
}
}
}

View File

@ -31,6 +31,7 @@
)
-> {error, reason()}
| {ok, [GwInstaPid :: pid()], GwInstaState :: state()}
%% TODO: v0.2 The child spec is better for restarting child process
| {ok, [Childspec :: supervisor:child_spec()], GwInstaState :: state()}.
%% @doc

View File

@ -45,7 +45,7 @@ load_default_gateway_applications() ->
gateway_type_searching() ->
%% FIXME: Hardcoded apps
[emqx_stomp_impl].
[emqx_stomp_impl, emqx_sn_impl].
load(Mod) ->
try
@ -65,7 +65,7 @@ create_gateway_by_default([]) ->
create_gateway_by_default([{Type, Name, Confs}|More]) ->
case emqx_gateway_registry:lookup(Type) of
undefined ->
?LOG(error, "Skip to start ~p#~p: not_registred_type",
?LOG(error, "Skip to start ~s#~s: not_registred_type",
[Type, Name]);
_ ->
case emqx_gateway:create(Type,
@ -73,9 +73,9 @@ create_gateway_by_default([{Type, Name, Confs}|More]) ->
<<>>,
Confs) of
{ok, _} ->
?LOG(debug, "Start ~p#~p successfully!", [Type, Name]);
?LOG(debug, "Start ~s#~s successfully!", [Type, Name]);
{error, Reason} ->
?LOG(error, "Start ~p#~p failed: ~0p",
?LOG(error, "Start ~s#~s failed: ~0p",
[Type, Name, Reason])
end
end,

View File

@ -54,7 +54,6 @@
start_link(Insta, Ctx, GwDscrptr) ->
gen_server:start_link(
{local, ?MODULE},
?MODULE,
[Insta, Ctx, GwDscrptr],
[]

View File

@ -49,7 +49,7 @@
%%--------------------------------------------------------------------
start_link(Type) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [Type], []).
gen_server:start_link(?MODULE, [Type], []).
-spec inc(gateway_type(), atom()) -> ok.
inc(Type, Name) ->

View File

@ -35,7 +35,9 @@
structs() -> ["emqx_gateway"].
fields("emqx_gateway") ->
[{stomp, t(ref(stomp))}];
[{stomp, t(ref(stomp))},
{mqttsn, t(ref(mqttsn))}
];
fields(stomp) ->
[{"$id", t(ref(stomp_structs))}];
@ -44,7 +46,7 @@ fields(stomp_structs) ->
[ {frame, t(ref(stomp_frame))}
, {clientinfo_override, t(ref(clientinfo_override))}
, {authenticator, t(union([allow_anonymous]))}
, {listener, t(ref(listener))}
, {listener, t(ref(tcp_listener_group))}
];
fields(stomp_frame) ->
@ -53,13 +55,38 @@ fields(stomp_frame) ->
, {max_body_length, t(integer(), undefined, 8192)}
];
fields(mqttsn) ->
[{"$id", t(ref(mqttsn_structs))}];
fields(mqttsn_structs) ->
[ {gateway_id, t(integer())}
, {broadcast, t(boolean())}
, {enable_stats, t(boolean())}
, {enable_qos3, t(boolean())}
, {idle_timeout, t(duration())}
, {predefined, hoconsc:array(ref(mqttsn_predefined))}
, {clientinfo_override, t(ref(clientinfo_override))}
, {listener, t(ref(udp_listener_group))}
];
fields(mqttsn_predefined) ->
%% FIXME: How to check the $id is a integer ???
[ {id, t(integer())}
, {topic, t(string())}
];
fields(clientinfo_override) ->
[ {username, t(string())}
, {password, t(string())}
, {clientid, t(string())}
];
fields(listener) ->
fields(udp_listener_group) ->
[ {udp, t(ref(udp_listener))}
, {dtls, t(ref(dtls_listener))}
];
fields(tcp_listener_group) ->
[ {tcp, t(ref(tcp_listener))}
, {ssl, t(ref(ssl_listener))}
];
@ -70,7 +97,14 @@ fields(tcp_listener) ->
fields(ssl_listener) ->
[ {"$name", t(ref(ssl_listener_settings))}];
fields(udp_listener) ->
[ {"$name", t(ref(udp_listener_settings))}];
fields(dtls_listener) ->
[ {"$name", t(ref(dtls_listener_settings))}];
fields(listener_settings) ->
% FIXME:
%[ {"bind", t(union(ip_port(), integer()))}
[ {bind, t(integer())}
, {acceptors, t(integer(), undefined, 8)}
@ -107,6 +141,19 @@ fields(ssl_listener_settings) ->
, depth => 10
, reuse_sessions => true}) ++ fields(listener_settings);
fields(udp_listener_settings) ->
[
%% some special confs for udp listener
] ++ fields(listener_settings);
fields(dtls_listener_settings) ->
[
%% some special confs for dtls listener
] ++
ssl(undefined, #{handshake_timeout => "15s"
, depth => 10
, reuse_sessions => true}) ++ fields(listener_settings);
fields(access) ->
[ {"$id", #{type => string(),
nullable => true}}];

View File

@ -25,6 +25,7 @@
]).
-export([ apply/2
, format_listenon/1
]).
-export([ normalize_rawconf/1
@ -89,6 +90,13 @@ apply(F, A2) when is_function(F),
is_list(A2) ->
erlang:apply(F, A2).
format_listenon(Port) when is_integer(Port) ->
io_lib:format("0.0.0.0:~w", [Port]);
format_listenon({Addr, Port}) when is_list(Addr) ->
io_lib:format("~s:~w", [Addr, Port]);
format_listenon({Addr, Port}) when is_tuple(Addr) ->
io_lib:format("~s:~w", [inet:ntoa(Addr), Port]).
-type listener() :: #{}.
-type rawconf() ::

View File

@ -1,10 +1,9 @@
emqx-sn
=======
# MQTT-SN Gateway
EMQ X MQTT-SN Gateway.
Configure Plugin
----------------
## Configure Plugin
File: etc/emqx_sn.conf
@ -72,8 +71,7 @@ mqtt.sn.password = abc
- mqtt.sn.password
* This parameter is optional. Pair with username above.
Load Plugin
-----------
## Load Plugin
```
./bin/emqx_ctl plugins load emqx_sn
@ -95,23 +93,18 @@ Load Plugin
- https://github.com/njh/mqtt-sn-tools
- https://github.com/arobenko/mqtt-sn
sleeping device
-----------
### sleeping device
PINGREQ must have a ClientId which is identical to the one in CONNECT message. Without ClientId, emqx-sn will ignore such PINGREQ.
pre-defined topics
-----------
### pre-defined topics
The mapping of a pre-defined topic id and topic name should be known inadvance by both client's application and gateway. We define this mapping info in emqx_sn.conf file, and which shall be kept equivalent in all client's side.
License
-------
## License
Apache License Version 2.0
Author
------
EMQ X-Men Team.
## Author
EMQ X Team.

View File

@ -18,7 +18,7 @@
-behaviour(gen_server).
-include("emqx_sn.hrl").
-include("src/mqttsn/include/emqx_sn.hrl").
-export([ start_link/2
, stop/0

View File

@ -17,7 +17,7 @@
-module(emqx_sn_frame).
-include("emqx_sn.hrl").
-include("src/mqttsn/include/emqx_sn.hrl").
-export([ parse/1
, serialize/1

View File

@ -18,7 +18,7 @@
-behaviour(gen_statem).
-include("emqx_sn.hrl").
-include("src/mqttsn/include/emqx_sn.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl").
@ -915,7 +915,8 @@ handle_unsubscribe(_, _TopicId, MsgId, State) ->
{keep_state, send_message(?SN_UNSUBACK_MSG(MsgId), State)}.
do_publish(?SN_NORMAL_TOPIC, TopicName, Data, Flags, MsgId, State) ->
%% XXX: Handle normal topic id as predefined topic id, to be compatible with paho mqtt-sn library
%% XXX: Handle normal topic id as predefined topic id, to be
%% compatible with paho mqtt-sn library
<<TopicId:16>> = TopicName,
do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId, State);
do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId,
@ -972,8 +973,11 @@ do_puback(TopicId, MsgId, ReturnCode, StateName,
undefined -> {keep_state, State};
TopicName ->
%%notice that this TopicName maybe normal or predefined,
%% involving the predefined topic name in register to enhance the gateway's robustness even inconsistent with MQTT-SN channels
{keep_state, send_register(TopicName, TopicId, MsgId, State)}
%% involving the predefined topic name in register to
%% enhance the gateway's robustness even inconsistent
%% with MQTT-SN channels
{keep_state, send_register(TopicName, TopicId,
MsgId, State)}
end;
_ ->
?LOG(error, "CAN NOT handle PUBACK ReturnCode=~p", [ReturnCode]),
@ -1070,8 +1074,9 @@ handle_outgoing(Packet, State) ->
send_message(mqtt2sn(Packet, State), State).
cache_no_reg_publish_message(Pendings, TopicId, PubPkt, State) ->
?LOG(debug, "cache non-registered publish message for topic-id: ~p, msg: ~0p, pendings: ~0p",
[TopicId, PubPkt, Pendings]),
?LOG(debug, "cache non-registered publish message "
"for topic-id: ~p, msg: ~0p, pendings: ~0p",
[TopicId, PubPkt, Pendings]),
Msgs = maps:get(pending_topic_ids, Pendings, []),
Pendings#{TopicId => Msgs ++ [mqtt2sn(PubPkt, State)]}.

View File

@ -0,0 +1,161 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc The MQTT-SN Gateway Implement interface
-module(emqx_sn_impl).
-behavior(emqx_gateway_impl).
%% APIs
-export([ load/0
, unload/0
]).
-export([]).
-export([ init/1
, on_insta_create/3
, on_insta_update/4
, on_insta_destroy/3
]).
-define(UDP_SOCKOPTS, []).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
load() ->
RegistryOptions = [ {cbkmod, ?MODULE}
],
YourOptions = [params1, params2],
emqx_gateway_registry:load(mqttsn, RegistryOptions, YourOptions).
unload() ->
emqx_gateway_registry:unload(mqttsn).
init(_) ->
GwState = #{},
{ok, GwState}.
%%--------------------------------------------------------------------
%% emqx_gateway_registry callbacks
%%--------------------------------------------------------------------
on_insta_create(_Insta = #{ id := InstaId,
rawconf := RawConf
}, Ctx, _GwState) ->
%% We Also need to start `emqx_sn_broadcast` &
%% `emqx_sn_registry` process
SnGwId = maps:get(gateway_id, RawConf),
case maps:get(broadcast, RawConf) of
false ->
ok;
true ->
%% FIXME:
Port = 1884,
_ = emqx_sn_broadcast:start_link(SnGwId, Port)
end,
PredefTopics = maps:get(predefined, RawConf),
{ok, RegistrySvr} = emqx_sn_registry:start_link(PredefTopics),
NRawConf = maps:without(
[gateway_id, broadcast, predefined],
RawConf#{registry => RegistrySvr}
),
Listeners = emqx_gateway_utils:normalize_rawconf(NRawConf),
ListenerPids = lists:map(fun(Lis) ->
start_listener(InstaId, Ctx, Lis)
end, Listeners),
{ok, ListenerPids, _InstaState = #{ctx => Ctx}}.
on_insta_update(NewInsta, OldInsta, GwInstaState = #{ctx := Ctx}, GwState) ->
InstaId = maps:get(id, NewInsta),
try
%% XXX: 1. How hot-upgrade the changes ???
%% XXX: 2. Check the New confs first before destroy old instance ???
on_insta_destroy(OldInsta, GwInstaState, GwState),
on_insta_create(NewInsta, Ctx, GwState)
catch
Class : Reason : Stk ->
logger:error("Failed to update stomp instance ~s; "
"reason: {~0p, ~0p} stacktrace: ~0p",
[InstaId, Class, Reason, Stk]),
{error, {Class, Reason}}
end.
on_insta_destroy(_Insta = #{ id := InstaId,
rawconf := RawConf
}, _GwInstaState, _GwState) ->
Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
lists:foreach(fun(Lis) ->
stop_listener(InstaId, Lis)
end, Listeners).
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
start_listener(InstaId, Ctx, {Type, ListenOn, SocketOpts, Cfg}) ->
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) of
{ok, Pid} ->
io:format("Start mqttsn ~s:~s listener on ~s successfully.~n",
[InstaId, Type, ListenOnStr]),
Pid;
{error, Reason} ->
io:format(standard_error,
"Failed to start mqttsn ~s:~s listener on ~s: ~0p~n",
[InstaId, Type, ListenOnStr, Reason]),
throw({badconf, Reason})
end.
start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
Name = name(InstaId, Type),
esockd:open_udp(Name, ListenOn, merge_default(SocketOpts),
{emqx_sn_gateway, start_link, [Cfg#{ctx => Ctx}]}).
name(InstaId, Type) ->
list_to_atom(lists:concat([InstaId, ":", Type])).
merge_default(Options) ->
case lists:keytake(udp_options, 1, Options) of
{value, {udp_options, TcpOpts}, Options1} ->
[{udp_options, emqx_misc:merge_opts(?UDP_SOCKOPTS, TcpOpts)} | Options1];
false ->
[{udp_options, ?UDP_SOCKOPTS} | Options]
end.
stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(InstaId, Type, ListenOn, SocketOpts, Cfg),
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case StopRet of
ok -> io:format("Stop mqttsn ~s:~s listener on ~s successfully.~n",
[InstaId, Type, ListenOnStr]);
{error, Reason} ->
io:format(standard_error,
"Failed to stop mqttsn ~s:~s listener on ~s: ~0p~n",
[InstaId, Type, ListenOnStr, Reason]
)
end,
StopRet.
stop_listener(InstaId, Type, ListenOn, _SocketOpts, _Cfg) ->
Name = name(InstaId, Type),
esockd:close(Name, ListenOn).

View File

@ -18,7 +18,7 @@
-behaviour(gen_server).
-include("emqx_sn.hrl").
-include("src/mqttsn/include/emqx_sn.hrl").
-define(LOG(Level, Format, Args),
emqx_logger:Level("MQTT-SN(registry): " ++ Format, Args)).
@ -132,7 +132,7 @@ init([PredefTopics]) ->
%% {ClientId, TopicId} -> TopicName
%% {ClientId, TopicName} -> TopicId
MaxPredefId = lists:foldl(
fun({TopicId, TopicName}, AccId) ->
fun(#{id := TopicId, topic := TopicName}, AccId) ->
ekka_mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicId},
value = TopicName}),
ekka_mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicName},

View File

@ -33,11 +33,9 @@ init([{_Ip, Port}, GwId, PredefTopics]) ->
type => worker,
modules => [emqx_sn_broadcast]},
Registry = #{id => emqx_sn_registry,
start => {emqx_sn_registry, start_link, [PredefTopics]},
restart => permanent,
shutdown => brutal_kill,
type => worker,
modules => [emqx_sn_registry]},
start => {emqx_sn_registry, start_link, [PredefTopics]},
restart => permanent,
shutdown => brutal_kill,
type => worker,
modules => [emqx_sn_registry]},
{ok, {{one_for_one, 10, 3600}, [Broadcast, Registry]}}.

View File

@ -71,13 +71,12 @@ on_insta_create(_Insta = #{ id := InstaId,
%% FIXME: Assign ctx to InstaState
{ok, ListenerPids, _InstaState = #{ctx => Ctx}}.
%% @private
on_insta_update(NewInsta, OldInstace, GwInstaState = #{ctx := Ctx}, GwState) ->
on_insta_update(NewInsta, OldInsta, GwInstaState = #{ctx := Ctx}, GwState) ->
InstaId = maps:get(id, NewInsta),
try
%% XXX: 1. How hot-upgrade the changes ???
%% XXX: 2. Check the New confs first before destroy old instance ???
on_insta_destroy(OldInstace, GwInstaState, GwState),
on_insta_destroy(OldInsta, GwInstaState, GwState),
on_insta_create(NewInsta, Ctx, GwState)
catch
Class : Reason : Stk ->
@ -100,15 +99,16 @@ on_insta_destroy(_Insta = #{ id := InstaId,
%%--------------------------------------------------------------------
start_listener(InstaId, Ctx, {Type, ListenOn, SocketOpts, Cfg}) ->
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) of
{ok, Pid} ->
io:format("Start stomp ~s:~s listener on ~s successfully.~n",
[InstaId, Type, format(ListenOn)]),
[InstaId, Type, ListenOnStr]),
Pid;
{error, Reason} ->
io:format(standard_error,
"Failed to start stomp ~s:~s listener on ~s: ~0p~n",
[InstaId, Type, format(ListenOn), Reason]),
[InstaId, Type, ListenOnStr, Reason]),
throw({badconf, Reason})
end.
@ -128,22 +128,16 @@ merge_default(Options) ->
[{tcp_options, ?TCP_OPTS} | Options]
end.
format(Port) when is_integer(Port) ->
io_lib:format("0.0.0.0:~w", [Port]);
format({Addr, Port}) when is_list(Addr) ->
io_lib:format("~s:~w", [Addr, Port]);
format({Addr, Port}) when is_tuple(Addr) ->
io_lib:format("~s:~w", [inet:ntoa(Addr), Port]).
stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(InstaId, Type, ListenOn, SocketOpts, Cfg),
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case StopRet of
ok -> io:format("Stop stomp ~s:~s listener on ~s successfully.~n",
[InstaId, Type, format(ListenOn)]);
[InstaId, Type, ListenOnStr]);
{error, Reason} ->
io:format(standard_error,
"Failed to stop stomp ~s:~s listener on ~s: ~0p~n",
[InstaId, Type, format(ListenOn), Reason]
[InstaId, Type, ListenOnStr, Reason]
)
end,
StopRet.

View File

@ -1,40 +0,0 @@
.eunit
deps
*.o
*.beam
*.plt
erl_crash.dump
ebin
rel/example_project
.concrete/DEV_MODE
.rebar
_rel/
emqx_sn.d
logs/
.erlang.mk/
data/
.idea/
*.iml
*.d
_build/
.rebar3
rebar3.crashdump
.DS_Store
bbmustache/
etc/gen.emqx.conf
cuttlefish
rebar.lock
xrefr
intergration_test/emqx-rel/
intergration_test/paho.mqtt-sn.embedded-c/
intergration_test/client/*.exe
intergration_test/client/*.txt
.DS_Store
cover/
ct.coverdata
eunit.coverdata
test/ct.cover.spec
erlang.mk
etc/emqx_sn.conf.rendered
.rebar3/
*.swp

View File

@ -1,126 +0,0 @@
-module(simple_example).
-include("emqx_sn.hrl").
-define(HOST, {127,0,0,1}).
-define(PORT, 1884).
-export([start/0]).
start() ->
io:format("start to connect ~p:~p~n", [?HOST, ?PORT]),
%% create udp socket
{ok, Socket} = gen_udp:open(0, [binary]),
%% connect to emqx_sn broker
Packet = gen_connect_packet(<<"client1">>),
ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet),
io:format("send connect packet=~p~n", [Packet]),
%% receive message
wait_response(),
%% register topic_id
RegisterPacket = gen_register_packet(<<"TopicA">>, 0),
ok = gen_udp:send(Socket, ?HOST, ?PORT, RegisterPacket),
io:format("send register packet=~p~n", [RegisterPacket]),
TopicId = wait_response(),
%% subscribe
SubscribePacket = gen_subscribe_packet(TopicId),
ok = gen_udp:send(Socket, ?HOST, ?PORT, SubscribePacket),
io:format("send subscribe packet=~p~n", [SubscribePacket]),
wait_response(),
%% publish
PublishPacket = gen_publish_packet(TopicId, <<"Payload...">>),
ok = gen_udp:send(Socket, ?HOST, ?PORT, PublishPacket),
io:format("send publish packet=~p~n", [PublishPacket]),
wait_response(),
% wait for subscribed message from broker
wait_response(),
%% disconnect from emqx_sn broker
DisConnectPacket = gen_disconnect_packet(),
ok = gen_udp:send(Socket, ?HOST, ?PORT, DisConnectPacket),
io:format("send disconnect packet=~p~n", [DisConnectPacket]).
gen_connect_packet(ClientId) ->
Length = 6+byte_size(ClientId),
MsgType = ?SN_CONNECT,
Dup = 0,
QoS = 0,
Retain = 0,
Will = 0,
CleanSession = 1,
TopicIdType = 0,
Flag = <<Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2>>,
ProtocolId = 1,
Duration = 10,
<<Length:8, MsgType:8, Flag/binary, ProtocolId:8, Duration:16, ClientId/binary>>.
gen_subscribe_packet(TopicId) ->
Length = 7,
MsgType = ?SN_SUBSCRIBE,
Dup = 0,
Retain = 0,
Will = 0,
QoS = 1,
CleanSession = 0,
TopicIdType = 1,
Flag = <<Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2>>,
MsgId = 1,
<<Length:8, MsgType:8, Flag/binary, MsgId:16, TopicId:16>>.
gen_register_packet(Topic, TopicId) ->
Length = 6+byte_size(Topic),
MsgType = ?SN_REGISTER,
MsgId = 1,
<<Length:8, MsgType:8, TopicId:16, MsgId:16, Topic/binary>>.
gen_publish_packet(TopicId, Payload) ->
Length = 7+byte_size(Payload),
MsgType = ?SN_PUBLISH,
Dup = 0,
QoS = 1,
Retain = 0,
Will = 0,
CleanSession = 0,
MsgId = 1,
TopicIdType = 1,
Flag = <<Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2>>,
<<Length:8, MsgType:8, Flag/binary, TopicId:16, MsgId:16, Payload/binary>>.
gen_disconnect_packet()->
Length = 2,
MsgType = ?SN_DISCONNECT,
<<Length:8, MsgType:8>>.
wait_response() ->
receive
{udp, _Socket, _, _, Bin} ->
case Bin of
<<_Len:8, ?SN_PUBLISH, _Flag:8, TopicId:16, MsgId:16, Data/binary>> ->
io:format("recv publish TopicId: ~p, MsgId: ~p, Data: ~p~n", [TopicId, MsgId, Data]);
<<_Len:8, ?SN_CONNACK, 0:8>> ->
io:format("recv connect ack~n");
<<_Len:8, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>> ->
io:format("recv regack TopicId=~p, MsgId=~p~n", [TopicId, MsgId]),
TopicId;
<<_Len:8, ?SN_SUBACK, Flags:8, TopicId:16, MsgId:16, 0:8>> ->
io:format("recv suback Flags=~p TopicId=~p, MsgId=~p~n", [Flags, TopicId, MsgId]);
<<_Len:8, ?SN_PUBACK, TopicId:16, MsgId:16, 0:8>> ->
io:format("recv puback TopicId=~p, MsgId=~p~n", [TopicId, MsgId]);
_ ->
io:format("ignore bin=~p~n", [Bin])
end;
Any ->
io:format("recv something else from udp socket ~p~n", [Any])
after
2000 ->
io:format("Error: receive timeout!~n"),
wait_response()
end.

View File

@ -1,120 +0,0 @@
-module(simple_example2).
-include("emqx_sn.hrl").
-define(HOST, "localhost").
-define(PORT, 1884).
-export([start/0]).
start() ->
io:format("start to connect ~p:~p~n", [?HOST, ?PORT]),
%% create udp socket
{ok, Socket} = gen_udp:open(0, [binary]),
%% connect to emqx_sn broker
Packet = gen_connect_packet(<<"client1">>),
ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet),
io:format("send connect packet=~p~n", [Packet]),
%% receive message
wait_response(),
%% subscribe, SHORT TOPIC NAME
SubscribePacket = gen_subscribe_packet(<<"T1">>),
ok = gen_udp:send(Socket, ?HOST, ?PORT, SubscribePacket),
io:format("send subscribe packet=~p~n", [SubscribePacket]),
wait_response(),
%% publish, SHORT TOPIC NAME
PublishPacket = gen_publish_packet(<<"T1">>, <<"Payload...">>),
ok = gen_udp:send(Socket, ?HOST, ?PORT, PublishPacket),
io:format("send publish packet=~p~n", [PublishPacket]),
wait_response(),
% wait for subscribed message from broker
wait_response(),
%% disconnect from emqx_sn broker
DisConnectPacket = gen_disconnect_packet(),
ok = gen_udp:send(Socket, ?HOST, ?PORT, DisConnectPacket),
io:format("send disconnect packet=~p~n", [DisConnectPacket]).
gen_connect_packet(ClientId) ->
Length = 6+byte_size(ClientId),
MsgType = ?SN_CONNECT,
Dup = 0,
QoS = 0,
Retain = 0,
Will = 0,
CleanSession = 1,
TopicIdType = 0,
Flag = <<Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2>>,
ProtocolId = 1,
Duration = 10,
<<Length:8, MsgType:8, Flag/binary, ProtocolId:8, Duration:16, ClientId/binary>>.
gen_subscribe_packet(ShortTopic) ->
Length = 7,
MsgType = ?SN_SUBSCRIBE,
Dup = 0,
Retain = 0,
Will = 0,
QoS = 1,
CleanSession = 0,
TopicIdType = 2, % SHORT TOPIC NAME
Flag = <<Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2>>,
MsgId = 1,
<<Length:8, MsgType:8, Flag/binary, MsgId:16, ShortTopic/binary>>.
gen_register_packet(Topic, TopicId) ->
Length = 6+byte_size(Topic),
MsgType = ?SN_REGISTER,
MsgId = 1,
<<Length:8, MsgType:8, TopicId:16, MsgId:16, Topic/binary>>.
gen_publish_packet(ShortTopic, Payload) ->
Length = 7+byte_size(Payload),
MsgType = ?SN_PUBLISH,
Dup = 0,
QoS = 1,
Retain = 0,
Will = 0,
CleanSession = 0,
MsgId = 1,
TopicIdType = 2, % SHORT TOPIC NAME
Flag = <<Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2>>,
<<Length:8, MsgType:8, Flag/binary, ShortTopic/binary, MsgId:16, Payload/binary>>.
gen_disconnect_packet()->
Length = 2,
MsgType = ?SN_DISCONNECT,
<<Length:8, MsgType:8>>.
wait_response() ->
receive
{udp, _Socket, _, _, Bin} ->
case Bin of
<<_Len:8, ?SN_PUBLISH, _Flag:8, TopicId:16, MsgId:16, Data/binary>> ->
io:format("recv publish TopicId: ~p, MsgId: ~p, Data: ~p~n", [TopicId, MsgId, Data]);
<<_Len:8, ?SN_CONNACK, 0:8>> ->
io:format("recv connect ack~n");
<<_Len:8, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>> ->
io:format("recv regack TopicId=~p, MsgId=~p~n", [TopicId, MsgId]),
TopicId;
<<_Len:8, ?SN_SUBACK, Flags:8, TopicId:16, MsgId:16, 0:8>> ->
io:format("recv suback Flags=~p TopicId=~p, MsgId=~p~n", [Flags, TopicId, MsgId]);
<<_Len:8, ?SN_PUBACK, TopicId:16, MsgId:16, 0:8>> ->
io:format("recv puback TopicId=~p, MsgId=~p~n", [TopicId, MsgId]);
_ ->
io:format("ignore bin=~p~n", [Bin])
end;
Any ->
io:format("recv something else from udp socket ~p~n", [Any])
after
2000 ->
io:format("Error: receive timeout!~n"),
wait_response()
end.

View File

@ -1,120 +0,0 @@
-module(simple_example3).
-include("emqx_sn.hrl").
-define(HOST, "localhost").
-define(PORT, 1884).
-export([start/0]).
start() ->
io:format("start to connect ~p:~p~n", [?HOST, ?PORT]),
%% create udp socket
{ok, Socket} = gen_udp:open(0, [binary]),
%% connect to emqx_sn broker
Packet = gen_connect_packet(<<"client1">>),
ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet),
io:format("send connect packet=~p~n", [Packet]),
%% receive message
wait_response(),
%% subscribe normal topic name
SubscribePacket = gen_subscribe_packet(<<"T3">>),
ok = gen_udp:send(Socket, ?HOST, ?PORT, SubscribePacket),
io:format("send subscribe packet=~p~n", [SubscribePacket]),
wait_response(),
%% publish SHORT TOPIC NAME
PublishPacket = gen_publish_packet(<<"T3">>, <<"Payload...">>),
ok = gen_udp:send(Socket, ?HOST, ?PORT, PublishPacket),
io:format("send publish packet=~p~n", [PublishPacket]),
wait_response(),
% wait for subscribed message from broker
wait_response(),
%% disconnect from emqx_sn broker
DisConnectPacket = gen_disconnect_packet(),
ok = gen_udp:send(Socket, ?HOST, ?PORT, DisConnectPacket),
io:format("send disconnect packet=~p~n", [DisConnectPacket]).
gen_connect_packet(ClientId) ->
Length = 6+byte_size(ClientId),
MsgType = ?SN_CONNECT,
Dup = 0,
QoS = 0,
Retain = 0,
Will = 0,
CleanSession = 1,
TopicIdType = 0,
Flag = <<Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2>>,
ProtocolId = 1,
Duration = 10,
<<Length:8, MsgType:8, Flag/binary, ProtocolId:8, Duration:16, ClientId/binary>>.
gen_subscribe_packet(ShortTopic) ->
Length = 7,
MsgType = ?SN_SUBSCRIBE,
Dup = 0,
Retain = 0,
Will = 0,
QoS = 1,
CleanSession = 0,
TopicIdType = 0, % normal topic name
Flag = <<Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2>>,
MsgId = 1,
<<Length:8, MsgType:8, Flag/binary, MsgId:16, ShortTopic/binary>>.
gen_register_packet(Topic, TopicId) ->
Length = 6+byte_size(Topic),
MsgType = ?SN_REGISTER,
MsgId = 1,
<<Length:8, MsgType:8, TopicId:16, MsgId:16, Topic/binary>>.
gen_publish_packet(ShortTopic, Payload) ->
Length = 7+byte_size(Payload),
MsgType = ?SN_PUBLISH,
Dup = 0,
QoS = 1,
Retain = 0,
Will = 0,
CleanSession = 0,
MsgId = 1,
TopicIdType = 2, % SHORT TOPIC NAME
Flag = <<Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2>>,
<<Length:8, MsgType:8, Flag/binary, ShortTopic/binary, MsgId:16, Payload/binary>>.
gen_disconnect_packet()->
Length = 2,
MsgType = ?SN_DISCONNECT,
<<Length:8, MsgType:8>>.
wait_response() ->
receive
{udp, _Socket, _, _, Bin} ->
case Bin of
<<_Len:8, ?SN_PUBLISH, _Flag:8, TopicId:16, MsgId:16, Data/binary>> ->
io:format("recv publish TopicId: ~p, MsgId: ~p, Data: ~p~n", [TopicId, MsgId, Data]);
<<_Len:8, ?SN_CONNACK, 0:8>> ->
io:format("recv connect ack~n");
<<_Len:8, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>> ->
io:format("recv regack TopicId=~p, MsgId=~p~n", [TopicId, MsgId]),
TopicId;
<<_Len:8, ?SN_SUBACK, Flags:8, TopicId:16, MsgId:16, 0:8>> ->
io:format("recv suback Flags=~p TopicId=~p, MsgId=~p~n", [Flags, TopicId, MsgId]);
<<_Len:8, ?SN_PUBACK, TopicId:16, MsgId:16, 0:8>> ->
io:format("recv puback TopicId=~p, MsgId=~p~n", [TopicId, MsgId]);
_ ->
io:format("ignore bin=~p~n", [Bin])
end;
Any ->
io:format("recv something else from udp socket ~p~n", [Any])
after
2000 ->
io:format("Error: receive timeout!~n"),
wait_response()
end.

View File

@ -1,151 +0,0 @@
-module(simple_example4).
-include("emqx_sn.hrl").
-define(HOST, {127,0,0,1}).
-define(PORT, 1884).
-export([start/0]).
start(LoopTimes) ->
io:format("start to connect ~p:~p~n", [?HOST, ?PORT]),
%% create udp socket
{ok, Socket} = gen_udp:open(0, [binary]),
%% connect to emqx_sn broker
Packet = gen_connect_packet(<<"client1">>),
ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet),
io:format("send connect packet=~p~n", [Packet]),
%% receive message
wait_response(),
%% register topic_id
RegisterPacket = gen_register_packet(<<"TopicA">>, 0),
ok = gen_udp:send(Socket, ?HOST, ?PORT, RegisterPacket),
io:format("send register packet=~p~n", [RegisterPacket]),
TopicId = wait_response(),
%% subscribe
SubscribePacket = gen_subscribe_packet(TopicId),
ok = gen_udp:send(Socket, ?HOST, ?PORT, SubscribePacket),
io:format("send subscribe packet=~p~n", [SubscribePacket]),
wait_response(),
%% loop publish
[begin
timer:sleep(1000),
io:format("~n-------------------- publish ~p start --------------------~n", [N]),
PublishPacket = gen_publish_packet(TopicId, <<"Payload...">>),
ok = gen_udp:send(Socket, ?HOST, ?PORT, PublishPacket),
io:format("send publish packet=~p~n", [PublishPacket]),
% wait for publish ack
wait_response(),
% wait for subscribed message from broker
wait_response(),
PingReqPacket = gen_pingreq_packet(),
ok = gen_udp:send(Socket, ?HOST, ?PORT, PingReqPacket),
% wait for pingresp
wait_response(),
io:format("--------------------- publish ~p end ---------------------~n", [N])
end || N <- lists:seq(1, LoopTimes)],
%% disconnect from emqx_sn broker
DisConnectPacket = gen_disconnect_packet(),
ok = gen_udp:send(Socket, ?HOST, ?PORT, DisConnectPacket),
io:format("send disconnect packet=~p~n", [DisConnectPacket]).
gen_connect_packet(ClientId) ->
Length = 6+byte_size(ClientId),
MsgType = ?SN_CONNECT,
Dup = 0,
QoS = 0,
Retain = 0,
Will = 0,
CleanSession = 1,
TopicIdType = 0,
Flag = <<Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2>>,
ProtocolId = 1,
Duration = 10,
<<Length:8, MsgType:8, Flag/binary, ProtocolId:8, Duration:16, ClientId/binary>>.
gen_subscribe_packet(TopicId) ->
Length = 7,
MsgType = ?SN_SUBSCRIBE,
Dup = 0,
Retain = 0,
Will = 0,
QoS = 1,
CleanSession = 0,
TopicIdType = 1,
Flag = <<Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2>>,
MsgId = 1,
<<Length:8, MsgType:8, Flag/binary, MsgId:16, TopicId:16>>.
gen_register_packet(Topic, TopicId) ->
Length = 6+byte_size(Topic),
MsgType = ?SN_REGISTER,
MsgId = 1,
<<Length:8, MsgType:8, TopicId:16, MsgId:16, Topic/binary>>.
gen_publish_packet(TopicId, Payload) ->
Length = 7+byte_size(Payload),
MsgType = ?SN_PUBLISH,
Dup = 0,
QoS = 1,
Retain = 0,
Will = 0,
CleanSession = 0,
MsgId = 1,
TopicIdType = 1,
Flag = <<Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2>>,
<<Length:8, MsgType:8, Flag/binary, TopicId:16, MsgId:16, Payload/binary>>.
gen_puback_packet(TopicId, MsgId) ->
Length = 7,
MsgType = ?SN_PUBACK,
<<Length:8, MsgType:8, TopicId:16, MsgId:16, 0:8>>.
gen_pingreq_packet() ->
Length = 2,
MsgType = ?SN_PINGREQ,
<<Length:8, MsgType:8>>.
gen_disconnect_packet()->
Length = 2,
MsgType = ?SN_DISCONNECT,
<<Length:8, MsgType:8>>.
wait_response() ->
receive
{udp, Socket, _, _, Bin} ->
case Bin of
<<_Len:8, ?SN_PUBLISH, _Flag:8, TopicId:16, MsgId:16, Data/binary>> ->
io:format("recv publish TopicId: ~p, MsgId: ~p, Data: ~p~n", [TopicId, MsgId, Data]),
ok = gen_udp:send(Socket, ?HOST, ?PORT, gen_puback_packet(TopicId, MsgId));
<<_Len:8, ?SN_CONNACK, 0:8>> ->
io:format("recv connect ack~n");
<<_Len:8, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>> ->
io:format("recv regack TopicId=~p, MsgId=~p~n", [TopicId, MsgId]),
TopicId;
<<_Len:8, ?SN_SUBACK, Flags:8, TopicId:16, MsgId:16, 0:8>> ->
io:format("recv suback Flags=~p TopicId=~p, MsgId=~p~n", [Flags, TopicId, MsgId]);
<<_Len:8, ?SN_PUBACK, TopicId:16, MsgId:16, 0:8>> ->
io:format("recv puback TopicId=~p, MsgId=~p~n", [TopicId, MsgId]);
<<_Len:8, ?SN_PINGRESP>> ->
io:format("recv pingresp~n");
_ ->
io:format("ignore bin=~p~n", [Bin])
end;
Any ->
io:format("recv something else from udp socket ~p~n", [Any])
after
2000 ->
io:format("Error: receive timeout!~n"),
wait_response()
end.

View File

@ -1,26 +0,0 @@
{deps, []}.
{plugins, [rebar3_proper]}.
{deps,
[{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.4"}}}
]}.
{edoc_opts, [{preprocess, true}]}.
{erl_opts, [warn_unused_vars,
warn_shadow_vars,
warn_unused_import,
warn_obsolete_guard,
debug_info,
{parse_transform}]}.
{dialyzer, [{warnings, [unmatched_returns, error_handling, race_conditions]}
]}.
{xref_checks, [undefined_function_calls, undefined_functions,
locals_not_used, deprecated_function_calls,
warnings_as_errors, deprecated_functions]}.
{cover_enabled, true}.
{cover_opts, [verbose]}.
{cover_export_enabled, true}.
{plugins, [coveralls]}.

View File

@ -1,14 +0,0 @@
{application, emqx_sn,
[{description, "EMQ X MQTT-SN Plugin"},
{vsn, "4.4.0"}, % strict semver, bump manually!
{modules, []},
{registered, []},
{applications, [kernel,stdlib,esockd]},
{mod, {emqx_sn_app,[]}},
{env, []},
{licenses, ["Apache-2.0"]},
{maintainers, ["EMQ X Team <contact@emqx.io>"]},
{links, [{"Homepage", "https://emqx.io/"},
{"Github", "https://github.com/emqx/emqx-sn"}
]}
]}.

View File

@ -1,19 +0,0 @@
%% -*-: erlang -*-
{VSN,
[
{"4.3.2", [
{load_module, emqx_sn_gateway, brutal_purge, soft_purge, []}
]},
{<<"4.3.[0-1]">>, [
{restart_application, emqx_sn}
]}
],
[
{"4.3.2", [
{load_module, emqx_sn_gateway, brutal_purge, soft_purge, []}
]},
{<<"4.3.[0-1]">>, [
{restart_application, emqx_sn}
]}
]
}.

View File

@ -1,8 +0,0 @@
%% vars here are for test only, not intended for release
{platform_bin_dir, "bin"}.
{platform_data_dir, "data"}.
{platform_etc_dir, "etc"}.
{platform_lib_dir, "lib"}.
{platform_log_dir, "log"}.
{platform_plugins_dir, "data/plugins"}.