diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl index 187ca1c64..3fce41ec3 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl @@ -67,11 +67,17 @@ on_start( {tcp_options, [{mode, binary}, {reuseaddr, true}, {nodelay, true}]} ], MFArgs = {?MODULE, start_link, [maps:with([handshake_timeout], Config)]}, - ok = emqx_resource:allocate_resource(InstanceId, listen_on, ListenOn), + %% Since the esockd only supports atomic name and we don't want to introduce a new atom per each instance + %% when the port is same for two instance/connector, them will reference to a same esockd listener + %% to prevent the failed one dealloctes the listener which created by a earlier instance + %% we need record only when the listen is successed case esockd:open(?MODULE, ListenOn, Options, MFArgs) of {ok, _} -> + ok = emqx_resource:allocate_resource(InstanceId, listen_on, ListenOn), {ok, #{listen_on => ListenOn}}; + {error, {already_started, _}} -> + {error, eaddrinuse}; Error -> Error end. @@ -83,7 +89,12 @@ on_stop(InstanceId, _State) -> }), case emqx_resource:get_allocated_resources(InstanceId) of #{listen_on := ListenOn} -> - esockd:close(?MODULE, ListenOn); + case esockd:close(?MODULE, ListenOn) of + {error, not_found} -> + ok; + Result -> + Result + end; _ -> ok end. diff --git a/apps/emqx_connector/src/emqx_connector_resource.erl b/apps/emqx_connector/src/emqx_connector_resource.erl index f85109080..74c167b32 100644 --- a/apps/emqx_connector/src/emqx_connector_resource.erl +++ b/apps/emqx_connector/src/emqx_connector_resource.erl @@ -107,7 +107,7 @@ parse_connector_id(ConnectorId) -> {atom(), atom() | binary()}. parse_connector_id(<<"connector:", ConnectorId/binary>>, Opts) -> parse_connector_id(ConnectorId, Opts); -parse_connector_id(<>, Opts) -> +parse_connector_id(<>, Opts) -> parse_connector_id(ConnectorId, Opts); parse_connector_id(ConnectorId, Opts) -> emqx_resource:parse_resource_id(ConnectorId, Opts). @@ -229,7 +229,10 @@ create_dry_run(Type, Conf0, Callback) -> TypeBin = bin(Type), TypeAtom = safe_atom(Type), %% We use a fixed name here to avoid creating an atom - TmpName = iolist_to_binary([?TEST_ID_PREFIX, TypeBin, ":", <<"probedryrun">>]), + %% to avoid potential race condition, the resource id should be unique + Prefix = emqx_resource_manager:make_test_id(), + TmpName = + iolist_to_binary([Prefix, TypeBin, ":", <<"probedryrun">>]), TmpPath = emqx_utils:safe_filename(TmpName), Conf1 = maps:without([<<"name">>], Conf0), RawConf = #{<<"connectors">> => #{TypeBin => #{<<"temp_name">> => Conf1}}}, diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 4fd566f26..b16520d3d 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -50,7 +50,8 @@ ]). -export([ - set_resource_status_connecting/1 + set_resource_status_connecting/1, + make_test_id/0 ]). % Server