WebSphere MQ - JMS przykład wykorzystania

Autor: Marcin Kasiński
27.06.2012 16:03:49 +0200

Technologia JMS, to pewnego rodzaju uniwersalny interfejs (kolejna warstwa) do systemów kolejkowych. Systemy takie przystosowane są do obsługi komunikacji asynchronicznej. Pozwalają one na wysyłanie i odbieranie komunikatów pomiędzy systemami. Ważną zaletą technologii JMS jest to, że przesłania ona dedykowane dla konkretnego systemu kolejkowania API własnym ustandardyzowanym API, co powoduje, że ułatwione jest przejście z jednego systemu na inny bez dużych zmian w kodzie aplikacji. Jako, że jest to kolejna warstwa aplikacyjna istnieje tutaj pewien narzut na wydajności. Jeśli nie ma jakiś szczególnych wymagań co do wydajności, wydaje się, że zaleta uniwersalności przeważy te wydajnościowe niedoskonałości.

Konfiguracja

Do zarządzania obiektami JMS w WebSphere MQ może służyć narzędzie JMSAdmin zjandujące sie w pakiecie instalacyjnym. Narzędzie to znajduje się w katalogu {Katalog WebSphere MQ}\java\bin\. Dla systemu operacyjnego będzie to JMSAdmin.bat , a dla systemów Linuxowych JMSAdmin.sh. Celem konfiguracji narzędzia należy zmodyfikować plik konfiguracyjny JMSAdmin.config znajdujący się w tym samym katalogu.

ParametrOpis
INITIAL_CONTEXT_FACTORYtyp repozytorium przechowywania
PROVIDER_URLmiejsce przechowywania obiektów JMS

Przykładowe konfiguracje znajdują się w pliku konfiguracyjnym. W naszym przykładzie chcąc przechowywać dane JMS w pliku na dysku w katalogu C:\JNDI-Directory parametry te będa miały postać:


INITIAL_CONTEXT_FACTORY=com.sun.jndi.fscontext.RefFSContextFactory
PROVIDER_URL=file:/C:/JNDI-Directory

Jeśli jeszcze nie mamymenadżera kolejek możemy go założyć komendą

crtmqm {Nazwa_Menadzera_kolejek} 

Teraz możemy w naszym menadżerze kolejek założyć komendą runmqsc odpowiednie obiekty MQ, tj kolejki, kanał oraz nasłuchiwacz na odpowiednim porcie.

define ql (MQSI.APPQM1.QIN)
define ql (MQSI.APPQM2.QIN)
define ql (MQSI.APPQM3.QIN)

DEFINE CHANNEL(MKSVRCONN) CHLTYPE(SVRCONN)

DEFINE LISTENER(LISTENER.TCP) TRPTYPE(TCP) PORT(1417)

Przy takiej konfigurcji JMS, założonych obiektach MQ i potwierdzeniu że dany katalog C:\JNDI-Directory istnieje możemy wywołac narzędzie JMSAdmin i w nim wykonać poniższe komendy:

def qcf(jms_conn) qmgr(QM_WBRK) transport(CLIENT) host(localhost) channel(MKSVRCONN) port(1417) 
   
define q(jms_queue) queue(MQSI.APPQM1.QIN) 
display ctx

Powyższe operacje utworzą odpowiednie obiekty JMS i zapiszą ich definicje do pliku .bindings w katalogu określonym parametrem PROVIDER_URL , czyli w naszym przypadku C:\JNDI-Directory. Plik ten będziemy mogli wskazać jako konfiguracja w naszej aplikacji JMS.

Wielowątkowość

W przypadku aplikacji wielowątkowych ważne jest, że wszystkie obiekty administracyjne i obiekty związane z połączeniem mogą być współdzielone przez wątki. Obiekt typu Session oraz obiekty typu MessageProducers i MessageConsumer z kolei są przystosowane do pracy w ramach jednego wątku JAVA

Przykładowe aplikacje

Przykładowy kod JMS realizujący wysyłkę komunikatu za pomocą JMS może mieć postać:

package mk;

import java.io.IOException;
import java.util.Hashtable;

import javax.jms.*;
import javax.naming.*;
import javax.naming.directory.*;

import com.ibm.msg.client.jms.*;

public class JMSProducer {

	private static String initialContextUrl = null;
	private static String connectionFactoryFromJndi = null;
	private static String destinationFromJndi = null;

	// System exit status value (assume unset value to be 1)
	private static int status = 1;

