From 997958aed17c56cebe0a8293d8da178f221d610b Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Fri, 9 Nov 2018 08:51:03 +0800 Subject: [PATCH] Change the start_link API for emqx_client Prior to this change, emqx_client:start_link does 2 works in one call: - init an erlang process for emqx_client - send MQTT CONNECT to remote broker But this solution have some drawbacks: - the return value of `start_link` compiles the return values of the 2 works: {ok, Pid, MqttResult}. It is inconsistent with the return value of `gen_statem:start_link`, may causes confusions. - the return mode of the 2 works are different: `start_link` should always return {ok, Pid} or {error, Reason}, but connecting to mqtt may throw out exceptions as it handles the socket. But the caller couldn't have thought of the exception, he would pattern match on the result of `emqx_client:start_link`, but it crashed! - If the init work succeed but the connection failed, the caller couldn't get a Pid from the return value, but indeed it was created inside the emqx_client. This hides the fact that the Pid was created, and when the Pid dies, the caller would receive an message from a Pid it doesn' know about. This change divived these 2 work into 2 APIs: - `start_link/1` is to build and verify the options, and returns {ok,Pid} (on success) or {error, Reason} (on failure). - `connect/1` is to send MQTT CONNECT, and returns {ok, MQTTResult::properties()} or {error, MQTTReason}. MQTT reason codes will contains in the `MQTTReason`. --- etc/emqx.conf | 20 ++++++--- priv/emqx.schema | 29 +++++++------ src/emqx_bridge.erl | 44 ++++++++++++-------- src/emqx_client.erl | 19 +++------ test/emqx_client_SUITE.erl | 83 ++++++++++++++++++++++++++------------ 5 files changed, 121 insertions(+), 74 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 788c11c3f..1e04839e0 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1612,7 +1612,11 @@ bridge.aws.client_id = bridge_aws ## The Clean start flag of a remote bridge. ## ## Value: boolean -bridge.aws.clean_start = false +## Default: true +## +## NOTE: Some IoT platforms require clean_start +## must be set to 'true' +## bridge.aws.clean_start = true ## The username for a remote bridge. ## @@ -1682,12 +1686,12 @@ bridge.aws.ssl = off ## Value: File ## bridge.aws.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem -## SSL Certfile of the bridge. +## Client SSL Certfile of the bridge. ## ## Value: File ## bridge.aws.certfile = {{ platform_etc_dir }}/certs/client-cert.pem -## SSL Keyfile of the bridge. +## Client SSL Keyfile of the bridge. ## ## Value: File ## bridge.aws.keyfile = {{ platform_etc_dir }}/certs/client-key.pem @@ -1745,7 +1749,11 @@ bridge.aws.ssl = off ## The Clean start flag of a remote bridge. ## ## Value: boolean -## bridge.azure.clean_start = false +## Default: true +## +## NOTE: Some IoT platforms require clean_start +## must be set to 'true' +## bridge.azure.clean_start = true ## The username for a remote bridge. ## @@ -1811,12 +1819,12 @@ bridge.aws.ssl = off ## Value: File ## bridge.azure.cacertfile = cacert.pem -## SSL Certfile of the bridge. +## Client SSL Certfile of the bridge. ## ## Value: File ## bridge.azure.certfile = cert.pem -## SSL Keyfile of the bridge. +## Client SSL Keyfile of the bridge. ## ## Value: File ## bridge.azure.keyfile = key.pem diff --git a/priv/emqx.schema b/priv/emqx.schema index fc89586ba..e9667b114 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1549,7 +1549,8 @@ end}. ]}. {mapping, "bridge.$name.forwards", "emqx.bridges", [ - {datatype, string} + {datatype, string}, + {default, ""} ]}. {mapping, "bridge.$name.ssl", "emqx.bridges", [ @@ -1624,22 +1625,24 @@ end}. {ciphers, Split(Ciphers)}; (Opt, Val) -> {Opt, Val} - end, + end, - Merge = fun(Opt, Val, Opts) -> - case IsSsl(Opt) of - true -> - SslOpts = [Parse(Opt, Val)|proplists:get_value(ssl_opts, Opts, [])], - lists:ukeymerge(1, [{ssl_opts, SslOpts}], lists:usort(Opts)); - false -> - [{Opt, Val}|Opts] - end + Merge = fun(forwards, Val, Opts) -> + [{forwards, string:tokens(Val, ",")}|Opts]; + (Opt, Val, Opts) -> + case IsSsl(Opt) of + true -> + SslOpts = [Parse(Opt, Val)|proplists:get_value(ssl_opts, Opts, [])], + lists:ukeymerge(1, [{ssl_opts, SslOpts}], lists:usort(Opts)); + false -> + [{Opt, Val}|Opts] + end end, Subscriptions = fun(Name) -> - Configs = cuttlefish_variable:filter_by_prefix("bridge." ++ Name ++ ".subscription", Conf), - lists:zip([Topic || {_, Topic} <- lists:sort([{I, Topic} || {[_, _, "subscription", I, "topic"], Topic} <- Configs])], - [QoS || {_, QoS} <- lists:sort([{I, QoS} || {[_, _, "subscription", I, "qos"], QoS} <- Configs])]) + Configs = cuttlefish_variable:filter_by_prefix("bridge." ++ Name ++ ".subscription", Conf), + lists:zip([Topic || {_, Topic} <- lists:sort([{I, Topic} || {[_, _, "subscription", I, "topic"], Topic} <- Configs])], + [QoS || {_, QoS} <- lists:sort([{I, QoS} || {[_, _, "subscription", I, "qos"], QoS} <- Configs])]) end, maps:to_list( diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index d335d8655..b8f0dff56 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -188,20 +188,23 @@ handle_cast(Msg, State) -> %% start message bridge %%---------------------------------------------------------------- handle_info(start, State = #state{options = Options, - client_pid = undefined, - reconnect_interval = ReconnectInterval}) -> + client_pid = undefined}) -> case emqx_client:start_link([{owner, self()}|options(Options)]) of - {ok, ClientPid, _} -> - Subs = [{i2b(Topic), Qos} || {Topic, Qos} <- get_value(subscriptions, Options, []), - emqx_topic:validate({filter, i2b(Topic)})], - Forwards = [i2b(Topic) || Topic <- string:tokens(get_value(forwards, Options, ""), ","), - emqx_topic:validate({filter, i2b(Topic)})], - [emqx_client:subscribe(ClientPid, {Topic, Qos}) || {Topic, Qos} <- Subs], - [emqx_broker:subscribe(Topic) || Topic <- Forwards], - {noreply, State#state{client_pid = ClientPid, subscriptions = Subs, forwards = Forwards}}; + {ok, ClientPid} -> + case emqx_client:connect(ClientPid) of + {ok, _} -> + emqx_logger:info("[Bridge] connected to remote sucessfully"), + Subs = subscribe_remote_topics(ClientPid, get_value(subscriptions, Options, [])), + Forwards = subscribe_local_topics(get_value(forwards, Options, [])), + {noreply, State#state{client_pid = ClientPid, + subscriptions = Subs, + forwards = Forwards}}; + {error, Reason} -> + emqx_logger:error("[Bridge] connect to remote failed! error: ~p", [Reason]), + {noreply, State#state{client_pid = ClientPid}} + end; {error, Reason} -> - logger:error("[Bridge] start failed! error: ~p", [Reason]), - erlang:send_after(ReconnectInterval, self(), start), + emqx_logger:error("[Bridge] start failed! error: ~p", [Reason]), {noreply, State} end; @@ -219,7 +222,7 @@ handle_info({dispatch, _, #message{topic = Topic, payload = Payload, flags = #{r {ok, PkgId} -> {noreply, State#state{queue = store(MqueueType, {PkgId, Msg}, Queue, MaxPendingMsg)}}; {error, Reason} -> - emqx_logger:error("Publish fail:~p", [Reason]), + emqx_logger:error("[Bridge] Publish fail:~p", [Reason]), {noreply, State} end; @@ -241,11 +244,12 @@ handle_info({puback, #{packet_id := PkgId}}, State = #state{queue = Queue, mqueu {noreply, State#state{queue = delete(MqueueType, PkgId, Queue)}}; handle_info({'EXIT', Pid, normal}, State = #state{client_pid = Pid}) -> + emqx_logger:warning("[Bridge] stop ~p", [normal]), {noreply, State#state{client_pid = undefined}}; handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = Pid, reconnect_interval = ReconnectInterval}) -> - lager:warning("emqx bridge stop reason:~p", [Reason]), + emqx_logger:error("[Bridge] stop ~p", [Reason]), erlang:send_after(ReconnectInterval, self(), start), {noreply, State#state{client_pid = undefined}}; @@ -259,6 +263,14 @@ terminate(_Reason, #state{}) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +subscribe_remote_topics(ClientPid, Subscriptions) -> + [begin emqx_client:subscribe(ClientPid, {bin(Topic), Qos}), {bin(Topic), Qos} end + || {Topic, Qos} <- Subscriptions, emqx_topic:validate({filter, bin(Topic)})]. + +subscribe_local_topics(Topics) -> + [begin emqx_broker:subscribe(bin(Topic)), bin(Topic) end + || Topic <- Topics, emqx_topic:validate({filter, bin(Topic)})]. + proto_ver(mqttv3) -> v3; proto_ver(mqttv4) -> v4; proto_ver(mqttv5) -> v5. @@ -296,7 +308,7 @@ options([_Option | Options], Acc) -> name(Id) -> list_to_atom(lists:concat([?MODULE, "_", Id])). -i2b(L) -> iolist_to_binary(L). +bin(L) -> iolist_to_binary(L). mountpoint(undefined, Topic) -> Topic; @@ -306,7 +318,7 @@ mountpoint(Prefix, Topic) -> format_mountpoint(undefined) -> undefined; format_mountpoint(Prefix) -> - binary:replace(i2b(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)). + binary:replace(bin(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)). store(memory, Data, Queue, MaxPendingMsg) when length(Queue) =< MaxPendingMsg -> [Data | Queue]; diff --git a/src/emqx_client.erl b/src/emqx_client.erl index e4077e4d9..2d9cb0a80 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -21,6 +21,7 @@ -export([start_link/0, start_link/1]). -export([request/5, request/6, request_async/7, receive_response/3]). -export([set_request_handler/2, sub_request_topic/3, sub_request_topic/4]). +-export([connect/1]). -export([subscribe/2, subscribe/3, subscribe/4]). -export([publish/2, publish/3, publish/4, publish/5]). -export([unsubscribe/2, unsubscribe/3]). @@ -200,18 +201,11 @@ start_link(Options) when is_map(Options) -> start_link(Options) when is_list(Options) -> ok = emqx_mqtt_props:validate( proplists:get_value(properties, Options, #{})), - case start_client(with_owner(Options)) of - {ok, Client} -> - connect(Client); - Error -> Error - end. - -start_client(Options) -> case proplists:get_value(name, Options) of undefined -> - gen_statem:start_link(?MODULE, [Options], []); + gen_statem:start_link(?MODULE, [with_owner(Options)], []); Name when is_atom(Name) -> - gen_statem:start_link({local, Name}, ?MODULE, [Options], []) + gen_statem:start_link({local, Name}, ?MODULE, [with_owner(Options)], []) end. with_owner(Options) -> @@ -220,8 +214,7 @@ with_owner(Options) -> undefined -> [{owner, self()} | Options] end. -%% @private --spec(connect(client()) -> {ok, client(), properties()} | {error, term()}). +-spec(connect(client()) -> {ok, properties()} | {error, term()}). connect(Client) -> gen_statem:call(Client, connect, infinity). @@ -692,7 +685,7 @@ waiting_for_connack(cast, ?CONNACK_PACKET(?RC_SUCCESS, undefined -> AllProps; _ -> maps:merge(AllProps, Properties) end, - Reply = {ok, self(), Properties}, + Reply = {ok, Properties}, State2 = State1#state{client_id = assign_id(ClientId, AllProps1), properties = AllProps1, session_present = SessPresent}, @@ -1004,7 +997,7 @@ handle_event(info, {Error, _Sock, Reason}, _StateName, State) handle_event(info, {Closed, _Sock}, _StateName, State) when Closed =:= tcp_closed; Closed =:= ssl_closed -> - {stop, Closed, State}; + {stop, {shutdown, Closed}, State}; handle_event(info, {'EXIT', Owner, Reason}, _, #state{owner = Owner}) -> {stop, Reason}; diff --git a/test/emqx_client_SUITE.erl b/test/emqx_client_SUITE.erl index 2ad3dbaf7..c7b3455ad 100644 --- a/test/emqx_client_SUITE.erl +++ b/test/emqx_client_SUITE.erl @@ -59,20 +59,26 @@ end_per_suite(_Config) -> emqx_ct_broker_helpers:run_teardown_steps(). request_response_exception(QoS) -> - {ok, Client, _} = emqx_client:start_link([{proto_ver, v5}, - {properties, #{ 'Request-Response-Information' => 0 }}]), + {ok, Client} = emqx_client:start_link([{proto_ver, v5}, + {properties, #{ 'Request-Response-Information' => 0 }}]), + {ok, _} = emqx_client:connect(Client), ?assertError(no_response_information, emqx_client:sub_request_topic(Client, QoS, <<"request_topic">>)), ok = emqx_client:disconnect(Client). request_response_per_qos(QoS) -> - {ok, Requester, _} = emqx_client:start_link([{proto_ver, v5}, - {client_id, <<"requester">>}, - {properties, #{ 'Request-Response-Information' => 1}}]), - {ok, Responser, _} = emqx_client:start_link([{proto_ver, v5}, - {client_id, <<"responser">>}, - {properties, #{ 'Request-Response-Information' => 1}}, - {request_handler, fun(_Req) -> <<"ResponseTest">> end}]), + {ok, Requester} = emqx_client:start_link([{proto_ver, v5}, + {client_id, <<"requester">>}, + {properties, #{ 'Request-Response-Information' => 1}}]), + {ok, _} = emqx_client:connect(Requester), + {ok, Responser} = emqx_client:start_link([{proto_ver, v5}, + {client_id, <<"responser">>}, + {properties, #{ + 'Request-Response-Information' => 1}}, + {request_handler, + fun(_Req) -> <<"ResponseTest">> end} + ]), + {ok, _} = emqx_client:connect(Responser), ok = emqx_client:sub_request_topic(Responser, QoS, <<"request_topic">>), {ok, <<"ResponseTest">>} = emqx_client:request(Requester, <<"response_topic">>, <<"request_topic">>, <<"request_payload">>, QoS), ok = emqx_client:set_request_handler(Responser, fun(<<"request_payload">>) -> @@ -108,9 +114,15 @@ share_sub_request_topic_per_qos(QoS) -> {client_id, atom_to_binary(ClientId, utf8)}, {properties, Properties} ] end, - {ok, Requester, _} = emqx_client:start_link(Opts(requester)), - {ok, Responser1, _} = emqx_client:start_link([{request_handler, fun(Req) -> <<"1-", Req/binary>> end} | Opts(requester1)]), - {ok, Responser2, _} = emqx_client:start_link([{request_handler, fun(Req) -> <<"2-", Req/binary>> end} | Opts(requester2)]), + {ok, Requester} = emqx_client:start_link(Opts(requester)), + {ok, _} = emqx_client:connect(Requester), + + {ok, Responser1} = emqx_client:start_link([{request_handler, fun(Req) -> <<"1-", Req/binary>> end} | Opts(requester1)]), + {ok, _} = emqx_client:connect(Responser1), + + {ok, Responser2} = emqx_client:start_link([{request_handler, fun(Req) -> <<"2-", Req/binary>> end} | Opts(requester2)]), + {ok, _} = emqx_client:connect(Responser2), + ok = emqx_client:sub_request_topic(Responser1, QoS, ReqTopic, Group), ok = emqx_client:sub_request_topic(Responser2, QoS, ReqTopic, Group), %% Send a request, wait for response, validate response then return responser ID @@ -148,7 +160,9 @@ receive_messages(Count, Msgs) -> basic_test(_Config) -> Topic = nth(1, ?TOPICS), ct:print("Basic test starting"), - {ok, C, _} = emqx_client:start_link(), + {ok, C} = emqx_client:start_link(), + {ok, _} = emqx_client:connect(C), + {ok, _, [2]} = emqx_client:subscribe(C, Topic, qos2), {ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2), {ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2), @@ -157,11 +171,15 @@ basic_test(_Config) -> ok = emqx_client:disconnect(C). will_message_test(_Config) -> - {ok, C1, _} = emqx_client:start_link([{clean_start, true}, + {ok, C1} = emqx_client:start_link([{clean_start, true}, {will_topic, nth(3, ?TOPICS)}, {will_payload, <<"client disconnected">>}, {keepalive, 2}]), - {ok, C2, _} = emqx_client:start_link(), + {ok, _} = emqx_client:connect(C1), + + {ok, C2} = emqx_client:start_link(), + {ok, _} = emqx_client:connect(C2), + {ok, _, [2]} = emqx_client:subscribe(C2, nth(3, ?TOPICS), 2), timer:sleep(10), ok = emqx_client:stop(C1), @@ -171,26 +189,33 @@ will_message_test(_Config) -> ct:print("Will message test succeeded"). offline_message_queueing_test(_) -> - {ok, C1, _} = emqx_client:start_link([{clean_start, false}, - {client_id, <<"c1">>}]), + {ok, C1} = emqx_client:start_link([{clean_start, false}, + {client_id, <<"c1">>}]), + {ok, _} = emqx_client:connect(C1), + {ok, _, [2]} = emqx_client:subscribe(C1, nth(6, ?WILD_TOPICS), 2), ok = emqx_client:disconnect(C1), - {ok, C2, _} = emqx_client:start_link([{clean_start, true}, - {client_id, <<"c2">>}]), + {ok, C2} = emqx_client:start_link([{clean_start, true}, + {client_id, <<"c2">>}]), + {ok, _} = emqx_client:connect(C2), ok = emqx_client:publish(C2, nth(2, ?TOPICS), <<"qos 0">>, 0), {ok, _} = emqx_client:publish(C2, nth(3, ?TOPICS), <<"qos 1">>, 1), {ok, _} = emqx_client:publish(C2, nth(4, ?TOPICS), <<"qos 2">>, 2), timer:sleep(10), emqx_client:disconnect(C2), - {ok, C3, _} = emqx_client:start_link([{clean_start, false}, + {ok, C3} = emqx_client:start_link([{clean_start, false}, {client_id, <<"c1">>}]), + {ok, _} = emqx_client:connect(C3), + timer:sleep(10), emqx_client:disconnect(C3), ?assertEqual(3, length(receive_messages(3))). overlapping_subscriptions_test(_) -> - {ok, C, _} = emqx_client:start_link([]), + {ok, C} = emqx_client:start_link([]), + {ok, _} = emqx_client:connect(C), + {ok, _, [2, 1]} = emqx_client:subscribe(C, [{nth(7, ?WILD_TOPICS), 2}, {nth(1, ?WILD_TOPICS), 1}]), timer:sleep(10), @@ -228,8 +253,10 @@ overlapping_subscriptions_test(_) -> redelivery_on_reconnect_test(_) -> ct:print("Redelivery on reconnect test starting"), - {ok, C1, _} = emqx_client:start_link([{clean_start, false}, - {client_id, <<"c">>}]), + {ok, C1} = emqx_client:start_link([{clean_start, false}, + {client_id, <<"c">>}]), + {ok, _} = emqx_client:connect(C1), + {ok, _, [2]} = emqx_client:subscribe(C1, nth(7, ?WILD_TOPICS), 2), timer:sleep(10), ok = emqx_client:pause(C1), @@ -240,8 +267,10 @@ redelivery_on_reconnect_test(_) -> timer:sleep(10), ok = emqx_client:disconnect(C1), ?assertEqual(0, length(receive_messages(2))), - {ok, C2, _} = emqx_client:start_link([{clean_start, false}, + {ok, C2} = emqx_client:start_link([{clean_start, false}, {client_id, <<"c">>}]), + {ok, _} = emqx_client:connect(C2), + timer:sleep(10), ok = emqx_client:disconnect(C2), ?assertEqual(2, length(receive_messages(2))). @@ -255,8 +284,10 @@ redelivery_on_reconnect_test(_) -> dollar_topics_test(_) -> ct:print("$ topics test starting"), - {ok, C, _} = emqx_client:start_link([{clean_start, true}, - {keepalive, 0}]), + {ok, C} = emqx_client:start_link([{clean_start, true}, + {keepalive, 0}]), + {ok, _} = emqx_client:connect(C), + {ok, _, [1]} = emqx_client:subscribe(C, nth(6, ?WILD_TOPICS), 1), {ok, _} = emqx_client:publish(C, << <<"$">>/binary, (nth(2, ?TOPICS))/binary>>, <<"test">>, [{qos, 1}, {retain, false}]),