Add portal transport over emqx_client.

This commit is contained in:
spring2maz 2019-02-10 14:44:43 +01:00 committed by Gilbert Wong
parent 141af0d02c
commit 6d51d78dfc
8 changed files with 249 additions and 33 deletions

View File

@ -1694,11 +1694,14 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G
## Value: Number ## Value: Number
## bridge.aws.subscription.2.qos = 1 ## bridge.aws.subscription.2.qos = 1
## Maximum number of messages in one batch for buffer queue to store ## Maximum number of messages in one batch when sending to remote borkers
## NOTE: when bridging viar MQTT connection to remote broker, this config is only
## used for internal message passing optimization as the underlying MQTT
## protocol does not supports batching.
## ##
## Value: Integer ## Value: Integer
## default: 1000 ## default: 32
## bridge.aws.queue.batch_size = 1000 ## bridge.aws.queue.batch_size = 32
## Base directory for replayq to store messages on disk ## Base directory for replayq to store messages on disk
## If this config entry is missing or set to undefined, ## If this config entry is missing or set to undefined,
@ -1844,11 +1847,14 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G
## Value: Number ## Value: Number
## bridge.azure.subscription.2.qos = 1 ## bridge.azure.subscription.2.qos = 1
## Batch size for buffer queue stored ## Maximum number of messages in one batch when sending to remote borkers
## NOTE: when bridging viar MQTT connection to remote broker, this config is only
## used for internal message passing optimization as the underlying MQTT
## protocol does not supports batching.
## ##
## Value: Integer ## Value: Integer
## default: 1000 ## default: 32
## bridge.azure.queue.batch_size = 1000 ## bridge.azure.queue.batch_size = 32
## Base directory for replayq to store messages on disk ## Base directory for replayq to store messages on disk
## If this config entry is missing or set to undefined, ## If this config entry is missing or set to undefined,

View File

@ -20,11 +20,12 @@
-optional_callbacks([]). -optional_callbacks([]).
%% map fields depend on implementation
-type config() :: map(). -type config() :: map().
-type connection() :: term(). -type connection() :: term().
-type conn_ref() :: term(). -type conn_ref() :: term().
-type batch() :: emqx_protal:batch(). -type batch() :: emqx_protal:batch().
-type batch_ref() :: reference(). -type ack_ref() :: emqx_portal:ack_ref().
-include("logger.hrl"). -include("logger.hrl").
@ -36,7 +37,7 @@
%% send to remote node/cluster %% send to remote node/cluster
%% portal worker (the caller process) should be expecting %% portal worker (the caller process) should be expecting
%% a message {batch_ack, reference()} when batch is acknowledged by remote node/cluster %% a message {batch_ack, reference()} when batch is acknowledged by remote node/cluster
-callback send(connection(), batch()) -> {ok, batch_ref()} | {error, any()}. -callback send(connection(), batch()) -> {ok, ack_ref()} | {error, any()}.
%% called when owner is shutting down. %% called when owner is shutting down.
-callback stop(conn_ref(), connection()) -> ok. -callback stop(conn_ref(), connection()) -> ok.

View File

