refactor(gw): single instance support only

This commit is contained in:
JianBo He 2021-08-13 19:13:14 +08:00
parent 50ee840220
commit be4d2495f0
21 changed files with 647 additions and 809 deletions

View File

@ -4,167 +4,169 @@
gateway: {
stomp.1: {
frame: {
max_headers: 10
max_headers_length: 1024
max_body_length: 8192
}
stomp: {
clientinfo_override: {
username: "${Packet.headers.login}"
password: "${Packet.headers.passcode}"
}
authentication: {
enable: true
authenticators: [
{
name: "authenticator1"
mechanism: password-based
server_type: built-in-database
user_id_type: clientid
}
]
}
listener.tcp.1: {
bind: 61613
acceptors: 16
max_connections: 1024000
max_conn_rate: 1000
active_n: 100
}
frame: {
max_headers: 10
max_headers_length: 1024
max_body_length: 8192
}
coap.1: {
enable_stats: false
authentication: {
enable: true
authenticators: [
{
name: "authenticator1"
mechanism: password-based
server_type: built-in-database
user_id_type: clientid
}
]
}
#authentication.enable: false
heartbeat: 30s
notify_type: qos
subscribe_qos: qos0
publish_qos: qos1
listener.udp.1: {
bind: 5683
}
clientinfo_override: {
username: "${Packet.headers.login}"
password: "${Packet.headers.passcode}"
}
mqttsn.1: {
## The MQTT-SN Gateway ID in ADVERTISE message.
gateway_id: 1
## Enable broadcast this gateway to WLAN
broadcast: true
## To control whether write statistics data into ETS table
## for dashbord to read.
enable_stats: true
## To control whether accept and process the received
## publish message with qos=-1.
enable_qos3: true
## Idle timeout for a MQTT-SN channel
idle_timeout: 30s
## The pre-defined topic name corresponding to the pre-defined topic
## id of N.
## Note that the pre-defined topic id of 0 is reserved.
predefined: [
{ id: 1
topic: "/predefined/topic/name/hello"
},
{ id: 2
topic: "/predefined/topic/name/nice"
}
]
### ClientInfo override
clientinfo_override: {
username: "mqtt_sn_user"
password: "abc"
}
listener.udp.1: {
bind: 1884
max_connections: 10240000
max_conn_rate: 1000
}
authentication: {
enable: true
authenticators: [
{
name: "authenticator1"
mechanism: password-based
server_type: built-in-database
user_id_type: clientid
}
]
}
listener.tcp.1: {
bind: 61613
acceptors: 16
max_connections: 1024000
max_conn_rate: 1000
active_n: 100
}
}
coap: {
enable_stats: false
authentication: {
enable: true
authenticators: [
{
name: "authenticator1"
mechanism: password-based
server_type: built-in-database
user_id_type: clientid
}
]
}
#authentication.enable: false
heartbeat: 30s
notify_type: qos
subscribe_qos: qos0
publish_qos: qos1
listener.udp.1: {
bind: 5683
}
}
mqttsn: {
## The MQTT-SN Gateway ID in ADVERTISE message.
gateway_id: 1
## Enable broadcast this gateway to WLAN
broadcast: true
## To control whether write statistics data into ETS table
## for dashbord to read.
enable_stats: true
## To control whether accept and process the received
## publish message with qos=-1.
enable_qos3: true
## Idle timeout for a MQTT-SN channel
idle_timeout: 30s
## The pre-defined topic name corresponding to the pre-defined topic
## id of N.
## Note that the pre-defined topic id of 0 is reserved.
predefined: [
{ id: 1
topic: "/predefined/topic/name/hello"
},
{ id: 2
topic: "/predefined/topic/name/nice"
}
]
### ClientInfo override
clientinfo_override: {
username: "mqtt_sn_user"
password: "abc"
}
listener.udp.1: {
bind: 1884
max_connections: 10240000
max_conn_rate: 1000
}
}
## Extension Protocol Gateway
exproto.1: {
## The gRPC server to accept requests
server: {
bind: 9100
#ssl.keyfile:
#ssl.certfile:
#ssl.cacertfile:
}
handler: {
address: "http://127.0.0.1:9001"
#ssl.keyfile:
#ssl.certfile:
#ssl.cacertfile:
}
authentication.enable: false
listener.tcp.1: {
bind: 7993
acceptors: 8
max_connections: 10240
max_conn_rate: 1000
}
#listener.ssl.1: {}
#listener.udp.1: {}
#listener.dtls.1: {}
exproto: {
## The gRPC server to accept requests
server: {
bind: 9100
#ssl.keyfile:
#ssl.certfile:
#ssl.cacertfile:
}
lwm2m_xml_dir: "{{ platform_etc_dir }}/lwm2m_xml"
lwm2m.1: {
lifetime_min: 1s
lifetime_max: 86400s
qmode_time_windonw: 22
auto_observe: false
mountpoint: "lwm2m/%e/"
## always | contains_object_list
update_msg_publish_condition: contains_object_list
translators: {
command: "dn/#"
response: "up/resp"
notify: "up/notify"
register: "up/resp"
update: "up/resp"
}
listener.udp.1 {
bind: 5783
}
handler: {
address: "http://127.0.0.1:9001"
#ssl.keyfile:
#ssl.certfile:
#ssl.cacertfile:
}
authentication.enable: false
listener.tcp.1: {
bind: 7993
acceptors: 8
max_connections: 10240
max_conn_rate: 1000
}
#listener.ssl.1: {}
#listener.udp.1: {}
#listener.dtls.1: {}
}
lwm2m: {
xml_dir: "{{ platform_etc_dir }}/lwm2m_xml"
lifetime_min: 1s
lifetime_max: 86400s
qmode_time_windonw: 22
auto_observe: false
mountpoint: "lwm2m/%e/"
## always | contains_object_list
update_msg_publish_condition: contains_object_list
translators: {
command: "dn/#"
response: "up/resp"
notify: "up/notify"
register: "up/resp"
update: "up/resp"
}
listener.udp.1 {
bind: 5783
}
}
}

View File

@ -20,15 +20,13 @@
-type instance_id() :: atom().
-type gateway_type() :: atom().
%% @doc The Gateway Instace defination
-type instance() ::
#{ id := instance_id()
, type := gateway_type()
, name := binary()
%% @doc The Gateway defination
-type gateway() ::
#{ type := gateway_type()
, descr => binary() | undefined
%% Appears only in creating or detailed info
, rawconf => map()
%% Appears only in getting instance status/info
%% Appears only in getting gateway status/info
, status => stopped | running
}.

View File

@ -22,29 +22,21 @@
-type reason() :: any().
%% @doc
-callback init(Options :: list()) -> {error, reason()} | {ok, GwState :: state()}.
%% @doc
-callback on_insta_create(Insta :: instance(),
Ctx :: emqx_gateway_ctx:context(),
GwState :: state()
)
-callback on_gateway_load(Gateway :: gateway(),
Ctx :: emqx_gateway_ctx:context())
-> {error, reason()}
| {ok, [GwInstaPid :: pid()], GwInstaState :: state()}
| {ok, [ChildPid :: pid()], GwState :: state()}
%% TODO: v0.2 The child spec is better for restarting child process
| {ok, [Childspec :: supervisor:child_spec()], GwInstaState :: state()}.
| {ok, [Childspec :: supervisor:child_spec()], GwState :: state()}.
%% @doc
-callback on_insta_update(NewInsta :: instance(),
OldInsta :: instance(),
GwInstaState :: state(),
GwState :: state())
-callback on_gateway_update(NewGateway :: gateway(),
OldGateway :: gateway(),
GwState :: state())
-> ok
| {ok, [GwInstaPid :: pid()], GwInstaState :: state()}
| {ok, [Childspec :: supervisor:child_spec()], GwInstaState :: state()}
| {ok, [ChildPid :: pid()], NGwState :: state()}
| {ok, [Childspec :: supervisor:child_spec()], NGwState :: state()}
| {error, reason()}.
%% @doc
-callback on_insta_destroy(Insta :: instance(),
GwInstaState :: state(),
GwState :: state()) -> ok.
-callback on_gateway_unload(Gateway :: gateway(), GwState :: state()) -> ok.

View File

@ -21,95 +21,85 @@
-behavior(emqx_gateway_impl).
%% APIs
-export([ load/0
, unload/0
-export([ reg/0
, unreg/0
]).
-export([ init/1
, on_insta_create/3
, on_insta_update/4
, on_insta_destroy/3
-export([ on_gateway_load/2
, on_gateway_update/3
, on_gateway_unload/2
]).
-include_lib("emqx/include/logger.hrl").
-dialyzer({nowarn_function, [load/0]}).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
load() ->
reg() ->
RegistryOptions = [ {cbkmod, ?MODULE}
],
Options = [],
emqx_gateway_registry:load(coap, RegistryOptions, Options).
emqx_gateway_registry:reg(coap, RegistryOptions).
unload() ->
emqx_gateway_registry:unload(coap).
init([]) ->
GwState = #{},
{ok, GwState}.
unreg() ->
emqx_gateway_registry:unreg(coap).
%%--------------------------------------------------------------------
%% emqx_gateway_registry callbacks
%%--------------------------------------------------------------------
on_insta_create(_Insta = #{id := InstaId,
rawconf := RawConf
}, Ctx, _GwState) ->
on_gateway_load(_Gateway = #{type := GwType,
rawconf := RawConf
}, Ctx) ->
Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
ListenerPids = lists:map(fun(Lis) ->
start_listener(InstaId, Ctx, Lis)
start_listener(GwType, Ctx, Lis)
end, Listeners),
{ok, ListenerPids, #{ctx => Ctx}}.
on_insta_update(NewInsta, OldInsta, GwInstaState = #{ctx := Ctx}, GwState) ->
InstaId = maps:get(id, NewInsta),
on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) ->
GwType = maps:get(type, NewGateway),
try
%% XXX: 1. How hot-upgrade the changes ???
%% XXX: 2. Check the New confs first before destroy old instance ???
on_insta_destroy(OldInsta, GwInstaState, GwState),
on_insta_create(NewInsta, Ctx, GwState)
on_gateway_unload(OldGateway, GwState),
on_gateway_load(NewGateway, Ctx)
catch
Class : Reason : Stk ->
logger:error("Failed to update coap instance ~s; "
logger:error("Failed to update ~s; "
"reason: {~0p, ~0p} stacktrace: ~0p",
[InstaId, Class, Reason, Stk]),
[GwType, Class, Reason, Stk]),
{error, {Class, Reason}}
end.
on_insta_destroy(_Insta = #{ id := InstaId,
rawconf := RawConf
},
_GwInstaState,
_GWState) ->
on_gateway_unload(_Gateway = #{ type := GwType,
rawconf := RawConf
}, _GwState) ->
Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
lists:foreach(fun(Lis) ->
stop_listener(InstaId, Lis)
end, Listeners).
stop_listener(GwType, Lis)
end, Listeners).
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
start_listener(InstaId, Ctx, {Type, ListenOn, SocketOpts, Cfg}) ->
start_listener(GwType, Ctx, {Type, ListenOn, SocketOpts, Cfg}) ->
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) of
case start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) of
{ok, Pid} ->
?ULOG("Start coap ~s:~s listener on ~s successfully.~n",
[InstaId, Type, ListenOnStr]),
?ULOG("Start ~s:~s listener on ~s successfully.~n",
[GwType, Type, ListenOnStr]),
Pid;
{error, Reason} ->
?ELOG("Failed to start coap ~s:~s listener on ~s: ~0p~n",
[InstaId, Type, ListenOnStr, Reason]),
?ELOG("Failed to start ~s:~s listener on ~s: ~0p~n",
[GwType, Type, ListenOnStr, Reason]),
throw({badconf, Reason})
end.
start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
Name = name(InstaId, Type),
start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
Name = name(GwType, Type),
NCfg = Cfg#{
ctx => Ctx,
frame_mod => emqx_coap_frame,
@ -124,21 +114,21 @@ do_start_listener(udp, Name, ListenOn, SocketOpts, MFA) ->
do_start_listener(dtls, Name, ListenOn, SocketOpts, MFA) ->
esockd:open_dtls(Name, ListenOn, SocketOpts, MFA).
name(InstaId, Type) ->
list_to_atom(lists:concat([InstaId, ":", Type])).
name(GwType, Type) ->
list_to_atom(lists:concat([GwType, ":", Type])).
stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(InstaId, Type, ListenOn, SocketOpts, Cfg),
stop_listener(GwType, {Type, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(GwType, Type, ListenOn, SocketOpts, Cfg),
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case StopRet of
ok -> ?ULOG("Stop coap ~s:~s listener on ~s successfully.~n",
[InstaId, Type, ListenOnStr]);
ok -> ?ULOG("Stop ~s:~s listener on ~s successfully.~n",
[GwType, Type, ListenOnStr]);
{error, Reason} ->
?ELOG("Failed to stop coap ~s:~s listener on ~s: ~0p~n",
[InstaId, Type, ListenOnStr, Reason])
?ELOG("Failed to stop ~s:~s listener on ~s: ~0p~n",
[GwType, Type, ListenOnStr, Reason])
end,
StopRet.
stop_listener(InstaId, Type, ListenOn, _SocketOpts, _Cfg) ->
Name = name(InstaId, Type),
stop_listener(GwType, Type, ListenOn, _SocketOpts, _Cfg) ->
Name = name(GwType, Type),
esockd:close(Name, ListenOn).

View File

@ -20,8 +20,8 @@
%% APIs
-export([ registered_gateway/0
, create/4
, remove/1
, load/2
, unload/1
, lookup/1
, update/1
, start/1
@ -37,48 +37,40 @@ registered_gateway() ->
%%--------------------------------------------------------------------
%% Gateway Instace APIs
-spec list() -> [instance()].
-spec list() -> [gateway()].
list() ->
lists:append(lists:map(
fun({_, Insta}) -> Insta end,
emqx_gateway_sup:list_gateway_insta()
)).
emqx_gateway_sup:list_gateway_insta().
-spec create(gateway_type(), binary(), binary(), map())
-spec load(gateway_type(), map())
-> {ok, pid()}
| {error, any()}.
create(Type, Name, Descr, RawConf) ->
Insta = #{ id => clacu_insta_id(Type, Name)
, type => Type
, name => Name
, descr => Descr
, rawconf => RawConf
},
emqx_gateway_sup:create_gateway_insta(Insta).
load(GwType, RawConf) ->
Gateway = #{ type => GwType
, descr => undefined
, rawconf => RawConf
},
emqx_gateway_sup:load_gateway(Gateway).
-spec remove(instance_id()) -> ok | {error, any()}.
remove(InstaId) ->
emqx_gateway_sup:remove_gateway_insta(InstaId).
-spec unload(gateway_type()) -> ok | {error, any()}.
unload(GwType) ->
emqx_gateway_sup:unload_gateway(GwType).
-spec lookup(instance_id()) -> instance() | undefined.
lookup(InstaId) ->
emqx_gateway_sup:lookup_gateway_insta(InstaId).
-spec lookup(gateway_type()) -> gateway() | undefined.
lookup(GwType) ->
emqx_gateway_sup:lookup_gateway(GwType).
-spec update(instance()) -> ok | {error, any()}.
update(NewInsta) ->
emqx_gateway_sup:update_gateway_insta(NewInsta).
-spec update(gateway()) -> ok | {error, any()}.
update(NewGateway) ->
emqx_gateway_sup:update_gateway(NewGateway).
-spec start(instance_id()) -> ok | {error, any()}.
start(InstaId) ->
emqx_gateway_sup:start_gateway_insta(InstaId).
-spec start(gateway_type()) -> ok | {error, any()}.
start(GwType) ->
emqx_gateway_sup:start_gateway_insta(GwType).
-spec stop(instance_id()) -> ok | {error, any()}.
stop(InstaId) ->
emqx_gateway_sup:stop_gateway_insta(InstaId).
-spec stop(gateway_type()) -> ok | {error, any()}.
stop(GwType) ->
emqx_gateway_sup:stop_gateway_insta(GwType).
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
clacu_insta_id(Type, Name) when is_binary(Name) ->
list_to_atom(lists:concat([Type, "#", binary_to_list(Name)])).

View File

@ -27,7 +27,7 @@ start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_gateway_sup:start_link(),
emqx_gateway_cli:load(),
load_default_gateway_applications(),
create_gateway_by_default(),
load_gateway_by_default(),
{ok, Sup}.
stop(_State) ->
@ -40,56 +40,43 @@ stop(_State) ->
load_default_gateway_applications() ->
Apps = gateway_type_searching(),
?LOG(info, "Starting the default gateway types: ~p", [Apps]),
lists:foreach(fun load/1, Apps).
lists:foreach(fun reg/1, Apps).
gateway_type_searching() ->
%% FIXME: Hardcoded apps
[emqx_stomp_impl, emqx_sn_impl, emqx_exproto_impl,
emqx_coap_impl, emqx_lwm2m_impl].
load(Mod) ->
reg(Mod) ->
try
Mod:load(),
?LOG(info, "Load ~s gateway application successfully!", [Mod])
Mod:reg(),
?LOG(info, "Register ~s gateway application successfully!", [Mod])
catch
Class : Reason ->
?LOG(error, "Load ~s gateway application failed: {~p, ~p}",
[Mod, Class, Reason])
Class : Reason : Stk ->
?LOG(error, "Failed to register ~s gateway application: {~p, ~p}\n"
"Stacktrace: ~0p",
[Mod, Class, Reason, Stk])
end.
create_gateway_by_default() ->
create_gateway_by_default(zipped_confs()).
load_gateway_by_default() ->
load_gateway_by_default(confs()).
create_gateway_by_default([]) ->
load_gateway_by_default([]) ->
ok;
create_gateway_by_default([{Type, Name, Confs}|More]) ->
load_gateway_by_default([{Type, Confs}|More]) ->
case emqx_gateway_registry:lookup(Type) of
undefined ->
?LOG(error, "Skip to start ~s#~s: not_registred_type",
[Type, Name]);
?LOG(error, "Skip to load ~s gateway, because it is not registered",
[Type]);
_ ->
case emqx_gateway:create(Type,
atom_to_binary(Name, utf8),
<<>>,
Confs) of
case emqx_gateway:load(Type, Confs) of
{ok, _} ->
?LOG(debug, "Start ~s#~s successfully!", [Type, Name]);
?LOG(debug, "Load ~s gateway successfully!", [Type]);
{error, Reason} ->
?LOG(error, "Start ~s#~s failed: ~0p",
[Type, Name, Reason])
?LOG(error, "Failed to load ~s gateway: ~0p", [Type, Reason])
end
end,
create_gateway_by_default(More).
load_gateway_by_default(More).
zipped_confs() ->
All = maps:to_list(
maps:without(exclude_options(), emqx_config:get([gateway]))),
lists:append(lists:foldr(
fun({Type, Gws}, Acc) ->
{Names, Confs} = lists:unzip(maps:to_list(Gws)),
Types = [ Type || _ <- lists:seq(1, length(Names))],
[lists:zip3(Types, Names, Confs) | Acc]
end, [], All)).
exclude_options() ->
[lwm2m_xml_dir].
confs() ->
maps:to_list(emqx_config:get([gateway], [])).

View File

@ -15,6 +15,10 @@
%%--------------------------------------------------------------------
%% @doc The Gateway Top supervisor.
%%
%% This supervisor has monitor a bunch of process/resources depended by
%% gateway runtime
%%
-module(emqx_gateway_gw_sup).
-behaviour(supervisor).
@ -41,64 +45,62 @@
start_link(Type) ->
supervisor:start_link({local, Type}, ?MODULE, [Type]).
-spec create_insta(pid(), instance(), map()) -> {ok, GwInstaPid :: pid()} | {error, any()}.
create_insta(Sup, Insta = #{id := InstaId}, GwDscrptr) ->
case emqx_gateway_utils:find_sup_child(Sup, InstaId) of
-spec create_insta(pid(), gateway(), map()) -> {ok, GwInstaPid :: pid()} | {error, any()}.
create_insta(Sup, Gateway = #{type := GwType}, GwDscrptr) ->
case emqx_gateway_utils:find_sup_child(Sup, GwType) of
{ok, _GwInstaPid} -> {error, alredy_existed};
false ->
%% XXX: More instances options to it?
%%
Ctx = ctx(Sup, InstaId),
Ctx = ctx(Sup, GwType),
%%
ChildSpec = emqx_gateway_utils:childspec(
InstaId,
GwType,
worker,
emqx_gateway_insta_sup,
[Insta, Ctx, GwDscrptr]
[Gateway, Ctx, GwDscrptr]
),
emqx_gateway_utils:supervisor_ret(
supervisor:start_child(Sup, ChildSpec)
)
end.
-spec remove_insta(pid(), InstaId :: atom()) -> ok | {error, any()}.
remove_insta(Sup, InstaId) ->
case emqx_gateway_utils:find_sup_child(Sup, InstaId) of
-spec remove_insta(pid(), GwType :: gateway_type()) -> ok | {error, any()}.
remove_insta(Sup, GwType) ->
case emqx_gateway_utils:find_sup_child(Sup, GwType) of
false -> ok;
{ok, _GwInstaPid} ->
ok = supervisor:terminate_child(Sup, InstaId),
ok = supervisor:delete_child(Sup, InstaId)
ok = supervisor:terminate_child(Sup, GwType),
ok = supervisor:delete_child(Sup, GwType)
end.
-spec update_insta(pid(), NewInsta :: instance()) -> ok | {error, any()}.
update_insta(Sup, NewInsta = #{id := InstaId}) ->
case emqx_gateway_utils:find_sup_child(Sup, InstaId) of
-spec update_insta(pid(), NewGateway :: gateway()) -> ok | {error, any()}.
update_insta(Sup, NewGateway = #{type := GwType}) ->
case emqx_gateway_utils:find_sup_child(Sup, GwType) of
false -> {error, not_found};
{ok, GwInstaPid} ->
emqx_gateway_insta_sup:update(GwInstaPid, NewInsta)
emqx_gateway_insta_sup:update(GwInstaPid, NewGateway)
end.
-spec start_insta(pid(), atom()) -> ok | {error, any()}.
start_insta(Sup, InstaId) ->
case emqx_gateway_utils:find_sup_child(Sup, InstaId) of
-spec start_insta(pid(), gateway_type()) -> ok | {error, any()}.
start_insta(Sup, GwType) ->
case emqx_gateway_utils:find_sup_child(Sup, GwType) of
false -> {error, not_found};
{ok, GwInstaPid} ->
emqx_gateway_insta_sup:enable(GwInstaPid)
end.
-spec stop_insta(pid(), atom()) -> ok | {error, any()}.
stop_insta(Sup, InstaId) ->
case emqx_gateway_utils:find_sup_child(Sup, InstaId) of
-spec stop_insta(pid(), gateway_type()) -> ok | {error, any()}.
stop_insta(Sup, GwType) ->
case emqx_gateway_utils:find_sup_child(Sup, GwType) of
false -> {error, not_found};
{ok, GwInstaPid} ->
emqx_gateway_insta_sup:disable(GwInstaPid)
end.
-spec list_insta(pid()) -> [instance()].
-spec list_insta(pid()) -> [gateway()].
list_insta(Sup) ->
lists:filtermap(
fun({InstaId, GwInstaPid, _Type, _Mods}) ->
is_gateway_insta_id(InstaId)
fun({GwType, GwInstaPid, _Type, _Mods}) ->
is_gateway_insta_id(GwType)
andalso {true, emqx_gateway_insta_sup:info(GwInstaPid)}
end, supervisor:which_children(Sup)).
@ -119,10 +121,10 @@ init([Type]) ->
%% Internal funcs
%%--------------------------------------------------------------------
ctx(Sup, InstaId) ->
ctx(Sup, GwType) ->
{_, Type} = erlang:process_info(Sup, registered_name),
{ok, CM} = emqx_gateway_utils:find_sup_child(Sup, emqx_gateway_cm),
#{ instid => InstaId
#{ instid => GwType
, type => Type
, cm => CM
}.

View File

@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc The gateway instance management
%% @doc The gateway runtime
-module(emqx_gateway_insta_sup).
-behaviour(gen_server).
@ -40,42 +40,42 @@
]).
-record(state, {
insta :: instance(),
gw :: gateway(),
ctx :: emqx_gateway_ctx:context(),
status :: stopped | running,
child_pids :: [pid()],
insta_state :: emqx_gateway_impl:state() | undefined
gw_state :: emqx_gateway_impl:state() | undefined
}).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
start_link(Insta, Ctx, GwDscrptr) ->
start_link(Gateway, Ctx, GwDscrptr) ->
gen_server:start_link(
?MODULE,
[Insta, Ctx, GwDscrptr],
[Gateway, Ctx, GwDscrptr],
[]
).
-spec info(pid()) -> instance().
-spec info(pid()) -> gateway().
info(Pid) ->
gen_server:call(Pid, info).
%% @doc Stop instance
%% @doc Stop gateway
-spec disable(pid()) -> ok | {error, any()}.
disable(Pid) ->
call(Pid, disable).
%% @doc Start instance
%% @doc Start gateway
-spec enable(pid()) -> ok | {error, any()}.
enable(Pid) ->
call(Pid, enable).
%% @doc Update the gateway instance configurations
-spec update(pid(), instance()) -> ok | {error, any()}.
update(Pid, NewInsta) ->
call(Pid, {update, NewInsta}).
%% @doc Update the gateway configurations
-spec update(pid(), gateway()) -> ok | {error, any()}.
update(Pid, NewGateway) ->
call(Pid, {update, NewGateway}).
call(Pid, Req) ->
gen_server:call(Pid, Req, 5000).
@ -84,30 +84,29 @@ call(Pid, Req) ->
%% gen_server callbacks
%%--------------------------------------------------------------------
init([Insta, Ctx0, _GwDscrptr]) ->
init([Gateway, Ctx0, _GwDscrptr]) ->
process_flag(trap_exit, true),
#{id := InstaId, rawconf := RawConf} = Insta,
Ctx = do_init_context(InstaId, RawConf, Ctx0),
#{type := GwType, rawconf := RawConf} = Gateway,
Ctx = do_init_context(GwType, RawConf, Ctx0),
State = #state{
insta = Insta,
gw = Gateway,
ctx = Ctx,
child_pids = [],
status = stopped
},
case cb_insta_create(State) of
{error, _Reason} ->
case cb_gateway_load(State) of
{error, Reason} ->
do_deinit_context(Ctx),
%% XXX: Return Reason??
{stop, create_gateway_instance_failed};
{stop, {load_gateway_failure, Reason}};
{ok, NState} ->
{ok, NState}
end.
do_init_context(InstaId, RawConf, Ctx) ->
do_init_context(GwType, RawConf, Ctx) ->
Auth = case maps:get(authentication, RawConf, #{enable => false}) of
#{enable := true,
authenticators := AuthCfgs} when is_list(AuthCfgs) ->
create_authenticators_for_gateway_insta(InstaId, AuthCfgs);
create_authenticators_for_gateway_insta(GwType, AuthCfgs);
_ ->
undefined
end,
@ -117,13 +116,13 @@ do_deinit_context(Ctx) ->
cleanup_authenticators_for_gateway_insta(maps:get(auth, Ctx)),
ok.
handle_call(info, _From, State = #state{insta = Insta}) ->
{reply, Insta, State};
handle_call(info, _From, State = #state{gw = Gateway}) ->
{reply, Gateway, State};
handle_call(disable, _From, State = #state{status = Status}) ->
case Status of
running ->
case cb_insta_destroy(State) of
case cb_gateway_unload(State) of
{ok, NState} ->
{reply, ok, NState};
{error, Reason} ->
@ -136,7 +135,7 @@ handle_call(disable, _From, State = #state{status = Status}) ->
handle_call(enable, _From, State = #state{status = Status}) ->
case Status of
stopped ->
case cb_insta_create(State) of
case cb_gateway_load(State) of
{error, Reason} ->
{reply, {error, Reason}, State};
{ok, NState} ->
@ -147,28 +146,30 @@ handle_call(enable, _From, State = #state{status = Status}) ->
end;
%% Stopped -> update
handle_call({update, NewInsta}, _From, State = #state{insta = Insta,
handle_call({update, NewGateway}, _From, State = #state{gw = Gateway,
status = stopped}) ->
case maps:get(id, NewInsta, undefined) == maps:get(id, Insta, undefined) of
case maps:get(type, NewGateway, undefined)
== maps:get(type, Gateway, undefined) of
true ->
{reply, ok, State#state{insta = NewInsta}};
{reply, ok, State#state{gw = NewGateway}};
false ->
{reply, {error, bad_instan_id}, State}
{reply, {error, gateway_type_not_match}, State}
end;
%% Running -> update
handle_call({update, NewInsta}, _From, State = #state{insta = Insta,
handle_call({update, NewGateway}, _From, State = #state{gw = Gateway,
status = running}) ->
case maps:get(id, NewInsta, undefined) == maps:get(id, Insta, undefined) of
case maps:get(type, NewGateway, undefined)
== maps:get(type, Gateway, undefined) of
true ->
case cb_insta_update(NewInsta, State) of
case cb_gateway_update(NewGateway, State) of
{ok, NState} ->
{reply, ok, NState};
{error, Reason} ->
{reply, {error, Reason}, State}
end;
false ->
{reply, {error, bad_instan_id}, State}
{reply, {error, gateway_type_not_match}, State}
end;
handle_call(_Request, _From, State) ->
@ -187,7 +188,7 @@ handle_info({'EXIT', Pid, Reason}, State = #state{child_pids = Pids}) ->
logger:error("All child process exited!"),
{noreply, State#state{status = stopped,
child_pids = [],
insta_state = undefined}};
gw_state = undefined}};
RemainPids ->
{noreply, State#state{child_pids = RemainPids}}
end;
@ -201,10 +202,7 @@ handle_info(Info, State) ->
{noreply, State}.
terminate(_Reason, State = #state{ctx = Ctx, child_pids = Pids}) ->
%% Cleanup instances
%% Step1. Destory instance
Pids /= [] andalso (_ = cb_insta_destroy(State)),
%% Step2. Delete authenticator resources
Pids /= [] andalso (_ = cb_gateway_unload(State)),
_ = do_deinit_context(Ctx),
ok.
@ -217,8 +215,8 @@ code_change(_OldVsn, State, _Extra) ->
%% @doc AuthCfgs is a array of authenticatior configurations,
%% see: emqx_authn_schema:authenticators/1
create_authenticators_for_gateway_insta(InstaId0, AuthCfgs) ->
ChainId = atom_to_binary(InstaId0, utf8),
create_authenticators_for_gateway_insta(GwType, AuthCfgs) ->
ChainId = atom_to_binary(GwType, utf8),
case emqx_authn:create_chain(#{id => ChainId}) of
{ok, _ChainInfo} ->
Results = lists:map(fun(AuthCfg = #{name := Name}) ->
@ -245,88 +243,85 @@ cleanup_authenticators_for_gateway_insta(ChainId) ->
case emqx_authn:delete_chain(ChainId) of
ok -> ok;
{error, {not_found, _}} ->
logger:warning("Failed clean authenticator chain: ~s, "
logger:warning("Failed to clean authenticator chain: ~s, "
"reason: not_found", [ChainId]);
{error, Reason} ->
logger:error("Failed clean authenticator chain: ~s, "
logger:error("Failed to clean authenticator chain: ~s, "
"reason: ~p", [ChainId, Reason])
end.
cb_insta_destroy(State = #state{insta = Insta = #{type := Type},
insta_state = InstaState}) ->
cb_gateway_unload(State = #state{gw = Gateway = #{type := GwType},
gw_state = GwState}) ->
try
#{cbkmod := CbMod,
state := GwState} = emqx_gateway_registry:lookup(Type),
CbMod:on_insta_destroy(Insta, InstaState, GwState),
#{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwType),
CbMod:on_gateway_unload(Gateway, GwState, GwState),
{ok, State#state{child_pids = [],
insta_state = undefined,
gw_state = undefined,
status = stopped}}
catch
Class : Reason : Stk ->
logger:error("Destroy instance (~0p, ~0p, _) crashed: "
logger:error("Failed to unload gateway (~0p, ~0p) crashed: "
"{~p, ~p}, stacktrace: ~0p",
[Insta, InstaState,
[Gateway, GwState,
Class, Reason, Stk]),
{error, {Class, Reason, Stk}}
end.
cb_insta_create(State = #state{insta = Insta = #{type := Type},
ctx = Ctx}) ->
cb_gateway_load(State = #state{gw = Gateway = #{type := GwType},
ctx = Ctx}) ->
try
#{cbkmod := CbMod,
state := GwState} = emqx_gateway_registry:lookup(Type),
case CbMod:on_insta_create(Insta, Ctx, GwState) of
#{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwType),
case CbMod:on_gateway_load(Gateway, Ctx) of
{error, Reason} -> throw({callback_return_error, Reason});
{ok, InstaPidOrSpecs, InstaState} ->
ChildPids = start_child_process(InstaPidOrSpecs),
{ok, ChildPidOrSpecs, GwState} ->
ChildPids = start_child_process(ChildPidOrSpecs),
{ok, State#state{
status = running,
child_pids = ChildPids,
insta_state = InstaState
gw_state = GwState
}}
end
catch
Class : Reason1 : Stk ->
logger:error("Create instance (~0p, ~0p, _) crashed: "
logger:error("Failed to load ~s gateway (~0p, ~0p) crashed: "
"{~p, ~p}, stacktrace: ~0p",
[Insta, Ctx,
[GwType, Gateway, Ctx,
Class, Reason1, Stk]),
{error, {Class, Reason1, Stk}}
end.
cb_insta_update(NewInsta,
State = #state{insta = Insta = #{type := Type},
ctx = Ctx,
insta_state = GwInstaState}) ->
cb_gateway_update(NewGateway,
State = #state{gw = Gateway = #{type := GwType},
ctx = Ctx,
gw_state = GwState}) ->
try
#{cbkmod := CbMod,
state := GwState} = emqx_gateway_registry:lookup(Type),
case CbMod:on_insta_update(NewInsta, Insta, GwInstaState, GwState) of
#{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwType),
case CbMod:on_gateway_update(NewGateway, Gateway, GwState) of
{error, Reason} -> throw({callback_return_error, Reason});
{ok, InstaPidOrSpecs, InstaState} ->
{ok, ChildPidOrSpecs, NGwState} ->
%% XXX: Hot-upgrade ???
ChildPids = start_child_process(InstaPidOrSpecs),
ChildPids = start_child_process(ChildPidOrSpecs),
{ok, State#state{
status = running,
child_pids = ChildPids,
insta_state = InstaState
gw_state = NGwState
}}
end
catch
Class : Reason1 : Stk ->
logger:error("Update instance (~0p, ~0p, ~0p, _) crashed: "
logger:error("Failed to update gateway (~0p, ~0p, ~0p) crashed: "
"{~p, ~p}, stacktrace: ~0p",
[NewInsta, Insta, Ctx,
[NewGateway, Gateway, Ctx,
Class, Reason1, Stk]),
{error, {Class, Reason1, Stk}}
end.
start_child_process([Indictor|_] = InstaPidOrSpecs) ->
start_child_process([Indictor|_] = ChildPidOrSpecs) ->
case erlang:is_pid(Indictor) of
true ->
InstaPidOrSpecs;
ChildPidOrSpecs;
_ ->
do_start_child_process(InstaPidOrSpecs)
do_start_child_process(ChildPidOrSpecs)
end.
do_start_child_process(ChildSpecs) when is_list(ChildSpecs) ->

View File

@ -23,11 +23,9 @@
-behavior(gen_server).
%% APIs for Impl.
-export([ load/3
, unload/1
]).
-export([ list/0
-export([ reg/2
, unreg/1
, list/0
, lookup/1
]).
@ -44,9 +42,17 @@
]).
-record(state, {
loaded = #{} :: #{ gateway_type() => descriptor() }
reged = #{} :: #{ gateway_type() => descriptor() }
}).
-type registry_options() :: [registry_option()].
-type registry_option() :: {cbkmod, atom()}.
-type descriptor() :: #{ cbkmod := atom()
, rgopts := registry_options()
}.
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
@ -58,37 +64,24 @@ start_link() ->
%% Mgmt
%%--------------------------------------------------------------------
-type registry_options() :: [registry_option()].
-type registry_option() :: {cbkmod, atom()}.
-type gateway_options() :: list().
-type descriptor() :: #{ cbkmod := atom()
, rgopts := registry_options()
, gwopts := gateway_options()
, state => any()
}.
-spec load(gateway_type(), registry_options(), gateway_options())
-spec reg(gateway_type(), registry_options())
-> ok
| {error, any()}.
load(Type, RgOpts, GwOpts) ->
reg(Type, RgOpts) ->
CbMod = proplists:get_value(cbkmod, RgOpts, Type),
Dscrptr = #{ cbkmod => CbMod
, rgopts => RgOpts
, gwopts => GwOpts
},
call({load, Type, Dscrptr}).
call({reg, Type, Dscrptr}).
-spec unload(gateway_type()) -> ok | {error, any()}.
unload(Type) ->
-spec unreg(gateway_type()) -> ok | {error, any()}.
unreg(Type) ->
%% TODO: Checking ALL INSTACE HAS STOPPED
call({unload, Type}).
call({unreg, Type}).
%% TODO:
%unload(Type, Force) ->
% call({unload, Type, Froce}).
%unreg(Type, Force) ->
% call({unreg, Type, Froce}).
%% @doc Return all registered protocol gateway implementation
-spec list() -> [{gateway_type(), descriptor()}].
@ -109,41 +102,30 @@ call(Req) ->
init([]) ->
%% TODO: Metrics ???
process_flag(trap_exit, true),
{ok, #state{loaded = #{}}}.
{ok, #state{reged = #{}}}.
handle_call({load, Type, Dscrptr}, _From, State = #state{loaded = Gateways}) ->
handle_call({reg, Type, Dscrptr}, _From, State = #state{reged = Gateways}) ->
case maps:get(Type, Gateways, notfound) of
notfound ->
try
GwOpts = maps:get(gwopts, Dscrptr),
CbMod = maps:get(cbkmod, Dscrptr),
{ok, GwState} = CbMod:init(GwOpts),
NDscrptr = maps:put(state, GwState, Dscrptr),
NGateways = maps:put(Type, NDscrptr, Gateways),
{reply, ok, State#state{loaded = NGateways}}
catch
Class : Reason : Stk ->
logger:error("Load ~s crashed {~p, ~p}; stacktrace: ~0p",
[Type, Class, Reason, Stk]),
{reply, {error, {Class, Reason}}, State}
end;
NGateways = maps:put(Type, Dscrptr, Gateways),
{reply, ok, State#state{reged = NGateways}};
_ ->
{reply, {error, already_existed}, State}
end;
handle_call({unload, Type}, _From, State = #state{loaded = Gateways}) ->
handle_call({unreg, Type}, _From, State = #state{reged = Gateways}) ->
case maps:get(Type, Gateways, undefined) of
undefined ->
{reply, ok, State};
_ ->
emqx_gateway_sup:stop_all_suptree(Type),
{reply, ok, State#state{loaded = maps:remove(Type, Gateways)}}
emqx_gateway_sup:unload_gateway(Type),
{reply, ok, State#state{reged = maps:remove(Type, Gateways)}}
end;
handle_call(all, _From, State = #state{loaded = Gateways}) ->
handle_call(all, _From, State = #state{reged = Gateways}) ->
{reply, maps:to_list(Gateways), State};
handle_call({lookup, Type}, _From, State = #state{loaded = Gateways}) ->
handle_call({lookup, Type}, _From, State = #state{reged = Gateways}) ->
Reply = maps:get(Type, Gateways, undefined),
{reply, Reply, State};

View File

@ -32,17 +32,13 @@
structs() -> ["gateway"].
fields("gateway") ->
[{stomp, t(ref(stomp))},
{mqttsn, t(ref(mqttsn))},
{coap, t(ref(coap))},
{lwm2m, t(ref(lwm2m))},
{lwm2m_xml_dir, t(string())},
{exproto, t(ref(exproto))}
[{stomp, t(ref(stomp_structs))},
{mqttsn, t(ref(mqttsn_structs))},
{coap, t(ref(coap_structs))},
{lwm2m, t(ref(lwm2m_structs))},
{exproto, t(ref(exproto_structs))}
];
fields(stomp) ->
[{"$id", t(ref(stomp_structs))}];
fields(stomp_structs) ->
[ {frame, t(ref(stomp_frame))}
, {clientinfo_override, t(ref(clientinfo_override))}
@ -56,9 +52,6 @@ fields(stomp_frame) ->
, {max_body_length, t(integer(), undefined, 8192)}
];
fields(mqttsn) ->
[{"$id", t(ref(mqttsn_structs))}];
fields(mqttsn_structs) ->
[ {gateway_id, t(integer())}
, {broadcast, t(boolean())}
@ -76,12 +69,9 @@ fields(mqttsn_predefined) ->
, {topic, t(string())}
];
fields(lwm2m) ->
[{"$id", t(ref(lwm2m_structs))}
];
fields(lwm2m_structs) ->
[ {lifetime_min, t(duration())}
[ {xml_dir, t(string())}
, {lifetime_min, t(duration())}
, {lifetime_max, t(duration())}
, {qmode_time_windonw, t(integer())}
, {auto_observe, t(boolean())}
@ -91,9 +81,6 @@ fields(lwm2m_structs) ->
, {listener, t(ref(udp_listener_group))}
];
fields(exproto) ->
[{"$id", t(ref(exproto_structs))}];
fields(exproto_structs) ->
[ {server, t(ref(exproto_grpc_server))}
, {handler, t(ref(exproto_grpc_handler))}

View File

@ -22,22 +22,16 @@
-export([start_link/0]).
%% Gateway Instance APIs
-export([ create_gateway_insta/1
, remove_gateway_insta/1
, lookup_gateway_insta/1
, update_gateway_insta/1
%% Gateway APIs
-export([ load_gateway/1
, unload_gateway/1
, lookup_gateway/1
, update_gateway/1
, start_gateway_insta/1
, stop_gateway_insta/1
, list_gateway_insta/1
, list_gateway_insta/0
]).
%% Gateway APs
-export([ list_started_gateway/0
, stop_all_suptree/1
]).
%% supervisor callbacks
-export([init/1]).
@ -48,88 +42,71 @@
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-spec create_gateway_insta(instance()) -> {ok, pid()} | {error, any()}.
create_gateway_insta(Insta = #{type := Type}) ->
case emqx_gateway_registry:lookup(Type) of
undefined -> {error, {unknown_gateway_id, Type}};
-spec load_gateway(gateway()) -> {ok, pid()} | {error, any()}.
load_gateway(Gateway = #{type := GwType}) ->
case emqx_gateway_registry:lookup(GwType) of
undefined -> {error, {unknown_gateway_type, GwType}};
GwDscrptr ->
{ok, GwSup} = ensure_gateway_suptree_ready(gatewayid(Type)),
emqx_gateway_gw_sup:create_insta(GwSup, Insta, GwDscrptr)
{ok, GwSup} = ensure_gateway_suptree_ready(GwType),
emqx_gateway_gw_sup:create_insta(GwSup, Gateway, GwDscrptr)
end.
-spec remove_gateway_insta(instance_id()) -> ok | {error, any()}.
remove_gateway_insta(InstaId) ->
case search_gateway_insta_proc(InstaId) of
{ok, {GwSup, _}} ->
emqx_gateway_gw_sup:remove_insta(GwSup, InstaId);
-spec unload_gateway(gateway_type()) -> ok | {error, not_found}.
unload_gateway(GwType) ->
case lists:keyfind(GwType, 1, supervisor:which_children(?MODULE)) of
false -> {error, not_found};
_ ->
_ = supervisor:terminate_child(?MODULE, GwType),
_ = supervisor:delete_child(?MODULE, GwType),
ok
end.
-spec lookup_gateway_insta(instance_id()) -> instance() | undefined.
lookup_gateway_insta(InstaId) ->
case search_gateway_insta_proc(InstaId) of
-spec lookup_gateway(gateway_type()) -> gateway() | undefined.
lookup_gateway(GwType) ->
case search_gateway_insta_proc(GwType) of
{ok, {_, GwInstaPid}} ->
emqx_gateway_insta_sup:info(GwInstaPid);
_ ->
undefined
end.
-spec update_gateway_insta(instance())
-spec update_gateway(gateway_type())
-> ok
| {error, any()}.
update_gateway_insta(NewInsta = #{type := Type}) ->
case emqx_gateway_utils:find_sup_child(?MODULE, gatewayid(Type)) of
update_gateway(NewGateway = #{type := GwType}) ->
case emqx_gateway_utils:find_sup_child(?MODULE, GwType) of
{ok, GwSup} ->
emqx_gateway_gw_sup:update_insta(GwSup, NewInsta);
emqx_gateway_gw_sup:update_insta(GwSup, NewGateway);
_ -> {error, not_found}
end.
start_gateway_insta(InstaId) ->
case search_gateway_insta_proc(InstaId) of
start_gateway_insta(GwType) ->
case search_gateway_insta_proc(GwType) of
{ok, {GwSup, _}} ->
emqx_gateway_gw_sup:start_insta(GwSup, InstaId);
emqx_gateway_gw_sup:start_insta(GwSup, GwType);
_ -> {error, not_found}
end.
-spec stop_gateway_insta(instance_id()) -> ok | {error, any()}.
stop_gateway_insta(InstaId) ->
case search_gateway_insta_proc(InstaId) of
-spec stop_gateway_insta(gateway_type()) -> ok | {error, any()}.
stop_gateway_insta(GwType) ->
case search_gateway_insta_proc(GwType) of
{ok, {GwSup, _}} ->
emqx_gateway_gw_sup:stop_insta(GwSup, InstaId);
emqx_gateway_gw_sup:stop_insta(GwSup, GwType);
_ -> {error, not_found}
end.
-spec list_gateway_insta(gateway_type()) -> {ok, [instance()]} | {error, any()}.
list_gateway_insta(Type) ->
case emqx_gateway_utils:find_sup_child(?MODULE, gatewayid(Type)) of
{ok, GwSup} ->
{ok, emqx_gateway_gw_sup:list_insta(GwSup)};
_ -> {error, not_found}
end.
-spec list_gateway_insta() -> [{gateway_type(), instance()}].
-spec list_gateway_insta() -> [gateway()].
list_gateway_insta() ->
lists:map(
lists:append(lists:map(
fun(SupId) ->
Instas = emqx_gateway_gw_sup:list_insta(SupId),
{SupId, Instas}
end, list_started_gateway()).
emqx_gateway_gw_sup:list_insta(SupId)
end, list_started_gateway())).
-spec list_started_gateway() -> [gateway_type()].
list_started_gateway() ->
started_gateway_type().
-spec stop_all_suptree(atom()) -> ok.
stop_all_suptree(Type) ->
case lists:keyfind(Type, 1, supervisor:which_children(?MODULE)) of
false -> ok;
_ ->
_ = supervisor:terminate_child(?MODULE, Type),
_ = supervisor:delete_child(?MODULE, Type),
ok
end.
%% Supervisor callback
init([]) ->
@ -145,17 +122,14 @@ init([]) ->
%% Internal funcs
%%--------------------------------------------------------------------
gatewayid(Type) ->
list_to_atom(lists:concat([Type])).
ensure_gateway_suptree_ready(Type) ->
case lists:keyfind(Type, 1, supervisor:which_children(?MODULE)) of
ensure_gateway_suptree_ready(GwType) ->
case lists:keyfind(GwType, 1, supervisor:which_children(?MODULE)) of
false ->
ChildSpec = emqx_gateway_utils:childspec(
Type,
GwType,
supervisor,
emqx_gateway_gw_sup,
[Type]
[GwType]
),
emqx_gateway_utils:supervisor_ret(
supervisor:start_child(?MODULE, ChildSpec)
@ -190,4 +164,3 @@ started_gateway_pid() ->
is_a_gateway_id(Id) ->
Id /= emqx_gateway_registry.

View File

@ -20,16 +20,13 @@
-behavior(emqx_gateway_impl).
%% APIs
-export([ load/0
, unload/0
-export([ reg/0
, unreg/0
]).
-export([]).
-export([ init/1
, on_insta_create/3
, on_insta_update/4
, on_insta_destroy/3
-export([ on_gateway_load/2
, on_gateway_update/3
, on_gateway_unload/2
]).
-include_lib("emqx/include/logger.hrl").
@ -38,24 +35,19 @@
%% APIs
%%--------------------------------------------------------------------
load() ->
reg() ->
RegistryOptions = [ {cbkmod, ?MODULE}
],
emqx_gateway_registry:load(exproto, RegistryOptions, []).
unload() ->
emqx_gateway_registry:unload(exproto).
init(_) ->
GwState = #{},
{ok, GwState}.
emqx_gateway_registry:reg(exproto, RegistryOptions).
unreg() ->
emqx_gateway_registry:unreg(exproto).
%%--------------------------------------------------------------------
%% emqx_gateway_registry callbacks
%%--------------------------------------------------------------------
start_grpc_server(InstaId, Options = #{bind := ListenOn}) ->
start_grpc_server(GwType, Options = #{bind := ListenOn}) ->
Services = #{protos => [emqx_exproto_pb],
services => #{
'emqx.exproto.v1.ConnectionAdapter' => emqx_exproto_gsvr}
@ -65,10 +57,10 @@ start_grpc_server(InstaId, Options = #{bind := ListenOn}) ->
SslOpts ->
[{ssl_options, SslOpts}]
end,
_ = grpc:start_server(InstaId, ListenOn, Services, SvrOptions),
?ULOG("Start ~s gRPC server on ~p successfully.~n", [InstaId, ListenOn]).
_ = grpc:start_server(GwType, ListenOn, Services, SvrOptions),
?ULOG("Start ~s gRPC server on ~p successfully.~n", [GwType, ListenOn]).
start_grpc_client_channel(InstaId, Options = #{address := UriStr}) ->
start_grpc_client_channel(GwType, Options = #{address := UriStr}) ->
UriMap = uri_string:parse(UriStr),
Scheme = maps:get(scheme, UriMap),
Host = maps:get(host, UriMap),
@ -85,79 +77,79 @@ start_grpc_client_channel(InstaId, Options = #{address := UriStr}) ->
transport_opts => SslOpts}};
_ -> #{}
end,
grpc_client_sup:create_channel_pool(InstaId, SvrAddr, ClientOpts).
grpc_client_sup:create_channel_pool(GwType, SvrAddr, ClientOpts).
on_insta_create(_Insta = #{ id := InstaId,
rawconf := RawConf
}, Ctx, _GwState) ->
on_gateway_load(_Gateway = #{ type := GwType,
rawconf := RawConf
}, Ctx) ->
%% XXX: How to monitor it ?
%% Start grpc client pool & client channel
PoolName = pool_name(InstaId),
PoolName = pool_name(GwType),
PoolSize = emqx_vm:schedulers() * 2,
{ok, _} = emqx_pool_sup:start_link(PoolName, hash, PoolSize,
{emqx_exproto_gcli, start_link, []}),
_ = start_grpc_client_channel(InstaId, maps:get(handler, RawConf)),
_ = start_grpc_client_channel(GwType, maps:get(handler, RawConf)),
%% XXX: How to monitor it ?
_ = start_grpc_server(InstaId, maps:get(server, RawConf)),
_ = start_grpc_server(GwType, maps:get(server, RawConf)),
NRawConf = maps:without(
[server, handler],
RawConf#{pool_name => PoolName}
),
Listeners = emqx_gateway_utils:normalize_rawconf(
NRawConf#{handler => InstaId}
NRawConf#{handler => GwType}
),
ListenerPids = lists:map(fun(Lis) ->
start_listener(InstaId, Ctx, Lis)
start_listener(GwType, Ctx, Lis)
end, Listeners),
{ok, ListenerPids, _InstaState = #{ctx => Ctx}}.
{ok, ListenerPids, _GwState = #{ctx => Ctx}}.
on_insta_update(NewInsta, OldInsta, GwInstaState = #{ctx := Ctx}, GwState) ->
InstaId = maps:get(id, NewInsta),
on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) ->
GwType = maps:get(type, NewGateway),
try
%% XXX: 1. How hot-upgrade the changes ???
%% XXX: 2. Check the New confs first before destroy old instance ???
on_insta_destroy(OldInsta, GwInstaState, GwState),
on_insta_create(NewInsta, Ctx, GwState)
on_gateway_unload(OldGateway, GwState),
on_gateway_load(NewGateway, Ctx)
catch
Class : Reason : Stk ->
logger:error("Failed to update exproto instance ~s; "
logger:error("Failed to update ~s; "
"reason: {~0p, ~0p} stacktrace: ~0p",
[InstaId, Class, Reason, Stk]),
[GwType, Class, Reason, Stk]),
{error, {Class, Reason}}
end.
on_insta_destroy(_Insta = #{ id := InstaId,
rawconf := RawConf
}, _GwInstaState, _GwState) ->
on_gateway_unload(_Gateway = #{ type := GwType,
rawconf := RawConf
}, _GwState) ->
Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
lists:foreach(fun(Lis) ->
stop_listener(InstaId, Lis)
stop_listener(GwType, Lis)
end, Listeners).
pool_name(InstaId) ->
list_to_atom(lists:concat([InstaId, "_gcli_pool"])).
pool_name(GwType) ->
list_to_atom(lists:concat([GwType, "_gcli_pool"])).
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
start_listener(InstaId, Ctx, {Type, ListenOn, SocketOpts, Cfg}) ->
start_listener(GwType, Ctx, {Type, ListenOn, SocketOpts, Cfg}) ->
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) of
case start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) of
{ok, Pid} ->
?ULOG("Start exproto ~s:~s listener on ~s successfully.~n",
[InstaId, Type, ListenOnStr]),
?ULOG("Start ~s:~s listener on ~s successfully.~n",
[GwType, Type, ListenOnStr]),
Pid;
{error, Reason} ->
?ELOG("Failed to start exproto ~s:~s listener on ~s: ~0p~n",
[InstaId, Type, ListenOnStr, Reason]),
?ELOG("Failed to start ~s:~s listener on ~s: ~0p~n",
[GwType, Type, ListenOnStr, Reason]),
throw({badconf, Reason})
end.
start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
Name = name(InstaId, Type),
start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
Name = name(GwType, Type),
NCfg = Cfg#{
ctx => Ctx,
frame_mod => emqx_exproto_frame,
@ -176,8 +168,8 @@ do_start_listener(udp, Name, ListenOn, Opts, MFA) ->
do_start_listener(dtls, Name, ListenOn, Opts, MFA) ->
esockd:open_dtls(Name, ListenOn, Opts, MFA).
name(InstaId, Type) ->
list_to_atom(lists:concat([InstaId, ":", Type])).
name(GwType, Type) ->
list_to_atom(lists:concat([GwType, ":", Type])).
merge_default_by_type(Type, Options) when Type =:= tcp;
Type =:= ssl ->
@ -200,18 +192,18 @@ merge_default_by_type(Type, Options) when Type =:= udp;
[{udp_options, Default} | Options]
end.
stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(InstaId, Type, ListenOn, SocketOpts, Cfg),
stop_listener(GwType, {Type, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(GwType, Type, ListenOn, SocketOpts, Cfg),
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case StopRet of
ok -> ?ULOG("Stop exproto ~s:~s listener on ~s successfully.~n",
[InstaId, Type, ListenOnStr]);
ok -> ?ULOG("Stop ~s:~s listener on ~s successfully.~n",
[GwType, Type, ListenOnStr]);
{error, Reason} ->
?ELOG("Failed to stop exproto ~s:~s listener on ~s: ~0p~n",
[InstaId, Type, ListenOnStr, Reason])
?ELOG("Failed to stop ~s:~s listener on ~s: ~0p~n",
[GwType, Type, ListenOnStr, Reason])
end,
StopRet.
stop_listener(InstaId, Type, ListenOn, _SocketOpts, _Cfg) ->
Name = name(InstaId, Type),
stop_listener(GwType, Type, ListenOn, _SocketOpts, _Cfg) ->
Name = name(GwType, Type),
esockd:close(Name, ListenOn).

View File

@ -20,36 +20,37 @@
-behavior(emqx_gateway_impl).
%% APIs
-export([ load/0
, unload/0
-export([ reg/0
, unreg/0
]).
-export([]).
-export([ init/1
, on_insta_create/3
, on_insta_update/4
, on_insta_destroy/3
-export([ on_gateway_load/2
, on_gateway_update/3
, on_gateway_unload/2
]).
-include_lib("emqx/include/logger.hrl").
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
load() ->
reg() ->
RegistryOptions = [ {cbkmod, ?MODULE}
],
emqx_gateway_registry:load(lwm2m, RegistryOptions, []).
emqx_gateway_registry:reg(lwm2m, RegistryOptions).
unload() ->
%% XXX:
lwm2m_coap_server_registry:remove_handler(
[<<"rd">>],
emqx_lwm2m_coap_resource, undefined
),
emqx_gateway_registry:unload(lwm2m).
unreg() ->
emqx_gateway_registry:unreg(lwm2m).
%%--------------------------------------------------------------------
%% emqx_gateway_registry callbacks
%%--------------------------------------------------------------------
on_gateway_load(_Gateway = #{ type := GwType,
rawconf := RawConf
}, Ctx) ->
init(_) ->
%% Handler
_ = lwm2m_coap_server:start_registry(),
lwm2m_coap_server_registry:add_handler(
@ -57,75 +58,66 @@ init(_) ->
emqx_lwm2m_coap_resource, undefined
),
%% Xml registry
{ok, _} = emqx_lwm2m_xml_object_db:start_link(
emqx_config:get([gateway, lwm2m_xml_dir])
),
{ok, _} = emqx_lwm2m_xml_object_db:start_link(maps:get(xml_dir, RawConf)),
%% XXX: Self managed table?
%% TODO: Improve it later
{ok, _} = emqx_lwm2m_cm:start_link(),
GwState = #{},
{ok, GwState}.
%% TODO: deinit
%%--------------------------------------------------------------------
%% emqx_gateway_registry callbacks
%%--------------------------------------------------------------------
on_insta_create(_Insta = #{ id := InstaId,
rawconf := RawConf
}, Ctx, _GwState) ->
Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
ListenerPids = lists:map(fun(Lis) ->
start_listener(InstaId, Ctx, Lis)
start_listener(GwType, Ctx, Lis)
end, Listeners),
{ok, ListenerPids, _InstaState = #{ctx => Ctx}}.
{ok, ListenerPids, _GwState = #{ctx => Ctx}}.
on_insta_update(NewInsta, OldInsta, GwInstaState = #{ctx := Ctx}, GwState) ->
InstaId = maps:get(id, NewInsta),
on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) ->
GwType = maps:get(type, NewGateway),
try
%% XXX: 1. How hot-upgrade the changes ???
%% XXX: 2. Check the New confs first before destroy old instance ???
on_insta_destroy(OldInsta, GwInstaState, GwState),
on_insta_create(NewInsta, Ctx, GwState)
on_gateway_unload(OldGateway, GwState),
on_gateway_load(NewGateway, Ctx)
catch
Class : Reason : Stk ->
logger:error("Failed to update stomp instance ~s; "
logger:error("Failed to update ~s; "
"reason: {~0p, ~0p} stacktrace: ~0p",
[InstaId, Class, Reason, Stk]),
[GwType, Class, Reason, Stk]),
{error, {Class, Reason}}
end.
on_insta_destroy(_Insta = #{ id := InstaId,
rawconf := RawConf
}, _GwInstaState, _GwState) ->
on_gateway_unload(_Gateway = #{ type := GwType,
rawconf := RawConf
}, _GwState) ->
%% XXX:
lwm2m_coap_server_registry:remove_handler(
[<<"rd">>],
emqx_lwm2m_coap_resource, undefined
),
Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
lists:foreach(fun(Lis) ->
stop_listener(InstaId, Lis)
stop_listener(GwType, Lis)
end, Listeners).
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
start_listener(InstaId, Ctx, {Type, ListenOn, SocketOpts, Cfg}) ->
start_listener(GwType, Ctx, {Type, ListenOn, SocketOpts, Cfg}) ->
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) of
case start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) of
{ok, Pid} ->
io:format("Start lwm2m ~s:~s listener on ~s successfully.~n",
[InstaId, Type, ListenOnStr]),
?ULOG("Start ~s:~s listener on ~s successfully.~n",
[GwType, Type, ListenOnStr]),
Pid;
{error, Reason} ->
io:format(standard_error,
"Failed to start lwm2m ~s:~s listener on ~s: ~0p~n",
[InstaId, Type, ListenOnStr, Reason]),
?ELOG("Failed to start ~s:~s listener on ~s: ~0p~n",
[GwType, Type, ListenOnStr, Reason]),
throw({badconf, Reason})
end.
start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
Name = name(InstaId, udp),
start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
Name = name(GwType, udp),
NCfg = Cfg#{ctx => Ctx},
NSocketOpts = merge_default(SocketOpts),
Options = [{config, NCfg}|NSocketOpts],
@ -136,8 +128,8 @@ start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
lwm2m_coap_server:start_dtls(Name, ListenOn, Options)
end.
name(InstaId, Type) ->
list_to_atom(lists:concat([InstaId, ":", Type])).
name(GwType, Type) ->
list_to_atom(lists:concat([GwType, ":", Type])).
merge_default(Options) ->
Default = emqx_gateway_utils:default_udp_options(),
@ -149,22 +141,20 @@ merge_default(Options) ->
[{udp_options, Default} | Options]
end.
stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(InstaId, Type, ListenOn, SocketOpts, Cfg),
stop_listener(GwType, {Type, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(GwType, Type, ListenOn, SocketOpts, Cfg),
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case StopRet of
ok -> io:format("Stop lwm2m ~s:~s listener on ~s successfully.~n",
[InstaId, Type, ListenOnStr]);
ok -> ?ULOG("Stop ~s:~s listener on ~s successfully.~n",
[GwType, Type, ListenOnStr]);
{error, Reason} ->
io:format(standard_error,
"Failed to stop lwm2m ~s:~s listener on ~s: ~0p~n",
[InstaId, Type, ListenOnStr, Reason]
)
?ELOG("Failed to stop ~s:~s listener on ~s: ~0p~n",
[GwType, Type, ListenOnStr, Reason])
end,
StopRet.
stop_listener(InstaId, Type, ListenOn, _SocketOpts, _Cfg) ->
Name = name(InstaId, Type),
stop_listener(GwType, Type, ListenOn, _SocketOpts, _Cfg) ->
Name = name(GwType, Type),
case Type of
udp ->
lwm2m_coap_server:stop_udp(Name, ListenOn);

View File

@ -20,16 +20,13 @@
-behavior(emqx_gateway_impl).
%% APIs
-export([ load/0
, unload/0
-export([ reg/0
, unreg/0
]).
-export([]).
-export([ init/1
, on_insta_create/3
, on_insta_update/4
, on_insta_destroy/3
-export([ on_gateway_load/2
, on_gateway_update/3
, on_gateway_unload/2
]).
-include_lib("emqx/include/logger.hrl").
@ -38,25 +35,21 @@
%% APIs
%%--------------------------------------------------------------------
load() ->
reg() ->
RegistryOptions = [ {cbkmod, ?MODULE}
],
emqx_gateway_registry:load(mqttsn, RegistryOptions, []).
emqx_gateway_registry:reg(mqttsn, RegistryOptions).
unload() ->
emqx_gateway_registry:unload(mqttsn).
init(_) ->
GwState = #{},
{ok, GwState}.
unreg() ->
emqx_gateway_registry:unreg(mqttsn).
%%--------------------------------------------------------------------
%% emqx_gateway_registry callbacks
%%--------------------------------------------------------------------
on_insta_create(_Insta = #{ id := InstaId,
rawconf := RawConf
}, Ctx, _GwState) ->
on_gateway_load(_Gateway = #{ type := GwType,
rawconf := RawConf
}, Ctx) ->
%% We Also need to start `emqx_sn_broadcast` &
%% `emqx_sn_registry` process
@ -71,7 +64,7 @@ on_insta_create(_Insta = #{ id := InstaId,
end,
PredefTopics = maps:get(predefined, RawConf),
{ok, RegistrySvr} = emqx_sn_registry:start_link(InstaId, PredefTopics),
{ok, RegistrySvr} = emqx_sn_registry:start_link(GwType, PredefTopics),
NRawConf = maps:without(
[broadcast, predefined],
@ -80,52 +73,52 @@ on_insta_create(_Insta = #{ id := InstaId,
Listeners = emqx_gateway_utils:normalize_rawconf(NRawConf),
ListenerPids = lists:map(fun(Lis) ->
start_listener(InstaId, Ctx, Lis)
start_listener(GwType, Ctx, Lis)
end, Listeners),
{ok, ListenerPids, _InstaState = #{ctx => Ctx}}.
on_insta_update(NewInsta, OldInsta, GwInstaState = #{ctx := Ctx}, GwState) ->
InstaId = maps:get(id, NewInsta),
on_gateway_update(NewGateway = #{type := GwType}, OldGateway,
GwState = #{ctx := Ctx}) ->
try
%% XXX: 1. How hot-upgrade the changes ???
%% XXX: 2. Check the New confs first before destroy old instance ???
on_insta_destroy(OldInsta, GwInstaState, GwState),
on_insta_create(NewInsta, Ctx, GwState)
on_gateway_unload(OldGateway, GwState),
on_gateway_load(NewGateway, Ctx)
catch
Class : Reason : Stk ->
logger:error("Failed to update stomp instance ~s; "
logger:error("Failed to update ~s; "
"reason: {~0p, ~0p} stacktrace: ~0p",
[InstaId, Class, Reason, Stk]),
[GwType, Class, Reason, Stk]),
{error, {Class, Reason}}
end.
on_insta_destroy(_Insta = #{ id := InstaId,
rawconf := RawConf
}, _GwInstaState, _GwState) ->
on_gateway_unload(_Insta = #{ type := GwType,
rawconf := RawConf
}, _GwState) ->
Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
lists:foreach(fun(Lis) ->
stop_listener(InstaId, Lis)
stop_listener(GwType, Lis)
end, Listeners).
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
start_listener(InstaId, Ctx, {Type, ListenOn, SocketOpts, Cfg}) ->
start_listener(GwType, Ctx, {Type, ListenOn, SocketOpts, Cfg}) ->
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) of
case start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) of
{ok, Pid} ->
?ULOG("Start mqttsn ~s:~s listener on ~s successfully.~n",
[InstaId, Type, ListenOnStr]),
?ULOG("Start ~s:~s listener on ~s successfully.~n",
[GwType, Type, ListenOnStr]),
Pid;
{error, Reason} ->
?ELOG("Failed to start mqttsn ~s:~s listener on ~s: ~0p~n",
[InstaId, Type, ListenOnStr, Reason]),
?ELOG("Failed to start ~s:~s listener on ~s: ~0p~n",
[GwType, Type, ListenOnStr, Reason]),
throw({badconf, Reason})
end.
start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
Name = name(InstaId, Type),
start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
Name = name(GwType, Type),
NCfg = Cfg#{
ctx => Ctx,
frame_mod => emqx_sn_frame,
@ -134,8 +127,8 @@ start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
esockd:open_udp(Name, ListenOn, merge_default(SocketOpts),
{emqx_gateway_conn, start_link, [NCfg]}).
name(InstaId, Type) ->
list_to_atom(lists:concat([InstaId, ":", Type])).
name(GwType, Type) ->
list_to_atom(lists:concat([GwType, ":", Type])).
merge_default(Options) ->
Default = emqx_gateway_utils:default_udp_options(),
@ -147,18 +140,18 @@ merge_default(Options) ->
[{udp_options, Default} | Options]
end.
stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(InstaId, Type, ListenOn, SocketOpts, Cfg),
stop_listener(GwType, {Type, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(GwType, Type, ListenOn, SocketOpts, Cfg),
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case StopRet of
ok -> ?ULOG("Stop mqttsn ~s:~s listener on ~s successfully.~n",
[InstaId, Type, ListenOnStr]);
ok -> ?ULOG("Stop ~s:~s listener on ~s successfully.~n",
[GwType, Type, ListenOnStr]);
{error, Reason} ->
?ELOG("Failed to stop mqttsn ~s:~s listener on ~s: ~0p~n",
[InstaId, Type, ListenOnStr, Reason])
?ELOG("Failed to stop ~s:~s listener on ~s: ~0p~n",
[GwType, Type, ListenOnStr, Reason])
end,
StopRet.
stop_listener(InstaId, Type, ListenOn, _SocketOpts, _Cfg) ->
Name = name(InstaId, Type),
stop_listener(GwType, Type, ListenOn, _SocketOpts, _Cfg) ->
Name = name(GwType, Type),
esockd:close(Name, ListenOn).

View File

@ -19,14 +19,13 @@
-behavior(emqx_gateway_impl).
%% APIs
-export([ load/0
, unload/0
-export([ reg/0
, unreg/0
]).
-export([ init/1
, on_insta_create/3
, on_insta_update/4
, on_insta_destroy/3
-export([ on_gateway_load/2
, on_gateway_update/3
, on_gateway_unload/2
]).
-include_lib("emqx_gateway/include/emqx_gateway.hrl").
@ -36,79 +35,75 @@
%% APIs
%%--------------------------------------------------------------------
-spec load() -> ok | {error, any()}.
load() ->
-spec reg() -> ok | {error, any()}.
reg() ->
RegistryOptions = [ {cbkmod, ?MODULE}
],
emqx_gateway_registry:load(stomp, RegistryOptions, []).
emqx_gateway_registry:reg(stomp, RegistryOptions).
-spec unload() -> ok | {error, any()}.
unload() ->
emqx_gateway_registry:unload(stomp).
init(_) ->
GwState = #{},
{ok, GwState}.
-spec unreg() -> ok | {error, any()}.
unreg() ->
emqx_gateway_registry:unreg(stomp).
%%--------------------------------------------------------------------
%% emqx_gateway_registry callbacks
%%--------------------------------------------------------------------
on_insta_create(_Insta = #{ id := InstaId,
rawconf := RawConf
}, Ctx, _GwState) ->
on_gateway_load(_Gateway = #{ type := GwType,
rawconf := RawConf
}, Ctx) ->
%% Step1. Fold the rawconfs to listeners
Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
%% Step2. Start listeners or escokd:specs
ListenerPids = lists:map(fun(Lis) ->
start_listener(InstaId, Ctx, Lis)
start_listener(GwType, Ctx, Lis)
end, Listeners),
%% FIXME: How to throw an exception to interrupt the restart logic ?
%% FIXME: Assign ctx to InstaState
{ok, ListenerPids, _InstaState = #{ctx => Ctx}}.
%% FIXME: Assign ctx to GwState
{ok, ListenerPids, _GwState = #{ctx => Ctx}}.
on_insta_update(NewInsta, OldInsta, GwInstaState = #{ctx := Ctx}, GwState) ->
InstaId = maps:get(id, NewInsta),
on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) ->
GwType = maps:get(type, NewGateway),
try
%% XXX: 1. How hot-upgrade the changes ???
%% XXX: 2. Check the New confs first before destroy old instance ???
on_insta_destroy(OldInsta, GwInstaState, GwState),
on_insta_create(NewInsta, Ctx, GwState)
%% XXX: 2. Check the New confs first before destroy old state???
on_gateway_unload(OldGateway, GwState),
on_gateway_load(NewGateway, Ctx)
catch
Class : Reason : Stk ->
logger:error("Failed to update stomp instance ~s; "
logger:error("Failed to update ~s; "
"reason: {~0p, ~0p} stacktrace: ~0p",
[InstaId, Class, Reason, Stk]),
[GwType, Class, Reason, Stk]),
{error, {Class, Reason}}
end.
on_insta_destroy(_Insta = #{ id := InstaId,
rawconf := RawConf
}, _GwInstaState, _GwState) ->
on_gateway_unload(_Gateway = #{ type := GwType,
rawconf := RawConf
}, _GwState) ->
Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
lists:foreach(fun(Lis) ->
stop_listener(InstaId, Lis)
stop_listener(GwType, Lis)
end, Listeners).
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
start_listener(InstaId, Ctx, {Type, ListenOn, SocketOpts, Cfg}) ->
start_listener(GwType, Ctx, {Type, ListenOn, SocketOpts, Cfg}) ->
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) of
case start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) of
{ok, Pid} ->
?ULOG("Start stomp ~s:~s listener on ~s successfully.~n",
[InstaId, Type, ListenOnStr]),
?ULOG("Start ~s:~s listener on ~s successfully.~n",
[GwType, Type, ListenOnStr]),
Pid;
{error, Reason} ->
?ELOG("Failed to start stomp ~s:~s listener on ~s: ~0p~n",
[InstaId, Type, ListenOnStr, Reason]),
?ELOG("Failed to start ~s:~s listener on ~s: ~0p~n",
[GwType, Type, ListenOnStr, Reason]),
throw({badconf, Reason})
end.
start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
Name = name(InstaId, Type),
start_listener(GwType, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
Name = name(GwType, Type),
NCfg = Cfg#{
ctx => Ctx,
frame_mod => emqx_stomp_frame,
@ -117,8 +112,8 @@ start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
esockd:open(Name, ListenOn, merge_default(SocketOpts),
{emqx_gateway_conn, start_link, [NCfg]}).
name(InstaId, Type) ->
list_to_atom(lists:concat([InstaId, ":", Type])).
name(GwType, Type) ->
list_to_atom(lists:concat([GwType, ":", Type])).
merge_default(Options) ->
Default = emqx_gateway_utils:default_tcp_options(),
@ -130,18 +125,18 @@ merge_default(Options) ->
[{tcp_options, Default} | Options]
end.
stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(InstaId, Type, ListenOn, SocketOpts, Cfg),
stop_listener(GwType, {Type, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(GwType, Type, ListenOn, SocketOpts, Cfg),
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case StopRet of
ok -> ?ULOG("Stop stomp ~s:~s listener on ~s successfully.~n",
[InstaId, Type, ListenOnStr]);
ok -> ?ULOG("Stop ~s:~s listener on ~s successfully.~n",
[GwType, Type, ListenOnStr]);
{error, Reason} ->
?ELOG("Failed to stop stomp ~s:~s listener on ~s: ~0p~n",
[InstaId, Type, ListenOnStr, Reason])
?ELOG("Failed to stop ~s:~s listener on ~s: ~0p~n",
[GwType, Type, ListenOnStr, Reason])
end,
StopRet.
stop_listener(InstaId, Type, ListenOn, _SocketOpts, _Cfg) ->
Name = name(InstaId, Type),
stop_listener(GwType, Type, ListenOn, _SocketOpts, _Cfg) ->
Name = name(GwType, Type),
esockd:close(Name, ListenOn).

View File

@ -67,18 +67,17 @@ set_special_cfg(emqx_gateway) ->
LisType = get(grpname),
emqx_config:put(
[gateway, exproto],
#{'1' =>
#{authenticator => allow_anonymous,
server => #{bind => 9100},
handler => #{address => "http://127.0.0.1:9001"},
listener => listener_confs(LisType)
}});
#{authenticator => allow_anonymous,
server => #{bind => 9100},
handler => #{address => "http://127.0.0.1:9001"},
listener => listener_confs(LisType)
});
set_special_cfg(_App) ->
ok.
listener_confs(Type) ->
Default = #{bind => 7993, acceptors => 8},
#{Type => #{'1' => maps:merge(Default, maps:from_list(socketopts(Type)))}}.
#{Type => maps:merge(Default, maps:from_list(socketopts(Type)))}.
%%--------------------------------------------------------------------
%% Tests cases

View File

@ -23,7 +23,7 @@
-define(CONF_DEFAULT, <<"""
gateway: {
stomp.1 {}
stomp {}
}
""">>).

View File

@ -30,8 +30,8 @@
-define(CONF_DEFAULT, <<"
gateway: {
lwm2m_xml_dir: \"../../lib/emqx_gateway/src/lwm2m/lwm2m_xml\"
lwm2m.1: {
lwm2m: {
xml_dir: \"../../lib/emqx_gateway/src/lwm2m/lwm2m_xml\"
lifetime_min: 1s
lifetime_max: 86400s
qmode_time_windonw: 22

View File

@ -51,9 +51,9 @@
-define(CLIENTID, iolist_to_binary([atom_to_list(?FUNCTION_NAME), "-",
integer_to_list(erlang:system_time())])).
-define(CONF_DEFAULT, <<"""
-define(CONF_DEFAULT, <<"
gateway: {
mqttsn.1: {
mqttsn: {
gateway_id: 1
broadcast: true
enable_stats: true
@ -73,7 +73,7 @@ gateway: {
}
}
}
""">>).
">>).
%%--------------------------------------------------------------------
%% Setups
@ -90,35 +90,6 @@ init_per_suite(Config) ->
end_per_suite(_) ->
emqx_ct_helpers:stop_apps([emqx_gateway]).
set_special_confs(emqx_gateway) ->
emqx_config:put(
[gateway],
#{ mqttsn =>
#{'1' =>
#{broadcast => true,
clientinfo_override =>
#{password => "pw123",
username => "user1"
},
enable_qos3 => true,
enable_stats => true,
gateway_id => 1,
idle_timeout => 30000,
listener =>
#{udp =>
#{'1' =>
#{acceptors => 8,active_n => 100,backlog => 1024,bind => 1884,
high_watermark => 1048576,max_conn_rate => 1000,
max_connections => 10240000,send_timeout => 15000,
send_timeout_close => true}}},
predefined =>
[#{id => ?PREDEF_TOPIC_ID1, topic => ?PREDEF_TOPIC_NAME1},
#{id => ?PREDEF_TOPIC_ID2, topic => ?PREDEF_TOPIC_NAME2}]}}
});
set_special_confs(_App) ->
ok.
%%--------------------------------------------------------------------
%% Test cases
%%--------------------------------------------------------------------

View File

@ -26,8 +26,6 @@
-define(PREDEF_TOPICS, [#{id => 1, topic => <<"/predefined/topic/name/hello">>},
#{id => 2, topic => <<"/predefined/topic/name/nice">>}]).
-define(INSTA_ID, 'mqttsn#1').
%%--------------------------------------------------------------------
%% Setups
%%--------------------------------------------------------------------
@ -45,7 +43,7 @@ end_per_suite(_Config) ->
ok.
init_per_testcase(_TestCase, Config) ->
{ok, Pid} = ?REGISTRY:start_link(?INSTA_ID, ?PREDEF_TOPICS),
{ok, Pid} = ?REGISTRY:start_link('mqttsn', ?PREDEF_TOPICS),
{Tab, Pid} = ?REGISTRY:lookup_name(Pid),
[{reg, {Tab, Pid}} | Config].

View File

@ -23,9 +23,9 @@
-define(HEARTBEAT, <<$\n>>).
-define(CONF_DEFAULT, <<"""
-define(CONF_DEFAULT, <<"
gateway: {
stomp.1: {
stomp: {
clientinfo_override: {
username: \"${Packet.headers.login}\"
password: \"${Packet.headers.passcode}\"
@ -35,7 +35,7 @@ gateway: {
}
}
}
""">>).
">>).
all() -> emqx_ct:all(?MODULE).