Merge branch 'release-5.0-beta.3' into trace-formatter

commit 43141dffee
Authored by zhongwencool on 2021-12-29 22:37:59 +08:00, committed by GitHub
36 changed files with 268 additions and 168 deletions

View File

@@ -11,13 +11,13 @@
 {deps,
  [ {lc, {git, "https://github.com/qzhuyan/lc.git", {tag, "0.1.2"}}}
  , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
- , {typerefl, {git, "https://github.com/k32/typerefl", {tag, "0.8.5"}}}
+ , {typerefl, {git, "https://github.com/k32/typerefl", {tag, "0.8.6"}}}
  , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
  , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}
  , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}}
  , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.1"}}}
  , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
- , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.22.0"}}}
+ , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.22.1"}}}
  , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
  , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}
  , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.16.0"}}}

View File

@@ -264,6 +264,7 @@ init_load(SchemaMod, Conf) when is_list(Conf) orelse is_binary(Conf) ->
         {error, Reason} ->
             ?SLOG(error, #{msg => "failed_to_load_hocon_conf",
                            reason => Reason,
+                           pwd => file:get_cwd(),
                            include_dirs => IncDir
                           }),
             error(failed_to_load_hocon_conf)

View File

@@ -38,7 +38,6 @@
 -type ip_port() :: tuple().
 -type cipher() :: map().
 -type rfc3339_system_time() :: integer().
--type unicode_binary() :: binary().
 
 -typerefl_from_string({duration/0, emqx_schema, to_duration}).
 -typerefl_from_string({duration_s/0, emqx_schema, to_duration_s}).
@@ -52,7 +51,6 @@
 -typerefl_from_string({cipher/0, emqx_schema, to_erl_cipher_suite}).
 -typerefl_from_string({comma_separated_atoms/0, emqx_schema, to_comma_separated_atoms}).
 -typerefl_from_string({rfc3339_system_time/0, emqx_schema, rfc3339_to_system_time}).
--typerefl_from_string({unicode_binary/0, emqx_schema, to_unicode_binary}).
 
 -export([ validate_heap_size/1
         , parse_user_lookup_fun/1
@@ -66,8 +64,7 @@
          to_bar_separated_list/1, to_ip_port/1,
          to_erl_cipher_suite/1,
          to_comma_separated_atoms/1,
-         rfc3339_to_system_time/1,
-         to_unicode_binary/1]).
+         rfc3339_to_system_time/1]).
 
 -behaviour(hocon_schema).
@@ -76,8 +73,7 @@
               comma_separated_list/0, bar_separated_list/0, ip_port/0,
               cipher/0,
               comma_separated_atoms/0,
-              rfc3339_system_time/0,
-              unicode_binary/0]).
+              rfc3339_system_time/0]).
 
 -export([namespace/0, roots/0, roots/1, fields/1]).
 -export([conf_get/2, conf_get/3, keys/2, filter/1]).
@@ -1407,9 +1403,6 @@ rfc3339_to_system_time(DateTime) ->
             {error, bad_rfc3339_timestamp}
     end.
 
-to_unicode_binary(Str) ->
-    {ok, unicode:characters_to_binary(Str)}.
-
 to_bar_separated_list(Str) ->
     {ok, string:tokens(Str, "| ")}.

View File

@@ -106,7 +106,7 @@ authenticate(#{password := Password} = Credential,
                resource_id := ResourceId,
                password_hash_algorithm := Algorithm}) ->
     Params = emqx_authn_utils:replace_placeholders(PlaceHolders, Credential),
-    case emqx_resource:query(ResourceId, {sql, Query, Params}) of
+    case emqx_resource:query(ResourceId, {prepared_query, ResourceId, Query, Params}) of
         {ok, _Columns, []} -> ignore;
         {ok, Columns, [Row | _]} ->
             NColumns = [Name || #column{name = Name} <- Columns],

View File

@@ -441,12 +441,12 @@ create_user(Values) ->
 q(Sql) ->
     emqx_resource:query(
       ?PGSQL_RESOURCE,
-      {sql, Sql}).
+      {query, Sql}).
 
 q(Sql, Params) ->
     emqx_resource:query(
       ?PGSQL_RESOURCE,
-      {sql, Sql, Params}).
+      {query, Sql, Params}).
 
 drop_seeds() ->
     {ok, _, _} = q("DROP TABLE IF EXISTS users"),

View File

@@ -31,9 +31,7 @@
         , lookup/0
         , lookup/1
         , move/2
-        , move/3
         , update/2
-        , update/3
         , authorize/5
         ]).
@@ -110,28 +108,19 @@ lookup(Type) ->
     {Source, _Front, _Rear} = take(Type),
     Source.
 
-move(Type, Cmd) ->
-    move(Type, Cmd, #{}).
-
-move(Type, #{<<"before">> := Before}, Opts) ->
-    emqx:update_config( ?CONF_KEY_PATH
-                      , {?CMD_MOVE, type(Type), ?CMD_MOVE_BEFORE(type(Before))}, Opts);
-move(Type, #{<<"after">> := After}, Opts) ->
-    emqx:update_config( ?CONF_KEY_PATH
-                      , {?CMD_MOVE, type(Type), ?CMD_MOVE_AFTER(type(After))}, Opts);
-move(Type, Position, Opts) ->
-    emqx:update_config( ?CONF_KEY_PATH
-                      , {?CMD_MOVE, type(Type), Position}, Opts).
-
-update(Cmd, Sources) ->
-    update(Cmd, Sources, #{}).
-
-update({?CMD_REPLACE, Type}, Sources, Opts) ->
-    emqx:update_config(?CONF_KEY_PATH, {{?CMD_REPLACE, type(Type)}, Sources}, Opts);
-update({?CMD_DELETE, Type}, Sources, Opts) ->
-    emqx:update_config(?CONF_KEY_PATH, {{?CMD_DELETE, type(Type)}, Sources}, Opts);
-update(Cmd, Sources, Opts) ->
-    emqx:update_config(?CONF_KEY_PATH, {Cmd, Sources}, Opts).
+move(Type, #{<<"before">> := Before}) ->
+    emqx_authz_utils:update_config(?CONF_KEY_PATH, {?CMD_MOVE, type(Type), ?CMD_MOVE_BEFORE(type(Before))});
+move(Type, #{<<"after">> := After}) ->
+    emqx_authz_utils:update_config(?CONF_KEY_PATH, {?CMD_MOVE, type(Type), ?CMD_MOVE_AFTER(type(After))});
+move(Type, Position) ->
+    emqx_authz_utils:update_config(?CONF_KEY_PATH, {?CMD_MOVE, type(Type), Position}).
+
+update({?CMD_REPLACE, Type}, Sources) ->
+    emqx_authz_utils:update_config(?CONF_KEY_PATH, {{?CMD_REPLACE, type(Type)}, Sources});
+update({?CMD_DELETE, Type}, Sources) ->
+    emqx_authz_utils:update_config(?CONF_KEY_PATH, {{?CMD_DELETE, type(Type)}, Sources});
+update(Cmd, Sources) ->
+    emqx_authz_utils:update_config(?CONF_KEY_PATH, {Cmd, Sources}).
 
 do_update({?CMD_MOVE, Type, ?CMD_MOVE_TOP}, Conf) when is_list(Conf) ->
     {Source, Front, Rear} = take(Type, Conf),
@@ -155,8 +144,8 @@ do_update({?CMD_APPEND, Sources}, Conf) when is_list(Sources), is_list(Conf) ->
     NConf = Conf ++ Sources,
     ok = check_dup_types(NConf),
     NConf;
-do_update({{?CMD_REPLACE, Type}, #{<<"enable">> := true} = Source}, Conf) when is_map(Source),
-    is_list(Conf) ->
+do_update({{?CMD_REPLACE, Type}, #{<<"enable">> := true} = Source}, Conf)
+  when is_map(Source), is_list(Conf) ->
     case create_dry_run(Type, Source) of
         ok ->
             {_Old, Front, Rear} = take(Type, Conf),
@@ -165,7 +154,8 @@ do_update({{?CMD_REPLACE, Type}, #{<<"enable">> := true} = Source}, Conf) when i
             NConf;
         {error, _} = Error -> Error
     end;
-do_update({{?CMD_REPLACE, Type}, Source}, Conf) when is_map(Source), is_list(Conf) ->
+do_update({{?CMD_REPLACE, Type}, Source}, Conf)
+  when is_map(Source), is_list(Conf) ->
     {_Old, Front, Rear} = take(Type, Conf),
     NConf = Front ++ [Source | Rear],
     ok = check_dup_types(NConf),

View File

@@ -54,8 +54,9 @@ settings(get, _Params) ->
 settings(put, #{body := #{<<"no_match">> := NoMatch,
                           <<"deny_action">> := DenyAction,
                           <<"cache">> := Cache}}) ->
-    {ok, _} = emqx:update_config([authorization, no_match], NoMatch),
-    {ok, _} = emqx:update_config([authorization, deny_action], DenyAction),
-    {ok, _} = emqx:update_config([authorization, cache], Cache),
+    {ok, _} = emqx_authz_utils:update_config([authorization, no_match], NoMatch),
+    {ok, _} = emqx_authz_utils:update_config(
+        [authorization, deny_action], DenyAction),
+    {ok, _} = emqx_authz_utils:update_config([authorization, cache], Cache),
     ok = emqx_authz_cache:drain_cache(),
     {200, authorization_settings()}.

View File

@@ -48,12 +48,12 @@ init(#{url := Url} = Source) ->
     end.
 
 destroy(#{annotations := #{id := Id}}) ->
-    ok = emqx_resource:remove(Id).
+    ok = emqx_resource:remove_local(Id).
 
 dry_run(Source) ->
     URIMap = maps:get(url, Source),
     NSource = maps:put(base_url, maps:remove(query, URIMap), Source),
-    emqx_resource:create_dry_run(emqx_connector_http, NSource).
+    emqx_resource:create_dry_run_local(emqx_connector_http, NSource).
 
 authorize(Client, PubSub, Topic,
           #{type := http,

View File

@@ -46,10 +46,10 @@ init(Source) ->
     end.
 
 dry_run(Source) ->
-    emqx_resource:create_dry_run(emqx_connector_mongo, Source).
+    emqx_resource:create_dry_run_local(emqx_connector_mongo, Source).
 
 destroy(#{annotations := #{id := Id}}) ->
-    ok = emqx_resource:remove(Id).
+    ok = emqx_resource:remove_local(Id).
 
 authorize(Client, PubSub, Topic,
           #{collection := Collection,

View File

@@ -48,10 +48,10 @@ init(#{query := SQL} = Source) ->
     end.
 
 dry_run(Source) ->
-    emqx_resource:create_dry_run(emqx_connector_mysql, Source).
+    emqx_resource:create_dry_run_local(emqx_connector_mysql, Source).
 
 destroy(#{annotations := #{id := Id}}) ->
-    ok = emqx_resource:remove(Id).
+    ok = emqx_resource:remove_local(Id).
 
 authorize(Client, PubSub, Topic,
           #{annotations := #{id := ResourceID,

View File

@@ -48,10 +48,10 @@ init(#{query := SQL} = Source) ->
     end.
 
 destroy(#{annotations := #{id := Id}}) ->
-    ok = emqx_resource:remove(Id).
+    ok = emqx_resource:remove_local(Id).
 
 dry_run(Source) ->
-    emqx_resource:create_dry_run(emqx_connector_pgsql, Source).
+    emqx_resource:create_dry_run_local(emqx_connector_pgsql, Source).
 
 parse_query(Sql) ->
     case re:run(Sql, ?RE_PLACEHOLDER, [global, {capture, all, list}]) of
@@ -73,7 +73,7 @@ authorize(Client, PubSub, Topic,
               query := {Query, Params}
              }
        }) ->
-    case emqx_resource:query(ResourceID, {sql, Query, replvar(Params, Client)}) of
+    case emqx_resource:query(ResourceID, {prepared_query, ResourceID, Query, replvar(Params, Client)}) of
         {ok, _Columns, []} -> nomatch;
         {ok, Columns, Rows} ->
             do_authorize(Client, PubSub, Topic, Columns, Rows);

View File

@@ -46,10 +46,10 @@ init(Source) ->
     end.
 
 destroy(#{annotations := #{id := Id}}) ->
-    ok = emqx_resource:remove(Id).
+    ok = emqx_resource:remove_local(Id).
 
 dry_run(Source) ->
-    emqx_resource:create_dry_run(emqx_connector_redis, Source).
+    emqx_resource:create_dry_run_local(emqx_connector_redis, Source).
 
 authorize(Client, PubSub, Topic,
           #{cmd := CMD,

View File

@@ -18,9 +18,11 @@
 -include_lib("emqx/include/emqx_placeholder.hrl").
 
--export([cleanup_resources/0,
-         make_resource_id/1,
-         create_resource/2]).
+-export([ cleanup_resources/0
+        , make_resource_id/1
+        , create_resource/2
+        , update_config/2
+        ]).
 
 -define(RESOURCE_GROUP, <<"emqx_authz">>).
@@ -30,7 +32,7 @@
 create_resource(Module, Config) ->
     ResourceID = make_resource_id(Module),
-    case emqx_resource:create(ResourceID, Module, Config) of
+    case emqx_resource:create_local(ResourceID, Module, Config) of
         {ok, already_created} -> {ok, ResourceID};
         {ok, _} -> {ok, ResourceID};
         {error, Reason} -> {error, Reason}
@@ -38,13 +40,17 @@ create_resource(Module, Config) ->
 cleanup_resources() ->
     lists:foreach(
-      fun emqx_resource:remove/1,
+      fun emqx_resource:remove_local/1,
       emqx_resource:list_group_instances(?RESOURCE_GROUP)).
 
 make_resource_id(Name) ->
     NameBin = bin(Name),
     emqx_resource:generate_id(?RESOURCE_GROUP, NameBin).
 
+update_config(Path, ConfigRequest) ->
+    emqx_conf:update(Path, ConfigRequest, #{rawconf_with_defaults => true,
+                                            override_to => cluster}).
+
 %%------------------------------------------------------------------------------
 %% Internal functions
 %%------------------------------------------------------------------------------

View File

@@ -31,10 +31,9 @@ groups() ->
 init_per_suite(Config) ->
     meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
-    meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end),
-    meck:expect(emqx_resource, update, fun(_, _, _, _) -> {ok, meck_data} end),
-    meck:expect(emqx_resource, remove, fun(_) -> ok end),
-    meck:expect(emqx_resource, create_dry_run, fun(_, _) -> ok end),
+    meck:expect(emqx_resource, create_local, fun(_, _, _) -> {ok, meck_data} end),
+    meck:expect(emqx_resource, remove_local, fun(_) -> ok end),
+    meck:expect(emqx_resource, create_dry_run_local, fun(_, _) -> ok end),
 
     ok = emqx_common_test_helpers:start_apps(
            [emqx_connector, emqx_conf, emqx_authz],

View File

@@ -96,14 +96,13 @@ groups() ->
 init_per_suite(Config) ->
     meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
-    meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end),
-    meck:expect(emqx_resource, create_dry_run,
+    meck:expect(emqx_resource, create_local, fun(_, _, _) -> {ok, meck_data} end),
+    meck:expect(emqx_resource, create_dry_run_local,
                 fun(emqx_connector_mysql, _) -> {ok, meck_data};
                    (T, C) -> meck:passthrough([T, C])
                 end),
-    meck:expect(emqx_resource, update, fun(_, _, _, _) -> {ok, meck_data} end),
     meck:expect(emqx_resource, health_check, fun(_) -> ok end),
-    meck:expect(emqx_resource, remove, fun(_) -> ok end ),
+    meck:expect(emqx_resource, remove_local, fun(_) -> ok end ),
 
     ok = emqx_common_test_helpers:start_apps(
            [emqx_conf, emqx_authz, emqx_dashboard],

View File

@@ -30,8 +30,8 @@ groups() ->
 init_per_suite(Config) ->
     meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
-    meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end),
-    meck:expect(emqx_resource, remove, fun(_) -> ok end ),
+    meck:expect(emqx_resource, create_local, fun(_, _, _) -> {ok, meck_data} end),
+    meck:expect(emqx_resource, remove_local, fun(_) -> ok end ),
 
     ok = emqx_common_test_helpers:start_apps(
            [emqx_conf, emqx_authz],

View File

@@ -228,12 +228,12 @@ raw_pgsql_authz_config() ->
 q(Sql) ->
     emqx_resource:query(
       ?PGSQL_RESOURCE,
-      {sql, Sql}).
+      {query, Sql}).
 
 insert(Sql, Params) ->
     {ok, _} = emqx_resource:query(
       ?PGSQL_RESOURCE,
-      {sql, Sql, Params}),
+      {query, Sql, Params}),
     ok.
 
 init_table() ->

View File

@@ -87,6 +87,7 @@ basic_config() ->
     , {direction,
        mk(egress,
           #{ desc => "The direction of this bridge, MUST be egress"
+           , default => egress
           })}
     ]
     ++ proplists:delete(base_url, emqx_connector_http:fields(config)).

View File

@@ -71,6 +71,7 @@ metrics_status_fields() ->
 direction_field(Dir, Desc) ->
     {direction, mk(Dir,
        #{ nullable => false
+        , default => egress
         , desc => "The direction of the bridge. Can be one of 'ingress' or 'egress'.<br>"
                   ++ Desc
        })}.

View File

@@ -47,7 +47,7 @@
     [].
 
 suite() ->
-    [{timetrap,{seconds,30}}].
+    [{timetrap,{seconds,60}}].
 
 init_per_suite(Config) ->
     ok = emqx_config:put([emqx_dashboard], #{
@@ -84,7 +84,7 @@ start_http_server(HandleFun) ->
     spawn_link(fun() ->
         {Port, Sock} = listen_on_random_port(),
         Parent ! {port, Port},
-        loop(Sock, HandleFun)
+        loop(Sock, HandleFun, Parent)
     end),
     receive
         {port, Port} -> Port
@@ -95,40 +95,49 @@ start_http_server(HandleFun) ->
 listen_on_random_port() ->
     Min = 1024, Max = 65000,
     Port = rand:uniform(Max - Min) + Min,
-    case gen_tcp:listen(Port, [{active, false}, {reuseaddr, true}]) of
+    case gen_tcp:listen(Port, [{active, false}, {reuseaddr, true}, binary]) of
         {ok, Sock} -> {Port, Sock};
         {error, eaddrinuse} -> listen_on_random_port()
     end.
 
-loop(Sock, HandleFun) ->
+loop(Sock, HandleFun, Parent) ->
     {ok, Conn} = gen_tcp:accept(Sock),
-    Handler = spawn(fun () -> HandleFun(Conn) end),
+    Handler = spawn(fun () -> HandleFun(Conn, Parent) end),
     gen_tcp:controlling_process(Conn, Handler),
-    loop(Sock, HandleFun).
+    loop(Sock, HandleFun, Parent).
 
 make_response(CodeStr, Str) ->
     B = iolist_to_binary(Str),
     iolist_to_binary(
       io_lib:fwrite(
-         "HTTP/1.0 ~s\nContent-Type: text/html\nContent-Length: ~p\n\n~s",
+         "HTTP/1.0 ~s\r\nContent-Type: text/html\r\nContent-Length: ~p\r\n\r\n~s",
          [CodeStr, size(B), B])).
 
-handle_fun_200_ok(Conn) ->
+handle_fun_200_ok(Conn, Parent) ->
     case gen_tcp:recv(Conn, 0) of
-        {ok, Request} ->
+        {ok, ReqStr} ->
+            ct:pal("the http handler got request: ~p", [ReqStr]),
+            Req = parse_http_request(ReqStr),
+            Parent ! {http_server, received, Req},
             gen_tcp:send(Conn, make_response("200 OK", "Request OK")),
-            self() ! {http_server, received, Request},
-            handle_fun_200_ok(Conn);
+            handle_fun_200_ok(Conn, Parent);
         {error, closed} ->
             gen_tcp:close(Conn)
     end.
 
+parse_http_request(ReqStr0) ->
+    [Method, ReqStr1] = string:split(ReqStr0, " ", leading),
+    [Path, ReqStr2] = string:split(ReqStr1, " ", leading),
+    [_ProtoVsn, ReqStr3] = string:split(ReqStr2, "\r\n", leading),
+    [_HeaderStr, Body] = string:split(ReqStr3, "\r\n\r\n", leading),
+    #{method => Method, path => Path, body => Body}.
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
 
 t_http_crud_apis(_) ->
-    Port = start_http_server(fun handle_fun_200_ok/1),
+    Port = start_http_server(fun handle_fun_200_ok/2),
     %% assert we there's no bridges at first
     {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
@@ -163,6 +172,20 @@ t_http_crud_apis(_) ->
                     , <<"message">> := <<"bridge already exists">>
                     }, jsx:decode(RetMsg)),
 
+    %% send an message to emqx and the message should be forwarded to the HTTP server
+    Body = <<"my msg">>,
+    emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)),
+    ?assert(
+        receive
+            {http_server, received, #{method := <<"POST">>, path := <<"/path1">>,
+                                      body := Body}} ->
+                true;
+            Msg ->
+                ct:pal("error: http got unexpected request: ~p", [Msg]),
+                false
+        after 100 ->
+            false
+        end),
+
     %% update the request-path of the bridge
     URL2 = ?URL(Port, "path2"),
     {ok, 200, Bridge2} = request(put, uri(["bridges", ?BRIDGE_ID]),
@@ -201,6 +224,19 @@ t_http_crud_apis(_) ->
                     , <<"url">> := URL2
                     }, jsx:decode(Bridge3Str)),
 
+    %% send an message to emqx again, check the path has been changed
+    emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)),
+    ?assert(
+        receive
+            {http_server, received, #{path := <<"/path2">>}} ->
+                true;
+            Msg2 ->
+                ct:pal("error: http got unexpected request: ~p", [Msg2]),
+                false
+        after 100 ->
+            false
+        end),
+
     %% delete the bridge
     {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []),
     {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
@@ -215,7 +251,7 @@ t_http_crud_apis(_) ->
     ok.
 
 t_start_stop_bridges(_) ->
-    Port = start_http_server(fun handle_fun_200_ok/1),
+    Port = start_http_server(fun handle_fun_200_ok/2),
     URL1 = ?URL(Port, "abc"),
     {ok, 201, Bridge} = request(post, uri(["bridges"]),
                                 ?HTTP_BRIDGE(URL1)#{
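
For reference, a minimal sketch (not part of the diff) of what the parse_http_request/1 helper added above yields for a request captured by the test HTTP server; the sample request line and header below are illustrative, and the demo assumes it lives in the same SUITE module.

%% Illustrative only: the listening socket is opened in binary mode above,
%% so the request arrives as a binary and splits cleanly on HTTP delimiters.
parse_demo() ->
    Req = <<"POST /path1 HTTP/1.1\r\nHost: localhost\r\n\r\nmy msg">>,
    #{method := <<"POST">>, path := <<"/path1">>, body := <<"my msg">>} =
        parse_http_request(Req).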

View File

@@ -6,7 +6,7 @@
 {deps, [
     {eldap2, {git, "https://github.com/emqx/eldap2", {tag, "v0.2.2"}}},
     {mysql, {git, "https://github.com/emqx/mysql-otp", {tag, "1.7.1"}}},
-    {epgsql, {git, "https://github.com/emqx/epgsql", {tag, "4.6.0"}}},
+    {epgsql, {git, "https://github.com/emqx/epgsql", {tag, "4.7-emqx.1"}}},
     %% NOTE: mind poolboy version when updating mongodb-erlang version
     {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.10"}}},
     %% NOTE: mind poolboy version when updating eredis_cluster version

View File

@@ -265,11 +265,16 @@ process_request(#{
       } = Conf, Msg) ->
     Conf#{ method => make_method(emqx_plugin_libs_rule:proc_tmpl(MethodTks, Msg))
          , path => emqx_plugin_libs_rule:proc_tmpl(PathTks, Msg)
-         , body => emqx_plugin_libs_rule:proc_tmpl(BodyTks, Msg)
+         , body => process_request_body(BodyTks, Msg)
          , headers => maps:to_list(proc_headers(HeadersTks, Msg))
          , request_timeout => ReqTimeout
          }.
 
+process_request_body([], Msg) ->
+    emqx_json:encode(Msg);
+process_request_body(BodyTks, Msg) ->
+    emqx_plugin_libs_rule:proc_tmpl(BodyTks, Msg).
+
 proc_headers(HeaderTks, Msg) ->
     maps:fold(fun(K, V, Acc) ->
                   Acc#{emqx_plugin_libs_rule:proc_tmpl(K, Msg) =>

View File

@@ -207,7 +207,7 @@ basic_config(#{
         username => User,
         password => Password,
         clean_start => CleanStart,
-        keepalive => KeepAlive,
+        keepalive => ms_to_s(KeepAlive),
         retry_interval => RetryIntv,
         max_inflight => MaxInflight,
         ssl => EnableSsl,
@@ -215,5 +215,8 @@ basic_config(#{
         if_record_metrics => true
     }.
 
+ms_to_s(Ms) ->
+    erlang:ceil(Ms / 1000).
+
 clientid(Id) ->
     iolist_to_binary([Id, ":", atom_to_list(node())]).

View File

@@ -32,7 +32,9 @@
 -export([connect/1]).
 
--export([query/3]).
+-export([ query/3
+        , prepared_query/4
+        ]).
 
 -export([do_health_check/1]).
@@ -60,8 +62,7 @@ on_start(InstId, #{server := {Host, Port},
                    connector => InstId, config => Config}),
     SslOpts = case maps:get(enable, SSL) of
         true ->
-            [{ssl, [{server_name_indication, disable} |
-                    emqx_plugin_libs_ssl:save_files_return_opts(SSL, "connectors", InstId)]}];
+            [{ssl, [emqx_plugin_libs_ssl:save_files_return_opts(SSL, "connectors", InstId)]}];
         false -> []
     end,
     Options = [{host, Host},
@@ -80,12 +81,15 @@ on_stop(InstId, #{poolname := PoolName}) ->
                   connector => InstId}),
     emqx_plugin_libs_pool:stop_pool(PoolName).
 
-on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := _PoolName} = State) ->
-    on_query(InstId, {sql, SQL, []}, AfterQuery, State);
-on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := PoolName} = State) ->
-    ?TRACE("QUERY", "postgresql_connector_received",
-           #{connector => InstId, sql => SQL, state => State}),
-    case Result = ecpool:pick_and_do(PoolName, {?MODULE, query, [SQL, Params]}, no_handover) of
+on_query(InstId, QueryParams, AfterQuery, #{poolname := PoolName} = State) ->
+    {Command, Args} = case QueryParams of
+        {query, SQL} -> {query, [SQL, []]};
+        {query, SQL, Params} -> {query, [SQL, Params]};
+        {prepared_query, Name, SQL} -> {prepared_query, [Name, SQL, []]};
+        {prepared_query, Name, SQL, Params} -> {prepared_query, [Name, SQL, Params]}
+    end,
+    ?TRACE("QUERY", "postgresql_connector_received", #{connector => InstId, command => Command, args => Args, state => State}),
+    case Result = ecpool:pick_and_do(PoolName, {?MODULE, Command, Args}, no_handover) of
         {error, Reason} ->
             ?SLOG(error, #{
                 msg => "postgresql_connector_do_sql_query_failed",
@@ -115,6 +119,9 @@ connect(Opts) ->
 query(Conn, SQL, Params) ->
     epgsql:equery(Conn, SQL, Params).
 
+prepared_query(Conn, Name, SQL, Params) ->
+    epgsql:prepared_query2(Conn, Name, SQL, Params).
+
 conn_opts(Opts) ->
     conn_opts(Opts, []).
 
 conn_opts([], Acc) ->
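
A minimal sketch (not part of the diff) of the two request shapes the refactored on_query/4 above accepts through emqx_resource:query/2; passing the resource id as the prepared-statement name mirrors the authn/authz call sites changed elsewhere in this commit.

%% Illustrative call sites only.
run_plain(ResourceId, SQL) ->
    emqx_resource:query(ResourceId, {query, SQL}).

run_prepared(ResourceId, SQL, Params) ->
    emqx_resource:query(ResourceId, {prepared_query, ResourceId, SQL, Params}).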

View File

@@ -66,7 +66,7 @@ to_remote_msg(#message{flags = Flags0} = Msg, Vars) ->
 to_remote_msg(MapMsg, #{remote_topic := TopicToken, payload := PayloadToken,
                         remote_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) when is_map(MapMsg) ->
     Topic = replace_vars_in_str(TopicToken, MapMsg),
-    Payload = replace_vars_in_str(PayloadToken, MapMsg),
+    Payload = process_payload(PayloadToken, MapMsg),
     QoS = replace_simple_var(QoSToken, MapMsg),
     Retain = replace_simple_var(RetainToken, MapMsg),
     #mqtt_msg{qos = QoS,
@@ -82,13 +82,18 @@ to_broker_msg(#{dup := Dup, properties := Props} = MapMsg,
               #{local_topic := TopicToken, payload := PayloadToken,
                 local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) ->
     Topic = replace_vars_in_str(TopicToken, MapMsg),
-    Payload = replace_vars_in_str(PayloadToken, MapMsg),
+    Payload = process_payload(PayloadToken, MapMsg),
     QoS = replace_simple_var(QoSToken, MapMsg),
     Retain = replace_simple_var(RetainToken, MapMsg),
     set_headers(Props,
         emqx_message:set_flags(#{dup => Dup, retain => Retain},
             emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload))).
 
+process_payload([], Msg) ->
+    emqx_json:encode(Msg);
+process_payload(Tks, Msg) ->
+    replace_vars_in_str(Tks, Msg).
+
 %% Replace a string contains vars to another string in which the placeholders are replace by the
 %% corresponding values. For example, given "a: ${var}", if the var=1, the result string will be:
 %% "a: 1".

View File

@@ -123,7 +123,7 @@ schema("/users/:username") ->
                        #{in => path, example => <<"admin">>})}],
         'requestBody' => [
             { description
-            , mk(emqx_schema:unicode_binary(),
+            , mk(binary(),
                  #{desc => <<"User description">>, example => <<"administrator">>})}
         ],
         responses => #{
@@ -176,7 +176,7 @@ schema("/users/:username/change_pwd") ->
 fields(user) ->
     [
      {description,
-      mk(emqx_schema:unicode_binary(),
+      mk(binary(),
          #{desc => <<"User description">>, example => "administrator"})},
      {username,
       mk(binary(),

View File

@@ -91,16 +91,17 @@ fields(app) ->
             """They are useful for accessing public data anonymously,"""
             """and are used to associate API requests.""",
             example => <<"MzAyMjk3ODMwMDk0NjIzOTUxNjcwNzQ0NzQ3MTE2NDYyMDI">>})},
-        {expired_at, hoconsc:mk(emqx_schema:rfc3339_system_time(),
+        {expired_at, hoconsc:mk(hoconsc:union([undefined, emqx_schema:rfc3339_system_time()]),
             #{desc => "No longer valid datetime",
               example => <<"2021-12-05T02:01:34.186Z">>,
-              nullable => true
+              nullable => true,
+              default => undefined
             })},
         {created_at, hoconsc:mk(emqx_schema:rfc3339_system_time(),
             #{desc => "ApiKey create datetime",
               example => <<"2021-12-01T00:00:00.000Z">>
             })},
-        {desc, hoconsc:mk(emqx_schema:unicode_binary(),
+        {desc, hoconsc:mk(binary(),
             #{example => <<"Note">>, nullable => true})},
         {enable, hoconsc:mk(boolean(), #{desc => "Enable/Disable", nullable => true})}
     ];
@@ -136,9 +137,15 @@ api_key(post, #{body := App}) ->
     #{
         <<"name">> := Name,
         <<"desc">> := Desc0,
-        <<"expired_at">> := ExpiredAt,
         <<"enable">> := Enable
     } = App,
+    %% undefined is never expired
+    ExpiredAt0 = maps:get(<<"expired_at">>, App, <<"undefined">>),
+    ExpiredAt =
+        case ExpiredAt0 of
+            <<"undefined">> -> undefined;
+            _ -> ExpiredAt0
+        end,
     Desc = unicode:characters_to_binary(Desc0, unicode),
     case emqx_mgmt_auth:create(Name, Enable, ExpiredAt, Desc) of
         {ok, NewApp} -> {200, format(NewApp)};
@@ -164,8 +171,13 @@ api_key_by_name(put, #{bindings := #{name := Name}, body := Body}) ->
         {error, not_found} -> {404, <<"NOT_FOUND">>}
     end.
 
-format(App = #{expired_at := ExpiredAt, created_at := CreateAt}) ->
+format(App = #{expired_at := ExpiredAt0, created_at := CreateAt}) ->
+    ExpiredAt =
+        case ExpiredAt0 of
+            undefined -> <<"undefined">>;
+            _ -> list_to_binary(calendar:system_time_to_rfc3339(ExpiredAt0))
+        end,
     App#{
-        expired_at => list_to_binary(calendar:system_time_to_rfc3339(ExpiredAt)),
+        expired_at => ExpiredAt,
         created_at => list_to_binary(calendar:system_time_to_rfc3339(CreateAt))
     }.
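
Hedged illustration (not part of the diff), based on the test suite changed later in this commit: both of these request bodies now create an API key that never expires, since the handler above defaults a missing expired_at to <<"undefined">> and maps it to undefined.

%% Illustrative request bodies only.
never_expiring_requests() ->
    [ #{<<"name">> => <<"key-a">>, <<"desc">> => <<"Note">>, <<"enable">> => true},
      #{<<"name">> => <<"key-b">>, <<"desc">> => <<"Note">>, <<"enable">> => true,
        <<"expired_at">> => <<"undefined">>} ].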

View File

@@ -101,15 +101,15 @@ fields(ban) ->
         desc => <<"Banned type clientid, username, peerhost">>,
         nullable => false,
         example => username})},
-    {who, hoconsc:mk(emqx_schema:unicode_binary(), #{
+    {who, hoconsc:mk(binary(), #{
         desc => <<"Client info as banned type">>,
         nullable => false,
         example => <<"Badass坏"/utf8>>})},
-    {by, hoconsc:mk(emqx_schema:unicode_binary(), #{
+    {by, hoconsc:mk(binary(), #{
         desc => <<"Commander">>,
         nullable => true,
         example => <<"mgmt_api">>})},
-    {reason, hoconsc:mk(emqx_schema:unicode_binary(), #{
+    {reason, hoconsc:mk(binary(), #{
         desc => <<"Banned reason">>,
         nullable => true,
         example => <<"Too many requests">>})},

View File

@@ -37,7 +37,7 @@
         api_secret_hash = <<>> :: binary() | '_',
         enable = true :: boolean() | '_',
         desc = <<>> :: binary() | '_',
-        expired_at = 0 :: integer() | '_',
+        expired_at = 0 :: integer() | undefined | '_',
         created_at = 0 :: integer() | '_'
        }).

View File

@@ -23,7 +23,7 @@
 all() -> [{group, parallel}, {group, sequence}].
 suite() -> [{timetrap, {minutes, 1}}].
 groups() -> [
-    {parallel, [parallel], [t_create, t_update, t_delete, t_authorize]},
+    {parallel, [parallel], [t_create, t_update, t_delete, t_authorize, t_create_unexpired_app]},
     {sequence, [], [t_create_failed]}
 ].
@@ -137,7 +137,15 @@ t_authorize(_Config) ->
     },
     ?assertMatch({ok, #{<<"api_key">> := _, <<"enable">> := true}}, update_app(Name, Expired)),
     ?assertEqual(Unauthorized, emqx_mgmt_api_test_util:request_api(get, BanPath, BasicHeader)),
+    ok.
+
+t_create_unexpired_app(_Config) ->
+    Name1 = <<"EMQX-UNEXPIRED-API-KEY-1">>,
+    Name2 = <<"EMQX-UNEXPIRED-API-KEY-2">>,
+    {ok, Create1} = create_unexpired_app(Name1, #{}),
+    ?assertMatch(#{<<"expired_at">> := <<"undefined">>}, Create1),
+    {ok, Create2} = create_unexpired_app(Name2, #{expired_at => <<"undefined">>}),
+    ?assertMatch(#{<<"expired_at">> := <<"undefined">>}, Create2),
     ok.
@@ -170,6 +178,15 @@ create_app(Name) ->
         Error -> Error
     end.
 
+create_unexpired_app(Name, Params) ->
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    Path = emqx_mgmt_api_test_util:api_path(["api_key"]),
+    App = maps:merge(#{name => Name, desc => <<"Note"/utf8>>, enable => true}, Params),
+    case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, App) of
+        {ok, Res} -> {ok, emqx_json:decode(Res, [return_maps])};
+        Error -> Error
+    end.
+
 delete_app(Name) ->
     DeletePath = emqx_mgmt_api_test_util:api_path(["api_key", Name]),
     emqx_mgmt_api_test_util:request_api(delete, DeletePath).

View File

@@ -44,6 +44,7 @@
 -define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND').
 -define(MESSAGE_ID_SCHEMA_ERROR, 'MESSAGE_ID_SCHEMA_ERROR').
+-define(MAX_PAYLOAD_SIZE, 1048576). %% 1MB = 1024 x 1024
 
 api_spec() ->
     emqx_dashboard_swagger:spec(?MODULE).
@@ -157,11 +158,11 @@ delayed_message(get, #{bindings := #{msgid := Id}}) ->
     case emqx_delayed:get_delayed_message(Id) of
         {ok, Message} ->
             Payload = maps:get(payload, Message),
-            case size(Payload) > ?MAX_PAYLOAD_LENGTH of
+            case erlang:byte_size(Payload) > ?MAX_PAYLOAD_SIZE of
                 true ->
-                    {200, Message#{payload => ?PAYLOAD_TOO_LARGE}};
+                    {200, Message};
                 _ ->
-                    {200, Message#{payload => Payload}}
+                    {200, Message#{payload => base64:encode(Payload)}}
             end;
         {error, id_schema_error} ->
             {400, generate_http_code_map(id_schema_error, Id)};

View File

@@ -33,7 +33,7 @@
         ]).
 
 api_spec() ->
-    emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true, translate_body => true}).
+    emqx_dashboard_swagger:spec(?MODULE).
 
 paths() ->
     ["/mqtt/topic_rewrite"].

View File

@@ -27,10 +27,10 @@
         , on_message_publish/2
         ]).
 
--export([ dispatch/4
+-export([ dispatch/5
         , delete_message/2
         , store_retained/2
-        , deliver/5]).
+        , deliver/6]).
 
 -export([ get_expiry_time/1
         , update_config/1
@@ -78,7 +78,7 @@ on_session_subscribed(_, _, #{share := ShareName}, _) when ShareName =/= undefin
 on_session_subscribed(_, Topic, #{rh := Rh} = Opts, Context) ->
     IsNew = maps:get(is_new, Opts, true),
     case Rh =:= 0 orelse (Rh =:= 1 andalso IsNew) of
-        true -> dispatch(Context, Topic);
+        true -> dispatch(Context, Topic, Opts);
         _ -> ok
     end.
@@ -111,26 +111,26 @@ on_message_publish(Msg, _) ->
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
--spec dispatch(context(), pid(), topic(), cursor()) -> ok.
-dispatch(Context, Pid, Topic, Cursor) ->
+-spec dispatch(context(), pid(), topic(), emqx_types:subopts(), cursor()) -> ok.
+dispatch(Context, Pid, Topic, Opts, Cursor) ->
     Mod = get_backend_module(),
     case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of
         false ->
             {ok, Result} = Mod:read_message(Context, Topic),
-            deliver(Result, Context, Pid, Topic, undefined);
+            deliver(Result, Context, Pid, Topic, Opts, undefined);
         true ->
             {ok, Result, NewCursor} = Mod:match_messages(Context, Topic, Cursor),
-            deliver(Result, Context, Pid, Topic, NewCursor)
+            deliver(Result, Context, Pid, Topic, Opts, NewCursor)
     end.
 
-deliver([], Context, Pid, Topic, Cursor) ->
+deliver([], Context, Pid, Topic, Opts, Cursor) ->
     case Cursor of
         undefined ->
             ok;
         _ ->
-            dispatch(Context, Pid, Topic, Cursor)
+            dispatch(Context, Pid, Topic, Opts, Cursor)
     end;
-deliver(Result, #{context_id := Id} = Context, Pid, Topic, Cursor) ->
+deliver(Result, #{context_id := Id} = Context, Pid, Topic, Opts, Cursor) ->
     case erlang:is_process_alive(Pid) of
         false ->
             ok;
@@ -138,12 +138,12 @@ deliver(Result, #{context_id := Id} = Context, Pid, Topic, Cursor) ->
             #{msg_deliver_quota := MaxDeliverNum} = emqx:get_config([?APP, flow_control]),
             case MaxDeliverNum of
                 0 ->
-                    _ = [Pid ! {deliver, Topic, Msg} || Msg <- Result],
+                    _ = [Pid ! {deliver, Topic, handle_retain_opts(Opts, Msg)} || Msg <- Result],
                     ok;
                 _ ->
-                    case do_deliver(Result, Id, Pid, Topic) of
+                    case do_deliver(Result, Id, Pid, Topic, Opts) of
                         ok ->
-                            deliver([], Context, Pid, Topic, Cursor);
+                            deliver([], Context, Pid, Topic, Opts, Cursor);
                         abort ->
                             ok
                     end
@@ -280,9 +280,9 @@ is_too_big(Size) ->
     Limit > 0 andalso (Size > Limit).
 
 %% @private
-dispatch(Context, Topic) ->
-    emqx_retainer_pool:async_submit(fun ?MODULE:dispatch/4,
-                                    [Context, self(), Topic, undefined]).
+dispatch(Context, Topic, Opts) ->
+    emqx_retainer_pool:async_submit(fun ?MODULE:dispatch/5,
+                                    [Context, self(), Topic, Opts, undefined]).
 
 -spec delete_message(context(), topic()) -> ok.
 delete_message(Context, Topic) ->
@@ -305,16 +305,16 @@ clean(Context) ->
     Mod = get_backend_module(),
     Mod:clean(Context).
 
--spec do_deliver(list(term()), pos_integer(), pid(), topic()) -> ok | abort.
-do_deliver([Msg | T], Id, Pid, Topic) ->
+-spec do_deliver(list(term()), pos_integer(), pid(), topic(), emqx_types:subopts()) -> ok | abort.
+do_deliver([Msg | T], Id, Pid, Topic, Opts) ->
     case require_semaphore(?DELIVER_SEMAPHORE, Id) of
         true ->
-            Pid ! {deliver, Topic, Msg},
-            do_deliver(T, Id, Pid, Topic);
+            Pid ! {deliver, Topic, handle_retain_opts(Opts, Msg)},
+            do_deliver(T, Id, Pid, Topic, Opts);
         _ ->
             abort
     end;
-do_deliver([], _, _, _) ->
+do_deliver([], _, _, _, _) ->
     ok.
 
 -spec require_semaphore(semaphore(), pos_integer()) -> boolean().
@@ -484,3 +484,9 @@ load(Context) ->
 unload() ->
     emqx:unhook('message.publish', {?MODULE, on_message_publish}),
     emqx:unhook('session.subscribed', {?MODULE, on_session_subscribed}).
+
+handle_retain_opts(#{rap := 0}, Message) ->
+    emqx_message:set_header(retain, false, Message);
+handle_retain_opts(_, Message) ->
+    Message.
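
A hedged sketch (not part of the diff) of what the new handle_retain_opts/2 does with the MQTT v5 Retain-As-Published subscription option: with rap = 0 the retain header is cleared before delivery, otherwise the message passes through unchanged. The emqx_message calls below are illustrative usage only, assuming the demo runs inside emqx_retainer.

%% Illustrative only.
rap_demo() ->
    Msg = emqx_message:make(<<"t/1">>, <<"retained payload">>),
    false = emqx_message:get_header(retain, handle_retain_opts(#{rap => 0}, Msg)),
    Msg = handle_retain_opts(#{rap => 1}, Msg).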

View File

@@ -28,13 +28,14 @@
 -import(emqx_mgmt_api_configs, [gen_schema/1]).
 -import(emqx_mgmt_util, [ object_array_schema/2
+                        , object_schema/2
                         , schema/1
                         , schema/2
                         , error_schema/2
                         , page_params/0
                         , properties/1]).
 
--define(MAX_BASE64_PAYLOAD_SIZE, 1048576). %% 1MB = 1024 x 1024
+-define(MAX_PAYLOAD_SIZE, 1048576). %% 1MB = 1024 x 1024
 
 api_spec() ->
     {[lookup_retained_api(), with_topic_api(), config_api()], []}.
@@ -64,7 +65,7 @@ parameters() ->
 lookup_retained_api() ->
     Metadata = #{
         get => #{
-            description => <<"lookup matching messages">>,
+            description => <<"List retained messages">>,
             parameters => page_params(),
             responses => #{
                 <<"200">> => object_array_schema(
@@ -80,9 +81,10 @@ with_topic_api() ->
     MetaData = #{
         get => #{
             description => <<"lookup matching messages">>,
-            parameters => parameters() ++ page_params(),
+            parameters => parameters(),
             responses => #{
-                <<"200">> => object_array_schema(message_props(), <<"List retained messages">>),
+                <<"200">> => object_schema(message_props(), <<"List retained messages">>),
+                <<"404">> => error_schema(<<"Reatined Not Exists">>, ['NOT_FOUND']),
                 <<"405">> => schema(<<"NotAllowed">>)
             }
         },
@@ -139,35 +141,27 @@ config(put, #{body := Body}) ->
 %%------------------------------------------------------------------------------
 %% Interval Funcs
 %%------------------------------------------------------------------------------
 
-lookup_retained(get, Params) ->
-    lookup(undefined, Params, fun format_message/1).
+lookup_retained(get, #{query_string := Qs}) ->
+    Page = maps:get(page, Qs, 1),
+    Limit = maps:get(page, Qs, emqx_mgmt:max_row_limit()),
+    {ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, undefined, Page, Limit),
+    {200, [format_message(Msg) || Msg <- Msgs]}.
 
-with_topic(get, #{bindings := Bindings} = Params) ->
+with_topic(get, #{bindings := Bindings}) ->
     Topic = maps:get(topic, Bindings),
-    lookup(Topic, Params, fun format_detail_message/1);
+    {ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, 1, 1),
+    case Msgs of
+        [H | _] ->
+            {200, format_detail_message(H)};
+        _ ->
+            {404, #{code => 'NOT_FOUND'}}
+    end;
 with_topic(delete, #{bindings := Bindings}) ->
     Topic = maps:get(topic, Bindings),
     emqx_retainer_mnesia:delete_message(undefined, Topic),
     {204}.
 
--spec lookup(undefined | binary(),
-             map(),
-             fun((emqx_types:message()) -> map())) ->
-    {200, map()}.
-lookup(Topic, #{query_string := Qs}, Formatter) ->
-    Page = maps:get(page, Qs, 1),
-    Limit = maps:get(page, Qs, emqx_mgmt:max_row_limit()),
-    {ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, Page, Limit),
-    {200, format_message(Msgs, Formatter)}.
-
-format_message(Messages, Formatter) when is_list(Messages)->
-    [Formatter(Message) || Message <- Messages];
-format_message(Message, Formatter) ->
-    Formatter(Message).
-
 format_message(#message{ id = ID, qos = Qos, topic = Topic, from = From
                        , timestamp = Timestamp, headers = Headers}) ->
     #{msgid => emqx_guid:to_hexstr(ID),
@@ -181,12 +175,11 @@ format_message(#message{ id = ID, qos = Qos, topic = Topic, from = From
 format_detail_message(#message{payload = Payload} = Msg) ->
     Base = format_message(Msg),
-    EncodePayload = base64:encode(Payload),
-    case erlang:byte_size(EncodePayload) =< ?MAX_BASE64_PAYLOAD_SIZE of
+    case erlang:byte_size(Payload) =< ?MAX_PAYLOAD_SIZE of
         true ->
-            Base#{payload => EncodePayload};
+            Base#{payload => base64:encode(Payload)};
         _ ->
-            Base#{payload => base64:encode(<<"PAYLOAD_TOO_LARGE">>)}
+            Base
     end.
 
 to_bin_string(Data) when is_binary(Data) ->

View File

@@ -204,7 +204,10 @@ fi
 if ! check_erlang_start >/dev/null 2>&1; then
     BUILT_ON="$(head -1 "${REL_DIR}/BUILT_ON")"
     ## failed to start, might be due to missing libs, try to be portable
+    export LD_LIBRARY_PATH="${LD_LIBRARY_PATH:-$DYNLIBS_DIR}"
+    if [ "$LD_LIBRARY_PATH" != "$DYNLIBS_DIR" ]; then
         export LD_LIBRARY_PATH="$DYNLIBS_DIR:$LD_LIBRARY_PATH"
+    fi
     if ! check_erlang_start; then
         ## it's hopeless
         echoerr "FATAL: Unable to start Erlang."
@@ -428,6 +431,26 @@ wait_for() {
     done
 }
 
+wait_until_return_val() {
+    local RESULT
+    local WAIT_TIME
+    local CMD
+    RESULT="$1"
+    WAIT_TIME="$2"
+    shift 2
+    CMD="$*"
+    while true; do
+        if [ "$($CMD 2>/dev/null)" = "$RESULT" ]; then
+            return 0
+        fi
+        if [ "$WAIT_TIME" -le 0 ]; then
+            return 1
+        fi
+        WAIT_TIME=$((WAIT_TIME - 1))
+        sleep 1
+    done
+}
+
 latest_vm_args() {
     local hint_var_name="$1"
     local vm_args_file
@@ -553,7 +576,8 @@ case "${COMMAND}" in
             "$(relx_start_command)"
 
         WAIT_TIME=${WAIT_FOR_ERLANG:-15}
-        if wait_for "$WAIT_TIME" 'relx_nodetool' 'ping'; then
+        if wait_until_return_val "true" "$WAIT_TIME" 'relx_nodetool' \
+            'eval' 'emqx:is_running()'; then
            echo "$EMQX_DESCRIPTION $REL_VSN is started successfully!"
            exit 0
        else

View File

@@ -46,7 +46,7 @@
 {deps,
     [ {lc, {git, "https://github.com/qzhuyan/lc.git", {tag, "0.1.2"}}}
     , {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
-    , {typerefl, {git, "https://github.com/k32/typerefl", {tag, "0.8.5"}}}
+    , {typerefl, {git, "https://github.com/k32/typerefl", {tag, "0.8.6"}}}
     , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.12"}}}
     , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
     , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
@@ -65,7 +65,7 @@
     , {system_monitor, {git, "https://github.com/klarna-incubator/system_monitor", {tag, "2.2.0"}}}
     , {getopt, "1.0.2"}
     , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.16.0"}}}
-    , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.22.0"}}}
+    , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.22.1"}}}
     , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.4.1"}}}
    , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
    , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}