diff --git a/apps/emqx_auth_http/src/emqx_auth_http_app.erl b/apps/emqx_auth_http/src/emqx_auth_http_app.erl index 487201e8e..e857fc681 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http_app.erl +++ b/apps/emqx_auth_http/src/emqx_auth_http_app.erl @@ -61,20 +61,7 @@ translate_env(EnvName) -> _ -> 80 end), Path = path(Path0), - Host = case inet:parse_address(Host0) of - {ok, {_,_,_,_} = Addr} -> Addr; - {ok, {_,_,_,_,_,_,_,_} = Addr} -> Addr; - {error, einval} -> Host0 - end, - Inet = case Host of - {_,_,_,_} -> inet; - {_,_,_,_,_,_,_,_} -> inet6; - _ -> - case inet:getaddr(Host, inet6) of - {error, _} -> inet; - {ok, _} -> inet6 - end - end, + {Inet, Host} = parse_host(Host0), MoreOpts = case Scheme of "http" -> [{transport_opts, [Inet]}]; @@ -152,6 +139,17 @@ unload_hooks() -> ehttpc_sup:stop_pool('emqx_auth_http/acl_req'), ok. +parse_host(Host) -> + case inet:parse_address(Host) of + {ok, Addr} when size(Addr) =:= 4 -> {inet, Addr}; + {ok, Addr} when size(Addr) =:= 8 -> {inet6, Addr}; + {error, einval} -> + case inet:getaddr(Host, inet6) of + {ok, _} -> {inet6, Host}; + {error, _} -> {inet, Host} + end + end. + to_lower(Headers) -> [{string:to_lower(K), V} || {K, V} <- Headers]. diff --git a/apps/emqx_auth_redis/rebar.config b/apps/emqx_auth_redis/rebar.config index a85b22fd1..72c91aa4e 100644 --- a/apps/emqx_auth_redis/rebar.config +++ b/apps/emqx_auth_redis/rebar.config @@ -1,5 +1,5 @@ {deps, - [{eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.3"}}} + [{eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.4"}}} ]}. {erl_opts, [warn_unused_vars, diff --git a/apps/emqx_bridge_mqtt/priv/emqx_bridge_mqtt.schema b/apps/emqx_bridge_mqtt/priv/emqx_bridge_mqtt.schema index 05c048d0a..72c0ff98e 100644 --- a/apps/emqx_bridge_mqtt/priv/emqx_bridge_mqtt.schema +++ b/apps/emqx_bridge_mqtt/priv/emqx_bridge_mqtt.schema @@ -100,7 +100,7 @@ {mapping, "bridge.mqtt.$name.retry_interval", "emqx_bridge_mqtt.bridges", [ {default, "20s"}, - {datatype, {duration, ms}} + {datatype, {duration, s}} ]}. {mapping, "bridge.mqtt.$name.max_inflight", "emqx_bridge_mqtt.bridges", [ diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl index 7708521e8..d63f20141 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl @@ -759,7 +759,7 @@ options(Options, PoolName) -> {username, str(Get(<<"username">>))}, {password, str(Get(<<"password">>))}, {proto_ver, mqtt_ver(Get(<<"proto_ver">>))}, - {retry_interval, cuttlefish_duration:parse(str(GetD(<<"retry_interval">>, "30s")), ms)}, + {retry_interval, cuttlefish_duration:parse(str(GetD(<<"retry_interval">>, "30s")), s)}, {ssl, cuttlefish_flag:parse(str(Get(<<"ssl">>)))}, {ssl_opts, [{versions, tls_versions()}, {ciphers, ciphers(Get(<<"ciphers">>))}, diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl index 29f7d31ef..5e1374eca 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl @@ -377,9 +377,14 @@ common(_StateName, {call, From}, {ensure_present, What, Topic}, State) -> common(_StateName, {call, From}, {ensure_absent, What, Topic}, State) -> {Result, NewState} = ensure_absent(What, Topic, State), {keep_state, NewState, [{reply, From, Result}]}; -common(_StateName, info, {deliver, _, Msg}, #{replayq := Q, if_record_metrics := IfRecordMetric} = State) -> - bridges_metrics_inc(IfRecordMetric, 'bridge.mqtt.message_received'), - NewQ = replayq:append(Q, collect([Msg])), +common(_StateName, info, {deliver, _, Msg}, + State = #{replayq := Q, if_record_metrics := IfRecordMetric}) -> + Msgs = collect([Msg]), + bridges_metrics_inc(IfRecordMetric, + 'bridge.mqtt.message_received', + length(Msgs) + ), + NewQ = replayq:append(Q, Msgs), {keep_state, State#{replayq => NewQ}, {next_event, internal, maybe_send}}; common(_StateName, info, {'EXIT', _, _}, State) -> {keep_state, State}; @@ -586,3 +591,8 @@ bridges_metrics_inc(true, Metric) -> emqx_metrics:inc(Metric); bridges_metrics_inc(_IsRecordMetric, _Metric) -> ok. + +bridges_metrics_inc(true, Metric, Value) -> + emqx_metrics:inc(Metric, Value); +bridges_metrics_inc(_IsRecordMetric, _Metric, _Value) -> + ok. diff --git a/apps/emqx_exproto/src/emqx_exproto_channel.erl b/apps/emqx_exproto/src/emqx_exproto_channel.erl index 49ded91ff..04ed8b414 100644 --- a/apps/emqx_exproto/src/emqx_exproto_channel.erl +++ b/apps/emqx_exproto/src/emqx_exproto_channel.erl @@ -255,7 +255,7 @@ handle_call(close, Channel) -> handle_call({auth, ClientInfo, _Password}, Channel = #channel{conn_state = connected}) -> ?LOG(warning, "Duplicated authorized command, dropped ~p", [ClientInfo]), - {ok, {error, ?RESP_PERMISSION_DENY, <<"Duplicated authenticate command">>}, Channel}; + {reply, {error, ?RESP_PERMISSION_DENY, <<"Duplicated authenticate command">>}, Channel}; handle_call({auth, ClientInfo0, Password}, Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) -> diff --git a/apps/emqx_recon/rebar.config b/apps/emqx_recon/rebar.config index adb1d4c65..1a6f20d2b 100644 --- a/apps/emqx_recon/rebar.config +++ b/apps/emqx_recon/rebar.config @@ -1,5 +1,5 @@ {deps, [ - {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.0"}}} +%% recon "https://github.com/ferd/recon" at root rebar.config ]}. {edoc_opts, [{preprocess, true}]}. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 26f6ef738..4f41f9d55 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -297,8 +297,7 @@ do_update_resource(#{id := Id, type := Type, description:= NewDescription, confi type = Type, config = Config, description = NewDescription, - created_at = erlang:system_time(millisecond)}), - cluster_call(clear_resource, [Module, Destroy, Id]) + created_at = erlang:system_time(millisecond)}) end. -spec(start_resource(resource_id()) -> ok | {error, Reason :: term()}). diff --git a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl index 0abd1553b..7e7a901c9 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl +++ b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl @@ -340,24 +340,11 @@ pool_opts(Params = #{<<"url">> := URL}) -> end), PoolSize = maps:get(<<"pool_size">>, Params, 32), ConnectTimeout = timer:seconds(maps:get(<<"connect_timeout">>, Params, 5)), - Host = case inet:parse_address(Host0) of - {ok, {_,_,_,_} = Addr} -> Addr; - {ok, {_,_,_,_,_,_,_,_} = Addr} -> Addr; - {error, einval} -> Host0 - end, - Inet = case Host of - {_,_,_,_} -> inet; - {_,_,_,_,_,_,_,_} -> inet6; - _ -> - case inet:getaddr(Host, inet6) of - {error, _} -> inet; - {ok, _} -> inet6 - end - end, + {Inet, Host} = parse_host(Host0), MoreOpts = case Scheme of - <<"http">> -> + "http" -> [{transport_opts, [Inet]}]; - <<"https">> -> + "https" -> KeyFile = maps:get(<<"keyfile">>, Params), CertFile = maps:get(<<"certfile">>, Params), CACertFile = maps:get(<<"cacertfile">>, Params), @@ -388,3 +375,14 @@ pool_opts(Params = #{<<"url">> := URL}) -> pool_name(ResId) -> list_to_atom("webhook:" ++ str(ResId)). + +parse_host(Host) -> + case inet:parse_address(Host) of + {ok, Addr} when size(Addr) =:= 4 -> {inet, Addr}; + {ok, Addr} when size(Addr) =:= 8 -> {inet6, Addr}; + {error, einval} -> + case inet:getaddr(Host, inet6) of + {ok, _} -> {inet6, Host}; + {error, _} -> {inet, Host} + end + end. diff --git a/apps/emqx_web_hook/src/emqx_web_hook_app.erl b/apps/emqx_web_hook/src/emqx_web_hook_app.erl index 2c0697c81..2ec8ebf42 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook_app.erl +++ b/apps/emqx_web_hook/src/emqx_web_hook_app.erl @@ -58,20 +58,7 @@ translate_env() -> _ -> 80 end), Path = path(Path0), - Host = case inet:parse_address(Host0) of - {ok, {_,_,_,_} = Addr} -> Addr; - {ok, {_,_,_,_,_,_,_,_} = Addr} -> Addr; - {error, einval} -> Host0 - end, - Inet = case Host of - {_,_,_,_} -> inet; - {_,_,_,_,_,_,_,_} -> inet6; - _ -> - case inet:getaddr(Host, inet6) of - {error, _} -> inet; - {ok, _} -> inet6 - end - end, + {Inet, Host} = parse_host(Host0), PoolSize = application:get_env(?APP, pool_size, 32), MoreOpts = case Scheme of "http" -> @@ -118,4 +105,15 @@ path(Path) -> set_content_type(Headers) -> NHeaders = proplists:delete(<<"Content-Type">>, proplists:delete(<<"content-type">>, Headers)), - [{<<"content-type">>, <<"application/json">>} | NHeaders]. \ No newline at end of file + [{<<"content-type">>, <<"application/json">>} | NHeaders]. + +parse_host(Host) -> + case inet:parse_address(Host) of + {ok, Addr} when size(Addr) =:= 4 -> {inet, Addr}; + {ok, Addr} when size(Addr) =:= 8 -> {inet6, Addr}; + {error, einval} -> + case inet:getaddr(Host, inet6) of + {ok, _} -> {inet6, Host}; + {error, _} -> {inet, Host} + end + end. \ No newline at end of file diff --git a/rebar.config b/rebar.config index 82b95f4eb..efdbbac64 100644 --- a/rebar.config +++ b/rebar.config @@ -54,6 +54,7 @@ , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}} , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3"}}} , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.2"}}} + , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}} , {getopt, "1.0.1"} ]}. diff --git a/src/emqx_vm.erl b/src/emqx_vm.erl index e1ab29b8b..5094eea03 100644 --- a/src/emqx_vm.erl +++ b/src/emqx_vm.erl @@ -275,14 +275,8 @@ util_alloc()-> alloc(?UTIL_ALLOCATORS). alloc(Type) -> - [{{T, Instance}, Props} || {{T, Instance}, Props} <- allocators(), lists:member(T, Type)]. - -allocators() -> - UtilAllocators = erlang:system_info(alloc_util_allocators), - Allocators = [sys_alloc, mseg_alloc|UtilAllocators], - [{{A, N}, lists:sort(proplists:delete(versions, Props))} || - A <- Allocators, Allocs <- [erlang:system_info({allocator, A})], - Allocs =/= false, {_, N, Props} <- Allocs]. + [{{T, Instance}, Props} || + {{T, Instance}, Props} <- recon_alloc:allocators(), lists:member(T, Type)]. container_size(Prop, Keyword, Container) -> Sbcs = container_value(Prop, Keyword, sbcs, Container),