From be4d2495f0c3f411a364d837031583189217659a Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 13 Aug 2021 19:13:14 +0800 Subject: [PATCH] refactor(gw): single instance support only --- apps/emqx_gateway/etc/emqx_gateway.conf | 306 +++++++++--------- apps/emqx_gateway/include/emqx_gateway.hrl | 10 +- .../src/bhvrs/emqx_gateway_impl.erl | 28 +- apps/emqx_gateway/src/coap/emqx_coap_impl.erl | 94 +++--- apps/emqx_gateway/src/emqx_gateway.erl | 60 ++-- apps/emqx_gateway/src/emqx_gateway_app.erl | 55 ++-- apps/emqx_gateway/src/emqx_gateway_gw_sup.erl | 58 ++-- .../src/emqx_gateway_insta_sup.erl | 147 ++++----- .../src/emqx_gateway_registry.erl | 76 ++--- apps/emqx_gateway/src/emqx_gateway_schema.erl | 27 +- apps/emqx_gateway/src/emqx_gateway_sup.erl | 107 +++--- .../src/exproto/emqx_exproto_impl.erl | 114 +++---- .../src/lwm2m/emqx_lwm2m_impl.erl | 122 ++++--- apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl | 91 +++--- .../src/stomp/emqx_stomp_impl.erl | 97 +++--- apps/emqx_gateway/test/emqx_exproto_SUITE.erl | 13 +- .../test/emqx_gateway_registry_SUITE.erl | 2 +- apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl | 4 +- .../test/emqx_sn_protocol_SUITE.erl | 35 +- .../test/emqx_sn_registry_SUITE.erl | 4 +- apps/emqx_gateway/test/emqx_stomp_SUITE.erl | 6 +- 21 files changed, 647 insertions(+), 809 deletions(-) diff --git a/apps/emqx_gateway/etc/emqx_gateway.conf b/apps/emqx_gateway/etc/emqx_gateway.conf index 8c77fe652..0a5b6065e 100644 --- a/apps/emqx_gateway/etc/emqx_gateway.conf +++ b/apps/emqx_gateway/etc/emqx_gateway.conf @@ -4,167 +4,169 @@ gateway: { - stomp.1: { - frame: { - max_headers: 10 - max_headers_length: 1024 - max_body_length: 8192 - } + stomp: { - clientinfo_override: { - username: "${Packet.headers.login}" - password: "${Packet.headers.passcode}" - } - - authentication: { - enable: true - authenticators: [ - { - name: "authenticator1" - mechanism: password-based - server_type: built-in-database - user_id_type: clientid - } - ] - } - - listener.tcp.1: { - bind: 61613 - acceptors: 16 - max_connections: 1024000 - max_conn_rate: 1000 - active_n: 100 - } + frame: { + max_headers: 10 + max_headers_length: 1024 + max_body_length: 8192 } - coap.1: { - enable_stats: false - - authentication: { - enable: true - authenticators: [ - { - name: "authenticator1" - mechanism: password-based - server_type: built-in-database - user_id_type: clientid - } - ] - } - - #authentication.enable: false - - heartbeat: 30s - notify_type: qos - subscribe_qos: qos0 - publish_qos: qos1 - listener.udp.1: { - bind: 5683 - } + clientinfo_override: { + username: "${Packet.headers.login}" + password: "${Packet.headers.passcode}" } - mqttsn.1: { - ## The MQTT-SN Gateway ID in ADVERTISE message. - gateway_id: 1 - - ## Enable broadcast this gateway to WLAN - broadcast: true - - ## To control whether write statistics data into ETS table - ## for dashbord to read. - enable_stats: true - - ## To control whether accept and process the received - ## publish message with qos=-1. - enable_qos3: true - - ## Idle timeout for a MQTT-SN channel - idle_timeout: 30s - - ## The pre-defined topic name corresponding to the pre-defined topic - ## id of N. - ## Note that the pre-defined topic id of 0 is reserved. - predefined: [ - { id: 1 - topic: "/predefined/topic/name/hello" - }, - { id: 2 - topic: "/predefined/topic/name/nice" - } - ] - - ### ClientInfo override - clientinfo_override: { - username: "mqtt_sn_user" - password: "abc" - } - - listener.udp.1: { - bind: 1884 - max_connections: 10240000 - max_conn_rate: 1000 - } + authentication: { + enable: true + authenticators: [ + { + name: "authenticator1" + mechanism: password-based + server_type: built-in-database + user_id_type: clientid + } + ] } + listener.tcp.1: { + bind: 61613 + acceptors: 16 + max_connections: 1024000 + max_conn_rate: 1000 + active_n: 100 + } + } + + coap: { + + enable_stats: false + + authentication: { + enable: true + authenticators: [ + { + name: "authenticator1" + mechanism: password-based + server_type: built-in-database + user_id_type: clientid + } + ] + } + + #authentication.enable: false + + heartbeat: 30s + notify_type: qos + subscribe_qos: qos0 + publish_qos: qos1 + listener.udp.1: { + bind: 5683 + } + } + + mqttsn: { + ## The MQTT-SN Gateway ID in ADVERTISE message. + gateway_id: 1 + + ## Enable broadcast this gateway to WLAN + broadcast: true + + ## To control whether write statistics data into ETS table + ## for dashbord to read. + enable_stats: true + + ## To control whether accept and process the received + ## publish message with qos=-1. + enable_qos3: true + + ## Idle timeout for a MQTT-SN channel + idle_timeout: 30s + + ## The pre-defined topic name corresponding to the pre-defined topic + ## id of N. + ## Note that the pre-defined topic id of 0 is reserved. + predefined: [ + { id: 1 + topic: "/predefined/topic/name/hello" + }, + { id: 2 + topic: "/predefined/topic/name/nice" + } + ] + + ### ClientInfo override + clientinfo_override: { + username: "mqtt_sn_user" + password: "abc" + } + + listener.udp.1: { + bind: 1884 + max_connections: 10240000 + max_conn_rate: 1000 + } + } + ## Extension Protocol Gateway - exproto.1: { - - ## The gRPC server to accept requests - server: { - bind: 9100 - #ssl.keyfile: - #ssl.certfile: - #ssl.cacertfile: - } - - handler: { - address: "http://127.0.0.1:9001" - #ssl.keyfile: - #ssl.certfile: - #ssl.cacertfile: - } - - authentication.enable: false - - listener.tcp.1: { - bind: 7993 - acceptors: 8 - max_connections: 10240 - max_conn_rate: 1000 - } - - #listener.ssl.1: {} - #listener.udp.1: {} - #listener.dtls.1: {} + exproto: { + ## The gRPC server to accept requests + server: { + bind: 9100 + #ssl.keyfile: + #ssl.certfile: + #ssl.cacertfile: } - lwm2m_xml_dir: "{{ platform_etc_dir }}/lwm2m_xml" - - lwm2m.1: { - - lifetime_min: 1s - - lifetime_max: 86400s - - qmode_time_windonw: 22 - - auto_observe: false - - mountpoint: "lwm2m/%e/" - - ## always | contains_object_list - update_msg_publish_condition: contains_object_list - - translators: { - command: "dn/#" - response: "up/resp" - notify: "up/notify" - register: "up/resp" - update: "up/resp" - } - - listener.udp.1 { - bind: 5783 - } + handler: { + address: "http://127.0.0.1:9001" + #ssl.keyfile: + #ssl.certfile: + #ssl.cacertfile: } + + authentication.enable: false + + listener.tcp.1: { + bind: 7993 + acceptors: 8 + max_connections: 10240 + max_conn_rate: 1000 + } + + #listener.ssl.1: {} + #listener.udp.1: {} + #listener.dtls.1: {} + } + + + lwm2m: { + + xml_dir: "{{ platform_etc_dir }}/lwm2m_xml" + + lifetime_min: 1s + + lifetime_max: 86400s + + qmode_time_windonw: 22 + + auto_observe: false + + mountpoint: "lwm2m/%e/" + + ## always | contains_object_list + update_msg_publish_condition: contains_object_list + + translators: { + command: "dn/#" + response: "up/resp" + notify: "up/notify" + register: "up/resp" + update: "up/resp" + } + + listener.udp.1 { + bind: 5783 + } + } } diff --git a/apps/emqx_gateway/include/emqx_gateway.hrl b/apps/emqx_gateway/include/emqx_gateway.hrl index 35fad7f23..997d6bc72 100644 --- a/apps/emqx_gateway/include/emqx_gateway.hrl +++ b/apps/emqx_gateway/include/emqx_gateway.hrl @@ -20,15 +20,13 @@ -type instance_id() :: atom(). -type gateway_type() :: atom(). -%% @doc The Gateway Instace defination --type instance() :: - #{ id := instance_id() - , type := gateway_type() - , name := binary() +%% @doc The Gateway defination +-type gateway() :: + #{ type := gateway_type() , descr => binary() | undefined %% Appears only in creating or detailed info , rawconf => map() - %% Appears only in getting instance status/info + %% Appears only in getting gateway status/info , status => stopped | running }. diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_impl.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_impl.erl index 8d413e49c..6906043d9 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_impl.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_impl.erl @@ -22,29 +22,21 @@ -type reason() :: any(). %% @doc --callback init(Options :: list()) -> {error, reason()} | {ok, GwState :: state()}. - -%% @doc --callback on_insta_create(Insta :: instance(), - Ctx :: emqx_gateway_ctx:context(), - GwState :: state() - ) +-callback on_gateway_load(Gateway :: gateway(), + Ctx :: emqx_gateway_ctx:context()) -> {error, reason()} - | {ok, [GwInstaPid :: pid()], GwInstaState :: state()} + | {ok, [ChildPid :: pid()], GwState :: state()} %% TODO: v0.2 The child spec is better for restarting child process - | {ok, [Childspec :: supervisor:child_spec()], GwInstaState :: state()}. + | {ok, [Childspec :: supervisor:child_spec()], GwState :: state()}. %% @doc --callback on_insta_update(NewInsta :: instance(), - OldInsta :: instance(), - GwInstaState :: state(), - GwState :: state()) +-callback on_gateway_update(NewGateway :: gateway(), + OldGateway :: gateway(), + GwState :: state()) -> ok - | {ok, [GwInstaPid :: pid()], GwInstaState :: state()} - | {ok, [Childspec :: supervisor:child_spec()], GwInstaState :: state()} + | {ok, [ChildPid :: pid()], NGwState :: state()} + | {ok, [Childspec :: supervisor:child_spec()], NGwState :: state()} | {error, reason()}. %% @doc --callback on_insta_destroy(Insta :: instance(), - GwInstaState :: state(), - GwState :: state()) -> ok. +-callback on_gateway_unload(Gateway :: gateway(), GwState :: state()) -> ok. diff --git a/apps/emqx_gateway/src/coap/emqx_coap_impl.erl b/apps/emqx_gateway/src/coap/emqx_coap_impl.erl index 09426a13d..a27db934f 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_impl.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_impl.erl @@ -21,95 +21,85 @@ -behavior(emqx_gateway_impl). %% APIs --export([ load/0 - , unload/0 +-export([ reg/0 + , unreg/0 ]). --export([ init/1 - , on_insta_create/3 - , on_insta_update/4 - , on_insta_destroy/3 +-export([ on_gateway_load/2 + , on_gateway_update/3 + , on_gateway_unload/2 ]). -include_lib("emqx/include/logger.hrl"). --dialyzer({nowarn_function, [load/0]}). - %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- -load() -> +reg() -> RegistryOptions = [ {cbkmod, ?MODULE} ], - Options = [], - emqx_gateway_registry:load(coap, RegistryOptions, Options). + emqx_gateway_registry:reg(coap, RegistryOptions). -unload() -> - emqx_gateway_registry:unload(coap). - -init([]) -> - GwState = #{}, - {ok, GwState}. +unreg() -> + emqx_gateway_registry:unreg(coap). %%-------------------------------------------------------------------- %% emqx_gateway_registry callbacks %%-------------------------------------------------------------------- -on_insta_create(_Insta = #{id := InstaId, - rawconf := RawConf - }, Ctx, _GwState) -> +on_gateway_load(_Gateway = #{type := GwType, + rawconf := RawConf + }, Ctx) -> Listeners = emqx_gateway_utils:normalize_rawconf(RawConf), ListenerPids = lists:map(fun(Lis) -> - start_listener(InstaId, Ctx, Lis) + start_listener(GwType, Ctx, Lis) end, Listeners), {ok, ListenerPids, #{ctx => Ctx}}. -on_insta_update(NewInsta, OldInsta, GwInstaState = #{ctx := Ctx}, GwState) -> - InstaId = maps:get(id, NewInsta), +on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) -> + GwType = maps:get(type, NewGateway), try %% XXX: 1. How hot-upgrade the changes ??? %% XXX: 2. Check the New confs first before destroy old instance ??? - on_insta_destroy(OldInsta, GwInstaState, GwState), - on_insta_create(NewInsta, Ctx, GwState) + on_gateway_unload(OldGateway, GwState), + on_gateway_load(NewGateway, Ctx) catch Class : Reason : Stk -> - logger:error("Failed to update coap instance ~s; " + logger:error("Failed to update ~s; " "reason: {~0p, ~0p} stacktrace: ~0p", - [InstaId, Class, Reason, Stk]), + [GwType, Class, Reason, Stk]), {error, {Class, Reason}} end. -on_insta_destroy(_Insta = #{ id := InstaId, - rawconf := RawConf - }, - _GwInstaState, - _GWState) -> +on_gateway_unload(_Gateway = #{ type := GwType, + rawconf := RawConf + }, _GwState) -> Listeners = emqx_gateway_utils:normalize_rawconf(RawConf), lists:foreach(fun(Lis) -> - stop_listener(InstaId, Lis) - end, Listeners). + stop_listener(GwType, Lis) + end, Listeners). %%-------------------------------------------------------------------- %% Internal funcs %%-------------------------------------------------------------------- -start_listener(InstaId, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> +start_listener(GwType, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), - case start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) of + case start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) of {ok, Pid} -> - ?ULOG("Start coap ~s:~s listener on ~s successfully.~n", - [InstaId, Type, ListenOnStr]), + ?ULOG("Start ~s:~s listener on ~s successfully.~n", + [GwType, Type, ListenOnStr]), Pid; {error, Reason} -> - ?ELOG("Failed to start coap ~s:~s listener on ~s: ~0p~n", - [InstaId, Type, ListenOnStr, Reason]), + ?ELOG("Failed to start ~s:~s listener on ~s: ~0p~n", + [GwType, Type, ListenOnStr, Reason]), throw({badconf, Reason}) end. -start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) -> - Name = name(InstaId, Type), +start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) -> + Name = name(GwType, Type), NCfg = Cfg#{ ctx => Ctx, frame_mod => emqx_coap_frame, @@ -124,21 +114,21 @@ do_start_listener(udp, Name, ListenOn, SocketOpts, MFA) -> do_start_listener(dtls, Name, ListenOn, SocketOpts, MFA) -> esockd:open_dtls(Name, ListenOn, SocketOpts, MFA). -name(InstaId, Type) -> - list_to_atom(lists:concat([InstaId, ":", Type])). +name(GwType, Type) -> + list_to_atom(lists:concat([GwType, ":", Type])). -stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) -> - StopRet = stop_listener(InstaId, Type, ListenOn, SocketOpts, Cfg), +stop_listener(GwType, {Type, ListenOn, SocketOpts, Cfg}) -> + StopRet = stop_listener(GwType, Type, ListenOn, SocketOpts, Cfg), ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), case StopRet of - ok -> ?ULOG("Stop coap ~s:~s listener on ~s successfully.~n", - [InstaId, Type, ListenOnStr]); + ok -> ?ULOG("Stop ~s:~s listener on ~s successfully.~n", + [GwType, Type, ListenOnStr]); {error, Reason} -> - ?ELOG("Failed to stop coap ~s:~s listener on ~s: ~0p~n", - [InstaId, Type, ListenOnStr, Reason]) + ?ELOG("Failed to stop ~s:~s listener on ~s: ~0p~n", + [GwType, Type, ListenOnStr, Reason]) end, StopRet. -stop_listener(InstaId, Type, ListenOn, _SocketOpts, _Cfg) -> - Name = name(InstaId, Type), +stop_listener(GwType, Type, ListenOn, _SocketOpts, _Cfg) -> + Name = name(GwType, Type), esockd:close(Name, ListenOn). diff --git a/apps/emqx_gateway/src/emqx_gateway.erl b/apps/emqx_gateway/src/emqx_gateway.erl index f6c12ad53..2835b91ff 100644 --- a/apps/emqx_gateway/src/emqx_gateway.erl +++ b/apps/emqx_gateway/src/emqx_gateway.erl @@ -20,8 +20,8 @@ %% APIs -export([ registered_gateway/0 - , create/4 - , remove/1 + , load/2 + , unload/1 , lookup/1 , update/1 , start/1 @@ -37,48 +37,40 @@ registered_gateway() -> %%-------------------------------------------------------------------- %% Gateway Instace APIs --spec list() -> [instance()]. +-spec list() -> [gateway()]. list() -> - lists:append(lists:map( - fun({_, Insta}) -> Insta end, - emqx_gateway_sup:list_gateway_insta() - )). + emqx_gateway_sup:list_gateway_insta(). --spec create(gateway_type(), binary(), binary(), map()) +-spec load(gateway_type(), map()) -> {ok, pid()} | {error, any()}. -create(Type, Name, Descr, RawConf) -> - Insta = #{ id => clacu_insta_id(Type, Name) - , type => Type - , name => Name - , descr => Descr - , rawconf => RawConf - }, - emqx_gateway_sup:create_gateway_insta(Insta). +load(GwType, RawConf) -> + Gateway = #{ type => GwType + , descr => undefined + , rawconf => RawConf + }, + emqx_gateway_sup:load_gateway(Gateway). --spec remove(instance_id()) -> ok | {error, any()}. -remove(InstaId) -> - emqx_gateway_sup:remove_gateway_insta(InstaId). +-spec unload(gateway_type()) -> ok | {error, any()}. +unload(GwType) -> + emqx_gateway_sup:unload_gateway(GwType). --spec lookup(instance_id()) -> instance() | undefined. -lookup(InstaId) -> - emqx_gateway_sup:lookup_gateway_insta(InstaId). +-spec lookup(gateway_type()) -> gateway() | undefined. +lookup(GwType) -> + emqx_gateway_sup:lookup_gateway(GwType). --spec update(instance()) -> ok | {error, any()}. -update(NewInsta) -> - emqx_gateway_sup:update_gateway_insta(NewInsta). +-spec update(gateway()) -> ok | {error, any()}. +update(NewGateway) -> + emqx_gateway_sup:update_gateway(NewGateway). --spec start(instance_id()) -> ok | {error, any()}. -start(InstaId) -> - emqx_gateway_sup:start_gateway_insta(InstaId). +-spec start(gateway_type()) -> ok | {error, any()}. +start(GwType) -> + emqx_gateway_sup:start_gateway_insta(GwType). --spec stop(instance_id()) -> ok | {error, any()}. -stop(InstaId) -> - emqx_gateway_sup:stop_gateway_insta(InstaId). +-spec stop(gateway_type()) -> ok | {error, any()}. +stop(GwType) -> + emqx_gateway_sup:stop_gateway_insta(GwType). %%-------------------------------------------------------------------- %% Internal funcs %%-------------------------------------------------------------------- - -clacu_insta_id(Type, Name) when is_binary(Name) -> - list_to_atom(lists:concat([Type, "#", binary_to_list(Name)])). diff --git a/apps/emqx_gateway/src/emqx_gateway_app.erl b/apps/emqx_gateway/src/emqx_gateway_app.erl index f4918e75d..b27319eac 100644 --- a/apps/emqx_gateway/src/emqx_gateway_app.erl +++ b/apps/emqx_gateway/src/emqx_gateway_app.erl @@ -27,7 +27,7 @@ start(_StartType, _StartArgs) -> {ok, Sup} = emqx_gateway_sup:start_link(), emqx_gateway_cli:load(), load_default_gateway_applications(), - create_gateway_by_default(), + load_gateway_by_default(), {ok, Sup}. stop(_State) -> @@ -40,56 +40,43 @@ stop(_State) -> load_default_gateway_applications() -> Apps = gateway_type_searching(), ?LOG(info, "Starting the default gateway types: ~p", [Apps]), - lists:foreach(fun load/1, Apps). + lists:foreach(fun reg/1, Apps). gateway_type_searching() -> %% FIXME: Hardcoded apps [emqx_stomp_impl, emqx_sn_impl, emqx_exproto_impl, emqx_coap_impl, emqx_lwm2m_impl]. -load(Mod) -> +reg(Mod) -> try - Mod:load(), - ?LOG(info, "Load ~s gateway application successfully!", [Mod]) + Mod:reg(), + ?LOG(info, "Register ~s gateway application successfully!", [Mod]) catch - Class : Reason -> - ?LOG(error, "Load ~s gateway application failed: {~p, ~p}", - [Mod, Class, Reason]) + Class : Reason : Stk -> + ?LOG(error, "Failed to register ~s gateway application: {~p, ~p}\n" + "Stacktrace: ~0p", + [Mod, Class, Reason, Stk]) end. -create_gateway_by_default() -> - create_gateway_by_default(zipped_confs()). +load_gateway_by_default() -> + load_gateway_by_default(confs()). -create_gateway_by_default([]) -> +load_gateway_by_default([]) -> ok; -create_gateway_by_default([{Type, Name, Confs}|More]) -> +load_gateway_by_default([{Type, Confs}|More]) -> case emqx_gateway_registry:lookup(Type) of undefined -> - ?LOG(error, "Skip to start ~s#~s: not_registred_type", - [Type, Name]); + ?LOG(error, "Skip to load ~s gateway, because it is not registered", + [Type]); _ -> - case emqx_gateway:create(Type, - atom_to_binary(Name, utf8), - <<>>, - Confs) of + case emqx_gateway:load(Type, Confs) of {ok, _} -> - ?LOG(debug, "Start ~s#~s successfully!", [Type, Name]); + ?LOG(debug, "Load ~s gateway successfully!", [Type]); {error, Reason} -> - ?LOG(error, "Start ~s#~s failed: ~0p", - [Type, Name, Reason]) + ?LOG(error, "Failed to load ~s gateway: ~0p", [Type, Reason]) end end, - create_gateway_by_default(More). + load_gateway_by_default(More). -zipped_confs() -> - All = maps:to_list( - maps:without(exclude_options(), emqx_config:get([gateway]))), - lists:append(lists:foldr( - fun({Type, Gws}, Acc) -> - {Names, Confs} = lists:unzip(maps:to_list(Gws)), - Types = [ Type || _ <- lists:seq(1, length(Names))], - [lists:zip3(Types, Names, Confs) | Acc] - end, [], All)). - -exclude_options() -> - [lwm2m_xml_dir]. +confs() -> + maps:to_list(emqx_config:get([gateway], [])). diff --git a/apps/emqx_gateway/src/emqx_gateway_gw_sup.erl b/apps/emqx_gateway/src/emqx_gateway_gw_sup.erl index 21ad30c0d..37bfd10a3 100644 --- a/apps/emqx_gateway/src/emqx_gateway_gw_sup.erl +++ b/apps/emqx_gateway/src/emqx_gateway_gw_sup.erl @@ -15,6 +15,10 @@ %%-------------------------------------------------------------------- %% @doc The Gateway Top supervisor. +%% +%% This supervisor has monitor a bunch of process/resources depended by +%% gateway runtime +%% -module(emqx_gateway_gw_sup). -behaviour(supervisor). @@ -41,64 +45,62 @@ start_link(Type) -> supervisor:start_link({local, Type}, ?MODULE, [Type]). --spec create_insta(pid(), instance(), map()) -> {ok, GwInstaPid :: pid()} | {error, any()}. -create_insta(Sup, Insta = #{id := InstaId}, GwDscrptr) -> - case emqx_gateway_utils:find_sup_child(Sup, InstaId) of +-spec create_insta(pid(), gateway(), map()) -> {ok, GwInstaPid :: pid()} | {error, any()}. +create_insta(Sup, Gateway = #{type := GwType}, GwDscrptr) -> + case emqx_gateway_utils:find_sup_child(Sup, GwType) of {ok, _GwInstaPid} -> {error, alredy_existed}; false -> - %% XXX: More instances options to it? - %% - Ctx = ctx(Sup, InstaId), + Ctx = ctx(Sup, GwType), %% ChildSpec = emqx_gateway_utils:childspec( - InstaId, + GwType, worker, emqx_gateway_insta_sup, - [Insta, Ctx, GwDscrptr] + [Gateway, Ctx, GwDscrptr] ), emqx_gateway_utils:supervisor_ret( supervisor:start_child(Sup, ChildSpec) ) end. --spec remove_insta(pid(), InstaId :: atom()) -> ok | {error, any()}. -remove_insta(Sup, InstaId) -> - case emqx_gateway_utils:find_sup_child(Sup, InstaId) of +-spec remove_insta(pid(), GwType :: gateway_type()) -> ok | {error, any()}. +remove_insta(Sup, GwType) -> + case emqx_gateway_utils:find_sup_child(Sup, GwType) of false -> ok; {ok, _GwInstaPid} -> - ok = supervisor:terminate_child(Sup, InstaId), - ok = supervisor:delete_child(Sup, InstaId) + ok = supervisor:terminate_child(Sup, GwType), + ok = supervisor:delete_child(Sup, GwType) end. --spec update_insta(pid(), NewInsta :: instance()) -> ok | {error, any()}. -update_insta(Sup, NewInsta = #{id := InstaId}) -> - case emqx_gateway_utils:find_sup_child(Sup, InstaId) of +-spec update_insta(pid(), NewGateway :: gateway()) -> ok | {error, any()}. +update_insta(Sup, NewGateway = #{type := GwType}) -> + case emqx_gateway_utils:find_sup_child(Sup, GwType) of false -> {error, not_found}; {ok, GwInstaPid} -> - emqx_gateway_insta_sup:update(GwInstaPid, NewInsta) + emqx_gateway_insta_sup:update(GwInstaPid, NewGateway) end. --spec start_insta(pid(), atom()) -> ok | {error, any()}. -start_insta(Sup, InstaId) -> - case emqx_gateway_utils:find_sup_child(Sup, InstaId) of +-spec start_insta(pid(), gateway_type()) -> ok | {error, any()}. +start_insta(Sup, GwType) -> + case emqx_gateway_utils:find_sup_child(Sup, GwType) of false -> {error, not_found}; {ok, GwInstaPid} -> emqx_gateway_insta_sup:enable(GwInstaPid) end. --spec stop_insta(pid(), atom()) -> ok | {error, any()}. -stop_insta(Sup, InstaId) -> - case emqx_gateway_utils:find_sup_child(Sup, InstaId) of +-spec stop_insta(pid(), gateway_type()) -> ok | {error, any()}. +stop_insta(Sup, GwType) -> + case emqx_gateway_utils:find_sup_child(Sup, GwType) of false -> {error, not_found}; {ok, GwInstaPid} -> emqx_gateway_insta_sup:disable(GwInstaPid) end. --spec list_insta(pid()) -> [instance()]. +-spec list_insta(pid()) -> [gateway()]. list_insta(Sup) -> lists:filtermap( - fun({InstaId, GwInstaPid, _Type, _Mods}) -> - is_gateway_insta_id(InstaId) + fun({GwType, GwInstaPid, _Type, _Mods}) -> + is_gateway_insta_id(GwType) andalso {true, emqx_gateway_insta_sup:info(GwInstaPid)} end, supervisor:which_children(Sup)). @@ -119,10 +121,10 @@ init([Type]) -> %% Internal funcs %%-------------------------------------------------------------------- -ctx(Sup, InstaId) -> +ctx(Sup, GwType) -> {_, Type} = erlang:process_info(Sup, registered_name), {ok, CM} = emqx_gateway_utils:find_sup_child(Sup, emqx_gateway_cm), - #{ instid => InstaId + #{ instid => GwType , type => Type , cm => CM }. diff --git a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl index 286be76e5..4fb909483 100644 --- a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl +++ b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc The gateway instance management +%% @doc The gateway runtime -module(emqx_gateway_insta_sup). -behaviour(gen_server). @@ -40,42 +40,42 @@ ]). -record(state, { - insta :: instance(), + gw :: gateway(), ctx :: emqx_gateway_ctx:context(), status :: stopped | running, child_pids :: [pid()], - insta_state :: emqx_gateway_impl:state() | undefined + gw_state :: emqx_gateway_impl:state() | undefined }). %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- -start_link(Insta, Ctx, GwDscrptr) -> +start_link(Gateway, Ctx, GwDscrptr) -> gen_server:start_link( ?MODULE, - [Insta, Ctx, GwDscrptr], + [Gateway, Ctx, GwDscrptr], [] ). --spec info(pid()) -> instance(). +-spec info(pid()) -> gateway(). info(Pid) -> gen_server:call(Pid, info). -%% @doc Stop instance +%% @doc Stop gateway -spec disable(pid()) -> ok | {error, any()}. disable(Pid) -> call(Pid, disable). -%% @doc Start instance +%% @doc Start gateway -spec enable(pid()) -> ok | {error, any()}. enable(Pid) -> call(Pid, enable). -%% @doc Update the gateway instance configurations --spec update(pid(), instance()) -> ok | {error, any()}. -update(Pid, NewInsta) -> - call(Pid, {update, NewInsta}). +%% @doc Update the gateway configurations +-spec update(pid(), gateway()) -> ok | {error, any()}. +update(Pid, NewGateway) -> + call(Pid, {update, NewGateway}). call(Pid, Req) -> gen_server:call(Pid, Req, 5000). @@ -84,30 +84,29 @@ call(Pid, Req) -> %% gen_server callbacks %%-------------------------------------------------------------------- -init([Insta, Ctx0, _GwDscrptr]) -> +init([Gateway, Ctx0, _GwDscrptr]) -> process_flag(trap_exit, true), - #{id := InstaId, rawconf := RawConf} = Insta, - Ctx = do_init_context(InstaId, RawConf, Ctx0), + #{type := GwType, rawconf := RawConf} = Gateway, + Ctx = do_init_context(GwType, RawConf, Ctx0), State = #state{ - insta = Insta, + gw = Gateway, ctx = Ctx, child_pids = [], status = stopped }, - case cb_insta_create(State) of - {error, _Reason} -> + case cb_gateway_load(State) of + {error, Reason} -> do_deinit_context(Ctx), - %% XXX: Return Reason?? - {stop, create_gateway_instance_failed}; + {stop, {load_gateway_failure, Reason}}; {ok, NState} -> {ok, NState} end. -do_init_context(InstaId, RawConf, Ctx) -> +do_init_context(GwType, RawConf, Ctx) -> Auth = case maps:get(authentication, RawConf, #{enable => false}) of #{enable := true, authenticators := AuthCfgs} when is_list(AuthCfgs) -> - create_authenticators_for_gateway_insta(InstaId, AuthCfgs); + create_authenticators_for_gateway_insta(GwType, AuthCfgs); _ -> undefined end, @@ -117,13 +116,13 @@ do_deinit_context(Ctx) -> cleanup_authenticators_for_gateway_insta(maps:get(auth, Ctx)), ok. -handle_call(info, _From, State = #state{insta = Insta}) -> - {reply, Insta, State}; +handle_call(info, _From, State = #state{gw = Gateway}) -> + {reply, Gateway, State}; handle_call(disable, _From, State = #state{status = Status}) -> case Status of running -> - case cb_insta_destroy(State) of + case cb_gateway_unload(State) of {ok, NState} -> {reply, ok, NState}; {error, Reason} -> @@ -136,7 +135,7 @@ handle_call(disable, _From, State = #state{status = Status}) -> handle_call(enable, _From, State = #state{status = Status}) -> case Status of stopped -> - case cb_insta_create(State) of + case cb_gateway_load(State) of {error, Reason} -> {reply, {error, Reason}, State}; {ok, NState} -> @@ -147,28 +146,30 @@ handle_call(enable, _From, State = #state{status = Status}) -> end; %% Stopped -> update -handle_call({update, NewInsta}, _From, State = #state{insta = Insta, +handle_call({update, NewGateway}, _From, State = #state{gw = Gateway, status = stopped}) -> - case maps:get(id, NewInsta, undefined) == maps:get(id, Insta, undefined) of + case maps:get(type, NewGateway, undefined) + == maps:get(type, Gateway, undefined) of true -> - {reply, ok, State#state{insta = NewInsta}}; + {reply, ok, State#state{gw = NewGateway}}; false -> - {reply, {error, bad_instan_id}, State} + {reply, {error, gateway_type_not_match}, State} end; %% Running -> update -handle_call({update, NewInsta}, _From, State = #state{insta = Insta, +handle_call({update, NewGateway}, _From, State = #state{gw = Gateway, status = running}) -> - case maps:get(id, NewInsta, undefined) == maps:get(id, Insta, undefined) of + case maps:get(type, NewGateway, undefined) + == maps:get(type, Gateway, undefined) of true -> - case cb_insta_update(NewInsta, State) of + case cb_gateway_update(NewGateway, State) of {ok, NState} -> {reply, ok, NState}; {error, Reason} -> {reply, {error, Reason}, State} end; false -> - {reply, {error, bad_instan_id}, State} + {reply, {error, gateway_type_not_match}, State} end; handle_call(_Request, _From, State) -> @@ -187,7 +188,7 @@ handle_info({'EXIT', Pid, Reason}, State = #state{child_pids = Pids}) -> logger:error("All child process exited!"), {noreply, State#state{status = stopped, child_pids = [], - insta_state = undefined}}; + gw_state = undefined}}; RemainPids -> {noreply, State#state{child_pids = RemainPids}} end; @@ -201,10 +202,7 @@ handle_info(Info, State) -> {noreply, State}. terminate(_Reason, State = #state{ctx = Ctx, child_pids = Pids}) -> - %% Cleanup instances - %% Step1. Destory instance - Pids /= [] andalso (_ = cb_insta_destroy(State)), - %% Step2. Delete authenticator resources + Pids /= [] andalso (_ = cb_gateway_unload(State)), _ = do_deinit_context(Ctx), ok. @@ -217,8 +215,8 @@ code_change(_OldVsn, State, _Extra) -> %% @doc AuthCfgs is a array of authenticatior configurations, %% see: emqx_authn_schema:authenticators/1 -create_authenticators_for_gateway_insta(InstaId0, AuthCfgs) -> - ChainId = atom_to_binary(InstaId0, utf8), +create_authenticators_for_gateway_insta(GwType, AuthCfgs) -> + ChainId = atom_to_binary(GwType, utf8), case emqx_authn:create_chain(#{id => ChainId}) of {ok, _ChainInfo} -> Results = lists:map(fun(AuthCfg = #{name := Name}) -> @@ -245,88 +243,85 @@ cleanup_authenticators_for_gateway_insta(ChainId) -> case emqx_authn:delete_chain(ChainId) of ok -> ok; {error, {not_found, _}} -> - logger:warning("Failed clean authenticator chain: ~s, " + logger:warning("Failed to clean authenticator chain: ~s, " "reason: not_found", [ChainId]); {error, Reason} -> - logger:error("Failed clean authenticator chain: ~s, " + logger:error("Failed to clean authenticator chain: ~s, " "reason: ~p", [ChainId, Reason]) end. -cb_insta_destroy(State = #state{insta = Insta = #{type := Type}, - insta_state = InstaState}) -> +cb_gateway_unload(State = #state{gw = Gateway = #{type := GwType}, + gw_state = GwState}) -> try - #{cbkmod := CbMod, - state := GwState} = emqx_gateway_registry:lookup(Type), - CbMod:on_insta_destroy(Insta, InstaState, GwState), + #{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwType), + CbMod:on_gateway_unload(Gateway, GwState, GwState), {ok, State#state{child_pids = [], - insta_state = undefined, + gw_state = undefined, status = stopped}} catch Class : Reason : Stk -> - logger:error("Destroy instance (~0p, ~0p, _) crashed: " + logger:error("Failed to unload gateway (~0p, ~0p) crashed: " "{~p, ~p}, stacktrace: ~0p", - [Insta, InstaState, + [Gateway, GwState, Class, Reason, Stk]), {error, {Class, Reason, Stk}} end. -cb_insta_create(State = #state{insta = Insta = #{type := Type}, - ctx = Ctx}) -> +cb_gateway_load(State = #state{gw = Gateway = #{type := GwType}, + ctx = Ctx}) -> try - #{cbkmod := CbMod, - state := GwState} = emqx_gateway_registry:lookup(Type), - case CbMod:on_insta_create(Insta, Ctx, GwState) of + #{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwType), + case CbMod:on_gateway_load(Gateway, Ctx) of {error, Reason} -> throw({callback_return_error, Reason}); - {ok, InstaPidOrSpecs, InstaState} -> - ChildPids = start_child_process(InstaPidOrSpecs), + {ok, ChildPidOrSpecs, GwState} -> + ChildPids = start_child_process(ChildPidOrSpecs), {ok, State#state{ status = running, child_pids = ChildPids, - insta_state = InstaState + gw_state = GwState }} end catch Class : Reason1 : Stk -> - logger:error("Create instance (~0p, ~0p, _) crashed: " + logger:error("Failed to load ~s gateway (~0p, ~0p) crashed: " "{~p, ~p}, stacktrace: ~0p", - [Insta, Ctx, + [GwType, Gateway, Ctx, Class, Reason1, Stk]), {error, {Class, Reason1, Stk}} end. -cb_insta_update(NewInsta, - State = #state{insta = Insta = #{type := Type}, - ctx = Ctx, - insta_state = GwInstaState}) -> +cb_gateway_update(NewGateway, + State = #state{gw = Gateway = #{type := GwType}, + ctx = Ctx, + gw_state = GwState}) -> try - #{cbkmod := CbMod, - state := GwState} = emqx_gateway_registry:lookup(Type), - case CbMod:on_insta_update(NewInsta, Insta, GwInstaState, GwState) of + #{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwType), + case CbMod:on_gateway_update(NewGateway, Gateway, GwState) of {error, Reason} -> throw({callback_return_error, Reason}); - {ok, InstaPidOrSpecs, InstaState} -> + {ok, ChildPidOrSpecs, NGwState} -> %% XXX: Hot-upgrade ??? - ChildPids = start_child_process(InstaPidOrSpecs), + ChildPids = start_child_process(ChildPidOrSpecs), {ok, State#state{ status = running, child_pids = ChildPids, - insta_state = InstaState + gw_state = NGwState }} end catch Class : Reason1 : Stk -> - logger:error("Update instance (~0p, ~0p, ~0p, _) crashed: " + logger:error("Failed to update gateway (~0p, ~0p, ~0p) crashed: " "{~p, ~p}, stacktrace: ~0p", - [NewInsta, Insta, Ctx, + [NewGateway, Gateway, Ctx, Class, Reason1, Stk]), {error, {Class, Reason1, Stk}} end. -start_child_process([Indictor|_] = InstaPidOrSpecs) -> +start_child_process([Indictor|_] = ChildPidOrSpecs) -> case erlang:is_pid(Indictor) of true -> - InstaPidOrSpecs; + ChildPidOrSpecs; _ -> - do_start_child_process(InstaPidOrSpecs) + do_start_child_process(ChildPidOrSpecs) end. do_start_child_process(ChildSpecs) when is_list(ChildSpecs) -> diff --git a/apps/emqx_gateway/src/emqx_gateway_registry.erl b/apps/emqx_gateway/src/emqx_gateway_registry.erl index 07cc37bd8..cc46a0173 100644 --- a/apps/emqx_gateway/src/emqx_gateway_registry.erl +++ b/apps/emqx_gateway/src/emqx_gateway_registry.erl @@ -23,11 +23,9 @@ -behavior(gen_server). %% APIs for Impl. --export([ load/3 - , unload/1 - ]). - --export([ list/0 +-export([ reg/2 + , unreg/1 + , list/0 , lookup/1 ]). @@ -44,9 +42,17 @@ ]). -record(state, { - loaded = #{} :: #{ gateway_type() => descriptor() } + reged = #{} :: #{ gateway_type() => descriptor() } }). +-type registry_options() :: [registry_option()]. + +-type registry_option() :: {cbkmod, atom()}. + +-type descriptor() :: #{ cbkmod := atom() + , rgopts := registry_options() + }. + %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- @@ -58,37 +64,24 @@ start_link() -> %% Mgmt %%-------------------------------------------------------------------- --type registry_options() :: [registry_option()]. - --type registry_option() :: {cbkmod, atom()}. - --type gateway_options() :: list(). - --type descriptor() :: #{ cbkmod := atom() - , rgopts := registry_options() - , gwopts := gateway_options() - , state => any() - }. - --spec load(gateway_type(), registry_options(), gateway_options()) +-spec reg(gateway_type(), registry_options()) -> ok | {error, any()}. -load(Type, RgOpts, GwOpts) -> +reg(Type, RgOpts) -> CbMod = proplists:get_value(cbkmod, RgOpts, Type), Dscrptr = #{ cbkmod => CbMod , rgopts => RgOpts - , gwopts => GwOpts }, - call({load, Type, Dscrptr}). + call({reg, Type, Dscrptr}). --spec unload(gateway_type()) -> ok | {error, any()}. -unload(Type) -> +-spec unreg(gateway_type()) -> ok | {error, any()}. +unreg(Type) -> %% TODO: Checking ALL INSTACE HAS STOPPED - call({unload, Type}). + call({unreg, Type}). %% TODO: -%unload(Type, Force) -> -% call({unload, Type, Froce}). +%unreg(Type, Force) -> +% call({unreg, Type, Froce}). %% @doc Return all registered protocol gateway implementation -spec list() -> [{gateway_type(), descriptor()}]. @@ -109,41 +102,30 @@ call(Req) -> init([]) -> %% TODO: Metrics ??? process_flag(trap_exit, true), - {ok, #state{loaded = #{}}}. + {ok, #state{reged = #{}}}. -handle_call({load, Type, Dscrptr}, _From, State = #state{loaded = Gateways}) -> +handle_call({reg, Type, Dscrptr}, _From, State = #state{reged = Gateways}) -> case maps:get(Type, Gateways, notfound) of notfound -> - try - GwOpts = maps:get(gwopts, Dscrptr), - CbMod = maps:get(cbkmod, Dscrptr), - {ok, GwState} = CbMod:init(GwOpts), - NDscrptr = maps:put(state, GwState, Dscrptr), - NGateways = maps:put(Type, NDscrptr, Gateways), - {reply, ok, State#state{loaded = NGateways}} - catch - Class : Reason : Stk -> - logger:error("Load ~s crashed {~p, ~p}; stacktrace: ~0p", - [Type, Class, Reason, Stk]), - {reply, {error, {Class, Reason}}, State} - end; + NGateways = maps:put(Type, Dscrptr, Gateways), + {reply, ok, State#state{reged = NGateways}}; _ -> {reply, {error, already_existed}, State} end; -handle_call({unload, Type}, _From, State = #state{loaded = Gateways}) -> +handle_call({unreg, Type}, _From, State = #state{reged = Gateways}) -> case maps:get(Type, Gateways, undefined) of undefined -> {reply, ok, State}; _ -> - emqx_gateway_sup:stop_all_suptree(Type), - {reply, ok, State#state{loaded = maps:remove(Type, Gateways)}} + emqx_gateway_sup:unload_gateway(Type), + {reply, ok, State#state{reged = maps:remove(Type, Gateways)}} end; -handle_call(all, _From, State = #state{loaded = Gateways}) -> +handle_call(all, _From, State = #state{reged = Gateways}) -> {reply, maps:to_list(Gateways), State}; -handle_call({lookup, Type}, _From, State = #state{loaded = Gateways}) -> +handle_call({lookup, Type}, _From, State = #state{reged = Gateways}) -> Reply = maps:get(Type, Gateways, undefined), {reply, Reply, State}; diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index 5c98e1f34..85d57a51b 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -32,17 +32,13 @@ structs() -> ["gateway"]. fields("gateway") -> - [{stomp, t(ref(stomp))}, - {mqttsn, t(ref(mqttsn))}, - {coap, t(ref(coap))}, - {lwm2m, t(ref(lwm2m))}, - {lwm2m_xml_dir, t(string())}, - {exproto, t(ref(exproto))} + [{stomp, t(ref(stomp_structs))}, + {mqttsn, t(ref(mqttsn_structs))}, + {coap, t(ref(coap_structs))}, + {lwm2m, t(ref(lwm2m_structs))}, + {exproto, t(ref(exproto_structs))} ]; -fields(stomp) -> - [{"$id", t(ref(stomp_structs))}]; - fields(stomp_structs) -> [ {frame, t(ref(stomp_frame))} , {clientinfo_override, t(ref(clientinfo_override))} @@ -56,9 +52,6 @@ fields(stomp_frame) -> , {max_body_length, t(integer(), undefined, 8192)} ]; -fields(mqttsn) -> - [{"$id", t(ref(mqttsn_structs))}]; - fields(mqttsn_structs) -> [ {gateway_id, t(integer())} , {broadcast, t(boolean())} @@ -76,12 +69,9 @@ fields(mqttsn_predefined) -> , {topic, t(string())} ]; -fields(lwm2m) -> - [{"$id", t(ref(lwm2m_structs))} - ]; - fields(lwm2m_structs) -> - [ {lifetime_min, t(duration())} + [ {xml_dir, t(string())} + , {lifetime_min, t(duration())} , {lifetime_max, t(duration())} , {qmode_time_windonw, t(integer())} , {auto_observe, t(boolean())} @@ -91,9 +81,6 @@ fields(lwm2m_structs) -> , {listener, t(ref(udp_listener_group))} ]; -fields(exproto) -> - [{"$id", t(ref(exproto_structs))}]; - fields(exproto_structs) -> [ {server, t(ref(exproto_grpc_server))} , {handler, t(ref(exproto_grpc_handler))} diff --git a/apps/emqx_gateway/src/emqx_gateway_sup.erl b/apps/emqx_gateway/src/emqx_gateway_sup.erl index c974060d0..4f340913e 100644 --- a/apps/emqx_gateway/src/emqx_gateway_sup.erl +++ b/apps/emqx_gateway/src/emqx_gateway_sup.erl @@ -22,22 +22,16 @@ -export([start_link/0]). -%% Gateway Instance APIs --export([ create_gateway_insta/1 - , remove_gateway_insta/1 - , lookup_gateway_insta/1 - , update_gateway_insta/1 +%% Gateway APIs +-export([ load_gateway/1 + , unload_gateway/1 + , lookup_gateway/1 + , update_gateway/1 , start_gateway_insta/1 , stop_gateway_insta/1 - , list_gateway_insta/1 , list_gateway_insta/0 ]). -%% Gateway APs --export([ list_started_gateway/0 - , stop_all_suptree/1 - ]). - %% supervisor callbacks -export([init/1]). @@ -48,88 +42,71 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). --spec create_gateway_insta(instance()) -> {ok, pid()} | {error, any()}. -create_gateway_insta(Insta = #{type := Type}) -> - case emqx_gateway_registry:lookup(Type) of - undefined -> {error, {unknown_gateway_id, Type}}; + +-spec load_gateway(gateway()) -> {ok, pid()} | {error, any()}. +load_gateway(Gateway = #{type := GwType}) -> + case emqx_gateway_registry:lookup(GwType) of + undefined -> {error, {unknown_gateway_type, GwType}}; GwDscrptr -> - {ok, GwSup} = ensure_gateway_suptree_ready(gatewayid(Type)), - emqx_gateway_gw_sup:create_insta(GwSup, Insta, GwDscrptr) + {ok, GwSup} = ensure_gateway_suptree_ready(GwType), + emqx_gateway_gw_sup:create_insta(GwSup, Gateway, GwDscrptr) end. --spec remove_gateway_insta(instance_id()) -> ok | {error, any()}. -remove_gateway_insta(InstaId) -> - case search_gateway_insta_proc(InstaId) of - {ok, {GwSup, _}} -> - emqx_gateway_gw_sup:remove_insta(GwSup, InstaId); +-spec unload_gateway(gateway_type()) -> ok | {error, not_found}. +unload_gateway(GwType) -> + case lists:keyfind(GwType, 1, supervisor:which_children(?MODULE)) of + false -> {error, not_found}; _ -> + _ = supervisor:terminate_child(?MODULE, GwType), + _ = supervisor:delete_child(?MODULE, GwType), ok end. --spec lookup_gateway_insta(instance_id()) -> instance() | undefined. -lookup_gateway_insta(InstaId) -> - case search_gateway_insta_proc(InstaId) of +-spec lookup_gateway(gateway_type()) -> gateway() | undefined. +lookup_gateway(GwType) -> + case search_gateway_insta_proc(GwType) of {ok, {_, GwInstaPid}} -> emqx_gateway_insta_sup:info(GwInstaPid); _ -> undefined end. --spec update_gateway_insta(instance()) +-spec update_gateway(gateway_type()) -> ok | {error, any()}. -update_gateway_insta(NewInsta = #{type := Type}) -> - case emqx_gateway_utils:find_sup_child(?MODULE, gatewayid(Type)) of +update_gateway(NewGateway = #{type := GwType}) -> + case emqx_gateway_utils:find_sup_child(?MODULE, GwType) of {ok, GwSup} -> - emqx_gateway_gw_sup:update_insta(GwSup, NewInsta); + emqx_gateway_gw_sup:update_insta(GwSup, NewGateway); _ -> {error, not_found} end. -start_gateway_insta(InstaId) -> - case search_gateway_insta_proc(InstaId) of +start_gateway_insta(GwType) -> + case search_gateway_insta_proc(GwType) of {ok, {GwSup, _}} -> - emqx_gateway_gw_sup:start_insta(GwSup, InstaId); + emqx_gateway_gw_sup:start_insta(GwSup, GwType); _ -> {error, not_found} end. --spec stop_gateway_insta(instance_id()) -> ok | {error, any()}. -stop_gateway_insta(InstaId) -> - case search_gateway_insta_proc(InstaId) of +-spec stop_gateway_insta(gateway_type()) -> ok | {error, any()}. +stop_gateway_insta(GwType) -> + case search_gateway_insta_proc(GwType) of {ok, {GwSup, _}} -> - emqx_gateway_gw_sup:stop_insta(GwSup, InstaId); + emqx_gateway_gw_sup:stop_insta(GwSup, GwType); _ -> {error, not_found} end. --spec list_gateway_insta(gateway_type()) -> {ok, [instance()]} | {error, any()}. -list_gateway_insta(Type) -> - case emqx_gateway_utils:find_sup_child(?MODULE, gatewayid(Type)) of - {ok, GwSup} -> - {ok, emqx_gateway_gw_sup:list_insta(GwSup)}; - _ -> {error, not_found} - end. - --spec list_gateway_insta() -> [{gateway_type(), instance()}]. +-spec list_gateway_insta() -> [gateway()]. list_gateway_insta() -> - lists:map( + lists:append(lists:map( fun(SupId) -> - Instas = emqx_gateway_gw_sup:list_insta(SupId), - {SupId, Instas} - end, list_started_gateway()). + emqx_gateway_gw_sup:list_insta(SupId) + end, list_started_gateway())). -spec list_started_gateway() -> [gateway_type()]. list_started_gateway() -> started_gateway_type(). --spec stop_all_suptree(atom()) -> ok. -stop_all_suptree(Type) -> - case lists:keyfind(Type, 1, supervisor:which_children(?MODULE)) of - false -> ok; - _ -> - _ = supervisor:terminate_child(?MODULE, Type), - _ = supervisor:delete_child(?MODULE, Type), - ok - end. - %% Supervisor callback init([]) -> @@ -145,17 +122,14 @@ init([]) -> %% Internal funcs %%-------------------------------------------------------------------- -gatewayid(Type) -> - list_to_atom(lists:concat([Type])). - -ensure_gateway_suptree_ready(Type) -> - case lists:keyfind(Type, 1, supervisor:which_children(?MODULE)) of +ensure_gateway_suptree_ready(GwType) -> + case lists:keyfind(GwType, 1, supervisor:which_children(?MODULE)) of false -> ChildSpec = emqx_gateway_utils:childspec( - Type, + GwType, supervisor, emqx_gateway_gw_sup, - [Type] + [GwType] ), emqx_gateway_utils:supervisor_ret( supervisor:start_child(?MODULE, ChildSpec) @@ -190,4 +164,3 @@ started_gateway_pid() -> is_a_gateway_id(Id) -> Id /= emqx_gateway_registry. - diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl index 6f47e25ff..363af9f16 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl @@ -20,16 +20,13 @@ -behavior(emqx_gateway_impl). %% APIs --export([ load/0 - , unload/0 +-export([ reg/0 + , unreg/0 ]). --export([]). - --export([ init/1 - , on_insta_create/3 - , on_insta_update/4 - , on_insta_destroy/3 +-export([ on_gateway_load/2 + , on_gateway_update/3 + , on_gateway_unload/2 ]). -include_lib("emqx/include/logger.hrl"). @@ -38,24 +35,19 @@ %% APIs %%-------------------------------------------------------------------- -load() -> +reg() -> RegistryOptions = [ {cbkmod, ?MODULE} ], - emqx_gateway_registry:load(exproto, RegistryOptions, []). - -unload() -> - emqx_gateway_registry:unload(exproto). - -init(_) -> - GwState = #{}, - {ok, GwState}. + emqx_gateway_registry:reg(exproto, RegistryOptions). +unreg() -> + emqx_gateway_registry:unreg(exproto). %%-------------------------------------------------------------------- %% emqx_gateway_registry callbacks %%-------------------------------------------------------------------- -start_grpc_server(InstaId, Options = #{bind := ListenOn}) -> +start_grpc_server(GwType, Options = #{bind := ListenOn}) -> Services = #{protos => [emqx_exproto_pb], services => #{ 'emqx.exproto.v1.ConnectionAdapter' => emqx_exproto_gsvr} @@ -65,10 +57,10 @@ start_grpc_server(InstaId, Options = #{bind := ListenOn}) -> SslOpts -> [{ssl_options, SslOpts}] end, - _ = grpc:start_server(InstaId, ListenOn, Services, SvrOptions), - ?ULOG("Start ~s gRPC server on ~p successfully.~n", [InstaId, ListenOn]). + _ = grpc:start_server(GwType, ListenOn, Services, SvrOptions), + ?ULOG("Start ~s gRPC server on ~p successfully.~n", [GwType, ListenOn]). -start_grpc_client_channel(InstaId, Options = #{address := UriStr}) -> +start_grpc_client_channel(GwType, Options = #{address := UriStr}) -> UriMap = uri_string:parse(UriStr), Scheme = maps:get(scheme, UriMap), Host = maps:get(host, UriMap), @@ -85,79 +77,79 @@ start_grpc_client_channel(InstaId, Options = #{address := UriStr}) -> transport_opts => SslOpts}}; _ -> #{} end, - grpc_client_sup:create_channel_pool(InstaId, SvrAddr, ClientOpts). + grpc_client_sup:create_channel_pool(GwType, SvrAddr, ClientOpts). -on_insta_create(_Insta = #{ id := InstaId, - rawconf := RawConf - }, Ctx, _GwState) -> +on_gateway_load(_Gateway = #{ type := GwType, + rawconf := RawConf + }, Ctx) -> %% XXX: How to monitor it ? %% Start grpc client pool & client channel - PoolName = pool_name(InstaId), + PoolName = pool_name(GwType), PoolSize = emqx_vm:schedulers() * 2, {ok, _} = emqx_pool_sup:start_link(PoolName, hash, PoolSize, {emqx_exproto_gcli, start_link, []}), - _ = start_grpc_client_channel(InstaId, maps:get(handler, RawConf)), + _ = start_grpc_client_channel(GwType, maps:get(handler, RawConf)), %% XXX: How to monitor it ? - _ = start_grpc_server(InstaId, maps:get(server, RawConf)), + _ = start_grpc_server(GwType, maps:get(server, RawConf)), NRawConf = maps:without( [server, handler], RawConf#{pool_name => PoolName} ), Listeners = emqx_gateway_utils:normalize_rawconf( - NRawConf#{handler => InstaId} + NRawConf#{handler => GwType} ), ListenerPids = lists:map(fun(Lis) -> - start_listener(InstaId, Ctx, Lis) + start_listener(GwType, Ctx, Lis) end, Listeners), - {ok, ListenerPids, _InstaState = #{ctx => Ctx}}. + {ok, ListenerPids, _GwState = #{ctx => Ctx}}. -on_insta_update(NewInsta, OldInsta, GwInstaState = #{ctx := Ctx}, GwState) -> - InstaId = maps:get(id, NewInsta), +on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) -> + GwType = maps:get(type, NewGateway), try %% XXX: 1. How hot-upgrade the changes ??? %% XXX: 2. Check the New confs first before destroy old instance ??? - on_insta_destroy(OldInsta, GwInstaState, GwState), - on_insta_create(NewInsta, Ctx, GwState) + on_gateway_unload(OldGateway, GwState), + on_gateway_load(NewGateway, Ctx) catch Class : Reason : Stk -> - logger:error("Failed to update exproto instance ~s; " + logger:error("Failed to update ~s; " "reason: {~0p, ~0p} stacktrace: ~0p", - [InstaId, Class, Reason, Stk]), + [GwType, Class, Reason, Stk]), {error, {Class, Reason}} end. -on_insta_destroy(_Insta = #{ id := InstaId, - rawconf := RawConf - }, _GwInstaState, _GwState) -> +on_gateway_unload(_Gateway = #{ type := GwType, + rawconf := RawConf + }, _GwState) -> Listeners = emqx_gateway_utils:normalize_rawconf(RawConf), lists:foreach(fun(Lis) -> - stop_listener(InstaId, Lis) + stop_listener(GwType, Lis) end, Listeners). -pool_name(InstaId) -> - list_to_atom(lists:concat([InstaId, "_gcli_pool"])). +pool_name(GwType) -> + list_to_atom(lists:concat([GwType, "_gcli_pool"])). %%-------------------------------------------------------------------- %% Internal funcs %%-------------------------------------------------------------------- -start_listener(InstaId, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> +start_listener(GwType, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), - case start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) of + case start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) of {ok, Pid} -> - ?ULOG("Start exproto ~s:~s listener on ~s successfully.~n", - [InstaId, Type, ListenOnStr]), + ?ULOG("Start ~s:~s listener on ~s successfully.~n", + [GwType, Type, ListenOnStr]), Pid; {error, Reason} -> - ?ELOG("Failed to start exproto ~s:~s listener on ~s: ~0p~n", - [InstaId, Type, ListenOnStr, Reason]), + ?ELOG("Failed to start ~s:~s listener on ~s: ~0p~n", + [GwType, Type, ListenOnStr, Reason]), throw({badconf, Reason}) end. -start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) -> - Name = name(InstaId, Type), +start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) -> + Name = name(GwType, Type), NCfg = Cfg#{ ctx => Ctx, frame_mod => emqx_exproto_frame, @@ -176,8 +168,8 @@ do_start_listener(udp, Name, ListenOn, Opts, MFA) -> do_start_listener(dtls, Name, ListenOn, Opts, MFA) -> esockd:open_dtls(Name, ListenOn, Opts, MFA). -name(InstaId, Type) -> - list_to_atom(lists:concat([InstaId, ":", Type])). +name(GwType, Type) -> + list_to_atom(lists:concat([GwType, ":", Type])). merge_default_by_type(Type, Options) when Type =:= tcp; Type =:= ssl -> @@ -200,18 +192,18 @@ merge_default_by_type(Type, Options) when Type =:= udp; [{udp_options, Default} | Options] end. -stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) -> - StopRet = stop_listener(InstaId, Type, ListenOn, SocketOpts, Cfg), +stop_listener(GwType, {Type, ListenOn, SocketOpts, Cfg}) -> + StopRet = stop_listener(GwType, Type, ListenOn, SocketOpts, Cfg), ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), case StopRet of - ok -> ?ULOG("Stop exproto ~s:~s listener on ~s successfully.~n", - [InstaId, Type, ListenOnStr]); + ok -> ?ULOG("Stop ~s:~s listener on ~s successfully.~n", + [GwType, Type, ListenOnStr]); {error, Reason} -> - ?ELOG("Failed to stop exproto ~s:~s listener on ~s: ~0p~n", - [InstaId, Type, ListenOnStr, Reason]) + ?ELOG("Failed to stop ~s:~s listener on ~s: ~0p~n", + [GwType, Type, ListenOnStr, Reason]) end, StopRet. -stop_listener(InstaId, Type, ListenOn, _SocketOpts, _Cfg) -> - Name = name(InstaId, Type), +stop_listener(GwType, Type, ListenOn, _SocketOpts, _Cfg) -> + Name = name(GwType, Type), esockd:close(Name, ListenOn). diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl index d94c9fa8b..cf1a1a017 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl @@ -20,36 +20,37 @@ -behavior(emqx_gateway_impl). %% APIs --export([ load/0 - , unload/0 +-export([ reg/0 + , unreg/0 ]). --export([]). - --export([ init/1 - , on_insta_create/3 - , on_insta_update/4 - , on_insta_destroy/3 +-export([ on_gateway_load/2 + , on_gateway_update/3 + , on_gateway_unload/2 ]). +-include_lib("emqx/include/logger.hrl"). + %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- -load() -> +reg() -> RegistryOptions = [ {cbkmod, ?MODULE} ], - emqx_gateway_registry:load(lwm2m, RegistryOptions, []). + emqx_gateway_registry:reg(lwm2m, RegistryOptions). -unload() -> - %% XXX: - lwm2m_coap_server_registry:remove_handler( - [<<"rd">>], - emqx_lwm2m_coap_resource, undefined - ), - emqx_gateway_registry:unload(lwm2m). +unreg() -> + emqx_gateway_registry:unreg(lwm2m). + +%%-------------------------------------------------------------------- +%% emqx_gateway_registry callbacks +%%-------------------------------------------------------------------- + +on_gateway_load(_Gateway = #{ type := GwType, + rawconf := RawConf + }, Ctx) -> -init(_) -> %% Handler _ = lwm2m_coap_server:start_registry(), lwm2m_coap_server_registry:add_handler( @@ -57,75 +58,66 @@ init(_) -> emqx_lwm2m_coap_resource, undefined ), %% Xml registry - {ok, _} = emqx_lwm2m_xml_object_db:start_link( - emqx_config:get([gateway, lwm2m_xml_dir]) - ), + {ok, _} = emqx_lwm2m_xml_object_db:start_link(maps:get(xml_dir, RawConf)), %% XXX: Self managed table? %% TODO: Improve it later {ok, _} = emqx_lwm2m_cm:start_link(), - GwState = #{}, - {ok, GwState}. - -%% TODO: deinit - -%%-------------------------------------------------------------------- -%% emqx_gateway_registry callbacks -%%-------------------------------------------------------------------- - -on_insta_create(_Insta = #{ id := InstaId, - rawconf := RawConf - }, Ctx, _GwState) -> Listeners = emqx_gateway_utils:normalize_rawconf(RawConf), ListenerPids = lists:map(fun(Lis) -> - start_listener(InstaId, Ctx, Lis) + start_listener(GwType, Ctx, Lis) end, Listeners), - {ok, ListenerPids, _InstaState = #{ctx => Ctx}}. + {ok, ListenerPids, _GwState = #{ctx => Ctx}}. -on_insta_update(NewInsta, OldInsta, GwInstaState = #{ctx := Ctx}, GwState) -> - InstaId = maps:get(id, NewInsta), +on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) -> + GwType = maps:get(type, NewGateway), try %% XXX: 1. How hot-upgrade the changes ??? %% XXX: 2. Check the New confs first before destroy old instance ??? - on_insta_destroy(OldInsta, GwInstaState, GwState), - on_insta_create(NewInsta, Ctx, GwState) + on_gateway_unload(OldGateway, GwState), + on_gateway_load(NewGateway, Ctx) catch Class : Reason : Stk -> - logger:error("Failed to update stomp instance ~s; " + logger:error("Failed to update ~s; " "reason: {~0p, ~0p} stacktrace: ~0p", - [InstaId, Class, Reason, Stk]), + [GwType, Class, Reason, Stk]), {error, {Class, Reason}} end. -on_insta_destroy(_Insta = #{ id := InstaId, - rawconf := RawConf - }, _GwInstaState, _GwState) -> +on_gateway_unload(_Gateway = #{ type := GwType, + rawconf := RawConf + }, _GwState) -> + %% XXX: + lwm2m_coap_server_registry:remove_handler( + [<<"rd">>], + emqx_lwm2m_coap_resource, undefined + ), + Listeners = emqx_gateway_utils:normalize_rawconf(RawConf), lists:foreach(fun(Lis) -> - stop_listener(InstaId, Lis) + stop_listener(GwType, Lis) end, Listeners). %%-------------------------------------------------------------------- %% Internal funcs %%-------------------------------------------------------------------- -start_listener(InstaId, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> +start_listener(GwType, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), - case start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) of + case start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) of {ok, Pid} -> - io:format("Start lwm2m ~s:~s listener on ~s successfully.~n", - [InstaId, Type, ListenOnStr]), + ?ULOG("Start ~s:~s listener on ~s successfully.~n", + [GwType, Type, ListenOnStr]), Pid; {error, Reason} -> - io:format(standard_error, - "Failed to start lwm2m ~s:~s listener on ~s: ~0p~n", - [InstaId, Type, ListenOnStr, Reason]), + ?ELOG("Failed to start ~s:~s listener on ~s: ~0p~n", + [GwType, Type, ListenOnStr, Reason]), throw({badconf, Reason}) end. -start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) -> - Name = name(InstaId, udp), +start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) -> + Name = name(GwType, udp), NCfg = Cfg#{ctx => Ctx}, NSocketOpts = merge_default(SocketOpts), Options = [{config, NCfg}|NSocketOpts], @@ -136,8 +128,8 @@ start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) -> lwm2m_coap_server:start_dtls(Name, ListenOn, Options) end. -name(InstaId, Type) -> - list_to_atom(lists:concat([InstaId, ":", Type])). +name(GwType, Type) -> + list_to_atom(lists:concat([GwType, ":", Type])). merge_default(Options) -> Default = emqx_gateway_utils:default_udp_options(), @@ -149,22 +141,20 @@ merge_default(Options) -> [{udp_options, Default} | Options] end. -stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) -> - StopRet = stop_listener(InstaId, Type, ListenOn, SocketOpts, Cfg), +stop_listener(GwType, {Type, ListenOn, SocketOpts, Cfg}) -> + StopRet = stop_listener(GwType, Type, ListenOn, SocketOpts, Cfg), ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), case StopRet of - ok -> io:format("Stop lwm2m ~s:~s listener on ~s successfully.~n", - [InstaId, Type, ListenOnStr]); + ok -> ?ULOG("Stop ~s:~s listener on ~s successfully.~n", + [GwType, Type, ListenOnStr]); {error, Reason} -> - io:format(standard_error, - "Failed to stop lwm2m ~s:~s listener on ~s: ~0p~n", - [InstaId, Type, ListenOnStr, Reason] - ) + ?ELOG("Failed to stop ~s:~s listener on ~s: ~0p~n", + [GwType, Type, ListenOnStr, Reason]) end, StopRet. -stop_listener(InstaId, Type, ListenOn, _SocketOpts, _Cfg) -> - Name = name(InstaId, Type), +stop_listener(GwType, Type, ListenOn, _SocketOpts, _Cfg) -> + Name = name(GwType, Type), case Type of udp -> lwm2m_coap_server:stop_udp(Name, ListenOn); diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl index 57070206c..03eebfd22 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl @@ -20,16 +20,13 @@ -behavior(emqx_gateway_impl). %% APIs --export([ load/0 - , unload/0 +-export([ reg/0 + , unreg/0 ]). --export([]). - --export([ init/1 - , on_insta_create/3 - , on_insta_update/4 - , on_insta_destroy/3 +-export([ on_gateway_load/2 + , on_gateway_update/3 + , on_gateway_unload/2 ]). -include_lib("emqx/include/logger.hrl"). @@ -38,25 +35,21 @@ %% APIs %%-------------------------------------------------------------------- -load() -> +reg() -> RegistryOptions = [ {cbkmod, ?MODULE} ], - emqx_gateway_registry:load(mqttsn, RegistryOptions, []). + emqx_gateway_registry:reg(mqttsn, RegistryOptions). -unload() -> - emqx_gateway_registry:unload(mqttsn). - -init(_) -> - GwState = #{}, - {ok, GwState}. +unreg() -> + emqx_gateway_registry:unreg(mqttsn). %%-------------------------------------------------------------------- %% emqx_gateway_registry callbacks %%-------------------------------------------------------------------- -on_insta_create(_Insta = #{ id := InstaId, - rawconf := RawConf - }, Ctx, _GwState) -> +on_gateway_load(_Gateway = #{ type := GwType, + rawconf := RawConf + }, Ctx) -> %% We Also need to start `emqx_sn_broadcast` & %% `emqx_sn_registry` process @@ -71,7 +64,7 @@ on_insta_create(_Insta = #{ id := InstaId, end, PredefTopics = maps:get(predefined, RawConf), - {ok, RegistrySvr} = emqx_sn_registry:start_link(InstaId, PredefTopics), + {ok, RegistrySvr} = emqx_sn_registry:start_link(GwType, PredefTopics), NRawConf = maps:without( [broadcast, predefined], @@ -80,52 +73,52 @@ on_insta_create(_Insta = #{ id := InstaId, Listeners = emqx_gateway_utils:normalize_rawconf(NRawConf), ListenerPids = lists:map(fun(Lis) -> - start_listener(InstaId, Ctx, Lis) + start_listener(GwType, Ctx, Lis) end, Listeners), {ok, ListenerPids, _InstaState = #{ctx => Ctx}}. -on_insta_update(NewInsta, OldInsta, GwInstaState = #{ctx := Ctx}, GwState) -> - InstaId = maps:get(id, NewInsta), +on_gateway_update(NewGateway = #{type := GwType}, OldGateway, + GwState = #{ctx := Ctx}) -> try %% XXX: 1. How hot-upgrade the changes ??? %% XXX: 2. Check the New confs first before destroy old instance ??? - on_insta_destroy(OldInsta, GwInstaState, GwState), - on_insta_create(NewInsta, Ctx, GwState) + on_gateway_unload(OldGateway, GwState), + on_gateway_load(NewGateway, Ctx) catch Class : Reason : Stk -> - logger:error("Failed to update stomp instance ~s; " + logger:error("Failed to update ~s; " "reason: {~0p, ~0p} stacktrace: ~0p", - [InstaId, Class, Reason, Stk]), + [GwType, Class, Reason, Stk]), {error, {Class, Reason}} end. -on_insta_destroy(_Insta = #{ id := InstaId, - rawconf := RawConf - }, _GwInstaState, _GwState) -> +on_gateway_unload(_Insta = #{ type := GwType, + rawconf := RawConf + }, _GwState) -> Listeners = emqx_gateway_utils:normalize_rawconf(RawConf), lists:foreach(fun(Lis) -> - stop_listener(InstaId, Lis) + stop_listener(GwType, Lis) end, Listeners). %%-------------------------------------------------------------------- %% Internal funcs %%-------------------------------------------------------------------- -start_listener(InstaId, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> +start_listener(GwType, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), - case start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) of + case start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) of {ok, Pid} -> - ?ULOG("Start mqttsn ~s:~s listener on ~s successfully.~n", - [InstaId, Type, ListenOnStr]), + ?ULOG("Start ~s:~s listener on ~s successfully.~n", + [GwType, Type, ListenOnStr]), Pid; {error, Reason} -> - ?ELOG("Failed to start mqttsn ~s:~s listener on ~s: ~0p~n", - [InstaId, Type, ListenOnStr, Reason]), + ?ELOG("Failed to start ~s:~s listener on ~s: ~0p~n", + [GwType, Type, ListenOnStr, Reason]), throw({badconf, Reason}) end. -start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) -> - Name = name(InstaId, Type), +start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) -> + Name = name(GwType, Type), NCfg = Cfg#{ ctx => Ctx, frame_mod => emqx_sn_frame, @@ -134,8 +127,8 @@ start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) -> esockd:open_udp(Name, ListenOn, merge_default(SocketOpts), {emqx_gateway_conn, start_link, [NCfg]}). -name(InstaId, Type) -> - list_to_atom(lists:concat([InstaId, ":", Type])). +name(GwType, Type) -> + list_to_atom(lists:concat([GwType, ":", Type])). merge_default(Options) -> Default = emqx_gateway_utils:default_udp_options(), @@ -147,18 +140,18 @@ merge_default(Options) -> [{udp_options, Default} | Options] end. -stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) -> - StopRet = stop_listener(InstaId, Type, ListenOn, SocketOpts, Cfg), +stop_listener(GwType, {Type, ListenOn, SocketOpts, Cfg}) -> + StopRet = stop_listener(GwType, Type, ListenOn, SocketOpts, Cfg), ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), case StopRet of - ok -> ?ULOG("Stop mqttsn ~s:~s listener on ~s successfully.~n", - [InstaId, Type, ListenOnStr]); + ok -> ?ULOG("Stop ~s:~s listener on ~s successfully.~n", + [GwType, Type, ListenOnStr]); {error, Reason} -> - ?ELOG("Failed to stop mqttsn ~s:~s listener on ~s: ~0p~n", - [InstaId, Type, ListenOnStr, Reason]) + ?ELOG("Failed to stop ~s:~s listener on ~s: ~0p~n", + [GwType, Type, ListenOnStr, Reason]) end, StopRet. -stop_listener(InstaId, Type, ListenOn, _SocketOpts, _Cfg) -> - Name = name(InstaId, Type), +stop_listener(GwType, Type, ListenOn, _SocketOpts, _Cfg) -> + Name = name(GwType, Type), esockd:close(Name, ListenOn). diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl index 95183ad5e..84b10e97a 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl @@ -19,14 +19,13 @@ -behavior(emqx_gateway_impl). %% APIs --export([ load/0 - , unload/0 +-export([ reg/0 + , unreg/0 ]). --export([ init/1 - , on_insta_create/3 - , on_insta_update/4 - , on_insta_destroy/3 +-export([ on_gateway_load/2 + , on_gateway_update/3 + , on_gateway_unload/2 ]). -include_lib("emqx_gateway/include/emqx_gateway.hrl"). @@ -36,79 +35,75 @@ %% APIs %%-------------------------------------------------------------------- --spec load() -> ok | {error, any()}. -load() -> +-spec reg() -> ok | {error, any()}. +reg() -> RegistryOptions = [ {cbkmod, ?MODULE} ], - emqx_gateway_registry:load(stomp, RegistryOptions, []). + emqx_gateway_registry:reg(stomp, RegistryOptions). --spec unload() -> ok | {error, any()}. -unload() -> - emqx_gateway_registry:unload(stomp). - -init(_) -> - GwState = #{}, - {ok, GwState}. +-spec unreg() -> ok | {error, any()}. +unreg() -> + emqx_gateway_registry:unreg(stomp). %%-------------------------------------------------------------------- %% emqx_gateway_registry callbacks %%-------------------------------------------------------------------- -on_insta_create(_Insta = #{ id := InstaId, - rawconf := RawConf - }, Ctx, _GwState) -> +on_gateway_load(_Gateway = #{ type := GwType, + rawconf := RawConf + }, Ctx) -> %% Step1. Fold the rawconfs to listeners Listeners = emqx_gateway_utils:normalize_rawconf(RawConf), %% Step2. Start listeners or escokd:specs ListenerPids = lists:map(fun(Lis) -> - start_listener(InstaId, Ctx, Lis) + start_listener(GwType, Ctx, Lis) end, Listeners), %% FIXME: How to throw an exception to interrupt the restart logic ? - %% FIXME: Assign ctx to InstaState - {ok, ListenerPids, _InstaState = #{ctx => Ctx}}. + %% FIXME: Assign ctx to GwState + {ok, ListenerPids, _GwState = #{ctx => Ctx}}. -on_insta_update(NewInsta, OldInsta, GwInstaState = #{ctx := Ctx}, GwState) -> - InstaId = maps:get(id, NewInsta), +on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) -> + GwType = maps:get(type, NewGateway), try %% XXX: 1. How hot-upgrade the changes ??? - %% XXX: 2. Check the New confs first before destroy old instance ??? - on_insta_destroy(OldInsta, GwInstaState, GwState), - on_insta_create(NewInsta, Ctx, GwState) + %% XXX: 2. Check the New confs first before destroy old state??? + on_gateway_unload(OldGateway, GwState), + on_gateway_load(NewGateway, Ctx) catch Class : Reason : Stk -> - logger:error("Failed to update stomp instance ~s; " + logger:error("Failed to update ~s; " "reason: {~0p, ~0p} stacktrace: ~0p", - [InstaId, Class, Reason, Stk]), + [GwType, Class, Reason, Stk]), {error, {Class, Reason}} end. -on_insta_destroy(_Insta = #{ id := InstaId, - rawconf := RawConf - }, _GwInstaState, _GwState) -> +on_gateway_unload(_Gateway = #{ type := GwType, + rawconf := RawConf + }, _GwState) -> Listeners = emqx_gateway_utils:normalize_rawconf(RawConf), lists:foreach(fun(Lis) -> - stop_listener(InstaId, Lis) + stop_listener(GwType, Lis) end, Listeners). %%-------------------------------------------------------------------- %% Internal funcs %%-------------------------------------------------------------------- -start_listener(InstaId, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> +start_listener(GwType, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), - case start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) of + case start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) of {ok, Pid} -> - ?ULOG("Start stomp ~s:~s listener on ~s successfully.~n", - [InstaId, Type, ListenOnStr]), + ?ULOG("Start ~s:~s listener on ~s successfully.~n", + [GwType, Type, ListenOnStr]), Pid; {error, Reason} -> - ?ELOG("Failed to start stomp ~s:~s listener on ~s: ~0p~n", - [InstaId, Type, ListenOnStr, Reason]), + ?ELOG("Failed to start ~s:~s listener on ~s: ~0p~n", + [GwType, Type, ListenOnStr, Reason]), throw({badconf, Reason}) end. -start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) -> - Name = name(InstaId, Type), +start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) -> + Name = name(GwType, Type), NCfg = Cfg#{ ctx => Ctx, frame_mod => emqx_stomp_frame, @@ -117,8 +112,8 @@ start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) -> esockd:open(Name, ListenOn, merge_default(SocketOpts), {emqx_gateway_conn, start_link, [NCfg]}). -name(InstaId, Type) -> - list_to_atom(lists:concat([InstaId, ":", Type])). +name(GwType, Type) -> + list_to_atom(lists:concat([GwType, ":", Type])). merge_default(Options) -> Default = emqx_gateway_utils:default_tcp_options(), @@ -130,18 +125,18 @@ merge_default(Options) -> [{tcp_options, Default} | Options] end. -stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) -> - StopRet = stop_listener(InstaId, Type, ListenOn, SocketOpts, Cfg), +stop_listener(GwType, {Type, ListenOn, SocketOpts, Cfg}) -> + StopRet = stop_listener(GwType, Type, ListenOn, SocketOpts, Cfg), ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), case StopRet of - ok -> ?ULOG("Stop stomp ~s:~s listener on ~s successfully.~n", - [InstaId, Type, ListenOnStr]); + ok -> ?ULOG("Stop ~s:~s listener on ~s successfully.~n", + [GwType, Type, ListenOnStr]); {error, Reason} -> - ?ELOG("Failed to stop stomp ~s:~s listener on ~s: ~0p~n", - [InstaId, Type, ListenOnStr, Reason]) + ?ELOG("Failed to stop ~s:~s listener on ~s: ~0p~n", + [GwType, Type, ListenOnStr, Reason]) end, StopRet. -stop_listener(InstaId, Type, ListenOn, _SocketOpts, _Cfg) -> - Name = name(InstaId, Type), +stop_listener(GwType, Type, ListenOn, _SocketOpts, _Cfg) -> + Name = name(GwType, Type), esockd:close(Name, ListenOn). diff --git a/apps/emqx_gateway/test/emqx_exproto_SUITE.erl b/apps/emqx_gateway/test/emqx_exproto_SUITE.erl index 6e4faa40a..43c1d9a86 100644 --- a/apps/emqx_gateway/test/emqx_exproto_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_exproto_SUITE.erl @@ -67,18 +67,17 @@ set_special_cfg(emqx_gateway) -> LisType = get(grpname), emqx_config:put( [gateway, exproto], - #{'1' => - #{authenticator => allow_anonymous, - server => #{bind => 9100}, - handler => #{address => "http://127.0.0.1:9001"}, - listener => listener_confs(LisType) - }}); + #{authenticator => allow_anonymous, + server => #{bind => 9100}, + handler => #{address => "http://127.0.0.1:9001"}, + listener => listener_confs(LisType) + }); set_special_cfg(_App) -> ok. listener_confs(Type) -> Default = #{bind => 7993, acceptors => 8}, - #{Type => #{'1' => maps:merge(Default, maps:from_list(socketopts(Type)))}}. + #{Type => maps:merge(Default, maps:from_list(socketopts(Type)))}. %%-------------------------------------------------------------------- %% Tests cases diff --git a/apps/emqx_gateway/test/emqx_gateway_registry_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_registry_SUITE.erl index 33d577a46..7887ece0a 100644 --- a/apps/emqx_gateway/test/emqx_gateway_registry_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_gateway_registry_SUITE.erl @@ -23,7 +23,7 @@ -define(CONF_DEFAULT, <<""" gateway: { - stomp.1 {} + stomp {} } """>>). diff --git a/apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl b/apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl index 0e19d9b4f..ff6e416e4 100644 --- a/apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl @@ -30,8 +30,8 @@ -define(CONF_DEFAULT, <<" gateway: { - lwm2m_xml_dir: \"../../lib/emqx_gateway/src/lwm2m/lwm2m_xml\" - lwm2m.1: { + lwm2m: { + xml_dir: \"../../lib/emqx_gateway/src/lwm2m/lwm2m_xml\" lifetime_min: 1s lifetime_max: 86400s qmode_time_windonw: 22 diff --git a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl index 0c60d964f..2b8475cf7 100644 --- a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl @@ -51,9 +51,9 @@ -define(CLIENTID, iolist_to_binary([atom_to_list(?FUNCTION_NAME), "-", integer_to_list(erlang:system_time())])). --define(CONF_DEFAULT, <<""" +-define(CONF_DEFAULT, <<" gateway: { - mqttsn.1: { + mqttsn: { gateway_id: 1 broadcast: true enable_stats: true @@ -73,7 +73,7 @@ gateway: { } } } -""">>). +">>). %%-------------------------------------------------------------------- %% Setups @@ -90,35 +90,6 @@ init_per_suite(Config) -> end_per_suite(_) -> emqx_ct_helpers:stop_apps([emqx_gateway]). -set_special_confs(emqx_gateway) -> - emqx_config:put( - [gateway], - #{ mqttsn => - #{'1' => - #{broadcast => true, - clientinfo_override => - #{password => "pw123", - username => "user1" - }, - enable_qos3 => true, - enable_stats => true, - gateway_id => 1, - idle_timeout => 30000, - listener => - #{udp => - #{'1' => - #{acceptors => 8,active_n => 100,backlog => 1024,bind => 1884, - high_watermark => 1048576,max_conn_rate => 1000, - max_connections => 10240000,send_timeout => 15000, - send_timeout_close => true}}}, - predefined => - [#{id => ?PREDEF_TOPIC_ID1, topic => ?PREDEF_TOPIC_NAME1}, - #{id => ?PREDEF_TOPIC_ID2, topic => ?PREDEF_TOPIC_NAME2}]}} - }); - -set_special_confs(_App) -> - ok. - %%-------------------------------------------------------------------- %% Test cases %%-------------------------------------------------------------------- diff --git a/apps/emqx_gateway/test/emqx_sn_registry_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_registry_SUITE.erl index 6161687f2..9aebfe791 100644 --- a/apps/emqx_gateway/test/emqx_sn_registry_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_sn_registry_SUITE.erl @@ -26,8 +26,6 @@ -define(PREDEF_TOPICS, [#{id => 1, topic => <<"/predefined/topic/name/hello">>}, #{id => 2, topic => <<"/predefined/topic/name/nice">>}]). --define(INSTA_ID, 'mqttsn#1'). - %%-------------------------------------------------------------------- %% Setups %%-------------------------------------------------------------------- @@ -45,7 +43,7 @@ end_per_suite(_Config) -> ok. init_per_testcase(_TestCase, Config) -> - {ok, Pid} = ?REGISTRY:start_link(?INSTA_ID, ?PREDEF_TOPICS), + {ok, Pid} = ?REGISTRY:start_link('mqttsn', ?PREDEF_TOPICS), {Tab, Pid} = ?REGISTRY:lookup_name(Pid), [{reg, {Tab, Pid}} | Config]. diff --git a/apps/emqx_gateway/test/emqx_stomp_SUITE.erl b/apps/emqx_gateway/test/emqx_stomp_SUITE.erl index b2e9bd84e..3d05b73c3 100644 --- a/apps/emqx_gateway/test/emqx_stomp_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_stomp_SUITE.erl @@ -23,9 +23,9 @@ -define(HEARTBEAT, <<$\n>>). --define(CONF_DEFAULT, <<""" +-define(CONF_DEFAULT, <<" gateway: { - stomp.1: { + stomp: { clientinfo_override: { username: \"${Packet.headers.login}\" password: \"${Packet.headers.passcode}\" @@ -35,7 +35,7 @@ gateway: { } } } -""">>). +">>). all() -> emqx_ct:all(?MODULE).