	public void send(String data) {

		// Variables
		Connection connection = null;
		Session session = null;
		Destination destination = null;
		MessageProducer producer = null;

		try {

			// Instantiate the initial context
			String contextFactory = "com.sun.jndi.fscontext.RefFSContextFactory";
			Hashtable environment = new Hashtable();
			environment.put(Context.INITIAL_CONTEXT_FACTORY,
					"com.sun.jndi.fscontext.RefFSContextFactory");
			environment.put(Context.PROVIDER_URL,
					"file:///D:/svn/MK/javaclasses_eclipse/MQTester/conv");

			Context context;
			context = new InitialDirContext(environment);
			System.out.println("Initial context found!");

			// Lookup the connection factory
			JmsConnectionFactory cf = (JmsConnectionFactory) context
					.lookup("jms_conn");

			// Lookup the destination
			destination = (JmsDestination) context.lookup("jms_queue");

			// Create JMS objects
			connection = cf.createConnection();
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			producer = session.createProducer(destination);

			long uniqueNumber = System.currentTimeMillis() % 1000;
			TextMessage message = session
					.createTextMessage("JmsJndiProducer: Your lucky number today is "
							+ uniqueNumber);

			// Start the connection
			connection.start();

			// And, send the message
			producer.send(message);
			System.out.println("Sent message:\n" + message);

		} catch (NamingException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

		finally {
			if (producer != null) {
				try {
					producer.close();
				} catch (JMSException jmsex) {
					System.out.println("Producer could not be closed.");
					recordFailure(jmsex);
				}
			}

			if (session != null) {
				try {
					session.close();
				} catch (JMSException jmsex) {
					System.out.println("Session could not be closed.");
					recordFailure(jmsex);
				}
			}

			if (connection != null) {
				try {
					connection.close();
				} catch (JMSException jmsex) {
					System.out.println("Connection could not be closed.");
					recordFailure(jmsex);
				}
			}
		}

	}

	private static void processJMSException(JMSException jmsex) {
		System.out.println(jmsex);
		Throwable innerException = jmsex.getLinkedException();
		if (innerException != null) {
			System.out.println("Inner exception(s):");
		}
		while (innerException != null) {
			System.out.println(innerException);
			innerException = innerException.getCause();
		}
		return;
	}

	private static void recordFailure(Exception ex) {
		if (ex != null) {
			if (ex instanceof JMSException) {
				processJMSException((JMSException) ex);
			} else {
				System.out.println(ex);
			}
		}
		System.out.println("FAILURE");
		status = -1;
		return;
	}

	public static void main(String[] args) {

		JMSProducer producer = new JMSProducer();

		producer.send("<A>aaaa</A>");
		System.out.println("Done.");

	}

}

Przykładowy kod JMS realizujący odbiór komunikatu za pomocą JMS może mieć postać:

package mk;

import java.io.IOException;
import java.util.Hashtable;

import javax.jms.*;
import javax.naming.*;
import javax.naming.directory.*;

import com.ibm.msg.client.jms.*;

public class JMSConsumer {

	private static String initialContextUrl = null;
	private static String connectionFactoryFromJndi = null;
	private static String destinationFromJndi = null;

	private static int timeout = 5000; // in ms or 5 seconds

	// System exit status value (assume unset value to be 1)
	private static int status = 1;

	public void receive() {

		// Variables
		Connection connection = null;
		Session session = null;
		Destination destination = null;
		MessageConsumer consumer = null;

		try {

			// Instantiate the initial context
			String contextFactory = "com.sun.jndi.fscontext.RefFSContextFactory";
			Hashtable environment = new Hashtable();
			environment.put(Context.INITIAL_CONTEXT_FACTORY,
					"com.sun.jndi.fscontext.RefFSContextFactory");
			environment.put(Context.PROVIDER_URL,
					"file:///D:/svn/MK/javaclasses_eclipse/MQTester/conv");

			Context context;
			context = new InitialDirContext(environment);
			System.out.println("Initial context found!");

			// Lookup the connection factory
			JmsConnectionFactory cf = (JmsConnectionFactory) context
					.lookup("jms_conn");

			// Lookup the destination
			destination = (JmsDestination) context.lookup("jms_queue");

			// Create JMS objects
			connection = cf.createConnection();
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

			consumer = session.createConsumer(destination);

			// Start the connection
			connection.start();

			// And, receive the message
			TextMessage message = (TextMessage) consumer.receive(timeout);
			if (message != null) {
				System.out.println("Received message:\n" + message.getText());
			} else {
				System.out.println("No message received!\n");
//				recordFailure(null);
			}

		} catch (NamingException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

		finally {
			if (consumer != null) {
				try {
					consumer.close();
				} catch (JMSException jmsex) {
					System.out.println("Producer could not be closed.");
					recordFailure(jmsex);
				}
			}

			if (session != null) {
				try {
					session.close();
				} catch (JMSException jmsex) {
					System.out.println("Session could not be closed.");
					recordFailure(jmsex);
				}
			}

			if (connection != null) {
				try {
					connection.close();
				} catch (JMSException jmsex) {
					System.out.println("Connection could not be closed.");
					recordFailure(jmsex);
				}
			}
		}

	}

	private static void processJMSException(JMSException jmsex) {
		System.out.println(jmsex);
		Throwable innerException = jmsex.getLinkedException();
		if (innerException != null) {
			System.out.println("Inner exception(s):");
		}
		while (innerException != null) {
			System.out.println(innerException);
			innerException = innerException.getCause();
		}
		return;
	}

	private static void recordFailure(Exception ex) {
		if (ex != null) {
			if (ex instanceof JMSException) {
				processJMSException((JMSException) ex);
			} else {
				System.out.println(ex);
			}
		}
		System.out.println("FAILURE");
		status = -1;
		return;
	}

	public static void main(String[] args) {

		JMSConsumer producer = new JMSConsumer();

		producer.receive();
		System.out.println("Done.");


	}

}


powrót
Zachęcam do przedstawienia swoich uwag i opinii w polu komentarzy.

Komentarze

Dodaj Komentarz