refactor(gw): improve http-api return structure

This commit is contained in:
JianBo He 2021-09-08 18:59:59 +08:00
parent e94e09075c
commit 0453702ce5
10 changed files with 228 additions and 75 deletions

View File

@ -43,6 +43,10 @@ gateway.stomp {
max_connections = 1024000 max_connections = 1024000
max_conn_rate = 1000 max_conn_rate = 1000
access_rules = [
"allow all"
]
## TCP options ## TCP options
## See ${example_common_tcp_options} for more information ## See ${example_common_tcp_options} for more information
tcp.active_n = 100 tcp.active_n = 100
@ -68,6 +72,16 @@ gateway.stomp {
ssl.keyfile = "{{ platform_etc_dir }}/certs/key.pem" ssl.keyfile = "{{ platform_etc_dir }}/certs/key.pem"
ssl.certfile = "{{ platform_etc_dir }}/certs/cert.pem" ssl.certfile = "{{ platform_etc_dir }}/certs/cert.pem"
ssl.cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem" ssl.cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
#ssl.verify = verify_none
#ssl.fail_if_no_peer_cert = false
#ssl.server_name_indication = disable
#ssl.secure_renegotiate = false
#ssl.reuse_sessions = false
#ssl.honor_cipher_order = false
#ssl.handshake_timeout = 15s
#ssl.depth = 10
#ssl.password = foo
#ssl.dhfile = path-to-your-file
} }
} }
@ -116,6 +130,9 @@ gateway.coap {
## DTLS Options ## DTLS Options
## See #{example_common_dtls_options} for more information ## See #{example_common_dtls_options} for more information
dtls.versions = ["dtlsv1"] dtls.versions = ["dtlsv1"]
dtls.keyfile = "{{ platform_etc_dir }}/certs/key.pem"
dtls.certfile = "{{ platform_etc_dir }}/certs/cert.pem"
dtls.cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
} }
} }

View File

@ -99,7 +99,7 @@ start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
end. end.
start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) -> start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) ->
Name = name(GwName, LisName, Type), Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
NCfg = Cfg#{ NCfg = Cfg#{
ctx => Ctx, ctx => Ctx,
frame_mod => emqx_coap_frame, frame_mod => emqx_coap_frame,
@ -114,9 +114,6 @@ do_start_listener(udp, Name, ListenOn, SocketOpts, MFA) ->
do_start_listener(dtls, Name, ListenOn, SocketOpts, MFA) -> do_start_listener(dtls, Name, ListenOn, SocketOpts, MFA) ->
esockd:open_dtls(Name, ListenOn, SocketOpts, MFA). esockd:open_dtls(Name, ListenOn, SocketOpts, MFA).
name(GwName, LisName, Type) ->
list_to_atom(lists:concat([GwName, ":", Type, ":", LisName])).
stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg), StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg),
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
@ -130,5 +127,5 @@ stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
StopRet. StopRet.
stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) -> stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) ->
Name = name(GwName, LisName, Type), Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
esockd:close(Name, ListenOn). esockd:close(Name, ListenOn).

View File

