Merge branch 'master' into EMQX-782

This commit is contained in:
x1001100011 2021-10-20 00:01:07 -07:00
commit dbe45d9d6f
71 changed files with 1602 additions and 685 deletions

View File

@ -57,7 +57,7 @@ jobs:
- uses: actions/checkout@v2
with:
repository: emqx/emqx-fvt
ref: v1.2.0
ref: v1.3.0
path: .
- uses: actions/setup-java@v1
with:

View File

@ -39,7 +39,7 @@ jobs:
- uses: actions/checkout@v2
with:
repository: emqx/emqtt-bench
ref: master
ref: 0.3.4
path: emqtt-bench
- uses: actions/checkout@v2
with:

View File

@ -18,7 +18,7 @@ IsQuicSupp = fun() ->
end,
Bcrypt = {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {branch, "0.6.0"}}},
Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {branch, "0.0.8"}}},
Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {branch, "0.0.9"}}},
ExtraDeps = fun(C) ->
{deps, Deps0} = lists:keyfind(deps, 1, C),

View File

@ -935,7 +935,7 @@ switch_version(State = #{version := ?VER_1}) ->
switch_version(State = #{version := ?VER_2}) ->
State#{version := ?VER_1};
switch_version(State) ->
State#{version => ?VER_1}.
State#{version => ?VER_2}.
authn_type(#{mechanism := Mechanism, backend := Backend}) ->
{Mechanism, Backend};

View File

@ -165,6 +165,13 @@ fields("authorization") ->
[ {"no_match",
sc(hoconsc:enum([allow, deny]),
#{ default => allow
%% TODO: make sources a reference link
, desc => """
Default access control action if the user or client matches no ACL rules,
or if no such user or client is found by the configurable authorization
sources such as built-in-database, an HTTP API, or a query against PostgreSQL.
Find more details in 'authorization.sources' config.
"""
})}
, {"deny_action",
sc(hoconsc:enum([ignore, disconnect]),
@ -456,31 +463,31 @@ fields("listeners") ->
[ {"tcp",
sc(map(name, ref("mqtt_tcp_listener")),
#{ desc => "TCP listeners"
, nullable => {true, recursive}
, nullable => {true, recursively}
})
}
, {"ssl",
sc(map(name, ref("mqtt_ssl_listener")),
#{ desc => "SSL listeners"
, nullable => {true, recursive}
, nullable => {true, recursively}
})
}
, {"ws",
sc(map(name, ref("mqtt_ws_listener")),
#{ desc => "HTTP websocket listeners"
, nullable => {true, recursive}
, nullable => {true, recursively}
})
}
, {"wss",
sc(map(name, ref("mqtt_wss_listener")),
#{ desc => "HTTPS websocket listeners"
, nullable => {true, recursive}
, nullable => {true, recursively}
})
}
, {"quic",
sc(map(name, ref("mqtt_quic_listener")),
#{ desc => "QUIC listeners"
, nullable => {true, recursive}
, nullable => {true, recursively}
})
}
];
@ -1319,7 +1326,7 @@ validate_heap_size(Siz) ->
false -> ok
end.
parse_user_lookup_fun(StrConf) ->
[ModStr, FunStr] = string:tokens(StrConf, ":"),
[ModStr, FunStr] = string:tokens(str(StrConf), ":"),
Mod = list_to_atom(ModStr),
Fun = list_to_atom(FunStr),
{fun Mod:Fun/3, undefined}.
@ -1338,3 +1345,10 @@ validate_tls_versions(Versions) ->
[] -> ok;
Vs -> {error, {unsupported_ssl_versions, Vs}}
end.
str(A) when is_atom(A) ->
atom_to_list(A);
str(B) when is_binary(B) ->
binary_to_list(B);
str(S) when is_list(S) ->
S.

View File

