diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 330291def..31caa498d 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -11,13 +11,13 @@ {deps, [ {lc, {git, "https://github.com/qzhuyan/lc.git", {tag, "0.1.2"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} - , {typerefl, {git, "https://github.com/k32/typerefl", {tag, "0.8.5"}}} + , {typerefl, {git, "https://github.com/k32/typerefl", {tag, "0.8.6"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.1"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} - , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.22.0"}}} + , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.22.1"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.16.0"}}} diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index 35a6c048e..7e9e985b8 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -264,6 +264,7 @@ init_load(SchemaMod, Conf) when is_list(Conf) orelse is_binary(Conf) -> {error, Reason} -> ?SLOG(error, #{msg => "failed_to_load_hocon_conf", reason => Reason, + pwd => file:get_cwd(), include_dirs => IncDir }), error(failed_to_load_hocon_conf) diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index cbf21683d..123ee5379 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -38,7 +38,6 @@ -type ip_port() :: tuple(). -type cipher() :: map(). -type rfc3339_system_time() :: integer(). --type unicode_binary() :: binary(). -typerefl_from_string({duration/0, emqx_schema, to_duration}). -typerefl_from_string({duration_s/0, emqx_schema, to_duration_s}). @@ -52,7 +51,6 @@ -typerefl_from_string({cipher/0, emqx_schema, to_erl_cipher_suite}). -typerefl_from_string({comma_separated_atoms/0, emqx_schema, to_comma_separated_atoms}). -typerefl_from_string({rfc3339_system_time/0, emqx_schema, rfc3339_to_system_time}). --typerefl_from_string({unicode_binary/0, emqx_schema, to_unicode_binary}). -export([ validate_heap_size/1 , parse_user_lookup_fun/1 @@ -66,8 +64,7 @@ to_bar_separated_list/1, to_ip_port/1, to_erl_cipher_suite/1, to_comma_separated_atoms/1, - rfc3339_to_system_time/1, - to_unicode_binary/1]). + rfc3339_to_system_time/1]). -behaviour(hocon_schema). @@ -76,8 +73,7 @@ comma_separated_list/0, bar_separated_list/0, ip_port/0, cipher/0, comma_separated_atoms/0, - rfc3339_system_time/0, - unicode_binary/0]). + rfc3339_system_time/0]). -export([namespace/0, roots/0, roots/1, fields/1]). -export([conf_get/2, conf_get/3, keys/2, filter/1]). @@ -1407,9 +1403,6 @@ rfc3339_to_system_time(DateTime) -> {error, bad_rfc3339_timestamp} end. -to_unicode_binary(Str) -> - {ok, unicode:characters_to_binary(Str)}. - to_bar_separated_list(Str) -> {ok, string:tokens(Str, "| ")}. diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl index 0ed7d282a..816eace0d 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl @@ -106,7 +106,7 @@ authenticate(#{password := Password} = Credential, resource_id := ResourceId, password_hash_algorithm := Algorithm}) -> Params = emqx_authn_utils:replace_placeholders(PlaceHolders, Credential), - case emqx_resource:query(ResourceId, {sql, Query, Params}) of + case emqx_resource:query(ResourceId, {prepared_query, ResourceId, Query, Params}) of {ok, _Columns, []} -> ignore; {ok, Columns, [Row | _]} -> NColumns = [Name || #column{name = Name} <- Columns], diff --git a/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl b/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl index 8f1f12690..e33f5c100 100644 --- a/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl @@ -441,12 +441,12 @@ create_user(Values) -> q(Sql) -> emqx_resource:query( ?PGSQL_RESOURCE, - {sql, Sql}). + {query, Sql}). q(Sql, Params) -> emqx_resource:query( ?PGSQL_RESOURCE, - {sql, Sql, Params}). + {query, Sql, Params}). drop_seeds() -> {ok, _, _} = q("DROP TABLE IF EXISTS users"), diff --git a/apps/emqx_authz/src/emqx_authz.erl b/apps/emqx_authz/src/emqx_authz.erl index 54438793b..d02da0958 100644 --- a/apps/emqx_authz/src/emqx_authz.erl +++ b/apps/emqx_authz/src/emqx_authz.erl @@ -31,9 +31,7 @@ , lookup/0 , lookup/1 , move/2 - , move/3 , update/2 - , update/3 , authorize/5 ]). @@ -110,28 +108,19 @@ lookup(Type) -> {Source, _Front, _Rear} = take(Type), Source. -move(Type, Cmd) -> - move(Type, Cmd, #{}). - -move(Type, #{<<"before">> := Before}, Opts) -> - emqx:update_config( ?CONF_KEY_PATH - , {?CMD_MOVE, type(Type), ?CMD_MOVE_BEFORE(type(Before))}, Opts); -move(Type, #{<<"after">> := After}, Opts) -> - emqx:update_config( ?CONF_KEY_PATH - , {?CMD_MOVE, type(Type), ?CMD_MOVE_AFTER(type(After))}, Opts); -move(Type, Position, Opts) -> - emqx:update_config( ?CONF_KEY_PATH - , {?CMD_MOVE, type(Type), Position}, Opts). +move(Type, #{<<"before">> := Before}) -> + emqx_authz_utils:update_config(?CONF_KEY_PATH, {?CMD_MOVE, type(Type), ?CMD_MOVE_BEFORE(type(Before))}); +move(Type, #{<<"after">> := After}) -> + emqx_authz_utils:update_config(?CONF_KEY_PATH, {?CMD_MOVE, type(Type), ?CMD_MOVE_AFTER(type(After))}); +move(Type, Position) -> + emqx_authz_utils:update_config(?CONF_KEY_PATH, {?CMD_MOVE, type(Type), Position}). +update({?CMD_REPLACE, Type}, Sources) -> + emqx_authz_utils:update_config(?CONF_KEY_PATH, {{?CMD_REPLACE, type(Type)}, Sources}); +update({?CMD_DELETE, Type}, Sources) -> + emqx_authz_utils:update_config(?CONF_KEY_PATH, {{?CMD_DELETE, type(Type)}, Sources}); update(Cmd, Sources) -> - update(Cmd, Sources, #{}). - -update({?CMD_REPLACE, Type}, Sources, Opts) -> - emqx:update_config(?CONF_KEY_PATH, {{?CMD_REPLACE, type(Type)}, Sources}, Opts); -update({?CMD_DELETE, Type}, Sources, Opts) -> - emqx:update_config(?CONF_KEY_PATH, {{?CMD_DELETE, type(Type)}, Sources}, Opts); -update(Cmd, Sources, Opts) -> - emqx:update_config(?CONF_KEY_PATH, {Cmd, Sources}, Opts). + emqx_authz_utils:update_config(?CONF_KEY_PATH, {Cmd, Sources}). do_update({?CMD_MOVE, Type, ?CMD_MOVE_TOP}, Conf) when is_list(Conf) -> {Source, Front, Rear} = take(Type, Conf), @@ -155,8 +144,8 @@ do_update({?CMD_APPEND, Sources}, Conf) when is_list(Sources), is_list(Conf) -> NConf = Conf ++ Sources, ok = check_dup_types(NConf), NConf; -do_update({{?CMD_REPLACE, Type}, #{<<"enable">> := true} = Source}, Conf) when is_map(Source), - is_list(Conf) -> +do_update({{?CMD_REPLACE, Type}, #{<<"enable">> := true} = Source}, Conf) + when is_map(Source), is_list(Conf) -> case create_dry_run(Type, Source) of ok -> {_Old, Front, Rear} = take(Type, Conf), @@ -165,7 +154,8 @@ do_update({{?CMD_REPLACE, Type}, #{<<"enable">> := true} = Source}, Conf) when i NConf; {error, _} = Error -> Error end; -do_update({{?CMD_REPLACE, Type}, Source}, Conf) when is_map(Source), is_list(Conf) -> +do_update({{?CMD_REPLACE, Type}, Source}, Conf) + when is_map(Source), is_list(Conf) -> {_Old, Front, Rear} = take(Type, Conf), NConf = Front ++ [Source | Rear], ok = check_dup_types(NConf), diff --git a/apps/emqx_authz/src/emqx_authz_api_settings.erl b/apps/emqx_authz/src/emqx_authz_api_settings.erl index c2a87da16..c7d75bbba 100644 --- a/apps/emqx_authz/src/emqx_authz_api_settings.erl +++ b/apps/emqx_authz/src/emqx_authz_api_settings.erl @@ -54,8 +54,9 @@ settings(get, _Params) -> settings(put, #{body := #{<<"no_match">> := NoMatch, <<"deny_action">> := DenyAction, <<"cache">> := Cache}}) -> - {ok, _} = emqx:update_config([authorization, no_match], NoMatch), - {ok, _} = emqx:update_config([authorization, deny_action], DenyAction), - {ok, _} = emqx:update_config([authorization, cache], Cache), + {ok, _} = emqx_authz_utils:update_config([authorization, no_match], NoMatch), + {ok, _} = emqx_authz_utils:update_config( + [authorization, deny_action], DenyAction), + {ok, _} = emqx_authz_utils:update_config([authorization, cache], Cache), ok = emqx_authz_cache:drain_cache(), {200, authorization_settings()}. diff --git a/apps/emqx_authz/src/emqx_authz_http.erl b/apps/emqx_authz/src/emqx_authz_http.erl index c2ee96594..fe48b35d0 100644 --- a/apps/emqx_authz/src/emqx_authz_http.erl +++ b/apps/emqx_authz/src/emqx_authz_http.erl @@ -48,12 +48,12 @@ init(#{url := Url} = Source) -> end. destroy(#{annotations := #{id := Id}}) -> - ok = emqx_resource:remove(Id). + ok = emqx_resource:remove_local(Id). dry_run(Source) -> URIMap = maps:get(url, Source), NSource = maps:put(base_url, maps:remove(query, URIMap), Source), - emqx_resource:create_dry_run(emqx_connector_http, NSource). + emqx_resource:create_dry_run_local(emqx_connector_http, NSource). authorize(Client, PubSub, Topic, #{type := http, diff --git a/apps/emqx_authz/src/emqx_authz_mongodb.erl b/apps/emqx_authz/src/emqx_authz_mongodb.erl index 97fac9627..84ef2f302 100644 --- a/apps/emqx_authz/src/emqx_authz_mongodb.erl +++ b/apps/emqx_authz/src/emqx_authz_mongodb.erl @@ -46,10 +46,10 @@ init(Source) -> end. dry_run(Source) -> - emqx_resource:create_dry_run(emqx_connector_mongo, Source). + emqx_resource:create_dry_run_local(emqx_connector_mongo, Source). destroy(#{annotations := #{id := Id}}) -> - ok = emqx_resource:remove(Id). + ok = emqx_resource:remove_local(Id). authorize(Client, PubSub, Topic, #{collection := Collection, diff --git a/apps/emqx_authz/src/emqx_authz_mysql.erl b/apps/emqx_authz/src/emqx_authz_mysql.erl index a5b14ec1b..181478a76 100644 --- a/apps/emqx_authz/src/emqx_authz_mysql.erl +++ b/apps/emqx_authz/src/emqx_authz_mysql.erl @@ -48,10 +48,10 @@ init(#{query := SQL} = Source) -> end. dry_run(Source) -> - emqx_resource:create_dry_run(emqx_connector_mysql, Source). + emqx_resource:create_dry_run_local(emqx_connector_mysql, Source). destroy(#{annotations := #{id := Id}}) -> - ok = emqx_resource:remove(Id). + ok = emqx_resource:remove_local(Id). authorize(Client, PubSub, Topic, #{annotations := #{id := ResourceID, diff --git a/apps/emqx_authz/src/emqx_authz_postgresql.erl b/apps/emqx_authz/src/emqx_authz_postgresql.erl index f101841c2..926f6fe3c 100644 --- a/apps/emqx_authz/src/emqx_authz_postgresql.erl +++ b/apps/emqx_authz/src/emqx_authz_postgresql.erl @@ -48,10 +48,10 @@ init(#{query := SQL} = Source) -> end. destroy(#{annotations := #{id := Id}}) -> - ok = emqx_resource:remove(Id). + ok = emqx_resource:remove_local(Id). dry_run(Source) -> - emqx_resource:create_dry_run(emqx_connector_pgsql, Source). + emqx_resource:create_dry_run_local(emqx_connector_pgsql, Source). parse_query(Sql) -> case re:run(Sql, ?RE_PLACEHOLDER, [global, {capture, all, list}]) of @@ -73,7 +73,7 @@ authorize(Client, PubSub, Topic, query := {Query, Params} } }) -> - case emqx_resource:query(ResourceID, {sql, Query, replvar(Params, Client)}) of + case emqx_resource:query(ResourceID, {prepared_query, ResourceID, Query, replvar(Params, Client)}) of {ok, _Columns, []} -> nomatch; {ok, Columns, Rows} -> do_authorize(Client, PubSub, Topic, Columns, Rows); diff --git a/apps/emqx_authz/src/emqx_authz_redis.erl b/apps/emqx_authz/src/emqx_authz_redis.erl index 1f6abe330..8765734cf 100644 --- a/apps/emqx_authz/src/emqx_authz_redis.erl +++ b/apps/emqx_authz/src/emqx_authz_redis.erl @@ -46,10 +46,10 @@ init(Source) -> end. destroy(#{annotations := #{id := Id}}) -> - ok = emqx_resource:remove(Id). + ok = emqx_resource:remove_local(Id). dry_run(Source) -> - emqx_resource:create_dry_run(emqx_connector_redis, Source). + emqx_resource:create_dry_run_local(emqx_connector_redis, Source). authorize(Client, PubSub, Topic, #{cmd := CMD, diff --git a/apps/emqx_authz/src/emqx_authz_utils.erl b/apps/emqx_authz/src/emqx_authz_utils.erl index 73132aacb..435388e44 100644 --- a/apps/emqx_authz/src/emqx_authz_utils.erl +++ b/apps/emqx_authz/src/emqx_authz_utils.erl @@ -18,9 +18,11 @@ -include_lib("emqx/include/emqx_placeholder.hrl"). --export([cleanup_resources/0, - make_resource_id/1, - create_resource/2]). +-export([ cleanup_resources/0 + , make_resource_id/1 + , create_resource/2 + , update_config/2 + ]). -define(RESOURCE_GROUP, <<"emqx_authz">>). @@ -30,7 +32,7 @@ create_resource(Module, Config) -> ResourceID = make_resource_id(Module), - case emqx_resource:create(ResourceID, Module, Config) of + case emqx_resource:create_local(ResourceID, Module, Config) of {ok, already_created} -> {ok, ResourceID}; {ok, _} -> {ok, ResourceID}; {error, Reason} -> {error, Reason} @@ -38,13 +40,17 @@ create_resource(Module, Config) -> cleanup_resources() -> lists:foreach( - fun emqx_resource:remove/1, + fun emqx_resource:remove_local/1, emqx_resource:list_group_instances(?RESOURCE_GROUP)). make_resource_id(Name) -> NameBin = bin(Name), emqx_resource:generate_id(?RESOURCE_GROUP, NameBin). +update_config(Path, ConfigRequest) -> + emqx_conf:update(Path, ConfigRequest, #{rawconf_with_defaults => true, + override_to => cluster}). + %%------------------------------------------------------------------------------ %% Internal functions %%------------------------------------------------------------------------------ diff --git a/apps/emqx_authz/test/emqx_authz_SUITE.erl b/apps/emqx_authz/test/emqx_authz_SUITE.erl index e18901fc5..7038b59e0 100644 --- a/apps/emqx_authz/test/emqx_authz_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_SUITE.erl @@ -31,10 +31,9 @@ groups() -> init_per_suite(Config) -> meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]), - meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end), - meck:expect(emqx_resource, update, fun(_, _, _, _) -> {ok, meck_data} end), - meck:expect(emqx_resource, remove, fun(_) -> ok end), - meck:expect(emqx_resource, create_dry_run, fun(_, _) -> ok end), + meck:expect(emqx_resource, create_local, fun(_, _, _) -> {ok, meck_data} end), + meck:expect(emqx_resource, remove_local, fun(_) -> ok end), + meck:expect(emqx_resource, create_dry_run_local, fun(_, _) -> ok end), ok = emqx_common_test_helpers:start_apps( [emqx_connector, emqx_conf, emqx_authz], diff --git a/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl b/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl index 67e1b05da..b1b6676ad 100644 --- a/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl @@ -96,14 +96,13 @@ groups() -> init_per_suite(Config) -> meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]), - meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end), - meck:expect(emqx_resource, create_dry_run, + meck:expect(emqx_resource, create_local, fun(_, _, _) -> {ok, meck_data} end), + meck:expect(emqx_resource, create_dry_run_local, fun(emqx_connector_mysql, _) -> {ok, meck_data}; (T, C) -> meck:passthrough([T, C]) end), - meck:expect(emqx_resource, update, fun(_, _, _, _) -> {ok, meck_data} end), meck:expect(emqx_resource, health_check, fun(_) -> ok end), - meck:expect(emqx_resource, remove, fun(_) -> ok end ), + meck:expect(emqx_resource, remove_local, fun(_) -> ok end ), ok = emqx_common_test_helpers:start_apps( [emqx_conf, emqx_authz, emqx_dashboard], diff --git a/apps/emqx_authz/test/emqx_authz_http_SUITE.erl b/apps/emqx_authz/test/emqx_authz_http_SUITE.erl index c438b3f4b..939c8fb5e 100644 --- a/apps/emqx_authz/test/emqx_authz_http_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_http_SUITE.erl @@ -30,8 +30,8 @@ groups() -> init_per_suite(Config) -> meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]), - meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end), - meck:expect(emqx_resource, remove, fun(_) -> ok end ), + meck:expect(emqx_resource, create_local, fun(_, _, _) -> {ok, meck_data} end), + meck:expect(emqx_resource, remove_local, fun(_) -> ok end ), ok = emqx_common_test_helpers:start_apps( [emqx_conf, emqx_authz], diff --git a/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl b/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl index 92c479f92..a264d3407 100644 --- a/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl @@ -228,12 +228,12 @@ raw_pgsql_authz_config() -> q(Sql) -> emqx_resource:query( ?PGSQL_RESOURCE, - {sql, Sql}). + {query, Sql}). insert(Sql, Params) -> {ok, _} = emqx_resource:query( ?PGSQL_RESOURCE, - {sql, Sql, Params}), + {query, Sql, Params}), ok. init_table() -> diff --git a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl index 43cace332..540a6a070 100644 --- a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl @@ -87,6 +87,7 @@ basic_config() -> , {direction, mk(egress, #{ desc => "The direction of this bridge, MUST be egress" + , default => egress })} ] ++ proplists:delete(base_url, emqx_connector_http:fields(config)). diff --git a/apps/emqx_bridge/src/emqx_bridge_schema.erl b/apps/emqx_bridge/src/emqx_bridge_schema.erl index 3acfbcdef..82fc79ebf 100644 --- a/apps/emqx_bridge/src/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_schema.erl @@ -71,6 +71,7 @@ metrics_status_fields() -> direction_field(Dir, Desc) -> {direction, mk(Dir, #{ nullable => false + , default => egress , desc => "The direction of the bridge. Can be one of 'ingress' or 'egress'.
" ++ Desc })}. diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 65baf7051..7724d467c 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -47,7 +47,7 @@ groups() -> []. suite() -> - [{timetrap,{seconds,30}}]. + [{timetrap,{seconds,60}}]. init_per_suite(Config) -> ok = emqx_config:put([emqx_dashboard], #{ @@ -84,7 +84,7 @@ start_http_server(HandleFun) -> spawn_link(fun() -> {Port, Sock} = listen_on_random_port(), Parent ! {port, Port}, - loop(Sock, HandleFun) + loop(Sock, HandleFun, Parent) end), receive {port, Port} -> Port @@ -95,40 +95,49 @@ start_http_server(HandleFun) -> listen_on_random_port() -> Min = 1024, Max = 65000, Port = rand:uniform(Max - Min) + Min, - case gen_tcp:listen(Port, [{active, false}, {reuseaddr, true}]) of + case gen_tcp:listen(Port, [{active, false}, {reuseaddr, true}, binary]) of {ok, Sock} -> {Port, Sock}; {error, eaddrinuse} -> listen_on_random_port() end. -loop(Sock, HandleFun) -> +loop(Sock, HandleFun, Parent) -> {ok, Conn} = gen_tcp:accept(Sock), - Handler = spawn(fun () -> HandleFun(Conn) end), + Handler = spawn(fun () -> HandleFun(Conn, Parent) end), gen_tcp:controlling_process(Conn, Handler), - loop(Sock, HandleFun). + loop(Sock, HandleFun, Parent). make_response(CodeStr, Str) -> B = iolist_to_binary(Str), iolist_to_binary( io_lib:fwrite( - "HTTP/1.0 ~s\nContent-Type: text/html\nContent-Length: ~p\n\n~s", + "HTTP/1.0 ~s\r\nContent-Type: text/html\r\nContent-Length: ~p\r\n\r\n~s", [CodeStr, size(B), B])). -handle_fun_200_ok(Conn) -> +handle_fun_200_ok(Conn, Parent) -> case gen_tcp:recv(Conn, 0) of - {ok, Request} -> + {ok, ReqStr} -> + ct:pal("the http handler got request: ~p", [ReqStr]), + Req = parse_http_request(ReqStr), + Parent ! {http_server, received, Req}, gen_tcp:send(Conn, make_response("200 OK", "Request OK")), - self() ! {http_server, received, Request}, - handle_fun_200_ok(Conn); + handle_fun_200_ok(Conn, Parent); {error, closed} -> gen_tcp:close(Conn) end. +parse_http_request(ReqStr0) -> + [Method, ReqStr1] = string:split(ReqStr0, " ", leading), + [Path, ReqStr2] = string:split(ReqStr1, " ", leading), + [_ProtoVsn, ReqStr3] = string:split(ReqStr2, "\r\n", leading), + [_HeaderStr, Body] = string:split(ReqStr3, "\r\n\r\n", leading), + #{method => Method, path => Path, body => Body}. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ t_http_crud_apis(_) -> - Port = start_http_server(fun handle_fun_200_ok/1), + Port = start_http_server(fun handle_fun_200_ok/2), %% assert we there's no bridges at first {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), @@ -163,6 +172,20 @@ t_http_crud_apis(_) -> , <<"message">> := <<"bridge already exists">> }, jsx:decode(RetMsg)), + %% send an message to emqx and the message should be forwarded to the HTTP server + Body = <<"my msg">>, + emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)), + ?assert( + receive + {http_server, received, #{method := <<"POST">>, path := <<"/path1">>, + body := Body}} -> + true; + Msg -> + ct:pal("error: http got unexpected request: ~p", [Msg]), + false + after 100 -> + false + end), %% update the request-path of the bridge URL2 = ?URL(Port, "path2"), {ok, 200, Bridge2} = request(put, uri(["bridges", ?BRIDGE_ID]), @@ -201,6 +224,19 @@ t_http_crud_apis(_) -> , <<"url">> := URL2 }, jsx:decode(Bridge3Str)), + %% send an message to emqx again, check the path has been changed + emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)), + ?assert( + receive + {http_server, received, #{path := <<"/path2">>}} -> + true; + Msg2 -> + ct:pal("error: http got unexpected request: ~p", [Msg2]), + false + after 100 -> + false + end), + %% delete the bridge {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), @@ -215,7 +251,7 @@ t_http_crud_apis(_) -> ok. t_start_stop_bridges(_) -> - Port = start_http_server(fun handle_fun_200_ok/1), + Port = start_http_server(fun handle_fun_200_ok/2), URL1 = ?URL(Port, "abc"), {ok, 201, Bridge} = request(post, uri(["bridges"]), ?HTTP_BRIDGE(URL1)#{ diff --git a/apps/emqx_connector/rebar.config b/apps/emqx_connector/rebar.config index 58706e950..eadc4e773 100644 --- a/apps/emqx_connector/rebar.config +++ b/apps/emqx_connector/rebar.config @@ -6,7 +6,7 @@ {deps, [ {eldap2, {git, "https://github.com/emqx/eldap2", {tag, "v0.2.2"}}}, {mysql, {git, "https://github.com/emqx/mysql-otp", {tag, "1.7.1"}}}, - {epgsql, {git, "https://github.com/emqx/epgsql", {tag, "4.6.0"}}}, + {epgsql, {git, "https://github.com/emqx/epgsql", {tag, "4.7-emqx.1"}}}, %% NOTE: mind poolboy version when updating mongodb-erlang version {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.10"}}}, %% NOTE: mind poolboy version when updating eredis_cluster version diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index f597a5c1d..ac0847a91 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -265,11 +265,16 @@ process_request(#{ } = Conf, Msg) -> Conf#{ method => make_method(emqx_plugin_libs_rule:proc_tmpl(MethodTks, Msg)) , path => emqx_plugin_libs_rule:proc_tmpl(PathTks, Msg) - , body => emqx_plugin_libs_rule:proc_tmpl(BodyTks, Msg) + , body => process_request_body(BodyTks, Msg) , headers => maps:to_list(proc_headers(HeadersTks, Msg)) , request_timeout => ReqTimeout }. +process_request_body([], Msg) -> + emqx_json:encode(Msg); +process_request_body(BodyTks, Msg) -> + emqx_plugin_libs_rule:proc_tmpl(BodyTks, Msg). + proc_headers(HeaderTks, Msg) -> maps:fold(fun(K, V, Acc) -> Acc#{emqx_plugin_libs_rule:proc_tmpl(K, Msg) => diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 27d001806..a9647b2c1 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -207,7 +207,7 @@ basic_config(#{ username => User, password => Password, clean_start => CleanStart, - keepalive => KeepAlive, + keepalive => ms_to_s(KeepAlive), retry_interval => RetryIntv, max_inflight => MaxInflight, ssl => EnableSsl, @@ -215,5 +215,8 @@ basic_config(#{ if_record_metrics => true }. +ms_to_s(Ms) -> + erlang:ceil(Ms / 1000). + clientid(Id) -> iolist_to_binary([Id, ":", atom_to_list(node())]). diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index 36bff3386..5d9d0e1d9 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -32,7 +32,9 @@ -export([connect/1]). --export([query/3]). +-export([ query/3 + , prepared_query/4 + ]). -export([do_health_check/1]). @@ -60,8 +62,7 @@ on_start(InstId, #{server := {Host, Port}, connector => InstId, config => Config}), SslOpts = case maps:get(enable, SSL) of true -> - [{ssl, [{server_name_indication, disable} | - emqx_plugin_libs_ssl:save_files_return_opts(SSL, "connectors", InstId)]}]; + [{ssl, [emqx_plugin_libs_ssl:save_files_return_opts(SSL, "connectors", InstId)]}]; false -> [] end, Options = [{host, Host}, @@ -80,12 +81,15 @@ on_stop(InstId, #{poolname := PoolName}) -> connector => InstId}), emqx_plugin_libs_pool:stop_pool(PoolName). -on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := _PoolName} = State) -> - on_query(InstId, {sql, SQL, []}, AfterQuery, State); -on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := PoolName} = State) -> - ?TRACE("QUERY", "postgresql_connector_received", - #{connector => InstId, sql => SQL, state => State}), - case Result = ecpool:pick_and_do(PoolName, {?MODULE, query, [SQL, Params]}, no_handover) of +on_query(InstId, QueryParams, AfterQuery, #{poolname := PoolName} = State) -> + {Command, Args} = case QueryParams of + {query, SQL} -> {query, [SQL, []]}; + {query, SQL, Params} -> {query, [SQL, Params]}; + {prepared_query, Name, SQL} -> {prepared_query, [Name, SQL, []]}; + {prepared_query, Name, SQL, Params} -> {prepared_query, [Name, SQL, Params]} + end, + ?TRACE("QUERY", "postgresql_connector_received", #{connector => InstId, command => Command, args => Args, state => State}}), + case Result = ecpool:pick_and_do(PoolName, {?MODULE, Command, Args}, no_handover) of {error, Reason} -> ?SLOG(error, #{ msg => "postgresql_connector_do_sql_query_failed", @@ -115,6 +119,9 @@ connect(Opts) -> query(Conn, SQL, Params) -> epgsql:equery(Conn, SQL, Params). +prepared_query(Conn, Name, SQL, Params) -> + epgsql:prepared_query2(Conn, Name, SQL, Params). + conn_opts(Opts) -> conn_opts(Opts, []). conn_opts([], Acc) -> diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl index eb483dcc5..1357037ee 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl @@ -66,7 +66,7 @@ to_remote_msg(#message{flags = Flags0} = Msg, Vars) -> to_remote_msg(MapMsg, #{remote_topic := TopicToken, payload := PayloadToken, remote_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) when is_map(MapMsg) -> Topic = replace_vars_in_str(TopicToken, MapMsg), - Payload = replace_vars_in_str(PayloadToken, MapMsg), + Payload = process_payload(PayloadToken, MapMsg), QoS = replace_simple_var(QoSToken, MapMsg), Retain = replace_simple_var(RetainToken, MapMsg), #mqtt_msg{qos = QoS, @@ -82,13 +82,18 @@ to_broker_msg(#{dup := Dup, properties := Props} = MapMsg, #{local_topic := TopicToken, payload := PayloadToken, local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) -> Topic = replace_vars_in_str(TopicToken, MapMsg), - Payload = replace_vars_in_str(PayloadToken, MapMsg), + Payload = process_payload(PayloadToken, MapMsg), QoS = replace_simple_var(QoSToken, MapMsg), Retain = replace_simple_var(RetainToken, MapMsg), set_headers(Props, emqx_message:set_flags(#{dup => Dup, retain => Retain}, emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload))). +process_payload([], Msg) -> + emqx_json:encode(Msg); +process_payload(Tks, Msg) -> + replace_vars_in_str(Tks, Msg). + %% Replace a string contains vars to another string in which the placeholders are replace by the %% corresponding values. For example, given "a: ${var}", if the var=1, the result string will be: %% "a: 1". diff --git a/apps/emqx_dashboard/src/emqx_dashboard_api.erl b/apps/emqx_dashboard/src/emqx_dashboard_api.erl index 45a3b7c56..ae2eb4e42 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_api.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_api.erl @@ -123,7 +123,7 @@ schema("/users/:username") -> #{in => path, example => <<"admin">>})}], 'requestBody' => [ { description - , mk(emqx_schema:unicode_binary(), + , mk(binary(), #{desc => <<"User description">>, example => <<"administrator">>})} ], responses => #{ @@ -176,7 +176,7 @@ schema("/users/:username/change_pwd") -> fields(user) -> [ {description, - mk(emqx_schema:unicode_binary(), + mk(binary(), #{desc => <<"User description">>, example => "administrator"})}, {username, mk(binary(), diff --git a/apps/emqx_management/src/emqx_mgmt_api_app.erl b/apps/emqx_management/src/emqx_mgmt_api_app.erl index b77f1a214..dfce3cf30 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_app.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_app.erl @@ -91,16 +91,17 @@ fields(app) -> """They are useful for accessing public data anonymously,""" """and are used to associate API requests.""", example => <<"MzAyMjk3ODMwMDk0NjIzOTUxNjcwNzQ0NzQ3MTE2NDYyMDI">>})}, - {expired_at, hoconsc:mk(emqx_schema:rfc3339_system_time(), + {expired_at, hoconsc:mk(hoconsc:union([undefined, emqx_schema:rfc3339_system_time()]), #{desc => "No longer valid datetime", example => <<"2021-12-05T02:01:34.186Z">>, - nullable => true + nullable => true, + default => undefined })}, {created_at, hoconsc:mk(emqx_schema:rfc3339_system_time(), #{desc => "ApiKey create datetime", example => <<"2021-12-01T00:00:00.000Z">> })}, - {desc, hoconsc:mk(emqx_schema:unicode_binary(), + {desc, hoconsc:mk(binary(), #{example => <<"Note">>, nullable => true})}, {enable, hoconsc:mk(boolean(), #{desc => "Enable/Disable", nullable => true})} ]; @@ -136,9 +137,15 @@ api_key(post, #{body := App}) -> #{ <<"name">> := Name, <<"desc">> := Desc0, - <<"expired_at">> := ExpiredAt, <<"enable">> := Enable } = App, + %% undefined is never expired + ExpiredAt0 = maps:get(<<"expired_at">>, App, <<"undefined">>), + ExpiredAt = + case ExpiredAt0 of + <<"undefined">> -> undefined; + _ -> ExpiredAt0 + end, Desc = unicode:characters_to_binary(Desc0, unicode), case emqx_mgmt_auth:create(Name, Enable, ExpiredAt, Desc) of {ok, NewApp} -> {200, format(NewApp)}; @@ -164,8 +171,13 @@ api_key_by_name(put, #{bindings := #{name := Name}, body := Body}) -> {error, not_found} -> {404, <<"NOT_FOUND">>} end. -format(App = #{expired_at := ExpiredAt, created_at := CreateAt}) -> +format(App = #{expired_at := ExpiredAt0, created_at := CreateAt}) -> + ExpiredAt = + case ExpiredAt0 of + undefined -> <<"undefined">>; + _ -> list_to_binary(calendar:system_time_to_rfc3339(ExpiredAt0)) + end, App#{ - expired_at => list_to_binary(calendar:system_time_to_rfc3339(ExpiredAt)), + expired_at => ExpiredAt, created_at => list_to_binary(calendar:system_time_to_rfc3339(CreateAt)) }. diff --git a/apps/emqx_management/src/emqx_mgmt_api_banned.erl b/apps/emqx_management/src/emqx_mgmt_api_banned.erl index 911b298a1..c6ff56ad0 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_banned.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_banned.erl @@ -101,15 +101,15 @@ fields(ban) -> desc => <<"Banned type clientid, username, peerhost">>, nullable => false, example => username})}, - {who, hoconsc:mk(emqx_schema:unicode_binary(), #{ + {who, hoconsc:mk(binary(), #{ desc => <<"Client info as banned type">>, nullable => false, example => <<"Badass坏"/utf8>>})}, - {by, hoconsc:mk(emqx_schema:unicode_binary(), #{ + {by, hoconsc:mk(binary(), #{ desc => <<"Commander">>, nullable => true, example => <<"mgmt_api">>})}, - {reason, hoconsc:mk(emqx_schema:unicode_binary(), #{ + {reason, hoconsc:mk(binary(), #{ desc => <<"Banned reason">>, nullable => true, example => <<"Too many requests">>})}, diff --git a/apps/emqx_management/src/emqx_mgmt_auth.erl b/apps/emqx_management/src/emqx_mgmt_auth.erl index ae6b0820d..512ec6b0f 100644 --- a/apps/emqx_management/src/emqx_mgmt_auth.erl +++ b/apps/emqx_management/src/emqx_mgmt_auth.erl @@ -37,7 +37,7 @@ api_secret_hash = <<>> :: binary() | '_', enable = true :: boolean() | '_', desc = <<>> :: binary() | '_', - expired_at = 0 :: integer() | '_', + expired_at = 0 :: integer() | undefined | '_', created_at = 0 :: integer() | '_' }). diff --git a/apps/emqx_management/test/emqx_mgmt_auth_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_auth_api_SUITE.erl index 185ad5343..73d4ad566 100644 --- a/apps/emqx_management/test/emqx_mgmt_auth_api_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_auth_api_SUITE.erl @@ -23,7 +23,7 @@ all() -> [{group, parallel}, {group, sequence}]. suite() -> [{timetrap, {minutes, 1}}]. groups() -> [ - {parallel, [parallel], [t_create, t_update, t_delete, t_authorize]}, + {parallel, [parallel], [t_create, t_update, t_delete, t_authorize, t_create_unexpired_app]}, {sequence, [], [t_create_failed]} ]. @@ -137,7 +137,15 @@ t_authorize(_Config) -> }, ?assertMatch({ok, #{<<"api_key">> := _, <<"enable">> := true}}, update_app(Name, Expired)), ?assertEqual(Unauthorized, emqx_mgmt_api_test_util:request_api(get, BanPath, BasicHeader)), + ok. +t_create_unexpired_app(_Config) -> + Name1 = <<"EMQX-UNEXPIRED-API-KEY-1">>, + Name2 = <<"EMQX-UNEXPIRED-API-KEY-2">>, + {ok, Create1} = create_unexpired_app(Name1, #{}), + ?assertMatch(#{<<"expired_at">> := <<"undefined">>}, Create1), + {ok, Create2} = create_unexpired_app(Name2, #{expired_at => <<"undefined">>}), + ?assertMatch(#{<<"expired_at">> := <<"undefined">>}, Create2), ok. @@ -170,6 +178,15 @@ create_app(Name) -> Error -> Error end. +create_unexpired_app(Name, Params) -> + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Path = emqx_mgmt_api_test_util:api_path(["api_key"]), + App = maps:merge(#{name => Name, desc => <<"Note"/utf8>>, enable => true}, Params), + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, App) of + {ok, Res} -> {ok, emqx_json:decode(Res, [return_maps])}; + Error -> Error + end. + delete_app(Name) -> DeletePath = emqx_mgmt_api_test_util:api_path(["api_key", Name]), emqx_mgmt_api_test_util:request_api(delete, DeletePath). diff --git a/apps/emqx_modules/src/emqx_delayed_api.erl b/apps/emqx_modules/src/emqx_delayed_api.erl index 8137d9e63..9199d7b2c 100644 --- a/apps/emqx_modules/src/emqx_delayed_api.erl +++ b/apps/emqx_modules/src/emqx_delayed_api.erl @@ -44,6 +44,7 @@ -define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND'). -define(MESSAGE_ID_SCHEMA_ERROR, 'MESSAGE_ID_SCHEMA_ERROR'). +-define(MAX_PAYLOAD_SIZE, 1048576). %% 1MB = 1024 x 1024 api_spec() -> emqx_dashboard_swagger:spec(?MODULE). @@ -157,11 +158,11 @@ delayed_message(get, #{bindings := #{msgid := Id}}) -> case emqx_delayed:get_delayed_message(Id) of {ok, Message} -> Payload = maps:get(payload, Message), - case size(Payload) > ?MAX_PAYLOAD_LENGTH of + case erlang:byte_size(Payload) > ?MAX_PAYLOAD_SIZE of true -> - {200, Message#{payload => ?PAYLOAD_TOO_LARGE}}; + {200, Message}; _ -> - {200, Message#{payload => Payload}} + {200, Message#{payload => base64:encode(Payload)}} end; {error, id_schema_error} -> {400, generate_http_code_map(id_schema_error, Id)}; diff --git a/apps/emqx_modules/src/emqx_rewrite_api.erl b/apps/emqx_modules/src/emqx_rewrite_api.erl index 3f92cd11f..8435385f2 100644 --- a/apps/emqx_modules/src/emqx_rewrite_api.erl +++ b/apps/emqx_modules/src/emqx_rewrite_api.erl @@ -33,7 +33,7 @@ ]). api_spec() -> - emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true, translate_body => true}). + emqx_dashboard_swagger:spec(?MODULE). paths() -> ["/mqtt/topic_rewrite"]. diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 9be449b60..24dd05524 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -27,10 +27,10 @@ , on_message_publish/2 ]). --export([ dispatch/4 +-export([ dispatch/5 , delete_message/2 , store_retained/2 - , deliver/5]). + , deliver/6]). -export([ get_expiry_time/1 , update_config/1 @@ -78,7 +78,7 @@ on_session_subscribed(_, _, #{share := ShareName}, _) when ShareName =/= undefin on_session_subscribed(_, Topic, #{rh := Rh} = Opts, Context) -> IsNew = maps:get(is_new, Opts, true), case Rh =:= 0 orelse (Rh =:= 1 andalso IsNew) of - true -> dispatch(Context, Topic); + true -> dispatch(Context, Topic, Opts); _ -> ok end. @@ -111,26 +111,26 @@ on_message_publish(Msg, _) -> start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). --spec dispatch(context(), pid(), topic(), cursor()) -> ok. -dispatch(Context, Pid, Topic, Cursor) -> +-spec dispatch(context(), pid(), topic(), emqx_types:subopts(), cursor()) -> ok. +dispatch(Context, Pid, Topic, Opts, Cursor) -> Mod = get_backend_module(), case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of false -> {ok, Result} = Mod:read_message(Context, Topic), - deliver(Result, Context, Pid, Topic, undefined); + deliver(Result, Context, Pid, Topic, Opts, undefined); true -> {ok, Result, NewCursor} = Mod:match_messages(Context, Topic, Cursor), - deliver(Result, Context, Pid, Topic, NewCursor) + deliver(Result, Context, Pid, Topic, Opts, NewCursor) end. -deliver([], Context, Pid, Topic, Cursor) -> +deliver([], Context, Pid, Topic, Opts, Cursor) -> case Cursor of undefined -> ok; _ -> - dispatch(Context, Pid, Topic, Cursor) + dispatch(Context, Pid, Topic, Opts, Cursor) end; -deliver(Result, #{context_id := Id} = Context, Pid, Topic, Cursor) -> +deliver(Result, #{context_id := Id} = Context, Pid, Topic, Opts, Cursor) -> case erlang:is_process_alive(Pid) of false -> ok; @@ -138,12 +138,12 @@ deliver(Result, #{context_id := Id} = Context, Pid, Topic, Cursor) -> #{msg_deliver_quota := MaxDeliverNum} = emqx:get_config([?APP, flow_control]), case MaxDeliverNum of 0 -> - _ = [Pid ! {deliver, Topic, Msg} || Msg <- Result], + _ = [Pid ! {deliver, Topic, handle_retain_opts(Opts, Msg)} || Msg <- Result], ok; _ -> - case do_deliver(Result, Id, Pid, Topic) of + case do_deliver(Result, Id, Pid, Topic, Opts) of ok -> - deliver([], Context, Pid, Topic, Cursor); + deliver([], Context, Pid, Topic, Opts, Cursor); abort -> ok end @@ -280,9 +280,9 @@ is_too_big(Size) -> Limit > 0 andalso (Size > Limit). %% @private -dispatch(Context, Topic) -> - emqx_retainer_pool:async_submit(fun ?MODULE:dispatch/4, - [Context, self(), Topic, undefined]). +dispatch(Context, Topic, Opts) -> + emqx_retainer_pool:async_submit(fun ?MODULE:dispatch/5, + [Context, self(), Topic, Opts, undefined]). -spec delete_message(context(), topic()) -> ok. delete_message(Context, Topic) -> @@ -305,16 +305,16 @@ clean(Context) -> Mod = get_backend_module(), Mod:clean(Context). --spec do_deliver(list(term()), pos_integer(), pid(), topic()) -> ok | abort. -do_deliver([Msg | T], Id, Pid, Topic) -> +-spec do_deliver(list(term()), pos_integer(), pid(), topic(), emqx_types:subopts()) -> ok | abort. +do_deliver([Msg | T], Id, Pid, Topic, Opts) -> case require_semaphore(?DELIVER_SEMAPHORE, Id) of true -> - Pid ! {deliver, Topic, Msg}, - do_deliver(T, Id, Pid, Topic); + Pid ! {deliver, Topic, handle_retain_opts(Opts, Msg)}, + do_deliver(T, Id, Pid, Topic, Opts); _ -> abort end; -do_deliver([], _, _, _) -> +do_deliver([], _, _, _, _) -> ok. -spec require_semaphore(semaphore(), pos_integer()) -> boolean(). @@ -484,3 +484,9 @@ load(Context) -> unload() -> emqx:unhook('message.publish', {?MODULE, on_message_publish}), emqx:unhook('session.subscribed', {?MODULE, on_session_subscribed}). + +handle_retain_opts(#{rap := 0}, Message) -> + emqx_message:set_header(retain, false, Message); + +handle_retain_opts(_, Message) -> + Message. diff --git a/apps/emqx_retainer/src/emqx_retainer_api.erl b/apps/emqx_retainer/src/emqx_retainer_api.erl index 26d341b53..5424629d9 100644 --- a/apps/emqx_retainer/src/emqx_retainer_api.erl +++ b/apps/emqx_retainer/src/emqx_retainer_api.erl @@ -28,13 +28,14 @@ -import(emqx_mgmt_api_configs, [gen_schema/1]). -import(emqx_mgmt_util, [ object_array_schema/2 + , object_schema/2 , schema/1 , schema/2 , error_schema/2 , page_params/0 , properties/1]). --define(MAX_BASE64_PAYLOAD_SIZE, 1048576). %% 1MB = 1024 x 1024 +-define(MAX_PAYLOAD_SIZE, 1048576). %% 1MB = 1024 x 1024 api_spec() -> {[lookup_retained_api(), with_topic_api(), config_api()], []}. @@ -64,7 +65,7 @@ parameters() -> lookup_retained_api() -> Metadata = #{ get => #{ - description => <<"lookup matching messages">>, + description => <<"List retained messages">>, parameters => page_params(), responses => #{ <<"200">> => object_array_schema( @@ -80,9 +81,10 @@ with_topic_api() -> MetaData = #{ get => #{ description => <<"lookup matching messages">>, - parameters => parameters() ++ page_params(), + parameters => parameters(), responses => #{ - <<"200">> => object_array_schema(message_props(), <<"List retained messages">>), + <<"200">> => object_schema(message_props(), <<"List retained messages">>), + <<"404">> => error_schema(<<"Reatined Not Exists">>, ['NOT_FOUND']), <<"405">> => schema(<<"NotAllowed">>) } }, @@ -139,35 +141,27 @@ config(put, #{body := Body}) -> %%------------------------------------------------------------------------------ %% Interval Funcs %%------------------------------------------------------------------------------ -lookup_retained(get, Params) -> - lookup(undefined, Params, fun format_message/1). +lookup_retained(get, #{query_string := Qs}) -> + Page = maps:get(page, Qs, 1), + Limit = maps:get(page, Qs, emqx_mgmt:max_row_limit()), + {ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, undefined, Page, Limit), + {200, [format_message(Msg) || Msg <- Msgs]}. -with_topic(get, #{bindings := Bindings} = Params) -> +with_topic(get, #{bindings := Bindings}) -> Topic = maps:get(topic, Bindings), - lookup(Topic, Params, fun format_detail_message/1); + {ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, 1, 1), + case Msgs of + [H | _] -> + {200, format_detail_message(H)}; + _ -> + {404, #{code => 'NOT_FOUND'}} + end; with_topic(delete, #{bindings := Bindings}) -> Topic = maps:get(topic, Bindings), emqx_retainer_mnesia:delete_message(undefined, Topic), {204}. --spec lookup(undefined | binary(), - map(), - fun((emqx_types:message()) -> map())) -> - {200, map()}. -lookup(Topic, #{query_string := Qs}, Formatter) -> - Page = maps:get(page, Qs, 1), - Limit = maps:get(page, Qs, emqx_mgmt:max_row_limit()), - {ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, Topic, Page, Limit), - {200, format_message(Msgs, Formatter)}. - - -format_message(Messages, Formatter) when is_list(Messages)-> - [Formatter(Message) || Message <- Messages]; - -format_message(Message, Formatter) -> - Formatter(Message). - format_message(#message{ id = ID, qos = Qos, topic = Topic, from = From , timestamp = Timestamp, headers = Headers}) -> #{msgid => emqx_guid:to_hexstr(ID), @@ -181,12 +175,11 @@ format_message(#message{ id = ID, qos = Qos, topic = Topic, from = From format_detail_message(#message{payload = Payload} = Msg) -> Base = format_message(Msg), - EncodePayload = base64:encode(Payload), - case erlang:byte_size(EncodePayload) =< ?MAX_BASE64_PAYLOAD_SIZE of + case erlang:byte_size(Payload) =< ?MAX_PAYLOAD_SIZE of true -> - Base#{payload => EncodePayload}; + Base#{payload => base64:encode(Payload)}; _ -> - Base#{payload => base64:encode(<<"PAYLOAD_TOO_LARGE">>)} + Base end. to_bin_string(Data) when is_binary(Data) -> diff --git a/bin/emqx b/bin/emqx index 7862a98b8..00133ed0b 100755 --- a/bin/emqx +++ b/bin/emqx @@ -204,7 +204,10 @@ fi if ! check_erlang_start >/dev/null 2>&1; then BUILT_ON="$(head -1 "${REL_DIR}/BUILT_ON")" ## failed to start, might be due to missing libs, try to be portable - export LD_LIBRARY_PATH="$DYNLIBS_DIR:$LD_LIBRARY_PATH" + export LD_LIBRARY_PATH="${LD_LIBRARY_PATH:-$DYNLIBS_DIR}" + if [ "$LD_LIBRARY_PATH" != "$DYNLIBS_DIR" ]; then + export LD_LIBRARY_PATH="$DYNLIBS_DIR:$LD_LIBRARY_PATH" + fi if ! check_erlang_start; then ## it's hopeless echoerr "FATAL: Unable to start Erlang." @@ -428,6 +431,26 @@ wait_for() { done } +wait_until_return_val() { + local RESULT + local WAIT_TIME + local CMD + RESULT="$1" + WAIT_TIME="$2" + shift 2 + CMD="$*" + while true; do + if [ "$($CMD 2>/dev/null)" = "$RESULT" ]; then + return 0 + fi + if [ "$WAIT_TIME" -le 0 ]; then + return 1 + fi + WAIT_TIME=$((WAIT_TIME - 1)) + sleep 1 + done +} + latest_vm_args() { local hint_var_name="$1" local vm_args_file @@ -553,7 +576,8 @@ case "${COMMAND}" in "$(relx_start_command)" WAIT_TIME=${WAIT_FOR_ERLANG:-15} - if wait_for "$WAIT_TIME" 'relx_nodetool' 'ping'; then + if wait_until_return_val "true" "$WAIT_TIME" 'relx_nodetool' \ + 'eval' 'emqx:is_running()'; then echo "$EMQX_DESCRIPTION $REL_VSN is started successfully!" exit 0 else diff --git a/rebar.config b/rebar.config index 14fe8eb65..7b08d38a0 100644 --- a/rebar.config +++ b/rebar.config @@ -46,7 +46,7 @@ {deps, [ {lc, {git, "https://github.com/qzhuyan/lc.git", {tag, "0.1.2"}}} , {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps - , {typerefl, {git, "https://github.com/k32/typerefl", {tag, "0.8.5"}}} + , {typerefl, {git, "https://github.com/k32/typerefl", {tag, "0.8.6"}}} , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.12"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} @@ -65,7 +65,7 @@ , {system_monitor, {git, "https://github.com/klarna-incubator/system_monitor", {tag, "2.2.0"}}} , {getopt, "1.0.2"} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.16.0"}}} - , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.22.0"}}} + , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.22.1"}}} , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.4.1"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}} , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}