Skip to content
Snippets Groups Projects
Commit 0673fdf3 authored by Michael Anstis's avatar Michael Anstis
Browse files

Merge pull request #23 from cyrilsochor/jms-connector-jboss-7-1

JBRULES-3529 customizable JSM messenger for jboss 7.1(cherry picked from commit ea91c75b)
parent 2df77310
Branches
Tags
No related merge requests found
......@@ -28,5 +28,12 @@ public interface JmsMessengerProvider {
String destinationName,
ResultHandlerFactory resultHandlerFactory);
Service newJmsMessenger(Pipeline pipeline,
Properties properties,
String connectionFactoryName,
boolean useSecurityPrincipalForConnection,
String destinationName,
ResultHandlerFactory resultHandlerFactory);
Action newJmsUnwrapMessageObject();
}
......@@ -402,6 +402,7 @@ public class PipelineFactory {
* @param destinationName
* @param resultHandlerFactory
* @return
* @see #newJmsMessenger(Pipeline, Properties, String, boolean, String, ResultHandlerFactory)
*/
public static Service newJmsMessenger(Pipeline pipeline,
Properties properties,
......@@ -413,6 +414,70 @@ public class PipelineFactory {
resultHandlerFactory );
}
/**
* Creates a new JmsMessenger which runs as a service in it's own thread. It expects an existing JNDI entry for connectionFactoryName
* Which will be used to create the MessageConsumer which will feed into the specified pipeline.
*
* This method is more customizable then {@link #newJmsMessenger(Pipeline, Properties, String, ResultHandlerFactory)},
* Connection factory JNDI name is additional paramter and connection with principal may be configured.
* E.g. for JBOSS AS 7.1 use: connectionFactoryName="jms/RemoteConnectionFactory", useSecurityPrincipalForConnection=true.
*
* <pre>
* // as this is a service, it's more likely the results will be logged or sent as a return message
* Action resultHandlerStage = PipelineFactory.newExecuteResultHandler();
*
* // Insert the transformed object into the session associated with the PipelineContext
* KnowledgeRuntimeCommand insertStage = PipelineFactory.newStatefulKnowledgeSessionInsert();
* insertStage.setReceiver( resultHandlerStage );
*
* // Create the transformer instance and create the Transformer stage, where we are going from Xml to Pojo. Jaxb needs an array of the available classes
* JAXBContext jaxbCtx = KnowledgeBuilderHelper.newJAXBContext( classNames,
* kbase );
* Unmarshaller unmarshaller = jaxbCtx.createUnmarshaller();
* Transformer transformer = PipelineFactory.newJaxbFromXmlTransformer( unmarshaller );
* transformer.setReceiver( insertStage );
*
* // payloads for JMS arrive in a Message wrapper, we need to unwrap this object
* Action unwrapObjectStage = PipelineFactory.newJmsUnwrapMessageObject();
* unwrapObjectStage.setReceiver( transformer );
*
* // Create the start adapter Pipeline for StatefulKnowledgeSessions
* Pipeline pipeline = PipelineFactory.newStatefulKnowledgeSessionPipeline( ksession );
* pipeline.setReceiver( unwrapObjectStage );
*
* // Services, like JmsMessenger take a ResultHandlerFactory implementation, this is because a result handler must be created for each incoming message.
* ResultHandleFactoryImpl factory = new ResultHandleFactoryImpl();
* Service messenger = PipelineFactory.newJmsMessenger( pipeline,
* props,
* "jms/RemoteConnectionFactory",
* true,
* destinationName,
* factory );
* messenger.start();
* </pre>
* @param pipeline
* @param properties
* @param connectionFactoryName
* @param useSecurityPrincipalForConnection
* @param destinationName
* @param resultHandlerFactory
* @return
* @see #newJmsMessenger(Pipeline, Properties, String, ResultHandlerFactory)
*/
public static Service newJmsMessenger(Pipeline pipeline,
Properties properties,
String connectionFactoryName,
boolean useSecurityPrincipalForConnection,
String destinationName,
ResultHandlerFactory resultHandlerFactory) {
return getJmsMessengerProvider().newJmsMessenger( pipeline,
properties,
connectionFactoryName,
useSecurityPrincipalForConnection,
destinationName,
resultHandlerFactory );
}
/**
* Unwrap the payload from the JMS Message and propagate it as the payload object.
* @return
......
......@@ -47,17 +47,33 @@ public class JmsMessenger extends BaseService
private JmsMessengerRunner jmsFeederRunner;
private String connectionPrincipal;
private String connectionCredentials;
public JmsMessenger(Pipeline pipeline,
Properties properties,
String destinationName,
ResultHandlerFactory resultHandlerFactory) {
this(pipeline, properties, "ConnectionFactory", false, destinationName, resultHandlerFactory);
}
public JmsMessenger(Pipeline pipeline,
Properties properties,
String connectionFactoryName,
boolean useSecurityPrincipalForConnection,
String destinationName,
ResultHandlerFactory resultHandlerFactory) {
super();
this.pipeline = pipeline;
this.resultHandlerFactory = resultHandlerFactory;
if( useSecurityPrincipalForConnection ){
this.connectionPrincipal = properties.getProperty("java.naming.security.principal");
this.connectionCredentials = properties.getProperty("java.naming.security.credentials");
}
try {
InitialContext jndiContext = new InitialContext( properties );
this.connectionFactory = (ConnectionFactory) jndiContext.lookup( "ConnectionFactory" );
this.connectionFactory = (ConnectionFactory) jndiContext.lookup( connectionFactoryName );
this.destination = (Destination) jndiContext.lookup( destinationName );
} catch ( Exception e ) {
throw new RuntimeException( "Unable to instantiate JmsFeeder",
......@@ -67,7 +83,11 @@ public class JmsMessenger extends BaseService
public void start() {
try {
if( connectionPrincipal != null ){
this.connection = this.connectionFactory.createConnection(connectionPrincipal, connectionCredentials);
} else {
this.connection = this.connectionFactory.createConnection();
}
this.session = this.connection.createSession( false,
Session.AUTO_ACKNOWLEDGE );
this.consumer = this.session.createConsumer( destination );
......
......@@ -37,6 +37,20 @@ public class JmsMessengerProviderImpl
resultHandlerFactory );
}
public Service newJmsMessenger(Pipeline pipeline,
Properties properties,
String connectionFactoryName,
boolean useSecurityPrincipalForConnection,
String destinationName,
ResultHandlerFactory resultHandlerFactory) {
return new JmsMessenger( pipeline,
properties,
connectionFactoryName,
useSecurityPrincipalForConnection,
destinationName,
resultHandlerFactory );
}
public Action newJmsUnwrapMessageObject() {
return new JmsUnwrapMessageObject();
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment