Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,20 @@ public TcpTransport createTransport(WireFormat wireFormat, Socket socket,
}

@Override
public TransportServer doBind(URI location) throws IOException {
if (SslContext.getCurrentSslContext() != null) {
public TransportServer doBind(URI location, SslContext sslContext) throws IOException {
if (sslContext != null) {
try {
context = SslContext.getCurrentSslContext().getSSLContext();
context = sslContext.getSSLContext();
} catch (Exception e) {
throw new IOException(e);
}
}
return super.doBind(location);
}

@Override
@SuppressWarnings("deprecation")
public TransportServer doBind(URI location) throws IOException {
return doBind(location, SslContext.getCurrentSslContext());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,8 @@ protected TransportServer createSslTransportServer(URI brokerURI, KeyManager[] k
// If given an SSL URI, use an SSL TransportFactory and configure
// it to use the given key and trust managers.
SslTransportFactory transportFactory = new SslTransportFactory();

SslContext ctx = new SslContext(km, tm, random);
SslContext.setCurrentSslContext(ctx);
try {
return transportFactory.doBind(brokerURI);
} finally {
SslContext.setCurrentSslContext(null);
}
SslContext ctx = new DefaultSslContext(km, tm, random);
return transportFactory.doBind(brokerURI, ctx);

} else {
// Else, business as usual.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
private boolean warnOnRemoteClose = false;
private boolean displayStackTrace = false;
private boolean autoStart = true;
private SslContext sslContext;

LinkedList<String> peerBrokers = new LinkedList<String>();
private AtomicBoolean started = new AtomicBoolean(false);
Expand Down Expand Up @@ -350,7 +351,8 @@ protected TransportServer createTransportServer() throws IOException, URISyntaxE
throw new IllegalArgumentException(
"You must specify the brokerService property. Maybe this connector should be added to a broker?");
}
return TransportFactorySupport.bind(brokerService, uri);
SslContext ctx = sslContext != null ? sslContext : brokerService.getSslContext();
return TransportFactorySupport.bind(brokerService, uri, ctx);
}

public DiscoveryAgent getDiscoveryAgent() throws IOException {
Expand Down Expand Up @@ -717,6 +719,14 @@ public boolean isAutoStart() {
return autoStart;
}

public SslContext getSslContext() {
return sslContext;
}

public void setSslContext(SslContext sslContext) {
this.sslContext = sslContext;
}

@Override
public int getConnectionCount() {
return connections.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,43 +119,35 @@ public void onServiceAdd(DiscoveryEvent event) {

LOG.info("Establishing network connection from {} to {}", localURI, connectUri);

SslContext sslContext = getSslContext() != null ? getSslContext() : getBrokerService().getSslContext();

Transport remoteTransport;
Transport localTransport;
try {
// Allows the transport to access the broker's ssl configuration.
if (getSslContext() != null) {
SslContext.setCurrentSslContext(getSslContext());
} else {
SslContext.setCurrentSslContext(getBrokerService().getSslContext());
}
remoteTransport = TransportFactory.connect(connectUri, sslContext);
} catch (Exception e) {
LOG.warn("Could not connect to remote URI: {}: {}", connectUri, e.getMessage());
LOG.debug("Connection failure exception: ", e);
try {
remoteTransport = TransportFactory.connect(connectUri);
} catch (Exception e) {
LOG.warn("Could not connect to remote URI: {}: {}", connectUri, e.getMessage());
LOG.debug("Connection failure exception: ", e);
try {
discoveryAgent.serviceFailed(event);
} catch (IOException e1) {
LOG.debug("Failure while handling create remote transport failure event: {}", e1.getMessage(), e1);
}
return;
discoveryAgent.serviceFailed(event);
} catch (IOException e1) {
LOG.debug("Failure while handling create remote transport failure event: {}", e1.getMessage(), e1);
}
try {
localTransport = createLocalTransport();
} catch (Exception e) {
ServiceSupport.dispose(remoteTransport);
LOG.warn("Could not connect to local URI: {}: {}", localURI, e.getMessage());
LOG.debug("Connection failure exception: ", e);
return;
}
try {
localTransport = createLocalTransport();
} catch (Exception e) {
ServiceSupport.dispose(remoteTransport);
LOG.warn("Could not connect to local URI: {}: {}", localURI, e.getMessage());
LOG.debug("Connection failure exception: ", e);

try {
discoveryAgent.serviceFailed(event);
} catch (IOException e1) {
LOG.debug("Failure while handling create local transport failure event: {}", e1.getMessage(), e1);
}
return;
try {
discoveryAgent.serviceFailed(event);
} catch (IOException e1) {
LOG.debug("Failure while handling create local transport failure event: {}", e1.getMessage(), e1);
}
} finally {
SslContext.setCurrentSslContext(null);
return;
}
NetworkBridge bridge = createBridge(localTransport, remoteTransport, event);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,15 @@
public class TransportFactorySupport {

public static TransportServer bind(BrokerService brokerService, URI location) throws IOException {
return bind(brokerService, location, brokerService != null ? brokerService.getSslContext() : null);
}

public static TransportServer bind(BrokerService brokerService, URI location, SslContext sslContext) throws IOException {
TransportFactory tf = TransportFactory.findTransportFactory(location);
if( brokerService!=null && tf instanceof BrokerServiceAware) {
((BrokerServiceAware)tf).setBrokerService(brokerService);
}
try {
if( brokerService!=null ) {
SslContext.setCurrentSslContext(brokerService.getSslContext());
}
return tf.doBind(location);
} finally {
SslContext.setCurrentSslContext(null);
if (brokerService != null && tf instanceof BrokerServiceAware) {
((BrokerServiceAware) tf).setBrokerService(brokerService);
}
return tf.doBind(location, sslContext);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.tcp.SslTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransport;
Expand All @@ -54,19 +55,31 @@ public void setBrokerService(BrokerService brokerService) {

private Set<String> enabledProtocols;

/**
* Overriding to use SslTransportServer and allow for proper reflection.
*/
@Override
@SuppressWarnings("deprecation")
public TransportServer doBind(final URI location) throws IOException {
return doBind(location, SslContext.getCurrentSslContext());
}

@Override
public TransportServer doBind(final URI location, SslContext sslContext) throws IOException {
try {
Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));

Map<String, Object> autoProperties = IntrospectionSupport.extractProperties(options, "auto.");
this.enabledProtocols = AutoTransportUtils.parseProtocols((String) autoProperties.get("protocols"));

ServerSocketFactory serverSocketFactory = createServerSocketFactory();
AutoSslTransportServer server = createAutoSslTransportServer(location, (SSLServerSocketFactory)serverSocketFactory);
SSLServerSocketFactory serverSocketFactory;
if (sslContext != null) {
try {
serverSocketFactory = sslContext.getSSLContext().getServerSocketFactory();
} catch (Exception e) {
throw IOExceptionSupport.create(e);
}
} else {
serverSocketFactory = (SSLServerSocketFactory) SSLServerSocketFactory.getDefault();
}
AutoSslTransportServer server = createAutoSslTransportServer(location, serverSocketFactory);
if (options.get("allowLinkStealing") != null){
allowLinkStealingSet = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,17 @@ protected Transport createTransport(Socket socket, WireFormat format, SSLEngine
private Set<String> enabledProtocols;

@Override
@SuppressWarnings("deprecation")
public TransportServer doBind(final URI location) throws IOException {
return doBind(location, SslContext.getCurrentSslContext());
}

@Override
public TransportServer doBind(final URI location, SslContext sslContext) throws IOException {
try {
if (SslContext.getCurrentSslContext() != null) {
if (sslContext != null) {
try {
context = SslContext.getCurrentSslContext().getSSLContext();
context = sslContext.getSSLContext();
} catch (Exception e) {
throw new IOException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
import javax.net.ssl.TrustManagerFactory;

import org.apache.activemq.broker.SslContext;
import org.apache.activemq.broker.DefaultSslContext;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.JMSExceptionSupport;

/**
Expand Down Expand Up @@ -108,20 +110,31 @@ public void setKeyAndTrustManagers(final KeyManager[] km, final TrustManager[] t
*/
@Override
protected Transport createTransport() throws JMSException {
SslContext existing = SslContext.getCurrentSslContext();
try {
if (keyStore != null || trustStore != null) {
keyManager = createKeyManager();
trustManager = createTrustManager();
}
if (keyManager != null || trustManager != null) {
SslContext.setCurrentSslContext(new SslContext(keyManager, trustManager, secureRandom));
SslContext sslContext = new DefaultSslContext(keyManager, trustManager, secureRandom);
URI connectBrokerUL = brokerURL;
String scheme = brokerURL.getScheme();
if (scheme != null) {
if (scheme.equals("auto")) {
connectBrokerUL = new URI(brokerURL.toString().replace("auto", "tcp"));
} else if (scheme.equals("auto+ssl")) {
connectBrokerUL = new URI(brokerURL.toString().replace("auto+ssl", "ssl"));
} else if (scheme.equals("auto+nio")) {
connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio", "nio"));
} else if (scheme.equals("auto+nio+ssl")) {
connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio+ssl", "nio+ssl"));
}
}
return TransportFactory.connect(connectBrokerUL, sslContext);
}
return super.createTransport();
} catch (Exception e) {
throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
} finally {
SslContext.setCurrentSslContext(existing);
}
}

Expand Down
Loading
Loading