diff --git a/apps/emqx_gateway/etc/emqx_gateway.conf b/apps/emqx_gateway/etc/emqx_gateway.conf index ab5b52143..ba1e8168b 100644 --- a/apps/emqx_gateway/etc/emqx_gateway.conf +++ b/apps/emqx_gateway/etc/emqx_gateway.conf @@ -27,4 +27,47 @@ emqx_gateway: { active_n: 100 } } + + mqttsn.1: { + ## The MQTT-SN Gateway ID in ADVERTISE message. + gateway_id: 1 + + ## Enable broadcast this gateway to WLAN + broadcast: true + + ## To control whether write statistics data into ETS table + ## for dashbord to read. + enable_stats: true + + ## To control whether accept and process the received + ## publish message with qos=-1. + enable_qos3: true + + ## Idle timeout for a MQTT-SN channel + idle_timeout: 30s + + ## The pre-defined topic name corresponding to the pre-defined topic + ## id of N. + ## Note that the pre-defined topic id of 0 is reserved. + predefined: [ + { id: 1 + topic: "/predefined/topic/name/hello" + }, + { id: 2 + topic: "/predefined/topic/name/nice" + } + ] + + ### ClientInfo override + clientinfo_override: { + username: "mqtt_sn_user" + password: "abc" + } + + listener.udp.1: { + bind: 1884 + max_connections: 10240000 + max_conn_rate: 1000 + } + } } diff --git a/apps/emqx_sn/etc/emqx_sn.conf b/apps/emqx_gateway/etc/emqx_sn.conf similarity index 100% rename from apps/emqx_sn/etc/emqx_sn.conf rename to apps/emqx_gateway/etc/emqx_sn.conf diff --git a/apps/emqx_sn/priv/emqx_sn.schema b/apps/emqx_gateway/etc/priv/emqx_sn.schema similarity index 100% rename from apps/emqx_sn/priv/emqx_sn.schema rename to apps/emqx_gateway/etc/priv/emqx_sn.schema diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_impl.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_impl.erl index 9726dad02..8d413e49c 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_impl.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_impl.erl @@ -31,6 +31,7 @@ ) -> {error, reason()} | {ok, [GwInstaPid :: pid()], GwInstaState :: state()} + %% TODO: v0.2 The child spec is better for restarting child process | {ok, [Childspec :: supervisor:child_spec()], GwInstaState :: state()}. %% @doc diff --git a/apps/emqx_gateway/src/emqx_gateway_app.erl b/apps/emqx_gateway/src/emqx_gateway_app.erl index c99228f17..3982e260b 100644 --- a/apps/emqx_gateway/src/emqx_gateway_app.erl +++ b/apps/emqx_gateway/src/emqx_gateway_app.erl @@ -45,7 +45,7 @@ load_default_gateway_applications() -> gateway_type_searching() -> %% FIXME: Hardcoded apps - [emqx_stomp_impl]. + [emqx_stomp_impl, emqx_sn_impl]. load(Mod) -> try @@ -65,7 +65,7 @@ create_gateway_by_default([]) -> create_gateway_by_default([{Type, Name, Confs}|More]) -> case emqx_gateway_registry:lookup(Type) of undefined -> - ?LOG(error, "Skip to start ~p#~p: not_registred_type", + ?LOG(error, "Skip to start ~s#~s: not_registred_type", [Type, Name]); _ -> case emqx_gateway:create(Type, @@ -73,9 +73,9 @@ create_gateway_by_default([{Type, Name, Confs}|More]) -> <<>>, Confs) of {ok, _} -> - ?LOG(debug, "Start ~p#~p successfully!", [Type, Name]); + ?LOG(debug, "Start ~s#~s successfully!", [Type, Name]); {error, Reason} -> - ?LOG(error, "Start ~p#~p failed: ~0p", + ?LOG(error, "Start ~s#~s failed: ~0p", [Type, Name, Reason]) end end, diff --git a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl index 9f21f0e05..7994a6cea 100644 --- a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl +++ b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl @@ -54,7 +54,6 @@ start_link(Insta, Ctx, GwDscrptr) -> gen_server:start_link( - {local, ?MODULE}, ?MODULE, [Insta, Ctx, GwDscrptr], [] diff --git a/apps/emqx_gateway/src/emqx_gateway_metrics.erl b/apps/emqx_gateway/src/emqx_gateway_metrics.erl index 04b711d0a..461eb3344 100644 --- a/apps/emqx_gateway/src/emqx_gateway_metrics.erl +++ b/apps/emqx_gateway/src/emqx_gateway_metrics.erl @@ -49,7 +49,7 @@ %%-------------------------------------------------------------------- start_link(Type) -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [Type], []). + gen_server:start_link(?MODULE, [Type], []). -spec inc(gateway_type(), atom()) -> ok. inc(Type, Name) -> diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index 8f05582c7..f1b1f9fa4 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -35,7 +35,9 @@ structs() -> ["emqx_gateway"]. fields("emqx_gateway") -> - [{stomp, t(ref(stomp))}]; + [{stomp, t(ref(stomp))}, + {mqttsn, t(ref(mqttsn))} + ]; fields(stomp) -> [{"$id", t(ref(stomp_structs))}]; @@ -44,7 +46,7 @@ fields(stomp_structs) -> [ {frame, t(ref(stomp_frame))} , {clientinfo_override, t(ref(clientinfo_override))} , {authenticator, t(union([allow_anonymous]))} - , {listener, t(ref(listener))} + , {listener, t(ref(tcp_listener_group))} ]; fields(stomp_frame) -> @@ -53,13 +55,38 @@ fields(stomp_frame) -> , {max_body_length, t(integer(), undefined, 8192)} ]; +fields(mqttsn) -> + [{"$id", t(ref(mqttsn_structs))}]; + +fields(mqttsn_structs) -> + [ {gateway_id, t(integer())} + , {broadcast, t(boolean())} + , {enable_stats, t(boolean())} + , {enable_qos3, t(boolean())} + , {idle_timeout, t(duration())} + , {predefined, hoconsc:array(ref(mqttsn_predefined))} + , {clientinfo_override, t(ref(clientinfo_override))} + , {listener, t(ref(udp_listener_group))} + ]; + +fields(mqttsn_predefined) -> + %% FIXME: How to check the $id is a integer ??? + [ {id, t(integer())} + , {topic, t(string())} + ]; + fields(clientinfo_override) -> [ {username, t(string())} , {password, t(string())} , {clientid, t(string())} ]; -fields(listener) -> +fields(udp_listener_group) -> + [ {udp, t(ref(udp_listener))} + , {dtls, t(ref(dtls_listener))} + ]; + +fields(tcp_listener_group) -> [ {tcp, t(ref(tcp_listener))} , {ssl, t(ref(ssl_listener))} ]; @@ -70,7 +97,14 @@ fields(tcp_listener) -> fields(ssl_listener) -> [ {"$name", t(ref(ssl_listener_settings))}]; +fields(udp_listener) -> + [ {"$name", t(ref(udp_listener_settings))}]; + +fields(dtls_listener) -> + [ {"$name", t(ref(dtls_listener_settings))}]; + fields(listener_settings) -> + % FIXME: %[ {"bind", t(union(ip_port(), integer()))} [ {bind, t(integer())} , {acceptors, t(integer(), undefined, 8)} @@ -107,6 +141,19 @@ fields(ssl_listener_settings) -> , depth => 10 , reuse_sessions => true}) ++ fields(listener_settings); +fields(udp_listener_settings) -> + [ + %% some special confs for udp listener + ] ++ fields(listener_settings); + +fields(dtls_listener_settings) -> + [ + %% some special confs for dtls listener + ] ++ + ssl(undefined, #{handshake_timeout => "15s" + , depth => 10 + , reuse_sessions => true}) ++ fields(listener_settings); + fields(access) -> [ {"$id", #{type => string(), nullable => true}}]; diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index 184c3ff87..b7e6658d1 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -25,6 +25,7 @@ ]). -export([ apply/2 + , format_listenon/1 ]). -export([ normalize_rawconf/1 @@ -89,6 +90,13 @@ apply(F, A2) when is_function(F), is_list(A2) -> erlang:apply(F, A2). +format_listenon(Port) when is_integer(Port) -> + io_lib:format("0.0.0.0:~w", [Port]); +format_listenon({Addr, Port}) when is_list(Addr) -> + io_lib:format("~s:~w", [Addr, Port]); +format_listenon({Addr, Port}) when is_tuple(Addr) -> + io_lib:format("~s:~w", [inet:ntoa(Addr), Port]). + -type listener() :: #{}. -type rawconf() :: diff --git a/apps/emqx_sn/README.md b/apps/emqx_gateway/src/mqttsn/README.md similarity index 94% rename from apps/emqx_sn/README.md rename to apps/emqx_gateway/src/mqttsn/README.md index d7251c49c..8179dde62 100644 --- a/apps/emqx_sn/README.md +++ b/apps/emqx_gateway/src/mqttsn/README.md @@ -1,10 +1,9 @@ -emqx-sn -======= +# MQTT-SN Gateway EMQ X MQTT-SN Gateway. -Configure Plugin ----------------- +## Configure Plugin + File: etc/emqx_sn.conf @@ -72,8 +71,7 @@ mqtt.sn.password = abc - mqtt.sn.password * This parameter is optional. Pair with username above. -Load Plugin ------------ +## Load Plugin ``` ./bin/emqx_ctl plugins load emqx_sn @@ -95,23 +93,18 @@ Load Plugin - https://github.com/njh/mqtt-sn-tools - https://github.com/arobenko/mqtt-sn -sleeping device ------------ +### sleeping device PINGREQ must have a ClientId which is identical to the one in CONNECT message. Without ClientId, emqx-sn will ignore such PINGREQ. -pre-defined topics ------------ +### pre-defined topics The mapping of a pre-defined topic id and topic name should be known inadvance by both client's application and gateway. We define this mapping info in emqx_sn.conf file, and which shall be kept equivalent in all client's side. -License -------- +## License Apache License Version 2.0 -Author ------- - -EMQ X-Men Team. +## Author +EMQ X Team. diff --git a/apps/emqx_sn/src/emqx_sn_app.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_app.erl similarity index 100% rename from apps/emqx_sn/src/emqx_sn_app.erl rename to apps/emqx_gateway/src/mqttsn/emqx_sn_app.erl diff --git a/apps/emqx_sn/src/emqx_sn_asleep_timer.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_asleep_timer.erl similarity index 100% rename from apps/emqx_sn/src/emqx_sn_asleep_timer.erl rename to apps/emqx_gateway/src/mqttsn/emqx_sn_asleep_timer.erl diff --git a/apps/emqx_sn/src/emqx_sn_broadcast.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_broadcast.erl similarity index 98% rename from apps/emqx_sn/src/emqx_sn_broadcast.erl rename to apps/emqx_gateway/src/mqttsn/emqx_sn_broadcast.erl index a1630b844..69eb9a2c5 100644 --- a/apps/emqx_sn/src/emqx_sn_broadcast.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_broadcast.erl @@ -18,7 +18,7 @@ -behaviour(gen_server). --include("emqx_sn.hrl"). +-include("src/mqttsn/include/emqx_sn.hrl"). -export([ start_link/2 , stop/0 diff --git a/apps/emqx_sn/src/emqx_sn_frame.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_frame.erl similarity index 99% rename from apps/emqx_sn/src/emqx_sn_frame.erl rename to apps/emqx_gateway/src/mqttsn/emqx_sn_frame.erl index eed32803d..301247fbc 100644 --- a/apps/emqx_sn/src/emqx_sn_frame.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_frame.erl @@ -17,7 +17,7 @@ -module(emqx_sn_frame). --include("emqx_sn.hrl"). +-include("src/mqttsn/include/emqx_sn.hrl"). -export([ parse/1 , serialize/1 diff --git a/apps/emqx_sn/src/emqx_sn_gateway.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_gateway.erl similarity index 99% rename from apps/emqx_sn/src/emqx_sn_gateway.erl rename to apps/emqx_gateway/src/mqttsn/emqx_sn_gateway.erl index 1bccf0c1a..27eb16498 100644 --- a/apps/emqx_sn/src/emqx_sn_gateway.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_gateway.erl @@ -18,7 +18,7 @@ -behaviour(gen_statem). --include("emqx_sn.hrl"). +-include("src/mqttsn/include/emqx_sn.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -915,7 +915,8 @@ handle_unsubscribe(_, _TopicId, MsgId, State) -> {keep_state, send_message(?SN_UNSUBACK_MSG(MsgId), State)}. do_publish(?SN_NORMAL_TOPIC, TopicName, Data, Flags, MsgId, State) -> - %% XXX: Handle normal topic id as predefined topic id, to be compatible with paho mqtt-sn library + %% XXX: Handle normal topic id as predefined topic id, to be + %% compatible with paho mqtt-sn library <> = TopicName, do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId, State); do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId, @@ -972,8 +973,11 @@ do_puback(TopicId, MsgId, ReturnCode, StateName, undefined -> {keep_state, State}; TopicName -> %%notice that this TopicName maybe normal or predefined, - %% involving the predefined topic name in register to enhance the gateway's robustness even inconsistent with MQTT-SN channels - {keep_state, send_register(TopicName, TopicId, MsgId, State)} + %% involving the predefined topic name in register to + %% enhance the gateway's robustness even inconsistent + %% with MQTT-SN channels + {keep_state, send_register(TopicName, TopicId, + MsgId, State)} end; _ -> ?LOG(error, "CAN NOT handle PUBACK ReturnCode=~p", [ReturnCode]), @@ -1070,8 +1074,9 @@ handle_outgoing(Packet, State) -> send_message(mqtt2sn(Packet, State), State). cache_no_reg_publish_message(Pendings, TopicId, PubPkt, State) -> - ?LOG(debug, "cache non-registered publish message for topic-id: ~p, msg: ~0p, pendings: ~0p", - [TopicId, PubPkt, Pendings]), + ?LOG(debug, "cache non-registered publish message " + "for topic-id: ~p, msg: ~0p, pendings: ~0p", + [TopicId, PubPkt, Pendings]), Msgs = maps:get(pending_topic_ids, Pendings, []), Pendings#{TopicId => Msgs ++ [mqtt2sn(PubPkt, State)]}. diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl new file mode 100644 index 000000000..c3b679381 --- /dev/null +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl @@ -0,0 +1,161 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc The MQTT-SN Gateway Implement interface +-module(emqx_sn_impl). + +-behavior(emqx_gateway_impl). + +%% APIs +-export([ load/0 + , unload/0 + ]). + +-export([]). + +-export([ init/1 + , on_insta_create/3 + , on_insta_update/4 + , on_insta_destroy/3 + ]). + +-define(UDP_SOCKOPTS, []). + +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- + +load() -> + RegistryOptions = [ {cbkmod, ?MODULE} + ], + YourOptions = [params1, params2], + emqx_gateway_registry:load(mqttsn, RegistryOptions, YourOptions). + +unload() -> + emqx_gateway_registry:unload(mqttsn). + +init(_) -> + GwState = #{}, + {ok, GwState}. + +%%-------------------------------------------------------------------- +%% emqx_gateway_registry callbacks +%%-------------------------------------------------------------------- + +on_insta_create(_Insta = #{ id := InstaId, + rawconf := RawConf + }, Ctx, _GwState) -> + + %% We Also need to start `emqx_sn_broadcast` & + %% `emqx_sn_registry` process + SnGwId = maps:get(gateway_id, RawConf), + case maps:get(broadcast, RawConf) of + false -> + ok; + true -> + %% FIXME: + Port = 1884, + _ = emqx_sn_broadcast:start_link(SnGwId, Port) + end, + + PredefTopics = maps:get(predefined, RawConf), + {ok, RegistrySvr} = emqx_sn_registry:start_link(PredefTopics), + + NRawConf = maps:without( + [gateway_id, broadcast, predefined], + RawConf#{registry => RegistrySvr} + ), + Listeners = emqx_gateway_utils:normalize_rawconf(NRawConf), + + ListenerPids = lists:map(fun(Lis) -> + start_listener(InstaId, Ctx, Lis) + end, Listeners), + {ok, ListenerPids, _InstaState = #{ctx => Ctx}}. + +on_insta_update(NewInsta, OldInsta, GwInstaState = #{ctx := Ctx}, GwState) -> + InstaId = maps:get(id, NewInsta), + try + %% XXX: 1. How hot-upgrade the changes ??? + %% XXX: 2. Check the New confs first before destroy old instance ??? + on_insta_destroy(OldInsta, GwInstaState, GwState), + on_insta_create(NewInsta, Ctx, GwState) + catch + Class : Reason : Stk -> + logger:error("Failed to update stomp instance ~s; " + "reason: {~0p, ~0p} stacktrace: ~0p", + [InstaId, Class, Reason, Stk]), + {error, {Class, Reason}} + end. + +on_insta_destroy(_Insta = #{ id := InstaId, + rawconf := RawConf + }, _GwInstaState, _GwState) -> + Listeners = emqx_gateway_utils:normalize_rawconf(RawConf), + lists:foreach(fun(Lis) -> + stop_listener(InstaId, Lis) + end, Listeners). + +%%-------------------------------------------------------------------- +%% Internal funcs +%%-------------------------------------------------------------------- + +start_listener(InstaId, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> + ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), + case start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) of + {ok, Pid} -> + io:format("Start mqttsn ~s:~s listener on ~s successfully.~n", + [InstaId, Type, ListenOnStr]), + Pid; + {error, Reason} -> + io:format(standard_error, + "Failed to start mqttsn ~s:~s listener on ~s: ~0p~n", + [InstaId, Type, ListenOnStr, Reason]), + throw({badconf, Reason}) + end. + +start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) -> + Name = name(InstaId, Type), + esockd:open_udp(Name, ListenOn, merge_default(SocketOpts), + {emqx_sn_gateway, start_link, [Cfg#{ctx => Ctx}]}). + +name(InstaId, Type) -> + list_to_atom(lists:concat([InstaId, ":", Type])). + +merge_default(Options) -> + case lists:keytake(udp_options, 1, Options) of + {value, {udp_options, TcpOpts}, Options1} -> + [{udp_options, emqx_misc:merge_opts(?UDP_SOCKOPTS, TcpOpts)} | Options1]; + false -> + [{udp_options, ?UDP_SOCKOPTS} | Options] + end. + +stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) -> + StopRet = stop_listener(InstaId, Type, ListenOn, SocketOpts, Cfg), + ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), + case StopRet of + ok -> io:format("Stop mqttsn ~s:~s listener on ~s successfully.~n", + [InstaId, Type, ListenOnStr]); + {error, Reason} -> + io:format(standard_error, + "Failed to stop mqttsn ~s:~s listener on ~s: ~0p~n", + [InstaId, Type, ListenOnStr, Reason] + ) + end, + StopRet. + +stop_listener(InstaId, Type, ListenOn, _SocketOpts, _Cfg) -> + Name = name(InstaId, Type), + esockd:close(Name, ListenOn). diff --git a/apps/emqx_sn/src/emqx_sn_registry.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl similarity index 98% rename from apps/emqx_sn/src/emqx_sn_registry.erl rename to apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl index 903f61c70..f2f87d93b 100644 --- a/apps/emqx_sn/src/emqx_sn_registry.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl @@ -18,7 +18,7 @@ -behaviour(gen_server). --include("emqx_sn.hrl"). +-include("src/mqttsn/include/emqx_sn.hrl"). -define(LOG(Level, Format, Args), emqx_logger:Level("MQTT-SN(registry): " ++ Format, Args)). @@ -132,7 +132,7 @@ init([PredefTopics]) -> %% {ClientId, TopicId} -> TopicName %% {ClientId, TopicName} -> TopicId MaxPredefId = lists:foldl( - fun({TopicId, TopicName}, AccId) -> + fun(#{id := TopicId, topic := TopicName}, AccId) -> ekka_mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicId}, value = TopicName}), ekka_mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicName}, diff --git a/apps/emqx_sn/src/emqx_sn_sup.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_sup.erl similarity index 85% rename from apps/emqx_sn/src/emqx_sn_sup.erl rename to apps/emqx_gateway/src/mqttsn/emqx_sn_sup.erl index 3d4fe602f..e78b41766 100644 --- a/apps/emqx_sn/src/emqx_sn_sup.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_sup.erl @@ -33,11 +33,9 @@ init([{_Ip, Port}, GwId, PredefTopics]) -> type => worker, modules => [emqx_sn_broadcast]}, Registry = #{id => emqx_sn_registry, - start => {emqx_sn_registry, start_link, [PredefTopics]}, - restart => permanent, - shutdown => brutal_kill, - type => worker, - modules => [emqx_sn_registry]}, + start => {emqx_sn_registry, start_link, [PredefTopics]}, + restart => permanent, + shutdown => brutal_kill, + type => worker, + modules => [emqx_sn_registry]}, {ok, {{one_for_one, 10, 3600}, [Broadcast, Registry]}}. - - diff --git a/apps/emqx_sn/include/emqx_sn.hrl b/apps/emqx_gateway/src/mqttsn/include/emqx_sn.hrl similarity index 100% rename from apps/emqx_sn/include/emqx_sn.hrl rename to apps/emqx_gateway/src/mqttsn/include/emqx_sn.hrl diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl index e6e62565a..86cce9c91 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl @@ -71,13 +71,12 @@ on_insta_create(_Insta = #{ id := InstaId, %% FIXME: Assign ctx to InstaState {ok, ListenerPids, _InstaState = #{ctx => Ctx}}. -%% @private -on_insta_update(NewInsta, OldInstace, GwInstaState = #{ctx := Ctx}, GwState) -> +on_insta_update(NewInsta, OldInsta, GwInstaState = #{ctx := Ctx}, GwState) -> InstaId = maps:get(id, NewInsta), try %% XXX: 1. How hot-upgrade the changes ??? %% XXX: 2. Check the New confs first before destroy old instance ??? - on_insta_destroy(OldInstace, GwInstaState, GwState), + on_insta_destroy(OldInsta, GwInstaState, GwState), on_insta_create(NewInsta, Ctx, GwState) catch Class : Reason : Stk -> @@ -100,15 +99,16 @@ on_insta_destroy(_Insta = #{ id := InstaId, %%-------------------------------------------------------------------- start_listener(InstaId, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> + ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), case start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) of {ok, Pid} -> io:format("Start stomp ~s:~s listener on ~s successfully.~n", - [InstaId, Type, format(ListenOn)]), + [InstaId, Type, ListenOnStr]), Pid; {error, Reason} -> io:format(standard_error, "Failed to start stomp ~s:~s listener on ~s: ~0p~n", - [InstaId, Type, format(ListenOn), Reason]), + [InstaId, Type, ListenOnStr, Reason]), throw({badconf, Reason}) end. @@ -128,22 +128,16 @@ merge_default(Options) -> [{tcp_options, ?TCP_OPTS} | Options] end. -format(Port) when is_integer(Port) -> - io_lib:format("0.0.0.0:~w", [Port]); -format({Addr, Port}) when is_list(Addr) -> - io_lib:format("~s:~w", [Addr, Port]); -format({Addr, Port}) when is_tuple(Addr) -> - io_lib:format("~s:~w", [inet:ntoa(Addr), Port]). - stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) -> StopRet = stop_listener(InstaId, Type, ListenOn, SocketOpts, Cfg), + ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), case StopRet of ok -> io:format("Stop stomp ~s:~s listener on ~s successfully.~n", - [InstaId, Type, format(ListenOn)]); + [InstaId, Type, ListenOnStr]); {error, Reason} -> io:format(standard_error, "Failed to stop stomp ~s:~s listener on ~s: ~0p~n", - [InstaId, Type, format(ListenOn), Reason] + [InstaId, Type, ListenOnStr, Reason] ) end, StopRet. diff --git a/apps/emqx_sn/test/broadcast_test.py b/apps/emqx_gateway/test/broadcast_test.py similarity index 100% rename from apps/emqx_sn/test/broadcast_test.py rename to apps/emqx_gateway/test/broadcast_test.py diff --git a/apps/emqx_sn/test/emqx_sn_frame_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_frame_SUITE.erl similarity index 100% rename from apps/emqx_sn/test/emqx_sn_frame_SUITE.erl rename to apps/emqx_gateway/test/emqx_sn_frame_SUITE.erl diff --git a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl similarity index 100% rename from apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl rename to apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl diff --git a/apps/emqx_sn/test/emqx_sn_registry_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_registry_SUITE.erl similarity index 100% rename from apps/emqx_sn/test/emqx_sn_registry_SUITE.erl rename to apps/emqx_gateway/test/emqx_sn_registry_SUITE.erl diff --git a/apps/emqx_sn/intergration_test/Makefile b/apps/emqx_gateway/test/intergration_test/Makefile similarity index 100% rename from apps/emqx_sn/intergration_test/Makefile rename to apps/emqx_gateway/test/intergration_test/Makefile diff --git a/apps/emqx_sn/intergration_test/README.md b/apps/emqx_gateway/test/intergration_test/README.md similarity index 100% rename from apps/emqx_sn/intergration_test/README.md rename to apps/emqx_gateway/test/intergration_test/README.md diff --git a/apps/emqx_sn/intergration_test/add_emqx_sn_to_project.py b/apps/emqx_gateway/test/intergration_test/add_emqx_sn_to_project.py similarity index 100% rename from apps/emqx_sn/intergration_test/add_emqx_sn_to_project.py rename to apps/emqx_gateway/test/intergration_test/add_emqx_sn_to_project.py diff --git a/apps/emqx_sn/intergration_test/client/case1_qos0pub.c b/apps/emqx_gateway/test/intergration_test/client/case1_qos0pub.c similarity index 100% rename from apps/emqx_sn/intergration_test/client/case1_qos0pub.c rename to apps/emqx_gateway/test/intergration_test/client/case1_qos0pub.c diff --git a/apps/emqx_sn/intergration_test/client/case1_qos0sub.c b/apps/emqx_gateway/test/intergration_test/client/case1_qos0sub.c similarity index 100% rename from apps/emqx_sn/intergration_test/client/case1_qos0sub.c rename to apps/emqx_gateway/test/intergration_test/client/case1_qos0sub.c diff --git a/apps/emqx_sn/intergration_test/client/case2_qos0pub.c b/apps/emqx_gateway/test/intergration_test/client/case2_qos0pub.c similarity index 100% rename from apps/emqx_sn/intergration_test/client/case2_qos0pub.c rename to apps/emqx_gateway/test/intergration_test/client/case2_qos0pub.c diff --git a/apps/emqx_sn/intergration_test/client/case2_qos0sub.c b/apps/emqx_gateway/test/intergration_test/client/case2_qos0sub.c similarity index 100% rename from apps/emqx_sn/intergration_test/client/case2_qos0sub.c rename to apps/emqx_gateway/test/intergration_test/client/case2_qos0sub.c diff --git a/apps/emqx_sn/intergration_test/client/case3_qos0pub.c b/apps/emqx_gateway/test/intergration_test/client/case3_qos0pub.c similarity index 100% rename from apps/emqx_sn/intergration_test/client/case3_qos0pub.c rename to apps/emqx_gateway/test/intergration_test/client/case3_qos0pub.c diff --git a/apps/emqx_sn/intergration_test/client/case3_qos0sub.c b/apps/emqx_gateway/test/intergration_test/client/case3_qos0sub.c similarity index 100% rename from apps/emqx_sn/intergration_test/client/case3_qos0sub.c rename to apps/emqx_gateway/test/intergration_test/client/case3_qos0sub.c diff --git a/apps/emqx_sn/intergration_test/client/case4_qos3pub.c b/apps/emqx_gateway/test/intergration_test/client/case4_qos3pub.c similarity index 100% rename from apps/emqx_sn/intergration_test/client/case4_qos3pub.c rename to apps/emqx_gateway/test/intergration_test/client/case4_qos3pub.c diff --git a/apps/emqx_sn/intergration_test/client/case4_qos3sub.c b/apps/emqx_gateway/test/intergration_test/client/case4_qos3sub.c similarity index 100% rename from apps/emqx_sn/intergration_test/client/case4_qos3sub.c rename to apps/emqx_gateway/test/intergration_test/client/case4_qos3sub.c diff --git a/apps/emqx_sn/intergration_test/client/case5_qos3pub.c b/apps/emqx_gateway/test/intergration_test/client/case5_qos3pub.c similarity index 100% rename from apps/emqx_sn/intergration_test/client/case5_qos3pub.c rename to apps/emqx_gateway/test/intergration_test/client/case5_qos3pub.c diff --git a/apps/emqx_sn/intergration_test/client/case5_qos3sub.c b/apps/emqx_gateway/test/intergration_test/client/case5_qos3sub.c similarity index 100% rename from apps/emqx_sn/intergration_test/client/case5_qos3sub.c rename to apps/emqx_gateway/test/intergration_test/client/case5_qos3sub.c diff --git a/apps/emqx_sn/intergration_test/client/case6_sleep.c b/apps/emqx_gateway/test/intergration_test/client/case6_sleep.c similarity index 100% rename from apps/emqx_sn/intergration_test/client/case6_sleep.c rename to apps/emqx_gateway/test/intergration_test/client/case6_sleep.c diff --git a/apps/emqx_sn/intergration_test/client/case7_double_connect.c b/apps/emqx_gateway/test/intergration_test/client/case7_double_connect.c similarity index 100% rename from apps/emqx_sn/intergration_test/client/case7_double_connect.c rename to apps/emqx_gateway/test/intergration_test/client/case7_double_connect.c diff --git a/apps/emqx_sn/intergration_test/client/int_test_result.c b/apps/emqx_gateway/test/intergration_test/client/int_test_result.c similarity index 100% rename from apps/emqx_sn/intergration_test/client/int_test_result.c rename to apps/emqx_gateway/test/intergration_test/client/int_test_result.c diff --git a/apps/emqx_sn/intergration_test/client/int_test_result.h b/apps/emqx_gateway/test/intergration_test/client/int_test_result.h similarity index 100% rename from apps/emqx_sn/intergration_test/client/int_test_result.h rename to apps/emqx_gateway/test/intergration_test/client/int_test_result.h diff --git a/apps/emqx_sn/intergration_test/disable_qos3.py b/apps/emqx_gateway/test/intergration_test/disable_qos3.py similarity index 100% rename from apps/emqx_sn/intergration_test/disable_qos3.py rename to apps/emqx_gateway/test/intergration_test/disable_qos3.py diff --git a/apps/emqx_sn/intergration_test/enable_qos3.py b/apps/emqx_gateway/test/intergration_test/enable_qos3.py similarity index 100% rename from apps/emqx_sn/intergration_test/enable_qos3.py rename to apps/emqx_gateway/test/intergration_test/enable_qos3.py diff --git a/apps/emqx_sn/test/props/emqx_sn_proper_types.erl b/apps/emqx_gateway/test/props/emqx_sn_proper_types.erl similarity index 100% rename from apps/emqx_sn/test/props/emqx_sn_proper_types.erl rename to apps/emqx_gateway/test/props/emqx_sn_proper_types.erl diff --git a/apps/emqx_sn/test/props/prop_emqx_sn_frame.erl b/apps/emqx_gateway/test/props/prop_emqx_sn_frame.erl similarity index 100% rename from apps/emqx_sn/test/props/prop_emqx_sn_frame.erl rename to apps/emqx_gateway/test/props/prop_emqx_sn_frame.erl diff --git a/apps/emqx_sn/.gitignore b/apps/emqx_sn/.gitignore deleted file mode 100644 index 46861cdec..000000000 --- a/apps/emqx_sn/.gitignore +++ /dev/null @@ -1,40 +0,0 @@ -.eunit -deps -*.o -*.beam -*.plt -erl_crash.dump -ebin -rel/example_project -.concrete/DEV_MODE -.rebar -_rel/ -emqx_sn.d -logs/ -.erlang.mk/ -data/ -.idea/ -*.iml -*.d -_build/ -.rebar3 -rebar3.crashdump -.DS_Store -bbmustache/ -etc/gen.emqx.conf -cuttlefish -rebar.lock -xrefr -intergration_test/emqx-rel/ -intergration_test/paho.mqtt-sn.embedded-c/ -intergration_test/client/*.exe -intergration_test/client/*.txt -.DS_Store -cover/ -ct.coverdata -eunit.coverdata -test/ct.cover.spec -erlang.mk -etc/emqx_sn.conf.rendered -.rebar3/ -*.swp diff --git a/apps/emqx_sn/examples/simple_example.erl b/apps/emqx_sn/examples/simple_example.erl deleted file mode 100644 index ce19c4133..000000000 --- a/apps/emqx_sn/examples/simple_example.erl +++ /dev/null @@ -1,126 +0,0 @@ --module(simple_example). - --include("emqx_sn.hrl"). - --define(HOST, {127,0,0,1}). --define(PORT, 1884). - --export([start/0]). - -start() -> - io:format("start to connect ~p:~p~n", [?HOST, ?PORT]), - - %% create udp socket - {ok, Socket} = gen_udp:open(0, [binary]), - - %% connect to emqx_sn broker - Packet = gen_connect_packet(<<"client1">>), - ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet), - io:format("send connect packet=~p~n", [Packet]), - %% receive message - wait_response(), - - %% register topic_id - RegisterPacket = gen_register_packet(<<"TopicA">>, 0), - ok = gen_udp:send(Socket, ?HOST, ?PORT, RegisterPacket), - io:format("send register packet=~p~n", [RegisterPacket]), - TopicId = wait_response(), - - %% subscribe - SubscribePacket = gen_subscribe_packet(TopicId), - ok = gen_udp:send(Socket, ?HOST, ?PORT, SubscribePacket), - io:format("send subscribe packet=~p~n", [SubscribePacket]), - wait_response(), - - %% publish - PublishPacket = gen_publish_packet(TopicId, <<"Payload...">>), - ok = gen_udp:send(Socket, ?HOST, ?PORT, PublishPacket), - io:format("send publish packet=~p~n", [PublishPacket]), - wait_response(), - - % wait for subscribed message from broker - wait_response(), - - %% disconnect from emqx_sn broker - DisConnectPacket = gen_disconnect_packet(), - ok = gen_udp:send(Socket, ?HOST, ?PORT, DisConnectPacket), - io:format("send disconnect packet=~p~n", [DisConnectPacket]). - - - -gen_connect_packet(ClientId) -> - Length = 6+byte_size(ClientId), - MsgType = ?SN_CONNECT, - Dup = 0, - QoS = 0, - Retain = 0, - Will = 0, - CleanSession = 1, - TopicIdType = 0, - Flag = <>, - ProtocolId = 1, - Duration = 10, - <>. - -gen_subscribe_packet(TopicId) -> - Length = 7, - MsgType = ?SN_SUBSCRIBE, - Dup = 0, - Retain = 0, - Will = 0, - QoS = 1, - CleanSession = 0, - TopicIdType = 1, - Flag = <>, - MsgId = 1, - <>. - -gen_register_packet(Topic, TopicId) -> - Length = 6+byte_size(Topic), - MsgType = ?SN_REGISTER, - MsgId = 1, - <>. - -gen_publish_packet(TopicId, Payload) -> - Length = 7+byte_size(Payload), - MsgType = ?SN_PUBLISH, - Dup = 0, - QoS = 1, - Retain = 0, - Will = 0, - CleanSession = 0, - MsgId = 1, - TopicIdType = 1, - Flag = <>, - <>. - -gen_disconnect_packet()-> - Length = 2, - MsgType = ?SN_DISCONNECT, - <>. - -wait_response() -> - receive - {udp, _Socket, _, _, Bin} -> - case Bin of - <<_Len:8, ?SN_PUBLISH, _Flag:8, TopicId:16, MsgId:16, Data/binary>> -> - io:format("recv publish TopicId: ~p, MsgId: ~p, Data: ~p~n", [TopicId, MsgId, Data]); - <<_Len:8, ?SN_CONNACK, 0:8>> -> - io:format("recv connect ack~n"); - <<_Len:8, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>> -> - io:format("recv regack TopicId=~p, MsgId=~p~n", [TopicId, MsgId]), - TopicId; - <<_Len:8, ?SN_SUBACK, Flags:8, TopicId:16, MsgId:16, 0:8>> -> - io:format("recv suback Flags=~p TopicId=~p, MsgId=~p~n", [Flags, TopicId, MsgId]); - <<_Len:8, ?SN_PUBACK, TopicId:16, MsgId:16, 0:8>> -> - io:format("recv puback TopicId=~p, MsgId=~p~n", [TopicId, MsgId]); - _ -> - io:format("ignore bin=~p~n", [Bin]) - end; - Any -> - io:format("recv something else from udp socket ~p~n", [Any]) - after - 2000 -> - io:format("Error: receive timeout!~n"), - wait_response() - end. diff --git a/apps/emqx_sn/examples/simple_example2.erl b/apps/emqx_sn/examples/simple_example2.erl deleted file mode 100644 index b9ada6d22..000000000 --- a/apps/emqx_sn/examples/simple_example2.erl +++ /dev/null @@ -1,120 +0,0 @@ --module(simple_example2). - --include("emqx_sn.hrl"). - --define(HOST, "localhost"). --define(PORT, 1884). - --export([start/0]). - -start() -> - io:format("start to connect ~p:~p~n", [?HOST, ?PORT]), - - %% create udp socket - {ok, Socket} = gen_udp:open(0, [binary]), - - %% connect to emqx_sn broker - Packet = gen_connect_packet(<<"client1">>), - ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet), - io:format("send connect packet=~p~n", [Packet]), - %% receive message - wait_response(), - - %% subscribe, SHORT TOPIC NAME - SubscribePacket = gen_subscribe_packet(<<"T1">>), - ok = gen_udp:send(Socket, ?HOST, ?PORT, SubscribePacket), - io:format("send subscribe packet=~p~n", [SubscribePacket]), - wait_response(), - - %% publish, SHORT TOPIC NAME - PublishPacket = gen_publish_packet(<<"T1">>, <<"Payload...">>), - ok = gen_udp:send(Socket, ?HOST, ?PORT, PublishPacket), - io:format("send publish packet=~p~n", [PublishPacket]), - wait_response(), - - % wait for subscribed message from broker - wait_response(), - - %% disconnect from emqx_sn broker - DisConnectPacket = gen_disconnect_packet(), - ok = gen_udp:send(Socket, ?HOST, ?PORT, DisConnectPacket), - io:format("send disconnect packet=~p~n", [DisConnectPacket]). - - - -gen_connect_packet(ClientId) -> - Length = 6+byte_size(ClientId), - MsgType = ?SN_CONNECT, - Dup = 0, - QoS = 0, - Retain = 0, - Will = 0, - CleanSession = 1, - TopicIdType = 0, - Flag = <>, - ProtocolId = 1, - Duration = 10, - <>. - -gen_subscribe_packet(ShortTopic) -> - Length = 7, - MsgType = ?SN_SUBSCRIBE, - Dup = 0, - Retain = 0, - Will = 0, - QoS = 1, - CleanSession = 0, - TopicIdType = 2, % SHORT TOPIC NAME - Flag = <>, - MsgId = 1, - <>. - -gen_register_packet(Topic, TopicId) -> - Length = 6+byte_size(Topic), - MsgType = ?SN_REGISTER, - MsgId = 1, - <>. - -gen_publish_packet(ShortTopic, Payload) -> - Length = 7+byte_size(Payload), - MsgType = ?SN_PUBLISH, - Dup = 0, - QoS = 1, - Retain = 0, - Will = 0, - CleanSession = 0, - MsgId = 1, - TopicIdType = 2, % SHORT TOPIC NAME - Flag = <>, - <>. - -gen_disconnect_packet()-> - Length = 2, - MsgType = ?SN_DISCONNECT, - <>. - -wait_response() -> - receive - {udp, _Socket, _, _, Bin} -> - case Bin of - <<_Len:8, ?SN_PUBLISH, _Flag:8, TopicId:16, MsgId:16, Data/binary>> -> - io:format("recv publish TopicId: ~p, MsgId: ~p, Data: ~p~n", [TopicId, MsgId, Data]); - <<_Len:8, ?SN_CONNACK, 0:8>> -> - io:format("recv connect ack~n"); - <<_Len:8, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>> -> - io:format("recv regack TopicId=~p, MsgId=~p~n", [TopicId, MsgId]), - TopicId; - <<_Len:8, ?SN_SUBACK, Flags:8, TopicId:16, MsgId:16, 0:8>> -> - io:format("recv suback Flags=~p TopicId=~p, MsgId=~p~n", [Flags, TopicId, MsgId]); - <<_Len:8, ?SN_PUBACK, TopicId:16, MsgId:16, 0:8>> -> - io:format("recv puback TopicId=~p, MsgId=~p~n", [TopicId, MsgId]); - _ -> - io:format("ignore bin=~p~n", [Bin]) - end; - Any -> - io:format("recv something else from udp socket ~p~n", [Any]) - after - 2000 -> - io:format("Error: receive timeout!~n"), - wait_response() - end. diff --git a/apps/emqx_sn/examples/simple_example3.erl b/apps/emqx_sn/examples/simple_example3.erl deleted file mode 100644 index 40f0bf572..000000000 --- a/apps/emqx_sn/examples/simple_example3.erl +++ /dev/null @@ -1,120 +0,0 @@ --module(simple_example3). - --include("emqx_sn.hrl"). - --define(HOST, "localhost"). --define(PORT, 1884). - --export([start/0]). - -start() -> - io:format("start to connect ~p:~p~n", [?HOST, ?PORT]), - - %% create udp socket - {ok, Socket} = gen_udp:open(0, [binary]), - - %% connect to emqx_sn broker - Packet = gen_connect_packet(<<"client1">>), - ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet), - io:format("send connect packet=~p~n", [Packet]), - %% receive message - wait_response(), - - %% subscribe normal topic name - SubscribePacket = gen_subscribe_packet(<<"T3">>), - ok = gen_udp:send(Socket, ?HOST, ?PORT, SubscribePacket), - io:format("send subscribe packet=~p~n", [SubscribePacket]), - wait_response(), - - %% publish SHORT TOPIC NAME - PublishPacket = gen_publish_packet(<<"T3">>, <<"Payload...">>), - ok = gen_udp:send(Socket, ?HOST, ?PORT, PublishPacket), - io:format("send publish packet=~p~n", [PublishPacket]), - wait_response(), - - % wait for subscribed message from broker - wait_response(), - - %% disconnect from emqx_sn broker - DisConnectPacket = gen_disconnect_packet(), - ok = gen_udp:send(Socket, ?HOST, ?PORT, DisConnectPacket), - io:format("send disconnect packet=~p~n", [DisConnectPacket]). - - - -gen_connect_packet(ClientId) -> - Length = 6+byte_size(ClientId), - MsgType = ?SN_CONNECT, - Dup = 0, - QoS = 0, - Retain = 0, - Will = 0, - CleanSession = 1, - TopicIdType = 0, - Flag = <>, - ProtocolId = 1, - Duration = 10, - <>. - -gen_subscribe_packet(ShortTopic) -> - Length = 7, - MsgType = ?SN_SUBSCRIBE, - Dup = 0, - Retain = 0, - Will = 0, - QoS = 1, - CleanSession = 0, - TopicIdType = 0, % normal topic name - Flag = <>, - MsgId = 1, - <>. - -gen_register_packet(Topic, TopicId) -> - Length = 6+byte_size(Topic), - MsgType = ?SN_REGISTER, - MsgId = 1, - <>. - -gen_publish_packet(ShortTopic, Payload) -> - Length = 7+byte_size(Payload), - MsgType = ?SN_PUBLISH, - Dup = 0, - QoS = 1, - Retain = 0, - Will = 0, - CleanSession = 0, - MsgId = 1, - TopicIdType = 2, % SHORT TOPIC NAME - Flag = <>, - <>. - -gen_disconnect_packet()-> - Length = 2, - MsgType = ?SN_DISCONNECT, - <>. - -wait_response() -> - receive - {udp, _Socket, _, _, Bin} -> - case Bin of - <<_Len:8, ?SN_PUBLISH, _Flag:8, TopicId:16, MsgId:16, Data/binary>> -> - io:format("recv publish TopicId: ~p, MsgId: ~p, Data: ~p~n", [TopicId, MsgId, Data]); - <<_Len:8, ?SN_CONNACK, 0:8>> -> - io:format("recv connect ack~n"); - <<_Len:8, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>> -> - io:format("recv regack TopicId=~p, MsgId=~p~n", [TopicId, MsgId]), - TopicId; - <<_Len:8, ?SN_SUBACK, Flags:8, TopicId:16, MsgId:16, 0:8>> -> - io:format("recv suback Flags=~p TopicId=~p, MsgId=~p~n", [Flags, TopicId, MsgId]); - <<_Len:8, ?SN_PUBACK, TopicId:16, MsgId:16, 0:8>> -> - io:format("recv puback TopicId=~p, MsgId=~p~n", [TopicId, MsgId]); - _ -> - io:format("ignore bin=~p~n", [Bin]) - end; - Any -> - io:format("recv something else from udp socket ~p~n", [Any]) - after - 2000 -> - io:format("Error: receive timeout!~n"), - wait_response() - end. diff --git a/apps/emqx_sn/examples/simple_example4.erl b/apps/emqx_sn/examples/simple_example4.erl deleted file mode 100644 index 6beb5835c..000000000 --- a/apps/emqx_sn/examples/simple_example4.erl +++ /dev/null @@ -1,151 +0,0 @@ --module(simple_example4). - --include("emqx_sn.hrl"). - --define(HOST, {127,0,0,1}). --define(PORT, 1884). - --export([start/0]). - -start(LoopTimes) -> - io:format("start to connect ~p:~p~n", [?HOST, ?PORT]), - - %% create udp socket - {ok, Socket} = gen_udp:open(0, [binary]), - - %% connect to emqx_sn broker - Packet = gen_connect_packet(<<"client1">>), - ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet), - io:format("send connect packet=~p~n", [Packet]), - %% receive message - wait_response(), - - %% register topic_id - RegisterPacket = gen_register_packet(<<"TopicA">>, 0), - ok = gen_udp:send(Socket, ?HOST, ?PORT, RegisterPacket), - io:format("send register packet=~p~n", [RegisterPacket]), - TopicId = wait_response(), - - %% subscribe - SubscribePacket = gen_subscribe_packet(TopicId), - ok = gen_udp:send(Socket, ?HOST, ?PORT, SubscribePacket), - io:format("send subscribe packet=~p~n", [SubscribePacket]), - wait_response(), - - %% loop publish - [begin - timer:sleep(1000), - io:format("~n-------------------- publish ~p start --------------------~n", [N]), - - PublishPacket = gen_publish_packet(TopicId, <<"Payload...">>), - ok = gen_udp:send(Socket, ?HOST, ?PORT, PublishPacket), - io:format("send publish packet=~p~n", [PublishPacket]), - % wait for publish ack - wait_response(), - % wait for subscribed message from broker - wait_response(), - - PingReqPacket = gen_pingreq_packet(), - ok = gen_udp:send(Socket, ?HOST, ?PORT, PingReqPacket), - % wait for pingresp - wait_response(), - - io:format("--------------------- publish ~p end ---------------------~n", [N]) - end || N <- lists:seq(1, LoopTimes)], - - %% disconnect from emqx_sn broker - DisConnectPacket = gen_disconnect_packet(), - ok = gen_udp:send(Socket, ?HOST, ?PORT, DisConnectPacket), - io:format("send disconnect packet=~p~n", [DisConnectPacket]). - - - -gen_connect_packet(ClientId) -> - Length = 6+byte_size(ClientId), - MsgType = ?SN_CONNECT, - Dup = 0, - QoS = 0, - Retain = 0, - Will = 0, - CleanSession = 1, - TopicIdType = 0, - Flag = <>, - ProtocolId = 1, - Duration = 10, - <>. - -gen_subscribe_packet(TopicId) -> - Length = 7, - MsgType = ?SN_SUBSCRIBE, - Dup = 0, - Retain = 0, - Will = 0, - QoS = 1, - CleanSession = 0, - TopicIdType = 1, - Flag = <>, - MsgId = 1, - <>. - -gen_register_packet(Topic, TopicId) -> - Length = 6+byte_size(Topic), - MsgType = ?SN_REGISTER, - MsgId = 1, - <>. - -gen_publish_packet(TopicId, Payload) -> - Length = 7+byte_size(Payload), - MsgType = ?SN_PUBLISH, - Dup = 0, - QoS = 1, - Retain = 0, - Will = 0, - CleanSession = 0, - MsgId = 1, - TopicIdType = 1, - Flag = <>, - <>. - -gen_puback_packet(TopicId, MsgId) -> - Length = 7, - MsgType = ?SN_PUBACK, - <>. - -gen_pingreq_packet() -> - Length = 2, - MsgType = ?SN_PINGREQ, - <>. - -gen_disconnect_packet()-> - Length = 2, - MsgType = ?SN_DISCONNECT, - <>. - -wait_response() -> - receive - {udp, Socket, _, _, Bin} -> - case Bin of - <<_Len:8, ?SN_PUBLISH, _Flag:8, TopicId:16, MsgId:16, Data/binary>> -> - io:format("recv publish TopicId: ~p, MsgId: ~p, Data: ~p~n", [TopicId, MsgId, Data]), - ok = gen_udp:send(Socket, ?HOST, ?PORT, gen_puback_packet(TopicId, MsgId)); - <<_Len:8, ?SN_CONNACK, 0:8>> -> - io:format("recv connect ack~n"); - <<_Len:8, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>> -> - io:format("recv regack TopicId=~p, MsgId=~p~n", [TopicId, MsgId]), - TopicId; - <<_Len:8, ?SN_SUBACK, Flags:8, TopicId:16, MsgId:16, 0:8>> -> - io:format("recv suback Flags=~p TopicId=~p, MsgId=~p~n", [Flags, TopicId, MsgId]); - <<_Len:8, ?SN_PUBACK, TopicId:16, MsgId:16, 0:8>> -> - io:format("recv puback TopicId=~p, MsgId=~p~n", [TopicId, MsgId]); - <<_Len:8, ?SN_PINGRESP>> -> - io:format("recv pingresp~n"); - _ -> - io:format("ignore bin=~p~n", [Bin]) - end; - Any -> - io:format("recv something else from udp socket ~p~n", [Any]) - after - 2000 -> - io:format("Error: receive timeout!~n"), - wait_response() - end. diff --git a/apps/emqx_sn/rebar.config b/apps/emqx_sn/rebar.config deleted file mode 100644 index cbdac78f6..000000000 --- a/apps/emqx_sn/rebar.config +++ /dev/null @@ -1,26 +0,0 @@ -{deps, []}. -{plugins, [rebar3_proper]}. - -{deps, - [{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.4"}}} - ]}. - -{edoc_opts, [{preprocess, true}]}. -{erl_opts, [warn_unused_vars, - warn_shadow_vars, - warn_unused_import, - warn_obsolete_guard, - debug_info, - {parse_transform}]}. - -{dialyzer, [{warnings, [unmatched_returns, error_handling, race_conditions]} - ]}. - -{xref_checks, [undefined_function_calls, undefined_functions, - locals_not_used, deprecated_function_calls, - warnings_as_errors, deprecated_functions]}. -{cover_enabled, true}. -{cover_opts, [verbose]}. -{cover_export_enabled, true}. - -{plugins, [coveralls]}. diff --git a/apps/emqx_sn/src/emqx_sn.app.src b/apps/emqx_sn/src/emqx_sn.app.src deleted file mode 100644 index 0e4e53dc8..000000000 --- a/apps/emqx_sn/src/emqx_sn.app.src +++ /dev/null @@ -1,14 +0,0 @@ -{application, emqx_sn, - [{description, "EMQ X MQTT-SN Plugin"}, - {vsn, "4.4.0"}, % strict semver, bump manually! - {modules, []}, - {registered, []}, - {applications, [kernel,stdlib,esockd]}, - {mod, {emqx_sn_app,[]}}, - {env, []}, - {licenses, ["Apache-2.0"]}, - {maintainers, ["EMQ X Team "]}, - {links, [{"Homepage", "https://emqx.io/"}, - {"Github", "https://github.com/emqx/emqx-sn"} - ]} - ]}. diff --git a/apps/emqx_sn/src/emqx_sn.appup.src b/apps/emqx_sn/src/emqx_sn.appup.src deleted file mode 100644 index 2bd6f5646..000000000 --- a/apps/emqx_sn/src/emqx_sn.appup.src +++ /dev/null @@ -1,19 +0,0 @@ -%% -*-: erlang -*- -{VSN, - [ - {"4.3.2", [ - {load_module, emqx_sn_gateway, brutal_purge, soft_purge, []} - ]}, - {<<"4.3.[0-1]">>, [ - {restart_application, emqx_sn} - ]} - ], - [ - {"4.3.2", [ - {load_module, emqx_sn_gateway, brutal_purge, soft_purge, []} - ]}, - {<<"4.3.[0-1]">>, [ - {restart_application, emqx_sn} - ]} - ] -}. diff --git a/apps/emqx_sn/vars b/apps/emqx_sn/vars deleted file mode 100644 index e39aa2801..000000000 --- a/apps/emqx_sn/vars +++ /dev/null @@ -1,8 +0,0 @@ -%% vars here are for test only, not intended for release - -{platform_bin_dir, "bin"}. -{platform_data_dir, "data"}. -{platform_etc_dir, "etc"}. -{platform_lib_dir, "lib"}. -{platform_log_dir, "log"}. -{platform_plugins_dir, "data/plugins"}.