@ -52,6 +52,9 @@
%% to support automatic load-balancing, i.e. in case it can not keep up %% to support automatic load-balancing, i.e. in case it can not keep up
%% with the amount of messages comming in, administrator should split and %% with the amount of messages comming in, administrator should split and
%% balance topics between worker/connections manually. %% balance topics between worker/connections manually.
%%
%% NOTES:
%% * Local messages are all normalised to QoS-1 when exporting to remote
-module(emqx_portal). -module(emqx_portal).
-behaviour(gen_statem). -behaviour(gen_statem).
@ -71,17 +74,18 @@
-export_type([config/0, -export_type([config/0,
batch/0, batch/0,
ref/0 ack_ref/0
]). ]).
-type config() :: map(). -type config() :: map().
-type batch() :: [emqx_portal_msg:msg()]. -type batch() :: [emqx_portal_msg:msg()].
-type ref() :: reference(). -type ack_ref() :: term().
-include("logger.hrl"). -include("logger.hrl").
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-define(DEFAULT_BATCH_COUNT, 100). %% same as default in-flight limit for emqx_client
-define(DEFAULT_BATCH_COUNT, 32).
-define(DEFAULT_BATCH_BYTES, 1 bsl 20). -define(DEFAULT_BATCH_BYTES, 1 bsl 20).
-define(DEFAULT_SEND_AHEAD, 8). -define(DEFAULT_SEND_AHEAD, 8).
-define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)). -define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)).
@ -110,7 +114,7 @@
start_link(Name, Config) when is_list(Config) -> start_link(Name, Config) when is_list(Config) ->
start_link(Name, maps:from_list(Config)); start_link(Name, maps:from_list(Config));
start_link(Name, Config) -> start_link(Name, Config) ->
gen_statem:start_link({local, Name}, ?MODULE, Config, []). gen_statem:start_link({local, name(Name)}, ?MODULE, Config, []).
stop(Pid) -> gen_statem:stop(Pid). stop(Pid) -> gen_statem:stop(Pid).
@ -122,7 +126,7 @@ import_batch(Batch, AckFun) ->
%% @doc This function is to be evaluated on message/batch exporter side %% @doc This function is to be evaluated on message/batch exporter side
%% when message/batch is accepted by remote node. %% when message/batch is accepted by remote node.
-spec handle_ack(pid(), ref()) -> ok. -spec handle_ack(pid(), ack_ref()) -> ok.
handle_ack(Pid, Ref) when node() =:= node(Pid) -> handle_ack(Pid, Ref) when node() =:= node(Pid) ->
Pid ! {batch_ack, Ref}, Pid ! {batch_ack, Ref},
ok. ok.
@ -231,7 +235,8 @@ connected(internal, maybe_send, State) ->
end; end;
connected(info, {disconnected, ConnRef, Reason}, connected(info, {disconnected, ConnRef, Reason},
#{conn_ref := ConnRef, connection := Conn} = State) -> #{conn_ref := ConnRef, connection := Conn} = State) ->
?INFO("Portal ~p diconnected~nreason=~p", [Conn, Reason]), ?INFO("Portal ~p diconnected~nreason=~p",
[name(), Conn, Reason]),
{next_state, connecting, {next_state, connecting,
State#{conn_ref := undefined, State#{conn_ref := undefined,
connection := undefined connection := undefined
@ -255,7 +260,8 @@ common(_StateName, info, {dispatch, _, Msg},
NewQ = replayq:append(Q, collect([Msg])), NewQ = replayq:append(Q, collect([Msg])),
{keep_state, State#{replayq => NewQ}, ?maybe_send}; {keep_state, State#{replayq => NewQ}, ?maybe_send};
common(StateName, Type, Content, State) -> common(StateName, Type, Content, State) ->
?INFO("Ignored unknown ~p event ~p at state ~p", [Type, Content, StateName]), ?DEBUG("Portal ~p discarded ~p type event at state ~p:~p",
[name(), Type, StateName, Content]),
{keep_state, State}. {keep_state, State}.
collect(Acc) -> collect(Acc) ->
@ -300,6 +306,7 @@ pop_and_send(#{replayq := Q,
do_send(State = #{inflight := Inflight}, QAckRef, [_ | _] = Batch) -> do_send(State = #{inflight := Inflight}, QAckRef, [_ | _] = Batch) ->
case maybe_send(State, Batch) of case maybe_send(State, Batch) of
{ok, Ref} -> {ok, Ref} ->
%% this is a list of inflight BATCHes, not expecting it to be too long
NewInflight = Inflight ++ [#{q_ack_ref => QAckRef, NewInflight = Inflight ++ [#{q_ack_ref => QAckRef,
send_ack_ref => Ref, send_ack_ref => Ref,
batch => Batch batch => Batch
@ -326,8 +333,6 @@ subscribe_local_topics(Topics) ->
emqx_broker:subscribe(Topic, #{qos => ?QOS_1, subid => name()}) emqx_broker:subscribe(Topic, #{qos => ?QOS_1, subid => name()})
end, Topics). end, Topics).
name() -> {_, Name} = process_info(self(), registered_name), Name.
disconnect(#{connection := Conn, disconnect(#{connection := Conn,
conn_ref := ConnRef, conn_ref := ConnRef,
connect_module := Module connect_module := Module
@ -347,10 +352,14 @@ maybe_send(#{connect_module := Module,
connection := Connection, connection := Connection,
mountpoint := Mountpoint mountpoint := Mountpoint
}, Batch) -> }, Batch) ->
Module:send(Connection, [emqx_portal_msg:apply_mountpoint(M, Mountpoint) || M <- Batch]). Module:send(Connection, [emqx_portal_msg:to_export(M, Mountpoint) || M <- Batch]).
format_mountpoint(undefined) -> format_mountpoint(undefined) ->
undefined; undefined;
format_mountpoint(Prefix) -> format_mountpoint(Prefix) ->
binary:replace(iolist_to_binary(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)). binary:replace(iolist_to_binary(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)).
name() -> {_, Name} = process_info(self(), registered_name), Name.
name(Id) -> list_to_atom(lists:concat([?MODULE, "_", Id])).

View File

@ -0,0 +1,135 @@
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%% @doc This module implements EMQX Portal transport layer on top of MQTT protocol
-module(emqx_portal_mqtt).
-behaviour(emqx_portal_connect).
%% behaviour callbacks
-export([start/1,
send/2,
stop/2
]).
-include("emqx_mqtt.hrl").
-define(ACK_REF(ClientPid, PktId), {ClientPid, PktId}).
%% Messages towards ack collector process
-define(SENT(MaxPktId), {sent, MaxPktId}).
-define(ACKED(AnyPktId), {acked, AnyPktId}).
-define(STOP(Ref), {stop, Ref}).
start(Config) ->
Ref = make_ref(),
Parent = self(),
AckCollector = spawn_link(fun() -> ack_collector(Parent, Ref) end),
Handlers = make_hdlr(Parent, AckCollector, Ref),
case emqx_client:start_link(Config#{msg_handler => Handlers, owner => AckCollector}) of
{ok, Pid} ->
case emqx_client:connect(Pid) of
{ok, _} ->
%% ack collector is always a new pid every reconnect.
%% use it as a connection reference
{ok, Ref, #{ack_collector => AckCollector,
client_pid => Pid}};
{error, Reason} ->
ok = stop(AckCollector, Pid),
{error, Reason}
end;
{error, Reason} ->
{error, Reason}
end.
stop(Ref, #{ack_collector := AckCollector,
client_pid := Pid}) ->
MRef = monitor(process, AckCollector),
unlink(AckCollector),
_ = AckCollector ! ?STOP(Ref),
receive
{'DOWN', MRef, _, _, _} ->
ok
after
1000 ->
exit(AckCollector, kill)
end,
_ = emqx_client:stop(Pid),
ok.
send(#{client_pid := ClientPid, ack_collector := AckCollector}, Batch) ->
send_loop(ClientPid, AckCollector, Batch).
send_loop(ClientPid, AckCollector, [Msg | Rest]) ->
case emqx_client:publish(ClientPid, Msg) of
{ok, PktId} when Rest =:= [] ->
Rest =:= [] andalso AckCollector ! ?SENT(PktId),
{ok, PktId};
{ok, _PktId} ->
send_loop(ClientPid, AckCollector, Rest);
{error, {_PacketId, inflight_full}} ->
timer:sleep(100),
send_loop(ClientPid, AckCollector, [Msg | Rest]);
{error, Reason} ->
%% There is no partial sucess of a batch and recover from the middle
%% only to retry all messages in one batch
{error, Reason}
end.
ack_collector(Parent, ConnRef) ->
ack_collector(Parent, ConnRef, []).
ack_collector(Parent, ConnRef, PktIds) ->
NewIds =
receive
?STOP(ConnRef) ->
exit(normal);
?SENT(PktId) ->
%% this ++ only happens per-BATCH, hence no optimization
PktIds ++ [PktId];
?ACKED(PktId) ->
handle_ack(Parent, PktId, PktIds)
after
200 ->
PktIds
end,
ack_collector(Parent, ConnRef, NewIds).
handle_ack(Parent, PktId, [PktId | Rest]) ->
%% A batch is finished, time to ack portal
ok = emqx_portal:handle_ack(Parent, PktId),
Rest;
handle_ack(_Parent, PktId, [BatchMaxPktId | _] = All) ->
%% partial ack of a batch, terminate here.
true = (PktId < BatchMaxPktId), %% bad order otherwise
All.
%% When puback for QoS-1 message is received from remote MQTT broker
%% NOTE: no support for QoS-2
handle_puback(AckCollector, #{packet_id := PktId, reason_code := RC}) ->
RC =:= ?RC_SUCCESS andalso error(RC),
AckCollector ! ?ACKED(PktId),
ok.
%% Message published from remote broker. Import to local broker.
import_msg(Msg) ->
%% auto-ack should be enabled in emqx_client, hence dummy ack-fun.
emqx_portal:import_batch([Msg], _AckFun = fun() -> ok end).
make_hdlr(Parent, AckCollector, Ref) ->
#{puback => fun(Ack) -> handle_puback(AckCollector, Ack) end,
publish => fun(Msg) -> import_msg(Msg) end,
disconnected => fun(RC, _Properties) -> Parent ! {disconnected, Ref, RC}, ok end
}.

View File

@ -16,7 +16,7 @@
-export([ to_binary/1 -export([ to_binary/1
, from_binary/1 , from_binary/1
, apply_mountpoint/2 , to_export/2
, to_broker_msgs/1 , to_broker_msgs/1
, estimate_size/1 , estimate_size/1
]). ]).
@ -28,10 +28,12 @@
-type msg() :: emqx_types:message(). -type msg() :: emqx_types:message().
%% @doc Mount topic to a prefix. %% @doc Make export format:
-spec apply_mountpoint(msg(), undefined | binary()) -> msg(). %% 1. Mount topic to a prefix
apply_mountpoint(#{topic := Topic} = Msg, Mountpoint) -> %% 2. fix QoS to 1
Msg#{topic := topic(Mountpoint, Topic)}. -spec to_export(msg(), undefined | binary()) -> msg().
to_export(#{topic := Topic} = Msg, Mountpoint) ->
Msg#{topic := topic(Mountpoint, Topic), qos => 1}.
%% @doc Make `binary()' in order to make iodata to be persisted on disk. %% @doc Make `binary()' in order to make iodata to be persisted on disk.
-spec to_binary(msg()) -> binary(). -spec to_binary(msg()) -> binary().

View File

@ -29,7 +29,7 @@
, heartbeat/2 , heartbeat/2
]). ]).
-type batch_ref() :: emqx_portal:batch_ref(). -type ack_ref() :: emqx_portal:ack_ref().
-type batch() :: emqx_portal:batch(). -type batch() :: emqx_portal:batch().
-define(HEARTBEAT_INTERVAL, timer:seconds(1)). -define(HEARTBEAT_INTERVAL, timer:seconds(1)).
@ -59,7 +59,7 @@ stop(Pid, _Remote) when is_pid(Pid) ->
ok. ok.
%% @doc Callback for `emqx_portal_connect' behaviour %% @doc Callback for `emqx_portal_connect' behaviour
-spec send(node(), batch()) -> {ok, batch_ref()} | {error, any()}. -spec send(node(), batch()) -> {ok, ack_ref()} | {error, any()}.
send(Remote, Batch) -> send(Remote, Batch) ->
Sender = self(), Sender = self(),
case ?RPC:call(Remote, ?MODULE, handle_send, [Sender, Batch]) of case ?RPC:call(Remote, ?MODULE, handle_send, [Sender, Batch]) of
@ -68,7 +68,7 @@ send(Remote, Batch) ->
end. end.
%% @doc Handle send on receiver side. %% @doc Handle send on receiver side.
-spec handle_send(pid(), batch()) -> {ok, batch_ref()} | {error, any()}. -spec handle_send(pid(), batch()) -> {ok, ack_ref()} | {error, any()}.
handle_send(SenderPid, Batch) -> handle_send(SenderPid, Batch) ->
SenderNode = node(SenderPid), SenderNode = node(SenderPid),
Ref = make_ref(), Ref = make_ref(),

View File

@ -0,0 +1,62 @@
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_portal_mqtt_tests).
-include_lib("eunit/include/eunit.hrl").
send_and_ack_test() ->
%% delegate from gen_rpc to rpc for unit test
Tester = self(),
meck:new(emqx_client, [passthrough, no_history]),
meck:expect(emqx_client, start_link, 1,
fun(#{msg_handler := Hdlr}) -> {ok, Hdlr} end),
meck:expect(emqx_client, connect, 1, {ok, dummy}),
meck:expect(emqx_client, stop, 1, ok),
meck:expect(emqx_client, publish, 2,
fun(_Conn, Msg) ->
case rand:uniform(100) of
1 ->
{error, {dummy, inflight_full}};
_ ->
Tester ! {published, Msg},
{ok, Msg}
end
end),
try
Max = 100,
Batch = lists:seq(1, Max),
{ok, Ref, Conn} = emqx_portal_mqtt:start(#{}),
%% return last packet id as batch reference
{ok, AckRef} = emqx_portal_mqtt:send(Conn, Batch),
%% expect batch ack
{ok, LastId} = collect_acks(Conn, Batch),
%% asset received ack matches the batch ref returned in send API
?assertEqual(AckRef, LastId),
ok = emqx_portal_mqtt:stop(Ref, Conn)
after
meck:unload(emqx_client)
end.
collect_acks(_Conn, []) ->
receive {batch_ack, Id} -> {ok, Id} end;
collect_acks(#{client_pid := Client} = Conn, [Id | Rest]) ->
%% mocked for testing, should be a pid() at runtime
#{puback := PubAckCallback} = Client,
receive
{published, Id} ->
PubAckCallback(#{packet_id => Id, reason_code => dummy}),
collect_acks(Conn, Rest)
end.

View File

@ -18,7 +18,8 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-define(PORTAL_NAME, test_portal). -define(PORTAL_NAME, test).
-define(PORTAL_REG_NAME, emqx_portal_test).
-define(WAIT(PATTERN, TIMEOUT), -define(WAIT(PATTERN, TIMEOUT),
receive receive
PATTERN -> PATTERN ->
@ -49,11 +50,11 @@ reconnect_test() ->
Config = make_config(Ref, self(), {error, test}), Config = make_config(Ref, self(), {error, test}),
{ok, Pid} = emqx_portal:start_link(?PORTAL_NAME, Config), {ok, Pid} = emqx_portal:start_link(?PORTAL_NAME, Config),
%% assert name registered %% assert name registered
?assertEqual(Pid, whereis(?PORTAL_NAME)), ?assertEqual(Pid, whereis(?PORTAL_REG_NAME)),
?WAIT({connection_start_attempt, Ref}, 1000), ?WAIT({connection_start_attempt, Ref}, 1000),
%% expect same message again %% expect same message again
?WAIT({connection_start_attempt, Ref}, 1000), ?WAIT({connection_start_attempt, Ref}, 1000),
ok = emqx_portal:stop(?PORTAL_NAME), ok = emqx_portal:stop(?PORTAL_REG_NAME),
ok. ok.
%% connect first, disconnect, then connect again %% connect first, disconnect, then connect again
@ -61,11 +62,11 @@ disturbance_test() ->
Ref = make_ref(), Ref = make_ref(),
Config = make_config(Ref, self(), {ok, Ref, connection}), Config = make_config(Ref, self(), {ok, Ref, connection}),
{ok, Pid} = emqx_portal:start_link(?PORTAL_NAME, Config), {ok, Pid} = emqx_portal:start_link(?PORTAL_NAME, Config),
?assertEqual(Pid, whereis(?PORTAL_NAME)), ?assertEqual(Pid, whereis(?PORTAL_REG_NAME)),
?WAIT({connection_start_attempt, Ref}, 1000), ?WAIT({connection_start_attempt, Ref}, 1000),
Pid ! {disconnected, Ref, test}, Pid ! {disconnected, Ref, test},
?WAIT({connection_start_attempt, Ref}, 1000), ?WAIT({connection_start_attempt, Ref}, 1000),
ok = emqx_portal:stop(?PORTAL_NAME). ok = emqx_portal:stop(?PORTAL_REG_NAME).
%% buffer should continue taking in messages when disconnected %% buffer should continue taking in messages when disconnected
buffer_when_disconnected_test_() -> buffer_when_disconnected_test_() ->
@ -88,11 +89,11 @@ test_buffer_when_disconnected() ->
{ok, Pid} = emqx_portal:start_link(?PORTAL_NAME, Config), {ok, Pid} = emqx_portal:start_link(?PORTAL_NAME, Config),
Sender ! {portal, Pid}, Sender ! {portal, Pid},
Receiver ! {portal, Pid}, Receiver ! {portal, Pid},
?assertEqual(Pid, whereis(?PORTAL_NAME)), ?assertEqual(Pid, whereis(?PORTAL_REG_NAME)),
Pid ! {disconnected, Ref, test}, Pid ! {disconnected, Ref, test},
?WAIT({'DOWN', SenderMref, process, Sender, normal}, 2000), ?WAIT({'DOWN', SenderMref, process, Sender, normal}, 2000),
?WAIT({'DOWN', ReceiverMref, process, Receiver, normal}, 1000), ?WAIT({'DOWN', ReceiverMref, process, Receiver, normal}, 1000),
ok = emqx_portal:stop(?PORTAL_NAME). ok = emqx_portal:stop(?PORTAL_REG_NAME).
%% Feed messages to portal %% Feed messages to portal
sender_loop(_Pid, [], _) -> exit(normal); sender_loop(_Pid, [], _) -> exit(normal);