@ -148,27 +148,10 @@ start_app(App, Handler) ->
app_path(App, filename:join(["etc", atom_to_list(App) ++ ".conf"])),
Handler).
%% TODO: get rid of cuttlefish
app_schema(App) ->
CuttlefishSchema = app_path(App, filename:join(["priv", atom_to_list(App) ++ ".schema"])),
case filelib:is_regular(CuttlefishSchema) of
true ->
CuttlefishSchema;
false ->
Mod = list_to_atom(atom_to_list(App) ++ "_schema"),
try
true = is_list(Mod:roots()),
Mod
catch
C : E ->
error(#{app => App,
file => CuttlefishSchema,
module => Mod,
exeption => C,
reason => E
})
end
end.
Mod.
mustache_vars(App) ->
[{platform_data_dir, app_path(App, "data")},
@ -208,11 +191,7 @@ read_schema_configs(Schema, ConfigFile) ->
generate_config(SchemaModule, ConfigFile) when is_atom(SchemaModule) ->
{ok, Conf0} = hocon:load(ConfigFile, #{format => richmap}),
hocon_schema:generate(SchemaModule, Conf0);
generate_config(SchemaFile, ConfigFile) ->
{ok, Conf1} = hocon:load(ConfigFile, #{format => proplists}),
Schema = cuttlefish_schema:files([SchemaFile]),
cuttlefish_generator:map(Schema, Conf1).
hocon_schema:generate(SchemaModule, Conf0).
-spec(stop_apps(list()) -> ok).
stop_apps(Apps) ->

View File

@ -0,0 +1,83 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_common_test_http).
-include_lib("common_test/include/ct.hrl").
-export([ request_api/3
, request_api/4
, request_api/5
, get_http_data/1
, create_default_app/0
, delete_default_app/0
, default_auth_header/0
, auth_header/2
]).
request_api(Method, Url, Auth) ->
request_api(Method, Url, [], Auth, []).
request_api(Method, Url, QueryParams, Auth) ->
request_api(Method, Url, QueryParams, Auth, []).
request_api(Method, Url, QueryParams, Auth, Body) ->
request_api(Method, Url, QueryParams, Auth, Body, []).
request_api(Method, Url, QueryParams, Auth, Body, HttpOpts) ->
NewUrl = case QueryParams of
[] ->
Url;
_ ->
Url ++ "?" ++ QueryParams
end,
Request = case Body of
[] ->
{NewUrl, [Auth]};
_ ->
{NewUrl, [Auth], "application/json", emqx_json:encode(Body)}
end,
do_request_api(Method, Request, HttpOpts).
do_request_api(Method, Request, HttpOpts) ->
ct:pal("Method: ~p, Request: ~p", [Method, Request]),
case httpc:request(Method, Request, HttpOpts, [{body_format, binary}]) of
{error, socket_closed_remotely} ->
{error, socket_closed_remotely};
{ok, {{"HTTP/1.1", Code, _}, _Headers, Return} }
when Code =:= 200 orelse Code =:= 201 ->
{ok, Return};
{ok, {Reason, _, _}} ->
{error, Reason}
end.
get_http_data(ResponseBody) ->
maps:get(<<"data">>, emqx_json:decode(ResponseBody, [return_maps])).
auth_header(User, Pass) ->
Encoded = base64:encode_to_string(lists:append([User,":",Pass])),
{"Authorization","Basic " ++ Encoded}.
default_auth_header() ->
AppId = <<"myappid">>,
AppSecret = emqx_mgmt_auth:get_appsecret(AppId),
auth_header(erlang:binary_to_list(AppId), erlang:binary_to_list(AppSecret)).
create_default_app() ->
emqx_mgmt_auth:add_app(<<"myappid">>, <<"test">>).
delete_default_app() ->
emqx_mgmt_auth:del_app(<<"myappid">>).

View File

@ -17,7 +17,7 @@
{profiles,
[{test, [
{deps, [{emqx_ct_helpers, {git,"https://github.com/emqx/emqx-ct-helpers.git", {branch,"hocon"}}}
{deps, [
]}
]}
]}.

View File

@ -17,7 +17,7 @@
{profiles,
[{test, [
{deps, [{emqx_ct_helpers, {git,"https://github.com/emqx/emqx-ct-helpers.git", {branch,"hocon"}}}
{deps, [
]}
]}
]}.

View File

@ -49,7 +49,7 @@
ignore = 'client.authorize.ignore'
}).
-define(CMD_REPLCAE, replace).
-define(CMD_REPLACE, replace).
-define(CMD_DELETE, delete).
-define(CMD_PREPEND, prepend).
-define(CMD_APPEND, append).

View File

@ -73,8 +73,8 @@ move(Type, Position, Opts) ->
update(Cmd, Sources) ->
update(Cmd, Sources, #{}).
update({?CMD_REPLCAE, Type}, Sources, Opts) ->
emqx:update_config(?CONF_KEY_PATH, {{?CMD_REPLCAE, type(Type)}, Sources}, Opts);
update({?CMD_REPLACE, Type}, Sources, Opts) ->
emqx:update_config(?CONF_KEY_PATH, {{?CMD_REPLACE, type(Type)}, Sources}, Opts);
update({?CMD_DELETE, Type}, Sources, Opts) ->
emqx:update_config(?CONF_KEY_PATH, {{?CMD_DELETE, type(Type)}, Sources}, Opts);
update(Cmd, Sources, Opts) ->
@ -102,7 +102,7 @@ do_update({?CMD_APPEND, Sources}, Conf) when is_list(Sources), is_list(Conf) ->
NConf = Conf ++ Sources,
ok = check_dup_types(NConf),
NConf;
do_update({{?CMD_REPLCAE, Type}, Source}, Conf) when is_map(Source), is_list(Conf) ->
do_update({{?CMD_REPLACE, Type}, Source}, Conf) when is_map(Source), is_list(Conf) ->
{_Old, Front, Rear} = take(Type, Conf),
NConf = Front ++ [Source | Rear],
ok = check_dup_types(NConf),
@ -113,7 +113,9 @@ do_update({{?CMD_DELETE, Type}, _Source}, Conf) when is_list(Conf) ->
NConf;
do_update({_, Sources}, _Conf) when is_list(Sources)->
%% overwrite the entire config!
Sources.
Sources;
do_update({Op, Sources}, Conf) ->
error({bad_request, #{op => Op, sources => Sources, conf => Conf}}).
pre_config_update(Cmd, Conf) ->
{ok, do_update(Cmd, Conf)}.
@ -138,7 +140,7 @@ do_post_update({?CMD_APPEND, Sources}, _NewSources) ->
InitedSources = init_sources(check_sources(Sources)),
emqx_hooks:put('client.authorize', {?MODULE, authorize, [lookup() ++ InitedSources]}, -1),
ok = emqx_authz_cache:drain_cache();
do_post_update({{?CMD_REPLCAE, Type}, Source}, _NewSources) when is_map(Source) ->
do_post_update({{?CMD_REPLACE, Type}, Source}, _NewSources) when is_map(Source) ->
OldInitedSources = lookup(),
{OldSource, Front, Rear} = take(Type, OldInitedSources),
ok = ensure_resource_deleted(OldSource),
@ -202,13 +204,13 @@ init_source(#{type := file,
{ok, Terms} ->
[emqx_authz_rule:compile(Term) || Term <- Terms];
{error, eacces} ->
?LOG(alert, "Insufficient permissions to read the ~ts file", [Path]),
?SLOG(alert, #{msg => "insufficient_permissions_to_read_file", path => Path}),
error(eaccess);
{error, enoent} ->
?LOG(alert, "The ~ts file does not exist", [Path]),
?SLOG(alert, #{msg => "file_does_not_exist", path => Path}),
error(enoent);
{error, Reason} ->
?LOG(alert, "Failed to read ~ts: ~p", [Path, Reason]),
?SLOG(alert, #{msg => "failed_to_read_file", path => Path, reason => Reason}),
error(Reason)
end,
Source#{annotations => #{rules => Rules}};
@ -256,15 +258,15 @@ authorize(#{username := Username,
} = Client, PubSub, Topic, DefaultResult, Sources) ->
case do_authorize(Client, PubSub, Topic, Sources) of
{matched, allow} ->
?LOG(info, "Client succeeded authorization: Username: ~p, IP: ~p, Topic: ~p, Permission: allow", [Username, IpAddress, Topic]),
?SLOG(info, #{msg => "authorization_permission_allowed", username => Username, ipaddr => IpAddress, topic => Topic}),
emqx_metrics:inc(?AUTHZ_METRICS(allow)),
{stop, allow};
{matched, deny} ->
?LOG(info, "Client failed authorization: Username: ~p, IP: ~p, Topic: ~p, Permission: deny", [Username, IpAddress, Topic]),
?SLOG(info, #{msg => "authorization_permission_denied", username => Username, ipaddr => IpAddress, topic => Topic}),
emqx_metrics:inc(?AUTHZ_METRICS(deny)),
{stop, deny};
nomatch ->
?LOG(info, "Client failed authorization: Username: ~p, IP: ~p, Topic: ~p, Reasion: ~p", [Username, IpAddress, Topic, "no-match rule"]),
?SLOG(info, #{msg => "authorization_failed_nomatch", username => Username, ipaddr => IpAddress, topic => Topic, reason => "no-match rule"}),
{stop, DefaultResult}
end.

View File

@ -632,14 +632,18 @@ all(put, #{body := #{<<"rules">> := Rules}}) ->
purge(delete, _) ->
case emqx_authz_api_sources:get_raw_source(<<"built-in-database">>) of
[#{enable := false}] ->
[#{<<"enable">> := false}] ->
ok = lists:foreach(fun(Key) ->
ok = ekka_mnesia:dirty_delete(?ACL_TABLE, Key)
end, mnesia:dirty_all_keys(?ACL_TABLE)),
{204};
_ ->
[#{<<"enable">> := true}] ->
{400, #{code => <<"BAD_REQUEST">>,
message => <<"'built-in-database' type source must be disabled before purge.">>}}
message => <<"'built-in-database' type source must be disabled before purge.">>}};
[] ->
{404, #{code => <<"BAD_REQUEST">>,
message => <<"'built-in-database' type source is not found.">>
}}
end.
format_rules(Rules) when is_list(Rules) ->

View File

@ -347,17 +347,17 @@ sources(post, #{body := #{<<"type">> := <<"file">>, <<"rules">> := Rules}}) ->
{ok, Filename} = write_file(filename:join([emqx:get_config([node, data_dir]), "acl.conf"]), Rules),
update_config(?CMD_PREPEND, [#{<<"type">> => <<"file">>, <<"enable">> => true, <<"path">> => Filename}]);
sources(post, #{body := Body}) when is_map(Body) ->
update_config(?CMD_PREPEND, [write_cert(Body)]);
update_config(?CMD_PREPEND, [maybe_write_certs(Body)]);
sources(put, #{body := Body}) when is_list(Body) ->
NBody = [ begin
case Source of
#{<<"type">> := <<"file">>, <<"rules">> := Rules, <<"enable">> := Enable} ->
{ok, Filename} = write_file(filename:join([emqx:get_config([node, data_dir]), "acl.conf"]), Rules),
#{<<"type">> => <<"file">>, <<"enable">> => Enable, <<"path">> => Filename};
_ -> write_cert(Source)
_ -> maybe_write_certs(Source)
end
end || Source <- Body],
update_config(?CMD_REPLCAE, NBody).
update_config(?CMD_REPLACE, NBody).
source(get, #{bindings := #{type := Type}}) ->
case get_raw_source(Type) of
@ -379,14 +379,14 @@ source(get, #{bindings := #{type := Type}}) ->
end;
source(put, #{bindings := #{type := <<"file">>}, body := #{<<"type">> := <<"file">>, <<"rules">> := Rules, <<"enable">> := Enable}}) ->
{ok, Filename} = write_file(maps:get(path, emqx_authz:lookup(file), ""), Rules),
case emqx_authz:update({?CMD_REPLCAE, <<"file">>}, #{<<"type">> => <<"file">>, <<"enable">> => Enable, <<"path">> => Filename}) of
case emqx_authz:update({?CMD_REPLACE, <<"file">>}, #{<<"type">> => <<"file">>, <<"enable">> => Enable, <<"path">> => Filename}) of
{ok, _} -> {204};
{error, Reason} ->
{400, #{code => <<"BAD_REQUEST">>,
message => bin(Reason)}}
end;
source(put, #{bindings := #{type := Type}, body := Body}) when is_map(Body) ->
update_config({?CMD_REPLCAE, Type}, write_cert(Body));
update_config({?CMD_REPLACE, Type}, maybe_write_certs(Body#{<<"type">> => Type}));
source(delete, #{bindings := #{type := Type}}) ->
update_config({?CMD_DELETE, Type}, #{}).
@ -402,7 +402,7 @@ move_source(post, #{bindings := #{type := Type}, body := #{<<"position">> := Pos
end.
get_raw_sources() ->
RawSources = emqx:get_raw_config([authorization, sources]),
RawSources = emqx:get_raw_config([authorization, sources], []),
Schema = #{roots => emqx_authz_schema:fields("authorization"), fields => #{}},
Conf = #{<<"sources">> => RawSources},
#{<<"sources">> := Sources} = hocon_schema:check_plain(Schema, Conf, #{only_fill_defaults => true}),
@ -447,7 +447,7 @@ read_cert(#{<<"ssl">> := #{<<"enable">> := true} = SSL} = Source) ->
};
read_cert(Source) -> Source.
write_cert(#{<<"ssl">> := #{<<"enable">> := true} = SSL} = Source) ->
maybe_write_certs(#{<<"ssl">> := #{<<"enable">> := true} = SSL} = Source) ->
CertPath = filename:join([emqx:get_config([node, data_dir]), "certs"]),
CaCert = case maps:is_key(<<"cacertfile">>, SSL) of
true ->
@ -475,7 +475,7 @@ write_cert(#{<<"ssl">> := #{<<"enable">> := true} = SSL} = Source) ->
<<"keyfile">> => Key
}
};
write_cert(Source) -> Source.
maybe_write_certs(Source) -> Source.
write_file(Filename, Bytes0) ->
ok = filelib:ensure_dir(Filename),
@ -492,7 +492,7 @@ do_write_file(Filename, Bytes) ->
case file:write_file(Filename, Bytes) of
ok -> {ok, iolist_to_binary(Filename)};
{error, Reason} ->
?LOG(error, "Write File ~p Error: ~p", [Filename, Reason]),
?SLOG(error, #{filename => Filename, msg => "write_file_error", reason => Reason}),
error(Reason)
end.

View File

@ -40,7 +40,7 @@ authorize(Client, PubSub, Topic,
}) ->
case emqx_resource:query(ResourceID, {find, Collection, replvar(Selector, Client), #{}}) of
{error, Reason} ->
?LOG(error, "[AuthZ] Query mongo error: ~p", [Reason]),
?SLOG(error, #{msg => "query_mongo_error", reason => Reason, resource_id => ResourceID}),
nomatch;
[] -> nomatch;
Rows ->

View File

@ -55,7 +55,7 @@ authorize(Client, PubSub, Topic,
{ok, Columns, Rows} ->
do_authorize(Client, PubSub, Topic, Columns, Rows);
{error, Reason} ->
?LOG(error, "[AuthZ] Query mysql error: ~p~n", [Reason]),
?SLOG(error, #{msg => "query_mysql_error", reason => Reason, resource_id => ResourceID}),
nomatch
end.

View File

@ -59,7 +59,7 @@ authorize(Client, PubSub, Topic,
{ok, Columns, Rows} ->
do_authorize(Client, PubSub, Topic, Columns, Rows);
{error, Reason} ->
?LOG(error, "[AuthZ] Query postgresql error: ~p~n", [Reason]),
?SLOG(error, #{msg => "query_postgresql_error", reason => Reason, resource_id => ResourceID}),
nomatch
end.

View File

@ -43,7 +43,7 @@ authorize(Client, PubSub, Topic,
{ok, Rows} ->
do_authorize(Client, PubSub, Topic, Rows);
{error, Reason} ->
?LOG(error, "[AuthZ] Query redis error: ~p", [Reason]),
?SLOG(error, #{msg => "query_redis_error", reason => Reason, resource_id => ResourceID}),
nomatch
end.

View File

@ -40,7 +40,30 @@ fields("authorization") ->
, hoconsc:ref(?MODULE, redis_single)
, hoconsc:ref(?MODULE, redis_sentinel)
, hoconsc:ref(?MODULE, redis_cluster)
])}
]),
default => [],
desc =>
"""
Authorization data sources.<br>
An array of authorization (ACL) data providers.
It is designed as an array but not a hash-map so the sources can be
ordered to form a chain of access controls.<br>
When authorizing a publish or subscribe action, the configured
sources are checked in order. When checking an ACL source,
in case the client (identified by username or client ID) is not found,
it moves on to the next source. And it stops immediatly
once an 'allow' or 'deny' decision is returned.<br>
If the client is not found in any of the sources,
the default action configured in 'authorization.no_match' is applied.<br>
NOTE:
The source elements are identified by their 'type'.
It is NOT allowed to configure two or more sources of the same type.
"""
}
}
];
fields(file) ->

View File

@ -50,14 +50,14 @@ init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
{ok, _} = emqx_authz:update(?CMD_REPLCAE, []),
{ok, _} = emqx_authz:update(?CMD_REPLACE, []),
emqx_common_test_helpers:stop_apps([emqx_authz, emqx_resource]),
meck:unload(emqx_resource),
meck:unload(emqx_schema),
ok.
init_per_testcase(_, Config) ->
{ok, _} = emqx_authz:update(?CMD_REPLCAE, []),
{ok, _} = emqx_authz:update(?CMD_REPLACE, []),
Config.
-define(SOURCE1, #{<<"type">> => <<"http">>,
@ -120,7 +120,7 @@ init_per_testcase(_, Config) ->
%%------------------------------------------------------------------------------
t_update_source(_) ->
{ok, _} = emqx_authz:update(?CMD_REPLCAE, [?SOURCE3]),
{ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE3]),
{ok, _} = emqx_authz:update(?CMD_PREPEND, [?SOURCE2]),
{ok, _} = emqx_authz:update(?CMD_PREPEND, [?SOURCE1]),
{ok, _} = emqx_authz:update(?CMD_APPEND, [?SOURCE4]),
@ -135,12 +135,12 @@ t_update_source(_) ->
, #{type := file, enable := true}
], emqx:get_config([authorization, sources], [])),
{ok, _} = emqx_authz:update({?CMD_REPLCAE, http}, ?SOURCE1#{<<"enable">> := false}),
{ok, _} = emqx_authz:update({?CMD_REPLCAE, mongodb}, ?SOURCE2#{<<"enable">> := false}),
{ok, _} = emqx_authz:update({?CMD_REPLCAE, mysql}, ?SOURCE3#{<<"enable">> := false}),
{ok, _} = emqx_authz:update({?CMD_REPLCAE, postgresql}, ?SOURCE4#{<<"enable">> := false}),
{ok, _} = emqx_authz:update({?CMD_REPLCAE, redis}, ?SOURCE5#{<<"enable">> := false}),
{ok, _} = emqx_authz:update({?CMD_REPLCAE, file}, ?SOURCE6#{<<"enable">> := false}),
{ok, _} = emqx_authz:update({?CMD_REPLACE, http}, ?SOURCE1#{<<"enable">> := false}),
{ok, _} = emqx_authz:update({?CMD_REPLACE, mongodb}, ?SOURCE2#{<<"enable">> := false}),
{ok, _} = emqx_authz:update({?CMD_REPLACE, mysql}, ?SOURCE3#{<<"enable">> := false}),
{ok, _} = emqx_authz:update({?CMD_REPLACE, postgresql}, ?SOURCE4#{<<"enable">> := false}),
{ok, _} = emqx_authz:update({?CMD_REPLACE, redis}, ?SOURCE5#{<<"enable">> := false}),
{ok, _} = emqx_authz:update({?CMD_REPLACE, file}, ?SOURCE6#{<<"enable">> := false}),
?assertMatch([ #{type := http, enable := false}
, #{type := mongodb, enable := false}
@ -150,10 +150,10 @@ t_update_source(_) ->
, #{type := file, enable := false}
], emqx:get_config([authorization, sources], [])),
{ok, _} = emqx_authz:update(?CMD_REPLCAE, []).
{ok, _} = emqx_authz:update(?CMD_REPLACE, []).
t_move_source(_) ->
{ok, _} = emqx_authz:update(?CMD_REPLCAE, [?SOURCE1, ?SOURCE2, ?SOURCE3, ?SOURCE4, ?SOURCE5, ?SOURCE6]),
{ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE1, ?SOURCE2, ?SOURCE3, ?SOURCE4, ?SOURCE5, ?SOURCE6]),
?assertMatch([ #{type := http}
, #{type := mongodb}
, #{type := mysql}

View File

@ -22,16 +22,14 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(CONF_DEFAULT, <<"authorization: {sources: []}">>).
-import(emqx_ct_http, [ request_api/3
, request_api/5
, get_http_data/1
, create_default_app/0
, delete_default_app/0
, default_auth_header/0
, auth_header/2
]).
-define(CONF_DEFAULT, <<"""
authorization
{sources = [
{ type = \"built-in-database\"
enable = true
}
]}
""">>).
-define(HOST, "http://127.0.0.1:18083/").
-define(API_VERSION, "v5").
@ -82,33 +80,26 @@
]
}).
roots() -> ["authorization"].
fields("authorization") ->
emqx_authz_schema:fields("authorization") ++
emqx_schema:fields("authorization").
all() ->
[]. %% Todo: Waiting for @terry-xiaoyu to fix the config_not_found error
% emqx_common_test_helpers:all(?MODULE).
emqx_common_test_helpers:all(?MODULE).
groups() ->
[].
init_per_suite(Config) ->
meck:new(emqx_schema, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_schema, fields, fun("authorization") ->
meck:passthrough(["authorization"]) ++
emqx_authz_schema:fields("authorization");
(F) -> meck:passthrough([F])
end),
ok = emqx_config:init_load(emqx_authz_schema, ?CONF_DEFAULT),
ok = emqx_common_test_helpers:start_apps([emqx_authz, emqx_dashboard], fun set_special_configs/1),
{ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny),
ok = emqx_common_test_helpers:start_apps([emqx_authz, emqx_dashboard],
fun set_special_configs/1),
Config.
end_per_suite(_Config) ->
{ok, _} = emqx_authz:update(replace, []),
emqx_common_test_helpers:stop_apps([emqx_authz, emqx_dashboard]),
meck:unload(emqx_schema),
ok.
set_special_configs(emqx_dashboard) ->
@ -123,9 +114,9 @@ set_special_configs(emqx_dashboard) ->
emqx_config:put([emqx_dashboard], Config),
ok;
set_special_configs(emqx_authz) ->
emqx_config:put([authorization], #{sources => [#{type => 'built-in-database',
enable => true}
]}),
ok = emqx_config:init_load(?MODULE, ?CONF_DEFAULT),
{ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny),
ok;
set_special_configs(_App) ->
ok.
@ -167,12 +158,12 @@ t_api(_) ->
{ok, 204, _} = request(put, uri(["authorization", "sources", "built-in-database", "all"]), ?EXAMPLE_ALL),
{ok, 200, Request7} = request(get, uri(["authorization", "sources", "built-in-database", "all"]), []),
[#{<<"rules">> := Rules5}] = jsx:decode(Request7),
#{<<"rules">> := Rules5} = jsx:decode(Request7),
?assertEqual(3, length(Rules5)),
{ok, 204, _} = request(put, uri(["authorization", "sources", "built-in-database", "all"]), ?EXAMPLE_ALL#{rules => []}),
{ok, 200, Request8} = request(get, uri(["authorization", "sources", "built-in-database", "all"]), []),
[#{<<"rules">> := Rules6}] = jsx:decode(Request8),
#{<<"rules">> := Rules6} = jsx:decode(Request8),
?assertEqual(0, length(Rules6)),
{ok, 204, _} = request(post, uri(["authorization", "sources", "built-in-database", "username"]), [ #{username => N, rules => []} || N <- lists:seq(1, 20) ]),
@ -184,11 +175,14 @@ t_api(_) ->
{ok, 200, Request10} = request(get, uri(["authorization", "sources", "built-in-database", "clientid?limit=5"]), []),
?assertEqual(5, length(jsx:decode(Request10))),
{ok, 400, _} = request(delete, uri(["authorization", "sources", "built-in-database", "purge-all"]), []),
{ok, 400, Msg1} = request(delete, uri(["authorization", "sources", "built-in-database", "purge-all"]), []),
?assertMatch({match, _}, re:run(Msg1, "must\sbe\sdisabled\sbefore")),
{ok, 204, _} = request(put, uri(["authorization", "sources", "built-in-database"]), #{<<"enable">> => true}),
%% test idempotence
{ok, 204, _} = request(put, uri(["authorization", "sources", "built-in-database"]), #{<<"enable">> => true}),
{ok, 204, _} = request(put, uri(["authorization", "sources", "built-in-database"]), #{<<"enable">> => false}),
{ok, 204, _} = request(delete, uri(["authorization", "sources", "built-in-database", "purge-all"]), []),
?assertEqual([], mnesia:dirty_all_keys(?ACL_TABLE)),
ok.
%%--------------------------------------------------------------------

View File

@ -24,15 +24,6 @@
-define(CONF_DEFAULT, <<"authorization: {sources: []}">>).
-import(emqx_ct_http, [ request_api/3
, request_api/5
, get_http_data/1
, create_default_app/0
, delete_default_app/0
, default_auth_header/0
, auth_header/2
]).
-define(HOST, "http://127.0.0.1:18083/").
-define(API_VERSION, "v5").
-define(BASE_PATH, "api").

View File

@ -24,15 +24,6 @@
-define(CONF_DEFAULT, <<"authorization: {sources: []}">>).
-import(emqx_ct_http, [ request_api/3
, request_api/5
, get_http_data/1
, create_default_app/0
, delete_default_app/0
, default_auth_header/0
, auth_header/2
]).
-define(HOST, "http://127.0.0.1:18083/").
-define(API_VERSION, "v5").
-define(BASE_PATH, "api").

View File

@ -8,7 +8,7 @@
{mysql, {git, "https://github.com/emqx/mysql-otp", {tag, "1.7.1"}}},
{epgsql, {git, "https://github.com/epgsql/epgsql", {tag, "4.4.0"}}},
%% NOTE: mind poolboy version when updating mongodb-erlang version
{mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.8"}}},
{mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.9"}}},
%% NOTE: mind poolboy version when updating eredis_cluster version
{eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.7"}}},
%% mongodb-erlang uses a special fork https://github.com/comtihon/poolboy.git

View File

@ -148,15 +148,24 @@ on_query(InstId, {Action, Collection, Selector, Docs}, AfterQuery, #{poolname :=
end.
-dialyzer({nowarn_function, [on_health_check/2]}).
on_health_check(_InstId, #{test_opts := TestOpts} = State) ->
case mc_worker_api:connect(TestOpts) of
{ok, TestConn} ->
mc_worker_api:disconnect(TestConn),
{ok, State};
{error, _} ->
{error, health_check_failed, State}
on_health_check(_InstId, #{poolname := PoolName} = State) ->
case health_check(PoolName) of
true -> {ok, State};
false -> {error, health_check_failed, State}
end.
health_check(PoolName) ->
Status = [begin
case ecpool_worker:client(Worker) of
{ok, Conn} ->
%% we don't care if this returns something or not, we just to test the connection
Res = mongo_api:find_one(Conn, <<"foo">>, {}, #{}),
Res == undefined orelse is_map(Res);
_ -> false
end
end || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
length(Status) > 0 andalso lists:all(fun(St) -> St =:= true end, Status).
%% ===================================================================
connect(Opts) ->
Type = proplists:get_value(mongo_type, Opts, single),

View File

@ -19,7 +19,7 @@
-compile(nowarn_export_all).
-compile(export_all).
-import(emqx_ct_http,
-import(emqx_common_test_http,
[ request_api/3
, request_api/5
, get_http_data/1

View File

@ -266,7 +266,7 @@ gateway.lwm2m {
lifetime_max = 86400s
qmode_time_window = 22
qmode_time_window = 22s
auto_observe = false

View File

@ -476,7 +476,7 @@ handle_msg({inet_reply, _Sock, {error, Reason}}, State) ->
handle_info({sock_error, Reason}, State);
handle_msg({close, Reason}, State) ->
?LOG(debug, "Force to close the socket due to ~p", [Reason]),
?SLOG(debug, #{msg => "force_socket_close", reason => Reason}),
handle_info({sock_closed, Reason}, close_socket(State));
handle_msg({event, connected}, State = #state{
@ -525,7 +525,7 @@ handle_msg(Msg, State) ->
terminate(Reason, State = #state{
chann_mod = ChannMod,
channel = Channel}) ->
?LOG(debug, "Terminated due to ~p", [Reason]),
?SLOG(debug, #{msg => "conn_process_terminated", reason => Reason}),
_ = ChannMod:terminate(Reason, Channel),
_ = close_socket(State),
exit(Reason).
@ -620,7 +620,7 @@ handle_timeout(TRef, Msg, State) ->
parse_incoming(Data, State = #state{
chann_mod = ChannMod,
channel = Channel}) ->
?LOG(debug, "RECV ~0p", [Data]),
?SLOG(debug, #{msg => "RECV_data", data => Data}),
Oct = iolist_size(Data),
inc_counter(incoming_bytes, Oct),
Ctx = ChannMod:info(ctx, Channel),
@ -643,8 +643,12 @@ parse_incoming(Data, Packets,
parse_incoming(Rest, [Packet|Packets], NState)
catch
error:Reason:Stk ->
?LOG(error, "~nParse failed for ~0p~n~0p~nFrame data:~0p",
[Reason, Stk, Data]),
?SLOG(error, #{ msg => "parse_frame_failed"
, at_state => ParseState
, input_bytes => Data
, reason => Reason
, stacktrace => Stk
}),
{[{frame_error, Reason}|Packets], State}
end.
@ -663,7 +667,9 @@ handle_incoming(Packet, State = #state{
}) ->
Ctx = ChannMod:info(ctx, Channel),
ok = inc_incoming_stats(Ctx, FrameMod, Packet),
?LOG(debug, "RECV ~ts", [FrameMod:format(Packet)]),
?SLOG(debug, #{ msg => "RECV_packet"
, packet => FrameMod:format(Packet)
}),
with_channel(handle_in, [Packet], State).
%%--------------------------------------------------------------------
@ -715,12 +721,17 @@ serialize_and_inc_stats_fun(#state{
Ctx = ChannMod:info(ctx, Channel),
fun(Packet) ->
case FrameMod:serialize_pkt(Packet, Serialize) of
<<>> -> ?LOG(warning, "~ts is discarded due to the frame is too large!",
[FrameMod:format(Packet)]),
<<>> ->
?SLOG(warning, #{ msg => "packet_too_large_discarded"
, packet => FrameMod:format(Packet)
}),
ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped.too_large'),
ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped'),
<<>>;
Data -> ?LOG(debug, "SEND ~ts", [FrameMod:format(Packet)]),
Data ->
?SLOG(debug, #{ msg => "SEND_packet"
, packet => FrameMod:format(Packet)
}),
ok = inc_outgoing_stats(Ctx, FrameMod, Packet),
Data
end
@ -760,7 +771,9 @@ handle_info(activate_socket, State = #state{sockstate = OldSst}) ->
end;
handle_info({sock_error, Reason}, State) ->
?LOG(debug, "Socket error: ~p", [Reason]),
?SLOG(debug, #{ msg => "sock_error"
, reason => Reason
}),
handle_info({sock_closed, Reason}, close_socket(State));
handle_info(Info, State) ->
@ -775,7 +788,10 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
{ok, Limiter1} ->
State#state{limiter = Limiter1};
{pause, Time, Limiter1} ->
?LOG(warning, "Pause ~pms due to rate limit", [Time]),
%% XXX: which limiter reached?
?SLOG(warning, #{ msg => "reach_rate_limit"
, pause => Time
}),
TRef = emqx_misc:start_timer(Time, limit_timeout),
State#state{sockstate = blocked,
limiter = Limiter1,

View File

@ -189,14 +189,14 @@ handle_call({send_request, Msg}, From, Channel) ->
erlang:setelement(1, Result, noreply);
handle_call(Req, _From, Channel) ->
?LOG(error, "Unexpected call: ~p", [Req]),
?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, Channel}.
%%--------------------------------------------------------------------
%% Handle Cast
%%--------------------------------------------------------------------
handle_cast(Req, Channel) ->
?LOG(error, "Unexpected cast: ~p", [Req]),
?SLOG(error, #{msg => "unexpected_cast", cast => Req}),
{ok, Channel}.
%%--------------------------------------------------------------------
@ -206,7 +206,7 @@ handle_info({subscribe, _}, Channel) ->
{ok, Channel};
handle_info(Info, Channel) ->
?LOG(error, "Unexpected info: ~p", [Info]),
?SLOG(error, #{msg => "unexpected_info", info => Info}),
{ok, Channel}.
%%--------------------------------------------------------------------
@ -331,8 +331,11 @@ auth_connect(_Input, Channel = #channel{ctx = Ctx,
{ok, NClientInfo} ->
{ok, Channel#channel{clientinfo = NClientInfo}};
{error, Reason} ->
?LOG(warning, "Client ~ts (Username: '~ts') login failed for ~0p",
[ClientId, Username, Reason]),
?SLOG(warning, #{ msg => "client_login_failed"
, username => Username
, clientid => ClientId
, reason => Reason
}),
{error, Reason}
end.
@ -375,7 +378,10 @@ process_connect(#channel{ctx = Ctx,
reply({ok, created}, Token, Msg, Result),
Channel#channel{token = Token});
{error, Reason} ->
?LOG(error, "Failed to open session du to ~p", [Reason]),
?SLOG(error, #{ msg => "failed_open_session"
, clientid => maps:get(clientid, ClientInfo)
, reason => Reason
}),
iter(Iter, reply({error, bad_request}, Msg, Result), Channel)
end.

View File

@ -92,17 +92,36 @@ gateway_insta(delete, #{bindings := #{name := Name0}}) ->
end
end);
gateway_insta(get, #{bindings := #{name := Name0}}) ->
with_gateway(Name0, fun(_, _) ->
try
binary_to_existing_atom(Name0)
of
GwName ->
case emqx_gateway:lookup(GwName) of
undefined ->
{200, #{name => GwName, status => unloaded}};
Gateway ->
GwConf = emqx_gateway_conf:gateway(Name0),
{200, GwConf#{<<"name">> => Name0}}
end);
GwInfo0 = emqx_gateway_utils:unix_ts_to_rfc3339(
[created_at, started_at, stopped_at],
Gateway),
GwInfo1 = maps:with([name,
status,
created_at,
started_at,
stopped_at], GwInfo0),
{200, maps:merge(GwConf, GwInfo1)}
end
catch
error : badarg ->
return_http_error(400, "Bad gateway name")
end;
gateway_insta(put, #{body := GwConf,
bindings := #{name := Name0}
}) ->
with_gateway(Name0, fun(GwName, _) ->
case emqx_gateway_conf:update_gateway(GwName, GwConf) of
ok ->
{200};
{204};
{error, Reason} ->
return_http_error(500, Reason)
end

View File

@ -53,7 +53,14 @@ apis() ->
authn(get, #{bindings := #{name := Name0}}) ->
with_gateway(Name0, fun(GwName, _) ->
{200, emqx_gateway_http:authn(GwName)}
try
emqx_gateway_http:authn(GwName)
of
Authn -> {200, Authn}
catch
error : {config_not_found, _} ->
{204}
end
end);
authn(put, #{bindings := #{name := Name0},
@ -104,10 +111,11 @@ swagger("/gateway/:name/authentication", get) ->
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"200">> => schema_authn()
, <<"204">> => schema_no_content()
}
};
swagger("/gateway/:name/authentication", put) ->
#{ description => <<"Create the gateway authentication">>
#{ description => <<"Update authentication for the gateway">>
, parameters => params_gateway_name_in_path()
, requestBody => schema_authn()
, responses =>

View File

@ -71,6 +71,11 @@ apis() ->
, {<<"lte_created_at">>, timestamp}
, {<<"gte_connected_at">>, timestamp}
, {<<"lte_connected_at">>, timestamp}
%% special keys for lwm2m protocol
, {<<"endpoint_name">>, binary}
, {<<"like_endpoint_name">>, binary}
, {<<"gte_lifetime">>, timestamp}
, {<<"lte_lifetime">>, timestamp}
]).
-define(query_fun, {?MODULE, query}).
@ -105,8 +110,9 @@ clients_insta(get, #{ bindings := #{name := Name0,
[ClientInfo] ->
{200, ClientInfo};
[ClientInfo | _More] ->
?LOG(warning, "More than one client info was returned on ~ts",
[ClientId]),
?SLOG(warning, #{ msg => "more_than_one_channel_found"
, clientid => ClientId
}),
{200, ClientInfo};
[] ->
return_http_error(404, "Client not found")
@ -118,7 +124,7 @@ clients_insta(delete, #{ bindings := #{name := Name0,
ClientId = emqx_mgmt_util:urldecode(ClientId0),
with_gateway(Name0, fun(GwName, _) ->
_ = emqx_gateway_http:kickout_client(GwName, ClientId),
{200}
{204}
end).
%% FIXME:
@ -152,7 +158,7 @@ subscriptions(post, #{ bindings := #{name := Name0,
{error, Reason} ->
return_http_error(404, Reason);
ok ->
{200}
{204}
end
end
end);
@ -167,7 +173,7 @@ subscriptions(delete, #{ bindings := #{name := Name0,
Topic = emqx_mgmt_util:urldecode(Topic0),
with_gateway(Name0, fun(GwName, _) ->
_ = emqx_gateway_http:client_unsubscribe(GwName, ClientId, Topic),
{200}
{204}
end).
%%--------------------------------------------------------------------
@ -230,7 +236,7 @@ ms(username, X) ->
ms(zone, X) ->
#{clientinfo => #{zone => X}};
ms(ip_address, X) ->
#{clientinfo => #{peerhost => X}};
#{clientinfo => #{peername => {X, '_'}}};
ms(conn_state, X) ->
#{conn_state => X};
ms(clean_start, X) ->
@ -240,7 +246,12 @@ ms(proto_ver, X) ->
ms(connected_at, X) ->
#{conninfo => #{connected_at => X}};
ms(created_at, X) ->
#{session => #{created_at => X}}.
#{session => #{created_at => X}};
%% lwm2m fields
ms(endpoint_name, X) ->
#{clientinfo => #{endpoint_name => X}};
ms(lifetime, X) ->
#{clientinfo => #{lifetime => X}}.
%%--------------------------------------------------------------------
%% Fuzzy filter funcs
@ -267,7 +278,7 @@ run_fuzzy_filter(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, _, RE} | Fuzzy]
%%--------------------------------------------------------------------
%% format funcs
format_channel_info({_, Infos, Stats}) ->
format_channel_info({_, Infos, Stats} = R) ->
ClientInfo = maps:get(clientinfo, Infos, #{}),
ConnInfo = maps:get(conninfo, Infos, #{}),
SessInfo = maps:get(session, Infos, #{}),
@ -276,7 +287,8 @@ format_channel_info({_, Infos, Stats}) ->
, {username, ClientInfo}
, {proto_name, ConnInfo}
, {proto_ver, ConnInfo}
, {ip_address, {peername, ConnInfo, fun peer_to_binary/1}}
, {ip_address, {peername, ConnInfo, fun peer_to_binary_addr/1}}
, {port, {peername, ConnInfo, fun peer_to_port/1}}
, {is_bridge, ClientInfo, false}
, {connected_at,
{connected_at, ConnInfo, fun emqx_gateway_utils:unix_ts_to_rfc3339/1}}
@ -309,7 +321,20 @@ format_channel_info({_, Infos, Stats}) ->
, {heap_size, Stats, 0}
, {reductions, Stats, 0}
],
eval(FetchX).
eval(FetchX ++ extra_feilds(R)).
extra_feilds({_, Infos, _Stats} = R) ->
extra_feilds(
maps:get(protocol, maps:get(clientinfo, Infos)),
R).
extra_feilds(lwm2m, {_, Infos, _Stats}) ->
ClientInfo = maps:get(clientinfo, Infos, #{}),
[ {endpoint_name, ClientInfo}
, {lifetime, ClientInfo}
];
extra_feilds(_, _) ->
[].
eval(Ls) ->
eval(Ls, #{}).
@ -341,13 +366,14 @@ key_get(K, M) when is_map(M) ->
key_get(K, L) when is_list(L) ->
proplists:get_value(K, L).
peer_to_binary({Addr, Port}) ->
AddrBinary = list_to_binary(inet:ntoa(Addr)),
PortBinary = integer_to_binary(Port),
<<AddrBinary/binary, ":", PortBinary/binary>>;
peer_to_binary(Addr) ->
-spec(peer_to_binary_addr(emqx_types:peername()) -> binary()).
peer_to_binary_addr({Addr, _}) ->
list_to_binary(inet:ntoa(Addr)).
-spec(peer_to_port(emqx_types:peername()) -> inet:port_number()).
peer_to_port({_, Port}) ->
Port.
conn_state_to_connected(connected) -> true;
conn_state_to_connected(_) -> false.
@ -419,7 +445,7 @@ swagger("/gateway/:name/clients/:clientid/subscriptions", post) ->
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"200">> => schema_no_content()
, <<"204">> => schema_no_content()
}
};
swagger("/gateway/:name/clients/:clientid/subscriptions/:topic", delete) ->
@ -523,6 +549,7 @@ schema_subscription() ->
%% properties defines
properties_client() ->
%% FIXME: enum for every protocol's client
emqx_mgmt_util:properties(
[ {node, string,
<<"Name of the node to which the client is connected">>}
@ -536,6 +563,8 @@ properties_client() ->
<<"Protocol version used by the client">>}
, {ip_address, string,
<<"Client's IP address">>}
, {port, integer,
<<"Client's port">>}
, {is_bridge, boolean,
<<"Indicates whether the client is connectedvia bridge">>}
, {connected_at, string,

View File

@ -112,7 +112,14 @@ listeners_insta_authn(get, #{bindings := #{name := Name0,
id := ListenerId0}}) ->
ListenerId = emqx_mgmt_util:urldecode(ListenerId0),
with_gateway(Name0, fun(GwName, _) ->
{200, emqx_gateway_http:authn(GwName, ListenerId)}
try
emqx_gateway_http:authn(GwName, ListenerId)
of
Authn -> {200, Authn}
catch
error : {config_not_found, _} ->
{204}
end
end);
listeners_insta_authn(post, #{body := Conf,
bindings := #{name := Name0,
@ -222,6 +229,7 @@ swagger("/gateway/:name/listeners/:id/authentication", get) ->
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"200">> => schema_authn()
, <<"204">> => schema_no_content()
}
};
swagger("/gateway/:name/listeners/:id/authentication", post) ->

View File

@ -40,7 +40,6 @@ stop(_State) ->
load_default_gateway_applications() ->
Apps = gateway_type_searching(),
?LOG(info, "Starting the default gateway types: ~p", [Apps]),
lists:foreach(fun reg/1, Apps).
gateway_type_searching() ->
@ -51,12 +50,16 @@ gateway_type_searching() ->
reg(Mod) ->
try
Mod:reg(),
?LOG(info, "Register ~ts gateway application successfully!", [Mod])
?SLOG(debug, #{ msg => "register_gateway_succeed"
, callback_module => Mod
})
catch
Class : Reason : Stk ->
?LOG(error, "Failed to register ~ts gateway application: {~p, ~p}\n"
"Stacktrace: ~0p",
[Mod, Class, Reason, Stk])
?SLOG(error, #{ msg => "failed_to_register_gateway"
, callback_module => Mod
, reason => {Class, Reason}
, stacktrace => Stk
})
end.
load_gateway_by_default() ->
@ -67,14 +70,19 @@ load_gateway_by_default([]) ->
load_gateway_by_default([{Type, Confs}|More]) ->
case emqx_gateway_registry:lookup(Type) of
undefined ->
?LOG(error, "Skip to load ~ts gateway, because it is not registered",
[Type]);
?SLOG(error, #{ msg => "skip_to_load_gateway"
, gateway_name => Type
});
_ ->
case emqx_gateway:load(Type, Confs) of
{ok, _} ->
?LOG(debug, "Load ~ts gateway successfully!", [Type]);
?SLOG(debug, #{ msg => "load_gateway_succeed"
, gateway_name => Type
});
{error, Reason} ->
?LOG(error, "Failed to load ~ts gateway: ~0p", [Type, Reason])
?SLOG(error, #{ msg => "load_gateway_failed"
, gateway_name => Type
, reason => Reason})
end
end,
load_gateway_by_default(More).

View File

@ -282,8 +282,12 @@ create_session(GwName, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) ->
Session
catch
Class : Reason : Stk ->
?LOG(error, "Failed to create a session: ~p, ~p "
"Stacktrace:~0p", [Class, Reason, Stk]),
?SLOG(error, #{ msg => "failed_create_session"
, clientid => maps:get(clientid, ClientInfo, undefined)
, username => maps:get(username, ClientInfo, undefined)
, reason => {Class, Reason}
, stacktrace => Stk
}),
throw(Reason)
end.
@ -337,7 +341,9 @@ kick_session(GwName, ClientId) ->
kick_session(GwName, ClientId, ChanPid);
ChanPids ->
[ChanPid|StalePids] = lists:reverse(ChanPids),
?LOG(error, "More than one channel found: ~p", [ChanPids]),
?SLOG(error, #{ msg => "more_than_one_channel_found"
, chan_pids => ChanPids
}),
lists:foreach(fun(StalePid) ->
catch discard_session(GwName, ClientId, StalePid)
end, StalePids),

View File

@ -365,7 +365,7 @@ pre_config_update(UnknownReq, _RawConf) ->
emqx_config:config(), emqx_config:app_envs())
-> ok | {ok, Result::any()} | {error, Reason::term()}.
post_config_update(Req, NewConfig, OldConfig, _AppEnvs) ->
post_config_update(Req, NewConfig, OldConfig, _AppEnvs) when is_tuple(Req) ->
[_Tag, GwName0|_] = tuple_to_list(Req),
GwName = binary_to_existing_atom(GwName0),
@ -379,4 +379,6 @@ post_config_update(Req, NewConfig, OldConfig, _AppEnvs) ->
emqx_gateway:load(GwName, New);
{New, Old} when is_map(New), is_map(Old) ->
emqx_gateway:update(GwName, New)
end.
end;
post_config_update(_Req, _NewConfig, _OldConfig, _AppEnvs) ->
ok.

View File

@ -294,7 +294,7 @@ with_channel(GwName, ClientId, Fun) ->
return_http_error(Code, Msg) ->
{Code, emqx_json:encode(
#{code => codestr(Code),
reason => emqx_gateway_utils:stringfy(Msg)
message => emqx_gateway_utils:stringfy(Msg)
})
}.
@ -336,8 +336,10 @@ with_gateway(GwName0, Fun) ->
error : {update_conf_error, already_exist} ->
return_http_error(400, "Resource already exist");
Class : Reason : Stk ->
?LOG(error, "Uncatched error: {~p, ~p}, stacktrace: ~0p",
[Class, Reason, Stk]),
?SLOG(error, #{ msg => "uncatched_error"
, reason => {Class, Reason}
, stacktrace => Stk
}),
return_http_error(500, {Class, Reason, Stk})
end.

View File

@ -103,7 +103,9 @@ init([Gateway, Ctx, _GwDscrptr]) ->
},
case maps:get(enable, Config, true) of
false ->
?LOG(info, "Skipp to start ~ts gateway due to disabled", [GwName]),
?SLOG(info, #{ msg => "skip_to_start_gateway_due_to_disabled"
, gateway_name => GwName
}),
{ok, State};
true ->
case cb_gateway_load(State) of
@ -160,13 +162,19 @@ handle_call(_Request, _From, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({'EXIT', Pid, Reason}, State = #state{child_pids = Pids}) ->
handle_info({'EXIT', Pid, Reason}, State = #state{name = Name,
child_pids = Pids}) ->
case lists:member(Pid, Pids) of
true ->
?LOG(error, "Child process ~p exited: ~0p.", [Pid, Reason]),
?SLOG(error, #{ msg => "child_process_exited"
, child => Pid
, reason => Reason
}),
case Pids -- [Pid]of
[] ->
?LOG(error, "All child process exited!"),
?SLOG(error, #{ msg => "gateway_all_children_process_existed"
, gateway_name => Name
}),
{noreply, State#state{status = stopped,
child_pids = [],
gw_state = undefined}};
@ -174,12 +182,18 @@ handle_info({'EXIT', Pid, Reason}, State = #state{child_pids = Pids}) ->
{noreply, State#state{child_pids = RemainPids}}
end;
_ ->
?LOG(error, "Unknown process exited ~p:~0p", [Pid, Reason]),
?SLOG(error, #{ msg => "gateway_catch_a_unknown_process_exited"
, child => Pid
, reason => Reason
, gateway_name => Name
}),
{noreply, State}
end;
handle_info(Info, State) ->
?LOG(warning, "Unexcepted info: ~p", [Info]),
?SLOG(warning, #{ msg => "unexcepted_info"
, info => Info
}),
{noreply, State}.
terminate(_Reason, State = #state{child_pids = Pids}) ->
@ -266,14 +280,18 @@ do_create_authn_chain(ChainName, AuthConf) ->
case emqx_authentication:create_authenticator(ChainName, AuthConf) of
{ok, _} -> ok;
{error, Reason} ->
?LOG(error, "Failed to create authenticator chain ~ts, "
"reason: ~p, config: ~p",
[ChainName, Reason, AuthConf]),
?SLOG(error, #{ msg => "failed_to_create_authenticator"
, chain_name => ChainName
, reason => Reason
, config => AuthConf
}),
throw({badauth, Reason})
end;
{error, Reason} ->
?LOG(error, "Falied to create authn chain ~ts, reason ~p",
[ChainName, Reason]),
?SLOG(error, #{ msg => "failed_to_create_authn_chanin"
, chain_name => ChainName
, reason => Reason
}),
throw({badauth, Reason})
end.
@ -293,8 +311,10 @@ do_deinit_authn(Names) ->
ok -> ok;
{error, {not_found, _}} -> ok;
{error, Reason} ->
?LOG(error, "Failed to clean authentication chain: ~ts, "
"reason: ~p", [ChainName, Reason])
?SLOG(error, #{ msg => "failed_to_clean_authn_chain"
, chain_name => ChainName
, reason => Reason
})
end
end, Names).
@ -348,10 +368,12 @@ cb_gateway_unload(State = #state{name = GwName,
stopped_at = erlang:system_time(millisecond)}}
catch
Class : Reason : Stk ->
?LOG(error, "Failed to unload gateway (~0p, ~0p) crashed: "
"{~p, ~p}, stacktrace: ~0p",
[GwName, GwState,
Class, Reason, Stk]),
?SLOG(error, #{ msg => "unload_gateway_crashed"
, gateway_name => GwName
, inner_state => GwState
, reason => {Class, Reason}
, stacktrace => Stk
}),
{error, {Class, Reason, Stk}}
after
_ = do_deinit_authn(State#state.authns)
@ -388,10 +410,13 @@ cb_gateway_load(State = #state{name = GwName,
end
catch
Class : Reason1 : Stk ->
?LOG(error, "Failed to load ~ts gateway (~0p, ~0p) "
"crashed: {~p, ~p}, stacktrace: ~0p",
[GwName, Gateway, Ctx,
Class, Reason1, Stk]),
?SLOG(error, #{ msg => "load_gateway_crashed"
, gateway_name => GwName
, gateway => Gateway
, ctx => Ctx
, reason => {Class, Reason1}
, stacktrace => Stk
}),
{error, {Class, Reason1, Stk}}
end.
@ -413,9 +438,12 @@ cb_gateway_update(Config,
end
catch
Class : Reason1 : Stk ->
?LOG(error, "Failed to update ~ts gateway to config: ~0p crashed: "
"{~p, ~p}, stacktrace: ~0p",
[GwName, Config, Class, Reason1, Stk]),
?SLOG(error, #{ msg => "update_gateway_crashed"
, gateway_name => GwName
, new_config => Config
, reason => {Class, Reason1}
, stacktrace => Stk
}),
{error, {Class, Reason1, Stk}}
end.

View File

@ -28,16 +28,19 @@
-type ip_port() :: tuple().
-type duration() :: integer().
-type duration_s() :: integer().
-type bytesize() :: integer().
-type comma_separated_list() :: list().
-typerefl_from_string({ip_port/0, emqx_schema, to_ip_port}).
-typerefl_from_string({duration/0, emqx_schema, to_duration}).
-typerefl_from_string({duration_s/0, emqx_schema, to_duration_s}).
-typerefl_from_string({bytesize/0, emqx_schema, to_bytesize}).
-typerefl_from_string({comma_separated_list/0, emqx_schema,
to_comma_separated_list}).
-reflect_type([ duration/0
, duration_s/0
, bytesize/0
, comma_separated_list/0
, ip_port/0
@ -70,8 +73,8 @@ fields(stomp_frame) ->
fields(mqttsn) ->
[ {gateway_id, sc(integer())}
, {broadcast, sc(boolean())}
, {enable_qos3, sc(boolean())}
, {broadcast, sc(boolean(), false)}
, {enable_qos3, sc(boolean(), true)}
, {predefined, hoconsc:array(ref(mqttsn_predefined))}
, {listeners, sc(ref(udp_listeners))}
] ++ gateway_common_options();
@ -91,13 +94,14 @@ fields(coap) ->
] ++ gateway_common_options();
fields(lwm2m) ->
[ {xml_dir, sc(binary())}
[ {xml_dir, sc(binary(), "etc/lwm2m_xml")}
, {lifetime_min, sc(duration(), "1s")}
, {lifetime_max, sc(duration(), "86400s")}
, {qmode_time_window, sc(integer(), 22)}
, {qmode_time_window, sc(duration_s(), "22s")}
%% TODO: Support config resource path
, {auto_observe, sc(boolean(), false)}
, {update_msg_publish_condition, sc(hoconsc:union([always, contains_object_list]))}
, {translators, sc(ref(translators))}
, {translators, sc_meta(ref(translators), #{nullable => false})}
, {listeners, sc(ref(udp_listeners))}
] ++ gateway_common_options();
@ -109,14 +113,27 @@ fields(exproto) ->
fields(exproto_grpc_server) ->
[ {bind, sc(hoconsc:union([ip_port(), integer()]))}
%% TODO: ssl options
, {ssl, sc_meta(ref(ssl_server_opts),
#{nullable => {true, recursively}})}
];
fields(exproto_grpc_handler) ->
[ {address, sc(binary())}
%% TODO: ssl
, {ssl, sc_meta(ref(ssl_client_opts),
#{nullable => {true, recursively}})}
];
fields(ssl_server_opts) ->
emqx_schema:server_ssl_opts_schema(
#{ depth => 10
, reuse_sessions => true
, versions => tls_all_available
, ciphers => tls_all_available
}, true);
fields(ssl_client_opts) ->
emqx_schema:client_ssl_opts_schema(#{});
fields(clientinfo_override) ->
[ {username, sc(binary())}
, {password, sc(binary())}
@ -133,7 +150,7 @@ fields(translators) ->
fields(translator) ->
[ {topic, sc(binary())}
, {qos, sc(range(0, 2))}
, {qos, sc(range(0, 2), 0)}
];
fields(udp_listeners) ->

View File

@ -37,6 +37,7 @@
]).
-export([ stringfy/1
, parse_address/1
]).
-export([ normalize_config/1
@ -182,6 +183,19 @@ stringfy(T) when is_list(T); is_binary(T) ->
stringfy(T) ->
iolist_to_binary(io_lib:format("~0p", [T])).
-spec parse_address(binary()|list()) -> {list(), integer()}.
parse_address(S) when is_binary(S); is_list(S) ->
S1 = case is_binary(S) of
true -> lists:reverse(binary_to_list(S));
_ -> lists:reverse(S)
end,
case re:split(S1, ":", [{parts, 2}, {return, list}]) of
[Port0, Host0] ->
{lists:reverse(Host0), list_to_integer(lists:reverse(Port0))};
_ ->
error(badarg)
end.
-spec normalize_config(emqx_config:config())
-> list({ Type :: udp | tcp | ssl | dtls
, Name :: atom()

View File

@ -263,7 +263,9 @@ handle_call(close, _From, Channel) ->
handle_call({auth, ClientInfo, _Password}, _From,
Channel = #channel{conn_state = connected}) ->
?LOG(warning, "Duplicated authorized command, dropped ~p", [ClientInfo]),
?SLOG(warning, #{ msg => "ingore_duplicated_authorized_command"
, request_clientinfo => ClientInfo
}),
{reply, {error, ?RESP_PERMISSION_DENY, <<"Duplicated authenticate command">>}, Channel};
handle_call({auth, ClientInfo0, Password}, _From,
Channel = #channel{
@ -271,7 +273,7 @@ handle_call({auth, ClientInfo0, Password}, _From,
conninfo = ConnInfo,
clientinfo = ClientInfo}) ->
ClientInfo1 = enrich_clientinfo(ClientInfo0, ClientInfo),
ConnInfo1 = enrich_conninfo(ClientInfo1, ConnInfo),
ConnInfo1 = enrich_conninfo(ClientInfo0, ConnInfo),
Channel1 = Channel#channel{conninfo = ConnInfo1,
clientinfo = ClientInfo1},
@ -291,18 +293,25 @@ handle_call({auth, ClientInfo0, Password}, _From,
SessFun
) of
{ok, _Session} ->
?LOG(debug, "Client ~ts (Username: '~ts') authorized successfully!",
[ClientId, Username]),
?SLOG(debug, #{ msg => "client_login_succeed"
, clientid => ClientId
, username => Username
}),
{reply, ok, [{event, connected}],
ensure_connected(Channel1#channel{clientinfo = NClientInfo})};
{error, Reason} ->
?LOG(warning, "Client ~ts (Username: '~ts') open session failed for ~0p",
[ClientId, Username, Reason]),
?SLOG(warning, #{ msg => "client_login_failed"
, clientid => ClientId
, username => Username
, reason => Reason
}),
{reply, {error, ?RESP_PERMISSION_DENY, Reason}, Channel}
end;
{error, Reason} ->
?LOG(warning, "Client ~ts (Username: '~ts') login failed for ~0p",
[ClientId, Username, Reason]),
?SLOG(warning, #{ msg => "client_login_failed"
, clientid => ClientId
, username => Username
, reason => Reason}),
{reply, {error, ?RESP_PERMISSION_DENY, Reason}, Channel}
end;
@ -363,7 +372,9 @@ handle_call(kick, _From, Channel) ->
{shutdown, kicked, ok, Channel};
handle_call(Req, _From, Channel) ->
?LOG(warning, "Unexpected call: ~p", [Req]),
?SLOG(warning, #{ msg => "unexpected_call"
, call => Req
}),
{reply, {error, unexpected_call}, Channel}.
-spec handle_cast(any(), channel())
@ -371,7 +382,9 @@ handle_call(Req, _From, Channel) ->
| {ok, replies(), channel()}
| {shutdown, Reason :: term(), channel()}.
handle_cast(Req, Channel) ->
?WARN("Unexpected call: ~p", [Req]),
?SLOG(warning, #{ msg => "unexpected_call"
, call => Req
}),
{ok, Channel}.
-spec handle_info(any(), channel())
@ -383,7 +396,7 @@ handle_info({sock_closed, Reason},
andalso Inflight =:= undefined of
true ->
Channel1 = ensure_disconnected({sock_closed, Reason}, Channel),
{shutdown, {sock_closed, Reason}, Channel1};
{shutdown, Reason, Channel1};
_ ->
%% delayed close process for flushing all callback funcs to gRPC server
Channel1 = Channel#channel{closed_reason = {sock_closed, Reason}},
@ -403,7 +416,9 @@ handle_info({hreply, FunName, {error, Reason}}, Channel) ->
{shutdown, {error, {FunName, Reason}}, Channel};
handle_info(Info, Channel) ->
?LOG(warning, "Unexpected info: ~p", [Info]),
?SLOG(warning, #{ msg => "unexpected_info"
, info => Info
}),
{ok, Channel}.
-spec terminate(any(), channel()) -> channel().

View File

@ -82,22 +82,42 @@ handle_call(_Request, _From, State) ->
handle_cast({rpc, Fun, Req, Options, From}, State = #state{streams = Streams}) ->
case ensure_stream_opened(Fun, Options, Streams) of
{error, Reason} ->
?LOG(error, "CALL ~0p:~0p(~0p) failed, reason: ~0p",
[?CONN_ADAPTER_MOD, Fun, Options, Reason]),
?SLOG(error, #{ msg => "request_grpc_server_failed"
, function => {?CONN_ADAPTER_MOD, Fun, Options}
, reason => Reason}),
reply(From, Fun, {error, Reason}),
{noreply, State#state{streams = Streams#{Fun => undefined}}};
{ok, Stream} ->
case catch grpc_client:send(Stream, Req) of
ok ->
?LOG(debug, "Send to ~p method successfully, request: ~0p", [Fun, Req]),
?SLOG(debug, #{ msg => "send_grpc_request_succeed"
, function => {?CONN_ADAPTER_MOD, Fun}
, request => Req
}),
reply(From, Fun, ok),
{noreply, State#state{streams = Streams#{Fun => Stream}}};
{'EXIT', {not_found, _Stk}} ->
%% Not found the stream, reopen it
?SLOG(info, #{ msg => "cannt_find_old_stream_ref"
, function => {?CONN_ADAPTER_MOD, Fun}
}),
handle_cast(
{rpc, Fun, Req, Options, From},
State#state{streams = maps:remove(Fun, Streams)});
{'EXIT', {timeout, _Stk}} ->
?LOG(error, "Send to ~p method timeout, request: ~0p", [Fun, Req]),
?SLOG(error, #{ msg => "send_grpc_request_timeout"
, function => {?CONN_ADAPTER_MOD, Fun}
, request => Req
}),
reply(From, Fun, {error, timeout}),
{noreply, State#state{streams = Streams#{Fun => Stream}}};
{'EXIT', {Reason1, _Stk}} ->
?LOG(error, "Send to ~p method failure, request: ~0p, stacktrace: ~0p", [Fun, Req, _Stk]),
{'EXIT', {Reason1, Stk}} ->
?SLOG(error, #{ msg => "send_grpc_request_failed"
, function => {?CONN_ADAPTER_MOD, Fun}
, request => Req
, error => Reason1
, stacktrace => Stk
}),
reply(From, Fun, {error, Reason1}),
{noreply, State#state{streams = Streams#{Fun => undefined}}}
end

View File

@ -44,14 +44,20 @@
-> {ok, emqx_exproto_pb:code_response(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
send(Req = #{conn := Conn, bytes := Bytes}, Md) ->
?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
?SLOG(debug, #{ msg => "recv_grpc_function_call"
, function => ?FUNCTION_NAME
, request => Req
}),
{ok, response(call(Conn, {send, Bytes})), Md}.
-spec close(emqx_exproto_pb:close_socket_request(), grpc:metadata())
-> {ok, emqx_exproto_pb:code_response(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
close(Req = #{conn := Conn}, Md) ->
?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
?SLOG(debug, #{ msg => "recv_grpc_function_call"
, function => ?FUNCTION_NAME
, request => Req
}),
{ok, response(call(Conn, close)), Md}.
-spec authenticate(emqx_exproto_pb:authenticate_request(), grpc:metadata())
@ -60,7 +66,10 @@ close(Req = #{conn := Conn}, Md) ->
authenticate(Req = #{conn := Conn,
password := Password,
clientinfo := ClientInfo}, Md) ->
?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
?SLOG(debug, #{ msg => "recv_grpc_function_call"
, function => ?FUNCTION_NAME
, request => Req
}),
case validate(clientinfo, ClientInfo) of
false ->
{ok, response({error, ?RESP_REQUIRED_PARAMS_MISSED}), Md};
@ -73,10 +82,18 @@ authenticate(Req = #{conn := Conn,
| {error, grpc_cowboy_h:error_response()}.
start_timer(Req = #{conn := Conn, type := Type, interval := Interval}, Md)
when Type =:= 'KEEPALIVE' andalso Interval > 0 ->
?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
?SLOG(debug, #{ msg => "recv_grpc_function_call"
, function => ?FUNCTION_NAME
, request => Req
}),
{ok, response(call(Conn, {start_timer, keepalive, Interval})), Md};
start_timer(Req, Md) ->
?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
?SLOG(debug, #{ msg => "recv_grpc_function_call"
, function => ?FUNCTION_NAME
, request => Req
}),
{ok, response({error, ?RESP_PARAMS_TYPE_ERROR}), Md}.
-spec publish(emqx_exproto_pb:publish_request(), grpc:metadata())
@ -84,11 +101,18 @@ start_timer(Req, Md) ->
| {error, grpc_cowboy_h:error_response()}.
publish(Req = #{conn := Conn, topic := Topic, qos := Qos, payload := Payload}, Md)
when ?IS_QOS(Qos) ->
?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
?SLOG(debug, #{ msg => "recv_grpc_function_call"
, function => ?FUNCTION_NAME
, request => Req
}),
{ok, response(call(Conn, {publish, Topic, Qos, Payload})), Md};
publish(Req, Md) ->
?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
?SLOG(debug, #{ msg => "recv_grpc_function_call"
, function => ?FUNCTION_NAME
, request => Req
}),
{ok, response({error, ?RESP_PARAMS_TYPE_ERROR}), Md}.
-spec subscribe(emqx_exproto_pb:subscribe_request(), grpc:metadata())
@ -96,18 +120,27 @@ publish(Req, Md) ->
| {error, grpc_cowboy_h:error_response()}.
subscribe(Req = #{conn := Conn, topic := Topic, qos := Qos}, Md)
when ?IS_QOS(Qos) ->
?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
?SLOG(debug, #{ msg => "recv_grpc_function_call"
, function => ?FUNCTION_NAME
, request => Req
}),
{ok, response(call(Conn, {subscribe_from_client, Topic, Qos})), Md};
subscribe(Req, Md) ->
?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
?SLOG(debug, #{ msg => "recv_grpc_function_call"
, function => ?FUNCTION_NAME
, request => Req
}),
{ok, response({error, ?RESP_PARAMS_TYPE_ERROR}), Md}.
-spec unsubscribe(emqx_exproto_pb:unsubscribe_request(), grpc:metadata())
-> {ok, emqx_exproto_pb:code_response(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
unsubscribe(Req = #{conn := Conn, topic := Topic}, Md) ->
?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]),
?SLOG(debug, #{ msg => "recv_grpc_function_call"
, function => ?FUNCTION_NAME
, request => Req
}),
{ok, response(call(Conn, {unsubscribe_from_client, Topic})), Md}.
%%--------------------------------------------------------------------
@ -130,9 +163,12 @@ call(ConnStr, Req) ->
exit : timeout ->
{error, ?RESP_UNKNOWN, <<"Connection is not answered">>};
Class : Reason : Stk->
?LOG(error, "Call ~p crashed: {~0p, ~0p}, "
"stacktrace: ~0p",
[Class, Reason, Stk]),
?SLOG(error, #{ msg => "call_conn_process_crashed"
, request => Req
, conn_str=> ConnStr
, reason => {Class, Reason}
, stacktrace => Stk
}),
{error, ?RESP_UNKNOWN, <<"Unkwown crashs">>}
end.

View File

@ -62,26 +62,34 @@ start_grpc_server(GwName, Options = #{bind := ListenOn}) ->
_ = grpc:start_server(GwName, ListenOn, Services, SvrOptions),
?ULOG("Start ~ts gRPC server on ~p successfully.~n", [GwName, ListenOn]).
start_grpc_client_channel(_GwType, undefined) ->
stop_grpc_server(GwName) ->
_ = grpc:stop_server(GwName),
?ULOG("Stop ~s gRPC server successfully.~n", [GwName]).
start_grpc_client_channel(_GwName, undefined) ->
undefined;
start_grpc_client_channel(GwName, Options = #{address := UriStr}) ->
UriMap = uri_string:parse(UriStr),
Scheme = maps:get(scheme, UriMap),
Host = maps:get(host, UriMap),
Port = maps:get(port, UriMap),
SvrAddr = lists:flatten(
io_lib:format(
"~ts://~ts:~w", [Scheme, Host, Port])
),
ClientOpts = case Scheme of
"https" ->
SslOpts = maps:to_list(maps:get(ssl, Options, #{})),
#{gun_opts =>
start_grpc_client_channel(GwName, Options = #{address := Address}) ->
{Host, Port} = emqx_gateway_utils:parse_address(Address),
case maps:to_list(maps:get(ssl, Options, #{})) of
[] ->
SvrAddr = compose_http_uri(http, Host, Port),
grpc_client_sup:create_channel_pool(GwName, SvrAddr, #{});
SslOpts ->
ClientOpts = #{gun_opts =>
#{transport => ssl,
transport_opts => SslOpts}};
_ -> #{}
end,
grpc_client_sup:create_channel_pool(GwName, SvrAddr, ClientOpts).
transport_opts => SslOpts}},
SvrAddr = compose_http_uri(https, Host, Port),
grpc_client_sup:create_channel_pool(GwName, SvrAddr, ClientOpts)
end.
compose_http_uri(Scheme, Host, Port) ->
lists:flatten(
io_lib:format(
"~s://~s:~w", [Scheme, Host, Port])).
stop_grpc_client_channel(GwName) ->
_ = grpc_client_sup:stop_channel_pool(GwName),
ok.
on_gateway_load(_Gateway = #{ name := GwName,
config := Config
@ -90,10 +98,12 @@ on_gateway_load(_Gateway = #{ name := GwName,
%% Start grpc client pool & client channel
PoolName = pool_name(GwName),
PoolSize = emqx_vm:schedulers() * 2,
{ok, _} = emqx_pool_sup:start_link(PoolName, hash, PoolSize,
{ok, PoolSup} = emqx_pool_sup:start_link(
PoolName, hash, PoolSize,
{emqx_exproto_gcli, start_link, []}),
_ = start_grpc_client_channel(GwName, maps:get(handler, Config, undefined)),
_ = start_grpc_client_channel(GwName,
maps:get(handler, Config, undefined)
),
%% XXX: How to monitor it ?
_ = start_grpc_server(GwName, maps:get(server, Config, undefined)),
@ -107,7 +117,7 @@ on_gateway_load(_Gateway = #{ name := GwName,
ListenerPids = lists:map(fun(Lis) ->
start_listener(GwName, Ctx, Lis)
end, Listeners),
{ok, ListenerPids, _GwState = #{ctx => Ctx}}.
{ok, ListenerPids, _GwState = #{ctx => Ctx, pool => PoolSup}}.
on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
GwName = maps:get(name, Gateway),
@ -126,8 +136,12 @@ on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
on_gateway_unload(_Gateway = #{ name := GwName,
config := Config
}, _GwState) ->
}, _GwState = #{pool := PoolSup}) ->
Listeners = emqx_gateway_utils:normalize_config(Config),
%% Stop funcs???
exit(PoolSup, kill),
stop_grpc_server(GwName),
stop_grpc_client_channel(GwName),
lists:foreach(fun(Lis) ->
stop_listener(GwName, Lis)
end, Listeners).

View File

@ -49,19 +49,26 @@
clientinfo :: emqx_types:clientinfo(),
%% Session
session :: emqx_lwm2m_session:session() | undefined,
%% Timer
timers :: #{atom() => disable | undefined | reference()},
with_context :: function()
}).
%% TODO:
-define(DEFAULT_OVERRIDE,
#{ clientid => <<"">> %% Generate clientid by default
, username => <<"${Packet.uri_query.ep}">>
, password => <<"">>
}).
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]).
-import(emqx_coap_medium, [reply/2, reply/3, reply/4, iter/3, iter/4]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
info(Channel) ->
maps:from_list(info(?INFO_KEYS, Channel)).
@ -75,7 +82,7 @@ info(conn_state, _) ->
info(clientinfo, #channel{clientinfo = ClientInfo}) ->
ClientInfo;
info(session, #channel{session = Session}) ->
emqx_misc:maybe_apply(fun emqx_session:info/1, Session);
emqx_misc:maybe_apply(fun emqx_lwm2m_session:info/1, Session);
info(clientid, #channel{clientinfo = #{clientid := ClientId}}) ->
ClientId;
info(ctx, #channel{ctx = Ctx}) ->
@ -114,10 +121,10 @@ init(ConnInfo = #{peername := {PeerHost, _},
, clientinfo = ClientInfo
, timers = #{}
, session = emqx_lwm2m_session:new()
%% FIXME: don't store anonymouse func
, with_context = with_context(Ctx, ClientInfo)
}.
with_context(Ctx, ClientInfo) ->
fun(Type, Topic) ->
with_context(Type, Topic, Ctx, ClientInfo)
@ -165,14 +172,18 @@ handle_call({lookup_cmd, Path, Type}, _From, #channel{session = Session} = Chann
{reply, {ok, Result}, Channel};
handle_call(Req, _From, Channel) ->
?LOG(error, "Unexpected call: ~p", [Req]),
?SLOG(error, #{ msg => "unexpected_call"
, call => Req
}),
{reply, ignored, Channel}.
%%--------------------------------------------------------------------
%% Handle Cast
%%--------------------------------------------------------------------
handle_cast(Req, Channel) ->
?LOG(error, "Unexpected cast: ~p", [Req]),
?SLOG(error, #{ msg => "unexpected_cast"
, cast => Req
}),
{ok, Channel}.
%%--------------------------------------------------------------------
@ -183,7 +194,9 @@ handle_info({subscribe, _AutoSubs}, Channel) ->
{ok, Channel};
handle_info(Info, Channel) ->
?LOG(error, "Unexpected info: ~p", [Info]),
?SLOG(error, #{ msg => "unexpected_info"
, info => Info
}),
{ok, Channel}.
%%--------------------------------------------------------------------
@ -276,7 +289,9 @@ check_lwm2m_version(#coap_message{options = Opts},
},
{ok, Channel#channel{conninfo = NConnInfo}};
true ->
?LOG(error, "Reject REGISTER due to unsupported version: ~0p", [Ver]),
?SLOG(error, #{ msg => "reject_REGISTRE_request"
, reason => {unsupported_version, Ver}
}),
{error, "invalid lwm2m version", Channel}
end.
@ -293,18 +308,22 @@ enrich_clientinfo(#coap_message{options = Options} = Msg,
Channel = #channel{clientinfo = ClientInfo0}) ->
Query = maps:get(uri_query, Options, #{}),
case Query of
#{<<"ep">> := Epn} ->
UserName = maps:get(<<"imei">>, Query, Epn),
#{<<"ep">> := Epn, <<"lt">> := Lifetime} ->
Username = maps:get(<<"imei">>, Query, Epn),
Password = maps:get(<<"password">>, Query, undefined),
ClientId = maps:get(<<"device_id">>, Query, Epn),
ClientInfo =
ClientInfo0#{username => UserName,
ClientInfo0#{endpoint_name => Epn,
lifetime => binary_to_integer(Lifetime),
username => Username,
password => Password,
clientid => ClientId},
{ok, NClientInfo} = fix_mountpoint(Msg, ClientInfo),
{ok, Channel#channel{clientinfo = NClientInfo}};
_ ->
?LOG(error, "Reject REGISTER due to wrong parameters, Query=~p", [Query]),
?SLOG(error, #{ msg => "reject_REGISTER_request"
, reason => {wrong_paramters, Query}
}),
{error, "invalid queries", Channel}
end.
@ -320,8 +339,11 @@ auth_connect(_Input, Channel = #channel{ctx = Ctx,
{ok, Channel#channel{clientinfo = NClientInfo,
with_context = with_context(Ctx, ClientInfo)}};
{error, Reason} ->
?LOG(warning, "Client ~ts (Username: '~ts') login failed for ~0p",
[ClientId, Username, Reason]),
?SLOG(warning, #{ msg => "client_login_failed"
, clientid => ClientId
, username => Username
, reason => Reason
}),
{error, Reason}
end.
@ -356,10 +378,18 @@ process_connect(Channel = #channel{ctx = Ctx,
) of
{ok, _} ->
Mountpoint = maps:get(mountpoint, ClientInfo, <<>>),
NewResult = emqx_lwm2m_session:init(Msg, Mountpoint, WithContext, Session),
iter(Iter, maps:merge(Result, NewResult), Channel);
NewResult0 = emqx_lwm2m_session:init(
Msg,
Mountpoint,
WithContext,
Session
),
NewResult1 = NewResult0#{events => [{event, connected}]},
iter(Iter, maps:merge(Result, NewResult1), Channel);
{error, Reason} ->
?LOG(error, "Failed to open session du to ~p", [Reason]),
?SLOG(error, #{ msg => "falied_to_open_session"
, reason => Reason
}),
iter(Iter, reply({error, bad_request}, Msg, Result), Channel)
end.
@ -383,17 +413,24 @@ with_context(publish, [Topic, Msg], Ctx, ClientInfo) ->
allow ->
emqx:publish(Msg);
_ ->
?LOG(error, "topic:~p not allow to publish ", [Topic])
?SLOG(error, #{ msg => "publish_denied"
, topic => Topic
})
end;
with_context(subscribe, [Topic, Opts], Ctx, #{username := UserName} = ClientInfo) ->
with_context(subscribe, [Topic, Opts], Ctx, #{username := Username} = ClientInfo) ->
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, Topic) of
allow ->
run_hooks(Ctx, 'session.subscribed', [ClientInfo, Topic, UserName]),
?LOG(debug, "Subscribe topic: ~0p, Opts: ~0p, EndpointName: ~0p", [Topic, Opts, UserName]),
emqx:subscribe(Topic, UserName, Opts);
run_hooks(Ctx, 'session.subscribed', [ClientInfo, Topic, Opts]),
?SLOG(debug, #{ msg => "subscribe_topic_succeed"
, topic => Topic
, endpoint_name => Username
}),
emqx:subscribe(Topic, Username, Opts);
_ ->
?LOG(error, "Topic: ~0p not allow to subscribe", [Topic])
?SLOG(error, #{ msg => "subscribe_denied"
, topic => Topic
})
end;
with_context(metrics, Name, Ctx, _ClientInfo) ->
@ -479,14 +516,15 @@ process_out(Outs, Result, Channel, _) ->
Reply ->
[Reply | Outs2]
end,
{ok, {outgoing, Outs3}, Channel}.
Events = maps:get(events, Result, []),
{ok, [{outgoing, Outs3}] ++ Events, Channel}.
process_reply(Reply, Result, #channel{session = Session} = Channel, _) ->
Session2 = emqx_lwm2m_session:set_reply(Reply, Session),
Outs = maps:get(out, Result, []),
Outs2 = lists:reverse(Outs),
{ok, {outgoing, [Reply | Outs2]}, Channel#channel{session = Session2}}.
Events = maps:get(events, Result, []),
{ok, [{outgoing, [Reply | Outs2]}] ++ Events, Channel#channel{session = Session2}}.
process_lifetime(_, Result, Channel, Iter) ->
iter(Iter, Result, update_life_timer(Channel)).

View File

@ -168,7 +168,10 @@ read_resp_to_mqtt({ok, SuccessCode}, CoapPayload, Format, Ref) ->
catch
error:not_implemented -> make_response(not_implemented, Ref);
_:Ex:_ST ->
?LOG(error, "~0p, bad payload format: ~0p", [Ex, CoapPayload]),
?SLOG(error, #{ msg => "bad_payload_format"
, payload => CoapPayload
, reason => Ex
, stacktrace => _ST}),
make_response(bad_request, Ref)
end.

View File

@ -51,21 +51,21 @@ on_gateway_load(_Gateway = #{ name := GwName,
config := Config
}, Ctx) ->
%% Xml registry
{ok, _} = emqx_lwm2m_xml_object_db:start_link(maps:get(xml_dir, Config)),
{ok, RegPid} = emqx_lwm2m_xml_object_db:start_link(maps:get(xml_dir, Config)),
Listeners = emqx_gateway_utils:normalize_config(Config),
ListenerPids = lists:map(fun(Lis) ->
start_listener(GwName, Ctx, Lis)
end, Listeners),
{ok, ListenerPids, _GwState = #{ctx => Ctx}}.
{ok, ListenerPids, _GwState = #{ctx => Ctx, registry => RegPid}}.
on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) ->
GwName = maps:get(name, NewGateway),
on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
GwName = maps:get(name, Gateway),
try
%% XXX: 1. How hot-upgrade the changes ???
%% XXX: 2. Check the New confs first before destroy old instance ???
on_gateway_unload(OldGateway, GwState),
on_gateway_load(NewGateway, Ctx)
on_gateway_unload(Gateway, GwState),
on_gateway_load(Gateway#{config => Config}, Ctx)
catch
Class : Reason : Stk ->
logger:error("Failed to update ~ts; "
@ -76,7 +76,8 @@ on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) ->
on_gateway_unload(_Gateway = #{ name := GwName,
config := Config
}, _GwState) ->
}, _GwState = #{registry := RegPid}) ->
exit(RegPid, kill),
Listeners = emqx_gateway_utils:normalize_config(Config),
lists:foreach(fun(Lis) ->
stop_listener(GwName, Lis)

View File

@ -25,8 +25,6 @@
-include("emqx_lwm2m.hrl").
-define(LOG(Level, Format, Args), logger:Level("LWM2M-JSON: " ++ Format, Args)).
tlv_to_json(BaseName, TlvData) ->
DecodedTlv = emqx_lwm2m_tlv:parse(TlvData),
ObjectId = object_id(BaseName),

View File

@ -62,6 +62,7 @@
, is_cache_mode :: boolean()
, mountpoint :: binary()
, last_active_at :: non_neg_integer()
, created_at :: non_neg_integer()
, cmd_record :: cmd_record()
}).
@ -109,6 +110,7 @@ new() ->
#session{ coap = emqx_coap_tm:new()
, queue = queue:new()
, last_active_at = ?NOW
, created_at = erlang:system_time(millisecond)
, is_cache_mode = false
, mountpoint = <<>>
, cmd_record = #{}
@ -206,7 +208,7 @@ info(awaiting_rel_max, _) ->
infinity;
info(await_rel_timeout, _) ->
infinity;
info(created_at, #session{last_active_at = CreatedAt}) ->
info(created_at, #session{created_at = CreatedAt}) ->
CreatedAt.
%% @doc Get stats of the session.
@ -343,7 +345,7 @@ update(#coap_message{options = Opts, payload = Payload} = Msg,
WithContext,
CmdType,
#session{reg_info = OldRegInfo} = Session) ->
Query = maps:get(uri_query, Opts),
Query = maps:get(uri_query, Opts, #{}),
RegInfo = append_object_list(Query, Payload),
UpdateRegInfo = maps:merge(OldRegInfo, RegInfo),
LifeTime = get_lifetime(UpdateRegInfo, OldRegInfo),
@ -403,7 +405,7 @@ send_auto_observe(RegInfo, Session) ->
ObjectList = maps:get(<<"objectList">>, RegInfo, []),
observe_object_list(AlternatePath, ObjectList, Session);
_ ->
?LOG(info, "Auto Observe Disabled", []),
?SLOG(info, #{ msg => "skip_auto_observe_due_to_disabled"}),
Session
end.
@ -433,7 +435,10 @@ observe_object(AlternatePath, ObjectPath, Session) ->
deliver_auto_observe_to_coap(AlternatePath, Payload, Session).
deliver_auto_observe_to_coap(AlternatePath, TermData, Session) ->
?LOG(info, "Auto Observe, SEND To CoAP, AlternatePath=~0p, Data=~0p ", [AlternatePath, TermData]),
?SLOG(info, #{ msg => "send_auto_observe"
, path => AlternatePath
, data => TermData
}),
{Req, Ctx} = emqx_lwm2m_cmd:mqtt_to_coap(AlternatePath, TermData),
maybe_do_deliver_to_coap(Ctx, Req, 0, false, Session).
@ -566,11 +571,15 @@ send_to_coap(#session{queue = Queue} = Session) ->
end.
send_to_coap(Ctx, Req, Session) ->
?LOG(debug, "Deliver To CoAP, CoapRequest: ~0p", [Req]),
?SLOG(debug, #{ msg => "deliver_to_coap"
, coap_request => Req
}),
out_to_coap(Ctx, Req, Session#session{wait_ack = Ctx}).
send_msg_not_waiting_ack(Ctx, Req, Session) ->
?LOG(debug, "Deliver To CoAP not waiting ack, CoapRequest: ~0p", [Req]),
?SLOG(debug, #{ msg => "deliver_to_coap_and_no_ack"
, coap_request => Req
}),
%% cmd_sent(Ref, LwM2MOpts).
out_to_coap(Ctx, Req, Session).
@ -598,6 +607,7 @@ proto_publish(Topic, Payload, Qos, Headers, WithContext,
mount(Topic, #session{mountpoint = MountPoint}) when is_binary(Topic) ->
<<MountPoint/binary, Topic/binary>>.
%% XXX: get these confs from params instead of shared mem
downlink_topic() ->
emqx:get_config([gateway, lwm2m, translators, command]).
@ -633,8 +643,11 @@ deliver_to_coap(AlternatePath, JsonData, MQTT, CacheMode, WithContext, Session)
deliver_to_coap(AlternatePath, TermData, MQTT, CacheMode, WithContext, Session)
catch
ExClass:Error:ST ->
?LOG(error, "deliver_to_coap - Invalid JSON: ~0p, Exception: ~0p, stacktrace: ~0p",
[JsonData, {ExClass, Error}, ST]),
?SLOG(error, #{ msg => "invaild_json_format_to_deliver"
, data => JsonData
, reason => {ExClass, Error}
, stacktrace => ST
}),
WithContext(metrics, 'delivery.dropped'),
Session
end;

View File

@ -27,8 +27,6 @@
-include("emqx_lwm2m.hrl").
-define(LOG(Level, Format, Args), logger:Level("LWM2M-TLV: " ++ Format, Args)).
-define(TLV_TYPE_OBJECT_INSTANCE, 0).
-define(TLV_TYPE_RESOURCE_INSTANCE, 1).
-define(TLV_TYPE_MULTIPLE_RESOURCE, 2).
@ -39,8 +37,6 @@
-define(TLV_LEGNTH_16_BIT, 2).
-define(TLV_LEGNTH_24_BIT, 3).
%----------------------------------------------------------------------------------------------------------------------------------------
% [#{tlv_object_instance := Id11, value := Value11}, #{tlv_object_instance := Id12, value := Value12}, ...]
% where Value11 and Value12 is a list:

View File

@ -28,9 +28,6 @@
, get_resource_operations/2
]).
-define(LOG(Level, Format, Args),
logger:Level("LWM2M-OBJ: " ++ Format, Args)).
% This module is for future use. Disabled now.
get_obj_def(ObjectIdInt, true) ->
@ -50,7 +47,6 @@ get_object_and_resource_id(ResourceNameBinary, ObjDefinition) ->
ResourceNameString = binary_to_list(ResourceNameBinary),
[#xmlText{value=ObjectId}] = xmerl_xpath:string("ObjectID/text()", ObjDefinition),
[#xmlAttribute{value=ResourceId}] = xmerl_xpath:string("Resources/Item/Name[.=\""++ResourceNameString++"\"]/../@ID", ObjDefinition),
?LOG(debug, "get_object_and_resource_id ObjectId=~p, ResourceId=~p", [ObjectId, ResourceId]),
{ObjectId, ResourceId}.
get_resource_type(ResourceIdInt, ObjDefinition) ->

View File

@ -17,6 +17,7 @@
-module(emqx_lwm2m_xml_object_db).
-include_lib("xmerl/include/xmerl.hrl").
-include_lib("emqx/include/logger.hrl").
-include("emqx_lwm2m.hrl").
% This module is for future use. Disabled now.
@ -37,9 +38,6 @@
, code_change/3
]).
-define(LOG(Level, Format, Args),
logger:Level("LWM2M-OBJ-DB: " ++ Format, Args)).
-define(LWM2M_OBJECT_DEF_TAB, lwm2m_object_def_tab).
-define(LWM2M_OBJECT_NAME_TO_ID_TAB, lwm2m_object_name_to_id_tab).
@ -130,7 +128,11 @@ load_loop([FileName|T]) ->
[#xmlText{value=Name}] = xmerl_xpath:string("Name/text()", ObjectXml),
ObjectId = list_to_integer(ObjectIdString),
NameBinary = list_to_binary(Name),
?LOG(debug, "load_loop FileName=~p, ObjectId=~p, Name=~p", [FileName, ObjectId, NameBinary]),
?SLOG(debug, #{ msg => "load_object_succeed"
, filename => FileName
, object_id => ObjectId
, object_name => NameBinary
}),
ets:insert(?LWM2M_OBJECT_DEF_TAB, {ObjectId, ObjectXml}),
ets:insert(?LWM2M_OBJECT_NAME_TO_ID_TAB, {NameBinary, ObjectId}),
load_loop(T).

View File

@ -57,18 +57,24 @@ init([GwId, Port]) ->
sock = Sock, port = Port, duration = Duration})}.
handle_call(Req, _From, State) ->
?LOG(error, "Unexpected request: ~p", [Req]),
?SLOG(error, #{ msg => "unexpected_call"
, call => Req
}),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?LOG(error, "Unexpected msg: ~p", [Msg]),
?SLOG(error, #{ msg => "unexpected_cast"
, cast => Msg
}),
{noreply, State}.
handle_info(broadcast_advertise, State) ->
{noreply, ensure_advertise(State), hibernate};
handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]),
?SLOG(error, #{ msg => "unexpected_info"
, info => Info
}),
{noreply, State}.
terminate(_Reason, #state{tref = Timer}) ->
@ -90,7 +96,9 @@ send_advertise(#state{gwid = GwId, sock = Sock, port = Port,
addrs = Addrs, duration = Duration}) ->
Data = emqx_sn_frame:serialize_pkt(?SN_ADVERTISE_MSG(GwId, Duration), #{}),
lists:foreach(fun(Addr) ->
?LOG(debug, "SEND SN_ADVERTISE to ~p~n", [Addr]),
?SLOG(debug, #{ msg => "send_ADVERTISE_msg"
, address => Addr
}),
gen_udp:send(Sock, Addr, Port, Data)
end, Addrs).

View File

@ -287,8 +287,11 @@ auth_connect(_Packet, Channel = #channel{ctx = Ctx,
{ok, NClientInfo} ->
{ok, Channel#channel{clientinfo = NClientInfo}};
{error, Reason} ->
?LOG(warning, "Client ~ts (Username: '~ts') login failed for ~0p",
[ClientId, Username, Reason]),
?SLOG(warning, #{ msg => "client_login_failed"
, clientid => ClientId
, username => Username
, reason => Reason
}),
%% FIXME: ReasonCode?
{error, Reason}
end.
@ -321,7 +324,9 @@ process_connect(Channel = #channel{
handle_out(connack, ?SN_RC_ACCEPTED,
Channel#channel{session = Session});
{error, Reason} ->
?LOG(error, "Failed to open session due to ~p", [Reason]),
?SLOG(error, #{ msg => "failed_to_open_session"
, reason => Reason
}),
handle_out(connack, ?SN_RC_FAILED_SESSION, Channel)
end.
@ -383,19 +388,24 @@ handle_in(?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1,
false ->
ok
end,
?LOG(debug, "Client id=~p receives a publish with QoS=-1 in idle mode!",
[?NEG_QOS_CLIENT_ID]),
?SLOG(debug, #{ msg => "receive_qo3_message_in_idle_mode"
, topic => TopicName
, data => Data
}),
{ok, Channel};
handle_in(Pkt = #mqtt_sn_message{type = Type},
Channel = #channel{conn_state = idle})
when Type /= ?SN_CONNECT ->
?LOG(warning, "Receive unknown packet ~0p in idle state", [Pkt]),
?SLOG(warning, #{ msg => "receive_unknown_packet_in_idle_state"
, packet => Pkt
}),
shutdown(normal, Channel);
handle_in(?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId),
Channel = #channel{conn_state = connecting}) ->
?LOG(warning, "Receive connect packet in connecting state"),
?SLOG(warning, #{ msg => "receive_connect_packet_in_connecting_state"
}),
{ok, Channel};
handle_in(?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId),
@ -461,12 +471,17 @@ handle_in(?SN_REGISTER_MSG(_TopicId, MsgId, TopicName),
clientinfo = #{clientid := ClientId}}) ->
case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of
TopicId when is_integer(TopicId) ->
?LOG(debug, "register TopicName=~p, TopicId=~p",
[TopicName, TopicId]),
?SLOG(debug, #{ msg => "registered_topic_name"
, topic_name => TopicName
, topic_id => TopicId
}),
AckPacket = ?SN_REGACK_MSG(TopicId, MsgId, ?SN_RC_ACCEPTED),
{ok, {outgoing, AckPacket}, Channel};
{error, too_large} ->
?LOG(error, "TopicId is full! TopicName=~p", [TopicName]),
?SLOG(error, #{ msg => "register_topic_failed"
, topic_name => TopicName
, reason => topic_id_fulled
}),
AckPacket = ?SN_REGACK_MSG(
?SN_INVALID_TOPIC_ID,
MsgId,
@ -474,8 +489,10 @@ handle_in(?SN_REGISTER_MSG(_TopicId, MsgId, TopicName),
),
{ok, {outgoing, AckPacket}, Channel};
{error, wildcard_topic} ->
?LOG(error, "wildcard topic can not be registered! TopicName=~p",
[TopicName]),
?SLOG(error, #{ msg => "register_topic_failed"
, topic_name => TopicName
, reason => not_support_wildcard_topic
}),
AckPacket = ?SN_REGACK_MSG(
?SN_INVALID_TOPIC_ID,
MsgId,
@ -520,13 +537,17 @@ handle_in(?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode),
Publishes,
Channel#channel{session = NSession});
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
?LOG(warning, "The PUBACK MsgId ~w is inuse.",
[MsgId]),
?SLOG(warning, #{ msg => "commit_puback_failed"
, msg_id => MsgId
, reason => msg_id_inused
}),
ok = metrics_inc(Ctx, 'packets.puback.inuse'),
{ok, Channel};
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
?LOG(warning, "The PUBACK MsgId ~w is not found.",
[MsgId]),
?SLOG(warning, #{ msg => "commit_puback_failed"
, msg_id => MsgId
, reason => not_found
}),
ok = metrics_inc(Ctx, 'packets.puback.missed'),
{ok, Channel}
end;
@ -543,7 +564,9 @@ handle_in(?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode),
{ok, {outgoing, RegPkt}, Channel}
end;
_ ->
?LOG(error, "CAN NOT handle PUBACK ReturnCode=~p", [ReturnCode]),
?SLOG(error, #{ msg => "cannt_handle_PUBACK"
, return_code => ReturnCode
}),
{ok, Channel}
end;
@ -557,11 +580,17 @@ handle_in(?SN_PUBREC_MSG(?SN_PUBREC, MsgId),
NChannel = Channel#channel{session = NSession},
handle_out(pubrel, MsgId, NChannel);
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
?LOG(warning, "The PUBREC MsgId ~w is inuse.", [MsgId]),
?SLOG(warning, #{ msg => "commit_PUBREC_failed"
, msg_id => MsgId
, reason => msg_id_inused
}),
ok = metrics_inc(Ctx, 'packets.pubrec.inuse'),
handle_out(pubrel, MsgId, Channel);
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
?LOG(warning, "The PUBREC ~w is not found.", [MsgId]),
?SLOG(warning, #{ msg => "commit_PUBREC_failed"
, msg_id => MsgId
, reason => not_found
}),
ok = metrics_inc(Ctx, 'packets.pubrec.missed'),
handle_out(pubrel, MsgId, Channel)
end;
@ -573,7 +602,10 @@ handle_in(?SN_PUBREC_MSG(?SN_PUBREL, MsgId),
NChannel = Channel#channel{session = NSession},
handle_out(pubcomp, MsgId, NChannel);
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
?LOG(warning, "The PUBREL MsgId ~w is not found.", [MsgId]),
?SLOG(warning, #{ msg => "commit_PUBREL_failed"
, msg_id => MsgId
, reason => not_found
}),
ok = metrics_inc(Ctx, 'packets.pubrel.missed'),
handle_out(pubcomp, MsgId, Channel)
end;
@ -587,10 +619,17 @@ handle_in(?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId),
handle_out(publish, Publishes,
Channel#channel{session = NSession});
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
?SLOG(warning, #{ msg => "commit_PUBCOMP_failed"
, msg_id => MsgId
, reason => msg_id_inused
}),
ok = metrics_inc(Ctx, 'packets.pubcomp.inuse'),
{ok, Channel};
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
?LOG(warning, "The PUBCOMP MsgId ~w is not found", [MsgId]),
?SLOG(warning, #{ msg => "commit_PUBCOMP_failed"
, msg_id => MsgId
, reason => not_found
}),
ok = metrics_inc(Ctx, 'packets.pubcomp.missed'),
{ok, Channel}
end;
@ -626,8 +665,10 @@ handle_in(UnsubPkt = ?SN_UNSUBSCRIBE_MSG(_, MsgId, TopicIdOrName),
UnsubAck = ?SN_UNSUBACK_MSG(MsgId),
{ok, outgoing_and_update(UnsubAck), NChannel};
{error, Reason, NChannel} ->
?LOG(warning, "Unsubscribe ~p failed: ~0p",
[TopicIdOrName, Reason]),
?SLOG(warning, #{ msg => "unsubscribe_failed"
, topic => TopicIdOrName
, reason => Reason
}),
%% XXX: Even if it fails, the reply is successful.
UnsubAck = ?SN_UNSUBACK_MSG(MsgId),
{ok, {outgoing, UnsubAck}, NChannel}
@ -674,7 +715,9 @@ handle_in(?SN_WILLMSGUPD_MSG(Payload),
handle_in({frame_error, Reason},
Channel = #channel{conn_state = _ConnState}) ->
?LOG(error, "Unexpected frame error: ~p", [Reason]),
?SLOG(error, #{ msg => "unexpected_frame_error"
, reason => Reason
}),
shutdown(Reason, Channel).
after_message_acked(ClientInfo, Msg, #channel{ctx = Ctx}) ->
@ -689,13 +732,15 @@ outgoing_and_update(Pkt) ->
%%--------------------------------------------------------------------
%% Handle Publish
check_qos3_enable(?SN_PUBLISH_MSG(Flags, _, _, _),
check_qos3_enable(?SN_PUBLISH_MSG(Flags, TopicId, _MsgId, Data),
#channel{enable_qos3 = EnableQoS3}) ->
#mqtt_sn_flags{qos = QoS} = Flags,
case EnableQoS3 =:= false andalso QoS =:= ?QOS_NEG1 of
true ->
?LOG(debug, "The enable_qos3 is false, ignore the received "
"publish with QoS=-1 in connected mode!"),
?SLOG(debug, #{ msg => "ignore_msg_due_to_qos3_disabled"
, topic_id => TopicId
, data => Data
}),
{error, ?SN_RC_NOT_SUPPORTED};
false ->
ok
@ -781,8 +826,9 @@ do_publish(TopicId, MsgId, Msg = #message{qos = ?QOS_2},
handle_out(puback , {TopicId, MsgId, ?SN_RC_NOT_SUPPORTED},
Channel);
{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} ->
?LOG(warning, "Dropped the qos2 packet ~w "
"due to awaiting_rel is full.", [MsgId]),
?SLOG(warning, #{ msg => "dropped_the_qos2_packet_due_to_awaiting_rel_full"
, msg_id => MsgId
}),
ok = metrics_inc(Ctx, 'packets.publish.dropped'),
handle_out(puback, {TopicId, MsgId, ?SN_RC_CONGESTION}, Channel)
end.
@ -860,8 +906,10 @@ run_client_subs_hook({TopicId, TopicName, QoS},
case run_hooks(Ctx, 'client.subscribe',
[ClientInfo, #{}], TopicFilters) of
[] ->
?LOG(warning, "Skip to subscribe ~ts, "
"due to 'client.subscribe' denied!", [TopicName]),
?SLOG(warning, #{ msg => "skip_to_subscribe"
, topic_name => TopicName
, reason => "'client.subscribe' filtered it"
}),
{error, ?SN_EXCEED_LIMITATION};
[{NTopicName, NSubOpts}|_] ->
{ok, {TopicId, NTopicName, NSubOpts}, Channel}
@ -879,8 +927,10 @@ do_subscribe({TopicId, TopicName, SubOpts},
{ok, {TopicId, NTopicName, NSubOpts},
Channel#channel{session = NSession}};
{error, ?RC_QUOTA_EXCEEDED} ->
?LOG(warning, "Cannot subscribe ~ts due to ~ts.",
[TopicName, emqx_reason_codes:text(?RC_QUOTA_EXCEEDED)]),
?SLOG(warning, #{ msg => "cannt_subscribe_due_to_quota_exceeded"
, topic_name => TopicName
, reason => emqx_reason_codes:text(?RC_QUOTA_EXCEEDED)
}),
{error, ?SN_EXCEED_LIMITATION}
end.
@ -1185,7 +1235,9 @@ handle_call(discard, _From, Channel) ->
% reply(ok, Channel#channel{quota = Quota});
handle_call(Req, _From, Channel) ->
?LOG(error, "Unexpected call: ~p", [Req]),
?SLOG(error, #{ msg => "unexpected_call"
, call => Req
}),
reply(ignored, Channel).
%%--------------------------------------------------------------------
@ -1225,7 +1277,9 @@ handle_info({sock_closed, Reason},
handle_info({sock_closed, Reason},
Channel = #channel{conn_state = disconnected}) ->
?LOG(error, "Unexpected sock_closed: ~p", [Reason]),
?SLOG(error, #{ msg => "unexpected_sock_closed"
, reason => Reason
}),
{ok, Channel};
handle_info(clean_authz_cache, Channel) ->
@ -1233,7 +1287,9 @@ handle_info(clean_authz_cache, Channel) ->
{ok, Channel};
handle_info(Info, Channel) ->
?LOG(error, "Unexpected info: ~p", [Info]),
?SLOG(error, #{ msg => "unexpected_info"
, info => Info
}),
{ok, Channel}.
%%--------------------------------------------------------------------
@ -1389,7 +1445,9 @@ handle_timeout(_TRef, expire_asleep, Channel) ->
shutdown(asleep_timeout, Channel);
handle_timeout(_TRef, Msg, Channel) ->
?LOG(error, "Unexpected timeout: ~p~n", [Msg]),
?SLOG(error, #{ msg => "unexpected_timeout"
, timeout_msg => Msg
}),
{ok, Channel}.
%%--------------------------------------------------------------------

View File

@ -22,9 +22,7 @@
-behaviour(gen_server).
-include("emqx_sn.hrl").
-define(LOG(Level, Format, Args),
emqx_logger:Level("MQTT-SN(registry): " ++ Format, Args)).
-include_lib("emqx/include/logger.hrl").
-export([ start_link/2
]).
@ -215,15 +213,21 @@ handle_call(name, _From, State = #state{tabname = Tab}) ->
{reply, {Tab, self()}, State};
handle_call(Req, _From, State) ->
?LOG(error, "Unexpected request: ~p", [Req]),
?SLOG(error, #{ msg => "unexpected_call"
, call => Req
}),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?LOG(error, "Unexpected msg: ~p", [Msg]),
?SLOG(error, #{ msg => "unexpected_cast"
, cast => Msg
}),
{noreply, State}.
handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]),
?SLOG(error, #{ msg => "unexpected_info"
, info => Info
}),
{noreply, State}.
terminate(_Reason, _State) ->

View File

@ -280,8 +280,11 @@ auth_connect(_Packet, Channel = #channel{ctx = Ctx,
{ok, NClientInfo} ->
{ok, Channel#channel{clientinfo = NClientInfo}};
{error, Reason} ->
?LOG(warning, "Client ~ts (Username: '~ts') login failed for ~0p",
[ClientId, Username, Reason]),
?SLOG(warning, #{ msg => "client_login_failed"
, clientid => ClientId
, username => Username
, reason => Reason
}),
{error, Reason}
end.
@ -315,7 +318,9 @@ process_connect(Channel = #channel{
{<<"heart-beat">>, reverse_heartbeats(Heartbeat)}],
handle_out(connected, Headers, Channel#channel{session = Session});
{error, Reason} ->
?LOG(error, "Failed to open session du to ~p", [Reason]),
?SLOG(error, #{ msg => "failed_to_open_session"
, reason => Reason
}),
Headers = [{<<"version">>, <<"1.0,1.1,1.2">>},
{<<"content-type">>, <<"text/plain">>}],
handle_out(connerr, {Headers, undefined, <<"Not Authenticated">>}, Channel)
@ -403,8 +408,10 @@ handle_in(?PACKET(?CMD_SUBSCRIBE, Headers),
handle_out(receipt, receipt_id(Headers), NChannel1)
end;
{error, ErrMsg, NChannel} ->
?LOG(error, "Failed to subscribe topic ~ts, reason: ~ts",
[Topic, ErrMsg]),
?SLOG(error, #{ msg => "failed_top_subscribe_topic"
, topic => Topic
, reason => ErrMsg
}),
handle_out(error, {receipt_id(Headers), ErrMsg}, NChannel)
end;
@ -507,7 +514,9 @@ handle_in(?PACKET(?CMD_DISCONNECT, Headers), Channel) ->
shutdown_with_recepit(normal, receipt_id(Headers), Channel);
handle_in({frame_error, Reason}, Channel = #channel{conn_state = _ConnState}) ->
?LOG(error, "Unexpected frame error: ~p", [Reason]),
?SLOG(error, #{ msg => "unexpected_frame_error"
, reason => Reason
}),
shutdown(Reason, Channel).
with_transaction(Headers, Channel = #channel{transaction = Trans}, Fun) ->
@ -653,8 +662,10 @@ handle_call({subscribe, Topic, SubOpts}, _From,
NChannel1 = NChannel#channel{subscriptions = NSubs},
reply(ok, NChannel1);
{error, ErrMsg, NChannel} ->
?LOG(error, "Failed to subscribe topic ~ts, reason: ~ts",
[Topic, ErrMsg]),
?SLOG(error, #{ msg => "failed_to_subscribe_topic"
, topic => Topic
, reason => ErrMsg
}),
reply({error, ErrMsg}, NChannel)
end
end;
@ -715,7 +726,9 @@ handle_call(list_authz_cache, _From, Channel) ->
% reply(ok, Channel#channel{quota = Quota});
handle_call(Req, _From, Channel) ->
?LOG(error, "Unexpected call: ~p", [Req]),
?SLOG(error, #{ msg => "unexpected_call"
, call => Req
}),
reply(ignored, Channel).
%%--------------------------------------------------------------------
@ -755,7 +768,9 @@ handle_info({sock_closed, Reason},
handle_info({sock_closed, Reason},
Channel = #channel{conn_state = disconnected}) ->
?LOG(error, "Unexpected sock_closed: ~p", [Reason]),
?SLOG(error, #{ msg => "unexpected_sock_closed"
, reason => Reason
}),
{ok, Channel};
handle_info(clean_authz_cache, Channel) ->
@ -763,7 +778,9 @@ handle_info(clean_authz_cache, Channel) ->
{ok, Channel};
handle_info(Info, Channel) ->
?LOG(error, "Unexpected info: ~p", [Info]),
?SLOG(error, #{ msg => "unexpected_info"
, info => Info
}),
{ok, Channel}.
%%--------------------------------------------------------------------
@ -828,9 +845,10 @@ handle_deliver(Delivers,
},
[Frame|Acc];
false ->
?LOG(error, "Dropped message ~0p due to not found "
"subscription id for ~ts",
[Message, emqx_message:topic(Message)]),
?SLOG(error, #{ msg => "dropped_message_due_to_subscription_not_found"
, message => Message
, topic => emqx_message:topic(Message)
}),
metrics_inc('delivery.dropped', Channel),
metrics_inc('delivery.dropped.no_subid', Channel),
Acc

View File

@ -58,6 +58,7 @@ set_special_cfg(_) ->
ok.
end_per_suite(Config) ->
{ok, _} = emqx:remove_config([<<"gateway">>,<<"coap">>]),
emqx_common_test_helpers:stop_apps([emqx_gateway]),
Config.

View File

@ -57,6 +57,7 @@ init_per_suite(Config) ->
Config.
end_per_suite(Config) ->
{ok, _} = emqx:remove_config([<<"gateway">>,<<"coap">>]),
emqx_mgmt_api_test_util:end_suite([emqx_gateway]),
Config.

View File

@ -60,6 +60,7 @@ init_per_group(GrpName, Cfg) ->
[{servers, Svrs}, {listener_type, GrpName} | Cfg].
end_per_group(_, Cfg) ->
{ok, _} = emqx:remove_config([gateway, exproto]),
emqx_common_test_helpers:stop_apps([emqx_gateway]),
emqx_exproto_echo_svr:stop(proplists:get_value(servers, Cfg)).
@ -68,7 +69,7 @@ set_special_cfg(emqx_gateway) ->
emqx_config:put(
[gateway, exproto],
#{server => #{bind => 9100},
handler => #{address => "http://127.0.0.1:9001"},
handler => #{address => "127.0.0.1:9001"},
listeners => listener_confs(LisType)
});
set_special_cfg(_App) ->

View File

@ -0,0 +1,274 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_gateway_api_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-import(emqx_gateway_test_utils,
[ assert_confs/2
, assert_feilds_apperence/2
, request/2
, request/3
]).
-include_lib("eunit/include/eunit.hrl").
%%--------------------------------------------------------------------
%% Setup
%%--------------------------------------------------------------------
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Conf) ->
%% FIXME: Magic line. for saving gateway schema name for emqx_config
emqx_config:init_load(emqx_gateway_schema, <<"gateway {}">>),
emqx_mgmt_api_test_util:init_suite([emqx_gateway]),
%% Start emqx-authn separately, due to emqx_authn_schema
%% not implementing the roots/0 method, it cannot be started with
%% emqx-ct-helpers at the moment.
{ok, _} = application:ensure_all_started(emqx_authn),
Conf.
end_per_suite(Conf) ->
application:stop(emqx_authn),
emqx_mgmt_api_test_util:end_suite([emqx_gateway]),
Conf.
%%--------------------------------------------------------------------
%% Cases
%%--------------------------------------------------------------------
t_gateway(_) ->
{200, Gateways}= request(get, "/gateway"),
lists:foreach(fun assert_gw_unloaded/1, Gateways),
{400, BadReq} = request(get, "/gateway/uname_gateway"),
assert_bad_request(BadReq),
{204, _} = request(post, "/gateway", #{name => <<"stomp">>}),
{200, StompGw1} = request(get, "/gateway/stomp"),
assert_feilds_apperence([name, status, enable, created_at, started_at],
StompGw1),
{204, _} = request(delete, "/gateway/stomp"),
{200, StompGw2} = request(get, "/gateway/stomp"),
assert_gw_unloaded(StompGw2),
ok.
t_gateway_stomp(_) ->
{200, Gw} = request(get, "/gateway/stomp"),
assert_gw_unloaded(Gw),
%% post
GwConf = #{name => <<"stomp">>,
frame => #{max_headers => 5,
max_headers_length => 100,
max_body_length => 100
},
listeners => [
#{name => <<"def">>, type => <<"tcp">>, bind => <<"61613">>}
]
},
{204, _} = request(post, "/gateway", GwConf),
{200, ConfResp} = request(get, "/gateway/stomp"),
assert_confs(GwConf, ConfResp),
%% put
GwConf2 = emqx_map_lib:deep_merge(GwConf, #{frame => #{max_headers => 10}}),
{204, _} = request(put, "/gateway/stomp", maps:without([name], GwConf2)),
{200, ConfResp2} = request(get, "/gateway/stomp"),
assert_confs(GwConf2, ConfResp2),
{204, _} = request(delete, "/gateway/stomp").
t_gateway_mqttsn(_) ->
{200, Gw} = request(get, "/gateway/mqttsn"),
assert_gw_unloaded(Gw),
%% post
GwConf = #{name => <<"mqttsn">>,
gateway_id => 1,
broadcast => true,
predefined => [#{id => 1, topic => <<"t/a">>}],
enable_qos3 => true,
listeners => [
#{name => <<"def">>, type => <<"udp">>, bind => <<"1884">>}
]
},
{204, _} = request(post, "/gateway", GwConf),
{200, ConfResp} = request(get, "/gateway/mqttsn"),
assert_confs(GwConf, ConfResp),
%% put
GwConf2 = emqx_map_lib:deep_merge(GwConf, #{predefined => []}),
{204, _} = request(put, "/gateway/mqttsn", maps:without([name], GwConf2)),
{200, ConfResp2} = request(get, "/gateway/mqttsn"),
assert_confs(GwConf2, ConfResp2),
{204, _} = request(delete, "/gateway/mqttsn").
t_gateway_coap(_) ->
{200, Gw} = request(get, "/gateway/coap"),
assert_gw_unloaded(Gw),
%% post
GwConf = #{name => <<"coap">>,
heartbeat => <<"60s">>,
connection_required => true,
listeners => [
#{name => <<"def">>, type => <<"udp">>, bind => <<"5683">>}
]
},
{204, _} = request(post, "/gateway", GwConf),
{200, ConfResp} = request(get, "/gateway/coap"),
assert_confs(GwConf, ConfResp),
%% put
GwConf2 = emqx_map_lib:deep_merge(GwConf, #{heartbeat => <<"10s">>}),
{204, _} = request(put, "/gateway/coap", maps:without([name], GwConf2)),
{200, ConfResp2} = request(get, "/gateway/coap"),
assert_confs(GwConf2, ConfResp2),
{204, _} = request(delete, "/gateway/coap").
t_gateway_lwm2m(_) ->
{200, Gw} = request(get, "/gateway/lwm2m"),
assert_gw_unloaded(Gw),
%% post
GwConf = #{name => <<"lwm2m">>,
xml_dir => <<"../../lib/emqx_gateway/src/lwm2m/lwm2m_xml">>,
lifetime_min => <<"1s">>,
lifetime_max => <<"1000s">>,
qmode_time_window => <<"30s">>,
auto_observe => true,
translators => #{
command => #{ topic => <<"dn/#">>},
response => #{ topic => <<"up/resp">>},
notify => #{ topic => <<"up/resp">>},
register => #{ topic => <<"up/resp">>},
update => #{ topic => <<"up/resp">>}
},
listeners => [
#{name => <<"def">>, type => <<"udp">>, bind => <<"5783">>}
]
},
{204, _} = request(post, "/gateway", GwConf),
{200, ConfResp} = request(get, "/gateway/lwm2m"),
assert_confs(GwConf, ConfResp),
%% put
GwConf2 = emqx_map_lib:deep_merge(GwConf, #{qmode_time_window => <<"10s">>}),
{204, _} = request(put, "/gateway/lwm2m", maps:without([name], GwConf2)),
{200, ConfResp2} = request(get, "/gateway/lwm2m"),
assert_confs(GwConf2, ConfResp2),
{204, _} = request(delete, "/gateway/lwm2m").
t_gateway_exproto(_) ->
{200, Gw} = request(get, "/gateway/exproto"),
assert_gw_unloaded(Gw),
%% post
GwConf = #{name => <<"exproto">>,
server => #{bind => <<"9100">>},
handler => #{address => <<"127.0.0.1:9001">>},
listeners => [
#{name => <<"def">>, type => <<"tcp">>, bind => <<"7993">>}
]
},
{204, _} = request(post, "/gateway", GwConf),
{200, ConfResp} = request(get, "/gateway/exproto"),
assert_confs(GwConf, ConfResp),
%% put
GwConf2 = emqx_map_lib:deep_merge(GwConf, #{server => #{bind => <<"9200">>}}),
{204, _} = request(put, "/gateway/exproto", maps:without([name], GwConf2)),
{200, ConfResp2} = request(get, "/gateway/exproto"),
assert_confs(GwConf2, ConfResp2),
{204, _} = request(delete, "/gateway/exproto").
t_authn(_) ->
GwConf = #{name => <<"stomp">>},
{204, _} = request(post, "/gateway", GwConf),
{204, _} = request(get, "/gateway/stomp/authentication"),
AuthConf = #{mechanism => <<"password-based">>,
backend => <<"built-in-database">>,
user_id_type => <<"clientid">>
},
{204, _} = request(post, "/gateway/stomp/authentication", AuthConf),
{200, ConfResp} = request(get, "/gateway/stomp/authentication"),
assert_confs(AuthConf, ConfResp),
AuthConf2 = maps:merge(AuthConf, #{user_id_type => <<"username">>}),
{204, _} = request(put, "/gateway/stomp/authentication", AuthConf2),
{200, ConfResp2} = request(get, "/gateway/stomp/authentication"),
assert_confs(AuthConf2, ConfResp2),
{204, _} = request(delete, "/gateway/stomp/authentication"),
{204, _} = request(get, "/gateway/stomp/authentication"),
{204, _} = request(delete, "/gateway/stomp").
t_listeners(_) ->
GwConf = #{name => <<"stomp">>},
{204, _} = request(post, "/gateway", GwConf),
{404, _} = request(get, "/gateway/stomp/listeners"),
LisConf = #{name => <<"def">>,
type => <<"tcp">>,
bind => <<"61613">>
},
{204, _} = request(post, "/gateway/stomp/listeners", LisConf),
{200, ConfResp} = request(get, "/gateway/stomp/listeners"),
assert_confs([LisConf], ConfResp),
{200, ConfResp1} = request(get, "/gateway/stomp/listeners/stomp:tcp:def"),
assert_confs(LisConf, ConfResp1),
LisConf2 = maps:merge(LisConf, #{bind => <<"61614">>}),
{204, _} = request(
put,
"/gateway/stomp/listeners/stomp:tcp:def",
LisConf2
),
{200, ConfResp2} = request(get, "/gateway/stomp/listeners/stomp:tcp:def"),
assert_confs(LisConf2, ConfResp2),
{204, _} = request(delete, "/gateway/stomp/listeners/stomp:tcp:def"),
{404, _} = request(get, "/gateway/stomp/listeners/stomp:tcp:def"),
{204, _} = request(delete, "/gateway/stomp").
t_listeners_authn(_) ->
GwConf = #{name => <<"stomp">>,
listeners => [
#{name => <<"def">>,
type => <<"tcp">>,
bind => <<"61613">>
}]},
{204, _} = request(post, "/gateway", GwConf),
{200, ConfResp} = request(get, "/gateway/stomp"),
assert_confs(GwConf, ConfResp),
AuthConf = #{mechanism => <<"password-based">>,
backend => <<"built-in-database">>,
user_id_type => <<"clientid">>
},
Path = "/gateway/stomp/listeners/stomp:tcp:def/authentication",
{204, _} = request(post, Path, AuthConf),
{200, ConfResp2} = request(get, Path),
assert_confs(AuthConf, ConfResp2),
AuthConf2 = maps:merge(AuthConf, #{user_id_type => <<"username">>}),
{204, _} = request(put, Path, AuthConf2),
{200, ConfResp3} = request(get, Path),
assert_confs(AuthConf2, ConfResp3),
{204, _} = request(delete, "/gateway/stomp").
%%--------------------------------------------------------------------
%% Asserts
assert_gw_unloaded(Gateway) ->
?assertEqual(<<"unloaded">>, maps:get(status, Gateway)).
assert_bad_request(BadReq) ->
?assertEqual(<<"BAD_REQUEST">>, maps:get(code, BadReq)).

View File

@ -19,6 +19,11 @@
-compile(export_all).
-compile(nowarn_export_all).
-import(emqx_gateway_test_utils,
[ assert_confs/2
, maybe_unconvert_listeners/1
]).
-include_lib("eunit/include/eunit.hrl").
%%--------------------------------------------------------------------
@ -32,9 +37,11 @@ init_per_suite(Conf) ->
%% FIXME: Magic line. for saving gateway schema name for emqx_config
emqx_config:init_load(emqx_gateway_schema, <<"gateway {}">>),
emqx_common_test_helpers:start_apps([emqx_gateway]),
{ok, _} = application:ensure_all_started(emqx_authn),
Conf.
end_per_suite(_Conf) ->
application:stop(emqx_authn),
emqx_common_test_helpers:stop_apps([emqx_gateway]).
init_per_testcase(_CaseName, Conf) ->
@ -228,33 +235,3 @@ compose_listener_authn(Basic, Listener, Authn) ->
listener(L) ->
#{<<"listeners">> => [L#{<<"type">> => <<"tcp">>,
<<"name">> => <<"default">>}]}.
assert_confs(Expected0, Effected) ->
Expected = maybe_unconvert_listeners(Expected0),
case do_assert_confs(Expected, Effected) of
false ->
io:format(standard_error, "Expected config: ~p,\n"
"Effected config: ~p",
[Expected, Effected]),
exit(conf_not_match);
true ->
ok
end.
do_assert_confs(Expected, Effected) when is_map(Expected),
is_map(Effected) ->
Ks1 = maps:keys(Expected),
lists:all(fun(K) ->
do_assert_confs(maps:get(K, Expected),
maps:get(K, Effected, undefined))
end, Ks1);
do_assert_confs(Expected, Effected) ->
Expected =:= Effected.
maybe_unconvert_listeners(Conf) ->
case maps:take(<<"listeners">>, Conf) of
error -> Conf;
{Ls, Conf1} ->
Conf1#{<<"listeners">> =>
emqx_gateway_conf:unconvert_listeners(Ls)}
end.

View File

@ -0,0 +1,112 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_gateway_test_utils).
-compile(export_all).
-compile(nowarn_export_all).
assert_confs(Expected0, Effected) ->
Expected = maybe_unconvert_listeners(Expected0),
case do_assert_confs(Expected, Effected) of
false ->
io:format(standard_error, "Expected config: ~p,\n"
"Effected config: ~p",
[Expected, Effected]),
exit(conf_not_match);
true ->
ok
end.
do_assert_confs(Expected, Effected) when is_map(Expected),
is_map(Effected) ->
Ks1 = maps:keys(Expected),
lists:all(fun(K) ->
do_assert_confs(maps:get(K, Expected),
maps:get(K, Effected, undefined))
end, Ks1);
do_assert_confs([Expected|More1], [Effected|More2]) ->
do_assert_confs(Expected, Effected) andalso do_assert_confs(More1, More2);
do_assert_confs([], []) ->
true;
do_assert_confs(Expected, Effected) ->
Res = Expected =:= Effected,
Res == false andalso
ct:pal("Errors: conf not match, "
"expected: ~p, got: ~p~n", [Expected, Effected]),
Res.
maybe_unconvert_listeners(Conf) when is_map(Conf) ->
case maps:take(<<"listeners">>, Conf) of
error -> Conf;
{Ls, Conf1} ->
Conf1#{<<"listeners">> =>
emqx_gateway_conf:unconvert_listeners(Ls)}
end;
maybe_unconvert_listeners(Conf) ->
Conf.
assert_feilds_apperence(Ks, Map) ->
lists:foreach(fun(K) ->
_ = maps:get(K, Map)
end, Ks).
%%--------------------------------------------------------------------
%% http
-define(http_api_host, "http://127.0.0.1:18083/api/v5").
-define(default_user, "admin").
-define(default_pass, "public").
request(delete = Mth, Path) ->
do_request(Mth, req(Path, []));
request(get = Mth, Path) ->
do_request(Mth, req(Path, [])).
request(get = Mth, Path, Qs) ->
do_request(Mth, req(Path, Qs));
request(put = Mth, Path, Body) ->
do_request(Mth, req(Path, [], Body));
request(post = Mth, Path, Body) ->
do_request(Mth, req(Path, [], Body)).
do_request(Mth, Req) ->
case httpc:request(Mth, Req, [], [{body_format, binary}]) of
{ok, {{_Vsn, Code, _Text}, _, Resp}} ->
NResp = case Resp of
<<>> -> #{};
_ ->
emqx_map_lib:unsafe_atom_key_map(
emqx_json:decode(Resp, [return_maps]))
end,
{Code, NResp};
{error, Reason} ->
error({failed_to_request, Reason})
end.
req(Path, Qs) ->
{url(Path, Qs), auth([])}.
req(Path, Qs, Body) ->
{url(Path, Qs), auth([]), "application/json", emqx_json:encode(Body)}.
url(Path, Qs) ->
lists:concat([?http_api_host, Path, "?", binary_to_list(cow_qs:qs(Qs))]).
auth(Headers) ->
Token = base64:encode(?default_user ++ ":" ++ ?default_pass),
[{"Authorization", "Basic " ++ binary_to_list(Token)}] ++ Headers.

View File

@ -87,6 +87,7 @@ init_per_suite(Config) ->
Config.
end_per_suite(_) ->
{ok, _} = emqx:remove_config([gateway, mqttsn]),
emqx_common_test_helpers:stop_apps([emqx_gateway]).
%%--------------------------------------------------------------------

View File

@ -16,11 +16,18 @@
-module(emqx_stomp_SUITE).
-include_lib("eunit/include/eunit.hrl").
-include_lib("emqx_gateway/src/stomp/include/emqx_stomp.hrl").
-compile(export_all).
-compile(nowarn_export_all).
-import(emqx_gateway_test_utils,
[ assert_feilds_apperence/2
, request/2
, request/3
]).
-define(HEARTBEAT, <<$\n>>).
-define(CONF_DEFAULT, <<"
@ -43,11 +50,11 @@ all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Cfg) ->
ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT),
emqx_common_test_helpers:start_apps([emqx_gateway]),
emqx_mgmt_api_test_util:init_suite([emqx_gateway]),
Cfg.
end_per_suite(_Cfg) ->
emqx_common_test_helpers:stop_apps([emqx_gateway]),
emqx_mgmt_api_test_util:end_suite([emqx_gateway]),
ok.
%%--------------------------------------------------------------------
@ -339,6 +346,67 @@ t_ack(_) ->
body = _}, _, _} = parse(Data4)
end).
t_rest_clienit_info(_) ->
with_connection(fun(Sock) ->
gen_tcp:send(Sock, serialize(<<"CONNECT">>,
[{<<"accept-version">>, ?STOMP_VER},
{<<"host">>, <<"127.0.0.1:61613">>},
{<<"login">>, <<"guest">>},
{<<"passcode">>, <<"guest">>},
{<<"heart-beat">>, <<"0,0">>}])),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, #stomp_frame{command = <<"CONNECTED">>,
headers = _,
body = _}, _, _} = parse(Data),
%% client lists
{200, Clients} = request(get, "/gateway/stomp/clients"),
?assertEqual(1, length(maps:get(data, Clients))),
StompClient = lists:nth(1, maps:get(data, Clients)),
ClientId = maps:get(clientid, StompClient),
ClientPath = "/gateway/stomp/clients/"
++ binary_to_list(ClientId),
{200, StompClient1} = request(get, ClientPath),
?assertEqual(StompClient, StompClient1),
assert_feilds_apperence(
[proto_name, awaiting_rel_max, inflight_cnt, disconnected_at,
send_msg, heap_size, connected, recv_cnt, send_pkt, mailbox_len,
username, recv_pkt, expiry_interval, clientid, mqueue_max,
send_oct, ip_address, is_bridge, awaiting_rel_cnt, mqueue_dropped,
mqueue_len, node, inflight_max, reductions, subscriptions_max,
connected_at, keepalive, created_at, clean_start,
subscriptions_cnt, recv_msg, send_cnt, proto_ver, recv_oct
], StompClient),
%% sub & unsub
{200, []} = request(get, ClientPath ++ "/subscriptions"),
gen_tcp:send(Sock, serialize(<<"SUBSCRIBE">>,
[{<<"id">>, 0},
{<<"destination">>, <<"/queue/foo">>},
{<<"ack">>, <<"client">>}])),
timer:sleep(100),
{200, Subs} = request(get, ClientPath ++ "/subscriptions"),
?assertEqual(1, length(Subs)),
assert_feilds_apperence([topic, qos], lists:nth(1, Subs)),
{204, _} = request(post, ClientPath ++ "/subscriptions",
#{topic => <<"t/a">>, qos => 1,
sub_props => #{subid => <<"1001">>}}),
{200, Subs1} = request(get, ClientPath ++ "/subscriptions"),
?assertEqual(2, length(Subs1)),
{204, _} = request(delete, ClientPath ++ "/subscriptions/t%2Fa"),
{200, Subs2} = request(get, ClientPath ++ "/subscriptions"),
?assertEqual(1, length(Subs2)),
%% kickout
{204, _} = request(delete, ClientPath),
{200, Clients2} = request(get, "/gateway/stomp/clients"),
?assertEqual(0, length(maps:get(data, Clients2)))
end).
%% TODO: Mountpoint, AuthChain, Authorization + Mountpoint, ClientInfoOverride,
%% Listeners, Metrics, Stats, ClientInfo
%%

View File

@ -88,9 +88,17 @@ roots() ->
})}
, {"authorization",
sc(hoconsc:ref("authorization"),
#{ desc => "In EMQ X, MQTT client access control can be just a few "
"lines of text based rules, or delegated to an external "
"HTTP API, or base externa database query results."
#{ desc => """
Authorization a.k.a ACL.<br>
In EMQ X, MQTT client access control is extremly flexible.<br>
An out of the box set of authorization data sources are supported.
For example,<br>
'file' source is to support concise and yet generic ACL rules in a file;<br>
'built-in-database' source can be used to store per-client customisable rule sets,
natively in the EMQ X node;<br>
'http' source to make EMQ X call an external HTTP API to make the decision;<br>
'postgresql' etc. to look up clients or rules from external databases;<br>
"""
})}
] ++
emqx_schema:roots(medium) ++

View File

@ -280,9 +280,9 @@ pick_params_to_qs([], _, Acc1, Acc2) ->
NAcc2 = [E || E <- Acc2, not lists:keymember(element(1, E), 1, Acc1)],
{lists:reverse(Acc1), lists:reverse(NAcc2)};
pick_params_to_qs([{Key, Value}|Params], QsKits, Acc1, Acc2) ->
case proplists:get_value(Key, QsKits) of
undefined -> pick_params_to_qs(Params, QsKits, Acc1, Acc2);
pick_params_to_qs([{Key, Value} | Params], QsSchema, Acc1, Acc2) ->
case proplists:get_value(Key, QsSchema) of
undefined -> pick_params_to_qs(Params, QsSchema, Acc1, Acc2);
Type ->
case Key of
<<Prefix:4/binary, NKey/binary>>
@ -294,16 +294,16 @@ pick_params_to_qs([{Key, Value}|Params], QsKits, Acc1, Acc2) ->
end,
case lists:keytake(OpposeKey, 1, Params) of
false ->
pick_params_to_qs(Params, QsKits, [qs(Key, Value, Type) | Acc1], Acc2);
pick_params_to_qs(Params, QsSchema, [qs(Key, Value, Type) | Acc1], Acc2);
{value, {K2, V2}, NParams} ->
pick_params_to_qs(NParams, QsKits, [qs(Key, Value, K2, V2, Type) | Acc1], Acc2)
pick_params_to_qs(NParams, QsSchema, [qs(Key, Value, K2, V2, Type) | Acc1], Acc2)
end;
_ ->
case is_fuzzy_key(Key) of
true ->
pick_params_to_qs(Params, QsKits, Acc1, [qs(Key, Value, Type) | Acc2]);
pick_params_to_qs(Params, QsSchema, Acc1, [qs(Key, Value, Type) | Acc2]);
_ ->
pick_params_to_qs(Params, QsKits, [qs(Key, Value, Type) | Acc1], Acc2)
pick_params_to_qs(Params, QsSchema, [qs(Key, Value, Type) | Acc1], Acc2)
end
end

View File

@ -55,7 +55,7 @@
[ {<<"node">>, atom}
, {<<"username">>, binary}
, {<<"zone">>, atom}
, {<<"ip_address">>, ip_port}
, {<<"ip_address">>, ip}
, {<<"conn_state">>, atom}
, {<<"clean_start">>, atom}
, {<<"proto_name">>, binary}
@ -114,6 +114,7 @@ properties(client) ->
{inflight_cnt, integer, <<"Current length of inflight">>},
{inflight_max, integer, <<"v4 api name [max_inflight]. Maximum length of inflight">>},
{ip_address, string , <<"Client's IP address">>},
{port, integer, <<"Client's port">>},
{is_bridge, boolean, <<"Indicates whether the client is connectedvia bridge">>},
{keepalive, integer, <<"keepalive time, with the unit of second">>},
{mailbox_len, integer, <<"Process mailbox size">>},
@ -189,7 +190,7 @@ clients_api() ->
name => ip_address,
in => query,
required => false,
description => <<"IP address">>,
description => <<"Client's IP address">>,
schema => #{type => string}
},
#{
@ -602,7 +603,7 @@ ms(zone, X) ->
ms(conn_state, X) ->
#{conn_state => X};
ms(ip_address, X) ->
#{conninfo => #{peername => X}};
#{conninfo => #{peername => {X, '_'}}};
ms(clean_start, X) ->
#{conninfo => #{clean_start => X}};
ms(proto_name, X) ->
@ -643,12 +644,13 @@ format_channel_info({_, ClientInfo, ClientStats}) ->
StatsMap = maps:without([memory, next_pkt_id, total_heap_size],
maps:from_list(ClientStats)),
ClientInfoMap0 = maps:fold(fun take_maps_from_inner/3, #{}, ClientInfo),
IpAddress = peer_to_binary(maps:get(peername, ClientInfoMap0)),
{IpAddress, Port} = peername_dispart(maps:get(peername, ClientInfoMap0)),
Connected = maps:get(conn_state, ClientInfoMap0) =:= connected,
ClientInfoMap1 = maps:merge(StatsMap, ClientInfoMap0),
ClientInfoMap2 = maps:put(node, node(), ClientInfoMap1),
ClientInfoMap3 = maps:put(ip_address, IpAddress, ClientInfoMap2),
ClientInfoMap = maps:put(connected, Connected, ClientInfoMap3),
ClientInfoMap4 = maps:put(port, Port, ClientInfoMap3),
ClientInfoMap = maps:put(connected, Connected, ClientInfoMap4),
RemoveList =
[ auth_result
, peername
@ -692,12 +694,11 @@ result_format_time_fun(Key, NClientInfoMap) ->
#{} -> NClientInfoMap
end.
peer_to_binary({Addr, Port}) ->
-spec(peername_dispart(emqx_types:peername()) -> {binary(), inet:port_number()}).
peername_dispart({Addr, Port}) ->
AddrBinary = list_to_binary(inet:ntoa(Addr)),
PortBinary = integer_to_binary(Port),
<<AddrBinary/binary, ":", PortBinary/binary>>;
peer_to_binary(Addr) ->
list_to_binary(inet:ntoa(Addr)).
%% PortBinary = integer_to_binary(Port),
{AddrBinary, Port}.
format_authz_cache({{PubSub, Topic}, {AuthzResult, Timestamp}}) ->
#{ access => PubSub,

View File

@ -22,7 +22,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-import(emqx_ct_http, [ request_api/3
-import(emqx_common_test_http, [ request_api/3
, request_api/5
, get_http_data/1
, create_default_app/0

0
log/emqx.log.1 Normal file
View File

BIN
log/emqx.log.idx Normal file

Binary file not shown.

BIN
log/emqx.log.siz Normal file

Binary file not shown.

View File

@ -16,7 +16,7 @@ bcrypt() ->
{bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {branch, "0.6.0"}}}.
quicer() ->
{quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.8"}}}.
{quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.9"}}}.
deps(Config) ->
{deps, OldDeps} = lists:keyfind(deps, 1, Config),