@ -66,18 +66,21 @@ gateway_insta(get, #{bindings := #{name := Name0}}) ->
Name = binary_to_existing_atom(Name0), Name = binary_to_existing_atom(Name0),
case emqx_gateway:lookup(Name) of case emqx_gateway:lookup(Name) of
#{config := _Config} -> #{config := _Config} ->
%% FIXME: Got the parsed config, but we should return rawconfig to GwCfs = filled_raw_confs([<<"gateway">>, Name0]),
%% frontend NGwCfs = GwCfs#{<<"listeners">> =>
RawConf = emqx_config:fill_defaults( emqx_gateway_http:mapping_listener_m2l(
emqx_config:get_root_raw([<<"gateway">>]) Name0, maps:get(<<"listeners">>, GwCfs, #{})
), )
{200, emqx_map_lib:deep_get([<<"gateway">>, Name0], RawConf)}; },
{200, NGwCfs};
undefined -> undefined ->
return_http_error(404, <<"Gateway not found">>) return_http_error(404, <<"Gateway not found">>)
end; end;
gateway_insta(put, #{body := RawConfsIn, gateway_insta(put, #{body := RawConfsIn0,
bindings := #{name := Name} bindings := #{name := Name}
}) -> }) ->
RawConfsIn = maps:without([<<"authentication">>,
<<"listeners">>], RawConfsIn0),
%% FIXME: Cluster Consistence ?? %% FIXME: Cluster Consistence ??
case emqx_gateway:update_rawconf(Name, RawConfsIn) of case emqx_gateway:update_rawconf(Name, RawConfsIn) of
ok -> ok ->
@ -91,6 +94,12 @@ gateway_insta(put, #{body := RawConfsIn,
gateway_insta_stats(get, _Req) -> gateway_insta_stats(get, _Req) ->
return_http_error(401, <<"Implement it later (maybe 5.1)">>). return_http_error(401, <<"Implement it later (maybe 5.1)">>).
filled_raw_confs(Path) ->
RawConf = emqx_config:fill_defaults(
emqx_config:get_root_raw(Path)
),
Confs = emqx_map_lib:deep_get(Path, RawConf),
emqx_map_lib:jsonable_map(Confs).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Swagger defines %% Swagger defines
@ -199,8 +208,13 @@ schema_gateway_overview_list() ->
<<"enable">> => true, <<"enable">> => true,
<<"enable_stats">> => true,<<"heartbeat">> => <<"30s">>, <<"enable_stats">> => true,<<"heartbeat">> => <<"30s">>,
<<"idle_timeout">> => <<"30s">>, <<"idle_timeout">> => <<"30s">>,
<<"listeners">> => <<"listeners">> => [
#{<<"udp">> => #{<<"default">> => #{<<"bind">> => 5683}}}, #{<<"id">> => <<"coap:udp:default">>,
<<"type">> => <<"udp">>,
<<"running">> => true,
<<"acceptors">> => 8,<<"bind">> => 5683,
<<"max_conn_rate">> => 1000,
<<"max_connections">> => 10240}],
<<"mountpoint">> => <<>>,<<"notify_type">> => <<"qos">>, <<"mountpoint">> => <<>>,<<"notify_type">> => <<"qos">>,
<<"publish_qos">> => <<"qos1">>, <<"publish_qos">> => <<"qos1">>,
<<"subscribe_qos">> => <<"qos0">>} <<"subscribe_qos">> => <<"qos0">>}
@ -212,12 +226,13 @@ schema_gateway_overview_list() ->
<<"handler">> => <<"handler">> =>
#{<<"address">> => <<"http://127.0.0.1:9001">>}, #{<<"address">> => <<"http://127.0.0.1:9001">>},
<<"idle_timeout">> => <<"30s">>, <<"idle_timeout">> => <<"30s">>,
<<"listeners">> => <<"listeners">> => [
#{<<"tcp">> => #{<<"id">> => <<"exproto:tcp:default">>,
#{<<"default">> => <<"type">> => <<"tcp">>,
#{<<"acceptors">> => 8,<<"bind">> => 7993, <<"running">> => true,
<<"max_conn_rate">> => 1000, <<"acceptors">> => 8,<<"bind">> => 7993,
<<"max_connections">> => 10240}}}, <<"max_conn_rate">> => 1000,
<<"max_connections">> => 10240}],
<<"mountpoint">> => <<>>, <<"mountpoint">> => <<>>,
<<"server">> => #{<<"bind">> => 9100}} <<"server">> => #{<<"bind">> => 9100}}
). ).
@ -229,8 +244,11 @@ schema_gateway_overview_list() ->
<<"idle_timeout">> => <<"30s">>, <<"idle_timeout">> => <<"30s">>,
<<"lifetime_max">> => <<"86400s">>, <<"lifetime_max">> => <<"86400s">>,
<<"lifetime_min">> => <<"1s">>, <<"lifetime_min">> => <<"1s">>,
<<"listeners">> => <<"listeners">> => [
#{<<"udp">> => #{<<"default">> => #{<<"bind">> => 5783}}}, #{<<"id">> => <<"lwm2m:udp:default">>,
<<"type">> => <<"udp">>,
<<"running">> => true,
<<"bind">> => 5783}],
<<"mountpoint">> => <<"lwm2m/%e/">>, <<"mountpoint">> => <<"lwm2m/%e/">>,
<<"qmode_time_windonw">> => 22, <<"qmode_time_windonw">> => 22,
<<"translators">> => <<"translators">> =>
@ -251,11 +269,12 @@ schema_gateway_overview_list() ->
<<"enable">> => true, <<"enable">> => true,
<<"enable_qos3">> => true,<<"enable_stats">> => true, <<"enable_qos3">> => true,<<"enable_stats">> => true,
<<"gateway_id">> => 1,<<"idle_timeout">> => <<"30s">>, <<"gateway_id">> => 1,<<"idle_timeout">> => <<"30s">>,
<<"listeners">> => <<"listeners">> => [
#{<<"udp">> => #{<<"id">> => <<"mqttsn:udp:default">>,
#{<<"default">> => <<"type">> => <<"udp">>,
#{<<"bind">> => 1884,<<"max_conn_rate">> => 1000, <<"running">> => true,
<<"max_connections">> => 10240000}}}, <<"bind">> => 1884,<<"max_conn_rate">> => 1000,
<<"max_connections">> => 10240000}],
<<"mountpoint">> => <<>>, <<"mountpoint">> => <<>>,
<<"predefined">> => <<"predefined">> =>
[#{<<"id">> => 1, [#{<<"id">> => 1,
@ -279,12 +298,13 @@ schema_gateway_overview_list() ->
#{<<"max_body_length">> => 8192,<<"max_headers">> => 10, #{<<"max_body_length">> => 8192,<<"max_headers">> => 10,
<<"max_headers_length">> => 1024}, <<"max_headers_length">> => 1024},
<<"idle_timeout">> => <<"30s">>, <<"idle_timeout">> => <<"30s">>,
<<"listeners">> => <<"listeners">> => [
#{<<"tcp">> => #{<<"id">> => <<"stomp:tcp:default">>,
#{<<"default">> => <<"type">> => <<"tcp">>,
#{<<"acceptors">> => 16,<<"active_n">> => 100, <<"running">> => true,
<<"bind">> => 61613,<<"max_conn_rate">> => 1000, <<"acceptors">> => 16,<<"active_n">> => 100,
<<"max_connections">> => 1024000}}}, <<"bind">> => 61613,<<"max_conn_rate">> => 1000,
<<"max_connections">> => 1024000}],
<<"mountpoint">> => <<>>} <<"mountpoint">> => <<>>}
). ).
@ -312,10 +332,12 @@ schema_gateway_stats() ->
properties_gateway_overview() -> properties_gateway_overview() ->
ListenerProps = ListenerProps =
[ {name, string, [ {id, string,
<<"Listener Name">>} <<"Listener ID">>}
, {status, string, , {running, boolean,
<<"Listener Status">>, [<<"activing">>, <<"inactived">>]} <<"Listener Running status">>}
, {type, string,
<<"Listener Type">>, [<<"tcp">>, <<"ssl">>, <<"udp">>, <<"dtls">>]}
], ],
emqx_mgmt_util:properties( emqx_mgmt_util:properties(
[ {name, string, [ {name, string,
@ -323,9 +345,13 @@ properties_gateway_overview() ->
, {status, string, , {status, string,
<<"Gateway Status">>, <<"Gateway Status">>,
[<<"running">>, <<"stopped">>, <<"unloaded">>]} [<<"running">>, <<"stopped">>, <<"unloaded">>]}
, {created_at, string,
<<>>}
, {started_at, string, , {started_at, string,
<<>>} <<>>}
, {max_connection, integer, <<>>} , {stopped_at, string,
, {current_connection, integer, <<>>} <<>>}
, {max_connections, integer, <<>>}
, {current_connections, integer, <<>>}
, {listeners, {array, object}, ListenerProps} , {listeners, {array, object}, ListenerProps}
]). ]).

View File

@ -24,6 +24,12 @@
-export([ gateways/1 -export([ gateways/1
]). ]).
%% Mgmt APIs - listeners
-export([ listeners/1
, listener/2
, mapping_listener_m2l/2
]).
%% Mgmt APIs - clients %% Mgmt APIs - clients
-export([ lookup_client/3 -export([ lookup_client/3
, lookup_client/4 , lookup_client/4
@ -42,8 +48,8 @@
#{ name := binary() #{ name := binary()
, status := running | stopped | unloaded , status := running | stopped | unloaded
, started_at => binary() , started_at => binary()
, max_connection => integer() , max_connections => integer()
, current_connect => integer() , current_connections => integer()
, listeners => [] , listeners => []
}. }.
@ -68,8 +74,10 @@ gateways(Status) ->
created_at, created_at,
started_at, started_at,
stopped_at], GwInfo0), stopped_at], GwInfo0),
GwInfo1#{listeners => get_listeners_status(GwName, Config)} GwInfo1#{
max_connections => max_connections_count(Config),
current_connections => current_connections_count(GwName),
listeners => get_listeners_status(GwName, Config)}
end end
end, emqx_gateway_registry:list()), end, emqx_gateway_registry:list()),
case Status of case Status of
@ -78,24 +86,78 @@ gateways(Status) ->
[Gw || Gw = #{status := S} <- Gateways, S == Status] [Gw || Gw = #{status := S} <- Gateways, S == Status]
end. end.
%% @private
max_connections_count(Config) ->
Listeners = emqx_gateway_utils:normalize_config(Config),
lists:foldl(fun({_, _, _, SocketOpts, _}, Acc) ->
Acc + proplists:get_value(max_connections, SocketOpts, 0)
end, 0, Listeners).
%% @private
current_connections_count(GwName) ->
try
InfoTab = emqx_gateway_cm:tabname(info, GwName),
ets:info(InfoTab, size)
catch _ : _ ->
0
end.
%% @private %% @private
get_listeners_status(GwName, Config) -> get_listeners_status(GwName, Config) ->
Listeners = emqx_gateway_utils:normalize_config(Config), Listeners = emqx_gateway_utils:normalize_config(Config),
lists:map(fun({Type, LisName, ListenOn, _, _}) -> lists:map(fun({Type, LisName, ListenOn, _, _}) ->
Name0 = listener_name(GwName, Type, LisName), Name0 = emqx_gateway_utils:listener_id(GwName, Type, LisName),
Name = {Name0, ListenOn}, Name = {Name0, ListenOn},
LisO = #{id => Name0, type => Type},
case catch esockd:listener(Name) of case catch esockd:listener(Name) of
_Pid when is_pid(_Pid) -> _Pid when is_pid(_Pid) ->
#{Name0 => <<"activing">>}; LisO#{running => true};
_ -> _ ->
#{Name0 => <<"inactived">>} LisO#{running => false}
end end
end, Listeners). end, Listeners).
%% @private %%--------------------------------------------------------------------
listener_name(GwName, Type, LisName) -> %% Mgmt APIs - listeners
list_to_atom(lists:concat([GwName, ":", Type, ":", LisName])). %%--------------------------------------------------------------------
listeners(GwName) when is_atom (GwName) ->
listeners(atom_to_binary(GwName));
listeners(GwName) ->
RawConf = emqx_config:fill_defaults(
emqx_config:get_root_raw([<<"gateway">>])),
Listeners = emqx_map_lib:jsonable_map(
emqx_map_lib:deep_get(
[<<"gateway">>, GwName, <<"listeners">>], RawConf)),
mapping_listener_m2l(GwName, Listeners).
listener(_GwName, _ListenerId) ->
ok.
mapping_listener_m2l(GwName, Listeners0) ->
Listeners = maps:to_list(Listeners0),
lists:append([listener(GwName, Type, maps:to_list(Conf))
|| {Type, Conf} <- Listeners]).
listener(GwName, Type, Conf) ->
[begin
ListenerId = emqx_gateway_utils:listener_id(GwName, Type, LName),
Running = is_running(ListenerId, LConf),
LConf#{
id => ListenerId,
type => Type,
running => Running
}
end || {LName, LConf} <- Conf, is_map(LConf)].
is_running(ListenerId, #{<<"bind">> := ListenOn0}) ->
ListenOn = emqx_gateway_utils:parse_listenon(ListenOn0),
try esockd:listener({ListenerId, ListenOn}) of
Pid when is_pid(Pid)->
true
catch _:_ ->
false
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Mgmt APIs - clients %% Mgmt APIs - clients
@ -145,7 +207,7 @@ list_client_subscriptions(GwName, ClientId) ->
with_channel(GwName, ClientId, with_channel(GwName, ClientId,
fun(Pid) -> fun(Pid) ->
Subs = emqx_gateway_conn:call( Subs = emqx_gateway_conn:call(
Pid, Pid,
subscriptions, ?DEFAULT_CALL_TIMEOUT), subscriptions, ?DEFAULT_CALL_TIMEOUT),
{ok, lists:map(fun({Topic, SubOpts}) -> {ok, lists:map(fun({Topic, SubOpts}) ->
SubOpts#{topic => Topic} SubOpts#{topic => Topic}

View File

@ -204,7 +204,6 @@ fields(udp_opts) ->
fields(dtls_listener_ssl_opts) -> fields(dtls_listener_ssl_opts) ->
Base = emqx_schema:fields("listener_ssl_opts"), Base = emqx_schema:fields("listener_ssl_opts"),
%% XXX: ciphers ???
DtlsVers = hoconsc:mk( DtlsVers = hoconsc:mk(
typerefl:alias("string", list(atom())), typerefl:alias("string", list(atom())),
#{ default => default_dtls_vsns(), #{ default => default_dtls_vsns(),
@ -212,12 +211,41 @@ fields(dtls_listener_ssl_opts) ->
[dtls_vsn(iolist_to_binary(V)) || V <- Vsns] [dtls_vsn(iolist_to_binary(V)) || V <- Vsns]
end end
}), }),
lists:keyreplace("versions", 1, Base, {"versions", DtlsVers}); Ciphers = sc(hoconsc:array(string()), default_ciphers()),
lists:keydelete(
"handshake_timeout", 1,
lists:keyreplace(
"ciphers", 1,
lists:keyreplace("versions", 1, Base, {"versions", DtlsVers}),
{"ciphers", Ciphers}
)
);
fields(ExtraField) -> fields(ExtraField) ->
Mod = list_to_atom(ExtraField++"_schema"), Mod = list_to_atom(ExtraField++"_schema"),
Mod:fields(ExtraField). Mod:fields(ExtraField).
default_ciphers() ->
["ECDHE-ECDSA-AES256-GCM-SHA384",
"ECDHE-RSA-AES256-GCM-SHA384", "ECDHE-ECDSA-AES256-SHA384", "ECDHE-RSA-AES256-SHA384",
"ECDHE-ECDSA-DES-CBC3-SHA", "ECDH-ECDSA-AES256-GCM-SHA384", "ECDH-RSA-AES256-GCM-SHA384",
"ECDH-ECDSA-AES256-SHA384", "ECDH-RSA-AES256-SHA384", "DHE-DSS-AES256-GCM-SHA384",
"DHE-DSS-AES256-SHA256", "AES256-GCM-SHA384", "AES256-SHA256",
"ECDHE-ECDSA-AES128-GCM-SHA256", "ECDHE-RSA-AES128-GCM-SHA256",
"ECDHE-ECDSA-AES128-SHA256", "ECDHE-RSA-AES128-SHA256", "ECDH-ECDSA-AES128-GCM-SHA256",
"ECDH-RSA-AES128-GCM-SHA256", "ECDH-ECDSA-AES128-SHA256", "ECDH-RSA-AES128-SHA256",
"DHE-DSS-AES128-GCM-SHA256", "DHE-DSS-AES128-SHA256", "AES128-GCM-SHA256", "AES128-SHA256",
"ECDHE-ECDSA-AES256-SHA", "ECDHE-RSA-AES256-SHA", "DHE-DSS-AES256-SHA",
"ECDH-ECDSA-AES256-SHA", "ECDH-RSA-AES256-SHA", "AES256-SHA", "ECDHE-ECDSA-AES128-SHA",
"ECDHE-RSA-AES128-SHA", "DHE-DSS-AES128-SHA", "ECDH-ECDSA-AES128-SHA",
"ECDH-RSA-AES128-SHA", "AES128-SHA"
] ++ psk_ciphers().
psk_ciphers() ->
["PSK-AES128-CBC-SHA", "PSK-AES256-CBC-SHA",
"PSK-3DES-EDE-CBC-SHA", "PSK-RC4-SHA"
].
% authentication() -> % authentication() ->
% hoconsc:union( % hoconsc:union(
% [ undefined % [ undefined
@ -242,7 +270,7 @@ gateway_common_options() ->
[ {enable, sc(boolean(), true)} [ {enable, sc(boolean(), true)}
, {enable_stats, sc(boolean(), true)} , {enable_stats, sc(boolean(), true)}
, {idle_timeout, sc(duration(), <<"30s">>)} , {idle_timeout, sc(duration(), <<"30s">>)}
, {mountpoint, sc(binary())} , {mountpoint, sc(binary(), undefined)}
, {clientinfo_override, sc(ref(clientinfo_override))} , {clientinfo_override, sc(ref(clientinfo_override))}
, {authentication, sc(hoconsc:lazy(map()))} , {authentication, sc(hoconsc:lazy(map()))}
]. ].
@ -254,6 +282,7 @@ common_listener_opts() ->
, {max_connections, sc(integer(), 1024)} , {max_connections, sc(integer(), 1024)}
, {max_conn_rate, sc(integer())} , {max_conn_rate, sc(integer())}
%, {rate_limit, sc(comma_separated_list())} %, {rate_limit, sc(comma_separated_list())}
, {mountpoint, sc(binary(), undefined)}
, {access_rules, sc(hoconsc:array(string()), [])} , {access_rules, sc(hoconsc:array(string()), [])}
]. ].

View File

@ -28,8 +28,11 @@
-export([ apply/2 -export([ apply/2
, format_listenon/1 , format_listenon/1
, parse_listenon/1
, unix_ts_to_rfc3339/1 , unix_ts_to_rfc3339/1
, unix_ts_to_rfc3339/2 , unix_ts_to_rfc3339/2
, listener_id/3
, parse_listener_id/1
]). ]).
-export([ stringfy/1 -export([ stringfy/1
@ -112,6 +115,38 @@ format_listenon({Addr, Port}) when is_list(Addr) ->
format_listenon({Addr, Port}) when is_tuple(Addr) -> format_listenon({Addr, Port}) when is_tuple(Addr) ->
io_lib:format("~s:~w", [inet:ntoa(Addr), Port]). io_lib:format("~s:~w", [inet:ntoa(Addr), Port]).
parse_listenon(Port) when is_integer(Port) ->
Port;
parse_listenon(Str) when is_binary(Str) ->
parse_listenon(binary_to_list(Str));
parse_listenon(Str) when is_list(Str) ->
case emqx_schema:to_ip_port(Str) of
{ok, R} -> R;
{error, _} ->
error({invalid_listenon_name, Str})
end.
listener_id(GwName, Type, LisName) ->
binary_to_atom(
<<(bin(GwName))/binary, ":",
(bin(Type))/binary, ":",
(bin(LisName))/binary
>>).
parse_listener_id(Id) ->
try
[GwName, Type, Name] = binary:split(bin(Id), <<":">>, [global]),
{binary_to_existing_atom(GwName), binary_to_existing_atom(Type),
binary_to_atom(Name)}
catch
_ : _ -> error({invalid_listener_id, Id})
end.
bin(A) when is_atom(A) ->
atom_to_binary(A);
bin(L) when is_list(L); is_binary(L) ->
iolist_to_binary(L).
unix_ts_to_rfc3339(Keys, Map) when is_list(Keys) -> unix_ts_to_rfc3339(Keys, Map) when is_list(Keys) ->
lists:foldl(fun(K, Acc) -> unix_ts_to_rfc3339(K, Acc) end, Map, Keys); lists:foldl(fun(K, Acc) -> unix_ts_to_rfc3339(K, Acc) end, Map, Keys);
unix_ts_to_rfc3339(Key, Map) -> unix_ts_to_rfc3339(Key, Map) ->

View File

@ -153,7 +153,7 @@ start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
end. end.
start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) -> start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) ->
Name = name(GwName, LisName, Type), Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
NCfg = Cfg#{ NCfg = Cfg#{
ctx => Ctx, ctx => Ctx,
frame_mod => emqx_exproto_frame, frame_mod => emqx_exproto_frame,
@ -172,9 +172,6 @@ do_start_listener(udp, Name, ListenOn, Opts, MFA) ->
do_start_listener(dtls, Name, ListenOn, Opts, MFA) -> do_start_listener(dtls, Name, ListenOn, Opts, MFA) ->
esockd:open_dtls(Name, ListenOn, Opts, MFA). esockd:open_dtls(Name, ListenOn, Opts, MFA).
name(GwName, LisName, Type) ->
list_to_atom(lists:concat([GwName, ":", Type, ":", LisName])).
merge_default_by_type(Type, Options) when Type =:= tcp; merge_default_by_type(Type, Options) when Type =:= tcp;
Type =:= ssl -> Type =:= ssl ->
Default = emqx_gateway_utils:default_tcp_options(), Default = emqx_gateway_utils:default_tcp_options(),
@ -209,5 +206,5 @@ stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
StopRet. StopRet.
stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) -> stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) ->
Name = name(GwName, LisName, Type), Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
esockd:close(Name, ListenOn). esockd:close(Name, ListenOn).

View File

@ -100,7 +100,7 @@ start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
end. end.
start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) -> start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) ->
Name = name(GwName, LisName, udp), Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
NCfg = Cfg#{ ctx => Ctx NCfg = Cfg#{ ctx => Ctx
, frame_mod => emqx_coap_frame , frame_mod => emqx_coap_frame
, chann_mod => emqx_lwm2m_channel , chann_mod => emqx_lwm2m_channel
@ -119,16 +119,12 @@ merge_default(Options) ->
[{udp_options, Default} | Options] [{udp_options, Default} | Options]
end. end.
name(GwName, LisName, Type) ->
list_to_atom(lists:concat([GwName, ":", Type, ":", LisName])).
do_start_listener(udp, Name, ListenOn, SocketOpts, MFA) -> do_start_listener(udp, Name, ListenOn, SocketOpts, MFA) ->
esockd:open_udp(Name, ListenOn, SocketOpts, MFA); esockd:open_udp(Name, ListenOn, SocketOpts, MFA);
do_start_listener(dtls, Name, ListenOn, SocketOpts, MFA) -> do_start_listener(dtls, Name, ListenOn, SocketOpts, MFA) ->
esockd:open_dtls(Name, ListenOn, SocketOpts, MFA). esockd:open_dtls(Name, ListenOn, SocketOpts, MFA).
stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg), StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg),
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
@ -142,5 +138,5 @@ stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
StopRet. StopRet.
stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) -> stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) ->
Name = name(GwName, LisName, Type), Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
esockd:close(Name, ListenOn). esockd:close(Name, ListenOn).

View File

@ -118,7 +118,7 @@ start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
end. end.
start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) -> start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) ->
Name = name(GwName, LisName, Type), Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
NCfg = Cfg#{ NCfg = Cfg#{
ctx => Ctx, ctx => Ctx,
frame_mod => emqx_sn_frame, frame_mod => emqx_sn_frame,
@ -127,9 +127,6 @@ start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) ->
esockd:open_udp(Name, ListenOn, merge_default(SocketOpts), esockd:open_udp(Name, ListenOn, merge_default(SocketOpts),
{emqx_gateway_conn, start_link, [NCfg]}). {emqx_gateway_conn, start_link, [NCfg]}).
name(GwName, LisName, Type) ->
list_to_atom(lists:concat([GwName, ":", Type, ":", LisName])).
merge_default(Options) -> merge_default(Options) ->
Default = emqx_gateway_utils:default_udp_options(), Default = emqx_gateway_utils:default_udp_options(),
case lists:keytake(udp_options, 1, Options) of case lists:keytake(udp_options, 1, Options) of
@ -153,5 +150,5 @@ stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
StopRet. StopRet.
stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) -> stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) ->
Name = name(GwName, LisName, Type), Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
esockd:close(Name, ListenOn). esockd:close(Name, ListenOn).

View File

@ -103,7 +103,7 @@ start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
end. end.
start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) -> start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) ->
Name = name(GwName, LisName, Type), Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
NCfg = Cfg#{ NCfg = Cfg#{
ctx => Ctx, ctx => Ctx,
frame_mod => emqx_stomp_frame, frame_mod => emqx_stomp_frame,
@ -112,9 +112,6 @@ start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) ->
esockd:open(Name, ListenOn, merge_default(SocketOpts), esockd:open(Name, ListenOn, merge_default(SocketOpts),
{emqx_gateway_conn, start_link, [NCfg]}). {emqx_gateway_conn, start_link, [NCfg]}).
name(GwName, LisName, Type) ->
list_to_atom(lists:concat([GwName, ":", Type, ":", LisName])).
merge_default(Options) -> merge_default(Options) ->
Default = emqx_gateway_utils:default_tcp_options(), Default = emqx_gateway_utils:default_tcp_options(),
case lists:keytake(tcp_options, 1, Options) of case lists:keytake(tcp_options, 1, Options) of
@ -138,5 +135,5 @@ stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
StopRet. StopRet.
stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) -> stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) ->
Name = name(GwName, LisName, Type), Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
esockd:close(Name, ListenOn). esockd:close(Name, ListenOn).